Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions crates/localup-client/src/localup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,16 @@ impl TunnelConnection {

let mut result = Vec::with_capacity(data.len() + 64);
let mut host_rewritten = false;
let mut is_upgrade = false;

// First pass: detect if this is an Upgrade request (WebSocket, etc.)
for line in &lines[1..] {
let lower = line.to_lowercase();
if lower.starts_with("upgrade:") {
is_upgrade = true;
break;
}
}

for (i, line) in lines.iter().enumerate() {
if i == 0 {
Expand All @@ -1183,12 +1193,24 @@ impl TunnelConnection {
result.extend_from_slice(b"Origin: http://");
result.extend_from_slice(local_addr.as_bytes());
result.extend_from_slice(b"\r\n");
} else if !is_upgrade && line.to_lowercase().starts_with("connection:") {
// For non-upgrade requests, skip existing Connection header —
// we force Connection: close below
continue;
} else {
result.extend_from_slice(line.as_bytes());
result.extend_from_slice(b"\r\n");
}
}

if !is_upgrade {
// Force Connection: close so the local server closes the TCP connection
// after sending its response. Without this, HTTP/1.1 keep-alive servers
// hold the connection open indefinitely, preventing the tunnel stream
// from detecting that the response is complete.
result.extend_from_slice(b"Connection: close\r\n");
}

// Terminate headers
result.extend_from_slice(b"\r\n");

Expand Down
165 changes: 106 additions & 59 deletions crates/localup-server-https/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1530,45 +1530,75 @@ impl HttpsServer {
//
// Both directions run concurrently. The response can start streaming
// back to the browser while the request body is still being uploaded.
//
// IMPORTANT: quic_send must stay alive in the main scope for the entire
// request duration. If it's dropped (e.g., moved into a spawned task that
// completes early for GET requests), the QUIC send half closes, which
// signals the client to tear down the stream before the response arrives.

// Spawn task: stream remaining H2 request body → QUIC tunnel
// quic_send is passed by reference via a channel to avoid moving it.
let (body_tx, mut body_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(8);
let h2_to_quic = tokio::spawn(async move {
loop {
match h2_body.data().await {
Some(Ok(data)) => {
let len = data.len();
let _ = h2_body.flow_control().release_capacity(len);
let msg = TunnelMessage::HttpStreamData {
stream_id,
data: data.to_vec(),
};
if quic_send.send_message(&msg).await.is_err() {
if body_tx.send(data.to_vec()).await.is_err() {
break;
}
}
Some(Err(e)) => {
debug!("H2 body read error: {}", e);
break;
}
None => {
// END_STREAM received — request body complete
break;
}
None => break, // END_STREAM — request body complete
}
}
});

// Main task: stream QUIC tunnel response → H2 response
// Wait for the first response chunk (contains HTTP/1.1 status + headers)
let first_response =
tokio::time::timeout(std::time::Duration::from_secs(30), quic_recv.recv_message())
.await;
// Main task: bidirectional streaming
// - Forward H2 request body chunks (from body_rx) → QUIC tunnel
// - Wait for first QUIC response (HTTP/1.1 headers), then stream body → H2
//
// We use tokio::select! to handle both body forwarding and response
// receiving concurrently, keeping quic_send alive the whole time.

let mut status_code: u16 = 502;
let mut resp_headers_captured: Vec<(String, String)> = Vec::new();
let mut body_done = false;

// Phase 1: Wait for first response while also forwarding any request body data
let first_data = loop {
tokio::select! {
// Forward request body chunks to QUIC tunnel
body_chunk = body_rx.recv(), if !body_done => {
match body_chunk {
Some(data) => {
let msg = TunnelMessage::HttpStreamData {
stream_id,
data,
};
if let Err(e) = quic_send.send_message(&msg).await {
debug!("QUIC send error forwarding body: {}", e);
body_done = true;
}
}
None => {
body_done = true;
}
}
}
// Wait for first response from tunnel
resp = quic_recv.recv_message() => {
break resp;
}
}
};

match first_response {
Ok(Ok(Some(TunnelMessage::HttpStreamData { data, .. }))) => {
match first_data {
Ok(Some(TunnelMessage::HttpStreamData { data, .. })) => {
// Parse the HTTP/1.1 response to extract status and headers
let response_str = String::from_utf8_lossy(&data);

Expand Down Expand Up @@ -1646,76 +1676,93 @@ impl HttpsServer {
}
}

// Stream remaining response data from tunnel to h2 client
// Phase 2: Stream remaining response data, still forwarding any body chunks
loop {
match quic_recv.recv_message().await {
Ok(Some(TunnelMessage::HttpStreamData { data, .. })) => {
if is_chunked {
let mut input = std::mem::take(&mut chunked_remainder);
input.extend_from_slice(&data);

let (decoded, remainder) = decode_chunked_streaming(&input);
chunked_remainder = remainder;

if !decoded.is_empty() {
if let Err(e) = h2_send.send_data(Bytes::from(decoded), false) {
tokio::select! {
// Forward remaining request body chunks
body_chunk = body_rx.recv(), if !body_done => {
match body_chunk {
Some(data) => {
let msg = TunnelMessage::HttpStreamData {
stream_id,
data,
};
if let Err(e) = quic_send.send_message(&msg).await {
debug!("QUIC send error forwarding body: {}", e);
body_done = true;
}
}
None => {
body_done = true;
}
}
}
// Stream response data from tunnel to h2 client
resp = quic_recv.recv_message() => {
match resp {
Ok(Some(TunnelMessage::HttpStreamData { data, .. })) => {
if is_chunked {
let mut input = std::mem::take(&mut chunked_remainder);
input.extend_from_slice(&data);

let (decoded, remainder) = decode_chunked_streaming(&input);
chunked_remainder = remainder;

if !decoded.is_empty() {
if let Err(e) = h2_send.send_data(Bytes::from(decoded), false) {
debug!("h2 send error in streaming proxy: {}", e);
break;
}
}
} else if let Err(e) = h2_send.send_data(Bytes::from(data), false) {
debug!("h2 send error in streaming proxy: {}", e);
break;
}
}
} else if let Err(e) = h2_send.send_data(Bytes::from(data), false) {
debug!("h2 send error in streaming proxy: {}", e);
break;
Ok(Some(TunnelMessage::HttpStreamClose { .. })) => {
debug!("Tunnel closed stream {} (end of response)", stream_id);
let _ = h2_send.send_data(Bytes::new(), true);
break;
}
Ok(None) => {
debug!("QUIC stream ended for stream {}", stream_id);
let _ = h2_send.send_data(Bytes::new(), true);
break;
}
Err(e) => {
debug!("QUIC recv error in streaming proxy: {}", e);
let _ = h2_send.send_data(Bytes::new(), true);
break;
}
_ => {
warn!("Unexpected message on h2 streaming proxy");
}
}
}
Ok(Some(TunnelMessage::HttpStreamClose { .. })) => {
debug!("Tunnel closed stream {} (end of response)", stream_id);
let _ = h2_send.send_data(Bytes::new(), true);
break;
}
Ok(None) => {
debug!("QUIC stream ended for stream {}", stream_id);
let _ = h2_send.send_data(Bytes::new(), true);
break;
}
Err(e) => {
debug!("QUIC recv error in streaming proxy: {}", e);
let _ = h2_send.send_data(Bytes::new(), true);
break;
}
_ => {
warn!("Unexpected message on h2 streaming proxy");
}
}
}
}
Ok(Ok(Some(TunnelMessage::HttpStreamClose { .. }))) | Ok(Ok(None)) => {
Ok(Some(TunnelMessage::HttpStreamClose { .. })) | Ok(None) => {
warn!("Tunnel stream closed without response");
let response = http::Response::builder().status(502).body(()).unwrap();
let mut send = send_response.send_response(response, false)?;
send.send_data(Bytes::from("Tunnel closed"), true)?;
}
Ok(Ok(Some(other))) => {
Ok(Some(other)) => {
warn!("Unexpected tunnel message: {:?}", other);
let response = http::Response::builder().status(502).body(()).unwrap();
let mut send = send_response.send_response(response, false)?;
send.send_data(Bytes::from("Unexpected tunnel response"), true)?;
}
Ok(Err(e)) => {
Err(e) => {
error!("Tunnel receive error: {}", e);
let response = http::Response::builder().status(502).body(()).unwrap();
let mut send = send_response.send_response(response, false)?;
send.send_data(Bytes::from("Tunnel error"), true)?;
}
Err(_) => {
error!("Tunnel response timeout after 30s");
let response = http::Response::builder().status(504).body(()).unwrap();
let mut send = send_response.send_response(response, false)?;
send.send_data(Bytes::from("Gateway Timeout"), true)?;
}
}

// Wait for request body forwarding to finish (or abort if response is done)
// Clean up the body forwarding task
h2_to_quic.abort();
let _ = h2_to_quic.await;

Expand Down