"""Statistical analysis toolkit for spectral morphometry.
Covers the full analytical pipeline from vertex-wise group comparison
to connectome-level network analysis, including dimension-collapsing
methods for converting per-vertex descriptors into per-shape global
vectors.
Sections
--------
§1 Vertex-wise group comparison (t-test, Mann-Whitney, TFCE, FDR, permutation)
§2 Effect sizes (Cohen's d, Hedges' g — vertex-wise maps)
§3 Vertex-wise correlation with clinical scores
§4 Surprise / anomaly maps (z-score against normative)
§5 Classification (SVM, LogReg + CV)
§6 Dimension collapsing (Fisher vectors, Bag-of-Spectral-Words, kernel mean embedding)
§7 Dissimilarity measures (EMD, KL, JS divergence, energy distance)
§8 RSA — Representational Similarity Analysis
§9 Connectome & network analysis (modularity, participation, NBS, Mantel)
§10 Asymmetry analysis (lateralisation indices)
§11 Dimensionality reduction (PCA, MDS, UMAP)
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Any, Literal
import numpy as np
from scipy import stats as sp_stats
from spectralbrain.runtime import (
ConnectomeMatrix,
DescriptorMatrix,
DistanceMatrix,
GlobalDescriptor,
ScalarMap,
get_logger,
progress_simple,
)
logger = get_logger(__name__)
# ======================================================================
# §1 VERTEX-WISE GROUP COMPARISON
# ======================================================================
[docs]
@dataclass
class VertexWiseResult:
"""Results of a vertex-wise statistical test."""
statistic: np.ndarray # (N,) test statistic per vertex
p_values: np.ndarray # (N,) uncorrected p-values
p_corrected: np.ndarray # (N,) corrected p-values
correction: str # method used
significant: np.ndarray # (N,) bool mask at alpha
alpha: float
effect_size: np.ndarray | None = None # (N,) Cohen's d
@property
def n_significant(self) -> int:
"""Return the count of significant results after correction."""
return int(self.significant.sum())
def __repr__(self) -> str:
"""Return a compact summary string."""
return (
f"VertexWiseResult({self.n_significant} significant / "
f"{len(self.statistic)} vertices, "
f"correction='{self.correction}', α={self.alpha})"
)
[docs]
def vertexwise_ttest(
group_a: np.ndarray,
group_b: np.ndarray,
*,
correction: Literal["fdr", "bonferroni", "none"] = "fdr",
alpha: float = 0.05,
equal_var: bool = False,
) -> VertexWiseResult:
"""Independent two-sample t-test at each vertex (vectorised).
Parameters
----------
group_a : ndarray, shape (n_a, N) or (n_a, N, T)
Descriptor values for group A (subjects × vertices [× scales]).
If 3D, tests are run on the mean across the last axis.
group_b : ndarray, shape (n_b, N)
Descriptor values for group B.
correction : str
``"fdr"`` — Benjamini-Hochberg; ``"bonferroni"``; ``"none"``.
alpha : float
equal_var : bool
If ``False`` (default), Welch's t-test (does not assume equal
variances) — the safer default for groups with unequal size or
spread. If ``True``, Student's pooled-variance t-test.
Returns
-------
VertexWiseResult
"""
a = np.asarray(group_a, dtype=np.float64)
b = np.asarray(group_b, dtype=np.float64)
if a.ndim == 3:
a = a.mean(axis=-1)
if b.ndim == 3:
b = b.mean(axis=-1)
# Vectorised across vertices (axis=0 = subjects).
t_stat, p_vals = sp_stats.ttest_ind(a, b, axis=0, equal_var=equal_var)
# Constant-across-subjects vertices yield NaN — treat as null.
t_stat = np.nan_to_num(np.asarray(t_stat), nan=0.0)
p_vals = np.where(np.isnan(p_vals), 1.0, p_vals)
p_corr = _correct_pvalues(p_vals, method=correction)
d = _cohens_d_arrays(a, b)
return VertexWiseResult(
statistic=t_stat,
p_values=p_vals,
p_corrected=p_corr,
correction=correction,
significant=p_corr < alpha,
alpha=alpha,
effect_size=d,
)
[docs]
def vertexwise_mannwhitney(
group_a: np.ndarray,
group_b: np.ndarray,
*,
correction: Literal["fdr", "bonferroni", "none"] = "fdr",
alpha: float = 0.05,
) -> VertexWiseResult:
"""Non-parametric Mann-Whitney U test at each vertex.
Parameters
----------
group_a, group_b : ndarray, shape (n, N)
correction, alpha : as above.
Returns
-------
VertexWiseResult
"""
a = np.asarray(group_a, dtype=np.float64)
b = np.asarray(group_b, dtype=np.float64)
if a.ndim == 3:
a = a.mean(axis=-1)
if b.ndim == 3:
b = b.mean(axis=-1)
# Vectorised across vertices (scipy >= 1.7 supports axis=).
u_stat, p_vals = sp_stats.mannwhitneyu(a, b, alternative="two-sided", axis=0)
u_stat = np.nan_to_num(np.asarray(u_stat), nan=0.0)
p_vals = np.where(np.isnan(p_vals), 1.0, p_vals)
p_corr = _correct_pvalues(p_vals, method=correction)
return VertexWiseResult(
statistic=u_stat,
p_values=p_vals,
p_corrected=p_corr,
correction=correction,
significant=p_corr < alpha,
alpha=alpha,
)
[docs]
def vertexwise_permutation(
group_a: np.ndarray,
group_b: np.ndarray,
*,
n_permutations: int = 5000,
stat_func: Literal["t", "mean_diff"] = "t",
correction: Literal["max", "fdr", "none"] = "max",
seed: int | None = None,
alpha: float = 0.05,
) -> VertexWiseResult:
"""Permutation test at each vertex with multiple-comparison control.
The label permutation builds an exact (non-parametric) null. Crucially,
the per-vertex permutation p-value is **not** corrected for multiple
comparisons on its own. This function offers proper correction:
- ``correction="max"`` (default): family-wise error rate (FWER) control
via the **maximum-statistic** null distribution (Westfall & Young;
Nichols & Holmes 2002). On each permutation the maximum of
``|statistic|`` across all vertices is recorded; a vertex is
significant if its observed statistic exceeds the
``(1-alpha)`` quantile of that null. This is the standard rigorous
correction for vertex/voxel-wise permutation testing.
- ``correction="fdr"``: per-vertex permutation p-values, then
Benjamini-Hochberg.
- ``correction="none"``: raw per-vertex permutation p-values.
Parameters
----------
group_a, group_b : ndarray, shape (n, N)
n_permutations : int
stat_func : ``"t"`` or ``"mean_diff"``
correction : ``"max"``, ``"fdr"``, or ``"none"``
seed : int, optional
alpha : float
Returns
-------
VertexWiseResult
``p_values`` are always the raw per-vertex permutation p-values;
``p_corrected`` reflects the chosen correction. For ``"max"``,
``p_corrected`` is the FWER-adjusted p-value (fraction of the
max-null at or above each observed statistic).
References
----------
Nichols TE, Holmes AP. Nonparametric permutation tests for functional
neuroimaging. *Hum Brain Mapp* 15(1):1–25, 2002.
"""
a = np.asarray(group_a, dtype=np.float64)
b = np.asarray(group_b, dtype=np.float64)
if a.ndim == 3:
a = a.mean(axis=-1)
if b.ndim == 3:
b = b.mean(axis=-1)
rng = np.random.default_rng(seed)
combined = np.vstack([a, b])
n_a, N = a.shape
n_total = combined.shape[0]
def _stat(xa: np.ndarray, xb: np.ndarray) -> np.ndarray:
if stat_func == "t":
t, _ = sp_stats.ttest_ind(xa, xb, axis=0, equal_var=False)
return np.nan_to_num(np.asarray(t), nan=0.0)
return xa.mean(axis=0) - xb.mean(axis=0)
obs = _stat(a, b)
abs_obs = np.abs(obs)
count_extreme = np.zeros(N, dtype=np.int64) # per-vertex (uncorrected)
count_max = np.zeros(N, dtype=np.int64) # FWER via max-statistic
with progress_simple("Permutation test", total=n_permutations) as tick:
for _ in range(n_permutations):
perm = rng.permutation(n_total)
perm_stat = _stat(combined[perm[:n_a]], combined[perm[n_a:]])
abs_perm = np.abs(perm_stat)
count_extreme += (abs_perm >= abs_obs).astype(np.int64)
count_max += (abs_perm.max() >= abs_obs).astype(np.int64)
tick(1)
# Raw per-vertex permutation p-values (add-one for validity).
p_vals = (count_extreme + 1) / (n_permutations + 1)
if correction == "max":
p_corr = (count_max + 1) / (n_permutations + 1)
corr_label = "permutation-max (FWER)"
elif correction == "fdr":
p_corr = _fdr_bh(p_vals)
corr_label = "permutation+fdr"
else:
p_corr = p_vals.copy()
corr_label = "permutation (uncorrected)"
return VertexWiseResult(
statistic=obs,
p_values=p_vals,
p_corrected=p_corr,
correction=corr_label,
significant=p_corr < alpha,
alpha=alpha,
)
[docs]
def tfce(
statistic_map: np.ndarray,
adjacency: Any,
*,
E: float = 0.5,
H: float = 2.0,
n_steps: int = 100,
) -> np.ndarray:
"""Threshold-Free Cluster Enhancement (Smith & Nichols 2009).
Enhances a statistical map by integrating cluster extent and
height across all thresholds.
TFCE(v) = ∫₀^h(v) e(h)^E · h^H dh
Parameters
----------
statistic_map : ndarray, shape (N,)
Vertex-wise test statistic (e.g. t-values).
adjacency : sparse matrix, shape (N, N)
Vertex adjacency (from mesh or kNN graph).
E : float
Cluster extent exponent (default 0.5).
H : float
Height exponent (default 2.0).
n_steps : int
Number of threshold steps for numerical integration.
Returns
-------
ndarray, shape (N,)
TFCE-enhanced statistic map.
References
----------
Smith SM, Nichols TE. Threshold-free cluster enhancement.
*NeuroImage* 44(1):83–98, 2009.
"""
import scipy.sparse as sp
from scipy.sparse.csgraph import connected_components
adj = sp.csr_matrix(adjacency)
stat = np.abs(statistic_map)
max_stat = stat.max()
if max_stat < 1e-10:
return np.zeros_like(stat)
thresholds = np.linspace(0, max_stat, n_steps + 1)[1:]
dh = thresholds[1] - thresholds[0] if len(thresholds) > 1 else max_stat
tfce_map = np.zeros_like(stat, dtype=np.float64)
for h in thresholds:
# Supra-threshold mask.
mask = stat >= h
if not mask.any():
continue
# Find connected components in supra-threshold subgraph.
sub_adj = adj[mask][:, mask]
n_comp, comp_labels = connected_components(sub_adj, directed=False)
# Cluster extent for each vertex.
for c in range(n_comp):
c_mask = comp_labels == c
extent = c_mask.sum()
# Add contribution: e^E · h^H · dh.
vertices_in_cluster = np.where(mask)[0][c_mask]
tfce_map[vertices_in_cluster] += (extent**E) * (h**H) * dh
return tfce_map
def _correct_pvalues(
p_values: np.ndarray,
method: str,
) -> np.ndarray:
"""Apply multiple comparison correction."""
if method == "none":
return p_values.copy()
elif method == "bonferroni":
return np.minimum(p_values * len(p_values), 1.0)
elif method == "fdr":
return _fdr_bh(p_values)
raise ValueError(f"Unknown correction: {method!r}")
def _fdr_bh(p_values: np.ndarray) -> np.ndarray:
"""Benjamini-Hochberg FDR correction."""
n = len(p_values)
order = np.argsort(p_values)
ranked_p = p_values[order]
adjusted = ranked_p * n / (np.arange(1, n + 1))
# Enforce monotonicity.
adjusted = np.minimum.accumulate(adjusted[::-1])[::-1]
result = np.empty_like(p_values)
result[order] = np.minimum(adjusted, 1.0)
return result
# ======================================================================
# §2 EFFECT SIZES
# ======================================================================
def _cohens_d_arrays(a: np.ndarray, b: np.ndarray) -> np.ndarray:
"""Cohen's d per column (vertex)."""
na, nb = a.shape[0], b.shape[0]
ma, mb = a.mean(axis=0), b.mean(axis=0)
va, vb = a.var(axis=0, ddof=1), b.var(axis=0, ddof=1)
pooled = np.sqrt(((na - 1) * va + (nb - 1) * vb) / (na + nb - 2))
return (ma - mb) / (pooled + 1e-30)
[docs]
def cohens_d_map(
group_a: np.ndarray,
group_b: np.ndarray,
) -> ScalarMap:
"""Vertex-wise Cohen's d effect-size map.
Parameters
----------
group_a, group_b : ndarray, shape (n, N)
Returns
-------
ndarray, shape (N,)
"""
a = np.asarray(group_a, dtype=np.float64)
b = np.asarray(group_b, dtype=np.float64)
if a.ndim == 3:
a = a.mean(axis=-1)
if b.ndim == 3:
b = b.mean(axis=-1)
return _cohens_d_arrays(a, b)
[docs]
def hedges_g_map(
group_a: np.ndarray,
group_b: np.ndarray,
) -> ScalarMap:
"""Vertex-wise Hedges' g (bias-corrected Cohen's d).
Parameters
----------
group_a, group_b : ndarray, shape (n, N)
Returns
-------
ndarray, shape (N,)
"""
d = cohens_d_map(group_a, group_b)
n = group_a.shape[0] + group_b.shape[0]
# Correction factor J.
J = 1 - 3 / (4 * (n - 2) - 1)
return d * J
# ======================================================================
# §3 VERTEX-WISE CORRELATION
# ======================================================================
[docs]
def vertexwise_correlation(
descriptors: np.ndarray,
scores: np.ndarray,
*,
method: Literal["pearson", "spearman"] = "pearson",
correction: str = "fdr",
alpha: float = 0.05,
covariates: np.ndarray | None = None,
) -> VertexWiseResult:
"""Correlate a per-vertex descriptor with a clinical score.
Parameters
----------
descriptors : ndarray, shape (S, N) or (S, N, T)
Per-subject descriptor values. If 3D, averaged over T.
scores : ndarray, shape (S,)
Clinical score per subject.
method : str
correction : str
alpha : float
covariates : ndarray, shape (S, C), optional
If given, partial correlation controlling for covariates.
Returns
-------
VertexWiseResult
"""
desc = np.asarray(descriptors, dtype=np.float64)
if desc.ndim == 3:
desc = desc.mean(axis=-1)
scores = np.asarray(scores, dtype=np.float64)
S = desc.shape[0]
# Number of covariates removed (for partial-correlation degrees of freedom).
n_cov = 0
if covariates is not None:
cov = np.asarray(covariates, dtype=np.float64)
if cov.ndim == 1:
cov = cov[:, None]
n_cov = cov.shape[1]
# Residualise scores and every vertex column on the covariates.
scores = _residualise(scores, cov)
desc = _residualise_columns(desc, cov)
if method == "spearman":
# Spearman = Pearson on ranks.
x = _rank_columns(desc)
y = sp_stats.rankdata(scores)
else:
x = desc
y = scores
# Vectorised Pearson across vertices.
xc = x - x.mean(axis=0)
yc = y - y.mean()
denom = np.sqrt((xc**2).sum(axis=0) * (yc**2).sum())
r_vals = np.where(denom > 1e-30, (xc * yc[:, None]).sum(axis=0) / (denom + 1e-30), 0.0)
r_vals = np.clip(r_vals, -1.0, 1.0)
# Two-sided p-value from t-distribution with df = S - 2 - n_cov.
df = S - 2 - n_cov
if df <= 0:
raise ValueError(
f"Not enough samples for {n_cov} covariates: df = {df} (need S > {2 + n_cov})."
)
with np.errstate(divide="ignore", invalid="ignore"):
t_stat = r_vals * np.sqrt(df / (1.0 - r_vals**2))
t_stat = np.nan_to_num(t_stat, nan=0.0, posinf=0.0, neginf=0.0)
p_vals = 2.0 * sp_stats.t.sf(np.abs(t_stat), df)
p_corr = _correct_pvalues(p_vals, method=correction)
return VertexWiseResult(
statistic=r_vals,
p_values=p_vals,
p_corrected=p_corr,
correction=correction,
significant=p_corr < alpha,
alpha=alpha,
)
def _residualise_columns(Y: np.ndarray, X: np.ndarray) -> np.ndarray:
"""Residualise every column of ``Y`` (S, N) on design ``X`` (S, C)."""
Xd = np.column_stack([np.ones(len(Y)), X])
beta, *_ = np.linalg.lstsq(Xd, Y, rcond=None)
return Y - Xd @ beta
def _rank_columns(Y: np.ndarray) -> np.ndarray:
"""Column-wise rank transform (for Spearman)."""
return sp_stats.rankdata(Y, axis=0)
def _residualise(y: np.ndarray, X: np.ndarray) -> np.ndarray:
"""OLS residuals: y - X @ (X^+ @ y)."""
X = np.column_stack([np.ones(len(y)), X])
beta = np.linalg.lstsq(X, y, rcond=None)[0]
return y - X @ beta
# ======================================================================
# §4 SURPRISE / ANOMALY MAPS
# ======================================================================
[docs]
def surprise_map(
subject_descriptor: np.ndarray,
normative_mean: np.ndarray,
normative_std: np.ndarray,
) -> ScalarMap:
"""Z-score anomaly map against a normative distribution.
Parameters
----------
subject_descriptor : ndarray, shape (N,) or (N, T)
normative_mean : ndarray, same shape
normative_std : ndarray, same shape
Returns
-------
ndarray, same shape
Z-scores: positive = above normative, negative = below.
"""
z = (subject_descriptor - normative_mean) / (normative_std + 1e-30)
return z
[docs]
def surprise_map_percentile(
subject_descriptor: np.ndarray,
normative_distribution: np.ndarray,
) -> ScalarMap:
"""Percentile-based anomaly map.
Parameters
----------
subject_descriptor : ndarray, shape (N,)
normative_distribution : ndarray, shape (S, N)
Normative values from S reference subjects.
Returns
-------
ndarray, shape (N,)
Percentile rank (0–100) of subject relative to normative.
"""
subj = np.asarray(subject_descriptor)
norm = np.asarray(normative_distribution)
N = subj.shape[0]
pctile = np.zeros(N)
for v in range(N):
pctile[v] = sp_stats.percentileofscore(norm[:, v], subj[v])
return pctile
# ======================================================================
# §5 CLASSIFICATION
# ======================================================================
[docs]
@dataclass
class ClassificationResult:
"""Output of a classification analysis."""
accuracy: float
accuracy_std: float
auc: float
auc_std: float
feature_importance: np.ndarray | None
confusion_matrix: np.ndarray | None
model_name: str
def __repr__(self) -> str:
"""Return a compact summary string."""
return (
f"Classification({self.model_name}: "
f"AUC={self.auc:.3f}±{self.auc_std:.3f}, "
f"Acc={self.accuracy:.3f}±{self.accuracy_std:.3f})"
)
[docs]
def classify(
features: np.ndarray,
labels: np.ndarray,
*,
model: Literal["svm", "logistic", "random_forest"] = "svm",
n_folds: int = 5,
seed: int | None = 42,
) -> ClassificationResult:
"""Cross-validated classification with feature importance.
Parameters
----------
features : ndarray, shape (S, d)
labels : ndarray, shape (S,)
model : str
n_folds : int
seed : int
Returns
-------
ClassificationResult
"""
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
if model == "svm":
from sklearn.svm import SVC
clf = SVC(kernel="linear", probability=True, random_state=seed)
elif model == "logistic":
from sklearn.linear_model import LogisticRegression
clf = LogisticRegression(max_iter=1000, random_state=seed)
elif model == "random_forest":
from sklearn.ensemble import RandomForestClassifier
clf = RandomForestClassifier(n_estimators=100, random_state=seed)
else:
raise ValueError(f"Unknown model: {model!r}")
pipe = Pipeline([("scaler", StandardScaler()), ("clf", clf)])
cv = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=seed)
auc_scores = cross_val_score(pipe, features, labels, cv=cv, scoring="roc_auc")
acc_scores = cross_val_score(pipe, features, labels, cv=cv, scoring="balanced_accuracy")
# Fit on full data for feature importance.
pipe.fit(features, labels)
importance = None
if model in ("svm", "logistic"):
importance = np.abs(pipe.named_steps["clf"].coef_).ravel()
elif model == "random_forest":
importance = pipe.named_steps["clf"].feature_importances_
return ClassificationResult(
accuracy=float(acc_scores.mean()),
accuracy_std=float(acc_scores.std()),
auc=float(auc_scores.mean()),
auc_std=float(auc_scores.std()),
feature_importance=importance,
confusion_matrix=None,
model_name=model,
)
# ======================================================================
# §6 DIMENSION COLLAPSING — from per-vertex to per-shape
# ======================================================================
[docs]
def fisher_vector(
descriptor: DescriptorMatrix,
gmm_means: np.ndarray,
gmm_covs: np.ndarray,
gmm_weights: np.ndarray,
) -> GlobalDescriptor:
"""Fisher vector encoding of per-vertex descriptors.
Projects a per-vertex descriptor distribution onto the gradient
of a Gaussian Mixture Model, producing a fixed-length global
vector regardless of the number of vertices.
Parameters
----------
descriptor : ndarray, shape (N, T)
Per-vertex descriptor matrix.
gmm_means : ndarray, shape (K, T)
GMM component means.
gmm_covs : ndarray, shape (K, T)
GMM diagonal covariances.
gmm_weights : ndarray, shape (K,)
GMM component weights (sum to 1).
Returns
-------
ndarray, shape (2·K·T,)
Fisher vector (concatenation of first and second order
gradient statistics).
References
----------
Perronnin F, Dance C. Fisher kernels on visual vocabularies for
image categorization. *CVPR 2007*.
Sánchez J, Perronnin F, Mensink T, Verbeek J. Image classification
with the Fisher vector. *IJCV* 105(3):222–245, 2013.
"""
N, _T = descriptor.shape
K = gmm_means.shape[0]
# Responsibilities: γ(n, k) = P(k|x_n)
log_resp = np.zeros((N, K))
for k in range(K):
diff = descriptor - gmm_means[k]
log_resp[:, k] = (
np.log(gmm_weights[k] + 1e-30)
- 0.5 * np.sum(diff**2 / (gmm_covs[k] + 1e-30), axis=1)
- 0.5 * np.sum(np.log(gmm_covs[k] + 1e-30))
)
# Normalise responsibilities.
log_resp -= log_resp.max(axis=1, keepdims=True)
resp = np.exp(log_resp)
resp /= resp.sum(axis=1, keepdims=True)
fv_parts = []
for k in range(K):
gamma_k = resp[:, k] # (N,)
sqrt_w = np.sqrt(gmm_weights[k] + 1e-30)
diff = descriptor - gmm_means[k] # (N, T)
sigma = gmm_covs[k] # (T,)
# First-order gradient (mean).
g_mean = (1 / (N * sqrt_w)) * np.sum(
gamma_k[:, None] * diff / (np.sqrt(sigma) + 1e-30), axis=0
)
# Second-order gradient (variance).
g_var = (1 / (N * sqrt_w * np.sqrt(2))) * np.sum(
gamma_k[:, None] * (diff**2 / (sigma + 1e-30) - 1), axis=0
)
fv_parts.extend([g_mean, g_var])
fv = np.concatenate(fv_parts)
# L2 normalisation + power normalisation.
fv = np.sign(fv) * np.sqrt(np.abs(fv)) # power norm
norm = np.linalg.norm(fv)
if norm > 1e-10:
fv /= norm
return fv
[docs]
def fit_gmm_codebook(
all_descriptors: np.ndarray,
n_components: int = 32,
*,
seed: int | None = 42,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""Fit a GMM codebook on pooled descriptors from a population.
Parameters
----------
all_descriptors : ndarray, shape (N_total, T)
Pooled descriptors from all subjects.
n_components : int
Number of GMM components.
seed : int
Returns
-------
means : ndarray, shape (K, T)
covariances : ndarray, shape (K, T)
Diagonal covariances.
weights : ndarray, shape (K,)
"""
from sklearn.mixture import GaussianMixture
gmm = GaussianMixture(
n_components=n_components,
covariance_type="diag",
random_state=seed,
max_iter=200,
)
gmm.fit(all_descriptors)
return gmm.means_, gmm.covariances_, gmm.weights_
[docs]
def bag_of_spectral_words(
descriptor: DescriptorMatrix,
codebook: np.ndarray,
*,
soft: bool = True,
sigma: float | None = None,
) -> GlobalDescriptor:
"""Bag-of-Words encoding of per-vertex descriptors.
Parameters
----------
descriptor : ndarray, shape (N, T)
codebook : ndarray, shape (K, T)
Cluster centres (from k-means on pooled descriptors).
soft : bool
Soft assignment (Gaussian weighted) vs hard assignment.
sigma : float, optional
Bandwidth for soft assignment. ``None`` = auto.
Returns
-------
ndarray, shape (K,)
Normalised histogram over codebook words.
"""
from scipy.spatial.distance import cdist
dists = cdist(descriptor, codebook, metric="euclidean") # (N, K)
if soft:
if sigma is None:
sigma = float(np.median(dists))
weights = np.exp(-(dists**2) / (2 * sigma**2))
weights /= weights.sum(axis=1, keepdims=True)
histogram = weights.sum(axis=0)
else:
assignments = np.argmin(dists, axis=1)
histogram = np.bincount(assignments, minlength=codebook.shape[0]).astype(np.float64)
# L1 normalisation.
histogram /= histogram.sum() + 1e-30
return histogram
[docs]
def kernel_mean_embedding(
descriptor: DescriptorMatrix,
*,
kernel: Literal["rbf", "linear"] = "rbf",
sigma: float | None = None,
n_landmarks: int = 100,
seed: int | None = None,
) -> GlobalDescriptor:
"""Kernel mean embedding of a descriptor distribution.
Embeds the empirical distribution of per-vertex descriptors into
an RKHS, approximated by random Fourier features (Rahimi &
Recht 2007) for scalability.
Parameters
----------
descriptor : ndarray, shape (N, T)
kernel : str
sigma : float, optional
n_landmarks : int
Number of random Fourier features.
seed : int
Returns
-------
ndarray, shape (n_landmarks,)
Approximate kernel mean embedding.
"""
rng = np.random.default_rng(seed)
N, T = descriptor.shape
if sigma is None:
from scipy.spatial.distance import pdist
sample = descriptor[rng.choice(N, min(200, N), replace=False)]
sigma = float(np.median(pdist(sample))) if len(sample) > 1 else 1.0
sigma = max(sigma, 1e-6)
if kernel == "rbf":
# Random Fourier features.
W = rng.normal(0, 1 / sigma, (T, n_landmarks))
b = rng.uniform(0, 2 * np.pi, n_landmarks)
Z = np.sqrt(2 / n_landmarks) * np.cos(descriptor @ W + b)
return Z.mean(axis=0)
elif kernel == "linear":
return descriptor.mean(axis=0)
else:
raise ValueError(f"Unknown kernel: {kernel!r}")
# ======================================================================
# §7 DISSIMILARITY MEASURES
# ======================================================================
[docs]
def emd_distance(a: np.ndarray, b: np.ndarray) -> float:
"""Earth Mover's Distance (1D Wasserstein) between distributions.
For multi-dimensional descriptors, averages across columns.
Parameters
----------
a, b : ndarray, shape (N,) or (N, T)
Returns
-------
float
"""
from scipy.stats import wasserstein_distance
a, b = np.asarray(a), np.asarray(b)
if a.ndim == 1 and b.ndim == 1:
return float(wasserstein_distance(a, b))
if a.ndim == 1:
a = a[:, None]
if b.ndim == 1:
b = b[:, None]
T = min(a.shape[1], b.shape[1])
return float(np.mean([wasserstein_distance(a[:, t], b[:, t]) for t in range(T)]))
[docs]
def kl_divergence(a: np.ndarray, b: np.ndarray, *, bins: int = 50) -> float:
"""KL divergence estimated via histogram binning.
Parameters
----------
a, b : ndarray, shape (N,)
bins : int
Returns
-------
float
D_KL(a || b).
"""
a, b = np.ravel(a), np.ravel(b)
lo = min(a.min(), b.min())
hi = max(a.max(), b.max())
edges = np.linspace(lo, hi, bins + 1)
p = np.histogram(a, bins=edges, density=True)[0] + 1e-10
q = np.histogram(b, bins=edges, density=True)[0] + 1e-10
p /= p.sum()
q /= q.sum()
return float(np.sum(p * np.log(p / q)))
[docs]
def js_divergence(a: np.ndarray, b: np.ndarray, **kwargs) -> float:
"""Jensen-Shannon divergence (symmetric, bounded [0, ln2]).
Parameters
----------
a, b : ndarray
Returns
-------
float
"""
return 0.5 * kl_divergence(a, b, **kwargs) + 0.5 * kl_divergence(b, a, **kwargs)
[docs]
def energy_distance(a: np.ndarray, b: np.ndarray) -> float:
"""Energy distance between two multivariate samples.
Parameters
----------
a : ndarray, shape (N_a, d)
b : ndarray, shape (N_b, d)
Returns
-------
float
"""
from scipy.spatial.distance import cdist
a = np.atleast_2d(a)
b = np.atleast_2d(b)
ab = cdist(a, b).mean()
aa = cdist(a, a).mean()
bb = cdist(b, b).mean()
return float(2 * ab - aa - bb)
# ======================================================================
# §8 RSA — Representational Similarity Analysis
# ======================================================================
[docs]
def rdm(
features: np.ndarray,
*,
metric: Literal["correlation", "euclidean", "cosine"] = "correlation",
) -> DistanceMatrix:
"""Representational Dissimilarity Matrix.
Parameters
----------
features : ndarray, shape (S, d)
S items × d features.
metric : str
Returns
-------
ndarray, shape (S, S)
"""
from scipy.spatial.distance import pdist, squareform
if metric == "correlation":
D = pdist(features, metric="correlation")
elif metric == "euclidean":
D = pdist(features, metric="euclidean")
elif metric == "cosine":
D = pdist(features, metric="cosine")
else:
raise ValueError(f"Unknown metric: {metric!r}")
return squareform(D)
[docs]
def rsa_compare(
rdm_a: DistanceMatrix,
rdm_b: DistanceMatrix,
*,
method: Literal["spearman", "pearson", "kendall"] = "spearman",
permutations: int = 0,
seed: int | None = None,
) -> tuple[float, float]:
"""Compare two RDMs via Representational Similarity Analysis.
Parameters
----------
rdm_a, rdm_b : ndarray, shape (S, S)
Representational Dissimilarity Matrices.
method : str
Correlation method.
permutations : int
If > 0, compute p-value via permutation test.
seed : int
Returns
-------
r : float
Correlation between upper triangles.
p_value : float
p-value (parametric if permutations=0, permutation otherwise).
"""
a_upper = rdm_a[np.triu_indices_from(rdm_a, k=1)]
b_upper = rdm_b[np.triu_indices_from(rdm_b, k=1)]
if method == "spearman":
r_obs, p_param = sp_stats.spearmanr(a_upper, b_upper)
elif method == "pearson":
r_obs, p_param = sp_stats.pearsonr(a_upper, b_upper)
elif method == "kendall":
r_obs, p_param = sp_stats.kendalltau(a_upper, b_upper)
else:
raise ValueError(f"Unknown method: {method!r}")
if permutations <= 0:
return float(r_obs), float(p_param)
# Permutation test (row/column shuffle of one RDM).
rng = np.random.default_rng(seed)
count = 0
n = rdm_a.shape[0]
for _ in range(permutations):
perm = rng.permutation(n)
rdm_perm = rdm_a[np.ix_(perm, perm)]
perm_upper = rdm_perm[np.triu_indices(n, k=1)]
if method == "spearman":
r_perm, _ = sp_stats.spearmanr(perm_upper, b_upper)
elif method == "pearson":
r_perm, _ = sp_stats.pearsonr(perm_upper, b_upper)
else:
r_perm, _ = sp_stats.kendalltau(perm_upper, b_upper)
if abs(r_perm) >= abs(r_obs):
count += 1
p_perm = (count + 1) / (permutations + 1)
return float(r_obs), float(p_perm)
[docs]
def mantel_test(
matrix_a: DistanceMatrix,
matrix_b: DistanceMatrix,
*,
n_permutations: int = 5000,
method: Literal["pearson", "spearman"] = "spearman",
seed: int | None = None,
) -> tuple[float, float]:
"""Mantel test — correlation between two distance matrices.
Tests whether two distance matrices are correlated by comparing
the observed correlation to a null distribution generated by
row/column permutation.
Parameters
----------
matrix_a, matrix_b : ndarray, shape (N, N)
n_permutations : int
method : str
seed : int
Returns
-------
r : float
p_value : float
"""
return rsa_compare(
matrix_a,
matrix_b,
method=method,
permutations=n_permutations,
seed=seed,
)
# ======================================================================
# §9 CONNECTOME & NETWORK ANALYSIS
# ======================================================================
[docs]
def modularity(
connectome: ConnectomeMatrix,
community_labels: np.ndarray,
*,
gamma: float = 1.0,
) -> float:
"""Newman's modularity Q for a given community partition.
Q = (1/2m) Σ_{ij} [A_{ij} - γ·k_i·k_j/(2m)] · δ(c_i, c_j)
Parameters
----------
connectome : ndarray, shape (R, R)
Similarity matrix (higher = more similar).
community_labels : ndarray, shape (R,)
Community assignment per node.
gamma : float
Resolution parameter.
Returns
-------
float
Modularity Q.
"""
A = np.asarray(connectome, dtype=np.float64)
np.fill_diagonal(A, 0)
m2 = A.sum()
if m2 < 1e-10:
return 0.0
k = A.sum(axis=1)
Q = 0.0
for i in range(len(A)):
for j in range(len(A)):
if community_labels[i] == community_labels[j]:
Q += A[i, j] - gamma * k[i] * k[j] / m2
return float(Q / m2)
[docs]
def participation_coefficient(
connectome: ConnectomeMatrix,
community_labels: np.ndarray,
) -> np.ndarray:
"""Participation coefficient per node.
PC_i = 1 - Σ_k (s_{ik} / s_i)²
High PC → hub connected to multiple communities.
Low PC → provincial node within one community.
Parameters
----------
connectome : ndarray, shape (R, R)
community_labels : ndarray, shape (R,)
Returns
-------
ndarray, shape (R,)
"""
A = np.asarray(connectome, dtype=np.float64)
np.fill_diagonal(A, 0)
labels = np.asarray(community_labels)
communities = np.unique(labels)
s_total = A.sum(axis=1) # (R,)
PC = np.ones(len(A))
for c in communities:
mask = labels == c
s_c = A[:, mask].sum(axis=1) # (R,)
ratio = s_c / (s_total + 1e-30)
PC -= ratio**2
return PC
[docs]
def intra_inter_ratio(
connectome: ConnectomeMatrix,
community_labels: np.ndarray,
) -> dict[str, float]:
"""Intra- vs inter-community connectivity ratio.
Parameters
----------
connectome : ndarray, shape (R, R)
community_labels : ndarray, shape (R,)
Returns
-------
dict
``"intra_mean"``, ``"inter_mean"``, ``"ratio"``.
"""
A = np.asarray(connectome, dtype=np.float64)
labels = np.asarray(community_labels)
same = np.equal.outer(labels, labels)
np.fill_diagonal(same, False)
intra_vals = A[same]
inter_vals = A[~same & ~np.eye(len(A), dtype=bool)]
intra_mean = float(intra_vals.mean()) if len(intra_vals) > 0 else 0.0
inter_mean = float(inter_vals.mean()) if len(inter_vals) > 0 else 0.0
ratio = intra_mean / (inter_mean + 1e-30)
return {"intra_mean": intra_mean, "inter_mean": inter_mean, "ratio": ratio}
# ======================================================================
# §10 ASYMMETRY ANALYSIS
# ======================================================================
[docs]
def lateralisation_index(
left: np.ndarray,
right: np.ndarray,
) -> np.ndarray:
"""Lateralisation Index: LI = (L - R) / (|L| + |R|).
Parameters
----------
left, right : ndarray
Matching descriptor values for L and R hemispheres.
Returns
-------
ndarray
LI ∈ [-1, +1]. Positive = left > right.
"""
L = np.asarray(left, dtype=np.float64)
R = np.asarray(right, dtype=np.float64)
return (L - R) / (np.abs(L) + np.abs(R) + 1e-30)
[docs]
def asymmetry_test(
left: np.ndarray,
right: np.ndarray,
*,
test: Literal["paired_t", "wilcoxon"] = "wilcoxon",
) -> tuple[float, float]:
"""Test whether L and R descriptors differ significantly.
Parameters
----------
left, right : ndarray, shape (S,) — per-subject global descriptors
test : str
Returns
-------
statistic, p_value : float
"""
L = np.asarray(left).ravel()
R = np.asarray(right).ravel()
n = min(len(L), len(R))
L, R = L[:n], R[:n]
if test == "paired_t":
return tuple(float(x) for x in sp_stats.ttest_rel(L, R))
elif test == "wilcoxon":
return tuple(float(x) for x in sp_stats.wilcoxon(L, R))
raise ValueError(f"Unknown test: {test!r}")
# ======================================================================
# §11 DIMENSIONALITY REDUCTION
# ======================================================================
[docs]
def spectral_pca(
features: np.ndarray,
n_components: int = 2,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
"""PCA on spectral features.
Parameters
----------
features : ndarray, shape (S, d)
n_components : int
Returns
-------
scores : ndarray, shape (S, n_components)
loadings : ndarray, shape (n_components, d)
explained_variance_ratio : ndarray, shape (n_components,)
"""
from sklearn.decomposition import PCA
pca = PCA(n_components=n_components)
scores = pca.fit_transform(features)
return scores, pca.components_, pca.explained_variance_ratio_
[docs]
def spectral_mds(
distance_matrix: DistanceMatrix,
n_components: int = 2,
*,
seed: int | None = None,
) -> np.ndarray:
"""Classical MDS embedding from a distance matrix.
Parameters
----------
distance_matrix : ndarray, shape (S, S)
n_components : int
seed : int
Returns
-------
ndarray, shape (S, n_components)
"""
from sklearn.manifold import MDS
mds = MDS(
n_components=n_components,
dissimilarity="precomputed",
random_state=seed,
normalized_stress="auto",
)
return mds.fit_transform(distance_matrix)
[docs]
def spectral_umap(
features: np.ndarray,
n_components: int = 2,
*,
n_neighbors: int = 15,
min_dist: float = 0.1,
seed: int | None = None,
) -> np.ndarray:
"""UMAP embedding of spectral features.
Parameters
----------
features : ndarray, shape (S, d)
n_components : int
n_neighbors : int
min_dist : float
seed : int
Returns
-------
ndarray, shape (S, n_components)
"""
try:
import umap
except ImportError as exc:
raise ImportError("umap-learn is required for UMAP.\n pip install umap-learn") from exc
reducer = umap.UMAP(
n_components=n_components,
n_neighbors=n_neighbors,
min_dist=min_dist,
random_state=seed,
)
return reducer.fit_transform(features)
# ======================================================================
__all__: list[str] = [
# Classification
"ClassificationResult",
# Vertex-wise tests
"VertexWiseResult",
"asymmetry_test",
"bag_of_spectral_words",
"classify",
# Effect sizes
"cohens_d_map",
# Dissimilarity
"emd_distance",
"energy_distance",
# Dimension collapsing
"fisher_vector",
"fit_gmm_codebook",
"hedges_g_map",
"intra_inter_ratio",
"js_divergence",
"kernel_mean_embedding",
"kl_divergence",
# Asymmetry
"lateralisation_index",
"mantel_test",
# Connectome
"modularity",
"participation_coefficient",
# RSA
"rdm",
"rsa_compare",
"spectral_mds",
# Dimensionality reduction
"spectral_pca",
"spectral_umap",
# Surprise maps
"surprise_map",
"surprise_map_percentile",
"tfce",
# Correlation
"vertexwise_correlation",
"vertexwise_mannwhitney",
"vertexwise_permutation",
"vertexwise_ttest",
]