Unfold Column

Transform a column's values into binary columns (one-hot encoding/dummification).

Processing

This brick transforms a single categorical column of the input data (pandas DataFrame, Polars DataFrame, or Arrow Table) into multiple binary (dummy, or one-hot encoded) columns. Each unique value of the original column becomes a new column that contains 1 in the rows where the value matches and NULL (not 0) in all other rows.
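
As a rough illustration of the transformation described above, here is a minimal, dependency-free sketch that works on plain Python rows instead of DataFrames. The helper name `unfold_rows` and the sample data are hypothetical and only serve the example; the brick itself performs the equivalent step with DuckDB.

```python
def unfold_rows(rows, column):
    """Append one 1/None column per distinct non-null value of `column`."""
    # Distinct non-null values, in sorted order.
    values = sorted({row[column] for row in rows if row[column] is not None})
    unfolded = []
    for row in rows:
        new_row = dict(row)  # keep the original columns
        for value in values:
            # 1 where the value matches, None (NULL) everywhere else, never 0.
            new_row[f"{column}_{value}"] = 1 if row[column] == value else None
        unfolded.append(new_row)
    return unfolded


rows = [
    {"id": 1, "color": "red"},
    {"id": 2, "color": "blue"},
    {"id": 3, "color": None},
]
for row in unfold_rows(rows, "color"):
    print(row)
```

In this sketch the row whose `color` is missing gets None in both `color_blue` and `color_red`, which mirrors the behavior when Include NULL Values is disabled.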

Inputs

data
The input data structure (pandas DataFrame, Polars DataFrame, or Arrow Table) containing the column to be unfolded.
column (optional)
The name of the column whose unique values will be converted into binary columns. If provided as an input, it overrides the value set in options.
prefix (optional)
A custom string to use as the prefix for the newly generated column names. If provided, it overrides the logic defined by the Use Column Name as Prefix option.
max columns (optional)
The maximum number of new binary columns to create. If the column has more unique values than this limit, only the first values in ascending sort order are kept.
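
The precedence between inputs and options, and the effect of the column limit, can be sketched as follows. The helper names `resolve_settings` and `limit_values` are hypothetical; the option keys (`column`, `prefix`, `max_columns`) and the default of 100 are taken from the brick's source.

```python
def resolve_settings(column=None, prefix=None, max_columns=None, options=None):
    """Input values win over options; options win over built-in defaults."""
    options = options or {}

    def first_set(*candidates):
        # First candidate that is not None.
        return next((c for c in candidates if c is not None), None)

    return {
        "column": first_set(column, options.get("column", "")),
        "prefix": first_set(prefix, options.get("prefix", "")),
        "max_columns": first_set(max_columns, options.get("max_columns", 100)),
    }


def limit_values(distinct_values, max_columns):
    """Keep only the first `max_columns` values in ascending sort order."""
    return sorted(distinct_values)[:max_columns]


settings = resolve_settings(
    column="city", options={"column": "country", "max_columns": 2}
)
kept = limit_values({"Paris", "Lyon", "Nice"}, settings["max_columns"])
print(settings["column"], kept)
```

Only None falls through to the option value, so under this logic an empty string passed as an input still counts as provided.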

Inputs Types

Input Types
data DataFrame, ArrowTable
column Str
prefix Str
max columns Int

You can check the list of supported types here: Available Type Hints.

Outputs

result
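The input data structure with the new dummy columns appended, formatted according to the Output Format option. If the column has no values to unfold, the input data is returned unchanged, in its original format.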

Outputs Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Unfold Column brick exposes the following options:

Column to Unfold
Specifies the name of the column whose unique values will be converted into binary columns. This setting is overridden if the column input pin is connected.
Use Column Name as Prefix
If enabled, the original column name will be used as a prefix for the new dummy columns, unless a custom prefix is explicitly provided. (Default: True)
Custom Prefix
A specific string to use as the prefix for all generated column names; an underscore is appended if the prefix does not already end with one. This setting overrides the Use Column Name as Prefix option.
Max nb. columns to create
Sets the maximum number of binary columns to generate. If the number of unique values exceeds this limit, only the first N values in ascending sort order are used. (Default: 100)
Drop Original Column
If enabled, the original column used for unfolding will be removed from the output data. (Default: False)
Include NULL Values
If enabled, a separate binary column, named with the prefix followed by NULL, is created to mark rows where the original column value is NULL. (Default: False)
Sort New Columns
If enabled, the newly created binary columns will be ordered alphabetically within the resulting dataset. (Default: True)
Output Format
Specifies the data format of the output: pandas (pandas DataFrame), polars (Polars DataFrame), or arrow (Arrow Table). (Default: pandas)
Verbose
If enabled, detailed logs about the unfolding process, execution, and potential warnings will be displayed. (Default: True)
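
To make the naming rules behind the prefix options concrete, the sketch below rebuilds the name of a single dummy column. The function `dummy_column_name` is a hypothetical helper written for this example; its prefix and sanitization logic follows the brick's source, but it is an illustration rather than the brick's API.

```python
import re


def dummy_column_name(value, column, prefix="", use_column_name_as_prefix=True):
    """Build the name of the dummy column generated for one value."""
    if prefix:
        # A custom prefix wins; an underscore separator is added if missing.
        final_prefix = prefix if prefix.endswith("_") else f"{prefix}_"
    elif use_column_name_as_prefix:
        final_prefix = f"{column}_"
    else:
        final_prefix = ""

    if value is None:
        suffix = "NULL"  # column created by Include NULL Values
    else:
        # Replace punctuation, then collapse whitespace, hyphen and underscore runs.
        suffix = re.sub(r"[^\w\s-]", "_", str(value))
        suffix = re.sub(r"[\s_-]+", "_", suffix).strip("_") or "unknown"
    return f"{final_prefix}{suffix}"


print(dummy_column_name("New York", "city"))
print(dummy_column_name("N/A", "city", prefix="c"))
print(dummy_column_name(None, "city", use_column_name_as_prefix=False))
```

One consequence worth keeping in mind: distinct values such as "a b" and "a-b" sanitize to the same name, so columns with near-duplicate values may need cleaning before unfolding.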
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Int

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    """Return the first non-None value."""
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def _sanitize_column_name(name):
    """
    Sanitize column names to be SQL-safe and readable.
    Replace problematic characters with underscores.
    """
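    # Replace anything that is not a word character, whitespace, or hyphen.
    sanitized = re.sub(r"[^\w\s-]", "_", str(name))
    # Collapse runs of whitespace, underscores, and hyphens into one underscore.
    sanitized = re.sub(r"[\s_-]+", "_", sanitized)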
    sanitized = sanitized.strip("_")
    if not sanitized:
        sanitized = "unknown"
    return sanitized


def unfold_column(
    data: Union[DataFrame, ArrowTable],
    column: Str = None,
    prefix: Str = None,
    max_columns: Int = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    """
    Transform categorical column values into binary dummy columns (one-hot encoding).

    Args:
        data: Input data (pandas DataFrame, Polars DataFrame, or Arrow Table)
        column: Column name to unfold
        prefix: Custom prefix for new column names (overrides use_column_name_as_prefix)
        max_columns: Maximum number of columns to create
        options: Additional options dictionary

    Returns:
        Data with unfolded columns in the specified output format
    """
    brick_display_name = "Unfold Column"
    options = options or {}
    verbose = options.get("verbose", True)
    column = _coalesce(column, options.get("column", ""))
    prefix = _coalesce(prefix, options.get("prefix", ""))
    use_column_name_as_prefix = options.get("use_column_name_as_prefix", True)
    max_columns = _coalesce(max_columns, options.get("max_columns", 100))
    drop_original = options.get("drop_original", False)
    handle_nulls = options.get("handle_nulls", False)
    sort_columns = options.get("sort_columns", True)
    output_format = options.get("output_format", "pandas")
    result = None
    conn = None
    if not column or not isinstance(column, str):
        verbose and logger.error(
            f"[{brick_display_name}] Column name must be a non-empty string!"
        )
        raise ValueError("Column name must be a non-empty string!")
    if not isinstance(max_columns, int) or max_columns < 1:
        verbose and logger.error(
            f"[{brick_display_name}] Max columns must be a positive integer!"
        )
        raise ValueError("Max columns must be a positive integer!")
    try:
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )
        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        if column not in all_columns:
            verbose and logger.error(
                f"[{brick_display_name}] Column '{column}' not found in data!"
            )
            raise ValueError(f"Column '{column}' not found in data!")
        row_count = conn.execute("SELECT COUNT(*) FROM input_table").fetchone()[0]
        verbose and logger.info(
            f"[{brick_display_name}] Unfolding column '{column}' from {row_count} rows."
        )
        sanitized_col = _sanitize_identifier(column)
        # Fetch the distinct values in sorted order; NULL is kept only when requested.
        null_filter = "" if handle_nulls else f'WHERE "{sanitized_col}" IS NOT NULL'
        distinct_query = f"""
            SELECT DISTINCT "{sanitized_col}"
            FROM input_table
            {null_filter}
            ORDER BY "{sanitized_col}" NULLS FIRST
        """
        unique_values = conn.execute(distinct_query).fetchall()
        unique_values = [row[0] for row in unique_values]
        total_unique = len(unique_values)
        verbose and logger.info(
            f"[{brick_display_name}] Found {total_unique} unique value(s) in column '{column}'."
        )
        if total_unique > max_columns:
            verbose and logger.warning(
                f"[{brick_display_name}] Number of unique values ({total_unique}) exceeds max_columns ({max_columns}). Only the first {max_columns} values will be used."
            )
            unique_values = unique_values[:max_columns]
        if total_unique == 0:
            verbose and logger.warning(
                f"[{brick_display_name}] No non-null values found in column '{column}'. Returning data unchanged."
            )
            result = data
        else:
            final_prefix = ""
            if prefix:
                final_prefix = prefix if prefix.endswith("_") else f"{prefix}_"
                verbose and logger.info(
                    f"[{brick_display_name}] Using custom prefix: '{final_prefix}'."
                )
            elif use_column_name_as_prefix:
                final_prefix = f"{column}_"
                verbose and logger.info(
                    f"[{brick_display_name}] Using column name as prefix: '{final_prefix}'."
                )
            else:
                final_prefix = ""
                verbose and logger.info(
                    f"[{brick_display_name}] No prefix will be used for new columns."
                )
            verbose and logger.info(
                f"[{brick_display_name}] Creating {len(unique_values)} dummy column(s)."
            )
            case_statements = []
            new_column_names = []
            for value in unique_values:
                if value is None:
                    sanitized_value = "NULL"
                    comparison = f'"{sanitized_col}" IS NULL'
                else:
                    sanitized_value = _sanitize_column_name(value)
                    escaped_value = str(value).replace("'", "''")
                    comparison = f""""{sanitized_col}" = '{escaped_value}'"""
                new_col_name = f"{final_prefix}{sanitized_value}"
                new_column_names.append(new_col_name)
                case_statement = f'CASE WHEN {comparison} THEN 1 ELSE NULL END AS "{_sanitize_identifier(new_col_name)}"'
                case_statements.append(case_statement)
            if sort_columns:
                sorted_indices = sorted(
                    range(len(new_column_names)), key=lambda i: new_column_names[i]
                )
                case_statements = [case_statements[i] for i in sorted_indices]
                new_column_names = [new_column_names[i] for i in sorted_indices]
            if drop_original:
                other_columns = [
                    f'"{_sanitize_identifier(col)}"'
                    for col in all_columns.keys()
                    if col != column
                ]
                select_clause = ", ".join(other_columns + case_statements)
            else:
                select_clause = "*, " + ", ".join(case_statements)
            query = f"SELECT {select_clause} FROM input_table"
            verbose and logger.info(
                f"[{brick_display_name}] Executing unfold transformation."
            )
            if output_format == "pandas":
                result = conn.execute(query).df()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to pandas DataFrame."
                )
            elif output_format == "polars":
                result = conn.execute(query).pl()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Polars DataFrame."
                )
            elif output_format == "arrow":
                result = conn.execute(query).fetch_arrow_table()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Arrow Table."
                )
            else:
                verbose and logger.error(
                    f"[{brick_display_name}] Unsupported output format: {output_format}"
                )
                raise ValueError(f"Unsupported output format: {output_format}")
            verbose and logger.info(
                f"[{brick_display_name}] Unfold operation completed successfully. Created {len(new_column_names)} new column(s): {', '.join(new_column_names[:5])}{('...' if len(new_column_names) > 5 else '')}."
            )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during unfold operation: {str(e)}"
        )
        raise
    finally:
        if conn is not None:
            conn.close()
    return result
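
The query that the function above assembles can be sketched in isolation. This is a simplified illustration under stated assumptions: `build_unfold_query` is a hypothetical helper, the standard-library `sqlite3` module stands in for DuckDB so the example runs without extra dependencies, and value sanitization for column names is left out.

```python
import sqlite3  # stand-in for DuckDB so the sketch runs with the standard library only


def build_unfold_query(column, values, prefix, table="input_table"):
    """Build a SELECT that appends one CASE-based dummy column per value."""
    quoted_col = '"' + column.replace('"', '""') + '"'
    cases = []
    for value in values:
        literal = "'" + str(value).replace("'", "''") + "'"  # escape single quotes
        alias = '"' + f"{prefix}{value}".replace('"', '""') + '"'
        cases.append(
            f"CASE WHEN {quoted_col} = {literal} THEN 1 ELSE NULL END AS {alias}"
        )
    return f"SELECT *, {', '.join(cases)} FROM {table}"


conn = sqlite3.connect(":memory:")
conn.execute('CREATE TABLE input_table ("id" INTEGER, "color" TEXT)')
conn.executemany(
    "INSERT INTO input_table VALUES (?, ?)",
    [(1, "red"), (2, "blue"), (3, None)],
)
query = build_unfold_query("color", ["blue", "red"], prefix="color_")
rows = conn.execute(query).fetchall()
conn.close()
print(query)
```

Because a comparison with NULL never evaluates to true, the row with a missing `color` falls into the ELSE branch of every CASE expression and receives NULL in each dummy column.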

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow