Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions apps/api/src/durable-objects/chat-state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ function readQueueExpiry(raw: string) {
}
}

export class ChatStateDO<TEnv = unknown> extends DurableObject<TEnv> {
export class AmbyChatState<TEnv = unknown> extends DurableObject<TEnv> {
private readonly sql: DurableObjectStorage["sql"]

constructor(ctx: DurableObjectState, env: TEnv) {
Expand Down Expand Up @@ -404,7 +404,7 @@ export class ChatStateDO<TEnv = unknown> extends DurableObject<TEnv> {
await this.ctx.storage.setAlarm(next)
}
} catch (err) {
console.error("ChatStateDO alarm failed, rescheduling:", err)
console.error("amby_ChatState alarm failed, rescheduling:", err)
await this.ctx.storage.setAlarm(Date.now() + 30_000)
}
}
Expand Down Expand Up @@ -433,7 +433,7 @@ export class ChatStateDO<TEnv = unknown> extends DurableObject<TEnv> {
const next = this.nextExpiry()
if (next !== null) {
this.ctx.storage.setAlarm(next).catch((err) => {
console.error("ChatStateDO failed to schedule cleanup alarm:", err)
console.error("amby_ChatState failed to schedule cleanup alarm:", err)
})
}
}
Expand Down
22 changes: 11 additions & 11 deletions apps/api/src/durable-objects/conversation-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ interface IngestPayload {
const DEBOUNCE_MS = 3000
const ACTIVE_DEBOUNCE_MS = 1000

export class ConversationSession extends DurableObject<WorkerBindings> {
export class AmbyConversation extends DurableObject<WorkerBindings> {
private state: SessionState = {
status: "idle",
userId: null,
Expand Down Expand Up @@ -96,7 +96,7 @@ export class ConversationSession extends DurableObject<WorkerBindings> {
async ingestMessage(payload: IngestPayload): Promise<void> {
await this.hydrate()
setTelegramScope({
component: "conversation-session.ingest",
component: "amby_conversation.ingest",
chatId: payload.chatId,
from: payload.from,
userId: this.state.userId,
Expand Down Expand Up @@ -133,13 +133,13 @@ export class ConversationSession extends DurableObject<WorkerBindings> {

if (this.state.status === "processing") {
// Agent is already running — forward as interrupt to the active workflow
if (this.state.activeWorkflowId && this.env.AGENT_WORKFLOW) {
if (this.state.activeWorkflowId && this.env.AMBY_AGENT_EXECUTION) {
try {
const instance = await this.env.AGENT_WORKFLOW.get(this.state.activeWorkflowId)
const instance = await this.env.AMBY_AGENT_EXECUTION.get(this.state.activeWorkflowId)
await Sentry.startSpan(
{
op: "workflow.event",
name: "AgentExecutionWorkflow.sendEvent",
name: "amby_AgentExecution.sendEvent",
},
async () => {
await instance.sendEvent({
Expand Down Expand Up @@ -168,7 +168,7 @@ export class ConversationSession extends DurableObject<WorkerBindings> {
const pendingFrom = await this.ctx.storage.get<TelegramFrom>("pendingFrom")
if (this.state.chatId) {
setTelegramScope({
component: "conversation-session.alarm",
component: "amby_conversation.alarm",
chatId: this.state.chatId,
from: pendingFrom ?? null,
userId: this.state.userId,
Expand All @@ -179,7 +179,7 @@ export class ConversationSession extends DurableObject<WorkerBindings> {
},
})
} else {
setWorkerScope("conversation-session.alarm", {
setWorkerScope("amby_conversation.alarm", {
buffered_message_count: this.state.buffer.length,
session_status: this.state.status,
})
Expand All @@ -205,13 +205,13 @@ export class ConversationSession extends DurableObject<WorkerBindings> {
this.state.status = "processing"

// Launch the workflow
const workflow = this.env.AGENT_WORKFLOW
const workflow = this.env.AMBY_AGENT_EXECUTION
if (workflow) {
try {
const instance = await Sentry.startSpan(
{
op: "workflow.start",
name: "AgentExecutionWorkflow.create",
name: "amby_AgentExecution.create",
},
() =>
workflow.create({
Expand Down Expand Up @@ -239,7 +239,7 @@ export class ConversationSession extends DurableObject<WorkerBindings> {
await this.ctx.storage.setAlarm(Date.now() + 5000)
}
} else {
console.error("[DO] AGENT_WORKFLOW binding not available")
console.error("[DO] AMBY_AGENT_EXECUTION binding not available")
this.state.status = "idle"
}

Expand All @@ -249,7 +249,7 @@ export class ConversationSession extends DurableObject<WorkerBindings> {
async completeExecution(result: { userId?: string; conversationId?: string }): Promise<void> {
await this.hydrate()
setTelegramScope({
component: "conversation-session.complete",
component: "amby_conversation.complete",
chatId: this.state.chatId ?? undefined,
userId: result.userId ?? this.state.userId,
conversationId: result.conversationId ?? this.state.conversationId,
Expand Down
10 changes: 5 additions & 5 deletions apps/api/src/queue/consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ export async function handleQueueBatch(
try {
await runtime.runPromise(
handleCommand(parsedCommand, from, chatId, {
sandboxWorkflow: env.SANDBOX_WORKFLOW,
sandboxWorkflow: env.AMBY_SANDBOX_PROVISION,
}),
)
Sentry.logger.info("Telegram command processed", {
Expand All @@ -83,10 +83,10 @@ export async function handleQueueBatch(
if (!bufferedMessage) {
return
}
// Route supported Telegram messages to ConversationSession Durable Object
const doBinding = env.CONVERSATION_SESSION
// Route supported Telegram messages to the amby_Conversation Durable Object.
const doBinding = env.AMBY_CONVERSATION
if (!doBinding) {
console.error("[Queue] CONVERSATION_SESSION binding not available")
console.error("[Queue] AMBY_CONVERSATION binding not available")
return
}

Expand All @@ -96,7 +96,7 @@ export async function handleQueueBatch(
await Sentry.startSpan(
{
op: "durable-object.rpc",
name: "ConversationSession.ingestMessage",
name: "amby_Conversation.ingestMessage",
},
async () => {
await stub.ingestMessage({
Expand Down
44 changes: 44 additions & 0 deletions apps/api/src/worker-surface.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
import { describe, expect, it } from "bun:test"

async function readText(relativePath: string) {
return await Bun.file(new URL(relativePath, import.meta.url)).text()
}

describe("Cloudflare worker surface", () => {
it("exports the renamed Durable Objects and Workflows", async () => {
const source = await readText("./worker.ts")

for (const exportName of [
"AmbyConversation",
"AmbyChatState",
"AmbyAgentExecution",
"AmbySandboxProvision",
"AmbyVolumeProvision",
]) {
expect(source).toContain(exportName)
}
})

it("keeps Wrangler config aligned with the renamed worker entities", async () => {
const source = await readText("../wrangler.toml")

expect(source).toContain('{ name = "AMBY_CONVERSATION", class_name = "AmbyConversation" }')
expect(source).toContain('{ name = "AMBY_CHAT_STATE", class_name = "AmbyChatState" }')
expect(source).toContain('binding = "AMBY_AGENT_EXECUTION"')
expect(source).toContain('class_name = "AmbyAgentExecution"')
expect(source).toContain('binding = "AMBY_SANDBOX_PROVISION"')
expect(source).toContain('class_name = "AmbySandboxProvision"')
expect(source).toContain('binding = "AMBY_VOLUME_PROVISION"')
expect(source).toContain('class_name = "AmbyVolumeProvision"')
expect(source).toContain('tag = "v3"')
expect(source).toContain('new_classes = ["AmbyConversation"]')
expect(source).toContain('tag = "v4"')
expect(source).toContain('new_sqlite_classes = ["AmbyChatState"]')

expect(source).not.toContain('{ name = "CONVERSATION_SESSION", class_name = "ConversationSession" }')
expect(source).not.toContain('{ name = "CHAT_STATE", class_name = "ChatStateDO" }')
expect(source).not.toContain('binding = "AGENT_WORKFLOW"')
expect(source).not.toContain('binding = "SANDBOX_WORKFLOW"')
expect(source).not.toContain('binding = "VOLUME_WORKFLOW"')
})
})
40 changes: 10 additions & 30 deletions apps/api/src/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,42 +21,22 @@ import {
createCloudflareChatState,
} from "./chat-state/cloudflare-chat-state"
import { handleExpiredConnectedAccount } from "./composio/expired-account"
import { ChatStateDO as ChatStateDOBase } from "./durable-objects/chat-state"
import { ConversationSession as ConversationSessionBase } from "./durable-objects/conversation-session"
import { AmbyChatState } from "./durable-objects/chat-state"
import { AmbyConversation } from "./durable-objects/conversation-session"
import { handleScheduledReconciliation } from "./handlers/reconciliation"
import { handleTaskEventPost } from "./handlers/task-events"
import { getHomeResponse } from "./home"
import { handleQueueBatch } from "./queue/consumer"
import { makeAgentRuntimeForConsumer, makeRuntimeForConsumer } from "./queue/runtime"
import { getSentryOptions, getSentryOptionsOrFallback, setTelegramScope } from "./sentry"
import { AgentExecutionWorkflow as AgentExecutionWorkflowBase } from "./workflows/agent-execution"
import { SandboxProvisionWorkflow as SandboxProvisionWorkflowBase } from "./workflows/sandbox-provision"
import { VolumeProvisionWorkflow as VolumeProvisionWorkflowBase } from "./workflows/volume-provision"

// Re-export instrumented Durable Object and Workflow classes so Cloudflare can discover them
export const ConversationSession = Sentry.instrumentDurableObjectWithSentry(
getSentryOptionsOrFallback,
ConversationSessionBase,
)
export const ChatStateDO = Sentry.instrumentDurableObjectWithSentry(
getSentryOptionsOrFallback,
ChatStateDOBase,
)
export const AgentExecutionWorkflow = Sentry.instrumentWorkflowWithSentry(
getSentryOptionsOrFallback,
AgentExecutionWorkflowBase,
)
export const SandboxProvisionWorkflow = Sentry.instrumentWorkflowWithSentry(
getSentryOptionsOrFallback,
SandboxProvisionWorkflowBase,
)
export const VolumeProvisionWorkflow = Sentry.instrumentWorkflowWithSentry(
getSentryOptionsOrFallback,
VolumeProvisionWorkflowBase,
)
import { getSentryOptions, setTelegramScope } from "./sentry"
import { AmbyAgentExecution } from "./workflows/agent-execution"
import { AmbySandboxProvision } from "./workflows/sandbox-provision"
import { AmbyVolumeProvision } from "./workflows/volume-provision"

export { AmbyConversation, AmbyChatState, AmbyAgentExecution, AmbySandboxProvision, AmbyVolumeProvision }

type ApiBindings = WorkerBindings & {
CHAT_STATE: ChatStateNamespaceLike
AMBY_CHAT_STATE: ChatStateNamespaceLike
}

type Env = { Bindings: ApiBindings; Variables: { posthogDistinctId?: string } }
Expand Down Expand Up @@ -213,7 +193,7 @@ function getOrCreateWorkerChatState(env: ApiBindings) {
// Intentionally route all Chat SDK transport state through one unsharded DO instance for now.
// Shard by adapter name once webhook throughput or lock contention justifies it.
// connect() is not called here — the Chat SDK calls state.connect() during init.
workerChatState = createCloudflareChatState({ namespace: env.CHAT_STATE })
workerChatState = createCloudflareChatState({ namespace: env.AMBY_CHAT_STATE })
return workerChatState
}

Expand Down
6 changes: 3 additions & 3 deletions apps/api/src/workflows/agent-execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export interface AgentExecutionParams {
parentContext?: string
}

export class AgentExecutionWorkflow extends WorkflowEntrypoint<
export class AmbyAgentExecution extends WorkflowEntrypoint<
WorkerBindings,
AgentExecutionParams
> {
Expand All @@ -29,7 +29,7 @@ export class AgentExecutionWorkflow extends WorkflowEntrypoint<
let { userId, conversationId } = event.payload

setTelegramScope({
component: "workflow.agent_execution",
component: "workflow.amby_agent_execution",
chatId,
from,
userId,
Expand Down Expand Up @@ -299,7 +299,7 @@ export class AgentExecutionWorkflow extends WorkflowEntrypoint<
userId: string | null,
conversationId: string | null | undefined,
) {
const doBinding = this.env.CONVERSATION_SESSION
const doBinding = this.env.AMBY_CONVERSATION
if (isSubAgent || !doBinding) return

await step.do("complete", async () => {
Expand Down
8 changes: 4 additions & 4 deletions apps/api/src/workflows/sandbox-provision.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,13 @@ export interface SandboxProvisionParams {
userId: string
}

export class SandboxProvisionWorkflow extends WorkflowEntrypoint<
export class AmbySandboxProvision extends WorkflowEntrypoint<
WorkerBindings,
SandboxProvisionParams
> {
async run(event: WorkflowEvent<SandboxProvisionParams>, step: WorkflowStep) {
const { userId } = event.payload
const scope = setWorkerScope("workflow.sandbox_provision", {
const scope = setWorkerScope("workflow.amby_sandbox_provision", {
workflow_instance_id: event.instanceId,
user_id: userId,
})
Expand Down Expand Up @@ -132,9 +132,9 @@ export class SandboxProvisionWorkflow extends WorkflowEntrypoint<

if (volumeRow.status !== "ready") {
await upsertMainSandbox(null, "volume_creating", volumeRow.id)
const volumeWorkflow = env.VOLUME_WORKFLOW
const volumeWorkflow = env.AMBY_VOLUME_PROVISION
if (!volumeWorkflow) {
throw new Error("VOLUME_WORKFLOW binding is not configured.")
throw new Error("AMBY_VOLUME_PROVISION binding is not configured.")
}

await step.do("start-volume-workflow", async () => {
Expand Down
6 changes: 3 additions & 3 deletions apps/api/src/workflows/volume-provision.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ export interface VolumeProvisionResult {
status: "creating" | "ready" | "error" | "deleted"
}

export class VolumeProvisionWorkflow extends WorkflowEntrypoint<
export class AmbyVolumeProvision extends WorkflowEntrypoint<
WorkerBindings,
VolumeProvisionParams
> {
async run(event: WorkflowEvent<VolumeProvisionParams>, step: WorkflowStep) {
const { userId, parentWorkflowId } = event.payload
const scope = setWorkerScope("workflow.volume_provision", {
const scope = setWorkerScope("workflow.amby_volume_provision", {
workflow_instance_id: event.instanceId,
user_id: userId,
})
Expand Down Expand Up @@ -131,7 +131,7 @@ export class VolumeProvisionWorkflow extends WorkflowEntrypoint<
}
}

const sandboxWorkflow = env.SANDBOX_WORKFLOW
const sandboxWorkflow = env.AMBY_SANDBOX_PROVISION
if (parentWorkflowId && sandboxWorkflow) {
await step.do(
"notify-parent",
Expand Down
32 changes: 20 additions & 12 deletions apps/api/wrangler.toml
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ dead_letter_queue = "telegram-dlq"
binding = "TELEGRAM_DLQ"
queue = "telegram-dlq"

# --- Durable Object: ConversationSession ---
# --- Durable Objects: Amby runtime state ---
[durable_objects]
bindings = [
{ name = "CONVERSATION_SESSION", class_name = "ConversationSession" },
{ name = "CHAT_STATE", class_name = "ChatStateDO" }
{ name = "AMBY_CONVERSATION", class_name = "AmbyConversation" },
{ name = "AMBY_CHAT_STATE", class_name = "AmbyChatState" }
]

[[migrations]]
Expand All @@ -123,21 +123,29 @@ new_classes = ["ConversationSession"]
tag = "v2"
new_sqlite_classes = ["ChatStateDO"]

[[migrations]]
tag = "v3"
new_classes = ["AmbyConversation"]

[[migrations]]
tag = "v4"
new_sqlite_classes = ["AmbyChatState"]

# --- Workflow: AgentExecution ---
[[workflows]]
name = "agent-execution"
binding = "AGENT_WORKFLOW"
class_name = "AgentExecutionWorkflow"
name = "amby-agent-execution"
binding = "AMBY_AGENT_EXECUTION"
class_name = "AmbyAgentExecution"

[[workflows]]
name = "sandbox-provision"
binding = "SANDBOX_WORKFLOW"
class_name = "SandboxProvisionWorkflow"
name = "amby-sandbox-provision"
binding = "AMBY_SANDBOX_PROVISION"
class_name = "AmbySandboxProvision"

[[workflows]]
name = "volume-provision"
binding = "VOLUME_WORKFLOW"
class_name = "VolumeProvisionWorkflow"
name = "amby-volume-provision"
binding = "AMBY_VOLUME_PROVISION"
class_name = "AmbyVolumeProvision"

# Task reconciliation + Telegram completion notifications
[triggers]
Expand Down
Loading