Fill Forward/Backward
Fill empty cells with previous (forward fill) or next (backward fill) non-empty value.
Fill Forward/Backward
Processing
Fill empty cells in specified columns using the preceding non-empty value (forward fill) or the subsequent non-empty value (backward fill). This operation handles null/empty values within Pandas DataFrames, Polars DataFrames, or Arrow Tables using efficient window functions executed internally.
Inputs
- data
- The input data structure (DataFrame or Arrow Table) to be processed.
- columns (optional)
- A list of column names on which the fill operation should be performed. It is used only when no regex pattern is provided; if both the list and the regex pattern are empty, all columns are processed.
- regex pattern (optional)
- A regular expression pattern used to select columns for filling. If provided, it determines which columns are targeted.
- fill direction (optional)
- Specifies the direction of the fill ('forward' to use the previous non-null value, or 'backward' to use the next non-null value).
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| columns | List |
| regex pattern | Str |
| fill direction | Str |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The resulting data structure with empty cells filled according to the specified direction, returned in the format defined by the `Output Format` option.
Outputs Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Fill Forward/Backward brick contains some changeable options:
- Columns to Fill
- A list of specific columns to apply the fill operation on. If left empty and no Regex Pattern is provided, all columns are targeted.
- Regex Pattern (optional)
- A regular expression used to filter and select columns for the fill operation. If provided, it determines which columns are targeted, overriding the column list selection.
- Fill Direction
- Determines whether to use the previous non-null value (`forward`) or the next non-null value (`backward`) to fill empty cells. Defaults to 'forward'.
- Output Format
- Specifies the desired format of the output data structure (`pandas`, `polars`, or `arrow`). Defaults to 'pandas'.
- Safe Mode
- If enabled, columns specified in `Columns to Fill` that do not exist in the input data will be skipped (with a warning logged when Verbose is enabled) instead of raising an error. Defaults to False.
- Verbose
- If enabled, detailed logging information regarding the process, mode detection, and execution will be displayed in the console. Defaults to True.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str
# Configure the root logger at INFO level when this module is imported.
logging.basicConfig(level=logging.INFO)
# Module-level logger used for every message this brick emits.
logger = logging.getLogger(__name__)
def _coalesce(*values):
return next((v for v in values if v is not None), None)
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
Handles double quotes and other problematic characters.
"""
return identifier.replace('"', '""')
def _fill_fetch_result(conn, query, output_format, log):
    """Execute ``query`` on ``conn`` and return it in ``output_format``.

    Args:
        conn: An open DuckDB connection.
        query: SQL text to execute.
        output_format: One of ``"pandas"``, ``"polars"`` or ``"arrow"``.
        log: Callable ``log(level, message)`` used for progress messages.

    Returns:
        The query result as a pandas DataFrame, Polars DataFrame or
        Arrow Table, depending on ``output_format``.

    Raises:
        ValueError: If ``output_format`` is not a supported value. The
            query is not executed in that case.
    """
    if output_format == "pandas":
        fetched = conn.execute(query).df()
        log(logging.INFO, "Converted result to pandas DataFrame.")
    elif output_format == "polars":
        fetched = conn.execute(query).pl()
        log(logging.INFO, "Converted result to Polars DataFrame.")
    elif output_format == "arrow":
        fetched = conn.execute(query).fetch_arrow_table()
        log(logging.INFO, "Converted result to Arrow Table.")
    else:
        log(logging.ERROR, f"Unsupported output format: {output_format}")
        raise ValueError(f"Unsupported output format: {output_format}")
    return fetched


def fill_forward_backward(
    data: Union[DataFrame, ArrowTable],
    columns: List = None,
    regex_pattern: Str = None,
    fill_direction: Str = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    """Fill NULL cells with the previous or next non-NULL value of the column.

    The input is registered in an in-memory DuckDB connection and filled
    with ``LAST_VALUE``/``FIRST_VALUE ... IGNORE NULLS`` window functions
    ordered by a row number assigned to the input rows.

    Column selection precedence: a non-empty ``regex_pattern`` wins; else a
    non-empty ``columns`` list is used; else every column is filled.

    Args:
        data: A pandas DataFrame, Polars DataFrame or Arrow Table.
        columns: List of column names to fill. Falls back to
            ``options["columns"]``, then to an empty list.
        regex_pattern: Pattern matched with ``re.search`` against column
            names. Falls back to ``options["regex_pattern"]``, then ``""``.
        fill_direction: ``"forward"`` or ``"backward"``. Falls back to
            ``options["fill_direction"]``, then ``"forward"``.
        options: Optional dict. Besides the fallbacks above it may hold
            ``output_format`` (``"pandas"``, ``"polars"`` or ``"arrow"``;
            default ``"pandas"``), ``safe_mode`` (default ``False``) and
            ``verbose`` (default ``True``).

    Returns:
        The filled data in the requested output format. When no column is
        selected for filling, the input object itself is returned if it is
        already in the requested format; otherwise it is converted to that
        format without modification of its values.

    Raises:
        ValueError: If ``columns`` is not a list of strings, the fill
            direction is invalid, ``data`` has an unsupported type, a
            listed column is missing while safe mode is off, the regex
            does not compile, or the output format is unsupported.
    """
    brick_display_name = "Fill Forward/Backward"
    options = options or {}
    verbose = options.get("verbose", True)

    def log(level, message):
        # Single gate for the verbose flag; keeps the brick prefix uniform.
        if verbose:
            logger.log(level, f"[{brick_display_name}] {message}")

    # A trailing default makes an explicit None in `options` behave like an
    # absent key instead of crashing later.
    columns = _coalesce(columns, options.get("columns"), [])
    regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern"), "")
    fill_direction = _coalesce(
        fill_direction, options.get("fill_direction"), "forward"
    )
    output_format = _coalesce(options.get("output_format"), "pandas")
    safe_mode = options.get("safe_mode", False)

    # Validate before anything touches `columns`: it must be a list AND
    # every element must be a string.
    if not isinstance(columns, list) or not all(
        isinstance(c, str) for c in columns
    ):
        log(logging.ERROR, "Invalid columns format! Expected a list.")
        raise ValueError("Columns must be provided as a list!")

    result = None
    conn = None
    try:
        if fill_direction not in ("forward", "backward"):
            log(
                logging.ERROR,
                f"Invalid fill direction: {fill_direction}. Must be 'forward' or 'backward'.",
            )
            raise ValueError("Fill direction must be 'forward' or 'backward'")

        if regex_pattern:
            fill_mode = "regex_pattern"
        elif columns:
            fill_mode = "column_list"
        else:
            fill_mode = "all_columns"
        log(
            logging.INFO,
            f"Detected mode: '{fill_mode}'. Starting {fill_direction} fill operation.",
        )

        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, pa.Table):
            data_type = "arrow"
        else:
            log(
                logging.ERROR,
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table",
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        log(logging.INFO, f"Detected input format: {data_type}.")

        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        column_names = [
            row[0] for row in conn.execute("DESCRIBE input_table").fetchall()
        ]
        known_columns = set(column_names)
        log(logging.INFO, f"Total columns in data: {len(column_names)}.")

        columns_to_fill = []
        no_match_reported = False
        if fill_mode == "column_list":
            missing_columns = [c for c in columns if c not in known_columns]
            if missing_columns and not safe_mode:
                log(
                    logging.ERROR,
                    f"Columns not found in data: {missing_columns}",
                )
                raise ValueError(f"Columns not found in data: {missing_columns}")
            columns_to_fill = [c for c in columns if c in known_columns]
            if missing_columns:
                # Only reachable in safe mode.
                log(
                    logging.WARNING,
                    f"Safe mode: Skipped {len(missing_columns)} non-existent columns: {missing_columns}",
                )
            log(
                logging.INFO,
                f"Filling {len(columns_to_fill)} column(s): {columns_to_fill}.",
            )
        elif fill_mode == "regex_pattern":
            try:
                pattern = re.compile(regex_pattern)
            except re.error as e:
                log(logging.ERROR, "Invalid regex pattern.")
                raise ValueError("Invalid regex pattern.") from e
            columns_to_fill = [c for c in column_names if pattern.search(c)]
            if columns_to_fill:
                log(
                    logging.INFO,
                    f"Regex pattern '{regex_pattern}' matched {len(columns_to_fill)} columns: {columns_to_fill}.",
                )
            else:
                log(
                    logging.WARNING,
                    f"No columns matched regex pattern '{regex_pattern}'. Returning data unchanged.",
                )
                no_match_reported = True
        else:
            columns_to_fill = list(column_names)
            log(logging.INFO, f"Filling all {len(columns_to_fill)} columns.")

        if not columns_to_fill:
            if not no_match_reported:
                log(
                    logging.WARNING,
                    "No columns to fill. Returning data unchanged.",
                )
            if data_type == output_format:
                result = data
            else:
                # Nothing to fill, but the caller still asked for a specific
                # output format: convert without altering any value.
                result = _fill_fetch_result(
                    conn, "SELECT * FROM input_table", output_format, log
                )
        else:
            # Pick a helper column name that cannot clash with an input
            # column (compared case-insensitively).
            taken = {name.lower() for name in column_names}
            row_col = "__row_num"
            while row_col in taken:
                row_col = "_" + row_col

            conn.execute(
                "CREATE TEMPORARY TABLE temp_table AS "
                f'SELECT *, ROW_NUMBER() OVER () AS "{row_col}" FROM input_table'
            )

            # The window function and frame depend only on the direction,
            # so they are chosen once rather than per column.
            if fill_direction == "forward":
                window_func = "LAST_VALUE"
                frame = "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW"
            else:
                window_func = "FIRST_VALUE"
                frame = "ROWS BETWEEN CURRENT ROW AND UNBOUNDED FOLLOWING"

            targets = set(columns_to_fill)
            select_parts = []
            for name in column_names:
                quoted = f'"{_sanitize_identifier(name)}"'
                if name in targets:
                    select_parts.append(
                        f"{window_func}({quoted} IGNORE NULLS) OVER ("
                        f'ORDER BY "{row_col}" ASC {frame}) AS {quoted}'
                    )
                    log(
                        logging.INFO,
                        f"Column '{name}': applying {fill_direction} fill.",
                    )
                else:
                    select_parts.append(quoted)
            filled_count = len(targets)

            select_clause = ", ".join(select_parts)
            query = (
                f"SELECT {select_clause} FROM temp_table "
                f'ORDER BY "{row_col}" ASC'
            )
            log(logging.INFO, "Executing query to fill empty cells.")
            result = _fill_fetch_result(conn, query, output_format, log)
            log(
                logging.INFO,
                f"Fill operation completed successfully. Filled {filled_count} column(s).",
            )
    except Exception as e:
        log(logging.ERROR, f"Error during fill operation: {str(e)}")
        raise
    finally:
        if conn is not None:
            try:
                conn.execute("DROP TABLE IF EXISTS temp_table")
            except Exception as cleanup_error:
                # Best-effort cleanup: never mask the primary outcome.
                log(
                    logging.DEBUG,
                    f"Ignoring temp table cleanup failure: {cleanup_error}",
                )
            finally:
                conn.close()
    return result
Brick Info
version
v0.1.3
python
3.10,
3.11,
3.12,
3.13
requirements
- pandas
- polars[pyarrow]
- duckdb
- pyarrow