Window Computation

Perform window operations on a dataset with configurable frames and ordering.

Window Computation

Processing

This function performs window operations (analytic functions) on the input dataset. It supports various aggregation, ranking, and value functions (like sum, avg, row_number, lag, lead) with configurable partitioning, ordering, and window frame boundaries (ROWS BETWEEN). The resulting window calculation is appended to the original dataset as a new column.

Inputs

data
The input dataset (Pandas DataFrame, Polars DataFrame, or PyArrow Table) on which the window computation is performed.

Inputs Types

Input Types
data DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Outputs

result
The output dataset containing all original columns plus the new computed window column. The format depends on the selected Output Format option.

Outputs Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Window Computation brick contains some changeable options:

Target Column
The column on which the window operation (aggregation or value function) should be applied. This is not required for ranking functions like row_number or rank, or when using count to count all rows.
Window Operation
The specific window function to apply, such as aggregation (sum, avg), ranking (row_number, rank), or value functions (lag, lead).
Distinct (Aggregates only)
If enabled, calculates aggregate operations (like COUNT, SUM, AVG) only on distinct values within the current window partition.
Ignore Nulls (Value functions only)
If enabled, skips null values when calculating value-based window functions such as LAG or FIRST_VALUE.
Partition By Columns
A list of columns used to partition the dataset, defining the groups for the window function.
Order By Columns
A list of columns and their corresponding directions (ASC or DESC) used to define the order within each partition.
Offset (for Lag/Lead)
The number of rows to look backward (LAG) or forward (LEAD) when performing offset operations. Must be 1 or greater.
Preceding Rows (Aggregation)
Defines the start boundary of the window frame for aggregation functions. Set to -1 for UNBOUNDED PRECEDING, 0 for CURRENT ROW, or a positive integer for a fixed number of rows preceding.
Following Rows (Aggregation)
Defines the end boundary of the window frame for aggregation functions. Set to -1 for UNBOUNDED FOLLOWING, 0 for CURRENT ROW, or a positive integer for a fixed number of rows following.
Concat Separator
The separator string used when performing the concat (STRING_AGG) operation.
Result Column Name
The name of the new column that will contain the result of the window computation. If left empty, a name is generated based on the operation and target column.
Output Format
Specifies the desired format of the resulting data, either pandas (DataFrame), polars (DataFrame), or arrow (Arrow Table).
Verbose
Enables detailed logging and informational messages during the execution of the brick.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, DataFrame, ArrowTable, Int, Str

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    """
    return identifier.replace('"', '""')


def _coalesce(*values):
    """Return the first non-None value."""
    return next((v for v in values if v is not None), None)


def _get_frame_bound(val, direction):
    if val == -1:
        return f"UNBOUNDED {direction}"
    elif val == 0:
        return "CURRENT ROW"
    else:
        return f"{val} {direction}"


def window_computation(
    data: Union[DataFrame, ArrowTable], options=None
) -> Union[DataFrame, ArrowTable]:
    brick_display_name = "Window Computation"
    options = options or {}
    verbose = options.get("verbose", True)
    target_column = _coalesce(target_column, options.get("target_column", ""))
    operation = options.get("operation", "sum")
    partition_by = options.get("partition_by", [])
    order_by_pairs = options.get("order_by", [])
    concat_separator = options.get("concat_separator", ", ")
    result_column_name = options.get("result_column_name", "")
    output_format = options.get("output_format", "pandas")
    use_distinct = options.get("use_distinct", False)
    ignore_nulls = options.get("ignore_nulls", False)
    offset_val = options.get("offset", 1)
    preceding_val = options.get("preceding_rows", -1)
    following_val = options.get("following_rows", -1)
    result = None
    try:
        verbose and logger.info(
            f"[{brick_display_name}] Starting window computation. Operation: '{operation}'."
        )
        if not target_column and operation not in ["row_number", "rank", "count"]:
            verbose and logger.error(
                f"[{brick_display_name}] Target column must be specified for operation '{operation}'."
            )
            raise ValueError(
                f"Target column must be specified for operation '{operation}'."
            )
        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        else:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table."
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )
        conn = duckdb.connect(":memory:")
        conn.register("input_table_raw", data)
        conn.execute(
            "CREATE OR REPLACE TABLE input_table AS SELECT *, ROW_NUMBER() OVER () AS __original_row_id FROM input_table_raw"
        )
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = [col[0] for col in column_info if col[0] != "__original_row_id"]
        if target_column and target_column not in all_columns:
            raise ValueError(f"Target column '{target_column}' not found.")
        for col in partition_by:
            if col not in all_columns:
                raise ValueError(f"Partition column '{col}' not found.")
        parsed_order_by = []
        for item in order_by_pairs:
            col_name = item.get("key")
            direction = item.get("value", "ASC")
            if col_name not in all_columns:
                raise ValueError(f"Order column '{col_name}' not found.")
            parsed_order_by.append(f'"{_sanitize_identifier(col_name)}" {direction}')
        sanitized_target = (
            f'"{_sanitize_identifier(target_column)}"' if target_column else "*"
        )
        distinct_clause = "DISTINCT " if use_distinct else ""
        ignore_nulls_clause = " IGNORE NULLS" if ignore_nulls else ""
        if operation == "row_number":
            window_expr = "ROW_NUMBER()"
        elif operation == "rank":
            window_expr = "RANK()"
        elif operation == "max":
            window_expr = f"MAX({distinct_clause}{sanitized_target})"
        elif operation == "min":
            window_expr = f"MIN({distinct_clause}{sanitized_target})"
        elif operation == "avg":
            window_expr = f"AVG({distinct_clause}{sanitized_target})"
        elif operation == "sum":
            window_expr = f"SUM({distinct_clause}{sanitized_target})"
        elif operation == "stddev":
            window_expr = f"STDDEV({distinct_clause}{sanitized_target})"
        elif operation == "count":
            target_expr = sanitized_target if target_column else "*"
            window_expr = f"COUNT({distinct_clause}{target_expr})"
        elif operation == "first":
            window_expr = f"FIRST_VALUE({sanitized_target}{ignore_nulls_clause})"
        elif operation == "last":
            window_expr = f"LAST_VALUE({sanitized_target}{ignore_nulls_clause})"
        elif operation == "concat":
            window_expr = (
                f"STRING_AGG({distinct_clause}{sanitized_target}, '{concat_separator}')"
            )
        elif operation == "lag":
            window_expr = f"LAG({sanitized_target}, {offset_val}{ignore_nulls_clause})"
        elif operation == "lead":
            window_expr = f"LEAD({sanitized_target}, {offset_val}{ignore_nulls_clause})"
        elif operation == "lagdiff":
            window_expr = f"{sanitized_target} - LAG({sanitized_target}, {offset_val}{ignore_nulls_clause})"
        elif operation == "leaddiff":
            window_expr = f"{sanitized_target} - LEAD({sanitized_target}, {offset_val}{ignore_nulls_clause})"
        else:
            raise ValueError(f"Unsupported operation: {operation}")
        over_parts = []
        if partition_by:
            sanitized_partition = [
                f'"{_sanitize_identifier(col)}"' for col in partition_by
            ]
            over_parts.append("PARTITION BY " + ", ".join(sanitized_partition))
        if parsed_order_by:
            over_parts.append("ORDER BY " + ", ".join(parsed_order_by))
        aggregates_with_frames = [
            "sum",
            "avg",
            "min",
            "max",
            "count",
            "stddev",
            "concat",
            "first",
            "last",
        ]
        if operation in aggregates_with_frames:
            start_frame = _get_frame_bound(preceding_val, "PRECEDING")
            end_frame = _get_frame_bound(following_val, "FOLLOWING")
            frame_clause = f"ROWS BETWEEN {start_frame} AND {end_frame}"
            over_parts.append(frame_clause)
        over_clause_str = " ".join(over_parts)
        full_window_function = f"{window_expr} OVER ({over_clause_str})"
        if not result_column_name:
            result_column_name = (
                f"{operation}_{(target_column if target_column else 'index')}"
            )
        sanitized_result = f'"{_sanitize_identifier(result_column_name)}"'
        sanitized_all_columns = [
            f'"{_sanitize_identifier(col)}"' for col in all_columns
        ]
        select_cols_str = ", ".join(sanitized_all_columns)
        query = f"\n            SELECT \n                {select_cols_str}, \n                {full_window_function} AS {sanitized_result} \n            FROM input_table\n            ORDER BY __original_row_id\n        "
        verbose and logger.info(f"[{brick_display_name}] Executing Query...")
        if output_format == "pandas":
            result = conn.execute(query).df()
        elif output_format == "polars":
            result = conn.execute(query).pl()
        elif output_format == "arrow":
            result = conn.execute(query).fetch_arrow_table()
        else:
            raise ValueError(f"Unsupported output format: {output_format}")
        conn.close()
        verbose and logger.info(f"[{brick_display_name}] Completed successfully.")
    except Exception as e:
        verbose and logger.error(f"[{brick_display_name}] Error: {str(e)}")
        raise
    return result

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow