Source code for spectralbrain.core.pointclouds

"""Point-cloud representation and point-cloud-specific analysis.

Provides :class:`BrainPointCloud`, the mesh-free geometric container
for brain structures extracted from volumetric segmentations,
tractography, or surface vertex sets.

Design philosophy
-----------------
Point clouds are the **atlas-free, mesh-free** pathway.  No marching
cubes, no surface reconstruction.  A volumetric segmentation label
becomes a set of (x, y, z) coordinates in world space — full stop.
The Laplace–Beltrami operator is approximated via graph Laplacians
(kNN, Belkin–Niyogi, or Sharp–Crane robust), and spectral descriptors
are computed identically to the mesh pathway.

Acquisition pathways
--------------------
.. code-block:: text

   ┌─ FreeSurfer segmentation (aseg.mgz, thalamic_nuclei) ──► voxel → RAS
   ├─ FreeSurfer surface (.white, .pial) ──────────────────► vertices as points
   ├─ HippUnfold surface (.surf.gii, 8k/18k) ─────────────► vertices as points
   ├─ Raw T1w/T2w/FLAIR (via containers) ──────────────────► segment → voxel → RAS
   ├─ Atlas label volume (Schaefer, Julich, any .nii.gz) ──► voxel → RAS
   ├─ TractSeg tract masks (.nii.gz per tract) ────────────► voxel → RAS
   └─ Tractography streamlines (.trk, .tck) ───────────────► streamline points

All converge to a single ``BrainPointCloud(points)`` that enters the
spectral pipeline.
"""

from __future__ import annotations

from pathlib import Path
from typing import (
    Any,
    Literal,
)

import numpy as np
import scipy.sparse as sp
from scipy.spatial import cKDTree

from spectralbrain.backends.cpu import NumpyBackend
from spectralbrain.core.base import (
    SpectralDecomposition,
    compute_centroid,
    convex_hull_area,
    farthest_point_sampling,
    knn_search,
)
from spectralbrain.runtime import (
    MassMatrix,
    Normals,
    PathLike,
    Points,
    ScalarMap,
    SparseMatrix,
    get_logger,
    progress_simple,
)

logger = get_logger(__name__)


# ======================================================================
# §1  BRAIN POINT CLOUD CLASS
# ======================================================================


[docs] class BrainPointCloud: """Unstructured 3D point cloud for brain structures. Implements :class:`~spectralbrain.core.base.GeometricObject`. No face connectivity — the Laplacian is built from a neighbourhood graph (kNN, ε-ball, or Belkin–Niyogi). Parameters ---------- points : ndarray, shape (N, 3) Coordinates in world space (mm, RAS). metadata : dict, optional Provenance (subject, hemisphere, structure, source, …). Examples -------- >>> pc = BrainPointCloud.from_freesurfer_seg( ... "aseg.mgz", label_id=17, jitter=True) >>> decomp = pc.decompose(k=50, method="robust") """
[docs] def __init__( self, points: Points, metadata: dict[str, Any] | None = None, ) -> None: """Initialise from a (N, 3) coordinate array.""" self.points = np.asarray(points, dtype=np.float64) self.metadata = metadata or {} if self.points.ndim != 2 or self.points.shape[1] != 3: raise ValueError(f"points must be (N, 3), got {self.points.shape}") # Cached properties. self._normals: Normals | None = None self._L: SparseMatrix | None = None self._M: MassMatrix | None = None self._kd_tree: cKDTree | None = None
# ── GeometricObject protocol ────────────────────────────────────── @property def coordinates(self) -> Points: """Return the point coordinates array.""" return self.points @property def n_points(self) -> int: """Return the number of points in the cloud.""" return self.points.shape[0]
[docs] def surface_area(self) -> float: """Approximate surface area via convex hull. For point clouds without face topology, the convex hull area is a rough proxy. Used for eigenvalue normalisation. """ return convex_hull_area(self.points)
# ── KD-tree (cached) ────────────────────────────────────────────── @property def kd_tree(self) -> cKDTree: """Cached spatial index.""" if self._kd_tree is None: self._kd_tree = cKDTree(self.points) return self._kd_tree # ══════════════════════════════════════════════════════════════════ # §2 FACTORY CLASSMETHODS — multi-source acquisition # ══════════════════════════════════════════════════════════════════ # ── From volumetric segmentation (THE core pathway) ───────────────
[docs] @classmethod def from_volume( cls, label_volume: np.ndarray, affine: np.ndarray, label_id: int, *, jitter: bool = True, jitter_scale: float = 0.25, seed: int | None = None, subsample: int | None = None, metadata: dict[str, Any] | None = None, ) -> BrainPointCloud: """Extract a point cloud from a volumetric label map. The most direct pathway: ``mgz → np.array → point cloud``. No marching cubes, no mesh reconstruction. Voxel centroids in world space become points. Parameters ---------- label_volume : ndarray, shape (X, Y, Z) Integer label volume (e.g. from ``aseg.mgz``). affine : ndarray, shape (4, 4) Voxel-to-world affine. label_id : int Target label (e.g. 17 = left hippocampus in aseg). jitter : bool Add sub-voxel Gaussian noise to break grid artefacts. jitter_scale : float Jitter σ in voxel units. seed : int, optional RNG seed for jitter. subsample : int, optional If set, apply FPS to reduce to this many points. metadata : dict, optional Returns ------- BrainPointCloud Examples -------- >>> data, affine = sb.io.load_nifti("aseg.mgz") >>> pc = BrainPointCloud.from_volume(data, affine, label_id=17) """ from spectralbrain.io.loaders import labels_to_pointcloud points = labels_to_pointcloud( label_volume, affine, label_id, jitter=jitter, jitter_scale=jitter_scale, seed=seed, ) meta = { "source": "volume", "label_id": label_id, "n_raw_points": points.shape[0], **(metadata or {}), } if subsample is not None and points.shape[0] > subsample: points, _ = farthest_point_sampling(points, subsample, seed=seed) meta["subsampled_to"] = subsample return cls(points, metadata=meta)
# ── From FreeSurfer segmentation file (load + extract) ────────────
[docs] @classmethod def from_freesurfer_seg( cls, seg_path: PathLike, label_id: int, *, jitter: bool = True, jitter_scale: float = 0.25, seed: int | None = None, subsample: int | None = None, metadata: dict[str, Any] | None = None, ) -> BrainPointCloud: """Load a FreeSurfer segmentation and extract a structure. Handles ``aseg.mgz``, ``aparc+aseg.mgz``, ``ThalamicNuclei.v13.T1.mgz``, ``amygNucVolumes.v22.mgz``, ``hippoSfVolumes-T1.v22.mgz``, or any integer label volume. Parameters ---------- seg_path : PathLike Path to ``.mgz`` / ``.nii.gz`` segmentation. label_id : int Target label code. jitter, jitter_scale, seed, subsample : as in :meth:`from_volume`. metadata : dict, optional Returns ------- BrainPointCloud """ from spectralbrain.io.loaders import load_nifti data, affine = load_nifti(seg_path) meta = { "source": "freesurfer_seg", "seg_file": str(Path(seg_path).name), **(metadata or {}), } return cls.from_volume( data, affine, label_id, jitter=jitter, jitter_scale=jitter_scale, seed=seed, subsample=subsample, metadata=meta, )
# ── From any surface file (vertices as points) ────────────────────
[docs] @classmethod def from_surface( cls, surface_path: PathLike, *, subsample: int | None = None, seed: int | None = None, metadata: dict[str, Any] | None = None, ) -> BrainPointCloud: """Load a surface file and use its vertices as a point cloud. Works with FreeSurfer surfaces (.white, .pial, .inflated), GIfTI (.surf.gii), HippUnfold outputs, or any mesh format. Parameters ---------- surface_path : PathLike subsample : int, optional FPS subsampling target. seed : int, optional metadata : dict, optional Returns ------- BrainPointCloud """ from spectralbrain.io.loaders import load result = load(surface_path) points = result["vertices"] meta = { "source": "surface", "surface_file": str(Path(surface_path).name), "n_original_vertices": points.shape[0], **(metadata or {}), } if subsample is not None and points.shape[0] > subsample: points, _ = farthest_point_sampling(points, subsample, seed=seed) meta["subsampled_to"] = subsample return cls(points, metadata=meta)
# ── From atlas label volume (generic: Schaefer, Julich, etc.) ─────
[docs] @classmethod def from_atlas_volume( cls, atlas_path: PathLike, label_id: int, *, jitter: bool = True, seed: int | None = None, subsample: int | None = None, metadata: dict[str, Any] | None = None, ) -> BrainPointCloud: """Extract a point cloud from an arbitrary atlas NIfTI. The atlas volume must be in the same space as the subject (template space or registered native). Parameters ---------- atlas_path : PathLike NIfTI label volume (e.g. Schaefer in MNI). label_id : int Atlas region ID. jitter, seed, subsample : as in :meth:`from_volume`. metadata : dict, optional Returns ------- BrainPointCloud """ from spectralbrain.io.loaders import load_nifti data, affine = load_nifti(atlas_path) meta = { "source": "atlas_volume", "atlas_file": str(Path(atlas_path).name), **(metadata or {}), } return cls.from_volume( data, affine, label_id, jitter=jitter, seed=seed, subsample=subsample, metadata=meta, )
# ── From TractSeg tract mask ──────────────────────────────────────
[docs] @classmethod def from_tract_mask( cls, mask_path: PathLike, *, threshold: float = 0.5, jitter: bool = True, seed: int | None = None, subsample: int | None = None, metadata: dict[str, Any] | None = None, ) -> BrainPointCloud: """Extract a point cloud from a TractSeg binary tract mask. TractSeg outputs one NIfTI per tract (72 tracts). Each non-zero voxel becomes a point. Parameters ---------- mask_path : PathLike Binary tract mask NIfTI (from TractSeg). threshold : float Binarisation threshold for probabilistic masks. jitter, seed, subsample : as in :meth:`from_volume`. metadata : dict, optional Returns ------- BrainPointCloud """ from spectralbrain.io.loaders import load_nifti data, affine = load_nifti(mask_path) binary = (data > threshold).astype(np.int32) meta = { "source": "tract_mask", "tract_file": str(Path(mask_path).name), **(metadata or {}), } # Use label_id=1 on the binarised mask. return cls.from_volume( binary, affine, label_id=1, jitter=jitter, seed=seed, subsample=subsample, metadata=meta, )
# ── From tractography streamlines (.trk, .tck) ───────────────────
[docs] @classmethod def from_tractography( cls, trk_path: PathLike, *, sampling: Literal["all", "endpoints", "midpoints", "uniform"] = "uniform", points_per_streamline: int = 10, subsample: int | None = None, seed: int | None = None, metadata: dict[str, Any] | None = None, ) -> BrainPointCloud: """Extract a point cloud from tractography streamlines. Parameters ---------- trk_path : PathLike ``.trk`` (TrackVis) or ``.tck`` (MRtrix) file. sampling : str ``"all"`` — every point from every streamline. ``"endpoints"`` — first + last point per streamline. ``"midpoints"`` — middle point per streamline. ``"uniform"`` — uniformly resample each streamline to *points_per_streamline* points. points_per_streamline : int For ``sampling="uniform"``: points per streamline. subsample : int, optional FPS subsampling after extraction. seed : int, optional metadata : dict, optional Returns ------- BrainPointCloud """ try: import nibabel as nib except ImportError as exc: raise ImportError( "nibabel is required for tractography loading.\n pip install nibabel" ) from exc trk = nib.streamlines.load(str(trk_path)) streamlines = trk.streamlines all_points: list[np.ndarray] = [] if sampling == "all": for sl in streamlines: all_points.append(np.asarray(sl, dtype=np.float64)) elif sampling == "endpoints": for sl in streamlines: sl = np.asarray(sl, dtype=np.float64) if len(sl) >= 2: all_points.append(sl[[0, -1]]) elif len(sl) == 1: all_points.append(sl[[0]]) elif sampling == "midpoints": for sl in streamlines: sl = np.asarray(sl, dtype=np.float64) mid = len(sl) // 2 all_points.append(sl[[mid]]) elif sampling == "uniform": for sl in streamlines: sl = np.asarray(sl, dtype=np.float64) if len(sl) < 2: all_points.append(sl) continue # Resample by arc-length interpolation. cumlen = np.concatenate( [ [0], np.cumsum(np.linalg.norm(np.diff(sl, axis=0), axis=1)), ] ) total = cumlen[-1] if total < 1e-10: all_points.append(sl[:1]) continue target_dists = np.linspace(0, total, points_per_streamline) resampled = np.zeros((points_per_streamline, 3)) for dim in range(3): resampled[:, dim] = np.interp(target_dists, cumlen, sl[:, dim]) all_points.append(resampled) else: raise ValueError(f"Unknown sampling: {sampling!r}") if not all_points: raise ValueError(f"No streamlines found in {trk_path}") points = np.vstack(all_points) meta = { "source": "tractography", "trk_file": str(Path(trk_path).name), "sampling": sampling, "n_streamlines": len(streamlines), "n_raw_points": points.shape[0], **(metadata or {}), } if subsample is not None and points.shape[0] > subsample: points, _ = farthest_point_sampling(points, subsample, seed=seed) meta["subsampled_to"] = subsample logger.info( "Extracted %d points from %d streamlines (%s sampling)", points.shape[0], len(streamlines), sampling, ) return cls(points, metadata=meta)
# ── From raw anatomical via containers ────────────────────────────
[docs] @classmethod def from_raw( cls, raw_path: PathLike, label_id: int, *, output_dir: PathLike | None = None, gpu: bool | None = None, jitter: bool = True, seed: int | None = None, subsample: int | None = None, metadata: dict[str, Any] | None = None, ) -> BrainPointCloud: """End-to-end: raw T1w/T2w/FLAIR → segment → point cloud. Chains skull-stripping and segmentation via Singularity containers, then extracts the target structure as a point cloud. **No DL dependencies on host.** Parameters ---------- raw_path : PathLike Raw anatomical NIfTI (any contrast). label_id : int Target structure label. output_dir : PathLike, optional Working directory for intermediate files. gpu : bool, optional jitter, seed, subsample : as in :meth:`from_volume`. metadata : dict, optional Returns ------- BrainPointCloud """ from spectralbrain.io.preprocess import raw_to_pointcloud points = raw_to_pointcloud( raw_path, label_id, output_dir=output_dir, gpu=gpu, jitter=jitter, jitter_scale=0.25, seed=seed, ) meta = { "source": "raw_pipeline", "raw_file": str(Path(raw_path).name), "label_id": label_id, **(metadata or {}), } if subsample is not None and points.shape[0] > subsample: points, _ = farthest_point_sampling(points, subsample, seed=seed) meta["subsampled_to"] = subsample return cls(points, metadata=meta)
# ══════════════════════════════════════════════════════════════════ # §3 LAPLACIAN CONSTRUCTION # ══════════════════════════════════════════════════════════════════
[docs] def compute_laplacian( self, method: Literal["knn", "belkin_niyogi", "robust"] = "robust", *, k: int = 30, epsilon: float | None = None, sigma: float | None = None, robust_mollify: float = 1e-5, ) -> tuple[SparseMatrix, MassMatrix]: """Construct a graph Laplacian on the point cloud. Parameters ---------- method : str ``"knn"`` — Gaussian-weighted kNN graph Laplacian. ``"belkin_niyogi"`` — heat-kernel weighted Laplacian with provable convergence to the continuous LBO (Belkin & Niyogi, JCSS 2008). ``"robust"`` — Sharp & Crane tufted Laplacian (robust to noise and non-uniform density). k : int Number of neighbours for kNN and Belkin–Niyogi. epsilon : float, optional Bandwidth for ε-ball graph. ``None`` = auto from mean kNN distance. sigma : float, optional Gaussian kernel bandwidth. ``None`` = auto (median distance heuristic). robust_mollify : float Mollification for the robust method. Returns ------- L : SparseMatrix, shape (N, N) M : MassMatrix, shape (N, N) """ if method == "knn": L, M = _knn_laplacian( self.points, k=k, sigma=sigma, ) elif method == "belkin_niyogi": L, M = _belkin_niyogi_laplacian( self.points, k=k, epsilon=epsilon, ) elif method == "robust": L, M = _robust_laplacian_pc( self.points, mollify_factor=robust_mollify, ) else: raise ValueError(f"Unknown method: {method!r}") self._L = L self._M = M logger.info( "Point cloud Laplacian (%s): N=%d, nnz=%d", method, L.shape[0], L.nnz, ) return L, M
# ── Spectral decomposition ────────────────────────────────────────
[docs] def decompose( self, k: int = 50, *, laplacian_method: Literal["knn", "belkin_niyogi", "robust"] = "robust", backend: Any | None = None, **kwargs: Any, ) -> SpectralDecomposition: """Compute the spectral decomposition. Parameters ---------- k : int Number of eigenpairs. laplacian_method : str Passed to :meth:`compute_laplacian`. backend : Backend, optional **kwargs Extra args for :meth:`compute_laplacian` and ``backend.eigsh()``. Returns ------- SpectralDecomposition """ if self._L is None or self._M is None: lap_kwargs = { key: kwargs.pop(key) for key in ("sigma", "epsilon", "robust_mollify") if key in kwargs } self.compute_laplacian( method=laplacian_method, k=kwargs.pop("knn_k", 30), **lap_kwargs, ) be = backend or NumpyBackend() evals, evecs = be.eigsh(self._L, self._M, k=k, **kwargs) return SpectralDecomposition( eigenvalues=evals, eigenvectors=evecs, stiffness=self._L, mass=self._M, surface_area=self.surface_area(), metadata={ **self.metadata, "laplacian_method": laplacian_method, "backend": be.name, "n_points": self.n_points, }, )
# ══════════════════════════════════════════════════════════════════ # §4 NORMALS AND CURVATURE (from local PCA, no faces needed) # ══════════════════════════════════════════════════════════════════
[docs] def compute_normals( self, k: int = 15, *, orient_to_centroid: bool = True, ) -> Normals: """Estimate per-point normals via local PCA. The normal at each point is the eigenvector of the local covariance matrix corresponding to the **smallest** eigenvalue (the direction of least variance in the neighbourhood). Parameters ---------- k : int Neighbours for local PCA. orient_to_centroid : bool Flip normals to point away from the cloud centroid (heuristic for outward orientation). Returns ------- normals : ndarray, shape (N, 3) """ if self._normals is not None: return self._normals N = self.n_points normals = np.zeros((N, 3), dtype=np.float64) _, indices = knn_search(self.points, k=k) centroid = compute_centroid(self.points) with progress_simple("Estimating normals", total=N) as tick: for i in range(N): nbrs = self.points[indices[i]] # (k, 3) cov = np.cov(nbrs, rowvar=False) # (3, 3) _eigvals, eigvecs = np.linalg.eigh(cov) # Smallest eigenvalue → normal direction. normals[i] = eigvecs[:, 0] if orient_to_centroid: outward = self.points[i] - centroid if np.dot(normals[i], outward) < 0: normals[i] *= -1 if (i + 1) % 500 == 0 or i == N - 1: tick(min(500, N - (i + 1 - 500))) # Normalise (should already be unit, but ensure). norms = np.linalg.norm(normals, axis=1, keepdims=True) normals /= np.clip(norms, 1e-12, None) self._normals = normals return normals
[docs] def local_curvature( self, k: int = 15, ) -> tuple[ScalarMap, ScalarMap]: """Estimate per-point curvature from local PCA eigenvalues. The ratio λ₀ / (λ₀ + λ₁ + λ₂) of the smallest eigenvalue to the sum is a measure of local planarity — smaller values mean flatter (lower curvature), larger values mean more curved. Parameters ---------- k : int Neighbours for local PCA. Returns ------- curvature : ndarray, shape (N,) Normalised planarity measure (0 = flat, 1 = isotropic). linearity : ndarray, shape (N,) (λ₀ - λ₁) / λ₀ — high for tube-like structures. """ N = self.n_points curvature = np.zeros(N, dtype=np.float64) linearity = np.zeros(N, dtype=np.float64) _, indices = knn_search(self.points, k=k) with progress_simple("Estimating curvature", total=N) as tick: for i in range(N): nbrs = self.points[indices[i]] cov = np.cov(nbrs, rowvar=False) eigvals = np.linalg.eigvalsh(cov) # ascending total = eigvals.sum() if total > 1e-20: curvature[i] = eigvals[0] / total if eigvals[2] > 1e-20: linearity[i] = (eigvals[2] - eigvals[1]) / eigvals[2] if (i + 1) % 500 == 0 or i == N - 1: tick(min(500, N - (i + 1 - 500))) return curvature, linearity
# ══════════════════════════════════════════════════════════════════ # §5 ATLAS-FREE CLUSTERING # ══════════════════════════════════════════════════════════════════
[docs] def cluster( self, method: Literal[ "hdbscan", "spectral", "kmeans", "dbscan", ] = "hdbscan", *, n_clusters: int | None = None, min_cluster_size: int = 50, eps: float | None = None, **kwargs: Any, ) -> np.ndarray: """Atlas-free spatial clustering of the point cloud. Assigns each point to a cluster without relying on any atlas or parcellation. Useful for data-driven subregion discovery. Parameters ---------- method : str ``"hdbscan"`` — density-based, handles irregular shapes, auto-detects number of clusters (recommended). ``"spectral"`` — spectral clustering on kNN graph. ``"kmeans"`` — simple, fast, spherical assumption. ``"dbscan"`` — density-based, fixed epsilon. n_clusters : int, optional For methods that require it (spectral, kmeans). min_cluster_size : int For HDBSCAN. eps : float, optional For DBSCAN. **kwargs Passed to the underlying clustering algorithm. Returns ------- labels : ndarray, shape (N,), int Cluster assignment per point. ``-1`` = noise (HDBSCAN / DBSCAN). """ if method == "hdbscan": return _cluster_hdbscan( self.points, min_cluster_size=min_cluster_size, **kwargs, ) elif method == "spectral": if n_clusters is None: raise ValueError("n_clusters required for spectral clustering.") return _cluster_spectral( self.points, n_clusters=n_clusters, **kwargs, ) elif method == "kmeans": if n_clusters is None: raise ValueError("n_clusters required for kmeans.") return _cluster_kmeans( self.points, n_clusters=n_clusters, **kwargs, ) elif method == "dbscan": return _cluster_dbscan( self.points, eps=eps, **kwargs, ) else: raise ValueError(f"Unknown clustering method: {method!r}")
# ── Subsample ─────────────────────────────────────────────────────
[docs] def subsample( self, n: int, *, seed: int | None = None, ) -> BrainPointCloud: """Return a subsampled copy via farthest-point sampling. Parameters ---------- n : int Target number of points. seed : int, optional Returns ------- BrainPointCloud """ sampled, _idx = farthest_point_sampling(self.points, n, seed=seed) meta = { **self.metadata, "subsampled_to": n, "subsampled_from": self.n_points, } return BrainPointCloud(sampled, metadata=meta)
# ── Denoise ───────────────────────────────────────────────────────
[docs] def denoise( self, k: int = 10, threshold_sigma: float = 2.5, ) -> BrainPointCloud: """Remove density outliers (statistical outlier removal). Parameters ---------- k : int Neighbours for density estimation. threshold_sigma : float Points beyond this many σ from mean log-density are removed. Returns ------- BrainPointCloud Cleaned copy. """ from spectralbrain.core.base import detect_density_outliers outliers = detect_density_outliers( self.points, k=k, threshold_sigma=threshold_sigma, ) clean = self.points[~outliers] n_removed = outliers.sum() logger.info( "Denoised: removed %d / %d points (%.1f%%)", n_removed, self.n_points, 100 * n_removed / self.n_points, ) meta = { **self.metadata, "denoised": True, "n_removed": int(n_removed), } return BrainPointCloud(clean, metadata=meta)
# ── repr ────────────────────────────────────────────────────────── def __repr__(self) -> str: """Return a compact point cloud summary string.""" src = self.metadata.get("source", "") struct = self.metadata.get("structure", "") parts = [f"BrainPointCloud(n_points={self.n_points}"] if src: """Return a compact point cloud summary.""" parts.append(f", source='{src}'") if struct: parts.append(f", structure='{struct}'") parts.append(")") return "".join(parts)
# ====================================================================== # §6 LAPLACIAN IMPLEMENTATIONS # ====================================================================== def _knn_laplacian( points: Points, *, k: int = 30, sigma: float | None = None, ) -> tuple[SparseMatrix, MassMatrix]: """Gaussian-weighted kNN graph Laplacian. W_ij = exp(-||x_i - x_j||² / (2σ²)) if j ∈ kNN(i) L = D - W M = D (diagonal degree as mass) Parameters ---------- points : ndarray, shape (N, 3) k : int sigma : float, optional Kernel bandwidth. ``None`` = median distance heuristic. Returns ------- L, M : SparseMatrix """ N = points.shape[0] dists, indices = knn_search(points, k=k) if sigma is None: # Median heuristic: σ = median of all kNN distances. sigma = float(np.median(dists[:, 1:])) if sigma < 1e-10: sigma = 1.0 sigma2 = 2.0 * sigma**2 rows = np.repeat(np.arange(N), k) cols = indices.ravel() weights = np.exp(-(dists.ravel() ** 2) / sigma2) W = sp.csr_matrix((weights, (rows, cols)), shape=(N, N)) W = (W + W.T) / 2 # symmetrise D_vals = np.asarray(W.sum(axis=1)).ravel() D = sp.diags(D_vals, 0, shape=(N, N), format="csc") L = sp.csc_matrix(D - W) # Mass = diagonal degree. M = sp.diags(D_vals, 0, shape=(N, N), format="csc") return L, M def _belkin_niyogi_laplacian( points: Points, *, k: int = 30, epsilon: float | None = None, ) -> tuple[SparseMatrix, MassMatrix]: """Belkin–Niyogi heat-kernel Laplacian with convergence guarantees. The graph Laplacian L_ε converges to the continuous Laplace– Beltrami operator as N → ∞ and ε → 0 at appropriate rate (Belkin & Niyogi, JCSS 2008). W_ij = (1 / (4πε)) · exp(-||x_i - x_j||² / (4ε)) L_ε = (1/N) · (D - W) Parameters ---------- points : ndarray, shape (N, 3) k : int kNN for sparsification (exact B-N uses all pairs; kNN approximation is standard for scalability). epsilon : float, optional Bandwidth ε. ``None`` = auto from mean kNN distance squared. Returns ------- L, M : SparseMatrix """ N = points.shape[0] dists, indices = knn_search(points, k=k) if epsilon is None: mean_dist = float(np.mean(dists[:, 1:])) epsilon = mean_dist**2 if epsilon < 1e-10: epsilon = 1.0 coeff = 1.0 / (4.0 * np.pi * epsilon) exp_coeff = -1.0 / (4.0 * epsilon) rows = np.repeat(np.arange(N), k) cols = indices.ravel() weights = coeff * np.exp(exp_coeff * dists.ravel() ** 2) W = sp.csr_matrix((weights, (rows, cols)), shape=(N, N)) W = (W + W.T) / 2 D_vals = np.asarray(W.sum(axis=1)).ravel() D = sp.diags(D_vals, 0, shape=(N, N), format="csc") L = sp.csc_matrix((1.0 / N) * (D - W)) # Uniform mass for point clouds (1/N per point). M = sp.diags( np.full(N, 1.0 / N, dtype=np.float64), 0, shape=(N, N), format="csc", ) return L, M def _robust_laplacian_pc( points: Points, *, mollify_factor: float = 1e-5, ) -> tuple[SparseMatrix, MassMatrix]: """Sharp–Crane tufted Laplacian for point clouds (SGP 2020). Parameters ---------- points : ndarray, shape (N, 3) mollify_factor : float Returns ------- L, M : SparseMatrix """ try: import robust_laplacian as rl except ImportError as exc: raise ImportError( "robust_laplacian is required for method='robust'.\n pip install robust_laplacian" ) from exc L, M = rl.point_cloud_laplacian( np.asarray(points, dtype=np.float64), mollify_factor=mollify_factor, ) return sp.csc_matrix(L), sp.csc_matrix(M) # ====================================================================== # §7 CLUSTERING IMPLEMENTATIONS # ====================================================================== def _cluster_hdbscan( points: Points, *, min_cluster_size: int = 50, **kwargs: Any, ) -> np.ndarray: """HDBSCAN clustering.""" try: from sklearn.cluster import HDBSCAN clusterer = HDBSCAN( min_cluster_size=min_cluster_size, **kwargs, ) except ImportError: try: import hdbscan clusterer = hdbscan.HDBSCAN( min_cluster_size=min_cluster_size, **kwargs, ) except ImportError as exc: raise ImportError( "HDBSCAN requires scikit-learn >= 1.3 or the hdbscan package.\n" " pip install scikit-learn # or: pip install hdbscan" ) from exc labels = clusterer.fit_predict(points) n_clusters = len(set(labels)) - (1 if -1 in labels else 0) n_noise = (labels == -1).sum() logger.info( "HDBSCAN: %d clusters, %d noise points (%.1f%%)", n_clusters, n_noise, 100 * n_noise / len(labels), ) return labels def _cluster_spectral( points: Points, *, n_clusters: int, k: int = 20, **kwargs: Any, ) -> np.ndarray: """Spectral clustering on kNN affinity graph.""" from sklearn.cluster import SpectralClustering # Build kNN affinity. dists, indices = knn_search(points, k=k) N = points.shape[0] sigma = float(np.median(dists[:, 1:])) rows = np.repeat(np.arange(N), k) cols = indices.ravel() weights = np.exp(-(dists.ravel() ** 2) / (2 * sigma**2)) W = sp.csr_matrix((weights, (rows, cols)), shape=(N, N)) W = (W + W.T) / 2 sc = SpectralClustering( n_clusters=n_clusters, affinity="precomputed", **kwargs, ) labels = sc.fit_predict(W.toarray()) logger.info("Spectral clustering: %d clusters", n_clusters) return labels def _cluster_kmeans( points: Points, *, n_clusters: int, **kwargs: Any, ) -> np.ndarray: """K-means clustering.""" from sklearn.cluster import KMeans km = KMeans(n_clusters=n_clusters, n_init=10, **kwargs) labels = km.fit_predict(points) logger.info("K-means: %d clusters", n_clusters) return labels def _cluster_dbscan( points: Points, *, eps: float | None = None, min_samples: int = 10, **kwargs: Any, ) -> np.ndarray: """DBSCAN clustering.""" from sklearn.cluster import DBSCAN if eps is None: dists, _ = knn_search(points, k=min_samples) eps = float(np.percentile(dists[:, -1], 90)) db = DBSCAN(eps=eps, min_samples=min_samples, **kwargs) labels = db.fit_predict(points) n_clusters = len(set(labels)) - (1 if -1 in labels else 0) logger.info("DBSCAN (ε=%.2f): %d clusters", eps, n_clusters) return labels # ====================================================================== # §8 __all__ # ====================================================================== __all__ = [ "BrainPointCloud", ]