Window Computation
Perform window operations on a dataset with configurable frames and ordering.
Window Computation
Processing
This function performs window operations (analytic functions) on the input dataset. It supports various aggregation, ranking, and value functions (like sum, avg, row_number, lag, lead) with configurable partitioning, ordering, and window frame boundaries (ROWS BETWEEN). The resulting window calculation is appended to the original dataset as a new column.
Inputs
- data
- The input dataset (Pandas DataFrame, Polars DataFrame, or PyArrow Table) on which the window computation is performed.
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The output dataset containing all original columns plus the new computed window column. The format depends on the selected Output Format option.
Outputs Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Window Computation brick contains some changeable options:
- Target Column
- The column on which the window operation (aggregation or value function) should be applied. This is not required for ranking functions like `row_number` or `rank`, or when using `count` to count all rows.
- Window Operation
- The specific window function to apply, such as aggregation (`sum`, `avg`), ranking (`row_number`, `rank`), or value functions (`lag`, `lead`).
- Distinct (Aggregates only)
- If enabled, calculates aggregate operations (like `COUNT`, `SUM`, `AVG`) only on distinct values within the current window partition.
- Ignore Nulls (Value functions only)
- If enabled, skips null values when calculating value-based window functions such as `LAG` or `FIRST_VALUE`.
- Partition By Columns
- A list of columns used to partition the dataset, defining the groups for the window function.
- Order By Columns
- A list of columns and their corresponding directions (ASC or DESC) used to define the order within each partition.
- Offset (for Lag/Lead)
- The number of rows to look backward (`LAG`) or forward (`LEAD`) when performing offset operations. Must be 1 or greater.
- Preceding Rows (Aggregation)
- Defines the start boundary of the window frame for aggregation functions. Set to `-1` for UNBOUNDED PRECEDING, `0` for CURRENT ROW, or a positive integer for a fixed number of rows preceding.
- Following Rows (Aggregation)
- Defines the end boundary of the window frame for aggregation functions. Set to `-1` for UNBOUNDED FOLLOWING, `0` for CURRENT ROW, or a positive integer for a fixed number of rows following.
- Concat Separator
- The separator string used when performing the `concat` (`STRING_AGG`) operation.
- Result Column Name
- The name of the new column that will contain the result of the window computation. If left empty, a name is generated based on the operation and target column.
- Output Format
- Specifies the desired format of the resulting data, either `pandas` (DataFrame), `polars` (DataFrame), or `arrow` (Arrow Table).
- Verbose
- Enables detailed logging and informational messages during the execution of the brick.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, DataFrame, ArrowTable, Int, Str
# Configure the root logger at INFO level when this module is imported.
logging.basicConfig(level=logging.INFO)
# Module-level logger used by window_computation for its status and error messages.
logger = logging.getLogger(__name__)
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
"""
return identifier.replace('"', '""')
def _coalesce(*values):
"""Return the first non-None value."""
return next((v for v in values if v is not None), None)
def _get_frame_bound(val, direction):
if val == -1:
return f"UNBOUNDED {direction}"
elif val == 0:
return "CURRENT ROW"
else:
return f"{val} {direction}"
def window_computation(
    data: Union[DataFrame, ArrowTable], options=None
) -> Union[DataFrame, ArrowTable]:
    """Append one window-function column to ``data`` using an in-memory DuckDB query.

    The input is registered in DuckDB, a single ``<function> OVER (...)``
    expression is built from ``options``, and the original columns plus the
    new column are returned ordered by the input's original row position.

    Args:
        data: A pandas DataFrame, Polars DataFrame, or PyArrow Table.
        options: Optional dict of settings. Recognised keys (defaults in
            parentheses): ``target_column`` (""), ``operation`` ("sum"),
            ``partition_by`` ([]), ``order_by`` ([]; each item is a dict with
            ``key`` = column name and ``value`` = direction),
            ``concat_separator`` (", "), ``result_column_name`` (""; when
            empty the name is ``<operation>_<target_column or 'index'>``),
            ``output_format`` ("pandas"), ``use_distinct`` (False),
            ``ignore_nulls`` (False), ``offset`` (1), ``preceding_rows`` (-1),
            ``following_rows`` (-1), ``verbose`` (True).

    Returns:
        The query result as a pandas DataFrame, Polars DataFrame, or Arrow
        Table, selected by ``output_format``.

    Raises:
        ValueError: If a required target column is missing, the input type,
            operation, or output format is unsupported, a referenced column
            does not exist, an order direction is not recognised, or the
            lag/lead offset is not an integer >= 1.
    """
    brick_display_name = "Window Computation"
    options = options or {}
    verbose = options.get("verbose", True)
    # The target column comes from the options only. The previous code read an
    # unassigned local here, which raised UnboundLocalError on every call.
    target_column = _coalesce(options.get("target_column"), "")
    operation = options.get("operation", "sum")
    # Treat an explicit None the same as "not provided" so the loops below work.
    partition_by = options.get("partition_by") or []
    order_by_pairs = options.get("order_by") or []
    concat_separator = options.get("concat_separator", ", ")
    result_column_name = options.get("result_column_name", "")
    output_format = options.get("output_format", "pandas")
    use_distinct = options.get("use_distinct", False)
    ignore_nulls = options.get("ignore_nulls", False)
    offset_val = options.get("offset", 1)
    preceding_val = options.get("preceding_rows", -1)
    following_val = options.get("following_rows", -1)

    # Aggregates that share the shape FUNC([DISTINCT ]target).
    simple_aggregates = {
        "max": "MAX",
        "min": "MIN",
        "avg": "AVG",
        "sum": "SUM",
        "stddev": "STDDEV",
        "count": "COUNT",
    }
    offset_operations = ("lag", "lead", "lagdiff", "leaddiff")
    # Operations that receive an explicit ROWS BETWEEN frame.
    framed_operations = (
        "sum",
        "avg",
        "min",
        "max",
        "count",
        "stddev",
        "concat",
        "first",
        "last",
    )
    # Directions are interpolated into SQL text, so only known forms are allowed.
    allowed_directions = {
        "ASC",
        "DESC",
        "ASC NULLS FIRST",
        "ASC NULLS LAST",
        "DESC NULLS FIRST",
        "DESC NULLS LAST",
    }

    result = None
    conn = None
    try:
        if verbose:
            logger.info(
                f"[{brick_display_name}] Starting window computation. Operation: '{operation}'."
            )
        if not target_column and operation not in ("row_number", "rank", "count"):
            if verbose:
                logger.error(
                    f"[{brick_display_name}] Target column must be specified for operation '{operation}'."
                )
            raise ValueError(
                f"Target column must be specified for operation '{operation}'."
            )

        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        else:
            if verbose:
                logger.error(
                    f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table."
                )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        if verbose:
            logger.info(f"[{brick_display_name}] Detected input format: {data_type}.")

        conn = duckdb.connect(":memory:")
        conn.register("input_table_raw", data)
        # Tag every row with its input position so the final SELECT can
        # restore the original order after the window is evaluated.
        conn.execute(
            "CREATE OR REPLACE TABLE input_table AS SELECT *, ROW_NUMBER() OVER () AS __original_row_id FROM input_table_raw"
        )
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = [col[0] for col in column_info if col[0] != "__original_row_id"]

        if target_column and target_column not in all_columns:
            raise ValueError(f"Target column '{target_column}' not found.")
        for col in partition_by:
            if col not in all_columns:
                raise ValueError(f"Partition column '{col}' not found.")

        parsed_order_by = []
        for item in order_by_pairs:
            col_name = item.get("key")
            if col_name not in all_columns:
                raise ValueError(f"Order column '{col_name}' not found.")
            raw_direction = item.get("value") or "ASC"
            # Normalise case and whitespace before checking the whitelist.
            direction = " ".join(str(raw_direction).upper().split())
            if direction not in allowed_directions:
                raise ValueError(
                    f"Invalid order direction '{raw_direction}' for column '{col_name}'."
                )
            parsed_order_by.append(f'"{_sanitize_identifier(col_name)}" {direction}')

        if operation in offset_operations and (
            isinstance(offset_val, bool)
            or not isinstance(offset_val, int)
            or offset_val < 1
        ):
            # The offset is interpolated into SQL text and is documented as >= 1.
            raise ValueError(
                f"Offset must be an integer greater than or equal to 1, got {offset_val!r}."
            )

        sanitized_target = (
            f'"{_sanitize_identifier(target_column)}"' if target_column else "*"
        )
        distinct_clause = "DISTINCT " if use_distinct else ""
        ignore_nulls_clause = " IGNORE NULLS" if ignore_nulls else ""

        if operation == "row_number":
            window_expr = "ROW_NUMBER()"
        elif operation == "rank":
            window_expr = "RANK()"
        elif operation in simple_aggregates:
            sql_function = simple_aggregates[operation]
            window_expr = f"{sql_function}({distinct_clause}{sanitized_target})"
        elif operation == "first":
            window_expr = f"FIRST_VALUE({sanitized_target}{ignore_nulls_clause})"
        elif operation == "last":
            window_expr = f"LAST_VALUE({sanitized_target}{ignore_nulls_clause})"
        elif operation == "concat":
            # Double single quotes so the separator is a valid SQL string literal.
            separator_literal = str(concat_separator).replace("'", "''")
            window_expr = (
                f"STRING_AGG({distinct_clause}{sanitized_target}, '{separator_literal}')"
            )
        elif operation == "lag":
            window_expr = f"LAG({sanitized_target}, {offset_val}{ignore_nulls_clause})"
        elif operation == "lead":
            window_expr = f"LEAD({sanitized_target}, {offset_val}{ignore_nulls_clause})"
        elif operation == "lagdiff":
            window_expr = f"{sanitized_target} - LAG({sanitized_target}, {offset_val}{ignore_nulls_clause})"
        elif operation == "leaddiff":
            window_expr = f"{sanitized_target} - LEAD({sanitized_target}, {offset_val}{ignore_nulls_clause})"
        else:
            raise ValueError(f"Unsupported operation: {operation}")

        over_parts = []
        if partition_by:
            sanitized_partition = [
                f'"{_sanitize_identifier(col)}"' for col in partition_by
            ]
            over_parts.append("PARTITION BY " + ", ".join(sanitized_partition))
        if parsed_order_by:
            over_parts.append("ORDER BY " + ", ".join(parsed_order_by))
        if operation in framed_operations:
            start_frame = _get_frame_bound(preceding_val, "PRECEDING")
            end_frame = _get_frame_bound(following_val, "FOLLOWING")
            over_parts.append(f"ROWS BETWEEN {start_frame} AND {end_frame}")
        over_clause_str = " ".join(over_parts)
        full_window_function = f"{window_expr} OVER ({over_clause_str})"

        if not result_column_name:
            result_column_name = (
                f"{operation}_{(target_column if target_column else 'index')}"
            )
        sanitized_result = f'"{_sanitize_identifier(result_column_name)}"'

        # Joining the whole select list at once keeps the SQL valid even when
        # the input has no columns of its own.
        select_items = [f'"{_sanitize_identifier(col)}"' for col in all_columns]
        select_items.append(f"{full_window_function} AS {sanitized_result}")
        query = (
            f"SELECT {', '.join(select_items)} "
            "FROM input_table ORDER BY __original_row_id"
        )

        if verbose:
            logger.info(f"[{brick_display_name}] Executing Query...")
        if output_format == "pandas":
            result = conn.execute(query).df()
        elif output_format == "polars":
            result = conn.execute(query).pl()
        elif output_format == "arrow":
            result = conn.execute(query).fetch_arrow_table()
        else:
            raise ValueError(f"Unsupported output format: {output_format}")
        if verbose:
            logger.info(f"[{brick_display_name}] Completed successfully.")
    except Exception as e:
        if verbose:
            logger.error(f"[{brick_display_name}] Error: {str(e)}")
        raise
    finally:
        # Release the DuckDB connection on failure as well as on success.
        if conn is not None:
            conn.close()
    return result
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow