diff --git a/Cargo.lock b/Cargo.lock index c1147f4..0be0f28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,63 +2,6 @@ # It is not intended for manual editing. version = 4 -[[package]] -name = "actix" -version = "0.13.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de7fa236829ba0841304542f7614c42b80fca007455315c45c785ccfa873a85b" -dependencies = [ - "actix-macros", - "actix-rt", - "actix_derive", - "bitflags", - "bytes", - "crossbeam-channel", - "futures-core", - "futures-sink", - "futures-task", - "futures-util", - "log", - "once_cell", - "parking_lot", - "pin-project-lite", - "smallvec", - "tokio 1.40.0", - "tokio-util", -] - -[[package]] -name = "actix-macros" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb" -dependencies = [ - "quote", - "syn", -] - -[[package]] -name = "actix-rt" -version = "2.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24eda4e2a6e042aa4e55ac438a2ae052d3b5da0ecf83d7411e1a368946925208" -dependencies = [ - "actix-macros", - "futures-core", - "tokio 1.40.0", -] - -[[package]] -name = "actix_derive" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6ac1e58cded18cb28ddc17143c4dea5345b3ad575e14f32f66e4054a56eb271" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "addr2line" version = "0.24.2" @@ -78,23 +21,22 @@ checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" name = "aj" version = "0.7.2" dependencies = [ - "actix-rt", "aj_core", "aj_macro", "lazy_static", "serde", + "tokio 1.40.0", ] [[package]] name = "aj_core" version = "0.7.2" dependencies = [ - "actix", - "actix-rt", "async-trait", "chrono", "cron", "dashmap", + "kameo", "lazy_static", "log", "redis", @@ -253,21 +195,6 @@ dependencies = [ "once_cell", ] -[[package]] -name = "crossbeam-channel" -version = "0.5.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33480d6946193aa8033910124896ca395333cae7e2d1113d1fef6c3272217df2" -dependencies = [ - "crossbeam-utils", -] - -[[package]] -name = "crossbeam-utils" -version = "0.8.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" - [[package]] name = "darling" version = "0.20.10" @@ -326,6 +253,18 @@ dependencies = [ "serde", ] +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + +[[package]] +name = "either" +version = "1.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" + [[package]] name = "equivalent" version = "1.0.1" @@ -347,12 +286,65 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + [[package]] name = "futures-core" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" @@ -371,10 +363,16 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ + "futures-channel", "futures-core", + "futures-io", + "futures-macro", + "futures-sink", "futures-task", + "memchr", "pin-project-lite", "pin-utils", + "slab", ] [[package]] @@ -412,6 +410,12 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1e087f84d4f86bf4b218b927129862374b72199ae7d8657835f1e89000eea4fb" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.3.9" @@ -467,10 +471,10 @@ dependencies = [ name = "in-memory" version = "0.1.0" dependencies = [ - "actix-rt", "aj", "chrono", "serde", + "tokio 1.40.0", ] [[package]] @@ -495,6 +499,15 @@ dependencies = [ "serde", ] +[[package]] +name = "itertools" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.11" @@ -510,6 +523,36 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kameo" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62237a96597618543798a36ec723eb75c5ac301e2690243fd600be1f5eb3dd2d" +dependencies = [ + "dyn-clone", + "futures", + "itertools", + "kameo_macros", + "once_cell", + "serde", + "tokio 1.40.0", + "tokio-stream", + "tracing", +] + +[[package]] +name = "kameo_macros" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5bbbd8e8d7b02bc67eae0dcbdb82c0a71cc7cc61734059ee3e7439a1ee1e0e85" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", + "uuid", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -801,6 +844,12 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" + [[package]] name = "smallvec" version = "1.13.2" @@ -904,6 +953,7 @@ dependencies = [ "signal-hook-registry", "socket2", "tokio-macros", + "tracing", "windows-sys", ] @@ -919,18 +969,47 @@ dependencies = [ ] [[package]] -name = "tokio-util" -version = "0.7.12" +name = "tokio-stream" +version = "0.1.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" +checksum = "32da49809aab5c3bc678af03902d4ccddea2a87d028d86392a4b1560c6906c70" dependencies = [ - "bytes", "futures-core", - "futures-sink", "pin-project-lite", "tokio 1.40.0", ] +[[package]] +name = "tracing" +version = "0.1.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" +dependencies = [ + "once_cell", +] + [[package]] name = "unicode-bidi" version = "0.3.17" diff --git a/README.md b/README.md index bccb79b..3ab6ec6 100644 --- a/README.md +++ b/README.md @@ -2,14 +2,14 @@ ![ci status](https://github.com/cptrodgers/aj/actions/workflows/test-and-build.yml/badge.svg) AJ is a simple, customizable, and feature-rich background job processing library for Rust. -It can work with any runtime by running a new actix-rt in a separate thread if it detects that an actix-rt runtime is not present. +It runs on pure Tokio runtime using the Kameo actor framework. ## Install ```toml aj = "0.7.2" serde = { version = "1.0.64", features = ["derive"] } # Serialize and deserialize the job -actix-rt = "2.2" # Actor model runtime engine +tokio = { version = "1", features = ["rt-multi-thread", "macros"] } # Async runtime ``` ### Enable Redis Backend @@ -30,7 +30,7 @@ async fn hello(name: String) { println!("Hello {name}"); } -#[aj::main] +#[tokio::main] async fn main() { // Start AJ with in-memory backend (default, no feature flag needed) AJ::quick_start(); @@ -45,7 +45,7 @@ async fn main() { hello::run("AJ".into()).await; // Sleep 1 sec to view the result from the job (if you want to wait for the job to run) - // sleep(Duration::from_secs(1)).await; + // tokio::time::sleep(std::time::Duration::from_secs(1)).await; } ``` @@ -100,7 +100,7 @@ impl Executable for Print { } } -#[main] +#[tokio::main] async fn main() { // Start AJ engine AJ::quick_start(); @@ -285,7 +285,7 @@ impl JobPlugin for SamplePlugin { } } -#[aj::main] +#[tokio::main] async fn main() { AJ::register_plugin(SamplePlugin).await.unwrap(); } diff --git a/aj/Cargo.toml b/aj/Cargo.toml index 962f0e9..6aa2a77 100644 --- a/aj/Cargo.toml +++ b/aj/Cargo.toml @@ -3,7 +3,7 @@ name = "aj" version = "0.7.2" edition = "2021" authors = ["cptrodgers "] -description = "Background Job based on Actix" +description = "Background Job Library for Rust" readme = "README.md" repository = "https://github.com/cptrodgers/aj" license = "MIT OR Apache-2.0" @@ -21,5 +21,5 @@ redis = ["aj_core/redis"] aj_macro = { version = "=0.7.2", path = "../aj_macro" } aj_core = { version = "=0.7.2", path = "../aj_core" } serde = { version = "1.0.64", features = ["derive"] } -actix-rt = "2.2" +tokio = { version = "1.23.1", features = ["rt", "macros"] } lazy_static = { version = "1.4.0" } diff --git a/aj/README.md b/aj/README.md index 759235b..ef18335 100644 --- a/aj/README.md +++ b/aj/README.md @@ -1,15 +1,15 @@ -# a# aj +# aj ![ci status](https://github.com/cptrodgers/aj/actions/workflows/test-and-build.yml/badge.svg) -Aj is a simple, customizable, and feature-rich background job processing library for Rust. -It can work with any runtime by running new actix-rt in a separated thread if it detects that an actix-rt runtime is not present. +AJ is a simple, customizable, and feature-rich background job processing library for Rust. +It runs on pure Tokio runtime using the Kameo actor framework. ## Install ```toml -aj = "0.7.0" +aj = "0.7.2" serde = { version = "1.0.64", features = ["derive"] } # Serialize and deserialize the job -actix-rt = "2.2" # Actor model runtime engine +tokio = { version = "1", features = ["rt-multi-thread", "macros"] } # Async runtime ``` ## Quick start @@ -22,19 +22,19 @@ async fn hello(name: String) { println!("Hello {name}"); } -#[aj::main] +#[tokio::main] async fn main() { - // AJ will be backed by run in-memory backend. - // If you wish to use redis as the backend for aj. + // AJ will be backed by in-memory backend. + // If you wish to use redis as the backend for aj: // AJ::start(aj::Redis::new("redis://localhost:6379")); AJ::quick_start(); - // Fire and forget the job. No gruantee job is queued + // Fire and forget the job. No guarantee job is queued hello::just_run("Rodgers".into()); - // Or waiting job is queued + // Or wait for job to be queued hello::run("AJ".into()).await; - // Sleep 1 sec to view the result from job (if you want to wait the job run) - // sleep(Duration::from_secs(1)).await; + // Sleep 1 sec to view the result from job (if you want to wait for the job to run) + // tokio::time::sleep(std::time::Duration::from_secs(1)).await; } ``` diff --git a/aj/examples/cancel_job.rs b/aj/examples/cancel_job.rs index ff8b609..b25b823 100644 --- a/aj/examples/cancel_job.rs +++ b/aj/examples/cancel_job.rs @@ -1,12 +1,11 @@ use aj::{ async_trait, export::core::{ - actix_rt::time::sleep, chrono::Duration, get_now_as_secs, serde::{Deserialize, Serialize}, }, - main, BackgroundJob, Executable, JobContext, AJ, + BackgroundJob, Executable, JobContext, AJ, }; #[derive(BackgroundJob, Serialize, Deserialize, Debug, Clone)] @@ -24,7 +23,7 @@ impl Executable for Print { } } -#[main] +#[tokio::main] async fn main() { AJ::quick_start(); let job_id = Print { number: 1 } @@ -43,5 +42,5 @@ async fn main() { .await .unwrap(); - sleep(std::time::Duration::from_secs(2)).await; + tokio::time::sleep(std::time::Duration::from_secs(2)).await; } diff --git a/aj/examples/cron_job.rs b/aj/examples/cron_job.rs index 67959de..473c92d 100644 --- a/aj/examples/cron_job.rs +++ b/aj/examples/cron_job.rs @@ -1,10 +1,7 @@ use aj::{ async_trait, - export::core::{ - actix_rt::time::sleep, - serde::{Deserialize, Serialize}, - }, - main, BackgroundJob, Executable, JobContext, AJ, + export::core::serde::{Deserialize, Serialize}, + BackgroundJob, Executable, JobContext, AJ, }; use aj_core::get_now_as_secs; @@ -20,12 +17,12 @@ impl Executable for AJob { } } -#[main] +#[tokio::main] async fn main() { AJ::quick_start(); println!("Start time {}", get_now_as_secs()); let _ = AJob.job().cron("* * * * * * *").run().await; - sleep(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(5)).await; } diff --git a/aj/examples/retry_job.rs b/aj/examples/retry_job.rs index cd63e2f..2205772 100644 --- a/aj/examples/retry_job.rs +++ b/aj/examples/retry_job.rs @@ -2,11 +2,8 @@ use std::time::Duration; use aj::{ async_trait, - export::core::{ - actix_rt::time::sleep, - serde::{Deserialize, Serialize}, - }, - main, BackgroundJob, Executable, JobContext, AJ, + export::core::serde::{Deserialize, Serialize}, + BackgroundJob, Executable, JobContext, AJ, }; use aj_core::retry::Retry; @@ -31,7 +28,7 @@ impl Executable for Print { } } -#[main] +#[tokio::main] async fn main() { AJ::quick_start(); @@ -43,5 +40,5 @@ async fn main() { )); let _ = job.run().await; - sleep(Duration::from_secs(5)).await; + tokio::time::sleep(Duration::from_secs(5)).await; } diff --git a/aj/examples/schedule_job.rs b/aj/examples/schedule_job.rs index 7df4e54..2808d68 100644 --- a/aj/examples/schedule_job.rs +++ b/aj/examples/schedule_job.rs @@ -1,11 +1,10 @@ use aj::{ async_trait, export::core::{ - actix_rt::time::sleep, chrono::Duration, serde::{Deserialize, Serialize}, }, - main, BackgroundJob, Executable, JobContext, AJ, + BackgroundJob, Executable, JobContext, AJ, }; use aj_core::{get_now, get_now_as_secs}; @@ -21,7 +20,7 @@ impl Executable for AJob { } } -#[main] +#[tokio::main] async fn main() { AJ::quick_start(); @@ -32,5 +31,5 @@ async fn main() { .schedule_at(get_now() + Duration::seconds(2)) .just_run(); - sleep(std::time::Duration::from_secs(3)).await; + tokio::time::sleep(std::time::Duration::from_secs(3)).await; } diff --git a/aj/examples/simple_job.rs b/aj/examples/simple_job.rs index 92abb6c..624d57c 100644 --- a/aj/examples/simple_job.rs +++ b/aj/examples/simple_job.rs @@ -1,7 +1,7 @@ use std::time::Duration; use aj::job; -use aj::{export::core::actix_rt::time::sleep, main, AJ}; +use aj::AJ; #[job] fn hello(name: String) { @@ -14,7 +14,7 @@ async fn async_hello(name: String) { println!("Hello async, {name}"); } -#[main] +#[tokio::main] async fn main() { // Start AJ engine AJ::quick_start(); @@ -26,5 +26,5 @@ async fn main() { let _ = async_hello::just_run("AJ".into()); // Sleep 1 ms to view the result from job - sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; } diff --git a/aj/examples/update_job.rs b/aj/examples/update_job.rs index c8873d9..10e6776 100644 --- a/aj/examples/update_job.rs +++ b/aj/examples/update_job.rs @@ -1,11 +1,10 @@ use aj::{ async_trait, export::core::{ - actix_rt::time::sleep, chrono::Duration, serde::{Deserialize, Serialize}, }, - main, BackgroundJob, Executable, JobContext, AJ, + BackgroundJob, Executable, JobContext, AJ, }; use aj_core::get_now_as_secs; @@ -23,7 +22,7 @@ impl Executable for Print { } } -#[main] +#[tokio::main] async fn main() { AJ::quick_start(); @@ -41,5 +40,5 @@ async fn main() { .await .unwrap(); - sleep(std::time::Duration::from_secs(2)).await; + tokio::time::sleep(std::time::Duration::from_secs(2)).await; } diff --git a/aj/src/lib.rs b/aj/src/lib.rs index 76574bc..75c753d 100644 --- a/aj/src/lib.rs +++ b/aj/src/lib.rs @@ -1,12 +1,12 @@ -//! Aj is a simple, flexible, and feature-rich background job processing library for Rust, backed by Actix (Actor Model). +//! Aj is a simple, flexible, and feature-rich background job processing library for Rust. //! //! # Quick Start //! -//! ```rust +//! ```rust,ignore //! use std::time::Duration; //! //! use aj::job; -//! use aj::{export::core::actix_rt::time::sleep, main, AJ}; +//! use aj::AJ; //! //! #[job] //! fn hello(name: String) { @@ -19,7 +19,7 @@ //! println!("Hello async, {name}"); //! } //! -//! #[main] +//! #[tokio::main] //! async fn main() { //! // Start AJ engine with in-memory backend //! AJ::quick_start(); @@ -31,7 +31,7 @@ //! let _ = async_hello::just_run("AJ".into()); //! //! // Sleep 1 ms to view the result from job -//! sleep(Duration::from_secs(1)).await; +//! tokio::time::sleep(Duration::from_secs(1)).await; //! } //! ``` //! @@ -70,7 +70,6 @@ pub use aj_core::{BackgroundJob, Error, Executable, Job, JobContext, WorkQueue, pub use aj_macro::job; pub use aj_macro::BackgroundJob; -pub use actix_rt::main; pub use aj_core::async_trait::async_trait; pub use aj_core::chrono; diff --git a/aj_core/Cargo.toml b/aj_core/Cargo.toml index 6ec21a6..71048af 100644 --- a/aj_core/Cargo.toml +++ b/aj_core/Cargo.toml @@ -3,7 +3,7 @@ name = "aj_core" version = "0.7.2" edition = "2021" authors = ["cptrodgers "] -description = "Background Job based on Actix" +description = "Background Job Library for Rust" readme = "README.md" repository = "https://github.com/cptrodgers/aj" license = "MIT OR Apache-2.0" @@ -22,8 +22,7 @@ redis = ["dep:redis"] uuid = { version = "1.8", features = ["serde", "v4"] } redis = { version = "0.25.3", optional = true } cron = "0.12.0" -actix = "0.13.0" -actix-rt = "2.2" +kameo = "0.13" serde = { version = "1.0.64", features = ["derive"] } serde_json = "1.0.64" serde_with = { version = "3.6.1", features = ["chrono_0_4"] } @@ -32,4 +31,4 @@ log = "0.4" lazy_static = { version = "1.4.0" } async-trait = "0.1.74" dashmap = "5.5.3" -tokio = "1.23.1" +tokio = { version = "1.23.1", features = ["sync", "time"] } diff --git a/aj_core/src/aj.rs b/aj_core/src/aj.rs index fa47a65..d8cb313 100644 --- a/aj_core/src/aj.rs +++ b/aj_core/src/aj.rs @@ -1,11 +1,11 @@ -use actix::*; use dashmap::DashMap; -use fut::wrap_future; +use kameo::actor::ActorRef; +use kameo::message::{Context, Message}; +use kameo::Actor; use lazy_static::lazy_static; use serde::de::DeserializeOwned; use serde::Serialize; use std::any::{Any, TypeId}; -use std::marker::PhantomData; use std::sync::{Arc, RwLock}; use crate::job::Job; @@ -22,7 +22,12 @@ lazy_static! { } lazy_static! { - static ref AJ_ADDR: Arc>>> = Arc::new(RwLock::new(None)); + static ref AJ_BACKEND: Arc>>> = + Arc::new(RwLock::new(None)); +} + +lazy_static! { + static ref AJ_ADDR: Arc>>> = Arc::new(RwLock::new(None)); } #[derive(Debug, Default)] @@ -31,14 +36,21 @@ pub struct Registry { registry_by_name: DashMap>, } -pub fn get_work_queue_address() -> Option>> +fn get_backend() -> Option> { + if let Ok(backend) = AJ_BACKEND.try_read() { + backend.clone() + } else { + None + } +} + +pub fn get_work_queue_address() -> Option>> where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { let type_id = TypeId::of::(); if let Some(queue_addr) = QUEUE_REGISTRY.registry.get(&type_id) { - if let Some(addr) = queue_addr.downcast_ref::>>() { + if let Some(addr) = queue_addr.downcast_ref::>>() { return Some(addr.clone()); } } @@ -46,7 +58,35 @@ where None } -pub fn get_aj_address() -> Option> { +fn register_work_queue(queue_name: &str) -> ActorRef> +where + M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, +{ + let type_id = TypeId::of::(); + let registry = &QUEUE_REGISTRY; + + // Check if already registered + if let Some(queue_addr) = registry.registry.get(&type_id) { + if let Some(addr) = queue_addr.downcast_ref::>>() { + return addr.clone(); + } + } + + let backend = get_backend().expect("AJ is not started, please start it via AJ::start"); + + // Start Queue actor + let queue_ref = WorkQueue::::start_with_name(queue_name.into(), backend); + registry + .registry + .insert(type_id, Box::new(queue_ref.clone())); + registry + .registry_by_name + .insert(queue_name.into(), Box::new(queue_ref.clone())); + + queue_ref +} + +pub fn get_aj_address() -> Option> { if let Ok(addr) = AJ_ADDR.try_read() { addr.clone() } else { @@ -54,86 +94,37 @@ pub fn get_aj_address() -> Option> { } } -pub struct AJ { - backend: Arc, -} - -impl Actor for AJ { - type Context = Context; -} +#[derive(Actor)] +pub struct AJ {} impl AJ { - // Will use memory as Backend for AJ - pub fn start(backend: impl Backend + Send + Sync + 'static) -> Addr { + /// Start AJ with a custom backend + pub fn start(backend: impl Backend + Send + Sync + 'static) -> ActorRef { if let Some(aj_addr) = get_aj_address() { warn!("AJ is running. Return current AJ"); return aj_addr; } - match System::try_current() { - Some(_) => { - info!("Found Actix Runtime, re-use it!"); - Self::register_addr(backend); - } - None => { - info!("No Actix Runtime, trying start new one in separated thread!"); - std::thread::spawn(|| { - // Start the Actix runtime within this new thread - let _ = System::new(); - Self::register_addr(backend); - }) - .join() - .expect("Failed to start thread"); - } + // Store backend globally + if let Ok(ref mut backend_ref) = AJ_BACKEND.try_write() { + **backend_ref = Some(Arc::new(backend)); } - get_aj_address().expect("AJ address must be registered!") - } - - fn register_addr(backend: impl Backend + Send + Sync + 'static) -> Addr { - let arbiter: Arbiter = Arbiter::new(); - let addr = ::start_in_arbiter(&arbiter.handle(), |_| Self { - backend: Arc::new(backend), - }); + let actor = AJ {}; + let actor_ref = kameo::spawn(actor); if let Ok(ref mut aj_addr) = AJ_ADDR.try_write() { - **aj_addr = Some(addr.clone()); + **aj_addr = Some(actor_ref.clone()); } - addr + actor_ref } - pub fn quick_start() -> Addr { + /// Quick start AJ with in-memory backend + pub fn quick_start() -> ActorRef { Self::start(InMemory::default()) } - pub fn register(&self, queue_name: &str) -> Addr> - where - M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, - { - let type_id = TypeId::of::(); - - let registry = &QUEUE_REGISTRY; - if registry.registry_by_name.contains_key(queue_name) { - panic!("You already register queue with name: {}", queue_name); - } - if registry.registry.contains_key(&type_id) { - panic!("You already register queue with type: {:?}", type_id); - } - - // Start Queue in an arbiter thread - let queue_addr = WorkQueue::::start_with_name(queue_name.into(), self.backend.clone()); - registry - .registry - .insert(type_id, Box::new(queue_addr.clone())); - registry - .registry_by_name - .insert(queue_name.into(), Box::new(queue_addr.clone())); - - queue_addr - } - pub async fn enqueue_job( job: Job, config: EnqueueConfig, @@ -141,31 +132,23 @@ impl AJ { ) -> Result<(), Error> where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { - let addr = if let Some(addr) = get_work_queue_address() { - addr + let actor_ref = if let Some(actor_ref) = get_work_queue_address() { + actor_ref } else { - info!("Not found WorkQueue for {}", stringify!(M)); - let message = InitWorkQueue { - queue_name: queue_name.into(), - _type: PhantomData, - }; - let aj_addr = get_aj_address().expect("AJ is not start, please start it via AJ::start"); - aj_addr.send(message).await? + info!("Not found WorkQueue for {}, creating...", queue_name); + register_work_queue::(queue_name) }; - enqueue_job(addr, job, config).await + enqueue_job(actor_ref, job, config).await } pub async fn cancel_job(job_id: String) -> Result<(), Error> where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - - WorkQueue: Actor>>, { - let addr: Option>> = get_work_queue_address(); - if let Some(queue_addr) = addr { - cancel_job(queue_addr, job_id).await + let actor_ref: Option>> = get_work_queue_address(); + if let Some(queue_ref) = actor_ref { + cancel_job(queue_ref, job_id).await } else { Err(Error::NoQueueRegister) } @@ -174,11 +157,10 @@ impl AJ { pub async fn get_job(job_id: &str) -> Option> where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { - let addr: Option>> = get_work_queue_address(); - if let Some(queue_addr) = addr { - get_job(queue_addr, job_id).await + let actor_ref: Option>> = get_work_queue_address(); + if let Some(queue_ref) = actor_ref { + get_job(queue_ref, job_id).await } else { None } @@ -198,7 +180,6 @@ impl AJ { + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { let job = Self::get_job::(job_id).await; if let Some(mut job) = job { @@ -217,11 +198,10 @@ impl AJ { pub async fn retry_job(job_id: &str) -> Result where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { - let addr: Option>> = get_work_queue_address(); - if let Some(queue_addr) = addr { - retry_job(queue_addr, job_id).await + let actor_ref: Option>> = get_work_queue_address(); + if let Some(queue_ref) = actor_ref { + retry_job(queue_ref, job_id).await } else { Err(Error::NoQueueRegister) } @@ -230,7 +210,6 @@ impl AJ { pub async fn add_job(job: Job, queue_name: &str) -> Result where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { let job_id = job.id().to_string(); let config = EnqueueConfig::new_re_run(); @@ -241,11 +220,10 @@ impl AJ { pub async fn update_work_queue(config: WorkQueueConfig) -> Result<(), Error> where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { - let addr: Option>> = get_work_queue_address(); - if let Some(queue_addr) = addr { - update_work_queue_config(queue_addr, config).await + let actor_ref: Option>> = get_work_queue_address(); + if let Some(queue_ref) = actor_ref { + update_work_queue_config(queue_ref, config).await } else { Err(Error::NoQueueRegister) } @@ -259,55 +237,29 @@ impl AJ { } } -#[derive(Message)] -#[rtype(result = "Addr>")] -pub struct InitWorkQueue -where - M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, -{ - pub queue_name: String, - _type: PhantomData, -} - -impl Handler> for AJ -where - M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, -{ - type Result = Addr>; - - fn handle(&mut self, msg: InitWorkQueue, _: &mut Self::Context) -> Self::Result { - self.register(&msg.queue_name) - } -} - -/// This message will handle fire and forgot style -#[derive(Message)] -#[rtype(result = "()")] +// Message: JustRunJob (fire and forget style) pub struct JustRunJob where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { pub job: Job, pub queue_name: String, } -impl Handler> for AJ +impl Message> for AJ where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { - type Result = (); - - fn handle(&mut self, msg: JustRunJob, ctx: &mut Self::Context) -> Self::Result { - let task = async move { - if let Err(reason) = AJ::add_job(msg.job, &msg.queue_name).await { - error!("Cannot start job {reason:?}"); - } - }; - wrap_future::<_, Self>(task).spawn(ctx) + type Reply = (); + + async fn handle( + &mut self, + msg: JustRunJob, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + if let Err(reason) = AJ::add_job(msg.job, &msg.queue_name).await { + error!("Cannot start job {reason:?}"); + } } } @@ -315,45 +267,11 @@ where mod tests { use super::{get_aj_address, AJ}; - #[test] - fn test_start_aj_with_non_tokio_runtime() { - let addr = AJ::quick_start(); - let register_addr = get_aj_address(); - - assert!(register_addr.is_some()); - assert_eq!(addr, register_addr.unwrap()); - } - - #[test] - fn test_start_aj_under_tokio_runtime() { - use tokio::runtime::Builder; - let tokio_rt = Builder::new_current_thread().build().unwrap(); - tokio_rt.block_on(async { - let addr = AJ::quick_start(); - let register_addr = get_aj_address(); - - assert!(register_addr.is_some()); - assert_eq!(addr, register_addr.unwrap()); - }) - } - - #[test] - fn test_start_aj_under_actix_runtime() { - let system = actix_rt::System::new(); - system.block_on(async { - let addr = AJ::quick_start(); - let register_addr = get_aj_address(); - - assert!(register_addr.is_some()); - assert_eq!(addr, register_addr.unwrap()); - }); - } - - #[test] - fn test_start_multiple_times() { - let addr = AJ::quick_start(); - let second_addr = AJ::quick_start(); + #[tokio::test] + async fn test_start_aj_under_tokio_runtime() { + let _actor_ref = AJ::quick_start(); + let register_ref = get_aj_address(); - assert_eq!(addr, second_addr); + assert!(register_ref.is_some()); } } diff --git a/aj_core/src/error.rs b/aj_core/src/error.rs index 7c6ea07..9944b35 100644 --- a/aj_core/src/error.rs +++ b/aj_core/src/error.rs @@ -1,4 +1,3 @@ -use actix::MailboxError; #[cfg(feature = "redis")] use redis::RedisError; @@ -7,7 +6,7 @@ pub enum Error { #[cfg(feature = "redis")] Redis(RedisError), CronError(cron::error::Error), - MailboxError(MailboxError), + ActorError(String), NoQueueRegister, SerializeError, } @@ -25,8 +24,8 @@ impl From for Error { } } -impl From for Error { - fn from(value: MailboxError) -> Self { - Self::MailboxError(value) +impl From> for Error { + fn from(value: kameo::error::SendError) -> Self { + Self::ActorError(format!("{:?}", value)) } } diff --git a/aj_core/src/job/mod.rs b/aj_core/src/job/mod.rs index 046731f..0d689bb 100644 --- a/aj_core/src/job/mod.rs +++ b/aj_core/src/job/mod.rs @@ -263,7 +263,7 @@ mod tests { Job::new(TestJob { number }) } - #[actix::test] + #[tokio::test] async fn test_job() { let number = 1; let mut default_job = default_job(number); @@ -278,7 +278,7 @@ mod tests { assert_eq!(default_job.context.run_count, 1); } - #[actix::test] + #[tokio::test] async fn test_schedule_job() { let number = 1; let schedule_at = get_now() + Duration::from_secs(1); @@ -288,7 +288,7 @@ mod tests { assert!(schedule_job.context.job_type == JobType::ScheduledAt(schedule_at)) } - #[actix::test] + #[tokio::test] async fn test_cron_job() { let number = 1; let expression = "0 1 1 1 * * *"; @@ -299,7 +299,7 @@ mod tests { assert!(schedule_job.context.job_type == expected_cron); } - #[actix::test] + #[tokio::test] async fn test_retry() { #[derive(Default, Debug, Clone, Serialize)] pub struct TestRetryJob { diff --git a/aj_core/src/lib.rs b/aj_core/src/lib.rs index 0afef09..deeef02 100644 --- a/aj_core/src/lib.rs +++ b/aj_core/src/lib.rs @@ -22,7 +22,6 @@ pub use queue::*; pub use util::*; // External libs. -pub use actix_rt; pub use async_trait; pub use chrono; pub use cron; @@ -49,10 +48,12 @@ where /// It will just send message to WorkQueue and no gurantee job is inserted to backend pub fn just_run(self) { - if let Some(aj_addr) = get_aj_address() { - aj_addr.do_send(JustRunJob { - job: self, - queue_name: M::queue_name().to_string(), + if let Some(aj_ref) = get_aj_address() { + let job = self; + let queue_name = M::queue_name().to_string(); + // Fire and forget - spawn a task to send the message + tokio::spawn(async move { + let _ = aj_ref.tell(JustRunJob { job, queue_name }).await; }); } } diff --git a/aj_core/src/plugin/job_plugin.rs b/aj_core/src/plugin/job_plugin.rs index 4827f79..0b35f0f 100644 --- a/aj_core/src/plugin/job_plugin.rs +++ b/aj_core/src/plugin/job_plugin.rs @@ -38,7 +38,7 @@ pub trait JobPlugin { } pub struct JobPluginWrapper { - hook: Box, + pub(crate) hook: Box, job_type_ids: Vec, } @@ -150,7 +150,7 @@ mod tests { assert_eq!(plugin_2.should_run(TypeId::of::()), false); } - #[actix_rt::test] + #[tokio::test] async fn test_change_status_hook() { // Plugin apply for all Job let plugin = JobPluginWrapper::new(SimplePlugin, vec![]); @@ -161,7 +161,7 @@ mod tests { assert_eq!(*JOB_ID.lock().unwrap(), "job_status"); } - #[actix_rt::test] + #[tokio::test] async fn test_change_before_run() { // Plugin apply for all Job let plugin = JobPluginWrapper::new(SimplePlugin, vec![]); @@ -172,7 +172,7 @@ mod tests { assert_eq!(*JOB_ID.lock().unwrap(), "job_before"); } - #[actix_rt::test] + #[tokio::test] async fn test_change_after_run() { // Plugin apply for all Job let plugin = JobPluginWrapper::new(SimplePlugin, vec![]); diff --git a/aj_core/src/plugin/mod.rs b/aj_core/src/plugin/mod.rs index 710b33a..5620715 100644 --- a/aj_core/src/plugin/mod.rs +++ b/aj_core/src/plugin/mod.rs @@ -2,21 +2,34 @@ pub mod job_plugin; pub use job_plugin::*; -use actix::*; -use std::{marker::PhantomData, sync::Arc}; +use kameo::actor::ActorRef; +use kameo::message::{Context, Message}; +use kameo::Actor; +use std::sync::Arc; +use std::sync::OnceLock; use crate::{Error, Executable, JobStatus}; -#[derive(Clone, Default)] +static PLUGIN_CENTER: OnceLock> = OnceLock::new(); + +#[derive(Clone, Default, Actor)] pub struct PluginCenter { plugins: Vec>, } impl PluginCenter { + /// Get or initialize the global PluginCenter actor + fn get_or_init() -> ActorRef { + PLUGIN_CENTER + .get_or_init(|| kameo::spawn(PluginCenter::default())) + .clone() + } + pub async fn register(plugin: JobPluginWrapper) -> Result<(), Error> { - Self::from_registry() - .send(RegisterPlugin { plugin }) - .await?; + Self::get_or_init() + .ask(RegisterPlugin { plugin }) + .await + .map_err(|e| Error::ActorError(format!("{:?}", e)))?; Ok(()) } @@ -24,108 +37,96 @@ impl PluginCenter { where M: Executable + Clone + Send + 'static, { - let msg: ChangeStatus = ChangeStatus { - job_id, - status, - phantom: PhantomData, - }; - Self::from_registry().do_send(msg); + let actor_ref = Self::get_or_init(); + let msg = ChangeStatusMsg { job_id, status }; + // Fire and forget - spawn a task to send the message + tokio::spawn(async move { + let _ = actor_ref.tell(msg).await; + }); } pub(crate) async fn before(job_id: String) where M: Executable + Clone + Send + 'static, { - let msg: RunHook = RunHook { + let msg = RunHookMsg { job_id, before: true, - phantom: PhantomData, }; - let _ = Self::from_registry().send(msg).await; + let _ = Self::get_or_init().ask(msg).await; } pub(crate) async fn after(job_id: String) where M: Executable + Clone + Send + 'static, { - let msg: RunHook = RunHook { + let msg = RunHookMsg { job_id, - before: true, - phantom: PhantomData, + before: false, }; - let _ = Self::from_registry().send(msg).await; + let _ = Self::get_or_init().ask(msg).await; } } -impl Actor for PluginCenter { - type Context = Context; -} - -impl SystemService for PluginCenter {} -impl Supervised for PluginCenter {} - -#[derive(Message)] -#[rtype(result = "()")] +// Message: RegisterPlugin pub struct RegisterPlugin { pub plugin: JobPluginWrapper, } -impl Handler for PluginCenter { - type Result = (); +impl Message for PluginCenter { + type Reply = (); - fn handle(&mut self, msg: RegisterPlugin, _: &mut Self::Context) -> Self::Result { - self.plugins.push(Arc::new(msg.plugin)) + async fn handle( + &mut self, + msg: RegisterPlugin, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.plugins.push(Arc::new(msg.plugin)); } } -#[derive(Message)] -#[rtype(result = "()")] -pub struct RunHook -where - M: Executable + Clone + 'static, -{ +// Message: RunHookMsg (non-generic version) +pub struct RunHookMsg { pub job_id: String, pub before: bool, - phantom: PhantomData, } -impl Handler> for PluginCenter { - type Result = ResponseFuture<()>; - - fn handle(&mut self, msg: RunHook, _ctx: &mut Self::Context) -> Self::Result { - let plugins = self.plugins.clone(); - Box::pin(async move { - for plugin in plugins { - if msg.before { - plugin.before_run::(&msg.job_id).await; - } else { - plugin.after_run::(&msg.job_id).await; - } +impl Message for PluginCenter { + type Reply = (); + + async fn handle( + &mut self, + msg: RunHookMsg, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + let job_id = msg.job_id; + let before = msg.before; + for plugin in &self.plugins { + if before { + plugin.hook.before_run(&job_id).await; + } else { + plugin.hook.after_run(&job_id).await; } - }) + } } } -#[derive(Message)] -#[rtype(result = "()")] -pub struct ChangeStatus -where - M: Executable + Clone + 'static, -{ +// Message: ChangeStatusMsg (non-generic version) +pub struct ChangeStatusMsg { pub job_id: String, pub status: JobStatus, - phantom: PhantomData, } -impl Handler> for PluginCenter { - type Result = ResponseFuture<()>; - - fn handle(&mut self, msg: ChangeStatus, _ctx: &mut Self::Context) -> Self::Result { - let plugins = self.plugins.clone(); - Box::pin(async move { - for plugin in plugins { - plugin.change_status::(&msg.job_id, msg.status).await; - } - }) +impl Message for PluginCenter { + type Reply = (); + + async fn handle( + &mut self, + msg: ChangeStatusMsg, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + for plugin in &self.plugins { + plugin.hook.change_status(&msg.job_id, msg.status).await; + } } } diff --git a/aj_core/src/queue.rs b/aj_core/src/queue.rs index 0cd2b7b..1022183 100644 --- a/aj_core/src/queue.rs +++ b/aj_core/src/queue.rs @@ -1,5 +1,6 @@ -use actix::fut::wrap_future; -use actix::*; +use kameo::actor::ActorRef; +use kameo::message::{Context, Message}; +use kameo::Actor; use serde::de::DeserializeOwned; use serde::Serialize; use std::fmt::Debug; @@ -65,11 +66,10 @@ impl Default for WorkQueueConfig { } } -#[derive(Clone)] +#[derive(Actor)] pub struct WorkQueue where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - Self: Actor>, { name: Arc, worker_id: String, @@ -78,17 +78,24 @@ where backend: Arc, } -impl Actor for WorkQueue +impl Clone for WorkQueue where - M: Executable + Unpin + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, + M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, { - type Context = Context; + fn clone(&self) -> Self { + Self { + name: self.name.clone(), + worker_id: self.worker_id.clone(), + config: self.config.clone(), + _type: PhantomData, + backend: self.backend.clone(), + } + } } impl WorkQueue where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - Self: Actor>, { pub fn new(job_name: String, backend: Arc) -> Self { Self { @@ -105,14 +112,33 @@ where &self.name } - pub fn start_with_name(name: String, backend: Arc) -> Addr { - let arbiter: Arbiter = Arbiter::new(); + pub fn start_with_name( + name: String, + backend: Arc, + ) -> ActorRef { + let queue = WorkQueue::::new(name, backend); + let actor_ref = kameo::spawn(queue); + + // Start the processing loop in a separate task + let actor_ref_clone = actor_ref.clone(); + tokio::spawn(async move { + Self::processing_loop(actor_ref_clone).await; + }); - ::start_in_arbiter(&arbiter.handle(), |ctx| { - let mut q = WorkQueue::::new(name, backend); - q.process_jobs(ctx); - q - }) + actor_ref + } + + /// Background processing loop that sends ProcessTick messages periodically + async fn processing_loop(actor_ref: ActorRef) { + let mut interval = tokio::time::interval(DEFAULT_TICK_DURATION); + loop { + interval.tick().await; + // Send ProcessTick message to self + if actor_ref.tell(ProcessTick).await.is_err() { + // Actor stopped, exit the loop + break; + } + } } // ======================================================================== @@ -125,20 +151,20 @@ where if let Some(existing_job) = existing_job { if config.override_data && !existing_job.is_running() { - info!( + log::info!( "[WorkQueue] Update existing job with new job data: {}", job.id() ); save_job(self.backend.as_ref(), &self.name, job_id, &job)?; } else { - info!( + log::info!( "[WorkQueue] Job is running, skip update job data: {}", job.id() ); } if config.re_run && existing_job.is_done() { - info!("[WorkQueue] Re run job {}", existing_job.id()); + log::info!("[WorkQueue] Re run job {}", existing_job.id()); self.enqueue(job)?; } @@ -150,7 +176,7 @@ where pub fn enqueue(&self, mut job: Job) -> Result<(), Error> { let job_id = job.id().to_string(); - info!("[WorkQueue] New Job {}", job_id); + log::info!("[WorkQueue] New Job {}", job_id); // Update job status job.context.job_status = JobStatus::Queued; @@ -183,7 +209,7 @@ where pub fn re_enqueue(&self, mut job: Job) -> Result<(), Error> { let job_id = job.id().to_string(); - debug!("[WorkQueue] Re-run job {}", job_id); + log::debug!("[WorkQueue] Re-run job {}", job_id); // Remove from active queue self.backend.active_remove(&self.name, &job_id)?; @@ -213,19 +239,19 @@ where } pub fn mark_job_is_canceled(&self, job_id: &str) { - info!("Cancel job {}", job_id); + log::info!("Cancel job {}", job_id); // Remove from active and release lock if let Err(e) = self.backend.active_remove(&self.name, job_id) { - error!("[WorkQueue] Cannot remove from active {}: {:?}", job_id, e); + log::error!("[WorkQueue] Cannot remove from active {}: {:?}", job_id, e); } if let Err(e) = self.backend.lock_release(job_id, &self.worker_id) { - error!("[WorkQueue] Cannot release lock {}: {:?}", job_id, e); + log::error!("[WorkQueue] Cannot release lock {}: {:?}", job_id, e); } } pub fn mark_job_is_finished(&self, mut job: Job) -> Result<(), Error> { let job_id = job.id().to_string(); - info!("Finish job {}", job_id); + log::info!("Finish job {}", job_id); // Update job status job.context.job_status = JobStatus::Finished; @@ -242,7 +268,7 @@ where pub fn mark_job_is_failed(&self, mut job: Job) -> Result<(), Error> { let job_id = job.id().to_string(); - info!("Failed job {}", job_id); + log::info!("Failed job {}", job_id); // Update job status job.context.job_status = JobStatus::Failed; @@ -261,28 +287,24 @@ where // Job Processing // ======================================================================== - pub fn process_jobs(&mut self, ctx: &mut Context>) { + pub fn process_jobs(&self) { // First, move ready delayed jobs to waiting queue let now_ms = get_now_as_ms(); if let Err(e) = self.backend.delayed_move_ready(&self.name, now_ms) { - error!("[WorkQueue] Failed to move delayed jobs: {:?}", e); + log::error!("[WorkQueue] Failed to move delayed jobs: {:?}", e); } // Then pick and process jobs match self.pick_jobs_to_process() { Ok(jobs) => { for job in jobs { - self.execute_job_task(job, ctx); + self.execute_job_task(job); } } Err(err) => { - error!("[WorkQueue]: Cannot pick jobs to process {err:?}",); + log::error!("[WorkQueue]: Cannot pick jobs to process {err:?}"); } } - - ctx.run_later(self.config.process_tick_duration, |work_queue, ctx| { - work_queue.process_jobs(ctx); - }); } pub fn pick_jobs_to_process(&self) -> Result>, Error> { @@ -314,7 +336,7 @@ where ready_jobs.push(job); } else { // Job data not found, remove from active - warn!("[WorkQueue] Job data not found for {}, removing", job_id); + log::warn!("[WorkQueue] Job data not found for {}, removing", job_id); self.backend.active_remove(&self.name, &job_id)?; self.backend.lock_release(&job_id, &self.worker_id)?; } @@ -329,15 +351,14 @@ where Ok(ready_jobs) } - pub fn execute_job_task(&self, job: Job, ctx: &mut Context>) { + pub fn execute_job_task(&self, job: Job) { let this = self.clone(); - let task = async move { + tokio::spawn(async move { if let Err(err) = this.execute_job(job.clone()).await { - error!("[WorkQueue] Execute job {} fail: {:?}", job.id(), err); + log::error!("[WorkQueue] Execute job {} fail: {:?}", job.id(), err); let _ = this.mark_job_is_failed(job); } - }; - wrap_future::<_, Self>(task).spawn(ctx); + }); } pub async fn execute_job(&self, mut job: Job) -> Result<(), Error> { @@ -350,7 +371,7 @@ where let job_output = job.execute().await; let is_failed_output = job.data.is_failed_output(&job_output).await; - info!( + log::info!( "[WorkQueue] Execution complete. Job {} - Result: {job_output:?}", job.id() ); @@ -358,7 +379,7 @@ where // Check for retry if let Some(retry_context) = job.context.retry.as_mut() { if let Some(next_retry_at) = job.data.retry_at(retry_context, job_output).await { - info!("[WorkQueue] Retry this job. {}", job.id()); + log::info!("[WorkQueue] Retry this job. {}", job.id()); job.context.job_type = JobType::ScheduledAt(next_retry_at); return self.re_enqueue(job); } @@ -394,7 +415,7 @@ where crate::PluginCenter::change_status::(job_id.to_string(), JobStatus::Canceled); } else { - warn!("[WorkQueue] Cannot cancel {:?} job", job.context.job_status); + log::warn!("[WorkQueue] Cannot cancel {:?} job", job.context.job_status); } } @@ -414,15 +435,16 @@ where self.re_enqueue(job)?; return Ok(true); } else { - debug!( + log::debug!( "[WorkQueue] Cannot retry job {} in status {:?}", - job_id, job.context.job_status + job_id, + job.context.job_status ); return Ok(false); } } - debug!("[WorkQueue] Don't found job {} to retry", job_id); + log::debug!("[WorkQueue] Don't found job {} to retry", job_id); Ok(false) } @@ -436,66 +458,93 @@ where } // ============================================================================ -// Actix Message Handlers +// Kameo Message Handlers // ============================================================================ -#[derive(Message, Debug)] -#[rtype(result = "Result<(), Error>")] +// Message: ProcessTick (internal message for periodic processing) +pub struct ProcessTick; + +impl Message for WorkQueue +where + M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, +{ + type Reply = (); + + async fn handle( + &mut self, + _msg: ProcessTick, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.process_jobs(); + } +} + +// Message: Enqueue +#[derive(Debug)] pub struct Enqueue(pub Job, pub EnqueueConfig); -impl Handler> for WorkQueue +impl Message> for WorkQueue where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - Self: Actor>, { - type Result = Result<(), Error>; + type Reply = Result<(), Error>; - fn handle(&mut self, msg: Enqueue, _: &mut Self::Context) -> Self::Result { + async fn handle( + &mut self, + msg: Enqueue, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { self.run_with_config(msg.0, msg.1) } } pub async fn enqueue_job( - addr: Addr>, + actor_ref: ActorRef>, job: Job, config: EnqueueConfig, ) -> Result<(), Error> where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { - addr.send::>(Enqueue(job, config)).await? + actor_ref + .ask(Enqueue(job, config)) + .await + .map_err(|e| Error::from(e)) } -#[derive(Message, Debug)] -#[rtype(result = "Result<(), Error>")] +// Message: CancelJob +#[derive(Debug)] pub struct CancelJob { pub job_id: String, } -impl Handler for WorkQueue +impl Message for WorkQueue where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - Self: Actor>, { - type Result = Result<(), Error>; - - fn handle(&mut self, msg: CancelJob, _: &mut Self::Context) -> Self::Result { - let job_id = msg.job_id; - self.cancel_job(&job_id) + type Reply = Result<(), Error>; + + async fn handle( + &mut self, + msg: CancelJob, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { + self.cancel_job(&msg.job_id) } } -pub async fn cancel_job(addr: Addr>, job_id: String) -> Result<(), Error> +pub async fn cancel_job(actor_ref: ActorRef>, job_id: String) -> Result<(), Error> where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { - addr.send::(CancelJob { job_id }).await? + actor_ref + .ask(CancelJob { job_id }) + .await + .map_err(|e| Error::from(e)) } -#[derive(Message, Debug)] -#[rtype(result = "Option>")] +// Message: GetJob +#[derive(Debug)] pub struct GetJob where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, @@ -504,32 +553,34 @@ where _phantom: PhantomData, } -impl Handler> for WorkQueue +impl Message> for WorkQueue where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { - type Result = Option>; + type Reply = Option>; - fn handle(&mut self, msg: GetJob, _: &mut Self::Context) -> Self::Result { + async fn handle( + &mut self, + msg: GetJob, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { self.get_job(&msg.job_id).ok().flatten() } } -pub async fn get_job(addr: Addr>, job_id: &str) -> Option> +pub async fn get_job(actor_ref: ActorRef>, job_id: &str) -> Option> where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { let msg: GetJob = GetJob { job_id: job_id.to_string(), _phantom: PhantomData, }; - addr.send::>(msg).await.ok().flatten() + actor_ref.ask(msg).await.ok().flatten() } -#[derive(Message, Debug)] -#[rtype(result = "Result")] +// Message: RetryJob +#[derive(Debug)] pub struct RetryJob where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, @@ -538,57 +589,63 @@ where _phantom: PhantomData, } -impl Handler> for WorkQueue +impl Message> for WorkQueue where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { - type Result = Result; + type Reply = Result; - fn handle(&mut self, msg: RetryJob, _: &mut Self::Context) -> Self::Result { + async fn handle( + &mut self, + msg: RetryJob, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { self.retry_job(&msg.job_id) } } -pub async fn retry_job(addr: Addr>, job_id: &str) -> Result +pub async fn retry_job(actor_ref: ActorRef>, job_id: &str) -> Result where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { let msg: RetryJob = RetryJob { job_id: job_id.to_string(), _phantom: PhantomData, }; - addr.send::>(msg).await? + actor_ref.ask(msg).await.map_err(|e| Error::from(e)) } -#[derive(Message)] -#[rtype(result = "()")] +// Message: UpdateWorkQueue pub struct UpdateWorkQueue { pub config: WorkQueueConfig, } -impl Handler for WorkQueue +impl Message for WorkQueue where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { - type Result = (); + type Reply = (); - fn handle(&mut self, msg: UpdateWorkQueue, _: &mut Self::Context) -> Self::Result { + async fn handle( + &mut self, + msg: UpdateWorkQueue, + _ctx: Context<'_, Self, Self::Reply>, + ) -> Self::Reply { self.config = msg.config; } } pub async fn update_work_queue_config( - addr: Addr>, + actor_ref: ActorRef>, config: WorkQueueConfig, ) -> Result<(), Error> where M: Executable + Send + Sync + Clone + Serialize + DeserializeOwned + 'static, - WorkQueue: Actor>>, { let msg = UpdateWorkQueue { config }; - addr.send::(msg).await?; + actor_ref + .ask(msg) + .await + .map_err(|e| Error::ActorError(format!("{:?}", e)))?; Ok(()) } diff --git a/examples/normal/Cargo.toml b/examples/normal/Cargo.toml index 1916156..d5d1283 100644 --- a/examples/normal/Cargo.toml +++ b/examples/normal/Cargo.toml @@ -6,5 +6,5 @@ edition = "2021" [dependencies] aj = { version = "=0.7.2", path = "../../aj", features = ["redis"] } serde = { version = "1.0.64", features = ["derive"] } -actix-rt = "2.2" +tokio = { version = "1.23.1", features = ["full"] } chrono = { version = "0.4.34" } diff --git a/examples/normal/src/cancel_job.rs b/examples/normal/src/cancel_job.rs index a0b52cb..f59ad6f 100644 --- a/examples/normal/src/cancel_job.rs +++ b/examples/normal/src/cancel_job.rs @@ -1,4 +1,4 @@ -use aj::{export::core::actix_rt::time::sleep, BackgroundJob, AJ}; +use aj::{BackgroundJob, AJ}; use crate::default_print_job::Print; @@ -14,5 +14,5 @@ pub async fn run() { let success = AJ::cancel_job::(job_id).await.is_ok(); println!("Cancel: {success}"); - sleep(std::time::Duration::from_secs(1)).await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; } diff --git a/examples/normal/src/cron_job.rs b/examples/normal/src/cron_job.rs index 004ffaf..78fc60a 100644 --- a/examples/normal/src/cron_job.rs +++ b/examples/normal/src/cron_job.rs @@ -1,4 +1,3 @@ -use actix_rt::time::sleep; use aj::BackgroundJob; use crate::default_print_job::Print; @@ -7,5 +6,5 @@ pub async fn run() { // Cron let _ = Print { number: 3 }.job().cron("* * * * * * *").run().await; - sleep(std::time::Duration::from_secs(3)).await; + tokio::time::sleep(std::time::Duration::from_secs(3)).await; } diff --git a/examples/normal/src/macro_job.rs b/examples/normal/src/macro_job.rs index 17304b5..02ef59b 100644 --- a/examples/normal/src/macro_job.rs +++ b/examples/normal/src/macro_job.rs @@ -1,6 +1,5 @@ use std::time::Duration; -use aj::export::core::actix_rt::time::sleep; use aj::job; #[job] @@ -22,5 +21,5 @@ pub async fn run() { async_hello::just_run("AJ".into()); // Sleep 1 sec to view the result from job - sleep(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; } diff --git a/examples/normal/src/main.rs b/examples/normal/src/main.rs index 1461699..b070c60 100644 --- a/examples/normal/src/main.rs +++ b/examples/normal/src/main.rs @@ -7,7 +7,7 @@ pub mod retry_job; pub mod schedule_job; pub mod update_job; -use aj::{main, redis::Redis, AJ}; +use aj::{redis::Redis, AJ}; use plugin::SamplePlugin; #[allow(dead_code)] @@ -15,7 +15,7 @@ fn run_aj_redis_engine() { AJ::start(Redis::new("redis://localhost:6379")); } -#[main] +#[tokio::main] async fn main() { // Run AJ engine with In Memory AJ::quick_start(); diff --git a/examples/normal/src/retry_job.rs b/examples/normal/src/retry_job.rs index 004cb7e..1b55d5b 100644 --- a/examples/normal/src/retry_job.rs +++ b/examples/normal/src/retry_job.rs @@ -1,4 +1,3 @@ -use actix_rt::time::sleep; use aj::{ job::{JobStatus, Retry}, BackgroundJob, AJ, @@ -15,7 +14,7 @@ pub async fn run() { )); let job_id = job.run().await.unwrap(); - sleep(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(5)).await; // Get Job let job = AJ::get_job::(&job_id).await.unwrap(); @@ -25,7 +24,7 @@ pub async fn run() { // Manual Retry println!("Manual Retry"); AJ::retry_job::(&job_id).await.unwrap(); - sleep(std::time::Duration::from_secs(1)).await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; // Exponential Retry 3 times -> Maximum do the job 4 times. let job = Print { number: 3 } @@ -36,5 +35,5 @@ pub async fn run() { )); let _ = job.run().await.unwrap(); // Run at 0s, Retry at after first job 1s, after second job 2s,.... 4s - sleep(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(5)).await; } diff --git a/examples/normal/src/schedule_job.rs b/examples/normal/src/schedule_job.rs index 154c542..1c3c9fd 100644 --- a/examples/normal/src/schedule_job.rs +++ b/examples/normal/src/schedule_job.rs @@ -1,4 +1,3 @@ -use actix_rt::time::sleep; use aj::BackgroundJob; use chrono::Duration; @@ -22,5 +21,5 @@ pub async fn run() { // Cron let _ = Print { number: 3 }.job().cron("* * * * * * *").run().await; - sleep(std::time::Duration::from_secs(5)).await; + tokio::time::sleep(std::time::Duration::from_secs(5)).await; } diff --git a/examples/normal/src/update_job.rs b/examples/normal/src/update_job.rs index ff65327..d538949 100644 --- a/examples/normal/src/update_job.rs +++ b/examples/normal/src/update_job.rs @@ -1,4 +1,3 @@ -use actix_rt::time::sleep; use aj::{BackgroundJob, AJ}; use crate::default_print_job::Print; @@ -17,5 +16,5 @@ pub async fn run() { .await .unwrap(); - sleep(std::time::Duration::from_secs(3)).await; + tokio::time::sleep(std::time::Duration::from_secs(3)).await; }