diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index fd03f00..da5da63 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -164,21 +164,30 @@ where state: Self::State, ) -> impl Future> + Send { async { - let init_result = self - .init(handle, state.clone()) - .await - .inspect_err(|err| tracing::error!("Initialization failed: {err:?}")); + let after_init_state = match self.init(handle, state).await { + Ok(state) => state, + Err(err) => { + tracing::error!("Error during initialization: {err:?}"); + handle.cancellation_token().cancel(); + return Err(GenServerError::Initialization); + } + }; - let res = match init_result { - Ok(new_state) => self.main_loop(handle, rx, new_state).await, - Err(_) => Err(GenServerError::Initialization), + let final_state = match self.main_loop(handle, rx, after_init_state).await { + Ok(state) => state, + Err(err) => { + tracing::error!("Error during main loop: {err:?}"); + handle.cancellation_token().cancel(); + return Err(err); + } }; - handle.cancellation_token().cancel(); - if let Err(err) = self.teardown(handle, state).await { + if let Err(err) = self.teardown(handle, final_state).await { tracing::error!("Error during teardown: {err:?}"); } - res + + handle.cancellation_token().cancel(); + Ok(()) } } @@ -198,7 +207,7 @@ where handle: &GenServerHandle, rx: &mut mpsc::Receiver>, mut state: Self::State, - ) -> impl Future> + Send { + ) -> impl Future> + Send { async { loop { let (new_state, cont) = self.receive(handle, rx, state).await?; @@ -208,7 +217,7 @@ where } } tracing::trace!("Stopping GenServer"); - Ok(()) + Ok(state) } } @@ -305,8 +314,7 @@ where } /// Teardown function. It's called after the stop message is received. - /// It can be overrided on implementations in case final steps are required, - /// like closing streams, stopping timers, etc. + /// It can be overrided on implementations in case final steps are required. fn teardown( &mut self, _handle: &GenServerHandle,