Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 68 additions & 51 deletions scripts/telegram-bridge.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,29 +21,33 @@ const { execFileSync, spawn } = require("child_process");
const { resolveOpenshell } = require("../bin/lib/resolve-openshell");
const { shellQuote, validateName } = require("../bin/lib/runner");

const OPENSHELL = resolveOpenshell();
if (!OPENSHELL) {
console.error("openshell not found on PATH or in common locations");
process.exit(1);
}
let OPENSHELL, TOKEN, API_KEY, SANDBOX, ALLOWED_CHATS;

const TOKEN = process.env.TELEGRAM_BOT_TOKEN;
const API_KEY = process.env.NVIDIA_API_KEY;
const SANDBOX = process.env.SANDBOX_NAME || "nemoclaw";
try { validateName(SANDBOX, "SANDBOX_NAME"); } catch (e) { console.error(e.message); process.exit(1); }
const ALLOWED_CHATS = process.env.ALLOWED_CHAT_IDS
? process.env.ALLOWED_CHAT_IDS.split(",").map((s) => s.trim())
: null;
/**
 * Load and validate runtime configuration from the environment into the
 * module-level bindings (OPENSHELL, TOKEN, API_KEY, SANDBOX, ALLOWED_CHATS).
 *
 * Kept out of module top-level so that require()-ing this file (e.g. from
 * tests) has no side effects; only the direct-run entry point calls it.
 * Exits the process with status 1 on any missing or invalid requirement.
 */
function init() {
  // Report a fatal configuration problem and stop the process.
  const fail = (message) => {
    console.error(message);
    process.exit(1);
  };

  OPENSHELL = resolveOpenshell();
  if (!OPENSHELL) fail("openshell not found on PATH or in common locations");

  TOKEN = process.env.TELEGRAM_BOT_TOKEN;
  API_KEY = process.env.NVIDIA_API_KEY;
  SANDBOX = process.env.SANDBOX_NAME || "nemoclaw";

  // SANDBOX is later interpolated into commands, so it must pass the
  // project's name validation before anything else runs.
  try {
    validateName(SANDBOX, "SANDBOX_NAME");
  } catch (e) {
    fail(e.message);
  }

  // Optional allow-list: comma-separated chat ids; null means "allow all".
  const rawIds = process.env.ALLOWED_CHAT_IDS;
  ALLOWED_CHATS = rawIds ? rawIds.split(",").map((id) => id.trim()) : null;

  if (!TOKEN) fail("TELEGRAM_BOT_TOKEN required");
  if (!API_KEY) fail("NVIDIA_API_KEY required");
}

// Long-poll cursor — presumably advanced past the last seen update_id in
// poll(); verify against the getUpdates call (not visible in this chunk).
let offset = 0;
const activeSessions = new Map(); // chatId → message history

// NOTE(review): COOLDOWN_MS / lastMessageTime / busyChats appear superseded
// by the queue mechanism below (chatQueues/chatQueueDepths/chatEpochs) —
// confirm they are still read anywhere before keeping them.
const COOLDOWN_MS = 5000;
const lastMessageTime = new Map();
const busyChats = new Set();
const chatQueues = new Map(); // chatId → Promise chain (serializes agent calls per chat)
const chatQueueDepths = new Map(); // chatId → number of pending jobs
const chatEpochs = new Map(); // chatId → generation counter (bumped on /reset)
const MAX_QUEUE_DEPTH = 5; // backpressure cap: max pending jobs per chat

// ── Telegram API helpers ──────────────────────────────────────────

Expand Down Expand Up @@ -198,45 +202,52 @@ async function poll() {
// Handle /reset
if (msg.text === "/reset") {
activeSessions.delete(chatId);
chatQueues.delete(chatId);
chatQueueDepths.delete(chatId);
chatEpochs.set(chatId, (chatEpochs.get(chatId) || 0) + 1);
await sendMessage(chatId, "Session reset.", msg.message_id);
continue;
}

// Rate limiting: per-chat cooldown
const now = Date.now();
const lastTime = lastMessageTime.get(chatId) || 0;
if (now - lastTime < COOLDOWN_MS) {
const wait = Math.ceil((COOLDOWN_MS - (now - lastTime)) / 1000);
await sendMessage(chatId, `Please wait ${wait}s before sending another message.`, msg.message_id);
// Queue message so only one agent call runs per chat at a time,
// preventing session-file lock collisions on the same session ID.
const depth = chatQueueDepths.get(chatId) || 0;
if (depth >= MAX_QUEUE_DEPTH) {
await sendMessage(chatId, "Still processing, please wait.", msg.message_id);
continue;
}

// Per-chat serialization: reject if this chat already has an active session
if (busyChats.has(chatId)) {
await sendMessage(chatId, "Still processing your previous message.", msg.message_id);
continue;
}

lastMessageTime.set(chatId, now);
busyChats.add(chatId);

// Send typing indicator
await sendTyping(chatId);

// Keep a typing indicator going while agent runs
const typingInterval = setInterval(() => sendTyping(chatId), 4000);

try {
const response = await runAgentInSandbox(msg.text, chatId);
clearInterval(typingInterval);
console.log(`[${chatId}] agent: ${response.slice(0, 100)}...`);
await sendMessage(chatId, response, msg.message_id);
} catch (err) {
clearInterval(typingInterval);
await sendMessage(chatId, `Error: ${err.message}`, msg.message_id);
} finally {
busyChats.delete(chatId);
}
const messageId = msg.message_id;
const text = msg.text;
const epoch = chatEpochs.get(chatId) || 0;
chatQueueDepths.set(chatId, depth + 1);
const job = async () => {
// If the session was reset since this job was enqueued, skip it
// so the old and new session identities never overlap.
if ((chatEpochs.get(chatId) || 0) !== epoch) return;
await sendTyping(chatId);
const typingInterval = setInterval(() => sendTyping(chatId), 4000);
try {
const sessionId = epoch > 0 ? `${chatId}-e${epoch}` : chatId;
const response = await runAgentInSandbox(text, sessionId);
clearInterval(typingInterval);
console.log(`[${chatId}] agent: ${response.slice(0, 100)}...`);
await sendMessage(chatId, response, messageId);
} catch (err) {
clearInterval(typingInterval);
await sendMessage(chatId, `Error: ${err.message}`, messageId);
} finally {
chatQueueDepths.set(chatId, (chatQueueDepths.get(chatId) || 1) - 1);
if (chatQueueDepths.get(chatId) <= 0) chatQueueDepths.delete(chatId);
}
};

const prev = chatQueues.get(chatId) || Promise.resolve();
const next = prev.then(job, job);
chatQueues.set(chatId, next);
void next.finally(() => {
if (chatQueues.get(chatId) === next) chatQueues.delete(chatId);
});
}
}
} catch (err) {
Expand Down Expand Up @@ -273,4 +284,10 @@ async function main() {
poll();
}

main();
// ── Exports (for testing) ──────────────────────────────────────────
// Expose the per-chat queue state so tests can exercise serialization,
// depth accounting, and epoch bumps without talking to Telegram.
module.exports = { chatQueues, chatQueueDepths, chatEpochs, MAX_QUEUE_DEPTH };

// Start the bridge only when executed directly; require()-ing this module
// (e.g. from the test suite) must stay side-effect free.
if (require.main === module) {
  init();
  main();
}
173 changes: 173 additions & 0 deletions test/telegram-bridge-queue.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
// SPDX-FileCopyrightText: Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Tests for #860: per-chat message queuing prevents concurrent agent calls
// and caps queue depth to provide backpressure.

import { describe, it, expect } from "vitest";
import { createRequire } from "node:module";

const require = createRequire(import.meta.url);
const { chatQueues, chatQueueDepths, chatEpochs, MAX_QUEUE_DEPTH } = require("../scripts/telegram-bridge");

// Suite drives the exported Maps directly, mirroring the enqueue pattern used
// by poll() (prev.then(job, job)); it never invokes poll() itself. Several
// assertions depend on exact microtask ordering — treat statement order as
// load-bearing when editing.
describe("telegram bridge queue serialization", () => {
  it("exports MAX_QUEUE_DEPTH as 5", () => {
    expect(MAX_QUEUE_DEPTH).toBe(5);
  });

  it("two concurrent jobs on the same chatId execute sequentially", async () => {
    const order = [];
    let resolveFirst;
    // job1 parks on this promise so job2 is forced to wait behind it.
    const firstBlocks = new Promise((r) => { resolveFirst = r; });

    const job1 = async () => {
      order.push("job1-start");
      await firstBlocks;
      order.push("job1-end");
    };
    const job2 = async () => {
      order.push("job2-start");
      order.push("job2-end");
    };

    const chatId = "test-serial";
    // Chain exactly as the bridge does: then(job, job) runs the next job
    // whether the previous one fulfilled or rejected.
    const prev = chatQueues.get(chatId) || Promise.resolve();
    const chain1 = prev.then(job1, job1);
    chatQueues.set(chatId, chain1);

    const chain2 = chain1.then(job2, job2);
    chatQueues.set(chatId, chain2);

    // job1 should have started but job2 should be waiting
    await new Promise((r) => setTimeout(r, 10));
    expect(order).toEqual(["job1-start"]);

    // Unblock job1 — job2 should run after
    resolveFirst();
    await chain2;

    expect(order).toEqual(["job1-start", "job1-end", "job2-start", "job2-end"]);

    // Cleanup
    chatQueues.delete(chatId);
  });

  it("different chatIds run independently (in parallel)", async () => {
    const order = [];
    let resolveA;
    // Chat A's job blocks; chat B's chain must still make progress.
    const blockA = new Promise((r) => { resolveA = r; });

    const jobA = async () => {
      order.push("A-start");
      await blockA;
      order.push("A-end");
    };
    const jobB = async () => {
      order.push("B-start");
      order.push("B-end");
    };

    const prevA = chatQueues.get("chatA") || Promise.resolve();
    const chainA = prevA.then(jobA, jobA);
    chatQueues.set("chatA", chainA);

    const prevB = chatQueues.get("chatB") || Promise.resolve();
    const chainB = prevB.then(jobB, jobB);
    chatQueues.set("chatB", chainB);

    // B should complete even though A is blocked
    await chainB;
    expect(order).toContain("B-start");
    expect(order).toContain("B-end");
    expect(order).not.toContain("A-end");

    resolveA();
    await chainA;
    // "A-start" precedes "B-start" because chainA's microtask was scheduled
    // first — this assertion is sensitive to that scheduling order.
    expect(order).toEqual(["A-start", "B-start", "B-end", "A-end"]);

    chatQueues.delete("chatA");
    chatQueues.delete("chatB");
  });

  it("chatQueueDepths tracks pending jobs and decrements on completion", async () => {
    // NOTE(review): exercises only the Map itself, not the bridge's own
    // increment/decrement logic — a stronger test would drive poll()'s path.
    const chatId = "test-depth";
    chatQueueDepths.set(chatId, 3);
    expect(chatQueueDepths.get(chatId)).toBe(3);

    chatQueueDepths.set(chatId, chatQueueDepths.get(chatId) - 1);
    expect(chatQueueDepths.get(chatId)).toBe(2);

    chatQueueDepths.delete(chatId);
  });

  it("MAX_QUEUE_DEPTH caps at 5 pending jobs", () => {
    const chatId = "test-cap";
    // Simulate 5 queued jobs
    chatQueueDepths.set(chatId, 5);

    // Same rejection predicate the bridge applies before enqueueing.
    const depth = chatQueueDepths.get(chatId) || 0;
    expect(depth >= MAX_QUEUE_DEPTH).toBe(true);

    chatQueueDepths.delete(chatId);
  });

  it("/reset during in-flight job does not cause overlapping runs", async () => {
    const chatId = "test-reset-race";
    let resolveOld;
    const blockOld = new Promise((r) => { resolveOld = r; });
    const executed = [];

    // Epoch starts at 0
    chatEpochs.delete(chatId);
    const epochBefore = chatEpochs.get(chatId) || 0;

    // Enqueue a blocking "old" job that captures epoch 0
    const oldJob = async () => {
      executed.push("old-start");
      await blockOld;
      executed.push("old-end");
    };
    const prev = chatQueues.get(chatId) || Promise.resolve();
    const chain1 = prev.then(oldJob, oldJob);
    chatQueues.set(chatId, chain1);
    chatQueueDepths.set(chatId, 1);

    // old job starts
    await new Promise((r) => setTimeout(r, 10));
    expect(executed).toEqual(["old-start"]);

    // Simulate /reset: bump epoch, clear queue state
    chatQueues.delete(chatId);
    chatQueueDepths.delete(chatId);
    chatEpochs.set(chatId, epochBefore + 1);
    const epochAfter = chatEpochs.get(chatId) || 0;
    expect(epochAfter).toBe(1);

    // Enqueue a "new" job that captures epoch 1 — stale-check should skip
    // it if it was queued under the old epoch, but here it's under the new one.
    const newJob = async () => {
      // Check: new job's epoch matches current, so it should run
      const currentEpoch = chatEpochs.get(chatId) || 0;
      expect(currentEpoch).toBe(epochAfter);
      executed.push("new-run");
    };
    const prevNew = chatQueues.get(chatId) || Promise.resolve();
    const chain2 = prevNew.then(newJob, newJob);
    chatQueues.set(chatId, chain2);

    // new job runs immediately (fresh chain, not blocked by old)
    await chain2;
    expect(executed).toContain("new-run");
    expect(executed).not.toContain("old-end");

    // Resolve old job — it should complete without throwing
    resolveOld();
    await chain1;
    expect(executed).toContain("old-end");

    // Cleanup
    chatQueues.delete(chatId);
    chatQueueDepths.delete(chatId);
    chatEpochs.delete(chatId);
  });
});