Numerical Combinations

Combine two numerical columns with a standard + - × ÷ operation.

Numerical Combinations

Processing

This brick combines two specified numerical columns within a dataset by applying a chosen arithmetic operation (+, -, ×, ÷). The result is stored in a new column, and the user can specify the output format, the name of the new column, and whether to keep the original input columns. Division by zero or operations involving NULL values result in NULL.

Inputs

data
The input DataFrame or Arrow Table containing the numerical columns to be combined.
column1 (optional)
The name of the first numerical column used in the arithmetic calculation. This defaults to the value set in the options.
column2 (optional)
The name of the second numerical column used in the arithmetic calculation. This defaults to the value set in the options.
operation (optional)
The type of arithmetic operation to perform (add, subtract, multiply, or divide). This defaults to the value set in the options.
output column name (optional)
The name for the newly created column containing the result. If not provided, a name based on the input columns and operation will be generated (e.g., column1_add_column2).

Inputs Types

Input Types
data DataFrame, ArrowTable
column1 Str
column2 Str
operation Str
output column name Str

You can check the list of supported types here: Available Type Hints.

Outputs

result
The resulting dataset, which includes the original data plus the newly calculated column, in the specified output format.

Outputs Types

Output Types
result DataFrame, ArrowTable

You can check the list of supported types here: Available Type Hints.

Options

The Numerical Combinations brick contains some changeable options:

Column 1
Specifies the name of the first numerical column to be used in the arithmetic operation.
Column 2
Specifies the name of the second numerical column to be used in the arithmetic operation.
Operation
Selects the arithmetic operation to perform: add (+), subtract (-), multiply (×), or divide (÷). Defaults to 'add'.
Output Column Name
Defines the name of the new column that will store the results of the calculation. If left empty, a descriptive name will be generated based on the input columns and operation.
Keep Original Columns
If enabled, the original input columns used in the operation will remain in the output dataset. If disabled, both input columns are dropped from the output and only the other columns plus the new result column are returned.
Output Format
Specifies the desired format of the output data (pandas DataFrame, polars DataFrame, or Arrow Table). Defaults to 'pandas'.
Verbose
If enabled, detailed logs about the process, validation, and execution steps will be printed to the console.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, DataFrame, ArrowTable, Str

# Configure the root logger at import time with an INFO threshold.
logging.basicConfig(level=logging.INFO)
# Module-level logger used for the brick's progress and error messages.
logger = logging.getLogger(__name__)


def _coalesce(*values):
    return next((v for v in values if v is not None), None)


def _sanitize_identifier(identifier):
    """
    Sanitize SQL identifier by escaping special characters.
    Handles double quotes and other problematic characters.
    """
    return identifier.replace('"', '""')


def _is_numeric_type(duckdb_type):
    """
    Check if a DuckDB type is numeric.
    """
    type_lower = duckdb_type.lower()
    return any(
        (
            t in type_lower
            for t in [
                "tinyint",
                "smallint",
                "integer",
                "bigint",
                "int",
                "float",
                "double",
                "real",
                "decimal",
                "numeric",
            ]
        )
    )


def _get_operation_symbol(operation):
    """
    Get the SQL operator symbol for the given operation.
    """
    operations = {"add": "+", "subtract": "-", "multiply": "*", "divide": "/"}
    return operations.get(operation, "+")


def _build_operation_expression(col1, col2, operation):
    """
    Build the SQL CASE expression that combines two columns.

    The expression yields NULL when either operand is NULL and, for "divide",
    also when the divisor is 0; otherwise it applies the operator returned by
    ``_get_operation_symbol``. Column names are inserted between double quotes
    as given, so callers must pass already-escaped identifiers.
    """
    operator = _get_operation_symbol(operation)
    if operation == "divide":
        null_condition = f'"{col2}" = 0 OR "{col2}" IS NULL OR "{col1}" IS NULL'
    else:
        null_condition = f'"{col1}" IS NULL OR "{col2}" IS NULL'
    outer_indent = " " * 12
    inner_indent = " " * 16
    # The leading empty entry and trailing 8-space entry reproduce the
    # surrounding whitespace of the original multi-line SQL template.
    expression_lines = [
        "",
        f"{outer_indent}CASE ",
        f"{inner_indent}WHEN {null_condition} THEN NULL",
        f'{inner_indent}ELSE "{col1}" {operator} "{col2}"',
        f"{outer_indent}END",
        " " * 8,
    ]
    return "\n".join(expression_lines)


def generate_numerical_combinations(
    data: Union[DataFrame, ArrowTable],
    column1: Str = None,
    column2: Str = None,
    operation: Str = None,
    output_column_name: Str = None,
    options=None,
) -> Union[DataFrame, ArrowTable]:
    """
    Combine two numerical columns with an arithmetic operation into a new column.

    The input is registered in an in-memory DuckDB connection, both columns are
    checked for existence and numeric type, and a single SELECT computes the new
    column. Rows where either operand is NULL (or, for "divide", where the
    divisor is 0) get NULL in the result column.

    Args:
        data: pandas DataFrame, Polars DataFrame or Arrow Table to process.
        column1: Name of the left operand column. Falls back to
            ``options["column1"]`` when None.
        column2: Name of the right operand column. Falls back to
            ``options["column2"]`` when None.
        operation: One of "add", "subtract", "multiply", "divide". Falls back to
            ``options["operation"]``, then to "add".
        output_column_name: Name of the result column. Falls back to
            ``options["output_column_name"]``; when empty, the name
            ``"{column1}_{operation}_{column2}"`` is generated.
        options: Optional dict with the fallbacks above plus
            ``keep_original_columns`` (default True), ``output_format``
            ("pandas", "polars" or "arrow"; default "pandas") and ``verbose``
            (default True).

    Returns:
        The data with the new column appended last, in the requested output
        format. An existing column with the same name as the output column is
        replaced. When ``keep_original_columns`` is false, both operand columns
        are omitted from the result.

    Raises:
        ValueError: If a column name is missing, empty or not a string, the
            operation is unknown, the input type is unsupported, a column does
            not exist or is not numeric, or the output format is unsupported.
        Exception: Any error raised while running the query is logged (when
            verbose) and re-raised unchanged.
    """
    brick_display_name = "Numerical Combinations"
    options = options or {}
    verbose = options.get("verbose", True)

    def _log(level, message):
        # All brick logging is gated on the "verbose" option and prefixed with
        # the brick name; stacklevel=2 attributes the record to the call site.
        if verbose:
            logger.log(level, f"[{brick_display_name}] {message}", stacklevel=2)

    # Explicit arguments take precedence over the values stored in options.
    column1 = _coalesce(column1, options.get("column1", ""))
    column2 = _coalesce(column2, options.get("column2", ""))
    operation = _coalesce(operation, options.get("operation", "add"))
    output_column_name = _coalesce(
        output_column_name, options.get("output_column_name", "")
    )
    keep_original_columns = options.get("keep_original_columns", True)
    output_format = options.get("output_format", "pandas")
    result = None
    conn = None

    for label, value in (("Column 1", column1), ("Column 2", column2)):
        if not value or not isinstance(value, str):
            message = f"{label} must be provided as a non-empty string!"
            _log(logging.ERROR, message)
            raise ValueError(message)
    if operation not in ["add", "subtract", "multiply", "divide"]:
        message = f"Invalid operation: '{operation}'. Must be 'add', 'subtract', 'multiply', or 'divide'."
        _log(logging.ERROR, message)
        raise ValueError(message)

    # Human-readable symbols, used only in log messages (the SQL operators are
    # chosen by _build_operation_expression).
    operation_symbols = {
        "add": "+",
        "subtract": "-",
        "multiply": "×",
        "divide": "÷",
    }
    try:
        _log(
            logging.INFO,
            f"Starting numerical combination: '{column1}' {operation_symbols[operation]} '{column2}'.",
        )
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, pa.Table):
            data_type = "arrow"
        else:
            message = "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            _log(logging.ERROR, message)
            raise ValueError(message)
        _log(logging.INFO, f"Detected input format: {data_type}.")

        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        # DESCRIBE rows start with (column_name, column_type, ...).
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        _log(logging.INFO, f"Total columns in data: {len(all_columns)}.")

        # Existence of both columns is checked before either type is inspected.
        for name in (column1, column2):
            if name not in all_columns:
                message = f"Column '{name}' not found in data!"
                _log(logging.ERROR, message)
                raise ValueError(message)
        col1_type = all_columns[column1]
        col2_type = all_columns[column2]
        for name, type_name in ((column1, col1_type), (column2, col2_type)):
            if not _is_numeric_type(type_name):
                _log(
                    logging.ERROR,
                    f"Column '{name}' is not numeric (type: {type_name})!",
                )
                raise ValueError(
                    f"Column '{name}' is not numeric (type: {type_name}). Both columns must be numerical."
                )
        _log(
            logging.INFO,
            f"Both columns are numeric. Column 1 type: {col1_type}, Column 2 type: {col2_type}.",
        )

        if not output_column_name:
            output_column_name = f"{column1}_{operation}_{column2}"
            _log(
                logging.INFO,
                f"Generated output column name: '{output_column_name}'.",
            )
        if output_column_name in all_columns:
            _log(
                logging.WARNING,
                f"Output column name '{output_column_name}' already exists and will be overwritten.",
            )

        sanitized_output = _sanitize_identifier(output_column_name)
        operation_expr = _build_operation_expression(
            _sanitize_identifier(column1), _sanitize_identifier(column2), operation
        )

        # Columns left out of the pass-through list. An existing column named
        # like the output is always excluded so the result never carries two
        # columns with the same name (previously this was only done when the
        # original columns were kept).
        excluded = {output_column_name}
        if not keep_original_columns:
            excluded.update((column1, column2))
        select_parts = [
            f'"{_sanitize_identifier(col)}"'
            for col in all_columns
            if col not in excluded
        ]
        select_parts.append(f'{operation_expr} AS "{sanitized_output}"')
        select_clause = ", ".join(select_parts)
        query = f"SELECT {select_clause} FROM input_table"
        _log(logging.INFO, "Executing query to generate numerical combination.")

        if output_format == "pandas":
            result = conn.execute(query).df()
            _log(logging.INFO, "Converted result to pandas DataFrame.")
        elif output_format == "polars":
            result = conn.execute(query).pl()
            _log(logging.INFO, "Converted result to Polars DataFrame.")
        elif output_format == "arrow":
            result = conn.execute(query).fetch_arrow_table()
            _log(logging.INFO, "Converted result to Arrow Table.")
        else:
            message = f"Unsupported output format: {output_format}"
            _log(logging.ERROR, message)
            raise ValueError(message)
        _log(
            logging.INFO,
            f"Numerical combination completed successfully. Created column '{output_column_name}' with {operation_symbols[operation]} operation.",
        )
    except Exception as e:
        _log(
            logging.ERROR,
            f"Error during numerical combination operation: {str(e)}",
        )
        raise
    finally:
        # Always release the in-memory DuckDB connection, even on failure.
        if conn is not None:
            conn.close()
    return result

Brick Info

version v0.1.3
python 3.10, 3.11, 3.12, 3.13
requirements
  • pandas
  • polars[pyarrow]
  • duckdb
  • pyarrow