From c3dc15246c1e447ce00211315c4c65f56fb829bc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=E9=9F=8B=E6=98=8E?=
Date: Thu, 16 Apr 2026 22:23:43 +0800
Subject: [PATCH] feat: outbound `![alt](/path)` attachments (opt-in, config-driven, rate-limited)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Closes #298. Addresses security items 1-5 in #355 (the follow-up requirements issue filed against the previous attempt in #300).

## What this ships

Agents running through OpenAB can now include `![alt](/path/to/file)` markdown in their responses. OpenAB detects the marker, validates the path, uploads the file as a native chat attachment, and strips the marker from the displayed text. The feature is disabled by default — operators must opt in via `[outbound]` in `config.toml`.

## Architecture (adapter-agnostic)

- `src/media.rs::extract_outbound_attachments(text, &OutboundConfig)` — platform-independent extraction + validation.
- `src/outbound_rate.rs::OutboundRateLimiter` — per-channel sliding 60-second window enforcing `max_per_minute_per_channel`.
- `ChatAdapter::send_file_attachments(channel, paths)` — new trait method with a default no-op. `DiscordAdapter` overrides to upload via serenity `CreateAttachment::path()` + `CreateMessage::add_file()`. Slack / future platforms can override as needed.
- `AdapterRouter::stream_prompt` — just before the final text edit: extract markers → apply rate limit; once the final text has been sent: call `send_file_attachments`.
## Security items from #355

| # | Item | Implementation |
|---|---|---|
| 1 | Symlink resolution | `std::fs::canonicalize()` before allowlist check |
| 2 | Path traversal (`..`) | same canonicalize resolves `..` components |
| 3 | Configurable `allowed_dirs` | `[outbound].allowed_dirs` in config.toml |
| 4 | Opt-in (`enabled = false` default) | `OutboundConfig::default().enabled == false` |
| 5 | Rate limiting | per-message cap + per-minute sliding-window per channel |

Allowlist is canonicalized at send time so macOS's `/tmp → /private/tmp` symlink causes no spurious blocks. Comparison is component-wise via `Path::starts_with`, not string prefix.

Reply text for attempts that fail validation (blocked, too large, missing) keeps the marker in the text so the user can see what was tried.

## Tests

9 new in `media::outbound_tests`:
- disabled_by_default_is_noop
- enabled_extracts_tmp_file
- blocks_non_allowlisted_path
- blocks_symlink_escape
- blocks_path_traversal
- respects_custom_allowed_dirs
- enforces_max_file_size_mb
- enforces_max_per_message
- strips_multiple_valid_markers

5 new in `outbound_rate::tests`:
- admits_up_to_limit_then_blocks_within_window
- partial_admit_when_some_remaining_slots
- channels_are_independent
- zero_requested_or_zero_limit_grants_zero
- prunes_entries_older_than_window

All 54 tests pass. Clippy clean with `-D warnings -A clippy::manual_contains`.
## Config

```toml
[outbound]
enabled = false
allowed_dirs = ["/tmp/", "/var/folders/"]
max_file_size_mb = 25  # 25 MB
max_per_message = 10
max_per_minute_per_channel = 30
```

Co-Authored-By: Claude Opus 4.6 (1M context)
--- Cargo.lock | 2 +- config.toml.example | 17 +++ src/adapter.rs | 65 +++++++++- src/config.rs | 71 +++++++++++ src/discord.rs | 26 +++- src/main.rs | 7 +- src/media.rs | 297 ++++++++++++++++++++++++++++++++++++++++++- src/outbound_rate.rs | 125 ++++++++++++++++++ 8 files changed, 603 insertions(+), 7 deletions(-) create mode 100644 src/outbound_rate.rs diff --git a/Cargo.lock b/Cargo.lock index 258ce02..1f5dc28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -960,7 +960,7 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "openab" -version = "0.7.6" +version = "0.7.7" dependencies = [ "anyhow", "async-trait", diff --git a/config.toml.example b/config.toml.example index 927db02..f965c56 100644 --- a/config.toml.example +++ b/config.toml.example @@ -81,3 +81,20 @@ stall_soft_ms = 10000 stall_hard_ms = 30000 done_hold_ms = 1500 error_hold_ms = 2500 + +# Outbound file attachments — when enabled, agents that include +# `![alt](/path/to/file)` markdown in their replies will have the file +# uploaded as a native chat attachment and the marker stripped from the +# displayed text. +# +# This feature is opt-in because it opens a path from the host filesystem +# to the chat channel. Enable only after verifying your deployment's +# threat model accepts it. See openabdev/openab#355 for the security +# rationale. 
+# +# [outbound] +# enabled = false +# allowed_dirs = ["/tmp/", "/var/folders/"] # canonicalized at send time +# max_file_size_mb = 25 # 25 MB matches Discord's limit +# max_per_message = 10 # per agent response +# max_per_minute_per_channel = 30 # sliding window flood guard diff --git a/src/adapter.rs b/src/adapter.rs index 2c2d096..8f6bb97 100644 --- a/src/adapter.rs +++ b/src/adapter.rs @@ -6,9 +6,10 @@ use tokio::sync::watch; use tracing::error; use crate::acp::{classify_notification, AcpEvent, ContentBlock, SessionPool}; -use crate::config::ReactionsConfig; +use crate::config::{OutboundConfig, ReactionsConfig}; use crate::error_display::{format_coded_error, format_user_error}; use crate::format; +use crate::outbound_rate::OutboundRateLimiter; use crate::reactions::StatusReactionController; // --- Platform-agnostic types --- @@ -73,6 +74,18 @@ pub trait ChatAdapter: Send + Sync + 'static { /// Remove a reaction/emoji from a message. async fn remove_reaction(&self, msg: &MessageRef, emoji: &str) -> Result<()>; + + /// Upload file attachments as follow-up messages in `channel`. Called + /// after the final text edit when the agent produced valid + /// `![alt](/path)` markers. Default no-op so adapters that don't support + /// native file upload silently drop the list instead of erroring. 
+ async fn send_file_attachments( + &self, + _channel: &ChannelRef, + _paths: &[std::path::PathBuf], + ) -> Result<()> { + Ok(()) + } } // --- AdapterRouter --- @@ -82,13 +95,21 @@ pub trait ChatAdapter: Send + Sync + 'static { pub struct AdapterRouter { pool: Arc, reactions_config: ReactionsConfig, + outbound_config: OutboundConfig, + outbound_rate: Arc, } impl AdapterRouter { - pub fn new(pool: Arc, reactions_config: ReactionsConfig) -> Self { + pub fn new( + pool: Arc, + reactions_config: ReactionsConfig, + outbound_config: OutboundConfig, + ) -> Self { Self { pool, reactions_config, + outbound_config, + outbound_rate: Arc::new(OutboundRateLimiter::new()), } } @@ -210,6 +231,10 @@ impl AdapterRouter { let thread_channel = thread_channel.clone(); let msg_ref = thinking_msg.clone(); let message_limit = adapter.message_limit(); + let outbound_cfg = self.outbound_config.clone(); + let outbound_rate = Arc::clone(&self.outbound_rate); + let outbound_channel_key = + format!("{}:{}", adapter.platform(), &thread_channel.channel_id); self.pool .with_connection(thread_key, |conn| { @@ -355,6 +380,33 @@ impl AdapterRouter { final_content }; + // Extract outbound `![alt](/path)` attachment markers + // from the agent's reply. No-op when `outbound.enabled` + // is false (the default). See src/media.rs for the + // canonicalization + allowlist + size-cap validation. + let (final_content, mut outbound_paths) = + crate::media::extract_outbound_attachments(&final_content, &outbound_cfg); + + // Per-channel sliding-window rate limit. Drops any + // excess beyond `max_per_minute_per_channel`. 
+ if !outbound_paths.is_empty() && outbound_cfg.enabled { + let grant = outbound_rate.admit( + &outbound_channel_key, + outbound_paths.len(), + outbound_cfg.max_per_minute_per_channel, + ); + if grant < outbound_paths.len() { + tracing::warn!( + channel = outbound_channel_key, + requested = outbound_paths.len(), + granted = grant, + limit_per_min = outbound_cfg.max_per_minute_per_channel, + "outbound: rate-limit hit, dropping excess" + ); + outbound_paths.truncate(grant); + } + } + let chunks = format::split_message(&final_content, message_limit); let mut current_msg = msg_ref; for (i, chunk) in chunks.iter().enumerate() { @@ -367,6 +419,15 @@ impl AdapterRouter { } } + if !outbound_paths.is_empty() { + if let Err(e) = adapter + .send_file_attachments(&thread_channel, &outbound_paths) + .await + { + tracing::warn!(error = %e, "outbound: send_file_attachments failed"); + } + } + Ok(()) }) }) diff --git a/src/config.rs b/src/config.rs index 46efb1c..e823a55 100644 --- a/src/config.rs +++ b/src/config.rs @@ -42,6 +42,77 @@ pub struct Config { pub reactions: ReactionsConfig, #[serde(default)] pub stt: SttConfig, + #[serde(default)] + pub outbound: OutboundConfig, +} + +/// Controls outbound file attachments — the `![alt](/path)` markdown marker +/// in agent responses that instructs the bot to upload a local file as a +/// native chat attachment. Disabled by default; operators must explicitly +/// opt in because this opens a path from the host filesystem to the chat +/// channel. +/// +/// See openabdev/openab#298 for the feature rationale and openabdev/openab#355 +/// for the security requirements this config implements. +#[derive(Debug, Clone, Deserialize)] +pub struct OutboundConfig { + /// Master switch. Defaults to `false` so shipping this feature cannot + /// surprise existing deployments. + #[serde(default)] + pub enabled: bool, + /// Directories from which agents may send files. 
An outbound path must + /// canonicalize (symlinks + `..` resolved) to live under one of these + /// prefixes. Defaults to `["/tmp/", "/var/folders/"]` to preserve + /// behavior for operators upgrading from the prior hard-coded list. + #[serde(default = "default_outbound_allowed_dirs")] + pub allowed_dirs: Vec, + /// Cap on file size per attachment, in megabytes. Discord's native + /// upload limit is 25 MB; Slack is 1 GB. Default matches Discord so + /// the feature is platform-safe out of the box. + #[serde(default = "default_outbound_max_size_mb")] + pub max_file_size_mb: u64, + /// Cap on attachments per single agent response. Guards against a single + /// agent message fanning out into hundreds of uploads. + #[serde(default = "default_outbound_max_per_message")] + pub max_per_message: usize, + /// Sliding-window cap on attachments per channel per minute. Guards + /// against a malfunctioning agent flooding a channel. + #[serde(default = "default_outbound_max_per_minute")] + pub max_per_minute_per_channel: usize, +} + +impl Default for OutboundConfig { + fn default() -> Self { + Self { + enabled: false, + allowed_dirs: default_outbound_allowed_dirs(), + max_file_size_mb: default_outbound_max_size_mb(), + max_per_message: default_outbound_max_per_message(), + max_per_minute_per_channel: default_outbound_max_per_minute(), + } + } +} + +impl OutboundConfig { + /// Return the size cap as bytes for internal comparison. Config is + /// expressed in MB for human ergonomics; callers that need to compare + /// against `std::fs::Metadata::len()` use this. 
+ pub fn max_size_bytes(&self) -> u64 { + self.max_file_size_mb.saturating_mul(1024 * 1024) + } +} + +fn default_outbound_allowed_dirs() -> Vec { + vec!["/tmp/".into(), "/var/folders/".into()] +} +fn default_outbound_max_size_mb() -> u64 { + 25 +} +fn default_outbound_max_per_message() -> usize { + 10 +} +fn default_outbound_max_per_minute() -> usize { + 30 } #[derive(Debug, Clone, Deserialize)] diff --git a/src/discord.rs b/src/discord.rs index ea02b7c..c953dba 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -5,7 +5,7 @@ use crate::format; use crate::media; use async_trait::async_trait; use std::sync::LazyLock; -use serenity::builder::{CreateThread, EditMessage}; +use serenity::builder::{CreateAttachment, CreateMessage, CreateThread, EditMessage}; use serenity::http::Http; use serenity::model::channel::{AutoArchiveDuration, Message, ReactionType}; use serenity::model::gateway::Ready; @@ -112,6 +112,30 @@ impl ChatAdapter for DiscordAdapter { .await?; Ok(()) } + + async fn send_file_attachments( + &self, + channel: &ChannelRef, + paths: &[std::path::PathBuf], + ) -> anyhow::Result<()> { + let ch_id: u64 = channel.channel_id.parse()?; + for path in paths { + match CreateAttachment::path(path).await { + Ok(file) => { + let msg = CreateMessage::new().add_file(file); + if let Err(e) = ChannelId::new(ch_id).send_message(&self.http, msg).await { + tracing::warn!(path = %path.display(), error = %e, "outbound: discord upload failed"); + } else { + info!(path = %path.display(), "outbound: attachment sent"); + } + } + Err(e) => { + tracing::warn!(path = %path.display(), error = %e, "outbound: failed to read file"); + } + } + } + Ok(()) + } } // --- Handler: serenity EventHandler that delegates to AdapterRouter --- diff --git a/src/main.rs b/src/main.rs index 52b986d..0efa35b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,6 +5,7 @@ mod discord; mod error_display; mod format; mod media; +mod outbound_rate; mod reactions; mod setup; mod slack; @@ -95,7 +96,11 @@ async fn 
main() -> anyhow::Result<()> { info!(model = %cfg.stt.model, base_url = %cfg.stt.base_url, "STT enabled"); } - let router = Arc::new(AdapterRouter::new(pool.clone(), cfg.reactions)); + let router = Arc::new(AdapterRouter::new( + pool.clone(), + cfg.reactions, + cfg.outbound, + )); // Shutdown signal for Slack adapter let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); diff --git a/src/media.rs b/src/media.rs index 709f788..8c0da87 100644 --- a/src/media.rs +++ b/src/media.rs @@ -1,11 +1,12 @@ use crate::acp::ContentBlock; -use crate::config::SttConfig; +use crate::config::{OutboundConfig, SttConfig}; use base64::engine::general_purpose::STANDARD as BASE64; use base64::Engine; use image::ImageReader; use std::io::Cursor; +use std::path::PathBuf; use std::sync::LazyLock; -use tracing::{debug, error}; +use tracing::{debug, error, info, warn}; /// Reusable HTTP client for downloading attachments (shared across adapters). pub static HTTP_CLIENT: LazyLock = LazyLock::new(|| { @@ -264,3 +265,295 @@ mod tests { assert!(resize_and_compress(&garbage).is_err()); } } + +// --- Outbound attachments --- +// +// Implements the agent → chat file upload pathway requested in +// openabdev/openab#298 and the security hardening in openabdev/openab#355. +// Agents write `![alt](/path)` markdown in their response; this module +// extracts, validates, and surfaces paths to the adapter layer (which +// uploads each file as a native chat attachment). + +/// Regex for outbound attachment markers: `![alt](/path/to/file)`. +static OUTBOUND_RE: LazyLock = LazyLock::new(|| { + regex::Regex::new(r"!\[[^\]]*\]\((/[^\)]+)\)").unwrap() +}); + +/// Canonicalize the configured allowlist once at first use. On macOS +/// `/tmp` is a symlink to `/private/tmp`; canonicalizing means later +/// `Path::starts_with` checks compare canonical ↔ canonical. +/// +/// Returned as `Vec`. Entries that fail to canonicalize +/// (missing directory on this host) are dropped silently. 
+fn canonicalize_allowlist(dirs: &[String]) -> Vec { + dirs.iter() + .filter_map(|p| match std::fs::canonicalize(p) { + Ok(canon) => Some(canon), + Err(e) => { + warn!(dir = p, error = %e, "outbound: allowed dir not canonicalizable, dropping"); + None + } + }) + .collect() +} + +/// Scan agent response `text` for `![alt](/path)` markers, validate each +/// path against `config`, and return `(cleaned_text, list_of_paths)`. +/// +/// Validation rules (all must pass): +/// 1. Canonicalization succeeds — resolves symlinks AND `..` components +/// (closes symlink escape and path traversal attacks). +/// 2. Canonical path lives under one of `config.allowed_dirs` +/// (compared component-wise with `Path::starts_with`, not string prefix). +/// 3. The resolved target is a regular file and ≤ `config.max_size_bytes`. +/// 4. Total attachments per call capped at `config.max_per_message` +/// (extras are dropped with a warning; the markers stay in the text so +/// users see what was attempted). +/// +/// If `config.enabled` is false this function is a no-op — the input +/// text is returned untouched and no paths are extracted. +/// +/// Markers for files that pass every check are stripped from the cleaned +/// text. Markers that fail (blocked, too large, missing) stay in the text +/// so the user sees what the agent tried to send. 
+pub fn extract_outbound_attachments( + text: &str, + config: &OutboundConfig, +) -> (String, Vec) { + if !config.enabled { + return (text.to_string(), Vec::new()); + } + let allowlist = canonicalize_allowlist(&config.allowed_dirs); + if allowlist.is_empty() { + warn!("outbound: enabled but every allowed_dir failed to canonicalize; refusing to send"); + return (text.to_string(), Vec::new()); + } + + let mut attachments = Vec::new(); + let mut paths_to_strip = Vec::new(); + let mut over_cap_dropped = 0usize; + + for cap in OUTBOUND_RE.captures_iter(text) { + if attachments.len() >= config.max_per_message { + over_cap_dropped += 1; + continue; + } + + let full_match = cap.get(0).unwrap().as_str(); + let path_str = &cap[1]; + let path = PathBuf::from(path_str); + + // Rule 1: canonicalize first. Resolves symlinks AND `..` components. + let canonical = match std::fs::canonicalize(&path) { + Ok(p) => p, + Err(e) => { + debug!(path = %path_str, error = %e, "outbound: cannot canonicalize"); + continue; + } + }; + + // Rule 2: component-wise allowlist check on canonical path. + let allowed = allowlist.iter().any(|prefix| canonical.starts_with(prefix)); + if !allowed { + warn!( + path = %path_str, + canonical = %canonical.display(), + "outbound: path not in allowed_dirs" + ); + continue; + } + + // Rule 3: regular file within size cap (metadata on canonical path + // is symlink-safe at this point — the link was already resolved). 
+ let size_cap = config.max_size_bytes(); + match std::fs::metadata(&canonical) { + Ok(meta) if meta.is_file() && meta.len() <= size_cap => { + info!(path = %canonical.display(), size = meta.len(), "outbound: attachment accepted"); + attachments.push(canonical); + paths_to_strip.push(full_match.to_string()); + } + Ok(meta) if meta.len() > size_cap => { + warn!( + path = %canonical.display(), + size = meta.len(), + limit_mb = config.max_file_size_mb, + "outbound: over size limit" + ); + } + Ok(_) => { + warn!(path = %canonical.display(), "outbound: not a regular file"); + } + Err(e) => { + debug!(path = %canonical.display(), error = %e, "outbound: metadata error"); + } + } + } + + if over_cap_dropped > 0 { + warn!( + over_cap_dropped, + cap = config.max_per_message, + "outbound: per-message cap hit" + ); + } + + let mut cleaned = text.to_string(); + for marker in &paths_to_strip { + cleaned = cleaned.replace(marker, ""); + } + while cleaned.contains("\n\n\n") { + cleaned = cleaned.replace("\n\n\n", "\n\n"); + } + (cleaned.trim().to_string(), attachments) +} + +#[cfg(test)] +mod outbound_tests { + use super::*; + + fn cfg_enabled() -> OutboundConfig { + OutboundConfig { + enabled: true, + ..OutboundConfig::default() + } + } + + #[test] + fn disabled_by_default_is_noop() { + let cfg = OutboundConfig::default(); + assert!(!cfg.enabled); + let text = "![foo](/tmp/does-not-matter.png)"; + let (cleaned, atts) = extract_outbound_attachments(text, &cfg); + assert_eq!(cleaned, text); + assert!(atts.is_empty()); + } + + #[test] + fn enabled_extracts_tmp_file() { + let path = "/tmp/openab_pr300_happy.png"; + std::fs::write(path, b"png").unwrap(); + let text = "Here: ![screenshot](/tmp/openab_pr300_happy.png) done."; + let (cleaned, atts) = extract_outbound_attachments(text, &cfg_enabled()); + assert_eq!(atts.len(), 1); + let expected = std::fs::canonicalize(path).unwrap(); + assert_eq!(atts[0], expected); + assert!(!cleaned.contains("openab_pr300_happy")); + 
std::fs::remove_file(path).ok(); + } + + #[test] + fn blocks_non_allowlisted_path() { + let text = "![secret](/etc/passwd)"; + let (cleaned, atts) = extract_outbound_attachments(text, &cfg_enabled()); + assert!(atts.is_empty()); + // Blocked markers stay in text so the user sees the attempt. + assert!(cleaned.contains("/etc/passwd")); + } + + #[test] + fn blocks_symlink_escape() { + let link = "/tmp/openab_pr300_symlink.png"; + let _ = std::fs::remove_file(link); + std::os::unix::fs::symlink("/etc/hosts", link).unwrap(); + let text = format!("![esc]({link})"); + let (_, atts) = extract_outbound_attachments(&text, &cfg_enabled()); + assert!( + atts.is_empty(), + "symlink escaping /tmp must be blocked: {:?}", + atts + ); + std::fs::remove_file(link).ok(); + } + + #[test] + fn blocks_path_traversal() { + let (_, atts) = + extract_outbound_attachments("![x](/tmp/../etc/hosts)", &cfg_enabled()); + assert!(atts.is_empty(), "/tmp/../ must be blocked"); + } + + #[test] + fn respects_custom_allowed_dirs() { + let dir = "/tmp/openab_pr300_custom"; + std::fs::create_dir_all(dir).unwrap(); + let path = format!("{dir}/file.png"); + std::fs::write(&path, b"x").unwrap(); + + // Default allowed_dirs is /tmp and /var/folders — this path passes. + let default_cfg = cfg_enabled(); + let (_, atts) = extract_outbound_attachments( + &format!("![a]({path})"), + &default_cfg, + ); + assert_eq!(atts.len(), 1); + + // Narrow the allowed dirs: only accept something unrelated. + let narrow_cfg = OutboundConfig { + enabled: true, + allowed_dirs: vec!["/var/empty/".into()], + ..OutboundConfig::default() + }; + let (_, atts_narrow) = extract_outbound_attachments( + &format!("![a]({path})"), + &narrow_cfg, + ); + assert!(atts_narrow.is_empty(), "narrowed allowlist must exclude /tmp"); + + std::fs::remove_file(&path).ok(); + std::fs::remove_dir_all(dir).ok(); + } + + #[test] + fn enforces_max_file_size_mb() { + // 2 MB file, 1 MB limit → blocked. 
+ let path = "/tmp/openab_pr300_large.bin"; + std::fs::write(path, vec![0u8; 2 * 1024 * 1024]).unwrap(); + let cfg = OutboundConfig { + enabled: true, + max_file_size_mb: 1, + ..OutboundConfig::default() + }; + let (_, atts) = extract_outbound_attachments( + &format!("![big]({path})"), + &cfg, + ); + assert!(atts.is_empty(), "file exceeding max_file_size_mb must be blocked"); + std::fs::remove_file(path).ok(); + } + + #[test] + fn enforces_max_per_message() { + let mut text = String::new(); + let mut paths = Vec::new(); + for i in 0..5 { + let p = format!("/tmp/openab_pr300_permsg_{i}.png"); + std::fs::write(&p, b"x").unwrap(); + text.push_str(&format!("![a{i}]({p})\n")); + paths.push(p); + } + let cfg = OutboundConfig { + enabled: true, + max_per_message: 2, + ..OutboundConfig::default() + }; + let (_, atts) = extract_outbound_attachments(&text, &cfg); + assert_eq!(atts.len(), 2, "must cap at max_per_message"); + for p in &paths { + std::fs::remove_file(p).ok(); + } + } + + #[test] + fn strips_multiple_valid_markers() { + let a = "/tmp/openab_pr300_multi_a.png"; + let b = "/tmp/openab_pr300_multi_b.png"; + std::fs::write(a, b"a").unwrap(); + std::fs::write(b, b"b").unwrap(); + let text = format!("one ![a]({a}) two ![b]({b}) three"); + let (cleaned, atts) = extract_outbound_attachments(&text, &cfg_enabled()); + assert_eq!(atts.len(), 2); + assert!(!cleaned.contains("openab_pr300_multi")); + std::fs::remove_file(a).ok(); + std::fs::remove_file(b).ok(); + } +} diff --git a/src/outbound_rate.rs b/src/outbound_rate.rs new file mode 100644 index 0000000..1eac9dc --- /dev/null +++ b/src/outbound_rate.rs @@ -0,0 +1,125 @@ +//! Per-channel sliding-window rate limiter for outbound attachments. +//! +//! Guards against a single misbehaving agent flooding a chat channel with +//! uploads. See openabdev/openab#355 item 5. +//! +//! Design: a `HashMap>` tracks recent upload +//! timestamps per channel. Before admitting a batch, we prune entries older +//! 
// than the window (60 s) and then accept up to `limit − len(deque)` items.
//
// Memory: at most `limit` timestamps are kept per channel. Every call that
// reaches the map also sweeps it, so a channel whose timestamps have all
// expired is forgotten instead of lingering as an empty entry.
//
// Thread safety: one `Mutex` guards the whole map; the lock is only held for
// the duration of a single `admit` call and never across an `.await`.

use std::collections::{HashMap, VecDeque};
use std::sync::{Mutex, PoisonError};
use std::time::{Duration, Instant};

/// Length of the sliding window over which admitted uploads are counted.
const WINDOW: Duration = Duration::from_secs(60);

/// Per-channel sliding-window limiter for outbound attachment uploads.
///
/// Each channel key maps to the timestamps of the uploads admitted for it
/// during the last [`WINDOW`]. The limiter is shared by reference; all state
/// lives behind an internal mutex.
#[derive(Debug, Default)]
pub struct OutboundRateLimiter {
    inner: Mutex<Inner>,
}

/// Mutable limiter state: admission timestamps (oldest first) per channel key.
#[derive(Debug, Default)]
struct Inner {
    per_channel: HashMap<String, VecDeque<Instant>>,
}

impl OutboundRateLimiter {
    /// Create a limiter that tracks no channels yet.
    pub fn new() -> Self {
        Self::default()
    }

    /// Request permission to send `requested` attachments to `channel_key`
    /// given a per-minute `limit`.
    ///
    /// Returns the number that may actually be sent (`0..=requested`).
    /// Callers must honour the returned count — any remainder must be
    /// dropped to prevent the flood this limiter exists to stop.
    ///
    /// Admitted uploads are recorded immediately, so the quota is consumed
    /// even if the caller later fails to upload.
    #[must_use = "only the granted number of attachments may be sent"]
    pub fn admit(&self, channel_key: &str, requested: usize, limit: usize) -> usize {
        self.admit_at(channel_key, requested, limit, Instant::now())
    }

    /// Clock-injected core of [`admit`](Self::admit); `now` is the instant
    /// the request is considered to arrive. Kept private so tests can
    /// exercise window expiry without sleeping or doing `Instant` arithmetic
    /// that may underflow.
    fn admit_at(&self, channel_key: &str, requested: usize, limit: usize, now: Instant) -> usize {
        if requested == 0 || limit == 0 {
            return 0;
        }

        // A poisoned lock only means another thread panicked while holding
        // it; the map is still structurally valid, so keep rate limiting
        // rather than propagating the panic to every later caller.
        let mut inner = self.inner.lock().unwrap_or_else(PoisonError::into_inner);

        // Sweep: expire old timestamps on every channel and drop channels
        // that end up empty. Cost is linear in the number of tracked
        // channels, each of which holds at most `limit` timestamps.
        inner.per_channel.retain(|_, stamps| {
            prune_expired(stamps, now);
            !stamps.is_empty()
        });

        let used = inner.per_channel.get(channel_key).map_or(0, VecDeque::len);
        let grant = requested.min(limit.saturating_sub(used));
        if grant > 0 {
            // Only create an entry when something is actually admitted, so
            // denied requests never grow the map.
            inner
                .per_channel
                .entry(channel_key.to_owned())
                .or_default()
                .extend(std::iter::repeat(now).take(grant));
        }
        grant
    }
}

/// Pop timestamps from the front of `stamps` that are more than [`WINDOW`]
/// older than `now`.
///
/// Compares elapsed time instead of computing `now - WINDOW`: subtracting a
/// `Duration` from an `Instant` may panic when the result is not
/// representable, whereas `saturating_duration_since` never does.
fn prune_expired(stamps: &mut VecDeque<Instant>, now: Instant) {
    while stamps
        .front()
        .is_some_and(|t| now.saturating_duration_since(*t) > WINDOW)
    {
        stamps.pop_front();
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn admits_up_to_limit_then_blocks_within_window() {
        let rl = OutboundRateLimiter::new();
        assert_eq!(rl.admit("ch1", 5, 5), 5);
        assert_eq!(rl.admit("ch1", 1, 5), 0, "further sends blocked at limit");
    }

    #[test]
    fn partial_admit_when_some_remaining_slots() {
        let rl = OutboundRateLimiter::new();
        assert_eq!(rl.admit("ch1", 3, 5), 3);
        assert_eq!(rl.admit("ch1", 5, 5), 2, "only 2 slots remain");
        assert_eq!(rl.admit("ch1", 1, 5), 0);
    }

    #[test]
    fn channels_are_independent() {
        let rl = OutboundRateLimiter::new();
        assert_eq!(rl.admit("ch1", 5, 5), 5);
        assert_eq!(rl.admit("ch2", 5, 5), 5, "ch2 is unaffected by ch1");
    }

    #[test]
    fn zero_requested_or_zero_limit_grants_zero() {
        let rl = OutboundRateLimiter::new();
        assert_eq!(rl.admit("ch1", 0, 10), 0);
        assert_eq!(rl.admit("ch1", 10, 0), 0);
    }

    #[test]
    fn prunes_entries_older_than_window() {
        let rl = OutboundRateLimiter::new();
        // Drive the clock forward instead of seeding timestamps in the past:
        // `Instant::now() - Duration` can panic on hosts with short uptime.
        let start = Instant::now();
        assert_eq!(rl.admit_at("ch1", 5, 5, start), 5);
        let later = start + WINDOW + Duration::from_secs(1);
        // All entries are older than WINDOW → full limit available again.
        assert_eq!(rl.admit_at("ch1", 5, 5, later), 5);
    }

    #[test]
    fn forgets_channels_with_no_recent_traffic() {
        let rl = OutboundRateLimiter::new();
        let start = Instant::now();
        assert_eq!(rl.admit_at("idle", 1, 5, start), 1);
        let later = start + WINDOW + Duration::from_secs(1);
        assert_eq!(rl.admit_at("busy", 1, 5, later), 1);
        let inner = rl.inner.lock().unwrap();
        assert!(!inner.per_channel.contains_key("idle"));
    }
}