From a469496ef516d01b41859a95c5c96f3285042603 Mon Sep 17 00:00:00 2001 From: Eduard Smet Date: Thu, 12 Mar 2026 00:59:04 +0100 Subject: [PATCH] WIP: feat: Plugin API rework --- Cargo.lock | 307 +++++++++++++- Cargo.toml | 2 + src/cli.rs | 3 + src/database.rs | 92 +++++ src/discord.rs | 167 ++++---- src/discord/events.rs | 138 ++++--- src/discord/interactions.rs | 69 ++-- src/discord/requests.rs | 181 ++++----- src/http/registry.rs | 6 +- src/job_scheduler.rs | 172 +++----- src/main.rs | 192 +++++---- src/plugins.rs | 53 +-- src/plugins/registry.rs | 49 ++- src/plugins/runtime.rs | 380 ++++-------------- src/plugins/runtime/internal.rs | 135 +------ src/plugins/runtime/internal/core.rs | 64 +++ src/plugins/runtime/internal/discord.rs | 38 ++ src/plugins/runtime/internal/job_scheduler.rs | 28 ++ src/utils/channels.rs | 120 ++++-- src/utils/env.rs | 34 +- wit/core.wit | 65 +++ wit/discord.wit | 131 ++++-- wit/host.wit | 42 -- wit/job-scheduler.wit | 36 ++ wit/plugin.wit | 64 --- wit/world.wit | 15 +- 26 files changed, 1393 insertions(+), 1190 deletions(-) create mode 100644 src/database.rs create mode 100644 src/plugins/runtime/internal/core.rs create mode 100644 src/plugins/runtime/internal/discord.rs create mode 100644 src/plugins/runtime/internal/job_scheduler.rs create mode 100644 wit/core.wit delete mode 100644 wit/host.wit create mode 100644 wit/job-scheduler.wit delete mode 100644 wit/plugin.wit diff --git a/Cargo.lock b/Cargo.lock index a941dac..db7a6a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -216,12 +216,24 @@ dependencies = [ "allocator-api2", ] +[[package]] +name = "byteorder-lite" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" + [[package]] name = "bytes" version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3" +[[package]] +name = "byteview" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1c53ba0f290bfc610084c05582d9c5d421662128fc69f4bf236707af6fd321b9" + [[package]] name = "cap-fs-ext" version = "3.4.5" @@ -427,6 +439,12 @@ dependencies = [ "memchr", ] +[[package]] +name = "compare" +version = "0.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7" + [[package]] name = "core-foundation" version = "0.9.4" @@ -663,6 +681,16 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "crossbeam-skiplist" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.21" @@ -823,6 +851,7 @@ dependencies = [ "chrono", "clap", "dotenvy", + "fjall", "indexmap", "reqwest", "rustls 0.23.36", @@ -841,6 +870,7 @@ dependencies = [ "twilight-http", "twilight-model", "url", + "uuid", "wasmtime", "wasmtime-wasi", "wasmtime-wasi-http", @@ -917,6 +947,18 @@ dependencies = [ "syn", ] +[[package]] +name = "enum_dispatch" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" +dependencies = [ + "once_cell", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -980,6 +1022,23 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "fjall" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a9530ff159bc3ad3a15da746da0f6e95375c2ac64708cbb85ec1ebd26761a84" +dependencies = [ + "byteorder-lite", + "byteview", + "dashmap", + "flume", + "log", + "lsm-tree", + "lz4_flex", + "tempfile", + "xxhash-rust", +] + [[package]] name = "float-cmp" version = "0.10.0" @@ -989,6 +1048,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "flume" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be" +dependencies = [ + "spin", +] + [[package]] name = "fnv" version = "1.0.7" @@ -1144,11 +1212,24 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "r-efi", + "r-efi 5.3.0", "wasip2", "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi 6.0.0", + "wasip2", + "wasip3", +] + [[package]] name = "gimli" version = "0.32.3" @@ -1554,6 +1635,15 @@ dependencies = [ "serde_core", ] +[[package]] +name = "interval-heap" +version = "0.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6" +dependencies = [ + "compare", +] + [[package]] name = "io-extras" version = "0.18.4" @@ -1760,6 +1850,37 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" +[[package]] +name = "lsm-tree" +version = "3.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d67f95fd716870329c30aaeedf87f23d426564e6ce46efa045a91444faf2a19" +dependencies = [ + "byteorder-lite", + "byteview", + "crossbeam-skiplist", + "enum_dispatch", + "interval-heap", + "log", + "lz4_flex", + "quick_cache", + "rustc-hash", + "self_cell", + "sfa", + "tempfile", + "varint-rs", + "xxhash-rust", +] + +[[package]] +name = "lz4_flex" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db9a0d582c2874f68138a16ce1867e0ffde6c0bb0a0df85e1f36d04146db488a" +dependencies = [ + "twox-hash", +] + [[package]] name = "mach2" version = "0.4.3" @@ -2039,6 +2160,16 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettyplease" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479ca8adacdd7ce8f1fb39ce9ecccbfe93a3f1344b3d0d97f20bc0196208f62b" +dependencies = [ + "proc-macro2", + "syn", +] + [[package]] name = "proc-macro2" version = "1.0.106" @@ -2091,6 +2222,16 @@ dependencies = [ "syn", ] +[[package]] +name = "quick_cache" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a70b1b8b47e31d0498ecbc3c5470bb931399a8bfed1fd79d1717a61ce7f96e3" +dependencies = [ + "equivalent", + "hashbrown 0.16.1", +] + [[package]] name = "quinn" version = "0.11.9" @@ -2162,6 +2303,12 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + [[package]] name = "rancor" version = "0.1.1" @@ -2641,6 +2788,12 @@ dependencies = [ "libc", ] +[[package]] +name = "self_cell" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89" + [[package]] name = "semver" version = "1.0.27" @@ -2750,6 +2903,17 @@ dependencies = [ "unsafe-libyaml", ] +[[package]] +name = "sfa" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175" +dependencies = [ + "byteorder-lite", + "log", + "xxhash-rust", +] + [[package]] name = "sha1_smol" version = "1.0.1" @@ -2902,6 +3066,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" +dependencies = [ + "lock_api", +] + [[package]] name = "stable_deref_trait" version = "1.2.1" @@ -3506,6 +3679,12 @@ dependencies = [ "twilight-model", ] +[[package]] +name = "twox-hash" +version = "2.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" + [[package]] name = "typenum" version = "1.19.0" @@ -3568,11 +3747,11 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.20.0" +version = "1.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ee48d38b119b0cd71fe4141b30f5ba9c7c5d9f4e7a3a8b4a674e4b6ef789976f" +checksum = "a68d3c8f01c0cfa54a75291d83601161799e4a89a39e0929f4b0354d88757a37" dependencies = [ - "getrandom 0.3.4", + "getrandom 0.4.2", "js-sys", "wasm-bindgen", ] @@ -3595,6 +3774,12 @@ dependencies = [ "ryu", ] +[[package]] +name = "varint-rs" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" + [[package]] name = "version_check" version = "0.9.5" @@ -3635,6 +3820,15 @@ dependencies = [ "wit-bindgen", ] +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen", +] + [[package]] name = "wasm-bindgen" version = "0.2.108" @@ -3735,6 +3929,18 @@ dependencies = [ "wasmparser 0.244.0", ] +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap", + "wasm-encoder 0.244.0", + "wasmparser 0.244.0", +] + [[package]] name = "wasmparser" version = "0.243.0" @@ -3755,6 +3961,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" dependencies = [ "bitflags", + "hashbrown 0.15.5", "indexmap", "semver", ] @@ -3886,7 +4093,7 @@ dependencies = [ "syn", "wasmtime-internal-component-util", "wasmtime-internal-wit-bindgen", - "wit-parser", + "wit-parser 0.243.0", ] [[package]] @@ -4027,7 +4234,7 @@ dependencies = [ "bitflags", "heck", "indexmap", - "wit-parser", + "wit-parser 0.243.0", ] [[package]] @@ -4671,6 +4878,70 @@ name = "wit-bindgen" version = "0.51.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser 0.244.0", +] + +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap", + "prettyplease", + "syn", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags", + "indexmap", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder 0.244.0", + "wasm-metadata", + "wasmparser 0.244.0", + "wit-parser 0.244.0", +] [[package]] name = "wit-parser" @@ -4690,6 +4961,24 @@ dependencies = [ "wasmparser 0.243.0", ] +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser 0.244.0", +] + [[package]] name = "witx" version = "0.9.1" @@ -4708,6 +4997,12 @@ version = "0.6.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9" +[[package]] +name = "xxhash-rust" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" + [[package]] name = "yoke" version = "0.8.1" diff --git a/Cargo.toml b/Cargo.toml index 804d60f..f0babdb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,6 +14,7 @@ bytes = "1" chrono = "0.4" clap = { version = "4", features = ["derive"] } dotenvy = "0.15" +fjall = "3" indexmap = "2" reqwest = { version = "0.13", features = ["hickory-dns"] } rustls = "0.23" @@ -40,6 +41,7 @@ url = "2" wasmtime = "41" wasmtime-wasi = "41" wasmtime-wasi-http = "41" +uuid = "1" [profile.release] lto = true diff --git a/src/cli.rs b/src/cli.rs index 6d34823..f90b59f 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -39,6 +39,9 @@ pub struct Cli { #[arg(action=ArgAction::Set, default_value_t = true, short = 'C', long, value_name = "BOOL", help = "Enable the usage of cached plugins", long_help = None, hide_possible_values = true)] pub cache: bool, + #[arg(default_value = "./database", short, long, value_name = "DIRECTORY PATH", help = "The path to the program its database", long_help = None)] + pub database_directory: PathBuf, + #[arg(default_value_t = 15, short = 't', long, value_name = "SECONDS", help = "The amount of seconds after which the HTTP client should timeout", long_help = None)] pub http_client_timeout_seconds: u64, } diff --git a/src/database.rs b/src/database.rs new file mode 100644 index 0000000..1a1affd --- /dev/null +++ b/src/database.rs @@ -0,0 +1,92 @@ +/* SPDX-License-Identifier: GPL-3.0-or-later */ +/* Copyright © 2026 Eduard Smet */ + +use std::{ + fs::{self}, + io::ErrorKind, + path::Path, +}; + +use anyhow::{Result, bail}; +use fjall::{Database, KeyspaceCreateOptions, PersistMode, Slice}; + +use crate::utils::channels::DatabaseMessages; + +pub enum Keyspaces { + Plugins, + PluginStore, + DependencyFunctions, + ScheduledJobs, + DiscordEvents, + DiscordApplicationCommands, + DiscordMessageComponents, + DiscordModals, +} + +pub fn new(database_directory_path: &Path) -> Result { + if let Err(err) = fs::create_dir_all(database_directory_path) + && err.kind() != ErrorKind::AlreadyExists + { + bail!(err); + } + + Ok(Database::builder(database_directory_path).open()?) +} + +pub fn handle_action(database: Database, message: DatabaseMessages) { + match message { + DatabaseMessages::GetState(keyspace, key, response_sender) => { + response_sender.send(get(database, keyspace, key)); + } + DatabaseMessages::InsertState(keyspace, key, value, response_sender) => { + response_sender.send(insert(database, keyspace, key, value)); + } + DatabaseMessages::DeleteState(keyspace, key, response_sender) => { + response_sender.send(remove(database, keyspace, key)); + } + DatabaseMessages::ContainsKey(keyspace, key, response_sender) => { + response_sender.send(contains_key(database, keyspace, key)); + } + } +} + +pub fn get(database: Database, keyspace: Keyspaces, key: Vec) -> Result> { + let keyspace = database.keyspace(get_keyspace(keyspace), KeyspaceCreateOptions::default)?; + + Ok(keyspace.get(key)?) +} + +pub fn insert(database: Database, keyspace: Keyspaces, key: Vec, value: Vec) -> Result<()> { + let keyspace = database.keyspace(get_keyspace(keyspace), KeyspaceCreateOptions::default)?; + + Ok(keyspace.insert(key, value)?) +} + +pub fn remove(database: Database, keyspace: Keyspaces, key: Vec) -> Result<()> { + let keyspace = database.keyspace(get_keyspace(keyspace), KeyspaceCreateOptions::default)?; + + Ok(keyspace.remove(key)?) +} + +pub fn contains_key(database: Database, keyspace: Keyspaces, key: Vec) -> Result { + let keyspace = database.keyspace(get_keyspace(keyspace), KeyspaceCreateOptions::default)?; + + Ok(keyspace.contains_key(key)?) +} + +pub fn persist(database: Database, persist_mode: PersistMode) -> Result<()> { + Ok(database.persist(persist_mode)?) +} + +fn get_keyspace(keyspace: Keyspaces) -> &'static str { + match keyspace { + Keyspaces::Plugins => "plugins", + Keyspaces::PluginStore => "plugin_store", + Keyspaces::DependencyFunctions => "dependency_functions", + Keyspaces::ScheduledJobs => "scheduled_jobs", + Keyspaces::DiscordEvents => "discord_events", + Keyspaces::DiscordApplicationCommands => "discord_application_commands", + Keyspaces::DiscordMessageComponents => "discord_message_componets", + Keyspaces::DiscordModals => "discord_modals", + } +} diff --git a/src/discord.rs b/src/discord.rs index 59bfb07..991ce6e 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -1,27 +1,22 @@ /* SPDX-License-Identifier: GPL-3.0-or-later */ /* Copyright © 2026 Eduard Smet */ -use std::{collections::HashMap, sync::Arc}; +use std::sync::Arc; use tokio::{ - sync::{ - Mutex, RwLock, - mpsc::{Receiver, Sender}, - }, + sync::mpsc::{UnboundedReceiver, UnboundedSender}, task::JoinHandle, }; use tracing::{error, info}; -use twilight_cache_inmemory::{DefaultInMemoryCache, InMemoryCache, ResourceType}; +use twilight_cache_inmemory::{DefaultInMemoryCache, InMemoryCache}; use twilight_gateway::{ - CloseFrame, Config, Event, EventType, EventTypeFlags, Intents, MessageSender, Shard, StreamExt, + CloseFrame, Config, EventType, EventTypeFlags, Intents, MessageSender, Shard, StreamExt, }; use twilight_http::Client; -use twilight_model::id::{Id, marker::GuildMarker}; use crate::{ SHUTDOWN, - plugins::PluginRegistrations, - utils::channels::{DiscordBotClientMessages, RuntimeMessages}, + utils::channels::{CoreMessages, DiscordBotClientMessages}, }; mod events; @@ -30,23 +25,22 @@ mod requests; pub struct DiscordBotClient { http_client: Arc, - shard_message_senders: Arc, Arc>>>, + shards: Vec, + shard_message_senders: Arc>, cache: Arc, - plugin_registrations: Arc>, - runtime_tx: Arc>, - runtime_rx: Arc>>, + core_tx: Arc>, + rx: UnboundedReceiver, } impl DiscordBotClient { pub async fn new( token: String, - plugin_registrations: Arc>, - runtime_tx: Sender, - runtime_rx: Receiver, - ) -> Result<(Self, Box + Send>), ()> { + core_tx: UnboundedSender, + rx: UnboundedReceiver, + ) -> Result { info!("Creating the Discord bot client"); - let intents = Intents::all(); + let intents = Intents::all(); // TODO: Make this configurable rustls::crypto::aws_lc_rs::default_provider() .install_default() @@ -56,14 +50,14 @@ impl DiscordBotClient { let config = Config::new(token, intents); - let shards = match twilight_gateway::create_recommended( + let (shards, shard_message_senders) = match twilight_gateway::create_recommended( &http_client, config, |_, builder| builder.build(), ) .await { - Ok(shards) => Box::new(shards), + Ok(shard_iterator) => Self::shard_message_senders(Box::new(shard_iterator)), Err(err) => { error!( "Something went wrong while getting the recommended amount of shards from Discord, error: {}", @@ -73,74 +67,66 @@ impl DiscordBotClient { } }; - let shard_message_senders = Arc::new(RwLock::new(HashMap::new())); - - let cache = Arc::new( - DefaultInMemoryCache::builder() - .resource_types(ResourceType::all()) - .build(), - ); - - Ok(( - DiscordBotClient { - http_client: Arc::new(http_client), - shard_message_senders, - cache, - plugin_registrations, - runtime_tx: Arc::new(runtime_tx), - runtime_rx: Arc::new(Mutex::new(runtime_rx)), - }, + let cache = Arc::new(DefaultInMemoryCache::default()); // TODO: Make this configurable + + Ok(DiscordBotClient { + http_client: Arc::new(http_client), shards, - )) + shard_message_senders: Arc::new(shard_message_senders), + cache, + core_tx: Arc::new(core_tx), + rx, + }) } - pub fn start(self, shards: Box + Send>) -> JoinHandle<()> { - let mut tasks = Vec::with_capacity(shards.len()); - - let discord_bot_client = Arc::new(self); + pub fn start(mut self) -> JoinHandle<()> { + let mut tasks = Vec::with_capacity(self.shards.len()); - for shard in shards { + for shard in self.shards.drain(..) { tasks.push(tokio::spawn(Self::shard_runner( - discord_bot_client.clone(), + self.cache.clone(), + self.core_tx.clone(), shard, ))); } tokio::spawn(async move { - while let Some(message) = discord_bot_client.runtime_rx.lock().await.recv().await { + while let Some(message) = self.rx.recv().await { match message { - DiscordBotClientMessages::RegisterApplicationCommands(commands) => { - let _ = discord_bot_client - .application_command_registrations(commands) - .await; + DiscordBotClientMessages::RegisterApplicationCommands( + commands, + response_sender, + ) => { + let http_client = self.http_client.clone(); + tokio::spawn(async { + response_sender.send( + Self::application_command_registrations(http_client, commands) + .await, + ); + }); } DiscordBotClientMessages::Request(request, response_sender) => { - let _ = response_sender.send(discord_bot_client.request(request).await); - } - DiscordBotClientMessages::Shutdown(is_done) => { - for sender in discord_bot_client - .shard_message_senders - .read() - .await - .values() - { - _ = sender.close(CloseFrame::NORMAL); - } - - for task in tasks.drain(..) { - let _ = task.await; - } - - let _ = is_done.send(()); + let http_client = self.http_client.clone(); + let shard_message_senders = self.shard_message_senders.clone(); + + tokio::spawn(async { + response_sender.send( + Self::request(http_client, shard_message_senders, request).await, + ); + }); } } } + + self.shutdown(tasks); }) } - pub async fn shard_runner(discord_bot_client: Arc, mut shard: Shard) { - let shard_message_sender = Arc::new(shard.sender()); - + async fn shard_runner( + cache: Arc, + core_tx: Arc>, + mut shard: Shard, + ) { while let Some(item) = shard.next_event(EventTypeFlags::all()).await { let Ok(event) = item else { error!( @@ -155,24 +141,33 @@ impl DiscordBotClient { break; } - discord_bot_client.cache.update(&event); + cache.update(&event); - match event { - Event::Ready(ready) => { - info!("Shard is ready, logged in as {}", &ready.user.name); + tokio::spawn(Self::handle_event(core_tx.clone(), event)); + } + } - for guild in ready.guilds { - discord_bot_client - .shard_message_senders - .write() - .await - .insert(guild.id, shard_message_sender.clone()); - } - } - _ => { - tokio::spawn(Self::handle_event(discord_bot_client.clone(), event)); - } - } + fn shard_message_senders( + shard_iterator: Box>, + ) -> (Vec, Vec) { + let mut shards = vec![]; + let mut shard_message_senders = vec![]; + + for shard in shard_iterator { + shard_message_senders.push(shard.sender()); + shards.push(shard); + } + + (shards, shard_message_senders) + } + + async fn shutdown(&self, mut tasks: Vec>) { + for shard_message_sender in self.shard_message_senders.iter() { + _ = shard_message_sender.close(CloseFrame::NORMAL); + } + + for task in tasks.drain(..) { + let _ = task.await; } } } diff --git a/src/discord/events.rs b/src/discord/events.rs index 42cb2e7..d6c125b 100644 --- a/src/discord/events.rs +++ b/src/discord/events.rs @@ -1,66 +1,48 @@ /* SPDX-License-Identifier: GPL-3.0-or-later */ /* Copyright © 2026 Eduard Smet */ -use std::{any::Any, sync::Arc}; +use std::sync::Arc; +use tokio::sync::mpsc::UnboundedSender; use tracing::{debug, error}; use twilight_gateway::Event; use twilight_model::application::interaction::InteractionData; use crate::{ discord::DiscordBotClient, - plugins::discord_bot::plugin::discord_types::Events as DiscordEvents, - utils::channels::RuntimeMessages, + plugins::discord_bot::plugin::discord_export_types::DiscordEvents, + utils::channels::{CoreMessages, RuntimeMessages, RuntimeMessagesDiscord}, }; impl DiscordBotClient { #[allow(clippy::too_many_lines)] - pub async fn handle_event(discord_bot_client: Arc, event: Event) { + pub async fn handle_event(core_tx: Arc>, event: Event) { match event { Event::InteractionCreate(interaction_create) => { match interaction_create.data.as_ref() { Some(InteractionData::ApplicationCommand(command_data)) => { - let initialized_plugins = - discord_bot_client.plugin_registrations.read().await; + // TODO: get value - let Some(plugin) = initialized_plugins - .discord_events - .interaction_create - .application_commands - .get(&command_data.id) - else { - return; - }; - - let _ = discord_bot_client - .runtime_tx - .send(RuntimeMessages::CallDiscordEvent( - plugin.clone(), - DiscordEvents::InteractionCreate( - sonic_rs::to_vec(&interaction_create).unwrap(), + let _ = core_tx + .send(CoreMessages::Runtime(RuntimeMessages::Discord( + RuntimeMessagesDiscord::CallDiscordEvent( + plugin_uuid, + DiscordEvents::InteractionCreate( + sonic_rs::to_vec(&interaction_create).unwrap(), + ), ), - )) + ))) .await; } Some(InteractionData::MessageComponent(message_component_interaction_data)) => { - let initialized_plugins = - discord_bot_client.plugin_registrations.read().await; - - let Some(plugin) = initialized_plugins - .discord_events - .interaction_create - .message_components - .get(&message_component_interaction_data.custom_id) - else { - return; - }; - let _ = discord_bot_client .runtime_tx - .send(RuntimeMessages::CallDiscordEvent( - plugin.clone(), - DiscordEvents::InteractionCreate( - sonic_rs::to_vec(&interaction_create).unwrap(), + .send(RuntimeMessages::Discord( + RuntimeMessagesDiscord::CallDiscordEvent( + plugin.clone(), + DiscordEvents::InteractionCreate( + sonic_rs::to_vec(&interaction_create).unwrap(), + ), ), )) .await; @@ -80,10 +62,12 @@ impl DiscordBotClient { let _ = discord_bot_client .runtime_tx - .send(RuntimeMessages::CallDiscordEvent( - plugin.clone(), - DiscordEvents::InteractionCreate( - sonic_rs::to_vec(&interaction_create).unwrap(), + .send(RuntimeMessages::Discord( + RuntimeMessagesDiscord::CallDiscordEvent( + plugin.clone(), + DiscordEvents::InteractionCreate( + sonic_rs::to_vec(&interaction_create).unwrap(), + ), ), )) .await; @@ -105,10 +89,12 @@ impl DiscordBotClient { { let _ = discord_bot_client .runtime_tx - .send(RuntimeMessages::CallDiscordEvent( - plugin.clone(), - DiscordEvents::MessageCreate( - sonic_rs::to_vec(&message_create).unwrap(), + .send(RuntimeMessages::Discord( + RuntimeMessagesDiscord::CallDiscordEvent( + plugin.clone(), + DiscordEvents::MessageCreate( + sonic_rs::to_vec(&message_create).unwrap(), + ), ), )) .await; @@ -124,9 +110,13 @@ impl DiscordBotClient { { let _ = discord_bot_client .runtime_tx - .send(RuntimeMessages::CallDiscordEvent( - plugin.clone(), - DiscordEvents::ThreadCreate(sonic_rs::to_vec(&thread_create).unwrap()), + .send(RuntimeMessages::Discord( + RuntimeMessagesDiscord::CallDiscordEvent( + plugin.clone(), + DiscordEvents::ThreadCreate( + sonic_rs::to_vec(&thread_create).unwrap(), + ), + ), )) .await; } @@ -141,9 +131,13 @@ impl DiscordBotClient { { let _ = discord_bot_client .runtime_tx - .send(RuntimeMessages::CallDiscordEvent( - plugin.clone(), - DiscordEvents::ThreadDelete(sonic_rs::to_vec(&thread_delete).unwrap()), + .send(RuntimeMessages::Discord( + RuntimeMessagesDiscord::CallDiscordEvent( + plugin.clone(), + DiscordEvents::ThreadDelete( + sonic_rs::to_vec(&thread_delete).unwrap(), + ), + ), )) .await; } @@ -158,10 +152,12 @@ impl DiscordBotClient { { let _ = discord_bot_client .runtime_tx - .send(RuntimeMessages::CallDiscordEvent( - plugin.clone(), - DiscordEvents::ThreadListSync( - sonic_rs::to_vec(&thread_list_sync).unwrap(), + .send(RuntimeMessages::Discord( + RuntimeMessagesDiscord::CallDiscordEvent( + plugin.clone(), + DiscordEvents::ThreadListSync( + sonic_rs::to_vec(&thread_list_sync).unwrap(), + ), ), )) .await; @@ -177,10 +173,12 @@ impl DiscordBotClient { { let _ = discord_bot_client .runtime_tx - .send(RuntimeMessages::CallDiscordEvent( - plugin.clone(), - DiscordEvents::ThreadMemberUpdate( - sonic_rs::to_vec(&thread_member_update).unwrap(), + .send(RuntimeMessages::Discord( + RuntimeMessagesDiscord::CallDiscordEvent( + plugin.clone(), + DiscordEvents::ThreadMemberUpdate( + sonic_rs::to_vec(&thread_member_update).unwrap(), + ), ), )) .await; @@ -196,10 +194,12 @@ impl DiscordBotClient { { let _ = discord_bot_client .runtime_tx - .send(RuntimeMessages::CallDiscordEvent( - plugin.clone(), - DiscordEvents::ThreadMembersUpdate( - sonic_rs::to_vec(&thread_members_update).unwrap(), + .send(RuntimeMessages::Discord( + RuntimeMessagesDiscord::CallDiscordEvent( + plugin.clone(), + DiscordEvents::ThreadMembersUpdate( + sonic_rs::to_vec(&thread_members_update).unwrap(), + ), ), )) .await; @@ -215,9 +215,13 @@ impl DiscordBotClient { { let _ = discord_bot_client .runtime_tx - .send(RuntimeMessages::CallDiscordEvent( - plugin.clone(), - DiscordEvents::ThreadUpdate(sonic_rs::to_vec(&thread_update).unwrap()), + .send(RuntimeMessages::Discord( + RuntimeMessagesDiscord::CallDiscordEvent( + plugin.clone(), + DiscordEvents::ThreadUpdate( + sonic_rs::to_vec(&thread_update).unwrap(), + ), + ), )) .await; } diff --git a/src/discord/interactions.rs b/src/discord/interactions.rs index 06f559e..1d3dc68 100644 --- a/src/discord/interactions.rs +++ b/src/discord/interactions.rs @@ -1,10 +1,11 @@ /* SPDX-License-Identifier: GPL-3.0-or-later */ /* Copyright © 2026 Eduard Smet */ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Arc}; +use anyhow::Result; use tracing::{error, info}; -use twilight_http::{request::Request, routing::Route}; +use twilight_http::{Client, request::Request, routing::Route}; use twilight_model::{ application::command::Command, id::{ @@ -17,11 +18,11 @@ use crate::{discord::DiscordBotClient, plugins::PluginRegistrationRequestsApplic impl DiscordBotClient { pub async fn application_command_registrations( - &self, + http_client: Arc, discord_application_command_registration_request: Vec< PluginRegistrationRequestsApplicationCommand, >, - ) -> Result<(), ()> { + ) -> Result<(Vec, Vec)> { let mut discord_commands = HashMap::new(); let mut commands = HashMap::new(); @@ -44,7 +45,7 @@ impl DiscordBotClient { .push((command.plugin_id, command_data.clone())); } - let application_id = match self.http_client.current_user_application().await { + let application_id = match http_client.current_user_application().await { Ok(response) => match response.model().await { Ok(application) => application.id, Err(err) => { @@ -80,8 +81,7 @@ impl DiscordBotClient { } }; - match self - .http_client + match http_client .request::>(global_discord_commands_request) .await { @@ -110,7 +110,7 @@ impl DiscordBotClient { } // TODO: Endpoint is limited to 200 guilds per request, pagination needs to be implemented. - let current_user_guilds = match self.http_client.current_user_guilds().await { + let current_user_guilds = match http_client.current_user_guilds().await { Ok(response) => match response.model().await { Ok(current_user_guilds) => current_user_guilds, Err(err) => { @@ -148,8 +148,7 @@ impl DiscordBotClient { } }; - match self - .http_client + match http_client .request::>(guild_commands_request) .await { @@ -182,18 +181,16 @@ impl DiscordBotClient { if commands_by_name.1.len() == 1 { let command = commands_by_name.1.remove(0); - match self - .register_application_command(application_id, &mut discord_commands, &command.1) - .await + match Self::register_application_command( + http_client.clone(), + application_id, + &mut discord_commands, + &command.1, + ) + .await { Ok(command_id) => { - self.plugin_registrations - .write() - .await - .discord_events - .interaction_create - .application_commands - .insert(command_id, command.0); + // TODO: return value } Err(()) => { error!( @@ -208,22 +205,16 @@ impl DiscordBotClient { for mut command in commands_by_name.1 { command.1.name += format!("~{command_name_occurence_count}").as_str(); - match self - .register_application_command( - application_id, - &mut discord_commands, - &command.1, - ) - .await + match Self::register_application_command( + http_client.clone(), + application_id, + &mut discord_commands, + &command.1, + ) + .await { Ok(command_id) => { - self.plugin_registrations - .write() - .await - .discord_events - .interaction_create - .application_commands - .insert(command_id, command.0); + // TODO: return value } Err(()) => { error!( @@ -238,14 +229,14 @@ impl DiscordBotClient { } } - self.delete_old_application_commands(application_id, &discord_commands) + Self::delete_old_application_commands(http_client, application_id, &discord_commands) .await?; Ok(()) } async fn register_application_command( - &self, + http_client: Arc, application_id: Id, discord_commands: &mut HashMap, command: &Command, @@ -304,7 +295,7 @@ impl DiscordBotClient { } }; - match self.http_client.request::(request).await { + match http_client.request::(request).await { Ok(response) => match response.model().await { Ok(command) => Ok(command.id.unwrap()), Err(err) => { @@ -326,7 +317,7 @@ impl DiscordBotClient { } async fn delete_old_application_commands( - &self, + http_client: Arc, application_id: Id, discord_commands: &HashMap, ) -> Result<(), ()> { @@ -358,7 +349,7 @@ impl DiscordBotClient { "Deleting the {} command, guild id: {:?}", &discord_command.name, &discord_command.guild_id ); - match self.http_client.request::<()>(request).await { + match http_client.request::<()>(request).await { Ok(_) => (), Err(err) => { error!( diff --git a/src/discord/requests.rs b/src/discord/requests.rs index 7c229e3..f8b398a 100644 --- a/src/discord/requests.rs +++ b/src/discord/requests.rs @@ -1,52 +1,47 @@ /* SPDX-License-Identifier: GPL-3.0-or-later */ /* Copyright © 2026 Eduard Smet */ -use twilight_http::{request::Request, routing::Route}; -use twilight_model::{ - gateway::{ - OpCode, - payload::outgoing::{ - RequestGuildMembers, UpdatePresence, UpdateVoiceState, - request_guild_members::RequestGuildMembersInfo, update_presence::UpdatePresencePayload, - update_voice_state::UpdateVoiceStateInfo, - }, +use std::sync::Arc; + +use anyhow::{Result, anyhow, bail}; +use twilight_gateway::MessageSender; +use twilight_http::{Client, request::Request, routing::Route}; +use twilight_model::gateway::{ + OpCode, + payload::outgoing::{ + RequestGuildMembers, UpdatePresence, UpdateVoiceState, + request_guild_members::RequestGuildMembersInfo, update_presence::UpdatePresencePayload, + update_voice_state::UpdateVoiceStateInfo, }, - id::Id, }; use crate::{ discord::DiscordBotClient, plugins::discord_bot::plugin::{ - discord_types::Contents, - host_functions::{DiscordRequests, DiscordResponses}, + discord_import_functions::{DiscordRequests, DiscordResponses}, + discord_import_types::Body, }, }; impl DiscordBotClient { #[allow(clippy::too_many_lines)] pub async fn request( - &self, + http_client: Arc, + shard_message_senders: Arc>, request: DiscordRequests, - ) -> Result, String> { + ) -> Result> { let request = match request { // Shard message sender commands DiscordRequests::RequestGuildMembers((guild_id, body)) => { - let guild_id = Id::new(guild_id); - - let guild_shard_message_sender = if let Some(guild_shard_message_sender) = - self.shard_message_senders.read().await.get(&guild_id) - { - guild_shard_message_sender.clone() - } else { - return Err(String::from("No guild found")); - }; + let guild_shard_message_sender = + Self::get_guild_shard_id(&shard_message_senders, guild_id); let d = match sonic_rs::from_slice::(&body) { Ok(d) => d, Err(err) => { - return Err(format!( + bail!( "Something went wrong while deserializing RequestGuildMembersInfo, error: {err}", - )); + ); } }; @@ -60,27 +55,18 @@ impl DiscordBotClient { None } DiscordRequests::RequestSoundboardSounds(_guild_ids) => { - return Err(String::from( - "RequestSoundboardSounds has not yet been implemented in Twilight.", - )); + bail!("RequestSoundboardSounds has not yet been implemented in Twilight.",); } DiscordRequests::UpdateVoiceState((guild_id, body)) => { - let guild_id = Id::new(guild_id); - - let guild_shard_message_sender = if let Some(guild_shard_message_sender) = - self.shard_message_senders.read().await.get(&guild_id) - { - guild_shard_message_sender.clone() - } else { - return Err(String::from("No guild found")); - }; + let guild_shard_message_sender = + Self::get_guild_shard_id(&shard_message_senders, guild_id); let d = match sonic_rs::from_slice::(&body) { Ok(d) => d, Err(err) => { - return Err(format!( + bail!( "Something went wrong while deserializing RequestGuildMembersInfo, error: {err}", - )); + ); } }; @@ -94,20 +80,14 @@ impl DiscordBotClient { None } DiscordRequests::UpdatePresence(body) => { - let guild_shard_message_sender = if let Some(guild_shard_message_sender) = - self.shard_message_senders.read().await.values().next() - { - guild_shard_message_sender.clone() - } else { - return Err(String::from("No guild found")); - }; + let guild_shard_message_sender = shard_message_senders.get(0).unwrap(); let d = match sonic_rs::from_slice::(&body) { Ok(d) => d, Err(err) => { - return Err(format!( + bail!( "Something went wrong while deserializing RequestGuildMembersInfo, error: {err}", - )); + ); } }; @@ -131,9 +111,9 @@ impl DiscordBotClient { { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -144,9 +124,9 @@ impl DiscordBotClient { { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -154,11 +134,11 @@ impl DiscordBotClient { let request_builder = Request::builder(&Route::CreateForumThread { channel_id }); let request_builder = match content { - Contents::Json(bytes) => request_builder.body(bytes), - Contents::Form(buffer) => match request_builder.multipart(buffer) { + Body::Json(bytes) => request_builder.body(bytes), + Body::Form(buffer) => match request_builder.multipart(buffer) { Ok(request) => request, Err(err) => { - return Err(err.to_string()); + bail!(err); } }, }; @@ -166,9 +146,9 @@ impl DiscordBotClient { match request_builder.build() { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -176,11 +156,11 @@ impl DiscordBotClient { let request_builder = Request::builder(&Route::CreateMessage { channel_id }); let request_builder = match content { - Contents::Json(bytes) => request_builder.body(bytes), - Contents::Form(buffer) => match request_builder.multipart(buffer) { + Body::Json(bytes) => request_builder.body(bytes), + Body::Form(buffer) => match request_builder.multipart(buffer) { Ok(request) => request, Err(err) => { - return Err(err.to_string()); + bail!(err); } }, }; @@ -188,9 +168,9 @@ impl DiscordBotClient { match request_builder.build() { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -201,9 +181,9 @@ impl DiscordBotClient { { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -217,9 +197,9 @@ impl DiscordBotClient { { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -232,9 +212,9 @@ impl DiscordBotClient { { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -242,9 +222,9 @@ impl DiscordBotClient { match Request::builder(&Route::GetActiveThreads { guild_id }).build() { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -252,9 +232,9 @@ impl DiscordBotClient { match Request::builder(&Route::GetChannel { channel_id }).build() { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -268,9 +248,9 @@ impl DiscordBotClient { { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -284,9 +264,9 @@ impl DiscordBotClient { { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -300,9 +280,9 @@ impl DiscordBotClient { { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -315,9 +295,9 @@ impl DiscordBotClient { { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -332,9 +312,9 @@ impl DiscordBotClient { { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -354,9 +334,9 @@ impl DiscordBotClient { { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -364,9 +344,9 @@ impl DiscordBotClient { match Request::builder(&Route::JoinThread { channel_id }).build() { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -374,9 +354,9 @@ impl DiscordBotClient { match Request::builder(&Route::LeaveThread { channel_id }).build() { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -389,9 +369,9 @@ impl DiscordBotClient { { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -402,9 +382,9 @@ impl DiscordBotClient { { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } @@ -422,18 +402,18 @@ impl DiscordBotClient { { Ok(request) => Some(request), Err(err) => { - return Err(format!( + bail!( "Something went wrong while building a Discord request, error: {err}" - )); + ); } } } }; if let Some(request) = request { - match self.http_client.request::>(request).await { + match http_client.request::>(request).await { Ok(response) => Ok(Some(response.bytes().await.unwrap().clone())), - Err(err) => Err(format!( + Err(err) => Err(anyhow!( "Something went wrong while making a Discord request, error: {err}" )), } @@ -441,4 +421,13 @@ impl DiscordBotClient { Ok(None) } } + + fn get_guild_shard_id( + shard_message_senders: &Arc>, + guild_id: u64, + ) -> &MessageSender { + shard_message_senders + .get((guild_id >> 22) as usize % shard_message_senders.len()) + .unwrap() + } } diff --git a/src/http/registry.rs b/src/http/registry.rs index 41583a4..1bf826d 100644 --- a/src/http/registry.rs +++ b/src/http/registry.rs @@ -3,7 +3,7 @@ use std::str::FromStr; -use anyhow::{Context, Error, Result}; +use anyhow::{Context, Result, bail}; use reqwest::StatusCode; use tracing::debug; use url::{ParseError, Url}; @@ -19,10 +19,10 @@ impl HttpClient { let response = self.client.get(url).send().await?; if response.status() != StatusCode::OK { - return Err(Error::msg(format!( + bail!( "The response was undesired, status code: {}", response.status() - ))); + ); } Ok(response diff --git a/src/job_scheduler.rs b/src/job_scheduler.rs index 25e1581..832d724 100644 --- a/src/job_scheduler.rs +++ b/src/job_scheduler.rs @@ -3,145 +3,81 @@ use std::sync::Arc; +use anyhow::Result; use tokio::{ - sync::{ - Mutex, RwLock, - mpsc::{Receiver, Sender}, - }, + sync::mpsc::{UnboundedReceiver, UnboundedSender}, task::JoinHandle, }; -use tokio_cron_scheduler::{Job, JobScheduler as TCScheduler}; -use tracing::{error, info}; +use tokio_cron_scheduler::{Job, JobScheduler as TokioCronScheduler}; +use tracing::info; +use uuid::Uuid; -use crate::{ - plugins::{PluginRegistrationRequestsScheduledJob, PluginRegistrations}, - utils::channels::{JobSchedulerMessages, RuntimeMessages}, +use crate::utils::channels::{ + CoreMessages, JobSchedulerMessages, RuntimeMessages, RuntimeMessagesJobScheduler, }; pub struct JobScheduler { - tokio_cron_scheduler: Arc>, - plugin_registrations: Arc>, - runtime_tx: Arc>, - runtime_rx: Arc>>, + tokio_cron_scheduler: TokioCronScheduler, + core_tx: Arc>, + rx: UnboundedReceiver, } impl JobScheduler { pub async fn new( - plugin_registrations: Arc>, - runtime_tx: Sender, - runtime_rx: Receiver, - ) -> Result { - match TCScheduler::new().await { - Ok(job_scheduler) => Ok(JobScheduler { - tokio_cron_scheduler: Arc::new(RwLock::new(job_scheduler)), - plugin_registrations, - runtime_tx: Arc::new(runtime_tx), - runtime_rx: Arc::new(Mutex::new(runtime_rx)), - }), - Err(err) => { - error!( - "Something went wrong while creating a new instance of the job scheduler, error {}", - &err - ); - Err(()) - } - } - } + core_tx: UnboundedSender, + rx: UnboundedReceiver, + ) -> Result { + info!("Creating the job scheduler"); - pub async fn scheduled_job_registrations( - &self, - initialized_plugin_registrations_scheduled_jobs: Vec< - PluginRegistrationRequestsScheduledJob, - >, - ) { - for scheduled_job in &initialized_plugin_registrations_scheduled_jobs { - for cron in &scheduled_job.crons { - info!( - "Scheduled Job {} from the {} plugin requested to be registered.", - &scheduled_job.id, &scheduled_job.plugin_id - ); - - let runtime_tx = self.runtime_tx.clone(); - let plugin_id = scheduled_job.plugin_id.clone(); - let internal_id = scheduled_job.id.clone(); - - let job = match Job::new_async_tz( - cron.clone(), - chrono::Local, - move |_uuid, _lock| { - let runtime_tx = runtime_tx.clone(); - let plugin_id = plugin_id.clone(); - let internal_id = internal_id.clone(); - - Box::pin(async move { - let _ = runtime_tx - .send(RuntimeMessages::CallScheduledJob(plugin_id, internal_id)) - .await; - }) - }, - ) { - Ok(job) => job, - Err(err) => { - error!( - "Something went wrong while adding {} job from the {} plugin to the job scheduler, error: {}", - &scheduled_job.id, &scheduled_job.plugin_id, &err - ); - continue; - } - }; - - match self.tokio_cron_scheduler.read().await.add(job).await { - Ok(uuid) => { - self.plugin_registrations - .write() - .await - .scheduled_jobs - .insert( - uuid.as_u128(), - (scheduled_job.plugin_id.clone(), scheduled_job.id.clone()), - ); - } - Err(err) => { - error!( - "Something went wrong while adding {} job from the {} plugin to the job scheduler, error: {}", - &scheduled_job.id, &scheduled_job.plugin_id, &err - ); - } - } - } - } + Ok(JobScheduler { + tokio_cron_scheduler: TokioCronScheduler::new().await?, + core_tx: Arc::new(core_tx), + rx, + }) } - pub async fn start(self) -> Result, ()> { - if let Err(err) = self.tokio_cron_scheduler.read().await.start().await { - error!( - "Something went wrong while starting the job scheduler, error: {}", - &err - ); - return Err(()); - } - - let job_scheduler = Arc::new(self); + pub async fn start(mut self) -> Result> { + self.tokio_cron_scheduler.start().await?; Ok(tokio::spawn(async move { - while let Some(message) = job_scheduler.runtime_rx.lock().await.recv().await { + while let Some(message) = self.rx.recv().await { match message { - JobSchedulerMessages::RegisterScheduledJobs(scheduled_jobs) => { - job_scheduler - .scheduled_job_registrations(scheduled_jobs) - .await; + JobSchedulerMessages::AddJob(plugin_id, cron, result) => { + let _ = result.send(self.add_job(plugin_id, cron).await); } - JobSchedulerMessages::Shutdown(is_done) => { - let _ = job_scheduler - .tokio_cron_scheduler - .write() - .await - .shutdown() - .await; - let _ = is_done.send(()); + JobSchedulerMessages::RemoveJob(uuid, result) => { + let _ = result.send(self.remove_job(uuid).await); } } } + + let _ = self.tokio_cron_scheduler.shutdown().await; })) } + + async fn add_job(&self, plugin_id: Uuid, cron: String) -> Result { + info!( + "Scheduled Job at {cron} cron from the {plugin_id} plugin requested to be registered" + ); + + let core_tx = self.core_tx.clone(); + + let job = Job::new_async_tz(cron.clone(), chrono::Local, move |job_id, _lock| { + let core_tx = core_tx.clone(); + + Box::pin(async move { + let _ = core_tx.send(CoreMessages::RuntimeModule(RuntimeMessages::JobScheduler( + RuntimeMessagesJobScheduler::CallScheduledJob(plugin_id, job_id), + ))); + }) + })?; + + Ok(self.tokio_cron_scheduler.add(job).await?) + } + + async fn remove_job(&self, uuid: Uuid) -> Result<()> { + info!("Removing scheduled Job {uuid}"); + + Ok(self.tokio_cron_scheduler.remove(&uuid).await?) + } } diff --git a/src/main.rs b/src/main.rs index 0f31578..9aad980 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,21 +5,23 @@ use std::os::unix::process::CommandExt; use std::{ - collections::{HashMap, VecDeque}, + collections::VecDeque, env, ffi::OsString, - path::{Path, PathBuf}, + path::Path, process::{Command, ExitCode, exit}, - sync::{Arc, LazyLock}, + sync::LazyLock, }; use clap::Parser; -use tokio::{signal, sync::RwLock}; +use fjall::{Database, PersistMode}; +use tokio::{signal, sync::RwLock, task::JoinHandle}; use tracing::{error, info, warn}; use tracing_appender::non_blocking::WorkerGuard; mod cli; mod config; +mod database; mod discord; mod http; mod job_scheduler; @@ -29,13 +31,11 @@ mod utils; use cli::{Cli, CliLogParameters}; use config::Config; use discord::DiscordBotClient; -use http::HttpClient; use job_scheduler::JobScheduler; -use plugins::{ - AvailablePlugin, PluginRegistrations, builder::PluginBuilder, registry, runtime::Runtime, -}; +use plugins::{registry, runtime::Runtime}; +use utils::{channels::Channels, env::Secrets}; -use crate::utils::channels::Channels; +use crate::utils::channels::{ChannelsCore, CoreMessages}; #[derive(PartialEq)] enum Shutdown { @@ -65,14 +65,18 @@ async fn main() -> ExitCode { async fn run() -> Result<(), ()> { let cli = Cli::parse(); - //let mut tasks: Arc>>> = Arc::new(Mutex::new(vec![])); // TODO: Rework shutdown - let (_guard, discord_bot_client_token, channels) = - initialization(cli.log_parameters, &cli.env_file)?; + let mut tasks: Vec> = vec![]; + + let (_guard, secrets, channels) = initialization(cli.log_parameters, &cli.env_file)?; let config = Config::new(&cli.config_file)?; - let available_plugins = registry_get_plugins( + let database = database::new(&cli.database_directory).map_err(|_| ())?; + + tasks.push(start(database, channels.core)); + + let available_plugins = registry::registry_get_plugins( cli.http_client_timeout_seconds, config, cli.plugin_directory.clone(), @@ -80,125 +84,111 @@ async fn run() -> Result<(), ()> { ) .await?; - let plugin_registrations = Arc::new(RwLock::new(PluginRegistrations::new())); - - let (discord_bot_client, shards) = DiscordBotClient::new( - discord_bot_client_token, - plugin_registrations.clone(), - channels.runtime.discord_bot_client_sender, - channels.discord_bot_client.receiver, + let discord_bot_client = DiscordBotClient::new( + secrets.discord_bot_client, + channels.discord_bot_client.core_tx, + channels.discord_bot_client.rx, ) .await?; - info!("Creating the job scheduler"); - let job_scheduler = JobScheduler::new( - plugin_registrations.clone(), - channels.runtime.job_scheduler_sender, - channels.job_scheduler.receiver, - ) - .await?; + let job_scheduler = + JobScheduler::new(channels.job_scheduler.core_tx, channels.job_scheduler.rx) + .await + .map_err(|_| ())?; - info!("Creating the WASI runtime"); - let runtime = Arc::new(Runtime::new( - channels.discord_bot_client.sender, - channels.job_scheduler.sender, - channels.runtime.receiver, - )); + let runtime = Runtime::new(channels.runtime.rx); - discord_bot_client.start(shards); + tasks.push(job_scheduler.start().await.map_err(|_| ())?); - job_scheduler.start().await?; + tasks.push(discord_bot_client.start()); - plugin_initializations( - runtime.clone(), - available_plugins, - plugin_registrations, - &cli.plugin_directory, - ) - .await?; + runtime + .initialize_plugins( + available_plugins, + channels.runtime.core_tx, + &cli.plugin_directory, + ) + .await?; - Runtime::start(runtime.clone()); + tasks.push(runtime.start()); - shutdown(runtime).await + shutdown(tasks).await } fn initialization( cli_log_parameters: CliLogParameters, env_file: &Path, -) -> Result<(Option, String, Channels), ()> { +) -> Result<(Option, Secrets, Channels), ()> { let guard = utils::logger::new(cli_log_parameters)?; - utils::env::load_env_file(env_file)?; + utils::env::load_env_file(env_file).map_err(|_| ())?; - let discord_bot_client_token = utils::env::validate()?; + let secrets = utils::env::get_secrets().map_err(|_| ())?; let channels = utils::channels::new(); - Ok((guard, discord_bot_client_token, channels)) + Ok((guard, secrets, channels)) } -async fn registry_get_plugins( - http_client_timeout_seconds: u64, - config: Config, - plugin_directory: PathBuf, - cache: bool, -) -> Result, ()> { - let http_client = Arc::new(HttpClient::new(http_client_timeout_seconds)?); - - registry::get_plugins(http_client, config, plugin_directory, cache).await -} +fn start(database: Database, mut channels_core: ChannelsCore) -> JoinHandle<()> { + tokio::spawn(async move { + while let Some(core_message) = channels_core.rx.recv().await { + match core_message { + CoreMessages::DatabaseModule(database_message) => { + let database = database.clone(); + + tokio::spawn(async { + database::handle_action(database, database_message); + }); + } + CoreMessages::JobSchedulerModule(job_scheduler_message) => { + channels_core.job_scheduler_tx.send(job_scheduler_message); + } + CoreMessages::DiscordBotClientModule(discord_bot_client_message) => { + channels_core + .discord_bot_client_tx + .send(discord_bot_client_message); + } + CoreMessages::RuntimeModule(runtime_message) => { + channels_core.runtime_tx.send(runtime_message); + } + CoreMessages::Shutdown(shutdown) => todo!(), // TODO: Figure shutdown out + } + } -async fn plugin_initializations( - runtime: Arc, - available_plugins: HashMap, - plugin_registrations: Arc>, - config_directory: &Path, -) -> Result<(), ()> { - info!("Creating the WASI plugin builder"); - let plugin_builder = PluginBuilder::new(); - - info!("Initializing the plugins"); - Runtime::initialize_plugins( - runtime, - plugin_builder, - available_plugins, - plugin_registrations, - config_directory, - ) - .await + database::persist(database, PersistMode::SyncAll); + }) } -async fn shutdown(runtime: Arc) -> Result<(), ()> { - let cancellation_token = runtime.cancellation_token.clone(); - - tokio::select! { - result = async move { - if let Err(err) = signal::ctrl_c().await { - error!( - "Failed to listen for the terminal interrupt signal, error: {}", - &err - ); - return Err(()); - } - - info!("Terminal interrupt signal received, send another to force immediate shutdown"); +async fn shutdown(mut tasks: Vec>) -> Result<(), ()> { + tokio::spawn(async { + if let Err(err) = signal::ctrl_c().await { + error!( + "Failed to listen for the terminal interrupt signal, error: {}", + &err + ); + return Err(()); + } - tokio::spawn(async { - signal::ctrl_c() - .await - .expect("failed to listen for the terminal interrupt signal"); + info!("Terminal interrupt signal received, send another to force immediate shutdown"); - warn!("Second terminal interrupt signal received, forcing immediate shutdown"); - exit(130); - }); + tokio::spawn(async { + signal::ctrl_c() + .await + .expect("failed to listen for the terminal interrupt signal"); - runtime.shutdown(Shutdown::SigInt).await; + warn!("Second terminal interrupt signal received, forcing immediate shutdown"); + exit(130); + }); - Ok(()) + Ok(()) + }); - } => {result} - () = cancellation_token.cancelled() => {Ok(())} + for task in tasks.drain(..) { + task.await; } + + Ok(()) } fn restart() { diff --git a/src/plugins.rs b/src/plugins.rs index dc7becd..7b39da5 100644 --- a/src/plugins.rs +++ b/src/plugins.rs @@ -12,71 +12,39 @@ use serde::{Deserialize, Deserializer}; use serde_yaml_ng::Value; use twilight_model::id::{Id, marker::CommandMarker}; -use crate::plugins::discord_bot::plugin::plugin_types::SupportedRegistrations; +use crate::plugins::discord_bot::plugin::core_import_types::SupportedCoreRegistrations; wasmtime::component::bindgen!({ imports: { default: async }, exports: { default: async } }); -#[derive(Clone, Deserialize)] +#[derive(Deserialize)] pub struct ConfigPlugin { pub plugin: String, pub cache: Option, #[serde(default = "ConfigPlugin::permissions_default")] - pub permissions: SupportedRegistrations, + pub permissions: SupportedCoreRegistrations, // TODO: Add Discord permission support back in pub environment: Option>, pub settings: Option, } impl ConfigPlugin { - fn permissions_default() -> SupportedRegistrations { - let mut supported_registrations = SupportedRegistrations::all(); - - supported_registrations &= !SupportedRegistrations::SHUTDOWN; - - supported_registrations + fn permissions_default() -> SupportedCoreRegistrations { + SupportedCoreRegistrations::empty() } } -impl<'de> Deserialize<'de> for SupportedRegistrations { +impl<'de> Deserialize<'de> for SupportedCoreRegistrations { fn deserialize>(deserializer: D) -> Result { - let mut supported_registrations = SupportedRegistrations::empty(); + let mut supported_registrations = SupportedCoreRegistrations::empty(); let supported_registration_strings = Vec::::deserialize(deserializer)?; for supported_registration_string in supported_registration_strings { match supported_registration_string.to_uppercase().as_str() { "DEPENDENCY_FUNCTIONS" => { - supported_registrations |= SupportedRegistrations::DEPENDENCY_FUNCTIONS; - } - "DISCORD_EVENT_MESSAGE_CREATE" => { - supported_registrations |= SupportedRegistrations::DISCORD_EVENT_MESSAGE_CREATE; - } - "DISCORD_EVENT_INTERACTION_CREATE" => { - supported_registrations |= - SupportedRegistrations::DISCORD_EVENT_INTERACTION_CREATE; - } - "DISCORD_EVENT_THREAD_CREATE" => { - supported_registrations |= SupportedRegistrations::DISCORD_EVENT_THREAD_CREATE; - } - "DISCORD_EVENT_THREAD_DELETE" => { - supported_registrations |= SupportedRegistrations::DISCORD_EVENT_THREAD_DELETE; - } - "DISCORD_EVENT_THREAD_LIST_SYNC" => { - supported_registrations |= - SupportedRegistrations::DISCORD_EVENT_THREAD_LIST_SYNC; - } - "DISCORD_EVENT_THREAD_MEMBER_UPDATE" => { - supported_registrations |= - SupportedRegistrations::DISCORD_EVENT_THREAD_MEMBER_UPDATE; - } - "DISCORD_EVENT_THREAD_MEMBERS_UPDATE" => { - supported_registrations |= - SupportedRegistrations::DISCORD_EVENT_THREAD_MEMBERS_UPDATE; - } - "DISCORD_EVENT_THREAD_UPDATE" => { - supported_registrations |= SupportedRegistrations::DISCORD_EVENT_THREAD_UPDATE; + supported_registrations |= SupportedCoreRegistrations::DEPENDENCY_FUNCTIONS; } "SHUTDOWN" => { - supported_registrations |= SupportedRegistrations::SHUTDOWN; + supported_registrations |= SupportedCoreRegistrations::SHUTDOWN; } &_ => unimplemented!(), } @@ -89,8 +57,9 @@ impl<'de> Deserialize<'de> for SupportedRegistrations { pub struct AvailablePlugin { pub registry_id: String, pub id: String, + pub user_id: String, pub version: Version, - pub permissions: SupportedRegistrations, + pub permissions: SupportedCoreRegistrations, pub environment: Option>, pub settings: Option, } diff --git a/src/plugins/registry.rs b/src/plugins/registry.rs index b8fdf8d..b6e0860 100644 --- a/src/plugins/registry.rs +++ b/src/plugins/registry.rs @@ -8,11 +8,12 @@ use std::{ sync::{Arc, LazyLock}, }; -use anyhow::{Error, Result}; +use anyhow::{Result, bail}; use semver::{Version, VersionReq}; use serde::Deserialize; use tokio::fs; use tracing::{error, info, warn}; +use uuid::Uuid; use crate::{ config::Config, @@ -46,7 +47,7 @@ pub struct RegistryPluginVersion { pub deprecation_reason: Option, } -type RegistryTask = Vec>>>; +type RegistryTask = Vec>>>; static DEFAULT_REGISTRY_ID: &str = "raw.githubusercontent.com/celarye/discord-bot-plugins/refs/heads/master"; @@ -54,15 +55,26 @@ static DEFAULT_REGISTRY_ID: &str = static PROGRAM_VERSION: LazyLock = LazyLock::new(|| Version::parse(env!("CARGO_PKG_VERSION")).unwrap()); +pub async fn registry_get_plugins( + http_client_timeout_seconds: u64, + config: Config, + plugin_directory: PathBuf, + cache: bool, +) -> Result, ()> { + let http_client = Arc::new(HttpClient::new(http_client_timeout_seconds)?); + + get_plugins(http_client, config, plugin_directory, cache).await +} + pub async fn get_plugins( http_client: Arc, config: Config, base_plugin_directory_path: PathBuf, cache: bool, -) -> Result, ()> { +) -> Result, ()> { info!("Fetching and storing the plugins"); - let mut available_plugins = HashMap::new(); + let mut available_plugins = vec![]; let registries = get_cached_plugins( &base_plugin_directory_path, @@ -92,7 +104,7 @@ async fn get_cached_plugins( base_plugin_directory_path: &Path, config: Config, cache: bool, - available_plugins: &mut HashMap, + available_plugins: &mut Vec<(Uuid, AvailablePlugin)>, ) -> HashMap> { let mut registries = HashMap::new(); @@ -112,17 +124,18 @@ async fn get_cached_plugins( { Ok(cache_check) => { if let Some(plugin_version) = cache_check { - available_plugins.insert( - plugin_uid, + available_plugins.push(( + Uuid::new_v4(), AvailablePlugin { registry_id: registry_id.to_string(), id: plugin_id.to_string(), + user_id: plugin_uid, version: plugin_version, permissions: plugin_options.permissions, environment: plugin_options.environment, settings: plugin_options.settings, }, - ); + )); continue; } @@ -146,7 +159,7 @@ async fn fetch_non_cached_plugins( http_client: Arc, base_plugin_directory_path: &Path, registries: HashMap>, - available_plugins: &mut HashMap, + available_plugins: &mut Vec<(Uuid, AvailablePlugin)>, ) { let mut registry_tasks: RegistryTask = vec![]; @@ -175,8 +188,8 @@ async fn fetch_non_cached_plugins( let mut plugin_directory_path = registry_directory_path.join(plugin_id); let Some(registry_plugin) = registry.plugins.get(plugin_id) else { - return Err(Error::msg(format!("The {registry_id} registry has no {plugin_id} plugin entry", - ))); + bail!("The {registry_id} registry has no {plugin_id} plugin entry", + ); }; let Some(plugin_version) = get_plugin_matching_version( @@ -184,8 +197,8 @@ async fn fetch_non_cached_plugins( ®istry_plugin.versions, )? else { - return Err(Error::msg(format!( - "The {plugin_uid} plugin has no version which isn't marked as deprecated and is compatible with this version of the program"))); + bail!( + "The {plugin_uid} plugin has no version which isn't marked as deprecated and is compatible with this version of the program"); }; plugin_directory_path.push(plugin_version.to_string()); @@ -202,10 +215,11 @@ async fn fetch_non_cached_plugins( .await?; Ok(( - plugin_uid, + Uuid::new_v4(), AvailablePlugin { registry_id: registry_id.to_string(), id: plugin_id.to_string(), + user_id: plugin_uid, version: plugin_version, permissions: plugin_options.permissions, environment: plugin_options.environment, @@ -230,8 +244,7 @@ async fn fetch_non_cached_plugins( match registry_task.await.unwrap() { Ok(available_registry_plugins) => { for available_registry_plugin in available_registry_plugins { - available_plugins - .insert(available_registry_plugin.0, available_registry_plugin.1); + available_plugins.push(available_registry_plugin); } } Err(err) => { @@ -301,7 +314,7 @@ async fn get_plugin_latest_cached_version(plugin_path: &Path) -> Result(®istry_metadata_bytes).map_err(Error::new) + Ok(sonic_rs::from_slice::(®istry_metadata_bytes)?) } async fn fetch_plugin( diff --git a/src/plugins/runtime.rs b/src/plugins/runtime.rs index 45a6459..06d8daa 100644 --- a/src/plugins/runtime.rs +++ b/src/plugins/runtime.rs @@ -3,106 +3,90 @@ pub mod internal; -use std::{ - collections::{HashMap, HashSet}, - fs, - path::Path, - sync::Arc, -}; +use std::{collections::HashMap, fs, path::Path}; +use anyhow::Result; use serde_yaml_ng::Value; -use tokio::sync::{ - Mutex, RwLock, - mpsc::{Receiver, Sender}, - oneshot, +use tokio::{ + sync::{ + Mutex, RwLock, + mpsc::{UnboundedReceiver, UnboundedSender}, + }, + task::JoinHandle, }; -use tokio_util::sync::CancellationToken; use tracing::{error, info}; +use uuid::Uuid; use wasmtime::{Store, component::Component}; use wasmtime_wasi::{DirPerms, FilePerms, ResourceTable, WasiCtxBuilder}; use wasmtime_wasi_http::WasiHttpCtx; use crate::{ - SHUTDOWN, Shutdown, plugins::{ - AvailablePlugin, Plugin, PluginRegistrationRequests, - PluginRegistrationRequestsApplicationCommand, PluginRegistrationRequestsScheduledJob, - PluginRegistrations, builder::PluginBuilder, - discord_bot::plugin::discord_types::Events as DiscordEvents, + AvailablePlugin, Plugin, builder::PluginBuilder, + discord_bot::plugin::discord_export_types::DiscordEvents, runtime::internal::InternalRuntime, }, - utils::channels::{DiscordBotClientMessages, JobSchedulerMessages, RuntimeMessages}, + utils::channels::{ + CoreMessages, RuntimeMessages, RuntimeMessagesDiscord, RuntimeMessagesJobScheduler, + }, }; pub struct Runtime { - plugins: RwLock>, - discord_bot_client_tx: Arc>, - job_scheduler_tx: Arc>, - dbc_js_rx: RwLock>, - pub cancellation_token: CancellationToken, + plugins: RwLock>, + rx: UnboundedReceiver, } pub struct RuntimePlugin { instance: Plugin, - store: Mutex>, // TODO: Add async support, waiting for better WASIp3 component creation support + store: Mutex>, // TODO: Add async support } impl Runtime { - pub fn new( - discord_bot_client_tx: Sender, - job_scheduler_tx: Sender, - dbc_js_rx: Receiver, - ) -> Self { + pub fn new(rx: UnboundedReceiver) -> Self { + info!("Creating the WASI runtime"); + Runtime { plugins: RwLock::new(HashMap::new()), - discord_bot_client_tx: Arc::new(discord_bot_client_tx), - job_scheduler_tx: Arc::new(job_scheduler_tx), - dbc_js_rx: RwLock::new(dbc_js_rx), - cancellation_token: CancellationToken::new(), + rx, } } - pub fn start(runtime: Arc) { + pub fn start(mut self) -> JoinHandle<()> { tokio::spawn(async move { - let mut dbc_js_rx = runtime.dbc_js_rx.write().await; - - tokio::select! { - () = async { - while let Some(message) = dbc_js_rx.recv().await { - match message { - RuntimeMessages::CallDiscordEvent(plugin_name, event) => { - runtime.call_discord_event(&plugin_name, &event).await; + while let Some(message) = self.rx.recv().await { + match message { + RuntimeMessages::JobScheduler(job_scheduler_message) => { + match job_scheduler_message { + RuntimeMessagesJobScheduler::CallScheduledJob(plugin_id, job_id) => { + self.call_scheduled_job(plugin_id, job_id).await; } - RuntimeMessages::CallScheduledJob(plugin_name, scheduled_job_name) => { - runtime.call_scheduled_job(&plugin_name, &scheduled_job_name).await;} } } - } => {} - () = runtime.cancellation_token.cancelled() => { - dbc_js_rx.close(); + RuntimeMessages::Discord(discord_message) => match discord_message { + RuntimeMessagesDiscord::CallDiscordEvent(plugin_id, event) => { + self.call_discord_event(plugin_id, &event).await; + } + }, } } - }); + + self.shutdown().await; + }) } pub async fn initialize_plugins( - runtime: Arc, - plugin_builder: PluginBuilder, - plugins: HashMap, - plugin_registrations: Arc>, - directory: &Path, + &self, + available_plugins: Vec<(Uuid, AvailablePlugin)>, + core_tx: UnboundedSender, + plugin_directory: &Path, ) -> Result<(), ()> { - let mut registration_requests = PluginRegistrationRequests { - discord_event_interaction_create: super::PluginRegistrationRequestsInteractionCreate { - application_commands: vec![], - message_component: vec![], - modals: vec![], - }, - scheduled_jobs: vec![], - }; - - for (plugin_uid, plugin) in plugins { - let plugin_directory = directory + info!("Creating the WASI plugin builder"); + let plugin_builder = PluginBuilder::new(); + + info!("Initializing the plugins"); + + for (plugin_id, plugin) in available_plugins { + let plugin_directory = plugin_directory .join(&plugin.registry_id) .join(&plugin.id) .join(plugin.version.to_string()); @@ -112,7 +96,7 @@ impl Runtime { Err(err) => { error!( "An error occured while reading the {} plugin file: {err}", - plugin_uid + plugin.user_id ); continue; } @@ -123,7 +107,7 @@ impl Runtime { Err(err) => { error!( "An error occured while creating a WASI component from the {} plugin: {err}", - plugin_uid + plugin.user_id ); continue; } @@ -142,15 +126,15 @@ impl Runtime { Ok(exists) => { if !exists && let Err(err) = fs::create_dir(&workspace_plugin_dir) { error!( - "Something went wrong while creating the workspace directory for the {} plugin, error: {}", - &plugin_uid, &err + "Something went wrong while creating the workspace directory for the {} plugin, error: {err}", + plugin.user_id ); } } Err(err) => { error!( - "Something went wrong while checking if the workspace directory of the {} plugin exists, error: {}", - &plugin_uid, &err + "Something went wrong while checking if the workspace directory of the {} plugin exists, error: {err}", + plugin.user_id ); return Err(()); } @@ -165,11 +149,11 @@ impl Runtime { let mut store = Store::::new( &plugin_builder.engine, InternalRuntime::new( - plugin_uid.clone(), + plugin.user_id.clone(), wasi, WasiHttpCtx::new(), ResourceTable::new(), - Arc::downgrade(&runtime), + core_tx.clone(), ), ); @@ -180,36 +164,34 @@ impl Runtime { Ok(instance) => instance, Err(err) => { error!( - "Failed to instantiate the {} plugin, error: {}", - &plugin_uid, &err + "Failed to instantiate the {} plugin, error: {err}", + plugin.user_id ); continue; } }; - let plugin_registrations_request = match instance - .discord_bot_plugin_plugin_functions() + match instance + .discord_bot_plugin_core_export_functions() .call_initialization( &mut store, &sonic_rs::to_vec(&plugin.settings.unwrap_or(Value::default())).unwrap(), - plugin.permissions, ) .await { - Ok(init_result) => match init_result { - Ok(registrations_request) => registrations_request, - Err(err) => { + Ok(init_result) => { + if let Err(err) = init_result { error!( - "Failed to initialize the {} plugin, error: {}", - &plugin_uid, &err + "the {} plugin returned an error while intiializing: {err}", + plugin.user_id ); continue; } - }, + } Err(err) => { error!( - "The {} plugin exprienced a critical error: {}", - &plugin_uid, &err + "The {} plugin exprienced a critical error: {err}", + plugin.user_id ); continue; } @@ -220,268 +202,80 @@ impl Runtime { store: Mutex::new(store), }; - if let Some(discord_events) = plugin_registrations_request.discord_events { - if discord_events.message_create { - plugin_registrations - .write() - .await - .discord_events - .message_create - .push(plugin_uid.clone()); - } - - if discord_events.thread_create { - plugin_registrations - .write() - .await - .discord_events - .thread_create - .push(plugin_uid.clone()); - } - - if discord_events.thread_delete { - plugin_registrations - .write() - .await - .discord_events - .thread_delete - .push(plugin_uid.clone()); - } - - if discord_events.thread_list_sync { - plugin_registrations - .write() - .await - .discord_events - .thread_list_sync - .push(plugin_uid.clone()); - } - - if discord_events.thread_member_update { - plugin_registrations - .write() - .await - .discord_events - .thread_member_update - .push(plugin_uid.clone()); - } - - if discord_events.thread_members_update { - plugin_registrations - .write() - .await - .discord_events - .thread_members_update - .push(plugin_uid.clone()); - } - - if discord_events.thread_update { - plugin_registrations - .write() - .await - .discord_events - .thread_update - .push(plugin_uid.clone()); - } - - if let Some(interaction_create) = discord_events.interaction_create { - if let Some(application_commands) = interaction_create.application_commands { - for application_command in application_commands { - registration_requests - .discord_event_interaction_create - .application_commands - .push(PluginRegistrationRequestsApplicationCommand { - plugin_id: plugin_uid.clone(), - data: application_command, - }); - } - } - - if let Some(message_components) = interaction_create.message_components { - // TODO: Prevent duplicate entries - - for message_component in message_components { - plugin_registrations - .write() - .await - .discord_events - .interaction_create - .message_components - .insert(message_component.clone(), plugin_uid.clone()); - } - } - - if let Some(modals) = interaction_create.modals { - // TODO: Prevent duplicate entries - - for modal in modals { - plugin_registrations - .write() - .await - .discord_events - .interaction_create - .modals - .insert(modal.clone(), plugin_uid.clone()); - } - } - } - } - - if let Some(scheduled_jobs) = plugin_registrations_request.scheduled_jobs { - for scheduled_job in scheduled_jobs { - registration_requests.scheduled_jobs.push( - PluginRegistrationRequestsScheduledJob { - plugin_id: plugin_uid.clone(), - id: scheduled_job.0, - crons: scheduled_job.1, - }, - ); - } - } - - if let Some(dependency_functions) = plugin_registrations_request.dependency_functions { - for dependency_function in dependency_functions { - let mut plugin_registrations = plugin_registrations.write().await; - let functions = plugin_registrations - .dependency_functions - .entry(plugin_uid.clone()) - .or_insert(HashSet::new()); - - functions.insert(dependency_function); - } - } - - runtime - .plugins - .write() - .await - .insert(plugin_uid, plugin_context); + self.plugins.write().await.insert(plugin_id, plugin_context); } - let _ = runtime - .discord_bot_client_tx - .send(DiscordBotClientMessages::RegisterApplicationCommands( - registration_requests - .discord_event_interaction_create - .application_commands, - )) - .await; - - let _ = runtime - .job_scheduler_tx - .send(JobSchedulerMessages::RegisterScheduledJobs( - registration_requests.scheduled_jobs, - )) - .await; - Ok(()) } // TODO: Remove trapped plugins - async fn call_discord_event(&self, plugin_name: &str, event: &DiscordEvents) { + async fn call_discord_event(&self, plugin_id: Uuid, event: &DiscordEvents) { let plugins = self.plugins.read().await; - let plugin = plugins.get(plugin_name).unwrap(); + let plugin = plugins.get(&plugin_id).unwrap(); match plugin .instance - .discord_bot_plugin_plugin_functions() + .discord_bot_plugin_discord_export_functions() .call_discord_event(&mut *plugin.store.lock().await, event) .await { Ok(result) => { if let Err(err) = result { - error!("The {} plugin returned an error: {}", plugin_name, &err); + error!("The {plugin_id} plugin returned an error: {err}"); } } Err(err) => { - error!( - "The {} plugin exprienced a critical error: {}", - plugin_name, &err - ); + error!("The {plugin_id} plugin exprienced a critical error: {err}"); } } } - async fn call_scheduled_job(&self, plugin_name: &str, scheduled_job_name: &str) { + async fn call_scheduled_job(&self, plugin_id: Uuid, uuid: Uuid) { let plugins = self.plugins.read().await; - let plugin = plugins.get(plugin_name).unwrap(); + let plugin = plugins.get(&plugin_id).unwrap(); match plugin .instance - .discord_bot_plugin_plugin_functions() - .call_scheduled_job(&mut *plugin.store.lock().await, scheduled_job_name) + .discord_bot_plugin_job_scheduler_export_functions() + .call_scheduled_job(&mut *plugin.store.lock().await, &uuid.to_string()) .await { Ok(result) => { if let Err(err) = result { - error!("The {} plugin returned an error: {}", plugin_name, &err); + error!("The {plugin_id} plugin returned an error: {err}"); } } Err(err) => { - error!( - "The {} plugin exprienced a critical error: {}", - plugin_name, &err - ); + error!("The {plugin_id} plugin exprienced a critical error: {err}"); } } } - async fn call_shutdown(&self, plugin_name: String) { + async fn call_shutdown(&self, plugin_id: Uuid) { let plugins = self.plugins.read().await; - let plugin = plugins.get(&plugin_name).unwrap(); + let plugin = plugins.get(&plugin_id).unwrap(); match plugin .instance - .discord_bot_plugin_plugin_functions() + .discord_bot_plugin_core_export_functions() .call_shutdown(&mut *plugin.store.lock().await) .await { Ok(result) => { if let Err(err) = result { - error!("The {} plugin returned an error: {}", plugin_name, &err); + error!("The {plugin_id} plugin returned an error: {err}"); } } Err(err) => { - error!( - "The {} plugin exprienced a critical error: {}", - plugin_name, &err - ); + error!("The {plugin_id} plugin exprienced a critical error: {err}"); } } } - pub async fn shutdown(&self, shutdown_type: Shutdown) { - if SHUTDOWN.read().await.is_some() { - // TODO: Do not wait for shutdown to complete, the main function shutdown logic needs to get reworked first - self.cancellation_token.cancelled().await; - return; - } - - *SHUTDOWN.write().await = Some(shutdown_type); - - let job_scheduler_is_done = oneshot::channel(); - let discord_bot_client_is_done = oneshot::channel(); - - info!("Shutting down the job scheduler"); - let _ = self - .job_scheduler_tx - .send(JobSchedulerMessages::Shutdown(job_scheduler_is_done.0)) - .await; - let _ = job_scheduler_is_done.1.await; - - info!("Shutting down the Discord bot client shards"); - let _ = self - .discord_bot_client_tx - .send(DiscordBotClientMessages::Shutdown( - discord_bot_client_is_done.0, - )) - .await; - - let _ = discord_bot_client_is_done.1.await; - - // TODO: Allow all plugin calls to finish, call the shutdown methods on them and only then return - - self.cancellation_token.cancel(); + async fn shutdown(&self) { + // TODO: Allow all plugin calls to finish and then call the shutdown methods + // This will be achieved by closing the plugin call channel tasks which then will call + // shutdown one more time before returning } } diff --git a/src/plugins/runtime/internal.rs b/src/plugins/runtime/internal.rs index 4aef3fb..cf79e57 100644 --- a/src/plugins/runtime/internal.rs +++ b/src/plugins/runtime/internal.rs @@ -1,35 +1,22 @@ /* SPDX-License-Identifier: GPL-3.0-or-later */ /* Copyright © 2026 Eduard Smet */ -use std::sync::Weak; - -use tokio::sync::oneshot; -use tracing::{debug, error, info, trace, warn}; +use tokio::sync::mpsc::UnboundedSender; use wasmtime_wasi::{ResourceTable, WasiCtx, WasiCtxView, WasiView}; use wasmtime_wasi_http::{WasiHttpCtx, WasiHttpView}; -use crate::{ - Shutdown, - plugins::{ - discord_bot::plugin::{ - discord_types::{ - Host as DiscordTypes, Requests as DiscordRequests, Responses as DiscordResponses, - }, - host_functions::Host as HostFunctions, - host_types::{Host as HostTypes, LogLevels}, - plugin_types::Host as PluginTypes, - }, - runtime::Runtime, - }, - utils::channels::DiscordBotClientMessages, -}; +mod core; +mod discord; +mod job_scheduler; + +use crate::utils::channels::CoreMessages; pub struct InternalRuntime { - uid: String, + plugin_uid: String, wasi: WasiCtx, wasi_http: WasiHttpCtx, table: ResourceTable, - runtime: Weak, + core_tx: UnboundedSender, } impl WasiView for InternalRuntime { @@ -51,118 +38,20 @@ impl WasiHttpView for InternalRuntime { } } -impl HostFunctions for InternalRuntime { - async fn log(&mut self, level: LogLevels, message: String) { - match level { - LogLevels::Trace => trace!(message), - LogLevels::Debug => debug!(message), - LogLevels::Info => info!(message), - LogLevels::Warn => warn!(message), - LogLevels::Error => error!(message), - } - } - - async fn discord_request( - &mut self, - request: DiscordRequests, - ) -> Result, String> { - let runtime = self.runtime.upgrade().unwrap(); - - let (tx, rx) = oneshot::channel(); - - if let Err(err) = runtime - .discord_bot_client_tx - .send(DiscordBotClientMessages::Request(request, tx)) - .await - { - let err = format!( - "Something went wrong while sending a message over the Discord channel, error: {err}" - ); - - error!(err); - - return Err(err); - } - - match rx.await { - Ok(result) => result, - Err(err) => { - let err = format!("The OneShot sender was dropped: {err}"); - error!(err); - Err(err) - } - } - } - - async fn dependency_function( - &mut self, - dependency: String, - function: String, - params: Vec, - ) -> Result, String> { - let runtime = self.runtime.upgrade().unwrap(); - - let plugins = runtime.plugins.read().await; - let plugin = plugins.get(&dependency).unwrap(); - - // TODO: Check if it is an actual dependency and prevent deadlocks, the channel rework should fix - // the potential deadlocks. - - match plugin - .instance - .discord_bot_plugin_plugin_functions() - .call_dependency_function(&mut *plugin.store.lock().await, &function, ¶ms) - .await - { - Ok(call_result) => match call_result { - Ok(dependency_result) => Ok(dependency_result), - Err(err) => { - let err = format!("The plugin returned an error: {err}"); - error!(err); - Err(err) - } - }, - Err(err) => { - let err = format!("Something went wrong while calling the plugin: {err}"); - error!(err); - Err(err) - } - } - } - - async fn shutdown(&mut self, restart: bool) { - let shutdown_type = if restart { - Shutdown::Restart - } else { - Shutdown::Normal - }; - - self.runtime - .upgrade() - .unwrap() - .shutdown(shutdown_type) - .await; - } -} - -impl HostTypes for InternalRuntime {} -impl PluginTypes for InternalRuntime {} -impl DiscordTypes for InternalRuntime {} - impl InternalRuntime { pub fn new( - uid: String, + plugin_uid: String, wasi: WasiCtx, wasi_http: WasiHttpCtx, table: ResourceTable, - runtime: Weak, + core_tx: UnboundedSender, ) -> Self { InternalRuntime { - uid, + plugin_uid, wasi, wasi_http, table, - runtime, + core_tx, } } } diff --git a/src/plugins/runtime/internal/core.rs b/src/plugins/runtime/internal/core.rs new file mode 100644 index 0000000..d7dab50 --- /dev/null +++ b/src/plugins/runtime/internal/core.rs @@ -0,0 +1,64 @@ +/* SPDX-License-Identifier: GPL-3.0-or-later */ +/* Copyright © 2026 Eduard Smet */ + +use tracing::{debug, error, info, trace, warn}; + +use crate::{ + Shutdown, + plugins::{ + discord_bot::plugin::{ + core_export_types::Host as CoreExportTypesHost, + core_import_functions::{Error, Host as CoreImportFunctionsHost}, + core_import_types::{ + CoreRegistrations, CoreRegistrationsResult, Host as CoreImportTypesHost, LogLevels, + SupportedCoreRegistrations, + }, + core_types::Host as CoreTypesHost, + }, + runtime::internal::InternalRuntime, + }, + utils::channels::CoreMessages, +}; + +impl CoreTypesHost for InternalRuntime {} +impl CoreImportTypesHost for InternalRuntime {} +impl CoreExportTypesHost for InternalRuntime {} + +impl CoreImportFunctionsHost for InternalRuntime { + async fn get_supported_registrations(&mut self) -> SupportedCoreRegistrations { + todo!(); + } + + async fn register(&mut self, registrations: CoreRegistrations) -> CoreRegistrationsResult { + todo!() + } + + async fn log(&mut self, level: LogLevels, message: String) { + match level { + LogLevels::Trace => trace!(message), + LogLevels::Debug => debug!(message), + LogLevels::Info => info!(message), + LogLevels::Warn => warn!(message), + LogLevels::Error => error!(message), + } + } + + async fn shutdown(&mut self, restart: bool) { + let shutdown_type = if restart { + Shutdown::Restart + } else { + Shutdown::Normal + }; + + self.core_tx.send(CoreMessages::Shutdown(shutdown_type)); + } + + async fn dependency_function( + &mut self, + dependency_id: String, + function_id: String, + params: Vec, + ) -> Result, Error> { + todo!() + } +} diff --git a/src/plugins/runtime/internal/discord.rs b/src/plugins/runtime/internal/discord.rs new file mode 100644 index 0000000..a8345cd --- /dev/null +++ b/src/plugins/runtime/internal/discord.rs @@ -0,0 +1,38 @@ +/* SPDX-License-Identifier: GPL-3.0-or-later */ +/* Copyright © 2026 Eduard Smet */ + +use crate::plugins::{ + discord_bot::plugin::{ + core_import_types::Error, + discord_export_types::Host as DiscordExportTypesHost, + discord_import_functions::Host as DiscordImportFunctionsHost, + discord_import_types::{ + DiscordEvents, DiscordRegistrations, DiscordRegistrationsResult, DiscordRequests, + DiscordResponses, Host as DiscordImportTypesHost, + }, + }, + runtime::internal::InternalRuntime, +}; + +impl DiscordImportTypesHost for InternalRuntime {} +impl DiscordExportTypesHost for InternalRuntime {} + +impl DiscordImportFunctionsHost for InternalRuntime { + async fn get_supported_discord_registrations(&mut self) -> DiscordEvents { + todo!() + } + + async fn discord_register( + &mut self, + registrations: DiscordRegistrations, + ) -> DiscordRegistrationsResult { + todo!() + } + + async fn discord_request( + &mut self, + request: DiscordRequests, + ) -> Result, Error> { + todo!() + } +} diff --git a/src/plugins/runtime/internal/job_scheduler.rs b/src/plugins/runtime/internal/job_scheduler.rs new file mode 100644 index 0000000..f2d73b4 --- /dev/null +++ b/src/plugins/runtime/internal/job_scheduler.rs @@ -0,0 +1,28 @@ +/* SPDX-License-Identifier: GPL-3.0-or-later */ +/* Copyright © 2026 Eduard Smet */ + +use crate::plugins::{ + discord_bot::plugin::{ + job_scheduler_import_functions::Host as JobSchedulerImportFunctionsHost, + job_scheduler_import_types::{ + Host as JobSchedulerImportTypesHost, JobSchedulerRegistrations, + JobSchedulerRegistrationsResult, SupportedJobSchedulerRegistrations, + }, + }, + runtime::internal::InternalRuntime, +}; + +impl JobSchedulerImportTypesHost for InternalRuntime {} + +impl JobSchedulerImportFunctionsHost for InternalRuntime { + async fn get_supported_registrations(&mut self) -> SupportedJobSchedulerRegistrations { + todo!() + } + + async fn register( + &mut self, + registrations: JobSchedulerRegistrations, + ) -> JobSchedulerRegistrationsResult { + todo!() + } +} diff --git a/src/utils/channels.rs b/src/utils/channels.rs index cfa6531..62c0b2a 100644 --- a/src/utils/channels.rs +++ b/src/utils/channels.rs @@ -1,76 +1,124 @@ /* SPDX-License-Identifier: GPL-3.0-or-later */ /* Copyright © 2026 Eduard Smet */ +use anyhow::Result; +use fjall::Slice; use tokio::sync::{ - mpsc::{Receiver as MPSCReceiver, Sender as MPSCSender, channel}, + mpsc::{UnboundedReceiver, UnboundedSender, unbounded_channel}, oneshot::Sender as OSSender, }; +use uuid::Uuid; -use crate::plugins::{ - PluginRegistrationRequestsApplicationCommand, PluginRegistrationRequestsScheduledJob, - discord_bot::plugin::host_functions::{DiscordRequests, DiscordResponses}, - exports::discord_bot::plugin::plugin_functions::DiscordEvents, +use crate::{ + Shutdown, + database::Keyspaces, + plugins::{ + PluginRegistrationRequestsApplicationCommand, + discord_bot::plugin::{ + discord_export_types::DiscordEvents, + discord_import_types::{DiscordRequests, DiscordResponses}, + }, + }, }; -pub enum DiscordBotClientMessages { - RegisterApplicationCommands(Vec), - Request( - DiscordRequests, - OSSender, String>>, - ), - Shutdown(OSSender<()>), +pub enum CoreMessages { + DatabaseModule(DatabaseMessages), + + JobSchedulerModule(JobSchedulerMessages), + DiscordBotClientModule(DiscordBotClientMessages), + + RuntimeModule(RuntimeMessages), + + Shutdown(Shutdown), +} + +pub enum DatabaseMessages { + GetState(Keyspaces, Vec, OSSender>>), + InsertState(Keyspaces, Vec, Vec, OSSender>), + DeleteState(Keyspaces, Vec, OSSender>), + ContainsKey(Keyspaces, Vec, OSSender>), } pub enum JobSchedulerMessages { - RegisterScheduledJobs(Vec), - Shutdown(OSSender<()>), + AddJob(Uuid, String, OSSender>), + RemoveJob(Uuid, OSSender>), +} + +pub enum DiscordBotClientMessages { + RegisterApplicationCommands( + Vec, + OSSender, Vec)>>, + ), + Request(DiscordRequests, OSSender>>), } pub enum RuntimeMessages { - CallDiscordEvent(String, DiscordEvents), - CallScheduledJob(String, String), + JobScheduler(RuntimeMessagesJobScheduler), + Discord(RuntimeMessagesDiscord), +} + +pub enum RuntimeMessagesJobScheduler { + CallScheduledJob(Uuid, Uuid), +} + +pub enum RuntimeMessagesDiscord { + CallDiscordEvent(Uuid, DiscordEvents), } pub struct Channels { - pub discord_bot_client: ChannelsDiscordBotClient, + pub core: ChannelsCore, pub job_scheduler: ChannelsJobScheduler, + pub discord_bot_client: ChannelsDiscordBotClient, pub runtime: ChannelsRuntime, } -pub struct ChannelsDiscordBotClient { - pub sender: MPSCSender, - pub receiver: MPSCReceiver, +pub struct ChannelsCore { + pub job_scheduler_tx: UnboundedSender, + pub discord_bot_client_tx: UnboundedSender, + pub runtime_tx: UnboundedSender, + pub rx: UnboundedReceiver, } pub struct ChannelsJobScheduler { - pub sender: MPSCSender, - pub receiver: MPSCReceiver, + pub core_tx: UnboundedSender, + pub rx: UnboundedReceiver, +} + +pub struct ChannelsDiscordBotClient { + pub core_tx: UnboundedSender, + pub rx: UnboundedReceiver, } pub struct ChannelsRuntime { - pub discord_bot_client_sender: MPSCSender, - pub job_scheduler_sender: MPSCSender, - pub receiver: MPSCReceiver, + pub core_tx: UnboundedSender, + pub rx: UnboundedReceiver, } pub fn new() -> Channels { - let (discord_bot_client_tx, discord_bot_client_rx) = channel::(200); - let (job_scheduler_tx, job_scheduler_rx) = channel::(200); - let (runtime_tx, runtime_rx) = channel::(400); + let (core_tx, core_rx) = unbounded_channel::(); + let (job_scheduler_tx, job_scheduler_rx) = unbounded_channel::(); + let (discord_bot_client_tx, discord_bot_client_rx) = + unbounded_channel::(); + let (runtime_tx, runtime_rx) = unbounded_channel::(); Channels { - discord_bot_client: ChannelsDiscordBotClient { - sender: discord_bot_client_tx, - receiver: discord_bot_client_rx, + core: ChannelsCore { + job_scheduler_tx, + discord_bot_client_tx, + runtime_tx, + rx: core_rx, }, job_scheduler: ChannelsJobScheduler { - sender: job_scheduler_tx, - receiver: job_scheduler_rx, + core_tx: core_tx.clone(), + rx: job_scheduler_rx, + }, + discord_bot_client: ChannelsDiscordBotClient { + core_tx: core_tx.clone(), + rx: discord_bot_client_rx, }, runtime: ChannelsRuntime { - discord_bot_client_sender: runtime_tx.clone(), - job_scheduler_sender: runtime_tx, - receiver: runtime_rx, + core_tx, + rx: runtime_rx, }, } } diff --git a/src/utils/env.rs b/src/utils/env.rs index 13c58a4..fca17b4 100644 --- a/src/utils/env.rs +++ b/src/utils/env.rs @@ -3,38 +3,34 @@ use std::{env, path::Path}; -use tracing::{debug, error, info}; - +use anyhow::{Context, Result, bail}; use dotenvy; +use tracing::{debug, info}; + +pub struct Secrets { + pub discord_bot_client: String, +} -pub fn load_env_file(env_file: &Path) -> Result<(), ()> { +pub fn load_env_file(env_file_path: &Path) -> Result<()> { info!("Loading the env file"); - if let Err(err) = dotenvy::from_path(env_file) { + if let Err(err) = dotenvy::from_path(env_file_path) { if err.not_found() { - debug!("No env file found for the following path: {env_file:?}"); + debug!("No env file found at: {env_file_path:?}"); return Ok(()); } - error!("An error occurred wile trying to load the env file: {err}"); - - return Err(()); + bail!("An error occurred wile trying to load the env file: {err}"); } Ok(()) } -pub fn validate() -> Result { +pub fn get_secrets() -> Result { info!("Validating the environment variables (DISCORD_BOT_CLIENT_TOKEN)"); - if let Ok(value) = env::var("DISCORD_BOT_CLIENT_TOKEN") { - debug!("DISCORD_BOT_CLIENT_TOKEN environment variable was found: {value:.3}... (redacted)"); - - Ok(value) - } else { - error!( - "The DISCORD_BOT_CLIENT_TOKEN environment variable was not set, contains an illegal character ('=' or '0') or was not valid unicode" - ); - Err(()) - } + Ok(Secrets { + discord_bot_client: env::var("DISCORD_BOT_CLIENT_TOKEN") + .context("Failed to load the DISCORD_BOT_CLIENT_TOKEN environment variable")?, + }) } diff --git a/wit/core.wit b/wit/core.wit new file mode 100644 index 0000000..3f22801 --- /dev/null +++ b/wit/core.wit @@ -0,0 +1,65 @@ +interface core-types { + type json = list; +} + +interface core-import-types { + type error = string; + + enum log-levels { + trace, + debug, + info, + warn, + error, + } + + flags supported-core-registrations { + dependency-functions, + shutdown, + } + + /// All registrations are opt in. + record core-registrations { + dependency-functions: option>, + shutdown: option, + } + + /// The `dependency-functions` field returns separate lists of successful and failed IDs. + /// + /// The `shutdown` field returns a result to indicate if the shutdown registration was accepted or not. + record core-registrations-result { + dependency-functions: option, list>>, + shutdown: option, + } +} + +interface core-export-types { + type error = string; +} + +interface core-import-functions { + use core-import-types.{error, log-levels, core-registrations, core-registrations-result, supported-core-registrations}; + + get-supported-registrations: func() -> supported-core-registrations; + register: func(registrations: core-registrations) -> core-registrations-result; + + log: func(level: log-levels, message: string); + + shutdown: func(restart: bool); + + /// The `params` parameter and result ok value are bytes. The serialized format has to be decided between plugins. + dependency-function: func(dependency-id: string, function-id: string, params: list) -> result, error>; +} + +interface core-export-functions { + use core-export-types.{error}; + use core-types.{json}; + + initialization: func(settings: json) -> result<_, error>; + + shutdown: func() -> result<_, error>; + + /// The `params` parameter and result ok value are bytes. The serialized format has to be decided between plugins. + dependency-function: func(function-id: string, params: list) -> result, error>; +} + diff --git a/wit/discord.wit b/wit/discord.wit index d5877b0..3a50e75 100644 --- a/wit/discord.wit +++ b/wit/discord.wit @@ -1,36 +1,63 @@ -interface discord-types { - /// variant data is JSON, check the [Discord Gateway Event docs] for the structures. +interface discord-import-types { + use core-types.{json}; + + type form = list; + + flags discord-events { + message-create, + interaction-create, + thread-create, + thread-delete, + thread-list-sync, + thread-member-update, + thread-members-update, + thread-update, + } + + /// All registrations are opt in. + record discord-registrations { + events: option, + interactions: option, + } + + /// The `application-commands` field its structure can be found in the Discord docs: https://docs.discord.com/developers/interactions/application-commands#application-command-object + record discord-registrations-interactions { + application-commands: option>, + message-components: option>, + modals: option>, + } + + record discord-registrations-result { + events: discord-events, + interactions: discord-registrations-result-interactions, + } + + /// The fields return separate lists of successful and failed IDs. /// - /// [Discord Gateway Event docs]: https://discord.com/developers/docs/events/gateway-events - variant events { - interaction-create(list), - message-create(list), - thread-create(list), - thread-delete(list), - thread-list-sync(list), - thread-member-update(list), - thread-members-update(list), - thread-update(list), - } - - /// variant data last tuple entry might be JSON, check the Discord [Gateway Send Event] and HTTP Resource (like the [Message Resource docs]) docs for the structures. + /// The successful IDs gets mapped to host IDs which the plugin should use and then expect on events. + record discord-registrations-result-interactions { + application-commands: tuple>, list>, + message-components: tuple>, list>, + modals: tuple>, list>, + } + + /// Check the Discord Gateway Send Event and HTTP Resource docs for the structures of the variant data: https://discord.com/developers/docs/events/gateway-events#send-events /// - /// [Gateway Send Event]: https://discord.com/developers/docs/events/gateway-events#send-events - /// [Message Resource]: https://discord.com/developers/docs/resources/message - variant requests { + /// Message Resource Example: https://discord.com/developers/docs/resources/message + variant discord-requests { // Shard message sender commands - request-guild-members(tuple>), + request-guild-members(tuple), request-soundboard-sounds(list), - update-voice-state(tuple>), - update-presence(list), + update-voice-state(tuple), + update-presence(json), // HTTP requests add-thread-member(tuple), - create-ban(tuple>), - create-forum-thread(tuple), - create-message(tuple), - create-thread(tuple>), - create-thread-from-message(tuple>), + create-ban(tuple), + create-forum-thread(tuple), + create-message(tuple), + create-thread(tuple), + create-thread-from-message(tuple), delete-message(tuple), get-active-threads(u64), get-channel(u64), @@ -39,20 +66,54 @@ interface discord-types { get-public-archived-threads(tuple, u64, option>), get-thread-member(tuple), get-thread-members(tuple, u64, option, option>), - interaction-callback(tuple>), + interaction-callback(tuple), join-thread(u64), leave-thread(u64), remove-thread-member(tuple), - update-member(tuple>), - update-interaction-original(tuple>), + update-member(tuple), + update-interaction-original(tuple), } - // variant data is either a JSON body or a multipart form buffer. - variant contents { - json(list), - form(list), + variant body { + json(json), + form(form), } - /// responses is JSON. - type responses = list; + type discord-responses = json; +} + +interface discord-export-types { + use core-types.{json}; + + /// Check the Discord Gateway Event docs for the structures of the variant data: https://discord.com/developers/docs/events/gateway-events + variant discord-events { + interaction-create(json), + message-create(json), + thread-create(json), + thread-delete(json), + thread-list-sync(json), + thread-member-update(json), + thread-members-update(json), + thread-update(json), + } +} + +interface discord-import-functions { + use core-import-types.{error}; + use discord-import-types.{discord-events, discord-registrations, discord-registrations-result, discord-requests, discord-responses}; + + get-supported-discord-registrations: func() -> discord-events; + /// Application Command registrations past the initialization phase are ignored. + /// + /// See the following Discord docs for more information: https://docs.discord.com/developers/interactions/application-commands#registering-a-command + discord-register: func(registrations: discord-registrations) -> discord-registrations-result; + + discord-request: func(request: discord-requests) -> result, error>; +} + +interface discord-export-functions { + use core-export-types.{error}; + use discord-export-types.{discord-events}; + + discord-event: func(event: discord-events) -> result<_, error>; } diff --git a/wit/host.wit b/wit/host.wit deleted file mode 100644 index 503c03e..0000000 --- a/wit/host.wit +++ /dev/null @@ -1,42 +0,0 @@ -/// TODO: Implement the registrations-request host function, blocked by missing component -/// information on host calls, fixed by adding Store context to host functions. -/// -/// Relevant Zulip discussion: https://bytecodealliance.zulipchat.com/#narrow/channel/217126-wasmtime/topic/Support.20for.20identifying.20the.20guest.20making.20a.20host.20call/with/571099564 -interface host-types { - //record registrations-result { - // discord-events: registrations-result-discord-events-interaction-create, - // scheduled-jobs: list>, - // dependency-functions: list>, - //} - - //record registrations-result-discord-events-interaction-create { - // application-commands: list>, - // message-components: list>, - // modals: list>, - //} - - enum log-levels { - trace, - debug, - info, - warn, - error, - } -} - -interface host-functions { - use host-types.{log-levels}; - use discord-types.{requests as discord-requests, responses as discord-responses}; - - /// Can only be called during the initialization plugin call, calls at other times will be ignored. - //registrations-request: func(registrations: registrations) -> result; - - log: func(level: log-levels, message: string); - - discord-request: func(request: discord-requests) -> result, string>; - - /// params, result Ok and Err are JSON. - dependency-function: func(dependency: string, function: string, params: list) -> result, string>; - - shutdown: func(restart: bool); -} diff --git a/wit/job-scheduler.wit b/wit/job-scheduler.wit new file mode 100644 index 0000000..5b3a0ac --- /dev/null +++ b/wit/job-scheduler.wit @@ -0,0 +1,36 @@ +interface job-scheduler-import-types { + flags supported-job-scheduler-registrations { + scheduled-jobs, + } + + /// All registrations are opt in. + /// + /// The `scheduled-jobs` field list entries should exist out of cron values. + /// + /// cron: https://en.wikipedia.org/wiki/Cron + record job-scheduler-registrations { + scheduled-jobs: list, + } + + /// The `scheduled-jobs` field returns separate lists of successful and failed cron values. + record job-scheduler-registrations-result { + scheduled-jobs: tuple, list>, + } +} + +interface job-scheduler-export-types { + +} + +interface job-scheduler-import-functions { + use job-scheduler-import-types.{job-scheduler-registrations, job-scheduler-registrations-result, supported-job-scheduler-registrations}; + + get-supported-registrations: func() -> supported-job-scheduler-registrations; + register: func(registrations: job-scheduler-registrations) -> job-scheduler-registrations-result; +} + +interface job-scheduler-export-functions { + use core-export-types.{error}; + + scheduled-job: func(job-id: string) -> result<_, error>; +} diff --git a/wit/plugin.wit b/wit/plugin.wit deleted file mode 100644 index e967d1a..0000000 --- a/wit/plugin.wit +++ /dev/null @@ -1,64 +0,0 @@ -/// Down the line once map gets introduced to the component model -/// mutliple plugin types will rely on it instead of list> or list -/// to enforce uniqueness on the plugin level. -/// [Relevant PR] in the component model repository. -/// -/// [Relevant PR]: https://github.com/WebAssembly/component-model/pull/554 -interface plugin-types { - /// scheduled-jobs: tuple entry 0 is the id and entry 1 is a list of cron - /// values. - record registrations-request { - discord-events: option, - scheduled-jobs: option>>>, - dependency-functions: option>, - } - - record registrations-request-discord-events { - interaction-create: option, - message-create: bool, - thread-create: bool, - thread-delete: bool, - thread-list-sync: bool, - thread-member-update: bool, - thread-members-update: bool, - thread-update: bool, - } - - /// application-commands: tuple entry 0 is the ID and entry 1 is - /// JSON, check the [Discord Application Command docs] for the structure. - /// - /// [Discord Application Command docs]: https://discord.com/developers/docs/interactions/application-commands#application-command-object-application-command-structure - record registrations-request-interaction-create { - application-commands: option>>, - message-components: option>, - modals: option>, - } - - flags supported-registrations { - dependency-functions, - discord-event-message-create, - discord-event-interaction-create, - discord-event-thread-create, - discord-event-thread-delete, - discord-event-thread-list-sync, - discord-event-thread-member-update, - discord-event-thread-members-update, - discord-event-thread-update, - scheduled-jobs, - shutdown, - } -} - -interface plugin-functions { - use plugin-types.{registrations-request, supported-registrations}; - use discord-types.{events as discord-events, requests as discord-requests}; - - /// settings is JSON. - initialization: func(settings: list, supported-registrations: supported-registrations) -> result; - shutdown: func() -> result<_, string>; - - discord-event: func(event: discord-events) -> result<_, string>; - scheduled-job: func(job: string) -> result<_, string>; - /// params and result Ok are JSON. - dependency-function: func(function: string, params: list) -> result, string>; -} diff --git a/wit/world.wit b/wit/world.wit index 14b9017..b454c7b 100644 --- a/wit/world.wit +++ b/wit/world.wit @@ -1,7 +1,18 @@ package discord-bot:plugin@0.1.0; +/// Down the line once map gets introduced to the component model +/// multiple plugin types will rely on it instead of list> or list +/// to enforce uniqueness on the plugin level. +/// +/// Relevant PR in the component model repository: https://github.com/WebAssembly/component-model/pull/554 +/// Relevant PR in the Wasmtime repository: https://github.com/bytecodealliance/wasmtime/pull/12216 world plugin { - import host-functions; + import core-import-functions; + export core-export-functions; - export plugin-functions; + import job-scheduler-import-functions; + export job-scheduler-export-functions; + + import discord-import-functions; + export discord-export-functions; }