Stack Data

Combine rows of two or more datasets into a single output dataset using union, intersection, or custom schema methods.

This function combines the rows of two or more input datasets (which can be Pandas DataFrames, Polars DataFrames, or Arrow Tables) into a single unified output dataset.

The column-handling strategy is determined by the Stacking Method option: schema union, schema intersection, or a custom, user-defined schema. The final stacked dataset is returned in the format selected by the Output Format option.
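
For intuition, here is a minimal sketch of what the two built-in schema methods produce, assuming two small pandas DataFrames (the data and names are hypothetical):

import pandas as pd

a = pd.DataFrame({"id": [1, 2], "name": ["x", "y"]})
b = pd.DataFrame({"id": [3], "score": [0.5]})

# union: output schema is id, name, score; rows from `a` carry NULL for
# "score" and rows from `b` carry NULL for "name".
# intersection: output schema is just id, the only column shared by both.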

Inputs

datasets
A list containing at least two datasets (pandas DataFrame, Polars DataFrame, or Arrow Table) to be stacked vertically.

Input Types

datasets: List

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting dataset containing the combined rows of the input datasets, formatted according to the selected output format.

Output Types

result: DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Stack Data brick exposes the following configurable options:

Stacking Method
Defines how the schemas of the input datasets are combined. Choices are union (all unique columns across all inputs), intersection (only columns common to all inputs), or custom (a user-supplied column list; see Custom Columns).
Custom Columns
A list of column names to use for the output schema; required when Stacking Method is set to custom. Columns missing from an input are filled with NULLs.
Output Format
Determines the desired format for the resulting stacked dataset. Choices include pandas (Pandas DataFrame), polars (Polars DataFrame), or arrow (PyArrow Table).
Verbose
Enables or disables detailed logging of the stacking process and column resolution.

Code

import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, DataFrame, ArrowTable, List

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _sanitize_identifier(identifier):
    """
    Sanitize a SQL identifier by escaping embedded double quotes,
    so the name can be safely wrapped in double quotes in a query.
    """
    return identifier.replace('"', '""')

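# Illustrative example (hypothetical input): _sanitize_identifier('a"b')
# returns 'a""b', so the quoted SQL identifier becomes "a""b"; doubling
# the quote is how SQL escapes a literal quote inside a quoted name.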

def stack_data(datasets: List, options=None) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Stack Data"
    options = options or {}
    verbose = options.get("verbose", True)
    stacking_method = options.get("stacking_method", "union")
    custom_columns = options.get("custom_columns", [])
    output_format = options.get("output_format", "pandas")
    result = None
    conn = None
    try:
        if not datasets or len(datasets) < 2:
            verbose and logger.error(
                f"[{brick_display_name}] At least two datasets are required for stacking."
            )
            raise ValueError("At least two datasets are required for stacking.")
        verbose and logger.info(
            f"[{brick_display_name}] Starting stacking process with {len(datasets)} datasets using '{stacking_method}' method."
        )
        valid_types = []
        for idx, dataset in enumerate(datasets):
            data_type = None
            if isinstance(dataset, pd.DataFrame):
                data_type = "pandas"
            elif isinstance(dataset, pl.DataFrame):
                data_type = "polars"
            elif isinstance(dataset, pa.Table):
                data_type = "arrow"
            if data_type is None:
                verbose and logger.error(
                    f"[{brick_display_name}] Dataset {idx + 1} must be a pandas DataFrame, Polars DataFrame, or Arrow Table."
                )
                raise ValueError(
                    f"Dataset {idx + 1} must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
                )
            valid_types.append(data_type)
        verbose and logger.info(
            f"[{brick_display_name}] Detected input formats: {', '.join(set(valid_types))}."
        )
        conn = duckdb.connect(":memory:")
        all_columns_sets = []
        for idx, dataset in enumerate(datasets):
            table_name = f"dataset_{idx}"
            conn.register(table_name, dataset)
            column_info = conn.execute(f"DESCRIBE {table_name}").fetchall()
            columns = set([col[0] for col in column_info])
            all_columns_sets.append(columns)
            verbose and logger.info(
                f"[{brick_display_name}] Dataset {idx + 1} has {len(columns)} columns."
            )
        if stacking_method == "union":
            output_columns = set()
            for cols in all_columns_sets:
                output_columns.update(cols)
            output_columns = sorted(output_columns)
            verbose and logger.info(
                f"[{brick_display_name}] Union method: {len(output_columns)} columns in output."
            )
        elif stacking_method == "intersection":
            output_columns = all_columns_sets[0]
            for cols in all_columns_sets[1:]:
                output_columns = output_columns.intersection(cols)
            output_columns = sorted(output_columns)
            verbose and logger.info(
                f"[{brick_display_name}] Intersection method: {len(output_columns)} columns in output."
            )
        elif stacking_method == "custom":
            if not custom_columns:
                verbose and logger.error(
                    f"[{brick_display_name}] Custom columns must be specified when using 'custom' stacking method."
                )
                raise ValueError(
                    "Custom columns must be specified when using 'custom' stacking method."
                )
            output_columns = custom_columns
            verbose and logger.info(
                f"[{brick_display_name}] Custom method: {len(output_columns)} columns specified."
            )
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Invalid stacking method: '{stacking_method}'."
            )
            raise ValueError(
                f"Invalid stacking method: '{stacking_method}'. Must be 'union', 'intersection', or 'custom'."
            )
        union_queries = []
        for idx, dataset_columns in enumerate(all_columns_sets):
            table_name = f"dataset_{idx}"
            select_parts = []
            for col in output_columns:
                sanitized_col = f'"{_sanitize_identifier(col)}"'
                if col in dataset_columns:
                    select_parts.append(sanitized_col)
                else:
                    select_parts.append(f"NULL AS {sanitized_col}")
            select_clause = ", ".join(select_parts)
            union_queries.append(f"SELECT {select_clause} FROM {table_name}")
        final_query = " UNION ALL ".join(union_queries)
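        # Illustrative shape of the generated SQL for two inputs where only
        # the second one has column "b":
        #   SELECT "a", NULL AS "b" FROM dataset_0
        #   UNION ALL SELECT "a", "b" FROM dataset_1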
        verbose and logger.info(
            f"[{brick_display_name}] Executing stacking query with UNION ALL."
        )
        if output_format == "pandas":
            result = conn.execute(final_query).df()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to pandas DataFrame."
            )
        elif output_format == "polars":
            result = conn.execute(final_query).pl()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Polars DataFrame."
            )
        elif output_format == "arrow":
            result = conn.execute(final_query).fetch_arrow_table()
            verbose and logger.info(
                f"[{brick_display_name}] Converted result to Arrow Table."
            )
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Unsupported output format: {output_format}"
            )
            raise ValueError(f"Unsupported output format: {output_format}")
        verbose and logger.info(
            f"[{brick_display_name}] Successfully stacked {len(datasets)} datasets with {len(result)} total rows."
        )
    except Exception as e:
        verbose and logger.error(
            f"[{brick_display_name}] Error stacking datasets: {e}"
        )
        raise
    finally:
        if conn is not None:
            conn.close()
    return result
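
A minimal usage sketch (the dataset names, contents, and option values below are illustrative):

import pandas as pd

sales_q1 = pd.DataFrame({"id": [1, 2], "amount": [10.0, 20.0]})
sales_q2 = pd.DataFrame({"id": [3], "amount": [30.0], "region": ["EU"]})

stacked = stack_data(
    [sales_q1, sales_q2],
    options={
        "stacking_method": "union",  # or "intersection" / "custom"
        "output_format": "pandas",  # or "polars" / "arrow"
        "verbose": False,
    },
)
# `stacked` has columns amount, id, region (sorted order); the sales_q1 rows
# carry NULL in "region".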

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow