Pivot Table

Reshape a dataset from long to wide format by pivoting a column and aggregating values.

Pivot Table

Processing

This function reshapes an input dataset (DataFrame or Arrow Table) from long format to wide format. It pivots the data on a specified Pivot Column, uses Group Keys to define unique output rows, and applies an Aggregation Function (such as sum, count, or average) to the Columns to Aggregate.

When Group Keys are defined, it can optionally preserve the input row order.
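
As an illustration, here is a minimal sketch using the pivot_table function defined below (the sample data and column names are purely illustrative):

import pandas as pd

# Long format: one row per (region, quarter) observation
sales = pd.DataFrame({
    "region": ["North", "North", "South", "South"],
    "quarter": ["Q1", "Q2", "Q1", "Q2"],
    "revenue": [100, 150, 80, 120],
})

# Wide format: one row per region, one column per quarter value
wide = pivot_table(
    sales,
    pivot_on="quarter",
    group_keys=["region"],
    values_to_aggregate=["revenue"],
    aggregation="sum",
)
# Resulting columns: region, Q1, Q2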

Inputs

data
The input dataset structure (Pandas DataFrame, Polars DataFrame, or Arrow Table) that needs to be pivoted.
pivot_on (optional)
The name of the column whose unique values become the new column headers in the output. Required either here or via the options panel (see the sketch after this list).
group_keys (optional)
A list of column names used to define the unique rows (row identifiers) after aggregation. These columns form the GROUP BY clause in the underlying SQL.
values_to_aggregate (optional)
A list of column names containing the values that will be aggregated into the new pivoted cells. This is required either here or via the options panel.
aggregation (optional)
The mathematical or statistical function to apply when aggregating multiple values into a single cell (e.g., 'sum', 'count', 'max').
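
Each of these parameters can also be supplied through the brick's options panel; a direct input takes precedence over the matching options entry. A sketch, reusing the sales frame from the example above:

# Equivalent to passing the parameters directly
wide = pivot_table(
    sales,
    options={
        "pivot_on": "quarter",
        "group_keys": ["region"],
        "values_to_aggregate": ["revenue"],
        "aggregation": "sum",
    },
)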

Input Types

Input                 Types
data                  DataFrame, ArrowTable
pivot_on              Str
group_keys            List
values_to_aggregate   List
aggregation           Str

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting dataset, which is reshaped into the wide format, structured according to the specified output format (Pandas DataFrame, Polars DataFrame, or Arrow Table).

Output Types

Output   Types
result   DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Pivot Table brick exposes the following configurable options (a sample options dict follows the list):

Pivot Column
The name of the column whose unique values will form the new column headers.
Group Keys (Row Identifiers)
A list of columns used to uniquely identify rows before pivoting (the columns that remain un-pivoted).
Columns to Aggregate
A list of columns whose values will be aggregated and used to populate the new pivoted cells.
Aggregation Function
Specifies the function used to aggregate values. Choices include: count, distinct, max, min, avg, median, sum, std, first, last, and concat.
Add Aggr. Name to Columns
If enabled, the aggregation function name (e.g., 'sum') is prepended to the new pivoted column names (e.g., sum_revenue).
Preserve Input Sort Order
If enabled, the resulting table preserves the input row order, based on the first occurrence of each unique combination of Group Keys values. This only applies when Group Keys are defined.
Output Format
Specifies the format of the returned dataset. Options are pandas (DataFrame), polars (DataFrame), or arrow (Arrow Table).
Verbose
Enables detailed logging output during the execution of the brick, useful for debugging.
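
The keys below are exactly the ones the implementation reads from the options dict; the pivot settings shown are illustrative, while the remaining values are the implementation defaults:

options = {
    "pivot_on": "quarter",               # Pivot Column
    "group_keys": ["region"],            # Group Keys (Row Identifiers)
    "values_to_aggregate": ["revenue"],  # Columns to Aggregate
    "aggregation": "sum",                # count, distinct, max, min, avg, median, sum, std, first, last, or concat
    "add_aggregation_name": False,       # Add Aggr. Name to Columns
    "preserve_row_order": False,         # Preserve Input Sort Order
    "output_format": "pandas",           # "pandas", "polars", or "arrow"
    "verbose": True,                     # Verbose
}

Brick Code
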
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, List, Dict, DataFrame, ArrowTable, Str, Bool

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    """Return the first non-None value."""
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    if not identifier:
        return ""
    return identifier.replace('"', '""')


def pivot_table(
    data: Union[DataFrame, ArrowTable],
    pivot_on: Str = None,
    group_keys: List = None,
    values_to_aggregate: List = None,
    aggregation: Str = None,
    options: Dict = None,
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Pivot Table"
    options = options or {}
    result = None
    conn = None
    verbose = options.get("verbose", True)
    add_aggregation_name = options.get("add_aggregation_name", False)
    preserve_row_order = options.get("preserve_row_order", False)
    output_format = options.get("output_format", "pandas")
    pivot_on = _coalesce(pivot_on, options.get("pivot_on", ""))
    group_keys = _coalesce(group_keys, options.get("group_keys", []))
    # group_keys must be a list of column-name strings
    if not isinstance(group_keys, list) or not all(
        isinstance(c, str) for c in group_keys
    ):
        verbose and logger.error(
            f"[{brick_display_name}] Invalid format for 'group keys'!"
        )
        raise ValueError("Invalid format for 'group keys'!")
    values_to_aggregate = _coalesce(
        values_to_aggregate, options.get("values_to_aggregate", [])
    )
    # values_to_aggregate must be a list of column-name strings
    if not isinstance(values_to_aggregate, list) or not all(
        isinstance(c, str) for c in values_to_aggregate
    ):
        verbose and logger.error(
            f"[{brick_display_name}] Invalid format for 'values to aggregate'!"
        )
        raise ValueError("Invalid format for 'values to aggregate'!")
    aggregation = _coalesce(aggregation, options.get("aggregation", "count"))
    try:
        # Detect the input backend (pa.Table also covers pa.lib.Table)
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, pa.Table):
            data_type = "arrow"
        if data_type is None:
            error_msg = "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table."
            verbose and logger.error(f"[{brick_display_name}] {error_msg}")
            raise ValueError(error_msg)
        if not pivot_on:
            error_msg = "Parameter 'pivot_on' (column to pivot) is required."
            verbose and logger.error(f"[{brick_display_name}] {error_msg}")
            raise ValueError(error_msg)
        if not values_to_aggregate:
            error_msg = "Parameter 'values_to_aggregate' cannot be empty. Please specify at least one column."
            verbose and logger.error(f"[{brick_display_name}] {error_msg}")
            raise ValueError(error_msg)
        verbose and logger.info(
            f"[{brick_display_name}] Preparing to pivot. Pivot: '{pivot_on}'. Agg: '{aggregation}'. Preserve Order: {preserve_row_order}."
        )
        conn = duckdb.connect(":memory:")
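        # Tag every input row with its original position so the pivoted
        # result can later be re-sorted to match the input order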
        if preserve_row_order and len(group_keys) > 0:
            conn.register("raw_input", data)
            conn.execute(
                "CREATE TEMP TABLE input_table AS SELECT *, row_number() OVER () as _src_row_id FROM raw_input"
            )
        else:
            conn.register("input_table", data)
        schema_query = "DESCRIBE input_table"
        schema_info = conn.execute(schema_query).fetchall()
        available_columns = [row[0] for row in schema_info]
        if pivot_on not in available_columns:
            raise ValueError(f"Column '{pivot_on}' not found in dataset.")
        for key in group_keys:
            if key not in available_columns:
                raise ValueError(f"Group Key column '{key}' not found in dataset.")
        for val in values_to_aggregate:
            if val not in available_columns:
                raise ValueError(f"Value column '{val}' not found in dataset.")
        # SQL template per supported aggregation; the placeholder is quoted so
        # sanitized column names with spaces or quotes remain valid identifiers
        agg_sql_map = {
            "count": 'count("{}")',
            "distinct": 'count(DISTINCT "{}")',
            "sum": 'sum("{}")',
            "avg": 'avg("{}")',
            "min": 'min("{}")',
            "max": 'max("{}")',
            "median": 'median("{}")',
            "std": 'stddev("{}")',
            "first": 'first("{}")',
            "last": 'last("{}")',
            "concat": "string_agg(CAST(\"{}\" AS VARCHAR), ', ')",
        }
        if aggregation not in agg_sql_map:
            raise ValueError(f"Unsupported aggregation function: {aggregation}")
        sql_agg_template = agg_sql_map[aggregation]
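        # Build one aggregation expression per value column for the PIVOT
        # USING clause, optionally aliased with the aggregation name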
        using_clauses = []
        for val_col in values_to_aggregate:
            sanitized_val = _sanitize_identifier(val_col)
            clause = sql_agg_template.format(sanitized_val)
            if add_aggregation_name:
                safe_agg = _sanitize_identifier(aggregation).replace('"', "")
                safe_col = _sanitize_identifier(val_col).replace('"', "")
                clause += f' AS "{safe_agg}_{safe_col}"'
            using_clauses.append(clause)
        using_str = ", ".join(using_clauses)
        sanitized_pivot_on = _sanitize_identifier(pivot_on)
        group_by_clause = ""
        if group_keys:
            sanitized_keys = [f'"{_sanitize_identifier(k)}"' for k in group_keys]
            group_by_clause = f"GROUP BY {', '.join(sanitized_keys)}"
        pivot_query = f"""
            PIVOT input_table
            ON "{sanitized_pivot_on}"
            USING {using_str}
            {group_by_clause}
        """
        if preserve_row_order and len(group_keys) > 0:
            sanitized_keys_str = ", ".join(
                [f'"{_sanitize_identifier(k)}"' for k in group_keys]
            )
            join_conditions = " AND ".join(
                [
                    f'p."{_sanitize_identifier(k)}" IS NOT DISTINCT FROM o."{_sanitize_identifier(k)}"'
                    for k in group_keys
                ]
            )
            final_query = f"\n                WITH pivoted_data AS (\n                    {pivot_query}\n                ),\n                ordering_logic AS (\n                    SELECT {sanitized_keys_str}, MIN(_src_row_id) as min_row_id\n                    FROM input_table\n                    GROUP BY {sanitized_keys_str}\n                )\n                SELECT p.*\n                FROM pivoted_data p\n                JOIN ordering_logic o ON {join_conditions}\n                ORDER BY o.min_row_id\n            "
            verbose and logger.info(
                f"[{brick_display_name}] Executing Pivot with Order Preservation."
            )
        else:
            final_query = pivot_query
            verbose and logger.info(f"[{brick_display_name}] Executing Standard Pivot.")
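        # Execute the final query and materialize it in the requested format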
        result_rel = conn.execute(final_query)
        if output_format == "pandas":
            result = result_rel.df()
            verbose and logger.info(
                f"[{brick_display_name}] Converted to pandas DataFrame."
            )
        elif output_format == "polars":
            result = result_rel.pl()
            verbose and logger.info(
                f"[{brick_display_name}] Converted to Polars DataFrame."
            )
        elif output_format == "arrow":
            result = result_rel.fetch_arrow_table()
            verbose and logger.info(f"[{brick_display_name}] Converted to Arrow Table.")
        else:
            raise ValueError(f"Unsupported output format: {output_format}")
        if output_format == "pandas" or output_format == "polars":
            (rows, cols) = result.shape
        else:
            (rows, cols) = (result.num_rows, result.num_columns)
        verbose and logger.info(
            f"[{brick_display_name}] Pivot successful. Result shape: {rows} rows x {cols} columns."
        )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during pivot operation: {str(e)}"
        )
        raise
    finally:
        if conn:
            conn.close()
    return result
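
When Preserve Input Sort Order is enabled, the pivoted rows follow each group's first appearance in the input rather than the engine's default output order. A sketch, where sales_unsorted is a hypothetical long-format table whose "South" rows come first:

wide = pivot_table(
    sales_unsorted,  # hypothetical input with "South" rows first
    pivot_on="quarter",
    group_keys=["region"],
    values_to_aggregate=["revenue"],
    aggregation="sum",
    options={"preserve_row_order": True, "output_format": "polars"},
)
# "South" appears before "North" in the result, matching the input order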

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow