Filter Rows by Numeric Range
Filter rows whose numeric values fall within a given range, or clear the content of matching cells.
Processing
This function filters rows or modifies cells within an input dataset (Pandas DataFrame, Polars DataFrame, or Arrow Table) based on whether numeric values in specified columns fall within a defined minimum and maximum range. The exact operation (keeping rows, removing rows, or clearing cell content) is configurable via options. Processing is handled internally using DuckDB for robust and high-performance querying.
Inputs
- data
- The input dataset (DataFrame or Arrow Table) to be processed.
- action (optional)
- Determines how rows or cells are handled if they match the numeric range criteria: `keep_matching_rows` (default), `remove_matching_rows`, `clear_matching_cells`, or `clear_non_matching_cells`.
- columns (optional)
- A list of specific numeric columns to check against the range criteria. If empty and no regex is provided, all numeric columns are checked.
- regex pattern (optional)
- A regex pattern used to select columns to check. If provided, it overrides the explicit `columns` list.
- min value (optional)
- The inclusive lower bound of the numeric range.
- max value (optional)
- The inclusive upper bound of the numeric range.
- multi column mode (optional)
- Specifies the logical requirement when checking multiple columns (e.g., `all` checked columns must match the range, or `any` single one must).
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| action | Str |
| columns | List |
| regex pattern | Str |
| min value | Float |
| max value | Float |
| multi column mode | Str |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The resulting dataset after applying the filter or cell clearance operations. The format depends on the `Output Format` option.
Outputs Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Filter Rows by Numeric Range brick provides the following configurable options:
- Action
- Defines the filtering behavior. Choices include `keep_matching_rows` (default), `remove_matching_rows`, `clear_matching_cells`, or `clear_non_matching_cells`.
- Columns to Check
- A list of column names that should be checked for values within the defined range.
- Column Regex Pattern
- A regular expression used to select columns for checking. If provided, it overrides explicit column selection.
- Minimum Value (inclusive)
- The lowest numeric value considered to be within the range.
- Maximum Value (inclusive)
- The highest numeric value considered to be within the range.
- Multi-Column Mode
- Controls how multiple checked columns contribute to the row filtering decision. Use `any` (default) if at least one column must match, or `all` if every checked column must match.
- Output Format
- Specifies the desired format of the returned data (`pandas`, `polars`, or `arrow`).
- Safe Mode
- If enabled, the brick will ignore errors caused by requesting columns that do not exist in the dataset, allowing processing to continue.
- Verbose
- If enabled, logging information and details about the filtering process will be displayed.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Float, Bool
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _coalesce(*values):
return next((v for v in values if v is not None), None)
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
"""
return identifier.replace('"', '""')
def _build_range_condition(column_expr, min_value, max_value):
"""Build SQL condition to check if a column value is within the numeric range."""
numeric_expr = f"TRY_CAST({column_expr} AS DOUBLE)"
condition = f"({numeric_expr} IS NOT NULL AND {numeric_expr} >= {min_value} AND {numeric_expr} <= {max_value})"
return condition
def filter_rows_by_numeric_range(
data: Union[DataFrame, ArrowTable],
action: Str = None,
columns: List = None,
regex_pattern: Str = None,
min_value: Float = None,
max_value: Float = None,
multi_column_mode: Str = None,
options=None,
) -> Union[DataFrame, ArrowTable]:
brick_display_name = "Filter Rows by Numeric Range"
options = options or {}
verbose = options.get("verbose", True)
action = _coalesce(action, options.get("action", "keep_matching_rows"))
columns = _coalesce(columns, options.get("columns", []))
regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern", ""))
min_value = _coalesce(min_value, options.get("min_value", 0))
max_value = _coalesce(max_value, options.get("max_value", 100))
multi_column_mode = _coalesce(
multi_column_mode, options.get("multi_column_mode", "any")
)
process_all_columns = len(columns) == 0 and (not regex_pattern)
output_format = options.get("output_format", "pandas")
safe_mode = options.get("safe_mode", False)
result = None
conn = None
if not isinstance(columns, list) or not all((isinstance(c, str) for c in columns)):
verbose and logger.error(
f"[{brick_display_name}] Invalid columns format! Expected a list."
)
raise ValueError("Columns must be provided as a list!")
if min_value > max_value:
verbose and logger.error(
f"[{brick_display_name}] Invalid range: min_value ({min_value}) is greater than max_value ({max_value})."
)
raise ValueError("min_value must be less than or equal to max_value!")
valid_actions = [
"keep_matching_rows",
"remove_matching_rows",
"clear_matching_cells",
"clear_non_matching_cells",
]
if action not in valid_actions:
verbose and logger.error(f"[{brick_display_name}] Invalid action: {action}.")
raise ValueError(f"Action must be one of {valid_actions}")
valid_multi_column_modes = ["all", "any"]
if multi_column_mode not in valid_multi_column_modes:
verbose and logger.error(
f"[{brick_display_name}] Invalid multi-column mode: {multi_column_mode}."
)
raise ValueError(f"Multi-column mode must be one of {valid_multi_column_modes}")
if result is None:
try:
verbose and logger.info(
f"[{brick_display_name}] Starting filter with action '{action}' for range [{min_value}, {max_value}]."
)
data_type = None
if isinstance(data, pd.DataFrame):
data_type = "pandas"
elif isinstance(data, pl.DataFrame):
data_type = "polars"
elif isinstance(data, (pa.Table, pa.lib.Table)):
data_type = "arrow"
if data_type is None:
verbose and logger.error(
f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
raise ValueError(
"Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
verbose and logger.info(
f"[{brick_display_name}] Detected input format: {data_type}."
)
conn = duckdb.connect(":memory:")
conn.register("input_table", data)
column_info = conn.execute("DESCRIBE input_table").fetchall()
all_columns = {col[0]: col[1] for col in column_info}
verbose and logger.info(
f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
)
numeric_types = [
"TINYINT",
"SMALLINT",
"INTEGER",
"BIGINT",
"HUGEINT",
"UTINYINT",
"USMALLINT",
"UINTEGER",
"UBIGINT",
"FLOAT",
"DOUBLE",
"DECIMAL",
"NUMERIC",
"INT",
"INT2",
"INT4",
"INT8",
"REAL",
]
numeric_columns = {
col: dtype
for (col, dtype) in all_columns.items()
if any((num_type in dtype.upper() for num_type in numeric_types))
}
verbose and logger.info(
f"[{brick_display_name}] Numeric columns detected: {len(numeric_columns)} out of {len(all_columns)}."
)
columns_to_check = []
if regex_pattern:
try:
pattern = re.compile(regex_pattern)
columns_to_check = [
col for col in numeric_columns.keys() if pattern.search(col)
]
if not columns_to_check:
verbose and logger.warning(
f"[{brick_display_name}] No numeric columns matched regex pattern '{regex_pattern}'. Returning data unchanged."
)
result = data
else:
verbose and logger.info(
f"[{brick_display_name}] Regex pattern '{regex_pattern}' matched {len(columns_to_check)} numeric columns."
)
except re.error as e:
verbose and logger.error(
f"[{brick_display_name}] Invalid regex pattern."
)
raise ValueError(f"Invalid regex pattern: {e}")
elif process_all_columns:
columns_to_check = list(numeric_columns.keys())
verbose and logger.info(
f"[{brick_display_name}] Checking all {len(columns_to_check)} numeric columns."
)
else:
if not safe_mode:
missing_columns = [col for col in columns if col not in all_columns]
if missing_columns:
verbose and logger.error(
f"[{brick_display_name}] Columns not found in data: {missing_columns}"
)
raise ValueError(
f"Columns not found in data: {missing_columns}"
)
non_numeric_requested = [
col
for col in columns
if col in all_columns and col not in numeric_columns
]
if non_numeric_requested:
verbose and logger.warning(
f"[{brick_display_name}] Skipping non-numeric columns: {non_numeric_requested}"
)
columns_to_check = [col for col in columns if col in numeric_columns]
if safe_mode:
skipped = len(columns) - len(columns_to_check)
if skipped > 0:
skipped_cols = [
col for col in columns if col not in all_columns
]
verbose and logger.warning(
f"[{brick_display_name}] Safe mode: Skipped {skipped} non-existent columns: {skipped_cols}"
)
verbose and logger.info(
f"[{brick_display_name}] Checking {len(columns_to_check)} numeric column(s)."
)
if result is None:
if not columns_to_check:
verbose and logger.warning(
f"[{brick_display_name}] No columns to check. Returning data unchanged."
)
result = data
else:
column_conditions = []
for col in columns_to_check:
sanitized_col = _sanitize_identifier(col)
col_expr = f'"{sanitized_col}"'
condition = _build_range_condition(
col_expr, min_value, max_value
)
column_conditions.append(condition)
if multi_column_mode == "all":
row_match_condition = " AND ".join(column_conditions)
else:
row_match_condition = " OR ".join(column_conditions)
if action == "keep_matching_rows":
query = f"SELECT * FROM input_table WHERE {row_match_condition}"
verbose and logger.info(
f"[{brick_display_name}] Keeping rows where values are in range."
)
elif action == "remove_matching_rows":
query = f"SELECT * FROM input_table WHERE NOT ({row_match_condition})"
verbose and logger.info(
f"[{brick_display_name}] Removing rows where values are in range."
)
elif action in ["clear_matching_cells", "clear_non_matching_cells"]:
select_parts = []
clear_on_match = action == "clear_matching_cells"
for col in all_columns.keys():
sanitized_col = _sanitize_identifier(col)
if col in columns_to_check:
col_expr = f'"{sanitized_col}"'
col_condition = _build_range_condition(
col_expr, min_value, max_value
)
if clear_on_match:
select_parts.append(
f'CASE WHEN {col_condition} THEN NULL ELSE "{sanitized_col}" END AS "{sanitized_col}"'
)
else:
select_parts.append(
f'CASE WHEN {col_condition} THEN "{sanitized_col}" ELSE NULL END AS "{sanitized_col}"'
)
else:
select_parts.append(f'"{sanitized_col}"')
select_clause = ", ".join(select_parts)
query = f"SELECT {select_clause} FROM input_table"
verbose and logger.info(
f"[{brick_display_name}] Clearing content of {('matching' if clear_on_match else 'non-matching')} cells."
)
if output_format == "pandas":
result = conn.execute(query).df()
verbose and logger.info(
f"[{brick_display_name}] Converted result to pandas DataFrame."
)
elif output_format == "polars":
result = conn.execute(query).pl()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Polars DataFrame."
)
elif output_format == "arrow":
result = conn.execute(query).fetch_arrow_table()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Arrow Table."
)
else:
verbose and logger.error(
f"[{brick_display_name}] Unsupported output format: {output_format}"
)
raise ValueError(f"Unsupported output format: {output_format}")
verbose and logger.info(
f"[{brick_display_name}] Filter completed successfully."
)
except Exception as e:
verbose and logger.error(
f"[{brick_display_name}] Error during filtering: {str(e)}"
)
raise
finally:
if conn is not None:
conn.close()
return result
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow