Fill Empty Cells
Fill empty/null cells in columns with a fixed value.
Fill Empty Cells
Processing
The function fills empty or null cells within specified columns of an input data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table) using a fixed user-defined value. Columns can be targeted explicitly by name list, implicitly using a regular expression pattern, or by targeting all columns. If Auto Cast Value is enabled, the fill value is automatically cast to match the data type of the target column.
Inputs
- data
- The input data structure (Pandas DataFrame, Polars DataFrame, or PyArrow Table) containing cells to be filled.
- columns (optional)
- A list of specific column names to target for filling. If left empty, the operation applies to all columns unless a `regex pattern` input or the `Regex Pattern` option is provided.
- regex pattern (optional)
- A regular expression pattern used to match and select columns for filling. If provided, this method of selection overrides the explicit `columns` list.
- fill value (optional)
- The fixed value used to replace empty or null cells in the target columns. This value can be a string, boolean, number, or date/datetime object.
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| columns | List |
| regex pattern | Str |
| fill value | Str, Bool, Number, Date, Datetime |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The resulting data structure after the null/empty cells have been filled according to the specified options and preferred output format.
Outputs Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Fill Empty Cells brick contains some changeable options:
- Columns to Fill
- Specifies a list of column names that should have their empty cells replaced. If neither this nor the Regex Pattern is specified, all columns will be targeted.
- Regex Pattern
- A regular expression used to select columns for filling. If this option is provided, it takes precedence over the explicit column list.
- Fill Value
- The value to be inserted into the empty or null cells.
- Auto Cast Value
- If enabled, the Fill Value will be automatically cast to the appropriate data type of the target column (e.g., casting '10' to an integer if the column is numeric).
- Output Format
- Defines the data structure format of the returned result. Choices are 'pandas', 'polars', or 'arrow'. Defaults to 'pandas'.
- Safe Mode
- If enabled, the brick will ignore errors such as missing columns specified in the list or columns where the `Fill Value` cannot be successfully cast to the column's native type, allowing the flow to continue.
- Verbose
- If enabled, detailed log messages regarding the operation steps, column matching, and casting are displayed.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import (
Union,
List,
DataFrame,
ArrowTable,
Str,
Number,
Date,
Datetime,
Bool,
)
# Configure the root logger at INFO level when this module is imported, then
# create the module-level logger used by every function below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _coalesce(*values):
return next((v for v in values if v is not None), None)
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
Handles double quotes and other problematic characters.
"""
return identifier.replace('"', '""')
def _get_sql_cast_type(duckdb_type):
"""
Map DuckDB types to appropriate SQL cast types.
"""
type_lower = duckdb_type.lower()
if any(
(t in type_lower for t in ["tinyint", "smallint", "integer", "bigint", "int"])
):
return "BIGINT"
if any(
(t in type_lower for t in ["float", "double", "real", "decimal", "numeric"])
):
return "DOUBLE"
if "bool" in type_lower:
return "BOOLEAN"
if "date" in type_lower and "time" not in type_lower:
return "DATE"
if "timestamp" in type_lower:
return "TIMESTAMP"
if "time" in type_lower:
return "TIME"
return "VARCHAR"
def _cast_value_for_sql(value, sql_type, column_name):
"""
Prepare a value for SQL insertion, handling casting appropriately.
Returns a SQL expression string.
NOTE: This function assumes 'value' is already a STRING.
"""
if value == "":
if sql_type in ["BIGINT", "DOUBLE", "BOOLEAN"]:
return "NULL"
return "''"
if value.upper() == "NULL":
return "NULL"
if sql_type == "BOOLEAN":
if value.lower() in ["true", "1", "yes", "t", "y"]:
return "TRUE"
elif value.lower() in ["false", "0", "no", "f", "n"]:
return "FALSE"
else:
raise ValueError(
f"Cannot cast '{value}' to BOOLEAN for column '{column_name}'"
)
if sql_type in ["BIGINT", "DOUBLE"]:
try:
float(value)
return value
except ValueError:
raise ValueError(
f"Cannot cast '{value}' to {sql_type} for column '{column_name}'"
)
if sql_type in ["DATE", "TIMESTAMP", "TIME"]:
escaped_value = value.replace("'", "''")
return f"CAST('{escaped_value}' AS {sql_type})"
escaped_value = value.replace("'", "''")
return f"'{escaped_value}'"
def fill_empty_cells(
    data: Union[DataFrame, ArrowTable],
    columns: List = None,
    regex_pattern: Str = None,
    fill_value: Union[Str, Bool, Number, Date, Datetime] = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    """
    Replace NULL cells in selected columns with a fixed value.

    The input is registered in an in-memory DuckDB connection and every
    targeted column is rewritten as ``COALESCE(column, <fill value>)``.

    Args:
        data: A pandas DataFrame, Polars DataFrame or PyArrow Table.
        columns: Column names to fill. Empty means "all columns" unless a
            regex pattern is given. Falls back to ``options["columns"]``.
        regex_pattern: Regular expression selecting columns by name
            (``re.search``); takes precedence over ``columns``. Falls back to
            ``options["regex_pattern"]``.
        fill_value: Replacement value; it is converted with ``str()`` before
            use. Falls back to ``options["fill_value"]``, then to ``""``.
        options: Optional dict with the keys ``columns``, ``regex_pattern``,
            ``fill_value``, ``auto_cast`` (default True), ``output_format``
            ("pandas" by default, or "polars" / "arrow"), ``safe_mode``
            (default False) and ``verbose`` (default True).

    Returns:
        The data in the requested output format. When no column ends up being
        filled and the input already has the requested format, the input
        object itself is returned.

    Raises:
        ValueError: If ``columns`` is not a list of strings, the input type is
            unsupported, a listed column is missing (safe mode off), the regex
            is invalid, the fill value cannot be cast for a column (safe mode
            off), or ``output_format`` is unsupported.
        Exception: Any error raised by DuckDB is logged and re-raised.
    """
    brick_display_name = "Fill Empty Cells"
    options = options or {}
    verbose = options.get("verbose", True)

    def log(level, message):
        # All messages share the brick prefix and are suppressed when verbose
        # is off; stacklevel keeps the caller's location in the log record.
        if verbose:
            logger.log(level, f"[{brick_display_name}] {message}", stacklevel=2)

    # Explicit arguments win over options. Passing the default as a third
    # candidate means an option key that is present but None falls back to
    # the default instead of leaking None (or the string "None") downstream.
    columns = _coalesce(columns, options.get("columns"), [])
    regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern"), "")
    fill_value = _coalesce(fill_value, options.get("fill_value"), "")
    fill_value_str = str(fill_value)
    auto_cast = options.get("auto_cast", True)
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)

    # Validate before anything inspects `columns`. Tuples of names are
    # tolerated and normalised; a bare string or a list containing
    # non-strings is rejected.
    if isinstance(columns, tuple):
        columns = list(columns)
    if not isinstance(columns, list) or not all(
        isinstance(c, str) for c in columns
    ):
        log(logging.ERROR, "Invalid columns format! Expected a list.")
        raise ValueError("Columns must be provided as a list!")
    fill_all_columns = not columns

    result = None
    conn = None
    try:
        if regex_pattern:
            fill_mode = "regex_pattern"
        elif fill_all_columns:
            fill_mode = "all_columns"
        else:
            fill_mode = "column_list"
        log(
            logging.INFO,
            f"Detected mode: '{fill_mode}'. Starting fill operation with value: '{fill_value_str}'.",
        )

        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        else:
            log(
                logging.ERROR,
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table",
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        log(logging.INFO, f"Detected input format: {data_type}.")

        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        # Column name -> DuckDB type name, in table order.
        all_columns = {col[0]: col[1] for col in column_info}
        log(logging.INFO, f"Total columns in data: {len(all_columns)}.")

        columns_to_fill = []
        if fill_mode == "column_list":
            missing_columns = [col for col in columns if col not in all_columns]
            if missing_columns and not safe_mode:
                log(
                    logging.ERROR,
                    f"Columns not found in data: {missing_columns}",
                )
                raise ValueError(f"Columns not found in data: {missing_columns}")
            columns_to_fill = [col for col in columns if col in all_columns]
            if missing_columns:
                # Only reachable in safe mode: unknown names are dropped.
                log(
                    logging.WARNING,
                    f"Safe mode: Skipped {len(missing_columns)} non-existent columns: {missing_columns}",
                )
            log(
                logging.INFO,
                f"Filling {len(columns_to_fill)} column(s): {columns_to_fill}.",
            )
        elif fill_mode == "regex_pattern":
            try:
                pattern = re.compile(regex_pattern)
            except re.error as exc:
                log(logging.ERROR, "Invalid regex pattern.")
                raise ValueError("Invalid regex pattern!") from exc
            columns_to_fill = [col for col in all_columns if pattern.search(col)]
            if not columns_to_fill:
                log(
                    logging.WARNING,
                    f"No columns matched regex pattern '{regex_pattern}'. Returning data unchanged.",
                )
            log(
                logging.INFO,
                f"Regex pattern '{regex_pattern}' matched {len(columns_to_fill)} columns: {columns_to_fill}.",
            )
        else:
            columns_to_fill = list(all_columns)
            log(logging.INFO, f"Filling all {len(columns_to_fill)} columns.")

        # Build one SELECT item per input column, preserving column order.
        fill_targets = set(columns_to_fill)
        escaped_fill_value = fill_value_str.replace("'", "''")
        select_parts = []
        filled_count = 0
        for col, col_type in all_columns.items():
            quoted_col = f'"{_sanitize_identifier(col)}"'
            if col not in fill_targets:
                select_parts.append(quoted_col)
                continue
            try:
                if auto_cast:
                    sql_type = _get_sql_cast_type(col_type)
                    fill_expr = _cast_value_for_sql(fill_value_str, sql_type, col)
                    log(
                        logging.INFO,
                        f"Column '{col}' (type: {col_type}): filling with {fill_expr} (cast to {sql_type}).",
                    )
                else:
                    fill_expr = f"'{escaped_fill_value}'"
                    log(
                        logging.INFO,
                        f"Column '{col}': filling with '{fill_value_str}' (no auto-cast).",
                    )
            except ValueError:
                if not safe_mode:
                    log(
                        logging.ERROR,
                        f"Error casting value for column '{col}'.",
                    )
                    raise
                # Safe mode: pass the column through untouched.
                log(
                    logging.WARNING,
                    f"Safe mode: Skipping column '{col}' due to cast error.",
                )
                select_parts.append(quoted_col)
                continue
            select_parts.append(
                f"COALESCE({quoted_col}, {fill_expr}) AS {quoted_col}"
            )
            filled_count += 1

        if filled_count:
            query = f"SELECT {', '.join(select_parts)} FROM input_table"
            log(logging.INFO, "Executing query to fill empty cells.")
        else:
            log(
                logging.WARNING,
                "No columns were filled. Returning data unchanged.",
            )
            # Nothing to rewrite. Hand the input back as-is when it already
            # has the requested format; otherwise still honour output_format
            # so callers always get the type they asked for.
            query = None if data_type == output_format else "SELECT * FROM input_table"

        if query is None:
            result = data
        elif output_format == "pandas":
            result = conn.execute(query).df()
            log(logging.INFO, "Converted result to pandas DataFrame.")
        elif output_format == "polars":
            result = conn.execute(query).pl()
            log(logging.INFO, "Converted result to Polars DataFrame.")
        elif output_format == "arrow":
            result = conn.execute(query).fetch_arrow_table()
            log(logging.INFO, "Converted result to Arrow Table.")
        else:
            log(logging.ERROR, f"Unsupported output format: {output_format}")
            raise ValueError(f"Unsupported output format: {output_format}")

        if filled_count:
            log(
                logging.INFO,
                f"Fill operation completed successfully. Filled {filled_count} column(s).",
            )
    except Exception as e:
        log(logging.ERROR, f"Error during fill operation: {str(e)}")
        raise
    finally:
        # Always release the in-memory DuckDB connection.
        if conn is not None:
            conn.close()
    return result
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow