diff --git a/docs/superpowers/plans/2026-04-06-reassembly-perf.md b/docs/superpowers/plans/2026-04-06-reassembly-perf.md new file mode 100644 index 0000000..d8b9165 --- /dev/null +++ b/docs/superpowers/plans/2026-04-06-reassembly-perf.md @@ -0,0 +1,783 @@ +# TCP Reassembly Performance Optimization Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace three O(n) hotspots in the TCP reassembly engine with O(1)/O(log n) operations using incremental counters and BTreeMap range queries. + +**Architecture:** Add `buffered_bytes: usize` field to `FlowDirection` for incremental byte tracking. Replace `segments.iter()` with `segments.range(..new_end)` for overlap detection. Remove `update_memory()` and track `total_memory` via deltas at each mutation site. + +**Tech Stack:** Rust std `BTreeMap::range()`, no new dependencies. + +--- + +## File Structure + +| File | Responsibility | Changes | +|------|---------------|---------| +| `src/reassembly/flow.rs` | Flow/direction data structures | Add `buffered_bytes` field, update `memory_used()` | +| `src/reassembly/segment.rs` | Segment insert/flush operations | Track `buffered_bytes` on insert/flush, use `range()`, replace O(n) sum | +| `src/reassembly/mod.rs` | Reassembly engine orchestration | Remove `update_memory()`, add inline delta tracking | +| `tests/reassembly_segment_tests.rs` | Segment operation tests | Add `buffered_bytes` assertions to existing + new tests | +| `tests/reassembly_flow_tests.rs` | Flow structure tests | Add `buffered_bytes` initialization assertion | +| `tests/reassembly_engine_tests.rs` | Engine integration tests | Add `total_memory` tracking test | + +--- + +### Task 1: Add `buffered_bytes` field and track on segment insert + +**Files:** +- Modify: `src/reassembly/flow.rs:56-91` +- Modify: `src/reassembly/segment.rs:145-178` +- Modify: `tests/reassembly_segment_tests.rs` +- Modify: `tests/reassembly_flow_tests.rs` + +- [ ] **Step 1: Add `buffered_bytes` field to `FlowDirection`** + +In `src/reassembly/flow.rs`, add the field to the struct (after `segments` on line 59): + +```rust +pub struct FlowDirection { + pub isn: Option, + pub base_offset: u64, + pub segments: BTreeMap>, + pub buffered_bytes: usize, + pub reassembled_bytes: usize, + pub overlap_count: u32, + pub overlap_alert_fired: bool, + pub small_segment_count: u32, + pub small_segment_alert_fired: bool, + pub fin_seen: bool, + pub rst_seen: bool, + pub depth_exceeded: bool, +} +``` + +Initialize to 0 in `FlowDirection::new()`: + +```rust +pub fn new() -> Self { + FlowDirection { + isn: None, + base_offset: 0, + segments: BTreeMap::new(), + buffered_bytes: 0, + reassembled_bytes: 0, + overlap_count: 0, + overlap_alert_fired: false, + small_segment_count: 0, + small_segment_alert_fired: false, + fin_seen: false, + rst_seen: false, + depth_exceeded: false, + } +} +``` + +- [ ] **Step 2: Add `buffered_bytes` assertion to existing flow init test** + +In `tests/reassembly_flow_tests.rs`, add this assertion to `test_flow_direction_new`: + +```rust +#[test] +fn test_flow_direction_new() { + let dir = FlowDirection::new(); + assert_eq!(dir.isn, None); + assert_eq!(dir.base_offset, 0); + assert!(dir.segments.is_empty()); + assert_eq!(dir.buffered_bytes, 0); + assert_eq!(dir.reassembled_bytes, 0); + assert!(!dir.fin_seen); + assert!(!dir.rst_seen); + assert!(!dir.depth_exceeded); +} +``` + +- [ ] **Step 3: Write failing test for `buffered_bytes` after insert** + +In `tests/reassembly_segment_tests.rs`, add: + +```rust +#[test] +fn test_buffered_bytes_after_insert() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 5); + + insert_segment(&mut dir, 1006, b"world", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 10); +} +``` + +- [ ] **Step 4: Run test to verify it fails** + +Run: `cargo test test_buffered_bytes_after_insert -- --nocapture` +Expected: FAIL — `buffered_bytes` is 0 (not updated yet). + +- [ ] **Step 5: Write failing test for `buffered_bytes` after overlapping insert** + +In `tests/reassembly_segment_tests.rs`, add: + +```rust +#[test] +fn test_buffered_bytes_after_overlap() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert "AAABBB" at offset 1 (6 bytes) + insert_segment(&mut dir, 1001, b"AAABBB", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 6); + + // Overlapping insert: "XXXCC" at offset 4 — only "CC" (2 bytes) is new + insert_segment(&mut dir, 1004, b"XXXCC", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 8); // 6 original + 2 gap bytes +} +``` + +- [ ] **Step 6: Update `insert_segment` to track `buffered_bytes`** + +In `src/reassembly/segment.rs`, modify the normal insert at line 170-171: + +```rust + // No overlap — insert normally + let data_len = segment_data.len(); + if let Some(old) = dir.segments.insert(offset, segment_data) { + dir.buffered_bytes -= old.len(); + } + dir.buffered_bytes += data_len; +``` + +And the gap insert at line 153-157, replace: + +```rust + let gap_data = segment_data[start_idx..end_idx].to_vec(); + if !gap_data.is_empty() { + dir.segments.insert(gap_start, gap_data); + } +``` + +with: + +```rust + let gap_data = segment_data[start_idx..end_idx].to_vec(); + if !gap_data.is_empty() { + let gap_len = gap_data.len(); + if let Some(old) = dir.segments.insert(gap_start, gap_data) { + dir.buffered_bytes -= old.len(); + } + dir.buffered_bytes += gap_len; + } +``` + +- [ ] **Step 7: Run tests to verify they pass** + +Run: `cargo test test_buffered_bytes -- --nocapture` +Expected: PASS for both `test_buffered_bytes_after_insert` and `test_buffered_bytes_after_overlap`. + +- [ ] **Step 8: Run all existing segment tests** + +Run: `cargo test reassembly_segment -- --nocapture` +Expected: All existing tests PASS (behavior unchanged). + +- [ ] **Step 9: Commit** + +```bash +git add src/reassembly/flow.rs src/reassembly/segment.rs tests/reassembly_segment_tests.rs tests/reassembly_flow_tests.rs +git commit -m "perf: add buffered_bytes field and track on segment insert" +``` + +--- + +### Task 2: Track `buffered_bytes` on flush and update `memory_used()` + +**Files:** +- Modify: `src/reassembly/segment.rs:180-193` +- Modify: `src/reassembly/flow.rs:107-109` +- Modify: `tests/reassembly_segment_tests.rs` + +- [ ] **Step 1: Write failing test for `buffered_bytes` after insert + flush** + +In `tests/reassembly_segment_tests.rs`, add: + +```rust +#[test] +fn test_buffered_bytes_after_flush() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 5); + + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed.len(), 1); + assert_eq!(dir.buffered_bytes, 0); +} + +#[test] +fn test_buffered_bytes_partial_flush() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert segment at offset 1 (contiguous) and offset 10 (gap) + insert_segment(&mut dir, 1001, b"aaa", 10_485_760, 10_000); + insert_segment(&mut dir, 1010, b"bbb", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 6); + + // Flush only flushes contiguous segment at offset 1 + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed.len(), 1); + assert_eq!(flushed[0].1, b"aaa"); + assert_eq!(dir.buffered_bytes, 3); // "bbb" remains buffered +} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cargo test test_buffered_bytes_after_flush test_buffered_bytes_partial_flush -- --nocapture` +Expected: FAIL — `flush_contiguous` doesn't update `buffered_bytes` yet. + +- [ ] **Step 3: Update `flush_contiguous` to decrement `buffered_bytes`** + +In `src/reassembly/segment.rs`, modify `flush_contiguous`: + +```rust +pub fn flush_contiguous(dir: &mut FlowDirection) -> Vec<(u64, Vec)> { + let mut flushed = Vec::new(); + + while let Some(data) = dir.segments.remove(&dir.base_offset) { + let offset = dir.base_offset; + dir.buffered_bytes -= data.len(); + dir.base_offset += data.len() as u64; + dir.reassembled_bytes += data.len(); + flushed.push((offset, data)); + } + + flushed +} +``` + +- [ ] **Step 4: Run flush tests to verify they pass** + +Run: `cargo test test_buffered_bytes_after_flush test_buffered_bytes_partial_flush -- --nocapture` +Expected: PASS. + +- [ ] **Step 5: Update `memory_used()` with debug assertion** + +In `src/reassembly/flow.rs`, replace `memory_used()`: + +```rust + pub fn memory_used(&self) -> usize { + debug_assert_eq!( + self.buffered_bytes, + self.segments.values().map(|v| v.len()).sum::(), + "buffered_bytes counter drifted from actual segment sizes" + ); + self.buffered_bytes + } +``` + +- [ ] **Step 6: Run all tests to verify debug_assert doesn't fire** + +Run: `cargo test -- --nocapture` +Expected: All tests PASS. The `debug_assert_eq!` verifies `buffered_bytes` matches the recomputed sum on every `memory_used()` call during tests. + +- [ ] **Step 7: Commit** + +```bash +git add src/reassembly/segment.rs src/reassembly/flow.rs tests/reassembly_segment_tests.rs +git commit -m "perf: track buffered_bytes on flush and add debug assertion in memory_used()" +``` + +--- + +### Task 3: Replace O(n) buffered sum with field access and use `range()` for overlap detection + +**Files:** +- Modify: `src/reassembly/segment.rs:62,85` +- Modify: `tests/reassembly_segment_tests.rs` + +- [ ] **Step 1: Write test for range-based overlap boundary** + +In `tests/reassembly_segment_tests.rs`, add: + +```rust +#[test] +fn test_overlap_detection_boundary() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert segment at offset 1, length 5 (covers 1-5) + insert_segment(&mut dir, 1001, b"AAAAA", 10_485_760, 10_000); + // Insert segment at offset 10, length 5 (covers 10-14) — no overlap with above + insert_segment(&mut dir, 1010, b"BBBBB", 10_485_760, 10_000); + assert_eq!(dir.segments.len(), 2); + assert_eq!(dir.overlap_count, 0); + + // Insert segment at offset 3, length 4 (covers 3-6) — overlaps first, not second + let result = insert_segment(&mut dir, 1003, b"XXXX", 10_485_760, 10_000); + assert_eq!(result, InsertResult::PartialOverlap); + assert_eq!(dir.overlap_count, 1); + + // Insert segment at offset 6, length 4 (covers 6-9) — no overlap with either + let result = insert_segment(&mut dir, 1006, b"CCCC", 10_485_760, 10_000); + assert_eq!(result, InsertResult::Inserted); + assert_eq!(dir.overlap_count, 1); // unchanged +} +``` + +- [ ] **Step 2: Run test to verify it passes (establishes baseline)** + +Run: `cargo test test_overlap_detection_boundary -- --nocapture` +Expected: PASS (current O(n) logic produces correct results). + +- [ ] **Step 3: Replace O(n) buffered sum with field access** + +In `src/reassembly/segment.rs`, line 62, replace: + +```rust + let buffered: usize = dir.segments.values().map(|v| v.len()).sum(); +``` + +with: + +```rust + let buffered = dir.buffered_bytes; +``` + +- [ ] **Step 4: Replace `iter()` with `range(..new_end)` for overlap detection** + +In `src/reassembly/segment.rs`, line 85, replace: + +```rust + for (&existing_offset, existing_data) in dir.segments.iter() { +``` + +with: + +```rust + for (&existing_offset, existing_data) in dir.segments.range(..new_end) { +``` + +- [ ] **Step 5: Run all segment tests** + +Run: `cargo test reassembly_segment -- --nocapture` +Expected: All PASS — including `test_overlap_detection_boundary`, `test_overlap_first_wins`, `test_overlap_conflicting_data_detected`, `test_retransmission_dedup`. + +- [ ] **Step 6: Run full test suite** + +Run: `cargo test -- --nocapture` +Expected: All PASS. + +- [ ] **Step 7: Commit** + +```bash +git add src/reassembly/segment.rs tests/reassembly_segment_tests.rs +git commit -m "perf: use BTreeMap::range() for overlap detection and replace O(n) buffered sum" +``` + +--- + +### Task 4: Remove `update_memory()` and add incremental `total_memory` tracking + +**Files:** +- Modify: `src/reassembly/mod.rs` +- Modify: `tests/reassembly_engine_tests.rs` + +This is the largest task. It replaces 5 `update_memory()` call sites with inline delta tracking. The key patterns are: + +**For insert+flush (no removal):** Track deltas before/after each operation. +**For flush+remove paths (RST, FIN, expire, finalize, evict):** Capture flow's `memory_used()` before flushing, subtract that amount on removal. This is simpler than tracking individual flush deltas because the entire flow is about to be destroyed. + +- [ ] **Step 1: Write test verifying `total_memory` tracking** + +In `tests/reassembly_engine_tests.rs`, add: + +```rust +#[test] +fn test_total_memory_tracking() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + // SYN — no payload, no memory change + let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false); + reassembler.process_packet(&syn, 1, &mut handler); + + // Out-of-order segment — buffered (not flushed) + let p2 = make_tcp_packet(client, 12345, server, 80, 1004, b"bbb", false, false, false); + reassembler.process_packet(&p2, 2, &mut handler); + // "bbb" is buffered (3 bytes) because offset 1 is missing + assert!(handler.data_events.is_empty()); // nothing flushed yet + + // In-order segment — triggers flush of both + let p1 = make_tcp_packet(client, 12345, server, 80, 1001, b"aaa", false, false, false); + reassembler.process_packet(&p1, 3, &mut handler); + // Both segments flushed — total_memory should be 0 + assert_eq!(handler.all_data(), b"aaabbb"); + + // Finalize — closes flow, should leave total_memory at 0 + reassembler.finalize(&mut handler); +} +``` + +Note: The PR exposes `TcpReassembler::total_memory()` publicly, so the test asserts on it directly at each checkpoint (after buffering, after flush, after finalize). + +- [ ] **Step 2: Run test to verify it passes with current `update_memory()`** + +Run: `cargo test test_total_memory_tracking -- --nocapture` +Expected: PASS (baseline with current O(n) recomputation). + +- [ ] **Step 3: Update payload processing path (process_packet lines 211-291)** + +In `src/reassembly/mod.rs`, replace the insert + flush section. Find the block starting around line 211: + +```rust + let flow_dir = flow.get_direction_mut(dir); + let result = insert_segment( + flow_dir, + seq, + payload, + self.config.max_depth, + self.config.max_segments_per_direction, + ); +``` + +Replace with: + +```rust + let flow_dir = flow.get_direction_mut(dir); + let before_insert = flow_dir.buffered_bytes; + let result = insert_segment( + flow_dir, + seq, + payload, + self.config.max_depth, + self.config.max_segments_per_direction, + ); + self.total_memory += flow_dir.buffered_bytes - before_insert; +``` + +Then find the flush section around line 283: + +```rust + let flow = self.flows.get_mut(&key).unwrap(); + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(&key, dir, data, *offset); + } +``` + +Replace with: + +```rust + let flow = self.flows.get_mut(&key).unwrap(); + let flow_dir = flow.get_direction_mut(dir); + let before_flush = flow_dir.buffered_bytes; + let flushed = flush_contiguous(flow_dir); + self.total_memory -= before_flush - flow_dir.buffered_bytes; + + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(&key, dir, data, *offset); + } +``` + +Remove the `self.update_memory();` call at line 317 (after the payload section). + +- [ ] **Step 4: Update RST handler (process_packet lines 159-179)** + +Find the RST handler block: + +```rust + if rst { + flow.on_rst(); + self.stats.flows_rst += 1; + let key_clone = key.clone(); + // Flush buffered contiguous data before removing + if let Some(flow) = self.flows.get_mut(&key_clone) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(&key_clone, dir, data, *offset); + } + } + } + handler.on_flow_close(&key_clone, CloseReason::Rst); + self.flows.remove(&key_clone); + self.update_memory(); + return; + } +``` + +Replace with: + +```rust + if rst { + flow.on_rst(); + self.stats.flows_rst += 1; + let key_clone = key.clone(); + let flow_mem = self.flows.get(&key_clone).map(|f| f.memory_used()).unwrap_or(0); + // Flush buffered contiguous data before removing + if let Some(flow) = self.flows.get_mut(&key_clone) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(&key_clone, dir, data, *offset); + } + } + } + handler.on_flow_close(&key_clone, CloseReason::Rst); + self.flows.remove(&key_clone); + self.total_memory -= flow_mem; + return; + } +``` + +Pattern: capture `memory_used()` before flushing, subtract on removal. This correctly accounts for both flushed bytes and any remaining non-contiguous bytes. + +- [ ] **Step 5: Update FIN-closed flow removal (process_packet lines 294-314)** + +Find the FIN closure block: + +```rust + if self + .flows + .get(&key) + .is_some_and(|f| f.state == FlowState::Closed) + { + // Flush remaining data in both directions before removal + if let Some(flow) = self.flows.get_mut(&key) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(&key, dir, data, *offset); + } + } + } + self.stats.flows_fin += 1; + handler.on_flow_close(&key, CloseReason::Fin); + self.flows.remove(&key); + } +``` + +Replace with: + +```rust + if self + .flows + .get(&key) + .is_some_and(|f| f.state == FlowState::Closed) + { + let flow_mem = self.flows.get(&key).map(|f| f.memory_used()).unwrap_or(0); + // Flush remaining data in both directions before removal + if let Some(flow) = self.flows.get_mut(&key) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(&key, dir, data, *offset); + } + } + } + self.stats.flows_fin += 1; + handler.on_flow_close(&key, CloseReason::Fin); + self.flows.remove(&key); + self.total_memory -= flow_mem; + } +``` + +- [ ] **Step 6: Update `expire_flows` (lines 326-356)** + +Find `expire_flows`: + +```rust + for key in expired_keys { + // Flush salvageable data before removing + if let Some(flow) = self.flows.get_mut(&key) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + handler.on_data(&key, dir, data, *offset); + } + } + } + self.flows.remove(&key); + self.stats.flows_expired += 1; + handler.on_flow_close(&key, CloseReason::Timeout); + } + + self.update_memory(); +``` + +Replace with: + +```rust + for key in expired_keys { + let flow_mem = self.flows.get(&key).map(|f| f.memory_used()).unwrap_or(0); + // Flush salvageable data before removing + if let Some(flow) = self.flows.get_mut(&key) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + handler.on_data(&key, dir, data, *offset); + } + } + } + self.flows.remove(&key); + self.total_memory -= flow_mem; + self.stats.flows_expired += 1; + handler.on_flow_close(&key, CloseReason::Timeout); + } +``` + +- [ ] **Step 7: Update `finalize` (lines 358-377)** + +Find `finalize`: + +```rust + let all_keys: Vec = self.flows.keys().cloned().collect(); + for key in all_keys { + // Flush any remaining contiguous data before closing + if let Some(flow) = self.flows.get_mut(&key) { + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + handler.on_data(&key, dir, data, *offset); + } + } + } + self.flows.remove(&key); + handler.on_flow_close(&key, CloseReason::Timeout); + } + self.update_memory(); +``` + +Replace with: + +```rust + let all_keys: Vec = self.flows.keys().cloned().collect(); + for key in all_keys { + let flow_mem = self.flows.get(&key).map(|f| f.memory_used()).unwrap_or(0); + // Flush any remaining contiguous data before closing + if let Some(flow) = self.flows.get_mut(&key) { + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + handler.on_data(&key, dir, data, *offset); + } + } + } + self.flows.remove(&key); + self.total_memory -= flow_mem; + handler.on_flow_close(&key, CloseReason::Timeout); + } +``` + +- [ ] **Step 8: Update `evict_flows` (lines 398-436)** + +Find the eviction loop: + +```rust + for (key, _, _) in &candidates { + if self.total_memory <= self.config.memcap && self.flows.len() <= self.config.max_flows + { + break; + } + // Flush salvageable contiguous data before evicting + if let Some(flow) = self.flows.get_mut(key) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + handler.on_data(key, dir, data, *offset); + } + } + } + self.flows.remove(key); + self.stats.evictions += 1; + handler.on_flow_close(key, CloseReason::MemoryPressure); + self.update_memory(); + } +``` + +Replace with: + +```rust + for (key, _, _) in &candidates { + if self.total_memory <= self.config.memcap && self.flows.len() <= self.config.max_flows + { + break; + } + let flow_mem = self.flows.get(key).map(|f| f.memory_used()).unwrap_or(0); + // Flush salvageable contiguous data before evicting + if let Some(flow) = self.flows.get_mut(key) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + handler.on_data(key, dir, data, *offset); + } + } + } + self.flows.remove(key); + self.total_memory -= flow_mem; + self.stats.evictions += 1; + handler.on_flow_close(key, CloseReason::MemoryPressure); + } +``` + +- [ ] **Step 9: Remove `update_memory()` function** + +In `src/reassembly/mod.rs`, delete the `update_memory` function entirely: + +```rust + // DELETE THIS FUNCTION: + fn update_memory(&mut self) { + self.total_memory = self.flows.values().map(|f| f.memory_used()).sum(); + } +``` + +- [ ] **Step 10: Run full test suite** + +Run: `cargo test -- --nocapture` +Expected: All tests PASS — including existing engine tests (`test_three_packet_stream_ordered`, `test_out_of_order_delivery`, `test_rst_closes_flow`, `test_finalize_flushes_remaining`, `test_flow_timeout_expiration`) and the new `test_total_memory_tracking`. + +The `debug_assert_eq!` in `memory_used()` fires on every flow removal during tests, verifying that incremental `buffered_bytes` matches the recomputed sum. + +- [ ] **Step 11: Run clippy and fmt** + +Run: `cargo clippy -- -D warnings && cargo fmt --check` +Expected: No warnings, no formatting issues. + +- [ ] **Step 12: Commit** + +```bash +git add src/reassembly/mod.rs tests/reassembly_engine_tests.rs +git commit -m "perf: remove update_memory() and track total_memory incrementally" +``` diff --git a/docs/superpowers/specs/2026-04-06-reassembly-perf-design.md b/docs/superpowers/specs/2026-04-06-reassembly-perf-design.md new file mode 100644 index 0000000..6fc38bb --- /dev/null +++ b/docs/superpowers/specs/2026-04-06-reassembly-perf-design.md @@ -0,0 +1,154 @@ +# TCP Reassembly Performance Optimization Design + +**Issue:** #11 — perf: use BTreeMap::range() and incremental memory tracking in reassembly +**Scope:** Three targeted O(n) → O(1)/O(log n) optimizations in the reassembly engine internals. No behavioral changes. Adds `total_memory()` public accessor for testability. + +## Problem + +Before these optimizations, the TCP reassembly engine had three O(n) hotspots identified during PR #10 review: + +1. **Overlap detection** (`segment.rs:85`): iterated all segments in the BTreeMap per insert, even though only segments starting before `new_end` could overlap. +2. **`buffered_bytes` computation** (`segment.rs:62`): recomputed `segments.values().map(|v| v.len()).sum()` on every insert — O(n) per call. +3. **`update_memory()` recomputation** (`mod.rs:391`): iterated all flows × all segments per direction to sum total memory — O(m × n). Called at 5 sites per packet (lines 178, 317, 355, 376, 434). + +With `max_segments_per_direction = 10,000` and `max_flows = 100,000`, these costs were bounded but wasteful. + +## Approach + +Incremental counters + BTreeMap range query. All three optimizations are independent and can be implemented/tested separately. + +## Changes + +### 1. `FlowDirection.buffered_bytes` — Incremental Counter (`flow.rs`) + +Add `pub buffered_bytes: usize` field to `FlowDirection`, initialized to 0. + +Update at every `BTreeMap::insert` and `BTreeMap::remove` site: + +**On insert** (both `segment.rs:155` gap insert and `segment.rs:171` normal insert): +```rust +let data_len = segment_data.len(); +if let Some(old) = dir.segments.insert(offset, segment_data) { + dir.buffered_bytes -= old.len(); +} +dir.buffered_bytes += data_len; +``` + +Capturing `data_len` before `insert()` is required because `segment_data` is moved into the BTreeMap. + +The `if let Some(old)` handles the theoretical edge case where `insert()` replaces an existing key. In practice, the overlap detection prevents this for the normal path (line 171), and gap computation prevents it for the gap path (line 155). The guard is defensive. + +**On flush** (`segment.rs:185`, inside `flush_contiguous`): +```rust +while let Some(data) = dir.segments.remove(&dir.base_offset) { + dir.buffered_bytes -= data.len(); + // ... existing logic ... +} +``` + +**Replace `memory_used()`** (`flow.rs:107`): +```rust +pub fn memory_used(&self) -> usize { + debug_assert_eq!( + self.buffered_bytes, + self.segments.values().map(|v| v.len()).sum::(), + "buffered_bytes counter drifted from actual segment sizes" + ); + self.buffered_bytes +} +``` + +The `debug_assert_eq!` verifies the counter in debug/test builds (O(n) check). Compiled out in release. + +**Depth check** (`segment.rs:62`): Replace O(n) summation with field access: +```rust +// Before: +let buffered: usize = dir.segments.values().map(|v| v.len()).sum(); + +// After: +let buffered = dir.buffered_bytes; +``` + +### 2. `BTreeMap::range()` for Overlap Detection (`segment.rs`) + +Replace full iteration at line 85: +```rust +// Before (O(n) — iterates all segments): +for (&existing_offset, existing_data) in dir.segments.iter() { + +// After (O(log n + k) — only segments that could overlap): +for (&existing_offset, existing_data) in dir.segments.range(..new_end) { +``` + +**Why this is correct:** A segment at `existing_offset` overlaps `[new_start, new_end)` only if `existing_offset < new_end AND new_start < existing_end`. The first condition is enforced by `range(..new_end)`. The second condition is checked inside the loop (unchanged). Segments with `existing_offset >= new_end` cannot overlap regardless of their length, because the overlap condition requires `existing_offset < new_end`. + +**No lower bound needed:** We could theoretically skip segments whose end is before `new_start`, but since segment length varies and the BTreeMap is keyed by start offset (not end), a lower bound would require knowing max segment size. The `range(..new_end)` upper bound is sufficient — the inner `new_start < existing_end` check handles the rest. + +### 3. Incremental `total_memory` Tracking (`mod.rs`) + +Remove `fn update_memory()` entirely. Replace all 5 call sites with inline delta computation. + +**Pattern for insert sites:** +```rust +let before = flow_dir.buffered_bytes; +let result = insert_segment(flow_dir, seq, payload, ...); +let delta = flow_dir.buffered_bytes - before; +self.total_memory += delta; +``` + +**Pattern for flush sites:** +```rust +let before = flow_dir.buffered_bytes; +let flushed = flush_contiguous(flow_dir); +self.total_memory -= before - flow_dir.buffered_bytes; +``` + +**Pattern for flow removal:** +```rust +if let Some(flow) = self.flows.remove(&key) { + self.total_memory -= flow.memory_used(); +} +``` + +`flow.memory_used()` is now O(1) — just `client_to_server.buffered_bytes + server_to_client.buffered_bytes`. + +**Call site mapping:** + +| Line | Context | Replacement | +|------|---------|-------------| +| 178 | After RST removal | `self.total_memory -= flow.memory_used()` before remove | +| 317 | After payload processing | Delta after insert + delta after flush | +| 355 | After `expire_flows` | Subtract removed flow memory | +| 376 | After `finalize` | Subtract removed flow memory | +| 434 | Inside `evict_flows` loop | Subtract removed flow memory | + +## Files Modified + +| File | Change | +|------|--------| +| `src/reassembly/flow.rs` | Add `buffered_bytes: usize` field, update `memory_used()` | +| `src/reassembly/segment.rs` | Use `range(..new_end)`, update `buffered_bytes` on insert/flush, replace O(n) sum | +| `src/reassembly/mod.rs` | Remove `update_memory()`, add inline delta tracking at all mutation sites | + +## Testing + +**Unit tests (new):** +1. `buffered_bytes` correct after single insert +2. `buffered_bytes` correct after insert + flush sequence +3. `buffered_bytes` correct after overlapping inserts (gap portions only) +4. `buffered_bytes` correct after replacement insert (defensive case) +5. `total_memory` delta correct after insert +6. `total_memory` correct after flow removal +7. Range-based overlap detection finds all overlapping segments +8. Range-based overlap detection skips non-overlapping segments + +**Existing tests:** All must pass unchanged. The `debug_assert_eq!` in `memory_used()` acts as a cross-check in all test runs. + +**No integration test changes** — behavior is identical, only performance characteristics change. + +## Not In Scope + +- **Per-flow cached memory** — `FlowDirection.buffered_bytes` makes `TcpFlow.memory_used()` O(1) already +- **Lower bound on range query** — diminishing returns, inner check handles it +- **Benchmarks** — can be added in a follow-up issue +- **Structural refactoring** — keeping changes minimal and targeted diff --git a/src/reassembly/flow.rs b/src/reassembly/flow.rs index 8bc3512..c7da2dd 100644 --- a/src/reassembly/flow.rs +++ b/src/reassembly/flow.rs @@ -57,6 +57,7 @@ pub struct FlowDirection { pub isn: Option, pub base_offset: u64, pub segments: BTreeMap>, + pub buffered_bytes: usize, pub reassembled_bytes: usize, pub overlap_count: u32, pub overlap_alert_fired: bool, @@ -79,6 +80,7 @@ impl FlowDirection { isn: None, base_offset: 0, segments: BTreeMap::new(), + buffered_bytes: 0, reassembled_bytes: 0, overlap_count: 0, overlap_alert_fired: false, @@ -105,7 +107,12 @@ impl FlowDirection { } pub fn memory_used(&self) -> usize { - self.segments.values().map(|v| v.len()).sum() + debug_assert_eq!( + self.buffered_bytes, + self.segments.values().map(|v| v.len()).sum::(), + "buffered_bytes counter drifted from actual segment sizes" + ); + self.buffered_bytes } } diff --git a/src/reassembly/mod.rs b/src/reassembly/mod.rs index 5927078..e6b4e05 100644 --- a/src/reassembly/mod.rs +++ b/src/reassembly/mod.rs @@ -161,6 +161,13 @@ impl TcpReassembler { flow.on_rst(); self.stats.flows_rst += 1; let key_clone = key.clone(); + // Capture memory before flushing: total_memory still holds this flow's + // full contribution. Subtracting flow_mem after removal zeros it out. + let flow_mem = self + .flows + .get(&key_clone) + .expect("flow must exist before RST removal") + .memory_used(); // Flush buffered contiguous data before removing if let Some(flow) = self.flows.get_mut(&key_clone) { use crate::reassembly::handler::Direction; @@ -175,7 +182,7 @@ impl TcpReassembler { } handler.on_flow_close(&key_clone, CloseReason::Rst); self.flows.remove(&key_clone); - self.update_memory(); + self.total_memory -= flow_mem; return; } @@ -209,6 +216,7 @@ impl TcpReassembler { } let flow_dir = flow.get_direction_mut(dir); + let before_insert = flow_dir.buffered_bytes; let result = insert_segment( flow_dir, seq, @@ -216,6 +224,13 @@ impl TcpReassembler { self.config.max_depth, self.config.max_segments_per_direction, ); + debug_assert!( + flow_dir.buffered_bytes >= before_insert, + "insert_segment decreased buffered_bytes: before={} after={}", + before_insert, + flow_dir.buffered_bytes + ); + self.total_memory += flow_dir.buffered_bytes.saturating_sub(before_insert); match result { InsertResult::Inserted => self.stats.segments_inserted += 1, @@ -282,7 +297,9 @@ impl TcpReassembler { // Flush contiguous data let flow = self.flows.get_mut(&key).unwrap(); let flow_dir = flow.get_direction_mut(dir); + let before_flush = flow_dir.buffered_bytes; let flushed = flush_contiguous(flow_dir); + self.total_memory -= before_flush - flow_dir.buffered_bytes; for (offset, data) in &flushed { self.stats.bytes_reassembled += data.len() as u64; @@ -296,6 +313,12 @@ impl TcpReassembler { .get(&key) .is_some_and(|f| f.state == FlowState::Closed) { + // Capture memory before flushing (see RST handler comment for rationale) + let flow_mem = self + .flows + .get(&key) + .expect("flow must exist before FIN removal") + .memory_used(); // Flush remaining data in both directions before removal if let Some(flow) = self.flows.get_mut(&key) { use crate::reassembly::handler::Direction; @@ -311,11 +334,9 @@ impl TcpReassembler { self.stats.flows_fin += 1; handler.on_flow_close(&key, CloseReason::Fin); self.flows.remove(&key); + self.total_memory -= flow_mem; } - // 11. Update total memory tracking - self.update_memory(); - // 12. Evict flows if memcap exceeded if self.total_memory > self.config.memcap { self.evict_flows(handler); @@ -336,6 +357,11 @@ impl TcpReassembler { .collect(); for key in expired_keys { + let flow_mem = self + .flows + .get(&key) + .expect("expired flow must exist") + .memory_used(); // Flush salvageable data before removing if let Some(flow) = self.flows.get_mut(&key) { use crate::reassembly::handler::Direction; @@ -348,11 +374,10 @@ impl TcpReassembler { } } self.flows.remove(&key); + self.total_memory -= flow_mem; self.stats.flows_expired += 1; handler.on_flow_close(&key, CloseReason::Timeout); } - - self.update_memory(); } /// Close all remaining flows (called at end of capture). @@ -360,6 +385,11 @@ impl TcpReassembler { use crate::reassembly::handler::Direction; let all_keys: Vec = self.flows.keys().cloned().collect(); for key in all_keys { + let flow_mem = self + .flows + .get(&key) + .expect("finalize flow must exist") + .memory_used(); // Flush any remaining contiguous data before closing if let Some(flow) = self.flows.get_mut(&key) { for dir in [Direction::ClientToServer, Direction::ServerToClient] { @@ -371,9 +401,9 @@ impl TcpReassembler { } } self.flows.remove(&key); + self.total_memory -= flow_mem; handler.on_flow_close(&key, CloseReason::Timeout); } - self.update_memory(); } /// Return a reference to current stats. @@ -386,12 +416,13 @@ impl TcpReassembler { &self.findings } - // --- Private helpers --- - - fn update_memory(&mut self) { - self.total_memory = self.flows.values().map(|f| f.memory_used()).sum(); + /// Return the current total memory used by all flow buffers. + pub fn total_memory(&self) -> usize { + self.total_memory } + // --- Private helpers --- + /// Evict flows when memcap is exceeded. /// Strategy: evict non-established flows first (sorted by LRU), /// then established flows by LRU. @@ -417,6 +448,11 @@ impl TcpReassembler { { break; } + let flow_mem = self + .flows + .get(key) + .expect("eviction candidate must exist") + .memory_used(); // Flush salvageable contiguous data before evicting if let Some(flow) = self.flows.get_mut(key) { use crate::reassembly::handler::Direction; @@ -429,9 +465,9 @@ impl TcpReassembler { } } self.flows.remove(key); + self.total_memory -= flow_mem; self.stats.evictions += 1; handler.on_flow_close(key, CloseReason::MemoryPressure); - self.update_memory(); } } diff --git a/src/reassembly/segment.rs b/src/reassembly/segment.rs index 409edcf..4a05072 100644 --- a/src/reassembly/segment.rs +++ b/src/reassembly/segment.rs @@ -59,7 +59,7 @@ pub fn insert_segment( let mut segment_data = data.to_vec(); // Truncate if exceeding depth - let buffered: usize = dir.segments.values().map(|v| v.len()).sum(); + let buffered = dir.buffered_bytes; let total_after = dir.reassembled_bytes + buffered + segment_data.len(); let truncated = if total_after > max_depth { let allowed = max_depth.saturating_sub(dir.reassembled_bytes + buffered); @@ -82,7 +82,8 @@ pub fn insert_segment( let mut has_conflict = false; let mut trimmed_ranges: Vec<(u64, u64)> = Vec::new(); - for (&existing_offset, existing_data) in dir.segments.iter() { + // Only segments starting before new_end can overlap [new_start, new_end). + for (&existing_offset, existing_data) in dir.segments.range(..new_end) { let existing_end = existing_offset + existing_data.len() as u64; if new_start < existing_end && new_end > existing_offset { @@ -152,7 +153,17 @@ pub fn insert_segment( if start_idx < segment_data.len() && end_idx <= segment_data.len() { let gap_data = segment_data[start_idx..end_idx].to_vec(); if !gap_data.is_empty() { - dir.segments.insert(gap_start, gap_data); + let gap_len = gap_data.len(); + let old = dir.segments.insert(gap_start, gap_data); + debug_assert!( + old.is_none(), + "gap_start {} collided with existing segment", + gap_start + ); + if let Some(old) = old { + dir.buffered_bytes -= old.len(); + } + dir.buffered_bytes += gap_len; } } } @@ -168,7 +179,17 @@ pub fn insert_segment( } // No overlap — insert normally - dir.segments.insert(offset, segment_data); + let data_len = segment_data.len(); + let old = dir.segments.insert(offset, segment_data); + debug_assert!( + old.is_none(), + "offset {} collided with existing segment in no-overlap path", + offset + ); + if let Some(old) = old { + dir.buffered_bytes -= old.len(); + } + dir.buffered_bytes += data_len; if truncated { InsertResult::Truncated @@ -184,6 +205,7 @@ pub fn flush_contiguous(dir: &mut FlowDirection) -> Vec<(u64, Vec)> { while let Some(data) = dir.segments.remove(&dir.base_offset) { let offset = dir.base_offset; + dir.buffered_bytes -= data.len(); dir.base_offset += data.len() as u64; dir.reassembled_bytes += data.len(); flushed.push((offset, data)); diff --git a/tests/reassembly_engine_tests.rs b/tests/reassembly_engine_tests.rs index 010ee33..4fe4a15 100644 --- a/tests/reassembly_engine_tests.rs +++ b/tests/reassembly_engine_tests.rs @@ -167,6 +167,7 @@ fn test_rst_closes_flow() { assert_eq!(handler.close_events.len(), 1); assert_eq!(handler.close_events[0].1, CloseReason::Rst); + assert_eq!(reassembler.total_memory(), 0); } #[test] @@ -223,4 +224,71 @@ fn test_flow_timeout_expiration() { let stats = reassembler.stats(); assert_eq!(stats.flows_expired, 1); + assert_eq!(reassembler.total_memory(), 0); +} + +#[test] +fn test_total_memory_tracking() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + // SYN — no payload, no memory change + let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false); + reassembler.process_packet(&syn, 1, &mut handler); + + // Out-of-order segment — buffered (not flushed) + let p2 = make_tcp_packet(client, 12345, server, 80, 1004, b"bbb", false, false, false); + reassembler.process_packet(&p2, 2, &mut handler); + assert!(handler.data_events.is_empty()); + assert_eq!(reassembler.total_memory(), 3); // "bbb" buffered + + // In-order segment — triggers flush of both + let p1 = make_tcp_packet(client, 12345, server, 80, 1001, b"aaa", false, false, false); + reassembler.process_packet(&p1, 3, &mut handler); + assert_eq!(handler.all_data(), b"aaabbb"); + assert_eq!(reassembler.total_memory(), 0); // all flushed + + // Finalize — closes flow + reassembler.finalize(&mut handler); + assert_eq!(reassembler.total_memory(), 0); +} + +#[test] +fn test_fin_close_total_memory() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + // SYN from client + let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false); + reassembler.process_packet(&syn, 1, &mut handler); + + // Out-of-order data — stays buffered (gap at offset 1) + let p2 = make_tcp_packet(client, 12345, server, 80, 1004, b"bbb", false, false, false); + reassembler.process_packet(&p2, 2, &mut handler); + assert_eq!(reassembler.total_memory(), 3); + + // FIN from client (first FIN) + let fin1 = make_tcp_packet(client, 12345, server, 80, 1007, &[], false, true, false); + reassembler.process_packet(&fin1, 3, &mut handler); + + // FIN from server (second FIN → Closed, triggers step 10 removal) + let fin2 = make_tcp_packet(server, 80, client, 12345, 2000, &[], false, true, false); + reassembler.process_packet(&fin2, 4, &mut handler); + + // Flow removed with buffered-but-not-flushed data — total_memory must be 0 + assert_eq!(reassembler.total_memory(), 0); + assert!( + handler + .close_events + .iter() + .any(|(_, r)| *r == CloseReason::Fin) + ); } diff --git a/tests/reassembly_flow_tests.rs b/tests/reassembly_flow_tests.rs index 34530f8..19786b7 100644 --- a/tests/reassembly_flow_tests.rs +++ b/tests/reassembly_flow_tests.rs @@ -95,6 +95,7 @@ fn test_flow_direction_new() { assert_eq!(dir.base_offset, 0); assert!(dir.segments.is_empty()); assert_eq!(dir.reassembled_bytes, 0); + assert_eq!(dir.buffered_bytes, 0); assert!(!dir.fin_seen); assert!(!dir.rst_seen); assert!(!dir.depth_exceeded); diff --git a/tests/reassembly_segment_tests.rs b/tests/reassembly_segment_tests.rs index 1ab0d12..c5ef1ef 100644 --- a/tests/reassembly_segment_tests.rs +++ b/tests/reassembly_segment_tests.rs @@ -72,6 +72,7 @@ fn test_retransmission_dedup() { let result = insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); assert_eq!(result, InsertResult::Duplicate); assert_eq!(dir.segments.len(), 1); // No duplicate stored + assert_eq!(dir.buffered_bytes, 5); // counter must not double-count } #[test] @@ -148,6 +149,56 @@ fn test_small_segment_tracking() { assert_eq!(dir.small_segment_count, 5); } +#[test] +fn test_buffered_bytes_after_insert() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 5); + insert_segment(&mut dir, 1006, b"world", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 10); +} + +#[test] +fn test_buffered_bytes_after_overlap() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + insert_segment(&mut dir, 1001, b"AAABBB", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 6); + insert_segment(&mut dir, 1004, b"XXXCC", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 8); // 6 original + 2 gap bytes +} + +#[test] +fn test_buffered_bytes_after_flush() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 5); + + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed.len(), 1); + assert_eq!(dir.buffered_bytes, 0); +} + +#[test] +fn test_buffered_bytes_partial_flush() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert segment at offset 1 (contiguous) and offset 10 (gap) + insert_segment(&mut dir, 1001, b"aaa", 10_485_760, 10_000); + insert_segment(&mut dir, 1010, b"bbb", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 6); + + // Flush only flushes contiguous segment at offset 1 + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed.len(), 1); + assert_eq!(flushed[0].1, b"aaa"); + assert_eq!(dir.buffered_bytes, 3); // "bbb" remains buffered +} + #[test] fn test_depth_limit_truncation() { let mut dir = FlowDirection::new(); @@ -170,3 +221,43 @@ fn test_depth_limit_truncation() { assert_eq!(flushed[0].1.len(), 20); // truncated from 50 to 20 assert_eq!(dir.reassembled_bytes, 100); } + +#[test] +fn test_overlap_detection_boundary() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert segment at offset 1, length 5 (covers 1-5) + insert_segment(&mut dir, 1001, b"AAAAA", 10_485_760, 10_000); + // Insert segment at offset 10, length 5 (covers 10-14) — no overlap with above + insert_segment(&mut dir, 1010, b"BBBBB", 10_485_760, 10_000); + assert_eq!(dir.segments.len(), 2); + assert_eq!(dir.overlap_count, 0); + + // Insert segment at offset 3, length 4 (covers 3-6) — overlaps first, not second + let result = insert_segment(&mut dir, 1003, b"XXXX", 10_485_760, 10_000); + assert_eq!(result, InsertResult::PartialOverlap); + assert_eq!(dir.overlap_count, 1); + + // Insert segment at offset 6, length 4 (covers 6-9). + // The partial-overlap insert above deposited a 1-byte gap at offset 6 ("X"), + // so this segment overlaps that byte and is PartialOverlap. + let result = insert_segment(&mut dir, 1006, b"CCCC", 10_485_760, 10_000); + assert_eq!(result, InsertResult::PartialOverlap); + assert_eq!(dir.overlap_count, 2); +} + +#[test] +fn test_range_boundary_exact_new_end() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert segment at offset 1, length 5 (covers 1-5, ends at 6) + insert_segment(&mut dir, 1001, b"AAAAA", 10_485_760, 10_000); + + // Insert segment starting exactly at the end of the first (offset 6) + // This should NOT overlap — range(..new_end) must exclude it + let result = insert_segment(&mut dir, 1006, b"BBBBB", 10_485_760, 10_000); + assert_eq!(result, InsertResult::Inserted); + assert_eq!(dir.overlap_count, 0); +}