Compute Average
Compute the line by line arithmetical mean (average) of a set of numeric columns.
Compute Average
Processing
This function calculates the row-wise arithmetic mean (average) across a specified set of numeric columns in the input dataset. The input data must be a Pandas DataFrame, Polars DataFrame, or PyArrow Table. Columns can be selected explicitly via a list or dynamically using a Regular Expression pattern. The resulting average is added as a new column to the dataset.
Inputs
- data
- The input dataset (Pandas DataFrame, Polars DataFrame, or PyArrow Table) on which the average calculation will be performed.
- columns (optional)
- A list of column names that should be included in the average calculation. If the
regex patterninput or option is provided, this list is ignored. - regex pattern (optional)
- A regular expression pattern used to dynamically select numeric columns for averaging. If provided, this overrides the explicit
columnslist. - output column (optional)
- The name assigned to the newly created column containing the calculated average. Defaults to 'average'.
- use default value (optional)
- Boolean flag to determine if a specific default value should be used when computing the average of a row where all selected numeric columns are NULL.
- default value (optional)
- The numeric value to insert into the output column when the row average calculation results in NULL (i.e., when
use default valueis true and all input values are missing).
Inputs Types
| Input | Types |
|---|---|
data |
DataFrame, ArrowTable |
columns |
List |
regex pattern |
Str |
output column |
Str |
use default value |
Bool |
default value |
Number |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The dataset containing all original columns plus the newly added column with the calculated row averages. The format of the output depends on the
Output Formatoption selected.
Outputs Types
| Output | Types |
|---|---|
result |
DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Compute Average brick contains some changeable options:
- Columns to Average
- Specifies a list of column names to include in the average calculation.
- Regex Pattern
- A regular expression used to select columns for averaging dynamically.
- Output Column Name
- Defines the name of the new column that will store the computed average. Defaults to
average. - Use Default Value
- If enabled, rows where all selected numeric inputs are NULL will receive the specified default value instead of NULL.
- Default Value
- The specific numeric value to be used when the row average is calculated over entirely NULL inputs and 'Use Default Value' is enabled. Defaults to 0.0.
- Output Format
- Selects the format of the returned dataset (
pandasDataFrame,polarsDataFrame, orarrowTable). - Safe Mode
- If enabled, the function will silently skip non-existent columns (if using a column list) or non-numeric columns instead of raising an error.
- Verbose
- If enabled, logs informational messages about the process steps.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
import re
from coded_flows.types import Union, List, DataFrame, ArrowTable, Str, Number, Bool
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _coalesce(*values):
return next((v for v in values if v is not None), None)
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
Handles double quotes and other problematic characters.
"""
return identifier.replace('"', '""')
def _is_numeric_type(duckdb_type):
"""
Check if a DuckDB type is numeric.
"""
type_lower = duckdb_type.lower()
return any(
(
t in type_lower
for t in [
"tinyint",
"smallint",
"integer",
"bigint",
"int",
"float",
"double",
"real",
"decimal",
"numeric",
]
)
)
def compute_average(
data: Union[DataFrame, ArrowTable],
columns: List = None,
regex_pattern: Str = None,
output_column: Str = None,
use_default_value: Bool = None,
default_value: Number = None,
options=None,
) -> Union[DataFrame, ArrowTable]:
brick_display_name = "Compute Average"
options = options or {}
verbose = options.get("verbose", True)
columns = _coalesce(columns, options.get("columns", []))
regex_pattern = _coalesce(regex_pattern, options.get("regex_pattern", ""))
output_column = _coalesce(output_column, options.get("output_column", "average"))
use_default_value = _coalesce(
use_default_value, options.get("use_default_value", False)
)
default_value = _coalesce(default_value, options.get("default_value", 0.0))
output_format = options.get("output_format", "pandas")
safe_mode = options.get("safe_mode", False)
result = None
conn = None
if not isinstance(columns, list) or not all((isinstance(c, str) for c in columns)):
verbose and logger.error(
f"[{brick_display_name}] Invalid columns format! Expected a list."
)
raise ValueError("Columns must be provided as a list!")
if not output_column or not isinstance(output_column, str):
verbose and logger.error(
f"[{brick_display_name}] Invalid output column name! Expected a non-empty string."
)
raise ValueError("Output column name must be a non-empty string!")
try:
avg_mode = None
if regex_pattern:
avg_mode = "regex_pattern"
elif len(columns) == 0:
verbose and logger.error(
f"[{brick_display_name}] No columns specified! Provide either a column list or a regex pattern."
)
raise ValueError(
"No columns specified! Provide either a column list or a regex pattern."
)
else:
avg_mode = "column_list"
verbose and logger.info(
f"[{brick_display_name}] Detected mode: '{avg_mode}'. Starting average computation."
)
data_type = None
if isinstance(data, pd.DataFrame):
data_type = "pandas"
elif isinstance(data, pl.DataFrame):
data_type = "polars"
elif isinstance(data, (pa.Table, pa.lib.Table)):
data_type = "arrow"
if data_type is None:
verbose and logger.error(
f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
raise ValueError(
"Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
)
verbose and logger.info(
f"[{brick_display_name}] Detected input format: {data_type}."
)
conn = duckdb.connect(":memory:")
conn.register("input_table", data)
column_info = conn.execute("DESCRIBE input_table").fetchall()
all_columns = {col[0]: col[1] for col in column_info}
verbose and logger.info(
f"[{brick_display_name}] Total columns in data: {len(all_columns)}."
)
if output_column in all_columns:
verbose and logger.warning(
f"[{brick_display_name}] Output column '{output_column}' already exists and will be overwritten."
)
columns_to_average = []
if avg_mode == "column_list":
if not safe_mode:
missing_columns = [col for col in columns if col not in all_columns]
if missing_columns:
verbose and logger.error(
f"[{brick_display_name}] Columns not found in data: {missing_columns}"
)
raise ValueError(f"Columns not found in data: {missing_columns}")
columns_to_average = [col for col in columns if col in all_columns]
skipped = len(columns) - len(columns_to_average)
if safe_mode and skipped > 0:
skipped_cols = [col for col in columns if col not in all_columns]
verbose and logger.warning(
f"[{brick_display_name}] Safe mode: Skipped {skipped} non-existent columns: {skipped_cols}"
)
verbose and logger.info(
f"[{brick_display_name}] Computing average over {len(columns_to_average)} column(s): {columns_to_average}."
)
elif avg_mode == "regex_pattern":
try:
pattern = re.compile(regex_pattern)
columns_to_average = [
col for col in all_columns.keys() if pattern.search(col)
]
if not columns_to_average:
verbose and logger.error(
f"[{brick_display_name}] No columns matched regex pattern '{regex_pattern}'."
)
raise ValueError(
f"No columns matched regex pattern '{regex_pattern}'."
)
verbose and logger.info(
f"[{brick_display_name}] Regex pattern '{regex_pattern}' matched {len(columns_to_average)} columns: {columns_to_average}."
)
except re.error as e:
verbose and logger.error(
f"[{brick_display_name}] Invalid regex pattern."
)
raise ValueError(f"Invalid regex pattern!")
numeric_columns_to_average = []
non_numeric_columns = []
for col in columns_to_average:
col_type = all_columns[col]
if _is_numeric_type(col_type):
numeric_columns_to_average.append(col)
else:
non_numeric_columns.append((col, col_type))
if non_numeric_columns:
if safe_mode:
verbose and logger.warning(
f"[{brick_display_name}] Safe mode: Skipping {len(non_numeric_columns)} non-numeric columns: {[col for (col, _) in non_numeric_columns]}"
)
else:
verbose and logger.error(
f"[{brick_display_name}] Found non-numeric columns: {[(col, typ) for (col, typ) in non_numeric_columns]}"
)
raise ValueError(
f"Cannot compute average over non-numeric columns: {[col for (col, _) in non_numeric_columns]}"
)
if not numeric_columns_to_average:
verbose and logger.error(
f"[{brick_display_name}] No numeric columns found to compute average."
)
raise ValueError("No numeric columns found to compute average.")
verbose and logger.info(
f"[{brick_display_name}] Computing average over {len(numeric_columns_to_average)} numeric columns."
)
sanitized_cols = [
_sanitize_identifier(col) for col in numeric_columns_to_average
]
count_parts = [
f'CASE WHEN "{col}" IS NOT NULL THEN 1 ELSE 0 END' for col in sanitized_cols
]
count_expr = " + ".join(count_parts)
sum_parts = [f'COALESCE("{col}", 0)' for col in sanitized_cols]
sum_expr = " + ".join(sum_parts)
if use_default_value:
avg_expr = f"\n CASE \n WHEN ({count_expr}) = 0 THEN {default_value}\n ELSE ({sum_expr}) / ({count_expr})\n END\n "
verbose and logger.info(
f"[{brick_display_name}] Using default value {default_value} for rows with all NULL values."
)
else:
avg_expr = f"\n CASE \n WHEN ({count_expr}) = 0 THEN NULL\n ELSE ({sum_expr}) / ({count_expr})\n END\n "
verbose and logger.info(
f"[{brick_display_name}] Rows with all NULL values will result in NULL."
)
select_parts = []
for col in all_columns.keys():
if col == output_column:
continue
sanitized_col = _sanitize_identifier(col)
select_parts.append(f'"{sanitized_col}"')
sanitized_output_col = _sanitize_identifier(output_column)
select_parts.append(f'{avg_expr} AS "{sanitized_output_col}"')
select_clause = ", ".join(select_parts)
query = f"SELECT {select_clause} FROM input_table"
verbose and logger.info(
f"[{brick_display_name}] Executing query to compute average."
)
if output_format == "pandas":
result = conn.execute(query).df()
verbose and logger.info(
f"[{brick_display_name}] Converted result to pandas DataFrame."
)
elif output_format == "polars":
result = conn.execute(query).pl()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Polars DataFrame."
)
elif output_format == "arrow":
result = conn.execute(query).fetch_arrow_table()
verbose and logger.info(
f"[{brick_display_name}] Converted result to Arrow Table."
)
else:
verbose and logger.error(
f"[{brick_display_name}] Unsupported output format: {output_format}"
)
raise ValueError(f"Unsupported output format: {output_format}")
verbose and logger.info(
f"[{brick_display_name}] Average computation completed successfully. Output column: '{output_column}'."
)
except Exception as e:
verbose and logger.error(
f"[{brick_display_name}] Error during average computation: {str(e)}"
)
raise
finally:
if conn is not None:
conn.close()
return result
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow