Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 72 additions & 4 deletions backend/src/controllers/stream.controller.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import type { Request, Response } from 'express';
import { prisma } from '../lib/prisma.js';
import logger from '../logger.js';
import { claimableAmountService } from '../services/claimable.service.js';

/**
* Create a new stream (stub for on-chain indexing)
Expand Down Expand Up @@ -70,10 +71,17 @@ export const listStreams = async (req: Request, res: Response) => {
*/
export const getStream = async (req: Request, res: Response) => {
try {
const { streamId } = req.params;
const streamIdParam = Array.isArray(req.params.streamId)
? req.params.streamId[0]
: req.params.streamId;
const parsedStreamId = Number.parseInt(streamIdParam ?? '', 10);

if (!Number.isFinite(parsedStreamId)) {
return res.status(400).json({ error: 'Invalid streamId parameter' });
}

const stream = await prisma.stream.findUnique({
where: { streamId: parseInt(streamId) },
where: { streamId: parsedStreamId },
include: {
senderUser: true,
recipientUser: true,
Expand All @@ -99,10 +107,17 @@ export const getStream = async (req: Request, res: Response) => {
*/
export const getStreamEvents = async (req: Request, res: Response) => {
try {
const { streamId } = req.params;
const streamIdParam = Array.isArray(req.params.streamId)
? req.params.streamId[0]
: req.params.streamId;
const parsedStreamId = Number.parseInt(streamIdParam ?? '', 10);

if (!Number.isFinite(parsedStreamId)) {
return res.status(400).json({ error: 'Invalid streamId parameter' });
}

const events = await prisma.streamEvent.findMany({
where: { streamId: parseInt(streamId) },
where: { streamId: parsedStreamId },
orderBy: { timestamp: 'desc' }
});

Expand All @@ -112,3 +127,56 @@ export const getStreamEvents = async (req: Request, res: Response) => {
return res.status(500).json({ error: 'Internal server error' });
}
};

/**
* Get actionable claimable amount for a stream (no direct RPC call).
*/
export const getStreamClaimableAmount = async (req: Request, res: Response) => {
try {
const streamIdParam = Array.isArray(req.params.streamId)
? req.params.streamId[0]
: req.params.streamId;
const parsedStreamId = Number.parseInt(streamIdParam ?? '', 10);

if (!Number.isFinite(parsedStreamId)) {
return res.status(400).json({ error: 'Invalid streamId parameter' });
}

const atQuery = req.query.at as string | undefined;
let requestedAt: number | undefined;

if (atQuery !== undefined) {
requestedAt = Number.parseInt(atQuery, 10);
if (!Number.isFinite(requestedAt) || requestedAt < 0) {
return res.status(400).json({
error: 'Invalid query parameter',
message: "'at' must be a non-negative Unix timestamp in seconds",
});
}
}

const stream = await prisma.stream.findUnique({
where: { streamId: parsedStreamId },
select: {
streamId: true,
ratePerSecond: true,
depositedAmount: true,
withdrawnAmount: true,
lastUpdateTime: true,
isActive: true,
updatedAt: true,
},
});

if (!stream) {
return res.status(404).json({ error: 'Stream not found' });
}

const result = claimableAmountService.getClaimableAmount(stream, requestedAt);

return res.status(200).json(result);
} catch (error) {
logger.error('Error calculating stream claimable amount:', error);
return res.status(500).json({ error: 'Internal server error' });
}
};
70 changes: 69 additions & 1 deletion backend/src/routes/v1/stream.routes.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
import { Router } from 'express';
import { createStream, listStreams, getStream, getStreamEvents } from '../../controllers/stream.controller.js';
import {
createStream,
listStreams,
getStream,
getStreamEvents,
getStreamClaimableAmount,
} from '../../controllers/stream.controller.js';

const router = Router();

Expand Down Expand Up @@ -167,4 +173,66 @@ router.get('/:streamId', getStream);
*/
router.get('/:streamId/events', getStreamEvents);

/**
* @openapi
* /v1/streams/{streamId}/claimable:
* get:
* tags:
* - Streams
* summary: Get actionable claimable amount for a stream
* description: |
* Returns the exact actionable amount currently withdrawable from a stream,
* using indexed stream state in PostgreSQL and overflow-safe logic equivalent
* to the Soroban contract's `calculate_claimable` function.
*
* **Performance:**
* - Uses an in-memory cache for hot reads
* - Does not call Soroban RPC for this computation
* parameters:
* - in: path
* name: streamId
* required: true
* schema:
* type: integer
* description: On-chain stream ID
* - in: query
* name: at
* required: false
* schema:
* type: integer
* description: Optional Unix timestamp in seconds used for deterministic calculation
* responses:
* 200:
* description: Claimable amount calculated successfully
* content:
* application/json:
* schema:
* type: object
* properties:
* streamId:
* type: integer
* example: 1
* claimableAmount:
* type: string
* description: Actionable amount currently withdrawable (i128 as string)
* example: "1500"
* actionable:
* type: boolean
* description: Whether a withdrawal is currently actionable
* example: true
* calculatedAt:
* type: integer
* description: Unix timestamp (seconds) used for calculation
* example: 1708534800
* cached:
* type: boolean
* description: Whether response was served from cache
* example: false
* 400:
* description: Invalid streamId or query parameter
* 404:
* description: Stream not found
*/
router.get('/:streamId/claimable', getStreamClaimableAmount);

export default router;
153 changes: 153 additions & 0 deletions backend/src/services/claimable.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
const I128_MAX = (1n << 127n) - 1n;
const I128_MIN = -(1n << 127n);

export interface ClaimableStreamState {
streamId: number;
ratePerSecond: string;
depositedAmount: string;
withdrawnAmount: string;
lastUpdateTime: number;
isActive: boolean;
updatedAt?: Date;
}

export interface ClaimableAmountResult {
streamId: number;
claimableAmount: string;
actionable: boolean;
calculatedAt: number;
cached: boolean;
}

interface ClaimableCacheEntry {
value: Omit<ClaimableAmountResult, 'cached'>;
expiresAtMs: number;
}

interface ClaimableServiceOptions {
cacheTtlMs?: number;
nowMs?: () => number;
}

function clampI128(value: bigint): bigint {
if (value > I128_MAX) return I128_MAX;
if (value < I128_MIN) return I128_MIN;
return value;
}

function saturatingSubI128(a: bigint, b: bigint): bigint {
return clampI128(a - b);
}

function saturatingMulI128(a: bigint, b: bigint): bigint {
return clampI128(a * b);
}

function parseI128(value: string, fieldName: string): bigint {
try {
return clampI128(BigInt(value));
} catch {
throw new Error(`Invalid i128 value for '${fieldName}'`);
}
}

function getStateFingerprint(stream: ClaimableStreamState): string {
if (stream.updatedAt) {
return String(stream.updatedAt.getTime());
}

return [
stream.ratePerSecond,
stream.depositedAmount,
stream.withdrawnAmount,
stream.lastUpdateTime,
stream.isActive ? '1' : '0',
].join(':');
}

/**
* Mirrors Soroban's overflow-safe claimable calculation:
* - elapsed = now.saturating_sub(last_update_time)
* - streamed = (elapsed * rate_per_second) with i128 saturation
* - remaining = deposited_amount.saturating_sub(withdrawn_amount)
* - claimable = min(streamed, remaining)
*/
export class ClaimableAmountService {
private readonly cache = new Map<string, ClaimableCacheEntry>();
private readonly cacheTtlMs: number;
private readonly nowMs: () => number;

constructor(options: ClaimableServiceOptions = {}) {
this.cacheTtlMs = Math.max(0, options.cacheTtlMs ?? 1000);
this.nowMs = options.nowMs ?? (() => Date.now());
}

clearCache(): void {
this.cache.clear();
}

getClaimableAmount(
stream: ClaimableStreamState,
requestedAt?: number,
): ClaimableAmountResult {
const calculatedAt =
requestedAt !== undefined
? Math.floor(requestedAt)
: Math.floor(this.nowMs() / 1000);

const cacheKey = `${stream.streamId}:${getStateFingerprint(stream)}:${calculatedAt}`;
const nowMs = this.nowMs();
const cachedEntry = this.cache.get(cacheKey);

if (cachedEntry && cachedEntry.expiresAtMs > nowMs) {
return {
...cachedEntry.value,
cached: true,
};
}

const streamLastUpdate = BigInt(Math.max(0, stream.lastUpdateTime));
const nowTs = BigInt(Math.max(0, calculatedAt));
const elapsed = nowTs > streamLastUpdate ? nowTs - streamLastUpdate : 0n;

const ratePerSecond = parseI128(stream.ratePerSecond, 'ratePerSecond');
const depositedAmount = parseI128(stream.depositedAmount, 'depositedAmount');
const withdrawnAmount = parseI128(stream.withdrawnAmount, 'withdrawnAmount');

const streamedAmount = saturatingMulI128(elapsed, ratePerSecond);
const remainingAmount = saturatingSubI128(depositedAmount, withdrawnAmount);
const rawClaimable =
streamedAmount > remainingAmount ? remainingAmount : streamedAmount;

// "Actionable" mirrors what a client can withdraw right now.
const actionableAmount =
stream.isActive && rawClaimable > 0n ? rawClaimable : 0n;

const value: Omit<ClaimableAmountResult, 'cached'> = {
streamId: stream.streamId,
claimableAmount: actionableAmount.toString(),
actionable: actionableAmount > 0n,
calculatedAt,
};

this.cache.set(cacheKey, {
value,
expiresAtMs: nowMs + this.cacheTtlMs,
});

return {
...value,
cached: false,
};
}
}

const configuredCacheTtlMs = Number.parseInt(
process.env.CLAIMABLE_CACHE_TTL_MS ?? '1000',
10,
);

export const claimableAmountService = new ClaimableAmountService({
cacheTtlMs: Number.isFinite(configuredCacheTtlMs) ? configuredCacheTtlMs : 1000,
});

Loading
Loading