Transpose Data

Transpose multiple rows into columns (widen dataset) using an Index, Label, and Value column.

Processing

This brick transposes data from a long format (multiple rows) into a wide format (columns). It requires an Index Column (defining the rows), a Labels Column (whose values become the new column headers), and a Values Column (supplying the cell contents). If the same Index/Label combination appears more than once, the brick keeps the last value encountered in the original row order, and the output rows follow the order in which each Index value first appears in the input data.
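
For illustration, here is a minimal, hedged sketch of that behaviour. The sample data and column names are made up, and transpose_data refers to the brick function listed further down this page:

import pandas as pd

# Hypothetical long-format input: ("Paris", "temp") appears twice, so the
# duplicate keeps the last value (21), per the rule described above.
long_df = pd.DataFrame(
    {
        "city": ["Paris", "Paris", "London", "Paris"],
        "metric": ["temp", "rain", "temp", "temp"],
        "value": [19, 5, 15, 21],
    }
)

wide_df = transpose_data(
    long_df,
    index_column="city",
    labels_column="metric",
    values_column="value",
    options={"output_format": "pandas", "verbose": False},
)
# Expected (column order may vary): one row per city, ordered by first
# appearance (Paris, then London); Paris ends up with temp=21 and rain=5,
# London with temp=15 and a missing value for rain.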

Inputs

data
The input dataset, provided as a Pandas DataFrame, Polars DataFrame, or PyArrow Table.
index column (optional)
The name of the column that defines the unique rows (index) of the resulting transposed data. A value is required, but it can be supplied either here or through the Index Column option.
labels column (optional)
The name of the column whose unique values become the new column headers in the transposed data. A value is required, but it can be supplied either here or through the Labels Column option.
values column (optional)
The name of the column containing the data values that populate the cells of the transposed data. A value is required, but it can be supplied either here or through the Values Column option.

Input Types

Input            Types
data             DataFrame, ArrowTable
index column     Str
labels column    Str
values column    Str

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting transposed dataset (wide format). The format depends on the selected 'Output Format' option (Pandas DataFrame, Polars DataFrame, or Arrow Table).

Output Types

Output    Types
result    DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.
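
As a rough, hedged sketch of how the Output Format option determines the returned type (the tiny input table is made up; transpose_data is the brick function listed below):

import pandas as pd
import polars as pl
import pyarrow as pa

# Hypothetical two-row input, only to show how the returned type follows the option.
tiny = pd.DataFrame({"k": ["a", "b"], "lbl": ["x", "x"], "v": [1, 2]})

assert isinstance(
    transpose_data(tiny, "k", "lbl", "v", options={"output_format": "polars", "verbose": False}),
    pl.DataFrame,
)
assert isinstance(
    transpose_data(tiny, "k", "lbl", "v", options={"output_format": "arrow", "verbose": False}),
    pa.Table,
)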

Options

The Transpose Data brick exposes the following configurable options:

Index Column (Rows)
The name of the column to use as the row identifier in the output. This column will always be the first column in the resulting table.
Labels Column (New Columns)
The name of the column containing the values that will be used to create the new column headers (the pivot field).
Values Column (Cell Content)
The name of the column containing the data used to populate the transposed cells.
Output Format
Specifies the desired format for the output data structure (pandas, polars, or arrow). Defaults to pandas.
Verbose
If enabled, detailed logs about the transposition process are emitted while the brick runs.

Brick Code

import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, Dict, DataFrame, ArrowTable, Str

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _coalesce(*values):
    """Return the first non-None value."""
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    if not identifier:
        return ""
    return identifier.replace('"', '""')


def transpose_data(
    data: Union[DataFrame, ArrowTable],
    index_column: Str = None,
    labels_column: Str = None,
    values_column: Str = None,
    options: Dict = None,
) -> Union[DataFrame, ArrowTable]:
    """
    Transpose data from long to wide format based on Index, Label, and Value columns.
    Uses the last encountered value for duplicates.
    Ensures the index column is always the first column in the output.

    Args:
        data: Input dataset (pandas, polars, or arrow).
        index_column: Column to group by (rows).
        labels_column: Column to pivot (columns).
        values_column: Column containing the values.
        options: Configuration options.

    Returns:
        Transposed dataset in the selected format.
    """
    brick_display_name = "Transpose Data"
    options = options or {}
    result = None
    conn = None
    verbose = options.get("verbose", True)
    output_format = options.get("output_format", "pandas")
    index_column = _coalesce(index_column, options.get("index_column", ""))
    labels_column = _coalesce(labels_column, options.get("labels_column", ""))
    values_column = _coalesce(values_column, options.get("values_column", ""))
    try:
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        if data_type is None:
            error_msg = "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table."
            verbose and logger.error(f"[{brick_display_name}] {error_msg}")
            raise ValueError(error_msg)
        if not index_column:
            raise ValueError("Parameter 'index_column' is required.")
        if not labels_column:
            raise ValueError("Parameter 'labels_column' is required.")
        if not values_column:
            raise ValueError("Parameter 'values_column' is required.")
        verbose and logger.info(
            f"[{brick_display_name}] Transposing data. Index: '{index_column}', Labels: '{labels_column}', Values: '{values_column}'."
        )
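        # Stage the input in an in-memory DuckDB connection so the pivot can be expressed in SQL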
        conn = duckdb.connect(":memory:")
        conn.register("raw_input", data)
        schema_info = conn.execute("DESCRIBE raw_input").fetchall()
        available_columns = [row[0] for row in schema_info]
        for col_name, param_name in [
            (index_column, "index_column"),
            (labels_column, "labels_column"),
            (values_column, "values_column"),
        ]:
            if col_name not in available_columns:
                verbose and logger.error(
                    f"[{brick_display_name}] Column '{col_name}' not found in data."
                )
                raise ValueError(f"Column '{col_name}' ({param_name}) not found.")
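        # Attach a source row number so "last value wins" duplicate handling and row ordering stay deterministic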
        conn.execute(
            "CREATE TEMP TABLE ordered_input AS SELECT *, row_number() OVER () as _src_row_id FROM raw_input"
        )
        sanitized_index = _sanitize_identifier(index_column)
        sanitized_labels = _sanitize_identifier(labels_column)
        sanitized_values = _sanitize_identifier(values_column)
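        # PIVOT widens the labels column; the "ordering" CTE restores each index value's first-appearance order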
        query = f"""
            WITH pivoted AS (
                PIVOT ordered_input
                ON "{sanitized_labels}"
                USING last("{sanitized_values}" ORDER BY _src_row_id)
                GROUP BY "{sanitized_index}"
            ),
            ordering AS (
                -- Find the first appearance of each index to preserve row order
                SELECT "{sanitized_index}", MIN(_src_row_id) as sort_key
                FROM ordered_input
                GROUP BY "{sanitized_index}"
            )
            SELECT
                p."{sanitized_index}",
                p.* EXCLUDE ("{sanitized_index}")
            FROM pivoted p
            JOIN ordering o ON p."{sanitized_index}" IS NOT DISTINCT FROM o."{sanitized_index}"
            ORDER BY o.sort_key
        """
        verbose and logger.info(
            f"[{brick_display_name}] Executing transposition query..."
        )
        result_rel = conn.execute(query)
        if output_format == "pandas":
            result = result_rel.df()
            verbose and logger.info(
                f"[{brick_display_name}] Converted to pandas DataFrame."
            )
        elif output_format == "polars":
            result = result_rel.pl()
            verbose and logger.info(
                f"[{brick_display_name}] Converted to Polars DataFrame."
            )
        elif output_format == "arrow":
            result = result_rel.fetch_arrow_table()
            verbose and logger.info(f"[{brick_display_name}] Converted to Arrow Table.")
        else:
            raise ValueError(f"Unsupported output format: {output_format}")
        if output_format in ("pandas", "polars"):
            rows, cols = result.shape
        else:
            rows, cols = result.num_rows, result.num_columns
        verbose and logger.info(
            f"[{brick_display_name}] Transpose successful. Result: {rows} rows x {cols} columns."
        )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error during transpose operation: {str(e)}"
        )
        raise
    finally:
        if conn:
            conn.close()
    return result
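
The column names can also be supplied entirely through the options dictionary; when both a direct input and an option are given, the direct input wins (see the _coalesce helper above). A small hedged sketch with made-up data:

import pandas as pd

# Hypothetical orders table; all three column names are passed through options
# rather than as direct inputs.
orders = pd.DataFrame(
    {
        "order_id": [1, 1, 2],
        "field": ["status", "amount", "status"],
        "val": ["shipped", "19.90", "pending"],
    }
)

wide = transpose_data(
    orders,
    options={
        "index_column": "order_id",
        "labels_column": "field",
        "values_column": "val",
        "output_format": "pandas",
        "verbose": False,
    },
)
# Expected: one row per order_id with "status" and "amount" columns; order 2 has
# no "amount" entry, so that cell stays empty (NULL).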

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow