diff --git a/src/lib/river/client.ts b/src/lib/river/client.ts index 7128063..691ee7e 100644 --- a/src/lib/river/client.ts +++ b/src/lib/river/client.ts @@ -5,7 +5,7 @@ import type { InferRiverAgentChunkType, InferRiverAgentInputType } from './types.js'; -import { RiverError } from './errors.js'; +import { RiverError, codeFromStatus, RiverErrorJSONSchema } from './errors.js'; type RiverClientCaller = { [K in keyof T]: ClientSideCaller, InferRiverAgentInputType>; @@ -44,25 +44,63 @@ const createClientCaller = (endpoint: string): RiverClien signal: abortController.signal }), (error) => { - return new RiverError('Failed to call agent', error); + const isAbort = abortController.signal.aborted; + const message = isAbort ? 'Request was cancelled' : 'Failed to call agent'; + const code: RiverError['code'] = isAbort + ? 'CLIENT_CLOSED_REQUEST' + : 'INTERNAL_SERVER_ERROR'; + return new RiverError(message, { + code: code, + agentId, + cause: error + }); } ); if (response.isErr()) { + const isAbortErr = response.error.code === 'CLIENT_CLOSED_REQUEST'; + if (isAbortErr || abortController.signal.aborted) { + await onCancel?.(); + await handleFinish(); + return; + } await onError?.(response.error); await handleFinish(); return; } if (!response.value.ok) { - await onError?.(new RiverError('Failed to call agent', response.value)); + const args = { + code: codeFromStatus(response.value.status), + httpStatus: response.value.status, + agentId + }; + const riverErr = await ResultAsync.fromPromise( + response.value.json(), + (error) => new RiverError('Failed to parse JSON', { cause: error, ...args }) + ).match( + (json) => { + const { success, data, error } = RiverErrorJSONSchema.safeParse(json); + return success + ? RiverError.fromJSON(data) + : new RiverError('Unexpected Error Format', { cause: error, ...args }); + }, + (error) => error + ); + + await onError?.(riverErr); await handleFinish(); return; } const reader = response.value.body?.getReader(); if (!reader) { - await onError?.(new RiverError('Failed to get reader', true)); + await onError?.( + new RiverError('Failed to get reader', { + code: 'INTERNAL_SERVER_ERROR', + agentId + }) + ); await handleFinish(); return; } @@ -74,7 +112,10 @@ const createClientCaller = (endpoint: string): RiverClien while (!done) { const readResult = await ResultAsync.fromPromise(reader.read(), (error) => { - return new RiverError('Failed to read stream', error); + return new RiverError('Failed to read stream', { + code: 'INTERNAL_SERVER_ERROR', + cause: error + }); }); if (readResult.isErr()) { @@ -100,17 +141,46 @@ const createClientCaller = (endpoint: string): RiverClien buffer = messages.pop() || ''; for (const message of messages) { - if (!message.trim().startsWith('data: ')) continue; + const lines = message.split('\n'); + let eventType: string | null = null; + let dataPayload = ''; + + for (const rawLine of lines) { + const line = rawLine.trim(); + if (!line) continue; + if (line.startsWith('event:')) { + eventType = line.slice('event:'.length).trim(); + } else if (line.startsWith('data:')) { + dataPayload += line.slice('data:'.length).trim(); + } + } + + if (eventType === 'error') { + const result = RiverErrorJSONSchema.safeParse(dataPayload); + const riverErr = result.success + ? RiverError.fromJSON(result.data) + : new RiverError('Stream error', { + code: 'INTERNAL_SERVER_ERROR', + cause: dataPayload, + agentId + }); + + await onError?.(riverErr); - const rawData = message.replace('data: ', '').trim(); + await reader.cancel(); + abortController.abort(); + done = true; + break; + } + + if (!dataPayload) continue; let parsed: unknown; try { - parsed = JSON.parse(rawData); + parsed = JSON.parse(dataPayload); } catch { - parsed = rawData; + parsed = dataPayload; } - await onChunk?.(parsed as any, totalChunks); totalChunks += 1; } diff --git a/src/lib/river/errors.ts b/src/lib/river/errors.ts index 65e8bb3..7f7c702 100644 --- a/src/lib/river/errors.ts +++ b/src/lib/river/errors.ts @@ -1,10 +1,233 @@ -export class RiverError { +import z from 'zod'; + +export type RiverErrorCode = + | 'BAD_REQUEST' + | 'UNAUTHORIZED' + | 'FORBIDDEN' + | 'NOT_FOUND' + | 'METHOD_NOT_SUPPORTED' + | 'TIMEOUT' + | 'CONFLICT' + | 'PRECONDITION_FAILED' + | 'PAYLOAD_TOO_LARGE' + | 'UNPROCESSABLE_CONTENT' + | 'TOO_MANY_REQUESTS' + | 'CLIENT_CLOSED_REQUEST' + | 'INTERNAL_SERVER_ERROR' + | 'NOT_IMPLEMENTED' + | 'BAD_GATEWAY' + | 'SERVICE_UNAVAILABLE' + | 'GATEWAY_TIMEOUT'; + +/** + * Zod enum for `RiverErrorCode` to enable schema validation on the client. + */ +export const RiverErrorCodeSchema = z.enum([ + 'BAD_REQUEST', + 'UNAUTHORIZED', + 'FORBIDDEN', + 'NOT_FOUND', + 'METHOD_NOT_SUPPORTED', + 'TIMEOUT', + 'CONFLICT', + 'PRECONDITION_FAILED', + 'PAYLOAD_TOO_LARGE', + 'UNPROCESSABLE_CONTENT', + 'TOO_MANY_REQUESTS', + 'CLIENT_CLOSED_REQUEST', + 'INTERNAL_SERVER_ERROR', + 'NOT_IMPLEMENTED', + 'BAD_GATEWAY', + 'SERVICE_UNAVAILABLE', + 'GATEWAY_TIMEOUT' +]); + +/** + * Mapping from `RiverErrorCode` to an appropriate HTTP status code. Used when + * formatting errors for HTTP responses or SSE error frames. + */ +const HTTP_STATUS_FROM_CODE: Record = { + BAD_REQUEST: 400, + UNAUTHORIZED: 401, + FORBIDDEN: 403, + NOT_FOUND: 404, + METHOD_NOT_SUPPORTED: 405, + TIMEOUT: 408, + CONFLICT: 409, + PRECONDITION_FAILED: 412, + PAYLOAD_TOO_LARGE: 413, + UNPROCESSABLE_CONTENT: 422, + TOO_MANY_REQUESTS: 429, + CLIENT_CLOSED_REQUEST: 499, + INTERNAL_SERVER_ERROR: 500, + NOT_IMPLEMENTED: 501, + BAD_GATEWAY: 502, + SERVICE_UNAVAILABLE: 503, + GATEWAY_TIMEOUT: 504 +}; + +/** + * Derive a `RiverErrorCode` from an HTTP status. Used client-side when + * converting non-OK HTTP responses into normalized River errors. + */ +export const codeFromStatus = (status: number): RiverErrorCode => { + switch (status) { + case 400: + return 'BAD_REQUEST'; + case 401: + return 'UNAUTHORIZED'; + case 403: + return 'FORBIDDEN'; + case 404: + return 'NOT_FOUND'; + case 405: + return 'METHOD_NOT_SUPPORTED'; + case 408: + return 'TIMEOUT'; + case 409: + return 'CONFLICT'; + case 412: + return 'PRECONDITION_FAILED'; + case 413: + return 'PAYLOAD_TOO_LARGE'; + case 422: + return 'UNPROCESSABLE_CONTENT'; + case 429: + return 'TOO_MANY_REQUESTS'; + case 499: + return 'CLIENT_CLOSED_REQUEST'; + case 500: + return 'INTERNAL_SERVER_ERROR'; + case 501: + return 'NOT_IMPLEMENTED'; + case 502: + return 'BAD_GATEWAY'; + case 503: + return 'SERVICE_UNAVAILABLE'; + case 504: + return 'GATEWAY_TIMEOUT'; + default: + // Heuristic defaults: bucket other 4xx to BAD_REQUEST and other 5xx to INTERNAL + if (status >= 500) return 'INTERNAL_SERVER_ERROR'; + if (status >= 400) return 'BAD_REQUEST'; + return 'INTERNAL_SERVER_ERROR'; + } +}; + +/** + * JSON-serializable shape of a River error. This is the payload sent to clients + * over HTTP (for non-OK responses) and SSE error frames (`event: error`). + * + * - `message`: human-readable description for display and logs + * - `code`: TRPC-style error code describing the error category + * - `httpStatus`: optional HTTP status to use in responses + * - `agentId`: optional agent identifier where the error occurred + * - `details`: optional structured metadata (e.g., zod issues) + */ +export type RiverErrorJSON = { + message: string; + code: RiverErrorCode; + httpStatus?: number; + agentId?: string; + details?: unknown; +}; + +/** + * Zod schema for `RiverErrorJSON` with `passthrough()` to allow forwards-compatible + * fields without breaking validation. Used on the client when parsing HTTP/SSE + * error payloads with `safeParse`. + */ +export const RiverErrorJSONSchema = z + .object({ + message: z.string(), + code: RiverErrorCodeSchema, + httpStatus: z.number().optional(), + agentId: z.string().optional(), + details: z.unknown().optional() + }) + .loose(); + +export class RiverError extends Error { __name__ = 'RiverError'; message: string; - cause: unknown; + code: RiverErrorCode; + httpStatus?: number; + agentId?: string; + cause?: unknown; + details?: unknown; - constructor(message: string, cause?: unknown) { + constructor( + message: string, + options?: { + code?: RiverErrorCode; + httpStatus?: number; + agentId?: string; + cause?: unknown; + details?: unknown; + } + ) { + super(message); + this.name = 'RiverError'; this.message = message; - this.cause = cause; + const opts = options ?? {}; + this.code = opts.code ?? 'INTERNAL_SERVER_ERROR'; + this.httpStatus = opts.httpStatus ?? HTTP_STATUS_FROM_CODE[this.code]; + this.agentId = opts.agentId; + this.cause = opts.cause; + this.details = opts.details; + } + + /** + * Runtime type guard that checks whether a value is a `RiverError`. + */ + static isRiverError(err: unknown): err is RiverError { + return ( + !!err && + typeof err === 'object' && + ('__name__' in err + ? (err as any).__name__ === 'RiverError' + : false || (err as any).name === 'RiverError') + ); + } + + /** + * Normalize an unknown thrown value into a `RiverError`, using the provided + * `fallbackCode` when the value does not already represent a River error. + */ + static fromUnknown( + err: unknown, + fallbackCode: RiverErrorCode = 'INTERNAL_SERVER_ERROR' + ): RiverError { + if (RiverError.isRiverError(err)) return err as RiverError; + if (err instanceof Error) { + return new RiverError(err.message || 'Unknown error', { code: fallbackCode, cause: err }); + } + return new RiverError('Unknown error', { code: fallbackCode, cause: err }); + } + + /** + * Convert a `RiverError` instance to a JSON-serializable payload for + * transport over HTTP or SSE. + */ + static toJSON(err: RiverError): RiverErrorJSON { + return { + message: err.message, + code: err.code, + httpStatus: err.httpStatus, + agentId: err.agentId, + details: err.details + }; + } + + /** + * Reconstruct a `RiverError` instance from a serialized JSON payload. + */ + static fromJSON(json: RiverErrorJSON): RiverError { + return new RiverError(json.message, { + code: json.code, + httpStatus: json.httpStatus, + agentId: json.agentId, + details: json.details + }); } } diff --git a/src/lib/river/server.ts b/src/lib/river/server.ts index 5be800b..f493018 100644 --- a/src/lib/river/server.ts +++ b/src/lib/river/server.ts @@ -87,7 +87,7 @@ const createServerSideAgentRunner: ServerSideAgentRunner = (router) => { }; }; -const createServerEndpointHandler: ServerEndpointHandler = (router) => { +const createServerEndpointHandler: ServerEndpointHandler = (router, options) => { const runner = createServerSideAgentRunner(router); return { POST: async (event) => { @@ -107,16 +107,26 @@ const createServerEndpointHandler: ServerEndpointHandler = (router) => { input: router[body.agentId].inputSchema }); - const bodyResult = bodySchema.safeParse(body); - if (!bodyResult.success) { - const error = new RiverError('Invalid body', bodyResult.error); - return new Response(JSON.stringify(error), { status: 400 }); + const bodyResult = bodySchema.safeParse(body); + if (!bodyResult.success) { + const err = new RiverError('Invalid body', { + code: 'BAD_REQUEST', + httpStatus: 400, + agentId: typeof body?.agentId === 'string' ? body.agentId : undefined, + details: bodyResult.error + }); + + const payload = options?.errorFormatter + ? options.errorFormatter(err) + : RiverError.toJSON(err); + return new Response(JSON.stringify(payload), { status: payload.httpStatus ?? 400 }); } const stream = new ReadableStream({ async start(streamController) { // TODO: make it so that you can do some wait until and piping shit in here const internalAgent = router[bodyResult.data.agentId]; + const encoder = new TextEncoder(); try { await runner.runAgent({ agentId: bodyResult.data.agentId, @@ -136,22 +146,40 @@ const createServerEndpointHandler: ServerEndpointHandler = (router) => { if (internalAgent.type === 'ai-sdk' && internalAgent.afterAgentRun) { await internalAgent.afterAgentRun('canceled'); } - } else { - streamController.error(error); - if (internalAgent.type === 'ai-sdk' && internalAgent.afterAgentRun) { - await internalAgent.afterAgentRun('error'); - } - } - } finally { - streamController.close(); - } - }, - cancel(reason) { - abortController.abort(reason); + } else { + const normalized = RiverError.fromUnknown(error, 'INTERNAL_SERVER_ERROR'); + // attach agentId for consistency + normalized.agentId = bodyResult.data.agentId as string; + // Server-side error hook (tRPC-style) + await options?.onError?.({ + error: normalized, + agentId: bodyResult.data.agentId as string, + input: bodyResult.data.input, + event + }); + const payload = options?.errorFormatter + ? options.errorFormatter(normalized) + : RiverError.toJSON(normalized); + const sseErrorChunk = `event: error\ndata: ${JSON.stringify(payload)}\n\n`; + streamController.enqueue(encoder.encode(sseErrorChunk)); + if (internalAgent.type === 'ai-sdk' && internalAgent.afterAgentRun) { + await internalAgent.afterAgentRun('error'); + } + } + } finally { + streamController.close(); + } + }, + cancel(reason) { + abortController.abort(reason); + } + }); + + return new Response(stream, { + headers: { + 'Content-Type': 'text/event-stream' } }); - - return new Response(stream); } }; }; diff --git a/src/lib/river/types.ts b/src/lib/river/types.ts index 89dfd71..329b8cd 100644 --- a/src/lib/river/types.ts +++ b/src/lib/river/types.ts @@ -2,6 +2,7 @@ import type { StreamTextResult, TextStreamPart, Tool, ToolSet } from 'ai'; import z from 'zod'; import type { RequestEvent } from '@sveltejs/kit'; import type { RiverError } from './errors.js'; +import type { RiverErrorJSON } from './errors.js'; // AGENTS SECTION type AiSdkRiverAgent = { @@ -84,6 +85,25 @@ type DecoratedAgentRouter = { [K in keyof T]: InferRiverAgent; }; +type ServerOnErrorHook = (ctx: { + /** The normalized River error instance. */ + error: RiverError; + /** The agent identifier (path) that failed. */ + agentId: string; + /** The input provided to the agent call. */ + input: unknown; + /** The request event for additional context. */ + event: RequestEvent; +}) => void | Promise; + +/** Optional formatter that controls the JSON shape sent to clients. */ +type ServerErrorFormatter = (err: RiverError) => RiverErrorJSON; + +type AgentRouterOptions = { + onError?: ServerOnErrorHook; + errorFormatter?: ServerErrorFormatter; +}; + type CreateAgentRouter = (agents: T) => DecoratedAgentRouter; // SERVER RUNNER SECTION @@ -100,7 +120,8 @@ type ServerSideAgentRunner = ( }; type ServerEndpointHandler = ( - router: DecoratedAgentRouter + router: DecoratedAgentRouter, + options?: AgentRouterOptions ) => { POST: (event: RequestEvent) => Promise }; // CLIENT CALLER SECTION diff --git a/src/routes/examples/river/agents.ts b/src/routes/examples/river/agents.ts index ada7380..6d7bc15 100644 --- a/src/routes/examples/river/agents.ts +++ b/src/routes/examples/river/agents.ts @@ -1,4 +1,4 @@ -import { RIVER_SERVER } from '$lib/index.js'; +import { RIVER_SERVER, RiverError } from '$lib/index.js'; import z from 'zod'; import { demoAiStream } from './garbage.js'; import { chatDemoAiStream } from './chat.js'; @@ -25,6 +25,10 @@ export const exampleAiSdkAgent = RIVER_SERVER.createAiSdkAgent({ prompt: z.string() }), beforeAgentRun: (input) => { + /* + * Potential issue (maybe its intended): + * you always have to return the prompt even if you your perforing a side effect? + */ return { prompt: input.prompt + ' also are apples purple?' };