From a39c45e1754a7c8c3c4522022e44473134320c60 Mon Sep 17 00:00:00 2001 From: Godsmiracle001 <118224169+Godsmiracle001@users.noreply.github.com> Date: Thu, 26 Feb 2026 11:44:20 +0100 Subject: [PATCH] feat(backend): add cached claimable amount endpoint --- backend/src/controllers/stream.controller.ts | 76 ++++++++- backend/src/routes/v1/stream.routes.ts | 70 ++++++++- backend/src/services/claimable.service.ts | 153 +++++++++++++++++++ backend/tests/claimable.service.test.ts | 136 +++++++++++++++++ 4 files changed, 430 insertions(+), 5 deletions(-) create mode 100644 backend/src/services/claimable.service.ts create mode 100644 backend/tests/claimable.service.test.ts diff --git a/backend/src/controllers/stream.controller.ts b/backend/src/controllers/stream.controller.ts index 632b646..68f23b7 100644 --- a/backend/src/controllers/stream.controller.ts +++ b/backend/src/controllers/stream.controller.ts @@ -1,6 +1,7 @@ import type { Request, Response } from 'express'; import { prisma } from '../lib/prisma.js'; import logger from '../logger.js'; +import { claimableAmountService } from '../services/claimable.service.js'; /** * Create a new stream (stub for on-chain indexing) @@ -70,10 +71,17 @@ export const listStreams = async (req: Request, res: Response) => { */ export const getStream = async (req: Request, res: Response) => { try { - const { streamId } = req.params; + const streamIdParam = Array.isArray(req.params.streamId) + ? req.params.streamId[0] + : req.params.streamId; + const parsedStreamId = Number.parseInt(streamIdParam ?? '', 10); + + if (!Number.isFinite(parsedStreamId)) { + return res.status(400).json({ error: 'Invalid streamId parameter' }); + } const stream = await prisma.stream.findUnique({ - where: { streamId: parseInt(streamId) }, + where: { streamId: parsedStreamId }, include: { senderUser: true, recipientUser: true, @@ -99,10 +107,17 @@ export const getStream = async (req: Request, res: Response) => { */ export const getStreamEvents = async (req: Request, res: Response) => { try { - const { streamId } = req.params; + const streamIdParam = Array.isArray(req.params.streamId) + ? req.params.streamId[0] + : req.params.streamId; + const parsedStreamId = Number.parseInt(streamIdParam ?? '', 10); + + if (!Number.isFinite(parsedStreamId)) { + return res.status(400).json({ error: 'Invalid streamId parameter' }); + } const events = await prisma.streamEvent.findMany({ - where: { streamId: parseInt(streamId) }, + where: { streamId: parsedStreamId }, orderBy: { timestamp: 'desc' } }); @@ -112,3 +127,56 @@ export const getStreamEvents = async (req: Request, res: Response) => { return res.status(500).json({ error: 'Internal server error' }); } }; + +/** + * Get actionable claimable amount for a stream (no direct RPC call). + */ +export const getStreamClaimableAmount = async (req: Request, res: Response) => { + try { + const streamIdParam = Array.isArray(req.params.streamId) + ? req.params.streamId[0] + : req.params.streamId; + const parsedStreamId = Number.parseInt(streamIdParam ?? '', 10); + + if (!Number.isFinite(parsedStreamId)) { + return res.status(400).json({ error: 'Invalid streamId parameter' }); + } + + const atQuery = req.query.at as string | undefined; + let requestedAt: number | undefined; + + if (atQuery !== undefined) { + requestedAt = Number.parseInt(atQuery, 10); + if (!Number.isFinite(requestedAt) || requestedAt < 0) { + return res.status(400).json({ + error: 'Invalid query parameter', + message: "'at' must be a non-negative Unix timestamp in seconds", + }); + } + } + + const stream = await prisma.stream.findUnique({ + where: { streamId: parsedStreamId }, + select: { + streamId: true, + ratePerSecond: true, + depositedAmount: true, + withdrawnAmount: true, + lastUpdateTime: true, + isActive: true, + updatedAt: true, + }, + }); + + if (!stream) { + return res.status(404).json({ error: 'Stream not found' }); + } + + const result = claimableAmountService.getClaimableAmount(stream, requestedAt); + + return res.status(200).json(result); + } catch (error) { + logger.error('Error calculating stream claimable amount:', error); + return res.status(500).json({ error: 'Internal server error' }); + } +}; diff --git a/backend/src/routes/v1/stream.routes.ts b/backend/src/routes/v1/stream.routes.ts index 02a564f..c670816 100644 --- a/backend/src/routes/v1/stream.routes.ts +++ b/backend/src/routes/v1/stream.routes.ts @@ -1,5 +1,11 @@ import { Router } from 'express'; -import { createStream, listStreams, getStream, getStreamEvents } from '../../controllers/stream.controller.js'; +import { + createStream, + listStreams, + getStream, + getStreamEvents, + getStreamClaimableAmount, +} from '../../controllers/stream.controller.js'; const router = Router(); @@ -167,4 +173,66 @@ router.get('/:streamId', getStream); */ router.get('/:streamId/events', getStreamEvents); +/** + * @openapi + * /v1/streams/{streamId}/claimable: + * get: + * tags: + * - Streams + * summary: Get actionable claimable amount for a stream + * description: | + * Returns the exact actionable amount currently withdrawable from a stream, + * using indexed stream state in PostgreSQL and overflow-safe logic equivalent + * to the Soroban contract's `calculate_claimable` function. + * + * **Performance:** + * - Uses an in-memory cache for hot reads + * - Does not call Soroban RPC for this computation + * parameters: + * - in: path + * name: streamId + * required: true + * schema: + * type: integer + * description: On-chain stream ID + * - in: query + * name: at + * required: false + * schema: + * type: integer + * description: Optional Unix timestamp in seconds used for deterministic calculation + * responses: + * 200: + * description: Claimable amount calculated successfully + * content: + * application/json: + * schema: + * type: object + * properties: + * streamId: + * type: integer + * example: 1 + * claimableAmount: + * type: string + * description: Actionable amount currently withdrawable (i128 as string) + * example: "1500" + * actionable: + * type: boolean + * description: Whether a withdrawal is currently actionable + * example: true + * calculatedAt: + * type: integer + * description: Unix timestamp (seconds) used for calculation + * example: 1708534800 + * cached: + * type: boolean + * description: Whether response was served from cache + * example: false + * 400: + * description: Invalid streamId or query parameter + * 404: + * description: Stream not found + */ +router.get('/:streamId/claimable', getStreamClaimableAmount); + export default router; diff --git a/backend/src/services/claimable.service.ts b/backend/src/services/claimable.service.ts new file mode 100644 index 0000000..705ef67 --- /dev/null +++ b/backend/src/services/claimable.service.ts @@ -0,0 +1,153 @@ +const I128_MAX = (1n << 127n) - 1n; +const I128_MIN = -(1n << 127n); + +export interface ClaimableStreamState { + streamId: number; + ratePerSecond: string; + depositedAmount: string; + withdrawnAmount: string; + lastUpdateTime: number; + isActive: boolean; + updatedAt?: Date; +} + +export interface ClaimableAmountResult { + streamId: number; + claimableAmount: string; + actionable: boolean; + calculatedAt: number; + cached: boolean; +} + +interface ClaimableCacheEntry { + value: Omit; + expiresAtMs: number; +} + +interface ClaimableServiceOptions { + cacheTtlMs?: number; + nowMs?: () => number; +} + +function clampI128(value: bigint): bigint { + if (value > I128_MAX) return I128_MAX; + if (value < I128_MIN) return I128_MIN; + return value; +} + +function saturatingSubI128(a: bigint, b: bigint): bigint { + return clampI128(a - b); +} + +function saturatingMulI128(a: bigint, b: bigint): bigint { + return clampI128(a * b); +} + +function parseI128(value: string, fieldName: string): bigint { + try { + return clampI128(BigInt(value)); + } catch { + throw new Error(`Invalid i128 value for '${fieldName}'`); + } +} + +function getStateFingerprint(stream: ClaimableStreamState): string { + if (stream.updatedAt) { + return String(stream.updatedAt.getTime()); + } + + return [ + stream.ratePerSecond, + stream.depositedAmount, + stream.withdrawnAmount, + stream.lastUpdateTime, + stream.isActive ? '1' : '0', + ].join(':'); +} + +/** + * Mirrors Soroban's overflow-safe claimable calculation: + * - elapsed = now.saturating_sub(last_update_time) + * - streamed = (elapsed * rate_per_second) with i128 saturation + * - remaining = deposited_amount.saturating_sub(withdrawn_amount) + * - claimable = min(streamed, remaining) + */ +export class ClaimableAmountService { + private readonly cache = new Map(); + private readonly cacheTtlMs: number; + private readonly nowMs: () => number; + + constructor(options: ClaimableServiceOptions = {}) { + this.cacheTtlMs = Math.max(0, options.cacheTtlMs ?? 1000); + this.nowMs = options.nowMs ?? (() => Date.now()); + } + + clearCache(): void { + this.cache.clear(); + } + + getClaimableAmount( + stream: ClaimableStreamState, + requestedAt?: number, + ): ClaimableAmountResult { + const calculatedAt = + requestedAt !== undefined + ? Math.floor(requestedAt) + : Math.floor(this.nowMs() / 1000); + + const cacheKey = `${stream.streamId}:${getStateFingerprint(stream)}:${calculatedAt}`; + const nowMs = this.nowMs(); + const cachedEntry = this.cache.get(cacheKey); + + if (cachedEntry && cachedEntry.expiresAtMs > nowMs) { + return { + ...cachedEntry.value, + cached: true, + }; + } + + const streamLastUpdate = BigInt(Math.max(0, stream.lastUpdateTime)); + const nowTs = BigInt(Math.max(0, calculatedAt)); + const elapsed = nowTs > streamLastUpdate ? nowTs - streamLastUpdate : 0n; + + const ratePerSecond = parseI128(stream.ratePerSecond, 'ratePerSecond'); + const depositedAmount = parseI128(stream.depositedAmount, 'depositedAmount'); + const withdrawnAmount = parseI128(stream.withdrawnAmount, 'withdrawnAmount'); + + const streamedAmount = saturatingMulI128(elapsed, ratePerSecond); + const remainingAmount = saturatingSubI128(depositedAmount, withdrawnAmount); + const rawClaimable = + streamedAmount > remainingAmount ? remainingAmount : streamedAmount; + + // "Actionable" mirrors what a client can withdraw right now. + const actionableAmount = + stream.isActive && rawClaimable > 0n ? rawClaimable : 0n; + + const value: Omit = { + streamId: stream.streamId, + claimableAmount: actionableAmount.toString(), + actionable: actionableAmount > 0n, + calculatedAt, + }; + + this.cache.set(cacheKey, { + value, + expiresAtMs: nowMs + this.cacheTtlMs, + }); + + return { + ...value, + cached: false, + }; + } +} + +const configuredCacheTtlMs = Number.parseInt( + process.env.CLAIMABLE_CACHE_TTL_MS ?? '1000', + 10, +); + +export const claimableAmountService = new ClaimableAmountService({ + cacheTtlMs: Number.isFinite(configuredCacheTtlMs) ? configuredCacheTtlMs : 1000, +}); + diff --git a/backend/tests/claimable.service.test.ts b/backend/tests/claimable.service.test.ts new file mode 100644 index 0000000..f658bad --- /dev/null +++ b/backend/tests/claimable.service.test.ts @@ -0,0 +1,136 @@ +import { describe, expect, it } from 'vitest'; +import { ClaimableAmountService } from '../src/services/claimable.service.js'; + +describe('ClaimableAmountService', () => { + it('calculates claimable amount for active stream', () => { + const service = new ClaimableAmountService({ + cacheTtlMs: 5_000, + nowMs: () => 10_000, + }); + + const result = service.getClaimableAmount({ + streamId: 1, + ratePerSecond: '5', + depositedAmount: '500', + withdrawnAmount: '100', + lastUpdateTime: 7, + isActive: true, + }); + + // elapsed = 10 - 7 = 3 + // streamed = 3 * 5 = 15 + // remaining = 500 - 100 = 400 + // claimable = min(15, 400) = 15 + expect(result.claimableAmount).toBe('15'); + expect(result.actionable).toBe(true); + expect(result.cached).toBe(false); + }); + + it('caps claimable amount at remaining balance', () => { + const service = new ClaimableAmountService({ + cacheTtlMs: 5_000, + nowMs: () => 100_000, + }); + + const result = service.getClaimableAmount({ + streamId: 2, + ratePerSecond: '10', + depositedAmount: '1000', + withdrawnAmount: '900', + lastUpdateTime: 0, + isActive: true, + }); + + expect(result.claimableAmount).toBe('100'); + expect(result.actionable).toBe(true); + }); + + it('returns zero when stream is inactive', () => { + const service = new ClaimableAmountService({ + cacheTtlMs: 5_000, + nowMs: () => 100_000, + }); + + const result = service.getClaimableAmount({ + streamId: 3, + ratePerSecond: '10', + depositedAmount: '1000', + withdrawnAmount: '100', + lastUpdateTime: 0, + isActive: false, + }); + + expect(result.claimableAmount).toBe('0'); + expect(result.actionable).toBe(false); + }); + + it('returns zero when withdrawn exceeds deposited (non-actionable)', () => { + const service = new ClaimableAmountService({ + cacheTtlMs: 5_000, + nowMs: () => 100_000, + }); + + const result = service.getClaimableAmount({ + streamId: 4, + ratePerSecond: '10', + depositedAmount: '100', + withdrawnAmount: '150', + lastUpdateTime: 0, + isActive: true, + }); + + expect(result.claimableAmount).toBe('0'); + expect(result.actionable).toBe(false); + }); + + it('uses cache for repeated request with same stream state + timestamp', () => { + let now = 5_000; + const service = new ClaimableAmountService({ + cacheTtlMs: 10_000, + nowMs: () => now, + }); + + const input = { + streamId: 5, + ratePerSecond: '7', + depositedAmount: '700', + withdrawnAmount: '0', + lastUpdateTime: 0, + isActive: true, + }; + + const first = service.getClaimableAmount(input, 5); + const second = service.getClaimableAmount(input, 5); + + expect(first.cached).toBe(false); + expect(second.cached).toBe(true); + + // Advance local clock beyond cache TTL + now = 20_001; + const third = service.getClaimableAmount(input, 5); + expect(third.cached).toBe(false); + }); + + it('saturates overflow-safe multiplication to i128 max', () => { + const i128Max = ((1n << 127n) - 1n).toString(); + const service = new ClaimableAmountService({ + cacheTtlMs: 5_000, + nowMs: () => 1_000, + }); + + const result = service.getClaimableAmount({ + streamId: 6, + ratePerSecond: i128Max, + depositedAmount: i128Max, + withdrawnAmount: '0', + lastUpdateTime: 998, + isActive: true, + }); + + // elapsed=2 => streamed overflows i128, saturates to i128 max + // remaining=i128 max => claimable=i128 max + expect(result.claimableAmount).toBe(i128Max); + expect(result.actionable).toBe(true); + }); +}); +