Merged
6 changes: 6 additions & 0 deletions .changeset/every-meals-say.md
@@ -0,0 +1,6 @@
---
"docs": minor
"chat": minor
---

Implement new concurrency strategies for overlapping messages
223 changes: 223 additions & 0 deletions apps/docs/content/docs/concurrency.mdx
@@ -0,0 +1,223 @@
---
title: Concurrency
description: Control how overlapping messages on the same thread are handled — queue, debounce, drop, or process concurrently.
type: guide
prerequisites:
- /docs/handling-events
related:
- /docs/state
- /docs/streaming
---

When multiple messages arrive on the same thread while a handler is still processing, the SDK needs a strategy. By default, the incoming message is dropped. The `concurrency` option on `ChatConfig` lets you choose what happens instead.

## Strategies

### Drop (default)

The original behavior. If a handler is already running on a thread, the new message is discarded and a `LockError` is thrown. No queuing, no retries.

```typescript title="lib/bot.ts"
const bot = new Chat({
  concurrency: "drop",
  // ...
});
```
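The drop semantics can be modeled in a few lines. This is a sketch of the behavior, not the SDK's internals; the `LockError` class here is a stand-in for the one the SDK exports:

```typescript
// Minimal model of the "drop" strategy: one lock per thread;
// a second dispatch while the lock is held throws LockError.
class LockError extends Error {}

const locks = new Set<string>();

async function dispatch(threadId: string, handler: () => Promise<void>) {
  if (locks.has(threadId)) {
    throw new LockError(`thread ${threadId} is busy`);
  }
  locks.add(threadId);
  try {
    await handler();
  } finally {
    // Release the lock whether the handler succeeded or threw.
    locks.delete(threadId);
  }
}
```

The key property: the losing message is never retried, so the handler for `A` runs exactly once and `B` simply disappears.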

### Queue

Messages that arrive while a handler is running are enqueued. When the current handler finishes, only the **latest** queued message is dispatched. All intermediate messages are provided as `context.skipped`, giving your handler full visibility into what happened while it was busy.

```typescript title="lib/bot.ts" lineNumbers
const bot = new Chat({
  concurrency: "queue",
  // ...
});

bot.onNewMention(async (thread, message, context) => {
  if (context && context.skipped.length > 0) {
    await thread.post(
      `You sent ${context.totalSinceLastHandler} messages while I was thinking. Responding to your latest.`
    );
  }

  const response = await generateAIResponse(message.text);
  await thread.post(response);
});
```

**Flow:**

```
A arrives → acquire lock → process A
B arrives → lock busy → enqueue B
C arrives → lock busy → enqueue C
D arrives → lock busy → enqueue D
A done → drain: [B, C, D] → handler(D, { skipped: [B, C] })
D done → queue empty → release lock
```
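The drain step above can be sketched as a pure function. This is a simplified model, assuming the queue is already in arrival order; the real implementation also enforces `queueEntryTtlMs` and `maxQueueSize`:

```typescript
interface QueuedMessage {
  id: string;
  text: string;
}

// After a handler finishes, pick the message to dispatch (the latest)
// and report everything before it as skipped.
function drain(queue: QueuedMessage[]): {
  next: QueuedMessage;
  skipped: QueuedMessage[];
} | null {
  if (queue.length === 0) return null;
  return {
    next: queue[queue.length - 1],
    skipped: queue.slice(0, -1),
  };
}
```

Applied to the flow above, `drain([B, C, D])` yields `next: D` with `skipped: [B, C]`, which is exactly what the handler receives.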

### Debounce

Every message starts or resets a debounce timer. Only the **final message in a burst** is processed.

This is particularly useful for platforms like **WhatsApp** and **Telegram** where users tend to send a flurry of short messages in quick succession instead of composing a single message — "hey", "quick question", "how do I reset my password?" arriving as three separate webhooks within a few seconds. Without debounce, the bot would respond to "hey" before the actual question even arrives. With debounce, the SDK waits for a pause in the conversation and processes only the final message.

```typescript title="lib/bot.ts" lineNumbers
const bot = new Chat({
  concurrency: { strategy: "debounce", debounceMs: 1500 },
  // ...
});
```

<Callout type="info">
WhatsApp and Telegram adapters default to `lockScope: "channel"`, so debounce applies to the entire conversation — not just a single thread.
</Callout>

**Flow:**

```
A arrives → acquire lock → store A as pending → sleep(debounceMs)
B arrives → lock busy → overwrite pending with B (A dropped)
C arrives → lock busy → overwrite pending with C (B dropped)
... debounceMs elapses with no new message ...
→ process C → release lock
```

Debounce also works well for rapid corrections ("wait, I meant...") and multi-part messages on any platform.
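The selection rule can be expressed as a timer-free model: a message survives debouncing only if no newer message arrives within `debounceMs` of it. This is a sketch for reasoning about bursts, not the SDK's timer implementation:

```typescript
interface Arrival {
  id: string;
  at: number; // arrival time in milliseconds
}

// Return the ids that would actually be processed: a message is
// superseded if the next one arrives inside the debounce window.
function debounced(arrivals: Arrival[], debounceMs: number): string[] {
  return arrivals
    .filter((m, i) => {
      const next = arrivals[i + 1];
      return next === undefined || next.at - m.at >= debounceMs;
    })
    .map((m) => m.id);
}
```

With a 1500 ms window, a burst at 0 ms, 500 ms, and 900 ms collapses to its final message, while a follow-up 2 seconds later is processed on its own.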

### Concurrent

No locking at all. Every message is processed immediately in its own handler invocation. Use this for stateless handlers where thread ordering doesn't matter.

```typescript title="lib/bot.ts"
const bot = new Chat({
  concurrency: "concurrent",
  // ...
});
```

## Configuration

For fine-grained control, pass a `ConcurrencyConfig` object instead of a strategy string:

```typescript title="lib/bot.ts" lineNumbers
const bot = new Chat({
  concurrency: {
    strategy: "queue",
    maxQueueSize: 20, // Max queued messages per thread (default: 10)
    onQueueFull: "drop-oldest", // or "drop-newest" (default: "drop-oldest")
    queueEntryTtlMs: 60_000, // Discard stale entries after 60s (default: 90s)
  },
  // ...
});
```

### All options

| Option | Strategies | Default | Description |
|--------|-----------|---------|-------------|
| `strategy` | all | `"drop"` | The concurrency strategy to use |
| `maxQueueSize` | queue, debounce | `10` | Maximum queued messages per thread |
| `onQueueFull` | queue, debounce | `"drop-oldest"` | Whether to evict the oldest or reject the newest message when the queue is full |
| `queueEntryTtlMs` | queue, debounce | `90000` | TTL for queued entries in milliseconds. Expired entries are discarded on dequeue |
| `debounceMs` | debounce | `1500` | Debounce window in milliseconds |
| `maxConcurrent` | concurrent | `Infinity` | Max concurrent handlers per thread |

## MessageContext

All handler types (`onNewMention`, `onSubscribedMessage`, `onNewMessage`) accept an optional `MessageContext` as their last parameter. It is only populated when using the `queue` strategy and messages were skipped.

```typescript
interface MessageContext {
  /** Messages that arrived while the previous handler was running, in chronological order. */
  skipped: Message[];
  /** Total messages received since last handler ran (skipped.length + 1). */
  totalSinceLastHandler: number;
}
```

Existing handlers that don't use `context` are unaffected — the parameter is optional.

### Example: Pass all messages to an LLM

```typescript title="lib/bot.ts" lineNumbers
bot.onSubscribedMessage(async (thread, message, context) => {
  // Combine skipped messages with the current one for full context
  const allMessages = [...(context?.skipped ?? []), message];

  const response = await generateAIResponse(
    allMessages.map((m) => m.text).join("\n\n")
  );
  await thread.post(response);
});
```

## Lock scope

Locks are scoped to the thread by default, so messages in different threads are processed independently. The WhatsApp and Telegram adapters override this: conversations on those platforms happen at the channel level rather than in threads, so their lock scope defaults to `"channel"`.

You can override this globally:

```typescript title="lib/bot.ts" lineNumbers
const bot = new Chat({
  concurrency: "queue",
  lockScope: "channel", // or "thread" (default)
  // ...
});
```

Or resolve it dynamically per message:

```typescript title="lib/bot.ts" lineNumbers
const bot = new Chat({
  concurrency: "queue",
  lockScope: ({ isDM, adapter }) => {
    // Use channel scope for DMs, thread scope for group channels
    return isDM ? "channel" : "thread";
  },
  // ...
});
```

## State adapter requirements

The `queue` and `debounce` strategies require three additional methods on your state adapter:

| Method | Description |
|--------|-------------|
| `enqueue(threadId, entry, maxSize)` | Atomically append a message to the thread's queue. Returns new depth. |
| `dequeue(threadId)` | Pop the next (oldest) message from the queue. Returns `null` if empty. |
| `queueDepth(threadId)` | Return the current number of queued messages. |

All built-in state adapters (`@chat-adapter/state-memory`, `@chat-adapter/state-redis`, `@chat-adapter/state-ioredis`) implement these methods. The Redis adapters use Lua scripts for atomicity.
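For a custom adapter, a minimal in-memory implementation of these three methods might look like the following. This is a sketch: the entry shape, the synchronous signatures, and the hard-coded `"drop-oldest"` eviction are assumptions, and a real adapter would likely be async and honor the configured `onQueueFull` policy:

```typescript
interface QueueEntry {
  messageId: string;
  enqueuedAt: number; // epoch milliseconds, used for TTL checks
}

class InMemoryQueueState {
  private queues = new Map<string, QueueEntry[]>();

  constructor(private ttlMs = 90_000) {}

  // Append an entry, evicting the oldest when the queue is full
  // ("drop-oldest"). Returns the new queue depth.
  enqueue(threadId: string, entry: QueueEntry, maxSize: number): number {
    const q = this.queues.get(threadId) ?? [];
    if (q.length >= maxSize) q.shift();
    q.push(entry);
    this.queues.set(threadId, q);
    return q.length;
  }

  // Pop the oldest entry, silently discarding any that exceeded the TTL.
  dequeue(threadId: string): QueueEntry | null {
    const q = this.queues.get(threadId) ?? [];
    while (q.length > 0) {
      const entry = q.shift()!;
      if (Date.now() - entry.enqueuedAt < this.ttlMs) return entry;
    }
    return null;
  }

  queueDepth(threadId: string): number {
    return this.queues.get(threadId)?.length ?? 0;
  }
}
```

A single process can get away with plain data structures like this; the Redis adapters need Lua scripts because `enqueue` must check the size limit and append in one atomic step across processes.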

## Observability

All strategies emit structured log events at `info` level:

| Event | Strategy | Data |
|-------|----------|------|
| `message-queued` | queue | threadId, messageId, queueDepth |
| `message-dequeued` | queue, debounce | threadId, messageId, skippedCount |
| `message-dropped` | drop, queue | threadId, messageId, reason |
| `message-expired` | queue, debounce | threadId, messageId |
| `message-superseded` | debounce | threadId, droppedId |
| `message-debouncing` | debounce | threadId, messageId, debounceMs |
| `message-debounce-reset` | debounce | threadId, messageId |
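If you consume these logs programmatically, the table above can be transcribed as a discriminated union. The exact payload field names are an assumption derived from the Data column, not a type the SDK is documented to export:

```typescript
type ConcurrencyLogEvent =
  | { event: "message-queued"; threadId: string; messageId: string; queueDepth: number }
  | { event: "message-dequeued"; threadId: string; messageId: string; skippedCount: number }
  | { event: "message-dropped"; threadId: string; messageId: string; reason: string }
  | { event: "message-expired"; threadId: string; messageId: string }
  | { event: "message-superseded"; threadId: string; droppedId: string }
  | { event: "message-debouncing"; threadId: string; messageId: string; debounceMs: number }
  | { event: "message-debounce-reset"; threadId: string; messageId: string };

// Narrowing on `event` gives type-safe access to each payload.
function describe(e: ConcurrencyLogEvent): string {
  switch (e.event) {
    case "message-queued":
      return `queued ${e.messageId} (depth ${e.queueDepth})`;
    case "message-superseded":
      return `superseded ${e.droppedId}`;
    default:
      return `${e.event}: ${e.threadId}`;
  }
}
```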

## Choosing a strategy

| Use case | Strategy | Why |
|----------|----------|-----|
| Simple bots, one-shot commands | `drop` | No complexity, no queue overhead |
| AI chatbots, customer support | `queue` | Never lose messages; handler sees full conversation context |
| WhatsApp/Telegram bots, rapid corrections | `debounce` | Users send many short messages in quick succession; wait for a pause before responding |
| Stateless lookups, translations | `concurrent` | Maximum throughput, no ordering needed |

## Backward compatibility

- The default strategy is `drop` — existing behavior is unchanged.
- The deprecated `onLockConflict` option continues to work but should be replaced with `concurrency`.
- Handler signatures are backward-compatible; the new `context` parameter is optional.
- Deduplication always runs regardless of strategy.
1 change: 1 addition & 0 deletions apps/docs/content/docs/meta.json
@@ -12,6 +12,7 @@
"adapters",
"state",
"---Features---",
"concurrency",
"...",
"error-handling",
"---Guides---",