Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 22 additions & 14 deletions concurrency/src/tasks/gen_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,21 +164,30 @@ where
state: Self::State,
) -> impl Future<Output = Result<(), GenServerError>> + Send {
async {
let init_result = self
.init(handle, state.clone())
.await
.inspect_err(|err| tracing::error!("Initialization failed: {err:?}"));
let after_init_state = match self.init(handle, state).await {
Ok(state) => state,
Err(err) => {
tracing::error!("Error during initialization: {err:?}");
handle.cancellation_token().cancel();
return Err(GenServerError::Initialization);
}
};

let res = match init_result {
Ok(new_state) => self.main_loop(handle, rx, new_state).await,
Err(_) => Err(GenServerError::Initialization),
let final_state = match self.main_loop(handle, rx, after_init_state).await {
Ok(state) => state,
Err(err) => {
tracing::error!("Error during main loop: {err:?}");
handle.cancellation_token().cancel();
return Err(err);
}
};

handle.cancellation_token().cancel();
if let Err(err) = self.teardown(handle, state).await {
if let Err(err) = self.teardown(handle, final_state).await {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this also be run if main_loop fails?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm still unsure as to if its "correct" or not to go through a teardown knowing that our state may be incorrect, maybe just going through this process in the happy case is right approach.

tracing::error!("Error during teardown: {err:?}");
}
res

handle.cancellation_token().cancel();
Ok(())
}
}

Expand All @@ -198,7 +207,7 @@ where
handle: &GenServerHandle<Self>,
rx: &mut mpsc::Receiver<GenServerInMsg<Self>>,
mut state: Self::State,
) -> impl Future<Output = Result<(), GenServerError>> + Send {
) -> impl Future<Output = Result<Self::State, GenServerError>> + Send {
async {
loop {
let (new_state, cont) = self.receive(handle, rx, state).await?;
Expand All @@ -208,7 +217,7 @@ where
}
}
tracing::trace!("Stopping GenServer");
Ok(())
Ok(state)
}
}

Expand Down Expand Up @@ -305,8 +314,7 @@ where
}

/// Teardown function. It's called after the stop message is received.
/// It can be overrided on implementations in case final steps are required,
/// like closing streams, stopping timers, etc.
/// It can be overrided on implementations in case final steps are required.
fn teardown(
&mut self,
_handle: &GenServerHandle<Self>,
Expand Down