From c7d313cd9442f306fe67a7171645a7d55d933bc8 Mon Sep 17 00:00:00 2001 From: Malte Ubl Date: Tue, 24 Mar 2026 14:03:18 -0700 Subject: [PATCH] Docs for the new concurrency strategies --- .changeset/every-meals-say.md | 6 + apps/docs/content/docs/concurrency.mdx | 223 +++++++++++++++++++++++++ apps/docs/content/docs/meta.json | 1 + 3 files changed, 230 insertions(+) create mode 100644 .changeset/every-meals-say.md create mode 100644 apps/docs/content/docs/concurrency.mdx diff --git a/.changeset/every-meals-say.md b/.changeset/every-meals-say.md new file mode 100644 index 00000000..179ac19b --- /dev/null +++ b/.changeset/every-meals-say.md @@ -0,0 +1,6 @@ +--- +"docs": minor +"chat": minor +--- + +Implement new concurrency strategies for overlapping messages diff --git a/apps/docs/content/docs/concurrency.mdx b/apps/docs/content/docs/concurrency.mdx new file mode 100644 index 00000000..93baeddd --- /dev/null +++ b/apps/docs/content/docs/concurrency.mdx @@ -0,0 +1,223 @@ +--- +title: Concurrency +description: Control how overlapping messages on the same thread are handled — queue, debounce, drop, or process concurrently. +type: guide +prerequisites: + - /docs/handling-events +related: + - /docs/state + - /docs/streaming +--- + +When multiple messages arrive on the same thread while a handler is still processing, the SDK needs a strategy. By default, the incoming message is dropped. The `concurrency` option on `ChatConfig` lets you choose what happens instead. + +## Strategies + +### Drop (default) + +The original behavior. If a handler is already running on a thread, the new message is discarded and a `LockError` is thrown. No queuing, no retries. + +```typescript title="lib/bot.ts" +const bot = new Chat({ + concurrency: "drop", + // ... +}); +``` + +### Queue + +Messages that arrive while a handler is running are enqueued. When the current handler finishes, only the **latest** queued message is dispatched. 
All intermediate messages are provided as `context.skipped`, giving your handler full visibility into what happened while it was busy. + +```typescript title="lib/bot.ts" lineNumbers +const bot = new Chat({ + concurrency: "queue", + // ... +}); + +bot.onNewMention(async (thread, message, context) => { + if (context && context.skipped.length > 0) { + await thread.post( + `You sent ${context.totalSinceLastHandler} messages while I was thinking. Responding to your latest.` + ); + } + + const response = await generateAIResponse(message.text); + await thread.post(response); +}); +``` + +**Flow:** + +``` +A arrives → acquire lock → process A +B arrives → lock busy → enqueue B +C arrives → lock busy → enqueue C +D arrives → lock busy → enqueue D +A done → drain: [B, C, D] → handler(D, { skipped: [B, C] }) +D done → queue empty → release lock +``` + +### Debounce + +Every message starts or resets a debounce timer. Only the **final message in a burst** is processed. + +This is particularly useful for platforms like **WhatsApp** and **Telegram** where users tend to send a flurry of short messages in quick succession instead of composing a single message — "hey", "quick question", "how do I reset my password?" arriving as three separate webhooks within a few seconds. Without debounce, the bot would respond to "hey" before the actual question even arrives. With debounce, the SDK waits for a pause in the conversation and processes only the final message. + +```typescript title="lib/bot.ts" lineNumbers +const bot = new Chat({ + concurrency: { strategy: "debounce", debounceMs: 1500 }, + // ... +}); +``` + + +WhatsApp and Telegram adapters default to `lockScope: "channel"`, so debounce applies to the entire conversation — not just a single thread. + + +**Flow:** + +``` +A arrives → acquire lock → store A as pending → sleep(debounceMs) +B arrives → lock busy → overwrite pending with B (A dropped) +C arrives → lock busy → overwrite pending with C (B dropped) + ... 
debounceMs elapses with no new message ... + → process C → release lock +``` + +Debounce also works well for rapid corrections ("wait, I meant...") and multi-part messages on any platform. + +### Concurrent + +No locking at all. Every message is processed immediately in its own handler invocation. Use this for stateless handlers where thread ordering doesn't matter. + +```typescript title="lib/bot.ts" +const bot = new Chat({ + concurrency: "concurrent", + // ... +}); +``` + +## Configuration + +For fine-grained control, pass a `ConcurrencyConfig` object instead of a strategy string: + +```typescript title="lib/bot.ts" lineNumbers +const bot = new Chat({ + concurrency: { + strategy: "queue", + maxQueueSize: 20, // Max queued messages per thread (default: 10) + onQueueFull: "drop-oldest", // or "drop-newest" (default: "drop-oldest") + queueEntryTtlMs: 60_000, // Discard stale entries after 60s (default: 90s) + }, + // ... +}); +``` + +### All options + +| Option | Strategies | Default | Description | +|--------|-----------|---------|-------------| +| `strategy` | all | `"drop"` | The concurrency strategy to use | +| `maxQueueSize` | queue, debounce | `10` | Maximum queued messages per thread | +| `onQueueFull` | queue, debounce | `"drop-oldest"` | Whether to evict the oldest or reject the newest message when the queue is full | +| `queueEntryTtlMs` | queue, debounce | `90000` | TTL for queued entries in milliseconds. Expired entries are discarded on dequeue | +| `debounceMs` | debounce | `1500` | Debounce window in milliseconds | +| `maxConcurrent` | concurrent | `Infinity` | Max concurrent handlers per thread | + +## MessageContext + +All handler types (`onNewMention`, `onSubscribedMessage`, `onNewMessage`) accept an optional `MessageContext` as their last parameter. It is only populated when using the `queue` strategy and messages were skipped. 
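Conceptually, `totalSinceLastHandler` always equals `skipped.length + 1`: the skipped messages plus the one currently being handled. A standalone sketch of that relationship, using a simplified `Message` shape rather than the SDK's real type:

```typescript
// Standalone illustration of how the two MessageContext fields relate.
// The Message shape here is simplified, not the SDK's actual type.
type Message = { id: string; text: string };

function buildContext(skipped: Message[]) {
  return {
    skipped,
    // The current message itself counts toward the total, hence + 1.
    totalSinceLastHandler: skipped.length + 1,
  };
}

const ctx = buildContext([
  { id: "m2", text: "quick question" },
  { id: "m3", text: "how do I reset my password?" },
]);
// ctx.totalSinceLastHandler === 3
```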
+ +```typescript +interface MessageContext { + /** Messages that arrived while the previous handler was running, in chronological order. */ + skipped: Message[]; + /** Total messages received since last handler ran (skipped.length + 1). */ + totalSinceLastHandler: number; +} +``` + +Existing handlers that don't use `context` are unaffected — the parameter is optional. + +### Example: Pass all messages to an LLM + +```typescript title="lib/bot.ts" lineNumbers +bot.onSubscribedMessage(async (thread, message, context) => { + // Combine skipped messages with the current one for full context + const allMessages = [...(context?.skipped ?? []), message]; + + const response = await generateAIResponse( + allMessages.map((m) => m.text).join("\n\n") + ); + await thread.post(response); +}); +``` + +## Lock scope + +By default, locks are scoped to the thread — messages in different threads are processed independently. For platforms like WhatsApp and Telegram where conversations happen at the channel level rather than in threads, the lock scope defaults to `"channel"`. + +You can override this globally: + +```typescript title="lib/bot.ts" lineNumbers +const bot = new Chat({ + concurrency: "queue", + lockScope: "channel", // or "thread" (default) + // ... +}); +``` + +Or resolve it dynamically per message: + +```typescript title="lib/bot.ts" lineNumbers +const bot = new Chat({ + concurrency: "queue", + lockScope: ({ isDM, adapter }) => { + // Use channel scope for DMs, thread scope for group channels + return isDM ? "channel" : "thread"; + }, + // ... +}); +``` + +## State adapter requirements + +The `queue` and `debounce` strategies require three additional methods on your state adapter: + +| Method | Description | +|--------|-------------| +| `enqueue(threadId, entry, maxSize)` | Atomically append a message to the thread's queue. Returns new depth. | +| `dequeue(threadId)` | Pop the next (oldest) message from the queue. Returns `null` if empty. 
| +| `queueDepth(threadId)` | Return the current number of queued messages. | + +All built-in state adapters (`@chat-adapter/state-memory`, `@chat-adapter/state-redis`, `@chat-adapter/state-ioredis`) implement these methods. The Redis adapters use Lua scripts for atomicity. + +## Observability + +All strategies emit structured log events at `info` level: + +| Event | Strategy | Data | +|-------|----------|------| +| `message-queued` | queue | threadId, messageId, queueDepth | +| `message-dequeued` | queue, debounce | threadId, messageId, skippedCount | +| `message-dropped` | drop, queue | threadId, messageId, reason | +| `message-expired` | queue, debounce | threadId, messageId | +| `message-superseded` | debounce | threadId, droppedId | +| `message-debouncing` | debounce | threadId, messageId, debounceMs | +| `message-debounce-reset` | debounce | threadId, messageId | + +## Choosing a strategy + +| Use case | Strategy | Why | +|----------|----------|-----| +| Simple bots, one-shot commands | `drop` | No complexity, no queue overhead | +| AI chatbots, customer support | `queue` | Never lose messages; handler sees full conversation context | +| WhatsApp/Telegram bots, rapid corrections | `debounce` | Users send many short messages in quick succession; wait for a pause before responding | +| Stateless lookups, translations | `concurrent` | Maximum throughput, no ordering needed | + +## Backward compatibility + +- The default strategy is `drop` — existing behavior is unchanged. +- The deprecated `onLockConflict` option continues to work but should be replaced with `concurrency`. +- Handler signatures are backward-compatible; the new `context` parameter is optional. +- Deduplication always runs regardless of strategy. 
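
## Example: implementing the queue contract

For custom state adapters, the three required methods from the table above can be illustrated with a minimal in-memory sketch. This is a hypothetical illustration only, not the actual `@chat-adapter/state-memory` implementation: it hard-codes the `drop-oldest` eviction policy and omits `queueEntryTtlMs` expiry for brevity.

```typescript
// Hypothetical in-memory sketch of the enqueue/dequeue/queueDepth contract.
// Not the actual @chat-adapter/state-memory code; drop-oldest eviction only,
// TTL expiry omitted for brevity.
interface QueueEntry {
  messageId: string;
  receivedAt: number;
}

class SketchQueueState {
  private queues = new Map<string, QueueEntry[]>();

  // Append an entry, evicting the oldest when the queue is full.
  // Returns the new queue depth.
  enqueue(threadId: string, entry: QueueEntry, maxSize: number): number {
    const queue = this.queues.get(threadId) ?? [];
    if (queue.length >= maxSize) {
      queue.shift(); // "drop-oldest": make room by evicting the head
    }
    queue.push(entry);
    this.queues.set(threadId, queue);
    return queue.length;
  }

  // Pop the next (oldest) entry, or null when the queue is empty.
  dequeue(threadId: string): QueueEntry | null {
    return this.queues.get(threadId)?.shift() ?? null;
  }

  // Return the current number of queued entries for this thread.
  queueDepth(threadId: string): number {
    return this.queues.get(threadId)?.length ?? 0;
  }
}
```

A production adapter must additionally make `enqueue` atomic under concurrent webhook deliveries, which is why the built-in Redis adapters use Lua scripts.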
diff --git a/apps/docs/content/docs/meta.json b/apps/docs/content/docs/meta.json index a751c0e4..34ab76b5 100644 --- a/apps/docs/content/docs/meta.json +++ b/apps/docs/content/docs/meta.json @@ -12,6 +12,7 @@ "adapters", "state", "---Features---", + "concurrency", "...", "error-handling", "---Guides---",