diff --git a/docs/superpowers/plans/2026-04-06-tcp-reassembly.md b/docs/superpowers/plans/2026-04-06-tcp-reassembly.md new file mode 100644 index 0000000..4115840 --- /dev/null +++ b/docs/superpowers/plans/2026-04-06-tcp-reassembly.md @@ -0,0 +1,1787 @@ +# TCP Stream Reassembly Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Add forensic-grade TCP stream reassembly to wirerust with overlapping segment handling, memory limits, anomaly detection, and incremental stream delivery via callbacks. + +**Architecture:** Standalone `src/reassembly/` module between decoder and analyzers. FlowKey identifies connections, BTreeMap&lt;u64, Vec&lt;u8&gt;&gt; stores out-of-order segments keyed by ISN-relative offset, contiguous data is flushed to StreamHandler callbacks incrementally. First-wins overlap policy. Configurable depth (10MB/direction) and memcap (1GB global). + +**Tech Stack:** Rust 2024 edition. No new crate dependencies — uses std collections (HashMap, BTreeMap) and existing wirerust types (ParsedPacket, TransportInfo, Finding). 
+ +--- + +## File Structure + +``` +src/ +├── decoder.rs — MODIFY: add seq_number to TransportInfo::Tcp +├── cli.rs — MODIFY: add --reassemble, --no-reassemble, --reassembly-depth, --reassembly-memcap +├── lib.rs — MODIFY: add pub mod reassembly; +├── main.rs — MODIFY: wire reassembler into analyze pipeline +├── reassembly/ +│ ├── mod.rs — TcpReassembler, ReassemblyConfig, ReassemblyStats, public API +│ ├── flow.rs — FlowKey, TcpFlow, FlowDirection, FlowState, Direction, CloseReason +│ ├── segment.rs — insert_segment(), flush_contiguous(), overlap trimming +│ └── handler.rs — StreamHandler trait, StreamAnalyzer trait +tests/ +├── decoder_tests.rs — MODIFY: update tests for new seq_number field +├── reassembly_flow_tests.rs — FlowKey canonicalization, state transitions +├── reassembly_segment_tests.rs — Segment insertion, overlap, flush, wraparound +├── reassembly_engine_tests.rs — Full engine: depth limit, memcap, eviction, anomaly detection +``` + +--- + +### Task 1: Add seq_number to TransportInfo::Tcp + +**Files:** +- Modify: `src/decoder.rs` +- Modify: `tests/decoder_tests.rs` +- Modify: `tests/analyzer_tests.rs` +- Modify: `tests/summary_tests.rs` +- Modify: `tests/integration_test.rs` + +The reassembler needs the TCP sequence number from each packet. Currently `TransportInfo::Tcp` only has ports and flags. 
+ +- [ ] **Step 1: Update TransportInfo::Tcp to include seq_number** + +In `src/decoder.rs`, change the `Tcp` variant: + +```rust +#[derive(Debug, Clone)] +pub enum TransportInfo { + Tcp { + src_port: u16, + dst_port: u16, + seq_number: u32, + syn: bool, + ack: bool, + fin: bool, + rst: bool, + }, + Udp { + src_port: u16, + dst_port: u16, + }, + None, +} +``` + +In the `decode_packet` function, update the TCP match arm: + +```rust +Some(etherparse::TransportSlice::Tcp(tcp)) => ( + Protocol::Tcp, + TransportInfo::Tcp { + src_port: tcp.source_port(), + dst_port: tcp.destination_port(), + seq_number: tcp.header().sequence_number(), + syn: tcp.syn(), + ack: tcp.ack(), + fin: tcp.fin(), + rst: tcp.rst(), + }, +), +``` + +- [ ] **Step 2: Fix all test files that construct TransportInfo::Tcp** + +In `tests/decoder_tests.rs`, the existing tests use pattern matching with `..` so they should still compile. Verify by running: + +Run: `cargo test --test decoder_tests` + +In `tests/analyzer_tests.rs`, update `make_non_dns_packet()`: + +```rust +fn make_non_dns_packet() -> ParsedPacket { + ParsedPacket { + src_ip: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), + dst_ip: IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)), + protocol: Protocol::Tcp, + transport: TransportInfo::Tcp { + src_port: 12345, + dst_port: 80, + seq_number: 1000, + syn: true, + ack: false, + fin: false, + rst: false, + }, + payload: vec![], + packet_len: 54, + } +} +``` + +In `tests/summary_tests.rs`, update `make_parsed()`: + +```rust +fn make_parsed(src: [u8; 4], dst: [u8; 4], src_port: u16, dst_port: u16) -> ParsedPacket { + ParsedPacket { + src_ip: IpAddr::V4(Ipv4Addr::from(src)), + dst_ip: IpAddr::V4(Ipv4Addr::from(dst)), + protocol: Protocol::Tcp, + transport: TransportInfo::Tcp { + src_port, + dst_port, + seq_number: 1000, + syn: false, + ack: false, + fin: false, + rst: false, + }, + payload: vec![], + packet_len: 54, + } +} +``` + +- [ ] **Step 3: Run all tests** + +Run: `cargo test` +Expected: All 19 tests pass. 
+ +- [ ] **Step 4: Commit** + +```bash +git add src/decoder.rs tests/decoder_tests.rs tests/analyzer_tests.rs tests/summary_tests.rs tests/integration_test.rs +git commit -m "feat: add seq_number to TransportInfo::Tcp for reassembly" +``` + +--- + +### Task 2: StreamHandler and Flow Types (handler.rs + flow.rs) + +**Files:** +- Create: `src/reassembly/handler.rs` +- Create: `src/reassembly/flow.rs` +- Create: `src/reassembly/mod.rs` (stub) +- Modify: `src/lib.rs` +- Create: `tests/reassembly_flow_tests.rs` + +- [ ] **Step 1: Write the failing test for FlowKey canonicalization** + +Create `tests/reassembly_flow_tests.rs`: + +```rust +use std::net::{IpAddr, Ipv4Addr}; + +use wirerust::reassembly::flow::{FlowDirection, FlowKey, FlowState, TcpFlow}; +use wirerust::reassembly::handler::Direction; + +#[test] +fn test_flow_key_canonicalization() { + let ip_a = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + let ip_b = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)); + + let key_ab = FlowKey::new(ip_a, 12345, ip_b, 80); + let key_ba = FlowKey::new(ip_b, 80, ip_a, 12345); + + assert_eq!(key_ab, key_ba); + assert_eq!(key_ab.lower_ip, IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1))); + assert_eq!(key_ab.lower_port, 80); + assert_eq!(key_ab.upper_ip, IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2))); + assert_eq!(key_ab.upper_port, 12345); +} + +#[test] +fn test_flow_key_same_ip_different_ports() { + let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + + let key1 = FlowKey::new(ip, 80, ip, 12345); + let key2 = FlowKey::new(ip, 12345, ip, 80); + + assert_eq!(key1, key2); + assert_eq!(key1.lower_port, 80); + assert_eq!(key1.upper_port, 12345); +} + +#[test] +fn test_flow_direction_determines_client_server() { + let ip_client = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + let ip_server = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)); + + let mut flow = TcpFlow::new(FlowKey::new(ip_client, 12345, ip_server, 80), 1000); + flow.set_initiator(ip_client, 12345); + + assert_eq!( + flow.direction(ip_client, 12345), + 
Direction::ClientToServer + ); + assert_eq!( + flow.direction(ip_server, 80), + Direction::ServerToClient + ); +} + +#[test] +fn test_flow_state_transitions() { + let ip_a = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + let ip_b = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)); + + let mut flow = TcpFlow::new(FlowKey::new(ip_a, 12345, ip_b, 80), 1000); + assert_eq!(flow.state, FlowState::New); + + flow.on_syn(); + assert_eq!(flow.state, FlowState::SynSent); + + flow.on_syn_ack(); + assert_eq!(flow.state, FlowState::Established); + + flow.on_fin(); + assert_eq!(flow.state, FlowState::Closing); + + flow.on_fin(); + assert_eq!(flow.state, FlowState::Closed); +} + +#[test] +fn test_flow_rst_from_any_state() { + let ip_a = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + let ip_b = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)); + + let mut flow = TcpFlow::new(FlowKey::new(ip_a, 12345, ip_b, 80), 1000); + flow.on_syn(); + assert_eq!(flow.state, FlowState::SynSent); + + flow.on_rst(); + assert_eq!(flow.state, FlowState::Closed); +} + +#[test] +fn test_mid_stream_pickup() { + let ip_a = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + let ip_b = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)); + + let mut flow = TcpFlow::new(FlowKey::new(ip_a, 12345, ip_b, 80), 1000); + flow.on_data_without_syn(); + assert_eq!(flow.state, FlowState::Established); + assert!(flow.partial); +} + +#[test] +fn test_flow_direction_new() { + let dir = FlowDirection::new(); + assert_eq!(dir.isn, None); + assert_eq!(dir.base_offset, 0); + assert!(dir.segments.is_empty()); + assert_eq!(dir.reassembled_bytes, 0); + assert!(!dir.fin_seen); + assert!(!dir.rst_seen); + assert!(!dir.depth_exceeded); +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cargo test --test reassembly_flow_tests` +Expected: FAIL — module `reassembly` not found. 
+ +- [ ] **Step 3: Create handler.rs with traits and enums** + +Create `src/reassembly/handler.rs`: + +```rust +use crate::analyzer::AnalysisSummary; +use crate::findings::Finding; +use crate::reassembly::flow::FlowKey; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum Direction { + ClientToServer, + ServerToClient, +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum CloseReason { + Fin, + Rst, + Timeout, + MemoryPressure, +} + +pub trait StreamHandler { + fn on_data( + &mut self, + flow_key: &FlowKey, + direction: Direction, + data: &[u8], + offset: u64, + ); + + fn on_flow_close(&mut self, flow_key: &FlowKey, reason: CloseReason); +} + +pub trait StreamAnalyzer: StreamHandler { + fn name(&self) -> &'static str; + fn summarize(&self) -> AnalysisSummary; + fn findings(&self) -> Vec&lt;Finding&gt;; +} +``` + +- [ ] **Step 4: Create flow.rs with FlowKey, FlowDirection, TcpFlow, FlowState** + +Create `src/reassembly/flow.rs`: + +```rust +use std::collections::BTreeMap; +use std::net::IpAddr; + +use crate::reassembly::handler::Direction; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct FlowKey { + pub lower_ip: IpAddr, + pub lower_port: u16, + pub upper_ip: IpAddr, + pub upper_port: u16, +} + +impl FlowKey { + pub fn new(ip_a: IpAddr, port_a: u16, ip_b: IpAddr, port_b: u16) -> Self { + if (ip_a, port_a) <= (ip_b, port_b) { + FlowKey { + lower_ip: ip_a, + lower_port: port_a, + upper_ip: ip_b, + upper_port: port_b, + } + } else { + FlowKey { + lower_ip: ip_b, + lower_port: port_b, + upper_ip: ip_a, + upper_port: port_a, + } + } + } +} + +impl std::fmt::Display for FlowKey { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}:{} → {}:{}", + self.lower_ip, self.lower_port, self.upper_ip, self.upper_port + ) + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum FlowState { + New, + SynSent, + Established, + Closing, + Closed, + TimedOut, +} + +#[derive(Debug)] +pub struct FlowDirection { + pub isn: Option&lt;u32&gt;, + 
pub base_offset: u64, + pub segments: BTreeMap&lt;u64, Vec&lt;u8&gt;&gt;, + pub reassembled_bytes: usize, + pub overlap_count: u32, + pub small_segment_count: u32, + pub fin_seen: bool, + pub rst_seen: bool, + pub depth_exceeded: bool, +} + +impl FlowDirection { + pub fn new() -> Self { + FlowDirection { + isn: None, + base_offset: 0, + segments: BTreeMap::new(), + reassembled_bytes: 0, + overlap_count: 0, + small_segment_count: 0, + fin_seen: false, + rst_seen: false, + depth_exceeded: false, + } + } + + pub fn set_isn(&mut self, isn: u32) { + if self.isn.is_none() { + self.isn = Some(isn); + self.base_offset = 1; // ISN+1 is first data byte + } + } + + pub fn infer_isn(&mut self, first_seq: u32) { + if self.isn.is_none() { + self.isn = Some(first_seq.wrapping_sub(1)); + self.base_offset = 1; + } + } + + pub fn memory_used(&self) -> usize { + self.segments.values().map(|v| v.len()).sum() + } +} + +#[derive(Debug)] +pub struct TcpFlow { + pub key: FlowKey, + pub client_to_server: FlowDirection, + pub server_to_client: FlowDirection, + pub state: FlowState, + pub partial: bool, + pub first_seen: u32, + pub last_seen: u32, + initiator_ip: Option&lt;IpAddr&gt;, + initiator_port: Option&lt;u16&gt;, + fin_count: u8, +} + +impl TcpFlow { + pub fn new(key: FlowKey, timestamp: u32) -> Self { + TcpFlow { + key, + client_to_server: FlowDirection::new(), + server_to_client: FlowDirection::new(), + state: FlowState::New, + partial: false, + first_seen: timestamp, + last_seen: timestamp, + initiator_ip: None, + initiator_port: None, + fin_count: 0, + } + } + + pub fn set_initiator(&mut self, ip: IpAddr, port: u16) { + if self.initiator_ip.is_none() { + self.initiator_ip = Some(ip); + self.initiator_port = Some(port); + } + } + + pub fn direction(&self, src_ip: IpAddr, src_port: u16) -> Direction { + if self.initiator_ip == Some(src_ip) && self.initiator_port == Some(src_port) { + Direction::ClientToServer + } else { + Direction::ServerToClient + } + } + + pub fn get_direction_mut(&mut self, dir: Direction) -> &mut 
FlowDirection { + match dir { + Direction::ClientToServer => &mut self.client_to_server, + Direction::ServerToClient => &mut self.server_to_client, + } + } + + pub fn on_syn(&mut self) { + if self.state == FlowState::New { + self.state = FlowState::SynSent; + } + } + + pub fn on_syn_ack(&mut self) { + if self.state == FlowState::SynSent || self.state == FlowState::New { + self.state = FlowState::Established; + } + } + + pub fn on_data_without_syn(&mut self) { + if self.state == FlowState::New { + self.state = FlowState::Established; + self.partial = true; + } + } + + pub fn on_fin(&mut self) { + self.fin_count += 1; + if self.fin_count >= 2 { + self.state = FlowState::Closed; + } else if self.state == FlowState::Established || self.state == FlowState::SynSent { + self.state = FlowState::Closing; + } + } + + pub fn on_rst(&mut self) { + self.state = FlowState::Closed; + } + + pub fn memory_used(&self) -> usize { + self.client_to_server.memory_used() + self.server_to_client.memory_used() + } +} +``` + +- [ ] **Step 5: Create reassembly/mod.rs stub** + +Create `src/reassembly/mod.rs`: + +```rust +pub mod flow; +pub mod handler; +``` + +- [ ] **Step 6: Add reassembly module to lib.rs** + +Update `src/lib.rs`: + +```rust +pub mod analyzer; +pub mod cli; +pub mod decoder; +pub mod findings; +pub mod reader; +pub mod reassembly; +pub mod reporter; +pub mod summary; +``` + +- [ ] **Step 7: Run tests** + +Run: `cargo test --test reassembly_flow_tests` +Expected: 7 tests PASS. 
+ +- [ ] **Step 8: Commit** + +```bash +git add src/reassembly/ src/lib.rs tests/reassembly_flow_tests.rs +git commit -m "feat: add FlowKey, TcpFlow, FlowDirection, StreamHandler types" +``` + +--- + +### Task 3: Segment Insertion and Contiguous Flush (segment.rs) + +**Files:** +- Create: `src/reassembly/segment.rs` +- Modify: `src/reassembly/mod.rs` (add `pub mod segment;`) +- Create: `tests/reassembly_segment_tests.rs` + +- [ ] **Step 1: Write failing tests for segment operations** + +Create `tests/reassembly_segment_tests.rs`: + +```rust +use wirerust::reassembly::flow::FlowDirection; +use wirerust::reassembly::segment::{flush_contiguous, insert_segment, InsertResult}; + +#[test] +fn test_insert_single_segment() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + let result = insert_segment(&mut dir, 1001, b"hello", 10_485_760); + assert_eq!(result, InsertResult::Inserted); + assert_eq!(dir.segments.len(), 1); + assert_eq!(dir.segments.get(&1), Some(&b"hello".to_vec())); +} + +#[test] +fn test_flush_contiguous_single() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + insert_segment(&mut dir, 1001, b"hello", 10_485_760); + + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed.len(), 1); + assert_eq!(flushed[0].0, 1); // offset + assert_eq!(flushed[0].1, b"hello"); + assert_eq!(dir.base_offset, 6); // 1 + 5 + assert_eq!(dir.reassembled_bytes, 5); + assert!(dir.segments.is_empty()); +} + +#[test] +fn test_flush_contiguous_ordered() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + insert_segment(&mut dir, 1001, b"aaa", 10_485_760); + insert_segment(&mut dir, 1004, b"bbb", 10_485_760); + + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed.len(), 2); + assert_eq!(flushed[0].1, b"aaa"); + assert_eq!(flushed[1].1, b"bbb"); + assert_eq!(dir.base_offset, 7); // 1 + 3 + 3 + assert!(dir.segments.is_empty()); +} + +#[test] +fn test_out_of_order_buffering() { + let mut dir = FlowDirection::new(); + 
dir.set_isn(1000); + + // Insert segment 2 first (out of order) + insert_segment(&mut dir, 1004, b"bbb", 10_485_760); + let flushed = flush_contiguous(&mut dir); + assert!(flushed.is_empty()); // Can't flush — gap at offset 1 + + // Now insert segment 1 + insert_segment(&mut dir, 1001, b"aaa", 10_485_760); + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed.len(), 2); // Both flush now + assert_eq!(flushed[0].1, b"aaa"); + assert_eq!(flushed[1].1, b"bbb"); + assert_eq!(dir.base_offset, 7); +} + +#[test] +fn test_retransmission_dedup() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + insert_segment(&mut dir, 1001, b"hello", 10_485_760); + let result = insert_segment(&mut dir, 1001, b"hello", 10_485_760); + assert_eq!(result, InsertResult::Duplicate); + assert_eq!(dir.segments.len(), 1); // No duplicate stored +} + +#[test] +fn test_overlap_first_wins() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert "AAABBB" at offset 1 + insert_segment(&mut dir, 1001, b"AAABBB", 10_485_760); + + // Overlapping insert: "BBBCC" at offset 4 — bytes 4-6 match the buffered "BBB" + // (non-conflicting overlap), while "CC" at 7-8 is new data filling the gap + let result = insert_segment(&mut dir, 1004, b"BBBCC", 10_485_760); + assert_eq!(result, InsertResult::PartialOverlap); + assert_eq!(dir.overlap_count, 1); + + // Flush and verify: first 6 bytes from original, then "CC" from new + let flushed = flush_contiguous(&mut dir); + let all_bytes: Vec&lt;u8&gt; = flushed.iter().flat_map(|(_, data)| data.iter().copied()).collect(); + assert_eq!(&all_bytes, b"AAABBBCC"); +} + +#[test] +fn test_overlap_conflicting_data_detected() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + insert_segment(&mut dir, 1001, b"AAAA", 10_485_760); + + // Same range, different data + let result = insert_segment(&mut dir, 1001, b"BBBB", 10_485_760); + assert_eq!(result, InsertResult::ConflictingOverlap); + assert_eq!(dir.overlap_count, 1); + + // Original data preserved (first-wins) + let flushed = flush_contiguous(&mut dir); + 
assert_eq!(flushed[0].1, b"AAAA"); +} + +#[test] +fn test_sequence_wraparound() { + let mut dir = FlowDirection::new(); + // ISN near wraparound + dir.set_isn(0xFFFF_FFF0); + + // First data byte at ISN+1 = 0xFFFF_FFF1, offset = 1 + insert_segment(&mut dir, 0xFFFF_FFF1, b"before", 10_485_760); + // Next segment wraps: seq = 0xFFFF_FFF1 + 6 = 0xFFFF_FFF7, offset = 7 + insert_segment(&mut dir, 0xFFFF_FFF7, b"wrap", 10_485_760); + // Another after wrap: seq = 0xFFFF_FFFB, offset = 11 + insert_segment(&mut dir, 0xFFFF_FFFB, b"around", 10_485_760); + + let flushed = flush_contiguous(&mut dir); + let all_bytes: Vec&lt;u8&gt; = flushed.iter().flat_map(|(_, data)| data.iter().copied()).collect(); + assert_eq!(&all_bytes, b"beforewraparound"); +} + +#[test] +fn test_small_segment_tracking() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert small segments + for i in 0..5u32 { + let seq = 1001 + i; + insert_segment(&mut dir, seq, &[b'a'], 10_485_760); + } + + assert_eq!(dir.small_segment_count, 5); +} + +#[test] +fn test_depth_limit_truncation() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + let max_depth: usize = 100; // small for testing + let data = vec![b'A'; 80]; + insert_segment(&mut dir, 1001, &data, max_depth); + flush_contiguous(&mut dir); + assert_eq!(dir.reassembled_bytes, 80); + assert!(!dir.depth_exceeded); + + // This should be truncated to 20 bytes + let data2 = vec![b'B'; 50]; + let result = insert_segment(&mut dir, 1081, &data2, max_depth); + assert_eq!(result, InsertResult::Truncated); + assert!(dir.depth_exceeded); + + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed[0].1.len(), 20); // truncated from 50 to 20 + assert_eq!(dir.reassembled_bytes, 100); +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cargo test --test reassembly_segment_tests` +Expected: FAIL — module `segment` not found. 
+ +- [ ] **Step 3: Implement segment.rs** + +Create `src/reassembly/segment.rs`: + +```rust +use crate::reassembly::flow::FlowDirection; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum InsertResult { + Inserted, + Duplicate, + PartialOverlap, + ConflictingOverlap, + Truncated, + DepthExceeded, +} + +/// Compute the ISN-relative offset for a sequence number. +fn seq_offset(seq: u32, isn: u32) -> u64 { + seq.wrapping_sub(isn) as u64 +} + +/// Insert a segment into the flow direction's out-of-order buffer. +/// Applies first-wins overlap policy and tracks anomaly counters. +pub fn insert_segment( + dir: &mut FlowDirection, + seq: u32, + data: &[u8], + max_depth: usize, +) -> InsertResult { + if data.is_empty() { + return InsertResult::Inserted; + } + + let isn = match dir.isn { + Some(isn) => isn, + None => return InsertResult::Inserted, // no ISN yet, skip + }; + + // Track small segments + if data.len() < 8 { + dir.small_segment_count += 1; + } else { + dir.small_segment_count = 0; // reset on normal-sized segment + } + + // Check depth limit + let remaining_depth = max_depth.saturating_sub(dir.reassembled_bytes); + if remaining_depth == 0 { + if !dir.depth_exceeded { + dir.depth_exceeded = true; + } + return InsertResult::DepthExceeded; + } + + let offset = seq_offset(seq, isn); + let mut segment_data = data.to_vec(); + + // Truncate if exceeding depth + let buffered: usize = dir.segments.values().map(|v| v.len()).sum(); + let total_after = dir.reassembled_bytes + buffered + segment_data.len(); + let truncated = if total_after > max_depth { + let allowed = max_depth.saturating_sub(dir.reassembled_bytes + buffered); + if allowed == 0 { + dir.depth_exceeded = true; + return InsertResult::DepthExceeded; + } + segment_data.truncate(allowed); + dir.depth_exceeded = true; + true + } else { + false + }; + + let new_start = offset; + let new_end = offset + segment_data.len() as u64; + + // Check for overlaps with existing segments + let mut has_overlap = false; + 
let mut has_conflict = false; + let mut trimmed_ranges: Vec<(u64, u64)> = Vec::new(); + + // Collect existing segment ranges that overlap + for (&existing_offset, existing_data) in dir.segments.iter() { + let existing_end = existing_offset + existing_data.len() as u64; + + if new_start < existing_end && new_end > existing_offset { + // Overlap detected + has_overlap = true; + + // Check if overlapping region has different data (conflict) + let overlap_start = new_start.max(existing_offset); + let overlap_end = new_end.min(existing_end); + + for pos in overlap_start..overlap_end { + let new_idx = (pos - new_start) as usize; + let existing_idx = (pos - existing_offset) as usize; + if new_idx < segment_data.len() + && existing_idx < existing_data.len() + && segment_data[new_idx] != existing_data[existing_idx] + { + has_conflict = true; + break; + } + } + + trimmed_ranges.push((existing_offset, existing_end)); + } + } + + if has_overlap { + dir.overlap_count += 1; + + // Check if fully covered (duplicate/retransmission) + let fully_covered = trimmed_ranges.iter().any(|&(es, ee)| es <= new_start && ee >= new_end); + if fully_covered { + return if has_conflict { + InsertResult::ConflictingOverlap + } else { + InsertResult::Duplicate + }; + } + + // First-wins: trim new segment to only cover gaps + // Build list of gap regions within [new_start, new_end) + let mut gaps: Vec<(u64, u64)> = Vec::new(); + let mut cursor = new_start; + + // Sort existing overlapping ranges + let mut sorted_ranges = trimmed_ranges.clone(); + sorted_ranges.sort_by_key(|&(start, _)| start); + + for &(es, ee) in &sorted_ranges { + if cursor < es { + gaps.push((cursor, es.min(new_end))); + } + cursor = cursor.max(ee); + } + if cursor < new_end { + gaps.push((cursor, new_end)); + } + + // Insert only gap portions + for (gap_start, gap_end) in gaps { + let start_idx = (gap_start - new_start) as usize; + let end_idx = (gap_end - new_start) as usize; + if start_idx < segment_data.len() && end_idx <= 
segment_data.len() { + let gap_data = segment_data[start_idx..end_idx].to_vec(); + if !gap_data.is_empty() { + dir.segments.insert(gap_start, gap_data); + } + } + } + + return if has_conflict { + InsertResult::ConflictingOverlap + } else if truncated { + InsertResult::Truncated + } else { + InsertResult::PartialOverlap + }; + } + + // No overlap — insert normally + dir.segments.insert(offset, segment_data); + + if truncated { + InsertResult::Truncated + } else { + InsertResult::Inserted + } +} + +/// Flush contiguous segments starting from base_offset. +/// Returns Vec of (offset, data) pairs that were flushed. +pub fn flush_contiguous(dir: &mut FlowDirection) -> Vec<(u64, Vec&lt;u8&gt;)> { + let mut flushed = Vec::new(); + + loop { + if let Some(data) = dir.segments.remove(&dir.base_offset) { + let offset = dir.base_offset; + dir.base_offset += data.len() as u64; + dir.reassembled_bytes += data.len(); + flushed.push((offset, data)); + } else { + break; + } + } + + flushed +} +``` + +- [ ] **Step 4: Add segment module to reassembly/mod.rs** + +Update `src/reassembly/mod.rs`: + +```rust +pub mod flow; +pub mod handler; +pub mod segment; +``` + +- [ ] **Step 5: Run tests** + +Run: `cargo test --test reassembly_segment_tests` +Expected: 10 tests PASS. 
+ +- [ ] **Step 6: Commit** + +```bash +git add src/reassembly/segment.rs src/reassembly/mod.rs tests/reassembly_segment_tests.rs +git commit -m "feat: add segment insertion with first-wins overlap and contiguous flush" +``` + +--- + +### Task 4: TcpReassembler Engine (mod.rs) + +**Files:** +- Modify: `src/reassembly/mod.rs` +- Create: `tests/reassembly_engine_tests.rs` + +- [ ] **Step 1: Write failing tests for the engine** + +Create `tests/reassembly_engine_tests.rs`: + +```rust +use std::net::{IpAddr, Ipv4Addr}; + +use wirerust::decoder::{ParsedPacket, Protocol, TransportInfo}; +use wirerust::findings::Finding; +use wirerust::reassembly::flow::FlowKey; +use wirerust::reassembly::handler::{CloseReason, Direction, StreamHandler}; +use wirerust::reassembly::{ReassemblyConfig, ReassemblyStats, TcpReassembler}; + +/// Test handler that records all callbacks. +struct RecordingHandler { + data_events: Vec<(FlowKey, Direction, Vec&lt;u8&gt;, u64)>, + close_events: Vec<(FlowKey, CloseReason)>, +} + +impl RecordingHandler { + fn new() -> Self { + RecordingHandler { + data_events: Vec::new(), + close_events: Vec::new(), + } + } + + fn all_data(&self) -> Vec&lt;u8&gt; { + self.data_events + .iter() + .flat_map(|(_, _, data, _)| data.iter().copied()) + .collect() + } +} + +impl StreamHandler for RecordingHandler { + fn on_data( + &mut self, + flow_key: &FlowKey, + direction: Direction, + data: &[u8], + offset: u64, + ) { + self.data_events + .push((flow_key.clone(), direction, data.to_vec(), offset)); + } + + fn on_flow_close(&mut self, flow_key: &FlowKey, reason: CloseReason) { + self.close_events.push((flow_key.clone(), reason)); + } +} + +fn make_tcp_packet( + src_ip: [u8; 4], + src_port: u16, + dst_ip: [u8; 4], + dst_port: u16, + seq: u32, + payload: &[u8], + syn: bool, + fin: bool, + rst: bool, +) -> ParsedPacket { + ParsedPacket { + src_ip: IpAddr::V4(Ipv4Addr::from(src_ip)), + dst_ip: IpAddr::V4(Ipv4Addr::from(dst_ip)), + protocol: Protocol::Tcp, + transport: 
TransportInfo::Tcp { + src_port, + dst_port, + seq_number: seq, + syn, + ack: false, + fin, + rst, + }, + payload: payload.to_vec(), + packet_len: 54 + payload.len(), + } +} + +#[test] +fn test_three_packet_stream_ordered() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + // SYN + let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false); + reassembler.process_packet(&syn, 1, &mut handler); + + // Data packets + let p1 = make_tcp_packet(client, 12345, server, 80, 1001, b"aaa", false, false, false); + reassembler.process_packet(&p1, 2, &mut handler); + + let p2 = make_tcp_packet(client, 12345, server, 80, 1004, b"bbb", false, false, false); + reassembler.process_packet(&p2, 3, &mut handler); + + let p3 = make_tcp_packet(client, 12345, server, 80, 1007, b"ccc", false, false, false); + reassembler.process_packet(&p3, 4, &mut handler); + + assert_eq!(handler.all_data(), b"aaabbbccc"); + assert_eq!(handler.data_events.len(), 3); +} + +#[test] +fn test_out_of_order_delivery() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + // SYN + let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false); + reassembler.process_packet(&syn, 1, &mut handler); + + // Send packets [1, 3, 2] + let p1 = make_tcp_packet(client, 12345, server, 80, 1001, b"aaa", false, false, false); + reassembler.process_packet(&p1, 2, &mut handler); + + let p3 = make_tcp_packet(client, 12345, server, 80, 1007, b"ccc", false, false, false); + reassembler.process_packet(&p3, 3, &mut handler); + assert_eq!(handler.data_events.len(), 1); // only p1 flushed + + let p2 = make_tcp_packet(client, 12345, server, 80, 1004, b"bbb", false, false, false); 
+ reassembler.process_packet(&p2, 4, &mut handler); + + // Now all three should be flushed + assert_eq!(handler.all_data(), b"aaabbbccc"); +} + +#[test] +fn test_mid_stream_no_syn() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + // Data without SYN + let p1 = make_tcp_packet(client, 12345, server, 80, 5000, b"hello", false, false, false); + reassembler.process_packet(&p1, 1, &mut handler); + + assert_eq!(handler.all_data(), b"hello"); + + let stats = reassembler.stats(); + assert_eq!(stats.flows_total, 1); + assert_eq!(stats.flows_partial, 1); +} + +#[test] +fn test_rst_closes_flow() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false); + reassembler.process_packet(&syn, 1, &mut handler); + + let data = make_tcp_packet(client, 12345, server, 80, 1001, b"data", false, false, false); + reassembler.process_packet(&data, 2, &mut handler); + + let rst = make_tcp_packet(server, 80, client, 12345, 2000, &[], false, false, true); + reassembler.process_packet(&rst, 3, &mut handler); + + assert_eq!(handler.close_events.len(), 1); + assert_eq!(handler.close_events[0].1, CloseReason::Rst); +} + +#[test] +fn test_finalize_flushes_remaining() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false); + reassembler.process_packet(&syn, 1, &mut handler); + + let data = make_tcp_packet(client, 12345, server, 80, 1001, b"leftover", false, false, false); + 
reassembler.process_packet(&data, 2, &mut handler); + + // Finalize — should close all flows + reassembler.finalize(&mut handler); + + assert_eq!(handler.close_events.len(), 1); + assert_eq!(handler.close_events[0].1, CloseReason::Timeout); +} + +#[test] +fn test_flow_timeout_expiration() { + let config = ReassemblyConfig { + flow_timeout_secs: 10, + ..ReassemblyConfig::default() + }; + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false); + reassembler.process_packet(&syn, 100, &mut handler); + + // Expire at time 200 (100 seconds later, > 10s timeout) + reassembler.expire_flows(200, &mut handler); + + assert_eq!(handler.close_events.len(), 1); + assert_eq!(handler.close_events[0].1, CloseReason::Timeout); + + let stats = reassembler.stats(); + assert_eq!(stats.flows_expired, 1); +} +``` + +- [ ] **Step 2: Run test to verify it fails** + +Run: `cargo test --test reassembly_engine_tests` +Expected: FAIL — `TcpReassembler`, `ReassemblyConfig`, and `ReassemblyStats` are not yet implemented or exposed from `src/reassembly/mod.rs`. 
+
+- [ ] **Step 3: Implement TcpReassembler in mod.rs**
+
+Replace `src/reassembly/mod.rs` with:
+
+```rust
+pub mod flow;
+pub mod handler;
+pub mod segment;
+
+use std::collections::HashMap;
+
+use crate::decoder::{ParsedPacket, TransportInfo};
+use crate::findings::{Confidence, Finding, ThreatCategory, Verdict};
+use crate::reassembly::flow::{FlowKey, FlowState, TcpFlow};
+use crate::reassembly::handler::{CloseReason, StreamHandler};
+use crate::reassembly::segment::{flush_contiguous, insert_segment, InsertResult};
+
+#[derive(Debug, Clone)]
+pub struct ReassemblyConfig {
+    pub max_depth_per_direction: usize,
+    pub global_memcap: usize,
+    pub flow_timeout_secs: u32,
+}
+
+impl Default for ReassemblyConfig {
+    fn default() -> Self {
+        ReassemblyConfig {
+            max_depth_per_direction: 10_485_760, // 10MB
+            global_memcap: 1_073_741_824,        // 1GB
+            flow_timeout_secs: 300,              // 5 min
+        }
+    }
+}
+
+#[derive(Debug, Default)]
+pub struct ReassemblyStats {
+    pub flows_total: u64,
+    pub flows_partial: u64,
+    pub flows_expired: u64,
+    pub flows_evicted: u64,
+    pub packets_processed: u64,
+    pub packets_skipped: u64,
+    pub depth_exceeded_count: u64,
+    pub memcap_exceeded: bool,
+}
+
+pub struct TcpReassembler {
+    flows: HashMap<FlowKey, TcpFlow>,
+    config: ReassemblyConfig,
+    total_memory_used: usize,
+    stats: ReassemblyStats,
+    findings: Vec<Finding>,
+}
+
+impl TcpReassembler {
+    pub fn new(config: ReassemblyConfig) -> Self {
+        TcpReassembler {
+            flows: HashMap::new(),
+            config,
+            total_memory_used: 0,
+            stats: ReassemblyStats::default(),
+            findings: Vec::new(),
+        }
+    }
+
+    pub fn process_packet(
+        &mut self,
+        packet: &ParsedPacket,
+        timestamp: u32,
+        handler: &mut dyn StreamHandler,
+    ) {
+        // Only process TCP packets
+        let (src_port, dst_port, seq_number, syn, fin, rst) = match &packet.transport {
+            TransportInfo::Tcp {
+                src_port,
+                dst_port,
+                seq_number,
+                syn,
+                fin,
+                rst,
+                ..
+            } => (*src_port, *dst_port, *seq_number, *syn, *fin, *rst),
+            _ => {
+                self.stats.packets_skipped += 1;
+                return;
+            }
+        };
+
+        self.stats.packets_processed += 1;
+
+        let key = FlowKey::new(packet.src_ip, src_port, packet.dst_ip, dst_port);
+
+        // Get or create flow. The stats update happens outside the
+        // or_insert_with closure: mutating self.stats inside a closure passed
+        // to self.flows.entry() would not borrow-check (self.flows is already
+        // mutably borrowed).
+        let is_new_flow = !self.flows.contains_key(&key);
+        if is_new_flow {
+            self.stats.flows_total += 1;
+        }
+        let flow = self
+            .flows
+            .entry(key.clone())
+            .or_insert_with(|| TcpFlow::new(key.clone(), timestamp));
+        flow.last_seen = timestamp;
+
+        // Determine direction and handle flags
+        if syn && !flow.client_to_server.fin_seen {
+            flow.set_initiator(packet.src_ip, src_port);
+            if rst {
+                // SYN+RST is weird, ignore
+            } else if flow.state == FlowState::SynSent {
+                // SYN+ACK (server response)
+                flow.on_syn_ack();
+                flow.server_to_client.set_isn(seq_number);
+            } else {
+                // SYN (client initiating)
+                flow.on_syn();
+                flow.client_to_server.set_isn(seq_number);
+            }
+        }
+
+        if rst {
+            flow.on_rst();
+            handler.on_flow_close(&flow.key, CloseReason::Rst);
+            return;
+        }
+
+        if fin {
+            let dir = flow.direction(packet.src_ip, src_port);
+            flow.get_direction_mut(dir).fin_seen = true;
+            flow.on_fin();
+        }
+
+        // Process payload
+        if !packet.payload.is_empty() {
+            if flow.state == FlowState::New {
+                // Mid-stream pickup
+                flow.set_initiator(packet.src_ip, src_port);
+                flow.on_data_without_syn();
+                self.stats.flows_partial += 1;
+            }
+
+            let dir = flow.direction(packet.src_ip, src_port);
+            let flow_dir = flow.get_direction_mut(dir);
+
+            // Infer ISN if not set (mid-stream)
+            if flow_dir.isn.is_none() {
+                flow_dir.infer_isn(seq_number);
+            }
+
+            // Check memcap before inserting
+            // NOTE(review): calling self.evict_flows() here while `flow_dir`
+            // borrows out of self.flows will not borrow-check as written —
+            // the implementer will need to run eviction before taking the
+            // `flow`/`flow_dir` borrows, or re-look up the flow afterwards.
+            if self.total_memory_used + packet.payload.len() > self.config.global_memcap {
+                self.evict_flows(timestamp, handler);
+                self.stats.memcap_exceeded = true;
+                eprintln!(
+                    "Warning: reassembly memory cap reached, evicting flows. \
+                     Re-run with --reassembly-memcap to increase."
+ ); + } + + let result = insert_segment( + flow_dir, + seq_number, + &packet.payload, + self.config.max_depth_per_direction, + ); + + // Generate findings for anomalies + match result { + InsertResult::ConflictingOverlap => { + self.findings.push(Finding { + category: ThreatCategory::Anomaly, + verdict: Verdict::Likely, + confidence: Confidence::High, + summary: format!( + "Conflicting data in overlapping TCP segments on flow {}", + flow.key + ), + evidence: vec!["Possible insertion/evasion attack".into()], + mitre_technique: Some("T1036".into()), + source_ip: Some(packet.src_ip), + timestamp: None, + }); + } + InsertResult::Truncated => { + self.stats.depth_exceeded_count += 1; + self.findings.push(Finding { + category: ThreatCategory::Anomaly, + verdict: Verdict::Inconclusive, + confidence: Confidence::Low, + summary: format!( + "Flow {} exceeded reassembly depth ({}MB), stream truncated", + flow.key, + self.config.max_depth_per_direction / 1_048_576, + ), + evidence: vec![], + mitre_technique: None, + source_ip: None, + timestamp: None, + }); + } + InsertResult::DepthExceeded => { + // Already counted + } + _ => {} + } + + // Check overlap count threshold + if flow_dir.overlap_count == 51 { + self.findings.push(Finding { + category: ThreatCategory::Anomaly, + verdict: Verdict::Likely, + confidence: Confidence::Medium, + summary: format!( + "Excessive TCP segment overlaps on flow {} ({} overlaps)", + flow.key, flow_dir.overlap_count + ), + evidence: vec!["Possible evasion attempt".into()], + mitre_technique: Some("T1036".into()), + source_ip: Some(packet.src_ip), + timestamp: None, + }); + } + + // Check small segment flood + if flow_dir.small_segment_count == 2049 { + self.findings.push(Finding { + category: ThreatCategory::Anomaly, + verdict: Verdict::Inconclusive, + confidence: Confidence::Medium, + summary: format!( + "Excessive small TCP segments on flow {} ({} segments <8 bytes)", + flow.key, flow_dir.small_segment_count + ), + evidence: vec!["Possible IDS 
evasion".into()],
+                    mitre_technique: None,
+                    source_ip: Some(packet.src_ip),
+                    timestamp: None,
+                });
+            }
+
+            // Flush contiguous data
+            let flushed = flush_contiguous(flow_dir);
+            let dir_enum = dir;
+            for (offset, data) in &flushed {
+                self.total_memory_used = self.total_memory_used.saturating_sub(data.len());
+                handler.on_data(&flow.key, dir_enum, data, *offset);
+            }
+
+            // Track memory added by non-flushed segments
+            self.total_memory_used = self
+                .flows
+                .values()
+                .map(|f| f.memory_used())
+                .sum();
+        }
+    }
+
+    pub fn expire_flows(&mut self, current_time: u32, handler: &mut dyn StreamHandler) {
+        let timeout = self.config.flow_timeout_secs;
+        let expired_keys: Vec<FlowKey> = self
+            .flows
+            .iter()
+            .filter(|(_, flow)| {
+                flow.state != FlowState::Closed
+                    && current_time.saturating_sub(flow.last_seen) > timeout
+            })
+            .map(|(key, _)| key.clone())
+            .collect();
+
+        for key in expired_keys {
+            if let Some(mut flow) = self.flows.remove(&key) {
+                flow.state = FlowState::TimedOut;
+                self.total_memory_used = self.total_memory_used.saturating_sub(flow.memory_used());
+                handler.on_flow_close(&flow.key, CloseReason::Timeout);
+                self.stats.flows_expired += 1;
+            }
+        }
+    }
+
+    pub fn finalize(&mut self, handler: &mut dyn StreamHandler) {
+        let all_keys: Vec<FlowKey> = self.flows.keys().cloned().collect();
+        for key in all_keys {
+            if let Some(flow) = self.flows.remove(&key) {
+                self.total_memory_used = self.total_memory_used.saturating_sub(flow.memory_used());
+                handler.on_flow_close(&flow.key, CloseReason::Timeout);
+            }
+        }
+    }
+
+    pub fn stats(&self) -> &ReassemblyStats {
+        &self.stats
+    }
+
+    pub fn findings(&self) -> &[Finding] {
+        &self.findings
+    }
+
+    fn evict_flows(&mut self, _current_time: u32, handler: &mut dyn StreamHandler) {
+        // First evict non-established flows
+        let non_established: Vec<FlowKey> = self
+            .flows
+            .iter()
+            .filter(|(_, f)| {
+                f.state != FlowState::Established && f.state != FlowState::Closing
+            })
+            .map(|(k, _)| k.clone())
+            .collect();
+
+        for key in 
non_established { + if let Some(flow) = self.flows.remove(&key) { + self.total_memory_used = self.total_memory_used.saturating_sub(flow.memory_used()); + handler.on_flow_close(&flow.key, CloseReason::MemoryPressure); + self.stats.flows_evicted += 1; + } + if self.total_memory_used < self.config.global_memcap { + return; + } + } + + // Then evict LRU established flows + let mut by_last_seen: Vec<(FlowKey, u32)> = self + .flows + .iter() + .map(|(k, f)| (k.clone(), f.last_seen)) + .collect(); + by_last_seen.sort_by_key(|(_, ts)| *ts); + + for (key, _) in by_last_seen { + if self.total_memory_used < self.config.global_memcap { + return; + } + if let Some(flow) = self.flows.remove(&key) { + self.total_memory_used = self.total_memory_used.saturating_sub(flow.memory_used()); + handler.on_flow_close(&flow.key, CloseReason::MemoryPressure); + self.stats.flows_evicted += 1; + } + } + } +} +``` + +- [ ] **Step 4: Fix test imports** + +The test uses `mod_public` which was a placeholder. The actual types are at `wirerust::reassembly::{ReassemblyConfig, ReassemblyStats, TcpReassembler}`. Update the import in `tests/reassembly_engine_tests.rs`: + +Replace: +```rust +use wirerust::reassembly::mod_public::{ReassemblyConfig, ReassemblyStats, TcpReassembler}; +``` +With: +```rust +use wirerust::reassembly::{ReassemblyConfig, ReassemblyStats, TcpReassembler}; +``` + +- [ ] **Step 5: Run tests** + +Run: `cargo test --test reassembly_engine_tests` +Expected: 6 tests PASS. + +Run: `cargo test` +Expected: All tests pass (existing + new). 
+ +- [ ] **Step 6: Commit** + +```bash +git add src/reassembly/mod.rs tests/reassembly_engine_tests.rs +git commit -m "feat: add TcpReassembler engine with memcap, depth limits, and anomaly detection" +``` + +--- + +### Task 5: CLI Flags and main.rs Integration + +**Files:** +- Modify: `src/cli.rs` +- Modify: `src/main.rs` +- Modify: `tests/cli_tests.rs` + +- [ ] **Step 1: Add reassembly CLI flags** + +In `src/cli.rs`, add to the `Cli` struct: + +```rust +/// Force TCP stream reassembly on +#[arg(long, global = true)] +pub reassemble: bool, + +/// Force TCP stream reassembly off (quick scan) +#[arg(long, global = true)] +pub no_reassemble: bool, + +/// Per-direction stream reassembly limit in MB (default: 10) +#[arg(long, global = true, default_value_t = 10)] +pub reassembly_depth: usize, + +/// Global reassembly memory cap in MB (default: 1024) +#[arg(long, global = true, default_value_t = 1024)] +pub reassembly_memcap: usize, +``` + +- [ ] **Step 2: Add CLI test for reassembly flags** + +Add to `tests/cli_tests.rs`: + +```rust +#[test] +fn test_reassembly_flags() { + let cli = Cli::parse_from([ + "wirerust", + "analyze", + "test.pcap", + "--reassemble", + "--reassembly-depth", + "20", + "--reassembly-memcap", + "2048", + ]); + assert!(cli.reassemble); + assert_eq!(cli.reassembly_depth, 20); + assert_eq!(cli.reassembly_memcap, 2048); +} + +#[test] +fn test_no_reassemble_flag() { + let cli = Cli::parse_from(["wirerust", "analyze", "test.pcap", "--no-reassemble"]); + assert!(cli.no_reassemble); +} +``` + +- [ ] **Step 3: Wire reassembler into main.rs** + +Update `src/main.rs` — add imports and modify `run_analyze`: + +Add to imports: +```rust +use wirerust::reassembly::{ReassemblyConfig, TcpReassembler}; +use wirerust::reassembly::handler::StreamHandler; +``` + +Update `run_analyze` to create and use the reassembler: + +```rust +fn run_analyze( + targets: &[std::path::PathBuf], + enable_dns: bool, + use_color: bool, + cli: &Cli, +) -> Result<()> { + let mut summary 
= Summary::new(); + let mut dns_analyzer = DnsAnalyzer::new(); + let mut all_findings = Vec::new(); + + // Determine if reassembly is needed + let needs_reassembly = cli.reassemble; // Will expand when HTTP/TLS analyzers added + let skip_reassembly = cli.no_reassemble; + + let mut reassembler = if needs_reassembly && !skip_reassembly { + let config = ReassemblyConfig { + max_depth_per_direction: cli.reassembly_depth * 1_048_576, + global_memcap: cli.reassembly_memcap * 1_048_576, + ..ReassemblyConfig::default() + }; + Some(TcpReassembler::new(config)) + } else { + None + }; + + // Placeholder handler for now — will be replaced by actual stream analyzers + struct NullHandler; + impl StreamHandler for NullHandler { + fn on_data(&mut self, _: &wirerust::reassembly::flow::FlowKey, _: wirerust::reassembly::handler::Direction, _: &[u8], _: u64) {} + fn on_flow_close(&mut self, _: &wirerust::reassembly::flow::FlowKey, _: wirerust::reassembly::handler::CloseReason) {} + } + let mut stream_handler = NullHandler; + + for target in targets { + let pcap_files = resolve_targets(target)?; + for path in &pcap_files { + let source = PcapSource::from_file(path) + .with_context(|| format!("Failed to read {}", path.display()))?; + + let pb = ProgressBar::new(source.packets.len() as u64); + pb.set_style(ProgressStyle::with_template( + "[{elapsed_precise}] {bar:40} {pos}/{len} packets", + )?); + + for raw in &source.packets { + if let Ok(parsed) = decode_packet(&raw.data) { + summary.ingest(&parsed); + + if enable_dns && dns_analyzer.can_decode(&parsed) { + let findings = dns_analyzer.analyze(&parsed); + all_findings.extend(findings); + } + + if let Some(ref mut reasm) = reassembler { + reasm.process_packet(&parsed, raw.timestamp_secs, &mut stream_handler); + } + } + pb.inc(1); + } + pb.finish_and_clear(); + } + } + + // Finalize reassembler + if let Some(ref mut reasm) = reassembler { + reasm.finalize(&mut stream_handler); + all_findings.extend(reasm.findings().to_vec()); + } + + let 
analyzer_summaries = if enable_dns { + vec![dns_analyzer.summarize()] + } else { + vec![] + }; + + let output = match cli.output_format { + Some(OutputFormat::Json) => { + let reporter = JsonReporter; + reporter.render(&summary, &all_findings, &analyzer_summaries) + } + _ => { + let reporter = TerminalReporter { use_color }; + reporter.render(&summary, &all_findings, &analyzer_summaries) + } + }; + + println!("{output}"); + Ok(()) +} +``` + +- [ ] **Step 4: Run all tests** + +Run: `cargo test` +Expected: All tests pass. + +Run: `cargo clippy --all-targets -- -D warnings` +Expected: No errors. + +Run: `cargo run -- --help` +Expected: Shows `--reassemble`, `--no-reassemble`, `--reassembly-depth`, `--reassembly-memcap` flags. + +- [ ] **Step 5: Commit** + +```bash +git add src/cli.rs src/main.rs tests/cli_tests.rs +git commit -m "feat: wire TCP reassembler into CLI and analyze pipeline" +``` + +--- + +### Task 6: Final Validation and Push + +- [ ] **Step 1: Run full test suite** + +Run: `cargo test` +Expected: All tests pass. + +Run: `cargo clippy --all-targets -- -D warnings` +Expected: No errors. + +Run: `cargo fmt --all --check` +Expected: No formatting issues. 
+ +- [ ] **Step 2: Push branch** + +```bash +git push -u origin feature/tcp-reassembly +``` + +- [ ] **Step 3: Create PR** + +```bash +gh pr create --repo Zious11/wirerust --base develop --title "feat: add TCP stream reassembly engine" --body "$(cat <<'EOF' +## Summary +- Forensic-grade TCP stream reassembly module (`src/reassembly/`) +- FlowKey canonicalization, ISN-relative u64 offsets, BTreeMap segment storage +- First-wins overlap policy with anomaly detection (conflicting data, excessive overlaps, small segment floods) +- Configurable depth limit (10MB/direction) and global memcap (1GB) with LRU eviction +- Mid-stream pickup (missing SYN) with partial flow flagging +- Incremental stream delivery via StreamHandler callbacks +- CLI flags: `--reassemble`, `--no-reassemble`, `--reassembly-depth`, `--reassembly-memcap` + +Closes: n/a (infrastructure for #1, #2) + +## Test plan +- [ ] `cargo test` — all tests pass +- [ ] `cargo clippy -- -D warnings` — clean +- [ ] `cargo fmt --check` — clean +- [ ] Segment tests: ordered, out-of-order, overlap, retransmit, wraparound, depth truncation +- [ ] Engine tests: three-packet stream, OOO delivery, mid-stream, RST, finalize, timeout +- [ ] Flow tests: canonicalization, state transitions, direction detection +EOF +)" +``` diff --git a/docs/superpowers/specs/2026-04-06-tcp-reassembly-design.md b/docs/superpowers/specs/2026-04-06-tcp-reassembly-design.md new file mode 100644 index 0000000..0df2f8c --- /dev/null +++ b/docs/superpowers/specs/2026-04-06-tcp-reassembly-design.md @@ -0,0 +1,374 @@ +# TCP Stream Reassembly — Design Spec + +## Goal + +Add forensic-grade TCP stream reassembly to wirerust so that TCP-based protocol analyzers (HTTP, TLS, SMB, etc.) can operate on complete, ordered byte streams instead of individual packet payloads. + +## Why + +Without reassembly, TCP-based analyzers only see whatever fits in a single packet. 
HTTP requests span multiple segments, TLS ClientHellos can be fragmented, and any protocol over TCP is unreliable to parse packet-by-packet. pcapper (Python/Scapy) has basic reassembly via `TCPSession` but handles retransmissions and overlapping segments poorly. wirerust's reassembly is a correctness advantage over pcapper, not just a speed advantage.
+
+## Architecture
+
+The reassembly module sits between the decoder and stream-based analyzers:
+
+```
+Reader → Decoder → TcpReassembler ──→ StreamAnalyzers (HTTP, TLS, SMB...)
+                          │
+                          └──→ per-packet Analyzers (DNS, port scan...)
+```
+
+Every decoded packet goes through both paths. The reassembler tracks TCP flows and delivers contiguous byte streams to stream analyzers via callbacks. Per-packet analyzers (DNS, etc.) continue to receive individual packets unchanged.
+
+## Core Data Model
+
+### FlowKey
+
+Identifies a TCP connection. Canonicalized so both directions map to the same key.
+
+```rust
+pub struct FlowKey {
+    pub lower_ip: IpAddr,
+    pub lower_port: u16,
+    pub upper_ip: IpAddr,
+    pub upper_port: u16,
+}
+```
+
+Canonicalization: compare `(ip, port)` tuples lexicographically; the smaller one is `lower`. This means `A→B` and `B→A` produce the same `FlowKey`.
+
+### FlowDirection
+
+One side of a TCP connection. Each flow has two: client→server and server→client.
+
+```rust
+pub struct FlowDirection {
+    pub isn: Option<u32>,
+    pub base_offset: u64,
+    pub segments: BTreeMap<u64, Vec<u8>>,
+    pub reassembled_bytes: usize,
+    pub overlap_count: u32,
+    pub small_segment_count: u32,
+    pub fin_seen: bool,
+    pub rst_seen: bool,
+    pub depth_exceeded: bool,
+}
+```
+
+- `isn`: Initial Sequence Number. Set from SYN or inferred from first data packet.
+- `base_offset`: The next contiguous byte expected, ISN-relative. Starts at 1 (ISN+1 is the first data byte after SYN). Uses `u64` to handle streams >4GB without key-space wraparound.
+- `segments`: Out-of-order buffer. 
Keyed by ISN-relative offset as `u64` (`(seq.wrapping_sub(isn)) as u64`). BTreeMap provides ordered iteration for flush. +- `reassembled_bytes`: Total bytes flushed so far. Used to enforce depth limit. +- `overlap_count`: Number of overlapping segments seen. If >50, generate an evasion-attempt Finding. +- `small_segment_count`: Consecutive segments <8 bytes. If >2048, generate an evasion-attempt Finding. +- `fin_seen`, `rst_seen`: Terminal flag tracking. +- `depth_exceeded`: Set when `reassembled_bytes` exceeds the per-direction limit. + +### TcpFlow + +A complete TCP connection. + +```rust +pub struct TcpFlow { + pub key: FlowKey, + pub client_to_server: FlowDirection, + pub server_to_client: FlowDirection, + pub state: FlowState, + pub partial: bool, + pub first_seen: u32, + pub last_seen: u32, +} +``` + +- `state`: `New`, `SynSent`, `Established`, `Closing`, `Closed`, `TimedOut`. +- `partial`: `true` if the flow was picked up mid-stream (no SYN observed). Forensic reports include this flag. + +### FlowState Transitions + +``` +New → SynSent (SYN seen) +SynSent → Established (SYN+ACK seen, or data seen) +New → Established (data without SYN — mid-stream pickup, sets partial=true) +Established → Closing (FIN seen on either direction) +Closing → Closed (FIN seen on both directions, or timeout) +Any → Closed (RST seen) +Any → TimedOut (flow_timeout_secs exceeded) +``` + +## Sequence Number Handling + +All segment keys are stored as ISN-relative offsets cast to `u64`: `(seq.wrapping_sub(isn)) as u64`. The wrapping subtraction handles the 32-bit sequence space correctly, and promoting to `u64` ensures BTreeMap key ordering works for streams of any size (including >4GB transfers that wrap the sequence space). 
+ +Comparison helpers for raw seq numbers use wrapping arithmetic: + +```rust +fn seq_before(a: u32, b: u32) -> bool { + // a is "before" b in TCP sequence space (signed comparison of difference) + (a.wrapping_sub(b) as i32) < 0 +} + +fn seq_offset(seq: u32, isn: u32) -> u64 { + seq.wrapping_sub(isn) as u64 +} +``` + +## Overlap Handling + +**Policy: first-wins (hardcoded).** When a new segment overlaps existing data, the existing bytes are kept. This matches the behavior of Windows, macOS, and BSD — the majority of real-world targets. It also matches what Zeek and NetworkMiner do. + +Implementation: on segment insertion, check BTreeMap neighbors. If the new segment's range overlaps any existing segment, trim the new one to only cover gaps. If fully covered, discard it (retransmission dedup). + +**Anomaly detection on overlaps:** +- Increment `overlap_count` on every overlap. +- If `overlap_count > 50` on a flow direction, generate a Finding: `[Anomaly] LIKELY (MEDIUM) — Excessive TCP segment overlaps on flow {key} ({count} overlaps), possible evasion attempt. MITRE: T1036.` +- If overlapping data differs from existing data (not a simple retransmit), generate: `[Anomaly] LIKELY (HIGH) — Conflicting data in overlapping TCP segments on flow {key}, possible insertion/evasion attack.` + +**Depth truncation mid-segment:** When inserting a segment that would exceed `max_depth_per_direction`, truncate it to `depth - reassembled_bytes` rather than dropping entirely. This captures as much as possible before cutting off. + +**Small segment flood detection:** Track consecutive segments <8 bytes in `small_segment_count`. If >2048, generate: `[Anomaly] INCONCLUSIVE (MEDIUM) — Excessive small TCP segments on flow {key} ({count} segments <8 bytes), possible IDS evasion.` + +## Mid-Stream Pickup + +If data arrives for a flow with no SYN observed: +1. Set `isn` to the first segment's sequence number minus 1. +2. Set state to `Established`. +3. Set `partial = true`. +4. 
Reassembly proceeds normally from there.
+
+This handles common scenarios: pcap capture started after connection was established, asymmetric SPAN port configs, or SYN packets dropped.
+
+## Reassembly Engine
+
+### TcpReassembler
+
+```rust
+pub struct TcpReassembler {
+    flows: HashMap<FlowKey, TcpFlow>,
+    config: ReassemblyConfig,
+    total_memory_used: usize,
+}
+
+pub struct ReassemblyConfig {
+    pub max_depth_per_direction: usize, // default: 10MB (10_485_760)
+    pub global_memcap: usize,           // default: 1GB (1_073_741_824)
+    pub flow_timeout_secs: u32,         // default: 300
+}
+```
+
+### Public API
+
+```rust
+impl TcpReassembler {
+    pub fn new(config: ReassemblyConfig) -> Self;
+
+    /// Process a decoded packet. Calls handler callbacks when new
+    /// contiguous data becomes available. Uses pcap timestamp for
+    /// timeout tracking (not wall clock).
+    pub fn process_packet(
+        &mut self,
+        packet: &ParsedPacket,
+        timestamp: u32,
+        handler: &mut dyn StreamHandler,
+    );
+
+    /// Expire flows older than flow_timeout_secs.
+    /// Evicts non-established first, then LRU.
+    pub fn expire_flows(
+        &mut self,
+        current_time: u32,
+        handler: &mut dyn StreamHandler,
+    );
+
+    /// Expire all remaining flows. Call at end of pcap.
+    pub fn finalize(&mut self, handler: &mut dyn StreamHandler);
+
+    pub fn stats(&self) -> &ReassemblyStats;
+}
+```
+
+### Processing Flow (per packet)
+
+1. Skip non-TCP packets.
+2. Extract FlowKey from ParsedPacket (canonicalize).
+3. Look up or create TcpFlow in HashMap.
+4. Determine direction (client→server or server→client) by comparing src against flow initiator.
+5. Handle TCP flags:
+   - SYN: record ISN, update state.
+   - SYN+ACK: record server ISN, transition to Established.
+   - FIN: mark `fin_seen` on that direction, transition state.
+   - RST: mark `rst_seen`, transition to Closed, call `handler.on_flow_close()`.
+6. If payload present and depth not exceeded and memcap not exceeded:
+   - Compute ISN-relative offset. 
+ - Check for overlaps with existing segments (first-wins: trim new). + - Insert into BTreeMap. + - Flush: iterate from `base_offset`, move contiguous segments out, call `handler.on_data()` with the new bytes. + - Advance `base_offset`, increment `reassembled_bytes`. + - Update `total_memory_used`. +7. Update `last_seen` with pcap timestamp. + +### Contiguous Flush + +After inserting a segment, scan the BTreeMap starting from `base_offset`: + +``` +While BTreeMap contains a segment at base_offset: + Remove it from BTreeMap + Call handler.on_data(flow_key, direction, &data, base_offset) + base_offset += data.len() + reassembled_bytes += data.len() +``` + +If `reassembled_bytes` exceeds `max_depth_per_direction`, set `depth_exceeded = true` and generate a Finding. + +## StreamHandler Trait + +Analyzers that need reassembled streams implement this: + +```rust +pub enum Direction { + ClientToServer, + ServerToClient, +} + +pub enum CloseReason { + Fin, + Rst, + Timeout, + MemoryPressure, +} + +pub trait StreamHandler { + fn on_data( + &mut self, + flow_key: &FlowKey, + direction: Direction, + data: &[u8], + offset: u64, + ); + + fn on_flow_close( + &mut self, + flow_key: &FlowKey, + reason: CloseReason, + ); +} +``` + +### StreamAnalyzer Trait + +Extends StreamHandler with reporting methods compatible with the existing Reporter system: + +```rust +pub trait StreamAnalyzer: StreamHandler { + fn name(&self) -> &'static str; + fn summarize(&self) -> AnalysisSummary; + fn findings(&self) -> Vec; +} +``` + +Reporters consume `Vec` and `Vec` from both `ProtocolAnalyzer` (per-packet) and `StreamAnalyzer` (stream-based) — no changes to reporters needed. + +## Memory Management + +### Per-Direction Depth Limit (default 10MB) + +Once `reassembled_bytes` exceeds `max_depth_per_direction` on a flow direction: +- Stop storing new payload for that direction. +- Continue tracking the flow for metadata (packet counts, flags, timing). 
+- Generate a Finding: `[Anomaly] INCONCLUSIVE (LOW) — Flow {key} exceeded reassembly depth (10MB), stream truncated.` + +### Global Memory Cap (default 1GB) + +Before inserting a new segment, check `total_memory_used` against `global_memcap`. If exceeded: + +1. Evict non-established flows first (SynSent, Closing, half-open) — likely port scans. +2. If still over, evict established flows by LRU (`last_seen` oldest first). +3. Each eviction: flush accumulated data to handler, call `on_flow_close(MemoryPressure)`. +4. Increment `stats.flows_evicted`. +5. Log warning to stderr: "Reassembly memory cap reached, evicting flows." + +The final report includes: "Warning: reassembly memory cap reached, N flows evicted. Re-run with --reassembly-memcap to increase." + +## Auto-Detect Activation + +Reassembly is not always-on. It activates automatically when any TCP-based analyzer is enabled: + +```rust +let needs_reassembly = enable_http || enable_tls || enable_smb || cli.reassemble; +let skip_reassembly = cli.no_reassemble; + +let mut reassembler = if needs_reassembly && !skip_reassembly { + Some(TcpReassembler::new(config)) +} else { + None +}; +``` + +When no TCP analyzers are active and `--reassemble` is not set, the reassembler is not created. Zero overhead for DNS-only or summary-only runs. 
+ +## CLI Flags + +``` +--reassemble Force TCP reassembly on +--no-reassemble Force TCP reassembly off (quick header scan) +--reassembly-depth Per-direction stream limit in MB (default: 10) +--reassembly-memcap Global reassembly memory cap in MB (default: 1024) +``` + +## Edge Cases + +| Scenario | Handling | +|----------|----------| +| Malformed TCP header | Skip packet, increment `stats.malformed_packets` | +| SYN retransmit with different ISN | Keep first ISN (first-wins) | +| Zero-window probes / keepalives | Detect single byte at `expected_seq - 1`, ignore | +| One-sided capture (only one direction) | Visible direction reassembles, other stays empty | +| Duplicate FIN/RST | Ignore after the first | +| Half-close (FIN one direction, data continues other) | Each direction tracked independently | +| Pcap timestamp not monotonic | Use packet timestamps as-is; timeout still works since we compare against `last_seen` | + +## File Structure + +``` +src/reassembly/ +├── mod.rs — TcpReassembler, ReassemblyConfig, ReassemblyStats, public API +├── flow.rs — FlowKey, TcpFlow, FlowDirection, FlowState +├── segment.rs — Segment insertion, overlap handling, contiguous flush logic +└── handler.rs — StreamHandler trait, StreamAnalyzer trait, Direction, CloseReason +``` + +## Testing Strategy + +### Unit Tests + +- **segment.rs**: Insert ordered, out-of-order, overlapping, retransmitted, and wrapping segments. Verify first-wins overlap. Verify contiguous flush produces correct bytes at correct offsets. +- **flow.rs**: State transitions through full lifecycle. FlowKey canonicalization (A→B == B→A). Mid-stream pickup sets partial flag. ISN inference from first data packet. +- **mod.rs**: Depth limit enforcement (>10MB stops buffering, generates Finding). Memcap eviction (non-established first, then LRU). Stats tracking. + +### Integration Tests + +All tests use synthetic packet bytes (same pattern as existing wirerust tests — no external fixtures). 
+ +- Three-packet stream in order → reassembled payload matches concatenation. +- Three-packet stream out of order [1, 3, 2] → same result after reordering. +- Retransmission of packet 1 → deduplicated, single copy in stream. +- Overlapping segments with different data → first-wins, original data preserved, Finding generated for conflicting data. +- Excessive overlaps (>50) → evasion-attempt Finding generated. +- Small segment flood (>2048 segments <8 bytes) → evasion-attempt Finding generated. +- Depth truncation mid-segment → partial segment stored up to limit, truncation Finding generated. +- 11MB stream → first 10MB reassembled, truncation Finding generated. +- 100+ flows exceeding memcap → eviction occurs, stats.flows_evicted > 0. +- Mid-stream flow (no SYN) → reassembles correctly, partial flag set. +- RST mid-stream → on_flow_close(Rst) called, accumulated data flushed. + +## Future Considerations (Not In Scope) + +These are deliberately deferred. They can be added later without changing the core architecture: + +- **Configurable overlap policy** (last-wins, per-OS policy) — first-wins covers the majority of targets; add configurability only if users request it. +- **TCP Fast Open (TFO)** — Data in SYN packets. Rare in practice. Would require special handling of payload attached to SYN. +- **PAWS (Protection Against Wrapped Sequence Numbers)** — Uses TCP timestamps option to reject old duplicates. Not needed for offline pcap analysis since we see all packets. +- **Urgent pointer / out-of-band data** — Rarely used. Would need special offset handling. +- **ACK-based progress tracking** — Not needed for offline analysis. Sequence-number-only tracking is sufficient. +- **Live capture mode** — Current design uses pcap timestamps. Live mode would need wall-clock timeouts and periodic expiration on a timer. +- **Parallel reassembly with rayon** — Flow-level parallelism is possible since flows are independent, but adds complexity to the callback model. 
diff --git a/src/cli.rs b/src/cli.rs index 187bea3..44a2dd1 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -35,6 +35,22 @@ pub struct Cli { #[arg(long, global = true)] pub csv: Option>, + /// Force TCP stream reassembly on + #[arg(long, global = true, conflicts_with = "no_reassemble")] + pub reassemble: bool, + + /// Force TCP stream reassembly off (quick scan) + #[arg(long, global = true)] + pub no_reassemble: bool, + + /// Per-direction stream reassembly limit in MB (default: 10) + #[arg(long, global = true, default_value_t = 10)] + pub reassembly_depth: usize, + + /// Global reassembly memory cap in MB (default: 1024) + #[arg(long, global = true, default_value_t = 1024)] + pub reassembly_memcap: usize, + #[command(subcommand)] pub command: Commands, } diff --git a/src/decoder.rs b/src/decoder.rs index b675ddb..2f0ac2c 100644 --- a/src/decoder.rs +++ b/src/decoder.rs @@ -17,6 +17,7 @@ pub enum TransportInfo { Tcp { src_port: u16, dst_port: u16, + seq_number: u32, syn: bool, ack: bool, fin: bool, @@ -95,6 +96,7 @@ pub fn decode_packet(data: &[u8]) -> Result { TransportInfo::Tcp { src_port: tcp.source_port(), dst_port: tcp.destination_port(), + seq_number: tcp.to_header().sequence_number, syn: tcp.syn(), ack: tcp.ack(), fin: tcp.fin(), diff --git a/src/lib.rs b/src/lib.rs index 3b23055..9dac35d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,5 +3,6 @@ pub mod cli; pub mod decoder; pub mod findings; pub mod reader; +pub mod reassembly; pub mod reporter; pub mod summary; diff --git a/src/main.rs b/src/main.rs index 8629d8e..ade9f86 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,9 @@ use wirerust::analyzer::dns::DnsAnalyzer; use wirerust::cli::{Cli, Commands, OutputFormat}; use wirerust::decoder::decode_packet; use wirerust::reader::PcapSource; +use wirerust::reassembly::flow::FlowKey; +use wirerust::reassembly::handler::{CloseReason, Direction, StreamHandler}; +use wirerust::reassembly::{ReassemblyConfig, TcpReassembler}; use wirerust::reporter::Reporter; use 
wirerust::reporter::json::JsonReporter; use wirerust::reporter::terminal::TerminalReporter; @@ -43,6 +46,28 @@ fn run_analyze( let mut dns_analyzer = DnsAnalyzer::new(); let mut all_findings = Vec::new(); + // Determine if reassembly is needed + let needs_reassembly = cli.reassemble; // Will expand when HTTP/TLS analyzers added + let skip_reassembly = cli.no_reassemble; + + let mut reassembler = if needs_reassembly && !skip_reassembly { + let config = ReassemblyConfig { + max_depth: cli.reassembly_depth * 1_048_576, + memcap: cli.reassembly_memcap * 1_048_576, + ..ReassemblyConfig::default() + }; + Some(TcpReassembler::new(config)) + } else { + None + }; + + struct NullHandler; + impl StreamHandler for NullHandler { + fn on_data(&mut self, _: &FlowKey, _: Direction, _: &[u8], _: u64) {} + fn on_flow_close(&mut self, _: &FlowKey, _: CloseReason) {} + } + let mut stream_handler = NullHandler; + for target in targets { let pcap_files = resolve_targets(target)?; for path in &pcap_files { @@ -61,6 +86,9 @@ fn run_analyze( let findings = dns_analyzer.analyze(&parsed); all_findings.extend(findings); } + if let Some(ref mut reasm) = reassembler { + reasm.process_packet(&parsed, raw.timestamp_secs, &mut stream_handler); + } } pb.inc(1); } @@ -68,6 +96,11 @@ fn run_analyze( } } + if let Some(ref mut reasm) = reassembler { + reasm.finalize(&mut stream_handler); + all_findings.extend(reasm.findings().to_vec()); + } + let analyzer_summaries = if enable_dns { vec![dns_analyzer.summarize()] } else { diff --git a/src/reassembly/flow.rs b/src/reassembly/flow.rs new file mode 100644 index 0000000..8bc3512 --- /dev/null +++ b/src/reassembly/flow.rs @@ -0,0 +1,196 @@ +use std::collections::BTreeMap; +use std::net::IpAddr; + +use crate::reassembly::handler::Direction; + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct FlowKey { + pub lower_ip: IpAddr, + pub lower_port: u16, + pub upper_ip: IpAddr, + pub upper_port: u16, +} + +impl FlowKey { + pub fn new(ip_a: IpAddr, port_a: 
u16, ip_b: IpAddr, port_b: u16) -> Self {
+        // Canonicalize by (ip, port) tuple comparison — keeps IP+port paired together.
+        // This is critical: sorting independently would merge different connections.
+        if (ip_a, port_a) <= (ip_b, port_b) {
+            FlowKey {
+                lower_ip: ip_a,
+                lower_port: port_a,
+                upper_ip: ip_b,
+                upper_port: port_b,
+            }
+        } else {
+            FlowKey {
+                lower_ip: ip_b,
+                lower_port: port_b,
+                upper_ip: ip_a,
+                upper_port: port_a,
+            }
+        }
+    }
+}
+
+impl std::fmt::Display for FlowKey {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        write!(
+            f,
+            "{}:{} → {}:{}",
+            self.lower_ip, self.lower_port, self.upper_ip, self.upper_port
+        )
+    }
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum FlowState {
+    New,
+    SynSent,
+    Established,
+    Closing,
+    Closed,
+}
+
+#[derive(Debug)]
+pub struct FlowDirection {
+    pub isn: Option<u32>,
+    pub base_offset: u64,
+    pub segments: BTreeMap<u64, Vec<u8>>,
+    pub reassembled_bytes: usize,
+    pub overlap_count: u32,
+    pub overlap_alert_fired: bool,
+    pub small_segment_count: u32,
+    pub small_segment_alert_fired: bool,
+    pub fin_seen: bool,
+    pub rst_seen: bool,
+    pub depth_exceeded: bool,
+}
+
+impl Default for FlowDirection {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl FlowDirection {
+    pub fn new() -> Self {
+        FlowDirection {
+            isn: None,
+            base_offset: 0,
+            segments: BTreeMap::new(),
+            reassembled_bytes: 0,
+            overlap_count: 0,
+            overlap_alert_fired: false,
+            small_segment_count: 0,
+            small_segment_alert_fired: false,
+            fin_seen: false,
+            rst_seen: false,
+            depth_exceeded: false,
+        }
+    }
+
+    pub fn set_isn(&mut self, isn: u32) {
+        if self.isn.is_none() {
+            self.isn = Some(isn);
+            self.base_offset = 1; // ISN+1 is first data byte
+        }
+    }
+
+    pub fn infer_isn(&mut self, first_seq: u32) {
+        if self.isn.is_none() {
+            self.isn = Some(first_seq.wrapping_sub(1));
+            self.base_offset = 1;
+        }
+    }
+
+    pub fn memory_used(&self) -> usize {
+        self.segments.values().map(|v| v.len()).sum()
+    }
+}
+
+#[derive(Debug)]
+pub struct TcpFlow { + pub key: FlowKey, + pub client_to_server: FlowDirection, + pub server_to_client: FlowDirection, + pub state: FlowState, + pub partial: bool, + pub first_seen: u32, + pub last_seen: u32, + initiator: Option<(IpAddr, u16)>, + fin_count: u8, +} + +impl TcpFlow { + pub fn new(key: FlowKey, timestamp: u32) -> Self { + TcpFlow { + key, + client_to_server: FlowDirection::new(), + server_to_client: FlowDirection::new(), + state: FlowState::New, + partial: false, + first_seen: timestamp, + last_seen: timestamp, + initiator: None, + fin_count: 0, + } + } + + pub fn set_initiator(&mut self, ip: IpAddr, port: u16) { + if self.initiator.is_none() { + self.initiator = Some((ip, port)); + } + } + + pub fn direction(&self, src_ip: IpAddr, src_port: u16) -> Direction { + if self.initiator == Some((src_ip, src_port)) { + Direction::ClientToServer + } else { + Direction::ServerToClient + } + } + + pub fn get_direction_mut(&mut self, dir: Direction) -> &mut FlowDirection { + match dir { + Direction::ClientToServer => &mut self.client_to_server, + Direction::ServerToClient => &mut self.server_to_client, + } + } + + pub fn on_syn(&mut self) { + if self.state == FlowState::New { + self.state = FlowState::SynSent; + } + } + + pub fn on_syn_ack(&mut self) { + if self.state == FlowState::SynSent || self.state == FlowState::New { + self.state = FlowState::Established; + } + } + + pub fn on_data_without_syn(&mut self) { + if self.state == FlowState::New { + self.state = FlowState::Established; + self.partial = true; + } + } + + pub fn on_fin(&mut self) { + self.fin_count = self.fin_count.saturating_add(1); + if self.fin_count >= 2 { + self.state = FlowState::Closed; + } else if self.state == FlowState::Established || self.state == FlowState::SynSent { + self.state = FlowState::Closing; + } + } + + pub fn on_rst(&mut self) { + self.state = FlowState::Closed; + } + + pub fn memory_used(&self) -> usize { + self.client_to_server.memory_used() + 
self.server_to_client.memory_used()
+    }
+}
diff --git a/src/reassembly/handler.rs b/src/reassembly/handler.rs
new file mode 100644
index 0000000..2b0d445
--- /dev/null
+++ b/src/reassembly/handler.rs
@@ -0,0 +1,29 @@
+use crate::analyzer::AnalysisSummary;
+use crate::findings::Finding;
+use crate::reassembly::flow::FlowKey;
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum Direction {
+    ClientToServer,
+    ServerToClient,
+}
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum CloseReason {
+    Fin,
+    Rst,
+    Timeout,
+    MemoryPressure,
+}
+
+pub trait StreamHandler {
+    fn on_data(&mut self, flow_key: &FlowKey, direction: Direction, data: &[u8], offset: u64);
+
+    fn on_flow_close(&mut self, flow_key: &FlowKey, reason: CloseReason);
+}
+
+pub trait StreamAnalyzer: StreamHandler {
+    fn name(&self) -> &'static str;
+    fn summarize(&self) -> AnalysisSummary;
+    fn findings(&self) -> Vec<Finding>;
+}
diff --git a/src/reassembly/mod.rs b/src/reassembly/mod.rs
new file mode 100644
index 0000000..5927078
--- /dev/null
+++ b/src/reassembly/mod.rs
@@ -0,0 +1,469 @@
+pub mod flow;
+pub mod handler;
+pub mod segment;
+
+use std::collections::HashMap;
+
+use crate::decoder::{ParsedPacket, Protocol, TransportInfo};
+use crate::findings::{Confidence, Finding, ThreatCategory, Verdict};
+use crate::reassembly::flow::{FlowKey, FlowState, TcpFlow};
+use crate::reassembly::handler::{CloseReason, StreamHandler};
+use crate::reassembly::segment::{InsertResult, flush_contiguous, insert_segment};
+
+const OVERLAP_ALERT_THRESHOLD: u32 = 50;
+const SMALL_SEGMENT_ALERT_THRESHOLD: u32 = 2048;
+const MAX_FINDINGS: usize = 10_000;
+
+/// Configuration for the TCP reassembly engine.
+#[derive(Debug, Clone)]
+pub struct ReassemblyConfig {
+    /// Maximum bytes to reassemble per-direction before stopping (depth limit).
+    pub max_depth: usize,
+    /// Maximum total memory across all flows before eviction kicks in.
+    pub memcap: usize,
+    /// Seconds of inactivity before a flow is considered timed out.
+    pub flow_timeout_secs: u32,
+    /// Maximum number of concurrent flows tracked. Prevents flow table flooding.
+    pub max_flows: usize,
+    /// Maximum segments per flow direction. Prevents BTreeMap overhead explosion.
+    pub max_segments_per_direction: usize,
+}
+
+impl Default for ReassemblyConfig {
+    fn default() -> Self {
+        ReassemblyConfig {
+            max_depth: 10 * 1024 * 1024, // 10 MB per direction
+            memcap: 1024 * 1024 * 1024, // 1 GB total
+            flow_timeout_secs: 300, // 5 minutes
+            max_flows: 100_000, // 100K concurrent flows
+            max_segments_per_direction: 10_000, // 10K segments per direction
+        }
+    }
+}
+
+/// Counters exposed by the reassembly engine.
+#[derive(Debug, Clone, Default)]
+pub struct ReassemblyStats {
+    pub packets_processed: u64,
+    pub packets_tcp: u64,
+    pub packets_skipped_non_tcp: u64,
+    pub flows_total: u64,
+    pub flows_partial: u64,
+    pub flows_expired: u64,
+    pub flows_rst: u64,
+    pub flows_fin: u64,
+    pub segments_inserted: u64,
+    pub segments_duplicates: u64,
+    pub segments_overlaps: u64,
+    pub bytes_reassembled: u64,
+    pub evictions: u64,
+}
+
+/// The main TCP reassembly engine.
+pub struct TcpReassembler {
+    config: ReassemblyConfig,
+    flows: HashMap<FlowKey, TcpFlow>,
+    stats: ReassemblyStats,
+    findings: Vec<Finding>,
+    total_memory: usize,
+}
+
+impl TcpReassembler {
+    pub fn new(config: ReassemblyConfig) -> Self {
+        assert!(config.max_depth > 0, "max_depth must be > 0");
+        assert!(config.memcap > 0, "memcap must be > 0");
+        assert!(config.max_flows > 0, "max_flows must be > 0");
+        assert!(
+            config.max_segments_per_direction > 0,
+            "max_segments_per_direction must be > 0"
+        );
+        TcpReassembler {
+            config,
+            flows: HashMap::new(),
+            stats: ReassemblyStats::default(),
+            findings: Vec::new(),
+            total_memory: 0,
+        }
+    }
+
+    /// Process a single parsed packet through the reassembly engine.
+    pub fn process_packet(
+        &mut self,
+        packet: &ParsedPacket,
+        timestamp: u32,
+        handler: &mut dyn StreamHandler,
+    ) {
+        self.stats.packets_processed += 1;
+
+        // 1.
Skip non-TCP packets + if packet.protocol != Protocol::Tcp { + self.stats.packets_skipped_non_tcp += 1; + return; + } + + // 2. Extract TCP fields + let (src_port, dst_port, seq, syn, ack, fin, rst) = match &packet.transport { + TransportInfo::Tcp { + src_port, + dst_port, + seq_number, + syn, + ack, + fin, + rst, + } => (*src_port, *dst_port, *seq_number, *syn, *ack, *fin, *rst), + _ => return, + }; + + self.stats.packets_tcp += 1; + + // 3. Build the flow key + let key = FlowKey::new(packet.src_ip, src_port, packet.dst_ip, dst_port); + + // 4. Get or create flow + if !self.flows.contains_key(&key) { + // Enforce max_flows limit + if self.flows.len() >= self.config.max_flows { + self.evict_flows(handler); + if self.flows.len() >= self.config.max_flows { + // Still at capacity after eviction — drop this packet + return; + } + } + let flow = TcpFlow::new(key.clone(), timestamp); + self.flows.insert(key.clone(), flow); + self.stats.flows_total += 1; + } + + // Work with the flow + let flow = self.flows.get_mut(&key).unwrap(); + flow.last_seen = timestamp; + + // 5. Handle SYN (without ACK) -- client initiating + if syn && !ack { + flow.set_initiator(packet.src_ip, src_port); + let dir = flow.direction(packet.src_ip, src_port); + flow.get_direction_mut(dir).set_isn(seq); + flow.on_syn(); + } + + // 6. Handle SYN+ACK -- server responding + if syn && ack { + // The responder is sending SYN+ACK, so the initiator is the *destination* + flow.set_initiator(packet.dst_ip, dst_port); + let dir = flow.direction(packet.src_ip, src_port); + flow.get_direction_mut(dir).set_isn(seq); + flow.on_syn_ack(); + } + + // 7. 
Handle RST — flush salvageable data, close, and remove + if rst { + flow.on_rst(); + self.stats.flows_rst += 1; + let key_clone = key.clone(); + // Flush buffered contiguous data before removing + if let Some(flow) = self.flows.get_mut(&key_clone) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(&key_clone, dir, data, *offset); + } + } + } + handler.on_flow_close(&key_clone, CloseReason::Rst); + self.flows.remove(&key_clone); + self.update_memory(); + return; + } + + // 8. Handle FIN + if fin { + let dir = flow.direction(packet.src_ip, src_port); + flow.get_direction_mut(dir).fin_seen = true; + flow.on_fin(); + // Note: if state is now Closed (both FINs seen), the flow will be + // removed after payload processing below (step 10). + } + + // 9. 
Handle payload + let payload = &packet.payload; + if !payload.is_empty() { + // If no SYN was seen (mid-stream join), infer state + if flow.state == FlowState::New { + flow.on_data_without_syn(); + flow.set_initiator(packet.src_ip, src_port); + let dir = flow.direction(packet.src_ip, src_port); + flow.get_direction_mut(dir).infer_isn(seq); + self.stats.flows_partial += 1; + } + + let dir = flow.direction(packet.src_ip, src_port); + + // Ensure ISN is set for this direction even on established flows + // (e.g., server direction when only SYN was seen, not SYN+ACK) + if flow.get_direction_mut(dir).isn.is_none() { + flow.get_direction_mut(dir).infer_isn(seq); + } + + let flow_dir = flow.get_direction_mut(dir); + let result = insert_segment( + flow_dir, + seq, + payload, + self.config.max_depth, + self.config.max_segments_per_direction, + ); + + match result { + InsertResult::Inserted => self.stats.segments_inserted += 1, + InsertResult::Duplicate => self.stats.segments_duplicates += 1, + InsertResult::PartialOverlap => { + self.stats.segments_overlaps += 1; + self.stats.segments_inserted += 1; + } + InsertResult::ConflictingOverlap => { + self.stats.segments_overlaps += 1; + self.generate_conflicting_overlap_finding(&key, packet.src_ip); + } + InsertResult::Truncated => { + self.stats.segments_inserted += 1; + self.generate_truncated_finding(&key, packet.src_ip); + } + InsertResult::DepthExceeded => { + // Already tracked in the direction + } + } + + // Check anomaly thresholds on the direction + let flow = self.flows.get_mut(&key).unwrap(); + let flow_dir = flow.get_direction_mut(dir); + if flow_dir.overlap_count > OVERLAP_ALERT_THRESHOLD + && !flow_dir.overlap_alert_fired + && self.findings.len() < MAX_FINDINGS + { + flow_dir.overlap_alert_fired = true; + self.findings.push(Finding { + category: ThreatCategory::Anomaly, + verdict: Verdict::Likely, + confidence: Confidence::Medium, + summary: format!( + "Excessive segment overlaps ({}) on flow {}", + 
flow_dir.overlap_count, key + ), + evidence: vec!["Possible evasion attempt".into()], + mitre_technique: Some("T1036".into()), + source_ip: Some(packet.src_ip), + timestamp: None, + }); + } + if flow_dir.small_segment_count > SMALL_SEGMENT_ALERT_THRESHOLD + && !flow_dir.small_segment_alert_fired + && self.findings.len() < MAX_FINDINGS + { + flow_dir.small_segment_alert_fired = true; + self.findings.push(Finding { + category: ThreatCategory::Anomaly, + verdict: Verdict::Inconclusive, + confidence: Confidence::Medium, + summary: format!( + "Excessive small segments ({}) on flow {}", + flow_dir.small_segment_count, key + ), + evidence: vec!["Possible IDS evasion".into()], + mitre_technique: None, + source_ip: Some(packet.src_ip), + timestamp: None, + }); + } + + // Flush contiguous data + let flow = self.flows.get_mut(&key).unwrap(); + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(&key, dir, data, *offset); + } + } + + // 10. Remove FIN-closed flows after processing their final payload + if self + .flows + .get(&key) + .is_some_and(|f| f.state == FlowState::Closed) + { + // Flush remaining data in both directions before removal + if let Some(flow) = self.flows.get_mut(&key) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(&key, dir, data, *offset); + } + } + } + self.stats.flows_fin += 1; + handler.on_flow_close(&key, CloseReason::Fin); + self.flows.remove(&key); + } + + // 11. Update total memory tracking + self.update_memory(); + + // 12. 
Evict flows if memcap exceeded
+        if self.total_memory > self.config.memcap {
+            self.evict_flows(handler);
+        }
+    }
+
+    /// Expire flows that have been idle longer than the configured timeout.
+    pub fn expire_flows(&mut self, current_time: u32, handler: &mut dyn StreamHandler) {
+        let timeout = self.config.flow_timeout_secs;
+        let expired_keys: Vec<FlowKey> = self
+            .flows
+            .iter()
+            .filter(|(_, flow)| {
+                flow.state == FlowState::Closed
+                    || (current_time > flow.last_seen && (current_time - flow.last_seen) > timeout)
+            })
+            .map(|(key, _)| key.clone())
+            .collect();
+
+        for key in expired_keys {
+            // Flush salvageable data before removing
+            if let Some(flow) = self.flows.get_mut(&key) {
+                use crate::reassembly::handler::Direction;
+                for dir in [Direction::ClientToServer, Direction::ServerToClient] {
+                    let flow_dir = flow.get_direction_mut(dir);
+                    let flushed = flush_contiguous(flow_dir);
+                    for (offset, data) in &flushed {
+                        handler.on_data(&key, dir, data, *offset);
+                    }
+                }
+            }
+            self.flows.remove(&key);
+            self.stats.flows_expired += 1;
+            handler.on_flow_close(&key, CloseReason::Timeout);
+        }
+
+        self.update_memory();
+    }
+
+    /// Close all remaining flows (called at end of capture).
+    pub fn finalize(&mut self, handler: &mut dyn StreamHandler) {
+        use crate::reassembly::handler::Direction;
+        let all_keys: Vec<FlowKey> = self.flows.keys().cloned().collect();
+        for key in all_keys {
+            // Flush any remaining contiguous data before closing
+            if let Some(flow) = self.flows.get_mut(&key) {
+                for dir in [Direction::ClientToServer, Direction::ServerToClient] {
+                    let flow_dir = flow.get_direction_mut(dir);
+                    let flushed = flush_contiguous(flow_dir);
+                    for (offset, data) in &flushed {
+                        handler.on_data(&key, dir, data, *offset);
+                    }
+                }
+            }
+            self.flows.remove(&key);
+            handler.on_flow_close(&key, CloseReason::Timeout);
+        }
+        self.update_memory();
+    }
+
+    /// Return a reference to current stats.
+ pub fn stats(&self) -> &ReassemblyStats { + &self.stats + } + + /// Return any anomaly findings generated during reassembly. + pub fn findings(&self) -> &[Finding] { + &self.findings + } + + // --- Private helpers --- + + fn update_memory(&mut self) { + self.total_memory = self.flows.values().map(|f| f.memory_used()).sum(); + } + + /// Evict flows when memcap is exceeded. + /// Strategy: evict non-established flows first (sorted by LRU), + /// then established flows by LRU. + fn evict_flows(&mut self, handler: &mut dyn StreamHandler) { + // Sort once, then evict from the sorted list until under memcap + let mut candidates: Vec<(FlowKey, bool, u32)> = self + .flows + .iter() + .map(|(key, flow)| { + let is_established = flow.state == FlowState::Established; + (key.clone(), is_established, flow.last_seen) + }) + .collect(); + + // Sort: non-established first, then by oldest last_seen + candidates.sort_by(|a, b| { + a.1.cmp(&b.1) // false (non-established) < true (established) + .then(a.2.cmp(&b.2)) // older first + }); + + for (key, _, _) in &candidates { + if self.total_memory <= self.config.memcap && self.flows.len() <= self.config.max_flows + { + break; + } + // Flush salvageable contiguous data before evicting + if let Some(flow) = self.flows.get_mut(key) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + handler.on_data(key, dir, data, *offset); + } + } + } + self.flows.remove(key); + self.stats.evictions += 1; + handler.on_flow_close(key, CloseReason::MemoryPressure); + self.update_memory(); + } + } + + fn generate_conflicting_overlap_finding(&mut self, key: &FlowKey, src_ip: std::net::IpAddr) { + if self.findings.len() >= MAX_FINDINGS { + return; + } + self.findings.push(Finding { + category: ThreatCategory::Anomaly, + verdict: Verdict::Likely, + confidence: 
Confidence::High, + summary: format!("Conflicting TCP segment overlap on flow {}", key), + evidence: vec!["Retransmitted segment contains different data".to_string()], + mitre_technique: Some("T1036".to_string()), + source_ip: Some(src_ip), + timestamp: None, + }); + } + + fn generate_truncated_finding(&mut self, key: &FlowKey, src_ip: std::net::IpAddr) { + if self.findings.len() >= MAX_FINDINGS { + return; + } + self.findings.push(Finding { + category: ThreatCategory::Anomaly, + verdict: Verdict::Inconclusive, + confidence: Confidence::Low, + summary: format!("Stream depth exceeded on flow {}", key), + evidence: vec![format!("Max depth {} bytes reached", self.config.max_depth)], + mitre_technique: None, + source_ip: Some(src_ip), + timestamp: None, + }); + } +} diff --git a/src/reassembly/segment.rs b/src/reassembly/segment.rs new file mode 100644 index 0000000..409edcf --- /dev/null +++ b/src/reassembly/segment.rs @@ -0,0 +1,193 @@ +use crate::reassembly::flow::FlowDirection; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum InsertResult { + Inserted, + Duplicate, + PartialOverlap, + ConflictingOverlap, + Truncated, + DepthExceeded, +} + +/// Compute the ISN-relative offset for a sequence number. +fn seq_offset(seq: u32, isn: u32) -> u64 { + seq.wrapping_sub(isn) as u64 +} + +/// Insert a segment into the flow direction's out-of-order buffer. +/// Applies first-wins overlap policy and tracks anomaly counters. 
+pub fn insert_segment( + dir: &mut FlowDirection, + seq: u32, + data: &[u8], + max_depth: usize, + max_segments: usize, +) -> InsertResult { + if data.is_empty() { + return InsertResult::Inserted; + } + + let isn = match dir.isn { + Some(isn) => isn, + None => { + debug_assert!(false, "insert_segment called with no ISN set"); + return InsertResult::DepthExceeded; + } + }; + + // Enforce max segments per direction to prevent BTreeMap overhead explosion + if dir.segments.len() >= max_segments { + return InsertResult::DepthExceeded; + } + + // Track small segments (cumulative, not consecutive) + if data.len() < 8 { + dir.small_segment_count += 1; + } + + // Check depth limit + let remaining_depth = max_depth.saturating_sub(dir.reassembled_bytes); + if remaining_depth == 0 { + if !dir.depth_exceeded { + dir.depth_exceeded = true; + } + return InsertResult::DepthExceeded; + } + + let offset = seq_offset(seq, isn); + let mut segment_data = data.to_vec(); + + // Truncate if exceeding depth + let buffered: usize = dir.segments.values().map(|v| v.len()).sum(); + let total_after = dir.reassembled_bytes + buffered + segment_data.len(); + let truncated = if total_after > max_depth { + let allowed = max_depth.saturating_sub(dir.reassembled_bytes + buffered); + if allowed == 0 { + dir.depth_exceeded = true; + return InsertResult::DepthExceeded; + } + segment_data.truncate(allowed); + dir.depth_exceeded = true; + true + } else { + false + }; + + let new_start = offset; + let new_end = offset + segment_data.len() as u64; + + // Check for overlaps with existing segments + let mut has_overlap = false; + let mut has_conflict = false; + let mut trimmed_ranges: Vec<(u64, u64)> = Vec::new(); + + for (&existing_offset, existing_data) in dir.segments.iter() { + let existing_end = existing_offset + existing_data.len() as u64; + + if new_start < existing_end && new_end > existing_offset { + has_overlap = true; + + let overlap_start = new_start.max(existing_offset); + let overlap_end = 
new_end.min(existing_end); + + // Use slice comparison (SIMD-optimized) instead of byte-by-byte + let new_slice_start = (overlap_start - new_start) as usize; + let new_slice_end = (overlap_end - new_start) as usize; + let existing_slice_start = (overlap_start - existing_offset) as usize; + let existing_slice_end = (overlap_end - existing_offset) as usize; + + if new_slice_end <= segment_data.len() + && existing_slice_end <= existing_data.len() + && segment_data[new_slice_start..new_slice_end] + != existing_data[existing_slice_start..existing_slice_end] + { + has_conflict = true; + } + + trimmed_ranges.push((existing_offset, existing_end)); + } + } + + if has_overlap { + dir.overlap_count += 1; + + let fully_covered = trimmed_ranges + .iter() + .any(|&(es, ee)| es <= new_start && ee >= new_end); + if fully_covered { + return if has_conflict { + InsertResult::ConflictingOverlap + } else { + InsertResult::Duplicate + }; + } + + // First-wins: insert only gap portions + let mut gaps: Vec<(u64, u64)> = Vec::new(); + let mut cursor = new_start; + + let mut sorted_ranges = trimmed_ranges.clone(); + sorted_ranges.sort_by_key(|&(start, _)| start); + + for &(es, ee) in &sorted_ranges { + if cursor < es { + gaps.push((cursor, es.min(new_end))); + } + cursor = cursor.max(ee); + } + if cursor < new_end { + gaps.push((cursor, new_end)); + } + + let had_gap = !gaps.is_empty(); + + for (gap_start, gap_end) in gaps { + // Enforce max_segments inside gap insertion loop + if dir.segments.len() >= max_segments { + break; + } + let start_idx = (gap_start - new_start) as usize; + let end_idx = (gap_end - new_start) as usize; + if start_idx < segment_data.len() && end_idx <= segment_data.len() { + let gap_data = segment_data[start_idx..end_idx].to_vec(); + if !gap_data.is_empty() { + dir.segments.insert(gap_start, gap_data); + } + } + } + + // Only report ConflictingOverlap when fully covered (no gap was inserted) + return if !had_gap && has_conflict { + InsertResult::ConflictingOverlap 
+        } else if truncated {
+            InsertResult::Truncated
+        } else {
+            InsertResult::PartialOverlap
+        };
+    }
+
+    // No overlap — insert normally
+    dir.segments.insert(offset, segment_data);
+
+    if truncated {
+        InsertResult::Truncated
+    } else {
+        InsertResult::Inserted
+    }
+}
+
+/// Flush contiguous segments starting from base_offset.
+/// Returns Vec of (offset, data) pairs that were flushed.
+pub fn flush_contiguous(dir: &mut FlowDirection) -> Vec<(u64, Vec<u8>)> {
+    let mut flushed = Vec::new();
+
+    while let Some(data) = dir.segments.remove(&dir.base_offset) {
+        let offset = dir.base_offset;
+        dir.base_offset += data.len() as u64;
+        dir.reassembled_bytes += data.len();
+        flushed.push((offset, data));
+    }
+
+    flushed
+}
diff --git a/tests/analyzer_tests.rs b/tests/analyzer_tests.rs
index 3512c47..77d47b9 100644
--- a/tests/analyzer_tests.rs
+++ b/tests/analyzer_tests.rs
@@ -26,6 +26,7 @@ fn make_non_dns_packet() -> ParsedPacket {
         transport: TransportInfo::Tcp {
             src_port: 12345,
             dst_port: 80,
+            seq_number: 1000,
             syn: true,
             ack: false,
             fin: false,
diff --git a/tests/cli_tests.rs b/tests/cli_tests.rs
index f5b0c50..4f5973a 100644
--- a/tests/cli_tests.rs
+++ b/tests/cli_tests.rs
@@ -45,6 +45,29 @@ fn test_summary_subcommand() {
     assert_eq!(cli.output_format, Some(OutputFormat::Json));
 }
 
+#[test]
+fn test_reassembly_flags() {
+    let cli = Cli::parse_from([
+        "wirerust",
+        "analyze",
+        "test.pcap",
+        "--reassemble",
+        "--reassembly-depth",
+        "20",
+        "--reassembly-memcap",
+        "2048",
+    ]);
+    assert!(cli.reassemble);
+    assert_eq!(cli.reassembly_depth, 20);
+    assert_eq!(cli.reassembly_memcap, 2048);
+}
+
+#[test]
+fn test_no_reassemble_flag() {
+    let cli = Cli::parse_from(["wirerust", "analyze", "test.pcap", "--no-reassemble"]);
+    assert!(cli.no_reassemble);
+}
+
 #[test]
 fn test_no_color_flag() {
     let cli = Cli::parse_from(["wirerust", "--no-color", "analyze", "test.pcap"]);
diff --git a/tests/fixtures/http-ooo.pcap b/tests/fixtures/http-ooo.pcap
new file mode 100644
index
0000000..be0b5d2 Binary files /dev/null and b/tests/fixtures/http-ooo.pcap differ diff --git a/tests/fixtures/http.pcap b/tests/fixtures/http.pcap new file mode 100644 index 0000000..145c2b0 Binary files /dev/null and b/tests/fixtures/http.pcap differ diff --git a/tests/fixtures/segmented.pcap b/tests/fixtures/segmented.pcap new file mode 100644 index 0000000..f86e103 Binary files /dev/null and b/tests/fixtures/segmented.pcap differ diff --git a/tests/fixtures/tls.pcap b/tests/fixtures/tls.pcap new file mode 100644 index 0000000..a1c6bd4 Binary files /dev/null and b/tests/fixtures/tls.pcap differ diff --git a/tests/reassembly_engine_tests.rs b/tests/reassembly_engine_tests.rs new file mode 100644 index 0000000..010ee33 --- /dev/null +++ b/tests/reassembly_engine_tests.rs @@ -0,0 +1,226 @@ +use std::net::{IpAddr, Ipv4Addr}; + +use wirerust::decoder::{ParsedPacket, Protocol, TransportInfo}; +use wirerust::reassembly::flow::FlowKey; +use wirerust::reassembly::handler::{CloseReason, Direction, StreamHandler}; +use wirerust::reassembly::{ReassemblyConfig, TcpReassembler}; + +/// Test handler that records all callbacks. 
+struct RecordingHandler {
+    data_events: Vec<(FlowKey, Direction, Vec<u8>, u64)>,
+    close_events: Vec<(FlowKey, CloseReason)>,
+}
+
+impl RecordingHandler {
+    fn new() -> Self {
+        RecordingHandler {
+            data_events: Vec::new(),
+            close_events: Vec::new(),
+        }
+    }
+
+    fn all_data(&self) -> Vec<u8> {
+        self.data_events
+            .iter()
+            .flat_map(|(_, _, data, _)| data.iter().copied())
+            .collect()
+    }
+}
+
+impl StreamHandler for RecordingHandler {
+    fn on_data(&mut self, flow_key: &FlowKey, direction: Direction, data: &[u8], offset: u64) {
+        self.data_events
+            .push((flow_key.clone(), direction, data.to_vec(), offset));
+    }
+
+    fn on_flow_close(&mut self, flow_key: &FlowKey, reason: CloseReason) {
+        self.close_events.push((flow_key.clone(), reason));
+    }
+}
+
+#[allow(clippy::too_many_arguments)]
+fn make_tcp_packet(
+    src_ip: [u8; 4],
+    src_port: u16,
+    dst_ip: [u8; 4],
+    dst_port: u16,
+    seq: u32,
+    payload: &[u8],
+    syn: bool,
+    fin: bool,
+    rst: bool,
+) -> ParsedPacket {
+    ParsedPacket {
+        src_ip: IpAddr::V4(Ipv4Addr::from(src_ip)),
+        dst_ip: IpAddr::V4(Ipv4Addr::from(dst_ip)),
+        protocol: Protocol::Tcp,
+        transport: TransportInfo::Tcp {
+            src_port,
+            dst_port,
+            seq_number: seq,
+            syn,
+            ack: false,
+            fin,
+            rst,
+        },
+        payload: payload.to_vec(),
+        packet_len: 54 + payload.len(),
+    }
+}
+
+#[test]
+fn test_three_packet_stream_ordered() {
+    let config = ReassemblyConfig::default();
+    let mut reassembler = TcpReassembler::new(config);
+    let mut handler = RecordingHandler::new();
+
+    let client = [10, 0, 0, 1];
+    let server = [10, 0, 0, 2];
+
+    // SYN
+    let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false);
+    reassembler.process_packet(&syn, 1, &mut handler);
+
+    // Data packets
+    let p1 = make_tcp_packet(client, 12345, server, 80, 1001, b"aaa", false, false, false);
+    reassembler.process_packet(&p1, 2, &mut handler);
+
+    let p2 = make_tcp_packet(client, 12345, server, 80, 1004, b"bbb", false, false, false);
+    reassembler.process_packet(&p2, 3,
&mut handler); + + let p3 = make_tcp_packet(client, 12345, server, 80, 1007, b"ccc", false, false, false); + reassembler.process_packet(&p3, 4, &mut handler); + + assert_eq!(handler.all_data(), b"aaabbbccc"); + assert_eq!(handler.data_events.len(), 3); +} + +#[test] +fn test_out_of_order_delivery() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + // SYN + let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false); + reassembler.process_packet(&syn, 1, &mut handler); + + // Send packets [1, 3, 2] + let p1 = make_tcp_packet(client, 12345, server, 80, 1001, b"aaa", false, false, false); + reassembler.process_packet(&p1, 2, &mut handler); + + let p3 = make_tcp_packet(client, 12345, server, 80, 1007, b"ccc", false, false, false); + reassembler.process_packet(&p3, 3, &mut handler); + assert_eq!(handler.data_events.len(), 1); // only p1 flushed + + let p2 = make_tcp_packet(client, 12345, server, 80, 1004, b"bbb", false, false, false); + reassembler.process_packet(&p2, 4, &mut handler); + + // Now all three should be flushed + assert_eq!(handler.all_data(), b"aaabbbccc"); +} + +#[test] +fn test_mid_stream_no_syn() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + // Data without SYN + let p1 = make_tcp_packet( + client, 12345, server, 80, 5000, b"hello", false, false, false, + ); + reassembler.process_packet(&p1, 1, &mut handler); + + assert_eq!(handler.all_data(), b"hello"); + + let stats = reassembler.stats(); + assert_eq!(stats.flows_total, 1); + assert_eq!(stats.flows_partial, 1); +} + +#[test] +fn test_rst_closes_flow() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut 
handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false); + reassembler.process_packet(&syn, 1, &mut handler); + + let data = make_tcp_packet( + client, 12345, server, 80, 1001, b"data", false, false, false, + ); + reassembler.process_packet(&data, 2, &mut handler); + + let rst = make_tcp_packet(server, 80, client, 12345, 2000, &[], false, false, true); + reassembler.process_packet(&rst, 3, &mut handler); + + assert_eq!(handler.close_events.len(), 1); + assert_eq!(handler.close_events[0].1, CloseReason::Rst); +} + +#[test] +fn test_finalize_flushes_remaining() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false); + reassembler.process_packet(&syn, 1, &mut handler); + + let data = make_tcp_packet( + client, + 12345, + server, + 80, + 1001, + b"leftover", + false, + false, + false, + ); + reassembler.process_packet(&data, 2, &mut handler); + + reassembler.finalize(&mut handler); + + assert_eq!(handler.close_events.len(), 1); + assert_eq!(handler.close_events[0].1, CloseReason::Timeout); +} + +#[test] +fn test_flow_timeout_expiration() { + let config = ReassemblyConfig { + flow_timeout_secs: 10, + ..ReassemblyConfig::default() + }; + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false); + reassembler.process_packet(&syn, 100, &mut handler); + + // Expire at time 200 (100 seconds later, > 10s timeout) + reassembler.expire_flows(200, &mut handler); + + assert_eq!(handler.close_events.len(), 1); + 
assert_eq!(handler.close_events[0].1, CloseReason::Timeout); + + let stats = reassembler.stats(); + assert_eq!(stats.flows_expired, 1); +} diff --git a/tests/reassembly_flow_tests.rs b/tests/reassembly_flow_tests.rs new file mode 100644 index 0000000..34530f8 --- /dev/null +++ b/tests/reassembly_flow_tests.rs @@ -0,0 +1,101 @@ +use std::net::{IpAddr, Ipv4Addr}; + +use wirerust::reassembly::flow::{FlowDirection, FlowKey, FlowState, TcpFlow}; +use wirerust::reassembly::handler::Direction; + +#[test] +fn test_flow_key_canonicalization() { + let ip_a = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + let ip_b = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)); + + let key_ab = FlowKey::new(ip_a, 12345, ip_b, 80); + let key_ba = FlowKey::new(ip_b, 80, ip_a, 12345); + + assert_eq!(key_ab, key_ba); + // Tuple ordering: (10.0.0.1, 12345) < (10.0.0.2, 80) since IPs differ + assert_eq!(key_ab.lower_ip, IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1))); + assert_eq!(key_ab.lower_port, 12345); + assert_eq!(key_ab.upper_ip, IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2))); + assert_eq!(key_ab.upper_port, 80); +} + +#[test] +fn test_flow_key_same_ip_different_ports() { + let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + + let key1 = FlowKey::new(ip, 80, ip, 12345); + let key2 = FlowKey::new(ip, 12345, ip, 80); + + assert_eq!(key1, key2); + assert_eq!(key1.lower_port, 80); + assert_eq!(key1.upper_port, 12345); +} + +#[test] +fn test_flow_direction_determines_client_server() { + let ip_client = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + let ip_server = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)); + + let mut flow = TcpFlow::new(FlowKey::new(ip_client, 12345, ip_server, 80), 1000); + flow.set_initiator(ip_client, 12345); + + assert_eq!(flow.direction(ip_client, 12345), Direction::ClientToServer); + assert_eq!(flow.direction(ip_server, 80), Direction::ServerToClient); +} + +#[test] +fn test_flow_state_transitions() { + let ip_a = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + let ip_b = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)); + 
+ let mut flow = TcpFlow::new(FlowKey::new(ip_a, 12345, ip_b, 80), 1000); + assert_eq!(flow.state, FlowState::New); + + flow.on_syn(); + assert_eq!(flow.state, FlowState::SynSent); + + flow.on_syn_ack(); + assert_eq!(flow.state, FlowState::Established); + + flow.on_fin(); + assert_eq!(flow.state, FlowState::Closing); + + flow.on_fin(); + assert_eq!(flow.state, FlowState::Closed); +} + +#[test] +fn test_flow_rst_from_any_state() { + let ip_a = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + let ip_b = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)); + + let mut flow = TcpFlow::new(FlowKey::new(ip_a, 12345, ip_b, 80), 1000); + flow.on_syn(); + assert_eq!(flow.state, FlowState::SynSent); + + flow.on_rst(); + assert_eq!(flow.state, FlowState::Closed); +} + +#[test] +fn test_mid_stream_pickup() { + let ip_a = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)); + let ip_b = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)); + + let mut flow = TcpFlow::new(FlowKey::new(ip_a, 12345, ip_b, 80), 1000); + flow.on_data_without_syn(); + assert_eq!(flow.state, FlowState::Established); + assert!(flow.partial); +} + +#[test] +fn test_flow_direction_new() { + let dir = FlowDirection::new(); + assert_eq!(dir.isn, None); + assert_eq!(dir.base_offset, 0); + assert!(dir.segments.is_empty()); + assert_eq!(dir.reassembled_bytes, 0); + assert!(!dir.fin_seen); + assert!(!dir.rst_seen); + assert!(!dir.depth_exceeded); +} diff --git a/tests/reassembly_segment_tests.rs b/tests/reassembly_segment_tests.rs new file mode 100644 index 0000000..1ab0d12 --- /dev/null +++ b/tests/reassembly_segment_tests.rs @@ -0,0 +1,172 @@ +use wirerust::reassembly::flow::FlowDirection; +use wirerust::reassembly::segment::{InsertResult, flush_contiguous, insert_segment}; + +#[test] +fn test_insert_single_segment() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + let result = insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + assert_eq!(result, InsertResult::Inserted); + assert_eq!(dir.segments.len(), 1); + 
assert_eq!(dir.segments.get(&1), Some(&b"hello".to_vec())); +} + +#[test] +fn test_flush_contiguous_single() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed.len(), 1); + assert_eq!(flushed[0].0, 1); // offset + assert_eq!(flushed[0].1, b"hello"); + assert_eq!(dir.base_offset, 6); // 1 + 5 + assert_eq!(dir.reassembled_bytes, 5); + assert!(dir.segments.is_empty()); +} + +#[test] +fn test_flush_contiguous_ordered() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + insert_segment(&mut dir, 1001, b"aaa", 10_485_760, 10_000); + insert_segment(&mut dir, 1004, b"bbb", 10_485_760, 10_000); + + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed.len(), 2); + assert_eq!(flushed[0].1, b"aaa"); + assert_eq!(flushed[1].1, b"bbb"); + assert_eq!(dir.base_offset, 7); // 1 + 3 + 3 + assert!(dir.segments.is_empty()); +} + +#[test] +fn test_out_of_order_buffering() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert segment 2 first (out of order) + insert_segment(&mut dir, 1004, b"bbb", 10_485_760, 10_000); + let flushed = flush_contiguous(&mut dir); + assert!(flushed.is_empty()); // Can't flush — gap at offset 1 + + // Now insert segment 1 + insert_segment(&mut dir, 1001, b"aaa", 10_485_760, 10_000); + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed.len(), 2); // Both flush now + assert_eq!(flushed[0].1, b"aaa"); + assert_eq!(flushed[1].1, b"bbb"); + assert_eq!(dir.base_offset, 7); +} + +#[test] +fn test_retransmission_dedup() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + let result = insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + assert_eq!(result, InsertResult::Duplicate); + assert_eq!(dir.segments.len(), 1); // No duplicate stored +} + +#[test] +fn test_overlap_first_wins() { 
+    let mut dir = FlowDirection::new();
+    dir.set_isn(1000);
+
+    // Insert "AAABBB" at offset 1
+    insert_segment(&mut dir, 1001, b"AAABBB", 10_485_760, 10_000);
+
+    // Overlapping insert: "XXXCC" at offset 4 (overlaps with "BBB" at 4-6)
+    let result = insert_segment(&mut dir, 1004, b"XXXCC", 10_485_760, 10_000);
+    assert_eq!(result, InsertResult::PartialOverlap);
+    assert_eq!(dir.overlap_count, 1);
+
+    // Flush and verify: first 6 bytes from original, then "CC" from new
+    let flushed = flush_contiguous(&mut dir);
+    let all_bytes: Vec<u8> = flushed
+        .iter()
+        .flat_map(|(_, data)| data.iter().copied())
+        .collect();
+    assert_eq!(&all_bytes, b"AAABBBCC");
+}
+
+#[test]
+fn test_overlap_conflicting_data_detected() {
+    let mut dir = FlowDirection::new();
+    dir.set_isn(1000);
+
+    insert_segment(&mut dir, 1001, b"AAAA", 10_485_760, 10_000);
+
+    // Same range, different data
+    let result = insert_segment(&mut dir, 1001, b"BBBB", 10_485_760, 10_000);
+    assert_eq!(result, InsertResult::ConflictingOverlap);
+    assert_eq!(dir.overlap_count, 1);
+
+    // Original data preserved (first-wins)
+    let flushed = flush_contiguous(&mut dir);
+    assert_eq!(flushed[0].1, b"AAAA");
+}
+
+#[test]
+fn test_sequence_wraparound() {
+    let mut dir = FlowDirection::new();
+    // ISN near wraparound
+    dir.set_isn(0xFFFF_FFF0);
+
+    // First data byte at ISN+1 = 0xFFFF_FFF1, offset = 1
+    insert_segment(&mut dir, 0xFFFF_FFF1, b"before", 10_485_760, 10_000);
+    // Next segment wraps: seq = 0xFFFF_FFF1 + 6 = 0xFFFF_FFF7, offset = 7
+    insert_segment(&mut dir, 0xFFFF_FFF7, b"wrap", 10_485_760, 10_000);
+    // Another after wrap: seq = 0xFFFF_FFFB, offset = 11
+    insert_segment(&mut dir, 0xFFFF_FFFB, b"around", 10_485_760, 10_000);
+
+    let flushed = flush_contiguous(&mut dir);
+    let all_bytes: Vec<u8> = flushed
+        .iter()
+        .flat_map(|(_, data)| data.iter().copied())
+        .collect();
+    assert_eq!(&all_bytes, b"beforewraparound");
+}
+
+#[test]
+fn test_small_segment_tracking() {
+    let mut dir = FlowDirection::new();
+    
dir.set_isn(1000); + + // Insert small segments + for i in 0..5u32 { + let seq = 1001 + i; + insert_segment(&mut dir, seq, b"a", 10_485_760, 10_000); + } + + assert_eq!(dir.small_segment_count, 5); +} + +#[test] +fn test_depth_limit_truncation() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + let max_depth: usize = 100; // small for testing + let data = vec![b'A'; 80]; + insert_segment(&mut dir, 1001, &data, max_depth, 10_000); + flush_contiguous(&mut dir); + assert_eq!(dir.reassembled_bytes, 80); + assert!(!dir.depth_exceeded); + + // This should be truncated to 20 bytes + let data2 = vec![b'B'; 50]; + let result = insert_segment(&mut dir, 1081, &data2, max_depth, 10_000); + assert_eq!(result, InsertResult::Truncated); + assert!(dir.depth_exceeded); + + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed[0].1.len(), 20); // truncated from 50 to 20 + assert_eq!(dir.reassembled_bytes, 100); +} diff --git a/tests/summary_tests.rs b/tests/summary_tests.rs index c938310..d8c8f4f 100644 --- a/tests/summary_tests.rs +++ b/tests/summary_tests.rs @@ -11,6 +11,7 @@ fn make_parsed(src: [u8; 4], dst: [u8; 4], src_port: u16, dst_port: u16) -> Pars transport: TransportInfo::Tcp { src_port, dst_port, + seq_number: 1000, syn: false, ack: false, fin: false,