From 11d765282e73e629b2c23402151e15724a4d9c91 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Tue, 29 Jul 2025 09:45:01 -0300 Subject: [PATCH 1/9] signal with channel --- concurrency/src/tasks/gen_server.rs | 62 ++++++++++++++++----- concurrency/src/tasks/stream_tests.rs | 8 +-- concurrency/src/tasks/timer_tests.rs | 6 +- examples/bank/src/main.rs | 2 +- examples/blocking_genserver/main.rs | 4 +- examples/name_server/src/main.rs | 2 +- examples/name_server_with_error/src/main.rs | 3 +- examples/updater/src/main.rs | 3 +- 8 files changed, 62 insertions(+), 28 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 88eb743..126ab86 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -25,7 +25,7 @@ impl Clone for GenServerHandle { } impl GenServerHandle { - pub(crate) fn new(gen_server: G) -> Self { + pub(crate) fn new(gen_server: G) -> Result { let (tx, mut rx) = mpsc::channel::>(); let cancellation_token = CancellationToken::new(); let handle = GenServerHandle { @@ -33,16 +33,28 @@ impl GenServerHandle { cancellation_token, }; let handle_clone = handle.clone(); + + // We create a channel of single use to signal when the GenServer has started. + let (mut start_signal_tx, start_signal_rx) = std::sync::mpsc::channel(); // Ignore the JoinHandle for now. Maybe we'll use it in the future let _join_handle = rt::spawn(async move { - if gen_server.run(&handle, &mut rx).await.is_err() { + if gen_server + .run(&handle, &mut rx, &mut start_signal_tx) + .await + .is_err() + { tracing::trace!("GenServer crashed") }; }); - handle_clone + + // Briefly block until the GenServer signals us that it has started + match start_signal_rx.recv() { + Ok(true) => Ok(handle_clone), + _ => Err(GenServerError::Initialization), + } } - pub(crate) fn new_blocking(gen_server: G) -> Self { + pub(crate) fn new_blocking(gen_server: G) -> Result { let (tx, mut rx) = mpsc::channel::>(); let cancellation_token = CancellationToken::new(); let handle = GenServerHandle { @@ -50,15 +62,27 @@ impl GenServerHandle { cancellation_token, }; let handle_clone = handle.clone(); + + // We create a channel of single use to signal when the GenServer has started. + let (mut start_signal_tx, start_signal_rx) = std::sync::mpsc::channel(); // Ignore the JoinHandle for now. Maybe we'll use it in the future let _join_handle = rt::spawn_blocking(|| { rt::block_on(async move { - if gen_server.run(&handle, &mut rx).await.is_err() { + if gen_server + .run(&handle, &mut rx, &mut start_signal_tx) + .await + .is_err() + { tracing::trace!("GenServer crashed") }; }) }); - handle_clone + + // Briefly block until the GenServer signals us that it has started + match start_signal_rx.recv() { + Ok(true) => Ok(handle_clone), + _ => Err(GenServerError::Initialization), + } } pub fn sender(&self) -> mpsc::Sender> { @@ -126,7 +150,7 @@ pub trait GenServer: Send + Sized + Clone { type OutMsg: Send + Sized; type Error: Debug + Send; - fn start(self) -> GenServerHandle { + fn start(self) -> Result, GenServerError> { GenServerHandle::new(self) } @@ -135,7 +159,7 @@ pub trait GenServer: Send + Sized + Clone { /// or other blocking tasks need to be in their own separate thread, and the OS /// will manage them through hardware interrupts. /// Start blocking provides such thread. - fn start_blocking(self) -> GenServerHandle { + fn start_blocking(self) -> Result, GenServerError> { GenServerHandle::new_blocking(self) } @@ -143,6 +167,7 @@ pub trait GenServer: Send + Sized + Clone { self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, + start_signal_tx: &mut std::sync::mpsc::Sender, ) -> impl Future> + Send { async { let init_result = self @@ -151,8 +176,15 @@ pub trait GenServer: Send + Sized + Clone { .inspect_err(|err| tracing::error!("Initialization failed: {err:?}")); let res = match init_result { - Ok(new_state) => new_state.main_loop(handle, rx).await, - Err(_) => Err(GenServerError::Initialization), + Ok(new_state) => { + start_signal_tx.send(true).unwrap(); // TODO: REMOVE UNWRAP + new_state.main_loop(handle, rx).await + }, + Err(_) => { + // Signal the spawner that the initialization failed + start_signal_tx.send(false).unwrap(); // TODO: REMOVE UNWRAP + Err(GenServerError::Initialization) + } }; handle.cancellation_token().cancel(); @@ -379,9 +411,9 @@ mod tests { pub fn badly_behaved_thread_non_blocking() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut badboy = BadlyBehavedTask.start(); + let mut badboy = BadlyBehavedTask.start().unwrap(); let _ = badboy.cast(()).await; - let mut goodboy = WellBehavedTask { count: 0 }.start(); + let mut goodboy = WellBehavedTask { count: 0 }.start().unwrap(); let _ = goodboy.cast(()).await; rt::sleep(Duration::from_secs(1)).await; let count = goodboy.call(InMessage::GetCount).await.unwrap(); @@ -399,9 +431,9 @@ mod tests { pub fn badly_behaved_thread() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut badboy = BadlyBehavedTask.start_blocking(); + let mut badboy = BadlyBehavedTask.start_blocking().unwrap(); let _ = badboy.cast(()).await; - let mut goodboy = WellBehavedTask { count: 0 }.start(); + let mut goodboy = WellBehavedTask { count: 0 }.start().unwrap(); let _ = goodboy.cast(()).await; rt::sleep(Duration::from_secs(1)).await; let count = goodboy.call(InMessage::GetCount).await.unwrap(); @@ -456,7 +488,7 @@ mod tests { pub fn unresolving_task_times_out() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut unresolving_task = SomeTask.start(); + let mut unresolving_task = SomeTask.start().unwrap(); let result = unresolving_task .call_with_timeout(SomeTaskCallMsg::FastOperation, TIMEOUT_DURATION) diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/tasks/stream_tests.rs index 09f8bf9..1ce7961 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/tasks/stream_tests.rs @@ -73,7 +73,7 @@ fn message_builder(value: u8) -> SummatoryCastMessage { pub fn test_sum_numbers_from_stream() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::new(0).start(); + let mut summatory_handle = Summatory::new(0).start().unwrap(); let stream = tokio_stream::iter(vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::)); spawn_listener(summatory_handle.clone(), message_builder, stream); @@ -90,7 +90,7 @@ pub fn test_sum_numbers_from_stream() { pub fn test_sum_numbers_from_channel() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::new(0).start(); + let mut summatory_handle = Summatory::new(0).start().unwrap(); let (tx, rx) = spawned_rt::tasks::mpsc::channel::>(); // Spawn a task to send numbers to the channel @@ -118,7 +118,7 @@ pub fn test_sum_numbers_from_channel() { pub fn test_sum_numbers_from_broadcast_channel() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::new(0).start(); + let mut summatory_handle = Summatory::new(0).start().unwrap(); let (tx, rx) = tokio::sync::broadcast::channel::(5); // Spawn a task to send numbers to the channel @@ -148,7 +148,7 @@ pub fn test_stream_cancellation() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::new(0).start(); + let mut summatory_handle = Summatory::new(0).start().unwrap(); let (tx, rx) = spawned_rt::tasks::mpsc::channel::>(); // Spawn a task to send numbers to the channel diff --git a/concurrency/src/tasks/timer_tests.rs b/concurrency/src/tasks/timer_tests.rs index 1aeedce..72858bc 100644 --- a/concurrency/src/tasks/timer_tests.rs +++ b/concurrency/src/tasks/timer_tests.rs @@ -102,7 +102,7 @@ pub fn test_send_interval_and_cancellation() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { // Start a Repeater - let mut repeater = Repeater::new(0).start(); + let mut repeater = Repeater::new(0).start().unwrap(); // Wait for 1 second rt::sleep(Duration::from_secs(1)).await; @@ -210,7 +210,7 @@ pub fn test_send_after_and_cancellation() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { // Start a Delayed - let mut repeater = Delayed::new(0).start(); + let mut repeater = Delayed::new(0).start().unwrap(); // Set a just once timed message let _ = send_after( @@ -254,7 +254,7 @@ pub fn test_send_after_gen_server_teardown() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { // Start a Delayed - let mut repeater = Delayed::new(0).start(); + let mut repeater = Delayed::new(0).start().unwrap(); // Set a just once timed message let _ = send_after( diff --git a/examples/bank/src/main.rs b/examples/bank/src/main.rs index 37485c8..336104a 100644 --- a/examples/bank/src/main.rs +++ b/examples/bank/src/main.rs @@ -30,7 +30,7 @@ use spawned_rt::tasks as rt; fn main() { rt::run(async { // Starting the bank - let mut name_server = Bank::new().start(); + let mut name_server = Bank::new().start().unwrap(); // Testing initial balance for "main" account let result = Bank::withdraw(&mut name_server, "main".to_string(), 15).await; diff --git a/examples/blocking_genserver/main.rs b/examples/blocking_genserver/main.rs index 98e670b..1b6c038 100644 --- a/examples/blocking_genserver/main.rs +++ b/examples/blocking_genserver/main.rs @@ -97,9 +97,9 @@ impl GenServer for WellBehavedTask { pub fn main() { rt::run(async move { // If we change BadlyBehavedTask to start instead, it can stop the entire program - let mut badboy = BadlyBehavedTask::new().start_blocking(); + let mut badboy = BadlyBehavedTask::new().start_blocking().unwrap(); let _ = badboy.cast(()).await; - let mut goodboy = WellBehavedTask::new(0).start(); + let mut goodboy = WellBehavedTask::new(0).start().unwrap(); let _ = goodboy.cast(()).await; rt::sleep(Duration::from_secs(1)).await; let count = goodboy.call(InMessage::GetCount).await.unwrap(); diff --git a/examples/name_server/src/main.rs b/examples/name_server/src/main.rs index 22e91c7..506ab3b 100644 --- a/examples/name_server/src/main.rs +++ b/examples/name_server/src/main.rs @@ -21,7 +21,7 @@ use spawned_rt::tasks as rt; fn main() { rt::run(async { - let mut name_server = NameServer::new().start(); + let mut name_server = NameServer::new().start().unwrap(); let result = NameServer::add(&mut name_server, "Joe".to_string(), "At Home".to_string()).await; diff --git a/examples/name_server_with_error/src/main.rs b/examples/name_server_with_error/src/main.rs index eb5ab4c..eb269fb 100644 --- a/examples/name_server_with_error/src/main.rs +++ b/examples/name_server_with_error/src/main.rs @@ -26,7 +26,8 @@ fn main() { let mut name_server = NameServer { inner: HashMap::new(), } - .start(); + .start() + .unwrap(); let result = NameServer::add(&mut name_server, "Joe".to_string(), "At Home".to_string()).await; diff --git a/examples/updater/src/main.rs b/examples/updater/src/main.rs index a0db2cb..5d6f8df 100644 --- a/examples/updater/src/main.rs +++ b/examples/updater/src/main.rs @@ -19,7 +19,8 @@ fn main() { "https://httpbin.org/ip".to_string(), Duration::from_millis(1000), ) - .start(); + .start() + .unwrap(); // giving it some time before ending thread::sleep(Duration::from_secs(10)); From 27c82821943637e79a56520862e0b2131c17365c Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Tue, 29 Jul 2025 10:01:06 -0300 Subject: [PATCH 2/9] remove unwraps --- concurrency/src/tasks/gen_server.rs | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 126ab86..a70ceac 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -47,7 +47,7 @@ impl GenServerHandle { }; }); - // Briefly block until the GenServer signals us that it has started + // Wait for the GenServer to signal us that it has started match start_signal_rx.recv() { Ok(true) => Ok(handle_clone), _ => Err(GenServerError::Initialization), @@ -78,7 +78,7 @@ impl GenServerHandle { }) }); - // Briefly block until the GenServer signals us that it has started + // Wait for the GenServer to signal us that it has started match start_signal_rx.recv() { Ok(true) => Ok(handle_clone), _ => Err(GenServerError::Initialization), @@ -177,12 +177,11 @@ pub trait GenServer: Send + Sized + Clone { let res = match init_result { Ok(new_state) => { - start_signal_tx.send(true).unwrap(); // TODO: REMOVE UNWRAP + start_signal_tx.send(true).map_err(|_| GenServerError::Initialization)?; new_state.main_loop(handle, rx).await }, Err(_) => { - // Signal the spawner that the initialization failed - start_signal_tx.send(false).unwrap(); // TODO: REMOVE UNWRAP + start_signal_tx.send(false).map_err(|_| GenServerError::Initialization)?; Err(GenServerError::Initialization) } }; From 1985d54d27f790d0fd9fe3a56b7c8022634ec38c Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Tue, 29 Jul 2025 10:29:59 -0300 Subject: [PATCH 3/9] add test --- concurrency/src/tasks/gen_server.rs | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index a70ceac..b80f5f5 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -500,4 +500,32 @@ mod tests { assert!(matches!(result, Err(GenServerError::CallTimeout))); }); } + + #[derive(Clone)] + struct FailsOnInitTask; + + impl GenServer for FailsOnInitTask { + type CallMsg = (); + type CastMsg = (); + type OutMsg = (); + type Error = (); + + async fn init(self, _handle: &GenServerHandle) -> Result { + Err(()) + } + } + + #[test] + pub fn failing_on_init_task() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + // Attempt to start a GenServer that fails on initialization + let result = FailsOnInitTask.start(); + assert!(matches!(result, Err(GenServerError::Initialization))); + + // Other tasks should start correctly + let result = WellBehavedTask { count: 0 }.start(); + assert!(result.is_ok()); + }); + } } From dcee65496748e67ee065611b1a0cb9502b4b43a7 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Tue, 29 Jul 2025 10:35:43 -0300 Subject: [PATCH 4/9] doc function --- concurrency/src/tasks/gen_server.rs | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index b80f5f5..ce7620e 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -150,6 +150,7 @@ pub trait GenServer: Send + Sized + Clone { type OutMsg: Send + Sized; type Error: Debug + Send; + /// Starts the GenServer, waiting for it to finalize its `init` process. fn start(self) -> Result, GenServerError> { GenServerHandle::new(self) } @@ -159,6 +160,7 @@ pub trait GenServer: Send + Sized + Clone { /// or other blocking tasks need to be in their own separate thread, and the OS /// will manage them through hardware interrupts. /// Start blocking provides such thread. + /// As with `start`, it waits for the GenServer to finalize its `init` process. fn start_blocking(self) -> Result, GenServerError> { GenServerHandle::new_blocking(self) } @@ -177,11 +179,15 @@ pub trait GenServer: Send + Sized + Clone { let res = match init_result { Ok(new_state) => { - start_signal_tx.send(true).map_err(|_| GenServerError::Initialization)?; + start_signal_tx + .send(true) + .map_err(|_| GenServerError::Initialization)?; new_state.main_loop(handle, rx).await - }, + } Err(_) => { - start_signal_tx.send(false).map_err(|_| GenServerError::Initialization)?; + start_signal_tx + .send(false) + .map_err(|_| GenServerError::Initialization)?; Err(GenServerError::Initialization) } }; From 593cc940e1f94d9e8278b8b74500a6f68ef988fd Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Tue, 29 Jul 2025 10:57:38 -0300 Subject: [PATCH 5/9] make use of the join_handle --- concurrency/src/tasks/gen_server.rs | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index ce7620e..06ad626 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -37,7 +37,7 @@ impl GenServerHandle { // We create a channel of single use to signal when the GenServer has started. let (mut start_signal_tx, start_signal_rx) = std::sync::mpsc::channel(); // Ignore the JoinHandle for now. Maybe we'll use it in the future - let _join_handle = rt::spawn(async move { + let join_handle = rt::spawn(async move { if gen_server .run(&handle, &mut rx, &mut start_signal_tx) .await @@ -50,7 +50,10 @@ impl GenServerHandle { // Wait for the GenServer to signal us that it has started match start_signal_rx.recv() { Ok(true) => Ok(handle_clone), - _ => Err(GenServerError::Initialization), + _ => { + join_handle.abort(); // Abort the task even tho we know it won't run anymore + Err(GenServerError::Initialization) + } } } @@ -65,8 +68,7 @@ impl GenServerHandle { // We create a channel of single use to signal when the GenServer has started. let (mut start_signal_tx, start_signal_rx) = std::sync::mpsc::channel(); - // Ignore the JoinHandle for now. Maybe we'll use it in the future - let _join_handle = rt::spawn_blocking(|| { + let join_handle = rt::spawn_blocking(|| { rt::block_on(async move { if gen_server .run(&handle, &mut rx, &mut start_signal_tx) @@ -81,7 +83,10 @@ impl GenServerHandle { // Wait for the GenServer to signal us that it has started match start_signal_rx.recv() { Ok(true) => Ok(handle_clone), - _ => Err(GenServerError::Initialization), + _ => { + join_handle.abort(); // Abort the task even tho we know it won't run anymore + Err(GenServerError::Initialization) + } } } From a5278aa95e5f9e90068a57a423cbd2836b540063 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Tue, 29 Jul 2025 15:46:29 -0300 Subject: [PATCH 6/9] restore old interface, add verified methods --- concurrency/src/tasks/gen_server.rs | 109 ++++++++++++++++---- concurrency/src/tasks/stream_tests.rs | 8 +- concurrency/src/tasks/timer_tests.rs | 6 +- examples/bank/src/main.rs | 2 +- examples/blocking_genserver/main.rs | 4 +- examples/name_server/src/main.rs | 2 +- examples/name_server_with_error/src/main.rs | 3 +- examples/updater/src/main.rs | 3 +- 8 files changed, 100 insertions(+), 37 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 06ad626..7b9a0f5 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -25,7 +25,26 @@ impl Clone for GenServerHandle { } impl GenServerHandle { - pub(crate) fn new(gen_server: G) -> Result { + pub(crate) fn new(gen_server: G) -> Self { + let (tx, mut rx) = mpsc::channel::>(); + let cancellation_token = CancellationToken::new(); + let handle = GenServerHandle { + tx, + cancellation_token, + }; + let handle_clone = handle.clone(); + + // Ignore the JoinHandle for now. Maybe we'll use it in the future + let _join_handle = rt::spawn(async move { + if gen_server.run(&handle, &mut rx, None).await.is_err() { + tracing::trace!("GenServer crashed") + }; + }); + + handle_clone + } + + pub(crate) fn verified_new(gen_server: G) -> Result { let (tx, mut rx) = mpsc::channel::>(); let cancellation_token = CancellationToken::new(); let handle = GenServerHandle { @@ -39,7 +58,7 @@ impl GenServerHandle { // Ignore the JoinHandle for now. Maybe we'll use it in the future let join_handle = rt::spawn(async move { if gen_server - .run(&handle, &mut rx, &mut start_signal_tx) + .run(&handle, &mut rx, Some(&mut start_signal_tx)) .await .is_err() { @@ -57,7 +76,28 @@ impl GenServerHandle { } } - pub(crate) fn new_blocking(gen_server: G) -> Result { + pub(crate) fn new_blocking(gen_server: G) -> Self { + let (tx, mut rx) = mpsc::channel::>(); + let cancellation_token = CancellationToken::new(); + let handle = GenServerHandle { + tx, + cancellation_token, + }; + let handle_clone = handle.clone(); + + // Ignore the JoinHandle for now. Maybe we'll use it in the future + let _join_handle = rt::spawn_blocking(|| { + rt::block_on(async move { + if gen_server.run(&handle, &mut rx, None).await.is_err() { + tracing::trace!("GenServer crashed") + }; + }) + }); + + handle_clone + } + + pub(crate) fn verified_new_blocking(gen_server: G) -> Result { let (tx, mut rx) = mpsc::channel::>(); let cancellation_token = CancellationToken::new(); let handle = GenServerHandle { @@ -67,11 +107,13 @@ impl GenServerHandle { let handle_clone = handle.clone(); // We create a channel of single use to signal when the GenServer has started. + // This channel is used in the verified method, here it's just to keep the API consistent. + // The handle is thereby returned immediately, without waiting for the GenServer to start. let (mut start_signal_tx, start_signal_rx) = std::sync::mpsc::channel(); let join_handle = rt::spawn_blocking(|| { rt::block_on(async move { if gen_server - .run(&handle, &mut rx, &mut start_signal_tx) + .run(&handle, &mut rx, Some(&mut start_signal_tx)) .await .is_err() { @@ -155,26 +197,43 @@ pub trait GenServer: Send + Sized + Clone { type OutMsg: Send + Sized; type Error: Debug + Send; - /// Starts the GenServer, waiting for it to finalize its `init` process. - fn start(self) -> Result, GenServerError> { + /// Starts the GenServer, without waiting for it to finalize its `init` process. + fn start(self) -> GenServerHandle { GenServerHandle::new(self) } + /// Starts the GenServer, waiting for it to finalize its `init` process. + fn verified_start(self) -> Result, GenServerError> { + GenServerHandle::verified_new(self) + } + /// Tokio tasks depend on a coolaborative multitasking model. "work stealing" can't /// happen if the task is blocking the thread. As such, for sync compute task /// or other blocking tasks need to be in their own separate thread, and the OS /// will manage them through hardware interrupts. /// Start blocking provides such thread. - /// As with `start`, it waits for the GenServer to finalize its `init` process. - fn start_blocking(self) -> Result, GenServerError> { + /// + /// As with `start`, it doesn't wait for the GenServer to finalize its `init` process. + fn start_blocking(self) -> GenServerHandle { GenServerHandle::new_blocking(self) } + /// Tokio tasks depend on a coolaborative multitasking model. "work stealing" can't + /// happen if the task is blocking the thread. As such, for sync compute task + /// or other blocking tasks need to be in their own separate thread, and the OS + /// will manage them through hardware interrupts. + /// Start blocking provides such thread. + /// + /// As with `verified_start`, it waits for the GenServer to finalize its `init` process. + fn verified_start_blocking(self) -> Result, GenServerError> { + GenServerHandle::verified_new_blocking(self) + } + fn run( self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - start_signal_tx: &mut std::sync::mpsc::Sender, + start_signal_tx: Option<&mut std::sync::mpsc::Sender>, ) -> impl Future> + Send { async { let init_result = self @@ -184,15 +243,21 @@ pub trait GenServer: Send + Sized + Clone { let res = match init_result { Ok(new_state) => { - start_signal_tx - .send(true) - .map_err(|_| GenServerError::Initialization)?; + if let Some(start_signal_tx) = start_signal_tx { + // Notify that the GenServer has started successfully + start_signal_tx + .send(true) + .map_err(|_| GenServerError::Initialization)?; + } new_state.main_loop(handle, rx).await } Err(_) => { - start_signal_tx - .send(false) - .map_err(|_| GenServerError::Initialization)?; + if let Some(start_signal_tx) = start_signal_tx { + // Notify that the GenServer failed to start + start_signal_tx + .send(false) + .map_err(|_| GenServerError::Initialization)?; + } Err(GenServerError::Initialization) } }; @@ -421,9 +486,9 @@ mod tests { pub fn badly_behaved_thread_non_blocking() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut badboy = BadlyBehavedTask.start().unwrap(); + let mut badboy = BadlyBehavedTask.start(); let _ = badboy.cast(()).await; - let mut goodboy = WellBehavedTask { count: 0 }.start().unwrap(); + let mut goodboy = WellBehavedTask { count: 0 }.start(); let _ = goodboy.cast(()).await; rt::sleep(Duration::from_secs(1)).await; let count = goodboy.call(InMessage::GetCount).await.unwrap(); @@ -441,9 +506,9 @@ mod tests { pub fn badly_behaved_thread() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut badboy = BadlyBehavedTask.start_blocking().unwrap(); + let mut badboy = BadlyBehavedTask.start_blocking(); let _ = badboy.cast(()).await; - let mut goodboy = WellBehavedTask { count: 0 }.start().unwrap(); + let mut goodboy = WellBehavedTask { count: 0 }.start(); let _ = goodboy.cast(()).await; rt::sleep(Duration::from_secs(1)).await; let count = goodboy.call(InMessage::GetCount).await.unwrap(); @@ -498,7 +563,7 @@ mod tests { pub fn unresolving_task_times_out() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut unresolving_task = SomeTask.start().unwrap(); + let mut unresolving_task = SomeTask.start(); let result = unresolving_task .call_with_timeout(SomeTaskCallMsg::FastOperation, TIMEOUT_DURATION) @@ -531,11 +596,11 @@ mod tests { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { // Attempt to start a GenServer that fails on initialization - let result = FailsOnInitTask.start(); + let result = FailsOnInitTask.verified_start(); assert!(matches!(result, Err(GenServerError::Initialization))); // Other tasks should start correctly - let result = WellBehavedTask { count: 0 }.start(); + let result = WellBehavedTask { count: 0 }.verified_start(); assert!(result.is_ok()); }); } diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/tasks/stream_tests.rs index 1ce7961..09f8bf9 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/tasks/stream_tests.rs @@ -73,7 +73,7 @@ fn message_builder(value: u8) -> SummatoryCastMessage { pub fn test_sum_numbers_from_stream() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::new(0).start().unwrap(); + let mut summatory_handle = Summatory::new(0).start(); let stream = tokio_stream::iter(vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::)); spawn_listener(summatory_handle.clone(), message_builder, stream); @@ -90,7 +90,7 @@ pub fn test_sum_numbers_from_stream() { pub fn test_sum_numbers_from_channel() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::new(0).start().unwrap(); + let mut summatory_handle = Summatory::new(0).start(); let (tx, rx) = spawned_rt::tasks::mpsc::channel::>(); // Spawn a task to send numbers to the channel @@ -118,7 +118,7 @@ pub fn test_sum_numbers_from_channel() { pub fn test_sum_numbers_from_broadcast_channel() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::new(0).start().unwrap(); + let mut summatory_handle = Summatory::new(0).start(); let (tx, rx) = tokio::sync::broadcast::channel::(5); // Spawn a task to send numbers to the channel @@ -148,7 +148,7 @@ pub fn test_stream_cancellation() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::new(0).start().unwrap(); + let mut summatory_handle = Summatory::new(0).start(); let (tx, rx) = spawned_rt::tasks::mpsc::channel::>(); // Spawn a task to send numbers to the channel diff --git a/concurrency/src/tasks/timer_tests.rs b/concurrency/src/tasks/timer_tests.rs index 72858bc..1aeedce 100644 --- a/concurrency/src/tasks/timer_tests.rs +++ b/concurrency/src/tasks/timer_tests.rs @@ -102,7 +102,7 @@ pub fn test_send_interval_and_cancellation() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { // Start a Repeater - let mut repeater = Repeater::new(0).start().unwrap(); + let mut repeater = Repeater::new(0).start(); // Wait for 1 second rt::sleep(Duration::from_secs(1)).await; @@ -210,7 +210,7 @@ pub fn test_send_after_and_cancellation() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { // Start a Delayed - let mut repeater = Delayed::new(0).start().unwrap(); + let mut repeater = Delayed::new(0).start(); // Set a just once timed message let _ = send_after( @@ -254,7 +254,7 @@ pub fn test_send_after_gen_server_teardown() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { // Start a Delayed - let mut repeater = Delayed::new(0).start().unwrap(); + let mut repeater = Delayed::new(0).start(); // Set a just once timed message let _ = send_after( diff --git a/examples/bank/src/main.rs b/examples/bank/src/main.rs index 336104a..37485c8 100644 --- a/examples/bank/src/main.rs +++ b/examples/bank/src/main.rs @@ -30,7 +30,7 @@ use spawned_rt::tasks as rt; fn main() { rt::run(async { // Starting the bank - let mut name_server = Bank::new().start().unwrap(); + let mut name_server = Bank::new().start(); // Testing initial balance for "main" account let result = Bank::withdraw(&mut name_server, "main".to_string(), 15).await; diff --git a/examples/blocking_genserver/main.rs b/examples/blocking_genserver/main.rs index 1b6c038..98e670b 100644 --- a/examples/blocking_genserver/main.rs +++ b/examples/blocking_genserver/main.rs @@ -97,9 +97,9 @@ impl GenServer for WellBehavedTask { pub fn main() { rt::run(async move { // If we change BadlyBehavedTask to start instead, it can stop the entire program - let mut badboy = BadlyBehavedTask::new().start_blocking().unwrap(); + let mut badboy = BadlyBehavedTask::new().start_blocking(); let _ = badboy.cast(()).await; - let mut goodboy = WellBehavedTask::new(0).start().unwrap(); + let mut goodboy = WellBehavedTask::new(0).start(); let _ = goodboy.cast(()).await; rt::sleep(Duration::from_secs(1)).await; let count = goodboy.call(InMessage::GetCount).await.unwrap(); diff --git a/examples/name_server/src/main.rs b/examples/name_server/src/main.rs index 506ab3b..22e91c7 100644 --- a/examples/name_server/src/main.rs +++ b/examples/name_server/src/main.rs @@ -21,7 +21,7 @@ use spawned_rt::tasks as rt; fn main() { rt::run(async { - let mut name_server = NameServer::new().start().unwrap(); + let mut name_server = NameServer::new().start(); let result = NameServer::add(&mut name_server, "Joe".to_string(), "At Home".to_string()).await; diff --git a/examples/name_server_with_error/src/main.rs b/examples/name_server_with_error/src/main.rs index eb269fb..eb5ab4c 100644 --- a/examples/name_server_with_error/src/main.rs +++ b/examples/name_server_with_error/src/main.rs @@ -26,8 +26,7 @@ fn main() { let mut name_server = NameServer { inner: HashMap::new(), } - .start() - .unwrap(); + .start(); let result = NameServer::add(&mut name_server, "Joe".to_string(), "At Home".to_string()).await; diff --git a/examples/updater/src/main.rs b/examples/updater/src/main.rs index 5d6f8df..a0db2cb 100644 --- a/examples/updater/src/main.rs +++ b/examples/updater/src/main.rs @@ -19,8 +19,7 @@ fn main() { "https://httpbin.org/ip".to_string(), Duration::from_millis(1000), ) - .start() - .unwrap(); + .start(); // giving it some time before ending thread::sleep(Duration::from_secs(10)); From 28aafe691aafab61efb5614775794cb06f82e68a Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Tue, 29 Jul 2025 15:48:20 -0300 Subject: [PATCH 7/9] reword comment --- concurrency/src/tasks/gen_server.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 7b9a0f5..050c9cd 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -243,8 +243,9 @@ pub trait GenServer: Send + Sized + Clone { let res = match init_result { Ok(new_state) => { + // Notify that the GenServer has started successfully + // in case we have a start signal channel if let Some(start_signal_tx) = start_signal_tx { - // Notify that the GenServer has started successfully start_signal_tx .send(true) .map_err(|_| GenServerError::Initialization)?; @@ -252,8 +253,9 @@ pub trait GenServer: Send + Sized + Clone { new_state.main_loop(handle, rx).await } Err(_) => { + // Notify that the GenServer failed to start + // in case we have a start signal channel if let Some(start_signal_tx) = start_signal_tx { - // Notify that the GenServer failed to start start_signal_tx .send(false) .map_err(|_| GenServerError::Initialization)?; From fc6e96cefa00bb800d5c424386649cc9115f8ccc Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Tue, 29 Jul 2025 15:52:28 -0300 Subject: [PATCH 8/9] cover all start methods in test --- concurrency/src/tasks/gen_server.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 050c9cd..4304cdf 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -601,9 +601,17 @@ mod tests { let result = FailsOnInitTask.verified_start(); assert!(matches!(result, Err(GenServerError::Initialization))); + // Attempt to start a GenServer (in a blocking way) that fails on initialization + let result = FailsOnInitTask.verified_start_blocking(); + assert!(matches!(result, Err(GenServerError::Initialization))); + // Other tasks should start correctly let result = WellBehavedTask { count: 0 }.verified_start(); assert!(result.is_ok()); + + // They also should start in blocking mode + let result = WellBehavedTask { count: 0 }.verified_start_blocking(); + assert!(result.is_ok()); }); } } From 021624408567633217aa7b8e3d006570896098bf Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Tue, 29 Jul 2025 15:56:23 -0300 Subject: [PATCH 9/9] remove empty lines from diff --- concurrency/src/tasks/gen_server.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 4304cdf..50e6a40 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -33,14 +33,12 @@ impl GenServerHandle { cancellation_token, }; let handle_clone = handle.clone(); - // Ignore the JoinHandle for now. Maybe we'll use it in the future let _join_handle = rt::spawn(async move { if gen_server.run(&handle, &mut rx, None).await.is_err() { tracing::trace!("GenServer crashed") }; }); - handle_clone } @@ -84,7 +82,6 @@ impl GenServerHandle { cancellation_token, }; let handle_clone = handle.clone(); - // Ignore the JoinHandle for now. Maybe we'll use it in the future let _join_handle = rt::spawn_blocking(|| { rt::block_on(async move { @@ -93,7 +90,6 @@ impl GenServerHandle { }; }) }); - handle_clone }