diff --git a/scripts/telegram-bridge.js b/scripts/telegram-bridge.js index 27d5d7ba4..30e7aea1d 100755 --- a/scripts/telegram-bridge.js +++ b/scripts/telegram-bridge.js @@ -21,29 +21,33 @@ const { execFileSync, spawn } = require("child_process"); const { resolveOpenshell } = require("../bin/lib/resolve-openshell"); const { shellQuote, validateName } = require("../bin/lib/runner"); -const OPENSHELL = resolveOpenshell(); -if (!OPENSHELL) { - console.error("openshell not found on PATH or in common locations"); - process.exit(1); -} +let OPENSHELL, TOKEN, API_KEY, SANDBOX, ALLOWED_CHATS; -const TOKEN = process.env.TELEGRAM_BOT_TOKEN; -const API_KEY = process.env.NVIDIA_API_KEY; -const SANDBOX = process.env.SANDBOX_NAME || "nemoclaw"; -try { validateName(SANDBOX, "SANDBOX_NAME"); } catch (e) { console.error(e.message); process.exit(1); } -const ALLOWED_CHATS = process.env.ALLOWED_CHAT_IDS - ? process.env.ALLOWED_CHAT_IDS.split(",").map((s) => s.trim()) - : null; +function init() { + OPENSHELL = resolveOpenshell(); + if (!OPENSHELL) { + console.error("openshell not found on PATH or in common locations"); + process.exit(1); + } -if (!TOKEN) { console.error("TELEGRAM_BOT_TOKEN required"); process.exit(1); } -if (!API_KEY) { console.error("NVIDIA_API_KEY required"); process.exit(1); } + TOKEN = process.env.TELEGRAM_BOT_TOKEN; + API_KEY = process.env.NVIDIA_API_KEY; + SANDBOX = process.env.SANDBOX_NAME || "nemoclaw"; + try { validateName(SANDBOX, "SANDBOX_NAME"); } catch (e) { console.error(e.message); process.exit(1); } + ALLOWED_CHATS = process.env.ALLOWED_CHAT_IDS + ? process.env.ALLOWED_CHAT_IDS.split(",").map((s) => s.trim()) + : null; + + if (!TOKEN) { console.error("TELEGRAM_BOT_TOKEN required"); process.exit(1); } + if (!API_KEY) { console.error("NVIDIA_API_KEY required"); process.exit(1); } +} let offset = 0; const activeSessions = new Map(); // chatId → message history - -const COOLDOWN_MS = 5000; -const lastMessageTime = new Map(); -const busyChats = new Set(); +const chatQueues = new Map(); // chatId → Promise chain (serializes agent calls per chat) +const chatQueueDepths = new Map(); // chatId → number of pending jobs +const chatEpochs = new Map(); // chatId → generation counter (bumped on /reset) +const MAX_QUEUE_DEPTH = 5; // ── Telegram API helpers ────────────────────────────────────────── @@ -198,45 +202,52 @@ async function poll() { // Handle /reset if (msg.text === "/reset") { activeSessions.delete(chatId); + chatQueues.delete(chatId); + chatQueueDepths.delete(chatId); + chatEpochs.set(chatId, (chatEpochs.get(chatId) || 0) + 1); await sendMessage(chatId, "Session reset.", msg.message_id); continue; } - // Rate limiting: per-chat cooldown - const now = Date.now(); - const lastTime = lastMessageTime.get(chatId) || 0; - if (now - lastTime < COOLDOWN_MS) { - const wait = Math.ceil((COOLDOWN_MS - (now - lastTime)) / 1000); - await sendMessage(chatId, `Please wait ${wait}s before sending another message.`, msg.message_id); + // Queue message so only one agent call runs per chat at a time, + // preventing session-file lock collisions on the same session ID. + const depth = chatQueueDepths.get(chatId) || 0; + if (depth >= MAX_QUEUE_DEPTH) { + await sendMessage(chatId, "Still processing, please wait.", msg.message_id); continue; } - // Per-chat serialization: reject if this chat already has an active session - if (busyChats.has(chatId)) { - await sendMessage(chatId, "Still processing your previous message.", msg.message_id); - continue; - } - - lastMessageTime.set(chatId, now); - busyChats.add(chatId); - - // Send typing indicator - await sendTyping(chatId); - - // Keep a typing indicator going while agent runs - const typingInterval = setInterval(() => sendTyping(chatId), 4000); - - try { - const response = await runAgentInSandbox(msg.text, chatId); - clearInterval(typingInterval); - console.log(`[${chatId}] agent: ${response.slice(0, 100)}...`); - await sendMessage(chatId, response, msg.message_id); - } catch (err) { - clearInterval(typingInterval); - await sendMessage(chatId, `Error: ${err.message}`, msg.message_id); - } finally { - busyChats.delete(chatId); - } + const messageId = msg.message_id; + const text = msg.text; + const epoch = chatEpochs.get(chatId) || 0; + chatQueueDepths.set(chatId, depth + 1); + const job = async () => { + // If the session was reset since this job was enqueued, skip it + // so the old and new session identities never overlap. + if ((chatEpochs.get(chatId) || 0) !== epoch) return; + await sendTyping(chatId); + const typingInterval = setInterval(() => sendTyping(chatId), 4000); + try { + const sessionId = epoch > 0 ? `${chatId}-e${epoch}` : chatId; + const response = await runAgentInSandbox(text, sessionId); + clearInterval(typingInterval); + console.log(`[${chatId}] agent: ${response.slice(0, 100)}...`); + await sendMessage(chatId, response, messageId); + } catch (err) { + clearInterval(typingInterval); + await sendMessage(chatId, `Error: ${err.message}`, messageId); + } finally { + chatQueueDepths.set(chatId, (chatQueueDepths.get(chatId) || 1) - 1); + if (chatQueueDepths.get(chatId) <= 0) chatQueueDepths.delete(chatId); + } + }; + + const prev = chatQueues.get(chatId) || Promise.resolve(); + const next = prev.then(job, job); + chatQueues.set(chatId, next); + void next.finally(() => { + if (chatQueues.get(chatId) === next) chatQueues.delete(chatId); + }); } } } catch (err) { @@ -273,4 +284,10 @@ async function main() { poll(); } -main(); +// ── Exports (for testing) ────────────────────────────────────────── +module.exports = { chatQueues, chatQueueDepths, chatEpochs, MAX_QUEUE_DEPTH }; + +if (require.main === module) { + init(); + main(); +} diff --git a/test/telegram-bridge-queue.test.js b/test/telegram-bridge-queue.test.js new file mode 100644 index 000000000..e067fdb37 --- /dev/null +++ b/test/telegram-bridge-queue.test.js @@ -0,0 +1,173 @@ +// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// SPDX-License-Identifier: Apache-2.0 +// +// Tests for #860: per-chat message queuing prevents concurrent agent calls +// and caps queue depth to provide backpressure. + +import { describe, it, expect } from "vitest"; +import { createRequire } from "node:module"; + +const require = createRequire(import.meta.url); +const { chatQueues, chatQueueDepths, chatEpochs, MAX_QUEUE_DEPTH } = require("../scripts/telegram-bridge"); + +describe("telegram bridge queue serialization", () => { + it("exports MAX_QUEUE_DEPTH as 5", () => { + expect(MAX_QUEUE_DEPTH).toBe(5); + }); + + it("two concurrent jobs on the same chatId execute sequentially", async () => { + const order = []; + let resolveFirst; + const firstBlocks = new Promise((r) => { resolveFirst = r; }); + + const job1 = async () => { + order.push("job1-start"); + await firstBlocks; + order.push("job1-end"); + }; + const job2 = async () => { + order.push("job2-start"); + order.push("job2-end"); + }; + + const chatId = "test-serial"; + const prev = chatQueues.get(chatId) || Promise.resolve(); + const chain1 = prev.then(job1, job1); + chatQueues.set(chatId, chain1); + + const chain2 = chain1.then(job2, job2); + chatQueues.set(chatId, chain2); + + // job1 should have started but job2 should be waiting + await new Promise((r) => setTimeout(r, 10)); + expect(order).toEqual(["job1-start"]); + + // Unblock job1 — job2 should run after + resolveFirst(); + await chain2; + + expect(order).toEqual(["job1-start", "job1-end", "job2-start", "job2-end"]); + + // Cleanup + chatQueues.delete(chatId); + }); + + it("different chatIds run independently (in parallel)", async () => { + const order = []; + let resolveA; + const blockA = new Promise((r) => { resolveA = r; }); + + const jobA = async () => { + order.push("A-start"); + await blockA; + order.push("A-end"); + }; + const jobB = async () => { + order.push("B-start"); + order.push("B-end"); + }; + + const prevA = chatQueues.get("chatA") || Promise.resolve(); + const chainA = prevA.then(jobA, jobA); + chatQueues.set("chatA", chainA); + + const prevB = chatQueues.get("chatB") || Promise.resolve(); + const chainB = prevB.then(jobB, jobB); + chatQueues.set("chatB", chainB); + + // B should complete even though A is blocked + await chainB; + expect(order).toContain("B-start"); + expect(order).toContain("B-end"); + expect(order).not.toContain("A-end"); + + resolveA(); + await chainA; + expect(order).toEqual(["A-start", "B-start", "B-end", "A-end"]); + + chatQueues.delete("chatA"); + chatQueues.delete("chatB"); + }); + + it("chatQueueDepths tracks pending jobs and decrements on completion", async () => { + const chatId = "test-depth"; + chatQueueDepths.set(chatId, 3); + expect(chatQueueDepths.get(chatId)).toBe(3); + + chatQueueDepths.set(chatId, chatQueueDepths.get(chatId) - 1); + expect(chatQueueDepths.get(chatId)).toBe(2); + + chatQueueDepths.delete(chatId); + }); + + it("MAX_QUEUE_DEPTH caps at 5 pending jobs", () => { + const chatId = "test-cap"; + // Simulate 5 queued jobs + chatQueueDepths.set(chatId, 5); + + const depth = chatQueueDepths.get(chatId) || 0; + expect(depth >= MAX_QUEUE_DEPTH).toBe(true); + + chatQueueDepths.delete(chatId); + }); + + it("/reset during in-flight job does not cause overlapping runs", async () => { + const chatId = "test-reset-race"; + let resolveOld; + const blockOld = new Promise((r) => { resolveOld = r; }); + const executed = []; + + // Epoch starts at 0 + chatEpochs.delete(chatId); + const epochBefore = chatEpochs.get(chatId) || 0; + + // Enqueue a blocking "old" job that captures epoch 0 + const oldJob = async () => { + executed.push("old-start"); + await blockOld; + executed.push("old-end"); + }; + const prev = chatQueues.get(chatId) || Promise.resolve(); + const chain1 = prev.then(oldJob, oldJob); + chatQueues.set(chatId, chain1); + chatQueueDepths.set(chatId, 1); + + // old job starts + await new Promise((r) => setTimeout(r, 10)); + expect(executed).toEqual(["old-start"]); + + // Simulate /reset: bump epoch, clear queue state + chatQueues.delete(chatId); + chatQueueDepths.delete(chatId); + chatEpochs.set(chatId, epochBefore + 1); + const epochAfter = chatEpochs.get(chatId) || 0; + expect(epochAfter).toBe(1); + + // Enqueue a "new" job that captures epoch 1 — stale-check should skip + // it if it was queued under the old epoch, but here it's under the new one. + const newJob = async () => { + // Check: new job's epoch matches current, so it should run + const currentEpoch = chatEpochs.get(chatId) || 0; + expect(currentEpoch).toBe(epochAfter); + executed.push("new-run"); + }; + const prevNew = chatQueues.get(chatId) || Promise.resolve(); + const chain2 = prevNew.then(newJob, newJob); + chatQueues.set(chatId, chain2); + + // new job runs immediately (fresh chain, not blocked by old) + await chain2; + expect(executed).toContain("new-run"); + expect(executed).not.toContain("old-end"); + + // Resolve old job — it should complete without throwing + resolveOld(); + await chain1; + expect(executed).toContain("old-end"); + + // Cleanup + chatQueues.delete(chatId); + chatQueueDepths.delete(chatId); + chatEpochs.delete(chatId); + }); +});