feat: broadcast shutdown notification to active threads (RFC #78 §1d Phase 1) #182
ruan330 wants to merge 4 commits into openabdev:main
Conversation
Review — PR #182

Good feature — users currently get no warning when the broker shuts down. This fixes that.

✅ What looks good

🔴 Must fix before merge

1. Rebase onto latest
2. CI not triggered

🟡 Non-blocking

3. k8s `terminationGracePeriodSeconds`
4. "Restarting" message on permanent shutdown

Summary

Valuable UX improvement. Rebase needed — pool and main.rs have both changed significantly.
Force-pushed 04d1e0e to 82c8a7e
masami-agent
left a comment
PR Review: #182
Summary
- Problem: Broker restarts kill conversations silently — users get no notification that the broker is going down.
- Approach: Add an `active_thread_ids()` accessor to `SessionPool`, handle SIGTERM alongside SIGINT, and broadcast a neutral notification to each active Discord thread before shutting down shards.
- Risk level: Low
Core Assessment
- Problem clearly stated: ✅ — well-motivated, linked to RFC #78 §1d
- Approach appropriate: ✅ — minimal, clean separation between pool (lifecycle) and main (Discord)
- Alternatives considered: ✅ — PR description covers three alternatives with clear reasoning
- Best approach for now: ✅ — Phase 1 scope is correct; no over-engineering
Findings
pool.rs — active_thread_ids()
Clean and minimal. Takes a read lock, returns an owned Vec<String>, releases immediately. No contention risk with the subsequent shutdown() write lock. ✅
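The accessor described above can be sketched as follows — a hedged, minimal sketch using `std::sync::RwLock` for brevity (the broker uses tokio's async lock) and assumed field names:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

struct SessionPool {
    // thread_key ("platform:id") -> session connection (connection type elided)
    active: RwLock<HashMap<String, ()>>,
}

impl SessionPool {
    /// Snapshot the active thread keys under a short-lived read lock.
    /// Returning an owned Vec releases the lock before the caller starts
    /// the (slow) Discord broadcast, so the later shutdown() write lock
    /// never contends with it.
    fn active_thread_ids(&self) -> Vec<String> {
        self.active.read().unwrap().keys().cloned().collect()
    }
}
```

Returning owned `String`s rather than references is what makes the "releases immediately" property possible: no borrow outlives the lock guard.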
main.rs — SIGTERM handling
The tokio::select! over ctrl_c() and sigterm.recv() is the standard pattern. The .expect("install SIGTERM handler") is acceptable here — if we cannot install a signal handler at startup, panicking is the right call since the broker would be unable to shut down gracefully anyway. ✅
main.rs — broadcast loop
The strip_prefix("discord:") + parse::<u64>() approach correctly handles the {platform}:{id} key format used by AdapterRouter. Non-Discord keys (e.g. slack:...) are silently skipped via the parse failure path, which is the right behavior for now. ✅
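The key-parsing logic being reviewed can be isolated as a small pure function (names are illustrative, not the PR's exact code):

```rust
/// Parse a pool thread key of the form "{platform}:{id}" and return the
/// Discord channel id, or None for non-Discord keys.
fn discord_channel_id(thread_key: &str) -> Option<u64> {
    // strip_prefix returns None for non-Discord keys, so unwrap_or leaves
    // the full key ("slack:123") in place, which then fails the u64 parse
    // and is silently skipped — the behavior described above.
    let raw = thread_key.strip_prefix("discord:").unwrap_or(thread_key);
    raw.parse::<u64>().ok()
}
```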
main.rs — error handling
channel.say() failures are logged as warnings and do not block shutdown. Correct — a dead Discord connection should never prevent pool cleanup. ✅
Review Summary
🔴 Blockers
`Cargo.lock` version bump is out of scope. The diff changes `Cargo.lock` from 0.7.6 → 0.7.7. On `main`, `Cargo.toml` is already 0.7.7 but `Cargo.lock` is stale at 0.7.6. While the sync is technically correct, it is unrelated to the shutdown broadcast feature and should not be bundled into this PR. Please revert the `Cargo.lock` change and let it be fixed in a separate housekeeping commit, or confirm this was an intentional rebase artifact that the maintainers are okay with.
💬 Questions
- Non-Discord shutdown path also lacks SIGTERM. The `else` branch (line ~194 on main: "running without discord, press ctrl+c to stop") still only awaits `ctrl_c()`. In a Slack-only or headless deployment, `docker stop`/`kill` would still bypass the graceful path. Was this intentionally left out of scope, or should it be addressed here for consistency?
- Slack adapter threads. When Slack support is active, `thread_key` will be `slack:{id}`. The broadcast loop silently skips non-Discord keys (the `parse::<u64>()` fails on `slack:...` after `strip_prefix("discord:")` leaves the full key unchanged). This is fine for Phase 1, but worth noting — should there be a `tracing::debug!` for skipped non-Discord keys so operators know the broadcast was Discord-only?
🔧 Suggested Changes
- Consider a timeout on the broadcast loop. If there are many active sessions and Discord is slow or rate-limiting, the broadcast could delay shutdown significantly. A `tokio::time::timeout` wrapping the entire loop (e.g. 5–10 seconds) would provide a safety net. Not blocking, but worth considering for production robustness.
- Is `pool.shutdown()` called twice on the Discord path? After the spawned task runs `shard_manager.shutdown_all()`, execution continues past `client.start().await?` and hits the `shutdown_pool.shutdown().await` at line ~208. The broadcast task does NOT call `pool.shutdown()` — it only broadcasts and shuts down shards. So the pool is shut down exactly once (at the bottom), which is correct. However, the broadcast task clones `pool` as `broadcast_pool` but never calls shutdown on it. This is fine, but the naming `broadcast_pool` could be clearer — maybe `notification_pool` or just `pool_ref` to signal it is read-only usage.
ℹ️ Info
- The `Cargo.lock` discrepancy on `main` (`Cargo.toml` = 0.7.7, `Cargo.lock` = 0.7.6) is a pre-existing issue. It should be fixed, but in a separate commit.
- RFC #78 §1d Phase 2 (session persistence) is correctly deferred. The neutral wording ("you can continue" vs "will resume") is the right call given no state is persisted yet.
⚪ Nits
- Line length on the `channel.say()` string literal is ~90 chars — fine, but if you want to break it for readability, a `const` at the top of the block would work nicely:

```rust
const SHUTDOWN_MSG: &str = "🔄 Broker restarting. You can continue the conversation when the broker is back.";
```
Verdict
COMMENT — One blocker (Cargo.lock scope) and two questions to resolve. The core implementation is clean and well-designed. Once the Cargo.lock issue is addressed and the questions are answered, this should be ready for approval.
masami-agent
left a comment
Hi @ruan330 — leaving some inline comments on specific areas. The overall review is in my earlier comment above. Thanks for the solid work on this!
```diff
 [[package]]
 name = "openab"
-version = "0.7.6"
+version = "0.7.7"
```
🔴 BLOCKER — This version bump (0.7.6 → 0.7.7) is a pre-existing Cargo.lock / Cargo.toml sync issue on main, not related to the shutdown broadcast feature. Please revert this file from the PR to keep the scope clean. The lockfile sync can be fixed in a separate housekeeping PR or commit.
```shell
git checkout origin/main -- Cargo.lock
```

```rust
// systemctl stop / docker stop / kill deliver SIGTERM; without handling
// it, the broker would exit without running the broadcast below.
let shard_manager = client.shard_manager.clone();
let broadcast_pool = pool.clone();
```
💬 Question — The else branch below (non-Discord / Slack-only mode, around line 194 on main) still only awaits ctrl_c(). In a Slack-only or headless deployment, docker stop / kill would still bypass the graceful path. Was this intentionally left out of scope for this PR? If so, might be worth a // TODO: comment noting that SIGTERM handling should be added here too.
```rust
info!(count = thread_ids.len(), "broadcasting shutdown notification");
for thread_key in thread_ids {
    // thread_key format: "platform:id" (e.g. "discord:1234567890").
    let raw_id = thread_key
```
🟡 Suggestion — Consider wrapping the broadcast loop in a timeout to guard against Discord rate limits or network issues delaying shutdown:

```rust
let _ = tokio::time::timeout(
    std::time::Duration::from_secs(10),
    async {
        for thread_key in thread_ids {
            // ... existing broadcast logic ...
        }
    },
).await;
```

Not blocking — just a safety net for production. If Discord is slow, you probably still want the process to exit within a bounded time.
```rust
    .strip_prefix("discord:")
    .unwrap_or(&thread_key);
if let Ok(id) = raw_id.parse::<u64>() {
    let channel = serenity::model::id::ChannelId::new(id);
```
ℹ️ Info — When Slack adapter threads exist, their keys (`slack:{id}`) will pass through `strip_prefix("discord:")` unchanged (returns `None` → `unwrap_or` gives the full key), then `parse::<u64>()` fails on `slack:12345` and they're silently skipped. This is correct behavior for Phase 1, but a `tracing::debug!` here for skipped non-Discord keys would help operators understand why some sessions didn't get notifications:

```rust
let raw_id = thread_key
    .strip_prefix("discord:")
    .unwrap_or(&thread_key);
if let Ok(id) = raw_id.parse::<u64>() {
    // ... send notification ...
} else {
    tracing::debug!(thread_key = %thread_key, "skipping non-Discord thread for shutdown broadcast");
}
```

```rust
)
.await
{
    tracing::warn!(thread_key = %thread_key, error = %e, "failed to post shutdown notification");
```
⚪ Nit — The message string is a bit long inline. Consider extracting it as a constant for readability:

```rust
const SHUTDOWN_MSG: &str =
    "🔄 Broker restarting. You can continue the conversation when the broker is back.";
```

Minor — skip if you prefer it inline.
OpenAB PR Screening

Screening report

## Intent

This PR tries to make broker shutdowns visible to users who are actively chatting in Discord threads. Today, a broker restart or stop can terminate an in-flight conversation without explanation, leaving a partial reply or a dead thread with no operator-visible or user-visible signal. The proposed change addresses that gap by sending a shutdown notice to all currently active Discord threads before the broker tears down the session pool.

## Feat

This is a feature with some operational-hardening characteristics. In plain terms, when the OpenAB broker receives …

## Who It Serves

The primary beneficiary is Discord end users in active threads, with secondary value for maintainers and operators who want shutdown behavior to be less opaque and easier to reason about during deploys, restarts, or crashes followed by restart.

## Rewritten Prompt

Implement Phase 1 of graceful shutdown notification for the Discord broker. When the process receives either …

## Merge Pitch

This is worth advancing because it closes a user-facing failure mode with low implementation surface area: silent thread death during broker restarts. The change is operationally useful, narrow in scope, and aligned with RFC #78 Phase 1. The risk profile is moderate-low. The likely reviewer concern is not whether the feature is useful, but whether shutdown-path messaging is too coupled to Discord delivery, too optimistic under rate limits, or under-tested for edge cases like invalid thread IDs and partial shutdown failures.

## Best-Practice Comparison

OpenClaw principles that are relevant here: …

Hermes Agent principles that are relevant here: …

Net assessment: …

## Implementation Options

### Option 1: Minimal signal-path broadcast

Keep the current approach: handle …

This is the most conservative path. It preserves existing boundaries reasonably well and keeps the PR tightly scoped to user notification.

### Option 2: Lifecycle notifier abstraction

Introduce a small shutdown notifier component or trait owned by the broker layer. The pool exposes active session identifiers, while the broker invokes a notifier responsible for formatting and delivering shutdown notices. The initial implementation still targets Discord threads, but the shutdown path becomes easier to test and less ad hoc.

This is the balanced option. It adds a small abstraction without dragging Discord concerns into …

### Option 3: Broader graceful-shutdown framework

Implement a more complete shutdown coordinator: optional drain window for in-flight streams, bounded concurrency for thread notifications, structured shutdown result logging, and groundwork for Phase 2 persistence or resumable session metadata.

This is the ambitious option. It better matches mature gateway/runtime patterns, but it expands scope and risks turning a clean Phase 1 merge into a broader design exercise.

### Comparison Table

## Recommendation

Recommend the balanced version of Option 1 leaning slightly toward Option 2: merge the shutdown notification behavior now, but keep the design disciplined enough that notification delivery is clearly broker-owned and easy to evolve. That is the right step for follow-up discussion because it solves the real user problem immediately without pretending to solve persistence or resumability. If reviewers are comfortable with the current narrow accessor plus …
Force-pushed 82c8a7e to fa106cd
Force-pushed.

Mapping to the screening concerns: …

Design alternatives considered (before picking pool-owned addressing): …

Tests: 43 passed, clippy clean. CI green.
…penabdev#78 §1d Phase 1)

On SIGINT or SIGTERM the broker now posts a short notification to every active thread across all configured adapters (Discord, Slack, …) before closing shards. Users get a clear signal that the broker is going away instead of replies cutting off silently mid-stream.

Discord Discussion URL: https://discord.com/channels/1488041051187974246/1495050997461024879

## Design

- `SessionPool` stores each active session's addressing info alongside its connection (`addresses: HashMap<thread_key, (ChannelRef, Adapter)>`), kept in lockstep with `state.active` so broadcast has a single source of truth. No parallel cache in the adapter layer.
- `SessionPool::begin_shutdown()` flips the shutdown flag and snapshots `addresses` under the state write lock, making admission and snapshot atomic with respect to each other.
- `AdapterRouter::broadcast_shutdown(message, timeout)` calls `begin_shutdown`, then posts the notification to every snapshot entry in parallel via `tokio::task::JoinSet`. A `tokio::select!` with a 10-second deadline caps total broadcast time so shutdown itself never blocks on a slow platform.
- `main.rs` shutdown handler listens for SIGTERM in addition to SIGINT (`#[cfg(unix)]`-gated with a ctrl_c-only fallback for non-Unix), and runs `broadcast_shutdown` before `shard_manager.shutdown_all()`. Both the Discord branch and the Slack-only branch use the same flow via a shared `wait_for_shutdown_signal()` helper.

## Shutdown admission invariant

Every session in the pool at broadcast time is either:

1. In the broadcast snapshot (receives the notification), or
2. Rejected at admission (receives an inline "Bot is shutting down" notice through `handle_message`'s pool-error path).

`get_or_create` checks `is_shutting_down()` at four atomic points:

1. Fast-fail on entry (cheap).
2. Inside the initial `state.read()` block (catches shutdowns that ran before we reached this task).
3. Before the `return Ok(())` on the existing-alive-session path, with a `state.read()` barrier to synchronize with `begin_shutdown`'s `state.write()` flag flip (catches shutdowns that started while we were waiting on the per-connection mutex).
4. Inside the final `state.write()` block before insert (atomic with `begin_shutdown`'s flag flip + snapshot).

## Behaviour notes

- Notification is best-effort: delivery errors and the 10-second deadline both fall through to normal pool teardown.
- Wording is intentionally neutral ("Context will reset on return") — we don't promise automatic session resume; RFC openabdev#78 Phase 2 persistence is a separate follow-up.
- Per-channel parallel send is safe under Discord/Slack rate limits (limits are per-channel, not per-adapter) and is ~10× faster than sequential for deployments with several active threads.

## Testing

- `cargo test` passes: 43 passed, 0 failed.
- `cargo clippy --all-targets -- -D warnings` clean.
- Verified on bare-metal and Docker brokers: SIGTERM triggers `broadcasting shutdown notification count=N` log and the message arrives in every active thread.

Relates to: openabdev#78, openabdev#75
Force-pushed fa106cd to 12aa620
Force-pushed.

Tests: 43 passed, clippy clean. CI green.
small maintainer note: the earlier CI failure was not a design objection to the shutdown feature itself. the real issue was branch drift against current `main`.
handoff note: the earlier CI failures were merge-drift noise, not a feature-design rejection. we rebased/fixed the branch against current `main`.
masami-agent
left a comment
PR Review: #182
Summary
- Problem: When the broker shuts down (SIGINT/SIGTERM), active conversations die silently mid-reply with no user-facing signal. RFC #78 §1d Phase 1.
- Approach: Pool-owned addressing + atomic shutdown flag + parallel broadcast via `JoinSet` with timeout, platform-neutral through the `ChatAdapter` abstraction.
- Risk level: Medium — touches core pool admission logic and shutdown path.
Core Assessment
- Problem clearly stated: ✅ — Excellent PR description with design rationale, behavior notes, and test plan.
- Approach appropriate: ✅ — Pool-owns-addressing is the right call. No parallel cache in the adapter layer, single source of truth.
- Alternatives considered: ✅ — PR description explicitly contrasts with the original Discord-specific approach and explains why the multi-platform refactor makes this the right home.
- Best approach for now: ✅ — Minimal, well-scoped. Deliberately avoids session persistence (Phase 2 follow-up).
Findings
Shutdown admission invariant (pool.rs) — The 4-point check in get_or_create is thorough and correct:
- Fast-fail on entry (cheap, avoids spawning ACP process).
- Inside the `state.read()` block (catches shutdowns before this task ran).
- After `conn.lock()` with a `state.read()` barrier (synchronizes with `begin_shutdown`'s write-lock flag flip — this is the subtle one, and the comment explains it well).
- Inside the final `state.write()` before insert (atomic with flag-flip + snapshot).
The begin_shutdown() method correctly holds the state write lock while flipping the AtomicBool and snapshotting addresses, so any admission that committed before the lock is in the snapshot, and any that comes after sees the flag. Sound.
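A minimal sketch of that critical section, using `std::sync` primitives for brevity (the PR uses tokio's async `RwLock`; all names here are assumptions, and the address value type is simplified to a `String`):

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::RwLock;

struct PoolState {
    // thread_key -> addressing info (ChannelRef + adapter in the real pool)
    addresses: HashMap<String, String>,
}

struct SessionPool {
    state: RwLock<PoolState>,
    shutting_down: AtomicBool,
}

impl SessionPool {
    /// Flip the flag and snapshot addresses in one critical section:
    /// any admission that committed before we took the write lock is in
    /// the snapshot; any admission after it sees the flag and is rejected.
    fn begin_shutdown(&self) -> Vec<(String, String)> {
        let state = self.state.write().unwrap();
        self.shutting_down.store(true, Ordering::Release);
        state
            .addresses
            .iter()
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect()
    }

    fn is_shutting_down(&self) -> bool {
        self.shutting_down.load(Ordering::Acquire)
    }
}
```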
addresses invariant — addresses.keys() == active.keys() is maintained across all mutation paths: insert, stale rebuild, eviction, cleanup_idle, and shutdown. I traced each one — all correct.
Broadcast timeout (adapter.rs) — The biased select with set.shutdown().await is correct. set.shutdown() cancels remaining tasks and waits for them to drop, which is the right behavior for HTTP calls that should be cancellable via tokio's cooperative cancellation.
Bot-loop prevention (adapter.rs) — Smart catch. During the shutdown window, the bot's own broadcast message could arrive as a new event on Slack, trigger handle_message, get rejected by the pool, and the rejection reply would loop. Deserializing sender_json to check is_bot breaks the cycle cleanly. The Deserialize derive on SenderContext is the minimal change needed.
Signal handling (main.rs) — wait_for_shutdown_signal() with #[cfg(unix)] SIGTERM + ctrl_c, and a ctrl_c-only fallback for non-Unix, is clean and correct. Both the Discord and Slack-only branches use the same flow.
discord.rs / slack.rs — Purely rustfmt line wrapping. No logic changes. ✅
Review Summary
🔴 Blockers
(none)
💬 Questions
- `get_or_create` signature change — The new `channel: &ChannelRef, adapter: &Arc<dyn ChatAdapter>` parameters are passed through from `handle_message`. Is there any other call site for `get_or_create` outside of `handle_message`? If so, those callers would need updating. (I only see the one call site in `adapter.rs`, so this should be fine — just confirming.)
🔧 Suggested Changes
- Commit history cleanup — The branch has merge commits from "Codex Temp" (`48034166`, `9e7deaa9`, `13563f9c`). Since the project uses squash merge, this will be cleaned up at merge time, but if you're planning any further pushes to this branch, consider rebasing to a single commit for cleaner `git log` during review.
ℹ️ Info
- The `_sync = self.state.read().await` pattern in check #3 is unusual (acquiring a read lock purely for synchronization), but the comment explains the intent clearly. The RwLock acquire provides the happens-before relationship needed to make the `AtomicBool` store from `begin_shutdown` visible. This is a valid and well-documented pattern here.
- `broadcast_shutdown` sends via `adapter.send_message()` (Discord/Slack HTTP API), not through ACP connections, so `pool.shutdown()` clearing `active` connections afterward does not interfere with in-flight broadcast sends. The sequencing in `main.rs` (broadcast → shutdown shards → `pool.shutdown()`) is correct.
- The `AtomicBool` with Acquire/Release ordering is technically redundant with the RwLock-protected checks in points #2 and #4 (which already have happens-before from the lock), but it's needed for the fast-fail path (#1) and the read-barrier path (#3). Belt-and-suspenders for shutdown correctness is the right trade-off.
⚪ Nits
(none — code is clean, comments are thorough)
Verdict
APPROVE — This is a well-designed, well-documented feature. The shutdown admission invariant is sound, the race condition coverage is thorough, and the platform-neutral broadcast approach is the right architecture. No blockers.
obrutjack
left a comment
Merge checklist verified:
- ✅ CI all green (cargo check + 7 Docker smoke tests)
- ✅ masami-agent reviewed — no blockers, thorough analysis of shutdown admission invariant
- ✅ Multi-platform broadcast via ChatAdapter (Discord + Slack)
- ✅ Parallel broadcast with 10s timeout (JoinSet + deadline)
- ✅ SIGTERM + SIGINT handling (unix + non-unix)
- ✅ Bot-loop prevention during shutdown window
- ✅ Pool admission 4-layer shutdown check — sound
- ✅ Neutral wording (no "restarting" assumption)
- ✅ No version regression
Well-designed feature. The pool-owned addressing approach is the right architecture. Pending @thepagent.
Summary
On SIGINT or SIGTERM the broker now posts a short notification to every active thread across all configured adapters (Discord, Slack, …) before closing shards. Users get a clear signal that the broker is going away instead of replies cutting off silently mid-stream.
Discord Discussion URL: https://discord.com/channels/1488041051187974246/1495050997461024879
Relates to: #78, #75
Problem
Today, when the broker is restarted (`systemctl restart`, `docker stop`, Ctrl+C), conversations just die silently mid-reply. Users have no signal that anything is happening until the broker comes back, and in-flight streams leave partial messages with no explanation. RFC #78 §1d calls this out as Phase 1.

The original #182 (rebased before v0.7.6) was Discord-specific and done at the `main.rs` level. The multi-platform refactor in #259 makes that approach stale: Slack threads also need the notification, and the `AdapterRouter`/`ChatAdapter` abstraction is the right home for platform-neutral broadcast. This revision rewrites the feature against that architecture.

Design
Pool owns addressing, not the router. `SessionPool` now stores each active session's `ChannelRef` + `Arc<dyn ChatAdapter>` alongside the connection (`addresses: HashMap<thread_key, ...>`), kept in lockstep with `state.active` so broadcast has a single source of truth. No parallel cache in the adapter layer, no `pool.active_thread_ids()` filter, no prune task.

Atomic admission and snapshot. `SessionPool::begin_shutdown()` takes the state write lock, flips the shutdown flag, and snapshots `addresses` — all in one critical section. Any admission committed before us is in the snapshot. `get_or_create` checks `is_shutting_down()` at four points so every session is either in the snapshot or rejected at admission:

1. Fast-fail on entry.
2. Inside the initial `state.read()` block.
3. Before the `return Ok(())` on the existing-alive-session path, with a `state.read()` barrier synchronizing against `begin_shutdown`'s `state.write()` flag flip (closes the race where shutdown starts while we wait on the per-connection mutex).
4. Inside the final `state.write()` block before insert.

Platform-neutral broadcast. `AdapterRouter::broadcast_shutdown(message, timeout)` calls `pool.begin_shutdown()` and posts the notification to every snapshot entry in parallel via `tokio::task::JoinSet`. A `tokio::select!` with a configurable deadline (10s here) caps total broadcast time so shutdown itself never blocks on a slow platform.

Unified signal handling. `main.rs` uses a shared `wait_for_shutdown_signal()` helper for SIGINT + SIGTERM (`#[cfg(unix)]`-gated with a ctrl_c-only fallback for non-Unix). Both the Discord-enabled and Slack-only branches invoke `broadcast_shutdown` before tearing down adapters.

Behaviour notes

- Notification is best-effort: delivery errors and the 10-second deadline both fall through to normal pool teardown.
- Wording is intentionally neutral ("Context will reset on return") — no promise of automatic session resume; RFC #78 Phase 2 persistence is a separate follow-up.
- Per-channel parallel send is safe under Discord/Slack rate limits (limits are per-channel, not per-adapter).

Tests & verification

- `cargo test` — 43 passed, 0 failed.
- `cargo clippy --all-targets -- -D warnings` — clean.
- Verified: SIGTERM triggers the `broadcasting shutdown notification count=N` log and the message arrives in every active thread.

Test plan

- `cargo test`, `cargo clippy`, Docker smoke tests.
- Send SIGTERM to a running broker with an active thread (`docker stop` / `kill -TERM`), verify the thread receives the "Bot is shutting down" message before the pool tears down.