Bin Numbers
Group numbers into bins (intervals) using fixed-size or custom intervals.
Bin Numbers
Processing
This function groups numeric values within specified columns into discrete intervals or "bins." It supports two primary modes: fixed_size, where bins have uniform width, or custom_intervals, allowing the user to define specific boundaries. The output dataset includes the new binned column(s) where values are represented by their interval range (e.g., "10:20").
Inputs
- data
- The input tabular data (DataFrame or Arrow Table) containing the columns to be binned.
- columns (optional)
- A list of column names containing numeric data that should be converted into bins. If not provided via input, the list must be set using the 'Input Columns' option.
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame, ArrowTable |
| columns | List |
You can check the list of supported types here: Available Type Hints.
Outputs
- result
- The resulting tabular dataset containing the original data plus the new binned column(s). The specific type (DataFrame or ArrowTable) depends on the selected 'Output Format' option.
Outputs Types
| Output | Types |
|---|---|
| result | DataFrame, ArrowTable |
You can check the list of supported types here: Available Type Hints.
Options
The Bin Numbers brick contains some changeable options:
- Input Columns
- Specifies the numeric column(s) in the input data that should be binned. This is an alternative way to provide the `columns` input.
- Binning Mode
- Determines the method for creating bins: `fixed_size` (equal width) or `custom_intervals` (user-defined ranges). Defaults to `fixed_size`.
- Bin Width
- Used only in `fixed_size` mode. Defines the uniform size of each bin interval (e.g., 10).
- Minimum Value
- Used in `fixed_size` mode. Defines the starting point for the bin calculation.
- Use Minimum Value
- If enabled, values below the defined `Minimum Value` are grouped into a specific labeled bin (e.g., '< 0').
- Maximum Value
- Used in `fixed_size` mode. Defines the maximum value before grouping into the upper bound label.
- Use Maximum Value
- If enabled, values equal to or above the defined `Maximum Value` are grouped into a specific labeled bin (e.g., '>= 100').
- Custom Intervals
- Used only in `custom_intervals` mode. A list of comma-separated string pairs defining `lower,upper` bounds (e.g., "0,10"). Bounds can be left empty to represent infinity (e.g., "10," for `[10, +inf)` or ",10" for `[-inf, 10)`).
- Output Suffix
- A suffix (default: `_binned`) added to the name of the new binned column if `Bin In Place` is disabled.
- Bin In Place
- If enabled, the original numeric column is overwritten by the new categorical bin column. If disabled, a new column is created.
- Safe Mode
- If enabled, errors related to missing columns or non-numeric columns are logged as warnings and skipped, allowing the operation to continue. If disabled, these errors raise an exception.
- Output Format
- Specifies the desired format for the returned data (`pandas`, `polars`, or `arrow`). Defaults to `pandas`.
- Verbose
- Enables detailed logging information during execution.
import logging
import duckdb
import pandas as pd
import polars as pl
import pyarrow as pa
from coded_flows.types import Union, DataFrame, ArrowTable, List, Str, Float, Bool
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def _sanitize_identifier(identifier):
"""
Sanitize SQL identifier by escaping special characters.
Handles double quotes and other problematic characters.
"""
return identifier.replace('"', '""')
def _build_fixed_case_statement(
    sanitized_col,
    use_minimum,
    minimum_value,
    use_maximum,
    maximum_value,
    offset,
    bin_width,
):
    """Build CASE statement for fixed-size binning.

    Produces a SQL ``CASE ... END`` expression that maps a numeric column
    into string labels of the form ``'lower:upper'``, with optional
    catch-all labels for values outside the clamp bounds.

    Args:
        sanitized_col: Column name already escaped for double-quoting
            (see ``_sanitize_identifier``).
        use_minimum: When truthy, values below ``minimum_value`` get a
            ``'< min'`` catch-all label instead of a range label.
        minimum_value: Lower clamp used in the ``'< min'`` branch.
        use_maximum: When truthy, values at or above ``maximum_value`` get a
            ``'>= max'`` catch-all label.
        maximum_value: Upper clamp used in the ``'>= max'`` branch.
        offset: Origin subtracted before FLOOR-division so bin edges align
            (caller passes ``minimum_value`` when ``use_minimum`` is set,
            otherwise 0 — TODO confirm against caller).
        bin_width: Uniform width of each bin.

    Returns:
        str: The complete ``CASE ... END`` SQL expression.
    """
    case_parts = []
    # Catch-all clamp labels are emitted first so they take precedence
    # over the generic in-range branch (CASE evaluates WHENs in order).
    if use_minimum:
        case_parts.append(
            f"""WHEN "{sanitized_col}" < {minimum_value} THEN '< {minimum_value}'"""
        )
    if use_maximum:
        case_parts.append(
            f"""WHEN "{sanitized_col}" >= {maximum_value} THEN '>= {maximum_value}'"""
        )
    # The four branches below differ only in the WHEN condition; the label
    # expression is identical: FLOOR((v - offset) / width) * width + offset
    # gives the bin's lower edge, and adding width gives the upper edge,
    # concatenated as 'lower:upper'.
    if use_minimum and use_maximum:
        case_parts.append(
            f"""\n WHEN "{sanitized_col}" >= {minimum_value} AND "{sanitized_col}" < {maximum_value} THEN\n CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {offset} AS VARCHAR) ||\n ':' ||\n CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {bin_width} + {offset} AS VARCHAR)\n """
        )
    elif use_minimum:
        case_parts.append(
            f"""\n WHEN "{sanitized_col}" >= {minimum_value} THEN\n CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {offset} AS VARCHAR) ||\n ':' ||\n CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {bin_width} + {offset} AS VARCHAR)\n """
        )
    elif use_maximum:
        case_parts.append(
            f"""\n WHEN "{sanitized_col}" < {maximum_value} THEN\n CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {offset} AS VARCHAR) ||\n ':' ||\n CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {bin_width} + {offset} AS VARCHAR)\n """
        )
    else:
        case_parts.append(
            f"""\n WHEN "{sanitized_col}" IS NOT NULL THEN\n CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {offset} AS VARCHAR) ||\n ':' ||\n CAST(FLOOR(("{sanitized_col}" - {offset}) / {bin_width}) * {bin_width} + {bin_width} + {offset} AS VARCHAR)\n """
        )
    # Anything unmatched (in particular NULL input) maps to NULL.
    case_parts.append("ELSE NULL")
    return "CASE " + " ".join(case_parts) + " END"
def _build_custom_case_statement(sanitized_col, intervals):
"""Build CASE statement for custom intervals"""
case_parts = []
for interval in intervals:
lower = interval["lower"]
upper = interval["upper"]
lower_label = interval["lower_label"]
upper_label = interval["upper_label"]
if lower is None and upper is None:
condition = f'"{sanitized_col}" IS NOT NULL'
elif lower is None:
condition = f'"{sanitized_col}" < {upper}'
elif upper is None:
condition = f'"{sanitized_col}" >= {lower}'
else:
condition = f'"{sanitized_col}" >= {lower} AND "{sanitized_col}" < {upper}'
label = f"{lower_label}:{upper_label}"
case_parts.append(f"WHEN {condition} THEN '{label}'")
case_parts.append("ELSE NULL")
return "CASE " + " ".join(case_parts) + " END"
def bin_numbers(
    data: Union[DataFrame, ArrowTable], columns: List = None, options=None
) -> Union[DataFrame, ArrowTable]:
    """Group numeric columns of a tabular dataset into labeled bins.

    Two modes are supported via ``options["binning_mode"]``:

    * ``fixed_size``: uniform bins of ``bin_width`` starting at an offset
      (``minimum_value`` when ``use_minimum`` is set, else 0), with optional
      catch-all labels ``'< min'`` / ``'>= max'``.
    * ``custom_intervals``: user-supplied ``"lower,upper"`` string pairs;
      an empty bound stands for -inf / +inf.

    Binned values are string labels such as ``"10:20"``. Depending on
    ``in_place`` the original column is overwritten or a new
    ``<col><output_suffix>`` column is appended.

    Args:
        data: pandas DataFrame, Polars DataFrame, or Arrow Table.
        columns: Names of numeric columns to bin; falls back to
            ``options["columns"]`` when not provided.
        options: Optional settings dict (binning_mode, bin_width,
            minimum_value, use_minimum, maximum_value, use_maximum,
            custom_intervals, output_suffix, in_place, safe_mode,
            output_format, verbose).

    Returns:
        The transformed table in the requested ``output_format``
        (``pandas`` | ``polars`` | ``arrow``). In safe mode with no valid
        columns, the original ``data`` object is returned unchanged.

    Raises:
        ValueError: For invalid column specs, unsupported data/output
            types, an unknown binning mode, or malformed custom intervals
            (column-level problems are downgraded to warnings when
            ``safe_mode`` is enabled).
    """
    brick_display_name = "Bin Numbers"
    options = options or {}
    verbose = options.get("verbose", True)
    columns = columns or options.get("columns", [])
    binning_mode = options.get("binning_mode", "fixed_size")
    bin_width = options.get("bin_width", 10)
    minimum_value = options.get("minimum_value", 0)
    use_minimum = options.get("use_minimum", False)
    maximum_value = options.get("maximum_value", 100)
    use_maximum = options.get("use_maximum", False)
    custom_intervals = options.get("custom_intervals", ["0,10", "10,20", "20,30"])
    output_suffix = options.get("output_suffix", "_binned")
    in_place = options.get("in_place", False)
    safe_mode = options.get("safe_mode", False)
    output_format = options.get("output_format", "pandas")
    result = None
    # BUGFIX: the original combined these conditions with `and`, which let
    # non-list values (tuples, strings, even non-iterables) slip past this
    # guard. `or` with short-circuiting rejects non-lists before iterating,
    # so the redundant second check inside the try block is no longer needed.
    if not isinstance(columns, list) or not all(
        isinstance(c, str) for c in columns
    ):
        verbose and logger.error(
            f"[{brick_display_name}] Invalid columns format! Expected a list."
        )
        raise ValueError("Columns must be provided as a list!")
    try:
        verbose and logger.info(f"[{brick_display_name}] Starting binning operation.")
        if not columns:
            verbose and logger.error(
                f"[{brick_display_name}] At least one input column must be specified."
            )
            raise ValueError("At least one input column must be specified")
        # Detect the input container so the right conversion is applied and
        # unsupported inputs fail early with a clear message.
        if isinstance(data, pd.DataFrame):
            data_type = "pandas"
        elif isinstance(data, pl.DataFrame):
            data_type = "polars"
        elif isinstance(data, (pa.Table, pa.lib.Table)):
            data_type = "arrow"
        else:
            data_type = None
        if data_type is None:
            verbose and logger.error(
                f"[{brick_display_name}] Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table."
            )
            raise ValueError(
                "Input data must be a pandas DataFrame, Polars DataFrame, or Arrow Table"
            )
        verbose and logger.info(
            f"[{brick_display_name}] Detected input format: {data_type}."
        )
        # DuckDB can query all three container types directly once registered.
        conn = duckdb.connect(":memory:")
        conn.register("input_table", data)
        column_info = conn.execute("DESCRIBE input_table").fetchall()
        all_columns = {col[0]: col[1] for col in column_info}
        verbose and logger.info(
            f"[{brick_display_name}] Processing {len(columns)} column(s) for binning."
        )
        # Substring match against DuckDB type names (covers e.g. DECIMAL(10,2)).
        numeric_types = [
            "INTEGER",
            "BIGINT",
            "SMALLINT",
            "TINYINT",
            "DOUBLE",
            "FLOAT",
            "REAL",
            "DECIMAL",
            "NUMERIC",
            "HUGEINT",
        ]
        valid_columns = []
        invalid_columns = []
        for col in columns:
            if col not in all_columns:
                if safe_mode:
                    verbose and logger.warning(
                        f"[{brick_display_name}] Column '{col}' not found in data. Skipping."
                    )
                    invalid_columns.append(col)
                    continue
                else:
                    verbose and logger.error(
                        f"[{brick_display_name}] Column '{col}' not found in data."
                    )
                    conn.close()
                    raise ValueError(f"Column '{col}' not found in data")
            column_type = all_columns[col].upper()
            is_numeric = any(num_type in column_type for num_type in numeric_types)
            if not is_numeric:
                error_message = f"Column '{col}' is not numeric (type: {column_type})"
                if safe_mode:
                    verbose and logger.warning(
                        f"[{brick_display_name}] {error_message}. Skipping."
                    )
                    invalid_columns.append(col)
                    continue
                else:
                    verbose and logger.error(f"[{brick_display_name}] {error_message}.")
                    conn.close()
                    raise ValueError(error_message)
            valid_columns.append(col)
            verbose and logger.info(
                f"[{brick_display_name}] Column '{col}' is numeric (type: {column_type})."
            )
        if not valid_columns:
            if safe_mode:
                verbose and logger.warning(
                    f"[{brick_display_name}] No valid numeric columns to bin. Returning original data."
                )
                conn.close()
                # Best-effort: hand back the untouched input (note: NOT
                # converted to output_format, preserving prior behavior).
                result = data
            else:
                verbose and logger.error(
                    f"[{brick_display_name}] No valid numeric columns to bin."
                )
                conn.close()
                raise ValueError("No valid numeric columns to bin")
        else:
            verbose and logger.info(
                f"[{brick_display_name}] {len(valid_columns)} valid column(s) will be binned."
            )
            if binning_mode == "fixed_size":
                verbose and logger.info(
                    f"[{brick_display_name}] Using fixed-size binning with width={bin_width}."
                )
                # Bins are anchored at minimum_value only when the user opted in.
                offset = minimum_value if use_minimum else 0
                verbose and logger.info(f"[{brick_display_name}] Bin offset: {offset}.")
                if use_minimum:
                    verbose and logger.info(
                        f"[{brick_display_name}] Values below {minimum_value} will be labeled as '< {minimum_value}'."
                    )
                if use_maximum:
                    verbose and logger.info(
                        f"[{brick_display_name}] Values above or equal to {maximum_value} will be labeled as '>= {maximum_value}'."
                    )
            elif binning_mode == "custom_intervals":
                verbose and logger.info(
                    f"[{brick_display_name}] Using custom intervals binning."
                )
                try:
                    if not isinstance(custom_intervals, list):
                        verbose and logger.error(
                            f"[{brick_display_name}] Custom intervals must be a list."
                        )
                        conn.close()
                        raise ValueError("Custom intervals must be a list")
                    intervals = []
                    for interval_str in custom_intervals:
                        interval_str = interval_str.strip()
                        if not interval_str:
                            continue
                        parts = [p.strip() for p in interval_str.split(",")]
                        if len(parts) != 2:
                            verbose and logger.error(
                                f"[{brick_display_name}] Invalid interval format: '{interval_str}'. Expected 'lower,upper'."
                            )
                            conn.close()
                            raise ValueError(
                                f"Invalid interval format: '{interval_str}'. Expected 'lower,upper'"
                            )
                        lower = parts[0]
                        upper = parts[1]
                        # An empty bound means the interval is open on that side.
                        if lower == "":
                            lower_val = None
                            lower_label = "-inf"
                        else:
                            try:
                                lower_val = float(lower)
                                lower_label = lower
                            except ValueError:
                                verbose and logger.error(
                                    f"[{brick_display_name}] Invalid lower bound: '{lower}'"
                                )
                                conn.close()
                                raise ValueError(f"Invalid lower bound: '{lower}'")
                        if upper == "":
                            upper_val = None
                            upper_label = "+inf"
                        else:
                            try:
                                upper_val = float(upper)
                                upper_label = upper
                            except ValueError:
                                verbose and logger.error(
                                    f"[{brick_display_name}] Invalid upper bound: '{upper}'"
                                )
                                conn.close()
                                raise ValueError(f"Invalid upper bound: '{upper}'")
                        if lower_val is not None and upper_val is not None:
                            if lower_val >= upper_val:
                                verbose and logger.error(
                                    f"[{brick_display_name}] Invalid interval: lower bound ({lower_val}) must be less than upper bound ({upper_val})."
                                )
                                conn.close()
                                raise ValueError(
                                    f"Invalid interval: lower bound ({lower_val}) must be less than upper bound ({upper_val})"
                                )
                        intervals.append(
                            {
                                "lower": lower_val,
                                "upper": upper_val,
                                "lower_label": lower_label,
                                "upper_label": upper_label,
                            }
                        )
                    if not intervals:
                        verbose and logger.error(
                            f"[{brick_display_name}] No valid intervals specified."
                        )
                        conn.close()
                        raise ValueError("No valid intervals specified")
                    verbose and logger.info(
                        f"[{brick_display_name}] Parsed {len(intervals)} custom intervals."
                    )
                except Exception:
                    verbose and logger.error(
                        f"[{brick_display_name}] Error parsing custom intervals."
                    )
                    conn.close()
                    raise
            else:
                verbose and logger.error(
                    f"[{brick_display_name}] Unknown binning mode: {binning_mode}"
                )
                conn.close()
                raise ValueError(f"Unknown binning mode: {binning_mode}")

            def _case_statement_for(sanitized_col):
                # Single construction point so the in-place and suffixed
                # paths cannot drift apart.
                # BUGFIX: the original passed `use_maximum` for BOTH the
                # use_minimum and use_maximum parameters of
                # _build_fixed_case_statement, silently ignoring the
                # 'Use Minimum Value' option.
                if binning_mode == "fixed_size":
                    return _build_fixed_case_statement(
                        sanitized_col,
                        use_minimum,
                        minimum_value,
                        use_maximum,
                        maximum_value,
                        offset,
                        bin_width,
                    )
                return _build_custom_case_statement(sanitized_col, intervals)

            select_parts = []
            # Preserve the original column order; binned replacements (in-place)
            # slot into the same position, suffixed columns are appended after.
            for col_name in all_columns.keys():
                sanitized_col = _sanitize_identifier(col_name)
                if col_name in valid_columns and in_place:
                    select_parts.append(
                        f'{_case_statement_for(sanitized_col)} AS "{sanitized_col}"'
                    )
                else:
                    select_parts.append(f'"{sanitized_col}"')
            if not in_place:
                for col_name in valid_columns:
                    sanitized_col = _sanitize_identifier(col_name)
                    output_col_name = f"{col_name}{output_suffix}"
                    sanitized_output_col = _sanitize_identifier(output_col_name)
                    select_parts.append(
                        f'{_case_statement_for(sanitized_col)} AS "{sanitized_output_col}"'
                    )
            select_clause = ", ".join(select_parts)
            query = f"SELECT {select_clause} FROM input_table"
            verbose and logger.info(f"[{brick_display_name}] Executing binning query.")
            if output_format == "pandas":
                result = conn.execute(query).df()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to pandas DataFrame."
                )
            elif output_format == "polars":
                result = conn.execute(query).pl()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Polars DataFrame."
                )
            elif output_format == "arrow":
                result = conn.execute(query).fetch_arrow_table()
                verbose and logger.info(
                    f"[{brick_display_name}] Converted result to Arrow Table."
                )
            else:
                verbose and logger.error(
                    f"[{brick_display_name}] Unsupported output format: {output_format}"
                )
                conn.close()
                raise ValueError(f"Unsupported output format: {output_format}")
            conn.close()
        result_rows = len(result)
        verbose and logger.info(
            f"[{brick_display_name}] Binning operation completed successfully. Binned {len(valid_columns)} column(s). Returned {result_rows} rows."
        )
    except Exception:
        verbose and logger.error(
            f"[{brick_display_name}] Error during binning operation."
        )
        raise
    return result
Brick Info
- pandas
- polars[pyarrow]
- duckdb
- pyarrow