"""Exploratory data analysis and quality control for spectral morphometry.
Five diagnostic blocks plus the descriptor recommendation engine:
1. **Spectral QC** — validate eigendecomposition quality.
2. **Optimal k** — how many eigenpairs are enough?
3. **Descriptor profiling** — summary statistics, normality, outliers.
4. **Reliability** — ICC test-retest, batch-effect detection.
5. **Report** — integrated markdown/Rich output.
6. **recommend_descriptor()** — surrogate-based descriptor selection.
"""
from __future__ import annotations
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import Any, Literal
import numpy as np
from spectralbrain.core.base import SpectralDecomposition
from spectralbrain.runtime import (
DESCRIPTOR_ELIGIBILITY,
AnalysisObjective,
get_logger,
progress_simple,
)
logger = get_logger(__name__)
# ======================================================================
# §1 SPECTRAL QC
# ======================================================================
[docs]
@dataclass
class SpectralQCReport:
"""Quality-control diagnostics for a spectral decomposition.
All fields are populated by :func:`spectral_qc`.
"""
n_vertices: int = 0
n_eigenvalues: int = 0
lambda_0: float = 0.0
lambda_0_ok: bool = True
fiedler_value: float = 0.0
spectral_gap: float = 0.0
eigenvalues_nonneg: bool = True
n_negative_eigenvalues: int = 0
max_negative_eigenvalue: float = 0.0
orthonormality_error: float = 0.0
orthonormality_ok: bool = True
laplacian_row_sum_max: float = 0.0
laplacian_row_sum_ok: bool = True
near_degenerate_pairs: int = 0
recommended_k: int | None = None
warnings: list[str] = field(default_factory=list)
passed: bool = True
def __repr__(self) -> str:
"""Return a human-readable summary of the QC report."""
status = "✓ PASSED" if self.passed else "✗ ISSUES FOUND"
return (
f"SpectralQC({status}, N={self.n_vertices}, "
f"k={self.n_eigenvalues}, λ₀={self.lambda_0:.2e}, "
f"Fiedler={self.fiedler_value:.4f})"
)
[docs]
def spectral_qc(
decomp: SpectralDecomposition,
*,
lambda_0_tol: float = 1e-4,
ortho_tol: float = 1e-3,
row_sum_tol: float = 1e-2,
degeneracy_tol: float = 1e-6,
) -> SpectralQCReport:
"""Run quality-control diagnostics on a spectral decomposition.
Parameters
----------
decomp : SpectralDecomposition
lambda_0_tol : float
Tolerance for λ₀ ≈ 0.
ortho_tol : float
Tolerance for M-orthonormality of eigenvectors.
row_sum_tol : float
Tolerance for Laplacian row-sum ≈ 0.
degeneracy_tol : float
Relative gap below which eigenvalue pairs are flagged
as near-degenerate.
Returns
-------
SpectralQCReport
"""
rpt = SpectralQCReport()
evals = decomp.eigenvalues
evecs = decomp.eigenvectors
rpt.n_vertices = decomp.n_vertices
rpt.n_eigenvalues = decomp.n_eigenvalues
rpt.lambda_0 = float(evals[0])
rpt.fiedler_value = float(evals[1]) if len(evals) > 1 else 0.0
rpt.spectral_gap = decomp.spectral_gap
# Check λ₀ ≈ 0.
if abs(evals[0]) > lambda_0_tol:
rpt.lambda_0_ok = False
rpt.warnings.append(f"λ₀ = {evals[0]:.2e} (expected ≈ 0, tol={lambda_0_tol:.0e})")
# Check non-negativity.
neg_mask = evals < -1e-10
rpt.n_negative_eigenvalues = int(neg_mask.sum())
if rpt.n_negative_eigenvalues > 0:
rpt.eigenvalues_nonneg = False
rpt.max_negative_eigenvalue = float(evals[neg_mask].min())
rpt.warnings.append(
f"{rpt.n_negative_eigenvalues} negative eigenvalues "
f"(min={rpt.max_negative_eigenvalue:.2e})"
)
# Check M-orthonormality: Φᵀ M Φ ≈ I.
if decomp.mass is not None:
M_dense = decomp.mass
# Sample a subset of columns for large k.
k_check = min(decomp.n_eigenvalues, 20)
Phi = evecs[:, :k_check]
gram = Phi.T @ (M_dense @ Phi)
identity = np.eye(k_check)
rpt.orthonormality_error = float(np.max(np.abs(gram - identity)))
if rpt.orthonormality_error > ortho_tol:
rpt.orthonormality_ok = False
rpt.warnings.append(
f"Eigenvectors not M-orthonormal (max error={rpt.orthonormality_error:.2e})"
)
# Check Laplacian row sum.
if decomp.stiffness is not None:
row_sums = np.abs(np.asarray(decomp.stiffness.sum(axis=1)).ravel())
rpt.laplacian_row_sum_max = float(row_sums.max())
if rpt.laplacian_row_sum_max > row_sum_tol:
rpt.laplacian_row_sum_ok = False
rpt.warnings.append(
f"Laplacian row-sum max={rpt.laplacian_row_sum_max:.2e} (should be ≈ 0)"
)
# Near-degenerate eigenvalue pairs.
for i in range(1, len(evals) - 1):
if evals[i] > 1e-10:
rel_gap = abs(evals[i + 1] - evals[i]) / evals[i]
if rel_gap < degeneracy_tol:
rpt.near_degenerate_pairs += 1
if rpt.near_degenerate_pairs > 0:
rpt.warnings.append(
f"{rpt.near_degenerate_pairs} near-degenerate eigenvalue pairs "
f"(GPS sign ambiguity risk)"
)
rpt.passed = rpt.lambda_0_ok and rpt.eigenvalues_nonneg and rpt.orthonormality_ok
return rpt
# ======================================================================
# §2 OPTIMAL k SELECTION
# ======================================================================
[docs]
@dataclass
class OptimalKResult:
"""Recommended number of eigenpairs by multiple criteria."""
k_elbow: int = 0
k_energy_95: int = 0
k_energy_99: int = 0
k_gap: int = 0
k_recommended: int = 0
eigenvalues: np.ndarray | None = None
cumulative_energy: np.ndarray | None = None
def __repr__(self) -> str:
"""Return a compact summary of the optimal-k recommendation."""
return (
f"OptimalK(recommended={self.k_recommended}, "
f"elbow={self.k_elbow}, energy95={self.k_energy_95}, "
f"gap={self.k_gap})"
)
[docs]
def optimal_k(
eigenvalues: np.ndarray,
*,
energy_thresholds: tuple[float, float] = (0.95, 0.99),
) -> OptimalKResult:
"""Determine optimal number of eigenpairs.
Three criteria:
1. **Elbow** — maximum curvature of log(λ) vs index.
2. **Energy** — Σᵢλᵢ / Σλ > threshold.
3. **Max gap** — largest relative gap between consecutive λ.
Parameters
----------
eigenvalues : ndarray
Full eigenvalue sequence.
energy_thresholds : tuple of float
Thresholds for cumulative energy (default 95% and 99%).
Returns
-------
OptimalKResult
"""
evals = np.asarray(eigenvalues)
evals_pos = evals[evals > 1e-10]
n = len(evals_pos)
result = OptimalKResult(eigenvalues=evals)
if n < 3:
result.k_recommended = n
return result
# Cumulative energy.
total = evals_pos.sum()
cum = np.cumsum(evals_pos) / total
result.cumulative_energy = cum
result.k_energy_95 = int(np.searchsorted(cum, energy_thresholds[0]) + 1)
result.k_energy_99 = int(np.searchsorted(cum, energy_thresholds[1]) + 1)
# Elbow: maximum second derivative of log(λ).
log_lam = np.log(evals_pos + 1e-30)
d2 = np.diff(log_lam, n=2)
result.k_elbow = int(np.argmax(np.abs(d2)) + 2) # +2 for diff offset
# Max relative gap.
gaps = np.diff(evals_pos) / (evals_pos[:-1] + 1e-30)
result.k_gap = int(np.argmax(gaps) + 1)
# Consensus: median of the three.
candidates = [result.k_elbow, result.k_energy_95, result.k_gap]
result.k_recommended = int(np.median(candidates))
result.k_recommended = max(10, min(result.k_recommended, n))
return result
# ======================================================================
# §3 DESCRIPTOR PROFILING
# ======================================================================
[docs]
def descriptor_profile(
descriptors: dict[str, np.ndarray],
*,
normality_samples: int = 500,
seed: int | None = None,
) -> dict[str, dict[str, Any]]:
"""Summary statistics for each descriptor.
Parameters
----------
descriptors : dict of {name: ndarray}
Descriptor arrays (any shape — handles ScalarMap, DescriptorMatrix,
GlobalDescriptor).
normality_samples : int
Subsample size for Shapiro-Wilk test.
seed : int, optional
Returns
-------
dict of {name: {stat: value}}
Keys per descriptor: mean, std, min, max, skew, kurtosis,
q25, q50, q75, shapiro_p, n_outliers_3sigma, shape.
"""
from scipy.stats import kurtosis, shapiro, skew
rng = np.random.default_rng(seed)
profiles: dict[str, dict[str, Any]] = {}
for name, arr in descriptors.items():
arr = np.asarray(arr, dtype=np.float64)
flat = arr.ravel()
p = {
"shape": arr.shape,
"mean": float(np.mean(flat)),
"std": float(np.std(flat)),
"min": float(np.min(flat)),
"max": float(np.max(flat)),
"q25": float(np.percentile(flat, 25)),
"q50": float(np.percentile(flat, 50)),
"q75": float(np.percentile(flat, 75)),
"skewness": float(skew(flat)),
"kurtosis": float(kurtosis(flat)),
}
# Outlier count (beyond 3σ).
z = (flat - p["mean"]) / (p["std"] + 1e-30)
p["n_outliers_3sigma"] = int(np.sum(np.abs(z) > 3))
p["pct_outliers"] = 100 * p["n_outliers_3sigma"] / len(flat)
# Shapiro-Wilk on subsample.
n = min(normality_samples, len(flat))
sample = rng.choice(flat, size=n, replace=False) if len(flat) > n else flat
try:
_, p_val = shapiro(sample)
p["shapiro_p"] = float(p_val)
p["normally_distributed"] = p_val > 0.05
except Exception:
p["shapiro_p"] = None
p["normally_distributed"] = None
profiles[name] = p
return profiles
[docs]
def descriptor_correlation(
descriptors: dict[str, np.ndarray],
*,
method: Literal["pearson", "spearman"] = "pearson",
) -> tuple[np.ndarray, list[str]]:
"""Correlation matrix between descriptors (redundancy check).
For multi-column descriptors, uses the mean across columns.
Parameters
----------
descriptors : dict of {name: ndarray}
method : str
Returns
-------
corr_matrix : ndarray, shape (D, D)
names : list of str
"""
from scipy.stats import spearmanr
names = sorted(descriptors.keys())
vectors = []
for name in names:
arr = np.asarray(descriptors[name], dtype=np.float64)
if arr.ndim > 1:
vectors.append(arr.mean(axis=1))
else:
vectors.append(arr)
# Ensure all same length.
min_len = min(len(v) for v in vectors)
mat = np.column_stack([v[:min_len] for v in vectors])
if method == "pearson":
corr = np.corrcoef(mat, rowvar=False)
else:
corr, _ = spearmanr(mat)
if corr.ndim == 0:
corr = np.array([[1.0]])
return corr, names
# ======================================================================
# §4 TEST-RETEST RELIABILITY
# ======================================================================
[docs]
def compute_icc(
test: np.ndarray,
retest: np.ndarray,
*,
icc_type: Literal["ICC2,1", "ICC3,1"] = "ICC3,1",
) -> float:
"""Intraclass Correlation Coefficient for test-retest.
Parameters
----------
test : ndarray, shape (N,) or (N, T)
Descriptor values at time 1.
retest : ndarray, shape (N,) or (N, T)
Descriptor values at time 2.
icc_type : str
``"ICC2,1"`` — two-way random, single measures.
``"ICC3,1"`` — two-way mixed, single measures
(recommended for neuroimaging).
Returns
-------
float
ICC value in [-1, 1]. >0.75 = excellent, 0.60–0.75 = good,
0.40–0.60 = fair, <0.40 = poor.
"""
test = np.asarray(test, dtype=np.float64).ravel()
retest = np.asarray(retest, dtype=np.float64).ravel()
n = min(len(test), len(retest))
test, retest = test[:n], retest[:n]
# Two-way ANOVA decomposition.
k = 2 # two measurements
grand_mean = (test.mean() + retest.mean()) / 2
# Between-subjects SS.
subject_means = (test + retest) / 2
SS_between = k * np.sum((subject_means - grand_mean) ** 2)
# Within-subjects SS.
SS_within = np.sum((test - subject_means) ** 2) + np.sum((retest - subject_means) ** 2)
# Between-measures SS.
measure_means = np.array([test.mean(), retest.mean()])
SS_measures = n * np.sum((measure_means - grand_mean) ** 2)
# Error SS.
SS_error = SS_within - SS_measures
# Mean squares.
MS_between = SS_between / (n - 1)
MS_measures = SS_measures / (k - 1) if k > 1 else 0
MS_error = SS_error / ((n - 1) * (k - 1)) if (n - 1) * (k - 1) > 0 else 1e-10
if icc_type == "ICC3,1":
# ICC(3,1) = (MS_between - MS_error) / (MS_between + (k-1)·MS_error)
icc = (MS_between - MS_error) / (MS_between + (k - 1) * MS_error)
elif icc_type == "ICC2,1":
icc = (MS_between - MS_error) / (
MS_between + (k - 1) * MS_error + k * (MS_measures - MS_error) / n
)
else:
raise ValueError(f"Unknown ICC type: {icc_type!r}")
return float(np.clip(icc, -1.0, 1.0))
[docs]
def batch_effect_scan(
descriptors: dict[str, np.ndarray],
site_labels: np.ndarray,
*,
alpha: float = 0.05,
) -> dict[str, dict[str, Any]]:
"""Scan for batch/site effects in spectral descriptors.
For each descriptor, tests whether distributions differ
significantly across sites using Kruskal-Wallis.
Parameters
----------
descriptors : dict of {name: ndarray}
Per-subject descriptor values.
site_labels : ndarray, shape (n_subjects,)
Site/scanner labels.
alpha : float
Significance threshold.
Returns
-------
dict of {name: {statistic, p_value, has_batch_effect, effect_size}}
"""
from scipy.stats import kruskal
site_labels = np.asarray(site_labels)
unique_sites = np.unique(site_labels)
results: dict[str, dict[str, Any]] = {}
for name, arr in descriptors.items():
arr = np.asarray(arr, dtype=np.float64)
if arr.ndim > 1:
arr = arr.mean(axis=1)
groups = [arr[site_labels == s] for s in unique_sites]
groups = [g for g in groups if len(g) > 1]
if len(groups) < 2:
results[name] = {
"statistic": 0.0,
"p_value": 1.0,
"has_batch_effect": False,
"effect_size": 0.0,
}
continue
try:
stat, p_val = kruskal(*groups)
# Effect size: η² = H / (N - 1)
N = sum(len(g) for g in groups)
eta_sq = float(stat / (N - 1)) if N > 1 else 0.0
results[name] = {
"statistic": float(stat),
"p_value": float(p_val),
"has_batch_effect": p_val < alpha,
"effect_size_eta2": eta_sq,
}
except Exception:
results[name] = {
"statistic": 0.0,
"p_value": 1.0,
"has_batch_effect": False,
"effect_size_eta2": 0.0,
}
return results
[docs]
def eigenvalue_stability(
decomps: list[SpectralDecomposition],
*,
n_eigenvalues: int | None = None,
) -> dict[str, np.ndarray]:
"""Cross-subject eigenvalue stability analysis.
Parameters
----------
decomps : list of SpectralDecomposition
Decompositions from multiple subjects.
n_eigenvalues : int, optional
Number of eigenvalues to compare.
Returns
-------
dict
Keys: ``"mean"``, ``"std"``, ``"cv"`` (coefficient of
variation), ``"eigenvalue_matrix"`` (subjects × k).
"""
k = n_eigenvalues or min(d.n_eigenvalues for d in decomps)
matrix = np.array([d.eigenvalues[:k] for d in decomps]) # (S, k)
mean_evals = matrix.mean(axis=0)
std_evals = matrix.std(axis=0)
cv = std_evals / (mean_evals + 1e-30)
return {
"mean": mean_evals,
"std": std_evals,
"cv": cv,
"eigenvalue_matrix": matrix,
}
# ======================================================================
# §5 RECOMMEND_DESCRIPTOR — surrogate-based selection
# ======================================================================
[docs]
@dataclass
class DescriptorRecommendation:
"""Output of :func:`recommend_descriptor`.
Attributes
----------
recommended : str
Name of the top-ranked descriptor.
objective : str
The analysis objective used.
ranking : list of dict
Top descriptors with scores and metrics.
surrogate_details : dict
Information about the surrogates generated.
"""
recommended: str
objective: str
ranking: list[dict[str, Any]]
surrogate_details: dict[str, Any]
def __repr__(self) -> str:
"""Return a summary showing the top-5 ranked descriptors."""
top5 = ", ".join(f"{r['descriptor']}({r['score']:.3f})" for r in self.ranking[:5])
return f"Recommendation('{self.recommended}' for {self.objective}) — top 5: [{top5}]"
def _generate_surrogates(
points: np.ndarray,
objective: str,
*,
n_surrogates: int = 30,
seed: int | None = None,
) -> tuple[list[np.ndarray], np.ndarray]:
"""Generate synthetic deformations for descriptor evaluation.
Parameters
----------
points : ndarray, shape (N, 3)
Reference geometry (vertex/point coordinates).
objective : str
n_surrogates : int
seed : int
Returns
-------
surrogate_points : list of ndarray
Deformed point sets.
labels : ndarray, shape (n_surrogates,)
0 = control (undeformed), 1 = deformed.
"""
rng = np.random.default_rng(seed)
n_half = n_surrogates // 2
surrogates: list[np.ndarray] = []
labels = np.zeros(n_surrogates, dtype=np.int32)
N = points.shape[0]
centroid = points.mean(axis=0)
for i in range(n_surrogates):
if i < n_half:
# Controls: add small Gaussian noise (no systematic deformation).
noise = rng.normal(0, 0.01, points.shape)
surrogates.append(points + noise)
labels[i] = 0
else:
# Deformed: apply objective-specific deformations.
if objective == "group_discrimination":
# Focal atrophy: shrink a random subregion.
center = points[rng.integers(N)]
dists = np.linalg.norm(points - center, axis=1)
radius = np.percentile(dists, 30)
mask = dists < radius
scale = 0.7 + 0.3 * rng.random()
deformed = points.copy()
deformed[mask] = center + (deformed[mask] - center) * scale
surrogates.append(deformed + rng.normal(0, 0.01, points.shape))
elif objective == "lateralization":
# Asymmetric deformation: scale one half differently.
mid = centroid[0]
left = points[:, 0] < mid
deformed = points.copy()
scale = 0.8 + 0.2 * rng.random()
deformed[left] *= np.array([scale, 1, 1])
surrogates.append(deformed + rng.normal(0, 0.01, points.shape))
elif objective == "longitudinal_change":
# Progressive uniform shrinkage.
scale = 0.85 + 0.15 * rng.random()
deformed = centroid + (points - centroid) * scale
surrogates.append(deformed + rng.normal(0, 0.01, points.shape))
elif objective == "subregion_detection":
# Localised bump (add outward displacement to a patch).
center = points[rng.integers(N)]
dists = np.linalg.norm(points - center, axis=1)
radius = np.percentile(dists, 20)
mask = dists < radius
displacement = rng.uniform(0.5, 2.0)
deformed = points.copy()
direction = deformed[mask] - centroid
direction /= np.linalg.norm(direction, axis=1, keepdims=True) + 1e-12
deformed[mask] += direction * displacement
surrogates.append(deformed + rng.normal(0, 0.01, points.shape))
labels[i] = 1
return surrogates, labels
def _evaluate_descriptor(
descriptor_values: list[np.ndarray],
labels: np.ndarray,
*,
n_splits: int = 5,
) -> dict[str, float]:
"""Evaluate a descriptor's discriminative power on surrogates.
Returns AUC, balanced accuracy, and Cohen's d.
"""
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
from sklearn.preprocessing import StandardScaler
# Aggregate each surrogate's descriptor to a single vector.
features = []
for dv in descriptor_values:
arr = np.asarray(dv, dtype=np.float64)
if arr.ndim > 1:
# Use mean + std per column as global summary.
feat = np.concatenate([arr.mean(axis=0), arr.std(axis=0)])
else:
feat = np.array([arr.mean(), arr.std(), np.median(arr)])
features.append(feat)
X = np.array(features) # (n_surr, d)
y = labels
# Handle constant features.
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
# Remove zero-variance columns.
var_mask = X_scaled.std(axis=0) > 1e-10
if not var_mask.any():
return {"auc": 0.5, "accuracy": 0.5, "cohens_d": 0.0}
X_scaled = X_scaled[:, var_mask]
n_splits_actual = min(n_splits, min(np.bincount(y)))
n_splits_actual = max(2, n_splits_actual)
try:
clf = LogisticRegression(max_iter=1000, solver="lbfgs")
auc_scores = cross_val_score(
clf,
X_scaled,
y,
cv=n_splits_actual,
scoring="roc_auc",
)
acc_scores = cross_val_score(
clf,
X_scaled,
y,
cv=n_splits_actual,
scoring="balanced_accuracy",
)
except Exception:
auc_scores = np.array([0.5])
acc_scores = np.array([0.5])
# Cohen's d between groups.
X0 = X_scaled[y == 0]
X1 = X_scaled[y == 1]
pooled_std = np.sqrt(
((len(X0) - 1) * X0.var(axis=0).mean() + (len(X1) - 1) * X1.var(axis=0).mean())
/ (len(X0) + len(X1) - 2)
)
cohens_d = float(np.abs(X0.mean() - X1.mean()) / (pooled_std + 1e-10))
return {
"auc": float(np.mean(auc_scores)),
"auc_std": float(np.std(auc_scores)),
"accuracy": float(np.mean(acc_scores)),
"cohens_d": cohens_d,
}
[docs]
def recommend_descriptor(
points: np.ndarray,
labels: np.ndarray | None = None,
objective: str | AnalysisObjective = "group_discrimination",
*,
n_surrogates: int = 30,
k_eigenpairs: int = 30,
n_jobs: int = 1,
seed: int | None = 42,
) -> DescriptorRecommendation:
"""Recommend the best spectral descriptor for an analysis objective.
Generates synthetic surrogates with controlled deformations,
computes all eligible descriptors, evaluates each descriptor's
discriminative power, and ranks by consensus.
Parameters
----------
points : ndarray, shape (N, 3)
Representative geometry (e.g. mean mesh vertices, or one
subject's point cloud).
labels : ndarray, optional
Not used by the surrogate engine (surrogates generate their
own labels). Reserved for future data-driven evaluation.
objective : str or AnalysisObjective
Analysis goal. Determines eligible descriptors and
surrogate deformation type.
n_surrogates : int
Number of synthetic shapes to generate.
k_eigenpairs : int
Eigenpairs per surrogate decomposition.
n_jobs : int
Number of parallel workers for surrogate decomposition.
``1`` = sequential (default), ``-1`` = all cores. Requires
``joblib`` when > 1.
seed : int, optional
RNG seed for reproducibility.
Returns
-------
DescriptorRecommendation
Contains ``.recommended``, ``.ranking`` (top descriptors
with AUC, accuracy, effect size), and ``.surrogate_details``.
Notes
-----
This function is computationally heavy (30 surrogates × k
eigenpairs × all descriptors by default). For large meshes,
consider using ``n_jobs=-1`` to parallelise the surrogate
decomposition across CPU cores.
Examples
--------
>>> rec = sb.statistics.recommend_descriptor(
... mesh.vertices,
... objective="group_discrimination",
... n_jobs=-1,
... )
>>> print(rec.recommended)
'wks'
>>> print(rec.ranking[:3])
"""
from spectralbrain.spectral.descriptors import (
compute_bates_signatures,
compute_bks,
compute_gps,
compute_hks,
compute_shapedna,
compute_si_hks,
compute_wks,
)
if isinstance(objective, AnalysisObjective):
obj_str = objective.value
else:
obj_str = objective
eligible = DESCRIPTOR_ELIGIBILITY.get(obj_str, [])
if not eligible:
raise ValueError(
f"Unknown objective: {obj_str!r}. Available: {list(DESCRIPTOR_ELIGIBILITY.keys())}"
)
# Map descriptor names to compute functions.
compute_fns: dict[str, Callable] = {
"shapedna": lambda d: compute_shapedna(d, normalize="area"),
"hks": lambda d: compute_hks(d, n_times=20),
"si_hks": lambda d: compute_si_hks(d, n_frequencies=6),
"wks": lambda d: compute_wks(d, n_energies=20),
"gps": lambda d: compute_gps(d),
"bates_sp": lambda d: compute_bates_signatures(d, order=2, n_times=5),
"bks": lambda d: compute_bks(d),
}
# Filter to eligible + available.
active_descs = {name: fn for name, fn in compute_fns.items() if name in eligible}
logger.info(
"recommend_descriptor: objective='%s', %d eligible, %d surrogates, k=%d",
obj_str,
len(active_descs),
n_surrogates,
k_eigenpairs,
)
# Generate surrogates.
surrogates, surr_labels = _generate_surrogates(
points,
obj_str,
n_surrogates=n_surrogates,
seed=seed,
)
# Decompose all surrogates.
from spectralbrain.core.pointclouds import BrainPointCloud
def _decompose_single(pts: np.ndarray) -> SpectralDecomposition:
"""Decompose a single surrogate point cloud."""
pc = BrainPointCloud(pts)
return pc.decompose(k=k_eigenpairs, laplacian_method="knn")
decomps: list[SpectralDecomposition] = []
if n_jobs == 1:
with progress_simple("Decomposing surrogates", total=n_surrogates) as tick:
for pts in surrogates:
decomps.append(_decompose_single(pts))
tick(1)
else:
from spectralbrain.backends.cpu import parallel_map
decomps = parallel_map(
_decompose_single,
surrogates,
n_jobs=n_jobs,
description="Decomposing surrogates (parallel)",
)
# Compute each descriptor on all surrogates and evaluate.
scores: list[dict[str, Any]] = []
with progress_simple("Evaluating descriptors", total=len(active_descs)) as tick:
for desc_name, compute_fn in active_descs.items():
try:
desc_values = [compute_fn(d) for d in decomps]
metrics = _evaluate_descriptor(desc_values, surr_labels)
# Composite score: weighted combination.
score = (
0.5 * metrics["auc"]
+ 0.3 * metrics["accuracy"]
+ 0.2 * min(metrics["cohens_d"] / 2.0, 1.0)
)
scores.append(
{
"descriptor": desc_name,
"score": score,
**metrics,
}
)
except Exception as exc:
logger.warning(
"Descriptor '%s' failed: %s",
desc_name,
exc,
)
scores.append(
{
"descriptor": desc_name,
"score": 0.0,
"auc": 0.0,
"accuracy": 0.0,
"cohens_d": 0.0,
"error": str(exc),
}
)
tick(1)
# Rank by composite score.
scores.sort(key=lambda x: x["score"], reverse=True)
return DescriptorRecommendation(
recommended=scores[0]["descriptor"] if scores else "hks",
objective=obj_str,
ranking=scores,
surrogate_details={
"n_surrogates": n_surrogates,
"n_controls": int((surr_labels == 0).sum()),
"n_deformed": int((surr_labels == 1).sum()),
"k_eigenpairs": k_eigenpairs,
"seed": seed,
},
)
# ======================================================================
# §6 __all__
# ======================================================================
__all__: list[str] = [
# Recommendation
"DescriptorRecommendation",
# Optimal k
"OptimalKResult",
# QC
"SpectralQCReport",
"batch_effect_scan",
# Reliability
"compute_icc",
"descriptor_correlation",
# Descriptor profiling
"descriptor_profile",
"eigenvalue_stability",
"optimal_k",
"recommend_descriptor",
"spectral_qc",
]