From d0163ed2f8188b700a5d288fdf41fd1879bb6d39 Mon Sep 17 00:00:00 2001 From: Zious Date: Tue, 7 Apr 2026 13:12:02 -0500 Subject: [PATCH 1/6] refactor: make FlowKey fields private with read-only accessors Prevents HashMap key mutation after insertion. Adds lower_ip(), lower_port(), upper_ip(), upper_port() accessors. Updates dispatcher and tests to use accessor syntax. Part of #12 --- src/dispatcher.rs | 2 +- src/reassembly/flow.rs | 24 ++++++++++++++++++++---- tests/reassembly_flow_tests.rs | 12 ++++++------ 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/src/dispatcher.rs b/src/dispatcher.rs index 1af8c9d..e27189a 100644 --- a/src/dispatcher.rs +++ b/src/dispatcher.rs @@ -47,7 +47,7 @@ fn classify(data: &[u8], flow_key: &FlowKey) -> DispatchTarget { return DispatchTarget::Http; } // Port fallback for short data - let ports = [flow_key.lower_port, flow_key.upper_port]; + let ports = [flow_key.lower_port(), flow_key.upper_port()]; if ports.contains(&443) || ports.contains(&8443) { return DispatchTarget::Tls; } diff --git a/src/reassembly/flow.rs b/src/reassembly/flow.rs index c7da2dd..5812d67 100644 --- a/src/reassembly/flow.rs +++ b/src/reassembly/flow.rs @@ -5,13 +5,29 @@ use crate::reassembly::handler::Direction; #[derive(Debug, Clone, PartialEq, Eq, Hash)] pub struct FlowKey { - pub lower_ip: IpAddr, - pub lower_port: u16, - pub upper_ip: IpAddr, - pub upper_port: u16, + lower_ip: IpAddr, + lower_port: u16, + upper_ip: IpAddr, + upper_port: u16, } impl FlowKey { + pub fn lower_ip(&self) -> IpAddr { + self.lower_ip + } + + pub fn lower_port(&self) -> u16 { + self.lower_port + } + + pub fn upper_ip(&self) -> IpAddr { + self.upper_ip + } + + pub fn upper_port(&self) -> u16 { + self.upper_port + } + pub fn new(ip_a: IpAddr, port_a: u16, ip_b: IpAddr, port_b: u16) -> Self { // Canonicalize by (ip, port) tuple comparison — keeps IP+port paired together. // This is critical: sorting independently would merge different connections. 
diff --git a/tests/reassembly_flow_tests.rs b/tests/reassembly_flow_tests.rs index 19786b7..ae7a7e2 100644 --- a/tests/reassembly_flow_tests.rs +++ b/tests/reassembly_flow_tests.rs @@ -13,10 +13,10 @@ fn test_flow_key_canonicalization() { assert_eq!(key_ab, key_ba); // Tuple ordering: (10.0.0.1, 12345) < (10.0.0.2, 80) since IPs differ - assert_eq!(key_ab.lower_ip, IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1))); - assert_eq!(key_ab.lower_port, 12345); - assert_eq!(key_ab.upper_ip, IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2))); - assert_eq!(key_ab.upper_port, 80); + assert_eq!(key_ab.lower_ip(), IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1))); + assert_eq!(key_ab.lower_port(), 12345); + assert_eq!(key_ab.upper_ip(), IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2))); + assert_eq!(key_ab.upper_port(), 80); } #[test] @@ -27,8 +27,8 @@ fn test_flow_key_same_ip_different_ports() { let key2 = FlowKey::new(ip, 12345, ip, 80); assert_eq!(key1, key2); - assert_eq!(key1.lower_port, 80); - assert_eq!(key1.upper_port, 12345); + assert_eq!(key1.lower_port(), 80); + assert_eq!(key1.upper_port(), 12345); } #[test] From 70d95cc67699f85a61e501d15851a44cdc628aaa Mon Sep 17 00:00:00 2001 From: Zious Date: Tue, 7 Apr 2026 13:15:55 -0500 Subject: [PATCH 2/6] refactor: move insert_segment and flush_contiguous into FlowDirection methods Consolidates segment mutation logic as methods on the type they operate on. Standalone functions in segment.rs become impl FlowDirection methods. All callers updated to method syntax. 
Part of #12 --- src/reassembly/mod.rs | 17 +- src/reassembly/segment.rs | 334 +++++++++++++++--------------- tests/reassembly_segment_tests.rs | 86 ++++---- 3 files changed, 219 insertions(+), 218 deletions(-) diff --git a/src/reassembly/mod.rs b/src/reassembly/mod.rs index e6b4e05..550206f 100644 --- a/src/reassembly/mod.rs +++ b/src/reassembly/mod.rs @@ -8,7 +8,7 @@ use crate::decoder::{ParsedPacket, Protocol, TransportInfo}; use crate::findings::{Confidence, Finding, ThreatCategory, Verdict}; use crate::reassembly::flow::{FlowKey, FlowState, TcpFlow}; use crate::reassembly::handler::{CloseReason, StreamHandler}; -use crate::reassembly::segment::{InsertResult, flush_contiguous, insert_segment}; +use crate::reassembly::segment::InsertResult; const OVERLAP_ALERT_THRESHOLD: u32 = 50; const SMALL_SEGMENT_ALERT_THRESHOLD: u32 = 2048; @@ -173,7 +173,7 @@ impl TcpReassembler { use crate::reassembly::handler::Direction; for dir in [Direction::ClientToServer, Direction::ServerToClient] { let flow_dir = flow.get_direction_mut(dir); - let flushed = flush_contiguous(flow_dir); + let flushed = flow_dir.flush_contiguous(); for (offset, data) in &flushed { self.stats.bytes_reassembled += data.len() as u64; handler.on_data(&key_clone, dir, data, *offset); @@ -217,8 +217,7 @@ impl TcpReassembler { let flow_dir = flow.get_direction_mut(dir); let before_insert = flow_dir.buffered_bytes; - let result = insert_segment( - flow_dir, + let result = flow_dir.insert_segment( seq, payload, self.config.max_depth, @@ -298,7 +297,7 @@ impl TcpReassembler { let flow = self.flows.get_mut(&key).unwrap(); let flow_dir = flow.get_direction_mut(dir); let before_flush = flow_dir.buffered_bytes; - let flushed = flush_contiguous(flow_dir); + let flushed = flow_dir.flush_contiguous(); self.total_memory -= before_flush - flow_dir.buffered_bytes; for (offset, data) in &flushed { @@ -324,7 +323,7 @@ impl TcpReassembler { use crate::reassembly::handler::Direction; for dir in [Direction::ClientToServer, 
Direction::ServerToClient] { let flow_dir = flow.get_direction_mut(dir); - let flushed = flush_contiguous(flow_dir); + let flushed = flow_dir.flush_contiguous(); for (offset, data) in &flushed { self.stats.bytes_reassembled += data.len() as u64; handler.on_data(&key, dir, data, *offset); @@ -367,7 +366,7 @@ impl TcpReassembler { use crate::reassembly::handler::Direction; for dir in [Direction::ClientToServer, Direction::ServerToClient] { let flow_dir = flow.get_direction_mut(dir); - let flushed = flush_contiguous(flow_dir); + let flushed = flow_dir.flush_contiguous(); for (offset, data) in &flushed { handler.on_data(&key, dir, data, *offset); } @@ -394,7 +393,7 @@ impl TcpReassembler { if let Some(flow) = self.flows.get_mut(&key) { for dir in [Direction::ClientToServer, Direction::ServerToClient] { let flow_dir = flow.get_direction_mut(dir); - let flushed = flush_contiguous(flow_dir); + let flushed = flow_dir.flush_contiguous(); for (offset, data) in &flushed { handler.on_data(&key, dir, data, *offset); } @@ -458,7 +457,7 @@ impl TcpReassembler { use crate::reassembly::handler::Direction; for dir in [Direction::ClientToServer, Direction::ServerToClient] { let flow_dir = flow.get_direction_mut(dir); - let flushed = flush_contiguous(flow_dir); + let flushed = flow_dir.flush_contiguous(); for (offset, data) in &flushed { handler.on_data(key, dir, data, *offset); } diff --git a/src/reassembly/segment.rs b/src/reassembly/segment.rs index 4a05072..32e14fa 100644 --- a/src/reassembly/segment.rs +++ b/src/reassembly/segment.rs @@ -15,201 +15,203 @@ fn seq_offset(seq: u32, isn: u32) -> u64 { seq.wrapping_sub(isn) as u64 } -/// Insert a segment into the flow direction's out-of-order buffer. -/// Applies first-wins overlap policy and tracks anomaly counters. 
-pub fn insert_segment( - dir: &mut FlowDirection, - seq: u32, - data: &[u8], - max_depth: usize, - max_segments: usize, -) -> InsertResult { - if data.is_empty() { - return InsertResult::Inserted; - } - - let isn = match dir.isn { - Some(isn) => isn, - None => { - debug_assert!(false, "insert_segment called with no ISN set"); - return InsertResult::DepthExceeded; +impl FlowDirection { + /// Insert a segment into the flow direction's out-of-order buffer. + /// Applies first-wins overlap policy and tracks anomaly counters. + pub fn insert_segment( + &mut self, + seq: u32, + data: &[u8], + max_depth: usize, + max_segments: usize, + ) -> InsertResult { + if data.is_empty() { + return InsertResult::Inserted; } - }; - // Enforce max segments per direction to prevent BTreeMap overhead explosion - if dir.segments.len() >= max_segments { - return InsertResult::DepthExceeded; - } - - // Track small segments (cumulative, not consecutive) - if data.len() < 8 { - dir.small_segment_count += 1; - } + let isn = match self.isn { + Some(isn) => isn, + None => { + debug_assert!(false, "insert_segment called with no ISN set"); + return InsertResult::DepthExceeded; + } + }; - // Check depth limit - let remaining_depth = max_depth.saturating_sub(dir.reassembled_bytes); - if remaining_depth == 0 { - if !dir.depth_exceeded { - dir.depth_exceeded = true; + // Enforce max segments per direction to prevent BTreeMap overhead explosion + if self.segments.len() >= max_segments { + return InsertResult::DepthExceeded; } - return InsertResult::DepthExceeded; - } - let offset = seq_offset(seq, isn); - let mut segment_data = data.to_vec(); + // Track small segments (cumulative, not consecutive) + if data.len() < 8 { + self.small_segment_count += 1; + } - // Truncate if exceeding depth - let buffered = dir.buffered_bytes; - let total_after = dir.reassembled_bytes + buffered + segment_data.len(); - let truncated = if total_after > max_depth { - let allowed = 
max_depth.saturating_sub(dir.reassembled_bytes + buffered); - if allowed == 0 { - dir.depth_exceeded = true; + // Check depth limit + let remaining_depth = max_depth.saturating_sub(self.reassembled_bytes); + if remaining_depth == 0 { + if !self.depth_exceeded { + self.depth_exceeded = true; + } return InsertResult::DepthExceeded; } - segment_data.truncate(allowed); - dir.depth_exceeded = true; - true - } else { - false - }; - - let new_start = offset; - let new_end = offset + segment_data.len() as u64; - - // Check for overlaps with existing segments - let mut has_overlap = false; - let mut has_conflict = false; - let mut trimmed_ranges: Vec<(u64, u64)> = Vec::new(); - - // Only segments starting before new_end can overlap [new_start, new_end). - for (&existing_offset, existing_data) in dir.segments.range(..new_end) { - let existing_end = existing_offset + existing_data.len() as u64; - - if new_start < existing_end && new_end > existing_offset { - has_overlap = true; - - let overlap_start = new_start.max(existing_offset); - let overlap_end = new_end.min(existing_end); - - // Use slice comparison (SIMD-optimized) instead of byte-by-byte - let new_slice_start = (overlap_start - new_start) as usize; - let new_slice_end = (overlap_end - new_start) as usize; - let existing_slice_start = (overlap_start - existing_offset) as usize; - let existing_slice_end = (overlap_end - existing_offset) as usize; - - if new_slice_end <= segment_data.len() - && existing_slice_end <= existing_data.len() - && segment_data[new_slice_start..new_slice_end] - != existing_data[existing_slice_start..existing_slice_end] - { - has_conflict = true; + + let offset = seq_offset(seq, isn); + let mut segment_data = data.to_vec(); + + // Truncate if exceeding depth + let buffered = self.buffered_bytes; + let total_after = self.reassembled_bytes + buffered + segment_data.len(); + let truncated = if total_after > max_depth { + let allowed = max_depth.saturating_sub(self.reassembled_bytes + buffered); + 
if allowed == 0 { + self.depth_exceeded = true; + return InsertResult::DepthExceeded; } + segment_data.truncate(allowed); + self.depth_exceeded = true; + true + } else { + false + }; - trimmed_ranges.push((existing_offset, existing_end)); - } - } + let new_start = offset; + let new_end = offset + segment_data.len() as u64; - if has_overlap { - dir.overlap_count += 1; + // Check for overlaps with existing segments + let mut has_overlap = false; + let mut has_conflict = false; + let mut trimmed_ranges: Vec<(u64, u64)> = Vec::new(); - let fully_covered = trimmed_ranges - .iter() - .any(|&(es, ee)| es <= new_start && ee >= new_end); - if fully_covered { - return if has_conflict { - InsertResult::ConflictingOverlap - } else { - InsertResult::Duplicate - }; - } + // Only segments starting before new_end can overlap [new_start, new_end). + for (&existing_offset, existing_data) in self.segments.range(..new_end) { + let existing_end = existing_offset + existing_data.len() as u64; - // First-wins: insert only gap portions - let mut gaps: Vec<(u64, u64)> = Vec::new(); - let mut cursor = new_start; + if new_start < existing_end && new_end > existing_offset { + has_overlap = true; - let mut sorted_ranges = trimmed_ranges.clone(); - sorted_ranges.sort_by_key(|&(start, _)| start); + let overlap_start = new_start.max(existing_offset); + let overlap_end = new_end.min(existing_end); + + // Use slice comparison (SIMD-optimized) instead of byte-by-byte + let new_slice_start = (overlap_start - new_start) as usize; + let new_slice_end = (overlap_end - new_start) as usize; + let existing_slice_start = (overlap_start - existing_offset) as usize; + let existing_slice_end = (overlap_end - existing_offset) as usize; + + if new_slice_end <= segment_data.len() + && existing_slice_end <= existing_data.len() + && segment_data[new_slice_start..new_slice_end] + != existing_data[existing_slice_start..existing_slice_end] + { + has_conflict = true; + } - for &(es, ee) in &sorted_ranges { - if cursor 
< es { - gaps.push((cursor, es.min(new_end))); + trimmed_ranges.push((existing_offset, existing_end)); } - cursor = cursor.max(ee); - } - if cursor < new_end { - gaps.push((cursor, new_end)); } - let had_gap = !gaps.is_empty(); + if has_overlap { + self.overlap_count += 1; + + let fully_covered = trimmed_ranges + .iter() + .any(|&(es, ee)| es <= new_start && ee >= new_end); + if fully_covered { + return if has_conflict { + InsertResult::ConflictingOverlap + } else { + InsertResult::Duplicate + }; + } + + // First-wins: insert only gap portions + let mut gaps: Vec<(u64, u64)> = Vec::new(); + let mut cursor = new_start; - for (gap_start, gap_end) in gaps { - // Enforce max_segments inside gap insertion loop - if dir.segments.len() >= max_segments { - break; + let mut sorted_ranges = trimmed_ranges.clone(); + sorted_ranges.sort_by_key(|&(start, _)| start); + + for &(es, ee) in &sorted_ranges { + if cursor < es { + gaps.push((cursor, es.min(new_end))); + } + cursor = cursor.max(ee); + } + if cursor < new_end { + gaps.push((cursor, new_end)); } - let start_idx = (gap_start - new_start) as usize; - let end_idx = (gap_end - new_start) as usize; - if start_idx < segment_data.len() && end_idx <= segment_data.len() { - let gap_data = segment_data[start_idx..end_idx].to_vec(); - if !gap_data.is_empty() { - let gap_len = gap_data.len(); - let old = dir.segments.insert(gap_start, gap_data); - debug_assert!( - old.is_none(), - "gap_start {} collided with existing segment", - gap_start - ); - if let Some(old) = old { - dir.buffered_bytes -= old.len(); + + let had_gap = !gaps.is_empty(); + + for (gap_start, gap_end) in gaps { + // Enforce max_segments inside gap insertion loop + if self.segments.len() >= max_segments { + break; + } + let start_idx = (gap_start - new_start) as usize; + let end_idx = (gap_end - new_start) as usize; + if start_idx < segment_data.len() && end_idx <= segment_data.len() { + let gap_data = segment_data[start_idx..end_idx].to_vec(); + if 
!gap_data.is_empty() { + let gap_len = gap_data.len(); + let old = self.segments.insert(gap_start, gap_data); + debug_assert!( + old.is_none(), + "gap_start {} collided with existing segment", + gap_start + ); + if let Some(old) = old { + self.buffered_bytes -= old.len(); + } + self.buffered_bytes += gap_len; } - dir.buffered_bytes += gap_len; } } + + // Only report ConflictingOverlap when fully covered (no gap was inserted) + return if !had_gap && has_conflict { + InsertResult::ConflictingOverlap + } else if truncated { + InsertResult::Truncated + } else { + InsertResult::PartialOverlap + }; + } + + // No overlap — insert normally + let data_len = segment_data.len(); + let old = self.segments.insert(offset, segment_data); + debug_assert!( + old.is_none(), + "offset {} collided with existing segment in no-overlap path", + offset + ); + if let Some(old) = old { + self.buffered_bytes -= old.len(); } + self.buffered_bytes += data_len; - // Only report ConflictingOverlap when fully covered (no gap was inserted) - return if !had_gap && has_conflict { - InsertResult::ConflictingOverlap - } else if truncated { + if truncated { InsertResult::Truncated } else { - InsertResult::PartialOverlap - }; - } - - // No overlap — insert normally - let data_len = segment_data.len(); - let old = dir.segments.insert(offset, segment_data); - debug_assert!( - old.is_none(), - "offset {} collided with existing segment in no-overlap path", - offset - ); - if let Some(old) = old { - dir.buffered_bytes -= old.len(); + InsertResult::Inserted + } } - dir.buffered_bytes += data_len; - if truncated { - InsertResult::Truncated - } else { - InsertResult::Inserted - } -} + /// Flush contiguous segments starting from base_offset. + /// Returns Vec of (offset, data) pairs that were flushed. 
+ pub fn flush_contiguous(&mut self) -> Vec<(u64, Vec<u8>)> { + let mut flushed = Vec::new(); + + while let Some(data) = self.segments.remove(&self.base_offset) { + let offset = self.base_offset; + self.buffered_bytes -= data.len(); + self.base_offset += data.len() as u64; + self.reassembled_bytes += data.len(); + flushed.push((offset, data)); + } -/// Flush contiguous segments starting from base_offset. -/// Returns Vec of (offset, data) pairs that were flushed. -pub fn flush_contiguous(dir: &mut FlowDirection) -> Vec<(u64, Vec<u8>)> { - let mut flushed = Vec::new(); - - while let Some(data) = dir.segments.remove(&dir.base_offset) { - let offset = dir.base_offset; - dir.buffered_bytes -= data.len(); - dir.base_offset += data.len() as u64; - dir.reassembled_bytes += data.len(); - flushed.push((offset, data)); + + flushed } - - flushed } diff --git a/tests/reassembly_segment_tests.rs b/tests/reassembly_segment_tests.rs index c5ef1ef..2320f25 100644 --- a/tests/reassembly_segment_tests.rs +++ b/tests/reassembly_segment_tests.rs @@ -1,12 +1,12 @@ use wirerust::reassembly::flow::FlowDirection; -use wirerust::reassembly::segment::{InsertResult, flush_contiguous, insert_segment}; +use wirerust::reassembly::segment::InsertResult; #[test] fn test_insert_single_segment() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - let result = insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + let result = dir.insert_segment(1001, b"hello", 10_485_760, 10_000); assert_eq!(result, InsertResult::Inserted); assert_eq!(dir.segments.len(), 1); assert_eq!(dir.segments.get(&1), Some(&b"hello".to_vec())); @@ -17,9 +17,9 @@ fn test_flush_contiguous_single() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + dir.insert_segment(1001, b"hello", 10_485_760, 10_000); - let flushed = flush_contiguous(&mut dir); + let flushed = dir.flush_contiguous(); assert_eq!(flushed.len(), 1); assert_eq!(flushed[0].0, 1); // 
offset assert_eq!(flushed[0].1, b"hello"); @@ -33,10 +33,10 @@ fn test_flush_contiguous_ordered() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - insert_segment(&mut dir, 1001, b"aaa", 10_485_760, 10_000); - insert_segment(&mut dir, 1004, b"bbb", 10_485_760, 10_000); + dir.insert_segment(1001, b"aaa", 10_485_760, 10_000); + dir.insert_segment(1004, b"bbb", 10_485_760, 10_000); - let flushed = flush_contiguous(&mut dir); + let flushed = dir.flush_contiguous(); assert_eq!(flushed.len(), 2); assert_eq!(flushed[0].1, b"aaa"); assert_eq!(flushed[1].1, b"bbb"); @@ -50,13 +50,13 @@ fn test_out_of_order_buffering() { dir.set_isn(1000); // Insert segment 2 first (out of order) - insert_segment(&mut dir, 1004, b"bbb", 10_485_760, 10_000); - let flushed = flush_contiguous(&mut dir); + dir.insert_segment(1004, b"bbb", 10_485_760, 10_000); + let flushed = dir.flush_contiguous(); assert!(flushed.is_empty()); // Can't flush — gap at offset 1 // Now insert segment 1 - insert_segment(&mut dir, 1001, b"aaa", 10_485_760, 10_000); - let flushed = flush_contiguous(&mut dir); + dir.insert_segment(1001, b"aaa", 10_485_760, 10_000); + let flushed = dir.flush_contiguous(); assert_eq!(flushed.len(), 2); // Both flush now assert_eq!(flushed[0].1, b"aaa"); assert_eq!(flushed[1].1, b"bbb"); @@ -68,8 +68,8 @@ fn test_retransmission_dedup() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); - let result = insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + dir.insert_segment(1001, b"hello", 10_485_760, 10_000); + let result = dir.insert_segment(1001, b"hello", 10_485_760, 10_000); assert_eq!(result, InsertResult::Duplicate); assert_eq!(dir.segments.len(), 1); // No duplicate stored assert_eq!(dir.buffered_bytes, 5); // counter must not double-count @@ -81,15 +81,15 @@ fn test_overlap_first_wins() { dir.set_isn(1000); // Insert "AAABBB" at offset 1 - insert_segment(&mut dir, 1001, b"AAABBB", 10_485_760, 
10_000); + dir.insert_segment(1001, b"AAABBB", 10_485_760, 10_000); // Overlapping insert: "XXXCC" at offset 4 (overlaps with "BBB" at 4-6) - let result = insert_segment(&mut dir, 1004, b"XXXCC", 10_485_760, 10_000); + let result = dir.insert_segment(1004, b"XXXCC", 10_485_760, 10_000); assert_eq!(result, InsertResult::PartialOverlap); assert_eq!(dir.overlap_count, 1); // Flush and verify: first 6 bytes from original, then "CC" from new - let flushed = flush_contiguous(&mut dir); + let flushed = dir.flush_contiguous(); let all_bytes: Vec<u8> = flushed .iter() .flat_map(|(_, data)| data.iter().copied()) .collect(); assert_eq!(&all_bytes, b"AAABBBCC"); } @@ -102,15 +102,15 @@ fn test_overlap_conflicting_data_detected() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - insert_segment(&mut dir, 1001, b"AAAA", 10_485_760, 10_000); + dir.insert_segment(1001, b"AAAA", 10_485_760, 10_000); // Same range, different data - let result = insert_segment(&mut dir, 1001, b"BBBB", 10_485_760, 10_000); + let result = dir.insert_segment(1001, b"BBBB", 10_485_760, 10_000); assert_eq!(result, InsertResult::ConflictingOverlap); assert_eq!(dir.overlap_count, 1); // Original data preserved (first-wins) - let flushed = flush_contiguous(&mut dir); + let flushed = dir.flush_contiguous(); assert_eq!(flushed[0].1, b"AAAA"); } @@ -121,13 +121,13 @@ fn test_sequence_wraparound() { dir.set_isn(0xFFFF_FFF0); // First data byte at ISN+1 = 0xFFFF_FFF1, offset = 1 - insert_segment(&mut dir, 0xFFFF_FFF1, b"before", 10_485_760, 10_000); + dir.insert_segment(0xFFFF_FFF1, b"before", 10_485_760, 10_000); // Next segment wraps: seq = 0xFFFF_FFF1 + 6 = 0xFFFF_FFF7, offset = 7 - insert_segment(&mut dir, 0xFFFF_FFF7, b"wrap", 10_485_760, 10_000); + dir.insert_segment(0xFFFF_FFF7, b"wrap", 10_485_760, 10_000); // Another after wrap: seq = 0xFFFF_FFFB, offset = 11 - insert_segment(&mut dir, 0xFFFF_FFFB, b"around", 10_485_760, 10_000); + dir.insert_segment(0xFFFF_FFFB, b"around", 10_485_760, 10_000); - let flushed = flush_contiguous(&mut dir); + let 
flushed = dir.flush_contiguous(); let all_bytes: Vec<u8> = flushed .iter() .flat_map(|(_, data)| data.iter().copied()) .collect(); assert_eq!(&all_bytes, b"beforewraparound"); @@ -143,7 +143,7 @@ fn test_small_segment_tracking() { // Insert small segments for i in 0..5u32 { let seq = 1001 + i; - insert_segment(&mut dir, seq, b"a", 10_485_760, 10_000); + dir.insert_segment(seq, b"a", 10_485_760, 10_000); } assert_eq!(dir.small_segment_count, 5); @@ -153,9 +153,9 @@ fn test_buffered_bytes_after_insert() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + dir.insert_segment(1001, b"hello", 10_485_760, 10_000); assert_eq!(dir.buffered_bytes, 5); - insert_segment(&mut dir, 1006, b"world", 10_485_760, 10_000); + dir.insert_segment(1006, b"world", 10_485_760, 10_000); assert_eq!(dir.buffered_bytes, 10); } @@ -163,9 +163,9 @@ fn test_buffered_bytes_after_overlap() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - insert_segment(&mut dir, 1001, b"AAABBB", 10_485_760, 10_000); + dir.insert_segment(1001, b"AAABBB", 10_485_760, 10_000); assert_eq!(dir.buffered_bytes, 6); - insert_segment(&mut dir, 1004, b"XXXCC", 10_485_760, 10_000); + dir.insert_segment(1004, b"XXXCC", 10_485_760, 10_000); assert_eq!(dir.buffered_bytes, 8); // 6 original + 2 gap bytes } @@ -174,10 +174,10 @@ fn test_buffered_bytes_after_flush() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + dir.insert_segment(1001, b"hello", 10_485_760, 10_000); assert_eq!(dir.buffered_bytes, 5); - let flushed = flush_contiguous(&mut dir); + let flushed = dir.flush_contiguous(); assert_eq!(flushed.len(), 1); assert_eq!(dir.buffered_bytes, 0); } @@ -188,12 +188,12 @@ fn test_buffered_bytes_partial_flush() { dir.set_isn(1000); // Insert segment at offset 1 (contiguous) and offset 10 (gap) - insert_segment(&mut dir, 1001, b"aaa", 10_485_760, 10_000); - 
insert_segment(&mut dir, 1010, b"bbb", 10_485_760, 10_000); + dir.insert_segment(1001, b"aaa", 10_485_760, 10_000); + dir.insert_segment(1010, b"bbb", 10_485_760, 10_000); assert_eq!(dir.buffered_bytes, 6); // Flush only flushes contiguous segment at offset 1 - let flushed = flush_contiguous(&mut dir); + let flushed = dir.flush_contiguous(); assert_eq!(flushed.len(), 1); assert_eq!(flushed[0].1, b"aaa"); assert_eq!(dir.buffered_bytes, 3); // "bbb" remains buffered @@ -206,18 +206,18 @@ fn test_depth_limit_truncation() { let max_depth: usize = 100; // small for testing let data = vec![b'A'; 80]; - insert_segment(&mut dir, 1001, &data, max_depth, 10_000); - flush_contiguous(&mut dir); + dir.insert_segment(1001, &data, max_depth, 10_000); + dir.flush_contiguous(); assert_eq!(dir.reassembled_bytes, 80); assert!(!dir.depth_exceeded); // This should be truncated to 20 bytes let data2 = vec![b'B'; 50]; - let result = insert_segment(&mut dir, 1081, &data2, max_depth, 10_000); + let result = dir.insert_segment(1081, &data2, max_depth, 10_000); assert_eq!(result, InsertResult::Truncated); assert!(dir.depth_exceeded); - let flushed = flush_contiguous(&mut dir); + let flushed = dir.flush_contiguous(); assert_eq!(flushed[0].1.len(), 20); // truncated from 50 to 20 assert_eq!(dir.reassembled_bytes, 100); } @@ -228,21 +228,21 @@ fn test_overlap_detection_boundary() { dir.set_isn(1000); // Insert segment at offset 1, length 5 (covers 1-5) - insert_segment(&mut dir, 1001, b"AAAAA", 10_485_760, 10_000); + dir.insert_segment(1001, b"AAAAA", 10_485_760, 10_000); // Insert segment at offset 10, length 5 (covers 10-14) — no overlap with above - insert_segment(&mut dir, 1010, b"BBBBB", 10_485_760, 10_000); + dir.insert_segment(1010, b"BBBBB", 10_485_760, 10_000); assert_eq!(dir.segments.len(), 2); assert_eq!(dir.overlap_count, 0); // Insert segment at offset 3, length 4 (covers 3-6) — overlaps first, not second - let result = insert_segment(&mut dir, 1003, b"XXXX", 10_485_760, 10_000); + 
let result = dir.insert_segment(1003, b"XXXX", 10_485_760, 10_000); assert_eq!(result, InsertResult::PartialOverlap); assert_eq!(dir.overlap_count, 1); // Insert segment at offset 6, length 4 (covers 6-9). // The partial-overlap insert above deposited a 1-byte gap at offset 6 ("X"), // so this segment overlaps that byte and is PartialOverlap. - let result = insert_segment(&mut dir, 1006, b"CCCC", 10_485_760, 10_000); + let result = dir.insert_segment(1006, b"CCCC", 10_485_760, 10_000); assert_eq!(result, InsertResult::PartialOverlap); assert_eq!(dir.overlap_count, 2); } @@ -253,11 +253,11 @@ fn test_range_boundary_exact_new_end() { dir.set_isn(1000); // Insert segment at offset 1, length 5 (covers 1-5, ends at 6) - insert_segment(&mut dir, 1001, b"AAAAA", 10_485_760, 10_000); + dir.insert_segment(1001, b"AAAAA", 10_485_760, 10_000); // Insert segment starting exactly at the end of the first (offset 6) // This should NOT overlap — range(..new_end) must exclude it - let result = insert_segment(&mut dir, 1006, b"BBBBB", 10_485_760, 10_000); + let result = dir.insert_segment(1006, b"BBBBB", 10_485_760, 10_000); assert_eq!(result, InsertResult::Inserted); assert_eq!(dir.overlap_count, 0); } From a6b4b5fabfa37b58564363bf01316998e6225e89 Mon Sep 17 00:00:00 2001 From: Zious Date: Tue, 7 Apr 2026 13:20:18 -0500 Subject: [PATCH 3/6] refactor: extract close_flow helper, fix bytes_reassembled counting Deduplicates the flush-remove-notify pattern used by RST, FIN, expire, evict, and finalize into a single close_flow() method. Fixes minor inconsistency: expire/evict/finalize now count bytes_reassembled during final flush (RST/FIN already did). 
Part of #12 --- src/reassembly/mod.rs | 130 +++++++------------------------ tests/reassembly_engine_tests.rs | 44 +++++++++++ 2 files changed, 73 insertions(+), 101 deletions(-) diff --git a/src/reassembly/mod.rs b/src/reassembly/mod.rs index 550206f..f0a2597 100644 --- a/src/reassembly/mod.rs +++ b/src/reassembly/mod.rs @@ -160,29 +160,7 @@ impl TcpReassembler { if rst { flow.on_rst(); self.stats.flows_rst += 1; - let key_clone = key.clone(); - // Capture memory before flushing: total_memory still holds this flow's - // full contribution. Subtracting flow_mem after removal zeros it out. - let flow_mem = self - .flows - .get(&key_clone) - .expect("flow must exist before RST removal") - .memory_used(); - // Flush buffered contiguous data before removing - if let Some(flow) = self.flows.get_mut(&key_clone) { - use crate::reassembly::handler::Direction; - for dir in [Direction::ClientToServer, Direction::ServerToClient] { - let flow_dir = flow.get_direction_mut(dir); - let flushed = flow_dir.flush_contiguous(); - for (offset, data) in &flushed { - self.stats.bytes_reassembled += data.len() as u64; - handler.on_data(&key_clone, dir, data, *offset); - } - } - } - handler.on_flow_close(&key_clone, CloseReason::Rst); - self.flows.remove(&key_clone); - self.total_memory -= flow_mem; + self.close_flow(&key, CloseReason::Rst, handler); return; } @@ -312,28 +290,8 @@ impl TcpReassembler { .get(&key) .is_some_and(|f| f.state == FlowState::Closed) { - // Capture memory before flushing (see RST handler comment for rationale) - let flow_mem = self - .flows - .get(&key) - .expect("flow must exist before FIN removal") - .memory_used(); - // Flush remaining data in both directions before removal - if let Some(flow) = self.flows.get_mut(&key) { - use crate::reassembly::handler::Direction; - for dir in [Direction::ClientToServer, Direction::ServerToClient] { - let flow_dir = flow.get_direction_mut(dir); - let flushed = flow_dir.flush_contiguous(); - for (offset, data) in &flushed { - 
self.stats.bytes_reassembled += data.len() as u64; - handler.on_data(&key, dir, data, *offset); - } - } - } self.stats.flows_fin += 1; - handler.on_flow_close(&key, CloseReason::Fin); - self.flows.remove(&key); - self.total_memory -= flow_mem; + self.close_flow(&key, CloseReason::Fin, handler); } // 12. Evict flows if memcap exceeded @@ -356,52 +314,16 @@ impl TcpReassembler { .collect(); for key in expired_keys { - let flow_mem = self - .flows - .get(&key) - .expect("expired flow must exist") - .memory_used(); - // Flush salvageable data before removing - if let Some(flow) = self.flows.get_mut(&key) { - use crate::reassembly::handler::Direction; - for dir in [Direction::ClientToServer, Direction::ServerToClient] { - let flow_dir = flow.get_direction_mut(dir); - let flushed = flow_dir.flush_contiguous(); - for (offset, data) in &flushed { - handler.on_data(&key, dir, data, *offset); - } - } - } - self.flows.remove(&key); - self.total_memory -= flow_mem; self.stats.flows_expired += 1; - handler.on_flow_close(&key, CloseReason::Timeout); + self.close_flow(&key, CloseReason::Timeout, handler); } } /// Close all remaining flows (called at end of capture). 
pub fn finalize(&mut self, handler: &mut dyn StreamHandler) { - use crate::reassembly::handler::Direction; let all_keys: Vec<FlowKey> = self.flows.keys().cloned().collect(); for key in all_keys { - let flow_mem = self - .flows - .get(&key) - .expect("finalize flow must exist") - .memory_used(); - // Flush any remaining contiguous data before closing - if let Some(flow) = self.flows.get_mut(&key) { - for dir in [Direction::ClientToServer, Direction::ServerToClient] { - let flow_dir = flow.get_direction_mut(dir); - let flushed = flow_dir.flush_contiguous(); - for (offset, data) in &flushed { - handler.on_data(&key, dir, data, *offset); - } - } - } - self.flows.remove(&key); - self.total_memory -= flow_mem; - handler.on_flow_close(&key, CloseReason::Timeout); + self.close_flow(&key, CloseReason::Timeout, handler); } } @@ -422,6 +344,30 @@ impl TcpReassembler { // --- Private helpers --- + /// Flush remaining contiguous data in both directions, remove the flow, + /// update memory accounting, and notify the handler. + fn close_flow(&mut self, key: &FlowKey, reason: CloseReason, handler: &mut dyn StreamHandler) { + use crate::reassembly::handler::Direction; + let flow_mem = self + .flows + .get(key) + .expect("flow must exist before close") + .memory_used(); + if let Some(flow) = self.flows.get_mut(key) { + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flow_dir.flush_contiguous(); + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(key, dir, data, *offset); + } + } + } + self.flows.remove(key); + self.total_memory -= flow_mem; + handler.on_flow_close(key, reason); + } + /// Evict flows when memcap is exceeded. /// Strategy: evict non-established flows first (sorted by LRU), /// then established flows by LRU. 
@@ -447,26 +393,8 @@ impl TcpReassembler { { break; } - let flow_mem = self - .flows - .get(key) - .expect("eviction candidate must exist") - .memory_used(); - // Flush salvageable contiguous data before evicting - if let Some(flow) = self.flows.get_mut(key) { - use crate::reassembly::handler::Direction; - for dir in [Direction::ClientToServer, Direction::ServerToClient] { - let flow_dir = flow.get_direction_mut(dir); - let flushed = flow_dir.flush_contiguous(); - for (offset, data) in &flushed { - handler.on_data(key, dir, data, *offset); - } - } - } - self.flows.remove(key); - self.total_memory -= flow_mem; self.stats.evictions += 1; - handler.on_flow_close(key, CloseReason::MemoryPressure); + self.close_flow(key, CloseReason::MemoryPressure, handler); } } diff --git a/tests/reassembly_engine_tests.rs b/tests/reassembly_engine_tests.rs index 104834b..d409d8c 100644 --- a/tests/reassembly_engine_tests.rs +++ b/tests/reassembly_engine_tests.rs @@ -967,3 +967,47 @@ fn test_max_segments_per_direction() { reassembler.finalize(&mut handler); assert_eq!(reassembler.total_memory(), 0); } + +#[test] +fn test_finalize_bytes_reassembled_consistent() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + let syn = make_tcp_packet( + client, + 12345, + server, + 80, + 1000, + &[], + true, + false, + false, + false, + ); + reassembler.process_packet(&syn, 1, &mut handler); + + let data = make_tcp_packet( + client, 12345, server, 80, 1001, b"hello", false, false, false, false, + ); + reassembler.process_packet(&data, 2, &mut handler); + + let bytes_before_finalize = reassembler.stats().bytes_reassembled; + reassembler.finalize(&mut handler); + let bytes_after_finalize = reassembler.stats().bytes_reassembled; + + assert!(bytes_after_finalize >= bytes_before_finalize); + let total_delivered: u64 = handler + .data_events + 
.iter() + .map(|(_, _, data, _)| data.len() as u64) + .sum(); + assert_eq!( + bytes_after_finalize, total_delivered, + "bytes_reassembled must match total bytes delivered to handler" + ); +} From d944ccaa1c803f23c06d31c96dcf5f90e637cb0d Mon Sep 17 00:00:00 2001 From: Zious Date: Tue, 7 Apr 2026 13:57:52 -0500 Subject: [PATCH 4/6] feat: add max_receive_window to reject far-ahead segments Segments with offsets beyond base_offset + max_receive_window are rejected with InsertResult::OutOfWindow. Default 1MB matches Suricata/Zeek/Snort industry defaults. Counter-only, no Finding generated (matches industry practice). Part of #12 --- src/reassembly/mod.rs | 13 +++++ src/reassembly/segment.rs | 10 +++- tests/reassembly_engine_tests.rs | 60 ++++++++++++++++++++++ tests/reassembly_segment_tests.rs | 84 +++++++++++++++++++------------ 4 files changed, 135 insertions(+), 32 deletions(-) diff --git a/src/reassembly/mod.rs b/src/reassembly/mod.rs index f0a2597..ce264bf 100644 --- a/src/reassembly/mod.rs +++ b/src/reassembly/mod.rs @@ -27,6 +27,9 @@ pub struct ReassemblyConfig { pub max_flows: usize, /// Maximum segments per flow direction. Prevents BTreeMap overhead explosion. pub max_segments_per_direction: usize, + /// Maximum distance (bytes) ahead of base_offset to accept a segment. + /// Segments beyond this are dropped. Default 1MB matches Suricata/Zeek/Snort. 
+ pub max_receive_window: usize, } impl Default for ReassemblyConfig { @@ -37,6 +40,7 @@ impl Default for ReassemblyConfig { flow_timeout_secs: 300, // 5 minutes max_flows: 100_000, // 100K concurrent flows max_segments_per_direction: 10_000, // 10K segments per direction + max_receive_window: 1_048_576, // 1 MB forward window } } } @@ -55,6 +59,7 @@ pub struct ReassemblyStats { pub segments_inserted: u64, pub segments_duplicates: u64, pub segments_overlaps: u64, + pub segments_out_of_window: u64, pub bytes_reassembled: u64, pub evictions: u64, } @@ -77,6 +82,10 @@ impl TcpReassembler { config.max_segments_per_direction > 0, "max_segments_per_direction must be > 0" ); + assert!( + config.max_receive_window > 0, + "max_receive_window must be > 0" + ); TcpReassembler { config, flows: HashMap::new(), @@ -200,6 +209,7 @@ impl TcpReassembler { payload, self.config.max_depth, self.config.max_segments_per_direction, + self.config.max_receive_window, ); debug_assert!( flow_dir.buffered_bytes >= before_insert, @@ -227,6 +237,9 @@ impl TcpReassembler { InsertResult::DepthExceeded => { // Already tracked in the direction } + InsertResult::OutOfWindow => { + self.stats.segments_out_of_window += 1; + } } // Check anomaly thresholds on the direction diff --git a/src/reassembly/segment.rs b/src/reassembly/segment.rs index 32e14fa..e8d3489 100644 --- a/src/reassembly/segment.rs +++ b/src/reassembly/segment.rs @@ -8,6 +8,7 @@ pub enum InsertResult { ConflictingOverlap, Truncated, DepthExceeded, + OutOfWindow, } /// Compute the ISN-relative offset for a sequence number. 
@@ -24,6 +25,7 @@ impl FlowDirection { data: &[u8], max_depth: usize, max_segments: usize, + max_receive_window: usize, ) -> InsertResult { if data.is_empty() { return InsertResult::Inserted; @@ -37,6 +39,13 @@ impl FlowDirection { } }; + let offset = seq_offset(seq, isn); + + // Reject segments too far ahead of base_offset (before overlap/depth checks) + if offset > self.base_offset.saturating_add(max_receive_window as u64) { + return InsertResult::OutOfWindow; + } + // Enforce max segments per direction to prevent BTreeMap overhead explosion if self.segments.len() >= max_segments { return InsertResult::DepthExceeded; @@ -56,7 +65,6 @@ impl FlowDirection { return InsertResult::DepthExceeded; } - let offset = seq_offset(seq, isn); let mut segment_data = data.to_vec(); // Truncate if exceeding depth diff --git a/tests/reassembly_engine_tests.rs b/tests/reassembly_engine_tests.rs index d409d8c..a7f0eb1 100644 --- a/tests/reassembly_engine_tests.rs +++ b/tests/reassembly_engine_tests.rs @@ -1011,3 +1011,63 @@ fn test_finalize_bytes_reassembled_consistent() { "bytes_reassembled must match total bytes delivered to handler" ); } + +#[test] +fn test_out_of_window_segment_rejected_by_engine() { + let config = ReassemblyConfig { + max_receive_window: 1000, // small window for testing + ..ReassemblyConfig::default() + }; + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + // SYN (ISN=1000, base_offset=1) + let syn = make_tcp_packet( + client, + 12345, + server, + 80, + 1000, + &[], + true, + false, + false, + false, + ); + reassembler.process_packet(&syn, 1, &mut handler); + + // Normal data at offset 1 (within window) + let p1 = make_tcp_packet( + client, 12345, server, 80, 1001, b"hello", false, false, false, false, + ); + reassembler.process_packet(&p1, 2, &mut handler); + + assert_eq!(handler.data_events.len(), 1); + 
assert_eq!(reassembler.stats().segments_inserted, 1); + + // Segment way beyond window: base_offset=6, window=1000, so offset > 1006 is rejected + // seq = ISN + offset = 1000 + 2000 = 3000 + let far = make_tcp_packet( + client, 12345, server, 80, 3000, b"evil", false, false, false, false, + ); + reassembler.process_packet(&far, 3, &mut handler); + + // Should be rejected — no new data events, counter incremented + assert_eq!(handler.data_events.len(), 1); // unchanged + assert_eq!(reassembler.stats().segments_out_of_window, 1); + assert_eq!(reassembler.stats().segments_inserted, 1); // unchanged + + // Segment just within window should be accepted + // base_offset=6, window=1000, so offset 1006 is the last accepted + // seq = ISN + offset = 1000 + 1006 = 2006 + let edge = make_tcp_packet( + client, 12345, server, 80, 2006, b"ok", false, false, false, false, + ); + reassembler.process_packet(&edge, 4, &mut handler); + + assert_eq!(reassembler.stats().segments_inserted, 2); + assert_eq!(reassembler.stats().segments_out_of_window, 1); // unchanged +} diff --git a/tests/reassembly_segment_tests.rs b/tests/reassembly_segment_tests.rs index 2320f25..c5712e3 100644 --- a/tests/reassembly_segment_tests.rs +++ b/tests/reassembly_segment_tests.rs @@ -6,7 +6,7 @@ fn test_insert_single_segment() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - let result = dir.insert_segment(1001, b"hello", 10_485_760, 10_000); + let result = dir.insert_segment(1001, b"hello", 10_485_760, 10_000, 10_485_760); assert_eq!(result, InsertResult::Inserted); assert_eq!(dir.segments.len(), 1); assert_eq!(dir.segments.get(&1), Some(&b"hello".to_vec())); @@ -17,7 +17,7 @@ fn test_flush_contiguous_single() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - dir.insert_segment(1001, b"hello", 10_485_760, 10_000); + dir.insert_segment(1001, b"hello", 10_485_760, 10_000, 10_485_760); let flushed = dir.flush_contiguous(); assert_eq!(flushed.len(), 1); @@ -33,8 +33,8 @@ fn 
test_flush_contiguous_ordered() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - dir.insert_segment(1001, b"aaa", 10_485_760, 10_000); - dir.insert_segment(1004, b"bbb", 10_485_760, 10_000); + dir.insert_segment(1001, b"aaa", 10_485_760, 10_000, 10_485_760); + dir.insert_segment(1004, b"bbb", 10_485_760, 10_000, 10_485_760); let flushed = dir.flush_contiguous(); assert_eq!(flushed.len(), 2); @@ -50,12 +50,12 @@ fn test_out_of_order_buffering() { dir.set_isn(1000); // Insert segment 2 first (out of order) - dir.insert_segment(1004, b"bbb", 10_485_760, 10_000); + dir.insert_segment(1004, b"bbb", 10_485_760, 10_000, 10_485_760); let flushed = dir.flush_contiguous(); assert!(flushed.is_empty()); // Can't flush — gap at offset 1 // Now insert segment 1 - dir.insert_segment(1001, b"aaa", 10_485_760, 10_000); + dir.insert_segment(1001, b"aaa", 10_485_760, 10_000, 10_485_760); let flushed = dir.flush_contiguous(); assert_eq!(flushed.len(), 2); // Both flush now assert_eq!(flushed[0].1, b"aaa"); @@ -68,8 +68,8 @@ fn test_retransmission_dedup() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - dir.insert_segment(1001, b"hello", 10_485_760, 10_000); - let result = dir.insert_segment(1001, b"hello", 10_485_760, 10_000); + dir.insert_segment(1001, b"hello", 10_485_760, 10_000, 10_485_760); + let result = dir.insert_segment(1001, b"hello", 10_485_760, 10_000, 10_485_760); assert_eq!(result, InsertResult::Duplicate); assert_eq!(dir.segments.len(), 1); // No duplicate stored assert_eq!(dir.buffered_bytes, 5); // counter must not double-count @@ -81,10 +81,10 @@ fn test_overlap_first_wins() { dir.set_isn(1000); // Insert "AAABBB" at offset 1 - dir.insert_segment(1001, b"AAABBB", 10_485_760, 10_000); + dir.insert_segment(1001, b"AAABBB", 10_485_760, 10_000, 10_485_760); // Overlapping insert: "XXXCC" at offset 4 (overlaps with "BBB" at 4-6) - let result = dir.insert_segment(1004, b"XXXCC", 10_485_760, 10_000); + let result = dir.insert_segment(1004, b"XXXCC", 
10_485_760, 10_000, 10_485_760); assert_eq!(result, InsertResult::PartialOverlap); assert_eq!(dir.overlap_count, 1); @@ -102,10 +102,10 @@ fn test_overlap_conflicting_data_detected() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - dir.insert_segment(1001, b"AAAA", 10_485_760, 10_000); + dir.insert_segment(1001, b"AAAA", 10_485_760, 10_000, 10_485_760); // Same range, different data - let result = dir.insert_segment(1001, b"BBBB", 10_485_760, 10_000); + let result = dir.insert_segment(1001, b"BBBB", 10_485_760, 10_000, 10_485_760); assert_eq!(result, InsertResult::ConflictingOverlap); assert_eq!(dir.overlap_count, 1); @@ -121,11 +121,11 @@ fn test_sequence_wraparound() { dir.set_isn(0xFFFF_FFF0); // First data byte at ISN+1 = 0xFFFF_FFF1, offset = 1 - dir.insert_segment(0xFFFF_FFF1, b"before", 10_485_760, 10_000); + dir.insert_segment(0xFFFF_FFF1, b"before", 10_485_760, 10_000, 10_485_760); // Next segment wraps: seq = 0xFFFF_FFF1 + 6 = 0xFFFF_FFF7, offset = 7 - dir.insert_segment(0xFFFF_FFF7, b"wrap", 10_485_760, 10_000); + dir.insert_segment(0xFFFF_FFF7, b"wrap", 10_485_760, 10_000, 10_485_760); // Another after wrap: seq = 0xFFFF_FFFB, offset = 11 - dir.insert_segment(0xFFFF_FFFB, b"around", 10_485_760, 10_000); + dir.insert_segment(0xFFFF_FFFB, b"around", 10_485_760, 10_000, 10_485_760); let flushed = dir.flush_contiguous(); let all_bytes: Vec = flushed @@ -143,7 +143,7 @@ fn test_small_segment_tracking() { // Insert small segments for i in 0..5u32 { let seq = 1001 + i; - dir.insert_segment(seq, b"a", 10_485_760, 10_000); + dir.insert_segment(seq, b"a", 10_485_760, 10_000, 10_485_760); } assert_eq!(dir.small_segment_count, 5); @@ -153,9 +153,9 @@ fn test_small_segment_tracking() { fn test_buffered_bytes_after_insert() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - dir.insert_segment(1001, b"hello", 10_485_760, 10_000); + dir.insert_segment(1001, b"hello", 10_485_760, 10_000, 10_485_760); assert_eq!(dir.buffered_bytes, 5); - 
dir.insert_segment(1006, b"world", 10_485_760, 10_000); + dir.insert_segment(1006, b"world", 10_485_760, 10_000, 10_485_760); assert_eq!(dir.buffered_bytes, 10); } @@ -163,9 +163,9 @@ fn test_buffered_bytes_after_insert() { fn test_buffered_bytes_after_overlap() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - dir.insert_segment(1001, b"AAABBB", 10_485_760, 10_000); + dir.insert_segment(1001, b"AAABBB", 10_485_760, 10_000, 10_485_760); assert_eq!(dir.buffered_bytes, 6); - dir.insert_segment(1004, b"XXXCC", 10_485_760, 10_000); + dir.insert_segment(1004, b"XXXCC", 10_485_760, 10_000, 10_485_760); assert_eq!(dir.buffered_bytes, 8); // 6 original + 2 gap bytes } @@ -174,7 +174,7 @@ fn test_buffered_bytes_after_flush() { let mut dir = FlowDirection::new(); dir.set_isn(1000); - dir.insert_segment(1001, b"hello", 10_485_760, 10_000); + dir.insert_segment(1001, b"hello", 10_485_760, 10_000, 10_485_760); assert_eq!(dir.buffered_bytes, 5); let flushed = dir.flush_contiguous(); @@ -188,8 +188,8 @@ fn test_buffered_bytes_partial_flush() { dir.set_isn(1000); // Insert segment at offset 1 (contiguous) and offset 10 (gap) - dir.insert_segment(1001, b"aaa", 10_485_760, 10_000); - dir.insert_segment(1010, b"bbb", 10_485_760, 10_000); + dir.insert_segment(1001, b"aaa", 10_485_760, 10_000, 10_485_760); + dir.insert_segment(1010, b"bbb", 10_485_760, 10_000, 10_485_760); assert_eq!(dir.buffered_bytes, 6); // Flush only flushes contiguous segment at offset 1 @@ -206,14 +206,14 @@ fn test_depth_limit_truncation() { let max_depth: usize = 100; // small for testing let data = vec![b'A'; 80]; - dir.insert_segment(1001, &data, max_depth, 10_000); + dir.insert_segment(1001, &data, max_depth, 10_000, 10_485_760); dir.flush_contiguous(); assert_eq!(dir.reassembled_bytes, 80); assert!(!dir.depth_exceeded); // This should be truncated to 20 bytes let data2 = vec![b'B'; 50]; - let result = dir.insert_segment(1081, &data2, max_depth, 10_000); + let result = dir.insert_segment(1081, 
&data2, max_depth, 10_000, 10_485_760); assert_eq!(result, InsertResult::Truncated); assert!(dir.depth_exceeded); @@ -228,21 +228,21 @@ fn test_overlap_detection_boundary() { dir.set_isn(1000); // Insert segment at offset 1, length 5 (covers 1-5) - dir.insert_segment(1001, b"AAAAA", 10_485_760, 10_000); + dir.insert_segment(1001, b"AAAAA", 10_485_760, 10_000, 10_485_760); // Insert segment at offset 10, length 5 (covers 10-14) — no overlap with above - dir.insert_segment(1010, b"BBBBB", 10_485_760, 10_000); + dir.insert_segment(1010, b"BBBBB", 10_485_760, 10_000, 10_485_760); assert_eq!(dir.segments.len(), 2); assert_eq!(dir.overlap_count, 0); // Insert segment at offset 3, length 4 (covers 3-6) — overlaps first, not second - let result = dir.insert_segment(1003, b"XXXX", 10_485_760, 10_000); + let result = dir.insert_segment(1003, b"XXXX", 10_485_760, 10_000, 10_485_760); assert_eq!(result, InsertResult::PartialOverlap); assert_eq!(dir.overlap_count, 1); // Insert segment at offset 6, length 4 (covers 6-9). // The partial-overlap insert above deposited a 1-byte gap at offset 6 ("X"), // so this segment overlaps that byte and is PartialOverlap. 
- let result = dir.insert_segment(1006, b"CCCC", 10_485_760, 10_000); + let result = dir.insert_segment(1006, b"CCCC", 10_485_760, 10_000, 10_485_760); assert_eq!(result, InsertResult::PartialOverlap); assert_eq!(dir.overlap_count, 2); } @@ -253,11 +253,33 @@ fn test_range_boundary_exact_new_end() { dir.set_isn(1000); // Insert segment at offset 1, length 5 (covers 1-5, ends at 6) - dir.insert_segment(1001, b"AAAAA", 10_485_760, 10_000); + dir.insert_segment(1001, b"AAAAA", 10_485_760, 10_000, 10_485_760); // Insert segment starting exactly at the end of the first (offset 6) // This should NOT overlap — range(..new_end) must exclude it - let result = dir.insert_segment(1006, b"BBBBB", 10_485_760, 10_000); + let result = dir.insert_segment(1006, b"BBBBB", 10_485_760, 10_000, 10_485_760); assert_eq!(result, InsertResult::Inserted); assert_eq!(dir.overlap_count, 0); } + +#[test] +fn test_out_of_window_segment_rejected() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert normal segment at offset 1 (within any window) + let result = dir.insert_segment(1001, b"hello", 10_485_760, 10_000, 1_048_576); + assert_eq!(result, InsertResult::Inserted); + + dir.flush_contiguous(); // base_offset now 6 + + // Insert segment far beyond window: base_offset=6 + 1MB + 100 = way out of window + let far_seq = 1000 + 6 + 1_048_576 + 100; // ISN + base_offset + window + 100 + let result = dir.insert_segment(far_seq as u32, b"evil", 10_485_760, 10_000, 1_048_576); + assert_eq!(result, InsertResult::OutOfWindow); + + // Segment just inside window boundary should be accepted + let edge_seq = 1000 + 6 + 1_048_576; // ISN + base_offset + window (exactly at boundary) + let result = dir.insert_segment(edge_seq as u32, b"edge", 10_485_760, 10_000, 1_048_576); + assert_eq!(result, InsertResult::Inserted); +} From c7e4b2229059181467865c63001797a4bd055e22 Mon Sep 17 00:00:00 2001 From: Zious Date: Tue, 7 Apr 2026 14:16:28 -0500 Subject: [PATCH 5/6] fix: rewrite close_flow to 
single remove, add off-by-one boundary test Replaces triple lookup (.get + .get_mut + .remove) with single .remove() returning owned flow. Eliminates .expect() panic risk and simplifies memory accounting. Adds off-by-one boundary test for max_receive_window (window+1 rejected, window accepted). Part of #12 --- src/reassembly/mod.rs | 25 +++++++++++-------------- tests/reassembly_segment_tests.rs | 7 ++++++- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/src/reassembly/mod.rs b/src/reassembly/mod.rs index ce264bf..aa20c7d 100644 --- a/src/reassembly/mod.rs +++ b/src/reassembly/mod.rs @@ -361,22 +361,19 @@ impl TcpReassembler { /// update memory accounting, and notify the handler. fn close_flow(&mut self, key: &FlowKey, reason: CloseReason, handler: &mut dyn StreamHandler) { use crate::reassembly::handler::Direction; - let flow_mem = self - .flows - .get(key) - .expect("flow must exist before close") - .memory_used(); - if let Some(flow) = self.flows.get_mut(key) { - for dir in [Direction::ClientToServer, Direction::ServerToClient] { - let flow_dir = flow.get_direction_mut(dir); - let flushed = flow_dir.flush_contiguous(); - for (offset, data) in &flushed { - self.stats.bytes_reassembled += data.len() as u64; - handler.on_data(key, dir, data, *offset); - } + let Some(mut flow) = self.flows.remove(key) else { + debug_assert!(false, "close_flow called for non-existent key: {:?}", key); + return; + }; + let flow_mem = flow.memory_used(); + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flow_dir.flush_contiguous(); + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(key, dir, data, *offset); } } - self.flows.remove(key); self.total_memory -= flow_mem; handler.on_flow_close(key, reason); } diff --git a/tests/reassembly_segment_tests.rs b/tests/reassembly_segment_tests.rs index c5712e3..88bf578 100644 ---
a/tests/reassembly_segment_tests.rs +++ b/tests/reassembly_segment_tests.rs @@ -278,7 +278,12 @@ fn test_out_of_window_segment_rejected() { let result = dir.insert_segment(far_seq as u32, b"evil", 10_485_760, 10_000, 1_048_576); assert_eq!(result, InsertResult::OutOfWindow); - // Segment just inside window boundary should be accepted + // Segment exactly one byte beyond window should be rejected (off-by-one check) + let one_past_seq = 1000 + 6 + 1_048_576 + 1; // ISN + base_offset + window + 1 + let result = dir.insert_segment(one_past_seq as u32, b"x", 10_485_760, 10_000, 1_048_576); + assert_eq!(result, InsertResult::OutOfWindow); + + // Segment exactly at window boundary should be accepted let edge_seq = 1000 + 6 + 1_048_576; // ISN + base_offset + window (exactly at boundary) let result = dir.insert_segment(edge_seq as u32, b"edge", 10_485_760, 10_000, 1_048_576); assert_eq!(result, InsertResult::Inserted); From a05a7f5f8bb97d4c3fda8d26c4907cfbe0ae9364 Mon Sep 17 00:00:00 2001 From: Zious Date: Tue, 7 Apr 2026 14:20:27 -0500 Subject: [PATCH 6/6] docs: update README for TLS analyzer and reassembly improvements Adds TLS to protocol analysis, features, architecture diagram, and component table. Updates --tls flag description (no longer "coming soon"). Removes TLS from roadmap (implemented in #30). Adds max_receive_window to reassembly feature description. 
--- README.md | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/README.md b/README.md index 3723984..45be462 100644 --- a/README.md +++ b/README.md @@ -7,9 +7,10 @@ Inspired by [pcapper](https://github.com/SackOfHacks/pcapper) — reimagined for ## Features - **One-pass triage** — hosts, services, protocols, and threat signals from pcap files -- **Protocol analysis** — DNS and HTTP traffic analysis with extensible analyzer framework +- **Protocol analysis** — DNS, HTTP, and TLS traffic analysis with extensible analyzer framework - **HTTP forensics** — stream-level HTTP/1.x parsing with detection for path traversal, web shells, unusual methods, and anomalies -- **TCP stream reassembly** — forensic-grade reassembly engine with first-wins overlap policy, configurable depth/memory limits +- **TLS forensics** — ClientHello/ServerHello parsing, SNI extraction, JA3/JA3S fingerprinting, weak cipher and deprecated SSL 2.0/3.0 detection +- **TCP stream reassembly** — forensic-grade reassembly engine with first-wins overlap policy, configurable depth/memory/window limits - **Multi-link-type support** — Ethernet, Raw IP, IPv4, IPv6, and Linux Cooked (SLL) pcap formats - **Threat detection** — finding system with verdict/confidence scoring and MITRE ATT&CK mapping - **Multiple outputs** — colored terminal, JSON export @@ -85,7 +86,7 @@ Options: --threats Run threat detection --dns Analyze DNS traffic --http Analyze HTTP traffic (auto-enables reassembly) ---tls Analyze TLS handshakes (coming soon) +--tls Analyze TLS handshakes (SNI, JA3/JA3S, weak ciphers, deprecated SSL) --beacon Detect C2 beaconing patterns (coming soon) -a, --all Run all analyzers -f, --filter BPF filter expression @@ -98,7 +99,7 @@ PCAP file → Reader → Decoder → Analyzers → Reporter ↓ ↓ ↓ DataLink ParsedPacket Findings ↓ - Reassembly Engine → StreamAnalyzers (HTTP) + Reassembly Engine → StreamDispatcher → StreamAnalyzers (HTTP, TLS) ↓ Summary ``` @@ -108,6 +109,7 @@ PCAP file → 
Reader → Decoder → Analyzers → Reporter | Reader | `pcap-file` | Parse pcap files (5 link types) | | Decoder | `etherparse` | Zero-copy packet parsing | | HTTP Parser | `httparse` | HTTP/1.x request/response parsing | +| TLS Parser | `tls-parser` | TLS handshake parsing, JA3/JA3S | | Reassembly | (built-in) | TCP stream reassembly engine | | CLI | `clap` | Argument parsing | | Output | `owo-colors`, `serde_json` | Terminal + JSON | @@ -144,7 +146,6 @@ impl ProtocolAnalyzer for MyAnalyzer { See [open issues](https://github.com/Zious11/wirerust/issues) for planned features: -- TLS analyzer (JA3/JA4 fingerprinting) - C2 beaconing detection - CSV and SQLite export - MITRE ATT&CK mapping