diff --git a/bun.lock b/bun.lock index e0e79b3..8939bff 100644 --- a/bun.lock +++ b/bun.lock @@ -1,6 +1,5 @@ { "lockfileVersion": 1, - "configVersion": 0, "workspaces": { "": { "name": "seda-data-proxy", @@ -16,12 +15,13 @@ }, "workspace/data-proxy": { "name": "@seda-protocol/data-proxy", - "version": "1.3.5", + "version": "1.3.6", "dependencies": { "@commander-js/extra-typings": "^12.1.0", "@cosmjs/crypto": "^0.33.1", "@cosmjs/encoding": "^0.33.1", "@dotenvx/dotenvx": "^1.51.4", + "@pythnetwork/pyth-lazer-sdk": "^6.0.0", "@seda-protocol/data-proxy-sdk": "workspace:*", "@seda-protocol/utils": "^2.0.0", "big.js": "7.0.1", @@ -121,6 +121,8 @@ "@inquirer/type": ["@inquirer/type@1.5.5", "", { "dependencies": { "mute-stream": "^1.0.0" } }, "sha512-MzICLu4yS7V8AA61sANROZ9vT1H3ooca5dSmI1FjZkzq7o/koMsRfQSzRtFo+F3Ao4Sf1C0bpLKejpKB/+j6MA=="], + "@isaacs/ttlcache": ["@isaacs/ttlcache@1.4.1", "", {}, "sha512-RQgQ4uQ+pLbqXfOmieB91ejmLwvSgv9nLx6sT6sD83s7umBypgg+OIBOBbEUiJXrfpnp9j0mRhYYdzp9uqq3lA=="], + "@jsep-plugin/assignment": ["@jsep-plugin/assignment@1.3.0", "", { "peerDependencies": { "jsep": "^0.4.0||^1.0.0" } }, "sha512-VVgV+CXrhbMI3aSusQyclHkenWSAm95WaiKrMxRFam3JSUiIaQjoMIw2sEs/OX4XifnqeQUN4DYbJjlA8EfktQ=="], "@jsep-plugin/regex": ["@jsep-plugin/regex@1.0.4", "", { "peerDependencies": { "jsep": "^0.4.0||^1.0.0" } }, "sha512-q7qL4Mgjs1vByCaTnDFcBnV9HS7GVPJX5vyVoCgZHNSC9rjwIlmbXG5sUuorR5ndfHAIlJ8pVStxvjXHbNvtUg=="], @@ -161,6 +163,8 @@ "@protobufjs/utf8": ["@protobufjs/utf8@1.1.0", "", {}, "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw=="], + "@pythnetwork/pyth-lazer-sdk": ["@pythnetwork/pyth-lazer-sdk@6.0.0", "", { "dependencies": { "@isaacs/ttlcache": "^1.4.1", "buffer": "^6.0.3", "isomorphic-ws": "^5.0.0", "ts-log": "^2.2.7", "ws": "^8.18.0" } }, "sha512-cty4/M4zQOUtFnm1ZsCfMs9x3sLk4ZmEwW0CPqMtDZFai94QGsXHkKvi5LkBPRL4aQg96EuVzOa37H0lHqANeQ=="], + "@seda-protocol/data-proxy": 
["@seda-protocol/data-proxy@workspace:workspace/data-proxy"], "@seda-protocol/data-proxy-sdk": ["@seda-protocol/data-proxy-sdk@workspace:workspace/data-proxy-sdk"], @@ -219,6 +223,8 @@ "brorand": ["brorand@1.1.0", "", {}, "sha512-cKV8tMCEpQs4hK/ik71d6LrPOnpkpGBR0wzxqr68g2m/LB2GxVYQroAjMJZRVM1Y4BCjCKc3vAamxSzOY2RP+w=="], + "buffer": ["buffer@6.0.3", "", { "dependencies": { "base64-js": "^1.3.1", "ieee754": "^1.2.1" } }, "sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA=="], + "bun-types": ["bun-types@1.2.23", "", { "dependencies": { "@types/node": "*" }, "peerDependencies": { "@types/react": "^19" } }, "sha512-R9f0hKAZXgFU3mlrA0YpE/fiDvwV0FT9rORApt2aQVWSuJDzZOyB5QLc0N/4HF57CS8IXJ6+L5E4W1bW6NS2Aw=="], "call-bind-apply-helpers": ["call-bind-apply-helpers@1.0.2", "", { "dependencies": { "es-errors": "^1.3.0", "function-bind": "^1.1.2" } }, "sha512-Sp1ablJ0ivDkSzjcaJdxEunN5/XvksFJ2sMBFfq6x0ryhQV/2b/KwFe21cMpmHtPOSij8K99/wSfoEuTObmuMQ=="], @@ -361,7 +367,7 @@ "isexe": ["isexe@3.1.1", "", {}, "sha512-LpB/54B+/2J5hqQ7imZHfdU31OlgQqx7ZicVlkm9kzg9/w8GKLEcFfJl/t7DCEDueOyBAD6zCCwTO6Fzs0NoEQ=="], - "isomorphic-ws": ["isomorphic-ws@4.0.1", "", { "peerDependencies": { "ws": "*" } }, "sha512-BhBvN2MBpWTaSHdWRb/bwdZJ1WaehQ2L1KngkCkfLUGF0mAWAT1sQUQacEmQ0jXkFw/czDXPNQSL5u2/Krsz1w=="], + "isomorphic-ws": ["isomorphic-ws@5.0.0", "", { "peerDependencies": { "ws": "*" } }, "sha512-muId7Zzn9ywDsyXgTIafTry2sV3nySZeUDe6YedVd1Hvuuep5AsIlqK+XefWpYTyJG5e503F2xIuT2lcU6rCSw=="], "jsep": ["jsep@1.4.0", "", {}, "sha512-B7qPcEVE3NVkmSJbaYxvv4cHkVW7DQsZz13pUMrfS8z8Q/BuShN+gcTXrUlPiGqM2/t/EEaI030bpxMqY8gMlw=="], @@ -483,6 +489,8 @@ "true-myth": ["true-myth@9.0.1", "", {}, "sha512-8ePgamjgV3CKDI43o6eUtid+9iadfEKUiu2WTucbeCnPTyDOQpZ4svCuozLwm9S4hzUolOclsUCJf3yjQg3g/Q=="], + "ts-log": ["ts-log@2.2.7", "", {}, "sha512-320x5Ggei84AxzlXp91QkIGSw5wgaLT6GeAH0KsqDmRZdVWW2OiSeVvElVoatk3f7nicwXlElXsoFkARiGE2yg=="], + "type-fest": ["type-fest@4.41.0", "", {}, 
"sha512-TeTSQ6H5YHvpqVwBRcnLDCBnDOHWYu7IvGbHT6N8AOymcr9PJGjc1GTtiWZTYg0NCgYwvnYWEkVChQAr9bjfwA=="], "typescript": ["typescript@5.8.3", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-p1diW6TqL9L07nNxvRMM7hMMw4c5XOo/1ibL4aAIGmSAt9slTE1Xgw5KWuof2uTOvCg9BY7ZRi+GaF+7sfgPeQ=="], @@ -505,7 +513,7 @@ "wrap-ansi": ["wrap-ansi@6.2.0", "", { "dependencies": { "ansi-styles": "^4.0.0", "string-width": "^4.1.0", "strip-ansi": "^6.0.0" } }, "sha512-r6lPcBGxZXlIcymEu7InxDMhdW0KDxpLgoFLcguasxCaJ/SOIZwINatK9KY/tf+ZrlywOKU0UDj3ATXUBfxJXA=="], - "ws": ["ws@7.5.10", "", { "peerDependencies": { "bufferutil": "^4.0.1", "utf-8-validate": "^5.0.2" }, "optionalPeers": ["bufferutil", "utf-8-validate"] }, "sha512-+dbF1tHwZpXcbOJdVOkzLDxZP1ailvSxM6ZweXTegylPny803bFhA+vqBYw4s31NSAk4S2Qz+AKXK9a4wkdjcQ=="], + "ws": ["ws@8.19.0", "", { "peerDependencies": { "bufferutil": "^4.0.1", "utf-8-validate": ">=5.0.2" }, "optionalPeers": ["bufferutil", "utf-8-validate"] }, "sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg=="], "xstream": ["xstream@11.14.0", "", { "dependencies": { "globalthis": "^1.0.1", "symbol-observable": "^2.0.3" } }, "sha512-1bLb+kKKtKPbgTK6i/BaoAn03g47PpFstlbe1BA+y3pNS/LfvcaghS5BFf9+EE1J+KwSQsEpfJvFN5GqFtiNmw=="], @@ -519,6 +527,10 @@ "@bundled-es-modules/cookie/cookie": ["cookie@0.7.2", "", {}, "sha512-yki5XnKuf750l50uGTllt6kKILY4nQ1eNIQatoXEByZ5dWgnKqbnqmTrBE5B4N7lrMJKQ2ytWMiTO2o0v6Ew/w=="], + "@cosmjs/socket/isomorphic-ws": ["isomorphic-ws@4.0.1", "", { "peerDependencies": { "ws": "*" } }, "sha512-BhBvN2MBpWTaSHdWRb/bwdZJ1WaehQ2L1KngkCkfLUGF0mAWAT1sQUQacEmQ0jXkFw/czDXPNQSL5u2/Krsz1w=="], + + "@cosmjs/socket/ws": ["ws@7.5.10", "", { "peerDependencies": { "bufferutil": "^4.0.1", "utf-8-validate": "^5.0.2" }, "optionalPeers": ["bufferutil", "utf-8-validate"] }, "sha512-+dbF1tHwZpXcbOJdVOkzLDxZP1ailvSxM6ZweXTegylPny803bFhA+vqBYw4s31NSAk4S2Qz+AKXK9a4wkdjcQ=="], + "@dotenvx/dotenvx/commander": ["commander@11.1.0", "", 
{}, "sha512-yPVavfyCcRhmorC7rWlkHn15b4wDVgVmBA7kV4QVBsF7kv/9TKJAbAXVTxvTnwP8HHKjRCJDClKbciiYS7p0DQ=="], "@inquirer/core/@inquirer/type": ["@inquirer/type@2.0.0", "", { "dependencies": { "mute-stream": "^1.0.0" } }, "sha512-XvJRx+2KR3YXyYtPUUy+qd9i7p+GO9Ko6VIIpWlBrpWwXDv8WLFeHTxz35CfQFUiBMLXlGHhGzys7lqit9gWag=="], diff --git a/workspace/data-proxy/package.json b/workspace/data-proxy/package.json index a902d65..235c46e 100644 --- a/workspace/data-proxy/package.json +++ b/workspace/data-proxy/package.json @@ -11,6 +11,7 @@ "@cosmjs/crypto": "^0.33.1", "@cosmjs/encoding": "^0.33.1", "@dotenvx/dotenvx": "^1.51.4", + "@pythnetwork/pyth-lazer-sdk": "^6.0.0", "@seda-protocol/data-proxy-sdk": "workspace:*", "@seda-protocol/utils": "^2.0.0", "big.js": "7.0.1", diff --git a/workspace/data-proxy/src/config-parser.ts b/workspace/data-proxy/src/config-parser.ts index 5bc7b59..0eccaea 100644 --- a/workspace/data-proxy/src/config-parser.ts +++ b/workspace/data-proxy/src/config-parser.ts @@ -27,6 +27,7 @@ const RouteSchema = v.strictObject( baseURL: maybe(v.string()), path: v.string(), upstreamUrl: v.string(), + fetcher: v.optional(v.string()), method: v.optional(HttpMethodSchema, DEFAULT_HTTP_METHODS), jsonPath: v.optional(v.pipe(v.string(), v.startsWith("$"))), allowedQueryParams: v.optional(v.array(v.string())), @@ -262,29 +263,32 @@ export function parseConfig( } // Ensure variables in the path are used in the upstream URL, the headers or the jsonPath. - for (const match of route.path.matchAll(pathVarRegex)) { - let isUsed = false; + // Skip this check for custom fetchers since they control their own parameter usage. 
+ if (!route.fetcher) { + for (const match of route.path.matchAll(pathVarRegex)) { + let isUsed = false; - if (route.upstreamUrl.includes(match[1])) { - isUsed = true; - } - - for (const [_, headerValue] of Object.entries(route.headers)) { - if (headerValue.includes(match[1])) { + if (route.upstreamUrl.includes(match[1])) { isUsed = true; - break; } - } - if (route.jsonPath?.includes(match[1])) { - isUsed = true; - } + for (const [_, headerValue] of Object.entries(route.headers)) { + if (headerValue.includes(match[1])) { + isUsed = true; + break; + } + } - if (!isUsed) { - hasWarnings = true; - logger.warn( - `$.${index}.path has ${match[1]}, but it is not used in $.${index}.upstreamUrl or $.${index}.headers. \nPlease either remove the variable from the path or use it in the upstreamUrl or headers (through "{${match[1]}}").`, - ); + if (route.jsonPath?.includes(match[1])) { + isUsed = true; + } + + if (!isUsed) { + hasWarnings = true; + logger.warn( + `$.${index}.path has ${match[1]}, but it is not used in $.${index}.upstreamUrl or $.${index}.headers. 
\nPlease either remove the variable from the path or use it in the upstreamUrl or headers (through "{${match[1]}}").`, + ); + } } } diff --git a/workspace/data-proxy/src/fetchers/default-fetcher.ts b/workspace/data-proxy/src/fetchers/default-fetcher.ts new file mode 100644 index 0000000..0304012 --- /dev/null +++ b/workspace/data-proxy/src/fetchers/default-fetcher.ts @@ -0,0 +1,20 @@ +import { headersToRecord } from "../utils/headers"; +import type { Fetcher, FetcherRequest, FetcherResponse } from "./types"; + +export const defaultFetcher: Fetcher = { + async fetch(request: FetcherRequest): Promise { + const response = await fetch(request.url, { + method: request.method, + headers: request.headers, + body: request.body, + }); + + const body = await response.text(); + + return { + status: response.status, + body, + headers: headersToRecord(response.headers), + }; + }, +}; diff --git a/workspace/data-proxy/src/fetchers/index.ts b/workspace/data-proxy/src/fetchers/index.ts new file mode 100644 index 0000000..da3facb --- /dev/null +++ b/workspace/data-proxy/src/fetchers/index.ts @@ -0,0 +1,23 @@ +import { defaultFetcher } from "./default-fetcher"; +import { pythProFetcher } from "./pyth-pro"; +import type { Fetcher } from "./types"; + +export type { Fetcher, FetcherRequest, FetcherResponse } from "./types"; + +const registry = new Map(); + +export function registerFetcher(name: string, fetcher: Fetcher): void { + registry.set(name, fetcher); +} + +export function getFetcher(name: string): Fetcher | undefined { + return registry.get(name); +} + +export function getRegisteredFetcherNames(): string[] { + return Array.from(registry.keys()); +} + +// Built-in fetchers — add new ones here +registerFetcher("default", defaultFetcher); +registerFetcher("pyth-pro", pythProFetcher); diff --git a/workspace/data-proxy/src/fetchers/pyth-pro.ts b/workspace/data-proxy/src/fetchers/pyth-pro.ts new file mode 100644 index 0000000..aa80b9e --- /dev/null +++ 
b/workspace/data-proxy/src/fetchers/pyth-pro.ts @@ -0,0 +1,195 @@ +import type { + Channel, + Format, + JsonBinaryEncoding, + PriceFeedProperty, +} from "@pythnetwork/pyth-lazer-sdk"; +import { PythLazerClient } from "@pythnetwork/pyth-lazer-sdk"; +import type { Fetcher, FetcherRequest, FetcherResponse } from "./types"; + +const clientCache = new Map(); + +async function getOrCreateClient( + priceServiceUrl: string, + token: string, +): Promise { + const key = `${priceServiceUrl}|${token}`; + if (!clientCache.has(key)) { + clientCache.set( + key, + await PythLazerClient.create({ token, priceServiceUrl }), + ); + } + return clientCache.get(key) as PythLazerClient; +} + +type PythProEndpoint = "latest_price" | "price"; + +interface PythProRequestBody { + // Feed identifiers — at least one of these must be provided + priceFeedIds?: number[]; + priceFeedSymbols?: string[]; + // Required when the URL endpoint is "price". + // Must be whole-second precision in microseconds: Math.floor(Date.now() / 1000) * 1_000_000 + timestamp?: number; + // SDK subscription params — passed through directly to the SDK + channel: Channel; + formats: Format[]; + properties: PriceFeedProperty[]; + jsonBinaryEncoding?: JsonBinaryEncoding; + parsed?: boolean; +} + +function resolveEndpoint(rawUrl: string): PythProEndpoint | null { + let pathname: string; + try { + pathname = new URL(rawUrl).pathname; + } catch { + return null; + } + const last = pathname.split("/").filter(Boolean).pop(); + if (last === "latest_price" || last === "price") return last; + return null; +} + +/** Strips the trailing /v1/ from a URL to get the Pyth Lazer service base URL. */ +function resolveBaseUrl(rawUrl: string): string { + try { + const parsed = new URL(rawUrl); + const v1Index = parsed.pathname.lastIndexOf("/v1/"); + const basePath = + v1Index !== -1 ? 
parsed.pathname.substring(0, v1Index) : ""; + return `${parsed.origin}${basePath}`; + } catch { + return rawUrl; + } +} + +export const pythProFetcher: Fetcher = { + async fetch(request: FetcherRequest): Promise { + const authHeader = + request.headers.authorization ?? request.headers.Authorization; + + if (!authHeader) { + return { + status: 401, + body: JSON.stringify({ + error: + 'Missing Authorization header. Set it in the route config: "headers": { "Authorization": "Bearer {$PYTH_LAZER_TOKEN}" }', + }), + headers: { "content-type": "application/json" }, + }; + } + + const token = authHeader.startsWith("Bearer ") + ? authHeader.slice("Bearer ".length) + : authHeader; + + let parsedBody: PythProRequestBody; + try { + parsedBody = JSON.parse(request.body ?? "{}") as PythProRequestBody; + } catch { + return { + status: 400, + body: JSON.stringify({ error: "Request body must be valid JSON" }), + headers: { "content-type": "application/json" }, + }; + } + + const { + priceFeedIds, + priceFeedSymbols, + timestamp, + channel, + formats, + properties, + jsonBinaryEncoding, + parsed, + } = parsedBody; + + if ( + (!priceFeedIds || priceFeedIds.length === 0) && + (!priceFeedSymbols || priceFeedSymbols.length === 0) + ) { + return { + status: 400, + body: JSON.stringify({ + error: + "Request body must include at least one of: priceFeedIds, priceFeedSymbols", + }), + headers: { "content-type": "application/json" }, + }; + } + + if (!channel) { + return { + status: 400, + body: JSON.stringify({ + error: + 'Request body must include "channel" (real_time | fixed_rate@50ms | fixed_rate@200ms | fixed_rate@1000ms)', + }), + headers: { "content-type": "application/json" }, + }; + } + + if (!properties || properties.length === 0) { + return { + status: 400, + body: JSON.stringify({ + error: 'Request body must include "properties" array', + }), + headers: { "content-type": "application/json" }, + }; + } + + const endpoint = resolveEndpoint(request.url); + + if (!endpoint) { + return 
{ + status: 400, + body: JSON.stringify({ + error: + 'Upstream URL must end with either "/v1/latest_price" or "/v1/price"', + }), + headers: { "content-type": "application/json" }, + }; + } + + if (endpoint === "price" && timestamp === undefined) { + return { + status: 400, + body: JSON.stringify({ + error: + 'The "/v1/price" endpoint requires "timestamp" (whole-second microseconds, e.g. Math.floor(Date.now() / 1000) * 1_000_000) in the request body', + }), + headers: { "content-type": "application/json" }, + }; + } + + const baseUrl = resolveBaseUrl(request.url); + const client = await getOrCreateClient(baseUrl, token); + + const sdkParams = { + ...(priceFeedIds && priceFeedIds.length > 0 ? { priceFeedIds } : {}), + ...(priceFeedSymbols && priceFeedSymbols.length > 0 + ? { symbols: priceFeedSymbols } + : {}), + channel, + formats: formats ?? [], + properties, + ...(jsonBinaryEncoding !== undefined ? { jsonBinaryEncoding } : {}), + ...(parsed !== undefined ? { parsed } : {}), + }; + // timestamp is guaranteed defined here — we return 400 above if endpoint === "price" && timestamp === undefined + const result = + endpoint === "price" && timestamp !== undefined + ? await client.getPrice({ ...sdkParams, timestamp }) + : await client.getLatestPrice(sdkParams); + + return { + status: 200, + body: JSON.stringify(result), + headers: { "content-type": "application/json" }, + }; + }, +}; diff --git a/workspace/data-proxy/src/fetchers/types.ts b/workspace/data-proxy/src/fetchers/types.ts new file mode 100644 index 0000000..fdf291f --- /dev/null +++ b/workspace/data-proxy/src/fetchers/types.ts @@ -0,0 +1,23 @@ +export interface FetcherRequest { + /** Resolved upstream URL after path/env variable substitution. Empty string when no upstreamUrl is configured. */ + url: string; + method: string; + /** Merged request headers with host removed and configured route headers applied. 
*/ + headers: Record; + body?: string; + /** Raw Elysia path params captured from the route pattern (e.g. { feedId: "BTC" }). */ + pathParams: Record; + /** Allowlist-filtered query params from the incoming request. */ + queryParams: URLSearchParams; +} + +export interface FetcherResponse { + status: number; + body: string; + /** Response headers — used by forwardResponseHeaders to selectively pass headers to the client. */ + headers: Record; +} + +export interface Fetcher { + fetch(request: FetcherRequest): Promise; +} diff --git a/workspace/data-proxy/src/proxy-server.ts b/workspace/data-proxy/src/proxy-server.ts index 2f5da74..f7de944 100644 --- a/workspace/data-proxy/src/proxy-server.ts +++ b/workspace/data-proxy/src/proxy-server.ts @@ -3,16 +3,19 @@ import { constants, type DataProxy } from "@seda-protocol/data-proxy-sdk"; import { tryAsync } from "@seda-protocol/utils"; import * as Match from "effect/Match"; import * as Option from "effect/Option"; -import { Elysia } from "elysia"; +import { Elysia, type HTTPMethod } from "elysia"; import { Maybe } from "true-myth"; -import { type Config, getHttpMethods } from "./config-parser"; +import { type Config, type Route, getHttpMethods } from "./config-parser"; import { DEFAULT_PROXY_ROUTE_GROUP, JSON_PATH_HEADER_KEY } from "./constants"; +import { getFetcher } from "./fetchers"; +import { defaultFetcher } from "./fetchers/default-fetcher"; import logger from "./logger"; import { StatusContext, statusPlugin } from "./status-plugin"; import { createDefaultResponseHeaders, createSignedResponseHeaders, } from "./utils/create-headers"; +import { headersToRecord } from "./utils/headers"; import { queryJson } from "./utils/query-json"; import { replaceParams } from "./utils/replace-params"; import { createUrlSearchParams } from "./utils/search-params"; @@ -26,11 +29,345 @@ function createErrorResponse(error: string, status: number) { }); } +type ApplyJsonPathResult = + | { ok: true; data: string; parsedData: unknown } + | 
{ ok: false; response: Response }; + +function applyJsonPath( + responseData: string | object, + expression: string, + label: string, + errorStatus: number, + requestLogger: typeof logger, +): ApplyJsonPathResult { + requestLogger.debug(`Applying ${label} JSONpath ${expression}`); + const result = queryJson(responseData, expression); + + if (result.isErr) { + requestLogger.error(`Failed to apply ${label} JSONpath: ${expression}`, { + error: result.error, + }); + return { + ok: false, + response: createErrorResponse(result.error, errorStatus), + }; + } + + requestLogger.debug(`Successfully applied ${label} JSONpath`); + return { + ok: true, + data: JSON.stringify(result.value), + parsedData: result.value, + }; +} + export interface ProxyServerOptions { port: number; disableProof: boolean; } +interface RouteHandlerContext { + headers: Record; + params: Record; + body: unknown; + path: string; + requestId: string; + request: Request; +} + +async function handleProxyRequest( + ctx: RouteHandlerContext, + route: Route, + routeMethod: HTTPMethod, + proxyGroup: string, + config: Config, + dataProxy: DataProxy, + serverOptions: ProxyServerOptions, +): Promise { + const { headers, params, body, path, requestId, request } = ctx; + const requestLogger = logger.child({ requestId, path }); + + // requestBody is now always a string because of the parse function in this route + const requestBody = Maybe.of(body as string | undefined); + + // Verification with the SEDA chain that the overlay node is eligible + if (!serverOptions.disableProof) { + requestLogger.debug("Verifying proof"); + + const proofHeader = Option.fromNullable( + headers[constants.PROOF_HEADER_KEY], + ); + const sedaFastProofHeader = Option.fromNullable( + headers[constants.SEDA_FAST_PROOF_HEADER_KEY], + ); + + const heightFromHeader = Number(headers[constants.HEIGHT_HEADER_KEY]); + const eligibleHeight = Maybe.of( + Number.isNaN(heightFromHeader) ? 
undefined : BigInt(heightFromHeader), + ); + + requestLogger.debug( + `Received proof for height ${eligibleHeight.mapOr("unknown", (h) => + h.toString(), + )}`, + ); + + if (Option.isNone(proofHeader) && Option.isNone(sedaFastProofHeader)) { + const message = `Header "${constants.PROOF_HEADER_KEY}" or "${constants.SEDA_FAST_PROOF_HEADER_KEY}" is not provided`; + requestLogger.error(message); + return createErrorResponse(message, 400); + } + + // Disallow SEDA Fast usage if it's not enabled + if (!config.sedaFast?.enable && Option.isSome(sedaFastProofHeader)) { + const message = `Header "${constants.SEDA_FAST_PROOF_HEADER_KEY}" is not allowed`; + requestLogger.error(message); + return createErrorResponse(message, 400); + } + + const proofInfo = Match.value(proofHeader).pipe( + Match.when(Option.isSome, (header) => { + return { + decodedProof: dataProxy.decodeProof(header.value), + rawProof: header.value, + type: "seda-core" as const, + }; + }), + Match.when(Option.isNone, () => { + // Should not throw since we checked for both headers above + return { + decodedProof: dataProxy.decodeSedaFastProof( + Option.getOrThrow(sedaFastProofHeader), + ), + rawProof: Option.getOrThrow(sedaFastProofHeader), + type: "seda-fast" as const, + }; + }), + Match.exhaustive, + ); + + if (proofInfo.decodedProof.isErr) { + const message = `Failed to decode proof: ${proofInfo.decodedProof.error}, make sure the proof is a base64 encoded string`; + requestLogger.error(message); + return createErrorResponse(message, 400); + } + + const proofId = + proofInfo.type === "seda-core" + ? proofInfo.decodedProof.value.drId + : proofInfo.decodedProof.value.publicKey.toString("hex"); + + const idType = + proofInfo.type === "seda-core" + ? 
"Data Request Id" + : "SEDA Fast Public Key"; + requestLogger.debug(`${idType}: ${proofId}`); + + const verificationResult = await Match.value(proofInfo).pipe( + Match.when({ type: "seda-fast" }, async (proof) => { + // Should not happen since we already checked for this above, but we need to satisfy the type checker + if (proof.decodedProof.isErr) { + throw new Error("Failed to decode proof"); + } + + return { + verification: await dataProxy.verifyFastProof( + proof.decodedProof.value, + ), + type: "seda-fast" as const, + }; + }), + Match.when({ type: "seda-core" }, async (proof) => { + return { + verification: await verifyWithRetry( + requestLogger, + dataProxy, + proof.rawProof, + eligibleHeight, + config.verificationMaxRetries, + () => config.verificationRetryDelay, + ), + type: "seda-core" as const, + }; + }), + Match.exhaustive, + ); + + if (verificationResult.verification.isErr) { + const message = `Failed to verify eligibility proof ${verificationResult.verification.error}`; + requestLogger.error(message); + return createErrorResponse(message, 401); + } + + if (!verificationResult.verification.value.isValid) { + const heightOrTimestamp = + verificationResult.type === "seda-core" + ? 
verificationResult.verification.value.currentHeight + : verificationResult.verification.value.currentUnixTimestamp; + const message = `Ineligible executor at height/timestamp ${heightOrTimestamp}: ${verificationResult.verification.value.status}`; + requestLogger.error(message); + return createErrorResponse(message, 401); + } + + // Verification passed, we can proceed + } else { + requestLogger.debug("Skipping proof verification."); + } + + // Parse the request URL to get the search params, + // this is to support query params that can be repeated, such as ?one=one&one=two + const requestUrl = new URL(request.url); + + // Add the request search params (?one=two) to the upstream url + const requestSearchParams = createUrlSearchParams( + requestUrl.searchParams, + route.allowedQueryParams, + ); + + const upstreamHeaders = new Headers(); + + // Redirect all headers given by the requester + for (const [key, value] of Object.entries(headers)) { + if (!value || key === constants.PROOF_HEADER_KEY) { + continue; + } + upstreamHeaders.append(key, value); + } + + // Inject all configured headers by the data proxy node configuration + // Important: configured headers take precedence over headers sent in the request + for (const [key, value] of Object.entries(route.headers)) { + upstreamHeaders.set(key, replaceParams(value, params)); + } + + // Host doesn't match since we are proxying. Returning the upstream host while the URL does not match results + // in the client to not return the response. 
+ upstreamHeaders.delete("host"); + + const resolvedUrlResult = injectSearchParamsInUrl( + replaceParams(route.upstreamUrl, params), + requestSearchParams, + ); + if (resolvedUrlResult.isErr) { + return createErrorResponse(resolvedUrlResult.error, 500); + } + const resolvedUrl = resolvedUrlResult.value.toString(); + + let responseData: string; + let upstreamResponseHeaders: Headers; + + // Use the configured custom fetcher, or fall back to the default HTTP fetcher + const fetcherName = route.fetcher ?? "default"; + const fetcher = + (route.fetcher ? getFetcher(route.fetcher) : undefined) ?? defaultFetcher; + + requestLogger.debug( + route.fetcher + ? `${routeMethod} ${proxyGroup}${route.path} -> fetcher:${fetcherName}` + : `${routeMethod} ${proxyGroup}${route.path} -> ${resolvedUrl}`, + { headers: upstreamHeaders, body, resolvedUrl }, + ); + + const fetcherResult = await tryAsync(async () => + fetcher.fetch({ + url: resolvedUrl, + method: routeMethod, + headers: headersToRecord(upstreamHeaders), + body: body as string | undefined, + pathParams: params, + queryParams: requestSearchParams, + }), + ); + + if (fetcherResult.isErr) { + const message = `Fetcher "${fetcherName}" for ${route.path} failed: ${fetcherResult.error}`; + requestLogger.error(message, { error: fetcherResult.error }); + return createErrorResponse(message, 500); + } + + if (fetcherResult.value.status >= 400) { + const message = `Fetcher "${fetcherName}" for ${route.path} returned status ${fetcherResult.value.status}: ${fetcherResult.value.body}`; + requestLogger.error(message); + return createErrorResponse(message, fetcherResult.value.status); + } + + requestLogger.debug("Received fetcher response", { + status: fetcherResult.value.status, + }); + + responseData = fetcherResult.value.body; + upstreamResponseHeaders = new Headers(fetcherResult.value.headers); + + // jsonPathInput tracks the most recently parsed form of the response so that + // sequential JSONPath applications don't re-parse the same 
JSON string. + let jsonPathInput: string | object = responseData; + + if (route.jsonPath) { + const jsonPathResult = applyJsonPath( + jsonPathInput, + replaceParams(route.jsonPath, params), + "route", + 500, + requestLogger, + ); + if (!jsonPathResult.ok) return jsonPathResult.response; + responseData = jsonPathResult.data; + jsonPathInput = jsonPathResult.parsedData as object; + } + + const jsonPathRequestHeader = Maybe.of(headers[JSON_PATH_HEADER_KEY]); + + // We apply the JSON path to the data that's exposed by the data proxy. + // This allows operators to specify what data is accessible while the data request program can specify what it wants from the accessible data. + if (jsonPathRequestHeader.isJust) { + const jsonPathResult = applyJsonPath( + jsonPathInput, + jsonPathRequestHeader.value, + "request", + 400, + requestLogger, + ); + if (!jsonPathResult.ok) return jsonPathResult.response; + responseData = jsonPathResult.data; + } + + // If the route or proxy has a public endpoint we replace the protocol and host with the public endpoint. + const calledEndpoint = route.baseURL + .or(config.baseURL) + .mapOr(request.url, (t) => { + const pathIndex = request.url.indexOf(path); + return `${t}${request.url.slice(pathIndex)}`; + }); + + requestLogger.debug("Signing data", { + calledEndpoint, + method: request.method, + body: requestBody.unwrapOr(undefined), + responseData, + }); + + const signature = await dataProxy.signData( + calledEndpoint, + request.method, + Buffer.from(requestBody.isJust ? 
requestBody.value : "", "utf-8"), + Buffer.from(responseData, "utf-8"), + ); + + const responseHeaders = new Headers(); + + // Forward all headers that are configured in the config.json + for (const forwardHeaderKey of route.forwardResponseHeaders) { + const forwardHeaderValue = upstreamResponseHeaders.get(forwardHeaderKey); + if (forwardHeaderValue) { + responseHeaders.append(forwardHeaderKey, forwardHeaderValue); + } + } + + return new Response(responseData, { + headers: createSignedResponseHeaders(signature, responseHeaders), + }); +} + export function startProxyServer( config: Config, dataProxy: DataProxy, @@ -100,332 +437,16 @@ export function startProxyServer( app.route( routeMethod, route.path, - async ({ headers, params, body, path, requestId, request }) => { - const requestLogger = logger.child({ requestId, path }); - - // requestBody is now always a string because of the parse function in this route - const requestBody = Maybe.of(body as string | undefined); - - // Verification with the SEDA chain that the overlay node is eligible - if (!serverOptions.disableProof) { - requestLogger.debug("Verifying proof"); - - const proofHeader = Option.fromNullable( - headers[constants.PROOF_HEADER_KEY], - ); - const sedaFastProofHeader = Option.fromNullable( - headers[constants.SEDA_FAST_PROOF_HEADER_KEY], - ); - - const heightFromHeader = Number( - headers[constants.HEIGHT_HEADER_KEY], - ); - const eligibleHeight = Maybe.of( - Number.isNaN(heightFromHeader) - ? 
undefined - : BigInt(heightFromHeader), - ); - - requestLogger.debug( - `Received proof for height ${eligibleHeight.mapOr( - "unknown", - (h) => h.toString(), - )}`, - ); - - if ( - Option.isNone(proofHeader) && - Option.isNone(sedaFastProofHeader) - ) { - const message = `Header "${constants.PROOF_HEADER_KEY}" or "${constants.SEDA_FAST_PROOF_HEADER_KEY}" is not provided`; - requestLogger.error(message); - return createErrorResponse(message, 400); - } - - // Disallow SEDA Fast usage if it's not enabled - if ( - !config.sedaFast?.enable && - Option.isSome(sedaFastProofHeader) - ) { - const message = `Header "${constants.SEDA_FAST_PROOF_HEADER_KEY}" is not allowed`; - requestLogger.error(message); - return createErrorResponse(message, 400); - } - - const proofInfo = Match.value(proofHeader).pipe( - Match.when(Option.isSome, (header) => { - return { - decodedProof: dataProxy.decodeProof(header.value), - rawProof: header.value, - type: "seda-core" as const, - }; - }), - Match.when(Option.isNone, () => { - // Should not throw since we checked for both headers above - return { - decodedProof: dataProxy.decodeSedaFastProof( - Option.getOrThrow(sedaFastProofHeader), - ), - rawProof: Option.getOrThrow(sedaFastProofHeader), - type: "seda-fast" as const, - }; - }), - Match.exhaustive, - ); - - if (proofInfo.decodedProof.isErr) { - const message = `Failed to decode proof: ${proofInfo.decodedProof.error}, make sure the proof is a base64 encoded string`; - requestLogger.error(message); - return createErrorResponse(message, 400); - } - - const proofId = - proofInfo.type === "seda-core" - ? proofInfo.decodedProof.value.drId - : proofInfo.decodedProof.value.publicKey.toString("hex"); - - const idType = - proofInfo.type === "seda-core" - ? 
"Data Request Id" - : "SEDA Fast Public Key"; - requestLogger.debug(`${idType}: ${proofId}`); - - const verificationResult = await Match.value(proofInfo).pipe( - Match.when({ type: "seda-fast" }, async (proof) => { - // Should not happen since we already checked for this above, but we need to satisfy the type checker - if (proof.decodedProof.isErr) { - throw new Error("Failed to decode proof"); - } - - return { - verification: await dataProxy.verifyFastProof( - proof.decodedProof.value, - ), - type: "seda-fast" as const, - }; - }), - Match.when({ type: "seda-core" }, async (proof) => { - return { - verification: await verifyWithRetry( - requestLogger, - dataProxy, - proof.rawProof, - eligibleHeight, - config.verificationMaxRetries, - () => config.verificationRetryDelay, - ), - type: "seda-core" as const, - }; - }), - Match.exhaustive, - ); - - if (verificationResult.verification.isErr) { - const message = `Failed to verify eligibility proof ${verificationResult.verification.error}`; - requestLogger.error(message); - return createErrorResponse(message, 401); - } - - if (!verificationResult.verification.value.isValid) { - const heightOrTimestamp = - verificationResult.type === "seda-core" - ? 
verificationResult.verification.value.currentHeight - : verificationResult.verification.value - .currentUnixTimestamp; - const message = `Ineligible executor at height/timestamp ${heightOrTimestamp}: ${verificationResult.verification.value.status}`; - requestLogger.error(message); - return createErrorResponse(message, 401); - } - - // Verification passed, we can proceed - } else { - requestLogger.debug("Skipping proof verification."); - } - - // Parse the request URL to get the search params, - // this is to support query params that can be repeated, such as ?one=one&one=two - const requestUrl = new URL(request.url); - - // Add the request search params (?one=two) to the upstream url - const requestSearchParams = createUrlSearchParams( - requestUrl.searchParams, - route.allowedQueryParams, - ); - let upstreamUrl = replaceParams(route.upstreamUrl, params); - const upstreamUrlResult = injectSearchParamsInUrl( - upstreamUrl, - requestSearchParams, - ); - - if (upstreamUrlResult.isErr) { - return createErrorResponse(upstreamUrlResult.error, 500); - } - upstreamUrl = upstreamUrlResult.value.toString(); - - const upstreamHeaders = new Headers(); - - // Redirect all headers given by the requester - for (const [key, value] of Object.entries(headers)) { - if (!value || key === constants.PROOF_HEADER_KEY) { - continue; - } - - upstreamHeaders.append(key, value); - } - - // Inject all configured headers by the data proxy node configuration - // Important: configured headers take precedence over headers sent in the request - for (const [key, value] of Object.entries(route.headers)) { - upstreamHeaders.set(key, replaceParams(value, params)); - } - - // Host doesn't match since we are proxying. Returning the upstream host while the URL does not match results - // in the client to not return the response. 
- upstreamHeaders.delete("host"); - - requestLogger.debug( - `${routeMethod} ${proxyGroup}${route.path} -> ${upstreamUrl}`, - { headers: upstreamHeaders, body, upstreamUrl }, - ); - - const upstreamResponse = await tryAsync(async () => - fetch(upstreamUrl, { - method: routeMethod, - headers: upstreamHeaders, - body: body as string, - }), - ); - - if (upstreamResponse.isErr) { - const message = `Proxying URL ${route.path} failed: ${upstreamResponse.error}`; - requestLogger.error(message, { error: upstreamResponse.error }); - return createErrorResponse(message, 500); - } - - if (!upstreamResponse.value.ok) { - const body = await tryAsync( - async () => await upstreamResponse.value.text(), - ); - - // Rare case where the upstream response is not a text parseable response - if (body.isErr) { - const message = `Upstream response for ${route.path} is not ok: ${upstreamResponse.value.status} err: ${body.error}`; - requestLogger.error(message, { error: body.error }); - return createErrorResponse( - message, - upstreamResponse.value.status, - ); - } - - const message = `Upstream response for ${route.path} is not ok: ${upstreamResponse.value.status} body: ${body.value}`; - requestLogger.error(message); - - return createErrorResponse( - message, - upstreamResponse.value.status, - ); - } - - requestLogger.debug("Received upstream response", { - headers: upstreamResponse.value.headers, - }); - - const upstreamTextResponse = await tryAsync( - async () => await upstreamResponse.value.text(), - ); - - if (upstreamTextResponse.isErr) { - const message = `Reading ${route.path} response body failed: ${upstreamTextResponse.error}`; - requestLogger.error(message, { - error: upstreamTextResponse.error, - }); - return createErrorResponse(message, 500); - } - - let responseData: string = upstreamTextResponse.value; - - if (route.jsonPath) { - requestLogger.debug(`Applying route JSONpath ${route.jsonPath}`); - const data = queryJson( - upstreamTextResponse.value, - 
replaceParams(route.jsonPath, params), - ); - - if (data.isErr) { - requestLogger.error( - `Failed to apply route JSONpath: ${route.jsonPath}`, - { error: data.error }, - ); - return createErrorResponse(data.error, 500); - } - - responseData = JSON.stringify(data.value); - requestLogger.debug("Successfully applied route JSONpath"); - } - - const jsonPathRequestHeader = Maybe.of( - headers[JSON_PATH_HEADER_KEY], - ); - - // TODO: Would be nice to only parse the JSON once - if (jsonPathRequestHeader.isJust) { - requestLogger.debug( - `Applying request JSONpath ${jsonPathRequestHeader.value}`, - ); - // We apply the JSON path to the data that's exposed by the data proxy. - // This allows operators to specify what data is accessible while the data request program can specify what it wants from the accessible data. - const data = queryJson(responseData, jsonPathRequestHeader.value); - - if (data.isErr) { - requestLogger.error( - `Failed to apply JSONpath: ${jsonPathRequestHeader.value}`, - { error: data.error }, - ); - return createErrorResponse(data.error, 400); - } - - responseData = JSON.stringify(data.value); - requestLogger.debug("Successfully applied request JSONpath"); - } - - // If the route or proxy has a public endpoint we replace the protocol and host with the public endpoint. - const calledEndpoint = route.baseURL - .or(config.baseURL) - .mapOr(request.url, (t) => { - const pathIndex = request.url.indexOf(path); - return `${t}${request.url.slice(pathIndex)}`; - }); - - requestLogger.debug("Signing data", { - calledEndpoint, - method: request.method, - body: requestBody.unwrapOr(undefined), - responseData, - }); - - const signature = await dataProxy.signData( - calledEndpoint, - request.method, - Buffer.from(requestBody.isJust ? 
requestBody.value : "", "utf-8"), - Buffer.from(responseData, "utf-8"), - ); - - const responseHeaders = new Headers(); - - // Forward all headers that are configured in the config.json - for (const forwardHeaderKey of route.forwardResponseHeaders) { - const forwardHeaderValue = - upstreamResponse.value.headers.get(forwardHeaderKey); - - if (forwardHeaderValue) { - responseHeaders.append(forwardHeaderKey, forwardHeaderValue); - } - } - - return new Response(responseData, { - headers: createSignedResponseHeaders(signature, responseHeaders), - }); - }, + (ctx) => + handleProxyRequest( + ctx, + route, + routeMethod, + proxyGroup, + config, + dataProxy, + serverOptions, + ), { config: {}, parse: ({ request }) => { @@ -441,6 +462,15 @@ export function startProxyServer( return app; }); + // Validate that every route referencing a custom fetcher has a registered implementation. + for (const route of config.routes) { + if (route.fetcher && !getFetcher(route.fetcher)) { + throw new Error( + `Route "${route.path}" references fetcher "${route.fetcher}", but no fetcher with that name is registered. Register it via registerFetcher() in workspace/data-proxy/src/fetchers/index.ts.`, + ); + } + } + server.listen(serverOptions.port); logger.info( `Proxy routes is at http://127.0.0.1:${serverOptions.port}/${proxyGroup === "" || proxyGroup.endsWith("/") ? 
`${proxyGroup}` : `${proxyGroup}/`}`,
diff --git a/workspace/data-proxy/src/utils/headers.ts b/workspace/data-proxy/src/utils/headers.ts
new file mode 100644
index 0000000..56e9d4c
--- /dev/null
+++ b/workspace/data-proxy/src/utils/headers.ts
@@ -0,0 +1,7 @@
+export function headersToRecord(headers: Headers): Record<string, string> {
+	const record: Record<string, string> = {};
+	headers.forEach((value, key) => {
+		record[key] = value;
+	});
+	return record;
+}
diff --git a/workspace/data-proxy/src/utils/verify-with-retry.ts b/workspace/data-proxy/src/utils/verify-with-retry.ts
index 6eeca09..fc7ffab 100644
--- a/workspace/data-proxy/src/utils/verify-with-retry.ts
+++ b/workspace/data-proxy/src/utils/verify-with-retry.ts
@@ -1,5 +1,4 @@
 import type { DataProxy } from "@seda-protocol/data-proxy-sdk";
-import * as Match from "effect/Match";
 import type { Maybe, Result } from "true-myth";
 import type { Logger } from "winston";