From df90341438da383ada145d18497ce0ea8bc385ef Mon Sep 17 00:00:00 2001 From: Zious Date: Mon, 6 Apr 2026 20:20:39 -0500 Subject: [PATCH 1/8] docs: add reassembly perf optimization design spec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Design for issue #11 — BTreeMap::range() overlap detection and incremental memory tracking in the TCP reassembly engine. --- .../2026-04-06-reassembly-perf-design.md | 154 ++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 docs/superpowers/specs/2026-04-06-reassembly-perf-design.md diff --git a/docs/superpowers/specs/2026-04-06-reassembly-perf-design.md b/docs/superpowers/specs/2026-04-06-reassembly-perf-design.md new file mode 100644 index 0000000..4155cf6 --- /dev/null +++ b/docs/superpowers/specs/2026-04-06-reassembly-perf-design.md @@ -0,0 +1,154 @@ +# TCP Reassembly Performance Optimization Design + +**Issue:** #11 — perf: use BTreeMap::range() and incremental memory tracking in reassembly +**Scope:** Three targeted O(n) → O(1)/O(log n) optimizations in the reassembly engine internals. No behavioral changes, no API changes. + +## Problem + +The TCP reassembly engine has three O(n) hotspots identified during PR #10 review: + +1. **Overlap detection** (`segment.rs:85`): iterates all segments in the BTreeMap per insert, even though only segments starting before `new_end` can overlap. +2. **`buffered_bytes` computation** (`segment.rs:62`): recomputes `segments.values().map(|v| v.len()).sum()` on every insert — O(n) per call. +3. **`update_memory()` recomputation** (`mod.rs:391`): iterates all flows × all segments per direction to sum total memory — O(m × n). Called at 5 sites per packet (lines 178, 317, 355, 376, 434). + +With `max_segments_per_direction = 10,000` and `max_flows = 100,000`, these are bounded but wasteful. + +## Approach + +Incremental counters + BTreeMap range query. All three optimizations are independent and can be implemented/tested separately. 
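The range-query idea can be sketched in isolation. This is a toy model, not the engine's code — the `overlap_candidates` helper and its signature are illustrative only — but it demonstrates why `range(..new_end)` yields exactly the candidate set:

```rust
use std::collections::BTreeMap;

// Toy model of the segment buffer: keyed by start offset, value = payload.
// A stored segment [s, s + len) overlaps [new_start, new_end) iff
//   s < new_end  &&  new_start < s + len.
// range(..new_end) enforces the first condition; the loop checks the second.
fn overlap_candidates(
    segments: &BTreeMap<u64, Vec<u8>>,
    new_start: u64,
    new_end: u64,
) -> Vec<u64> {
    let mut hits = Vec::new();
    for (&s, data) in segments.range(..new_end) {
        if new_start < s + data.len() as u64 {
            hits.push(s);
        }
    }
    hits
}

fn main() {
    let mut segs = BTreeMap::new();
    segs.insert(1u64, vec![0u8; 5]); // covers [1, 6)
    segs.insert(10u64, vec![0u8; 5]); // covers [10, 15)

    assert_eq!(overlap_candidates(&segs, 3, 7), vec![1]); // hits first only
    assert!(overlap_candidates(&segs, 6, 10).is_empty()); // fits the gap exactly
    assert_eq!(overlap_candidates(&segs, 0, 20), vec![1, 10]); // hits both
}
```

Segments keyed at or beyond `new_end` are never visited, which is the entire source of the O(log n + k) bound.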
+
+## Changes
+
+### 1. `FlowDirection.buffered_bytes` — Incremental Counter (`flow.rs`)
+
+Add `pub buffered_bytes: usize` field to `FlowDirection`, initialized to 0.
+
+Update at every `BTreeMap::insert` and `BTreeMap::remove` site:
+
+**On insert** (both `segment.rs:155` gap insert and `segment.rs:171` normal insert):
+```rust
+let data_len = segment_data.len();
+if let Some(old) = dir.segments.insert(offset, segment_data) {
+    dir.buffered_bytes -= old.len();
+}
+dir.buffered_bytes += data_len;
+```
+
+Capturing `data_len` before `insert()` is required because `segment_data` is moved into the BTreeMap.
+
+The `if let Some(old)` handles the theoretical edge case where `insert()` replaces an existing key. In practice, overlap detection prevents this on the normal path (line 171), and gap computation prevents it on the gap path (line 155). The guard is defensive.
+
+**On flush** (`segment.rs:185`, inside `flush_contiguous`):
+```rust
+while let Some(data) = dir.segments.remove(&dir.base_offset) {
+    dir.buffered_bytes -= data.len();
+    // ... existing logic ...
+}
+```
+
+**Replace `memory_used()`** (`flow.rs:107`):
+```rust
+pub fn memory_used(&self) -> usize {
+    debug_assert_eq!(
+        self.buffered_bytes,
+        self.segments.values().map(|v| v.len()).sum::<usize>(),
+        "buffered_bytes counter drifted from actual segment sizes"
+    );
+    self.buffered_bytes
+}
+```
+
+The `debug_assert_eq!` verifies the counter in debug/test builds (an O(n) check). It is compiled out in release builds.
+
+**Depth check** (`segment.rs:62`): Replace the O(n) summation with a field access:
+```rust
+// Before:
+let buffered: usize = dir.segments.values().map(|v| v.len()).sum();
+
+// After:
+let buffered = dir.buffered_bytes;
+```
+
+### 2.
`BTreeMap::range()` for Overlap Detection (`segment.rs`) + +Replace full iteration at line 85: +```rust +// Before (O(n) — iterates all segments): +for (&existing_offset, existing_data) in dir.segments.iter() { + +// After (O(log n + k) — only segments that could overlap): +for (&existing_offset, existing_data) in dir.segments.range(..new_end) { +``` + +**Why this is correct:** A segment at `existing_offset` overlaps `[new_start, new_end)` only if `existing_offset < new_end AND new_start < existing_end`. The first condition is enforced by `range(..new_end)`. The second condition is checked inside the loop (unchanged). Segments with `existing_offset >= new_end` cannot overlap regardless of their length, because the overlap condition requires `existing_offset < new_end`. + +**No lower bound needed:** We could theoretically skip segments whose end is before `new_start`, but since segment length varies and the BTreeMap is keyed by start offset (not end), a lower bound would require knowing max segment size. The `range(..new_end)` upper bound is sufficient — the inner `new_start < existing_end` check handles the rest. + +### 3. Incremental `total_memory` Tracking (`mod.rs`) + +Remove `fn update_memory()` entirely. Replace all 5 call sites with inline delta computation. + +**Pattern for insert sites:** +```rust +let before = flow_dir.buffered_bytes; +let result = insert_segment(flow_dir, seq, payload, ...); +let delta = flow_dir.buffered_bytes - before; +self.total_memory += delta; +``` + +**Pattern for flush sites:** +```rust +let before = flow_dir.buffered_bytes; +let flushed = flush_contiguous(flow_dir); +self.total_memory -= before - flow_dir.buffered_bytes; +``` + +**Pattern for flow removal:** +```rust +if let Some(flow) = self.flows.remove(&key) { + self.total_memory -= flow.memory_used(); +} +``` + +`flow.memory_used()` is now O(1) — just `client_to_server.buffered_bytes + server_to_client.buffered_bytes`. 
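The three delta patterns above can be exercised against a toy flow direction. The `Dir` struct below is a stand-in, not the real `FlowDirection`; it keeps just enough state to show the insert-delta and flush-delta bookkeeping holding an engine-wide total consistent:

```rust
use std::collections::BTreeMap;

// Toy flow direction: buffered segments plus an incremental byte counter.
#[derive(Default)]
struct Dir {
    base_offset: u64,
    segments: BTreeMap<u64, Vec<u8>>,
    buffered_bytes: usize,
}

impl Dir {
    fn insert(&mut self, offset: u64, data: Vec<u8>) {
        let len = data.len();
        if let Some(old) = self.segments.insert(offset, data) {
            self.buffered_bytes -= old.len(); // defensive: key replaced
        }
        self.buffered_bytes += len;
    }

    fn flush_contiguous(&mut self) -> usize {
        let mut flushed = 0;
        while let Some(data) = self.segments.remove(&self.base_offset) {
            self.buffered_bytes -= data.len();
            self.base_offset += data.len() as u64;
            flushed += data.len();
        }
        flushed
    }

    // O(n) ground truth the counter must always agree with.
    fn recomputed(&self) -> usize {
        self.segments.values().map(|v| v.len()).sum()
    }
}

fn main() {
    let mut dir = Dir::default();
    let mut total_memory = 0usize;

    // Insert site: apply the counter delta to the engine-wide total.
    let before = dir.buffered_bytes;
    dir.insert(3, vec![0u8; 4]); // out of order: not yet flushable
    total_memory += dir.buffered_bytes - before;
    assert_eq!(total_memory, 4);

    let before = dir.buffered_bytes;
    dir.insert(0, vec![0u8; 3]); // fills the gap
    total_memory += dir.buffered_bytes - before;

    // Flush site: subtract exactly what the flush drained.
    let before = dir.buffered_bytes;
    dir.flush_contiguous(); // drains [0,3) then [3,7)
    total_memory -= before - dir.buffered_bytes;

    assert_eq!(total_memory, 0);
    assert_eq!(dir.recomputed(), dir.buffered_bytes);
}
```

Because every mutation flows through the same two delta sites, `total_memory` can never drift from the sum of per-direction counters.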
+ +**Call site mapping:** + +| Line | Context | Replacement | +|------|---------|-------------| +| 178 | After RST removal | `self.total_memory -= flow.memory_used()` before remove | +| 317 | After payload processing | Delta after insert + delta after flush | +| 355 | After `expire_flows` | Subtract removed flow memory | +| 376 | After `finalize` | Subtract removed flow memory | +| 434 | Inside `evict_flows` loop | Subtract removed flow memory | + +## Files Modified + +| File | Change | +|------|--------| +| `src/reassembly/flow.rs` | Add `buffered_bytes: usize` field, update `memory_used()` | +| `src/reassembly/segment.rs` | Use `range(..new_end)`, update `buffered_bytes` on insert/flush, replace O(n) sum | +| `src/reassembly/mod.rs` | Remove `update_memory()`, add inline delta tracking at all mutation sites | + +## Testing + +**Unit tests (new):** +1. `buffered_bytes` correct after single insert +2. `buffered_bytes` correct after insert + flush sequence +3. `buffered_bytes` correct after overlapping inserts (gap portions only) +4. `buffered_bytes` correct after replacement insert (defensive case) +5. `total_memory` delta correct after insert +6. `total_memory` correct after flow removal +7. Range-based overlap detection finds all overlapping segments +8. Range-based overlap detection skips non-overlapping segments + +**Existing tests:** All must pass unchanged. The `debug_assert_eq!` in `memory_used()` acts as a cross-check in all test runs. + +**No integration test changes** — behavior is identical, only performance characteristics change. 
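The counter-vs-recompute invariant that the `debug_assert_eq!` enforces can also be stress-tested as a standalone property. This sketch is illustrative — the `counter_never_drifts` helper is hypothetical, and the LCG stands in for a proper RNG so no crates are needed — but it runs randomized insert/replace/flush interleavings against the O(n) ground truth:

```rust
use std::collections::BTreeMap;

// Deterministic pseudo-random stream so the check needs no external crates.
fn lcg(state: &mut u64) -> u64 {
    *state = state
        .wrapping_mul(6364136223846793005)
        .wrapping_add(1442695040888963407);
    *state >> 33
}

// Property: after any interleaving of inserts (including key replacement)
// and contiguous flushes, the incremental counter equals the O(n) recompute.
fn counter_never_drifts(iters: usize) -> bool {
    let mut segments: BTreeMap<u64, Vec<u8>> = BTreeMap::new();
    let mut buffered_bytes = 0usize;
    let mut base_offset = 0u64;
    let mut rng = 42u64;

    for _ in 0..iters {
        let offset = base_offset + lcg(&mut rng) % 64;
        let len = 1 + (lcg(&mut rng) % 16) as usize;
        if let Some(old) = segments.insert(offset, vec![0u8; len]) {
            buffered_bytes -= old.len();
        }
        buffered_bytes += len;

        // Occasionally flush contiguous data from the front.
        if lcg(&mut rng) % 4 == 0 {
            while let Some(data) = segments.remove(&base_offset) {
                buffered_bytes -= data.len();
                base_offset += data.len() as u64;
            }
        }

        let recomputed: usize = segments.values().map(|v| v.len()).sum();
        if buffered_bytes != recomputed {
            return false;
        }
    }
    true
}

fn main() {
    assert!(counter_never_drifts(1000));
}
```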
+ +## Not In Scope + +- **Per-flow cached memory** — `FlowDirection.buffered_bytes` makes `TcpFlow.memory_used()` O(1) already +- **Lower bound on range query** — diminishing returns, inner check handles it +- **Benchmarks** — can be added in a follow-up issue +- **Structural refactoring** — keeping changes minimal and targeted From 4a8cdf998c682509e5fea5d780788d85c79fe0c7 Mon Sep 17 00:00:00 2001 From: Zious Date: Mon, 6 Apr 2026 20:30:26 -0500 Subject: [PATCH 2/8] docs: add reassembly perf optimization implementation plan 4-task plan for issue #11: buffered_bytes tracking, range-based overlap detection, and incremental total_memory tracking. --- .../plans/2026-04-06-reassembly-perf.md | 783 ++++++++++++++++++ 1 file changed, 783 insertions(+) create mode 100644 docs/superpowers/plans/2026-04-06-reassembly-perf.md diff --git a/docs/superpowers/plans/2026-04-06-reassembly-perf.md b/docs/superpowers/plans/2026-04-06-reassembly-perf.md new file mode 100644 index 0000000..f7a1a72 --- /dev/null +++ b/docs/superpowers/plans/2026-04-06-reassembly-perf.md @@ -0,0 +1,783 @@ +# TCP Reassembly Performance Optimization Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Replace three O(n) hotspots in the TCP reassembly engine with O(1)/O(log n) operations using incremental counters and BTreeMap range queries. + +**Architecture:** Add `buffered_bytes: usize` field to `FlowDirection` for incremental byte tracking. Replace `segments.iter()` with `segments.range(..new_end)` for overlap detection. Remove `update_memory()` and track `total_memory` via deltas at each mutation site. + +**Tech Stack:** Rust std `BTreeMap::range()`, no new dependencies. 
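Two pieces of std `BTreeMap` behavior the plan leans on can be checked directly: `insert()` hands back the displaced value on key collision (which the defensive `if let Some(old)` branches rely on), and `range(..end)` is a half-open bound that visits only keys strictly below `end`:

```rust
use std::collections::BTreeMap;

fn main() {
    let mut m: BTreeMap<u64, Vec<u8>> = BTreeMap::new();

    // insert() returns the old value when the key already existed.
    assert_eq!(m.insert(5, vec![1, 2, 3]), None); // fresh key
    assert_eq!(m.insert(5, vec![9]), Some(vec![1, 2, 3])); // replacement

    // range(..end) excludes `end` itself — exactly the
    // `existing_offset < new_end` condition the overlap scan needs.
    assert_eq!(m.range(..8).count(), 1);
    assert_eq!(m.range(..5).count(), 0); // key 5 itself excluded
}
```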
+
+---
+
+## File Structure
+
+| File | Responsibility | Changes |
+|------|---------------|---------|
+| `src/reassembly/flow.rs` | Flow/direction data structures | Add `buffered_bytes` field, update `memory_used()` |
+| `src/reassembly/segment.rs` | Segment insert/flush operations | Track `buffered_bytes` on insert/flush, use `range()`, replace O(n) sum |
+| `src/reassembly/mod.rs` | Reassembly engine orchestration | Remove `update_memory()`, add inline delta tracking |
+| `tests/reassembly_segment_tests.rs` | Segment operation tests | Add `buffered_bytes` assertions to existing + new tests |
+| `tests/reassembly_flow_tests.rs` | Flow structure tests | Add `buffered_bytes` initialization assertion |
+| `tests/reassembly_engine_tests.rs` | Engine integration tests | Add `total_memory` tracking test |
+
+---
+
+### Task 1: Add `buffered_bytes` field and track on segment insert
+
+**Files:**
+- Modify: `src/reassembly/flow.rs:56-91`
+- Modify: `src/reassembly/segment.rs:145-178`
+- Modify: `tests/reassembly_segment_tests.rs`
+- Modify: `tests/reassembly_flow_tests.rs`
+
+- [ ] **Step 1: Add `buffered_bytes` field to `FlowDirection`**
+
+In `src/reassembly/flow.rs`, add the field to the struct (after `segments` on line 59):
+
+```rust
+pub struct FlowDirection {
+    pub isn: Option<u32>,
+    pub base_offset: u64,
+    pub segments: BTreeMap<u64, Vec<u8>>,
+    pub buffered_bytes: usize,
+    pub reassembled_bytes: usize,
+    pub overlap_count: u32,
+    pub overlap_alert_fired: bool,
+    pub small_segment_count: u32,
+    pub small_segment_alert_fired: bool,
+    pub fin_seen: bool,
+    pub rst_seen: bool,
+    pub depth_exceeded: bool,
+}
+```
+
+Initialize to 0 in `FlowDirection::new()`:
+
+```rust
+pub fn new() -> Self {
+    FlowDirection {
+        isn: None,
+        base_offset: 0,
+        segments: BTreeMap::new(),
+        buffered_bytes: 0,
+        reassembled_bytes: 0,
+        overlap_count: 0,
+        overlap_alert_fired: false,
+        small_segment_count: 0,
+        small_segment_alert_fired: false,
+        fin_seen: false,
+        rst_seen: false,
+        depth_exceeded:
false, + } +} +``` + +- [ ] **Step 2: Add `buffered_bytes` assertion to existing flow init test** + +In `tests/reassembly_flow_tests.rs`, add this assertion to `test_flow_direction_new`: + +```rust +#[test] +fn test_flow_direction_new() { + let dir = FlowDirection::new(); + assert_eq!(dir.isn, None); + assert_eq!(dir.base_offset, 0); + assert!(dir.segments.is_empty()); + assert_eq!(dir.buffered_bytes, 0); + assert_eq!(dir.reassembled_bytes, 0); + assert!(!dir.fin_seen); + assert!(!dir.rst_seen); + assert!(!dir.depth_exceeded); +} +``` + +- [ ] **Step 3: Write failing test for `buffered_bytes` after insert** + +In `tests/reassembly_segment_tests.rs`, add: + +```rust +#[test] +fn test_buffered_bytes_after_insert() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 5); + + insert_segment(&mut dir, 1006, b"world", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 10); +} +``` + +- [ ] **Step 4: Run test to verify it fails** + +Run: `cargo test test_buffered_bytes_after_insert -- --nocapture` +Expected: FAIL — `buffered_bytes` is 0 (not updated yet). 
+ +- [ ] **Step 5: Write failing test for `buffered_bytes` after overlapping insert** + +In `tests/reassembly_segment_tests.rs`, add: + +```rust +#[test] +fn test_buffered_bytes_after_overlap() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert "AAABBB" at offset 1 (6 bytes) + insert_segment(&mut dir, 1001, b"AAABBB", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 6); + + // Overlapping insert: "XXXCC" at offset 4 — only "CC" (2 bytes) is new + insert_segment(&mut dir, 1004, b"XXXCC", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 8); // 6 original + 2 gap bytes +} +``` + +- [ ] **Step 6: Update `insert_segment` to track `buffered_bytes`** + +In `src/reassembly/segment.rs`, modify the normal insert at line 170-171: + +```rust + // No overlap — insert normally + let data_len = segment_data.len(); + if let Some(old) = dir.segments.insert(offset, segment_data) { + dir.buffered_bytes -= old.len(); + } + dir.buffered_bytes += data_len; +``` + +And the gap insert at line 153-157, replace: + +```rust + let gap_data = segment_data[start_idx..end_idx].to_vec(); + if !gap_data.is_empty() { + dir.segments.insert(gap_start, gap_data); + } +``` + +with: + +```rust + let gap_data = segment_data[start_idx..end_idx].to_vec(); + if !gap_data.is_empty() { + let gap_len = gap_data.len(); + if let Some(old) = dir.segments.insert(gap_start, gap_data) { + dir.buffered_bytes -= old.len(); + } + dir.buffered_bytes += gap_len; + } +``` + +- [ ] **Step 7: Run tests to verify they pass** + +Run: `cargo test test_buffered_bytes -- --nocapture` +Expected: PASS for both `test_buffered_bytes_after_insert` and `test_buffered_bytes_after_overlap`. + +- [ ] **Step 8: Run all existing segment tests** + +Run: `cargo test reassembly_segment -- --nocapture` +Expected: All existing tests PASS (behavior unchanged). 
+ +- [ ] **Step 9: Commit** + +```bash +git add src/reassembly/flow.rs src/reassembly/segment.rs tests/reassembly_segment_tests.rs tests/reassembly_flow_tests.rs +git commit -m "perf: add buffered_bytes field and track on segment insert" +``` + +--- + +### Task 2: Track `buffered_bytes` on flush and update `memory_used()` + +**Files:** +- Modify: `src/reassembly/segment.rs:180-193` +- Modify: `src/reassembly/flow.rs:107-109` +- Modify: `tests/reassembly_segment_tests.rs` + +- [ ] **Step 1: Write failing test for `buffered_bytes` after insert + flush** + +In `tests/reassembly_segment_tests.rs`, add: + +```rust +#[test] +fn test_buffered_bytes_after_flush() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 5); + + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed.len(), 1); + assert_eq!(dir.buffered_bytes, 0); +} + +#[test] +fn test_buffered_bytes_partial_flush() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert segment at offset 1 (contiguous) and offset 10 (gap) + insert_segment(&mut dir, 1001, b"aaa", 10_485_760, 10_000); + insert_segment(&mut dir, 1010, b"bbb", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 6); + + // Flush only flushes contiguous segment at offset 1 + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed.len(), 1); + assert_eq!(flushed[0].1, b"aaa"); + assert_eq!(dir.buffered_bytes, 3); // "bbb" remains buffered +} +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cargo test test_buffered_bytes_after_flush test_buffered_bytes_partial_flush -- --nocapture` +Expected: FAIL — `flush_contiguous` doesn't update `buffered_bytes` yet. 
+
+- [ ] **Step 3: Update `flush_contiguous` to decrement `buffered_bytes`**
+
+In `src/reassembly/segment.rs`, modify `flush_contiguous`:
+
+```rust
+pub fn flush_contiguous(dir: &mut FlowDirection) -> Vec<(u64, Vec<u8>)> {
+    let mut flushed = Vec::new();
+
+    while let Some(data) = dir.segments.remove(&dir.base_offset) {
+        let offset = dir.base_offset;
+        dir.buffered_bytes -= data.len();
+        dir.base_offset += data.len() as u64;
+        dir.reassembled_bytes += data.len();
+        flushed.push((offset, data));
+    }
+
+    flushed
+}
+```
+
+- [ ] **Step 4: Run flush tests to verify they pass**
+
+Run: `cargo test test_buffered_bytes_after_flush test_buffered_bytes_partial_flush -- --nocapture`
+Expected: PASS.
+
+- [ ] **Step 5: Update `memory_used()` with debug assertion**
+
+In `src/reassembly/flow.rs`, replace `memory_used()`:
+
+```rust
+    pub fn memory_used(&self) -> usize {
+        debug_assert_eq!(
+            self.buffered_bytes,
+            self.segments.values().map(|v| v.len()).sum::<usize>(),
+            "buffered_bytes counter drifted from actual segment sizes"
+        );
+        self.buffered_bytes
+    }
+```
+
+- [ ] **Step 6: Run all tests to verify the debug assertion doesn't fire**
+
+Run: `cargo test -- --nocapture`
+Expected: All tests PASS. The `debug_assert_eq!` verifies that `buffered_bytes` matches the recomputed sum on every `memory_used()` call during tests.
+ +- [ ] **Step 7: Commit** + +```bash +git add src/reassembly/segment.rs src/reassembly/flow.rs tests/reassembly_segment_tests.rs +git commit -m "perf: track buffered_bytes on flush and add debug assertion in memory_used()" +``` + +--- + +### Task 3: Replace O(n) buffered sum with field access and use `range()` for overlap detection + +**Files:** +- Modify: `src/reassembly/segment.rs:62,85` +- Modify: `tests/reassembly_segment_tests.rs` + +- [ ] **Step 1: Write test for range-based overlap boundary** + +In `tests/reassembly_segment_tests.rs`, add: + +```rust +#[test] +fn test_overlap_detection_boundary() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert segment at offset 1, length 5 (covers 1-5) + insert_segment(&mut dir, 1001, b"AAAAA", 10_485_760, 10_000); + // Insert segment at offset 10, length 5 (covers 10-14) — no overlap with above + insert_segment(&mut dir, 1010, b"BBBBB", 10_485_760, 10_000); + assert_eq!(dir.segments.len(), 2); + assert_eq!(dir.overlap_count, 0); + + // Insert segment at offset 3, length 4 (covers 3-6) — overlaps first, not second + let result = insert_segment(&mut dir, 1003, b"XXXX", 10_485_760, 10_000); + assert_eq!(result, InsertResult::PartialOverlap); + assert_eq!(dir.overlap_count, 1); + + // Insert segment at offset 6, length 4 (covers 6-9) — no overlap with either + let result = insert_segment(&mut dir, 1006, b"CCCC", 10_485_760, 10_000); + assert_eq!(result, InsertResult::Inserted); + assert_eq!(dir.overlap_count, 1); // unchanged +} +``` + +- [ ] **Step 2: Run test to verify it passes (establishes baseline)** + +Run: `cargo test test_overlap_detection_boundary -- --nocapture` +Expected: PASS (current O(n) logic produces correct results). 
+ +- [ ] **Step 3: Replace O(n) buffered sum with field access** + +In `src/reassembly/segment.rs`, line 62, replace: + +```rust + let buffered: usize = dir.segments.values().map(|v| v.len()).sum(); +``` + +with: + +```rust + let buffered = dir.buffered_bytes; +``` + +- [ ] **Step 4: Replace `iter()` with `range(..new_end)` for overlap detection** + +In `src/reassembly/segment.rs`, line 85, replace: + +```rust + for (&existing_offset, existing_data) in dir.segments.iter() { +``` + +with: + +```rust + for (&existing_offset, existing_data) in dir.segments.range(..new_end) { +``` + +- [ ] **Step 5: Run all segment tests** + +Run: `cargo test reassembly_segment -- --nocapture` +Expected: All PASS — including `test_overlap_detection_boundary`, `test_overlap_first_wins`, `test_overlap_conflicting_data_detected`, `test_retransmission_dedup`. + +- [ ] **Step 6: Run full test suite** + +Run: `cargo test -- --nocapture` +Expected: All PASS. + +- [ ] **Step 7: Commit** + +```bash +git add src/reassembly/segment.rs tests/reassembly_segment_tests.rs +git commit -m "perf: use BTreeMap::range() for overlap detection and replace O(n) buffered sum" +``` + +--- + +### Task 4: Remove `update_memory()` and add incremental `total_memory` tracking + +**Files:** +- Modify: `src/reassembly/mod.rs` +- Modify: `tests/reassembly_engine_tests.rs` + +This is the largest task. It replaces 5 `update_memory()` call sites with inline delta tracking. The key patterns are: + +**For insert+flush (no removal):** Track deltas before/after each operation. +**For flush+remove paths (RST, FIN, expire, finalize, evict):** Capture flow's `memory_used()` before flushing, subtract that amount on removal. This is simpler than tracking individual flush deltas because the entire flow is about to be destroyed. 
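The flush+remove accounting can be sketched minimally. The `Engine` type below is a stand-in, not the real reassembler — it collapses each flow to a single memory figure to isolate the capture-before-remove pattern:

```rust
use std::collections::HashMap;

// Toy engine: one memory figure per flow plus the engine-wide total.
struct Engine {
    flows: HashMap<u32, usize>, // flow key -> that flow's memory_used()
    total_memory: usize,
}

impl Engine {
    // Removal pattern: capture the flow's memory *before* tearing it down,
    // then subtract that single amount on removal. This covers both the
    // bytes a final flush drains and any non-contiguous leftovers, with no
    // per-flush bookkeeping.
    fn remove_flow(&mut self, key: u32) {
        let flow_mem = self.flows.get(&key).copied().unwrap_or(0);
        // ... flush + handler callbacks would run here ...
        self.flows.remove(&key);
        self.total_memory -= flow_mem;
    }
}

fn main() {
    let mut engine = Engine {
        flows: HashMap::from([(1, 10), (2, 7)]),
        total_memory: 17,
    };
    engine.remove_flow(1);
    assert_eq!(engine.total_memory, 7);
    engine.remove_flow(99); // absent key: subtracts 0, no underflow
    assert_eq!(engine.total_memory, 7);
}
```

The `unwrap_or(0)` mirrors the plan's `map(|f| f.memory_used()).unwrap_or(0)` guard, so a missing key can never underflow the total.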
+ +- [ ] **Step 1: Write test verifying `total_memory` tracking** + +In `tests/reassembly_engine_tests.rs`, add: + +```rust +#[test] +fn test_total_memory_tracking() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + // SYN — no payload, no memory change + let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false); + reassembler.process_packet(&syn, 1, &mut handler); + + // Out-of-order segment — buffered (not flushed) + let p2 = make_tcp_packet(client, 12345, server, 80, 1004, b"bbb", false, false, false); + reassembler.process_packet(&p2, 2, &mut handler); + // "bbb" is buffered (3 bytes) because offset 1 is missing + assert!(handler.data_events.is_empty()); // nothing flushed yet + + // In-order segment — triggers flush of both + let p1 = make_tcp_packet(client, 12345, server, 80, 1001, b"aaa", false, false, false); + reassembler.process_packet(&p1, 3, &mut handler); + // Both segments flushed — total_memory should be 0 + assert_eq!(handler.all_data(), b"aaabbb"); + + // Finalize — closes flow, should leave total_memory at 0 + reassembler.finalize(&mut handler); +} +``` + +Note: We cannot directly assert `total_memory` because it is a private field. The test verifies correct behavior indirectly — the `debug_assert_eq!` inside `memory_used()` (called during finalize's flow removal) will panic if counters have drifted. If this test passes, incremental tracking is correct. + +- [ ] **Step 2: Run test to verify it passes with current `update_memory()`** + +Run: `cargo test test_total_memory_tracking -- --nocapture` +Expected: PASS (baseline with current O(n) recomputation). + +- [ ] **Step 3: Update payload processing path (process_packet lines 211-291)** + +In `src/reassembly/mod.rs`, replace the insert + flush section. 
Find the block starting around line 211: + +```rust + let flow_dir = flow.get_direction_mut(dir); + let result = insert_segment( + flow_dir, + seq, + payload, + self.config.max_depth, + self.config.max_segments_per_direction, + ); +``` + +Replace with: + +```rust + let flow_dir = flow.get_direction_mut(dir); + let before_insert = flow_dir.buffered_bytes; + let result = insert_segment( + flow_dir, + seq, + payload, + self.config.max_depth, + self.config.max_segments_per_direction, + ); + self.total_memory += flow_dir.buffered_bytes - before_insert; +``` + +Then find the flush section around line 283: + +```rust + let flow = self.flows.get_mut(&key).unwrap(); + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(&key, dir, data, *offset); + } +``` + +Replace with: + +```rust + let flow = self.flows.get_mut(&key).unwrap(); + let flow_dir = flow.get_direction_mut(dir); + let before_flush = flow_dir.buffered_bytes; + let flushed = flush_contiguous(flow_dir); + self.total_memory -= before_flush - flow_dir.buffered_bytes; + + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(&key, dir, data, *offset); + } +``` + +Remove the `self.update_memory();` call at line 317 (after the payload section). 
+ +- [ ] **Step 4: Update RST handler (process_packet lines 159-179)** + +Find the RST handler block: + +```rust + if rst { + flow.on_rst(); + self.stats.flows_rst += 1; + let key_clone = key.clone(); + // Flush buffered contiguous data before removing + if let Some(flow) = self.flows.get_mut(&key_clone) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(&key_clone, dir, data, *offset); + } + } + } + handler.on_flow_close(&key_clone, CloseReason::Rst); + self.flows.remove(&key_clone); + self.update_memory(); + return; + } +``` + +Replace with: + +```rust + if rst { + flow.on_rst(); + self.stats.flows_rst += 1; + let key_clone = key.clone(); + let flow_mem = self.flows.get(&key_clone).map(|f| f.memory_used()).unwrap_or(0); + // Flush buffered contiguous data before removing + if let Some(flow) = self.flows.get_mut(&key_clone) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(&key_clone, dir, data, *offset); + } + } + } + handler.on_flow_close(&key_clone, CloseReason::Rst); + self.flows.remove(&key_clone); + self.total_memory -= flow_mem; + return; + } +``` + +Pattern: capture `memory_used()` before flushing, subtract on removal. This correctly accounts for both flushed bytes and any remaining non-contiguous bytes. 
+ +- [ ] **Step 5: Update FIN-closed flow removal (process_packet lines 294-314)** + +Find the FIN closure block: + +```rust + if self + .flows + .get(&key) + .is_some_and(|f| f.state == FlowState::Closed) + { + // Flush remaining data in both directions before removal + if let Some(flow) = self.flows.get_mut(&key) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(&key, dir, data, *offset); + } + } + } + self.stats.flows_fin += 1; + handler.on_flow_close(&key, CloseReason::Fin); + self.flows.remove(&key); + } +``` + +Replace with: + +```rust + if self + .flows + .get(&key) + .is_some_and(|f| f.state == FlowState::Closed) + { + let flow_mem = self.flows.get(&key).map(|f| f.memory_used()).unwrap_or(0); + // Flush remaining data in both directions before removal + if let Some(flow) = self.flows.get_mut(&key) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + self.stats.bytes_reassembled += data.len() as u64; + handler.on_data(&key, dir, data, *offset); + } + } + } + self.stats.flows_fin += 1; + handler.on_flow_close(&key, CloseReason::Fin); + self.flows.remove(&key); + self.total_memory -= flow_mem; + } +``` + +- [ ] **Step 6: Update `expire_flows` (lines 326-356)** + +Find `expire_flows`: + +```rust + for key in expired_keys { + // Flush salvageable data before removing + if let Some(flow) = self.flows.get_mut(&key) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); 
+ for (offset, data) in &flushed { + handler.on_data(&key, dir, data, *offset); + } + } + } + self.flows.remove(&key); + self.stats.flows_expired += 1; + handler.on_flow_close(&key, CloseReason::Timeout); + } + + self.update_memory(); +``` + +Replace with: + +```rust + for key in expired_keys { + let flow_mem = self.flows.get(&key).map(|f| f.memory_used()).unwrap_or(0); + // Flush salvageable data before removing + if let Some(flow) = self.flows.get_mut(&key) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + handler.on_data(&key, dir, data, *offset); + } + } + } + self.flows.remove(&key); + self.total_memory -= flow_mem; + self.stats.flows_expired += 1; + handler.on_flow_close(&key, CloseReason::Timeout); + } +``` + +- [ ] **Step 7: Update `finalize` (lines 358-377)** + +Find `finalize`: + +```rust + let all_keys: Vec = self.flows.keys().cloned().collect(); + for key in all_keys { + // Flush any remaining contiguous data before closing + if let Some(flow) = self.flows.get_mut(&key) { + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + handler.on_data(&key, dir, data, *offset); + } + } + } + self.flows.remove(&key); + handler.on_flow_close(&key, CloseReason::Timeout); + } + self.update_memory(); +``` + +Replace with: + +```rust + let all_keys: Vec = self.flows.keys().cloned().collect(); + for key in all_keys { + let flow_mem = self.flows.get(&key).map(|f| f.memory_used()).unwrap_or(0); + // Flush any remaining contiguous data before closing + if let Some(flow) = self.flows.get_mut(&key) { + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = 
flush_contiguous(flow_dir); + for (offset, data) in &flushed { + handler.on_data(&key, dir, data, *offset); + } + } + } + self.flows.remove(&key); + self.total_memory -= flow_mem; + handler.on_flow_close(&key, CloseReason::Timeout); + } +``` + +- [ ] **Step 8: Update `evict_flows` (lines 398-436)** + +Find the eviction loop: + +```rust + for (key, _, _) in &candidates { + if self.total_memory <= self.config.memcap && self.flows.len() <= self.config.max_flows + { + break; + } + // Flush salvageable contiguous data before evicting + if let Some(flow) = self.flows.get_mut(key) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + handler.on_data(key, dir, data, *offset); + } + } + } + self.flows.remove(key); + self.stats.evictions += 1; + handler.on_flow_close(key, CloseReason::MemoryPressure); + self.update_memory(); + } +``` + +Replace with: + +```rust + for (key, _, _) in &candidates { + if self.total_memory <= self.config.memcap && self.flows.len() <= self.config.max_flows + { + break; + } + let flow_mem = self.flows.get(key).map(|f| f.memory_used()).unwrap_or(0); + // Flush salvageable contiguous data before evicting + if let Some(flow) = self.flows.get_mut(key) { + use crate::reassembly::handler::Direction; + for dir in [Direction::ClientToServer, Direction::ServerToClient] { + let flow_dir = flow.get_direction_mut(dir); + let flushed = flush_contiguous(flow_dir); + for (offset, data) in &flushed { + handler.on_data(key, dir, data, *offset); + } + } + } + self.flows.remove(key); + self.total_memory -= flow_mem; + self.stats.evictions += 1; + handler.on_flow_close(key, CloseReason::MemoryPressure); + } +``` + +- [ ] **Step 9: Remove `update_memory()` function** + +In `src/reassembly/mod.rs`, delete the `update_memory` function entirely: + +```rust + // DELETE THIS FUNCTION: + 
fn update_memory(&mut self) { + self.total_memory = self.flows.values().map(|f| f.memory_used()).sum(); + } +``` + +- [ ] **Step 10: Run full test suite** + +Run: `cargo test -- --nocapture` +Expected: All tests PASS — including existing engine tests (`test_three_packet_stream_ordered`, `test_out_of_order_delivery`, `test_rst_closes_flow`, `test_finalize_flushes_remaining`, `test_flow_timeout_expiration`) and the new `test_total_memory_tracking`. + +The `debug_assert_eq!` in `memory_used()` fires on every flow removal during tests, verifying that incremental `buffered_bytes` matches the recomputed sum. + +- [ ] **Step 11: Run clippy and fmt** + +Run: `cargo clippy -- -D warnings && cargo fmt --check` +Expected: No warnings, no formatting issues. + +- [ ] **Step 12: Commit** + +```bash +git add src/reassembly/mod.rs tests/reassembly_engine_tests.rs +git commit -m "perf: remove update_memory() and track total_memory incrementally" +``` From 92215b9d6a3051841694131ef0284935d9f7421a Mon Sep 17 00:00:00 2001 From: Zious Date: Mon, 6 Apr 2026 20:34:02 -0500 Subject: [PATCH 3/8] perf: add buffered_bytes field and track on segment insert --- src/reassembly/flow.rs | 2 ++ src/reassembly/segment.rs | 12 ++++++++++-- tests/reassembly_flow_tests.rs | 1 + tests/reassembly_segment_tests.rs | 20 ++++++++++++++++++++ 4 files changed, 33 insertions(+), 2 deletions(-) diff --git a/src/reassembly/flow.rs b/src/reassembly/flow.rs index 8bc3512..d1779d2 100644 --- a/src/reassembly/flow.rs +++ b/src/reassembly/flow.rs @@ -57,6 +57,7 @@ pub struct FlowDirection { pub isn: Option, pub base_offset: u64, pub segments: BTreeMap>, + pub buffered_bytes: usize, pub reassembled_bytes: usize, pub overlap_count: u32, pub overlap_alert_fired: bool, @@ -79,6 +80,7 @@ impl FlowDirection { isn: None, base_offset: 0, segments: BTreeMap::new(), + buffered_bytes: 0, reassembled_bytes: 0, overlap_count: 0, overlap_alert_fired: false, diff --git a/src/reassembly/segment.rs b/src/reassembly/segment.rs index 
409edcf..f5ab05a 100644 --- a/src/reassembly/segment.rs +++ b/src/reassembly/segment.rs @@ -152,7 +152,11 @@ pub fn insert_segment( if start_idx < segment_data.len() && end_idx <= segment_data.len() { let gap_data = segment_data[start_idx..end_idx].to_vec(); if !gap_data.is_empty() { - dir.segments.insert(gap_start, gap_data); + let gap_len = gap_data.len(); + if let Some(old) = dir.segments.insert(gap_start, gap_data) { + dir.buffered_bytes -= old.len(); + } + dir.buffered_bytes += gap_len; } } } @@ -168,7 +172,11 @@ pub fn insert_segment( } // No overlap — insert normally - dir.segments.insert(offset, segment_data); + let data_len = segment_data.len(); + if let Some(old) = dir.segments.insert(offset, segment_data) { + dir.buffered_bytes -= old.len(); + } + dir.buffered_bytes += data_len; if truncated { InsertResult::Truncated diff --git a/tests/reassembly_flow_tests.rs b/tests/reassembly_flow_tests.rs index 34530f8..19786b7 100644 --- a/tests/reassembly_flow_tests.rs +++ b/tests/reassembly_flow_tests.rs @@ -95,6 +95,7 @@ fn test_flow_direction_new() { assert_eq!(dir.base_offset, 0); assert!(dir.segments.is_empty()); assert_eq!(dir.reassembled_bytes, 0); + assert_eq!(dir.buffered_bytes, 0); assert!(!dir.fin_seen); assert!(!dir.rst_seen); assert!(!dir.depth_exceeded); diff --git a/tests/reassembly_segment_tests.rs b/tests/reassembly_segment_tests.rs index 1ab0d12..82e8f33 100644 --- a/tests/reassembly_segment_tests.rs +++ b/tests/reassembly_segment_tests.rs @@ -148,6 +148,26 @@ fn test_small_segment_tracking() { assert_eq!(dir.small_segment_count, 5); } +#[test] +fn test_buffered_bytes_after_insert() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 5); + insert_segment(&mut dir, 1006, b"world", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 10); +} + +#[test] +fn test_buffered_bytes_after_overlap() { + let mut dir = FlowDirection::new(); + 
dir.set_isn(1000);
+    insert_segment(&mut dir, 1001, b"AAABBB", 10_485_760, 10_000);
+    assert_eq!(dir.buffered_bytes, 6);
+    insert_segment(&mut dir, 1004, b"XXXCC", 10_485_760, 10_000);
+    assert_eq!(dir.buffered_bytes, 8); // 6 original + 2 gap bytes
+}
+
 #[test]
 fn test_depth_limit_truncation() {
     let mut dir = FlowDirection::new();

From 2c653fe23deb4bbcbe2da943d80abede3c9836e7 Mon Sep 17 00:00:00 2001
From: Zious
Date: Mon, 6 Apr 2026 20:39:13 -0500
Subject: [PATCH 4/8] perf: track buffered_bytes on flush and add debug assertion in memory_used()

---
 src/reassembly/flow.rs            |  7 ++++++-
 src/reassembly/segment.rs         |  1 +
 tests/reassembly_segment_tests.rs | 30 ++++++++++++++++++++++++++++++
 3 files changed, 37 insertions(+), 1 deletion(-)

diff --git a/src/reassembly/flow.rs b/src/reassembly/flow.rs
index d1779d2..c7da2dd 100644
--- a/src/reassembly/flow.rs
+++ b/src/reassembly/flow.rs
@@ -107,7 +107,12 @@ impl FlowDirection {
     }
 
     pub fn memory_used(&self) -> usize {
-        self.segments.values().map(|v| v.len()).sum()
+        debug_assert_eq!(
+            self.buffered_bytes,
+            self.segments.values().map(|v| v.len()).sum::<usize>(),
+            "buffered_bytes counter drifted from actual segment sizes"
+        );
+        self.buffered_bytes
     }
 }
diff --git a/src/reassembly/segment.rs b/src/reassembly/segment.rs
index f5ab05a..080b437 100644
--- a/src/reassembly/segment.rs
+++ b/src/reassembly/segment.rs
@@ -192,6 +192,7 @@ pub fn flush_contiguous(dir: &mut FlowDirection) -> Vec<(u64, Vec<u8>)> {
     while let Some(data) = dir.segments.remove(&dir.base_offset) {
         let offset = dir.base_offset;
+        dir.buffered_bytes -= data.len();
         dir.base_offset += data.len() as u64;
         dir.reassembled_bytes += data.len();
         flushed.push((offset, data));
diff --git a/tests/reassembly_segment_tests.rs b/tests/reassembly_segment_tests.rs
index 82e8f33..c9c3e5e 100644
--- a/tests/reassembly_segment_tests.rs
+++ b/tests/reassembly_segment_tests.rs
@@ -168,6 +168,36 @@ fn test_buffered_bytes_after_overlap() {
     assert_eq!(dir.buffered_bytes, 8); // 6 original + 2
gap bytes } +#[test] +fn test_buffered_bytes_after_flush() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 5); + + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed.len(), 1); + assert_eq!(dir.buffered_bytes, 0); +} + +#[test] +fn test_buffered_bytes_partial_flush() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert segment at offset 1 (contiguous) and offset 10 (gap) + insert_segment(&mut dir, 1001, b"aaa", 10_485_760, 10_000); + insert_segment(&mut dir, 1010, b"bbb", 10_485_760, 10_000); + assert_eq!(dir.buffered_bytes, 6); + + // Flush only flushes contiguous segment at offset 1 + let flushed = flush_contiguous(&mut dir); + assert_eq!(flushed.len(), 1); + assert_eq!(flushed[0].1, b"aaa"); + assert_eq!(dir.buffered_bytes, 3); // "bbb" remains buffered +} + #[test] fn test_depth_limit_truncation() { let mut dir = FlowDirection::new(); From 5480d454f2b98b17dcaa55a62fe80a8f7fe93c28 Mon Sep 17 00:00:00 2001 From: Zious Date: Mon, 6 Apr 2026 20:43:12 -0500 Subject: [PATCH 5/8] perf: use BTreeMap::range() for overlap detection and replace O(n) buffered sum --- src/reassembly/segment.rs | 5 ++-- tests/reassembly_segment_tests.rs | 40 +++++++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 2 deletions(-) diff --git a/src/reassembly/segment.rs b/src/reassembly/segment.rs index 080b437..76b842d 100644 --- a/src/reassembly/segment.rs +++ b/src/reassembly/segment.rs @@ -59,7 +59,7 @@ pub fn insert_segment( let mut segment_data = data.to_vec(); // Truncate if exceeding depth - let buffered: usize = dir.segments.values().map(|v| v.len()).sum(); + let buffered = dir.buffered_bytes; let total_after = dir.reassembled_bytes + buffered + segment_data.len(); let truncated = if total_after > max_depth { let allowed = max_depth.saturating_sub(dir.reassembled_bytes + buffered); @@ -82,7 +82,8 @@ pub fn insert_segment( let mut 
has_conflict = false; let mut trimmed_ranges: Vec<(u64, u64)> = Vec::new(); - for (&existing_offset, existing_data) in dir.segments.iter() { + // Only segments starting before new_end can overlap [new_start, new_end). + for (&existing_offset, existing_data) in dir.segments.range(..new_end) { let existing_end = existing_offset + existing_data.len() as u64; if new_start < existing_end && new_end > existing_offset { diff --git a/tests/reassembly_segment_tests.rs b/tests/reassembly_segment_tests.rs index c9c3e5e..dd651ef 100644 --- a/tests/reassembly_segment_tests.rs +++ b/tests/reassembly_segment_tests.rs @@ -220,3 +220,43 @@ fn test_depth_limit_truncation() { assert_eq!(flushed[0].1.len(), 20); // truncated from 50 to 20 assert_eq!(dir.reassembled_bytes, 100); } + +#[test] +fn test_overlap_detection_boundary() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert segment at offset 1, length 5 (covers 1-5) + insert_segment(&mut dir, 1001, b"AAAAA", 10_485_760, 10_000); + // Insert segment at offset 10, length 5 (covers 10-14) — no overlap with above + insert_segment(&mut dir, 1010, b"BBBBB", 10_485_760, 10_000); + assert_eq!(dir.segments.len(), 2); + assert_eq!(dir.overlap_count, 0); + + // Insert segment at offset 3, length 4 (covers 3-6) — overlaps first, not second + let result = insert_segment(&mut dir, 1003, b"XXXX", 10_485_760, 10_000); + assert_eq!(result, InsertResult::PartialOverlap); + assert_eq!(dir.overlap_count, 1); + + // Insert segment at offset 6, length 4 (covers 6-9). + // The partial-overlap insert above deposited a 1-byte gap at offset 6 ("X"), + // so this segment overlaps that byte and is PartialOverlap. 
+ let result = insert_segment(&mut dir, 1006, b"CCCC", 10_485_760, 10_000); + assert_eq!(result, InsertResult::PartialOverlap); + assert_eq!(dir.overlap_count, 2); +} + +#[test] +fn test_range_boundary_exact_new_end() { + let mut dir = FlowDirection::new(); + dir.set_isn(1000); + + // Insert segment at offset 1, length 5 (covers 1-5, ends at 6) + insert_segment(&mut dir, 1001, b"AAAAA", 10_485_760, 10_000); + + // Insert segment starting exactly at the end of the first (offset 6) + // This should NOT overlap — range(..new_end) must exclude it + let result = insert_segment(&mut dir, 1006, b"BBBBB", 10_485_760, 10_000); + assert_eq!(result, InsertResult::Inserted); + assert_eq!(dir.overlap_count, 0); +} From 551177d52b6fe806ca5bb9d97c0d0578bd9076a1 Mon Sep 17 00:00:00 2001 From: Zious Date: Mon, 6 Apr 2026 20:50:18 -0500 Subject: [PATCH 6/8] perf: remove update_memory() and track total_memory incrementally --- src/reassembly/mod.rs | 35 +++++++++++++++++++++----------- tests/reassembly_engine_tests.rs | 32 +++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 12 deletions(-) diff --git a/src/reassembly/mod.rs b/src/reassembly/mod.rs index 5927078..ec5aecd 100644 --- a/src/reassembly/mod.rs +++ b/src/reassembly/mod.rs @@ -161,6 +161,11 @@ impl TcpReassembler { flow.on_rst(); self.stats.flows_rst += 1; let key_clone = key.clone(); + let flow_mem = self + .flows + .get(&key_clone) + .map(|f| f.memory_used()) + .unwrap_or(0); // Flush buffered contiguous data before removing if let Some(flow) = self.flows.get_mut(&key_clone) { use crate::reassembly::handler::Direction; @@ -175,7 +180,7 @@ impl TcpReassembler { } handler.on_flow_close(&key_clone, CloseReason::Rst); self.flows.remove(&key_clone); - self.update_memory(); + self.total_memory -= flow_mem; return; } @@ -209,6 +214,7 @@ impl TcpReassembler { } let flow_dir = flow.get_direction_mut(dir); + let before_insert = flow_dir.buffered_bytes; let result = insert_segment( flow_dir, seq, @@ -216,6 +222,7 @@ impl 
TcpReassembler { self.config.max_depth, self.config.max_segments_per_direction, ); + self.total_memory += flow_dir.buffered_bytes - before_insert; match result { InsertResult::Inserted => self.stats.segments_inserted += 1, @@ -282,7 +289,9 @@ impl TcpReassembler { // Flush contiguous data let flow = self.flows.get_mut(&key).unwrap(); let flow_dir = flow.get_direction_mut(dir); + let before_flush = flow_dir.buffered_bytes; let flushed = flush_contiguous(flow_dir); + self.total_memory -= before_flush - flow_dir.buffered_bytes; for (offset, data) in &flushed { self.stats.bytes_reassembled += data.len() as u64; @@ -296,6 +305,7 @@ impl TcpReassembler { .get(&key) .is_some_and(|f| f.state == FlowState::Closed) { + let flow_mem = self.flows.get(&key).map(|f| f.memory_used()).unwrap_or(0); // Flush remaining data in both directions before removal if let Some(flow) = self.flows.get_mut(&key) { use crate::reassembly::handler::Direction; @@ -311,11 +321,9 @@ impl TcpReassembler { self.stats.flows_fin += 1; handler.on_flow_close(&key, CloseReason::Fin); self.flows.remove(&key); + self.total_memory -= flow_mem; } - // 11. Update total memory tracking - self.update_memory(); - // 12. Evict flows if memcap exceeded if self.total_memory > self.config.memcap { self.evict_flows(handler); @@ -336,6 +344,7 @@ impl TcpReassembler { .collect(); for key in expired_keys { + let flow_mem = self.flows.get(&key).map(|f| f.memory_used()).unwrap_or(0); // Flush salvageable data before removing if let Some(flow) = self.flows.get_mut(&key) { use crate::reassembly::handler::Direction; @@ -348,11 +357,10 @@ impl TcpReassembler { } } self.flows.remove(&key); + self.total_memory -= flow_mem; self.stats.flows_expired += 1; handler.on_flow_close(&key, CloseReason::Timeout); } - - self.update_memory(); } /// Close all remaining flows (called at end of capture). 
@@ -360,6 +368,7 @@ impl TcpReassembler {
         use crate::reassembly::handler::Direction;
         let all_keys: Vec<_> = self.flows.keys().cloned().collect();
         for key in all_keys {
+            let flow_mem = self.flows.get(&key).map(|f| f.memory_used()).unwrap_or(0);
             // Flush any remaining contiguous data before closing
             if let Some(flow) = self.flows.get_mut(&key) {
                 for dir in [Direction::ClientToServer, Direction::ServerToClient] {
@@ -371,9 +380,9 @@
                 }
             }
             self.flows.remove(&key);
+            self.total_memory -= flow_mem;
             handler.on_flow_close(&key, CloseReason::Timeout);
         }
-        self.update_memory();
     }
 
     /// Return a reference to current stats.
@@ -386,12 +395,13 @@ impl TcpReassembler {
         &self.findings
     }
 
-    // --- Private helpers ---
-
-    fn update_memory(&mut self) {
-        self.total_memory = self.flows.values().map(|f| f.memory_used()).sum();
+    /// Return the current total memory used by all flow buffers.
+    pub fn total_memory(&self) -> usize {
+        self.total_memory
     }
 
+    // --- Private helpers ---
+
     /// Evict flows when memcap is exceeded.
     /// Strategy: evict non-established flows first (sorted by LRU),
     /// then established flows by LRU.
@@ -417,6 +427,7 @@ impl TcpReassembler { { break; } + let flow_mem = self.flows.get(key).map(|f| f.memory_used()).unwrap_or(0); // Flush salvageable contiguous data before evicting if let Some(flow) = self.flows.get_mut(key) { use crate::reassembly::handler::Direction; @@ -429,9 +440,9 @@ impl TcpReassembler { } } self.flows.remove(key); + self.total_memory -= flow_mem; self.stats.evictions += 1; handler.on_flow_close(key, CloseReason::MemoryPressure); - self.update_memory(); } } diff --git a/tests/reassembly_engine_tests.rs b/tests/reassembly_engine_tests.rs index 010ee33..b9b2b7f 100644 --- a/tests/reassembly_engine_tests.rs +++ b/tests/reassembly_engine_tests.rs @@ -167,6 +167,7 @@ fn test_rst_closes_flow() { assert_eq!(handler.close_events.len(), 1); assert_eq!(handler.close_events[0].1, CloseReason::Rst); + assert_eq!(reassembler.total_memory(), 0); } #[test] @@ -223,4 +224,35 @@ fn test_flow_timeout_expiration() { let stats = reassembler.stats(); assert_eq!(stats.flows_expired, 1); + assert_eq!(reassembler.total_memory(), 0); +} + +#[test] +fn test_total_memory_tracking() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + // SYN — no payload, no memory change + let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false); + reassembler.process_packet(&syn, 1, &mut handler); + + // Out-of-order segment — buffered (not flushed) + let p2 = make_tcp_packet(client, 12345, server, 80, 1004, b"bbb", false, false, false); + reassembler.process_packet(&p2, 2, &mut handler); + assert!(handler.data_events.is_empty()); + assert_eq!(reassembler.total_memory(), 3); // "bbb" buffered + + // In-order segment — triggers flush of both + let p1 = make_tcp_packet(client, 12345, server, 80, 1001, b"aaa", false, false, false); + reassembler.process_packet(&p1, 3, &mut handler); + 
assert_eq!(handler.all_data(), b"aaabbb"); + assert_eq!(reassembler.total_memory(), 0); // all flushed + + // Finalize — closes flow + reassembler.finalize(&mut handler); + assert_eq!(reassembler.total_memory(), 0); } From cdb9ec9981065cd75aa84dabc104d626df82e97b Mon Sep 17 00:00:00 2001 From: Zious Date: Mon, 6 Apr 2026 21:15:20 -0500 Subject: [PATCH 7/8] fix: address PR review findings for reassembly perf - Use saturating_sub for insert delta to prevent usize underflow - Add debug_assert! at both defensive insert guard sites - Replace unwrap_or(0) with expect() at all 5 flow removal sites - Add FIN-close total_memory test with buffered data - Add buffered_bytes assertion to retransmission dedup test - Add comments explaining pre-flush capture pattern --- src/reassembly/mod.rs | 39 +++++++++++++++++++++++++------ src/reassembly/segment.rs | 16 +++++++++++-- tests/reassembly_engine_tests.rs | 36 ++++++++++++++++++++++++++++ tests/reassembly_segment_tests.rs | 1 + 4 files changed, 83 insertions(+), 9 deletions(-) diff --git a/src/reassembly/mod.rs b/src/reassembly/mod.rs index ec5aecd..e6b4e05 100644 --- a/src/reassembly/mod.rs +++ b/src/reassembly/mod.rs @@ -161,11 +161,13 @@ impl TcpReassembler { flow.on_rst(); self.stats.flows_rst += 1; let key_clone = key.clone(); + // Capture memory before flushing: total_memory still holds this flow's + // full contribution. Subtracting flow_mem after removal zeros it out. 
let flow_mem = self
             .flows
             .get(&key_clone)
-            .map(|f| f.memory_used())
-            .unwrap_or(0);
+            .expect("flow must exist before RST removal")
+            .memory_used();
         // Flush buffered contiguous data before removing
         if let Some(flow) = self.flows.get_mut(&key_clone) {
             use crate::reassembly::handler::Direction;
@@ -222,7 +224,13 @@ impl TcpReassembler {
             self.config.max_depth,
             self.config.max_segments_per_direction,
         );
-        self.total_memory += flow_dir.buffered_bytes - before_insert;
+        debug_assert!(
+            flow_dir.buffered_bytes >= before_insert,
+            "insert_segment decreased buffered_bytes: before={} after={}",
+            before_insert,
+            flow_dir.buffered_bytes
+        );
+        self.total_memory += flow_dir.buffered_bytes.saturating_sub(before_insert);
 
         match result {
             InsertResult::Inserted => self.stats.segments_inserted += 1,
@@ -305,7 +313,12 @@ impl TcpReassembler {
             .get(&key)
             .is_some_and(|f| f.state == FlowState::Closed)
         {
-            let flow_mem = self.flows.get(&key).map(|f| f.memory_used()).unwrap_or(0);
+            // Capture memory before flushing (see RST handler comment for rationale)
+            let flow_mem = self
+                .flows
+                .get(&key)
+                .expect("flow must exist before FIN removal")
+                .memory_used();
             // Flush remaining data in both directions before removal
             if let Some(flow) = self.flows.get_mut(&key) {
                 use crate::reassembly::handler::Direction;
@@ -344,7 +357,11 @@ impl TcpReassembler {
             .collect();
 
         for key in expired_keys {
-            let flow_mem = self.flows.get(&key).map(|f| f.memory_used()).unwrap_or(0);
+            let flow_mem = self
+                .flows
+                .get(&key)
+                .expect("expired flow must exist")
+                .memory_used();
             // Flush salvageable data before removing
             if let Some(flow) = self.flows.get_mut(&key) {
                 use crate::reassembly::handler::Direction;
@@ -368,7 +385,11 @@ impl TcpReassembler {
             use crate::reassembly::handler::Direction;
             let all_keys: Vec<_> = self.flows.keys().cloned().collect();
             for key in all_keys {
-                let flow_mem = self.flows.get(&key).map(|f| f.memory_used()).unwrap_or(0);
+                let flow_mem = self
+                    .flows
+                    .get(&key)
.expect("finalize flow must exist") + .memory_used(); // Flush any remaining contiguous data before closing if let Some(flow) = self.flows.get_mut(&key) { for dir in [Direction::ClientToServer, Direction::ServerToClient] { @@ -427,7 +448,11 @@ impl TcpReassembler { { break; } - let flow_mem = self.flows.get(key).map(|f| f.memory_used()).unwrap_or(0); + let flow_mem = self + .flows + .get(key) + .expect("eviction candidate must exist") + .memory_used(); // Flush salvageable contiguous data before evicting if let Some(flow) = self.flows.get_mut(key) { use crate::reassembly::handler::Direction; diff --git a/src/reassembly/segment.rs b/src/reassembly/segment.rs index 76b842d..4a05072 100644 --- a/src/reassembly/segment.rs +++ b/src/reassembly/segment.rs @@ -154,7 +154,13 @@ pub fn insert_segment( let gap_data = segment_data[start_idx..end_idx].to_vec(); if !gap_data.is_empty() { let gap_len = gap_data.len(); - if let Some(old) = dir.segments.insert(gap_start, gap_data) { + let old = dir.segments.insert(gap_start, gap_data); + debug_assert!( + old.is_none(), + "gap_start {} collided with existing segment", + gap_start + ); + if let Some(old) = old { dir.buffered_bytes -= old.len(); } dir.buffered_bytes += gap_len; @@ -174,7 +180,13 @@ pub fn insert_segment( // No overlap — insert normally let data_len = segment_data.len(); - if let Some(old) = dir.segments.insert(offset, segment_data) { + let old = dir.segments.insert(offset, segment_data); + debug_assert!( + old.is_none(), + "offset {} collided with existing segment in no-overlap path", + offset + ); + if let Some(old) = old { dir.buffered_bytes -= old.len(); } dir.buffered_bytes += data_len; diff --git a/tests/reassembly_engine_tests.rs b/tests/reassembly_engine_tests.rs index b9b2b7f..4fe4a15 100644 --- a/tests/reassembly_engine_tests.rs +++ b/tests/reassembly_engine_tests.rs @@ -256,3 +256,39 @@ fn test_total_memory_tracking() { reassembler.finalize(&mut handler); assert_eq!(reassembler.total_memory(), 0); } + 
+#[test] +fn test_fin_close_total_memory() { + let config = ReassemblyConfig::default(); + let mut reassembler = TcpReassembler::new(config); + let mut handler = RecordingHandler::new(); + + let client = [10, 0, 0, 1]; + let server = [10, 0, 0, 2]; + + // SYN from client + let syn = make_tcp_packet(client, 12345, server, 80, 1000, &[], true, false, false); + reassembler.process_packet(&syn, 1, &mut handler); + + // Out-of-order data — stays buffered (gap at offset 1) + let p2 = make_tcp_packet(client, 12345, server, 80, 1004, b"bbb", false, false, false); + reassembler.process_packet(&p2, 2, &mut handler); + assert_eq!(reassembler.total_memory(), 3); + + // FIN from client (first FIN) + let fin1 = make_tcp_packet(client, 12345, server, 80, 1007, &[], false, true, false); + reassembler.process_packet(&fin1, 3, &mut handler); + + // FIN from server (second FIN → Closed, triggers step 10 removal) + let fin2 = make_tcp_packet(server, 80, client, 12345, 2000, &[], false, true, false); + reassembler.process_packet(&fin2, 4, &mut handler); + + // Flow removed with buffered-but-not-flushed data — total_memory must be 0 + assert_eq!(reassembler.total_memory(), 0); + assert!( + handler + .close_events + .iter() + .any(|(_, r)| *r == CloseReason::Fin) + ); +} diff --git a/tests/reassembly_segment_tests.rs b/tests/reassembly_segment_tests.rs index dd651ef..c5ef1ef 100644 --- a/tests/reassembly_segment_tests.rs +++ b/tests/reassembly_segment_tests.rs @@ -72,6 +72,7 @@ fn test_retransmission_dedup() { let result = insert_segment(&mut dir, 1001, b"hello", 10_485_760, 10_000); assert_eq!(result, InsertResult::Duplicate); assert_eq!(dir.segments.len(), 1); // No duplicate stored + assert_eq!(dir.buffered_bytes, 5); // counter must not double-count } #[test] From 30d2282cc82428bf649587b9987c84fc288157e6 Mon Sep 17 00:00:00 2001 From: Zious Date: Mon, 6 Apr 2026 21:24:34 -0500 Subject: [PATCH 8/8] docs: update spec/plan to reflect total_memory() accessor and past tense --- 
docs/superpowers/plans/2026-04-06-reassembly-perf.md | 2 +- .../specs/2026-04-06-reassembly-perf-design.md | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/docs/superpowers/plans/2026-04-06-reassembly-perf.md b/docs/superpowers/plans/2026-04-06-reassembly-perf.md index f7a1a72..d8b9165 100644 --- a/docs/superpowers/plans/2026-04-06-reassembly-perf.md +++ b/docs/superpowers/plans/2026-04-06-reassembly-perf.md @@ -424,7 +424,7 @@ fn test_total_memory_tracking() { } ``` -Note: We cannot directly assert `total_memory` because it is a private field. The test verifies correct behavior indirectly — the `debug_assert_eq!` inside `memory_used()` (called during finalize's flow removal) will panic if counters have drifted. If this test passes, incremental tracking is correct. +Note: The PR exposes `TcpReassembler::total_memory()` publicly, so the test asserts on it directly at each checkpoint (after buffering, after flush, after finalize). - [ ] **Step 2: Run test to verify it passes with current `update_memory()`** diff --git a/docs/superpowers/specs/2026-04-06-reassembly-perf-design.md b/docs/superpowers/specs/2026-04-06-reassembly-perf-design.md index 4155cf6..6fc38bb 100644 --- a/docs/superpowers/specs/2026-04-06-reassembly-perf-design.md +++ b/docs/superpowers/specs/2026-04-06-reassembly-perf-design.md @@ -1,17 +1,17 @@ # TCP Reassembly Performance Optimization Design **Issue:** #11 — perf: use BTreeMap::range() and incremental memory tracking in reassembly -**Scope:** Three targeted O(n) → O(1)/O(log n) optimizations in the reassembly engine internals. No behavioral changes, no API changes. +**Scope:** Three targeted O(n) → O(1)/O(log n) optimizations in the reassembly engine internals. No behavioral changes. Adds `total_memory()` public accessor for testability. 
## Problem -The TCP reassembly engine has three O(n) hotspots identified during PR #10 review: +Before these optimizations, the TCP reassembly engine had three O(n) hotspots identified during PR #10 review: -1. **Overlap detection** (`segment.rs:85`): iterates all segments in the BTreeMap per insert, even though only segments starting before `new_end` can overlap. -2. **`buffered_bytes` computation** (`segment.rs:62`): recomputes `segments.values().map(|v| v.len()).sum()` on every insert — O(n) per call. -3. **`update_memory()` recomputation** (`mod.rs:391`): iterates all flows × all segments per direction to sum total memory — O(m × n). Called at 5 sites per packet (lines 178, 317, 355, 376, 434). +1. **Overlap detection** (`segment.rs:85`): iterated all segments in the BTreeMap per insert, even though only segments starting before `new_end` could overlap. +2. **`buffered_bytes` computation** (`segment.rs:62`): recomputed `segments.values().map(|v| v.len()).sum()` on every insert — O(n) per call. +3. **`update_memory()` recomputation** (`mod.rs:391`): iterated all flows × all segments per direction to sum total memory — O(m × n). Called at 5 sites per packet (lines 178, 317, 355, 376, 434). -With `max_segments_per_direction = 10,000` and `max_flows = 100,000`, these are bounded but wasteful. +With `max_segments_per_direction = 10,000` and `max_flows = 100,000`, these costs were bounded but wasteful. ## Approach