diff --git a/Cargo.lock b/Cargo.lock index c85263d..5f11cfd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1197,7 +1197,7 @@ dependencies = [ [[package]] name = "spawned-concurrency" -version = "0.1.7" +version = "0.2.0" dependencies = [ "futures", "spawned-rt", @@ -1209,7 +1209,7 @@ dependencies = [ [[package]] name = "spawned-rt" -version = "0.1.7" +version = "0.2.0" dependencies = [ "crossbeam", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 03428b1..0a23f9a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,6 +22,6 @@ tracing = { version = "0.1.41", features = ["log"] } tracing-subscriber = { version = "0.3.19", features = ["env-filter"] } [workspace.package] -version = "0.1.7" +version = "0.2.0" license = "MIT" edition = "2021" diff --git a/concurrency/src/tasks/gen_server.rs b/concurrency/src/tasks/gen_server.rs index fd03f00..118b1d3 100644 --- a/concurrency/src/tasks/gen_server.rs +++ b/concurrency/src/tasks/gen_server.rs @@ -25,45 +25,35 @@ impl Clone for GenServerHandle { } impl GenServerHandle { - pub(crate) fn new(initial_state: G::State) -> Self { + pub(crate) fn new(gen_server: G) -> Self { let (tx, mut rx) = mpsc::channel::>(); let cancellation_token = CancellationToken::new(); let handle = GenServerHandle { tx, cancellation_token, }; - let mut gen_server: G = GenServer::new(); let handle_clone = handle.clone(); // Ignore the JoinHandle for now. Maybe we'll use it in the future let _join_handle = rt::spawn(async move { - if gen_server - .run(&handle, &mut rx, initial_state) - .await - .is_err() - { + if gen_server.run(&handle, &mut rx).await.is_err() { tracing::trace!("GenServer crashed") }; }); handle_clone } - pub(crate) fn new_blocking(initial_state: G::State) -> Self { + pub(crate) fn new_blocking(gen_server: G) -> Self { let (tx, mut rx) = mpsc::channel::>(); let cancellation_token = CancellationToken::new(); let handle = GenServerHandle { tx, cancellation_token, }; - let mut gen_server: G = GenServer::new(); let handle_clone = handle.clone(); // Ignore the JoinHandle for now. Maybe we'll use it in the future let _join_handle = rt::spawn_blocking(|| { rt::block_on(async move { - if gen_server - .run(&handle, &mut rx, initial_state) - .await - .is_err() - { + if gen_server.run(&handle, &mut rx).await.is_err() { tracing::trace!("GenServer crashed") }; }) @@ -119,33 +109,25 @@ pub enum GenServerInMsg { } pub enum CallResponse { - Reply(G::State, G::OutMsg), + Reply(G, G::OutMsg), Unused, Stop(G::OutMsg), } pub enum CastResponse { - NoReply(G::State), + NoReply(G), Unused, Stop, } -pub trait GenServer -where - Self: Default + Send + Sized, -{ +pub trait GenServer: Send + Sized + Clone { type CallMsg: Clone + Send + Sized + Sync; type CastMsg: Clone + Send + Sized + Sync; type OutMsg: Send + Sized; - type State: Clone + Send; type Error: Debug + Send; - fn new() -> Self { - Self::default() - } - - fn start(initial_state: Self::State) -> GenServerHandle { - GenServerHandle::new(initial_state) + fn start(self) -> GenServerHandle { + GenServerHandle::new(self) } /// Tokio tasks depend on a coolaborative multitasking model. "work stealing" can't @@ -153,29 +135,29 @@ where /// or other blocking tasks need to be in their own separate thread, and the OS /// will manage them through hardware interrupts. /// Start blocking provides such thread. - fn start_blocking(initial_state: Self::State) -> GenServerHandle { - GenServerHandle::new_blocking(initial_state) + fn start_blocking(self) -> GenServerHandle { + GenServerHandle::new_blocking(self) } fn run( - &mut self, + self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - state: Self::State, ) -> impl Future> + Send { async { let init_result = self - .init(handle, state.clone()) + .clone() + .init(handle) .await .inspect_err(|err| tracing::error!("Initialization failed: {err:?}")); let res = match init_result { - Ok(new_state) => self.main_loop(handle, rx, new_state).await, + Ok(new_state) => new_state.main_loop(handle, rx).await, Err(_) => Err(GenServerError::Initialization), }; handle.cancellation_token().cancel(); - if let Err(err) = self.teardown(handle, state).await { + if let Err(err) = self.teardown(handle).await { tracing::error!("Error during teardown: {err:?}"); } res @@ -186,23 +168,21 @@ where /// can be overrided on implementations in case initial steps are /// required. fn init( - &mut self, + self, _handle: &GenServerHandle, - state: Self::State, - ) -> impl Future> + Send { - async { Ok(state) } + ) -> impl Future> + Send { + async { Ok(self) } } fn main_loop( - &mut self, + mut self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - mut state: Self::State, ) -> impl Future> + Send { async { loop { - let (new_state, cont) = self.receive(handle, rx, state).await?; - state = new_state; + let (new_state, cont) = self.receive(handle, rx).await?; + self = new_state; if !cont { break; } @@ -213,21 +193,20 @@ where } fn receive( - &mut self, + self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - state: Self::State, - ) -> impl Future> + Send { + ) -> impl Future> + Send { async move { let message = rx.recv().await; // Save current state in case of a rollback - let state_clone = state.clone(); + let state_clone = self.clone(); let (keep_running, new_state) = match message { Some(GenServerInMsg::Call { sender, message }) => { let (keep_running, new_state, response) = - match AssertUnwindSafe(self.handle_call(message, handle, state)) + match AssertUnwindSafe(self.handle_call(message, handle)) .catch_unwind() .await { @@ -257,7 +236,7 @@ where (keep_running, new_state) } Some(GenServerInMsg::Cast { message }) => { - match AssertUnwindSafe(self.handle_cast(message, handle, state)) + match AssertUnwindSafe(self.handle_cast(message, handle)) .catch_unwind() .await { @@ -279,7 +258,7 @@ where } None => { // Channel has been closed; won't receive further messages. Stop the server. - (false, state) + (false, self) } }; Ok((new_state, keep_running)) @@ -287,19 +266,17 @@ where } fn handle_call( - &mut self, + self, _message: Self::CallMsg, _handle: &GenServerHandle, - _state: Self::State, ) -> impl Future> + Send { async { CallResponse::Unused } } fn handle_cast( - &mut self, + self, _message: Self::CastMsg, _handle: &GenServerHandle, - _state: Self::State, ) -> impl Future> + Send { async { CastResponse::Unused } } @@ -308,9 +285,8 @@ where /// It can be overrided on implementations in case final steps are required, /// like closing streams, stopping timers, etc. fn teardown( - &mut self, + self, _handle: &GenServerHandle, - _state: Self::State, ) -> impl Future> + Send { async { Ok(()) } } @@ -323,7 +299,7 @@ mod tests { use crate::tasks::send_after; use std::{thread, time::Duration}; - #[derive(Default)] + #[derive(Clone)] struct BadlyBehavedTask; #[derive(Clone)] @@ -340,23 +316,20 @@ mod tests { type CallMsg = InMessage; type CastMsg = (); type OutMsg = (); - type State = (); type Error = (); async fn handle_call( - &mut self, + self, _: Self::CallMsg, _: &GenServerHandle, - _: Self::State, ) -> CallResponse { CallResponse::Stop(()) } async fn handle_cast( - &mut self, + self, _: Self::CastMsg, _: &GenServerHandle, - _: Self::State, ) -> CastResponse { rt::sleep(Duration::from_millis(20)).await; thread::sleep(Duration::from_secs(2)); @@ -364,11 +337,8 @@ mod tests { } } - #[derive(Default)] - struct WellBehavedTask; - #[derive(Clone)] - struct CountState { + struct WellBehavedTask { pub count: u64, } @@ -376,34 +346,31 @@ mod tests { type CallMsg = InMessage; type CastMsg = (); type OutMsg = OutMsg; - type State = CountState; type Error = (); async fn handle_call( - &mut self, + self, message: Self::CallMsg, _: &GenServerHandle, - state: Self::State, ) -> CallResponse { match message { InMessage::GetCount => { - let count = state.count; - CallResponse::Reply(state, OutMsg::Count(count)) + let count = self.count; + CallResponse::Reply(self, OutMsg::Count(count)) } - InMessage::Stop => CallResponse::Stop(OutMsg::Count(state.count)), + InMessage::Stop => CallResponse::Stop(OutMsg::Count(self.count)), } } async fn handle_cast( - &mut self, + mut self, _: Self::CastMsg, handle: &GenServerHandle, - mut state: Self::State, ) -> CastResponse { - state.count += 1; + self.count += 1; println!("{:?}: good still alive", thread::current().id()); send_after(Duration::from_millis(100), handle.to_owned(), ()); - CastResponse::NoReply(state) + CastResponse::NoReply(self) } } @@ -411,9 +378,9 @@ mod tests { pub fn badly_behaved_thread_non_blocking() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut badboy = BadlyBehavedTask::start(()); + let mut badboy = BadlyBehavedTask.start(); let _ = badboy.cast(()).await; - let mut goodboy = WellBehavedTask::start(CountState { count: 0 }); + let mut goodboy = WellBehavedTask { count: 0 }.start(); let _ = goodboy.cast(()).await; rt::sleep(Duration::from_secs(1)).await; let count = goodboy.call(InMessage::GetCount).await.unwrap(); @@ -431,9 +398,9 @@ mod tests { pub fn badly_behaved_thread() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut badboy = BadlyBehavedTask::start_blocking(()); + let mut badboy = BadlyBehavedTask.start_blocking(); let _ = badboy.cast(()).await; - let mut goodboy = WellBehavedTask::start(CountState { count: 0 }); + let mut goodboy = WellBehavedTask { count: 0 }.start(); let _ = goodboy.cast(()).await; rt::sleep(Duration::from_secs(1)).await; let count = goodboy.call(InMessage::GetCount).await.unwrap(); @@ -449,7 +416,7 @@ mod tests { const TIMEOUT_DURATION: Duration = Duration::from_millis(100); - #[derive(Default)] + #[derive(Debug, Default, Clone)] struct SomeTask; #[derive(Clone)] @@ -462,25 +429,23 @@ mod tests { type CallMsg = SomeTaskCallMsg; type CastMsg = (); type OutMsg = (); - type State = (); type Error = (); async fn handle_call( - &mut self, + self, message: Self::CallMsg, _handle: &GenServerHandle, - _state: Self::State, ) -> CallResponse { match message { SomeTaskCallMsg::SlowOperation => { // Simulate a slow operation that will not resolve in time rt::sleep(TIMEOUT_DURATION * 2).await; - CallResponse::Reply((), ()) + CallResponse::Reply(self, ()) } SomeTaskCallMsg::FastOperation => { // Simulate a fast operation that resolves in time rt::sleep(TIMEOUT_DURATION / 2).await; - CallResponse::Reply((), ()) + CallResponse::Reply(self, ()) } } } @@ -490,7 +455,7 @@ mod tests { pub fn unresolving_task_times_out() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut unresolving_task = SomeTask::start(()); + let mut unresolving_task = SomeTask.start(); let result = unresolving_task .call_with_timeout(SomeTaskCallMsg::FastOperation, TIMEOUT_DURATION) diff --git a/concurrency/src/tasks/stream_tests.rs b/concurrency/src/tasks/stream_tests.rs index 74e5f35..09f8bf9 100644 --- a/concurrency/src/tasks/stream_tests.rs +++ b/concurrency/src/tasks/stream_tests.rs @@ -8,11 +8,18 @@ use crate::tasks::{ type SummatoryHandle = GenServerHandle; -#[derive(Default)] -struct Summatory; +#[derive(Clone)] +struct Summatory { + count: u16, +} + +impl Summatory { + pub fn new(count: u16) -> Self { + Self { count } + } +} -type SummatoryState = u16; -type SummatoryOutMessage = SummatoryState; +type SummatoryOutMessage = u16; #[derive(Clone)] enum SummatoryCastMessage { @@ -21,7 +28,7 @@ enum SummatoryCastMessage { } impl Summatory { - pub async fn get_value(server: &mut SummatoryHandle) -> Result { + pub async fn get_value(server: &mut SummatoryHandle) -> Result { server.call(()).await.map_err(|_| ()) } } @@ -30,32 +37,29 @@ impl GenServer for Summatory { type CallMsg = (); // We only handle one type of call, so there is no need for a specific message type. type CastMsg = SummatoryCastMessage; type OutMsg = SummatoryOutMessage; - type State = SummatoryState; type Error = (); async fn handle_cast( - &mut self, + mut self, message: Self::CastMsg, _handle: &GenServerHandle, - state: Self::State, ) -> CastResponse { match message { SummatoryCastMessage::Add(val) => { - let new_state = state + val; - CastResponse::NoReply(new_state) + self.count += val; + CastResponse::NoReply(self) } SummatoryCastMessage::Stop => CastResponse::Stop, } } async fn handle_call( - &mut self, + self, _message: Self::CallMsg, _handle: &SummatoryHandle, - state: Self::State, ) -> CallResponse { - let current_value = state; - CallResponse::Reply(state, current_value) + let current_value = self.count; + CallResponse::Reply(self, current_value) } } @@ -69,7 +73,7 @@ fn message_builder(value: u8) -> SummatoryCastMessage { pub fn test_sum_numbers_from_stream() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::start(0); + let mut summatory_handle = Summatory::new(0).start(); let stream = tokio_stream::iter(vec![1u8, 2, 3, 4, 5].into_iter().map(Ok::)); spawn_listener(summatory_handle.clone(), message_builder, stream); @@ -86,7 +90,7 @@ pub fn test_sum_numbers_from_stream() { pub fn test_sum_numbers_from_channel() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::start(0); + let mut summatory_handle = Summatory::new(0).start(); let (tx, rx) = spawned_rt::tasks::mpsc::channel::>(); // Spawn a task to send numbers to the channel @@ -114,7 +118,7 @@ pub fn test_sum_numbers_from_channel() { pub fn test_sum_numbers_from_broadcast_channel() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::start(0); + let mut summatory_handle = Summatory::new(0).start(); let (tx, rx) = tokio::sync::broadcast::channel::(5); // Spawn a task to send numbers to the channel @@ -144,7 +148,7 @@ pub fn test_stream_cancellation() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - let mut summatory_handle = Summatory::start(0); + let mut summatory_handle = Summatory::new(0).start(); let (tx, rx) = spawned_rt::tasks::mpsc::channel::>(); // Spawn a task to send numbers to the channel diff --git a/concurrency/src/tasks/timer_tests.rs b/concurrency/src/tasks/timer_tests.rs index d02f9cf..1aeedce 100644 --- a/concurrency/src/tasks/timer_tests.rs +++ b/concurrency/src/tasks/timer_tests.rs @@ -6,12 +6,6 @@ use super::send_after; type RepeaterHandle = GenServerHandle; -#[derive(Clone)] -struct RepeaterState { - pub(crate) count: i32, - pub(crate) cancellation_token: Option, -} - #[derive(Clone)] enum RepeaterCastMessage { Inc, @@ -28,8 +22,20 @@ enum RepeaterOutMessage { Count(i32), } -#[derive(Default)] -struct Repeater; +#[derive(Clone)] +struct Repeater { + pub(crate) count: i32, + pub(crate) cancellation_token: Option, +} + +impl Repeater { + pub fn new(initial_count: i32) -> Self { + Repeater { + count: initial_count, + cancellation_token: None, + } + } +} impl Repeater { pub async fn stop_timer(server: &mut RepeaterHandle) -> Result<(), ()> { @@ -51,50 +57,43 @@ impl GenServer for Repeater { type CallMsg = RepeaterCallMessage; type CastMsg = RepeaterCastMessage; type OutMsg = RepeaterOutMessage; - type State = RepeaterState; type Error = (); - async fn init( - &mut self, - handle: &RepeaterHandle, - mut state: Self::State, - ) -> Result { + async fn init(mut self, handle: &RepeaterHandle) -> Result { let timer = send_interval( Duration::from_millis(100), handle.clone(), RepeaterCastMessage::Inc, ); - state.cancellation_token = Some(timer.cancellation_token); - Ok(state) + self.cancellation_token = Some(timer.cancellation_token); + Ok(self) } async fn handle_call( - &mut self, + self, _message: Self::CallMsg, _handle: &RepeaterHandle, - state: Self::State, ) -> CallResponse { - let count = state.count; - CallResponse::Reply(state, RepeaterOutMessage::Count(count)) + let count = self.count; + CallResponse::Reply(self, RepeaterOutMessage::Count(count)) } async fn handle_cast( - &mut self, + mut self, message: Self::CastMsg, _handle: &GenServerHandle, - mut state: Self::State, ) -> CastResponse { match message { RepeaterCastMessage::Inc => { - state.count += 1; + self.count += 1; } RepeaterCastMessage::StopTimer => { - if let Some(ct) = state.cancellation_token.clone() { + if let Some(ct) = self.cancellation_token.clone() { ct.cancel() }; } }; - CastResponse::NoReply(state) + CastResponse::NoReply(self) } } @@ -103,10 +102,7 @@ pub fn test_send_interval_and_cancellation() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { // Start a Repeater - let mut repeater = Repeater::start(RepeaterState { - count: 0, - cancellation_token: None, - }); + let mut repeater = Repeater::new(0).start(); // Wait for 1 second rt::sleep(Duration::from_secs(1)).await; @@ -133,11 +129,6 @@ pub fn test_send_interval_and_cancellation() { type DelayedHandle = GenServerHandle; -#[derive(Clone)] -struct DelayedState { - pub(crate) count: i32, -} - #[derive(Clone)] enum DelayedCastMessage { Inc, @@ -154,8 +145,18 @@ enum DelayedOutMessage { Count(i32), } -#[derive(Default)] -struct Delayed; +#[derive(Clone)] +struct Delayed { + pub(crate) count: i32, +} + +impl Delayed { + pub fn new(initial_count: i32) -> Self { + Delayed { + count: initial_count, + } + } +} impl Delayed { pub async fn get_count(server: &mut DelayedHandle) -> Result { @@ -174,36 +175,33 @@ impl GenServer for Delayed { type CallMsg = DelayedCallMessage; type CastMsg = DelayedCastMessage; type OutMsg = DelayedOutMessage; - type State = DelayedState; type Error = (); async fn handle_call( - &mut self, + self, message: Self::CallMsg, _handle: &DelayedHandle, - state: Self::State, ) -> CallResponse { match message { DelayedCallMessage::GetCount => { - let count = state.count; - CallResponse::Reply(state, DelayedOutMessage::Count(count)) + let count = self.count; + CallResponse::Reply(self, DelayedOutMessage::Count(count)) } - DelayedCallMessage::Stop => CallResponse::Stop(DelayedOutMessage::Count(state.count)), + DelayedCallMessage::Stop => CallResponse::Stop(DelayedOutMessage::Count(self.count)), } } async fn handle_cast( - &mut self, + mut self, message: Self::CastMsg, _handle: &DelayedHandle, - mut state: Self::State, ) -> CastResponse { match message { DelayedCastMessage::Inc => { - state.count += 1; + self.count += 1; } }; - CastResponse::NoReply(state) + CastResponse::NoReply(self) } } @@ -212,7 +210,7 @@ pub fn test_send_after_and_cancellation() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { // Start a Delayed - let mut repeater = Delayed::start(DelayedState { count: 0 }); + let mut repeater = Delayed::new(0).start(); // Set a just once timed message let _ = send_after( @@ -256,7 +254,7 @@ pub fn test_send_after_gen_server_teardown() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { // Start a Delayed - let mut repeater = Delayed::start(DelayedState { count: 0 }); + let mut repeater = Delayed::new(0).start(); // Set a just once timed message let _ = send_after( diff --git a/concurrency/src/threads/gen_server.rs b/concurrency/src/threads/gen_server.rs index 9ace83e..ee09b17 100644 --- a/concurrency/src/threads/gen_server.rs +++ b/concurrency/src/threads/gen_server.rs @@ -22,14 +22,13 @@ impl Clone for GenServerHandle { } impl GenServerHandle { - pub(crate) fn new(initial_state: G::State) -> Self { + pub(crate) fn new(gen_server: G) -> Self { let (tx, mut rx) = mpsc::channel::>(); let handle = GenServerHandle { tx }; - let mut gen_server: G = GenServer::new(); let handle_clone = handle.clone(); // Ignore the JoinHandle for now. Maybe we'll use it in the future let _join_handle = rt::spawn(move || { - if gen_server.run(&handle, &mut rx, initial_state).is_err() { + if gen_server.run(&handle, &mut rx).is_err() { tracing::trace!("GenServer crashed") }; }); @@ -70,50 +69,41 @@ pub enum GenServerInMsg { } pub enum CallResponse { - Reply(G::State, G::OutMsg), + Reply(G, G::OutMsg), Unused, Stop(G::OutMsg), } pub enum CastResponse { - NoReply(G::State), + NoReply(G), Unused, Stop, } -pub trait GenServer -where - Self: Default + Send + Sized, -{ +pub trait GenServer: Send + Sized + Clone { type CallMsg: Clone + Send + Sized; type CastMsg: Clone + Send + Sized; type OutMsg: Send + Sized; - type State: Clone + Send; type Error: Debug; - fn new() -> Self { - Self::default() - } - - fn start(initial_state: Self::State) -> GenServerHandle { - GenServerHandle::new(initial_state) + fn start(self) -> GenServerHandle { + GenServerHandle::new(self) } /// We copy the same interface as tasks, but all threads can work /// while blocking by default - fn start_blocking(initial_state: Self::State) -> GenServerHandle { - GenServerHandle::new(initial_state) + fn start_blocking(self) -> GenServerHandle { + GenServerHandle::new(self) } fn run( - &mut self, + self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - state: Self::State, ) -> Result<(), GenServerError> { - match self.init(handle, state) { + match self.init(handle) { Ok(new_state) => { - self.main_loop(handle, rx, new_state)?; + new_state.main_loop(handle, rx)?; Ok(()) } Err(err) => { @@ -126,48 +116,40 @@ where /// Initialization function. It's called before main loop. It /// can be overrided on implementations in case initial steps are /// required. - fn init( - &mut self, - _handle: &GenServerHandle, - state: Self::State, - ) -> Result { - Ok(state) + fn init(self, _handle: &GenServerHandle) -> Result { + Ok(self) } fn main_loop( - &mut self, + mut self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - mut state: Self::State, ) -> Result<(), GenServerError> { loop { - let (new_state, cont) = self.receive(handle, rx, state)?; + let (new_state, cont) = self.receive(handle, rx)?; if !cont { break; } - state = new_state; + self = new_state; } tracing::trace!("Stopping GenServer"); Ok(()) } fn receive( - &mut self, + self, handle: &GenServerHandle, rx: &mut mpsc::Receiver>, - state: Self::State, - ) -> Result<(Self::State, bool), GenServerError> { + ) -> Result<(Self, bool), GenServerError> { let message = rx.recv().ok(); // Save current state in case of a rollback - let state_clone = state.clone(); + let state_clone = self.clone(); let (keep_running, new_state) = match message { Some(GenServerInMsg::Call { sender, message }) => { let (keep_running, new_state, response) = - match catch_unwind(AssertUnwindSafe(|| { - self.handle_call(message, handle, state) - })) { + match catch_unwind(AssertUnwindSafe(|| self.handle_call(message, handle))) { Ok(response) => match response { CallResponse::Reply(new_state, response) => { (true, new_state, Ok(response)) @@ -192,9 +174,7 @@ where (keep_running, new_state) } Some(GenServerInMsg::Cast { message }) => { - match catch_unwind(AssertUnwindSafe(|| { - self.handle_cast(message, handle, state) - })) { + match catch_unwind(AssertUnwindSafe(|| self.handle_cast(message, handle))) { Ok(response) => match response { CastResponse::NoReply(new_state) => (true, new_state), CastResponse::Stop => (false, state_clone), @@ -211,26 +191,24 @@ where } None => { // Channel has been closed; won't receive further messages. Stop the server. - (false, state) + (false, self) } }; Ok((new_state, keep_running)) } fn handle_call( - &mut self, + self, _message: Self::CallMsg, _handle: &GenServerHandle, - _state: Self::State, ) -> CallResponse { CallResponse::Unused } fn handle_cast( - &mut self, + self, _message: Self::CastMsg, _handle: &GenServerHandle, - _state: Self::State, ) -> CastResponse { CastResponse::Unused } diff --git a/concurrency/src/threads/timer_tests.rs b/concurrency/src/threads/timer_tests.rs index f5d2124..7c144d8 100644 --- a/concurrency/src/threads/timer_tests.rs +++ b/concurrency/src/threads/timer_tests.rs @@ -6,12 +6,6 @@ use super::send_after; type RepeaterHandle = GenServerHandle; -#[derive(Clone)] -struct RepeaterState { - pub(crate) count: i32, - pub(crate) cancellation_token: Option, -} - #[derive(Clone)] enum RepeaterCastMessage { Inc, @@ -28,8 +22,20 @@ enum RepeaterOutMessage { Count(i32), } -#[derive(Default)] -struct Repeater; +#[derive(Clone)] +struct Repeater { + pub(crate) count: i32, + pub(crate) cancellation_token: Option, +} + +impl Repeater { + pub fn new(initial_count: i32) -> Self { + Repeater { + count: initial_count, + cancellation_token: None, + } + } +} impl Repeater { pub fn stop_timer(server: &mut RepeaterHandle) -> Result<(), ()> { @@ -45,60 +51,46 @@ impl GenServer for Repeater { type CallMsg = RepeaterCallMessage; type CastMsg = RepeaterCastMessage; type OutMsg = RepeaterOutMessage; - type State = RepeaterState; type Error = (); - fn init( - &mut self, - handle: &RepeaterHandle, - mut state: Self::State, - ) -> Result { + fn init(mut self, handle: &RepeaterHandle) -> Result { let timer = send_interval( Duration::from_millis(100), handle.clone(), RepeaterCastMessage::Inc, ); - state.cancellation_token = Some(timer.cancellation_token); - Ok(state) + self.cancellation_token = Some(timer.cancellation_token); + Ok(self) } - fn handle_call( - &mut self, - _message: Self::CallMsg, - _handle: &RepeaterHandle, - state: Self::State, - ) -> CallResponse { - let count = state.count; - CallResponse::Reply(state, RepeaterOutMessage::Count(count)) + fn handle_call(self, _message: Self::CallMsg, _handle: &RepeaterHandle) -> CallResponse { + let count = self.count; + CallResponse::Reply(self, RepeaterOutMessage::Count(count)) } fn handle_cast( - &mut self, + mut self, message: Self::CastMsg, _handle: &GenServerHandle, - mut state: Self::State, ) -> CastResponse { match message { RepeaterCastMessage::Inc => { - state.count += 1; + self.count += 1; } RepeaterCastMessage::StopTimer => { - if let Some(mut ct) = state.cancellation_token.clone() { + if let Some(mut ct) = self.cancellation_token.clone() { ct.cancel() }; } }; - CastResponse::NoReply(state) + CastResponse::NoReply(self) } } #[test] pub fn test_send_interval_and_cancellation() { // Start a Repeater - let mut repeater = Repeater::start(RepeaterState { - count: 0, - cancellation_token: None, - }); + let mut repeater = Repeater::new(0).start(); // Wait for 1 second rt::sleep(Duration::from_secs(1)); @@ -124,11 +116,6 @@ pub fn test_send_interval_and_cancellation() { type DelayedHandle = GenServerHandle; -#[derive(Clone)] -struct DelayedState { - pub(crate) count: i32, -} - #[derive(Clone)] enum DelayedCastMessage { Inc, @@ -144,8 +131,18 @@ enum DelayedOutMessage { Count(i32), } -#[derive(Default)] -struct Delayed; +#[derive(Clone)] +struct Delayed { + pub(crate) count: i32, +} + +impl Delayed { + pub fn new(initial_count: i32) -> Self { + Delayed { + count: initial_count, + } + } +} impl Delayed { pub fn get_count(server: &mut DelayedHandle) -> Result { @@ -157,38 +154,31 @@ impl GenServer for Delayed { type CallMsg = DelayedCallMessage; type CastMsg = DelayedCastMessage; type OutMsg = DelayedOutMessage; - type State = DelayedState; type Error = (); - fn handle_call( - &mut self, - _message: Self::CallMsg, - _handle: &DelayedHandle, - state: Self::State, - ) -> CallResponse { - let count = state.count; - CallResponse::Reply(state, DelayedOutMessage::Count(count)) + fn handle_call(self, _message: Self::CallMsg, _handle: &DelayedHandle) -> CallResponse { + let count = self.count; + CallResponse::Reply(self, DelayedOutMessage::Count(count)) } fn handle_cast( - &mut self, + mut self, message: Self::CastMsg, _handle: &DelayedHandle, - mut state: Self::State, ) -> CastResponse { match message { DelayedCastMessage::Inc => { - state.count += 1; + self.count += 1; } }; - CastResponse::NoReply(state) + CastResponse::NoReply(self) } } #[test] pub fn test_send_after_and_cancellation() { // Start a Delayed - let mut repeater = Delayed::start(DelayedState { count: 0 }); + let mut repeater = Delayed::new(0).start(); // Set a just once timed message let _ = send_after( diff --git a/examples/bank/src/main.rs b/examples/bank/src/main.rs index 7284745..37485c8 100644 --- a/examples/bank/src/main.rs +++ b/examples/bank/src/main.rs @@ -22,8 +22,6 @@ mod messages; mod server; -use std::collections::HashMap; - use messages::{BankError, BankOutMessage}; use server::Bank; use spawned_concurrency::tasks::GenServer as _; @@ -32,7 +30,7 @@ use spawned_rt::tasks as rt; fn main() { rt::run(async { // Starting the bank - let mut name_server = Bank::start(HashMap::new()); + let mut name_server = Bank::new().start(); // Testing initial balance for "main" account let result = Bank::withdraw(&mut name_server, "main".to_string(), 15).await; diff --git a/examples/bank/src/server.rs b/examples/bank/src/server.rs index de5bbb7..469172b 100644 --- a/examples/bank/src/server.rs +++ b/examples/bank/src/server.rs @@ -9,10 +9,19 @@ use crate::messages::{BankError, BankInMessage as InMessage, BankOutMessage as O type MsgResult = Result; type BankHandle = GenServerHandle; -type BankState = HashMap; -#[derive(Default)] -pub struct Bank {} +#[derive(Clone)] +pub struct Bank { + accounts: HashMap, +} + +impl Bank { + pub fn new() -> Self { + Bank { + accounts: HashMap::new(), + } + } +} impl Bank { pub async fn stop(server: &mut BankHandle) -> MsgResult { @@ -49,52 +58,46 @@ impl GenServer for Bank { type CastMsg = Unused; type OutMsg = MsgResult; type Error = BankError; - type State = BankState; // Initializing "main" account with 1000 in balance to test init() callback. - async fn init( - &mut self, - _handle: &GenServerHandle, - mut state: Self::State, - ) -> Result { - state.insert("main".to_string(), 1000); - Ok(state) + async fn init(mut self, _handle: &GenServerHandle) -> Result { + self.accounts.insert("main".to_string(), 1000); + Ok(self) } async fn handle_call( - &mut self, + mut self, message: Self::CallMsg, _handle: &BankHandle, - mut state: Self::State, ) -> CallResponse { match message.clone() { - Self::CallMsg::New { who } => match state.get(&who) { + Self::CallMsg::New { who } => match self.accounts.get(&who) { Some(_amount) => { - CallResponse::Reply(state, Err(BankError::AlreadyACustomer { who })) + CallResponse::Reply(self, Err(BankError::AlreadyACustomer { who })) } None => { - state.insert(who.clone(), 0); - CallResponse::Reply(state, Ok(OutMessage::Welcome { who })) + self.accounts.insert(who.clone(), 0); + CallResponse::Reply(self, Ok(OutMessage::Welcome { who })) } }, - Self::CallMsg::Add { who, amount } => match state.get(&who) { + Self::CallMsg::Add { who, amount } => match self.accounts.get(&who) { Some(current) => { let new_amount = current + amount; - state.insert(who.clone(), new_amount); + self.accounts.insert(who.clone(), new_amount); CallResponse::Reply( - state, + self, Ok(OutMessage::Balance { who, amount: new_amount, }), ) } - None => CallResponse::Reply(state, Err(BankError::NotACustomer { who })), + None => CallResponse::Reply(self, Err(BankError::NotACustomer { who })), }, - Self::CallMsg::Remove { who, amount } => match state.get(&who) { + Self::CallMsg::Remove { who, amount } => match self.accounts.get(&who) { Some(¤t) => match current < amount { true => CallResponse::Reply( - state, + self, Err(BankError::InsufficientBalance { who, amount: current, @@ -102,9 +105,9 @@ impl GenServer for Bank { ), false => { let new_amount = current - amount; - state.insert(who.clone(), new_amount); + self.accounts.insert(who.clone(), new_amount); CallResponse::Reply( - state, + self, Ok(OutMessage::WidrawOk { who, amount: new_amount, @@ -112,7 +115,7 @@ impl GenServer for Bank { ) } }, - None => CallResponse::Reply(state, Err(BankError::NotACustomer { who })), + None => CallResponse::Reply(self, Err(BankError::NotACustomer { who })), }, Self::CallMsg::Stop => CallResponse::Stop(Ok(OutMessage::Stopped)), } diff --git a/examples/bank_threads/src/main.rs b/examples/bank_threads/src/main.rs index ced28da..4fbca29 100644 --- a/examples/bank_threads/src/main.rs +++ b/examples/bank_threads/src/main.rs @@ -22,8 +22,6 @@ mod messages; mod server; -use std::collections::HashMap; - use messages::{BankError, BankOutMessage}; use server::Bank; use spawned_concurrency::threads::GenServer as _; @@ -32,7 +30,7 @@ use spawned_rt::threads as rt; fn main() { rt::run(|| { // Starting the bank - let mut name_server = Bank::start(HashMap::new()); + let mut name_server = Bank::new().start(); // Testing initial balance for "main" account let result = Bank::withdraw(&mut name_server, "main".to_string(), 15); diff --git a/examples/bank_threads/src/server.rs b/examples/bank_threads/src/server.rs index 610099e..5419708 100644 --- a/examples/bank_threads/src/server.rs +++ b/examples/bank_threads/src/server.rs @@ -9,10 +9,19 @@ use crate::messages::{BankError, BankInMessage as InMessage, BankOutMessage as O type MsgResult = Result; type BankHandle = GenServerHandle; -type BankState = HashMap; -#[derive(Default)] -pub struct Bank {} +#[derive(Clone)] +pub struct Bank { + accounts: HashMap, +} + +impl Bank { + pub fn new() -> Self { + Bank { + accounts: HashMap::new(), + } + } +} impl Bank { pub fn stop(server: &mut BankHandle) -> MsgResult { @@ -45,52 +54,42 @@ impl GenServer for Bank { type CastMsg = Unused; type OutMsg = MsgResult; type Error = BankError; - type State = BankState; // Initializing "main" account with 1000 in balance to test init() callback. - fn init( - &mut self, - _handle: &GenServerHandle, - mut state: Self::State, - ) -> Result { - state.insert("main".to_string(), 1000); - Ok(state) + fn init(mut self, _handle: &GenServerHandle) -> Result { + self.accounts.insert("main".to_string(), 1000); + Ok(self) } - fn handle_call( - &mut self, - message: Self::CallMsg, - _handle: &BankHandle, - mut state: Self::State, - ) -> CallResponse { + fn handle_call(mut self, message: Self::CallMsg, _handle: &BankHandle) -> CallResponse { match message.clone() { - Self::CallMsg::New { who } => match state.get(&who) { + Self::CallMsg::New { who } => match self.accounts.get(&who) { Some(_amount) => { - CallResponse::Reply(state, Err(BankError::AlreadyACustomer { who })) + CallResponse::Reply(self, Err(BankError::AlreadyACustomer { who })) } None => { - state.insert(who.clone(), 0); - CallResponse::Reply(state, Ok(OutMessage::Welcome { who })) + self.accounts.insert(who.clone(), 0); + CallResponse::Reply(self, Ok(OutMessage::Welcome { who })) } }, - Self::CallMsg::Add { who, amount } => match state.get(&who) { + Self::CallMsg::Add { who, amount } => match self.accounts.get(&who) { Some(current) => { let new_amount = current + amount; - state.insert(who.clone(), new_amount); + self.accounts.insert(who.clone(), new_amount); CallResponse::Reply( - state, + self, Ok(OutMessage::Balance { who, amount: new_amount, }), ) } - None => CallResponse::Reply(state, Err(BankError::NotACustomer { who })), + None => CallResponse::Reply(self, Err(BankError::NotACustomer { who })), }, - Self::CallMsg::Remove { who, amount } => match state.get(&who) { + Self::CallMsg::Remove { who, amount } => match self.accounts.get(&who) { Some(¤t) => match current < amount { true => CallResponse::Reply( - state, + self, Err(BankError::InsufficientBalance { who, amount: current, @@ -98,9 +97,9 @@ impl GenServer for Bank { ), false => { let new_amount = current - amount; - state.insert(who.clone(), new_amount); + self.accounts.insert(who.clone(), new_amount); CallResponse::Reply( - state, + self, Ok(OutMessage::WidrawOk { who, amount: new_amount, @@ -108,7 +107,7 @@ impl GenServer for Bank { ) } }, - None => CallResponse::Reply(state, Err(BankError::NotACustomer { who })), + None => CallResponse::Reply(self, Err(BankError::NotACustomer { who })), }, Self::CallMsg::Stop => CallResponse::Stop(Ok(OutMessage::Stopped)), } diff --git a/examples/blocking_genserver/main.rs b/examples/blocking_genserver/main.rs index 51f4138..98e670b 100644 --- a/examples/blocking_genserver/main.rs +++ b/examples/blocking_genserver/main.rs @@ -6,15 +6,22 @@ use spawned_concurrency::tasks::{ CallResponse, CastResponse, GenServer, GenServerHandle, send_after, }; -// We test a scenario with a badly behaved task#[derive(Default)] -#[derive(Default)] +// We test a scenario with a badly behaved task +#[derive(Clone)] struct BadlyBehavedTask; +impl BadlyBehavedTask { + pub fn new() -> Self { + BadlyBehavedTask + } +} + #[derive(Clone)] pub enum InMessage { GetCount, Stop, } + #[derive(Clone)] pub enum OutMsg { Count(u64), @@ -24,24 +31,13 @@ impl GenServer for BadlyBehavedTask { type CallMsg = InMessage; type CastMsg = (); type OutMsg = (); - type State = (); type Error = (); - async fn handle_call( - &mut self, - _: Self::CallMsg, - _: &GenServerHandle, - _: Self::State, - ) -> CallResponse { + async fn handle_call(self, _: Self::CallMsg, _: &GenServerHandle) -> CallResponse { CallResponse::Stop(()) } - async fn handle_cast( - &mut self, - _: Self::CastMsg, - _: &GenServerHandle, - _: Self::State, - ) -> CastResponse { + async fn handle_cast(self, _: Self::CastMsg, _: &GenServerHandle) -> CastResponse { rt::sleep(Duration::from_millis(20)).await; loop { println!("{:?}: bad still alive", thread::current().id()); @@ -50,46 +46,48 @@ impl GenServer for BadlyBehavedTask { } } -#[derive(Default)] -struct WellBehavedTask; - #[derive(Clone)] -struct CountState { - pub count: u64, +struct WellBehavedTask { + count: u64, +} + +impl WellBehavedTask { + pub fn new(initial_count: u64) -> Self { + WellBehavedTask { + count: initial_count, + } + } } impl GenServer for WellBehavedTask { type CallMsg = InMessage; type CastMsg = (); type OutMsg = OutMsg; - type State = CountState; type Error = (); async fn handle_call( - &mut self, + self, message: Self::CallMsg, _: &GenServerHandle, - state: Self::State, ) -> CallResponse { match message { InMessage::GetCount => { - let count = state.count; - CallResponse::Reply(state, OutMsg::Count(count)) + let count = self.count; + CallResponse::Reply(self, OutMsg::Count(count)) } - InMessage::Stop => CallResponse::Stop(OutMsg::Count(state.count)), + InMessage::Stop => CallResponse::Stop(OutMsg::Count(self.count)), } } async fn handle_cast( - &mut self, + mut self, _: Self::CastMsg, handle: &GenServerHandle, - mut state: Self::State, ) -> CastResponse { - state.count += 1; + self.count += 1; println!("{:?}: good still alive", thread::current().id()); send_after(Duration::from_millis(100), handle.to_owned(), ()); - CastResponse::NoReply(state) + CastResponse::NoReply(self) } } @@ -99,9 +97,9 @@ impl GenServer for WellBehavedTask { pub fn main() { rt::run(async move { // If we change BadlyBehavedTask to start instead, it can stop the entire program - let mut badboy = BadlyBehavedTask::start_blocking(()); + let mut badboy = BadlyBehavedTask::new().start_blocking(); let _ = badboy.cast(()).await; - let mut goodboy = WellBehavedTask::start(CountState { count: 0 }); + let mut goodboy = WellBehavedTask::new(0).start(); let _ = goodboy.cast(()).await; rt::sleep(Duration::from_secs(1)).await; let count = goodboy.call(InMessage::GetCount).await.unwrap(); diff --git a/examples/name_server/src/main.rs b/examples/name_server/src/main.rs index 713d41c..22e91c7 100644 --- a/examples/name_server/src/main.rs +++ b/examples/name_server/src/main.rs @@ -14,8 +14,6 @@ mod messages; mod server; -use std::collections::HashMap; - use messages::NameServerOutMessage; use server::NameServer; use spawned_concurrency::tasks::GenServer as _; @@ -23,7 +21,7 @@ use spawned_rt::tasks as rt; fn main() { rt::run(async { - let mut name_server = NameServer::start(HashMap::new()); + let mut name_server = NameServer::new().start(); let result = NameServer::add(&mut name_server, "Joe".to_string(), "At Home".to_string()).await; diff --git a/examples/name_server/src/server.rs b/examples/name_server/src/server.rs index 2191c32..54531f8 100644 --- a/examples/name_server/src/server.rs +++ b/examples/name_server/src/server.rs @@ -8,10 +8,19 @@ use spawned_concurrency::{ use crate::messages::{NameServerInMessage as InMessage, NameServerOutMessage as OutMessage}; type NameServerHandle = GenServerHandle; -type NameServerState = HashMap; -#[derive(Default)] -pub struct NameServer {} +#[derive(Clone)] +pub struct NameServer { + inner: HashMap, +} + +impl NameServer { + pub fn new() -> Self { + NameServer { + inner: HashMap::new(), + } + } +} impl NameServer { pub async fn add(server: &mut NameServerHandle, key: String, value: String) -> OutMessage { @@ -34,25 +43,23 @@ impl GenServer for NameServer { type CastMsg = Unused; type OutMsg = OutMessage; type Error = std::fmt::Error; - type State = NameServerState; async fn handle_call( - &mut self, + mut self, message: Self::CallMsg, _handle: &NameServerHandle, - mut state: Self::State, ) -> CallResponse { match message.clone() { Self::CallMsg::Add { key, value } => { - state.insert(key, value); - CallResponse::Reply(state, Self::OutMsg::Ok) + self.inner.insert(key, value); + CallResponse::Reply(self, Self::OutMsg::Ok) } - Self::CallMsg::Find { key } => match state.get(&key) { + Self::CallMsg::Find { key } => match self.inner.get(&key) { Some(result) => { let value = result.to_string(); - CallResponse::Reply(state, Self::OutMsg::Found { value }) + CallResponse::Reply(self, Self::OutMsg::Found { value }) } - None => CallResponse::Reply(state, Self::OutMsg::NotFound), + None => CallResponse::Reply(self, Self::OutMsg::NotFound), }, } } diff --git a/examples/name_server_with_error/src/main.rs b/examples/name_server_with_error/src/main.rs index 7cc5f67..eb5ab4c 100644 --- a/examples/name_server_with_error/src/main.rs +++ b/examples/name_server_with_error/src/main.rs @@ -23,7 +23,10 @@ use spawned_rt::tasks as rt; fn main() { rt::run(async { - let mut name_server = NameServer::start(HashMap::new()); + let mut name_server = NameServer { + inner: HashMap::new(), + } + .start(); let result = NameServer::add(&mut name_server, "Joe".to_string(), "At Home".to_string()).await; diff --git a/examples/name_server_with_error/src/server.rs b/examples/name_server_with_error/src/server.rs index def8ecf..47b2552 100644 --- a/examples/name_server_with_error/src/server.rs +++ b/examples/name_server_with_error/src/server.rs @@ -9,10 +9,11 @@ use spawned_concurrency::{ use crate::messages::{NameServerInMessage as InMessage, NameServerOutMessage as OutMessage}; type NameServerHandle = GenServerHandle; -type NameServerState = HashMap; -#[derive(Default)] -pub struct NameServer {} +#[derive(Clone)] +pub struct NameServer { + pub inner: HashMap, +} impl NameServer { pub async fn add(server: &mut NameServerHandle, key: String, value: String) -> OutMessage { @@ -36,29 +37,27 @@ impl GenServer for NameServer { type CastMsg = Unused; type OutMsg = OutMessage; type Error = std::fmt::Error; - type State = NameServerState; async fn handle_call( - &mut self, + mut self, message: Self::CallMsg, _handle: &NameServerHandle, - mut state: Self::State, ) -> CallResponse { match message.clone() { Self::CallMsg::Add { key, value } => { - state.insert(key.clone(), value); + self.inner.insert(key.clone(), value); if key == "error" { panic!("error!") } else { - CallResponse::Reply(state, Self::OutMsg::Ok) + CallResponse::Reply(self, Self::OutMsg::Ok) } } - Self::CallMsg::Find { key } => match state.get(&key) { + Self::CallMsg::Find { key } => match self.inner.get(&key) { Some(result) => { let value = result.to_string(); - CallResponse::Reply(state, Self::OutMsg::Found { value }) + CallResponse::Reply(self, Self::OutMsg::Found { value }) } - None => CallResponse::Reply(state, Self::OutMsg::NotFound), + None => CallResponse::Reply(self, Self::OutMsg::NotFound), }, } } diff --git a/examples/updater/src/main.rs b/examples/updater/src/main.rs index f04b3d4..a0db2cb 100644 --- a/examples/updater/src/main.rs +++ b/examples/updater/src/main.rs @@ -8,18 +8,18 @@ mod server; use std::{thread, time::Duration}; -use server::{UpdateServerState, UpdaterServer}; +use server::UpdaterServer; use spawned_concurrency::tasks::GenServer as _; use spawned_rt::tasks as rt; fn main() { rt::run(async { tracing::info!("Starting Updater"); - UpdaterServer::start(UpdateServerState { - url: "https://httpbin.org/ip".to_string(), - periodicity: Duration::from_millis(1000), - timer_token: None, - }); + UpdaterServer::new( + "https://httpbin.org/ip".to_string(), + Duration::from_millis(1000), + ) + .start(); // giving it some time before ending thread::sleep(Duration::from_secs(10)); diff --git a/examples/updater/src/server.rs b/examples/updater/src/server.rs index fbb822e..41115d6 100644 --- a/examples/updater/src/server.rs +++ b/examples/updater/src/server.rs @@ -11,46 +11,47 @@ use crate::messages::{UpdaterInMessage as InMessage, UpdaterOutMessage as OutMes type UpdateServerHandle = GenServerHandle; #[derive(Clone)] -pub struct UpdateServerState { +pub struct UpdaterServer { pub url: String, pub periodicity: Duration, pub timer_token: Option, } -#[derive(Default)] -pub struct UpdaterServer {} + +impl UpdaterServer { + pub fn new(url: String, periodicity: Duration) -> Self { + UpdaterServer { + url, + periodicity, + timer_token: None, + } + } +} impl GenServer for UpdaterServer { type CallMsg = Unused; type CastMsg = InMessage; type OutMsg = OutMessage; type Error = std::fmt::Error; - type State = UpdateServerState; // Initializing GenServer to start periodic checks. - async fn init( - &mut self, - handle: &GenServerHandle, - mut state: Self::State, - ) -> Result { - let timer = send_interval(state.periodicity, handle.clone(), InMessage::Check); - state.timer_token = Some(timer.cancellation_token); - Ok(state) + async fn init(mut self, handle: &GenServerHandle) -> Result { + let timer = send_interval(self.periodicity, handle.clone(), InMessage::Check); + self.timer_token = Some(timer.cancellation_token); + Ok(self) } async fn handle_cast( - &mut self, + self, message: Self::CastMsg, _handle: &UpdateServerHandle, - state: Self::State, ) -> CastResponse { match message { Self::CastMsg::Check => { - //send_after(state.periodicity, handle.clone(), InMessage::Check); - let url = state.url.clone(); + let url = self.url.clone(); tracing::info!("Fetching: {url}"); let resp = req(url).await; tracing::info!("Response: {resp:?}"); - CastResponse::NoReply(state) + CastResponse::NoReply(self) } } } diff --git a/examples/updater_threads/src/main.rs b/examples/updater_threads/src/main.rs index b4409b5..aad6dba 100644 --- a/examples/updater_threads/src/main.rs +++ b/examples/updater_threads/src/main.rs @@ -8,16 +8,17 @@ mod server; use std::{thread, time::Duration}; -use server::{UpdateServerState, UpdaterServer}; +use server::UpdaterServer; use spawned_concurrency::threads::GenServer as _; use spawned_rt::threads as rt; fn main() { rt::run(|| { - UpdaterServer::start(UpdateServerState { + UpdaterServer { url: "https://httpbin.org/ip".to_string(), periodicity: Duration::from_millis(1000), - }); + } + .start(); // giving it some time before ending thread::sleep(Duration::from_secs(10)); diff --git a/examples/updater_threads/src/server.rs b/examples/updater_threads/src/server.rs index ace3be0..0ed9173 100644 --- a/examples/updater_threads/src/server.rs +++ b/examples/updater_threads/src/server.rs @@ -11,46 +11,38 @@ use crate::messages::{UpdaterInMessage as InMessage, UpdaterOutMessage as OutMes type UpdateServerHandle = GenServerHandle; #[derive(Clone)] -pub struct UpdateServerState { +pub struct UpdaterServer { pub url: String, pub periodicity: Duration, } -#[derive(Default)] -pub struct UpdaterServer {} impl GenServer for UpdaterServer { type CallMsg = Unused; type CastMsg = InMessage; type OutMsg = OutMessage; type Error = std::fmt::Error; - type State = UpdateServerState; // Initializing GenServer to start periodic checks. - fn init( - &mut self, - handle: &GenServerHandle, - state: Self::State, - ) -> Result { - send_after(state.periodicity, handle.clone(), InMessage::Check); - Ok(state) + fn init(self, handle: &GenServerHandle) -> Result { + send_after(self.periodicity, handle.clone(), InMessage::Check); + Ok(self) } fn handle_cast( - &mut self, + self, message: Self::CastMsg, handle: &UpdateServerHandle, - state: Self::State, ) -> CastResponse { match message { Self::CastMsg::Check => { - send_after(state.periodicity, handle.clone(), InMessage::Check); - let url = state.url.clone(); + send_after(self.periodicity, handle.clone(), InMessage::Check); + let url = self.url.clone(); tracing::info!("Fetching: {url}"); let resp = block_on(req(url)); tracing::info!("Response: {resp:?}"); - CastResponse::NoReply(state) + CastResponse::NoReply(self) } } }