diff --git a/src/content/__tests__/pollContentRuns.test.ts b/src/content/__tests__/pollContentRuns.test.ts new file mode 100644 index 0000000..9caf66e --- /dev/null +++ b/src/content/__tests__/pollContentRuns.test.ts @@ -0,0 +1,113 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +vi.mock("@trigger.dev/sdk/v3", () => ({ + runs: { + poll: vi.fn(), + }, +})); + +vi.mock("../../sandboxes/logStep", () => ({ + logStep: vi.fn(), +})); + +import { runs } from "@trigger.dev/sdk/v3"; +import { pollContentRuns } from "../pollContentRuns"; + +const mockPoll = vi.mocked(runs.poll); + +beforeEach(() => { + vi.clearAllMocks(); +}); + +describe("pollContentRuns", () => { + it("returns completed results when all runs finish", async () => { + mockPoll.mockResolvedValue({ + status: "COMPLETED", + output: { videoSourceUrl: "https://v.mp4", captionText: "caption" }, + } as any); + + const results = await pollContentRuns(["run-1", "run-2"]); + + expect(results).toEqual([ + { runId: "run-1", status: "completed", videoUrl: "https://v.mp4", captionText: "caption" }, + { runId: "run-2", status: "completed", videoUrl: "https://v.mp4", captionText: "caption" }, + ]); + expect(mockPoll).toHaveBeenCalledTimes(2); + expect(mockPoll).toHaveBeenCalledWith("run-1", { pollIntervalMs: 30_000 }); + expect(mockPoll).toHaveBeenCalledWith("run-2", { pollIntervalMs: 30_000 }); + }); + + it("returns failed result for FAILED run status", async () => { + mockPoll.mockResolvedValue({ status: "FAILED" } as any); + + const results = await pollContentRuns(["run-1"]); + + expect(results).toEqual([ + { runId: "run-1", status: "failed", error: "Run failed" }, + ]); + }); + + it("returns failed result for CANCELED run status", async () => { + mockPoll.mockResolvedValue({ status: "CANCELED" } as any); + + const results = await pollContentRuns(["run-1"]); + + expect(results).toEqual([ + { runId: "run-1", status: "failed", error: "Run canceled" }, + ]); + }); + + it("returns failed result when runs.poll throws", async () => { + mockPoll.mockRejectedValue(new Error("Poll timeout")); + + const results = await pollContentRuns(["run-1"]); + + expect(results).toEqual([ + { runId: "run-1", status: "failed", error: "Poll timeout" }, + ]); + }); + + it("handles null output gracefully", async () => { + mockPoll.mockResolvedValue({ + status: "COMPLETED", + output: null, + } as any); + + const results = await pollContentRuns(["run-1"]); + + expect(results).toEqual([ + { runId: "run-1", status: "completed", videoUrl: undefined, captionText: undefined }, + ]); + }); + + it("polls all runs concurrently", async () => { + let resolveOrder: string[] = []; + mockPoll.mockImplementation(async (runId: any) => { + resolveOrder.push(runId); + return { status: "COMPLETED", output: null } as any; + }); + + await pollContentRuns(["run-1", "run-2", "run-3"]); + + expect(resolveOrder).toEqual(["run-1", "run-2", "run-3"]); + expect(mockPoll).toHaveBeenCalledTimes(3); + }); + + it("handles mixed results — some completed, some failed", async () => { + mockPoll + .mockResolvedValueOnce({ + status: "COMPLETED", + output: { videoSourceUrl: "https://v.mp4" }, + } as any) + .mockResolvedValueOnce({ status: "FAILED" } as any) + .mockRejectedValueOnce(new Error("Network error")); + + const results = await pollContentRuns(["run-1", "run-2", "run-3"]); + + expect(results).toEqual([ + { runId: "run-1", status: "completed", videoUrl: "https://v.mp4", captionText: undefined }, + { runId: "run-2", status: "failed", error: "Run failed" }, + { runId: "run-3", status: "failed", error: "Network error" }, + ]); + }); +}); diff --git a/src/content/__tests__/resolveOverallStatus.test.ts b/src/content/__tests__/resolveOverallStatus.test.ts new file mode 100644 index 0000000..61df28b --- /dev/null +++ b/src/content/__tests__/resolveOverallStatus.test.ts @@ -0,0 +1,52 @@ +import { describe, it, expect } from "vitest"; +import { resolveOverallStatus } from "../resolveOverallStatus"; +import type { ContentRunResult } from "../pollContentRuns"; + +describe("resolveOverallStatus", () => { + it("returns 'completed' when all runs completed", () => { + const results: ContentRunResult[] = [ + { runId: "run-1", status: "completed", videoUrl: "https://example.com/v.mp4" }, + { runId: "run-2", status: "completed", captionText: "Hello" }, + ]; + expect(resolveOverallStatus(results)).toBe("completed"); + }); + + it("returns 'failed' when any run failed and none timed out", () => { + const results: ContentRunResult[] = [ + { runId: "run-1", status: "completed" }, + { runId: "run-2", status: "failed", error: "Run crashed" }, + ]; + expect(resolveOverallStatus(results)).toBe("failed"); + }); + + it("returns 'timeout' when any run timed out", () => { + const results: ContentRunResult[] = [ + { runId: "run-1", status: "completed" }, + { runId: "run-2", status: "timeout" }, + ]; + expect(resolveOverallStatus(results)).toBe("timeout"); + }); + + it("returns 'timeout' over 'failed' when both present", () => { + const results: ContentRunResult[] = [ + { runId: "run-1", status: "failed", error: "Run crashed" }, + { runId: "run-2", status: "timeout" }, + ]; + expect(resolveOverallStatus(results)).toBe("timeout"); + }); + + it("returns 'completed' for a single completed run", () => { + const results: ContentRunResult[] = [ + { runId: "run-1", status: "completed" }, + ]; + expect(resolveOverallStatus(results)).toBe("completed"); + }); + + it("returns 'failed' when all runs failed", () => { + const results: ContentRunResult[] = [ + { runId: "run-1", status: "failed", error: "Run crashed" }, + { runId: "run-2", status: "failed", error: "Run canceled" }, + ]; + expect(resolveOverallStatus(results)).toBe("failed"); + }); +}); diff --git a/src/content/__tests__/sendContentCallback.test.ts b/src/content/__tests__/sendContentCallback.test.ts new file mode 100644 index 0000000..c580d7c --- /dev/null +++ b/src/content/__tests__/sendContentCallback.test.ts @@ -0,0 +1,63 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; + +vi.mock("../../sandboxes/logStep", () => ({ + logStep: vi.fn(), +})); + +import { sendContentCallback } from "../sendContentCallback"; +import type { ContentRunResult } from "../pollContentRuns"; + +const originalEnv = process.env; + +beforeEach(() => { + process.env = { ...originalEnv, CODING_AGENT_CALLBACK_SECRET: "test-secret" }; + vi.clearAllMocks(); +}); + +afterEach(() => { + process.env = originalEnv; + vi.restoreAllMocks(); +}); + +describe("sendContentCallback", () => { + it("throws when CODING_AGENT_CALLBACK_SECRET is missing", async () => { + delete process.env.CODING_AGENT_CALLBACK_SECRET; + + await expect( + sendContentCallback("thread-1", "completed", []), + ).rejects.toThrow("CODING_AGENT_CALLBACK_SECRET is required"); + }); + + it("sends correct payload to callback endpoint", async () => { + const mockFetch = vi.spyOn(globalThis, "fetch").mockResolvedValue( + new Response(null, { status: 200 }), + ); + + const results: ContentRunResult[] = [ + { runId: "run-1", status: "completed", videoUrl: "https://v.mp4" }, + ]; + + await sendContentCallback("thread-1", "completed", results); + + expect(mockFetch).toHaveBeenCalledTimes(1); + const [url, options] = mockFetch.mock.calls[0]; + expect(url).toBe("https://recoup-api.vercel.app/api/content-agent/callback"); + expect(options?.method).toBe("POST"); + expect(JSON.parse(options?.body as string)).toEqual({ + threadId: "thread-1", + status: "completed", + results, + }); + expect((options?.headers as Record)["x-callback-secret"]).toBe("test-secret"); + }); + + it("throws on non-ok response", async () => { + vi.spyOn(globalThis, "fetch").mockResolvedValue( + new Response("Internal error", { status: 500 }), + ); + + await expect( + sendContentCallback("thread-1", "completed", []), + ).rejects.toThrow("Callback failed with status 500"); + }); +}); diff --git a/src/content/pollContentRuns.ts b/src/content/pollContentRuns.ts new file mode 100644 index 0000000..2cd6cbc --- /dev/null +++ b/src/content/pollContentRuns.ts @@ -0,0 +1,63 @@ +import { runs } from "@trigger.dev/sdk/v3"; +import { logStep } from "../sandboxes/logStep"; + +const POLL_INTERVAL_MS = 30_000; + +export type ContentRunResult = { + runId: string; + status: "completed" | "failed" | "timeout"; + videoUrl?: string; + captionText?: string; + error?: string; +}; + +/** + * Waits for all Trigger.dev create-content runs to reach a terminal state + * using the native runs.poll() function, then maps results. + */ +export async function pollContentRuns( + runIds: string[], +): Promise { + const settled = await Promise.allSettled( + runIds.map(runId => + runs.poll(runId, { pollIntervalMs: POLL_INTERVAL_MS }), + ), + ); + + return settled.map((result, i) => { + const runId = runIds[i]; + + if (result.status === "rejected") { + logStep(`Run poll failed: ${runId}`, false, { runId, error: result.reason }); + return { + runId, + status: "failed" as const, + error: result.reason?.message ?? "Unknown error", + }; + } + + const run = result.value; + + if (run.status === "COMPLETED") { + const output = run.output as { + videoSourceUrl?: string; + captionText?: string; + } | null; + + logStep(`Run completed: ${runId}`, false, { runId }); + return { + runId, + status: "completed" as const, + videoUrl: output?.videoSourceUrl, + captionText: output?.captionText, + }; + } + + logStep(`Run failed: ${runId} (${run.status})`, false, { runId, status: run.status }); + return { + runId, + status: "failed" as const, + error: `Run ${run.status.toLowerCase()}`, + }; + }); +} diff --git a/src/content/resolveOverallStatus.ts b/src/content/resolveOverallStatus.ts new file mode 100644 index 0000000..0594722 --- /dev/null +++ b/src/content/resolveOverallStatus.ts @@ -0,0 +1,15 @@ +import type { ContentRunResult } from "./pollContentRuns"; + +/** + * Determines overall status from individual run results. + */ +export function resolveOverallStatus( + results: ContentRunResult[], +): "completed" | "failed" | "timeout" { + const allCompleted = results.every(r => r.status === "completed"); + const anyTimeout = results.some(r => r.status === "timeout"); + + if (anyTimeout) return "timeout"; + if (allCompleted) return "completed"; + return "failed"; +} diff --git a/src/content/sendContentCallback.ts b/src/content/sendContentCallback.ts new file mode 100644 index 0000000..77687ab --- /dev/null +++ b/src/content/sendContentCallback.ts @@ -0,0 +1,46 @@ +import { logStep } from "../sandboxes/logStep"; +import { NEW_API_BASE_URL } from "../consts"; +import type { ContentRunResult } from "./pollContentRuns"; + +/** + * Sends aggregated content run results to the callback endpoint. + * Throws on missing env vars or failed callback. + */ +export async function sendContentCallback( + callbackThreadId: string, + overallStatus: "completed" | "failed" | "timeout", + results: ContentRunResult[], +): Promise { + const callbackSecret = process.env.CODING_AGENT_CALLBACK_SECRET; + if (!callbackSecret) { + throw new Error("CODING_AGENT_CALLBACK_SECRET is required"); + } + + const callbackUrl = `${NEW_API_BASE_URL}/api/content-agent/callback`; + + logStep("Calling callback", true, { + callbackUrl, + overallStatus, + resultCount: results.length, + }); + + const response = await fetch(callbackUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + "x-callback-secret": callbackSecret, + }, + body: JSON.stringify({ + threadId: callbackThreadId, + status: overallStatus, + results, + }), + signal: AbortSignal.timeout(30_000), + }); + + if (!response.ok) { + const body = await response.text().catch(() => ""); + logStep("Callback failed", false, { status: response.status, body }); + throw new Error(`Callback failed with status ${response.status}`); + } +} diff --git a/src/schemas/pollContentRunSchema.ts b/src/schemas/pollContentRunSchema.ts new file mode 100644 index 0000000..29722e5 --- /dev/null +++ b/src/schemas/pollContentRunSchema.ts @@ -0,0 +1,6 @@ +import { z } from "zod"; + +export const pollContentRunPayloadSchema = z.object({ + runIds: z.array(z.string()).min(1), + callbackThreadId: z.string().min(1), +}); diff --git a/src/tasks/pollContentRunTask.ts b/src/tasks/pollContentRunTask.ts new file mode 100644 index 0000000..4f6d534 --- /dev/null +++ b/src/tasks/pollContentRunTask.ts @@ -0,0 +1,31 @@ +import { schemaTask } from "@trigger.dev/sdk/v3"; +import { pollContentRunPayloadSchema } from "../schemas/pollContentRunSchema"; +import { logStep } from "../sandboxes/logStep"; +import { pollContentRuns } from "../content/pollContentRuns"; +import { resolveOverallStatus } from "../content/resolveOverallStatus"; +import { sendContentCallback } from "../content/sendContentCallback"; + +/** + * Waits for Trigger.dev create-content task runs to finish using + * native runs.poll(), then calls the content-agent callback with results. + */ +export const pollContentRunTask = schemaTask({ + id: "poll-content-run", + schema: pollContentRunPayloadSchema, + maxDuration: 60 * 35, + retry: { + maxAttempts: 0, + }, + run: async payload => { + const { runIds, callbackThreadId } = payload; + + logStep("Starting poll-content-run", true, { runIds, callbackThreadId }); + + const results = await pollContentRuns(runIds); + const overallStatus = resolveOverallStatus(results); + + await sendContentCallback(callbackThreadId, overallStatus, results); + + return { status: overallStatus, results }; + }, +});