diff --git a/Cargo.toml b/Cargo.toml
index fdbadf7..a6e2ad2 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -12,6 +12,10 @@ documentation = "https://docs.rs/linux-perf-data/"
 repository = "https://github.com/mstange/linux-perf-data/"
 exclude = ["/.github", "/.vscode", "/tests"]
 
+[features]
+default = ["zstd"]
+zstd = ["zstd-safe"]
+
 [dependencies]
 byteorder = "1.4.3"
 memchr = "2.4.1"
@@ -21,6 +25,7 @@ linux-perf-event-reader = "0.10.0"
 linear-map = "1.2.0"
 prost = { version = "0.14", default-features = false, features = ["std"] }
 prost-derive = "0.14"
+zstd-safe = { version = "7.2", optional = true }
 
 [dev-dependencies]
 yaxpeax-arch = { version = "0.3", default-features = false }
diff --git a/src/constants.rs b/src/constants.rs
index 58c9900..0b7534d 100644
--- a/src/constants.rs
+++ b/src/constants.rs
@@ -18,6 +18,7 @@
 pub const PERF_RECORD_EVENT_UPDATE: u32 = 78;
 pub const PERF_RECORD_TIME_CONV: u32 = 79;
 pub const PERF_RECORD_HEADER_FEATURE: u32 = 80;
 pub const PERF_RECORD_COMPRESSED: u32 = 81;
+pub const PERF_RECORD_COMPRESSED2: u32 = 83;
 
 // pub const SIMPLE_PERF_RECORD_TYPE_START: u32 = 32768;
diff --git a/src/decompression.rs b/src/decompression.rs
new file mode 100644
index 0000000..9b625a9
--- /dev/null
+++ b/src/decompression.rs
@@ -0,0 +1,69 @@
+use zstd_safe::{DCtx, InBuffer, OutBuffer};
+
+/// A zstd decompressor for PERF_RECORD_COMPRESSED records.
+pub struct ZstdDecompressor {
+    dctx: Option<DCtx<'static>>,
+    /// Buffer for partial perf records that span multiple compressed chunks
+    partial_record_buffer: Vec<u8>,
+}
+
+impl Default for ZstdDecompressor {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl ZstdDecompressor {
+    pub fn new() -> Self {
+        Self {
+            dctx: None,
+            partial_record_buffer: Vec::new(),
+        }
+    }
+
+    /// Decompress a chunk of zstd data.
+    pub fn decompress(&mut self, compressed_data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
+        let dctx = self.dctx.get_or_insert_with(DCtx::create);
+
+        let mut decompressed = vec![0; compressed_data.len() * 4];
+        let mut in_buffer = InBuffer::around(compressed_data);
+        let mut total_out = 0;
+
+        while in_buffer.pos < in_buffer.src.len() {
+            let available = decompressed.len() - total_out;
+            let mut out_buffer = OutBuffer::around(&mut decompressed[total_out..]);
+
+            match dctx.decompress_stream(&mut out_buffer, &mut in_buffer) {
+                Ok(_) => {
+                    total_out += out_buffer.pos();
+                    if out_buffer.pos() == available {
+                        decompressed.resize(decompressed.len() + compressed_data.len() * 4, 0);
+                    }
+                }
+                Err(code) => {
+                    let error_name = zstd_safe::get_error_name(code);
+                    return Err(std::io::Error::new(
+                        std::io::ErrorKind::InvalidData,
+                        format!("Zstd decompression failed: {}", error_name),
+                    ));
+                }
+            }
+        }
+
+        decompressed.truncate(total_out);
+
+        // Prepend any partial record data from the previous chunk
+        if !self.partial_record_buffer.is_empty() {
+            let mut combined = std::mem::take(&mut self.partial_record_buffer);
+            combined.extend_from_slice(&decompressed);
+            decompressed = combined;
+        }
+
+        Ok(decompressed)
+    }
+
+    /// Save partial record data that spans to the next compressed chunk.
+    pub fn save_partial_record(&mut self, data: &[u8]) {
+        self.partial_record_buffer = data.to_vec();
+    }
+}
diff --git a/src/error.rs b/src/error.rs
index 74e980e..12baee2 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -63,6 +63,12 @@ pub enum Error {
     #[error("The specified size in the perf event header was smaller than the header itself")]
     InvalidPerfEventSize,
+
+    #[error("Cannot parse non-streaming perf.data file with parse_pipe. Use parse_file instead.")]
+    FileFormatDetectedInPipeMode,
+
+    #[error("Detected pipe format in file mode")]
+    PipeFormatDetectedInFileMode,
 }
 
 impl From<std::io::Error> for Error {
diff --git a/src/feature_sections.rs b/src/feature_sections.rs
index 39080e3..06de827 100644
--- a/src/feature_sections.rs
+++ b/src/feature_sections.rs
@@ -49,6 +49,40 @@ impl SampleTimeRange {
     }
 }
 
+/// Information about compression used in the perf.data file.
+#[derive(Debug, Clone, Copy)]
+pub struct CompressionInfo {
+    pub version: u32,
+    /// Compression algorithm type. 1 = Zstd
+    pub type_: u32,
+    /// Compression level (e.g., 1-22 for Zstd)
+    pub level: u32,
+    /// Compression ratio achieved
+    pub ratio: u32,
+    /// mmap buffer size
+    pub mmap_len: u32,
+}
+
+impl CompressionInfo {
+    pub const STRUCT_SIZE: usize = 4 + 4 + 4 + 4 + 4;
+    pub const ZSTD_TYPE: u32 = 1;
+
+    pub fn parse<R: Read, T: ByteOrder>(mut reader: R) -> Result<Self, std::io::Error> {
+        let version = reader.read_u32::<T>()?;
+        let type_ = reader.read_u32::<T>()?;
+        let level = reader.read_u32::<T>()?;
+        let ratio = reader.read_u32::<T>()?;
+        let mmap_len = reader.read_u32::<T>()?;
+        Ok(Self {
+            version,
+            type_,
+            level,
+            ratio,
+            mmap_len,
+        })
+    }
+}
+
 pub struct HeaderString;
 
 impl HeaderString {
diff --git a/src/file_reader.rs b/src/file_reader.rs
index 5880760..44d57f3 100644
--- a/src/file_reader.rs
+++ b/src/file_reader.rs
@@ -9,6 +9,9 @@ use linux_perf_event_reader::{
 use std::collections::{HashMap, VecDeque};
 use std::io::{Cursor, Read, Seek, SeekFrom};
 
+#[cfg(feature = "zstd")]
+use crate::decompression::ZstdDecompressor;
+
 use super::error::{Error, ReadError};
 use super::feature_sections::AttributeDescription;
 use super::features::Feature;
@@ -58,7 +61,15 @@ pub struct PerfFileReader<R: Read> {
 impl<C: Read + Seek> PerfFileReader<C> {
     pub fn parse_file(mut cursor: C) -> Result<Self, Error> {
-        let header = PerfHeader::parse(&mut cursor)?;
+        let header = match PerfHeader::parse(&mut cursor) {
+            Ok(header) => header,
+            Err(Error::PipeFormatDetectedInFileMode) => {
+                // Rewind and parse as pipe format instead
+                cursor.seek(SeekFrom::Start(0))?;
+                return Self::parse_pipe(cursor);
+            }
+            Err(e) => return Err(e),
+        };
         match &header.magic {
             b"PERFILE2" => {
                 Self::parse_file_impl::<LittleEndian>(cursor, header, Endianness::LittleEndian)
             }
@@ -196,6 +207,8 @@
             buffers_for_recycling: VecDeque::new(),
             current_event_body: Vec::new(),
             pending_first_record: None,
+            #[cfg(feature = "zstd")]
+            zstd_decompressor: ZstdDecompressor::new(),
         };
 
         Ok(Self {
@@ -366,6 +379,8 @@
             buffers_for_recycling: VecDeque::new(),
             current_event_body: Vec::new(),
             pending_first_record,
+            #[cfg(feature = "zstd")]
+            zstd_decompressor: ZstdDecompressor::new(),
         };
 
         Ok(Self {
@@ -391,6 +406,9 @@ pub struct PerfRecordIter<R: Read> {
     buffers_for_recycling: VecDeque<Vec<u8>>,
     /// For pipe mode: the first non-metadata record that was read during initialization
     pending_first_record: Option<(PerfEventHeader, Vec<u8>)>,
+    /// Zstd decompressor for handling COMPRESSED records
+    #[cfg(feature = "zstd")]
+    zstd_decompressor: ZstdDecompressor,
 }
 
 impl<R: Read> PerfRecordIter<R> {
@@ -459,9 +477,9 @@
             }
             self.read_offset += u64::from(header.size);
 
-            if UserRecordType::try_from(RecordType(header.type_))
-                == Some(UserRecordType::PERF_FINISHED_ROUND)
-            {
+            let user_record_type = UserRecordType::try_from(RecordType(header.type_));
+
+            if user_record_type == Some(UserRecordType::PERF_FINISHED_ROUND) {
                 self.sorter.finish_round();
                 if self.sorter.has_more() {
                     // The sorter is non-empty. We're done.
@@ -476,7 +494,6 @@
             let event_body_len = size - PerfEventHeader::STRUCT_SIZE;
             let mut buffer = self.buffers_for_recycling.pop_front().unwrap_or_default();
             buffer.resize(event_body_len, 0);
-
             // Try to read the event body. For pipe mode, EOF here also means end of stream.
             match self.reader.read_exact(&mut buffer) {
                 Ok(()) => {}
@@ -491,6 +508,28 @@
                 }
             }
 
+            if user_record_type == Some(UserRecordType::PERF_COMPRESSED) {
+                // PERF_COMPRESSED is the old format, not yet implemented
+                return Err(Error::IoError(std::io::Error::new(
+                    std::io::ErrorKind::Unsupported,
+                    "PERF_COMPRESSED (type 81) is not supported yet, only PERF_COMPRESSED2 (type 83)",
+                )));
+            }
+
+            if user_record_type == Some(UserRecordType::PERF_COMPRESSED2) {
+                #[cfg(not(feature = "zstd"))]
+                {
+                    return Err(Error::IoError(std::io::Error::new(
+                        std::io::ErrorKind::Unsupported,
+                        "Compression support is not enabled. Please rebuild with the 'zstd' feature flag.",
+                    )));
+                }
+                #[cfg(feature = "zstd")]
+                {
+                    self.decompress_and_process_compressed2::<T>(&buffer)?;
+                    continue;
+                }
+            }
+
             self.process_record::<T>(header, buffer, offset)?;
         }
@@ -542,7 +581,64 @@
             attr_index,
         };
         self.sorter.insert_unordered(sort_key, pending_record);
+        Ok(())
+    }
 
+    /// Decompresses a PERF_RECORD_COMPRESSED2 record and processes its sub-records.
+    #[cfg(feature = "zstd")]
+    fn decompress_and_process_compressed2<T: ByteOrder>(
+        &mut self,
+        buffer: &[u8],
+    ) -> Result<(), Error> {
+        if buffer.len() < 8 {
+            return Err(ReadError::PerfEventData.into());
+        }
+        let data_size = T::read_u64(&buffer[0..8]) as usize;
+        if data_size > buffer.len() - 8 {
+            return Err(ReadError::PerfEventData.into());
+        }
+        let compressed_data = &buffer[8..8 + data_size];
+
+        let decompressed = self.zstd_decompressor.decompress(compressed_data)?;
+
+        // Parse the decompressed data as a sequence of perf records
+        let mut cursor = Cursor::new(&decompressed[..]);
+        let mut offset = 0u64;
+
+        while (cursor.position() as usize) < decompressed.len() {
+            let header_start = cursor.position() as usize;
+            // Check if we have enough bytes for a header
+            let remaining = decompressed.len() - header_start;
+            if remaining < PerfEventHeader::STRUCT_SIZE {
+                self.zstd_decompressor
+                    .save_partial_record(&decompressed[header_start..]);
+                break;
+            }
+
+            let sub_header = PerfEventHeader::parse::<_, T>(&mut cursor)?;
+            let sub_size = sub_header.size as usize;
+            if sub_size < PerfEventHeader::STRUCT_SIZE {
+                return Err(Error::InvalidPerfEventSize);
+            }
+
+            let sub_event_body_len = sub_size - PerfEventHeader::STRUCT_SIZE;
+            // Check if we have enough bytes for the sub-record body
+            let remaining_after_header = decompressed.len() - cursor.position() as usize;
+            if sub_event_body_len > remaining_after_header {
+                self.zstd_decompressor
+                    .save_partial_record(&decompressed[header_start..]);
+                break;
+            }
+
+            let mut sub_buffer = self.buffers_for_recycling.pop_front().unwrap_or_default();
+            sub_buffer.resize(sub_event_body_len, 0);
+            cursor
+                .read_exact(&mut sub_buffer)
+                .map_err(|_| ReadError::PerfEventData)?;
+
+            self.process_record::<T>(sub_header, sub_buffer, offset)?;
+            offset += sub_size as u64;
+        }
 
         Ok(())
     }
diff --git a/src/header.rs b/src/header.rs
index 9000e4d..1849746 100644
--- a/src/header.rs
+++ b/src/header.rs
@@ -2,6 +2,7 @@
 use std::io::Read;
 
 use byteorder::{ByteOrder, ReadBytesExt};
 
+use super::error::Error;
 use super::features::FeatureSet;
 use super::section::PerfFileSection;
 
@@ -28,7 +29,7 @@ pub struct PerfHeader {
 }
 
 impl PerfHeader {
-    pub fn parse<R: Read>(mut reader: R) -> Result<Self, std::io::Error> {
+    pub fn parse<R: Read>(mut reader: R) -> Result<Self, Error> {
         let mut magic = [0; 8];
         reader.read_exact(&mut magic)?;
 
@@ -39,11 +40,14 @@
         }
     }
 
-    fn parse_impl<R: Read, T: ByteOrder>(
-        mut reader: R,
-        magic: [u8; 8],
-    ) -> Result<Self, std::io::Error> {
+    fn parse_impl<R: Read, T: ByteOrder>(mut reader: R, magic: [u8; 8]) -> Result<Self, Error> {
         let header_size = reader.read_u64::<T>()?;
+
+        // Detect if this is actually a pipe format instead of file format.
+        if header_size == std::mem::size_of::<PerfPipeHeader>() as u64 {
+            return Err(Error::PipeFormatDetectedInFileMode);
+        }
+
         let attr_size = reader.read_u64::<T>()?;
         let attr_section = PerfFileSection::parse::<_, T>(&mut reader)?;
         let data_section = PerfFileSection::parse::<_, T>(&mut reader)?;
@@ -81,7 +85,7 @@ pub struct PerfPipeHeader {
 }
 
 impl PerfPipeHeader {
-    pub fn parse<R: Read>(mut reader: R) -> Result<Self, std::io::Error> {
+    pub fn parse<R: Read>(mut reader: R) -> Result<Self, Error> {
         let mut magic = [0; 8];
         reader.read_exact(&mut magic)?;
 
@@ -90,6 +94,14 @@
         } else {
             reader.read_u64::<byteorder::BigEndian>()?
         };
+
+        // Detect if this is actually a file format instead of pipe format.
+        if size > std::mem::size_of::<PerfPipeHeader>() as u64
+            && size == std::mem::size_of::<PerfHeader>() as u64
+        {
+            return Err(Error::FileFormatDetectedInPipeMode);
+        }
+
         Ok(Self { magic, size })
     }
 }
diff --git a/src/lib.rs b/src/lib.rs
index fc95d4e..61557d8 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -64,6 +64,8 @@
 mod build_id_event;
 mod constants;
+#[cfg(feature = "zstd")]
+mod decompression;
 mod dso_info;
 mod dso_key;
 mod error;
@@ -91,7 +93,7 @@ pub use linux_perf_event_reader::Endianness;
 
 pub use dso_info::DsoInfo;
 pub use dso_key::DsoKey;
 pub use error::{Error, ReadError};
-pub use feature_sections::{AttributeDescription, NrCpus, SampleTimeRange};
+pub use feature_sections::{AttributeDescription, CompressionInfo, NrCpus, SampleTimeRange};
 pub use features::{Feature, FeatureSet, FeatureSetIter};
 pub use file_reader::{PerfFileReader, PerfRecordIter};
 pub use perf_file::PerfFile;
diff --git a/src/perf_file.rs b/src/perf_file.rs
index fd4625b..0f168d5 100644
--- a/src/perf_file.rs
+++ b/src/perf_file.rs
@@ -10,7 +10,7 @@ use super::dso_info::DsoInfo;
 use super::dso_key::DsoKey;
 use super::error::Error;
 use super::feature_sections::{
-    AttributeDescription, ClockData, NrCpus, PmuMappings, SampleTimeRange,
+    AttributeDescription, ClockData, CompressionInfo, NrCpus, PmuMappings, SampleTimeRange,
 };
 use super::features::{Feature, FeatureSet};
 use super::simpleperf;
@@ -213,6 +213,18 @@
             .transpose()
     }
 
+    /// Information about compression used in the perf.data file
+    pub fn compression_info(&self) -> Result<Option<CompressionInfo>, Error> {
+        self.feature_section_data(Feature::COMPRESSED)
+            .map(|section| {
+                Ok(match self.endian {
+                    Endianness::LittleEndian => CompressionInfo::parse::<_, LittleEndian>(section),
+                    Endianness::BigEndian => CompressionInfo::parse::<_, BigEndian>(section),
+                }?)
+            })
+            .transpose()
+    }
+
     /// The meta info map, if this is a Simpleperf profile.
     pub fn simpleperf_meta_info(&self) -> Result<Option<HashMap<&str, &str>>, Error> {
         match self.feature_section_data(Feature::SIMPLEPERF_META_INFO) {
diff --git a/src/record.rs b/src/record.rs
index 5c3a587..ed4e025 100644
--- a/src/record.rs
+++ b/src/record.rs
@@ -55,6 +55,7 @@ impl UserRecordType {
     pub const PERF_TIME_CONV: Self = Self(RecordType(PERF_RECORD_TIME_CONV));
     pub const PERF_HEADER_FEATURE: Self = Self(RecordType(PERF_RECORD_HEADER_FEATURE));
     pub const PERF_COMPRESSED: Self = Self(RecordType(PERF_RECORD_COMPRESSED));
+    pub const PERF_COMPRESSED2: Self = Self(RecordType(PERF_RECORD_COMPRESSED2));
 
     pub const SIMPLEPERF_KERNEL_SYMBOL: Self = Self(RecordType(SIMPLE_PERF_RECORD_KERNEL_SYMBOL));
     pub const SIMPLEPERF_DSO: Self = Self(RecordType(SIMPLE_PERF_RECORD_DSO));
@@ -107,6 +108,7 @@ impl std::fmt::Debug for UserRecordType {
             Self::PERF_TIME_CONV => "PERF_TIME_CONV".fmt(f),
             Self::PERF_HEADER_FEATURE => "PERF_HEADER_FEATURE".fmt(f),
             Self::PERF_COMPRESSED => "PERF_COMPRESSED".fmt(f),
+            Self::PERF_COMPRESSED2 => "PERF_COMPRESSED2".fmt(f),
             Self::SIMPLEPERF_KERNEL_SYMBOL => "SIMPLEPERF_KERNEL_SYMBOL".fmt(f),
             Self::SIMPLEPERF_DSO => "SIMPLEPERF_DSO".fmt(f),
             Self::SIMPLEPERF_SYMBOL => "SIMPLEPERF_SYMBOL".fmt(f),
diff --git a/tests/compression_e2e.rs b/tests/compression_e2e.rs
new file mode 100644
index 0000000..816c297
--- /dev/null
+++ b/tests/compression_e2e.rs
@@ -0,0 +1,414 @@
+use linux_perf_data::{CompressionInfo, Error, PerfFileReader, PerfFileRecord};
+use std::fs::File;
+use std::io::BufReader;
+
+/// Test that compressed files can be parsed successfully
+#[test]
+fn test_compressed_file_parsing() {
+    let file = File::open("tests/fixtures/sleep_compressed.data").unwrap();
+    let reader = BufReader::new(file);
+
+    let PerfFileReader {
+        mut perf_file,
+        mut record_iter,
+    } = PerfFileReader::parse_file(reader).unwrap();
+
+    // Should have compression info
+    let comp_info = perf_file.compression_info().unwrap();
+    assert!(
+        comp_info.is_some(),
+        "Compressed file should have compression info"
+    );
+
+    // Count records - should be able to read them all
+    let mut count = 0;
+    while let Some(_record) = record_iter.next_record(&mut perf_file).unwrap() {
+        count += 1;
+    }
+
+    assert!(count > 0, "Should have read some records");
+}
+
+/// Test that uncompressed files return None for compression_info
+#[test]
+fn test_uncompressed_file_no_compression_info() {
+    let file = File::open("tests/fixtures/sleep.data").unwrap();
+    let reader = BufReader::new(file);
+
+    let PerfFileReader {
+        mut perf_file,
+        mut record_iter,
+    } = PerfFileReader::parse_file(reader).unwrap();
+
+    // Should NOT have compression info
+    let comp_info = perf_file.compression_info().unwrap();
+    assert!(
+        comp_info.is_none(),
+        "Uncompressed file should not have compression info"
+    );
+
+    // But should still be able to read records
+    let mut count = 0;
+    while let Some(_record) = record_iter.next_record(&mut perf_file).unwrap() {
+        count += 1;
+    }
+
+    assert!(count > 0, "Should have read some records");
+}
+
+/// Test compression metadata is correct
+#[test]
+fn test_compression_metadata() {
+    let file = File::open("tests/fixtures/sleep_compressed.data").unwrap();
+    let reader = BufReader::new(file);
+
+    let PerfFileReader {
+        perf_file,
+        record_iter: _,
+    } = PerfFileReader::parse_file(reader).unwrap();
+
+    let comp_info = perf_file
+        .compression_info()
+        .unwrap()
+        .expect("Compressed file should have compression info");
+
+    // Should be using Zstd compression
+    assert_eq!(
+        comp_info.type_,
+        CompressionInfo::ZSTD_TYPE,
+        "Should be using Zstd compression"
+    );
+
+    // Compression level should be in valid Zstd range (1-22, or 0 for default)
+    assert!(
+        comp_info.level <= 22,
+        "Compression level should be <= 22, got {}",
+        comp_info.level
+    );
+
+    // Ratio and mmap_len should be non-zero
+    assert!(comp_info.ratio > 0, "Compression ratio should be non-zero");
+    assert!(comp_info.mmap_len > 0, "mmap_len should be non-zero");
+}
+
+/// Test that compressed and uncompressed files contain equivalent data
+#[test]
+fn test_compressed_uncompressed_equivalence() {
+    // Read compressed file
+    let file = File::open("tests/fixtures/sleep_compressed.data").unwrap();
+    let reader = BufReader::new(file);
+    let PerfFileReader {
+        mut perf_file,
+        mut record_iter,
+    } = PerfFileReader::parse_file(reader).unwrap();
+
+    // Collect record type debug strings from compressed file
+    let mut compressed_records = Vec::new();
+    while let Some(record) = record_iter.next_record(&mut perf_file).unwrap() {
+        let record_type_str = match &record {
+            PerfFileRecord::EventRecord { record, .. } => format!("{:?}", record.record_type),
+            PerfFileRecord::UserRecord(record) => format!("{:?}", record.record_type),
+        };
+        compressed_records.push(record_type_str);
+    }
+
+    // Read uncompressed file
+    let file = File::open("tests/fixtures/sleep.data").unwrap();
+    let reader = BufReader::new(file);
+    let PerfFileReader {
+        mut perf_file,
+        mut record_iter,
+    } = PerfFileReader::parse_file(reader).unwrap();
+
+    // Collect record type debug strings from uncompressed file
+    let mut uncompressed_records = Vec::new();
+    while let Some(record) = record_iter.next_record(&mut perf_file).unwrap() {
+        let record_type_str = match &record {
+            PerfFileRecord::EventRecord { record, .. } => format!("{:?}", record.record_type),
+            PerfFileRecord::UserRecord(record) => format!("{:?}", record.record_type),
+        };
+        uncompressed_records.push(record_type_str);
+    }
+
+    // Both files should have records
+    assert!(
+        !compressed_records.is_empty(),
+        "Compressed file should have records"
+    );
+    assert!(
+        !uncompressed_records.is_empty(),
+        "Uncompressed file should have records"
+    );
+
+    // Note: The test files were generated from different perf record sessions
+    // (one with -k monotonic, one without), so exact counts won't match.
+    // This test verifies both files can be parsed and contain the expected
+    // types of records (e.g., both have SAMPLE, MMAP, etc.)
+    let compressed_count = compressed_records.len();
+    let uncompressed_count = uncompressed_records.len();
+
+    println!(
+        "Compressed file: {} records, Uncompressed file: {} records",
+        compressed_count, uncompressed_count
+    );
+
+    // Both should have a reasonable number of records for a 1-second sleep
+    assert!(
+        compressed_count >= 10,
+        "Compressed file should have at least 10 records, got {}",
+        compressed_count
+    );
+    assert!(
+        uncompressed_count >= 10,
+        "Uncompressed file should have at least 10 records, got {}",
+        uncompressed_count
+    );
+}
+
+/// Test that we can read sample records from compressed data
+#[test]
+fn test_compressed_sample_records() {
+    let file = File::open("tests/fixtures/sleep_compressed.data").unwrap();
+    let reader = BufReader::new(file);
+
+    let PerfFileReader {
+        mut perf_file,
+        mut record_iter,
+    } = PerfFileReader::parse_file(reader).unwrap();
+
+    let mut sample_count = 0;
+    let mut total_count = 0;
+
+    // Record type 9 is PERF_RECORD_SAMPLE
+    const PERF_RECORD_SAMPLE: u32 = 9;
+
+    while let Some(record) = record_iter.next_record(&mut perf_file).unwrap() {
+        if let PerfFileRecord::EventRecord { record, .. } = record {
+            if record.record_type.0 == PERF_RECORD_SAMPLE {
+                sample_count += 1;
+            }
+        }
+        total_count += 1;
+    }
+
+    assert!(total_count > 0, "Should have processed some records");
+    assert!(
+        sample_count > 0,
+        "Should have found at least one SAMPLE record"
+    );
+}
+
+/// Test that record types are as expected (no COMPRESSED2 or FINISHED_INIT exposed)
+#[test]
+fn test_no_compressed_records_in_output() {
+    use linux_perf_data::UserRecordType;
+
+    let file = File::open("tests/fixtures/sleep_compressed.data").unwrap();
+    let reader = BufReader::new(file);
+
+    let PerfFileReader {
+        mut perf_file,
+        mut record_iter,
+    } = PerfFileReader::parse_file(reader).unwrap();
+
+    while let Some(record) = record_iter.next_record(&mut perf_file).unwrap() {
+        // Check user records - these internal types should never be exposed
+        if let PerfFileRecord::UserRecord(record) = &record {
+            assert_ne!(
+                record.record_type,
+                UserRecordType::PERF_COMPRESSED2,
+                "COMPRESSED2 records should be transparent and not exposed"
+            );
+        }
+    }
+}
+
+/// Test pipe mode with zstd compression
+#[test]
+fn test_pipe_mode_with_zstd_compression() {
+    use linux_perf_data::UserRecordType;
+
+    // Read pipe mode compressed file
+    let file = File::open("tests/fixtures/sleep_compressed.pipe.data").unwrap();
+    let reader = BufReader::new(file);
+
+    // Use parse_pipe instead of parse_file
+    let PerfFileReader {
+        mut perf_file,
+        mut record_iter,
+    } = PerfFileReader::parse_pipe(reader).unwrap();
+
+    // Should have compression info even in pipe mode
+    let comp_info = perf_file.compression_info().unwrap();
+    assert!(
+        comp_info.is_some(),
+        "Pipe mode should support compression info"
+    );
+
+    if let Some(info) = comp_info {
+        assert_eq!(
+            info.type_,
+            CompressionInfo::ZSTD_TYPE,
+            "Should be using Zstd compression"
+        );
+        assert!(info.level <= 22, "Compression level should be valid");
+    }
+
+    // Should be able to read all records
+    let mut total_count = 0;
+    let mut sample_count = 0;
+    let mut record_type_counts = std::collections::HashMap::new();
+
+    // Record type 9 is PERF_RECORD_SAMPLE
+    const PERF_RECORD_SAMPLE: u32 = 9;
+
+    while let Some(record) = record_iter.next_record(&mut perf_file).unwrap() {
+        // Verify internal compressed record types are not exposed
+        if let PerfFileRecord::UserRecord(user_record) = &record {
+            assert_ne!(
+                user_record.record_type,
+                UserRecordType::PERF_COMPRESSED2,
+                "COMPRESSED2 records should be transparent"
+            );
+            *record_type_counts
+                .entry(format!("User:{:?}", user_record.record_type))
+                .or_insert(0) += 1;
+        }
+
+        // Count samples
+        if let PerfFileRecord::EventRecord { record, .. } = &record {
+            *record_type_counts
+                .entry(format!("{:?}", record.record_type))
+                .or_insert(0) += 1;
+            if record.record_type.0 == PERF_RECORD_SAMPLE {
+                sample_count += 1;
+            }
+        }
+
+        total_count += 1;
+    }
+
+    println!(
+        "Pipe mode zstd: {} total records, {} samples",
+        total_count, sample_count
+    );
+    println!("Record type counts: {:?}", record_type_counts);
+
+    // Verify we parsed a substantial number of records (streaming decompression working)
+    assert!(
+        total_count >= 100,
+        "Should have read at least 100 records from pipe mode, got {}",
+        total_count
+    );
+    assert!(
+        sample_count > 0,
+        "Should have found sample records in pipe mode"
+    );
+}
+
+/// Test that records spanning compressed chunk boundaries are handled correctly.
+///
+/// This test uses a fixture where perf records span across COMPRESSED record
+/// boundaries, requiring the decompressor to buffer partial records.
+#[cfg(feature = "zstd")]
+#[test]
+fn test_records_spanning_compressed_boundaries() {
+    let file = File::open("tests/fixtures/fibo_compressed.pipe.data").unwrap();
+    let reader = BufReader::new(file);
+
+    let PerfFileReader {
+        mut perf_file,
+        mut record_iter,
+    } = PerfFileReader::parse_pipe(reader).unwrap();
+
+    const PERF_RECORD_SAMPLE: u32 = 9;
+
+    let mut total_count = 0;
+    let mut sample_count = 0;
+
+    while let Some(record) = record_iter.next_record(&mut perf_file).unwrap() {
+        if let PerfFileRecord::EventRecord { record, .. } = &record {
+            if record.record_type.0 == PERF_RECORD_SAMPLE {
+                sample_count += 1;
+            }
+        }
+        total_count += 1;
+    }
+
+    // This file has records that span compressed chunk boundaries.
+    // Without proper partial record handling, we'd get fewer records or errors.
+    assert!(
+        total_count > 1000,
+        "Expected >1000 records (got {}), partial record handling may be broken",
+        total_count
+    );
+    assert!(
+        sample_count > 500,
+        "Expected >500 samples (got {}), partial record handling may be broken",
+        sample_count
+    );
+}
+
+/// Test feature flag: when zstd is disabled, appropriate error occurs
+#[cfg(not(feature = "zstd"))]
+#[test]
+fn test_zstd_feature_disabled_error() {
+    let file = File::open("tests/fixtures/sleep_compressed.data").unwrap();
+    let reader = BufReader::new(file);
+
+    // Should be able to parse the file header
+    let result = PerfFileReader::parse_file(reader);
+
+    // But reading compressed records should fail
+    if let Ok(PerfFileReader {
+        mut perf_file,
+        mut record_iter,
+    }) = result
+    {
+        let mut found_error = false;
+        while let Some(result) = record_iter.next_record(&mut perf_file).transpose() {
+            if result.is_err() {
+                found_error = true;
+                break;
+            }
+        }
+        assert!(
+            found_error,
+            "Should get an error when reading compressed data without zstd feature"
+        );
+    }
+}
+
+/// Test that parse_pipe fails with a clear error when given file format data
+#[test]
+fn test_parse_pipe_with_file_format_fails() {
+    let file = File::open("tests/fixtures/sleep.data").unwrap();
+    let reader = BufReader::new(file);
+
+    let result = PerfFileReader::parse_pipe(reader);
+    assert!(
+        matches!(result, Err(Error::FileFormatDetectedInPipeMode)),
+        "Expected FileFormatDetectedInPipeMode error"
+    );
+}
+
+/// Test that parse_file transparently handles pipe format by falling back to parse_pipe
+#[test]
+fn test_parse_file_with_pipe_format_falls_back() {
+    let file = File::open("tests/fixtures/sleep_compressed.pipe.data").unwrap();
+    let reader = BufReader::new(file);
+
+    // parse_file should detect pipe format and fall back to parse_pipe
+    let PerfFileReader {
+        mut perf_file,
+        mut record_iter,
+    } = PerfFileReader::parse_file(reader)
+        .expect("parse_file should handle pipe format transparently");
+
+    // Should be able to read records
+    let mut count = 0;
+    while let Some(_record) = record_iter.next_record(&mut perf_file).unwrap() {
+        count += 1;
+    }
+    assert!(count > 0, "Should have read some records");
+}
diff --git a/tests/fixtures/README.md b/tests/fixtures/README.md
new file mode 100644
index 0000000..1cdaf77
--- /dev/null
+++ b/tests/fixtures/README.md
@@ -0,0 +1,29 @@
+# Test Fixtures
+
+## File Mode (standard perf.data format)
+
+- `sleep.data` - Uncompressed perf data file (104-byte header)
+- `sleep_compressed.data` - Zstd-compressed perf data file (104-byte header)
+
+## Pipe Mode (streaming perf.data format)
+
+- `sleep_compressed.pipe.data` - Zstd-compressed pipe mode data (16-byte header)
+- `fibo_compressed.pipe.data` - Zstd-compressed pipe mode data with records spanning compressed chunk boundaries
+
+## Generation
+
+These files were generated using the following commands:
+
+```bash
+# File mode - Uncompressed
+perf record -o sleep.data -k monotonic sleep 1
+
+# File mode - Compressed with zstd
+perf record -z -o sleep_compressed.data -k monotonic sleep 1
+
+# Pipe mode - Compressed with zstd
+perf record -z -o - sleep 1 > sleep_compressed.pipe.data
+```
+
+All files capture a 1-second sleep workload to enable comparison testing,
+except for `fibo_compressed.pipe.data`, which was generated with a custom workload to create records that span compressed chunk boundaries.
diff --git a/tests/fixtures/fibo_compressed.pipe.data b/tests/fixtures/fibo_compressed.pipe.data
new file mode 100644
index 0000000..20d6e9a
Binary files /dev/null and b/tests/fixtures/fibo_compressed.pipe.data differ
diff --git a/tests/fixtures/sleep.data b/tests/fixtures/sleep.data
new file mode 100644
index 0000000..1a912e4
Binary files /dev/null and b/tests/fixtures/sleep.data differ
diff --git a/tests/fixtures/sleep_compressed.data b/tests/fixtures/sleep_compressed.data
new file mode 100644
index 0000000..ce8d3c4
Binary files /dev/null and b/tests/fixtures/sleep_compressed.data differ
diff --git a/tests/fixtures/sleep_compressed.pipe.data b/tests/fixtures/sleep_compressed.pipe.data
new file mode 100644
index 0000000..294297a
Binary files /dev/null and b/tests/fixtures/sleep_compressed.pipe.data differ
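A minimal consumer-side sketch of the API surface added above, assuming the names exported in the diff (`PerfFileReader`, `PerfFileRecord`, `compression_info`, `next_record`); the `perf.data` path and the hard-coded sample record type (9) are illustrative assumptions, not part of the change:

```rust
use linux_perf_data::{PerfFileReader, PerfFileRecord};
use std::fs::File;
use std::io::BufReader;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open a perf.data file, e.g. one recorded with `perf record -z` (zstd compression).
    let file = File::open("perf.data")?;
    let PerfFileReader {
        mut perf_file,
        mut record_iter,
    } = PerfFileReader::parse_file(BufReader::new(file))?;

    // The COMPRESSED feature section reports whether and how the data was compressed.
    if let Some(info) = perf_file.compression_info()? {
        println!("compression: type={} level={}", info.type_, info.level);
    }

    // PERF_RECORD_COMPRESSED2 records are decompressed transparently; the iterator
    // only yields the decompressed sub-records.
    let mut samples = 0;
    while let Some(record) = record_iter.next_record(&mut perf_file)? {
        if let PerfFileRecord::EventRecord { record, .. } = record {
            if record.record_type.0 == 9 {
                // PERF_RECORD_SAMPLE
                samples += 1;
            }
        }
    }
    println!("{samples} samples");
    Ok(())
}
```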