From 1b7e27f2c455d066534acd5e3b77d1a0633d8ce2 Mon Sep 17 00:00:00 2001 From: Sander Saares Date: Tue, 22 Jul 2025 09:04:14 +0300 Subject: [PATCH 1/6] Deadlock start --- tests/h2-tests/tests/deadlock.rs | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) create mode 100644 tests/h2-tests/tests/deadlock.rs diff --git a/tests/h2-tests/tests/deadlock.rs b/tests/h2-tests/tests/deadlock.rs new file mode 100644 index 00000000..e6d5b0e2 --- /dev/null +++ b/tests/h2-tests/tests/deadlock.rs @@ -0,0 +1,32 @@ +use std::net::SocketAddr; + +#[test] +fn logical_deadlock() { + +} + +/// Opens a TCP socket and listens for HTTP2 connections. +/// +/// Expects to read 10 KB of request body from each request, +/// to which it responds with a 200 OK with 10 KB of response body. +struct Server { + local_addr: SocketAddr, +} + +impl Server { + /// Starts a new server. Drop it to signal the server to stop and to wait for it to stop. + fn start() -> Self { + + } + + /// The address the server is listening on. Clients should send their traffic here. + fn addr(&self) -> SocketAddr { + self.local_addr + } +} + +impl Drop for Server { + fn drop(&mut self) { + todo!() + } +} \ No newline at end of file From 5e195ddd78b7c094ba0386ce87635e881dabfd55 Mon Sep 17 00:00:00 2001 From: Sander Saares Date: Tue, 22 Jul 2025 13:32:24 +0300 Subject: [PATCH 2/6] Demonstrate logical deadlock of HTTP2 connection when handling more requests than max_concurrent_streams --- tests/h2-tests/tests/deadlock.rs | 228 ++++++++++++++++++++++++++++--- 1 file changed, 206 insertions(+), 22 deletions(-) diff --git a/tests/h2-tests/tests/deadlock.rs b/tests/h2-tests/tests/deadlock.rs index e6d5b0e2..e2e0a387 100644 --- a/tests/h2-tests/tests/deadlock.rs +++ b/tests/h2-tests/tests/deadlock.rs @@ -1,32 +1,216 @@ -use std::net::SocketAddr; +use std::{ + mem, + net::SocketAddr, + sync::{ + atomic::{self, AtomicUsize}, + Arc, + }, + time::Instant, +}; +use futures::StreamExt; +use h2_support::prelude::*; +use tokio::{net::TcpStream, runtime::Runtime, sync::Barrier}; + +static REQUESTS_COMPLETED: AtomicUsize = AtomicUsize::new(0); + +// The magic happens when concurrency is much greater than max_concurrent_streams, +// though it also seems to reproduce less often with smaller numbers in general. +const CONCURRENCY: usize = 100; +const MAX_CONCURRENT_STREAMS: u32 = 20; + +// If we successfully complete this many requests per task, we can call the test a success. +// Typically, this test seems to fail in the 10-25 request range but no doubt there are many +// variables in play, as failures are somewhat random and timing-dependent. +const TARGET_REQUESTS_PER_TASK: usize = 100; + +// The test is successful if all the expected requests have been completed. +const TARGET_REQUESTS_COMPLETED: usize = CONCURRENCY * TARGET_REQUESTS_PER_TASK; + +// If no requests have been completed in this long, we consider the connection deadlocked. +const MUST_MAKE_PROGRESS_INTERVAL: Duration = Duration::from_secs(2); +const CHECK_FOR_PROGRESS_INTERVAL: Duration = Duration::from_millis(100); + +// For easy repro, run as `cargo test -p h2-tests --test deadlock -- --no-capture` #[test] fn logical_deadlock() { + let server_addr = serve(); + + // We start all the tasks at the same time for increased water hammer. + let start = Arc::new(Barrier::new(CONCURRENCY)); + + let runtime = Runtime::new().unwrap(); + + runtime.block_on(async move { + let tcp_connection = TcpStream::connect(server_addr).await.unwrap(); + let (client, h2_connection) = client::handshake(tcp_connection).await.unwrap(); + + tokio::spawn(async move { + // This just does "connection stuff" independent of any requests. + h2_connection.await.unwrap(); + }); + + let mut join_handles = Vec::new(); + + for _ in 0..CONCURRENCY { + join_handles.push(tokio::spawn({ + let mut client = client.clone(); + let start = start.clone(); + + async move { + start.wait().await; + + for _ in 0..TARGET_REQUESTS_PER_TASK { + let request = Request::builder() + .method(Method::POST) + .uri("/") + .body(()) + .unwrap(); + + let (response_future, mut request_body) = + client.send_request(request, false).unwrap(); + + request_body + .send_data(Bytes::from_static(REQUEST_BODY), true) + .unwrap(); + + // The anomaly we expect to see will result in this await never completing + // for all `CONCURRENCY` concurrent requests enqueued on this connection. + let mut response = match response_future.await { + Ok(response) => response, + Err(e) + if e.is_reset() && e.reason() == Some(Reason::REFUSED_STREAM) => + { + // Not relevant - it means the server sent a max_concurrent_streams + // limit and the client has not yet seen it, so it is not enforced. + // Not an ideal outcome in general but not relevant to this test + // scenario - the anomaly we expect to see results in deadlocked + // requests, not failing requests. + REQUESTS_COMPLETED.fetch_add(1, atomic::Ordering::Relaxed); + continue; + } + Err(e) => { + // This failure is not expected as part of the test scenario, neither for fail nor pass results. + panic!("Request failed unexpectedly: {:?}", e); + } + }; + + let response_body_stream = response.body_mut(); + let mut total_length: usize = 0; + + while let Some(Ok(chunk)) = response_body_stream.next().await { + total_length += chunk.len(); + } + + assert_eq!(total_length, 10 * 1024); + + REQUESTS_COMPLETED.fetch_add(1, atomic::Ordering::Relaxed); + } + } + })); + } + + let mut last_progress = Instant::now(); + let mut last_requests_completed = 0; + while REQUESTS_COMPLETED.load(atomic::Ordering::Relaxed) < TARGET_REQUESTS_COMPLETED { + let requests_completed = REQUESTS_COMPLETED.load(atomic::Ordering::Relaxed); + + println!("Requests completed: {requests_completed} of {TARGET_REQUESTS_COMPLETED}",); + + if requests_completed > last_requests_completed { + last_progress = Instant::now(); + } + + last_requests_completed = requests_completed; + + if Instant::now().duration_since(last_progress) > MUST_MAKE_PROGRESS_INTERVAL { + panic!( + "No requests completed in the last {:?}, deadlock likely occurred", + MUST_MAKE_PROGRESS_INTERVAL + ); + } + + thread::sleep(CHECK_FOR_PROGRESS_INTERVAL); + } + + println!("All requests completed successfully."); + + for handle in join_handles { + handle.await.unwrap(); + } + }); } -/// Opens a TCP socket and listens for HTTP2 connections. -/// -/// Expects to read 10 KB of request body from each request, +static REQUEST_BODY: &[u8] = &[77; 10 * 1024]; +static RESPONSE_BODY: &[u8] = &[88; 10 * 1024]; + +/// Starts a test server that will open a TCP socket and listen for HTTP/2 connections. +/// +/// For simplify, it will listen forever (until the test binary terminates). +/// +/// Expects to read 10 KB of request body from each request to `/`, /// to which it responds with a 200 OK with 10 KB of response body. -struct Server { - local_addr: SocketAddr, -} +fn serve() -> SocketAddr { + let runtime = Runtime::new().unwrap(); -impl Server { - /// Starts a new server. Drop it to signal the server to stop and to wait for it to stop. - fn start() -> Self { - - } + let local_addr = runtime.block_on(async { + let listener = tokio::net::TcpListener::bind(SocketAddr::from(([127, 0, 0, 1], 0))) + .await + .unwrap(); - /// The address the server is listening on. Clients should send their traffic here. - fn addr(&self) -> SocketAddr { - self.local_addr - } -} + let local_addr = listener.local_addr().unwrap(); + + tokio::spawn(async move { + loop { + println!("Waiting for connection on {}", local_addr); -impl Drop for Server { - fn drop(&mut self) { - todo!() - } -} \ No newline at end of file + let (socket, _) = listener.accept().await.unwrap(); + + println!("Accepted connection from {}", socket.peer_addr().unwrap()); + + // Fork each connection. + tokio::spawn(async move { + let mut connection = server::Builder::new() + .max_concurrent_streams(MAX_CONCURRENT_STREAMS) + .handshake(socket) + .await + .expect("Failed to handshake"); + + while let Some(incoming) = connection.next().await { + let Ok((mut request, mut responder)) = incoming else { + // This generally means connection terminated. + break; + }; + + // Fork each request. + tokio::spawn(async move { + let request_body_stream = request.body_mut(); + let mut total_length: usize = 0; + + while let Some(Ok(chunk)) = request_body_stream.next().await { + total_length += chunk.len(); + } + + assert_eq!(total_length, 10 * 1024); + + let response = + Response::builder().status(StatusCode::OK).body(()).unwrap(); + let mut body_sender = responder.send_response(response, false).unwrap(); + body_sender + .send_data(Bytes::from_static(RESPONSE_BODY), true) + .unwrap(); + }); + } + }); + } + }); + + local_addr + }); + + // Keep it running until end of process. + mem::forget(runtime); + + local_addr +} From 20759ba527c4a3c06aa87e81ed022ee2216d7438 Mon Sep 17 00:00:00 2001 From: Sander Saares Date: Tue, 22 Jul 2025 13:35:26 +0300 Subject: [PATCH 3/6] Remove misleading comment --- tests/h2-tests/tests/deadlock.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tests/h2-tests/tests/deadlock.rs b/tests/h2-tests/tests/deadlock.rs index e2e0a387..f799eff8 100644 --- a/tests/h2-tests/tests/deadlock.rs +++ b/tests/h2-tests/tests/deadlock.rs @@ -14,8 +14,7 @@ use tokio::{net::TcpStream, runtime::Runtime, sync::Barrier}; static REQUESTS_COMPLETED: AtomicUsize = AtomicUsize::new(0); -// The magic happens when concurrency is much greater than max_concurrent_streams, -// though it also seems to reproduce less often with smaller numbers in general. +// The magic happens when concurrency is much greater than max_concurrent_streams. const CONCURRENCY: usize = 100; const MAX_CONCURRENT_STREAMS: u32 = 20; From 13edf156f0602b12efbfdd3a8c46ba4e66e3b5a8 Mon Sep 17 00:00:00 2001 From: Sander Saares Date: Tue, 22 Jul 2025 13:35:47 +0300 Subject: [PATCH 4/6] Delete misleading comment --- tests/h2-tests/tests/deadlock.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/h2-tests/tests/deadlock.rs b/tests/h2-tests/tests/deadlock.rs index f799eff8..d1ef9f1b 100644 --- a/tests/h2-tests/tests/deadlock.rs +++ b/tests/h2-tests/tests/deadlock.rs @@ -30,7 +30,6 @@ const TARGET_REQUESTS_COMPLETED: usize = CONCURRENCY * TARGET_REQUESTS_PER_TASK; const MUST_MAKE_PROGRESS_INTERVAL: Duration = Duration::from_secs(2); const CHECK_FOR_PROGRESS_INTERVAL: Duration = Duration::from_millis(100); -// For easy repro, run as `cargo test -p h2-tests --test deadlock -- --no-capture` #[test] fn logical_deadlock() { let server_addr = serve(); From 1e08581487eea7088ffc5f821ba1db549327de8e Mon Sep 17 00:00:00 2001 From: Sander Saares Date: Tue, 22 Jul 2025 13:36:37 +0300 Subject: [PATCH 5/6] Tidy --- tests/h2-tests/tests/deadlock.rs | 17 ++--------------- 1 file changed, 2 insertions(+), 15 deletions(-) diff --git a/tests/h2-tests/tests/deadlock.rs b/tests/h2-tests/tests/deadlock.rs index d1ef9f1b..1c04ecc7 100644 --- a/tests/h2-tests/tests/deadlock.rs +++ b/tests/h2-tests/tests/deadlock.rs @@ -1,16 +1,13 @@ use std::{ mem, net::SocketAddr, - sync::{ - atomic::{self, AtomicUsize}, - Arc, - }, + sync::atomic::{self, AtomicUsize}, time::Instant, }; use futures::StreamExt; use h2_support::prelude::*; -use tokio::{net::TcpStream, runtime::Runtime, sync::Barrier}; +use tokio::{net::TcpStream, runtime::Runtime}; static REQUESTS_COMPLETED: AtomicUsize = AtomicUsize::new(0); @@ -34,9 +31,6 @@ const CHECK_FOR_PROGRESS_INTERVAL: Duration = Duration::from_millis(100); fn logical_deadlock() { let server_addr = serve(); - // We start all the tasks at the same time for increased water hammer. - let start = Arc::new(Barrier::new(CONCURRENCY)); - let runtime = Runtime::new().unwrap(); runtime.block_on(async move { @@ -53,11 +47,8 @@ fn logical_deadlock() { for _ in 0..CONCURRENCY { join_handles.push(tokio::spawn({ let mut client = client.clone(); - let start = start.clone(); async move { - start.wait().await; - for _ in 0..TARGET_REQUESTS_PER_TASK { let request = Request::builder() .method(Method::POST) @@ -161,12 +152,8 @@ fn serve() -> SocketAddr { tokio::spawn(async move { loop { - println!("Waiting for connection on {}", local_addr); - let (socket, _) = listener.accept().await.unwrap(); - println!("Accepted connection from {}", socket.peer_addr().unwrap()); - // Fork each connection. tokio::spawn(async move { let mut connection = server::Builder::new() From 66abdc0acd3b0b630dde935bdfba067ccae2912e Mon Sep 17 00:00:00 2001 From: Sander Saares Date: Tue, 22 Jul 2025 18:53:50 +0300 Subject: [PATCH 6/6] tokio::time::sleep --- tests/h2-tests/tests/deadlock.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/h2-tests/tests/deadlock.rs b/tests/h2-tests/tests/deadlock.rs index 1c04ecc7..a5d6686a 100644 --- a/tests/h2-tests/tests/deadlock.rs +++ b/tests/h2-tests/tests/deadlock.rs @@ -120,7 +120,7 @@ fn logical_deadlock() { ); } - thread::sleep(CHECK_FOR_PROGRESS_INTERVAL); + tokio::time::sleep(CHECK_FOR_PROGRESS_INTERVAL).await; } println!("All requests completed successfully.");