Numerical Combinations
Combine two numerical columns with a standard +, -, × or ÷ operation.
Numerical Combinations
Processing
This brick combines two specified numerical columns within a dataset by applying a chosen arithmetic operation (+, -, ×, ÷). The result is stored in a new column, and the user can specify the output format, the name of the new column, and whether to keep the original input columns. Division by zero or operations involving NULL values result in NULL.
Inputs
- data
- The input DataFrame or Arrow Table containing the numerical columns to be combined.
- column1 (optional)
- The name of the first numerical column used in the arithmetic calculation. This defaults to the value set in the options.
- column2 (optional)
- The name of the second numerical column used in the arithmetic calculation. This defaults to the value set in the options.
- operation (optional)
- The type of arithmetic operation to perform (`add`, `subtract`, `multiply`, or `divide`). This defaults to the value set in the options.
- output_column_name (optional)
- The name for the newly created column containing the result. If not provided, a name based on the input columns and operation will be generated (e.g., `column1_add_column2`).
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| column1 | Str |
| column2 | Str |
| operation | Str |
| output_column_name | Str |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The resulting dataset, which includes the original data plus the newly calculated column, in the specified output format.
Outputs Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Numerical Combinations brick contains some changeable options:
- Column 1
- Specifies the name of the first numerical column to be used in the arithmetic operation.
- Column 2
- Specifies the name of the second numerical column to be used in the arithmetic operation.
- Operation
- Selects the arithmetic operation to perform: add (+), subtract (-), multiply (×), or divide (÷). Defaults to 'add'.
- Output Column Name
- Defines the name of the new column that will store the results of the calculation. If left empty, a descriptive name will be generated based on the input columns and operation.
- Keep Original Columns
- If enabled, the original input columns used in the operation will remain in the output dataset. If disabled, both input columns are dropped from the output and only the remaining columns plus the new result column are returned.
- Output Format
- Specifies the desired format of the output data (pandas DataFrame, polars DataFrame, or Arrow Table). Defaults to 'pandas'.
- Verbose
- If enabled, detailed logs about the process, validation, and execution steps will be printed to the console.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, DataFrame, ArrowTable, Str
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _coalesce(*values):
return next((v for v in values if v is not None), None)
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
Handles double quotes and other problematic characters.
"""
return identifier.replace('"', '""')
def _is_numeric_type(duckdb_type):
"""
Check if a DuckDB type is numeric.
"""
type_lower = duckdb_type.lower()
return any(
(
t in type_lower
for t in [
"tinyint",
"smallint",
"integer",
"bigint",
"int",
"float",
"double",
"real",
"decimal",
"numeric",
]
)
)
def _get_operation_symbol(operation):
"""
Get the SQL operator symbol for the given operation.
"""
operations = {"add": "+", "subtract": "-", "multiply": "*", "divide": "/"}
return operations.get(operation, "+")
def _build_operation_expression(col1, col2, operation):
    """
    Build the SQL CASE expression that combines two columns.

    The expression yields NULL when either operand is NULL and, for
    "divide", also when the second operand equals 0; otherwise it applies
    the operator for the requested operation. Column names are placed
    inside double quotes as given, so callers pass already-escaped names.
    """
    sql_operator = _get_operation_symbol(operation)
    if operation == "divide":
        # Division additionally guards against a zero divisor.
        null_guard = f'"{col2}" = 0 OR "{col2}" IS NULL OR "{col1}" IS NULL'
    else:
        null_guard = f'"{col1}" IS NULL OR "{col2}" IS NULL'
    return f'\n CASE \n WHEN {null_guard} THEN NULL\n ELSE "{col1}" {sql_operator} "{col2}"\n END\n '
def generate_numerical_combinations(
    data: Union[DataFrame, ArrowTable],
    column1: Str = None,
    column2: Str = None,
    operation: Str = None,
    output_column_name: Str = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    """
    Add a column holding an arithmetic combination of two numeric columns.

    The input is registered in an in-memory DuckDB connection and a single
    SELECT computes ``column1 <op> column2``. Rows where either operand is
    NULL, or where the divisor is 0 for "divide", produce NULL.

    Args:
        data: pandas DataFrame, Polars DataFrame, or Arrow Table.
        column1: Name of the first operand column. Falls back to
            ``options["column1"]`` when None.
        column2: Name of the second operand column. Falls back to
            ``options["column2"]`` when None.
        operation: One of "add", "subtract", "multiply", "divide". Falls
            back to ``options["operation"]`` (default "add") when None.
        output_column_name: Name of the result column. Falls back to
            ``options["output_column_name"]``; when empty, the name
            ``{column1}_{operation}_{column2}`` is generated.
        options: Optional dict. Besides the fallbacks above it may hold
            "keep_original_columns" (default True), "output_format"
            ("pandas", "polars" or "arrow"; default "pandas") and
            "verbose" (default True).

    Returns:
        The data with the new column appended, in the requested output
        format. An existing column with the output name is replaced. When
        "keep_original_columns" is falsy the two operand columns are
        omitted.

    Raises:
        ValueError: If a column name is missing or not a string, the
            operation is unknown, the input type is unsupported, a column
            is absent or not numeric, or the output format is unsupported.
        Exception: Any other error raised while querying is logged and
            re-raised unchanged.
    """
    brick_display_name = "Numerical Combinations"
    options = options or {}
    verbose = options.get("verbose", True)

    def _log(level, message):
        # All log output is suppressed when the "verbose" option is falsy.
        if verbose:
            logger.log(level, f"[{brick_display_name}] {message}")

    column1 = _coalesce(column1, options.get("column1", ""))
    column2 = _coalesce(column2, options.get("column2", ""))
    operation = _coalesce(operation, options.get("operation", "add"))
    output_column_name = _coalesce(
        output_column_name, options.get("output_column_name", "")
    )
    keep_original_columns = options.get("keep_original_columns", True)
    output_format = options.get("output_format", "pandas")
    result = None
    conn = None

    for label, value in (("Column 1", column1), ("Column 2", column2)):
        if not value or not isinstance(value, str):
            message = f"{label} must be provided as a non-empty string!"
            _log(logging.ERROR, message)
            raise ValueError(message)

    if operation not in ["add", "subtract", "multiply", "divide"]:
        message = f"Invalid operation: '{operation}'. Must be 'add', 'subtract', 'multiply', or 'divide'."
        _log(logging.ERROR, message)
        raise ValueError(message)

    try:
        # Display symbols for log messages only; the SQL operator comes
        # from _build_operation_expression.
        operation_symbols = {
            "add": "+",
            "subtract": "-",
            "multiply": "×",
            "divide": "÷",
        }
        _log(
            logging.INFO,
            f"Starting numerical combination: '{column1}' {operation_symbols[operation]} '{column2}'.",
        )

        data_type = None
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, pa.Table):
            data_type = "arrow"
        if data_type is None:
            message = "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            _log(logging.ERROR, message)
            raise ValueError(message)
        _log(logging.INFO, f"Detected input format: {data_type}.")

        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        # Maps column name -> DuckDB type name, in table order.
        all_columns = {col[0]: col[1] for col in column_info}
        _log(logging.INFO, f"Total columns in data: {len(all_columns)}.")

        for name in (column1, column2):
            if name not in all_columns:
                message = f"Column '{name}' not found in data!"
                _log(logging.ERROR, message)
                raise ValueError(message)

        col1_type = all_columns[column1]
        col2_type = all_columns[column2]
        for name, type_name in ((column1, col1_type), (column2, col2_type)):
            if not _is_numeric_type(type_name):
                _log(
                    logging.ERROR,
                    f"Column '{name}' is not numeric (type: {type_name})!",
                )
                raise ValueError(
                    f"Column '{name}' is not numeric (type: {type_name}). Both columns must be numerical."
                )
        _log(
            logging.INFO,
            f"Both columns are numeric. Column 1 type: {col1_type}, Column 2 type: {col2_type}.",
        )

        if not output_column_name:
            output_column_name = f"{column1}_{operation}_{column2}"
            _log(
                logging.INFO,
                f"Generated output column name: '{output_column_name}'.",
            )
        if output_column_name in all_columns:
            _log(
                logging.WARNING,
                f"Output column name '{output_column_name}' already exists and will be overwritten.",
            )

        operation_expr = _build_operation_expression(
            _sanitize_identifier(column1),
            _sanitize_identifier(column2),
            operation,
        )
        sanitized_output = _sanitize_identifier(output_column_name)

        # A pre-existing column with the output name is always left out so
        # the result never carries two columns with the same name; the
        # operand columns are left out only when the caller asked to drop
        # them.
        excluded = {output_column_name}
        if not keep_original_columns:
            excluded.update((column1, column2))
        select_parts = [
            f'"{_sanitize_identifier(col)}"'
            for col in all_columns
            if col not in excluded
        ]
        select_parts.append(f'{operation_expr} AS "{sanitized_output}"')
        select_clause = ", ".join(select_parts)
        query = f"SELECT {select_clause} FROM input_table"
        _log(logging.INFO, "Executing query to generate numerical combination.")

        if output_format == "pandas":
            result = conn.execute(query).df()
            _log(logging.INFO, "Converted result to pandas DataFrame.")
        elif output_format == "polars":
            result = conn.execute(query).pl()
            _log(logging.INFO, "Converted result to Polars DataFrame.")
        elif output_format == "arrow":
            result = conn.execute(query).fetch_arrow_table()
            _log(logging.INFO, "Converted result to Arrow Table.")
        else:
            message = f"Unsupported output format: {output_format}"
            _log(logging.ERROR, message)
            raise ValueError(message)

        _log(
            logging.INFO,
            f"Numerical combination completed successfully. Created column '{output_column_name}' with {operation_symbols[operation]} operation.",
        )
    except Exception as e:
        _log(logging.ERROR, f"Error during numerical combination operation: {e}")
        raise
    finally:
        # Release the in-memory database whether or not the query succeeded.
        if conn is not None:
            conn.close()
    return result
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow