diff --git a/app/api/coding-agent/[platform]/route.ts b/app/api/coding-agent/[platform]/route.ts index a51a2104..f58298ea 100644 --- a/app/api/coding-agent/[platform]/route.ts +++ b/app/api/coding-agent/[platform]/route.ts @@ -1,59 +1,13 @@ -import type { NextRequest } from "next/server"; -import { after } from "next/server"; +import { createPlatformRoutes } from "@/lib/agents/createPlatformRoutes"; import { codingAgentBot } from "@/lib/coding-agent/bot"; -import { handleUrlVerification } from "@/lib/slack/handleUrlVerification"; import "@/lib/coding-agent/handlers/registerHandlers"; /** - * GET /api/coding-agent/[platform] + * GET & POST /api/coding-agent/[platform] * - * Handles webhook verification handshakes (e.g. WhatsApp hub.challenge). - * - * @param request - The incoming verification request - * @param params - Route params containing the platform name - */ -export async function GET( - request: NextRequest, - { params }: { params: Promise<{ platform: string }> }, -) { - const { platform } = await params; - - const handler = codingAgentBot.webhooks[platform as keyof typeof codingAgentBot.webhooks]; - - if (!handler) { - return new Response("Unknown platform", { status: 404 }); - } - - return handler(request, { waitUntil: p => after(() => p) }); -} - -/** - * POST /api/coding-agent/[platform] - * - * Webhook endpoint for the coding agent bot. - * Handles Slack and WhatsApp webhooks via dynamic [platform] segment. - * - * @param request - The incoming webhook request - * @param params - Route params containing the platform name + * Webhook endpoints for the coding agent bot. + * Handles Slack, GitHub, and WhatsApp webhooks via dynamic [platform] segment. */ -export async function POST( - request: NextRequest, - { params }: { params: Promise<{ platform: string }> }, -) { - const { platform } = await params; - - if (platform === "slack") { - const verification = await handleUrlVerification(request); - if (verification) return verification; - } - - await codingAgentBot.initialize(); - - const handler = codingAgentBot.webhooks[platform as keyof typeof codingAgentBot.webhooks]; - - if (!handler) { - return new Response("Unknown platform", { status: 404 }); - } - - return handler(request, { waitUntil: p => after(() => p) }); -} +export const { GET, POST } = createPlatformRoutes({ + getBot: () => codingAgentBot, +}); diff --git a/app/api/content-agent/[platform]/route.ts b/app/api/content-agent/[platform]/route.ts new file mode 100644 index 00000000..0e982157 --- /dev/null +++ b/app/api/content-agent/[platform]/route.ts @@ -0,0 +1,14 @@ +import { createPlatformRoutes } from "@/lib/agents/createPlatformRoutes"; +import { contentAgentBot } from "@/lib/agents/content/bot"; +import "@/lib/agents/content/handlers/registerHandlers"; + +/** + * GET & POST /api/content-agent/[platform] + * + * Webhook endpoints for the content agent bot. + * Handles Slack webhooks via dynamic [platform] segment. + */ +export const { GET, POST } = createPlatformRoutes({ + getBot: () => contentAgentBot!, + isConfigured: () => contentAgentBot !== null, +}); diff --git a/app/api/content-agent/callback/route.ts b/app/api/content-agent/callback/route.ts new file mode 100644 index 00000000..3c1c0429 --- /dev/null +++ b/app/api/content-agent/callback/route.ts @@ -0,0 +1,21 @@ +import type { NextRequest } from "next/server"; +import { contentAgentBot } from "@/lib/agents/content/bot"; +import { handleContentAgentCallback } from "@/lib/agents/content/handleContentAgentCallback"; + +/** + * POST /api/content-agent/callback + * + * Callback endpoint for the poll-content-run Trigger.dev task. + * Receives task results and posts them back to the Slack thread. + * + * @param request - The incoming callback request + * @returns The callback response + */ +export async function POST(request: NextRequest) { + if (!contentAgentBot) { + return Response.json({ error: "Content agent not configured" }, { status: 503 }); + } + + await contentAgentBot.initialize(); + return handleContentAgentCallback(request); +} diff --git a/lib/agents/content/__tests__/handleContentAgentCallback.test.ts b/lib/agents/content/__tests__/handleContentAgentCallback.test.ts new file mode 100644 index 00000000..c334ae8d --- /dev/null +++ b/lib/agents/content/__tests__/handleContentAgentCallback.test.ts @@ -0,0 +1,73 @@ +import { describe, it, expect, vi, beforeEach, afterEach } from "vitest"; +import { handleContentAgentCallback } from "../handleContentAgentCallback"; + +vi.mock("@/lib/networking/getCorsHeaders", () => ({ + getCorsHeaders: vi.fn(() => ({ "Access-Control-Allow-Origin": "*" })), +})); + +vi.mock("../validateContentAgentCallback", () => ({ + validateContentAgentCallback: vi.fn(), +})); + +vi.mock("@/lib/agents/getThread", () => ({ + getThread: vi.fn(), +})); + +describe("handleContentAgentCallback", () => { + const originalEnv = { ...process.env }; + + beforeEach(() => { + vi.clearAllMocks(); + process.env.CODING_AGENT_CALLBACK_SECRET = "test-secret"; + }); + + afterEach(() => { + process.env = { ...originalEnv }; + }); + + it("returns 401 when x-callback-secret header is missing", async () => { + const request = new Request("http://localhost/api/content-agent/callback", { + method: "POST", + body: JSON.stringify({}), + }); + + const response = await handleContentAgentCallback(request); + expect(response.status).toBe(401); + }); + + it("returns 401 when secret does not match CODING_AGENT_CALLBACK_SECRET", async () => { + const request = new Request("http://localhost/api/content-agent/callback", { + method: "POST", + headers: { "x-callback-secret": "wrong-secret" }, + body: JSON.stringify({}), + }); + + const response = await handleContentAgentCallback(request); + expect(response.status).toBe(401); + }); + + it("returns 401 when CODING_AGENT_CALLBACK_SECRET env var is not set", async () => { + delete process.env.CODING_AGENT_CALLBACK_SECRET; + + const request = new Request("http://localhost/api/content-agent/callback", { + method: "POST", + headers: { "x-callback-secret": "test-secret" }, + body: JSON.stringify({}), + }); + + const response = await handleContentAgentCallback(request); + expect(response.status).toBe(401); + }); + + it("proceeds past auth when secret matches CODING_AGENT_CALLBACK_SECRET", async () => { + const request = new Request("http://localhost/api/content-agent/callback", { + method: "POST", + headers: { "x-callback-secret": "test-secret" }, + body: "not json", + }); + + const response = await handleContentAgentCallback(request); + // Should get past auth and fail on invalid JSON (400), not auth (401) + expect(response.status).toBe(400); + }); +}); diff --git a/lib/agents/content/__tests__/isContentAgentConfigured.test.ts b/lib/agents/content/__tests__/isContentAgentConfigured.test.ts new file mode 100644 index 00000000..d566159c --- /dev/null +++ b/lib/agents/content/__tests__/isContentAgentConfigured.test.ts @@ -0,0 +1,26 @@ +import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import { isContentAgentConfigured } from "../isContentAgentConfigured"; +import { CONTENT_AGENT_REQUIRED_ENV_VARS } from "../validateContentAgentEnv"; + +describe("isContentAgentConfigured", () => { + const originalEnv = { ...process.env }; + + beforeEach(() => { + for (const key of CONTENT_AGENT_REQUIRED_ENV_VARS) { + process.env[key] = "test-value"; + } + }); + + afterEach(() => { + process.env = { ...originalEnv }; + }); + + it("returns true when all required env vars are set", () => { + expect(isContentAgentConfigured()).toBe(true); + }); + + it("returns false when any required env var is missing", () => { + delete process.env.CODING_AGENT_CALLBACK_SECRET; + expect(isContentAgentConfigured()).toBe(false); + }); +}); diff --git a/lib/agents/content/__tests__/validateContentAgentEnv.test.ts b/lib/agents/content/__tests__/validateContentAgentEnv.test.ts new file mode 100644 index 00000000..4b514c1c --- /dev/null +++ b/lib/agents/content/__tests__/validateContentAgentEnv.test.ts @@ -0,0 +1,40 @@ +import { describe, it, expect, beforeEach, afterEach } from "vitest"; +import { + validateContentAgentEnv, + CONTENT_AGENT_REQUIRED_ENV_VARS, +} from "../validateContentAgentEnv"; + +describe("validateContentAgentEnv", () => { + const originalEnv = { ...process.env }; + + beforeEach(() => { + for (const key of CONTENT_AGENT_REQUIRED_ENV_VARS) { + process.env[key] = "test-value"; + } + }); + + afterEach(() => { + process.env = { ...originalEnv }; + }); + + it("does not throw when all required env vars are set", () => { + expect(() => validateContentAgentEnv()).not.toThrow(); + }); + + it("throws when a required env var is missing", () => { + delete process.env.SLACK_CONTENT_BOT_TOKEN; + expect(() => validateContentAgentEnv()).toThrow(/Missing required environment variables/); + }); + + it("lists all missing vars in the error message", () => { + delete process.env.SLACK_CONTENT_BOT_TOKEN; + delete process.env.REDIS_URL; + expect(() => validateContentAgentEnv()).toThrow("SLACK_CONTENT_BOT_TOKEN"); + expect(() => validateContentAgentEnv()).toThrow("REDIS_URL"); + }); + + it("requires CODING_AGENT_CALLBACK_SECRET, not CONTENT_AGENT_CALLBACK_SECRET", () => { + expect(CONTENT_AGENT_REQUIRED_ENV_VARS).toContain("CODING_AGENT_CALLBACK_SECRET"); + expect(CONTENT_AGENT_REQUIRED_ENV_VARS).not.toContain("CONTENT_AGENT_CALLBACK_SECRET"); + }); +}); diff --git a/lib/agents/content/bot.ts b/lib/agents/content/bot.ts new file mode 100644 index 00000000..70f88ba4 --- /dev/null +++ b/lib/agents/content/bot.ts @@ -0,0 +1,44 @@ +import { Chat } from "chat"; +import { SlackAdapter } from "@chat-adapter/slack"; +import { agentLogger, createAgentState } from "@/lib/agents/createAgentState"; +import type { ContentAgentThreadState } from "./types"; +import { isContentAgentConfigured } from "./isContentAgentConfigured"; +import { validateContentAgentEnv } from "./validateContentAgentEnv"; + +type ContentAgentAdapters = { + slack: SlackAdapter; +}; + +/** + * Creates a new Chat bot instance configured with the Slack adapter + * for the Recoup Content Agent. + * + * @returns The configured Chat bot instance + */ +function createContentAgentBot() { + validateContentAgentEnv(); + + const state = createAgentState("content-agent"); + + const slack = new SlackAdapter({ + botToken: process.env.SLACK_CONTENT_BOT_TOKEN!, + signingSecret: process.env.SLACK_CONTENT_SIGNING_SECRET!, + logger: agentLogger, + }); + + return new Chat({ + userName: "Recoup Content Agent", + adapters: { slack }, + state, + }); +} + +export type ContentAgentBot = ReturnType; + +/** + * Singleton bot instance. Only created when content agent env vars are configured. + * Registers as the Chat SDK singleton so ThreadImpl can resolve adapters lazily from thread IDs. + */ +export const contentAgentBot: ContentAgentBot | null = isContentAgentConfigured() + ? createContentAgentBot().registerSingleton() + : null; diff --git a/lib/agents/content/handleContentAgentCallback.ts b/lib/agents/content/handleContentAgentCallback.ts new file mode 100644 index 00000000..de01f754 --- /dev/null +++ b/lib/agents/content/handleContentAgentCallback.ts @@ -0,0 +1,98 @@ +import { timingSafeEqual } from "crypto"; +import { NextResponse } from "next/server"; +import { getCorsHeaders } from "@/lib/networking/getCorsHeaders"; +import { validateContentAgentCallback } from "./validateContentAgentCallback"; +import { getThread } from "@/lib/agents/getThread"; +import type { ContentAgentThreadState } from "./types"; + +/** + * Handles content agent task callback from Trigger.dev. + * Verifies the shared secret and dispatches based on callback status. + * + * @param request - The incoming callback request + * @returns A NextResponse + */ +export async function handleContentAgentCallback(request: Request): Promise { + const secret = request.headers.get("x-callback-secret"); + const expectedSecret = process.env.CODING_AGENT_CALLBACK_SECRET; + + const secretBuf = secret ? Buffer.from(secret) : Buffer.alloc(0); + const expectedBuf = expectedSecret ? Buffer.from(expectedSecret) : Buffer.alloc(0); + + if ( + !secret || + !expectedSecret || + secretBuf.length !== expectedBuf.length || + !timingSafeEqual(secretBuf, expectedBuf) + ) { + return NextResponse.json( + { status: "error", error: "Unauthorized" }, + { status: 401, headers: getCorsHeaders() }, + ); + } + + let body: unknown; + try { + body = await request.json(); + } catch { + return NextResponse.json( + { status: "error", error: "Invalid JSON body" }, + { status: 400, headers: getCorsHeaders() }, + ); + } + + const validated = validateContentAgentCallback(body); + + if (validated instanceof NextResponse) { + return validated; + } + + const thread = getThread(validated.threadId); + + // Idempotency: skip if thread is no longer running (duplicate/retry delivery) + const currentState = await thread.state; + if (currentState?.status && currentState.status !== "running") { + return NextResponse.json({ status: "ok", skipped: true }, { headers: getCorsHeaders() }); + } + + switch (validated.status) { + case "completed": { + const results = validated.results ?? []; + const videos = results.filter(r => r.status === "completed" && r.videoUrl); + const failed = results.filter(r => r.status === "failed"); + + if (videos.length > 0) { + const lines = videos.map((v, i) => { + const label = videos.length > 1 ? `**Video ${i + 1}:** ` : ""; + const caption = v.captionText ? `\n> ${v.captionText}` : ""; + return `${label}${v.videoUrl}${caption}`; + }); + + if (failed.length > 0) { + lines.push(`\n_${failed.length} run(s) failed._`); + } + + await thread.post(lines.join("\n\n")); + } else { + await thread.post("Content generation finished but no videos were produced."); + } + + await thread.setState({ status: "completed" }); + break; + } + + case "failed": + await thread.setState({ status: "failed" }); + await thread.post(`Content generation failed: ${validated.message ?? "Unknown error"}`); + break; + + case "timeout": + await thread.setState({ status: "timeout" }); + await thread.post( + "Content generation timed out after 30 minutes. The pipeline may still be running — check the Trigger.dev dashboard.", + ); + break; + } + + return NextResponse.json({ status: "ok" }, { headers: getCorsHeaders() }); +} diff --git a/lib/agents/content/handlers/registerHandlers.ts b/lib/agents/content/handlers/registerHandlers.ts new file mode 100644 index 00000000..5bc537d9 --- /dev/null +++ b/lib/agents/content/handlers/registerHandlers.ts @@ -0,0 +1,12 @@ +import { contentAgentBot } from "../bot"; +import { registerOnNewMention } from "./registerOnNewMention"; +import { registerOnSubscribedMessage } from "./registerOnSubscribedMessage"; + +/** + * Registers all content agent event handlers on the bot singleton. + * Import this file once to attach handlers to the bot. + */ +if (contentAgentBot) { + registerOnNewMention(contentAgentBot); + registerOnSubscribedMessage(contentAgentBot); +} diff --git a/lib/agents/content/handlers/registerOnNewMention.ts b/lib/agents/content/handlers/registerOnNewMention.ts new file mode 100644 index 00000000..592412f9 --- /dev/null +++ b/lib/agents/content/handlers/registerOnNewMention.ts @@ -0,0 +1,113 @@ +import type { ContentAgentBot } from "../bot"; +import { triggerCreateContent } from "@/lib/trigger/triggerCreateContent"; +import { triggerPollContentRun } from "@/lib/trigger/triggerPollContentRun"; +import { resolveArtistSlug } from "@/lib/content/resolveArtistSlug"; +import { getArtistContentReadiness } from "@/lib/content/getArtistContentReadiness"; +import { selectAccountSnapshots } from "@/lib/supabase/account_snapshots/selectAccountSnapshots"; +import { DEFAULT_CONTENT_TEMPLATE } from "@/lib/content/contentTemplates"; + +/** + * Registers the onNewMention handler on the content agent bot. + * Parses the mention text, validates the artist, triggers content creation, + * and starts a polling task to report results back. + * + * @param bot - The content agent bot instance to register the handler on + */ +export function registerOnNewMention(bot: ContentAgentBot) { + bot.onNewMention(async (thread, _) => { + try { + const accountId = "fb678396-a68f-4294-ae50-b8cacf9ce77b"; + const artistAccountId = "1873859c-dd37-4e9a-9bac-80d3558527a9"; + const template = DEFAULT_CONTENT_TEMPLATE; + const batch = 1; + const lipsync = false; + + // Resolve artist slug + const artistSlug = await resolveArtistSlug(artistAccountId); + if (!artistSlug) { + await thread.post( + `Artist not found for account ID \`${artistAccountId}\`. Please check the ID and try again.`, + ); + return; + } + + // Resolve GitHub repo + let githubRepo: string; + try { + const readiness = await getArtistContentReadiness({ + accountId, + artistAccountId, + artistSlug, + }); + githubRepo = readiness.githubRepo; + } catch { + const snapshots = await selectAccountSnapshots(artistAccountId); + const repo = snapshots?.[0]?.github_repo; + if (!repo) { + await thread.post( + `No GitHub repository found for artist \`${artistSlug}\`. Content creation requires a configured repo.`, + ); + return; + } + githubRepo = repo; + } + + // Post acknowledgment + const batchNote = batch > 1 ? ` (${batch} videos)` : ""; + const lipsyncNote = lipsync ? " with lipsync" : ""; + await thread.post( + `Generating content for **${artistSlug}**${batchNote}${lipsyncNote}... Template: \`${template}\`. I'll reply here when ready (~5-10 min).`, + ); + + // Trigger content creation + const payload = { + accountId, + artistSlug, + template, + lipsync, + captionLength: "short" as const, + upscale: false, + githubRepo, + }; + + const results = await Promise.allSettled( + Array.from({ length: batch }, () => triggerCreateContent(payload)), + ); + const runIds = results + .filter(r => r.status === "fulfilled") + .map(r => (r as PromiseFulfilledResult<{ id: string }>).value.id); + + if (runIds.length === 0) { + await thread.post("Failed to trigger content creation. Please try again."); + return; + } + + // Set thread state + await thread.setState({ + status: "running", + artistAccountId, + template, + lipsync, + batch, + runIds, + }); + + // Trigger polling task + try { + await triggerPollContentRun({ + runIds, + callbackThreadId: thread.id, + }); + } catch (pollError) { + console.error("[content-agent] triggerPollContentRun failed:", pollError); + await thread.setState({ status: "failed" }); + await thread.post("Failed to start content polling. Please try again."); + return; + } + } catch (error) { + console.error("[content-agent] onNewMention error:", error); + await thread.setState({ status: "failed" }); + await thread.post("Something went wrong starting content generation. Please try again."); + } + }); +} diff --git a/lib/agents/content/handlers/registerOnSubscribedMessage.ts b/lib/agents/content/handlers/registerOnSubscribedMessage.ts new file mode 100644 index 00000000..5371a24b --- /dev/null +++ b/lib/agents/content/handlers/registerOnSubscribedMessage.ts @@ -0,0 +1,20 @@ +import type { ContentAgentBot } from "../bot"; + +/** + * Registers the onSubscribedMessage handler for the content agent. + * Handles replies in active threads while content is being generated. + * + * @param bot - The content agent bot instance to register the handler on + */ +export function registerOnSubscribedMessage(bot: ContentAgentBot) { + bot.onSubscribedMessage(async (thread, message) => { + // Guard against bot-authored messages to prevent echo loops + if (message.author.isBot || message.author.isMe) return; + + const state = await thread.state; + + if (state?.status === "running") { + await thread.post("Still generating your content. I'll reply here when it's ready."); + } + }); +} diff --git a/lib/agents/content/isContentAgentConfigured.ts b/lib/agents/content/isContentAgentConfigured.ts new file mode 100644 index 00000000..6da272f0 --- /dev/null +++ b/lib/agents/content/isContentAgentConfigured.ts @@ -0,0 +1,14 @@ +import { CONTENT_AGENT_REQUIRED_ENV_VARS } from "./validateContentAgentEnv"; + +/** + * Returns true if all required content agent environment variables are set. + * + * @returns Whether the content agent is fully configured + */ +export function isContentAgentConfigured(): boolean { + const missing = CONTENT_AGENT_REQUIRED_ENV_VARS.filter(name => !process.env[name]); + if (missing.length > 0) { + console.warn(`[content-agent] Missing env vars: ${missing.join(", ")}`); + } + return missing.length === 0; +} diff --git a/lib/agents/content/types.ts b/lib/agents/content/types.ts new file mode 100644 index 00000000..bae35862 --- /dev/null +++ b/lib/agents/content/types.ts @@ -0,0 +1,12 @@ +/** + * Thread state for the content agent bot. + * Stored in Redis via Chat SDK's state adapter. + */ +export interface ContentAgentThreadState { + status: "running" | "completed" | "failed" | "timeout"; + artistAccountId: string; + template: string; + lipsync: boolean; + batch: number; + runIds: string[]; +} diff --git a/lib/agents/content/validateContentAgentCallback.ts b/lib/agents/content/validateContentAgentCallback.ts new file mode 100644 index 00000000..70fff8f9 --- /dev/null +++ b/lib/agents/content/validateContentAgentCallback.ts @@ -0,0 +1,52 @@ +import { NextResponse } from "next/server"; +import { getCorsHeaders } from "@/lib/networking/getCorsHeaders"; +import { z } from "zod"; + +const contentRunResultSchema = z.object({ + runId: z.string(), + status: z.enum(["completed", "failed", "timeout"]), + videoUrl: z.string().optional(), + captionText: z.string().optional(), + error: z.string().optional(), +}); + +export const contentAgentCallbackSchema = z.object({ + threadId: z + .string({ message: "threadId is required" }) + .min(1, "threadId cannot be empty") + .regex(/^[^:]+:[^:]+:[^:]+$/, "threadId must match adapter:channel:thread format"), + status: z.enum(["completed", "failed", "timeout"]), + results: z.array(contentRunResultSchema).optional(), + message: z.string().optional(), +}); + +export type ContentAgentCallbackBody = z.infer; + +/** + * Validates the content agent callback body against the expected schema. + * + * @param body - The parsed JSON body of the callback request. + * @returns A NextResponse with an error if validation fails, or the validated body. + */ +export function validateContentAgentCallback( + body: unknown, +): NextResponse | ContentAgentCallbackBody { + const result = contentAgentCallbackSchema.safeParse(body); + + if (!result.success) { + const firstError = result.error.issues[0]; + return NextResponse.json( + { + status: "error", + missing_fields: firstError.path, + error: firstError.message, + }, + { + status: 400, + headers: getCorsHeaders(), + }, + ); + } + + return result.data; +} diff --git a/lib/agents/content/validateContentAgentEnv.ts b/lib/agents/content/validateContentAgentEnv.ts new file mode 100644 index 00000000..d6693626 --- /dev/null +++ b/lib/agents/content/validateContentAgentEnv.ts @@ -0,0 +1,19 @@ +export const CONTENT_AGENT_REQUIRED_ENV_VARS = [ + "SLACK_CONTENT_BOT_TOKEN", + "SLACK_CONTENT_SIGNING_SECRET", + "CODING_AGENT_CALLBACK_SECRET", + "REDIS_URL", +] as const; + +/** + * Validates that all required environment variables for the content agent are set. + * Throws an error listing all missing variables. + */ +export function validateContentAgentEnv(): void { + const missing = CONTENT_AGENT_REQUIRED_ENV_VARS.filter(name => !process.env[name]); + if (missing.length > 0) { + throw new Error( + `[content-agent] Missing required environment variables:\n${missing.map(v => ` - ${v}`).join("\n")}`, + ); + } +} diff --git a/lib/agents/createAgentState.ts b/lib/agents/createAgentState.ts new file mode 100644 index 00000000..124aaaf1 --- /dev/null +++ b/lib/agents/createAgentState.ts @@ -0,0 +1,29 @@ +import { ConsoleLogger } from "chat"; +import { createIoRedisState } from "@chat-adapter/state-ioredis"; +import redis from "@/lib/redis/connection"; + +/** + * Shared logger for all agent bots. + */ +export const agentLogger = new ConsoleLogger(); + +/** + * Creates a Redis-backed state adapter for an agent bot. + * Handles the Redis connection lifecycle and returns an ioredis state instance. + * + * @param keyPrefix - The Redis key prefix for this agent (e.g. "coding-agent", "content-agent") + * @returns The ioredis state adapter + */ +export function createAgentState(keyPrefix: string) { + if (redis.status === "wait") { + redis.connect().catch(err => { + console.error(`[${keyPrefix}] Redis failed to connect:`, err); + }); + } + + return createIoRedisState({ + client: redis, + keyPrefix, + logger: agentLogger, + }); +} diff --git a/lib/agents/createPlatformRoutes.ts b/lib/agents/createPlatformRoutes.ts new file mode 100644 index 00000000..5ee7420b --- /dev/null +++ b/lib/agents/createPlatformRoutes.ts @@ -0,0 +1,105 @@ +import type { NextRequest } from "next/server"; +import { after } from "next/server"; +import { z } from "zod"; +import { handleUrlVerification } from "@/lib/slack/handleUrlVerification"; + +const platformSchema = z.object({ + platform: z.string().min(1), +}); + +type WebhookHandler = ( + request: Request, + options?: { waitUntil?: (task: Promise) => void }, +) => Promise; + +interface AgentBotLike { + webhooks: Record; + initialize(): Promise; +} + +interface PlatformRouteConfig { + getBot: () => AgentBotLike; + ensureHandlers?: () => void; + isConfigured?: () => boolean; +} + +/** + * Creates GET and POST route handlers for a [platform] webhook route. + * Shared across agent bots (coding-agent, content-agent) to avoid duplication. + * + * @param config - Bot accessor, optional handler registration, optional env guard + * @returns GET and POST route handlers for Next.js App Router + */ +export function createPlatformRoutes(config: PlatformRouteConfig) { + /** + * Handles webhook verification handshakes (e.g. WhatsApp hub.challenge). + * + * @param request - The incoming verification request + * @param root0 - Route params wrapper + * @param root0.params - Promise resolving to the platform name + * @returns The webhook verification response + */ + async function GET(request: NextRequest, { params }: { params: Promise<{ platform: string }> }) { + if (config.isConfigured && !config.isConfigured()) { + return Response.json({ error: "Agent not configured" }, { status: 503 }); + } + + const parsed = platformSchema.safeParse(await params); + if (!parsed.success) { + return Response.json({ error: "Invalid platform parameter" }, { status: 400 }); + } + + const { platform } = parsed.data; + config.ensureHandlers?.(); + + const bot = config.getBot(); + const handler = bot.webhooks[platform]; + + if (!handler) { + return Response.json({ error: "Unknown platform" }, { status: 404 }); + } + + return handler(request, { waitUntil: p => after(() => p) }); + } + + /** + * Handles incoming webhook events from the platform adapter. + * + * @param request - The incoming webhook request + * @param root0 - Route params wrapper + * @param root0.params - Promise resolving to the platform name + * @returns The webhook response + */ + async function POST(request: NextRequest, { params }: { params: Promise<{ platform: string }> }) { + const parsed = platformSchema.safeParse(await params); + if (!parsed.success) { + return Response.json({ error: "Invalid platform parameter" }, { status: 400 }); + } + + const { platform } = parsed.data; + + if (platform === "slack") { + const verification = await handleUrlVerification(request); + if (verification) return verification; + } + + if (config.isConfigured && !config.isConfigured()) { + return Response.json({ error: "Agent not configured" }, { status: 503 }); + } + + config.ensureHandlers?.(); + + const bot = config.getBot(); + await bot.initialize(); + + const handler = bot.webhooks[platform]; + + if (!handler) { + return Response.json({ error: "Unknown platform" }, { status: 404 }); + } + + return handler(request, { waitUntil: p => after(() => p) }); + } + + return { GET, POST }; +} diff --git a/lib/agents/getThread.ts b/lib/agents/getThread.ts new file mode 100644 index 00000000..8a306dcb --- /dev/null +++ b/lib/agents/getThread.ts @@ -0,0 +1,29 @@ +import { ThreadImpl } from "chat"; + +const THREAD_ID_PATTERN = /^[^:]+:[^:]+:[^:]+$/; + +/** + * Reconstructs a Thread from a stored thread ID using the Chat SDK singleton. + * Shared across agent bots (coding-agent, content-agent). + * + * @param threadId - The stored thread identifier (format: adapter:channel:thread) + * @returns The reconstructed Thread instance + * @throws If threadId does not match the expected adapter:channel:thread format + */ +export function getThread>(threadId: string) { + if (!THREAD_ID_PATTERN.test(threadId)) { + throw new Error( + `Invalid threadId format: expected "adapter:channel:thread", got "${threadId}"`, + ); + } + + const parts = threadId.split(":"); + const adapterName = parts[0]; + const channelId = `${adapterName}:${parts[1]}`; + + return new ThreadImpl({ + adapterName, + id: threadId, + channelId, + }); +} diff --git a/lib/coding-agent/__tests__/getThread.test.ts b/lib/coding-agent/__tests__/getThread.test.ts index e44198aa..ce876a11 100644 --- a/lib/coding-agent/__tests__/getThread.test.ts +++ b/lib/coding-agent/__tests__/getThread.test.ts @@ -4,9 +4,9 @@ vi.mock("chat", () => ({ ThreadImpl: vi.fn().mockImplementation((config: Record) => config), })); -describe("getThread", () => { +describe("getThread (shared)", () => { it("parses adapter name and channel ID from thread ID", async () => { - const { getThread } = await import("../getThread"); + const { getThread } = await import("@/lib/agents/getThread"); const { ThreadImpl } = await import("chat"); getThread("slack:C123:1234567890.123456"); diff --git a/lib/coding-agent/__tests__/handlePRCreated.test.ts b/lib/coding-agent/__tests__/handlePRCreated.test.ts index 3ccac045..928e9e15 100644 --- a/lib/coding-agent/__tests__/handlePRCreated.test.ts +++ b/lib/coding-agent/__tests__/handlePRCreated.test.ts @@ -5,7 +5,7 @@ const mockThread = { setState: vi.fn(), }; -vi.mock("../getThread", () => ({ +vi.mock("@/lib/agents/getThread", () => ({ getThread: vi.fn(() => mockThread), })); diff --git a/lib/coding-agent/bot.ts b/lib/coding-agent/bot.ts index 88160c5c..273ca613 100644 --- a/lib/coding-agent/bot.ts +++ b/lib/coding-agent/bot.ts @@ -1,15 +1,12 @@ -import { Chat, ConsoleLogger } from "chat"; +import { Chat } from "chat"; import { SlackAdapter } from "@chat-adapter/slack"; import { createWhatsAppAdapter, WhatsAppAdapter } from "@chat-adapter/whatsapp"; import { createGitHubAdapter } from "@chat-adapter/github"; -import { createIoRedisState } from "@chat-adapter/state-ioredis"; -import redis from "@/lib/redis/connection"; +import { agentLogger, createAgentState } from "@/lib/agents/createAgentState"; import type { CodingAgentThreadState } from "./types"; import { validateCodingAgentEnv } from "./validateEnv"; import { isWhatsAppConfigured } from "./whatsApp/isWhatsAppConfigured"; -const logger = new ConsoleLogger(); - type CodingAgentAdapters = { slack: SlackAdapter; github: ReturnType; @@ -18,40 +15,31 @@ type CodingAgentAdapters = { /** * Creates a new Chat bot instance configured with Slack, GitHub, and optionally WhatsApp adapters. + * + * @returns The configured Chat bot instance */ export function createCodingAgentBot() { validateCodingAgentEnv(); - // ioredis is configured with lazyConnect: true, so we must - // explicitly connect before the state adapter listens for "ready". - if (redis.status === "wait") { - redis.connect().catch(() => { - throw new Error("[coding-agent] Redis failed to connect"); - }); - } - const state = createIoRedisState({ - client: redis, - keyPrefix: "coding-agent", - logger, - }); + const state = createAgentState("coding-agent"); const slack = new SlackAdapter({ botToken: process.env.SLACK_BOT_TOKEN!, signingSecret: process.env.SLACK_SIGNING_SECRET!, - logger, + logger: agentLogger, }); const github = createGitHubAdapter({ token: process.env.GITHUB_TOKEN!, webhookSecret: process.env.GITHUB_WEBHOOK_SECRET!, userName: process.env.GITHUB_BOT_USERNAME ?? "recoup-coding-agent", - logger, + logger: agentLogger, }); const adapters: CodingAgentAdapters = { slack, github }; if (isWhatsAppConfigured()) { - adapters.whatsapp = createWhatsAppAdapter({ logger }); + adapters.whatsapp = createWhatsAppAdapter({ logger: agentLogger }); } return new Chat({ diff --git a/lib/coding-agent/getThread.ts b/lib/coding-agent/getThread.ts deleted file mode 100644 index 1322db4d..00000000 --- a/lib/coding-agent/getThread.ts +++ /dev/null @@ -1,17 +0,0 @@ -import { ThreadImpl } from "chat"; -import type { CodingAgentThreadState } from "./types"; - -/** - * Reconstructs a Thread from a stored thread ID using the Chat SDK singleton. - * - * @param threadId - */ -export function getThread(threadId: string) { - const adapterName = threadId.split(":")[0]; - const channelId = `${adapterName}:${threadId.split(":")[1]}`; - return new ThreadImpl({ - adapterName, - id: threadId, - channelId, - }); -} diff --git a/lib/coding-agent/handleCodingAgentCallback.ts b/lib/coding-agent/handleCodingAgentCallback.ts index 53ff1424..e4cabda1 100644 --- a/lib/coding-agent/handleCodingAgentCallback.ts +++ b/lib/coding-agent/handleCodingAgentCallback.ts @@ -1,7 +1,7 @@ import { NextResponse } from "next/server"; import { getCorsHeaders } from "@/lib/networking/getCorsHeaders"; import { validateCodingAgentCallback } from "./validateCodingAgentCallback"; -import { getThread } from "./getThread"; +import { getThread } from "@/lib/agents/getThread"; import { handlePRCreated } from "./handlePRCreated"; import { buildPRCard } from "./buildPRCard"; import { setCodingAgentPRState } from "./prState"; @@ -41,7 +41,7 @@ export async function handleCodingAgentCallback(request: Request): Promise(validated.threadId); // Post agent stdout to the thread so users see the full agent response if (validated.stdout?.trim()) { diff --git a/lib/coding-agent/handlePRCreated.ts b/lib/coding-agent/handlePRCreated.ts index 77837b28..52bee268 100644 --- a/lib/coding-agent/handlePRCreated.ts +++ b/lib/coding-agent/handlePRCreated.ts @@ -1,7 +1,8 @@ -import { getThread } from "./getThread"; +import { getThread } from "@/lib/agents/getThread"; import { buildPRCard } from "./buildPRCard"; import { setCodingAgentPRState } from "./prState"; import type { CodingAgentCallbackBody } from "./validateCodingAgentCallback"; +import type { CodingAgentThreadState } from "./types"; /** * Handles the pr_created callback status. @@ -11,7 +12,7 @@ import type { CodingAgentCallbackBody } from "./validateCodingAgentCallback"; * @param body */ export async function handlePRCreated(threadId: string, body: CodingAgentCallbackBody) { - const thread = getThread(threadId); + const thread = getThread(threadId); const prs = body.prs ?? []; const card = buildPRCard("PRs Created", prs); diff --git a/lib/trigger/triggerPollContentRun.ts b/lib/trigger/triggerPollContentRun.ts new file mode 100644 index 00000000..f6b1d8a8 --- /dev/null +++ b/lib/trigger/triggerPollContentRun.ts @@ -0,0 +1,18 @@ +import { tasks } from "@trigger.dev/sdk"; + +type PollContentRunPayload = { + runIds: string[]; + callbackThreadId: string; +}; + +/** + * Triggers the poll-content-run task to monitor content creation runs + * and post results back to the Slack thread via callback. + * + * @param payload - The run IDs to poll and the callback thread ID + * @returns The task handle with runId + */ +export async function triggerPollContentRun(payload: PollContentRunPayload) { + const handle = await tasks.trigger("poll-content-run", payload); + return handle; +}