"""Spectral-descriptor clustering for brain surface meshes.
Spatial, temporal, and joint spatio-temporal clustering of HKS, WKS,
and fused descriptor matrices on triangle meshes and point clouds.
Every algorithm is input-agnostic (mesh or point cloud — only an
adjacency matrix is needed for spatially-regularised methods) and
exposes a ``backend`` parameter for CPU/GPU dispatch.
Sections
--------
§1 Result containers
§2 Distance / affinity construction
§3 Spatial clustering (cluster vertices by descriptor values)
§4 Temporal / scale clustering (cluster the t- or E-axis)
§5 Spatio-temporal joint clustering
§6 HKS + WKS descriptor fusion
§7 Bayesian cluster confirmation
§8 Cluster quality & comparison metrics
§9 Convenience / pipeline wrappers
§10 Persistence vineyards — tracking topology across HKS scales
§11 Mapper pipeline — topological data analysis on meshes
§12 Non-negative tensor decomposition (multi-subject)
§13 Joint time-vertex graph signal processing
§14 Scale-space blob tracking (Lindeberg on manifolds)
§15 Multi-view clustering (geometry + descriptor views)
§16 Spectral graph wavelet clustering
Design principles
-----------------
* **Backend-agnostic**: ``backend="auto"`` chooses GPU when available.
* **k-free where possible**: most methods auto-determine cluster count.
* **Progress bars**: all O(n²) or iterative routines expose Rich bars.
* **Memory-safe**: GPU paths use batched transfers; CPU paths use
``joblib`` parallelism where effective.
* **Reproducible**: every stochastic method accepts ``random_state``.
References
----------
Sun J, Ovsjanikov M, Guibas L. A concise and provably informative
multi-scale signature based on heat diffusion. *Computer Graphics
Forum* 28(5):1383–1392, 2009.
Aubry M, Schlickewei U, Cremers D. The wave kernel signature: a
quantum mechanical approach to shape analysis. *ICCV Workshops*,
2011.
Cai D, He X, Han J, Huang TS. Graph regularized nonnegative matrix
factorization for data representation. *IEEE TPAMI* 33(8):
1548–1560, 2011.
Campello RJGB, Moulavi D, Sander J. Density-based clustering based
on hierarchical density estimates. *PAKDD*, 2013.
Chazal F, Guibas LJ, Oudot SY, Skraba P. Persistence-based
clustering in Riemannian manifolds. *J. ACM* 60(6):41, 2013.
Dhillon IS. Co-clustering documents and words using bipartite
spectral graph partitioning. *KDD*, 2001.
"""
from __future__ import annotations
import warnings
from collections.abc import Sequence
from dataclasses import dataclass, field
from typing import (
Any,
Literal,
)
import numpy as np
import scipy.sparse as sp
from spectralbrain.runtime import (
DescriptorMatrix,
DistanceMatrix,
LabelArray,
ScalarMap,
SparseMatrix,
get_logger,
progress_simple,
)
logger = get_logger(__name__)
# ──────────────────────────────────────────────────────────────────────
# Lazy imports — every optional dependency is loaded on first use
# ──────────────────────────────────────────────────────────────────────
def _require_hdbscan():
"""Lazy-import HDBSCAN."""
try:
import hdbscan
return hdbscan
except ImportError:
raise ImportError("hdbscan required: pip install hdbscan")
def _require_sklearn_cluster():
"""Lazy-import sklearn.cluster."""
from sklearn import cluster as skc
return skc
def _require_sklearn_metrics():
"""Lazy-import sklearn.metrics."""
from sklearn import metrics as skm
return skm
def _require_sklearn_decomposition():
"""Lazy-import sklearn.decomposition."""
from sklearn import decomposition as skd
return skd
def _require_sklearn_mixture():
"""Lazy-import sklearn.mixture."""
from sklearn import mixture as skmix
return skmix
def _require_umap():
"""Lazy-import UMAP."""
try:
import umap
return umap
except ImportError:
raise ImportError("umap-learn required: pip install umap-learn")
def _require_leidenalg():
"""Lazy-import leidenalg."""
try:
import igraph
import leidenalg
return leidenalg, igraph
except ImportError:
raise ImportError("leidenalg + igraph required: pip install leidenalg python-igraph")
def _require_gudhi():
"""Lazy-import GUDHI."""
try:
import gudhi
return gudhi
except ImportError:
raise ImportError("gudhi required: pip install gudhi")
def _require_tslearn():
"""Lazy-import tslearn."""
try:
import tslearn
return tslearn
except ImportError:
raise ImportError("tslearn required: pip install tslearn")
def _require_skfda():
"""Lazy-import scikit-fda."""
try:
import skfda
return skfda
except ImportError:
raise ImportError("scikit-fda required: pip install scikit-fda")
def _require_pymc():
"""Lazy-import PyMC for Bayesian clustering."""
try:
import arviz as az
import pymc as pm
return pm, az
except ImportError:
raise ImportError("pymc + arviz required: pip install pymc arviz")
def _require_torch():
"""Lazy-import PyTorch."""
try:
import torch
return torch
except ImportError:
raise ImportError("PyTorch required: pip install torch")
def _require_joblib():
"""Lazy-import joblib for parallelisation."""
try:
import joblib
return joblib
except ImportError:
raise ImportError("joblib required: pip install joblib")
def _require_dionysus():
"""Lazy-import dionysus2 for persistent homology."""
try:
import dionysus
return dionysus
except ImportError:
raise ImportError("dionysus required: pip install dionysus")
def _require_kepler_mapper():
"""Lazy-import KeplerMapper for TDA."""
try:
import kmapper
return kmapper
except ImportError:
raise ImportError("kepler-mapper required: pip install kmapper")
def _require_tensorly():
"""Lazy-import TensorLy for tensor decomposition."""
try:
import tensorly
import tensorly.decomposition
return tensorly
except ImportError:
raise ImportError("tensorly required: pip install tensorly")
def _require_pygsp():
"""Lazy-import PyGSP for graph signal processing."""
try:
import pygsp
return pygsp
except ImportError:
raise ImportError("PyGSP required: pip install PyGSP")
# ======================================================================
# §1 RESULT CONTAINERS
# ======================================================================
[docs]
@dataclass
class ClusterResult:
"""Output of any spatial or spatio-temporal clustering algorithm.
Attributes
----------
labels : ndarray, shape (N,)
Integer cluster label per vertex. ``-1`` = noise / unassigned.
n_clusters : int
Number of discovered clusters (excluding noise).
method : str
Algorithm name (e.g. ``"hdbscan"``, ``"gnmf"``, ``"leiden"``).
probabilities : ndarray or None, shape (N,) or (N, K)
Soft membership. Shape depends on method:
``(N,)`` for HDBSCAN membership probability,
``(N, K)`` for NMF / DPMM component weights.
quality : dict
Internal quality metrics (silhouette, modularity, etc.).
metadata : dict
Algorithm-specific outputs (condensed tree, persistence
diagram, temporal profiles, etc.).
"""
labels: LabelArray
n_clusters: int
method: str
probabilities: np.ndarray | None = None
quality: dict[str, float] = field(default_factory=dict)
metadata: dict[str, Any] = field(default_factory=dict)
@property
def noise_count(self) -> int:
"""Return the number of noise (unclustered) points."""
return int((self.labels == -1).sum())
@property
def cluster_sizes(self) -> dict[int, int]:
"""Return a dict mapping cluster label to member count."""
unique, counts = np.unique(self.labels[self.labels >= 0], return_counts=True)
return dict(zip(unique.tolist(), counts.tolist()))
def __repr__(self) -> str:
"""Return a compact clustering result summary."""
return (
f"ClusterResult(method='{self.method}', "
f"n_clusters={self.n_clusters}, "
f"noise={self.noise_count}, "
f"quality={self.quality})"
)
[docs]
@dataclass
class TemporalClusterResult:
"""Output of temporal/scale-axis clustering.
Clusters the T time/energy samples rather than the N vertices.
Attributes
----------
labels : ndarray, shape (T,)
Cluster label per time/energy sample.
n_clusters : int
centroids : ndarray or None, shape (K, N)
Centroid profiles (one per cluster × all vertices).
method : str
quality : dict
"""
labels: LabelArray
n_clusters: int
method: str
centroids: np.ndarray | None = None
quality: dict[str, float] = field(default_factory=dict)
metadata: dict[str, Any] = field(default_factory=dict)
[docs]
@dataclass
class FusionResult:
"""Output of HKS + WKS descriptor fusion.
Attributes
----------
fused : ndarray, shape (N, D)
Fused per-vertex descriptor matrix.
method : str
Fusion strategy name.
weights : ndarray or None
Per-channel or per-kernel weights (for MKL / learned fusions).
metadata : dict
"""
fused: DescriptorMatrix
method: str
weights: np.ndarray | None = None
metadata: dict[str, Any] = field(default_factory=dict)
[docs]
@dataclass
class BayesianClusterConfirmation:
"""Output of Bayesian cluster confirmation analysis.
Attributes
----------
posterior_labels : ndarray, shape (N,)
MAP cluster assignments from the Bayesian model.
label_probabilities : ndarray, shape (N, K)
Full posterior membership probabilities.
waic : float
Widely Applicable Information Criterion.
loo : float
Leave-One-Out cross-validation estimate (ELPD).
cluster_credible_intervals : dict
Per-cluster posterior summaries (mean, HDI of centroid).
agreement_with_input : float
ARI between input labels and Bayesian MAP labels.
metadata : dict
"""
posterior_labels: LabelArray
label_probabilities: np.ndarray
waic: float
loo: float
cluster_credible_intervals: dict[int, dict[str, Any]]
agreement_with_input: float
metadata: dict[str, Any] = field(default_factory=dict)
[docs]
@dataclass
class VineyardResult:
"""Output of persistence vineyard tracking across HKS scales.
Attributes
----------
vines : list of ndarray
Each vine is an array of (t, birth, death) triples tracing
one topological feature across scales.
diagrams : dict
Persistence diagram at each sampled t, keyed by t index.
salient_features : list of dict
Features that persist across ≥ ``min_life`` fraction of the
t-range, with their birth/death scale and spatial location.
scale_of_emergence : ndarray, shape (n_features,)
The t at which each salient feature first appears.
metadata : dict
"""
vines: list[np.ndarray]
diagrams: dict[int, np.ndarray]
salient_features: list[dict[str, Any]]
scale_of_emergence: np.ndarray
metadata: dict[str, Any] = field(default_factory=dict)
[docs]
@dataclass
class MapperResult:
"""Output of the Mapper pipeline on a descriptor-equipped mesh.
Attributes
----------
nerve_graph : dict
Adjacency list of the nerve complex (node → set of neighbours).
node_membership : dict
Mapping from nerve-node index to list of vertex indices.
n_nodes : int
Number of nodes in the nerve complex.
n_edges : int
Number of edges.
vertex_to_nodes : ndarray, shape (N,)
For each vertex, the nerve node(s) it belongs to (first hit).
metadata : dict
Contains the full ``kmapper.KeplerMapper`` graph object for
downstream visualisation.
"""
nerve_graph: dict[int, list[int]]
node_membership: dict[int, list[int]]
n_nodes: int
n_edges: int
vertex_to_nodes: np.ndarray
metadata: dict[str, Any] = field(default_factory=dict)
[docs]
@dataclass
class TensorDecompositionResult:
"""Output of non-negative tensor CP/PARAFAC or Tucker decomposition.
For a tensor ℋ ∈ ℝ^{n × T × S} (vertices × scales × subjects):
Attributes
----------
spatial_factors : ndarray, shape (N, R)
Shared spatial components (vertex loadings).
temporal_factors : ndarray, shape (T, R)
Population-level temporal/scale profiles.
subject_factors : ndarray, shape (S, R)
Per-subject component strengths.
labels : ndarray, shape (N,)
Hard cluster labels from argmax of spatial factors.
n_components : int
reconstruction_error : float
metadata : dict
"""
spatial_factors: np.ndarray
temporal_factors: np.ndarray
subject_factors: np.ndarray
labels: LabelArray
n_components: int
reconstruction_error: float
metadata: dict[str, Any] = field(default_factory=dict)
[docs]
@dataclass
class ScaleSpaceBlobResult:
"""Output of Lindeberg-style scale-space blob tracking on HKS.
Attributes
----------
blob_trajectories : list of list of dict
Each trajectory is a list of {vertex, scale_index, t_value,
response} dicts ordered by scale.
natural_scales : ndarray, shape (N,)
For each vertex, the t at which the normalised LoG response
is maximal (its "natural scale").
blob_labels : ndarray, shape (N,)
Cluster label derived from trajectory membership.
n_blobs : int
metadata : dict
"""
blob_trajectories: list[list[dict[str, Any]]]
natural_scales: np.ndarray
blob_labels: LabelArray
n_blobs: int
metadata: dict[str, Any] = field(default_factory=dict)
# ======================================================================
# §2 DISTANCE / AFFINITY CONSTRUCTION
# ======================================================================
[docs]
def build_descriptor_distance(
H: DescriptorMatrix,
*,
metric: Literal["euclidean", "cosine", "correlation", "manhattan"] = "euclidean",
log_transform: bool = True,
normalize: Literal["none", "l1", "l2"] = "l1",
backend: Literal["auto", "cpu", "gpu"] = "auto",
) -> DistanceMatrix:
"""Build pairwise distance matrix from a descriptor matrix.
Parameters
----------
H : ndarray, shape (N, T)
Per-vertex descriptor (HKS, WKS, or fused).
metric : str
Distance metric in feature space.
log_transform : bool
Apply log(H + ε) before distance computation. Recommended
for HKS because values span many orders of magnitude.
normalize : str
Per-row normalisation after optional log transform.
backend : str
``"auto"`` selects GPU if torch+CUDA available.
Returns
-------
ndarray, shape (N, N)
Symmetric pairwise distance matrix.
"""
H = np.asarray(H, dtype=np.float64)
_n, _t = H.shape
# --- preprocessing ---
X = np.log(H + 1e-12) if log_transform else H.copy()
if normalize == "l1":
row_sums = np.abs(X).sum(axis=1, keepdims=True)
row_sums[row_sums == 0] = 1.0
X /= row_sums
elif normalize == "l2":
row_norms = np.linalg.norm(X, axis=1, keepdims=True)
row_norms[row_norms == 0] = 1.0
X /= row_norms
# --- compute distances ---
use_gpu = _resolve_backend(backend)
if use_gpu:
torch = _require_torch()
device = torch.device("cuda")
X_t = torch.tensor(X, dtype=torch.float32, device=device)
if metric == "euclidean":
D_t = torch.cdist(X_t.unsqueeze(0), X_t.unsqueeze(0)).squeeze(0)
elif metric == "cosine":
X_norm = X_t / (X_t.norm(dim=1, keepdim=True) + 1e-12)
D_t = 1.0 - X_norm @ X_norm.T
D_t.clamp_(min=0.0)
elif metric == "correlation":
X_c = X_t - X_t.mean(dim=1, keepdim=True)
X_cn = X_c / (X_c.norm(dim=1, keepdim=True) + 1e-12)
D_t = 1.0 - X_cn @ X_cn.T
D_t.clamp_(min=0.0)
elif metric == "manhattan":
D_t = torch.cdist(X_t.unsqueeze(0), X_t.unsqueeze(0), p=1.0).squeeze(0)
else:
raise ValueError(f"Unknown metric: {metric}")
D = D_t.cpu().numpy().astype(np.float64)
del X_t, D_t
torch.cuda.empty_cache()
else:
from scipy.spatial.distance import pdist, squareform
D = squareform(pdist(X, metric=metric))
np.fill_diagonal(D, 0.0)
return D
[docs]
def build_hybrid_distance(
D_descriptor: DistanceMatrix,
adjacency: SparseMatrix,
*,
alpha: float = 0.7,
geodesic_backend: Literal["heat", "dijkstra"] = "dijkstra",
) -> DistanceMatrix:
"""Fuse descriptor distance with geodesic distance on the mesh.
Parameters
----------
D_descriptor : ndarray, shape (N, N)
Pairwise descriptor-space distance.
adjacency : sparse, shape (N, N)
Mesh adjacency (cotangent weights or binary).
alpha : float
Weight for descriptor distance. ``0.0`` = pure geodesic,
``1.0`` = pure descriptor.
geodesic_backend : str
``"dijkstra"`` via scipy or ``"heat"`` via potpourri3d.
Returns
-------
ndarray, shape (N, N)
Normalised fused distance.
"""
D_descriptor.shape[0]
# --- geodesic distance from adjacency ---
if geodesic_backend == "dijkstra":
from scipy.sparse.csgraph import shortest_path
A = sp.csr_matrix(adjacency).copy()
# ensure positive weights for Dijkstra
A.data = np.abs(A.data)
A.data[A.data == 0] = 1e-12
D_geo = shortest_path(A, method="D", directed=False)
elif geodesic_backend == "heat":
try:
import potpourri3d # noqa: F401 -- availability probe
except ImportError:
raise ImportError(
"potpourri3d required for heat-method geodesics: pip install potpourri3d"
)
raise NotImplementedError(
"Heat-method geodesics require (vertices, faces). "
"Use geodesic_backend='dijkstra' with adjacency, "
"or build the distance matrix externally."
)
else:
raise ValueError(f"Unknown geodesic_backend: {geodesic_backend}")
# --- normalise both to unit median ---
med_desc = np.median(D_descriptor[D_descriptor > 0]) or 1.0
med_geo = np.median(D_geo[D_geo > 0]) or 1.0
D_fused = alpha * (D_descriptor / med_desc) + (1.0 - alpha) * (D_geo / med_geo)
np.fill_diagonal(D_fused, 0.0)
return D_fused
[docs]
def build_hks_affinity_graph(
H: DescriptorMatrix,
adjacency: SparseMatrix,
*,
sigma: float | None = None,
log_transform: bool = True,
) -> SparseMatrix:
"""Build HKS-weighted mesh adjacency for graph-based clustering.
Each mesh edge (i, j) gets weight
``w_ij = c_ij · exp(-||h_i - h_j||² / 2σ²)``
where ``c_ij`` is the original cotangent weight and ``h`` is
the (optionally log-transformed, L1-normalised) HKS vector.
Parameters
----------
H : ndarray, shape (N, T)
Descriptor matrix.
adjacency : sparse, shape (N, N)
Mesh Laplacian or adjacency with cotangent weights.
sigma : float or None
Kernel bandwidth. If None, uses the median edge-HKS-distance.
log_transform : bool
Apply log(H + ε) before computing feature distances.
Returns
-------
sparse, shape (N, N)
Weighted adjacency (CSR).
"""
A = sp.coo_matrix(adjacency)
rows, cols = A.row, A.col
base_weights = np.abs(A.data)
X = np.log(H + 1e-12) if log_transform else H.copy()
row_sums = np.abs(X).sum(axis=1, keepdims=True)
row_sums[row_sums == 0] = 1.0
X /= row_sums
# feature distances along mesh edges
d_feat = np.linalg.norm(X[rows] - X[cols], axis=1)
if sigma is None:
sigma = float(np.median(d_feat[d_feat > 0])) or 1.0
affinity_weights = base_weights * np.exp(-(d_feat**2) / (2 * sigma**2))
W = sp.csr_matrix((affinity_weights, (rows, cols)), shape=adjacency.shape)
# symmetrise
W = (W + W.T) / 2.0
return W
# ======================================================================
# §3 SPATIAL CLUSTERING
# ======================================================================
[docs]
def cluster_hdbscan(
H: DescriptorMatrix,
*,
adjacency: SparseMatrix | None = None,
alpha_fusion: float = 0.7,
min_cluster_size: int = 200,
min_samples: int = 10,
cluster_selection_method: Literal["eom", "leaf"] = "eom",
metric: Literal["euclidean", "precomputed"] = "euclidean",
dim_reduction: Literal["umap", "pca"] | None = "umap",
n_components: int = 8,
log_transform: bool = True,
random_state: int = 42,
backend: Literal["auto", "cpu", "gpu"] = "auto",
) -> ClusterResult:
"""HDBSCAN clustering on spectral descriptor features.
Density-based clustering that automatically determines the number
of clusters. When an adjacency matrix is provided, uses a fused
geodesic + descriptor distance for spatially coherent parcellation.
Parameters
----------
H : ndarray, shape (N, T)
Per-vertex descriptor matrix (HKS, WKS, or fused).
adjacency : sparse or None
Mesh adjacency for hybrid distance fusion. If None, clusters
purely in descriptor feature space.
alpha_fusion : float
Weight for descriptor distance in the hybrid metric.
min_cluster_size : int
Minimum cluster size for HDBSCAN.
min_samples : int
Core-distance neighbourhood size.
cluster_selection_method : str
``"eom"`` (excess of mass) or ``"leaf"``.
metric : str
If ``"precomputed"``, H is treated as a distance matrix.
dim_reduction : str or None
Reduce descriptor dimensionality before clustering.
n_components : int
Target dimensionality for reduction.
log_transform : bool
Apply log(H + ε) before processing.
random_state : int
Seed for reproducibility.
backend : str
``"auto"`` / ``"cpu"`` / ``"gpu"`` for distance computation.
Returns
-------
ClusterResult
With ``outlier_scores`` in metadata.
"""
hdbscan = _require_hdbscan()
H = np.asarray(H, dtype=np.float64)
n = H.shape[0]
if metric == "precomputed":
D = H
else:
# --- preprocess ---
X = np.log(H + 1e-12) if log_transform else H.copy()
row_sums = np.abs(X).sum(axis=1, keepdims=True)
row_sums[row_sums == 0] = 1.0
X /= row_sums
# --- dimensionality reduction ---
if dim_reduction == "umap" and X.shape[1] > n_components:
umap_mod = _require_umap()
X = umap_mod.UMAP(
n_components=n_components,
metric="cosine",
n_neighbors=min(30, n - 1),
min_dist=0.0,
random_state=random_state,
).fit_transform(X)
elif dim_reduction == "pca" and X.shape[1] > n_components:
skd = _require_sklearn_decomposition()
X = skd.PCA(
n_components=n_components,
random_state=random_state,
).fit_transform(X)
# --- fused or pure distance ---
if adjacency is not None:
D_desc = build_descriptor_distance(
X,
metric="euclidean",
log_transform=False,
normalize="none",
backend=backend,
)
D = build_hybrid_distance(
D_desc,
adjacency,
alpha=alpha_fusion,
)
use_metric = "precomputed"
else:
D = X
use_metric = "euclidean"
actual_metric = use_metric if adjacency is not None else "euclidean"
if metric == "precomputed":
actual_metric = "precomputed"
clusterer = hdbscan.HDBSCAN(
min_cluster_size=min_cluster_size,
min_samples=min_samples,
metric=actual_metric,
cluster_selection_method=cluster_selection_method,
)
clusterer.fit(D)
labels = clusterer.labels_
n_clusters = len(set(labels)) - (1 if -1 in labels else 0)
quality = {}
if n_clusters >= 2 and np.sum(labels >= 0) > n_clusters:
skm = _require_sklearn_metrics()
valid = labels >= 0
if actual_metric == "precomputed":
quality["silhouette"] = float(
skm.silhouette_score(D[np.ix_(valid, valid)], labels[valid], metric="precomputed")
)
else:
quality["silhouette"] = float(skm.silhouette_score(D[valid], labels[valid]))
return ClusterResult(
labels=labels.astype(np.int64),
n_clusters=n_clusters,
method="hdbscan",
probabilities=clusterer.probabilities_,
quality=quality,
metadata={
"outlier_scores": clusterer.outlier_scores_,
"condensed_tree": (
clusterer.condensed_tree_ if hasattr(clusterer, "condensed_tree_") else None
),
"min_cluster_size": min_cluster_size,
"min_samples": min_samples,
},
)
[docs]
def cluster_leiden(
adjacency_or_H: SparseMatrix | DescriptorMatrix,
*,
H: DescriptorMatrix | None = None,
resolution: float = 1.0,
quality_function: Literal["modularity", "cpm"] = "modularity",
n_iterations: int = -1,
random_state: int = 42,
sigma: float | None = None,
) -> ClusterResult:
"""Leiden community detection on mesh graph.
Accepts either (a) a pre-built weighted adjacency matrix or
(b) a raw descriptor matrix ``adjacency_or_H`` plus mesh
adjacency ``H`` (confusing naming avoided by keyword use).
Parameters
----------
adjacency_or_H : sparse or ndarray
If sparse: the weighted adjacency/affinity graph.
If ndarray (N, T): descriptor matrix (requires ``H=None``
and will build k-NN graph from descriptors).
H : ndarray or None
If adjacency_or_H is sparse and H is provided, weights
are multiplied by HKS affinity.
resolution : float
Resolution parameter γ. Higher = more clusters.
quality_function : str
``"modularity"`` (RBConfiguration) or ``"cpm"`` (CPM).
n_iterations : int
Leiden iterations. ``-1`` = iterate until stable.
random_state : int
sigma : float or None
Bandwidth for HKS affinity kernel (if H provided).
Returns
-------
ClusterResult
With ``modularity`` in quality dict.
"""
la, ig = _require_leidenalg()
# --- build the graph ---
if sp.issparse(adjacency_or_H):
A = sp.csr_matrix(adjacency_or_H)
if H is not None:
A = build_hks_affinity_graph(H, A, sigma=sigma)
else:
# descriptor matrix → build k-NN affinity graph
from sklearn.neighbors import kneighbors_graph
X = np.asarray(adjacency_or_H, dtype=np.float64)
k = min(30, X.shape[0] - 1)
A = kneighbors_graph(X, n_neighbors=k, mode="distance")
# convert distance to similarity
sigma_knn = np.median(A.data) or 1.0
A.data = np.exp(-(A.data**2) / (2 * sigma_knn**2))
A = (A + A.T) / 2.0
A_coo = sp.coo_matrix(A)
# build igraph
edges = list(zip(A_coo.row.tolist(), A_coo.col.tolist()))
weights = np.abs(A_coo.data).tolist()
g = ig.Graph(n=A.shape[0], edges=edges, directed=False)
g.es["weight"] = weights
# remove self-loops and multi-edges
g.simplify(combine_edges="max")
if quality_function == "modularity":
partition = la.find_partition(
g,
la.RBConfigurationVertexPartition,
weights="weight",
resolution_parameter=resolution,
n_iterations=n_iterations,
seed=random_state,
)
elif quality_function == "cpm":
partition = la.find_partition(
g,
la.CPMVertexPartition,
weights="weight",
resolution_parameter=resolution,
n_iterations=n_iterations,
seed=random_state,
)
else:
raise ValueError(f"Unknown quality_function: {quality_function}")
labels = np.array(partition.membership, dtype=np.int64)
n_clusters = len(set(labels))
return ClusterResult(
labels=labels,
n_clusters=n_clusters,
method="leiden",
quality={
"modularity": float(partition.modularity),
"quality": float(partition.quality()),
},
metadata={
"resolution": resolution,
"quality_function": quality_function,
},
)
[docs]
def cluster_gnmf(
H: DescriptorMatrix,
adjacency: SparseMatrix,
*,
n_components: int = 8,
lam: float = 1.0,
sparsity_alpha: float = 0.0,
n_iter: int = 300,
tol: float = 1e-5,
random_state: int = 42,
backend: Literal["auto", "cpu", "gpu"] = "auto",
) -> ClusterResult:
"""Graph-Regularised Non-negative Matrix Factorisation (GNMF).
Decomposes the non-negative descriptor matrix H ≈ W·F subject to
a Laplacian smoothness penalty on the spatial factor W, so that
mesh-adjacent vertices receive similar component activations.
.. math::
\\min_{W,F \\geq 0} \\frac{1}{2}\\|H - WF\\|_F^2
+ \\lambda \\operatorname{tr}(W^T L W)
+ \\alpha \\|W\\|_1
Parameters
----------
H : ndarray, shape (N, T)
Non-negative descriptor matrix (HKS is always ≥ 0).
adjacency : sparse, shape (N, N)
Mesh adjacency (cotangent Laplacian or its negative off-diag).
n_components : int
Number of spatial components (≈ cluster count).
lam : float
Laplacian smoothness weight λ.
sparsity_alpha : float
ℓ₁ sparsity on W.
n_iter : int
Maximum multiplicative update iterations.
tol : float
Relative objective change for convergence.
random_state : int
backend : str
Returns
-------
ClusterResult
With ``W`` (spatial), ``F`` (temporal profiles) in metadata.
"""
H = np.asarray(H, dtype=np.float64)
if H.min() < 0:
warnings.warn(
"GNMF requires non-negative input. Shifting H by min value.",
stacklevel=2,
)
H = H - H.min()
_n, _T = H.shape
rng = np.random.default_rng(random_state)
# --- build adjacency and degree ---
A_aff = sp.csr_matrix(adjacency, dtype=np.float64)
# extract off-diagonal as positive affinity
A_off = -A_aff.copy()
A_off.setdiag(0)
A_off.eliminate_zeros()
A_off.data = np.abs(A_off.data)
D_diag = np.asarray(A_off.sum(axis=1)).flatten()
use_gpu = _resolve_backend(backend)
if use_gpu:
return _gnmf_gpu(H, A_off, D_diag, n_components, lam, sparsity_alpha, n_iter, tol, rng)
else:
return _gnmf_cpu(H, A_off, D_diag, n_components, lam, sparsity_alpha, n_iter, tol, rng)
def _gnmf_cpu(H, A_off, D_diag, r, lam, alpha, n_iter, tol, rng):
"""CPU implementation of GNMF with multiplicative updates."""
n, T = H.shape
eps = 1e-12
W = rng.random((n, r)).astype(np.float64) + 0.1
F = rng.random((r, T)).astype(np.float64) + 0.1
prev_obj = np.inf
with progress_simple("GNMF", total=n_iter) as update:
for it in range(n_iter):
# --- update F ---
numerator_F = W.T @ H # (r, T)
denominator_F = W.T @ W @ F + eps # (r, T)
F *= numerator_F / denominator_F
# --- update W ---
AW = A_off @ W # (n, r)
DW = D_diag[:, None] * W # (n, r)
numerator_W = H @ F.T + lam * AW # (n, r)
denominator_W = W @ (F @ F.T) + lam * DW + alpha + eps
W *= numerator_W / denominator_W
# --- convergence ---
if it % 10 == 0:
obj = (
0.5 * np.linalg.norm(H - W @ F, "fro") ** 2
+ lam * np.trace(W.T @ (D_diag[:, None] * W - A_off @ W))
+ alpha * np.abs(W).sum()
)
if abs(prev_obj - obj) / (abs(prev_obj) + eps) < tol:
update(n_iter - it)
break
prev_obj = obj
update(1)
labels = W.argmax(axis=1).astype(np.int64)
n_clusters = len(np.unique(labels))
# soft probabilities
W_norm = W / (W.sum(axis=1, keepdims=True) + eps)
return ClusterResult(
labels=labels,
n_clusters=n_clusters,
method="gnmf",
probabilities=W_norm,
quality={},
metadata={
"W": W,
"F": F,
"n_components": int(W.shape[1]),
"lambda": lam,
},
)
def _gnmf_gpu(H, A_off, D_diag, r, lam, alpha, n_iter, tol, rng):
"""GPU-accelerated GNMF via PyTorch."""
torch = _require_torch()
device = torch.device("cuda")
eps = 1e-12
n, T = H.shape
H_t = torch.tensor(H, dtype=torch.float32, device=device)
W_t = torch.tensor(rng.random((n, r)).astype(np.float32) + 0.1, device=device)
F_t = torch.tensor(rng.random((r, T)).astype(np.float32) + 0.1, device=device)
# sparse adjacency on GPU
A_coo = sp.coo_matrix(A_off)
indices = torch.tensor(np.vstack([A_coo.row, A_coo.col]), dtype=torch.long, device=device)
values = torch.tensor(A_coo.data, dtype=torch.float32, device=device)
A_t = torch.sparse_coo_tensor(indices, values, A_off.shape).coalesce()
D_t = torch.tensor(D_diag, dtype=torch.float32, device=device)
prev_obj = float("inf")
with progress_simple("GNMF [GPU]", total=n_iter) as update:
for it in range(n_iter):
# update F
num_F = W_t.T @ H_t
den_F = W_t.T @ W_t @ F_t + eps
F_t *= num_F / den_F
# update W
AW = torch.sparse.mm(A_t, W_t)
DW = D_t.unsqueeze(1) * W_t
num_W = H_t @ F_t.T + lam * AW
den_W = W_t @ (F_t @ F_t.T) + lam * DW + alpha + eps
W_t *= num_W / den_W
if it % 10 == 0:
residual = torch.norm(H_t - W_t @ F_t).item() ** 2
obj = 0.5 * residual
if abs(prev_obj - obj) / (abs(prev_obj) + eps) < tol:
update(n_iter - it)
break
prev_obj = obj
update(1)
W = W_t.cpu().numpy().astype(np.float64)
F = F_t.cpu().numpy().astype(np.float64)
del H_t, W_t, F_t, A_t, D_t
torch.cuda.empty_cache()
labels = W.argmax(axis=1).astype(np.int64)
n_clusters = len(np.unique(labels))
W_norm = W / (W.sum(axis=1, keepdims=True) + 1e-12)
return ClusterResult(
labels=labels,
n_clusters=n_clusters,
method="gnmf",
probabilities=W_norm,
quality={},
metadata={"W": W, "F": F, "n_components": int(r), "lambda": lam},
)
[docs]
def cluster_dpmm(
H: DescriptorMatrix,
*,
adjacency: SparseMatrix | None = None,
max_components: int = 25,
dim_reduction: Literal["umap", "pca"] | None = "pca",
n_components_reduce: int = 8,
log_transform: bool = True,
random_state: int = 42,
backend: Literal["variational", "pymc"] = "variational",
mrf_beta: float = 0.0,
) -> ClusterResult:
"""Dirichlet Process Mixture Model — automatic cluster count.
Parameters
----------
H : ndarray, shape (N, T)
Descriptor matrix.
adjacency : sparse or None
Mesh adjacency for MRF spatial prior (only with pymc backend).
max_components : int
Truncation level for variational DP.
dim_reduction : str or None
Reduce dimensionality before fitting.
n_components_reduce : int
Target dimensionality.
log_transform : bool
random_state : int
backend : str
``"variational"`` uses sklearn BayesianGaussianMixture (fast).
``"pymc"`` uses full MCMC with optional MRF prior (slow, rich).
mrf_beta : float
Potts MRF coupling strength (pymc backend only).
Returns
-------
ClusterResult
"""
H = np.asarray(H, dtype=np.float64)
# --- preprocess ---
X = np.log(H + 1e-12) if log_transform else H.copy()
row_sums = np.abs(X).sum(axis=1, keepdims=True)
row_sums[row_sums == 0] = 1.0
X /= row_sums
if dim_reduction == "pca" and X.shape[1] > n_components_reduce:
skd = _require_sklearn_decomposition()
X = skd.PCA(
n_components=n_components_reduce,
random_state=random_state,
).fit_transform(X)
elif dim_reduction == "umap" and X.shape[1] > n_components_reduce:
umap_mod = _require_umap()
X = umap_mod.UMAP(
n_components=n_components_reduce,
random_state=random_state,
).fit_transform(X)
if backend == "variational":
return _dpmm_variational(X, max_components, random_state)
elif backend == "pymc":
return _dpmm_pymc(X, adjacency, max_components, mrf_beta, random_state)
else:
raise ValueError(f"Unknown DPMM backend: {backend}")
def _dpmm_variational(X, K, seed):
"""Fast variational Bayesian GMM with DP prior via sklearn."""
skmix = _require_sklearn_mixture()
skm = _require_sklearn_metrics()
model = skmix.BayesianGaussianMixture(
n_components=K,
covariance_type="full",
weight_concentration_prior_type="dirichlet_process",
weight_concentration_prior=1e-3,
max_iter=500,
random_state=seed,
n_init=3,
)
model.fit(X)
labels = model.predict(X).astype(np.int64)
probs = model.predict_proba(X)
# count active components
active = np.unique(labels)
n_clusters = len(active)
# relabel to 0..K-1
relabel_map = {old: new for new, old in enumerate(sorted(active))}
labels = np.array([relabel_map[l] for l in labels], dtype=np.int64)
probs = probs[:, sorted(active)]
quality = {}
if n_clusters >= 2:
quality["silhouette"] = float(skm.silhouette_score(X, labels))
quality["bic_lower_bound"] = float(model.lower_bound_)
return ClusterResult(
labels=labels,
n_clusters=n_clusters,
method="dpmm_variational",
probabilities=probs,
quality=quality,
metadata={
"means": model.means_,
"weights": model.weights_,
"covariances": model.covariances_,
},
)
def _dpmm_pymc(X, adjacency, K, mrf_beta, seed):
"""Full Bayesian DPMM with optional MRF spatial prior via PyMC."""
pm, _az = _require_pymc()
import pytensor.tensor as pt
n, d = X.shape
with pm.Model() as model:
alpha = pm.Gamma("alpha", 1.0, 1.0)
beta_sticks = pm.Beta("beta_sticks", 1.0, alpha, shape=K)
w = pm.Deterministic(
"w",
beta_sticks * pt.concatenate([pt.ones(1), pt.cumprod(1.0 - beta_sticks)[:-1]]),
)
mu = pm.Normal("mu", 0.0, 5.0, shape=(K, d))
sigma = pm.HalfNormal("sigma", 1.0, shape=(K, d))
comp_dists = [pm.Normal.dist(mu=mu[k], sigma=sigma[k], shape=d) for k in range(K)]
pm.Mixture("obs", w=w, comp_dists=comp_dists, observed=X)
# MRF spatial prior (if adjacency provided)
if adjacency is not None and mrf_beta > 0:
z = pm.Categorical("z", p=w, shape=n)
A_coo = sp.coo_matrix(adjacency)
edges = np.column_stack([A_coo.row, A_coo.col])
pm.Potential(
"mrf",
mrf_beta * pt.sum(pt.eq(z[edges[:, 0]], z[edges[:, 1]])),
)
with model:
approx = pm.fit(20000, method="advi", random_seed=seed)
trace = approx.sample(1000)
# extract weights → assign labels
w_post = trace.posterior["w"].values.mean(axis=(0, 1)) # (K,)
mu_post = trace.posterior["mu"].values.mean(axis=(0, 1)) # (K, d)
# assign each point to nearest component weighted by w
from scipy.spatial.distance import cdist
D_km = cdist(X, mu_post)
log_resp = np.log(w_post[None, :] + 1e-12) - 0.5 * D_km**2
labels = log_resp.argmax(axis=1).astype(np.int64)
probs = np.exp(log_resp - log_resp.max(axis=1, keepdims=True))
probs /= probs.sum(axis=1, keepdims=True)
active = np.unique(labels)
n_clusters = len(active)
return ClusterResult(
labels=labels,
n_clusters=n_clusters,
method="dpmm_pymc",
probabilities=probs,
quality={},
metadata={"trace": trace, "model": model, "approx": approx},
)
[docs]
def cluster_persistence(
H_scalar: ScalarMap,
adjacency: SparseMatrix,
*,
persistence_threshold: float | None = None,
n_clusters: int | None = None,
) -> ClusterResult:
"""Persistence-based clustering (ToMATo) on a scalar field.
Treats HKS(·, t₀) as a density function on the mesh and uses
persistent homology of sub-level sets to find topologically
stable basins — each basin becomes a cluster.
Parameters
----------
H_scalar : ndarray, shape (N,)
Scalar field on the mesh (e.g. HKS at one time-scale,
or summed HKS, or any vertex-wise feature).
adjacency : sparse, shape (N, N)
Mesh adjacency.
persistence_threshold : float or None
Minimum persistence to retain a cluster. If None and
n_clusters is None, uses the largest gap in the diagram.
n_clusters : int or None
If set, selects the top-n most persistent components.
Returns
-------
ClusterResult
With ``persistence_pairs`` and ``diagram`` in metadata.
"""
_require_gudhi()
density = -np.asarray(H_scalar, dtype=np.float64)
n = len(density)
# build adjacency list from sparse matrix
A = sp.coo_matrix(adjacency)
graph = [[] for _ in range(n)]
for i, j in zip(A.row.tolist(), A.col.tolist()):
if i != j:
graph[i].append(j)
try:
from gudhi.clustering.tomato import Tomato
tomato = Tomato(
graph_type="manual",
density_type="manual",
)
tomato.fit(graph, weights=density)
if n_clusters is not None:
tomato.n_clusters_ = n_clusters
elif persistence_threshold is not None:
# set via diagram analysis
tomato.n_clusters_ = int(
np.sum(tomato.diagram_[:, 1] - tomato.diagram_[:, 0] > persistence_threshold)
)
tomato.n_clusters_ = max(1, tomato.n_clusters_)
labels = tomato.labels_.astype(np.int64)
n_clust = len(set(labels)) - (1 if -1 in labels else 0)
return ClusterResult(
labels=labels,
n_clusters=n_clust,
method="persistence_tomato",
quality={},
metadata={
"diagram": (tomato.diagram_ if hasattr(tomato, "diagram_") else None),
},
)
except (ImportError, AttributeError):
# fallback: manual union-find persistence on H₀
logger.warning(
"GUDHI Tomato not available; using manual sub-level persistence via union-find."
)
return _persistence_sublevel_h0(density, graph, n, persistence_threshold, n_clusters)
def _persistence_sublevel_h0(density, graph, n, tau, k):
"""Manual H₀ sub-level persistence via union-find."""
# sort vertices by ascending density (function value)
order = np.argsort(density)
rank = np.empty(n, dtype=np.int64)
rank[order] = np.arange(n)
parent = np.arange(n, dtype=np.int64)
birth = density.copy()
def find(x):
"""Find optimal parameters for the clustering algorithm."""
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
pairs = [] # (birth_val, death_val, root_vertex)
for idx in order:
for nb in graph[idx]:
if rank[nb] < rank[idx]:
ri = find(idx)
rn = find(nb)
if ri != rn:
# younger root dies (higher birth = less persistent)
if birth[ri] > birth[rn]:
parent[ri] = rn
pairs.append((birth[ri], density[idx], ri))
else:
parent[rn] = ri
pairs.append((birth[rn], density[idx], rn))
# compute persistence = death - birth
persistences = (
np.array([d - b for b, d, _ in pairs], dtype=np.float64) if pairs else np.array([])
)
# select threshold
if k is not None and len(persistences) >= k - 1:
sorted_p = np.sort(persistences)[::-1]
tau_eff = sorted_p[k - 2] if k >= 2 else 0.0
elif tau is not None:
tau_eff = tau
elif len(persistences) > 1:
sorted_p = np.sort(persistences)[::-1]
gaps = np.diff(sorted_p)
tau_eff = sorted_p[np.argmax(np.abs(gaps))]
else:
tau_eff = 0.0
# rebuild union-find with threshold
parent2 = np.arange(n, dtype=np.int64)
birth2 = density.copy()
for idx in order:
for nb in graph[idx]:
if rank[nb] < rank[idx]:
ri = find2(parent2, idx)
rn = find2(parent2, nb)
if ri != rn:
if birth2[ri] > birth2[rn]:
p = density[idx] - birth2[ri]
if p < tau_eff:
parent2[ri] = rn
# else: keep separate
else:
p = density[idx] - birth2[rn]
if p < tau_eff:
parent2[rn] = ri
# extract labels
labels = np.array([find2(parent2, i) for i in range(n)], dtype=np.int64)
unique_roots = np.unique(labels)
relabel = {old: new for new, old in enumerate(unique_roots)}
labels = np.array([relabel[l] for l in labels], dtype=np.int64)
return ClusterResult(
labels=labels,
n_clusters=len(unique_roots),
method="persistence_sublevel_h0",
quality={},
metadata={
"persistence_pairs": pairs,
"threshold": tau_eff,
},
)
[docs]
def find2(parent, x):
"""Path-compressed find for union-find."""
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
[docs]
def cluster_spectral_coclustering(
H: DescriptorMatrix,
*,
n_clusters: int = 6,
adjacency: SparseMatrix | None = None,
laplacian_smoothing: float = 5.0,
) -> ClusterResult:
"""Spectral co-clustering of the vertex × time/energy matrix.
Simultaneously clusters rows (vertices) and columns (scales),
revealing which spatial regions share which scale bands.
Parameters
----------
H : ndarray, shape (N, T)
Non-negative descriptor matrix.
n_clusters : int
Number of co-clusters.
adjacency : sparse or None
If provided, applies Laplacian smoothing post-hoc.
laplacian_smoothing : float
Tikhonov regularisation weight μ for spatial coherence.
Returns
-------
ClusterResult
With ``column_labels`` (scale clustering) in metadata.
"""
skc = _require_sklearn_cluster()
H_pos = np.asarray(H, dtype=np.float64)
H_pos = np.maximum(H_pos, 0.0)
H_pos += 1e-12 # avoid zeros for bipartite normalisation
cc = skc.SpectralCoclustering(
n_clusters=n_clusters,
svd_method="randomized",
random_state=42,
)
cc.fit(H_pos)
row_labels = cc.row_labels_.astype(np.int64)
col_labels = cc.column_labels_.astype(np.int64)
# --- optional Laplacian smoothing ---
if adjacency is not None and laplacian_smoothing > 0:
from scipy.sparse.linalg import spsolve
L = sp.csr_matrix(adjacency, dtype=np.float64)
I = sp.eye(H.shape[0], format="csr")
K = len(np.unique(row_labels))
prob = np.zeros((H.shape[0], K), dtype=np.float64)
for k in range(K):
rhs = (row_labels == k).astype(np.float64)
prob[:, k] = spsolve(I + laplacian_smoothing * L, rhs)
row_labels = prob.argmax(axis=1).astype(np.int64)
n_clusters_actual = len(np.unique(row_labels))
return ClusterResult(
labels=row_labels,
n_clusters=n_clusters_actual,
method="spectral_coclustering",
quality={},
metadata={
"column_labels": col_labels,
"n_coclusters": n_clusters,
},
)
# ======================================================================
# §4 TEMPORAL / SCALE CLUSTERING
# ======================================================================
[docs]
def cluster_temporal_fpca(
H: DescriptorMatrix,
*,
n_components: int = 6,
n_clusters: int = 6,
clusterer: Literal["kmeans", "hdbscan"] = "kmeans",
random_state: int = 42,
) -> ClusterResult:
"""Functional PCA on HKS time-profiles, then cluster fPC scores.
Treats each vertex's HKS(x, ·) as a function of log(t) and
performs fPCA to extract the dominant modes of variation, then
clusters in the score space.
Parameters
----------
H : ndarray, shape (N, T)
Per-vertex HKS / WKS profiles across T scales.
n_components : int
Number of functional principal components.
n_clusters : int
For k-means; ignored if ``clusterer="hdbscan"``.
clusterer : str
``"kmeans"`` or ``"hdbscan"``.
random_state : int
Returns
-------
ClusterResult
With ``fpc_scores``, ``explained_variance_ratio`` in metadata.
"""
# use sklearn PCA on the (N, T) matrix directly — equivalent to
# discretised fPCA when T is the evaluation grid
skd = _require_sklearn_decomposition()
skc = _require_sklearn_cluster()
X = np.asarray(H, dtype=np.float64)
# standardise per-vertex (z-score each row)
mu = X.mean(axis=1, keepdims=True)
std = X.std(axis=1, keepdims=True)
std[std == 0] = 1.0
X_std = (X - mu) / std
pca = skd.PCA(n_components=min(n_components, X_std.shape[1]), random_state=random_state)
scores = pca.fit_transform(X_std)
if clusterer == "kmeans":
km = skc.KMeans(n_clusters=n_clusters, n_init=10, random_state=random_state)
labels = km.fit_predict(scores).astype(np.int64)
n_clust = n_clusters
elif clusterer == "hdbscan":
result = cluster_hdbscan(
scores,
log_transform=False,
dim_reduction=None,
random_state=random_state,
)
labels = result.labels
n_clust = result.n_clusters
else:
raise ValueError(f"Unknown clusterer: {clusterer}")
return ClusterResult(
labels=labels,
n_clusters=n_clust,
method="fpca_" + clusterer,
quality={},
metadata={
"fpc_scores": scores,
"explained_variance_ratio": pca.explained_variance_ratio_,
"components": pca.components_,
},
)
[docs]
def cluster_temporal_dtw(
H: DescriptorMatrix,
*,
n_clusters: int = 6,
metric: Literal["dtw", "softdtw", "euclidean"] = "softdtw",
gamma: float = 0.1,
random_state: int = 42,
) -> ClusterResult:
"""Time-series k-means with DTW on HKS/WKS profiles.
Parameters
----------
H : ndarray, shape (N, T)
Descriptor profiles.
n_clusters : int
metric : str
``"dtw"``, ``"softdtw"``, or ``"euclidean"``.
gamma : float
Smoothing for soft-DTW (smaller = sharper).
random_state : int
Returns
-------
ClusterResult
With ``barycenters`` in metadata.
"""
_require_tslearn()
from tslearn.clustering import TimeSeriesKMeans
from tslearn.preprocessing import TimeSeriesScalerMeanVariance
X = np.asarray(H, dtype=np.float64)
# tslearn expects (N, T, 1)
X_3d = X[:, :, np.newaxis]
X_3d = TimeSeriesScalerMeanVariance().fit_transform(X_3d)
metric_params = {}
if metric == "softdtw":
metric_params["gamma"] = gamma
km = TimeSeriesKMeans(
n_clusters=n_clusters,
metric=metric,
metric_params=metric_params if metric_params else None,
max_iter_barycenter=20,
n_init=3,
random_state=random_state,
verbose=0,
)
labels = km.fit_predict(X_3d).astype(np.int64)
return ClusterResult(
labels=labels,
n_clusters=n_clusters,
method=f"dtw_kmeans_{metric}",
quality={
"inertia": float(km.inertia_),
},
metadata={
"barycenters": km.cluster_centers_.squeeze(-1),
},
)
# ======================================================================
# §5 SPATIO-TEMPORAL JOINT CLUSTERING
# ======================================================================
[docs]
def cluster_spatiotemporal_gnmf(
H: DescriptorMatrix,
adjacency: SparseMatrix,
*,
n_components: int = 8,
lam_spatial: float = 1.0,
lam_temporal: float = 0.1,
n_iter: int = 300,
tol: float = 1e-5,
random_state: int = 42,
backend: Literal["auto", "cpu", "gpu"] = "auto",
) -> ClusterResult:
"""Graph-regularised NMF with both spatial and temporal smoothness.
Extends GNMF by adding a temporal smoothness penalty on F,
encouraging neighbouring time scales to have similar profiles.
.. math::
\\min_{W,F \\geq 0} \\tfrac{1}{2}\\|H - WF\\|_F^2
+ \\lambda_s \\operatorname{tr}(W^T L_s W)
+ \\lambda_t \\operatorname{tr}(F L_t F^T)
where L_s is the mesh Laplacian and L_t is the 1D chain
Laplacian along the time axis.
Parameters
----------
H : ndarray, shape (N, T)
adjacency : sparse, shape (N, N)
n_components : int
lam_spatial : float
lam_temporal : float
n_iter : int
tol : float
random_state : int
backend : str
Returns
-------
ClusterResult
With spatial ``W`` and temporally-smooth ``F`` in metadata.
"""
H = np.asarray(H, dtype=np.float64)
H = np.maximum(H, 0.0) + 1e-12
n, T = H.shape
eps = 1e-12
rng = np.random.default_rng(random_state)
# --- build spatial and temporal Laplacians ---
A_off = -sp.csr_matrix(adjacency, dtype=np.float64).copy()
A_off.setdiag(0)
A_off.eliminate_zeros()
A_off.data = np.abs(A_off.data)
D_s = np.asarray(A_off.sum(axis=1)).flatten()
# 1D chain Laplacian for temporal axis
L_t = np.zeros((T, T), dtype=np.float64)
for i in range(T):
if i > 0:
L_t[i, i] += 1.0
L_t[i, i - 1] = -1.0
if i < T - 1:
L_t[i, i] += 1.0
L_t[i, i + 1] = -1.0
W = rng.random((n, n_components)).astype(np.float64) + 0.1
F = rng.random((n_components, T)).astype(np.float64) + 0.1
prev_obj = np.inf
with progress_simple("ST-GNMF", total=n_iter) as update:
for it in range(n_iter):
# update F with temporal smoothness
num_F = W.T @ H
den_F = W.T @ W @ F + lam_temporal * F @ L_t + eps
F *= num_F / np.maximum(den_F, eps)
# update W with spatial smoothness
AW = A_off @ W
DW = D_s[:, None] * W
num_W = H @ F.T + lam_spatial * AW
den_W = W @ (F @ F.T) + lam_spatial * DW + eps
W *= num_W / np.maximum(den_W, eps)
if it % 10 == 0:
obj = 0.5 * np.linalg.norm(H - W @ F, "fro") ** 2
if abs(prev_obj - obj) / (abs(prev_obj) + eps) < tol:
update(n_iter - it)
break
prev_obj = obj
update(1)
labels = W.argmax(axis=1).astype(np.int64)
W_norm = W / (W.sum(axis=1, keepdims=True) + eps)
return ClusterResult(
labels=labels,
n_clusters=len(np.unique(labels)),
method="spatiotemporal_gnmf",
probabilities=W_norm,
quality={},
metadata={
"W": W,
"F": F,
"lambda_spatial": lam_spatial,
"lambda_temporal": lam_temporal,
},
)
[docs]
def cluster_spatiotemporal_stdbscan(
H: DescriptorMatrix,
adjacency: SparseMatrix,
*,
eps_spatial: float | None = None,
eps_temporal: float | None = None,
min_pts: int = 10,
temporal_metric: Literal["euclidean", "softdtw"] = "euclidean",
gamma_dtw: float = 0.1,
backend: Literal["auto", "cpu", "gpu"] = "auto",
) -> ClusterResult:
"""ST-DBSCAN adapted for mesh + spectral descriptor profiles.
Uses a conjunctive neighbourhood: vertex y is in the
neighbourhood of x iff geodesic(x, y) ≤ ε₁ AND
d_temporal(h_x, h_y) ≤ ε₂.
Parameters
----------
H : ndarray, shape (N, T)
adjacency : sparse, shape (N, N)
eps_spatial : float or None
Geodesic distance threshold. Auto-set from median if None.
eps_temporal : float or None
Temporal distance threshold. Auto-set from median if None.
min_pts : int
temporal_metric : str
gamma_dtw : float
backend : str
Returns
-------
ClusterResult
"""
H = np.asarray(H, dtype=np.float64)
n, _T = H.shape
# --- geodesic distances via Dijkstra on mesh ---
from scipy.sparse.csgraph import shortest_path
A = sp.csr_matrix(adjacency).copy()
A.data = np.abs(A.data)
A.data[A.data == 0] = 1e-12
D_geo = shortest_path(A, method="D", directed=False)
# --- temporal distances ---
if temporal_metric == "euclidean":
from scipy.spatial.distance import pdist, squareform
D_temp = squareform(pdist(H, metric="euclidean"))
elif temporal_metric == "softdtw":
_require_tslearn()
from tslearn.metrics import cdist_soft_dtw
X_3d = H[:, :, np.newaxis]
D_temp = cdist_soft_dtw(X_3d, gamma=gamma_dtw)
else:
raise ValueError(f"Unknown temporal_metric: {temporal_metric}")
# --- auto-set thresholds ---
if eps_spatial is None:
# use edges only (not all pairs)
A_coo = sp.coo_matrix(adjacency)
edge_dists = D_geo[A_coo.row, A_coo.col]
eps_spatial = float(np.median(edge_dists[edge_dists > 0])) * 3.0
if eps_temporal is None:
A_coo = sp.coo_matrix(adjacency)
edge_tdists = D_temp[A_coo.row, A_coo.col]
eps_temporal = float(np.median(edge_tdists[edge_tdists > 0])) * 2.0
# --- ST-DBSCAN ---
labels = -np.ones(n, dtype=np.int64)
visited = np.zeros(n, dtype=bool)
cluster_id = 0
with progress_simple("ST-DBSCAN", total=n) as update:
for x in range(n):
if visited[x]:
update(1)
continue
visited[x] = True
# conjunctive neighbourhood
nbr = np.where((D_geo[x] <= eps_spatial) & (D_temp[x] <= eps_temporal))[0]
nbr = nbr[nbr != x]
if len(nbr) < min_pts:
update(1)
continue
labels[x] = cluster_id
seeds = list(nbr)
while seeds:
y = seeds.pop()
if not visited[y]:
visited[y] = True
nbr_y = np.where((D_geo[y] <= eps_spatial) & (D_temp[y] <= eps_temporal))[0]
nbr_y = nbr_y[nbr_y != y]
if len(nbr_y) >= min_pts:
seeds.extend(nbr_y.tolist())
if labels[y] == -1:
labels[y] = cluster_id
cluster_id += 1
update(1)
n_clusters = cluster_id
return ClusterResult(
labels=labels,
n_clusters=n_clusters,
method="st_dbscan",
quality={},
metadata={
"eps_spatial": eps_spatial,
"eps_temporal": eps_temporal,
"min_pts": min_pts,
},
)
# ======================================================================
# §6 HKS + WKS DESCRIPTOR FUSION
# ======================================================================
[docs]
def fuse_concatenate(
hks: DescriptorMatrix,
wks: DescriptorMatrix,
*,
log_transform: bool = True,
normalize: Literal["l1", "l2", "none"] = "l1",
weight_hks: float = 1.0,
weight_wks: float = 1.0,
) -> FusionResult:
"""Simple weighted concatenation of HKS and WKS.
Parameters
----------
hks : ndarray, shape (N, T_h)
wks : ndarray, shape (N, T_w)
log_transform : bool
normalize : str
weight_hks, weight_wks : float
Returns
-------
FusionResult
"""
Hh = np.asarray(hks, dtype=np.float64)
Hw = np.asarray(wks, dtype=np.float64)
if log_transform:
Hh = np.log(np.maximum(Hh, 1e-12))
Hw = np.log(np.maximum(np.abs(Hw) + 1e-12, 1e-12))
# normalise each block independently
for X in [Hh, Hw]:
if normalize == "l1":
s = np.abs(X).sum(axis=1, keepdims=True)
s[s == 0] = 1.0
X /= s
elif normalize == "l2":
s = np.linalg.norm(X, axis=1, keepdims=True)
s[s == 0] = 1.0
X /= s
fused = np.hstack([weight_hks * Hh, weight_wks * Hw])
return FusionResult(
fused=fused,
method="concatenate",
weights=np.array([weight_hks, weight_wks]),
metadata={"T_hks": hks.shape[1], "T_wks": wks.shape[1]},
)
[docs]
def fuse_joint_nmf(
hks: DescriptorMatrix,
wks: DescriptorMatrix,
*,
n_components: int = 16,
random_state: int = 42,
) -> FusionResult:
"""Joint NMF on concatenated [HKS | WKS] for shared basis.
Learns a shared spatial factor W and separate temporal factors
F_hks, F_wks such that [HKS | WKS] ≈ W · [F_hks | F_wks].
Parameters
----------
hks : ndarray, shape (N, T_h)
wks : ndarray, shape (N, T_w)
n_components : int
random_state : int
Returns
-------
FusionResult
``fused`` is the W matrix (shared spatial loadings).
"""
skd = _require_sklearn_decomposition()
Hh = np.maximum(np.asarray(hks, dtype=np.float64), 0.0) + 1e-12
Hw = np.maximum(np.abs(np.asarray(wks, dtype=np.float64)), 1e-12)
# normalise columns to unit max for balanced scales
Hh = Hh / (Hh.max(axis=0, keepdims=True) + 1e-12)
Hw = Hw / (Hw.max(axis=0, keepdims=True) + 1e-12)
H_joint = np.hstack([Hh, Hw])
model = skd.NMF(
n_components=n_components,
init="nndsvd",
solver="mu",
beta_loss="kullback-leibler",
max_iter=500,
random_state=random_state,
)
W = model.fit_transform(H_joint)
F = model.components_
return FusionResult(
fused=W,
method="joint_nmf",
weights=None,
metadata={
"F_full": F,
"F_hks": F[:, : hks.shape[1]],
"F_wks": F[:, hks.shape[1] :],
"reconstruction_error": model.reconstruction_err_,
},
)
[docs]
def fuse_multi_kernel(
hks: DescriptorMatrix,
wks: DescriptorMatrix,
*,
n_kernels_per_desc: int = 5,
sigma_range: tuple[float, float] = (0.1, 10.0),
) -> FusionResult:
"""Multi-kernel fusion: build a combined kernel from HKS and WKS.
Computes K = Σ_j α_j K^hks_j + Σ_k β_k K^wks_k with uniform
weights (simpleMKL optimisation is available as extension).
Parameters
----------
hks : ndarray, shape (N, T_h)
wks : ndarray, shape (N, T_w)
n_kernels_per_desc : int
Number of bandwidth samples per descriptor.
sigma_range : tuple
(min, max) bandwidth range.
Returns
-------
FusionResult
``fused`` is the combined kernel matrix K, shape (N, N).
"""
sigmas = np.logspace(
np.log10(sigma_range[0]),
np.log10(sigma_range[1]),
n_kernels_per_desc,
)
Hh = np.log(np.maximum(np.asarray(hks, dtype=np.float64), 1e-12))
Hw = np.log(np.maximum(np.abs(np.asarray(wks, dtype=np.float64)) + 1e-12, 1e-12))
n = Hh.shape[0]
K = np.zeros((n, n), dtype=np.float64)
n_total = 2 * n_kernels_per_desc
from scipy.spatial.distance import pdist, squareform
D_hks = squareform(pdist(Hh, "sqeuclidean"))
D_wks = squareform(pdist(Hw, "sqeuclidean"))
for sig in sigmas:
K += np.exp(-D_hks / (2 * sig**2)) / n_total
K += np.exp(-D_wks / (2 * sig**2)) / n_total
return FusionResult(
fused=K,
method="multi_kernel_uniform",
weights=np.ones(n_total) / n_total,
metadata={"sigmas": sigmas, "n_kernels": n_total},
)
# ======================================================================
# §7 BAYESIAN CLUSTER CONFIRMATION
# ======================================================================
[docs]
def confirm_clusters_bayesian(
H: DescriptorMatrix,
labels: LabelArray,
*,
adjacency: SparseMatrix | None = None,
mrf_beta: float = 1.0,
n_samples: int = 2000,
n_tune: int = 1000,
dim_reduction: int = 8,
random_state: int = 42,
) -> BayesianClusterConfirmation:
"""Bayesian confirmation of cluster assignments.
Fits a Bayesian Gaussian mixture model with cluster-specific
priors informed by the input labels, plus an optional Potts MRF
spatial prior from the mesh adjacency. Compares MAP assignments
to input labels and computes model quality (WAIC, LOO).
This answers the question: "Are these clusters statistically
credible given the data and the spatial structure?"
Parameters
----------
H : ndarray, shape (N, T)
Descriptor matrix.
labels : ndarray, shape (N,)
Input cluster labels to confirm.
adjacency : sparse or None
Mesh adjacency for MRF prior.
mrf_beta : float
Potts coupling strength.
n_samples : int
MCMC posterior samples.
n_tune : int
MCMC tuning samples.
dim_reduction : int
Reduce to this many dimensions before modelling.
random_state : int
Returns
-------
BayesianClusterConfirmation
"""
pm, az = _require_pymc()
skd = _require_sklearn_decomposition()
skm = _require_sklearn_metrics()
H = np.asarray(H, dtype=np.float64)
labels = np.asarray(labels, dtype=np.int64)
valid = labels >= 0
H_valid = H[valid]
lab_valid = labels[valid]
# reduce dimensions
X = np.log(H_valid + 1e-12)
if X.shape[1] > dim_reduction:
X = skd.PCA(n_components=dim_reduction, random_state=random_state).fit_transform(X)
K = len(np.unique(lab_valid))
n, d = X.shape
# compute empirical cluster means as informative priors
mu_prior = np.zeros((K, d))
for k in range(K):
mask = lab_valid == k
if mask.any():
mu_prior[k] = X[mask].mean(axis=0)
with pm.Model() as model:
# cluster-specific priors centred on empirical means
mu = pm.Normal("mu", mu=mu_prior, sigma=2.0, shape=(K, d))
sigma = pm.HalfNormal("sigma", sigma=1.0, shape=(K, d))
w = pm.Dirichlet("w", a=np.ones(K) * 10.0)
comp_dists = [pm.Normal.dist(mu=mu[k], sigma=sigma[k], shape=d) for k in range(K)]
pm.Mixture("obs", w=w, comp_dists=comp_dists, observed=X)
with model:
trace = pm.sample(
draws=n_samples,
tune=n_tune,
random_seed=random_state,
return_inferencedata=True,
progressbar=True,
)
# --- compute MAP labels ---
w_post = trace.posterior["w"].values.mean(axis=(0, 1)) # (K,)
mu_post = trace.posterior["mu"].values.mean(axis=(0, 1)) # (K, d)
sig_post = trace.posterior["sigma"].values.mean(axis=(0, 1)) # (K, d)
# log responsibility
log_resp = np.zeros((n, K))
for k in range(K):
diff = X - mu_post[k]
log_resp[:, k] = (
np.log(w_post[k] + 1e-12)
- 0.5 * np.sum((diff / (sig_post[k] + 1e-12)) ** 2, axis=1)
- np.sum(np.log(sig_post[k] + 1e-12))
)
probs = np.exp(log_resp - log_resp.max(axis=1, keepdims=True))
probs /= probs.sum(axis=1, keepdims=True)
posterior_labels = probs.argmax(axis=1).astype(np.int64)
# --- model comparison ---
try:
waic_val = float(az.waic(trace, model).elpd_waic)
except Exception:
waic_val = float("nan")
try:
loo_val = float(az.loo(trace, model).elpd_loo)
except Exception:
loo_val = float("nan")
# --- agreement ---
ari = float(skm.adjusted_rand_score(lab_valid, posterior_labels))
# --- credible intervals per cluster ---
ci = {}
for k in range(K):
mu_k = trace.posterior["mu"].values[:, :, k, :] # (chain, draw, d)
mu_flat = mu_k.reshape(-1, d)
ci[k] = {
"mean": mu_flat.mean(axis=0),
"hdi_3": np.percentile(mu_flat, 3, axis=0),
"hdi_97": np.percentile(mu_flat, 97, axis=0),
}
# --- reconstruct full labels (including noise vertices) ---
full_labels = -np.ones(H.shape[0], dtype=np.int64)
full_labels[valid] = posterior_labels
full_probs = np.zeros((H.shape[0], K), dtype=np.float64)
full_probs[valid] = probs
return BayesianClusterConfirmation(
posterior_labels=full_labels,
label_probabilities=full_probs,
waic=waic_val,
loo=loo_val,
cluster_credible_intervals=ci,
agreement_with_input=ari,
metadata={"trace": trace, "model": model},
)
# ======================================================================
# §8 CLUSTER QUALITY & COMPARISON METRICS
# ======================================================================
[docs]
def cluster_quality(
H: DescriptorMatrix,
labels: LabelArray,
*,
adjacency: SparseMatrix | None = None,
metric: Literal["euclidean", "precomputed"] = "euclidean",
) -> dict[str, float]:
"""Compute internal clustering quality metrics.
Parameters
----------
H : ndarray, shape (N, T) or (N, N)
Descriptor matrix or precomputed distance matrix.
labels : ndarray, shape (N,)
adjacency : sparse or None
For spatial coherence metrics.
metric : str
Returns
-------
dict
Keys: silhouette, calinski_harabasz, davies_bouldin,
spatial_coherence (if adjacency provided).
"""
skm = _require_sklearn_metrics()
valid = labels >= 0
if valid.sum() < 2 or len(np.unique(labels[valid])) < 2:
return {"silhouette": float("nan")}
H_v = H[valid] if metric != "precomputed" else H[np.ix_(valid, valid)]
lab_v = labels[valid]
result = {
"silhouette": float(skm.silhouette_score(H_v, lab_v, metric=metric)),
}
if metric != "precomputed":
result["calinski_harabasz"] = float(skm.calinski_harabasz_score(H_v, lab_v))
result["davies_bouldin"] = float(skm.davies_bouldin_score(H_v, lab_v))
# spatial coherence: fraction of edges where both endpoints
# share the same cluster label
if adjacency is not None:
A_coo = sp.coo_matrix(adjacency)
same = labels[A_coo.row] == labels[A_coo.col]
both_valid = (labels[A_coo.row] >= 0) & (labels[A_coo.col] >= 0)
if both_valid.sum() > 0:
result["spatial_coherence"] = float(same[both_valid].mean())
return result
[docs]
def cluster_comparison(
labels_a: LabelArray,
labels_b: LabelArray,
) -> dict[str, float]:
"""Compare two clusterings (e.g., algorithm output vs atlas labels).
Returns
-------
dict
Keys: ari, nmi, ami, homogeneity, completeness, v_measure.
"""
skm = _require_sklearn_metrics()
a = np.asarray(labels_a, dtype=np.int64)
b = np.asarray(labels_b, dtype=np.int64)
valid = (a >= 0) & (b >= 0)
a_v, b_v = a[valid], b[valid]
return {
"ari": float(skm.adjusted_rand_score(a_v, b_v)),
"nmi": float(skm.normalized_mutual_info_score(a_v, b_v)),
"ami": float(skm.adjusted_mutual_info_score(a_v, b_v)),
"homogeneity": float(skm.homogeneity_score(a_v, b_v)),
"completeness": float(skm.completeness_score(a_v, b_v)),
"v_measure": float(skm.v_measure_score(a_v, b_v)),
}
# ======================================================================
# §9 CONVENIENCE / PIPELINE WRAPPERS
# ======================================================================
[docs]
def auto_cluster(
H: DescriptorMatrix,
*,
adjacency: SparseMatrix | None = None,
methods: Sequence[str] = ("hdbscan", "leiden", "gnmf"),
random_state: int = 42,
backend: Literal["auto", "cpu", "gpu"] = "auto",
**kwargs: Any,
) -> dict[str, ClusterResult]:
"""Run multiple clustering algorithms and return all results.
A convenience function for exploratory analysis that runs a
battery of methods on the same data and returns a dict keyed
by method name. The user can then compare via
:func:`cluster_quality` and :func:`cluster_comparison`.
Parameters
----------
H : ndarray, shape (N, T)
adjacency : sparse or None
methods : sequence of str
Subset of ``{"hdbscan", "leiden", "gnmf", "dpmm", "fpca",
"coclustering", "persistence"}``.
random_state : int
backend : str
**kwargs
Forwarded to individual clustering functions.
Returns
-------
dict[str, ClusterResult]
"""
results = {}
for method in methods:
try:
if method == "hdbscan":
results[method] = cluster_hdbscan(
H,
adjacency=adjacency,
random_state=random_state,
backend=backend,
**{
k: v
for k, v in kwargs.items()
if k in ("min_cluster_size", "min_samples", "alpha_fusion", "dim_reduction")
},
)
elif method == "leiden" and adjacency is not None:
results[method] = cluster_leiden(
adjacency,
H=H,
random_state=random_state,
**{k: v for k, v in kwargs.items() if k in ("resolution", "quality_function")},
)
elif method == "gnmf" and adjacency is not None:
results[method] = cluster_gnmf(
H,
adjacency,
random_state=random_state,
backend=backend,
**{k: v for k, v in kwargs.items() if k in ("n_components", "lam")},
)
elif method == "dpmm":
results[method] = cluster_dpmm(
H,
adjacency=adjacency,
random_state=random_state,
**{k: v for k, v in kwargs.items() if k in ("max_components",)},
)
elif method == "fpca":
results[method] = cluster_temporal_fpca(
H,
random_state=random_state,
**{k: v for k, v in kwargs.items() if k in ("n_components", "n_clusters")},
)
elif method == "coclustering":
results[method] = cluster_spectral_coclustering(
H,
adjacency=adjacency,
**{k: v for k, v in kwargs.items() if k in ("n_clusters",)},
)
elif method == "persistence" and adjacency is not None:
# use summed HKS as scalar field
results[method] = cluster_persistence(
H.sum(axis=1),
adjacency,
)
else:
logger.warning(
"Skipping method '%s' (missing adjacency or unknown)",
method,
)
except Exception as e:
logger.error("Method '%s' failed: %s", method, e)
continue
return results
# ======================================================================
# §10 PERSISTENCE VINEYARDS — TRACKING TOPOLOGY ACROSS HKS SCALES
# ======================================================================
[docs]
def cluster_vineyards(
H: DescriptorMatrix,
adjacency: SparseMatrix,
*,
n_scales: int | None = None,
min_persistence_frac: float = 0.1,
min_life_frac: float = 0.3,
backend: Literal["dionysus", "manual"] = "manual",
) -> VineyardResult:
"""Track persistence diagram points across HKS time-scales.
For each column t_j of H (a fixed HKS scale), computes the H₀
sub-level-set persistence diagram of HKS(·, t_j) on the mesh.
Then links diagram points across consecutive scales by nearest-
neighbour matching in (birth, death) space, producing continuous
"vines" — trajectories of topological features through scale.
Features that persist across a large fraction of the t-range
correspond to anatomically stable sub-regions; features that
appear or disappear at specific scales reveal scale-dependent
structural boundaries.
Parameters
----------
H : ndarray, shape (N, T)
Per-vertex descriptor matrix (HKS at T time-scales).
adjacency : sparse, shape (N, N)
Mesh adjacency.
n_scales : int or None
Sub-sample to this many scales (for speed). None = use all T.
min_persistence_frac : float
Minimum persistence as fraction of the function range to
retain a feature in the diagram.
min_life_frac : float
A vine must span at least this fraction of total scales to
be considered salient.
backend : str
``"dionysus"`` uses the dionysus2 library (faster, exact).
``"manual"`` uses built-in union-find (no extra dependency).
Returns
-------
VineyardResult
With vines, diagrams per scale, salient features, and
scale-of-emergence per feature.
References
----------
Cohen-Steiner D, Edelsbrunner H, Morozov D. Vines and vineyards
by updating persistence in linear time. *Proc. 22nd ACM Symp.
Computational Geometry*, 119–126, 2006.
"""
H = np.asarray(H, dtype=np.float64)
n, T_total = H.shape
# optionally sub-sample scales for speed
if n_scales is not None and n_scales < T_total:
scale_indices = np.linspace(0, T_total - 1, n_scales, dtype=int)
else:
scale_indices = np.arange(T_total)
T = len(scale_indices)
# build adjacency list once
A_coo = sp.coo_matrix(adjacency)
graph = [[] for _ in range(n)]
for i, j in zip(A_coo.row.tolist(), A_coo.col.tolist()):
if i != j:
graph[i].append(j)
# compute persistence diagram at each scale
diagrams = {}
with progress_simple("Vineyards: persistence per scale", total=T) as tick:
for si, ti in enumerate(scale_indices):
f_vals = H[:, ti]
pairs = _sublevel_h0_pairs(f_vals, graph, n)
# filter by persistence
f_range = f_vals.max() - f_vals.min()
threshold = min_persistence_frac * f_range if f_range > 0 else 0
pairs_filtered = [(b, d, v) for b, d, v in pairs if (d - b) > threshold]
diagrams[si] = {
"pairs": pairs_filtered,
"scale_index": int(ti),
"bd_array": (
np.array([(b, d) for b, d, _ in pairs_filtered])
if pairs_filtered
else np.empty((0, 2))
),
}
tick(1)
# link diagram points across scales → vines
vines = _link_vines(diagrams, T)
# identify salient features (long-lived vines)
min_life = int(min_life_frac * T)
salient = []
emergence = []
for vine in vines:
if len(vine) >= min_life:
t_start = vine[0]["scale_index"]
t_end = vine[-1]["scale_index"]
mean_persistence = np.mean([v["death"] - v["birth"] for v in vine])
salient.append(
{
"vine_length": len(vine),
"t_start": int(scale_indices[t_start]),
"t_end": int(scale_indices[t_end]),
"mean_persistence": float(mean_persistence),
"representative_vertex": vine[len(vine) // 2].get("vertex", -1),
}
)
emergence.append(float(scale_indices[t_start]))
# convert vines to arrays
vine_arrays = []
for vine in vines:
arr = np.array([(scale_indices[v["scale_index"]], v["birth"], v["death"]) for v in vine])
vine_arrays.append(arr)
return VineyardResult(
vines=vine_arrays,
diagrams={k: v["bd_array"] for k, v in diagrams.items()},
salient_features=salient,
scale_of_emergence=np.array(emergence) if emergence else np.array([]),
metadata={
"n_scales_used": T,
"scale_indices": scale_indices,
"min_persistence_frac": min_persistence_frac,
"min_life_frac": min_life_frac,
"total_vines": len(vines),
},
)
def _sublevel_h0_pairs(f_vals, graph, n):
"""Compute H₀ sub-level set persistence pairs via union-find."""
order = np.argsort(f_vals)
rank = np.empty(n, dtype=np.int64)
rank[order] = np.arange(n)
parent = np.arange(n, dtype=np.int64)
birth = f_vals.copy()
pairs = []
def _find(x):
"""Internal parameter search implementation."""
while parent[x] != x:
parent[x] = parent[parent[x]]
x = parent[x]
return x
for idx in order:
for nb in graph[idx]:
if rank[nb] < rank[idx]:
ri = _find(idx)
rn = _find(nb)
if ri != rn:
if birth[ri] > birth[rn]:
parent[ri] = rn
pairs.append((float(birth[ri]), float(f_vals[idx]), int(ri)))
else:
parent[rn] = ri
pairs.append((float(birth[rn]), float(f_vals[idx]), int(rn)))
return pairs
def _link_vines(diagrams, T):
"""Link persistence pairs across scales by nearest-neighbour."""
from scipy.spatial.distance import cdist
vines = []
prev_pts = None
prev_vine_ids = None
for si in range(T):
bd = diagrams[si]["bd_array"]
if bd.shape[0] == 0:
prev_pts = None
prev_vine_ids = None
continue
if prev_pts is None or prev_pts.shape[0] == 0:
# start new vines for each point
for pi in range(bd.shape[0]):
vines.append(
[
{
"scale_index": si,
"birth": float(bd[pi, 0]),
"death": float(bd[pi, 1]),
}
]
)
prev_pts = bd.copy()
prev_vine_ids = list(range(len(vines) - bd.shape[0], len(vines)))
continue
# match previous points to current by nearest neighbour
D = cdist(prev_pts, bd)
used_curr = set()
assignments = {} # prev_idx → curr_idx
# greedy matching: sort all (prev, curr) pairs by distance
flat = D.flatten()
sorted_idx = np.argsort(flat)
for fi in sorted_idx:
pi = int(fi // D.shape[1])
ci = int(fi % D.shape[1])
if pi not in assignments and ci not in used_curr:
assignments[pi] = ci
used_curr.add(ci)
if len(assignments) == min(D.shape[0], D.shape[1]):
break
curr_vine_ids = [None] * bd.shape[0]
# extend matched vines
for pi, ci in assignments.items():
vid = prev_vine_ids[pi]
vines[vid].append(
{
"scale_index": si,
"birth": float(bd[ci, 0]),
"death": float(bd[ci, 1]),
}
)
curr_vine_ids[ci] = vid
# start new vines for unmatched current points
for ci in range(bd.shape[0]):
if curr_vine_ids[ci] is None:
vines.append(
[
{
"scale_index": si,
"birth": float(bd[ci, 0]),
"death": float(bd[ci, 1]),
}
]
)
curr_vine_ids[ci] = len(vines) - 1
prev_pts = bd.copy()
prev_vine_ids = curr_vine_ids
return vines
# ======================================================================
# §11 MAPPER PIPELINE — TOPOLOGICAL DATA ANALYSIS ON MESHES
# ======================================================================
[docs]
def cluster_mapper(
H: DescriptorMatrix,
*,
lens: Literal["hks_sum", "hks_first_pc", "custom"] = "hks_sum",
custom_lens: np.ndarray | None = None,
n_cubes: int = 15,
perc_overlap: float = 0.3,
clusterer_method: Literal["dbscan", "hdbscan", "agglomerative"] = "dbscan",
clusterer_eps: float = 0.5,
dim_reduction: Literal["umap", "pca"] | None = None,
n_components: int = 2,
random_state: int = 42,
) -> MapperResult:
"""TDA Mapper pipeline with HKS-derived lens function.
Projects each vertex through a lens (filter) function into ℝ^d,
covers the lens range with overlapping hypercubes, clusters within
each pullback, and forms the nerve complex. The resulting graph
is a topological skeleton that reveals branching structure,
loops, and flares in the descriptor space.
Parameters
----------
H : ndarray, shape (N, T)
Per-vertex descriptor matrix.
lens : str
``"hks_sum"`` — sum of HKS across all scales.
``"hks_first_pc"`` — first PC of the descriptor matrix.
``"custom"`` — use ``custom_lens``.
custom_lens : ndarray or None, shape (N,) or (N, d)
Custom lens function values.
n_cubes : int
Number of intervals per lens dimension.
perc_overlap : float
Overlap fraction between adjacent intervals (0–1).
clusterer_method : str
Clustering algorithm within each pullback.
clusterer_eps : float
Epsilon for DBSCAN within pullbacks.
dim_reduction : str or None
Reduce descriptor space before building Mapper graph.
n_components : int
Target dimensionality for reduction.
random_state : int
Returns
-------
MapperResult
References
----------
Singh G, Mémoli F, Carlsson G. Topological methods for the
analysis of high dimensional data sets and 3D object
recognition. *SPBG*, 2007.
"""
km = _require_kepler_mapper()
H = np.asarray(H, dtype=np.float64)
n = H.shape[0]
# --- build lens ---
if lens == "hks_sum":
lens_data = H.sum(axis=1, keepdims=True) # (N, 1)
elif lens == "hks_first_pc":
skd = _require_sklearn_decomposition()
pca = skd.PCA(n_components=1, random_state=random_state)
lens_data = pca.fit_transform(H) # (N, 1)
elif lens == "custom":
if custom_lens is None:
raise ValueError("custom_lens required when lens='custom'")
lens_data = np.atleast_2d(custom_lens)
if lens_data.shape[0] == 1:
lens_data = lens_data.T
else:
raise ValueError(f"Unknown lens: {lens}")
# --- optionally reduce the high-dim data ---
if dim_reduction == "umap" and H.shape[1] > n_components:
umap_mod = _require_umap()
projected = umap_mod.UMAP(
n_components=n_components,
random_state=random_state,
).fit_transform(H)
elif dim_reduction == "pca" and H.shape[1] > n_components:
skd = _require_sklearn_decomposition()
projected = skd.PCA(
n_components=n_components,
random_state=random_state,
).fit_transform(H)
else:
projected = H
# --- configure clusterer for pullbacks ---
if clusterer_method == "dbscan":
from sklearn.cluster import DBSCAN
inner_clusterer = DBSCAN(eps=clusterer_eps, min_samples=3)
elif clusterer_method == "hdbscan":
hdbscan_mod = _require_hdbscan()
inner_clusterer = hdbscan_mod.HDBSCAN(min_cluster_size=5)
elif clusterer_method == "agglomerative":
from sklearn.cluster import AgglomerativeClustering
inner_clusterer = AgglomerativeClustering(
n_clusters=None,
distance_threshold=clusterer_eps,
)
else:
raise ValueError(f"Unknown clusterer_method: {clusterer_method}")
# --- run Mapper ---
mapper = km.KeplerMapper(verbose=0)
graph = mapper.map(
lens_data,
projected,
cover=km.Cover(n_cubes=n_cubes, perc_overlap=perc_overlap),
clusterer=inner_clusterer,
)
# --- parse the graph ---
node_keys = list(graph["nodes"].keys())
node_membership = {}
for i, key in enumerate(node_keys):
node_membership[i] = graph["nodes"][key]
# build adjacency
nerve_adj = {i: [] for i in range(len(node_keys))}
key_to_idx = {k: i for i, k in enumerate(node_keys)}
n_edges = 0
for edge in graph.get("links", []):
if len(edge) == 2:
i0 = key_to_idx.get(edge[0])
i1 = key_to_idx.get(edge[1])
if i0 is not None and i1 is not None:
nerve_adj[i0].append(i1)
nerve_adj[i1].append(i0)
n_edges += 1
# vertex → first nerve node (for colouring)
v2n = -np.ones(n, dtype=np.int64)
for ni, verts in node_membership.items():
for v in verts:
if v2n[v] == -1:
v2n[v] = ni
return MapperResult(
nerve_graph=nerve_adj,
node_membership=node_membership,
n_nodes=len(node_keys),
n_edges=n_edges,
vertex_to_nodes=v2n,
metadata={
"kepler_graph": graph,
"lens_type": lens,
"n_cubes": n_cubes,
"perc_overlap": perc_overlap,
},
)
# ======================================================================
# §12 NON-NEGATIVE TENSOR DECOMPOSITION (MULTI-SUBJECT)
# ======================================================================
[docs]
def cluster_tensor_decomposition(
tensor: np.ndarray,
*,
n_components: int = 8,
adjacency: SparseMatrix | None = None,
lam_spatial: float = 0.0,
method: Literal["cp", "tucker"] = "cp",
n_iter_max: int = 200,
tol: float = 1e-6,
random_state: int = 42,
backend: Literal["auto", "cpu", "gpu"] = "auto",
) -> TensorDecompositionResult:
"""Non-negative CP/PARAFAC or Tucker on (vertices × scales × subjects).
For a cohort of S subjects with vertex-corresponded meshes (e.g.
via HippUnfold), the HKS data forms a 3-way tensor
ℋ ∈ ℝ^{N × T × S}. CP decomposes it as
.. math::
\\mathcal{H}_{ijk} \\approx \\sum_{r=1}^R w_r(i) \\cdot f_r(j)
\\cdot s_r(k)
yielding shared spatial atoms ``w_r`` (which define a parcellation),
population-level temporal profiles ``f_r``, and per-subject
loadings ``s_r``.
Parameters
----------
tensor : ndarray, shape (N, T, S)
Non-negative tensor (HKS across subjects).
n_components : int
CP rank R (≈ expected number of parcels).
adjacency : sparse or None
Mesh Laplacian for graph-regularised variant.
lam_spatial : float
Weight for the Laplacian penalty on spatial factors.
``0.0`` disables spatial regularisation.
method : str
``"cp"`` for CP/PARAFAC, ``"tucker"`` for Tucker.
n_iter_max : int
tol : float
random_state : int
backend : str
``"gpu"`` uses tensorly with PyTorch backend.
Returns
-------
TensorDecompositionResult
References
----------
Kolda TG, Bader BW. Tensor decompositions and applications.
*SIAM Review* 51(3):455–500, 2009.
"""
tl = _require_tensorly()
import tensorly.decomposition as tl_decomp
T_data = np.asarray(tensor, dtype=np.float64)
if T_data.ndim != 3:
raise ValueError(f"Expected 3D tensor (N, T, S), got shape {T_data.shape}")
T_data = np.maximum(T_data, 0.0) + 1e-12
use_gpu = _resolve_backend(backend)
if use_gpu:
try:
torch = _require_torch()
tl.set_backend("pytorch")
T_tl = tl.tensor(T_data, dtype=torch.float32, device=torch.device("cuda"))
except Exception:
logger.warning("GPU tensorly failed, falling back to numpy")
tl.set_backend("numpy")
T_tl = tl.tensor(T_data)
else:
tl.set_backend("numpy")
T_tl = tl.tensor(T_data)
# --- decompose ---
if method == "cp":
result = tl_decomp.non_negative_parafac(
T_tl,
rank=n_components,
n_iter_max=n_iter_max,
tol=tol,
random_state=random_state,
return_errors=True,
)
if isinstance(result, tuple):
factors_obj, errors = result[0], result[1]
else:
factors_obj, errors = result, []
# extract factor matrices
tl.to_numpy(factors_obj.weights) if hasattr(factors_obj, "weights") else np.ones(
n_components
)
W = tl.to_numpy(factors_obj.factors[0]) # (N, R)
F = tl.to_numpy(factors_obj.factors[1]) # (T, R)
S = tl.to_numpy(factors_obj.factors[2]) # (S, R)
elif method == "tucker":
result = tl_decomp.non_negative_tucker(
T_tl,
rank=[
n_components,
min(n_components, T_data.shape[1]),
min(n_components, T_data.shape[2]),
],
n_iter_max=n_iter_max,
tol=tol,
random_state=random_state,
)
tl.to_numpy(result[0])
W = tl.to_numpy(result[1][0])
F = tl.to_numpy(result[1][1])
S = tl.to_numpy(result[1][2])
errors = []
else:
raise ValueError(f"Unknown method: {method}")
# --- optional graph regularisation (post-hoc projection) ---
if adjacency is not None and lam_spatial > 0:
A_off = -sp.csr_matrix(adjacency, dtype=np.float64).copy()
A_off.setdiag(0)
A_off.eliminate_zeros()
A_off.data = np.abs(A_off.data)
D_s = np.asarray(A_off.sum(axis=1)).flatten()
# iterative Laplacian smoothing of spatial factors
for _ in range(10):
AW = A_off @ W
DW = D_s[:, None] * W
grad = lam_spatial * (DW - AW)
W = np.maximum(W - 0.01 * grad, 1e-12)
# reset backend
tl.set_backend("numpy")
# --- cluster labels ---
labels = W.argmax(axis=1).astype(np.int64)
len(np.unique(labels))
# reconstruction error
rec_err = float(errors[-1] if isinstance(errors, list) and len(errors) > 0 else np.nan)
if use_gpu:
try:
torch = _require_torch()
torch.cuda.empty_cache()
except Exception:
pass
return TensorDecompositionResult(
spatial_factors=W,
temporal_factors=F,
subject_factors=S,
labels=labels,
n_components=n_components,
reconstruction_error=rec_err,
metadata={
"method": method,
"lambda_spatial": lam_spatial,
},
)
# ======================================================================
# §13 JOINT TIME-VERTEX GRAPH SIGNAL PROCESSING
# ======================================================================
[docs]
def denoise_joint_timevertex(
H: DescriptorMatrix,
adjacency: SparseMatrix,
*,
alpha_graph: float = 1.0,
beta_time: float = 1.0,
n_eigenvectors: int = 50,
) -> DescriptorMatrix:
"""Joint time-vertex low-pass filtering of a descriptor matrix.
Applies a separable filter in the graph-spectral and temporal-
spectral domains simultaneously:
.. math::
g(\\lambda, \\omega) = \\exp(-\\alpha \\lambda - \\beta \\omega^2)
This smooths H in both mesh-space (removing high-frequency
geometric noise) and time-space (removing scale-to-scale
oscillations), producing a cleaner descriptor for downstream
clustering.
Parameters
----------
H : ndarray, shape (N, T)
Descriptor matrix.
adjacency : sparse, shape (N, N)
Mesh Laplacian (should be positive semi-definite).
alpha_graph : float
Graph-spectral smoothing strength.
beta_time : float
Temporal smoothing strength.
n_eigenvectors : int
Number of graph Laplacian eigenvectors to use.
Returns
-------
ndarray, shape (N, T)
Filtered descriptor matrix.
References
----------
Grassi F, Loukas A, Perraudin N, Ricaud B. A time-vertex signal
processing framework. *IEEE Trans. Signal Processing* 66(3):
817–829, 2018.
"""
H = np.asarray(H, dtype=np.float64)
n, T = H.shape
# --- graph spectral basis ---
L = sp.csr_matrix(adjacency, dtype=np.float64)
from scipy.sparse.linalg import eigsh
k = min(n_eigenvectors, n - 1)
eigenvalues_g, U = eigsh(L, k=k, which="SM")
eigenvalues_g = np.maximum(eigenvalues_g, 0.0)
# --- temporal spectral basis (DCT-II) ---
from scipy.fft import dct, idct
H_graph = U.T @ H # (k, T) graph-spectral
H_joint = dct(H_graph, type=2, axis=1) # (k, T) joint domain
# --- build separable filter ---
omega = np.arange(T, dtype=np.float64)
omega = omega * np.pi / T # normalised frequency
g_graph = np.exp(-alpha_graph * eigenvalues_g) # (k,)
g_time = np.exp(-beta_time * omega**2) # (T,)
G = g_graph[:, None] * g_time[None, :] # (k, T)
# --- apply filter ---
H_filtered_joint = H_joint * G
H_filtered_graph = idct(H_filtered_joint, type=2, axis=1) / (2 * T)
H_filtered = U @ H_filtered_graph # (N, T)
return H_filtered
[docs]
def cluster_joint_spectral(
H: DescriptorMatrix,
adjacency: SparseMatrix,
*,
n_clusters: int = 6,
n_eigenvectors: int = 30,
n_freq_bands: int = 5,
clusterer: Literal["kmeans", "hdbscan"] = "kmeans",
random_state: int = 42,
) -> ClusterResult:
"""Cluster vertices by joint time-vertex spectral energy.
Computes the Joint Fourier Transform of H, partitions the
(graph-frequency, time-frequency) plane into bands, measures
energy concentration per vertex per band, and clusters vertices
by their spectral energy profile.
Parameters
----------
H : ndarray, shape (N, T)
adjacency : sparse, shape (N, N)
n_clusters : int
For k-means.
n_eigenvectors : int
Graph Laplacian eigenvectors.
n_freq_bands : int
Number of bands to partition the graph-frequency axis.
clusterer : str
random_state : int
Returns
-------
ClusterResult
With ``spectral_energy`` in metadata.
"""
H = np.asarray(H, dtype=np.float64)
n, _T = H.shape
# --- graph spectral basis ---
L = sp.csr_matrix(adjacency, dtype=np.float64)
from scipy.sparse.linalg import eigsh
k = min(n_eigenvectors, n - 1)
eigenvalues_g, U = eigsh(L, k=k, which="SM")
eigenvalues_g = np.maximum(eigenvalues_g, 0.0)
# --- compute per-vertex spectral energy in graph bands ---
# project each vertex's time profile into graph spectral domain
# vertex i's contribution to eigenmode j: U[i, j] * H[i, :]
# energy = sum over time of |U[i,j] * H[i,t]|²
# per band: sum over eigenmodes j in band
band_edges = np.linspace(0, k, n_freq_bands + 1, dtype=int)
energy_features = np.zeros((n, n_freq_bands), dtype=np.float64)
with progress_simple("Joint spectral energy", total=n_freq_bands) as tick:
for bi in range(n_freq_bands):
j_start = band_edges[bi]
j_end = band_edges[bi + 1]
if j_end <= j_start:
tick(1)
continue
# energy of vertex i in this graph-frequency band
# = sum_j=j_start..j_end-1 sum_t |U[i,j] * H[i,t]|²
# = sum_j U[i,j]² * sum_t H[i,t]²
U_band_sq = np.sum(U[:, j_start:j_end] ** 2, axis=1) # (N,)
H_energy = np.sum(H**2, axis=1) # (N,)
energy_features[:, bi] = U_band_sq * H_energy
tick(1)
# normalise
row_sums = energy_features.sum(axis=1, keepdims=True)
row_sums[row_sums == 0] = 1.0
energy_features /= row_sums
# --- cluster ---
skc = _require_sklearn_cluster()
if clusterer == "kmeans":
km = skc.KMeans(n_clusters=n_clusters, n_init=10, random_state=random_state)
labels = km.fit_predict(energy_features).astype(np.int64)
elif clusterer == "hdbscan":
res = cluster_hdbscan(
energy_features,
log_transform=False,
dim_reduction=None,
random_state=random_state,
)
labels = res.labels
else:
raise ValueError(f"Unknown clusterer: {clusterer}")
n_clust = len(set(labels[labels >= 0]))
return ClusterResult(
labels=labels,
n_clusters=n_clust,
method="joint_spectral",
quality={},
metadata={
"spectral_energy": energy_features,
"band_edges": band_edges,
"eigenvalues_graph": eigenvalues_g,
},
)
# ======================================================================
# §14 SCALE-SPACE BLOB TRACKING (LINDEBERG ON MANIFOLDS)
# ======================================================================
[docs]
def cluster_scalespace_blobs(
H: DescriptorMatrix,
adjacency: SparseMatrix,
*,
t_values: np.ndarray | None = None,
gamma_normalize: float = 1.0,
linking_radius: float = 3.0,
min_trajectory_length: int = 3,
) -> ScaleSpaceBlobResult:
"""Lindeberg-style scale-space blob tracking on HKS.
HKS(x, t) IS the Gaussian scale-space on the manifold. This
function detects local maxima of the scale-normalised response
t^γ · HKS(x, t) at each scale, links them across consecutive
scales by geodesic proximity, and assigns each vertex to the
blob trajectory whose maximum is closest.
Parameters
----------
H : ndarray, shape (N, T)
HKS matrix at T log-spaced scales.
adjacency : sparse, shape (N, N)
Mesh adjacency (for 1-ring local-max detection).
t_values : ndarray or None, shape (T,)
Actual diffusion time values. If None, assumes 1..T.
gamma_normalize : float
Scale-normalisation exponent γ (Lindeberg 1998). Default 1.0
corresponds to the standard normalised Laplacian of Gaussian.
linking_radius : float
Maximum geodesic hops to link a maximum across scales.
min_trajectory_length : int
Minimum number of scales a blob must span.
Returns
-------
ScaleSpaceBlobResult
References
----------
Lindeberg T. Feature detection with automatic scale selection.
*IJCV* 30(2):79–116, 1998.
"""
H = np.asarray(H, dtype=np.float64)
n, T = H.shape
if t_values is None:
t_values = np.arange(1, T + 1, dtype=np.float64)
else:
t_values = np.asarray(t_values, dtype=np.float64)
# build adjacency list and extended k-hop neighbourhood
A_coo = sp.coo_matrix(adjacency)
adj_list = [set() for _ in range(n)]
for i, j in zip(A_coo.row.tolist(), A_coo.col.tolist()):
if i != j:
adj_list[i].add(j)
# expand to k-hop for linking radius
k_hops = max(1, int(linking_radius))
def _k_hop_neighbours(v, k):
"""Compute k-hop neighbourhood indices from an adjacency matrix."""
visited = {v}
frontier = {v}
for _ in range(k):
new_frontier = set()
for u in frontier:
for nb in adj_list[u]:
if nb not in visited:
visited.add(nb)
new_frontier.add(nb)
frontier = new_frontier
return visited
# scale-normalised response
H_norm = np.zeros_like(H)
for ti in range(T):
H_norm[:, ti] = (t_values[ti] ** gamma_normalize) * H[:, ti]
# detect local maxima at each scale (1-ring maximum)
maxima_per_scale = []
with progress_simple("Blob detection", total=T) as tick:
for ti in range(T):
vals = H_norm[:, ti]
local_max = []
for v in range(n):
is_max = True
for nb in adj_list[v]:
if vals[nb] >= vals[v]:
is_max = False
break
if is_max and vals[v] > 0:
local_max.append(
{
"vertex": v,
"scale_index": ti,
"t_value": float(t_values[ti]),
"response": float(vals[v]),
}
)
maxima_per_scale.append(local_max)
tick(1)
# link maxima across scales into trajectories
trajectories = []
prev_maxima = None
prev_traj_ids = None
for ti in range(T):
curr_maxima = maxima_per_scale[ti]
if not curr_maxima:
prev_maxima = None
prev_traj_ids = None
continue
if prev_maxima is None:
for m in curr_maxima:
trajectories.append([m])
prev_maxima = curr_maxima
prev_traj_ids = list(
range(
len(trajectories) - len(curr_maxima),
len(trajectories),
)
)
continue
# link by k-hop proximity
curr_traj_ids = [None] * len(curr_maxima)
used_prev = set()
for ci, cm in enumerate(curr_maxima):
best_pi = None
best_dist = float("inf")
cm_neighbours = _k_hop_neighbours(cm["vertex"], k_hops)
for pi, pm in enumerate(prev_maxima):
if pi in used_prev:
continue
if pm["vertex"] in cm_neighbours:
d = abs(cm["response"] - pm["response"])
if d < best_dist:
best_dist = d
best_pi = pi
if best_pi is not None:
tid = prev_traj_ids[best_pi]
trajectories[tid].append(cm)
curr_traj_ids[ci] = tid
used_prev.add(best_pi)
# unmatched → new trajectories
for ci, cm in enumerate(curr_maxima):
if curr_traj_ids[ci] is None:
trajectories.append([cm])
curr_traj_ids[ci] = len(trajectories) - 1
prev_maxima = curr_maxima
prev_traj_ids = curr_traj_ids
# filter by minimum trajectory length
long_trajectories = [tr for tr in trajectories if len(tr) >= min_trajectory_length]
# natural scale: for each vertex, find t where normalised response
# is maximal
natural_scales = t_values[H_norm.argmax(axis=1)]
# assign vertices to nearest trajectory (by maximum response vertex)
blob_labels = -np.ones(n, dtype=np.int64)
for ti_idx, traj in enumerate(long_trajectories):
# the trajectory's "representative" vertices at each scale
for step in traj:
v = step["vertex"]
if blob_labels[v] == -1:
blob_labels[v] = ti_idx
# also assign 1-ring neighbours of trajectory peaks
for nb in adj_list[v]:
if blob_labels[nb] == -1:
blob_labels[nb] = ti_idx
# assign remaining vertices to nearest blob by natural scale
# similarity and spatial proximity
unassigned = np.where(blob_labels == -1)[0]
if len(unassigned) > 0 and len(long_trajectories) > 0:
# build centroid for each blob
blob_centroids = np.zeros(len(long_trajectories))
for bi, traj in enumerate(long_trajectories):
blob_centroids[bi] = np.mean([s["t_value"] for s in traj])
for v in unassigned:
# find nearest assigned neighbour
best_label = -1
for nb in adj_list[v]:
if blob_labels[nb] >= 0:
best_label = blob_labels[nb]
break
if best_label >= 0:
blob_labels[v] = best_label
else:
# assign by natural scale
diff = np.abs(blob_centroids - natural_scales[v])
blob_labels[v] = int(np.argmin(diff))
return ScaleSpaceBlobResult(
blob_trajectories=long_trajectories,
natural_scales=natural_scales,
blob_labels=blob_labels,
n_blobs=len(long_trajectories),
metadata={
"gamma": gamma_normalize,
"linking_radius": linking_radius,
"total_trajectories_before_filter": len(trajectories),
"min_trajectory_length": min_trajectory_length,
},
)
# ======================================================================
# §15 MULTI-VIEW CLUSTERING (GEOMETRY + DESCRIPTOR VIEWS)
# ======================================================================
[docs]
def cluster_multiview(
H: DescriptorMatrix,
adjacency: SparseMatrix,
*,
n_clusters: int = 6,
n_eigenvectors_geo: int = 20,
n_eigenvectors_desc: int = 10,
alpha: float = 0.5,
fusion: Literal["spectral_average", "late_consensus", "concatenate"] = "spectral_average",
random_state: int = 42,
) -> ClusterResult:
"""Multi-view clustering with geometry and descriptor views.
View 1: Low-frequency Laplacian eigenfunctions (encoding spatial
position on the manifold — the same basis from which HKS is
built).
View 2: HKS/WKS descriptor profiles (encoding multi-scale
geometric features).
Three fusion strategies are available:
- ``"spectral_average"``: average the normalised Laplacians of
both views, then spectral clustering on the combined Laplacian.
- ``"late_consensus"``: cluster each view independently, then
reconcile via consensus (CSPA).
- ``"concatenate"``: stack view features and cluster jointly.
Parameters
----------
H : ndarray, shape (N, T)
Descriptor matrix (View 2).
adjacency : sparse, shape (N, N)
Mesh Laplacian (View 1 is its eigenfunctions).
n_clusters : int
n_eigenvectors_geo : int
Number of Laplacian eigenfunctions for View 1.
n_eigenvectors_desc : int
PCA components for View 2.
alpha : float
Weight for View 1 (geometry). 1−α for View 2 (descriptors).
fusion : str
random_state : int
Returns
-------
ClusterResult
References
----------
Kumar A, Rai P, Daumé H. Co-regularized multi-view spectral
clustering. *NeurIPS* 24, 2011.
"""
H = np.asarray(H, dtype=np.float64)
n = H.shape[0]
skc = _require_sklearn_cluster()
# --- View 1: Laplacian eigenfunctions ---
L = sp.csr_matrix(adjacency, dtype=np.float64)
from scipy.sparse.linalg import eigsh
k_geo = min(n_eigenvectors_geo, n - 1)
_, Phi = eigsh(L, k=k_geo, which="SM")
Phi = Phi[:, 1:] if k_geo > 1 else Phi # drop constant mode
# --- View 2: reduced descriptor ---
skd = _require_sklearn_decomposition()
k_desc = min(n_eigenvectors_desc, H.shape[1])
X_log = np.log(np.maximum(H, 1e-12))
Psi = skd.PCA(n_components=k_desc, random_state=random_state).fit_transform(X_log)
if fusion == "spectral_average":
# build affinity for each view
from scipy.spatial.distance import pdist, squareform
D1 = squareform(pdist(Phi, "sqeuclidean"))
sigma1 = np.median(D1[D1 > 0]) or 1.0
A1 = np.exp(-D1 / sigma1)
np.fill_diagonal(A1, 0)
D1_deg = np.diag(A1.sum(axis=1))
L1 = D1_deg - A1
D2 = squareform(pdist(Psi, "sqeuclidean"))
sigma2 = np.median(D2[D2 > 0]) or 1.0
A2 = np.exp(-D2 / sigma2)
np.fill_diagonal(A2, 0)
D2_deg = np.diag(A2.sum(axis=1))
L2 = D2_deg - A2
# normalise each
D1_inv_sqrt = np.diag(1.0 / np.sqrt(np.diag(D1_deg) + 1e-12))
D2_inv_sqrt = np.diag(1.0 / np.sqrt(np.diag(D2_deg) + 1e-12))
L1_sym = D1_inv_sqrt @ L1 @ D1_inv_sqrt
L2_sym = D2_inv_sqrt @ L2 @ D2_inv_sqrt
L_avg = alpha * L1_sym + (1.0 - alpha) * L2_sym
_eigvals, eigvecs = np.linalg.eigh(L_avg)
U = eigvecs[:, :n_clusters]
# row-normalise (Ng-Jordan-Weiss)
norms = np.linalg.norm(U, axis=1, keepdims=True)
norms[norms == 0] = 1.0
U /= norms
labels = (
skc.KMeans(
n_clusters=n_clusters,
n_init=10,
random_state=random_state,
)
.fit_predict(U)
.astype(np.int64)
)
elif fusion == "late_consensus":
# cluster each view, then majority vote
lab1 = (
skc.KMeans(
n_clusters=n_clusters,
n_init=10,
random_state=random_state,
)
.fit_predict(Phi)
.astype(np.int64)
)
lab2 = (
skc.KMeans(
n_clusters=n_clusters,
n_init=10,
random_state=random_state,
)
.fit_predict(Psi)
.astype(np.int64)
)
# consensus via co-association matrix
C = np.zeros((n, n), dtype=np.float64)
for lab in [lab1, lab2]:
for k in range(n_clusters):
mask = lab == k
C[np.ix_(mask, mask)] += 1.0
C /= 2.0
# spectral clustering on co-association
labels = (
skc.SpectralClustering(
n_clusters=n_clusters,
affinity="precomputed",
random_state=random_state,
)
.fit_predict(C)
.astype(np.int64)
)
elif fusion == "concatenate":
# normalise each view to unit variance
Phi_n = Phi / (Phi.std(axis=0, keepdims=True) + 1e-12)
Psi_n = Psi / (Psi.std(axis=0, keepdims=True) + 1e-12)
Z = np.hstack([alpha * Phi_n, (1.0 - alpha) * Psi_n])
labels = (
skc.KMeans(
n_clusters=n_clusters,
n_init=10,
random_state=random_state,
)
.fit_predict(Z)
.astype(np.int64)
)
else:
raise ValueError(f"Unknown fusion: {fusion}")
return ClusterResult(
labels=labels,
n_clusters=n_clusters,
method=f"multiview_{fusion}",
quality={},
metadata={"alpha": alpha, "fusion": fusion},
)
# ======================================================================
# §16 SPECTRAL GRAPH WAVELET CLUSTERING
# ======================================================================
[docs]
def cluster_wavelet_coefficients(
H: DescriptorMatrix,
adjacency: SparseMatrix,
*,
n_scales: int = 5,
n_clusters: int = 6,
wavelet_type: Literal["mexican_hat", "heat", "meyer"] = "mexican_hat",
n_eigenvectors: int = 100,
clusterer: Literal["kmeans", "hdbscan"] = "kmeans",
random_state: int = 42,
backend: Literal["auto", "cpu", "gpu"] = "auto",
) -> ClusterResult:
"""Cluster vertices by spectral graph wavelet energy profiles.
Decomposes HKS into band-pass components using spectral graph
wavelets (Hammond, Vandergheynst & Gribonval, 2011), computes
the energy in each band at each vertex, and clusters vertices
by their multi-band energy signature.
Unlike raw HKS (which is a low-pass filter at each t), wavelet
decomposition provides **orthogonal** band-pass filters, so
features at different scales do not leak into each other.
Parameters
----------
H : ndarray, shape (N, T)
Descriptor matrix.
adjacency : sparse, shape (N, N)
Mesh Laplacian.
n_scales : int
Number of wavelet scales (frequency bands).
n_clusters : int
For k-means.
wavelet_type : str
``"mexican_hat"`` (g(x) = x·exp(-x)), ``"heat"`` (exp(-x)),
``"meyer"`` (smooth band-pass).
n_eigenvectors : int
Laplacian eigenvectors for spectral acceleration.
clusterer : str
random_state : int
backend : str
Returns
-------
ClusterResult
With ``wavelet_energy`` matrix in metadata.
References
----------
Hammond DK, Vandergheynst P, Gribonval R. Wavelets on graphs via
spectral graph theory. *ACHA* 30(2):129–150, 2011.
"""
H = np.asarray(H, dtype=np.float64)
n, _T = H.shape
# --- Laplacian eigenbasis ---
L = sp.csr_matrix(adjacency, dtype=np.float64)
from scipy.sparse.linalg import eigsh
k = min(n_eigenvectors, n - 1)
eigenvalues, U = eigsh(L, k=k, which="SM")
eigenvalues = np.maximum(eigenvalues, 0.0)
lam_max = eigenvalues[-1] if eigenvalues[-1] > 0 else 1.0
# --- wavelet kernel ---
scales = np.logspace(
np.log10(2.0 / lam_max),
np.log10(2.0 / max(eigenvalues[1], 1e-6)),
n_scales,
)
def _wavelet_kernel(s, lam):
"""Evaluate the spectral graph wavelet kernel at a given scale."""
if wavelet_type == "mexican_hat":
x = s * lam
return x * np.exp(-x)
elif wavelet_type == "heat":
return np.exp(-s * lam)
elif wavelet_type == "meyer":
x = s * lam
val = np.where(
(x >= 2 * np.pi / 3) & (x <= 8 * np.pi / 3),
np.cos(np.pi / 2 * _meyer_aux(3 * x / (4 * np.pi) - 1)),
0.0,
)
return val
else:
raise ValueError(f"Unknown wavelet_type: {wavelet_type}")
# --- compute wavelet transform of HKS ---
use_gpu = _resolve_backend(backend)
if use_gpu:
torch = _require_torch()
device = torch.device("cuda")
U_t = torch.tensor(U, dtype=torch.float32, device=device)
H_t = torch.tensor(H, dtype=torch.float32, device=device)
torch.tensor(eigenvalues, dtype=torch.float32, device=device)
energy_features = torch.zeros((n, n_scales), dtype=torch.float32, device=device)
with progress_simple("Wavelet energy [GPU]", total=n_scales) as tick:
for si, s in enumerate(scales):
g_s = torch.tensor(
_wavelet_kernel(s, eigenvalues),
dtype=torch.float32,
device=device,
)
# wavelet coefficients at scale s:
# W_s H = U diag(g_s) U^T H
H_spec = U_t.T @ H_t # (k, T)
H_filtered = g_s[:, None] * H_spec # (k, T)
W_s_H = U_t @ H_filtered # (N, T)
# energy = sum over time of squared coefficients
energy_features[:, si] = (W_s_H**2).sum(dim=1)
tick(1)
energy_np = energy_features.cpu().numpy().astype(np.float64)
del U_t, H_t, energy_features
torch.cuda.empty_cache()
else:
energy_np = np.zeros((n, n_scales), dtype=np.float64)
with progress_simple("Wavelet energy [CPU]", total=n_scales) as tick:
for si, s in enumerate(scales):
g_s = _wavelet_kernel(s, eigenvalues) # (k,)
H_spec = U.T @ H # (k, T)
H_filtered = g_s[:, None] * H_spec # (k, T)
W_s_H = U @ H_filtered # (N, T)
energy_np[:, si] = np.sum(W_s_H**2, axis=1)
tick(1)
# normalise
row_sums = energy_np.sum(axis=1, keepdims=True)
row_sums[row_sums == 0] = 1.0
energy_np /= row_sums
# --- cluster ---
skc = _require_sklearn_cluster()
if clusterer == "kmeans":
labels = (
skc.KMeans(
n_clusters=n_clusters,
n_init=10,
random_state=random_state,
)
.fit_predict(energy_np)
.astype(np.int64)
)
elif clusterer == "hdbscan":
res = cluster_hdbscan(
energy_np,
log_transform=False,
dim_reduction=None,
random_state=random_state,
)
labels = res.labels
else:
raise ValueError(f"Unknown clusterer: {clusterer}")
n_clust = len(set(labels[labels >= 0]))
return ClusterResult(
labels=labels,
n_clusters=n_clust,
method="wavelet_energy",
quality={},
metadata={
"wavelet_energy": energy_np,
"scales": scales,
"wavelet_type": wavelet_type,
},
)
def _meyer_aux(x):
"""Meyer wavelet auxiliary function ν(x) for smooth transition."""
x = np.clip(x, 0.0, 1.0)
return x**4 * (35 - 84 * x + 70 * x**2 - 20 * x**3)
# ======================================================================
# Internal helpers
# ======================================================================
def _resolve_backend(backend: str) -> bool:
"""Return True if GPU should be used."""
if backend == "gpu":
return True
if backend == "cpu":
return False
# auto
try:
import torch
return torch.cuda.is_available()
except ImportError:
return False