fix: queue telegram bridge messages per chat to prevent session lock contention#860
Junior00619 wants to merge 1 commit into NVIDIA:main
Conversation
📝 Walkthrough: Startup config and env validation were moved into a new init(); per-chat Promise-based queues now serialize agent calls for each chat ID.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    autonumber
    participant User as Telegram User
    participant TG as Telegram Bot API
    participant Bridge as telegram-bridge.js
    participant Agent as OpenClaw Sandbox
    participant Store as Session Store
    User->>TG: send message
    TG->>Bridge: deliver update (chatId, message_id, text)
    Bridge->>Bridge: enqueue job in chatQueues[chatId]
    Bridge->>TG: send typing action
    Bridge->>Agent: runAgentInSandbox(text, sessionId_with_epoch)
    Agent->>Store: read/write session file
    Agent-->>Bridge: agent response / error
    Bridge->>TG: send reply (using original message_id)
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Force-pushed from 2a61512 to 1f1be1a
@cv friendly ping, this is ready for review whenever you have a moment 🙏
@Junior00619 Correct fix — Promise-chain queuing per chat prevents concurrent `runAgentInSandbox` calls and eliminates the session-lock root cause. The `prev.then(job, job)` pattern and the `finally` cleanup are both correct.
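For readers outside the diff, the pattern being endorsed can be sketched roughly like this (a minimal sketch; `chatQueues` and `enqueue` are illustrative names, not necessarily the exact ones in `scripts/telegram-bridge.js`):

```javascript
// Per-chat Promise-chain queue: each chat ID maps to the tail of its
// chain, and new jobs are appended so they run strictly one at a time.
const chatQueues = new Map();

function enqueue(chatId, job) {
  const prev = chatQueues.get(chatId) || Promise.resolve();
  // .then(job, job): run the next job whether the previous one resolved
  // or rejected, so one failing job never stalls the rest of the queue.
  const next = prev.then(job, job);
  chatQueues.set(chatId, next);
  return next;
}

// Demo: two "concurrent" messages on the same chat run in order.
const order = [];
enqueue("chat-1", async () => {
  await new Promise((r) => setTimeout(r, 20)); // slow first job
  order.push("first");
});
const done = enqueue("chat-1", async () => {
  order.push("second");
});
done.then(() => console.log(order.join(","))); // → first,second
```

Because each chat has its own chain, messages from different chats still run concurrently; only same-chat messages serialize.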
Three things to address:
1. **No tests.** This is the more critical fix (root cause vs. aftermath in #862) and should have at least a unit test for queue serialization — e.g., two concurrent messages on the same chatId execute sequentially, not in parallel.
2. **Unbounded queue growth.** If a user sends 50 messages while the agent is processing, all 50 queue up with no backpressure. Consider capping queue depth per chat (e.g., 5) and responding with "Still processing, please wait" for messages beyond the limit.
3. **Poll interval.** The current 100ms poll is aggressive. Consider raising it to 1000ms — Telegram's `getUpdates` long-poll already handles latency, and 100ms just burns CPU between polls.
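The queue-depth cap suggested in point 2 could look roughly like this (a hedged sketch; `chatQueueDepths`, `MAX_QUEUE_DEPTH`, and `tryEnqueue` are names assumed from the thread, not confirmed API):

```javascript
// Backpressure: track how many jobs are queued per chat and reject
// new ones past a fixed cap instead of queuing them unboundedly.
const MAX_QUEUE_DEPTH = 5;
const chatQueueDepths = new Map();

function tryEnqueue(chatId) {
  const depth = chatQueueDepths.get(chatId) || 0;
  if (depth >= MAX_QUEUE_DEPTH) {
    // Caller would send this text back instead of queuing the job.
    return { accepted: false, reply: "Still processing, please wait" };
  }
  chatQueueDepths.set(chatId, depth + 1);
  return { accepted: true };
}

// Five messages fill the queue; the sixth is rejected.
for (let i = 0; i < 5; i++) tryEnqueue("chat-1");
console.log(tryEnqueue("chat-1").accepted); // → false
```

The depth counter would be decremented in the job's `finally` so slots free up as jobs complete.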
✨ Thanks for submitting this PR with a detailed summary. It addresses a bug with the Telegram bridge messages and proposes a fix to improve the performance of NemoClaw, which could enhance the user experience.
Force-pushed from 1f1be1a to 6de4b04
Actionable comments posted: 1
🧹 Nitpick comments (1)
`test/telegram-bridge-queue.test.js` (1)
**13-113:** Add a regression test for `/reset` during an active queued job. Current coverage validates queue mechanics, but it does not protect the reset-path race (queue eviction while a prior job is still running). A dedicated test for that sequence would prevent regressions of the lock-contention fix.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `test/telegram-bridge-queue.test.js` around lines 13-113: add a new test that enqueues a blocked job for a chatId (use chatQueues to chain a Promise that awaits a resolver), then simulate a /reset while that job is still running by deleting the chat's entries from chatQueues and chatQueueDepths (`chatQueues.delete(chatId); chatQueueDepths.delete(chatId)`); then resolve the blocked job and await its chain to ensure it completes without throwing and that the maps no longer contain the chatId. Reference chatQueues, chatQueueDepths and MAX_QUEUE_DEPTH in the test to locate where to hook into the existing tests.
📒 Files selected for processing (2)
- scripts/telegram-bridge.js
- test/telegram-bridge-queue.test.js
…contention

Replace per-chat cooldown + busyChats rejection with Promise-chain queuing that serializes agent calls per chat ID, preventing concurrent session-file lock collisions.

Address reviewer feedback:
- Add queue depth cap (MAX_QUEUE_DEPTH=5) with backpressure response
- Keep upstream 1000ms poll interval (not 100ms)
- Add unit tests for queue serialization and backpressure
- Wrap startup validation in init() so the module is import-safe for tests

Fixes NVIDIA#831
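The "import-safe for tests" change from the commit message could be shaped roughly like this (a sketch under assumptions — the env var name and `init()` signature are illustrative, not taken from the actual diff):

```javascript
// Import-safe startup validation: the module defines init() but does
// not run it at load time, so tests can require() it without real
// credentials and inject a controlled environment instead.
function init(env = process.env) {
  if (!env.TELEGRAM_BOT_TOKEN) {
    throw new Error("TELEGRAM_BOT_TOKEN is required");
  }
  return { token: env.TELEGRAM_BOT_TOKEN };
}

// A test supplies its own environment object:
console.log(init({ TELEGRAM_BOT_TOKEN: "dummy" }).token); // → dummy
```

The real bridge would presumably call `init()` from its entry point (e.g. behind a `require.main === module` check) so direct execution still validates config.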
Force-pushed from 6de4b04 to f350b1b
🧹 Nitpick comments (1)
scripts/telegram-bridge.js (1)
**224-243:** Consider moving `clearInterval` to a single location in `finally`. The `typingInterval` is cleared in both the `try` block (line 233) and the `catch` block (line 237). While calling `clearInterval` twice is harmless, consolidating cleanup in `finally` would be cleaner and ensure the interval is always cleared regardless of control flow.

♻️ Suggested refactor

```diff
 const job = async () => {
   // If the session was reset since this job was enqueued, skip it
   // so the old and new session identities never overlap.
   if ((chatEpochs.get(chatId) || 0) !== epoch) return;
   await sendTyping(chatId);
   const typingInterval = setInterval(() => sendTyping(chatId), 4000);
   try {
     const sessionId = epoch > 0 ? `${chatId}-e${epoch}` : chatId;
     const response = await runAgentInSandbox(text, sessionId);
-    clearInterval(typingInterval);
     console.log(`[${chatId}] agent: ${response.slice(0, 100)}...`);
     await sendMessage(chatId, response, messageId);
   } catch (err) {
-    clearInterval(typingInterval);
     await sendMessage(chatId, `Error: ${err.message}`, messageId);
   } finally {
+    clearInterval(typingInterval);
     chatQueueDepths.set(chatId, (chatQueueDepths.get(chatId) || 1) - 1);
     if (chatQueueDepths.get(chatId) <= 0) chatQueueDepths.delete(chatId);
   }
 };
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@scripts/telegram-bridge.js` around lines 224 - 243, The typingInterval is cleared in both the try and catch blocks within the job async function; move the clearInterval(typingInterval) call into the finally block so the interval is always cleaned exactly once. Modify the job function: remove clearInterval from the try and catch, and add clearInterval(typingInterval) at the start of the existing finally block that updates chatQueueDepths; keep references to typingInterval, job, runAgentInSandbox, and sendMessage to locate the change.
📒 Files selected for processing (2)
- scripts/telegram-bridge.js
- test/telegram-bridge-queue.test.js
🚧 Files skipped from review as they are similar to previous changes (1)
- test/telegram-bridge-queue.test.js
Fixes #831
Problem
Concurrent inbound messages for the same Telegram chat each trigger an independent `runAgentInSandbox` call keyed on the same session ID (`tg-`). Because the underlying session store uses file-level locking, overlapping writes from parallel SSH+agent processes race on the lock and surface as `session file locked (timeout 10000ms)` errors.
Root Cause
The poll loop dispatches agent calls inline with `await`, but the `for...of` iteration over updates only serializes messages within a single polling batch. Under sustained load, the next `getUpdates` batch can begin processing before prior agent calls resolve, producing concurrent sandbox processes for the same chat.
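The race is easy to reproduce in isolation (a minimal illustration — `handleBatch` and the stubbed `runAgentInSandbox` are hypothetical stand-ins for the bridge's poll loop):

```javascript
// Within a batch, messages are awaited serially; but if a second poll
// batch is dispatched before the first finishes, two agent calls for
// the same chat overlap.
let inFlight = 0;
let maxInFlight = 0;

async function runAgentInSandbox() {
  inFlight++;
  maxInFlight = Math.max(maxInFlight, inFlight);
  await new Promise((r) => setTimeout(r, 30)); // slow agent call
  inFlight--;
}

async function handleBatch(updates) {
  for (const u of updates) await runAgentInSandbox(u); // serial per batch only
}

// Two batches for the same chat started without awaiting the first.
const racing = Promise.all([handleBatch(["msg1"]), handleBatch(["msg2"])]);
racing.then(() => console.log(`max concurrent agent calls: ${maxInFlight}`)); // → 2
```

With the per-chat queue, the second batch's job would instead append to the first chain and `maxInFlight` would stay at 1.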
Fix
A per-chat promise chain (`chatQueues: Map<string, Promise>`) gates `runAgentInSandbox` so at most one invocation is in flight per chat at any time. Subsequent messages for the same chat are appended to the chain via `.then(job, job)` — the rejection handler ensures the queue drains even if an individual job throws. Cross-chat concurrency is unaffected.
Cleanup is handled in `.finally()`: the map entry is removed only when the stored reference matches the completing promise, preventing a late-settling chain from stomping a freshly enqueued one. The `/reset` command now also evicts the queue entry so a user reset doesn't block behind a stale in-flight call.
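The reference-matching cleanup described above can be sketched like this (illustrative names, not the exact bridge code):

```javascript
// Delete the map entry only if it still points at the chain that just
// settled; if another message was enqueued meanwhile, the entry now
// references a newer chain and must be left alone.
const chatQueues = new Map();

function enqueue(chatId, job) {
  const prev = chatQueues.get(chatId) || Promise.resolve();
  const next = prev.then(job, job).finally(() => {
    if (chatQueues.get(chatId) === next) chatQueues.delete(chatId);
  });
  chatQueues.set(chatId, next);
  return next;
}

// The first job's cleanup does not evict the second job's chain; once
// the last chain settles, the entry is gone.
enqueue("chat-1", async () => "a");
const second = enqueue("chat-1", async () => "b");
second.then(() => console.log(chatQueues.has("chat-1"))); // → false
```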