diff --git a/Cargo.lock b/Cargo.lock index 0839f61..4d351f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -152,7 +152,7 @@ dependencies = [ "crossbeam-deque", "crossbeam-epoch", "crossbeam-queue", - "crossbeam-utils", + "crossbeam-utils 0.7.2", ] [[package]] @@ -161,7 +161,7 @@ version = "0.4.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b153fe7cbef478c567df0f972e02e6d736db11affe43dfc9c56a9374d1adfb87" dependencies = [ - "crossbeam-utils", + "crossbeam-utils 0.7.2", "maybe-uninit", ] @@ -172,7 +172,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c20ff29ded3204c5106278a81a38f4b482636ed4fa1e6cfbeef193291beb29ed" dependencies = [ "crossbeam-epoch", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "maybe-uninit", ] @@ -184,7 +184,7 @@ checksum = "058ed274caafc1f60c4997b5fc07bf7dc7cca454af7c6e81edffe5f33f70dace" dependencies = [ "autocfg", "cfg-if 0.1.10", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "lazy_static", "maybe-uninit", "memoffset", @@ -198,7 +198,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "774ba60a54c213d409d5353bda12d49cd68d14e45036a285234c8d6f91f92570" dependencies = [ "cfg-if 0.1.10", - "crossbeam-utils", + "crossbeam-utils 0.7.2", "maybe-uninit", ] @@ -213,6 +213,26 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "crossbeam-utils" +version = "0.8.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" + +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if 1.0.0", + "crossbeam-utils 0.8.21", + "hashbrown 0.14.5", + "lock_api", + "once_cell", + "parking_lot_core", +] + [[package]] name = "displaydoc" version = "0.2.5" @@ -411,6 +431,12 @@ dependencies = [ "tracing", ] +[[package]] +name = "hashbrown" +version = "0.14.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" + [[package]] name = "hashbrown" version = "0.15.3" @@ -602,7 +628,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" dependencies = [ "equivalent", - "hashbrown", + "hashbrown 0.15.3", ] [[package]] @@ -1199,7 +1225,9 @@ dependencies = [ name = "spawned-concurrency" version = "0.2.1" dependencies = [ + "dashmap", "futures", + "once_cell", "spawned-rt", "thiserror", "tokio", diff --git a/concurrency/Cargo.toml b/concurrency/Cargo.toml index 198c166..f86e406 100644 --- a/concurrency/Cargo.toml +++ b/concurrency/Cargo.toml @@ -10,6 +10,8 @@ spawned-rt = { workspace = true } tracing = { workspace = true } futures = "0.3.1" thiserror = "2.0.12" +dashmap = "6.1.0" +once_cell = "1.21.3" [dev-dependencies] # This tokio imports are only used in tests, we should not use them in the library code. diff --git a/concurrency/src/tasks/gen_server_registry.rs b/concurrency/src/tasks/gen_server_registry.rs new file mode 100644 index 0000000..0ffcae3 --- /dev/null +++ b/concurrency/src/tasks/gen_server_registry.rs @@ -0,0 +1,305 @@ +// Inspired by ractor pid_registry: +// https://github.com/slawlor/ractor/blob/main/ractor/src/registry/pid_registry.rs + +use dashmap::DashMap; +use once_cell::sync::OnceCell; +use std::sync::Arc; + +use crate::tasks::{GenServer, GenServerHandle}; + +#[derive(Debug, thiserror::Error)] +pub enum GenServerRegistryError { + #[error("A GenServer is already Registered at this Address")] + AddressAlreadyTaken, + #[error("There is no GenServer associated with this Address")] + ServerNotFound, +} + +// Wrapper trait to allow downcasting of `GenServerHandle`. +// Needed as otherwise `GenServerHandle` is sized and +// thus not `dyn` compatible. +trait AnyGenServerHandle: Send + Sync { + fn as_any(&self) -> &dyn std::any::Any; +} + +impl AnyGenServerHandle for GenServerHandle { + fn as_any(&self) -> &dyn std::any::Any { + self + } +} + +// Global registry for GenServer handles. +// This allows us to store and retrieve `GenServerHandle` instances +// by their address, similar to how PIDs work in Erlang/Elixir. +// +// We do not expose this directly, but rather through the `GenServerRegistry` struct. +static GENSERVER_REGISTRY: OnceCell>>> = + OnceCell::new(); + +/// A registry for `GenServer` instances, allowing them to be stored and retrieved by address. +/// This is similar to the PID registry in Erlang/Elixir, where processes can be registered and looked up by their identifiers. +pub struct GenServerRegistry; + +impl GenServerRegistry { + /// Adds a `GenServerHandle` to the registry under the specified address. + /// Returns an error if the address is already taken. + /// + /// Example usage: + /// ```rs + /// let banana_fruit_basket = FruitBasket::new("Banana", 10).start(); // FruitBasket implements GenServer + /// GenServerRegistry::add_entry("banana_fruit_basket", banana_fruit_basket).unwrap(); + /// ``` + pub fn add_entry( + address: &str, + server_handle: GenServerHandle, + ) -> Result<(), GenServerRegistryError> { + let registry = GENSERVER_REGISTRY.get_or_init(|| Arc::new(DashMap::new())); + if registry.contains_key(address) { + return Err(GenServerRegistryError::AddressAlreadyTaken); + } + registry.insert(address.to_string(), Box::new(server_handle)); + Ok(()) + } + + /// Retrieves a `GenServerHandle` from the registry by its address. + /// Returns an error if the address is not found or if the handle cannot be downcast + /// + /// Calling this function requires that the type of `GenServer` retrieved is known at compile time: + /// ```rs + /// let mut retrieved_vegetable_basket: GenServerHandle = + /// GenServerRegistry::get_entry("lettuce_vegetable_basket").unwrap(); + /// ``` + pub fn get_entry( + address: &str, + ) -> Result, GenServerRegistryError> { + let registry = GENSERVER_REGISTRY.get_or_init(|| Arc::new(DashMap::new())); + registry + .get(address) + .ok_or(GenServerRegistryError::ServerNotFound)? + .as_any() + .downcast_ref::>() + .cloned() + .ok_or(GenServerRegistryError::ServerNotFound) + } + + /// Retrieves all entries of a specific `GenServer` type from the registry. + /// + /// Example usage: + /// ```rs + /// let fruit_entries: Vec> = + /// GenServerRegistry::all_entries::(); + /// ``` + pub fn all_entries() -> Vec> { + let registry = GENSERVER_REGISTRY.get_or_init(|| Arc::new(DashMap::new())); + registry + .iter() + .filter_map(|entry| { + entry + .value() + .as_any() + .downcast_ref::>() + .cloned() + }) + .collect() + } + + /// Updates an existing entry in the registry with a new `GenServerHandle`. + /// Returns an error if the address is not found. + pub fn update_entry( + address: &str, + server_handle: GenServerHandle, + ) -> Result<(), GenServerRegistryError> { + let registry = GENSERVER_REGISTRY.get_or_init(|| Arc::new(DashMap::new())); + if !registry.contains_key(address) { + return Err(GenServerRegistryError::ServerNotFound); + } + registry.insert(address.to_string(), Box::new(server_handle)); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use spawned_rt::tasks::{self as rt}; + + #[derive(Clone)] + enum InMsg { + GetCount, + } + + #[derive(Clone)] + enum OutMsg { + Count(u32), + } + + #[derive(Clone)] + struct FruitBasket { + _name: String, + count: u32, + } + + impl FruitBasket { + pub fn new(name: &str, initial_count: u32) -> Self { + Self { + _name: name.to_string(), + count: initial_count, + } + } + } + + impl GenServer for FruitBasket { + type CallMsg = InMsg; + type CastMsg = (); + type OutMsg = OutMsg; + type Error = (); + + async fn handle_call( + self, + message: Self::CallMsg, + _handle: &GenServerHandle, + ) -> crate::tasks::CallResponse { + match message { + InMsg::GetCount => { + let count = self.count; + crate::tasks::CallResponse::Reply(self, OutMsg::Count(count)) + } + } + } + } + + #[derive(Clone)] + struct VegetableBasket { + _name: String, + count: u32, + } + + impl VegetableBasket { + pub fn new(name: &str, initial_count: u32) -> Self { + Self { + _name: name.to_string(), + count: initial_count, + } + } + } + + impl GenServer for VegetableBasket { + type CallMsg = InMsg; + type CastMsg = (); + type OutMsg = OutMsg; + type Error = (); + + async fn handle_call( + self, + message: Self::CallMsg, + _handle: &GenServerHandle, + ) -> crate::tasks::CallResponse { + match message { + InMsg::GetCount => { + let count = self.count; + crate::tasks::CallResponse::Reply(self, OutMsg::Count(count)) + } + } + } + } + + // This test checks the functionality of the GenServerRegistry, + // It's done all in a signle test function to prevent the global registry + // state from not being reset between tests. + #[test] + fn test_gen_server_registry() { + let runtime = rt::Runtime::new().unwrap(); + runtime.block_on(async move { + let banana_fruit_basket = FruitBasket::new("Banana", 10).start(); + let lettuce_vegetable_basket = VegetableBasket::new("Lettuce", 20).start(); + + // We can store different GenServer types in the registry + assert!(GenServerRegistry::add_entry( + "banana_fruit_basket", + banana_fruit_basket.clone() + ) + .is_ok()); + assert!(GenServerRegistry::add_entry( + "lettuce_vegetable_basket", + lettuce_vegetable_basket.clone(), + ) + .is_ok()); + + // Retrieve the FruitBasket GenServer + let mut retrieved_fruit_basket: GenServerHandle = + GenServerRegistry::get_entry("banana_fruit_basket").unwrap(); + let call_result = retrieved_fruit_basket.call(InMsg::GetCount).await; + assert!(call_result.is_ok()); + if let Ok(OutMsg::Count(count)) = call_result { + assert_eq!(count, 10); + } else { + panic!("Expected OutMsg::Count"); + } + + // Retrieve the VegetableBasket GenServer + let mut retrieved_vegetable_basket: GenServerHandle = + GenServerRegistry::get_entry("lettuce_vegetable_basket").unwrap(); + + let call_result = retrieved_vegetable_basket.call(InMsg::GetCount).await; + assert!(call_result.is_ok()); + if let Ok(OutMsg::Count(count)) = call_result { + assert_eq!(count, 20); + } else { + panic!("Expected OutMsg::Count"); + } + + // Next we check that we can retrieve all entries + let mut amount_collected = 0; + let fruit_entries: Vec> = + GenServerRegistry::all_entries::(); + for mut entry in fruit_entries { + let out_msg = entry.call(InMsg::GetCount).await.unwrap(); + let OutMsg::Count(count) = out_msg; + amount_collected += count; + } + + let vegetable_entries: Vec> = + GenServerRegistry::all_entries::(); + for mut entry in vegetable_entries { + let out_msg = entry.call(InMsg::GetCount).await.unwrap(); + let OutMsg::Count(count) = out_msg; + amount_collected += count; + } + + assert_eq!(amount_collected, 10 + 20); + + // Now we update the entry for the fruit basket + let new_banana_fruit_basket = FruitBasket::new("Banana", 15).start(); + + // We can't insert the entry directly + assert!(GenServerRegistry::add_entry( + "banana_fruit_basket", + new_banana_fruit_basket.clone() + ) + .is_err()); + + // But we can update it + assert!(GenServerRegistry::update_entry( + "banana_fruit_basket", + new_banana_fruit_basket + ) + .is_ok()); + + let mut updated_fruit_basket: GenServerHandle = + GenServerRegistry::get_entry("banana_fruit_basket").unwrap(); + let updated_call_result = updated_fruit_basket.call(InMsg::GetCount).await.unwrap(); + let OutMsg::Count(count) = updated_call_result; + assert_eq!(count, 15); + + // Finally, we check that we can't retrieve a non-existent entry + assert!(GenServerRegistry::get_entry::("orange_fruit_basket").is_err()); + + // And neither update a non-existent entry + let orange_fruit_basket = FruitBasket::new("Orange", 5).start(); + assert!( + GenServerRegistry::update_entry("orange_fruit_basket", orange_fruit_basket) + .is_err() + ); + }); + } +} diff --git a/concurrency/src/tasks/mod.rs b/concurrency/src/tasks/mod.rs index 201c7d2..9d13c59 100644 --- a/concurrency/src/tasks/mod.rs +++ b/concurrency/src/tasks/mod.rs @@ -2,6 +2,7 @@ //! Runtime tasks-based traits and structs to implement concurrent code à-la-Erlang. mod gen_server; +mod gen_server_registry; mod process; mod stream; mod time; @@ -12,6 +13,7 @@ mod stream_tests; mod timer_tests; pub use gen_server::{CallResponse, CastResponse, GenServer, GenServerHandle, GenServerInMsg}; +pub use gen_server_registry::GenServerRegistry; pub use process::{send, Process, ProcessInfo}; pub use stream::spawn_listener; pub use time::{send_after, send_interval};