Fold Multiple Columns
Transform a wide dataset to a narrow format by folding/melting columns (the opposite of a pivot).
Processing
Transforms a wide dataset (Pandas DataFrame, Polars DataFrame, or Arrow Table) into a narrow (long) format by melting or unpivoting specified columns; this is the opposite of a pivot operation. You must specify which columns to fold, either as an explicit list (columns to fold) or via a regular expression pattern (regex pattern).
The output dataset includes the original ID columns (those not folded) and two new columns: one containing the names of the original folded columns and another containing the corresponding values.
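The wide-to-narrow transformation this brick performs is the same shape change as a pandas melt. A minimal sketch, using a hypothetical two-quarter table, shows how folded column names and values end up in the two new columns:

```python
import pandas as pd

# A small wide table: one row per id, one column per quarter.
wide = pd.DataFrame({"id": [1, 2], "q1": [10, 30], "q2": [20, 40]})

# Fold q1/q2 into two new columns: 'variable' (the original column
# name) and 'value' (the corresponding cell value). 'id' is kept.
narrow = wide.melt(id_vars=["id"], value_vars=["q1", "q2"],
                   var_name="variable", value_name="value")
print(narrow)
```

Each of the 2 input rows yields one output row per folded column, so the result has 4 rows and the columns `id`, `variable`, `value`.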
Inputs
- data
- Input data structure containing the wide dataset (Pandas DataFrame, Polars DataFrame, or Arrow Table).
- columns to fold (optional)
- An explicit list of column names to be unpivoted/folded. This input is ignored if a regex pattern is provided via options.
- regex pattern (optional)
- A regular expression pattern used to dynamically select which columns should be folded. This takes precedence over columns to fold.
- folded column names (optional)
- The desired name for the new column containing the names of the original folded columns. Defaults to 'variable'.
- folded values (optional)
- The desired name for the new column containing the values from the original folded columns. Defaults to 'value'.
Input Types

| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| columns to fold | List |
| regex pattern | Str |
| folded column names | Str |
| folded values | Str |
You can check the list of supported types here: Available Type Hints.
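When a regex pattern is supplied, columns are selected by matching the pattern against each column name. A minimal sketch, assuming hypothetical column names with a `sales_` prefix:

```python
import re

columns = ["id", "region", "sales_2021", "sales_2022"]

# Hypothetical pattern: fold every column starting with 'sales_'.
pattern = re.compile(r"^sales_")
matched = [c for c in columns if pattern.search(c)]
print(matched)  # -> ['sales_2021', 'sales_2022']
```

Unmatched columns (`id`, `region` here) remain as ID columns in the output.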
Outputs
- result
- The resulting dataset in narrow format, containing the original ID columns plus the new columns for variable names and values. The output format is determined by the Output Format option.
Output Types

| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Fold Multiple Columns brick provides the following configurable options:
- Columns to Fold
- Allows specifying an explicit list of column names to be used in the folding operation.
- Regex Pattern for Column Selection
- A regular expression used to dynamically identify and select columns for folding. If provided, it overrides the explicit column list.
- Column for Folded Column Names
- Defines the name for the new column that will store the names of the original columns being folded (default: variable).
- Column for Folded Values
- Defines the name for the new column that will store the values corresponding to the folded columns (default: value).
- Drop NULL Values
- If enabled, rows where the folded value is NULL are excluded from the output dataset.
- Output Format
- Selects the desired data structure for the output (choices: pandas, polars, arrow).
- Safe Mode
- If enabled, the function returns the input data unchanged when no columns match the selection criteria (regex or list), instead of raising an error.
- Verbose
- Enables detailed logging and information messages during execution.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _coalesce(*values):
"""Return the first non-None value."""
return next((v for v in values if v is not None), None)
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
Handles double quotes and other problematic characters.
"""
return identifier.replace('"', '""')
def _validate_column_name(name, brick_display_name, verbose):
"""Validate that a column name is valid and not empty."""
if not name or not isinstance(name, str):
verbose and logger.error(
f"[{brick_display_name}] Column name must be a non-empty string!"
)
raise ValueError("Column name must be a non-empty string!")
if name.strip() != name:
verbose and logger.warning(
f"[{brick_display_name}] Column name '{name}' has leading/trailing whitespace."
)
return name.strip()
def fold_multiple_columns(
data: Union[DataFrame, ArrowTable],
columns_to_fold: List = None,
regex_pattern: Str = None,
folded_column_names: Str = None,
folded_values: Str = None,
options=None,
) -> Union[DataFrame, ArrowTable]:
"""
Fold multiple columns to transform wide dataset into narrow format (unpivot/melt).
Args:
data: Input data (pandas DataFrame, Polars DataFrame, or Arrow Table)
columns_to_fold: List of column names to fold
regex_pattern: Regex pattern to select columns to fold
folded_column_names: Name for the column containing folded column names
folded_values: Name for the column containing folded values
options: Additional options dictionary
Returns:
Data in narrow format with folded columns
"""
brick_display_name = "Fold Multiple Columns"
options = options or {}
verbose = options.get("verbose", True)
columns_to_fold = _coalesce(columns_to_fold, options.get("columns_to_fold", []))
regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern", ""))
folded_column_names = _coalesce(
folded_column_names, options.get("folded_column_names", "variable")
)
folded_values = _coalesce(folded_values, options.get("folded_values", "value"))
drop_null_values = options.get("drop_null_values", False)
output_format = options.get("output_format", "pandas")
safe_mode = options.get("safe_mode", False)
result = None
conn = None
if not isinstance(columns_to_fold, list):
verbose and logger.error(
f"[{brick_display_name}] columns_to_fold must be a list!"
)
raise ValueError("columns_to_fold must be a list!")
folded_column_names = _validate_column_name(
folded_column_names, brick_display_name, verbose
)
folded_values = _validate_column_name(folded_values, brick_display_name, verbose)
if folded_column_names == folded_values:
verbose and logger.error(
f"[{brick_display_name}] Folded column names and values must be different!"
)
raise ValueError(
"Folded column names and values columns must have different names!"
)
try:
data_type = None
if isinstance(data, pd.DataFrame):
data_type = "pandas"
elif isinstance(data, pl.DataFrame):
data_type = "polars"
elif isinstance(data, (pa.Table, pa.lib.Table)):
data_type = "arrow"
if data_type is None:
verbose and logger.error(
f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
raise ValueError(
"Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
verbose and logger.info(
f"[{brick_display_name}] Detected input format: {data_type}."
)
conn = duckdb.connect(":memory:")
conn.register("input_table", data)
column_info = conn.execute("DESCRIBE input_table").fetchall()
all_columns = {col[0]: col[1] for col in column_info}
row_count = conn.execute("SELECT COUNT(*) FROM input_table").fetchone()[0]
verbose and logger.info(
f"[{brick_display_name}] Input has {len(all_columns)} columns and {row_count} rows."
)
cols_to_fold = []
if regex_pattern:
try:
pattern = re.compile(regex_pattern)
cols_to_fold = [
col for col in all_columns.keys() if pattern.search(col)
]
if not cols_to_fold:
verbose and logger.warning(
f"[{brick_display_name}] No columns matched regex pattern '{regex_pattern}'."
)
if not safe_mode:
raise ValueError(
f"No columns matched regex pattern '{regex_pattern}'"
)
else:
verbose and logger.warning(
f"[{brick_display_name}] Safe mode: Returning data unchanged."
)
result = data
else:
verbose and logger.info(
f"[{brick_display_name}] Regex pattern '{regex_pattern}' matched {len(cols_to_fold)} columns: {cols_to_fold}."
)
except re.error as e:
verbose and logger.error(
f"[{brick_display_name}] Invalid regex pattern: {e}"
)
raise ValueError(f"Invalid regex pattern: {e}")
elif columns_to_fold:
if not safe_mode:
missing_columns = [
col for col in columns_to_fold if col not in all_columns
]
if missing_columns:
verbose and logger.error(
f"[{brick_display_name}] Columns not found in data: {missing_columns}"
)
raise ValueError(f"Columns not found in data: {missing_columns}")
cols_to_fold = columns_to_fold
else:
cols_to_fold = [col for col in columns_to_fold if col in all_columns]
skipped = len(columns_to_fold) - len(cols_to_fold)
if skipped > 0:
skipped_cols = [
col for col in columns_to_fold if col not in all_columns
]
verbose and logger.warning(
f"[{brick_display_name}] Safe mode: Skipped {skipped} non-existent columns: {skipped_cols}"
)
if cols_to_fold:
verbose and logger.info(
f"[{brick_display_name}] Folding {len(cols_to_fold)} columns: {cols_to_fold}."
)
else:
verbose and logger.error(
f"[{brick_display_name}] No columns specified for folding! Provide either columns_to_fold or regex_pattern."
)
raise ValueError(
"No columns specified for folding! Provide either columns_to_fold or regex_pattern."
)
if result is None:
if not cols_to_fold:
verbose and logger.warning(
f"[{brick_display_name}] No valid columns to fold. Returning data unchanged."
)
result = data
else:
if folded_column_names in all_columns:
verbose and logger.error(
f"[{brick_display_name}] Column '{folded_column_names}' already exists in data!"
)
raise ValueError(
f"Column name '{folded_column_names}' already exists in data!"
)
if folded_values in all_columns:
verbose and logger.error(
f"[{brick_display_name}] Column '{folded_values}' already exists in data!"
)
raise ValueError(
f"Column name '{folded_values}' already exists in data!"
)
id_columns = [
col for col in all_columns.keys() if col not in cols_to_fold
]
verbose and logger.info(
f"[{brick_display_name}] ID columns (kept as-is): {(id_columns if id_columns else 'None')}."
)
sanitized_folded_values = _sanitize_identifier(folded_values)
sanitized_folded_names = _sanitize_identifier(folded_column_names)
unpivot_columns = ", ".join(
[f'"{_sanitize_identifier(col)}"' for col in cols_to_fold]
)
query = f"""
    UNPIVOT input_table
    ON {unpivot_columns}
    INTO
        NAME "{sanitized_folded_names}"
        VALUE "{sanitized_folded_values}"
"""
if drop_null_values:
query = f"""
    SELECT * FROM ({query}) AS unpivoted
    WHERE "{sanitized_folded_values}" IS NOT NULL
"""
verbose and logger.info(
f"[{brick_display_name}] Executing fold transformation."
)
if output_format == "pandas":
result = conn.execute(query).df()
verbose and logger.info(
f"[{brick_display_name}] Converted result to pandas DataFrame."
)
elif output_format == "polars":
result = conn.execute(query).pl()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Polars DataFrame."
)
elif output_format == "arrow":
result = conn.execute(query).fetch_arrow_table()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Arrow Table."
)
else:
verbose and logger.error(
f"[{brick_display_name}] Unsupported output format: {output_format}"
)
raise ValueError(f"Unsupported output format: {output_format}")
result_row_count = conn.execute(
f"SELECT COUNT(*) FROM ({query}) AS result"
).fetchone()[0]
verbose and logger.info(
f"[{brick_display_name}] Fold operation completed successfully. Transformed {row_count} rows × {len(cols_to_fold)} columns into {result_row_count} rows. Output columns: {id_columns + [folded_column_names, folded_values]}."
)
except Exception as e:
verbose and logger.error(
f"[{brick_display_name}] Error during fold operation: {str(e)}"
)
raise
finally:
if conn is not None:
conn.close()
return result
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow