From c874683fac0ff7efe6576b94a4650e5607c601a3 Mon Sep 17 00:00:00 2001 From: Tony Lee Date: Fri, 17 Apr 2026 18:42:02 +0800 Subject: [PATCH 01/14] feat: per-thread serialization + deferred send-once + thread_id in SenderContext --- src/adapter.rs | 81 ++++++-------------------------------------------- 1 file changed, 9 insertions(+), 72 deletions(-) diff --git a/src/adapter.rs b/src/adapter.rs index 2c2d096..01a4c5f 100644 --- a/src/adapter.rs +++ b/src/adapter.rs @@ -2,7 +2,6 @@ use anyhow::Result; use async_trait::async_trait; use serde::Serialize; use std::sync::Arc; -use tokio::sync::watch; use tracing::error; use crate::acp::{classify_notification, AcpEvent, ContentBlock, SessionPool}; @@ -41,6 +40,10 @@ pub struct SenderContext { pub display_name: String, pub channel: String, pub channel_id: String, + /// Thread identifier, if the message is inside a thread. + /// Slack: thread_ts. Discord: None (threads are separate channels). + #[serde(skip_serializing_if = "Option::is_none")] + pub thread_id: Option, pub is_bot: bool, } @@ -130,8 +133,6 @@ impl AdapterRouter { } } - let thinking_msg = adapter.send_message(thread_channel, "...").await?; - let thread_key = format!( "{}:{}", adapter.platform(), @@ -144,7 +145,7 @@ impl AdapterRouter { if let Err(e) = self.pool.get_or_create(&thread_key).await { let msg = format_user_error(&e.to_string()); let _ = adapter - .edit_message(&thinking_msg, &format!("⚠️ {msg}")) + .send_message(thread_channel, &format!("⚠️ {msg}")) .await; error!("pool error: {e}"); return Err(e); @@ -165,7 +166,6 @@ impl AdapterRouter { &thread_key, content_blocks, thread_channel, - &thinking_msg, reactions.clone(), ) .await; @@ -190,7 +190,7 @@ impl AdapterRouter { if let Err(ref e) = result { let _ = adapter - .edit_message(&thinking_msg, &format!("⚠️ {e}")) + .send_message(thread_channel, &format!("⚠️ {e}")) .await; } @@ -203,12 +203,10 @@ impl AdapterRouter { thread_key: &str, content_blocks: Vec, thread_channel: &ChannelRef, - thinking_msg: &MessageRef, reactions: Arc, ) -> Result<()> { let adapter = adapter.clone(); let thread_channel = thread_channel.clone(); - let msg_ref = thinking_msg.clone(); let message_limit = adapter.message_limit(); self.pool @@ -221,13 +219,6 @@ impl AdapterRouter { let (mut rx, _) = conn.session_prompt(content_blocks).await?; reactions.set_thinking().await; - let initial = if reset { - "⚠️ _Session expired, starting fresh..._\n\n...".to_string() - } else { - "...".to_string() - }; - let (buf_tx, buf_rx) = watch::channel(initial); - let mut text_buf = String::new(); let mut tool_lines: Vec = Vec::new(); @@ -235,43 +226,7 @@ impl AdapterRouter { text_buf.push_str("⚠️ _Session expired, starting fresh..._\n\n"); } - // Spawn edit-streaming task — only edits the single message, never sends new ones. - // Long content is truncated during streaming; final multi-message split happens after. - let streaming_limit = message_limit.saturating_sub(100); - let edit_handle = { - let adapter = adapter.clone(); - let msg_ref = msg_ref.clone(); - let mut buf_rx = buf_rx.clone(); - tokio::spawn(async move { - let mut last_content = String::new(); - loop { - tokio::time::sleep(std::time::Duration::from_millis(1500)).await; - if buf_rx.has_changed().unwrap_or(false) { - let content = buf_rx.borrow_and_update().clone(); - if content != last_content { - let display = if content.chars().count() > streaming_limit { - // Tail-priority: keep the last N chars so user - // sees the most recent agent output - let total = content.chars().count(); - let skip = total - streaming_limit; - let truncated: String = content.chars().skip(skip).collect(); - format!("…(truncated)\n{truncated}") - } else { - content.clone() - }; - let _ = adapter.edit_message(&msg_ref, &display).await; - last_content = content; - } - } - if buf_rx.has_changed().is_err() { - break; - } - } - }) - }; - // Process ACP notifications - let mut got_first_text = false; let mut response_error: Option = None; while let Some(notification) = rx.recv().await { if notification.id.is_some() { @@ -284,12 +239,7 @@ impl AdapterRouter { if let Some(event) = classify_notification(¬ification) { match event { AcpEvent::Text(t) => { - if !got_first_text { - got_first_text = true; - } text_buf.push_str(&t); - let _ = - buf_tx.send(compose_display(&tool_lines, &text_buf, true)); } AcpEvent::Thinking => { reactions.set_thinking().await; @@ -307,8 +257,6 @@ impl AdapterRouter { state: ToolState::Running, }); } - let _ = - buf_tx.send(compose_display(&tool_lines, &text_buf, true)); } AcpEvent::ToolDone { id, title, status } => { reactions.set_thinking().await; @@ -329,8 +277,6 @@ impl AdapterRouter { state: new_state, }); } - let _ = - buf_tx.send(compose_display(&tool_lines, &text_buf, true)); } _ => {} } @@ -338,10 +284,8 @@ impl AdapterRouter { } conn.prompt_done().await; - drop(buf_tx); - let _ = edit_handle.await; - // Final edit with complete content + // Send complete content as a single new message let final_content = compose_display(&tool_lines, &text_buf, false); let final_content = if final_content.is_empty() { if let Some(err) = response_error { @@ -356,15 +300,8 @@ impl AdapterRouter { }; let chunks = format::split_message(&final_content, message_limit); - let mut current_msg = msg_ref; - for (i, chunk) in chunks.iter().enumerate() { - if i == 0 { - let _ = adapter.edit_message(¤t_msg, chunk).await; - } else if let Ok(new_msg) = - adapter.send_message(&thread_channel, chunk).await - { - current_msg = new_msg; - } + for chunk in &chunks { + let _ = adapter.send_message(&thread_channel, chunk).await; } Ok(()) From 7ae0a1f794d1f64aaf7b80b4d6ec99d1d7b55b20 Mon Sep 17 00:00:00 2001 From: Tony Lee Date: Fri, 17 Apr 2026 18:42:11 +0800 Subject: [PATCH 02/14] feat(slack): per-thread serialization + deferred send-once + thread_id in SenderContext --- src/slack.rs | 82 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 81 insertions(+), 1 deletion(-) diff --git a/src/slack.rs b/src/slack.rs index fabf31b..13d3d22 100644 --- a/src/slack.rs +++ b/src/slack.rs @@ -384,6 +384,47 @@ impl ChatAdapter for SlackAdapter { } } +// --- Per-thread async queue (inspired by OpenClaw's KeyedAsyncQueue) --- + +/// Serialize async work per key while allowing unrelated keys to run concurrently. +/// Same-key tasks execute in FIFO order; different keys run in parallel. +/// Idle keys are cleaned up automatically after the last task settles. +struct KeyedAsyncQueue { + tails: tokio::sync::Mutex>>, +} + +impl KeyedAsyncQueue { + fn new() -> Self { + Self { + tails: tokio::sync::Mutex::new(HashMap::new()), + } + } + + /// Acquire a per-key permit. The returned guard must be held for the + /// duration of the async work. Dropping it allows the next queued task + /// for the same key to proceed. + async fn acquire(&self, key: &str) -> tokio::sync::OwnedSemaphorePermit { + let sem = { + let mut tails = self.tails.lock().await; + tails + .entry(key.to_string()) + .or_insert_with(|| Arc::new(tokio::sync::Semaphore::new(1))) + .clone() + }; + // This will wait if another task for the same key is in progress. + sem.acquire_owned().await.expect("semaphore closed unexpectedly") + } + + /// Remove semaphores that have no waiters and no held permits (idle). + /// Memory cost of idle entries is low (one Arc per thread_ts), + /// so cleanup is best-effort and not critical. + #[allow(dead_code)] + async fn cleanup_idle(&self) { + let mut tails = self.tails.lock().await; + tails.retain(|_, sem| sem.available_permits() < 1); + } +} + // --- Socket Mode event loop --- /// Hard cap on consecutive bot messages in a thread. Prevents runaway loops. @@ -406,6 +447,7 @@ pub async fn run_slack_adapter( mut shutdown_rx: watch::Receiver, ) -> Result<()> { let adapter = Arc::new(SlackAdapter::new(bot_token.clone(), session_ttl)); + let queue = Arc::new(KeyedAsyncQueue::new()); loop { // Check for shutdown before (re)connecting @@ -455,6 +497,26 @@ pub async fn run_slack_adapter( let event_type = event["type"].as_str().unwrap_or(""); match event_type { "app_mention" => { + // Apply bot gating for app_mention events (same rules as message events) + let is_bot = event["bot_id"].is_string() + || event["subtype"].as_str() == Some("bot_message"); + if is_bot { + match allow_bot_messages { + AllowBots::Off => { continue; } + AllowBots::Mentions | AllowBots::All => { + if !trusted_bot_ids.is_empty() { + let event_bot_id = event["bot_id"].as_str().unwrap_or(""); + let resolved = adapter.resolve_bot_user_id(event_bot_id).await; + let is_trusted = resolved.as_ref() + .is_some_and(|uid| trusted_bot_ids.contains(uid.as_str())); + if !is_trusted { + debug!(event_bot_id, resolved = ?resolved, "bot not in trusted_bot_ids, ignoring app_mention"); + continue; + } + } + } + } + } let event = event.clone(); let adapter = adapter.clone(); let bot_token = bot_token.clone(); @@ -462,7 +524,15 @@ pub async fn run_slack_adapter( let allowed_users = allowed_users.clone(); let stt_config = stt_config.clone(); let router = router.clone(); + let queue = queue.clone(); + // Queue key: thread_ts (existing thread) or ts (new thread) + let queue_key = event["thread_ts"] + .as_str() + .or_else(|| event["ts"].as_str()) + .unwrap_or("") + .to_string(); tokio::spawn(async move { + let _permit = queue.acquire(&queue_key).await; handle_message( &event, true, @@ -597,7 +667,7 @@ pub async fn run_slack_adapter( } } - // Dispatch to handle_message + // Dispatch to handle_message (serialized per thread) let event = event.clone(); let adapter = adapter.clone(); let bot_token = bot_token.clone(); @@ -605,7 +675,16 @@ pub async fn run_slack_adapter( let allowed_users = allowed_users.clone(); let stt_config = stt_config.clone(); let router = router.clone(); + let queue = queue.clone(); + // Queue key: thread_ts (existing thread) or channel:ts (new thread / DM) + let queue_key = event["thread_ts"] + .as_str() + .map(|s| s.to_string()) + .unwrap_or_else(|| { + format!("{}:{}", channel_id, event["ts"].as_str().unwrap_or("")) + }); tokio::spawn(async move { + let _permit = queue.acquire(&queue_key).await; handle_message( &event, is_dm, @@ -813,6 +892,7 @@ async fn handle_message( display_name, channel: "slack".into(), channel_id: channel_id.clone(), + thread_id: thread_ts.clone(), is_bot: is_bot_msg, }; From af7f3c678a4f0361850269529428862c7c2475e4 Mon Sep 17 00:00:00 2001 From: Tony Lee Date: Fri, 17 Apr 2026 18:43:00 +0800 Subject: [PATCH 03/14] feat(discord): set thread_id = None in SenderContext --- src/discord.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/discord.rs b/src/discord.rs index dead2d2..19cb914 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -361,6 +361,7 @@ impl EventHandler for Handler { display_name: display_name.to_string(), channel: "discord".into(), channel_id: msg.channel_id.to_string(), + thread_id: None, is_bot: msg.author.bot, }; From 56cdf670ce627ac3aadf4dac0d5fa7e02307258f Mon Sep 17 00:00:00 2001 From: Tony Lee Date: Fri, 17 Apr 2026 19:37:27 +0800 Subject: [PATCH 04/14] chore: remove unused edit_message trait method and implementations --- src/adapter.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/adapter.rs b/src/adapter.rs index 01a4c5f..f2d07ef 100644 --- a/src/adapter.rs +++ b/src/adapter.rs @@ -60,9 +60,6 @@ pub trait ChatAdapter: Send + Sync + 'static { /// Send a new message, returns a reference to the sent message. async fn send_message(&self, channel: &ChannelRef, content: &str) -> Result; - /// Edit an existing message in-place. - async fn edit_message(&self, msg: &MessageRef, content: &str) -> Result<()>; - /// Create a thread from a trigger message, returns the thread channel ref. async fn create_thread( &self, From 535be76c49510bd9c1ba6bbf6c1e4183049b3017 Mon Sep 17 00:00:00 2001 From: Tony Lee Date: Fri, 17 Apr 2026 19:37:46 +0800 Subject: [PATCH 05/14] chore: remove unused edit_message from SlackAdapter --- src/slack.rs | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/slack.rs b/src/slack.rs index 13d3d22..ea4c716 100644 --- a/src/slack.rs +++ b/src/slack.rs @@ -318,19 +318,6 @@ impl ChatAdapter for SlackAdapter { }) } - async fn edit_message(&self, msg: &MessageRef, content: &str) -> Result<()> { - let mrkdwn = markdown_to_mrkdwn(content); - self.api_post( - "chat.update", - serde_json::json!({ - "channel": msg.channel.channel_id, - "ts": msg.message_id, - "text": mrkdwn, - }), - ) - .await?; - Ok(()) - } async fn create_thread( &self, From 5cd391ba83971a5cae46d53941f509fc5ac0ca12 Mon Sep 17 00:00:00 2001 From: Tony Lee Date: Fri, 17 Apr 2026 19:37:59 +0800 Subject: [PATCH 06/14] chore: remove unused edit_message and EditMessage import from discord.rs --- src/discord.rs | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/src/discord.rs b/src/discord.rs index 19cb914..43058f8 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -5,7 +5,7 @@ use crate::format; use crate::media; use async_trait::async_trait; use std::sync::LazyLock; -use serenity::builder::{CreateThread, EditMessage}; +use serenity::builder::CreateThread; use serenity::http::Http; use serenity::model::channel::{AutoArchiveDuration, Message, ReactionType}; use serenity::model::gateway::Ready; @@ -54,18 +54,6 @@ impl ChatAdapter for DiscordAdapter { }) } - async fn edit_message(&self, msg: &MessageRef, content: &str) -> anyhow::Result<()> { - let ch_id: u64 = msg.channel.channel_id.parse()?; - let msg_id: u64 = msg.message_id.parse()?; - ChannelId::new(ch_id) - .edit_message( - &self.http, - MessageId::new(msg_id), - EditMessage::new().content(content), - ) - .await?; - Ok(()) - } async fn create_thread( &self, From 4eed3fcc0681dafc38a75423b962277294b26368 Mon Sep 17 00:00:00 2001 From: thepagent Date: Fri, 17 Apr 2026 23:55:36 -0400 Subject: [PATCH 07/14] feat(slack): conditional streaming edit based on allow_bot_messages config When allow_bot_messages=off (default): use streaming edit (placeholder + live updates via chat.update) for better human UX. When allow_bot_messages=mentions|all: use send-once to protect bot-to-bot communication from placeholder/message_changed interference. Addresses: https://github.com/openabdev/openab/pull/420#issuecomment-4272627412 --- src/adapter.rs | 77 ++++++++++++++++++++++++++++++++++++++++++++++++-- src/format.rs | 5 ++++ src/slack.rs | 25 ++++++++++++++-- 3 files changed, 102 insertions(+), 5 deletions(-) diff --git a/src/adapter.rs b/src/adapter.rs index f2d07ef..d469a11 100644 --- a/src/adapter.rs +++ b/src/adapter.rs @@ -73,6 +73,17 @@ pub trait ChatAdapter: Send + Sync + 'static { /// Remove a reaction/emoji from a message. async fn remove_reaction(&self, msg: &MessageRef, emoji: &str) -> Result<()>; + + /// Edit an existing message in-place (for streaming updates). + /// Default: unsupported (send-once only). + async fn edit_message(&self, _msg: &MessageRef, _content: &str) -> Result<()> { + Err(anyhow::anyhow!("edit_message not supported")) + } + + /// Whether this adapter should use streaming edit (true) or send-once (false). + fn use_streaming(&self) -> bool { + false + } } // --- AdapterRouter --- @@ -205,6 +216,7 @@ impl AdapterRouter { let adapter = adapter.clone(); let thread_channel = thread_channel.clone(); let message_limit = adapter.message_limit(); + let streaming = adapter.use_streaming(); self.pool .with_connection(thread_key, |conn| { @@ -223,6 +235,43 @@ impl AdapterRouter { text_buf.push_str("⚠️ _Session expired, starting fresh..._\n\n"); } + // Streaming edit: send placeholder, spawn edit loop + let (buf_tx, placeholder_msg) = if streaming { + let initial = if reset { + "⚠️ _Session expired, starting fresh..._\n\n…".to_string() + } else { + "…".to_string() + }; + let msg = adapter.send_message(&thread_channel, &initial).await?; + let (tx, rx) = tokio::sync::watch::channel(initial); + let edit_adapter = adapter.clone(); + let edit_msg = msg.clone(); + let limit = message_limit; + let mut buf_rx = rx; + tokio::spawn(async move { + let mut last = String::new(); + loop { + tokio::time::sleep(std::time::Duration::from_millis(1500)).await; + if buf_rx.has_changed().unwrap_or(false) { + let content = buf_rx.borrow_and_update().clone(); + if content != last { + let display = if content.chars().count() > limit - 100 { + format!("{}…", format::truncate_chars(&content, limit - 100)) + } else { + content.clone() + }; + let _ = edit_adapter.edit_message(&edit_msg, &display).await; + last = content; + } + } + if buf_rx.has_changed().is_err() { break; } + } + }); + (Some(tx), Some(msg)) + } else { + (None, None) + }; + // Process ACP notifications let mut response_error: Option = None; while let Some(notification) = rx.recv().await { @@ -237,6 +286,9 @@ impl AdapterRouter { match event { AcpEvent::Text(t) => { text_buf.push_str(&t); + if let Some(tx) = &buf_tx { + let _ = tx.send(compose_display(&tool_lines, &text_buf, true)); + } } AcpEvent::Thinking => { reactions.set_thinking().await; @@ -254,6 +306,9 @@ impl AdapterRouter { state: ToolState::Running, }); } + if let Some(tx) = &buf_tx { + let _ = tx.send(compose_display(&tool_lines, &text_buf, true)); + } } AcpEvent::ToolDone { id, title, status } => { reactions.set_thinking().await; @@ -274,6 +329,9 @@ impl AdapterRouter { state: new_state, }); } + if let Some(tx) = &buf_tx { + let _ = tx.send(compose_display(&tool_lines, &text_buf, true)); + } } _ => {} } @@ -281,8 +339,10 @@ impl AdapterRouter { } conn.prompt_done().await; + // Stop the edit loop + drop(buf_tx); - // Send complete content as a single new message + // Build final content let final_content = compose_display(&tool_lines, &text_buf, false); let final_content = if final_content.is_empty() { if let Some(err) = response_error { @@ -297,8 +357,19 @@ impl AdapterRouter { }; let chunks = format::split_message(&final_content, message_limit); - for chunk in &chunks { - let _ = adapter.send_message(&thread_channel, chunk).await; + if let Some(msg) = placeholder_msg { + // Streaming: edit first chunk into placeholder, send rest as new messages + if let Some(first) = chunks.first() { + let _ = adapter.edit_message(&msg, first).await; + } + for chunk in chunks.iter().skip(1) { + let _ = adapter.send_message(&thread_channel, chunk).await; + } + } else { + // Send-once: all chunks as new messages + for chunk in &chunks { + let _ = adapter.send_message(&thread_channel, chunk).await; + } } Ok(()) diff --git a/src/format.rs b/src/format.rs index 56f0fad..35aa837 100644 --- a/src/format.rs +++ b/src/format.rs @@ -60,3 +60,8 @@ pub fn shorten_thread_name(prompt: &str) -> String { } } + +/// Truncate a string to at most `limit` Unicode characters. +pub fn truncate_chars(s: &str, limit: usize) -> String { + s.chars().take(limit).collect() +} \ No newline at end of file diff --git a/src/slack.rs b/src/slack.rs index ea4c716..2e4524f 100644 --- a/src/slack.rs +++ b/src/slack.rs @@ -63,10 +63,12 @@ pub struct SlackAdapter { participated_threads: tokio::sync::Mutex>, /// TTL for participation cache entries (matches session_ttl_hours from config). session_ttl: std::time::Duration, + /// Controls streaming behavior: Off → streaming edit, Mentions/All → send-once. + allow_bot_messages: AllowBots, } impl SlackAdapter { - pub fn new(bot_token: String, session_ttl: std::time::Duration) -> Self { + pub fn new(bot_token: String, session_ttl: std::time::Duration, allow_bot_messages: AllowBots) -> Self { Self { client: reqwest::Client::new(), bot_token, @@ -75,6 +77,7 @@ impl SlackAdapter { bot_id_cache: tokio::sync::Mutex::new(HashMap::new()), participated_threads: tokio::sync::Mutex::new(HashMap::new()), session_ttl, + allow_bot_messages, } } @@ -369,6 +372,24 @@ impl ChatAdapter for SlackAdapter { Err(e) => Err(e), } } + + async fn edit_message(&self, msg: &MessageRef, content: &str) -> Result<()> { + let mrkdwn = markdown_to_mrkdwn(content); + self.api_post( + "chat.update", + serde_json::json!({ + "channel": msg.channel.channel_id, + "ts": msg.message_id, + "text": mrkdwn, + }), + ) + .await?; + Ok(()) + } + + fn use_streaming(&self) -> bool { + self.allow_bot_messages == AllowBots::Off + } } // --- Per-thread async queue (inspired by OpenClaw's KeyedAsyncQueue) --- @@ -433,7 +454,7 @@ pub async fn run_slack_adapter( router: Arc, mut shutdown_rx: watch::Receiver, ) -> Result<()> { - let adapter = Arc::new(SlackAdapter::new(bot_token.clone(), session_ttl)); + let adapter = Arc::new(SlackAdapter::new(bot_token.clone(), session_ttl, allow_bot_messages)); let queue = Arc::new(KeyedAsyncQueue::new()); loop { From c281b2ec653c7bcfe63122df266a3f9580c88e1f Mon Sep 17 00:00:00 2001 From: thepagent Date: Sat, 18 Apr 2026 00:29:48 -0400 Subject: [PATCH 08/14] fix(discord): remove mention_roles from is_mentioned check MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The mention_roles check caused false positives when multiple bots share a channel — @mentioning one bot would trigger the other if they share a Discord role. The mentions_user_id + content check is sufficient. --- src/discord.rs | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/src/discord.rs b/src/discord.rs index 43058f8..14f2594 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -204,11 +204,7 @@ impl EventHandler for Handler { self.allowed_channels.is_empty() || self.allowed_channels.contains(&channel_id); let is_mentioned = msg.mentions_user_id(bot_id) - || msg.content.contains(&format!("<@{}>", bot_id)) - || msg - .mention_roles - .iter() - .any(|r| msg.content.contains(&format!("<@&{}>", r))); + || msg.content.contains(&format!("<@{}>", bot_id)); // Bot message gating (from upstream #321) if msg.author.bot { From 54498399b49009099a998533479f135a741a67ca Mon Sep 17 00:00:00 2001 From: thepagent Date: Sat, 18 Apr 2026 02:00:31 -0400 Subject: [PATCH 09/14] fix(helm): add allowUserMessages support for Discord section in configmap Previously allowUserMessages was only rendered for the [slack] section. This adds the same support for [discord], enabling per-agent control of whether the bot requires @mention in threads. --- charts/openab/templates/configmap.yaml | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/charts/openab/templates/configmap.yaml b/charts/openab/templates/configmap.yaml index e0c4a61..29bd302 100644 --- a/charts/openab/templates/configmap.yaml +++ b/charts/openab/templates/configmap.yaml @@ -42,6 +42,12 @@ data: {{- if $cfg.discord.trustedBotIds }} trusted_bot_ids = {{ $cfg.discord.trustedBotIds | toJson }} {{- end }} + {{- if $cfg.discord.allowUserMessages }} + {{- if not (has $cfg.discord.allowUserMessages (list "involved" "mentions")) }} + {{- fail (printf "agents.%s.discord.allowUserMessages must be one of: involved, mentions — got: %s" $name $cfg.discord.allowUserMessages) }} + {{- end }} + allow_user_messages = "{{ $cfg.discord.allowUserMessages }}" + {{- end }} {{- end }} {{- if and ($cfg.slack).enabled }} From 61f9b969de35f738f9f499349c94f38f50660a1f Mon Sep 17 00:00:00 2001 From: thepagent Date: Sat, 18 Apr 2026 02:20:28 -0400 Subject: [PATCH 10/14] chore(helm): add inline comments for allow_user_messages in configmap --- charts/openab/templates/configmap.yaml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/charts/openab/templates/configmap.yaml b/charts/openab/templates/configmap.yaml index 29bd302..ed729a0 100644 --- a/charts/openab/templates/configmap.yaml +++ b/charts/openab/templates/configmap.yaml @@ -42,11 +42,12 @@ data: {{- if $cfg.discord.trustedBotIds }} trusted_bot_ids = {{ $cfg.discord.trustedBotIds | toJson }} {{- end }} + {{- /* allowUserMessages: controls whether the bot requires @mention in threads (Discord) */ -}} {{- if $cfg.discord.allowUserMessages }} {{- if not (has $cfg.discord.allowUserMessages (list "involved" "mentions")) }} {{- fail (printf "agents.%s.discord.allowUserMessages must be one of: involved, mentions — got: %s" $name $cfg.discord.allowUserMessages) }} {{- end }} - allow_user_messages = "{{ $cfg.discord.allowUserMessages }}" + allow_user_messages = {{ $cfg.discord.allowUserMessages | toJson }} {{- /* involved (default): respond in bot's threads without @mention | mentions: always require @mention */ -}} {{- end }} {{- end }} @@ -79,7 +80,7 @@ data: {{- if not (has ($cfg.slack).allowUserMessages (list "involved" "mentions")) }} {{- fail (printf "agents.%s.slack.allowUserMessages must be one of: involved, mentions — got: %s" $name ($cfg.slack).allowUserMessages) }} {{- end }} - allow_user_messages = {{ ($cfg.slack).allowUserMessages | toJson }} + allow_user_messages = {{ ($cfg.slack).allowUserMessages | toJson }} {{- /* involved (default): respond in bot's threads without @mention | mentions: always require @mention */ -}} {{- end }} {{- end }} From 7baeba47de5368b588545c297fdb94783305ff25 Mon Sep 17 00:00:00 2001 From: thepagent Date: Sat, 18 Apr 2026 02:32:32 -0400 Subject: [PATCH 11/14] chore: add .env and .kiro/ to .gitignore --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index 16dd32e..c58f29e 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ config.toml *.swp .DS_Store +.env +.kiro/ From d027eaa6924e5d003ee50df9a73572fee4a1cf14 Mon Sep 17 00:00:00 2001 From: thepagent Date: Sat, 18 Apr 2026 02:33:32 -0400 Subject: [PATCH 12/14] chore: add trailing newline to format.rs --- src/format.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/format.rs b/src/format.rs index 35aa837..7e4a344 100644 --- a/src/format.rs +++ b/src/format.rs @@ -64,4 +64,4 @@ pub fn shorten_thread_name(prompt: &str) -> String { /// Truncate a string to at most `limit` Unicode characters. pub fn truncate_chars(s: &str, limit: usize) -> String { s.chars().take(limit).collect() -} \ No newline at end of file +} From c6e126bde36883ea9c11d1932c46a5622b75aaf7 Mon Sep 17 00:00:00 2001 From: thepagent Date: Sat, 18 Apr 2026 02:43:33 -0400 Subject: [PATCH 13/14] =?UTF-8?q?fix:=20address=20review=20feedback=20?= =?UTF-8?q?=E2=80=94=20semaphore=20leak,=20panic=20safety,=20tail-priority?= =?UTF-8?q?=20truncation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - KeyedAsyncQueue::acquire() now does lazy cleanup when entries > 100, evicting idle semaphores (strong_count == 1 && permits available) - Replaced expect() with match + warn + skip (returns Option) to avoid task panic on closed semaphore - Removed unused cleanup_idle() (dead code) - Switched streaming truncation from head-priority to tail-priority so users see the most recent agent output during live updates - Added comments explaining queue_key construction differences between app_mention (ts-only) and message (channel:ts for DM uniqueness) --- src/adapter.rs | 2 +- src/format.rs | 11 ++++++++--- src/slack.rs | 41 +++++++++++++++++++++++++---------------- 3 files changed, 34 insertions(+), 20 deletions(-) diff --git a/src/adapter.rs b/src/adapter.rs index d469a11..5c39369 100644 --- a/src/adapter.rs +++ b/src/adapter.rs @@ -256,7 +256,7 @@ impl AdapterRouter { let content = buf_rx.borrow_and_update().clone(); if content != last { let display = if content.chars().count() > limit - 100 { - format!("{}…", format::truncate_chars(&content, limit - 100)) + format!("…{}", format::truncate_chars_tail(&content, limit - 100)) } else { content.clone() }; diff --git a/src/format.rs b/src/format.rs index 7e4a344..fbf76c6 100644 --- a/src/format.rs +++ b/src/format.rs @@ -61,7 +61,12 @@ pub fn shorten_thread_name(prompt: &str) -> String { } -/// Truncate a string to at most `limit` Unicode characters. -pub fn truncate_chars(s: &str, limit: usize) -> String { - s.chars().take(limit).collect() +/// Truncate a string to at most `limit` Unicode characters, keeping the tail +/// (most recent output) for better streaming UX. +pub fn truncate_chars_tail(s: &str, limit: usize) -> String { + let total = s.chars().count(); + if total <= limit { + return s.to_string(); + } + s.chars().skip(total - limit).collect() } diff --git a/src/slack.rs b/src/slack.rs index 2e4524f..15d7e1a 100644 --- a/src/slack.rs +++ b/src/slack.rs @@ -411,25 +411,28 @@ impl KeyedAsyncQueue { /// Acquire a per-key permit. The returned guard must be held for the /// duration of the async work. Dropping it allows the next queued task /// for the same key to proceed. - async fn acquire(&self, key: &str) -> tokio::sync::OwnedSemaphorePermit { + /// + /// Performs lazy cleanup of idle semaphores to prevent unbounded growth + /// in long-running deployments. + async fn acquire(&self, key: &str) -> Option { let sem = { let mut tails = self.tails.lock().await; + // Lazy cleanup: evict idle entries (available_permits == 1 means no one is holding or waiting) + if tails.len() > 100 { + tails.retain(|_, sem| Arc::strong_count(sem) > 1 || sem.available_permits() < 1); + } tails .entry(key.to_string()) .or_insert_with(|| Arc::new(tokio::sync::Semaphore::new(1))) .clone() }; - // This will wait if another task for the same key is in progress. - sem.acquire_owned().await.expect("semaphore closed unexpectedly") - } - - /// Remove semaphores that have no waiters and no held permits (idle). - /// Memory cost of idle entries is low (one Arc per thread_ts), - /// so cleanup is best-effort and not critical. - #[allow(dead_code)] - async fn cleanup_idle(&self) { - let mut tails = self.tails.lock().await; - tails.retain(|_, sem| sem.available_permits() < 1); + match sem.acquire_owned().await { + Ok(permit) => Some(permit), + Err(e) => { + warn!(key, error = %e, "semaphore closed, skipping message"); + None + } + } } } @@ -533,14 +536,17 @@ pub async fn run_slack_adapter( let stt_config = stt_config.clone(); let router = router.clone(); let queue = queue.clone(); - // Queue key: thread_ts (existing thread) or ts (new thread) + // Queue key: thread_ts if already in a thread, otherwise ts. + // app_mention always has a channel context, so ts alone + // is unique enough (unlike message events in DMs where + // we prefix with channel_id to avoid ts collisions). let queue_key = event["thread_ts"] .as_str() .or_else(|| event["ts"].as_str()) .unwrap_or("") .to_string(); tokio::spawn(async move { - let _permit = queue.acquire(&queue_key).await; + let Some(_permit) = queue.acquire(&queue_key).await else { return }; handle_message( &event, true, @@ -684,7 +690,10 @@ pub async fn run_slack_adapter( let stt_config = stt_config.clone(); let router = router.clone(); let queue = queue.clone(); - // Queue key: thread_ts (existing thread) or channel:ts (new thread / DM) + // Queue key: thread_ts if in a thread, otherwise channel:ts. + // Prefixed with channel_id for non-thread messages because + // DMs and channels can have overlapping ts values — the + // prefix ensures keys are globally unique. let queue_key = event["thread_ts"] .as_str() .map(|s| s.to_string()) @@ -692,7 +701,7 @@ pub async fn run_slack_adapter( format!("{}:{}", channel_id, event["ts"].as_str().unwrap_or("")) }); tokio::spawn(async move { - let _permit = queue.acquire(&queue_key).await; + let Some(_permit) = queue.acquire(&queue_key).await else { return }; handle_message( &event, is_dm, From 17c3e4c74e4376e4189152c030c1d43ed8ba4e72 Mon Sep 17 00:00:00 2001 From: thepagent Date: Sat, 18 Apr 2026 02:47:32 -0400 Subject: [PATCH 14/14] chore: remove extra blank line in discord.rs --- src/discord.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/discord.rs b/src/discord.rs index 14f2594..11c0e2e 100644 --- a/src/discord.rs +++ b/src/discord.rs @@ -54,7 +54,6 @@ impl ChatAdapter for DiscordAdapter { }) } - async fn create_thread( &self, channel: &ChannelRef,