Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
216 changes: 212 additions & 4 deletions src/commands/telegram.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ensureProjectClaudeMd, run, runUserMessage } from "../runner";
import { ensureProjectClaudeMd, run, runUserMessage, runFork, killActive, isMainBusy } from "../runner";
import { getSettings, loadSettings } from "../config";
import { resetSession } from "../sessions";
import { transcribeAudioToText } from "../whisper";
Expand Down Expand Up @@ -300,6 +300,128 @@ async function sendTyping(token: string, chatId: number, threadId?: number): Pro
}).catch(() => {});
}

// Chat IDs with verbose tool display enabled
const verboseChats = new Set<number>();

/**
* Build a streaming callback using editMessageText.
* On first chunk: send a placeholder message to get message_id.
* On subsequent chunks (throttled): edit that message with accumulated plain text.
* In verbose mode, tool call/result lines appear above the text response.
*/
function makeStreamCallback(
token: string,
chatId: number,
threadId: number | undefined,
options: { intervalMs?: number; verbose?: boolean } = {}
): { onChunk: (text: string) => void; onToolEvent: (line: string) => void; waitForStreamMsg: () => Promise<number | null> } {
const { intervalMs = 500, verbose = false } = options;
let textAcc = "";
const toolLines: string[] = [];
let lastSentAt = 0;
let timer: ReturnType<typeof setTimeout> | null = null;
let streamMsgId: number | null = null;
let initPromise: Promise<void> | null = null;
let finalized = false;

const getDisplay = () => {
const MAX_TOOL_LINES = 8;
const MAX_TEXT_LINES = 15;
let toolPart: string;
if (toolLines.length > MAX_TOOL_LINES) {
const shown = toolLines.slice(-MAX_TOOL_LINES);
toolPart = `[...${toolLines.length - MAX_TOOL_LINES} earlier]\n` + shown.join("\n");
} else {
toolPart = toolLines.join("\n");
}
let textPart = textAcc;
const textLines = textPart.split("\n");
if (textLines.length > MAX_TEXT_LINES) {
textPart = `[...]\n` + textLines.slice(-MAX_TEXT_LINES).join("\n");
}
return toolPart + (textPart ? (toolPart ? "\n\n" : "") + textPart : "");
};

const editStream = () => {
if (!streamMsgId || finalized) return;
let display: string;
if (verbose) {
display = getDisplay();
} else {
// Keep last N lines of text for streaming preview
const lines = textAcc.split("\n");
display = lines.length > 30 ? `[...]\n${lines.slice(-30).join("\n")}` : textAcc;
}
if (!display) return;
callApi(token, "editMessageText", {
chat_id: chatId,
message_id: streamMsgId,
text: display.slice(0, 4096),
}).catch(() => {});
};

const flush = async () => {
const display = verbose ? getDisplay() : textAcc;
if (!display) return;
lastSentAt = Date.now();

if (!streamMsgId && !initPromise) {
initPromise = (async () => {
try {
const res = await callApi<{ ok: boolean; result: { message_id: number } }>(
token, "sendMessage", {
chat_id: chatId,
text: "⏳",
...(threadId ? { message_thread_id: threadId } : {}),
}
);
if (res.ok) {
streamMsgId = res.result.message_id;
editStream();
}
} catch {}
})();
await initPromise;
} else {
if (initPromise) await initPromise;
editStream();
}
};

const onChunk = (text: string) => {
textAcc += text;
const now = Date.now();
if (now - lastSentAt >= intervalMs) {
if (timer) { clearTimeout(timer); timer = null; }
flush();
} else if (!timer) {
timer = setTimeout(() => { timer = null; flush(); }, intervalMs - (now - lastSentAt));
}
};

const onToolEvent = (line: string) => {
if (!verbose) return;
toolLines.push(line);
// Use same throttle logic as onChunk to avoid spamming the API
const now = Date.now();
if (now - lastSentAt >= intervalMs) {
if (timer) { clearTimeout(timer); timer = null; }
flush();
} else if (!timer) {
timer = setTimeout(() => { timer = null; flush(); }, intervalMs - (now - lastSentAt));
}
};

const waitForStreamMsg = async (): Promise<{ msgId: number | null; hadToolLines: boolean }> => {
if (timer) { clearTimeout(timer); timer = null; }
if (initPromise) await initPromise;
finalized = true;
return { msgId: streamMsgId, hadToolLines: toolLines.length > 0 };
};

return { onChunk, onToolEvent, waitForStreamMsg };
}

function extractReactionDirective(text: string): { cleanedText: string; reactionEmoji: string | null } {
let reactionEmoji: string | null = null;
const cleanedText = text
Expand Down Expand Up @@ -516,6 +638,47 @@ async function handleMessage(message: TelegramMessage): Promise<void> {
return;
}

if (command === "/kill") {
const killed = killActive();
await sendMessage(config.token, chatId, killed ? "Killed active agent." : "No active agent running.", threadId);
return;
}

if (command === "/verbose") {
if (verboseChats.has(chatId)) {
verboseChats.delete(chatId);
await sendMessage(config.token, chatId, "Verbose mode off.", threadId);
} else {
verboseChats.add(chatId);
await sendMessage(config.token, chatId, "Verbose mode on — tool calls will be shown.", threadId);
}
return;
}

if (command === "/fork") {
const forkPrompt = text.replace(/^\/fork\s*/i, "").trim();
if (!forkPrompt) {
await sendMessage(config.token, chatId, "Usage: /fork <prompt>", threadId);
return;
}
const typingInterval = setInterval(() => sendTyping(config.token, chatId, threadId), 4000);
try {
await sendTyping(config.token, chatId, threadId);
const senderLabel = message.from?.username ?? String(userId ?? "unknown");
const result = await runFork(`[Telegram from ${senderLabel}]\nMessage: ${forkPrompt}`);
if (result.exitCode !== 0) {
await sendMessage(config.token, chatId, `Fork error (exit ${result.exitCode}): ${result.stderr || "Unknown error"}`, threadId);
} else {
await sendMessage(config.token, chatId, result.stdout || "(empty response)", threadId);
}
} catch (err) {
await sendMessage(config.token, chatId, `Fork error: ${err instanceof Error ? err.message : String(err)}`, threadId);
} finally {
clearInterval(typingInterval);
}
return;
}

// Secretary: detect reply to a bot alert message → treat as custom reply
const replyToMsgId = message.reply_to_message?.message_id;
if (replyToMsgId && text && botId && message.reply_to_message?.from?.id === botId) {
Expand Down Expand Up @@ -598,18 +761,63 @@ async function handleMessage(message: TelegramMessage): Promise<void> {
);
}
const prefixedPrompt = promptParts.join("\n");
const result = await runUserMessage("telegram", prefixedPrompt);
const busy = isMainBusy();
const verbose = verboseChats.has(chatId);
let result;
let streamMsgId: number | null = null;
let hadToolLines = false;
if (busy) {
result = await runFork(prefixedPrompt);
} else {
const stream = makeStreamCallback(config.token, chatId, threadId, { verbose });
result = await runUserMessage("telegram", prefixedPrompt, stream.onChunk, stream.onToolEvent);
const streamResult = await stream.waitForStreamMsg();
streamMsgId = streamResult.msgId;
hadToolLines = streamResult.hadToolLines;
}

if (result.exitCode !== 0) {
await sendMessage(config.token, chatId, `Error (exit ${result.exitCode}): ${result.stderr || "Unknown error"}`, threadId);
const errText = `Error (exit ${result.exitCode}): ${result.stderr || "Unknown error"}`;
if (streamMsgId) {
await callApi(config.token, "editMessageText", {
chat_id: chatId, message_id: streamMsgId, text: errText,
}).catch(() => sendMessage(config.token, chatId, errText, threadId));
} else {
await sendMessage(config.token, chatId, errText, threadId);
}
} else {
const { cleanedText, reactionEmoji } = extractReactionDirective(result.stdout || "");
if (reactionEmoji) {
await sendReaction(config.token, chatId, message.message_id, reactionEmoji).catch((err) => {
console.error(`[Telegram] Failed to send reaction for ${label}: ${err instanceof Error ? err.message : err}`);
});
}
await sendMessage(config.token, chatId, cleanedText || "(empty response)", threadId);
const finalText = cleanedText || "(empty response)";
if (streamMsgId) {
// Edit the streaming message with final formatted HTML.
// editStream() already set the message to the correct plain text content,
// so if all edits fail (e.g. "message is not modified"), do NOT send a new
// message — the user already sees the correct content and a sendMessage
// would create a duplicate.
const html = markdownToTelegramHtml(normalizeTelegramText(finalText));
await callApi(config.token, "editMessageText", {
chat_id: chatId, message_id: streamMsgId,
text: html.slice(0, 4096), parse_mode: "HTML",
}).catch(() => callApi(config.token, "editMessageText", {
chat_id: chatId, message_id: streamMsgId,
text: finalText.slice(0, 4096),
}).catch(() => {
// If all edits fail and the stream message has tool output (verbose),
// send the final response as a new message. But if there were no tool
// lines, the stream message already shows the correct text — "not
// modified" just means it's already right, so don't send a duplicate.
if (verbose && hadToolLines) {
return sendMessage(config.token, chatId, finalText, threadId);
}
}));
} else {
await sendMessage(config.token, chatId, finalText, threadId);
}
}
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
Expand Down
Loading