Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 13 additions & 64 deletions crates/sage-cli/src/output.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,31 @@
use rayon::prelude::*;
use sage_core::spectrum::MS1Spectra;
use sage_core::spectrum::ProcessedSpectrum;
use sage_core::{scoring::Feature, tmt::TmtQuant};

#[derive(Default)]
pub struct SageResults {
pub ms1: MS1Spectra,
pub ms1: Vec<ProcessedSpectrum>,
pub features: Vec<Feature>,
pub quant: Vec<TmtQuant>,
}

impl SageResults {
fn fold(mut self, other: SageResults) -> Self {
self.features.extend(other.features);
self.quant.extend(other.quant);
self.ms1.extend(other.ms1);
self
}
}

impl FromParallelIterator<SageResults> for SageResults {
fn from_par_iter<I>(par_iter: I) -> Self
where
I: IntoParallelIterator<Item = SageResults>,
{
par_iter
.into_par_iter()
.reduce(SageResults::default, |mut acc, x| {
acc.features.extend(x.features);
acc.quant.extend(x.quant);
match (acc.ms1, x.ms1) {
(MS1Spectra::NoMobility(mut a), MS1Spectra::NoMobility(b)) => {
a.extend(b);
acc.ms1 = MS1Spectra::NoMobility(a);
}
(MS1Spectra::WithMobility(mut a), MS1Spectra::WithMobility(b)) => {
a.extend(b);
acc.ms1 = MS1Spectra::WithMobility(a);
}
(MS1Spectra::Empty, MS1Spectra::Empty) => {
acc.ms1 = MS1Spectra::Empty;
}
(MS1Spectra::Empty, MS1Spectra::WithMobility(a))
| (MS1Spectra::WithMobility(a), MS1Spectra::Empty) => {
acc.ms1 = MS1Spectra::WithMobility(a);
}
(MS1Spectra::Empty, MS1Spectra::NoMobility(a))
| (MS1Spectra::NoMobility(a), MS1Spectra::Empty) => {
acc.ms1 = MS1Spectra::NoMobility(a);
}
_ => {
// In theory this can happen if someone is searching
// together files of different types, mixing the ones
// that support IMS and the ones that dont.
// ... I dont think this should be run-time recoverable.
unreachable!("Found combination of MS1 spectra with and without mobility.")
}
};
acc
})
.reduce(SageResults::default, SageResults::fold)
}
}

Expand All @@ -59,34 +36,6 @@ impl FromIterator<SageResults> for SageResults {
{
par_iter
.into_iter()
.fold(SageResults::default(), |mut acc, x| {
acc.features.extend(x.features);
acc.quant.extend(x.quant);
match (acc.ms1, x.ms1) {
(MS1Spectra::NoMobility(mut a), MS1Spectra::NoMobility(b)) => {
a.extend(b);
acc.ms1 = MS1Spectra::NoMobility(a);
}
(MS1Spectra::WithMobility(mut a), MS1Spectra::WithMobility(b)) => {
a.extend(b);
acc.ms1 = MS1Spectra::WithMobility(a);
}
(MS1Spectra::Empty, MS1Spectra::Empty) => {
acc.ms1 = MS1Spectra::Empty;
}
(MS1Spectra::Empty, MS1Spectra::WithMobility(a))
| (MS1Spectra::WithMobility(a), MS1Spectra::Empty) => {
acc.ms1 = MS1Spectra::WithMobility(a);
}
(MS1Spectra::Empty, MS1Spectra::NoMobility(a))
| (MS1Spectra::NoMobility(a), MS1Spectra::Empty) => {
acc.ms1 = MS1Spectra::NoMobility(a);
}
_ => {
unreachable!("Found combination of MS1 spectra with and without mobility.")
}
};
acc
})
.fold(SageResults::default(), SageResults::fold)
}
}
32 changes: 15 additions & 17 deletions crates/sage-cli/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use sage_core::mass::Tolerance;
use sage_core::peptide::Peptide;
use sage_core::scoring::Fragments;
use sage_core::scoring::{Feature, Scorer};
use sage_core::spectrum::{MS1Spectra, ProcessedSpectrum, RawSpectrum, SpectrumProcessor};
use sage_core::spectrum::{ProcessedSpectrum, RawSpectrum, SpectrumProcessor};
use sage_core::tmt::TmtQuant;
use std::collections::HashMap;
use std::collections::HashSet;
Expand Down Expand Up @@ -136,8 +136,8 @@ impl Runner {

pub fn prefilter_peptides(self, parallel: usize, fasta: Fasta) -> Vec<Peptide> {
let spectra: Option<(
MS1Spectra,
Vec<ProcessedSpectrum<sage_core::spectrum::Peak>>,
Vec<ProcessedSpectrum>,
Vec<ProcessedSpectrum>,
)> = match parallel >= self.parameters.mzml_paths.len() {
true => Some(self.read_processed_spectra(&self.parameters.mzml_paths, 0, 0)),
false => None,
Expand Down Expand Up @@ -207,15 +207,15 @@ impl Runner {
fn peptide_filter_processed_spectra(
&self,
scorer: &Scorer,
spectra: &Vec<ProcessedSpectrum<sage_core::spectrum::Peak>>,
spectra: &Vec<ProcessedSpectrum>,
) -> Vec<PeptideIx> {
use std::sync::atomic::{AtomicUsize, Ordering};
let counter = AtomicUsize::new(0);
let start = Instant::now();

let peptide_idxs: Vec<_> = spectra
.par_iter()
.filter(|spec| spec.peaks.len() >= self.parameters.min_peaks && spec.level == 2)
.filter(|spec| spec.masses.len() >= self.parameters.min_peaks && spec.level == 2)
.map(|x| {
let prev = counter.fetch_add(1, Ordering::Relaxed);
if prev > 0 && prev % 10_000 == 0 {
Expand Down Expand Up @@ -262,15 +262,15 @@ impl Runner {
fn search_processed_spectra(
&self,
scorer: &Scorer,
msn_spectra: &Vec<ProcessedSpectrum<sage_core::spectrum::Peak>>,
msn_spectra: &Vec<ProcessedSpectrum>,
) -> Vec<Feature> {
use std::sync::atomic::{AtomicUsize, Ordering};
let counter = AtomicUsize::new(0);
let start = Instant::now();

let features: Vec<_> = msn_spectra
.par_iter()
.filter(|spec| spec.peaks.len() >= self.parameters.min_peaks && spec.level == 2)
.filter(|spec| spec.masses.len() >= self.parameters.min_peaks && spec.level == 2)
.map(|x| {
let prev = counter.fetch_add(1, Ordering::Relaxed);
if prev > 0 && prev % 10_000 == 0 {
Expand All @@ -293,8 +293,8 @@ impl Runner {

fn complete_features(
&self,
msn_spectra: Vec<ProcessedSpectrum<sage_core::spectrum::Peak>>,
ms1_spectra: MS1Spectra,
msn_spectra: Vec<ProcessedSpectrum>,
ms1_spectra: Vec<ProcessedSpectrum>,
features: Vec<Feature>,
) -> SageResults {
let quant = self
Expand Down Expand Up @@ -340,8 +340,8 @@ impl Runner {
chunk_idx: usize,
batch_size: usize,
) -> (
MS1Spectra,
Vec<ProcessedSpectrum<sage_core::spectrum::Peak>>,
Vec<ProcessedSpectrum>,
Vec<ProcessedSpectrum>,
) {
// Read all of the spectra at once - this can help prevent memory over-consumption issues
info!(
Expand Down Expand Up @@ -430,19 +430,17 @@ impl Runner {
let ms1_empty = spectra.ms1.is_empty();
let ms1_spectra = if ms1_empty {
log::trace!("no MS1 spectra found");
MS1Spectra::Empty
vec![]
} else if all_contain_ims {
log::trace!("Processing MS1 spectra with IMS");
let spectra = spectra
spectra
.ms1
.into_iter()
.map(|x| sp.process_with_mobility(x))
.collect();
MS1Spectra::WithMobility(spectra)
.collect()
} else {
log::trace!("Processing MS1 spectra without IMS");
let spectra = spectra.ms1.into_iter().map(|s| sp.process(s)).collect();
MS1Spectra::NoMobility(spectra)
spectra.ms1.into_iter().map(|s| sp.process(s)).collect()
};

let io_time = Instant::now() - start;
Expand Down
2 changes: 1 addition & 1 deletion crates/sage-cli/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn integration() -> anyhow::Result<()> {

let sp = SpectrumProcessor::new(100, true, 0.0);
let processed = sp.process(spectra[0].clone());
assert!(processed.peaks.len() <= 300);
assert!(processed.masses.len() <= 300);

let scorer = Scorer {
db: &database,
Expand Down
4 changes: 2 additions & 2 deletions crates/sage-cloudpath/src/tdf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ impl PeakBuffer {

// The "order" is sorted by intensity
// This will be used later during the centroiding (for details check that implementation)
self.order.extend((0..self.len()));
self.order.extend(0..self.len());
self.order.sort_unstable_by(|&a, &b| {
self.peaks[b]
.intensity
Expand Down Expand Up @@ -375,7 +375,7 @@ impl PeakBuffer {
global_num_included += num_includable;

if global_num_included == self.len() {
log::debug!("All peaks were included in the centroiding");
log::trace!("All peaks were included in the centroiding");
break;
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/sage/src/fdr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ pub fn picked_precursor(
.map(|score| ((score.ix, score.decoy), score.q))
.collect::<FnvHashMap<_, _>>();

peaks.par_iter_mut().for_each(|((ix), (peak, _))| {
peaks.par_iter_mut().for_each(|(ix, (peak, _))| {
peak.q_value = scores[ix];
});
passing
Expand Down
122 changes: 44 additions & 78 deletions crates/sage/src/lfq.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crate::database::{binary_search_slice, IndexedDatabase, PeptideIx};
use crate::mass::{composition, Composition, Tolerance, NEUTRON};
use crate::ml::{matrix::Matrix, retention_alignment::Alignment};
use crate::scoring::Feature;
use crate::spectrum::MS1Spectra;
use crate::spectrum::ProcessedSpectrum;
use dashmap::DashMap;
use rayon::prelude::*;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -224,80 +224,51 @@ impl FeatureMap {
pub fn quantify(
&self,
db: &IndexedDatabase,
spectra: &MS1Spectra,
spectra: &Vec<ProcessedSpectrum>,
alignments: &[Alignment],
) -> HashMap<(PrecursorId, bool), (Peak, Vec<f64>), fnv::FnvBuildHasher> {
let scores: DashMap<(PrecursorId, bool), Grid, fnv::FnvBuildHasher> = DashMap::default();

log::info!("tracing MS1 features");

// TODO: find a good way to abstract this ... I think a macro would be the way to go.
match spectra {
MS1Spectra::NoMobility(spectra) => spectra.par_iter().for_each(|spectrum| {
let a = alignments[spectrum.file_id];
let rt = (spectrum.scan_start_time / a.max_rt) * a.slope + a.intercept;
let query = self.rt_slice(rt, RT_TOL);

for peak in &spectrum.peaks {
for entry in query.mass_lookup(peak.mass) {
let id = match self.settings.combine_charge_states {
true => PrecursorId::Combined(entry.peptide),
false => PrecursorId::Charged((entry.peptide, entry.charge)),
};

let mut grid = scores.entry((id, entry.decoy)).or_insert_with(|| {
let p = &db[entry.peptide];
let composition = p
.sequence
.iter()
.map(|r| composition(*r))
.sum::<Composition>();
let dist = crate::isotopes::peptide_isotopes(
composition.carbon,
composition.sulfur,
);
Grid::new(entry, RT_TOL, dist, alignments.len(), GRID_SIZE)
});

grid.add_entry(rt, entry.isotope, spectrum.file_id, peak.intensity);
}
}
}),
MS1Spectra::WithMobility(spectra) => spectra.par_iter().for_each(|spectrum| {
let a = alignments[spectrum.file_id];
let rt = (spectrum.scan_start_time / a.max_rt) * a.slope + a.intercept;
let query = self.rt_slice(rt, RT_TOL);

for peak in &spectrum.peaks {
for entry in query.mass_mobility_lookup(peak.mass, peak.mobility) {
let id = match self.settings.combine_charge_states {
true => PrecursorId::Combined(entry.peptide),
false => PrecursorId::Charged((entry.peptide, entry.charge)),
};

let mut grid = scores.entry((id, entry.decoy)).or_insert_with(|| {
let p = &db[entry.peptide];
let composition = p
.sequence
.iter()
.map(|r| composition(*r))
.sum::<Composition>();
let dist = crate::isotopes::peptide_isotopes(
composition.carbon,
composition.sulfur,
);
Grid::new(entry, RT_TOL, dist, alignments.len(), GRID_SIZE)
});

grid.add_entry(rt, entry.isotope, spectrum.file_id, peak.intensity);
}
spectra.par_iter().for_each(|spectrum| {
let a = alignments[spectrum.file_id];
let rt = (spectrum.scan_start_time / a.max_rt) * a.slope + a.intercept;
let query = self.rt_slice(rt, RT_TOL);

for (peak_idx, peak_mass) in spectrum.masses.iter().enumerate() {

let mob = match &spectrum.mobilities {
Some(mobs) => Some(mobs[peak_idx]),
None => None,
};
let inten = spectrum.intensities[peak_idx];

for entry in query.mass_lookup(*peak_mass, mob){
let id = match self.settings.combine_charge_states {
true => PrecursorId::Combined(entry.peptide),
false => PrecursorId::Charged((entry.peptide, entry.charge)),
};

let mut grid = scores.entry((id, entry.decoy)).or_insert_with(|| {
let p = &db[entry.peptide];
let composition = p
.sequence
.iter()
.map(|r| composition(*r))
.sum::<Composition>();
let dist = crate::isotopes::peptide_isotopes(
composition.carbon,
composition.sulfur,
);
Grid::new(entry, RT_TOL, dist, alignments.len(), GRID_SIZE)
});

grid.add_entry(rt, entry.isotope, spectrum.file_id, inten);
}
}),
MS1Spectra::Empty => {
// Should never be called if no MS1 spectra are present
log::warn!("no MS1 spectra found for quantification");
}
};
});

log::info!("integrating MS1 features");

Expand Down Expand Up @@ -658,7 +629,7 @@ fn convolve(slice: &[f64], kernel: &[f64]) -> Vec<f64> {
}

impl Query<'_> {
pub fn mass_lookup(&self, mass: f32) -> impl Iterator<Item = &PrecursorRange> {
pub fn mass_lookup(&self, mass: f32, mobility: Option<f32>) -> impl Iterator<Item = &PrecursorRange> {
(self.page_lo..self.page_hi).flat_map(move |page| {
let left_idx = page * self.bin_size;
// Last chunk not guaranted to be modulo bucket size, make sure we don't
Expand All @@ -683,17 +654,12 @@ impl Query<'_> {
&& mass >= frag.mass_lo
&& mass <= frag.mass_hi
})
})
}

pub fn mass_mobility_lookup(
&self,
mass: f32,
mobility: f32,
) -> impl Iterator<Item = &PrecursorRange> {
self.mass_lookup(mass).filter(move |precursor| {
// TODO: replace this magic number with a patameter.
(precursor.mobility_hi >= mobility) && (precursor.mobility_lo <= mobility)
}).filter(move |precursor| {
if let Some(mobility) = mobility {
(precursor.mobility_hi >= mobility) && (precursor.mobility_lo <= mobility)
} else {
true
}
})
}
}
Loading