Find and Replace Strings

Find and replace strings in one or more columns with multiple replacement patterns.

Category: Processing

This brick performs string search and replacement across one or more columns of a data structure (pandas DataFrame, Polars DataFrame, or Arrow Table). It supports multiple find/replace patterns, three matching modes (complete value, substring, or regular expression), and two normalization modes (case-sensitive or lowercase). Columns can be selected by an explicit name list or by a regular expression over column names, and replacements can be applied in place or written to new output columns via the provided mappings.

Inputs

data
The input data structure (Pandas DataFrame, Polars DataFrame, or Arrow Table) on which replacements will be performed.
columns (optional)
A list of specific column names to target for string replacement. If empty, columns are selected by the regex pattern or, if no pattern is given, all textual columns are processed.
regex pattern (optional)
A regular expression pattern used to select textual columns by their name.
output columns (optional)
A mapping (List of dicts [{"column": "new_output"}, ...]) defining how processed columns should be mapped to new output columns. If a column is mapped here, the original column is preserved, and the result of the replacement is written to the new column name.
replacements (optional)
A mapping (List of dicts [{"value": "replacement"}, ...]) defining the find and replace patterns (key/value pairs).
matching mode (optional)
Specifies the method used for matching strings: complete_value, substring, or regular_expression.
normalization mode (optional)
Specifies how case sensitivity is handled during matching: exact (case-sensitive) or lowercase (case-insensitive).
first match only (optional)
If set to True, only the first find/replace pair is applied to each column; in regular_expression mode, only the first occurrence within each cell value is replaced.
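
Both mapping inputs accept either a list of key/value dicts or a plain dict. A minimal sketch of the accepted shapes (column names and values are illustrative):

replacements = [
    {"key": "NY", "value": "New York"},
    {"key": "LA", "value": "Los Angeles"},
]
# or, equivalently:
replacements = {"NY": "New York", "LA": "Los Angeles"}

# output columns follow the same shapes, mapping source column -> new column:
output_columns = {"city": "city_clean"}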

Input Types

data                  DataFrame, ArrowTable
columns               List
regex pattern         Str
output columns        List, Dict
replacements          List, Dict
matching mode         Str
normalization mode    Str
first match only      Bool

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting data structure (DataFrame or Arrow Table) containing the columns with applied string replacements.

Output Types

result                DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Find and Replace Strings brick exposes the following configurable options (a sample payload follows the list):

Columns to Process
A list of column names that the find/replace operations should target.
Column Regex Pattern
A regular expression used to dynamically select column names to be processed.
Output Column Mappings
Defines key/value pairs where the key is the source column name and the value is the new name for the output column containing the processed strings. If a column is not mapped, replacement happens in-place.
Find/Replace Mappings
Defines key/value pairs where the key is the string pattern to find and the value is the string to replace it with.
Matching Mode
Determines how the string pattern is matched within cell values. Choices are complete_value (the entire cell value must match), substring (plain string replacement), or regular_expression (uses regex syntax).
Normalization Mode
Determines if matching should be exact (case-sensitive) or lowercase (case-insensitive).
Only First Match
If enabled, only the first find/replace pair is applied; in regular_expression mode this also limits replacement to the first occurrence within each cell.
Output Format
Specifies the format of the returned data structure. Choices are pandas, polars, or arrow.
Safe Mode
If enabled, missing columns specified in the "Columns to Process" list will be ignored instead of raising an error.
Verbose
Enables detailed logging output during the processing steps.
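
These options populate the brick's options dict, which backs any inputs not supplied directly. A hedged sketch of a full payload, using the code's defaults except for the illustrative replacements mapping:

options = {
    "columns": [],                  # empty: fall back to regex or all textual columns
    "regex_pattern": "",
    "output_columns": {},           # empty: replace in-place
    "replacements": {"NY": "New York"},
    "matching_mode": "substring",
    "normalization_mode": "exact",
    "first_match_only": False,
    "output_format": "pandas",
    "safe_mode": False,
    "verbose": True,
}

Brick Code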
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Dict, Bool

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize a SQL identifier by doubling embedded double quotes.
    """
    return identifier.replace('"', '""')


def _is_list_of_key_value_dicts(obj):
    if not isinstance(obj, list):
        return False
    for item in obj:
        if not isinstance(item, dict):
            return False
        if set(item.keys()) != {"key", "value"}:
            return False
    return True


def _is_textual_dict(obj):
    if not isinstance(obj, dict):
        return False
    return all((isinstance(k, str) and isinstance(v, str) for (k, v) in obj.items()))


def _escape_sql_string(s):
    """Escape single quotes for SQL string literals."""
    return s.replace("'", "''")


def _escape_regex_for_sql(pattern):
    """Escape special characters in regex pattern for SQL."""
    return pattern.replace("'", "''")


def _build_replacement_expr(
    base_expr, replacements_dict, matching_mode, normalization_mode, first_match_only
):
    """Build the SQL replacement expression for a column."""
    replacement_expr = base_expr
    for find_str, replace_str in replacements_dict.items():
        escaped_find = _escape_sql_string(find_str)
        escaped_replace = _escape_sql_string(replace_str)
        if matching_mode == "complete_value":
            if normalization_mode == "lowercase":
                condition = f"LOWER({replacement_expr}) = LOWER('{escaped_find}')"
            else:
                condition = f"{replacement_expr} = '{escaped_find}'"
            replacement_expr = f"CASE WHEN {condition} THEN '{escaped_replace}' ELSE {replacement_expr} END"
            if first_match_only:
                break
        elif matching_mode == "substring":
            if normalization_mode == "lowercase":
                escaped_find_regex = _escape_regex_for_sql(re.escape(find_str))
                replacement_expr = f"regexp_replace({replacement_expr}, '{escaped_find_regex}', '{escaped_replace}', 'gi')"
            else:
                replacement_expr = f"REPLACE({replacement_expr}, '{escaped_find}', '{escaped_replace}')"
            if first_match_only:
                break
        elif matching_mode == "regular_expression":
            escaped_find_regex = _escape_regex_for_sql(find_str)
            # Translate "$N" group references in the replacement string to
            # DuckDB's "\N" style; a "\$" escape denotes a literal dollar.
            escaped_replace_regex = escaped_replace.replace("\\$", "\x00")
            escaped_replace_regex = escaped_replace_regex.replace("$", "\\")
            escaped_replace_regex = escaped_replace_regex.replace("\x00", "$")
            if normalization_mode == "lowercase":
                flags = "i" if first_match_only else "gi"
            else:
                flags = "" if first_match_only else "g"
            replacement_expr = f"regexp_replace({replacement_expr}, '{escaped_find_regex}', '{escaped_replace_regex}', '{flags}')"
            if first_match_only:
                break
    return replacement_expr


def find_replace_strings(
    data: Union[DataFrame, ArrowTable],
    columns: List = None,
    regex_pattern: Str = None,
    output_columns: Union[List, Dict] = None,
    replacements: Union[List, Dict] = None,
    matching_mode: Str = None,
    normalization_mode: Str = None,
    first_match_only: Bool = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Find and Replace Strings"
    options = options or {}
    verbose = options.get("verbose", True)
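    # Direct arguments take precedence; the options dict supplies fallbacks.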
    columns = _coalesce(columns, options.get("columns", []))
    regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern", ""))
    output_columns = _coalesce(output_columns, options.get("output_columns", {}))
    replacements = _coalesce(replacements, options.get("replacements", []))
    matching_mode = _coalesce(matching_mode, options.get("matching_mode", "substring"))
    normalization_mode = _coalesce(
        normalization_mode, options.get("normalization_mode", "exact")
    )
    first_match_only = _coalesce(
        first_match_only, options.get("first_match_only", False)
    )
    process_all_columns = len(columns) == 0 and (not regex_pattern)
    output_format = options.get("output_format", "pandas")
    safe_mode = options.get("safe_mode", False)
    result = None
    conn = None
    if not isinstance(columns, list) or not all((isinstance(c, str) for c in columns)):
        verbose and logger.error(
            f"[{brick_display_name}] Invalid columns format! Expected a list."
        )
        raise ValueError("Columns must be provided as a list!")
    if not replacements:
        verbose and logger.warning(
            f"[{brick_display_name}] No replacements specified. Returning data unchanged."
        )
        result = data
    valid_matching_modes = ["complete_value", "substring", "regular_expression"]
    if matching_mode not in valid_matching_modes:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid matching mode: {matching_mode}."
        )
        raise ValueError(f"Matching mode must be one of {valid_matching_modes}")
    valid_normalization_modes = ["exact", "lowercase"]
    if normalization_mode not in valid_normalization_modes:
        verbose and logger.error(
            f"[{brick_display_name}] Invalid normalization mode: {normalization_mode}."
        )
        raise ValueError(
            f"Normalization mode must be one of {valid_normalization_modes}"
        )
    if result is None:
        if _is_list_of_key_value_dicts(replacements):
            replacements_dict = {item["key"]: item["value"] for item in replacements}
        elif _is_textual_dict(replacements):
            replacements_dict = replacements
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Invalid replacements format!"
            )
            raise ValueError("Invalid replacements format!")
        if not replacements_dict:
            verbose and logger.warning(
                f"[{brick_display_name}] No valid replacements found. Returning data unchanged."
            )
            result = data
    if result is None:
        if _is_list_of_key_value_dicts(output_columns):
            output_columns_dict = {
                item["key"]: item["value"] for item in output_columns
            }
        elif _is_textual_dict(output_columns):
            output_columns_dict = output_columns
        else:
            output_columns_dict = {}
        verbose and logger.info(
            f"[{brick_display_name}] Output column mappings: {(output_columns_dict if output_columns_dict else 'None (in-place replacement)')}"
        )
    if result is None:
        try:
            verbose and logger.info(
                f"[{brick_display_name}] Starting find/replace with {len(replacements_dict)} replacement(s) in '{matching_mode}' mode."
            )
            data_type = None
            if isinstance(data, pd.DataFrame):
                data_type = "pandas"
            elif isinstance(data, pl.DataFrame):
                data_type = "polars"
            elif isinstance(data, (pa.Table, pa.lib.Table)):
                data_type = "arrow"
            if data_type is None:
                verbose and logger.error(
                    f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
                raise ValueError(
                    "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
            verbose and logger.info(
                f"[{brick_display_name}] Detected input format: {data_type}."
            )
            conn = duckdb.connect(":memory:")
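            # Expose the input to DuckDB as a queryable view named input_table.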
            conn.register("input_table", data)
            column_info = conn.execute("DESCRIBE input_table").fetchall()
            all_columns = {col[0]: col[1] for col in column_info}
            verbose and logger.info(
                f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
            )
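            # Only string-typed columns are eligible for replacement.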
            textual_types = ["VARCHAR", "TEXT", "STRING", "CHAR", "NVARCHAR"]
            textual_columns = {
                col: dtype
                for (col, dtype) in all_columns.items()
                if any((text_type in dtype.upper() for text_type in textual_types))
            }
            verbose and logger.info(
                f"[{brick_display_name}] Textual columns detected: {len(textual_columns)} out of {len(all_columns)} ({list(textual_columns.keys())})."
            )
            columns_to_process = []
            if regex_pattern:
                try:
                    pattern = re.compile(regex_pattern)
                    columns_to_process = [
                        col for col in textual_columns.keys() if pattern.search(col)
                    ]
                    if not columns_to_process:
                        verbose and logger.warning(
                            f"[{brick_display_name}] No textual columns matched regex pattern '{regex_pattern}'. Returning data unchanged."
                        )
                        result = data
                    else:
                        verbose and logger.info(
                            f"[{brick_display_name}] Regex pattern '{regex_pattern}' matched {len(columns_to_process)} textual columns: {columns_to_process}."
                        )
                except re.error as e:
                    verbose and logger.error(
                        f"[{brick_display_name}] Invalid regex pattern."
                    )
                    raise ValueError(f"Invalid regex pattern: {e}")
            elif process_all_columns:
                columns_to_process = list(textual_columns.keys())
                verbose and logger.info(
                    f"[{brick_display_name}] Processing all {len(columns_to_process)} textual columns."
                )
            else:
                if not safe_mode:
                    missing_columns = [col for col in columns if col not in all_columns]
                    if missing_columns:
                        verbose and logger.error(
                            f"[{brick_display_name}] Columns not found in data: {missing_columns}"
                        )
                        raise ValueError(
                            f"Columns not found in data: {missing_columns}"
                        )
                non_textual_requested = [
                    col
                    for col in columns
                    if col in all_columns and col not in textual_columns
                ]
                if non_textual_requested:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Skipping non-textual columns: {non_textual_requested}"
                    )
                columns_to_process = [col for col in columns if col in textual_columns]
                skipped = len(columns) - len(columns_to_process)
                if safe_mode and skipped > 0:
                    skipped_cols = [col for col in columns if col not in all_columns]
                    verbose and logger.warning(
                        f"[{brick_display_name}] Safe mode: Skipped {skipped} non-existent columns: {skipped_cols}"
                    )
                verbose and logger.info(
                    f"[{brick_display_name}] Processing {len(columns_to_process)} textual column(s): {columns_to_process}."
                )
            if result is None:
                if not columns_to_process:
                    verbose and logger.warning(
                        f"[{brick_display_name}] No columns to process. Returning data unchanged."
                    )
                    result = data
                else:
                    new_output_columns = set(output_columns_dict.values())
                    conflicting_columns = new_output_columns.intersection(
                        set(all_columns.keys())
                    )
                    if conflicting_columns:
                        verbose and logger.warning(
                            f"[{brick_display_name}] Output columns {conflicting_columns} already exist and will be overwritten."
                        )
                    select_parts = []
                    for col in all_columns.keys():
                        sanitized_col = _sanitize_identifier(col)
                        if col in columns_to_process:
                            if col in output_columns_dict:
                                # Keep the original column; the processed copy
                                # is appended as a new output column below.
                                output_col = output_columns_dict[col]
                                select_parts.append(f'"{sanitized_col}"')
                                verbose and logger.info(
                                    f"[{brick_display_name}] Column '{col}' will be processed into new column '{output_col}'."
                                )
                            else:
                                base_expr = f'CAST("{sanitized_col}" AS VARCHAR)'
                                replacement_expr = _build_replacement_expr(
                                    base_expr,
                                    replacements_dict,
                                    matching_mode,
                                    normalization_mode,
                                    first_match_only,
                                )
                                select_parts.append(
                                    f'{replacement_expr} AS "{sanitized_col}"'
                                )
                                verbose and logger.info(
                                    f"[{brick_display_name}] Column '{col}' will be replaced in-place."
                                )
                        else:
                            select_parts.append(f'"{sanitized_col}"')
                    for col in columns_to_process:
                        if col in output_columns_dict:
                            output_col = output_columns_dict[col]
                            sanitized_col = _sanitize_identifier(col)
                            sanitized_output = _sanitize_identifier(output_col)
                            base_expr = f'CAST("{sanitized_col}" AS VARCHAR)'
                            replacement_expr = _build_replacement_expr(
                                base_expr,
                                replacements_dict,
                                matching_mode,
                                normalization_mode,
                                first_match_only,
                            )
                            select_parts.append(
                                f'{replacement_expr} AS "{sanitized_output}"'
                            )
                    select_clause = ", ".join(select_parts)
                    query = f"SELECT {select_clause} FROM input_table"
                    verbose and logger.info(
                        f"[{brick_display_name}] Executing query to perform find/replace."
                    )
                    if output_format == "pandas":
                        result = conn.execute(query).df()
                        verbose and logger.info(
                            f"[{brick_display_name}] Converted result to pandas DataFrame."
                        )
                    elif output_format == "polars":
                        result = conn.execute(query).pl()
                        verbose and logger.info(
                            f"[{brick_display_name}] Converted result to Polars DataFrame."
                        )
                    elif output_format == "arrow":
                        result = conn.execute(query).fetch_arrow_table()
                        verbose and logger.info(
                            f"[{brick_display_name}] Converted result to Arrow Table."
                        )
                    else:
                        verbose and logger.error(
                            f"[{brick_display_name}] Unsupported output format: {output_format}"
                        )
                        raise ValueError(f"Unsupported output format: {output_format}")
                    verbose and logger.info(
                        f"[{brick_display_name}] Find/replace completed successfully. Processed {len(columns_to_process)} column(s)."
                    )
        except Exception as e:
            verbose and logger.error(
                f"[{brick_display_name}] Error during find/replace: {str(e)}"
            )
            raise
        finally:
            if conn is not None:
                conn.close()
    return result
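
Example

A minimal usage sketch (column names and values are illustrative), assuming the brick function is available as defined above:

import pandas as pd

df = pd.DataFrame({"city": ["new york", "paris", "New York"]})
result = find_replace_strings(
    data=df,
    columns=["city"],
    replacements=[{"key": "new york", "value": "NYC"}],
    matching_mode="complete_value",
    normalization_mode="lowercase",
)
# result["city"] -> ["NYC", "paris", "NYC"]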

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow