Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions bun.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions workspace/data-proxy/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"@cosmjs/crypto": "^0.33.1",
"@cosmjs/encoding": "^0.33.1",
"@dotenvx/dotenvx": "^1.51.4",
"@pythnetwork/pyth-lazer-sdk": "^6.0.0",
"@seda-protocol/data-proxy-sdk": "workspace:*",
"@seda-protocol/utils": "^2.0.0",
"big.js": "7.0.1",
Expand Down
40 changes: 22 additions & 18 deletions workspace/data-proxy/src/config-parser.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const RouteSchema = v.strictObject(
baseURL: maybe(v.string()),
path: v.string(),
upstreamUrl: v.string(),
fetcher: v.optional(v.string()),
method: v.optional(HttpMethodSchema, DEFAULT_HTTP_METHODS),
jsonPath: v.optional(v.pipe(v.string(), v.startsWith("$"))),
allowedQueryParams: v.optional(v.array(v.string())),
Expand Down Expand Up @@ -262,29 +263,32 @@ export function parseConfig(
}

// Ensure variables in the path are used in the upstream URL, the headers or the jsonPath.
for (const match of route.path.matchAll(pathVarRegex)) {
let isUsed = false;
// Skip this check for custom fetchers since they control their own parameter usage.
if (!route.fetcher) {
for (const match of route.path.matchAll(pathVarRegex)) {
let isUsed = false;

if (route.upstreamUrl.includes(match[1])) {
isUsed = true;
}

for (const [_, headerValue] of Object.entries(route.headers)) {
if (headerValue.includes(match[1])) {
if (route.upstreamUrl.includes(match[1])) {
isUsed = true;
break;
}
}

if (route.jsonPath?.includes(match[1])) {
isUsed = true;
}
for (const [_, headerValue] of Object.entries(route.headers)) {
if (headerValue.includes(match[1])) {
isUsed = true;
break;
}
}

if (!isUsed) {
hasWarnings = true;
logger.warn(
`$.${index}.path has ${match[1]}, but it is not used in $.${index}.upstreamUrl or $.${index}.headers. \nPlease either remove the variable from the path or use it in the upstreamUrl or headers (through "{${match[1]}}").`,
);
if (route.jsonPath?.includes(match[1])) {
isUsed = true;
}

if (!isUsed) {
hasWarnings = true;
logger.warn(
`$.${index}.path has ${match[1]}, but it is not used in $.${index}.upstreamUrl or $.${index}.headers. \nPlease either remove the variable from the path or use it in the upstreamUrl or headers (through "{${match[1]}}").`,
);
}
}
}

Expand Down
20 changes: 20 additions & 0 deletions workspace/data-proxy/src/fetchers/default-fetcher.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import { headersToRecord } from "../utils/headers";
import type { Fetcher, FetcherRequest, FetcherResponse } from "./types";

export const defaultFetcher: Fetcher = {
async fetch(request: FetcherRequest): Promise<FetcherResponse> {
const response = await fetch(request.url, {
method: request.method,
headers: request.headers,
body: request.body,
});

const body = await response.text();

return {
status: response.status,
body,
headers: headersToRecord(response.headers),
};
},
};
23 changes: 23 additions & 0 deletions workspace/data-proxy/src/fetchers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import { defaultFetcher } from "./default-fetcher";
import { pythProFetcher } from "./pyth-pro";
import type { Fetcher } from "./types";

export type { Fetcher, FetcherRequest, FetcherResponse } from "./types";

const registry = new Map<string, Fetcher>();

export function registerFetcher(name: string, fetcher: Fetcher): void {
registry.set(name, fetcher);
}

export function getFetcher(name: string): Fetcher | undefined {
return registry.get(name);
}

export function getRegisteredFetcherNames(): string[] {
return Array.from(registry.keys());
}

// Built-in fetchers — add new ones here
registerFetcher("default", defaultFetcher);
registerFetcher("pyth-pro", pythProFetcher);
195 changes: 195 additions & 0 deletions workspace/data-proxy/src/fetchers/pyth-pro.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
import type {
Channel,
Format,
JsonBinaryEncoding,
PriceFeedProperty,
} from "@pythnetwork/pyth-lazer-sdk";
import { PythLazerClient } from "@pythnetwork/pyth-lazer-sdk";
import type { Fetcher, FetcherRequest, FetcherResponse } from "./types";

const clientCache = new Map<string, PythLazerClient>();

async function getOrCreateClient(
priceServiceUrl: string,
token: string,
): Promise<PythLazerClient> {
const key = `${priceServiceUrl}|${token}`;
if (!clientCache.has(key)) {
clientCache.set(
key,
await PythLazerClient.create({ token, priceServiceUrl }),
);
}
return clientCache.get(key) as PythLazerClient;
}

type PythProEndpoint = "latest_price" | "price";

interface PythProRequestBody {
// Feed identifiers — at least one of these must be provided
priceFeedIds?: number[];
priceFeedSymbols?: string[];
// Required when the URL endpoint is "price".
// Must be whole-second precision in microseconds: Math.floor(Date.now() / 1000) * 1_000_000
timestamp?: number;
// SDK subscription params — passed through directly to the SDK
channel: Channel;
formats: Format[];
properties: PriceFeedProperty[];
jsonBinaryEncoding?: JsonBinaryEncoding;
parsed?: boolean;
}

function resolveEndpoint(rawUrl: string): PythProEndpoint | null {
let pathname: string;
try {
pathname = new URL(rawUrl).pathname;
} catch {
return null;
}
const last = pathname.split("/").filter(Boolean).pop();
if (last === "latest_price" || last === "price") return last;
return null;
}

/** Strips the trailing /v1/<endpoint> from a URL to get the Pyth Lazer service base URL. */
function resolveBaseUrl(rawUrl: string): string {
try {
const parsed = new URL(rawUrl);
const v1Index = parsed.pathname.lastIndexOf("/v1/");
const basePath =
v1Index !== -1 ? parsed.pathname.substring(0, v1Index) : "";
return `${parsed.origin}${basePath}`;
} catch {
return rawUrl;
}
}

export const pythProFetcher: Fetcher = {
async fetch(request: FetcherRequest): Promise<FetcherResponse> {
const authHeader =
request.headers.authorization ?? request.headers.Authorization;

if (!authHeader) {
return {
status: 401,
body: JSON.stringify({
error:
'Missing Authorization header. Set it in the route config: "headers": { "Authorization": "Bearer {$PYTH_LAZER_TOKEN}" }',
}),
headers: { "content-type": "application/json" },
};
}

const token = authHeader.startsWith("Bearer ")
? authHeader.slice("Bearer ".length)
: authHeader;

let parsedBody: PythProRequestBody;
try {
parsedBody = JSON.parse(request.body ?? "{}") as PythProRequestBody;
} catch {
return {
status: 400,
body: JSON.stringify({ error: "Request body must be valid JSON" }),
headers: { "content-type": "application/json" },
};
}

const {
priceFeedIds,
priceFeedSymbols,
timestamp,
channel,
formats,
properties,
jsonBinaryEncoding,
parsed,
} = parsedBody;

if (
(!priceFeedIds || priceFeedIds.length === 0) &&
(!priceFeedSymbols || priceFeedSymbols.length === 0)
) {
return {
status: 400,
body: JSON.stringify({
error:
"Request body must include at least one of: priceFeedIds, priceFeedSymbols",
}),
headers: { "content-type": "application/json" },
};
}

if (!channel) {
return {
status: 400,
body: JSON.stringify({
error:
'Request body must include "channel" (real_time | fixed_rate@50ms | fixed_rate@200ms | fixed_rate@1000ms)',
}),
headers: { "content-type": "application/json" },
};
}

if (!properties || properties.length === 0) {
return {
status: 400,
body: JSON.stringify({
error: 'Request body must include "properties" array',
}),
headers: { "content-type": "application/json" },
};
}

const endpoint = resolveEndpoint(request.url);

if (!endpoint) {
return {
status: 400,
body: JSON.stringify({
error:
'Upstream URL must end with either "/v1/latest_price" or "/v1/price"',
}),
headers: { "content-type": "application/json" },
};
}

if (endpoint === "price" && timestamp === undefined) {
return {
status: 400,
body: JSON.stringify({
error:
'The "/v1/price" endpoint requires "timestamp" (whole-second microseconds, e.g. Math.floor(Date.now() / 1000) * 1_000_000) in the request body',
}),
headers: { "content-type": "application/json" },
};
}

const baseUrl = resolveBaseUrl(request.url);
const client = await getOrCreateClient(baseUrl, token);

const sdkParams = {
...(priceFeedIds && priceFeedIds.length > 0 ? { priceFeedIds } : {}),
...(priceFeedSymbols && priceFeedSymbols.length > 0
? { symbols: priceFeedSymbols }
: {}),
channel,
formats: formats ?? [],
properties,
...(jsonBinaryEncoding !== undefined ? { jsonBinaryEncoding } : {}),
...(parsed !== undefined ? { parsed } : {}),
};
// timestamp is guaranteed defined here — we return 400 above if endpoint === "price" && timestamp === undefined
const result =
endpoint === "price" && timestamp !== undefined
? await client.getPrice({ ...sdkParams, timestamp })
: await client.getLatestPrice(sdkParams);

return {
status: 200,
body: JSON.stringify(result),
headers: { "content-type": "application/json" },
};
},
};
Loading
Loading