Skip to content

Added Success/NoSuccess results for GenServer init #39

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jul 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion concurrency/src/messages.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct Unused;
123 changes: 93 additions & 30 deletions concurrency/src/tasks/gen_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@ use futures::future::FutureExt as _;
use spawned_rt::tasks::{self as rt, mpsc, oneshot, timeout, CancellationToken};
use std::{fmt::Debug, future::Future, panic::AssertUnwindSafe, time::Duration};

use crate::error::GenServerError;
use crate::{
error::GenServerError,
tasks::InitResult::{NoSuccess, Success},
};

const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(5);

Expand Down Expand Up @@ -120,6 +123,11 @@ pub enum CastResponse<G: GenServer> {
Stop,
}

pub enum InitResult<G: GenServer> {
Success(G),
NoSuccess(G),
}

pub trait GenServer: Send + Sized + Clone {
type CallMsg: Clone + Send + Sized + Sync;
type CastMsg: Clone + Send + Sized + Sync;
Expand All @@ -145,14 +153,18 @@ pub trait GenServer: Send + Sized + Clone {
rx: &mut mpsc::Receiver<GenServerInMsg<Self>>,
) -> impl Future<Output = Result<(), GenServerError>> + Send {
async {
let init_result = self
.init(handle)
.await
.inspect_err(|err| tracing::error!("Initialization failed: {err:?}"));

let res = match init_result {
Ok(new_state) => new_state.main_loop(handle, rx).await,
Err(_) => Err(GenServerError::Initialization),
let res = match self.init(handle).await {
Ok(Success(new_state)) => new_state.main_loop(handle, rx).await,
Ok(NoSuccess(intermediate_state)) => {
// new_state is NoSuccess, this means the initialization failed, but the error was handled
// in callback. No need to report the error.
// Just skip main_loop and return the state to teardown the GenServer
Ok(intermediate_state)
}
Err(err) => {
tracing::error!("Initialization failed with unhandled error: {err:?}");
Err(GenServerError::Initialization)
}
};

handle.cancellation_token().cancel();
Expand All @@ -171,8 +183,8 @@ pub trait GenServer: Send + Sized + Clone {
fn init(
self,
_handle: &GenServerHandle<Self>,
) -> impl Future<Output = Result<Self, Self::Error>> + Send {
async { Ok(self) }
) -> impl Future<Output = Result<InitResult<Self>, Self::Error>> + Send {
async { Ok(Success(self)) }
}

fn main_loop(
Expand Down Expand Up @@ -297,8 +309,12 @@ pub trait GenServer: Send + Sized + Clone {
mod tests {

use super::*;
use crate::tasks::send_after;
use std::{thread, time::Duration};
use crate::{messages::Unused, tasks::send_after};
use std::{
sync::{Arc, Mutex},
thread,
time::Duration,
};

#[derive(Clone)]
struct BadlyBehavedTask;
Expand All @@ -315,16 +331,16 @@ mod tests {

impl GenServer for BadlyBehavedTask {
type CallMsg = InMessage;
type CastMsg = ();
type OutMsg = ();
type Error = ();
type CastMsg = Unused;
type OutMsg = Unused;
type Error = Unused;

async fn handle_call(
self,
_: Self::CallMsg,
_: &GenServerHandle<Self>,
) -> CallResponse<Self> {
CallResponse::Stop(())
CallResponse::Stop(Unused)
}

async fn handle_cast(
Expand All @@ -345,9 +361,9 @@ mod tests {

impl GenServer for WellBehavedTask {
type CallMsg = InMessage;
type CastMsg = ();
type CastMsg = Unused;
type OutMsg = OutMsg;
type Error = ();
type Error = Unused;

async fn handle_call(
self,
Expand All @@ -370,7 +386,7 @@ mod tests {
) -> CastResponse<Self> {
self.count += 1;
println!("{:?}: good still alive", thread::current().id());
send_after(Duration::from_millis(100), handle.to_owned(), ());
send_after(Duration::from_millis(100), handle.to_owned(), Unused);
CastResponse::NoReply(self)
}
}
Expand All @@ -380,9 +396,9 @@ mod tests {
let runtime = rt::Runtime::new().unwrap();
runtime.block_on(async move {
let mut badboy = BadlyBehavedTask.start();
let _ = badboy.cast(()).await;
let _ = badboy.cast(Unused).await;
let mut goodboy = WellBehavedTask { count: 0 }.start();
let _ = goodboy.cast(()).await;
let _ = goodboy.cast(Unused).await;
rt::sleep(Duration::from_secs(1)).await;
let count = goodboy.call(InMessage::GetCount).await.unwrap();

Expand All @@ -400,9 +416,9 @@ mod tests {
let runtime = rt::Runtime::new().unwrap();
runtime.block_on(async move {
let mut badboy = BadlyBehavedTask.start_blocking();
let _ = badboy.cast(()).await;
let _ = badboy.cast(Unused).await;
let mut goodboy = WellBehavedTask { count: 0 }.start();
let _ = goodboy.cast(()).await;
let _ = goodboy.cast(Unused).await;
rt::sleep(Duration::from_secs(1)).await;
let count = goodboy.call(InMessage::GetCount).await.unwrap();

Expand All @@ -428,9 +444,9 @@ mod tests {

impl GenServer for SomeTask {
type CallMsg = SomeTaskCallMsg;
type CastMsg = ();
type OutMsg = ();
type Error = ();
type CastMsg = Unused;
type OutMsg = Unused;
type Error = Unused;

async fn handle_call(
self,
Expand All @@ -441,12 +457,12 @@ mod tests {
SomeTaskCallMsg::SlowOperation => {
// Simulate a slow operation that will not resolve in time
rt::sleep(TIMEOUT_DURATION * 2).await;
CallResponse::Reply(self, ())
CallResponse::Reply(self, Unused)
}
SomeTaskCallMsg::FastOperation => {
// Simulate a fast operation that resolves in time
rt::sleep(TIMEOUT_DURATION / 2).await;
CallResponse::Reply(self, ())
CallResponse::Reply(self, Unused)
}
}
}
Expand All @@ -461,12 +477,59 @@ mod tests {
let result = unresolving_task
.call_with_timeout(SomeTaskCallMsg::FastOperation, TIMEOUT_DURATION)
.await;
assert!(matches!(result, Ok(())));
assert!(matches!(result, Ok(Unused)));

let result = unresolving_task
.call_with_timeout(SomeTaskCallMsg::SlowOperation, TIMEOUT_DURATION)
.await;
assert!(matches!(result, Err(GenServerError::CallTimeout)));
});
}

#[derive(Clone)]
struct SomeTaskThatFailsOnInit {
sender_channel: Arc<Mutex<mpsc::Receiver<u8>>>,
}

impl SomeTaskThatFailsOnInit {
pub fn new(sender_channel: Arc<Mutex<mpsc::Receiver<u8>>>) -> Self {
Self { sender_channel }
}
}

impl GenServer for SomeTaskThatFailsOnInit {
type CallMsg = Unused;
type CastMsg = Unused;
type OutMsg = Unused;
type Error = Unused;

async fn init(
self,
_handle: &GenServerHandle<Self>,
) -> Result<InitResult<Self>, Self::Error> {
// Simulate an initialization failure by returning NoSuccess
Ok(NoSuccess(self))
}

async fn teardown(self, _handle: &GenServerHandle<Self>) -> Result<(), Self::Error> {
self.sender_channel.lock().unwrap().close();
Ok(())
}
}

#[test]
pub fn task_fails_with_intermediate_state() {
let runtime = rt::Runtime::new().unwrap();
runtime.block_on(async move {
let (rx, tx) = mpsc::channel::<u8>();
let sender_channel = Arc::new(Mutex::new(tx));
let _task = SomeTaskThatFailsOnInit::new(sender_channel).start();

// Wait a while to ensure the task has time to run and fail
rt::sleep(Duration::from_secs(1)).await;

// We assure that the teardown function has ran by checking that the receiver channel is closed
assert!(rx.is_closed())
});
}
}
5 changes: 4 additions & 1 deletion concurrency/src/tasks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ mod stream_tests;
#[cfg(test)]
mod timer_tests;

pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg};
pub use gen_server::{
CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg, InitResult,
InitResult::NoSuccess, InitResult::Success,
};
pub use process::{send, Process, ProcessInfo};
pub use stream::spawn_listener;
pub use time::{send_after, send_interval};
9 changes: 6 additions & 3 deletions concurrency/src/tasks/timer_tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
use crate::tasks::{send_interval, CallResponse, CastResponse, GenServer, GenServerHandle};
use crate::tasks::{
gen_server::InitResult, send_interval, CallResponse, CastResponse, GenServer, GenServerHandle,
InitResult::Success,
};
use spawned_rt::tasks::{self as rt, CancellationToken};
use std::time::Duration;

Expand Down Expand Up @@ -59,14 +62,14 @@ impl GenServer for Repeater {
type OutMsg = RepeaterOutMessage;
type Error = ();

async fn init(mut self, handle: &RepeaterHandle) -> Result<Self, Self::Error> {
async fn init(mut self, handle: &RepeaterHandle) -> Result<InitResult<Self>, Self::Error> {
let timer = send_interval(
Duration::from_millis(100),
handle.clone(),
RepeaterCastMessage::Inc,
);
self.cancellation_token = Some(timer.cancellation_token);
Ok(self)
Ok(Success(self))
}

async fn handle_call(
Expand Down
12 changes: 9 additions & 3 deletions examples/bank/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::collections::HashMap;

use spawned_concurrency::{
messages::Unused,
tasks::{CallResponse, GenServer, GenServerHandle},
tasks::{
CallResponse, GenServer, GenServerHandle,
InitResult::{self, Success},
},
};

use crate::messages::{BankError, BankInMessage as InMessage, BankOutMessage as OutMessage};
Expand Down Expand Up @@ -60,9 +63,12 @@ impl GenServer for Bank {
type Error = BankError;

// Initializing "main" account with 1000 in balance to test init() callback.
async fn init(mut self, _handle: &GenServerHandle<Self>) -> Result<Self, Self::Error> {
async fn init(
mut self,
_handle: &GenServerHandle<Self>,
) -> Result<InitResult<Self>, Self::Error> {
self.accounts.insert("main".to_string(), 1000);
Ok(self)
Ok(Success(self))
}

async fn handle_call(
Expand Down
12 changes: 9 additions & 3 deletions examples/updater/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::time::Duration;

use spawned_concurrency::{
messages::Unused,
tasks::{send_interval, CastResponse, GenServer, GenServerHandle},
tasks::{
send_interval, CastResponse, GenServer, GenServerHandle,
InitResult::{self, Success},
},
};
use spawned_rt::tasks::CancellationToken;

Expand Down Expand Up @@ -34,10 +37,13 @@ impl GenServer for UpdaterServer {
type Error = std::fmt::Error;

// Initializing GenServer to start periodic checks.
async fn init(mut self, handle: &GenServerHandle<Self>) -> Result<Self, Self::Error> {
async fn init(
mut self,
handle: &GenServerHandle<Self>,
) -> Result<InitResult<Self>, Self::Error> {
let timer = send_interval(self.periodicity, handle.clone(), InMessage::Check);
self.timer_token = Some(timer.cancellation_token);
Ok(self)
Ok(Success(self))
}

async fn handle_cast(
Expand Down