From 734cf3e3973f0e4bbcba357792a43bf62a570fe1 Mon Sep 17 00:00:00 2001 From: Malte Ubl Date: Fri, 20 Mar 2026 16:08:57 -0700 Subject: [PATCH 1/9] feat: add concurrency strategies for overlapping messages (queue, debounce, concurrent) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Problem When multiple messages arrive on the same thread while a handler is still processing, the SDK has only one behavior: **lock-and-drop**. The incoming message is silently discarded (or force-released, which creates uncontrolled concurrency). This is insufficient for most real-world use cases: - **AI chatbots** lose user follow-up messages sent while the model is streaming - **Customer support bots** miss messages entirely, breaking conversation flow - **Collaborative editing bots** need to coalesce rapid corrections into one action ## Solution Introduce a new `concurrency` option on `ChatConfig` with four strategies: ### `'drop'` (default, backward-compatible) Existing behavior. Lock acquired or `LockError` thrown. No changes. ### `'queue'` Messages that arrive while a handler is running are enqueued in the state adapter. When the current handler finishes, the queue is drained: **only the latest message is dispatched**, with all intermediate messages provided as `context.skipped`. This gives the handler full visibility into what happened while it was busy, without forcing it to re-process every message sequentially. ```typescript const chat = new Chat({ concurrency: 'queue', // ... }); chat.onNewMention(async (thread, message, context) => { if (context && context.skipped.length > 0) { // "You sent 4 messages while I was thinking. Responding to your latest." const allMessages = [...context.skipped, message]; // Pass all messages to the LLM for full context } }); ``` Flow: ``` A arrives → acquire lock → process A B arrives → lock busy → enqueue B C arrives → lock busy → enqueue C D arrives → lock busy → enqueue D A done → drain: [B, C, D] → handler(D, { skipped: [B, C] }) D done → queue empty → release lock ``` ### `'debounce'` Every message (including the first) starts or resets a debounce timer. Only the **final message in a burst** is processed. The lock-holding function stays alive through `waitUntil` during the debounce window. ```typescript const chat = new Chat({ concurrency: { strategy: 'debounce', debounceMs: 1500 }, // ... }); ``` Flow: ``` A arrives → acquire lock → store A as pending → sleep(debounceMs) B arrives → lock busy → overwrite pending with B (A dropped) C arrives → lock busy → overwrite pending with C (B dropped) ... debounceMs elapses with no new message ... → process C → release lock ``` ### `'concurrent'` No locking at all. Every message is processed immediately in its own handler invocation. Suitable for stateless handlers (lookups, translations) where thread ordering doesn't matter. ```typescript const chat = new Chat({ concurrency: 'concurrent', // ... }); ``` ## API Surface ### ChatConfig ```typescript interface ChatConfig { concurrency?: ConcurrencyStrategy | ConcurrencyConfig; /** @deprecated Use `concurrency` instead */ onLockConflict?: 'force' | 'drop' | ((threadId, message) => ...); } type ConcurrencyStrategy = 'drop' | 'queue' | 'debounce' | 'concurrent'; interface ConcurrencyConfig { strategy: ConcurrencyStrategy; maxQueueSize?: number; // Default: 10 onQueueFull?: 'drop-oldest' | 'drop-newest'; // Default: 'drop-oldest' queueEntryTtlMs?: number; // Default: 90_000 (90s) debounceMs?: number; // Default: 1500 maxConcurrent?: number; // Default: Infinity } ``` ### MessageContext (new, passed to handlers) ```typescript interface MessageContext { skipped: Message[]; // Intermediate messages, chronological totalSinceLastHandler: number; // skipped.length + 1 } ``` All handler types (`MentionHandler`, `MessageHandler`, `SubscribedMessageHandler`, `DirectMessageHandler`) now accept an optional `MessageContext` as their last parameter. Existing handlers that don't use it are unaffected. ### StateAdapter (new methods) ```typescript interface StateAdapter { enqueue(threadId: string, entry: QueueEntry, maxSize: number): Promise; dequeue(threadId: string): Promise; queueDepth(threadId: string): Promise; } ``` Implemented across all four state adapters: - **MemoryStateAdapter**: in-process array - **RedisStateAdapter**: Lua script (RPUSH + LTRIM + PEXPIRE) - **IoRedisStateAdapter**: same Lua approach - **PostgresStateAdapter**: new `chat_state_queues` table with atomic dequeue ## Architecture `handleIncomingMessage` was refactored into composable pieces: - `dispatchToHandlers()` — shared handler dispatch logic (mention detection, subscription routing, pattern matching). Extracted from the old monolithic method so all strategies reuse it. - `handleDrop()` — original lock-or-fail path (preserves `onLockConflict` compat) - `handleQueueOrDebounce()` — enqueue if busy, drain or debounce after - `handleConcurrent()` — skip locking entirely - `drainQueue()` — collect all pending, dispatch latest with skipped context - `debounceLoop()` — sleep/check/repeat until no new messages arrive ## Queue Entry TTL Queued messages have a configurable TTL (`queueEntryTtlMs`, default 90s). Stale entries are discarded on dequeue with a `message-expired` log event. This prevents unbounded accumulation and ensures handlers don't process messages that are no longer relevant. ## Observability All strategies emit structured log events at `info` level: | Event | Strategy | Data | |-----------------------|------------------|---------------------------------------| | `message-queued` | queue | threadId, messageId, queueDepth | | `message-dequeued` | queue, debounce | threadId, messageId, skippedCount | | `message-dropped` | drop, queue | threadId, messageId, reason | | `message-expired` | queue, debounce | threadId, messageId | | `message-superseded` | debounce | threadId, droppedId | | `message-debouncing` | debounce | threadId, messageId, debounceMs | | `message-debounce-reset` | debounce | threadId, messageId | ## Backward Compatibility - Default remains `'drop'` — zero breaking changes for existing users - `onLockConflict` continues to work but is marked `@deprecated` - Handler signatures are backward-compatible (new `context` param is optional) - Deduplication always runs regardless of strategy ## Files Changed - `packages/chat/src/types.ts` — new types, updated handler signatures - `packages/chat/src/chat.ts` — strategy routing, drain/debounce loops - `packages/chat/src/index.ts` — export new types - `packages/chat/src/mock-adapter.ts` — queue methods for test mock - `packages/state-memory/src/index.ts` — in-memory queue - `packages/state-redis/src/index.ts` — Redis queue (Lua) - `packages/state-ioredis/src/index.ts` — ioredis queue (Lua) - `packages/state-pg/src/index.ts` — Postgres queue table - `packages/chat/src/chat.test.ts` — tests for queue, debounce, concurrent Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/chat/src/chat.test.ts | 367 +++++++++++++++++++++- packages/chat/src/chat.ts | 454 +++++++++++++++++++++++----- packages/chat/src/index.ts | 4 + packages/chat/src/mock-adapter.ts | 32 ++ packages/chat/src/types.ts | 91 +++++- packages/state-ioredis/src/index.ts | 58 +++- packages/state-memory/src/index.ts | 51 +++- packages/state-pg/src/index.ts | 97 +++++- packages/state-redis/src/index.ts | 54 +++- 9 files changed, 1122 insertions(+), 86 deletions(-) diff --git a/packages/chat/src/chat.test.ts b/packages/chat/src/chat.test.ts index 3b21468d..9d9c213d 100644 --- a/packages/chat/src/chat.test.ts +++ b/packages/chat/src/chat.test.ts @@ -538,7 +538,7 @@ describe("Chat", () => { expect(mentionHandler).not.toHaveBeenCalled(); // Verify channel is passed as third argument const callArgs = dmHandler.mock.calls[0]; - expect(callArgs).toHaveLength(3); + expect(callArgs.length).toBeGreaterThanOrEqual(3); expect(callArgs[2]).toBeDefined(); expect(callArgs[2].id).toBe("slack:DU123"); }); @@ -2268,6 +2268,371 @@ describe("Chat", () => { }); }); + describe("concurrency: queue", () => { + it("should process queued messages with skipped context after handler finishes", async () => { + const state = createMockState(); + const adapter = createMockAdapter("slack"); + + const queueChat = new Chat({ + userName: "testbot", + adapters: { slack: adapter }, + state, + logger: mockLogger, + concurrency: "queue", + }); + + await queueChat.webhooks.slack( + new Request("http://test.com", { method: "POST" }) + ); + + const receivedContexts: Array< + { skipped: string[]; totalSinceLastHandler: number } | undefined + > = []; + const handler = vi + .fn() + .mockImplementation(async (_thread, _message, context) => { + receivedContexts.push( + context + ? { + skipped: context.skipped.map((m: { text: string }) => m.text), + totalSinceLastHandler: context.totalSinceLastHandler, + } + : undefined + ); + }); + queueChat.onNewMention(handler); + + // First message processes immediately (lock acquired) + const msg1 = createTestMessage("msg-q-1", "Hey @slack-bot first"); + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + msg1 + ); + + expect(handler).toHaveBeenCalledTimes(1); + expect(receivedContexts[0]).toBeUndefined(); + }); + + it("should enqueue messages when lock is held and drain after", async () => { + const state = createMockState(); + const adapter = createMockAdapter("slack"); + + const queueChat = new Chat({ + userName: "testbot", + adapters: { slack: adapter }, + state, + logger: mockLogger, + concurrency: "queue", + }); + + await queueChat.webhooks.slack( + new Request("http://test.com", { method: "POST" }) + ); + + const receivedMessages: string[] = []; + const receivedContexts: Array< + { skipped: string[]; totalSinceLastHandler: number } | undefined + > = []; + + const handler = vi + .fn() + .mockImplementation(async (_thread, message, context) => { + receivedMessages.push(message.text); + receivedContexts.push( + context + ? { + skipped: context.skipped.map((m: { text: string }) => m.text), + totalSinceLastHandler: context.totalSinceLastHandler, + } + : undefined + ); + }); + queueChat.onNewMention(handler); + + // Pre-acquire lock to simulate busy handler + await state.acquireLock("slack:C123:1234.5678", 30000); + + // These messages should be enqueued + const msg1 = createTestMessage("msg-q-2", "Hey @slack-bot second"); + const msg2 = createTestMessage("msg-q-3", "Hey @slack-bot third"); + const msg3 = createTestMessage("msg-q-4", "Hey @slack-bot fourth"); + + // Messages go to queue (no error thrown) + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + msg1 + ); + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + msg2 + ); + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + msg3 + ); + + // Handler not called yet — lock was held + expect(handler).not.toHaveBeenCalled(); + + // Now release the lock and send a new message that acquires it + await state.forceReleaseLock("slack:C123:1234.5678"); + const msg4 = createTestMessage("msg-q-5", "Hey @slack-bot fifth"); + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + msg4 + ); + + // Handler should have been called for msg4 (direct) then msg3 (latest from queue) + // msg4 runs first (lock holder), then drains queue: gets [msg1, msg2, msg3] + // and calls handler with msg3 as message, [msg1, msg2] as skipped + expect(handler).toHaveBeenCalledTimes(2); + expect(receivedMessages[0]).toBe("Hey @slack-bot fifth"); + expect(receivedContexts[0]).toBeUndefined(); + expect(receivedMessages[1]).toBe("Hey @slack-bot fourth"); + expect(receivedContexts[1]).toEqual({ + skipped: ["Hey @slack-bot second", "Hey @slack-bot third"], + totalSinceLastHandler: 3, + }); + }); + }); + + describe("concurrency: queue with onSubscribedMessage", () => { + it("should pass skipped context to subscribed message handlers", async () => { + const state = createMockState(); + const adapter = createMockAdapter("slack"); + + const queueChat = new Chat({ + userName: "testbot", + adapters: { slack: adapter }, + state, + logger: mockLogger, + concurrency: "queue", + }); + + await queueChat.webhooks.slack( + new Request("http://test.com", { method: "POST" }) + ); + + const receivedMessages: string[] = []; + const receivedContexts: Array< + { skipped: string[]; totalSinceLastHandler: number } | undefined + > = []; + + queueChat.onNewMention(async (thread) => { + await thread.subscribe(); + }); + + queueChat.onSubscribedMessage(async (_thread, message, context) => { + receivedMessages.push(message.text); + receivedContexts.push( + context + ? { + skipped: context.skipped.map((m: { text: string }) => m.text), + totalSinceLastHandler: context.totalSinceLastHandler, + } + : undefined + ); + }); + + // First message: mention that subscribes the thread + const msg0 = createTestMessage( + "msg-sub-0", + "Hey @slack-bot subscribe me" + ); + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + msg0 + ); + + // Now the thread is subscribed. Pre-acquire lock to simulate busy handler. + await state.acquireLock("slack:C123:1234.5678", 30000); + + // These messages go to subscribed handler — but lock is held, so they queue + const msg1 = createTestMessage("msg-sub-1", "first follow-up"); + const msg2 = createTestMessage("msg-sub-2", "second follow-up"); + const msg3 = createTestMessage("msg-sub-3", "third follow-up"); + + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + msg1 + ); + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + msg2 + ); + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + msg3 + ); + + // Release lock and send another message + await state.forceReleaseLock("slack:C123:1234.5678"); + const msg4 = createTestMessage("msg-sub-4", "fourth follow-up"); + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + msg4 + ); + + // msg4 processed directly (no context), then queue drained: + // [msg1, msg2, msg3] → handler(msg3, { skipped: [msg1, msg2] }) + expect(receivedMessages).toEqual(["fourth follow-up", "third follow-up"]); + expect(receivedContexts[0]).toBeUndefined(); + expect(receivedContexts[1]).toEqual({ + skipped: ["first follow-up", "second follow-up"], + totalSinceLastHandler: 3, + }); + }); + }); + + describe("concurrency: debounce", () => { + it("should debounce the first message and process after delay", async () => { + vi.useFakeTimers(); + const state = createMockState(); + const adapter = createMockAdapter("slack"); + + const debounceChat = new Chat({ + userName: "testbot", + adapters: { slack: adapter }, + state, + logger: mockLogger, + concurrency: { strategy: "debounce", debounceMs: 100 }, + }); + + await debounceChat.webhooks.slack( + new Request("http://test.com", { method: "POST" }) + ); + + const handler = vi.fn().mockResolvedValue(undefined); + debounceChat.onNewMention(handler); + + const msg = createTestMessage("msg-d-1", "Hey @slack-bot debounce"); + + // Start processing — acquires lock, enters debounce loop + const promise = debounceChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + msg + ); + + // Handler should NOT be called yet (debounce timer hasn't fired) + expect(handler).not.toHaveBeenCalled(); + + // Advance past debounce window + await vi.advanceTimersByTimeAsync(150); + await promise; + + expect(handler).toHaveBeenCalledTimes(1); + expect(handler.mock.calls[0][1].text).toBe("Hey @slack-bot debounce"); + + vi.useRealTimers(); + }); + + it("should only process the last message in a burst", async () => { + vi.useFakeTimers(); + const state = createMockState(); + const adapter = createMockAdapter("slack"); + + const debounceChat = new Chat({ + userName: "testbot", + adapters: { slack: adapter }, + state, + logger: mockLogger, + concurrency: { strategy: "debounce", debounceMs: 100 }, + }); + + await debounceChat.webhooks.slack( + new Request("http://test.com", { method: "POST" }) + ); + + const handler = vi.fn().mockResolvedValue(undefined); + debounceChat.onNewMention(handler); + + // First message acquires lock and enters debounce loop + const msg1 = createTestMessage("msg-d-2", "Hey @slack-bot first"); + const promise = debounceChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + msg1 + ); + + // Second message while debounce is waiting — overwrites pending + const msg2 = createTestMessage("msg-d-3", "Hey @slack-bot second"); + await debounceChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + msg2 + ); + + // Third message — overwrites again + const msg3 = createTestMessage("msg-d-4", "Hey @slack-bot third"); + await debounceChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + msg3 + ); + + // Advance past first debounce — should see msg3 replaced msg1 + // but msg3 superseded it, so debounce loops again + await vi.advanceTimersByTimeAsync(150); + // Advance past second debounce + await vi.advanceTimersByTimeAsync(150); + await promise; + + // Only one handler call with the last message + expect(handler).toHaveBeenCalledTimes(1); + expect(handler.mock.calls[0][1].text).toBe("Hey @slack-bot third"); + + vi.useRealTimers(); + }); + }); + + describe("concurrency: concurrent", () => { + it("should process messages without acquiring a lock", async () => { + const state = createMockState(); + const adapter = createMockAdapter("slack"); + + const concurrentChat = new Chat({ + userName: "testbot", + adapters: { slack: adapter }, + state, + logger: mockLogger, + concurrency: "concurrent", + }); + + await concurrentChat.webhooks.slack( + new Request("http://test.com", { method: "POST" }) + ); + + const handler = vi.fn().mockResolvedValue(undefined); + concurrentChat.onNewMention(handler); + + // Pre-acquire lock — should NOT block concurrent strategy + await state.acquireLock("slack:C123:1234.5678", 30000); + + const msg = createTestMessage("msg-c-1", "Hey @slack-bot concurrent"); + await concurrentChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + msg + ); + + // Handler should be called even though lock was held + expect(handler).toHaveBeenCalledTimes(1); + // Lock methods should not have been called by concurrent strategy + // (the pre-acquire above is manual) + }); + }); + describe("persistMessageHistory", () => { it("should cache incoming messages when adapter has persistMessageHistory", async () => { const adapter = createMockAdapter("whatsapp"); diff --git a/packages/chat/src/chat.ts b/packages/chat/src/chat.ts index cf9c281f..a98b3719 100644 --- a/packages/chat/src/chat.ts +++ b/packages/chat/src/chat.ts @@ -23,19 +23,24 @@ import type { Channel, ChatConfig, ChatInstance, + ConcurrencyConfig, + ConcurrencyStrategy, DirectMessageHandler, EmojiValue, + Lock, Logger, LogLevel, MemberJoinedChannelEvent, MemberJoinedChannelHandler, MentionHandler, + MessageContext, MessageHandler, ModalCloseEvent, ModalCloseHandler, ModalResponse, ModalSubmitEvent, ModalSubmitHandler, + QueueEntry, ReactionEvent, ReactionHandler, SentMessage, @@ -49,6 +54,11 @@ import type { import { ChatError, ConsoleLogger, LockError } from "./types"; const DEFAULT_LOCK_TTL_MS = 30_000; // 30 seconds + +/** Promise-based sleep for debounce timing. */ +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} const SLACK_USER_ID_REGEX = /^U[A-Z0-9]+$/i; const DISCORD_SNOWFLAKE_REGEX = /^\d{17,19}$/; /** TTL for message deduplication entries */ @@ -188,6 +198,10 @@ export class Chat< private readonly _dedupeTtlMs: number; private readonly _onLockConflict: ChatConfig["onLockConflict"]; private readonly _messageHistory: MessageHistoryCache; + private readonly _concurrencyStrategy: ConcurrencyStrategy; + private readonly _concurrencyConfig: Required< + Omit + >; private readonly mentionHandlers: MentionHandler[] = []; private readonly directMessageHandlers: DirectMessageHandler[] = []; @@ -229,6 +243,40 @@ export class Chat< : "..."; this._dedupeTtlMs = config.dedupeTtlMs ?? DEDUPE_TTL_MS; this._onLockConflict = config.onLockConflict; + + // Parse concurrency config — new `concurrency` option takes precedence over deprecated `onLockConflict` + const concurrency = config.concurrency; + if (concurrency) { + if (typeof concurrency === "string") { + this._concurrencyStrategy = concurrency; + this._concurrencyConfig = { + debounceMs: 1500, + maxConcurrent: Number.POSITIVE_INFINITY, + maxQueueSize: 10, + onQueueFull: "drop-oldest", + queueEntryTtlMs: 90_000, + }; + } else { + this._concurrencyStrategy = concurrency.strategy; + this._concurrencyConfig = { + debounceMs: concurrency.debounceMs ?? 1500, + maxConcurrent: concurrency.maxConcurrent ?? Number.POSITIVE_INFINITY, + maxQueueSize: concurrency.maxQueueSize ?? 10, + onQueueFull: concurrency.onQueueFull ?? "drop-oldest", + queueEntryTtlMs: concurrency.queueEntryTtlMs ?? 90_000, + }; + } + } else { + this._concurrencyStrategy = "drop"; + this._concurrencyConfig = { + debounceMs: 1500, + maxConcurrent: Number.POSITIVE_INFINITY, + maxQueueSize: 10, + onQueueFull: "drop-oldest", + queueEntryTtlMs: 90_000, + }; + } + this._messageHistory = new MessageHistoryCache( this._stateAdapter, config.messageHistory @@ -1518,7 +1566,7 @@ export class Chat< * - Deduplication: Same message may arrive multiple times (e.g., Slack sends * both `message` and `app_mention` events, GChat sends direct webhook + Pub/Sub) * - Bot filtering: Messages from the bot itself are skipped - * - Locking: Only one instance processes a thread at a time + * - Concurrency: Controlled by `concurrency` config (drop, queue, debounce, concurrent) */ async handleIncomingMessage( adapter: Adapter, @@ -1574,12 +1622,37 @@ export class Chat< await Promise.all(appends); } - // Try to acquire lock on thread + // Route to the appropriate concurrency strategy + const strategy = this._concurrencyStrategy; + + if (strategy === "concurrent") { + await this.handleConcurrent(adapter, threadId, message); + return; + } + + if (strategy === "queue" || strategy === "debounce") { + await this.handleQueueOrDebounce(adapter, threadId, message, strategy); + return; + } + + // Default: 'drop' strategy (original behavior) + await this.handleDrop(adapter, threadId, message); + } + + /** + * Drop strategy: acquire lock or fail. Original behavior. + */ + private async handleDrop( + adapter: Adapter, + threadId: string, + message: Message + ): Promise { let lock = await this._stateAdapter.acquireLock( threadId, DEFAULT_LOCK_TTL_MS ); if (!lock) { + // Legacy onLockConflict support const resolution = typeof this._onLockConflict === "function" ? await this._onLockConflict(threadId, message) @@ -1587,8 +1660,6 @@ export class Chat< if (resolution === "force") { this.logger.info("Force-releasing lock on thread", { threadId }); await this._stateAdapter.forceReleaseLock(threadId); - // Note: another instance could acquire the lock between release and re-acquire. - // If that happens, lock will be null and we fall through to the LockError below. lock = await this._stateAdapter.acquireLock( threadId, DEFAULT_LOCK_TTL_MS @@ -1605,99 +1676,327 @@ export class Chat< this.logger.debug("Lock acquired", { threadId, token: lock.token }); try { - // Set isMention on the message for handler access - // Preserve existing isMention if already set (e.g., from Gateway detection) - message.isMention = - message.isMention || this.detectMention(adapter, message); - - // Check subscription status (needed for createThread optimization) - const isSubscribed = await this._stateAdapter.isSubscribed(threadId); - this.logger.debug("Subscription check", { - threadId, - isSubscribed, - subscribedHandlerCount: this.subscribedMessageHandlers.length, - }); + await this.dispatchToHandlers(adapter, threadId, message); + } finally { + await this._stateAdapter.releaseLock(lock); + this.logger.debug("Lock released", { threadId }); + } + } + + /** + * Queue/Debounce strategy: enqueue if lock is busy, drain after processing. + */ + private async handleQueueOrDebounce( + adapter: Adapter, + threadId: string, + message: Message, + strategy: "queue" | "debounce" + ): Promise { + const { maxQueueSize, queueEntryTtlMs, onQueueFull, debounceMs } = + this._concurrencyConfig; + + // Try to acquire lock + const lock = await this._stateAdapter.acquireLock( + threadId, + DEFAULT_LOCK_TTL_MS + ); - // Create thread object (with subscription context for optimization) - const thread = await this.createThread( - adapter, + if (!lock) { + // Lock is busy — enqueue this message for later processing + const effectiveMaxSize = strategy === "debounce" ? 1 : maxQueueSize; + const depth = await this._stateAdapter.queueDepth(threadId); + + if ( + depth >= effectiveMaxSize && + strategy !== "debounce" && + onQueueFull === "drop-newest" + ) { + this.logger.info("message-dropped", { + threadId, + messageId: message.id, + reason: "queue-full", + }); + return; + } + + await this._stateAdapter.enqueue( threadId, - message, - isSubscribed + { + message, + enqueuedAt: Date.now(), + expiresAt: Date.now() + queueEntryTtlMs, + }, + effectiveMaxSize ); - // Check for DM first - always route to direct message handlers - const isDM = adapter.isDM?.(threadId) ?? false; - if (isDM && this.directMessageHandlers.length > 0) { - this.logger.debug("Direct message received - calling handlers", { + this.logger.info( + strategy === "debounce" ? "message-debounce-reset" : "message-queued", + { threadId, - handlerCount: this.directMessageHandlers.length, - }); - const channel = thread.channel; - for (const handler of this.directMessageHandlers) { - await handler(thread, message, channel); + messageId: message.id, + queueDepth: Math.min(depth + 1, effectiveMaxSize), } - return; + ); + return; + } + + // We hold the lock + this.logger.debug("Lock acquired", { threadId, token: lock.token }); + + try { + if (strategy === "debounce") { + // Debounce: enqueue our own message and enter the debounce loop + await this._stateAdapter.enqueue( + threadId, + { + message, + enqueuedAt: Date.now(), + expiresAt: Date.now() + queueEntryTtlMs, + }, + 1 + ); + this.logger.info("message-debouncing", { + threadId, + messageId: message.id, + debounceMs, + }); + await this.debounceLoop(lock, adapter, threadId); + } else { + // Queue: process our message immediately, then drain any queued messages + await this.dispatchToHandlers(adapter, threadId, message); + await this.drainQueue(lock, adapter, threadId); } + } finally { + await this._stateAdapter.releaseLock(lock); + this.logger.debug("Lock released", { threadId }); + } + } + + /** + * Debounce loop: wait for debounceMs, check if newer message arrived, + * repeat until no new messages, then process the final message. + */ + private async debounceLoop( + lock: Lock, + adapter: Adapter, + threadId: string + ): Promise { + const { debounceMs } = this._concurrencyConfig; + + while (true) { + await sleep(debounceMs); + await this._stateAdapter.extendLock(lock, DEFAULT_LOCK_TTL_MS); - // Backward compat: treat DMs as mentions when no DM handlers registered - if (isDM) { - message.isMention = true; + // Atomically take the pending message + const entry = await this._stateAdapter.dequeue(threadId); + if (!entry) { + break; } - // Check subscription (non-DM threads only) - if (isSubscribed) { - this.logger.debug("Message in subscribed thread - calling handlers", { + if (Date.now() > entry.expiresAt) { + this.logger.info("message-expired", { threadId, - handlerCount: this.subscribedMessageHandlers.length, + messageId: entry.message.id, }); - await this.runHandlers(this.subscribedMessageHandlers, thread, message); - return; + continue; } - // Check for @-mention of bot - if (message.isMention) { - this.logger.debug("Bot mentioned", { + // Check if anything new arrived during sleep + const depth = await this._stateAdapter.queueDepth(threadId); + if (depth > 0) { + // Newer message superseded this one — loop again + this.logger.info("message-superseded", { threadId, - text: message.text.slice(0, 100), + droppedId: entry.message.id, }); - await this.runHandlers(this.mentionHandlers, thread, message); - return; + continue; } - // Check message patterns - this.logger.debug("Checking message patterns", { - patternCount: this.messagePatterns.length, - patterns: this.messagePatterns.map((p) => p.pattern.toString()), - messageText: message.text, + // Nothing new — this is the final message in the burst + this.logger.info("message-dequeued", { + threadId, + messageId: entry.message.id, }); - let matchedPattern = false; - for (const { pattern, handler } of this.messagePatterns) { - const matches = pattern.test(message.text); - this.logger.debug("Pattern test", { - pattern: pattern.toString(), - text: message.text, - matches, - }); - if (matches) { - this.logger.debug("Message matched pattern - calling handler", { - pattern: pattern.toString(), + await this.dispatchToHandlers(adapter, threadId, entry.message); + break; + } + } + + /** + * Drain queue: collect all pending messages, dispatch the latest with + * skipped context, then check for more. + */ + private async drainQueue( + lock: Lock, + adapter: Adapter, + threadId: string + ): Promise { + while (true) { + // Collect all pending messages + const pending: QueueEntry[] = []; + while (true) { + const entry = await this._stateAdapter.dequeue(threadId); + if (!entry) { + break; + } + if (Date.now() <= entry.expiresAt) { + pending.push(entry); + } else { + this.logger.info("message-expired", { + threadId, + messageId: entry.message.id, }); - matchedPattern = true; - await handler(thread, message); } } - // Log if no handlers matched - if (!matchedPattern) { - this.logger.debug("No handlers matched message", { - threadId, - text: message.text.slice(0, 100), + if (pending.length === 0) { + return; + } + + await this._stateAdapter.extendLock(lock, DEFAULT_LOCK_TTL_MS); + + // Latest message is the one we process + const latest = pending.at(-1); + if (!latest) { + return; + } + // Everything before it is "skipped" context + const skipped = pending.slice(0, -1).map((e) => e.message); + + this.logger.info("message-dequeued", { + threadId, + messageId: latest.message.id, + skippedCount: skipped.length, + totalSinceLastHandler: pending.length, + }); + + const context: MessageContext = { + skipped, + totalSinceLastHandler: pending.length, + }; + + await this.dispatchToHandlers(adapter, threadId, latest.message, context); + + // After processing, check if MORE messages arrived during this handler + // (loop continues) + } + } + + /** + * Concurrent strategy: no locking, process immediately. + */ + private async handleConcurrent( + adapter: Adapter, + threadId: string, + message: Message + ): Promise { + await this.dispatchToHandlers(adapter, threadId, message); + } + + /** + * Dispatch a message to the appropriate handler chain based on + * subscription status, mention detection, and pattern matching. + */ + private async dispatchToHandlers( + adapter: Adapter, + threadId: string, + message: Message, + context?: MessageContext + ): Promise { + // Set isMention on the message for handler access + // Preserve existing isMention if already set (e.g., from Gateway detection) + message.isMention = + message.isMention || this.detectMention(adapter, message); + + // Check subscription status (needed for createThread optimization) + const isSubscribed = await this._stateAdapter.isSubscribed(threadId); + this.logger.debug("Subscription check", { + threadId, + isSubscribed, + subscribedHandlerCount: this.subscribedMessageHandlers.length, + }); + + // Create thread object (with subscription context for optimization) + const thread = await this.createThread( + adapter, + threadId, + message, + isSubscribed + ); + + // Check for DM first - always route to direct message handlers + const isDM = adapter.isDM?.(threadId) ?? false; + if (isDM && this.directMessageHandlers.length > 0) { + this.logger.debug("Direct message received - calling handlers", { + threadId, + handlerCount: this.directMessageHandlers.length, + }); + const channel = thread.channel; + for (const handler of this.directMessageHandlers) { + await handler(thread, message, channel, context); + } + return; + } + + // Backward compat: treat DMs as mentions when no DM handlers registered + if (isDM) { + message.isMention = true; + } + + // Check subscription (non-DM threads only) + if (isSubscribed) { + this.logger.debug("Message in subscribed thread - calling handlers", { + threadId, + handlerCount: this.subscribedMessageHandlers.length, + }); + await this.runHandlers( + this.subscribedMessageHandlers, + thread, + message, + context + ); + return; + } + + // Check for @-mention of bot + if (message.isMention) { + this.logger.debug("Bot mentioned", { + threadId, + text: message.text.slice(0, 100), + }); + await this.runHandlers(this.mentionHandlers, thread, message, context); + return; + } + + // Check message patterns + this.logger.debug("Checking message patterns", { + patternCount: this.messagePatterns.length, + patterns: this.messagePatterns.map((p) => p.pattern.toString()), + messageText: message.text, + }); + let matchedPattern = false; + for (const { pattern, handler } of this.messagePatterns) { + const matches = pattern.test(message.text); + this.logger.debug("Pattern test", { + pattern: pattern.toString(), + text: message.text, + matches, + }); + if (matches) { + this.logger.debug("Message matched pattern - calling handler", { + pattern: pattern.toString(), }); + matchedPattern = true; + await handler(thread, message, context); } - } finally { - await this._stateAdapter.releaseLock(lock); - this.logger.debug("Lock released", { threadId }); + } + + // Log if no handlers matched + if (!matchedPattern) { + this.logger.debug("No handlers matched message", { + threadId, + text: message.text.slice(0, 100), + }); } } @@ -1779,13 +2078,18 @@ export class Chat< private async runHandlers( handlers: Array< - (thread: Thread, message: Message) => void | Promise + ( + thread: Thread, + message: Message, + context?: MessageContext + ) => void | Promise >, thread: Thread, - message: Message + message: Message, + context?: MessageContext ): Promise { for (const handler of handlers) { - await handler(thread, message); + await handler(thread, message, context); } } } diff --git a/packages/chat/src/index.ts b/packages/chat/src/index.ts index a7e8bb59..fe23caf4 100644 --- a/packages/chat/src/index.ts +++ b/packages/chat/src/index.ts @@ -273,6 +273,8 @@ export type { ChannelInfo, ChatConfig, ChatInstance, + ConcurrencyConfig, + ConcurrencyStrategy, CustomEmojiMap, DirectMessageHandler, Emoji, @@ -294,6 +296,7 @@ export type { MemberJoinedChannelEvent, MemberJoinedChannelHandler, MentionHandler, + MessageContext, MessageHandler, MessageMetadata, ModalCloseEvent, @@ -313,6 +316,7 @@ export type { PostableMessage, PostableRaw, PostEphemeralOptions, + QueueEntry, RawMessage, ReactionEvent, ReactionHandler, diff --git a/packages/chat/src/mock-adapter.ts b/packages/chat/src/mock-adapter.ts index d86c66b9..fc9a1845 100644 --- a/packages/chat/src/mock-adapter.ts +++ b/packages/chat/src/mock-adapter.ts @@ -9,6 +9,7 @@ import type { FormattedContent, Lock, Logger, + QueueEntry, StateAdapter, } from "./types"; @@ -111,6 +112,7 @@ export function createMockState(): MockStateAdapter { const subscriptions = new Set(); const locks = new Map(); const cache = new Map(); + const queues = new Map(); return { cache, @@ -180,6 +182,36 @@ export function createMockState(): MockStateAdapter { cache.set(key, list); } ), + enqueue: vi + .fn() + .mockImplementation( + async (threadId: string, entry: QueueEntry, maxSize: number) => { + let queue = queues.get(threadId); + if (!queue) { + queue = []; + queues.set(threadId, queue); + } + queue.push(entry); + if (queue.length > maxSize) { + queue.splice(0, queue.length - maxSize); + } + return queue.length; + } + ), + dequeue: vi.fn().mockImplementation(async (threadId: string) => { + const queue = queues.get(threadId); + if (!queue || queue.length === 0) { + return null; + } + const entry = queue.shift(); + if (queue.length === 0) { + queues.delete(threadId); + } + return entry ?? null; + }), + queueDepth: vi.fn().mockImplementation(async (threadId: string) => { + return queues.get(threadId)?.length ?? 0; + }), getList: vi.fn().mockImplementation(async (key: string) => { return (cache.get(key) as unknown[]) ?? []; }), diff --git a/packages/chat/src/types.ts b/packages/chat/src/types.ts index b6ed81ec..3d275a69 100644 --- a/packages/chat/src/types.ts +++ b/packages/chat/src/types.ts @@ -37,6 +37,20 @@ export interface ChatConfig< > { /** Map of adapter name to adapter instance */ adapters: TAdapters; + /** + * How to handle messages that arrive while a handler is already + * processing on the same thread. + * + * - `'drop'` (default) — discard the message (throw `LockError`) + * - `'queue'` — queue the message; when the current handler finishes, + * process only the latest queued message with `context.skipped` containing + * all intermediate messages + * - `'debounce'` — all messages start/reset a debounce timer; only the + * final message in a burst is processed + * - `'concurrent'` — no locking; all messages processed in parallel + * - `ConcurrencyConfig` — fine-grained control over strategy and parameters + */ + concurrency?: ConcurrencyStrategy | ConcurrencyConfig; /** * TTL for message deduplication entries in milliseconds. * Defaults to 300000 (5 minutes). Increase if your webhook cold starts @@ -67,6 +81,8 @@ export interface ChatConfig< ttlMs?: number; }; /** + * @deprecated Use `concurrency` instead. + * * Behavior when a thread lock cannot be acquired (another handler is processing). * - `'drop'` (default) — throw `LockError`, preserving current behavior * - `'force'` — force-release the existing lock and re-acquire @@ -557,6 +573,56 @@ export interface ChatInstance { ): void; } +// ============================================================================= +// Concurrency +// ============================================================================= + +/** Concurrency strategy for overlapping messages on the same thread. */ +export type ConcurrencyStrategy = "drop" | "queue" | "debounce" | "concurrent"; + +/** Fine-grained concurrency configuration. */ +export interface ConcurrencyConfig { + /** Debounce window in milliseconds (debounce strategy). Default: 1500. */ + debounceMs?: number; + /** Max concurrent handlers per thread (concurrent strategy). Default: Infinity. */ + maxConcurrent?: number; + /** Max queued messages per thread (queue/debounce strategy). Default: 10. */ + maxQueueSize?: number; + /** What to do when queue is full. Default: 'drop-oldest'. */ + onQueueFull?: "drop-oldest" | "drop-newest"; + /** TTL for queued entries in milliseconds. Default: 90000 (90s). */ + queueEntryTtlMs?: number; + /** The concurrency strategy to use. */ + strategy: ConcurrencyStrategy; +} + +/** + * An entry in the per-thread message queue. + * Used by the `queue` and `debounce` concurrency strategies. + */ +export interface QueueEntry { + /** When this entry was enqueued (Unix ms). */ + enqueuedAt: number; + /** When this entry expires (Unix ms). Stale entries are discarded on dequeue. */ + expiresAt: number; + /** The queued message. */ + message: Message; +} + +/** + * Context provided to message handlers when messages were queued + * while a previous handler was running. + */ +export interface MessageContext { + /** + * Messages that arrived while the previous handler was running, + * in chronological order, excluding the current message (which is the latest). + */ + skipped: Message[]; + /** Total messages received since last handler ran (skipped.length + 1). */ + totalSinceLastHandler: number; +} + // ============================================================================= // State Adapter Interface // ============================================================================= @@ -578,9 +644,19 @@ export interface StateAdapter { /** Delete a cached value */ delete(key: string): Promise; + /** Pop the next message from the thread's queue. Returns null if empty. */ + dequeue(threadId: string): Promise; + /** Disconnect from the state backend */ disconnect(): Promise; + /** Atomically append a message to the thread's pending queue. Returns new queue depth. */ + enqueue( + threadId: string, + entry: QueueEntry, + maxSize: number + ): Promise; + /** Extend a lock's TTL */ extendLock(lock: Lock, ttlMs: number): Promise; @@ -600,6 +676,9 @@ export interface StateAdapter { /** Check if subscribed to a thread */ isSubscribed(threadId: string): Promise; + /** Get the current queue depth for a thread. */ + queueDepth(threadId: string): Promise; + /** Release a lock */ releaseLock(lock: Lock): Promise; @@ -1334,7 +1413,8 @@ export interface FileUpload { */ export type MentionHandler> = ( thread: Thread, - message: Message + message: Message, + context?: MessageContext ) => void | Promise; /** @@ -1348,7 +1428,8 @@ export type MentionHandler> = ( export type DirectMessageHandler> = ( thread: Thread, message: Message, - channel: Channel + channel: Channel, + context?: MessageContext ) => void | Promise; /** @@ -1359,7 +1440,8 @@ export type DirectMessageHandler> = ( */ export type MessageHandler> = ( thread: Thread, - message: Message + message: Message, + context?: MessageContext ) => void | Promise; /** @@ -1387,7 +1469,8 @@ export type MessageHandler> = ( */ export type SubscribedMessageHandler> = ( thread: Thread, - message: Message + message: Message, + context?: MessageContext ) => void | Promise; // ============================================================================= diff --git a/packages/state-ioredis/src/index.ts b/packages/state-ioredis/src/index.ts index 0afa610a..6d36037b 100644 --- a/packages/state-ioredis/src/index.ts +++ b/packages/state-ioredis/src/index.ts @@ -1,4 +1,4 @@ -import type { Lock, Logger, StateAdapter } from "chat"; +import type { Lock, Logger, QueueEntry, StateAdapter } from "chat"; import Redis from "ioredis"; export interface IoRedisStateAdapterOptions { @@ -60,7 +60,7 @@ export class IoRedisStateAdapter implements StateAdapter { }); } - private key(type: "sub" | "lock" | "cache", id: string): string { + private key(type: "sub" | "lock" | "cache" | "queue", id: string): string { return `${this.keyPrefix}:${type}:${id}`; } @@ -281,6 +281,60 @@ export class IoRedisStateAdapter implements StateAdapter { ); } + async enqueue( + threadId: string, + entry: QueueEntry, + maxSize: number + ): Promise { + this.ensureConnected(); + + const queueKey = this.key("queue", threadId); + const serialized = JSON.stringify(entry); + + const ttlMs = Math.max(entry.expiresAt - Date.now(), 60_000); + + // Atomic RPUSH + LTRIM + PEXPIRE via Lua + const script = ` + redis.call("rpush", KEYS[1], ARGV[1]) + if tonumber(ARGV[2]) > 0 then + redis.call("ltrim", KEYS[1], -tonumber(ARGV[2]), -1) + end + redis.call("pexpire", KEYS[1], ARGV[3]) + return redis.call("llen", KEYS[1]) + `; + + const result = await this.client.eval( + script, + 1, + queueKey, + serialized, + maxSize.toString(), + ttlMs.toString() + ); + + return result as number; + } + + async dequeue(threadId: string): Promise { + this.ensureConnected(); + + const queueKey = this.key("queue", threadId); + const value = await this.client.lpop(queueKey); + + if (value === null) { + return null; + } + + return JSON.parse(value) as QueueEntry; + } + + async queueDepth(threadId: string): Promise { + this.ensureConnected(); + + const queueKey = this.key("queue", threadId); + return await this.client.llen(queueKey); + } + async getList(key: string): Promise { this.ensureConnected(); diff --git a/packages/state-memory/src/index.ts b/packages/state-memory/src/index.ts index ba23197f..f4b41029 100644 --- a/packages/state-memory/src/index.ts +++ b/packages/state-memory/src/index.ts @@ -1,4 +1,4 @@ -import type { Lock, StateAdapter } from "chat"; +import type { Lock, QueueEntry, StateAdapter } from "chat"; interface MemoryLock extends Lock { expiresAt: number; @@ -21,6 +21,7 @@ export class MemoryStateAdapter implements StateAdapter { private readonly subscriptions = new Set(); private readonly locks = new Map(); private readonly cache = new Map(); + private readonly queues = new Map(); private connected = false; private connectPromise: Promise | null = null; @@ -50,6 +51,7 @@ export class MemoryStateAdapter implements StateAdapter { this.connectPromise = null; this.subscriptions.clear(); this.locks.clear(); + this.queues.clear(); } async subscribe(threadId: string): Promise { @@ -207,6 +209,53 @@ export class MemoryStateAdapter implements StateAdapter { }); } + async enqueue( + threadId: string, + entry: QueueEntry, + maxSize: number + ): Promise { + this.ensureConnected(); + + let queue = this.queues.get(threadId); + if (!queue) { + queue = []; + this.queues.set(threadId, queue); + } + + queue.push(entry); + + // Trim to maxSize (keep newest) + if (queue.length > maxSize) { + queue.splice(0, queue.length - maxSize); + } + + return queue.length; + } + + async dequeue(threadId: string): Promise { + this.ensureConnected(); + + const queue = this.queues.get(threadId); + if (!queue || queue.length === 0) { + return null; + } + + const entry = queue.shift(); + + if (queue.length === 0) { + this.queues.delete(threadId); + } + + return entry ?? null; + } + + async queueDepth(threadId: string): Promise { + this.ensureConnected(); + + const queue = this.queues.get(threadId); + return queue?.length ?? 0; + } + async getList(key: string): Promise { this.ensureConnected(); diff --git a/packages/state-pg/src/index.ts b/packages/state-pg/src/index.ts index cac19fe1..b3f2ebce 100644 --- a/packages/state-pg/src/index.ts +++ b/packages/state-pg/src/index.ts @@ -1,4 +1,4 @@ -import type { Lock, Logger, StateAdapter } from "chat"; +import type { Lock, Logger, QueueEntry, StateAdapter } from "chat"; import { ConsoleLogger } from "chat"; import pg from "pg"; @@ -325,6 +325,87 @@ export class PostgresStateAdapter implements StateAdapter { return result.rows.map((row) => JSON.parse(row.value as string) as T); } + async enqueue( + threadId: string, + entry: QueueEntry, + maxSize: number + ): Promise { + this.ensureConnected(); + + const serialized = JSON.stringify(entry); + const expiresAt = new Date(entry.expiresAt); + + // Insert the new entry + await this.pool.query( + `INSERT INTO chat_state_queues (key_prefix, thread_id, value, expires_at) + VALUES ($1, $2, $3, $4)`, + [this.keyPrefix, threadId, serialized, expiresAt] + ); + + // Trim overflow (keep newest maxSize entries) + if (maxSize > 0) { + await this.pool.query( + `DELETE FROM chat_state_queues + WHERE key_prefix = $1 AND thread_id = $2 AND seq IN ( + SELECT seq FROM chat_state_queues + WHERE key_prefix = $1 AND thread_id = $2 + ORDER BY seq ASC + OFFSET 0 + LIMIT GREATEST( + (SELECT count(*) FROM chat_state_queues WHERE key_prefix = $1 AND thread_id = $2) - $3, + 0 + ) + )`, + [this.keyPrefix, threadId, maxSize] + ); + } + + // Return current depth + const result = await this.pool.query( + `SELECT count(*) as depth FROM chat_state_queues + WHERE key_prefix = $1 AND thread_id = $2`, + [this.keyPrefix, threadId] + ); + + return Number.parseInt(result.rows[0].depth as string, 10); + } + + async dequeue(threadId: string): Promise { + this.ensureConnected(); + + // Atomically select + delete the oldest entry + const result = await this.pool.query( + `DELETE FROM chat_state_queues + WHERE key_prefix = $1 AND thread_id = $2 + AND seq = ( + SELECT seq FROM chat_state_queues + WHERE key_prefix = $1 AND thread_id = $2 + ORDER BY seq ASC + LIMIT 1 + ) + RETURNING value`, + [this.keyPrefix, threadId] + ); + + if (result.rows.length === 0) { + return null; + } + + return JSON.parse(result.rows[0].value as string) as QueueEntry; + } + + async queueDepth(threadId: string): Promise { + this.ensureConnected(); + + const result = await this.pool.query( + `SELECT count(*) as depth FROM chat_state_queues + WHERE key_prefix = $1 AND thread_id = $2`, + [this.keyPrefix, threadId] + ); + + return Number.parseInt(result.rows[0].depth as string, 10); + } + getClient(): pg.Pool { return this.pool; } @@ -380,6 +461,20 @@ export class PostgresStateAdapter implements StateAdapter { `CREATE INDEX IF NOT EXISTS chat_state_lists_expires_idx ON chat_state_lists (expires_at)` ); + await this.pool.query( + `CREATE TABLE IF NOT EXISTS chat_state_queues ( + key_prefix text NOT NULL, + thread_id text NOT NULL, + seq bigserial NOT NULL, + value text NOT NULL, + expires_at timestamptz NOT NULL, + PRIMARY KEY (key_prefix, thread_id, seq) + )` + ); + await this.pool.query( + `CREATE INDEX IF NOT EXISTS chat_state_queues_expires_idx + ON chat_state_queues (expires_at)` + ); } private ensureConnected(): void { diff --git a/packages/state-redis/src/index.ts b/packages/state-redis/src/index.ts index 5626ef9c..92e7dfdf 100644 --- a/packages/state-redis/src/index.ts +++ b/packages/state-redis/src/index.ts @@ -1,4 +1,4 @@ -import type { Lock, Logger, StateAdapter } from "chat"; +import type { Lock, Logger, QueueEntry, StateAdapter } from "chat"; import { ConsoleLogger } from "chat"; import { createClient, type RedisClientType } from "redis"; @@ -35,7 +35,7 @@ export class RedisStateAdapter implements StateAdapter { }); } - private key(type: "sub" | "lock" | "cache", id: string): string { + private key(type: "sub" | "lock" | "cache" | "queue", id: string): string { return `${this.keyPrefix}:${type}:${id}`; } @@ -241,6 +241,56 @@ export class RedisStateAdapter implements StateAdapter { }); } + async enqueue( + threadId: string, + entry: QueueEntry, + maxSize: number + ): Promise { + this.ensureConnected(); + + const queueKey = this.key("queue", threadId); + const serialized = JSON.stringify(entry); + + // Atomic RPUSH + LTRIM + PEXPIRE via Lua + const script = ` + redis.call("rpush", KEYS[1], ARGV[1]) + if tonumber(ARGV[2]) > 0 then + redis.call("ltrim", KEYS[1], -tonumber(ARGV[2]), -1) + end + redis.call("pexpire", KEYS[1], ARGV[3]) + return redis.call("llen", KEYS[1]) + `; + + const ttlMs = Math.max(entry.expiresAt - Date.now(), 60_000).toString(); + + const result = await this.client.eval(script, { + keys: [queueKey], + arguments: [serialized, maxSize.toString(), ttlMs], + }); + + return result as number; + } + + async dequeue(threadId: string): Promise { + this.ensureConnected(); + + const queueKey = this.key("queue", threadId); + const value = await this.client.lPop(queueKey); + + if (value === null) { + return null; + } + + return JSON.parse(value) as QueueEntry; + } + + async queueDepth(threadId: string): Promise { + this.ensureConnected(); + + const queueKey = this.key("queue", threadId); + return await this.client.lLen(queueKey); + } + async getList(key: string): Promise { this.ensureConnected(); From 0df362d9bfa110ed7a274aa39df98cca341cddb7 Mon Sep 17 00:00:00 2001 From: Malte Ubl Date: Fri, 20 Mar 2026 16:18:49 -0700 Subject: [PATCH 2/9] test: comprehensive test coverage for concurrency strategies and queue methods Add tests across all state adapters and the Chat class: **MemoryStateAdapter** (8 new tests): - enqueue/dequeue single entry - dequeue from empty queue returns null - dequeue from nonexistent thread returns null - queueDepth returns 0 for empty queue - FIFO ordering across multiple entries - maxSize trimming (keeps newest) - maxSize=1 debounce behavior (last-write-wins) - queue isolation by thread - queue cleared on disconnect **PostgresStateAdapter** (8 new tests): - INSERT query for enqueue - overflow trimming query - depth return value - parsed entry from dequeue - null from empty dequeue - atomic DELETE-RETURNING for dequeue - queueDepth return value - zero depth for empty queue **RedisStateAdapter / IoRedisStateAdapter** (3+3 existence checks): - enqueue, dequeue, queueDepth method existence **Chat concurrency** (5 new tests): - drop-newest policy when queue is full - drop-oldest policy evicts oldest entries - expired entries skipped during drain - onNewMessage pattern handlers receive context - onSubscribedMessage handlers receive skipped context Total new tests: 27 (780 chat + 33 memory + 59 pg) Co-Authored-By: Claude Opus 4.6 (1M context) --- packages/chat/src/chat.test.ts | 221 +++++++++++++++++++++++ packages/state-ioredis/src/index.test.ts | 27 +++ packages/state-memory/src/index.test.ts | 173 ++++++++++++++++++ packages/state-pg/src/index.test.ts | 94 ++++++++++ packages/state-redis/src/index.test.ts | 24 +++ 5 files changed, 539 insertions(+) diff --git a/packages/chat/src/chat.test.ts b/packages/chat/src/chat.test.ts index 9d9c213d..e17adbe0 100644 --- a/packages/chat/src/chat.test.ts +++ b/packages/chat/src/chat.test.ts @@ -2494,6 +2494,227 @@ describe("Chat", () => { }); }); + describe("concurrency: queue edge cases", () => { + it("should drop newest when queue is full with drop-newest policy", async () => { + const state = createMockState(); + const adapter = createMockAdapter("slack"); + + const queueChat = new Chat({ + userName: "testbot", + adapters: { slack: adapter }, + state, + logger: mockLogger, + concurrency: { + strategy: "queue", + maxQueueSize: 2, + onQueueFull: "drop-newest", + }, + }); + + await queueChat.webhooks.slack( + new Request("http://test.com", { method: "POST" }) + ); + + queueChat.onNewMention(vi.fn().mockResolvedValue(undefined)); + + // Hold the lock + await state.acquireLock("slack:C123:1234.5678", 30000); + + // Enqueue 2 messages (fills the queue) + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + createTestMessage("msg-dq-1", "Hey @slack-bot one") + ); + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + createTestMessage("msg-dq-2", "Hey @slack-bot two") + ); + + // Third message should be silently dropped (drop-newest) + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + createTestMessage("msg-dq-3", "Hey @slack-bot three") + ); + + // Queue should still have depth 2 + expect(await state.queueDepth("slack:C123:1234.5678")).toBe(2); + }); + + it("should drop oldest when queue is full with drop-oldest policy", async () => { + const state = createMockState(); + const adapter = createMockAdapter("slack"); + + const queueChat = new Chat({ + userName: "testbot", + adapters: { slack: adapter }, + state, + logger: mockLogger, + concurrency: { + strategy: "queue", + maxQueueSize: 2, + onQueueFull: "drop-oldest", + }, + }); + + await queueChat.webhooks.slack( + new Request("http://test.com", { method: "POST" }) + ); + + const receivedMessages: string[] = []; + queueChat.onNewMention( + vi.fn().mockImplementation(async (_thread, message) => { + receivedMessages.push(message.text); + }) + ); + + // Hold the lock + await state.acquireLock("slack:C123:1234.5678", 30000); + + // Enqueue 3 messages with maxSize 2 → first should be evicted + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + createTestMessage("msg-do-1", "Hey @slack-bot one") + ); + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + createTestMessage("msg-do-2", "Hey @slack-bot two") + ); + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + createTestMessage("msg-do-3", "Hey @slack-bot three") + ); + + expect(await state.queueDepth("slack:C123:1234.5678")).toBe(2); + + // Release and trigger drain + await state.forceReleaseLock("slack:C123:1234.5678"); + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + createTestMessage("msg-do-4", "Hey @slack-bot four") + ); + + // msg-do-4 processed directly, then drain gets [msg-do-2, msg-do-3] + // (msg-do-1 was evicted), processes msg-do-3 with skipped [msg-do-2] + expect(receivedMessages[0]).toBe("Hey @slack-bot four"); + expect(receivedMessages[1]).toBe("Hey @slack-bot three"); + }); + + it("should skip expired entries during drain", async () => { + const state = createMockState(); + const adapter = createMockAdapter("slack"); + + const queueChat = new Chat({ + userName: "testbot", + adapters: { slack: adapter }, + state, + logger: mockLogger, + concurrency: { + strategy: "queue", + queueEntryTtlMs: 1, // Expire almost immediately + }, + }); + + await queueChat.webhooks.slack( + new Request("http://test.com", { method: "POST" }) + ); + + const receivedMessages: string[] = []; + queueChat.onNewMention( + vi.fn().mockImplementation(async (_thread, message) => { + receivedMessages.push(message.text); + }) + ); + + // Hold the lock + await state.acquireLock("slack:C123:1234.5678", 30000); + + // Enqueue a message with 1ms TTL + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + createTestMessage("msg-exp-1", "Hey @slack-bot expired") + ); + + // Wait for TTL to expire + await new Promise((resolve) => setTimeout(resolve, 10)); + + // Release and trigger drain + await state.forceReleaseLock("slack:C123:1234.5678"); + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + createTestMessage("msg-exp-2", "Hey @slack-bot fresh") + ); + + // Only the fresh message should be processed (expired one skipped) + expect(receivedMessages).toEqual(["Hey @slack-bot fresh"]); + }); + + it("should work with onNewMessage pattern handlers", async () => { + const state = createMockState(); + const adapter = createMockAdapter("slack"); + + const queueChat = new Chat({ + userName: "testbot", + adapters: { slack: adapter }, + state, + logger: mockLogger, + concurrency: "queue", + }); + + await queueChat.webhooks.slack( + new Request("http://test.com", { method: "POST" }) + ); + + const receivedMessages: string[] = []; + queueChat.onNewMessage( + HELP_REGEX, + vi.fn().mockImplementation(async (_thread, message, context) => { + receivedMessages.push(message.text); + if (context) { + for (const s of context.skipped) { + receivedMessages.push(`skipped:${s.text}`); + } + } + }) + ); + + // Hold the lock + await state.acquireLock("slack:C123:1234.5678", 30000); + + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + createTestMessage("msg-pat-1", "!help first") + ); + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + createTestMessage("msg-pat-2", "!help second") + ); + + // Release and trigger drain + await state.forceReleaseLock("slack:C123:1234.5678"); + await queueChat.handleIncomingMessage( + adapter, + "slack:C123:1234.5678", + createTestMessage("msg-pat-3", "!help third") + ); + + // Direct message processed, then drain with skipped context + expect(receivedMessages[0]).toBe("!help third"); + expect(receivedMessages[1]).toBe("!help second"); + expect(receivedMessages[2]).toBe("skipped:!help first"); + }); + }); + describe("concurrency: debounce", () => { it("should debounce the first message and process after delay", async () => { vi.useFakeTimers(); diff --git a/packages/state-ioredis/src/index.test.ts b/packages/state-ioredis/src/index.test.ts index 024ca6f2..d2ac6734 100644 --- a/packages/state-ioredis/src/index.test.ts +++ b/packages/state-ioredis/src/index.test.ts @@ -42,6 +42,33 @@ describe("IoRedisStateAdapter", () => { adapter.getClient().disconnect(); }); + it("should have enqueue method", () => { + const adapter = createIoRedisState({ + url: "redis://localhost:6379", + logger: mockLogger, + }); + expect(typeof adapter.enqueue).toBe("function"); + adapter.getClient().disconnect(); + }); + + it("should have dequeue method", () => { + const adapter = createIoRedisState({ + url: "redis://localhost:6379", + logger: mockLogger, + }); + expect(typeof adapter.dequeue).toBe("function"); + adapter.getClient().disconnect(); + }); + + it("should have queueDepth method", () => { + const adapter = createIoRedisState({ + url: "redis://localhost:6379", + logger: mockLogger, + }); + expect(typeof adapter.queueDepth).toBe("function"); + adapter.getClient().disconnect(); + }); + // Note: Integration tests with a real Redis instance would go here // but require a running Redis server, so they're skipped by default diff --git a/packages/state-memory/src/index.test.ts b/packages/state-memory/src/index.test.ts index b7563d7a..f98bbd4b 100644 --- a/packages/state-memory/src/index.test.ts +++ b/packages/state-memory/src/index.test.ts @@ -213,6 +213,179 @@ describe("MemoryStateAdapter", () => { }); }); + describe("enqueue / dequeue / queueDepth", () => { + it("should enqueue and dequeue a single entry", async () => { + const entry = { + message: { id: "m1", text: "hello" }, + enqueuedAt: Date.now(), + expiresAt: Date.now() + 90000, + }; + const depth = await adapter.enqueue("thread1", entry as never, 10); + expect(depth).toBe(1); + + const result = await adapter.dequeue("thread1"); + expect(result).toEqual(entry); + }); + + it("should return null when dequeuing from empty queue", async () => { + const result = await adapter.dequeue("thread1"); + expect(result).toBeNull(); + }); + + it("should return null when dequeuing from nonexistent thread", async () => { + const result = await adapter.dequeue("nonexistent"); + expect(result).toBeNull(); + }); + + it("should return 0 depth for empty queue", async () => { + const depth = await adapter.queueDepth("thread1"); + expect(depth).toBe(0); + }); + + it("should dequeue in FIFO order", async () => { + const entry1 = { + message: { id: "m1" }, + enqueuedAt: 1000, + expiresAt: Date.now() + 90000, + }; + const entry2 = { + message: { id: "m2" }, + enqueuedAt: 2000, + expiresAt: Date.now() + 90000, + }; + const entry3 = { + message: { id: "m3" }, + enqueuedAt: 3000, + expiresAt: Date.now() + 90000, + }; + + await adapter.enqueue("thread1", entry1 as never, 10); + await adapter.enqueue("thread1", entry2 as never, 10); + await adapter.enqueue("thread1", entry3 as never, 10); + + expect(await adapter.queueDepth("thread1")).toBe(3); + + const r1 = await adapter.dequeue("thread1"); + expect(r1?.message).toEqual({ id: "m1" }); + + const r2 = await adapter.dequeue("thread1"); + expect(r2?.message).toEqual({ id: "m2" }); + + const r3 = await adapter.dequeue("thread1"); + expect(r3?.message).toEqual({ id: "m3" }); + + expect(await adapter.dequeue("thread1")).toBeNull(); + expect(await adapter.queueDepth("thread1")).toBe(0); + }); + + it("should trim to maxSize keeping newest entries", async () => { + for (let i = 1; i <= 5; i++) { + await adapter.enqueue( + "thread1", + { + message: { id: `m${i}` }, + enqueuedAt: i * 1000, + expiresAt: Date.now() + 90000, + } as never, + 3 + ); + } + + expect(await adapter.queueDepth("thread1")).toBe(3); + + const r1 = await adapter.dequeue("thread1"); + expect(r1?.message).toEqual({ id: "m3" }); + + const r2 = await adapter.dequeue("thread1"); + expect(r2?.message).toEqual({ id: "m4" }); + + const r3 = await adapter.dequeue("thread1"); + expect(r3?.message).toEqual({ id: "m5" }); + }); + + it("should handle maxSize of 1 (debounce behavior)", async () => { + await adapter.enqueue( + "thread1", + { + message: { id: "m1" }, + enqueuedAt: 1000, + expiresAt: Date.now() + 90000, + } as never, + 1 + ); + await adapter.enqueue( + "thread1", + { + message: { id: "m2" }, + enqueuedAt: 2000, + expiresAt: Date.now() + 90000, + } as never, + 1 + ); + await adapter.enqueue( + "thread1", + { + message: { id: "m3" }, + enqueuedAt: 3000, + expiresAt: Date.now() + 90000, + } as never, + 1 + ); + + expect(await adapter.queueDepth("thread1")).toBe(1); + const result = await adapter.dequeue("thread1"); + expect(result?.message).toEqual({ id: "m3" }); + }); + + it("should keep queues isolated by thread", async () => { + await adapter.enqueue( + "thread-a", + { + message: { id: "a1" }, + enqueuedAt: 1000, + expiresAt: Date.now() + 90000, + } as never, + 10 + ); + await adapter.enqueue( + "thread-b", + { + message: { id: "b1" }, + enqueuedAt: 1000, + expiresAt: Date.now() + 90000, + } as never, + 10 + ); + + expect(await adapter.queueDepth("thread-a")).toBe(1); + expect(await adapter.queueDepth("thread-b")).toBe(1); + + const ra = await adapter.dequeue("thread-a"); + expect(ra?.message).toEqual({ id: "a1" }); + + const rb = await adapter.dequeue("thread-b"); + expect(rb?.message).toEqual({ id: "b1" }); + }); + + it("should clear queues on disconnect", async () => { + await adapter.enqueue( + "thread1", + { + message: { id: "m1" }, + enqueuedAt: 1000, + expiresAt: Date.now() + 90000, + } as never, + 10 + ); + + await adapter.disconnect(); + await adapter.connect(); + + expect(await adapter.queueDepth("thread1")).toBe(0); + expect(await adapter.dequeue("thread1")).toBeNull(); + }); + }); + describe("connection", () => { it("should throw when not connected", async () => { const newAdapter = createMemoryState(); diff --git a/packages/state-pg/src/index.test.ts b/packages/state-pg/src/index.test.ts index cb8c9b7d..bfcf414b 100644 --- a/packages/state-pg/src/index.test.ts +++ b/packages/state-pg/src/index.test.ts @@ -493,6 +493,100 @@ describe("PostgresStateAdapter", () => { }); }); + describe("enqueue / dequeue / queueDepth", () => { + it("should call INSERT for enqueue", async () => { + queryRows = [{ depth: "1" }]; + const entry = { + message: { id: "m1", text: "hello" }, + enqueuedAt: Date.now(), + expiresAt: Date.now() + 90000, + }; + await adapter.enqueue("thread1", entry as never, 10); + + const calls = (pool.query as ReturnType).mock.calls; + const insertCall = calls.find( + (c: unknown[]) => + typeof c[0] === "string" && + c[0].includes("INSERT INTO chat_state_queues") + ); + expect(insertCall).toBeTruthy(); + expect(insertCall[1]).toContain("chat-sdk"); + expect(insertCall[1]).toContain("thread1"); + }); + + it("should trim overflow when maxSize is specified", async () => { + queryRows = [{ depth: "1" }]; + const entry = { + message: { id: "m1" }, + enqueuedAt: Date.now(), + expiresAt: Date.now() + 90000, + }; + await adapter.enqueue("thread1", entry as never, 5); + + const calls = (pool.query as ReturnType).mock.calls; + const deleteCall = calls.find( + (c: unknown[]) => + typeof c[0] === "string" && + c[0].includes("DELETE FROM chat_state_queues") + ); + expect(deleteCall).toBeTruthy(); + }); + + it("should return depth from enqueue", async () => { + queryRows = [{ depth: "3" }]; + const entry = { + message: { id: "m1" }, + enqueuedAt: Date.now(), + expiresAt: Date.now() + 90000, + }; + const depth = await adapter.enqueue("thread1", entry as never, 10); + expect(depth).toBe(3); + }); + + it("should return parsed entry from dequeue", async () => { + const entry = { + message: { id: "m1", text: "hello" }, + enqueuedAt: 1000, + expiresAt: 91000, + }; + queryRows = [{ value: JSON.stringify(entry) }]; + const result = await adapter.dequeue("thread1"); + expect(result).toEqual(entry); + }); + + it("should return null from dequeue when queue is empty", async () => { + queryRows = []; + const result = await adapter.dequeue("thread1"); + expect(result).toBeNull(); + }); + + it("should call atomic DELETE-RETURNING for dequeue", async () => { + queryRows = []; + await adapter.dequeue("thread1"); + + const calls = (pool.query as ReturnType).mock.calls; + const deleteCall = calls.find( + (c: unknown[]) => + typeof c[0] === "string" && + c[0].includes("DELETE FROM chat_state_queues") && + c[0].includes("RETURNING value") + ); + expect(deleteCall).toBeTruthy(); + }); + + it("should return depth from queueDepth", async () => { + queryRows = [{ depth: "5" }]; + const depth = await adapter.queueDepth("thread1"); + expect(depth).toBe(5); + }); + + it("should return 0 depth when no rows exist", async () => { + queryRows = [{ depth: "0" }]; + const depth = await adapter.queueDepth("thread1"); + expect(depth).toBe(0); + }); + }); + describe("getClient", () => { it("should return the underlying client", () => { const client = adapter.getClient(); diff --git a/packages/state-redis/src/index.test.ts b/packages/state-redis/src/index.test.ts index 135fe022..14df0551 100644 --- a/packages/state-redis/src/index.test.ts +++ b/packages/state-redis/src/index.test.ts @@ -38,6 +38,30 @@ describe("RedisStateAdapter", () => { expect(typeof adapter.getList).toBe("function"); }); + it("should have enqueue method", () => { + const adapter = createRedisState({ + url: "redis://localhost:6379", + logger: mockLogger, + }); + expect(typeof adapter.enqueue).toBe("function"); + }); + + it("should have dequeue method", () => { + const adapter = createRedisState({ + url: "redis://localhost:6379", + logger: mockLogger, + }); + expect(typeof adapter.dequeue).toBe("function"); + }); + + it("should have queueDepth method", () => { + const adapter = createRedisState({ + url: "redis://localhost:6379", + logger: mockLogger, + }); + expect(typeof adapter.queueDepth).toBe("function"); + }); + // Note: Integration tests with a real Redis instance would go here // but require a running Redis server, so they're skipped by default From 7e90e02e37c62b933202883cc4578127fac36eb6 Mon Sep 17 00:00:00 2001 From: Malte Ubl Date: Fri, 20 Mar 2026 16:47:02 -0700 Subject: [PATCH 3/9] Address feedback --- packages/state-pg/src/index.test.ts | 67 +++++++++++++++++++++++++++++ packages/state-pg/src/index.ts | 29 ++++++++++--- 2 files changed, 90 insertions(+), 6 deletions(-) diff --git a/packages/state-pg/src/index.test.ts b/packages/state-pg/src/index.test.ts index bfcf414b..7809e249 100644 --- a/packages/state-pg/src/index.test.ts +++ b/packages/state-pg/src/index.test.ts @@ -494,6 +494,73 @@ describe("PostgresStateAdapter", () => { }); describe("enqueue / dequeue / queueDepth", () => { + it("should purge expired entries before enqueue", async () => { + queryRows = [{ depth: "1" }]; + const entry = { + message: { id: "m1" }, + enqueuedAt: Date.now(), + expiresAt: Date.now() + 90000, + }; + await adapter.enqueue("thread1", entry as never, 10); + + const calls = (pool.query as ReturnType).mock.calls; + const purgeCall = calls.find( + (c: unknown[]) => + typeof c[0] === "string" && + c[0].includes("DELETE FROM chat_state_queues") && + c[0].includes("expires_at <= now()") + ); + expect(purgeCall).toBeTruthy(); + }); + + it("should purge expired entries before dequeue", async () => { + queryRows = []; + await adapter.dequeue("thread1"); + + const calls = (pool.query as ReturnType).mock.calls; + const purgeCall = calls.find( + (c: unknown[]) => + typeof c[0] === "string" && + c[0].includes("DELETE FROM chat_state_queues") && + c[0].includes("expires_at <= now()") + ); + expect(purgeCall).toBeTruthy(); + }); + + it("should only count non-expired entries in queueDepth", async () => { + queryRows = [{ depth: "2" }]; + await adapter.queueDepth("thread1"); + + const calls = (pool.query as ReturnType).mock.calls; + const countCall = calls.find( + (c: unknown[]) => + typeof c[0] === "string" && + c[0].includes("count(*)") && + c[0].includes("expires_at > now()") + ); + expect(countCall).toBeTruthy(); + }); + + it("should only count non-expired entries in enqueue depth", async () => { + queryRows = [{ depth: "1" }]; + const entry = { + message: { id: "m1" }, + enqueuedAt: Date.now(), + expiresAt: Date.now() + 90000, + }; + await adapter.enqueue("thread1", entry as never, 10); + + const calls = (pool.query as ReturnType).mock.calls; + const countCall = calls.find( + (c: unknown[]) => + typeof c[0] === "string" && + c[0].includes("count(*)") && + c[0].includes("chat_state_queues") && + c[0].includes("expires_at > now()") + ); + expect(countCall).toBeTruthy(); + }); + it("should call INSERT for enqueue", async () => { queryRows = [{ depth: "1" }]; const entry = { diff --git a/packages/state-pg/src/index.ts b/packages/state-pg/src/index.ts index b3f2ebce..a56ee458 100644 --- a/packages/state-pg/src/index.ts +++ b/packages/state-pg/src/index.ts @@ -335,6 +335,13 @@ export class PostgresStateAdapter implements StateAdapter { const serialized = JSON.stringify(entry); const expiresAt = new Date(entry.expiresAt); + // Purge expired entries first to avoid phantom queue pressure + await this.pool.query( + `DELETE FROM chat_state_queues + WHERE key_prefix = $1 AND thread_id = $2 AND expires_at <= now()`, + [this.keyPrefix, threadId] + ); + // Insert the new entry await this.pool.query( `INSERT INTO chat_state_queues (key_prefix, thread_id, value, expires_at) @@ -342,17 +349,19 @@ export class PostgresStateAdapter implements StateAdapter { [this.keyPrefix, threadId, serialized, expiresAt] ); - // Trim overflow (keep newest maxSize entries) + // Trim overflow (keep newest maxSize non-expired entries) if (maxSize > 0) { await this.pool.query( `DELETE FROM chat_state_queues WHERE key_prefix = $1 AND thread_id = $2 AND seq IN ( SELECT seq FROM chat_state_queues WHERE key_prefix = $1 AND thread_id = $2 + AND expires_at > now() ORDER BY seq ASC OFFSET 0 LIMIT GREATEST( - (SELECT count(*) FROM chat_state_queues WHERE key_prefix = $1 AND thread_id = $2) - $3, + (SELECT count(*) FROM chat_state_queues + WHERE key_prefix = $1 AND thread_id = $2 AND expires_at > now()) - $3, 0 ) )`, @@ -360,10 +369,10 @@ export class PostgresStateAdapter implements StateAdapter { ); } - // Return current depth + // Return current non-expired depth const result = await this.pool.query( `SELECT count(*) as depth FROM chat_state_queues - WHERE key_prefix = $1 AND thread_id = $2`, + WHERE key_prefix = $1 AND thread_id = $2 AND expires_at > now()`, [this.keyPrefix, threadId] ); @@ -373,13 +382,21 @@ export class PostgresStateAdapter implements StateAdapter { async dequeue(threadId: string): Promise { this.ensureConnected(); - // Atomically select + delete the oldest entry + // Purge expired entries first + await this.pool.query( + `DELETE FROM chat_state_queues + WHERE key_prefix = $1 AND thread_id = $2 AND expires_at <= now()`, + [this.keyPrefix, threadId] + ); + + // Atomically select + delete the oldest non-expired entry const result = await this.pool.query( `DELETE FROM chat_state_queues WHERE key_prefix = $1 AND thread_id = $2 AND seq = ( SELECT seq FROM chat_state_queues WHERE key_prefix = $1 AND thread_id = $2 + AND expires_at > now() ORDER BY seq ASC LIMIT 1 ) @@ -399,7 +416,7 @@ export class PostgresStateAdapter implements StateAdapter { const result = await this.pool.query( `SELECT count(*) as depth FROM chat_state_queues - WHERE key_prefix = $1 AND thread_id = $2`, + WHERE key_prefix = $1 AND thread_id = $2 AND expires_at > now()`, [this.keyPrefix, threadId] ); From d08a84527ef5d59443d8c552ffee7cec7044259b Mon Sep 17 00:00:00 2001 From: Malte Ubl Date: Fri, 20 Mar 2026 19:00:18 -0700 Subject: [PATCH 4/9] Support a channel locking strategy, make it default for WhatsApp and Telegram --- packages/adapter-telegram/src/index.ts | 1 + packages/adapter-whatsapp/src/index.ts | 1 + packages/chat/src/chat.test.ts | 153 +++++++++++++++++++++++++ packages/chat/src/chat.ts | 109 ++++++++++++++---- packages/chat/src/index.ts | 2 + packages/chat/src/types.ts | 33 ++++++ 6 files changed, 277 insertions(+), 22 deletions(-) diff --git a/packages/adapter-telegram/src/index.ts b/packages/adapter-telegram/src/index.ts index a4028457..4cc46be8 100644 --- a/packages/adapter-telegram/src/index.ts +++ b/packages/adapter-telegram/src/index.ts @@ -189,6 +189,7 @@ export class TelegramAdapter implements Adapter { readonly name = "telegram"; + readonly lockScope = "channel" as const; readonly persistMessageHistory = true; private readonly botToken: string; diff --git a/packages/adapter-whatsapp/src/index.ts b/packages/adapter-whatsapp/src/index.ts index f6121f32..ee5de610 100644 --- a/packages/adapter-whatsapp/src/index.ts +++ b/packages/adapter-whatsapp/src/index.ts @@ -116,6 +116,7 @@ export class WhatsAppAdapter implements Adapter { readonly name = "whatsapp"; + readonly lockScope = "channel" as const; readonly persistMessageHistory = true; readonly userName: string; diff --git a/packages/chat/src/chat.test.ts b/packages/chat/src/chat.test.ts index e17adbe0..2930d50b 100644 --- a/packages/chat/src/chat.test.ts +++ b/packages/chat/src/chat.test.ts @@ -2854,6 +2854,159 @@ describe("Chat", () => { }); }); + describe("lockScope", () => { + it("should use threadId as lock key with default (thread) scope", async () => { + const state = createMockState(); + const adapter = createMockAdapter("slack"); + + const chat2 = new Chat({ + userName: "testbot", + adapters: { slack: adapter }, + state, + logger: mockLogger, + }); + + await chat2.webhooks.slack( + new Request("http://test.com", { method: "POST" }) + ); + + chat2.onNewMention(vi.fn().mockResolvedValue(undefined)); + + const msg = createTestMessage("msg-ls-1", "Hey @slack-bot"); + await chat2.handleIncomingMessage(adapter, "slack:C123:1234.5678", msg); + + // Lock should have been acquired on the full threadId + expect(state.acquireLock).toHaveBeenCalledWith( + "slack:C123:1234.5678", + expect.any(Number) + ); + }); + + it("should use channelId as lock key with channel scope on adapter", async () => { + const state = createMockState(); + const adapter = createMockAdapter("telegram"); + (adapter as { lockScope: string }).lockScope = "channel"; + + const chat2 = new Chat({ + userName: "testbot", + adapters: { telegram: adapter }, + state, + logger: mockLogger, + }); + + await chat2.webhooks.telegram( + new Request("http://test.com", { method: "POST" }) + ); + + chat2.onNewMention(vi.fn().mockResolvedValue(undefined)); + + const msg = createTestMessage("msg-ls-2", "Hey @telegram-bot"); + await chat2.handleIncomingMessage(adapter, "telegram:C123:topic456", msg); + + // channelIdFromThreadId returns first two parts: "telegram:C123" + expect(state.acquireLock).toHaveBeenCalledWith( + "telegram:C123", + expect.any(Number) + ); + }); + + it("should use channelId as lock key with channel scope on config", async () => { + const state = createMockState(); + const adapter = createMockAdapter("slack"); + + const chat2 = new Chat({ + userName: "testbot", + adapters: { slack: adapter }, + state, + logger: mockLogger, + lockScope: "channel", + }); + + await chat2.webhooks.slack( + new Request("http://test.com", { method: "POST" }) + ); + + chat2.onNewMention(vi.fn().mockResolvedValue(undefined)); + + const msg = createTestMessage("msg-ls-3", "Hey @slack-bot"); + await chat2.handleIncomingMessage(adapter, "slack:C123:1234.5678", msg); + + // channelIdFromThreadId returns "slack:C123" + expect(state.acquireLock).toHaveBeenCalledWith( + "slack:C123", + expect.any(Number) + ); + }); + + it("should support async lockScope resolver function", async () => { + const state = createMockState(); + const adapter = createMockAdapter("telegram"); + + const chat2 = new Chat({ + userName: "testbot", + adapters: { telegram: adapter }, + state, + logger: mockLogger, + lockScope: async ({ isDM }) => { + // Simulate async lookup (e.g., checking channel config in DB) + return isDM ? "thread" : "channel"; + }, + }); + + await chat2.webhooks.telegram( + new Request("http://test.com", { method: "POST" }) + ); + + chat2.onNewMention(vi.fn().mockResolvedValue(undefined)); + + // Non-DM: should use channel scope + const msg = createTestMessage("msg-ls-4", "Hey @telegram-bot"); + await chat2.handleIncomingMessage(adapter, "telegram:C123:topic456", msg); + + expect(state.acquireLock).toHaveBeenCalledWith( + "telegram:C123", + expect.any(Number) + ); + }); + + it("should queue on channel-scoped lock key", async () => { + const state = createMockState(); + const adapter = createMockAdapter("telegram"); + (adapter as { lockScope: string }).lockScope = "channel"; + + const chat2 = new Chat({ + userName: "testbot", + adapters: { telegram: adapter }, + state, + logger: mockLogger, + concurrency: "queue", + }); + + await chat2.webhooks.telegram( + new Request("http://test.com", { method: "POST" }) + ); + + chat2.onNewMention(vi.fn().mockResolvedValue(undefined)); + + // Pre-hold the channel lock to force the second message to enqueue + await state.acquireLock("telegram:C123", 30000); + + // Both messages from different topics should use the channel lock key + const msg1 = createTestMessage("msg-ls-5", "Hey @telegram-bot first"); + await chat2.handleIncomingMessage(adapter, "telegram:C123:topic1", msg1); + + const msg2 = createTestMessage("msg-ls-6", "Hey @telegram-bot second"); + await chat2.handleIncomingMessage(adapter, "telegram:C123:topic2", msg2); + + // Both should have been enqueued on the channel key (not topic keys) + const enqueueCalls = state.enqueue.mock.calls; + expect(enqueueCalls.length).toBe(2); + for (const call of enqueueCalls) { + expect(call[0]).toBe("telegram:C123"); + } + }); + }); + describe("persistMessageHistory", () => { it("should cache incoming messages when adapter has persistMessageHistory", async () => { const adapter = createMockAdapter("whatsapp"); diff --git a/packages/chat/src/chat.ts b/packages/chat/src/chat.ts index a98b3719..d6800733 100644 --- a/packages/chat/src/chat.ts +++ b/packages/chat/src/chat.ts @@ -28,6 +28,7 @@ import type { DirectMessageHandler, EmojiValue, Lock, + LockScope, Logger, LogLevel, MemberJoinedChannelEvent, @@ -202,6 +203,7 @@ export class Chat< private readonly _concurrencyConfig: Required< Omit >; + private readonly _lockScope: ChatConfig["lockScope"]; private readonly mentionHandlers: MentionHandler[] = []; private readonly directMessageHandlers: DirectMessageHandler[] = []; @@ -243,6 +245,7 @@ export class Chat< : "..."; this._dedupeTtlMs = config.dedupeTtlMs ?? DEDUPE_TTL_MS; this._onLockConflict = config.onLockConflict; + this._lockScope = config.lockScope; // Parse concurrency config — new `concurrency` option takes precedence over deprecated `onLockConflict` const concurrency = config.concurrency; @@ -1558,6 +1561,33 @@ export class Chat< ); } + /** + * Resolve the lock key for a message based on lock scope. + * With 'thread' scope, returns threadId. With 'channel' scope, + * returns channelId (derived via adapter.channelIdFromThreadId). + */ + private async getLockKey( + adapter: Adapter, + threadId: string + ): Promise { + const channelId = adapter.channelIdFromThreadId(threadId); + + let scope: LockScope; + if (typeof this._lockScope === "function") { + const isDM = adapter.isDM?.(threadId) ?? false; + scope = await this._lockScope({ + adapter, + channelId, + isDM, + threadId, + }); + } else { + scope = this._lockScope ?? adapter.lockScope ?? "thread"; + } + + return scope === "channel" ? channelId : threadId; + } + /** * Handle an incoming message from an adapter. * This is called by adapters when they receive a webhook. @@ -1622,6 +1652,9 @@ export class Chat< await Promise.all(appends); } + // Resolve lock key based on lock scope (thread vs channel) + const lockKey = await this.getLockKey(adapter, threadId); + // Route to the appropriate concurrency strategy const strategy = this._concurrencyStrategy; @@ -1631,12 +1664,18 @@ export class Chat< } if (strategy === "queue" || strategy === "debounce") { - await this.handleQueueOrDebounce(adapter, threadId, message, strategy); + await this.handleQueueOrDebounce( + adapter, + threadId, + lockKey, + message, + strategy + ); return; } // Default: 'drop' strategy (original behavior) - await this.handleDrop(adapter, threadId, message); + await this.handleDrop(adapter, threadId, lockKey, message); } /** @@ -1645,10 +1684,11 @@ export class Chat< private async handleDrop( adapter: Adapter, threadId: string, + lockKey: string, message: Message ): Promise { let lock = await this._stateAdapter.acquireLock( - threadId, + lockKey, DEFAULT_LOCK_TTL_MS ); if (!lock) { @@ -1658,28 +1698,38 @@ export class Chat< ? await this._onLockConflict(threadId, message) : (this._onLockConflict ?? "drop"); if (resolution === "force") { - this.logger.info("Force-releasing lock on thread", { threadId }); - await this._stateAdapter.forceReleaseLock(threadId); - lock = await this._stateAdapter.acquireLock( + this.logger.info("Force-releasing lock on thread", { threadId, + lockKey, + }); + await this._stateAdapter.forceReleaseLock(lockKey); + lock = await this._stateAdapter.acquireLock( + lockKey, DEFAULT_LOCK_TTL_MS ); } if (!lock) { - this.logger.warn("Could not acquire lock on thread", { threadId }); + this.logger.warn("Could not acquire lock on thread", { + threadId, + lockKey, + }); throw new LockError( `Could not acquire lock on thread ${threadId}. Another instance may be processing.` ); } } - this.logger.debug("Lock acquired", { threadId, token: lock.token }); + this.logger.debug("Lock acquired", { + threadId, + lockKey, + token: lock.token, + }); try { await this.dispatchToHandlers(adapter, threadId, message); } finally { await this._stateAdapter.releaseLock(lock); - this.logger.debug("Lock released", { threadId }); + this.logger.debug("Lock released", { threadId, lockKey }); } } @@ -1689,6 +1739,7 @@ export class Chat< private async handleQueueOrDebounce( adapter: Adapter, threadId: string, + lockKey: string, message: Message, strategy: "queue" | "debounce" ): Promise { @@ -1697,14 +1748,14 @@ export class Chat< // Try to acquire lock const lock = await this._stateAdapter.acquireLock( - threadId, + lockKey, DEFAULT_LOCK_TTL_MS ); if (!lock) { // Lock is busy — enqueue this message for later processing const effectiveMaxSize = strategy === "debounce" ? 1 : maxQueueSize; - const depth = await this._stateAdapter.queueDepth(threadId); + const depth = await this._stateAdapter.queueDepth(lockKey); if ( depth >= effectiveMaxSize && @@ -1713,6 +1764,7 @@ export class Chat< ) { this.logger.info("message-dropped", { threadId, + lockKey, messageId: message.id, reason: "queue-full", }); @@ -1720,7 +1772,7 @@ export class Chat< } await this._stateAdapter.enqueue( - threadId, + lockKey, { message, enqueuedAt: Date.now(), @@ -1733,6 +1785,7 @@ export class Chat< strategy === "debounce" ? "message-debounce-reset" : "message-queued", { threadId, + lockKey, messageId: message.id, queueDepth: Math.min(depth + 1, effectiveMaxSize), } @@ -1741,13 +1794,17 @@ export class Chat< } // We hold the lock - this.logger.debug("Lock acquired", { threadId, token: lock.token }); + this.logger.debug("Lock acquired", { + threadId, + lockKey, + token: lock.token, + }); try { if (strategy === "debounce") { // Debounce: enqueue our own message and enter the debounce loop await this._stateAdapter.enqueue( - threadId, + lockKey, { message, enqueuedAt: Date.now(), @@ -1757,18 +1814,19 @@ export class Chat< ); this.logger.info("message-debouncing", { threadId, + lockKey, messageId: message.id, debounceMs, }); - await this.debounceLoop(lock, adapter, threadId); + await this.debounceLoop(lock, adapter, threadId, lockKey); } else { // Queue: process our message immediately, then drain any queued messages await this.dispatchToHandlers(adapter, threadId, message); - await this.drainQueue(lock, adapter, threadId); + await this.drainQueue(lock, adapter, threadId, lockKey); } } finally { await this._stateAdapter.releaseLock(lock); - this.logger.debug("Lock released", { threadId }); + this.logger.debug("Lock released", { threadId, lockKey }); } } @@ -1779,7 +1837,8 @@ export class Chat< private async debounceLoop( lock: Lock, adapter: Adapter, - threadId: string + threadId: string, + lockKey: string ): Promise { const { debounceMs } = this._concurrencyConfig; @@ -1788,7 +1847,7 @@ export class Chat< await this._stateAdapter.extendLock(lock, DEFAULT_LOCK_TTL_MS); // Atomically take the pending message - const entry = await this._stateAdapter.dequeue(threadId); + const entry = await this._stateAdapter.dequeue(lockKey); if (!entry) { break; } @@ -1796,17 +1855,19 @@ export class Chat< if (Date.now() > entry.expiresAt) { this.logger.info("message-expired", { threadId, + lockKey, messageId: entry.message.id, }); continue; } // Check if anything new arrived during sleep - const depth = await this._stateAdapter.queueDepth(threadId); + const depth = await this._stateAdapter.queueDepth(lockKey); if (depth > 0) { // Newer message superseded this one — loop again this.logger.info("message-superseded", { threadId, + lockKey, droppedId: entry.message.id, }); continue; @@ -1815,6 +1876,7 @@ export class Chat< // Nothing new — this is the final message in the burst this.logger.info("message-dequeued", { threadId, + lockKey, messageId: entry.message.id, }); await this.dispatchToHandlers(adapter, threadId, entry.message); @@ -1829,13 +1891,14 @@ export class Chat< private async drainQueue( lock: Lock, adapter: Adapter, - threadId: string + threadId: string, + lockKey: string ): Promise { while (true) { // Collect all pending messages const pending: QueueEntry[] = []; while (true) { - const entry = await this._stateAdapter.dequeue(threadId); + const entry = await this._stateAdapter.dequeue(lockKey); if (!entry) { break; } @@ -1844,6 +1907,7 @@ export class Chat< } else { this.logger.info("message-expired", { threadId, + lockKey, messageId: entry.message.id, }); } @@ -1865,6 +1929,7 @@ export class Chat< this.logger.info("message-dequeued", { threadId, + lockKey, messageId: latest.message.id, skippedCount: skipped.length, totalSinceLastHandler: pending.length, diff --git a/packages/chat/src/index.ts b/packages/chat/src/index.ts index fe23caf4..03d6d167 100644 --- a/packages/chat/src/index.ts +++ b/packages/chat/src/index.ts @@ -290,6 +290,8 @@ export type { ListThreadsOptions, ListThreadsResult, Lock, + LockScope, + LockScopeContext, Logger, LogLevel, MarkdownTextChunk, diff --git a/packages/chat/src/types.ts b/packages/chat/src/types.ts index 3d275a69..0cc20cba 100644 --- a/packages/chat/src/types.ts +++ b/packages/chat/src/types.ts @@ -65,6 +65,19 @@ export interface ChatConfig< * wait until some real text has been streamed before creating the message. */ fallbackStreamingPlaceholderText?: string | null; + /** + * Lock scope determines which messages contend for the same lock. + * + * - `'thread'`: lock per threadId (default for most adapters) + * - `'channel'`: lock per channelId (default for WhatsApp, Telegram) + * - function: resolve scope dynamically per message (async supported) + * + * When not set, falls back to the adapter's `lockScope` property, + * then to `'thread'`. + */ + lockScope?: + | LockScope + | ((context: LockScopeContext) => LockScope | Promise); /** * Logger instance or log level. * Pass "silent" to disable all logging. @@ -265,6 +278,15 @@ export interface Adapter { channelId: string, options?: ListThreadsOptions ): Promise>; + + /** + * Default lock scope for this adapter. + * - `'thread'` (default): lock per threadId + * - `'channel'`: lock per channelId (for channel-based platforms like WhatsApp, Telegram) + * + * Can be overridden by `ChatConfig.lockScope`. + */ + readonly lockScope?: LockScope; /** Unique name for this adapter (e.g., "slack", "teams") */ readonly name: string; @@ -577,6 +599,17 @@ export interface ChatInstance { // Concurrency // ============================================================================= +/** Lock scope determines which messages contend for the same lock. */ +export type LockScope = "thread" | "channel"; + +/** Context provided to the lockScope resolver function. */ +export interface LockScopeContext { + adapter: Adapter; + channelId: string; + isDM: boolean; + threadId: string; +} + /** Concurrency strategy for overlapping messages on the same thread. */ export type ConcurrencyStrategy = "drop" | "queue" | "debounce" | "concurrent"; From 31ecdd404e565e94e1531672e6f8ca1328056235 Mon Sep 17 00:00:00 2001 From: Arif Kobel <102538661+ArifKobel@users.noreply.github.com> Date: Sun, 22 Mar 2026 00:14:00 +0100 Subject: [PATCH 5/9] =?UTF-8?q?docs:=20fix=20typo=20"Committment"=20?= =?UTF-8?q?=E2=86=92=20"Commitment"=20(#274)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Arif Kobel --- apps/docs/content/docs/contributing/building.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/docs/content/docs/contributing/building.mdx b/apps/docs/content/docs/contributing/building.mdx index d38ccc53..a731590f 100644 --- a/apps/docs/content/docs/contributing/building.mdx +++ b/apps/docs/content/docs/contributing/building.mdx @@ -31,7 +31,7 @@ Chat SDK ships with Vercel-maintained adapters for Slack, Teams, Google Chat, Di #### Qualifications for vendor official tier -- Committment for continued maintenance of the adapter. +- Commitment for continued maintenance of the adapter. - GitHub hosting in official vendor-owned org. - Documentation of the adapter in primary vendor docs. - Announcement of the adapter in blog post or changelog and social media. From 838e46a4f054f2fcca95823c16824c8a595f04b6 Mon Sep 17 00:00:00 2001 From: Malte Ubl Date: Mon, 23 Mar 2026 07:21:43 -0700 Subject: [PATCH 6/9] Add webhook verification to GChat (#287) - Issues a warning if required env vars are not present (also for telegram) - Makes telegram use a time-safe verifier --- .changeset/few-cows-search.md | 5 + packages/adapter-gchat/README.md | 50 +++ packages/adapter-gchat/src/index.test.ts | 288 ++++++++++++++++++ packages/adapter-gchat/src/index.ts | 100 +++++- .../src/workspace-events.test.ts | 34 --- .../adapter-gchat/src/workspace-events.ts | 29 -- packages/adapter-telegram/src/index.ts | 20 +- 7 files changed, 461 insertions(+), 65 deletions(-) create mode 100644 .changeset/few-cows-search.md diff --git a/.changeset/few-cows-search.md b/.changeset/few-cows-search.md new file mode 100644 index 00000000..c7bbde31 --- /dev/null +++ b/.changeset/few-cows-search.md @@ -0,0 +1,5 @@ +--- +"@chat-adapter/gchat": patch +--- + +Introduce optional gchat signature verification diff --git a/packages/adapter-gchat/README.md b/packages/adapter-gchat/README.md index 6b00d1a7..e05be535 100644 --- a/packages/adapter-gchat/README.md +++ b/packages/adapter-gchat/README.md @@ -161,6 +161,8 @@ All options are auto-detected from environment variables when not provided. | `credentials` | No* | Service account credentials JSON. Auto-detected from `GOOGLE_CHAT_CREDENTIALS` | | `useApplicationDefaultCredentials` | No | Use Application Default Credentials. Auto-detected from `GOOGLE_CHAT_USE_ADC` | | `pubsubTopic` | No | Pub/Sub topic for Workspace Events. Auto-detected from `GOOGLE_CHAT_PUBSUB_TOPIC` | +| `pubsubAudience` | No | Expected JWT audience for Pub/Sub webhook verification. Auto-detected from `GOOGLE_CHAT_PUBSUB_AUDIENCE` | +| `googleChatProjectNumber` | No | GCP project number for direct webhook JWT verification. Auto-detected from `GOOGLE_CHAT_PROJECT_NUMBER` | | `impersonateUser` | No | User email for domain-wide delegation. Auto-detected from `GOOGLE_CHAT_IMPERSONATE_USER` | | `auth` | No | Custom auth object (advanced) | | `logger` | No | Logger instance (defaults to `ConsoleLogger("info")`) | @@ -175,8 +177,49 @@ GOOGLE_CHAT_CREDENTIALS={"type":"service_account",...} # Optional: for receiving all messages GOOGLE_CHAT_PUBSUB_TOPIC=projects/your-project/topics/chat-events GOOGLE_CHAT_IMPERSONATE_USER=admin@yourdomain.com + +# Optional: webhook verification (recommended for production) +GOOGLE_CHAT_PROJECT_NUMBER=123456789 # For direct webhook JWT verification +GOOGLE_CHAT_PUBSUB_AUDIENCE=https://your-domain.com/api/webhooks/gchat # For Pub/Sub JWT verification +``` + +## Webhook verification + +The adapter supports JWT verification for both webhook types. When configured, the adapter validates the `Authorization: Bearer ` header on incoming requests using Google's public keys. Requests with missing or invalid tokens are rejected with HTTP 401. + +Verification is opt-in — when the config options are not set, webhooks are accepted without signature checks (for backward compatibility and development). + +### Direct webhooks (Google Chat API) + +Google Chat sends a signed JWT with every webhook request. The JWT audience (`aud` claim) is your GCP project number. + +```typescript +createGoogleChatAdapter({ + googleChatProjectNumber: "123456789", +}); +``` + +Find your project number in the [GCP Console dashboard](https://console.cloud.google.com/home/dashboard) (it's different from the project ID). + +### Pub/Sub push messages + +Google Cloud Pub/Sub sends a signed OIDC JWT with push deliveries. The JWT audience is whatever you configured on the push subscription. + +```typescript +createGoogleChatAdapter({ + pubsubAudience: "https://your-domain.com/api/webhooks/gchat", +}); ``` +To enable authenticated push on your Pub/Sub subscription: + +1. Go to **Pub/Sub** then **Subscriptions** +2. Edit your push subscription +3. Enable **Authentication** +4. Select a service account with the **Service Account Token Creator** role +5. Set the **Audience** to your webhook URL +6. Use the same URL as `GOOGLE_CHAT_PUBSUB_AUDIENCE` + ## Features ### Messaging @@ -237,6 +280,13 @@ Fetching message history requires domain-wide delegation with the `impersonateUs ## Troubleshooting +### 401 Unauthorized on webhooks + +- For direct webhooks: verify `GOOGLE_CHAT_PROJECT_NUMBER` matches your GCP project number (not project ID) +- For Pub/Sub: verify `GOOGLE_CHAT_PUBSUB_AUDIENCE` matches the audience configured on your push subscription +- Check that authentication is enabled on your Pub/Sub push subscription +- Ensure the service account used for push authentication has the **Service Account Token Creator** role + ### No webhook received - Verify the App URL is correct in Google Chat configuration diff --git a/packages/adapter-gchat/src/index.test.ts b/packages/adapter-gchat/src/index.test.ts index b6e937ff..2549451c 100644 --- a/packages/adapter-gchat/src/index.test.ts +++ b/packages/adapter-gchat/src/index.test.ts @@ -1,4 +1,5 @@ import { AdapterRateLimitError } from "@chat-adapter/shared"; +import { auth } from "@googleapis/chat"; import type { ChatInstance, Lock, Logger, StateAdapter } from "chat"; import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; import { @@ -151,6 +152,8 @@ async function createInitializedAdapter(opts?: { pubsubTopic?: string; userName?: string; endpointUrl?: string; + googleChatProjectNumber?: string; + pubsubAudience?: string; }) { const adapter = createGoogleChatAdapter({ credentials: TEST_CREDENTIALS, @@ -158,6 +161,8 @@ async function createInitializedAdapter(opts?: { pubsubTopic: opts?.pubsubTopic, userName: opts?.userName, endpointUrl: opts?.endpointUrl, + googleChatProjectNumber: opts?.googleChatProjectNumber, + pubsubAudience: opts?.pubsubAudience, }); const mockState = createMockStateAdapter(); const mockChat = createMockChatInstance(mockState); @@ -2704,4 +2709,287 @@ describe("GoogleChatAdapter", () => { }); }); }); + + describe("webhook verification", () => { + let verifyIdTokenSpy: ReturnType; + + beforeEach(() => { + verifyIdTokenSpy = vi + .spyOn(auth.OAuth2.prototype, "verifyIdToken") + .mockRejectedValue(new Error("Invalid token")); + }); + + afterEach(() => { + verifyIdTokenSpy.mockRestore(); + }); + + it("should reject direct webhook without Authorization header when project number is configured", async () => { + const { adapter } = await createInitializedAdapter({ + googleChatProjectNumber: "123456789", + }); + + const event = makeMessageEvent({ messageText: "Hello" }); + const request = new Request("https://example.com/webhook", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(event), + }); + + const response = await adapter.handleWebhook(request); + expect(response.status).toBe(401); + // Should not even call verifyIdToken — no Bearer token present + expect(verifyIdTokenSpy).not.toHaveBeenCalled(); + }); + + it("should reject direct webhook with invalid Bearer token when project number is configured", async () => { + const { adapter } = await createInitializedAdapter({ + googleChatProjectNumber: "123456789", + }); + + const event = makeMessageEvent({ messageText: "Hello" }); + const request = new Request("https://example.com/webhook", { + method: "POST", + headers: { + "content-type": "application/json", + authorization: "Bearer invalid-token", + }, + body: JSON.stringify(event), + }); + + const response = await adapter.handleWebhook(request); + expect(response.status).toBe(401); + expect(verifyIdTokenSpy).toHaveBeenCalledWith({ + idToken: "invalid-token", + audience: "123456789", + }); + }); + + it("should allow direct webhook with valid Bearer token when project number is configured", async () => { + verifyIdTokenSpy.mockResolvedValue({ + getPayload: () => ({ + iss: "chat@system.gserviceaccount.com", + aud: "123456789", + email: "chat@system.gserviceaccount.com", + }), + }); + + const { adapter } = await createInitializedAdapter({ + googleChatProjectNumber: "123456789", + }); + + const event = makeMessageEvent({ messageText: "Hello" }); + const request = new Request("https://example.com/webhook", { + method: "POST", + headers: { + "content-type": "application/json", + authorization: "Bearer valid-google-jwt", + }, + body: JSON.stringify(event), + }); + + const response = await adapter.handleWebhook(request); + expect(response.status).toBe(200); + expect(verifyIdTokenSpy).toHaveBeenCalledWith({ + idToken: "valid-google-jwt", + audience: "123456789", + }); + }); + + it("should allow direct webhook without verification when project number is not configured and warn once", async () => { + const { adapter } = await createInitializedAdapter(); + + const event = makeMessageEvent({ messageText: "Hello" }); + const request1 = new Request("https://example.com/webhook", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(event), + }); + const request2 = new Request("https://example.com/webhook", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(event), + }); + + const response1 = await adapter.handleWebhook(request1); + expect(response1.status).toBe(200); + expect(verifyIdTokenSpy).not.toHaveBeenCalled(); + expect(mockLogger.warn).toHaveBeenCalledWith( + expect.stringContaining("GOOGLE_CHAT_PROJECT_NUMBER") + ); + + // Second call should not warn again + (mockLogger.warn as ReturnType).mockClear(); + await adapter.handleWebhook(request2); + expect(mockLogger.warn).not.toHaveBeenCalledWith( + expect.stringContaining("GOOGLE_CHAT_PROJECT_NUMBER") + ); + }); + + it("should reject Pub/Sub webhook without Authorization header when pubsubAudience is configured", async () => { + const { adapter } = await createInitializedAdapter({ + pubsubAudience: "https://example.com/webhook/pubsub", + }); + + const pubsubMessage = makePubSubPushMessage({ + message: { + name: "spaces/ABC123/messages/msg1", + text: "Hello", + sender: { name: "users/100", displayName: "User", type: "HUMAN" }, + createTime: new Date().toISOString(), + }, + }); + + const request = new Request("https://example.com/webhook", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(pubsubMessage), + }); + + const response = await adapter.handleWebhook(request); + expect(response.status).toBe(401); + }); + + it("should allow Pub/Sub webhook without verification when pubsubAudience is not configured and warn once", async () => { + const { adapter } = await createInitializedAdapter(); + + const makePubSubRequest = () => { + const pubsubMessage = makePubSubPushMessage({ + message: { + name: "spaces/ABC123/messages/msg1", + text: "Hello", + sender: { name: "users/100", displayName: "User", type: "HUMAN" }, + createTime: new Date().toISOString(), + }, + }); + return new Request("https://example.com/webhook", { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(pubsubMessage), + }); + }; + + const response1 = await adapter.handleWebhook(makePubSubRequest()); + expect(response1.status).toBe(200); + expect(verifyIdTokenSpy).not.toHaveBeenCalled(); + expect(mockLogger.warn).toHaveBeenCalledWith( + expect.stringContaining("GOOGLE_CHAT_PUBSUB_AUDIENCE") + ); + + // Second call should not warn again + (mockLogger.warn as ReturnType).mockClear(); + await adapter.handleWebhook(makePubSubRequest()); + expect(mockLogger.warn).not.toHaveBeenCalledWith( + expect.stringContaining("GOOGLE_CHAT_PUBSUB_AUDIENCE") + ); + }); + + it("should reject Pub/Sub webhook with invalid token when pubsubAudience is configured", async () => { + const { adapter } = await createInitializedAdapter({ + pubsubAudience: "https://example.com/webhook/pubsub", + }); + + const pubsubMessage = makePubSubPushMessage({ + message: { + name: "spaces/ABC123/messages/msg1", + text: "Hello", + sender: { name: "users/100", displayName: "User", type: "HUMAN" }, + createTime: new Date().toISOString(), + }, + }); + + const request = new Request("https://example.com/webhook", { + method: "POST", + headers: { + "content-type": "application/json", + authorization: "Bearer bad-token", + }, + body: JSON.stringify(pubsubMessage), + }); + + const response = await adapter.handleWebhook(request); + expect(response.status).toBe(401); + expect(verifyIdTokenSpy).toHaveBeenCalledWith({ + idToken: "bad-token", + audience: "https://example.com/webhook/pubsub", + }); + }); + + it("should allow Pub/Sub webhook with valid token when pubsubAudience is configured", async () => { + verifyIdTokenSpy.mockResolvedValue({ + getPayload: () => ({ + iss: "accounts.google.com", + aud: "https://example.com/webhook/pubsub", + email: "pubsub@my-project.iam.gserviceaccount.com", + }), + }); + + const { adapter } = await createInitializedAdapter({ + pubsubAudience: "https://example.com/webhook/pubsub", + }); + + const pubsubMessage = makePubSubPushMessage({ + message: { + name: "spaces/ABC123/messages/msg1", + text: "Hello", + sender: { name: "users/100", displayName: "User", type: "HUMAN" }, + createTime: new Date().toISOString(), + }, + }); + + const request = new Request("https://example.com/webhook", { + method: "POST", + headers: { + "content-type": "application/json", + authorization: "Bearer valid-pubsub-jwt", + }, + body: JSON.stringify(pubsubMessage), + }); + + const response = await adapter.handleWebhook(request); + expect(response.status).toBe(200); + }); + + it("should reject request with non-Bearer Authorization scheme", async () => { + const { adapter } = await createInitializedAdapter({ + googleChatProjectNumber: "123456789", + }); + + const event = makeMessageEvent({ messageText: "Hello" }); + const request = new Request("https://example.com/webhook", { + method: "POST", + headers: { + "content-type": "application/json", + authorization: "Basic dXNlcjpwYXNz", + }, + body: JSON.stringify(event), + }); + + const response = await adapter.handleWebhook(request); + expect(response.status).toBe(401); + expect(verifyIdTokenSpy).not.toHaveBeenCalled(); + }); + + it("should reject when verifyIdToken returns no payload", async () => { + verifyIdTokenSpy.mockResolvedValue({ + getPayload: () => undefined, + }); + + const { adapter } = await createInitializedAdapter({ + googleChatProjectNumber: "123456789", + }); + + const event = makeMessageEvent({ messageText: "Hello" }); + const request = new Request("https://example.com/webhook", { + method: "POST", + headers: { + "content-type": "application/json", + authorization: "Bearer token-no-payload", + }, + body: JSON.stringify(event), + }); + + const response = await adapter.handleWebhook(request); + expect(response.status).toBe(401); + }); + }); }); diff --git a/packages/adapter-gchat/src/index.ts b/packages/adapter-gchat/src/index.ts index db1df548..488c674a 100644 --- a/packages/adapter-gchat/src/index.ts +++ b/packages/adapter-gchat/src/index.ts @@ -76,6 +76,13 @@ export interface GoogleChatAdapterBaseConfig { * Should be the full URL of your webhook endpoint (e.g., "https://your-app.vercel.app/api/webhooks/gchat") */ endpointUrl?: string; + /** + * Google Cloud project number for verifying direct webhook JWTs. + * When set, the adapter verifies the Bearer token on incoming Google Chat webhooks + * by checking the JWT audience matches this project number. + * Defaults to GOOGLE_CHAT_PROJECT_NUMBER env var. + */ + googleChatProjectNumber?: string; /** * User email to impersonate for Workspace Events API calls. * Required when using domain-wide delegation. @@ -85,6 +92,13 @@ export interface GoogleChatAdapterBaseConfig { impersonateUser?: string; /** Logger instance for error reporting. Defaults to ConsoleLogger. */ logger?: Logger; + /** + * Expected audience for Pub/Sub push message JWT verification. + * Typically the push endpoint URL configured in your Pub/Sub subscription. + * When set, the adapter verifies the Authorization Bearer token on Pub/Sub messages. + * Defaults to GOOGLE_CHAT_PUBSUB_AUDIENCE env var. + */ + pubsubAudience?: string; /** * Pub/Sub topic for receiving all messages via Workspace Events. * When set, the adapter will automatically create subscriptions when added to a space. @@ -277,6 +291,15 @@ export class GoogleChatAdapter implements Adapter { private readonly impersonatedChatApi?: chat_v1.Chat; /** HTTP endpoint URL for button click actions */ private endpointUrl?: string; + /** Google Cloud project number for verifying direct webhook JWTs */ + private readonly googleChatProjectNumber?: string; + /** Expected audience for Pub/Sub push message JWT verification */ + private readonly pubsubAudience?: string; + /** OAuth2 client for verifying Google-signed JWTs */ + private readonly oauth2Client = new auth.OAuth2(); + /** Track whether we've already warned about missing verification config */ + private warnedNoWebhookVerification = false; + private warnedNoPubsubVerification = false; /** User info cache for display name lookups - initialized later in initialize() */ private userInfoCache: UserInfoCache; @@ -292,6 +315,10 @@ export class GoogleChatAdapter implements Adapter { this.impersonateUser = config.impersonateUser ?? process.env.GOOGLE_CHAT_IMPERSONATE_USER; this.endpointUrl = config.endpointUrl; + this.googleChatProjectNumber = + config.googleChatProjectNumber ?? process.env.GOOGLE_CHAT_PROJECT_NUMBER; + this.pubsubAudience = + config.pubsubAudience ?? process.env.GOOGLE_CHAT_PUBSUB_AUDIENCE; let authClient: Parameters[0]["auth"]; @@ -628,6 +655,47 @@ export class GoogleChatAdapter implements Adapter { return null; } + /** + * Verify a Google-signed JWT Bearer token from the Authorization header. + * Used for both direct Google Chat webhooks and Pub/Sub push messages. + * + * @param request - The incoming HTTP request + * @param expectedAudience - The expected audience claim in the JWT + * @returns true if verification succeeds or is not configured + */ + private async verifyBearerToken( + request: Request, + expectedAudience: string + ): Promise { + const authHeader = request.headers.get("authorization"); + if (!authHeader?.startsWith("Bearer ")) { + this.logger.warn("Missing or invalid Authorization header"); + return false; + } + + const token = authHeader.slice(7); + try { + const ticket = await this.oauth2Client.verifyIdToken({ + idToken: token, + audience: expectedAudience, + }); + const payload = ticket.getPayload(); + if (!payload) { + this.logger.warn("JWT verification returned no payload"); + return false; + } + this.logger.debug("JWT verified", { + iss: payload.iss, + aud: payload.aud, + email: payload.email, + }); + return true; + } catch (error) { + this.logger.warn("JWT verification failed", { error }); + return false; + } + } + async handleWebhook( request: Request, options?: WebhookOptions @@ -660,9 +728,40 @@ export class GoogleChatAdapter implements Adapter { // Check if this is a Pub/Sub push message (from Workspace Events subscription) const maybePubSub = parsed as PubSubPushMessage; if (maybePubSub.message?.data && maybePubSub.subscription) { + // Verify Pub/Sub JWT if audience is configured + if (this.pubsubAudience) { + const valid = await this.verifyBearerToken( + request, + this.pubsubAudience + ); + if (!valid) { + return new Response("Unauthorized", { status: 401 }); + } + } else if (!this.warnedNoPubsubVerification) { + this.warnedNoPubsubVerification = true; + this.logger.warn( + "Pub/Sub webhook verification is disabled. Set GOOGLE_CHAT_PUBSUB_AUDIENCE or pubsubAudience to verify incoming requests." + ); + } return this.handlePubSubMessage(maybePubSub, options); } + // Verify direct Google Chat webhook JWT if project number is configured + if (this.googleChatProjectNumber) { + const valid = await this.verifyBearerToken( + request, + this.googleChatProjectNumber + ); + if (!valid) { + return new Response("Unauthorized", { status: 401 }); + } + } else if (!this.warnedNoWebhookVerification) { + this.warnedNoWebhookVerification = true; + this.logger.warn( + "Google Chat webhook verification is disabled. Set GOOGLE_CHAT_PROJECT_NUMBER or googleChatProjectNumber to verify incoming requests." + ); + } + // Otherwise, treat as a direct Google Chat webhook event const event = parsed as GoogleChatEvent; @@ -2548,7 +2647,6 @@ export { listSpaceSubscriptions, type PubSubPushMessage, type SpaceSubscriptionResult, - verifyPubSubRequest, type WorkspaceEventNotification, type WorkspaceEventsAuthOptions, } from "./workspace-events"; diff --git a/packages/adapter-gchat/src/workspace-events.test.ts b/packages/adapter-gchat/src/workspace-events.test.ts index 96fb760c..1183a976 100644 --- a/packages/adapter-gchat/src/workspace-events.test.ts +++ b/packages/adapter-gchat/src/workspace-events.test.ts @@ -5,7 +5,6 @@ import { decodePubSubMessage, deleteSpaceSubscription, listSpaceSubscriptions, - verifyPubSubRequest, } from "./workspace-events"; vi.mock("@googleapis/workspaceevents", () => ({ @@ -88,39 +87,6 @@ describe("decodePubSubMessage", () => { }); }); -describe("verifyPubSubRequest", () => { - it("should reject non-POST requests", () => { - const req = new Request("https://example.com/webhook", { - method: "GET", - }); - expect(verifyPubSubRequest(req)).toBe(false); - }); - - it("should reject wrong content-type", () => { - const req = new Request("https://example.com/webhook", { - method: "POST", - headers: { "content-type": "text/plain" }, - }); - expect(verifyPubSubRequest(req)).toBe(false); - }); - - it("should accept valid POST with JSON content-type", () => { - const req = new Request("https://example.com/webhook", { - method: "POST", - headers: { "content-type": "application/json" }, - }); - expect(verifyPubSubRequest(req)).toBe(true); - }); - - it("should accept application/json with charset", () => { - const req = new Request("https://example.com/webhook", { - method: "POST", - headers: { "content-type": "application/json; charset=utf-8" }, - }); - expect(verifyPubSubRequest(req)).toBe(true); - }); -}); - describe("createSpaceSubscription", () => { it("should return name and expireTime when operation is done", async () => { const { workspaceevents } = await import("@googleapis/workspaceevents"); diff --git a/packages/adapter-gchat/src/workspace-events.ts b/packages/adapter-gchat/src/workspace-events.ts index 046f707b..5a38dcdb 100644 --- a/packages/adapter-gchat/src/workspace-events.ts +++ b/packages/adapter-gchat/src/workspace-events.ts @@ -318,32 +318,3 @@ export function decodePubSubMessage( reaction: payload.reaction, }; } - -/** - * Verify a Pub/Sub push message is authentic. - * In production, you should verify the JWT token in the Authorization header. - * - * @see https://cloud.google.com/pubsub/docs/authenticate-push-subscriptions - */ -export function verifyPubSubRequest( - request: Request, - _expectedAudience?: string -): boolean { - // Basic check - Pub/Sub always sends POST with specific content type - if (request.method !== "POST") { - return false; - } - - const contentType = request.headers.get("content-type"); - if (!contentType?.includes("application/json")) { - return false; - } - - // For full verification, you would: - // 1. Extract the Bearer token from Authorization header - // 2. Verify it's a valid Google-signed JWT - // 3. Check the audience matches your endpoint - // This requires additional setup - see Google's docs - - return true; -} diff --git a/packages/adapter-telegram/src/index.ts b/packages/adapter-telegram/src/index.ts index 4cc46be8..3993ac7b 100644 --- a/packages/adapter-telegram/src/index.ts +++ b/packages/adapter-telegram/src/index.ts @@ -1,3 +1,4 @@ +import { timingSafeEqual } from "node:crypto"; import { AdapterRateLimitError, AuthenticationError, @@ -195,6 +196,7 @@ export class TelegramAdapter private readonly botToken: string; private readonly apiBaseUrl: string; private readonly secretToken?: string; + private warnedNoVerification = false; private readonly logger: Logger; private readonly formatConverter = new TelegramFormatConverter(); private readonly messageCache = new Map< @@ -314,12 +316,28 @@ export class TelegramAdapter ): Promise { if (this.secretToken) { const headerToken = request.headers.get(TELEGRAM_SECRET_TOKEN_HEADER); - if (headerToken !== this.secretToken) { + let valid = false; + try { + valid = + !!headerToken && + timingSafeEqual( + Buffer.from(headerToken), + Buffer.from(this.secretToken) + ); + } catch { + // Length mismatch throws — treat as invalid + } + if (!valid) { this.logger.warn( "Telegram webhook rejected due to invalid secret token" ); return new Response("Invalid secret token", { status: 401 }); } + } else if (!this.warnedNoVerification) { + this.warnedNoVerification = true; + this.logger.warn( + "Telegram webhook verification is disabled. Set TELEGRAM_WEBHOOK_SECRET_TOKEN or secretToken to verify incoming requests." + ); } let update: TelegramUpdate; From a0580b8697fbb7588f602e3b38a5dcc378d452d4 Mon Sep 17 00:00:00 2001 From: Malte Ubl Date: Mon, 23 Mar 2026 07:57:42 -0700 Subject: [PATCH 7/9] Make adapters depend on `chat` as a real dep (#289) Without this, changeset will make any dep change a major change --- .changeset/brown-tires-lick.md | 12 ++++++++++++ packages/adapter-discord/package.json | 4 +--- packages/adapter-gchat/package.json | 4 +--- packages/adapter-github/package.json | 4 +--- packages/adapter-linear/package.json | 4 +--- packages/adapter-slack/package.json | 4 +--- packages/adapter-teams/package.json | 4 +--- packages/adapter-telegram/package.json | 4 +--- packages/adapter-whatsapp/package.json | 4 +--- 9 files changed, 20 insertions(+), 24 deletions(-) create mode 100644 .changeset/brown-tires-lick.md diff --git a/.changeset/brown-tires-lick.md b/.changeset/brown-tires-lick.md new file mode 100644 index 00000000..10eee9eb --- /dev/null +++ b/.changeset/brown-tires-lick.md @@ -0,0 +1,12 @@ +--- +"@chat-adapter/telegram": minor +"@chat-adapter/whatsapp": minor +"@chat-adapter/discord": minor +"@chat-adapter/github": minor +"@chat-adapter/linear": minor +"@chat-adapter/gchat": minor +"@chat-adapter/slack": minor +"@chat-adapter/teams": minor +--- + +Switch adapters from optional dep to full dep on chat diff --git a/packages/adapter-discord/package.json b/packages/adapter-discord/package.json index 96275d49..3ca8d3a5 100644 --- a/packages/adapter-discord/package.json +++ b/packages/adapter-discord/package.json @@ -25,13 +25,11 @@ }, "dependencies": { "@chat-adapter/shared": "workspace:*", + "chat": "workspace:*", "discord-api-types": "^0.37.119", "discord-interactions": "^4.4.0", "discord.js": "^14.25.1" }, - "peerDependencies": { - "chat": "workspace:*" - }, "devDependencies": { "@types/node": "^25.3.2", "tsup": "^8.3.5", diff --git a/packages/adapter-gchat/package.json b/packages/adapter-gchat/package.json index 20536de1..eeb6c813 100644 --- a/packages/adapter-gchat/package.json +++ b/packages/adapter-gchat/package.json @@ -26,9 +26,7 @@ "dependencies": { "@chat-adapter/shared": "workspace:*", "@googleapis/chat": "^44.6.0", - "@googleapis/workspaceevents": "^9.1.0" - }, - "peerDependencies": { + "@googleapis/workspaceevents": "^9.1.0", "chat": "workspace:*" }, "devDependencies": { diff --git a/packages/adapter-github/package.json b/packages/adapter-github/package.json index df62fe06..a21575d0 100644 --- a/packages/adapter-github/package.json +++ b/packages/adapter-github/package.json @@ -26,9 +26,7 @@ "dependencies": { "@chat-adapter/shared": "workspace:*", "@octokit/auth-app": "^8.2.0", - "@octokit/rest": "^22.0.1" - }, - "peerDependencies": { + "@octokit/rest": "^22.0.1", "chat": "workspace:*" }, "devDependencies": { diff --git a/packages/adapter-linear/package.json b/packages/adapter-linear/package.json index 0033eb0a..53a5d73b 100644 --- a/packages/adapter-linear/package.json +++ b/packages/adapter-linear/package.json @@ -25,9 +25,7 @@ }, "dependencies": { "@chat-adapter/shared": "workspace:*", - "@linear/sdk": "^76.0.0" - }, - "peerDependencies": { + "@linear/sdk": "^76.0.0", "chat": "workspace:*" }, "devDependencies": { diff --git a/packages/adapter-slack/package.json b/packages/adapter-slack/package.json index 8c2eb42c..a2ad97a4 100644 --- a/packages/adapter-slack/package.json +++ b/packages/adapter-slack/package.json @@ -25,9 +25,7 @@ }, "dependencies": { "@chat-adapter/shared": "workspace:*", - "@slack/web-api": "^7.14.0" - }, - "peerDependencies": { + "@slack/web-api": "^7.14.0", "chat": "workspace:*" }, "devDependencies": { diff --git a/packages/adapter-teams/package.json b/packages/adapter-teams/package.json index ccccdc61..4ecf7074 100644 --- a/packages/adapter-teams/package.json +++ b/packages/adapter-teams/package.json @@ -27,9 +27,7 @@ "@azure/identity": "^4.13.0", "@chat-adapter/shared": "workspace:*", "botbuilder": "^4.23.1", - "botframework-connector": "^4.23.3" - }, - "peerDependencies": { + "botframework-connector": "^4.23.3", "chat": "workspace:*" }, "devDependencies": { diff --git a/packages/adapter-telegram/package.json b/packages/adapter-telegram/package.json index 81f07580..5f26bb38 100644 --- a/packages/adapter-telegram/package.json +++ b/packages/adapter-telegram/package.json @@ -24,9 +24,7 @@ "clean": "rm -rf dist" }, "dependencies": { - "@chat-adapter/shared": "workspace:*" - }, - "peerDependencies": { + "@chat-adapter/shared": "workspace:*", "chat": "workspace:*" }, "devDependencies": { diff --git a/packages/adapter-whatsapp/package.json b/packages/adapter-whatsapp/package.json index e006a62e..aa90689f 100644 --- a/packages/adapter-whatsapp/package.json +++ b/packages/adapter-whatsapp/package.json @@ -24,9 +24,7 @@ "clean": "rm -rf dist" }, "dependencies": { - "@chat-adapter/shared": "workspace:*" - }, - "peerDependencies": { + "@chat-adapter/shared": "workspace:*", "chat": "workspace:*" }, "devDependencies": { From 4fb38dce492c10da0ddf0639e12f0c23e8bff2c4 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 23 Mar 2026 08:04:36 -0700 Subject: [PATCH 8/9] Version Packages (#290) Co-authored-by: github-actions[bot] <41898282+github-actions[bot]@users.noreply.github.com> --- .changeset/add-adapter-disconnect-hook.md | 5 ----- .changeset/brown-tires-lick.md | 12 ------------ .changeset/few-cows-search.md | 5 ----- .changeset/fix-finish-step-event-name.md | 5 ----- .changeset/fix-telegram-markdown-parse-mode.md | 5 ----- .changeset/fix-thread-channel-types.md | 5 ----- .changeset/telegram-entity-markdown.md | 5 ----- packages/adapter-discord/CHANGELOG.md | 14 ++++++++++++++ packages/adapter-discord/package.json | 2 +- packages/adapter-gchat/CHANGELOG.md | 15 +++++++++++++++ packages/adapter-gchat/package.json | 2 +- packages/adapter-github/CHANGELOG.md | 14 ++++++++++++++ packages/adapter-github/package.json | 2 +- packages/adapter-linear/CHANGELOG.md | 14 ++++++++++++++ packages/adapter-linear/package.json | 2 +- packages/adapter-shared/CHANGELOG.md | 9 +++++++++ packages/adapter-shared/package.json | 2 +- packages/adapter-slack/CHANGELOG.md | 14 ++++++++++++++ packages/adapter-slack/package.json | 2 +- packages/adapter-teams/CHANGELOG.md | 14 ++++++++++++++ packages/adapter-teams/package.json | 2 +- packages/adapter-telegram/CHANGELOG.md | 16 ++++++++++++++++ packages/adapter-telegram/package.json | 2 +- packages/adapter-whatsapp/CHANGELOG.md | 14 ++++++++++++++ packages/adapter-whatsapp/package.json | 2 +- packages/chat/CHANGELOG.md | 11 +++++++++++ packages/chat/package.json | 2 +- packages/state-ioredis/CHANGELOG.md | 9 +++++++++ packages/state-ioredis/package.json | 2 +- packages/state-memory/CHANGELOG.md | 9 +++++++++ packages/state-memory/package.json | 2 +- packages/state-pg/CHANGELOG.md | 9 +++++++++ packages/state-pg/package.json | 2 +- packages/state-redis/CHANGELOG.md | 9 +++++++++ packages/state-redis/package.json | 2 +- 35 files changed, 185 insertions(+), 56 deletions(-) delete mode 100644 .changeset/add-adapter-disconnect-hook.md delete mode 100644 .changeset/brown-tires-lick.md delete mode 100644 .changeset/few-cows-search.md delete mode 100644 .changeset/fix-finish-step-event-name.md delete mode 100644 .changeset/fix-telegram-markdown-parse-mode.md delete mode 100644 .changeset/fix-thread-channel-types.md delete mode 100644 .changeset/telegram-entity-markdown.md diff --git a/.changeset/add-adapter-disconnect-hook.md b/.changeset/add-adapter-disconnect-hook.md deleted file mode 100644 index 7e17831b..00000000 --- a/.changeset/add-adapter-disconnect-hook.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"chat": minor ---- - -Add optional `disconnect()` hook to the Adapter interface, called during `chat.shutdown()` for resource cleanup diff --git a/.changeset/brown-tires-lick.md b/.changeset/brown-tires-lick.md deleted file mode 100644 index 10eee9eb..00000000 --- a/.changeset/brown-tires-lick.md +++ /dev/null @@ -1,12 +0,0 @@ ---- -"@chat-adapter/telegram": minor -"@chat-adapter/whatsapp": minor -"@chat-adapter/discord": minor -"@chat-adapter/github": minor -"@chat-adapter/linear": minor -"@chat-adapter/gchat": minor -"@chat-adapter/slack": minor -"@chat-adapter/teams": minor ---- - -Switch adapters from optional dep to full dep on chat diff --git a/.changeset/few-cows-search.md b/.changeset/few-cows-search.md deleted file mode 100644 index c7bbde31..00000000 --- a/.changeset/few-cows-search.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"@chat-adapter/gchat": patch ---- - -Introduce optional gchat signature verification diff --git a/.changeset/fix-finish-step-event-name.md b/.changeset/fix-finish-step-event-name.md deleted file mode 100644 index 95c0e6f8..00000000 --- a/.changeset/fix-finish-step-event-name.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"chat": patch ---- - -Fix `fromFullStream()` step separator detection for AI SDK v5+: rename `step-finish` event check to `finish-step` diff --git a/.changeset/fix-telegram-markdown-parse-mode.md b/.changeset/fix-telegram-markdown-parse-mode.md deleted file mode 100644 index 51a3210b..00000000 --- a/.changeset/fix-telegram-markdown-parse-mode.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"@chat-adapter/telegram": patch ---- - -Set `parse_mode` to `"Markdown"` when posting messages with a `markdown` field, not only for card messages diff --git a/.changeset/fix-thread-channel-types.md b/.changeset/fix-thread-channel-types.md deleted file mode 100644 index a4cf4b31..00000000 --- a/.changeset/fix-thread-channel-types.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"chat": patch ---- - -Add missing `toJSON()` method declarations to `Thread` and `Channel` interfaces to match their implementations. diff --git a/.changeset/telegram-entity-markdown.md b/.changeset/telegram-entity-markdown.md deleted file mode 100644 index d404a9f7..00000000 --- a/.changeset/telegram-entity-markdown.md +++ /dev/null @@ -1,5 +0,0 @@ ---- -"@chat-adapter/telegram": patch ---- - -Convert Telegram message entities to markdown in parsed messages diff --git a/packages/adapter-discord/CHANGELOG.md b/packages/adapter-discord/CHANGELOG.md index 940db607..dfd54065 100644 --- a/packages/adapter-discord/CHANGELOG.md +++ b/packages/adapter-discord/CHANGELOG.md @@ -1,5 +1,19 @@ # @chat-adapter/discord +## 4.21.0 + +### Minor Changes + +- d778f72: Switch adapters from optional dep to full dep on chat + +### Patch Changes + +- Updated dependencies [e45a67f] +- Updated dependencies [13ba1c7] +- Updated dependencies [95fd8ce] + - chat@4.21.0 + - @chat-adapter/shared@4.21.0 + ## 4.20.2 ### Patch Changes diff --git a/packages/adapter-discord/package.json b/packages/adapter-discord/package.json index 3ca8d3a5..187a6d6b 100644 --- a/packages/adapter-discord/package.json +++ b/packages/adapter-discord/package.json @@ -1,6 +1,6 @@ { "name": "@chat-adapter/discord", - "version": "4.20.2", + "version": "4.21.0", "description": "Discord adapter for chat", "type": "module", "main": "./dist/index.js", diff --git a/packages/adapter-gchat/CHANGELOG.md b/packages/adapter-gchat/CHANGELOG.md index 6d04e461..bf0c7e0e 100644 --- a/packages/adapter-gchat/CHANGELOG.md +++ b/packages/adapter-gchat/CHANGELOG.md @@ -1,5 +1,20 @@ # @chat-adapter/gchat +## 4.21.0 + +### Minor Changes + +- d778f72: Switch adapters from optional dep to full dep on chat + +### Patch Changes + +- 000792b: Introduce optional gchat signature verification +- Updated dependencies [e45a67f] +- Updated dependencies [13ba1c7] +- Updated dependencies [95fd8ce] + - chat@4.21.0 + - @chat-adapter/shared@4.21.0 + ## 4.20.2 ### Patch Changes diff --git a/packages/adapter-gchat/package.json b/packages/adapter-gchat/package.json index eeb6c813..3b31b352 100644 --- a/packages/adapter-gchat/package.json +++ b/packages/adapter-gchat/package.json @@ -1,6 +1,6 @@ { "name": "@chat-adapter/gchat", - "version": "4.20.2", + "version": "4.21.0", "description": "Google Chat adapter for chat", "type": "module", "main": "./dist/index.js", diff --git a/packages/adapter-github/CHANGELOG.md b/packages/adapter-github/CHANGELOG.md index a03b9b0e..66c8de32 100644 --- a/packages/adapter-github/CHANGELOG.md +++ b/packages/adapter-github/CHANGELOG.md @@ -1,5 +1,19 @@ # @chat-adapter/github +## 4.21.0 + +### Minor Changes + +- d778f72: Switch adapters from optional dep to full dep on chat + +### Patch Changes + +- Updated dependencies [e45a67f] +- Updated dependencies [13ba1c7] +- Updated dependencies [95fd8ce] + - chat@4.21.0 + - @chat-adapter/shared@4.21.0 + ## 4.20.2 ### Patch Changes diff --git a/packages/adapter-github/package.json b/packages/adapter-github/package.json index a21575d0..30dcb944 100644 --- a/packages/adapter-github/package.json +++ b/packages/adapter-github/package.json @@ -1,6 +1,6 @@ { "name": "@chat-adapter/github", - "version": "4.20.2", + "version": "4.21.0", "description": "GitHub adapter for chat - PR comment threads", "type": "module", "main": "./dist/index.js", diff --git a/packages/adapter-linear/CHANGELOG.md b/packages/adapter-linear/CHANGELOG.md index 16897ab5..6bcd08ae 100644 --- a/packages/adapter-linear/CHANGELOG.md +++ b/packages/adapter-linear/CHANGELOG.md @@ -1,5 +1,19 @@ # @chat-adapter/linear +## 4.21.0 + +### Minor Changes + +- d778f72: Switch adapters from optional dep to full dep on chat + +### Patch Changes + +- Updated dependencies [e45a67f] +- Updated dependencies [13ba1c7] +- Updated dependencies [95fd8ce] + - chat@4.21.0 + - @chat-adapter/shared@4.21.0 + ## 4.20.2 ### Patch Changes diff --git a/packages/adapter-linear/package.json b/packages/adapter-linear/package.json index 53a5d73b..cefdd387 100644 --- a/packages/adapter-linear/package.json +++ b/packages/adapter-linear/package.json @@ -1,6 +1,6 @@ { "name": "@chat-adapter/linear", - "version": "4.20.2", + "version": "4.21.0", "description": "Linear adapter for chat - issue comment threads", "type": "module", "main": "./dist/index.js", diff --git a/packages/adapter-shared/CHANGELOG.md b/packages/adapter-shared/CHANGELOG.md index cccacf52..cc28424d 100644 --- a/packages/adapter-shared/CHANGELOG.md +++ b/packages/adapter-shared/CHANGELOG.md @@ -1,5 +1,14 @@ # @chat-adapter/shared +## 4.21.0 + +### Patch Changes + +- Updated dependencies [e45a67f] +- Updated dependencies [13ba1c7] +- Updated dependencies [95fd8ce] + - chat@4.21.0 + ## 4.20.2 ### Patch Changes diff --git a/packages/adapter-shared/package.json b/packages/adapter-shared/package.json index 41f20fd1..1840d05c 100644 --- a/packages/adapter-shared/package.json +++ b/packages/adapter-shared/package.json @@ -1,6 +1,6 @@ { "name": "@chat-adapter/shared", - "version": "4.20.2", + "version": "4.21.0", "description": "Shared utilities for chat SDK adapters", "type": "module", "exports": { diff --git a/packages/adapter-slack/CHANGELOG.md b/packages/adapter-slack/CHANGELOG.md index d214eec1..87b69803 100644 --- a/packages/adapter-slack/CHANGELOG.md +++ b/packages/adapter-slack/CHANGELOG.md @@ -1,5 +1,19 @@ # @chat-adapter/slack +## 4.21.0 + +### Minor Changes + +- d778f72: Switch adapters from optional dep to full dep on chat + +### Patch Changes + +- Updated dependencies [e45a67f] +- Updated dependencies [13ba1c7] +- Updated dependencies [95fd8ce] + - chat@4.21.0 + - @chat-adapter/shared@4.21.0 + ## 4.20.2 ### Patch Changes diff --git a/packages/adapter-slack/package.json b/packages/adapter-slack/package.json index a2ad97a4..44f2e080 100644 --- a/packages/adapter-slack/package.json +++ b/packages/adapter-slack/package.json @@ -1,6 +1,6 @@ { "name": "@chat-adapter/slack", - "version": "4.20.2", + "version": "4.21.0", "description": "Slack adapter for chat", "type": "module", "main": "./dist/index.js", diff --git a/packages/adapter-teams/CHANGELOG.md b/packages/adapter-teams/CHANGELOG.md index fd691771..377b05d5 100644 --- a/packages/adapter-teams/CHANGELOG.md +++ b/packages/adapter-teams/CHANGELOG.md @@ -1,5 +1,19 @@ # @chat-adapter/teams +## 4.21.0 + +### Minor Changes + +- d778f72: Switch adapters from optional dep to full dep on chat + +### Patch Changes + +- Updated dependencies [e45a67f] +- Updated dependencies [13ba1c7] +- Updated dependencies [95fd8ce] + - chat@4.21.0 + - @chat-adapter/shared@4.21.0 + ## 4.20.2 ### Patch Changes diff --git a/packages/adapter-teams/package.json b/packages/adapter-teams/package.json index 4ecf7074..aa318ed1 100644 --- a/packages/adapter-teams/package.json +++ b/packages/adapter-teams/package.json @@ -1,6 +1,6 @@ { "name": "@chat-adapter/teams", - "version": "4.20.2", + "version": "4.21.0", "description": "Microsoft Teams adapter for chat", "type": "module", "main": "./dist/index.js", diff --git a/packages/adapter-telegram/CHANGELOG.md b/packages/adapter-telegram/CHANGELOG.md index 0b7049e4..6d6a0150 100644 --- a/packages/adapter-telegram/CHANGELOG.md +++ b/packages/adapter-telegram/CHANGELOG.md @@ -1,5 +1,21 @@ # @chat-adapter/telegram +## 4.21.0 + +### Minor Changes + +- d778f72: Switch adapters from optional dep to full dep on chat + +### Patch Changes + +- 1d36004: Set `parse_mode` to `"Markdown"` when posting messages with a `markdown` field, not only for card messages +- 85a1d7f: Convert Telegram message entities to markdown in parsed messages +- Updated dependencies [e45a67f] +- Updated dependencies [13ba1c7] +- Updated dependencies [95fd8ce] + - chat@4.21.0 + - @chat-adapter/shared@4.21.0 + ## 4.20.2 ### Patch Changes diff --git a/packages/adapter-telegram/package.json b/packages/adapter-telegram/package.json index 5f26bb38..2ab98e96 100644 --- a/packages/adapter-telegram/package.json +++ b/packages/adapter-telegram/package.json @@ -1,6 +1,6 @@ { "name": "@chat-adapter/telegram", - "version": "4.20.2", + "version": "4.21.0", "description": "Telegram adapter for chat", "type": "module", "main": "./dist/index.js", diff --git a/packages/adapter-whatsapp/CHANGELOG.md b/packages/adapter-whatsapp/CHANGELOG.md index 2db19dd6..e2a8875a 100644 --- a/packages/adapter-whatsapp/CHANGELOG.md +++ b/packages/adapter-whatsapp/CHANGELOG.md @@ -1,5 +1,19 @@ # @chat-adapter/whatsapp +## 4.21.0 + +### Minor Changes + +- d778f72: Switch adapters from optional dep to full dep on chat + +### Patch Changes + +- Updated dependencies [e45a67f] +- Updated dependencies [13ba1c7] +- Updated dependencies [95fd8ce] + - chat@4.21.0 + - @chat-adapter/shared@4.21.0 + ## 4.20.2 ### Patch Changes diff --git a/packages/adapter-whatsapp/package.json b/packages/adapter-whatsapp/package.json index aa90689f..ded203ea 100644 --- a/packages/adapter-whatsapp/package.json +++ b/packages/adapter-whatsapp/package.json @@ -1,6 +1,6 @@ { "name": "@chat-adapter/whatsapp", - "version": "4.20.2", + "version": "4.21.0", "description": "WhatsApp adapter for chat - WhatsApp Business Cloud API", "type": "module", "main": "./dist/index.js", diff --git a/packages/chat/CHANGELOG.md b/packages/chat/CHANGELOG.md index ec43939f..878a25d9 100644 --- a/packages/chat/CHANGELOG.md +++ b/packages/chat/CHANGELOG.md @@ -1,5 +1,16 @@ # chat +## 4.21.0 + +### Minor Changes + +- e45a67f: Add optional `disconnect()` hook to the Adapter interface, called during `chat.shutdown()` for resource cleanup + +### Patch Changes + +- 13ba1c7: Fix `fromFullStream()` step separator detection for AI SDK v5+: rename `step-finish` event check to `finish-step` +- 95fd8ce: Add missing `toJSON()` method declarations to `Thread` and `Channel` interfaces to match their implementations. + ## 4.20.2 ## 4.20.1 diff --git a/packages/chat/package.json b/packages/chat/package.json index 908b94ea..b83d3400 100644 --- a/packages/chat/package.json +++ b/packages/chat/package.json @@ -1,6 +1,6 @@ { "name": "chat", - "version": "4.20.2", + "version": "4.21.0", "description": "Unified chat abstraction for Slack, Teams, Google Chat, and Discord", "type": "module", "main": "./dist/index.js", diff --git a/packages/state-ioredis/CHANGELOG.md b/packages/state-ioredis/CHANGELOG.md index a18ae901..297a98db 100644 --- a/packages/state-ioredis/CHANGELOG.md +++ b/packages/state-ioredis/CHANGELOG.md @@ -1,5 +1,14 @@ # @chat-adapter/state-ioredis +## 4.21.0 + +### Patch Changes + +- Updated dependencies [e45a67f] +- Updated dependencies [13ba1c7] +- Updated dependencies [95fd8ce] + - chat@4.21.0 + ## 4.20.2 ### Patch Changes diff --git a/packages/state-ioredis/package.json b/packages/state-ioredis/package.json index 2be1c914..708907b0 100644 --- a/packages/state-ioredis/package.json +++ b/packages/state-ioredis/package.json @@ -1,6 +1,6 @@ { "name": "@chat-adapter/state-ioredis", - "version": "4.20.2", + "version": "4.21.0", "description": "ioredis state adapter for chat (production)", "type": "module", "main": "./dist/index.js", diff --git a/packages/state-memory/CHANGELOG.md b/packages/state-memory/CHANGELOG.md index b775b042..243edfe0 100644 --- a/packages/state-memory/CHANGELOG.md +++ b/packages/state-memory/CHANGELOG.md @@ -1,5 +1,14 @@ # @chat-adapter/state-memory +## 4.21.0 + +### Patch Changes + +- Updated dependencies [e45a67f] +- Updated dependencies [13ba1c7] +- Updated dependencies [95fd8ce] + - chat@4.21.0 + ## 4.20.2 ### Patch Changes diff --git a/packages/state-memory/package.json b/packages/state-memory/package.json index 15a2cff1..3179db3b 100644 --- a/packages/state-memory/package.json +++ b/packages/state-memory/package.json @@ -1,6 +1,6 @@ { "name": "@chat-adapter/state-memory", - "version": "4.20.2", + "version": "4.21.0", "description": "In-memory state adapter for chat (development/testing)", "type": "module", "main": "./dist/index.js", diff --git a/packages/state-pg/CHANGELOG.md b/packages/state-pg/CHANGELOG.md index 60771e17..06216e1d 100644 --- a/packages/state-pg/CHANGELOG.md +++ b/packages/state-pg/CHANGELOG.md @@ -1,5 +1,14 @@ # @chat-adapter/state-pg +## 4.21.0 + +### Patch Changes + +- Updated dependencies [e45a67f] +- Updated dependencies [13ba1c7] +- Updated dependencies [95fd8ce] + - chat@4.21.0 + ## 4.20.2 ### Patch Changes diff --git a/packages/state-pg/package.json b/packages/state-pg/package.json index 72b4d801..c97e4faf 100644 --- a/packages/state-pg/package.json +++ b/packages/state-pg/package.json @@ -1,6 +1,6 @@ { "name": "@chat-adapter/state-pg", - "version": "4.20.2", + "version": "4.21.0", "description": "Postgres state adapter for chat (production)", "type": "module", "main": "./dist/index.js", diff --git a/packages/state-redis/CHANGELOG.md b/packages/state-redis/CHANGELOG.md index 76a19fc5..7b350914 100644 --- a/packages/state-redis/CHANGELOG.md +++ b/packages/state-redis/CHANGELOG.md @@ -1,5 +1,14 @@ # @chat-adapter/state-redis +## 4.21.0 + +### Patch Changes + +- Updated dependencies [e45a67f] +- Updated dependencies [13ba1c7] +- Updated dependencies [95fd8ce] + - chat@4.21.0 + ## 4.20.2 ### Patch Changes diff --git a/packages/state-redis/package.json b/packages/state-redis/package.json index 38ce0e0c..2fd52332 100644 --- a/packages/state-redis/package.json +++ b/packages/state-redis/package.json @@ -1,6 +1,6 @@ { "name": "@chat-adapter/state-redis", - "version": "4.20.2", + "version": "4.21.0", "description": "Redis state adapter for chat (production)", "type": "module", "main": "./dist/index.js", From 54fc63e793042d88d16e5a1e13bd7b0faf1bdf6f Mon Sep 17 00:00:00 2001 From: Malte Ubl Date: Mon, 23 Mar 2026 12:00:44 -0700 Subject: [PATCH 9/9] Fix serialization --- packages/chat/src/ai.ts | 4 +-- packages/chat/src/chat.ts | 70 ++++++++++++++++++++++++++++++++++----- 2 files changed, 63 insertions(+), 11 deletions(-) diff --git a/packages/chat/src/ai.ts b/packages/chat/src/ai.ts index 7d02f92c..0f0640d8 100644 --- a/packages/chat/src/ai.ts +++ b/packages/chat/src/ai.ts @@ -193,7 +193,7 @@ export async function toAiMessages( : msg.text; // Append link metadata when available - if (msg.links.length > 0) { + if (msg.links && msg.links.length > 0) { const linkParts = msg.links .map((link) => { const parts = link.fetchMessage @@ -218,7 +218,7 @@ export async function toAiMessages( let aiMessage: AiMessage; if (role === "user") { const attachmentParts: AiMessagePart[] = []; - for (const att of msg.attachments) { + for (const att of msg.attachments ?? []) { const part = await attachmentToPart(att); if (part) { attachmentParts.push(part); diff --git a/packages/chat/src/chat.ts b/packages/chat/src/chat.ts index d6800733..9da245f6 100644 --- a/packages/chat/src/chat.ts +++ b/packages/chat/src/chat.ts @@ -19,6 +19,7 @@ import type { AssistantContextChangedHandler, AssistantThreadStartedEvent, AssistantThreadStartedHandler, + Attachment, Author, Channel, ChatConfig, @@ -27,6 +28,8 @@ import type { ConcurrencyStrategy, DirectMessageHandler, EmojiValue, + FormattedContent, + LinkPreview, Lock, LockScope, Logger, @@ -41,7 +44,6 @@ import type { ModalResponse, ModalSubmitEvent, ModalSubmitHandler, - QueueEntry, ReactionEvent, ReactionHandler, SentMessage, @@ -1852,11 +1854,14 @@ export class Chat< break; } + // Reconstruct Message instance after JSON roundtrip through state adapter + const msg = this.rehydrateMessage(entry.message); + if (Date.now() > entry.expiresAt) { this.logger.info("message-expired", { threadId, lockKey, - messageId: entry.message.id, + messageId: msg.id, }); continue; } @@ -1868,7 +1873,7 @@ export class Chat< this.logger.info("message-superseded", { threadId, lockKey, - droppedId: entry.message.id, + droppedId: msg.id, }); continue; } @@ -1877,9 +1882,9 @@ export class Chat< this.logger.info("message-dequeued", { threadId, lockKey, - messageId: entry.message.id, + messageId: msg.id, }); - await this.dispatchToHandlers(adapter, threadId, entry.message); + await this.dispatchToHandlers(adapter, threadId, msg); break; } } @@ -1895,20 +1900,21 @@ export class Chat< lockKey: string ): Promise { while (true) { - // Collect all pending messages - const pending: QueueEntry[] = []; + // Collect all pending messages, rehydrating after JSON roundtrip + const pending: Array<{ message: Message; expiresAt: number }> = []; while (true) { const entry = await this._stateAdapter.dequeue(lockKey); if (!entry) { break; } + const msg = this.rehydrateMessage(entry.message); if (Date.now() <= entry.expiresAt) { - pending.push(entry); + pending.push({ message: msg, expiresAt: entry.expiresAt }); } else { this.logger.info("message-expired", { threadId, lockKey, - messageId: entry.message.id, + messageId: msg.id, }); } } @@ -2141,6 +2147,52 @@ export class Chat< return str.replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); } + /** + * Reconstruct a proper Message instance from a dequeued entry. + * After JSON roundtrip through the state adapter, the message is a plain + * object (not a Message instance). This restores class invariants like + * `links` defaulting to `[]` and `metadata.dateSent` being a Date. + */ + private rehydrateMessage(raw: Message | Record): Message { + if (raw instanceof Message) { + return raw; + } + // After JSON roundtrip, Message.toJSON() was called during stringify, + // so the shape matches SerializedMessage + const obj = raw as Record; + if (obj._type === "chat:Message") { + return Message.fromJSON(obj as unknown as SerializedMessage); + } + // Fallback: plain object that wasn't serialized via toJSON (e.g., in-memory state) + // Reconstruct with defensive defaults + const metadata = obj.metadata as Record; + const dateSent = metadata.dateSent; + const editedAt = metadata.editedAt; + return new Message({ + id: obj.id as string, + threadId: obj.threadId as string, + text: obj.text as string, + formatted: obj.formatted as FormattedContent, + raw: obj.raw, + author: obj.author as Author, + metadata: { + dateSent: + dateSent instanceof Date ? dateSent : new Date(dateSent as string), + edited: metadata.edited as boolean, + editedAt: editedAt + ? new Date( + editedAt instanceof Date + ? editedAt.toISOString() + : (editedAt as string) + ) + : undefined, + }, + attachments: (obj.attachments as Attachment[]) ?? [], + isMention: obj.isMention as boolean | undefined, + links: (obj.links as LinkPreview[] | undefined) ?? [], + }); + } + private async runHandlers( handlers: Array< (