From 984f5638985e1581c628524fd8288c3fcf2bb60e Mon Sep 17 00:00:00 2001 From: RalfG Date: Fri, 25 Apr 2025 00:27:34 +0200 Subject: [PATCH 1/4] Split-up DeepLC class into separate functions (WIP) --- deeplc/_calibration.py | 344 ++++++ deeplc/_data.py | 49 + deeplc/_finetune.py | 139 +++ deeplc/deeplc.py | 1368 ++++----------------- deeplc/{feat_extractor.py => features.py} | 16 +- deeplc/trainl3.py | 106 -- tests/test_feat_extractor.py | 2 +- 7 files changed, 771 insertions(+), 1253 deletions(-) create mode 100644 deeplc/_calibration.py create mode 100644 deeplc/_data.py create mode 100644 deeplc/_finetune.py rename deeplc/{feat_extractor.py => features.py} (94%) delete mode 100644 deeplc/trainl3.py diff --git a/deeplc/_calibration.py b/deeplc/_calibration.py new file mode 100644 index 0000000..1c67f14 --- /dev/null +++ b/deeplc/_calibration.py @@ -0,0 +1,344 @@ +"""Retention time calibration.""" + +from __future__ import annotations + +import logging +from abc import ABC, abstractmethod + +import numpy as np +from sklearn.linear_model import LinearRegression +from sklearn.pipeline import make_pipeline +from sklearn.preprocessing import SplineTransformer + +from deeplc._exceptions import CalibrationError + +LOGGER = logging.getLogger(__name__) + + +class Calibrator(ABC): + """Abstract base class for retention time calibrators.""" + + @abstractmethod + def __init__(self, *args, **kwargs): + super().__init__() + + @abstractmethod + def fit(measured_tr: np.ndarray, predicted_tr: np.ndarray) -> None: ... + + @abstractmethod + def transform(tr: np.ndarray) -> np.ndarray: ... + + +class PiecewiseLinearCalibrator(Calibrator): + def __init__( + self, + split_cal: int = 50, + bin_distance: float = 2.0, + dict_cal_divider: int = 50, + use_median: bool = True, + ): + """ + Piece-wise linear calibration for retention time. + + Parameters + ---------- + split_cal + Number of splits. + bin_distance + Distance between bins. + dict_cal_divider + # TODO: Make more descriptive + Divider for the dictionary used in the piece-wise linear model. + use_median + # TODO: Make more descriptive + If True, use median instead of mean for calibration. + + """ + super().__init__() + self.split_cal = split_cal + self.bin_distance = bin_distance + self.dict_cal_divider = dict_cal_divider + self.use_median = use_median + + self._calibrate_min = None + self._calibrate_max = None + self._calibrate_dict = None + self._fit = False + + def fit(self, measured_tr: np.ndarray, predicted_tr: np.ndarray) -> None: + """ + Fit a piece-wise linear model to the measured and predicted retention times. + + Parameters + ---------- + measured_tr + Measured retention times. + predicted_tr + Predicted retention times. + + """ + measured_tr, predicted_tr = _process_arrays(measured_tr, predicted_tr) + + mtr_mean = [] + ptr_mean = [] + + calibrate_dict = {} + calibrate_min = float("inf") + calibrate_max = 0 + + LOGGER.debug( + "Selecting the data points for calibration (used to fit the linear models between)" + ) + # smooth between observed and predicted + split_val = predicted_tr[-1] / self.split_cal + + for range_calib_number in np.arange(0.0, predicted_tr[-1], split_val): + ptr_index_start = np.argmax(predicted_tr >= range_calib_number) + ptr_index_end = np.argmax(predicted_tr >= range_calib_number + split_val) + + # no points so no cigar... 
use previous points + if ptr_index_start >= ptr_index_end: + LOGGER.debug( + "Skipping calibration step, due to no points in the " + "predicted range (are you sure about the split size?): " + "%s,%s", + range_calib_number, + range_calib_number + split_val, + ) + continue + + mtr = measured_tr[ptr_index_start:ptr_index_end] + ptr = predicted_tr[ptr_index_start:ptr_index_end] + + if self.use_median: + mtr_mean.append(np.median(mtr)) + ptr_mean.append(np.median(ptr)) + else: + mtr_mean.append(sum(mtr) / len(mtr)) + ptr_mean.append(sum(ptr) / len(ptr)) + + LOGGER.debug("Fitting the linear models between the points") + + if self.split_cal >= len(measured_tr): + raise CalibrationError( + f"Not enough measured tr ({len(measured_tr)}) for the chosen number of splits " + f"({self.split_cal}). Choose a smaller split_cal parameter or provide more " + "peptides for fitting the calibration curve." + ) + if len(mtr_mean) == 0: + raise CalibrationError("The measured tr list is empty, not able to calibrate") + if len(ptr_mean) == 0: + raise CalibrationError("The predicted tr list is empty, not able to calibrate") + + # calculate calibration curves + for i in range(0, len(ptr_mean)): + if i >= len(ptr_mean) - 1: + continue + delta_ptr = ptr_mean[i + 1] - ptr_mean[i] + delta_mtr = mtr_mean[i + 1] - mtr_mean[i] + + slope = delta_mtr / delta_ptr + intercept = (-1 * (ptr_mean[i] * slope)) + mtr_mean[i] + + # optimized predictions using a dict to find calibration curve very fast + for v in np.arange( + round(ptr_mean[i], self.bin_distance), + round(ptr_mean[i + 1], self.bin_distance), + 1 / ((self.bin_distance) * self.dict_cal_divider), + ): + if v < calibrate_min: + calibrate_min = v + if v > calibrate_max: + calibrate_max = v + calibrate_dict[str(round(v, self.bin_distance))] = (slope, intercept) + + self._calibrate_min = calibrate_min + self._calibrate_max = calibrate_max + self._calibrate_dict = calibrate_dict + + self._fit = True + + def transform(self, tr: np.ndarray) -> np.ndarray: + """ + Transform the predicted retention times using the fitted piece-wise linear model. + + Parameters + ---------- + tr + Retention times to be transformed. + + Returns + ------- + np.ndarray + Transformed retention times. + """ + if not self._fit: + raise CalibrationError( + "The model has not been fitted yet. Please call fit() before transform()." + ) + + if tr.shape[0] == 0: + return np.array([]) + + # TODO: Can this be vectorized? + cal_preds = [] + for uncal_pred in tr: + try: + slope, intercept = self.cal_dict[str(round(uncal_pred, self.bin_distance))] + cal_preds.append(slope * (uncal_pred) + intercept) + except KeyError: + # outside of the prediction range ... 
use the last + # calibration curve + if uncal_pred <= self.cal_min: + slope, intercept = self.cal_dict[str(round(self.cal_min, self.bin_distance))] + cal_preds.append(slope * (uncal_pred) + intercept) + elif uncal_pred >= self.cal_max: + slope, intercept = self.cal_dict[str(round(self.cal_max, self.bin_distance))] + cal_preds.append(slope * (uncal_pred) + intercept) + else: + slope, intercept = self.cal_dict[str(round(self.cal_max, self.bin_distance))] + cal_preds.append(slope * (uncal_pred) + intercept) + + return np.array(cal_preds) + + +class SplineTransformerCalibrator(Calibrator): + def __init__(self): + """SplineTransformer calibration for retention time.""" + super().__init__() + self._calibrate_min = None + self._calibrate_max = None + self._linear_model_left = None + self._spline_model = None + self._linear_model_right = None + + self._fit = False + + def fit( + self, + measured_tr: np.ndarray, + predicted_tr: np.ndarray, + simplified: bool = False, # TODO: Move to __init__? + ) -> None: + """ + Fit the SplineTransformer model to the measured and predicted retention times. + + Parameters + ---------- + measured_tr + Measured retention times. + predicted_tr + Predicted retention times. + simplified + If True, use a simplified model with fewer knots and a linear model. + If False, use a more complex model with more knots and a spline model. + + """ + measured_tr, predicted_tr = _process_arrays(measured_tr, predicted_tr) + + # Fit a SplineTransformer model + if simplified: + spline = SplineTransformer(degree=2, n_knots=10) + linear_model = LinearRegression() + linear_model.fit(predicted_tr.reshape(-1, 1), measured_tr) + + linear_model_left = linear_model + # TODO @RobbinBouwmeester: Should this be the spline model? + spline_model = linear_model + linear_model_right = linear_model + else: + spline = SplineTransformer(degree=4, n_knots=int(len(measured_tr) / 500) + 5) + spline_model = make_pipeline(spline, LinearRegression()) + spline_model.fit(predicted_tr.reshape(-1, 1), measured_tr) + + # Determine the top 10% of data on either end + n_top = int(len(predicted_tr) * 0.1) + + # Fit a linear model on the bottom 10% (left-side extrapolation) + X_left = predicted_tr[:n_top] + y_left = measured_tr[:n_top] + linear_model_left = LinearRegression() + linear_model_left.fit(X_left.reshape(-1, 1), y_left) + + # Fit a linear model on the top 10% (right-side extrapolation) + X_right = predicted_tr[-n_top:] + y_right = measured_tr[-n_top:] + linear_model_right = LinearRegression() + linear_model_right.fit(X_right.reshape(-1, 1), y_right) + + self._calibrate_min = min(predicted_tr) + self._calibrate_max = max(predicted_tr) + self._linear_model_left = linear_model_left + self._spline_model = spline_model + self._linear_model_right = linear_model_right + + self._fit = True + + def transform(self, tr: np.ndarray) -> np.ndarray: + """ + Transform the predicted retention times using the fitted SplineTransformer model. + + Parameters + ---------- + tr + Retention times to be transformed. + + Returns + ------- + np.ndarray + Transformed retention times. + """ + if not self._fit: + raise CalibrationError( + "The model has not been fitted yet. Please call fit() before transform()." 
+ ) + + if tr.shape[0] == 0: + return np.array([]) + + y_pred_spline = self._spline_model.predict(tr.reshape(-1, 1)) + y_pred_left = self._linear_model_left.predict(tr.reshape(-1, 1)) + y_pred_right = self._linear_model_right.predict(tr.reshape(-1, 1)) + + # Use spline model within the range of X + within_range = (tr >= self.cal_min) & (tr <= self.cal_max) + within_range = within_range.ravel() # Ensure this is a 1D array for proper indexing + + # Create a prediction array initialized with spline predictions + cal_preds = np.copy(y_pred_spline) + + # Replace predictions outside the range with the linear model predictions + cal_preds[~within_range & (tr.ravel() < self.cal_min)] = y_pred_left[ + ~within_range & (tr.ravel() < self.cal_min) + ] + cal_preds[~within_range & (tr.ravel() > self.cal_max)] = y_pred_right[ + ~within_range & (tr.ravel() > self.cal_max) + ] + + return np.array(cal_preds) + + +def _process_arrays( + measured_tr: np.ndarray, + predicted_tr: np.ndarray, +) -> tuple[np.ndarray, np.ndarray]: + """Process the measured and predicted retention times.""" + # Check array lengths + if len(measured_tr) != len(predicted_tr): + raise ValueError( + f"Measured and predicted retention times must have the same length. " + f"Got {len(measured_tr)} and {len(predicted_tr)}." + ) + + # Ensure both arrays are 1D + if len(measured_tr.shape) > 1: + measured_tr = measured_tr.flatten() + if len(predicted_tr.shape) > 1: + predicted_tr = predicted_tr.flatten() + + # Sort arrays by predicted_tr + indices = np.argsort(predicted_tr) + measured_tr = np.array(measured_tr, dtype=np.float32)[indices] + predicted_tr = np.array(predicted_tr, dtype=np.float32)[indices] + + return measured_tr, predicted_tr diff --git a/deeplc/_data.py b/deeplc/_data.py new file mode 100644 index 0000000..e5a947e --- /dev/null +++ b/deeplc/_data.py @@ -0,0 +1,49 @@ +import torch +from torch.utils.data import Dataset + + +class DeepLCDataset(Dataset): + """ + Custom Dataset class for DeepLC used for loading features from peptide sequences. + + Parameters + ---------- + X : ndarray + Feature matrix for input data. + X_sum : ndarray + Feature matrix for sum of input data. + X_global : ndarray + Feature matrix for global input data. + X_hc : ndarray + Feature matrix for high-order context features. + target : ndarray, optional + The target retention times. Default is None. 
+ """ + + def __init__(self, X, X_sum, X_global, X_hc, target=None): + self.X = torch.from_numpy(X).float() + self.X_sum = torch.from_numpy(X_sum).float() + self.X_global = torch.from_numpy(X_global).float() + self.X_hc = torch.from_numpy(X_hc).float() + + if target is not None: + self.target = torch.from_numpy(target).float() # Add target values if provided + else: + self.target = None # If no target is provided, set it to None + + def __len__(self): + return self.X.shape[0] + + def __getitem__(self, idx): + if self.target is not None: + # Return both features and target during training + return ( + self.X[idx], + self.X_sum[idx], + self.X_global[idx], + self.X_hc[idx], + self.target[idx], + ) + else: + # Return only features during prediction + return (self.X[idx], self.X_sum[idx], self.X_global[idx], self.X_hc[idx]) diff --git a/deeplc/_finetune.py b/deeplc/_finetune.py new file mode 100644 index 0000000..aebfe21 --- /dev/null +++ b/deeplc/_finetune.py @@ -0,0 +1,139 @@ +from __future__ import annotations + +import copy +import logging + +import torch +from torch.utils.data import DataLoader + +LOGGER = logging.getLogger(__name__) + + +class DeepLCFineTuner: + """ + Class for fine-tuning a DeepLC model. + + Parameters + ---------- + model : torch.nn.Module + The model to fine-tune. + train_data : torch.utils.data.Dataset + Dataset containing the training data. + device : str, optional, default='cpu' + The device on which to run the model ('cpu' or 'cuda'). + learning_rate : float, optional, default=0.001 + The learning rate for the optimizer. + epochs : int, optional, default=10 + Number of training epochs. + batch_size : int, optional, default=256 + Batch size for training. + validation_data : torch.utils.data.Dataset or None, optional + If provided, used directly for validation. Otherwise, a fraction of + `train_data` will be held out. + validation_split : float, optional, default=0.1 + Fraction of `train_data` to reserve for validation when + `validation_data` is None. + patience : int, optional, default=5 + Number of epochs with no improvement on validation loss before stopping. + """ + + def __init__( + self, + model, + train_data, + device="cpu", + learning_rate=0.001, + epochs=10, + batch_size=256, + validation_data=None, + validation_split=0.1, + patience=5, + ): + self.model = model.to(device) + self.train_data = train_data + self.device = device + self.learning_rate = learning_rate + self.epochs = epochs + self.batch_size = batch_size + self.validation_data = validation_data + self.validation_split = validation_split + self.patience = patience + + def _freeze_layers(self, unfreeze_keywords="33_1"): + """ + Freezes all layers except those that contain the unfreeze_keyword + in their name. 
+ """ + + for name, param in self.model.named_parameters(): + param.requires_grad = unfreeze_keywords in name + + def prepare_data(self, data, shuffle=True): + return DataLoader(data, batch_size=self.batch_size, shuffle=shuffle) + + def fine_tune(self): + LOGGER.debug("Starting fine-tuning...") + if self.validation_data is None: + # Split the training data into training and validation sets + val_size = int(len(self.train_data) * self.validation_split) + train_size = len(self.train_data) - val_size + train_dataset, val_dataset = torch.utils.data.random_split( + self.train_data, [train_size, val_size] + ) + else: + train_dataset = self.train_data + val_dataset = self.validation_data + train_loader = self.prepare_data(train_dataset) + val_loader = self.prepare_data(val_dataset, shuffle=False) + + optimizer = torch.optim.Adam( + filter(lambda p: p.requires_grad, self.model.parameters()), + lr=self.learning_rate, + ) + loss_fn = torch.nn.L1Loss() + best_model_wts = copy.deepcopy(self.model.state_dict()) + best_val_loss = float("inf") + epochs_no_improve = 0 + + for epoch in range(self.epochs): + running_loss = 0.0 + self.model.train() + for batch in train_loader: + batch_X, batch_X_sum, batch_X_global, batch_X_hc, target = batch + + target = target.view(-1, 1) + + optimizer.zero_grad() + outputs = self.model(batch_X, batch_X_sum, batch_X_global, batch_X_hc) + loss = loss_fn(outputs, target) + loss.backward() + optimizer.step() + running_loss += loss.item() + avg_loss = running_loss / len(train_loader) + + self.model.eval() + val_loss = 0.0 + with torch.no_grad(): + for batch in val_loader: + batch_X, batch_X_sum, batch_X_global, batch_X_hc, target = batch + target = target.view(-1, 1) + outputs = self.model(batch_X, batch_X_sum, batch_X_global, batch_X_hc) + val_loss += loss_fn(outputs, target).item() + avg_val_loss = val_loss / len(val_loader) + + LOGGER.debug( + f"Epoch {epoch + 1}/{self.epochs}, " + f"Loss: {avg_loss:.4f}, " + f"Validation Loss: {avg_val_loss:.4f}" + ) + if avg_val_loss < best_val_loss: + best_val_loss = avg_val_loss + best_model_wts = copy.deepcopy(self.model.state_dict()) + epochs_no_improve = 0 + else: + epochs_no_improve += 1 + if epochs_no_improve >= self.patience: + LOGGER.debug(f"Early stopping triggered {epoch + 1}") + break + self.model.load_state_dict(best_model_wts) + return self.model diff --git a/deeplc/deeplc.py b/deeplc/deeplc.py index 22f1b92..f72fa61 100644 --- a/deeplc/deeplc.py +++ b/deeplc/deeplc.py @@ -19,37 +19,27 @@ "Sven Degroeve", ] -# Default models, will be used if no other is specified. If no best model is -# selected during calibration, the first model in the list will be used. 
import copy -import gc import logging import multiprocessing -import multiprocessing.dummy import os import random import sys import warnings -from configparser import ConfigParser from pathlib import Path from tempfile import TemporaryDirectory import numpy as np import pandas as pd import torch -from dask import compute, delayed from psm_utils import PSM, Peptidoform, PSMList from psm_utils.io import read_file -from psm_utils.io.peptide_record import peprec_to_proforma -from sklearn.base import BaseEstimator -from sklearn.linear_model import LinearRegression -from sklearn.pipeline import make_pipeline -from sklearn.preprocessing import SplineTransformer -from torch.utils.data import DataLoader, Dataset +from torch.utils.data import DataLoader -from deeplc._exceptions import CalibrationError -from deeplc.feat_extractor import aggregate_encodings, encode_peptidoform -from deeplc.trainl3 import train_elastic_net +from deeplc._calibration import Calibrator, SplineTransformerCalibrator +from deeplc._data import DeepLCDataset +from deeplc._finetune import DeepLCFineTuner +from deeplc.features import extract_features, unpack_features # If CLI/GUI/frozen: disable warnings before importing IS_CLI_GUI = os.path.basename(sys.argv[0]) in ["deeplc", "deeplc-gui"] @@ -57,1191 +47,279 @@ if IS_CLI_GUI or IS_FROZEN: warnings.filterwarnings("ignore", category=UserWarning) +# Default models, will be used if no other is specified. If no best model is +# selected during calibration, the first model in the list will be used. DEEPLC_DIR = os.path.dirname(os.path.realpath(__file__)) DEFAULT_MODELS = [ "mods/full_hc_PXD005573_pub_1fd8363d9af9dcad3be7553c39396960.pt", "mods/full_hc_PXD005573_pub_8c22d89667368f2f02ad996469ba157e.pt", "mods/full_hc_PXD005573_pub_cb975cfdd4105f97efa0b3afffe075cc.pt", ] -DEFAULT_MODELS = [os.path.join(DEEPLC_DIR, dm) for dm in DEFAULT_MODELS] - -LIBRARY = {} - - -# TODO: Keep for torch? -# Set to force CPU calculations -os.environ["CUDA_VISIBLE_DEVICES"] = "-1" +DEFAULT_MODELS = [os.path.join(DEEPLC_DIR, m) for m in DEFAULT_MODELS] logger = logging.getLogger(__name__) -def split_list(a, n): - k, m = divmod(len(a), n) - return (a[i * k + min(i, m) : (i + 1) * k + min(i + 1, m)] for i in range(n)) - - -def divide_chunks(list_, n_chunks): - """Yield successive n-sized chunks from list_.""" - for i in range(0, len(list_), n_chunks): - yield list_[i : i + n_chunks] - - -class DeepLCDataset(Dataset): +def predict( + psm_list: PSMList | None = None, + model_files: str | list[str] | None = None, + calibrator: Calibrator | None = None, + batch_size: int | None = None, + single_model_mode: bool = False, +): """ - Custom Dataset class for DeepLC used for loading features from peptide sequences. + Make predictions for sequences, in batches if required. Parameters ---------- - X : ndarray - Feature matrix for input data. - X_sum : ndarray - Feature matrix for sum of input data. - X_global : ndarray - Feature matrix for global input data. - X_hc : ndarray - Feature matrix for high-order context features. - target : ndarray, optional - The target retention times. Default is None. 
- """ - - def __init__(self, X, X_sum, X_global, X_hc, target=None): - self.X = torch.from_numpy(X).float() - self.X_sum = torch.from_numpy(X_sum).float() - self.X_global = torch.from_numpy(X_global).float() - self.X_hc = torch.from_numpy(X_hc).float() - - if target is not None: - self.target = torch.from_numpy( - target - ).float() # Add target values if provided - else: - self.target = None # If no target is provided, set it to None - - def __len__(self): - return self.X.shape[0] + psm_list + PSMList object containing the peptidoforms to predict for. + model_files + Model file (or files) to use for prediction. If None, the default model is used. + calibrator + Calibrator object to use for calibration. If None, no calibration is performed. + batch_size + How many samples per batch to load (default: 1). + single_model_mode + Whether to use a single model instead of multiple default models. Only applies if + model_file is None. + + Returns + ------- + np.array + predictions - def __getitem__(self, idx): - if self.target is not None: - # Return both features and target during training - return ( - self.X[idx], - self.X_sum[idx], - self.X_global[idx], - self.X_hc[idx], - self.target[idx], - ) + """ + if len(psm_list) == 0: + return [] + + # Extract features + features = extract_features(psm_list) + X_sum, X_global, X_hc, X_main = unpack_features(features) + dataset = DeepLCDataset(X_main, X_sum, X_global, X_hc) + loader = DataLoader(dataset, batch_size=batch_size, shuffle=False) + + if model_files is not None: + if isinstance(model_files, str): + model_files = [model_files] + elif isinstance(model_files, list): + model_files = model_files else: - # Return only features during prediction - return (self.X[idx], self.X_sum[idx], self.X_global[idx], self.X_hc[idx]) + raise ValueError("Invalid model name provided.") + else: + model_files = [DEFAULT_MODELS[0]] if single_model_mode else DEFAULT_MODELS + # Get predictions; iterate over models if multiple were selected + model_predictions = [] + for model_f in model_files: + # Load model + mod = torch.load(model_f, weights_only=False, map_location=torch.device("cpu")) + mod.eval() -class DeepLCFineTuner: + # Predict + ret_preds = [] + with torch.no_grad(): + for batch in loader: + batch_X, batch_X_sum, batch_X_global, batch_X_hc = batch + batch_preds = mod(batch_X, batch_X_sum, batch_X_global, batch_X_hc) + ret_preds.append(batch_preds.detach().cpu().numpy()) + + # Concatenate predictions + ret_preds = np.concatenate(ret_preds, axis=0) + + # TODO: Bring outside of model loop? + # Calibrate + if calibrator is not None: + ret_preds = calibrator.transform(ret_preds) + + model_predictions.append(ret_preds) + + # Average the predictions from all models + averaged_predictions = np.mean(model_predictions, axis=0) + + return averaged_predictions + + +# TODO: Split-of transfer learning? +def calibrate( + psm_list: PSMList | None = None, + measured_tr: np.ndarray | None = None, + model_files: str | list[str] | None = None, + location_retraining_models: str = "", + sample_for_calibration_curve: int | None = None, + return_plotly_report=False, + n_jobs: int | None = None, + batch_size: int = int(1e6), + fine_tune: bool = False, + n_epochs: int = 20, + calibrator: Calibrator | None = None, +) -> dict | None: """ - Class for fine-tuning a DeepLC model. + Find best model and calibrate. Parameters ---------- - model : torch.nn.Module - The model to fine-tune. - train_data : torch.utils.data.Dataset - Dataset containing the training data. 
- device : str, optional, default='cpu' - The device on which to run the model ('cpu' or 'cuda'). - learning_rate : float, optional, default=0.001 - The learning rate for the optimizer. - epochs : int, optional, default=10 - Number of training epochs. - batch_size : int, optional, default=256 - Batch size for training. - validation_data : torch.utils.data.Dataset or None, optional - If provided, used directly for validation. Otherwise, a fraction of - `train_data` will be held out. - validation_split : float, optional, default=0.1 - Fraction of `train_data` to reserve for validation when - `validation_data` is None. - patience : int, optional, default=5 - Number of epochs with no improvement on validation loss before stopping. - """ - - def __init__( - self, - model, - train_data, - device="cpu", - learning_rate=0.001, - epochs=10, - batch_size=256, - validation_data=None, - validation_split=0.1, - patience=5, - ): - self.model = model.to(device) - self.train_data = train_data - self.device = device - self.learning_rate = learning_rate - self.epochs = epochs - self.batch_size = batch_size - self.validation_data = validation_data - self.validation_split = validation_split - self.patience = patience - - def _freeze_layers(self, unfreeze_keywords="33_1"): - """ - Freezes all layers except those that contain the unfreeze_keyword - in their name. - """ - - for name, param in self.model.named_parameters(): - param.requires_grad = unfreeze_keywords in name - - def prepare_data(self, data, shuffle=True): - return DataLoader(data, batch_size=self.batch_size, shuffle=shuffle) - - def fine_tune(self): - logger.debug("Starting fine-tuning...") - if self.validation_data is None: - # Split the training data into training and validation sets - val_size = int(len(self.train_data) * self.validation_split) - train_size = len(self.train_data) - val_size - train_dataset, val_dataset = torch.utils.data.random_split( - self.train_data, [train_size, val_size] - ) - else: - train_dataset = self.train_data - val_dataset = self.validation_data - train_loader = self.prepare_data(train_dataset) - val_loader = self.prepare_data(val_dataset, shuffle=False) - - optimizer = torch.optim.Adam( - filter(lambda p: p.requires_grad, self.model.parameters()), - lr=self.learning_rate, - ) - loss_fn = torch.nn.L1Loss() - best_model_wts = copy.deepcopy(self.model.state_dict()) - best_val_loss = float("inf") - epochs_no_improve = 0 - - for epoch in range(self.epochs): - running_loss = 0.0 - self.model.train() - for batch in train_loader: - batch_X, batch_X_sum, batch_X_global, batch_X_hc, target = batch - - target = target.view(-1, 1) - - optimizer.zero_grad() - outputs = self.model(batch_X, batch_X_sum, batch_X_global, batch_X_hc) - loss = loss_fn(outputs, target) - loss.backward() - optimizer.step() - running_loss += loss.item() - avg_loss = running_loss / len(train_loader) - - self.model.eval() - val_loss = 0.0 - with torch.no_grad(): - for batch in val_loader: - batch_X, batch_X_sum, batch_X_global, batch_X_hc, target = batch - target = target.view(-1, 1) - outputs = self.model( - batch_X, batch_X_sum, batch_X_global, batch_X_hc - ) - val_loss += loss_fn(outputs, target).item() - avg_val_loss = val_loss / len(val_loader) - - logger.debug( - f"Epoch {epoch + 1}/{self.epochs}, " - f"Loss: {avg_loss:.4f}, " - f"Validation Loss: {avg_val_loss:.4f}" - ) - if avg_val_loss < best_val_loss: - best_val_loss = avg_val_loss - best_model_wts = copy.deepcopy(self.model.state_dict()) - epochs_no_improve = 0 - else: - epochs_no_improve += 1 - 
if epochs_no_improve >= self.patience: - logger.debug(f"Early stopping triggered {epoch + 1}") - break - self.model.load_state_dict(best_model_wts) - return self.model - - -class DeepLC: - """ - DeepLC predictor. - - Methods + psm_list + PSMList object containing the peptidoforms to predict for. + measured_tr + Measured retention time used for calibration. Should correspond to the PSMs in the + provided PSMs. If None, the measured retention time is taken from the PSMs. + model_files + Path to one or mode models to test and calibrat for. If a list of models is passed, + the best performing one on the calibration data will be selected. + correction_factor + correction factor that needs to be applied to the supplied measured tr's + location_retraining_models + Location to save the retraining models; if None, a temporary directory is used. + sample_for_calibration_curve + Number of PSMs to sample for calibration curve; if None, all provided PSMs are used. + use_median + Whether to use the median value in a window to perform calibration; only applies to + piecewise linear calibration, not to PyGAM calibration. + return_plotly_report + Whether to return a plotly report with the calibration results. + + Returns ------- - calibrate_preds - Find best model and calibrate - make_preds - Make predictions + dict | None + Dictionary with plotly report information or None. """ + # Getting measured retention time either from measured_tr or provided PSMs + if not measured_tr: + measured_tr = [psm.retention_time for psm in psm_list] + if None in measured_tr: + raise ValueError("Not all PSMs have an observed retention time.") - library = {} + n_jobs = multiprocessing.cpu_count() if n_jobs is None else n_jobs - def __init__( - self, - main_path: str | None = None, - path_model: str | None = None, - verbose: bool = True, - bin_distance: float = 2.0, - dict_cal_divider: int = 50, - split_cal: int = 50, - n_jobs: int | None = None, - config_file: str | None = None, - f_extractor: None = None, - cnn_model: bool = True, - # batch_num: int = 250000, - batch_num: int = int(1e6), - batch_num_tf: int = 1024, - write_library: bool = False, - use_library: str | None = None, - reload_library: bool = False, - pygam_calibration: bool = True, - deepcallc_mod: bool = False, - deeplc_retrain: bool = False, - predict_ccs: bool = False, - n_epochs: int = 20, - single_model_mode: bool = True, - ): - """ - Initialize the DeepLC predictor. + if calibrator is None: + calibrator = SplineTransformerCalibrator() - Parameters - ---------- - main_path - Main path of the module. - path_model - Path to prediction model(s); if not provided, the best default model is selected based - on calibration peptides. - verbose - Turn logging on/off. - bin_dist - Precision factor for calibration lookup. - dict_cal_divider - Divider that sets the precision for fast lookup of retention times in calibration; e.g. - 10 means a precision of 0.1 between the calibration anchor points - split_cal - Number of splits in the chromatogram for piecewise linear calibration. - n_jobs - Number of CPU threads to use. - config_file - Path to a configuration file. - f_extractor - Deprecated. - cnn_model - Flag indicating whether to use the CNN model. - batch_num - Prediction batch size (in peptides); lower values reduce memory footprint. - batch_num_tf - Batch size for TensorFlow predictions. - write_library - Whether to append new predictions to a library for faster future access. - use_library - Library file to read from or write to for prediction caching. 
- reload_library - Whether to reload the prediction library. - pygam_calibration - Flag to enable calibration using PyGAM. - deepcallc_mod - Flag specific to deepcallc mode. - deeplc_retrain - Flag indicating whether to perform retraining (transfer learning) of prediction models. - predict_ccs - Flag to control prediction of CCS values. - n_epochs - Number of epochs used in retraining if deeplc_retrain is enabled. - single_model_mode - Flag to use a single model instead of multiple default models. + # Ensuring self.model is list of strings + model_files = model_files or DEFAULT_MODELS + if isinstance(model_files, str): + model_files = [model_files] - """ - # if a config file is defined overwrite standard parameters - if config_file: - cparser = ConfigParser() - cparser.read(config_file) - dict_cal_divider = cparser.getint("DeepLC", "dict_cal_divider") - split_cal = cparser.getint("DeepLC", "split_cal") - n_jobs = cparser.getint("DeepLC", "n_jobs") + logger.debug("Start to calibrate predictions ...") + logger.debug(f"Ready to find the best model out of: {model_files}") - self.main_path = main_path or os.path.dirname(os.path.realpath(__file__)) - self.path_model = self._get_model_paths(path_model, single_model_mode) - self.verbose = verbose - self.bin_distance = bin_distance - self.dict_cal_divider = dict_cal_divider - self.split_cal = split_cal - self.n_jobs = multiprocessing.cpu_count() if n_jobs is None else n_jobs - self.config_file = config_file - self.cnn_model = cnn_model - self.model_cache = {} - self.batch_num = batch_num - self.batch_num_tf = batch_num_tf - self.write_library = write_library - self.use_library = use_library - self.reload_library = reload_library - self.pygam_calibration = pygam_calibration - self.deepcallc_mod = deepcallc_mod - self.deeplc_retrain = deeplc_retrain - self.predict_ccs = predict_ccs - self.n_epochs = n_epochs + if fine_tune: + logger.debug("Preparing for model fine-tuning...") - # Apparently... - self.model = self.path_model + features = extract_features(psm_list) + X_sum, X_global, X_hc, X_main = unpack_features(features) - if f_extractor: - warnings.DeprecationWarning("f_extractor argument is deprecated.") + dataset = DeepLCDataset(X_main, X_sum, X_global, X_hc, np.array(measured_tr)) - # TODO REMOVE!!! - self.verbose = True - - # Calibration variables - self.calibrate_dict = {} - self.calibrate_min = float("inf") - self.calibrate_max = 0 - - if self.deepcallc_mod: - self.write_library = False - self.use_library = None - self.reload_library = False - - def __str__(self): - return """ - _____ _ _____ - | __ \ | | / ____| - | | | | ___ ___ _ __ | | | | - | | | |/ _ \/ _ \ '_ \| | | | - | |__| | __/ __/ |_) | |___| |____ - |_____/ \___|\___| .__/|______\_____| - | | - |_| - """ - - @staticmethod - def _get_model_paths( - passed_model_path: str | None, single_model_mode: bool - ) -> list[str]: - """Get the model paths based on the passed model path and the single model mode.""" - if passed_model_path: - return [passed_model_path] - - if single_model_mode: - return [DEFAULT_MODELS[0]] - - return DEFAULT_MODELS - - def _prepare_feature_matrices(self, psm_list): - """ - Extract features in parallel and assemble the four input matrices. - - Parameters - ---------- - psm_list : list of PSM - List of peptide‐spectrum matches for which to extract features. 
- - Returns - ------- - X : ndarray, shape (n_peptides, n_features) - X_sum : ndarray, shape (n_peptides, n_sum_features) - X_global : ndarray, shape (n_peptides, n_global_features * 2) - X_hc : ndarray, shape (n_peptides, n_hc_features) - """ - feats = self.do_f_extraction_psm_list_parallel(psm_list) - X = np.stack(list(feats["matrix"].values())) - X_sum = np.stack(list(feats["matrix_sum"].values())) - X_global = np.concatenate( - ( - np.stack(list(feats["matrix_all"].values())), - np.stack(list(feats["pos_matrix"].values())), - ), - axis=1, + base_model_path = model_files[0] + base_model = torch.load( + base_model_path, weights_only=False, map_location=torch.device("cpu") ) - X_hc = np.stack(list(feats["matrix_hc"].values())) - return X, X_sum, X_global, X_hc - - def _extract_features( - self, - peptidoforms: list[str | Peptidoform] | PSMList, - chunk_size: int = 10000, - ) -> dict[str, dict[int, np.ndarray]]: - """Extract features for all peptidoforms.""" - if isinstance(peptidoforms, PSMList): - peptidoforms = [psm.peptidoform for psm in peptidoforms] - - logger.debug("Running feature extraction in single-threaded mode...") - if self.n_jobs <= 1: - encodings = [ - encode_peptidoform(pf, predict_ccs=self.predict_ccs) - for pf in peptidoforms - ] - - else: - logger.debug("Preparing feature extraction with Dask") - # Process peptidoforms in larger chunks to reduce task overhead. - peptidoform_strings = [ - str(pep) for pep in peptidoforms - ] # Faster pickling of strings - - def chunked_encode(chunk): - return [ - encode_peptidoform(pf, predict_ccs=self.predict_ccs) for pf in chunk - ] - - tasks = [ - delayed(chunked_encode)(peptidoform_strings[i : i + chunk_size]) - for i in range(0, len(peptidoform_strings), chunk_size) - ] - - logger.debug("Starting feature extraction with Dask") - chunks_encodings = compute( - *tasks, scheduler="processes", workers=self.n_jobs - ) - - # Flatten the list of lists. - encodings = [enc for chunk in chunks_encodings for enc in chunk] - - # Aggregate the encodings into a single dictionary. 
- aggregated_encodings = aggregate_encodings(encodings) + base_model.eval() - logger.debug("Finished feature extraction") - - return aggregated_encodings - - def _apply_calibration_core( - self, - uncal_preds: np.ndarray, - cal_dict: dict | list[BaseEstimator], - cal_min: float, - cal_max: float, - ) -> np.ndarray: - """Apply calibration to the predictions.""" - if len(uncal_preds) == 0: - return np.array([]) - - cal_preds = [] - if self.pygam_calibration: - linear_model_left, spline_model, linear_model_right = cal_dict - y_pred_spline = spline_model.predict(uncal_preds.reshape(-1, 1)) - y_pred_left = linear_model_left.predict(uncal_preds.reshape(-1, 1)) - y_pred_right = linear_model_right.predict(uncal_preds.reshape(-1, 1)) - - # Use spline model within the range of X - within_range = (uncal_preds >= cal_min) & (uncal_preds <= cal_max) - within_range = ( - within_range.ravel() - ) # Ensure this is a 1D array for proper indexing - - # Create a prediction array initialized with spline predictions - cal_preds = np.copy(y_pred_spline) + fine_tuner = DeepLCFineTuner( + model=base_model, + train_data=dataset, + batch_size=batch_size, + epochs=n_epochs, + ) + # fine_tuner._freeze_layers() + fine_tuned_model = fine_tuner.fine_tune() - # Replace predictions outside the range with the linear model predictions - cal_preds[~within_range & (uncal_preds.ravel() < cal_min)] = y_pred_left[ - ~within_range & (uncal_preds.ravel() < cal_min) - ] - cal_preds[~within_range & (uncal_preds.ravel() > cal_max)] = y_pred_right[ - ~within_range & (uncal_preds.ravel() > cal_max) - ] + if location_retraining_models: + os.makedirs(location_retraining_models, exist_ok=True) + temp_dir_obj = TemporaryDirectory() + t_dir_models = temp_dir_obj.name else: - for uncal_pred in uncal_preds: - try: - slope, intercept = cal_dict[ - str(round(uncal_pred, self.bin_distance)) - ] - cal_preds.append(slope * (uncal_pred) + intercept) - except KeyError: - # outside of the prediction range ... 
use the last - # calibration curve - if uncal_pred <= cal_min: - slope, intercept = cal_dict[ - str(round(cal_min, self.bin_distance)) - ] - cal_preds.append(slope * (uncal_pred) + intercept) - elif uncal_pred >= cal_max: - slope, intercept = cal_dict[ - str(round(cal_max, self.bin_distance)) - ] - cal_preds.append(slope * (uncal_pred) + intercept) - else: - slope, intercept = cal_dict[ - str(round(cal_max, self.bin_distance)) - ] - cal_preds.append(slope * (uncal_pred) + intercept) - - return np.array(cal_preds) - - def _make_preds_core_library(self, psm_list=None, calibrate=True, mod_name=None): - """Get predictions stored in library and calibrate them if needed.""" - psm_list = [] if psm_list is None else psm_list - ret_preds = [] - for psm in psm_list: - ret_preds.append(LIBRARY[psm.peptidoform.proforma + "|" + mod_name]) - - if calibrate: - if isinstance(self.calibrate_min, dict): - # if multiple models are used, use the model name to get the - # calibration values (DeepCallC mode) - calibrate_dict, calibrate_min, calibrate_max = ( - self.calibrate_dict[mod_name], - self.calibrate_min[mod_name], - self.calibrate_max[mod_name], - ) - else: - # if only one model is used, use the same calibration values - calibrate_dict, calibrate_min, calibrate_max = ( - self.calibrate_dict, - self.calibrate_min, - self.calibrate_max, - ) - - ret_preds = self._apply_calibration_core( - ret_preds, calibrate_dict, calibrate_min, calibrate_max - ) - - return ret_preds - - def _make_preds_core( - self, - X: np.ndarray | None = None, - X_sum: np.ndarray | None = None, - X_global: np.ndarray | None = None, - X_hc: np.ndarray | None = None, - calibrate=True, - mod_name=None, - ) -> np.ndarray: - """Make predictions.""" - # Check calibration state - if calibrate: - assert self.calibrate_dict, ( - "DeepLC instance is not yet calibrated. Calibrate before making predictions, or " - "use `calibrate=False`" - ) - - if len(X) == 0: - return np.array([]) - - dataset = DeepLCDataset(X, X_sum, X_global, X_hc) - loader = DataLoader(dataset, batch_size=self.batch_num_tf, shuffle=False) - - ret_preds = [] - - mod = torch.load(mod_name, weights_only=False, map_location=torch.device("cpu")) - mod.eval() - try: - with torch.no_grad(): - for batch in loader: - batch_X, batch_X_sum, batch_X_global, batch_X_hc = batch - batch_preds = mod(batch_X, batch_X_sum, batch_X_global, batch_X_hc) - ret_preds.append(batch_preds.detach().cpu().numpy()) - - ret_preds = np.concatenate(ret_preds, axis=0) - except UnboundLocalError: - logger.debug("X is empty, skipping...") - ret_preds = [] - - if calibrate: - if isinstance(self.calibrate_min, dict): - # if multiple models are used, use the model name to get the - # calibration values (DeepCallC mode) - calibrate_dict, calibrate_min, calibrate_max = ( - self.calibrate_dict[mod_name], - self.calibrate_min[mod_name], - self.calibrate_max[mod_name], - ) - else: - # if only one model is used, use the same calibration values - calibrate_dict, calibrate_min, calibrate_max = ( - self.calibrate_dict, - self.calibrate_min, - self.calibrate_max, - ) - - ret_preds = self._apply_calibration_core( - ret_preds, calibrate_dict, calibrate_min, calibrate_max - ) - - gc.collect() - return ret_preds - - def make_preds( - self, - psm_list: PSMList | None = None, - infile: str | Path | None = None, - seq_df: pd.DataFrame | None = None, - calibrate: bool = True, - mod_name: str | None = None, - ): - """ - Make predictions for sequences, in batches if required. 
- - Parameters - ---------- - psm_list - PSMList object containing the peptidoforms to predict for. - infile - Path to a file containing the peptidoforms to predict for. - seq_df - DataFrame containing the sequences (column:seq), modifications - (column:modifications) and naming (column:index). - calibrate - calibrate predictions or just return the predictions. - mod_name - specify a model to use instead of the model assigned originally to this instance of the - object. - - Returns - ------- - np.array - predictions - """ - if psm_list is None: - if seq_df is not None: - psm_list = _dataframe_to_psm_list(seq_df) - elif infile is not None: - psm_list = _file_to_psm_list(infile) - else: - raise ValueError( - "Either `psm_list` or `seq_df` or `infile` must be provided." - ) - - if len(psm_list) == 0: - logger.warning("No PSMs to predict for.") - return [] - - ret_preds_batches = [] - for psm_list_t in divide_chunks(psm_list, self.batch_num): - ret_preds = [] - - # Extract features for the CNN model - # features = self._extract_features(psm_list_t) - # X_sum, X_global, X_hc, X = unpack_features(features) - X, X_sum, X_global, X_hc = self._prepare_feature_matrices(psm_list_t) - - # Check if model was provided, and if not, whether multiple models are selected in - # the DeepLC object or not. - if mod_name: - model_names = [mod_name] - elif isinstance(self.model, dict): - model_names = [m_name for m_group_name, m_name in self.model.items()] - elif isinstance(self.model, list): - model_names = self.model - elif isinstance(self.model, str): - model_names = [self.model] - else: - raise ValueError("Invalid model name provided.") - - # Get predictions - if len(model_names) > 1: - # Iterate over models if multiple were selected - model_predictions = [] - for model_name in model_names: - model_predictions.append( - self._make_preds_core( - X=X, - X_sum=X_sum, - X_global=X_global, - X_hc=X_hc, - calibrate=calibrate, - mod_name=model_name, - ) - ) - # Average the predictions from all models - ret_preds = np.array( - [sum(a) / len(a) for a in zip(*ret_preds, strict=True)] - ) - # ret_preds = np.mean(model_predictions, axis=0) - - else: - # Use the single model - ret_preds = self._make_preds_core( - X=X, - X_sum=X_sum, - X_global=X_global, - X_hc=X_hc, - calibrate=calibrate, - mod_name=model_names[0], - ) - - ret_preds_batches.append(ret_preds) - - all_ret_preds = np.concatenate(ret_preds_batches, axis=0) - - return all_ret_preds - - def _calibrate_preds_pygam( - self, - measured_tr: np.ndarray, - predicted_tr: np.ndarray, - ) -> tuple[float, float, list[BaseEstimator]]: - """Make calibration curve for predictions using PyGAM.""" - logger.debug("Getting predictions for calibration...") - - # sort two lists, predicted and observed based on measured tr - tr_sort = [ - (mtr, ptr) - for mtr, ptr in sorted( - zip(measured_tr, predicted_tr, strict=True), key=lambda pair: pair[1] - ) - ] - measured_tr = np.array([mtr for mtr, ptr in tr_sort], dtype=np.float32) - predicted_tr = np.array([ptr for mtr, ptr in tr_sort], dtype=np.float32) - - # Fit a SplineTransformer model - if self.deeplc_retrain: - spline = SplineTransformer(degree=2, n_knots=10) - linear_model = LinearRegression() - linear_model.fit(predicted_tr.reshape(-1, 1), measured_tr) - - linear_model_left = linear_model - spline_model = linear_model - linear_model_right = linear_model + t_dir_models = location_retraining_models + os.makedirs(t_dir_models, exist_ok=True) + + # Define path to save fine-tuned model + fine_tuned_model_path = 
os.path.join(t_dir_models, "fine_tuned_model.pth") + torch.save(fine_tuned_model, fine_tuned_model_path) + model_files = [fine_tuned_model_path] + + # Limit calibration to a subset of PSMs if specified + if sample_for_calibration_curve: + psm_list = random.sample(list(psm_list), sample_for_calibration_curve) + measured_tr = [psm.retention_time for psm in psm_list] + + best_perf = float("inf") + best_calibrator = {} + mod_calibrator = {} + pred_dict = {} + mod_dict = {} + temp_obs = [] + temp_pred = [] + + for model_name in model_files: + logger.debug(f"Trying out the following model: {model_name}") + predicted_tr = predict(psm_list, calibrate=False, model_name=model_name) + + model_calibrator = copy.deepcopy(calibrator) + + if isinstance(model_calibrator, SplineTransformerCalibrator): + model_calibrator.fit(predicted_tr, measured_tr, simplified=fine_tune) else: - spline = SplineTransformer( - degree=4, n_knots=int(len(measured_tr) / 500) + 5 - ) - spline_model = make_pipeline(spline, LinearRegression()) - spline_model.fit(predicted_tr.reshape(-1, 1), measured_tr) - - # Determine the top 10% of data on either end - n_top = int(len(predicted_tr) * 0.1) - - # Fit a linear model on the bottom 10% (left-side extrapolation) - X_left = predicted_tr[:n_top] - y_left = measured_tr[:n_top] - linear_model_left = LinearRegression() - linear_model_left.fit(X_left.reshape(-1, 1), y_left) - - # Fit a linear model on the top 10% (right-side extrapolation) - X_right = predicted_tr[-n_top:] - y_right = measured_tr[-n_top:] - linear_model_right = LinearRegression() - linear_model_right.fit(X_right.reshape(-1, 1), y_right) - - calibrate_min = min(predicted_tr) - calibrate_max = max(predicted_tr) - - return ( - calibrate_min, - calibrate_max, - [linear_model_left, spline_model, linear_model_right], - ) - - def _calibrate_preds_piecewise_linear( - self, - measured_tr: np.ndarray, - predicted_tr: np.ndarray, - use_median: bool = True, - ) -> tuple[float, float, dict[str, tuple[float]]]: - """Make calibration curve for predictions.""" - # sort two lists, predicted and observed based on measured tr - tr_sort = [ - (mtr, ptr) - for mtr, ptr in sorted( - zip(measured_tr, predicted_tr, strict=False), key=lambda pair: pair[1] - ) - ] - measured_tr = np.array([mtr for mtr, ptr in tr_sort]) - predicted_tr = np.array([ptr for mtr, ptr in tr_sort]) - - mtr_mean = [] - ptr_mean = [] - - calibrate_dict = {} - calibrate_min = float("inf") - calibrate_max = 0 - - logger.debug( - "Selecting the data points for calibration (used to fit the linear models between)" - ) - # smooth between observed and predicted - split_val = predicted_tr[-1] / self.split_cal - - for range_calib_number in np.arange(0.0, predicted_tr[-1], split_val): - ptr_index_start = np.argmax(predicted_tr >= range_calib_number) - ptr_index_end = np.argmax(predicted_tr >= range_calib_number + split_val) - - # no points so no cigar... 
use previous points - if ptr_index_start >= ptr_index_end: - logger.debug( - "Skipping calibration step, due to no points in the " - "predicted range (are you sure about the split size?): " - "%s,%s", - range_calib_number, - range_calib_number + split_val, - ) - continue - - mtr = measured_tr[ptr_index_start:ptr_index_end] - ptr = predicted_tr[ptr_index_start:ptr_index_end] - - if use_median: - mtr_mean.append(np.median(mtr)) - ptr_mean.append(np.median(ptr)) - else: - mtr_mean.append(sum(mtr) / len(mtr)) - ptr_mean.append(sum(ptr) / len(ptr)) - - logger.debug("Fitting the linear models between the points") - - if self.split_cal >= len(measured_tr): - raise CalibrationError( - f"Not enough measured tr ({len(measured_tr)}) for the chosen number of splits " - f"({self.split_cal}). Choose a smaller split_cal parameter or provide more " - "peptides for fitting the calibration curve." - ) - if len(mtr_mean) == 0: - raise CalibrationError( - "The measured tr list is empty, not able to calibrate" - ) - if len(ptr_mean) == 0: - raise CalibrationError( - "The predicted tr list is empty, not able to calibrate" - ) - - # calculate calibration curves - for i in range(0, len(ptr_mean)): - if i >= len(ptr_mean) - 1: - continue - delta_ptr = ptr_mean[i + 1] - ptr_mean[i] - delta_mtr = mtr_mean[i + 1] - mtr_mean[i] - - slope = delta_mtr / delta_ptr - intercept = (-1 * (ptr_mean[i] * slope)) + mtr_mean[i] + model_calibrator.fit(predicted_tr, measured_tr) - # optimized predictions using a dict to find calibration curve very - # fast - for v in np.arange( - round(ptr_mean[i], self.bin_distance), - round(ptr_mean[i + 1], self.bin_distance), - 1 / ((self.bin_distance) * self.dict_cal_divider), - ): - if v < calibrate_min: - calibrate_min = v - if v > calibrate_max: - calibrate_max = v - calibrate_dict[str(round(v, self.bin_distance))] = (slope, intercept) + # TODO: Use pathlib to get the model base name + m_name = model_name.split("/")[-1] - return calibrate_min, calibrate_max, calibrate_dict + # Get new predictions with calibration + preds = predict(psm_list, calibrate=True, model_files=[model_name]) - def calibrate_preds( - self, - psm_list: PSMList | None = None, - infile: str | Path | None = None, - seq_df: pd.DataFrame | None = None, - measured_tr: np.ndarray | None = None, - location_retraining_models: str = "", - sample_for_calibration_curve: int | None = None, - use_median: bool = True, - return_plotly_report=False, - ) -> dict | None: - """ - Find best model and calibrate. + # Save the predictions and calibration parameters + # TODO Double nested dict not required without CALLC functionality? + m_group_name = "_".join(m_name.split("_")[:-1]) + pred_dict.setdefault(m_group_name, {})[model_name] = preds + mod_dict.setdefault(m_group_name, {})[model_name] = model_name + mod_calibrator.setdefault(m_group_name, {})[model_name] = model_calibrator - Parameters - ---------- - psm_list - PSMList object containing the peptidoforms to predict for. - infile - Path to a file containing the peptidoforms to predict for. - seq_df - DataFrame containing the sequences (column:seq), modifications (column:modifications), - naming (column:index), and optionally charge (column:charge) and measured retention - time (column:tr). - measured_tr : list - Measured retention time used for calibration. Should correspond to the PSMs in the - provided PSMs. If None, the measured retention time is taken from the PSMs. 
- correction_factor : float - correction factor that needs to be applied to the supplied measured tr's - location_retraining_models - Location to save the retraining models; if None, a temporary directory is used. - sample_for_calibration_curve - Number of PSMs to sample for calibration curve; if None, all provided PSMs are used. - use_median - Whether to use the median value in a window to perform calibration; only applies to - piecewise linear calibration, not to PyGAM calibration. - return_plotly_report - Whether to return a plotly report with the calibration results. + # Find best-performing model, including each model's calibration + for m_name in pred_dict: + # TODO: Use numpy methods + preds = [sum(a) / len(a) for a in zip(*list(pred_dict[m_name].values()), strict=True)] + perf = sum(abs(np.array(measured_tr) - np.array(preds))) # MAE - Returns - ------- - dict | None - Dictionary with plotly report information or None. + logger.debug(f"For {m_name} model got a performance of: {perf / len(preds)}") - """ - # Getting PSMs either from psm_list, seq_df, or infile - if psm_list is None: - if seq_df is not None: - psm_list = _dataframe_to_psm_list(seq_df) - elif infile is not None: - psm_list = _file_to_psm_list(infile) - else: - raise ValueError( - "Either `psm_list` or `seq_df` or `infile` must be provided." - ) + if perf < best_perf: # Lower is better, as MAE is used + m_group_name = m_name - # Getting measured retention time either from measured_tr or provided PSMs - if not measured_tr: - measured_tr = [psm.retention_time for psm in psm_list] - if None in measured_tr: - raise ValueError("Not all PSMs have an observed retention time.") + # TODO is deepcopy really required? + best_calibrator = copy.deepcopy(mod_calibrator[m_group_name]) + best_model = copy.deepcopy(mod_dict[m_group_name]) + best_perf = perf - # Ensuring self.model is list of strings - if isinstance(self.model, str): - self.model = [self.model] + temp_obs = np.array(measured_tr) + temp_pred = np.array(preds) - logger.debug("Start to calibrate predictions ...") - logger.debug(f"Ready to find the best model out of: {self.model}") + logger.debug(f"Model with the best performance got selected: {best_model}") - best_perf = float("inf") - best_calibrate_min = 0.0 - best_calibrate_max = 0.0 - best_calibrate_dict = {} - mod_calibrate_dict = {} - mod_calibrate_min_dict = {} - mod_calibrate_max_dict = {} - pred_dict = {} - mod_dict = {} - temp_obs = [] - temp_pred = [] + # TODO: Move to separate function + if return_plotly_report: + import deeplc.plot - if self.deeplc_retrain: - logger.debug("Preparing for model fine-tuning...") - - X, X_sum, X_global, X_hc = self._prepare_feature_matrices(psm_list) - dataset = DeepLCDataset(X, X_sum, X_global, X_hc, np.array(measured_tr)) - - base_model_path = ( - self.model[0] if isinstance(self.model, list) else self.model - ) - base_model = torch.load( - base_model_path, weights_only=False, map_location=torch.device("cpu") - ) - base_model.eval() - - fine_tuner = DeepLCFineTuner( - model=base_model, - train_data=dataset, - batch_size=self.batch_num_tf, - epochs=self.n_epochs, - ) - # fine_tuner._freeze_layers() - fine_tuned_model = fine_tuner.fine_tune() - - if location_retraining_models: - os.makedirs(location_retraining_models, exist_ok=True) - temp_dir_obj = TemporaryDirectory() - t_dir_models = temp_dir_obj.name - self._temp_dir_obj = temp_dir_obj - else: - t_dir_models = location_retraining_models - os.makedirs(t_dir_models, exist_ok=True) - - # Define path to save fine-tuned model - 
fine_tuned_model_path = os.path.join(t_dir_models, "fine_tuned_model.pth") - torch.save(fine_tuned_model, fine_tuned_model_path) - self.model = [fine_tuned_model_path] - - # Limit calibration to a subset of PSMs if specified - if sample_for_calibration_curve: - psm_list = random.sample(list(psm_list), sample_for_calibration_curve) - measured_tr = [psm.retention_time for psm in psm_list] - - for model_name in self.model: - logger.debug(f"Trying out the following model: {model_name}") - predicted_tr = self.make_preds( - psm_list, calibrate=False, mod_name=model_name - ) - - if self.pygam_calibration: - calibrate_output = self._calibrate_preds_pygam( - measured_tr, predicted_tr - ) - else: - calibrate_output = self._calibrate_preds_piecewise_linear( - measured_tr, predicted_tr, use_median=use_median - ) - self.calibrate_min, self.calibrate_max, self.calibrate_dict = ( - calibrate_output - ) - # TODO: Currently, calibration dict can be both a dict (linear) or a list of models - # (PyGAM)... This should be handled better in the future. - - # Skip this model if calibrate_dict is empty - # TODO: Should this do something when using PyGAM and calibrate_dict is a list? - if ( - isinstance(self.calibrate_dict, dict) - and len(self.calibrate_dict.keys()) == 0 - ): - continue - - m_name = model_name.split("/")[-1] - - # Get new predictions with calibration - preds = self.make_preds( - psm_list, calibrate=True, seq_df=seq_df, mod_name=model_name - ) - - m_group_name = ( - "deepcallc" if self.deepcallc_mod else "_".join(m_name.split("_")[:-1]) - ) - m = model_name - try: - pred_dict[m_group_name][m] = preds - mod_dict[m_group_name][m] = m - mod_calibrate_dict[m_group_name][m] = self.calibrate_dict - mod_calibrate_min_dict[m_group_name][m] = self.calibrate_min - mod_calibrate_max_dict[m_group_name][m] = self.calibrate_max - except KeyError: - pred_dict[m_group_name] = {} - mod_dict[m_group_name] = {} - mod_calibrate_dict[m_group_name] = {} - mod_calibrate_min_dict[m_group_name] = {} - mod_calibrate_max_dict[m_group_name] = {} - - pred_dict[m_group_name][m] = preds - mod_dict[m_group_name][m] = m - mod_calibrate_dict[m_group_name][m] = self.calibrate_dict - mod_calibrate_min_dict[m_group_name][m] = self.calibrate_min - mod_calibrate_max_dict[m_group_name][m] = self.calibrate_max - - for m_name in pred_dict: - preds = [ - sum(a) / len(a) - for a in zip(*list(pred_dict[m_name].values()), strict=True) - ] - if len(measured_tr) == 0: - perf = sum(abs(seq_df["tr"] - preds)) - else: - perf = sum(abs(np.array(measured_tr) - np.array(preds))) - - logger.debug( - f"For {m_name} model got a performance of: {perf / len(preds)}" - ) - - if perf < best_perf: - m_group_name = "deepcallc" if self.deepcallc_mod else m_name - - # TODO is deepcopy really required? 
- best_calibrate_dict = copy.deepcopy(mod_calibrate_dict[m_group_name]) - best_calibrate_min = copy.deepcopy(mod_calibrate_min_dict[m_group_name]) - best_calibrate_max = copy.deepcopy(mod_calibrate_max_dict[m_group_name]) - - best_model = copy.deepcopy(mod_dict[m_group_name]) - best_perf = perf - - temp_obs = np.array(measured_tr) - temp_pred = np.array(preds) - - self.calibrate_dict = best_calibrate_dict - self.calibrate_min = best_calibrate_min - self.calibrate_max = best_calibrate_max - self.model = best_model - - if self.deepcallc_mod: - self.deepcallc_model = train_elastic_net( - pd.DataFrame(pred_dict["deepcallc"]), seq_df["tr"] - ) - - logger.debug(f"Model with the best performance got selected: {best_model}") - - if return_plotly_report: - import deeplc.plot - - plotly_return_dict = {} - plotly_df = pd.DataFrame( - list(zip(temp_obs, temp_pred, strict=True)), - columns=[ - "Observed retention time", - "Predicted retention time", - ], - ) - plotly_return_dict["scatter"] = deeplc.plot.scatter(plotly_df) - plotly_return_dict["baseline_dist"] = deeplc.plot.distribution_baseline( - plotly_df - ) - return plotly_return_dict - - return None - - -def _get_pool(n_jobs: int) -> multiprocessing.Pool | multiprocessing.dummy.Pool: # type: ignore - """Get a Pool object for parallel processing.""" - if multiprocessing.current_process().daemon: - logger.warning( - "DeepLC is running in a daemon process. Disabling multiprocessing as daemonic " - "processes can't have children." + plotly_return_dict = {} + plotly_df = pd.DataFrame( + list(zip(temp_obs, temp_pred, strict=True)), + columns=[ + "Observed retention time", + "Predicted retention time", + ], ) - return multiprocessing.dummy.Pool(1) - elif n_jobs == 1: - return multiprocessing.dummy.Pool(1) + plotly_return_dict["scatter"] = deeplc.plot.scatter(plotly_df) + plotly_return_dict["baseline_dist"] = deeplc.plot.distribution_baseline(plotly_df) else: - max_n_jobs = multiprocessing.cpu_count() - if n_jobs > max_n_jobs: - logger.warning( - f"Number of jobs ({n_jobs}) exceeds the number of CPUs ({max_n_jobs}). " - f"Setting number of jobs to {max_n_jobs}." - ) - return multiprocessing.Pool(max_n_jobs) - else: - return multiprocessing.Pool(n_jobs) - - -def _lists_to_psm_list( - sequences: list[str], - modifications: list[str | None], - identifiers: list[str], - charges: list[int] | None, - retention_times: list[float] | None = None, - n_jobs: int = 1, -) -> PSMList: - """Convert lists into a PSMList using Dask for parallel processing.""" - if not charges: - charges = [None] * len(sequences) - - if not retention_times: - retention_times = [None] * len(sequences) - - def create_psm(args): - sequence, modifications, identifier, charge, retention_time = args - return PSM( - peptidoform=peprec_to_proforma(sequence, modifications, charge=charge), - spectrum_id=identifier, - retention_time=retention_time, - ) - - args_list = list( - zip( - sequences, modifications, identifiers, charges, retention_times, strict=True - ) - ) - tasks = [delayed(create_psm)(args) for args in args_list] - list_of_psms = list(compute(*tasks, scheduler="processes")) - return PSMList(psm_list=list_of_psms) - - -# TODO: I'm not sure what the expected behavior was for charges; they were parsed -# from the dataframe, while the passed list was used to check whether it they should get -# parsed. I'll allow both with a priority for the passed charges. 
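# The charge handling described in the note above amounts to "explicit argument wins,
# dataframe column is the fallback"; an illustrative sketch (the helper name
# `_resolve_charges` is hypothetical and not part of this patch; `dataframe` is assumed
# to be a pandas DataFrame):

def _resolve_charges(charges, dataframe):
    # Explicitly passed charges take priority; otherwise fall back to a "charge" column if present.
    if charges:
        return list(charges)
    if "charge" in dataframe.columns:
        return list(dataframe["charge"])
    return None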
-def _dataframe_to_psm_list( - dataframe: pd.DataFrame, - charges: list[int] | None, - n_jobs: int = 1, -) -> PSMList: - """Convert a DataFrame with sequences, modifications, and identifiers into a PSMList.""" - sequences = dataframe["seq"] - modifications = dataframe["modifications"] - identifiers = dataframe.index - retention_times = dataframe["tr"] if "tr" in dataframe.columns else None - - if not charges and "charge" in dataframe.columns: - charges = dataframe["charge"] + plotly_return_dict = None - return _lists_to_psm_list( - sequences, modifications, identifiers, charges, retention_times, n_jobs=n_jobs - ) + return best_model, best_calibrator, plotly_return_dict +# TODO: Move to psm_utils? def _file_to_psm_list(input_file: str | Path) -> PSMList: """Read a file into a PSMList, optionally mapping MaxQuant modifications labels.""" psm_list = read_file(input_file) diff --git a/deeplc/feat_extractor.py b/deeplc/features.py similarity index 94% rename from deeplc/feat_extractor.py rename to deeplc/features.py index ecda640..517ab55 100644 --- a/deeplc/feat_extractor.py +++ b/deeplc/features.py @@ -7,7 +7,7 @@ from re import sub import numpy as np -from psm_utils.psm import Peptidoform +from psm_utils import Peptidoform, PSMList from pyteomics import mass logger = logging.getLogger(__name__) @@ -208,6 +208,20 @@ def encode_peptidoform( } +def extract_features( + peptidoforms: list[str | Peptidoform] | PSMList, + predict_ccs: bool = False, +) -> dict[str, dict[int, np.ndarray]]: + """Extract features for all peptidoforms.""" + if isinstance(peptidoforms, PSMList): + peptidoforms = [psm.peptidoform for psm in peptidoforms] + + encodings = [encode_peptidoform(pf, predict_ccs=predict_ccs) for pf in peptidoforms] + aggregated_encodings = aggregate_encodings(encodings) + + return aggregated_encodings + + def aggregate_encodings( encodings: list[dict[str, np.ndarray]], ) -> dict[str, dict[int, np.ndarray]]: diff --git a/deeplc/trainl3.py b/deeplc/trainl3.py deleted file mode 100644 index 1682485..0000000 --- a/deeplc/trainl3.py +++ /dev/null @@ -1,106 +0,0 @@ -""" -Robbin Bouwmeester - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. - -This project was made possible by MASSTRPLAN. MASSTRPLAN received funding -from the Marie Sklodowska-Curie EU Framework for Research and Innovation -Horizon 2020, under Grant Agreement No. 675132. 
-""" - -try: - from sklearn.base import clone - from sklearn.linear_model import ElasticNet - from sklearn.model_selection import GridSearchCV -except ImportError: - _has_sklearn = False -else: - _has_sklearn = True - - -def train_elastic_net(X, y, n_jobs=16, cv=None): - """ - Function that trains Layer 3 of CALLC (elastic net) - - Parameters - ---------- - X : pd.DataFrame - dataframe with molecular descriptors - y : pd.Series - vector with observed retention times - n_jobs : int - number of jobs to spawn - cv : sklearn.model_selection.KFold - cv object - - Returns - ------- - sklearn.linear_model.ElasticNet - elastic net model trained in Layer 3 - list - list with predictions - list - list with features used to train Layer 3 - """ - if not _has_sklearn: - raise ImportError( - "This function requires the optional dependency `scikit-learn`. Run `pip install " - "scikit-learn` and try again." - ) - - model = ElasticNet() - crossv_mod = clone(model) - ret_mod = clone(model) - - set_reg = [ - 0.01, - 1.0, - 10.0, - 100.0, - 1000.0, - 10000.0, - 10000.0, - 100000.0, - 1000000.0, - 1000000000, - 1000000, - ] - set_reg.extend([x / 2 for x in set_reg]) - set_reg.extend([x / 3 for x in set_reg]) - - params = { - "alpha": set_reg, - "l1_ratio": [0.01, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1.0], - "copy_X": [True], - "normalize": [False], - "positive": [True], - "fit_intercept": [True, False], - } - - grid = GridSearchCV( - model, - params, - cv=cv, - scoring="neg_mean_absolute_error", - verbose=0, - n_jobs=n_jobs, - refit=True, - ) - grid.fit(X, y) - - crossv_mod.set_params(**grid.best_params_) - - ret_mod.set_params(**grid.best_params_) - ret_mod.fit(X, y) - - return ret_mod diff --git a/tests/test_feat_extractor.py b/tests/test_feat_extractor.py index 8ecc39e..4ef3062 100644 --- a/tests/test_feat_extractor.py +++ b/tests/test_feat_extractor.py @@ -1,7 +1,7 @@ import numpy as np from psm_utils.psm import Peptidoform -from deeplc.feat_extractor import encode_peptidoform +from deeplc.features import encode_peptidoform def _check_result_structure(result: dict[str, np.ndarray]) -> None: From 12f332aca3eaaa75482da2f5663c7d0f51848218 Mon Sep 17 00:00:00 2001 From: RalfG Date: Thu, 5 Jun 2025 12:25:02 +0200 Subject: [PATCH 2/4] Generate features as part of dataloader for optimized multiprocessing and batching --- deeplc/__init__.py | 8 +- deeplc/_data.py | 73 +++++++++--------- deeplc/{features.py => _features.py} | 48 ++---------- deeplc/{_calibration.py => calibration.py} | 32 +++++++- deeplc/deeplc.py | 86 +++++++--------------- 5 files changed, 99 insertions(+), 148 deletions(-) rename deeplc/{features.py => _features.py} (84%) rename deeplc/{_calibration.py => calibration.py} (93%) diff --git a/deeplc/__init__.py b/deeplc/__init__.py index e4fb659..be17e9d 100644 --- a/deeplc/__init__.py +++ b/deeplc/__init__.py @@ -1,8 +1,8 @@ -__all__ = ["DeepLC"] +# __all__ = ["DeepLC"] -from importlib.metadata import version +# from importlib.metadata import version -__version__ = version("deeplc") +# __version__ = version("deeplc") -from deeplc.deeplc import DeepLC +# from deeplc.deeplc import DeepLC diff --git a/deeplc/_data.py b/deeplc/_data.py index e5a947e..952ad84 100644 --- a/deeplc/_data.py +++ b/deeplc/_data.py @@ -1,49 +1,44 @@ import torch +from psm_utils.psm_list import PSMList from torch.utils.data import Dataset +from deeplc._features import encode_peptidoform -class DeepLCDataset(Dataset): - """ - Custom Dataset class for DeepLC used for loading features from peptide sequences. 
- - Parameters - ---------- - X : ndarray - Feature matrix for input data. - X_sum : ndarray - Feature matrix for sum of input data. - X_global : ndarray - Feature matrix for global input data. - X_hc : ndarray - Feature matrix for high-order context features. - target : ndarray, optional - The target retention times. Default is None. - """ - def __init__(self, X, X_sum, X_global, X_hc, target=None): - self.X = torch.from_numpy(X).float() - self.X_sum = torch.from_numpy(X_sum).float() - self.X_global = torch.from_numpy(X_global).float() - self.X_hc = torch.from_numpy(X_hc).float() +class DeepLCDataset(Dataset): + """Custom Dataset class for DeepLC used for loading features from peptide sequences.""" - if target is not None: - self.target = torch.from_numpy(target).float() # Add target values if provided + def __init__(self, psm_list: PSMList, add_ccs_features: bool = False): + self.psm_list = psm_list + self.add_ccs_features = add_ccs_features + + self._targets = self._get_targets(psm_list) + + @staticmethod + def _get_targets(psm_list: PSMList) -> torch.Tensor | None: + retention_times = [psm.retention_time for psm in psm_list] + if None not in retention_times: + return torch.tensor(retention_times, dtype=torch.float32) else: - self.target = None # If no target is provided, set it to None + return None def __len__(self): - return self.X.shape[0] + return len(self.psm_list) - def __getitem__(self, idx): - if self.target is not None: - # Return both features and target during training - return ( - self.X[idx], - self.X_sum[idx], - self.X_global[idx], - self.X_hc[idx], - self.target[idx], - ) - else: - # Return only features during prediction - return (self.X[idx], self.X_sum[idx], self.X_global[idx], self.X_hc[idx]) + def __getitem__(self, idx) -> tuple: + if not isinstance(idx, int): + raise TypeError(f"Index must be an integer, got {type(idx)} instead.") + features = encode_peptidoform( + self.psm_list[idx].peptidoform, + add_ccs_features=self.add_ccs_features + ) + feature_tuples = ( + torch.from_numpy(features["matrix"]).to(dtype=torch.float32), + torch.from_numpy(features["matrix_sum"]).to(dtype=torch.float32), + torch.from_numpy(features["matrix_global"]).to(dtype=torch.float32), + torch.from_numpy(features["matrix_hc"]).to(dtype=torch.float32), + ) + targets = self._targets[idx] if self._targets is not None else torch.full_like( + feature_tuples[0], fill_value=float('nan'), dtype=torch.float32 + ) + return feature_tuples, targets diff --git a/deeplc/features.py b/deeplc/_features.py similarity index 84% rename from deeplc/features.py rename to deeplc/_features.py index 517ab55..8d77156 100644 --- a/deeplc/features.py +++ b/deeplc/_features.py @@ -148,7 +148,7 @@ def _compute_rolling_sum(matrix: np.ndarray, n: int = 2) -> np.ndarray: def encode_peptidoform( peptidoform: Peptidoform | str, - predict_ccs: bool = False, + add_ccs_features: bool = False, padding_length: int = 60, positions: set[int] | None = None, positions_pos: set[int] | None = None, @@ -188,7 +188,7 @@ def encode_peptidoform( matrix_all = np.sum(std_matrix, axis=0) matrix_all = np.append(matrix_all, seq_len) - if predict_ccs: + if add_ccs_features: matrix_all = np.append(matrix_all, (seq.count("H")) / seq_len) matrix_all = np.append( matrix_all, (seq.count("F") + seq.count("W") + seq.count("Y")) / seq_len @@ -198,50 +198,12 @@ def encode_peptidoform( matrix_all = np.append(matrix_all, charge) matrix_sum = _compute_rolling_sum(std_matrix.T, n=2)[:, ::2].T + + matrix_global = np.concatenate([matrix_all, 
pos_matrix.flatten()]) return { "matrix": std_matrix, "matrix_sum": matrix_sum, - "matrix_all": matrix_all, - "pos_matrix": pos_matrix.flatten(), + "matrix_global": matrix_global, "matrix_hc": onehot_matrix, } - - -def extract_features( - peptidoforms: list[str | Peptidoform] | PSMList, - predict_ccs: bool = False, -) -> dict[str, dict[int, np.ndarray]]: - """Extract features for all peptidoforms.""" - if isinstance(peptidoforms, PSMList): - peptidoforms = [psm.peptidoform for psm in peptidoforms] - - encodings = [encode_peptidoform(pf, predict_ccs=predict_ccs) for pf in peptidoforms] - aggregated_encodings = aggregate_encodings(encodings) - - return aggregated_encodings - - -def aggregate_encodings( - encodings: list[dict[str, np.ndarray]], -) -> dict[str, dict[int, np.ndarray]]: - """Aggregate list of encodings into single dictionary.""" - return {key: {i: enc[key] for i, enc in enumerate(encodings)} for key in encodings[0]} - - -def unpack_features( - features: dict[str, np.ndarray], -) -> tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray]: - """Unpack dictionary with features to numpy arrays.""" - X_sum = np.stack(list(features["matrix_sum"].values())) - X_global = np.concatenate( - ( - np.stack(list(features["matrix_all"].values())), - np.stack(list(features["pos_matrix"].values())), - ), - axis=1, - ) - X_hc = np.stack(list(features["matrix_hc"].values())) - X_main = np.stack(list(features["matrix"].values())) - - return X_sum, X_global, X_hc, X_main diff --git a/deeplc/_calibration.py b/deeplc/calibration.py similarity index 93% rename from deeplc/_calibration.py rename to deeplc/calibration.py index 1c67f14..d94f21b 100644 --- a/deeplc/_calibration.py +++ b/deeplc/calibration.py @@ -15,8 +15,8 @@ LOGGER = logging.getLogger(__name__) -class Calibrator(ABC): - """Abstract base class for retention time calibrators.""" +class Calibration(ABC): + """Abstract base class for retention time calibration.""" @abstractmethod def __init__(self, *args, **kwargs): @@ -27,9 +27,33 @@ def fit(measured_tr: np.ndarray, predicted_tr: np.ndarray) -> None: ... @abstractmethod def transform(tr: np.ndarray) -> np.ndarray: ... + +class IdentityCalibration(Calibration): + """No calibration, just returns the predicted retention times.""" -class PiecewiseLinearCalibrator(Calibrator): + def fit(self, measured_tr: np.ndarray, predicted_tr: np.ndarray) -> None: + """No fitting required for NoCalibration.""" + pass + + def transform(self, tr: np.ndarray) -> np.ndarray: + """ + Transform the predicted retention times without any calibration. + + Parameters + ---------- + tr + Retention times to be transformed. + + Returns + ------- + np.ndarray + Transformed retention times (same as input). 
+ """ + return tr + + +class PiecewiseLinearCalibration(Calibration): def __init__( self, split_cal: int = 50, @@ -202,7 +226,7 @@ def transform(self, tr: np.ndarray) -> np.ndarray: return np.array(cal_preds) -class SplineTransformerCalibrator(Calibrator): +class SplineTransformerCalibration(Calibration): def __init__(self): """SplineTransformer calibration for retention time.""" super().__init__() diff --git a/deeplc/deeplc.py b/deeplc/deeplc.py index f72fa61..1d92dda 100644 --- a/deeplc/deeplc.py +++ b/deeplc/deeplc.py @@ -36,10 +36,9 @@ from psm_utils.io import read_file from torch.utils.data import DataLoader -from deeplc._calibration import Calibrator, SplineTransformerCalibrator +from deeplc.calibration import Calibration, SplineTransformerCalibration from deeplc._data import DeepLCDataset from deeplc._finetune import DeepLCFineTuner -from deeplc.features import extract_features, unpack_features # If CLI/GUI/frozen: disable warnings before importing IS_CLI_GUI = os.path.basename(sys.argv[0]) in ["deeplc", "deeplc-gui"] @@ -64,8 +63,8 @@ def predict( psm_list: PSMList | None = None, model_files: str | list[str] | None = None, - calibrator: Calibrator | None = None, - batch_size: int | None = None, + calibrator: Calibration | None = None, + batch_size: int = 1024, single_model_mode: bool = False, ): """ @@ -94,10 +93,8 @@ def predict( if len(psm_list) == 0: return [] - # Extract features - features = extract_features(psm_list) - X_sum, X_global, X_hc, X_main = unpack_features(features) - dataset = DeepLCDataset(X_main, X_sum, X_global, X_hc) + # Setup dataset and dataloader + dataset = DeepLCDataset(psm_list) loader = DataLoader(dataset, batch_size=batch_size, shuffle=False) if model_files is not None: @@ -114,16 +111,16 @@ def predict( model_predictions = [] for model_f in model_files: # Load model - mod = torch.load(model_f, weights_only=False, map_location=torch.device("cpu")) - mod.eval() + model = torch.load(model_f, weights_only=False, map_location=torch.device("cpu")) + model.eval() # Predict ret_preds = [] with torch.no_grad(): - for batch in loader: - batch_X, batch_X_sum, batch_X_global, batch_X_hc = batch - batch_preds = mod(batch_X, batch_X_sum, batch_X_global, batch_X_hc) + for features, _ in loader: + batch_preds = model(*features) ret_preds.append(batch_preds.detach().cpu().numpy()) + raise Exception() # Concatenate predictions ret_preds = np.concatenate(ret_preds, axis=0) @@ -144,7 +141,6 @@ def predict( # TODO: Split-of transfer learning? def calibrate( psm_list: PSMList | None = None, - measured_tr: np.ndarray | None = None, model_files: str | list[str] | None = None, location_retraining_models: str = "", sample_for_calibration_curve: int | None = None, @@ -153,7 +149,7 @@ def calibrate( batch_size: int = int(1e6), fine_tune: bool = False, n_epochs: int = 20, - calibrator: Calibrator | None = None, + calibrator: Calibration | None = None, ) -> dict | None: """ Find best model and calibrate. @@ -162,23 +158,25 @@ def calibrate( ---------- psm_list PSMList object containing the peptidoforms to predict for. - measured_tr - Measured retention time used for calibration. Should correspond to the PSMs in the - provided PSMs. If None, the measured retention time is taken from the PSMs. model_files Path to one or mode models to test and calibrat for. If a list of models is passed, the best performing one on the calibration data will be selected. 
- correction_factor - correction factor that needs to be applied to the supplied measured tr's location_retraining_models Location to save the retraining models; if None, a temporary directory is used. sample_for_calibration_curve Number of PSMs to sample for calibration curve; if None, all provided PSMs are used. - use_median - Whether to use the median value in a window to perform calibration; only applies to - piecewise linear calibration, not to PyGAM calibration. return_plotly_report Whether to return a plotly report with the calibration results. + n_jobs + Number of jobs to use for parallel processing; if None, the number of CPU cores is used. + batch_size + Batch size to use for training and prediction; default is 1e6, which means all data is + processed in one batch. + fine_tune + Whether to fine-tune the model on the provided PSMs. If True, the first model in + model_files will be used for fine-tuning. + n_epochs + Number of epochs to use for fine-tuning the model. Default is 20. Returns ------- @@ -186,16 +184,13 @@ def calibrate( Dictionary with plotly report information or None. """ - # Getting measured retention time either from measured_tr or provided PSMs - if not measured_tr: - measured_tr = [psm.retention_time for psm in psm_list] - if None in measured_tr: - raise ValueError("Not all PSMs have an observed retention time.") + if None in psm_list["retention_time"]: + raise ValueError("Not all PSMs have an observed retention time.") n_jobs = multiprocessing.cpu_count() if n_jobs is None else n_jobs if calibrator is None: - calibrator = SplineTransformerCalibrator() + calibrator = SplineTransformerCalibration() # Ensuring self.model is list of strings model_files = model_files or DEFAULT_MODELS @@ -206,12 +201,8 @@ def calibrate( logger.debug(f"Ready to find the best model out of: {model_files}") if fine_tune: - logger.debug("Preparing for model fine-tuning...") - - features = extract_features(psm_list) - X_sum, X_global, X_hc, X_main = unpack_features(features) - - dataset = DeepLCDataset(X_main, X_sum, X_global, X_hc, np.array(measured_tr)) + logger.debug("Starting model fine-tuning...") + dataset = DeepLCDataset(psm_list) base_model_path = model_files[0] base_model = torch.load( @@ -251,8 +242,6 @@ def calibrate( mod_calibrator = {} pred_dict = {} mod_dict = {} - temp_obs = [] - temp_pred = [] for model_name in model_files: logger.debug(f"Trying out the following model: {model_name}") @@ -260,7 +249,7 @@ def calibrate( model_calibrator = copy.deepcopy(calibrator) - if isinstance(model_calibrator, SplineTransformerCalibrator): + if isinstance(model_calibrator, SplineTransformerCalibration): model_calibrator.fit(predicted_tr, measured_tr, simplified=fine_tune) else: model_calibrator.fit(predicted_tr, measured_tr) @@ -294,29 +283,10 @@ def calibrate( best_model = copy.deepcopy(mod_dict[m_group_name]) best_perf = perf - temp_obs = np.array(measured_tr) - temp_pred = np.array(preds) - logger.debug(f"Model with the best performance got selected: {best_model}") - # TODO: Move to separate function - if return_plotly_report: - import deeplc.plot - - plotly_return_dict = {} - plotly_df = pd.DataFrame( - list(zip(temp_obs, temp_pred, strict=True)), - columns=[ - "Observed retention time", - "Predicted retention time", - ], - ) - plotly_return_dict["scatter"] = deeplc.plot.scatter(plotly_df) - plotly_return_dict["baseline_dist"] = deeplc.plot.distribution_baseline(plotly_df) - else: - plotly_return_dict = None - return best_model, best_calibrator, plotly_return_dict + return 
best_model, best_calibrator # TODO: Move to psm_utils? From 8ebbd6c841bf56d5398c8403ae6a8d6dcfcf37d9 Mon Sep 17 00:00:00 2001 From: RalfG Date: Fri, 6 Jun 2025 19:24:53 +0200 Subject: [PATCH 3/4] Simplify predict function and only predict unique peptidoforms: - Remove ensemble-model mode (different kernel sizes). - Split of model loading to separate function. - Make dataset take peptidoforms instead of PSMs. - Get unique peptidoforms before predicting and keep inverse index for mapping back predictions to input PSM list. --- deeplc/_data.py | 43 ++++++++------ deeplc/deeplc.py | 151 +++++++++++++++++++++++++---------------------- 2 files changed, 107 insertions(+), 87 deletions(-) diff --git a/deeplc/_data.py b/deeplc/_data.py index 952ad84..fcbc2b3 100644 --- a/deeplc/_data.py +++ b/deeplc/_data.py @@ -1,5 +1,6 @@ +import numpy as np import torch -from psm_utils.psm_list import PSMList +from psm_utils import Peptidoform, PSMList from torch.utils.data import Dataset from deeplc._features import encode_peptidoform @@ -8,28 +9,24 @@ class DeepLCDataset(Dataset): """Custom Dataset class for DeepLC used for loading features from peptide sequences.""" - def __init__(self, psm_list: PSMList, add_ccs_features: bool = False): - self.psm_list = psm_list + def __init__( + self, + peptidoforms: list[Peptidoform | str], + target_retention_times: np.ndarray | None = None, + add_ccs_features: bool = False + ): + self.peptidoforms = peptidoforms + self.target_retention_times = target_retention_times self.add_ccs_features = add_ccs_features - - self._targets = self._get_targets(psm_list) - @staticmethod - def _get_targets(psm_list: PSMList) -> torch.Tensor | None: - retention_times = [psm.retention_time for psm in psm_list] - if None not in retention_times: - return torch.tensor(retention_times, dtype=torch.float32) - else: - return None - def __len__(self): - return len(self.psm_list) + return len(self.peptidoforms) def __getitem__(self, idx) -> tuple: if not isinstance(idx, int): raise TypeError(f"Index must be an integer, got {type(idx)} instead.") features = encode_peptidoform( - self.psm_list[idx].peptidoform, + self.peptidoforms[idx], add_ccs_features=self.add_ccs_features ) feature_tuples = ( @@ -38,7 +35,19 @@ def __getitem__(self, idx) -> tuple: torch.from_numpy(features["matrix_global"]).to(dtype=torch.float32), torch.from_numpy(features["matrix_hc"]).to(dtype=torch.float32), ) - targets = self._targets[idx] if self._targets is not None else torch.full_like( - feature_tuples[0], fill_value=float('nan'), dtype=torch.float32 + targets = ( + self.target_retention_times[idx] + if self.target_retention_times is not None + else torch.full_like( + feature_tuples[0], fill_value=float('nan'), dtype=torch.float32 + ) ) return feature_tuples, targets + + +def get_targets(psm_list: PSMList) -> np.ndarray | None: + retention_times = psm_list["retention_time"] + if None not in retention_times: + return torch.tensor(retention_times, dtype=torch.float32) + else: + return None diff --git a/deeplc/deeplc.py b/deeplc/deeplc.py index 1d92dda..a0a3586 100644 --- a/deeplc/deeplc.py +++ b/deeplc/deeplc.py @@ -34,11 +34,13 @@ import torch from psm_utils import PSM, Peptidoform, PSMList from psm_utils.io import read_file +from rich.progress import track +from torch.nn import Module from torch.utils.data import DataLoader -from deeplc.calibration import Calibration, SplineTransformerCalibration -from deeplc._data import DeepLCDataset +from deeplc._data import DeepLCDataset, get_targets from deeplc._finetune import 
DeepLCFineTuner +from deeplc.calibration import Calibration, SplineTransformerCalibration # If CLI/GUI/frozen: disable warnings before importing IS_CLI_GUI = os.path.basename(sys.argv[0]) in ["deeplc", "deeplc-gui"] @@ -49,12 +51,8 @@ # Default models, will be used if no other is specified. If no best model is # selected during calibration, the first model in the list will be used. DEEPLC_DIR = os.path.dirname(os.path.realpath(__file__)) -DEFAULT_MODELS = [ - "mods/full_hc_PXD005573_pub_1fd8363d9af9dcad3be7553c39396960.pt", - "mods/full_hc_PXD005573_pub_8c22d89667368f2f02ad996469ba157e.pt", - "mods/full_hc_PXD005573_pub_cb975cfdd4105f97efa0b3afffe075cc.pt", -] -DEFAULT_MODELS = [os.path.join(DEEPLC_DIR, m) for m in DEFAULT_MODELS] +DEFAULT_MODEL = "mods/full_hc_PXD005573_pub_1fd8363d9af9dcad3be7553c39396960.pt" +DEFAULT_MODEL = os.path.join(DEEPLC_DIR, DEFAULT_MODEL) logger = logging.getLogger(__name__) @@ -62,10 +60,9 @@ def predict( psm_list: PSMList | None = None, - model_files: str | list[str] | None = None, - calibrator: Calibration | None = None, + model: str | list[str] | None = None, + num_workers: int = 4, batch_size: int = 1024, - single_model_mode: bool = False, ): """ Make predictions for sequences, in batches if required. @@ -74,15 +71,10 @@ def predict( ---------- psm_list PSMList object containing the peptidoforms to predict for. - model_files - Model file (or files) to use for prediction. If None, the default model is used. - calibrator - Calibrator object to use for calibration. If None, no calibration is performed. + model_file + Model file to use for prediction. If None, the default model is used. batch_size - How many samples per batch to load (default: 1). - single_model_mode - Whether to use a single model instead of multiple default models. Only applies if - model_file is None. + How many samples per batch to load (default: 1024). Returns ------- @@ -90,52 +82,38 @@ def predict( predictions """ - if len(psm_list) == 0: - return [] + # Shortcut if empty PSMList is provided + if not psm_list: + return np.array([]) - # Setup dataset and dataloader - dataset = DeepLCDataset(psm_list) - loader = DataLoader(dataset, batch_size=batch_size, shuffle=False) - - if model_files is not None: - if isinstance(model_files, str): - model_files = [model_files] - elif isinstance(model_files, list): - model_files = model_files - else: - raise ValueError("Invalid model name provided.") - else: - model_files = [DEFAULT_MODELS[0]] if single_model_mode else DEFAULT_MODELS + # Avoid predicting repeated PSMs + unique_peptidoforms, inverse_indices = _get_unique_peptidoforms(psm_list) - # Get predictions; iterate over models if multiple were selected - model_predictions = [] - for model_f in model_files: - # Load model - model = torch.load(model_f, weights_only=False, map_location=torch.device("cpu")) - model.eval() + # Setup dataset and dataloader + dataset = DeepLCDataset(unique_peptidoforms, target_retention_times=None) + loader = DataLoader(dataset, num_workers=num_workers, batch_size=batch_size, shuffle=False) - # Predict - ret_preds = [] - with torch.no_grad(): - for features, _ in loader: - batch_preds = model(*features) - ret_preds.append(batch_preds.detach().cpu().numpy()) - raise Exception() + # Get model files + model = model or DEFAULT_MODEL - # Concatenate predictions - ret_preds = np.concatenate(ret_preds, axis=0) + # Check device availability + device = torch.device("cuda" if torch.cuda.is_available() else "cpu") - # TODO: Bring outside of model loop? 
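# To make the deduplication step above concrete: np.unique(..., return_inverse=True) returns
# the sorted unique peptidoform strings plus an index array mapping every original PSM back to
# its unique entry, so predictions computed once per unique peptidoform can be broadcast back
# to the full PSM list. An illustrative sketch with toy values (not part of this patch):

import numpy as np

peptidoforms = np.array(["PEPTIDE/2", "ACDEFGHIK/2", "PEPTIDE/2"])
unique, inverse = np.unique(peptidoforms, return_inverse=True)
# unique  -> ["ACDEFGHIK/2", "PEPTIDE/2"] (sorted); inverse -> [1, 0, 1]
unique_preds = np.array([10.0, 20.0])  # one prediction per unique peptidoform, in sorted order
full_preds = unique_preds[inverse]     # -> [20.0, 10.0, 20.0], matching the original PSM order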
- # Calibrate - if calibrator is not None: - ret_preds = calibrator.transform(ret_preds) + # Load model on specified device + model = _load_model(model=model, device=device, eval=True) - model_predictions.append(ret_preds) + # Predict + predictions = [] + with torch.no_grad(): + for features, _ in track(loader): + features = [feature_tensor.to(device) for feature_tensor in features] + batch_preds = model(*features) + predictions.append(batch_preds.detach().cpu().numpy()) - # Average the predictions from all models - averaged_predictions = np.mean(model_predictions, axis=0) + # Concatenate predictions and reorder to match original PSMList order + predictions = np.concatenate(predictions, axis=0)[inverse_indices] - return averaged_predictions + return predictions # TODO: Split-of transfer learning? @@ -144,13 +122,12 @@ def calibrate( model_files: str | list[str] | None = None, location_retraining_models: str = "", sample_for_calibration_curve: int | None = None, - return_plotly_report=False, n_jobs: int | None = None, batch_size: int = int(1e6), fine_tune: bool = False, n_epochs: int = 20, calibrator: Calibration | None = None, -) -> dict | None: +) -> tuple[str, dict[str, Calibration]]: """ Find best model and calibrate. @@ -159,14 +136,12 @@ def calibrate( psm_list PSMList object containing the peptidoforms to predict for. model_files - Path to one or mode models to test and calibrat for. If a list of models is passed, + Path to one or mode models to test and calibrate for. If a list of models is passed, the best performing one on the calibration data will be selected. location_retraining_models Location to save the retraining models; if None, a temporary directory is used. sample_for_calibration_curve Number of PSMs to sample for calibration curve; if None, all provided PSMs are used. - return_plotly_report - Whether to return a plotly report with the calibration results. n_jobs Number of jobs to use for parallel processing; if None, the number of CPU cores is used. batch_size @@ -180,8 +155,6 @@ def calibrate( Returns ------- - dict | None - Dictionary with plotly report information or None. """ if None in psm_list["retention_time"]: @@ -193,7 +166,7 @@ def calibrate( calibrator = SplineTransformerCalibration() # Ensuring self.model is list of strings - model_files = model_files or DEFAULT_MODELS + model_files = model_files or DEFAULT_MODEL if isinstance(model_files, str): model_files = [model_files] @@ -239,13 +212,13 @@ def calibrate( best_perf = float("inf") best_calibrator = {} - mod_calibrator = {} + model_calibrators = {} pred_dict = {} mod_dict = {} for model_name in model_files: logger.debug(f"Trying out the following model: {model_name}") - predicted_tr = predict(psm_list, calibrate=False, model_name=model_name) + predicted_tr = predict(psm_list, calibrator=calibrator, model_name=model_name) model_calibrator = copy.deepcopy(calibrator) @@ -265,7 +238,7 @@ def calibrate( m_group_name = "_".join(m_name.split("_")[:-1]) pred_dict.setdefault(m_group_name, {})[model_name] = preds mod_dict.setdefault(m_group_name, {})[model_name] = model_name - mod_calibrator.setdefault(m_group_name, {})[model_name] = model_calibrator + model_calibrators.setdefault(m_group_name, {})[model_name] = model_calibrator # Find best-performing model, including each model's calibration for m_name in pred_dict: @@ -279,13 +252,12 @@ def calibrate( m_group_name = m_name # TODO is deepcopy really required? 
- best_calibrator = copy.deepcopy(mod_calibrator[m_group_name]) + best_calibrator = copy.deepcopy(model_calibrators[m_group_name]) best_model = copy.deepcopy(mod_dict[m_group_name]) best_perf = perf logger.debug(f"Model with the best performance got selected: {best_model}") - return best_model, best_calibrator @@ -304,3 +276,42 @@ def _file_to_psm_list(input_file: str | Path) -> PSMList: psm_list.rename_modifications(mapper) return psm_list + + +def _get_unique_peptidoforms(psm_list: PSMList) -> tuple[PSMList, np.ndarray]: + """Get PSMs with unique peptidoforms and their inverse indices.""" + peptidoform_strings = np.array([str(psm.peptidoform) for psm in psm_list]) + unique_peptidoforms, inverse_indices = np.unique(peptidoform_strings, return_inverse=True) + return unique_peptidoforms, inverse_indices + + +def _load_model( + model: Module | Path | str | None = None, + device: str | None = None, + eval: bool = False, +) -> Module: + """Load a model from a file or return the default model if none is provided.""" + # If no model is provided, use the default model + model = model or DEFAULT_MODEL + + # If device is not specified, use the default device (GPU if available, else CPU) + device = device or torch.device("cuda" if torch.cuda.is_available() else "cpu") + + # Load model from file if a path is provided + if isinstance(model, str | Path): + model = torch.load(model, weights_only=False, map_location=device) + elif not isinstance(model, Module): + raise TypeError(f"Expected a PyTorch Module or a file path, got {type(model)} instead.") + + # Ensure the model is on the specified device + model.to(device) + + # Set the model to evaluation or training mode based on the eval flag + if eval: + model.eval() + else: + model.train() + + return model + + From f1947ebd14d6aeda1077084abaf12c2338021056 Mon Sep 17 00:00:00 2001 From: RalfG Date: Tue, 23 Sep 2025 15:46:59 +0200 Subject: [PATCH 4/4] Fully vectorize linear calibration --- deeplc/calibration.py | 481 ++++++++++++++++---------------------- tests/test_calibration.py | 79 +++++++ 2 files changed, 283 insertions(+), 277 deletions(-) create mode 100644 tests/test_calibration.py diff --git a/deeplc/calibration.py b/deeplc/calibration.py index d94f21b..35d668f 100644 --- a/deeplc/calibration.py +++ b/deeplc/calibration.py @@ -1,4 +1,9 @@ -"""Retention time calibration.""" +""" +Retention time calibration utilities. + +This module provides calibration strategies to map source retention times to +an aligned target scale. +""" from __future__ import annotations @@ -6,9 +11,9 @@ from abc import ABC, abstractmethod import numpy as np -from sklearn.linear_model import LinearRegression -from sklearn.pipeline import make_pipeline -from sklearn.preprocessing import SplineTransformer +from sklearn.linear_model import LinearRegression # type: ignore[import] +from sklearn.pipeline import Pipeline, make_pipeline # type: ignore[import] +from sklearn.preprocessing import SplineTransformer # type: ignore[import] from deeplc._exceptions import CalibrationError @@ -23,346 +28,268 @@ def __init__(self, *args, **kwargs): super().__init__() @abstractmethod - def fit(measured_tr: np.ndarray, predicted_tr: np.ndarray) -> None: ... + def fit(self, target_rt: np.ndarray, source_rt: np.ndarray) -> None: + """Fit the calibration from source to target.""" @abstractmethod - def transform(tr: np.ndarray) -> np.ndarray: ... 
- + def transform(self, source_rt: np.ndarray) -> np.ndarray: + """Transform source retention times into the calibrated target space.""" + class IdentityCalibration(Calibration): - """No calibration, just returns the predicted retention times.""" + """No calibration; returns inputs unchanged.""" - def fit(self, measured_tr: np.ndarray, predicted_tr: np.ndarray) -> None: - """No fitting required for NoCalibration.""" - pass + def fit(self, target_rt: np.ndarray, source_rt: np.ndarray) -> None: # noqa: ARG002 + return None - def transform(self, tr: np.ndarray) -> np.ndarray: - """ - Transform the predicted retention times without any calibration. - - Parameters - ---------- - tr - Retention times to be transformed. - - Returns - ------- - np.ndarray - Transformed retention times (same as input). - """ - return tr + def transform(self, source_rt: np.ndarray) -> np.ndarray: + return source_rt class PiecewiseLinearCalibration(Calibration): def __init__( self, - split_cal: int = 50, - bin_distance: float = 2.0, - dict_cal_divider: int = 50, + number_of_splits: int = 50, + extrapolate: bool = True, use_median: bool = True, - ): + ) -> None: """ - Piece-wise linear calibration for retention time. - + Piece-wise linear calibration based on per-split anchors. + Parameters ---------- - split_cal - Number of splits. - bin_distance - Distance between bins. - dict_cal_divider - # TODO: Make more descriptive - Divider for the dictionary used in the piece-wise linear model. - use_median - # TODO: Make more descriptive - If True, use median instead of mean for calibration. - + number_of_splits : int + Number of segments to split the source retention time range into. + More segments allow more flexibility but may lead to overfitting. + extrapolate : bool + If True, allows extrapolation outside the fitted source retention time range. + If False, clips input values to the fitted range. + use_median : bool + If True, uses the median of each segment to define anchors. If False, uses the mean. """ super().__init__() - self.split_cal = split_cal - self.bin_distance = bin_distance - self.dict_cal_divider = dict_cal_divider - self.use_median = use_median - - self._calibrate_min = None - self._calibrate_max = None - self._calibrate_dict = None - self._fit = False - - def fit(self, measured_tr: np.ndarray, predicted_tr: np.ndarray) -> None: - """ - Fit a piece-wise linear model to the measured and predicted retention times. - - Parameters - ---------- - measured_tr - Measured retention times. - predicted_tr - Predicted retention times. - - """ - measured_tr, predicted_tr = _process_arrays(measured_tr, predicted_tr) - - mtr_mean = [] - ptr_mean = [] + self.number_of_splits = int(number_of_splits) + self.extrapolate = bool(extrapolate) + self.use_median = bool(use_median) + + self._calibrate_min: float | None = None + self._calibrate_max: float | None = None + self._source_breakpoints: np.ndarray | None = None + self._slopes: np.ndarray | None = None + self._intercepts: np.ndarray | None = None + + def fit(self, target_rt: np.ndarray, source_rt: np.ndarray) -> None: + """Fit a piece-wise linear model mapping source to target retention times.""" + target_rt, source_rt = _prepare_series(target_rt, source_rt) + + cal_min = float(source_rt[0]) + cal_max = float(source_rt[-1]) + if (not np.isfinite(cal_min)) or (not np.isfinite(cal_max)) or (cal_max <= cal_min): + raise CalibrationError( + "Source retention times have zero or invalid range; cannot calibrate." 
+ ) - calibrate_dict = {} - calibrate_min = float("inf") - calibrate_max = 0 + boundaries = np.linspace(cal_min, cal_max, self.number_of_splits + 1, dtype=np.float32) + starts = np.searchsorted(source_rt, boundaries[:-1], side="left") + ends = np.searchsorted(source_rt, boundaries[1:], side="left") - LOGGER.debug( - "Selecting the data points for calibration (used to fit the linear models between)" - ) - # smooth between observed and predicted - split_val = predicted_tr[-1] / self.split_cal - - for range_calib_number in np.arange(0.0, predicted_tr[-1], split_val): - ptr_index_start = np.argmax(predicted_tr >= range_calib_number) - ptr_index_end = np.argmax(predicted_tr >= range_calib_number + split_val) - - # no points so no cigar... use previous points - if ptr_index_start >= ptr_index_end: - LOGGER.debug( - "Skipping calibration step, due to no points in the " - "predicted range (are you sure about the split size?): " - "%s,%s", - range_calib_number, - range_calib_number + split_val, - ) + tgt_anchors: list[float] = [] + src_anchors: list[float] = [] + for s, e in zip(starts, ends, strict=True): + if e <= s: continue - - mtr = measured_tr[ptr_index_start:ptr_index_end] - ptr = predicted_tr[ptr_index_start:ptr_index_end] - + t_seg = target_rt[s:e] + s_seg = source_rt[s:e] if self.use_median: - mtr_mean.append(np.median(mtr)) - ptr_mean.append(np.median(ptr)) + tgt_anchors.append(float(np.median(t_seg))) + src_anchors.append(float(np.median(s_seg))) else: - mtr_mean.append(sum(mtr) / len(mtr)) - ptr_mean.append(sum(ptr) / len(ptr)) - - LOGGER.debug("Fitting the linear models between the points") + tgt_anchors.append(float(np.mean(t_seg))) + src_anchors.append(float(np.mean(s_seg))) - if self.split_cal >= len(measured_tr): + if len(src_anchors) < 2: raise CalibrationError( - f"Not enough measured tr ({len(measured_tr)}) for the chosen number of splits " - f"({self.split_cal}). Choose a smaller split_cal parameter or provide more " - "peptides for fitting the calibration curve." + "Not enough anchor points to build a piecewise calibration (need >= 2)." ) - if len(mtr_mean) == 0: - raise CalibrationError("The measured tr list is empty, not able to calibrate") - if len(ptr_mean) == 0: - raise CalibrationError("The predicted tr list is empty, not able to calibrate") - - # calculate calibration curves - for i in range(0, len(ptr_mean)): - if i >= len(ptr_mean) - 1: - continue - delta_ptr = ptr_mean[i + 1] - ptr_mean[i] - delta_mtr = mtr_mean[i + 1] - mtr_mean[i] - - slope = delta_mtr / delta_ptr - intercept = (-1 * (ptr_mean[i] * slope)) + mtr_mean[i] - - # optimized predictions using a dict to find calibration curve very fast - for v in np.arange( - round(ptr_mean[i], self.bin_distance), - round(ptr_mean[i + 1], self.bin_distance), - 1 / ((self.bin_distance) * self.dict_cal_divider), - ): - if v < calibrate_min: - calibrate_min = v - if v > calibrate_max: - calibrate_max = v - calibrate_dict[str(round(v, self.bin_distance))] = (slope, intercept) - - self._calibrate_min = calibrate_min - self._calibrate_max = calibrate_max - self._calibrate_dict = calibrate_dict - - self._fit = True - - def transform(self, tr: np.ndarray) -> np.ndarray: - """ - Transform the predicted retention times using the fitted piece-wise linear model. - Parameters - ---------- - tr - Retention times to be transformed. - - Returns - ------- - np.ndarray - Transformed retention times. 
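# For intuition on the searchsorted-based binning used in the rewritten fit() above, a small
# numeric sketch with toy values (illustrative only, not part of this patch):

import numpy as np

source_rt = np.array([1.0, 2.0, 4.0, 7.0, 9.0], dtype=np.float32)  # already sorted
boundaries = np.linspace(1.0, 9.0, 3)                               # 2 splits -> [1.0, 5.0, 9.0]
starts = np.searchsorted(source_rt, boundaries[:-1], side="left")   # -> [0, 3]
ends = np.searchsorted(source_rt, boundaries[1:], side="left")      # -> [3, 4]
# Segment 0 covers source_rt[0:3] = [1, 2, 4]; segment 1 covers source_rt[3:4] = [7].
# The median (or mean) of each segment then becomes one anchor of the piecewise curve.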
- """ - if not self._fit: + src_arr = np.asarray(src_anchors, dtype=np.float32) + tgt_arr = np.asarray(tgt_anchors, dtype=np.float32) + keep = np.concatenate(([True], src_arr[1:] > src_arr[:-1])) + src_arr = src_arr[keep] + tgt_arr = tgt_arr[keep] + if src_arr.size < 2: raise CalibrationError( - "The model has not been fitted yet. Please call fit() before transform()." + "After removing degenerate anchors, not enough points remain to define segments." ) - if tr.shape[0] == 0: + delta_src = src_arr[1:] - src_arr[:-1] + delta_tgt = tgt_arr[1:] - tgt_arr[:-1] + slopes = delta_tgt / delta_src + intercepts = (-src_arr[:-1] * slopes) + tgt_arr[:-1] + + self._source_breakpoints = src_arr.astype(np.float32) + self._slopes = slopes.astype(np.float32) + self._intercepts = intercepts.astype(np.float32) + self._calibrate_min = cal_min + self._calibrate_max = cal_max + + LOGGER.debug( + "Piecewise fit: anchors=%d, segments=%d, range=[%.3f, %.3f]", + len(self._source_breakpoints), + len(self._slopes), + self._calibrate_min, + self._calibrate_max, + ) + + def transform(self, source_rt: np.ndarray) -> np.ndarray: + """Transform source retention times using the fitted piece-wise linear model.""" + if ( + self._calibrate_min is None + or self._calibrate_max is None + or self._source_breakpoints is None + or self._slopes is None + or self._intercepts is None + ): + raise CalibrationError("The model has not been fitted yet. Call fit() first.") + + if source_rt.shape[0] == 0: return np.array([]) - # TODO: Can this be vectorized? - cal_preds = [] - for uncal_pred in tr: - try: - slope, intercept = self.cal_dict[str(round(uncal_pred, self.bin_distance))] - cal_preds.append(slope * (uncal_pred) + intercept) - except KeyError: - # outside of the prediction range ... use the last - # calibration curve - if uncal_pred <= self.cal_min: - slope, intercept = self.cal_dict[str(round(self.cal_min, self.bin_distance))] - cal_preds.append(slope * (uncal_pred) + intercept) - elif uncal_pred >= self.cal_max: - slope, intercept = self.cal_dict[str(round(self.cal_max, self.bin_distance))] - cal_preds.append(slope * (uncal_pred) + intercept) - else: - slope, intercept = self.cal_dict[str(round(self.cal_max, self.bin_distance))] - cal_preds.append(slope * (uncal_pred) + intercept) + x = source_rt.astype(np.float32, copy=False) + x_eval = ( + np.clip(x, self._calibrate_min, self._calibrate_max) if not self.extrapolate else x + ) - return np.array(cal_preds) + idx = np.searchsorted(self._source_breakpoints, x_eval, side="right") - 1 + idx = np.clip(idx, 0, len(self._source_breakpoints) - 2) + y = self._slopes[idx] * x_eval + self._intercepts[idx] + return y + + def get_calibration_curve(self) -> tuple[np.ndarray, np.ndarray]: + """Return the calibration anchors as two arrays (x, y).""" + if ( + self._source_breakpoints is None + or self._slopes is None + or self._intercepts is None + ): + raise CalibrationError("The model has not been fitted yet. 
Call fit() first.") + + x = self._source_breakpoints.astype(np.float64) + y = np.empty_like(x, dtype=np.float64) + y[0] = float(self._slopes[0] * x[0] + self._intercepts[0]) + if len(x) > 1: + prev_idx = np.arange(0, len(x) - 1) + y[1:] = (self._slopes[prev_idx] * x[1:] + self._intercepts[prev_idx]).astype( + np.float64 + ) + return x, y + + @property + def calibrate_min(self) -> float | None: + return self._calibrate_min + + @property + def calibrate_max(self) -> float | None: + return self._calibrate_max class SplineTransformerCalibration(Calibration): - def __init__(self): - """SplineTransformer calibration for retention time.""" + def __init__(self) -> None: super().__init__() - self._calibrate_min = None - self._calibrate_max = None - self._linear_model_left = None - self._spline_model = None - self._linear_model_right = None - - self._fit = False + self._calibrate_min: float | None = None + self._calibrate_max: float | None = None + self._model_left: LinearRegression | None = None + self._model_main: Pipeline | LinearRegression | None = None + self._model_right: LinearRegression | None = None def fit( self, - measured_tr: np.ndarray, - predicted_tr: np.ndarray, - simplified: bool = False, # TODO: Move to __init__? + target_rt: np.ndarray, + source_rt: np.ndarray, + simplified: bool = False, ) -> None: - """ - Fit the SplineTransformer model to the measured and predicted retention times. - - Parameters - ---------- - measured_tr - Measured retention times. - predicted_tr - Predicted retention times. - simplified - If True, use a simplified model with fewer knots and a linear model. - If False, use a more complex model with more knots and a spline model. - - """ - measured_tr, predicted_tr = _process_arrays(measured_tr, predicted_tr) + """Fit a spline-based model mapping source to target retention times.""" + target_rt, source_rt = _prepare_series(target_rt, source_rt) - # Fit a SplineTransformer model if simplified: - spline = SplineTransformer(degree=2, n_knots=10) linear_model = LinearRegression() - linear_model.fit(predicted_tr.reshape(-1, 1), measured_tr) - + linear_model.fit(source_rt.reshape(-1, 1), target_rt) linear_model_left = linear_model - # TODO @RobbinBouwmeester: Should this be the spline model? 
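# For reference, the non-simplified branch below boils down to the standard scikit-learn
# pattern of a SplineTransformer feeding a LinearRegression. A standalone sketch with toy
# linear data (illustrative only, not part of this patch):

import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import SplineTransformer

source_rt = np.linspace(0.0, 100.0, 2000)
target_rt = 1.2 * source_rt + 5.0
spline_model = make_pipeline(SplineTransformer(degree=4, n_knots=9), LinearRegression())
spline_model.fit(source_rt.reshape(-1, 1), target_rt)
calibrated = spline_model.predict(np.array([[10.0], [50.0]]))  # approximately [17.0, 65.0]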
spline_model = linear_model linear_model_right = linear_model else: - spline = SplineTransformer(degree=4, n_knots=int(len(measured_tr) / 500) + 5) + spline = SplineTransformer(degree=4, n_knots=int(len(source_rt) / 500) + 5) spline_model = make_pipeline(spline, LinearRegression()) - spline_model.fit(predicted_tr.reshape(-1, 1), measured_tr) - - # Determine the top 10% of data on either end - n_top = int(len(predicted_tr) * 0.1) + spline_model.fit(source_rt.reshape(-1, 1), target_rt) - # Fit a linear model on the bottom 10% (left-side extrapolation) - X_left = predicted_tr[:n_top] - y_left = measured_tr[:n_top] + n_top = int(len(source_rt) * 0.1) + X_left = source_rt[:n_top] + y_left = target_rt[:n_top] linear_model_left = LinearRegression() linear_model_left.fit(X_left.reshape(-1, 1), y_left) - # Fit a linear model on the top 10% (right-side extrapolation) - X_right = predicted_tr[-n_top:] - y_right = measured_tr[-n_top:] + X_right = source_rt[-n_top:] + y_right = target_rt[-n_top:] linear_model_right = LinearRegression() linear_model_right.fit(X_right.reshape(-1, 1), y_right) - self._calibrate_min = min(predicted_tr) - self._calibrate_max = max(predicted_tr) - self._linear_model_left = linear_model_left - self._spline_model = spline_model - self._linear_model_right = linear_model_right - - self._fit = True - - def transform(self, tr: np.ndarray) -> np.ndarray: - """ - Transform the predicted retention times using the fitted SplineTransformer model. - - Parameters - ---------- - tr - Retention times to be transformed. - - Returns - ------- - np.ndarray - Transformed retention times. - """ - if not self._fit: - raise CalibrationError( - "The model has not been fitted yet. Please call fit() before transform()." - ) - - if tr.shape[0] == 0: + self._calibrate_min = float(np.min(source_rt)) + self._calibrate_max = float(np.max(source_rt)) + self._model_left = linear_model_left + self._model_main = spline_model + self._model_right = linear_model_right + + def transform(self, source_rt: np.ndarray) -> np.ndarray: + """Transform source retention times using the fitted spline model.""" + if ( + self._calibrate_min is None + or self._calibrate_max is None + or self._model_main is None + or self._model_left is None + or self._model_right is None + ): + raise CalibrationError("The model has not been fitted yet. 
Call fit() first.") + assert self._model_main is not None + assert self._model_left is not None + assert self._model_right is not None + + if source_rt.shape[0] == 0: return np.array([]) - y_pred_spline = self._spline_model.predict(tr.reshape(-1, 1)) - y_pred_left = self._linear_model_left.predict(tr.reshape(-1, 1)) - y_pred_right = self._linear_model_right.predict(tr.reshape(-1, 1)) + y_pred_spline = self._model_main.predict(source_rt.reshape(-1, 1)) + y_pred_left = self._model_left.predict(source_rt.reshape(-1, 1)) + y_pred_right = self._model_right.predict(source_rt.reshape(-1, 1)) - # Use spline model within the range of X - within_range = (tr >= self.cal_min) & (tr <= self.cal_max) - within_range = within_range.ravel() # Ensure this is a 1D array for proper indexing + within_range = (source_rt >= self._calibrate_min) & (source_rt <= self._calibrate_max) + within_range = within_range.ravel() - # Create a prediction array initialized with spline predictions cal_preds = np.copy(y_pred_spline) - - # Replace predictions outside the range with the linear model predictions - cal_preds[~within_range & (tr.ravel() < self.cal_min)] = y_pred_left[ - ~within_range & (tr.ravel() < self.cal_min) + cal_preds[~within_range & (source_rt.ravel() < self._calibrate_min)] = y_pred_left[ + ~within_range & (source_rt.ravel() < self._calibrate_min) ] - cal_preds[~within_range & (tr.ravel() > self.cal_max)] = y_pred_right[ - ~within_range & (tr.ravel() > self.cal_max) + cal_preds[~within_range & (source_rt.ravel() > self._calibrate_max)] = y_pred_right[ + ~within_range & (source_rt.ravel() > self._calibrate_max) ] - return np.array(cal_preds) -def _process_arrays( - measured_tr: np.ndarray, - predicted_tr: np.ndarray, +def _prepare_series( + target_rt: np.ndarray, + source_rt: np.ndarray, ) -> tuple[np.ndarray, np.ndarray]: - """Process the measured and predicted retention times.""" - # Check array lengths - if len(measured_tr) != len(predicted_tr): + """Prepare target/source arrays: shape, sort by source, cast to float32.""" + if len(target_rt) != len(source_rt): raise ValueError( - f"Measured and predicted retention times must have the same length. " - f"Got {len(measured_tr)} and {len(predicted_tr)}." + "Target and source retention times must have the same length. Got " + f"{len(target_rt)} and {len(source_rt)}." 
) + if len(target_rt.shape) > 1: + target_rt = target_rt.flatten() + if len(source_rt.shape) > 1: + source_rt = source_rt.flatten() - # Ensure both arrays are 1D - if len(measured_tr.shape) > 1: - measured_tr = measured_tr.flatten() - if len(predicted_tr.shape) > 1: - predicted_tr = predicted_tr.flatten() - - # Sort arrays by predicted_tr - indices = np.argsort(predicted_tr) - measured_tr = np.array(measured_tr, dtype=np.float32)[indices] - predicted_tr = np.array(predicted_tr, dtype=np.float32)[indices] + idx = np.argsort(source_rt) + target_rt = np.array(target_rt, dtype=np.float32)[idx] + source_rt = np.array(source_rt, dtype=np.float32)[idx] - return measured_tr, predicted_tr + return target_rt, source_rt diff --git a/tests/test_calibration.py b/tests/test_calibration.py new file mode 100644 index 0000000..d71fd43 --- /dev/null +++ b/tests/test_calibration.py @@ -0,0 +1,79 @@ +import numpy as np +import pytest + +from deeplc.calibration import CalibrationError, PiecewiseLinearCalibration + + +def test_transform_requires_fit(): + cal = PiecewiseLinearCalibration() + with pytest.raises(CalibrationError): + cal.transform(np.array([0.0, 1.0], dtype=np.float32)) + + +def test_piecewise_maps_predicted_to_measured_linear_case(): + # Linear relationship with no noise + predicted = np.linspace(0.0, 10.0, 201, dtype=np.float32) + measured = 2.0 * predicted + 5.0 + + cal = PiecewiseLinearCalibration(number_of_splits=25, use_median=True) + cal.fit(target_rt=measured, source_rt=predicted) + + transformed = cal.transform(predicted) + # Should match measured closely + assert transformed.shape == measured.shape + np.testing.assert_allclose(transformed, measured, rtol=1e-5, atol=1e-4) + + +def test_piecewise_out_of_range_values_are_handled(): + predicted = np.linspace(0.0, 10.0, 201, dtype=np.float32) + measured = 1.5 * predicted - 3.0 + + cal = PiecewiseLinearCalibration(number_of_splits=20, use_median=True) + cal.fit(target_rt=measured, source_rt=predicted) + + # Values outside the training range + tr = np.array([-5.0, -0.1, 0.0, 5.0, 10.0, 10.1, 15.0], dtype=np.float32) + out = cal.transform(tr) + + assert out.shape == tr.shape + assert np.all(np.isfinite(out)) + # Monotonic transformation expected for linear calibration + assert np.all(np.diff(out) >= -1e-6) + + +def test_breakpoint_transform_matches_linear_truth(): + x = np.linspace(0.0, 100.0, 1001, dtype=np.float32) + y = 0.5 * x + 10.0 + cal = PiecewiseLinearCalibration(number_of_splits=40) + cal.fit(target_rt=y, source_rt=x) + out = cal.transform(x) + np.testing.assert_allclose(out, y, rtol=1e-5, atol=1e-4) + + +def test_extrapolate_false_clips_inputs(): + x = np.linspace(10.0, 20.0, 101, dtype=np.float32) + y = 3.0 * x - 7.0 + cal = PiecewiseLinearCalibration(number_of_splits=10, extrapolate=False) + cal.fit(target_rt=y, source_rt=x) + + x_query = np.array([0.0, 5.0, 10.0, 15.0, 20.0, 30.0], dtype=np.float32) + out = cal.transform(x_query) + + # Values below clip to left bound; above to right bound + left_val = (3.0 * 10.0) - 7.0 + right_val = (3.0 * 20.0) - 7.0 + assert np.isclose(out[0], left_val, rtol=1e-6) + assert np.isclose(out[1], left_val, rtol=1e-6) + assert np.isclose(out[-1], right_val, rtol=1e-6) + + +def test_zero_range_predicted_raises(): + import pytest + + from deeplc.calibration import CalibrationError + + x = np.ones(50, dtype=np.float32) * 7.5 + y = 2.0 * x + 1.0 + cal = PiecewiseLinearCalibration(number_of_splits=10) + with pytest.raises(CalibrationError): + cal.fit(target_rt=y, source_rt=x)
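For reference, a minimal usage sketch of the vectorized calibration added in this patch series, on toy linear data (the exact numbers are illustrative; it assumes the PiecewiseLinearCalibration API as defined above):

import numpy as np

from deeplc.calibration import PiecewiseLinearCalibration

# Toy data: the "source" retention times are a linear distortion of the "target" scale.
source_rt = np.linspace(0.0, 10.0, 201, dtype=np.float32)
target_rt = 2.0 * source_rt + 5.0

calibration = PiecewiseLinearCalibration(number_of_splits=25, use_median=True)
calibration.fit(target_rt=target_rt, source_rt=source_rt)

calibrated = calibration.transform(np.array([0.0, 5.0, 12.0], dtype=np.float32))
# For this linear toy example the result is close to [5.0, 15.0, 29.0]; the value 12.0 lies
# outside the fitted range and is extrapolated because extrapolate=True by default.

x_anchors, y_anchors = calibration.get_calibration_curve()  # anchor points of the fitted piecewise curve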