Reg. Decision Tree
Train a Decision Tree regression model.
Reg. Decision Tree
Processing
This brick trains a Decision Tree Regression model to predict numerical values based on your data. Think of a decision tree as a flowchart: it asks a series of yes/no questions about your data features (e.g., "Is the value greater than 50?") to split the data into smaller groups, eventually arriving at a prediction.
It is a versatile algorithm that can capture non-linear relationships without requiring heavy data preprocessing. This brick handles the entire workflow: it automatically splits your data into training and testing sets, can perform Cross-Validation to ensure reliability, and includes an "Auto-Tune" mode (Hyperparameter Optimization) to find the best possible configuration for your specific dataset.
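As a rough illustration of what the brick automates, here is a minimal scikit-learn sketch of the same idea (the data and column names are made up for the example; the brick additionally handles cross-validation, Auto-Tune, SHAP, and caching for you):

```python
# Minimal sketch (hypothetical data): fit a decision tree regressor on a
# train/test split and report one error metric, as this brick does internally.
import pandas as pd
from sklearn.tree import DecisionTreeRegressor
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_absolute_error

df = pd.DataFrame({
    "Age": [5, 12, 3, 8, 20, 1, 7, 14],
    "Temperature": [21.0, 18.5, 25.3, 19.9, 15.2, 30.1, 22.4, 17.8],
    "Price": [100, 80, 130, 95, 60, 150, 105, 75],
})
X, y = df[["Age", "Temperature"]], df["Price"]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=42)
model = DecisionTreeRegressor(max_depth=3, random_state=42).fit(X_train, y_train)
print("MAE:", mean_absolute_error(y_test, model.predict(X_test)))
```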
Inputs
- X
- The features dataset containing the data points used to make predictions. This should be a table (DataFrame) where columns are variables (like "Age", "Temperature", "Price") and rows are individual observations.
- y
- The target values you want to predict. This is a list or series of numbers corresponding to the rows in X.
Inputs Types
| Input | Types |
|---|---|
| X | DataFrame |
| y | DataSeries, NDArray, List |
You can check the list of supported types here: Available Type Hints.
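For illustration, any of the following forms are valid for the two inputs (toy values, assuming numeric-only features):

```python
import numpy as np
import pandas as pd

# X must be a DataFrame of numeric (or boolean) columns
X = pd.DataFrame({"Age": [5, 12, 3], "Temperature": [21.0, 18.5, 25.3]})

# y can be a series, an array, or a plain list of numbers aligned with X's rows
y_series = pd.Series([100, 80, 130])
y_array = np.array([100, 80, 130])
y_list = [100, 80, 130]
```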
Outputs
- Model
- The trained decision tree model (Scikit-Learn object). You can pass this to other bricks to make new predictions or save it for later use.
- SHAP
- A SHAP explainer object used for interpreting the model's decisions. It helps explain why the model made a specific prediction.
- Metrics
- A summary of the model's performance on the test set, including forecast accuracy, regression error metrics (MAE, MSE, RMSE, MAPE, WAPE), and the R2 score.
- CV Metrics
- The results from Cross-Validation, showing the mean performance and standard deviation across multiple data splits. This provides a more robust measure of reliability.
- Features Importance
- A table ranking which columns in your X input had the most influence on the final prediction.
- Prediction Set
- A dataframe combining your test features, the actual target values (y_true), and the values predicted by the model (y_pred). This allows you to compare the results row-by-row.
- HPO Trials
- A detailed log of every attempt made during Hyperparameter Optimization, showing which settings were tried and how well they performed.
- HPO Best
- A dictionary containing the single best set of parameters found during the optimization process.
The Features_Importance output contains the following specific data fields:
- feature: The name of the column from your input data.
- importance: A numeric score indicating how useful this feature was for making predictions (higher is better).
The CV_Metrics output contains the following specific data fields:
- Metric: The name of the performance metric (e.g., "Mean Absolute Error").
- Mean: The average score across all cross-validation folds.
- Std: The standard deviation (how much the score varied between folds).
The Prediction_Set output contains the following specific data fields:
- feature_{name}: All original columns from your test dataset.
- y_true: The actual target value from the test set.
- y_pred: The value predicted by the decision tree model.
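For illustration, these outputs can be inspected downstream like this (prediction_set, features_importance, and cv_metrics are placeholder variable names for the brick's outputs):

```python
# Compare actual vs. predicted values row-by-row and compute residuals
prediction_set["residual"] = prediction_set["y_true"] - prediction_set["y_pred"]
print(prediction_set[["y_true", "y_pred", "residual"]].head())

# Top 5 most influential features (already sorted by importance)
print(features_importance.head(5))

# Mean and spread of each metric across the cross-validation folds
print(cv_metrics.set_index("Metric")[["Mean", "Std"]])
```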
Outputs Types
| Output | Types |
|---|---|
| Model | Any |
| SHAP | Any |
| Metrics | DataFrame, Dict |
| CV Metrics | DataFrame |
| Features Importance | DataFrame |
| Prediction Set | DataFrame |
| HPO Trials | DataFrame |
| HPO Best | Dict |
You can check the list of supported types here: Available Type Hints.
Options
The Reg. Decision Tree brick exposes the following configurable options; the code sketches after this list show roughly how the main options translate into scikit-learn, cross-validation, Optuna, and SHAP calls:
- Criterion
- The function used to measure the quality of a split (how the tree decides where to branch).
- Squared Error: The standard metric for regression (Variance reduction). Good for general use.
- Friedman MSE: A variation of squared error that can provide better improvements for some datasets.
- Absolute Error: Uses mean absolute error. More robust to outliers but slower to calculate.
- Poisson: Uses reduction in Poisson deviance; specific to count data.
- Split Strategy
- Controls how the tree chooses the split at each node.
- Best: Evaluates all valid splits to find the absolute best one.
- Random: Chooses the best random split. This is faster and can prevent overfitting.
- Max Depth (0 = Unlimited)
- The maximum "height" of the tree. A deeper tree can model more complex relationships but is more likely to memorize noise (overfit). Setting this to 0 allows the tree to grow until all leaves are pure.
- Feature Sampling
- The number of features (columns) to consider when looking for the best split.
- Automatic 30%: Considers 30% of features at each split.
- Automatic 50%: Considers 50% of features.
- Square root: Considers the square root of the total number of features.
- Logarithm: Considers the log2 of the total number of features.
- None: Considers every single feature at every split (slowest but most thorough).
- Min Samples per Leaf
- The minimum number of data points required to be in a leaf node (the end of a branch). Increasing this number smooths the model, making it less sensitive to noise.
- Auto Split Data
- If enabled, the brick automatically separates your data into training and testing sets. If disabled, it assumes the inputs are already prepared or uses the manual percentage setting.
- Test/Validation Set %
- The percentage of data to hold back for testing. For example, 15 means 15% of the data is used to test the model, and 85% is used to train it.
- Shuffle Split
- Whether to shuffle the data randomly before splitting. This ensures the training and test sets are representative of the whole dataset.
- Retrain On Full Data
- If enabled, after evaluating the model on the test set, the brick re-trains the model on 100% of the available data. This produces the best possible model for future deployment.
- Enable Cross-Validation
- If enabled, the model is trained and tested multiple times on different "folds" of the data to get a stable performance estimate.
- Number of CV Folds
- The number of times to split the data during cross-validation (e.g., 5 folds means 5 rounds of training/testing).
- Hyperparameter Optim.
- If enabled, the brick runs an automated experiment (using Optuna) to find the best combination of settings (Depth, Criterion, etc.) for your specific data.
- Optimization Metric
- The target score to optimize for.
- Forecast Accuracy: Maximizes the accuracy percentage.
- RMSE/MSE: Minimizes the squared error (standard for regression).
- MAE: Minimizes the absolute error.
- R2 Score: Maximizes the "goodness of fit."
- Optimization Method
- The algorithm used to search for the best settings.
- Tree-structured Parzen: A Bayesian optimization method that models good vs bad parameter regions using probability distributions and prioritizes sampling where success is statistically more likely.
- Gaussian Process: Uses a probabilistic regression model (Gaussian Process) to estimate performance uncertainty and selects new trials using acquisition functions.
- CMA-ES: An evolutionary strategy that adapts the covariance matrix of a multivariate normal distribution to efficiently search complex, non-linear, non-convex spaces.
- Random Sobol Search: Uses low-discrepancy quasi-random sequences to ensure uniform coverage of the parameter space, avoiding clustering and gaps.
- Random Search: Uniform random sampling of parameter configurations without learning or feedback between iterations.
- Optimization Iterations
- The number of different settings combinations to try. More iterations take longer but increase the chance of finding the best model.
- Metrics as
- Choose the format of the Metrics output (DataFrame or dictionary).
- SHAP Explainer
- If enabled, generates a SHAP object for interpreting the model.
- SHAP Sampler
- Use a background dataset sample for SHAP calculations (speeds up calculation for large datasets).
- SHAP Feature Perturbation
- How SHAP handles correlated features.
- Interventional: (Recommended) Breaks dependencies between features.
- Tree Path Dependent: Follows the tree structure; faster but can be biased if features are highly correlated.
- Number of Jobs
- The number of CPU cores to use. "All" uses all available power.
- Random State
- A seed number to ensure results are reproducible. Using the same seed with the same data will yield the same result.
- Brick Caching
- Saves the results to a temporary cache to speed up re-runs of the workflow.
- Verbose Logging
- Enables detailed progress logs in the console.
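To make the tree-shape options concrete, here is roughly how Criterion, Split Strategy, Max Depth, Feature Sampling, Min Samples per Leaf, and Random State translate into scikit-learn parameters (a sketch based on the brick source below; the brick performs this mapping for you):

```python
from sklearn.tree import DecisionTreeRegressor

model = DecisionTreeRegressor(
    criterion="squared_error",  # Criterion: "Squared Error"
    splitter="best",            # Split Strategy: "Best"
    max_depth=None,             # Max Depth = 0 means unlimited
    max_features="sqrt",        # Feature Sampling: "Square root"
    min_samples_leaf=1,         # Min Samples per Leaf
    random_state=42,            # Random State (reproducibility)
)
```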
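The splitting and cross-validation options correspond, roughly, to the following scikit-learn calls (a sketch assuming X, y, and model from the snippets above; the brick chooses the test ratio automatically unless Auto Split Data is disabled):

```python
from sklearn.model_selection import train_test_split, KFold, cross_validate

# Test/Validation Set % = 15, Shuffle Split enabled
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.15, shuffle=True, random_state=42
)

# Enable Cross-Validation with Number of CV Folds = 5
cv = KFold(n_splits=5, shuffle=True, random_state=42)
scores = cross_validate(model, X_train, y_train, cv=cv,
                        scoring={"MAE": "neg_mean_absolute_error", "R2": "r2"})
print("CV MAE:", -scores["test_MAE"].mean(), "+/-", scores["test_MAE"].std())

# Retrain On Full Data: refit on everything after evaluation
model.fit(X, y)
```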
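The Auto-Tune options drive an Optuna search similar to this condensed sketch (parameter ranges follow the brick source below; X and y are assumed from the earlier snippets):

```python
import optuna
from sklearn.tree import DecisionTreeRegressor
from sklearn.model_selection import cross_val_score

def objective(trial):
    params = {
        "criterion": trial.suggest_categorical("criterion", ["squared_error", "friedman_mse", "absolute_error"]),
        "splitter": trial.suggest_categorical("splitter", ["best", "random"]),
        "max_depth": trial.suggest_int("max_depth", 10, 120, log=True),
        "min_samples_leaf": trial.suggest_int("min_samples_leaf", 1, 32),
    }
    model = DecisionTreeRegressor(random_state=42, **params)
    # "RMSE/MSE" metric: maximize the negated error returned by scikit-learn
    return cross_val_score(model, X, y, cv=5, scoring="neg_root_mean_squared_error").mean()

study = optuna.create_study(direction="maximize", sampler=optuna.samplers.TPESampler(seed=42))
study.optimize(objective, n_trials=50)   # Optimization Iterations
print(study.best_params)                 # corresponds to the HPO Best output
```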
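When SHAP Explainer is enabled, the returned object can be used as in this sketch (X_test is assumed from the split example above; the brick builds the explainer for you, optionally with a sampled background dataset):

```python
import shap

# Tree Path Dependent perturbation needs no background data;
# Interventional passes a (possibly sampled) background dataset instead.
explainer = shap.TreeExplainer(model, feature_perturbation="tree_path_dependent")
shap_values = explainer.shap_values(X_test)   # per-feature contribution to each prediction
shap.summary_plot(shap_values, X_test)        # example downstream visualisation
```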
import logging
import warnings
import shap
import json
import xxhash
import hashlib
import tempfile
import sklearn
import scipy
import joblib
import numpy as np
import pandas as pd
import polars as pl
from pathlib import Path
from scipy import sparse
from optuna.samplers import (
TPESampler,
RandomSampler,
GPSampler,
CmaEsSampler,
QMCSampler,
)
import optuna
from optuna import Study
from optuna.trial import FrozenTrial
from optuna.pruners import HyperbandPruner
from optuna import create_study
from sklearn.tree import DecisionTreeRegressor
from sklearn.model_selection import train_test_split, cross_validate, KFold
from sklearn.metrics import (
mean_absolute_error,
mean_squared_error,
r2_score,
root_mean_squared_error,
mean_absolute_percentage_error,
make_scorer,
)
from dataclasses import dataclass
from datetime import datetime
from coded_flows.types import (
Union,
Dict,
List,
Tuple,
NDArray,
DataFrame,
DataSeries,
Any,
)
from coded_flows.utils import CodedFlowsLogger
logger = CodedFlowsLogger(name="Reg. Decision Tree", level=logging.INFO)
optuna.logging.set_verbosity(optuna.logging.ERROR)
warnings.filterwarnings("ignore", category=optuna.exceptions.ExperimentalWarning)
METRICS_DICT = {
"Forecast Accuracy": "fa",
"Weighted Absolute Percentage Error (WAPE)": "wape",
"Mean Absolute Error (MAE)": "mae",
"Mean Squared Error (MSE)": "mse",
"Root Mean Squared Error (RMSE)": "rmse",
"R2 Score": "r2",
"Mean Absolute Percentage Error (MAPE)": "mape",
}
METRICS_OPT = {
"fa": "maximize",
"wape": "minimize",
"mae": "minimize",
"mse": "minimize",
"rmse": "minimize",
"r2": "maximize",
"mape": "minimize",
}
DataType = Union[
pd.DataFrame, pl.DataFrame, np.ndarray, sparse.spmatrix, pd.Series, pl.Series
]
@dataclass
class _DatasetFingerprint:
"""Lightweight fingerprint of a dataset."""
hash: str
shape: tuple
computed_at: str
data_type: str
method: str
class _UniversalDatasetHasher:
"""
High-performance dataset hasher optimizing for zero-copy operations
and native backend execution (C/Rust).
"""
def __init__(
self,
data_size: int,
method: str = "auto",
sample_size: int = 100000,
verbose: bool = False,
):
self.method = method
self.sample_size = sample_size
self.data_size = data_size
self.verbose = verbose
def hash_data(self, data: DataType) -> _DatasetFingerprint:
"""
Main entry point: hash any supported data format.
Auto-detects format and applies optimal strategy.
"""
if isinstance(data, pd.DataFrame):
return self._hash_pandas(data)
elif isinstance(data, pl.DataFrame):
return self._hash_polars(data)
elif isinstance(data, pd.Series):
return self._hash_pandas_series(data)
elif isinstance(data, pl.Series):
return self._hash_polars_series(data)
elif isinstance(data, np.ndarray):
return self._hash_numpy(data)
elif sparse.issparse(data):
return self._hash_sparse(data)
else:
raise TypeError(f"Unsupported data type: {type(data)}")
def _hash_pandas(self, df: pd.DataFrame) -> _DatasetFingerprint:
"""
Optimized Pandas hashing using pd.util.hash_pandas_object.
Avoids object-to-string conversion overhead.
"""
method = self._determine_method(self.data_size, self.method)
self.verbose and logger.info(
f"Hashing Pandas: {self.data_size:,} rows - {method}"
)
target_df = df
if method == "sampled":
target_df = self._get_pandas_sample(df)
hasher = xxhash.xxh128()
self._hash_schema(
hasher,
{
"columns": df.columns.tolist(),
"dtypes": {k: str(v) for (k, v) in df.dtypes.items()},
"shape": df.shape,
},
)
try:
row_hashes = pd.util.hash_pandas_object(target_df, index=False)
hasher.update(memoryview(row_hashes.values))
except Exception as e:
self.verbose and logger.warning(
f"Fast hash failed, falling back to slow hash: {e}"
)
self._hash_pandas_fallback(hasher, target_df)
return _DatasetFingerprint(
hash=hasher.hexdigest(),
shape=df.shape,
computed_at=datetime.now().isoformat(),
data_type="pandas",
method=method,
)
def _get_pandas_sample(self, df: pd.DataFrame) -> pd.DataFrame:
"""Deterministic slicing for sampling (Zero randomness)."""
if self.data_size <= self.sample_size:
return df
chunk = self.sample_size // 3
head = df.iloc[:chunk]
mid_idx = self.data_size // 2
mid = df.iloc[mid_idx : mid_idx + chunk]
tail = df.iloc[-chunk:]
return pd.concat([head, mid, tail])
def _hash_pandas_fallback(self, hasher, df: pd.DataFrame):
"""Legacy fallback for complex object types."""
for col in df.columns:
val = df[col].astype(str).values
hasher.update(val.astype(np.bytes_).tobytes())
def _hash_polars(self, df: pl.DataFrame) -> _DatasetFingerprint:
"""
Optimized Polars hashing using native Rust execution.
"""
method = self._determine_method(self.data_size, self.method)
self.verbose and logger.info(
f"Hashing Polars: {self.data_size:,} rows - {method}"
)
target_df = df
if method == "sampled" and self.data_size > self.sample_size:
indices = self._get_sample_indices(self.data_size, self.sample_size)
target_df = df.gather(indices)
hasher = xxhash.xxh128()
self._hash_schema(
hasher,
{
"columns": df.columns,
"dtypes": [str(t) for t in df.dtypes],
"shape": df.shape,
},
)
row_hashes = target_df.hash_rows()
hasher.update(memoryview(row_hashes.to_numpy()))
return _DatasetFingerprint(
hash=hasher.hexdigest(),
shape=df.shape,
computed_at=datetime.now().isoformat(),
data_type="polars",
method=method,
)
def _hash_pandas_series(self, series: pd.Series) -> _DatasetFingerprint:
"""Hash Pandas Series using the fastest vectorized method."""
self.verbose and logger.info(f"Hashing Pandas Series: {self.data_size:,} rows")
hasher = xxhash.xxh128()
self._hash_schema(
hasher,
{
"name": series.name if series.name else "None",
"dtype": str(series.dtype),
"shape": series.shape,
},
)
try:
row_hashes = pd.util.hash_pandas_object(series, index=False)
hasher.update(memoryview(row_hashes.values))
except Exception as e:
self.verbose and logger.warning(f"Series hash failed, falling back: {e}")
hasher.update(memoryview(series.astype(str).values.tobytes()))
return _DatasetFingerprint(
hash=hasher.hexdigest(),
shape=series.shape,
computed_at=datetime.now().isoformat(),
data_type="pandas_series",
method="full",
)
def _hash_polars_series(self, series: pl.Series) -> _DatasetFingerprint:
"""Hash Polars Series using native Polars expressions."""
self.verbose and logger.info(f"Hashing Polars Series: {self.data_size:,} rows")
hasher = xxhash.xxh128()
self._hash_schema(
hasher,
{"name": series.name, "dtype": str(series.dtype), "shape": series.shape},
)
try:
row_hashes = series.hash()
hasher.update(memoryview(row_hashes.to_numpy()))
except Exception as e:
self.verbose and logger.warning(
f"Polars series native hash failed. Falling back."
)
hasher.update(str(series.to_list()).encode())
return _DatasetFingerprint(
hash=hasher.hexdigest(),
shape=series.shape,
computed_at=datetime.now().isoformat(),
data_type="polars_series",
method="full",
)
def _hash_numpy(self, arr: np.ndarray) -> _DatasetFingerprint:
"""
Optimized NumPy hashing using Buffer Protocol (Zero-Copy).
"""
hasher = xxhash.xxh128()
self._hash_schema(
hasher,
{"shape": arr.shape, "dtype": str(arr.dtype), "strides": arr.strides},
)
if arr.flags["C_CONTIGUOUS"] or arr.flags["F_CONTIGUOUS"]:
hasher.update(memoryview(arr))
else:
hasher.update(memoryview(np.ascontiguousarray(arr)))
return _DatasetFingerprint(
hash=hasher.hexdigest(),
shape=arr.shape,
computed_at=datetime.now().isoformat(),
data_type="numpy",
method="full",
)
def _hash_sparse(self, matrix: sparse.spmatrix) -> _DatasetFingerprint:
"""
Optimized sparse hashing. Hashes underlying data arrays directly.
"""
if not (sparse.isspmatrix_csr(matrix) or sparse.isspmatrix_csc(matrix)):
matrix = matrix.tocsr()
hasher = xxhash.xxh128()
self._hash_schema(
hasher, {"shape": matrix.shape, "format": matrix.format, "nnz": matrix.nnz}
)
hasher.update(memoryview(matrix.data))
hasher.update(memoryview(matrix.indices))
hasher.update(memoryview(matrix.indptr))
return _DatasetFingerprint(
hash=hasher.hexdigest(),
shape=matrix.shape,
computed_at=datetime.now().isoformat(),
data_type=f"sparse_{matrix.format}",
method="sparse",
)
def _determine_method(self, rows: int, requested: str) -> str:
if requested != "auto":
return requested
if rows < 5000000:
return "full"
return "sampled"
def _hash_schema(self, hasher, schema: Dict[str, Any]):
"""Compact schema hashing."""
hasher.update(
json.dumps(schema, sort_keys=True, separators=(",", ":")).encode()
)
def _get_sample_indices(self, total_rows: int, sample_size: int) -> list:
"""Calculate indices for sampling without generating full range lists."""
chunk = sample_size // 3
indices = list(range(min(chunk, total_rows)))
mid_start = max(0, total_rows // 2 - chunk // 2)
mid_end = min(mid_start + chunk, total_rows)
indices.extend(range(mid_start, mid_end))
last_start = max(0, total_rows - chunk)
indices.extend(range(last_start, total_rows))
return sorted(list(set(indices)))
def wape_score(y_true, y_pred):
"""
Calculates Weighted Absolute Percentage Error (WAPE).
WAPE = sum(|Error|) / sum(|Groundtruth|)
"""
y_true = np.asarray(y_true, dtype=np.float64)
y_pred = np.asarray(y_pred, dtype=np.float64)
eps = np.finfo(np.float64).eps
sum_abs_error = np.sum(np.abs(y_true - y_pred))
sum_abs_truth = np.maximum(np.sum(np.abs(y_true)), eps)
return sum_abs_error / sum_abs_truth
def forecast_accuracy(y_true, y_pred):
"""
Calculates Forecast Accuracy.
FA = 1 - (sum(|Error|) / sum(|Groundtruth|))
"""
y_true = np.asarray(y_true, dtype=np.float64)
y_pred = np.asarray(y_pred, dtype=np.float64)
eps = np.finfo(np.float64).eps
sum_abs_error = np.sum(np.abs(y_true - y_pred))
sum_abs_truth = np.maximum(np.sum(np.abs(y_true)), eps)
return 1 - sum_abs_error / sum_abs_truth
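# Example (hypothetical numbers): y_true = [10, 20], y_pred = [12, 18]
# sum|error| = 2 + 2 = 4, sum|y_true| = 30
# WAPE = 4 / 30 ≈ 0.1333, Forecast Accuracy = 1 - 0.1333 ≈ 0.8667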
def _normalize_hpo_df(df):
df = df.copy()
param_cols = [c for c in df.columns if c.startswith("params_")]
df[param_cols] = df[param_cols].astype("string[pyarrow]")
return df
def _validate_numerical_data(data):
"""
Validates if the input data (NumPy array, Pandas DataFrame/Series,
Polars DataFrame/Series, or SciPy sparse matrix) contains only
numerical (integer, float) or boolean values.
Args:
data: The input data structure to check.
Raises:
TypeError: If the input data contains non-numerical and non-boolean types.
ValueError: If the input data is of an unsupported type.
"""
if sparse.issparse(data):
if not (
np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
):
raise TypeError(
f"Sparse matrix contains unsupported data type: {data.dtype}. Only numerical or boolean types are allowed."
)
return
elif isinstance(data, np.ndarray):
if not (
np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
):
raise TypeError(
f"NumPy array contains unsupported data type: {data.dtype}. Only numerical or boolean types are allowed."
)
return
elif isinstance(data, (pd.DataFrame, pd.Series)):
d_types = data.dtypes.apply(lambda x: x.kind)
non_numerical_mask = ~d_types.isin(["i", "f", "b"])
if non_numerical_mask.any():
non_numerical_columns = (
data.columns[non_numerical_mask].tolist()
if isinstance(data, pd.DataFrame)
else [data.name]
)
raise TypeError(
f"Pandas {('DataFrame' if isinstance(data, pd.DataFrame) else 'Series')} contains non-numerical/boolean data. Offending column(s) and types: {data.dtypes[non_numerical_mask].to_dict()}"
)
return
elif isinstance(data, (pl.DataFrame, pl.Series)):
pl_numerical_types = [
pl.Int8,
pl.Int16,
pl.Int32,
pl.Int64,
pl.UInt8,
pl.UInt16,
pl.UInt32,
pl.UInt64,
pl.Float32,
pl.Float64,
pl.Boolean,
]
if isinstance(data, pl.DataFrame):
for col, dtype in data.schema.items():
if dtype not in pl_numerical_types:
raise TypeError(
f"Polars DataFrame column '{col}' has unsupported data type: {dtype}. Only numerical or boolean types are allowed."
)
elif isinstance(data, pl.Series):
if data.dtype not in pl_numerical_types:
raise TypeError(
f"Polars Series has unsupported data type: {data.dtype}. Only numerical or boolean types are allowed."
)
return
else:
raise ValueError(
f"Unsupported data type provided: {type(data)}. Function supports NumPy, Pandas, Polars, and SciPy sparse matrices."
)
def _smart_split(
n_samples,
X,
y,
*,
random_state=42,
shuffle=True,
fixed_test_split=None,
verbose=True,
):
"""
Parameters
----------
n_samples : int
Number of samples in the dataset (len(X) or len(y))
X : array-like
Features
y : array-like
Target
random_state : int
shuffle : bool
stratify : array-like or None
For stratified splitting (recommended for classification)
Returns
-------
If return_val=True → X_train, X_val, X_test, y_train, y_val, y_test
If return_val=False → X_train, X_test, y_train, y_test
"""
if fixed_test_split:
test_ratio = fixed_test_split
val_ratio = fixed_test_split
elif n_samples <= 1000:
test_ratio = 0.2
val_ratio = 0.1
elif n_samples < 10000:
test_ratio = 0.15
val_ratio = 0.15
elif n_samples < 100000:
test_ratio = 0.1
val_ratio = 0.1
elif n_samples < 1000000:
test_ratio = 0.05
val_ratio = 0.05
else:
test_ratio = 0.01
val_ratio = 0.01
(X_train, X_test, y_train, y_test) = train_test_split(
X, y, test_size=test_ratio, random_state=random_state, shuffle=shuffle
)
val_size_in_train = val_ratio / (1 - test_ratio)
verbose and logger.info(
f"Split → Train: {1 - test_ratio:.2%} | Test: {test_ratio:.2%} (no validation set)"
)
return (X_train, X_test, y_train, y_test, val_size_in_train)
def _ensure_feature_names(X, feature_names=None):
if isinstance(X, pd.DataFrame):
return list(X.columns)
if isinstance(X, np.ndarray):
if feature_names is None:
feature_names = [f"feature_{i}" for i in range(X.shape[1])]
return feature_names
raise TypeError("X must be a pandas DataFrame or numpy ndarray")
def _perform_cross_validation(
model, X, y, cv_folds, shuffle, random_state, n_jobs, verbose
) -> pd.DataFrame:
"""Perform cross-validation on the regression model."""
verbose and logger.info(f"Performing {cv_folds}-fold cross-validation...")
cv = KFold(n_splits=cv_folds, shuffle=shuffle, random_state=random_state)
scoring = {
"MAE": "neg_mean_absolute_error",
"MSE": "neg_mean_squared_error",
"RMSE": "neg_root_mean_squared_error",
"MAPE": "neg_mean_absolute_percentage_error",
"R2": "r2",
"WAPE": make_scorer(wape_score, greater_is_better=False),
"Forecast_Accuracy": make_scorer(forecast_accuracy, greater_is_better=True),
}
cv_results = cross_validate(
model, X, y, cv=cv, scoring=scoring, return_train_score=False, n_jobs=n_jobs
)
def get_score_stats(metric_key, invert_sign=False):
key = f"test_{metric_key}"
if key in cv_results:
scores = cv_results[key]
if invert_sign:
scores = -scores
return (scores.mean(), scores.std())
return (0.0, 0.0)
(mae_mean, mae_std) = get_score_stats("MAE", invert_sign=True)
(mse_mean, mse_std) = get_score_stats("MSE", invert_sign=True)
(rmse_mean, rmse_std) = get_score_stats("RMSE", invert_sign=True)
(mape_mean, mape_std) = get_score_stats("MAPE", invert_sign=True)
(wape_mean, wape_std) = get_score_stats("WAPE", invert_sign=True)
(r2_mean, r2_std) = get_score_stats("R2", invert_sign=False)
(fa_mean, fa_std) = get_score_stats("Forecast_Accuracy", invert_sign=False)
verbose and logger.info(f"CV MAE : {mae_mean:.4f} (+/- {mae_std:.4f})")
verbose and logger.info(f"CV MSE : {mse_mean:.4f} (+/- {mse_std:.4f})")
verbose and logger.info(f"CV RMSE : {rmse_mean:.4f} (+/- {rmse_std:.4f})")
verbose and logger.info(f"CV MAPE : {mape_mean:.4f} (+/- {mape_std:.4f})")
verbose and logger.info(f"CV WAPE : {wape_mean:.4f} (+/- {wape_std:.4f})")
verbose and logger.info(f"CV R2 Score : {r2_mean:.4f} (+/- {r2_std:.4f})")
verbose and logger.info(f"CV Forecast Acc : {fa_mean:.4f} (+/- {fa_std:.4f})")
CV_metrics = pd.DataFrame(
{
"Metric": [
"Mean Absolute Error (MAE)",
"Mean Squared Error (MSE)",
"Root Mean Squared Error (RMSE)",
"Mean Absolute Percentage Error (MAPE)",
"Weighted Absolute Percentage Error (WAPE)",
"R2 Score",
"Forecast Accuracy",
],
"Mean": [
mae_mean,
mse_mean,
rmse_mean,
mape_mean,
wape_mean,
r2_mean,
fa_mean,
],
"Std": [mae_std, mse_std, rmse_std, mape_std, wape_std, r2_std, fa_std],
}
)
return CV_metrics
def _compute_score(model, X, y, metric):
"""
Computes the score for the model on the given data based on the selected metric.
Assumes 'metric' is passed as the short code (e.g., "MAE", "R2", "FA").
"""
y_pred = model.predict(X)
if metric == "mae":
score = mean_absolute_error(y, y_pred)
elif metric == "mse":
score = mean_squared_error(y, y_pred)
elif metric == "rmse":
score = root_mean_squared_error(y, y_pred)
elif metric == "mape":
score = mean_absolute_percentage_error(y, y_pred)
elif metric == "r2":
score = r2_score(y, y_pred)
elif metric == "wape" or metric == "fa":
y_true_np = np.array(y, dtype=float).flatten()
y_pred_np = np.array(y_pred, dtype=float).flatten()
eps = np.finfo(np.float64).eps
sum_abs_error = np.sum(np.abs(y_true_np - y_pred_np))
sum_abs_truth = np.maximum(np.sum(np.abs(y_true_np)), eps)
wape_val = sum_abs_error / sum_abs_truth
if metric == "fa":
score = 1.0 - wape_val
else:
score = wape_val
else:
raise ValueError(f"Unknown regression metric: {metric}")
return score
def _get_cv_scoring_object(metric: str) -> Any:
"""
Returns a scoring object (string or callable) suitable for cross_validate or GridSearchCV.
Used during HPO for Regression.
"""
if metric == "mae":
return "neg_mean_absolute_error"
elif metric == "mse":
return "neg_mean_squared_error"
elif metric == "rmse":
return "neg_root_mean_squared_error"
elif metric == "r2":
return "r2"
elif metric == "mape":
return "neg_mean_absolute_percentage_error"
elif metric == "wape":
return make_scorer(wape_score, greater_is_better=False)
elif metric == "fa":
return make_scorer(forecast_accuracy, greater_is_better=True)
else:
return "neg_root_mean_squared_error"
def _hyperparameters_optimization(
X,
y,
constant_hyperparameters,
optimization_metric,
val_ratio,
shuffle_split,
use_cross_val,
cv_folds,
n_trials=50,
strategy="maximize",
sampler="Tree-structured Parzen",
seed=None,
n_jobs=-1,
verbose=False,
):
direction = "maximize" if strategy.lower() == "maximize" else "minimize"
sampler_map = {
"Tree-structured Parzen": TPESampler(seed=seed),
"Gaussian Process": GPSampler(seed=seed),
"CMA-ES": CmaEsSampler(seed=seed),
"Random Search": RandomSampler(seed=seed),
"Random Sobol Search": QMCSampler(seed=seed),
}
if sampler in sampler_map:
chosen_sampler = sampler_map[sampler]
else:
logger.warning(f"Sampler '{sampler}' not recognized → falling back to TPE")
chosen_sampler = TPESampler(seed=seed)
chosen_pruner = HyperbandPruner()
if use_cross_val:
cv = KFold(n_splits=cv_folds, shuffle=shuffle_split, random_state=seed)
cv_score_obj = _get_cv_scoring_object(optimization_metric)
else:
(X_train, X_val, y_train, y_val) = train_test_split(
X, y, test_size=val_ratio, random_state=seed, shuffle=shuffle_split
)
def logging_callback(study: Study, trial: FrozenTrial):
"""Callback function to log trial progress"""
verbose and logger.info(
f"Trial {trial.number} finished with value: {trial.value} and parameters: {trial.params}"
)
try:
verbose and logger.info(f"Best value so far: {study.best_value}")
verbose and logger.info(f"Best parameters so far: {study.best_params}")
except ValueError:
verbose and logger.info(f"No successful trials completed yet")
verbose and logger.info(f"" + "-" * 50)
def objective(trial):
try:
criterion = trial.suggest_categorical(
"criterion", ["squared_error", "friedman_mse", "absolute_error"]
)
splitter = trial.suggest_categorical("splitter", ["best", "random"])
max_depth = trial.suggest_int("max_depth", 10, 120, log=True)
min_samples_split = trial.suggest_int("min_samples_split", 2, 32)
min_samples_leaf = trial.suggest_int("min_samples_leaf", 1, 32)
max_features = trial.suggest_categorical(
"max_features", ["sqrt", "log2", None, 0.5, 0.8]
)
model = DecisionTreeRegressor(
criterion=criterion,
splitter=splitter,
min_samples_leaf=min_samples_leaf,
max_features=max_features,
max_depth=max_depth,
min_samples_split=min_samples_split,
random_state=seed,
)
if use_cross_val:
scores = cross_validate(
model, X, y, cv=cv, n_jobs=n_jobs, scoring=cv_score_obj
)
return scores["test_score"].mean()
else:
model.fit(X_train, y_train)
score = _compute_score(model, X_val, y_val, optimization_metric)
return score
except Exception as e:
verbose and logger.error(
f"Trial {trial.number} failed with error: {str(e)}"
)
raise
study = create_study(
direction=direction, sampler=chosen_sampler, pruner=chosen_pruner
)
study.optimize(
objective,
n_trials=n_trials,
catch=(Exception,),
n_jobs=n_jobs,
callbacks=[logging_callback],
)
verbose and logger.info(f"Optimization completed!")
verbose and logger.info(
f" Best Criterion : {study.best_params['criterion']}"
)
verbose and logger.info(
f" Best Split Strategy : {study.best_params['splitter']}"
)
verbose and logger.info(
f" Best Max Depth : {study.best_params['max_depth']}"
)
verbose and logger.info(
f" Best Feature Sampling : {study.best_params['max_features']}"
)
verbose and logger.info(
f" Best Min Samples per Leaf : {study.best_params['min_samples_leaf']}"
)
verbose and logger.info(
f" Best Min Samples Split : {study.best_params['min_samples_split']}"
)
verbose and logger.info(
f" Best {optimization_metric:<22}: {study.best_value:.4f}"
)
verbose and logger.info(f" Sampler used : {sampler}")
verbose and logger.info(f" Direction : {direction}")
if use_cross_val:
verbose and logger.info(f" Cross-validation : {cv_folds}-fold")
else:
verbose and logger.info(
f" Validation : single train/val split"
)
trials = study.trials_dataframe()
trials["best_value"] = trials["value"].cummax()
cols = list(trials.columns)
value_idx = cols.index("value")
cols = [c for c in cols if c != "best_value"]
new_order = cols[: value_idx + 1] + ["best_value"] + cols[value_idx + 1 :]
trials = trials[new_order]
return (study.best_params, trials)
def _combine_test_data(X_test, y_true, y_pred, features_names=None):
"""
Combine X_test, y_true, y_pred into a single DataFrame.
Parameters:
-----------
X_test : pandas/polars DataFrame, numpy array, or scipy sparse matrix
Test features
y_true : pandas/polars Series, numpy array, or list
True labels
y_pred : pandas/polars Series, numpy array, or list
Predicted labels
Returns:
--------
pandas.DataFrame
Combined DataFrame with features, y_true, and y_pred
"""
if sparse.issparse(X_test):
X_df = pd.DataFrame(X_test.toarray())
elif isinstance(X_test, np.ndarray):
X_df = pd.DataFrame(X_test)
elif hasattr(X_test, "to_pandas"):
X_df = X_test.to_pandas()
elif isinstance(X_test, pd.DataFrame):
X_df = X_test.copy()
else:
raise TypeError(f"Unsupported type for X_test: {type(X_test)}")
if X_df.columns.tolist() == list(range(len(X_df.columns))):
X_df.columns = (
[f"feature_{i}" for i in range(len(X_df.columns))]
if features_names is None
else features_names
)
if isinstance(y_true, list):
y_true_series = pd.Series(y_true, name="y_true")
elif isinstance(y_true, np.ndarray):
y_true_series = pd.Series(y_true, name="y_true")
elif hasattr(y_true, "to_pandas"):
y_true_series = y_true.to_pandas()
y_true_series.name = "y_true"
elif isinstance(y_true, pd.Series):
y_true_series = y_true.copy()
y_true_series.name = "y_true"
else:
raise TypeError(f"Unsupported type for y_true: {type(y_true)}")
if isinstance(y_pred, list):
y_pred_series = pd.Series(y_pred, name="y_pred")
elif isinstance(y_pred, np.ndarray):
y_pred_series = pd.Series(y_pred, name="y_pred")
elif hasattr(y_pred, "to_pandas"):
y_pred_series = y_pred.to_pandas()
y_pred_series.name = "y_pred"
elif isinstance(y_pred, pd.Series):
y_pred_series = y_pred.copy()
y_pred_series.name = "y_pred"
else:
raise TypeError(f"Unsupported type for y_pred: {type(y_pred)}")
X_df = X_df.reset_index(drop=True)
y_true_series = y_true_series.reset_index(drop=True)
y_pred_series = y_pred_series.reset_index(drop=True)
result_df = pd.concat([X_df, y_true_series, y_pred_series], axis=1)
return result_df
def _get_feature_importance(model, feature_names=None, sort=True, top_n=None):
"""
Extract feature importance from model.
Parameters:
-----------
model : Fitted scikit-learn model
feature_names : list or array-like, optional
Names of features. If None, uses generic names like 'feature_0', 'feature_1', etc.
sort : bool, default=True
Whether to sort features by importance (descending)
top_n : int, optional
If specified, returns only the top N most important features
Returns:
--------
pd.DataFrame
DataFrame with columns: 'feature', 'importance'
Importance values represent the mean decrease in impurity (Gini importance)
"""
importances = model.feature_importances_
if feature_names is None:
feature_names = [f"feature_{i}" for i in range(len(importances))]
importance_df = pd.DataFrame({"feature": feature_names, "importance": importances})
if sort:
importance_df = importance_df.sort_values("importance", ascending=False)
importance_df = importance_df.reset_index(drop=True)
if top_n is not None:
importance_df = importance_df.head(top_n)
return importance_df
def _smart_shap_background(
X: Union[np.ndarray, pd.DataFrame],
model_type: str = "tree",
seed: int = 42,
verbose: bool = True,
) -> Union[np.ndarray, pd.DataFrame, object]:
"""
Intelligently prepares a background dataset for SHAP based on model type.
Strategies:
- Tree: Higher sample cap (1000), uses Random Sampling (preserves data structure).
- Other: Lower sample cap (100), uses K-Means (maximizes info density).
"""
(n_rows, n_features) = X.shape
if model_type == "tree":
max_samples = 1000
use_kmeans = False
else:
max_samples = 100
use_kmeans = True
if n_rows <= max_samples:
verbose and logger.info(
f"✓ Dataset small ({n_rows} <= {max_samples}). Using full data."
)
return X
verbose and logger.info(
f"⚡ Large dataset detected ({n_rows} rows). Optimization Strategy: {('K-Means' if use_kmeans else 'Random Sampling')}"
)
if use_kmeans:
try:
verbose and logger.info(
f" Summarizing to {max_samples} weighted centroids..."
)
return shap.kmeans(X, max_samples)
except Exception as e:
logger.warning(
f" K-Means failed ({str(e)}). Falling back to random sampling."
)
return shap.sample(X, max_samples, random_state=seed)
else:
verbose and logger.info(f" Sampling {max_samples} random rows...")
return shap.sample(X, max_samples, random_state=seed)
def train_reg_decision_tree(
X: DataFrame, y: Union[DataSeries, NDArray, List], options=None
) -> Tuple[
Any, Any, Union[DataFrame, Dict], DataFrame, DataFrame, DataFrame, DataFrame, Dict
]:
options = options or {}
criterion = options.get("criterion", "squared_error").lower().replace(" ", "_")
splitter = options.get("splitter", "best").lower()
feature_strategy = options.get("feature_strategy", "Square root")
max_depth_input = options.get("max_depth", 0)
min_samples_leaf = options.get("min_samples_leaf", 1)
if feature_strategy == "Automatic 30%":
max_features = 0.3
elif feature_strategy == "Automatic 60%":
max_features = 0.5
elif feature_strategy == "Square root":
max_features = "sqrt"
elif feature_strategy == "Logarithm":
max_features = "log2"
elif feature_strategy == "None":
max_features = None
else:
max_features = "sqrt"
max_depth = None if max_depth_input == 0 else max_depth_input
auto_split = options.get("auto_split", True)
test_val_size = options.get("test_val_size", 15) / 100
shuffle_split = options.get("shuffle_split", True)
retrain_on_full = options.get("retrain_on_full", False)
use_cross_validation = options.get("use_cross_validation", False)
cv_folds = options.get("cv_folds", 5)
use_hpo = options.get("use_hyperparameter_optimization", False)
optimization_metric = options.get(
"optimization_metric", "Root Mean Squared Error (RMSE)"
)
optimization_metric = METRICS_DICT[optimization_metric]
optimization_method = options.get("optimization_method", "Tree-structured Parzen")
optimization_iterations = options.get("optimization_iterations", 50)
return_shap_explainer = options.get("return_shap_explainer", False)
use_shap_sampler = options.get("use_shap_sampler", False)
shap_feature_perturbation = options.get(
"shap_feature_perturbation", "Interventional"
)
metrics_as = options.get("metrics_as", "Dataframe")
n_jobs_str = options.get("n_jobs", "1")
random_state = options.get("random_state", 42)
activate_caching = options.get("activate_caching", False)
verbose = options.get("verbose", True)
n_jobs_int = -1 if n_jobs_str == "All" else int(n_jobs_str)
skip_computation = False
Model = None
Metrics = pd.DataFrame()
CV_Metrics = pd.DataFrame()
Features_Importance = pd.DataFrame()
SHAP = None
HPO_Trials = pd.DataFrame()
HPO_Best = None
fa = None
wape = None
mae = None
mse = None
rmse = None
r2 = None
mape = None
(n_samples, _) = X.shape
shap_feature_names = _ensure_feature_names(X)
if activate_caching:
verbose and logger.info(f"Caching is activate")
data_hasher = _UniversalDatasetHasher(n_samples, verbose=verbose)
X_hash = data_hasher.hash_data(X).hash
y_hash = data_hasher.hash_data(y).hash
all_hash_base_text = f"HASH BASE TEXTPandas Version {pd.__version__}POLARS Version {pl.__version__}Numpy Version {np.__version__}Scikit Learn Version {sklearn.__version__}Scipy Version {scipy.__version__}{('SHAP Version ' + shap.__version__ if return_shap_explainer else 'NO SHAP Version')}{X_hash}{y_hash}{criterion}{splitter}{feature_strategy}{max_depth_input}{min_samples_leaf}{('Use HPO' if use_hpo else 'No HPO')}{(optimization_metric if use_hpo else 'No HPO Metric')}{(optimization_method if use_hpo else 'No HPO Method')}{(optimization_iterations if use_hpo else 'No HPO Iter')}{(cv_folds if use_cross_validation else 'No CV')}{('Auto Split' if auto_split else test_val_size)}{shuffle_split}{return_shap_explainer}{shap_feature_perturbation}{use_shap_sampler}{random_state}"
all_hash = hashlib.sha256(all_hash_base_text.encode("utf-8")).hexdigest()
verbose and logger.info(f"Hash was computed: {all_hash}")
temp_folder = Path(tempfile.gettempdir())
cache_folder = temp_folder / "coded-flows-cache"
cache_folder.mkdir(parents=True, exist_ok=True)
model_path = cache_folder / f"{all_hash}.model"
metrics_dict_path = cache_folder / f"metrics_{all_hash}.json"
metrics_df_path = cache_folder / f"metrics_{all_hash}.parquet"
cv_metrics_path = cache_folder / f"cv_metrics_{all_hash}.parquet"
hpo_trials_path = cache_folder / f"hpo_trials_{all_hash}.parquet"
hpo_best_params_path = cache_folder / f"hpo_best_params_{all_hash}.json"
features_importance_path = (
cache_folder / f"features_importance_{all_hash}.parquet"
)
prediction_set_path = cache_folder / f"prediction_set_{all_hash}.parquet"
shap_path = cache_folder / f"{all_hash}.shap"
skip_computation = model_path.is_file()
if not skip_computation:
try:
_validate_numerical_data(X)
except Exception as e:
verbose and logger.error(
f"Only numerical or boolean types are allowed for 'X' input!"
)
raise
features_names = X.columns if hasattr(X, "columns") else None
min_samples_split = 2 * min_samples_leaf
fixed_test_split = None if auto_split else test_val_size
(X_train, X_test, y_train, y_test, val_ratio) = _smart_split(
n_samples,
X,
y,
random_state=random_state,
shuffle=shuffle_split,
fixed_test_split=fixed_test_split,
verbose=verbose,
)
if use_hpo:
verbose and logger.info(f"Performing Hyperparameters Optimization")
constant_hyperparameters = {}
(HPO_Best, HPO_Trials) = _hyperparameters_optimization(
X_train,
y_train,
constant_hyperparameters,
optimization_metric,
val_ratio,
shuffle_split,
use_cross_validation,
cv_folds,
optimization_iterations,
METRICS_OPT[optimization_metric],
optimization_method,
random_state,
n_jobs_int,
verbose=verbose,
)
HPO_Trials = _normalize_hpo_df(HPO_Trials)
criterion = HPO_Best["criterion"]
splitter = HPO_Best["splitter"]
max_depth = HPO_Best["max_depth"]
min_samples_split = HPO_Best["min_samples_split"]
min_samples_leaf = HPO_Best["min_samples_leaf"]
max_features = HPO_Best["max_features"]
Model = DecisionTreeRegressor(
criterion=criterion,
splitter=splitter,
min_samples_leaf=min_samples_leaf,
max_features=max_features,
max_depth=max_depth,
min_samples_split=min_samples_split,
random_state=random_state,
)
if use_cross_validation and (not use_hpo):
verbose and logger.info(
f"Using Cross-Validation to measure performance metrics"
)
CV_Metrics = _perform_cross_validation(
Model,
X_train,
y_train,
cv_folds,
shuffle_split,
random_state,
n_jobs_int,
verbose,
)
Model.fit(X_train, y_train)
y_pred = Model.predict(X_test)
fa = forecast_accuracy(y_test, y_pred)
wape = wape_score(y_test, y_pred)
mae = mean_absolute_error(y_test, y_pred)
mse = mean_squared_error(y_test, y_pred)
rmse = root_mean_squared_error(y_test, y_pred)
r2 = r2_score(y_test, y_pred)
mape = mean_absolute_percentage_error(y_test, y_pred)
if metrics_as == "Dataframe":
Metrics = pd.DataFrame(
{
"Metric": [
"Forecast Accuracy",
"Weighted Absolute Percentage Error",
"Mean Absolute Error",
"Mean Squared Error",
"Root Mean Squared Error",
"R2 Score",
"Mean Absolute Percentage Error",
],
"Value": [fa, wape, mae, mse, rmse, r2, mape],
}
)
else:
Metrics = {
"forecast_accuracy": fa,
"weighted_absolute_percentage_error ": wape,
"mean_absolute_error": mae,
"mean_squared_error": mse,
"root_mean_squared_error": rmse,
"r2_score": r2,
"mean_absolute_percentage_error": mape,
}
verbose and logger.info(f"Forecast Accuracy : {fa:.2%}")
verbose and logger.info(f"Weighted Absolute Percentage Error : {wape:.2%}")
verbose and logger.info(f"Mean Absolute Error : {mae:.4f}")
verbose and logger.info(f"Mean Squared Error : {mse:.4f}")
verbose and logger.info(f"Root Mean Squared Error : {rmse:.4f}")
verbose and logger.info(f"R2 Score : {r2:.4f}")
verbose and logger.info(f"Mean Absolute Percentage Error : {mape:.2%}")
Prediction_Set = _combine_test_data(X_test, y_test, y_pred, features_names)
verbose and logger.info(f"Prediction Set created")
if retrain_on_full:
verbose and logger.info(
"Retraining model on full dataset for production deployment"
)
Model.fit(X, y)
verbose and logger.info(
"Model successfully retrained on full dataset. Reported metrics remain from original held-out test set."
)
Features_Importance = _get_feature_importance(Model, features_names)
verbose and logger.info(f"Features Importance computed")
if return_shap_explainer:
if shap_feature_perturbation == "Interventional":
SHAP = shap.TreeExplainer(
Model,
(
_smart_shap_background(
X if retrain_on_full else X_train,
model_type="tree",
seed=random_state,
verbose=verbose,
)
if use_shap_sampler
else X if retrain_on_full else X_train
),
feature_names=shap_feature_names,
)
else:
SHAP = shap.TreeExplainer(
Model,
feature_names=shap_feature_names,
feature_perturbation="tree_path_dependent",
)
verbose and logger.info(f"SHAP explainer generated")
if activate_caching:
verbose and logger.info(f"Caching output elements")
joblib.dump(Model, model_path)
if isinstance(Metrics, dict):
with metrics_dict_path.open("w", encoding="utf-8") as f:
json.dump(Metrics, f, ensure_ascii=False, indent=4)
else:
Metrics.to_parquet(metrics_df_path)
if use_cross_validation and (not use_hpo):
CV_Metrics.to_parquet(cv_metrics_path)
if use_hpo:
HPO_Trials.to_parquet(hpo_trials_path)
with hpo_best_params_path.open("w", encoding="utf-8") as f:
json.dump(HPO_Best, f, ensure_ascii=False, indent=4)
Features_Importance.to_parquet(features_importance_path)
Prediction_Set.to_parquet(prediction_set_path)
if return_shap_explainer:
with shap_path.open("wb") as f:
joblib.dump(SHAP, f)
verbose and logger.info(f"Caching done")
else:
verbose and logger.info(f"Skipping computations and loading cached elements")
Model = joblib.load(model_path)
verbose and logger.info(f"Model loaded")
if metrics_dict_path.is_file():
with metrics_dict_path.open("r", encoding="utf-8") as f:
Metrics = json.load(f)
else:
Metrics = pd.read_parquet(metrics_df_path)
verbose and logger.info(f"Metrics loaded")
if use_cross_validation and (not use_hpo):
CV_Metrics = pd.read_parquet(cv_metrics_path)
verbose and logger.info(f"Cross Validation metrics loaded")
if use_hpo:
HPO_Trials = pd.read_parquet(hpo_trials_path)
with hpo_best_params_path.open("r", encoding="utf-8") as f:
HPO_Best = json.load(f)
verbose and logger.info(
f"Hyperparameters Optimization trials and best params loaded"
)
Features_Importance = pd.read_parquet(features_importance_path)
verbose and logger.info(f"Features Importance loaded")
Prediction_Set = pd.read_parquet(prediction_set_path)
verbose and logger.info(f"Prediction Set loaded")
if return_shap_explainer:
with shap_path.open("rb") as f:
SHAP = joblib.load(f)
verbose and logger.info(f"SHAP Explainer loaded")
return (
Model,
SHAP,
Metrics,
CV_Metrics,
Features_Importance,
Prediction_Set,
HPO_Trials,
HPO_Best,
)
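For illustration, here is one way to call the brick function directly, using option keys as they appear in the source above (exact defaults may differ in the brick UI):

```python
model, shap_exp, metrics, cv_metrics, importance, predictions, hpo_trials, hpo_best = (
    train_reg_decision_tree(
        X,
        y,
        options={
            "criterion": "Squared Error",        # mapped to "squared_error"
            "feature_strategy": "Square root",   # mapped to max_features="sqrt"
            "max_depth": 0,                      # 0 = unlimited
            "use_cross_validation": True,
            "cv_folds": 5,
            "use_hyperparameter_optimization": False,
            "return_shap_explainer": True,
            "metrics_as": "Dataframe",
            "random_state": 42,
            "verbose": True,
        },
    )
)
print(metrics)
```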
Brick Info
- shap>=0.47.0
- scikit-learn
- pandas
- numpy
- torch
- numba>=0.56.0
- shap
- cmaes
- optuna
- scipy
- polars
- xxhash