Source code for spectralbrain.io.group

"""Group-level loading for cohort statistics.

This module closes the gap between per-file I/O and the group-statistics
functions in :mod:`spectralbrain.statistics.analysis`. The workflow is:

1. **Discover** one file per subject from a BIDS/derivatives tree, a
   FreeSurfer ``SUBJECTS_DIR``, or an explicit list/dict of paths.
2. **Load** every subject in parallel (joblib via
   :func:`spectralbrain.backends.cpu.parallel_map`), fail-soft: a subject
   that errors is logged and dropped rather than aborting the cohort.
3. **Stack** the per-subject arrays into a single ``(S, N)`` (or
   ``(S, N, T)``) array, packaged in a :class:`GroupData` object that
   carries subject IDs and parsed BIDS entities (for covariates).

Two loading modes:

- ``mode="maps"`` — load a per-vertex overlay/metric or a precomputed
  descriptor field that is **already vertex-corresponded** on a common
  template (the light path).
- ``mode="pipeline"`` — load each surface, build the Laplace–Beltrami
  decomposition, and compute a spectral descriptor per subject (the heavy
  path, where joblib and the GPU backends pay off).

The resulting :class:`GroupData` plugs straight into
:func:`group_comparison`, which dispatches to the vertex-wise tests.

Examples
--------
>>> # HippUnfold-style derivatives, descriptor fields already on template:
>>> files = discover_bids(
...     "/data/derivatives/hippunfold",
...     "sub-{sub}/surf/sub-{sub}_hemi-L_*_thickness.shape.gii",
... )
>>> group = load_group(files, mode="maps", n_jobs=8)
>>> res = group_comparison(group, group.covariate("group"), test="ttest")

>>> # Full pipeline from FreeSurfer surfaces, HKS per subject, on GPU:
>>> files = discover_freesurfer("/data/fs", surface="white", hemi="lh")
>>> from spectralbrain.backends import TorchBackend
>>> group = load_group(
...     files, mode="pipeline", descriptor="hks", k=100,
...     backend=TorchBackend(), n_jobs=4,
... )
"""

from __future__ import annotations

from collections.abc import Callable
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any

import numpy as np

from spectralbrain.io.loaders import load as _load
from spectralbrain.runtime import PathLike, get_logger
from spectralbrain.utils.helpers import parse_bids_filename

logger = get_logger(__name__)


# ======================================================================
# §1  GROUP CONTAINER
# ======================================================================


[docs] @dataclass class GroupData: """A loaded cohort ready for group statistics. Attributes ---------- data : ndarray or list of ndarray Stacked per-subject arrays, shape ``(S, N)`` or ``(S, N, T)``. Falls back to a list if subjects have heterogeneous shapes. subject_ids : list of str Subject identifiers, aligned with ``data``'s first axis. entities : list of dict BIDS entities parsed from each source filename (for covariates). paths : list of Path Source file per subject. faces : ndarray, optional Template faces ``(F, 3)`` — useful for TFCE adjacency. metadata : dict Bookkeeping (mode, number of failed subjects, …). """ data: Any subject_ids: list[str] entities: list[dict[str, str]] paths: list[Path] faces: np.ndarray | None = None metadata: dict[str, Any] = field(default_factory=dict) def __len__(self) -> int: return len(self.subject_ids) @property def n_subjects(self) -> int: """Number of successfully loaded subjects.""" return len(self.subject_ids) @property def is_stacked(self) -> bool: """True if ``data`` is a single stacked array.""" return isinstance(self.data, np.ndarray)
[docs] def covariate(self, key: str, default: Any = None) -> np.ndarray: """Return a BIDS entity (e.g. ``"ses"``, ``"group"``) per subject. Parameters ---------- key : str Entity key to pull from each subject's parsed filename. default : Any Value for subjects missing the entity. Returns ------- ndarray, shape (S,) """ return np.array([e.get(key, default) for e in self.entities], dtype=object)
[docs] def split(self, labels: np.ndarray) -> tuple[np.ndarray, np.ndarray]: """Split the stacked data into two groups by a 2-level label array. Parameters ---------- labels : array-like, shape (S,) Exactly two distinct non-null values define the two groups. Returns ------- group_a, group_b : ndarray Subsets of ``data`` for the two label levels (in sorted order). """ if not self.is_stacked: raise ValueError("Cannot split: data is not a single stacked array.") labels = np.asarray(labels) mask_valid = np.array([x is not None for x in labels]) uniq = sorted({x for x in labels[mask_valid]}) if len(uniq) != 2: raise ValueError(f"split() needs exactly 2 label levels; got {len(uniq)}: {uniq}") a = self.data[labels == uniq[0]] b = self.data[labels == uniq[1]] return a, b
# ====================================================================== # §2 DISCOVERY # ====================================================================== def _norm_sid(s: str) -> str: """Normalise a subject id to the ``sub-XXX`` form.""" return s if s.startswith("sub-") else f"sub-{s}"
[docs] def discover_bids( root: PathLike, pattern: str, *, subjects: list[str] | None = None, ) -> dict[str, Path]: """Discover one file per subject in a BIDS / derivatives tree. Parameters ---------- root : PathLike Dataset (or derivatives) root. pattern : str Glob relative to *root* with a ``{sub}`` placeholder (the bare label, no ``sub-`` prefix), e.g. ``"sub-{sub}/anat/sub-{sub}_hemi-L_thickness.shape.gii"``. subjects : list of str, optional Restrict to these subjects (``"sub-01"`` or ``"01"``). Defaults to every ``sub-*`` directory under *root*. Returns ------- dict of {subject_id: Path} Subjects with no match (or whose match is ambiguous) are logged; the first match is taken when several exist. """ root = Path(root) if subjects is None: labels = sorted(p.name[4:] for p in root.glob("sub-*") if p.is_dir()) else: labels = [s[4:] if s.startswith("sub-") else s for s in subjects] found: dict[str, Path] = {} for label in labels: matches = sorted(root.glob(pattern.replace("{sub}", label))) if not matches: logger.warning("No file for sub-%s (pattern %r)", label, pattern) continue if len(matches) > 1: logger.warning("Multiple files for sub-%s; using %s", label, matches[0].name) found[_norm_sid(label)] = matches[0] logger.info("BIDS discovery: %d subjects matched.", len(found)) return found
[docs] def discover_freesurfer( subjects_dir: PathLike, *, hemi: str = "lh", surface: str | None = None, measure: str | None = None, subjects: list[str] | None = None, ) -> dict[str, Path]: """Discover FreeSurfer surface or morphometry files per subject. Provide exactly one of *surface* (geometry, e.g. ``"white"``) or *measure* (overlay, e.g. ``"thickness"``). The path resolved is ``{subjects_dir}/{sub}/surf/{hemi}.{name}``. Parameters ---------- subjects_dir : PathLike FreeSurfer ``SUBJECTS_DIR``. hemi : str ``"lh"`` or ``"rh"``. surface : str, optional Surface geometry name (``"white"``, ``"pial"``, …). measure : str, optional Morphometry overlay (``"thickness"``, ``"curv"``, ``"sulc"``, …). subjects : list of str, optional Restrict to these subject directory names. Returns ------- dict of {subject_id: Path} """ sd = Path(subjects_dir) if (surface is None) == (measure is None): raise ValueError("Provide exactly one of `surface` or `measure`.") name = surface if surface is not None else measure if subjects is None: candidates = sorted(p.name for p in sd.iterdir() if (p / "surf").is_dir()) else: candidates = list(subjects) found: dict[str, Path] = {} for sub in candidates: f = sd / sub / "surf" / f"{hemi}.{name}" if f.exists(): found[sub] = f else: logger.warning("Missing %s", f) logger.info("FreeSurfer discovery: %d subjects matched.", len(found)) return found
# ====================================================================== # §3 PER-SUBJECT LOADERS # ====================================================================== def _default_map_loader(path: PathLike) -> np.ndarray: """Extract a per-vertex array from any supported overlay/metric file.""" r = _load(path) if "scalars" in r: return np.asarray(r["scalars"]) if "data" in r: # volume → flattened return np.asarray(r["data"]).ravel() if "vertices" in r: raise ValueError( f"{Path(path).name} is surface geometry (no per-vertex scalar). " "Use mode='pipeline' to compute a descriptor, or point at an " "overlay/metric file." ) raise ValueError(f"No per-vertex array found in {Path(path).name}") def _make_descriptor_loader( descriptor: str = "hks", *, k: int = 100, backend: Any | None = None, laplacian: str = "cotangent", **descriptor_kwargs: Any, ) -> Callable[[PathLike], np.ndarray]: """Build a loader: surface file → decompose → spectral descriptor.""" from spectralbrain.core.meshes import BrainMesh from spectralbrain.spectral import descriptors as _desc table: dict[str, Callable[..., np.ndarray]] = { "hks": _desc.compute_hks, "si_hks": _desc.compute_si_hks, "wks": _desc.compute_wks, "gps": _desc.compute_gps, "shapedna": _desc.compute_shapedna, } if descriptor not in table: raise ValueError(f"Unknown descriptor {descriptor!r}; choose from {list(table)}") fn = table[descriptor] def _loader(path: PathLike) -> np.ndarray: r = _load(path) if "vertices" not in r or "faces" not in r: raise ValueError(f"{Path(path).name} is not a surface mesh.") mesh = BrainMesh(r["vertices"], r["faces"]) decomp = mesh.decompose(k=k, laplacian_method=laplacian, backend=backend) return np.asarray(fn(decomp, **descriptor_kwargs)) return _loader # ====================================================================== # §4 GROUP LOADER # ====================================================================== def _finalize_group( items: list[tuple[str, Path]], arrays: list[np.ndarray | None], *, mode: str, stack: bool = True, faces: np.ndarray | None = None, ) -> GroupData: """Drop failed subjects, stack if shapes match, and package a GroupData.""" ok = [(s, p, a) for (s, p), a in zip(items, arrays) if a is not None] n_failed = len(items) - len(ok) sids = [s for s, _, _ in ok] out_paths = [p for _, p, _ in ok] entities = [parse_bids_filename(p.name) for p in out_paths] loaded = [a for _, _, a in ok] data: Any if stack and loaded and len({a.shape for a in loaded}) == 1: data = np.stack(loaded, axis=0) else: if stack and loaded: logger.warning("Heterogeneous subject shapes; returning a list, not a stacked array.") data = loaded logger.info("Loaded group: %d/%d subjects (mode=%s).", len(ok), len(items), mode) return GroupData( data=data, subject_ids=sids, entities=entities, paths=out_paths, faces=faces, metadata={"mode": mode, "n_failed": n_failed, "n_requested": len(items)}, )
[docs] def load_group( files: dict[str, PathLike] | list[PathLike], *, mode: str = "maps", loader: Callable[[PathLike], np.ndarray] | None = None, n_jobs: int = 1, stack: bool = True, descriptor: str = "hks", k: int = 100, backend: Any | None = None, descriptor_kwargs: dict[str, Any] | None = None, template_faces: np.ndarray | None = None, ) -> GroupData: """Load and stack a cohort for group statistics. Parameters ---------- files : dict or list ``{subject_id: path}`` (e.g. from :func:`discover_bids` / :func:`discover_freesurfer`) or a plain list of paths (subject IDs are then parsed from the filenames). mode : ``"maps"`` or ``"pipeline"`` ``"maps"`` loads vertex-corresponded overlays/descriptor fields; ``"pipeline"`` loads each surface and computes a descriptor. loader : callable, optional Custom ``path -> ndarray`` loader. Overrides *mode*. n_jobs : int Parallel workers for loading (joblib). ``1`` = sequential. stack : bool Stack into one array when subject shapes match (else keep a list). descriptor, k, backend, descriptor_kwargs Pipeline-mode options (descriptor name, eigenpairs, compute backend, and extra keyword arguments forwarded to the descriptor). template_faces : ndarray, optional Stored on the result for downstream TFCE adjacency. Returns ------- GroupData """ if isinstance(files, dict): items: list[tuple[str, Path]] = [(s, Path(p)) for s, p in files.items()] else: items = [] for p in files: p = Path(p) sub = parse_bids_filename(p.name).get("sub") items.append((_norm_sid(sub) if sub else p.stem, p)) if loader is None: if mode == "maps": loader = _default_map_loader elif mode == "pipeline": loader = _make_descriptor_loader( descriptor, k=k, backend=backend, **(descriptor_kwargs or {}) ) else: raise ValueError(f"Unknown mode {mode!r}; use 'maps' or 'pipeline'.") paths = [p for _, p in items] active_loader = loader def _one(path: Path) -> np.ndarray | None: try: return np.asarray(active_loader(path)) except Exception as exc: logger.error("✗ %s: %s", path.name, exc) return None if n_jobs == 1: arrays = [_one(p) for p in paths] else: from spectralbrain.backends.cpu import parallel_map arrays = parallel_map(_one, paths, n_jobs=n_jobs, description="Loading group") return _finalize_group(items, arrays, mode=mode, stack=stack, faces=template_faces)
# ====================================================================== # §5 TEMPLATE RESAMPLING (FreeSurfer) # ====================================================================== def _resolve_sphere(subjects_dir: Path, subject: str, hemi: str) -> Path: """Locate a subject's (or template's) ``{hemi}.sphere.reg``.""" from spectralbrain.io.parcellate import _find_freesurfer_home cand = subjects_dir / subject / "surf" / f"{hemi}.sphere.reg" if cand.exists(): return cand fs_home = _find_freesurfer_home() if fs_home: alt = fs_home / "subjects" / subject / "surf" / f"{hemi}.sphere.reg" if alt.exists(): return alt raise FileNotFoundError( f"{hemi}.sphere.reg not found for '{subject}'. Run recon-all, or set " "FREESURFER_HOME so fsaverage can be located." )
[docs] def resample_to_template( values: np.ndarray, subjects_dir: PathLike, subject_id: str, hemi: str, *, template: str = "fsaverage", method: str = "nearest", k: int = 3, ) -> np.ndarray: """Resample a native-space per-vertex overlay onto a template surface. Mirrors FreeSurfer's ``mri_surf2surf``: both surfaces are brought into spherical-registration space (``{hemi}.sphere.reg``) and the template vertices sample the subject overlay there. ``"nearest"`` takes the closest subject vertex (matching SpectralBrain's existing label projection); ``"linear"`` blends the *k* nearest by inverse distance, which is smoother for continuous metrics. Parameters ---------- values : ndarray, shape (N_subject,) Per-vertex overlay on the subject's native surface. subjects_dir : PathLike FreeSurfer ``SUBJECTS_DIR`` (must also contain *template*). subject_id : str Subject directory name. hemi : str ``"lh"`` or ``"rh"``. template : str Template subject (default ``"fsaverage"``). method : ``"nearest"`` or ``"linear"`` Interpolation on the registration sphere. k : int Neighbours for ``"linear"`` inverse-distance weighting. Returns ------- ndarray, shape (N_template,) Overlay resampled onto the template surface. """ from scipy.spatial import cKDTree from spectralbrain.io.loaders import load_freesurfer_surface sd = Path(subjects_dir) values = np.asarray(values) src_coords, _ = load_freesurfer_surface(_resolve_sphere(sd, subject_id, hemi)) tgt_coords, _ = load_freesurfer_surface(_resolve_sphere(sd, template, hemi)) if len(values) != len(src_coords): raise ValueError( f"Overlay has {len(values)} values but subject surface has {len(src_coords)} vertices." ) tree = cKDTree(src_coords) if method == "nearest": _, idx = tree.query(tgt_coords, k=1) return values[idx] if method == "linear": dist, idx = tree.query(tgt_coords, k=k) w = 1.0 / np.clip(dist, 1e-12, None) w /= w.sum(axis=1, keepdims=True) return (values[idx] * w).sum(axis=1) raise ValueError(f"Unknown method {method!r}; use 'nearest' or 'linear'.")
[docs] def load_group_freesurfer( subjects_dir: PathLike, *, measure: str, hemi: str = "lh", template: str = "fsaverage", resample: bool = True, method: str = "nearest", subjects: list[str] | None = None, n_jobs: int = 1, ) -> GroupData: """Load a FreeSurfer morphometry measure across a cohort onto a template. For each subject the native overlay (``{hemi}.{measure}``) is loaded and — unless ``resample=False`` — resampled to *template* via :func:`resample_to_template`, so the cohort stacks into a single vertex-corresponded ``(S, N_template)`` array ready for :func:`group_comparison`. Parameters ---------- subjects_dir : PathLike FreeSurfer ``SUBJECTS_DIR``. measure : str Morphometry overlay (``"thickness"``, ``"curv"``, ``"sulc"``, …). hemi : str ``"lh"`` or ``"rh"``. template : str Target template subject (default ``"fsaverage"``). resample : bool Resample to *template* (True) or keep native space (False; only stackable if every subject shares the vertex count). method : ``"nearest"`` or ``"linear"`` Resampling interpolation. subjects : list of str, optional Restrict to these subject directory names. n_jobs : int Parallel workers (joblib). Returns ------- GroupData """ from spectralbrain.io.loaders import load_freesurfer_morph sd = Path(subjects_dir) files = discover_freesurfer(sd, hemi=hemi, measure=measure, subjects=subjects) items: list[tuple[str, Path]] = list(files.items()) def _one(item: tuple[str, Path]) -> np.ndarray | None: sid, path = item try: vals = np.asarray(load_freesurfer_morph(path)) if resample: vals = resample_to_template(vals, sd, sid, hemi, template=template, method=method) return vals except Exception as exc: logger.error("✗ %s: %s", sid, exc) return None if n_jobs == 1: arrays = [_one(it) for it in items] else: from spectralbrain.backends.cpu import parallel_map arrays = parallel_map(_one, items, n_jobs=n_jobs, description="Loading FS group") mode = f"freesurfer:{measure}" + (f"→{template}" if resample else "") return _finalize_group(items, arrays, mode=mode)
# ====================================================================== # §6 ANALYSIS GLUE # ======================================================================
[docs] def group_comparison( group: GroupData | tuple[np.ndarray, np.ndarray], labels: np.ndarray | None = None, *, test: str = "ttest", **kwargs: Any, ) -> Any: """Run a vertex-wise group comparison on a loaded cohort. Parameters ---------- group : GroupData or (group_a, group_b) A loaded cohort (split via *labels*) or a pre-split pair of arrays. labels : array-like, shape (S,), optional Two-level grouping variable (required when *group* is a :class:`GroupData`). Often ``group.covariate("group")``. test : ``"ttest"``, ``"mannwhitney"``, or ``"permutation"`` Which vertex-wise test from :mod:`spectralbrain.statistics.analysis` to run. **kwargs Forwarded to the chosen test (e.g. ``correction``, ``alpha``, ``n_permutations``). Returns ------- VertexWiseResult """ from spectralbrain.statistics import analysis as _analysis tests: dict[str, Callable[..., Any]] = { "ttest": _analysis.vertexwise_ttest, "mannwhitney": _analysis.vertexwise_mannwhitney, "permutation": _analysis.vertexwise_permutation, } if test not in tests: raise ValueError(f"Unknown test {test!r}; choose from {list(tests)}") if isinstance(group, GroupData): if labels is None: raise ValueError("`labels` is required when passing a GroupData.") group_a, group_b = group.split(labels) else: group_a, group_b = group return tests[test](group_a, group_b, **kwargs)
__all__ = [ "GroupData", "discover_bids", "discover_freesurfer", "group_comparison", "load_group", "load_group_freesurfer", "resample_to_template", ]