From 3b98d288a477078c8ce67fdb9240acbb21c55378 Mon Sep 17 00:00:00 2001 From: David Viejo Date: Sat, 7 Mar 2026 11:27:57 +0100 Subject: [PATCH 1/2] fix(https): Keep QUIC send half alive during H2 passthrough streaming The previous passthrough rewrite moved quic_send into a spawned task for body forwarding. For GET requests (no body), the task completed immediately, dropping quic_send and closing the QUIC send half. The client interpreted this as stream termination and closed before the response arrived, causing 502 "Tunnel closed" on every request. Fix: Use an mpsc channel to forward H2 body chunks to the main task, which owns quic_send for the entire request lifetime. tokio::select! handles both body forwarding and response streaming concurrently. --- crates/localup-server-https/src/server.rs | 165 ++++++++++++++-------- 1 file changed, 106 insertions(+), 59 deletions(-) diff --git a/crates/localup-server-https/src/server.rs b/crates/localup-server-https/src/server.rs index ff314a5..2ab7b15 100644 --- a/crates/localup-server-https/src/server.rs +++ b/crates/localup-server-https/src/server.rs @@ -1530,19 +1530,22 @@ impl HttpsServer { // // Both directions run concurrently. The response can start streaming // back to the browser while the request body is still being uploaded. + // + // IMPORTANT: quic_send must stay alive in the main scope for the entire + // request duration. If it's dropped (e.g., moved into a spawned task that + // completes early for GET requests), the QUIC send half closes, which + // signals the client to tear down the stream before the response arrives. // Spawn task: stream remaining H2 request body → QUIC tunnel + // Body chunks are forwarded over a channel so quic_send never moves out of this scope. 
+ let (body_tx, mut body_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(8); let h2_to_quic = tokio::spawn(async move { loop { match h2_body.data().await { Some(Ok(data)) => { let len = data.len(); let _ = h2_body.flow_control().release_capacity(len); - let msg = TunnelMessage::HttpStreamData { - stream_id, - data: data.to_vec(), - }; - if quic_send.send_message(&msg).await.is_err() { + if body_tx.send(data.to_vec()).await.is_err() { break; } } @@ -1550,25 +1553,52 @@ impl HttpsServer { debug!("H2 body read error: {}", e); break; } - None => { - // END_STREAM received — request body complete - break; - } + None => break, // END_STREAM — request body complete } } }); - // Main task: stream QUIC tunnel response → H2 response - // Wait for the first response chunk (contains HTTP/1.1 status + headers) - let first_response = - tokio::time::timeout(std::time::Duration::from_secs(30), quic_recv.recv_message()) - .await; + // Main task: bidirectional streaming + // - Forward H2 request body chunks (from body_rx) → QUIC tunnel + // - Wait for first QUIC response (HTTP/1.1 headers), then stream body → H2 + // + // We use tokio::select! to handle both body forwarding and response + // receiving concurrently, keeping quic_send alive the whole time. let mut status_code: u16 = 502; let mut resp_headers_captured: Vec<(String, String)> = Vec::new(); + let mut body_done = false; + + // Phase 1: Wait for first response while also forwarding any request body data + let first_data = loop { + tokio::select! 
{ + // Forward request body chunks to QUIC tunnel + body_chunk = body_rx.recv(), if !body_done => { + match body_chunk { + Some(data) => { + let msg = TunnelMessage::HttpStreamData { + stream_id, + data, + }; + if let Err(e) = quic_send.send_message(&msg).await { + debug!("QUIC send error forwarding body: {}", e); + body_done = true; + } + } + None => { + body_done = true; + } + } + } + // Wait for first response from tunnel + resp = quic_recv.recv_message() => { + break resp; + } + } + }; - match first_response { - Ok(Ok(Some(TunnelMessage::HttpStreamData { data, .. }))) => { + match first_data { + Ok(Some(TunnelMessage::HttpStreamData { data, .. })) => { // Parse the HTTP/1.1 response to extract status and headers let response_str = String::from_utf8_lossy(&data); @@ -1646,76 +1676,93 @@ impl HttpsServer { } } - // Stream remaining response data from tunnel to h2 client + // Phase 2: Stream remaining response data, still forwarding any body chunks loop { - match quic_recv.recv_message().await { - Ok(Some(TunnelMessage::HttpStreamData { data, .. })) => { - if is_chunked { - let mut input = std::mem::take(&mut chunked_remainder); - input.extend_from_slice(&data); - - let (decoded, remainder) = decode_chunked_streaming(&input); - chunked_remainder = remainder; - - if !decoded.is_empty() { - if let Err(e) = h2_send.send_data(Bytes::from(decoded), false) { + tokio::select! { + // Forward remaining request body chunks + body_chunk = body_rx.recv(), if !body_done => { + match body_chunk { + Some(data) => { + let msg = TunnelMessage::HttpStreamData { + stream_id, + data, + }; + if let Err(e) = quic_send.send_message(&msg).await { + debug!("QUIC send error forwarding body: {}", e); + body_done = true; + } + } + None => { + body_done = true; + } + } + } + // Stream response data from tunnel to h2 client + resp = quic_recv.recv_message() => { + match resp { + Ok(Some(TunnelMessage::HttpStreamData { data, .. 
})) => { + if is_chunked { + let mut input = std::mem::take(&mut chunked_remainder); + input.extend_from_slice(&data); + + let (decoded, remainder) = decode_chunked_streaming(&input); + chunked_remainder = remainder; + + if !decoded.is_empty() { + if let Err(e) = h2_send.send_data(Bytes::from(decoded), false) { + debug!("h2 send error in streaming proxy: {}", e); + break; + } + } + } else if let Err(e) = h2_send.send_data(Bytes::from(data), false) { debug!("h2 send error in streaming proxy: {}", e); break; } } - } else if let Err(e) = h2_send.send_data(Bytes::from(data), false) { - debug!("h2 send error in streaming proxy: {}", e); - break; + Ok(Some(TunnelMessage::HttpStreamClose { .. })) => { + debug!("Tunnel closed stream {} (end of response)", stream_id); + let _ = h2_send.send_data(Bytes::new(), true); + break; + } + Ok(None) => { + debug!("QUIC stream ended for stream {}", stream_id); + let _ = h2_send.send_data(Bytes::new(), true); + break; + } + Err(e) => { + debug!("QUIC recv error in streaming proxy: {}", e); + let _ = h2_send.send_data(Bytes::new(), true); + break; + } + _ => { + warn!("Unexpected message on h2 streaming proxy"); + } } } - Ok(Some(TunnelMessage::HttpStreamClose { .. })) => { - debug!("Tunnel closed stream {} (end of response)", stream_id); - let _ = h2_send.send_data(Bytes::new(), true); - break; - } - Ok(None) => { - debug!("QUIC stream ended for stream {}", stream_id); - let _ = h2_send.send_data(Bytes::new(), true); - break; - } - Err(e) => { - debug!("QUIC recv error in streaming proxy: {}", e); - let _ = h2_send.send_data(Bytes::new(), true); - break; - } - _ => { - warn!("Unexpected message on h2 streaming proxy"); - } } } } - Ok(Ok(Some(TunnelMessage::HttpStreamClose { .. }))) | Ok(Ok(None)) => { + Ok(Some(TunnelMessage::HttpStreamClose { .. 
})) | Ok(None) => { warn!("Tunnel stream closed without response"); let response = http::Response::builder().status(502).body(()).unwrap(); let mut send = send_response.send_response(response, false)?; send.send_data(Bytes::from("Tunnel closed"), true)?; } - Ok(Ok(Some(other))) => { + Ok(Some(other)) => { warn!("Unexpected tunnel message: {:?}", other); let response = http::Response::builder().status(502).body(()).unwrap(); let mut send = send_response.send_response(response, false)?; send.send_data(Bytes::from("Unexpected tunnel response"), true)?; } - Ok(Err(e)) => { + Err(e) => { error!("Tunnel receive error: {}", e); let response = http::Response::builder().status(502).body(()).unwrap(); let mut send = send_response.send_response(response, false)?; send.send_data(Bytes::from("Tunnel error"), true)?; } - Err(_) => { - error!("Tunnel response timeout after 30s"); - let response = http::Response::builder().status(504).body(()).unwrap(); - let mut send = send_response.send_response(response, false)?; - send.send_data(Bytes::from("Gateway Timeout"), true)?; - } } - // Wait for request body forwarding to finish (or abort if response is done) + // Clean up the body forwarding task h2_to_quic.abort(); let _ = h2_to_quic.await; From 7e728077c719927b973f3a30d243e3b995952614 Mon Sep 17 00:00:00 2001 From: David Viejo Date: Sat, 7 Mar 2026 11:38:04 +0100 Subject: [PATCH 2/2] fix(client): Force Connection: close on proxied HTTP/1.1 requests The transparent streaming proxy kept TCP connections alive (HTTP/1.1 default), so local servers like Next.js never closed the connection after sending their response. This meant local_read.read() blocked indefinitely, delaying the tunnel stream close by ~6 seconds. Fix: Inject Connection: close header when rewriting Host headers for non-upgrade requests. WebSocket upgrade requests preserve the original Connection: Upgrade header. 
--- crates/localup-client/src/localup.rs | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/crates/localup-client/src/localup.rs b/crates/localup-client/src/localup.rs index e4efa73..280f957 100644 --- a/crates/localup-client/src/localup.rs +++ b/crates/localup-client/src/localup.rs @@ -1163,6 +1163,16 @@ impl TunnelConnection { let mut result = Vec::with_capacity(data.len() + 64); let mut host_rewritten = false; + let mut is_upgrade = false; + + // First pass: detect if this is an Upgrade request (WebSocket, etc.) + for line in &lines[1..] { + let lower = line.to_lowercase(); + if lower.starts_with("upgrade:") { + is_upgrade = true; + break; + } + } for (i, line) in lines.iter().enumerate() { if i == 0 { @@ -1183,12 +1193,24 @@ impl TunnelConnection { result.extend_from_slice(b"Origin: http://"); result.extend_from_slice(local_addr.as_bytes()); result.extend_from_slice(b"\r\n"); + } else if !is_upgrade && line.to_lowercase().starts_with("connection:") { + // For non-upgrade requests, skip existing Connection header — + // we force Connection: close below + continue; } else { result.extend_from_slice(line.as_bytes()); result.extend_from_slice(b"\r\n"); } } + if !is_upgrade { + // Force Connection: close so the local server closes the TCP connection + // after sending its response. Without this, HTTP/1.1 keep-alive servers + // hold the connection open indefinitely, preventing the tunnel stream + // from detecting that the response is complete. + result.extend_from_slice(b"Connection: close\r\n"); + } + // Terminate headers result.extend_from_slice(b"\r\n");