Source code for scCS.single

"""
single.py — SingleScorer: single-condition commitment score analysis for scCS.

Orchestrates:
1. RNA velocity computation (optional, via scVelo)
2. Radial star embedding construction (embedding.py)
3. FateMap construction from user-supplied cluster labels (bifurcation.py)
4. Commitment score computation (scores.py)
5. Driver gene analysis (drivers.py)
6. Pathway enrichment (enrichment.py)
7. Plotting (plot.py)

Quick start
-----------
>>> import scCS
>>> scorer = scCS.SingleScorer(
...     adata,
...     root='17',
...     branches=['Monocyte', 'DC', 'Neutrophil'],
...     obs_key='leiden',
... )
>>> scorer.build_embedding(ordering_metric='pseudotime')
>>> scorer.fit()
>>> result = scorer.score()
>>> print(result.summary())
>>> scorer.plot_star(result)
>>> # Driver genes
>>> vel_drivers = scorer.get_velocity_drivers()
>>> deg_drivers = scorer.get_deg_drivers()
>>> # Pathway enrichment (mouse)
>>> enrichment = scorer.get_enrichment(deg_drivers)
"""

from __future__ import annotations

import warnings
from typing import List, Literal, Optional, Union

import numpy as np
import pandas as pd

from ._base import _BaseScorer, SectorMode
from .bifurcation import FateMap, build_fate_map
from .embedding import run_velocity_pipeline
from .scores import (
    CommitmentScoreResult,
    bin_angles,
    bootstrap_cs,
    centroid_sectors,
    compute_cell_scores,
    compute_commitment_entropy,       # backward-compat alias
    compute_mean_cell_entropy,
    compute_nn_cell_entropy,
    compute_per_fate_cell_entropy,
    compute_population_entropy,
    compute_commitment_vector,
    compute_magnitudes,
    compute_angles,
    compute_pairwise_cs_matrix,
    compute_sector_magnitudes,
    equal_sectors,
)
from .drivers import get_velocity_drivers, get_deg_drivers, get_velocity_fate_drivers
from .enrichment import run_enrichment_per_fate


[docs] class SingleScorer(_BaseScorer): """RNA velocity commitment scorer with radial star embedding. Computes commitment scores for a k-furcation defined by a single user-supplied bifurcation cluster and k terminal fate clusters. The embedding places the bifurcation cluster at the origin and arranges each fate on its own radial arm, with cells ordered by differentiation level (pseudotime, CytoTRACE2, or custom score). Parameters ---------- adata : AnnData Single-cell dataset. root : str Label of the progenitor/root cluster in adata.obs[obs_key]. Example: '17' (leiden cluster 17) branches : list of str Labels of the k terminal fate clusters. Example: ['Monocyte', 'DC', 'Neutrophil'] obs_key : str Column in adata.obs with cluster labels. Default: 'leiden'. n_angle_bins : int Number of angular bins for commitment scoring. Default: 36 (10° each). sector_method : {'centroid', 'equal'} How to define angular sectors: - 'centroid': anchor sectors to the direction from origin to each fate centroid in the star embedding (recommended). - 'equal': divide [0°, 360°] into k equal sectors. copy : bool Work on a copy of adata. Examples -------- # k=2 bifurcation scorer = SingleScorer( adata, root='17', branches=['homeostatic', 'activated'], obs_key='leiden', ) scorer.build_embedding(ordering_metric='pseudotime') scorer.fit() result = scorer.score() scorer.plot_star(result) # k=3 with CytoTRACE2 scorer = SingleScorer( adata, root='5', branches=['FateA', 'FateB', 'FateC'], obs_key='cell_type', ) scorer.build_embedding(ordering_metric='cytotrace') scorer.fit() result = scorer.score() scorer.plot_star(result) """ def __init__( self, adata, root: str, branches: List[str], obs_key: str = "leiden", n_angle_bins: int = 36, sector_method: SectorMode = "centroid", copy: bool = False, ): super().__init__( adata=adata, root=root, branches=branches, obs_key=obs_key, n_angle_bins=n_angle_bins, sector_method=sector_method, copy=copy, ) # ------------------------------------------------------------------ # Step 1 (optional): RNA velocity # ------------------------------------------------------------------
[docs] def compute_velocity( self, mode: str = "dynamical", n_top_genes: int = 2000, n_pcs: int = 30, n_neighbors: int = 30, min_shared_counts: int = 20, verbose: bool = True, ) -> "SingleScorer": """Run the full scVelo RNA velocity pipeline. Call this if adata does not yet have velocity vectors. Requires 'spliced' and 'unspliced' layers. Parameters ---------- mode : {'dynamical', 'stochastic', 'steady_state'} n_top_genes, n_pcs, n_neighbors, min_shared_counts : int verbose : bool Returns ------- self """ run_velocity_pipeline( self.adata, mode=mode, n_top_genes=n_top_genes, n_pcs=n_pcs, n_neighbors=n_neighbors, min_shared_counts=min_shared_counts, verbose=verbose, ) return self
# ------------------------------------------------------------------ # Step 4: Fit (build FateMap) # ------------------------------------------------------------------
[docs] def fit(self, verbose: bool = True) -> "SingleScorer": """Build the FateMap from the user-supplied cluster labels. Must be called after build_embedding(). This step: 1. Validates that root and branches exist in adata.obs[obs_key]. 2. Computes fate centroids in the X_sccs embedding. 3. Extracts velocity vectors if not already loaded. Returns ------- self """ self._check_embedding() self._needs_refit = False # clear after rebuild self._fate_map = build_fate_map( self.adata_sub, root=self.root, branches=self.branches, obs_key=self.obs_key, verbose=verbose, ) # Extract velocity vectors if not already loaded if self._vx is None or self._vy is None: try: self.project_velocity(verbose=verbose) except Exception as e: warnings.warn( f"Could not project velocity ({e}). " "Call project_velocity() or load_velocity_vectors() manually.", RuntimeWarning, stacklevel=2, ) if verbose: print(self._fate_map.summary()) self._fitted = True return self
# ------------------------------------------------------------------ # Step 5: Score computation # ------------------------------------------------------------------
[docs] def score( self, cell_mask: Optional[np.ndarray] = None, cell_level: bool = True, k_nn: Optional[int] = None, n_bootstrap: int = 0, bootstrap_ci: float = 0.95, bootstrap_seed: int = 42, verbose: bool = True, write_to_obs: bool = True, ) -> CommitmentScoreResult: """Compute commitment scores for the full population or a subset. Parameters ---------- cell_mask : np.ndarray of bool, shape (n_sub_cells,), optional Boolean mask over ``adata_sub`` cells (NOT the full adata). If provided, only cells where mask=True contribute to the population-level score (M_bin, M_sector, unCS, nCS). Per-cell scores are still computed for all cells. cell_level : bool Whether to compute per-cell fate affinity scores. k_nn : int, optional If set, compute NN-smoothed per-cell entropy using this many nearest neighbors in the scCS embedding (X_sccs). n_bootstrap : int Number of bootstrap replicates for CS confidence intervals. 0 (default) disables bootstrapping. Recommended: 500. bootstrap_ci : float Confidence interval level for bootstrap. Default 0.95 (95% CI). bootstrap_seed : int Random seed for bootstrap resampling. verbose : bool write_to_obs : bool If True (default), write per-cell scores to ``adata_sub.obs``. Returns ------- CommitmentScoreResult """ self._check_fitted() vx = self._vx.copy() vy = self._vy.copy() # Apply cell mask for population-level scoring if cell_mask is not None: vx_pop = vx[cell_mask] vy_pop = vy[cell_mask] else: vx_pop = vx vy_pop = vy # 1. Magnitudes and angles magnitudes = compute_magnitudes(vx_pop, vy_pop) angles = compute_angles(vx_pop, vy_pop) # 2. Angular binning bin_edges, M_bin = bin_angles(angles, magnitudes, n_bins=self.n_angle_bins) # 3. Sector definition fate_map = self._fate_map if self.sector_method == "centroid": sectors, fate_angles = centroid_sectors( fate_map.fate_centroids, fate_map.root_centroid, n_bins=self.n_angle_bins, ) else: sectors = equal_sectors(fate_map.k, n_bins=self.n_angle_bins) fate_angles = fate_map.arm_angles_deg # 4. Sector magnitudes M_sector = compute_sector_magnitudes(M_bin, sectors) # 5. Cell counts per fate if cell_mask is not None: n_cells_per_fate = np.array([ int(cell_mask[idx].sum()) for idx in fate_map.fate_cell_indices ], dtype=float) else: n_cells_per_fate = np.array([ len(idx) for idx in fate_map.fate_cell_indices ], dtype=float) # 6. Commitment vector and population-level entropy commitment_vector = compute_commitment_vector(M_sector) pop_entropy = compute_population_entropy(commitment_vector) # 7. Pairwise CS matrices pairwise_unCS = compute_pairwise_cs_matrix(M_sector, normalized=False) pairwise_nCS = compute_pairwise_cs_matrix( M_sector, n_cells_per_fate=n_cells_per_fate, normalized=True ) # 8. Per-cell scores, per-fate entropy, NN entropy cell_scores = None mean_cell_ent = float("nan") per_fate_ent = np.full(fate_map.k, float("nan")) nn_ent = None if cell_level: cell_scores = compute_cell_scores( vx, vy, fate_map.fate_centroids, fate_map.root_centroid, ) # Global mean per-cell entropy mean_cell_ent = compute_mean_cell_entropy(cell_scores) # Per-fate binary cell entropy — shape (k,) per_fate_ent = compute_per_fate_cell_entropy(cell_scores) # Write per-cell fate scores and raw entropy to adata_sub.obs k_fates = cell_scores.shape[1] with np.errstate(divide="ignore", invalid="ignore"): log_s = np.where(cell_scores > 0, np.log(cell_scores), 0.0) per_cell_H = -np.sum(cell_scores * log_s, axis=1) / np.log(k_fates) if write_to_obs: for j, name in enumerate(fate_map.fate_names): self.adata_sub.obs[f"cs_{name}"] = cell_scores[:, j] self.adata_sub.obs["cs_dominant_fate"] = [ fate_map.fate_names[int(np.argmax(cell_scores[i]))] for i in range(len(cell_scores)) ] self.adata_sub.obs["cs_entropy"] = per_cell_H # NN-smoothed entropy if k_nn is not None and k_nn > 0: coords = np.array(self.adata_sub.obsm["X_sccs"]) nn_ent = compute_nn_cell_entropy(cell_scores, coords, k_nn) if write_to_obs: self.adata_sub.obs["cs_nn_entropy"] = nn_ent # 9. Bootstrap CI (optional) boot_ci = None if n_bootstrap > 0: if verbose: print(f"[scCS] Computing bootstrap CI (n={n_bootstrap})...") boot_ci = bootstrap_cs( vx_pop, vy_pop, sectors=sectors, n_cells_per_fate=n_cells_per_fate, n_bins=self.n_angle_bins, n_bootstrap=n_bootstrap, ci=bootstrap_ci, seed=bootstrap_seed, normalized=True, ) result = CommitmentScoreResult( fate_names=fate_map.fate_names, M_bin=M_bin, bin_edges=bin_edges, sectors=sectors, M_sector=M_sector, n_cells_per_fate=n_cells_per_fate, commitment_vector=commitment_vector, population_entropy=pop_entropy, mean_cell_entropy=mean_cell_ent, per_fate_entropy=per_fate_ent, pairwise_unCS=pairwise_unCS, pairwise_nCS=pairwise_nCS, cell_scores=cell_scores, fate_angles=fate_angles, cell_obs_names=np.array(self.adata_sub.obs_names), nn_cell_entropy=nn_ent, nn_k=k_nn, bootstrap_ci=boot_ci, ) if verbose: print(result.summary()) return result
[docs] def score_per_subset( self, split_by: str, cell_level: bool = False, n_bootstrap: int = 0, verbose: bool = False, ) -> dict: """Compute commitment scores separately for each value of split_by. Useful for comparing commitment across conditions, time points, or trajectory directions. Parameters ---------- split_by : str Column in adata_sub.obs to split by. cell_level : bool n_bootstrap : int Bootstrap replicates for CI. 0 = disabled. verbose : bool Returns ------- dict mapping subset_value -> CommitmentScoreResult """ self._check_fitted() results = {} if split_by in self.adata_sub.obs.columns: subset_col = self.adata_sub.obs[split_by] elif split_by in self.adata.obs.columns: subset_col = self.adata.obs.loc[self.adata_sub.obs_names, split_by] else: raise KeyError( f"'{split_by}' not found in adata_sub.obs or adata.obs columns." ) for val in subset_col.unique(): mask = (subset_col == val).values if mask.sum() < 10: warnings.warn( f"Subset '{val}' has only {mask.sum()} cells in adata_sub. Skipping.", stacklevel=2, ) continue results[val] = self.score( cell_mask=mask, cell_level=cell_level, n_bootstrap=n_bootstrap, verbose=False, write_to_obs=False, ) # Warn when all off-diagonal nCS are inf (progenitor-only subset) ncs = results[val].pairwise_nCS k = ncs.shape[0] off_diag = [ncs[i, j] for i in range(k) for j in range(k) if i != j] if off_diag and all(np.isinf(v) for v in off_diag): warnings.warn( f"Subset '{val}': no cells from any fate arm — " f"pairwise nCS is undefined (inf). " f"This is expected for progenitor-only subsets.", stacklevel=2, ) if verbose: print(f"\n--- Subset: {val} ---") print(results[val].summary()) return results
# ------------------------------------------------------------------ # Label transfer to full adata # ------------------------------------------------------------------
[docs] def transfer_labels( self, adata, result: CommitmentScoreResult, prefix: str = "cs_", ) -> None: """Write per-cell commitment scores back to the full adata. After scoring, per-cell fate affinities, dominant fate, and entropy are stored in ``adata_sub.obs``. This method transfers those columns to the full adata so they can be used in downstream analyses. Cells not in the embedding subset receive NaN for numeric columns and 'unassigned' for categorical columns. Parameters ---------- adata : AnnData The full dataset (same object passed to SingleScorer.__init__). result : CommitmentScoreResult Output of scorer.score(cell_level=True). prefix : str Column prefix. Default: 'cs_'. """ self._check_fitted() if result.cell_scores is None: raise ValueError( "result.cell_scores is None. " "Run scorer.score(cell_level=True) first." ) n_full = adata.n_obs full_names = list(adata.obs_names) name_to_full_idx = {n: i for i, n in enumerate(full_names)} sub_names = list(self.adata_sub.obs_names) sub_to_full = np.array([ name_to_full_idx.get(n, -1) for n in sub_names ]) valid = sub_to_full >= 0 # Per-fate affinity for j, fate_name in enumerate(result.fate_names): col = f"{prefix}{fate_name}" arr = np.full(n_full, np.nan) arr[sub_to_full[valid]] = result.cell_scores[valid, j] adata.obs[col] = arr # Dominant fate dom_col = f"{prefix}dominant_fate" dom_arr = np.full(n_full, "unassigned", dtype=object) sub_dom = self.adata_sub.obs.get("cs_dominant_fate", None) if sub_dom is not None: dom_arr[sub_to_full[valid]] = sub_dom.values[valid] adata.obs[dom_col] = dom_arr # Per-cell entropy ent_col = f"{prefix}entropy" ent_arr = np.full(n_full, np.nan) sub_ent = self.adata_sub.obs.get("cs_entropy", None) if sub_ent is not None: ent_arr[sub_to_full[valid]] = sub_ent.values[valid] adata.obs[ent_col] = ent_arr # NN-smoothed entropy if result.nn_cell_entropy is not None: nn_col = f"{prefix}nn_entropy" nn_arr = np.full(n_full, np.nan) nn_arr[sub_to_full[valid]] = result.nn_cell_entropy[valid] adata.obs[nn_col] = nn_arr # Subset-local pseudotime if "sccs_pseudotime" in self.adata_sub.obs: pt_col = f"{prefix}sccs_pseudotime" pt_arr = np.full(n_full, np.nan) pt_arr[sub_to_full[valid]] = self.adata_sub.obs["sccs_pseudotime"].values[valid] adata.obs[pt_col] = pt_arr print( f"[scCS] Labels transferred to adata.obs for {valid.sum()} / {n_full} cells. " f"Columns: {[f'{prefix}{f}' for f in result.fate_names] + [dom_col, ent_col]}" )
# ------------------------------------------------------------------ # Serialization # ------------------------------------------------------------------
[docs] def save(self, path: str) -> None: """Serialize scorer state to a pickle file. Saves the embedding, FateMap, velocity vectors, and configuration. The full ``adata`` is NOT saved (too large); pass it again to :meth:`load`. Parameters ---------- path : str Destination file path (e.g., ``'scorer.pkl'``). """ import pickle import pathlib state = {k: getattr(self, k) for k in [ "adata_sub", "_fate_map", "_vx", "_vy", "root", "branches", "obs_key", "n_angle_bins", "sector_method", "_embedding_built", "_fitted", "_needs_refit", ]} pathlib.Path(path).write_bytes(pickle.dumps(state)) print(f"[scCS] SingleScorer state saved to '{path}'.")
@classmethod
[docs] def load(cls, path: str, adata) -> "SingleScorer": """Load a scorer from a pickle file. Parameters ---------- path : str Path to a file saved by :meth:`save`. adata : AnnData The full dataset (same object originally passed to ``SingleScorer.__init__``). Not stored in the pickle. Returns ------- SingleScorer """ import pickle import pathlib state = pickle.loads(pathlib.Path(path).read_bytes()) scorer = cls.__new__(cls) scorer.adata = adata for k, v in state.items(): setattr(scorer, k, v) # Ensure _needs_refit exists for pickles from older versions if not hasattr(scorer, "_needs_refit"): scorer._needs_refit = False print(f"[scCS] SingleScorer state loaded from '{path}'.") return scorer
# ------------------------------------------------------------------ # Plotting shortcuts # ------------------------------------------------------------------
[docs] def plot_star(self, result: CommitmentScoreResult, **kwargs): """Radial star embedding plot — primary visualization.""" from .plot import plot_star_embedding return plot_star_embedding(self.adata_sub, result, **kwargs)
[docs] def plot_rose(self, result: CommitmentScoreResult, **kwargs): """Rose/polar plot of cumulative magnitudes per angular bin.""" from .plot import plot_rose return plot_rose(result, **kwargs)
[docs] def plot_pairwise_cs(self, result: CommitmentScoreResult, **kwargs): """Heatmap of pairwise normalized commitment scores.""" from .plot import plot_pairwise_cs return plot_pairwise_cs(result, **kwargs)
[docs] def plot_commitment_bar(self, result: CommitmentScoreResult, **kwargs): """Bar chart of unCS vs nCS per fate pair.""" from .plot import plot_commitment_bar return plot_commitment_bar(result, **kwargs)
[docs] def plot_commitment_heatmap(self, result: CommitmentScoreResult, **kwargs): """Per-cell fate affinity heatmap.""" from .plot import plot_commitment_heatmap return plot_commitment_heatmap(result, **kwargs)
[docs] def plot_subset_comparison(self, subset_results: dict, **kwargs): """Compare commitment scores across subsets.""" from .plot import plot_subset_comparison return plot_subset_comparison(subset_results, **kwargs)
[docs] def plot_nn_entropy_elbow(self, **kwargs): """Elbow plots for choosing k_nn for NN-smoothed entropy. Parameters ---------- k_nn_range : list or range, optional k_nn values to sweep. Default: range(5, 51, 5). **kwargs Passed to :func:`scCS.plot.plot_nn_entropy_elbow`. Returns ------- fig : matplotlib Figure """ from .plot import plot_nn_entropy_elbow return plot_nn_entropy_elbow(self, **kwargs)
# ------------------------------------------------------------------ # Driver genes # ------------------------------------------------------------------
[docs] def get_velocity_drivers( self, n_top_genes: int = 50, ) -> dict: """Rank genes by mean scVelo velocity in each fate arm. Parameters ---------- n_top_genes : int Number of top driver genes to print per fate. Returns ------- dict : fate_name -> DataFrame[gene, mean_velocity, rank] """ self._check_fitted() return get_velocity_drivers( self.adata_sub, fate_names=self._fate_map.fate_names, obs_key=self.obs_key, root=self.root, n_top_genes=n_top_genes, )
[docs] def get_deg_drivers( self, n_top_genes: int = 50, pval_threshold: float = 0.05, logfc_threshold: float = 0.25, ) -> dict: """Find DEGs for each fate arm vs the bifurcation cluster (Wilcoxon). Parameters ---------- n_top_genes : int pval_threshold : float logfc_threshold : float Returns ------- dict : fate_name -> DataFrame[gene, logfoldchange, pval, pval_adj, significant] """ self._check_fitted() return get_deg_drivers( self.adata_sub, fate_names=self._fate_map.fate_names, obs_key=self.obs_key, root=self.root, n_top_genes=n_top_genes, pval_threshold=pval_threshold, logfc_threshold=logfc_threshold, )
[docs] def get_velocity_fate_drivers( self, result: CommitmentScoreResult, n_top_genes: int = 50, pval_threshold: float = 0.05, ) -> dict: """Identify driver genes by correlating gene velocity with fate affinity. Parameters ---------- result : CommitmentScoreResult Output of scorer.score(cell_level=True). n_top_genes : int pval_threshold : float Returns ------- dict : fate_name -> DataFrame[gene, spearman_r, pval, pval_adj, mean_velocity, delta_velocity, significant] """ self._check_fitted() if result.cell_scores is None: raise ValueError( "result.cell_scores is None. " "Run scorer.score(cell_level=True) first." ) return get_velocity_fate_drivers( self.adata_sub, cell_scores=result.cell_scores, fate_names=self._fate_map.fate_names, obs_key=self.obs_key, root=self.root, n_top_genes=n_top_genes, pval_threshold=pval_threshold, )
[docs] def get_enrichment( self, deg_drivers: dict, gene_sets: Optional[List[str]] = None, organism: str = "mouse", pval_threshold: float = 0.05, logfc_threshold: float = 0.25, plot: bool = True, n_top_pathways: int = 15, ) -> dict: """Run pathway enrichment on DEG driver genes per fate arm. Parameters ---------- deg_drivers : dict Output of get_deg_drivers(). gene_sets : list of str, optional organism : str pval_threshold : float logfc_threshold : float plot : bool n_top_pathways : int Returns ------- dict : fate_name -> {'up': DataFrame, 'down': DataFrame} """ self._check_fitted() return run_enrichment_per_fate( deg_drivers=deg_drivers, fate_names=self._fate_map.fate_names, gene_sets=gene_sets, organism=organism, pval_threshold=pval_threshold, logfc_threshold=logfc_threshold, plot=plot, n_top_pathways=n_top_pathways, )
# ------------------------------------------------------------------ # Representation # ------------------------------------------------------------------ def __repr__(self) -> str: if self._fitted: status = "fitted" elif self._needs_refit: status = "needs refit (embedding rebuilt)" elif self._embedding_built: status = "embedding built" else: status = "uninitialised" return ( f"SingleScorer(" f"root='{self.root}', " f"branches={self.branches}, " f"k={len(self.branches)}, " f"status='{status}')" )