Skip to content

feat(conductor): queue messages when conductor is busy, deliver with reply callback#452

Open
nlenepveu wants to merge 8 commits intoasheshgoplani:mainfrom
blackfuel-ai:fix/bridge-conductor-busy-queue
Open

feat(conductor): queue messages when conductor is busy, deliver with reply callback#452
nlenepveu wants to merge 8 commits intoasheshgoplani:mainfrom
blackfuel-ai:fix/bridge-conductor-busy-queue

Conversation

@nlenepveu
Copy link
Copy Markdown
Contributor

Summary

  • Queue instead of drop: when a conductor session is running/active/starting, messages are held in an in-memory queue (_message_queue) and delivered once the conductor becomes free
  • Reply callback: a per-message async closure (ReplyCallback) is stored with each queued item and invoked via fire-and-forget task after drain delivery — the deferred response is pushed back to the user in the same Telegram chat / Slack thread
  • Event loop safety: all blocking subprocess.run calls in async handlers are wrapped in run_in_executor; ensure_conductor_running is now async with await asyncio.sleep instead of time.sleep
  • Queue robustness: MAX_QUEUE_DEPTH=20 per session, deque/popleft() (O(1)), drain task supervisor with crash recovery, get_running_loop() throughout, force_queue param to skip redundant status checks, get_session_status returns "unknown" on CLI failure (not "error") to prevent premature queue drops
  • UX: both Telegram and Slack deferred replies show "Queued response (waited Xs):" prefix with elapsed wait time; orphaned subprocesses killed on TimeoutExpired; differentiated error messages for error-state drop vs send failure

Commits

  1. 09217e6 — core queue infrastructure: _message_queue, _enqueue_message, _drain_queue, _ensure_drain_task; was_busy detection in Telegram and Slack handlers
  2. 8a0172b — post-rebase bug fixes: Slack tuple check, wait_for_response NameError, reply_callback threading, run_in_executor for drain, create_task over deprecated ensure_future, get_default_conductor promotion, typing indicators before blocking sends
  3. 69d3ae1 — robustness pass: all MUST FIX + QUICK WIN items from 4-reviewer audit (async handlers, drain supervisor, deque, MAX_QUEUE_DEPTH, force_queue, get_running_loop, orphan subprocess kill, "Queued response" prefix on Slack, elapsed time in deferred replies)

Test plan

  • Send a message via Telegram while conductor is busy → expect "⏳ Conductor busy — message queued, will reply here when done."; when conductor finishes, expect follow-up "Queued response (waited Xs):\n<response>"
  • Same flow via Slack — same prefix and threading behaviour
  • Send multiple messages while busy → all delivered in order; each gets its own callback
  • Kill conductor mid-queue → expect "[Queued message could not be delivered — conductor is in error state.]" per queued item
  • Restart bridge while queue has items → messages lost (in-memory, expected); no crash or hang
  • python3 -c "import ast; ast.parse(open('conductor/bridge.py').read()); print('OK')"

When the conductor Claude session is busy (running/active), the bridge
was blocking on 'agent-deck session send' for 80s then dropping the
message with a failure notification.

Changes:
- send_to_conductor() now checks session status before sending. If busy,
  messages are queued in-memory and delivered by a background drain task
  that polls every 5s until the conductor is idle/waiting again.
- Race condition handled: if the conductor becomes busy between the
  status check and the actual send, the timeout/not-ready error triggers
  queueing instead of dropping.
- Telegram/Slack handlers show '⏳ Conductor busy — message queued' to
  the user instead of a failure message.
- Heartbeats skip the cycle when conductor is busy (they're periodic,
  no point queueing them).

Also syncs bridge.py with the installed version (multi-conductor
discovery, Slack support, heartbeat rules injection).
- Fix crash: config["user_id"] → config["telegram"]["user_id"]
- Fix Slack: unpack tuple from send_to_conductor (was always truthy bool check)
- Fix Slack: remove wait_for_response() call (function no longer exists)
- Add reply_callback to queue infrastructure so users get deferred responses
- Thread _tg_reply / _slack_reply closures through was_busy paths
- Fix _drain_queue: run_in_executor for blocking subprocess calls (unfreeze loop)
- Fix _drain_queue: notify all queued callbacks on error drop, not just first
- Fix _drain_queue: guard against empty stdout to avoid aiogram crash
- Fix _drain_queue: use --wait to capture conductor response for callback
- Fix ensure_future → create_task (deprecated API)
- Remove dead _queue_lock (never acquired)
- Promote get_default_conductor to module level (was inaccessible across scopes)
- Move typing indicators before blocking send_to_conductor calls (Telegram + Slack)
- Fix _tg_reply: always include "Queued response:" label regardless of profile_tag
- Remove unnecessary say_ref/thread_ts_ref aliases in _slack_reply closure
- Add functools import for run_in_executor partial calls
Event loop / blocking (M1, M2, Q9):
- make ensure_conductor_running async; time.sleep(5) → await asyncio.sleep(5)
- wrap get_session_status + send_to_conductor(wait_for_reply=True) in
  run_in_executor in Telegram and Slack handlers (up to 330s block eliminated)
- get_event_loop() → get_running_loop() throughout drain queue

Subprocess (M8, Q6):
- run_cli: kill + communicate orphaned subprocess on TimeoutExpired
- get_session_status: return "unknown" on CLI failure (not "error") to prevent
  premature queue drop on transient subprocess issues; drain skips "unknown"

Queue type + bounds (M3, Q1, Q4):
- add ReplyCallback = Callable[[str], Coroutine[Any, Any, None]] type alias
- switch _message_queue values from list to collections.deque (O(1) popleft)
- enforce MAX_QUEUE_DEPTH=20 per session; drop oldest on overflow

Queue infrastructure (M4, M5, M6, M7, M10, M11, Q2, Q5):
- add force_queue param to send_to_conductor: skip redundant inner status check
  and enqueue immediately (eliminates was_busy TOCTOU + redundant subprocess call)
- add _drain_queue_supervised wrapper: restarts drain on unexpected crash
- _ensure_drain_task: retrieve + log exception before restarting; use
  get_running_loop().create_task instead of bare create_task
- add _fire_callback helper: invoke callbacks via create_task (fire-and-forget)
  so drain loop is not blocked by slow Telegram/Slack API calls
- move drain exit check to after the session loop (Q2: avoids missing items)
- log delivered count after pop with actual remaining (M10: was pre-pop estimate)
- differentiate error messages: error-state drop vs send-failure drop (M11)

UX (M9, Q7):
- Slack _slack_reply: add "Queued response (waited Xs):" prefix matching Telegram
- both _tg_reply and _slack_reply: show elapsed wait time via time.monotonic() closure
ReplyCallback was defined after send_to_conductor used it in a function
signature, causing NameError at import time. Move it to just before the
first use.
…files()

The load_config() refactor removed 'profiles' from the config dict in
favour of discover_conductors()/get_unique_profiles(), but create_telegram_bot
still referenced the old key, causing KeyError on startup.
Two fixes:

1. run_cli: replace subprocess.run() with Popen + communicate(timeout=)
   so the proc object is available on TimeoutExpired — subprocess.run()
   does not set exc.proc. Also adds start_new_session=True + os.killpg()
   to kill the entire process group (including tmux send-keys grandchildren)
   on timeout.

2. heartbeat_loop: wrap get_session_status and send_to_conductor(wait_for_reply=True)
   in run_in_executor — both are blocking subprocess calls that were
   freezing the event loop for up to RESPONSE_TIMEOUT (300s), preventing
   Telegram/Slack message processing during heartbeat sends.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant