From e06c2b961a460e34d48f25245edac67f17b0a461 Mon Sep 17 00:00:00 2001 From: Punit Arani Date: Sun, 29 Mar 2026 13:35:40 -0700 Subject: [PATCH] Rename Amby worker entities and bindings --- apps/api/src/durable-objects/chat-state.ts | 6 +-- .../durable-objects/conversation-session.ts | 22 +++++----- apps/api/src/queue/consumer.ts | 10 ++--- apps/api/src/worker-surface.test.ts | 44 +++++++++++++++++++ apps/api/src/worker.ts | 40 +++++------------ apps/api/src/workflows/agent-execution.ts | 6 +-- apps/api/src/workflows/sandbox-provision.ts | 8 ++-- apps/api/src/workflows/volume-provision.ts | 6 +-- apps/api/wrangler.toml | 32 +++++++++----- docs/RUNTIME.md | 16 +++---- docs/channels/telegram.md | 36 +++++++-------- docs/chat/attachments.md | 18 ++++---- packages/attachments/src/service.test.ts | 4 +- packages/channels/src/telegram/chat-sdk.ts | 8 ++-- packages/channels/src/telegram/utils.ts | 4 +- packages/computer/src/sandbox/service.ts | 2 +- packages/env/src/shared.ts | 4 +- packages/env/src/workers.ts | 12 ++--- 18 files changed, 155 insertions(+), 123 deletions(-) create mode 100644 apps/api/src/worker-surface.test.ts diff --git a/apps/api/src/durable-objects/chat-state.ts b/apps/api/src/durable-objects/chat-state.ts index ad87a92..d1eecda 100644 --- a/apps/api/src/durable-objects/chat-state.ts +++ b/apps/api/src/durable-objects/chat-state.ts @@ -68,7 +68,7 @@ function readQueueExpiry(raw: string) { } } -export class ChatStateDO extends DurableObject { +export class AmbyChatState extends DurableObject { private readonly sql: DurableObjectStorage["sql"] constructor(ctx: DurableObjectState, env: TEnv) { @@ -404,7 +404,7 @@ export class ChatStateDO extends DurableObject { await this.ctx.storage.setAlarm(next) } } catch (err) { - console.error("ChatStateDO alarm failed, rescheduling:", err) + console.error("amby_ChatState alarm failed, rescheduling:", err) await this.ctx.storage.setAlarm(Date.now() + 30_000) } } @@ -433,7 +433,7 @@ export class ChatStateDO extends DurableObject { const next = this.nextExpiry() if (next !== null) { this.ctx.storage.setAlarm(next).catch((err) => { - console.error("ChatStateDO failed to schedule cleanup alarm:", err) + console.error("amby_ChatState failed to schedule cleanup alarm:", err) }) } } diff --git a/apps/api/src/durable-objects/conversation-session.ts b/apps/api/src/durable-objects/conversation-session.ts index d0e35c8..5ef2e3d 100644 --- a/apps/api/src/durable-objects/conversation-session.ts +++ b/apps/api/src/durable-objects/conversation-session.ts @@ -24,7 +24,7 @@ interface IngestPayload { const DEBOUNCE_MS = 3000 const ACTIVE_DEBOUNCE_MS = 1000 -export class ConversationSession extends DurableObject { +export class AmbyConversation extends DurableObject { private state: SessionState = { status: "idle", userId: null, @@ -96,7 +96,7 @@ export class ConversationSession extends DurableObject { async ingestMessage(payload: IngestPayload): Promise { await this.hydrate() setTelegramScope({ - component: "conversation-session.ingest", + component: "amby_conversation.ingest", chatId: payload.chatId, from: payload.from, userId: this.state.userId, @@ -133,13 +133,13 @@ export class ConversationSession extends DurableObject { if (this.state.status === "processing") { // Agent is already running — forward as interrupt to the active workflow - if (this.state.activeWorkflowId && this.env.AGENT_WORKFLOW) { + if (this.state.activeWorkflowId && this.env.AMBY_AGENT_EXECUTION) { try { - const instance = await this.env.AGENT_WORKFLOW.get(this.state.activeWorkflowId) + const instance = await this.env.AMBY_AGENT_EXECUTION.get(this.state.activeWorkflowId) await Sentry.startSpan( { op: "workflow.event", - name: "AgentExecutionWorkflow.sendEvent", + name: "amby_AgentExecution.sendEvent", }, async () => { await instance.sendEvent({ @@ -168,7 +168,7 @@ export class ConversationSession extends DurableObject { const pendingFrom = await this.ctx.storage.get("pendingFrom") if (this.state.chatId) { setTelegramScope({ - component: "conversation-session.alarm", + component: "amby_conversation.alarm", chatId: this.state.chatId, from: pendingFrom ?? null, userId: this.state.userId, @@ -179,7 +179,7 @@ export class ConversationSession extends DurableObject { }, }) } else { - setWorkerScope("conversation-session.alarm", { + setWorkerScope("amby_conversation.alarm", { buffered_message_count: this.state.buffer.length, session_status: this.state.status, }) @@ -205,13 +205,13 @@ export class ConversationSession extends DurableObject { this.state.status = "processing" // Launch the workflow - const workflow = this.env.AGENT_WORKFLOW + const workflow = this.env.AMBY_AGENT_EXECUTION if (workflow) { try { const instance = await Sentry.startSpan( { op: "workflow.start", - name: "AgentExecutionWorkflow.create", + name: "amby_AgentExecution.create", }, () => workflow.create({ @@ -239,7 +239,7 @@ export class ConversationSession extends DurableObject { await this.ctx.storage.setAlarm(Date.now() + 5000) } } else { - console.error("[DO] AGENT_WORKFLOW binding not available") + console.error("[DO] AMBY_AGENT_EXECUTION binding not available") this.state.status = "idle" } @@ -249,7 +249,7 @@ export class ConversationSession extends DurableObject { async completeExecution(result: { userId?: string; conversationId?: string }): Promise { await this.hydrate() setTelegramScope({ - component: "conversation-session.complete", + component: "amby_conversation.complete", chatId: this.state.chatId ?? undefined, userId: result.userId ?? this.state.userId, conversationId: result.conversationId ?? this.state.conversationId, diff --git a/apps/api/src/queue/consumer.ts b/apps/api/src/queue/consumer.ts index 87a8baf..b6a301f 100644 --- a/apps/api/src/queue/consumer.ts +++ b/apps/api/src/queue/consumer.ts @@ -60,7 +60,7 @@ export async function handleQueueBatch( try { await runtime.runPromise( handleCommand(parsedCommand, from, chatId, { - sandboxWorkflow: env.SANDBOX_WORKFLOW, + sandboxWorkflow: env.AMBY_SANDBOX_PROVISION, }), ) Sentry.logger.info("Telegram command processed", { @@ -83,10 +83,10 @@ export async function handleQueueBatch( if (!bufferedMessage) { return } - // Route supported Telegram messages to ConversationSession Durable Object - const doBinding = env.CONVERSATION_SESSION + // Route supported Telegram messages to the amby_Conversation Durable Object. + const doBinding = env.AMBY_CONVERSATION if (!doBinding) { - console.error("[Queue] CONVERSATION_SESSION binding not available") + console.error("[Queue] AMBY_CONVERSATION binding not available") return } @@ -96,7 +96,7 @@ export async function handleQueueBatch( await Sentry.startSpan( { op: "durable-object.rpc", - name: "ConversationSession.ingestMessage", + name: "amby_Conversation.ingestMessage", }, async () => { await stub.ingestMessage({ diff --git a/apps/api/src/worker-surface.test.ts b/apps/api/src/worker-surface.test.ts new file mode 100644 index 0000000..1fbd911 --- /dev/null +++ b/apps/api/src/worker-surface.test.ts @@ -0,0 +1,44 @@ +import { describe, expect, it } from "bun:test" + +async function readText(relativePath: string) { + return await Bun.file(new URL(relativePath, import.meta.url)).text() +} + +describe("Cloudflare worker surface", () => { + it("exports the renamed Durable Objects and Workflows", async () => { + const source = await readText("./worker.ts") + + for (const exportName of [ + "AmbyConversation", + "AmbyChatState", + "AmbyAgentExecution", + "AmbySandboxProvision", + "AmbyVolumeProvision", + ]) { + expect(source).toContain(exportName) + } + }) + + it("keeps Wrangler config aligned with the renamed worker entities", async () => { + const source = await readText("../wrangler.toml") + + expect(source).toContain('{ name = "AMBY_CONVERSATION", class_name = "AmbyConversation" }') + expect(source).toContain('{ name = "AMBY_CHAT_STATE", class_name = "AmbyChatState" }') + expect(source).toContain('binding = "AMBY_AGENT_EXECUTION"') + expect(source).toContain('class_name = "AmbyAgentExecution"') + expect(source).toContain('binding = "AMBY_SANDBOX_PROVISION"') + expect(source).toContain('class_name = "AmbySandboxProvision"') + expect(source).toContain('binding = "AMBY_VOLUME_PROVISION"') + expect(source).toContain('class_name = "AmbyVolumeProvision"') + expect(source).toContain('tag = "v3"') + expect(source).toContain('new_classes = ["AmbyConversation"]') + expect(source).toContain('tag = "v4"') + expect(source).toContain('new_sqlite_classes = ["AmbyChatState"]') + + expect(source).not.toContain('{ name = "CONVERSATION_SESSION", class_name = "ConversationSession" }') + expect(source).not.toContain('{ name = "CHAT_STATE", class_name = "ChatStateDO" }') + expect(source).not.toContain('binding = "AGENT_WORKFLOW"') + expect(source).not.toContain('binding = "SANDBOX_WORKFLOW"') + expect(source).not.toContain('binding = "VOLUME_WORKFLOW"') + }) +}) diff --git a/apps/api/src/worker.ts b/apps/api/src/worker.ts index fea003a..f435bd3 100644 --- a/apps/api/src/worker.ts +++ b/apps/api/src/worker.ts @@ -21,42 +21,22 @@ import { createCloudflareChatState, } from "./chat-state/cloudflare-chat-state" import { handleExpiredConnectedAccount } from "./composio/expired-account" -import { ChatStateDO as ChatStateDOBase } from "./durable-objects/chat-state" -import { ConversationSession as ConversationSessionBase } from "./durable-objects/conversation-session" +import { AmbyChatState } from "./durable-objects/chat-state" +import { AmbyConversation } from "./durable-objects/conversation-session" import { handleScheduledReconciliation } from "./handlers/reconciliation" import { handleTaskEventPost } from "./handlers/task-events" import { getHomeResponse } from "./home" import { handleQueueBatch } from "./queue/consumer" import { makeAgentRuntimeForConsumer, makeRuntimeForConsumer } from "./queue/runtime" -import { getSentryOptions, getSentryOptionsOrFallback, setTelegramScope } from "./sentry" -import { AgentExecutionWorkflow as AgentExecutionWorkflowBase } from "./workflows/agent-execution" -import { SandboxProvisionWorkflow as SandboxProvisionWorkflowBase } from "./workflows/sandbox-provision" -import { VolumeProvisionWorkflow as VolumeProvisionWorkflowBase } from "./workflows/volume-provision" - -// Re-export instrumented Durable Object and Workflow classes so Cloudflare can discover them -export const ConversationSession = Sentry.instrumentDurableObjectWithSentry( - getSentryOptionsOrFallback, - ConversationSessionBase, -) -export const ChatStateDO = Sentry.instrumentDurableObjectWithSentry( - getSentryOptionsOrFallback, - ChatStateDOBase, -) -export const AgentExecutionWorkflow = Sentry.instrumentWorkflowWithSentry( - getSentryOptionsOrFallback, - AgentExecutionWorkflowBase, -) -export const SandboxProvisionWorkflow = Sentry.instrumentWorkflowWithSentry( - getSentryOptionsOrFallback, - SandboxProvisionWorkflowBase, -) -export const VolumeProvisionWorkflow = Sentry.instrumentWorkflowWithSentry( - getSentryOptionsOrFallback, - VolumeProvisionWorkflowBase, -) +import { getSentryOptions, setTelegramScope } from "./sentry" +import { AmbyAgentExecution } from "./workflows/agent-execution" +import { AmbySandboxProvision } from "./workflows/sandbox-provision" +import { AmbyVolumeProvision } from "./workflows/volume-provision" + +export { AmbyConversation, AmbyChatState, AmbyAgentExecution, AmbySandboxProvision, AmbyVolumeProvision } type ApiBindings = WorkerBindings & { - CHAT_STATE: ChatStateNamespaceLike + AMBY_CHAT_STATE: ChatStateNamespaceLike } type Env = { Bindings: ApiBindings; Variables: { posthogDistinctId?: string } } @@ -213,7 +193,7 @@ function getOrCreateWorkerChatState(env: ApiBindings) { // Intentionally route all Chat SDK transport state through one unsharded DO instance for now. // Shard by adapter name once webhook throughput or lock contention justifies it. // connect() is not called here — the Chat SDK calls state.connect() during init. - workerChatState = createCloudflareChatState({ namespace: env.CHAT_STATE }) + workerChatState = createCloudflareChatState({ namespace: env.AMBY_CHAT_STATE }) return workerChatState } diff --git a/apps/api/src/workflows/agent-execution.ts b/apps/api/src/workflows/agent-execution.ts index 8e6ce72..f1a2de8 100644 --- a/apps/api/src/workflows/agent-execution.ts +++ b/apps/api/src/workflows/agent-execution.ts @@ -20,7 +20,7 @@ export interface AgentExecutionParams { parentContext?: string } -export class AgentExecutionWorkflow extends WorkflowEntrypoint< +export class AmbyAgentExecution extends WorkflowEntrypoint< WorkerBindings, AgentExecutionParams > { @@ -29,7 +29,7 @@ export class AgentExecutionWorkflow extends WorkflowEntrypoint< let { userId, conversationId } = event.payload setTelegramScope({ - component: "workflow.agent_execution", + component: "workflow.amby_agent_execution", chatId, from, userId, @@ -299,7 +299,7 @@ export class AgentExecutionWorkflow extends WorkflowEntrypoint< userId: string | null, conversationId: string | null | undefined, ) { - const doBinding = this.env.CONVERSATION_SESSION + const doBinding = this.env.AMBY_CONVERSATION if (isSubAgent || !doBinding) return await step.do("complete", async () => { diff --git a/apps/api/src/workflows/sandbox-provision.ts b/apps/api/src/workflows/sandbox-provision.ts index 2d4e760..ac98382 100644 --- a/apps/api/src/workflows/sandbox-provision.ts +++ b/apps/api/src/workflows/sandbox-provision.ts @@ -28,13 +28,13 @@ export interface SandboxProvisionParams { userId: string } -export class SandboxProvisionWorkflow extends WorkflowEntrypoint< +export class AmbySandboxProvision extends WorkflowEntrypoint< WorkerBindings, SandboxProvisionParams > { async run(event: WorkflowEvent, step: WorkflowStep) { const { userId } = event.payload - const scope = setWorkerScope("workflow.sandbox_provision", { + const scope = setWorkerScope("workflow.amby_sandbox_provision", { workflow_instance_id: event.instanceId, user_id: userId, }) @@ -132,9 +132,9 @@ export class SandboxProvisionWorkflow extends WorkflowEntrypoint< if (volumeRow.status !== "ready") { await upsertMainSandbox(null, "volume_creating", volumeRow.id) - const volumeWorkflow = env.VOLUME_WORKFLOW + const volumeWorkflow = env.AMBY_VOLUME_PROVISION if (!volumeWorkflow) { - throw new Error("VOLUME_WORKFLOW binding is not configured.") + throw new Error("AMBY_VOLUME_PROVISION binding is not configured.") } await step.do("start-volume-workflow", async () => { diff --git a/apps/api/src/workflows/volume-provision.ts b/apps/api/src/workflows/volume-provision.ts index cde4010..9e57995 100644 --- a/apps/api/src/workflows/volume-provision.ts +++ b/apps/api/src/workflows/volume-provision.ts @@ -29,13 +29,13 @@ export interface VolumeProvisionResult { status: "creating" | "ready" | "error" | "deleted" } -export class VolumeProvisionWorkflow extends WorkflowEntrypoint< +export class AmbyVolumeProvision extends WorkflowEntrypoint< WorkerBindings, VolumeProvisionParams > { async run(event: WorkflowEvent, step: WorkflowStep) { const { userId, parentWorkflowId } = event.payload - const scope = setWorkerScope("workflow.volume_provision", { + const scope = setWorkerScope("workflow.amby_volume_provision", { workflow_instance_id: event.instanceId, user_id: userId, }) @@ -131,7 +131,7 @@ export class VolumeProvisionWorkflow extends WorkflowEntrypoint< } } - const sandboxWorkflow = env.SANDBOX_WORKFLOW + const sandboxWorkflow = env.AMBY_SANDBOX_PROVISION if (parentWorkflowId && sandboxWorkflow) { await step.do( "notify-parent", diff --git a/apps/api/wrangler.toml b/apps/api/wrangler.toml index bd05a5c..f3ea128 100644 --- a/apps/api/wrangler.toml +++ b/apps/api/wrangler.toml @@ -108,11 +108,11 @@ dead_letter_queue = "telegram-dlq" binding = "TELEGRAM_DLQ" queue = "telegram-dlq" -# --- Durable Object: ConversationSession --- +# --- Durable Objects: Amby runtime state --- [durable_objects] bindings = [ - { name = "CONVERSATION_SESSION", class_name = "ConversationSession" }, - { name = "CHAT_STATE", class_name = "ChatStateDO" } + { name = "AMBY_CONVERSATION", class_name = "AmbyConversation" }, + { name = "AMBY_CHAT_STATE", class_name = "AmbyChatState" } ] [[migrations]] @@ -123,21 +123,29 @@ new_classes = ["ConversationSession"] tag = "v2" new_sqlite_classes = ["ChatStateDO"] +[[migrations]] +tag = "v3" +new_classes = ["AmbyConversation"] + +[[migrations]] +tag = "v4" +new_sqlite_classes = ["AmbyChatState"] + # --- Workflow: AgentExecution --- [[workflows]] -name = "agent-execution" -binding = "AGENT_WORKFLOW" -class_name = "AgentExecutionWorkflow" +name = "amby-agent-execution" +binding = "AMBY_AGENT_EXECUTION" +class_name = "AmbyAgentExecution" [[workflows]] -name = "sandbox-provision" -binding = "SANDBOX_WORKFLOW" -class_name = "SandboxProvisionWorkflow" +name = "amby-sandbox-provision" +binding = "AMBY_SANDBOX_PROVISION" +class_name = "AmbySandboxProvision" [[workflows]] -name = "volume-provision" -binding = "VOLUME_WORKFLOW" -class_name = "VolumeProvisionWorkflow" +name = "amby-volume-provision" +binding = "AMBY_VOLUME_PROVISION" +class_name = "AmbyVolumeProvision" # Task reconciliation + Telegram completion notifications [triggers] diff --git a/docs/RUNTIME.md b/docs/RUNTIME.md index 168bce0..78ce64d 100644 --- a/docs/RUNTIME.md +++ b/docs/RUNTIME.md @@ -9,8 +9,8 @@ sequenceDiagram participant TG as Telegram participant WH as Webhook (Hono) participant Q as Queue - participant DO as ConversationSession DO - participant WF as AgentExecutionWorkflow + participant DO as "AmbyConversation DO" + participant WF as AmbyAgentExecution participant AG as AgentService participant TG2 as Telegram @@ -31,7 +31,7 @@ sequenceDiagram WF->>DO: completeExecution(userId, conversationId) ``` -## Durable Object: ConversationSession +## Durable Object: AmbyConversation One instance per Telegram chat. Provides: @@ -42,7 +42,7 @@ One instance per Telegram chat. Provides: States: `idle` -> `debouncing` -> `processing` -> `idle` -## AgentExecutionWorkflow +## AmbyAgentExecution Cloudflare Workflow with durable steps and retry: @@ -186,13 +186,13 @@ Two additional Cloudflare Workflows handle compute provisioning: | Workflow | Responsibility | |----------|---------------| -| `SandboxProvisionWorkflow` | Ensure user has a valid main sandbox on correct volume | -| `VolumeProvisionWorkflow` | Ensure per-user persistent volume exists and is ready | +| `AmbySandboxProvision` | Ensure user has a valid main sandbox on correct volume | +| `AmbyVolumeProvision` | Ensure per-user persistent volume exists and is ready | ```mermaid flowchart LR - AgentWF[AgentExecutionWorkflow] -->|needs sandbox| SandboxWF[SandboxProvisionWorkflow] - SandboxWF -->|needs volume| VolumeWF[VolumeProvisionWorkflow] + AgentWF[AmbyAgentExecution] -->|needs sandbox| SandboxWF[AmbySandboxProvision] + SandboxWF -->|needs volume| VolumeWF[AmbyVolumeProvision] ``` Stale/invalid sandboxes and unusable volumes are replaced automatically. diff --git a/docs/channels/telegram.md b/docs/channels/telegram.md index a950fe9..088a87b 100644 --- a/docs/channels/telegram.md +++ b/docs/channels/telegram.md @@ -39,7 +39,7 @@ Telegram logic is not responsible for: | Runtime | File | When it is used | Core difference | |---|---|---|---| -| Cloudflare Worker | `apps/api/src/worker.ts` | production-style runtime and `wrangler dev` | uses `ChatStateDO`, `ConversationSession`, and `AgentExecutionWorkflow` | +| Cloudflare Worker | `apps/api/src/worker.ts` | production-style runtime and `wrangler dev` | uses `AmbyChatState`, `AmbyConversation`, and `AmbyAgentExecution` | | Bun | `apps/api/src/index.ts` | local Bun-only development | uses `createAmbyBot()` and in-memory Chat SDK state | ## Production Worker flow @@ -49,9 +49,9 @@ sequenceDiagram participant TG as Telegram participant WH as worker.ts POST /telegram/webhook participant SDK as Chat SDK gateway - participant State as ChatStateDO - participant Sess as ConversationSession DO - participant WF as AgentExecutionWorkflow + participant State as AmbyChatState + participant Sess as "AmbyConversation DO" + participant WF as AmbyAgentExecution participant Attach as "@amby/attachments" participant Agent as ConversationRuntime participant Send as ReplySender @@ -89,7 +89,7 @@ sequenceDiagram That route does three things: 1. creates or reuses the Worker Chat SDK singleton via `getOrCreateChat(...)` -2. injects the Worker-only Chat SDK state adapter backed by `ChatStateDO` +2. injects the Worker-only Chat SDK state adapter backed by `AmbyChatState` 3. hands the raw request to `chat.webhooks.telegram(...)` At this point the raw HTTP request leaves Hono and enters the Chat SDK plus `@chat-adapter/telegram`. @@ -106,31 +106,31 @@ Important responsibilities here: - ignore messages authored by the bot itself - route accepted messages into `routeIncomingMessage(...)` -The Worker path uses a dedicated `ChatStateDO` through `createCloudflareChatState(...)`. +The Worker path uses a dedicated `AmbyChatState` through `createCloudflareChatState(...)`. ### 3. Chat SDK state ownership -`ChatStateDO` is not the business workflow state. It only stores Chat SDK transport state. +`AmbyChatState` is not the business workflow state. It only stores Chat SDK transport state. ```mermaid flowchart LR Update[Telegram update] --> Chat[Chat SDK] - Chat --> State[ChatStateDO] + Chat --> State[AmbyChatState] Chat --> Route[routeIncomingMessage] Route --> Command[handleCommand] - Route --> Session[ConversationSession DO] - Session --> Workflow[AgentExecutionWorkflow] + Route --> Session["AmbyConversation DO"] + Session --> Workflow[AmbyAgentExecution] Workflow --> Runtime[ConversationRuntime] ``` -`ChatStateDO` owns: +`AmbyChatState` owns: - thread subscriptions - thread locks - small cached values and dedupe keys - Chat SDK list/history storage -`ConversationSession` owns: +`AmbyConversation` owns: - the per-chat message buffer - debounce timing @@ -171,7 +171,7 @@ Commands do not go through the buffering Durable Object or the agent workflow. #### Normal message path -If the message is not a command and can be normalized into a `BufferedInboundMessage`, `routeIncomingMessage(...)` forwards it to `ConversationSession.ingestMessage(...)` using one Durable Object instance per Telegram chat id. +If the message is not a command and can be normalized into a `BufferedInboundMessage`, `routeIncomingMessage(...)` forwards it to `AmbyConversation.ingestMessage(...)` using one Durable Object instance per Telegram chat id. For v1 this includes: @@ -209,7 +209,7 @@ Key behavior: ### 6. Durable workflow execution -When the debounce alarm fires, `ConversationSession` starts `AgentExecutionWorkflow`. +When the debounce alarm fires, `AmbyConversation` starts `AmbyAgentExecution`. `apps/api/src/workflows/agent-execution.ts` then: @@ -221,7 +221,7 @@ When the debounce alarm fires, `ConversationSession` starts `AgentExecutionWorkf 6. streams interim output by posting once and then editing every 500ms 7. sends final reply parts through `ReplySender`, using native Telegram photo/document delivery when possible 8. falls back to signed Amby download links for unsupported outbound files -9. tells `ConversationSession` that execution finished +9. tells `AmbyConversation` that execution finished The workflow is where Telegram delivery, attachment ingest, and agent execution meet. @@ -286,9 +286,9 @@ Differences from the Worker path: - uses `createAmbyBot()` in `packages/channels/src/telegram/bot.ts` - keeps Chat SDK state in memory with `createMemoryState()` -- does not use `ChatStateDO` -- does not use `ConversationSession` -- does not use `AgentExecutionWorkflow` +- does not use `AmbyChatState` +- does not use `AmbyConversation` +- does not use `AmbyAgentExecution` This path is for local development convenience, not for production durability. diff --git a/docs/chat/attachments.md b/docs/chat/attachments.md index b218f90..ad799b9 100644 --- a/docs/chat/attachments.md +++ b/docs/chat/attachments.md @@ -41,8 +41,8 @@ The attachment pipeline is split across a few clear boundaries. ```mermaid flowchart LR - CH["Channel adapter"] --> DO["ConversationSession DO"] - DO --> WF["AgentExecutionWorkflow"] + CH["Channel adapter"] --> DO["AmbyConversation DO"] + DO --> WF["AmbyAgentExecution"] WF --> ATT["@amby/attachments"] ATT --> DB["Postgres"] ATT --> BLOB["R2 or local blob store"] @@ -59,11 +59,11 @@ Responsibilities: - parse raw transport payloads - build `BufferedInboundMessage` - preserve transport-specific source metadata -- `ConversationSession`: +- `AmbyConversation`: - debounce bursts of input - merge media groups when the channel provides them - hand buffered messages to the workflow -- `AgentExecutionWorkflow`: +- `AmbyAgentExecution`: - resolve the user and conversation - call attachment ingest at workflow time - pass structured current-turn input to the agent @@ -142,8 +142,8 @@ The `attachments` table stores canonical metadata: ```mermaid sequenceDiagram participant CH as Channel adapter - participant DO as ConversationSession - participant WF as AgentExecutionWorkflow + participant DO as AmbyConversation + participant WF as AmbyAgentExecution participant ATT as "@amby/attachments" participant DB as Postgres participant BLOB as R2 or local store @@ -187,7 +187,7 @@ The current implementation uses Telegram-specific source metadata, but the buffe Attachment bytes are downloaded and stored only after user and conversation resolution. -That happens inside `AgentExecutionWorkflow` by calling: +That happens inside `AmbyAgentExecution` by calling: ```ts AttachmentService.ingestBufferedMessages(...) @@ -349,7 +349,7 @@ Replies are also part-based. ```mermaid sequenceDiagram participant AG as ConversationRuntime - participant WF as AgentExecutionWorkflow + participant WF as AmbyAgentExecution participant SEND as ReplySender participant ATT as "@amby/attachments" participant CH as Channel transport @@ -393,7 +393,7 @@ This split is intentional: ## Backward Compatibility -`ConversationSession` migrates legacy buffer entries on hydrate. Existing Durable Objects that stored the pre-attachment `{ text, messageId, date }` shape are transparently converted to the new `BufferedInboundMessage` format the first time they load. +`AmbyConversation` migrates legacy buffer entries on hydrate. Existing Durable Objects that stored the pre-attachment `{ text, messageId, date }` shape are transparently converted to the new `BufferedInboundMessage` format the first time they load. `ReplyDraftHandle` carries optional `chunkIds` for multi-chunk Telegram messages. The streaming preview is capped at 4090 characters to avoid Telegram `editMessageText` failures. On finalization, the streaming draft is always deleted and the final response is posted fresh, which handles splitting naturally. diff --git a/packages/attachments/src/service.test.ts b/packages/attachments/src/service.test.ts index 3a6cc7d..49da3cd 100644 --- a/packages/attachments/src/service.test.ts +++ b/packages/attachments/src/service.test.ts @@ -52,8 +52,8 @@ const TEST_ENV: Env = { BRAINTRUST_PROJECT_ID: "", POSTHOG_KEY: "", POSTHOG_HOST: "", - SANDBOX_WORKFLOW: undefined, - VOLUME_WORKFLOW: undefined, + AMBY_SANDBOX_PROVISION: undefined, + AMBY_VOLUME_PROVISION: undefined, } function unexpectedCall(label: string) { diff --git a/packages/channels/src/telegram/chat-sdk.ts b/packages/channels/src/telegram/chat-sdk.ts index a681149..0d24004 100644 --- a/packages/channels/src/telegram/chat-sdk.ts +++ b/packages/channels/src/telegram/chat-sdk.ts @@ -95,7 +95,7 @@ async function routeIncomingMessage( try { await runtime.runPromise( handleCommand(parsedCommand, from, chatId, { - sandboxWorkflow: env.SANDBOX_WORKFLOW, + sandboxWorkflow: env.AMBY_SANDBOX_PROVISION, }).pipe( Effect.catchAllCause((cause) => Effect.sync(() => { @@ -126,10 +126,10 @@ async function routeIncomingMessage( await identityRuntime.dispose() } - // Text messages: route to ConversationSession DO for debouncing - const doBinding = env.CONVERSATION_SESSION + // Text messages: route to the amby_Conversation DO for debouncing. + const doBinding = env.AMBY_CONVERSATION if (!doBinding) { - console.error("[ChatSDK] CONVERSATION_SESSION binding not available") + console.error("[ChatSDK] AMBY_CONVERSATION binding not available") return } diff --git a/packages/channels/src/telegram/utils.ts b/packages/channels/src/telegram/utils.ts index 6250303..7abb313 100644 --- a/packages/channels/src/telegram/utils.ts +++ b/packages/channels/src/telegram/utils.ts @@ -285,7 +285,7 @@ const startTelegramSession = ( userId: string, from: TelegramFrom, chatId: number, - options?: { sandboxWorkflow?: WorkerBindings["SANDBOX_WORKFLOW"] }, + options?: { sandboxWorkflow?: WorkerBindings["AMBY_SANDBOX_PROVISION"] }, ) => Effect.gen(function* () { const env = yield* EnvService @@ -414,7 +414,7 @@ export const handleCommand = ( command: ParsedTelegramCommand, from: TelegramFrom, chatId: number, - options?: { sandboxWorkflow?: WorkerBindings["SANDBOX_WORKFLOW"] }, + options?: { sandboxWorkflow?: WorkerBindings["AMBY_SANDBOX_PROVISION"] }, ) => Effect.gen(function* () { const sender = yield* TelegramSender diff --git a/packages/computer/src/sandbox/service.ts b/packages/computer/src/sandbox/service.ts index f9af85c..ea05f42 100644 --- a/packages/computer/src/sandbox/service.ts +++ b/packages/computer/src/sandbox/service.ts @@ -64,7 +64,7 @@ export const SandboxServiceLive = Layer.effect( const kickOffSandboxProvision = (userId: string) => Effect.tryPromise({ try: async () => { - const workflow = env.SANDBOX_WORKFLOW + const workflow = env.AMBY_SANDBOX_PROVISION if (!workflow) return await kickOffSandboxProvisionIfNeeded(computeStore, userId, () => workflow.create({ id: sandboxWorkflowId(userId), params: { userId } }), diff --git a/packages/env/src/shared.ts b/packages/env/src/shared.ts index 3dfcbf1..3f59de9 100644 --- a/packages/env/src/shared.ts +++ b/packages/env/src/shared.ts @@ -104,8 +104,8 @@ export interface Env { readonly POSTHOG_HOST: string // Workflows - readonly SANDBOX_WORKFLOW?: WorkflowBinding<{ userId: string }> - readonly VOLUME_WORKFLOW?: WorkflowBinding<{ userId: string; parentWorkflowId?: string }> + readonly AMBY_SANDBOX_PROVISION?: WorkflowBinding<{ userId: string }> + readonly AMBY_VOLUME_PROVISION?: WorkflowBinding<{ userId: string; parentWorkflowId?: string }> } export const DEFAULT_TELEGRAM_BOT_USERNAME = "my_amby_bot" diff --git a/packages/env/src/workers.ts b/packages/env/src/workers.ts index 2984871..97c9922 100644 --- a/packages/env/src/workers.ts +++ b/packages/env/src/workers.ts @@ -82,13 +82,13 @@ export interface WorkerBindings { AI?: unknown TELEGRAM_QUEUE?: { send(body: unknown, options?: { contentType?: string }): Promise } TELEGRAM_DLQ?: { send(body: unknown, options?: { contentType?: string }): Promise } - CONVERSATION_SESSION?: { + AMBY_CONVERSATION?: { idFromName(name: string): { toString(): string } get(id: { toString(): string }): { ingestMessage(msg: unknown): Promise } } - AGENT_WORKFLOW?: WorkflowBinding - SANDBOX_WORKFLOW?: WorkflowBinding<{ userId: string }> - VOLUME_WORKFLOW?: WorkflowBinding<{ userId: string; parentWorkflowId?: string }> + AMBY_AGENT_EXECUTION?: WorkflowBinding + AMBY_SANDBOX_PROVISION?: WorkflowBinding<{ userId: string }> + AMBY_VOLUME_PROVISION?: WorkflowBinding<{ userId: string; parentWorkflowId?: string }> } export const makeEnvServiceFromBindings = (bindings: WorkerBindings) => @@ -160,6 +160,6 @@ export const makeEnvServiceFromBindings = (bindings: WorkerBindings) => POSTHOG_HOST: bindings.POSTHOG_HOST ?? "https://us.i.posthog.com", // Workflows - SANDBOX_WORKFLOW: bindings.SANDBOX_WORKFLOW, - VOLUME_WORKFLOW: bindings.VOLUME_WORKFLOW, + AMBY_SANDBOX_PROVISION: bindings.AMBY_SANDBOX_PROVISION, + AMBY_VOLUME_PROVISION: bindings.AMBY_VOLUME_PROVISION, })