From df6c8e5c5e91fefef50b9518797fea8adced8222 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Thu, 24 Jul 2025 09:27:21 -0300 Subject: [PATCH 1/2] Added Success/NoSuccess results for GenServer init --- concurrency/src/tasks/gen_server.rs | 34 +++++++++++++++++++--------- concurrency/src/tasks/mod.rs | 5 +++- concurrency/src/tasks/timer_tests.rs | 9 +++++--- examples/bank/src/server.rs | 12 +++++++--- examples/updater/src/server.rs | 12 +++++++--- 5 files changed, 51 insertions(+), 21 deletions(-) diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 88eb743..8d6838e 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -4,7 +4,10 @@ use futures::future::FutureExt as _; use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken}; use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration}; -use crate::error::GenServerError; +use crate::{ + error::GenServerError, + tasks::InitResult::{NoSuccess, Success}, +}; const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5); @@ -120,6 +123,11 @@ pub enum CastResponse { Stop, } +pub enum InitResult { + Success(G), + NoSuccess(G), +} + pub trait GenServer: Send + Sized + Clone { type CallMsg: Clone + Send + Sized + Sync; type CastMsg: Clone + Send + Sized + Sync; @@ -145,14 +153,18 @@ pub trait GenServer: Send + Sized + Clone { rx: &mut mpsc::Receiver>, ) -> impl Future> + Send { async { - let init_result = self - .init(handle) - .await - .inspect_err(|err| tracing::error!("Initialization failed: {err:?}")); - - let res = match init_result { - Ok(new_state) => new_state.main_loop(handle, rx).await, - Err(_) => Err(GenServerError::Initialization), + let res = match self.init(handle).await { + Ok(Success(new_state)) => new_state.main_loop(handle, rx).await, + Ok(NoSuccess(intermediate_state)) => { + // new_state is NoSuccess, this means the initialization failed, but the error was handled + // in callback. No need to report the error. + // Just skip main_loop and return the state to teardown the GenServer + Ok(intermediate_state) + } + Err(err) => { + tracing::error!("Initialization failed with unhandled error: {err:?}"); + Err(GenServerError::Initialization) + } }; handle.cancellation_token().cancel(); @@ -171,8 +183,8 @@ pub trait GenServer: Send + Sized + Clone { fn init( self, _handle: &GenServerHandle, - ) -> impl Future> + Send { - async { Ok(self) } + ) -> impl Future, Self::Error>> + Send { + async { Ok(Success(self)) } } fn main_loop( diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs index 201c7d2..65ce84b 100644 --- a/concurrency/src/tasks/mod.rs +++ b/concurrency/src/tasks/mod.rs @@ -11,7 +11,10 @@ mod stream_tests; #[cfg(test)] mod timer_tests; -pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; +pub use gen_server::{ + CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, InitResult, + InitResult::NoSuccess, InitResult::Success, +}; pub use process::{send, Process, ProcessInfo}; pub use stream::spawn_listener; pub use time::{send_after, send_interval}; diff --git a/concurrency/src/tasks/timer_tests.rs b/concurrency/src/tasks/timer_tests.rs index 1aeedce..9eef493 100644 --- a/concurrency/src/tasks/timer_tests.rs +++ b/concurrency/src/tasks/timer_tests.rs @@ -1,4 +1,7 @@ -use crate::tasks::{send_interval, CallResponse, CastResponse, GenServer, GenServerHandle}; +use crate::tasks::{ + gen_server::InitResult, send_interval, CallResponse, CastResponse, GenServer, GenServerHandle, + InitResult::Success, +}; use spawned_rt::tasks::{self as rt, CancellationToken}; use std::time::Duration; @@ -59,14 +62,14 @@ impl GenServer for Repeater { type OutMsg = RepeaterOutMessage; type Error = (); - async fn init(mut self, handle: &RepeaterHandle) -> Result { + async fn init(mut self, handle: &RepeaterHandle) -> Result, Self::Error> { let timer = send_interval( Duration::from_millis(100), handle.clone(), RepeaterCastMessage::Inc, ); self.cancellation_token = Some(timer.cancellation_token); - Ok(self) + Ok(Success(self)) } async fn handle_call( diff --git a/examples/bank/src/server.rs b/examples/bank/src/server.rs index 469172b..b290a07 100644 --- a/examples/bank/src/server.rs +++ b/examples/bank/src/server.rs @@ -2,7 +2,10 @@ use std::collections::HashMap; use spawned_concurrency::{ messages::Unused, - tasks::{CallResponse, GenServer, GenServerHandle}, + tasks::{ + CallResponse, GenServer, GenServerHandle, + InitResult::{self, Success}, + }, }; use crate::messages::{BankError, BankInMessage as InMessage, BankOutMessage as OutMessage}; @@ -60,9 +63,12 @@ impl GenServer for Bank { type Error = BankError; // Initializing "main" account with 1000 in balance to test init() callback. - async fn init(mut self, _handle: &GenServerHandle) -> Result { + async fn init( + mut self, + _handle: &GenServerHandle, + ) -> Result, Self::Error> { self.accounts.insert("main".to_string(), 1000); - Ok(self) + Ok(Success(self)) } async fn handle_call( diff --git a/examples/updater/src/server.rs b/examples/updater/src/server.rs index 41115d6..31a9fdb 100644 --- a/examples/updater/src/server.rs +++ b/examples/updater/src/server.rs @@ -2,7 +2,10 @@ use std::time::Duration; use spawned_concurrency::{ messages::Unused, - tasks::{send_interval, CastResponse, GenServer, GenServerHandle}, + tasks::{ + send_interval, CastResponse, GenServer, GenServerHandle, + InitResult::{self, Success}, + }, }; use spawned_rt::tasks::CancellationToken; @@ -34,10 +37,13 @@ impl GenServer for UpdaterServer { type Error = std::fmt::Error; // Initializing GenServer to start periodic checks. - async fn init(mut self, handle: &GenServerHandle) -> Result { + async fn init( + mut self, + handle: &GenServerHandle, + ) -> Result, Self::Error> { let timer = send_interval(self.periodicity, handle.clone(), InMessage::Check); self.timer_token = Some(timer.cancellation_token); - Ok(self) + Ok(Success(self)) } async fn handle_cast( From 51fdf9364ed7e028d753bcbf15a2e27df00899a7 Mon Sep 17 00:00:00 2001 From: Esteban Dimitroff Hodi Date: Tue, 29 Jul 2025 14:21:39 -0300 Subject: [PATCH 2/2] Added NoSuccess test --- concurrency/src/messages.rs | 2 +- concurrency/src/tasks/gen_server.rs | 89 +++++++++++++++++++++++------ 2 files changed, 71 insertions(+), 20 deletions(-) diff --git a/concurrency/src/messages.rs b/concurrency/src/messages.rs index c484657..e0aceb8 100644 --- a/concurrency/src/messages.rs +++ b/concurrency/src/messages.rs @@ -1,2 +1,2 @@ -#[derive(Clone)] +#[derive(Clone, Debug)] pub struct Unused; diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index 8d6838e..5c31170 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -309,8 +309,12 @@ pub trait GenServer: Send + Sized + Clone { mod tests { use super::*; - use crate::tasks::send_after; - use std::{thread, time::Duration}; + use crate::{messages::Unused, tasks::send_after}; + use std::{ + sync::{Arc, Mutex}, + thread, + time::Duration, + }; #[derive(Clone)] struct BadlyBehavedTask; @@ -327,16 +331,16 @@ mod tests { impl GenServer for BadlyBehavedTask { type CallMsg = InMessage; - type CastMsg = (); - type OutMsg = (); - type Error = (); + type CastMsg = Unused; + type OutMsg = Unused; + type Error = Unused; async fn handle_call( self, _: Self::CallMsg, _: &GenServerHandle, ) -> CallResponse { - CallResponse::Stop(()) + CallResponse::Stop(Unused) } async fn handle_cast( @@ -357,9 +361,9 @@ mod tests { impl GenServer for WellBehavedTask { type CallMsg = InMessage; - type CastMsg = (); + type CastMsg = Unused; type OutMsg = OutMsg; - type Error = (); + type Error = Unused; async fn handle_call( self, @@ -382,7 +386,7 @@ mod tests { ) -> CastResponse { self.count += 1; println!("{:?}: good still alive", thread::current().id()); - send_after(Duration::from_millis(100), handle.to_owned(), ()); + send_after(Duration::from_millis(100), handle.to_owned(), Unused); CastResponse::NoReply(self) } } @@ -392,9 +396,9 @@ mod tests { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { let mut badboy = BadlyBehavedTask.start(); - let _ = badboy.cast(()).await; + let _ = badboy.cast(Unused).await; let mut goodboy = WellBehavedTask { count: 0 }.start(); - let _ = goodboy.cast(()).await; + let _ = goodboy.cast(Unused).await; rt::sleep(Duration::from_secs(1)).await; let count = goodboy.call(InMessage::GetCount).await.unwrap(); @@ -412,9 +416,9 @@ mod tests { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { let mut badboy = BadlyBehavedTask.start_blocking(); - let _ = badboy.cast(()).await; + let _ = badboy.cast(Unused).await; let mut goodboy = WellBehavedTask { count: 0 }.start(); - let _ = goodboy.cast(()).await; + let _ = goodboy.cast(Unused).await; rt::sleep(Duration::from_secs(1)).await; let count = goodboy.call(InMessage::GetCount).await.unwrap(); @@ -440,9 +444,9 @@ mod tests { impl GenServer for SomeTask { type CallMsg = SomeTaskCallMsg; - type CastMsg = (); - type OutMsg = (); - type Error = (); + type CastMsg = Unused; + type OutMsg = Unused; + type Error = Unused; async fn handle_call( self, @@ -453,12 +457,12 @@ mod tests { SomeTaskCallMsg::SlowOperation => { // Simulate a slow operation that will not resolve in time rt::sleep(TIMEOUT_DURATION * 2).await; - CallResponse::Reply(self, ()) + CallResponse::Reply(self, Unused) } SomeTaskCallMsg::FastOperation => { // Simulate a fast operation that resolves in time rt::sleep(TIMEOUT_DURATION / 2).await; - CallResponse::Reply(self, ()) + CallResponse::Reply(self, Unused) } } } @@ -473,7 +477,7 @@ mod tests { let result = unresolving_task .call_with_timeout(SomeTaskCallMsg::FastOperation, TIMEOUT_DURATION) .await; - assert!(matches!(result, Ok(()))); + assert!(matches!(result, Ok(Unused))); let result = unresolving_task .call_with_timeout(SomeTaskCallMsg::SlowOperation, TIMEOUT_DURATION) @@ -481,4 +485,51 @@ mod tests { assert!(matches!(result, Err(GenServerError::CallTimeout))); }); } + + #[derive(Clone)] + struct SomeTaskThatFailsOnInit { + sender_channel: Arc>>, + } + + impl SomeTaskThatFailsOnInit { + pub fn new(sender_channel: Arc>>) -> Self { + Self { sender_channel } + } + } + + impl GenServer for SomeTaskThatFailsOnInit { + type CallMsg = Unused; + type CastMsg = Unused; + type OutMsg = Unused; + type Error = Unused; + + async fn init( + self, + _handle: &GenServerHandle, + ) -> Result, Self::Error> { + // Simulate an initialization failure by returning NoSuccess + Ok(NoSuccess(self)) + } + + async fn teardown(self, _handle: &GenServerHandle) -> Result<(), Self::Error> { + self.sender_channel.lock().unwrap().close(); + Ok(()) + } + } + + #[test] + pub fn task_fails_with_intermediate_state() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let (rx, tx) = mpsc::channel::(); + let sender_channel = Arc::new(Mutex::new(tx)); + let _task = SomeTaskThatFailsOnInit::new(sender_channel).start(); + + // Wait a while to ensure the task has time to run and fail + rt::sleep(Duration::from_secs(1)).await; + + // We assure that the teardown function has ran by checking that the receiver channel is closed + assert!(rx.is_closed()) + }); + } }