Clu. Agglomerative

Performs Hierarchical Agglomerative Clustering. Merges data points bottom-up.

Processing

This brick groups your data points into clusters using Hierarchical Agglomerative Clustering. It builds a hierarchy of clusters by processing data from the "bottom up": it starts by treating each data point as its own cluster and then sequentially merges the pairs of clusters that are most similar.

This process continues until it reaches the number of clusters or the distance threshold that you define. The method is particularly useful when you want to understand the structure of your data or find natural groupings without knowing the number of clusters in advance (in threshold mode). The brick also calculates validation metrics to help you judge the quality of the separation.
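
The bottom-up merging described above can be sketched with scikit-learn's AgglomerativeClustering, the estimator used in this brick's source. This is a minimal illustration on made-up points, not the brick itself; the array `X` and the choice of two clusters are assumptions for the example.

```python
import numpy as np
from sklearn.cluster import AgglomerativeClustering

# Two well-separated groups of 2-D points (made-up data).
X = np.array(
    [[0.0, 0.0], [0.1, 0.2], [0.2, 0.1],
     [5.0, 5.0], [5.1, 5.2], [5.2, 5.1]]
)

# Fixed-k mode: keep merging the closest clusters until 2 remain.
model = AgglomerativeClustering(n_clusters=2, linkage="ward")
labels = model.fit_predict(X)

# One label per row; rows in the same group share a label.
print(labels)
# children_ lists the pair of nodes joined at each merge step.
print(model.children_)
```

The label values themselves are arbitrary identifiers; only which rows share a label is meaningful.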

Inputs

data
The dataset you want to cluster. The data must contain only numerical values (integers, floats) or boolean values, as these are required to calculate the mathematical distances between points.
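
If your dataset contains text or other non-numerical columns, one option is to drop them before passing the data to the brick. The sketch below uses pandas `select_dtypes`, the same call the brick's validation relies on; the DataFrame and its column names are invented for illustration.

```python
import pandas as pd

# Hypothetical dataset mixing numerical, boolean, and text columns.
df = pd.DataFrame(
    {
        "age": [25, 32, 47],
        "income": [38000.0, 52000.0, 61000.0],
        "is_member": [True, False, True],
        "city": ["Lyon", "Oslo", "Lima"],
    }
)

# Keep only the columns the brick accepts: numbers and booleans.
numeric_df = df.select_dtypes(include=["number", "bool"])

# Columns that would make the validation fail if left in place.
dropped = [col for col in df.columns if col not in numeric_df.columns]
print(list(numeric_df.columns), dropped)
```

Dropping a column discards its information; encoding it numerically may be preferable, depending on the data.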

Inputs Types

Input | Types
data | DataFrame

You can check the list of supported types here: Available Type Hints.

Outputs

Clustered data
The original dataset with an additional column appended to it. This new column contains the ID (label) of the cluster assigned to each row.
Metrics
A collection of evaluation scores that measure the quality of the clustering. These metrics help you understand how distinct and well-separated the identified groups are.

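The Metrics output contains the following fields (dictionary keys, or rows of the DataFrame). If only one cluster is found, the scores cannot be computed and all three are returned as NaN.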

  • Silhouette Score: Ranges from -1 to 1. A high value indicates that, on average, points are well matched to their own cluster and poorly matched to neighboring clusters.
  • Calinski-Harabasz: A higher score means clusters are dense and well separated. Also known as the Variance Ratio Criterion.
  • Davies-Bouldin: A lower score indicates better clustering. It measures the average similarity between each cluster and its most similar one.
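
As a rough illustration of how the three scores behave, the sketch below computes them with the scikit-learn functions imported in the brick's source, on two synthetic, well-separated blobs. The data, the random seed, and the variable names are assumptions made for the example.

```python
import numpy as np
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import (
    calinski_harabasz_score,
    davies_bouldin_score,
    silhouette_score,
)

# Two tight synthetic blobs centered at (0, 0) and (5, 5).
rng = np.random.default_rng(0)
X = np.vstack(
    [rng.normal(0.0, 0.3, size=(20, 2)), rng.normal(5.0, 0.3, size=(20, 2))]
)
labels = AgglomerativeClustering(n_clusters=2).fit_predict(X)

scores = {
    "Silhouette Score": silhouette_score(X, labels),          # closer to 1 is better
    "Calinski-Harabasz": calinski_harabasz_score(X, labels),  # higher is better
    "Davies-Bouldin": davies_bouldin_score(X, labels),        # closer to 0 is better
}
for name, value in scores.items():
    print(f"{name}: {value:.4f}")
```

On data this cleanly separated, the silhouette should be close to 1 and Davies-Bouldin close to 0; real datasets rarely score as well.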

Outputs Types

Output | Types
Clustered data | DataFrame
Metrics | DataFrame, Dict

You can check the list of supported types here: Available Type Hints.
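
The two shapes of the Metrics output can be pictured as follows. This sketch mirrors how the brick's source assembles the result; the score values are placeholders, not real results.

```python
import pandas as pd

metric_names = ["Silhouette Score", "Calinski-Harabasz", "Davies-Bouldin"]
metric_values = [0.71, 412.5, 0.38]  # placeholder values for illustration

# Dictionary form: one key per metric.
as_dict = dict(zip(metric_names, metric_values))

# DataFrame form: one row per metric, with "Metric" and "Value" columns.
as_frame = pd.DataFrame({"Metric": metric_names, "Value": metric_values})

print(as_dict["Silhouette Score"])
print(as_frame)
```

The dictionary is convenient for looking up a single score, while the DataFrame is easier to display or export alongside other tables.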

Options

The Clu. Agglomerative brick exposes the following options:

Use Distance Threshold
Determines how the algorithm decides when to stop merging clusters. If enabled, merging stops once the linkage distance between the closest remaining clusters reaches the value defined in Distance Threshold, so the number of clusters is determined by the structure of the data. If disabled, merging stops when the number of clusters defined in Number of Clusters (k) is reached.
Number of Clusters (k)
The exact number of groups you want to find. This option is only used if Use Distance Threshold is turned off.
Distance Threshold
The linkage distance threshold above which clusters will not be merged. A lower value results in more clusters (stricter merging), while a higher value results in fewer clusters. This option is only used if Use Distance Threshold is turned on.
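
The effect of the threshold on the number of clusters can be sketched directly with scikit-learn. The example assumes made-up 1-D data with three groups and uses complete linkage so that the threshold can be read as a plain distance; `count_clusters` is a hypothetical helper, not part of the brick.

```python
import numpy as np
from sklearn.cluster import AgglomerativeClustering

# Three groups of 1-D points located around 0, 10, and 20 (made-up data).
X = np.array([0.0, 0.5, 1.0, 10.0, 10.5, 11.0, 20.0, 20.5, 21.0]).reshape(-1, 1)


def count_clusters(threshold):
    """Fit in threshold mode and return how many clusters remain."""
    model = AgglomerativeClustering(
        n_clusters=None,               # must be None in threshold mode
        distance_threshold=threshold,  # merges at or above this distance are refused
        linkage="complete",
    )
    model.fit(X)
    return model.n_clusters_


for threshold in (0.1, 2.0, 15.0):
    print(threshold, count_clusters(threshold))
```

Raising the threshold allows more merges and therefore leaves fewer clusters, which matches the description above.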
Linkage Criterion
Determines which distance to use between sets of observations. The algorithm merges the pairs of clusters that minimize this criterion.
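  • Ward: Minimizes the variance of the clusters being merged. This usually creates clusters of roughly equal size. Ward only works with the Euclidean metric; if another Distance Metric is selected, the brick overrides it with Euclidean.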
  • Complete: Uses the maximum distance between all observations of the two sets. This tends to produce compact clusters.
  • Average: Uses the average of the distances of each observation of the two sets.
  • Single: Uses the minimum of the distances between all observations of the two sets. This can result in "chain-like" clusters.
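
To make the "chain-like" behavior of single linkage concrete, the sketch below clusters a made-up line of evenly spaced points plus one slightly more distant point. Single linkage only looks at the closest pair between two clusters, so it keeps absorbing the chain one neighbor at a time. The data and the `cluster_sizes` helper are assumptions for illustration.

```python
import numpy as np
from sklearn.cluster import AgglomerativeClustering

# Ten evenly spaced points forming a chain, plus one point a bit farther away.
chain = np.arange(10, dtype=float)
X = np.append(chain, 11.5).reshape(-1, 1)


def cluster_sizes(linkage):
    """Return the sorted sizes of the two clusters found with this linkage."""
    model = AgglomerativeClustering(n_clusters=2, linkage=linkage)
    labels = model.fit_predict(X)
    return sorted(np.bincount(labels).tolist())


for name in ("single", "complete", "average", "ward"):
    print(name, cluster_sizes(name))
```

With single linkage the whole chain ends up in one cluster and the distant point in the other; the other criteria may cut the chain itself, since they also account for points that are far apart within a cluster.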
Distance Metric
The method used to calculate the distance between individual data points.
  • Euclidean: Standard straight-line distance.
  • Manhattan: Sum of absolute differences (grid-like path).
  • Chebyshev: The greatest of their differences along any coordinate dimension.
  • Cosine: Measures the angle between vectors, focusing on orientation rather than magnitude.
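
The four metrics can be compared on a single pair of points. The sketch below uses SciPy's distance functions (Manhattan distance is called `cityblock` there); the two vectors are arbitrary examples.

```python
from scipy.spatial import distance

a = [1.0, 0.0]
b = [4.0, 4.0]  # differs from a by 3 along x and 4 along y

distances = {
    "euclidean": distance.euclidean(a, b),  # sqrt(3**2 + 4**2)
    "manhattan": distance.cityblock(a, b),  # |3| + |4|
    "chebyshev": distance.chebyshev(a, b),  # max(|3|, |4|)
    "cosine": distance.cosine(a, b),        # 1 - cos(angle between a and b)
}
for name, value in distances.items():
    print(f"{name}: {value:.4f}")
```

Note that the cosine value depends only on the direction of the vectors: scaling `b` by any positive factor leaves it unchanged, whereas the other three distances would change.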
Cluster Column Name
The name of the new column that will be added to your data containing the cluster labels.
Metrics Output Format
Chooses the data structure for the Metrics output: a DataFrame with Metric and Value columns, or a dictionary keyed by metric name.
Verbose
If enabled, detailed logs regarding the clustering process and score calculation will be printed to the console.
import logging
import pandas as pd
import polars as pl
import numpy as np
from scipy import sparse
from sklearn.cluster import AgglomerativeClustering
from sklearn.metrics import (
    silhouette_score,
    calinski_harabasz_score,
    davies_bouldin_score,
)
from coded_flows.types import DataFrame, Tuple, Union, Dict, Any
from coded_flows.utils import CodedFlowsLogger

logger = CodedFlowsLogger(name="Clu. Agglomerative", level=logging.INFO)


def _validate_numerical_data(data):
    """
    Validates if the input data contains only numerical (integer, float) or boolean values.
    """
    if sparse.issparse(data):
        if not (
            np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
        ):
            raise TypeError(
                f"Sparse matrix contains unsupported data type: {data.dtype}."
            )
        return
    if isinstance(data, np.ndarray):
        if not (
            np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
        ):
            raise TypeError(
                f"NumPy array contains unsupported data type: {data.dtype}."
            )
        return
    if isinstance(data, (pd.DataFrame, pd.Series)):
        if isinstance(data, pd.Series):
            if not (
                pd.api.types.is_numeric_dtype(data) or pd.api.types.is_bool_dtype(data)
            ):
                raise TypeError(f"Pandas Series '{data.name}' is not numerical.")
        else:
            numeric_cols = data.select_dtypes(include=["number", "bool"])
            if numeric_cols.shape[1] != data.shape[1]:
                invalid_cols = list(set(data.columns) - set(numeric_cols.columns))
                raise TypeError(
                    f"Pandas DataFrame contains non-numerical columns: {invalid_cols}."
                )
        return
    if isinstance(data, (pl.DataFrame, pl.Series)):
        if isinstance(data, pl.Series):
            if not (data.dtype.is_numeric() or data.dtype == pl.Boolean):
                raise TypeError(f"Polars Series '{data.name}' is not numerical.")
        else:
            for col_name, dtype in zip(data.columns, data.dtypes):
                if not (dtype.is_numeric() or dtype == pl.Boolean):
                    raise TypeError(
                        f"Polars DataFrame contains non-numerical column: '{col_name}'."
                    )
        return
    raise ValueError(f"Unsupported data type: {type(data)}")


def clu_agglomerative(
    data: DataFrame, options=None
) -> Tuple[DataFrame, Union[DataFrame, Dict]]:
    options = options or {}
    verbose = options.get("verbose", True)
    mode_use_threshold = options.get("mode_toggle", False)
    n_clusters_input = options.get("n_clusters", 2)
    dist_threshold_input = options.get("distance_threshold", 10.0)
    linkage = options.get("linkage", "ward").lower()
    metric = options.get("metric", "euclidean").lower()
    cluster_col_name = options.get("cluster_column", "cluster_id")
    metrics_as = options.get("metrics_as", "Dataframe")
    AUTOMATED_THRESHOLD = 10000
    Clustered_data = None
    Metrics = None
    try:
        verbose and logger.info("Initializing Clu. Agglomerative.")
        is_pandas = isinstance(data, pd.DataFrame)
        is_polars = isinstance(data, pl.DataFrame)
        if not (is_pandas or is_polars):
            raise ValueError("Input data must be a pandas or polars DataFrame.")
        verbose and logger.info("Validating numerical requirements...")
        _validate_numerical_data(data)
        n_samples = data.shape[0]
        verbose and logger.info(f"Processing {n_samples} samples.")
        if linkage == "ward" and metric != "euclidean":
            verbose and logger.warning(
                "Linkage 'ward' requires 'euclidean' metric. Overriding metric to 'euclidean'."
            )
            metric = "euclidean"
        final_n_clusters = None
        final_threshold = None
        if mode_use_threshold:
            final_n_clusters = None
            final_threshold = dist_threshold_input
            verbose and logger.info(
                f"Mode: Distance Threshold ({final_threshold}). Linkage: {linkage}, Metric: {metric}."
            )
        else:
            final_n_clusters = n_clusters_input
            final_threshold = None
            verbose and logger.info(
                f"Mode: Fixed Clusters (K={final_n_clusters}). Linkage: {linkage}, Metric: {metric}."
            )
        Model = AgglomerativeClustering(
            n_clusters=final_n_clusters,
            metric=metric,
            linkage=linkage,
            distance_threshold=final_threshold,
            compute_full_tree="auto",
        )
        verbose and logger.info("Fitting model and predicting labels...")
        labels = Model.fit_predict(data)
        n_clusters_found = len(set(labels))
        verbose and logger.info(f"Resulting number of clusters: {n_clusters_found}")
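        if n_clusters_found < 2 or n_clusters_found >= n_samples:
            # The scikit-learn scores require 2 <= n_clusters <= n_samples - 1
            # and raise a ValueError otherwise, so fall back to NaNs.
            verbose and logger.warning(
                "Metrics need between 2 and n_samples - 1 clusters. Returning NaNs."
            )
            (s_score, ch_score, db_score) = (np.nan, np.nan, np.nan)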
        else:
            verbose and logger.info("Computing Intrinsic metrics.")
            if n_samples > AUTOMATED_THRESHOLD:
                s_score = silhouette_score(
                    data, labels, sample_size=min(n_samples, 20000), random_state=42
                )
                verbose and logger.info(f"Silhouette Score  : {s_score:.4f} (Sampled)")
            else:
                s_score = silhouette_score(data, labels)
                verbose and logger.info(f"Silhouette Score  : {s_score:.4f}")
            ch_score = calinski_harabasz_score(data, labels)
            verbose and logger.info(f"Calinski-Harabasz : {ch_score:.4f}")
            db_score = davies_bouldin_score(data, labels)
            verbose and logger.info(f"Davies-Bouldin    : {db_score:.4f}")
        metric_names = ["Silhouette Score", "Calinski-Harabasz", "Davies-Bouldin"]
        metric_values = [s_score, ch_score, db_score]
        if metrics_as == "Dataframe":
            Metrics = pd.DataFrame({"Metric": metric_names, "Value": metric_values})
        else:
            Metrics = dict(zip(metric_names, metric_values))
        if is_pandas:
            Clustered_data = data.copy()
            Clustered_data[cluster_col_name] = labels
            verbose and logger.info("Results attached to a copy of the Pandas DataFrame.")
        elif is_polars:
            Clustered_data = data.with_columns(
                pl.Series(name=cluster_col_name, values=labels)
            )
            verbose and logger.info("Results attached to Polars DataFrame.")
    except Exception as e:
        verbose and logger.error(f"Clu. Agglomerative operation failed: {e}")
        raise
    return (Clustered_data, Metrics)

Brick Info

version v0.1.4
python 3.11, 3.12, 3.13
requirements
  • shap>=0.47.0
  • scikit-learn
  • pandas
  • numpy
  • scipy
  • numba>=0.56.0
  • polars