Clu. Spectral
Performs Spectral Clustering. Uses eigenvalues of the similarity matrix to reduce dimensionality before clustering.
Processing
This brick groups your data points into clusters using Spectral Clustering. Unlike standard methods (like K-Means) that assume clusters are compact and spherical, Spectral Clustering is excellent for identifying complex, connected structures (like intertwined spirals, rings, or non-convex shapes).
It works by treating your data as a graph: points are nodes, and edges represent how "similar" they are. It analyzes the connectivity of this graph (using eigenvalues) to project the data into a lower-dimensional space where clusters are easier to separate. It then assigns a cluster label to each row in your dataset and calculates standard quality metrics.
Note: This algorithm is powerful but computationally expensive. It is best suited for small to medium-sized datasets (typically under 20,000 rows).
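To see why connectivity-based clustering matters, consider two concentric rings: K-Means splits them by distance to centroids and fails, while spectral clustering follows the graph structure and recovers each ring. A minimal sketch using scikit-learn directly (the synthetic dataset and parameter values are illustrative):

```python
from sklearn.datasets import make_circles
from sklearn.cluster import SpectralClustering, KMeans
from sklearn.metrics import adjusted_rand_score

# Two concentric rings: non-convex clusters that K-Means cannot separate.
X, y = make_circles(n_samples=400, factor=0.4, noise=0.05, random_state=42)

kmeans_labels = KMeans(n_clusters=2, n_init=10, random_state=42).fit_predict(X)
spectral_labels = SpectralClustering(
    n_clusters=2, affinity="nearest_neighbors", n_neighbors=10, random_state=42
).fit_predict(X)

# Agreement with the true ring membership (1.0 = perfect recovery).
print(f"K-Means ARI:  {adjusted_rand_score(y, kmeans_labels):.2f}")
print(f"Spectral ARI: {adjusted_rand_score(y, spectral_labels):.2f}")
```

On this dataset the spectral labels match the rings almost perfectly, while K-Means scores near zero.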
Inputs
- data
- The dataset you want to analyze. It must contain only numerical values (integers, floats, or booleans). Text or date columns must be removed or encoded before using this brick.
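If your dataset contains text or date columns, encode or drop them before wiring it into this brick. A minimal sketch with pandas (column names are illustrative):

```python
import pandas as pd

df = pd.DataFrame({
    "age": [25, 32, 47],
    "city": ["Paris", "Lyon", "Paris"],  # text column: not accepted as-is
    "signup": pd.to_datetime(["2023-01-05", "2023-02-10", "2023-03-15"]),
})

# One-hot encode the text column and drop the date column.
encoded = pd.get_dummies(df.drop(columns=["signup"]), columns=["city"])
print(encoded.dtypes)  # every remaining column is numeric or boolean
```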
Inputs Types
| Input | Types |
|---|---|
| data | DataFrame |
You can check the list of supported types here: Available Type Hints.
Outputs
- Clustered data
- The original dataset with an additional column indicating the assigned group (Cluster ID) for each row.
- Metrics
- A summary of the clustering quality scores (Silhouette Score, Calinski-Harabasz, and Davies-Bouldin). This can be returned as a table or a list of key-value pairs depending on your settings.
Outputs Types
| Output | Types |
|---|---|
| Clustered data | DataFrame |
| Metrics | DataFrame, Dict |
You can check the list of supported types here: Available Type Hints.
Options
The Clu. Spectral brick provides the following configurable options:
- Number of Clusters
- The specific number of groups you want the algorithm to find.
- Affinity (Similarity)
- Defines how the algorithm determines if two points are related.
- RBF (Radial Basis Function): The default method. It uses a Gaussian kernel to measure similarity. Good for general cases.
- Nearest Neighbors: Constructs a graph based on the closest neighbors. Better for data with varying densities or sparse connections.
- Gamma (Kernel Coefficient)
- (Only used if Affinity is "RBF"). Controls how strictly "closeness" is defined. A higher Gamma means points must be very close to be considered similar; a lower Gamma creates a looser definition of similarity.
- Number of Neighbors
- (Only used if Affinity is "Nearest Neighbors"). The number of closest neighbors to consider when constructing the similarity graph.
- Label Assignment Strategy
- The method used to assign the final cluster labels after the dimensionality reduction step.
- KMeans: The standard approach. Fast and generally effective.
- Discretize: Often more stable and less sensitive to random initialization than K-Means in spectral settings.
- Cluster QR: A direct deterministic assignment method.
- Number of K-Means Inits
- (Only used if Label Assignment is "KMeans"). How many times the K-Means algorithm will run with different seeds to find the best result.
- Cluster Column Name
- The header name for the new column containing the cluster IDs (e.g., "cluster_id").
- Metrics Output Format
- Determines the structure of the Metrics output: a DataFrame (table) or a dictionary of key-value pairs.
- Random Seed
- A number that ensures your results are reproducible. Using the same seed with the same data will always yield the same clusters.
- Verbose
- If enabled, detailed progress logs will be printed to the console.
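The option keys below mirror those read by the implementation; the values shown are an illustrative configuration, not defaults. String choices are case-insensitive: they are lower-cased and spaces become underscores before being handed to scikit-learn.

```python
# A typical options dictionary for the brick (values are illustrative).
options = {
    "n_clusters": 4,
    "affinity": "Nearest Neighbors",
    "n_neighbors": 15,
    "assign_labels": "Discretize",
    "cluster_column": "segment",
    "metrics_as": "Dict",
    "random_state": 0,
    "verbose": False,
}

# Normalization applied internally before the values reach SpectralClustering.
affinity = options.get("affinity", "RBF").lower().replace(" ", "_")
assign_labels = options.get("assign_labels", "KMeans").lower().replace(" ", "_")
print(affinity)        # identifier accepted by scikit-learn
print(assign_labels)
```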
import logging

import numpy as np
import pandas as pd
import polars as pl
from scipy import sparse
from sklearn.cluster import SpectralClustering
from sklearn.metrics import (
    silhouette_score,
    calinski_harabasz_score,
    davies_bouldin_score,
)

from coded_flows.types import DataFrame, Tuple, Union, Dict, Any
from coded_flows.utils import CodedFlowsLogger

logger = CodedFlowsLogger(name="Clu. Spectral", level=logging.INFO)


def _validate_numerical_data(data):
    """
    Validate that the input data contains only numerical (integer, float)
    or boolean values.
    """
    if sparse.issparse(data):
        if not (
            np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
        ):
            raise TypeError(
                f"Sparse matrix contains unsupported data type: {data.dtype}."
            )
        return
    if isinstance(data, np.ndarray):
        if not (
            np.issubdtype(data.dtype, np.number) or np.issubdtype(data.dtype, np.bool_)
        ):
            raise TypeError(
                f"NumPy array contains unsupported data type: {data.dtype}."
            )
        return
    if isinstance(data, (pd.DataFrame, pd.Series)):
        if isinstance(data, pd.Series):
            if not (
                pd.api.types.is_numeric_dtype(data) or pd.api.types.is_bool_dtype(data)
            ):
                raise TypeError(f"Pandas Series '{data.name}' is not numerical.")
        else:
            numeric_cols = data.select_dtypes(include=["number", "bool"])
            if numeric_cols.shape[1] != data.shape[1]:
                invalid_cols = list(set(data.columns) - set(numeric_cols.columns))
                raise TypeError(
                    f"Pandas DataFrame contains non-numerical columns: {invalid_cols}."
                )
        return
    if isinstance(data, (pl.DataFrame, pl.Series)):
        if isinstance(data, pl.Series):
            if not (data.dtype.is_numeric() or data.dtype == pl.Boolean):
                raise TypeError(f"Polars Series '{data.name}' is not numerical.")
        else:
            for col_name, dtype in zip(data.columns, data.dtypes):
                if not (dtype.is_numeric() or dtype == pl.Boolean):
                    raise TypeError(
                        f"Polars DataFrame contains non-numerical column: '{col_name}'."
                    )
        return
    raise ValueError(f"Unsupported data type: {type(data)}")


def clu_spectral(
    data: DataFrame, options=None
) -> Tuple[DataFrame, Union[DataFrame, Dict]]:
    options = options or {}
    verbose = options.get("verbose", True)
    n_clusters = options.get("n_clusters", 3)
    # Option strings are normalized to the identifiers scikit-learn expects,
    # e.g. "Nearest Neighbors" -> "nearest_neighbors", "Cluster QR" -> "cluster_qr".
    affinity = options.get("affinity", "RBF").lower().replace(" ", "_")
    assign_labels = options.get("assign_labels", "KMeans").lower().replace(" ", "_")
    gamma = options.get("gamma", 1.0)
    n_neighbors = options.get("n_neighbors", 10)
    n_init = options.get("n_init", 10)
    cluster_col_name = options.get("cluster_column", "cluster_id")
    metrics_as = options.get("metrics_as", "Dataframe")
    random_state = options.get("random_state", 42)

    AUTOMATED_THRESHOLD = 10000

    Clustered_data = None
    Metrics = None

    try:
        verbose and logger.info(
            f"Initializing Clu. Spectral (k={n_clusters}, affinity={affinity})."
        )

        is_pandas = isinstance(data, pd.DataFrame)
        is_polars = isinstance(data, pl.DataFrame)
        if not (is_pandas or is_polars):
            raise ValueError("Input data must be a pandas or polars DataFrame.")

        verbose and logger.info("Validating numerical requirements...")
        _validate_numerical_data(data)

        n_samples = data.shape[0]
        verbose and logger.info(f"Processing {n_samples} samples.")
        if n_samples > 20000:
            verbose and logger.warning(
                "Spectral clustering is very memory intensive on large datasets. "
                "Expect high resource usage."
            )

        Model = SpectralClustering(
            n_clusters=n_clusters,
            affinity=affinity,
            gamma=gamma,
            n_neighbors=n_neighbors,
            assign_labels=assign_labels,
            n_init=n_init,
            random_state=random_state,
            n_jobs=-1,
        )

        verbose and logger.info("Fitting model and predicting labels...")
        labels = Model.fit_predict(data)

        n_clusters_found = len(set(labels))
        verbose and logger.info(f"Resulting number of clusters: {n_clusters_found}")

        if n_clusters_found < 2:
            verbose and logger.warning(
                "Fewer than 2 clusters found. Metrics cannot be calculated. "
                "Returning NaNs."
            )
            s_score, ch_score, db_score = np.nan, np.nan, np.nan
        else:
            verbose and logger.info("Computing intrinsic metrics.")
            if n_samples > AUTOMATED_THRESHOLD:
                # Silhouette is quadratic in n; estimate it on a sample for large inputs.
                s_score = silhouette_score(
                    data, labels, sample_size=min(n_samples, 20000), random_state=42
                )
                verbose and logger.info(f"Silhouette Score  : {s_score:.4f} (Sampled)")
            else:
                s_score = silhouette_score(data, labels)
                verbose and logger.info(f"Silhouette Score  : {s_score:.4f}")
            ch_score = calinski_harabasz_score(data, labels)
            verbose and logger.info(f"Calinski-Harabasz : {ch_score:.4f}")
            db_score = davies_bouldin_score(data, labels)
            verbose and logger.info(f"Davies-Bouldin    : {db_score:.4f}")

        metric_names = ["Silhouette Score", "Calinski-Harabasz", "Davies-Bouldin"]
        metric_values = [s_score, ch_score, db_score]
        if metrics_as == "Dataframe":
            Metrics = pd.DataFrame({"Metric": metric_names, "Value": metric_values})
        else:
            Metrics = dict(zip(metric_names, metric_values))

        if is_pandas:
            Clustered_data = data.copy()
            Clustered_data[cluster_col_name] = labels
            verbose and logger.info("Results assigned in-place to Pandas DataFrame.")
        elif is_polars:
            Clustered_data = data.with_columns(
                pl.Series(name=cluster_col_name, values=labels)
            )
            verbose and logger.info("Results attached to Polars DataFrame.")
    except Exception as e:
        verbose and logger.error(f"Clu. Spectral operation failed: {e}")
        raise

    return (Clustered_data, Metrics)
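Outside the flow environment, the brick's behavior can be reproduced directly with pandas and scikit-learn. A minimal sketch mirroring the implementation above (the make_moons dataset and parameter values are stand-ins):

```python
import pandas as pd
from sklearn.cluster import SpectralClustering
from sklearn.datasets import make_moons
from sklearn.metrics import (
    silhouette_score,
    calinski_harabasz_score,
    davies_bouldin_score,
)

# A small numeric dataset with two interleaved half-moon clusters.
X, _ = make_moons(n_samples=200, noise=0.05, random_state=42)
data = pd.DataFrame(X, columns=["x", "y"])

labels = SpectralClustering(
    n_clusters=2,
    affinity="nearest_neighbors",
    n_neighbors=10,
    assign_labels="kmeans",
    random_state=42,
).fit_predict(data)

# "Clustered data" output: original columns plus the cluster ID column.
clustered = data.copy()
clustered["cluster_id"] = labels

# "Metrics" output in its DataFrame form.
metrics = pd.DataFrame({
    "Metric": ["Silhouette Score", "Calinski-Harabasz", "Davies-Bouldin"],
    "Value": [
        silhouette_score(data, labels),
        calinski_harabasz_score(data, labels),
        davies_bouldin_score(data, labels),
    ],
})
print(clustered.head())
print(metrics)
```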
Brick Info
- shap>=0.47.0
- scikit-learn
- pandas
- numpy
- scipy
- numba>=0.56.0
- polars