Fill All Values

Replace all values in columns with a fixed value.

Processing

This brick allows users to replace all existing values within specified columns of an input dataset (Pandas DataFrame, Polars DataFrame, or Arrow Table) with a single, fixed value. Column selection can be achieved by providing an explicit list of column names, using a regular expression pattern, or by targeting all columns if neither is provided.
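
The selection precedence described above can be sketched in a few lines. This is a minimal illustration under stated assumptions rather than the brick's own code path: `select_columns` is a hypothetical helper, the regex is applied with `re.search` (a match anywhere in the column name), and unknown names in the list are silently dropped, as they would be with Safe Mode enabled.

```python
import re


def select_columns(all_columns, columns=None, regex_pattern=""):
    """Return the column names targeted by the fill (hypothetical helper)."""
    columns = columns or []
    if regex_pattern:
        # A regex pattern takes priority over an explicit list.
        pattern = re.compile(regex_pattern)
        return [c for c in all_columns if pattern.search(c)]
    if columns:
        # Keep the dataset's own column order and drop unknown names.
        return [c for c in all_columns if c in columns]
    # Neither a list nor a pattern: every column is targeted.
    return list(all_columns)


names = ["id", "price_eur", "price_usd", "note"]
print(select_columns(names, columns=["note", "id"]))
print(select_columns(names, regex_pattern=r"^price_"))
print(select_columns(names))
```

When both a list and a pattern are passed, only the pattern decides which columns are filled.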

Inputs

data
The input dataset (Pandas DataFrame, Polars DataFrame, or Arrow Table) to be processed.
columns (optional)
A list of specific column names to apply the fill operation to. If left empty, the operation applies to all columns, unless a regex pattern is provided (as an input or through the Regex Pattern option).
regex pattern (optional)
A regular expression used to dynamically select column names for the fill operation.
fill value (optional)
The value to be inserted into the selected columns, replacing all existing data.

Inputs Types

Input          Types
data           DataFrame, ArrowTable
columns        List
regex pattern  Str
fill value     Str, Bool, Number, Date, Datetime

You can check the list of supported types here: Available Type Hints.

Outputs

result
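The resulting dataset with the selected columns filled with the specified value, returned in the format defined by the Output Format option. If no column ends up being filled (for example, when the regex pattern matches nothing), the input data is returned unchanged in its original format.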

Outputs Types

Output  Types
result  DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Fill All Values brick exposes the following options:

Columns to Fill
A list of columns that will have all their values replaced by the Fill Value. This list is ignored if a Regex Pattern is provided.
Regex Pattern
If provided, this pattern will be used to select all matching column names for the fill operation. If both Columns to Fill and Regex Pattern are empty, all columns are targeted.
Fill Value
The static value that will be inserted into the target columns. This value can be a string, number, boolean, or date/datetime literal.
Auto Cast Value
If enabled (default), the brick attempts to cast the Fill Value to the native data type of each target column (e.g., a string representation of a number is inserted as a numeric value into a numeric column). If disabled, the value is inserted as a string literal, so filled columns become string columns regardless of their original type, which may cause type conflicts in downstream steps that expect numeric data.
Output Format
Specifies the data format of the returned result: pandas DataFrame (default), Polars DataFrame, or Arrow Table.
Safe Mode
If enabled, the brick will safely skip columns that are specified but do not exist in the input data, or columns where type casting of the Fill Value fails, instead of raising an error and halting the flow.
Verbose
If enabled, detailed logs about the process, column matching, and casting attempts will be printed to the console.
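
The interaction between Auto Cast Value and Safe Mode can be illustrated with a simplified sketch. The helper names `to_sql_literal` and `plan_fill` are hypothetical, and the sketch deliberately leaves out the empty-string, NULL, and date/time handling found in the brick's source; it only shows how one string value is rendered per column type and how Safe Mode skips a column whose cast fails.

```python
def to_sql_literal(value, sql_type):
    """Render a string fill value as a SQL literal for a target type (simplified)."""
    if sql_type == "BOOLEAN":
        if value.lower() in ("true", "1", "yes", "t", "y"):
            return "TRUE"
        if value.lower() in ("false", "0", "no", "f", "n"):
            return "FALSE"
        raise ValueError(f"cannot cast {value!r} to BOOLEAN")
    if sql_type in ("BIGINT", "DOUBLE"):
        float(value)  # raises ValueError for non-numeric text
        return value
    # Everything else is quoted, with single quotes doubled.
    return "'" + value.replace("'", "''") + "'"


def plan_fill(column_types, value, safe_mode=False):
    """Map each column to its literal, or to None when Safe Mode skips it."""
    plan = {}
    for name, sql_type in column_types.items():
        try:
            plan[name] = to_sql_literal(value, sql_type)
        except ValueError:
            if not safe_mode:
                raise
            plan[name] = None  # the column is left untouched
    return plan


types = {"qty": "BIGINT", "active": "BOOLEAN", "label": "VARCHAR"}
print(plan_fill(types, "1"))
print(plan_fill(types, "n/a", safe_mode=True))
```

With Safe Mode disabled, the second call would raise a `ValueError` on the first column that cannot accept the value.
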
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import (
    Union,
    List,
    DataFrame,
    ArrowTable,
    Str,
    Number,
    Date,
    Datetime,
    Bool,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    """Return the first argument that is not None, or None if all are None."""
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def _get_sql_cast_type(duckdb_type):
    """
    Map DuckDB types to appropriate SQL cast types.
    """
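    type_lower = duckdb_type.lower()
    # "int" covers tinyint, smallint, integer, and bigint; INTERVAL also
    # contains "int", so it is excluded explicitly and falls through to VARCHAR.
    if "int" in type_lower and "interval" not in type_lower:
        return "BIGINT"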
    if any(
        (t in type_lower for t in ["float", "double", "real", "decimal", "numeric"])
    ):
        return "DOUBLE"
    if "bool" in type_lower:
        return "BOOLEAN"
    if "date" in type_lower and "time" not in type_lower:
        return "DATE"
    if "timestamp" in type_lower:
        return "TIMESTAMP"
    if "time" in type_lower:
        return "TIME"
    return "VARCHAR"


def _cast_value_for_sql(value, sql_type, column_name):
    """
    Prepare a value for SQL insertion, handling casting appropriately.
    Returns a SQL expression string.

    NOTE: This function assumes 'value' is already a STRING.
    """
    if value == "":
        if sql_type in ["BIGINT", "DOUBLE", "BOOLEAN"]:
            return "NULL"
        return "''"
    if value.upper() == "NULL":
        return "NULL"
    if sql_type == "BOOLEAN":
        if value.lower() in ["true", "1", "yes", "t", "y"]:
            return "TRUE"
        elif value.lower() in ["false", "0", "no", "f", "n"]:
            return "FALSE"
        else:
            raise ValueError(
                f"Cannot cast '{value}' to BOOLEAN for column '{column_name}'"
            )
    if sql_type in ["BIGINT", "DOUBLE"]:
        try:
            float(value)
            return value
        except ValueError:
            raise ValueError(
                f"Cannot cast '{value}' to {sql_type} for column '{column_name}'"
            )
    if sql_type in ["DATE", "TIMESTAMP", "TIME"]:
        escaped_value = value.replace("'", "''")
        return f"CAST('{escaped_value}' AS {sql_type})"
    escaped_value = value.replace("'", "''")
    return f"'{escaped_value}'"


def fill_column(
    data: Union[DataFrame, ArrowTable],
    columns: List = None,
    regex_pattern: Str = None,
    fill_value: Union[Str, Bool, Number, Date, Datetime] = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
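    """
    Replace every value in the selected columns with a fixed value.

    Selection precedence: regex pattern, then explicit column list,
    then all columns when neither is provided.
    """
    brick_display_name = "Fill All Values"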
    options = options or {}
    verbose = options.get("verbose", True)
    columns = _coalesce(columns, options.get("columns", []))
    regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern", ""))
    # Truthiness avoids a TypeError on None; the type check below reports it properly.
    fill_all_columns = not columns
    fill_value = _coalesce(fill_value, options.get("fill_value", ""))
    fill_value_str = str(fill_value)
    auto_cast = options.get("auto_cast", True)
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)
    result = None
    conn = None
    if not isinstance(columns, list) or not all((isinstance(c, str) for c in columns)):
        verbose and logger.error(
            f"[{brick_display_name}] Invalid columns format! Expected a list."
        )
        raise ValueError("Columns must be provided as a list!")
    try:
        fill_mode = None
        if regex_pattern:
            fill_mode = "regex_pattern"
        elif fill_all_columns:
            fill_mode = "all_columns"
        else:
            fill_mode = "column_list"
        verbose and logger.info(
            f"[{brick_display_name}] Detected mode: '{fill_mode}'. Starting fill all operation with value: '{fill_value_str}'."
        )
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )
        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        verbose and logger.info(
            f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
        )
        columns_to_fill = []
        if fill_mode == "column_list":
            if not safe_mode:
                missing_columns = [col for col in columns if col not in all_columns]
                if missing_columns:
                    verbose and logger.error(
                        f"[{brick_display_name}] Columns not found in data: {missing_columns}"
                    )
                    raise ValueError(f"Columns not found in data: {missing_columns}")
            columns_to_fill = [col for col in columns if col in all_columns]
            skipped = len(columns) - len(columns_to_fill)
            if safe_mode and skipped > 0:
                skipped_cols = [col for col in columns if col not in all_columns]
                verbose and logger.warning(
                    f"[{brick_display_name}] Safe mode: Skipped {skipped} non-existent columns: {skipped_cols}"
                )
            verbose and logger.info(
                f"[{brick_display_name}] Filling {len(columns_to_fill)} column(s): {columns_to_fill}."
            )
        elif fill_mode == "regex_pattern":
            try:
                pattern = re.compile(regex_pattern)
                columns_to_fill = [
                    col for col in all_columns.keys() if pattern.search(col)
                ]
                if not columns_to_fill:
                    verbose and logger.warning(
                        f"[{brick_display_name}] No columns matched regex pattern '{regex_pattern}'. Returning data unchanged."
                    )
                verbose and logger.info(
                    f"[{brick_display_name}] Regex pattern '{regex_pattern}' matched {len(columns_to_fill)} columns: {columns_to_fill}."
                )
            except re.error as e:
                verbose and logger.error(
                    f"[{brick_display_name}] Invalid regex pattern: {e}"
                )
                raise ValueError(f"Invalid regex pattern: {e}") from e
        elif fill_mode == "all_columns":
            columns_to_fill = list(all_columns.keys())
            verbose and logger.info(
                f"[{brick_display_name}] Filling all {len(columns_to_fill)} columns."
            )
        select_parts = []
        filled_count = 0
        for col in all_columns.keys():
            sanitized_col = _sanitize_identifier(col)
            if col in columns_to_fill:
                col_type = all_columns[col]
                try:
                    if auto_cast:
                        sql_type = _get_sql_cast_type(col_type)
                        cast_value = _cast_value_for_sql(fill_value_str, sql_type, col)
                        verbose and logger.info(
                            f"[{brick_display_name}] Column '{col}' (type: {col_type}): replacing all values with {cast_value} (cast to {sql_type})."
                        )
                        select_parts.append(f'{cast_value} AS "{sanitized_col}"')
                    else:
                        escaped_value = fill_value_str.replace("'", "''")
                        select_parts.append(
                            f''''{escaped_value}' AS "{sanitized_col}"'''
                        )
                        verbose and logger.info(
                            f"[{brick_display_name}] Column '{col}': replacing all values with '{fill_value_str}' (no auto-cast)."
                        )
                    filled_count += 1
                except ValueError as e:
                    if safe_mode:
                        verbose and logger.warning(
                            f"[{brick_display_name}] Safe mode: Skipping column '{col}' due to cast error."
                        )
                        select_parts.append(f'"{sanitized_col}"')
                    else:
                        verbose and logger.error(
                            f"[{brick_display_name}] Error casting value for column '{col}'."
                        )
                        raise
            else:
                select_parts.append(f'"{sanitized_col}"')
        if filled_count == 0:
            verbose and logger.warning(
                f"[{brick_display_name}] No columns were filled. Returning data unchanged."
            )
            result = data
        else:
            select_clause = ", ".join(select_parts)
            query = f"SELECT {select_clause} FROM input_table"
            verbose and logger.info(
                f"[{brick_display_name}] Executing query to fill all values."
            )
            if output_format == "pandas":
                result = conn.execute(query).df()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to pandas DataFrame."
                )
            elif output_format == "polars":
                result = conn.execute(query).pl()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Polars DataFrame."
                )
            elif output_format == "arrow":
                result = conn.execute(query).fetch_arrow_table()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Arrow Table."
                )
            else:
                verbose and logger.error(
                    f"[{brick_display_name}] Unsupported output format: {output_format}"
                )
                raise ValueError(f"Unsupported output format: {output_format}")
            verbose and logger.info(
                f"[{brick_display_name}] Fill all operation completed successfully. Filled {filled_count} column(s)."
            )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during fill all operation: {str(e)}"
        )
        raise
    finally:
        if conn is not None:
            conn.close()
    return result
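
The query assembly in the source above can be looked at in isolation. The following is a minimal sketch, assuming the literals have already been prepared; `quote_identifier` and `build_fill_query` are hypothetical names, and the table name simply reuses `input_table` from the source. It shows how filled columns become constant expressions while all other columns pass through unchanged, and how double quotes in column names are escaped by doubling them.

```python
def quote_identifier(name):
    """Double any embedded double quotes, then wrap the name in double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_fill_query(all_columns, literals, table="input_table"):
    """Build the projection: a literal for filled columns, the column itself otherwise.

    `literals` maps a column name to a ready-made SQL literal (hypothetical input).
    """
    parts = []
    for name in all_columns:
        ident = quote_identifier(name)
        if name in literals:
            parts.append(f"{literals[name]} AS {ident}")
        else:
            parts.append(ident)
    return f"SELECT {', '.join(parts)} FROM {table}"


query = build_fill_query(
    ["id", 'my "col"', "flag"],
    {'my "col"': "'x'", "flag": "TRUE"},
)
print(query)
```

Because every selected column is replaced by a constant in a single projection, the column order of the input is preserved in the result.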

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow