-
Notifications
You must be signed in to change notification settings - Fork 2
feat: add poll-content-run Trigger.dev task #109
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
f95b105
5763c65
e805467
55d0d19
08d4afa
adc8e47
d1c8c62
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,113 @@ | ||
| import { describe, it, expect, vi, beforeEach } from "vitest"; | ||
|
|
||
| vi.mock("@trigger.dev/sdk/v3", () => ({ | ||
| runs: { | ||
| poll: vi.fn(), | ||
| }, | ||
| })); | ||
|
|
||
| vi.mock("../../sandboxes/logStep", () => ({ | ||
| logStep: vi.fn(), | ||
| })); | ||
|
|
||
| import { runs } from "@trigger.dev/sdk/v3"; | ||
| import { pollContentRuns } from "../pollContentRuns"; | ||
|
|
||
| const mockPoll = vi.mocked(runs.poll); | ||
|
|
||
| beforeEach(() => { | ||
| vi.clearAllMocks(); | ||
| }); | ||
|
|
||
| describe("pollContentRuns", () => { | ||
| it("returns completed results when all runs finish", async () => { | ||
| mockPoll.mockResolvedValue({ | ||
| status: "COMPLETED", | ||
| output: { videoSourceUrl: "https://v.mp4", captionText: "caption" }, | ||
| } as any); | ||
|
|
||
| const results = await pollContentRuns(["run-1", "run-2"]); | ||
|
|
||
| expect(results).toEqual([ | ||
| { runId: "run-1", status: "completed", videoUrl: "https://v.mp4", captionText: "caption" }, | ||
| { runId: "run-2", status: "completed", videoUrl: "https://v.mp4", captionText: "caption" }, | ||
| ]); | ||
| expect(mockPoll).toHaveBeenCalledTimes(2); | ||
| expect(mockPoll).toHaveBeenCalledWith("run-1", { pollIntervalMs: 30_000 }); | ||
| expect(mockPoll).toHaveBeenCalledWith("run-2", { pollIntervalMs: 30_000 }); | ||
| }); | ||
|
|
||
| it("returns failed result for FAILED run status", async () => { | ||
| mockPoll.mockResolvedValue({ status: "FAILED" } as any); | ||
|
|
||
| const results = await pollContentRuns(["run-1"]); | ||
|
|
||
| expect(results).toEqual([ | ||
| { runId: "run-1", status: "failed", error: "Run failed" }, | ||
| ]); | ||
| }); | ||
|
|
||
| it("returns failed result for CANCELED run status", async () => { | ||
| mockPoll.mockResolvedValue({ status: "CANCELED" } as any); | ||
|
|
||
| const results = await pollContentRuns(["run-1"]); | ||
|
|
||
| expect(results).toEqual([ | ||
| { runId: "run-1", status: "failed", error: "Run canceled" }, | ||
| ]); | ||
| }); | ||
|
|
||
| it("returns failed result when runs.poll throws", async () => { | ||
| mockPoll.mockRejectedValue(new Error("Poll timeout")); | ||
|
|
||
| const results = await pollContentRuns(["run-1"]); | ||
|
|
||
| expect(results).toEqual([ | ||
| { runId: "run-1", status: "failed", error: "Poll timeout" }, | ||
| ]); | ||
| }); | ||
|
|
||
| it("handles null output gracefully", async () => { | ||
| mockPoll.mockResolvedValue({ | ||
| status: "COMPLETED", | ||
| output: null, | ||
| } as any); | ||
|
|
||
| const results = await pollContentRuns(["run-1"]); | ||
|
|
||
| expect(results).toEqual([ | ||
| { runId: "run-1", status: "completed", videoUrl: undefined, captionText: undefined }, | ||
| ]); | ||
| }); | ||
|
|
||
| it("polls all runs concurrently", async () => { | ||
| let resolveOrder: string[] = []; | ||
| mockPoll.mockImplementation(async (runId: any) => { | ||
| resolveOrder.push(runId); | ||
| return { status: "COMPLETED", output: null } as any; | ||
| }); | ||
|
|
||
| await pollContentRuns(["run-1", "run-2", "run-3"]); | ||
|
|
||
| expect(resolveOrder).toEqual(["run-1", "run-2", "run-3"]); | ||
| expect(mockPoll).toHaveBeenCalledTimes(3); | ||
| }); | ||
|
|
||
| it("handles mixed results — some completed, some failed", async () => { | ||
| mockPoll | ||
| .mockResolvedValueOnce({ | ||
| status: "COMPLETED", | ||
| output: { videoSourceUrl: "https://v.mp4" }, | ||
| } as any) | ||
| .mockResolvedValueOnce({ status: "FAILED" } as any) | ||
| .mockRejectedValueOnce(new Error("Network error")); | ||
|
|
||
| const results = await pollContentRuns(["run-1", "run-2", "run-3"]); | ||
|
|
||
| expect(results).toEqual([ | ||
| { runId: "run-1", status: "completed", videoUrl: "https://v.mp4", captionText: undefined }, | ||
| { runId: "run-2", status: "failed", error: "Run failed" }, | ||
| { runId: "run-3", status: "failed", error: "Network error" }, | ||
| ]); | ||
| }); | ||
| }); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,52 @@ | ||
| import { describe, it, expect } from "vitest"; | ||
| import { resolveOverallStatus } from "../resolveOverallStatus"; | ||
| import type { ContentRunResult } from "../pollContentRuns"; | ||
|
|
||
| describe("resolveOverallStatus", () => { | ||
| it("returns 'completed' when all runs completed", () => { | ||
| const results: ContentRunResult[] = [ | ||
| { runId: "run-1", status: "completed", videoUrl: "https://example.com/v.mp4" }, | ||
| { runId: "run-2", status: "completed", captionText: "Hello" }, | ||
| ]; | ||
| expect(resolveOverallStatus(results)).toBe("completed"); | ||
| }); | ||
|
|
||
| it("returns 'failed' when any run failed and none timed out", () => { | ||
| const results: ContentRunResult[] = [ | ||
| { runId: "run-1", status: "completed" }, | ||
| { runId: "run-2", status: "failed", error: "Run crashed" }, | ||
| ]; | ||
| expect(resolveOverallStatus(results)).toBe("failed"); | ||
| }); | ||
|
|
||
| it("returns 'timeout' when any run timed out", () => { | ||
| const results: ContentRunResult[] = [ | ||
| { runId: "run-1", status: "completed" }, | ||
| { runId: "run-2", status: "timeout" }, | ||
| ]; | ||
| expect(resolveOverallStatus(results)).toBe("timeout"); | ||
| }); | ||
|
|
||
| it("returns 'timeout' over 'failed' when both present", () => { | ||
| const results: ContentRunResult[] = [ | ||
| { runId: "run-1", status: "failed", error: "Run crashed" }, | ||
| { runId: "run-2", status: "timeout" }, | ||
| ]; | ||
| expect(resolveOverallStatus(results)).toBe("timeout"); | ||
| }); | ||
|
|
||
| it("returns 'completed' for a single completed run", () => { | ||
| const results: ContentRunResult[] = [ | ||
| { runId: "run-1", status: "completed" }, | ||
| ]; | ||
| expect(resolveOverallStatus(results)).toBe("completed"); | ||
| }); | ||
|
|
||
| it("returns 'failed' when all runs failed", () => { | ||
| const results: ContentRunResult[] = [ | ||
| { runId: "run-1", status: "failed", error: "Run crashed" }, | ||
| { runId: "run-2", status: "failed", error: "Run canceled" }, | ||
| ]; | ||
| expect(resolveOverallStatus(results)).toBe("failed"); | ||
| }); | ||
| }); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; | ||
|
|
||
| vi.mock("../../sandboxes/logStep", () => ({ | ||
| logStep: vi.fn(), | ||
| })); | ||
|
|
||
| import { sendContentCallback } from "../sendContentCallback"; | ||
| import type { ContentRunResult } from "../pollContentRuns"; | ||
|
|
||
| const originalEnv = process.env; | ||
|
|
||
| beforeEach(() => { | ||
| process.env = { ...originalEnv, CODING_AGENT_CALLBACK_SECRET: "test-secret" }; | ||
| vi.clearAllMocks(); | ||
| }); | ||
|
|
||
| afterEach(() => { | ||
| process.env = originalEnv; | ||
| vi.restoreAllMocks(); | ||
| }); | ||
|
|
||
| describe("sendContentCallback", () => { | ||
| it("throws when CODING_AGENT_CALLBACK_SECRET is missing", async () => { | ||
| delete process.env.CODING_AGENT_CALLBACK_SECRET; | ||
|
|
||
| await expect( | ||
| sendContentCallback("thread-1", "completed", []), | ||
| ).rejects.toThrow("CODING_AGENT_CALLBACK_SECRET is required"); | ||
| }); | ||
|
|
||
| it("sends correct payload to callback endpoint", async () => { | ||
| const mockFetch = vi.spyOn(globalThis, "fetch").mockResolvedValue( | ||
| new Response(null, { status: 200 }), | ||
| ); | ||
|
|
||
| const results: ContentRunResult[] = [ | ||
| { runId: "run-1", status: "completed", videoUrl: "https://v.mp4" }, | ||
| ]; | ||
|
|
||
| await sendContentCallback("thread-1", "completed", results); | ||
|
|
||
| expect(mockFetch).toHaveBeenCalledTimes(1); | ||
| const [url, options] = mockFetch.mock.calls[0]; | ||
| expect(url).toBe("https://recoup-api.vercel.app/api/content-agent/callback"); | ||
| expect(options?.method).toBe("POST"); | ||
| expect(JSON.parse(options?.body as string)).toEqual({ | ||
| threadId: "thread-1", | ||
| status: "completed", | ||
| results, | ||
| }); | ||
| expect((options?.headers as Record<string, string>)["x-callback-secret"]).toBe("test-secret"); | ||
| }); | ||
|
|
||
| it("throws on non-ok response", async () => { | ||
| vi.spyOn(globalThis, "fetch").mockResolvedValue( | ||
| new Response("Internal error", { status: 500 }), | ||
| ); | ||
|
|
||
| await expect( | ||
| sendContentCallback("thread-1", "completed", []), | ||
| ).rejects.toThrow("Callback failed with status 500"); | ||
| }); | ||
| }); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,63 @@ | ||
| import { runs } from "@trigger.dev/sdk/v3"; | ||
| import { logStep } from "../sandboxes/logStep"; | ||
|
|
||
| const POLL_INTERVAL_MS = 30_000; | ||
|
|
||
| export type ContentRunResult = { | ||
| runId: string; | ||
| status: "completed" | "failed" | "timeout"; | ||
| videoUrl?: string; | ||
| captionText?: string; | ||
| error?: string; | ||
| }; | ||
|
|
||
| /** | ||
| * Waits for all Trigger.dev create-content runs to reach a terminal state | ||
| * using the native runs.poll() function, then maps results. | ||
| */ | ||
| export async function pollContentRuns( | ||
| runIds: string[], | ||
| ): Promise<ContentRunResult[]> { | ||
| const settled = await Promise.allSettled( | ||
| runIds.map(runId => | ||
| runs.poll(runId, { pollIntervalMs: POLL_INTERVAL_MS }), | ||
| ), | ||
| ); | ||
|
|
||
| return settled.map((result, i) => { | ||
| const runId = runIds[i]; | ||
|
|
||
| if (result.status === "rejected") { | ||
| logStep(`Run poll failed: ${runId}`, false, { runId, error: result.reason }); | ||
| return { | ||
| runId, | ||
| status: "failed" as const, | ||
| error: result.reason?.message ?? "Unknown error", | ||
| }; | ||
| } | ||
|
|
||
| const run = result.value; | ||
|
|
||
| if (run.status === "COMPLETED") { | ||
| const output = run.output as { | ||
| videoSourceUrl?: string; | ||
| captionText?: string; | ||
| } | null; | ||
|
|
||
| logStep(`Run completed: ${runId}`, false, { runId }); | ||
| return { | ||
| runId, | ||
| status: "completed" as const, | ||
| videoUrl: output?.videoSourceUrl, | ||
| captionText: output?.captionText, | ||
| }; | ||
| } | ||
|
|
||
| logStep(`Run failed: ${runId} (${run.status})`, false, { runId, status: run.status }); | ||
| return { | ||
| runId, | ||
| status: "failed" as const, | ||
| error: `Run ${run.status.toLowerCase()}`, | ||
| }; | ||
| }); | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| import type { ContentRunResult } from "./pollContentRuns"; | ||
|
|
||
| /** | ||
| * Determines overall status from individual run results. | ||
| */ | ||
| export function resolveOverallStatus( | ||
| results: ContentRunResult[], | ||
| ): "completed" | "failed" | "timeout" { | ||
| const allCompleted = results.every(r => r.status === "completed"); | ||
| const anyTimeout = results.some(r => r.status === "timeout"); | ||
|
|
||
| if (anyTimeout) return "timeout"; | ||
| if (allCompleted) return "completed"; | ||
| return "failed"; | ||
| } |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,46 @@ | ||||||||||||||||||
| import { logStep } from "../sandboxes/logStep"; | ||||||||||||||||||
| import { NEW_API_BASE_URL } from "../consts"; | ||||||||||||||||||
| import type { ContentRunResult } from "./pollContentRuns"; | ||||||||||||||||||
|
|
||||||||||||||||||
| /** | ||||||||||||||||||
| * Sends aggregated content run results to the callback endpoint. | ||||||||||||||||||
| * Throws on missing env vars or failed callback. | ||||||||||||||||||
| */ | ||||||||||||||||||
| export async function sendContentCallback( | ||||||||||||||||||
| callbackThreadId: string, | ||||||||||||||||||
| overallStatus: "completed" | "failed" | "timeout", | ||||||||||||||||||
| results: ContentRunResult[], | ||||||||||||||||||
| ): Promise<void> { | ||||||||||||||||||
| const callbackSecret = process.env.CODING_AGENT_CALLBACK_SECRET; | ||||||||||||||||||
| if (!callbackSecret) { | ||||||||||||||||||
| throw new Error("CODING_AGENT_CALLBACK_SECRET is required"); | ||||||||||||||||||
| } | ||||||||||||||||||
|
Comment on lines
+14
to
+17
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Environment variable name mismatch with PR requirements. The code reads 🐛 Proposed fix- const callbackSecret = process.env.CODING_AGENT_CALLBACK_SECRET;
+ const callbackSecret = process.env.CONTENT_AGENT_CALLBACK_SECRET;
if (!callbackSecret) {
- throw new Error("CODING_AGENT_CALLBACK_SECRET is required");
+ throw new Error("CONTENT_AGENT_CALLBACK_SECRET is required");
}📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||
|
|
||||||||||||||||||
| const callbackUrl = `${NEW_API_BASE_URL}/api/content-agent/callback`; | ||||||||||||||||||
|
|
||||||||||||||||||
| logStep("Calling callback", true, { | ||||||||||||||||||
| callbackUrl, | ||||||||||||||||||
| overallStatus, | ||||||||||||||||||
| resultCount: results.length, | ||||||||||||||||||
| }); | ||||||||||||||||||
|
|
||||||||||||||||||
| const response = await fetch(callbackUrl, { | ||||||||||||||||||
| method: "POST", | ||||||||||||||||||
| headers: { | ||||||||||||||||||
| "Content-Type": "application/json", | ||||||||||||||||||
| "x-callback-secret": callbackSecret, | ||||||||||||||||||
| }, | ||||||||||||||||||
| body: JSON.stringify({ | ||||||||||||||||||
| threadId: callbackThreadId, | ||||||||||||||||||
| status: overallStatus, | ||||||||||||||||||
| results, | ||||||||||||||||||
| }), | ||||||||||||||||||
| signal: AbortSignal.timeout(30_000), | ||||||||||||||||||
| }); | ||||||||||||||||||
|
|
||||||||||||||||||
| if (!response.ok) { | ||||||||||||||||||
| const body = await response.text().catch(() => ""); | ||||||||||||||||||
| logStep("Callback failed", false, { status: response.status, body }); | ||||||||||||||||||
| throw new Error(`Callback failed with status ${response.status}`); | ||||||||||||||||||
| } | ||||||||||||||||||
| } | ||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| import { z } from "zod"; | ||
|
|
||
| export const pollContentRunPayloadSchema = z.object({ | ||
| runIds: z.array(z.string()).min(1), | ||
| callbackThreadId: z.string().min(1), | ||
| }); |
sweetmantech marked this conversation as resolved.
Show resolved
Hide resolved
|
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| import { schemaTask } from "@trigger.dev/sdk/v3"; | ||
| import { pollContentRunPayloadSchema } from "../schemas/pollContentRunSchema"; | ||
| import { logStep } from "../sandboxes/logStep"; | ||
| import { pollContentRuns } from "../content/pollContentRuns"; | ||
| import { resolveOverallStatus } from "../content/resolveOverallStatus"; | ||
| import { sendContentCallback } from "../content/sendContentCallback"; | ||
|
|
||
| /** | ||
| * Waits for Trigger.dev create-content task runs to finish using | ||
| * native runs.poll(), then calls the content-agent callback with results. | ||
| */ | ||
| export const pollContentRunTask = schemaTask({ | ||
| id: "poll-content-run", | ||
| schema: pollContentRunPayloadSchema, | ||
| maxDuration: 60 * 35, | ||
| retry: { | ||
| maxAttempts: 0, | ||
| }, | ||
| run: async payload => { | ||
| const { runIds, callbackThreadId } = payload; | ||
|
|
||
| logStep("Starting poll-content-run", true, { runIds, callbackThreadId }); | ||
|
|
||
| const results = await pollContentRuns(runIds); | ||
| const overallStatus = resolveOverallStatus(results); | ||
|
|
||
| await sendContentCallback(callbackThreadId, overallStatus, results); | ||
|
|
||
| return { status: overallStatus, results }; | ||
| }, | ||
| }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we polling the run status in a loop instead of using the native wait function?
https://trigger.dev/docs/runs#waiting-for-runs