diff --git a/.github/workflows/book.yml b/.github/workflows/book.yml deleted file mode 100644 index 62c73c2..0000000 --- a/.github/workflows/book.yml +++ /dev/null @@ -1,29 +0,0 @@ -name: Github Pages - -on: - push: - branches: - - main - workflow_dispatch: - -jobs: - publish: - name: Publish Book - runs-on: ubuntu-latest - steps: - - name: Checkout sources - uses: actions/checkout@v4 - - - name: Install mdBook - run: | - wget 'https://github.com/rust-lang/mdBook/releases/download/v0.4.42/mdbook-v0.4.42-x86_64-unknown-linux-gnu.tar.gz' --output-document 'mdbook.tar.gz' - tar -zxvf mdbook.tar.gz - - - name: Build Book - run: ./mdbook build - - - name: Deploy pages - uses: peaceiris/actions-gh-pages@v3 - with: - github_token: ${{ secrets.GITHUB_TOKEN }} - publish_dir: ./book diff --git a/Cargo.toml b/Cargo.toml index ac14f5e..e12e8d6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,35 +17,12 @@ all-features = true [features] default = [] -redis = ["dep:redis"] -json = ["dep:serde", "dep:serde_json"] -tokio = ["dep:tokio"] [dependencies] -async-trait = "0.1" -redis = { version = "0.28", features=["connection-manager", "tokio-comp"], optional = true } -serde = { version = "1.0", features = ["serde_derive"], optional = true } -serde_json = { version = "1.0", optional = true } -tokio = { version = "1.43", features = ["time"], optional = true } +redis = { version = "0.28", features=["connection-manager", "tokio-comp"]} [dev-dependencies] -tokio = { version = "1.43", features = ["time", "macros", "rt-multi-thread"]} -redis = { version = "0.28", features=["connection-manager", "tokio-comp"] } serde = { version = "1.0", features = ["serde_derive"] } serde_json = { version = "1.0" } - -[[example]] -name = "memory" -required-features = ["tokio"] - -[[example]] -name = "redis" -required-features = ["redis", "tokio"] - -[[example]] -name = "redis_json" -required-features = ["redis", "json", "tokio"] - -[[example]] -name = "redis_json_autodelete" -required-features = ["redis", 
"json", "tokio"] +tokio = { version = "1.43", features = ["time", "macros", "rt-multi-thread"]} +tokio-util = { version = "0.7.13", features = ["full"] } diff --git a/Justfile b/Justfile new file mode 100644 index 0000000..18f1485 --- /dev/null +++ b/Justfile @@ -0,0 +1,11 @@ +all: fmt check test + +check: + cargo fmt --all -- --check + cargo clippy --all-features --all-targets -- -D warnings + +test: + cargo test --all-features + +fmt: + cargo fmt --all diff --git a/LICENSE b/LICENSE index 9db0d11..e0a25cf 100644 --- a/LICENSE +++ b/LICENSE @@ -1,4 +1,4 @@ -Copyright (c) 2022 Danil Akhtarov +Copyright (c) 2025 Danil Akhtarov Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated diff --git a/Makefile b/Makefile deleted file mode 100644 index be30857..0000000 --- a/Makefile +++ /dev/null @@ -1,25 +0,0 @@ -DEFAULT_GOAL := all - -.PHONY: all -all: fmt check test docs - -.PHONY: check -check: - cargo fmt --all -- --check - cargo clippy --all-features --all-targets -- -D warnings - -.PHONY: test -test: - cargo test --all-features - -.PHONY: fmt -fmt: - cargo fmt --all - -.PHONY: docs -docs: - mdbook build - -.PHONY: d2 -d2: - d2 diagram.d2 diagram.png diff --git a/README.md b/README.md index c4f6cc0..f3515c3 100644 --- a/README.md +++ b/README.md @@ -5,31 +5,24 @@ [![Docs.rs](https://docs.rs/taskline/badge.svg)](https://docs.rs/taskline) -The library allows for creating scheduled tasks via Redis for Rust. +Taskline is a Rust library for scheduling tasks via Redis. -```rust -producer.schedule(&"Hello!".to_string(), &(now() + 30000.)).await; +## Overview -loop { - let tasks = consumer.poll(&now()).await.unwrap(); +Taskline provides a simple way to schedule and process tasks asynchronously. It follows a producer-consumer model, where a producer schedules tasks to be executed at a specific time, and a consumer retrieves and processes them. 
- for task in tasks { - println!("Consumed {:?}", task); - } -} -``` - -That means the Consumed will be printed in 30 seconds. +## Use Cases -You can customize a format of an event for redis. Write your wrapper over [RedisBackend](src/backends/redis.rs). See [redis_json backend](src/backends/redis_json.rs). +Taskline is ideal for applications that require deferred execution, such as: -![diagram](diagram.png) +- Scheduling emails to be sent at a later time. +- Sending notifications to users at a specific moment. +- Any background job that needs time-based execution. ## Features - [x] Send/receive tasks in Redis - [x] Delayed tasks -- [x] Support json - [x] Deleting from a storage after handling - [ ] Support Redis Cluster - [ ] Metrics @@ -46,6 +39,22 @@ You can customize a format of an event for redis. Write your wrapper over [Redis cargo add taskline ``` +## Task Auto-Deletion + +### Default Behavior + +By default, Taskline automatically deletes tasks from storage after they are processed. This is the recommended approach for most use cases, as it ensures tasks are not executed multiple times. + +### Disabling Auto-Deletion + +If you prefer to manually manage task deletion, you can disable auto-delete by setting `autodelete=false`. However, this should only be used with a single consumer to avoid duplicate processing. If multiple consumers are involved, consider using a distributed lock mechanism like [redlock](https://redis.com/glossary/redlock/). For more details, see [Distributed Locks with Redis](https://redis.io/docs/manual/patterns/distributed-locks/). + +To manually remove a processed task, use: `Taskline::delete`. + +### Recommendation + +If your use case allows, it is recommended to keep `autodelete=true`, as it simplifies task management and reduces configuration overhead. However, be aware that in the event of an application crash, tasks may be lost before they are processed. 
+ ## License * [MIT LICENSE](LICENSE) diff --git a/book.toml b/book.toml deleted file mode 100644 index c9338f0..0000000 --- a/book.toml +++ /dev/null @@ -1,10 +0,0 @@ -[book] -authors = ["Danil Akhtarov"] -language = "en" -multilingual = false -src = "docs" -title = "Taskline" - -[output.html] -no-section-label = true -git-repository-url = "https://github.com/daxartio/taskline" diff --git a/diagram.d2 b/diagram.d2 deleted file mode 100644 index 8a72d8d..0000000 --- a/diagram.d2 +++ /dev/null @@ -1,48 +0,0 @@ -Scheduled Queue: { - Consumers: { - app: "" { - shape: image - icon: https://cdn4.iconfinder.com/data/icons/logos-brands-5/24/rust-1024.png - } - c1: consumer 1 - c2: consumer 2 - c3: consumer 3 - c: ... - } - - Producers: { - app: "" { - shape: image - icon: https://cdn4.iconfinder.com/data/icons/logos-brands-5/24/rust-1024.png - } - p1: producer 1 - p2: producer 2 - } - - Redis: { - shape: image - icon: https://cdn4.iconfinder.com/data/icons/redis-2/1451/Untitled-2-1024.png - } - - Producers.p1 -> Redis: task 1, task 2, task 3 - Producers.p2 -> Redis: task 4, task 5 - Redis -> Consumers.c1: "[task 1, task 2, task 3]" - Redis -> Consumers.c2: "[task 4]" - Redis -> Consumers.c3: "[task 5]" - Redis -> Consumers.c: "..." 
-} - -Interface: { - grid-columns: 1 - Producer: { - shape: class - - schedule(task String, score f64) - } - - Consumers: { - shape: class - - poll(score f64): Vec - } -} diff --git a/diagram.png b/diagram.png deleted file mode 100644 index b5c83e5..0000000 Binary files a/diagram.png and /dev/null differ diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md deleted file mode 100644 index ac2c97b..0000000 --- a/docs/SUMMARY.md +++ /dev/null @@ -1,14 +0,0 @@ -# Summary - -- [Introduction](./introduction.md) -- [Quick Start](./quick-start.md) -- [User Guide](./user-guide/README.md) - - [Formats](./user-guide/format.md) - - [Autodelete](./user-guide/autodelete.md) -- [Contributing](./contributing.md) -- [Changelog](./changelog.md) -- [Code Of Conduct](./code-of-conduct.md) - ---- - -[Additional Resources](additional-resources.md) diff --git a/docs/additional-resources.md b/docs/additional-resources.md deleted file mode 100644 index bb9d77e..0000000 --- a/docs/additional-resources.md +++ /dev/null @@ -1,14 +0,0 @@ -# Additional Resources - -## Quick links - -- [Source code](https://github.com/daxartio/taskline) -- [API documentation](https://docs.rs/taskline) -- [Crate](https://crates.io/crates/taskline) -- [License](https://github.com/daxartio/taskline/blob/main/LICENSE) -- [Contribution guide](https://github.com/daxartio/taskline/blob/main/CONTRIBUTING.md) -- [Changelog](https://github.com/daxartio/taskline/blob/main/CHANGELOG.md) - -## Alternatives - -- [Rusty Celery](https://rusty-celery.github.io/) diff --git a/docs/changelog.md b/docs/changelog.md deleted file mode 100644 index bdde652..0000000 --- a/docs/changelog.md +++ /dev/null @@ -1 +0,0 @@ -{{#include ../CHANGELOG.md}} diff --git a/docs/code-of-conduct.md b/docs/code-of-conduct.md deleted file mode 100644 index 613e855..0000000 --- a/docs/code-of-conduct.md +++ /dev/null @@ -1 +0,0 @@ -{{#include ../CODE_OF_CONDUCT.md}} diff --git a/docs/contributing.md b/docs/contributing.md deleted file mode 100644 index 
81ddb50..0000000 --- a/docs/contributing.md +++ /dev/null @@ -1 +0,0 @@ -{{#include ../CONTRIBUTING.md}} diff --git a/docs/introduction.md b/docs/introduction.md deleted file mode 100644 index b334073..0000000 --- a/docs/introduction.md +++ /dev/null @@ -1,18 +0,0 @@ -# Introduction - -The library allows to create scheduled tasks via Redis for Rust. - -## How does it work? - -Taskline revolves around the concept of a task. A task is a unit of work that is requested by a producer to be completed by a consumer / worker. - -A producer can schedule a task to be completed at a specific time in the future. A consumer can then fetch the task and complete it. - -There are backends for consumers and producers, which must impliment the `DequeuBackend` and `EnqueuBackend` traits. Right now, there is only one backend, which is Redis. - -## When should I use Taskline? - -Taskline is a good fit for applications that need to schedule work to be done in the future. For example, Taskline is a good fit for: - -- Scheduling emails to be sent in the future -- Scheduling a notification to be sent to a user in the future diff --git a/docs/quick-start.md b/docs/quick-start.md deleted file mode 100644 index 8589858..0000000 --- a/docs/quick-start.md +++ /dev/null @@ -1,33 +0,0 @@ -# Quick Start - -Taskline is provided as the a library on [crates.io](https://crates.io/crates/taskline). To get started, add taskline as a dependency to your project. - -``` -cargo add taskline -``` - -## Example - -The library provides an asynchronous code for interacting with Redis. You can use `tokio` or `async-std` as the runtime. - -First of all, you need to create a `RedisBackend` instance. You can do this by using the `RedisBackend::new` method or `RedisBackendConfig` struct. - -After that, you need to create a consumer and a producer. These are simple structs for more comfortable interaction with the library. You can create them using the `Consumer::new` and `Producer::new` methods. 
- -You can look at the example below. - -```rust,no_run,noplayground -{{#include ../examples/redis.rs}} -``` - -More examples can be found [here](https://github.com/daxartio/taskline/tree/main/examples). - -## Run the example - -```bash -git clone git@github.com:daxartio/taskline.git -# git clone https://github.com/daxartio/taskline.git -cd taskline -docker-compose up -d -cargo run --example redis -``` diff --git a/docs/user-guide/README.md b/docs/user-guide/README.md deleted file mode 100644 index 4af8e5e..0000000 --- a/docs/user-guide/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# User Guide - -That you need to know. diff --git a/docs/user-guide/autodelete.md b/docs/user-guide/autodelete.md deleted file mode 100644 index b7c6aa2..0000000 --- a/docs/user-guide/autodelete.md +++ /dev/null @@ -1,20 +0,0 @@ -# Autodelete - -## Deleting from a storage after handling - -If you want to delete a task from storage after handling, you can use `RedisJsonBackend` or `RedisBackend` with `autodelete=false` parameter. It's safe to use it only with one consumer. If you have more than one consumer, you can use distributed lock by redis. It's also named as [redlock](https://redis.com/glossary/redlock/). See [Distributed Locks with Redis](https://redis.io/docs/manual/patterns/distributed-locks/). - -Don't forget to delete a task explicitly from storage after handling. See `Committer::commit`. - -It's experimental implementation. In the future, it will be implemented more comfortable way. - -## Recommendation - -I recommend to use `autodelete=True`, if it fits to you. This way is simple to understanding and it do not require extra configurations. -But you need to know that your tasks will not be handling again if your application has an error. 
- -## Example - -```rust,no_run,noplayground -{{#include ../../examples/redis_json_autodelete.rs}} -``` diff --git a/docs/user-guide/format.md b/docs/user-guide/format.md deleted file mode 100644 index dcb1fe3..0000000 --- a/docs/user-guide/format.md +++ /dev/null @@ -1,16 +0,0 @@ -# Formats of tasks - -## A format of a task for sending and receiving via Redis - -Actually, Taskline uses a format of a backend. You can use any format which you want. - -There are two formats of a task for sending and receiving via Redis which are implemented in the library: - -- JSON -- String - -## Example - -```rust,no_run,noplayground -{{#include ../../examples/redis_json.rs}} -``` diff --git a/examples/autodelete.rs b/examples/autodelete.rs new file mode 100644 index 0000000..c7701d5 --- /dev/null +++ b/examples/autodelete.rs @@ -0,0 +1,70 @@ +use std::{ + sync::Arc, + time::{SystemTime, UNIX_EPOCH}, +}; + +use tokio::time::{sleep, Duration}; +use tokio_util::task::TaskTracker; + +use taskline::prelude::*; + +fn now() -> f64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as f64 +} + +#[tokio::main] +async fn main() { + let client = + redis::Client::open("redis://127.0.0.1/").expect("the redis client should be created"); + + let queue = Arc::new(Taskline::new( + TasklineConfig::<&str>::new() + .queue_key("taskline-queue-autodelete") + .autodelete(false) + .build(), + client, + )); + + queue + .write("Hello!".to_string(), now() + 3000.) + .await + .unwrap(); + + queue + .write("Hello again!".to_string(), now() + 5000.) + .await + .unwrap(); + + println!("the message 'Hello!' 
will be read in 3s and 5s"); + + loop { + let tasks: Vec = queue + .read(now()) + .await + .expect("reading from redis should be ok"); + + if tasks.is_empty() { + sleep(Duration::from_millis(500)).await; + continue; + } + + let tracker = TaskTracker::new(); + for task in tasks { + let queue = Arc::clone(&queue); + tracker.spawn(async move { + println!("Consumed '{}'", task); + + queue + .delete(&task) + .await + .expect("the message should be deleted"); + println!("'{}' deleted", task); + }); + } + tracker.close(); + tracker.wait().await; + } +} diff --git a/examples/json.rs b/examples/json.rs new file mode 100644 index 0000000..726a128 --- /dev/null +++ b/examples/json.rs @@ -0,0 +1,62 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use serde::{Deserialize, Serialize}; +use tokio::time::{sleep, Duration}; + +use taskline::prelude::*; + +fn now() -> f64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as f64 +} + +#[derive(Deserialize, Serialize, Debug, Clone)] +struct Data { + id: u64, + name: String, +} + +#[tokio::main] +async fn main() { + let client = + redis::Client::open("redis://127.0.0.1/").expect("the redis client should be created"); + + let queue = Taskline::new(TasklineConfig::::new().queue_key(123).build(), client); + + queue + .write( + serde_json::to_string(&Data { + id: 1, + name: "Task".to_string(), + }) + .ok(), + now() + 3000., + ) + .await + .unwrap(); + + println!("the data '1:Task!' 
will be read in 3s"); + + loop { + let tasks: Vec = queue + .read(now()) + .await + .expect("reading from redis should be ok"); + + if tasks.is_empty() { + sleep(Duration::from_millis(500)).await; + continue; + } + + for task in tasks { + tokio::task::spawn(async move { + println!( + "Consumed '{:?}'", + serde_json::from_str::(&task).expect("the message should be parsed") + ); + }); + } + } +} diff --git a/examples/memory.rs b/examples/memory.rs deleted file mode 100644 index 5cf4aec..0000000 --- a/examples/memory.rs +++ /dev/null @@ -1,33 +0,0 @@ -use taskline::prelude::*; - -#[derive(Debug, PartialEq, Clone)] -struct Task { - name: String, -} - -#[tokio::main] -async fn main() { - let backend = MemoryBackend::new(); - let producer = Producer::new(backend.clone()); - let consumer = Consumer::new(backend.clone()); - let committer = Committer::new(backend.clone()); - - producer - .schedule( - &Task { - name: "task".to_string(), - }, - &now(), - ) - .await - .unwrap(); - - poll_tasks(100, consumer, |tasks| async { - for task in tasks.unwrap() { - println!("Consumed {:?}", task); - committer.commit(&task).await.unwrap(); - } - true - }) - .await; -} diff --git a/examples/redis.rs b/examples/redis.rs deleted file mode 100644 index ebc1717..0000000 --- a/examples/redis.rs +++ /dev/null @@ -1,45 +0,0 @@ -extern crate redis; -use tokio::time::{sleep, Duration}; - -use taskline::prelude::*; - -#[tokio::main] -async fn main() { - let queue = RedisBackendConfig { - queue_key: "taskline", - read_batch_size: 10, - autodelete: true, - } - .with_client(redis::Client::open("redis://127.0.0.1/").unwrap()); - - if !queue.is_redis_version_ok().await.unwrap() { - return; - } - - queue - .write(&"Hello!".to_string(), &(now() + 1000.)) - .await - .unwrap(); - - loop { - let tasks = queue.read(&now()).await; - match tasks { - Ok(tasks) => { - if tasks.is_empty() { - sleep(Duration::from_millis(100)).await; - continue; - } - for task in tasks { - tokio::task::spawn(async move { - 
println!("Consumed '{}'", task); - }); - } - } - Err(e) => { - sleep(Duration::from_millis(1000)).await; - println!("Error: {:?}", e); - continue; - } - } - } -} diff --git a/examples/redis_json.rs b/examples/redis_json.rs deleted file mode 100644 index abdb753..0000000 --- a/examples/redis_json.rs +++ /dev/null @@ -1,48 +0,0 @@ -extern crate redis; -use serde::{Deserialize, Serialize}; - -use taskline::prelude::*; - -#[derive(Deserialize, Serialize, Debug, Clone)] -struct Data { - id: u64, - name: String, -} - -#[tokio::main] -async fn main() { - let queue_key = String::from("taskline"); - let backend = JsonRedisBackend::::new(RedisBackend::new( - redis::Client::open("redis://127.0.0.1/").unwrap(), - queue_key, - 10, - true, - )); - let producer = Producer::new(backend.clone()); - let consumer = Consumer::new(backend.clone()); - - if !backend.is_redis_version_ok().await.unwrap() { - return; - } - - producer - .schedule( - &Data { - id: 1, - name: "Task".to_string(), - }, - &(now() + 1000.), - ) - .await - .unwrap(); - - poll_tasks(100, consumer, |tasks| async { - for task in tasks.unwrap() { - tokio::task::spawn(async move { - println!("Consumed {:?}", task.unwrap()); - }); - } - true - }) - .await; -} diff --git a/examples/redis_json_autodelete.rs b/examples/redis_json_autodelete.rs deleted file mode 100644 index 55e64e6..0000000 --- a/examples/redis_json_autodelete.rs +++ /dev/null @@ -1,44 +0,0 @@ -extern crate redis; -use serde::{Deserialize, Serialize}; - -use taskline::prelude::*; - -#[derive(Deserialize, Serialize, Debug, Clone)] -struct Data { - id: u64, - name: String, -} - -#[tokio::main] -async fn main() { - let backend = JsonRedisBackend::::new(RedisBackend::new( - redis::Client::open("redis://127.0.0.1/").unwrap(), - String::from("taskline"), - 10, - false, - )); - let producer = Producer::new(backend.clone()); - let consumer = Consumer::new(backend.clone()); - let committer = Committer::new(backend.clone()); - - producer - .schedule( - &Data { - id: 
1, - name: "Task".to_string(), - }, - &(now() + 1000.), - ) - .await - .unwrap(); - - poll_tasks(100, consumer, |tasks| async { - for task in tasks.unwrap() { - let task = task.unwrap(); - println!("Consumed {:?}", task); - committer.commit(&task).await.unwrap(); - } - true - }) - .await; -} diff --git a/examples/simple.rs b/examples/simple.rs new file mode 100644 index 0000000..d583a6f --- /dev/null +++ b/examples/simple.rs @@ -0,0 +1,50 @@ +use std::time::{SystemTime, UNIX_EPOCH}; + +use tokio::time::{sleep, Duration}; + +use taskline::prelude::*; + +fn now() -> f64 { + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis() as f64 +} + +#[tokio::main] +async fn main() { + let client = + redis::Client::open("redis://127.0.0.1/").expect("the redis client should be created"); + + let queue = Taskline::new( + TasklineConfig::<&str>::new() + .queue_key("taskline-queue") + .build(), + client, + ); + + queue + .write("Hello!".to_string(), now() + 3000.) + .await + .unwrap(); + + println!("the message 'Hello!' will be read in 3s"); + + loop { + let tasks: Vec = queue + .read(now()) + .await + .expect("reading from redis should be ok"); + + if tasks.is_empty() { + sleep(Duration::from_millis(500)).await; + continue; + } + + for task in tasks { + tokio::task::spawn(async move { + println!("Consumed '{}'", task); + }); + } + } +} diff --git a/src/backend.rs b/src/backend.rs deleted file mode 100644 index cb7cc0c..0000000 --- a/src/backend.rs +++ /dev/null @@ -1,32 +0,0 @@ -//! Backend traits for queue. -//! You can implement your own backend by implementing this traits. -use async_trait::async_trait; - -/// The trait for backend implementations which can be used to dequeue tasks. -/// -/// R - type of task. -/// S - type of score used to sort tasks in queue. -/// E - type of error. 
-#[async_trait] -pub trait DequeuBackend { - async fn dequeue(&self, score: &S) -> Result, E>; -} - -/// The trait for backend implementations which can be used to enqueue tasks. -/// -/// R - type of task. -/// S - type of score used to sort tasks in queue. -/// E - type of error. -#[async_trait] -pub trait EnqueuBackend { - async fn enqueue(&self, task: &R, score: &S) -> Result<(), E>; -} - -/// The trait for backend implementations which can be used to commit tasks. -/// -/// R - type of task. -/// E - type of error. -#[async_trait] -pub trait CommitBackend { - async fn commit(&self, task: &R) -> Result<(), E>; -} diff --git a/src/backends/memory.rs b/src/backends/memory.rs deleted file mode 100644 index 6168c22..0000000 --- a/src/backends/memory.rs +++ /dev/null @@ -1,132 +0,0 @@ -use std::cell::RefCell; -use std::sync::{Arc, Mutex}; - -use async_trait::async_trait; - -use crate::backend::{CommitBackend, DequeuBackend, EnqueuBackend}; - -type Queue = Vec>; - -#[derive(Clone)] -struct Item -where - T: Clone + PartialEq, - S: Copy + PartialOrd, -{ - score: S, - val: T, -} - -/// Memory backend. O(n). This backend is not recommended for production use. -/// -/// New in version 0.8.0. 
-#[derive(Clone, Default)] -pub struct MemoryBackend -where - T: Clone + PartialEq, - S: Copy + PartialOrd, -{ - queue: Arc>>>, -} - -impl MemoryBackend -where - T: Clone + PartialEq, - S: Copy + PartialOrd, -{ - pub fn new() -> MemoryBackend { - MemoryBackend { - queue: Arc::new(Mutex::new(RefCell::new(Vec::new()))), - } - } - - pub fn with_capacity(capacity: usize) -> MemoryBackend { - MemoryBackend { - queue: Arc::new(Mutex::new(RefCell::new(Vec::with_capacity(capacity)))), - } - } - - pub fn read(&self, score: &S) -> Vec { - self.queue - .lock() - .unwrap() - .borrow() - .iter() - .filter(|v| v.score <= *score) - .map(|v| v.val.clone()) - .collect::>() - } - - pub fn write(&self, task: &T, score: &S) { - let queue = self.queue.lock().unwrap(); - let mut queue = queue.borrow_mut(); - queue.retain(|v| v.val != *task); - queue.push(Item { - score: *score, - val: task.clone(), - }); - } - - pub fn delete(&self, task: &T) { - self.queue - .lock() - .unwrap() - .borrow_mut() - .retain(|v| v.val != *task); - } -} - -#[async_trait] -impl DequeuBackend for MemoryBackend -where - T: Clone + PartialEq + Send, - S: Copy + PartialOrd + Sync + Send, -{ - async fn dequeue(&self, score: &S) -> Result, ()> { - Ok(self.read(score)) - } -} - -#[async_trait] -impl EnqueuBackend for MemoryBackend -where - T: Clone + PartialEq + Sync + Send, - S: Copy + PartialOrd + Sync + Send, -{ - async fn enqueue(&self, task: &T, score: &S) -> Result<(), ()> { - self.write(task, score); - Ok(()) - } -} - -#[async_trait] -impl CommitBackend for MemoryBackend -where - T: Clone + PartialEq + Sync + Send, - S: Copy + PartialOrd + Sync + Send, -{ - async fn commit(&self, task: &T) -> Result<(), ()> { - self.delete(task); - Ok(()) - } -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_consumer() { - let backend = MemoryBackend::new(); - - backend.enqueue(&"task".to_string(), &1.).await.unwrap(); - backend.enqueue(&"task".to_string(), &1.).await.unwrap(); - - let tasks = 
backend.dequeue(&1.).await.unwrap(); - for task in tasks { - assert_eq!(task, "task".to_string()); - backend.commit(&task).await.unwrap(); - } - assert!(backend.queue.lock().unwrap().borrow().is_empty()); - } -} diff --git a/src/backends/mod.rs b/src/backends/mod.rs deleted file mode 100644 index e047070..0000000 --- a/src/backends/mod.rs +++ /dev/null @@ -1,8 +0,0 @@ -//! Backends which can be used to store data. -//! The backends must implement `DequeuBackend` and `EnqueuBackend` traits. -//! They can be used in Consumer and Producer. -pub mod memory; -#[cfg(feature = "redis")] -pub mod redis; -#[cfg(all(feature = "redis", feature = "json"))] -pub mod redis_json; diff --git a/src/backends/redis.rs b/src/backends/redis.rs deleted file mode 100644 index df93ca3..0000000 --- a/src/backends/redis.rs +++ /dev/null @@ -1,196 +0,0 @@ -extern crate redis; -use async_trait::async_trait; -use redis::{AsyncCommands, RedisError}; -use std::ops; - -use crate::backend::{CommitBackend, DequeuBackend, EnqueuBackend}; - -/// Configuration for Redis backend. -/// You can use it to create `RedisBackend` instance. -pub struct RedisBackendConfig { - /// Redis key is used to store tasks. - pub queue_key: S, - /// Number of tasks to read in one batch. - pub read_batch_size: usize, - /// If `true`, tasks will be deleted from queue after reading. - /// If autodelete is `false`, tasks should be deleted explicitly from queue after reading with `RedisBackend::delete`. - /// - /// New in version 0.5.0. - pub autodelete: bool, -} - -impl RedisBackendConfig { - /// Create `RedisBackend` instance. - /// It requires `redis::Client` instance. - /// - /// New in version 0.5.0. - pub fn with_client(self, client: redis::Client) -> RedisBackend { - RedisBackend::new( - client, - self.queue_key.to_string(), - self.read_batch_size, - self.autodelete, - ) - } -} - -impl ops::Add for RedisBackendConfig { - type Output = RedisBackend; - - /// Create `RedisBackend` instance. 
- /// It requires `redis::Client` instance. - /// Alias for `RedisBackendConfig::with_client`. - fn add(self, client: redis::Client) -> RedisBackend { - self.with_client(client) - } -} - -/// Redis backend. -/// It implements both `DequeuBackend` and `EnqueuBackend` traits. -/// You can use score to sort tasks in queue. Usually it is unix timestamp. -#[derive(Clone)] -pub struct RedisBackend { - client: redis::Client, - queue_key: String, - pop_schedule_script: redis::Script, - read_batch_size: usize, - pub(crate) autodelete: bool, -} - -impl RedisBackend { - /// Create new instance of `RedisBackend`. - /// - /// It requires `redis::Client` instance, redis key used to store tasks and number of tasks to read in one batch. - /// It also creates lua script used to pop tasks from redis. - /// * `client` - redis client. - /// * `queue_key` - redis key is used to store tasks. - /// * `read_batch_size` - number of tasks to read in one batch. - /// * `autodelete` - if `true`, tasks will be deleted from queue after reading. If `false`, tasks should be deleted explicitly from queue after reading with `RedisBackend::delete`. New in version 0.5.0. - pub fn new( - client: redis::Client, - queue_key: String, - read_batch_size: usize, - autodelete: bool, - ) -> Self { - Self { - client, - queue_key, - pop_schedule_script: redis::Script::new( - r" - local key = KEYS[1] - local unix_ts = ARGV[1] - local limit = ARGV[2] - local autodelete = ARGV[3] == '1' - local res = redis.call('zrange', key, '-inf', unix_ts, 'byscore', 'limit', 0, limit) - if autodelete then - for _, raw in ipairs(res) do - redis.call('zrem', key, raw) - end - end - return res", - ), - read_batch_size, - autodelete, - } - } - - /// Calls lua script to pop tasks from redis. - /// If there are no tasks in queue it returns empty vector. - /// If there are no tasks with score less than `score`, returns empty vector. 
- pub async fn read(&self, score: &f64) -> Result, RedisError> { - let mut con = match self.client.get_multiplexed_async_connection().await { - Ok(con) => con, - Err(e) => return Err(e), - }; - - let result: Vec = match self - .pop_schedule_script - .key(self.queue_key.as_str()) - .arg(score) - .arg(self.read_batch_size) - .arg(self.autodelete as u8) - .invoke_async(&mut con) - .await - { - Ok(result) => result, - Err(e) => return Err(e), - }; - - Ok(result) - } - - /// Adds a task to redis. - /// It uses score to sort tasks in queue. Usually it is unix timestamp. - pub async fn write(&self, task: &String, score: &f64) -> Result<(), RedisError> { - let mut con = match self.client.get_multiplexed_async_connection().await { - Ok(con) => con, - Err(e) => return Err(e), - }; - con.zadd(self.queue_key.as_str(), task, score).await - } - - /// Delete a task from queue. - /// - /// New in version 0.5.0. - pub async fn delete(&self, task: &String) -> Result<(), RedisError> { - if self.autodelete { - return Ok(()); - } - let mut con = match self.client.get_multiplexed_async_connection().await { - Ok(con) => con, - Err(e) => return Err(e), - }; - - con.zrem(self.queue_key.as_str(), task).await - } - - /// Check redis version. - /// - /// New in version 0.6.0. - pub async fn is_redis_version_ok(&self) -> Result { - let mut con = self.client.get_multiplexed_async_connection().await?; - let res: String = redis::cmd("INFO").query_async(&mut con).await?; - let mut ver = res - .lines() - .find(|s| s.contains("redis_version:")) - .unwrap() - .split(':') - .last() - .unwrap() - .split('.') - .take(2); - - let major: u8 = ver.next().unwrap().parse().unwrap(); - let minor: u8 = ver.next().unwrap().parse().unwrap(); - Ok((major, minor) >= (6, 2)) - } -} - -#[async_trait] -impl CommitBackend for RedisBackend { - /// Delete a task from queue. - /// - /// New in version 0.5.1. 
- async fn commit(&self, task: &String) -> Result<(), RedisError> { - self.delete(task).await - } -} - -#[async_trait] -impl DequeuBackend for RedisBackend { - /// Calls lua script to pop tasks from redis. - /// If there are no tasks in queue it returns empty vector. - /// If there are no tasks with score less than `score`, returns empty vector. - async fn dequeue(&self, score: &f64) -> Result, RedisError> { - self.read(score).await - } -} - -#[async_trait] -impl EnqueuBackend for RedisBackend { - /// Adds a task to redis. - /// It uses score to sort tasks in queue. Usually it is unix timestamp. - async fn enqueue(&self, task: &String, score: &f64) -> Result<(), RedisError> { - self.write(task, score).await - } -} diff --git a/src/backends/redis_json.rs b/src/backends/redis_json.rs deleted file mode 100644 index d738d18..0000000 --- a/src/backends/redis_json.rs +++ /dev/null @@ -1,121 +0,0 @@ -extern crate redis; -use async_trait::async_trait; -use serde::{de::DeserializeOwned, Serialize}; - -use crate::backend::{CommitBackend, DequeuBackend, EnqueuBackend}; -use crate::backends::redis::RedisBackend; - -/// Error type for `JsonRedisBackend`. -/// It wraps `redis::RedisError` and `serde_json::Error`. -#[derive(Debug)] -pub enum JsonRedisError { - Redis(redis::RedisError), - Serde(serde_json::Error), -} - -/// Redis backend with JSON serialization. -/// It implements both `DequeuBackend` and `EnqueuBackend` traits. -/// It requires `serde::Serialize` and `serde::DeserializeOwned` traits for task type. -/// It overrides `RedisBackend` with `serde_json::to_string` and `serde_json::from_str` calls. -/// The logic of `RedisBackend` is not changed. -#[derive(Clone)] -pub struct JsonRedisBackend { - backend: RedisBackend, - _phantom: std::marker::PhantomData, -} - -impl JsonRedisBackend { - pub fn new(backend: RedisBackend) -> Self { - Self { - backend, - _phantom: std::marker::PhantomData, - } - } - - /// Check redis version. - /// - /// New in version 0.6.0. 
- pub async fn is_redis_version_ok(&self) -> Result { - self.backend.is_redis_version_ok().await - } -} - -impl JsonRedisBackend -where - T: Serialize + Send + Sync, -{ - /// Serializes data to JSON and calls `RedisBackend::write`. - pub async fn write(&self, data: &T, score: &f64) -> Result<(), JsonRedisError> { - let data = match serde_json::to_string(&data) { - Ok(data) => data, - Err(e) => return Err(JsonRedisError::Serde(e)), - }; - match self.backend.write(&data, score).await { - Ok(_) => Ok(()), - Err(e) => Err(JsonRedisError::Redis(e)), - } - } - - /// Delete task from queue. - /// - /// New in version 0.5.0. - pub async fn delete(&self, data: &T) -> Result<(), JsonRedisError> { - if self.backend.autodelete { - return Ok(()); - } - let data = match serde_json::to_string(&data) { - Ok(data) => data, - Err(e) => return Err(JsonRedisError::Serde(e)), - }; - match self.backend.commit(&data).await { - Ok(_) => Ok(()), - Err(e) => Err(JsonRedisError::Redis(e)), - } - } -} - -impl JsonRedisBackend -where - T: DeserializeOwned + Send + Sync, -{ - /// Returns `Vec>` because it is possible that some tasks will be corrupted. - pub async fn read(&self, score: &f64) -> Result>, redis::RedisError> { - let data = self.backend.read(score).await?; - Ok(data.into_iter().map(|d| serde_json::from_str(&d)).collect()) - } -} - -#[async_trait] -impl CommitBackend for JsonRedisBackend -where - T: Serialize + Send + Sync, -{ - /// Delete task from queue. - /// - /// New in version 0.5.1. - async fn commit(&self, data: &T) -> Result<(), JsonRedisError> { - self.delete(data).await - } -} - -#[async_trait] -impl EnqueuBackend for JsonRedisBackend -where - T: Serialize + Send + Sync, -{ - /// Serializes data to JSON and calls `RedisBackend::write`. 
- async fn enqueue(&self, data: &T, score: &f64) -> Result<(), JsonRedisError> { - self.write(data, score).await - } -} - -#[async_trait] -impl DequeuBackend, f64, redis::RedisError> for JsonRedisBackend -where - T: DeserializeOwned + Send + Sync, -{ - /// Returns `Vec>` because it is possible that some tasks will be corrupted. - async fn dequeue(&self, score: &f64) -> Result>, redis::RedisError> { - self.read(score).await - } -} diff --git a/src/committer.rs b/src/committer.rs deleted file mode 100644 index dc81dfa..0000000 --- a/src/committer.rs +++ /dev/null @@ -1,35 +0,0 @@ -//! Committer of tasks. -//! Main struct for committing tasks from queue. -use std::marker::PhantomData; - -use crate::backend::CommitBackend; - -/// Abstract committer of tasks. -/// It is generic over the backend used to commit tasks. -/// R, S, E - types of request, score and error. -pub struct Committer -where - T: CommitBackend, -{ - backend: T, - _phantom_request: PhantomData, - _phantom_error: PhantomData, -} - -impl Committer -where - T: CommitBackend, -{ - /// Creates new committer. - pub fn new(backend: T) -> Committer { - Committer { - backend, - _phantom_request: PhantomData, - _phantom_error: PhantomData, - } - } - - pub async fn commit(&self, task: &R) -> Result<(), E> { - self.backend.commit(task).await - } -} diff --git a/src/consumer.rs b/src/consumer.rs deleted file mode 100644 index bb62dc8..0000000 --- a/src/consumer.rs +++ /dev/null @@ -1,41 +0,0 @@ -//! Consumer of tasks. -//! Main struct for consuming tasks from queue. -use std::marker::PhantomData; - -use crate::backend::DequeuBackend; - -/// Abstract consumer of tasks. -/// It is generic over the backend used to dequeue tasks. -/// R, S, E - types of request, score and error. -pub struct Consumer -where - T: DequeuBackend, -{ - backend: T, - _phantom_request: PhantomData, - _phantom_score: PhantomData, - _phantom_error: PhantomData, -} - -impl Consumer -where - T: DequeuBackend, -{ - /// Creates new consumer. 
- pub fn new(backend: T) -> Consumer { - Consumer { - backend, - _phantom_request: PhantomData, - _phantom_score: PhantomData, - _phantom_error: PhantomData, - } - } - - /// Polls tasks from queue. - /// Returns vector of tasks. - /// If there are no tasks in queue, returns empty vector. - /// If there are no tasks with score less than `score`, returns empty vector. - pub async fn poll(&self, score: &S) -> Result, E> { - self.backend.dequeue(score).await - } -} diff --git a/src/lib.rs b/src/lib.rs index 996319b..11ff5a6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,3 @@ //! The library allows to create scheduled tasks via Redis for Rust. -pub mod backend; -pub mod backends; -pub mod committer; -pub mod consumer; pub mod prelude; -pub mod producer; -#[cfg(feature = "tokio")] -pub mod utils; +pub mod taskline; diff --git a/src/prelude.rs b/src/prelude.rs index 5432b20..f610666 100644 --- a/src/prelude.rs +++ b/src/prelude.rs @@ -1,12 +1 @@ -//! Types and traits that are commonly used in the library. -pub use crate::backend::*; -pub use crate::backends::memory::MemoryBackend; -#[cfg(feature = "redis")] -pub use crate::backends::redis::{RedisBackend, RedisBackendConfig}; -#[cfg(all(feature = "redis", feature = "json"))] -pub use crate::backends::redis_json::{JsonRedisBackend, JsonRedisError}; -pub use crate::committer::*; -pub use crate::consumer::*; -pub use crate::producer::*; -#[cfg(feature = "tokio")] -pub use crate::utils::*; +pub use crate::taskline::{Taskline, TasklineConfig}; diff --git a/src/producer.rs b/src/producer.rs deleted file mode 100644 index 27d9704..0000000 --- a/src/producer.rs +++ /dev/null @@ -1,39 +0,0 @@ -//! Producer of tasks. -//! Main struct for producing tasks to queue. -use std::marker::PhantomData; - -use crate::backend::EnqueuBackend; - -/// Abstract producer of tasks. -/// It is generic over the backend used to enqueue tasks. -/// R, S, E - types of request, score and error. 
-pub struct Producer -where - T: EnqueuBackend, -{ - backend: T, - _phantom_request: PhantomData, - _phantom_score: PhantomData, - _phantom_error: PhantomData, -} - -impl Producer -where - T: EnqueuBackend, -{ - /// Creates new producer. - pub fn new(backend: T) -> Producer { - Producer { - backend, - _phantom_request: PhantomData, - _phantom_score: PhantomData, - _phantom_error: PhantomData, - } - } - - /// Schedules a task to queue. - /// Returns `Ok(())` if task was successfully scheduled. - pub async fn schedule(&self, task: &R, score: &S) -> Result<(), E> { - self.backend.enqueue(task, score).await - } -} diff --git a/src/taskline.rs b/src/taskline.rs new file mode 100644 index 0000000..4608a54 --- /dev/null +++ b/src/taskline.rs @@ -0,0 +1,129 @@ +use redis::{AsyncCommands, FromRedisValue, RedisError, ToRedisArgs}; + +static POP_SCHEDULE_SCRIPT: &str = r#" +local key = KEYS[1] +local unix_ts = ARGV[1] +local limit = ARGV[2] +local autodelete = ARGV[3] == '1' +local res = redis.call('zrange', key, '-inf', unix_ts, 'byscore', 'limit', 0, limit) +if autodelete then + for _, raw in ipairs(res) do + redis.call('zrem', key, raw) + end +end +return res +"#; + +/// Configuration for Taskline. +#[derive(Debug, Clone)] +pub struct TasklineConfig { + queue_key: K, + read_batch_size: usize, + autodelete: bool, +} + +impl Default for TasklineConfig { + fn default() -> Self { + Self { + queue_key: K::default(), + read_batch_size: 50, + autodelete: true, + } + } +} + +impl TasklineConfig { + pub fn new() -> Self { + Self::default() + } + + /// Redis key is used to store tasks. + pub fn queue_key(mut self, queue_key: impl Into) -> Self { + self.queue_key = queue_key.into(); + self + } + + /// Number of tasks to read in one batch. + pub fn read_batch_size(mut self, read_batch_size: usize) -> Self { + self.read_batch_size = read_batch_size; + self + } + + /// If `true`, tasks will be deleted from queue after reading. 
+ /// If autodelete is `false`, tasks should be deleted explicitly from queue after reading with `Taskline::delete`. + pub fn autodelete(mut self, autodelete: bool) -> Self { + self.autodelete = autodelete; + self + } + + pub fn build(self) -> Self { + Self { + queue_key: self.queue_key, + read_batch_size: self.read_batch_size, + autodelete: self.autodelete, + } + } +} + +/// # Taskline. +/// You can use score to sort tasks in queue. Usually it is unix timestamp. +#[derive(Debug, Clone)] +pub struct Taskline { + config: TasklineConfig, + client: redis::Client, + pop_schedule_script: redis::Script, +} + +impl Taskline { + /// Creates new instance of `Taskline`. + /// + /// It requires `redis::Client` instance, redis key used to store tasks and number of tasks to read in one batch. + pub fn new(config: TasklineConfig, client: redis::Client) -> Self { + Self { + config, + client, + pop_schedule_script: redis::Script::new(POP_SCHEDULE_SCRIPT), + } + } + + /// Calls lua script to pop tasks from redis. + /// If there are no tasks in queue it returns empty vector. + /// If there are no tasks with score less than `score`, returns empty vector. + pub async fn read(&self, score: S) -> Result { + let mut con = self.client.get_multiplexed_async_connection().await?; + + let result: R = self + .pop_schedule_script + .key(self.config.queue_key.clone()) + .arg(score) + .arg(self.config.read_batch_size) + .arg(self.config.autodelete as u8) + .invoke_async(&mut con) + .await?; + + Ok(result) + } +} + +impl Taskline { + /// Adds a task to redis. + /// It uses score to sort tasks in queue. Usually it is unix timestamp. + pub async fn write( + &self, + task: M, + score: S, + ) -> Result<(), RedisError> { + let mut con = self.client.get_multiplexed_async_connection().await?; + con.zadd(self.config.queue_key.clone(), task, score).await + } + + /// Delete a task from queue. 
+ pub async fn delete(&self, task: M) -> Result<(), RedisError> { + if self.config.autodelete { + return Ok(()); + } + let mut con = self.client.get_multiplexed_async_connection().await?; + + con.zrem(self.config.queue_key.clone(), task).await + } +} diff --git a/src/utils.rs b/src/utils.rs deleted file mode 100644 index 3054310..0000000 --- a/src/utils.rs +++ /dev/null @@ -1,52 +0,0 @@ -use std::{ - fmt, - future::Future, - time::{SystemTime, UNIX_EPOCH}, -}; -use tokio::time::{sleep, Duration}; - -use crate::prelude::{Consumer, DequeuBackend}; - -/// Returns current time. -pub fn now() -> f64 { - SystemTime::now() - .duration_since(UNIX_EPOCH) - .unwrap() - .as_millis() as f64 -} - -/// Polls tasks from a backend and calls the function in the loop. -pub async fn poll_tasks( - interval: u64, - consumer: Consumer, - callback: impl Fn(Result, E>) -> Fut, -) where - T: DequeuBackend, - Fut: Future, - E: fmt::Debug, -{ - loop { - let score = now(); - match consumer.poll(&score).await { - Ok(tasks) => { - if tasks.is_empty() { - if interval != 0 { - sleep(Duration::from_millis(interval)).await; - } - continue; - } - if !callback(Ok(tasks)).await { - break; - } - } - Err(err) => { - if !callback(Err(err)).await { - break; - } - if interval != 0 { - sleep(Duration::from_millis(interval)).await; - } - } - } - } -} diff --git a/tests/main.rs b/tests/main.rs index b39b6e8..8b13789 100644 --- a/tests/main.rs +++ b/tests/main.rs @@ -1,70 +1 @@ -use taskline::committer::Committer; -use taskline::consumer::Consumer; -use taskline::producer::Producer; -mod backend { - use std::cell::RefCell; - use std::sync::{Arc, Mutex}; - use std::vec; - - use async_trait::async_trait; - - use taskline::prelude::{CommitBackend, DequeuBackend, EnqueuBackend}; - - #[derive(Clone)] - pub(crate) struct MemBackend { - pub queue: Arc>>>, - } - - impl MemBackend { - pub(crate) fn new() -> MemBackend { - MemBackend { - queue: Arc::new(Mutex::new(RefCell::new(Vec::new()))), - } - } - } - - 
#[async_trait] - impl DequeuBackend for MemBackend { - async fn dequeue(&self, _score: &()) -> Result, ()> { - Ok(vec![*self.queue.lock().unwrap().borrow().first().unwrap()]) - } - } - - #[async_trait] - impl EnqueuBackend for MemBackend { - async fn enqueue(&self, task: &i32, _score: &()) -> Result<(), ()> { - self.queue.lock().unwrap().borrow_mut().push(*task); - Ok(()) - } - } - - #[async_trait] - impl CommitBackend for MemBackend { - async fn commit(&self, task: &i32) -> Result<(), ()> { - self.queue - .lock() - .unwrap() - .borrow_mut() - .retain(|x| *x != *task); - Ok(()) - } - } -} - -#[tokio::test] -async fn test_consumer() { - let backend = backend::MemBackend::new(); - let client = Producer::new(backend.clone()); - let consumer = Consumer::new(backend.clone()); - let committer = Committer::new(backend.clone()); - - client.schedule(&1, &()).await.unwrap(); - - let tasks = consumer.poll(&()).await.unwrap(); - for task in tasks { - assert_eq!(task, 1); - committer.commit(&task).await.unwrap(); - } - assert!(backend.queue.lock().unwrap().borrow().is_empty()); -}