Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
85 changes: 48 additions & 37 deletions backend/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,54 @@
import dotenv from 'dotenv';
import app from './app.js';
import logger from './logger.js';
import { startWorkers, stopWorkers } from './workers/index.js';
import dotenv from "dotenv";
import app from "./app.js";
import logger from "./logger.js";
import { sorobanIndexerService } from "./services/soroban-indexer.service.js";
import { startWorkers, stopWorkers } from "./workers/index.js";

dotenv.config();

const startServer = async () => {
try {
// Validate database connectivity
const { prisma } = await import('./lib/prisma.js');
await prisma.$connect();
await prisma.$queryRaw`SELECT 1`;
logger.info('Database connection established successfully');

const port = process.env.PORT || 3001;
const server = app.listen(port, () => {
logger.info(`Server started on port ${port}`);
logger.info(`API Documentation available at http://localhost:${port}/api-docs`);
});

// Start background workers after the HTTP server is up.
await startWorkers();

// Graceful shutdown: stop workers before closing the HTTP server.
const shutdown = (signal: string) => {
logger.info(`Received ${signal}. Shutting down gracefully...`);
stopWorkers();
server.close(() => {
logger.info('HTTP server closed.');
process.exit(0);
});
};

process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
} catch (error) {
logger.error('Failed to start server due to database connection error:', error);
process.exit(1);
}
try {
// Validate database connectivity
const { prisma } = await import("./lib/prisma.js");
await prisma.$connect();
await prisma.$queryRaw`SELECT 1`;
logger.info("Database connection established successfully");

const port = process.env.PORT || 3001;
const server = app.listen(port, () => {
logger.info(`Server started on port ${port}`);
logger.info(
`API Documentation available at http://localhost:${port}/api-docs`,
);
});

// Start Soroban indexer + background workers after the HTTP server is up.
sorobanIndexerService.start();
await startWorkers();

// Graceful shutdown: stop workers (and indexer) before closing the HTTP server.
const shutdown = (signal) => {
logger.info(`Received ${signal}. Shutting down gracefully...`);
try {
// Prefer a stop() if your service exposes it; otherwise remove this.
sorobanIndexerService.stop?.();
} catch (err) {
logger.warn("Error while stopping soroban indexer:", err);
}

stopWorkers();
server.close(() => {
logger.info("HTTP server closed.");
process.exit(0);
});
};

process.on("SIGTERM", () => shutdown("SIGTERM"));
process.on("SIGINT", () => shutdown("SIGINT"));
} catch (error) {
logger.error("Failed to start server due to database connection error:", error);
process.exit(1);
}
};

startServer();
startServer();
241 changes: 241 additions & 0 deletions backend/src/services/soroban-indexer.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
import { prisma } from '../lib/prisma.js';
import logger from '../logger.js';

type JsonRecord = Record<string, unknown>;

interface RpcEvent {
id?: string;
ledger?: number;
ledgerSequence?: number;
txHash?: string;
topic?: unknown[];
value?: unknown;
contractId?: string;
}

interface RpcResponse {
result?: {
events?: RpcEvent[];
};
error?: {
message?: string;
};
}

type IndexedEventType = 'CREATED' | 'CANCELLED' | 'WITHDRAWN';

const RPC_URL = process.env.SOROBAN_RPC_URL ?? 'https://soroban-testnet.stellar.org';
const POLL_MS = Number(process.env.SOROBAN_INDEXER_POLL_MS ?? 15000);
const START_LEDGER = Number(process.env.SOROBAN_INDEXER_START_LEDGER ?? 0);
const STREAM_CONTRACT_ID = process.env.STREAM_CONTRACT_ID ?? '';

export class SorobanIndexerService {
private timer: NodeJS.Timeout | null = null;
private running = false;
private lastLedger = START_LEDGER;

start() {
if (this.running) return;
this.running = true;

void this.poll();
this.timer = setInterval(() => {
void this.poll();
}, POLL_MS);

logger.info(`Soroban indexer started (poll=${POLL_MS}ms, startLedger=${this.lastLedger})`);
}

stop() {
if (this.timer) clearInterval(this.timer);
this.timer = null;
this.running = false;
}

private async poll() {
if (!STREAM_CONTRACT_ID) return;

try {
const events = await this.fetchEvents(this.lastLedger + 1);
if (events.length === 0) return;

let maxLedger = this.lastLedger;
for (const event of events) {
const ledger = Number(event.ledgerSequence ?? event.ledger ?? 0);
if (ledger > maxLedger) maxLedger = ledger;
await this.indexEvent(event, ledger);
}

this.lastLedger = maxLedger;
} catch (error) {
logger.error('Soroban indexer poll failed', error);
}
}

private async fetchEvents(startLedger: number): Promise<RpcEvent[]> {
const body = {
jsonrpc: '2.0',
id: 1,
method: 'getEvents',
params: {
startLedger,
filters: [{ type: 'contract', contractIds: [STREAM_CONTRACT_ID] }],
pagination: { limit: 100 },
},
};

const response = await fetch(RPC_URL, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(body),
});

if (!response.ok) {
throw new Error(`getEvents failed: ${response.status}`);
}

const payload = (await response.json()) as RpcResponse;
if (payload.error?.message) throw new Error(payload.error.message);
return payload.result?.events ?? [];
}

private asRecord(value: unknown): JsonRecord | null {
if (!value || typeof value !== 'object' || Array.isArray(value)) return null;
return value as JsonRecord;
}

private parseEventType(event: RpcEvent): IndexedEventType | null {
const firstTopic = Array.isArray(event.topic) && event.topic.length > 0
? String(event.topic[0]).toLowerCase()
: '';

if (firstTopic.includes('stream_created')) return 'CREATED';
if (firstTopic.includes('stream_cancelled')) return 'CANCELLED';
if (firstTopic.includes('tokens_withdrawn')) return 'WITHDRAWN';
return null;
}

private parseStreamId(record: JsonRecord): number | null {
const raw = record.stream_id ?? record.streamId;
if (typeof raw === 'number' && Number.isInteger(raw)) return raw;
if (typeof raw === 'string' && raw.trim()) {
const parsed = Number(raw);
if (Number.isInteger(parsed)) return parsed;
}
return null;
}

private readString(record: JsonRecord, ...keys: string[]): string | null {
for (const key of keys) {
const value = record[key];
if (typeof value === 'string' && value.trim()) return value;
}
return null;
}

private async ensureUser(publicKey: string) {
await prisma.user.upsert({
where: { publicKey },
update: {},
create: { publicKey },
});
}

private async indexEvent(event: RpcEvent, ledgerSequence: number) {
const eventType = this.parseEventType(event);
if (!eventType) return;

const value = this.asRecord(event.value);
if (!value) return;

const streamId = this.parseStreamId(value);
if (!streamId) return;

const txHash = event.txHash ?? event.id ?? `event-${streamId}-${ledgerSequence}-${eventType}`;
const timestamp = Math.floor(Date.now() / 1000);

const existing = await prisma.streamEvent.findFirst({
where: {
streamId,
eventType,
transactionHash: txHash,
ledgerSequence,
},
select: { id: true },
});
if (existing) return;

if (eventType === 'CREATED') {
const sender = this.readString(value, 'sender');
const recipient = this.readString(value, 'recipient');
const tokenAddress = this.readString(value, 'token_address', 'tokenAddress');
const ratePerSecond = this.readString(value, 'rate_per_second', 'ratePerSecond');
const depositedAmount = this.readString(value, 'deposited_amount', 'depositedAmount');
const startTimeRaw = value.start_time ?? value.startTime ?? timestamp;
const startTime = Number(startTimeRaw);

if (!sender || !recipient || !tokenAddress || !ratePerSecond || !depositedAmount) return;

await this.ensureUser(sender);
await this.ensureUser(recipient);

await prisma.stream.upsert({
where: { streamId },
update: {
sender,
recipient,
tokenAddress,
ratePerSecond,
depositedAmount,
lastUpdateTime: Number.isFinite(startTime) ? startTime : timestamp,
isActive: true,
},
create: {
streamId,
sender,
recipient,
tokenAddress,
ratePerSecond,
depositedAmount,
withdrawnAmount: '0',
startTime: Number.isFinite(startTime) ? startTime : timestamp,
lastUpdateTime: Number.isFinite(startTime) ? startTime : timestamp,
isActive: true,
},
});
} else if (eventType === 'CANCELLED') {
await prisma.stream.updateMany({
where: { streamId },
data: { isActive: false, lastUpdateTime: timestamp },
});
} else if (eventType === 'WITHDRAWN') {
const stream = await prisma.stream.findUnique({ where: { streamId } });
if (stream) {
const amount = this.readString(value, 'amount') ?? '0';
const nextWithdrawn = (BigInt(stream.withdrawnAmount) + BigInt(amount)).toString();
await prisma.stream.update({
where: { streamId },
data: {
withdrawnAmount: nextWithdrawn,
lastUpdateTime: timestamp,
isActive: BigInt(nextWithdrawn) < BigInt(stream.depositedAmount),
},
});
}
}

await prisma.streamEvent.create({
data: {
streamId,
eventType,
amount: this.readString(value, 'amount'),
transactionHash: txHash,
ledgerSequence,
timestamp,
metadata: JSON.stringify({ topic: event.topic, value: event.value }),
},
});
}
}

export const sorobanIndexerService = new SorobanIndexerService();
Loading