diff --git a/benches/network.rs b/benches/network.rs index c3441f24..6fd8720a 100644 --- a/benches/network.rs +++ b/benches/network.rs @@ -832,18 +832,15 @@ mod linux_benches { } /// Wall-clock latency of the full inbound port-forward path: host - /// `TcpStream::connect` → listener thread `accept()` (polled every - /// `PORT_FORWARD_POLL_INTERVAL = 50 ms`) → mpsc channel push → - /// `process_pending_inbound_accepts` → `synthesize_inbound_syn` → - /// first SYN frame visible in `drain_to_guest` output. + /// `TcpStream::connect` → epoll readiness event → `process_listener_readiness` + /// accept → mpsc channel push → `process_pending_inbound_accepts` → + /// `synthesize_inbound_syn` → first SYN frame visible in `drain_to_guest` + /// output. /// - /// The 50 ms polling ceiling means the distribution will be roughly - /// uniform on [0, 50 ms] — a median around 25 ms is expected and normal, - /// not a bug. Regressions in the inbound state machine or the listener - /// poll loop will shift the distribution upward beyond 50 ms. - /// - /// Regressions in the inbound state machine or listener-poll loop will - /// surface numerically against this measurement. + /// The listener FD is registered with `EpollDispatch`; accept latency is + /// bounded by the epoll_wait cadence (≤ 5 ms active), not a fixed poll + /// interval. Sub-millisecond medians are expected. Regressions in the + /// inbound state machine will surface numerically against this measurement. #[divan::bench(sample_count = 20, sample_size = 1)] fn port_forward_accept_latency(bencher: Bencher) { const GUEST_PORT: u16 = 8080; @@ -870,9 +867,8 @@ mod linux_benches { bencher.bench_local(|| { // Spawn a worker thread that connects to the host listener port. - // The listener thread inside SlirpBackend will accept() it on the - // next poll (within PORT_FORWARD_POLL_INTERVAL = 50ms) and push - // the accepted stream onto the mpsc channel. 
+ // EpollDispatch fires readiness; process_listener_readiness accepts + // and pushes the stream onto the mpsc channel. let connect_addr = format!("127.0.0.1:{host_port}"); let worker = thread::spawn(move || { let addr: std::net::SocketAddr = connect_addr.parse().expect("parse connect addr"); diff --git a/src/network/slirp.rs b/src/network/slirp.rs index 827134ae..9f5047da 100644 --- a/src/network/slirp.rs +++ b/src/network/slirp.rs @@ -33,7 +33,6 @@ use std::net::{Ipv4Addr, SocketAddr, TcpListener, TcpStream, UdpSocket}; use std::os::fd::{AsRawFd, FromRawFd}; use std::sync::atomic::{AtomicBool, AtomicU64, AtomicU8, Ordering}; use std::sync::{mpsc, Arc, Mutex}; -use std::thread::JoinHandle; use std::time::{Duration, Instant}; use crate::network::epoll_dispatch::{EpollDispatch, EpollEvent, RegisterMode, Waker}; @@ -92,12 +91,6 @@ const MAX_QUEUE_SIZE: usize = 64; const TCP_WINDOW: u16 = 65535; const UDP_IDLE_TIMEOUT: Duration = Duration::from_secs(60); -/// Sleep interval for the port-forward listener thread between non-blocking -/// accept polls. Short enough to keep accept latency low; long enough to -/// avoid busy-waiting the host CPU. -#[allow(dead_code)] -const PORT_FORWARD_POLL_INTERVAL: Duration = Duration::from_millis(50); - /// ICMP unprivileged probe state. /// /// `0` = unknown (not yet probed), `1` = available, `2` = unavailable @@ -118,6 +111,7 @@ const PROTO_TAG_MASK: u64 = 0xFF00_0000_0000_0000; const PROTO_TAG_TCP: u64 = 0x0100_0000_0000_0000; const PROTO_TAG_UDP: u64 = 0x0200_0000_0000_0000; const PROTO_TAG_ICMP: u64 = 0x0300_0000_0000_0000; +const PROTO_TAG_LISTEN: u64 = 0x0400_0000_0000_0000; /// Monotonic counter for flow token allocation. The lower 56 bits of each /// `FlowToken` are drawn from here; the upper 8 bits carry `PROTO_TAG_*`. @@ -135,14 +129,24 @@ fn next_flow_token(proto_tag: u64) -> u64 { proto_tag | counter } +/// Build an epoll token for a port-forward listener FD. 
+/// +/// The high byte carries `PROTO_TAG_LISTEN`; the low 16 bits encode the +/// host port. Each port-forward rule has a distinct host port, so tokens +/// are unique across all registered listeners. +fn flow_token_for_listener(host_port: u16) -> u64 { + PROTO_TAG_LISTEN | u64::from(host_port) +} + // ────────────────────────────────────────────────────────────────────── // Inbound port-forward accept channel // ────────────────────────────────────────────────────────────────────── /// One accepted host-side TCP connection waiting to be forwarded into the guest. /// -/// Produced by [`run_port_forward_listener`] and consumed by -/// [`SlirpBackend::process_pending_inbound_accepts`] on the net-poll thread. +/// Produced by [`SlirpBackend::process_listener_readiness`] (epoll-driven +/// accept) and consumed by [`SlirpBackend::process_pending_inbound_accepts`] +/// on the net-poll thread. pub(crate) struct InboundAccept { /// The accepted host-side TCP stream (non-blocking after accept). host_stream: TcpStream, @@ -521,15 +525,15 @@ pub struct SlirpBackend { /// dispatch. Maintained in sync with `flow_table`: every insert adds an /// entry; every remove clears it. token_to_key: HashMap, - /// Background threads bound to host TCP ports for inbound port - /// forwarding. Each handle corresponds to one `nat::PortForward` rule. - /// Joined on `Drop`. - port_forward_listeners: Vec>, - /// Shutdown signal for `port_forward_listeners`. Set true on Drop; - /// each listener thread checks it after every accept and exits cleanly. - port_forward_shutdown: Arc, - /// Receiver end of the accept channel fed by [`run_port_forward_listener`] - /// threads. Processed on the net-poll thread in + /// Live `TcpListener`s for each TCP port-forward rule, keyed by host port. + /// The tuple value is `(listener, guest_port)`. Each listener's FD is + /// registered with `EpollDispatch` under `PROTO_TAG_LISTEN`; readiness + /// events drive the accept loop on the net-poll thread. 
No dedicated
+    /// polling thread per rule.
+    port_forward_listeners: HashMap<u16, (TcpListener, u16)>,
+    /// Receiver end of the accept channel fed by
+    /// [`bind_port_forward_listeners`] via [`SlirpBackend::process_listener_readiness`].
+    /// Processed on the net-poll thread in
     /// [`SlirpBackend::process_pending_inbound_accepts`].
     pending_inbound_accepts: mpsc::Receiver<InboundAccept>,
     /// Sender end of `pending_inbound_accepts`. Kept alive so the channel
@@ -639,15 +643,15 @@ impl SlirpBackend {
             nat.deny_cidrs.len(), nat.port_forwards.len(), dns_servers
         );
 
-        // Spawn listener threads for port-forwards.
-        let port_forward_shutdown = Arc::new(AtomicBool::new(false));
-        let (port_forward_listeners, pending_inbound_accepts, accept_sender) =
-            spawn_port_forward_listeners(&nat, &port_forward_shutdown);
+        let (accept_tx, accept_rx) = mpsc::channel::<InboundAccept>();
 
         let epoll_inner = EpollDispatch::new()?;
         let epoll_waker = epoll_inner.waker();
         let epoll = Arc::new(epoll_inner);
 
+        // Bind listeners for port-forwards and register their FDs with epoll.
+        let port_forward_listeners = bind_port_forward_listeners(&nat, &epoll);
+
         Ok(Self {
             queue,
             iface,
@@ -664,9 +668,8 @@
             flow_table: HashMap::new(),
             token_to_key: HashMap::new(),
             port_forward_listeners,
-            port_forward_shutdown,
-            pending_inbound_accepts,
-            accept_sender,
+            pending_inbound_accepts: accept_rx,
+            accept_sender: accept_tx,
             epoll,
             epoll_waker,
             pending_events: Mutex::new(Vec::new()),
@@ -701,10 +704,77 @@
-    /// Drain the inbound-accept channel and seed a `SynSent` flow-table entry
-    /// plus a synthesized SYN frame for each accepted connection.
-    ///
+    /// Accept connections from any port-forward listeners whose FDs are ready
+    /// in `ready` and push them onto the inbound-accept channel for
+    /// [`process_pending_inbound_accepts`] to consume.
+    ///
+    /// Drains until `WouldBlock` so that a burst of connections arriving
+    /// between two epoll wakeups is not spread across multiple ticks.
+    fn process_listener_readiness(&mut self, ready: &[EpollEvent]) {
+        // Accepted connections are collected here first so that the borrow on
+        // `port_forward_listeners` ends before we call `accept_sender.send`.
+        let mut accepted_batch: Vec<InboundAccept> = Vec::new();
+        let mut sender_failed = false;
+
+        for event in ready {
+            if !event.readable || event.token & PROTO_TAG_MASK != PROTO_TAG_LISTEN {
+                continue;
+            }
+            let host_port = (event.token & 0xFFFF) as u16;
+            let Some((listener, guest_port)) = self.port_forward_listeners.get(&host_port) else {
+                continue;
+            };
+            let guest_port = *guest_port;
+            // Drain the listener — multiple connections may have arrived in one
+            // EPOLLIN edge.
+            loop {
+                match listener.accept() {
+                    Ok((stream, peer_addr)) => {
+                        let high_port = peer_addr.port();
+                        let _ = stream.set_nonblocking(true);
+                        trace!(
+                            host_port,
+                            guest_port,
+                            high_port,
+                            peer = %peer_addr,
+                            "SLIRP port-forward: accepted connection"
+                        );
+                        accepted_batch.push(InboundAccept {
+                            host_stream: stream,
+                            high_port,
+                            guest_port,
+                        });
+                    }
+                    Err(ref would_block) if would_block.kind() == io::ErrorKind::WouldBlock => {
+                        break;
+                    }
+                    Err(accept_error) => {
+                        warn!(
+                            host_port,
+                            error = %accept_error,
+                            "SLIRP port-forward: accept error"
+                        );
+                        break;
+                    }
+                }
+            }
+        }
+
+        // Borrow of `port_forward_listeners` has ended; send the batch.
+        for accepted in accepted_batch {
+            if self.accept_sender.send(accepted).is_err() {
+                sender_failed = true;
+                break;
+            }
+        }
+        let _ = sender_failed; // receiver drop handled gracefully on next tick
+    }
+
+    /// Drain the inbound-accept channel and seed a `SynSent` flow-table entry
+    /// plus a synthesized SYN frame for each accepted connection.
+    ///
     /// Called at the top of [`drain_to_guest`] so all `SlirpBackend` mutation
     /// stays on the net-poll thread — same single-writer lock model as the rest
-    /// of the relay pipeline. The listener threads only enqueue via the mpsc
-    /// channel; they never touch `flow_table` or `inject_to_guest` directly.
+    /// of the relay pipeline.
`process_listener_readiness` enqueues accepted + /// connections via the mpsc channel; this method drains that channel and + /// seeds the flow table. fn process_pending_inbound_accepts(&mut self) { loop { let accepted = match self.pending_inbound_accepts.try_recv() { @@ -803,9 +873,6 @@ impl SlirpBackend { /// /// See [`crate::network::NetworkBackend::drain_to_guest`]. pub fn drain_to_guest(&mut self, out: &mut Vec>) { - // 0. Process any accepted host-side connections from port-forward listeners. - self.process_pending_inbound_accepts(); - // Check rx_queue size before polling. let rx_count = { let q = self.queue.lock().unwrap(); @@ -855,6 +922,13 @@ impl SlirpBackend { events }; + // 0a. Accept any newly-ready listener connections (may push into + // accept_sender for the next step). + self.process_listener_readiness(&ready); + + // 0b. Drain the accept channel (epoll-driven listeners + test helpers). + self.process_pending_inbound_accepts(); + // 4. Process TCP NAT data relay. self.relay_tcp_nat_data(&ready); @@ -2377,132 +2451,58 @@ fn ipv4_checksum(header: &[u8]) -> u16 { !sum as u16 } -/// Spawn one listener thread per TCP port-forward rule and return the join -/// handles, the receiver end of the accept channel, and the sender end. -/// -/// The caller stores the handles in `SlirpBackend::port_forward_listeners`, -/// the receiver in `SlirpBackend::pending_inbound_accepts`, and the sender in -/// `SlirpBackend::accept_sender` (so the channel stays open when zero listener -/// threads are running, e.g. in tests). +/// Bind one `TcpListener` per TCP port-forward rule, register each with +/// `epoll`, and return a map from host port to `(listener, guest_port)`. /// -/// When `nat.port_forwards` contains no TCP rules the returned `Vec` is empty -/// and no background threads are spawned. 
-pub(crate) fn spawn_port_forward_listeners( +/// Rules whose bind or `set_nonblocking` calls fail are skipped with a +/// `WARN` log; the returned map contains only the rules that succeeded. +/// When `nat.port_forwards` contains no TCP rules the map is empty. +pub(crate) fn bind_port_forward_listeners( nat: &nat::Rules, - shutdown: &Arc, -) -> ( - Vec>, - mpsc::Receiver, - mpsc::Sender, -) { - let (accept_tx, accept_rx) = mpsc::channel::(); - let mut handles = Vec::new(); + epoll: &Arc, +) -> HashMap { + let mut listeners = HashMap::new(); for port_forward in &nat.port_forwards { if port_forward.proto != nat::ForwardProto::Tcp { continue; } let host_port = port_forward.host_port; let guest_port = port_forward.guest_port; - let tx = accept_tx.clone(); - let shutdown = Arc::clone(shutdown); - let handle = std::thread::Builder::new() - .name(format!("slirp-pf-{host_port}-{guest_port}")) - .spawn(move || { - run_port_forward_listener(host_port, guest_port, tx, shutdown); - }) - .expect("spawn port-forward listener thread"); - handles.push(handle); - } - (handles, accept_rx, accept_tx) -} - -/// Main loop for a port-forward listener thread. -/// -/// Binds `127.0.0.1:host_port`, accepts connections in non-blocking mode, -/// and forwards each accepted [`TcpStream`] to the net-poll thread via -/// `accept_tx`. The peer's remote port is used as `high_port` — it is -/// unique per connection and requires no extra allocation. -/// -/// The thread exits when `shutdown` is `true` or when `accept_tx.send` -/// fails (receiver dropped — backend is shutting down). 
-fn run_port_forward_listener( - host_port: u16, - guest_port: u16, - accept_tx: mpsc::Sender, - shutdown: Arc, -) { - let listener = match TcpListener::bind(("127.0.0.1", host_port)) { - Ok(listener) => listener, - Err(bind_error) => { + let listener = match TcpListener::bind(("127.0.0.1", host_port)) { + Ok(l) => l, + Err(bind_error) => { + warn!( + host_port, + error = %bind_error, + "SLIRP port-forward: bind failed, rule disabled" + ); + continue; + } + }; + if let Err(nb_error) = listener.set_nonblocking(true) { warn!( host_port, - error = %bind_error, - "SLIRP port-forward: bind failed, port-forward disabled" + error = %nb_error, + "SLIRP port-forward: set_nonblocking failed, rule disabled" ); - return; + continue; } - }; - if let Err(nb_error) = listener.set_nonblocking(true) { - warn!( + let token = flow_token_for_listener(host_port); + if let Err(reg_error) = epoll.register(listener.as_raw_fd(), token, RegisterMode::Read) { + warn!( + host_port, + error = %reg_error, + "SLIRP port-forward: epoll register failed, rule disabled" + ); + continue; + } + debug!( host_port, - error = %nb_error, - "SLIRP port-forward: set_nonblocking failed, port-forward disabled" + guest_port, "SLIRP port-forward: listening on 127.0.0.1 (epoll-driven)" ); - return; - } - debug!( - host_port, - guest_port, "SLIRP port-forward: listening on 127.0.0.1" - ); - - while !shutdown.load(Ordering::Relaxed) { - match listener.accept() { - Ok((stream, peer_addr)) => { - let high_port = peer_addr.port(); - if let Err(nb_error) = stream.set_nonblocking(true) { - warn!( - host_port, - guest_port, - high_port, - error = %nb_error, - "SLIRP port-forward: accepted stream set_nonblocking failed, dropping" - ); - continue; - } - trace!( - host_port, - guest_port, - high_port, - peer = %peer_addr, - "SLIRP port-forward: accepted connection" - ); - let accepted = InboundAccept { - host_stream: stream, - high_port, - guest_port, - }; - if accept_tx.send(accepted).is_err() { - debug!( - host_port, - 
"SLIRP port-forward: backend gone, listener exiting" - ); - return; - } - } - Err(ref would_block) if would_block.kind() == io::ErrorKind::WouldBlock => { - std::thread::sleep(PORT_FORWARD_POLL_INTERVAL); - } - Err(accept_error) => { - warn!( - host_port, - error = %accept_error, - "SLIRP port-forward: accept error" - ); - std::thread::sleep(PORT_FORWARD_POLL_INTERVAL); - } - } + listeners.insert(host_port, (listener, guest_port)); } - debug!(host_port, "SLIRP port-forward: listener shutting down"); + listeners } impl Default for SlirpBackend { @@ -2511,15 +2511,6 @@ impl Default for SlirpBackend { } } -impl Drop for SlirpBackend { - fn drop(&mut self) { - self.port_forward_shutdown.store(true, Ordering::Relaxed); - for handle in std::mem::take(&mut self.port_forward_listeners) { - let _ = handle.join(); - } - } -} - impl SlirpBackend { /// Re-register every live host FD in `flow_table` with the current epoll /// dispatcher and rebuild `token_to_key`. Called from snapshot restore: @@ -2915,27 +2906,27 @@ mod tests { assert_eq!(syn_count, 1, "exactly one SYN must be queued for the guest"); } - /// Verify that `with_security` spawns exactly one listener thread when - /// given one TCP port-forward rule, and zero threads when given none. + /// Verify that `with_security` binds exactly one epoll-driven listener when + /// given one TCP port-forward rule, and zero listeners when given none. #[test] - fn with_security_spawns_listener_per_tcp_port_forward() { - // Empty port-forwards: no listener threads. + fn with_security_binds_listener_per_tcp_port_forward() { + // Empty port-forwards: no listeners. let empty = SlirpBackend::with_security(64, 50, &["169.254.0.0/16".to_string()], &[]) .expect("SlirpBackend::with_security (empty)"); assert_eq!( empty.port_forward_listeners.len(), 0, - "zero listener threads for empty port_forwards" + "zero listeners for empty port_forwards" ); - // One TCP port-forward: exactly one listener thread. 
+ // One TCP port-forward: exactly one listener. let one = SlirpBackend::with_security(64, 50, &["169.254.0.0/16".to_string()], &[(18080, 80)]) .expect("SlirpBackend::with_security (one forward)"); assert_eq!( one.port_forward_listeners.len(), 1, - "one listener thread for one TCP port-forward rule" + "one listener for one TCP port-forward rule" ); } }