From 4a1c6cc49f4398a285a25f4c8253824cc08ce26f Mon Sep 17 00:00:00 2001 From: Vineet Date: Wed, 22 Apr 2026 17:35:41 -0400 Subject: [PATCH] ingest: VariantReader trait for VCF Pull VCF reading behind a small VariantReader trait so new formats (GDS, BGEN) can plug in without touching the processing core. Collapses the duplicate record loops in ingest/vcf.rs and staar/genotype.rs into one RecordContext path. FormatHandler::open_reader is the extension point. Invariance golden and the ground-truth-vs-R suite bit-identical. Closes #87. --- src/ingest/format.rs | 24 ++- src/ingest/mod.rs | 3 + src/ingest/reader.rs | 120 +++++++++++++++ src/ingest/vcf.rs | 348 ++++++++++++++++++++++++++++++------------ src/staar/genotype.rs | 154 +++++-------------- 5 files changed, 441 insertions(+), 208 deletions(-) create mode 100644 src/ingest/reader.rs diff --git a/src/ingest/format.rs b/src/ingest/format.rs index bfe6fec..ded6d79 100644 --- a/src/ingest/format.rs +++ b/src/ingest/format.rs @@ -8,7 +8,7 @@ use parquet::file::reader::FileReader; use crate::error::CohortError; -use super::{sniff_delimiter, Delimiter, InputFormat}; +use super::{sniff_delimiter, Delimiter, InputFormat, VariantReader}; /// Detection confidence: 0.0 (no match) to 1.0 (certain). pub enum DetectResult { @@ -41,6 +41,20 @@ pub trait FormatHandler: Send + Sync { /// For tabular formats, the detected delimiter (if any). fn delimiter(&self, path: &Path) -> Option; + + /// Open a streaming variant reader. Default errors so tabular/parquet + /// handlers (which don't carry genotypes) can skip the impl. + fn open_reader( + &self, + path: &Path, + _threads: usize, + ) -> Result, CohortError> { + Err(CohortError::Input(format!( + "format '{}' does not support variant streaming for '{}'", + self.name(), + path.display() + ))) + } } fn path_lower(path: &Path) -> String { @@ -100,6 +114,14 @@ impl FormatHandler for VcfHandler { fn delimiter(&self, _path: &Path) -> Option { None } + + fn open_reader( + &self, + path: &Path, + threads: usize, + ) -> Result, CohortError> { + Ok(Box::new(super::vcf::VcfVariantReader::open(path, threads)?)) + } } pub struct ParquetHandler; diff --git a/src/ingest/mod.rs b/src/ingest/mod.rs index 2d09bea..85d543a 100644 --- a/src/ingest/mod.rs +++ b/src/ingest/mod.rs @@ -2,9 +2,12 @@ pub mod detect; pub mod format; +pub mod reader; pub mod sql; pub mod vcf; +pub use reader::{RawRecord, VariantReader}; + use std::io::{BufRead, BufReader}; use std::path::Path; diff --git a/src/ingest/reader.rs b/src/ingest/reader.rs new file mode 100644 index 0000000..42f5460 --- /dev/null +++ b/src/ingest/reader.rs @@ -0,0 +1,120 @@ +//! Format-agnostic variant reader. +//! +//! The reader drives iteration and calls a processor closure on each record. +//! Callback form (not a lending iterator) because noodles VCF field wrappers +//! (`AlternateBases<'r>`, `Samples<'r>`, ...) expose their inner `&'r str` +//! only through `AsRef::as_ref(&self) -> &str`, which elides to the wrapper's +//! borrow rather than the record's. Keeping iteration inside the reader lets +//! wrapper locals stay alive across one closure call without any buffer copy. +//! +//! Per-variant normalization (`normalize_chrom`, `parsimony_normalize`, +//! `ascii_uppercase_into`) and multi-allelic split stay in the processor. + +use crate::error::CohortError; + +/// One variant as seen by the processor. Slices are valid only during the +/// `for_each` closure invocation that produced them. +pub struct RawRecord<'a> { + pub chromosome: &'a str, + pub position: i32, + pub ref_allele: &'a str, + /// Raw ALT field. Comma-separated for multi-allelic sites; the processor + /// splits and parsimony-normalizes per alt. + pub alt_alleles: &'a str, + pub rsid: Option<&'a str>, + pub qual: Option<&'a str>, + pub filter: Option<&'a str>, + /// Raw per-sample text (tab-separated, FORMAT-prefixed for VCF). Passed + /// through to `GenotypeWriter::push` unchanged. + pub samples_text: &'a str, +} + +/// Streaming variant reader. One reader per file; returning `Err` from `f` +/// terminates iteration. +pub trait VariantReader: Send { + fn for_each( + &mut self, + f: &mut dyn for<'a> FnMut(RawRecord<'a>) -> Result<(), CohortError>, + ) -> Result<(), CohortError>; +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::ingest::vcf::VcfVariantReader; + use std::io::Write; + + /// Opens a tiny inline VCF and asserts canonical fields including a + /// multi-allelic site and a missing-filter row. Reader-level only: no + /// `RecordContext`, no parquet side effects. + #[test] + fn vcf_reader_yields_canonical_records() { + let mut tmp = tempfile::NamedTempFile::new().unwrap(); + writeln!(tmp, "##fileformat=VCFv4.3").unwrap(); + writeln!(tmp, "##contig=").unwrap(); + writeln!(tmp, "##FORMAT=").unwrap(); + writeln!( + tmp, + "#CHROM\tPOS\tID\tREF\tALT\tQUAL\tFILTER\tINFO\tFORMAT\tS1\tS2" + ) + .unwrap(); + writeln!(tmp, "22\t100\trs1\tA\tG\t30.5\tPASS\t.\tGT\t0/0\t0/1").unwrap(); + writeln!(tmp, "22\t200\t.\tA\tC,T\t.\t.\t.\tGT\t0/1\t1/2").unwrap(); + writeln!(tmp, "22\t300\trs3\tC\tT\t40\tLowQual\t.\tGT\t1/1\t0/0").unwrap(); + tmp.flush().unwrap(); + + let mut reader = VcfVariantReader::open(tmp.path(), 1).unwrap(); + + struct Row { + chrom: String, + pos: i32, + r: String, + a: String, + rsid: Option, + qual: Option, + filter: Option, + } + let mut rows: Vec = Vec::new(); + let mut samples_first: Option = None; + reader + .for_each(&mut |rec| { + rows.push(Row { + chrom: rec.chromosome.to_string(), + pos: rec.position, + r: rec.ref_allele.to_string(), + a: rec.alt_alleles.to_string(), + rsid: rec.rsid.map(str::to_string), + qual: rec.qual.map(str::to_string), + filter: rec.filter.map(str::to_string), + }); + if samples_first.is_none() { + samples_first = Some(rec.samples_text.to_string()); + } + Ok(()) + }) + .unwrap(); + + assert_eq!(rows.len(), 3); + + assert_eq!(rows[0].chrom, "22"); + assert_eq!(rows[0].pos, 100); + assert_eq!(rows[0].r, "A"); + assert_eq!(rows[0].a, "G"); + assert_eq!(rows[0].rsid.as_deref(), Some("rs1")); + assert_eq!(rows[0].qual.as_deref(), Some("30.5")); + assert_eq!(rows[0].filter.as_deref(), Some("PASS")); + + assert_eq!(rows[1].pos, 200); + assert_eq!(rows[1].a, "C,T"); + assert!(rows[1].rsid.is_none()); + assert!(rows[1].qual.is_none()); + assert!(rows[1].filter.is_none()); + + assert_eq!(rows[2].pos, 300); + assert_eq!(rows[2].filter.as_deref(), Some("LowQual")); + + // noodles 0.73 `Samples` covers everything after INFO, which for VCF + // includes the FORMAT column; `GenotypeWriter::push` parses this. + assert_eq!(samples_first.as_deref(), Some("GT\t0/0\t0/1")); + } +} diff --git a/src/ingest/vcf.rs b/src/ingest/vcf.rs index 9f9c9c2..0c785e6 100644 --- a/src/ingest/vcf.rs +++ b/src/ingest/vcf.rs @@ -5,7 +5,7 @@ //! Batch size is derived from the caller's memory budget (see `derive_batch_size`). //! Multi-allelic sites are split into biallelic records. -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::fs::File; use std::io::{BufRead, BufReader}; use std::num::NonZeroUsize; @@ -19,6 +19,7 @@ use parquet::arrow::ArrowWriter; use parquet::basic::Compression; use parquet::file::properties::WriterProperties; +use super::{RawRecord, VariantReader}; use crate::staar::genotype::GenotypeWriter; use crate::store::list::VariantSetWriter; use crate::error::CohortError; @@ -258,6 +259,101 @@ pub(crate) fn ascii_uppercase_into(src: &str, buf: &mut String) { } } +/// Owns a reusable `noodles_vcf::Record` so iteration does not allocate per +/// row. Field wrappers (`AlternateBases`, `Ids`, `Filters`, `Samples`) live +/// as locals inside `for_each` so their `&str` borrows stay alive across the +/// closure call. +pub struct VcfVariantReader { + inner: noodles_vcf::io::Reader>, + record: noodles_vcf::Record, + qual_buf: String, + path: PathBuf, +} + +impl VcfVariantReader { + pub fn open(path: &Path, threads: usize) -> Result { + let buf = open_vcf(path, threads)?; + let mut inner = noodles_vcf::io::Reader::new(buf); + inner.read_header().map_err(|e| { + CohortError::Input(format!("VCF header in '{}': {e}", path.display())) + })?; + Ok(Self { + inner, + record: noodles_vcf::Record::default(), + qual_buf: String::with_capacity(32), + path: path.to_path_buf(), + }) + } +} + +impl VariantReader for VcfVariantReader { + fn for_each( + &mut self, + f: &mut dyn for<'a> FnMut(RawRecord<'a>) -> Result<(), CohortError>, + ) -> Result<(), CohortError> { + loop { + let n = self.inner.read_record(&mut self.record).map_err(|e| { + CohortError::Analysis(format!( + "VCF parse error in {}: {e}", + self.path.display() + )) + })?; + if n == 0 { + return Ok(()); + } + + let position = match self.record.variant_start() { + Some(Ok(p)) => p.get() as i32, + _ => continue, + }; + + self.qual_buf.clear(); + let qual: Option<&str> = match self.record.quality_score() { + Some(Ok(q)) => { + use std::fmt::Write; + let _ = write!(self.qual_buf, "{q}"); + Some(self.qual_buf.as_str()) + } + _ => None, + }; + + let alt_bases = self.record.alternate_bases(); + let alt_alleles: &str = alt_bases.as_ref(); + + let ids = self.record.ids(); + let ids_raw: &str = ids.as_ref(); + let rsid = if ids_raw.is_empty() || ids_raw == "." { + None + } else { + ids_raw.split(';').next() + }; + + let filters = self.record.filters(); + let filter_raw: &str = filters.as_ref(); + let filter = if filter_raw.is_empty() || filter_raw == "." { + None + } else { + Some(filter_raw) + }; + + let samples = self.record.samples(); + let samples_text: &str = samples.as_ref(); + + let rec = RawRecord { + chromosome: self.record.reference_sequence_name(), + position, + ref_allele: self.record.reference_bases(), + alt_alleles, + rsid, + qual, + filter, + samples_text, + }; + f(rec)?; + } + } +} + /// Ingest result stats. pub struct VcfIngestResult { pub variant_count: u64, @@ -275,10 +371,13 @@ struct ChromWriter { /// Mutable state for the record-processing loop. Owns the reusable buffers, /// per-chromosome writers, and counters. Used by both sequential and parallel paths. +/// `skip_variants` switches the genotype-only mode used by `extract_genotypes`: +/// variants.parquet writers are not created and cross-file chromosome-order +/// violations are surfaced as errors (because `GenotypeWriter` is single-chrom +/// streaming and cannot reopen a closed part). struct RecordContext<'a> { ref_buf: String, alt_buf: String, - qual_buf: String, writers: HashMap<&'static str, ChromWriter>, geno_writer: Option<&'a mut GenotypeWriter>, schema: Arc, @@ -290,6 +389,8 @@ struct RecordContext<'a> { variant_count: u64, filtered_contigs: u64, multiallelic_split: u64, + skip_variants: bool, + finished_chroms: HashSet<&'static str>, } impl<'a> RecordContext<'a> { @@ -299,6 +400,43 @@ impl<'a> RecordContext<'a> { output_dir: PathBuf, part_id: Option, chromosome_filter: Option, + ) -> Self { + Self::build( + geno_writer, + memory_budget, + output_dir, + part_id, + chromosome_filter, + false, + ) + } + + /// Genotype-only mode: `variants.parquet` is not written. Used by + /// `extract_genotypes`. Cross-file chromosome-order violations error out + /// because the `GenotypeWriter` is single-chrom streaming. + fn new_genotype_only( + geno_writer: &'a mut GenotypeWriter, + memory_budget: u64, + output_dir: PathBuf, + chromosome_filter: Option, + ) -> Self { + Self::build( + Some(geno_writer), + memory_budget, + output_dir, + None, + chromosome_filter, + true, + ) + } + + fn build( + geno_writer: Option<&'a mut GenotypeWriter>, + memory_budget: u64, + output_dir: PathBuf, + part_id: Option, + chromosome_filter: Option, + skip_variants: bool, ) -> Self { let batch_size = derive_batch_size(memory_budget); let schema = Arc::new(vcf_schema()); @@ -309,7 +447,6 @@ impl<'a> RecordContext<'a> { Self { ref_buf: String::with_capacity(256), alt_buf: String::with_capacity(256), - qual_buf: String::with_capacity(32), writers: HashMap::new(), geno_writer, schema, @@ -321,6 +458,8 @@ impl<'a> RecordContext<'a> { variant_count: 0, filtered_contigs: 0, multiallelic_split: 0, + skip_variants, + finished_chroms: HashSet::new(), } } @@ -335,14 +474,17 @@ impl<'a> RecordContext<'a> { Ok(dir.join(name)) } - fn process_record( + fn process( &mut self, - record: &noodles_vcf::Record, + rec: RawRecord<'_>, output: &dyn Output, ) -> Result<(), CohortError> { - let chrom = match normalize_chrom(record.reference_sequence_name()) { + let chrom = match normalize_chrom(rec.chromosome) { Some(c) => c, - None => { self.filtered_contigs += 1; return Ok(()); } + None => { + self.filtered_contigs += 1; + return Ok(()); + } }; if let Some(filt) = &self.chromosome_filter { if !filt.contains_canonical(chrom) { @@ -350,84 +492,112 @@ impl<'a> RecordContext<'a> { return Ok(()); } } - let pos = match record.variant_start() { - Some(Ok(p)) => p.get() as i32, - _ => return Ok(()), - }; - - ascii_uppercase_into(record.reference_bases(), &mut self.ref_buf); - - let alt_bases = record.alternate_bases(); - let alt_str: &str = alt_bases.as_ref(); - - let ids_wrapper = record.ids(); - let ids_str: &str = ids_wrapper.as_ref(); - let rsid = if ids_str.is_empty() || ids_str == "." { - None - } else { - ids_str.split(';').next() - }; - self.qual_buf.clear(); - let qual_ref: Option<&str> = match record.quality_score() { - Some(Ok(q)) => { - use std::fmt::Write; - let _ = write!(self.qual_buf, "{q}"); - Some(self.qual_buf.as_str()) + // Genotype-only mode: the `GenotypeWriter` streams one chromosome + // at a time. Re-opening a chromosome whose part file was already + // closed would overwrite data, so surface it as an input error. + if self.skip_variants { + if let Some(ref gw) = self.geno_writer { + if gw.current_chrom() != Some(chrom) + && self.finished_chroms.contains(chrom) + { + return Err(CohortError::Input(format!( + "Chromosome {chrom} appears in the current input but \ + its writer was already closed after processing an \ + earlier file. Sort VCF files by chromosome or use \ + one file per chromosome." + ))); + } } - _ => None, - }; + } + + let pos = rec.position; + ascii_uppercase_into(rec.ref_allele, &mut self.ref_buf); - let filters_wrapper = record.filters(); - let filter_raw: &str = filters_wrapper.as_ref(); - let filter_str: Option<&str> = if filter_raw.is_empty() || filter_raw == "." { - None + let alt_str = rec.alt_alleles; + let alt_count = if alt_str.is_empty() { + 0 } else { - Some(filter_raw) + alt_str.matches(',').count() + 1 }; - - let alt_count = if alt_str.is_empty() { 0 } else { alt_str.matches(',').count() + 1 }; if alt_count > 1 { self.multiallelic_split += alt_count as u64 - 1; } - let samples_raw; - let samples_str: &str = if self.geno_writer.is_some() { - samples_raw = record.samples(); - samples_raw.as_ref() - } else { - "" - }; - for (alt_idx, alt) in alt_str.split(',').enumerate() { ascii_uppercase_into(alt.trim(), &mut self.alt_buf); - if self.alt_buf == "*" || self.alt_buf == "." || self.alt_buf.is_empty() || self.alt_buf.starts_with('<') { + if self.alt_buf == "*" + || self.alt_buf == "." + || self.alt_buf.is_empty() + || self.alt_buf.starts_with('<') + { continue; } let (nr, na, np) = parsimony_normalize(&self.ref_buf, &self.alt_buf, pos); - // Split borrow: writers HashMap mutably, ref_buf/alt_buf stay - // immutably borrowed through nr/na from parsimony_normalize. - let cw = Self::get_or_create_writer( - chrom, &mut self.writers, &self.output_dir, self.part_id, - &self.schema, &self.props, self.batch_size, output, - )?; - cw.batch.push(chrom, np, nr, na, rsid, qual_ref, filter_str); - cw.count += 1; + if !self.skip_variants { + let cw = Self::get_or_create_writer( + chrom, + &mut self.writers, + &self.output_dir, + self.part_id, + &self.schema, + &self.props, + self.batch_size, + output, + )?; + cw.batch.push(chrom, np, nr, na, rec.rsid, rec.qual, rec.filter); + cw.count += 1; + + if cw.batch.is_full() { + let rb = cw.batch.finish()?; + cw.writer + .write(&rb) + .map_err(|e| CohortError::Resource(format!("Parquet write error: {e}")))?; + } + } self.variant_count += 1; - if cw.batch.is_full() { - let rb = cw.batch.finish()?; - cw.writer - .write(&rb) - .map_err(|e| CohortError::Resource(format!("Parquet write error: {e}")))?; + if let Some(ref mut gw) = self.geno_writer { + gw.push(chrom, np, nr, na, rec.samples_text, (alt_idx + 1) as u8, output)?; } + } + Ok(()) + } - if let Some(ref mut gw) = self.geno_writer { - gw.push(chrom, np, nr, na, samples_str, (alt_idx + 1) as u8, output)?; + /// End-of-file hook: in genotype-only mode, mark every chromosome the + /// `GenotypeWriter` has touched except the current one as finished, so + /// the next file reopening those triggers the `skip_variants` guard in + /// `process`. + fn finish_file(&mut self) { + if !self.skip_variants { + return; + } + let Some(ref gw) = self.geno_writer else { return }; + let current = gw.current_chrom(); + for c in gw.chromosomes() { + if Some(c.as_str()) != current { + if let Some(canon) = normalize_chrom(c) { + self.finished_chroms.insert(canon); + } } } + } + + fn drive( + &mut self, + reader: &mut dyn VariantReader, + label: &str, + output: &dyn Output, + ) -> Result<(), CohortError> { + let pb = output.progress(0, label); + reader.for_each(&mut |rec| { + self.process(rec, output)?; + pb.inc(1); + Ok(()) + })?; + pb.finish(&format!("{} variants ingested", self.variant_count)); Ok(()) } @@ -526,8 +696,6 @@ impl<'a> RecordContext<'a> { }) } - /// Stream files through this context. Each file is opened, parsed, - /// and its records fed to process_record. fn ingest_files( &mut self, files: &[impl AsRef], @@ -544,41 +712,33 @@ impl<'a> RecordContext<'a> { input_path.file_name().unwrap_or_default().to_string_lossy() )); } - - let reader = open_vcf(input_path, threads)?; - let mut vcf_reader = noodles_vcf::io::Reader::new(reader); - { - let mut hr = vcf_reader.header_reader(); - std::io::copy(&mut hr, &mut std::io::sink()).map_err(|e| { - CohortError::Input(format!( - "Skip VCF header in {}: {e}", - input_path.display() - )) - })?; - } - - let pb = output.progress( - 0, - &format!( - "ingesting {}", - input_path.file_name().unwrap_or_default().to_string_lossy() - ), + let mut reader = VcfVariantReader::open(input_path, threads)?; + let label = format!( + "ingesting {}", + input_path.file_name().unwrap_or_default().to_string_lossy() ); - for result in vcf_reader.records() { - let record = result.map_err(|e| { - CohortError::Analysis(format!( - "VCF parse error in {}: {e}", input_path.display() - )) - })?; - self.process_record(&record, output)?; - pb.inc(1); - } - pb.finish(&format!("{} variants ingested", self.variant_count)); + self.drive(&mut reader, &label, output)?; + self.finish_file(); } Ok(()) } } +/// Stream genotypes into an existing `GenotypeWriter`. No `variants.parquet` +/// is written — this is the standalone extraction path used by the cohort +/// store when variants have already been ingested separately. +pub fn stream_genotypes( + vcf_paths: &[PathBuf], + gw: &mut GenotypeWriter, + memory_budget: u64, + threads: usize, + geno_dir: PathBuf, + output: &dyn Output, +) -> Result<(), CohortError> { + let mut ctx = RecordContext::new_genotype_only(gw, memory_budget, geno_dir, None); + ctx.ingest_files(vcf_paths, threads, output) +} + /// Stream one or more VCF files to per-chromosome parquet files (sequential). #[allow(clippy::too_many_arguments)] pub fn ingest_vcfs( diff --git a/src/staar/genotype.rs b/src/staar/genotype.rs index a83c095..af43566 100644 --- a/src/staar/genotype.rs +++ b/src/staar/genotype.rs @@ -22,7 +22,7 @@ use parquet::file::properties::WriterProperties; use serde::{Deserialize, Serialize}; use crate::error::CohortError; -use crate::ingest::vcf::{ascii_uppercase_into, normalize_chrom, open_vcf, parsimony_normalize}; +use crate::ingest::vcf::open_vcf; use crate::output::Output; #[derive(Serialize, Deserialize)] @@ -49,24 +49,27 @@ pub fn extract_genotypes( return Err(CohortError::Input("No VCF files provided.".into())); } - let reader = open_vcf(&vcf_paths[0], threads)?; - let mut vcf_reader = noodles_vcf::io::Reader::new(reader); - let header = vcf_reader - .read_header() - .map_err(|e| CohortError::Input(format!("VCF header: {e}")))?; - - let sample_names: Vec = header - .sample_names() - .iter() - .map(|s| s.to_string()) - .collect(); + let sample_names = read_sample_names(&vcf_paths[0])?; let n_samples = sample_names.len(); if n_samples == 0 { return Err(CohortError::Input( "VCF has no samples. STAAR requires a multi-sample VCF.".into(), )); } - drop(vcf_reader); + + for vcf_path in &vcf_paths[1..] { + let file_samples = read_sample_names(vcf_path)?; + if file_samples != sample_names { + return Err(CohortError::Input(format!( + "Sample mismatch: '{}' has {} samples but '{}' has {}. \ + All VCF files must have identical samples in the same order.", + vcf_paths[0].display(), + sample_names.len(), + vcf_path.display(), + file_samples.len() + ))); + } + } output.status(&format!( "Extracting genotypes: {} samples, {} file(s), {} threads, {:.1}G memory", @@ -79,66 +82,14 @@ pub fn extract_genotypes( let geno_dir = output_dir.join("genotypes"); let mut gw = GenotypeWriter::new(n_samples, &geno_dir, available_memory)?; - let mut finished_chroms: std::collections::HashSet = std::collections::HashSet::new(); - let mut ref_buf = String::with_capacity(256); - let mut alt_buf = String::with_capacity(256); - let pb = output.progress(0, "extracting genotypes"); - - for (file_idx, vcf_path) in vcf_paths.iter().enumerate() { - let reader = open_vcf(vcf_path, threads)?; - let mut vcf_reader = noodles_vcf::io::Reader::new(reader); - let file_header = vcf_reader - .read_header() - .map_err(|e| CohortError::Input(format!("VCF header in '{}': {e}", vcf_path.display())))?; - - if file_idx > 0 { - let file_samples: Vec = file_header - .sample_names() - .iter() - .map(|s| s.to_string()) - .collect(); - if file_samples != sample_names { - return Err(CohortError::Input(format!( - "Sample mismatch: '{}' has {} samples but '{}' has {}. \ - All VCF files must have identical samples in the same order.", - vcf_paths[0].display(), - sample_names.len(), - vcf_path.display(), - file_samples.len() - ))); - } - } - - for result in vcf_reader.records() { - let record = result.map_err(|e| { - CohortError::Analysis(format!("VCF parse error in '{}': {e}", vcf_path.display())) - })?; - - let raw_chrom = record.reference_sequence_name(); - if let Some(chrom) = normalize_chrom(raw_chrom) { - if gw.current_chrom.as_deref() != Some(chrom) && finished_chroms.contains(chrom) { - return Err(CohortError::Input(format!( - "Chromosome {chrom} appears in '{}' but its writer was already \ - closed after processing an earlier file. Sort VCF files by \ - chromosome or use one file per chromosome.", - vcf_path.display() - ))); - } - } - - process_record_geno(&record, &mut gw, &mut ref_buf, &mut alt_buf, output)?; - pb.inc(1); - } - - if let Some(ref current) = gw.current_chrom { - for c in &gw.chromosomes { - if c != current { - finished_chroms.insert(c.clone()); - } - } - } - } - pb.finish(&format!("{} variants extracted", gw.variant_count())); + crate::ingest::vcf::stream_genotypes( + vcf_paths, + &mut gw, + available_memory, + threads, + geno_dir.clone(), + output, + )?; let total_variants = gw.variant_count(); let source_vcfs = vcf_paths.iter().map(|p| p.display().to_string()).collect(); @@ -214,9 +165,20 @@ impl GenotypeWriter { }) } - /// Push one biallelic variant with dosages extracted from raw VCF sample text. - /// `chrom` and `pos/ref/alt` must already be normalized. - /// `alt_idx` is the 1-based index of this alt allele in the original record. + /// Chromosome currently being written, or `None` before the first push. + pub fn current_chrom(&self) -> Option<&str> { + self.current_chrom.as_deref() + } + + /// Every chromosome touched so far in insertion order, including the one + /// currently open. + pub fn chromosomes(&self) -> &[String] { + &self.chromosomes + } + + /// Push one biallelic variant. `chrom` and `pos/ref/alt` must already be + /// normalized. `alt_idx` is the 1-based index of this alt allele in the + /// original VCF record. #[allow(clippy::too_many_arguments)] pub fn push( &mut self, @@ -242,9 +204,9 @@ impl GenotypeWriter { let mut sample_idx: usize = 0; let mut start: usize = 0; - // memchr uses AVX2/SSE4.2 to find tabs at 32 bytes/cycle. - // For 200K-sample lines (~7MB), this replaces the dominant cost - // of byte-by-byte split('\t'). + // memchr uses AVX2/SSE4.2 to find tabs at 32 bytes/cycle. For + // 200K-sample lines (~7MB), this replaces the dominant cost of + // byte-by-byte split('\t'). for tab_pos in memchr::memchr_iter(b'\t', bytes) { if sample_idx >= n { break; } let gt_len = gt_prefix_len(&bytes[start..tab_pos]); @@ -387,40 +349,6 @@ impl Drop for GenotypeWriter { } } -fn process_record_geno( - record: &noodles_vcf::Record, - gw: &mut GenotypeWriter, - ref_buf: &mut String, - alt_buf: &mut String, - output: &dyn Output, -) -> Result<(), CohortError> { - let chrom = match normalize_chrom(record.reference_sequence_name()) { - Some(c) => c, - None => return Ok(()), - }; - let pos = match record.variant_start() { - Some(Ok(p)) => p.get() as i32, - _ => return Ok(()), - }; - - ascii_uppercase_into(record.reference_bases(), ref_buf); - let alt_bases = record.alternate_bases(); - let alt_str: &str = alt_bases.as_ref(); - let samples_raw = record.samples(); - let samples_str: &str = samples_raw.as_ref(); - - for (alt_idx, alt) in alt_str.split(',').enumerate() { - ascii_uppercase_into(alt.trim(), alt_buf); - if alt_buf == "*" || alt_buf == "." || alt_buf.is_empty() || alt_buf.starts_with('<') { - continue; - } - - let (nr, na, np) = parsimony_normalize(ref_buf, alt_buf, pos); - gw.push(chrom, np, nr, na, samples_str, (alt_idx + 1) as u8, output)?; - } - Ok(()) -} - /// Row-group capacity the GenotypeWriter would allocate under `available_memory`, /// before the 1k..100k clamp. Shared with the ingest preflight so it can reject /// configurations whose raw capacity would force flush-per-variant throughput.