Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 34 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ edition = "2024"
[dependencies]
anyhow = "1.0.101"
async-trait = "0.1"
axum = "0.8.8"
axum = { version = "0.8.8", features = ["ws"] }
chrono = { version = "0.4", features = ["serde"] }
dotenv = "0.15.0"
poise = "0.6.1"
Expand Down
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,20 @@

https://discord.com/oauth2/authorize?client_id=1367651749262921868&permissions=277025458240&integration_type=0&scope=bot

## Create your own

You can easily create your own version of this bot.

It needs the premissions in 0Auth2:

- Bot
- Create commands
- Send messages
- Send messages in Threads
- Read Message History
- Add Reactions
- Use Appliation Commands

## How to run

Make sure that you have copied `.env.example` to `.env` and filled in the missing enviornment variables.
Expand Down
12 changes: 12 additions & 0 deletions justfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
set dotenv-load

dev:
cargo run

up:
docker compose up redis postgres -d

down:
docker compose down

test:
cargo test -- --test-threads=1

migrate:
sqlx migrate run --source migrations --database-url "$DATABASE_URL"
53 changes: 50 additions & 3 deletions src/adapters/http/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
use axum::{
extract::{
ws::{Message, WebSocket, WebSocketUpgrade},
Path, State,
},
response::Response,
routing::{any, get},
Json, Router,
extract::{Path, State},
routing::get,
};
use tower::ServiceBuilder;
use tower_http::trace::TraceLayer;
use tracing::info;

use std::{io, sync::Arc};

use crate::domain::{OrderRepository, QueueEntry, QueueRepository};
use crate::domain::{OrderRepository, QueueEntry, QueueEvent, QueueRepository};

#[derive(Clone)]
pub struct AppState {
Expand All @@ -35,7 +39,9 @@ impl HttpAdapter {
});

let app = Router::new()
.route("/{guild_id}/status", get(queue_status))
.route("/{guild_id}/queue", get(list_queue))
.route("/{guild_id}/queue/ws", any(list_queue_ws))
.layer(ServiceBuilder::new().layer(TraceLayer::new_for_http()))
.with_state(state);

Expand All @@ -46,10 +52,51 @@ impl HttpAdapter {
}
}

async fn queue_status(State(state): State<Arc<AppState>>, Path(guild_id): Path<String>) -> String {
let is_open = state.queue.is_open(&guild_id);
let status = if is_open { "open" } else { "closed" };
status.to_string()
}

async fn list_queue(
State(state): State<Arc<AppState>>,
Path(guild_id): Path<String>,
) -> Json<Vec<QueueEntry>> {
let queue = state.queue.list(&guild_id).await;
Json(queue)
}

async fn list_queue_ws(
State(state): State<Arc<AppState>>,
Path(guild_id): Path<String>,
ws: WebSocketUpgrade,
) -> Response {
ws.on_upgrade(move |socket| list_queue_ws_handler(state, guild_id, socket))
}

async fn list_queue_ws_handler(state: Arc<AppState>, guild_id: String, mut socket: WebSocket) {
info!("new websocket connection for guild_id: {}", guild_id);

let mut rx = state.queue.subscribe();

// Initial state
let queue = state.queue.list(&guild_id).await;
let msg = serde_json::to_string(&queue).unwrap();
if socket.send(Message::Text(msg.into())).await.is_err() {
return;
}

while let Ok(event) = rx.recv().await {
match event {
QueueEvent::Updated { guild_id: gid } => {
if gid == guild_id {
let queue = state.queue.list(&guild_id).await;
let msg = serde_json::to_string(&queue).unwrap();
if socket.send(Message::Text(msg.into())).await.is_err() {
break;
}
}
}
}
}
}
2 changes: 1 addition & 1 deletion src/domain/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@ pub mod order;
pub mod queue;

pub use order::{DailyStats, OrderRepository};
pub use queue::{QueueEntry, QueueRepository};
pub use queue::{QueueEntry, QueueEvent, QueueRepository};
8 changes: 8 additions & 0 deletions src/domain/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ impl QueueEntry {
}
}

#[derive(Debug, Clone)]
pub enum QueueEvent {
Updated { guild_id: String },
}

#[async_trait::async_trait]
pub trait QueueRepository: Send + Sync {
/// Open the queue to allow new entries
Expand Down Expand Up @@ -47,4 +52,7 @@ pub trait QueueRepository: Send + Sync {

/// Clear the queue
async fn clear(&self, guild_id: &str);

/// Subscribe to queue change events
fn subscribe(&self) -> tokio::sync::broadcast::Receiver<QueueEvent>;
}
34 changes: 33 additions & 1 deletion src/infrastructure/redis_queue_repository.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use std::{collections::HashSet, sync::RwLock};

use redis::AsyncCommands;
use tokio::sync::broadcast;
use tracing::{debug, error, info, instrument, warn};

use crate::domain::{QueueEntry, QueueRepository};
use crate::domain::{QueueEntry, QueueEvent, QueueRepository};

fn queue_key(guild_id: &str) -> String {
format!("queue:{guild_id}")
Expand All @@ -12,13 +13,16 @@ fn queue_key(guild_id: &str) -> String {
pub struct RedisQueueRepository {
redis: redis::Client,
open_guilds: RwLock<HashSet<String>>,
event_tx: broadcast::Sender<QueueEvent>,
}

impl RedisQueueRepository {
pub fn new(redis: redis::Client) -> Self {
let (event_tx, _) = broadcast::channel(64);
Self {
redis,
open_guilds: RwLock::new(HashSet::new()),
event_tx,
}
}
}
Expand All @@ -39,6 +43,7 @@ impl QueueRepository for RedisQueueRepository {
info!(guild_id, "Closing queue for guild");
self.open_guilds.write().unwrap().remove(guild_id);
self.clear(guild_id).await;
self.broadcast_update(guild_id);
}

#[instrument(skip(self), fields(guild_id))]
Expand Down Expand Up @@ -111,6 +116,7 @@ impl QueueRepository for RedisQueueRepository {
0
});
info!(guild_id, user_id = %entry.user_id, queue_size = new_size, "Added user to queue");
self.broadcast_update(guild_id);
new_size
}

Expand All @@ -134,6 +140,9 @@ impl QueueRepository for RedisQueueRepository {
Some(e) => info!(guild_id, user_id = %e.user_id, "Popped user from queue"),
None => debug!(guild_id, "No entry to pop from queue"),
}
if entry.is_some() {
self.broadcast_update(guild_id);
}
entry
}

Expand All @@ -159,6 +168,9 @@ impl QueueRepository for RedisQueueRepository {
.filter_map(|json_str| serde_json::from_str(&json_str).ok())
.collect();
info!(guild_id, count = entries.len(), "Popped entries from queue");
if !entries.is_empty() {
self.broadcast_update(guild_id);
}
entries
}

Expand Down Expand Up @@ -201,6 +213,26 @@ impl QueueRepository for RedisQueueRepository {
} else {
error!(guild_id, "Failed to get Redis connection for clear");
}
self.broadcast_update(guild_id);
}

fn subscribe(&self) -> tokio::sync::broadcast::Receiver<QueueEvent> {
self.event_tx.subscribe()
}
}

impl RedisQueueRepository {
fn broadcast_update(&self, guild_id: &str) {
if let Err(err) = self.event_tx.send(QueueEvent::Updated {
guild_id: guild_id.to_string(),
}) {
error!(
guild_id,
"error" = ?err,
"guild_id" = guild_id,
"Failed to broadcast queue update event"
);
}
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::env;

use tracing::{error, info};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use vaffelbot_rs::{VaffelBot, config::Config};
use vaffelbot_rs::{config::Config, VaffelBot};

#[tokio::main]
async fn main() {
Expand Down