Fill Empty Cells

Fill empty/null cells in columns with a fixed value.

Fill Empty Cells

Processing

The function fills empty or null cells within specified columns of an input data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table) using a fixed user-defined value. Columns can be targeted explicitly by name list, implicitly using a regular expression pattern, or by targeting all columns. If Auto Cast Value is enabled, the fill value is automatically cast to match the data type of the target column.

Inputs

data
The input data structure (Pandas DataFrame, Polars DataFrame, or PyArrow Table) containing cells to be filled.
columns (optional)
A list of specific column names to target for filling. If left empty, the operation applies to all columns, unless a regex pattern is provided (either as an input or through the Regex Pattern option).
regex pattern (optional)
A regular expression pattern used to match and select columns for filling. If provided, this method of selection overrides the explicit columns list.
fill value (optional)
The fixed value used to replace empty or null cells in the target columns. This value can be a string, boolean, number, or date/datetime object.

Inputs Types

Input Types
data DataFrame, ArrowTable
columns List
regex pattern Str
fill value Str, Bool, Number, Date, Datetime

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting data structure after the null/empty cells have been filled according to the specified options and preferred output format.

Outputs Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Fill Empty Cells brick contains some changeable options:

Columns to Fill
Specifies a list of column names that should have their empty cells replaced. If neither this nor the Regex Pattern is specified, all columns will be targeted.
Regex Pattern
A regular expression used to select columns for filling. If this option is provided, it takes precedence over the explicit column list.
Fill Value
The value to be inserted into the empty or null cells.
Auto Cast Value
If enabled, the Fill Value will be automatically cast to the appropriate data type of the target column (e.g., casting '10' to an integer if the column is numeric).
Output Format
Defines the data structure format of the returned result. Choices are 'pandas', 'polars', or 'arrow'. Defaults to 'pandas'.
Safe Mode
If enabled, the brick tolerates recoverable errors instead of failing: columns named in the list that do not exist in the data are skipped, and columns whose type the Fill Value cannot be cast to are left unchanged, allowing the flow to continue.
Verbose
If enabled, detailed log messages regarding the operation steps, column matching, and casting are displayed.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import (
    Union,
    List,
    DataFrame,
    ArrowTable,
    Str,
    Number,
    Date,
    Datetime,
    Bool,
)

# Configure the root logger at INFO level when this module is imported
# (basicConfig does nothing if the root logger already has handlers).
logging.basicConfig(level=logging.INFO)
# Module-level logger shared by every log call in this file.
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def _get_sql_cast_type(duckdb_type):
    """
    Map DuckDB types to appropriate SQL cast types.
    """
    type_lower = duckdb_type.lower()
    if any(
        (t in type_lower for t in ["tinyint", "smallint", "integer", "bigint", "int"])
    ):
        return "BIGINT"
    if any(
        (t in type_lower for t in ["float", "double", "real", "decimal", "numeric"])
    ):
        return "DOUBLE"
    if "bool" in type_lower:
        return "BOOLEAN"
    if "date" in type_lower and "time" not in type_lower:
        return "DATE"
    if "timestamp" in type_lower:
        return "TIMESTAMP"
    if "time" in type_lower:
        return "TIME"
    return "VARCHAR"


def _cast_value_for_sql(value, sql_type, column_name):
    """
    Prepare a value for SQL insertion, handling casting appropriately.
    Returns a SQL expression string.

    NOTE: This function assumes 'value' is already a STRING.
    """
    if value == "":
        if sql_type in ["BIGINT", "DOUBLE", "BOOLEAN"]:
            return "NULL"
        return "''"
    if value.upper() == "NULL":
        return "NULL"
    if sql_type == "BOOLEAN":
        if value.lower() in ["true", "1", "yes", "t", "y"]:
            return "TRUE"
        elif value.lower() in ["false", "0", "no", "f", "n"]:
            return "FALSE"
        else:
            raise ValueError(
                f"Cannot cast '{value}' to BOOLEAN for column '{column_name}'"
            )
    if sql_type in ["BIGINT", "DOUBLE"]:
        try:
            float(value)
            return value
        except ValueError:
            raise ValueError(
                f"Cannot cast '{value}' to {sql_type} for column '{column_name}'"
            )
    if sql_type in ["DATE", "TIMESTAMP", "TIME"]:
        escaped_value = value.replace("'", "''")
        return f"CAST('{escaped_value}' AS {sql_type})"
    escaped_value = value.replace("'", "''")
    return f"'{escaped_value}'"


def fill_empty_cells(
    data: Union[DataFrame, ArrowTable],
    columns: List = None,
    regex_pattern: Str = None,
    fill_value: Union[Str, Bool, Number, Date, Datetime] = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    """
    Replace NULL cells in selected columns with a fixed value.

    The input is registered in an in-memory DuckDB connection and rewritten
    with one COALESCE(column, fill) expression per targeted column.

    Args:
        data: pandas DataFrame, Polars DataFrame or PyArrow Table.
        columns: Column names to fill. Empty means all columns, unless a
            regex pattern is given.
        regex_pattern: Regular expression selecting columns by name (matched
            with re.search). Takes precedence over `columns` when non-empty.
        fill_value: Replacement value; it is converted with str() before use.
        options: Optional dict. Keys read: "columns", "regex_pattern" and
            "fill_value" (fallbacks for the arguments above), "auto_cast"
            (default True), "output_format" ("pandas", "polars" or "arrow";
            default "pandas"), "safe_mode" (default False), "verbose"
            (default True).

    Returns:
        The data in the requested output format. When no column was filled
        and the input already has that format, the input object itself is
        returned.

    Raises:
        ValueError: Invalid `columns`, unsupported input type, columns missing
            from the data (safe mode off), invalid regex, fill value that
            cannot be cast (safe mode off), or unsupported output format.
        Exception: Any other error (e.g. raised by DuckDB) is logged and
            re-raised unchanged.
    """
    brick_display_name = "Fill Empty Cells"
    options = options or {}
    verbose = options.get("verbose", True)
    # The trailing defaults also cover option keys explicitly set to None.
    columns = _coalesce(columns, options.get("columns"), [])
    regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern"), "")
    fill_value = _coalesce(fill_value, options.get("fill_value"), "")
    fill_value_str = str(fill_value)
    auto_cast = options.get("auto_cast", True)
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)
    result = None
    conn = None

    # Validate before anything touches `columns`. A bare string is rejected
    # explicitly: it is iterable and would be read as one column per character.
    columns_valid = not isinstance(columns, (str, bytes))
    if columns_valid:
        try:
            columns = list(columns)
        except TypeError:
            columns_valid = False
        else:
            columns_valid = all(isinstance(c, str) for c in columns)
    if not columns_valid:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid columns format! Expected a list."
        )
        raise ValueError("Columns must be provided as a list!")
    fill_all_columns = not columns

    try:
        if regex_pattern:
            fill_mode = "regex_pattern"
        elif fill_all_columns:
            fill_mode = "all_columns"
        else:
            fill_mode = "column_list"
        verbose and logger.info(
            f"[{brick_display_name}] Detected mode: '{fill_mode}'. Starting fill operation with value: '{fill_value_str}'."
        )

        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )

        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        # Column name -> DuckDB type name, in table order.
        all_columns = {col[0]: col[1] for col in column_info}
        verbose and logger.info(
            f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
        )

        columns_to_fill = []
        if fill_mode == "column_list":
            missing_columns = [col for col in columns if col not in all_columns]
            if missing_columns and not safe_mode:
                verbose and logger.error(
                    f"[{brick_display_name}] Columns not found in data: {missing_columns}"
                )
                raise ValueError(f"Columns not found in data: {missing_columns}")
            columns_to_fill = [col for col in columns if col in all_columns]
            if safe_mode and missing_columns:
                verbose and logger.warning(
                    f"[{brick_display_name}] Safe mode: Skipped {len(missing_columns)} non-existent columns: {missing_columns}"
                )
            verbose and logger.info(
                f"[{brick_display_name}] Filling {len(columns_to_fill)} column(s): {columns_to_fill}."
            )
        elif fill_mode == "regex_pattern":
            try:
                pattern = re.compile(regex_pattern)
            except re.error as exc:
                verbose and logger.error(
                    f"[{brick_display_name}] Invalid regex pattern."
                )
                # Keep the original re.error attached for easier debugging.
                raise ValueError("Invalid regex pattern!") from exc
            columns_to_fill = [col for col in all_columns if pattern.search(col)]
            if not columns_to_fill:
                verbose and logger.warning(
                    f"[{brick_display_name}] No columns matched regex pattern '{regex_pattern}'. Returning data unchanged."
                )
            verbose and logger.info(
                f"[{brick_display_name}] Regex pattern '{regex_pattern}' matched {len(columns_to_fill)} columns: {columns_to_fill}."
            )
        else:
            columns_to_fill = list(all_columns)
            verbose and logger.info(
                f"[{brick_display_name}] Filling all {len(columns_to_fill)} columns."
            )

        # Set for O(1) membership tests while walking every table column.
        fill_targets = set(columns_to_fill)
        select_parts = []
        filled_count = 0
        for col, col_type in all_columns.items():
            sanitized_col = _sanitize_identifier(col)
            if col not in fill_targets:
                select_parts.append(f'"{sanitized_col}"')
                continue
            try:
                if auto_cast:
                    sql_type = _get_sql_cast_type(col_type)
                    cast_value = _cast_value_for_sql(fill_value_str, sql_type, col)
                    verbose and logger.info(
                        f"[{brick_display_name}] Column '{col}' (type: {col_type}): filling with {cast_value} (cast to {sql_type})."
                    )
                    select_parts.append(
                        f'COALESCE("{sanitized_col}", {cast_value}) AS "{sanitized_col}"'
                    )
                else:
                    escaped_value = fill_value_str.replace("'", "''")
                    select_parts.append(
                        f'''COALESCE("{sanitized_col}", '{escaped_value}') AS "{sanitized_col}"'''
                    )
                    verbose and logger.info(
                        f"[{brick_display_name}] Column '{col}': filling with '{fill_value_str}' (no auto-cast)."
                    )
                filled_count += 1
            except ValueError:
                if not safe_mode:
                    verbose and logger.error(
                        f"[{brick_display_name}] Error casting value for column '{col}'."
                    )
                    raise
                verbose and logger.warning(
                    f"[{brick_display_name}] Safe mode: Skipping column '{col}' due to cast error."
                )
                # Pass the column through untouched.
                select_parts.append(f'"{sanitized_col}"')

        if filled_count == 0:
            verbose and logger.warning(
                f"[{brick_display_name}] No columns were filled. Returning data unchanged."
            )
        if filled_count == 0 and output_format == data_type:
            # Nothing to fill and nothing to convert: hand back the input itself.
            result = data
        else:
            # Reached also when nothing was filled but the caller asked for a
            # different format, so the requested output format is honoured.
            if output_format not in ("pandas", "polars", "arrow"):
                verbose and logger.error(
                    f"[{brick_display_name}] Unsupported output format: {output_format}"
                )
                raise ValueError(f"Unsupported output format: {output_format}")
            select_clause = ", ".join(select_parts) or "*"
            query = f"SELECT {select_clause} FROM input_table"
            verbose and logger.info(
                f"[{brick_display_name}] Executing query to fill empty cells."
            )
            relation = conn.execute(query)
            if output_format == "pandas":
                result = relation.df()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to pandas DataFrame."
                )
            elif output_format == "polars":
                result = relation.pl()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Polars DataFrame."
                )
            else:
                result = relation.fetch_arrow_table()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Arrow Table."
                )
            if filled_count > 0:
                verbose and logger.info(
                    f"[{brick_display_name}] Fill operation completed successfully. Filled {filled_count} column(s)."
                )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during fill operation: {str(e)}"
        )
        raise
    finally:
        # Always release the in-memory DuckDB connection.
        if conn is not None:
            conn.close()
    return result

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow