Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 6 additions & 15 deletions api/platform/src/internal.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,26 +3,17 @@
*
* NOT served over HTTP. Accessed exclusively via createInternalCaller().
* All procedures use internalProcedure (observability middleware, no auth).
*
* Sub-routers will be added as business logic is migrated from:
* - Inngest function steps (DB calls, provider APIs, token management)
* - Route handler lib calls (OAuth, webhook ingestion)
*/
import { createTRPCRouter, internalProcedure } from "./trpc";

import { oauthInternalRouter } from "./router/internal/oauth";
import { webhooksInternalRouter } from "./router/internal/webhooks";
import { createTRPCRouter } from "./trpc";

// -- Internal Router ----------------------------------------------------------

export const internalRouter = createTRPCRouter({
/**
* Proof-of-concept procedure.
* Validates the full chain: caller -> router -> procedure -> middleware -> response.
* Remove once real sub-routers are added.
*/
ping: internalProcedure.query(({ ctx }) => ({
ok: true as const,
timestamp: new Date().toISOString(),
source: ctx.auth.type === "internal" ? ctx.auth.source : "unknown",
})),
webhooks: webhooksInternalRouter,
oauth: oauthInternalRouter,
});

export type InternalRouter = typeof internalRouter;
Expand Down
3 changes: 2 additions & 1 deletion api/platform/src/lib/oauth/callback.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { gatewayInstallations } from "@db/app/schema";
import type { SourceType } from "@repo/app-providers";
import { getProvider, providerAccountInfoSchema } from "@repo/app-providers";
import { and, eq } from "@vendor/db";
import { parseError } from "@vendor/observability/error/next";
import { log } from "@vendor/observability/log/next";
import { providerConfigs } from "../provider-configs";
import { writeTokenRecord } from "../token-store";
Expand Down Expand Up @@ -348,7 +349,7 @@ export async function processOAuthCallback(
reactivated,
});
} catch (err) {
const message = err instanceof Error ? err.message : "unknown";
const message = parseError(err);
log.error("[oauth/callback] oauth callback failed", {
provider: providerName,
error: message,
Expand Down
94 changes: 94 additions & 0 deletions api/platform/src/router/internal/oauth.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/**
* Internal OAuth sub-router.
*
* Handles OAuth authorize URL generation, callback processing, and CLI polling.
* Moved from apps/platform/src/app/api/connect/ route handlers.
*/

import type { SourceType } from "@repo/app-providers";
import type { TRPCRouterRecord } from "@trpc/server";
import { TRPCError } from "@trpc/server";
import { z } from "zod";
import { buildAuthorizeUrl } from "../../lib/oauth/authorize";
import {
type CallbackProcessResult,
processOAuthCallback,
} from "../../lib/oauth/callback";
import { getOAuthResult } from "../../lib/oauth/state";
import { internalProcedure } from "../../trpc";

// ── Router ──────────────────────────────────────────────────────────────────

export const oauthInternalRouter = {
/**
* Build OAuth authorize URL for a provider.
*
* Generates a cryptographically random state token, stores it in Redis,
* and returns the authorization URL.
*/
buildAuthorizeUrl: internalProcedure
.input(
z.object({
provider: z.string(),
orgId: z.string(),
connectedBy: z.string(),
redirectTo: z.string().optional(),
})
)
.mutation(async ({ input }) => {
const result = await buildAuthorizeUrl({
provider: input.provider as SourceType,
orgId: input.orgId,
connectedBy: input.connectedBy,
redirectTo: input.redirectTo,
});

if (!result.ok) {
throw new TRPCError({
code: "BAD_REQUEST",
message: result.error,
});
}

return { url: result.url, state: result.state };
}),

/**
* Process OAuth callback: validate state, exchange code, upsert installation,
* persist tokens, store result for CLI polling.
*
* Returns CallbackProcessResult — the route handler maps this to HTTP responses
* (redirect, inline HTML, or error JSON).
*/
processCallback: internalProcedure
.input(
z.object({
provider: z.string(),
state: z.string(),
query: z.record(z.string(), z.string()),
})
)
.mutation(async ({ input }): Promise<CallbackProcessResult> => {
return processOAuthCallback({
provider: input.provider as SourceType,
state: input.state,
query: input.query as Record<string, string>,
});
}),

/**
* Poll for OAuth completion result.
*
* Returns the result hash from Redis if the OAuth flow has completed,
* or null if still pending.
*/
pollResult: internalProcedure
.input(
z.object({
state: z.string(),
})
)
.query(async ({ input }) => {
return getOAuthResult(input.state);
}),
} satisfies TRPCRouterRecord;
210 changes: 210 additions & 0 deletions api/platform/src/router/internal/webhooks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
/**
* Internal webhooks sub-router.
*
* Handles webhook ingestion: HMAC verification, DB persistence, Inngest dispatch.
* Moved from apps/platform/src/app/api/ingest/[provider]/route.ts.
*/

import { db } from "@db/app/client";
import { gatewayWebhookDeliveries } from "@db/app/schema";
import type { WebhookDef } from "@repo/app-providers";
import {
deriveVerifySignature,
getProvider,
hasInboundWebhooks,
isWebhookProvider,
} from "@repo/app-providers";
import type { TRPCRouterRecord } from "@trpc/server";
import { TRPCError } from "@trpc/server";
import { log } from "@vendor/observability/log/next";
import { z } from "zod";
import { inngest } from "../../inngest/client";
import { getProviderConfigs } from "../../lib/provider-configs";
import { internalProcedure } from "../../trpc";

// ── Helpers (moved from route handler) ──────────────────────────────────────

function getWebhookDef(
providerDef: NonNullable<ReturnType<typeof getProvider>>
): WebhookDef<unknown> | null {
if (isWebhookProvider(providerDef)) {
return providerDef.webhook as WebhookDef<unknown>;
}
if (providerDef.kind === "managed") {
return providerDef.inbound.webhook as WebhookDef<unknown>;
}
if (
providerDef.kind === "api" &&
"inbound" in providerDef &&
providerDef.inbound
) {
return providerDef.inbound.webhook as WebhookDef<unknown>;
}
return null;
}

// ── Router ──────────────────────────────────────────────────────────────────

export const webhooksInternalRouter = {
/**
* Ingest a webhook delivery: verify HMAC, persist to DB, dispatch to Inngest.
*
* The route handler extracts rawBody and headers from the HTTP request
* and passes them here. This procedure handles everything else.
*
* Returns the response shape for the route handler to forward as JSON.
*/
ingest: internalProcedure
.input(
z.object({
provider: z.string(),
rawBody: z.string(),
headers: z.record(z.string(), z.string()),
receivedAt: z.number(),
})
)
.mutation(async ({ input }) => {
const { provider: providerSlug, rawBody, headers, receivedAt } = input;

// Provider guard
const providerDef = getProvider(providerSlug);
if (!providerDef) {
throw new TRPCError({
code: "NOT_FOUND",
message: `unknown_provider: ${providerSlug}`,
});
}

if (!hasInboundWebhooks(providerDef)) {
throw new TRPCError({
code: "BAD_REQUEST",
message: `not_webhook_provider: ${providerSlug}`,
});
}

const webhookDef = getWebhookDef(providerDef);
if (!webhookDef) {
throw new TRPCError({
code: "BAD_REQUEST",
message: `no_webhook_def: ${providerSlug}`,
});
}

// Webhook header validation
const headersObj: Record<string, string | undefined> = {};
for (const key of Object.keys(
(webhookDef.headersSchema as { shape: Record<string, unknown> }).shape
)) {
headersObj[key] = (headers as Record<string, string>)[key] ?? undefined;
}
const headersParsed = webhookDef.headersSchema.safeParse(headersObj);
if (!headersParsed.success) {
throw new TRPCError({
code: "BAD_REQUEST",
message: "missing_headers",
});
}

// Signature verification
const configs = getProviderConfigs();
const providerConfig = configs[providerSlug];
if (!providerConfig) {
log.error("[webhooks.ingest] provider config not found", {
provider: providerSlug,
});
throw new TRPCError({
code: "INTERNAL_SERVER_ERROR",
message: `provider_not_configured: ${providerSlug}`,
});
}

const secret = (webhookDef.extractSecret as (config: unknown) => string)(
providerConfig
);
const verify =
webhookDef.verifySignature ??
deriveVerifySignature(webhookDef.signatureScheme);

// Build a Headers object for the verify function (it expects Headers, not Record)
const reqHeaders = new Headers(Object.entries(headers));
const isValid = await verify(rawBody, reqHeaders, secret);
if (!isValid) {
log.warn("[webhooks.ingest] signature verification failed", {
provider: providerSlug,
});
throw new TRPCError({
code: "UNAUTHORIZED",
message: "signature_invalid",
});
}

// Payload parse + metadata extraction
let jsonPayload: unknown;
try {
jsonPayload = JSON.parse(rawBody);
} catch {
throw new TRPCError({
code: "BAD_REQUEST",
message: "invalid_json",
});
}

let parsedPayload: unknown;
try {
parsedPayload = webhookDef.parsePayload(jsonPayload);
} catch {
throw new TRPCError({
code: "BAD_REQUEST",
message: `payload_validation_failed: ${providerSlug}`,
});
}

const deliveryId = webhookDef.extractDeliveryId(
reqHeaders,
parsedPayload
);
const eventType = webhookDef.extractEventType(reqHeaders, parsedPayload);
const resourceId = webhookDef.extractResourceId(parsedPayload);

// Persist to DB
await db
.insert(gatewayWebhookDeliveries)
.values({
provider: providerSlug,
deliveryId,
eventType,
installationId: null,
status: "received",
payload: JSON.stringify(parsedPayload),
receivedAt: new Date(receivedAt).toISOString(),
})
.onConflictDoNothing();

// Dispatch Inngest event
const correlationId = crypto.randomUUID();

await inngest.send({
id: `wh-${providerSlug}-${deliveryId}`,
name: "platform/webhook.received",
data: {
provider: providerSlug,
deliveryId,
eventType,
resourceId,
payload: parsedPayload,
receivedAt,
correlationId,
},
});

log.info("[webhooks.ingest] webhook received", {
provider: providerSlug,
deliveryId,
eventType,
resourceId,
correlationId,
});

return { status: "accepted" as const, deliveryId };
}),
} satisfies TRPCRouterRecord;
Loading
Loading