From 349d65f55f24e169049280cf23e715e2b00b297c Mon Sep 17 00:00:00 2001 From: Filip Rumenovski Date: Wed, 31 Dec 2025 10:21:47 +0100 Subject: [PATCH 1/2] Run all tests and ensure they pass --- CHANGELOG.md | 1 + Cargo.lock | 53 +- README.md | 1 + crates/sage-cli/src/input.rs | 39 + crates/sage-cli/src/main.rs | 27 + crates/sage-cli/src/runner.rs | 135 ++- crates/sage-cloudpath/Cargo.toml | 3 + crates/sage-cloudpath/src/index_parquet.rs | 1089 ++++++++++++++++++++ crates/sage-cloudpath/src/lib.rs | 3 + crates/sage-cloudpath/src/parquet.rs | 20 +- crates/sage/src/database.rs | 2 +- 11 files changed, 1326 insertions(+), 47 deletions(-) create mode 100644 crates/sage-cloudpath/src/index_parquet.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index 794b0461..288eeec4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Speedup on the generation of databases when large number of peptides are redundant. - Initial support for searching diaPASEF data - `override_precursor_charge` setting that forces multiple charge states to be searched +- Index serialization to parquet format (`--save-index`, `--load-index`, `--export-index`, `--validate-index`) ### Breaking Changes - `precursor_ppm` field reports the non-absoluted average mass error, rather than the absoluted average mass error. - Don't deisotope reporter ion regions if MS2-based TMT/iTRAQ is used diff --git a/Cargo.lock b/Cargo.lock index 1ea8aa55..d61567af 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -352,7 +352,7 @@ dependencies = [ "aws-types", "bytes", "bytes-utils", - "fastrand", + "fastrand 1.9.0", "http", "http-body", "once_cell", @@ -495,7 +495,7 @@ dependencies = [ "aws-smithy-protocol-test", "aws-smithy-types", "bytes", - "fastrand", + "fastrand 1.9.0", "http", "http-body", "hyper", @@ -1119,6 +1119,16 @@ dependencies = [ "typeid", ] +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.61.2", +] + [[package]] name = "extend" version = "0.1.2" @@ -1152,6 +1162,12 @@ dependencies = [ "instant", ] +[[package]] +name = "fastrand" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" + [[package]] name = "find-msvc-tools" version = "0.1.4" @@ -1841,6 +1857,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "linux-raw-sys" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" + [[package]] name = "litemap" version = "0.8.0" @@ -2647,6 +2669,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "146c9e247ccc180c1f61615433868c99f3de3ae256a30a43b49f67c2d9171f34" +dependencies = [ + "bitflags 2.10.0", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.61.2", +] + [[package]] name = "rustls" version = "0.20.9" @@ -2759,6 +2794,7 @@ dependencies = [ "sage-core", "serde", "serde_json", + "tempfile", "thiserror", "timsrust", "tokio", @@ -3137,6 +3173,19 @@ dependencies = [ "libc", ] +[[package]] +name = "tempfile" +version = "3.24.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "655da9c7eb6305c55742045d5a8d2037996d61d8de95806335c7c86ce0f82e9c" +dependencies = [ + "fastrand 2.3.0", + "getrandom 0.3.4", + "once_cell", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "termcolor" version = "1.4.1" diff --git a/README.md b/README.md index 657d446b..88df4cc7 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,7 @@ If you use Sage in a scientific publication, please cite the following paper: - Configuration by [JSON file](https://sage-docs.vercel.app/docs/configuration#file) - Built-in support for reading gzipped-mzML files - Support for reading/writing directly from [AWS S3](https://sage-docs.vercel.app/docs/configuration/aws) +- Index serialization to parquet for fast reloading (`--save-index`, `--load-index`) ## Interoperability diff --git a/crates/sage-cli/src/input.rs b/crates/sage-cli/src/input.rs index b9b7b002..9ffce300 100644 --- a/crates/sage-cli/src/input.rs +++ b/crates/sage-cli/src/input.rs @@ -47,6 +47,18 @@ pub struct Search { pub annotate_matches: bool, pub score_type: ScoreType, + + #[serde(skip_serializing)] + pub save_index: Option, + + #[serde(skip_serializing)] + pub load_index: Option, + + #[serde(skip_serializing)] + pub export_index: Option, + + #[serde(skip_serializing)] + pub validate_index: bool, } #[derive(Deserialize)] @@ -76,6 +88,15 @@ pub struct Input { pub write_pin: Option, pub write_report: Option, pub score_type: Option, + + #[serde(skip)] + pub save_index: Option, + #[serde(skip)] + pub load_index: Option, + #[serde(skip)] + pub export_index: Option, + #[serde(skip)] + pub validate_index: Option, } #[derive(Serialize, Deserialize, Debug)] @@ -213,6 +234,20 @@ impl Input { input.annotate_matches = Some(annotate_matches); } + // Index serialization options + if let Some(save_index) = matches.get_one::("save-index") { + input.save_index = Some(save_index.clone()); + } + if let Some(load_index) = matches.get_one::("load-index") { + input.load_index = Some(load_index.clone()); + } + if let Some(export_index) = matches.get_one::("export-index") { + input.export_index = Some(export_index.clone()); + } + if let Some(validate_index) = matches.get_one::("validate-index").copied() { + input.validate_index = Some(validate_index); + } + // avoid to later panic if these parameters are not set (but doesn't check if files exist) ensure!( @@ -343,6 +378,10 @@ impl Input { bruker_config: self.bruker_config.unwrap_or_default(), write_report: self.write_report.unwrap_or(false), score_type, + save_index: self.save_index, + load_index: self.load_index, + export_index: self.export_index, + validate_index: self.validate_index.unwrap_or(false), }) } } diff --git a/crates/sage-cli/src/main.rs b/crates/sage-cli/src/main.rs index dd43aa8a..c8aeeb81 100644 --- a/crates/sage-cli/src/main.rs +++ b/crates/sage-cli/src/main.rs @@ -88,6 +88,33 @@ fn main() -> anyhow::Result<()> { .action(clap::ArgAction::SetFalse) .help("Disable sending telemetry data"), ) + .arg( + Arg::new("save-index") + .long("save-index") + .value_parser(clap::builder::NonEmptyStringValueParser::new()) + .help("Save the built index to a parquet directory for reuse") + .value_hint(ValueHint::DirPath), + ) + .arg( + Arg::new("load-index") + .long("load-index") + .value_parser(clap::builder::NonEmptyStringValueParser::new()) + .help("Load a pre-built index from parquet directory (skips FASTA processing)") + .value_hint(ValueHint::DirPath), + ) + .arg( + Arg::new("export-index") + .long("export-index") + .value_parser(clap::builder::NonEmptyStringValueParser::new()) + .help("Export a user-friendly peptide index parquet file") + .value_hint(ValueHint::FilePath), + ) + .arg( + Arg::new("validate-index") + .long("validate-index") + .action(clap::ArgAction::SetTrue) + .help("Validate loaded index matches what would be built from FASTA"), + ) .help_template( "{usage-heading} {usage}\n\n\ {about-with-newline}\n\ diff --git a/crates/sage-cli/src/runner.rs b/crates/sage-cli/src/runner.rs index 317011f2..e4eb9574 100644 --- a/crates/sage-cli/src/runner.rs +++ b/crates/sage-cli/src/runner.rs @@ -88,50 +88,107 @@ impl Runner { pub fn new(parameters: Search, parallel: usize) -> anyhow::Result { let mut parameters = parameters.clone(); let start = Instant::now(); - let fasta = sage_cloudpath::util::read_fasta( - ¶meters.database.fasta, - ¶meters.database.decoy_tag, - parameters.database.generate_decoys, - ) - .with_context(|| { - format!( - "Failed to build database from `{}`", - parameters.database.fasta + + // Check if we should load a pre-built index + let database = if let Some(ref load_path) = parameters.load_index { + info!("Loading pre-built index from {}", load_path); + let path = load_path.parse::()?; + let loaded_db = sage_cloudpath::index_parquet::deserialize_index(&path) + .with_context(|| format!("Failed to load index from `{}`", load_path))?; + + info!( + "loaded {} fragments, {} peptides in {:#?}", + loaded_db.fragments.len(), + loaded_db.peptides.len(), + start.elapsed() + ); + + // Optional validation: build from FASTA and compare + if parameters.validate_index { + info!("Validating loaded index against FASTA build..."); + let fasta = sage_cloudpath::util::read_fasta( + ¶meters.database.fasta, + ¶meters.database.decoy_tag, + parameters.database.generate_decoys, + ) + .with_context(|| { + format!("Failed to read FASTA from `{}`", parameters.database.fasta) + })?; + let built_db = parameters.database.clone().build(fasta); + sage_cloudpath::index_parquet::validate_index(&built_db, &loaded_db) + .with_context(|| "Index validation failed")?; + info!("Index validation passed!"); + } + + loaded_db + } else { + // Build from FASTA as usual + let fasta = sage_cloudpath::util::read_fasta( + ¶meters.database.fasta, + ¶meters.database.decoy_tag, + parameters.database.generate_decoys, ) - })?; - - let database = match parameters.database.prefilter { - false => parameters.database.clone().build(fasta), - true => { - parameters - .database - .auto_calculate_prefilter_chunk_size(&fasta); - if parameters.database.prefilter_chunk_size >= fasta.targets.len() { - parameters.database.clone().build(fasta) - } else { - info!( - "using {} db chunks of size {}", - (fasta.targets.len() + parameters.database.prefilter_chunk_size - 1) - / parameters.database.prefilter_chunk_size, - parameters.database.prefilter_chunk_size, - ); - let mini_runner = Self { - database: IndexedDatabase::default(), - parameters: parameters.clone(), - start, - }; - let peptides = mini_runner.prefilter_peptides(parallel, fasta); - parameters.database.clone().build_from_peptides(peptides) + .with_context(|| { + format!( + "Failed to build database from `{}`", + parameters.database.fasta + ) + })?; + + let built_db = match parameters.database.prefilter { + false => parameters.database.clone().build(fasta), + true => { + parameters + .database + .auto_calculate_prefilter_chunk_size(&fasta); + if parameters.database.prefilter_chunk_size >= fasta.targets.len() { + parameters.database.clone().build(fasta) + } else { + info!( + "using {} db chunks of size {}", + (fasta.targets.len() + parameters.database.prefilter_chunk_size - 1) + / parameters.database.prefilter_chunk_size, + parameters.database.prefilter_chunk_size, + ); + let mini_runner = Self { + database: IndexedDatabase::default(), + parameters: parameters.clone(), + start, + }; + let peptides = mini_runner.prefilter_peptides(parallel, fasta); + parameters.database.clone().build_from_peptides(peptides) + } } + }; + + info!( + "generated {} fragments, {} peptides in {:#?}", + built_db.fragments.len(), + built_db.peptides.len(), + start.elapsed() + ); + + // Save index if requested + if let Some(ref save_path) = parameters.save_index { + info!("Saving index to {}", save_path); + let path = save_path.parse::()?; + sage_cloudpath::index_parquet::serialize_index(&built_db, &path) + .with_context(|| format!("Failed to save index to `{}`", save_path))?; + info!("Index saved successfully"); } + + // Export user-friendly index if requested + if let Some(ref export_path) = parameters.export_index { + info!("Exporting user-friendly index to {}", export_path); + let path = export_path.parse::()?; + sage_cloudpath::index_parquet::export_index(&built_db, &path) + .with_context(|| format!("Failed to export index to `{}`", export_path))?; + info!("Index exported successfully"); + } + + built_db }; - info!( - "generated {} fragments, {} peptides in {:#?}", - database.fragments.len(), - database.peptides.len(), - (start.elapsed()) - ); Ok(Self { database, parameters, diff --git a/crates/sage-cloudpath/Cargo.toml b/crates/sage-cloudpath/Cargo.toml index e9475d8c..7006711f 100644 --- a/crates/sage-cloudpath/Cargo.toml +++ b/crates/sage-cloudpath/Cargo.toml @@ -33,3 +33,6 @@ serde_json = "1.0" sage-core = { path = "../sage" } parquet = { version = "50.0.0", optional = true, default-features = false, features = ["zstd"] } + +[dev-dependencies] +tempfile = "3" diff --git a/crates/sage-cloudpath/src/index_parquet.rs b/crates/sage-cloudpath/src/index_parquet.rs new file mode 100644 index 00000000..eeee0d8c --- /dev/null +++ b/crates/sage-cloudpath/src/index_parquet.rs @@ -0,0 +1,1089 @@ +//! Parquet serialization for IndexedDatabase +//! +//! Provides functions to serialize and deserialize the fragment ion index +//! to parquet format for fast loading, as well as user-friendly export. + +#![cfg(feature = "parquet")] + +use std::sync::Arc; + +use parquet::basic::ZstdLevel; +use parquet::data_type::{BoolType, ByteArray, ByteArrayType, FloatType, Int32Type}; +use parquet::file::properties::WriterProperties; +use parquet::file::reader::{FileReader, SerializedFileReader}; +use parquet::file::writer::SerializedFileWriter; +use parquet::record::RowAccessor; +use parquet::schema::types::Type; + +use sage_core::database::{IndexedDatabase, PeptideIx, Theoretical}; +use sage_core::enzyme::Position; +use sage_core::ion_series::Kind; +use sage_core::modification::ModificationSpecificity; +use sage_core::peptide::Peptide; + +use crate::parquet::{ROW_GROUP_SIZE, ZSTD_COMPRESSION_LEVEL}; +use crate::{CloudPath, Error}; + +/// Convert parquet error to io error +fn pq_err(e: parquet::errors::ParquetError) -> std::io::Error { + std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) +} + +#[derive(Debug)] +pub enum ValidationError { + PeptideCountMismatch { expected: usize, actual: usize }, + FragmentCountMismatch { expected: usize, actual: usize }, + PeptideMismatch { index: usize, field: &'static str }, + FragmentMismatch { index: usize, field: &'static str }, + MetadataMismatch { field: &'static str }, +} + +impl std::fmt::Display for ValidationError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::PeptideCountMismatch { expected, actual } => { + write!( + f, + "peptide count mismatch: expected {expected}, got {actual}" + ) + } + Self::FragmentCountMismatch { expected, actual } => { + write!( + f, + "fragment count mismatch: expected {expected}, got {actual}" + ) + } + Self::PeptideMismatch { index, field } => { + write!(f, "peptide mismatch at index {index}: field '{field}'") + } + Self::FragmentMismatch { index, field } => { + write!(f, "fragment mismatch at index {index}: field '{field}'") + } + Self::MetadataMismatch { field } => write!(f, "metadata mismatch: field '{field}'"), + } + } +} + +impl std::error::Error for ValidationError {} + +// Schema definitions +fn build_peptides_schema() -> parquet::errors::Result { + parquet::schema::parser::parse_message_type( + r#"message peptides_schema { + required int32 peptide_id; required boolean decoy; required byte_array sequence; + required byte_array modifications; optional float nterm; optional float cterm; + required float monoisotopic; required int32 missed_cleavages; + required boolean semi_enzymatic; required int32 position; + required byte_array proteins (utf8); + }"#, + ) +} + +fn build_fragments_schema() -> parquet::errors::Result { + parquet::schema::parser::parse_message_type( + r#"message fragments_schema { required int32 peptide_index; required float fragment_mz; }"#, + ) +} + +fn build_metadata_schema() -> parquet::errors::Result { + parquet::schema::parser::parse_message_type( + r#"message metadata_schema { required byte_array key (utf8); required byte_array value (utf8); }"#, + ) +} + +fn build_export_schema() -> parquet::errors::Result { + parquet::schema::parser::parse_message_type( + r#"message peptide_export_schema { + required byte_array peptide (utf8); required byte_array stripped_sequence (utf8); + required byte_array proteins (utf8); required boolean is_decoy; + required float monoisotopic_mass; required int32 missed_cleavages; + required boolean semi_enzymatic; required byte_array position (utf8); + required int32 num_proteins; required int32 fragment_count; + }"#, + ) +} + +// Conversion helpers +fn position_to_i32(pos: Position) -> i32 { + match pos { + Position::Nterm => 0, + Position::Cterm => 1, + Position::Full => 2, + Position::Internal => 3, + } +} + +fn i32_to_position(val: i32) -> Position { + match val { + 0 => Position::Nterm, + 1 => Position::Cterm, + 2 => Position::Full, + _ => Position::Internal, + } +} + +fn position_to_str(pos: Position) -> &'static str { + match pos { + Position::Nterm => "N-term", + Position::Cterm => "C-term", + Position::Full => "Full", + Position::Internal => "Internal", + } +} + +fn kind_to_str(kind: Kind) -> &'static str { + match kind { + Kind::A => "a", + Kind::B => "b", + Kind::C => "c", + Kind::X => "x", + Kind::Y => "y", + Kind::Z => "z", + } +} + +fn str_to_kind(s: &str) -> Kind { + match s { + "a" => Kind::A, + "b" => Kind::B, + "c" => Kind::C, + "x" => Kind::X, + "y" => Kind::Y, + "z" => Kind::Z, + _ => Kind::B, + } +} + +fn pack_mods(mods: &[f32]) -> Vec { + mods.iter().flat_map(|m| m.to_le_bytes()).collect() +} + +fn unpack_mods(bytes: &[u8]) -> Vec { + bytes + .chunks_exact(4) + .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]])) + .collect() +} + +fn serialize_ion_kinds(kinds: &[Kind]) -> String { + kinds + .iter() + .map(|k| kind_to_str(*k)) + .collect::>() + .join(",") +} + +fn deserialize_ion_kinds(s: &str) -> Vec { + s.split(',') + .filter(|s| !s.is_empty()) + .map(str_to_kind) + .collect() +} + +fn serialize_potential_mods(mods: &[(ModificationSpecificity, f32)]) -> String { + serde_json::to_string( + &mods + .iter() + .map(|(s, m)| (format!("{s}"), *m)) + .collect::>(), + ) + .unwrap_or_default() +} + +fn deserialize_potential_mods(s: &str) -> Vec<(ModificationSpecificity, f32)> { + if s.is_empty() { + return Vec::new(); + } + serde_json::from_str::>(s) + .map(|pairs| { + pairs + .into_iter() + .filter_map(|(spec, mass)| spec.parse().ok().map(|s| (s, mass))) + .collect() + }) + .unwrap_or_default() +} + +fn serialize_min_values(values: &[f32]) -> String { + values + .iter() + .map(|v| format!("{:08x}", v.to_bits())) + .collect::>() + .join(",") +} + +fn deserialize_min_values(s: &str) -> Vec { + s.split(',') + .filter(|s| !s.is_empty()) + .filter_map(|s| u32::from_str_radix(s, 16).ok().map(f32::from_bits)) + .collect() +} + +fn writer_props() -> parquet::errors::Result { + Ok(WriterProperties::builder() + .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::try_new( + ZSTD_COMPRESSION_LEVEL, + )?)) + .build()) +} + +// Serialization +fn write_peptides( + peptides: &[Peptide], + decoy_tag: &str, + gen_decoys: bool, +) -> parquet::errors::Result> { + let mut writer = SerializedFileWriter::new( + Vec::new(), + build_peptides_schema()?.into(), + writer_props()?.into(), + )?; + + for chunk in peptides.chunks(ROW_GROUP_SIZE) { + let mut rg = writer.next_row_group()?; + macro_rules! col { + ($vals:expr, $ty:ty) => {{ + if let Some(mut c) = rg.next_column()? { + c.typed::<$ty>().write_batch(&$vals, None, None)?; + c.close()?; + } + }}; + } + + col!( + chunk + .iter() + .enumerate() + .map(|(i, _)| i as i32) + .collect::>(), + Int32Type + ); + col!(chunk.iter().map(|p| p.decoy).collect::>(), BoolType); + col!( + chunk + .iter() + .map(|p| ByteArray::from(p.sequence.as_ref())) + .collect::>(), + ByteArrayType + ); + col!( + chunk + .iter() + .map(|p| ByteArray::from(pack_mods(&p.modifications))) + .collect::>(), + ByteArrayType + ); + + // Optional nterm/cterm + for getter in [|p: &Peptide| p.nterm, |p: &Peptide| p.cterm] { + if let Some(mut c) = rg.next_column()? { + let (mut vals, mut defs) = (Vec::new(), Vec::new()); + for p in chunk { + match getter(p) { + Some(v) => { + vals.push(v); + defs.push(1); + } + None => defs.push(0), + } + } + c.typed::() + .write_batch(&vals, Some(&defs), None)?; + c.close()?; + } + } + + col!( + chunk.iter().map(|p| p.monoisotopic).collect::>(), + FloatType + ); + col!( + chunk + .iter() + .map(|p| p.missed_cleavages as i32) + .collect::>(), + Int32Type + ); + col!( + chunk.iter().map(|p| p.semi_enzymatic).collect::>(), + BoolType + ); + col!( + chunk + .iter() + .map(|p| position_to_i32(p.position)) + .collect::>(), + Int32Type + ); + col!( + chunk + .iter() + .map(|p| ByteArray::from(p.proteins(decoy_tag, gen_decoys).as_bytes())) + .collect::>(), + ByteArrayType + ); + rg.close()?; + } + writer.into_inner() +} + +fn write_fragments(fragments: &[Theoretical]) -> parquet::errors::Result> { + let mut writer = SerializedFileWriter::new( + Vec::new(), + build_fragments_schema()?.into(), + writer_props()?.into(), + )?; + for chunk in fragments.chunks(ROW_GROUP_SIZE) { + let mut rg = writer.next_row_group()?; + if let Some(mut c) = rg.next_column()? { + c.typed::().write_batch( + &chunk + .iter() + .map(|f| f.peptide_index.0 as i32) + .collect::>(), + None, + None, + )?; + c.close()?; + } + if let Some(mut c) = rg.next_column()? { + c.typed::().write_batch( + &chunk.iter().map(|f| f.fragment_mz).collect::>(), + None, + None, + )?; + c.close()?; + } + rg.close()?; + } + writer.into_inner() +} + +fn write_metadata(db: &IndexedDatabase) -> parquet::errors::Result> { + let mut writer = SerializedFileWriter::new( + Vec::new(), + build_metadata_schema()?.into(), + writer_props()?.into(), + )?; + let meta = [ + ("version", env!("CARGO_PKG_VERSION").to_string()), + ("bucket_size", db.bucket_size.to_string()), + ("generate_decoys", db.generate_decoys.to_string()), + ("decoy_tag", db.decoy_tag.clone()), + ("ion_kinds", serialize_ion_kinds(&db.ion_kinds)), + ( + "potential_mods", + serialize_potential_mods(&db.potential_mods), + ), + ("min_values", serialize_min_values(&db.min_value)), + ]; + let mut rg = writer.next_row_group()?; + if let Some(mut c) = rg.next_column()? { + c.typed::().write_batch( + &meta + .iter() + .map(|(k, _)| ByteArray::from(k.as_bytes())) + .collect::>(), + None, + None, + )?; + c.close()?; + } + if let Some(mut c) = rg.next_column()? { + c.typed::().write_batch( + &meta + .iter() + .map(|(_, v)| ByteArray::from(v.as_bytes())) + .collect::>(), + None, + None, + )?; + c.close()?; + } + rg.close()?; + writer.into_inner() +} + +// Deserialization +fn read_metadata( + bytes: &[u8], +) -> parquet::errors::Result> { + let reader = SerializedFileReader::new(bytes::Bytes::from(bytes.to_vec()))?; + let mut meta = std::collections::HashMap::new(); + for row in reader.get_row_iter(None)? { + let row = row?; + meta.insert( + row.get_string(0)?.to_string(), + row.get_string(1)?.to_string(), + ); + } + Ok(meta) +} + +fn read_peptides( + bytes: &[u8], + decoy_tag: &str, + gen_decoys: bool, +) -> parquet::errors::Result> { + let reader = SerializedFileReader::new(bytes::Bytes::from(bytes.to_vec()))?; + let mut peptides = Vec::new(); + for row in reader.get_row_iter(None)? { + let row = row?; + let decoy = row.get_bool(1)?; + let proteins: Vec> = row + .get_string(10)? + .split(';') + .filter(|s| !s.is_empty()) + .map(|s| { + Arc::from(if decoy && gen_decoys && s.starts_with(decoy_tag) { + &s[decoy_tag.len()..] + } else { + s + }) + }) + .collect(); + peptides.push(Peptide { + decoy, + sequence: Arc::from(row.get_bytes(2)?.data()), + modifications: unpack_mods(row.get_bytes(3)?.data()), + nterm: row.get_float(4).ok(), + cterm: row.get_float(5).ok(), + monoisotopic: row.get_float(6)?, + missed_cleavages: row.get_int(7)? as u8, + semi_enzymatic: row.get_bool(8)?, + position: i32_to_position(row.get_int(9)?), + proteins, + }); + } + Ok(peptides) +} + +fn read_fragments(bytes: &[u8]) -> parquet::errors::Result> { + let reader = SerializedFileReader::new(bytes::Bytes::from(bytes.to_vec()))?; + let mut fragments = Vec::new(); + for row in reader.get_row_iter(None)? { + let row = row?; + fragments.push(Theoretical { + peptide_index: PeptideIx(row.get_int(0)? as u32), + fragment_mz: row.get_float(1)?, + }); + } + Ok(fragments) +} + +fn read_file_bytes(path: &CloudPath) -> Result, Error> { + match path { + CloudPath::Local(p) => Ok(std::fs::read(p)?), + CloudPath::S3 { .. } => Err(Error::IO(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "S3 index loading not yet supported", + ))), + } +} + +// Public API +pub fn serialize_index(db: &IndexedDatabase, output_dir: &CloudPath) -> Result<(), Error> { + output_dir.mkdir()?; + for (name, bytes) in [ + ( + "peptides.parquet", + write_peptides(&db.peptides, &db.decoy_tag, db.generate_decoys).map_err(pq_err)?, + ), + ( + "fragments.parquet", + write_fragments(&db.fragments).map_err(pq_err)?, + ), + ("metadata.parquet", write_metadata(db).map_err(pq_err)?), + ] { + let mut path = output_dir.clone(); + path.push(name); + path.write_bytes_sync(bytes)?; + } + Ok(()) +} + +pub fn deserialize_index(input_dir: &CloudPath) -> Result { + let mut meta_path = input_dir.clone(); + meta_path.push("metadata.parquet"); + let meta = read_metadata(&read_file_bytes(&meta_path)?).map_err(pq_err)?; + + let bucket_size = meta + .get("bucket_size") + .and_then(|s| s.parse().ok()) + .unwrap_or(8192); + let generate_decoys = meta + .get("generate_decoys") + .map(|s| s == "true") + .unwrap_or(true); + let decoy_tag = meta + .get("decoy_tag") + .cloned() + .unwrap_or_else(|| "rev_".into()); + let ion_kinds = meta + .get("ion_kinds") + .map(|s| deserialize_ion_kinds(s)) + .unwrap_or_else(|| vec![Kind::B, Kind::Y]); + let potential_mods = meta + .get("potential_mods") + .map(|s| deserialize_potential_mods(s)) + .unwrap_or_default(); + let min_value = meta + .get("min_values") + .map(|s| deserialize_min_values(s)) + .unwrap_or_default(); + + let mut pep_path = input_dir.clone(); + pep_path.push("peptides.parquet"); + let peptides = + read_peptides(&read_file_bytes(&pep_path)?, &decoy_tag, generate_decoys).map_err(pq_err)?; + + let mut frag_path = input_dir.clone(); + frag_path.push("fragments.parquet"); + let fragments = read_fragments(&read_file_bytes(&frag_path)?).map_err(pq_err)?; + + Ok(IndexedDatabase { + peptides, + fragments, + ion_kinds, + min_value, + potential_mods, + bucket_size, + generate_decoys, + decoy_tag, + }) +} + +pub fn validate_index( + original: &IndexedDatabase, + loaded: &IndexedDatabase, +) -> Result<(), ValidationError> { + macro_rules! check { + ($cond:expr, $err:expr) => { + if $cond { + return Err($err); + } + }; + } + + check!( + original.peptides.len() != loaded.peptides.len(), + ValidationError::PeptideCountMismatch { + expected: original.peptides.len(), + actual: loaded.peptides.len() + } + ); + check!( + original.fragments.len() != loaded.fragments.len(), + ValidationError::FragmentCountMismatch { + expected: original.fragments.len(), + actual: loaded.fragments.len() + } + ); + + check!( + original.bucket_size != loaded.bucket_size, + ValidationError::MetadataMismatch { + field: "bucket_size" + } + ); + check!( + original.generate_decoys != loaded.generate_decoys, + ValidationError::MetadataMismatch { + field: "generate_decoys" + } + ); + check!( + original.decoy_tag != loaded.decoy_tag, + ValidationError::MetadataMismatch { field: "decoy_tag" } + ); + check!( + original.ion_kinds != loaded.ion_kinds, + ValidationError::MetadataMismatch { field: "ion_kinds" } + ); + + check!( + original.min_value.len() != loaded.min_value.len(), + ValidationError::MetadataMismatch { field: "min_value" } + ); + for (a, b) in original.min_value.iter().zip(&loaded.min_value) { + check!( + a.to_bits() != b.to_bits(), + ValidationError::MetadataMismatch { field: "min_value" } + ); + } + + check!( + original.potential_mods.len() != loaded.potential_mods.len(), + ValidationError::MetadataMismatch { + field: "potential_mods" + } + ); + for ((sa, ma), (sb, mb)) in original.potential_mods.iter().zip(&loaded.potential_mods) { + check!( + sa != sb || ma.to_bits() != mb.to_bits(), + ValidationError::MetadataMismatch { + field: "potential_mods" + } + ); + } + + for (i, (o, l)) in original.peptides.iter().zip(&loaded.peptides).enumerate() { + check!( + o.decoy != l.decoy, + ValidationError::PeptideMismatch { + index: i, + field: "decoy" + } + ); + check!( + o.sequence != l.sequence, + ValidationError::PeptideMismatch { + index: i, + field: "sequence" + } + ); + check!( + o.modifications != l.modifications, + ValidationError::PeptideMismatch { + index: i, + field: "modifications" + } + ); + check!( + o.nterm != l.nterm, + ValidationError::PeptideMismatch { + index: i, + field: "nterm" + } + ); + check!( + o.cterm != l.cterm, + ValidationError::PeptideMismatch { + index: i, + field: "cterm" + } + ); + check!( + o.monoisotopic.to_bits() != l.monoisotopic.to_bits(), + ValidationError::PeptideMismatch { + index: i, + field: "monoisotopic" + } + ); + check!( + o.missed_cleavages != l.missed_cleavages, + ValidationError::PeptideMismatch { + index: i, + field: "missed_cleavages" + } + ); + check!( + o.semi_enzymatic != l.semi_enzymatic, + ValidationError::PeptideMismatch { + index: i, + field: "semi_enzymatic" + } + ); + check!( + o.position != l.position, + ValidationError::PeptideMismatch { + index: i, + field: "position" + } + ); + check!( + o.proteins != l.proteins, + ValidationError::PeptideMismatch { + index: i, + field: "proteins" + } + ); + } + + for (i, (o, l)) in original.fragments.iter().zip(&loaded.fragments).enumerate() { + check!( + o.peptide_index != l.peptide_index, + ValidationError::FragmentMismatch { + index: i, + field: "peptide_index" + } + ); + check!( + o.fragment_mz.to_bits() != l.fragment_mz.to_bits(), + ValidationError::FragmentMismatch { + index: i, + field: "fragment_mz" + } + ); + } + Ok(()) +} + +pub fn export_index(db: &IndexedDatabase, output_path: &CloudPath) -> Result<(), Error> { + let mut frag_counts = vec![0i32; db.peptides.len()]; + for f in &db.fragments { + if (f.peptide_index.0 as usize) < frag_counts.len() { + frag_counts[f.peptide_index.0 as usize] += 1; + } + } + + let mut writer = SerializedFileWriter::new( + Vec::new(), + build_export_schema().map_err(pq_err)?.into(), + writer_props().map_err(pq_err)?.into(), + ) + .map_err(pq_err)?; + + for (chunk_idx, chunk) in db.peptides.chunks(ROW_GROUP_SIZE).enumerate() { + let start = chunk_idx * ROW_GROUP_SIZE; + let mut rg = writer.next_row_group().map_err(pq_err)?; + macro_rules! col { + ($vals:expr, $ty:ty) => {{ + if let Some(mut c) = rg.next_column().map_err(pq_err)? { + c.typed::<$ty>() + .write_batch(&$vals, None, None) + .map_err(pq_err)?; + c.close().map_err(pq_err)?; + } + }}; + } + + col!( + chunk + .iter() + .map(|p| ByteArray::from(p.to_string().as_bytes())) + .collect::>(), + ByteArrayType + ); + col!( + chunk + .iter() + .map(|p| ByteArray::from(p.sequence.as_ref())) + .collect::>(), + ByteArrayType + ); + col!( + chunk + .iter() + .map(|p| ByteArray::from(p.proteins(&db.decoy_tag, db.generate_decoys).as_bytes())) + .collect::>(), + ByteArrayType + ); + col!(chunk.iter().map(|p| p.decoy).collect::>(), BoolType); + col!( + chunk.iter().map(|p| p.monoisotopic).collect::>(), + FloatType + ); + col!( + chunk + .iter() + .map(|p| p.missed_cleavages as i32) + .collect::>(), + Int32Type + ); + col!( + chunk.iter().map(|p| p.semi_enzymatic).collect::>(), + BoolType + ); + col!( + chunk + .iter() + .map(|p| ByteArray::from(position_to_str(p.position).as_bytes())) + .collect::>(), + ByteArrayType + ); + col!( + chunk + .iter() + .map(|p| p.proteins.len() as i32) + .collect::>(), + Int32Type + ); + col!( + (0..chunk.len()) + .map(|i| frag_counts.get(start + i).copied().unwrap_or(0)) + .collect::>(), + Int32Type + ); + rg.close().map_err(pq_err)?; + } + output_path.write_bytes_sync(writer.into_inner().map_err(pq_err)?)?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use tempfile::TempDir; + + fn make_test_peptide(decoy: bool, seq: &[u8], proteins: Vec<&str>) -> Peptide { + Peptide { + decoy, + sequence: Arc::from(seq), + modifications: vec![0.0; seq.len()], + nterm: None, + cterm: None, + monoisotopic: 1000.5, + missed_cleavages: 1, + semi_enzymatic: false, + position: Position::Internal, + proteins: proteins.into_iter().map(Arc::from).collect(), + } + } + + fn make_test_database() -> IndexedDatabase { + let peptides = vec![ + make_test_peptide(false, b"PEPTIDE", vec!["sp|P12345|PROT1"]), + make_test_peptide( + false, + b"SEQVENCE", + vec!["sp|P12345|PROT1", "sp|P67890|PROT2"], + ), + make_test_peptide(true, b"EDITPEP", vec!["sp|P12345|PROT1"]), + ]; + let fragments = vec![ + Theoretical { + peptide_index: PeptideIx(0), + fragment_mz: 500.25, + }, + Theoretical { + peptide_index: PeptideIx(0), + fragment_mz: 600.30, + }, + Theoretical { + peptide_index: PeptideIx(1), + fragment_mz: 450.15, + }, + Theoretical { + peptide_index: PeptideIx(2), + fragment_mz: 550.20, + }, + ]; + IndexedDatabase { + peptides, + fragments, + ion_kinds: vec![Kind::B, Kind::Y], + min_value: vec![100.0, 200.0, 300.0], + potential_mods: vec![ + (ModificationSpecificity::Residue(b'M'), 15.994915), + (ModificationSpecificity::Residue(b'C'), 57.021464), + ], + bucket_size: 8192, + generate_decoys: true, + decoy_tag: "rev_".to_string(), + } + } + + #[test] + fn test_pack_unpack_modifications() { + let mods = vec![1.5f32, -2.3f32, 0.0f32, 100.123f32]; + let unpacked = unpack_mods(&pack_mods(&mods)); + assert!(mods + .iter() + .zip(&unpacked) + .all(|(a, b)| a.to_bits() == b.to_bits())); + } + + #[test] + fn test_position_conversion() { + for (i, pos) in [ + Position::Nterm, + Position::Cterm, + Position::Full, + Position::Internal, + ] + .iter() + .enumerate() + { + assert_eq!(position_to_i32(*pos), i as i32); + assert_eq!(i32_to_position(i as i32), *pos); + } + assert_eq!(i32_to_position(99), Position::Internal); + } + + #[test] + fn test_ion_kinds_serialization() { + let kinds = vec![Kind::A, Kind::B, Kind::Y]; + assert_eq!(deserialize_ion_kinds(&serialize_ion_kinds(&kinds)), kinds); + } + + #[test] + fn test_min_values_serialization() { + let values = vec![ + 1.0f32, + 2.5f32, + 100.123f32, + f32::MIN_POSITIVE, + f32::MAX, + -0.0f32, + ]; + let deser = deserialize_min_values(&serialize_min_values(&values)); + assert!(values + .iter() + .zip(&deser) + .all(|(a, b)| a.to_bits() == b.to_bits())); + } + + #[test] + fn test_potential_mods_serialization() { + let mods = vec![ + (ModificationSpecificity::Residue(b'M'), 15.994915f32), + (ModificationSpecificity::PeptideN(None), 42.010565f32), + (ModificationSpecificity::Residue(b'C'), 57.021464f32), + ]; + let deser = deserialize_potential_mods(&serialize_potential_mods(&mods)); + assert!(mods + .iter() + .zip(&deser) + .all(|((sa, ma), (sb, mb))| sa == sb && ma.to_bits() == mb.to_bits())); + } + + #[test] + fn test_empty_serialization() { + assert!(deserialize_min_values(&serialize_min_values(&[])).is_empty()); + assert!(deserialize_potential_mods(&serialize_potential_mods(&[])).is_empty()); + } + + #[test] + fn test_full_round_trip() { + let db = make_test_database(); + let tmp = TempDir::new().unwrap(); + let path = CloudPath::Local(tmp.path().to_path_buf()); + + serialize_index(&db, &path).expect("serialize failed"); + let loaded = deserialize_index(&path).expect("deserialize failed"); + validate_index(&db, &loaded).expect("validation failed"); + } + + #[test] + fn test_round_trip_with_optional_mods() { + let mut db = make_test_database(); + db.peptides[0].nterm = Some(42.01); + db.peptides[1].cterm = Some(-17.03); + db.peptides[2].nterm = Some(28.0); + db.peptides[2].cterm = Some(14.5); + + let tmp = TempDir::new().unwrap(); + let path = CloudPath::Local(tmp.path().to_path_buf()); + + serialize_index(&db, &path).unwrap(); + let loaded = deserialize_index(&path).unwrap(); + validate_index(&db, &loaded).expect("validation failed with optional mods"); + } + + #[test] + fn test_round_trip_preserves_float_precision() { + let mut db = make_test_database(); + db.peptides[0].monoisotopic = std::f32::consts::PI; + db.peptides[1].monoisotopic = f32::MIN_POSITIVE; + db.fragments[0].fragment_mz = std::f32::consts::E; + + let tmp = TempDir::new().unwrap(); + let path = CloudPath::Local(tmp.path().to_path_buf()); + + serialize_index(&db, &path).unwrap(); + let loaded = deserialize_index(&path).unwrap(); + + assert_eq!( + db.peptides[0].monoisotopic.to_bits(), + loaded.peptides[0].monoisotopic.to_bits() + ); + assert_eq!( + db.peptides[1].monoisotopic.to_bits(), + loaded.peptides[1].monoisotopic.to_bits() + ); + assert_eq!( + db.fragments[0].fragment_mz.to_bits(), + loaded.fragments[0].fragment_mz.to_bits() + ); + } + + #[test] + fn test_empty_database_round_trip() { + let db = IndexedDatabase::default(); + let tmp = TempDir::new().unwrap(); + let path = CloudPath::Local(tmp.path().to_path_buf()); + + serialize_index(&db, &path).unwrap(); + let loaded = deserialize_index(&path).unwrap(); + validate_index(&db, &loaded).expect("empty database validation failed"); + } + + #[test] + fn test_validation_detects_peptide_count_mismatch() { + let db1 = make_test_database(); + let mut db2 = make_test_database(); + db2.peptides.pop(); + + let err = validate_index(&db1, &db2).unwrap_err(); + assert!(matches!(err, ValidationError::PeptideCountMismatch { .. })); + } + + #[test] + fn test_validation_detects_fragment_mismatch() { + let db1 = make_test_database(); + let mut db2 = make_test_database(); + db2.fragments[0].fragment_mz = 999.99; + + let err = validate_index(&db1, &db2).unwrap_err(); + assert!(matches!( + err, + ValidationError::FragmentMismatch { + field: "fragment_mz", + .. + } + )); + } + + #[test] + fn test_validation_detects_metadata_mismatch() { + let db1 = make_test_database(); + let mut db2 = make_test_database(); + db2.bucket_size = 9999; + + let err = validate_index(&db1, &db2).unwrap_err(); + assert!(matches!( + err, + ValidationError::MetadataMismatch { + field: "bucket_size" + } + )); + } + + #[test] + fn test_export_index() { + let db = make_test_database(); + let tmp = TempDir::new().unwrap(); + let export_path = CloudPath::Local(tmp.path().join("export.parquet")); + + export_index(&db, &export_path).expect("export failed"); + + // Verify file exists and is readable + let bytes = std::fs::read(tmp.path().join("export.parquet")).unwrap(); + assert!(!bytes.is_empty()); + } + + #[test] + fn test_validation_error_display() { + let err = ValidationError::PeptideCountMismatch { + expected: 100, + actual: 50, + }; + assert_eq!( + err.to_string(), + "peptide count mismatch: expected 100, got 50" + ); + + let err = ValidationError::PeptideMismatch { + index: 42, + field: "sequence", + }; + assert_eq!( + err.to_string(), + "peptide mismatch at index 42: field 'sequence'" + ); + + let err = ValidationError::MetadataMismatch { field: "decoy_tag" }; + assert_eq!(err.to_string(), "metadata mismatch: field 'decoy_tag'"); + } +} diff --git a/crates/sage-cloudpath/src/lib.rs b/crates/sage-cloudpath/src/lib.rs index 7d54f09a..2230e722 100644 --- a/crates/sage-cloudpath/src/lib.rs +++ b/crates/sage-cloudpath/src/lib.rs @@ -15,6 +15,9 @@ pub use util::FileFormat; #[cfg(feature = "parquet")] pub mod parquet; +#[cfg(feature = "parquet")] +pub mod index_parquet; + static S3_CLIENT: once_cell::sync::OnceCell = once_cell::sync::OnceCell::new(); async fn s3_client() -> &'static aws_sdk_s3::Client { diff --git a/crates/sage-cloudpath/src/parquet.rs b/crates/sage-cloudpath/src/parquet.rs index f513730a..122d86bb 100644 --- a/crates/sage-cloudpath/src/parquet.rs +++ b/crates/sage-cloudpath/src/parquet.rs @@ -8,6 +8,10 @@ #![cfg(feature = "parquet")] +// Shared constants for parquet serialization +pub const ROW_GROUP_SIZE: usize = 65536; +pub const ZSTD_COMPRESSION_LEVEL: i32 = 3; + use std::collections::HashMap; use std::hash::BuildHasher; @@ -133,13 +137,15 @@ pub fn serialize_features( let schema = build_schema()?; let options = WriterProperties::builder() - .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(3)?)) + .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::try_new( + ZSTD_COMPRESSION_LEVEL, + )?)) .build(); let buf = Vec::new(); let mut writer = SerializedFileWriter::new(buf, schema.into(), options.into())?; - for features in features.chunks(65536) { + for features in features.chunks(ROW_GROUP_SIZE) { let mut rg = writer.next_row_group()?; macro_rules! write_col { ($field:ident, $ty:ident) => { @@ -265,14 +271,16 @@ pub fn serialize_matched_fragments( let schema = build_matched_fragment_schema()?; let options = WriterProperties::builder() - .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(3)?)) + .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::try_new( + ZSTD_COMPRESSION_LEVEL, + )?)) .build(); let buf = Vec::new(); let mut writer = SerializedFileWriter::new(buf, schema.into(), options.into())?; - for features in features.chunks(65536) { + for features in features.chunks(ROW_GROUP_SIZE) { let mut rg = writer.next_row_group()?; if let Some(mut col) = rg.next_column()? { @@ -426,7 +434,9 @@ pub fn serialize_lfq( let schema = build_lfq_schema()?; let options = WriterProperties::builder() - .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::try_new(3)?)) + .set_compression(parquet::basic::Compression::ZSTD(ZstdLevel::try_new( + ZSTD_COMPRESSION_LEVEL, + )?)) .build(); let buf = Vec::new(); diff --git a/crates/sage/src/database.rs b/crates/sage/src/database.rs index ef3f6a47..5090fa9e 100644 --- a/crates/sage/src/database.rs +++ b/crates/sage/src/database.rs @@ -381,7 +381,7 @@ pub struct Theoretical { pub fragment_mz: f32, } -#[derive(Default)] +#[derive(Default, PartialEq)] pub struct IndexedDatabase { pub peptides: Vec, pub fragments: Vec, From 771bad8ffc23274adff82674061c92d66e726c37 Mon Sep 17 00:00:00 2001 From: Filip Rumenovski Date: Wed, 31 Dec 2025 15:49:13 +0100 Subject: [PATCH 2/2] Add changes to repository --- CHANGELOG.md | 2 +- crates/sage-cli/src/input.rs | 9 - crates/sage-cli/src/main.rs | 7 - crates/sage-cli/src/runner.rs | 9 - crates/sage-cloudpath/src/index_parquet.rs | 701 +++++---------------- 5 files changed, 165 insertions(+), 563 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 288eeec4..0ec9443e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,7 +10,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Speedup on the generation of databases when large number of peptides are redundant. - Initial support for searching diaPASEF data - `override_precursor_charge` setting that forces multiple charge states to be searched -- Index serialization to parquet format (`--save-index`, `--load-index`, `--export-index`, `--validate-index`) +- Index serialization to parquet format (`--save-index`, `--load-index`, `--validate-index`) ### Breaking Changes - `precursor_ppm` field reports the non-absoluted average mass error, rather than the absoluted average mass error. - Don't deisotope reporter ion regions if MS2-based TMT/iTRAQ is used diff --git a/crates/sage-cli/src/input.rs b/crates/sage-cli/src/input.rs index 9ffce300..28cd7afd 100644 --- a/crates/sage-cli/src/input.rs +++ b/crates/sage-cli/src/input.rs @@ -54,9 +54,6 @@ pub struct Search { #[serde(skip_serializing)] pub load_index: Option, - #[serde(skip_serializing)] - pub export_index: Option, - #[serde(skip_serializing)] pub validate_index: bool, } @@ -94,8 +91,6 @@ pub struct Input { #[serde(skip)] pub load_index: Option, #[serde(skip)] - pub export_index: Option, - #[serde(skip)] pub validate_index: Option, } @@ -241,9 +236,6 @@ impl Input { if let Some(load_index) = matches.get_one::("load-index") { input.load_index = Some(load_index.clone()); } - if let Some(export_index) = matches.get_one::("export-index") { - input.export_index = Some(export_index.clone()); - } if let Some(validate_index) = matches.get_one::("validate-index").copied() { input.validate_index = Some(validate_index); } @@ -380,7 +372,6 @@ impl Input { score_type, save_index: self.save_index, load_index: self.load_index, - export_index: self.export_index, validate_index: self.validate_index.unwrap_or(false), }) } diff --git a/crates/sage-cli/src/main.rs b/crates/sage-cli/src/main.rs index c8aeeb81..e70b93f5 100644 --- a/crates/sage-cli/src/main.rs +++ b/crates/sage-cli/src/main.rs @@ -102,13 +102,6 @@ fn main() -> anyhow::Result<()> { .help("Load a pre-built index from parquet directory (skips FASTA processing)") .value_hint(ValueHint::DirPath), ) - .arg( - Arg::new("export-index") - .long("export-index") - .value_parser(clap::builder::NonEmptyStringValueParser::new()) - .help("Export a user-friendly peptide index parquet file") - .value_hint(ValueHint::FilePath), - ) .arg( Arg::new("validate-index") .long("validate-index") diff --git a/crates/sage-cli/src/runner.rs b/crates/sage-cli/src/runner.rs index e4eb9574..cb69d20a 100644 --- a/crates/sage-cli/src/runner.rs +++ b/crates/sage-cli/src/runner.rs @@ -177,15 +177,6 @@ impl Runner { info!("Index saved successfully"); } - // Export user-friendly index if requested - if let Some(ref export_path) = parameters.export_index { - info!("Exporting user-friendly index to {}", export_path); - let path = export_path.parse::()?; - sage_cloudpath::index_parquet::export_index(&built_db, &path) - .with_context(|| format!("Failed to export index to `{}`", export_path))?; - info!("Index exported successfully"); - } - built_db }; diff --git a/crates/sage-cloudpath/src/index_parquet.rs b/crates/sage-cloudpath/src/index_parquet.rs index eeee0d8c..da4cbffb 100644 --- a/crates/sage-cloudpath/src/index_parquet.rs +++ b/crates/sage-cloudpath/src/index_parquet.rs @@ -1,7 +1,4 @@ //! Parquet serialization for IndexedDatabase -//! -//! Provides functions to serialize and deserialize the fragment ion index -//! to parquet format for fast loading, as well as user-friendly export. #![cfg(feature = "parquet")] @@ -24,7 +21,6 @@ use sage_core::peptide::Peptide; use crate::parquet::{ROW_GROUP_SIZE, ZSTD_COMPRESSION_LEVEL}; use crate::{CloudPath, Error}; -/// Convert parquet error to io error fn pq_err(e: parquet::errors::ParquetError) -> std::io::Error { std::io::Error::new(std::io::ErrorKind::Other, e.to_string()) } @@ -42,70 +38,49 @@ impl std::fmt::Display for ValidationError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { Self::PeptideCountMismatch { expected, actual } => { - write!( - f, - "peptide count mismatch: expected {expected}, got {actual}" - ) + write!(f, "peptide count: expected {expected}, got {actual}") } Self::FragmentCountMismatch { expected, actual } => { - write!( - f, - "fragment count mismatch: expected {expected}, got {actual}" - ) + write!(f, "fragment count: expected {expected}, got {actual}") } Self::PeptideMismatch { index, field } => { - write!(f, "peptide mismatch at index {index}: field '{field}'") + write!(f, "peptide[{index}].{field} mismatch") } Self::FragmentMismatch { index, field } => { - write!(f, "fragment mismatch at index {index}: field '{field}'") + write!(f, "fragment[{index}].{field} mismatch") } - Self::MetadataMismatch { field } => write!(f, "metadata mismatch: field '{field}'"), + Self::MetadataMismatch { field } => write!(f, "metadata.{field} mismatch"), } } } impl std::error::Error for ValidationError {} -// Schema definitions fn build_peptides_schema() -> parquet::errors::Result { parquet::schema::parser::parse_message_type( - r#"message peptides_schema { - required int32 peptide_id; required boolean decoy; required byte_array sequence; - required byte_array modifications; optional float nterm; optional float cterm; - required float monoisotopic; required int32 missed_cleavages; - required boolean semi_enzymatic; required int32 position; - required byte_array proteins (utf8); - }"#, + "message peptides { + required int32 id; required boolean decoy; required byte_array seq; + required byte_array mods; optional float nterm; optional float cterm; + required float mono; required int32 mc; required boolean semi; + required int32 pos; required byte_array proteins (utf8); + }", ) } fn build_fragments_schema() -> parquet::errors::Result { parquet::schema::parser::parse_message_type( - r#"message fragments_schema { required int32 peptide_index; required float fragment_mz; }"#, + "message fragments { required int32 pep_ix; required float mz; }", ) } fn build_metadata_schema() -> parquet::errors::Result { parquet::schema::parser::parse_message_type( - r#"message metadata_schema { required byte_array key (utf8); required byte_array value (utf8); }"#, + "message metadata { required byte_array key (utf8); required byte_array val (utf8); }", ) } -fn build_export_schema() -> parquet::errors::Result { - parquet::schema::parser::parse_message_type( - r#"message peptide_export_schema { - required byte_array peptide (utf8); required byte_array stripped_sequence (utf8); - required byte_array proteins (utf8); required boolean is_decoy; - required float monoisotopic_mass; required int32 missed_cleavages; - required boolean semi_enzymatic; required byte_array position (utf8); - required int32 num_proteins; required int32 fragment_count; - }"#, - ) -} - -// Conversion helpers -fn position_to_i32(pos: Position) -> i32 { - match pos { +fn position_to_i32(p: Position) -> i32 { + match p { Position::Nterm => 0, Position::Cterm => 1, Position::Full => 2, @@ -113,8 +88,8 @@ fn position_to_i32(pos: Position) -> i32 { } } -fn i32_to_position(val: i32) -> Position { - match val { +fn i32_to_position(v: i32) -> Position { + match v { 0 => Position::Nterm, 1 => Position::Cterm, 2 => Position::Full, @@ -122,34 +97,25 @@ fn i32_to_position(val: i32) -> Position { } } -fn position_to_str(pos: Position) -> &'static str { - match pos { - Position::Nterm => "N-term", - Position::Cterm => "C-term", - Position::Full => "Full", - Position::Internal => "Internal", +fn kind_to_char(k: Kind) -> char { + match k { + Kind::A => 'a', + Kind::B => 'b', + Kind::C => 'c', + Kind::X => 'x', + Kind::Y => 'y', + Kind::Z => 'z', } } -fn kind_to_str(kind: Kind) -> &'static str { - match kind { - Kind::A => "a", - Kind::B => "b", - Kind::C => "c", - Kind::X => "x", - Kind::Y => "y", - Kind::Z => "z", - } -} - -fn str_to_kind(s: &str) -> Kind { - match s { - "a" => Kind::A, - "b" => Kind::B, - "c" => Kind::C, - "x" => Kind::X, - "y" => Kind::Y, - "z" => Kind::Z, +fn char_to_kind(c: char) -> Kind { + match c { + 'a' => Kind::A, + 'b' => Kind::B, + 'c' => Kind::C, + 'x' => Kind::X, + 'y' => Kind::Y, + 'z' => Kind::Z, _ => Kind::B, } } @@ -158,26 +124,18 @@ fn pack_mods(mods: &[f32]) -> Vec { mods.iter().flat_map(|m| m.to_le_bytes()).collect() } -fn unpack_mods(bytes: &[u8]) -> Vec { - bytes - .chunks_exact(4) +fn unpack_mods(b: &[u8]) -> Vec { + b.chunks_exact(4) .map(|c| f32::from_le_bytes([c[0], c[1], c[2], c[3]])) .collect() } -fn serialize_ion_kinds(kinds: &[Kind]) -> String { - kinds - .iter() - .map(|k| kind_to_str(*k)) - .collect::>() - .join(",") +fn serialize_kinds(kinds: &[Kind]) -> String { + kinds.iter().map(|k| kind_to_char(*k)).collect() } -fn deserialize_ion_kinds(s: &str) -> Vec { - s.split(',') - .filter(|s| !s.is_empty()) - .map(str_to_kind) - .collect() +fn deserialize_kinds(s: &str) -> Vec { + s.chars().map(char_to_kind).collect() } fn serialize_potential_mods(mods: &[(ModificationSpecificity, f32)]) -> String { @@ -195,10 +153,9 @@ fn deserialize_potential_mods(s: &str) -> Vec<(ModificationSpecificity, f32)> { return Vec::new(); } serde_json::from_str::>(s) - .map(|pairs| { - pairs - .into_iter() - .filter_map(|(spec, mass)| spec.parse().ok().map(|s| (s, mass))) + .map(|v| { + v.into_iter() + .filter_map(|(spec, m)| spec.parse().ok().map(|s| (s, m))) .collect() }) .unwrap_or_default() @@ -209,13 +166,16 @@ fn serialize_min_values(values: &[f32]) -> String { .iter() .map(|v| format!("{:08x}", v.to_bits())) .collect::>() - .join(",") + .join("") } fn deserialize_min_values(s: &str) -> Vec { - s.split(',') - .filter(|s| !s.is_empty()) - .filter_map(|s| u32::from_str_radix(s, 16).ok().map(f32::from_bits)) + (0..s.len()) + .step_by(8) + .filter_map(|i| { + s.get(i..i + 8) + .and_then(|h| u32::from_str_radix(h, 16).ok().map(f32::from_bits)) + }) .collect() } @@ -227,29 +187,22 @@ fn writer_props() -> parquet::errors::Result { .build()) } -// Serialization -fn write_peptides( - peptides: &[Peptide], - decoy_tag: &str, - gen_decoys: bool, -) -> parquet::errors::Result> { - let mut writer = SerializedFileWriter::new( +fn write_peptides(peps: &[Peptide], tag: &str, gen: bool) -> parquet::errors::Result> { + let mut w = SerializedFileWriter::new( Vec::new(), build_peptides_schema()?.into(), writer_props()?.into(), )?; - - for chunk in peptides.chunks(ROW_GROUP_SIZE) { - let mut rg = writer.next_row_group()?; + for chunk in peps.chunks(ROW_GROUP_SIZE) { + let mut rg = w.next_row_group()?; macro_rules! col { - ($vals:expr, $ty:ty) => {{ + ($v:expr, $t:ty) => { if let Some(mut c) = rg.next_column()? { - c.typed::<$ty>().write_batch(&$vals, None, None)?; + c.typed::<$t>().write_batch(&$v, None, None)?; c.close()?; } - }}; + }; } - col!( chunk .iter() @@ -273,18 +226,15 @@ fn write_peptides( .collect::>(), ByteArrayType ); - - // Optional nterm/cterm for getter in [|p: &Peptide| p.nterm, |p: &Peptide| p.cterm] { if let Some(mut c) = rg.next_column()? { let (mut vals, mut defs) = (Vec::new(), Vec::new()); for p in chunk { - match getter(p) { - Some(v) => { - vals.push(v); - defs.push(1); - } - None => defs.push(0), + if let Some(v) = getter(p) { + vals.push(v); + defs.push(1); + } else { + defs.push(0); } } c.typed::() @@ -292,7 +242,6 @@ fn write_peptides( c.close()?; } } - col!( chunk.iter().map(|p| p.monoisotopic).collect::>(), FloatType @@ -318,23 +267,23 @@ fn write_peptides( col!( chunk .iter() - .map(|p| ByteArray::from(p.proteins(decoy_tag, gen_decoys).as_bytes())) + .map(|p| ByteArray::from(p.proteins(tag, gen).as_bytes())) .collect::>(), ByteArrayType ); rg.close()?; } - writer.into_inner() + w.into_inner() } -fn write_fragments(fragments: &[Theoretical]) -> parquet::errors::Result> { - let mut writer = SerializedFileWriter::new( +fn write_fragments(frags: &[Theoretical]) -> parquet::errors::Result> { + let mut w = SerializedFileWriter::new( Vec::new(), build_fragments_schema()?.into(), writer_props()?.into(), )?; - for chunk in fragments.chunks(ROW_GROUP_SIZE) { - let mut rg = writer.next_row_group()?; + for chunk in frags.chunks(ROW_GROUP_SIZE) { + let mut rg = w.next_row_group()?; if let Some(mut c) = rg.next_column()? { c.typed::().write_batch( &chunk @@ -356,28 +305,27 @@ fn write_fragments(fragments: &[Theoretical]) -> parquet::errors::Result } rg.close()?; } - writer.into_inner() + w.into_inner() } fn write_metadata(db: &IndexedDatabase) -> parquet::errors::Result> { - let mut writer = SerializedFileWriter::new( + let mut w = SerializedFileWriter::new( Vec::new(), build_metadata_schema()?.into(), writer_props()?.into(), )?; let meta = [ - ("version", env!("CARGO_PKG_VERSION").to_string()), ("bucket_size", db.bucket_size.to_string()), ("generate_decoys", db.generate_decoys.to_string()), ("decoy_tag", db.decoy_tag.clone()), - ("ion_kinds", serialize_ion_kinds(&db.ion_kinds)), + ("ion_kinds", serialize_kinds(&db.ion_kinds)), ( "potential_mods", serialize_potential_mods(&db.potential_mods), ), ("min_values", serialize_min_values(&db.min_value)), ]; - let mut rg = writer.next_row_group()?; + let mut rg = w.next_row_group()?; if let Some(mut c) = rg.next_column()? { c.typed::().write_batch( &meta @@ -401,33 +349,25 @@ fn write_metadata(db: &IndexedDatabase) -> parquet::errors::Result> { c.close()?; } rg.close()?; - writer.into_inner() + w.into_inner() } -// Deserialization fn read_metadata( bytes: &[u8], ) -> parquet::errors::Result> { - let reader = SerializedFileReader::new(bytes::Bytes::from(bytes.to_vec()))?; - let mut meta = std::collections::HashMap::new(); - for row in reader.get_row_iter(None)? { + let r = SerializedFileReader::new(bytes::Bytes::from(bytes.to_vec()))?; + let mut m = std::collections::HashMap::new(); + for row in r.get_row_iter(None)? { let row = row?; - meta.insert( - row.get_string(0)?.to_string(), - row.get_string(1)?.to_string(), - ); + m.insert(row.get_string(0)?.into(), row.get_string(1)?.into()); } - Ok(meta) + Ok(m) } -fn read_peptides( - bytes: &[u8], - decoy_tag: &str, - gen_decoys: bool, -) -> parquet::errors::Result> { - let reader = SerializedFileReader::new(bytes::Bytes::from(bytes.to_vec()))?; - let mut peptides = Vec::new(); - for row in reader.get_row_iter(None)? { +fn read_peptides(bytes: &[u8], tag: &str, gen: bool) -> parquet::errors::Result> { + let r = SerializedFileReader::new(bytes::Bytes::from(bytes.to_vec()))?; + let mut peps = Vec::new(); + for row in r.get_row_iter(None)? { let row = row?; let decoy = row.get_bool(1)?; let proteins: Vec> = row @@ -435,14 +375,14 @@ fn read_peptides( .split(';') .filter(|s| !s.is_empty()) .map(|s| { - Arc::from(if decoy && gen_decoys && s.starts_with(decoy_tag) { - &s[decoy_tag.len()..] + Arc::from(if decoy && gen && s.starts_with(tag) { + &s[tag.len()..] } else { s }) }) .collect(); - peptides.push(Peptide { + peps.push(Peptide { decoy, sequence: Arc::from(row.get_bytes(2)?.data()), modifications: unpack_mods(row.get_bytes(3)?.data()), @@ -455,35 +395,35 @@ fn read_peptides( proteins, }); } - Ok(peptides) + Ok(peps) } fn read_fragments(bytes: &[u8]) -> parquet::errors::Result> { - let reader = SerializedFileReader::new(bytes::Bytes::from(bytes.to_vec()))?; - let mut fragments = Vec::new(); - for row in reader.get_row_iter(None)? { + let r = SerializedFileReader::new(bytes::Bytes::from(bytes.to_vec()))?; + let mut frags = Vec::new(); + for row in r.get_row_iter(None)? { let row = row?; - fragments.push(Theoretical { + frags.push(Theoretical { peptide_index: PeptideIx(row.get_int(0)? as u32), fragment_mz: row.get_float(1)?, }); } - Ok(fragments) + Ok(frags) } -fn read_file_bytes(path: &CloudPath) -> Result, Error> { +fn read_file(path: &CloudPath) -> Result, Error> { match path { CloudPath::Local(p) => Ok(std::fs::read(p)?), CloudPath::S3 { .. } => Err(Error::IO(std::io::Error::new( std::io::ErrorKind::Unsupported, - "S3 index loading not yet supported", + "S3 not supported", ))), } } -// Public API -pub fn serialize_index(db: &IndexedDatabase, output_dir: &CloudPath) -> Result<(), Error> { - output_dir.mkdir()?; +/// Serialize an IndexedDatabase to a parquet directory +pub fn serialize_index(db: &IndexedDatabase, dir: &CloudPath) -> Result<(), Error> { + dir.mkdir()?; for (name, bytes) in [ ( "peptides.parquet", @@ -495,17 +435,18 @@ pub fn serialize_index(db: &IndexedDatabase, output_dir: &CloudPath) -> Result<( ), ("metadata.parquet", write_metadata(db).map_err(pq_err)?), ] { - let mut path = output_dir.clone(); - path.push(name); - path.write_bytes_sync(bytes)?; + let mut p = dir.clone(); + p.push(name); + p.write_bytes_sync(bytes)?; } Ok(()) } -pub fn deserialize_index(input_dir: &CloudPath) -> Result { - let mut meta_path = input_dir.clone(); - meta_path.push("metadata.parquet"); - let meta = read_metadata(&read_file_bytes(&meta_path)?).map_err(pq_err)?; +/// Deserialize an IndexedDatabase from a parquet directory +pub fn deserialize_index(dir: &CloudPath) -> Result { + let mut mp = dir.clone(); + mp.push("metadata.parquet"); + let meta = read_metadata(&read_file(&mp)?).map_err(pq_err)?; let bucket_size = meta .get("bucket_size") @@ -521,7 +462,7 @@ pub fn deserialize_index(input_dir: &CloudPath) -> Result Result Result Result<(), ValidationError> { +/// Validate that two IndexedDatabases are identical (bit-exact for floats) +pub fn validate_index(a: &IndexedDatabase, b: &IndexedDatabase) -> Result<(), ValidationError> { macro_rules! check { - ($cond:expr, $err:expr) => { - if $cond { - return Err($err); + ($c:expr, $e:expr) => { + if $c { + return Err($e); } }; } check!( - original.peptides.len() != loaded.peptides.len(), + a.peptides.len() != b.peptides.len(), ValidationError::PeptideCountMismatch { - expected: original.peptides.len(), - actual: loaded.peptides.len() + expected: a.peptides.len(), + actual: b.peptides.len() } ); check!( - original.fragments.len() != loaded.fragments.len(), + a.fragments.len() != b.fragments.len(), ValidationError::FragmentCountMismatch { - expected: original.fragments.len(), - actual: loaded.fragments.len() + expected: a.fragments.len(), + actual: b.fragments.len() } ); - check!( - original.bucket_size != loaded.bucket_size, + a.bucket_size != b.bucket_size, ValidationError::MetadataMismatch { field: "bucket_size" } ); check!( - original.generate_decoys != loaded.generate_decoys, + a.generate_decoys != b.generate_decoys, ValidationError::MetadataMismatch { field: "generate_decoys" } ); check!( - original.decoy_tag != loaded.decoy_tag, + a.decoy_tag != b.decoy_tag, ValidationError::MetadataMismatch { field: "decoy_tag" } ); check!( - original.ion_kinds != loaded.ion_kinds, + a.ion_kinds != b.ion_kinds, ValidationError::MetadataMismatch { field: "ion_kinds" } ); - check!( - original.min_value.len() != loaded.min_value.len(), + a.min_value.len() != b.min_value.len(), ValidationError::MetadataMismatch { field: "min_value" } ); - for (a, b) in original.min_value.iter().zip(&loaded.min_value) { + for (x, y) in a.min_value.iter().zip(&b.min_value) { check!( - a.to_bits() != b.to_bits(), + x.to_bits() != y.to_bits(), ValidationError::MetadataMismatch { field: "min_value" } ); } - check!( - original.potential_mods.len() != loaded.potential_mods.len(), + a.potential_mods.len() != b.potential_mods.len(), ValidationError::MetadataMismatch { field: "potential_mods" } ); - for ((sa, ma), (sb, mb)) in original.potential_mods.iter().zip(&loaded.potential_mods) { + for ((sa, ma), (sb, mb)) in a.potential_mods.iter().zip(&b.potential_mods) { check!( sa != sb || ma.to_bits() != mb.to_bits(), ValidationError::MetadataMismatch { @@ -627,7 +562,7 @@ pub fn validate_index( ); } - for (i, (o, l)) in original.peptides.iter().zip(&loaded.peptides).enumerate() { + for (i, (o, l)) in a.peptides.iter().zip(&b.peptides).enumerate() { check!( o.decoy != l.decoy, ValidationError::PeptideMismatch { @@ -700,7 +635,7 @@ pub fn validate_index( ); } - for (i, (o, l)) in original.fragments.iter().zip(&loaded.fragments).enumerate() { + for (i, (o, l)) in a.fragments.iter().zip(&b.fragments).enumerate() { check!( o.peptide_index != l.peptide_index, ValidationError::FragmentMismatch { @@ -719,371 +654,63 @@ pub fn validate_index( Ok(()) } -pub fn export_index(db: &IndexedDatabase, output_path: &CloudPath) -> Result<(), Error> { - let mut frag_counts = vec![0i32; db.peptides.len()]; - for f in &db.fragments { - if (f.peptide_index.0 as usize) < frag_counts.len() { - frag_counts[f.peptide_index.0 as usize] += 1; - } - } - - let mut writer = SerializedFileWriter::new( - Vec::new(), - build_export_schema().map_err(pq_err)?.into(), - writer_props().map_err(pq_err)?.into(), - ) - .map_err(pq_err)?; - - for (chunk_idx, chunk) in db.peptides.chunks(ROW_GROUP_SIZE).enumerate() { - let start = chunk_idx * ROW_GROUP_SIZE; - let mut rg = writer.next_row_group().map_err(pq_err)?; - macro_rules! col { - ($vals:expr, $ty:ty) => {{ - if let Some(mut c) = rg.next_column().map_err(pq_err)? { - c.typed::<$ty>() - .write_batch(&$vals, None, None) - .map_err(pq_err)?; - c.close().map_err(pq_err)?; - } - }}; - } - - col!( - chunk - .iter() - .map(|p| ByteArray::from(p.to_string().as_bytes())) - .collect::>(), - ByteArrayType - ); - col!( - chunk - .iter() - .map(|p| ByteArray::from(p.sequence.as_ref())) - .collect::>(), - ByteArrayType - ); - col!( - chunk - .iter() - .map(|p| ByteArray::from(p.proteins(&db.decoy_tag, db.generate_decoys).as_bytes())) - .collect::>(), - ByteArrayType - ); - col!(chunk.iter().map(|p| p.decoy).collect::>(), BoolType); - col!( - chunk.iter().map(|p| p.monoisotopic).collect::>(), - FloatType - ); - col!( - chunk - .iter() - .map(|p| p.missed_cleavages as i32) - .collect::>(), - Int32Type - ); - col!( - chunk.iter().map(|p| p.semi_enzymatic).collect::>(), - BoolType - ); - col!( - chunk - .iter() - .map(|p| ByteArray::from(position_to_str(p.position).as_bytes())) - .collect::>(), - ByteArrayType - ); - col!( - chunk - .iter() - .map(|p| p.proteins.len() as i32) - .collect::>(), - Int32Type - ); - col!( - (0..chunk.len()) - .map(|i| frag_counts.get(start + i).copied().unwrap_or(0)) - .collect::>(), - Int32Type - ); - rg.close().map_err(pq_err)?; - } - output_path.write_bytes_sync(writer.into_inner().map_err(pq_err)?)?; - Ok(()) -} - #[cfg(test)] mod tests { use super::*; use tempfile::TempDir; - fn make_test_peptide(decoy: bool, seq: &[u8], proteins: Vec<&str>) -> Peptide { - Peptide { - decoy, - sequence: Arc::from(seq), - modifications: vec![0.0; seq.len()], - nterm: None, - cterm: None, - monoisotopic: 1000.5, - missed_cleavages: 1, - semi_enzymatic: false, - position: Position::Internal, - proteins: proteins.into_iter().map(Arc::from).collect(), - } - } - - fn make_test_database() -> IndexedDatabase { - let peptides = vec![ - make_test_peptide(false, b"PEPTIDE", vec!["sp|P12345|PROT1"]), - make_test_peptide( - false, - b"SEQVENCE", - vec!["sp|P12345|PROT1", "sp|P67890|PROT2"], - ), - make_test_peptide(true, b"EDITPEP", vec!["sp|P12345|PROT1"]), - ]; - let fragments = vec![ - Theoretical { - peptide_index: PeptideIx(0), - fragment_mz: 500.25, - }, - Theoretical { - peptide_index: PeptideIx(0), - fragment_mz: 600.30, - }, - Theoretical { - peptide_index: PeptideIx(1), - fragment_mz: 450.15, - }, - Theoretical { - peptide_index: PeptideIx(2), - fragment_mz: 550.20, - }, - ]; + fn test_db() -> IndexedDatabase { IndexedDatabase { - peptides, - fragments, + peptides: vec![Peptide { + decoy: false, + sequence: Arc::from(b"PEPTIDE".as_slice()), + modifications: vec![0.0; 7], + nterm: Some(42.01), + cterm: None, + monoisotopic: std::f32::consts::PI, + missed_cleavages: 1, + semi_enzymatic: false, + position: Position::Internal, + proteins: vec![Arc::from("PROT1")], + }], + fragments: vec![Theoretical { + peptide_index: PeptideIx(0), + fragment_mz: std::f32::consts::E, + }], ion_kinds: vec![Kind::B, Kind::Y], - min_value: vec![100.0, 200.0, 300.0], - potential_mods: vec![ - (ModificationSpecificity::Residue(b'M'), 15.994915), - (ModificationSpecificity::Residue(b'C'), 57.021464), - ], + min_value: vec![100.0, f32::MIN_POSITIVE], + potential_mods: vec![(ModificationSpecificity::Residue(b'M'), 15.994915)], bucket_size: 8192, generate_decoys: true, - decoy_tag: "rev_".to_string(), + decoy_tag: "rev_".into(), } } #[test] - fn test_pack_unpack_modifications() { - let mods = vec![1.5f32, -2.3f32, 0.0f32, 100.123f32]; - let unpacked = unpack_mods(&pack_mods(&mods)); - assert!(mods - .iter() - .zip(&unpacked) - .all(|(a, b)| a.to_bits() == b.to_bits())); - } - - #[test] - fn test_position_conversion() { - for (i, pos) in [ - Position::Nterm, - Position::Cterm, - Position::Full, - Position::Internal, - ] - .iter() - .enumerate() - { - assert_eq!(position_to_i32(*pos), i as i32); - assert_eq!(i32_to_position(i as i32), *pos); - } - assert_eq!(i32_to_position(99), Position::Internal); - } - - #[test] - fn test_ion_kinds_serialization() { - let kinds = vec![Kind::A, Kind::B, Kind::Y]; - assert_eq!(deserialize_ion_kinds(&serialize_ion_kinds(&kinds)), kinds); - } - - #[test] - fn test_min_values_serialization() { - let values = vec![ - 1.0f32, - 2.5f32, - 100.123f32, - f32::MIN_POSITIVE, - f32::MAX, - -0.0f32, - ]; - let deser = deserialize_min_values(&serialize_min_values(&values)); - assert!(values - .iter() - .zip(&deser) - .all(|(a, b)| a.to_bits() == b.to_bits())); - } - - #[test] - fn test_potential_mods_serialization() { - let mods = vec![ - (ModificationSpecificity::Residue(b'M'), 15.994915f32), - (ModificationSpecificity::PeptideN(None), 42.010565f32), - (ModificationSpecificity::Residue(b'C'), 57.021464f32), - ]; - let deser = deserialize_potential_mods(&serialize_potential_mods(&mods)); - assert!(mods - .iter() - .zip(&deser) - .all(|((sa, ma), (sb, mb))| sa == sb && ma.to_bits() == mb.to_bits())); - } - - #[test] - fn test_empty_serialization() { - assert!(deserialize_min_values(&serialize_min_values(&[])).is_empty()); - assert!(deserialize_potential_mods(&serialize_potential_mods(&[])).is_empty()); - } - - #[test] - fn test_full_round_trip() { - let db = make_test_database(); - let tmp = TempDir::new().unwrap(); - let path = CloudPath::Local(tmp.path().to_path_buf()); - - serialize_index(&db, &path).expect("serialize failed"); - let loaded = deserialize_index(&path).expect("deserialize failed"); - validate_index(&db, &loaded).expect("validation failed"); - } - - #[test] - fn test_round_trip_with_optional_mods() { - let mut db = make_test_database(); - db.peptides[0].nterm = Some(42.01); - db.peptides[1].cterm = Some(-17.03); - db.peptides[2].nterm = Some(28.0); - db.peptides[2].cterm = Some(14.5); - + fn round_trip() { + let db = test_db(); let tmp = TempDir::new().unwrap(); - let path = CloudPath::Local(tmp.path().to_path_buf()); - + let path = CloudPath::Local(tmp.path().into()); serialize_index(&db, &path).unwrap(); let loaded = deserialize_index(&path).unwrap(); - validate_index(&db, &loaded).expect("validation failed with optional mods"); + validate_index(&db, &loaded).unwrap(); } #[test] - fn test_round_trip_preserves_float_precision() { - let mut db = make_test_database(); - db.peptides[0].monoisotopic = std::f32::consts::PI; - db.peptides[1].monoisotopic = f32::MIN_POSITIVE; - db.fragments[0].fragment_mz = std::f32::consts::E; - - let tmp = TempDir::new().unwrap(); - let path = CloudPath::Local(tmp.path().to_path_buf()); - - serialize_index(&db, &path).unwrap(); - let loaded = deserialize_index(&path).unwrap(); - - assert_eq!( - db.peptides[0].monoisotopic.to_bits(), - loaded.peptides[0].monoisotopic.to_bits() - ); - assert_eq!( - db.peptides[1].monoisotopic.to_bits(), - loaded.peptides[1].monoisotopic.to_bits() - ); - assert_eq!( - db.fragments[0].fragment_mz.to_bits(), - loaded.fragments[0].fragment_mz.to_bits() - ); - } - - #[test] - fn test_empty_database_round_trip() { + fn empty_db() { let db = IndexedDatabase::default(); let tmp = TempDir::new().unwrap(); - let path = CloudPath::Local(tmp.path().to_path_buf()); - + let path = CloudPath::Local(tmp.path().into()); serialize_index(&db, &path).unwrap(); let loaded = deserialize_index(&path).unwrap(); - validate_index(&db, &loaded).expect("empty database validation failed"); - } - - #[test] - fn test_validation_detects_peptide_count_mismatch() { - let db1 = make_test_database(); - let mut db2 = make_test_database(); - db2.peptides.pop(); - - let err = validate_index(&db1, &db2).unwrap_err(); - assert!(matches!(err, ValidationError::PeptideCountMismatch { .. })); - } - - #[test] - fn test_validation_detects_fragment_mismatch() { - let db1 = make_test_database(); - let mut db2 = make_test_database(); - db2.fragments[0].fragment_mz = 999.99; - - let err = validate_index(&db1, &db2).unwrap_err(); - assert!(matches!( - err, - ValidationError::FragmentMismatch { - field: "fragment_mz", - .. - } - )); - } - - #[test] - fn test_validation_detects_metadata_mismatch() { - let db1 = make_test_database(); - let mut db2 = make_test_database(); - db2.bucket_size = 9999; - - let err = validate_index(&db1, &db2).unwrap_err(); - assert!(matches!( - err, - ValidationError::MetadataMismatch { - field: "bucket_size" - } - )); - } - - #[test] - fn test_export_index() { - let db = make_test_database(); - let tmp = TempDir::new().unwrap(); - let export_path = CloudPath::Local(tmp.path().join("export.parquet")); - - export_index(&db, &export_path).expect("export failed"); - - // Verify file exists and is readable - let bytes = std::fs::read(tmp.path().join("export.parquet")).unwrap(); - assert!(!bytes.is_empty()); + validate_index(&db, &loaded).unwrap(); } #[test] - fn test_validation_error_display() { - let err = ValidationError::PeptideCountMismatch { - expected: 100, - actual: 50, - }; - assert_eq!( - err.to_string(), - "peptide count mismatch: expected 100, got 50" - ); - - let err = ValidationError::PeptideMismatch { - index: 42, - field: "sequence", - }; - assert_eq!( - err.to_string(), - "peptide mismatch at index 42: field 'sequence'" - ); - - let err = ValidationError::MetadataMismatch { field: "decoy_tag" }; - assert_eq!(err.to_string(), "metadata mismatch: field 'decoy_tag'"); + fn detects_mismatch() { + let db1 = test_db(); + let mut db2 = test_db(); + db2.fragments[0].fragment_mz = 999.0; + assert!(validate_index(&db1, &db2).is_err()); } }