Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 33 additions & 3 deletions api/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion api/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
"license": "MIT",
"dependencies": {
"@stellar/stellar-sdk": "^14.5.0",
"@types/ws": "^8.18.1",
"axios": "^1.6.2",
"bcryptjs": "^2.4.3",
"cors": "^2.8.5",
Expand All @@ -33,7 +34,8 @@
"express-validator": "^7.0.1",
"helmet": "^7.1.0",
"jsonwebtoken": "^9.0.2",
"winston": "^3.11.0"
"winston": "^3.11.0",
"ws": "^8.20.0"
},
"devDependencies": {
"@types/bcryptjs": "^2.4.6",
Expand Down
8 changes: 8 additions & 0 deletions api/src/config/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,12 @@ export const config = {
retryInitialDelayMs: parseInt(process.env.RETRY_INITIAL_DELAY_MS || '1000', 10),
retryMaxDelayMs: parseInt(process.env.RETRY_MAX_DELAY_MS || '10000', 10),
},
ws: {
/** Milliseconds between price poll cycles (default 30 s) */
priceUpdateIntervalMs: parseInt(process.env.WS_PRICE_UPDATE_INTERVAL_MS || '30000', 10),
/** Milliseconds between heartbeat pings (default 30 s) */
heartbeatIntervalMs: parseInt(process.env.WS_HEARTBEAT_INTERVAL_MS || '30000', 10),
/** Optional oracle service base URL for price data (e.g. http://localhost:4000) */
oracleApiUrl: process.env.ORACLE_API_URL || '',
},
};
10 changes: 9 additions & 1 deletion api/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
import { createServer } from 'http';
import app from './app';
import { config } from './config';
import logger from './utils/logger';
import { createPriceWebSocket } from './ws/priceWebSocket';

const PORT = config.server.port;

app.listen(PORT, () => {
const server = createServer(app);

// Attach WebSocket price server to the same HTTP server
createPriceWebSocket(server);

server.listen(PORT, () => {
logger.info(`StellarLend API server running on port ${PORT}`);
logger.info(`Environment: ${config.server.env}`);
logger.info(`Network: ${config.stellar.network}`);
logger.info(`WebSocket price feed: ws://localhost:${PORT}/api/ws/prices`);
});

process.on('unhandledRejection', (reason, promise) => {
Expand Down
32 changes: 32 additions & 0 deletions api/src/types/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,3 +73,35 @@ export enum TransactionStatus {
FAILED = 'failed',
NOT_FOUND = 'not_found',
}

// ─── WebSocket Types ───────────────────────────────────────────────────────────

export interface PriceData {
asset: string;
price: number;
timestamp: number;
}

export interface WsSubscribeMessage {
type: 'subscribe';
/** Asset symbols to subscribe to, e.g. ["XLM","BTC"] or ["*"] for all */
assets: string[];
}

export interface WsUnsubscribeMessage {
type: 'unsubscribe';
assets: string[];
}

export interface WsPingMessage {
type: 'ping';
}

export type ClientMessage = WsSubscribeMessage | WsUnsubscribeMessage | WsPingMessage;

export type ServerMessage =
| { type: 'price_update'; asset: string; price: number; timestamp: number }
| { type: 'subscribed'; assets: string[] }
| { type: 'unsubscribed'; assets: string[] }
| { type: 'pong' }
| { type: 'error'; message: string };
229 changes: 229 additions & 0 deletions api/src/ws/priceWebSocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import { WebSocket, WebSocketServer } from 'ws';
import { IncomingMessage } from 'http';
import { Server } from 'http';
import axios from 'axios';
import logger from '../utils/logger';
import { config } from '../config';
import {
PriceData,
ClientMessage,
ServerMessage,
WsSubscribeMessage,
WsUnsubscribeMessage,
} from '../types';

const SUPPORTED_ASSETS = ['XLM', 'USDC', 'BTC', 'ETH', 'SOL'];

const COINGECKO_IDS: Record<string, string> = {
XLM: 'stellar',
USDC: 'usd-coin',
BTC: 'bitcoin',
ETH: 'ethereum',
SOL: 'solana',
};

export class PriceWebSocketServer {
private wss: WebSocketServer;
private clientSubscriptions: Map<WebSocket, Set<string>> = new Map();
private lastPrices: Map<string, PriceData> = new Map();
private pollIntervalId?: ReturnType<typeof setInterval>;
private heartbeatIntervalId?: ReturnType<typeof setInterval>;

constructor(server: Server) {
this.wss = new WebSocketServer({ server, path: '/api/ws/prices' });
this.setupConnectionHandler();
this.startPricePolling();
this.startHeartbeat();
logger.info('WebSocket price server initialised at /api/ws/prices');
}

private setupConnectionHandler(): void {
this.wss.on('connection', (ws: WebSocket, req: IncomingMessage) => {
logger.info('WebSocket client connected', { ip: req.socket.remoteAddress });
this.clientSubscriptions.set(ws, new Set());

ws.on('message', (data) => {
try {
const msg: ClientMessage = JSON.parse(data.toString());
this.handleClientMessage(ws, msg);
} catch {
this.send(ws, { type: 'error', message: 'Invalid JSON message' });
}
});

ws.on('close', () => {
this.clientSubscriptions.delete(ws);
logger.info('WebSocket client disconnected');
});

ws.on('error', (err) => {
logger.error('WebSocket client error', { error: err.message });
this.clientSubscriptions.delete(ws);
});
});
}

private handleClientMessage(ws: WebSocket, msg: ClientMessage): void {
switch (msg.type) {
case 'subscribe': {
const subs = this.clientSubscriptions.get(ws);
if (!subs) return;

const requested = (msg as WsSubscribeMessage).assets;
const toSubscribe = requested.includes('*')
? SUPPORTED_ASSETS
: requested.map((a) => a.toUpperCase()).filter((a) => SUPPORTED_ASSETS.includes(a));

toSubscribe.forEach((a) => subs.add(a));
this.send(ws, { type: 'subscribed', assets: toSubscribe });

// Send cached prices immediately
toSubscribe.forEach((asset) => {
const cached = this.lastPrices.get(asset);
if (cached) {
this.send(ws, {
type: 'price_update',
asset: cached.asset,
price: cached.price,
timestamp: cached.timestamp,
});
}
});
break;
}

case 'unsubscribe': {
const subs = this.clientSubscriptions.get(ws);
if (!subs) return;
const assets = (msg as WsUnsubscribeMessage).assets.map((a) => a.toUpperCase());
assets.forEach((a) => subs.delete(a));
this.send(ws, { type: 'unsubscribed', assets });
break;
}

case 'ping':
this.send(ws, { type: 'pong' });
break;

default:
this.send(ws, { type: 'error', message: 'Unknown message type' });
}
}

private send(ws: WebSocket, msg: ServerMessage): void {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify(msg));
}
}

async fetchPrices(): Promise<Map<string, number>> {
const prices = new Map<string, number>();

// Prefer oracle API URL if configured
const oracleUrl = config.ws.oracleApiUrl;
if (oracleUrl) {
try {
const response = await axios.get<Record<string, number>>(`${oracleUrl}/prices`, {
timeout: 5000,
});
Object.entries(response.data).forEach(([asset, price]) =>
prices.set(asset.toUpperCase(), price)
);
if (prices.size > 0) return prices;
} catch (err) {
logger.warn('Oracle API price fetch failed, falling back to CoinGecko', { err });
}
}

// Fallback: CoinGecko public API
try {
const ids = SUPPORTED_ASSETS.map((a) => COINGECKO_IDS[a]).join(',');
const response = await axios.get<Record<string, Record<string, number>>>(
`https://api.coingecko.com/api/v3/simple/price?ids=${ids}&vs_currencies=usd`,
{ timeout: 8000 }
);

SUPPORTED_ASSETS.forEach((asset) => {
const id = COINGECKO_IDS[asset];
const price = response.data[id]?.usd;
if (price !== undefined) {
prices.set(asset, price);
}
});
} catch (err) {
logger.error('CoinGecko price fetch failed', { err });
}

return prices;
}

async pollAndBroadcast(): Promise<void> {
const prices = await this.fetchPrices();
const now = Math.floor(Date.now() / 1000);

prices.forEach((price, asset) => {
const last = this.lastPrices.get(asset);
const changed = !last || last.price !== price;

const update: PriceData = { asset, price, timestamp: now };
this.lastPrices.set(asset, update);

if (changed) {
this.broadcastPriceUpdate(asset, update);
}
});
}

private broadcastPriceUpdate(asset: string, data: PriceData): void {
const msg: ServerMessage = {
type: 'price_update',
asset: data.asset,
price: data.price,
timestamp: data.timestamp,
};

this.clientSubscriptions.forEach((subs, ws) => {
if (subs.has(asset)) {
this.send(ws, msg);
}
});
}

private startPricePolling(): void {
this.pollAndBroadcast().catch((err) => logger.error('Initial price poll failed', { err }));

this.pollIntervalId = setInterval(() => {
this.pollAndBroadcast().catch((err) => logger.error('Price poll cycle failed', { err }));
}, config.ws.priceUpdateIntervalMs);
}

private startHeartbeat(): void {
this.heartbeatIntervalId = setInterval(() => {
this.wss.clients.forEach((ws) => {
if (ws.readyState === WebSocket.OPEN) {
ws.ping();
} else {
this.clientSubscriptions.delete(ws);
}
});
}, config.ws.heartbeatIntervalMs);
}

close(): void {
if (this.pollIntervalId) clearInterval(this.pollIntervalId);
if (this.heartbeatIntervalId) clearInterval(this.heartbeatIntervalId);
this.wss.close();
}

get clientCount(): number {
return this.wss.clients.size;
}

get supportedAssets(): string[] {
return [...SUPPORTED_ASSETS];
}
}

export function createPriceWebSocket(server: Server): PriceWebSocketServer {
return new PriceWebSocketServer(server);
}
Loading
Loading