From 222025ce611f2de7cb77951fc9be938f11fb83c1 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Mon, 21 Jul 2025 12:11:28 -0300 Subject: [PATCH 1/9] add registry --- Cargo.lock | 1 + concurrency/Cargo.toml | 1 + concurrency/src/tasks/gen_server_registry.rs | 340 +++++++++++++++++++ concurrency/src/tasks/mod.rs | 2 + 4 files changed, 344 insertions(+) create mode 100644 concurrency/src/tasks/gen_server_registry.rs diff --git a/Cargo.lock b/Cargo.lock index c85263d..b53f5ad 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1200,6 +1200,7 @@ name = "spawned-concurrency" version = "0.1.7" dependencies = [ "futures", + "once_cell", "spawned-rt", "thiserror", "tokio", diff --git a/concurrency/Cargo.toml b/concurrency/Cargo.toml index 198c166..27aed3d 100644 --- a/concurrency/Cargo.toml +++ b/concurrency/Cargo.toml @@ -10,6 +10,7 @@ spawned-rt = { workspace = true } tracing = { workspace = true } futures = "0.3.1" thiserror = "2.0.12" +once_cell = "1.21.3" [dev-dependencies] # This tokio imports are only used in tests, we should not use them in the library code. diff --git a/concurrency/src/tasks/gen_server_registry.rs b/concurrency/src/tasks/gen_server_registry.rs new file mode 100644 index 0000000..5ab9e5f --- /dev/null +++ b/concurrency/src/tasks/gen_server_registry.rs @@ -0,0 +1,340 @@ +use std::collections::HashMap; + +use crate::tasks::{GenServer, GenServerHandle}; + +#[derive(Debug, thiserror::Error)] +pub enum GenServerRegistryError { + #[error("A GenServer is already Registered at this Address")] + AddressAlreadyTaken, + #[error("There is no GenServer associated with this Address")] + ServerNotFound, +} + +#[derive(Default)] +pub struct GenServerRegistry { + agenda: HashMap>, +} + +impl GenServerRegistry { + pub fn new() -> Self { + Self { + agenda: HashMap::new(), + } + } + + pub fn add_entry( + &mut self, + address: &str, + server_handle: GenServerHandle, + ) -> Result<(), GenServerRegistryError> { + if self.agenda.contains_key(address) { + return Err(GenServerRegistryError::AddressAlreadyTaken); + } + + self.agenda.insert(address.to_string(), server_handle); + Ok(()) + } + + pub fn remove_entry( + &mut self, + address: &str, + ) -> Result, GenServerRegistryError> { + self.agenda + .remove(address) + .ok_or(GenServerRegistryError::ServerNotFound) + } + + pub fn get_entry(&self, address: &str) -> Result, GenServerRegistryError> { + self.agenda + .get(address) + .cloned() + .ok_or(GenServerRegistryError::ServerNotFound) + } + + pub fn change_entry( + &mut self, + address: &str, + server_handle: GenServerHandle, + ) -> Result<(), GenServerRegistryError> { + // This function works like `add_entry`, without checking if the address already exists. + self.agenda.insert(address.to_string(), server_handle); + Ok(()) + } + + pub fn all_entries(&self) -> Vec> { + self.agenda.values().cloned().collect() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use once_cell::sync::Lazy; + use spawned_rt::tasks::{self as rt}; + use std::sync::Mutex; + + type AddressedGenServerHandle = GenServerHandle; + + #[derive(Default)] + struct AddressedGenServer; + + #[derive(Clone)] + enum AddressedGenServerCallMessage { + GetState, + } + + impl GenServer for AddressedGenServer { + type CallMsg = AddressedGenServerCallMessage; + type CastMsg = (); + type OutMsg = u8; + type State = u8; + type Error = (); + + async fn handle_call( + &mut self, + message: Self::CallMsg, + _handle: &GenServerHandle, + state: Self::State, + ) -> crate::tasks::CallResponse { + match message { + AddressedGenServerCallMessage::GetState => { + let out_msg = state; + crate::tasks::CallResponse::Reply(state, out_msg) + } + } + } + } + + #[test] + fn test_gen_server_directoy_add_entries() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + // We first instance a globally accessible GenServer directory + static GENSERVER_DIRECTORY: Lazy>> = + Lazy::new(|| Mutex::new(GenServerRegistry::new())); + + // We create the first server and add it to the directory + let gen_one_handle = AddressedGenServer::start(1); + assert!(GENSERVER_DIRECTORY + .lock() + .unwrap() + .add_entry("SERVER_ONE", gen_one_handle.clone()) + .is_ok()); + + // We create a second server and add it to the directory + let gen_two_handle = AddressedGenServer::start(2); + assert!(GENSERVER_DIRECTORY + .lock() + .unwrap() + .add_entry("SERVER_TWO", gen_two_handle.clone()) + .is_ok()); + + // We retrieve the first server from the directory, calling it we should retrieve its state correctly + let mut one_address = GENSERVER_DIRECTORY + .lock() + .unwrap() + .get_entry("SERVER_ONE") + .unwrap(); + assert_eq!( + AddressedGenServerHandle::call( + &mut one_address, + AddressedGenServerCallMessage::GetState + ) + .await + .unwrap(), + 1 + ); + + // Same goes for the second server + let mut two_address = GENSERVER_DIRECTORY + .lock() + .unwrap() + .get_entry("SERVER_TWO") + .unwrap(); + assert_eq!( + AddressedGenServerHandle::call( + &mut two_address, + AddressedGenServerCallMessage::GetState + ) + .await + .unwrap(), + 2 + ); + + // We can't retrieve a server that does not exist + assert!(GENSERVER_DIRECTORY + .lock() + .unwrap() + .get_entry("SERVER_THREE") + .is_err()); + }) + } + + #[test] + fn test_gen_server_directory_remove_entry() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + // We first instance a globally accessible GenServer directory + static GENSERVER_DIRECTORY: Lazy>> = + Lazy::new(|| Mutex::new(GenServerRegistry::new())); + + // We create the first server and add it to the directory + let gen_one_handle = AddressedGenServer::start(1); + assert!(GENSERVER_DIRECTORY + .lock() + .unwrap() + .add_entry("SERVER_ONE", gen_one_handle.clone()) + .is_ok()); + + // We retrieve the first server from the directory, calling it we should retrieve its state correctly + let mut one_address = GENSERVER_DIRECTORY + .lock() + .unwrap() + .get_entry("SERVER_ONE") + .unwrap(); + assert_eq!( + AddressedGenServerHandle::call( + &mut one_address, + AddressedGenServerCallMessage::GetState + ) + .await + .unwrap(), + 1 + ); + + // We remove the server from the directory + let _ = GENSERVER_DIRECTORY + .lock() + .unwrap() + .remove_entry("SERVER_ONE") + .unwrap(); + + // We can no longer retrieve the server from the directory + assert!(GENSERVER_DIRECTORY + .lock() + .unwrap() + .get_entry("SERVER_ONE") + .is_err()); + + // We can still call the removed server handle, and it should return its state + assert_eq!( + AddressedGenServerHandle::call( + &mut gen_one_handle.clone(), + AddressedGenServerCallMessage::GetState + ) + .await + .unwrap(), + 1 + ); + + // We can't remove a server that does not exist + assert!(GENSERVER_DIRECTORY + .lock() + .unwrap() + .remove_entry("SERVER_THREE") + .is_err()); + }); + } + + #[test] + fn test_gen_server_directory_modify_entry() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async { + // We first instance a globally accessible GenServer directory + static GENSERVER_DIRECTORY: Lazy>> = + Lazy::new(|| Mutex::new(GenServerRegistry::new())); + + // We create the server and add it to the directory + let gen_one_handle = AddressedGenServer::start(1); + assert!(GENSERVER_DIRECTORY + .lock() + .unwrap() + .add_entry("CHANGES", gen_one_handle.clone()) + .is_ok()); + + // We retrieve the server from the directory, calling it we should retrieve its state correctly + let mut retrieved_server = GENSERVER_DIRECTORY + .lock() + .unwrap() + .get_entry("CHANGES") + .unwrap(); + assert_eq!( + AddressedGenServerHandle::call( + &mut retrieved_server, + AddressedGenServerCallMessage::GetState + ) + .await + .unwrap(), + 1 + ); + + // We create a new server and change the entry in the directory + let gen_two_handle = AddressedGenServer::start(2); + assert!(GENSERVER_DIRECTORY + .lock() + .unwrap() + .change_entry("CHANGES", gen_two_handle.clone()) + .is_ok()); + + // We retrieve the second server from the directory, calling it we should retrieve its state correctly + let mut retrieved_server = GENSERVER_DIRECTORY + .lock() + .unwrap() + .get_entry("CHANGES") + .unwrap(); + + assert_eq!( + AddressedGenServerHandle::call( + &mut retrieved_server, + AddressedGenServerCallMessage::GetState + ) + .await + .unwrap(), + 2 + ); + }); + } + + #[test] + fn test_gen_server_directory_all_entries() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + // We first instance a globally accessible GenServer directory + static GENSERVER_DIRECTORY: Lazy>> = + Lazy::new(|| Mutex::new(GenServerRegistry::new())); + + // We create the first server and add it to the directory + let gen_one_handle = AddressedGenServer::start(1); + assert!(GENSERVER_DIRECTORY + .lock() + .unwrap() + .add_entry("SERVER_ONE", gen_one_handle.clone()) + .is_ok()); + + // We create a second server and add it to the directory + let gen_two_handle = AddressedGenServer::start(2); + assert!(GENSERVER_DIRECTORY + .lock() + .unwrap() + .add_entry("SERVER_TWO", gen_two_handle.clone()) + .is_ok()); + + // We retrieve all entries from the directory + let all_entries = GENSERVER_DIRECTORY.lock().unwrap().all_entries(); + assert_eq!(all_entries.len(), 2); + + let mut sum = 0; + for entry in all_entries { + let mut server_handle = entry; + sum += AddressedGenServerHandle::call( + &mut server_handle, + AddressedGenServerCallMessage::GetState, + ) + .await + .unwrap(); + } + + assert_eq!(sum, 3); + }); + } +} diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs index 201c7d2..9d13c59 100644 --- a/concurrency/src/tasks/mod.rs +++ b/concurrency/src/tasks/mod.rs @@ -2,6 +2,7 @@ //! Runtime tasks-based traits and structs to implement concurrent code à-la-Erlang. mod gen_server; +mod gen_server_registry; mod process; mod stream; mod time; @@ -12,6 +13,7 @@ mod stream_tests; mod timer_tests; pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; +pub use gen_server_registry::GenServerRegistry; pub use process::{send, Process, ProcessInfo}; pub use stream::spawn_listener; pub use time::{send_after, send_interval}; From e8ae0ad087b5cb87eedd9ca0b529b7361c463834 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Mon, 21 Jul 2025 13:08:12 -0300 Subject: [PATCH 2/9] improve doc --- concurrency/src/tasks/gen_server_registry.rs | 90 +++++++++++++++++++- 1 file changed, 89 insertions(+), 1 deletion(-) diff --git a/concurrency/src/tasks/gen_server_registry.rs b/concurrency/src/tasks/gen_server_registry.rs index 5ab9e5f..fdadaf5 100644 --- a/concurrency/src/tasks/gen_server_registry.rs +++ b/concurrency/src/tasks/gen_server_registry.rs @@ -10,18 +10,97 @@ pub enum GenServerRegistryError { ServerNotFound, } +/// This struct represents a registry for GenServers, allowing +/// you to register, unregister, and retrieve GenServer handles +/// by their address. +/// +/// Besides being useful for managing multiple GenServers, +/// it also adds the possibility of retrieving a GenServerHandle +/// globally by its address. +/// +/// # Example +/// +/// ```rust +/// use once_cell::sync::Lazy; +/// use spawned_concurrency::tasks::{GenServer, GenServerRegistry}; +/// use spawned_rt::tasks::{self as rt}; +/// use std::sync::Mutex; +/// +/// #[derive(Clone)] +/// enum BankInMessage { +/// GetBalance, +/// } +/// +/// type BankBalance = i32; +/// +/// #[derive(Default)] +/// struct Bank; +/// +/// impl GenServer for Bank { +/// type CallMsg = BankInMessage; +/// type CastMsg = (); +/// type OutMsg = BankBalance; +/// type Error = (); +/// type State = BankBalance; +/// +/// async fn handle_call( +/// &mut self, +/// message: Self::CallMsg, +/// _handle: &spawned_concurrency::tasks::GenServerHandle, +/// state: Self::State, +/// ) -> spawned_concurrency::tasks::CallResponse { +/// match message { +/// BankInMessage::GetBalance => { +/// let balance = state; +/// spawned_concurrency::tasks::CallResponse::Reply(state, balance) +/// } +/// } +/// } +/// } +/// +/// static GENSERVER_DIRECTORY: Lazy>> = +/// Lazy::new(|| Mutex::new(GenServerRegistry::new())); +/// +/// fn main() { +/// let runtime = rt::Runtime::new().unwrap(); +/// runtime.block_on(async move { +/// let some_bank = Bank::start(1000); +/// +/// GENSERVER_DIRECTORY +/// .lock() +/// .unwrap() +/// .add_entry("some_bank", some_bank.clone()) +/// .unwrap(); +/// +/// somewhere_else().await; +/// }); +/// } +/// +/// async fn somewhere_else() { +/// let mut bank_handle = GENSERVER_DIRECTORY +/// .lock() +/// .unwrap() +/// .get_entry("some_bank") +/// .unwrap(); +/// let balance = bank_handle.call(BankInMessage::GetBalance).await.unwrap(); +/// println!("Balance: {}", balance) +/// } +/// ``` #[derive(Default)] pub struct GenServerRegistry { agenda: HashMap>, } impl GenServerRegistry { + /// Creates a new empty GenServer registry. pub fn new() -> Self { Self { agenda: HashMap::new(), } } + /// Adds a new entry to the registry. + /// Fails if the address is already taken. pub fn add_entry( &mut self, address: &str, @@ -35,6 +114,8 @@ impl GenServerRegistry { Ok(()) } + /// Removes an entry from the registry. + /// Fails if the address does not exist. pub fn remove_entry( &mut self, address: &str, @@ -44,6 +125,8 @@ impl GenServerRegistry { .ok_or(GenServerRegistryError::ServerNotFound) } + /// Retrieves an entry from the registry. + /// Fails if the address does not exist. pub fn get_entry(&self, address: &str) -> Result, GenServerRegistryError> { self.agenda .get(address) @@ -51,16 +134,21 @@ impl GenServerRegistry { .ok_or(GenServerRegistryError::ServerNotFound) } + /// Modifies an existing entry in the registry. + /// If the address does not exist, it behaves like `add_entry`. pub fn change_entry( &mut self, address: &str, server_handle: GenServerHandle, ) -> Result<(), GenServerRegistryError> { - // This function works like `add_entry`, without checking if the address already exists. self.agenda.insert(address.to_string(), server_handle); Ok(()) } + /// Returns all entries in the registry as a vector. + /// + /// This is useful for cases where you need to call all + /// registered GenServers. pub fn all_entries(&self) -> Vec> { self.agenda.values().cloned().collect() } From d9d7c43c4c2c76e972efeb604526c6bf3bf1450b Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Mon, 21 Jul 2025 13:24:36 -0300 Subject: [PATCH 3/9] fix toml --- concurrency/Cargo.toml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/concurrency/Cargo.toml b/concurrency/Cargo.toml index 27aed3d..34d3b59 100644 --- a/concurrency/Cargo.toml +++ b/concurrency/Cargo.toml @@ -10,12 +10,13 @@ spawned-rt = { workspace = true } tracing = { workspace = true } futures = "0.3.1" thiserror = "2.0.12" -once_cell = "1.21.3" [dev-dependencies] # This tokio imports are only used in tests, we should not use them in the library code. tokio-stream = { version = "0.1.17" } tokio = { version = "1", features = ["full"] } +# Used for the `GenServerRegistry` testing. +once_cell = "1.21.3" [lib] path = "./src/lib.rs" From 073c73413a1d1e94bafbcf1720c49db752de6964 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Wed, 23 Jul 2025 18:21:10 -0300 Subject: [PATCH 4/9] fix tests after interface change --- concurrency/src/tasks/gen_server_registry.rs | 59 ++++++++++++-------- 1 file changed, 35 insertions(+), 24 deletions(-) diff --git a/concurrency/src/tasks/gen_server_registry.rs b/concurrency/src/tasks/gen_server_registry.rs index fdadaf5..0cd88ef 100644 --- a/concurrency/src/tasks/gen_server_registry.rs +++ b/concurrency/src/tasks/gen_server_registry.rs @@ -31,28 +31,33 @@ pub enum GenServerRegistryError { /// GetBalance, /// } /// -/// type BankBalance = i32; /// -/// #[derive(Default)] -/// struct Bank; +/// #[derive(Clone)] +/// struct Bank { +/// balance: i32, +/// } +/// +/// impl Bank { +/// pub fn new(initial_balance: i32) -> Self { +/// Bank { balance: initial_balance } +/// } +/// } /// /// impl GenServer for Bank { /// type CallMsg = BankInMessage; /// type CastMsg = (); -/// type OutMsg = BankBalance; +/// type OutMsg = i32; /// type Error = (); -/// type State = BankBalance; /// /// async fn handle_call( -/// &mut self, +/// self, /// message: Self::CallMsg, /// _handle: &spawned_concurrency::tasks::GenServerHandle, -/// state: Self::State, /// ) -> spawned_concurrency::tasks::CallResponse { /// match message { /// BankInMessage::GetBalance => { -/// let balance = state; -/// spawned_concurrency::tasks::CallResponse::Reply(state, balance) +/// let balance = self.balance; +/// spawned_concurrency::tasks::CallResponse::Reply(self, balance) /// } /// } /// } @@ -64,7 +69,7 @@ pub enum GenServerRegistryError { /// fn main() { /// let runtime = rt::Runtime::new().unwrap(); /// runtime.block_on(async move { -/// let some_bank = Bank::start(1000); +/// let some_bank = Bank::new(1000).start(); /// /// GENSERVER_DIRECTORY /// .lock() @@ -163,8 +168,16 @@ mod tests { type AddressedGenServerHandle = GenServerHandle; - #[derive(Default)] - struct AddressedGenServer; + #[derive(Clone)] + struct AddressedGenServer { + value: u8, + } + + impl AddressedGenServer { + pub fn new(value: u8) -> Self { + AddressedGenServer { value } + } + } #[derive(Clone)] enum AddressedGenServerCallMessage { @@ -175,19 +188,17 @@ mod tests { type CallMsg = AddressedGenServerCallMessage; type CastMsg = (); type OutMsg = u8; - type State = u8; type Error = (); async fn handle_call( - &mut self, + self, message: Self::CallMsg, _handle: &GenServerHandle, - state: Self::State, ) -> crate::tasks::CallResponse { match message { AddressedGenServerCallMessage::GetState => { - let out_msg = state; - crate::tasks::CallResponse::Reply(state, out_msg) + let out_msg = self.value; + crate::tasks::CallResponse::Reply(self, out_msg) } } } @@ -202,7 +213,7 @@ mod tests { Lazy::new(|| Mutex::new(GenServerRegistry::new())); // We create the first server and add it to the directory - let gen_one_handle = AddressedGenServer::start(1); + let gen_one_handle = AddressedGenServer::new(1).start(); assert!(GENSERVER_DIRECTORY .lock() .unwrap() @@ -210,7 +221,7 @@ mod tests { .is_ok()); // We create a second server and add it to the directory - let gen_two_handle = AddressedGenServer::start(2); + let gen_two_handle = AddressedGenServer::new(2).start(); assert!(GENSERVER_DIRECTORY .lock() .unwrap() @@ -267,7 +278,7 @@ mod tests { Lazy::new(|| Mutex::new(GenServerRegistry::new())); // We create the first server and add it to the directory - let gen_one_handle = AddressedGenServer::start(1); + let gen_one_handle = AddressedGenServer::new(1).start(); assert!(GENSERVER_DIRECTORY .lock() .unwrap() @@ -333,7 +344,7 @@ mod tests { Lazy::new(|| Mutex::new(GenServerRegistry::new())); // We create the server and add it to the directory - let gen_one_handle = AddressedGenServer::start(1); + let gen_one_handle = AddressedGenServer::new(1).start(); assert!(GENSERVER_DIRECTORY .lock() .unwrap() @@ -357,7 +368,7 @@ mod tests { ); // We create a new server and change the entry in the directory - let gen_two_handle = AddressedGenServer::start(2); + let gen_two_handle = AddressedGenServer::new(2).start(); assert!(GENSERVER_DIRECTORY .lock() .unwrap() @@ -392,7 +403,7 @@ mod tests { Lazy::new(|| Mutex::new(GenServerRegistry::new())); // We create the first server and add it to the directory - let gen_one_handle = AddressedGenServer::start(1); + let gen_one_handle = AddressedGenServer::new(1).start(); assert!(GENSERVER_DIRECTORY .lock() .unwrap() @@ -400,7 +411,7 @@ mod tests { .is_ok()); // We create a second server and add it to the directory - let gen_two_handle = AddressedGenServer::start(2); + let gen_two_handle = AddressedGenServer::new(2).start(); assert!(GENSERVER_DIRECTORY .lock() .unwrap() From 60a27563ec1ee14755b06267bda6eb124ac46f33 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 24 Jul 2025 10:08:17 -0300 Subject: [PATCH 5/9] refactor registry --- Cargo.lock | 39 +- concurrency/Cargo.toml | 5 +- concurrency/src/tasks/gen_server_registry.rs | 499 +++++-------------- concurrency/src/tasks/mod.rs | 2 +- 4 files changed, 158 insertions(+), 387 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f83d22c..4d351f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -152,7 +152,7 @@ dependencies = [ "crossbeam-deque", "crossbeam-epoch", "crossbeam-queue", - "crossbeam-utils", + "crossbeam-utils 0.7.2", ] [[package]] @@ -161,7 +161,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" dependencies = [ - "crossbeam-utils", + "crossbeam-utils 0.7.2", "maybe-uninit", ] @@ -172,7 +172,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c20ff29ded3204c5106278a81a38f4b482636ed4fa1e6cfbeef193291beb29ed" dependencies = [ "crossbeam-epoch", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "maybe-uninit", ] @@ -184,7 +184,7 @@ checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" dependencies = [ "autocfg", "cfg-if 0.1.10", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "lazy_static", "maybe-uninit", "memoffset", @@ -198,7 +198,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" dependencies = [ "cfg-if 0.1.10", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "maybe-uninit", ] @@ -213,6 +213,26 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils 0.8.21", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -411,6 +431,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.3" @@ -602,7 +628,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.3", ] [[package]] @@ -1199,6 +1225,7 @@ dependencies = [ name = "spawned-concurrency" version = "0.2.1" dependencies = [ + "dashmap", "futures", "once_cell", "spawned-rt", diff --git a/concurrency/Cargo.toml b/concurrency/Cargo.toml index 34d3b59..1fc8fd4 100644 --- a/concurrency/Cargo.toml +++ b/concurrency/Cargo.toml @@ -10,13 +10,14 @@ spawned-rt = { workspace = true } tracing = { workspace = true } futures = "0.3.1" thiserror = "2.0.12" +dashmap = "6.1.0" +# Used for the `GenServerRegistry` testing. +once_cell = "1.21.3" [dev-dependencies] # This tokio imports are only used in tests, we should not use them in the library code. tokio-stream = { version = "0.1.17" } tokio = { version = "1", features = ["full"] } -# Used for the `GenServerRegistry` testing. -once_cell = "1.21.3" [lib] path = "./src/lib.rs" diff --git a/concurrency/src/tasks/gen_server_registry.rs b/concurrency/src/tasks/gen_server_registry.rs index 0cd88ef..fc69603 100644 --- a/concurrency/src/tasks/gen_server_registry.rs +++ b/concurrency/src/tasks/gen_server_registry.rs @@ -1,4 +1,6 @@ -use std::collections::HashMap; +use dashmap::DashMap; +use once_cell::sync::OnceCell; +use std::sync::Arc; use crate::tasks::{GenServer, GenServerHandle}; @@ -10,184 +12,83 @@ pub enum GenServerRegistryError { ServerNotFound, } -/// This struct represents a registry for GenServers, allowing -/// you to register, unregister, and retrieve GenServer handles -/// by their address. -/// -/// Besides being useful for managing multiple GenServers, -/// it also adds the possibility of retrieving a GenServerHandle -/// globally by its address. -/// -/// # Example -/// -/// ```rust -/// use once_cell::sync::Lazy; -/// use spawned_concurrency::tasks::{GenServer, GenServerRegistry}; -/// use spawned_rt::tasks::{self as rt}; -/// use std::sync::Mutex; -/// -/// #[derive(Clone)] -/// enum BankInMessage { -/// GetBalance, -/// } -/// -/// -/// #[derive(Clone)] -/// struct Bank { -/// balance: i32, -/// } -/// -/// impl Bank { -/// pub fn new(initial_balance: i32) -> Self { -/// Bank { balance: initial_balance } -/// } -/// } -/// -/// impl GenServer for Bank { -/// type CallMsg = BankInMessage; -/// type CastMsg = (); -/// type OutMsg = i32; -/// type Error = (); -/// -/// async fn handle_call( -/// self, -/// message: Self::CallMsg, -/// _handle: &spawned_concurrency::tasks::GenServerHandle, -/// ) -> spawned_concurrency::tasks::CallResponse { -/// match message { -/// BankInMessage::GetBalance => { -/// let balance = self.balance; -/// spawned_concurrency::tasks::CallResponse::Reply(self, balance) -/// } -/// } -/// } -/// } -/// -/// static GENSERVER_DIRECTORY: Lazy>> = -/// Lazy::new(|| Mutex::new(GenServerRegistry::new())); -/// -/// fn main() { -/// let runtime = rt::Runtime::new().unwrap(); -/// runtime.block_on(async move { -/// let some_bank = Bank::new(1000).start(); -/// -/// GENSERVER_DIRECTORY -/// .lock() -/// .unwrap() -/// .add_entry("some_bank", some_bank.clone()) -/// .unwrap(); -/// -/// somewhere_else().await; -/// }); -/// } -/// -/// async fn somewhere_else() { -/// let mut bank_handle = GENSERVER_DIRECTORY -/// .lock() -/// .unwrap() -/// .get_entry("some_bank") -/// .unwrap(); -/// let balance = bank_handle.call(BankInMessage::GetBalance).await.unwrap(); -/// println!("Balance: {}", balance) -/// } -/// ``` -#[derive(Default)] -pub struct GenServerRegistry { - agenda: HashMap>, +// Wrapper trait to allow downcasting of `GenServerHandle. +// Needed as otherwise `GenServerHandle` is sized does +// not `dyn` compatible. +trait AnyGenServerHandle: Send + Sync { + fn as_any(&self) -> &dyn std::any::Any; } -impl GenServerRegistry { - /// Creates a new empty GenServer registry. - pub fn new() -> Self { - Self { - agenda: HashMap::new(), - } - } - - /// Adds a new entry to the registry. - /// Fails if the address is already taken. - pub fn add_entry( - &mut self, - address: &str, - server_handle: GenServerHandle, - ) -> Result<(), GenServerRegistryError> { - if self.agenda.contains_key(address) { - return Err(GenServerRegistryError::AddressAlreadyTaken); - } - - self.agenda.insert(address.to_string(), server_handle); - Ok(()) - } - - /// Removes an entry from the registry. - /// Fails if the address does not exist. - pub fn remove_entry( - &mut self, - address: &str, - ) -> Result, GenServerRegistryError> { - self.agenda - .remove(address) - .ok_or(GenServerRegistryError::ServerNotFound) - } - - /// Retrieves an entry from the registry. - /// Fails if the address does not exist. - pub fn get_entry(&self, address: &str) -> Result, GenServerRegistryError> { - self.agenda - .get(address) - .cloned() - .ok_or(GenServerRegistryError::ServerNotFound) +impl AnyGenServerHandle for GenServerHandle { + fn as_any(&self) -> &dyn std::any::Any { + self } +} - /// Modifies an existing entry in the registry. - /// If the address does not exist, it behaves like `add_entry`. - pub fn change_entry( - &mut self, - address: &str, - server_handle: GenServerHandle, - ) -> Result<(), GenServerRegistryError> { - self.agenda.insert(address.to_string(), server_handle); - Ok(()) +static GENSERVER_REGISTRY: OnceCell>>> = + OnceCell::new(); + +pub fn add_registry_entry( + address: &str, + server_handle: GenServerHandle, +) { + let registry = GENSERVER_REGISTRY.get_or_init(|| Arc::new(DashMap::new())); + if registry.contains_key(address) { + panic!( + "A GenServer is already registered at this address: {}", + address + ); } + registry.insert(address.to_string(), Box::new(server_handle)); +} - /// Returns all entries in the registry as a vector. - /// - /// This is useful for cases where you need to call all - /// registered GenServers. - pub fn all_entries(&self) -> Vec> { - self.agenda.values().cloned().collect() - } +pub fn get_registry_entry( + address: &str, +) -> Result, GenServerRegistryError> { + let registry = GENSERVER_REGISTRY.get_or_init(|| Arc::new(DashMap::new())); + registry + .get(address) + .ok_or(GenServerRegistryError::ServerNotFound)? + .as_any() + .downcast_ref::>() + .cloned() + .ok_or(GenServerRegistryError::ServerNotFound) } #[cfg(test)] mod tests { use super::*; - use once_cell::sync::Lazy; use spawned_rt::tasks::{self as rt}; - use std::sync::Mutex; - - type AddressedGenServerHandle = GenServerHandle; #[derive(Clone)] - struct AddressedGenServer { - value: u8, + enum InMsg { + GetCount, } - impl AddressedGenServer { - pub fn new(value: u8) -> Self { - AddressedGenServer { value } - } + #[derive(Clone)] + enum OutMsg { + Count(u32), } #[derive(Clone)] - enum AddressedGenServerCallMessage { - GetState, + struct FruitBasket { + _name: String, + count: u32, + } + + impl FruitBasket { + pub fn new(name: &str, initial_count: u32) -> Self { + Self { + _name: name.to_string(), + count: initial_count, + } + } } - impl GenServer for AddressedGenServer { - type CallMsg = AddressedGenServerCallMessage; + impl GenServer for FruitBasket { + type CallMsg = InMsg; type CastMsg = (); - type OutMsg = u8; + type OutMsg = OutMsg; type Error = (); async fn handle_call( @@ -196,244 +97,86 @@ mod tests { _handle: &GenServerHandle, ) -> crate::tasks::CallResponse { match message { - AddressedGenServerCallMessage::GetState => { - let out_msg = self.value; - crate::tasks::CallResponse::Reply(self, out_msg) + InMsg::GetCount => { + let count = self.count; + crate::tasks::CallResponse::Reply(self, OutMsg::Count(count)) } } } } - #[test] - fn test_gen_server_directoy_add_entries() { - let runtime = rt::Runtime::new().unwrap(); - runtime.block_on(async move { - // We first instance a globally accessible GenServer directory - static GENSERVER_DIRECTORY: Lazy>> = - Lazy::new(|| Mutex::new(GenServerRegistry::new())); - - // We create the first server and add it to the directory - let gen_one_handle = AddressedGenServer::new(1).start(); - assert!(GENSERVER_DIRECTORY - .lock() - .unwrap() - .add_entry("SERVER_ONE", gen_one_handle.clone()) - .is_ok()); - - // We create a second server and add it to the directory - let gen_two_handle = AddressedGenServer::new(2).start(); - assert!(GENSERVER_DIRECTORY - .lock() - .unwrap() - .add_entry("SERVER_TWO", gen_two_handle.clone()) - .is_ok()); - - // We retrieve the first server from the directory, calling it we should retrieve its state correctly - let mut one_address = GENSERVER_DIRECTORY - .lock() - .unwrap() - .get_entry("SERVER_ONE") - .unwrap(); - assert_eq!( - AddressedGenServerHandle::call( - &mut one_address, - AddressedGenServerCallMessage::GetState - ) - .await - .unwrap(), - 1 - ); - - // Same goes for the second server - let mut two_address = GENSERVER_DIRECTORY - .lock() - .unwrap() - .get_entry("SERVER_TWO") - .unwrap(); - assert_eq!( - AddressedGenServerHandle::call( - &mut two_address, - AddressedGenServerCallMessage::GetState - ) - .await - .unwrap(), - 2 - ); - - // We can't retrieve a server that does not exist - assert!(GENSERVER_DIRECTORY - .lock() - .unwrap() - .get_entry("SERVER_THREE") - .is_err()); - }) + #[derive(Clone)] + struct VegetableBasket { + _name: String, + count: u32, } - #[test] - fn test_gen_server_directory_remove_entry() { - let runtime = rt::Runtime::new().unwrap(); - runtime.block_on(async move { - // We first instance a globally accessible GenServer directory - static GENSERVER_DIRECTORY: Lazy>> = - Lazy::new(|| Mutex::new(GenServerRegistry::new())); - - // We create the first server and add it to the directory - let gen_one_handle = AddressedGenServer::new(1).start(); - assert!(GENSERVER_DIRECTORY - .lock() - .unwrap() - .add_entry("SERVER_ONE", gen_one_handle.clone()) - .is_ok()); - - // We retrieve the first server from the directory, calling it we should retrieve its state correctly - let mut one_address = GENSERVER_DIRECTORY - .lock() - .unwrap() - .get_entry("SERVER_ONE") - .unwrap(); - assert_eq!( - AddressedGenServerHandle::call( - &mut one_address, - AddressedGenServerCallMessage::GetState - ) - .await - .unwrap(), - 1 - ); - - // We remove the server from the directory - let _ = GENSERVER_DIRECTORY - .lock() - .unwrap() - .remove_entry("SERVER_ONE") - .unwrap(); - - // We can no longer retrieve the server from the directory - assert!(GENSERVER_DIRECTORY - .lock() - .unwrap() - .get_entry("SERVER_ONE") - .is_err()); - - // We can still call the removed server handle, and it should return its state - assert_eq!( - AddressedGenServerHandle::call( - &mut gen_one_handle.clone(), - AddressedGenServerCallMessage::GetState - ) - .await - .unwrap(), - 1 - ); - - // We can't remove a server that does not exist - assert!(GENSERVER_DIRECTORY - .lock() - .unwrap() - .remove_entry("SERVER_THREE") - .is_err()); - }); + impl VegetableBasket { + pub fn new(name: &str, initial_count: u32) -> Self { + Self { + _name: name.to_string(), + count: initial_count, + } + } } - #[test] - fn test_gen_server_directory_modify_entry() { - let runtime = rt::Runtime::new().unwrap(); - runtime.block_on(async { - // We first instance a globally accessible GenServer directory - static GENSERVER_DIRECTORY: Lazy>> = - Lazy::new(|| Mutex::new(GenServerRegistry::new())); - - // We create the server and add it to the directory - let gen_one_handle = AddressedGenServer::new(1).start(); - assert!(GENSERVER_DIRECTORY - .lock() - .unwrap() - .add_entry("CHANGES", gen_one_handle.clone()) - .is_ok()); - - // We retrieve the server from the directory, calling it we should retrieve its state correctly - let mut retrieved_server = GENSERVER_DIRECTORY - .lock() - .unwrap() - .get_entry("CHANGES") - .unwrap(); - assert_eq!( - AddressedGenServerHandle::call( - &mut retrieved_server, - AddressedGenServerCallMessage::GetState - ) - .await - .unwrap(), - 1 - ); - - // We create a new server and change the entry in the directory - let gen_two_handle = AddressedGenServer::new(2).start(); - assert!(GENSERVER_DIRECTORY - .lock() - .unwrap() - .change_entry("CHANGES", gen_two_handle.clone()) - .is_ok()); - - // We retrieve the second server from the directory, calling it we should retrieve its state correctly - let mut retrieved_server = GENSERVER_DIRECTORY - .lock() - .unwrap() - .get_entry("CHANGES") - .unwrap(); + impl GenServer for VegetableBasket { + type CallMsg = InMsg; + type CastMsg = (); + type OutMsg = OutMsg; + type Error = (); - assert_eq!( - AddressedGenServerHandle::call( - &mut retrieved_server, - AddressedGenServerCallMessage::GetState - ) - .await - .unwrap(), - 2 - ); - }); + async fn handle_call( + self, + message: Self::CallMsg, + _handle: &GenServerHandle, + ) -> crate::tasks::CallResponse { + match message { + InMsg::GetCount => { + let count = self.count; + crate::tasks::CallResponse::Reply(self, OutMsg::Count(count)) + } + } + } } #[test] - fn test_gen_server_directory_all_entries() { + fn test_gen_server_registry() { let runtime = rt::Runtime::new().unwrap(); runtime.block_on(async move { - // We first instance a globally accessible GenServer directory - static GENSERVER_DIRECTORY: Lazy>> = - Lazy::new(|| Mutex::new(GenServerRegistry::new())); - - // We create the first server and add it to the directory - let gen_one_handle = AddressedGenServer::new(1).start(); - assert!(GENSERVER_DIRECTORY - .lock() - .unwrap() - .add_entry("SERVER_ONE", gen_one_handle.clone()) - .is_ok()); - - // We create a second server and add it to the directory - let gen_two_handle = AddressedGenServer::new(2).start(); - assert!(GENSERVER_DIRECTORY - .lock() - .unwrap() - .add_entry("SERVER_TWO", gen_two_handle.clone()) - .is_ok()); - - // We retrieve all entries from the directory - let all_entries = GENSERVER_DIRECTORY.lock().unwrap().all_entries(); - assert_eq!(all_entries.len(), 2); - - let mut sum = 0; - for entry in all_entries { - let mut server_handle = entry; - sum += AddressedGenServerHandle::call( - &mut server_handle, - AddressedGenServerCallMessage::GetState, - ) - .await - .unwrap(); + let banana_fruit_basket = FruitBasket::new("Banana", 10).start(); + let lettuce_vegetable_basket = VegetableBasket::new("Lettuce", 20).start(); + + // We can store different GenServer types in the registry + add_registry_entry("banana_fruit_basket", banana_fruit_basket.clone()); + add_registry_entry("lettuce_vegetable_basket", lettuce_vegetable_basket.clone()); + + // Retrieve the FruitBasket GenServer + let mut retrieved_fruit_basket: GenServerHandle = + get_registry_entry("banana_fruit_basket").unwrap(); + let call_result = retrieved_fruit_basket + .call(InMsg::GetCount) + .await; + assert!(call_result.is_ok()); + if let Ok(OutMsg::Count(count)) = call_result { + assert_eq!(count, 10); + } else { + panic!("Expected OutMsg::Count"); } - assert_eq!(sum, 3); + // Retrieve the VegetableBasket GenServer + let mut retrieved_vegetable_basket: GenServerHandle = + get_registry_entry("lettuce_vegetable_basket").unwrap(); + + let call_result = retrieved_vegetable_basket + .call(InMsg::GetCount) + .await; + assert!(call_result.is_ok()); + if let Ok(OutMsg::Count(count)) = call_result { + assert_eq!(count, 20); + } else { + panic!("Expected OutMsg::Count"); + } }); } } diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs index 9d13c59..a4d15d5 100644 --- a/concurrency/src/tasks/mod.rs +++ b/concurrency/src/tasks/mod.rs @@ -13,7 +13,7 @@ mod stream_tests; mod timer_tests; pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; -pub use gen_server_registry::GenServerRegistry; +pub use gen_server_registry::{add_registry_entry, get_registry_entry}; pub use process::{send, Process, ProcessInfo}; pub use stream::spawn_listener; pub use time::{send_after, send_interval}; From 46c53b55f1c3f7dca355c7140755a36a978bac1d Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 24 Jul 2025 10:44:46 -0300 Subject: [PATCH 6/9] wrap functionallity in struct --- concurrency/src/tasks/gen_server_registry.rs | 81 +++++++++++--------- concurrency/src/tasks/mod.rs | 2 +- 2 files changed, 46 insertions(+), 37 deletions(-) diff --git a/concurrency/src/tasks/gen_server_registry.rs b/concurrency/src/tasks/gen_server_registry.rs index fc69603..31c239e 100644 --- a/concurrency/src/tasks/gen_server_registry.rs +++ b/concurrency/src/tasks/gen_server_registry.rs @@ -1,3 +1,6 @@ +// Inspired by ractor pid_registry: +// https://github.com/slawlor/ractor/blob/main/ractor/src/registry/pid_registry.rs + use dashmap::DashMap; use once_cell::sync::OnceCell; use std::sync::Arc; @@ -12,9 +15,9 @@ pub enum GenServerRegistryError { ServerNotFound, } -// Wrapper trait to allow downcasting of `GenServerHandle. -// Needed as otherwise `GenServerHandle` is sized does -// not `dyn` compatible. +// Wrapper trait to allow downcasting of `GenServerHandle`. +// Needed as otherwise `GenServerHandle` is sized and +// thus not `dyn` compatible. trait AnyGenServerHandle: Send + Sync { fn as_any(&self) -> &dyn std::any::Any; } @@ -28,31 +31,33 @@ impl AnyGenServerHandle for GenServerHandle { static GENSERVER_REGISTRY: OnceCell>>> = OnceCell::new(); -pub fn add_registry_entry( - address: &str, - server_handle: GenServerHandle, -) { - let registry = GENSERVER_REGISTRY.get_or_init(|| Arc::new(DashMap::new())); - if registry.contains_key(address) { - panic!( - "A GenServer is already registered at this address: {}", - address - ); +pub struct GenServerRegistry; + +impl GenServerRegistry { + pub fn add_entry( + address: &str, + server_handle: GenServerHandle, + ) -> Result<(), GenServerRegistryError> { + let registry = GENSERVER_REGISTRY.get_or_init(|| Arc::new(DashMap::new())); + if registry.contains_key(address) { + return Err(GenServerRegistryError::AddressAlreadyTaken); + } + registry.insert(address.to_string(), Box::new(server_handle)); + Ok(()) } - registry.insert(address.to_string(), Box::new(server_handle)); -} -pub fn get_registry_entry( - address: &str, -) -> Result, GenServerRegistryError> { - let registry = GENSERVER_REGISTRY.get_or_init(|| Arc::new(DashMap::new())); - registry - .get(address) - .ok_or(GenServerRegistryError::ServerNotFound)? - .as_any() - .downcast_ref::>() - .cloned() - .ok_or(GenServerRegistryError::ServerNotFound) + pub fn get_entry( + address: &str, + ) -> Result, GenServerRegistryError> { + let registry = GENSERVER_REGISTRY.get_or_init(|| Arc::new(DashMap::new())); + registry + .get(address) + .ok_or(GenServerRegistryError::ServerNotFound)? + .as_any() + .downcast_ref::>() + .cloned() + .ok_or(GenServerRegistryError::ServerNotFound) + } } #[cfg(test)] @@ -148,15 +153,21 @@ mod tests { let lettuce_vegetable_basket = VegetableBasket::new("Lettuce", 20).start(); // We can store different GenServer types in the registry - add_registry_entry("banana_fruit_basket", banana_fruit_basket.clone()); - add_registry_entry("lettuce_vegetable_basket", lettuce_vegetable_basket.clone()); + assert!(GenServerRegistry::add_entry( + "banana_fruit_basket", + banana_fruit_basket.clone() + ) + .is_ok()); + assert!(GenServerRegistry::add_entry( + "lettuce_vegetable_basket", + lettuce_vegetable_basket.clone(), + ) + .is_ok()); // Retrieve the FruitBasket GenServer let mut retrieved_fruit_basket: GenServerHandle = - get_registry_entry("banana_fruit_basket").unwrap(); - let call_result = retrieved_fruit_basket - .call(InMsg::GetCount) - .await; + GenServerRegistry::get_entry("banana_fruit_basket").unwrap(); + let call_result = retrieved_fruit_basket.call(InMsg::GetCount).await; assert!(call_result.is_ok()); if let Ok(OutMsg::Count(count)) = call_result { assert_eq!(count, 10); @@ -166,11 +177,9 @@ mod tests { // Retrieve the VegetableBasket GenServer let mut retrieved_vegetable_basket: GenServerHandle = - get_registry_entry("lettuce_vegetable_basket").unwrap(); + GenServerRegistry::get_entry("lettuce_vegetable_basket").unwrap(); - let call_result = retrieved_vegetable_basket - .call(InMsg::GetCount) - .await; + let call_result = retrieved_vegetable_basket.call(InMsg::GetCount).await; assert!(call_result.is_ok()); if let Ok(OutMsg::Count(count)) = call_result { assert_eq!(count, 20); diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs index a4d15d5..9d13c59 100644 --- a/concurrency/src/tasks/mod.rs +++ b/concurrency/src/tasks/mod.rs @@ -13,7 +13,7 @@ mod stream_tests; mod timer_tests; pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; -pub use gen_server_registry::{add_registry_entry, get_registry_entry}; +pub use gen_server_registry::GenServerRegistry; pub use process::{send, Process, ProcessInfo}; pub use stream::spawn_listener; pub use time::{send_after, send_interval}; From ee3decb9e584f765fa4ecad408e62ef275b545b7 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 24 Jul 2025 12:44:39 -0300 Subject: [PATCH 7/9] add `all_entries` & tests --- concurrency/src/tasks/gen_server_registry.rs | 105 +++++++++++++++++++ 1 file changed, 105 insertions(+) diff --git a/concurrency/src/tasks/gen_server_registry.rs b/concurrency/src/tasks/gen_server_registry.rs index 31c239e..777b883 100644 --- a/concurrency/src/tasks/gen_server_registry.rs +++ b/concurrency/src/tasks/gen_server_registry.rs @@ -28,12 +28,27 @@ impl AnyGenServerHandle for GenServerHandle { } } +// Global registry for GenServer handles. +// This allows us to store and retrieve `GenServerHandle` instances +// by their address, similar to how PIDs work in Erlang/Elixir. +// +// We do not expose this directly, but rather through the `GenServerRegistry` struct. static GENSERVER_REGISTRY: OnceCell>>> = OnceCell::new(); +/// A registry for `GenServer` instances, allowing them to be stored and retrieved by address. +/// This is similar to the PID registry in Erlang/Elixir, where processes can be registered and looked up by their identifiers. pub struct GenServerRegistry; impl GenServerRegistry { + /// Adds a `GenServerHandle` to the registry under the specified address. + /// Returns an error if the address is already taken. + /// + /// Example usage: + /// ```rs + /// let banana_fruit_basket = FruitBasket::new("Banana", 10).start(); // FruitBasket implements GenServer + /// GenServerRegistry::add_entry("banana_fruit_basket", banana_fruit_basket).unwrap(); + /// ``` pub fn add_entry( address: &str, server_handle: GenServerHandle, @@ -46,6 +61,14 @@ impl GenServerRegistry { Ok(()) } + /// Retrieves a `GenServerHandle` from the registry by its address. + /// Returns an error if the address is not found or if the handle cannot be downcast + /// + /// Calling this function requires that the type of `GenServer` retrieved is known at compile time: + /// ```rs + /// let mut retrieved_vegetable_basket: GenServerHandle = + /// GenServerRegistry::get_entry("lettuce_vegetable_basket").unwrap(); + /// ``` pub fn get_entry( address: &str, ) -> Result, GenServerRegistryError> { @@ -58,6 +81,41 @@ impl GenServerRegistry { .cloned() .ok_or(GenServerRegistryError::ServerNotFound) } + + /// Retrieves all entries of a specific `GenServer` type from the registry. + /// + /// Example usage: + /// ```rs + /// let fruit_entries: Vec> = + /// GenServerRegistry::all_entries::(); + /// ``` + pub fn all_entries() -> Vec> { + let registry = GENSERVER_REGISTRY.get_or_init(|| Arc::new(DashMap::new())); + registry + .iter() + .filter_map(|entry| { + entry + .value() + .as_any() + .downcast_ref::>() + .cloned() + }) + .collect() + } + + /// Updates an existing entry in the registry with a new `GenServerHandle`. + /// Returns an error if the address is not found. + pub fn update_entry( + address: &str, + server_handle: GenServerHandle, + ) -> Result<(), GenServerRegistryError> { + let registry = GENSERVER_REGISTRY.get_or_init(|| Arc::new(DashMap::new())); + if !registry.contains_key(address) { + return Err(GenServerRegistryError::ServerNotFound); + } + registry.insert(address.to_string(), Box::new(server_handle)); + Ok(()) + } } #[cfg(test)] @@ -145,6 +203,9 @@ mod tests { } } + // This test checks the functionality of the GenServerRegistry, + // It's done all in a signle test function to prevent the global registry + // state from not being reset between tests. #[test] fn test_gen_server_registry() { let runtime = rt::Runtime::new().unwrap(); @@ -186,6 +247,50 @@ mod tests { } else { panic!("Expected OutMsg::Count"); } + + // Next we check that we can retrieve all entries + let mut amount_collected = 0; + let fruit_entries: Vec> = + GenServerRegistry::all_entries::(); + for mut entry in fruit_entries { + let out_msg = entry.call(InMsg::GetCount).await.unwrap(); + let OutMsg::Count(count) = out_msg; + amount_collected += count; + } + + let vegetable_entries: Vec> = + GenServerRegistry::all_entries::(); + for mut entry in vegetable_entries { + let out_msg = entry.call(InMsg::GetCount).await.unwrap(); + let OutMsg::Count(count) = out_msg; + amount_collected += count; + } + + assert_eq!(amount_collected, 10 + 20); + + // Now we update the entry for the fruit basket + let new_banana_fruit_basket = FruitBasket::new("Banana", 15).start(); + assert!(GenServerRegistry::update_entry( + "banana_fruit_basket", + new_banana_fruit_basket + ) + .is_ok()); + + let mut updated_fruit_basket: GenServerHandle = + GenServerRegistry::get_entry("banana_fruit_basket").unwrap(); + let updated_call_result = updated_fruit_basket.call(InMsg::GetCount).await.unwrap(); + let OutMsg::Count(count) = updated_call_result; + assert_eq!(count, 15); + + // Finally, we check that we can't retrieve a non-existent entry + assert!(GenServerRegistry::get_entry::("orange_fruit_basket").is_err()); + + // And neither update a non-existent entry + let orange_fruit_basket = FruitBasket::new("Orange", 5).start(); + assert!( + GenServerRegistry::update_entry("orange_fruit_basket", orange_fruit_basket) + .is_err() + ); }); } } From abf149a7f8839df9371116ed158d68ac7112b8ea Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 24 Jul 2025 12:47:16 -0300 Subject: [PATCH 8/9] improve test coverage --- concurrency/src/tasks/gen_server_registry.rs | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/concurrency/src/tasks/gen_server_registry.rs b/concurrency/src/tasks/gen_server_registry.rs index 777b883..0ffcae3 100644 --- a/concurrency/src/tasks/gen_server_registry.rs +++ b/concurrency/src/tasks/gen_server_registry.rs @@ -270,6 +270,15 @@ mod tests { // Now we update the entry for the fruit basket let new_banana_fruit_basket = FruitBasket::new("Banana", 15).start(); + + // We can't insert the entry directly + assert!(GenServerRegistry::add_entry( + "banana_fruit_basket", + new_banana_fruit_basket.clone() + ) + .is_err()); + + // But we can update it assert!(GenServerRegistry::update_entry( "banana_fruit_basket", new_banana_fruit_basket From 6c69e20176ea9c9bb99a24f8ee84a48961ecb348 Mon Sep 17 00:00:00 2001 From: Juan Munoz Date: Thu, 24 Jul 2025 12:50:51 -0300 Subject: [PATCH 9/9] remove comment from toml --- concurrency/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/concurrency/Cargo.toml b/concurrency/Cargo.toml index 1fc8fd4..f86e406 100644 --- a/concurrency/Cargo.toml +++ b/concurrency/Cargo.toml @@ -11,7 +11,6 @@ tracing = { workspace = true } futures = "0.3.1" thiserror = "2.0.12" dashmap = "6.1.0" -# Used for the `GenServerRegistry` testing. once_cell = "1.21.3" [dev-dependencies]