Filter Rows by Values

Filter rows of a dataset that contain specific textual values, or clear the content of matching cells.

Processing

This brick filters rows in a dataset (pandas, Polars, or Arrow Table) based on specific textual values found within selected columns. It supports several actions: keeping or removing matching rows, or clearing the content of matching or non-matching cells. Matching can be configured using exact values, substrings, or regular expressions, with options for case-insensitive or accent-normalized comparisons.
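
A minimal usage sketch, assuming a pandas DataFrame as input (the full source of filter_rows_by_values is listed further down this page):

import pandas as pd

# Keep rows whose "city" column equals "Paris", ignoring case.
# The DataFrame and column names are illustrative.
df = pd.DataFrame({"city": ["Paris", "paris", "Lyon"], "visits": [10, 4, 7]})

result = filter_rows_by_values(
    data=df,
    action="keep_matching_rows",
    columns=["city"],
    match_values=["Paris"],
    normalization_mode="ignore_case",
)
# result keeps the "Paris" and "paris" rows, returned as a pandas DataFrame
# (the default output format).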

Inputs

data
The input dataset (DataFrame or Arrow Table) to be processed and filtered.
action (optional)
Specifies the operation to perform: keep_matching_rows (default), remove_matching_rows, clear_matching_cells, or clear_non_matching_cells.
columns (optional)
A list of specific column names to check for matching values. If empty, columns are selected via the regex pattern or, failing that, all textual columns are checked.
regex pattern (optional)
A regular expression used to dynamically select which columns the filter applies to.
match values (optional)
A list of specific values that the function should search for within the selected columns.
match mode (optional)
Defines how the match values are compared: complete_value, substring, or regular_expression.
normalization mode (optional)
Defines how text comparison is handled: exact (default), ignore_case, or normalize_accents.
multi column mode (optional)
Defines how multiple column checks are combined: either all columns must match a value, or any column must match a value.
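
For illustration, columns can also be selected dynamically via the regex pattern input instead of being listed by name; the data below is hypothetical:

import pandas as pd

df = pd.DataFrame(
    {
        "user_id": ["TMP-001", "U-042", "U-043"],
        "order_id": ["O-1", "TMP-9", "O-2"],
        "note": ["draft", "paid", "paid"],
    }
)

# Remove rows where any column whose name ends in "_id" starts with "TMP-".
cleaned = filter_rows_by_values(
    data=df,
    action="remove_matching_rows",
    regex_pattern=r"_id$",
    match_values=[r"^TMP-"],
    match_mode="regular_expression",
)
# Only the last row survives: the first two each have a "TMP-" identifier.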

Input Types

Input                Types
data                 DataFrame, ArrowTable
action               Str
columns              List
regex pattern        Str
match values         List
match mode           Str
normalization mode   Str
multi column mode    Str

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting dataset, either filtered or with the relevant cell contents cleared, according to the configured action and output format.

Output Types

Output   Types
result   DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Filter Rows by Values brick provides the following configurable options:

Action
Specifies the action: keep_matching_rows (default), remove_matching_rows, clear_matching_cells, or clear_non_matching_cells.
Columns to Check
Provide a list of columns where the matching logic should be applied.
Column Regex Pattern
A regex pattern used to select textual columns to be checked dynamically.
Values to Match
The list of specific textual values used for filtering or clearing operations.
Match Mode
Determines the comparison method: complete_value (default), substring, or regular_expression.
Normalization Mode
Specifies text comparison rules: exact (default), ignore_case, or normalize_accents.
Multi-Column Mode
If multiple columns are checked, specifies whether all columns or any column must satisfy the match condition for a row to be affected. Defaults to any.
Output Format
Specifies the type of output data structure: pandas (default), polars, or arrow.
Safe Mode
If enabled, missing columns specified in 'Columns to Check' will be skipped instead of raising an error. Defaults to False.
Verbose
Enables detailed logging during execution. Defaults to True.
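
All of these options can also be supplied through the options dictionary; explicit arguments take precedence over option keys (see the _coalesce helper in the source below). A hedged sketch:

import pandas as pd

df = pd.DataFrame({"comment": ["N/A", "great", "unknown"], "score": [0, 5, 0]})

result = filter_rows_by_values(
    data=df,
    options={
        "action": "clear_matching_cells",
        "columns": ["comment"],
        "match_values": ["N/A", "unknown"],
        "output_format": "polars",
        "safe_mode": True,
        "verbose": False,
    },
)
# The "N/A" and "unknown" cells become null; the result is a Polars DataFrame.

The complete brick source follows:
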
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Dict, Bool

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
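    """Return the first argument that is not None (or None if all are)."""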
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    """
    return identifier.replace('"', '""')


def _escape_sql_string(s):
    """Escape single quotes for SQL string literals."""
    return s.replace("'", "''")


def _escape_regex_for_sql(pattern):
    """Escape special characters in regex pattern for SQL."""
    return pattern.replace("'", "''")


def _normalize_text_sql(text_expr, normalization_mode):
    """Generate SQL expression to normalize text based on normalization mode."""
    if normalization_mode == "ignore_case":
        return f"LOWER({text_expr})"
    elif normalization_mode == "normalize_accents":
        normalized = f"LOWER({text_expr})"
        accent_map = {
            "á": "a",
            "à": "a",
            "â": "a",
            "ä": "a",
            "ã": "a",
            "å": "a",
            "é": "e",
            "è": "e",
            "ê": "e",
            "ë": "e",
            "í": "i",
            "ì": "i",
            "î": "i",
            "ï": "i",
            "ó": "o",
            "ò": "o",
            "ô": "o",
            "ö": "o",
            "õ": "o",
            "ú": "u",
            "ù": "u",
            "û": "u",
            "ü": "u",
            "ñ": "n",
            "ç": "c",
            "Á": "a",
            "À": "a",
            "Â": "a",
            "Ä": "a",
            "Ã": "a",
            "Å": "a",
            "É": "e",
            "È": "e",
            "Ê": "e",
            "Ë": "e",
            "Í": "i",
            "Ì": "i",
            "Î": "i",
            "Ï": "i",
            "Ó": "o",
            "Ò": "o",
            "Ô": "o",
            "Ö": "o",
            "Õ": "o",
            "Ú": "u",
            "Ù": "u",
            "Û": "u",
            "Ü": "u",
            "Ñ": "n",
            "Ç": "c",
        }
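        # Wrap the lowered expression in nested REPLACE() calls, one per accented character.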
        for accented, plain in accent_map.items():
            normalized = f"REPLACE({normalized}, '{accented}', '{plain}')"
        return normalized
    else:
        return text_expr


def _build_match_condition(column_expr, match_values, match_mode, normalization_mode):
    """Build SQL condition to check if a column matches any of the values."""
    conditions = []
    for value in match_values:
        escaped_value = _escape_sql_string(value)
        if match_mode == "complete_value":
            if normalization_mode == "normalize_accents":
                normalized_col = _normalize_text_sql(column_expr, normalization_mode)
                normalized_val = _normalize_text_sql(
                    f"'{escaped_value}'", normalization_mode
                )
                conditions.append(f"{normalized_col} = {normalized_val}")
            elif normalization_mode == "ignore_case":
                conditions.append(f"LOWER({column_expr}) = LOWER('{escaped_value}')")
            else:
                conditions.append(f"{column_expr} = '{escaped_value}'")
        elif match_mode == "substring":
            if normalization_mode == "ignore_case":
                escaped_regex = _escape_regex_for_sql(re.escape(value))
                conditions.append(
                    f"regexp_matches({column_expr}, '{escaped_regex}', 'i')"
                )
            elif normalization_mode == "normalize_accents":
                normalized_col = _normalize_text_sql(column_expr, normalization_mode)
                normalized_val = _normalize_text_sql(
                    f"'{escaped_value}'", normalization_mode
                )
                conditions.append(
                    f"{normalized_col} LIKE '%' || {normalized_val} || '%'"
                )
            else:
                conditions.append(f"{column_expr} LIKE '%{escaped_value}%'")
        elif match_mode == "regular_expression":
            escaped_regex = _escape_regex_for_sql(value)
            if normalization_mode == "ignore_case":
                conditions.append(
                    f"regexp_matches({column_expr}, '{escaped_regex}', 'i')"
                )
            elif normalization_mode == "normalize_accents":
                normalized_col = _normalize_text_sql(column_expr, normalization_mode)
                conditions.append(
                    f"regexp_matches({normalized_col}, '{escaped_regex}')"
                )
            else:
                conditions.append(f"regexp_matches({column_expr}, '{escaped_regex}')")
    if conditions:
        return f"({' OR '.join(conditions)})"
    return "FALSE"


def filter_rows_by_values(
    data: Union[DataFrame, ArrowTable],
    action: Str = None,
    columns: List = None,
    regex_pattern: Str = None,
    match_values: List = None,
    match_mode: Str = None,
    normalization_mode: Str = None,
    multi_column_mode: Str = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
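    """
    Filter rows or clear cells in a pandas/Polars DataFrame or Arrow Table.

    Match conditions are compiled to SQL and evaluated with DuckDB. Explicit
    arguments take precedence over the corresponding keys in `options`.
    """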
    brick_display_name = "Filter Rows by Values"
    options = options or {}
    verbose = options.get("verbose", True)
    action = _coalesce(action, options.get("action", "keep_matching_rows"))
    columns = _coalesce(columns, options.get("columns", []))
    regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern", ""))
    match_values = _coalesce(match_values, options.get("match_values", []))
    match_mode = _coalesce(match_mode, options.get("match_mode", "complete_value"))
    normalization_mode = _coalesce(
        normalization_mode, options.get("normalization_mode", "exact")
    )
    multi_column_mode = _coalesce(
        multi_column_mode, options.get("multi_column_mode", "any")
    )
    process_all_columns = (not columns) and (not regex_pattern)
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)
    result = None
    conn = None
    if not isinstance(columns, list) or not all((isinstance(c, str) for c in columns)):
        verbose and logger.error(
            f"[{brick_display_name}] Invalid columns format! Expected a list."
        )
        raise ValueError("Columns must be provided as a list!")
    if not isinstance(match_values, list):
        verbose and logger.error(
            f"[{brick_display_name}] Invalid match_values format! Expected a list."
        )
        raise ValueError("Match values must be provided as a list!")
    if not match_values:
        verbose and logger.warning(
            f"[{brick_display_name}] No match values specified. Returning data unchanged."
        )
        result = data
    valid_actions = [
        "keep_matching_rows",
        "remove_matching_rows",
        "clear_matching_cells",
        "clear_non_matching_cells",
    ]
    if action not in valid_actions:
        verbose and logger.error(f"[{brick_display_name}] Invalid action: {action}.")
        raise ValueError(f"Action must be one of {valid_actions}")
    valid_match_modes = ["complete_value", "substring", "regular_expression"]
    if match_mode not in valid_match_modes:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid match mode: {match_mode}."
        )
        raise ValueError(f"Match mode must be one of {valid_match_modes}")
    valid_normalization_modes = ["exact", "ignore_case", "normalize_accents"]
    if normalization_mode not in valid_normalization_modes:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid normalization mode: {normalization_mode}."
        )
        raise ValueError(
            f"Normalization mode must be one of {valid_normalization_modes}"
        )
    valid_multi_column_modes = ["all", "any"]
    if multi_column_mode not in valid_multi_column_modes:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid multi-column mode: {multi_column_mode}."
        )
        raise ValueError(f"Multi-column mode must be one of {valid_multi_column_modes}")
    if normalization_mode == "normalize_accents" and match_mode != "complete_value":
        verbose and logger.warning(
            f"[{brick_display_name}] Accent normalization is only fully supported for complete value matching. Using it anyway."
        )
    if result is None:
        try:
            verbose and logger.info(
                f"[{brick_display_name}] Starting filter with action '{action}', match mode '{match_mode}', and {len(match_values)} value(s)."
            )
            data_type = None
            if isinstance(data, pd.DataFrame):
                data_type = "pandas"
            elif isinstance(data, pl.DataFrame):
                data_type = "polars"
            elif isinstance(data, (pa.Table, pa.lib.Table)):
                data_type = "arrow"
            if data_type is None:
                verbose and logger.error(
                    f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
                raise ValueError(
                    "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
            verbose and logger.info(
                f"[{brick_display_name}] Detected input format: {data_type}."
            )
            conn = duckdb.connect(":memory:")
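            # Register the in-memory dataset (pandas, Polars, or Arrow) as a DuckDB view.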
            conn.register("input_table", data)
            column_info = conn.execute("DESCRIBE input_table").fetchall()
            all_columns = {col[0]: col[1] for col in column_info}
            verbose and logger.info(
                f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
            )
            textual_types = ["VARCHAR", "TEXT", "STRING", "CHAR", "NVARCHAR"]
            textual_columns = {
                col: dtype
                for (col, dtype) in all_columns.items()
                if any((text_type in dtype.upper() for text_type in textual_types))
            }
            verbose and logger.info(
                f"[{brick_display_name}] Textual columns detected: {len(textual_columns)} out of {len(all_columns)}."
            )
            columns_to_check = []
            if regex_pattern:
                try:
                    pattern = re.compile(regex_pattern)
                    columns_to_check = [
                        col for col in textual_columns.keys() if pattern.search(col)
                    ]
                    if not columns_to_check:
                        verbose and logger.warning(
                            f"[{brick_display_name}] No textual columns matched regex pattern '{regex_pattern}'. Returning data unchanged."
                        )
                        result = data
                    else:
                        verbose and logger.info(
                            f"[{brick_display_name}] Regex pattern '{regex_pattern}' matched {len(columns_to_check)} textual columns."
                        )
                except re.error as e:
                    verbose and logger.error(
                        f"[{brick_display_name}] Invalid regex pattern."
                    )
                    raise ValueError(f"Invalid regex pattern: {e}")
            elif process_all_columns:
                columns_to_check = list(textual_columns.keys())
                verbose and logger.info(
                    f"[{brick_display_name}] Checking all {len(columns_to_check)} textual columns."
                )
            else:
                if not safe_mode:
                    missing_columns = [col for col in columns if col not in all_columns]
                    if missing_columns:
                        verbose and logger.error(
                            f"[{brick_display_name}] Columns not found in data: {missing_columns}"
                        )
                        raise ValueError(
                            f"Columns not found in data: {missing_columns}"
                        )
                non_textual_requested = [
                    col
                    for col in columns
                    if col in all_columns and col not in textual_columns
                ]
                if non_textual_requested:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Skipping non-textual columns: {non_textual_requested}"
                    )
                columns_to_check = [col for col in columns if col in textual_columns]
                if safe_mode:
                    skipped = len(columns) - len(columns_to_check)
                    if skipped > 0:
                        skipped_cols = [
                            col for col in columns if col not in all_columns
                        ]
                        verbose and logger.warning(
                            f"[{brick_display_name}] Safe mode: Skipped {skipped} non-existent columns: {skipped_cols}"
                        )
                verbose and logger.info(
                    f"[{brick_display_name}] Checking {len(columns_to_check)} textual column(s)."
                )
            if result is None:
                if not columns_to_check:
                    verbose and logger.warning(
                        f"[{brick_display_name}] No columns to check. Returning data unchanged."
                    )
                    result = data
                else:
                    column_conditions = []
                    for col in columns_to_check:
                        sanitized_col = _sanitize_identifier(col)
                        col_expr = f'CAST("{sanitized_col}" AS VARCHAR)'
                        condition = _build_match_condition(
                            col_expr, match_values, match_mode, normalization_mode
                        )
                        column_conditions.append(condition)
                    if multi_column_mode == "all":
                        row_match_condition = " AND ".join(column_conditions)
                    else:
                        row_match_condition = " OR ".join(column_conditions)
                    if action == "keep_matching_rows":
                        query = f"SELECT * FROM input_table WHERE {row_match_condition}"
                        verbose and logger.info(
                            f"[{brick_display_name}] Keeping rows where match condition is TRUE."
                        )
                    elif action == "remove_matching_rows":
                        query = f"SELECT * FROM input_table WHERE NOT ({row_match_condition})"
                        verbose and logger.info(
                            f"[{brick_display_name}] Removing rows where match condition is TRUE."
                        )
                    elif action in ["clear_matching_cells", "clear_non_matching_cells"]:
                        select_parts = []
                        clear_on_match = action == "clear_matching_cells"
                        for col in all_columns.keys():
                            sanitized_col = _sanitize_identifier(col)
                            if col in columns_to_check:
                                col_expr = f'CAST("{sanitized_col}" AS VARCHAR)'
                                col_condition = _build_match_condition(
                                    col_expr,
                                    match_values,
                                    match_mode,
                                    normalization_mode,
                                )
                                if clear_on_match:
                                    select_parts.append(
                                        f'CASE WHEN {col_condition} THEN NULL ELSE "{sanitized_col}" END AS "{sanitized_col}"'
                                    )
                                else:
                                    select_parts.append(
                                        f'CASE WHEN {col_condition} THEN "{sanitized_col}" ELSE NULL END AS "{sanitized_col}"'
                                    )
                            else:
                                select_parts.append(f'"{sanitized_col}"')
                        select_clause = ", ".join(select_parts)
                        query = f"SELECT {select_clause} FROM input_table"
                        verbose and logger.info(
                            f"[{brick_display_name}] Clearing content of {('matching' if clear_on_match else 'non-matching')} cells."
                        )
                    if output_format == "pandas":
                        result = conn.execute(query).df()
                        verbose and logger.info(
                            f"[{brick_display_name}] Converted result to pandas DataFrame."
                        )
                    elif output_format == "polars":
                        result = conn.execute(query).pl()
                        verbose and logger.info(
                            f"[{brick_display_name}] Converted result to Polars DataFrame."
                        )
                    elif output_format == "arrow":
                        result = conn.execute(query).fetch_arrow_table()
                        verbose and logger.info(
                            f"[{brick_display_name}] Converted result to Arrow Table."
                        )
                    else:
                        verbose and logger.error(
                            f"[{brick_display_name}] Unsupported output format: {output_format}"
                        )
                        raise ValueError(f"Unsupported output format: {output_format}")
                    verbose and logger.info(
                        f"[{brick_display_name}] Filter completed successfully."
                    )
        except Exception as e:
            verbose and logger.error(
                f"[{brick_display_name}] Error during filtering: {str(e)}"
            )
            raise
        finally:
            if conn is not None:
                conn.close()
    return result
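
A short end-to-end run of the function above (assuming pandas and duckdb are installed); the data is illustrative:

import pandas as pd

df = pd.DataFrame(
    {
        "name": ["Café Noir", "Tea House", "Cafe Latte"],
        "category": ["coffee", "tea", "coffee"],
    }
)

# Case-insensitive substring match across all textual columns
# (neither columns nor regex_pattern is given).
kept = filter_rows_by_values(
    data=df,
    match_values=["cafe"],
    match_mode="substring",
    normalization_mode="ignore_case",
)
# Keeps only "Cafe Latte": ignore_case does not strip accents, so "Café Noir"
# is not matched; normalize_accents handles accented text (fully supported
# for complete-value matching).
print(kept)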

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow