Skip to content

API Reference

Welcome to the API reference for PhosphoVelocity. This documentation is automatically generated from the Python source code.


Model & Inference

Core modules for Gaussian Process modeling and Bayesian inference.

Gaussian Process Surrogate

PhosphositeGP

GP surrogate for a single phosphosite trajectory.

Incorporates observation uncertainty directly (heteroscedastic noise) and uses a Matérn 5/2 kernel which is better suited for biochemical dynamics.

Parameters:

Name Type Description Default
length_scale float

Initial length-scale for the Matérn kernel (in minutes).

30.0
normalize_y bool

Whether to normalize the target variable before fitting.

True
Source code in src/phospho_velocity/model/gp.py
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
class PhosphositeGP:
    """Gaussian-process surrogate for one phosphosite time course.

    Observation uncertainty enters the model directly as heteroscedastic
    noise, and the covariance is a Matérn 5/2 kernel, which is better
    suited to biochemical dynamics than a squared exponential.

    Parameters
    ----------
    length_scale : float
        Initial length-scale for the Matérn kernel (in minutes).
    normalize_y : bool
        Whether to normalize the target variable before fitting.
    """

    def __init__(
        self,
        length_scale: float = 30.0,
        normalize_y: bool = True,
    ) -> None:
        self.length_scale = length_scale
        self.normalize_y = normalize_y
        # Populated by fit(); stay None / False until then.
        self._gp: Optional[GaussianProcessRegressor] = None
        self._is_fitted: bool = False

    # ------------------------------------------------------------------
    def fit(
        self,
        times: np.ndarray,
        values: np.ndarray,
        uncertainties: Optional[np.ndarray] = None,
    ) -> "PhosphositeGP":
        """Fit GP to observed (time, log2_intensity) pairs.

        Non-finite values are silently removed before fitting.

        Parameters
        ----------
        times : array-like, shape (n,)
        values : array-like, shape (n,)
        uncertainties : array-like, shape (n,), optional
            Per-observation standard deviations; when supplied they are
            squared and used as the `alpha` parameter for heteroscedastic
            noise.

        Returns
        -------
        self
        """
        fitted_gp, ok = _fit_single(
            times, values, uncertainties, self.length_scale, self.normalize_y
        )
        self._gp = fitted_gp
        self._is_fitted = ok
        if not ok:
            logger.warning("Fewer than 2 valid points; GP will not be fitted.")
        return self

    # ------------------------------------------------------------------
    def predict(self, grid_times: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """Posterior mean and std on *grid_times*; all-NaN if unfitted."""
        pts = np.asarray(grid_times, dtype=float)
        if self._gp is None or not self._is_fitted:
            filler = np.full(len(pts), np.nan)
            return filler, filler
        return self._gp.predict(pts.reshape(-1, 1), return_std=True)

    # ------------------------------------------------------------------
    def predict_derivative(
        self, grid_times: np.ndarray, eps: float = 0.5
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Central-difference estimate of the trajectory derivative.

        Returns all-NaN arrays when the GP has not been fitted.
        """
        pts = np.asarray(grid_times, dtype=float)
        if self._gp is None or not self._is_fitted:
            filler = np.full(len(pts), np.nan)
            return filler, filler

        hi_mean, hi_std = self.predict(pts + eps)
        lo_mean, lo_std = self.predict(pts - eps)

        scale = 2.0 * eps
        deriv_mean = (hi_mean - lo_mean) / scale
        # Combine the two prediction uncertainties (treated as independent).
        deriv_std = np.sqrt(hi_std**2 + lo_std**2) / scale
        return deriv_mean, deriv_std

fit(times, values, uncertainties=None)

Fit GP to observed (time, log2_intensity) pairs.

Non-finite values are silently removed before fitting.

Parameters:

Name Type Description Default
times array-like, shape (n,)
required
values array-like, shape (n,)
required
uncertainties array-like, shape (n,), optional

Standard deviations of the observations. If provided, they are squared and passed as the alpha parameter for heteroscedastic noise.

None

Returns:

Type Description
self
Source code in src/phospho_velocity/model/gp.py
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
def fit(
    self,
    times: np.ndarray,
    values: np.ndarray,
    uncertainties: Optional[np.ndarray] = None,
) -> "PhosphositeGP":
    """Fit GP to observed (time, log2_intensity) pairs.

    Non-finite values are silently removed before fitting.

    Parameters
    ----------
    times : array-like, shape (n,)
    values : array-like, shape (n,)
    uncertainties : array-like, shape (n,), optional
        Per-observation standard deviations; when supplied they are
        squared and passed as the `alpha` parameter for heteroscedastic
        noise.

    Returns
    -------
    self
    """
    gp, ok = _fit_single(
        times, values, uncertainties, self.length_scale, self.normalize_y
    )
    self._gp, self._is_fitted = gp, ok
    if not ok:
        logger.warning("Fewer than 2 valid points; GP will not be fitted.")
    return self

fit_parallel(instances, times_list, values_list, uncertainties_list=None, n_jobs=-1, desc='Fitting GPs')

Fit a list of PhosphositeGP instances in parallel with a tqdm progress bar.

The flow of every other method (predict, predict_derivative) is unchanged — this only replaces the sequential loop over .fit() calls.

Parameters:

Name Type Description Default
instances list of PhosphositeGP

Pre-constructed GP objects (one per (site, cell_line) group).

required
times_list list of np.ndarray

Time arrays, one per instance.

required
values_list list of np.ndarray

Intensity arrays, one per instance.

required
uncertainties_list list of np.ndarray or None

Uncertainty arrays, one per instance. Pass None for the whole list to use the fixed noise floor for all.

None
n_jobs int

Number of parallel jobs. -1 uses all available cores.

-1
desc str

tqdm bar label.

'Fitting GPs'

Returns:

Type Description
list of PhosphositeGP

The same instances, each with _gp and _is_fitted populated.

Source code in src/phospho_velocity/model/gp.py
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def fit_parallel(
    instances: List[PhosphositeGP],
    times_list: List[np.ndarray],
    values_list: List[np.ndarray],
    uncertainties_list: Optional[List[Optional[np.ndarray]]] = None,
    n_jobs: int = -1,
    desc: str = "Fitting GPs",
) -> List[PhosphositeGP]:
    """Fit many PhosphositeGP objects in parallel with a tqdm progress bar.

    Only the sequential loop over ``.fit()`` calls is replaced; the flow of
    every other method (predict, predict_derivative) is untouched.

    Parameters
    ----------
    instances : list of PhosphositeGP
        Pre-constructed GP objects (one per (site, cell_line) group).
    times_list : list of np.ndarray
        Time arrays, one per instance.
    values_list : list of np.ndarray
        Intensity arrays, one per instance.
    uncertainties_list : list of np.ndarray or None, optional
        Uncertainty arrays, one per instance. Pass None for the whole list
        to use the fixed noise floor for all.
    n_jobs : int
        Number of parallel jobs. -1 uses all available cores.
    desc : str
        tqdm bar label.

    Returns
    -------
    list of PhosphositeGP
        The same instances, each with _gp and _is_fitted populated.
    """
    total = len(instances)
    if uncertainties_list is None:
        uncertainties_list = [None] * total

    # The tqdm wrapper on the input iterable drives the progress bar.
    progress = tqdm(
        zip(instances, times_list, values_list, uncertainties_list),
        total=total,
        desc=desc,
        unit="site",
    )
    # Threads avoid pickle overhead; switch to "processes" if GIL-bound.
    runner = Parallel(n_jobs=n_jobs, prefer="threads")
    results: List[Tuple[Optional[GaussianProcessRegressor], bool]] = runner(
        delayed(_fit_single)(t, v, u, inst.length_scale, inst.normalize_y)
        for inst, t, v, u in progress
    )

    for inst, (gp, ok) in zip(instances, results):
        inst._gp = gp
        inst._is_fitted = ok
        if not ok:
            logger.warning("GP not fitted for one site (< 2 valid points).")

    return instances

Observation Model

Observation model for phosphopeptide log-ratios.

Based on the uncertainty propagation described in Robin et al. (2019): the localization probability of a phosphosite determines how much uncertainty is attributed to that measurement.

ObservationModel

Gaussian observation model for phosphopeptide log-ratios.

The likelihood is: p(y | mu, sigma) = Normal(y; mu, sigma)

Optionally, a Robin et al. 2019 Gamma-variance model can be fitted to empirical log-ratio data via :meth:fit_variance_model, after which :meth:sigma_from_model returns the predicted sigma for any log-ratio magnitude.

Parameters:

Name Type Description Default
(none — the constructor takes no arguments)
Source code in src/phospho_velocity/model/observation.py
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
class ObservationModel:
    """Gaussian observation model for phosphopeptide log-ratios.

    The likelihood is:
        p(y | mu, sigma) = Normal(y; mu, sigma)

    A Robin et al. 2019 Gamma-variance model can optionally be fitted to
    empirical log-ratio data via :meth:`fit_variance_model`; afterwards
    :meth:`sigma_from_model` returns the predicted sigma for any log-ratio
    magnitude.
    """

    def __init__(self) -> None:
        # (A, B) of the exponential variance model, set by fit_variance_model().
        self._variance_model_params: Optional[Tuple[float, float]] = None
        self.sigma_mode: str = "locprob"  # "locprob" or "variance_model"

    # ------------------------------------------------------------------
    def log_likelihood(
        self,
        observed: Union[float, np.ndarray],
        predicted: Union[float, np.ndarray],
        sigma: Union[float, np.ndarray],
    ) -> Union[float, np.ndarray]:
        """Gaussian log-likelihood of *observed* under N(predicted, sigma^2).

        Parameters
        ----------
        observed : float or ndarray
            Observed log2 intensity or log-ratio values.
        predicted : float or ndarray
            Predicted values from the GP or ODE model.
        sigma : float or ndarray
            Observation noise standard deviation.

        Returns
        -------
        float or ndarray
            Log-likelihood value(s).
        """
        y = np.asarray(observed, dtype=float)
        mu = np.asarray(predicted, dtype=float)
        s = np.asarray(sigma, dtype=float)
        z = (y - mu) / s
        return -0.5 * (np.log(2.0 * np.pi * s**2) + z**2)

    # ------------------------------------------------------------------
    def fit_variance_model(
        self,
        log_ratios: np.ndarray,
        sd_values: np.ndarray,
        min_bin_size: int = 20,
    ) -> None:
        """Fit a Robin et al. 2019 exponential variance model.

        Models the observed SD as a function of ``|log_ratio|``:

        .. math::

            b(x) = A \\cdot \\exp(B \\cdot x), \\quad x = \\log(|\\text{log\\_ratio}| + \\varepsilon)

        with ``(A, B)`` estimated by log-linear regression on per-bin
        median SDs.

        On success, ``sigma_mode`` is switched to ``"variance_model"`` and
        :meth:`sigma_from_model` becomes available; on failure the method
        logs a warning and leaves the model unchanged.

        Parameters
        ----------
        log_ratios : ndarray, shape (N,)
            Observed peptide log-ratios.
        sd_values : ndarray, shape (N,)
            Corresponding observed standard deviations.
        min_bin_size : int
            Minimum number of data points per bin (default 20).
        """
        from scipy.stats import binned_statistic

        ratios = np.asarray(log_ratios, dtype=float)
        sds = np.asarray(sd_values, dtype=float)

        # Drop anything non-finite or with a non-positive SD.
        keep = np.isfinite(ratios) & np.isfinite(sds) & (sds > 0)
        ratios = ratios[keep]
        sds = sds[keep]

        if len(ratios) < min_bin_size * 2:
            logger.warning(
                "Too few data points (%d) to fit variance model; keeping locprob mode.",
                len(ratios),
            )
            return

        x = np.log(np.abs(ratios) + 1e-8)

        # Quantile-based edges so each bin holds roughly >= min_bin_size points.
        n_bins = max(2, len(ratios) // min_bin_size)
        edges = np.unique(np.percentile(x, np.linspace(0, 100, n_bins + 1)))
        if len(edges) < 3:
            logger.warning(
                "Cannot form enough bins for variance model; keeping locprob mode."
            )
            return

        medians, _, _ = binned_statistic(x, sds, statistic="median", bins=edges)
        centers = 0.5 * (edges[:-1] + edges[1:])

        # Keep only bins whose median SD is finite and positive.
        usable = np.isfinite(medians) & (medians > 0)
        if usable.sum() < 2:
            logger.warning(
                "Insufficient valid bins for variance model; keeping locprob mode."
            )
            return

        # log(b) = log(A) + B*x is a straight line in (x, log b).
        slope, intercept = np.polyfit(centers[usable], np.log(medians[usable]), 1)
        A = float(np.exp(intercept))
        B = float(slope)

        self._variance_model_params = (A, B)
        self.sigma_mode = "variance_model"
        logger.info("Fitted variance model: A=%.4f, B=%.4f", A, B)

    # ------------------------------------------------------------------
    def sigma_from_model(
        self, log_ratio: Union[float, np.ndarray]
    ) -> Union[float, np.ndarray]:
        """Predict sigma using the fitted exponential variance model.

        Requires :meth:`fit_variance_model` to have been called first.

        Parameters
        ----------
        log_ratio : float or ndarray
            Log-ratio magnitude(s) for which to predict sigma.

        Returns
        -------
        float or ndarray
            Predicted sigma value(s).

        Raises
        ------
        RuntimeError
            If :meth:`fit_variance_model` has not been called.
        """
        params = self._variance_model_params
        if params is None:
            raise RuntimeError(
                "Variance model not fitted. Call fit_variance_model() first."
            )
        A, B = params
        x = np.log(np.abs(np.asarray(log_ratio, dtype=float)) + 1e-8)
        result = A * np.exp(B * x)
        return float(result) if result.ndim == 0 else result

fit_variance_model(log_ratios, sd_values, min_bin_size=20)

Fit a Robin et al. 2019 exponential variance model.

Models the relationship between |log_ratio| and observed SD as:

.. math::

b(x) = A \cdot \exp(B \cdot x), \quad x = \log(|\text{log\_ratio}| + \varepsilon)

where the parameters (A, B) are estimated by log-linear regression on per-bin median SDs.

After calling this method, sigma_mode is automatically switched to "variance_model" and :meth:sigma_from_model is available.

Parameters:

Name Type Description Default
log_ratios ndarray, shape (N,)

Observed peptide log-ratios.

required
sd_values ndarray, shape (N,)

Corresponding observed standard deviations.

required
min_bin_size int

Minimum number of data points per bin (default 20).

20
Source code in src/phospho_velocity/model/observation.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
def fit_variance_model(
    self,
    log_ratios: np.ndarray,
    sd_values: np.ndarray,
    min_bin_size: int = 20,
) -> None:
    """Fit a Robin et al. 2019 exponential variance model.

    Models the observed SD as a function of ``|log_ratio|``:

    .. math::

        b(x) = A \\cdot \\exp(B \\cdot x), \\quad x = \\log(|\\text{log\\_ratio}| + \\varepsilon)

    with ``(A, B)`` estimated by log-linear regression on per-bin
    median SDs.

    On success, ``sigma_mode`` is switched to ``"variance_model"`` and
    :meth:`sigma_from_model` becomes available; on failure the method
    logs a warning and leaves the model unchanged.

    Parameters
    ----------
    log_ratios : ndarray, shape (N,)
        Observed peptide log-ratios.
    sd_values : ndarray, shape (N,)
        Corresponding observed standard deviations.
    min_bin_size : int
        Minimum number of data points per bin (default 20).
    """
    from scipy.stats import binned_statistic

    ratios = np.asarray(log_ratios, dtype=float)
    sds = np.asarray(sd_values, dtype=float)

    # Drop anything non-finite or with a non-positive SD.
    keep = np.isfinite(ratios) & np.isfinite(sds) & (sds > 0)
    ratios = ratios[keep]
    sds = sds[keep]

    if len(ratios) < min_bin_size * 2:
        logger.warning(
            "Too few data points (%d) to fit variance model; keeping locprob mode.",
            len(ratios),
        )
        return

    x = np.log(np.abs(ratios) + 1e-8)

    # Quantile-based edges so each bin holds roughly >= min_bin_size points.
    n_bins = max(2, len(ratios) // min_bin_size)
    edges = np.unique(np.percentile(x, np.linspace(0, 100, n_bins + 1)))
    if len(edges) < 3:
        logger.warning(
            "Cannot form enough bins for variance model; keeping locprob mode."
        )
        return

    medians, _, _ = binned_statistic(x, sds, statistic="median", bins=edges)
    centers = 0.5 * (edges[:-1] + edges[1:])

    # Keep only bins whose median SD is finite and positive.
    usable = np.isfinite(medians) & (medians > 0)
    if usable.sum() < 2:
        logger.warning(
            "Insufficient valid bins for variance model; keeping locprob mode."
        )
        return

    # log(b) = log(A) + B*x is a straight line in (x, log b).
    slope, intercept = np.polyfit(centers[usable], np.log(medians[usable]), 1)
    A = float(np.exp(intercept))
    B = float(slope)

    self._variance_model_params = (A, B)
    self.sigma_mode = "variance_model"
    logger.info("Fitted variance model: A=%.4f, B=%.4f", A, B)

log_likelihood(observed, predicted, sigma)

Gaussian log-likelihood.

Parameters:

Name Type Description Default
observed float or ndarray

Observed log2 intensity or log-ratio values.

required
predicted float or ndarray

Predicted values from the GP or ODE model.

required
sigma float or ndarray

Observation noise standard deviation.

required

Returns:

Type Description
float or ndarray

Log-likelihood value(s).

Source code in src/phospho_velocity/model/observation.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def log_likelihood(
    self,
    observed: Union[float, np.ndarray],
    predicted: Union[float, np.ndarray],
    sigma: Union[float, np.ndarray],
) -> Union[float, np.ndarray]:
    """Gaussian log-likelihood of *observed* under N(predicted, sigma^2).

    Parameters
    ----------
    observed : float or ndarray
        Observed log2 intensity or log-ratio values.
    predicted : float or ndarray
        Predicted values from the GP or ODE model.
    sigma : float or ndarray
        Observation noise standard deviation.

    Returns
    -------
    float or ndarray
        Log-likelihood value(s).
    """
    y = np.asarray(observed, dtype=float)
    mu = np.asarray(predicted, dtype=float)
    s = np.asarray(sigma, dtype=float)
    z = (y - mu) / s
    return -0.5 * (np.log(2.0 * np.pi * s**2) + z**2)

sigma_from_model(log_ratio)

Predict sigma using the fitted exponential variance model.

Requires :meth:fit_variance_model to have been called first.

Parameters:

Name Type Description Default
log_ratio float or ndarray

Log-ratio magnitude(s) for which to predict sigma.

required

Returns:

Type Description
float or ndarray

Predicted sigma value(s).

Raises:

Type Description
RuntimeError

If :meth:fit_variance_model has not been called.

Source code in src/phospho_velocity/model/observation.py
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def sigma_from_model(
    self, log_ratio: Union[float, np.ndarray]
) -> Union[float, np.ndarray]:
    """Predict sigma using the fitted exponential variance model.

    Requires :meth:`fit_variance_model` to have been called first.

    Parameters
    ----------
    log_ratio : float or ndarray
        Log-ratio magnitude(s) for which to predict sigma.

    Returns
    -------
    float or ndarray
        Predicted sigma value(s).

    Raises
    ------
    RuntimeError
        If :meth:`fit_variance_model` has not been called.
    """
    params = self._variance_model_params
    if params is None:
        raise RuntimeError(
            "Variance model not fitted. Call fit_variance_model() first."
        )
    A, B = params
    x = np.log(np.abs(np.asarray(log_ratio, dtype=float)) + 1e-8)
    result = A * np.exp(B * x)
    return float(result) if result.ndim == 0 else result

peptide_log_ratio_uncertainty(localization_prob, min_sigma=0.1, max_sigma=2.0)

Map localization probability to observation uncertainty.

High localization probability → low sigma (reliable measurement). Low localization probability → high sigma (uncertain measurement).

sigma = max_sigma - (max_sigma - min_sigma) * localization_prob

Parameters:

Name Type Description Default
localization_prob float or ndarray

Phosphosite localization probability in [0, 1].

required
min_sigma float

Minimum uncertainty (at localization_prob = 1).

0.1
max_sigma float

Maximum uncertainty (at localization_prob = 0).

2.0

Returns:

Type Description
float or ndarray

Uncertainty sigma value(s).

Source code in src/phospho_velocity/model/observation.py
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def peptide_log_ratio_uncertainty(
    localization_prob: Union[float, np.ndarray],
    min_sigma: float = 0.1,
    max_sigma: float = 2.0,
) -> Union[float, np.ndarray]:
    """Translate localization probability into observation uncertainty.

    Reliable measurements (high localization probability) get a low sigma;
    uncertain measurements (low localization probability) get a high sigma,
    via the linear map:

        sigma = max_sigma - (max_sigma - min_sigma) * localization_prob

    Parameters
    ----------
    localization_prob : float or ndarray
        Phosphosite localization probability in [0, 1].
    min_sigma : float
        Minimum uncertainty (at localization_prob = 1).
    max_sigma : float
        Maximum uncertainty (at localization_prob = 0).

    Returns
    -------
    float or ndarray
        Uncertainty sigma value(s).
    """
    # Clamp to [0, 1] to guard against slightly out-of-range probabilities.
    prob = np.clip(np.asarray(localization_prob, dtype=float), 0.0, 1.0)
    span = max_sigma - min_sigma
    return max_sigma - span * prob

Bayesian Inference (PyMC)

Bayesian GP inference using PyMC.

Implements a fully Bayesian Gaussian Process model for phosphosite trajectories, yielding posterior distributions over the latent trajectory and its derivative (velocity).

BayesianPhosphositeModel

Bayesian GP model for a single phosphosite trajectory.

Uses PyMC's latent GP with an ExpQuad (squared exponential) covariance.

Parameters:

Name Type Description Default
n_samples int

Number of MCMC samples per chain.

500
n_tune int

Number of tuning steps.

500
target_accept float

NUTS target acceptance rate.

0.9
random_seed int

Random seed for reproducibility.

42
network KinaseSubstrateNetwork

If provided, the regularization prior λ_s = 1/(1+|K_s|) is used to set the scale of the HalfNormal prior on the GP signal variance σ_f. Well-connected sites get a tighter prior.

None
site_id str

Site identifier used with network to look up λ_s.

None
Source code in src/phospho_velocity/inference/gp_inference.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
class BayesianPhosphositeModel:
    """Bayesian GP model for a single phosphosite trajectory.

    Uses PyMC's latent GP with an ExpQuad (squared exponential) covariance
    and yields full posterior distributions over the latent trajectory and
    its derivative (velocity).

    Parameters
    ----------
    n_samples : int
        Number of MCMC samples per chain.
    n_tune : int
        Number of tuning steps.
    target_accept : float
        NUTS target acceptance rate.
    random_seed : int
        Random seed for reproducibility.
    network : KinaseSubstrateNetwork, optional
        If provided, the regularization prior ``λ_s = 1/(1+|K_s|)`` is used
        to set the scale of the HalfNormal prior on the GP signal variance
        ``σ_f``.  Well-connected sites get a tighter prior.
    site_id : str, optional
        Site identifier used with *network* to look up ``λ_s``.
    """

    def __init__(
        self,
        n_samples: int = 500,
        n_tune: int = 500,
        target_accept: float = 0.9,
        random_seed: int = 42,
        network=None,
        site_id: Optional[str] = None,
    ) -> None:
        self.n_samples = n_samples
        self.n_tune = n_tune
        self.target_accept = target_accept
        self.random_seed = random_seed
        self.network = network
        self.site_id = site_id
        self._idata = None
        # Populated by fit(); consumed by posterior_predictive() and
        # posterior_velocity().  Initialised here so the attributes always
        # exist even when fit() was never called (or returned early).
        self._obs_times: Optional[np.ndarray] = None
        self._v_obs: Optional[np.ndarray] = None
        self._obs_unc: Optional[np.ndarray] = None

    # ------------------------------------------------------------------
    def fit(
        self,
        times: np.ndarray,
        values: np.ndarray,
        uncertainties: Optional[np.ndarray] = None,
    ):
        """Fit the Bayesian GP model via NUTS.

        Parameters
        ----------
        times : array-like, shape (n,)
            Observed time points.
        values : array-like, shape (n,)
            Observed log2 intensities (may contain NaN).
        uncertainties : array-like, shape (n,), optional
            Per-observation sigma.  Defaults to 0.5 for all observations.

        Returns
        -------
        az.InferenceData or None
            Posterior samples, or ``None`` when fewer than two finite
            observations remain after masking.
        """
        import pymc as pm  # deferred: PyMC is only required for fitting

        times = np.asarray(times, dtype=float)
        values = np.asarray(values, dtype=float)

        # Keep only rows where both time and value are finite.
        mask = np.isfinite(times) & np.isfinite(values)
        t_obs = times[mask].reshape(-1, 1)  # column vector, as pm.gp expects
        v_obs = values[mask]

        if len(t_obs) < 2:
            logger.warning("Fewer than 2 valid observations; skipping MCMC.")
            return None

        if uncertainties is not None:
            unc = np.asarray(uncertainties, dtype=float)[mask]
        else:
            unc = np.full(len(v_obs), 0.5)

        # Network-informed prior scale for the signal std-dev σ_f.
        sigma_f_scale = self._signal_var_prior_scale()

        with pm.Model():
            length_scale = pm.HalfNormal("length_scale", sigma=50)
            signal_var = pm.HalfNormal("signal_var", sigma=sigma_f_scale)
            noise = pm.HalfNormal("noise", sigma=1)

            # ``signal_var`` is sampled on the std-dev scale and squared into
            # the kernel amplitude here; posterior_velocity() mirrors this
            # with its ``signal_var=sv_i**2`` usage.
            cov = signal_var**2 * pm.gp.cov.ExpQuad(1, ls=length_scale)
            gp = pm.gp.Latent(cov_func=cov)

            f = gp.prior("f", X=t_obs)

            # Heteroscedastic likelihood: global noise term plus the
            # per-observation measurement sigma.
            obs_sigma = noise + unc
            _ = pm.Normal("obs", mu=f, sigma=obs_sigma, observed=v_obs)

            idata = pm.sample(
                draws=self.n_samples,
                tune=self.n_tune,
                target_accept=self.target_accept,
                random_seed=self.random_seed,
                progressbar=False,
                nuts_sampler="numpyro",  # NOTE(review): assumes numpyro is installed — confirm
                cores=8,
            )

        self._idata = idata
        self._obs_times = t_obs.flatten()  # observed times, for interpolation
        self._v_obs = v_obs  # observed values, for posterior_velocity conditioning
        self._obs_unc = unc  # per-obs uncertainties, for posterior_velocity
        return idata

    # ------------------------------------------------------------------
    def _signal_var_prior_scale(self) -> float:
        """Compute the HalfNormal scale for ``σ_f`` from the network prior.

        Returns
        -------
        float
            Prior scale ``2*λ_s + 0.1`` where ``λ_s = 1/(1+|K_s|)``.
            Well-connected sites (small ``λ_s``) get a tight prior (scale ≈ 0.1),
            while orphan / no-network sites (``λ_s = 1``) get a wide prior
            (scale = 2.1).
        """
        if self.network is not None and self.site_id is not None:
            lambda_s = float(self.network.regularization_prior(self.site_id))
        else:
            lambda_s = 1.0  # no network → widest prior (orphan site)
        return 2.0 * lambda_s + 0.1

    # ------------------------------------------------------------------
    def posterior_predictive(
        self,
        idata,
        grid_times: np.ndarray,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Compute mean and std of the posterior predictive at *grid_times*.

        Each posterior draw of ``f`` (defined at the observed time points)
        is linearly interpolated onto *grid_times*; mean and std are taken
        across draws.

        Parameters
        ----------
        idata : az.InferenceData
            Posterior samples from :meth:`fit` (``None`` yields NaN output).
        grid_times : array-like, shape (m,)

        Returns
        -------
        mean : ndarray, shape (m,)
        std : ndarray, shape (m,)
        """
        if idata is None:
            nan = np.full(len(grid_times), np.nan)
            return nan, nan

        # Posterior f samples: (chains, draws, n_obs) -> (chains*draws, n_obs)
        f_samples = idata.posterior["f"].values
        f_flat = f_samples.reshape(-1, f_samples.shape[-1])

        # Use the actual observed time points (stored during fit) to place
        # the draws on the time axis.
        obs_times = getattr(self, "_obs_times", None)
        if obs_times is None or len(obs_times) != f_flat.shape[-1]:
            # Fallback: cannot map draws to times correctly.
            nan = np.full(len(grid_times), np.nan)
            return nan, nan

        grid_times = np.asarray(grid_times, dtype=float)
        # Interpolate each posterior sample to grid_times, then summarise.
        interp_samples = np.stack(
            [np.interp(grid_times, obs_times, row) for row in f_flat], axis=0
        )
        mean_interp = interp_samples.mean(axis=0)
        std_interp = interp_samples.std(axis=0)
        return mean_interp, std_interp

    # ------------------------------------------------------------------
    def posterior_velocity(
        self,
        idata,
        grid_times: np.ndarray,
        eps: float = 0.5,
    ) -> Tuple[np.ndarray, np.ndarray]:
        """Velocity posterior via the GP derivative kernel.

        For each posterior draw ``(ℓ_i, σ_f_i, noise_i)``, instantiates a
        :class:`~phospho_velocity.model.gp_derivative.GPDerivative` and
        conditions it on the *observed* values ``v_obs`` (stored during
        :meth:`fit`) using the per-observation sigma ``noise_i + unc``.
        The total velocity uncertainty is computed via the law of total
        variance: ``Var[v] = E[Var[v|θ]] + Var[E[v|θ]]``.

        Falls back to a finite-difference approximation of the posterior
        predictive when fewer than 2 observed time points are available.

        Parameters
        ----------
        idata : az.InferenceData
        grid_times : array-like, shape (m,)
        eps : float
            Finite-difference step used only for the fallback path.

        Returns
        -------
        vel_mean : ndarray, shape (m,)
        vel_std : ndarray, shape (m,)
        """
        grid_times = np.asarray(grid_times, dtype=float)

        if idata is None:
            nan = np.full(len(grid_times), np.nan)
            return nan, nan

        obs_times = getattr(self, "_obs_times", None)
        if obs_times is None or len(obs_times) < 2:
            # Fallback: central finite difference on the posterior predictive.
            # One call per offset grid suffices (previously each grid was
            # evaluated twice — once for the mean, once for the std).
            mean_fwd, std_fwd = self.posterior_predictive(idata, grid_times + eps)
            mean_bwd, std_bwd = self.posterior_predictive(idata, grid_times - eps)
            vel_mean = (mean_fwd - mean_bwd) / (2.0 * eps)
            vel_std = np.sqrt(std_fwd**2 + std_bwd**2) / (2.0 * eps)
            return vel_mean, vel_std

        # Use actual observations (not posterior f draws) for conditioning.
        v_obs = getattr(self, "_v_obs", None)
        if v_obs is None or len(v_obs) != len(obs_times):
            nan = np.full(len(grid_times), np.nan)
            return nan, nan

        # Deferred import: only the derivative-kernel path needs it, so the
        # early-exit paths above work without the helper module.
        from ..model.gp_derivative import GPDerivative

        # Extract length_scale, signal_var and noise posterior samples.
        ls_samples = idata.posterior["length_scale"].values.flatten()  # (S,)
        sv_samples = idata.posterior["signal_var"].values.flatten()  # (S,)
        noise_samples = idata.posterior["noise"].values.flatten()  # (S,)

        # Validate the stored per-observation uncertainties once, outside the
        # draw loop (previously re-checked — and re-warned — per draw).
        obs_unc = getattr(self, "_obs_unc", None)
        base_unc: Optional[np.ndarray] = None
        if obs_unc is not None:
            obs_unc_arr = np.asarray(obs_unc, dtype=float)
            if obs_unc_arr.shape[0] == len(obs_times):
                base_unc = obs_unc_arr
            else:
                logger.warning(
                    "Length of stored observation uncertainties (%d) does not "
                    "match number of observation times (%d); falling back to "
                    "homoscedastic noise in posterior_velocity().",
                    obs_unc_arr.shape[0],
                    len(obs_times),
                )

        n_draws = len(ls_samples)
        vel_mean_draws = np.zeros((n_draws, len(grid_times)))
        vel_var_draws = np.zeros((n_draws, len(grid_times)))

        for i in range(n_draws):
            ls_i = float(ls_samples[i])
            sv_i = float(sv_samples[i])
            noise_i = float(noise_samples[i])

            # Same per-observation sigma as in fit(): noise + unc.
            if base_unc is not None:
                sigma_obs_i = noise_i + base_unc
            else:
                sigma_obs_i = np.full(len(obs_times), noise_i)

            # signal_var is a std-dev draw; GPDerivative takes a variance,
            # hence sv_i**2.  Tiny noise_var: noise enters via sigma_obs_i.
            gpd = GPDerivative(
                length_scale=ls_i,
                signal_var=sv_i**2,
                noise_var=1e-6,
            )
            v_mean_i, v_std_i = gpd.fit_and_predict_velocity(
                obs_times, v_obs, sigma_obs_i, grid_times
            )
            vel_mean_draws[i] = v_mean_i
            vel_var_draws[i] = v_std_i**2

        # Law of total variance: Var[v] = E[Var[v|θ]] + Var[E[v|θ]]
        vel_mean = vel_mean_draws.mean(axis=0)
        vel_std = np.sqrt(vel_var_draws.mean(axis=0) + vel_mean_draws.var(axis=0))
        return vel_mean, vel_std

fit(times, values, uncertainties=None)

Fit the Bayesian GP model via NUTS.

Parameters:

Name Type Description Default
times (array-like, shape (n,))

Observed time points.

required
values (array-like, shape (n,))

Observed log2 intensities (may contain NaN).

required
uncertainties (array-like, shape (n,), optional)

Per-observation sigma. Defaults to 0.5 for all observations.

None

Returns:

Type Description
InferenceData

Posterior samples.

Source code in src/phospho_velocity/inference/gp_inference.py
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
def fit(
    self,
    times: np.ndarray,
    values: np.ndarray,
    uncertainties: Optional[np.ndarray] = None,
):
    """Fit the Bayesian GP model via NUTS.

    Parameters
    ----------
    times : array-like, shape (n,)
        Observed time points.
    values : array-like, shape (n,)
        Observed log2 intensities (may contain NaN).
    uncertainties : array-like, shape (n,), optional
        Per-observation sigma.  Defaults to 0.5 for all observations.

    Returns
    -------
    az.InferenceData or None
        Posterior samples, or ``None`` when fewer than two finite
        observations remain after masking.
    """
    # Deferred import: PyMC is only required when actually fitting.
    import pymc as pm

    times = np.asarray(times, dtype=float)
    values = np.asarray(values, dtype=float)

    # Mask non-finite values (keep rows where time AND value are finite)
    mask = np.isfinite(times) & np.isfinite(values)
    t_obs = times[mask].reshape(-1, 1)  # column vector, as pm.gp expects
    v_obs = values[mask]

    if len(t_obs) < 2:
        logger.warning("Fewer than 2 valid observations; skipping MCMC.")
        return None

    if uncertainties is not None:
        unc = np.asarray(uncertainties, dtype=float)[mask]
    else:
        unc = np.full(len(v_obs), 0.5)

    # Compute network-based prior scale for signal variance
    sigma_f_scale = self._signal_var_prior_scale()

    with pm.Model():
        length_scale = pm.HalfNormal("length_scale", sigma=50)
        signal_var = pm.HalfNormal("signal_var", sigma=sigma_f_scale)
        noise = pm.HalfNormal("noise", sigma=1)

        # signal_var is sampled on the std-dev scale and squared into the
        # kernel amplitude here.
        cov = signal_var**2 * pm.gp.cov.ExpQuad(1, ls=length_scale)
        gp = pm.gp.Latent(cov_func=cov)

        f = gp.prior("f", X=t_obs)

        # Heteroscedastic likelihood: global noise plus per-observation sigma.
        obs_sigma = noise + unc
        _ = pm.Normal("obs", mu=f, sigma=obs_sigma, observed=v_obs)

        idata = pm.sample(
            draws=self.n_samples,
            tune=self.n_tune,
            target_accept=self.target_accept,
            random_seed=self.random_seed,
            progressbar=False,
            nuts_sampler="numpyro",  # NOTE(review): assumes numpyro is installed — confirm
            cores=8,
        )

    self._idata = idata
    self._obs_times = t_obs.flatten()  # store observed times for interpolation
    self._v_obs = v_obs  # store observed values for posterior_velocity conditioning
    self._obs_unc = unc  # store per-obs uncertainties for posterior_velocity
    return idata

posterior_predictive(idata, grid_times)

Compute mean and std of posterior predictive at grid_times.

Parameters:

Name Type Description Default
idata InferenceData
required
grid_times (array-like, shape (m,))
required

Returns:

Name Type Description
mean (ndarray, shape(m))
std (ndarray, shape(m))
Source code in src/phospho_velocity/inference/gp_inference.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
def posterior_predictive(
    self,
    idata,
    grid_times: np.ndarray,
) -> Tuple[np.ndarray, np.ndarray]:
    """Summarise the latent-GP posterior on an arbitrary time grid.

    Each posterior draw of ``f`` (defined at the observed time points) is
    linearly interpolated onto *grid_times*; the mean and standard
    deviation across draws are returned per grid point.

    Parameters
    ----------
    idata : az.InferenceData
        Posterior samples from ``fit`` (``None`` yields NaN output).
    grid_times : array-like, shape (m,)

    Returns
    -------
    mean : ndarray, shape (m,)
    std : ndarray, shape (m,)
    """
    def _nan_result():
        filler = np.full(len(grid_times), np.nan)
        return filler, filler

    if idata is None:
        return _nan_result()

    # Posterior draws of f: (chains, draws, n_obs) -> (chains*draws, n_obs)
    raw_draws = idata.posterior["f"].values
    draws_2d = raw_draws.reshape(-1, raw_draws.shape[-1])

    # The draws live at the observed time points stored during fit().
    anchor_times = getattr(self, "_obs_times", None)
    if anchor_times is None or len(anchor_times) != draws_2d.shape[-1]:
        # Without a matching time axis the draws cannot be placed correctly.
        return _nan_result()

    grid = np.asarray(grid_times, dtype=float)
    # Interpolate every draw onto the grid, then summarise across draws.
    on_grid = np.stack(
        [np.interp(grid, anchor_times, one_draw) for one_draw in draws_2d],
        axis=0,
    )
    return on_grid.mean(axis=0), on_grid.std(axis=0)

posterior_velocity(idata, grid_times, eps=0.5)

Velocity posterior via the GP derivative kernel.

For each posterior draw (ℓ_i, σ_f_i, noise_i), instantiates a GPDerivative (from phospho_velocity.model.gp_derivative) and conditions it on the observed values v_obs (stored during fit()) using the per-observation sigma noise_i + unc. The total velocity uncertainty is computed via the law of total variance: Var[v] = E[Var[v|θ]] + Var[E[v|θ]].

Falls back to a finite-difference approximation of the posterior predictive when fewer than 2 observed time points are available.

Parameters:

Name Type Description Default
idata InferenceData
required
grid_times (array-like, shape (m,))
required
eps float

Finite-difference step used only for the fallback path.

0.5

Returns:

Name Type Description
vel_mean (ndarray, shape(m))
vel_std (ndarray, shape(m))
Source code in src/phospho_velocity/inference/gp_inference.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def posterior_velocity(
    self,
    idata,
    grid_times: np.ndarray,
    eps: float = 0.5,
) -> Tuple[np.ndarray, np.ndarray]:
    """Velocity posterior via the GP derivative kernel.

    For each posterior draw ``(ℓ_i, σ_f_i, noise_i)``, instantiates a
    :class:`~phospho_velocity.model.gp_derivative.GPDerivative` and
    conditions it on the *observed* values ``v_obs`` (stored during
    :meth:`fit`) using the per-observation sigma ``noise_i + unc``.
    The total velocity uncertainty is computed via the law of total
    variance: ``Var[v] = E[Var[v|θ]] + Var[E[v|θ]]``.

    Falls back to a finite-difference approximation of the posterior
    predictive when fewer than 2 observed time points are available.

    Parameters
    ----------
    idata : az.InferenceData
    grid_times : array-like, shape (m,)
    eps : float
        Finite-difference step used only for the fallback path.

    Returns
    -------
    vel_mean : ndarray, shape (m,)
    vel_std : ndarray, shape (m,)
    """
    grid_times = np.asarray(grid_times, dtype=float)

    if idata is None:
        nan = np.full(len(grid_times), np.nan)
        return nan, nan

    obs_times = getattr(self, "_obs_times", None)
    if obs_times is None or len(obs_times) < 2:
        # Fallback: central finite difference on the posterior predictive.
        # One call per offset grid suffices (previously each grid was
        # evaluated twice — once for the mean, once for the std).
        mean_fwd, std_fwd = self.posterior_predictive(idata, grid_times + eps)
        mean_bwd, std_bwd = self.posterior_predictive(idata, grid_times - eps)
        vel_mean = (mean_fwd - mean_bwd) / (2.0 * eps)
        vel_std = np.sqrt(std_fwd**2 + std_bwd**2) / (2.0 * eps)
        return vel_mean, vel_std

    # Use actual observations (not posterior f draws) for conditioning.
    v_obs = getattr(self, "_v_obs", None)
    if v_obs is None or len(v_obs) != len(obs_times):
        nan = np.full(len(grid_times), np.nan)
        return nan, nan

    # Deferred import: only the derivative-kernel path needs it, so the
    # early-exit paths above work without the helper module.
    from ..model.gp_derivative import GPDerivative

    # Extract length_scale, signal_var and noise posterior samples.
    ls_samples = idata.posterior["length_scale"].values.flatten()  # (S,)
    sv_samples = idata.posterior["signal_var"].values.flatten()  # (S,)
    noise_samples = idata.posterior["noise"].values.flatten()  # (S,)

    # Validate the stored per-observation uncertainties once, outside the
    # draw loop (previously re-checked — and re-warned — on every draw).
    obs_unc = getattr(self, "_obs_unc", None)
    base_unc: Optional[np.ndarray] = None
    if obs_unc is not None:
        obs_unc_arr = np.asarray(obs_unc, dtype=float)
        if obs_unc_arr.shape[0] == len(obs_times):
            base_unc = obs_unc_arr
        else:
            logger.warning(
                "Length of stored observation uncertainties (%d) does not "
                "match number of observation times (%d); falling back to "
                "homoscedastic noise in posterior_velocity().",
                obs_unc_arr.shape[0],
                len(obs_times),
            )

    n_draws = len(ls_samples)
    vel_mean_draws = np.zeros((n_draws, len(grid_times)))
    vel_var_draws = np.zeros((n_draws, len(grid_times)))

    for i in range(n_draws):
        ls_i = float(ls_samples[i])
        sv_i = float(sv_samples[i])
        noise_i = float(noise_samples[i])

        # Same per-observation sigma as in fit(): noise + unc.
        if base_unc is not None:
            sigma_obs_i = noise_i + base_unc
        else:
            sigma_obs_i = np.full(len(obs_times), noise_i)

        # signal_var is a std-dev draw; GPDerivative takes a variance,
        # hence sv_i**2.  Tiny noise_var: noise enters via sigma_obs_i.
        gpd = GPDerivative(
            length_scale=ls_i,
            signal_var=sv_i**2,
            noise_var=1e-6,
        )
        v_mean_i, v_std_i = gpd.fit_and_predict_velocity(
            obs_times, v_obs, sigma_obs_i, grid_times
        )
        vel_mean_draws[i] = v_mean_i
        vel_var_draws[i] = v_std_i**2

    # Law of total variance: Var[v] = E[Var[v|θ]] + Var[E[v|θ]]
    vel_mean = vel_mean_draws.mean(axis=0)
    vel_std = np.sqrt(vel_var_draws.mean(axis=0) + vel_mean_draws.var(axis=0))
    return vel_mean, vel_std

HierarchicalBayesianPhosphositeModel

Hierarchical GP model for processing batches of phosphosites.

Source code in src/phospho_velocity/inference/gp_inference.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
class HierarchicalBayesianPhosphositeModel:
    """Hierarchical GP model for processing batches of phosphosites.

    Sites in a batch share hyperpriors on the (log) length-scale, so sparse
    trajectories borrow statistical strength from well-sampled ones.
    """

    def __init__(self, n_samples: int = 200, n_tune: int = 200):
        # MCMC draws and tuning steps per chain.
        self.n_samples = n_samples
        self.n_tune = n_tune

    def fit_and_predict_batch(
        self,
        df_batch: pd.DataFrame,
        grid_times: np.ndarray,
        group_col: str = "_group_id",
        time_col: str = "time_min",
        value_col: str = "log2_intensity_norm",
        uncert_col: str = "sigma",
    ) -> Dict[
        str,
        Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray],
    ]:
        """Fit the hierarchical model and compute posterior velocity directly.

        Parameters
        ----------
        df_batch : pd.DataFrame
            Long-format observations for one batch of phosphosites.
        grid_times : array-like, shape (m,)
            Time grid on which trajectory and velocity are evaluated.
        group_col, time_col, value_col, uncert_col : str
            Column names for site id, time, value and observation sigma.

        Returns
        -------
        dict
            Maps each group id to
            ``(vel_mean, vel_std, vel_q5, vel_q95, traj_mean, traj_std)``.
        """
        groups = df_batch[group_col].unique()
        n_groups = len(groups)
        group_idx_map = {grp: i for i, grp in enumerate(groups)}
        group_indices = df_batch[group_col].map(group_idx_map).values

        times = df_batch[time_col].values
        values = df_batch[value_col].values
        grid_times = np.asarray(grid_times, dtype=float)

        # Incorporate observation uncertainties if available.
        if uncert_col in df_batch.columns:
            obs_noise = df_batch[uncert_col].values
        else:
            obs_noise = np.full(len(values), 0.1)

        with pm.Model():
            # --- GLOBAL HYPERPRIORS ---
            # Length-scale is modelled on the log scale, centred at 30 min.
            ls_mu = pm.Normal("ls_mu", mu=np.log(30.0), sigma=0.5)
            ls_sigma = pm.HalfNormal("ls_sigma", sigma=0.5)

            # --- SITE-SPECIFIC PARAMETERS (non-centred parameterisation) ---
            ls_offset = pm.Normal("ls_offset", mu=0.0, sigma=1.0, shape=n_groups)
            ls_site = pm.Deterministic(
                "ls_site", pm.math.exp(ls_mu + ls_offset * ls_sigma)
            )
            eta = pm.HalfNormal("eta", sigma=2.0, shape=n_groups)

            # --- BUILD GPS ---
            gps = []
            for i in range(n_groups):
                idx_mask = group_indices == i
                t_i = times[idx_mask][:, None]
                v_i = values[idx_mask]
                noise_i = obs_noise[idx_mask]

                cov = (eta[i] ** 2) * pm.gp.cov.Matern52(1, ls=ls_site[i])
                gp = pm.gp.Marginal(cov_func=cov)
                gp.marginal_likelihood(f"y_{i}", X=t_i, y=v_i, sigma=noise_i)
                gps.append(gp)

            # --- MCMC SAMPLING ---
            logger.info(f"Sampling Hierarchical Model for {n_groups} trajectories...")
            idata = pm.sample(
                draws=self.n_samples,
                tune=self.n_tune,
                target_accept=0.9,
                progressbar=False,
                nuts_sampler="numpyro",
                cores=8,
            )

            # --- POSTERIOR VELOCITY / TRAJECTORY PREDICTION ---
            # Velocity is approximated with a central finite difference of
            # the latent GP evaluated slightly forward/backward of each
            # grid point.
            eps = 0.5
            grid_fwd = (grid_times + eps)[:, None]
            grid_bwd = (grid_times - eps)[:, None]
            grid_mid = grid_times[:, None]

            results = {}
            for i, grp in enumerate(groups):
                # The conditional (latent) GP variables must be added to the
                # model before sample_posterior_predictive can look them up
                # by name.  (Previously these calls were commented out, so
                # the var_names lookup below failed.)
                gps[i].conditional(f"fwd_{i}", Xnew=grid_fwd)
                gps[i].conditional(f"bwd_{i}", Xnew=grid_bwd)
                gps[i].conditional(f"mu_{i}", Xnew=grid_mid)

                # Single posterior-predictive pass per group for all three
                # grids (the previous version sampled fwd/bwd twice).
                post_pred = pm.sample_posterior_predictive(
                    idata,
                    var_names=[f"fwd_{i}", f"bwd_{i}", f"mu_{i}"],
                    progressbar=False,
                )

                # Flatten (chains, draws, m) -> (chains*draws, m).
                fwd_samples = post_pred.posterior_predictive[f"fwd_{i}"].values.reshape(
                    -1, len(grid_times)
                )
                bwd_samples = post_pred.posterior_predictive[f"bwd_{i}"].values.reshape(
                    -1, len(grid_times)
                )
                latent_samples = post_pred.posterior_predictive[
                    f"mu_{i}"
                ].values.reshape(-1, len(grid_times))

                # Central finite difference across all posterior samples.
                vel_samples = (fwd_samples - bwd_samples) / (2.0 * eps)

                vel_mean = np.mean(vel_samples, axis=0)
                vel_std = np.std(vel_samples, axis=0)
                vel_q5 = np.percentile(vel_samples, 5, axis=0)
                vel_q95 = np.percentile(vel_samples, 95, axis=0)
                traj_mean = np.mean(latent_samples, axis=0)
                traj_std = np.std(latent_samples, axis=0)

                results[grp] = (vel_mean, vel_std, vel_q5, vel_q95, traj_mean, traj_std)

        return results

fit_and_predict_batch(df_batch, grid_times, group_col='_group_id', time_col='time_min', value_col='log2_intensity_norm', uncert_col='sigma')

Fits the hierarchical model and computes posterior velocity directly. Returns a dictionary mapping group_id -> (vel_mean, vel_std, vel_q5, vel_q95, traj_mean, traj_std)

Source code in src/phospho_velocity/inference/gp_inference.py
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
def fit_and_predict_batch(
    self,
    df_batch: pd.DataFrame,
    grid_times: np.ndarray,
    group_col: str = "_group_id",
    time_col: str = "time_min",
    value_col: str = "log2_intensity_norm",
    uncert_col: str = "sigma",
) -> Dict[
    str,
    Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray],
]:
    """Fit the hierarchical GP model and compute posterior velocities.

    One PyMC model is built for the whole batch: the GP length-scale is
    partially pooled across trajectories via global hyperpriors, while
    each trajectory keeps its own amplitude ``eta``.  Velocity is a
    central finite difference of the latent GP sampled at
    ``grid_times +/- eps`` across all posterior draws.

    Parameters
    ----------
    df_batch : pd.DataFrame
        Long-format observations for every group in the batch.
    grid_times : np.ndarray
        Evaluation time grid (minutes).
    group_col, time_col, value_col, uncert_col : str
        Column names in *df_batch*.

    Returns
    -------
    dict
        Maps group_id -> (vel_mean, vel_std, vel_q5, vel_q95,
        traj_mean, traj_std); each entry has ``len(grid_times)`` values.
    """
    groups = df_batch[group_col].unique()
    n_groups = len(groups)
    group_idx_map = {grp: i for i, grp in enumerate(groups)}
    group_indices = df_batch[group_col].map(group_idx_map).values

    times = df_batch[time_col].values
    values = df_batch[value_col].values

    # Incorporate observation uncertainties if available; otherwise fall
    # back to a constant, weakly-informative noise level.
    if uncert_col in df_batch.columns:
        obs_noise = df_batch[uncert_col].values
    else:
        obs_noise = np.full(len(values), 0.1)

    with pm.Model():
        # --- GLOBAL HYPERPRIORS (shared across the batch) ---
        ls_mu = pm.Normal("ls_mu", mu=np.log(30.0), sigma=0.5)
        ls_sigma = pm.HalfNormal("ls_sigma", sigma=0.5)

        # --- SITE-SPECIFIC PARAMETERS (non-centered parameterization) ---
        ls_offset = pm.Normal("ls_offset", mu=0.0, sigma=1.0, shape=n_groups)
        ls_site = pm.Deterministic(
            "ls_site", pm.math.exp(ls_mu + ls_offset * ls_sigma)
        )
        eta = pm.HalfNormal("eta", sigma=2.0, shape=n_groups)

        # --- BUILD GPS (one marginal GP per trajectory) ---
        gps = []
        for i in range(n_groups):
            idx_mask = group_indices == i
            t_i = times[idx_mask][:, None]
            v_i = values[idx_mask]
            noise_i = obs_noise[idx_mask]

            cov = (eta[i] ** 2) * pm.gp.cov.Matern52(1, ls=ls_site[i])
            gp = pm.gp.Marginal(cov_func=cov)
            gp.marginal_likelihood(f"y_{i}", X=t_i, y=v_i, sigma=noise_i)
            gps.append(gp)

        # --- MCMC SAMPLING ---
        logger.info(f"Sampling Hierarchical Model for {n_groups} trajectories...")
        idata = pm.sample(
            draws=self.n_samples,
            tune=self.n_tune,
            target_accept=0.9,
            progressbar=False,
            nuts_sampler="numpyro",
            cores=8,
        )

        # --- POSTERIOR VELOCITY + TRAJECTORY PREDICTION ---
        eps = 0.5  # finite-difference half-width (minutes)
        grid_fwd = (grid_times + eps)[:, None]
        grid_bwd = (grid_times - eps)[:, None]

        results = {}
        for i, grp in enumerate(groups):
            # The conditional variables must be registered in the model
            # BEFORE sample_posterior_predictive can draw them.  (A previous
            # revision had these commented out, which made the var_names
            # lookup below fail at runtime.)
            gps[i].conditional(f"fwd_{i}", Xnew=grid_fwd)
            gps[i].conditional(f"bwd_{i}", Xnew=grid_bwd)
            gps[i].conditional(f"mu_{i}", Xnew=grid_times[:, None])

            # Single posterior-predictive pass per group; sampling fwd/bwd
            # twice (as before) doubled the cost for no benefit.
            post_pred = pm.sample_posterior_predictive(
                idata,
                var_names=[f"fwd_{i}", f"bwd_{i}", f"mu_{i}"],
                progressbar=False,
            )

            # Flatten (chain, draw) into a single sample axis.
            fwd_samples = post_pred.posterior_predictive[f"fwd_{i}"].values.reshape(
                -1, len(grid_times)
            )
            bwd_samples = post_pred.posterior_predictive[f"bwd_{i}"].values.reshape(
                -1, len(grid_times)
            )
            latent_samples = post_pred.posterior_predictive[
                f"mu_{i}"
            ].values.reshape(-1, len(grid_times))

            # Finite difference across all posterior samples simultaneously.
            vel_samples = (fwd_samples - bwd_samples) / (2.0 * eps)

            vel_mean = np.mean(vel_samples, axis=0)
            vel_std = np.std(vel_samples, axis=0)
            vel_q5 = np.percentile(vel_samples, 5, axis=0)
            vel_q95 = np.percentile(vel_samples, 95, axis=0)
            traj_mean = np.mean(latent_samples, axis=0)
            traj_std = np.std(latent_samples, axis=0)

            results[grp] = (vel_mean, vel_std, vel_q5, vel_q95, traj_mean, traj_std)

    return results

Velocity Computation

Higher-level wrappers for estimating velocities across full datasets.

Velocity computation for phosphosite time courses.

Provides GP-derivative velocity estimators operating on tidy long-format DataFrames of phosphosite time-course data. Velocity is derived analytically from the joint Gaussian Process over (f, df/dt) using the Matérn 5/2 derivative kernel — no finite-difference approximation.

VelocityEstimator

Estimate phosphosite velocity from time-course data via GP derivative.

Velocity is computed analytically as the posterior mean of the GP derivative, conditioned on the observations, using the Matérn 5/2 derivative kernel.

Parameters:

Name Type Description Default
grid_times array - like

Dense time grid for GP evaluation. Defaults to np.linspace(0, 60, 120).

None
length_scale float

GP length-scale parameter passed to :class:GPDerivative. Defaults to 30.0 minutes.

30.0
signal_var float

GP signal variance passed to :class:GPDerivative. Defaults to 1.0.

1.0
network KinaseSubstrateNetwork

If provided, used to derive per-site GP hyperparameters via :meth:_site_hyperparams.

None
prior_strength float

Scaling factor that controls how strongly the network connectivity modulates the per-site hyperparameters. Defaults to 1.0.

1.0
Source code in src/phospho_velocity/velocity/compute.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
class VelocityEstimator:
    """Estimate phosphosite velocity from time-course data via GP derivative.

    Velocity is computed analytically as the posterior mean of the GP
    derivative, conditioned on the observations, using the Matérn 5/2
    derivative kernel.

    Parameters
    ----------
    grid_times : array-like, optional
        Dense time grid for GP evaluation.  Defaults to
        ``np.linspace(0, 60, 120)``.
    length_scale : float, optional
        GP length-scale parameter passed to :class:`GPDerivative`.
        Defaults to 30.0 minutes.
    signal_var : float, optional
        GP signal variance passed to :class:`GPDerivative`.  Defaults to 1.0.
    network : KinaseSubstrateNetwork, optional
        If provided, used to derive per-site GP hyperparameters via
        :meth:`_site_hyperparams`.
    prior_strength : float, optional
        Scaling factor that controls how strongly the network connectivity
        modulates the per-site hyperparameters.  Defaults to 1.0.
    """

    def __init__(
        self,
        grid_times: Optional[np.ndarray] = None,
        length_scale: float = 30.0,
        signal_var: float = 1.0,
        network: Optional["KinaseSubstrateNetwork"] = None,
        prior_strength: float = 1.0,
    ) -> None:
        # Fall back to the module-level default grid when none is supplied.
        self.grid_times = (
            np.asarray(grid_times, dtype=float)
            if grid_times is not None
            else _DEFAULT_GRID
        )
        # "base" hyperparameters: _site_hyperparams may modulate them
        # per site when a network is available.
        self.base_length_scale = length_scale
        self.base_signal_var = signal_var
        self.network = network
        self.prior_strength = prior_strength

    # Backward-compatible aliases
    @property
    def length_scale(self) -> float:
        """Alias for :attr:`base_length_scale` (backward compatibility)."""
        return self.base_length_scale

    @property
    def signal_var(self) -> float:
        """Alias for :attr:`base_signal_var` (backward compatibility)."""
        return self.base_signal_var

    # ------------------------------------------------------------------
    def _site_hyperparams(self, site_id: str) -> tuple:
        """Return (length_scale, signal_var) for this site, possibly network-informed.

        Parameters
        ----------
        site_id : str
            Identifier of the phosphosite.

        Returns
        -------
        tuple of (float, float)
            ``(length_scale, signal_var)`` to use for this site's GP fit.
            ``lambda_s`` is the value returned by
            :meth:`~phospho_velocity.network.kinase_substrate.KinaseSubstrateNetwork.regularization_prior`,
            a float in (0, 1] where small values indicate well-connected sites
            and values near 1 indicate orphan sites.
        """
        # Without a network there is nothing to modulate — use the bases.
        if self.network is None:
            return self.base_length_scale, self.base_signal_var

        try:
            lambda_s = float(self.network.regularization_prior(site_id))
        except (AttributeError, TypeError, ValueError):
            lambda_s = 0.5  # fallback if network object is malformed

        # Well-connected sites (small lambda_s) → stronger regularization → smaller signal_var
        # Orphan sites (lambda_s ~ 1) → weaker regularization → larger signal_var
        # scale increases monotonically with lambda_s so orphans get larger signal_var
        scale = 1.0 + self.prior_strength * (2.0 * lambda_s - 1.0)
        signal_var = max(self.base_signal_var * scale, 1e-4)  # floor keeps variance positive

        # More connected → slightly shorter length-scale
        ls_scale = 1.0 - 0.3 * self.prior_strength * (1.0 - lambda_s)
        length_scale = max(self.base_length_scale * ls_scale, 1.0)  # floor at 1 minute

        return length_scale, signal_var

    # ------------------------------------------------------------------
    def estimate_site_velocity(
        self,
        times: np.ndarray,
        values: np.ndarray,
        uncertainties: Optional[np.ndarray] = None,
    ) -> Dict[str, np.ndarray]:
        """Estimate velocity for a single (site, cell_line) pair.

        Note: this path uses the *base* hyperparameters (no site id is
        available here); network-informed hyperparameters are applied in
        :meth:`estimate_all_sites`.

        Parameters
        ----------
        times : array-like, shape (n,)
        values : array-like, shape (n,)
        uncertainties : array-like, shape (n,), optional

        Returns
        -------
        dict with keys:
            ``velocity``, ``velocity_std``, ``times``,
            ``trajectory_mean``, ``trajectory_std``
        """
        times = np.asarray(times, dtype=float)
        values = np.asarray(values, dtype=float)

        eval_times = self.grid_times

        # Filter valid observations
        mask = np.isfinite(times) & np.isfinite(values)
        t_clean = times[mask]
        v_clean = values[mask]

        # Fewer than two finite points: a GP fit is underdetermined, so
        # return all-NaN curves on the evaluation grid.
        if len(t_clean) < 2:
            nan = np.full(len(eval_times), np.nan)
            return {
                "velocity": nan,
                "velocity_std": nan,
                "times": eval_times,
                "trajectory_mean": nan,
                "trajectory_std": nan,
            }

        # Determine per-observation sigma for the derivative kernel
        # (non-finite entries are replaced by the default 0.5).
        if uncertainties is not None:
            u_clean = np.asarray(uncertainties, dtype=float)[mask]
            u_clean = np.where(np.isfinite(u_clean), u_clean, 0.5)
        else:
            u_clean = np.full(len(t_clean), 0.5)

        gpd = GPDerivative(length_scale=self.base_length_scale, signal_var=self.base_signal_var, noise_var=1e-6)
        vel, vel_std = gpd.fit_and_predict_velocity(
            t_clean, v_clean, u_clean, eval_times
        )

        # Also produce trajectory prediction via the standard GP surrogate
        gp = PhosphositeGP()
        gp.fit(
            t_clean,
            v_clean,
            uncertainties=u_clean if uncertainties is not None else None,
        )
        traj_mean, traj_std = gp.predict(eval_times)

        return {
            "velocity": vel,
            "velocity_std": vel_std,
            "times": eval_times,
            "trajectory_mean": traj_mean,
            "trajectory_std": traj_std,
        }

    # ------------------------------------------------------------------
    def estimate_all_sites(
            self,
            df: pd.DataFrame,
            value_col: str = "log2_intensity_norm",
            time_col: str = "time_min",
            site_col: str = "site_id",
            cell_line_col: str = "cell_line",
            uncertainty_col: Optional[str] = "sigma",
            n_jobs: int = -1,
    ) -> pd.DataFrame:
        """Estimate velocity for every (site, cell_line) group in *df*.

        GP surrogates are fitted in parallel via ``fit_parallel``; then the
        velocity for each group is computed with network-informed
        hyperparameters (see :meth:`_site_hyperparams`).

        Parameters
        ----------
        df : pd.DataFrame
            Tidy long-format time-course data.
        value_col, time_col, site_col, cell_line_col : str
            Column names in *df*.
        uncertainty_col : str, optional
            Column holding per-observation sigma; ignored when absent.
        n_jobs : int
            Parallelism passed to ``fit_parallel`` (-1 presumably means
            all cores — confirm against fit_parallel's docs).

        Returns
        -------
        pd.DataFrame
            One row per (site, cell_line, grid time) with columns
            ``velocity``, ``velocity_std``, ``trajectory_mean``,
            ``trajectory_std``.
        """
        from ..model.gp import fit_parallel  # module-level parallel worker

        groups = list(df.groupby([site_col, cell_line_col]))
        n_groups = len(groups)
        logger.info("Estimating velocity for %d (site, cell_line) groups", n_groups)

        # ── collect per-group arrays ──────────────────────────────────────
        keys_list = [key for key, _ in groups]
        grp_list = [grp for _, grp in groups]

        times_list = [g[time_col].values for g in grp_list]
        values_list = [g[value_col].values for g in grp_list]
        uncert_list = [
            g[uncertainty_col].values
            if (uncertainty_col and uncertainty_col in g.columns)
            else None
            for g in grp_list
        ]

        # ── build one PhosphositeGP per group with per-site length-scale, fit in parallel ─
        instances = [
            PhosphositeGP(length_scale=self._site_hyperparams(str(key[0]))[0])
            for key in keys_list
        ]
        fit_parallel(
            instances, times_list, values_list, uncert_list,
            n_jobs=n_jobs,
            desc="Estimating velocity",
        )

        # ── compute velocity + trajectory for each fitted GP ─────────────
        records: List[dict] = []
        for (site_id, cell_line), inst, times, values, uncertainties in zip(
                keys_list, instances, times_list, values_list, uncert_list
        ):
            eval_times = self.grid_times

            # Groups whose GP fit failed produce all-NaN rows rather than
            # being dropped, so the output keeps a row per group/time.
            if not inst._is_fitted:
                nan = np.full(len(eval_times), np.nan)
                result = {
                    "velocity": nan, "velocity_std": nan,
                    "times": eval_times,
                    "trajectory_mean": nan, "trajectory_std": nan,
                }
            else:
                # mask + sigma — same logic as estimate_site_velocity
                mask = np.isfinite(times) & np.isfinite(values)
                t_clean = times[mask]
                v_clean = values[mask]
                if uncertainties is not None:
                    u_clean = np.asarray(uncertainties, dtype=float)[mask]
                    u_clean = np.where(np.isfinite(u_clean), u_clean, 0.5)
                else:
                    u_clean = np.full(len(t_clean), 0.5)

                # NEW: network-informed hyperparameters for this site
                length_scale, signal_var = self._site_hyperparams(str(site_id))

                gpd = GPDerivative(
                    length_scale=length_scale,
                    signal_var=signal_var,
                    noise_var=1e-6,
                )
                vel, vel_std = gpd.fit_and_predict_velocity(
                    t_clean, v_clean, u_clean, eval_times
                )

                # Trajectory prediction from the parallel-fitted GP
                traj_mean, traj_std = inst.predict(eval_times)

                result = {
                    "velocity": vel, "velocity_std": vel_std,
                    "times": eval_times,
                    "trajectory_mean": traj_mean, "trajectory_std": traj_std,
                }

            # Explode the per-group arrays into one tidy record per time point.
            for i, t in enumerate(result["times"]):
                records.append({
                    site_col: site_id,
                    cell_line_col: cell_line,
                    "time_min": t,
                    "velocity": result["velocity"][i],
                    "velocity_std": result["velocity_std"][i],
                    "trajectory_mean": result["trajectory_mean"][i],
                    "trajectory_std": result["trajectory_std"][i],
                })

        return pd.DataFrame(records)

length_scale property

Alias for :attr:base_length_scale (backward compatibility).

signal_var property

Alias for :attr:base_signal_var (backward compatibility).

estimate_site_velocity(times, values, uncertainties=None)

Estimate velocity for a single (site, cell_line) pair.

Parameters:

Name Type Description Default
times (array - like, shape(n))
required
values (array - like, shape(n))
required
uncertainties (array - like, shape(n))
None

Returns:

Type Description
dict with keys:

velocity, velocity_std, times, trajectory_mean, trajectory_std

Source code in src/phospho_velocity/velocity/compute.py
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def estimate_site_velocity(
    self,
    times: np.ndarray,
    values: np.ndarray,
    uncertainties: Optional[np.ndarray] = None,
) -> Dict[str, np.ndarray]:
    """Estimate velocity for a single (site, cell_line) pair.

    Parameters
    ----------
    times : array-like, shape (n,)
    values : array-like, shape (n,)
    uncertainties : array-like, shape (n,), optional

    Returns
    -------
    dict with keys:
        ``velocity``, ``velocity_std``, ``times``,
        ``trajectory_mean``, ``trajectory_std``
    """
    t_arr = np.asarray(times, dtype=float)
    v_arr = np.asarray(values, dtype=float)
    grid = self.grid_times

    # Keep only observations where both time and value are finite.
    finite = np.isfinite(t_arr) & np.isfinite(v_arr)
    t_obs = t_arr[finite]
    v_obs = v_arr[finite]

    # Fewer than two usable points: nothing to fit — all-NaN output.
    if len(t_obs) < 2:
        blank = np.full(len(grid), np.nan)
        return {
            "velocity": blank,
            "velocity_std": blank,
            "times": grid,
            "trajectory_mean": blank,
            "trajectory_std": blank,
        }

    # Per-observation noise for the derivative kernel; 0.5 is the fallback
    # for missing or non-finite sigma values.
    if uncertainties is None:
        sigma_obs = np.full(len(t_obs), 0.5)
    else:
        sigma_obs = np.asarray(uncertainties, dtype=float)[finite]
        sigma_obs = np.where(np.isfinite(sigma_obs), sigma_obs, 0.5)

    # Analytic GP-derivative velocity on the evaluation grid.
    deriv_model = GPDerivative(
        length_scale=self.base_length_scale,
        signal_var=self.base_signal_var,
        noise_var=1e-6,
    )
    vel, vel_std = deriv_model.fit_and_predict_velocity(
        t_obs, v_obs, sigma_obs, grid
    )

    # Trajectory (mean curve) from the standard GP surrogate.
    surrogate = PhosphositeGP()
    surrogate.fit(
        t_obs,
        v_obs,
        uncertainties=sigma_obs if uncertainties is not None else None,
    )
    traj_mean, traj_std = surrogate.predict(grid)

    return {
        "velocity": vel,
        "velocity_std": vel_std,
        "times": grid,
        "trajectory_mean": traj_mean,
        "trajectory_std": traj_std,
    }

run_bayesian_velocity(df, value_col='log2_intensity_norm', time_col='time_min', site_col='site_id', cell_line_col='cell_line', n_samples=1000, n_tune=1000, n_grid=60, batch_size=20, method='hierarchical_bayesian', extend_to=None, network=None)

Run Bayesian GP velocity for all (site, cell_line) groups.

Parameters:

Name Type Description Default
df DataFrame
required
value_col str
'log2_intensity_norm'
time_col str
'time_min'
site_col str
'site_id'
cell_line_col str
'cell_line'
n_samples int
1000
n_tune int
1000
n_grid int

Number of points in the evaluation time grid (default 60).

60
batch_size int

Number of trajectories to process simultaneously (Hierarchical only).

20
method str

'bayesian' (independent GPs) or 'hierarchical_bayesian' (batched partial pooling).

'hierarchical_bayesian'
extend_to float

Extend time grid to this value.

None
network KinaseSubstrateNetwork

If provided, used to set network-informed priors on GP signal variance.

None

Returns:

Type Description
DataFrame

Columns: site_id, cell_line, time_min, velocity_mean, velocity_sd, velocity_q5, velocity_q95; the 'hierarchical_bayesian' method additionally includes trajectory_mean and trajectory_std.

Source code in src/phospho_velocity/velocity/compute.py
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
def run_bayesian_velocity(
    df: pd.DataFrame,
    value_col: str = "log2_intensity_norm",
    time_col: str = "time_min",
    site_col: str = "site_id",
    cell_line_col: str = "cell_line",
    n_samples: int = 1000,
    n_tune: int = 1000,
    n_grid: int = 60,
    batch_size: int = 20,
    method: str = "hierarchical_bayesian",
    extend_to: Optional[float] = None,
    network=None,
) -> pd.DataFrame:
    """Run Bayesian GP velocity for all (site, cell_line) groups.

    Parameters
    ----------
    df : pd.DataFrame
    value_col : str
    time_col : str
    site_col : str
    cell_line_col : str
    n_samples : int
    n_tune : int
    n_grid : int
        Number of points in the evaluation time grid (default 60).
    batch_size : int
        Number of trajectories to process simultaneously (Hierarchical only).
    method : str
        'bayesian' (independent GPs) or 'hierarchical_bayesian' (batched partial pooling).
    extend_to : float, optional
        Extend time grid to this value.
    network : KinaseSubstrateNetwork, optional
        If provided, used to set network-informed priors on GP signal variance.

    Returns
    -------
    pd.DataFrame
        Columns: site_id, cell_line, time_min, velocity_mean, velocity_sd,
        velocity_q5, velocity_q95.  With ``method='hierarchical_bayesian'``
        the rows additionally carry trajectory_mean and trajectory_std.

    Raises
    ------
    ValueError
        If the time column yields a degenerate grid (NaN or zero-width),
        or *method* is not one of the two supported values.
    """
    from ..inference.gp_inference import BayesianPhosphositeModel
    from ..inference.gp_inference import HierarchicalBayesianPhosphositeModel

    # Build a shared evaluation grid; fail loudly on degenerate time data.
    t_min = df[time_col].dropna().min()
    t_max = extend_to if extend_to is not None else df[time_col].dropna().max()
    if np.isnan(t_min) or np.isnan(t_max) or t_min == t_max:
        raise ValueError(
            f"Cannot build time grid from column '{time_col}': min={t_min}, max={t_max}"
        )

    grid_times = np.linspace(t_min, t_max, n_grid)
    records: List[dict] = []

    # =================================================================
    # METHOD 1: Independent Bayesian GPs (One-by-one)
    # =================================================================
    if method == "bayesian":
        logger.info(
            f"Starting Independent Bayesian inference on {len(df.groupby([site_col, cell_line_col]))} groups..."
        )

        # Fits independent Bayesian GPs per site-cell group; appends velocity posteriors to records
        for (site_id, cell_line), grp in df.groupby([site_col, cell_line_col]):
            times = grp[time_col].values
            values = grp[value_col].values

            # Incorporate uncertainties if the column exists
            if "sigma" in grp.columns:
                uncertainties = grp["sigma"].values
            else:
                uncertainties = None

            # Re-instantiate with network prior for this site
            site_model = BayesianPhosphositeModel(
                n_samples=n_samples,
                n_tune=n_tune,
                network=network,
                site_id=str(site_id),
            )
            idata = site_model.fit(times, values, uncertainties=uncertainties)
            # A failed fit returns None; the group is skipped entirely
            # (no NaN rows are emitted for it).
            if idata is None:
                continue

            vel_mean, vel_std = site_model.posterior_velocity(idata, grid_times)

            for i, t in enumerate(grid_times):
                records.append(
                    {
                        site_col: site_id,
                        cell_line_col: cell_line,
                        "time_min": t,
                        "velocity_mean": vel_mean[i],
                        "velocity_sd": vel_std[i],
                        # Normal approximation for independent GP quantiles
                        "velocity_q5": vel_mean[i] - 1.645 * vel_std[i],
                        "velocity_q95": vel_mean[i] + 1.645 * vel_std[i],
                    }
                )

    # =================================================================
    # METHOD 2: Hierarchical Bayesian GPs (Batched Partial-Pooling)
    # =================================================================
    elif method == "hierarchical_bayesian":
        hierarchical_model = HierarchicalBayesianPhosphositeModel(
            n_samples=n_samples, n_tune=n_tune
        )

        # Create a unique group ID to handle grouping safely.
        # "|||" is used as a separator assumed not to appear in site or
        # cell-line names — the key is split back apart below.
        df = df.copy()
        df["_group_id"] = (
            df[site_col].astype(str) + "|||" + df[cell_line_col].astype(str)
        )
        unique_groups = df["_group_id"].unique()

        logger.info(
            f"Starting Hierarchical Bayesian inference on {len(unique_groups)} total groups..."
        )

        # Process groups in batches so each PyMC model stays tractable.
        for i in range(0, len(unique_groups), batch_size):
            batch_groups = unique_groups[i : i + batch_size]
            df_batch = df[df["_group_id"].isin(batch_groups)]

            logger.info(
                f"Processing batch {i // batch_size + 1} of {(len(unique_groups) + batch_size - 1) // batch_size}..."
            )

            # Fit model and get predictions directly
            batch_results = hierarchical_model.fit_and_predict_batch(
                df_batch=df_batch,
                grid_times=grid_times,
                group_col="_group_id",
                time_col=time_col,
                value_col=value_col,
                uncert_col="sigma",
            )

            # Each batch result is a 6-tuple of per-grid-point arrays.
            for group_id, (
                v_mean,
                v_sd,
                v_q5,
                v_q95,
                traj_mean,
                traj_std,
            ) in batch_results.items():
                site_id, cell_line = group_id.split("|||")

                for j, t in enumerate(grid_times):
                    records.append(
                        {
                            site_col: site_id,
                            cell_line_col: cell_line,
                            "time_min": t,
                            "velocity_mean": v_mean[j],
                            "velocity_sd": v_sd[j],
                            "velocity_q5": v_q5[j],
                            "velocity_q95": v_q95[j],
                            "trajectory_mean": traj_mean[j],
                            "trajectory_std": traj_std[j],
                        }
                    )

    else:
        raise ValueError(
            f"Unknown Bayesian method: {method}. Use 'bayesian' or 'hierarchical_bayesian'."
        )

    return pd.DataFrame(records)

Data Processing

Modules for parsing raw MaxQuant files and preprocessing the data.

Input / Output

MaxQuant Phospho(STY)Sites.txt parser.

Parses the wide-format MaxQuant phosphoproteomics output into a tidy long-format DataFrame suitable for downstream GP-based time-course reconstruction and velocity modeling.

MaxQuantAdapter

Adapter for MaxQuant Phospho(STY)Sites.txt files.

Wraps the module-level parsing functions as a :class:~phospho_velocity.io.base.PhosphoInputAdapter-compatible object. The module-level functions are kept for backward compatibility.

Source code in src/phospho_velocity/io/maxquant.py
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
class MaxQuantAdapter:
    """Adapter for MaxQuant ``Phospho(STY)Sites.txt`` files.

    Wraps the module-level parsing functions as a
    :class:`~phospho_velocity.io.base.PhosphoInputAdapter`-compatible object.
    The module-level functions are kept for backward compatibility.
    """

    def parse(self, path: str) -> pd.DataFrame:
        """Parse a MaxQuant Phospho(STY)Sites.txt file.

        Delegates to :func:`parse_phosphosites`, then normalizes the
        localization-probability column name and guarantees a ``gene``
        column exists.

        Parameters
        ----------
        path : str
            Path to the file.

        Returns
        -------
        pd.DataFrame
            Canonical long-format DataFrame.  ``localization_probability``
            is aliased from ``Localization prob`` when present.
        """
        df = parse_phosphosites(path)

        # Alias the MaxQuant column name to the canonical one, unless the
        # canonical column is already present.
        needs_alias = (
            "Localization prob" in df.columns
            and "localization_probability" not in df.columns
        )
        if needs_alias:
            df = df.rename(
                columns={"Localization prob": "localization_probability"}
            )

        # Derive a gene column when absent: prefer the leading protein,
        # otherwise fall back to the prefix of the site identifier.
        if "gene" not in df.columns:
            src, sep = (
                ("Leading proteins", ";")
                if "Leading proteins" in df.columns
                else ("site_id", "_")
            )
            df["gene"] = df[src].str.split(sep).str[0]
        return df

    def validate_columns(self, df: pd.DataFrame):
        """Return missing canonical columns from *df*.

        Parameters
        ----------
        df : pd.DataFrame

        Returns
        -------
        list of str
        """
        from .base import CANONICAL_COLUMNS

        missing = []
        for col in CANONICAL_COLUMNS:
            if col not in df.columns:
                missing.append(col)
        return missing

parse(path)

Parse a MaxQuant Phospho(STY)Sites.txt file.

Delegates to :func:parse_phosphosites.

Parameters:

Name Type Description Default
path str

Path to the file.

required

Returns:

Type Description
DataFrame

Canonical long-format DataFrame. localization_probability is aliased from Localization prob when present.

Source code in src/phospho_velocity/io/maxquant.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
def parse(self, path: str) -> pd.DataFrame:
    """Parse a MaxQuant Phospho(STY)Sites.txt file.

    Wraps :func:`parse_phosphosites` and normalizes the output: the
    ``localization_probability`` column is aliased from
    ``Localization prob`` when present, and a ``gene`` column is derived
    if missing.

    Parameters
    ----------
    path : str
        Path to the file.

    Returns
    -------
    pd.DataFrame
        Canonical long-format DataFrame.
    """
    frame = parse_phosphosites(path)

    # Canonicalize the localization-probability column name.
    if "localization_probability" not in frame.columns:
        if "Localization prob" in frame.columns:
            frame = frame.rename(
                columns={"Localization prob": "localization_probability"}
            )

    # Derive a gene column when absent, preferring the leading protein and
    # otherwise the prefix of the site identifier.
    if "gene" not in frame.columns:
        if "Leading proteins" in frame.columns:
            frame["gene"] = frame["Leading proteins"].str.split(";").str[0]
        else:
            frame["gene"] = frame["site_id"].str.split("_").str[0]
    return frame

validate_columns(df)

Return missing canonical columns from df.

Parameters:

Name Type Description Default
df DataFrame
required

Returns:

Type Description
list of str
Source code in src/phospho_velocity/io/maxquant.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
def validate_columns(self, df: pd.DataFrame):
    """Return missing canonical columns from *df*.

    Parameters
    ----------
    df : pd.DataFrame

    Returns
    -------
    list of str
        Canonical column names absent from *df*, in canonical order.
    """
    from .base import CANONICAL_COLUMNS

    present = set(df.columns)
    return [name for name in CANONICAL_COLUMNS if name not in present]

find_intensity_columns(cols)

Strictly match ONLY raw intensity columns: 'Intensity ES2_1', NOT: - Intensity L - Intensity H - Intensity normalized

Source code in src/phospho_velocity/io/maxquant.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
def find_intensity_columns(cols: List[str]) -> List[str]:
    """Select only raw per-sample intensity columns.

    Matches names like ``Intensity ES2_1`` and rejects label- or
    derivative-style columns such as ``Intensity L``, ``Intensity H``,
    and ``Intensity normalized``.
    """
    raw_pattern = re.compile(r"^Intensity\s+[A-Z0-9]+_\d+$")
    candidates = [name for name in cols if raw_pattern.match(name)]

    # Defensive filtering: drop anything carrying a known bad token, even
    # though the strict regex above should already exclude these shapes.
    bad_tokens = (" L ", " H ", "normalized", "___")
    clean = []
    for name in candidates:
        if all(token not in name for token in bad_tokens):
            clean.append(name)

    logger.info("Using %d clean intensity columns", len(clean))

    return clean

make_site_id(row)

Build a stable site ID from MaxQuant phosphosite row.

The Leading proteins field from MaxQuant may optionally carry colon-delimited genomic annotation appended by external tools, giving the following full site_id schema (all parts colon-separated within the leading-proteins segment, underscore before the phosphosite suffix)::

<protein_id>:<genomic_mapping>:<aa_substitution>:<nucleotide_change>
    :<codon_change>:<sift_prediction>:<polyphen_score>_<phosphosite>

Example::

ENSP00000263026:map.16/22269867/G,T:p.Q361R:n.A1556G:c.cAa/cGa
    :SIFTprediction.tolerated:PolyPhenScore.0_S74

For unannotated entries the simpler form is used::

ENSP00000001146_S460

Component descriptions:

  • protein_id – Ensembl protein accession, e.g. ENSP00000263026.
  • genomic_mapping – chromosome/position/ref,alt, e.g. map.16/22269867/G,T. Note: contains a literal comma.
  • aa_substitution – HGVS protein change, e.g. p.Q361R.
  • nucleotide_change – transcript-level HGVS, e.g. n.A1556G.
  • codon_change – codon-level notation, e.g. c.cAa/cGa.
  • sift_prediction – e.g. SIFTprediction.tolerated.
  • polyphen_score – e.g. PolyPhenScore.0.
  • phosphosite – single amino-acid + position code, e.g. S74 (only the first position from Positions within proteins is used to avoid repeated entries like S74;74;385;269).

Parameters:

Name Type Description Default
row Series

A row from Phospho(STY)Sites.txt.

required

Returns:

Type Description
str

Opaque but human-readable site identifier. Downstream code must treat this as an opaque string; use :func:~phospho_velocity.io.formatter.split_site_id to recover individual annotation fields.

Source code in src/phospho_velocity/io/maxquant.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def make_site_id(row: pd.Series) -> str:
    """Build a stable site ID from a MaxQuant phosphosite row.

    The returned ID has the form ``<leading-protein-segment>_<phosphosite>``,
    e.g. ``ENSP00000001146_S460``.  The ``Leading proteins`` field may carry
    colon-delimited genomic annotation appended by external tools, giving the
    full schema (colon-separated within the leading-proteins segment,
    underscore before the phosphosite suffix)::

        <protein_id>:<genomic_mapping>:<aa_substitution>:<nucleotide_change>
            :<codon_change>:<sift_prediction>:<polyphen_score>_<phosphosite>

    Example::

        ENSP00000263026:map.16/22269867/G,T:p.Q361R:n.A1556G:c.cAa/cGa
            :SIFTprediction.tolerated:PolyPhenScore.0_S74

    Only the first entry of ``Leading proteins`` and the first position from
    ``Positions within proteins`` are used, avoiding repeated entries such as
    ``S74;74;385;269``.  Note the ``genomic_mapping`` part may contain a
    literal comma (``map.16/22269867/G,T``).

    Parameters
    ----------
    row : pd.Series
        A row from Phospho(STY)Sites.txt.

    Returns
    -------
    str
        Opaque but human-readable site identifier.  Downstream code must
        treat it as an opaque string; use
        :func:`~phospho_velocity.io.formatter.split_site_id` to recover
        individual annotation fields.
    """
    leading = str(row.get("Leading proteins", "unknown"))
    protein = leading.split(";")[0].strip()

    amino_acid = str(row.get("Amino acid", "X"))

    # Keep only the first reported position (MaxQuant may list several,
    # e.g. "74;74;385;269").
    raw_positions = str(row.get("Positions within proteins", "0"))
    first_position = raw_positions.split(";")[0].strip()

    return f"{protein}_{amino_acid}{first_position}"

parse_phosphosites(filepath, sep='\t')

Parse MaxQuant Phospho(STY)Sites.txt into tidy long-format DataFrame.

Rows corresponding to contaminants (Potential contaminant == "+" ) and reverse decoy hits (Reverse == "+" ) are removed. Rows with NaN intensity are kept to preserve the missingness structure.

Parameters:

Name Type Description Default
filepath str

Path to Phospho (STY)Sites.txt (or equivalent).

required
sep str

Column separator (default \t).

'\t'

Returns:

Type Description
DataFrame

Tidy long-format table with columns: site_id, cell_line, replicate, intensity, plus all metadata columns found in the file.

Source code in src/phospho_velocity/io/maxquant.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def parse_phosphosites(
    filepath: str,
    sep: str = "\t",
) -> pd.DataFrame:
    """Parse MaxQuant Phospho(STY)Sites.txt into tidy long-format DataFrame.

    Rows corresponding to contaminants (``Potential contaminant == "+"`` ) and
    reverse decoy hits (``Reverse == "+"`` ) are removed.  Rows with NaN
    intensity are **kept** to preserve the missingness structure.

    Parameters
    ----------
    filepath : str
        Path to ``Phospho (STY)Sites.txt`` (or equivalent).
    sep : str
        Column separator (default ``\\t``).

    Returns
    -------
    pd.DataFrame
        Tidy long-format table with columns:
        ``site_id``, ``cell_line``, ``replicate``, ``intensity``,
        plus all metadata columns found in the file.

    Raises
    ------
    ValueError
        If no raw intensity columns are found in the file.
    """
    logger.info("Reading phosphosite file: %s", filepath)
    # low_memory=False avoids chunked mixed-dtype inference on wide files.
    df = pd.read_csv(filepath, sep=sep, low_memory=False)
    logger.info("Loaded %d rows × %d columns", *df.shape)

    # Filter contaminants and reverse decoys (both flagged "+" by MaxQuant).
    if "Potential contaminant" in df.columns:
        mask_cont = df["Potential contaminant"].eq("+")
        n_cont = mask_cont.sum()
        df = df[~mask_cont]
        logger.info("Removed %d contaminant rows", n_cont)

    if "Reverse" in df.columns:
        mask_rev = df["Reverse"].eq("+")
        n_rev = mask_rev.sum()
        df = df[~mask_rev]
        logger.info("Removed %d reverse rows", n_rev)

    df = df.reset_index(drop=True)

    # Build site IDs: one stable "<protein>_<aa><position>" key per row.
    df = df.copy()
    df["site_id"] = df.apply(make_site_id, axis=1)

    # Find raw per-sample intensity columns (strict "Intensity X_n" match).
    intensity_cols = find_intensity_columns(df.columns.tolist())
    if not intensity_cols:
        raise ValueError("No intensity columns found in the phosphosite file.")

    # Determine which meta columns are present
    present_meta = [c for c in _META_COLS if c in df.columns]

    # Melt to long format — keep NaN values (do NOT dropna) so the
    # missingness structure survives into the long table.
    id_vars = ["site_id"] + present_meta
    long = df[id_vars + intensity_cols].melt(
        id_vars=id_vars,
        value_vars=intensity_cols,
        var_name="sample_col",
        value_name="intensity",
    )

    # Parse cell_line and replicate out of the "Intensity <LINE>_<REP>" names.
    meta = long["sample_col"].apply(
        lambda c: pd.Series(parse_sample_meta(c), index=["cell_line", "replicate"])
    )
    long = pd.concat([long.drop(columns=["sample_col"]), meta], axis=1)

    # Replace 0 intensities with NaN (MaxQuant uses 0 for missing)
    long["intensity"] = long["intensity"].replace(0, np.nan)

    # Drop rows with no sample assignment (parsing failures)
    long = long.dropna(subset=["cell_line", "replicate"])

    # Ensure numeric intensity
    long["intensity"] = pd.to_numeric(long["intensity"], errors="coerce")

    # Keep only (site, cell line) groups with at least one observed value;
    # transform broadcasts the per-group count back to every row.
    valid_counts = long.groupby(["site_id", "cell_line"])["intensity"].transform(
        lambda x: x.notna().sum()
    )

    long = long[valid_counts >= 1]

    logger.info("Long-format table: %d rows", len(long))
    return long.reset_index(drop=True)

parse_sample_meta(colname)

Parse Intensity ES2_1 into (cell_line='ES2', replicate=1).

Parameters:

Name Type Description Default
colname str

Column name such as Intensity ES2_1.

required

Returns:

Type Description
tuple of (str, int)

(cell_line, replicate)

Raises:

Type Description
ValueError

If the column name does not match the expected pattern.

Source code in src/phospho_velocity/io/maxquant.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
def parse_sample_meta(colname: str) -> Tuple[str, int]:
    """Split an intensity column name into its sample metadata.

    ``Intensity ES2_1`` → ``('ES2', 1)``.

    Parameters
    ----------
    colname : str
        Column name such as ``Intensity ES2_1``.

    Returns
    -------
    tuple of (str, int)
        ``(cell_line, replicate)``

    Raises
    ------
    ValueError
        If the column name does not match the expected pattern.
    """
    match = re.match(r"^Intensity\s+([A-Z0-9]+)_(\d+)$", colname)
    if match is None:
        raise ValueError(f"Cannot parse sample meta from column: {colname!r}")
    return match.group(1), int(match.group(2))

Preprocessing & Normalization

Preprocessing and normalization utilities.

Implements log2 transformation, per-sample median normalization, log-fold-change computation, and missingness handling for phosphoproteomics time-course data.

assign_time(df, time_map=None, allow_missing=False)

Add time_min column from cell_line + replicate mapping.

Parameters:

Name Type Description Default
df DataFrame

Long-format DataFrame with cell_line and replicate columns.

required
time_map dict or :class:`~phospho_velocity.config.TimeMap`

Mapping (cell_line, replicate) → time_min.

  • If a plain :class:dict is passed (legacy behaviour), it is used directly.
  • If a :class:~phospho_velocity.config.TimeMap is passed, its mapping attribute is used and the object's :meth:validate method is called to detect uncovered pairs.
  • If None, :data:DEFAULT_TIME_MAP is used with a deprecation warning (PXD000901 hardcoded mapping).
None
allow_missing bool

When True, rows with no matching time point are kept with time_min = NaN and a warning is issued. When False (default), the same behaviour applies but a ValueError is raised if the time_map is a :class:~phospho_velocity.config.TimeMap and uncovered pairs exist.

False

Returns:

Type Description
DataFrame

Copy of df with added time_min column.

Source code in src/phospho_velocity/preprocessing/normalize.py
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def assign_time(
    df: pd.DataFrame,
    time_map: Union[
        None,
        Dict[Tuple[str, int], float],
        "TimeMap",  # noqa: F821 – forward reference
    ] = None,
    allow_missing: bool = False,
) -> pd.DataFrame:
    """Add ``time_min`` column from cell_line + replicate mapping.

    Parameters
    ----------
    df : pd.DataFrame
        Long-format DataFrame with ``cell_line`` and ``replicate`` columns.
    time_map : dict or :class:`~phospho_velocity.config.TimeMap`, optional
        Mapping ``(cell_line, replicate) → time_min``.  A plain :class:`dict`
        (legacy) is used directly; a
        :class:`~phospho_velocity.config.TimeMap` is first validated against
        *df* for uncovered pairs and its ``mapping`` attribute is used; when
        ``None``, :data:`DEFAULT_TIME_MAP` (PXD000901 hardcoded mapping) is
        used with a deprecation warning.
    allow_missing : bool
        When ``True``, rows with no matching time point are kept with
        ``time_min = NaN`` and a warning is issued.  When ``False``
        (default), a ``ValueError`` is raised instead — but only if
        *time_map* is a TimeMap with uncovered pairs.

    Returns
    -------
    pd.DataFrame
        Copy of *df* with added ``time_min`` column.
    """
    # Resolve the actual dict used for the (cell_line, replicate) lookup.
    from ..config import TimeMap as TimeMapClass  # local import to avoid circular

    if time_map is None:
        warnings.warn(
            "assign_time: no time_map provided; falling back to the PXD000901 "
            "hardcoded mapping.  Pass a TimeMap object or a dict for portability.",
            DeprecationWarning,
            stacklevel=2,
        )
        lookup: Dict[Tuple[str, int], float] = DEFAULT_TIME_MAP
    elif isinstance(time_map, TimeMapClass):
        # TimeMap objects know how to report uncovered pairs.
        uncovered = time_map.validate(df)
        if uncovered:
            msg = (
                f"TimeMap does not cover {len(uncovered)} (cell_line, replicate) "
                f"pair(s): {uncovered[:5]}{'...' if len(uncovered) > 5 else ''}"
            )
            if not allow_missing:
                raise ValueError(msg)
            logger.warning("%s — those rows will have time_min=NaN.", msg)
        lookup = time_map.mapping
    else:
        lookup = time_map  # plain dict (legacy)

    def _time_for_row(row: pd.Series) -> float:
        # Rows with a missing replicate can never be mapped.
        if pd.isna(row["replicate"]):
            return np.nan
        return lookup.get((row["cell_line"], int(row["replicate"])), np.nan)

    result = df.copy()
    result["time_min"] = result.apply(_time_for_row, axis=1)

    n_unmapped = result["time_min"].isna().sum()
    if n_unmapped:
        logger.warning("%d rows could not be mapped to a time point", n_unmapped)
    return result

compute_log_fold_change(df, value_col='log2_intensity', baseline_time=0.0)

Compute log2 fold-change vs. baseline time per (site_id, cell_line).

Parameters:

Name Type Description Default
df DataFrame

Long-format DataFrame with time_min, site_id, and cell_line columns.

required
value_col str

Column with log2 intensity values.

'log2_intensity'
baseline_time float

Time point to use as baseline (default 0.0).

0.0

Returns:

Type Description
DataFrame

Copy of df with log2FC column added.

Source code in src/phospho_velocity/preprocessing/normalize.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
def compute_log_fold_change(
    df: pd.DataFrame,
    value_col: str = "log2_intensity",
    baseline_time: float = 0.0,
) -> pd.DataFrame:
    """Compute log2 fold-change vs. baseline time per (site_id, cell_line).

    The baseline for each ``(site_id, cell_line)`` group is the mean of
    *value_col* over rows where ``time_min`` equals *baseline_time*; the
    ``log2FC`` column is *value_col* minus that baseline (NaN when either
    side is missing).

    Parameters
    ----------
    df : pd.DataFrame
        Long-format DataFrame with ``time_min``, ``site_id``, and
        ``cell_line`` columns.
    value_col : str
        Column with log2 intensity values.
    baseline_time : float
        Time point to use as baseline (default 0.0).

    Returns
    -------
    pd.DataFrame
        Copy of *df* with ``log2FC`` column added.
    """
    result = df.copy()

    # Mean of the value column at the baseline time, per group.
    at_baseline = result[result["time_min"] == baseline_time]
    baseline_means = (
        at_baseline.groupby(["site_id", "cell_line"])[value_col]
        .mean()
        .rename("baseline_val")
    )

    # Broadcast the baseline back onto every row of its group.
    result = result.join(baseline_means, on=["site_id", "cell_line"])

    both_known = result[value_col].notna() & result["baseline_val"].notna()
    result["log2FC"] = np.where(
        both_known,
        result[value_col] - result["baseline_val"],
        np.nan,
    )
    return result.drop(columns=["baseline_val"])

median_normalize(df, value_col='log2_intensity', group_cols=None)

Median-center per cell_line (default), preserving global scale.

Source code in src/phospho_velocity/preprocessing/normalize.py
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def median_normalize(
    df: pd.DataFrame,
    value_col: str = "log2_intensity",
    group_cols: Optional[list[str]] = None,
) -> pd.DataFrame:
    """Median-center *value_col* per group, preserving the global scale.

    Each value is shifted by ``global_median - group_median`` so groups are
    aligned without changing the overall level.  Groups default to
    ``cell_line``.  The result is written to ``{value_col}_norm``.
    """
    result = df.copy()

    keys = ["cell_line"] if group_cols is None else group_cols

    values = pd.to_numeric(result[value_col], errors="coerce")

    # Overall median, used to restore the global scale after centering.
    overall_median = float(values.median(skipna=True))

    # Per-group median, broadcast back to every row via transform.
    group_keys = result[keys].apply(tuple, axis=1)
    per_group_median = values.groupby(group_keys).transform("median")

    normalized = np.where(
        values.notna(),
        values - per_group_median + overall_median,
        np.nan,
    )
    result[f"{value_col}_norm"] = normalized

    return result

Visualization

Plotting utilities for trajectories, confidence intervals, and heatmaps.

Visualization utilities for phosphosite trajectories and velocities.

plot_site_overview(vel_df, output_path=None)

Save a multi-panel overview figure.

Parameters:

Name Type Description Default
vel_df DataFrame

Velocity DataFrame from :func:VelocityEstimator.estimate_all_sites.

required
output_path str

File path to save the figure (PNG/PDF/SVG). If None, the figure is returned without saving.

None

Returns:

Name Type Description
fig matplotlib Figure
Source code in src/phospho_velocity/plotting/trajectory.py
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def plot_site_overview(
    vel_df: pd.DataFrame,
    output_path: Optional[str] = None,
):
    """Save a multi-panel overview figure.

    Left panel: velocity heatmap across sites × time.  Right panel:
    histogram of all velocity values with a zero reference line.

    Parameters
    ----------
    vel_df : pd.DataFrame
        Velocity DataFrame from :func:`VelocityEstimator.estimate_all_sites`.
    output_path : str, optional
        File path to save the figure (PNG/PDF/SVG).  If None, the figure is
        returned without saving.

    Returns
    -------
    fig : matplotlib Figure
    """
    try:
        import matplotlib.pyplot as plt
    except ImportError as e:
        raise ImportError("matplotlib is required for plotting.") from e

    fig, (heatmap_ax, hist_ax) = plt.subplots(1, 2, figsize=(16, 6))

    plot_velocity_heatmap(vel_df, ax=heatmap_ax)

    # Distribution of velocities across all sites and time points.
    velocities = vel_df["velocity"].dropna()
    hist_ax.hist(velocities, bins=50, color="steelblue", edgecolor="white")
    hist_ax.set_xlabel("Velocity")
    hist_ax.set_ylabel("Count")
    hist_ax.set_title("Velocity distribution")
    hist_ax.axvline(0, color="gray", linestyle="--")

    fig.tight_layout()

    if output_path is not None:
        fig.savefig(output_path, dpi=150, bbox_inches="tight")
        logger.info("Saved overview figure to %s", output_path)

    return fig

plot_site_trajectory(times, values, trajectory_mean, trajectory_std=None, velocity_mean=None, velocity_std=None, velocity_q5=None, velocity_q95=None, site_id='Unknown Site', ax=None)

Plot GP trajectory and velocity for a single phosphosite.

Handles both standard GP uncertainty (mean ± 2σ) and Bayesian Credible Intervals (q5 to q95).

Source code in src/phospho_velocity/plotting/trajectory.py
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def plot_site_trajectory(
    times: np.ndarray,
    values: np.ndarray,
    trajectory_mean: np.ndarray,
    trajectory_std: Optional[np.ndarray] = None,
    velocity_mean: Optional[np.ndarray] = None,
    velocity_std: Optional[np.ndarray] = None,
    velocity_q5: Optional[np.ndarray] = None,  # Bayesian credible-interval lower bound
    velocity_q95: Optional[np.ndarray] = None,  # Bayesian credible-interval upper bound
    site_id: str = "Unknown Site",
    ax=None,
):
    """Plot GP trajectory and velocity for a single phosphosite.

    Handles both standard GP uncertainty (mean ± 2σ) and
    Bayesian Credible Intervals (q5 to q95).

    Parameters
    ----------
    times, values : np.ndarray
        Observed time points and measurements, drawn as scatter points.
    trajectory_mean : np.ndarray
        Posterior mean of the trajectory on an evenly spaced grid.
    trajectory_std : np.ndarray, optional
        Posterior std of the trajectory; shaded as ±2σ when given.
    velocity_mean, velocity_std : np.ndarray, optional
        Posterior mean / std of the velocity on the same grid.
    velocity_q5, velocity_q95 : np.ndarray, optional
        5%/95% posterior quantiles of the velocity; when both are given
        they take precedence over ``velocity_std`` for the shaded band.
    site_id : str
        Label used in the figure title.
    ax : sequence of two matplotlib Axes, optional
        Axes pair to draw into; a new 2-panel figure is created when None.

    Returns
    -------
    fig, axes
        The matplotlib Figure and the Axes used.
    """
    try:
        import matplotlib.pyplot as plt
    except ImportError as e:
        raise ImportError("matplotlib is required for plotting.") from e

    # Determine grid based on the length of trajectory_mean.
    # This allows for extrapolation beyond the original data range, but it
    # assumes the trajectory was evaluated on an evenly spaced grid spanning
    # exactly [min(times), max(times)] — TODO confirm against compute.py.
    grid = np.linspace(
        float(np.nanmin(times)), float(np.nanmax(times)), len(trajectory_mean)
    )
    if velocity_mean is not None and len(velocity_mean) == len(trajectory_mean):
        # Currently a no-op: if a custom grid was used in compute.py it would
        # be better to pass it in explicitly; the linspace above is a safe
        # fallback.
        pass

    if ax is None:
        fig, axes = plt.subplots(2, 1, figsize=(8, 7), sharex=True)
    else:
        fig = ax.figure
        # NOTE(review): when a single Axes (no __len__) is passed, axes
        # becomes [ax] and the axes[1] lookup below raises IndexError —
        # callers must supply a pair of Axes.
        axes = ax if hasattr(ax, "__len__") else [ax]

    ax0, ax1 = axes[0], axes[1]

    # Upper panel: trajectory
    ax0.scatter(
        times, values, color="black", s=40, zorder=5, label="Observed", alpha=0.8
    )
    ax0.plot(grid, trajectory_mean, color="steelblue", lw=2, label="Posterior Mean")

    if trajectory_std is not None:
        # Shade ±2σ around the posterior mean (~95% for a Gaussian).
        ax0.fill_between(
            grid,
            trajectory_mean - 2 * trajectory_std,
            trajectory_mean + 2 * trajectory_std,
            alpha=0.2,
            color="steelblue",
            label="±2σ (Uncertainty)",
        )

    ax0.set_ylabel("log₂ intensity")
    ax0.set_title(f"Trajectory & Velocity: {site_id}", fontweight="bold")
    ax0.legend(fontsize=8, loc="best")
    ax0.grid(True, linestyle="--", alpha=0.5)

    # Lower panel: velocity
    if velocity_mean is not None:
        ax1.plot(grid, velocity_mean, color="firebrick", lw=2, label="Velocity Mean")

        # Priority 1: use Bayesian quantiles (q5/q95) when both are available.
        if velocity_q5 is not None and velocity_q95 is not None:
            ax1.fill_between(
                grid,
                velocity_q5,
                velocity_q95,
                alpha=0.25,
                color="firebrick",
                label="90% Credible Interval",
            )
        # Priority 2: fall back to a ±2σ band from the standard deviation.
        elif velocity_std is not None:
            ax1.fill_between(
                grid,
                velocity_mean - 2 * velocity_std,
                velocity_mean + 2 * velocity_std,
                alpha=0.25,
                color="firebrick",
                label="±2σ (Uncertainty)",
            )

        # Zero line separates increasing from decreasing phosphorylation.
        ax1.axhline(0, color="black", linestyle="-", linewidth=0.8, alpha=0.3)
        ax1.set_xlabel("Time (min)")
        ax1.set_ylabel("d(log₂I)/dt")
        ax1.legend(fontsize=8, loc="best")
        ax1.grid(True, linestyle="--", alpha=0.5)

    fig.tight_layout()
    return fig, axes

plot_velocity_heatmap(vel_df, site_col='site_id', time_col='time_min', value_col='velocity', ax=None)

Plot heatmap of velocity across sites × time.

Parameters:

Name Type Description Default
vel_df DataFrame
required
site_col str
'site_id'
time_col str
'time_min'
value_col str
'velocity'
ax matplotlib Axes
None

Returns:

Name Type Description
fig matplotlib Figure
ax matplotlib Axes
Source code in src/phospho_velocity/plotting/trajectory.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def plot_velocity_heatmap(
    vel_df: pd.DataFrame,
    site_col: str = "site_id",
    time_col: str = "time_min",
    value_col: str = "velocity",
    ax=None,
):
    """Draw a sites × time heatmap of mean velocity.

    Parameters
    ----------
    vel_df : pd.DataFrame
    site_col : str
    time_col : str
    value_col : str
    ax : matplotlib Axes, optional

    Returns
    -------
    fig : matplotlib Figure
    ax : matplotlib Axes
    """
    try:
        import matplotlib.pyplot as plt
    except ImportError as e:
        raise ImportError("matplotlib is required for plotting.") from e

    # Average duplicate (site, time) measurements into a single cell.
    matrix = vel_df.pivot_table(
        index=site_col, columns=time_col, values=value_col, aggfunc="mean"
    )

    if ax is None:
        fig, ax = plt.subplots(figsize=(10, max(4, len(matrix) * 0.3)))
    else:
        fig = ax.figure

    # Symmetric color limits (95th percentile of |v|) so the sign of the
    # velocity maps cleanly onto the diverging colormap.
    limit = np.nanpercentile(np.abs(matrix.values), 95)
    image = ax.imshow(
        matrix.values,
        aspect="auto",
        cmap="RdBu_r",
        vmin=-limit,
        vmax=limit,
    )

    ax.set_yticks(range(len(matrix.index)))
    ax.set_yticklabels(matrix.index, fontsize=6)
    ax.set_xticks(range(len(matrix.columns)))
    ax.set_xticklabels([f"{t:.0f}" for t in matrix.columns], rotation=45, fontsize=7)
    ax.set_xlabel("Time (min)")
    ax.set_ylabel("Site ID")
    ax.set_title("Velocity heatmap")
    plt.colorbar(image, ax=ax, label="Velocity")
    return fig, ax

Network & Configuration

Network priors and pipeline configuration.

Kinase-Substrate Network

Kinase-substrate network prior.

Provides a data structure and utilities for incorporating kinase–substrate network knowledge (e.g. from KinomeXplorer / NetworKIN / Creixell et al. 2015) as regularization priors into the velocity model.

KinaseSubstrateNetwork

Kinase–substrate interaction network.

Stores directed kinase → substrate-site edges with optional scores.

Parameters:

This class takes no constructor parameters.
Source code in src/phospho_velocity/network/kinase_substrate.py
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
class KinaseSubstrateNetwork:
    """Kinase–substrate interaction network.

    Stores directed kinase → substrate-site edges with optional scores.
    Edges are deduplicated: re-loading the same ``(kinase, site)`` pair keeps
    a single edge and overwrites its score with the last value seen.
    """

    def __init__(self) -> None:
        # kinase -> substrate-site ids, in first-seen order
        self._kinase_to_substrates: Dict[str, List[str]] = defaultdict(list)
        # substrate-site id -> kinases, in first-seen order
        self._site_to_kinases: Dict[str, List[str]] = defaultdict(list)
        # (kinase, site) -> score; also serves as an O(1) edge-membership set
        self._scores: Dict[tuple, float] = {}

    # ------------------------------------------------------------------
    def load_from_file(
        self, filepath: str, sep: str = "\t"
    ) -> "KinaseSubstrateNetwork":
        """Load kinase–substrate pairs from a tab-delimited file.

        Two column layouts are accepted:

        * **Legacy format**: ``kinase``, ``substrate_site``, (``score``).
        * **Current format** (produced by ``make_network_file.py``):
          ``kinase``, ``substrate``, ``site``, (``score``).
          ``substrate_site`` is derived as ``"{substrate}_{site}"`` so that it
          matches the ``site_id`` keys produced by the MaxQuant parser.

        Parameters
        ----------
        filepath : str
            Path to the network file.
        sep : str
            Column separator.

        Returns
        -------
        self

        Raises
        ------
        ValueError
            If neither the legacy nor the current column layout is detected.
        """
        df = pd.read_csv(filepath, sep=sep)
        cols = set(df.columns)

        if "substrate_site" in cols and "kinase" in cols:
            # Legacy format: substrate_site column already present
            pass
        elif {"kinase", "substrate", "site"}.issubset(cols):
            # Current format: combine substrate + site into substrate_site so
            # the resulting key matches the "<protein>_<aa><position>" site_id
            # convention used throughout the pipeline.
            df = df.assign(
                substrate_site=df["substrate"].astype(str) + "_" + df["site"].astype(str)
            )
        else:
            raise ValueError(
                "Network file must contain either {'kinase', 'substrate_site'} "
                "(legacy format) or {'kinase', 'substrate', 'site'} "
                "(current format); "
                f"found {cols}"
            )

        # Column-wise iteration avoids the per-row overhead of iterrows().
        kinases = df["kinase"].astype(str)
        sites = df["substrate_site"].astype(str)
        if "score" in df.columns:
            scores = [float(s) for s in df["score"]]
        else:
            scores = [1.0] * len(df)

        for kinase, site, score in zip(kinases, sites, scores):
            # _scores doubles as an O(1) membership test for the edge,
            # replacing the O(n) list scans of a per-row dedupe (which made
            # loading quadratic in the number of edges).
            if (kinase, site) not in self._scores:
                self._kinase_to_substrates[kinase].append(site)
                self._site_to_kinases[site].append(kinase)
            self._scores[(kinase, site)] = score

        logger.info(
            "Loaded %d kinase–substrate pairs for %d kinases and %d sites",
            len(self._scores),
            len(self._kinase_to_substrates),
            len(self._site_to_kinases),
        )
        return self

    # ------------------------------------------------------------------
    def get_substrates(self, kinase: str) -> List[str]:
        """Return substrate sites for *kinase*.

        Parameters
        ----------
        kinase : str

        Returns
        -------
        list of str
            Empty when the kinase is unknown.
        """
        return list(self._kinase_to_substrates.get(kinase, []))

    # ------------------------------------------------------------------
    def get_kinases(self, site: str) -> List[str]:
        """Return kinases for *site*.

        Parameters
        ----------
        site : str

        Returns
        -------
        list of str
            Empty when the site is unknown.
        """
        return list(self._site_to_kinases.get(site, []))

    # ------------------------------------------------------------------
    def pssm_weight(
        self,
        site_sequence: str,
        pssm_matrix: np.ndarray,
        aa_order: str = "ACDEFGHIKLMNPQRSTVWY",
    ) -> float:
        """Score a sequence window against a position-specific scoring matrix.

        Parameters
        ----------
        site_sequence : str
            Amino-acid sequence window (e.g. 15-mers centered on phosphosite).
        pssm_matrix : ndarray, shape (L, 20)
            PSSM scores; rows = positions, columns = amino acids in *aa_order*.
        aa_order : str
            Amino acid ordering for PSSM columns.

        Returns
        -------
        float
            Cumulative log-odds score.  Positions beyond the matrix rows and
            residues not present in *aa_order* contribute nothing.
        """
        aa_index = {aa: i for i, aa in enumerate(aa_order)}
        score = 0.0
        for pos, aa in enumerate(site_sequence):
            if pos >= pssm_matrix.shape[0]:
                break
            idx = aa_index.get(aa.upper(), -1)
            if idx >= 0:
                score += pssm_matrix[pos, idx]
        return score

    # ------------------------------------------------------------------
    def regularization_prior(
        self,
        site_id: str,
        sites_df: Optional[pd.DataFrame] = None,
    ) -> float:
        """Compute regularization weight based on network connectivity.

        Sites with more kinase connections receive lower regularization
        (stronger prior towards the network-implied trajectory).

        Parameters
        ----------
        site_id : str
        sites_df : pd.DataFrame, optional
            Not currently used; reserved for future extensions.

        Returns
        -------
        float
            Regularization weight in (0, 1].
        """
        n_kinases = len(self._site_to_kinases.get(site_id, []))
        # Hyperbolic decay: well-connected sites get lower regularization;
        # an unconnected site keeps the full weight of 1.0.
        weight = 1.0 / (1.0 + n_kinases)
        return float(weight)

get_kinases(site)

Return kinases for site.

Parameters:

Name Type Description Default
site str
required

Returns:

Type Description
list of str
Source code in src/phospho_velocity/network/kinase_substrate.py
119
120
121
122
123
124
125
126
127
128
129
130
def get_kinases(self, site: str) -> List[str]:
    """Return a copy of the kinase list recorded for *site*.

    Parameters
    ----------
    site : str

    Returns
    -------
    list of str
        Empty when the site has no recorded kinases.
    """
    recorded = self._site_to_kinases.get(site)
    return [] if recorded is None else list(recorded)

get_substrates(kinase)

Return substrate sites for kinase.

Parameters:

Name Type Description Default
kinase str
required

Returns:

Type Description
list of str
Source code in src/phospho_velocity/network/kinase_substrate.py
105
106
107
108
109
110
111
112
113
114
115
116
def get_substrates(self, kinase: str) -> List[str]:
    """Return a copy of the substrate-site list recorded for *kinase*.

    Parameters
    ----------
    kinase : str

    Returns
    -------
    list of str
        Empty when the kinase has no recorded substrates.
    """
    recorded = self._kinase_to_substrates.get(kinase)
    return [] if recorded is None else list(recorded)

load_from_file(filepath, sep='\t')

Load kinase–substrate pairs from a tab-delimited file.

Two column layouts are accepted:

  • Legacy format: kinase, substrate_site, (score).
  • Current format (produced by make_network_file.py): kinase, substrate, site, (score). substrate_site is derived as "{substrate}_{site}" so that it matches the site_id keys produced by the MaxQuant parser.

Parameters:

Name Type Description Default
filepath str

Path to the network file.

required
sep str

Column separator.

'\t'

Returns:

Type Description
self

Raises:

Type Description
ValueError

If neither the legacy nor the current column layout is detected.

Source code in src/phospho_velocity/network/kinase_substrate.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
def load_from_file(
    self, filepath: str, sep: str = "\t"
) -> "KinaseSubstrateNetwork":
    """Populate the network from a delimited kinase–substrate file.

    Two column layouts are accepted:

    * **Legacy format**: ``kinase``, ``substrate_site``, (``score``).
    * **Current format** (produced by ``make_network_file.py``):
      ``kinase``, ``substrate``, ``site``, (``score``); the
      ``substrate_site`` key is then derived as ``"{substrate}_{site}"``
      so it matches the ``site_id`` keys produced by the MaxQuant parser.

    Parameters
    ----------
    filepath : str
        Path to the network file.
    sep : str
        Column separator.

    Returns
    -------
    self

    Raises
    ------
    ValueError
        If neither the legacy nor the current column layout is detected.
    """
    df = pd.read_csv(filepath, sep=sep)
    cols = set(df.columns)

    legacy = "kinase" in cols and "substrate_site" in cols
    if not legacy:
        if not {"kinase", "substrate", "site"}.issubset(cols):
            raise ValueError(
                "Network file must contain either {'kinase', 'substrate_site'} "
                "(legacy format) or {'kinase', 'substrate', 'site'} "
                "(current format); "
                f"found {cols}"
            )
        # Derive substrate_site so keys follow the "<protein>_<aa><position>"
        # site_id convention used throughout the pipeline.
        df = df.assign(
            substrate_site=df["substrate"].astype(str) + "_" + df["site"].astype(str)
        )

    has_score = "score" in df.columns
    for _, record in df.iterrows():
        kinase = str(record["kinase"])
        site = str(record["substrate_site"])
        score = float(record["score"]) if has_score else 1.0

        forward = self._kinase_to_substrates[kinase]
        if site not in forward:
            forward.append(site)
        backward = self._site_to_kinases[site]
        if kinase not in backward:
            backward.append(kinase)
        self._scores[(kinase, site)] = score

    logger.info(
        "Loaded %d kinase–substrate pairs for %d kinases and %d sites",
        len(self._scores),
        len(self._kinase_to_substrates),
        len(self._site_to_kinases),
    )
    return self

pssm_weight(site_sequence, pssm_matrix, aa_order='ACDEFGHIKLMNPQRSTVWY')

Score a sequence window against a position-specific scoring matrix.

Parameters:

Name Type Description Default
site_sequence str

Amino-acid sequence window (e.g. 15-mers centered on phosphosite).

required
pssm_matrix (ndarray, shape(L, 20))

PSSM scores; rows = positions, columns = amino acids in aa_order.

required
aa_order str

Amino acid ordering for PSSM columns.

'ACDEFGHIKLMNPQRSTVWY'

Returns:

Type Description
float

Cumulative log-odds score.

Source code in src/phospho_velocity/network/kinase_substrate.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
def pssm_weight(
    self,
    site_sequence: str,
    pssm_matrix: np.ndarray,
    aa_order: str = "ACDEFGHIKLMNPQRSTVWY",
) -> float:
    """Sum PSSM log-odds contributions over a sequence window.

    Parameters
    ----------
    site_sequence : str
        Amino-acid window (e.g. a 15-mer centered on the phosphosite).
    pssm_matrix : ndarray, shape (L, 20)
        Position-specific scores; one row per position, columns ordered
        by *aa_order*.
    aa_order : str
        Column order of amino acids in *pssm_matrix*.

    Returns
    -------
    float
        Cumulative log-odds score.  Positions beyond the matrix rows and
        residues absent from *aa_order* contribute nothing.
    """
    column_of = {residue: col for col, residue in enumerate(aa_order)}
    n_positions = pssm_matrix.shape[0]
    total = 0.0
    for position, residue in enumerate(site_sequence):
        if position >= n_positions:
            break
        column = column_of.get(residue.upper())
        if column is not None:
            total += pssm_matrix[position, column]
    return total

regularization_prior(site_id, sites_df=None)

Compute regularization weight based on network connectivity.

Sites with more kinase connections receive lower regularization (stronger prior towards the network-implied trajectory).

Parameters:

Name Type Description Default
site_id str
required
sites_df DataFrame

Not currently used; reserved for future extensions.

None

Returns:

Type Description
float

Regularization weight in (0, 1].

Source code in src/phospho_velocity/network/kinase_substrate.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def regularization_prior(
    self,
    site_id: str,
    sites_df: Optional[pd.DataFrame] = None,
) -> float:
    """Weight in (0, 1] that shrinks with network connectivity.

    A site annotated with many kinases is considered well supported by the
    prior network and therefore receives a smaller regularization weight
    (stronger prior towards the network-implied trajectory).

    Parameters
    ----------
    site_id : str
    sites_df : pd.DataFrame, optional
        Not currently used; reserved for future extensions.

    Returns
    -------
    float
        Regularization weight in (0, 1].
    """
    degree = len(self._site_to_kinases.get(site_id, []))
    # 1 / (1 + degree): an unconnected site keeps the full weight of 1.0
    return float(1.0 / (1.0 + degree))

build_default_network()

Return an empty ``KinaseSubstrateNetwork``.

Serves as a placeholder when no network file is available.

Returns:

Type Description
KinaseSubstrateNetwork
Source code in src/phospho_velocity/network/kinase_substrate.py
193
194
195
196
197
198
199
200
201
202
203
def build_default_network() -> KinaseSubstrateNetwork:
    """Return an empty :class:`KinaseSubstrateNetwork`.

    Serves as a placeholder when no network file is available.  An empty
    network has no edges, so ``regularization_prior`` returns the full
    weight of 1.0 for every site.

    Returns
    -------
    KinaseSubstrateNetwork
    """
    logger.info("Building empty default kinase–substrate network.")
    return KinaseSubstrateNetwork()

Pipeline Configuration

Configuration system for phospho-velocity pipeline.

Uses Python dataclasses to keep dependencies minimal (no pydantic required). Supports loading from and saving to YAML files.

GenericInputSchema dataclass

Column-name mapping for generic long-format input files.

Parameters:

Name Type Description Default
site_id_col str

Column containing phosphosite identifiers.

'site_id'
gene_col str

Column containing gene/protein names.

'gene'
cell_line_col str

Column containing cell line or sample labels.

'cell_line'
replicate_col str

Column containing replicate indices.

'replicate'
intensity_col str

Column containing (log2) intensity or ratio values.

'intensity'
locprob_col str

Column containing localization probabilities. When None, σ_obs = σ_max is assumed for all rows.

None
Source code in src/phospho_velocity/config.py
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
@dataclass
class GenericInputSchema:
    """Column-name mapping for generic long-format input files.

    Parameters
    ----------
    site_id_col : str
        Column containing phosphosite identifiers.
    gene_col : str
        Column containing gene/protein names.
    cell_line_col : str
        Column containing cell line or sample labels.
    replicate_col : str
        Column containing replicate indices.
    intensity_col : str
        Column containing (log2) intensity or ratio values.
    locprob_col : str, optional
        Column containing localization probabilities.  When ``None``,
        ``σ_obs = σ_max`` is assumed for all rows.
    """

    # Defaults mirror the column names used elsewhere in this package
    # (e.g. TimeMap.validate defaults to "cell_line" / "replicate").
    site_id_col: str = "site_id"
    gene_col: str = "gene"
    cell_line_col: str = "cell_line"
    replicate_col: str = "replicate"
    intensity_col: str = "intensity"
    locprob_col: Optional[str] = None  # None => no localization column

ModelConfig dataclass

GP model / MCMC parameters.

Source code in src/phospho_velocity/config.py
208
209
210
211
212
213
214
215
216
@dataclass
class ModelConfig:
    """GP model / MCMC parameters."""

    length_scale: float = 30.0  # initial Matérn kernel length-scale, in minutes
    noise_level: float = 0.5  # noise level for the GP — exact usage defined in model code
    n_mcmc_samples: int = 1000  # MCMC draws; presumably posterior samples — confirm in sampler
    n_mcmc_tune: int = 1000  # MCMC tuning (warm-up) iterations
    random_seed: int = 42  # RNG seed for reproducibility

PipelineConfig dataclass

Top-level pipeline configuration.

Source code in src/phospho_velocity/config.py
219
220
221
222
223
224
225
226
227
228
229
230
231
232
@dataclass
class PipelineConfig:
    """Top-level pipeline configuration."""

    input_file: str = ""  # path to the raw input data file
    output_dir: str = "results"  # directory where outputs are written
    # Inline time mapping; TimeMap.from_dict accepts "cell_line_replicate"
    # string keys, hence Dict[str, float] here.
    time_map: Dict[str, float] = field(default_factory=dict)
    time_map_file: Optional[str] = None  # external time-map file (CSV/JSON), if any
    preprocessing: PreprocessingConfig = field(default_factory=PreprocessingConfig)
    model: ModelConfig = field(default_factory=ModelConfig)
    network_file: Optional[str] = None  # optional kinase–substrate network file
    cell_lines: Optional[List[str]] = None  # presumably restricts processing; None = no filter — confirm in pipeline
    adapter: str = "maxquant"  # input adapter name
    input_schema: Optional[GenericInputSchema] = None  # column mapping for generic input

PreprocessingConfig dataclass

Preprocessing parameters.

Source code in src/phospho_velocity/config.py
198
199
200
201
202
203
204
205
@dataclass
class PreprocessingConfig:
    """Preprocessing parameters."""

    pseudocount: float = 1.0  # presumably added before log transform — confirm in preprocessing code
    missingness_strategy: str = "keep"  # one of "keep", "zero", "interpolate" (CLI choices)
    min_localization_prob: float = 0.75  # minimum localization probability for retaining a site
    baseline_time: float = 0.0  # time point (minutes) used as the baseline/reference

TimeMap dataclass

Mapping from (cell_line, replicate) to time in minutes.

Parameters:

Name Type Description Default
mapping dict

Dictionary with (cell_line_label, replicate_index) keys and time_in_minutes float values.

dict()
Source code in src/phospho_velocity/config.py
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
@dataclass
class TimeMap:
    """Mapping from ``(cell_line, replicate)`` to time in minutes.

    Parameters
    ----------
    mapping : dict
        Dictionary keyed by ``(cell_line_label, replicate_index)`` tuples
        with time-in-minutes float values.
    """

    mapping: Dict[Tuple[str, int], float] = field(default_factory=dict)

    # ------------------------------------------------------------------
    @classmethod
    def from_dict(cls, d: Dict) -> "TimeMap":
        """Build a :class:`TimeMap` from a plain dictionary.

        Keys may be ``(cell_line, replicate)`` 2-tuples or strings of the
        form ``"cell_line_replicate"`` (split on the last underscore).

        Parameters
        ----------
        d : dict
            Source mapping.

        Returns
        -------
        TimeMap
        """
        table: Dict[Tuple[str, int], float] = {}
        for key, value in d.items():
            if isinstance(key, tuple):
                cell, rep = key[0], key[1]
            elif isinstance(key, str) and "_" in key:
                cell, rep = key.rsplit("_", 1)
            else:
                raise ValueError(
                    f"Unsupported key format for TimeMap: {key!r}. "
                    "Expected (cell_line, replicate) tuple or 'cell_line_replicate' string."
                )
            table[(str(cell), int(rep))] = float(value)
        return cls(mapping=table)

    # ------------------------------------------------------------------
    @classmethod
    def from_csv(cls, path: str) -> "TimeMap":
        """Build a :class:`TimeMap` from a headered CSV file.

        Required columns: ``cell_line``, ``replicate``, ``time_min``.

        Parameters
        ----------
        path : str
            Path to the CSV file.

        Returns
        -------
        TimeMap
        """
        table: Dict[Tuple[str, int], float] = {}
        with open(path, newline="") as fh:
            reader = csv.DictReader(fh)
            required = {"cell_line", "replicate", "time_min"}
            if reader.fieldnames is None or not required.issubset(
                set(reader.fieldnames)
            ):
                raise ValueError(
                    f"TimeMap CSV must contain columns {required}; "
                    f"found {list(reader.fieldnames or [])}"
                )
            for record in reader:
                table[(str(record["cell_line"]), int(record["replicate"]))] = float(
                    record["time_min"]
                )
        return cls(mapping=table)

    # ------------------------------------------------------------------
    @classmethod
    def from_json(cls, path: str) -> "TimeMap":
        """Build a :class:`TimeMap` from a JSON file.

        Accepts either an object whose keys are ``"cell_line_replicate"``
        strings with time-in-minutes values, or a list of
        ``{"cell_line": ..., "replicate": ..., "time_min": ...}`` records.

        Parameters
        ----------
        path : str
            Path to the JSON file.

        Returns
        -------
        TimeMap
        """
        with open(path) as fh:
            payload = json.load(fh)
        if not isinstance(payload, list):
            # Object form: reuse the string-key parser.
            return cls.from_dict(payload)
        table: Dict[Tuple[str, int], float] = {
            (str(rec["cell_line"]), int(rec["replicate"])): float(rec["time_min"])
            for rec in payload
        }
        return cls(mapping=table)

    # ------------------------------------------------------------------
    def validate(
        self,
        df: pd.DataFrame,
        cell_line_col: str = "cell_line",
        replicate_col: str = "replicate",
    ) -> List[Tuple[str, int]]:
        """Return ``(cell_line, replicate)`` pairs in *df* not covered by the mapping.

        Parameters
        ----------
        df : pd.DataFrame
            Long-format data with *cell_line_col* and *replicate_col* columns.
        cell_line_col : str
            Column name for cell line labels.
        replicate_col : str
            Column name for replicate indices.

        Returns
        -------
        list of (str, int)
            Uncovered (cell_line, replicate) pairs; empty if all covered.
        """
        have_columns = (
            cell_line_col in df.columns and replicate_col in df.columns
        )
        if df.empty or not have_columns:
            return []
        unique = df[[cell_line_col, replicate_col]].drop_duplicates()
        uncovered: List[Tuple[str, int]] = []
        for cell, rep in zip(unique[cell_line_col], unique[replicate_col]):
            if pd.isna(rep):
                continue  # no replicate index — pair cannot be mapped
            pair = (str(cell), int(rep))
            if pair not in self.mapping:
                uncovered.append(pair)
        return uncovered

from_csv(path) classmethod

Load a ``TimeMap`` from a CSV file.

The CSV must have columns cell_line, replicate, time_min (with a header row).

Parameters:

Name Type Description Default
path str

Path to the CSV file.

required

Returns:

Type Description
TimeMap
Source code in src/phospho_velocity/config.py
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@classmethod
def from_csv(cls, path: str) -> "TimeMap":
    """Build a :class:`TimeMap` from a headered CSV file.

    Required columns: ``cell_line``, ``replicate``, ``time_min``.

    Parameters
    ----------
    path : str
        Path to the CSV file.

    Returns
    -------
    TimeMap
    """
    table: Dict[Tuple[str, int], float] = {}
    with open(path, newline="") as fh:
        reader = csv.DictReader(fh)
        required = {"cell_line", "replicate", "time_min"}
        if reader.fieldnames is None or not required.issubset(
            set(reader.fieldnames)
        ):
            raise ValueError(
                f"TimeMap CSV must contain columns {required}; "
                f"found {list(reader.fieldnames or [])}"
            )
        for record in reader:
            table[(str(record["cell_line"]), int(record["replicate"]))] = float(
                record["time_min"]
            )
    return cls(mapping=table)

from_dict(d) classmethod

Construct a ``TimeMap`` from a plain dictionary.

The dictionary keys must be 2-tuples (cell_line, replicate) or strings in the form "cell_line_replicate" (split on the last underscore).

Parameters:

Name Type Description Default
d dict

Source mapping.

required

Returns:

Type Description
TimeMap
Source code in src/phospho_velocity/config.py
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@classmethod
def from_dict(cls, d: Dict) -> "TimeMap":
    """Build a :class:`TimeMap` from a plain dictionary.

    Keys may be ``(cell_line, replicate)`` 2-tuples or strings of the form
    ``"cell_line_replicate"`` (split on the last underscore).

    Parameters
    ----------
    d : dict
        Source mapping.

    Returns
    -------
    TimeMap
    """
    table: Dict[Tuple[str, int], float] = {}
    for key, value in d.items():
        if isinstance(key, tuple):
            cell, rep = key[0], key[1]
        elif isinstance(key, str) and "_" in key:
            cell, rep = key.rsplit("_", 1)
        else:
            raise ValueError(
                f"Unsupported key format for TimeMap: {key!r}. "
                "Expected (cell_line, replicate) tuple or 'cell_line_replicate' string."
            )
        table[(str(cell), int(rep))] = float(value)
    return cls(mapping=table)

from_json(path) classmethod

Load a ``TimeMap`` from a JSON file.

The JSON must be an object whose keys are "cell_line_replicate" strings and whose values are time-in-minutes floats, or a list of {"cell_line": ..., "replicate": ..., "time_min": ...} objects.

Parameters:

Name Type Description Default
path str

Path to the JSON file.

required

Returns:

Type Description
TimeMap
Source code in src/phospho_velocity/config.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
@classmethod
def from_json(cls, path: str) -> "TimeMap":
    """Build a :class:`TimeMap` from a JSON file.

    Accepts either an object whose keys are ``"cell_line_replicate"``
    strings with time-in-minutes values, or a list of
    ``{"cell_line": ..., "replicate": ..., "time_min": ...}`` records.

    Parameters
    ----------
    path : str
        Path to the JSON file.

    Returns
    -------
    TimeMap
    """
    with open(path) as fh:
        payload = json.load(fh)
    if not isinstance(payload, list):
        # Object form: reuse the string-key parser.
        return cls.from_dict(payload)
    table: Dict[Tuple[str, int], float] = {
        (str(rec["cell_line"]), int(rec["replicate"])): float(rec["time_min"])
        for rec in payload
    }
    return cls(mapping=table)

validate(df, cell_line_col='cell_line', replicate_col='replicate')

Return (cell_line, replicate) pairs in df not covered by the mapping.

Parameters:

Name Type Description Default
df DataFrame

Long-format data with cell_line_col and replicate_col columns.

required
cell_line_col str

Column name for cell line labels.

'cell_line'
replicate_col str

Column name for replicate indices.

'replicate'

Returns:

Type Description
list of (str, int)

Uncovered (cell_line, replicate) pairs; empty if all covered.

Source code in src/phospho_velocity/config.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
def validate(
    self,
    df: pd.DataFrame,
    cell_line_col: str = "cell_line",
    replicate_col: str = "replicate",
) -> List[Tuple[str, int]]:
    """Return ``(cell_line, replicate)`` pairs in *df* not covered by the mapping.

    Parameters
    ----------
    df : pd.DataFrame
        Long-format data with *cell_line_col* and *replicate_col* columns.
    cell_line_col : str
        Column name for cell line labels.
    replicate_col : str
        Column name for replicate indices.

    Returns
    -------
    list of (str, int)
        Uncovered (cell_line, replicate) pairs; empty if all covered.
    """
    have_columns = cell_line_col in df.columns and replicate_col in df.columns
    if df.empty or not have_columns:
        return []
    unique = df[[cell_line_col, replicate_col]].drop_duplicates()
    uncovered: List[Tuple[str, int]] = []
    for cell, rep in zip(unique[cell_line_col], unique[replicate_col]):
        if pd.isna(rep):
            continue  # no replicate index — pair cannot be mapped
        pair = (str(cell), int(rep))
        if pair not in self.mapping:
            uncovered.append(pair)
    return uncovered

load_config(path)

Load a ``PipelineConfig`` from a YAML file.

Parameters:

Name Type Description Default
path str

Path to YAML configuration file.

required

Returns:

Type Description
PipelineConfig
Source code in src/phospho_velocity/config.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
def load_config(path: str) -> PipelineConfig:
    """Read a YAML file into a :class:`PipelineConfig`.

    Parameters
    ----------
    path : str
        Path to YAML configuration file.

    Returns
    -------
    PipelineConfig
    """
    try:
        import yaml
    except ImportError as e:
        raise ImportError(
            "PyYAML is required for config loading: pip install pyyaml"
        ) from e

    with open(path, "r") as fh:
        data = yaml.safe_load(fh) or {}

    # Pop the nested sections first so the remainder maps onto the
    # top-level dataclass fields; unknown keys are silently dropped.
    preprocessing = PreprocessingConfig(**data.pop("preprocessing", {}))
    model = ModelConfig(**data.pop("model", {}))

    known = PipelineConfig.__dataclass_fields__
    extra = {key: value for key, value in data.items() if key in known}
    return PipelineConfig(preprocessing=preprocessing, model=model, **extra)

save_config(config, path)

Serialize config to a YAML file.

Parameters:

Name Type Description Default
config PipelineConfig
required
path str

Destination path.

required
Source code in src/phospho_velocity/config.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
def save_config(config: PipelineConfig, path: str) -> None:
    """Write *config* to *path* as YAML.

    Parameters
    ----------
    config : PipelineConfig
    path : str
        Destination path.
    """
    try:
        import yaml
    except ImportError as e:
        raise ImportError(
            "PyYAML is required for config saving: pip install pyyaml"
        ) from e

    import dataclasses

    def _plain(value):
        # Recursively convert dataclass instances into plain dicts so
        # yaml.dump sees only built-in types.
        if not dataclasses.is_dataclass(value):
            return value
        return {key: _plain(item) for key, item in dataclasses.asdict(value).items()}

    with open(path, "w") as fh:
        yaml.dump(_plain(config), fh, default_flow_style=False)
    logger.info("Saved config to %s", path)

Command Line Interface

The internal CLI orchestrator.

Command-line interface for phospho-velocity.

Provides subcommands: parse, preprocess, fit, run.

build_parser()

Build the top-level argument parser.

Source code in src/phospho_velocity/cli/run.py
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
def build_parser() -> argparse.ArgumentParser:
    """Build the top-level argument parser.

    Returns
    -------
    argparse.ArgumentParser
        Parser with ``parse``, ``preprocess``, ``fit``, ``run`` and
        ``validate`` sub-commands; each sub-command stores its handler
        callable under ``args.func``.
    """
    parser = argparse.ArgumentParser(
        prog="phospho-velocity",
        description="Phosphosite velocity modeling from LC-MS/MS time-course data.",
    )
    parser.add_argument(
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
        help="Logging verbosity (default: INFO).",
    )

    sub = parser.add_subparsers(dest="command", required=True)

    # One helper per sub-command keeps each registration self-contained.
    _add_parse_command(sub)
    _add_preprocess_command(sub)
    _add_fit_command(sub)
    _add_run_command(sub)
    _add_validate_command(sub)

    return parser


def _add_parse_command(sub) -> None:
    """Register the ``parse`` sub-command (MaxQuant table parsing)."""
    p_parse = sub.add_parser("parse", help="Parse MaxQuant Phospho(STY)Sites.txt.")
    p_parse.add_argument("--input", "-i", required=True, help="Input file path.")
    p_parse.add_argument("--output", "-o", required=True, help="Output CSV path.")
    p_parse.set_defaults(func=_cmd_parse)


def _add_preprocess_command(sub) -> None:
    """Register the ``preprocess`` sub-command (normalization / LFC)."""
    p_pre = sub.add_parser("preprocess", help="Normalize and compute log-fold-change.")
    p_pre.add_argument("--input", "-i", required=True, help="Input long-format CSV.")
    p_pre.add_argument("--output", "-o", required=True, help="Output CSV path.")
    p_pre.add_argument(
        "--missingness",
        default="keep",
        choices=["keep", "zero", "interpolate"],
        help="Missingness strategy (default: keep).",
    )
    p_pre.add_argument(
        "--fit-variance-model",
        action="store_true",
        dest="fit_variance_model",
        help="Fit Robin et al. 2019 exponential variance model and save to results/variance_model.json.",
    )
    p_pre.set_defaults(func=_cmd_preprocess)


def _add_fit_command(sub) -> None:
    """Register the ``fit`` sub-command (GP fitting + velocities)."""
    p_fit = sub.add_parser("fit", help="Fit GP models and compute velocities.")
    p_fit.add_argument("--input", "-i", required=True, help="Preprocessed CSV.")
    p_fit.add_argument("--output", "-o", required=True, help="Velocity CSV output.")
    p_fit.add_argument("--value-col", default="log2_intensity_norm", dest="value_col")
    p_fit.set_defaults(func=_cmd_fit)


def _add_run_command(sub) -> None:
    """Register the ``run`` sub-command (full end-to-end pipeline)."""
    p_run = sub.add_parser("run", help="Run full pipeline end-to-end.")
    p_run.add_argument("--config", "-c", help="YAML config file.")
    p_run.add_argument("--input", "-i", help="Input file (if no config).")
    p_run.add_argument("--output-dir", default="results", help="Output directory.")
    p_run.add_argument(
        "--missingness",
        default="keep",
        choices=["keep", "zero", "interpolate"],
    )
    p_run.add_argument(
        "--time-map",
        default=None,
        metavar="FILE",
        dest="time_map_file",
        help="CSV time-map file (columns: cell_line,replicate,time_min).",
    )
    p_run.add_argument(
        "--adapter",
        default="maxquant",
        choices=["maxquant", "generic"],
        help="Input adapter type (default: maxquant).",
    )
    p_run.add_argument(
        "--schema-file",
        default=None,
        metavar="FILE",
        dest="schema_file",
        help="TOML/YAML schema file for the generic adapter.",
    )
    p_run.add_argument(
        "--network",
        default=None,
        metavar="FILE",
        dest="network_file",
        help="Kinase–substrate network TSV (columns: kinase,substrate,site,score).",
    )
    p_run.set_defaults(func=_cmd_run)


def _add_validate_command(sub) -> None:
    """Register the ``validate`` sub-command (time-map coverage check)."""
    p_val = sub.add_parser(
        "validate",
        help="Check time-map coverage against parsed data without running the pipeline.",
    )
    p_val.add_argument("--input", "-i", required=True, help="Parsed long-format CSV.")
    p_val.add_argument(
        "--time-map",
        required=True,
        metavar="FILE",
        dest="time_map_file",
        help="CSV time-map file (columns: cell_line,replicate,time_min).",
    )
    p_val.set_defaults(func=_cmd_validate)

main(argv=None)

Entry point for the phospho-velocity CLI.

Source code in src/phospho_velocity/cli/run.py
325
326
327
328
329
330
def main(argv: Optional[list] = None) -> None:
    """Entry point for the ``phospho-velocity`` CLI."""
    # Parse CLI arguments (or the explicit argv list), configure logging,
    # then dispatch to the handler the chosen sub-command registered.
    namespace = build_parser().parse_args(argv)
    _setup_logging(namespace.log_level)
    namespace.func(namespace)