Reg. LightGBM

Train a LightGBM gradient boosting regression model.

Reg. LightGBM

Processing

This brick trains a LightGBM gradient boosting model for regression tasks, predicting continuous numerical values (like prices, temperatures, or sales figures).

It handles the complete machine learning workflow: splitting your data into training and test sets, training the model, evaluating its performance with multiple accuracy metrics, and optionally fine-tuning settings automatically to achieve the best results.

LightGBM is a fast, efficient algorithm that builds multiple decision trees sequentially, with each tree learning from the mistakes of previous ones.

Inputs

X
The dataset containing your input features (the variables used to make predictions). This should be a table where each row is an example and each column is a feature, such as product characteristics, weather conditions, or customer attributes.
y
The target values you want to predict (the actual numbers you're trying to forecast). This should be a column of continuous numerical values corresponding to each row in X, like sales amounts, temperatures, or prices.

Inputs Types

Input Types
X DataFrame
y DataSeries, NDArray, List

You can check the list of supported types here: Available Type Hints.

Outputs

Model
The trained LightGBM regression model, ready to make predictions on new data. This is the core output that you'll use to forecast values for unseen examples.
SHAP
A SHAP explainer object that helps you understand which features most influenced the model's predictions. Only provided when "SHAP Explainer" is enabled. This advanced tool shows how each feature contributes to individual predictions.
Label Encoder
An encoder object used to transform the target values during training. This ensures consistent data formatting when making future predictions.
Metrics
Performance measurements showing how well the model predicts on the test data. Contains seven accuracy metrics including Forecast Accuracy, Mean Absolute Error, R2 Score, and others. The format (table or dictionary) is controlled by the "Metrics as" setting.
CV Metrics
Cross-validation performance measurements, showing how consistently the model performs across multiple data splits. Only provided when "Enable Cross-Validation" is turned on. This gives you a more robust estimate of model quality.
Features Importance
A ranked list showing which input features have the most influence on predictions. Higher importance means the feature plays a bigger role in the model's decisions.
Prediction Set
A detailed table combining your test data features, actual values, and the model's predictions side-by-side. This lets you examine individual predictions and spot patterns in errors.
HPO Trials
A history of all experiments tried during automatic hyperparameter optimization, showing which settings were tested and their performance. Only provided when "Hyperparameter Optim." is enabled.
HPO Best
The best-performing hyperparameter settings discovered during optimization. Only provided when "Hyperparameter Optim." is enabled. These are the final settings used in your trained model.

The Metrics output contains the following performance measurements:

  • Forecast Accuracy: The percentage of correctly forecasted magnitude (1 - WAPE), where 100% is perfect prediction.
  • Weighted Absolute Percentage Error (WAPE): Total absolute error divided by total actual values (lower is better).
  • Mean Absolute Error (MAE): Average absolute difference between predictions and actual values.
  • Mean Squared Error (MSE): Average squared difference between predictions and actual values.
  • Root Mean Squared Error (RMSE): Square root of MSE, expressed in the same units as your target variable.
  • R2 Score: Proportion of variance in the target explained by the model (1.0 is perfect, 0.0 is no better than average).
  • Mean Absolute Percentage Error (MAPE): Average percentage error across all predictions.

The CV Metrics output contains the same seven metrics as above, but each includes both a mean value and standard deviation across all cross-validation folds, showing performance consistency.

The Features Importance output contains:

  • feature: The name of each input feature.
  • importance: The relative importance score (higher means more influential in predictions), based on how often the feature is used for splits and how much it improves accuracy.

The Prediction Set output contains:

  • All original input features from the test set (one column per feature).
  • y_true: The actual target values from your test data.
  • y_pred: The model's predicted values for the test data.

The HPO Trials output contains detailed information about each hyperparameter optimization trial:

  • number: Sequential trial number.
  • value: The optimization metric score achieved by this trial.
  • best_value: The best score achieved up to this trial (cumulative best).
  • params_*: The specific hyperparameter values tested in this trial (one column per hyperparameter).
  • state: Whether the trial completed successfully.
  • datetime_start and datetime_complete: Timestamps for trial execution.
  • duration: How long the trial took to complete.

The HPO Best output contains the optimal hyperparameter values discovered during optimization, with keys matching the hyperparameter names (e.g., n_estimators, learning_rate, max_depth).

Outputs Types

Output Types
Model Any
SHAP Any
Label Encoder Any
Metrics DataFrame, Dict
CV Metrics DataFrame
Features Importance DataFrame
Prediction Set DataFrame
HPO Trials DataFrame
HPO Best Dict

You can check the list of supported types here: Available Type Hints.

Options

The Reg. LightGBM brick contains some changeable options:

Max Number of Trees
Controls how many decision trees the model builds. More trees can capture more complex patterns but take longer to train and may overfit. Start with 100 and increase if performance plateaus. Range: 10-5000.
Enable Early Stopping
When turned on, training stops automatically if the model's performance stops improving for a certain number of rounds. This saves time and prevents overfitting by avoiding unnecessary training iterations.
Early Stopping Rounds
How many rounds to wait without improvement before stopping training. For example, with a value of 10, if the model doesn't improve for 10 consecutive rounds, training ends. Only applies when "Enable Early Stopping" is on.
Max Depth
The maximum number of levels in each decision tree. Deeper trees can model more complex relationships but risk overfitting. A value of -1 means no limit (the algorithm decides automatically). Lower values (3-7) create simpler, more generalizable models.
Number of Leaves
The maximum number of leaf nodes (final decision points) in each tree. More leaves allow finer-grained predictions but increase model complexity. Default is 31. This works together with Max Depth to control tree complexity.
Learning Rate
How much each tree contributes to the final prediction (also called shrinkage or step size). Smaller values (0.01-0.05) create more accurate but slower-to-train models. Larger values (0.1-0.3) train faster but may miss optimal solutions. Think of it as the "learning speed" of the algorithm.
Min Split Gain
The minimum improvement in accuracy required to split a tree node further. Higher values prevent the model from making splits that barely improve predictions, reducing overfitting. Zero means any improvement is acceptable.
Min Child Weight (Hessian)
The minimum sum of instance weights (a statistical measure called Hessian) needed in a child node. Higher values make the model more conservative by preventing splits that result in very small groups, which helps prevent overfitting.
Min Leaf Samples
The minimum number of data samples required to be in a leaf node. Higher values prevent the model from creating leaves with very few examples, making it more generalizable and less prone to overfitting.
Colsample by Tree
The fraction of features to randomly select when building each tree. For example, 0.8 means each tree uses 80% of features, chosen randomly. This adds diversity to trees and reduces overfitting. A value of 1.0 uses all features.
L1 Regularization
Applies L1 penalty (Lasso regularization) to reduce model complexity. Higher values push more feature weights toward zero, creating simpler models that may generalize better. Zero means no L1 regularization.
L2 Regularization
Applies L2 penalty (Ridge regularization) to prevent extreme feature weights. Higher values smooth out the model's behavior, reducing sensitivity to individual features and helping prevent overfitting. Zero means no L2 regularization.
Use Bagging
When enabled, trains each tree on a random subset of the data (bootstrap aggregating). This adds randomness that helps prevent overfitting and often improves model robustness. When on, also configure "Subsample Ratio" and "Subsample Frequency".
Subsample Ratio
The fraction of training data to use for each tree when bagging is enabled. For example, 0.8 means each tree sees 80% of the data, randomly selected. Only applies when "Use Bagging" is on.
Subsample Frequency
How often to resample the data (in terms of boosting iterations). For example, a value of 1 means resample every iteration, while 5 means resample every 5 iterations. Zero means no resampling. Only applies when "Use Bagging" is on.
Auto Split Data
When enabled, the brick automatically determines the best split ratio between training and test sets based on your dataset size. Larger datasets use smaller test percentages to maximize training data. When disabled, use "Test/Validation Set %" to set the split manually.
Shuffle Split
When enabled, randomly shuffles the data before splitting into training and test sets. This ensures a representative mix in each set. Turn off only if your data has a meaningful order (like time series) that shouldn't be disrupted.
Test/Validation Set %
The percentage of data reserved for testing and validation when "Auto Split Data" is disabled. For example, 15 means 15% for testing. This data is never used during training, providing an honest measure of model performance.
Retrain On Full Data
When enabled, after evaluating performance, the model is retrained on the entire dataset (combining training and test sets) for production use. The reported metrics still reflect the original train/test split, but the final model has seen all your data, potentially improving its predictions.
Enable Cross-Validation
When enabled, evaluates the model using k-fold cross-validation, which splits data into multiple parts and tests the model on each part separately. This provides a more reliable estimate of performance than a single train/test split, especially for smaller datasets.
Number of CV Folds
How many parts to divide the data into for cross-validation. For example, 5 means the data is split into 5 parts, with the model trained and tested 5 times (each time using a different part as the test set). More folds give more reliable results but take longer. Only applies when "Enable Cross-Validation" is on.
Hyperparameter Optim.
When enabled, automatically searches for the best combination of model settings (like tree depth, learning rate, regularization) by testing many different configurations. This can significantly improve model performance but takes much longer to train.
Optimization Metric
Which performance measure to maximize or minimize during automatic hyperparameter tuning. Only applies when "Hyperparameter Optim." is enabled.
  • Forecast Accuracy: Maximizes the percentage of correctly forecasted magnitude (higher is better, range 0-100%).
  • Weighted Absolute Percentage Error (WAPE): Minimizes the total prediction error relative to actual values (lower is better).
  • Mean Absolute Error (MAE): Minimizes the average absolute difference between predictions and actual values (lower is better).
  • Mean Squared Error (MSE): Minimizes the average squared difference, penalizing large errors more heavily (lower is better).
  • Root Mean Squared Error (RMSE): Minimizes the square root of MSE, in the same units as your target variable (lower is better).
  • R2 Score: Maximizes the proportion of variance explained by the model, where 1.0 is perfect and 0.0 is no better than guessing the average (higher is better).
  • Mean Absolute Percentage Error (MAPE): Minimizes the average percentage error across predictions (lower is better).
Optimization Method
The algorithm used to search for the best hyperparameters. Only applies when "Hyperparameter Optim." is enabled.
  • Tree-structured Parzen: An intelligent method that learns from previous trials to suggest promising hyperparameters (recommended for most cases, balances speed and quality).
  • Gaussian Process: A sophisticated Bayesian approach that models the relationship between hyperparameters and performance (slower but thorough, good for smaller search spaces).
  • CMA-ES: Evolution-based strategy that adapts its search based on the landscape of the hyperparameter space (good for continuous parameters).
  • Random Sobol Search: A quasi-random approach that explores the space more evenly than pure randomness (good baseline method).
  • Random Search: Tries random combinations of hyperparameters (fastest but least efficient, useful as a baseline).
Optimization Iterations
How many different hyperparameter combinations to test during automatic tuning. More iterations increase the chance of finding better settings but take longer. For example, 50 means the brick will train and evaluate 50 different model configurations. Only applies when "Hyperparameter Optim." is enabled.
Metrics as
Controls the format of the main performance metrics output.
SHAP Explainer
When enabled, generates a SHAP (SHapley Additive exPlanations) explainer object that reveals which features drive individual predictions. This advanced interpretability tool shows how each feature contributes positively or negatively to the model's output.
SHAP Sampler
When enabled, uses a representative sample of the training data as the background dataset for SHAP calculations instead of the full dataset. This significantly speeds up SHAP computation for large datasets with minimal impact on explanation quality. Only applies when "SHAP Explainer" is enabled.
SHAP Feature Perturbation
The method SHAP uses to calculate feature contributions. Only applies when "SHAP Explainer" is enabled.
  • Interventional: Treats features as independent and measures their impact by replacing values (faster, works well when features are truly independent).
  • Tree Path Dependent: Uses the tree structure to calculate contributions, accounting for feature interactions (slower but more accurate for tree models where features interact).
Number of Jobs
How many CPU cores to use for parallel processing during training and evaluation. More cores speed up computation.
Random State
A seed number that controls randomness, ensuring reproducible results. Using the same random state with the same data and settings will always produce identical results. Change this value to try different random splits and initializations.
Brick Caching
When enabled, saves all outputs (model, metrics, predictions) to disk after the first run. Subsequent runs with identical data and settings load these cached results instantly instead of retraining, saving significant time. Turn off if you want to force retraining.
Verbose Logging
When enabled, prints detailed progress messages during training, including split sizes, hyperparameter values, and performance metrics. Turn off for quieter operation when running many workflows.
import logging
import warnings
import shap
import json
import xxhash
import hashlib
import tempfile
import sklearn
import scipy
import joblib
import numpy as np
import pandas as pd
import polars as pl
from lightgbm import LGBMRegressor, early_stopping as early_stopping_call, Booster
from sklearn.preprocessing import LabelEncoder
from pathlib import Path
from scipy import sparse
from optuna.samplers import (
    TPESampler,
    RandomSampler,
    GPSampler,
    CmaEsSampler,
    QMCSampler,
)
import optuna
from optuna import Study
from optuna.trial import FrozenTrial
from optuna.pruners import HyperbandPruner
from optuna import create_study
from sklearn.model_selection import train_test_split, cross_validate, KFold
from sklearn.metrics import (
    mean_absolute_error,
    mean_squared_error,
    r2_score,
    root_mean_squared_error,
    mean_absolute_percentage_error,
    make_scorer,
)
from dataclasses import dataclass
from datetime import datetime
from coded_flows.types import (
    Union,
    Dict,
    List,
    Tuple,
    NDArray,
    DataFrame,
    DataSeries,
    Any,
    Tuple,
)
from coded_flows.utils import CodedFlowsLogger

logger = CodedFlowsLogger(name="Reg. LightGBM", level=logging.INFO)
optuna.logging.set_verbosity(optuna.logging.ERROR)
warnings.filterwarnings("ignore", category=optuna.exceptions.ExperimentalWarning)
METRICS_DICT = {
    "Forecast Accuracy": "fa",
    "Weighted Absolute Percentage Error (WAPE)": "wape",
    "Mean Absolute Error (MAE)": "mae",
    "Mean Squared Error (MSE)": "mse",
    "Root Mean Squared Error (RMSE)": "rmse",
    "R2 Score": "r2",
    "Mean Absolute Percentage Error (MAPE)": "mape",
}
METRICS_OPT = {
    "fa": "maximize",
    "wape": "minimize",
    "mae": "minimize",
    "mse": "minimize",
    "rmse": "minimize",
    "r2": "maximize",
    "mape": "minimize",
}
DataType = Union[
    pd.DataFrame, pl.DataFrame, np.ndarray, sparse.spmatrix, pd.Series, pl.Series
]


@dataclass
class _DatasetFingerprint:
    """Lightweight fingerprint of a dataset."""

    hash: str
    shape: tuple
    computed_at: str
    data_type: str
    method: str


class _UniversalDatasetHasher:
    """
    High-performance dataset hasher optimizing for zero-copy operations
    and native backend execution (C/Rust).
    """

    def __init__(
        self,
        data_size: int,
        method: str = "auto",
        sample_size: int = 100000,
        verbose: bool = False,
    ):
        self.method = method
        self.sample_size = sample_size
        self.data_size = data_size
        self.verbose = verbose

    def hash_data(self, data: DataType) -> _DatasetFingerprint:
        """
        Main entry point: hash any supported data format.
        Auto-detects format and applies optimal strategy.
        """
        if isinstance(data, pd.DataFrame):
            return self._hash_pandas(data)
        elif isinstance(data, pl.DataFrame):
            return self._hash_polars(data)
        elif isinstance(data, pd.Series):
            return self._hash_pandas_series(data)
        elif isinstance(data, pl.Series):
            return self._hash_polars_series(data)
        elif isinstance(data, np.ndarray):
            return self._hash_numpy(data)
        elif sparse.issparse(data):
            return self._hash_sparse(data)
        else:
            raise TypeError(f"Unsupported data type: {type(data)}")

    def _hash_pandas(self, df: pd.DataFrame) -> _DatasetFingerprint:
        """
        Optimized Pandas hashing using pd.util.hash_pandas_object.
        Avoids object-to-string conversion overhead.
        """
        method = self._determine_method(self.data_size, self.method)
        self.verbose and logger.info(
            f"Hashing Pandas: {self.data_size:,} rows - {method}"
        )
        target_df = df
        if method == "sampled":
            target_df = self._get_pandas_sample(df)
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher,
            {
                "columns": df.columns.tolist(),
                "dtypes": {k: str(v) for (k, v) in df.dtypes.items()},
                "shape": df.shape,
            },
        )
        try:
            row_hashes = pd.util.hash_pandas_object(target_df, index=False)
            hasher.update(memoryview(row_hashes.values))
        except Exception as e:
            self.verbose and logger.warning(
                f"Fast hash failed, falling back to slow hash: {e}"
            )
            self._hash_pandas_fallback(hasher, target_df)
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=df.shape,
            computed_at=datetime.now().isoformat(),
            data_type="pandas",
            method=method,
        )

    def _get_pandas_sample(self, df: pd.DataFrame) -> pd.DataFrame:
        """Deterministic slicing for sampling (Zero randomness)."""
        if self.data_size <= self.sample_size:
            return df
        chunk = self.sample_size // 3
        head = df.iloc[:chunk]
        mid_idx = self.data_size // 2
        mid = df.iloc[mid_idx : mid_idx + chunk]
        tail = df.iloc[-chunk:]
        return pd.concat([head, mid, tail])

    def _hash_pandas_fallback(self, hasher, df: pd.DataFrame):
        """Legacy fallback for complex object types."""
        for col in df.columns:
            val = df[col].astype(str).values
            hasher.update(val.astype(np.bytes_).tobytes())

    def _hash_polars(self, df: pl.DataFrame) -> _DatasetFingerprint:
        """
        Optimized Polars hashing using native Rust execution.
        """
        method = self._determine_method(self.data_size, self.method)
        self.verbose and logger.info(
            f"Hashing Polars: {self.data_size:,} rows - {method}"
        )
        target_df = df
        if method == "sampled" and self.data_size > self.sample_size:
            indices = self._get_sample_indices(self.data_size, self.sample_size)
            target_df = df.gather(indices)
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher,
            {
                "columns": df.columns,
                "dtypes": [str(t) for t in df.dtypes],
                "shape": df.shape,
            },
        )
        row_hashes = target_df.hash_rows()
        hasher.update(memoryview(row_hashes.to_numpy()))
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=df.shape,
            computed_at=datetime.now().isoformat(),
            data_type="polars",
            method=method,
        )

    def _hash_pandas_series(self, series: pd.Series) -> _DatasetFingerprint:
        """Hash Pandas Series using the fastest vectorized method."""
        self.verbose and logger.info(f"Hashing Pandas Series: {self.data_size:,} rows")
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher,
            {
                "name": series.name if series.name else "None",
                "dtype": str(series.dtype),
                "shape": series.shape,
            },
        )
        try:
            row_hashes = pd.util.hash_pandas_object(series, index=False)
            hasher.update(memoryview(row_hashes.values))
        except Exception as e:
            self.verbose and logger.warning(f"Series hash failed, falling back: {e}")
            hasher.update(memoryview(series.astype(str).values.tobytes()))
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=series.shape,
            computed_at=datetime.now().isoformat(),
            data_type="pandas_series",
            method="full",
        )

    def _hash_polars_series(self, series: pl.Series) -> _DatasetFingerprint:
        """Hash Polars Series using native Polars expressions."""
        self.verbose and logger.info(f"Hashing Polars Series: {self.data_size:,} rows")
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher,
            {"name": series.name, "dtype": str(series.dtype), "shape": series.shape},
        )
        try:
            row_hashes = series.hash()
            hasher.update(memoryview(row_hashes.to_numpy()))
        except Exception as e:
            self.verbose and logger.warning(
                f"Polars series native hash failed. Falling back."
            )
            hasher.update(str(series.to_list()).encode())
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=series.shape,
            computed_at=datetime.now().isoformat(),
            data_type="polars_series",
            method="full",
        )

    def _hash_numpy(self, arr: np.ndarray) -> _DatasetFingerprint:
        """
        Optimized NumPy hashing using Buffer Protocol (Zero-Copy).
        """
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher,
            {"shape": arr.shape, "dtype": str(arr.dtype), "strides": arr.strides},
        )
        if arr.flags["C_CONTIGUOUS"] or arr.flags["F_CONTIGUOUS"]:
            hasher.update(memoryview(arr))
        else:
            hasher.update(memoryview(np.ascontiguousarray(arr)))
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=arr.shape,
            computed_at=datetime.now().isoformat(),
            data_type="numpy",
            method="full",
        )

    def _hash_sparse(self, matrix: sparse.spmatrix) -> _DatasetFingerprint:
        """
        Optimized sparse hashing. Hashes underlying data arrays directly.
        """
        if not (sparse.isspmatrix_csr(matrix) or sparse.isspmatrix_csc(matrix)):
            matrix = matrix.tocsr()
        hasher = xxhash.xxh128()
        self._hash_schema(
            hasher, {"shape": matrix.shape, "format": matrix.format, "nnz": matrix.nnz}
        )
        hasher.update(memoryview(matrix.data))
        hasher.update(memoryview(matrix.indices))
        hasher.update(memoryview(matrix.indptr))
        return _DatasetFingerprint(
            hash=hasher.hexdigest(),
            shape=matrix.shape,
            computed_at=datetime.now().isoformat(),
            data_type=f"sparse_{matrix.format}",
            method="sparse",
        )

    def _determine_method(self, rows: int, requested: str) -> str:
        if requested != "auto":
            return requested
        if rows < 5000000:
            return "full"
        return "sampled"

    def _hash_schema(self, hasher, schema: Dict[str, Any]):
        """Compact schema hashing."""
        hasher.update(
            json.dumps(schema, sort_keys=True, separators=(",", ":")).encode()
        )

    def _get_sample_indices(self, total_rows: int, sample_size: int) -> list:
        """Calculate indices for sampling without generating full range lists."""
        chunk = sample_size // 3
        indices = list(range(min(chunk, total_rows)))
        mid_start = max(0, total_rows // 2 - chunk // 2)
        mid_end = min(mid_start + chunk, total_rows)
        indices.extend(range(mid_start, mid_end))
        last_start = max(0, total_rows - chunk)
        indices.extend(range(last_start, total_rows))
        return sorted(list(set(indices)))


def wape_score(y_true, y_pred):
    """
    Calculates Weighted Absolute Percentage Error (WAPE).

    WAPE = sum(|Error|) / sum(|Groundtruth|)
    """
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.asarray(y_pred, dtype=np.float64)
    eps = np.finfo(np.float64).eps
    sum_abs_error = np.sum(np.abs(y_true - y_pred))
    sum_abs_truth = np.maximum(np.sum(np.abs(y_true)), eps)
    return sum_abs_error / sum_abs_truth


def forecast_accuracy(y_true, y_pred):
    """
    Calculates Forecast Accuracy.

    FA = 1 - (sum(|Error|) / sum(|Groundtruth|))
    """
    y_true = np.asarray(y_true, dtype=np.float64)
    y_pred = np.asarray(y_pred, dtype=np.float64)
    eps = np.finfo(np.float64).eps
    sum_abs_error = np.sum(np.abs(y_true - y_pred))
    sum_abs_truth = np.maximum(np.sum(np.abs(y_true)), eps)
    return 1 - sum_abs_error / sum_abs_truth


def _normalize_hpo_df(df):
    df = df.copy()
    param_cols = [c for c in df.columns if c.startswith("params_")]
    df[param_cols] = df[param_cols].astype("string[pyarrow]")
    return df


def _smart_split(
    n_samples,
    X,
    y,
    *,
    random_state=42,
    shuffle=True,
    stratify=None,
    fixed_test_split=None,
    verbose=True,
):
    """
    Parameters
    ----------
    n_samples : int
        Number of samples in the dataset (len(X) or len(y))
    X : array-like
        Features
    y : array-like
        Target
    random_state : int
    shuffle : bool
    stratify : array-like or None
        For stratified splitting (recommended for classification)

    Returns
    -------
    If return_val=True  → X_train, X_val, X_test, y_train, y_val, y_test
    If return_val=False → X_train, X_test, y_train, y_test
    """
    if fixed_test_split:
        test_ratio = fixed_test_split
        val_ratio = fixed_test_split
    elif n_samples <= 1000:
        test_ratio = 0.2
        val_ratio = 0.1
    elif n_samples < 10000:
        test_ratio = 0.15
        val_ratio = 0.15
    elif n_samples < 100000:
        test_ratio = 0.1
        val_ratio = 0.1
    elif n_samples < 1000000:
        test_ratio = 0.05
        val_ratio = 0.05
    else:
        test_ratio = 0.01
        val_ratio = 0.01
    (X_train, X_test, y_train, y_test) = train_test_split(
        X,
        y,
        test_size=test_ratio,
        random_state=random_state,
        shuffle=shuffle,
        stratify=stratify,
    )
    val_size_in_train = val_ratio / (1 - test_ratio)
    verbose and logger.info(
        f"Split → Train: {1 - test_ratio:.2%} | Test: {test_ratio:.2%} (no validation set)"
    )
    return (X_train, X_test, y_train, y_test, val_size_in_train)


def _ensure_feature_names(X, feature_names=None):
    if isinstance(X, pd.DataFrame):
        return list(X.columns)
    if isinstance(X, np.ndarray):
        if feature_names is None:
            feature_names = [f"feature_{i}" for i in range(X.shape[1])]
        return feature_names
    raise TypeError("X must be a pandas DataFrame or numpy ndarray")


def _perform_cross_validation(
    model, X, y, cv_folds, shuffle, random_state, n_jobs, verbose
) -> dict[str, Any]:
    """Perform cross-validation on the regression model."""
    verbose and logger.info(f"Performing {cv_folds}-fold cross-validation...")
    cv = KFold(n_splits=cv_folds, shuffle=shuffle, random_state=random_state)
    scoring = {
        "MAE": "neg_mean_absolute_error",
        "MSE": "neg_mean_squared_error",
        "RMSE": "neg_root_mean_squared_error",
        "MAPE": "neg_mean_absolute_percentage_error",
        "R2": "r2",
        "WAPE": make_scorer(wape_score, greater_is_better=False),
        "Forecast_Accuracy": make_scorer(forecast_accuracy, greater_is_better=True),
    }
    cv_results = cross_validate(
        model, X, y, cv=cv, scoring=scoring, return_train_score=False, n_jobs=n_jobs
    )

    def get_score_stats(metric_key, invert_sign=False):
        key = f"test_{metric_key}"
        if key in cv_results:
            scores = cv_results[key]
            if invert_sign:
                scores = -scores
            return (scores.mean(), scores.std())
        return (0.0, 0.0)

    (mae_mean, mae_std) = get_score_stats("MAE", invert_sign=True)
    (mse_mean, mse_std) = get_score_stats("MSE", invert_sign=True)
    (rmse_mean, rmse_std) = get_score_stats("RMSE", invert_sign=True)
    (mape_mean, mape_std) = get_score_stats("MAPE", invert_sign=True)
    (wape_mean, wape_std) = get_score_stats("WAPE", invert_sign=True)
    (r2_mean, r2_std) = get_score_stats("R2", invert_sign=False)
    (fa_mean, fa_std) = get_score_stats("Forecast_Accuracy", invert_sign=False)
    verbose and logger.info(f"CV MAE          : {mae_mean:.4f} (+/- {mae_std:.4f})")
    verbose and logger.info(f"CV MSE          : {mse_mean:.4f} (+/- {mse_std:.4f})")
    verbose and logger.info(f"CV RMSE         : {rmse_mean:.4f} (+/- {rmse_std:.4f})")
    verbose and logger.info(f"CV MAPE         : {mape_mean:.4f} (+/- {mape_std:.4f})")
    verbose and logger.info(f"CV WAPE         : {wape_mean:.4f} (+/- {wape_std:.4f})")
    verbose and logger.info(f"CV R2 Score     : {r2_mean:.4f} (+/- {r2_std:.4f})")
    verbose and logger.info(f"CV Forecast Acc : {fa_mean:.4f} (+/- {fa_std:.4f})")
    CV_metrics = pd.DataFrame(
        {
            "Metric": [
                "Mean Absolute Error (MAE)",
                "Mean Squared Error (MSE)",
                "Root Mean Squared Error (RMSE)",
                "Mean Absolute Percentage Error (MAPE)",
                "Weighted Absolute Percentage Error (WAPE)",
                "R2 Score",
                "Forecast Accuracy",
            ],
            "Mean": [
                mae_mean,
                mse_mean,
                rmse_mean,
                mape_mean,
                wape_mean,
                r2_mean,
                fa_mean,
            ],
            "Std": [mae_std, mse_std, rmse_std, mape_std, wape_std, r2_std, fa_std],
        }
    )
    return CV_metrics


def _compute_score(model, X, y, metric):
    """
    Computes the score for the model on the given data based on the selected metric.
    Assumes 'metric' is passed as the short code (e.g., "MAE", "R2", "FA").
    """
    y_pred = model.predict(X)
    if metric == "mae":
        score = mean_absolute_error(y, y_pred)
    elif metric == "mse":
        score = mean_squared_error(y, y_pred)
    elif metric == "rmse":
        score = root_mean_squared_error(y, y_pred)
    elif metric == "mape":
        score = mean_absolute_percentage_error(y, y_pred)
    elif metric == "r2":
        score = r2_score(y, y_pred)
    elif metric == "wape" or metric == "fa":
        y_true_np = np.array(y, dtype=float).flatten()
        y_pred_np = np.array(y_pred, dtype=float).flatten()
        eps = np.finfo(np.float64).eps
        sum_abs_error = np.sum(np.abs(y_true_np - y_pred_np))
        sum_abs_truth = np.maximum(np.sum(np.abs(y_true_np)), eps)
        wape_val = sum_abs_error / sum_abs_truth
        if metric == "fa":
            score = 1.0 - wape_val
        else:
            score = wape_val
    else:
        raise ValueError(f"Unknown regression metric: {metric}")
    return score


def _get_cv_scoring_object(metric: str) -> Any:
    """
    Returns a scoring object (string or callable) suitable for cross_validate or GridSearchCV.
    Used during HPO for Regression.
    """
    if metric == "mae":
        return "neg_mean_absolute_error"
    elif metric == "mse":
        return "neg_mean_squared_error"
    elif metric == "rmse":
        return "neg_root_mean_squared_error"
    elif metric == "r2":
        return "r2"
    elif metric == "mape":
        return "neg_mean_absolute_percentage_error"
    elif metric == "wape":
        return make_scorer(wape_score, greater_is_better=False)
    elif metric == "fa":
        return make_scorer(forecast_accuracy, greater_is_better=True)
    else:
        return "neg_root_mean_squared_error"


def _hyperparameters_optimization(
    X,
    y,
    constant_hyperparameters,
    optimization_metric,
    val_ratio,
    shuffle_split,
    use_cross_val,
    cv_folds,
    n_trials=50,
    strategy="maximize",
    sampler="Tree-structured Parzen",
    seed=None,
    n_jobs=-1,
    verbose=False,
):
    direction = "maximize" if strategy.lower() == "maximize" else "minimize"
    sampler_map = {
        "Tree-structured Parzen": TPESampler(seed=seed),
        "Gaussian Process": GPSampler(seed=seed),
        "CMA-ES": CmaEsSampler(seed=seed),
        "Random Search": RandomSampler(seed=seed),
        "Random Sobol Search": QMCSampler(seed=seed),
    }
    if sampler in sampler_map:
        chosen_sampler = sampler_map[sampler]
    else:
        logger.warning(f"Sampler '{sampler}' not recognized → falling back to TPE")
        chosen_sampler = TPESampler(seed=seed)
    chosen_pruner = HyperbandPruner()
    if use_cross_val:
        cv = KFold(n_splits=cv_folds, shuffle=shuffle_split, random_state=seed)
        cv_score_obj = _get_cv_scoring_object(optimization_metric)
    else:
        (X_train, X_val, y_train, y_val) = train_test_split(
            X, y, test_size=val_ratio, random_state=seed, shuffle=shuffle_split
        )
    use_bagging = constant_hyperparameters.get("use_bagging")
    model_objective = constant_hyperparameters.get("objective")

    def logging_callback(study: Study, trial: FrozenTrial):
        """Callback function to log trial progress"""
        verbose and logger.info(
            f"Trial {trial.number} finished with value: {trial.value} and parameters: {trial.params}"
        )
        try:
            verbose and logger.info(f"Best value so far: {study.best_value}")
            verbose and logger.info(f"Best parameters so far: {study.best_params}")
        except ValueError:
            verbose and logger.info(f"No successful trials completed yet")
        verbose and logger.info(f"" + "-" * 50)

    def objective(trial):
        try:
            params = {}
            params["n_estimators"] = trial.suggest_int("n_estimators", 50, 1000)
            params["max_depth"] = trial.suggest_int("max_depth", 1, 15)
            params["num_leaves"] = trial.suggest_int("num_leaves", 20, 500)
            params["learning_rate"] = trial.suggest_float(
                "learning_rate", 0.001, 0.5, log=True
            )
            params["min_split_gain"] = trial.suggest_float("min_split_gain", 0.0, 5.0)
            params["min_child_weight"] = trial.suggest_float(
                "min_child_weight", 0.0, 10.0
            )
            params["min_child_samples"] = trial.suggest_int("min_child_samples", 5, 100)
            params["colsample_bytree"] = trial.suggest_float(
                "colsample_bytree", 0.1, 1.0
            )
            params["reg_alpha"] = trial.suggest_float(
                "reg_alpha", 1e-08, 100.0, log=True
            )
            params["reg_lambda"] = trial.suggest_float(
                "reg_lambda", 1e-08, 100.0, log=True
            )
            if use_bagging:
                params["subsample"] = trial.suggest_float("subsample", 0.5, 1.0)
                params["subsample_freq"] = trial.suggest_int("subsample_freq", 1, 10)
            else:
                params["subsample"] = 1.0
                params["subsample_freq"] = 0
            model = LGBMRegressor(
                **params,
                objective=model_objective,
                random_state=seed,
                n_jobs=n_jobs,
                verbosity=-1,
            )
            if use_cross_val:
                scores = cross_validate(
                    model, X, y, cv=cv, n_jobs=n_jobs, scoring=cv_score_obj
                )
                return scores["test_score"].mean()
            else:
                model.fit(X_train, y_train)
                score = _compute_score(model, X_val, y_val, optimization_metric)
                return score
        except Exception as e:
            verbose and logger.error(
                f"Trial {trial.number} failed with error: {str(e)}"
            )
            raise

    study = create_study(
        direction=direction, sampler=chosen_sampler, pruner=chosen_pruner
    )
    study.optimize(
        objective,
        n_trials=n_trials,
        catch=(Exception,),
        n_jobs=n_jobs,
        callbacks=[logging_callback],
    )
    verbose and logger.info(f"Optimization completed!")
    verbose and logger.info(
        f"   Best Number of Trees       : {study.best_params['n_estimators']}"
    )
    verbose and logger.info(
        f"   Best Max Depth             : {study.best_params['max_depth']}"
    )
    verbose and logger.info(
        f"   Best Number of Leaves      : {study.best_params['num_leaves']}"
    )
    verbose and logger.info(
        f"   Best Learning Rate         : {study.best_params['learning_rate']}"
    )
    verbose and logger.info(
        f"   Best L1 Regularization     : {study.best_params['reg_alpha']}"
    )
    verbose and logger.info(
        f"   Best L2 Regularization     : {study.best_params['reg_lambda']}"
    )
    verbose and logger.info(
        f"   Best Min Child Weight      : {study.best_params['min_child_weight']}"
    )
    verbose and logger.info(
        f"   Best Min Split Gain        : {study.best_params['min_split_gain']}"
    )
    verbose and logger.info(
        f"   Best Min Leaf Samples      : {study.best_params['min_child_samples']}"
    )
    verbose and logger.info(
        f"   Best Colsample by Tree     : {study.best_params['colsample_bytree']}"
    )
    if use_bagging:
        verbose and logger.info(
            f"   Best Subsample Ratio       : {study.best_params['subsample']}"
        )
        verbose and logger.info(
            f"   Best Subsample Frequency   : {study.best_params['subsample_freq']}"
        )
    verbose and logger.info(
        f"   Best {optimization_metric:<22}: {study.best_value:.4f}"
    )
    verbose and logger.info(f"   Sampler used               : {sampler}")
    verbose and logger.info(f"   Direction                  : {direction}")
    if use_cross_val:
        verbose and logger.info(f"   Cross-validation           : {cv_folds}-fold")
    else:
        verbose and logger.info(
            f"   Validation                 : single train/val split"
        )
    trials = study.trials_dataframe()
    trials["best_value"] = trials["value"].cummax()
    cols = list(trials.columns)
    value_idx = cols.index("value")
    cols = [c for c in cols if c != "best_value"]
    new_order = cols[: value_idx + 1] + ["best_value"] + cols[value_idx + 1 :]
    trials = trials[new_order]
    return (study.best_params, trials)


def _combine_test_data(X_test, y_true, y_pred, features_names=None):
    """
    Combine X_test, y_true, y_pred into a single DataFrame.

    Parameters:
    -----------
    X_test : pandas/polars DataFrame, numpy array, or scipy sparse matrix
        Test features
    y_true : pandas/polars Series, numpy array, or list
        True labels
    y_pred : pandas/polars Series, numpy array, or list
        Predicted labels

    Returns:
    --------
    pandas.DataFrame
        Combined DataFrame with features, y_true, and y_pred
    """
    if sparse.issparse(X_test):
        X_df = pd.DataFrame(X_test.toarray())
    elif isinstance(X_test, np.ndarray):
        X_df = pd.DataFrame(X_test)
    elif hasattr(X_test, "to_pandas"):
        X_df = X_test.to_pandas()
    elif isinstance(X_test, pd.DataFrame):
        X_df = X_test.copy()
    else:
        raise TypeError(f"Unsupported type for X_test: {type(X_test)}")
    if X_df.columns.tolist() == list(range(len(X_df.columns))):
        X_df.columns = (
            [f"feature_{i}" for i in range(len(X_df.columns))]
            if features_names is None
            else features_names
        )
    if isinstance(y_true, list):
        y_true_series = pd.Series(y_true, name="y_true")
    elif isinstance(y_true, np.ndarray):
        y_true_series = pd.Series(y_true, name="y_true")
    elif hasattr(y_true, "to_pandas"):
        y_true_series = y_true.to_pandas()
        y_true_series.name = "y_true"
    elif isinstance(y_true, pd.Series):
        y_true_series = y_true.copy()
        y_true_series.name = "y_true"
    else:
        raise TypeError(f"Unsupported type for y_true: {type(y_true)}")
    if isinstance(y_pred, list):
        y_pred_series = pd.Series(y_pred, name="y_pred")
    elif isinstance(y_pred, np.ndarray):
        y_pred_series = pd.Series(y_pred, name="y_pred")
    elif hasattr(y_pred, "to_pandas"):
        y_pred_series = y_pred.to_pandas()
        y_pred_series.name = "y_pred"
    elif isinstance(y_pred, pd.Series):
        y_pred_series = y_pred.copy()
        y_pred_series.name = "y_pred"
    else:
        raise TypeError(f"Unsupported type for y_pred: {type(y_pred)}")
    X_df = X_df.reset_index(drop=True)
    y_true_series = y_true_series.reset_index(drop=True)
    y_pred_series = y_pred_series.reset_index(drop=True)
    result_df = pd.concat([X_df, y_true_series, y_pred_series], axis=1)
    return result_df


def _get_feature_importance(model, feature_names=None, sort=True, top_n=None):
    """
    Extract feature importance from a Random Forest model.

    Parameters:
    -----------
    model : Fitted model
    feature_names : list or array-like, optional
        Names of features. If None, uses generic names like 'feature_0', 'feature_1', etc.
    sort : bool, default=True
        Whether to sort features by importance (descending)
    top_n : int, optional
        If specified, returns only the top N most important features

    Returns:
    --------
    pd.DataFrame
        DataFrame with columns: 'feature', 'importance'
        Importance values represent the mean decrease in impurity (Gini importance)
    """
    importances = model.feature_importances_
    if feature_names is None:
        feature_names = [f"feature_{i}" for i in range(len(importances))]
    importance_df = pd.DataFrame({"feature": feature_names, "importance": importances})
    if sort:
        importance_df = importance_df.sort_values("importance", ascending=False)
    importance_df = importance_df.reset_index(drop=True)
    if top_n is not None:
        importance_df = importance_df.head(top_n)
    return importance_df


def _smart_shap_background(
    X: Union[np.ndarray, pd.DataFrame],
    model_type: str = "tree",
    seed: int = 42,
    verbose: bool = True,
) -> Union[np.ndarray, pd.DataFrame, object]:
    """
    Intelligently prepares a background dataset for SHAP based on model type.

    Strategies:
    - Tree: Higher sample cap (1000), uses Random Sampling (preserves data structure).
    - Other: Lower sample cap (100), uses K-Means (maximizes info density).
    """
    (n_rows, n_features) = X.shape
    if model_type == "tree":
        max_samples = 1000
        use_kmeans = False
    else:
        max_samples = 100
        use_kmeans = True
    if n_rows <= max_samples:
        verbose and logger.info(
            f"✓ Dataset small ({n_rows} <= {max_samples}). Using full data."
        )
        return X
    verbose and logger.info(
        f"⚡ Large dataset detected ({n_rows} rows). Optimization Strategy: {('K-Means' if use_kmeans else 'Random Sampling')}"
    )
    if use_kmeans:
        try:
            verbose and logger.info(
                f"   Summarizing to {max_samples} weighted centroids..."
            )
            return shap.kmeans(X, max_samples)
        except Exception as e:
            logger.warning(
                f"   K-Means failed ({str(e)}). Falling back to random sampling."
            )
            return shap.sample(X, max_samples, random_state=seed)
    else:
        verbose and logger.info(f"   Sampling {max_samples} random rows...")
        return shap.sample(X, max_samples, random_state=seed)


def train_reg_lightgbm(
    X: DataFrame, y: Union[DataSeries, NDArray, List], options=None
) -> Tuple[
    Any,
    Any,
    Any,
    Union[DataFrame, Dict],
    DataFrame,
    DataFrame,
    DataFrame,
    DataFrame,
    Dict,
]:
    options = options or {}
    n_estimators = options.get("n_estimators", 100)
    early_stopping = options.get("early_stopping", True)
    early_stopping_rounds = options.get("early_stopping_rounds", 10)
    max_depth = options.get("max_depth", -1)
    num_leaves = options.get("num_leaves", 31)
    learning_rate = options.get("learning_rate", 0.1)
    min_split_gain = options.get("min_split_gain", 0.0)
    min_child_weight = options.get("min_child_weight", 0.001)
    min_child_samples = options.get("min_child_samples", 20)
    colsample_bytree = options.get("colsample_bytree", 1.0)
    reg_alpha = options.get("reg_alpha", 0.0)
    reg_lambda = options.get("reg_lambda", 0.0)
    use_bagging = options.get("use_bagging", False)
    subsample = options.get("subsample", 1.0)
    subsample_freq = options.get("subsample_freq", 0)
    auto_split = options.get("auto_split", True)
    test_val_size = options.get("test_val_size", 15) / 100
    shuffle_split = options.get("shuffle_split", True)
    retrain_on_full = options.get("retrain_on_full", False)
    use_cross_validation = options.get("use_cross_validation", False)
    cv_folds = options.get("cv_folds", 5)
    use_hpo = options.get("use_hyperparameter_optimization", False)
    optimization_metric = options.get(
        "optimization_metric", "Root Mean Squared Error (RMSE)"
    )
    optimization_metric = METRICS_DICT[optimization_metric]
    optimization_method = options.get("optimization_method", "Tree-structured Parzen")
    optimization_iterations = options.get("optimization_iterations", 50)
    return_shap_explainer = options.get("return_shap_explainer", False)
    use_shap_sampler = options.get("use_shap_sampler", False)
    shap_feature_perturbation = options.get(
        "shap_feature_perturbation", "Interventional"
    )
    metrics_as = options.get("metrics_as", "Dataframe")
    n_jobs_str = options.get("n_jobs", "1")
    random_state = options.get("random_state", 42)
    activate_caching = options.get("activate_caching", False)
    verbose = options.get("verbose", True)
    n_jobs_int = -1 if n_jobs_str == "All" else int(n_jobs_str)
    skip_computation = False
    Model = None
    Metrics = pd.DataFrame()
    CV_Metrics = pd.DataFrame()
    Features_Importance = pd.DataFrame()
    Label_Encoder = None
    SHAP = None
    HPO_Trials = pd.DataFrame()
    HPO_Best = None
    fa = None
    wape = None
    mae = None
    mse = None
    rmse = None
    r2 = None
    mape = None
    (n_samples, _) = X.shape
    if activate_caching:
        verbose and logger.info(f"Caching is activate")
        data_hasher = _UniversalDatasetHasher(n_samples, verbose=verbose)
        X_hash = data_hasher.hash_data(X).hash
        y_hash = data_hasher.hash_data(y).hash
        all_hash_base_text = f"HASH BASE TEXT LIGHTGBMPandas Version {pd.__version__}POLARS Version {pl.__version__}Numpy Version {np.__version__}Scikit Learn Version {sklearn.__version__}Scipy Version {scipy.__version__}{('SHAP Version ' + shap.__version__ if return_shap_explainer else 'NO SHAP Version')}{X_hash}{y_hash}{n_estimators}{early_stopping}{early_stopping_rounds}{max_depth}{learning_rate}{reg_lambda}{reg_alpha}{min_child_weight}{subsample}{colsample_bytree}{num_leaves}{min_split_gain}{min_child_samples}{use_bagging}{subsample_freq}{('Use HPO' if use_hpo else 'No HPO')}{(optimization_metric if use_hpo else 'No HPO Metric')}{(optimization_method if use_hpo else 'No HPO Method')}{(optimization_iterations if use_hpo else 'No HPO Iter')}{(cv_folds if use_cross_validation else 'No CV')}{('Auto Split' if auto_split else test_val_size)}{shuffle_split}{return_shap_explainer}{shap_feature_perturbation}{use_shap_sampler}{random_state}"
        all_hash = hashlib.sha256(all_hash_base_text.encode("utf-8")).hexdigest()
        verbose and logger.info(f"Hash was computed: {all_hash}")
        temp_folder = Path(tempfile.gettempdir())
        cache_folder = temp_folder / "coded-flows-cache"
        cache_folder.mkdir(parents=True, exist_ok=True)
        model_path = cache_folder / f"{all_hash}.txt"
        metrics_dict_path = cache_folder / f"metrics_{all_hash}.json"
        metrics_df_path = cache_folder / f"metrics_{all_hash}.parquet"
        cv_metrics_path = cache_folder / f"cv_metrics_{all_hash}.parquet"
        hpo_trials_path = cache_folder / f"hpo_trials_{all_hash}.parquet"
        hpo_best_params_path = cache_folder / f"hpo_best_params_{all_hash}.json"
        features_importance_path = (
            cache_folder / f"features_importance_{all_hash}.parquet"
        )
        prediction_set_path = cache_folder / f"prediction_set_{all_hash}.parquet"
        shap_path = cache_folder / f"{all_hash}.shap"
        label_encoder_path = cache_folder / f"{all_hash}.encoder"
        skip_computation = model_path.is_file()
    if not skip_computation:
        features_names = X.columns if hasattr(X, "columns") else None
        shap_feature_names = _ensure_feature_names(X)
        Label_Encoder = LabelEncoder()
        y = Label_Encoder.fit_transform(y)
        eval_metric = "l2"
        objective = "regression"
        fixed_test_split = None if auto_split else test_val_size
        (X_train, X_test, y_train, y_test, val_ratio) = _smart_split(
            n_samples,
            X,
            y,
            random_state=random_state,
            shuffle=shuffle_split,
            fixed_test_split=fixed_test_split,
            verbose=verbose,
        )
        if use_hpo:
            verbose and logger.info(f"Performing Hyperparameters Optimization")
            constant_hyperparameters = {
                "use_bagging": use_bagging,
                "objective": objective,
            }
            (HPO_Best, HPO_Trials) = _hyperparameters_optimization(
                X_train,
                y_train,
                constant_hyperparameters,
                optimization_metric,
                val_ratio,
                shuffle_split,
                use_cross_validation,
                cv_folds,
                optimization_iterations,
                METRICS_OPT[optimization_metric],
                optimization_method,
                random_state,
                n_jobs_int,
                verbose=verbose,
            )
            HPO_Trials = _normalize_hpo_df(HPO_Trials)
            n_estimators = HPO_Best["n_estimators"]
            max_depth = HPO_Best["max_depth"]
            num_leaves = HPO_Best["num_leaves"]
            learning_rate = HPO_Best["learning_rate"]
            min_split_gain = HPO_Best["min_split_gain"]
            min_child_weight = HPO_Best["min_child_weight"]
            min_child_samples = HPO_Best["min_child_samples"]
            colsample_bytree = HPO_Best["colsample_bytree"]
            reg_alpha = HPO_Best["reg_alpha"]
            reg_lambda = HPO_Best["reg_lambda"]
            subsample = HPO_Best.get("subsample", 1)
            subsample_freq = HPO_Best.get("subsample_freq", 0.0)
        model_params = {}
        model_params["n_estimators"] = n_estimators
        model_params["max_depth"] = max_depth
        model_params["num_leaves"] = num_leaves
        model_params["learning_rate"] = learning_rate
        model_params["min_split_gain"] = min_split_gain
        model_params["min_child_weight"] = min_child_weight
        model_params["min_child_samples"] = min_child_samples
        model_params["colsample_bytree"] = colsample_bytree
        model_params["reg_alpha"] = reg_alpha
        model_params["reg_lambda"] = reg_lambda
        model_params["subsample"] = subsample if use_bagging else 1.0
        model_params["subsample_freq"] = subsample_freq if use_bagging else 0
        Model = LGBMRegressor(
            **model_params,
            objective=objective,
            random_state=random_state,
            n_jobs=n_jobs_int,
            verbosity=-1,
        )
        if early_stopping and (not use_hpo):
            (X_train, X_val, y_train, y_val) = train_test_split(
                X_train,
                y_train,
                test_size=val_ratio,
                random_state=random_state,
                shuffle=shuffle_split,
            )
            callbacks = [
                early_stopping_call(
                    stopping_rounds=early_stopping_rounds, verbose=False
                )
            ]
            Model.fit(
                X_train,
                y_train,
                eval_set=[(X_val, y_val)],
                eval_metric=eval_metric,
                callbacks=callbacks,
            )
            model_params["n_estimators"] = Model.best_iteration_
        else:
            Model.fit(X_train, y_train)
        if use_cross_validation and (not use_hpo):
            verbose and logger.info(
                f"Using Cross-Validation to measure performance metrics"
            )
            CV_Model = LGBMRegressor(
                **model_params,
                objective=objective,
                random_state=random_state,
                n_jobs=n_jobs_int,
                verbosity=-1,
            )
            CV_Metrics = _perform_cross_validation(
                CV_Model,
                X_train,
                y_train,
                cv_folds,
                shuffle_split,
                random_state,
                n_jobs_int,
                verbose,
            )
        y_pred = Model.predict(X_test)
        fa = forecast_accuracy(y_test, y_pred)
        wape = wape_score(y_test, y_pred)
        mae = mean_absolute_error(y_test, y_pred)
        mse = mean_squared_error(y_test, y_pred)
        rmse = root_mean_squared_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)
        mape = mean_absolute_percentage_error(y_test, y_pred)
        if metrics_as == "Dataframe":
            Metrics = pd.DataFrame(
                {
                    "Metric": [
                        "Forecast Accuracy",
                        "Weighted Absolute Percentage Error",
                        "Mean Absolute Error",
                        "Mean Squared Error",
                        "Root Mean Squared Error",
                        "R2 Score",
                        "Mean Absolute Percentage Error",
                    ],
                    "Value": [fa, wape, mae, mse, rmse, r2, mape],
                }
            )
        else:
            Metrics = {
                "forecast_accuracy": fa,
                "weighted_absolute_percentage_error ": wape,
                "mean_absolute_error": mae,
                "mean_squared_error": mse,
                "root_mean_squared_error": rmse,
                "r2_score": r2,
                "mean_absolute_percentage_error": mape,
            }
        verbose and logger.info(f"Forecast Accuracy                  : {fa:.2%}")
        verbose and logger.info(f"Weighted Absolute Percentage Error : {wape:.2%}")
        verbose and logger.info(f"Mean Absolute Error                : {mae:.4f}")
        verbose and logger.info(f"Mean Squared Error                 : {mse:.4f}")
        verbose and logger.info(f"Root Mean Squared Error            : {rmse:.4f}")
        verbose and logger.info(f"R2 Score                           : {r2:.4f}")
        verbose and logger.info(f"Mean Absolute Percentage Error     : {mape:.2%}")
        Prediction_Set = _combine_test_data(X_test, y_test, y_pred, features_names)
        verbose and logger.info(f"Prediction Set created")
        if retrain_on_full:
            verbose and logger.info(
                "Retraining model on full dataset for production deployment"
            )
            Model.fit(X, y)
            verbose and logger.info(
                "Model successfully retrained on full dataset. Reported metrics remain from original held-out test set."
            )
        Features_Importance = _get_feature_importance(Model, features_names)
        verbose and logger.info(f"Features Importance computed")
        if return_shap_explainer:
            if shap_feature_perturbation == "Interventional":
                SHAP = shap.TreeExplainer(
                    Model.booster_,
                    (
                        _smart_shap_background(
                            X if retrain_on_full else X_train,
                            model_type="tree",
                            seed=random_state,
                            verbose=verbose,
                        )
                        if use_shap_sampler
                        else X if retrain_on_full else X_train
                    ),
                    feature_names=shap_feature_names,
                )
            else:
                SHAP = shap.TreeExplainer(
                    Model.booster_,
                    feature_names=shap_feature_names,
                    feature_perturbation="tree_path_dependent",
                )
            verbose and logger.info(f"SHAP explainer generated")
        if activate_caching:
            verbose and logger.info(f"Caching output elements")
            Model.booster_.save_model(model_path)
            if isinstance(Metrics, dict):
                with metrics_dict_path.open("w", encoding="utf-8") as f:
                    json.dump(Metrics, f, ensure_ascii=False, indent=4)
            else:
                Metrics.to_parquet(metrics_df_path)
            if use_cross_validation and (not use_hpo):
                CV_Metrics.to_parquet(cv_metrics_path)
            if use_hpo:
                HPO_Trials.to_parquet(hpo_trials_path)
                with hpo_best_params_path.open("w", encoding="utf-8") as f:
                    json.dump(HPO_Best, f, ensure_ascii=False, indent=4)
            Features_Importance.to_parquet(features_importance_path)
            Prediction_Set.to_parquet(prediction_set_path)
            if return_shap_explainer:
                with shap_path.open("wb") as f:
                    joblib.dump(SHAP, f)
            joblib.dump(Label_Encoder, label_encoder_path)
            verbose and logger.info(f"Caching done")
    else:
        verbose and logger.info(f"Skipping computations and loading cached elements")
        Model = Booster(model_file=str(model_path))
        verbose and logger.info(f"Model loaded")
        if metrics_dict_path.is_file():
            with metrics_dict_path.open("r", encoding="utf-8") as f:
                Metrics = json.load(f)
        else:
            Metrics = pd.read_parquet(metrics_df_path)
        verbose and logger.info(f"Metrics loaded")
        if use_cross_validation and (not use_hpo):
            CV_Metrics = pd.read_parquet(cv_metrics_path)
            verbose and logger.info(f"Cross Validation metrics loaded")
        if use_hpo:
            HPO_Trials = pd.read_parquet(hpo_trials_path)
            with hpo_best_params_path.open("r", encoding="utf-8") as f:
                HPO_Best = json.load(f)
            verbose and logger.info(
                f"Hyperparameters Optimization trials and best params loaded"
            )
        Features_Importance = pd.read_parquet(features_importance_path)
        verbose and logger.info(f"Features Importance loaded")
        Prediction_Set = pd.read_parquet(prediction_set_path)
        verbose and logger.info(f"Prediction Set loaded")
        if return_shap_explainer:
            with shap_path.open("rb") as f:
                SHAP = joblib.load(f)
            verbose and logger.info(f"SHAP Explainer loaded")
        Label_Encoder = joblib.load(label_encoder_path)
        verbose and logger.info(f"Label Encoder loaded")
    return (
        Model,
        SHAP,
        Label_Encoder,
        Metrics,
        CV_Metrics,
        Features_Importance,
        Prediction_Set,
        HPO_Trials,
        HPO_Best,
    )

Brick Info

version v0.1.4
python 3.11, 3.12, 3.13
requirements
  • shap>=0.47.0
  • scikit-learn
  • pandas
  • numpy
  • lightgbm
  • torch
  • numba>=0.56.0
  • shap
  • cmaes
  • optuna
  • scipy
  • polars
  • xxhash