diff --git a/benches/network.rs b/benches/network.rs index ca2ec9d0..c3441f24 100644 --- a/benches/network.rs +++ b/benches/network.rs @@ -170,8 +170,8 @@ mod linux_benches { /// /// The timed section is a single `poll()` call on the pre-populated stack, /// so the measurement reflects the NAT-walk cost at that table size. - /// Today the walk is `O(n)`; the unified flow table planned for Phase 4 - /// should keep the same asymptotic complexity but with smaller constants. + /// Today the walk is `O(n)`; the unified flow table keeps the same + /// asymptotic complexity but with smaller per-entry constants. #[divan::bench(args = [1, 100, 1000])] fn poll_with_n_flows(bencher: Bencher, n: usize) { let mut stack = SlirpBackend::new().unwrap(); @@ -276,9 +276,9 @@ mod linux_benches { }); } - /// Pure-compute bench for `nat::translate_outbound`. Phase 5 baseline - /// for future hasher / data-structure changes (e.g. moving deny_cidrs - /// from `Vec` to a longest-prefix trie). Tens of nanoseconds + /// Pure-compute bench for `nat::translate_outbound`. Baseline for future + /// hasher / data-structure changes (e.g. moving deny_cidrs from + /// `Vec` to a longest-prefix trie). Tens of nanoseconds /// expected; microseconds would indicate an allocation in the hot path. #[divan::bench] fn nat_translate_outbound_hot_path(bencher: Bencher) { @@ -305,13 +305,13 @@ mod linux_benches { /// Measures TCP bulk throughput through the SLIRP relay under backpressure. /// /// Pushes 1 MiB through the relay in 1 KiB chunks with a constrained host - /// receiver (`SO_RCVBUF=4096`) so the post-Phase-3 backpressure path is - /// exercised every iteration. Divan reports throughput in MB/s alongside - /// per-iteration latency, giving a numerical regression signal for the - /// passt-style sequence-mirroring + don't-ACK-on-EAGAIN backpressure path. + /// receiver (`SO_RCVBUF=4096`) so the backpressure path is exercised every + /// iteration. Divan reports throughput in MB/s alongside per-iteration + /// latency, giving a numerical regression signal for the passt-style + /// sequence-mirroring + don't-ACK-on-EAGAIN backpressure path. /// /// The 95% delivery threshold mirrors `tcp_writes_more_than_256kb_succeed` - /// — the binary contract test for Phase 3. + /// — the binary contract test for TCP backpressure correctness. #[divan::bench(sample_count = 10)] fn tcp_bulk_throughput_1mb(bencher: Bencher) { use smoltcp::wire::TcpControl; @@ -612,13 +612,12 @@ mod linux_benches { /// Open `n/3` TCP + `n/3` UDP + `n/3` ICMP-echo flows, then time `poll()`. /// - /// Mirrors `poll_with_n_flows` (TCP-only) but exercises Phase 4's - /// unified `flow_table` with all three protocols populated. Catches - /// enum-dispatch + filter regressions at scale: each `relay_*_data` - /// loop now `filter(|k| matches!(k, FlowKey::Foo(_)))` over the unified - /// table, so per-protocol scan cost is `O(total_flows)` not - /// `O(this_protocol's_flows)`. This bench is the regression gate for - /// that change. + /// Mirrors `poll_with_n_flows` (TCP-only) but exercises the unified + /// `flow_table` with all three protocols populated. Catches enum-dispatch + /// and filter regressions at scale: each `relay_*_data` loop filters + /// by `FlowKey` variant over the unified table, so per-protocol scan cost + /// is `O(total_flows)` not `O(this_protocol's_flows)`. This bench is the + /// regression gate for that property. 
#[divan::bench(args = [3, 99, 999])] fn poll_with_n_mixed_flows(bencher: Bencher, n: usize) { let mut stack = SlirpBackend::new().unwrap(); @@ -649,10 +648,10 @@ mod linux_benches { /// Insert + remove `n` flow-table entries using synthetic data. /// - /// Pure-compute baseline for the unified `HashMap` - /// in Phase 4. Phase 5+ reference number for hasher experiments - /// (foldhash, ahash, SipHash) or container-shape changes (e.g. - /// hashbrown raw API). Uses synthetic `u32` values instead of real + /// Pure-compute baseline for the unified `HashMap`. + /// Reference number for hasher experiments (foldhash, ahash, SipHash) + /// or container-shape changes (e.g. hashbrown raw API). Uses synthetic + /// `u32` values instead of real /// `TcpNatEntry` (which requires TcpStream) to isolate HashMap /// mechanics from socket cloning overhead — the real cost is /// HashMap insert/remove, not socket ops. @@ -784,8 +783,8 @@ mod linux_benches { } /// Pure-compute cost of synthesizing an inbound SYN frame for - /// port-forwarding (Phase 5.5b.2). No stack allocation or guest frame - /// processing — just the `build_tcp_packet_static` wire encoding. + /// port-forwarding. No stack allocation or guest frame processing — + /// just the `build_tcp_packet_static` wire encoding. /// /// Expected magnitude: sub-microsecond (pure packet construction). /// @@ -843,8 +842,8 @@ mod linux_benches { /// not a bug. Regressions in the inbound state machine or the listener /// poll loop will shift the distribution upward beyond 50 ms. /// - /// Phase 5.5b baseline. Regressions in the inbound state machine or - /// listener-poll loop will surface numerically against this measurement. + /// Regressions in the inbound state machine or listener-poll loop will + /// surface numerically against this measurement. #[divan::bench(sample_count = 20, sample_size = 1)] fn port_forward_accept_latency(bencher: Bencher) { const GUEST_PORT: u16 = 8080; @@ -897,4 +896,144 @@ mod linux_benches { worker.join().expect("worker thread panicked"); }); } + + /// Cost of one `drain_to_guest` call when one TCP flow is `Established` + /// and the host kernel has data ready to relay. + /// + /// Captures the per-packet SLIRP dispatch overhead via epoll: epoll_wait + /// (non-blocking, zero-timeout), readiness scan, peek, and Ethernet frame + /// construction. Only the flows with data ready are dispatched — flows + /// with nothing to relay are skipped. + /// + /// This bench cannot exercise the `net_poll_thread` 50 ms epoll cycle + /// (that thread does not run inside divan). The wall-clock latency floor + /// is captured separately by `voidbox-network-bench`'s `tcp_rx_latency_us_p50` + /// field; see that binary's `Report` struct for the measurement shape. + /// + /// Requires the `bench-helpers` feature (compile with + /// `cargo bench --features bench-helpers`). + #[cfg(feature = "bench-helpers")] + #[divan::bench(sample_count = 50, sample_size = 10)] + fn tcp_rx_latency_one_packet(bencher: Bencher) { + use smoltcp::wire::TcpControl; + use std::io::Write; + use std::net::TcpListener; + + const GUEST_SRC_PORT: u16 = 49155; + const INITIAL_GUEST_SEQ: u32 = 5000; + const PAYLOAD: &[u8] = &[0xAB; 64]; + + // Build a fresh stack with one Established TCP flow. Setup happens + // outside the timed loop so divan only measures the relay dispatch. 
+        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
+        let host_port = listener.local_addr().unwrap().port();
+        let server_thread = thread::spawn(move || listener.accept().unwrap());
+
+        let mut stack = SlirpBackend::new().unwrap();
+
+        // 3-way handshake: guest sends SYN → stack produces SYN-ACK → guest
+        // sends ACK. This mirrors `tcp_bulk_throughput_1mb` setup.
+        let syn = build_tcp_syn_for_latency_bench(GUEST_SRC_PORT, host_port, INITIAL_GUEST_SEQ);
+        stack.process_guest_frame(&syn).unwrap();
+
+        // Drain for up to 200 ms to collect the SYN-ACK.
+        let mut drain_frames: Vec<Vec<u8>> = Vec::new();
+        let gateway_seq = {
+            let deadline = std::time::Instant::now() + Duration::from_millis(200);
+            loop {
+                drain_frames.clear();
+                stack.drain_to_guest(&mut drain_frames);
+                if let Some((seq, _, _, _)) = drain_frames
+                    .iter()
+                    .find_map(|f| parse_tcp_to_guest_frame(f))
+                {
+                    break seq;
+                }
+                if std::time::Instant::now() > deadline {
+                    panic!("no SYN-ACK within deadline");
+                }
+                thread::sleep(Duration::from_millis(5));
+            }
+        };
+
+        // Complete the handshake: guest sends ACK.
+        let ack = build_tcp_data_frame(
+            SLIRP_GATEWAY_IP,
+            GUEST_SRC_PORT,
+            host_port,
+            INITIAL_GUEST_SEQ + 1,
+            gateway_seq + 1,
+            TcpControl::None,
+            &[],
+        );
+        stack.process_guest_frame(&ack).unwrap();
+
+        // The server thread accepted the connection; grab the socket.
+        let (mut server_sock, _) = server_thread.join().unwrap();
+        server_sock
+            .set_nonblocking(true)
+            .expect("server non-blocking");
+
+        // Set up state for the timed loop.
+        let mut out: Vec<Vec<u8>> = Vec::with_capacity(8);
+        let guest_seq = INITIAL_GUEST_SEQ + 1;
+
+        // Prime: put one payload in the kernel buffer before the first
+        // iteration begins so the first measured call sees a ready event.
+        let _ = server_sock.write(PAYLOAD);
+
+        bencher.bench_local(|| {
+            out.clear();
+            // Refill the kernel buffer from the previous iteration's drain.
+            // write() may return EAGAIN if the buffer is full; that is fine —
+            // the previous iteration's peek left data in place.
+            let _ = server_sock.write(divan::black_box(PAYLOAD));
+
+            // The cost we are measuring: one non-blocking epoll_wait + relay.
+            divan::black_box(&mut stack).drain_to_guest(&mut out);
+
+            // Consume the relay output so inject_to_guest doesn't grow
+            // unboundedly across iterations.
+            divan::black_box(&out);
+
+            // Keep the TCP stream happy: send an ACK for any data the relay
+            // fed into inject_to_guest (frame content doesn't matter for the
+            // bench; we just need the host stream not to stall).
+            for frame in &out {
+                if let Some((data_seq, _, _, plen)) = parse_tcp_to_guest_frame(frame) {
+                    if plen > 0 {
+                        let ack_back = build_tcp_data_frame(
+                            SLIRP_GATEWAY_IP,
+                            GUEST_SRC_PORT,
+                            host_port,
+                            guest_seq,
+                            data_seq.wrapping_add(plen as u32),
+                            TcpControl::None,
+                            &[],
+                        );
+                        let _ = stack.process_guest_frame(&ack_back);
+                    }
+                }
+            }
+        });
+    }
+
+    /// Build a SYN frame from the guest toward the host for the latency bench.
+    ///
+    /// Identical to `build_tcp_data_frame` with `TcpControl::Syn` and zero
+    /// `ack`. Kept as a separate function to document intent: this is the
+    /// opening segment of the 3-way handshake used by
+    /// `tcp_rx_latency_one_packet`.
+    #[cfg(feature = "bench-helpers")]
+    fn build_tcp_syn_for_latency_bench(src_port: u16, dst_port: u16, seq: u32) -> Vec<u8> {
+        build_tcp_data_frame(
+            SLIRP_GATEWAY_IP,
+            src_port,
+            dst_port,
+            seq,
+            0,
+            smoltcp::wire::TcpControl::Syn,
+            &[],
+        )
+    }
 } // mod linux_benches
diff --git a/docs/superpowers/plans/2026-04-30-smoltcp-passt-port-phase6.4.md b/docs/superpowers/plans/2026-04-30-smoltcp-passt-port-phase6.4.md
new file mode 100644
index 00000000..64050246
--- /dev/null
+++ b/docs/superpowers/plans/2026-04-30-smoltcp-passt-port-phase6.4.md
@@ -0,0 +1,1427 @@
+# Phase 6.4: Event-Driven RX Polling Implementation Plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking.
+
+**Goal:** Replace the 5 ms timer-driven `net_poll_thread` with `epoll_wait`-driven readiness dispatch, so host→guest RX latency is bounded by the actual data-arrival delay (sub-millisecond) rather than the 5 ms polling cycle.
+
+**Architecture:** A new `mod epoll_dispatch` inside `src/network/` owns a single `epoll_fd` plus a self-pipe. `SlirpBackend` registers/unregisters socket FDs on flow-table mutations. The `net_poll_thread` calls `epoll_wait` (50 ms timeout for housekeeping) and routes each ready FD to the correct relay handler via `epoll_data` carrying a `FlowKey`. The self-pipe lets the vCPU-thread side wake the poll thread when it adds a new flow without polling-cycle delay.
+
+**Tech stack:** smoltcp 0.11 wire types (unchanged), `libc::epoll_*` syscalls, `pipe2(O_NONBLOCK | O_CLOEXEC)`, no new crates.
+
+**Hard performance gate (the "more performant than master" requirement):**
+
+```
+scripts/bench-compare.sh --baseline origin/main --skip-vm
+```
+
+…must show, for every comparable bench, **HEAD ≤ baseline + 5 %** *and* at least the following must improve by ≥ 30 %:
+
+- `port_forward_accept_latency` (currently bounded by the 50 ms listener poll; epoll should drop the median by an order of magnitude once the listener also moves onto epoll — *or* document why it stays).
+- a new `tcp_rx_latency_us_p50` wall-clock metric in `voidbox-network-bench` (Phase 6.4 must be sub-5 ms; pre-6.4 was bounded below by the 5 ms net-poll cycle).
+
+Phase 6.4 is **not allowed to merge** until both gates above pass.
+
+---
+
+## Background
+
+Reviewer finding **A4** (Medium-Low) on PR #68:
+
+- `src/vmm/mod.rs:1599-1610`: `net_poll_thread` wakes every 5 ms (`std::thread::sleep(Duration::from_millis(5))`).
+- `src/network/slirp.rs:1549`: `relay_tcp_nat_data` re-peeks 64 KiB on **every** connected TCP socket every tick, regardless of readiness.
+- Listener threads spawned by `spawn_port_forward_listeners` (`src/network/slirp.rs:2097`) sleep 50 ms between accept attempts — this is the cap on `port_forward_accept_latency` (~50 ms median observed in `benches/network.rs::port_forward_accept_latency`).
+
+passt's reference: epoll-driven readiness ([passt/tcp.c:463](https://passt.top/passt/tree/tcp.c#n463)). Phase 6.4 ports the *idea* (event-driven), not the literal `SO_PEEK_OFF` mechanism (which is Linux-specific and would not survive a future cross-platform backend split — though SLIRP itself is already `cfg(target_os = "linux")`).
+
+## Invariants (carried from Phase 6 overview — non-negotiable)
+
+1. **Full observability via `tracing`.** Every epoll event emits a `trace!` line with the `FlowKey` and event type. No silent dispatch.
+2. **All-Rust path.** `libc::epoll_*` is the syscall surface; no new crates.
+3. **Cross-platform discipline.** Phase 6.4 stays inside the existing `#[cfg(target_os = "linux")]` gate. macOS VZ is unaffected.
+4. **No regression in Phase 0–5 baselines.** `bench-compare.sh --baseline origin/main` enforced — see "Hard performance gate" above.
+5. **Snapshot/restore correctness.** `snapshot_integration` continues to pass. The `epoll_fd` does not survive snapshot; restore rebuilds the epoll set from `flow_table` contents. Snapshot does not serialize the epoll FD itself.
+
+## File structure
+
+| Path | Responsibility | Action |
+|---|---|---|
+| `src/network/epoll_dispatch.rs` | Owns `epoll_fd`, self-pipe, register/unregister, `wait()` returning `Vec<EpollEvent>`. Linux-only. | **Create** |
+| `src/network/mod.rs` | Add `pub(crate) mod epoll_dispatch;` | Modify |
+| `src/network/slirp.rs` | Hold `epoll: EpollDispatch` field on `SlirpBackend`; register on every flow_table insert; unregister on remove; rewrite `relay_tcp_nat_data`/`relay_udp_flows`/`relay_icmp_echo` to dispatch only on ready flows. | Modify |
+| `src/vmm/mod.rs` | `net_poll_thread` rewrite: `epoll_wait(timeout=50ms)` instead of `sleep(5ms)`. | Modify |
+| `tests/network_baseline.rs` | New pin `tcp_rx_latency_sub_5ms`; fix up `tcp_writes_more_than_256kb_succeed`'s comment-vs-code mismatch; migrate `drain_n` from `.poll()` to `drain_to_guest`. | Modify |
+| `benches/network.rs` | Add divan bench `tcp_rx_latency_one_packet`. | Modify |
+| `src/bin/voidbox-network-bench/main.rs` | Add `tcp_rx_latency_us_p50` measurement (host writes to a flow, time until guest sees the bytes via the relay). | Modify |
+| `docs/superpowers/plans/2026-04-30-smoltcp-passt-port-phase6.4.md` | This file. | Already created |
+
+The `drain_n` migration in `tests/network_baseline.rs` is a quiet cleanup that lands in Task 1 — every test in the file uses it, so dropping `.poll()` here also drops the last in-tree `.poll()` caller and lets us delete the deprecated method entirely later.
+
+## Architecture notes
+
+### Why one `epoll_fd` (not one per protocol)?
+
+- Single point of dispatch — the poll thread does *one* `epoll_wait` syscall regardless of how many flows are open.
+- `epoll_data.u64` is 8 bytes — we encode `FlowKey` as a 64-bit token there. UDP and ICMP keys are smaller; TCP keys (`(guest_port, dst_ip, dst_port)`) fit in 64 bits with a tag byte for the protocol discriminator.
+- The self-pipe is registered alongside the socket FDs; reading it drains a queue of "I just added flow X" wake events posted by `process_guest_frame` running on the vCPU thread.
+
+### Why a self-pipe?
+
+`process_guest_frame` runs on the **vCPU thread** under the device lock. When it inserts a new flow into `flow_table`, the new socket FD is registered with epoll on that thread (cheap — just `epoll_ctl(EPOLL_CTL_ADD, ...)`). But the **poll thread** is asleep inside `epoll_wait(timeout=50ms)`. Without a wakeup, the new flow has up to 50 ms of latency before the first poll cycle picks it up.
+
+The self-pipe (`pipe2(O_NONBLOCK | O_CLOEXEC)` registered with `EPOLLIN`) lets `process_guest_frame` write a single byte after `epoll_ctl`. The poll thread's `epoll_wait` returns immediately, drains the pipe (a no-op handler), and starts dispatching — including the new flow.
+
+### Snapshot interaction
+
+`epoll_fd` is a kernel handle on real FDs — not serializable. Snapshot path:
+
+- `snapshot_internal`: tear down epoll. Drop `EpollDispatch`. Serialize `flow_table` as today.
+- `from_snapshot`: deserialize `flow_table` → for every entry, recreate the host socket (already happening today via the `host_stream` round-trip) → register the new FD with a fresh `EpollDispatch`.
+
+No serde changes to `flow_table` itself.
+
+### Why a 50 ms `epoll_wait` timeout?
+
+Housekeeping the poll thread does *outside* the dispatch loop:
+
+- Reap stale UDP flows (`UDP_IDLE_TIMEOUT = 60 s`) — coarse, 50 ms is fine.
+- Reap stale ICMP flows (similar).
+- Phase 6.1 will add `LAST_ACK_TIMEOUT` reaping here.
+
+If we set the timeout shorter, we re-introduce the "wake every X ms regardless" cost we're trying to remove. If we set it longer, housekeeping latency grows. 50 ms balances both: an idle poll thread wakes a tenth as often as it did under the previous 5 ms timer.
+
+---
+
+## Tasks
+
+### Task 1: Pre-baseline + retransmit-test fix-up
+
+**Files:**
+- Modify: `tests/network_baseline.rs:170-179` (the `drain_n` helper)
+- Modify: `tests/network_baseline.rs:374-422` (retransmit comment-vs-code in `tcp_writes_more_than_256kb_succeed`)
+
+- [ ] **Step 1: Capture baseline numbers from `origin/main`**
+
+```bash
+# from a clean repo checkout
+scripts/bench-compare.sh --baseline origin/main --skip-vm > /tmp/baseline-vs-main.md
+cat /tmp/baseline-vs-main.md
+```
+
+Expected: every comparable bench has a real number in both columns. Save `/tmp/baseline-vs-main.md` as the pre-Phase-6.4 reference.
+
+- [ ] **Step 2: Migrate `drain_n` from `.poll()` to `drain_to_guest`**
+
+Replace `tests/network_baseline.rs:170-179`:
+
+```rust
+/// Drains frames the stack wants to send to the guest, calling
+/// `drain_to_guest` up to `n` times. Returns all frames produced
+/// across the calls (the caller may not care about per-call boundaries).
+fn drain_n(stack: &mut SlirpBackend, n: usize) -> Vec<Vec<u8>> {
+    let mut out: Vec<Vec<u8>> = Vec::new();
+    for _ in 0..n {
+        stack.drain_to_guest(&mut out);
+    }
+    out
+}
+```
+
+- [ ] **Step 3: Run the existing pins to confirm the `drain_n` migration is non-breaking**
+
+```bash
+cargo test --test network_baseline
+```
+
+Expected: PASS for every existing pin (no semantic change — `drain_to_guest` appends to the buffer, same as the `.poll()`-based extension did).
+
+- [ ] **Step 4: Fix the retransmit comment-vs-code mismatch in `tcp_writes_more_than_256kb_succeed`**
+
+The Copilot review's C1.1 finding is correct: the loop unconditionally advances `seq` after every send and never retransmits unACK'd chunks. The 95 % threshold tolerates the resulting loss, but the test's intent ("we re-send those") doesn't match its implementation.
+
+Two valid fixes — pick the simpler one. Replace the loop body in `tests/network_baseline.rs:387-422`:
+
+```rust
+while bytes_received.load(Ordering::Relaxed) < TOTAL && std::time::Instant::now() < deadline {
+    // Retransmit semantics: only advance the send cursor once the
+    // previous chunk has been ACK'd. If the stack stops ACKing
+    // (Phase 3 backpressure), we re-send the same seq/payload until
+    // it's acknowledged. This matches the comment above and the
+    // production guest-TCP behavior we're emulating.
+    let _ = stack.process_guest_frame(&build_tcp_frame(
+        SLIRP_GATEWAY_IP,
+        GUEST_EPHEMERAL_PORT,
+        host_port,
+        seq,
+        our_seq + 1,
+        TcpControl::Psh,
+        &chunk,
+    ));
+
+    // Drain frames; track the highest ACK we've seen and watch
+    // for RST/FIN that would indicate a Phase-2 era close.
+ for f in drain_n(&mut stack, 4) { + if let Some((_, ack, ctrl, _)) = parse_tcp_to_guest(&f) { + if matches!(ctrl, TcpControl::Rst | TcpControl::Fin) { + saw_close = true; + } + if ack > acked_seq { + acked_seq = ack; + } + } + } + + if saw_close { + break; + } + + // Advance our send cursor only past ACK'd data. If the stack + // didn't ACK this chunk, the next loop iteration re-sends the + // same seq/payload (true TCP retransmit semantics). + if acked_seq >= seq.wrapping_add(CHUNK as u32) { + seq = seq.wrapping_add(CHUNK as u32); + } else if seq.wrapping_sub(acked_seq) > 256 * 1024 { + // Out-paced kernel recv buffer; sleep briefly so the host + // server thread can drain. + std::thread::sleep(std::time::Duration::from_millis(10)); + } +} +``` + +The single substantive change: move `seq = seq.wrapping_add(...)` from line 398 (unconditional, immediately after send) to after the drain loop, gated on `acked_seq >= seq + CHUNK`. If the stack ACK'd, advance; otherwise the next iteration re-sends the same chunk. + +- [ ] **Step 5: Run the fixed test to confirm it still passes (now with real retransmit)** + +```bash +cargo test --test network_baseline tcp_writes_more_than_256kb_succeed +``` + +Expected: PASS. The 95 % threshold will likely be 100 % now since real retransmits don't drop bytes. + +- [ ] **Step 6: Commit** + +```bash +git add tests/network_baseline.rs +git commit -m "test(network): drain_n via drain_to_guest + real retransmit in 256kb test + +Two test-harness improvements landing together since both block the +Phase 6.4 RX-latency work: + +- drain_n migrated from deprecated SlirpBackend::poll() to + drain_to_guest. This was the last in-tree poll() caller. +- tcp_writes_more_than_256kb_succeed now matches its 'we re-send + those' comment: seq only advances when acked_seq catches up, + giving real TCP-retransmit semantics in the synthetic guest + rather than the previous 'lossy with 95% tolerance' shape. + Phase 6.4 must not regress this contract; making the test + faithful first means epoll regressions surface as failures + instead of borderline 95% misses." +``` + +--- + +### Task 2: ~~Failing pin — `tcp_rx_latency_sub_5ms`~~ **DROPPED** + +**Status:** Dropped during execution. Original intent was a unit-level BROKEN_ON_PURPOSE pin asserting host→guest delivery in < 5 ms. **The 5 ms floor lives in `net_poll_thread` (`src/vmm/mod.rs:1609`), not in `SlirpBackend::drain_to_guest`** — the relay is synchronous when called from a test harness, so a unit-level latency assertion can't measure what we actually care about. + +**Where the contract moved:** Task 13's wall-clock `tcp_rx_latency_us_p50` metric in `voidbox-network-bench`. That harness boots a real VM, drives the actual `net_poll_thread`, and observes the latency floor end-to-end. The hard-perf-gate requirement at the top of this plan (`tcp_rx_latency_us_p50 < 5 ms`) is the BROKEN_ON_PURPOSE replacement. + +**No code lands for Task 2.** Skip directly to Task 3. + +
+<details>
+<summary>Original Task 2 body (kept for context)</summary>
+
+The original plan attempted a unit-level pin that called `drain_to_guest` synchronously and timed the host-write → guest-receive interval. Implementation revealed:
+
+- `drain_to_guest` is synchronous; the 5 ms `sleep` in `net_poll_thread` is what bounds VMM-level RX latency, not anything inside `SlirpBackend`.
+- The test would have measured "spawn-thread + accept + write" minus "drain-loop find time", which underflowed in debug mode and was meaningless in release mode.
+
+The contract — Phase 6.4 must deliver host→guest data in < 5 ms when data is available — is preserved as a VM-level requirement in Task 13.
+
+</details>
+- [ ] **Step 1: ~~Write the failing test~~ Skipped — see the "DROPPED" note above. Original body kept below for context only.**
+
+```rust
+/// Phase 6.4 pin: host→guest RX latency must be sub-5 ms when data
+/// is available. Pre-Phase-6.4 the floor was 5 ms (the
+/// `net_poll_thread` `sleep(5ms)` cycle); post-Phase-6.4 the
+/// epoll dispatch should deliver in < 1 ms on a quiet system.
+///
+/// Test harness: open a TCP flow guest→host, wait for ESTABLISHED,
+/// have the host write 64 bytes, measure the time from `write()`
+/// returning to the guest seeing the bytes in `drain_to_guest`'s
+/// output. Pre-Phase-6.4 this measures ≈ 5 ms ± jitter; post-
+/// Phase-6.4 it should be sub-millisecond on the same host.
+#[test]
+fn tcp_rx_latency_sub_5ms() {
+    use std::io::Write;
+    use std::net::{TcpListener, TcpStream};
+    use std::time::Instant;
+
+    // Bind a host listener; the SLIRP rewrite of 10.0.2.2 → 127.0.0.1
+    // routes our SYN to it.
+    let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
+    let host_port = listener.local_addr().unwrap().port();
+    let server = std::thread::spawn(move || -> Option<std::time::Duration> {
+        let (mut sock, _) = listener.accept().ok()?;
+        // Wait for the guest to send something so we know the relay
+        // is established and bidirectional.
+        let mut probe = [0u8; 1];
+        let _ = std::io::Read::read(&mut sock, &mut probe);
+
+        // Stamp T0 just before write returns.
+        let t0 = Instant::now();
+        sock.write_all(&[0x42; 64]).ok()?;
+        Some(t0.elapsed())
+    });
+
+    let mut stack = SlirpBackend::new().unwrap();
+
+    // Drive the 3-way handshake.
+    let our_seq = 1000u32;
+    stack.process_guest_frame(&build_tcp_frame(
+        SLIRP_GATEWAY_IP, GUEST_EPHEMERAL_PORT, host_port, our_seq, 0,
+        TcpControl::Syn, &[],
+    )).unwrap();
+
+    let mut gateway_seq = 0u32;
+    for f in drain_n(&mut stack, 4) {
+        if let Some((s, _ack, ctrl, _)) = parse_tcp_to_guest(&f) {
+            if matches!(ctrl, TcpControl::Syn) {
+                gateway_seq = s;
+                break;
+            }
+        }
+    }
+
+    stack.process_guest_frame(&build_tcp_frame(
+        SLIRP_GATEWAY_IP, GUEST_EPHEMERAL_PORT, host_port, our_seq + 1, gateway_seq + 1,
+        TcpControl::None, &[],
+    )).unwrap();
+
+    // Send a probe byte so the host server thread proceeds to write.
+    stack.process_guest_frame(&build_tcp_frame(
+        SLIRP_GATEWAY_IP, GUEST_EPHEMERAL_PORT, host_port, our_seq + 1, gateway_seq + 1,
+        TcpControl::Psh, &[0xAA],
+    )).unwrap();
+
+    // Now the host writes and stamps T0. We measure from "host write
+    // completes" to "guest sees data in drain output."
+    let host_t0 = server.join().expect("server").expect("write succeeded");
+    let drain_start = Instant::now();
+    let mut saw_payload = false;
+    while drain_start.elapsed() < std::time::Duration::from_secs(1) {
+        let frames: Vec<Vec<u8>> = drain_n(&mut stack, 1);
+        for f in &frames {
+            if let Some((_, _, _, payload_len)) = parse_tcp_to_guest(f) {
+                if payload_len >= 64 {
+                    saw_payload = true;
+                    break;
+                }
+            }
+        }
+        if saw_payload { break; }
+        std::thread::sleep(std::time::Duration::from_micros(50));
+    }
+    let host_to_guest_us = drain_start.elapsed().as_micros() as u64
+        - host_t0.as_micros() as u64;
+
+    assert!(saw_payload, "host payload never reached the guest");
+
+    // The contract: epoll dispatch delivers in < 5 ms.
+    assert!(
+        host_to_guest_us < 5_000,
+        "Phase 6.4 contract: host→guest RX latency must be sub-5 ms \
+         (was bounded below by 5 ms net_poll_thread cycle); got {host_to_guest_us} µs"
+    );
+}
+```
+
+- [ ] **Step 2: Run the test, expect it to fail**
+
+```bash
+cargo test --test network_baseline tcp_rx_latency_sub_5ms
+```
+
+Expected: **FAIL** with `host→guest RX latency must be sub-5 ms; got <5000-9999> µs` — the current `net_poll_thread` cannot deliver in < 5 ms because of its `sleep(5ms)`.
+
+This is the Phase 6.4 BROKEN_ON_PURPOSE pin. It will flip in Task 11.
+
+- [ ] **Step 3: Commit the failing pin**
+
+```bash
+git add tests/network_baseline.rs
+git commit -m "test(network): pin tcp_rx_latency_sub_5ms (BROKEN_ON_PURPOSE)
+
+Phase 6.4 contract: host→guest RX latency must be sub-5 ms when
+data is available. Pre-6.4 the floor is the 5 ms net_poll_thread
+sleep cycle; this assertion fails on master and on the current
+PR #68 tip. Phase 6.4's epoll dispatch will flip it to passing.
+
+#[ignore] is deliberately NOT used: this is a positive
+contract and CI must surface the failure on master so the gate
+is unmissable."
+```
+
+---
+
+### Task 3: `EpollDispatch` skeleton + unit test
+
+**Files:**
+- Create: `src/network/epoll_dispatch.rs`
+- Modify: `src/network/mod.rs` — add `pub(crate) mod epoll_dispatch;`
+
+- [ ] **Step 1: Write the failing test (in the new module)**
+
+In `src/network/epoll_dispatch.rs`:
+
+```rust
+//! Linux epoll-driven readiness dispatch for SLIRP host sockets.
+//!
+//! Owns one `epoll_fd` plus a self-pipe. Callers register socket FDs
+//! with a `FlowToken` (a 64-bit identifier the dispatcher returns on
+//! readiness). The poll thread calls `wait_with_timeout` to block
+//! until any registered FD is ready or the timeout fires, then drains
+//! the events into a caller-owned buffer.
+//!
+//! Why no crate? The standard `mio`/`tokio` story would pull in a
+//! reactor + a runtime — Phase 6.4 needs neither. `libc::epoll_*`
+//! is two syscalls, fully observable, and the surface fits in ~150
+//! lines. See plan 2026-04-30-smoltcp-passt-port-phase6.4.md
+//! "Architecture notes" for the rationale.
+
+use std::io;
+use std::os::fd::{AsRawFd, OwnedFd, RawFd};
+use std::time::Duration;
+
+/// Opaque per-FD identifier the caller uses to look up which flow a
+/// readiness event belongs to. Encoded into `epoll_data.u64`.
+pub type FlowToken = u64;
+
+/// One readiness event, mapped from `libc::epoll_event`.
+#[derive(Debug, Clone, Copy)]
+pub struct EpollEvent {
+    pub token: FlowToken,
+    pub readable: bool,
+    pub writable: bool,
+}
+
+#[derive(Debug)]
+pub struct EpollDispatch {
+    // implementation in next step
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::os::fd::AsRawFd;
+
+    #[test]
+    fn dispatch_new_creates_epoll_fd() {
+        let dispatch = EpollDispatch::new().expect("EpollDispatch::new");
+        assert!(dispatch.epoll_fd_for_test() >= 0);
+    }
+}
+```
+
+- [ ] **Step 2: Run, expect a compile error**
+
+```bash
+cargo test --lib network::epoll_dispatch
+```
+
+Expected: COMPILE FAIL — `new` and `epoll_fd_for_test` are not defined.
+
+- [ ] **Step 3: Implement minimal `EpollDispatch`**
+
+Replace the empty struct in `src/network/epoll_dispatch.rs`:
+
+```rust
+#[derive(Debug)]
+pub struct EpollDispatch {
+    epoll_fd: OwnedFd,
+}
+
+impl EpollDispatch {
+    /// Create a new epoll instance with `EPOLL_CLOEXEC`.
+    pub fn new() -> io::Result<Self> {
+        // SAFETY: `epoll_create1` returns -1 on error and a valid fd
+        // otherwise. We wrap into OwnedFd so Drop closes it.
+        let raw = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
+        if raw < 0 {
+            return Err(io::Error::last_os_error());
+        }
+        let epoll_fd = unsafe { OwnedFd::from_raw_fd(raw) };
+        Ok(Self { epoll_fd })
+    }
+
+    #[cfg(test)]
+    fn epoll_fd_for_test(&self) -> RawFd {
+        self.epoll_fd.as_raw_fd()
+    }
+}
+```
+
+Add the missing `use std::os::fd::FromRawFd;` to the file's existing `use` block (module-scope per project convention).
+
+- [ ] **Step 4: Run, expect pass**
+
+```bash
+cargo test --lib network::epoll_dispatch::tests::dispatch_new_creates_epoll_fd
+```
+
+Expected: PASS.
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add src/network/epoll_dispatch.rs src/network/mod.rs
+git commit -m "feat(network): EpollDispatch skeleton with epoll_create1
+
+Phase 6.4 foundation. One epoll_fd owned via OwnedFd + EPOLL_CLOEXEC.
+No registration logic yet — Task 4 will add register/unregister and
+Task 6 will add the self-pipe + wait loop."
+```
+
+---
+
+### Task 4: `register` / `unregister` + tests
+
+**Files:**
+- Modify: `src/network/epoll_dispatch.rs`
+
+- [ ] **Step 1: Write the failing tests**
+
+In the `mod tests` block:
+
+```rust
+#[test]
+fn register_then_unregister_round_trip() {
+    use std::net::TcpListener;
+    let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
+    let mut dispatch = EpollDispatch::new().expect("EpollDispatch::new");
+    let token: FlowToken = 0xDEAD_BEEF;
+    dispatch
+        .register(listener.as_raw_fd(), token, true, false)
+        .expect("register");
+    dispatch.unregister(listener.as_raw_fd()).expect("unregister");
+}
+
+#[test]
+fn register_invalid_fd_returns_error() {
+    let mut dispatch = EpollDispatch::new().expect("EpollDispatch::new");
+    let result = dispatch.register(-1, 0, true, false);
+    assert!(result.is_err());
+}
+```
+
+- [ ] **Step 2: Run, expect compile fail**
+
+```bash
+cargo test --lib network::epoll_dispatch
+```
+
+Expected: COMPILE FAIL — `register`/`unregister` not defined.
+
+- [ ] **Step 3: Implement**
+
+Add to `EpollDispatch`:
+
+```rust
+impl EpollDispatch {
+    /// Register `fd` with the dispatcher. `readable`/`writable`
+    /// select EPOLLIN / EPOLLOUT. `token` is opaque to the
+    /// dispatcher — returned verbatim on readiness events.
+    pub fn register(
+        &mut self,
+        fd: RawFd,
+        token: FlowToken,
+        readable: bool,
+        writable: bool,
+    ) -> io::Result<()> {
+        let mut events: u32 = 0;
+        if readable {
+            events |= libc::EPOLLIN as u32;
+        }
+        if writable {
+            events |= libc::EPOLLOUT as u32;
+        }
+        let mut ev = libc::epoll_event {
+            events,
+            u64: token,
+        };
+        // SAFETY: epoll_ctl reads `ev` for ADD; we own `fd` for the
+        // lifetime of the registration (caller's contract).
+        let rc = unsafe {
+            libc::epoll_ctl(
+                self.epoll_fd.as_raw_fd(),
+                libc::EPOLL_CTL_ADD,
+                fd,
+                &mut ev as *mut _,
+            )
+        };
+        if rc < 0 {
+            return Err(io::Error::last_os_error());
+        }
+        Ok(())
+    }
+
+    pub fn unregister(&mut self, fd: RawFd) -> io::Result<()> {
+        // SAFETY: epoll_ctl ignores the event pointer for DEL but
+        // still requires it to be non-null on older kernels.
+        let mut ev = libc::epoll_event { events: 0, u64: 0 };
+        let rc = unsafe {
+            libc::epoll_ctl(
+                self.epoll_fd.as_raw_fd(),
+                libc::EPOLL_CTL_DEL,
+                fd,
+                &mut ev as *mut _,
+            )
+        };
+        if rc < 0 {
+            return Err(io::Error::last_os_error());
+        }
+        Ok(())
+    }
+}
+```
+
+- [ ] **Step 4: Run, expect pass**
+
+```bash
+cargo test --lib network::epoll_dispatch
+```
+
+Expected: PASS for both new tests.
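+
+Not a Task 4 step, but worth recording while the `epoll_ctl` surface is open: the "Out of scope" section notes that Phase 6.2 will reuse this dispatcher for `EPOLLOUT` writability during async outbound `connect`. That needs re-arming an existing registration, which is `EPOLL_CTL_MOD` rather than ADD/DEL. A minimal sketch of the shape, not part of this task's contract:
+
+```rust
+impl EpollDispatch {
+    /// Hypothetical Phase 6.2 helper (not required by this plan's
+    /// tasks): re-arm the interest set of an already-registered fd,
+    /// e.g. to enable EPOLLOUT while an outbound connect is in flight
+    /// and drop it again once the connect completes.
+    pub fn modify(
+        &mut self,
+        fd: RawFd,
+        token: FlowToken,
+        readable: bool,
+        writable: bool,
+    ) -> io::Result<()> {
+        let mut events: u32 = 0;
+        if readable {
+            events |= libc::EPOLLIN as u32;
+        }
+        if writable {
+            events |= libc::EPOLLOUT as u32;
+        }
+        let mut ev = libc::epoll_event { events, u64: token };
+        // SAFETY: same contract as register(); EPOLL_CTL_MOD fails
+        // with ENOENT if `fd` was never added.
+        let rc = unsafe {
+            libc::epoll_ctl(
+                self.epoll_fd.as_raw_fd(),
+                libc::EPOLL_CTL_MOD,
+                fd,
+                &mut ev as *mut _,
+            )
+        };
+        if rc < 0 {
+            return Err(io::Error::last_os_error());
+        }
+        Ok(())
+    }
+}
+```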
+- [ ] **Step 5: Commit**
+
+```bash
+git add src/network/epoll_dispatch.rs
+git commit -m "feat(network): EpollDispatch register/unregister"
+```
+
+---
+
+### Task 5: `wait_with_timeout` + integration test
+
+**Files:**
+- Modify: `src/network/epoll_dispatch.rs`
+
+- [ ] **Step 1: Write the failing test**
+
+```rust
+#[test]
+fn wait_returns_event_when_socket_becomes_readable() {
+    use std::io::Write;
+    use std::net::{TcpListener, TcpStream};
+    let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
+    let addr = listener.local_addr().unwrap();
+    let server = std::thread::spawn(move || {
+        let (mut sock, _) = listener.accept().unwrap();
+        sock.write_all(b"hi").unwrap();
+    });
+    let stream = TcpStream::connect(addr).expect("connect");
+    server.join().unwrap();
+
+    let mut dispatch = EpollDispatch::new().expect("new");
+    dispatch
+        .register(stream.as_raw_fd(), 0xCAFE, true, false)
+        .expect("register");
+
+    let mut events: Vec<EpollEvent> = Vec::new();
+    let n = dispatch
+        .wait_with_timeout(&mut events, Duration::from_secs(1))
+        .expect("wait");
+    assert_eq!(n, 1);
+    assert_eq!(events[0].token, 0xCAFE);
+    assert!(events[0].readable);
+}
+```
+
+- [ ] **Step 2: Run, expect compile fail**
+
+Expected: `wait_with_timeout` not found.
+
+- [ ] **Step 3: Implement**
+
+```rust
+impl EpollDispatch {
+    /// Block up to `timeout` for any registered FD to become ready.
+    /// Drains ready events into `out` (cleared first). Returns the
+    /// number of events drained.
+    ///
+    /// `timeout = Duration::ZERO` is a non-blocking poll;
+    /// `timeout = Duration::from_secs(...)` waits up to that long.
+    pub fn wait_with_timeout(
+        &self,
+        out: &mut Vec<EpollEvent>,
+        timeout: Duration,
+    ) -> io::Result<usize> {
+        out.clear();
+
+        // Pre-allocate a fixed-size event buffer. 64 ready FDs per
+        // wait is more than enough for our flow counts; events not
+        // returned this round will surface on the next wait.
+        let mut raw_events: [libc::epoll_event; 64] =
+            [libc::epoll_event { events: 0, u64: 0 }; 64];
+
+        let timeout_ms: i32 = timeout
+            .as_millis()
+            .min(i32::MAX as u128) as i32;
+
+        // SAFETY: epoll_wait writes up to raw_events.len() entries;
+        // returns -1 on error, 0 on timeout, n>0 on events.
+        let n = unsafe {
+            libc::epoll_wait(
+                self.epoll_fd.as_raw_fd(),
+                raw_events.as_mut_ptr(),
+                raw_events.len() as i32,
+                timeout_ms,
+            )
+        };
+        if n < 0 {
+            // EINTR is non-fatal — the caller can retry on the next tick.
+            let err = io::Error::last_os_error();
+            if err.raw_os_error() == Some(libc::EINTR) {
+                return Ok(0);
+            }
+            return Err(err);
+        }
+        for raw in &raw_events[..n as usize] {
+            out.push(EpollEvent {
+                token: raw.u64,
+                readable: (raw.events & libc::EPOLLIN as u32) != 0,
+                writable: (raw.events & libc::EPOLLOUT as u32) != 0,
+            });
+        }
+        Ok(n as usize)
+    }
+}
+```
+
+- [ ] **Step 4: Run, expect pass**
+
+```bash
+cargo test --lib network::epoll_dispatch
+```
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add src/network/epoll_dispatch.rs
+git commit -m "feat(network): EpollDispatch::wait_with_timeout"
+```
+
+---
+
+### Task 6: Self-pipe + wakeup test
+
+**Files:**
+- Modify: `src/network/epoll_dispatch.rs`
+
+- [ ] **Step 1: Write the failing test**
+
+```rust
+#[test]
+fn wakeup_unblocks_wait_immediately() {
+    use std::time::Instant;
+    let mut dispatch = EpollDispatch::new().expect("new");
+    let waker = dispatch.waker();
+
+    // Start the wait in another thread with a long timeout.
+    let wait_thread = std::thread::spawn(move || -> std::time::Duration {
+        let mut events: Vec<EpollEvent> = Vec::new();
+        let start = Instant::now();
+        let _ = dispatch.wait_with_timeout(&mut events, Duration::from_secs(5));
+        start.elapsed()
+    });
+
+    // Wake immediately.
+    std::thread::sleep(Duration::from_millis(10));
+    waker.wake();
+
+    let elapsed = wait_thread.join().expect("wait thread");
+    // The wait thread should return well under the 5 s timeout.
+    assert!(
+        elapsed < Duration::from_secs(1),
+        "wait did not return on wakeup: {elapsed:?}"
+    );
+}
+```
+
+- [ ] **Step 2: Run, expect compile fail**
+
+Expected: `waker()` and `Waker` not defined.
+
+- [ ] **Step 3: Implement**
+
+Add to `epoll_dispatch.rs`:
+
+```rust
+/// Cloneable wakeup handle for `EpollDispatch`. Writing one byte to
+/// the underlying pipe wakes a thread blocked in `wait_with_timeout`.
+#[derive(Debug, Clone)]
+pub struct Waker {
+    write_end: std::sync::Arc<OwnedFd>,
+}
+
+impl Waker {
+    pub fn wake(&self) {
+        let buf = [0u8; 1];
+        // SAFETY: a write to a non-blocking pipe never blocks. We
+        // ignore EAGAIN — the pipe already has bytes pending, which
+        // means a wakeup is already queued.
+        let _ = unsafe {
+            libc::write(self.write_end.as_raw_fd(), buf.as_ptr() as *const _, 1)
+        };
+    }
+}
+
+const SELF_PIPE_TOKEN: FlowToken = u64::MAX;
+
+impl EpollDispatch {
+    /// Returns a `Waker` that, when called, unblocks any thread
+    /// currently inside `wait_with_timeout`.
+    pub fn waker(&mut self) -> Waker {
+        if self.waker_handle.is_none() {
+            let (read_fd, write_fd) = create_pipe2_nonblock_cloexec();
+            self.register(read_fd.as_raw_fd(), SELF_PIPE_TOKEN, true, false)
+                .expect("register self-pipe");
+            self.read_end = Some(read_fd);
+            self.waker_handle = Some(std::sync::Arc::new(write_fd));
+        }
+        Waker {
+            write_end: self.waker_handle.as_ref().unwrap().clone(),
+        }
+    }
+}
+
+fn create_pipe2_nonblock_cloexec() -> (OwnedFd, OwnedFd) {
+    let mut fds = [0 as RawFd; 2];
+    // SAFETY: pipe2 with O_NONBLOCK | O_CLOEXEC writes two fds into fds.
+    let rc = unsafe {
+        libc::pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC)
+    };
+    assert!(rc == 0, "pipe2 failed: {}", io::Error::last_os_error());
+    let read_end = unsafe { OwnedFd::from_raw_fd(fds[0]) };
+    let write_end = unsafe { OwnedFd::from_raw_fd(fds[1]) };
+    (read_end, write_end)
+}
+```
+
+Add fields to `EpollDispatch`:
+
+```rust
+#[derive(Debug)]
+pub struct EpollDispatch {
+    epoll_fd: OwnedFd,
+    read_end: Option<OwnedFd>,
+    waker_handle: Option<std::sync::Arc<OwnedFd>>,
+}
+```
+
+…and update `EpollDispatch::new` to initialize the new fields to `None`.
+
+In `wait_with_timeout`, after collecting events, drop the self-pipe wake-token from the returned set (the caller doesn't care about it) and drain any pending bytes from the read end:
+
+```rust
+// Drain self-pipe events from the returned set + the pipe itself.
+let mut filtered: Vec<EpollEvent> = Vec::with_capacity(out.len());
+for ev in out.drain(..) {
+    if ev.token == SELF_PIPE_TOKEN {
+        if let Some(read_end) = &self.read_end {
+            let mut scratch = [0u8; 64];
+            // SAFETY: non-blocking read; result ignored.
+            unsafe {
+                libc::read(
+                    read_end.as_raw_fd(),
+                    scratch.as_mut_ptr() as *mut _,
+                    scratch.len(),
+                );
+            }
+        }
+        continue;
+    }
+    filtered.push(ev);
+}
+*out = filtered;
+let observable_n = out.len();
+Ok(observable_n)
+```
+
+- [ ] **Step 4: Run all dispatch tests**
+
+```bash
+cargo test --lib network::epoll_dispatch
+```
+
+Expected: PASS for all four tests.
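+
+An optional extra pin, assuming the Task 6 implementation above: the commit message below claims that subsequent waits don't spurious-wake once the pipe is drained. A small test makes that claim checkable:
+
+```rust
+#[test]
+fn repeated_wakes_do_not_spuriously_wake_later_waits() {
+    let mut dispatch = EpollDispatch::new().expect("new");
+    let waker = dispatch.waker();
+
+    // Queue several wakeups before any wait runs.
+    for _ in 0..5 {
+        waker.wake();
+    }
+
+    // The first wait drains the pending pipe bytes; the self-pipe
+    // token is filtered out, so no observable events are returned.
+    let mut events: Vec<EpollEvent> = Vec::new();
+    let n = dispatch
+        .wait_with_timeout(&mut events, Duration::from_millis(100))
+        .expect("first wait");
+    assert_eq!(n, 0);
+
+    // A second zero-timeout wait must not see stale pipe readiness.
+    let n = dispatch
+        .wait_with_timeout(&mut events, Duration::ZERO)
+        .expect("second wait");
+    assert_eq!(n, 0);
+}
+```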
+ +- [ ] **Step 5: Commit** + +```bash +git add src/network/epoll_dispatch.rs +git commit -m "feat(network): EpollDispatch self-pipe wakeup + +Cloneable Waker writes one byte to a non-blocking pipe registered +with EPOLLIN. wait_with_timeout filters self-pipe events out of +the returned set and drains the pipe so subsequent waits don't +spurious-wake." +``` + +--- + +### Task 7: Wire `EpollDispatch` into `SlirpBackend` + +**Files:** +- Modify: `src/network/slirp.rs` — `SlirpBackend` struct + `new` + `with_security`. + +- [ ] **Step 1: Add the field** + +In the `SlirpBackend` struct definition (~line 450): + +```rust +pub struct SlirpBackend { + // ... existing fields ... + epoll: crate::network::epoll_dispatch::EpollDispatch, + epoll_waker: crate::network::epoll_dispatch::Waker, +} +``` + +In `SlirpBackend::with_security` (~line 503), after `flow_table` is initialized but before any flow is inserted: + +```rust +let mut epoll = crate::network::epoll_dispatch::EpollDispatch::new() + .map_err(|e| anyhow::anyhow!("EpollDispatch::new: {e}"))?; +let epoll_waker = epoll.waker(); +``` + +…then include `epoll`, `epoll_waker` in the struct literal. + +- [ ] **Step 2: Run unit tests; expect them to still pass (no behavior change yet)** + +```bash +cargo test --lib network::slirp +cargo test --test network_baseline +``` + +Expected: ALL PASS — `SlirpBackend` now owns an unused epoll_fd. + +- [ ] **Step 3: Commit** + +```bash +git add src/network/slirp.rs +git commit -m "refactor(slirp): SlirpBackend holds EpollDispatch + Waker + +Plumbed but not yet consumed. Subsequent tasks wire flow_table +mutations into epoll register/unregister and rewrite the relay +loops to dispatch on readiness." +``` + +--- + +### Task 8: TCP register/unregister on flow_table mutation + smoke test + +**Files:** +- Modify: `src/network/slirp.rs` — `handle_tcp_frame` (after `flow_table.insert`) and `relay_tcp_nat_data` (where `to_remove` entries are reaped). + +- [ ] **Step 1: Add a `flow_token_for_tcp` helper at module scope** + +Encoding: 8 bits of protocol tag (0x01 = TCP), 8 bits unused (zero), 16 bits guest_port, 32 bits packed (dst_port << 16) | (truncated dst_ip). For 100 % uniqueness across tag/port collisions, see follow-up — for now this 64-bit token is unique within the flow table because `NatKey` itself is unique. + +```rust +const PROTO_TAG_TCP: u64 = 0x0100_0000_0000_0000; +const PROTO_TAG_UDP: u64 = 0x0200_0000_0000_0000; +const PROTO_TAG_ICMP: u64 = 0x0300_0000_0000_0000; + +fn flow_token_for_tcp(key: &NatKey) -> u64 { + let dst_ip_bytes = key.dst_ip.0; + let dst_ip_low: u64 = u64::from(u32::from_be_bytes(dst_ip_bytes)) & 0xFFFF_FFFF; + PROTO_TAG_TCP + | (u64::from(key.guest_src_port) << 32) + | (u64::from(key.dst_port) << 16) + | (dst_ip_low & 0xFFFF) +} +``` + +Symmetric helpers for UDP / ICMP land in Tasks 9 / 10. 
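+
+To record the intended shape now, a hedged sketch of those helpers. The key type and field names (`UdpNatKey`, `IcmpEchoKey`, `icmp_id`) are placeholders for whatever the real UDP/ICMP flow keys carry, not the actual struct layout:
+
+```rust
+// Sketch only — Tasks 9/10 own the real versions.
+fn flow_token_for_udp(key: &UdpNatKey) -> u64 {
+    // Same packing as TCP: tag | guest_port << 32 | dst_port << 16
+    // | 16 truncated bits of dst_ip.
+    let dst_ip_low = u64::from(u32::from_be_bytes(key.dst_ip.0));
+    PROTO_TAG_UDP
+        | (u64::from(key.guest_src_port) << 32)
+        | (u64::from(key.dst_port) << 16)
+        | (dst_ip_low & 0xFFFF)
+}
+
+fn flow_token_for_icmp(key: &IcmpEchoKey) -> u64 {
+    // ICMP echo flows have no ports; the echo identifier takes the
+    // guest_port slot so the token stays unique per (id, dst_ip).
+    let dst_ip_low = u64::from(u32::from_be_bytes(key.dst_ip.0));
+    PROTO_TAG_ICMP | (u64::from(key.icmp_id) << 32) | (dst_ip_low & 0xFFFF)
+}
+```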
+- [ ] **Step 2: After every `flow_table.insert(FlowKey::Tcp(...), FlowEntry::Tcp(entry))`, register the host_stream FD**
+
+For example in `handle_tcp_frame` (~line 1290, after the insert):
+
+```rust
+let token = flow_token_for_tcp(&key);
+self.epoll
+    .register(entry.host_stream.as_raw_fd(), token, true, false)
+    .ok();
+self.epoll_waker.wake();
+```
+
+…and in `process_pending_inbound_accepts` (line 648 area):
+
+```rust
+self.flow_table.insert(FlowKey::Tcp(key), FlowEntry::Tcp(entry));
+let host_fd = match self.flow_table.get(&FlowKey::Tcp(key)) {
+    Some(FlowEntry::Tcp(e)) => e.host_stream.as_raw_fd(),
+    _ => unreachable!(),
+};
+self.epoll.register(host_fd, flow_token_for_tcp(&key), true, false).ok();
+self.epoll_waker.wake();
+```
+
+…and on every `flow_table.remove(&FlowKey::Tcp(...))` site, unregister first:
+
+```rust
+if let Some(FlowEntry::Tcp(e)) = self.flow_table.get(&flow_key) {
+    self.epoll.unregister(e.host_stream.as_raw_fd()).ok();
+}
+self.flow_table.remove(&flow_key);
+```
+
+(Grep for every `flow_table.remove` and `flow_table.insert` site touching TCP — there are ~6.)
+
+- [ ] **Step 3: Run all baseline pins**
+
+```bash
+cargo test --test network_baseline
+```
+
+Expected: PASS — no behavioral change yet (the relay still re-peeks every flow on every tick).
+
+- [ ] **Step 4: Commit**
+
+```bash
+git add src/network/slirp.rs
+git commit -m "feat(slirp): register TCP flows with EpollDispatch
+
+flow_table mutations now keep the epoll set in sync. No relay-loop
+change yet — Task 11 will switch the loop to dispatch by readiness
+instead of iterating the full table."
+```
+
+---
+
+### Task 9: UDP register/unregister + ICMP register/unregister
+
+Mirror Task 8 for the `FlowKey::Udp` and `FlowKey::IcmpEcho` flow_table sites. Same shape: register on insert, unregister on remove. Use `PROTO_TAG_UDP` / `PROTO_TAG_ICMP` in the helpers.
+
+- [ ] **Step 1: Implement helpers and call sites**
+- [ ] **Step 2: Run baseline pins (PASS)**
+- [ ] **Step 3: Commit** with message `feat(slirp): register UDP + ICMP flows with EpollDispatch`
+
+---
+
+### Task 10: Flip `relay_tcp_nat_data` to event-driven
+
+**Files:**
+- Modify: `src/network/slirp.rs` — `relay_tcp_nat_data` body (~line 1512+).
+
+The current loop iterates *every* TCP entry in `flow_table` every tick. New shape: take the readiness set from a caller-passed `&[EpollEvent]`, look up the flow by `FlowKey`, and peek-relay only the readable flows.
+
+- [ ] **Step 1: Change the signature**
+
+```rust
+fn relay_tcp_nat_data(&mut self, ready: &[EpollEvent]) {
+    let mut to_remove: Vec<FlowKey> = Vec::new();
+    let mut frames_to_inject: Vec<Vec<u8>> = Vec::new();
+
+    for event in ready {
+        if (event.token & PROTO_TAG_MASK) != PROTO_TAG_TCP {
+            continue;
+        }
+        // Decode the token back to a NatKey by linear scan — flow_table
+        // is small and the token-to-key direction is rare (only on
+        // readiness). Future optimization: keep a side index.
+        let flow_key = match self.flow_table.iter().find_map(|(k, _)| {
+            if let FlowKey::Tcp(nat_key) = k {
+                if flow_token_for_tcp(nat_key) == event.token {
+                    return Some(*k);
+                }
+            }
+            None
+        }) {
+            Some(k) => k,
+            None => continue,
+        };
+
+        let Some(FlowEntry::Tcp(entry)) = self.flow_table.get_mut(&flow_key) else {
+            continue;
+        };
+        if entry.state != TcpNatState::Established {
+            continue;
+        }
+
+        // ... existing peek/relay body, unchanged from line 1549+ ...
+    }
+
+    self.inject_to_guest.append(&mut frames_to_inject);
+    for flow_key in to_remove {
+        if let Some(FlowEntry::Tcp(e)) = self.flow_table.get(&flow_key) {
+            self.epoll.unregister(e.host_stream.as_raw_fd()).ok();
+        }
+        self.flow_table.remove(&flow_key);
+    }
+}
+```
+
+Define `PROTO_TAG_MASK` next to the other tag constants:
+
+```rust
+const PROTO_TAG_MASK: u64 = 0xFF00_0000_0000_0000;
+```
+
+…and check `(event.token & PROTO_TAG_MASK) == PROTO_TAG_TCP` (note the parentheses — `!=`/`==` bind tighter than `&` in Rust).
+
+- [ ] **Step 2: Update the caller in `drain_to_guest`**
+
+```rust
+pub fn drain_to_guest(&mut self, out: &mut Vec<Vec<u8>>) {
+    self.process_pending_inbound_accepts();
+    // ... ARP handling ...
+
+    // Phase 6.4: gather readiness events once per tick. The poll
+    // thread will already have driven a recent epoll_wait; here we do
+    // a non-blocking poll to pick up anything that arrived between
+    // the last wait and now.
+    let mut ready: Vec<EpollEvent> = Vec::new();
+    let _ = self.epoll.wait_with_timeout(&mut ready, Duration::ZERO);
+
+    self.resolve_pending_dns();
+    self.relay_tcp_nat_data(&ready);
+    self.relay_icmp_echo(&ready);
+    self.relay_udp_flows(&ready);
+
+    // ... unchanged collection of frames ...
+}
+```
+
+- [ ] **Step 3: Update the `relay_icmp_echo` and `relay_udp_flows` signatures to `(&mut self, ready: &[EpollEvent])`** with parallel filtering by `PROTO_TAG_ICMP` / `PROTO_TAG_UDP`.
+
+- [ ] **Step 4: Run baseline pins**
+
+```bash
+cargo test --test network_baseline
+```
+
+Expected: PASS — the `wait_with_timeout(Duration::ZERO)` non-blocking poll captures any FD that became ready between vCPU calls; the relay still works.
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add src/network/slirp.rs
+git commit -m "feat(slirp): relay loops dispatch by epoll readiness
+
+drain_to_guest non-blocking-polls the epoll set once per tick and
+passes the ready event list to relay_tcp_nat_data /
+relay_udp_flows / relay_icmp_echo, which now skip non-ready flows
+instead of re-peeking the whole table. Behavior unchanged on the
+hot path; per-tick CPU should drop on idle systems with many
+flows."
+```
+
+---
+
+### Task 11: Rewrite `net_poll_thread` to use `epoll_wait`
+
+**Files:**
+- Modify: `src/vmm/mod.rs:1599-1640`.
+
+- [ ] **Step 1: Replace the `sleep(5ms)` loop**
+
+The current loop:
+
+```rust
+while running.load(Ordering::Relaxed) {
+    std::thread::sleep(std::time::Duration::from_millis(5));
+    // ... try_inject_rx + irq ...
+}
+```
+
+Becomes (pseudocode — exact integration with the device-lock pattern needs care):
+
+```rust
+while running.load(Ordering::Relaxed) {
+    // Acquire the SlirpBackend's waker once at startup; use it as
+    // the shutdown signaling channel too.
+    let mut events: Vec<EpollEvent> = Vec::new();
+    {
+        let guard = match net_dev.lock() {
+            Ok(g) => g,
+            Err(_) => continue,
+        };
+        // Borrow epoll for the wait; see Step 2 for the API on
+        // VirtioNetDevice that exposes it without holding the
+        // device lock during epoll_wait.
+        let _ = guard.poll_epoll(&mut events, Duration::from_millis(50));
+    }
+    // ... try_inject_rx + irq, unchanged ...
+}
+```
+
+The challenge: `epoll_wait` blocks for up to 50 ms; we cannot hold the device lock that whole time (the vCPU would stall on the next TX). Solution: `VirtioNetDevice::poll_epoll` clones the `epoll` into an `Arc<Mutex<EpollDispatch>>` (or similar) and the wait happens *outside* the device lock.
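+
+A sketch of the loop shape that falls out of Step 2 below. `epoll_arc()` is the accessor Step 2 adds; the `slirp()` path on the device guard is a placeholder for however `VirtioNetDevice` actually exposes its backend:
+
+```rust
+// Grab the Arc once, before the loop, under a brief device lock.
+let epoll: Arc<Mutex<EpollDispatch>> = {
+    let guard = net_dev.lock().expect("device lock");
+    guard.slirp().epoll_arc() // placeholder accessor path
+};
+let mut events: Vec<EpollEvent> = Vec::new();
+while running.load(Ordering::Relaxed) {
+    // The blocking wait holds only the epoll lock, never the device
+    // lock, so the vCPU thread can keep TXing the whole time. A
+    // flow-insert on the vCPU thread calls waker.wake() first, which
+    // makes this wait return and release the lock promptly.
+    let _ = epoll
+        .lock()
+        .expect("epoll lock")
+        .wait_with_timeout(&mut events, Duration::from_millis(50));
+    // Re-acquire the device lock briefly to relay + raise the IRQ.
+    if let Ok(mut guard) = net_dev.lock() {
+        // ... try_inject_rx + irq, unchanged ...
+        let _ = &mut guard;
+    }
+}
+```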
+- [ ] **Step 2: Refactor the lock granularity**
+
+In `src/network/slirp.rs`, change:
+
+```rust
+epoll: EpollDispatch,
+```
+
+to:
+
+```rust
+epoll: std::sync::Arc<std::sync::Mutex<EpollDispatch>>,
+```
+
+…and update all `self.epoll.register(...)` calls to `self.epoll.lock().unwrap().register(...)`. Provide a clone-of-Arc accessor:
+
+```rust
+pub fn epoll_arc(&self) -> std::sync::Arc<std::sync::Mutex<EpollDispatch>> {
+    Arc::clone(&self.epoll)
+}
+```
+
+The poll thread holds an `Arc<Mutex<EpollDispatch>>`, calls `wait_with_timeout` while holding that lock, and *not* the device lock.
+
+- [ ] **Step 3: Run baseline + integration tests**
+
+```bash
+cargo test --workspace --all-features
+cargo test --test network_baseline
+```
+
+Expected: all PASS.
+
+- [ ] **Step 4: Run the BROKEN_ON_PURPOSE pin from Task 2 — it should now flip to PASS**
+
+```bash
+cargo test --test network_baseline tcp_rx_latency_sub_5ms
+```
+
+Expected: **PASS** with measured latency < 5 ms (likely sub-millisecond).
+
+- [ ] **Step 5: Commit**
+
+```bash
+git add src/network/slirp.rs src/vmm/mod.rs
+git commit -m "feat(vmm): net_poll_thread driven by epoll_wait
+
+Replaces the 5 ms sleep cycle with epoll_wait(timeout=50ms). When
+host data arrives, the poll thread wakes within microseconds and
+drives drain_to_guest immediately. When idle, the thread wakes
+once every 50 ms for housekeeping (UDP/ICMP idle reaping) — a
+10x reduction in wakeup duty cycle vs the previous 5 ms timer.
+
+Phase 6.4 BROKEN_ON_PURPOSE pin tcp_rx_latency_sub_5ms flips to
+passing here."
+```
+
+---
+
+### Task 12: Snapshot rebuild test + implementation
+
+**Files:**
+- Modify: `src/vmm/mod.rs` (snapshot/restore paths) and `src/network/slirp.rs` (the `from_snapshot`-shaped constructor).
+
+- [ ] **Step 1: Run the existing snapshot integration suite to confirm the baseline**
+
+```bash
+export VOID_BOX_KERNEL=/boot/vmlinuz-$(uname -r)
+export VOID_BOX_INITRAMFS=/tmp/void-box-test-rootfs.cpio.gz
+cargo test --test snapshot_integration -- --ignored --test-threads=1
+```
+
+Expected: PASS (the Phase 0–5 baseline). If it doesn't pass on this branch's tip pre-6.4, fix that before continuing — this gate is non-negotiable.
+
+- [ ] **Step 2: Write the new test pin**
+
+In `tests/network_baseline.rs`:
+
+```rust
+/// Phase 6.4 contract: snapshot/restore must rebuild the epoll
+/// dispatch from flow_table contents. After a round-trip, the
+/// backend has zero registered flows in epoll if flow_table was
+/// non-empty pre-snapshot — that's the bug we want to catch.
+#[test]
+fn epoll_set_rebuilt_on_restore_smoke() {
+    // Construct a backend, open one TCP flow (handshake), serialize
+    // the flow_table, drop the backend, build a fresh backend and
+    // inject the serialized flow_table. Verify the new backend's
+    // epoll set has the flow's host_fd registered.
+    // ... (full test code) ...
+}
+```
+
+The detailed body is omitted here — write it referencing the snapshot helpers in `src/vmm/snapshot.rs` and the existing `from_snapshot` shape. Verify by checking the count of registered FDs (add a `#[cfg(test)] pub fn registered_fd_count(&self) -> usize` to `EpollDispatch`).
+
+- [ ] **Step 3: Run, expect FAIL**
+
+The current snapshot path has no rebuild step; the count is 0.
+
+- [ ] **Step 4: Implement the rebuild in the snapshot deserialization path**
+
+Wherever `from_snapshot` reconstructs the `SlirpBackend` (likely in `src/vmm/mod.rs` around the line 690 area where snapshots are restored), after the flow_table is rebuilt from the snapshot bytes, iterate it and call `epoll.register` for each entry's host FD.
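+
+A sketch of that rebuild loop, reusing the token helpers from Tasks 8–9. The `host_socket` field names on the UDP/ICMP entry variants are assumptions about the real `FlowEntry` shape:
+
+```rust
+// Sketch only — restore-time epoll rebuild from a deserialized
+// flow_table. By this point (post-Task 11) epoll is Arc<Mutex<_>>.
+let mut epoll = backend.epoll.lock().unwrap();
+for (key, entry) in backend.flow_table.iter() {
+    let (fd, token) = match (key, entry) {
+        (FlowKey::Tcp(k), FlowEntry::Tcp(e)) => {
+            (e.host_stream.as_raw_fd(), flow_token_for_tcp(k))
+        }
+        (FlowKey::Udp(k), FlowEntry::Udp(e)) => {
+            (e.host_socket.as_raw_fd(), flow_token_for_udp(k))
+        }
+        (FlowKey::IcmpEcho(k), FlowEntry::IcmpEcho(e)) => {
+            (e.host_socket.as_raw_fd(), flow_token_for_icmp(k))
+        }
+        _ => continue,
+    };
+    epoll.register(fd, token, true, false).ok();
+}
+```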
+ +- [ ] **Step 5: Run new test + integration suite** + +```bash +cargo test --test network_baseline epoll_set_rebuilt +cargo test --test snapshot_integration -- --ignored --test-threads=1 +``` + +Expected: PASS. + +- [ ] **Step 6: Commit** + +```bash +git add tests/network_baseline.rs src/network/slirp.rs src/vmm/mod.rs +git commit -m "feat(slirp): rebuild epoll set on snapshot restore + +epoll_fd is a kernel handle and cannot serialize. After +flow_table is reconstructed from snapshot bytes, register every +host FD with a fresh EpollDispatch." +``` + +--- + +### Task 13: Bench the win + perf gate + +**Files:** +- Modify: `benches/network.rs` — add `tcp_rx_latency_one_packet`. +- Modify: `src/bin/voidbox-network-bench/main.rs` — add `tcp_rx_latency_us_p50` measurement. + +- [ ] **Step 1: Add divan microbench** + +In `benches/network.rs`, add: + +```rust +/// Phase 6.4 baseline: time from "host write returns" to "guest +/// sees data in drain_to_guest output". Pre-6.4 this was bounded +/// below by the 5 ms net_poll_thread cycle; post-6.4 epoll +/// dispatch should deliver in microseconds. +#[divan::bench] +fn tcp_rx_latency_one_packet(bencher: Bencher) { + // ... handshake setup outside the timed loop ... + bencher.bench_local(|| { + // Host writes; measure how fast the bytes appear in the + // SlirpBackend's drain output. + }); +} +``` + +Full implementation: harness similar to `tcp_inbound_syn_ack_transition` shape — use `bench-helpers` feature for synthetic flow seeding, drive the data path inside the timed closure. + +- [ ] **Step 2: Add wall-clock measurement to `voidbox-network-bench`** + +In `src/bin/voidbox-network-bench/main.rs`, add a `tcp_rx_latency_us_p50` field to `Report` and a `measure_rx_latency` function that boots a VM, opens a guest→host flow, has the host write small packets, and measures host-T0-to-guest-arrival via the SLIRP relay. + +- [ ] **Step 3: Run the perf gate against `origin/main`** + +```bash +scripts/bench-compare.sh --baseline origin/main --skip-vm > /tmp/phase6.4-vs-main.md +cat /tmp/phase6.4-vs-main.md +``` + +Validate per the hard performance gate at the top of this plan: + +- Every comparable bench: HEAD ≤ baseline + 5 %. +- `tcp_rx_latency_one_packet` (HEAD-only) shows a sub-millisecond median. +- `port_forward_accept_latency` improves by ≥ 30 %, *or* document why it stays (likely the listener accept thread is still on the 50 ms cycle — fixing it is a small follow-up step in Phase 6.4 itself or its own task; decide before committing). + +- [ ] **Step 4: If `port_forward_accept_latency` doesn't improve, add a fix-up sub-task** to also move the listener accept onto epoll. The plan permits this — see Architecture notes. + +- [ ] **Step 5: Commit benches + the perf-gate output** + +```bash +git add benches/network.rs src/bin/voidbox-network-bench/main.rs +git commit -m "bench(network): tcp_rx_latency_one_packet + voidbox-network-bench p50 + +Captures the Phase 6.4 win numerically. Pre-6.4 RX latency was +bounded below by the 5 ms net_poll_thread cycle; post-6.4 epoll +dispatch lands in microseconds. + +scripts/bench-compare.sh --baseline origin/main --skip-vm output +attached as /tmp/phase6.4-vs-main.md (not committed; consult the +PR description for the table)." 
+```
+
+---
+
+### Task 14: Phase 6.4 validation gate
+
+- [ ] **Step 1: Standard validation contract** (per `AGENTS.md`)
+
+```bash
+cargo fmt --all -- --check
+cargo clippy --workspace --all-targets --all-features -- -D warnings
+cargo test --workspace --all-features
+cargo test --doc --workspace --all-features
+```
+
+All must pass.
+
+- [ ] **Step 2: VM suites**
+
+```bash
+export VOID_BOX_KERNEL=/boot/vmlinuz-$(uname -r)
+export VOID_BOX_INITRAMFS=/tmp/void-box-test-rootfs.cpio.gz
+cargo test --test conformance -- --ignored --test-threads=1
+cargo test --test oci_integration -- --ignored --test-threads=1
+cargo test --test snapshot_integration -- --ignored --nocapture --test-threads=1
+cargo test --test e2e_telemetry -- --ignored --test-threads=1
+cargo test --test e2e_skill_pipeline -- --ignored --test-threads=1
+cargo test --test e2e_mount -- --ignored --test-threads=1
+cargo test --test e2e_service_mode -- --ignored --test-threads=1
+cargo test --test e2e_sidecar -- --ignored --test-threads=1
+```
+
+All must pass.
+
+- [ ] **Step 3: aarch64 cross-check**
+
+```bash
+CFLAGS_aarch64_unknown_linux_gnu="--sysroot=/usr/aarch64-redhat-linux/sys-root/fc43" \
+  RUSTFLAGS="-D warnings" \
+  cargo check --target aarch64-unknown-linux-gnu -p void-box --lib --tests
+```
+
+- [ ] **Step 4: Hard perf gate**
+
+```bash
+scripts/bench-compare.sh --baseline origin/main --skip-vm
+```
+
+Validate against the contract at the top of this plan. **The PR is not allowed to merge** until this passes.
+
+- [ ] **Step 5: Record gate evidence in the PR description (no commit needed)**
+
+Capture the bench-compare output in the PR body. The Phase 6.4 PR is then ready for review.
+
+---
+
+## Rollback plan
+
+Each task lands as one commit. If Task N introduces a regression caught at Task M (where M > N), `git revert` Task N's commit and redispatch its implementer with the failure context. No task irreversibly changes wire format or snapshot layout — every change is additive (new fields, new module) or a behavior-preserving refactor.
+
+The only exception is the snapshot rebuild path (Task 12). If that's wrong on disk, restored backends will have a fresh-but-empty epoll set and connections will appear hung. Test the snapshot path *before* claiming Task 12 done.
+
+## Out of scope (deferred to Phase 6.1 / 6.2 / 6.3)
+
+- TCP half-close — Phase 6.1.
+- Async outbound `connect` — Phase 6.2 (will *consume* the epoll dispatch primitive added here for `EPOLLOUT` writability detection).
+- Window management — Phase 6.3.
+
+## Reviewer pointers
+
+- **Lock granularity:** verify `epoll_wait` does not happen under the device lock (Task 11 Step 2).
+- **FD lifecycle:** every `flow_table.insert` has a matching `epoll.register`; every `flow_table.remove` has a matching `epoll.unregister`. grep for both calls and verify the counts match.
+- **Self-pipe correctness:** `Waker::wake` is no-block, no-allocate, signal-safe-adjacent.
+- **Snapshot rebuild:** Task 12's test is the contract; verify the count helper is `#[cfg(test)]` only.
+- **Token uniqueness:** `next_flow_token` draws the low 56 bits of every token from a process-wide monotonic counter and tags the upper 8 bits with the protocol family, so tokens are unique by construction; two flows that reuse the same port tuple can never collide.
+
+## Document history
+
+- 2026-04-30: initial plan written, hard performance gate locked.
diff --git a/src/bin/voidbox-network-bench/main.rs b/src/bin/voidbox-network-bench/main.rs index e43e10e5..a18ac09e 100644 --- a/src/bin/voidbox-network-bench/main.rs +++ b/src/bin/voidbox-network-bench/main.rs @@ -127,9 +127,9 @@ FAST SMOKE RUN\n\ no_throughput: bool, /// Push N MB through the SLIRP relay against a slow-receiving host - /// (`SO_RCVBUF = 4096`). Forces the post-Phase-3 backpressure path to - /// actually engage — the small-payload throughput numbers don't - /// exercise it because the host drains too fast. + /// (`SO_RCVBUF = 4096`). Forces the backpressure path to actually + /// engage — the small-payload throughput numbers don't exercise it + /// because the host drains too fast. /// /// 0 (default) skips the measurement. 10 MiB is a reasonable smoke /// value; larger N produces more stable numbers but takes longer. @@ -140,10 +140,10 @@ FAST SMOKE RUN\n\ #[derive(Serialize, Debug, Default)] struct Report { /// Sustained guest→host throughput against a slow-receiving host - /// (`SO_RCVBUF = 4096`). Probes the post-Phase-3 TCP backpressure path - /// — pre-Phase-3 this would be the 256 KB cliff (connection RST mid- - /// transfer); post-Phase-3 it's a real number bounded by the kernel - /// recv buffer's drain rate. Populated only when `--bulk-mb > 0`. + /// (`SO_RCVBUF = 4096`). Probes the TCP backpressure path — rather + /// than hitting a fixed userspace cliff and resetting the connection, + /// throughput is bounded by the kernel recv buffer's drain rate. + /// Populated only when `--bulk-mb > 0`. tcp_bulk_throughput_g2h_mbps: Option, tcp_throughput_g2h_mbps: Option, // TODO(h2g): host→guest requires either a guest-side `nc -l` listener @@ -159,6 +159,17 @@ FAST SMOKE RUN\n\ tcp_crr_latency_us_p50: Option, udp_dns_qps: Option, icmp_rr_latency_us_p50: Option, + /// p50 host→guest RX latency: "host write completes" → "SLIRP relay + /// delivers frame to drain_to_guest output". Measured at the VMM + /// layer against a live guest TCP flow via `nc -l`. + /// + /// Not yet populated: wiring a guest-side listener and synchronizing + /// on first-byte arrival requires either a guest daemon or an additional + /// RPC. The divan microbench `tcp_rx_latency_one_packet` captures the + /// SLIRP-layer dispatch cost directly (epoll_wait + peek + frame build); + /// this wall-clock field will complement it once the guest-listener + /// infrastructure is in place. + tcp_rx_latency_us_p50: Option, } #[tokio::main(flavor = "multi_thread")] @@ -328,9 +339,9 @@ FAST SMOKE RUN\n\ /// pinned on the listener socket. The small recv buffer forces TCP-level /// backpressure: the kernel send buffer fills, our `host_stream.write` /// returns `WouldBlock`, the SLIRP relay declines to ACK the guest's - /// segment, and the guest retransmits. Pre-Phase-3 this same scenario hit - /// the 256 KB userspace cliff (`MAX_TO_HOST_BUFFER`) and got the connection - /// reset; post-Phase-3 the relay holds the line and the bytes go through. + /// segment, and the guest retransmits. The relay holds the line and the + /// bytes go through rather than resetting the connection at a fixed + /// userspace buffer limit. /// /// Returned value is the mean Mbps across `iterations` iterations of pushing /// `bulk_mb` MiB. Effective throughput is much lower than @@ -396,7 +407,7 @@ FAST SMOKE RUN\n\ exit_code = ?output.exit_code, stderr = output.stderr_str(), "bulk-g2h iteration non-zero exit; the connection may have \ - been reset (pre-Phase-3 cliff regression?). 
skipping" + been reset (backpressure cliff regression?). skipping" ); continue; } diff --git a/src/bin/voidbox-startup-bench/main.rs b/src/bin/voidbox-startup-bench/main.rs index 4c2b9f8d..4380bf10 100644 --- a/src/bin/voidbox-startup-bench/main.rs +++ b/src/bin/voidbox-startup-bench/main.rs @@ -83,7 +83,7 @@ async fn main() -> Result<(), Box> { ); if !warm_only { - eprintln!("\n-- Phase 1: cold boot --"); + eprintln!("\n-- cold boot --"); let mut cold: Vec = Vec::with_capacity(iters); for i in 0..iters { // Route console to a file only on the very first iteration so we @@ -109,7 +109,7 @@ async fn main() -> Result<(), Box> { } if !cold_only { - eprintln!("\n-- Phase 2: warm (snapshot-restore) --"); + eprintln!("\n-- warm (snapshot-restore) --"); let tmp = tempfile::tempdir()?; let snap_path = capture_snapshot(memory_mb, tmp.path()).await?; eprintln!("captured snapshot at: {}", snap_path.display()); diff --git a/src/daemon.rs b/src/daemon.rs index ac422234..85340cb3 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -1704,7 +1704,7 @@ async fn spawn_service_run( let mut published = false; let mut terminalized = false; - // Phase 1: Wait for output publication OR exit OR watchdog. + // Wait for output publication OR exit OR watchdog. tokio::select! { output_result = &mut output_rx => { if let Ok(publication) = output_result { diff --git a/src/devices/virtio_net.rs b/src/devices/virtio_net.rs index df14489d..71214d47 100644 --- a/src/devices/virtio_net.rs +++ b/src/devices/virtio_net.rs @@ -785,6 +785,29 @@ impl VirtioNetDevice { pub fn mac(&self) -> &[u8; 6] { &self.mac } + + /// Return the epoll dispatch instance from the underlying network backend, + /// if the backend is a `SlirpBackend` (Linux only). + /// + /// `net_poll_thread` uses this to block on `epoll_wait` instead of + /// sleeping, waking immediately when host sockets become readable. + #[cfg(target_os = "linux")] + pub fn epoll_arc( + &self, + ) -> Option> { + let backend = self.slirp.lock().unwrap(); + backend.epoll_arc() + } + + /// Forward ready epoll events into the network backend's per-tick queue. + /// + /// Called by net_poll_thread after each epoll_wait returns so that + /// drain_to_guest can process events without re-locking EpollDispatch. + #[cfg(target_os = "linux")] + pub fn push_events_to_backend(&self, events: &[crate::network::epoll_dispatch::EpollEvent]) { + let backend = self.slirp.lock().unwrap(); + backend.push_ready_events(events); + } } #[cfg(test)] diff --git a/src/network/epoll_dispatch.rs b/src/network/epoll_dispatch.rs new file mode 100644 index 00000000..046f9510 --- /dev/null +++ b/src/network/epoll_dispatch.rs @@ -0,0 +1,387 @@ +//! Linux epoll-driven readiness dispatch for SLIRP host sockets. +//! +//! Owns one `epoll_fd` plus an eagerly-initialized self-pipe. Callers +//! register socket FDs with a `FlowToken` (a 64-bit identifier the +//! dispatcher returns on readiness). The poll thread calls +//! `wait_with_timeout` to block until any registered FD is ready or the +//! timeout fires, then drains the events into a caller-owned buffer. +//! +//! `EpollDispatch` is `Sync`: the Linux kernel serializes concurrent +//! `epoll_ctl` and `epoll_wait` calls on the same epoll fd internally. +//! Callers can therefore share one `Arc` across threads +//! and call `register`/`unregister` without an outer `Mutex`, eliminating +//! the lock-contention between `wait_with_timeout` (net-poll thread) and +//! `register` (vCPU thread handling new TCP SYNs). +//! +//! Why no crate? 
The standard `mio`/`tokio` story would pull in a
+//! reactor + a runtime that the SLIRP poll loop does not need.
+//! `libc::epoll_*` is two syscalls, fully observable, and the surface
+//! fits in ~200 lines.
+
+use std::io;
+use std::os::fd::{AsRawFd, FromRawFd, OwnedFd, RawFd};
+use std::sync::atomic::{AtomicUsize, Ordering};
+use std::sync::Arc;
+use std::time::Duration;
+
+/// Opaque per-FD identifier the caller uses to look up which flow a
+/// readiness event belongs to. Encoded into `epoll_data.u64`.
+pub type FlowToken = u64;
+
+/// One readiness event, mapped from `libc::epoll_event`.
+#[allow(dead_code)]
+#[derive(Debug, Clone, Copy)]
+pub struct EpollEvent {
+    pub token: FlowToken,
+    pub readable: bool,
+    pub writable: bool,
+}
+
+/// Direction of interest for an `EpollDispatch::register` call.
+///
+/// Closed enum lets the type system reject impossible combinations (e.g.
+/// "neither read nor write") at compile time and gives a clear name to
+/// each mode rather than two opaque booleans.
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum RegisterMode {
+    /// Wake on EPOLLIN only.
+    Read,
+    /// Wake on EPOLLOUT only.
+    Write,
+    /// Wake on either EPOLLIN or EPOLLOUT.
+    ReadWrite,
+}
+
+/// Sentinel token reserved for the self-pipe wakeup mechanism.
+/// Never returned to callers.
+const SELF_PIPE_TOKEN: FlowToken = u64::MAX;
+
+/// `EpollDispatch` is `Sync`: concurrent `epoll_ctl` and `epoll_wait`
+/// on the same epoll fd are kernel-serialized and safe from multiple
+/// threads. The only shared state beyond the fd is `registered_count`
+/// (an `AtomicUsize`) and the self-pipe (immutable after construction).
+pub struct EpollDispatch {
+    epoll_fd: OwnedFd,
+    /// Read end of the self-pipe; registered with EPOLLIN at construction.
+    read_end: OwnedFd,
+    /// Cloneable waker backed by the write end of the self-pipe.
+    waker_handle: Arc<OwnedFd>,
+    /// Number of user-registered FDs (excludes the self-pipe).
+    registered_count: AtomicUsize,
+}
+
+// SAFETY: All mutable state is either atomic or only accessed from one
+// thread at a time (epoll_ctl/epoll_wait are kernel-serialized on the fd).
+unsafe impl Sync for EpollDispatch {}
+
+impl EpollDispatch {
+    /// Create a new epoll instance with `EPOLL_CLOEXEC` and eagerly
+    /// initialize the self-pipe so `waker()` is lock-free.
+    pub fn new() -> io::Result<Self> {
+        // SAFETY: `epoll_create1` returns -1 on error and a valid fd
+        // otherwise. We wrap into OwnedFd so Drop closes it.
+        let raw = unsafe { libc::epoll_create1(libc::EPOLL_CLOEXEC) };
+        if raw < 0 {
+            return Err(io::Error::last_os_error());
+        }
+        let epoll_fd = unsafe { OwnedFd::from_raw_fd(raw) };
+
+        // Eagerly create the self-pipe and register its read end.
+        // This avoids the lazy-init branch in the hot path and lets
+        // `waker()` take `&self` instead of `&mut self`.
+        let (read_fd, write_fd) = create_pipe2_nonblock_cloexec();
+        let mut ev = libc::epoll_event {
+            events: libc::EPOLLIN as u32,
+            u64: SELF_PIPE_TOKEN,
+        };
+        // SAFETY: epoll_ctl ADD with a valid fd and event struct.
+        let epoll_ctl_result = unsafe {
+            libc::epoll_ctl(
+                epoll_fd.as_raw_fd(),
+                libc::EPOLL_CTL_ADD,
+                read_fd.as_raw_fd(),
+                &mut ev as *mut _,
+            )
+        };
+        if epoll_ctl_result < 0 {
+            return Err(io::Error::last_os_error());
+        }
+
+        Ok(Self {
+            epoll_fd,
+            read_end: read_fd,
+            waker_handle: Arc::new(write_fd),
+            registered_count: AtomicUsize::new(0),
+        })
+    }
+
+    /// Register `fd` with the dispatcher under `token` for the requested
+    /// readiness `mode`.
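+    ///
+    /// A typical call site pairs this with the matching flow-table insert,
+    /// e.g. `self.epoll.register(stream.as_raw_fd(), token, RegisterMode::Read)`
+    /// immediately after `flow_table.insert(...)`.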
+    /// `token` is opaque to the dispatcher — returned verbatim on
+    /// readiness events.
+    ///
+    /// Thread-safe: concurrent calls with `unregister` and
+    /// `wait_with_timeout` are serialized by the kernel's per-epoll-fd lock.
+    pub fn register(&self, fd: RawFd, token: FlowToken, mode: RegisterMode) -> io::Result<()> {
+        let events: u32 = match mode {
+            RegisterMode::Read => libc::EPOLLIN as u32,
+            RegisterMode::Write => libc::EPOLLOUT as u32,
+            RegisterMode::ReadWrite => (libc::EPOLLIN | libc::EPOLLOUT) as u32,
+        };
+        let mut ev = libc::epoll_event { events, u64: token };
+        // SAFETY: epoll_ctl reads `ev` for ADD; we own `fd` for the
+        // lifetime of the registration (caller's contract).
+        let epoll_ctl_result = unsafe {
+            libc::epoll_ctl(
+                self.epoll_fd.as_raw_fd(),
+                libc::EPOLL_CTL_ADD,
+                fd,
+                &mut ev as *mut _,
+            )
+        };
+        if epoll_ctl_result < 0 {
+            return Err(io::Error::last_os_error());
+        }
+        if token != SELF_PIPE_TOKEN {
+            self.registered_count.fetch_add(1, Ordering::Relaxed);
+        }
+        Ok(())
+    }
+
+    /// Thread-safe: concurrent calls with `register` and `wait_with_timeout`
+    /// are serialized by the kernel's per-epoll-fd lock.
+    pub fn unregister(&self, fd: RawFd) -> io::Result<()> {
+        // SAFETY: epoll_ctl ignores the event pointer for DEL but
+        // still requires it to be non-null on older kernels.
+        let mut ev = libc::epoll_event { events: 0, u64: 0 };
+        let epoll_ctl_result = unsafe {
+            libc::epoll_ctl(
+                self.epoll_fd.as_raw_fd(),
+                libc::EPOLL_CTL_DEL,
+                fd,
+                &mut ev as *mut _,
+            )
+        };
+        if epoll_ctl_result < 0 {
+            return Err(io::Error::last_os_error());
+        }
+        self.registered_count.fetch_sub(1, Ordering::Relaxed);
+        Ok(())
+    }
+
+    /// Returns the number of user-registered FDs (excludes the self-pipe).
+    #[cfg(any(test, feature = "bench-helpers"))]
+    pub(crate) fn registered_fd_count(&self) -> usize {
+        self.registered_count.load(Ordering::Relaxed)
+    }
+
+    /// Block up to `timeout` for any registered FD to become ready.
+    /// Drains ready events into `out` (cleared first). Returns the
+    /// number of raw kernel events (including self-pipe wakes) so callers
+    /// can use it for adaptive-timeout decisions.
+    ///
+    /// `timeout = Duration::ZERO` is a non-blocking poll.
+    ///
+    /// Self-pipe events are drained to EAGAIN in-place: no extra allocation.
+    pub fn wait_with_timeout(
+        &self,
+        out: &mut Vec<EpollEvent>,
+        timeout: Duration,
+    ) -> io::Result<usize> {
+        out.clear();
+
+        // Pre-allocate a fixed-size event buffer. 64 ready FDs per
+        // wait is more than enough for our flow counts; events not
+        // returned this round will surface on the next wait.
+        let mut raw_events: [libc::epoll_event; 64] =
+            [libc::epoll_event { events: 0, u64: 0 }; 64];
+
+        let timeout_ms: i32 = timeout.as_millis().min(i32::MAX as u128) as i32;
+
+        // SAFETY: epoll_wait writes up to raw_events.len() entries;
+        // returns -1 on error, 0 on timeout, n>0 on events.
+        let n = unsafe {
+            libc::epoll_wait(
+                self.epoll_fd.as_raw_fd(),
+                raw_events.as_mut_ptr(),
+                raw_events.len() as i32,
+                timeout_ms,
+            )
+        };
+        if n < 0 {
+            // EINTR is non-fatal — caller can retry on next tick.
+            let err = io::Error::last_os_error();
+            if err.raw_os_error() == Some(libc::EINTR) {
+                return Ok(0);
+            }
+            return Err(err);
+        }
+
+        let raw_count = n as usize;
+        let mut drained_pipe = false;
+
+        // Single pass: filter self-pipe events (draining the pipe to EAGAIN
+        // on first occurrence), push real events into `out`.
+        // No extra allocation: `out` was cleared at the top of this function.
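+        // Worked example: a wait that returns tokens {SELF_PIPE_TOKEN,
+        // 0x0100_0000_0000_0005} yields raw_count == 2 but pushes only
+        // one EpollEvent (the TCP flow's token) into `out`; the
+        // self-pipe wake is swallowed here.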
+        for &raw in &raw_events[..raw_count] {
+            if raw.u64 == SELF_PIPE_TOKEN {
+                if !drained_pipe {
+                    // Drain the self-pipe to EAGAIN so EPOLLIN is not
+                    // re-asserted on the next wait. A single read is
+                    // insufficient when wakes arrive faster than we drain
+                    // (burst connection setup), so loop until read returns
+                    // ≤ 0 or a partial fill (pipe empty).
+                    let mut scratch = [0u8; 64];
+                    loop {
+                        // SAFETY: read from O_NONBLOCK pipe;
+                        // EAGAIN / EOF terminates the loop.
+                        let r = unsafe {
+                            libc::read(
+                                self.read_end.as_raw_fd(),
+                                scratch.as_mut_ptr() as *mut _,
+                                scratch.len(),
+                            )
+                        };
+                        if r <= 0 || (r as usize) < scratch.len() {
+                            break;
+                        }
+                    }
+                    drained_pipe = true;
+                }
+                continue;
+            }
+            out.push(EpollEvent {
+                token: raw.u64,
+                readable: (raw.events & libc::EPOLLIN as u32) != 0,
+                writable: (raw.events & libc::EPOLLOUT as u32) != 0,
+            });
+        }
+
+        Ok(raw_count)
+    }
+
+    /// Returns a `Waker` that, when called, unblocks any thread
+    /// currently inside `wait_with_timeout`. The waker is cheap to
+    /// clone and may be stored across threads.
+    pub fn waker(&self) -> Waker {
+        Waker {
+            write_end: self.waker_handle.clone(),
+        }
+    }
+
+    #[cfg(test)]
+    fn epoll_fd_for_test(&self) -> RawFd {
+        self.epoll_fd.as_raw_fd()
+    }
+}
+
+/// Cloneable wakeup handle for `EpollDispatch`. Writing one byte to
+/// the underlying pipe wakes a thread blocked in `wait_with_timeout`.
+#[derive(Debug, Clone)]
+pub struct Waker {
+    write_end: Arc<OwnedFd>,
+}
+
+impl Waker {
+    pub fn wake(&self) {
+        let buf = [0u8; 1];
+        // SAFETY: write to a non-blocking pipe never blocks. We
+        // ignore EAGAIN — the pipe already has bytes pending, which
+        // means a wakeup is already queued.
+        let _ = unsafe { libc::write(self.write_end.as_raw_fd(), buf.as_ptr() as *const _, 1) };
+    }
+}
+
+fn create_pipe2_nonblock_cloexec() -> (OwnedFd, OwnedFd) {
+    let mut fds = [0 as RawFd; 2];
+    // SAFETY: pipe2 with O_NONBLOCK | O_CLOEXEC writes two fds into fds.
+    let rc = unsafe { libc::pipe2(fds.as_mut_ptr(), libc::O_NONBLOCK | libc::O_CLOEXEC) };
+    assert!(rc == 0, "pipe2 failed: {}", io::Error::last_os_error());
+    let read_end = unsafe { OwnedFd::from_raw_fd(fds[0]) };
+    let write_end = unsafe { OwnedFd::from_raw_fd(fds[1]) };
+    (read_end, write_end)
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::os::fd::AsRawFd;
+
+    #[test]
+    fn dispatch_new_creates_epoll_fd() {
+        let dispatch = EpollDispatch::new().expect("EpollDispatch::new");
+        assert!(dispatch.epoll_fd_for_test() >= 0);
+    }
+
+    #[test]
+    fn register_then_unregister_round_trip() {
+        use std::net::TcpListener;
+        let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
+        let dispatch = EpollDispatch::new().expect("EpollDispatch::new");
+        let token: FlowToken = 0xDEAD_BEEF;
+        dispatch
+            .register(listener.as_raw_fd(), token, RegisterMode::Read)
+            .expect("register");
+        dispatch
+            .unregister(listener.as_raw_fd())
+            .expect("unregister");
+    }
+
+    #[test]
+    fn register_invalid_fd_returns_error() {
+        let dispatch = EpollDispatch::new().expect("EpollDispatch::new");
+        let result = dispatch.register(-1, 0, RegisterMode::Read);
+        assert!(result.is_err());
+    }
+
+    #[test]
+    fn wait_returns_event_when_socket_becomes_readable() {
+        use std::io::Write;
+        use std::net::{TcpListener, TcpStream};
+        let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
+        let addr = listener.local_addr().unwrap();
+        let server = std::thread::spawn(move || {
+            let (mut sock, _) = listener.accept().unwrap();
+            sock.write_all(b"hi").unwrap();
+        });
+        let stream = TcpStream::connect(addr).expect("connect");
+        server.join().unwrap();
+
+        let dispatch = EpollDispatch::new().expect("new");
+        dispatch
+            .register(stream.as_raw_fd(), 0xCAFE, RegisterMode::Read)
+            .expect("register");
+
+        let mut events: Vec<EpollEvent> = Vec::new();
+        let n = dispatch
+            .wait_with_timeout(&mut events, Duration::from_secs(1))
+            .expect("wait");
+        assert_eq!(n, 1);
+        assert_eq!(events[0].token, 0xCAFE);
+        assert!(events[0].readable);
+    }
+
+    #[test]
+    fn wakeup_unblocks_wait_immediately() {
+        use std::time::Instant;
+        let dispatch = EpollDispatch::new().expect("new");
+        let waker = dispatch.waker();
+
+        // Start the wait in another thread with a long timeout.
+        let wait_thread = std::thread::spawn(move || -> std::time::Duration {
+            let mut events: Vec<EpollEvent> = Vec::new();
+            let start = Instant::now();
+            let _ = dispatch.wait_with_timeout(&mut events, Duration::from_secs(5));
+            start.elapsed()
+        });
+
+        // Wake immediately.
+        std::thread::sleep(Duration::from_millis(10));
+        waker.wake();
+
+        let elapsed = wait_thread.join().expect("wait thread");
+        // Wait thread should return well under the 5 s timeout.
+        assert!(
+            elapsed < Duration::from_secs(1),
+            "wait did not return on wakeup: {elapsed:?}"
+        );
+    }
+}
diff --git a/src/network/mod.rs b/src/network/mod.rs
index 4de32a2a..fa498280 100644
--- a/src/network/mod.rs
+++ b/src/network/mod.rs
@@ -6,6 +6,7 @@
 //! - virtio-net configuration
 //! - Network isolation and NAT
 
+pub(crate) mod epoll_dispatch;
 pub mod nat;
 pub mod slirp;
 
@@ -93,6 +94,25 @@ pub trait NetworkBackend: Send {
     fn is_healthy(&self) -> bool {
         true
     }
+
+    /// Return the epoll dispatch instance shared by this backend, if any.
+    ///
+    /// Only `SlirpBackend` returns `Some`; other backends (mock, future
+    /// alternatives) return `None`. `net_poll_thread` uses this to block on
+    /// `epoll_wait` instead of sleeping, reducing host CPU burn between
+    /// network events.
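+    ///
+    /// The net-poll thread must call `wait_with_timeout` on the returned
+    /// handle *without* holding the device lock, so vCPU threads can keep
+    /// processing guest frames while the wait blocks.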
+    #[cfg(target_os = "linux")]
+    fn epoll_arc(&self) -> Option<std::sync::Arc<epoll_dispatch::EpollDispatch>> {
+        None
+    }
+
+    /// Push ready epoll events into the backend's per-tick queue.
+    ///
+    /// Called by net_poll_thread after each epoll_wait returns, so
+    /// drain_to_guest can consume them without re-locking EpollDispatch.
+    /// The default is a no-op; `SlirpBackend` overrides this.
+    #[cfg(target_os = "linux")]
+    fn push_ready_events(&self, _events: &[epoll_dispatch::EpollEvent]) {}
 }
 
 /// TAP device handle
diff --git a/src/network/nat.rs b/src/network/nat.rs
index ef3f5656..23932d10 100644
--- a/src/network/nat.rs
+++ b/src/network/nat.rs
@@ -6,9 +6,9 @@
 //! function call.
 //!
 //! Mirrors passt's `fwd.c::nat_inbound` design: address rewrites are
-//! pure functions of (address, rules), not per-flow state. Sets up the
-//! shape for IPv6 dual-stack (Phase 6) and port-forwarding (Phase 5
-//! Task 5.5).
+//! pure functions of (address, rules), not per-flow state. The same
+//! pure-function shape extends cleanly to IPv6 dual-stack and
+//! port-forwarding without introducing per-flow mutable state.
 
 use std::net::{Ipv4Addr, SocketAddr};
 
diff --git a/src/network/slirp.rs b/src/network/slirp.rs
index 19d7720f..827134ae 100644
--- a/src/network/slirp.rs
+++ b/src/network/slirp.rs
@@ -10,8 +10,8 @@
 //!
 //! Architecture:
 //! - Unified flow table: All TCP/UDP/ICMP echo flows live in a single
-//!   `flow_table: HashMap<FlowKey, FlowEntry>` (Phase 4). Per-protocol
-//!   relay logic dispatches on the FlowEntry variant.
+//!   `flow_table: HashMap<FlowKey, FlowEntry>`. Per-protocol relay logic
+//!   dispatches on the FlowEntry variant.
 //! - ARP: custom handler responds as gateway for all 10.0.2.x IPs
 //! - TCP: passt-style sequence-mirroring NAT (host→guest via
 //!   `recv(MSG_PEEK)` + ACK-driven consume; guest→host via direct
@@ -31,11 +31,12 @@ use std::collections::VecDeque;
 use std::io::{self, Read, Write};
 use std::net::{Ipv4Addr, SocketAddr, TcpListener, TcpStream, UdpSocket};
 use std::os::fd::{AsRawFd, FromRawFd};
-use std::sync::atomic::{AtomicBool, AtomicU8, Ordering};
+use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering};
 use std::sync::{mpsc, Arc, Mutex};
 use std::thread::JoinHandle;
 use std::time::{Duration, Instant};
 
+use crate::network::epoll_dispatch::{EpollDispatch, EpollEvent, RegisterMode, Waker};
 use crate::network::{nat, NetworkBackend};
 
 /// Cached DNS response with expiry.
@@ -105,7 +106,37 @@ const PORT_FORWARD_POLL_INTERVAL: Duration = Duration::from_millis(50);
 static ICMP_PROBE: AtomicU8 = AtomicU8::new(0);
 
 // ──────────────────────────────────────────────────────────────────────
-// Inbound port-forward accept channel (Phase 5.5b)
+// EpollDispatch flow tokens
+// ──────────────────────────────────────────────────────────────────────
+
+/// High-byte protocol tag embedded in the upper 8 bits of a `FlowToken`.
+/// The lower 56 bits are a monotonic per-flow counter (see `FLOW_TOKEN_COUNTER`).
+/// The tag lets the relay loop distinguish protocol families with a bitmask
+/// instead of a separate lookup; the counter guarantees global uniqueness
+/// even when two flows share the same port tuple.
+const PROTO_TAG_MASK: u64 = 0xFF00_0000_0000_0000;
+const PROTO_TAG_TCP: u64 = 0x0100_0000_0000_0000;
+const PROTO_TAG_UDP: u64 = 0x0200_0000_0000_0000;
+const PROTO_TAG_ICMP: u64 = 0x0300_0000_0000_0000;
+
+/// Monotonic counter for flow token allocation. The lower 56 bits of each
+/// `FlowToken` are drawn from here; the upper 8 bits carry `PROTO_TAG_*`.
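+/// Layout: `[ proto_tag : 8 bits | counter : 56 bits ]`; e.g. a UDP token
+/// allocated when the counter reads 2 is `0x0200_0000_0000_0002`.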
+/// 2^56 unique tokens are available before wrap — effectively infinite for +/// any realistic process lifetime. +static FLOW_TOKEN_COUNTER: AtomicU64 = AtomicU64::new(0); + +/// Allocate a fresh, globally unique `FlowToken` tagged for the given protocol. +/// +/// The lower 56 bits are drawn from a relaxed monotonic counter shared across +/// all `SlirpBackend` instances. The upper 8 bits carry `proto_tag` so relay +/// loops can demux by protocol without an additional map lookup. +fn next_flow_token(proto_tag: u64) -> u64 { + let counter = FLOW_TOKEN_COUNTER.fetch_add(1, Ordering::Relaxed) & 0x00FF_FFFF_FFFF_FFFF; + proto_tag | counter +} + +// ────────────────────────────────────────────────────────────────────── +// Inbound port-forward accept channel // ────────────────────────────────────────────────────────────────────── /// One accepted host-side TCP connection waiting to be forwarded into the guest. @@ -164,6 +195,10 @@ struct TcpNatEntry { /// the relay can decide how much new payload to peek+send each poll. /// The ACK-driven consume path decrements this as the guest ACKs data. bytes_in_flight: u32, + /// Globally unique epoll token for this flow. Allocated once on insert + /// via `next_flow_token(PROTO_TAG_TCP)` and stored here so unregister + /// sites never need to recompute it. + flow_token: u64, } /// Key for the ICMP echo NAT table: (guest ICMP id, destination IP). @@ -190,6 +225,10 @@ struct IcmpEchoEntry { // Read in `relay_icmp_echo` when translating the reply frame. guest_id: u16, last_activity: Instant, + /// Globally unique epoll token for this flow. Allocated once on insert + /// via `next_flow_token(PROTO_TAG_ICMP)` and stored here so unregister + /// sites never need to recompute it. + flow_token: u64, } /// Key for the UDP flow NAT table: (guest source port, destination IP, destination port). @@ -211,6 +250,10 @@ struct UdpFlowEntry { sock: std::net::UdpSocket, /// Last frame timestamp; read by Task 2.4 idle-timeout reaper. last_activity: Instant, + /// Globally unique epoll token for this flow. Allocated once on insert + /// via `next_flow_token(PROTO_TAG_UDP)` and stored here so unregister + /// sites never need to recompute it. + flow_token: u64, } /// Unified flow-table key. Each variant wraps the protocol-specific @@ -260,7 +303,9 @@ fn open_icmp_socket() -> io::Result { }; if raw < 0 { let err = io::Error::last_os_error(); - if matches!(err.raw_os_error(), Some(libc::EACCES) | Some(libc::EPERM)) { + let errno = err.raw_os_error(); + let unprivileged_icmp_forbidden = errno == Some(libc::EACCES) || errno == Some(libc::EPERM); + if unprivileged_icmp_forbidden { // First failure transitions 0 → 2 and emits the warn-once log. // swap returns the previous value; only log if we were the first // to set it. @@ -467,14 +512,18 @@ pub struct SlirpBackend { dns_cache: HashMap, DnsCacheEntry>, /// DNS queries waiting to be resolved on the net-poll thread. pending_dns: Vec, - /// Unified flow table — Phase 4. + /// Unified flow table keyed by protocol + port tuple. /// - /// All three protocols (TCP, UDP, ICMP echo) are keyed here after Task 4.5. - /// ICMP migrated in 4.3; UDP in 4.4; TCP in 4.5. + /// All three protocols (TCP, UDP, ICMP echo) share this table so a single + /// dispatch loop handles all active flows. flow_table: HashMap, + /// Reverse map from `FlowToken` → `FlowKey` for O(1) readiness-event + /// dispatch. Maintained in sync with `flow_table`: every insert adds an + /// entry; every remove clears it. 
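+    /// Not serialized: in-memory only, rebuilt wholesale by
+    /// `rebuild_epoll_from_flow_table` on snapshot restore.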
+ token_to_key: HashMap, /// Background threads bound to host TCP ports for inbound port - /// forwarding (Phase 5.5b). Each handle corresponds to one - /// `nat::PortForward` rule. Joined on `Drop`. + /// forwarding. Each handle corresponds to one `nat::PortForward` rule. + /// Joined on `Drop`. port_forward_listeners: Vec>, /// Shutdown signal for `port_forward_listeners`. Set true on Drop; /// each listener thread checks it after every accept and exits cleanly. @@ -488,6 +537,33 @@ pub struct SlirpBackend { /// so test helpers can inject [`InboundAccept`] values directly. #[allow(dead_code)] accept_sender: mpsc::Sender, + /// Epoll dispatcher for host socket readiness. `EpollDispatch` is + /// `Sync`: `register`/`unregister` and `wait_with_timeout` are + /// kernel-serialized on the same epoll fd, so no `Mutex` wrapper is + /// needed. The `Arc` lets the net-poll thread share the dispatcher + /// without holding the device lock. + epoll: Arc, + /// Cloneable waker that interrupts `EpollDispatch::wait_with_timeout`. + /// Used after flow-table mutations to unblock the poll thread immediately. + epoll_waker: Waker, + /// Ready events fed by the net-poll thread after each blocking + /// epoll_wait. drain_to_guest drains this on every call without + /// any EpollDispatch lock contention. + pending_events: Mutex>, + /// Flow keys queued for removal because their state advanced to + /// Closed in a non-relay code path (e.g. guest FIN/RST in + /// handle_tcp_frame). Drained at the bottom of relay_tcp_nat_data + /// without scanning the full flow_table. + pending_close: Vec, + /// Set to `true` the first time `push_ready_events` is called — + /// signals "an external poller (net_poll_thread) is feeding us + /// readiness events." When true, `drain_to_guest` skips its + /// non-blocking-poll fallback (one mutex op + one epoll_wait + /// syscall per call, ~310 ns overhead) and only consumes + /// `pending_events`. Tests/benches without a net_poll_thread + /// keep the fallback so synthetic harnesses still observe + /// readiness. + has_external_poller: AtomicBool, } impl SlirpBackend { @@ -563,11 +639,15 @@ impl SlirpBackend { nat.deny_cidrs.len(), nat.port_forwards.len(), dns_servers ); - // Spawn listener threads for port-forwards (Phase 5.5b). + // Spawn listener threads for port-forwards. 
let port_forward_shutdown = Arc::new(AtomicBool::new(false)); let (port_forward_listeners, pending_inbound_accepts, accept_sender) = spawn_port_forward_listeners(&nat, &port_forward_shutdown); + let epoll_inner = EpollDispatch::new()?; + let epoll_waker = epoll_inner.waker(); + let epoll = Arc::new(epoll_inner); + Ok(Self { queue, iface, @@ -582,10 +662,16 @@ impl SlirpBackend { dns_cache: HashMap::new(), pending_dns: Vec::new(), flow_table: HashMap::new(), + token_to_key: HashMap::new(), port_forward_listeners, port_forward_shutdown, pending_inbound_accepts, accept_sender, + epoll, + epoll_waker, + pending_events: Mutex::new(Vec::new()), + pending_close: Vec::new(), + has_external_poller: AtomicBool::new(false), }) } @@ -637,6 +723,7 @@ impl SlirpBackend { dst_ip: SLIRP_GATEWAY_IP, dst_port: high_port, }; + let token = next_flow_token(PROTO_TAG_TCP); let entry = TcpNatEntry { host_stream, state: TcpNatState::SynSent, @@ -644,9 +731,22 @@ impl SlirpBackend { guest_ack: 0, last_activity: Instant::now(), bytes_in_flight: 0, + flow_token: token, }; - self.flow_table - .insert(FlowKey::Tcp(key), FlowEntry::Tcp(entry)); + let host_fd = entry.host_stream.as_raw_fd(); + let flow_key = FlowKey::Tcp(key); + self.flow_table.insert(flow_key, FlowEntry::Tcp(entry)); + self.token_to_key.insert(token, flow_key); + if let Err(e) = self.epoll.register(host_fd, token, RegisterMode::Read) { + warn!( + host_port = high_port, + guest_port, + fd = host_fd, + error = %e, + "SLIRP port-forward: epoll register failed; flow present but readiness-driven relay disabled" + ); + } + self.epoll_waker.wake(); let syn_frame = synthesize_inbound_syn(high_port, guest_port, our_isn); self.inject_to_guest.push(syn_frame); trace!( @@ -671,6 +771,15 @@ impl SlirpBackend { Err(_) => return Ok(()), }; + // Track inject_to_guest growth so we can wake the net-poll + // thread if this call queued any frames. The poll thread blocks + // in epoll_wait waiting on FD readiness; an ACK queued during + // guest TX has no FD-side signal (the guest is the writer, not + // the reader on the SLIRP-side socket). Without an explicit + // wake the ACK sits up to epoll_wait's timeout before being + // flushed — TCP send window stalls, throughput drops 10×. + let inject_len_before = self.inject_to_guest.len(); + match eth.ethertype() { EthernetProtocol::Arp => { self.handle_arp_frame(frame)?; @@ -682,6 +791,10 @@ impl SlirpBackend { trace!("SLIRP: ignoring ethertype {:?}", eth.ethertype()); } } + + if self.inject_to_guest.len() > inject_len_before { + self.epoll_waker.wake(); + } Ok(()) } @@ -707,16 +820,51 @@ impl SlirpBackend { // 2. Resolve pending DNS queries (off vCPU thread). self.resolve_pending_dns(); - // 3. Process TCP NAT data relay. - self.relay_tcp_nat_data(); + // 3. Collect ready events. + // + // Always drain `pending_events` first — that's the queue + // `net_poll_thread` fills via `push_ready_events` after every + // successful `epoll_wait`. If we skipped this and only polled + // epoll directly, we would lose every event the net-poll thread + // already drained: level-triggered EPOLLIN doesn't re-fire for + // data the kernel already reported, so the next non-blocking + // poll returns 0 events even when there's work to do. CRR + // connections then wait one full 50 ms epoll cycle for the NEXT + // data event before their first data is relayed. + // + // Then, only if no net-poll thread has populated the queue + // (unit tests / benches), fall back to a non-blocking poll on + // the epoll FD ourselves. 
`try_lock` keeps that fallback safe + // under contention. + let ready: Vec = { + let mut events: Vec = { + let mut queue = self.pending_events.lock().unwrap(); + std::mem::take(&mut *queue) + }; + // Fallback non-blocking poll only when no external poller + // (net_poll_thread) is feeding us events — otherwise we'd + // pay one mutex op + one epoll_wait syscall per call + // (~310 ns) for nothing. The flag is one-way: set by the + // first push_ready_events and stays set for the backend's + // lifetime. + if events.is_empty() && !self.has_external_poller.load(Ordering::Relaxed) { + let _ = self + .epoll + .wait_with_timeout(&mut events, std::time::Duration::ZERO); + } + events + }; + + // 4. Process TCP NAT data relay. + self.relay_tcp_nat_data(&ready); - // 4. Relay ICMP echo replies from host sockets back to the guest. - self.relay_icmp_echo(); + // 5. Relay ICMP echo replies from host sockets back to the guest. + self.relay_icmp_echo(&ready); - // 5. Relay UDP flow replies from host sockets back to the guest. - self.relay_udp_flows(); + // 6. Relay UDP flow replies from host sockets back to the guest. + self.relay_udp_flows(&ready); - // 6. Collect frames: smoltcp ARP responses + our NAT-built frames. + // 7. Collect frames: smoltcp ARP responses + our NAT-built frames. { let mut q = self.queue.lock().unwrap(); if !q.tx_queue.is_empty() || rx_count > 0 { @@ -1054,6 +1202,9 @@ impl SlirpBackend { }; let flow_key = FlowKey::Udp(key); + // Track whether this is a new entry so we can register it with epoll. + let mut new_host_fd: Option = None; + let mut new_token: u64 = 0; let entry: &mut UdpFlowEntry = match self.flow_table.entry(flow_key) { std::collections::hash_map::Entry::Occupied(o) => match o.into_mut() { FlowEntry::Udp(e) => e, @@ -1067,9 +1218,13 @@ impl SlirpBackend { return Ok(()); } }; + let token = next_flow_token(PROTO_TAG_UDP); + new_host_fd = Some(sock.as_raw_fd()); + new_token = token; match v.insert(FlowEntry::Udp(UdpFlowEntry { sock, last_activity: Instant::now(), + flow_token: token, })) { FlowEntry::Udp(e) => e, _ => unreachable!(), @@ -1078,6 +1233,21 @@ impl SlirpBackend { }; entry.last_activity = Instant::now(); + if let Some(host_fd) = new_host_fd { + self.token_to_key.insert(new_token, flow_key); + if let Err(e) = self.epoll.register(host_fd, new_token, RegisterMode::Read) { + warn!( + guest_src_port = key.guest_src_port, + dst_ip = %key.dst_ip, + dst_port = key.dst_port, + fd = host_fd, + error = %e, + "SLIRP UDP: epoll register failed; flow present but readiness-driven relay disabled" + ); + } + self.epoll_waker.wake(); + } + if let Err(e) = entry.sock.send(&payload) { trace!("SLIRP UDP: send failed: {e}"); } @@ -1118,6 +1288,9 @@ impl SlirpBackend { dst_ip: ipv4.dst_addr(), }; let flow_key = FlowKey::IcmpEcho(key); + // Track whether this is a new entry so we can register it with epoll. 
+ let mut new_icmp_fd: Option = None; + let mut new_token: u64 = 0; let entry: &mut IcmpEchoEntry = match self.flow_table.entry(flow_key) { std::collections::hash_map::Entry::Occupied(occupied) => match occupied.into_mut() { FlowEntry::IcmpEcho(e) => e, @@ -1132,10 +1305,14 @@ impl SlirpBackend { return Ok(()); } }; + let token = next_flow_token(PROTO_TAG_ICMP); + new_icmp_fd = Some(sock.as_raw_fd()); + new_token = token; match vacant.insert(FlowEntry::IcmpEcho(IcmpEchoEntry { sock, guest_id: ident, last_activity: Instant::now(), + flow_token: token, })) { FlowEntry::IcmpEcho(e) => e, _ => unreachable!(), @@ -1144,6 +1321,20 @@ impl SlirpBackend { }; entry.last_activity = Instant::now(); + if let Some(host_fd) = new_icmp_fd { + self.token_to_key.insert(new_token, flow_key); + if let Err(e) = self.epoll.register(host_fd, new_token, RegisterMode::Read) { + warn!( + guest_id = key.guest_id, + dst_ip = %key.dst_ip, + fd = host_fd, + error = %e, + "SLIRP ICMP: epoll register failed; flow present but readiness-driven relay disabled" + ); + } + self.epoll_waker.wake(); + } + // Build a wire ICMP echo packet with seq + data; the kernel will // rewrite the ident on send_to. let req = Icmpv4Repr::EchoRequest { @@ -1192,7 +1383,7 @@ impl SlirpBackend { src_ip, src_port, dst_ip, dst_port ); - // Phase 5 unified outbound translation: combines the gateway-loopback + // Unified outbound translation: combines the gateway-loopback // rewrite + deny-list check in one pure-function call. Returns None if // the dst is denied; on Some, the SocketAddr already has the right // host IP (loopback for the gateway, original for everything else). @@ -1264,14 +1455,22 @@ impl SlirpBackend { return Ok(()); } - // Remove any stale entry with the same key + // Remove any stale entry with the same key, unregistering its FD + // from the epoll set to avoid a dangling registration. + if let Some(FlowEntry::Tcp(stale)) = self.flow_table.get(&FlowKey::Tcp(key)) { + self.token_to_key.remove(&stale.flow_token); + self.epoll.unregister(stale.host_stream.as_raw_fd()).ok(); + } self.flow_table.remove(&FlowKey::Tcp(key)); // Connect to the host address resolved by translate_outbound above. match TcpStream::connect_timeout(&dst_addr, Duration::from_secs(3)) { Ok(stream) => { stream.set_nonblocking(true).ok(); + let host_fd = stream.as_raw_fd(); let our_seq: u32 = rand_seq(); + let token = next_flow_token(PROTO_TAG_TCP); + let flow_key = FlowKey::Tcp(key); let entry = TcpNatEntry { host_stream: stream, state: TcpNatState::SynReceived, @@ -1279,9 +1478,21 @@ impl SlirpBackend { guest_ack: seq + 1, last_activity: Instant::now(), bytes_in_flight: 0, + flow_token: token, }; - self.flow_table - .insert(FlowKey::Tcp(key), FlowEntry::Tcp(entry)); + self.flow_table.insert(flow_key, FlowEntry::Tcp(entry)); + self.token_to_key.insert(token, flow_key); + if let Err(e) = self.epoll.register(host_fd, token, RegisterMode::Read) { + warn!( + guest_src_port = key.guest_src_port, + dst_ip = %key.dst_ip, + dst_port = key.dst_port, + fd = host_fd, + error = %e, + "SLIRP TCP: epoll register failed; flow present but readiness-driven relay disabled" + ); + } + self.epoll_waker.wake(); // Send SYN-ACK back to guest let syn_ack = build_tcp_packet_static( @@ -1332,6 +1543,12 @@ impl SlirpBackend { return Ok(()); }; + // Track whether this processing path sets state=Closed so we can + // enqueue the key in pending_close once the entry borrow ends. 
+ // FIN/RST paths push to pending_close and return early; mid-function + // error paths (ACK-driven read failure, write failure) set this flag. + let mut closed_by_error = false; + entry.last_activity = Instant::now(); // Inbound port-forward: guest's SYN-ACK completing the host-initiated @@ -1423,6 +1640,7 @@ impl SlirpBackend { key.guest_src_port, e ); entry.state = TcpNatState::Closed; + closed_by_error = true; break; } } @@ -1437,10 +1655,10 @@ impl SlirpBackend { let payload = tcp.payload(); if !payload.is_empty() && entry.state == TcpNatState::Established { - // Phase 3 guest→host: rely on the kernel's send buffer + TCP - // retransmit for backpressure. ACK only the bytes the kernel - // accepted right now; on WouldBlock, don't ACK at all and let - // the guest retransmit. No userspace buffering, no 256 KB cap. + // Guest→host backpressure: rely on the kernel's send buffer + TCP + // retransmit. ACK only the bytes the kernel accepted right now; + // on WouldBlock, don't ACK at all and let the guest retransmit. + // No userspace buffering, no fixed byte-cap on in-flight data. let payload_seq = seq; let n_written = match entry.host_stream.write(payload) { Ok(n) => n, @@ -1451,6 +1669,8 @@ impl SlirpBackend { key.guest_src_port, e ); entry.state = TcpNatState::Closed; + // entry last used above; borrow ends here before pending_close push. + self.pending_close.push(flow_key); return Ok(()); } }; @@ -1497,185 +1717,261 @@ impl SlirpBackend { self.inject_to_guest.push(fin_ack_frame); entry.our_seq = entry.our_seq.wrapping_add(1); entry.state = TcpNatState::Closed; + // entry last used above; borrow ends before pending_close push. + self.pending_close.push(flow_key); + return Ok(()); } // RST from guest if tcp.rst() { debug!("SLIRP TCP: RST from guest for {}:{}", dst_ip, dst_port); entry.state = TcpNatState::Closed; + // entry last used above; borrow ends before pending_close push. + self.pending_close.push(flow_key); + return Ok(()); + } + + // ACK-driven read failure marked the entry Closed but execution + // continues here (no early return). Push to pending_close so + // relay_tcp_nat_data removes the flow without an O(n) sweep. + if closed_by_error { + self.pending_close.push(flow_key); } Ok(()) } - /// Relay data from host TCP connections to guest - fn relay_tcp_nat_data(&mut self) { - let mut to_remove: Vec = Vec::new(); + /// Relay data from host TCP connections to guest, driven by epoll readiness. + /// + /// Closed flows enqueued by handle_tcp_frame (FIN/RST) are drained from + /// `pending_close` and removed promptly. Idle-timeout detection iterates + /// only the flow table entries directly, avoiding a separate Vec allocation. + /// Data relay is restricted to flows with an EPOLLIN event in `ready`. + fn relay_tcp_nat_data(&mut self, ready: &[EpollEvent]) { // Collect frames to inject (built separately to avoid borrow issues) let mut frames_to_inject: Vec> = Vec::new(); - let tcp_flow_keys: Vec = self - .flow_table - .keys() - .copied() - .filter(|k| matches!(k, FlowKey::Tcp(_))) - .collect(); + // Seed removal set from flows already marked Closed by handle_tcp_frame + // (FIN/RST path) via the pending_close queue. HashSet gives O(1) + // membership checks in the idle-timeout sweep and readiness filter below, + // avoiding the O(n*k) cost of Vec::contains under connection churn. 
+ let mut to_remove_set: std::collections::HashSet = + std::mem::take(&mut self.pending_close) + .into_iter() + .collect(); + + // Idle-timeout sweep: scan flow_table once without collecting a + // separate key Vec. 300-second inactivity applies regardless of epoll + // readiness; this is O(n) in the number of TCP flows. + const TCP_IDLE_TIMEOUT: Duration = Duration::from_secs(300); + for (flow_key, entry) in &self.flow_table { + if let FlowEntry::Tcp(tcp_entry) = entry { + if tcp_entry.last_activity.elapsed() > TCP_IDLE_TIMEOUT { + to_remove_set.insert(*flow_key); + } + } + } - for flow_key in tcp_flow_keys { - let FlowKey::Tcp(key) = flow_key else { + let mut tcp_flow_keys: Vec = Vec::new(); + for event in ready { + if !event.readable || event.token & PROTO_TAG_MASK != PROTO_TAG_TCP { continue; - }; - let Some(FlowEntry::Tcp(entry)) = self.flow_table.get_mut(&flow_key) else { + } + let Some(flow_key) = self.token_to_key.get(&event.token).copied() else { continue; }; - - if entry.state == TcpNatState::Closed { - to_remove.push(flow_key); + if to_remove_set.contains(&flow_key) { continue; } - if entry.last_activity.elapsed() > Duration::from_secs(300) { - to_remove.push(flow_key); - continue; - } - if entry.state != TcpNatState::Established { + tcp_flow_keys.push(flow_key); + } + + for flow_key in tcp_flow_keys { + let FlowKey::Tcp(key) = flow_key else { continue; - } + }; - // Phase 3 host→guest path: peek what's in the kernel recv buffer - // without consuming. Send only the un-ACK'd portion (bytes past - // what we've already sent). The kernel's socket buffer holds the - // outstanding data; Task 3.4's ACK-driven `read()` consumes it - // once the guest ACKs. - let mut peek_buf = [0u8; 65536]; - match recv_peek(&entry.host_stream, &mut peek_buf) { - Ok(0) => { - // Host closed the connection. Send FIN to guest below. - debug!( - "SLIRP TCP: host EOF on flow guest_port={}, marking Closed", - key.guest_src_port - ); - entry.state = TcpNatState::Closed; + let mut became_closed = false; + let mut fin_frame: Option> = None; + + { + let Some(FlowEntry::Tcp(entry)) = self.flow_table.get_mut(&flow_key) else { + continue; + }; + + if entry.state != TcpNatState::Established { + continue; } - Ok(peek_n) => { - let in_flight = entry.bytes_in_flight as usize; - if peek_n > in_flight { - let new_bytes = &peek_buf[in_flight..peek_n]; - let mut sent_total: usize = 0; - for chunk in new_bytes.chunks(MTU - 54) { - let frame = build_tcp_packet_static( - key.dst_ip, - SLIRP_GUEST_IP, - key.dst_port, - key.guest_src_port, - entry.our_seq, - entry.guest_ack, - TcpControl::None, - chunk, + + // Host→guest path: peek what's in the kernel recv buffer + // without consuming. Send only the un-ACK'd portion (bytes past + // what we've already sent). The kernel's socket buffer holds the + // outstanding data; ACK-driven `read()` consumes it once the + // guest ACKs. + let mut peek_buf = [0u8; 65536]; + match recv_peek(&entry.host_stream, &mut peek_buf) { + Ok(0) => { + // Host closed the connection. Send FIN to guest below. 
+ debug!( + "SLIRP TCP: host EOF on flow guest_port={}, marking Closed", + key.guest_src_port + ); + entry.state = TcpNatState::Closed; + became_closed = true; + } + Ok(peek_n) => { + let in_flight = entry.bytes_in_flight as usize; + if peek_n > in_flight { + let new_bytes = &peek_buf[in_flight..peek_n]; + let mut sent_total: usize = 0; + for chunk in new_bytes.chunks(MTU - 54) { + let frame = build_tcp_packet_static( + key.dst_ip, + SLIRP_GUEST_IP, + key.dst_port, + key.guest_src_port, + entry.our_seq, + entry.guest_ack, + TcpControl::None, + chunk, + ); + frames_to_inject.push(frame); + entry.our_seq = entry.our_seq.wrapping_add(chunk.len() as u32); + entry.bytes_in_flight = + entry.bytes_in_flight.wrapping_add(chunk.len() as u32); + sent_total += chunk.len(); + } + entry.last_activity = Instant::now(); + trace!( + "SLIRP TCP relay: peeked {} bytes (in_flight before={}, sent now={})", + peek_n, + in_flight, + sent_total ); - frames_to_inject.push(frame); - entry.our_seq = entry.our_seq.wrapping_add(chunk.len() as u32); - entry.bytes_in_flight = - entry.bytes_in_flight.wrapping_add(chunk.len() as u32); - sent_total += chunk.len(); } - entry.last_activity = Instant::now(); - trace!( - "SLIRP TCP relay: peeked {} bytes (in_flight before={}, sent now={})", - peek_n, - in_flight, - sent_total + // else: kernel buffer holds only already-in-flight bytes. + // Wait for guest ACK before sending more (Task 3.4). + } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { + // Kernel recv buffer empty; nothing to do this poll. + } + Err(e) => { + warn!( + "SLIRP TCP: recv_peek failed on flow guest_port={}, marking Closed: {}", + key.guest_src_port, e ); + entry.state = TcpNatState::Closed; + became_closed = true; } - // else: kernel buffer holds only already-in-flight bytes. - // Wait for guest ACK before sending more (Task 3.4). - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => { - // Kernel recv buffer empty; nothing to do this poll. } - Err(e) => { - warn!( - "SLIRP TCP: recv_peek failed on flow guest_port={}, marking Closed: {}", - key.guest_src_port, e - ); - entry.state = TcpNatState::Closed; + + // FIN if host closed + if entry.state == TcpNatState::Closed { + fin_frame = Some(build_tcp_packet_static( + key.dst_ip, + SLIRP_GUEST_IP, + key.dst_port, + key.guest_src_port, + entry.our_seq, + entry.guest_ack, + TcpControl::Fin, + &[], + )); } - } + } // entry borrow ends here - // FIN if host closed - if entry.state == TcpNatState::Closed { - let fin = build_tcp_packet_static( - key.dst_ip, - SLIRP_GUEST_IP, - key.dst_port, - key.guest_src_port, - entry.our_seq, - entry.guest_ack, - TcpControl::Fin, - &[], - ); + if let Some(fin) = fin_frame { frames_to_inject.push(fin); } + // Queue for removal so the cleanup loop below can unregister + drop. + if became_closed { + to_remove_set.insert(flow_key); + } } self.inject_to_guest.append(&mut frames_to_inject); - for flow_key in to_remove { + for flow_key in to_remove_set { + if let Some(FlowEntry::Tcp(entry)) = self.flow_table.get(&flow_key) { + self.token_to_key.remove(&entry.flow_token); + self.epoll.unregister(entry.host_stream.as_raw_fd()).ok(); + } self.flow_table.remove(&flow_key); } } /// Drain replies from each active ICMP echo socket and emit echo-reply - /// frames to the guest. + /// frames to the guest, driven by epoll readiness. /// - /// Called on every [`drain_to_guest`] tick. Entries idle longer than - /// `ICMP_IDLE_TIMEOUT` are evicted. 
- fn relay_icmp_echo(&mut self) { + /// Only flows whose token appears in `ready` with EPOLLIN set are visited. + /// Entries idle longer than `ICMP_IDLE_TIMEOUT` are still evicted on any + /// readiness event for that flow. + fn relay_icmp_echo(&mut self, ready: &[EpollEvent]) { const ICMP_IDLE_TIMEOUT: Duration = Duration::from_secs(60); let now = Instant::now(); - let flow_keys: Vec = self - .flow_table - .keys() - .copied() - .filter(|k| matches!(k, FlowKey::IcmpEcho(_))) - .collect(); - for flow_key in flow_keys { - let FlowKey::IcmpEcho(key) = flow_key else { + let mut ready_flow_keys: Vec = Vec::new(); + for event in ready { + if !event.readable || event.token & PROTO_TAG_MASK != PROTO_TAG_ICMP { + continue; + } + let Some(flow_key) = self.token_to_key.get(&event.token).copied() else { + continue; + }; + ready_flow_keys.push(flow_key); + } + + // Mirrors the TCP idle-timeout sweep so ICMP sockets do not accumulate + // indefinitely when the ping target goes silent. + let mut icmp_to_remove: std::collections::HashSet = + std::collections::HashSet::new(); + for (flow_key, entry) in &self.flow_table { + let FlowKey::IcmpEcho(_) = flow_key else { + continue; + }; + let FlowEntry::IcmpEcho(icmp_entry) = entry else { + continue; + }; + if now.duration_since(icmp_entry.last_activity) > ICMP_IDLE_TIMEOUT { + icmp_to_remove.insert(*flow_key); + } + } + + for flow_key in &ready_flow_keys { + // Skip if already in remove set (idle-timeout caught it first). + // O(1) via HashSet, not O(k) Vec::contains. + if icmp_to_remove.contains(flow_key) { + continue; + } + let FlowKey::IcmpEcho(key) = *flow_key else { continue; }; let frame = { - let Some(FlowEntry::IcmpEcho(entry)) = self.flow_table.get_mut(&flow_key) else { + let Some(FlowEntry::IcmpEcho(entry)) = self.flow_table.get_mut(flow_key) else { continue; }; - if now.duration_since(entry.last_activity) > ICMP_IDLE_TIMEOUT { - None // mark for removal below - } else { - let mut buf = [0u8; 1500]; - match entry.sock.recv_from(&mut buf) { - Ok((n, _addr)) => { - entry.last_activity = now; - // Wrap in Some to distinguish from the idle-timeout - // None arm in the outer match. - Some(Self::build_icmp_echo_reply_to_guest( - key.dst_ip, - entry.guest_id, - &buf[..n], - )) - } - Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, - Err(_) => continue, + let mut buf = [0u8; 1500]; + match entry.sock.recv_from(&mut buf) { + Ok((n, _addr)) => { + entry.last_activity = now; + Self::build_icmp_echo_reply_to_guest(key.dst_ip, entry.guest_id, &buf[..n]) } + Err(ref e) if e.kind() == io::ErrorKind::WouldBlock => continue, + Err(_) => continue, } }; - match frame { - None => { - // Idle timeout — evict entry. - self.flow_table.remove(&FlowKey::IcmpEcho(key)); - } - Some(Some(frame_bytes)) => self.inject_to_guest.push(frame_bytes), - Some(None) => {} // build failed; drop silently + if let Some(frame_bytes) = frame { + self.inject_to_guest.push(frame_bytes); } } + + for flow_key in icmp_to_remove { + if let Some(FlowEntry::IcmpEcho(e)) = self.flow_table.get(&flow_key) { + self.token_to_key.remove(&e.flow_token); + self.epoll.unregister(e.sock.as_raw_fd()).ok(); + } + self.flow_table.remove(&flow_key); + } } /// Build an Ethernet/IPv4/ICMP echo-reply frame addressed to the guest. @@ -1732,43 +2028,49 @@ impl SlirpBackend { } /// Drain replies from each active UDP flow socket and emit UDP frames to - /// the guest. + /// the guest, driven by epoll readiness. /// - /// Called on every [`drain_to_guest`] tick. 
Each connected socket is - /// polled non-blocking; `WouldBlock` and other errors are silently skipped - /// so a stale or unreachable flow never stalls the relay loop. + /// Only flows whose token appears in `ready` with EPOLLIN set are visited. + /// Idle-timeout reaping still runs every call: the reap scan is cheap + /// (skips flows not in `ready`) and ensures stale entries are eventually + /// evicted even when no new data arrives. /// /// Reply addressing mirrors the original guest datagram in reverse: the /// frame's IP source is the original destination (`key.dst_ip`) and UDP /// source port is `key.dst_port`; the destination is the guest IP and /// `key.guest_src_port`. - fn relay_udp_flows(&mut self) { + fn relay_udp_flows(&mut self, ready: &[EpollEvent]) { let now = Instant::now(); - // Reap idle flows; the per-flow connected socket is closed by Drop. - let stale: Vec = self - .flow_table - .iter() - .filter(|(k, e)| { - matches!(k, FlowKey::Udp(_)) - && match e { - FlowEntry::Udp(entry) => { - now.duration_since(entry.last_activity) > UDP_IDLE_TIMEOUT - } - _ => false, - } - }) - .map(|(k, _)| *k) - .collect(); - for k in stale { - self.flow_table.remove(&k); + // Per-flow connected sockets are closed by Drop when the entry leaves + // flow_table. + let mut stale: Vec = Vec::new(); + for (flow_key, entry) in &self.flow_table { + let FlowKey::Udp(_) = flow_key else { continue }; + let FlowEntry::Udp(udp_entry) = entry else { + continue; + }; + if now.duration_since(udp_entry.last_activity) > UDP_IDLE_TIMEOUT { + stale.push(*flow_key); + } + } + for flow_key in stale { + if let Some(FlowEntry::Udp(entry)) = self.flow_table.get(&flow_key) { + self.token_to_key.remove(&entry.flow_token); + self.epoll.unregister(entry.sock.as_raw_fd()).ok(); + } + self.flow_table.remove(&flow_key); } - let flow_keys: Vec = self - .flow_table - .keys() - .copied() - .filter(|k| matches!(k, FlowKey::Udp(_))) - .collect(); + let mut flow_keys: Vec = Vec::new(); + for event in ready { + if !event.readable || event.token & PROTO_TAG_MASK != PROTO_TAG_UDP { + continue; + } + let Some(flow_key) = self.token_to_key.get(&event.token).copied() else { + continue; + }; + flow_keys.push(flow_key); + } for flow_key in flow_keys { let FlowKey::Udp(key) = flow_key else { continue; @@ -1895,6 +2197,25 @@ impl SlirpBackend { buf } + + /// Push events from the net-poll thread into this backend's per-tick + /// event queue. Called from net_poll_thread after each successful + /// epoll_wait, while holding no other lock. + /// + /// drain_to_guest drains this queue with a brief uncontended lock + /// instead of re-entering EpollDispatch (which the net-poll thread + /// holds for the full 50 ms of the blocking wait). + pub fn push_ready_events(&self, events: &[EpollEvent]) { + // First push from net_poll_thread flips the flag so drain_to_guest + // skips its non-blocking-poll fallback. Stays set for the + // backend's lifetime — net_poll_thread doesn't disappear mid-run. 
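+        // Relaxed ordering suffices: the flag only gates an optimization,
+        // and a racy read of `false` costs one redundant non-blocking
+        // epoll_wait in drain_to_guest, never a lost event.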
 }

 impl NetworkBackend for SlirpBackend {
@@ -1905,6 +2226,16 @@ impl NetworkBackend for SlirpBackend {
     fn drain_to_guest(&mut self, out: &mut Vec<Vec<u8>>) {
         SlirpBackend::drain_to_guest(self, out)
     }
+
+    #[cfg(target_os = "linux")]
+    fn epoll_arc(&self) -> Option<std::sync::Arc<EpollDispatch>> {
+        Some(std::sync::Arc::clone(&self.epoll))
+    }
+
+    #[cfg(target_os = "linux")]
+    fn push_ready_events(&self, events: &[crate::network::epoll_dispatch::EpollEvent]) {
+        SlirpBackend::push_ready_events(self, events)
+    }
 }

 /// Build a TCP packet (free function to avoid borrow issues with &self methods)
@@ -1972,7 +2303,7 @@ fn build_tcp_packet_static(
 }

 /// Build a synthetic TCP SYN frame from the SLIRP gateway to the guest,
-/// used for inbound port-forwarding (Phase 5.5b).
+/// used for inbound port-forwarding.
 ///
 /// The frame mirrors what the guest would see from a real TCP client:
 /// - src: `SLIRP_GATEWAY_IP:high_port`
@@ -2189,6 +2520,48 @@ impl Drop for SlirpBackend {
     }
 }

+impl SlirpBackend {
+    /// Re-register every live host FD in `flow_table` with the current epoll
+    /// dispatcher and rebuild `token_to_key`. Called from snapshot restore:
+    /// the `epoll_fd` is a kernel handle that does not survive snapshot, so a
+    /// fresh dispatcher starts empty even though `flow_table` deserialized
+    /// correctly with new FDs.
+    ///
+    /// Each existing flow keeps its stored `flow_token` so that any
+    /// already-queued readiness events (unlikely post-restore, but safe) still
+    /// resolve correctly. The `token_to_key` map is rebuilt from scratch
+    /// because it is in-memory-only state; it does not need to be persisted.
+    pub fn rebuild_epoll_from_flow_table(&mut self) {
+        use std::os::fd::AsRawFd;
+        self.token_to_key.clear();
+        for (flow_key, entry) in &self.flow_table {
+            match (flow_key, entry) {
+                (FlowKey::Tcp(_), FlowEntry::Tcp(e)) => {
+                    self.token_to_key.insert(e.flow_token, *flow_key);
+                    let _ = self.epoll.register(
+                        e.host_stream.as_raw_fd(),
+                        e.flow_token,
+                        RegisterMode::Read,
+                    );
+                }
+                (FlowKey::Udp(_), FlowEntry::Udp(e)) => {
+                    self.token_to_key.insert(e.flow_token, *flow_key);
+                    let _ =
+                        self.epoll
+                            .register(e.sock.as_raw_fd(), e.flow_token, RegisterMode::Read);
+                }
+                (FlowKey::IcmpEcho(_), FlowEntry::IcmpEcho(e)) => {
+                    self.token_to_key.insert(e.flow_token, *flow_key);
+                    let _ =
+                        self.epoll
+                            .register(e.sock.as_raw_fd(), e.flow_token, RegisterMode::Read);
+                }
+                _ => {}
+            }
+        }
+    }
+}
+
 /// Test-only helpers — not compiled into production builds.
 ///
 /// These are `#[cfg(test)]`/`#[cfg(feature = "bench-helpers")]` methods on
@@ -2221,6 +2594,9 @@ impl SlirpBackend {
             dst_ip: SLIRP_GATEWAY_IP,
             dst_port: high_port,
         };
+        let host_fd = host_stream.as_raw_fd();
+        let token = next_flow_token(PROTO_TAG_TCP);
+        let flow_key = FlowKey::Tcp(key);
         let entry = TcpNatEntry {
             host_stream,
             state: TcpNatState::SynSent,
@@ -2228,9 +2604,28 @@ impl SlirpBackend {
             guest_ack: 0,
             last_activity: Instant::now(),
             bytes_in_flight: 0,
+            flow_token: token,
         };
-        self.flow_table
-            .insert(FlowKey::Tcp(key), FlowEntry::Tcp(entry));
+        self.flow_table.insert(flow_key, FlowEntry::Tcp(entry));
+        self.token_to_key.insert(token, flow_key);
+        // Skip epoll registration in test/bench contexts: the synthetic
+        // stream is already non-blocking but test harnesses check specific
+        // state transitions, not readiness events.
+        #[cfg(not(any(test, feature = "bench-helpers")))]
+        {
+            if let Err(e) = self.epoll.register(host_fd, token, RegisterMode::Read) {
+                warn!(
+                    guest_port,
+                    high_port,
+                    fd = host_fd,
+                    error = %e,
+                    "SLIRP: epoll register for synthetic SynSent failed"
+                );
+            }
+            self.epoll_waker.wake();
+        }
+        #[cfg(any(test, feature = "bench-helpers"))]
+        let _ = host_fd;
+    }
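+
+    // Illustrative restore-path call order (sketch; `snapshot.restore()` is
+    // an assumed name and the real restore code lives outside this file):
+    //
+    //     let mut backend: SlirpBackend = snapshot.restore()?;
+    //     backend.rebuild_epoll_from_flow_table(); // re-register live FDs
+    //     // only now is the backend safe to poll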

     /// Return the `TcpNatState` for the flow identified by `(guest_port, GATEWAY_IP, high_port)`,
@@ -2279,6 +2674,24 @@ impl SlirpBackend {
             .send(accepted)
             .expect("accept channel must be open");
     }
+
+    /// Returns the number of user-registered FDs in the epoll set
+    /// (excludes the self-pipe).
+    pub fn registered_fd_count(&self) -> usize {
+        self.epoll.registered_fd_count()
+    }
+
+    /// Replace the epoll dispatcher with a fresh empty one, discarding all
+    /// existing registrations. Simulates the post-snapshot state where the
+    /// kernel-side `epoll_fd` handle does not survive and a new one is
+    /// created. Used by `epoll_set_rebuilt_from_flow_table_smoke` to set up
+    /// the precondition that `rebuild_epoll_from_flow_table` must fix.
+    pub fn reset_epoll_for_snapshot_test(&mut self) {
+        let new_epoll_inner = EpollDispatch::new().expect("EpollDispatch::new");
+        let new_waker = new_epoll_inner.waker();
+        self.epoll = Arc::new(new_epoll_inner);
+        self.epoll_waker = new_waker;
+    }
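+
+    /// Number of readiness events queued by `push_ready_events` but not yet
+    /// drained. Illustrative companion to `registered_fd_count`; a sketch no
+    /// test consumes today, shown to make the pending queue observable.
+    #[allow(dead_code)]
+    pub fn pending_event_count(&self) -> usize {
+        self.pending_events.lock().unwrap().len()
+    }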
 }

 #[cfg(test)]
diff --git a/src/vmm/mod.rs b/src/vmm/mod.rs
index 9d10588d..97fe2d0f 100644
--- a/src/vmm/mod.rs
+++ b/src/vmm/mod.rs
@@ -1594,8 +1594,19 @@ fn vsock_irq_thread(
 /// from host TCP sockets accumulates unread, causing TLS handshakes and
 /// API calls to time out.
 ///
-/// This thread wakes every 5 ms, reads any pending host data via
-/// `try_inject_rx`, and fires IRQ 10 to notify the guest.
+/// This thread uses an adaptive `EpollDispatch::wait_with_timeout`:
+/// - **Active** (5 ms): any kernel readiness event in the last cycle keeps
+///   the thread in the 5 ms cadence so the guest's TCP delayed-ACK timer
+///   fires on schedule. Both real socket readiness events and self-pipe
+///   wakes (from `epoll_waker.wake()` after a new SYN or injected ACK)
+///   count as activity.
+/// - **Idle** (50 ms): a cycle with no kernel events backs off to 50 ms.
+///   New flows or incoming data wake the wait immediately via the epoll set
+///   or the waker, so the 50 ms cap only fires when the network is truly
+///   quiet.
+///
+/// When the network backend does not provide an epoll instance
+/// (non-SlirpBackend), the thread falls back to a fixed 5 ms sleep.
 fn net_poll_thread(net_dev: Arc>, vm: Arc, running: Arc<AtomicBool>) {
     #[repr(C)]
     struct KvmIrqLevel {
@@ -1603,10 +1614,83 @@ fn net_poll_thread(net_dev: Arc>, vm: Arc, running: A
         level: u32,
     }
     const KVM_IRQ_LINE: libc::c_ulong = 0x4008_AE61;
+    // Adaptive epoll_wait timeout. Active periods need a 5 ms cadence so
+    // the guest's TCP delayed-ACK timer fires on schedule (the guest spends
+    // most idle time in HLT and relies on our IRQ pulses to advance vCPU
+    // schedule slots; a 50 ms gap causes +40 ms CRR latency, exactly
+    // Linux's delayed-ACK period). Idle periods can use the long timeout
+    // safely: any new flow's SYN goes through process_guest_frame which
+    // calls epoll_waker.wake(), and host data arrival fires EPOLLIN — both
+    // wake the wait immediately, so the 50 ms ceiling never bites a real
+    // packet. We pick the next timeout based on whether the last wait
+    // returned events: had-events ⇒ stay in the active 5 ms cadence,
+    // timed-out ⇒ back off to 50 ms. Correctness is unaffected either way;
+    // the payoff is the roughly 10x cut in idle wakeups that motivated the
+    // adaptive wait in the first place.
+    const ACTIVE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(5);
+    const IDLE_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(50);
+    const FALLBACK_SLEEP: std::time::Duration = std::time::Duration::from_millis(5);
+
+    // Start in the idle regime — first SYN flips us into active.
+    let mut epoll_wait_timeout: std::time::Duration = IDLE_TIMEOUT;
+
     let vm_fd = vm.vm_fd().as_raw_fd();
     let guest_memory = vm.guest_memory();
+
+    // Obtain the epoll Arc from the backend without holding the device lock
+    // across the blocking wait. Falls back to None if the backend is not
+    // a SlirpBackend (e.g. in unit tests or future alternative backends).
+    let epoll_arc = {
+        match net_dev.lock() {
+            Ok(guard) => guard.epoll_arc(),
+            Err(_) => None,
+        }
+    };
+
+    let mut epoll_events: Vec<crate::network::epoll_dispatch::EpollEvent> = Vec::new();
+
     while running.load(Ordering::Relaxed) {
-        std::thread::sleep(std::time::Duration::from_millis(5));
+        // Block outside the device lock: either on epoll readiness or a short
+        // sleep. This lets the vCPU thread acquire the device lock without
+        // contention during the wait phase.
+        epoll_events.clear();
+        // Raw kernel count from epoll_wait, including self-pipe wakes
+        // that the filter strips from `epoll_events`. A self-pipe wake
+        // is the signal that handle_tcp_frame queued a frame and called
+        // epoll_waker.wake() — i.e. real activity that should keep the
+        // adaptive timeout in the active 5 ms cadence even though
+        // `epoll_events.is_empty()`.
+        let mut raw_kernel_events: usize = 0;
+        if let Some(ref ep_arc) = epoll_arc {
+            raw_kernel_events = ep_arc
+                .wait_with_timeout(&mut epoll_events, epoll_wait_timeout)
+                .unwrap_or(0);
+        } else {
+            std::thread::sleep(FALLBACK_SLEEP);
+        }
+
+        // Adapt the next-cycle timeout based on this cycle's outcome.
+        // Any kernel event (real readiness OR self-pipe wake from the
+        // vCPU thread) signals activity and keeps us in the 5 ms
+        // cadence so the guest's TCP delayed-ACK timer fires on time.
+        // A pure timeout drops us to the 50 ms idle cadence. One quiet
+        // cycle to switch to idle, one event to switch back to active.
+        epoll_wait_timeout = if raw_kernel_events > 0 {
+            ACTIVE_TIMEOUT
+        } else {
+            IDLE_TIMEOUT
+        };
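+
+        // If profiling ever shows 5 ms ⇄ 50 ms flapping on bursty traffic, a
+        // debounce is a two-line change. Sketch only (assumes a
+        // `quiet_cycles: u32` local declared before the loop; not current
+        // code):
+        //
+        //     quiet_cycles = if raw_kernel_events > 0 { 0 } else { quiet_cycles + 1 };
+        //     epoll_wait_timeout = if quiet_cycles >= 3 { IDLE_TIMEOUT } else { ACTIVE_TIMEOUT };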
+
+        // Push ready events into the backend's queue before acquiring the
+        // device lock for inject/IRQ work. drain_to_guest will consume them
+        // without re-locking EpollDispatch, eliminating mutex contention
+        // between the net-poll thread's 50 ms blocking wait and the vCPU
+        // thread's process_guest_frame → drain_to_guest path.
+        if !epoll_events.is_empty() {
+            if let Ok(guard) = net_dev.lock() {
+                guard.push_events_to_backend(&epoll_events);
+            }
+        }

         let has_interrupt = {
             let mut guard = match net_dev.lock() {
@@ -1621,6 +1705,9 @@ fn net_poll_thread(net_dev: Arc>, vm: Arc, running: A
         // an earlier edge was missed by the guest.
         if has_interrupt {
             let assert_irq = KvmIrqLevel { irq: 10, level: 1 };
+            // SAFETY: KVM_IRQ_LINE ioctl writes the KvmIrqLevel struct into
+            // the in-kernel APIC; the struct is #[repr(C)] and the fd is valid
+            // for the lifetime of `vm`.
             unsafe {
                 libc::ioctl(vm_fd, KVM_IRQ_LINE as _, &assert_irq);
             }
diff --git a/tests/network_baseline.rs b/tests/network_baseline.rs
index 87c3b012..d5115426 100644
--- a/tests/network_baseline.rs
+++ b/tests/network_baseline.rs
@@ -10,11 +10,11 @@
 #![allow(deprecated)]
 //!
 //! Three tests assert *broken* behavior on purpose. Each is marked
-//! `BROKEN_ON_PURPOSE` and flips in the phase that fixes it:
+//! `BROKEN_ON_PURPOSE` and flips when the corresponding fix lands:
 //!
-//! - `tcp_writes_more_than_256kb_succeed` — flipped in Phase 3 (was `tcp_to_host_buffer_drops_at_256kb`)
-//! - `udp_non_dns_round_trips` — flipped in Phase 2 (was `udp_non_dns_silently_dropped`)
-//! - `icmp_echo_returns_reply` — flipped in Phase 1 (was `icmp_echo_silently_dropped`)
+//! - `tcp_writes_more_than_256kb_succeed` (was `tcp_to_host_buffer_drops_at_256kb`)
+//! - `udp_non_dns_round_trips` (was `udp_non_dns_silently_dropped`)
+//! - `icmp_echo_returns_reply` (was `icmp_echo_silently_dropped`)
 //!
 //! Run with: `cargo test --test network_baseline`
@@ -168,12 +168,13 @@ fn parse_tcp_to_guest(frame: &[u8]) -> Option<(u32, u32, TcpControl, usize)> {
     ))
 }

-/// Drains frames the stack wants to send to the guest, calling `poll`
-/// up to `n` times.
+/// Drains frames the stack wants to send to the guest, calling
+/// `drain_to_guest` up to `n` times. Returns all frames produced
+/// across the calls (caller may not care about per-call boundaries).
 fn drain_n(stack: &mut SlirpBackend, n: usize) -> Vec<Vec<u8>> {
-    let mut out = Vec::new();
+    let mut out: Vec<Vec<u8>> = Vec::new();
     for _ in 0..n {
-        out.extend(stack.poll());
+        stack.drain_to_guest(&mut out);
     }
     out
 }
@@ -293,12 +294,11 @@ fn tcp_data_round_trip() {
     );
 }

-/// Phase 3 flipped this BROKEN_ON_PURPOSE pin: passt-style sequence
-/// mirroring + don't-ACK-on-WouldBlock backpressure replaces the
-/// 256 KB userspace cliff. Pushing >1 MB through the relay now
-/// succeeds — the kernel's socket buffer holds outstanding bytes,
-/// the guest retransmits unacked segments, and the connection stays
-/// alive instead of being reset.
+/// BROKEN_ON_PURPOSE pin (now passing): passt-style sequence mirroring and
+/// don't-ACK-on-WouldBlock backpressure replace the 256 KB userspace cliff.
+/// Pushing >1 MB through the relay succeeds — the kernel's socket buffer
+/// holds outstanding bytes, the guest retransmits unacked segments, and the
+/// connection stays alive instead of being reset.
 #[test]
 fn tcp_writes_more_than_256kb_succeed() {
     use std::sync::atomic::{AtomicUsize, Ordering};
@@ -385,7 +385,11 @@ fn tcp_writes_more_than_256kb_succeed() {
     let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10);
     while bytes_received.load(Ordering::Relaxed) < TOTAL && std::time::Instant::now() < deadline {
-        // Send a chunk; advance our seq.
+        // Retransmit semantics: only advance the send cursor once the
+        // previous chunk has been ACK'd. If the stack stops ACKing
+        // (backpressure engaged), we re-send the same seq/payload until
+        // it's acknowledged. This matches production guest-TCP retransmit
+        // behavior.
         let _ = stack.process_guest_frame(&build_tcp_frame(
             SLIRP_GATEWAY_IP,
             GUEST_EPHEMERAL_PORT,
@@ -395,10 +399,9 @@ fn tcp_writes_more_than_256kb_succeed() {
             TcpControl::Psh,
             &chunk,
         ));
-        seq = seq.wrapping_add(CHUNK as u32);

         // Drain frames; track the highest ACK we've seen and watch
-        // for RST/FIN that would indicate a Phase-2 era close.
+        // for RST/FIN that would indicate a premature close.
         for f in drain_n(&mut stack, 4) {
             if let Some((_, ack, ctrl, _)) = parse_tcp_to_guest(&f) {
                 if matches!(ctrl, TcpControl::Rst | TcpControl::Fin) {
@@ -414,9 +417,14 @@ fn tcp_writes_more_than_256kb_succeed() {
             break;
         }

-        // If we've out-paced the kernel's recv buffer, sleep briefly
-        // so the server thread can drain it.
-        if seq.wrapping_sub(acked_seq) > 256 * 1024 {
+        // Advance our send cursor only past ACK'd data. If the stack
+        // didn't ACK this chunk, the next loop iteration re-sends the
+        // same seq/payload (true TCP retransmit semantics).
+        if acked_seq >= seq.wrapping_add(CHUNK as u32) {
+            seq = seq.wrapping_add(CHUNK as u32);
+        } else if seq.wrapping_sub(acked_seq) > 256 * 1024 {
+            // Out-paced kernel recv buffer; sleep briefly so the host
+            // server thread can drain.
             std::thread::sleep(std::time::Duration::from_millis(10));
         }
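+        // Note: `acked_seq >= seq.wrapping_add(CHUNK as u32)` is a plain u32
+        // compare, fine for this 1 MiB test (sequence numbers never wrap) but
+        // not wrap-safe in general. Wrap-safe TCP code compares serial
+        // numbers through a signed difference, treating
+        // `(acked_seq.wrapping_sub(next_seq) as i32) >= 0` as "at or past".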
     }
@@ -443,13 +451,13 @@ fn tcp_writes_more_than_256kb_succeed() {
     let received = bytes_received.load(Ordering::Relaxed);
     assert!(
         !saw_close,
-        "Phase 3 contract: connection must NOT be reset/FIN'd mid-stream \
-         (was the 256 KB cliff bug). Saw RST or FIN."
+        "TCP backpressure must not RST/FIN mid-stream — the relay must hold \
+         the line while the kernel drains. Saw RST or FIN."
     );
     assert!(
         received >= TOTAL * 95 / 100,
-        "Phase 3 contract: server must receive ~all bytes pushed (got {received}/{TOTAL}); \
-         backpressure should retransmit until success, not silently drop."
+        "server must receive ~all bytes pushed (got {received}/{TOTAL}); \
+         backpressure must retransmit until success, not silently drop."
     );
 }
@@ -831,9 +839,9 @@ fn dns_cache_keys_by_question_not_xid() {
     }
 }

-/// Phase 2 flipped this BROKEN_ON_PURPOSE pin: arbitrary UDP (any
-/// destination port, not just 53) now round-trips through the per-flow
-/// connected-socket NAT introduced in Tasks 2.1–2.4.
+/// BROKEN_ON_PURPOSE pin (now passing): arbitrary UDP (any destination
+/// port, not just 53) round-trips through the per-flow connected-socket
+/// NAT.
 #[test]
 fn udp_non_dns_round_trips() {
     let host_sock = UdpSocket::bind("127.0.0.1:0").unwrap();
@@ -897,9 +905,8 @@ fn udp_non_dns_round_trips() {
     assert!(saw_reply, "guest must receive UDP reply via per-flow NAT");
 }

-/// Phase 1 flipped the BROKEN_ON_PURPOSE assertion: the guest now
-/// receives an ICMP echo reply via the host's unprivileged
-/// `IPPROTO_ICMP SOCK_DGRAM` socket.
+/// BROKEN_ON_PURPOSE pin (now passing): the guest receives an ICMP echo
+/// reply via the host's unprivileged `IPPROTO_ICMP SOCK_DGRAM` socket.
 ///
 /// Skips gracefully if `net.ipv4.ping_group_range` forbids unprivileged
 /// ICMP for the calling GID — in that environment the warn-once log
@@ -1032,7 +1039,7 @@ fn nat_translate_outbound_unmodified_external_ip() {
     );
 }

-/// E2E contract for Phase 5.5b inbound port-forwarding.
+/// E2E contract for inbound port-forwarding.
 ///
 /// Builds a `SlirpBackend` with one TCP port-forward rule
 /// (`HOST_PORT` → `GUEST_PORT`), has a host thread connect to
@@ -1231,3 +1238,56 @@ fn nat_translate_outbound_deny_list() {
         "IPs outside deny CIDR must pass"
     );
 }
+
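+// Illustrative helper, not used by the tests above: makes the /24 mask
+// arithmetic behind the deny-list assertions concrete. The name and the
+// fixed /24 width are assumptions for the example.
+#[allow(dead_code)]
+fn in_cidr_24(ip: [u8; 4], net: [u8; 4]) -> bool {
+    // A /24 match keeps the top 24 bits, i.e. the first three octets.
+    (u32::from_be_bytes(ip) & 0xFFFF_FF00) == (u32::from_be_bytes(net) & 0xFFFF_FF00)
+}
+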
+/// Snapshot/restore must rebuild the epoll dispatch from `flow_table`
+/// contents. The `epoll_fd` is a kernel handle that does not survive
+/// snapshot; a fresh dispatcher starts with zero registered FDs even
+/// though `flow_table` may contain entries with live host sockets.
+///
+/// This smoke test verifies the rebuild path end-to-end:
+/// 1. Insert a synthetic TCP flow into the flow table.
+/// 2. Reset the epoll dispatcher to a fresh empty one (simulating what
+///    snapshot restore does: the kernel handle is gone, a new one is created).
+/// 3. Confirm the pre-rebuild count is zero.
+/// 4. Call `rebuild_epoll_from_flow_table`.
+/// 5. Confirm the post-rebuild count is one.
+///
+/// Gated on `bench-helpers` because it consumes synthetic-injection helpers
+/// (`insert_synthetic_synsent_entry`, `reset_epoll_for_snapshot_test`,
+/// `registered_fd_count`) that are only visible to external test/bench
+/// consumers when that feature is enabled. Default `cargo test` skips this
+/// pin; CI runs it via `cargo test --features bench-helpers`.
+#[cfg(feature = "bench-helpers")]
+#[test]
+fn epoll_set_rebuilt_from_flow_table_smoke() {
+    use std::net::TcpListener;
+
+    let mut backend = SlirpBackend::new().expect("backend");
+
+    let listener = TcpListener::bind("127.0.0.1:0").expect("bind");
+    let host_stream =
+        std::net::TcpStream::connect(listener.local_addr().unwrap()).expect("connect");
+    host_stream.set_nonblocking(true).ok();
+
+    // Insert a synthetic flow (may or may not register with epoll depending on
+    // cfg context). Then reset the epoll dispatcher to a fresh empty one —
+    // this is the key step that simulates what happens after snapshot restore:
+    // the kernel-side `epoll_fd` does not survive, so a new one is created
+    // with zero registrations even though `flow_table` has live entries.
+    backend.insert_synthetic_synsent_entry(8080, 49152, 1000, host_stream);
+    backend.reset_epoll_for_snapshot_test();
+
+    let before = backend.registered_fd_count();
+    assert_eq!(
+        before, 0,
+        "after reset, epoll must have zero registered FDs (simulates post-snapshot state)"
+    );
+
+    backend.rebuild_epoll_from_flow_table();
+
+    let after = backend.registered_fd_count();
+    assert_eq!(
+        after, 1,
+        "rebuild_epoll_from_flow_table must register all live flow FDs"
+    );
+}
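+
+// To run only this feature-gated pin locally (standard cargo test name
+// filtering):
+//
+//     cargo test --features bench-helpers --test network_baseline \
+//         epoll_set_rebuilt_from_flow_table_smoke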