diff --git a/api/package-lock.json b/api/package-lock.json index 529ea6b4..7bf06d5e 100644 --- a/api/package-lock.json +++ b/api/package-lock.json @@ -10,6 +10,7 @@ "license": "MIT", "dependencies": { "@stellar/stellar-sdk": "^14.5.0", + "@types/ws": "^8.18.1", "axios": "^1.6.2", "bcryptjs": "^2.4.3", "cors": "^2.8.5", @@ -19,7 +20,8 @@ "express-validator": "^7.0.1", "helmet": "^7.1.0", "jsonwebtoken": "^9.0.2", - "winston": "^3.11.0" + "winston": "^3.11.0", + "ws": "^8.20.0" }, "devDependencies": { "@types/bcryptjs": "^2.4.6", @@ -1608,7 +1610,6 @@ "version": "20.19.37", "resolved": "https://registry.npmjs.org/@types/node/-/node-20.19.37.tgz", "integrity": "sha512-8kzdPJ3FsNsVIurqBs7oodNnCEVbni9yUEkaHbgptDACOPW04jimGagZ51E6+lXUwJjgnBw+hyko/lkFWCldqw==", - "dev": true, "license": "MIT", "dependencies": { "undici-types": "~6.21.0" @@ -1719,6 +1720,15 @@ "integrity": "sha512-6WaYesThRMCl19iryMYP7/x2OVgCtbIVflDGFpWnb9irXI3UjYE4AzmYuiUKY1AJstGijoY+MgUszMgRxIYTYw==", "license": "MIT" }, + "node_modules/@types/ws": { + "version": "8.18.1", + "resolved": "https://registry.npmjs.org/@types/ws/-/ws-8.18.1.tgz", + "integrity": "sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==", + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/yargs": { "version": "17.0.35", "resolved": "https://registry.npmjs.org/@types/yargs/-/yargs-17.0.35.tgz", @@ -7337,7 +7347,6 @@ "version": "6.21.0", "resolved": "https://registry.npmjs.org/undici-types/-/undici-types-6.21.0.tgz", "integrity": "sha512-iwDZqg0QAGrg9Rav5H4n0M64c3mkR59cJ6wQp+7C4nI0gsmExaedaYLNO44eT4AtBBwjbTiGPMlt2Md0T9H9JQ==", - "dev": true, "license": "MIT" }, "node_modules/unpipe": { @@ -7590,6 +7599,27 @@ "node": "^12.13.0 || ^14.15.0 || >=16.0.0" } }, + "node_modules/ws": { + "version": "8.20.0", + "resolved": "https://registry.npmjs.org/ws/-/ws-8.20.0.tgz", + "integrity": "sha512-sAt8BhgNbzCtgGbt2OxmpuryO63ZoDk/sqaB/znQm94T4fCEsy/yV+7CdC1kJhOU9lboAEU7R3kquuycDoibVA==", + "license": "MIT", + "engines": { + "node": ">=10.0.0" + }, + "peerDependencies": { + "bufferutil": "^4.0.1", + "utf-8-validate": ">=5.0.2" + }, + "peerDependenciesMeta": { + "bufferutil": { + "optional": true + }, + "utf-8-validate": { + "optional": true + } + } + }, "node_modules/xtend": { "version": "4.0.2", "resolved": "https://registry.npmjs.org/xtend/-/xtend-4.0.2.tgz", diff --git a/api/package.json b/api/package.json index 758de24b..3d87a465 100644 --- a/api/package.json +++ b/api/package.json @@ -24,6 +24,7 @@ "license": "MIT", "dependencies": { "@stellar/stellar-sdk": "^14.5.0", + "@types/ws": "^8.18.1", "axios": "^1.6.2", "bcryptjs": "^2.4.3", "cors": "^2.8.5", @@ -33,7 +34,8 @@ "express-validator": "^7.0.1", "helmet": "^7.1.0", "jsonwebtoken": "^9.0.2", - "winston": "^3.11.0" + "winston": "^3.11.0", + "ws": "^8.20.0" }, "devDependencies": { "@types/bcryptjs": "^2.4.6", diff --git a/api/src/config/index.ts b/api/src/config/index.ts index 85ed31ea..6fe6c0f5 100644 --- a/api/src/config/index.ts +++ b/api/src/config/index.ts @@ -40,4 +40,12 @@ export const config = { retryInitialDelayMs: parseInt(process.env.RETRY_INITIAL_DELAY_MS || '1000', 10), retryMaxDelayMs: parseInt(process.env.RETRY_MAX_DELAY_MS || '10000', 10), }, + ws: { + /** Milliseconds between price poll cycles (default 30 s) */ + priceUpdateIntervalMs: parseInt(process.env.WS_PRICE_UPDATE_INTERVAL_MS || '30000', 10), + /** Milliseconds between heartbeat pings (default 30 s) */ + heartbeatIntervalMs: parseInt(process.env.WS_HEARTBEAT_INTERVAL_MS || '30000', 10), + /** Optional oracle service base URL for price data (e.g. http://localhost:4000) */ + oracleApiUrl: process.env.ORACLE_API_URL || '', + }, }; diff --git a/api/src/index.ts b/api/src/index.ts index 7fcc978f..88cf36df 100644 --- a/api/src/index.ts +++ b/api/src/index.ts @@ -1,13 +1,21 @@ +import { createServer } from 'http'; import app from './app'; import { config } from './config'; import logger from './utils/logger'; +import { createPriceWebSocket } from './ws/priceWebSocket'; const PORT = config.server.port; -app.listen(PORT, () => { +const server = createServer(app); + +// Attach WebSocket price server to the same HTTP server +createPriceWebSocket(server); + +server.listen(PORT, () => { logger.info(`StellarLend API server running on port ${PORT}`); logger.info(`Environment: ${config.server.env}`); logger.info(`Network: ${config.stellar.network}`); + logger.info(`WebSocket price feed: ws://localhost:${PORT}/api/ws/prices`); }); process.on('unhandledRejection', (reason, promise) => { diff --git a/api/src/types/index.ts b/api/src/types/index.ts index f7063248..1ef94e44 100644 --- a/api/src/types/index.ts +++ b/api/src/types/index.ts @@ -73,3 +73,35 @@ export enum TransactionStatus { FAILED = 'failed', NOT_FOUND = 'not_found', } + +// ─── WebSocket Types ─────────────────────────────────────────────────────────── + +export interface PriceData { + asset: string; + price: number; + timestamp: number; +} + +export interface WsSubscribeMessage { + type: 'subscribe'; + /** Asset symbols to subscribe to, e.g. ["XLM","BTC"] or ["*"] for all */ + assets: string[]; +} + +export interface WsUnsubscribeMessage { + type: 'unsubscribe'; + assets: string[]; +} + +export interface WsPingMessage { + type: 'ping'; +} + +export type ClientMessage = WsSubscribeMessage | WsUnsubscribeMessage | WsPingMessage; + +export type ServerMessage = + | { type: 'price_update'; asset: string; price: number; timestamp: number } + | { type: 'subscribed'; assets: string[] } + | { type: 'unsubscribed'; assets: string[] } + | { type: 'pong' } + | { type: 'error'; message: string }; diff --git a/api/src/ws/priceWebSocket.ts b/api/src/ws/priceWebSocket.ts new file mode 100644 index 00000000..6973648b --- /dev/null +++ b/api/src/ws/priceWebSocket.ts @@ -0,0 +1,229 @@ +import { WebSocket, WebSocketServer } from 'ws'; +import { IncomingMessage } from 'http'; +import { Server } from 'http'; +import axios from 'axios'; +import logger from '../utils/logger'; +import { config } from '../config'; +import { + PriceData, + ClientMessage, + ServerMessage, + WsSubscribeMessage, + WsUnsubscribeMessage, +} from '../types'; + +const SUPPORTED_ASSETS = ['XLM', 'USDC', 'BTC', 'ETH', 'SOL']; + +const COINGECKO_IDS: Record = { + XLM: 'stellar', + USDC: 'usd-coin', + BTC: 'bitcoin', + ETH: 'ethereum', + SOL: 'solana', +}; + +export class PriceWebSocketServer { + private wss: WebSocketServer; + private clientSubscriptions: Map> = new Map(); + private lastPrices: Map = new Map(); + private pollIntervalId?: ReturnType; + private heartbeatIntervalId?: ReturnType; + + constructor(server: Server) { + this.wss = new WebSocketServer({ server, path: '/api/ws/prices' }); + this.setupConnectionHandler(); + this.startPricePolling(); + this.startHeartbeat(); + logger.info('WebSocket price server initialised at /api/ws/prices'); + } + + private setupConnectionHandler(): void { + this.wss.on('connection', (ws: WebSocket, req: IncomingMessage) => { + logger.info('WebSocket client connected', { ip: req.socket.remoteAddress }); + this.clientSubscriptions.set(ws, new Set()); + + ws.on('message', (data) => { + try { + const msg: ClientMessage = JSON.parse(data.toString()); + this.handleClientMessage(ws, msg); + } catch { + this.send(ws, { type: 'error', message: 'Invalid JSON message' }); + } + }); + + ws.on('close', () => { + this.clientSubscriptions.delete(ws); + logger.info('WebSocket client disconnected'); + }); + + ws.on('error', (err) => { + logger.error('WebSocket client error', { error: err.message }); + this.clientSubscriptions.delete(ws); + }); + }); + } + + private handleClientMessage(ws: WebSocket, msg: ClientMessage): void { + switch (msg.type) { + case 'subscribe': { + const subs = this.clientSubscriptions.get(ws); + if (!subs) return; + + const requested = (msg as WsSubscribeMessage).assets; + const toSubscribe = requested.includes('*') + ? SUPPORTED_ASSETS + : requested.map((a) => a.toUpperCase()).filter((a) => SUPPORTED_ASSETS.includes(a)); + + toSubscribe.forEach((a) => subs.add(a)); + this.send(ws, { type: 'subscribed', assets: toSubscribe }); + + // Send cached prices immediately + toSubscribe.forEach((asset) => { + const cached = this.lastPrices.get(asset); + if (cached) { + this.send(ws, { + type: 'price_update', + asset: cached.asset, + price: cached.price, + timestamp: cached.timestamp, + }); + } + }); + break; + } + + case 'unsubscribe': { + const subs = this.clientSubscriptions.get(ws); + if (!subs) return; + const assets = (msg as WsUnsubscribeMessage).assets.map((a) => a.toUpperCase()); + assets.forEach((a) => subs.delete(a)); + this.send(ws, { type: 'unsubscribed', assets }); + break; + } + + case 'ping': + this.send(ws, { type: 'pong' }); + break; + + default: + this.send(ws, { type: 'error', message: 'Unknown message type' }); + } + } + + private send(ws: WebSocket, msg: ServerMessage): void { + if (ws.readyState === WebSocket.OPEN) { + ws.send(JSON.stringify(msg)); + } + } + + async fetchPrices(): Promise> { + const prices = new Map(); + + // Prefer oracle API URL if configured + const oracleUrl = config.ws.oracleApiUrl; + if (oracleUrl) { + try { + const response = await axios.get>(`${oracleUrl}/prices`, { + timeout: 5000, + }); + Object.entries(response.data).forEach(([asset, price]) => + prices.set(asset.toUpperCase(), price) + ); + if (prices.size > 0) return prices; + } catch (err) { + logger.warn('Oracle API price fetch failed, falling back to CoinGecko', { err }); + } + } + + // Fallback: CoinGecko public API + try { + const ids = SUPPORTED_ASSETS.map((a) => COINGECKO_IDS[a]).join(','); + const response = await axios.get>>( + `https://api.coingecko.com/api/v3/simple/price?ids=${ids}&vs_currencies=usd`, + { timeout: 8000 } + ); + + SUPPORTED_ASSETS.forEach((asset) => { + const id = COINGECKO_IDS[asset]; + const price = response.data[id]?.usd; + if (price !== undefined) { + prices.set(asset, price); + } + }); + } catch (err) { + logger.error('CoinGecko price fetch failed', { err }); + } + + return prices; + } + + async pollAndBroadcast(): Promise { + const prices = await this.fetchPrices(); + const now = Math.floor(Date.now() / 1000); + + prices.forEach((price, asset) => { + const last = this.lastPrices.get(asset); + const changed = !last || last.price !== price; + + const update: PriceData = { asset, price, timestamp: now }; + this.lastPrices.set(asset, update); + + if (changed) { + this.broadcastPriceUpdate(asset, update); + } + }); + } + + private broadcastPriceUpdate(asset: string, data: PriceData): void { + const msg: ServerMessage = { + type: 'price_update', + asset: data.asset, + price: data.price, + timestamp: data.timestamp, + }; + + this.clientSubscriptions.forEach((subs, ws) => { + if (subs.has(asset)) { + this.send(ws, msg); + } + }); + } + + private startPricePolling(): void { + this.pollAndBroadcast().catch((err) => logger.error('Initial price poll failed', { err })); + + this.pollIntervalId = setInterval(() => { + this.pollAndBroadcast().catch((err) => logger.error('Price poll cycle failed', { err })); + }, config.ws.priceUpdateIntervalMs); + } + + private startHeartbeat(): void { + this.heartbeatIntervalId = setInterval(() => { + this.wss.clients.forEach((ws) => { + if (ws.readyState === WebSocket.OPEN) { + ws.ping(); + } else { + this.clientSubscriptions.delete(ws); + } + }); + }, config.ws.heartbeatIntervalMs); + } + + close(): void { + if (this.pollIntervalId) clearInterval(this.pollIntervalId); + if (this.heartbeatIntervalId) clearInterval(this.heartbeatIntervalId); + this.wss.close(); + } + + get clientCount(): number { + return this.wss.clients.size; + } + + get supportedAssets(): string[] { + return [...SUPPORTED_ASSETS]; + } +} + +export function createPriceWebSocket(server: Server): PriceWebSocketServer { + return new PriceWebSocketServer(server); +} diff --git a/oracle/src/config.ts b/oracle/src/config.ts index d0dd27c7..c69bcd47 100644 --- a/oracle/src/config.ts +++ b/oracle/src/config.ts @@ -184,7 +184,9 @@ export function maskSecret(key: string): string { * Returns a safe (redacted) version of the config for logging. * Strips adminSecretKey entirely. */ -export function getSafeConfig(config: OracleServiceConfig): Omit & { adminSecretKey: string } { +export function getSafeConfig( + config: OracleServiceConfig +): Omit & { adminSecretKey: string } { return { ...config, adminSecretKey: maskSecret(config.adminSecretKey),