284 changes: 231 additions & 53 deletions worker/src/index.ts
@@ -5,8 +5,14 @@
* ships with raw API keys. Keys are stored as Cloudflare secrets.
*
* Routes:
* POST /chat → Anthropic Messages API (streaming)
* POST /tts → ElevenLabs TTS API
* POST /chat → Anthropic Messages API (streaming)
* POST /tts → ElevenLabs TTS API
* POST /transcribe-token → AssemblyAI temporary websocket token
*
* Hardening:
* - Per-IP sliding-window rate limiting on /chat and /tts
* - Structured JSON error responses with error codes
* - Request logging (method, route, IP, status) for wrangler tail
*/

interface Env {
@@ -16,42 +22,203 @@ interface Env {
ASSEMBLYAI_API_KEY: string;
}

// --- Structured error codes returned in all error responses ---
type ErrorCode = "RATE_LIMITED" | "UPSTREAM_ERROR" | "BAD_REQUEST" | "INTERNAL_ERROR";

interface StructuredErrorBody {
error: string;
code: ErrorCode;
}

const JSON_CONTENT_TYPE_HEADER = { "content-type": "application/json" };

/**
* Build a structured JSON error Response.
* Every error path in the worker funnels through here so the client always
* receives a consistent `{ error, code }` shape.
*/
function buildStructuredErrorResponse(
httpStatus: number,
errorMessage: string,
errorCode: ErrorCode,
): Response {
const body: StructuredErrorBody = { error: errorMessage, code: errorCode };
return new Response(JSON.stringify(body), {
status: httpStatus,
headers: JSON_CONTENT_TYPE_HEADER,
});
}
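
// Illustrative example (not part of the PR's logic): the wire shape a caller sees
// when an error path goes through buildStructuredErrorResponse — here, the 429
// produced once the /chat rate limit configured below has been exhausted.
//
//   HTTP/1.1 429
//   content-type: application/json
//
//   { "error": "Rate limit exceeded. Try again in 60 seconds.", "code": "RATE_LIMITED" }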

// --- Per-IP sliding-window rate limiter ---

/**
* Each entry stores the timestamps (in ms) of recent requests from a single IP
* for a single route. Timestamps older than the window are pruned on every check.
*
* We use an in-memory Map because Cloudflare Workers keep the module-level
* scope alive across requests on the same isolate. This gives us lightweight,
* per-isolate rate limiting without external storage. The trade-off is that
* limits reset when the isolate is evicted, but that is acceptable for
* abuse-prevention (not billing-grade) rate limiting.
*/
interface SlidingWindowEntry {
requestTimestamps: number[];
}

interface RateLimitConfiguration {
maxRequestsPerWindow: number;
windowDurationMilliseconds: number;
}

// Separate rate-limit buckets per route so /chat and /tts limits are independent.
const rateLimitBucketsByRoute = new Map<string, Map<string, SlidingWindowEntry>>();

const RATE_LIMIT_CONFIG_BY_ROUTE: Record<string, RateLimitConfiguration> = {
"/chat": {
maxRequestsPerWindow: 20,
// 60 seconds (1 minute)
windowDurationMilliseconds: 60_000,
},
"/tts": {
maxRequestsPerWindow: 30,
windowDurationMilliseconds: 60_000,
},
};

/**
* Returns `true` if the request should be allowed, `false` if the IP has
* exceeded the rate limit for the given route.
*
* Side-effect: records the current timestamp into the sliding window when the
* request is allowed.
*/
function isRequestAllowedByRateLimit(clientIpAddress: string, routePath: string): boolean {
const routeConfiguration = RATE_LIMIT_CONFIG_BY_ROUTE[routePath];
if (!routeConfiguration) {
// No rate-limit configured for this route — always allow.
return true;
}

// Lazily create the per-route bucket map.
if (!rateLimitBucketsByRoute.has(routePath)) {
rateLimitBucketsByRoute.set(routePath, new Map());
}
const routeBuckets = rateLimitBucketsByRoute.get(routePath)!;

const currentTimestamp = Date.now();
const windowStartTimestamp = currentTimestamp - routeConfiguration.windowDurationMilliseconds;

// Lazily create the entry for this IP.
if (!routeBuckets.has(clientIpAddress)) {
routeBuckets.set(clientIpAddress, { requestTimestamps: [] });
}
const ipEntry = routeBuckets.get(clientIpAddress)!;

// Prune timestamps that have fallen outside the sliding window.
ipEntry.requestTimestamps = ipEntry.requestTimestamps.filter(
(timestamp) => timestamp > windowStartTimestamp,
);

if (ipEntry.requestTimestamps.length >= routeConfiguration.maxRequestsPerWindow) {
return false;
}

// Record this request's timestamp so it counts toward the window.
ipEntry.requestTimestamps.push(currentTimestamp);
return true;
}
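
// Behaviour sketch (illustrative only, assuming a hypothetical limit of
// 2 requests per 60 s window rather than the real /chat config above):
//
//   t = 0 s   isRequestAllowedByRateLimit("203.0.113.42", "/chat") → true
//   t = 10 s  isRequestAllowedByRateLimit("203.0.113.42", "/chat") → true
//   t = 20 s  isRequestAllowedByRateLimit("203.0.113.42", "/chat") → false  (2 already in the window)
//   t = 65 s  isRequestAllowedByRateLimit("203.0.113.42", "/chat") → true   (the t = 0 s entry was pruned)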

// --- Request logging ---

/**
* Logs a single-line summary of each request for observability via `wrangler tail`.
* Format: "[POST /chat] ip=203.0.113.42 → 200"
*/
function logRequestSummary(
httpMethod: string,
routePath: string,
clientIpAddress: string,
responseStatus: number,
): void {
console.log(
`[${httpMethod} ${routePath}] ip=${clientIpAddress} → ${responseStatus}`,
);
}

// --- Main fetch handler ---

export default {
async fetch(request: Request, env: Env): Promise<Response> {
const url = new URL(request.url);
const routePath = url.pathname;
const clientIpAddress = request.headers.get("CF-Connecting-IP") ?? "unknown";
const httpMethod = request.method;

if (request.method !== "POST") {
return new Response("Method not allowed", { status: 405 });
// --- Method check ---
if (httpMethod !== "POST") {
const response = buildStructuredErrorResponse(
405,
"Method not allowed. Only POST requests are accepted.",
"BAD_REQUEST",
);
logRequestSummary(httpMethod, routePath, clientIpAddress, 405);
return response;
}

// --- Rate limiting (applied before any upstream work) ---
if (!isRequestAllowedByRateLimit(clientIpAddress, routePath)) {
const rateLimitConfig = RATE_LIMIT_CONFIG_BY_ROUTE[routePath];
const windowSeconds = rateLimitConfig
? rateLimitConfig.windowDurationMilliseconds / 1000
: 60;
const response = buildStructuredErrorResponse(
429,
`Rate limit exceeded. Try again in ${windowSeconds} seconds.`,
"RATE_LIMITED",
);
logRequestSummary(httpMethod, routePath, clientIpAddress, 429);
return response;
}

// --- Route dispatch ---
try {
if (url.pathname === "/chat") {
return await handleChat(request, env);
}
let response: Response;

if (url.pathname === "/tts") {
return await handleTTS(request, env);
if (routePath === "/chat") {
response = await handleChat(request, env);
} else if (routePath === "/tts") {
response = await handleTTS(request, env);
} else if (routePath === "/transcribe-token") {
response = await handleTranscribeToken(env);
} else {
response = buildStructuredErrorResponse(
404,
`Route not found: ${routePath}`,
"BAD_REQUEST",
);
}

if (url.pathname === "/transcribe-token") {
return await handleTranscribeToken(env);
}
logRequestSummary(httpMethod, routePath, clientIpAddress, response.status);
return response;
} catch (error) {
console.error(`[${url.pathname}] Unhandled error:`, error);
return new Response(
JSON.stringify({ error: String(error) }),
{ status: 500, headers: { "content-type": "application/json" } }
console.error(`[${routePath}] Unhandled error:`, error);
const response = buildStructuredErrorResponse(
500,
"An internal error occurred. Please try again later.",
"INTERNAL_ERROR",
);
logRequestSummary(httpMethod, routePath, clientIpAddress, 500);
return response;
}

return new Response("Not found", { status: 404 });
},
};
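
/**
 * Example caller (a sketch only — this function is not referenced by the worker
 * and exists to show the contract the fetch handler above exposes). It POSTs an
 * Anthropic-style payload to /chat and branches on the structured `{ error, code }`
 * body. `workerBaseUrl`, the model id, and the message content are hypothetical
 * placeholders, not values taken from this repository.
 */
async function exampleChatClientCall(workerBaseUrl: string): Promise<Response> {
  const chatResponse = await fetch(`${workerBaseUrl}/chat`, {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify({
      model: "claude-sonnet-4-5", // hypothetical model id — substitute the app's real one
      max_tokens: 1024,
      stream: true,
      messages: [{ role: "user", content: "Hello" }],
    }),
  });

  if (!chatResponse.ok) {
    // Every worker error path returns the structured `{ error, code }` shape,
    // so callers can branch on `code` (e.g. back off when it is "RATE_LIMITED").
    const { error, code } = (await chatResponse.json()) as StructuredErrorBody;
    throw new Error(`${code}: ${error}`);
  }

  // On success the body is the SSE stream proxied straight from Anthropic.
  return chatResponse;
}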

// --- Route handlers ---

async function handleChat(request: Request, env: Env): Promise<Response> {
const body = await request.text();

const response = await fetch("https://api.anthropic.com/v1/messages", {
const anthropicResponse = await fetch("https://api.anthropic.com/v1/messages", {
method: "POST",
headers: {
"x-api-key": env.ANTHROPIC_API_KEY,
@@ -61,56 +228,63 @@ async function handleChat(request: Request, env: Env): Promise<Response> {
body,
});

if (!response.ok) {
const errorBody = await response.text();
console.error(`[/chat] Anthropic API error ${response.status}: ${errorBody}`);
return new Response(errorBody, {
status: response.status,
headers: { "content-type": "application/json" },
});
if (!anthropicResponse.ok) {
const upstreamErrorBody = await anthropicResponse.text();
console.error(
`[/chat] Anthropic API error ${anthropicResponse.status}: ${upstreamErrorBody}`,
);
return buildStructuredErrorResponse(
anthropicResponse.status,
`Anthropic API error: ${upstreamErrorBody}`,
"UPSTREAM_ERROR",
);
}

return new Response(response.body, {
status: response.status,
return new Response(anthropicResponse.body, {
status: anthropicResponse.status,
headers: {
"content-type": response.headers.get("content-type") || "text/event-stream",
"content-type":
anthropicResponse.headers.get("content-type") || "text/event-stream",
"cache-control": "no-cache",
},
});
}

async function handleTranscribeToken(env: Env): Promise<Response> {
const response = await fetch(
const assemblyAiResponse = await fetch(
"https://streaming.assemblyai.com/v3/token?expires_in_seconds=480",
{
method: "GET",
headers: {
authorization: env.ASSEMBLYAI_API_KEY,
},
}
},
);

if (!response.ok) {
const errorBody = await response.text();
console.error(`[/transcribe-token] AssemblyAI token error ${response.status}: ${errorBody}`);
return new Response(errorBody, {
status: response.status,
headers: { "content-type": "application/json" },
});
if (!assemblyAiResponse.ok) {
const upstreamErrorBody = await assemblyAiResponse.text();
console.error(
`[/transcribe-token] AssemblyAI token error ${assemblyAiResponse.status}: ${upstreamErrorBody}`,
);
return buildStructuredErrorResponse(
assemblyAiResponse.status,
`AssemblyAI token error: ${upstreamErrorBody}`,
"UPSTREAM_ERROR",
);
}

const data = await response.text();
return new Response(data, {
const tokenResponseData = await assemblyAiResponse.text();
return new Response(tokenResponseData, {
status: 200,
headers: { "content-type": "application/json" },
headers: JSON_CONTENT_TYPE_HEADER,
});
}
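
/*
 * Client-side sketch (illustrative, with assumptions to verify against the
 * AssemblyAI docs: that the token JSON exposes a `token` field and that the v3
 * streaming websocket accepts it as a `token` query parameter; `workerBaseUrl`
 * is a hypothetical placeholder):
 *
 *   const tokenResponse = await fetch(`${workerBaseUrl}/transcribe-token`, { method: "POST" });
 *   const { token } = await tokenResponse.json();
 *   const socket = new WebSocket(`wss://streaming.assemblyai.com/v3/ws?sample_rate=16000&token=${token}`);
 */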

async function handleTTS(request: Request, env: Env): Promise<Response> {
const body = await request.text();
const voiceId = env.ELEVENLABS_VOICE_ID;

const response = await fetch(
const elevenLabsResponse = await fetch(
`https://api.elevenlabs.io/v1/text-to-speech/${voiceId}`,
{
method: "POST",
@@ -120,22 +294,26 @@ async function handleTTS(request: Request, env: Env): Promise<Response> {
accept: "audio/mpeg",
},
body,
}
},
);

if (!response.ok) {
const errorBody = await response.text();
console.error(`[/tts] ElevenLabs API error ${response.status}: ${errorBody}`);
return new Response(errorBody, {
status: response.status,
headers: { "content-type": "application/json" },
});
if (!elevenLabsResponse.ok) {
const upstreamErrorBody = await elevenLabsResponse.text();
console.error(
`[/tts] ElevenLabs API error ${elevenLabsResponse.status}: ${upstreamErrorBody}`,
);
return buildStructuredErrorResponse(
elevenLabsResponse.status,
`ElevenLabs API error: ${upstreamErrorBody}`,
"UPSTREAM_ERROR",
);
}

return new Response(response.body, {
status: response.status,
return new Response(elevenLabsResponse.body, {
status: elevenLabsResponse.status,
headers: {
"content-type": response.headers.get("content-type") || "audio/mpeg",
"content-type":
elevenLabsResponse.headers.get("content-type") || "audio/mpeg",
},
});
}
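
/**
 * Example caller for /tts (a sketch only — not referenced by the worker). It posts
 * a minimal ElevenLabs-style `{ text }` body and returns the MP3 bytes the worker
 * streams back. `workerBaseUrl` is a hypothetical placeholder; the real front-end
 * may send additional ElevenLabs fields, which the worker forwards verbatim.
 */
async function exampleTtsClientCall(workerBaseUrl: string, text: string): Promise<Blob> {
  const ttsResponse = await fetch(`${workerBaseUrl}/tts`, {
    method: "POST",
    headers: { "content-type": "application/json" },
    body: JSON.stringify({ text }),
  });

  if (!ttsResponse.ok) {
    const { error, code } = (await ttsResponse.json()) as StructuredErrorBody;
    throw new Error(`${code}: ${error}`);
  }

  // audio/mpeg bytes; a browser caller could play them via
  // `new Audio(URL.createObjectURL(blob))`.
  return await ttsResponse.blob();
}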