From 2cbfe269ce97940e3c8bf609e41c3a96934fc8dd Mon Sep 17 00:00:00 2001
From: ruandan
Date: Sat, 18 Apr 2026 21:31:14 +0800
Subject: [PATCH 1/3] feat: broadcast shutdown notification across all active threads (RFC #78 §1d Phase 1)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

On SIGINT or SIGTERM the broker now posts a short notification to every
active thread across all configured adapters (Discord, Slack, …) before
closing shards. Users get a clear signal that the broker is going away
instead of replies cutting off silently mid-stream.

Discord Discussion URL:
https://discord.com/channels/1488041051187974246/1495050997461024879

- `SessionPool` stores each active session's addressing info alongside its
  connection (`addresses: HashMap<String, (ChannelRef, Arc<dyn ChatAdapter>)>`),
  kept in lockstep with `state.active` so broadcast has a single source of
  truth. No parallel cache in the adapter layer.
- `SessionPool::begin_shutdown()` flips the shutdown flag and snapshots
  `addresses` under the state write lock, making admission and snapshot
  atomic with respect to each other.
- `AdapterRouter::broadcast_shutdown(message, timeout)` calls
  `begin_shutdown`, then posts the notification to every snapshot entry in
  parallel via `tokio::task::JoinSet`. A `tokio::select!` with a 10-second
  deadline caps total broadcast time so shutdown itself never blocks on a
  slow platform.
- `main.rs` shutdown handler listens for SIGTERM in addition to SIGINT
  (`#[cfg(unix)]`-gated, with a ctrl_c-only fallback for non-Unix targets),
  and runs `broadcast_shutdown` before `shard_manager.shutdown_all()`. Both
  the Discord branch and the Slack-only branch use the same flow via a
  shared `wait_for_shutdown_signal()` helper.

Every session in the pool at broadcast time is either:

1. In the broadcast snapshot (receives the notification), or
2. Rejected at admission (receives an inline "Bot is shutting down" notice
   through `handle_message`'s pool-error path).

`get_or_create` checks `is_shutting_down()` at four atomic points:

1. Fast-fail on entry (cheap).
2. Inside the initial `state.read()` block (catches shutdowns that ran
   before we reached this task).
3. Before the `return Ok(())` on the existing-alive-session path, with a
   `state.read()` barrier to synchronize with `begin_shutdown`'s
   `state.write()` flag flip (catches shutdowns that started while we were
   waiting on the per-connection mutex).
4. Inside the final `state.write()` block before insert (atomic with
   `begin_shutdown`'s flag flip + snapshot).

Design notes:

- Notification is best-effort: delivery errors and the 10-second deadline
  both fall through to normal pool teardown.
- Wording is intentionally neutral ("Context will reset on return") — we
  don't promise automatic session resume; RFC #78 Phase 2 persistence is a
  separate follow-up.
- Per-channel parallel send is safe under Discord/Slack rate limits (limits
  are per-channel, not per-adapter) and is ~10× faster than sequential for
  deployments with several active threads.

Testing:

- `cargo test` passes: 43 passed, 0 failed.
- `cargo clippy --all-targets -- -D warnings` clean.
- Verified on bare-metal and Docker brokers: SIGTERM triggers the
  `broadcasting shutdown notification count=N` log line and the message
  arrives in every active thread.
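For reviewers reading the message without the diff, here is a minimal
self-contained sketch of the mechanism. `Pool` and `notify` are hypothetical
stand-ins, plain `String`s replace `ChannelRef` and the adapter handle, and
`tokio` with the `full` feature is assumed; the broker's real types are in
the diff below. The point is the ordering: the flag flip and the snapshot
share one write lock, and the parallel sends are capped by a deadline.

    // Illustrative sketch only: `Pool` and `notify` are hypothetical
    // stand-ins, not the broker's SessionPool / ChatAdapter types.
    use std::collections::HashMap;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;
    use tokio::sync::RwLock;

    struct Pool {
        shutting_down: AtomicBool,
        // thread_key -> channel id (a plain String standing in for ChannelRef).
        addresses: RwLock<HashMap<String, String>>,
    }

    impl Pool {
        // Flag flip and snapshot happen under the same write lock: admissions
        // that committed before this call are in the snapshot, later ones see
        // the flag and are rejected.
        async fn begin_shutdown(&self) -> Vec<(String, String)> {
            let state = self.addresses.write().await;
            self.shutting_down.store(true, Ordering::Release);
            state.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
        }
    }

    // Stand-in for adapter.send_message().
    async fn notify(channel: String, message: String) {
        tokio::time::sleep(Duration::from_millis(50)).await;
        println!("-> {channel}: {message}");
    }

    #[tokio::main]
    async fn main() {
        let pool = Pool {
            shutting_down: AtomicBool::new(false),
            addresses: RwLock::new(HashMap::from([
                ("thread-a".to_string(), "chan-1".to_string()),
                ("thread-b".to_string(), "chan-2".to_string()),
            ])),
        };

        // 1. Atomically stop admissions and snapshot every live address.
        let snapshot = pool.begin_shutdown().await;

        // 2. Post to every snapshot entry in parallel.
        let mut set = tokio::task::JoinSet::new();
        for (_thread_key, channel) in snapshot {
            set.spawn(notify(channel, "Bot is shutting down.".to_string()));
        }

        // 3. Cap total broadcast time; unfinished sends are aborted at the deadline.
        let deadline = tokio::time::sleep(Duration::from_secs(10));
        tokio::pin!(deadline);
        loop {
            tokio::select! {
                biased;
                _ = &mut deadline => { set.shutdown().await; return; }
                next = set.join_next() => { if next.is_none() { return; } }
            }
        }
    }

Anything still unsent when the deadline fires is aborted via
`JoinSet::shutdown`, matching the best-effort note above.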
Relates to: #78, #75
---
 src/acp/pool.rs | 80 +++++++++++++++++++++++++++++++++++++++++++++--
 src/adapter.rs  | 83 +++++++++++++++++++++++++++++++++++++++++++++----
 src/main.rs     | 57 ++++++++++++++++++++++++++++-----
 3 files changed, 204 insertions(+), 16 deletions(-)

diff --git a/src/acp/pool.rs b/src/acp/pool.rs
index e1d27bfd..0121e24a 100644
--- a/src/acp/pool.rs
+++ b/src/acp/pool.rs
@@ -1,7 +1,8 @@
 use crate::acp::connection::AcpConnection;
 use crate::acp::protocol::ConfigOption;
+use crate::adapter::{ChannelRef, ChatAdapter};
 use crate::config::AgentConfig;
-use anyhow::{anyhow, Result};
+use anyhow::{anyhow, bail, Result};
 use std::collections::HashMap;
 use std::sync::Arc;
 use tokio::sync::{Mutex, RwLock};
@@ -16,6 +17,11 @@ struct PoolState {
     /// Lock-free cancel handles: thread_key → (stdin, session_id).
     /// Stored separately so cancel can work without locking the connection.
     cancel_handles: HashMap>, String)>,
+    /// Addressing info for each active thread. Populated alongside `active`
+    /// and pruned together. Used by `begin_shutdown` so the broker can post
+    /// a notification to every live session without the adapter layer
+    /// maintaining a parallel cache. Invariant: `addresses.keys() == active.keys()`.
+    addresses: HashMap<String, (ChannelRef, Arc<dyn ChatAdapter>)>,
     /// Suspended sessions: thread_key → ACP sessionId.
     /// Saved on eviction so sessions can be resumed via `session/load`.
     suspended: HashMap<String, String>,
@@ -28,6 +34,10 @@ pub struct SessionPool {
     state: RwLock<PoolState>,
     config: AgentConfig,
     max_sessions: usize,
+    /// Flipped by `begin_shutdown` to reject new admissions. Checked inside
+    /// `get_or_create` under the state write lock so admission and snapshot
+    /// are atomic.
+    shutting_down: std::sync::atomic::AtomicBool,
 }

 type EvictionCandidate = (
@@ -67,15 +77,53 @@ impl SessionPool {
             state: RwLock::new(PoolState {
                 active: HashMap::new(),
                 cancel_handles: HashMap::new(),
+                addresses: HashMap::new(),
                 suspended: HashMap::new(),
                 creating: HashMap::new(),
             }),
             config,
             max_sessions,
+            shutting_down: std::sync::atomic::AtomicBool::new(false),
         }
     }

-    pub async fn get_or_create(&self, thread_id: &str) -> Result<()> {
+    /// True once `begin_shutdown` has been called. Router uses this to show a
+    /// shutdown-specific message instead of a generic pool error when
+    /// `get_or_create` rejects admission.
+    pub fn is_shutting_down(&self) -> bool {
+        self.shutting_down
+            .load(std::sync::atomic::Ordering::Acquire)
+    }
+
+    /// Flip the pool into shutting-down state and return a snapshot of every
+    /// live session's addressing info. Takes the state write lock so the
+    /// snapshot is atomic with respect to in-flight `get_or_create` calls:
+    /// any admission that committed before us is included; any that comes
+    /// after us sees the flag inside the same lock and rejects.
+    pub async fn begin_shutdown(&self) -> Vec<(String, ChannelRef, Arc<dyn ChatAdapter>)> {
+        let state = self.state.write().await;
+        self.shutting_down
+            .store(true, std::sync::atomic::Ordering::Release);
+        state
+            .addresses
+            .iter()
+            .map(|(k, (c, a))| (k.clone(), c.clone(), a.clone()))
+            .collect()
+    }
+
+    pub async fn get_or_create(
+        &self,
+        thread_id: &str,
+        channel: &ChannelRef,
+        adapter: &Arc<dyn ChatAdapter>,
+    ) -> Result<()> {
+        // Fast-fail: avoid spawning a fresh ACP process if shutdown is already
+        // in progress. The authoritative check happens again under the state
+        // write lock below so we also catch shutdowns that start mid-spawn.
+        if self.is_shutting_down() {
+            bail!("pool is shutting down");
+        }
+
         let create_gate = {
             let mut state = self.state.write().await;
             get_or_insert_gate(&mut state.creating, thread_id)
         };
@@ -84,6 +132,9 @@
         let (existing, saved_session_id) = {
             let state = self.state.read().await;
+            if self.is_shutting_down() {
+                bail!("pool is shutting down");
+            }
             (
                 state.active.get(thread_id).cloned(),
                 state.suspended.get(thread_id).cloned(),
             )
         };
@@ -95,6 +146,16 @@
         if let Some(conn) = existing.clone() {
             let conn = conn.lock().await;
             if conn.alive() {
+                // Re-check shutdown state after waiting on the per-connection
+                // mutex. Taking `state.read()` synchronizes us with
+                // `begin_shutdown`'s write-lock flag flip, so the flag value
+                // we see here reflects every `begin_shutdown` that has
+                // committed. This closes the race where shutdown starts
+                // while we were waiting on `conn.lock()`.
+                let _sync = self.state.read().await;
+                if self.is_shutting_down() {
+                    bail!("pool is shutting down");
+                }
                 return Ok(());
             }
             if saved_session_id.is_none() {
@@ -174,6 +235,14 @@
         let mut state = self.state.write().await;

+        // Admission check inside the state write lock. This is atomic with
+        // `begin_shutdown`'s flag-flip + snapshot: a shutdown that started
+        // during our ACP spawn is caught here, and our work is thrown away
+        // rather than being added to a pool that is about to be torn down.
+        if self.is_shutting_down() {
+            bail!("pool is shutting down");
+        }
+
         // Another task may have created a healthy connection while we were
         // initializing this one.
         if let Some(existing) = state.active.get(thread_id).cloned() {
@@ -186,11 +255,13 @@
             warn!(thread_id, "stale connection, rebuilding");
             drop(existing);
             state.active.remove(thread_id);
+            state.addresses.remove(thread_id);
         }

         if state.active.len() >= self.max_sessions {
             if let Some((key, expected_conn, _, sid)) = eviction_candidate {
                 if remove_if_same_handle(&mut state.active, &key, &expected_conn).is_some() {
+                    state.addresses.remove(&key);
                     info!(evicted = %key, "pool full, suspending oldest idle session");
                     if let Some(sid) = sid {
                         state.suspended.insert(key, sid);
@@ -216,6 +287,9 @@
         if !cancel_session_id.is_empty() {
             state.cancel_handles.insert(thread_id.to_string(), (cancel_handle, cancel_session_id));
         }
+        state
+            .addresses
+            .insert(thread_id.to_string(), (channel.clone(), adapter.clone()));

         Ok(())
     }
@@ -324,6 +398,7 @@
         let mut state = self.state.write().await;
         for (key, expected_conn, sid) in stale {
             if remove_if_same_handle(&mut state.active, &key, &expected_conn).is_some() {
+                state.addresses.remove(&key);
                 info!(thread_id = %key, "cleaning up idle session");
                 if let Some(sid) = sid {
                     state.suspended.insert(key, sid);
@@ -336,6 +411,7 @@
         let mut state = self.state.write().await;
         let count = state.active.len();
         state.active.clear(); // Drop impl kills process groups
+        state.addresses.clear();
         info!(count, "pool shutdown complete");
     }
 }
diff --git a/src/adapter.rs b/src/adapter.rs
index 189e98cf..a4b29582 100644
--- a/src/adapter.rs
+++ b/src/adapter.rs
@@ -2,7 +2,7 @@
 use anyhow::Result;
 use async_trait::async_trait;
 use serde::Serialize;
 use std::sync::Arc;
-use tracing::error;
+use tracing::{error, info, warn};
 use crate::acp::{classify_notification, AcpEvent, ContentBlock, SessionPool};
 use crate::config::ReactionsConfig;
@@ -154,11 +154,30 @@
             .unwrap_or(&thread_channel.channel_id)
         );

-        if let Err(e) = self.pool.get_or_create(&thread_key).await {
-            let msg = format_user_error(&e.to_string());
-            let _ = adapter
-                .send_message(thread_channel, &format!("⚠️ {msg}"))
-                .await;
+        // Session admission. The pool itself is authoritative: it rejects
+        // with an error if `begin_shutdown` has already fired, and on success
+        // stores the `ChannelRef` + adapter so `broadcast_shutdown` can reach
+        // this thread without the router keeping a parallel cache.
+        if let Err(e) = self.pool.get_or_create(&thread_key, thread_channel, adapter).await {
+            if self.pool.is_shutting_down() {
+                // Don't send the shutdown rejection back to bot-authored events.
+                // Slack (and potentially any other platform that doesn't drop
+                // the bot's own posts) would deliver our broadcast message as a
+                // new bot event, route it here during the shutdown window, and
+                // we'd reply with another bot-authored rejection — looping until
+                // the bot-turn cap trips. Human senders still get the notice.
+                if !sender.is_bot {
+                    let _ = adapter
+                        .send_message(
+                            thread_channel,
+                            "⚠️ Bot is shutting down and cannot accept new messages right now.",
+                        )
+                        .await;
+                }
+                return Ok(());
+            }
+            let msg = format!("⚠️ {}", format_user_error(&e.to_string()));
+            let _ = adapter.send_message(thread_channel, &msg).await;
             error!("pool error: {e}");
             return Err(e);
         }
@@ -209,6 +228,58 @@
         result
     }

+    /// Broadcast a short notification to every active thread, across all
+    /// configured adapters, before the broker shuts down. Sends happen in
+    /// parallel and are capped by `timeout`; the call returns early if the
+    /// deadline is hit so shutdown itself is never blocked by a slow platform.
+    ///
+    /// Delivery is best-effort: evicted sessions whose `ChannelRef` is still
+    /// in the cache still receive the notification, which is the behavior we
+    /// want (the user saw the thread was in flight; they deserve to know the
+    /// broker is going away).
+    pub async fn broadcast_shutdown(&self, message: &str, timeout: std::time::Duration) {
+        // The pool owns both the flag flip and the live-session snapshot and
+        // performs both atomically under its state write lock. Any message
+        // admitted before us is in the snapshot; any that comes after sees
+        // the flag inside the same lock and returns an admission error that
+        // `handle_message` surfaces inline.
+        let snapshot = self.pool.begin_shutdown().await;
+
+        if snapshot.is_empty() {
+            return;
+        }
+
+        info!(count = snapshot.len(), "broadcasting shutdown notification");
+
+        let mut set = tokio::task::JoinSet::new();
+        for (thread_key, channel, adapter) in snapshot {
+            let message = message.to_string();
+            set.spawn(async move {
+                if let Err(e) = adapter.send_message(&channel, &message).await {
+                    warn!(thread_key, error = %e, "failed to post shutdown notification");
+                }
+            });
+        }
+
+        let deadline = tokio::time::sleep(timeout);
+        tokio::pin!(deadline);
+        loop {
+            tokio::select! {
+                biased;
+                _ = &mut deadline => {
+                    warn!(timeout_ms = timeout.as_millis() as u64, "shutdown broadcast timed out; remaining sends cancelled");
+                    set.shutdown().await;
+                    return;
+                }
+                next = set.join_next() => {
+                    if next.is_none() {
+                        return;
+                    }
+                }
+            }
+        }
+    }
+
     async fn stream_prompt(
         &self,
         adapter: &Arc<dyn ChatAdapter>,
diff --git a/src/main.rs b/src/main.rs
index 53927f40..56337857 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -18,6 +18,15 @@
 use std::path::PathBuf;
 use std::sync::Arc;
 use tracing::{error, info, warn};

+/// Neutral shutdown notification broadcast to every active thread. Wording
+/// deliberately avoids "restarting" because `helm uninstall` / final-stop
+/// can't be distinguished from a rolling restart at signal time.
+const SHUTDOWN_MSG: &str = "⚠️ Bot is shutting down. Context will reset on return.";
+
+/// Broadcast deadline. Shutdown itself must never block on a slow platform,
+/// so incomplete sends are dropped once this elapses.
+const SHUTDOWN_BROADCAST_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(10);
+
 #[derive(Parser)]
 #[command(name = "openab")]
 #[command(about = "Multi-platform ACP agent broker (Discord, Slack)", long_about = None)]
@@ -176,7 +185,7 @@
     );

     let handler = discord::Handler {
-        router,
+        router: router.clone(),
         allow_all_channels,
         allow_all_users,
         allowed_channels,
@@ -201,21 +210,31 @@
             .event_handler(handler)
             .await?;

-        // Graceful Discord shutdown on ctrl_c
+        // Graceful shutdown on SIGINT or SIGTERM: wait for the signal,
+        // broadcast to every active thread (Discord + Slack), then stop
+        // Discord shards. `client.start()` is the foreground blocker here,
+        // so this handler runs as a spawned task.
         let shard_manager = client.shard_manager.clone();
+        let shutdown_router = router.clone();
         tokio::spawn(async move {
-            tokio::signal::ctrl_c().await.ok();
-            info!("shutdown signal received");
+            wait_for_shutdown_signal().await;
+            shutdown_router
+                .broadcast_shutdown(SHUTDOWN_MSG, SHUTDOWN_BROADCAST_TIMEOUT)
+                .await;
             shard_manager.shutdown_all().await;
         });

         info!("discord bot running");
         client.start().await?;
     } else {
-        // No Discord — just wait for ctrl_c
-        info!("running without discord, press ctrl+c to stop");
-        tokio::signal::ctrl_c().await.ok();
-        info!("shutdown signal received");
+        // No Discord — this task itself blocks on the shutdown signal,
+        // then broadcasts before falling through to cleanup. Slack-only
+        // deployments need SIGTERM + broadcast just like Discord.
+        info!("running without discord, waiting for shutdown signal");
+        wait_for_shutdown_signal().await;
+        router
+            .broadcast_shutdown(SHUTDOWN_MSG, SHUTDOWN_BROADCAST_TIMEOUT)
+            .await;
     }

     // Cleanup
@@ -233,6 +252,28 @@
     }
 }

+/// Wait for SIGINT (ctrl_c) or, on Unix, SIGTERM (systemctl stop, docker stop,
+/// kill). Without SIGTERM handling the broker would be killed outright by
+/// service managers and skip the shutdown broadcast, so both signals route
+/// here on Unix. On non-Unix targets `tokio::signal::unix` is unavailable, so
+/// we fall back to ctrl_c alone.
+#[cfg(unix)]
+async fn wait_for_shutdown_signal() {
+    let mut sigterm = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
+        .expect("install SIGTERM handler");
+    tokio::select! {
+        _ = tokio::signal::ctrl_c() => {}
+        _ = sigterm.recv() => {}
+    }
+    info!("shutdown signal received");
+}
+
+#[cfg(not(unix))]
+async fn wait_for_shutdown_signal() {
+    tokio::signal::ctrl_c().await.ok();
+    info!("shutdown signal received");
+}
+
 fn parse_id_set(raw: &[String], label: &str) -> anyhow::Result> {
     let set: HashSet = raw
         .iter()

From 3fe3137d4ef3a30f347c63ed3f5c680b2f4c21a0 Mon Sep 17 00:00:00 2001
From: Codex Temp
Date: Sun, 19 Apr 2026 08:05:16 +0000
Subject: [PATCH 2/3] fix: adapt shutdown broadcast to sender_json API

---
 src/adapter.rs |  3 ++-
 src/discord.rs | 10 +++++++++-
 src/slack.rs   | 10 +++++++++-
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/src/adapter.rs b/src/adapter.rs
index a4b29582..ee6d8541 100644
--- a/src/adapter.rs
+++ b/src/adapter.rs
@@ -116,6 +116,7 @@ impl AdapterRouter {
         adapter: &Arc<dyn ChatAdapter>,
         thread_channel: &ChannelRef,
         sender_json: &str,
+        sender_is_bot: bool,
         prompt: &str,
         extra_blocks: Vec<ContentBlock>,
         trigger_msg: &MessageRef,
@@ -166,7 +167,7 @@
                 // new bot event, route it here during the shutdown window, and
                 // we'd reply with another bot-authored rejection — looping until
                 // the bot-turn cap trips. Human senders still get the notice.
-                if !sender.is_bot {
+                if !sender_is_bot {
                     let _ = adapter
                         .send_message(
                             thread_channel,
diff --git a/src/discord.rs b/src/discord.rs
index ac947597..341081d3 100644
--- a/src/discord.rs
+++ b/src/discord.rs
@@ -517,7 +517,15 @@ impl EventHandler for Handler {
             tokio::spawn(async move {
                 let sender_json = serde_json::to_string(&sender).unwrap();
                 if let Err(e) = router
-                    .handle_message(&adapter, &thread_channel, &sender_json, &prompt, extra_blocks, &trigger_msg)
+                    .handle_message(
+                        &adapter,
+                        &thread_channel,
+                        &sender_json,
+                        sender.is_bot,
+                        &prompt,
+                        extra_blocks,
+                        &trigger_msg,
+                    )
                     .await
                 {
                     error!("handle_message error: {e}");
diff --git a/src/slack.rs b/src/slack.rs
index 92155e2b..a2d2d8a6 100644
--- a/src/slack.rs
+++ b/src/slack.rs
@@ -953,7 +953,15 @@ async fn handle_message(
     let adapter_dyn: Arc<dyn ChatAdapter> = adapter.clone();

     if let Err(e) = router
-        .handle_message(&adapter_dyn, &thread_channel, &sender_json, &prompt, extra_blocks, &trigger_msg)
+        .handle_message(
+            &adapter_dyn,
+            &thread_channel,
+            &sender_json,
+            sender.is_bot,
+            &prompt,
+            extra_blocks,
+            &trigger_msg,
+        )
         .await
     {
         error!("Slack handle_message error: {e}");

From 44a9cddbb19a62ab8a477156563c6f9425d49cda Mon Sep 17 00:00:00 2001
From: Codex Temp
Date: Sun, 19 Apr 2026 08:08:21 +0000
Subject: [PATCH 3/3] fix: derive shutdown bot flag from sender_json

---
 src/adapter.rs | 8 +++++---
 src/discord.rs | 1 -
 src/slack.rs   | 1 -
 3 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/adapter.rs b/src/adapter.rs
index ee6d8541..864a5c6a 100644
--- a/src/adapter.rs
+++ b/src/adapter.rs
@@ -1,6 +1,6 @@
 use anyhow::Result;
 use async_trait::async_trait;
-use serde::Serialize;
+use serde::{Deserialize, Serialize};
 use std::sync::Arc;
 use tracing::{error, info, warn};
 use crate::acp::{classify_notification, AcpEvent, ContentBlock, SessionPool};
@@ -32,7 +32,7 @@ pub struct MessageRef {
 }

 /// Sender identity injected into prompts for downstream agent context.
-#[derive(Clone, Debug, Serialize)]
+#[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct SenderContext {
     pub schema: String,
     pub sender_id: String,
@@ -116,7 +116,6 @@
         adapter: &Arc<dyn ChatAdapter>,
         thread_channel: &ChannelRef,
         sender_json: &str,
-        sender_is_bot: bool,
         prompt: &str,
         extra_blocks: Vec<ContentBlock>,
         trigger_msg: &MessageRef,
@@ -167,6 +166,9 @@
                 // new bot event, route it here during the shutdown window, and
                 // we'd reply with another bot-authored rejection — looping until
                 // the bot-turn cap trips. Human senders still get the notice.
+                let sender_is_bot = serde_json::from_str::<SenderContext>(sender_json)
+                    .map(|sender| sender.is_bot)
+                    .unwrap_or(false);
                 if !sender_is_bot {
                     let _ = adapter
                         .send_message(
                             thread_channel,
diff --git a/src/discord.rs b/src/discord.rs
index 341081d3..105651ec 100644
--- a/src/discord.rs
+++ b/src/discord.rs
@@ -521,7 +521,6 @@
                         &adapter,
                         &thread_channel,
                         &sender_json,
-                        sender.is_bot,
                         &prompt,
                         extra_blocks,
                         &trigger_msg,
diff --git a/src/slack.rs b/src/slack.rs
index a2d2d8a6..0675d75c 100644
--- a/src/slack.rs
+++ b/src/slack.rs
@@ -957,7 +957,6 @@
             &adapter_dyn,
             &thread_channel,
             &sender_json,
-            sender.is_bot,
             &prompt,
             extra_blocks,
             &trigger_msg,