@@ -9,6 +9,9 @@ use linux_perf_event_reader::{
99use std:: collections:: { HashMap , VecDeque } ;
1010use std:: io:: { Cursor , Read , Seek , SeekFrom } ;
1111
12+ #[ cfg( feature = "zstd" ) ]
13+ use crate :: decompression:: ZstdDecompressor ;
14+
1215use super :: error:: { Error , ReadError } ;
1316use super :: feature_sections:: AttributeDescription ;
1417use super :: features:: Feature ;
@@ -196,6 +199,8 @@ impl<C: Read + Seek> PerfFileReader<C> {
196199 buffers_for_recycling : VecDeque :: new ( ) ,
197200 current_event_body : Vec :: new ( ) ,
198201 pending_first_record : None ,
202+ #[ cfg( feature = "zstd" ) ]
203+ zstd_decompressor : ZstdDecompressor :: new ( ) ,
199204 } ;
200205
201206 Ok ( Self {
@@ -230,10 +235,7 @@ impl<R: Read> PerfFileReader<R> {
230235 }
231236 }
232237
233- fn parse_pipe_impl < T : ByteOrder > (
234- mut reader : R ,
235- endian : Endianness ,
236- ) -> Result < Self , Error > {
238+ fn parse_pipe_impl < T : ByteOrder > ( mut reader : R , endian : Endianness ) -> Result < Self , Error > {
237239 let mut attributes = Vec :: new ( ) ;
238240 let mut feature_sections = LinearMap :: new ( ) ;
239241 let mut pending_first_record: Option < ( PerfEventHeader , Vec < u8 > ) > = None ;
@@ -302,16 +304,16 @@ impl<R: Read> PerfFileReader<R> {
302304 // If EVENT_DESC feature is present, extract event names from it
303305 if let Some ( event_desc_data) = feature_sections. get ( & Feature :: EVENT_DESC ) {
304306 let event_desc_attrs = AttributeDescription :: parse_event_desc_section :: < _ , T > (
305- Cursor :: new ( & event_desc_data[ ..] )
307+ Cursor :: new ( & event_desc_data[ ..] ) ,
306308 ) ?;
307309
308310 // Match attributes by event IDs and update names
309311 for attr in attributes. iter_mut ( ) {
310312 // Find matching event in EVENT_DESC by comparing event IDs
311313 if let Some ( event_desc_attr) = event_desc_attrs. iter ( ) . find ( |desc| {
312- !desc. event_ids . is_empty ( ) &&
313- !attr. event_ids . is_empty ( ) &&
314- desc. event_ids [ 0 ] == attr. event_ids [ 0 ]
314+ !desc. event_ids . is_empty ( )
315+ && !attr. event_ids . is_empty ( )
316+ && desc. event_ids [ 0 ] == attr. event_ids [ 0 ]
315317 } ) {
316318 attr. name = event_desc_attr. name . clone ( ) ;
317319 }
@@ -384,6 +386,8 @@ impl<R: Read> PerfFileReader<R> {
384386 buffers_for_recycling : VecDeque :: new ( ) ,
385387 current_event_body : Vec :: new ( ) ,
386388 pending_first_record,
389+ #[ cfg( feature = "zstd" ) ]
390+ zstd_decompressor : ZstdDecompressor :: new ( ) ,
387391 } ;
388392
389393 Ok ( Self {
@@ -408,6 +412,9 @@ pub struct PerfRecordIter<R: Read> {
408412 buffers_for_recycling : VecDeque < Vec < u8 > > ,
409413 /// For pipe mode: the first non-metadata record that was read during initialization
410414 pending_first_record : Option < ( PerfEventHeader , Vec < u8 > ) > ,
415+ /// Zstd decompressor for handling COMPRESSED records
416+ #[ cfg( feature = "zstd" ) ]
417+ zstd_decompressor : ZstdDecompressor ,
411418}
412419
413420impl < R : Read > PerfRecordIter < R > {
@@ -458,7 +465,9 @@ impl<R: Read> PerfRecordIter<R> {
458465 Ok ( header) => header,
459466 Err ( e) => {
460467 // For pipe mode with unbounded length, EOF just means end of stream
461- if self . record_data_len == u64:: MAX && e. kind ( ) == std:: io:: ErrorKind :: UnexpectedEof {
468+ if self . record_data_len == u64:: MAX
469+ && e. kind ( ) == std:: io:: ErrorKind :: UnexpectedEof
470+ {
462471 break ;
463472 }
464473 return Err ( e. into ( ) ) ;
@@ -471,9 +480,9 @@ impl<R: Read> PerfRecordIter<R> {
471480 }
472481 self . read_offset += u64:: from ( header. size ) ;
473482
474- if UserRecordType :: try_from ( RecordType ( header. type_ ) )
475- == Some ( UserRecordType :: PERF_FINISHED_ROUND )
476- {
483+ let user_record_type = UserRecordType :: try_from ( RecordType ( header. type_ ) ) ;
484+
485+ if user_record_type == Some ( UserRecordType :: PERF_FINISHED_ROUND ) {
477486 self . sorter . finish_round ( ) ;
478487 if self . sorter . has_more ( ) {
479488 // The sorter is non-empty. We're done.
@@ -488,19 +497,42 @@ impl<R: Read> PerfRecordIter<R> {
488497 let event_body_len = size - PerfEventHeader :: STRUCT_SIZE ;
489498 let mut buffer = self . buffers_for_recycling . pop_front ( ) . unwrap_or_default ( ) ;
490499 buffer. resize ( event_body_len, 0 ) ;
491-
492500 // Try to read the event body. For pipe mode, EOF here also means end of stream.
493501 match self . reader . read_exact ( & mut buffer) {
494- Ok ( ( ) ) => { } ,
502+ Ok ( ( ) ) => { }
495503 Err ( e) => {
496504 // For pipe mode with unbounded length, EOF just means end of stream
497- if self . record_data_len == u64:: MAX && e. kind ( ) == std:: io:: ErrorKind :: UnexpectedEof {
505+ if self . record_data_len == u64:: MAX
506+ && e. kind ( ) == std:: io:: ErrorKind :: UnexpectedEof
507+ {
498508 break ;
499509 }
500510 return Err ( ReadError :: PerfEventData . into ( ) ) ;
501511 }
502512 }
503513
514+ if user_record_type == Some ( UserRecordType :: PERF_COMPRESSED ) {
515+ // PERF_COMPRESSED is the old format, not yet implemented
516+ return Err ( Error :: IoError ( std:: io:: Error :: new (
517+ std:: io:: ErrorKind :: Unsupported ,
518+ "PERF_COMPRESSED (type 81) is not supported yet, only PERF_COMPRESSED2 (type 83)" ,
519+ ) ) ) ;
520+ }
521+
522+ if user_record_type == Some ( UserRecordType :: PERF_COMPRESSED2 ) {
523+ #[ cfg( not( feature = "zstd" ) ) ]
524+ {
525+ return Err ( Error :: IoError ( std:: io:: Error :: new ( std:: io:: ErrorKind :: Unsupported ,
526+ "Compression support is not enabled. Please rebuild with the 'zstd' feature flag." ,
527+ ) ) ) ;
528+ }
529+ #[ cfg( feature = "zstd" ) ]
530+ {
531+ self . decompress_and_process_compressed2 :: < T > ( & buffer) ?;
532+ continue ;
533+ }
534+ }
535+
504536 self . process_record :: < T > ( header, buffer, offset) ?;
505537 }
506538
@@ -552,7 +584,64 @@ impl<R: Read> PerfRecordIter<R> {
552584 attr_index,
553585 } ;
554586 self . sorter . insert_unordered ( sort_key, pending_record) ;
587+ Ok ( ( ) )
588+ }
555589
590+ /// Decompresses a PERF_RECORD_COMPRESSED2 record and processes its sub-records.
591+ #[ cfg( feature = "zstd" ) ]
592+ fn decompress_and_process_compressed2 < T : ByteOrder > (
593+ & mut self ,
594+ buffer : & [ u8 ] ,
595+ ) -> Result < ( ) , Error > {
596+ if buffer. len ( ) < 8 {
597+ return Err ( ReadError :: PerfEventData . into ( ) ) ;
598+ }
599+ let data_size = T :: read_u64 ( & buffer[ 0 ..8 ] ) as usize ;
600+ if data_size > buffer. len ( ) - 8 {
601+ return Err ( ReadError :: PerfEventData . into ( ) ) ;
602+ }
603+ let compressed_data = & buffer[ 8 ..8 + data_size] ;
604+
605+ let decompressed = self . zstd_decompressor . decompress ( compressed_data) ?;
606+
607+ // Parse the decompressed data as a sequence of perf records
608+ let mut cursor = Cursor :: new ( & decompressed[ ..] ) ;
609+ let mut offset = 0u64 ;
610+
611+ while ( cursor. position ( ) as usize ) < decompressed. len ( ) {
612+ let header_start = cursor. position ( ) as usize ;
613+ // Check if we have enough bytes for a header
614+ let remaining = decompressed. len ( ) - header_start;
615+ if remaining < PerfEventHeader :: STRUCT_SIZE {
616+ self . zstd_decompressor
617+ . save_partial_record ( & decompressed[ header_start..] ) ;
618+ break ;
619+ }
620+
621+ let sub_header = PerfEventHeader :: parse :: < _ , T > ( & mut cursor) ?;
622+ let sub_size = sub_header. size as usize ;
623+ if sub_size < PerfEventHeader :: STRUCT_SIZE {
624+ return Err ( Error :: InvalidPerfEventSize ) ;
625+ }
626+
627+ let sub_event_body_len = sub_size - PerfEventHeader :: STRUCT_SIZE ;
628+ // Check if we have enough bytes for the sub-record body
629+ let remaining_after_header = decompressed. len ( ) - cursor. position ( ) as usize ;
630+ if sub_event_body_len > remaining_after_header {
631+ self . zstd_decompressor
632+ . save_partial_record ( & decompressed[ header_start..] ) ;
633+ break ;
634+ }
635+
636+ let mut sub_buffer = self . buffers_for_recycling . pop_front ( ) . unwrap_or_default ( ) ;
637+ sub_buffer. resize ( sub_event_body_len, 0 ) ;
638+ cursor
639+ . read_exact ( & mut sub_buffer)
640+ . map_err ( |_| ReadError :: PerfEventData ) ?;
641+
642+ self . process_record :: < T > ( sub_header, sub_buffer, offset) ?;
643+ offset += sub_size as u64 ;
644+ }
556645 Ok ( ( ) )
557646 }
558647
0 commit comments