From 47a37152c0491bd5040d13598c47b3d8b410f91a Mon Sep 17 00:00:00 2001 From: jasperdg Date: Fri, 6 Mar 2026 15:24:39 +0100 Subject: [PATCH 1/3] feat: add custom fetcher system with pyth-pro fetcher Introduces a pluggable fetcher abstraction that allows routes to opt into custom fetch logic instead of the default HTTP passthrough. - Add Fetcher interface (types.ts) and a registry (index.ts) with built-in 'default' and 'pyth-pro' fetchers - pyth-pro fetcher: uses @pythnetwork/pyth-lazer-sdk to call getLatestPrice or getPrice based on the upstream URL path (/v1/latest_price vs /v1/price), caches PythLazerClient per base-URL+token, and validates that timestamp (whole-second microseconds) is present for the /v1/price endpoint - proxy-server: route handler selects fetcher by name from the route config, falls back to defaultFetcher when none is specified - config-parser: adds optional 'fetcher' field to RouteSchema; skips unused path-variable warnings for custom fetcher routes - headers util and verify-with-retry improvements --- bun.lock | 20 +- workspace/data-proxy/package.json | 1 + workspace/data-proxy/src/config-parser.ts | 40 +- .../src/fetchers/default-fetcher.ts | 20 + workspace/data-proxy/src/fetchers/index.ts | 23 + workspace/data-proxy/src/fetchers/pyth-pro.ts | 199 +++++ workspace/data-proxy/src/fetchers/types.ts | 23 + workspace/data-proxy/src/proxy-server.ts | 685 +++++++++--------- workspace/data-proxy/src/utils/headers.ts | 7 + .../data-proxy/src/utils/verify-with-retry.ts | 1 - 10 files changed, 668 insertions(+), 351 deletions(-) create mode 100644 workspace/data-proxy/src/fetchers/default-fetcher.ts create mode 100644 workspace/data-proxy/src/fetchers/index.ts create mode 100644 workspace/data-proxy/src/fetchers/pyth-pro.ts create mode 100644 workspace/data-proxy/src/fetchers/types.ts create mode 100644 workspace/data-proxy/src/utils/headers.ts diff --git a/bun.lock b/bun.lock index e0e79b3..8939bff 100644 --- a/bun.lock +++ b/bun.lock @@ -1,6 +1,5 @@ { "lockfileVersion": 1, - "configVersion": 0, "workspaces": { "": { "name": "seda-data-proxy", @@ -16,12 +15,13 @@ }, "workspace/data-proxy": { "name": "@seda-protocol/data-proxy", - "version": "1.3.5", + "version": "1.3.6", "dependencies": { "@commander-js/extra-typings": "^12.1.0", "@cosmjs/crypto": "^0.33.1", "@cosmjs/encoding": "^0.33.1", "@dotenvx/dotenvx": "^1.51.4", + "@pythnetwork/pyth-lazer-sdk": "^6.0.0", "@seda-protocol/data-proxy-sdk": "workspace:*", "@seda-protocol/utils": "^2.0.0", "big.js": "7.0.1", @@ -121,6 +121,8 @@ "@inquirer/type": ["@inquirer/type@1.5.5", "", { "dependencies": { "mute-stream": "^1.0.0" } }, "sha512-MzICLu4yS7V8AA61sANROZ9vT1H3ooca5dSmI1FjZkzq7o/koMsRfQSzRtFo+F3Ao4Sf1C0bpLKejpKB/+j6MA=="], + "@isaacs/ttlcache": ["@isaacs/ttlcache@1.4.1", "", {}, "sha512-RQgQ4uQ+pLbqXfOmieB91ejmLwvSgv9nLx6sT6sD83s7umBypgg+OIBOBbEUiJXrfpnp9j0mRhYYdzp9uqq3lA=="], + "@jsep-plugin/assignment": ["@jsep-plugin/assignment@1.3.0", "", { "peerDependencies": { "jsep": "^0.4.0||^1.0.0" } }, "sha512-VVgV+CXrhbMI3aSusQyclHkenWSAm95WaiKrMxRFam3JSUiIaQjoMIw2sEs/OX4XifnqeQUN4DYbJjlA8EfktQ=="], "@jsep-plugin/regex": ["@jsep-plugin/regex@1.0.4", "", { "peerDependencies": { "jsep": "^0.4.0||^1.0.0" } }, "sha512-q7qL4Mgjs1vByCaTnDFcBnV9HS7GVPJX5vyVoCgZHNSC9rjwIlmbXG5sUuorR5ndfHAIlJ8pVStxvjXHbNvtUg=="], @@ -161,6 +163,8 @@ "@protobufjs/utf8": ["@protobufjs/utf8@1.1.0", "", {}, "sha512-Vvn3zZrhQZkkBE8LSuW3em98c0FwgO4nxzv6OdSxPKJIEKY2bGbHn+mhGIPerzI4twdxaP8/0+06HBpwf345Lw=="], + "@pythnetwork/pyth-lazer-sdk": ["@pythnetwork/pyth-lazer-sdk@6.0.0", "", { "dependencies": { "@isaacs/ttlcache": "^1.4.1", "buffer": "^6.0.3", "isomorphic-ws": "^5.0.0", "ts-log": "^2.2.7", "ws": "^8.18.0" } }, "sha512-cty4/M4zQOUtFnm1ZsCfMs9x3sLk4ZmEwW0CPqMtDZFai94QGsXHkKvi5LkBPRL4aQg96EuVzOa37H0lHqANeQ=="], + "@seda-protocol/data-proxy": ["@seda-protocol/data-proxy@workspace:workspace/data-proxy"], "@seda-protocol/data-proxy-sdk": ["@seda-protocol/data-proxy-sdk@workspace:workspace/data-proxy-sdk"], @@ -219,6 +223,8 @@ "brorand": ["brorand@1.1.0", "", {}, "sha512-cKV8tMCEpQs4hK/ik71d6LrPOnpkpGBR0wzxqr68g2m/LB2GxVYQroAjMJZRVM1Y4BCjCKc3vAamxSzOY2RP+w=="], + "buffer": ["buffer@6.0.3", "", { "dependencies": { "base64-js": "^1.3.1", "ieee754": "^1.2.1" } }, "sha512-FTiCpNxtwiZZHEZbcbTIcZjERVICn9yq/pDFkTl95/AxzD1naBctN7YO68riM/gLSDY7sdrMby8hofADYuuqOA=="], + "bun-types": ["bun-types@1.2.23", "", { "dependencies": { "@types/node": "*" }, "peerDependencies": { "@types/react": "^19" } }, "sha512-R9f0hKAZXgFU3mlrA0YpE/fiDvwV0FT9rORApt2aQVWSuJDzZOyB5QLc0N/4HF57CS8IXJ6+L5E4W1bW6NS2Aw=="], "call-bind-apply-helpers": ["call-bind-apply-helpers@1.0.2", "", { "dependencies": { "es-errors": "^1.3.0", "function-bind": "^1.1.2" } }, "sha512-Sp1ablJ0ivDkSzjcaJdxEunN5/XvksFJ2sMBFfq6x0ryhQV/2b/KwFe21cMpmHtPOSij8K99/wSfoEuTObmuMQ=="], @@ -361,7 +367,7 @@ "isexe": ["isexe@3.1.1", "", {}, "sha512-LpB/54B+/2J5hqQ7imZHfdU31OlgQqx7ZicVlkm9kzg9/w8GKLEcFfJl/t7DCEDueOyBAD6zCCwTO6Fzs0NoEQ=="], - "isomorphic-ws": ["isomorphic-ws@4.0.1", "", { "peerDependencies": { "ws": "*" } }, "sha512-BhBvN2MBpWTaSHdWRb/bwdZJ1WaehQ2L1KngkCkfLUGF0mAWAT1sQUQacEmQ0jXkFw/czDXPNQSL5u2/Krsz1w=="], + "isomorphic-ws": ["isomorphic-ws@5.0.0", "", { "peerDependencies": { "ws": "*" } }, "sha512-muId7Zzn9ywDsyXgTIafTry2sV3nySZeUDe6YedVd1Hvuuep5AsIlqK+XefWpYTyJG5e503F2xIuT2lcU6rCSw=="], "jsep": ["jsep@1.4.0", "", {}, "sha512-B7qPcEVE3NVkmSJbaYxvv4cHkVW7DQsZz13pUMrfS8z8Q/BuShN+gcTXrUlPiGqM2/t/EEaI030bpxMqY8gMlw=="], @@ -483,6 +489,8 @@ "true-myth": ["true-myth@9.0.1", "", {}, "sha512-8ePgamjgV3CKDI43o6eUtid+9iadfEKUiu2WTucbeCnPTyDOQpZ4svCuozLwm9S4hzUolOclsUCJf3yjQg3g/Q=="], + "ts-log": ["ts-log@2.2.7", "", {}, "sha512-320x5Ggei84AxzlXp91QkIGSw5wgaLT6GeAH0KsqDmRZdVWW2OiSeVvElVoatk3f7nicwXlElXsoFkARiGE2yg=="], + "type-fest": ["type-fest@4.41.0", "", {}, "sha512-TeTSQ6H5YHvpqVwBRcnLDCBnDOHWYu7IvGbHT6N8AOymcr9PJGjc1GTtiWZTYg0NCgYwvnYWEkVChQAr9bjfwA=="], "typescript": ["typescript@5.8.3", "", { "bin": { "tsc": "bin/tsc", "tsserver": "bin/tsserver" } }, "sha512-p1diW6TqL9L07nNxvRMM7hMMw4c5XOo/1ibL4aAIGmSAt9slTE1Xgw5KWuof2uTOvCg9BY7ZRi+GaF+7sfgPeQ=="], @@ -505,7 +513,7 @@ "wrap-ansi": ["wrap-ansi@6.2.0", "", { "dependencies": { "ansi-styles": "^4.0.0", "string-width": "^4.1.0", "strip-ansi": "^6.0.0" } }, "sha512-r6lPcBGxZXlIcymEu7InxDMhdW0KDxpLgoFLcguasxCaJ/SOIZwINatK9KY/tf+ZrlywOKU0UDj3ATXUBfxJXA=="], - "ws": ["ws@7.5.10", "", { "peerDependencies": { "bufferutil": "^4.0.1", "utf-8-validate": "^5.0.2" }, "optionalPeers": ["bufferutil", "utf-8-validate"] }, "sha512-+dbF1tHwZpXcbOJdVOkzLDxZP1ailvSxM6ZweXTegylPny803bFhA+vqBYw4s31NSAk4S2Qz+AKXK9a4wkdjcQ=="], + "ws": ["ws@8.19.0", "", { "peerDependencies": { "bufferutil": "^4.0.1", "utf-8-validate": ">=5.0.2" }, "optionalPeers": ["bufferutil", "utf-8-validate"] }, "sha512-blAT2mjOEIi0ZzruJfIhb3nps74PRWTCz1IjglWEEpQl5XS/UNama6u2/rjFkDDouqr4L67ry+1aGIALViWjDg=="], "xstream": ["xstream@11.14.0", "", { "dependencies": { "globalthis": "^1.0.1", "symbol-observable": "^2.0.3" } }, "sha512-1bLb+kKKtKPbgTK6i/BaoAn03g47PpFstlbe1BA+y3pNS/LfvcaghS5BFf9+EE1J+KwSQsEpfJvFN5GqFtiNmw=="], @@ -519,6 +527,10 @@ "@bundled-es-modules/cookie/cookie": ["cookie@0.7.2", "", {}, "sha512-yki5XnKuf750l50uGTllt6kKILY4nQ1eNIQatoXEByZ5dWgnKqbnqmTrBE5B4N7lrMJKQ2ytWMiTO2o0v6Ew/w=="], + "@cosmjs/socket/isomorphic-ws": ["isomorphic-ws@4.0.1", "", { "peerDependencies": { "ws": "*" } }, "sha512-BhBvN2MBpWTaSHdWRb/bwdZJ1WaehQ2L1KngkCkfLUGF0mAWAT1sQUQacEmQ0jXkFw/czDXPNQSL5u2/Krsz1w=="], + + "@cosmjs/socket/ws": ["ws@7.5.10", "", { "peerDependencies": { "bufferutil": "^4.0.1", "utf-8-validate": "^5.0.2" }, "optionalPeers": ["bufferutil", "utf-8-validate"] }, "sha512-+dbF1tHwZpXcbOJdVOkzLDxZP1ailvSxM6ZweXTegylPny803bFhA+vqBYw4s31NSAk4S2Qz+AKXK9a4wkdjcQ=="], + "@dotenvx/dotenvx/commander": ["commander@11.1.0", "", {}, "sha512-yPVavfyCcRhmorC7rWlkHn15b4wDVgVmBA7kV4QVBsF7kv/9TKJAbAXVTxvTnwP8HHKjRCJDClKbciiYS7p0DQ=="], "@inquirer/core/@inquirer/type": ["@inquirer/type@2.0.0", "", { "dependencies": { "mute-stream": "^1.0.0" } }, "sha512-XvJRx+2KR3YXyYtPUUy+qd9i7p+GO9Ko6VIIpWlBrpWwXDv8WLFeHTxz35CfQFUiBMLXlGHhGzys7lqit9gWag=="], diff --git a/workspace/data-proxy/package.json b/workspace/data-proxy/package.json index a902d65..235c46e 100644 --- a/workspace/data-proxy/package.json +++ b/workspace/data-proxy/package.json @@ -11,6 +11,7 @@ "@cosmjs/crypto": "^0.33.1", "@cosmjs/encoding": "^0.33.1", "@dotenvx/dotenvx": "^1.51.4", + "@pythnetwork/pyth-lazer-sdk": "^6.0.0", "@seda-protocol/data-proxy-sdk": "workspace:*", "@seda-protocol/utils": "^2.0.0", "big.js": "7.0.1", diff --git a/workspace/data-proxy/src/config-parser.ts b/workspace/data-proxy/src/config-parser.ts index 5bc7b59..0eccaea 100644 --- a/workspace/data-proxy/src/config-parser.ts +++ b/workspace/data-proxy/src/config-parser.ts @@ -27,6 +27,7 @@ const RouteSchema = v.strictObject( baseURL: maybe(v.string()), path: v.string(), upstreamUrl: v.string(), + fetcher: v.optional(v.string()), method: v.optional(HttpMethodSchema, DEFAULT_HTTP_METHODS), jsonPath: v.optional(v.pipe(v.string(), v.startsWith("$"))), allowedQueryParams: v.optional(v.array(v.string())), @@ -262,29 +263,32 @@ export function parseConfig( } // Ensure variables in the path are used in the upstream URL, the headers or the jsonPath. - for (const match of route.path.matchAll(pathVarRegex)) { - let isUsed = false; + // Skip this check for custom fetchers since they control their own parameter usage. + if (!route.fetcher) { + for (const match of route.path.matchAll(pathVarRegex)) { + let isUsed = false; - if (route.upstreamUrl.includes(match[1])) { - isUsed = true; - } - - for (const [_, headerValue] of Object.entries(route.headers)) { - if (headerValue.includes(match[1])) { + if (route.upstreamUrl.includes(match[1])) { isUsed = true; - break; } - } - if (route.jsonPath?.includes(match[1])) { - isUsed = true; - } + for (const [_, headerValue] of Object.entries(route.headers)) { + if (headerValue.includes(match[1])) { + isUsed = true; + break; + } + } - if (!isUsed) { - hasWarnings = true; - logger.warn( - `$.${index}.path has ${match[1]}, but it is not used in $.${index}.upstreamUrl or $.${index}.headers. \nPlease either remove the variable from the path or use it in the upstreamUrl or headers (through "{${match[1]}}").`, - ); + if (route.jsonPath?.includes(match[1])) { + isUsed = true; + } + + if (!isUsed) { + hasWarnings = true; + logger.warn( + `$.${index}.path has ${match[1]}, but it is not used in $.${index}.upstreamUrl or $.${index}.headers. \nPlease either remove the variable from the path or use it in the upstreamUrl or headers (through "{${match[1]}}").`, + ); + } } } diff --git a/workspace/data-proxy/src/fetchers/default-fetcher.ts b/workspace/data-proxy/src/fetchers/default-fetcher.ts new file mode 100644 index 0000000..0304012 --- /dev/null +++ b/workspace/data-proxy/src/fetchers/default-fetcher.ts @@ -0,0 +1,20 @@ +import { headersToRecord } from "../utils/headers"; +import type { Fetcher, FetcherRequest, FetcherResponse } from "./types"; + +export const defaultFetcher: Fetcher = { + async fetch(request: FetcherRequest): Promise { + const response = await fetch(request.url, { + method: request.method, + headers: request.headers, + body: request.body, + }); + + const body = await response.text(); + + return { + status: response.status, + body, + headers: headersToRecord(response.headers), + }; + }, +}; diff --git a/workspace/data-proxy/src/fetchers/index.ts b/workspace/data-proxy/src/fetchers/index.ts new file mode 100644 index 0000000..da3facb --- /dev/null +++ b/workspace/data-proxy/src/fetchers/index.ts @@ -0,0 +1,23 @@ +import { defaultFetcher } from "./default-fetcher"; +import { pythProFetcher } from "./pyth-pro"; +import type { Fetcher } from "./types"; + +export type { Fetcher, FetcherRequest, FetcherResponse } from "./types"; + +const registry = new Map(); + +export function registerFetcher(name: string, fetcher: Fetcher): void { + registry.set(name, fetcher); +} + +export function getFetcher(name: string): Fetcher | undefined { + return registry.get(name); +} + +export function getRegisteredFetcherNames(): string[] { + return Array.from(registry.keys()); +} + +// Built-in fetchers — add new ones here +registerFetcher("default", defaultFetcher); +registerFetcher("pyth-pro", pythProFetcher); diff --git a/workspace/data-proxy/src/fetchers/pyth-pro.ts b/workspace/data-proxy/src/fetchers/pyth-pro.ts new file mode 100644 index 0000000..a0afce0 --- /dev/null +++ b/workspace/data-proxy/src/fetchers/pyth-pro.ts @@ -0,0 +1,199 @@ +import type { + Channel, + Format, + JsonBinaryEncoding, + PriceFeedProperty, +} from "@pythnetwork/pyth-lazer-sdk"; +import { PythLazerClient } from "@pythnetwork/pyth-lazer-sdk"; +import type { Fetcher, FetcherRequest, FetcherResponse } from "./types"; + +const clientCache = new Map(); + +async function getOrCreateClient( + priceServiceUrl: string, + token: string, +): Promise { + const key = `${priceServiceUrl}|${token}`; + if (!clientCache.has(key)) { + clientCache.set( + key, + await PythLazerClient.create({ token, priceServiceUrl }), + ); + } + return clientCache.get(key) as PythLazerClient; +} + +type PythProEndpoint = "latest_price" | "price"; + +interface PythProRequestBody { + // Feed identifiers — at least one of these must be provided + priceFeedIds?: number[]; + priceFeedSymbols?: string[]; + // Required when the URL endpoint is "price". + // Must be whole-second precision in microseconds: Math.floor(Date.now() / 1000) * 1_000_000 + timestamp?: number; + // SDK subscription params — passed through directly to the SDK + channel: Channel; + formats: Format[]; + properties: PriceFeedProperty[]; + jsonBinaryEncoding?: JsonBinaryEncoding; + parsed?: boolean; +} + +function resolveEndpoint(rawUrl: string): PythProEndpoint | null { + let pathname: string; + try { + pathname = new URL(rawUrl).pathname; + } catch { + return null; + } + const last = pathname.split("/").filter(Boolean).pop(); + if (last === "latest_price" || last === "price") return last; + return null; +} + +/** Strips the trailing /v1/ from a URL to get the Pyth Lazer service base URL. */ +function resolveBaseUrl(rawUrl: string): string { + try { + const parsed = new URL(rawUrl); + const v1Index = parsed.pathname.lastIndexOf("/v1/"); + const basePath = + v1Index !== -1 ? parsed.pathname.substring(0, v1Index) : ""; + return `${parsed.origin}${basePath}`; + } catch { + return rawUrl; + } +} + +export const pythProFetcher: Fetcher = { + async fetch(request: FetcherRequest): Promise { + const authHeader = + request.headers.authorization ?? request.headers.Authorization; + + if (!authHeader) { + return { + status: 401, + body: JSON.stringify({ + error: + 'Missing Authorization header. Set it in the route config: "headers": { "Authorization": "Bearer {$PYTH_LAZER_TOKEN}" }', + }), + headers: { "content-type": "application/json" }, + }; + } + + const token = authHeader.startsWith("Bearer ") + ? authHeader.slice("Bearer ".length) + : authHeader; + + let parsedBody: PythProRequestBody; + try { + parsedBody = JSON.parse(request.body ?? "{}") as PythProRequestBody; + } catch { + return { + status: 400, + body: JSON.stringify({ error: "Request body must be valid JSON" }), + headers: { "content-type": "application/json" }, + }; + } + + const { + priceFeedIds, + priceFeedSymbols, + timestamp, + channel, + formats, + properties, + jsonBinaryEncoding, + parsed, + } = parsedBody; + + console.log("priceFeedIds", priceFeedIds); + if ( + (!priceFeedIds || priceFeedIds.length === 0) && + (!priceFeedSymbols || priceFeedSymbols.length === 0) + ) { + return { + status: 400, + body: JSON.stringify({ + error: + "Request body must include at least one of: priceFeedIds, priceFeedSymbols", + }), + headers: { "content-type": "application/json" }, + }; + } + + if (!channel) { + return { + status: 400, + body: JSON.stringify({ + error: + 'Request body must include "channel" (real_time | fixed_rate@50ms | fixed_rate@200ms | fixed_rate@1000ms)', + }), + headers: { "content-type": "application/json" }, + }; + } + + if (!properties || properties.length === 0) { + return { + status: 400, + body: JSON.stringify({ + error: 'Request body must include "properties" array', + }), + headers: { "content-type": "application/json" }, + }; + } + + const endpoint = resolveEndpoint(request.url); + + if (!endpoint) { + return { + status: 400, + body: JSON.stringify({ + error: + 'Upstream URL must end with either "/v1/latest_price" or "/v1/price"', + }), + headers: { "content-type": "application/json" }, + }; + } + + if (endpoint === "price" && timestamp === undefined) { + return { + status: 400, + body: JSON.stringify({ + error: + 'The "/v1/price" endpoint requires "timestamp" (whole-second microseconds, e.g. Math.floor(Date.now() / 1000) * 1_000_000) in the request body', + }), + headers: { "content-type": "application/json" }, + }; + } + + const baseUrl = resolveBaseUrl(request.url); + const client = await getOrCreateClient(baseUrl, token); + + const sdkParams = { + ...(priceFeedIds && priceFeedIds.length > 0 ? { priceFeedIds } : {}), + ...(priceFeedSymbols && priceFeedSymbols.length > 0 + ? { symbols: priceFeedSymbols } + : {}), + channel, + formats: formats ?? [], + properties, + ...(jsonBinaryEncoding !== undefined ? { jsonBinaryEncoding } : {}), + ...(parsed !== undefined ? { parsed } : {}), + }; + console.log("get here"); + + const result = + endpoint === "price" + ? await client.getPrice({ ...sdkParams, timestamp: timestamp! }) + : await client.getLatestPrice(sdkParams); + + console.log("result", result); + + return { + status: 200, + body: JSON.stringify(result), + headers: { "content-type": "application/json" }, + }; + }, +}; diff --git a/workspace/data-proxy/src/fetchers/types.ts b/workspace/data-proxy/src/fetchers/types.ts new file mode 100644 index 0000000..fdf291f --- /dev/null +++ b/workspace/data-proxy/src/fetchers/types.ts @@ -0,0 +1,23 @@ +export interface FetcherRequest { + /** Resolved upstream URL after path/env variable substitution. Empty string when no upstreamUrl is configured. */ + url: string; + method: string; + /** Merged request headers with host removed and configured route headers applied. */ + headers: Record; + body?: string; + /** Raw Elysia path params captured from the route pattern (e.g. { feedId: "BTC" }). */ + pathParams: Record; + /** Allowlist-filtered query params from the incoming request. */ + queryParams: URLSearchParams; +} + +export interface FetcherResponse { + status: number; + body: string; + /** Response headers — used by forwardResponseHeaders to selectively pass headers to the client. */ + headers: Record; +} + +export interface Fetcher { + fetch(request: FetcherRequest): Promise; +} diff --git a/workspace/data-proxy/src/proxy-server.ts b/workspace/data-proxy/src/proxy-server.ts index 2f5da74..565d5a9 100644 --- a/workspace/data-proxy/src/proxy-server.ts +++ b/workspace/data-proxy/src/proxy-server.ts @@ -3,11 +3,14 @@ import { constants, type DataProxy } from "@seda-protocol/data-proxy-sdk"; import { tryAsync } from "@seda-protocol/utils"; import * as Match from "effect/Match"; import * as Option from "effect/Option"; -import { Elysia } from "elysia"; +import { Elysia, type HTTPMethod } from "elysia"; import { Maybe } from "true-myth"; -import { type Config, getHttpMethods } from "./config-parser"; +import { type Config, type Route, getHttpMethods } from "./config-parser"; import { DEFAULT_PROXY_ROUTE_GROUP, JSON_PATH_HEADER_KEY } from "./constants"; +import { getFetcher } from "./fetchers"; +import { defaultFetcher } from "./fetchers/default-fetcher"; import logger from "./logger"; +import { headersToRecord } from "./utils/headers"; import { StatusContext, statusPlugin } from "./status-plugin"; import { createDefaultResponseHeaders, @@ -26,11 +29,344 @@ function createErrorResponse(error: string, status: number) { }); } +type ApplyJsonPathResult = + | { ok: true; data: string; parsedData: unknown } + | { ok: false; response: Response }; + +function applyJsonPath( + responseData: string | object, + expression: string, + label: string, + errorStatus: number, + requestLogger: typeof logger, +): ApplyJsonPathResult { + requestLogger.debug(`Applying ${label} JSONpath ${expression}`); + const result = queryJson(responseData, expression); + + if (result.isErr) { + requestLogger.error(`Failed to apply ${label} JSONpath: ${expression}`, { + error: result.error, + }); + return { + ok: false, + response: createErrorResponse(result.error, errorStatus), + }; + } + + requestLogger.debug(`Successfully applied ${label} JSONpath`); + return { + ok: true, + data: JSON.stringify(result.value), + parsedData: result.value, + }; +} + export interface ProxyServerOptions { port: number; disableProof: boolean; } +interface RouteHandlerContext { + headers: Record; + params: Record; + body: unknown; + path: string; + requestId: string; + request: Request; +} + +async function handleProxyRequest( + ctx: RouteHandlerContext, + route: Route, + routeMethod: HTTPMethod, + proxyGroup: string, + config: Config, + dataProxy: DataProxy, + serverOptions: ProxyServerOptions, +): Promise { + const { headers, params, body, path, requestId, request } = ctx; + const requestLogger = logger.child({ requestId, path }); + + // requestBody is now always a string because of the parse function in this route + const requestBody = Maybe.of(body as string | undefined); + + // Verification with the SEDA chain that the overlay node is eligible + if (!serverOptions.disableProof) { + requestLogger.debug("Verifying proof"); + + const proofHeader = Option.fromNullable( + headers[constants.PROOF_HEADER_KEY], + ); + const sedaFastProofHeader = Option.fromNullable( + headers[constants.SEDA_FAST_PROOF_HEADER_KEY], + ); + + const heightFromHeader = Number(headers[constants.HEIGHT_HEADER_KEY]); + const eligibleHeight = Maybe.of( + Number.isNaN(heightFromHeader) ? undefined : BigInt(heightFromHeader), + ); + + requestLogger.debug( + `Received proof for height ${eligibleHeight.mapOr("unknown", (h) => + h.toString(), + )}`, + ); + + if (Option.isNone(proofHeader) && Option.isNone(sedaFastProofHeader)) { + const message = `Header "${constants.PROOF_HEADER_KEY}" or "${constants.SEDA_FAST_PROOF_HEADER_KEY}" is not provided`; + requestLogger.error(message); + return createErrorResponse(message, 400); + } + + // Disallow SEDA Fast usage if it's not enabled + if (!config.sedaFast?.enable && Option.isSome(sedaFastProofHeader)) { + const message = `Header "${constants.SEDA_FAST_PROOF_HEADER_KEY}" is not allowed`; + requestLogger.error(message); + return createErrorResponse(message, 400); + } + + const proofInfo = Match.value(proofHeader).pipe( + Match.when(Option.isSome, (header) => { + return { + decodedProof: dataProxy.decodeProof(header.value), + rawProof: header.value, + type: "seda-core" as const, + }; + }), + Match.when(Option.isNone, () => { + // Should not throw since we checked for both headers above + return { + decodedProof: dataProxy.decodeSedaFastProof( + Option.getOrThrow(sedaFastProofHeader), + ), + rawProof: Option.getOrThrow(sedaFastProofHeader), + type: "seda-fast" as const, + }; + }), + Match.exhaustive, + ); + + if (proofInfo.decodedProof.isErr) { + const message = `Failed to decode proof: ${proofInfo.decodedProof.error}, make sure the proof is a base64 encoded string`; + requestLogger.error(message); + return createErrorResponse(message, 400); + } + + const proofId = + proofInfo.type === "seda-core" + ? proofInfo.decodedProof.value.drId + : proofInfo.decodedProof.value.publicKey.toString("hex"); + + const idType = + proofInfo.type === "seda-core" + ? "Data Request Id" + : "SEDA Fast Public Key"; + requestLogger.debug(`${idType}: ${proofId}`); + + const verificationResult = await Match.value(proofInfo).pipe( + Match.when({ type: "seda-fast" }, async (proof) => { + // Should not happen since we already checked for this above, but we need to satisfy the type checker + if (proof.decodedProof.isErr) { + throw new Error("Failed to decode proof"); + } + + return { + verification: await dataProxy.verifyFastProof( + proof.decodedProof.value, + ), + type: "seda-fast" as const, + }; + }), + Match.when({ type: "seda-core" }, async (proof) => { + return { + verification: await verifyWithRetry( + requestLogger, + dataProxy, + proof.rawProof, + eligibleHeight, + config.verificationMaxRetries, + () => config.verificationRetryDelay, + ), + type: "seda-core" as const, + }; + }), + Match.exhaustive, + ); + + if (verificationResult.verification.isErr) { + const message = `Failed to verify eligibility proof ${verificationResult.verification.error}`; + requestLogger.error(message); + return createErrorResponse(message, 401); + } + + if (!verificationResult.verification.value.isValid) { + const heightOrTimestamp = + verificationResult.type === "seda-core" + ? verificationResult.verification.value.currentHeight + : verificationResult.verification.value.currentUnixTimestamp; + const message = `Ineligible executor at height/timestamp ${heightOrTimestamp}: ${verificationResult.verification.value.status}`; + requestLogger.error(message); + return createErrorResponse(message, 401); + } + + // Verification passed, we can proceed + } else { + requestLogger.debug("Skipping proof verification."); + } + + // Parse the request URL to get the search params, + // this is to support query params that can be repeated, such as ?one=one&one=two + const requestUrl = new URL(request.url); + + // Add the request search params (?one=two) to the upstream url + const requestSearchParams = createUrlSearchParams( + requestUrl.searchParams, + route.allowedQueryParams, + ); + + const upstreamHeaders = new Headers(); + + // Redirect all headers given by the requester + for (const [key, value] of Object.entries(headers)) { + if (!value || key === constants.PROOF_HEADER_KEY) { + continue; + } + upstreamHeaders.append(key, value); + } + + // Inject all configured headers by the data proxy node configuration + // Important: configured headers take precedence over headers sent in the request + for (const [key, value] of Object.entries(route.headers)) { + upstreamHeaders.set(key, replaceParams(value, params)); + } + + // Host doesn't match since we are proxying. Returning the upstream host while the URL does not match results + // in the client to not return the response. + upstreamHeaders.delete("host"); + + const resolvedUrlResult = injectSearchParamsInUrl( + replaceParams(route.upstreamUrl, params), + requestSearchParams, + ); + if (resolvedUrlResult.isErr) { + return createErrorResponse(resolvedUrlResult.error, 500); + } + const resolvedUrl = resolvedUrlResult.value.toString(); + + let responseData: string; + let upstreamResponseHeaders: Headers; + + // Use the configured custom fetcher, or fall back to the default HTTP fetcher + const fetcherName = route.fetcher ?? "default"; + const fetcher = route.fetcher ? getFetcher(route.fetcher)! : defaultFetcher; + + requestLogger.debug( + route.fetcher + ? `${routeMethod} ${proxyGroup}${route.path} -> fetcher:${fetcherName}` + : `${routeMethod} ${proxyGroup}${route.path} -> ${resolvedUrl}`, + { headers: upstreamHeaders, body, resolvedUrl }, + ); + + const fetcherResult = await tryAsync(async () => + fetcher.fetch({ + url: resolvedUrl, + method: routeMethod, + headers: headersToRecord(upstreamHeaders), + body: body as string | undefined, + pathParams: params, + queryParams: requestSearchParams, + }), + ); + + if (fetcherResult.isErr) { + const message = `Fetcher "${fetcherName}" for ${route.path} failed: ${fetcherResult.error}`; + requestLogger.error(message, { error: fetcherResult.error }); + return createErrorResponse(message, 500); + } + + if (fetcherResult.value.status >= 400) { + const message = `Fetcher "${fetcherName}" for ${route.path} returned status ${fetcherResult.value.status}: ${fetcherResult.value.body}`; + requestLogger.error(message); + return createErrorResponse(message, fetcherResult.value.status); + } + + requestLogger.debug("Received fetcher response", { + status: fetcherResult.value.status, + }); + + responseData = fetcherResult.value.body; + upstreamResponseHeaders = new Headers(fetcherResult.value.headers); + + // jsonPathInput tracks the most recently parsed form of the response so that + // sequential JSONPath applications don't re-parse the same JSON string. + let jsonPathInput: string | object = responseData; + + if (route.jsonPath) { + const jsonPathResult = applyJsonPath( + jsonPathInput, + replaceParams(route.jsonPath, params), + "route", + 500, + requestLogger, + ); + if (!jsonPathResult.ok) return jsonPathResult.response; + responseData = jsonPathResult.data; + jsonPathInput = jsonPathResult.parsedData as object; + } + + const jsonPathRequestHeader = Maybe.of(headers[JSON_PATH_HEADER_KEY]); + + // We apply the JSON path to the data that's exposed by the data proxy. + // This allows operators to specify what data is accessible while the data request program can specify what it wants from the accessible data. + if (jsonPathRequestHeader.isJust) { + const jsonPathResult = applyJsonPath( + jsonPathInput, + jsonPathRequestHeader.value, + "request", + 400, + requestLogger, + ); + if (!jsonPathResult.ok) return jsonPathResult.response; + responseData = jsonPathResult.data; + } + + // If the route or proxy has a public endpoint we replace the protocol and host with the public endpoint. + const calledEndpoint = route.baseURL + .or(config.baseURL) + .mapOr(request.url, (t) => { + const pathIndex = request.url.indexOf(path); + return `${t}${request.url.slice(pathIndex)}`; + }); + + requestLogger.debug("Signing data", { + calledEndpoint, + method: request.method, + body: requestBody.unwrapOr(undefined), + responseData, + }); + + const signature = await dataProxy.signData( + calledEndpoint, + request.method, + Buffer.from(requestBody.isJust ? requestBody.value : "", "utf-8"), + Buffer.from(responseData, "utf-8"), + ); + + const responseHeaders = new Headers(); + + // Forward all headers that are configured in the config.json + for (const forwardHeaderKey of route.forwardResponseHeaders) { + const forwardHeaderValue = upstreamResponseHeaders.get(forwardHeaderKey); + if (forwardHeaderValue) { + responseHeaders.append(forwardHeaderKey, forwardHeaderValue); + } + } + + return new Response(responseData, { + headers: createSignedResponseHeaders(signature, responseHeaders), + }); +} + export function startProxyServer( config: Config, dataProxy: DataProxy, @@ -100,332 +436,16 @@ export function startProxyServer( app.route( routeMethod, route.path, - async ({ headers, params, body, path, requestId, request }) => { - const requestLogger = logger.child({ requestId, path }); - - // requestBody is now always a string because of the parse function in this route - const requestBody = Maybe.of(body as string | undefined); - - // Verification with the SEDA chain that the overlay node is eligible - if (!serverOptions.disableProof) { - requestLogger.debug("Verifying proof"); - - const proofHeader = Option.fromNullable( - headers[constants.PROOF_HEADER_KEY], - ); - const sedaFastProofHeader = Option.fromNullable( - headers[constants.SEDA_FAST_PROOF_HEADER_KEY], - ); - - const heightFromHeader = Number( - headers[constants.HEIGHT_HEADER_KEY], - ); - const eligibleHeight = Maybe.of( - Number.isNaN(heightFromHeader) - ? undefined - : BigInt(heightFromHeader), - ); - - requestLogger.debug( - `Received proof for height ${eligibleHeight.mapOr( - "unknown", - (h) => h.toString(), - )}`, - ); - - if ( - Option.isNone(proofHeader) && - Option.isNone(sedaFastProofHeader) - ) { - const message = `Header "${constants.PROOF_HEADER_KEY}" or "${constants.SEDA_FAST_PROOF_HEADER_KEY}" is not provided`; - requestLogger.error(message); - return createErrorResponse(message, 400); - } - - // Disallow SEDA Fast usage if it's not enabled - if ( - !config.sedaFast?.enable && - Option.isSome(sedaFastProofHeader) - ) { - const message = `Header "${constants.SEDA_FAST_PROOF_HEADER_KEY}" is not allowed`; - requestLogger.error(message); - return createErrorResponse(message, 400); - } - - const proofInfo = Match.value(proofHeader).pipe( - Match.when(Option.isSome, (header) => { - return { - decodedProof: dataProxy.decodeProof(header.value), - rawProof: header.value, - type: "seda-core" as const, - }; - }), - Match.when(Option.isNone, () => { - // Should not throw since we checked for both headers above - return { - decodedProof: dataProxy.decodeSedaFastProof( - Option.getOrThrow(sedaFastProofHeader), - ), - rawProof: Option.getOrThrow(sedaFastProofHeader), - type: "seda-fast" as const, - }; - }), - Match.exhaustive, - ); - - if (proofInfo.decodedProof.isErr) { - const message = `Failed to decode proof: ${proofInfo.decodedProof.error}, make sure the proof is a base64 encoded string`; - requestLogger.error(message); - return createErrorResponse(message, 400); - } - - const proofId = - proofInfo.type === "seda-core" - ? proofInfo.decodedProof.value.drId - : proofInfo.decodedProof.value.publicKey.toString("hex"); - - const idType = - proofInfo.type === "seda-core" - ? "Data Request Id" - : "SEDA Fast Public Key"; - requestLogger.debug(`${idType}: ${proofId}`); - - const verificationResult = await Match.value(proofInfo).pipe( - Match.when({ type: "seda-fast" }, async (proof) => { - // Should not happen since we already checked for this above, but we need to satisfy the type checker - if (proof.decodedProof.isErr) { - throw new Error("Failed to decode proof"); - } - - return { - verification: await dataProxy.verifyFastProof( - proof.decodedProof.value, - ), - type: "seda-fast" as const, - }; - }), - Match.when({ type: "seda-core" }, async (proof) => { - return { - verification: await verifyWithRetry( - requestLogger, - dataProxy, - proof.rawProof, - eligibleHeight, - config.verificationMaxRetries, - () => config.verificationRetryDelay, - ), - type: "seda-core" as const, - }; - }), - Match.exhaustive, - ); - - if (verificationResult.verification.isErr) { - const message = `Failed to verify eligibility proof ${verificationResult.verification.error}`; - requestLogger.error(message); - return createErrorResponse(message, 401); - } - - if (!verificationResult.verification.value.isValid) { - const heightOrTimestamp = - verificationResult.type === "seda-core" - ? verificationResult.verification.value.currentHeight - : verificationResult.verification.value - .currentUnixTimestamp; - const message = `Ineligible executor at height/timestamp ${heightOrTimestamp}: ${verificationResult.verification.value.status}`; - requestLogger.error(message); - return createErrorResponse(message, 401); - } - - // Verification passed, we can proceed - } else { - requestLogger.debug("Skipping proof verification."); - } - - // Parse the request URL to get the search params, - // this is to support query params that can be repeated, such as ?one=one&one=two - const requestUrl = new URL(request.url); - - // Add the request search params (?one=two) to the upstream url - const requestSearchParams = createUrlSearchParams( - requestUrl.searchParams, - route.allowedQueryParams, - ); - let upstreamUrl = replaceParams(route.upstreamUrl, params); - const upstreamUrlResult = injectSearchParamsInUrl( - upstreamUrl, - requestSearchParams, - ); - - if (upstreamUrlResult.isErr) { - return createErrorResponse(upstreamUrlResult.error, 500); - } - upstreamUrl = upstreamUrlResult.value.toString(); - - const upstreamHeaders = new Headers(); - - // Redirect all headers given by the requester - for (const [key, value] of Object.entries(headers)) { - if (!value || key === constants.PROOF_HEADER_KEY) { - continue; - } - - upstreamHeaders.append(key, value); - } - - // Inject all configured headers by the data proxy node configuration - // Important: configured headers take precedence over headers sent in the request - for (const [key, value] of Object.entries(route.headers)) { - upstreamHeaders.set(key, replaceParams(value, params)); - } - - // Host doesn't match since we are proxying. Returning the upstream host while the URL does not match results - // in the client to not return the response. - upstreamHeaders.delete("host"); - - requestLogger.debug( - `${routeMethod} ${proxyGroup}${route.path} -> ${upstreamUrl}`, - { headers: upstreamHeaders, body, upstreamUrl }, - ); - - const upstreamResponse = await tryAsync(async () => - fetch(upstreamUrl, { - method: routeMethod, - headers: upstreamHeaders, - body: body as string, - }), - ); - - if (upstreamResponse.isErr) { - const message = `Proxying URL ${route.path} failed: ${upstreamResponse.error}`; - requestLogger.error(message, { error: upstreamResponse.error }); - return createErrorResponse(message, 500); - } - - if (!upstreamResponse.value.ok) { - const body = await tryAsync( - async () => await upstreamResponse.value.text(), - ); - - // Rare case where the upstream response is not a text parseable response - if (body.isErr) { - const message = `Upstream response for ${route.path} is not ok: ${upstreamResponse.value.status} err: ${body.error}`; - requestLogger.error(message, { error: body.error }); - return createErrorResponse( - message, - upstreamResponse.value.status, - ); - } - - const message = `Upstream response for ${route.path} is not ok: ${upstreamResponse.value.status} body: ${body.value}`; - requestLogger.error(message); - - return createErrorResponse( - message, - upstreamResponse.value.status, - ); - } - - requestLogger.debug("Received upstream response", { - headers: upstreamResponse.value.headers, - }); - - const upstreamTextResponse = await tryAsync( - async () => await upstreamResponse.value.text(), - ); - - if (upstreamTextResponse.isErr) { - const message = `Reading ${route.path} response body failed: ${upstreamTextResponse.error}`; - requestLogger.error(message, { - error: upstreamTextResponse.error, - }); - return createErrorResponse(message, 500); - } - - let responseData: string = upstreamTextResponse.value; - - if (route.jsonPath) { - requestLogger.debug(`Applying route JSONpath ${route.jsonPath}`); - const data = queryJson( - upstreamTextResponse.value, - replaceParams(route.jsonPath, params), - ); - - if (data.isErr) { - requestLogger.error( - `Failed to apply route JSONpath: ${route.jsonPath}`, - { error: data.error }, - ); - return createErrorResponse(data.error, 500); - } - - responseData = JSON.stringify(data.value); - requestLogger.debug("Successfully applied route JSONpath"); - } - - const jsonPathRequestHeader = Maybe.of( - headers[JSON_PATH_HEADER_KEY], - ); - - // TODO: Would be nice to only parse the JSON once - if (jsonPathRequestHeader.isJust) { - requestLogger.debug( - `Applying request JSONpath ${jsonPathRequestHeader.value}`, - ); - // We apply the JSON path to the data that's exposed by the data proxy. - // This allows operators to specify what data is accessible while the data request program can specify what it wants from the accessible data. - const data = queryJson(responseData, jsonPathRequestHeader.value); - - if (data.isErr) { - requestLogger.error( - `Failed to apply JSONpath: ${jsonPathRequestHeader.value}`, - { error: data.error }, - ); - return createErrorResponse(data.error, 400); - } - - responseData = JSON.stringify(data.value); - requestLogger.debug("Successfully applied request JSONpath"); - } - - // If the route or proxy has a public endpoint we replace the protocol and host with the public endpoint. - const calledEndpoint = route.baseURL - .or(config.baseURL) - .mapOr(request.url, (t) => { - const pathIndex = request.url.indexOf(path); - return `${t}${request.url.slice(pathIndex)}`; - }); - - requestLogger.debug("Signing data", { - calledEndpoint, - method: request.method, - body: requestBody.unwrapOr(undefined), - responseData, - }); - - const signature = await dataProxy.signData( - calledEndpoint, - request.method, - Buffer.from(requestBody.isJust ? requestBody.value : "", "utf-8"), - Buffer.from(responseData, "utf-8"), - ); - - const responseHeaders = new Headers(); - - // Forward all headers that are configured in the config.json - for (const forwardHeaderKey of route.forwardResponseHeaders) { - const forwardHeaderValue = - upstreamResponse.value.headers.get(forwardHeaderKey); - - if (forwardHeaderValue) { - responseHeaders.append(forwardHeaderKey, forwardHeaderValue); - } - } - - return new Response(responseData, { - headers: createSignedResponseHeaders(signature, responseHeaders), - }); - }, + (ctx) => + handleProxyRequest( + ctx, + route, + routeMethod, + proxyGroup, + config, + dataProxy, + serverOptions, + ), { config: {}, parse: ({ request }) => { @@ -441,6 +461,15 @@ export function startProxyServer( return app; }); + // Validate that every route referencing a custom fetcher has a registered implementation. + for (const route of config.routes) { + if (route.fetcher && !getFetcher(route.fetcher)) { + throw new Error( + `Route "${route.path}" references fetcher "${route.fetcher}", but no fetcher with that name is registered. Register it via registerFetcher() in workspace/data-proxy/src/fetchers/index.ts.`, + ); + } + } + server.listen(serverOptions.port); logger.info( `Proxy routes is at http://127.0.0.1:${serverOptions.port}/${proxyGroup === "" || proxyGroup.endsWith("/") ? `${proxyGroup}` : `${proxyGroup}/`}`, diff --git a/workspace/data-proxy/src/utils/headers.ts b/workspace/data-proxy/src/utils/headers.ts new file mode 100644 index 0000000..56e9d4c --- /dev/null +++ b/workspace/data-proxy/src/utils/headers.ts @@ -0,0 +1,7 @@ +export function headersToRecord(headers: Headers): Record { + const record: Record = {}; + headers.forEach((value, key) => { + record[key] = value; + }); + return record; +} diff --git a/workspace/data-proxy/src/utils/verify-with-retry.ts b/workspace/data-proxy/src/utils/verify-with-retry.ts index 6eeca09..fc7ffab 100644 --- a/workspace/data-proxy/src/utils/verify-with-retry.ts +++ b/workspace/data-proxy/src/utils/verify-with-retry.ts @@ -1,5 +1,4 @@ import type { DataProxy } from "@seda-protocol/data-proxy-sdk"; -import * as Match from "effect/Match"; import type { Maybe, Result } from "true-myth"; import type { Logger } from "winston"; From 60c3435bac544d044a83ddc1f3fb7b32639b69f7 Mon Sep 17 00:00:00 2001 From: jasperdg Date: Fri, 6 Mar 2026 15:25:42 +0100 Subject: [PATCH 2/3] chore: add pyth-pro fetcher smoke-test script --- temp/script.ts | 95 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 95 insertions(+) create mode 100644 temp/script.ts diff --git a/temp/script.ts b/temp/script.ts new file mode 100644 index 0000000..5076929 --- /dev/null +++ b/temp/script.ts @@ -0,0 +1,95 @@ +/** + * Quick smoke-test for the pyth-pro custom fetcher running on the local proxy. + * + * Mirrors the oracle program URL construction: + * let url = [proxy_config.proxy_url.deref(), "v1/", "latest_price"].concat(); + * let url = [proxy_config.proxy_url.deref(), "v1/", "price"].concat(); + * + * Config used (single wildcard route): + * { "path": "/pyth-pro/*", "upstreamUrl": "https://pyth-lazer.dourolabs.app/{*}", "fetcher": "pyth-pro" } + * + * Prerequisites: + * 1. Start the proxy (from the repo root): + * bun run workspace/data-proxy/src/index.ts run \ + * --config config.json \ + * --disable-proof \ + * --skip-registration-check \ + * --port 3000 + * + * 2. Run this script: + * bun run temp/script.ts + */ + +// Simulates proxy_config.proxy_url — the base URL the oracle program holds. +const PROXY_URL = "http://localhost:3000/proxy/pyth-pro/"; + +// Oracle program builds the full URL by concatenating the endpoint segment. +function buildUrl(endpoint: "v1/latest_price" | "v1/price"): string { + return [PROXY_URL, endpoint].join(""); +} + +// --------------------------------------------------------------------------- +// Shared request body fields +// --------------------------------------------------------------------------- +const SHARED_PARAMS = { + priceFeedIds: [1], // confirmed valid: 1 = BTC/USD + channel: "fixed_rate@200ms", + formats: ["evm"], // valid values: evm | solana | leEcdsa | leUnsigned + properties: ["price"], // valid values: price | bestBidPrice | bestAskPrice | publisherCount | exponent | confidence | ... + jsonBinaryEncoding: "base64", + parsed: true, +} as const; + +// --------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- +async function post(url: string, body: object): Promise { + console.log(`\n→ POST ${url}`); + console.log(" body:", JSON.stringify(body, null, 2)); + + const res = await fetch(url, { + method: "POST", + headers: { "content-type": "application/json" }, + body: JSON.stringify(body), + }); + + const text = await res.text(); + let parsed: unknown; + try { + parsed = JSON.parse(text); + } catch { + parsed = text; + } + + console.log(` status: ${res.status}`); + console.log(" response:", JSON.stringify(parsed, null, 2)); +} + +// --------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------- + +// 1. latest_price — oracle appends "v1/latest_price" +await post(buildUrl("v1/latest_price"), { + ...SHARED_PARAMS, +}); + +// 2. price — oracle appends "v1/price" + includes timestamp +// Timestamp must be whole-second precision in microseconds: +// Math.floor(Date.now() / 1000) * 1_000_000 +await post(buildUrl("v1/price"), { + ...SHARED_PARAMS, + timestamp: (Math.floor(Date.now() / 1000) - 5 * 60) * 1_000_000, // 5 min ago +}); + +// 3. Error: /v1/price without timestamp → expect 400 +await post(buildUrl("v1/price"), { + ...SHARED_PARAMS, +}); + +// 4. Error: missing priceFeedIds → expect 400 +await post(buildUrl("v1/latest_price"), { + channel: "fixed_rate@200ms", + formats: ["evm"], + properties: ["price"], +}); From a1f5316d1b1e939f2a649fd4f1d77fba214c7788 Mon Sep 17 00:00:00 2001 From: jasperdg Date: Mon, 9 Mar 2026 14:49:11 +0100 Subject: [PATCH 3/3] fix: bun run fmt --- temp/script.ts | 95 ------------------- workspace/data-proxy/src/fetchers/pyth-pro.ts | 10 +- workspace/data-proxy/src/proxy-server.ts | 5 +- 3 files changed, 6 insertions(+), 104 deletions(-) delete mode 100644 temp/script.ts diff --git a/temp/script.ts b/temp/script.ts deleted file mode 100644 index 5076929..0000000 --- a/temp/script.ts +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Quick smoke-test for the pyth-pro custom fetcher running on the local proxy. - * - * Mirrors the oracle program URL construction: - * let url = [proxy_config.proxy_url.deref(), "v1/", "latest_price"].concat(); - * let url = [proxy_config.proxy_url.deref(), "v1/", "price"].concat(); - * - * Config used (single wildcard route): - * { "path": "/pyth-pro/*", "upstreamUrl": "https://pyth-lazer.dourolabs.app/{*}", "fetcher": "pyth-pro" } - * - * Prerequisites: - * 1. Start the proxy (from the repo root): - * bun run workspace/data-proxy/src/index.ts run \ - * --config config.json \ - * --disable-proof \ - * --skip-registration-check \ - * --port 3000 - * - * 2. Run this script: - * bun run temp/script.ts - */ - -// Simulates proxy_config.proxy_url — the base URL the oracle program holds. -const PROXY_URL = "http://localhost:3000/proxy/pyth-pro/"; - -// Oracle program builds the full URL by concatenating the endpoint segment. -function buildUrl(endpoint: "v1/latest_price" | "v1/price"): string { - return [PROXY_URL, endpoint].join(""); -} - -// --------------------------------------------------------------------------- -// Shared request body fields -// --------------------------------------------------------------------------- -const SHARED_PARAMS = { - priceFeedIds: [1], // confirmed valid: 1 = BTC/USD - channel: "fixed_rate@200ms", - formats: ["evm"], // valid values: evm | solana | leEcdsa | leUnsigned - properties: ["price"], // valid values: price | bestBidPrice | bestAskPrice | publisherCount | exponent | confidence | ... - jsonBinaryEncoding: "base64", - parsed: true, -} as const; - -// --------------------------------------------------------------------------- -// Helpers -// --------------------------------------------------------------------------- -async function post(url: string, body: object): Promise { - console.log(`\n→ POST ${url}`); - console.log(" body:", JSON.stringify(body, null, 2)); - - const res = await fetch(url, { - method: "POST", - headers: { "content-type": "application/json" }, - body: JSON.stringify(body), - }); - - const text = await res.text(); - let parsed: unknown; - try { - parsed = JSON.parse(text); - } catch { - parsed = text; - } - - console.log(` status: ${res.status}`); - console.log(" response:", JSON.stringify(parsed, null, 2)); -} - -// --------------------------------------------------------------------------- -// Tests -// --------------------------------------------------------------------------- - -// 1. latest_price — oracle appends "v1/latest_price" -await post(buildUrl("v1/latest_price"), { - ...SHARED_PARAMS, -}); - -// 2. price — oracle appends "v1/price" + includes timestamp -// Timestamp must be whole-second precision in microseconds: -// Math.floor(Date.now() / 1000) * 1_000_000 -await post(buildUrl("v1/price"), { - ...SHARED_PARAMS, - timestamp: (Math.floor(Date.now() / 1000) - 5 * 60) * 1_000_000, // 5 min ago -}); - -// 3. Error: /v1/price without timestamp → expect 400 -await post(buildUrl("v1/price"), { - ...SHARED_PARAMS, -}); - -// 4. Error: missing priceFeedIds → expect 400 -await post(buildUrl("v1/latest_price"), { - channel: "fixed_rate@200ms", - formats: ["evm"], - properties: ["price"], -}); diff --git a/workspace/data-proxy/src/fetchers/pyth-pro.ts b/workspace/data-proxy/src/fetchers/pyth-pro.ts index a0afce0..aa80b9e 100644 --- a/workspace/data-proxy/src/fetchers/pyth-pro.ts +++ b/workspace/data-proxy/src/fetchers/pyth-pro.ts @@ -107,7 +107,6 @@ export const pythProFetcher: Fetcher = { parsed, } = parsedBody; - console.log("priceFeedIds", priceFeedIds); if ( (!priceFeedIds || priceFeedIds.length === 0) && (!priceFeedSymbols || priceFeedSymbols.length === 0) @@ -181,14 +180,11 @@ export const pythProFetcher: Fetcher = { ...(jsonBinaryEncoding !== undefined ? { jsonBinaryEncoding } : {}), ...(parsed !== undefined ? { parsed } : {}), }; - console.log("get here"); - + // timestamp is guaranteed defined here — we return 400 above if endpoint === "price" && timestamp === undefined const result = - endpoint === "price" - ? await client.getPrice({ ...sdkParams, timestamp: timestamp! }) + endpoint === "price" && timestamp !== undefined + ? await client.getPrice({ ...sdkParams, timestamp }) : await client.getLatestPrice(sdkParams); - - console.log("result", result); return { status: 200, diff --git a/workspace/data-proxy/src/proxy-server.ts b/workspace/data-proxy/src/proxy-server.ts index 565d5a9..f7de944 100644 --- a/workspace/data-proxy/src/proxy-server.ts +++ b/workspace/data-proxy/src/proxy-server.ts @@ -10,12 +10,12 @@ import { DEFAULT_PROXY_ROUTE_GROUP, JSON_PATH_HEADER_KEY } from "./constants"; import { getFetcher } from "./fetchers"; import { defaultFetcher } from "./fetchers/default-fetcher"; import logger from "./logger"; -import { headersToRecord } from "./utils/headers"; import { StatusContext, statusPlugin } from "./status-plugin"; import { createDefaultResponseHeaders, createSignedResponseHeaders, } from "./utils/create-headers"; +import { headersToRecord } from "./utils/headers"; import { queryJson } from "./utils/query-json"; import { replaceParams } from "./utils/replace-params"; import { createUrlSearchParams } from "./utils/search-params"; @@ -258,7 +258,8 @@ async function handleProxyRequest( // Use the configured custom fetcher, or fall back to the default HTTP fetcher const fetcherName = route.fetcher ?? "default"; - const fetcher = route.fetcher ? getFetcher(route.fetcher)! : defaultFetcher; + const fetcher = + (route.fetcher ? getFetcher(route.fetcher) : undefined) ?? defaultFetcher; requestLogger.debug( route.fetcher