Refactor Telegram turn handling for direct DO execution and faster follow-ups #104

Closed
punitarani wants to merge 5 commits into main from feat/improve-telegram-queue-handling

Conversation

@punitarani
Owner

@punitarani punitarani commented Mar 29, 2026

Summary

  • remove the dead Telegram queue ingress path and route normal Telegram messages directly through the Chat SDK, ConversationSession, and workflow
  • add adaptive turn batching, in-flight turn preservation, execution tokens, and atomic first-outbound claiming to handle pre-reply follow-ups safely
  • make Telegram workflow delivery token-aware to suppress stale or duplicate replies and update docs/runtime references to match the final architecture
  • expand durable object and workflow coverage for debounce behavior, supersession, and outbound gating
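As a rough illustration of what "adaptive turn batching" could look like, the sketch below pushes a debounce deadline out each time a message arrives but caps the total wait. The function name, config fields, and numbers are hypothetical and are not taken from this PR; the real helpers live in conversation-session-state.ts and may work differently.

```typescript
// Hypothetical sketch: none of these names or constants come from the PR.
interface DebounceConfig {
  quietWindowMs: number // how long to wait after the latest message
  maxWaitMs: number // hard cap measured from the first buffered message
}

// Returns the timestamp (ms since epoch) at which the buffered turn should start.
function nextDebounceDeadline(
  firstMessageAt: number,
  now: number,
  config: DebounceConfig,
): number {
  const extended = now + config.quietWindowMs
  const cap = firstMessageAt + config.maxWaitMs
  // Each new message pushes the deadline out, but never past the cap.
  return Math.min(extended, cap)
}
```

Under these assumptions, a burst of quick follow-ups is batched into one turn, while the cap keeps a chatty user from delaying the reply indefinitely.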

Testing

  • bun run --filter @amby/api test
  • bun run --filter @amby/api typecheck
  • bun run --filter @amby/channels test

Greptile Summary

This PR replaces the Cloudflare Queue-based Telegram ingress with a direct webhook→Chat SDK→ConversationSession DO path, and introduces a substantial new turn-management architecture: adaptive debounce deadlines, execution tokens for atomic first-outbound claiming, supersession detection for pre-reply corrections, and a TelegramDeliveryController abstraction that gates all outbound Telegram messages behind a DO-verified claim. The agent-execution workflow is also refactored to use the delivery controller and to explicitly avoid retrying the agent-loop step (preventing duplicate user-visible messages).

Key changes:

  • Queue removal (consumer.ts deleted, wrangler.toml queue bindings removed) — messages now ingested synchronously at webhook time via chat-sdk.ts
  • conversation-session-state.ts (new) — pure state-machine helpers for debounce scheduling, supersession, execution tokens, and outbound claiming, with good unit test coverage
  • conversation-session.ts — DO updated to use new state helpers; adds claimFirstOutbound RPC; P1 bug: completeExecution sets state to "debouncing" on rerun but never calls this.ctx.storage.setAlarm(), so follow-up messages queued during a running workflow are silently lost when the workflow finishes
  • telegram-delivery.ts (new) — delivery controller with suppression, streaming draft management, and first-outbound gating; well-tested
  • agent-execution.ts — retries removed from agent-loop step, execution token threaded through, blocked-user check moved from early webhook path into workflow
  • Legacy buffer migration removed — in-flight DOs with old-format buffer entries will no longer be migrated on hydration
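The execution-token and first-outbound-claim ideas listed above can be sketched as a pure check-and-set on session state. This is a minimal model under stated assumptions: the field names, the result shape, and the rejection reasons are hypothetical, and the PR's actual claimFirstOutbound RPC and ClaimFirstOutboundResult may differ.

```typescript
// Hypothetical model of token-gated first-outbound claiming.
// Field and function names are illustrative, not the PR's actual code.
interface SessionState {
  activeExecutionToken: string | null
  firstOutboundClaimed: boolean
  supersededAt: number | null
}

type ClaimResult =
  | { allowed: true }
  | { allowed: false; reason: "stale-token" | "superseded" | "already-claimed" }

function claimFirstOutboundState(state: SessionState, token: string): ClaimResult {
  // A workflow holding an old token must not speak for the current turn.
  if (state.activeExecutionToken !== token) {
    return { allowed: false, reason: "stale-token" }
  }
  // The user corrected themselves before any reply went out.
  if (state.supersededAt !== null) {
    return { allowed: false, reason: "superseded" }
  }
  if (state.firstOutboundClaimed) {
    return { allowed: false, reason: "already-claimed" }
  }
  state.firstOutboundClaimed = true
  return { allowed: true }
}
```

Because the check and the write happen in one synchronous function with no awaits in between, nothing can interleave between them, which is one plausible way to get the "atomic" property the summary describes.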

Confidence Score: 4/5

Not safe to merge until the missing alarm scheduling in completeExecution is fixed; without it, follow-up messages queued during an active workflow are silently dropped.

One P1 bug: completeExecution transitions state to debouncing via completeExecutionState but never calls this.ctx.storage.setAlarm(), so the rerun path is broken. All other findings are P2. The new architecture, state-machine extraction, and test coverage are well-designed; a single targeted fix unblocks merge.

apps/api/src/durable-objects/conversation-session.ts — completeExecution method needs to schedule the Cloudflare alarm when shouldRerun is true

Important Files Changed

| Filename | Overview |
| --- | --- |
| apps/api/src/durable-objects/conversation-session.ts | Core DO refactored to use new state helpers. P1 bug: completeExecution sets state to debouncing on rerun but never schedules the Cloudflare alarm, so follow-up messages after a workflow completes are silently dropped. |
| apps/api/src/durable-objects/conversation-session-state.ts | New file extracting all session state logic: adaptive debounce, supersession detection, and execution token state transitions. Contains a minor typo (SUPERSSESSION_PREFIXES) but the state machine logic is well-structured and thoroughly tested. |
| apps/api/src/workflows/telegram-delivery.ts | New delivery controller encapsulating first-outbound claiming, suppression, streaming, and finalisation; clean abstraction with good test coverage. |
| apps/api/src/workflows/agent-execution.ts | Refactored to use TelegramDeliveryController, removed retries from agent-loop step (intentional, to prevent duplicate messages), now passes executionToken to DO on completion. |
| apps/api/src/durable-objects/conversation-session.test.ts | Good new test coverage for debounce deadlines, supersession detection, in-flight state, and stale-token guards; all pure state-function tests that are fast and reliable. |
| apps/api/src/workflows/agent-execution.test.ts | New tests covering TelegramDeliveryController claim logic, suppression, stream-then-edit flow, and verifying AGENT_LOOP_STEP_OPTIONS has no retries. |
| packages/channels/src/telegram/chat-sdk.ts | Removed early blocked-user check; blocked users now detected inside the workflow instead of at the webhook ingestion layer, adding latency before the relink message is sent. |
| apps/api/src/queue/consumer.ts | File deleted: Cloudflare Queue ingress path fully removed; Telegram messages now route directly through the Chat SDK and ConversationSession DO. |

Sequence Diagram

sequenceDiagram
    participant TG as Telegram Webhook
    participant SDK as Chat SDK
    participant DO as ConversationSession DO
    participant WF as AgentExecutionWorkflow
    participant DLV as TelegramDeliveryController

    TG->>SDK: incoming message
    SDK->>DO: ingestMessage(payload)
    DO->>DO: buffer + scheduleDebounce (setAlarm)

    alt follow-up arrives while processing
        TG->>SDK: follow-up message
        SDK->>DO: ingestMessage(payload)
        DO->>DO: handleProcessingFollowUpState (may set supersededAt)
    end

    DO-->>DO: alarm fires
    DO->>DO: beginProcessingState with executionToken
    DO->>WF: create workflow(chatId, messages, executionToken)

    WF->>DO: claimFirstOutbound(executionToken)
    DO-->>WF: ClaimFirstOutboundResult

    WF->>DLV: flushStreamText / finalizeResponse
    DLV->>TG: post or edit message

    WF->>DO: completeExecution(executionToken, outcome)
    DO->>DO: completeExecutionState

    alt shouldRerun true
        Note over DO: state set to debouncing but setAlarm not called
    else shouldRerun false
        DO->>DO: status = idle
    end
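The workflow-side half of the diagram (claim first, then post or edit) can be sketched as a small decision function. This is only a model: the type and function names are hypothetical, the claim is shown as a synchronous callback for brevity whereas the real claimFirstOutbound is an RPC to the DO, and the actual TelegramDeliveryController may be structured differently.

```typescript
// Hypothetical sketch of the gating decision; names are illustrative only.
type OutboundAction = "post" | "edit" | "suppress"

interface DeliveryState {
  claimed: boolean // first outbound was granted by the DO
  suppressed: boolean // the DO refused the claim; stay silent from now on
  postedMessageId: number | null // id of the streaming draft message, if any
}

function decideOutbound(delivery: DeliveryState, claim: () => boolean): OutboundAction {
  if (delivery.suppressed) return "suppress"
  if (!delivery.claimed) {
    // Ask the DO exactly once, before anything becomes user-visible.
    if (!claim()) {
      delivery.suppressed = true
      return "suppress"
    }
    delivery.claimed = true
  }
  // The first visible output creates a message; later output edits it in place.
  return delivery.postedMessageId === null ? "post" : "edit"
}
```

In this model a refused claim is sticky: once suppressed, a stale workflow never produces user-visible output, even if it keeps streaming text.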

Comments Outside Diff (1)

  1. apps/api/src/durable-objects/conversation-session.ts, lines 298-307

    P1 Missing alarm scheduling on rerun path in completeExecution

    completeExecutionState calls scheduleDebounceState when shouldRerun is true, which sets state.status = "debouncing" and state.debounceDeadlineAt — but it only mutates in-memory state. The actual Cloudflare alarm (this.ctx.storage.setAlarm(deadline)) is never scheduled here.

    This means that when a workflow finishes and there are buffered follow-up messages, the DO sits in "debouncing" state forever without the alarm ever firing. Those messages are silently dropped.

    The alarm() handler also returns early for any non-"debouncing" status, so there's no fallback path.

    // completeExecution should schedule the Cloudflare alarm when shouldRerun is true:
    const result = completeExecutionState(this.state, input, Date.now())
    if (result.accepted && result.shouldRerun && this.state.debounceDeadlineAt !== null) {
        await this.ctx.storage.setAlarm(this.state.debounceDeadlineAt)
    }
    await this.persist()
    return result
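To make the failure mode concrete, here is a small in-memory model of the rerun path. Everything in it is a simplified stand-in: the `Model` suffixes, `FakeStorage`, the 500 ms debounce, and the `scheduleAlarm` flag are hypothetical, the signatures do not match the PR's real functions, and the real `setAlarm` is asynchronous.

```typescript
// Minimal in-memory model; a simplified stand-in, not the PR's code.
type SessionStatus = "idle" | "debouncing" | "processing"

interface ModelState {
  status: SessionStatus
  buffered: string[]
  debounceDeadlineAt: number | null
}

// Records the scheduled alarm so a test can inspect it.
class FakeStorage {
  alarmAt: number | null = null
  setAlarm(at: number): void {
    this.alarmAt = at
  }
}

// Pure transition: mutates in-memory state only, as the review describes.
function completeExecutionStateModel(model: ModelState, now: number, debounceMs: number) {
  const shouldRerun = model.buffered.length > 0
  if (shouldRerun) {
    model.status = "debouncing"
    model.debounceDeadlineAt = now + debounceMs
  } else {
    model.status = "idle"
    model.debounceDeadlineAt = null
  }
  return { accepted: true, shouldRerun }
}

function completeExecutionModel(
  model: ModelState,
  storage: FakeStorage,
  now: number,
  scheduleAlarm: boolean, // false reproduces the reported bug
) {
  const result = completeExecutionStateModel(model, now, 500)
  if (scheduleAlarm && result.shouldRerun && model.debounceDeadlineAt !== null) {
    storage.setAlarm(model.debounceDeadlineAt)
  }
  return result
}
```

With `scheduleAlarm` set to false, the model ends in "debouncing" with a deadline but no alarm, which matches the stuck state described above. With it set to true, the alarm lands on the same deadline the state transition computed.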

Reviews (1): Last reviewed commit: "Refresh Telegram docs and remove exec pl..."

Greptile also left 2 inline comments on this PR.

@punitarani punitarani marked this pull request as ready for review March 29, 2026 02:01
@chatgpt-codex-connector

You have reached your Codex usage limits for code reviews. You can see your limits in the Codex usage dashboard.
To continue using code reviews, you can upgrade your account or add credits to your account and enable them for code reviews in your settings.

"sorry",
"i meant",
"ignore that",
"correction",
Contributor

P2 Typo in constant name: SUPERSSESSION_PREFIXES

SUPERSSESSION has an extra S after SUPER; the constant should be named SUPERSESSION_PREFIXES.

Suggested change
"correction",
const SUPERSESSION_PREFIXES = [
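For context, a prefix list like this is typically used to flag a follow-up as a correction of the previous message. The sketch below shows one way that check might look. The helper name and the matching rule are assumptions, and only the four phrases visible in the diff context above are included; the real list may be longer.

```typescript
// Hypothetical helper; only the four phrases below appear in the review's diff context.
const SUPERSESSION_PREFIXES = ["sorry", "i meant", "ignore that", "correction"]

// True when a follow-up looks like a correction of the user's previous message.
function looksLikeSupersession(text: string): boolean {
  const normalized = text.trim().toLowerCase()
  return SUPERSESSION_PREFIXES.some((prefix) => normalized.startsWith(prefix))
}
```

A plain prefix match is crude (it would also match a word such as "sorryful"), so a real implementation might require a word boundary after the prefix.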

punitarani and others added 2 commits March 28, 2026 19:23
Resolve conflict in packages/env/src/workers.ts: remove TELEGRAM_QUEUE
and TELEGRAM_DLQ bindings (queue ingress replaced by direct DO path)
and deduplicate CLOUDFLARE_AI_GATEWAY_ID / ATTACHMENTS_BUCKET declarations.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@punitarani punitarani closed this Mar 30, 2026