From d27e30b406663979ec8fc938b7003ecbee07771c Mon Sep 17 00:00:00 2001 From: victorEdeh Date: Sat, 28 Mar 2026 17:02:28 -0700 Subject: [PATCH] Add Real-Time Balance Sync with Stellar Horizon --- .../specs/realtime-balance-sync/.config.kiro | 1 + .kiro/specs/realtime-balance-sync/design.md | 315 +++++++++++++ .../realtime-balance-sync/requirements.md | 125 ++++++ .kiro/specs/realtime-balance-sync/tasks.md | 181 ++++++++ backend/package.json | 1 + backend/pnpm-lock.yaml | 16 + backend/src/config/configuration.ts | 7 + .../entities/protocol-metrics.entity.ts | 3 + .../blockchain/balance-sync.service.spec.ts | 189 ++++++++ .../blockchain/balance-sync.service.ts | 421 ++++++++++++++++++ .../modules/blockchain/balance-sync.types.ts | 61 +++ .../blockchain/blockchain.controller.ts | 13 +- .../modules/blockchain/blockchain.module.ts | 5 + .../blockchain/savings.service.spec.ts | 122 +++++ .../src/modules/blockchain/savings.service.ts | 100 ++++- 15 files changed, 1540 insertions(+), 20 deletions(-) create mode 100644 .kiro/specs/realtime-balance-sync/.config.kiro create mode 100644 .kiro/specs/realtime-balance-sync/design.md create mode 100644 .kiro/specs/realtime-balance-sync/requirements.md create mode 100644 .kiro/specs/realtime-balance-sync/tasks.md create mode 100644 backend/src/modules/blockchain/balance-sync.service.spec.ts create mode 100644 backend/src/modules/blockchain/balance-sync.service.ts create mode 100644 backend/src/modules/blockchain/balance-sync.types.ts create mode 100644 backend/src/modules/blockchain/savings.service.spec.ts diff --git a/.kiro/specs/realtime-balance-sync/.config.kiro b/.kiro/specs/realtime-balance-sync/.config.kiro new file mode 100644 index 000000000..61df1c4e4 --- /dev/null +++ b/.kiro/specs/realtime-balance-sync/.config.kiro @@ -0,0 +1 @@ +{"specId": "c4910d7c-e290-4648-943c-a9f821dac1f5", "workflowType": "requirements-first", "specType": "feature"} diff --git a/.kiro/specs/realtime-balance-sync/design.md b/.kiro/specs/realtime-balance-sync/design.md new file mode 100644 index 000000000..c819d0352 --- /dev/null +++ b/.kiro/specs/realtime-balance-sync/design.md @@ -0,0 +1,315 @@ +# Design Document: Realtime Balance Sync + +## Overview + +This feature replaces direct Horizon HTTP calls in `SavingsService.getWalletBalance()` and `getUserSavingsBalance()` with a streaming-first, cache-backed architecture. A new `BalanceSyncService` opens a Stellar Horizon SSE stream per subscribed account, writes every incoming balance update to Redis, and emits `BalanceChangedEvent` via `@nestjs/event-emitter` when a balance actually changes. When a stream drops, exponential back-off reconnection kicks in and a per-account polling fallback keeps the cache fresh. Connection health counters are persisted to `ProtocolMetrics` on a configurable schedule and exposed through `BlockchainController`. + +The design fits entirely inside the existing `BlockchainModule` (already `@Global()`), reuses `RpcClientWrapper`, `@nestjs/cache-manager` (Redis-backed), and `@nestjs/event-emitter` — no new infrastructure dependencies are required. + +--- + +## Architecture + +```mermaid +flowchart TD + subgraph BlockchainModule [BlockchainModule (Global)] + BSS[BalanceSyncService] + SS[StellarService] + SavS[SavingsService] + RPC[RpcClientWrapper] + BSS -->|getCurrentHorizonServer| RPC + BSS -->|executeWithRetry| RPC + SavS -->|cache-aside read| CACHE + SavS -->|subscribe on miss| BSS + end + + subgraph Infrastructure + CACHE[(Redis / cache-manager)] + DB[(PostgreSQL / ProtocolMetrics)] + EE[EventEmitter2] + end + + HORIZON[Stellar Horizon SSE] + + HORIZON -->|account stream| BSS + BSS -->|write balance| CACHE + BSS -->|BalanceChangedEvent| EE + BSS -->|persist metrics snapshot| DB + EE -->|@OnEvent| NotificationsService + EE -->|@OnEvent| AnalyticsService +``` + +### Key Design Decisions + +1. **One stream handle per account** — The Stellar SDK's `server.accounts().accountId(id).stream()` returns a `() => void` close function. `BalanceSyncService` stores these handles in a `Map` keyed by public key, enabling O(1) subscribe/unsubscribe at runtime. + +2. **Reconnection loop per account** — Each account has its own independent reconnect state (delay, attempt count, timer). This prevents a single flapping account from blocking reconnection of healthy accounts. + +3. **Polling fallback is per-account** — When a stream for account A is reconnecting, only account A falls back to polling. Accounts with healthy streams are unaffected. + +4. **Cache-aside in SavingsService** — `getWalletBalance()` checks the cache first; on a miss it fetches from Horizon, populates the cache, and returns. This is a read-through pattern that requires no changes to callers. + +5. **ProtocolMetrics extension** — Rather than a new entity, a `jsonb` column `connectionMetrics` is added to `ProtocolMetrics` to store the per-account snapshot. This avoids a new migration table while keeping metrics queryable. + +--- + +## Components and Interfaces + +### BalanceSyncService + +```typescript +@Injectable() +export class BalanceSyncService implements OnModuleInit, OnModuleDestroy { + /** Subscribe an account for real-time balance tracking. Idempotent. */ + subscribe(publicKey: string): void; + + /** Unsubscribe an account and close its stream. Idempotent. */ + unsubscribe(publicKey: string): void; + + /** Return aggregated metrics for all subscribed accounts. */ + getMetricsSummary(): ConnectionMetricsSummary; +} +``` + +### BalanceChangedEvent + +```typescript +export class BalanceChangedEvent { + /** Stellar G... public key of the account */ + accountId: string; + /** Asset code: 'native' for XLM, otherwise the asset code string */ + assetCode: string; + /** Balance before this update (in stroops as a string to avoid float loss) */ + previousBalance: string; + /** Balance after this update */ + newBalance: string; + /** UTC timestamp of the change */ + changedAt: Date; +} +``` + +Event name constant: `'balance.changed'` + +### ConnectionMetricsSummary + +```typescript +export interface AccountMetrics { + publicKey: string; + streamUptimeSeconds: number; + reconnectCount: number; + fallbackActive: boolean; + connectedAt: Date | null; +} + +export interface ConnectionMetricsSummary { + accounts: AccountMetrics[]; + anyFallbackActive: boolean; + totalReconnects: number; +} +``` + +### Cache Key Schema + +| Key pattern | Value | TTL | +|---|---|---| +| `balance:{publicKey}:{assetCode}` | JSON string `{ balance: string, updatedAt: string }` | Configurable, default 300 s | + +Example: `balance:GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN:native` + +### SavingsService changes + +`getWalletBalance(publicKey, asset)` gains a cache-aside read: + +``` +1. key = `balance:${publicKey}:${asset ?? 'native'}` +2. hit = await cacheManager.get(key) +3. if hit → return parsed balance +4. balance = await horizonServer.accounts().accountId(publicKey).call() [existing logic] +5. await cacheManager.set(key, JSON.stringify({ balance, updatedAt: now }), cacheTtl) +6. return balance +``` + +`getUserSavingsBalance(publicKey)` similarly checks the cache for each asset before hitting Horizon. + +--- + +## Data Models + +### StreamHandle (in-memory only) + +```typescript +interface StreamHandle { + /** Close function returned by Stellar SDK stream() */ + close: () => void; + /** Whether the stream is currently considered connected */ + connected: boolean; + /** Reconnect back-off state */ + reconnect: { + delayMs: number; + timer: NodeJS.Timeout | null; + }; + /** Polling fallback interval handle */ + pollTimer: NodeJS.Timeout | null; + /** Metrics for this account */ + metrics: AccountMetrics; +} +``` + +### ProtocolMetrics entity extension + +A new nullable `jsonb` column is added to the existing `ProtocolMetrics` entity: + +```typescript +@Column('jsonb', { nullable: true }) +connectionMetrics: ConnectionMetricsSummary | null; +``` + +This column stores the snapshot written by `BalanceSyncService` on its persistence interval. + +### Configuration keys added to `configuration.ts` + +```typescript +balanceSync: { + cacheTtlSeconds: parseInt(process.env.BALANCE_CACHE_TTL_SECONDS || '300', 10), + pollIntervalMs: parseInt(process.env.BALANCE_POLL_INTERVAL_MS || '5000', 10), + reconnectInitialDelayMs:parseInt(process.env.BALANCE_RECONNECT_INIT_MS || '1000', 10), + reconnectMaxDelayMs: parseInt(process.env.BALANCE_RECONNECT_MAX_MS || '60000', 10), + metricsPersistIntervalMs:parseInt(process.env.BALANCE_METRICS_PERSIST_MS || '60000', 10), +} +``` + +--- + +## Correctness Properties + +*A property is a characteristic or behavior that should hold true across all valid executions of a system — essentially, a formal statement about what the system should do. Properties serve as the bridge between human-readable specifications and machine-verifiable correctness guarantees.* + +### Property 1: Balance cache round-trip + +*For any* valid Stellar account update delivered by a HorizonStream, writing the balance to the BalanceCache and then immediately reading it back should return a value equal to the written balance. + +**Validates: Requirements 2.1, 2.5** + +--- + +### Property 2: Only changed balances emit events + +*For any* sequence of account updates for a given account and asset, a `BalanceChangedEvent` should be emitted if and only if the new balance differs from the previously cached balance. + +**Validates: Requirements 3.1, 3.3** + +--- + +### Property 3: BalanceChangedEvent contains required fields + +*For any* `BalanceChangedEvent` emitted by `BalanceSyncService`, the event object should contain a non-empty `accountId`, a non-empty `assetCode`, a `previousBalance`, a `newBalance`, and a `changedAt` timestamp. + +**Validates: Requirements 3.2** + +--- + +### Property 4: Per-asset event emission + +*For any* account update that changes balances for N assets, exactly N `BalanceChangedEvent` instances should be emitted — one per changed asset. + +**Validates: Requirements 3.4** + +--- + +### Property 5: Exponential back-off bounds + +*For any* sequence of consecutive reconnection failures for a given account, the delay before each attempt should be `min(initialDelay * 2^attempt, maxDelay)`, and should never exceed `maxDelay`. + +**Validates: Requirements 4.1** + +--- + +### Property 6: Fallback activates on disconnect, deactivates on reconnect + +*For any* account, if its stream disconnects then the polling fallback should become active; when the stream reconnects the polling fallback should become inactive. This is a round-trip property: `disconnect → fallbackActive=true`, `reconnect → fallbackActive=false`. + +**Validates: Requirements 4.5, 5.2, 5.4** + +--- + +### Property 7: Cache-aside read returns cached value on hit + +*For any* account and asset for which a non-expired cache entry exists, `SavingsService.getWalletBalance()` should return the cached balance without issuing a Horizon HTTP request. + +**Validates: Requirements 7.1, 7.3** + +--- + +### Property 8: Cache-aside populates cache on miss + +*For any* account and asset for which no cache entry exists, after `SavingsService.getWalletBalance()` returns, the BalanceCache should contain an entry for that account and asset equal to the returned balance. + +**Validates: Requirements 7.2** + +--- + +### Property 9: Configuration defaults applied on missing values + +*For any* configuration key in the `balanceSync` namespace that is absent from the environment, `BalanceSyncService` should use the documented default value and the service should start without throwing. + +**Validates: Requirements 8.2** + +--- + +### Property 10: Poll interval validation rejects out-of-range values + +*For any* configured `pollIntervalMs` value that is ≤ 0 or > 60 000, `BalanceSyncService` should log an error at startup and substitute the default value of 5 000 ms. + +**Validates: Requirements 8.3** + +--- + +## Error Handling + +| Scenario | Behaviour | +|---|---| +| Cache write fails | Log error at `warn` level; continue processing subsequent updates (Req 2.4) | +| Stream `onerror` fires | Trigger reconnect loop; activate polling fallback for that account | +| All Horizon endpoints unreachable during polling fallback | `executeWithRetry` exhausts retries; log error; next poll tick retries | +| `subscribe()` called for already-subscribed account | No-op (idempotent) | +| `unsubscribe()` called for unknown account | No-op | +| Config value out of range (`pollIntervalMs`) | Log error; use default; do not throw | +| Config value absent | Log warning; use default; do not throw | +| Module destroyed while reconnect timer is pending | `OnModuleDestroy` cancels all timers and closes all streams | + +--- + +## Testing Strategy + +### Unit tests (Jest) + +Focus on specific examples, integration points, and error conditions: + +- `BalanceSyncService.subscribe()` opens a stream and stores the handle +- `BalanceSyncService.unsubscribe()` closes the stream and removes the handle +- Cache write failure is caught and does not propagate +- `BalanceChangedEvent` is not emitted when balance is unchanged +- `OnModuleDestroy` closes all open streams and cancels all timers +- `SavingsService.getWalletBalance()` returns cached value without calling Horizon +- `SavingsService.getWalletBalance()` calls Horizon and populates cache on miss +- Config validation logs error and uses default for out-of-range `pollIntervalMs` + +### Property-based tests (fast-check) + +The project uses Jest as the test runner. [fast-check](https://fast-check.io/) is the property-based testing library to add (`pnpm add -D fast-check`). Each property test runs a minimum of **100 iterations**. + +Each test must be tagged with a comment in the format: +`// Feature: realtime-balance-sync, Property N: ` + +| Property | Test description | +|---|---| +| P1: Balance cache round-trip | Generate random public keys, asset codes, and balance strings; write to cache mock; read back; assert equality | +| P2: Only changed balances emit events | Generate sequences of balance updates (some identical, some different); assert event count equals number of actual changes | +| P3: BalanceChangedEvent fields | Generate random account updates; assert every emitted event has all required non-empty fields | +| P4: Per-asset event emission | Generate account updates with N changed assets; assert exactly N events emitted | +| P5: Exponential back-off bounds | Generate random attempt counts and config values; assert computed delay equals `min(init * 2^n, max)` and never exceeds max | +| P6: Fallback round-trip | Simulate disconnect then reconnect; assert fallback transitions correctly | +| P7: Cache-aside hit | Seed cache with a value; call `getWalletBalance()`; assert Horizon mock not called | +| P8: Cache-aside miss populates | Empty cache; call `getWalletBalance()`; assert cache contains the returned value | +| P9: Config defaults | Generate absent/undefined config values; assert service uses documented defaults | +| P10: Poll interval validation | Generate out-of-range integers; assert default substituted and error logged | diff --git a/.kiro/specs/realtime-balance-sync/requirements.md b/.kiro/specs/realtime-balance-sync/requirements.md new file mode 100644 index 000000000..9f1ae11ce --- /dev/null +++ b/.kiro/specs/realtime-balance-sync/requirements.md @@ -0,0 +1,125 @@ +# Requirements Document + +## Introduction + +This feature replaces the existing polling-based balance retrieval in the NestJS backend with a WebSocket-based streaming service that subscribes to Stellar Horizon account updates in real time. The service maintains a Redis cache of account balances, emits internal events on balance changes via `@nestjs/event-emitter`, and falls back to the existing polling pattern when the WebSocket connection is unavailable. Connection health is exposed through the existing metrics infrastructure. + +## Glossary + +- **BalanceSyncService**: The new NestJS injectable service responsible for managing the Horizon WebSocket stream, updating the Redis cache, and emitting balance-change events. +- **HorizonStream**: The server-sent event / streaming connection opened via the Stellar SDK's `Horizon.Server.accounts().stream()` API. +- **BalanceCache**: The Redis cache (accessed via `@nestjs/cache-manager`) that stores the latest known balance for each subscribed Stellar account. +- **BalanceChangedEvent**: The internal application event emitted through `@nestjs/event-emitter` when a balance update is detected. +- **PollingFallback**: The periodic HTTP-based balance fetch that activates when the HorizonStream is unavailable. +- **RpcClientWrapper**: The existing wrapper class that provides `getCurrentHorizonServer()` and `executeWithRetry()` for Horizon access. +- **BlockchainService**: Refers collectively to `StellarService` and `SavingsService` within the `BlockchainModule`. +- **ConnectionMetrics**: The set of counters and gauges (reconnect count, stream uptime, fallback-active flag) tracked in the `ProtocolMetrics` entity. +- **Subscriber**: A Stellar account public key registered for real-time balance tracking. + +--- + +## Requirements + +### Requirement 1: WebSocket Stream Connection to Horizon + +**User Story:** As a backend engineer, I want the system to open a streaming connection to Stellar Horizon for each subscribed account, so that balance updates are received in real time without polling. + +#### Acceptance Criteria + +1. WHEN `BalanceSyncService` initialises, THE `BalanceSyncService` SHALL open a `HorizonStream` for each registered `Subscriber` using `RpcClientWrapper.getCurrentHorizonServer()`. +2. THE `BalanceSyncService` SHALL support subscribing and unsubscribing individual `Subscriber` accounts at runtime without restarting the service. +3. WHILE a `HorizonStream` is open, THE `BalanceSyncService` SHALL process each incoming account update within 500 ms of receipt. +4. THE `BalanceSyncService` SHALL close all open `HorizonStream` connections gracefully when the NestJS module is destroyed (`OnModuleDestroy`). + +--- + +### Requirement 2: Real-Time Balance Cache Updates + +**User Story:** As a backend engineer, I want every incoming Horizon account update to be written to Redis immediately, so that other services always read a fresh balance without hitting Horizon directly. + +#### Acceptance Criteria + +1. WHEN a `HorizonStream` delivers an account update, THE `BalanceSyncService` SHALL write the updated balance to the `BalanceCache` keyed by the account's public key. +2. THE `BalanceSyncService` SHALL store balances for all asset types present in the account record (native XLM and any non-native assets). +3. THE `BalanceSyncService` SHALL set a configurable TTL on each `BalanceCache` entry, defaulting to 300 seconds. +4. IF a `BalanceCache` write fails, THEN THE `BalanceSyncService` SHALL log the error and continue processing subsequent updates without interruption. +5. FOR ALL valid account updates, writing then reading the `BalanceCache` SHALL return a value equal to the written balance (round-trip property). + +--- + +### Requirement 3: Balance Change Event Emission + +**User Story:** As a backend engineer, I want the system to emit an internal event whenever a balance changes, so that other modules (e.g., notifications, analytics) can react without polling the cache. + +#### Acceptance Criteria + +1. WHEN a `HorizonStream` delivers an account update whose balance differs from the previously cached value, THE `BalanceSyncService` SHALL emit a `BalanceChangedEvent` via `@nestjs/event-emitter`. +2. THE `BalanceChangedEvent` SHALL include the account public key, the previous balance, the new balance, the asset code, and the UTC timestamp of the change. +3. WHEN a `HorizonStream` delivers an account update whose balance is identical to the cached value, THE `BalanceSyncService` SHALL NOT emit a `BalanceChangedEvent`. +4. THE `BalanceSyncService` SHALL emit `BalanceChangedEvent` for each asset whose balance changed independently. + +--- + +### Requirement 4: Automatic Reconnection on Connection Loss + +**User Story:** As a backend engineer, I want the streaming connection to reconnect automatically after a failure, so that balance data remains current without manual intervention. + +#### Acceptance Criteria + +1. IF a `HorizonStream` closes unexpectedly, THEN THE `BalanceSyncService` SHALL attempt to reconnect using exponential back-off starting at 1 second, doubling on each attempt, up to a maximum interval of 60 seconds. +2. THE `BalanceSyncService` SHALL attempt reconnection indefinitely until the stream is re-established or the module is destroyed. +3. WHEN a reconnection attempt succeeds, THE `BalanceSyncService` SHALL log the recovery and reset the back-off interval to its initial value. +4. WHEN a reconnection attempt fails, THE `BalanceSyncService` SHALL increment the `ConnectionMetrics` reconnect counter. +5. WHILE reconnection attempts are in progress, THE `BalanceSyncService` SHALL activate the `PollingFallback` for affected `Subscriber` accounts. + +--- + +### Requirement 5: Polling Fallback + +**User Story:** As a backend engineer, I want the system to fall back to HTTP polling when the WebSocket stream is unavailable, so that balance data does not become stale during outages. + +#### Acceptance Criteria + +1. WHILE the `HorizonStream` for a `Subscriber` is not connected, THE `BalanceSyncService` SHALL poll Horizon for that account's balance at a configurable interval, defaulting to 5 seconds, using `RpcClientWrapper.executeWithRetry()`. +2. WHEN the `HorizonStream` for a `Subscriber` is re-established, THE `BalanceSyncService` SHALL deactivate the `PollingFallback` for that account and resume streaming. +3. WHILE the `PollingFallback` is active, THE `BalanceSyncService` SHALL apply the same cache-write and event-emission logic as the streaming path. +4. THE `BalanceSyncService` SHALL set the `ConnectionMetrics` fallback-active flag to `true` while any `Subscriber` is in polling mode and `false` when all streams are active. + +--- + +### Requirement 6: Connection Health Metrics + +**User Story:** As an operator, I want WebSocket connection health to be tracked in the existing metrics infrastructure, so that I can monitor stream reliability and detect degraded states. + +#### Acceptance Criteria + +1. THE `BalanceSyncService` SHALL record the following `ConnectionMetrics` for each `Subscriber`: stream uptime in seconds, total reconnect count, and whether the `PollingFallback` is currently active. +2. WHEN a `HorizonStream` connects successfully, THE `BalanceSyncService` SHALL record the connection timestamp and begin incrementing stream uptime. +3. WHEN a `HorizonStream` disconnects, THE `BalanceSyncService` SHALL stop incrementing stream uptime for that `Subscriber`. +4. THE `BalanceSyncService` SHALL expose an aggregated metrics summary accessible via an injected method so that `BlockchainController` can include it in health-check responses. +5. THE `BalanceSyncService` SHALL persist `ConnectionMetrics` snapshots to the `ProtocolMetrics` entity at a configurable interval, defaulting to 60 seconds. + +--- + +### Requirement 7: BlockchainService Integration + +**User Story:** As a backend engineer, I want `StellarService` and `SavingsService` to read balances from the `BalanceCache` when available, so that they serve fresh data without issuing redundant Horizon requests. + +#### Acceptance Criteria + +1. WHEN `SavingsService.getWalletBalance()` is called for a `Subscriber` account, THE `SavingsService` SHALL return the balance from the `BalanceCache` if a non-expired entry exists. +2. IF no `BalanceCache` entry exists for the requested account, THEN THE `SavingsService` SHALL fetch the balance from Horizon via `RpcClientWrapper.executeWithRetry()` and populate the cache before returning. +3. WHEN `SavingsService.getUserSavingsBalance()` is called, THE `SavingsService` SHALL use cached balances for any assets already present in the `BalanceCache`. +4. THE `BlockchainModule` SHALL register `BalanceSyncService` as a provider and export it so that other modules can subscribe accounts and read metrics. + +--- + +### Requirement 8: Configuration + +**User Story:** As a backend engineer, I want all tunable parameters of the sync service to be driven by `ConfigService`, so that they can be adjusted per environment without code changes. + +#### Acceptance Criteria + +1. THE `BalanceSyncService` SHALL read the following values from `ConfigService`: Horizon URL (`stellar.horizonUrl`), fallback Horizon URLs (`stellar.horizonFallbackUrls`), cache TTL in seconds, polling fallback interval in milliseconds, reconnect initial delay in milliseconds, reconnect maximum delay in milliseconds, and metrics persistence interval in milliseconds. +2. IF a required configuration value is absent, THEN THE `BalanceSyncService` SHALL use the documented default value and log a warning at startup. +3. THE `BalanceSyncService` SHALL validate that the polling fallback interval is greater than 0 ms and less than or equal to 60 000 ms at startup, logging an error and using the default if the value is out of range. diff --git a/.kiro/specs/realtime-balance-sync/tasks.md b/.kiro/specs/realtime-balance-sync/tasks.md new file mode 100644 index 000000000..c0998a665 --- /dev/null +++ b/.kiro/specs/realtime-balance-sync/tasks.md @@ -0,0 +1,181 @@ +# Implementation Plan: Realtime Balance Sync + +## Overview + +Implement `BalanceSyncService` inside the existing `BlockchainModule`, wiring a per-account Stellar Horizon SSE stream to a Redis cache and `@nestjs/event-emitter`. Integrate cache-aside reads into `SavingsService`, add connection health metrics to `ProtocolMetrics`, and expose a metrics summary through `BlockchainController`. + +## Tasks + +- [x] 1. Add fast-check dev dependency and extend configuration + - [x] 1.1 Install fast-check as a dev dependency + - Run `pnpm add -D fast-check` to add the property-based testing library + - _Requirements: 8.1_ + - [x] 1.2 Extend `backend/src/config/configuration.ts` with `balanceSync` config block + - Add `balanceSync` namespace with keys: `cacheTtlSeconds`, `pollIntervalMs`, `reconnectInitialDelayMs`, `reconnectMaxDelayMs`, `metricsPersistIntervalMs` + - Use env vars with documented defaults (300, 5000, 1000, 60000, 60000) + - _Requirements: 8.1, 8.2_ + +- [x] 2. Create core types and the `BalanceChangedEvent` + - [x] 2.1 Create `backend/src/modules/blockchain/balance-sync.types.ts` + - Define `BalanceChangedEvent` class with fields: `accountId`, `assetCode`, `previousBalance`, `newBalance`, `changedAt` + - Define `AccountMetrics`, `ConnectionMetricsSummary` interfaces + - Define `StreamHandle` interface (in-memory only) + - Export `BALANCE_CHANGED_EVENT = 'balance.changed'` constant + - _Requirements: 3.2, 6.1_ + - [ ]* 2.2 Write property test for `BalanceChangedEvent` field completeness (Property 3) + - `// Feature: realtime-balance-sync, Property 3: BalanceChangedEvent contains required fields` + - Generate random `accountId`, `assetCode`, `previousBalance`, `newBalance` strings; assert all fields are non-empty after construction + - **Property 3: BalanceChangedEvent contains required fields** + - **Validates: Requirements 3.2** + +- [x] 3. Implement `BalanceSyncService` — core structure and configuration validation + - [x] 3.1 Create `backend/src/modules/blockchain/balance-sync.service.ts` + - Scaffold `@Injectable() BalanceSyncService implements OnModuleInit, OnModuleDestroy` + - Inject `ConfigService`, `CACHE_MANAGER`, `EventEmitter2`, `RpcClientWrapper`, and the `ProtocolMetrics` repository + - In `onModuleInit`, read all `balanceSync` config values; log warnings for absent keys and use defaults + - Validate `pollIntervalMs`: if ≤ 0 or > 60 000, log error and substitute 5 000 + - Initialise the `Map` for account handles + - _Requirements: 8.1, 8.2, 8.3_ + - [ ]* 3.2 Write property test for configuration defaults (Property 9) + - `// Feature: realtime-balance-sync, Property 9: Configuration defaults applied on missing values` + - Generate absent/undefined config values; assert service resolves to documented defaults without throwing + - **Property 9: Configuration defaults applied on missing values** + - **Validates: Requirements 8.2** + - [ ]* 3.3 Write property test for poll interval validation (Property 10) + - `// Feature: realtime-balance-sync, Property 10: Poll interval validation rejects out-of-range values` + - Generate integers ≤ 0 or > 60 000; assert default 5 000 is substituted and error is logged + - **Property 10: Poll interval validation rejects out-of-range values** + - **Validates: Requirements 8.3** + +- [x] 4. Implement cache write helpers and `BalanceCache` integration + - [x] 4.1 Add private `writeBalanceToCache` method to `BalanceSyncService` + - Key pattern: `balance:{publicKey}:{assetCode}` + - Value: `JSON.stringify({ balance, updatedAt: new Date().toISOString() })` + - Set TTL from `cacheTtlSeconds` config + - On cache write failure: log at `warn` level and do not rethrow + - _Requirements: 2.1, 2.2, 2.3, 2.4_ + - [ ]* 4.2 Write property test for balance cache round-trip (Property 1) + - `// Feature: realtime-balance-sync, Property 1: Balance cache round-trip` + - Generate random public keys, asset codes, and balance strings; write via `writeBalanceToCache`; read back from cache mock; assert equality + - **Property 1: Balance cache round-trip** + - **Validates: Requirements 2.1, 2.5** + +- [x] 5. Implement balance-change detection and event emission + - [x] 5.1 Add private `processAccountUpdate` method to `BalanceSyncService` + - For each asset in the incoming account record, read the current cached balance + - If the new balance differs from cached, call `writeBalanceToCache` then emit `BalanceChangedEvent` via `EventEmitter2` + - If balance is unchanged, skip emission + - _Requirements: 3.1, 3.3, 3.4_ + - [ ]* 5.2 Write property test for only-changed-balances emit events (Property 2) + - `// Feature: realtime-balance-sync, Property 2: Only changed balances emit events` + - Generate sequences of balance updates (some identical, some different); assert event count equals number of actual changes + - **Property 2: Only changed balances emit events** + - **Validates: Requirements 3.1, 3.3** + - [ ]* 5.3 Write property test for per-asset event emission (Property 4) + - `// Feature: realtime-balance-sync, Property 4: Per-asset event emission` + - Generate account updates with N changed assets; assert exactly N `BalanceChangedEvent` instances emitted + - **Property 4: Per-asset event emission** + - **Validates: Requirements 3.4** + +- [x] 6. Implement stream subscription and reconnection logic + - [x] 6.1 Implement `subscribe(publicKey: string)` on `BalanceSyncService` + - Idempotent: no-op if already subscribed + - Open stream via `RpcClientWrapper.getCurrentHorizonServer().accounts().accountId(publicKey).stream()` + - On each message, call `processAccountUpdate`; record `connectedAt` and set `connected = true` + - On stream error/close, trigger reconnect loop and activate polling fallback + - Store `StreamHandle` in the map + - _Requirements: 1.1, 1.2, 1.3, 6.2_ + - [x] 6.2 Implement exponential back-off reconnect loop (private `scheduleReconnect`) + - Delay formula: `min(initialDelayMs * 2^attempt, maxDelayMs)` + - On success: log recovery, reset delay to initial, deactivate polling fallback, increment reconnect counter + - On failure: increment reconnect counter in `ConnectionMetrics`, schedule next attempt + - _Requirements: 4.1, 4.2, 4.3, 4.4_ + - [ ]* 6.3 Write property test for exponential back-off bounds (Property 5) + - `// Feature: realtime-balance-sync, Property 5: Exponential back-off bounds` + - Generate random attempt counts and config values; assert computed delay equals `min(init * 2^n, max)` and never exceeds `maxDelayMs` + - **Property 5: Exponential back-off bounds** + - **Validates: Requirements 4.1** + +- [x] 7. Implement polling fallback + - [x] 7.1 Add private `activatePollingFallback` and `deactivatePollingFallback` methods + - `activatePollingFallback`: set `pollTimer` using `setInterval`; each tick calls `RpcClientWrapper.executeWithRetry()` for the account, then calls `processAccountUpdate`; set `fallbackActive = true` on the handle's metrics + - `deactivatePollingFallback`: clear `pollTimer`; set `fallbackActive = false` + - Update the global `anyFallbackActive` flag in `getMetricsSummary()` + - _Requirements: 5.1, 5.2, 5.3, 5.4_ + - [ ]* 7.2 Write property test for fallback round-trip (Property 6) + - `// Feature: realtime-balance-sync, Property 6: Fallback activates on disconnect, deactivates on reconnect` + - Simulate disconnect then reconnect; assert `fallbackActive` transitions `false → true → false` + - **Property 6: Fallback round-trip** + - **Validates: Requirements 4.5, 5.2, 5.4** + +- [x] 8. Implement `unsubscribe`, `getMetricsSummary`, and `OnModuleDestroy` + - [x] 8.1 Implement `unsubscribe(publicKey: string)` on `BalanceSyncService` + - Idempotent: no-op if not subscribed + - Close stream, clear reconnect timer, clear poll timer, remove handle from map + - _Requirements: 1.2, 1.4_ + - [x] 8.2 Implement `getMetricsSummary(): ConnectionMetricsSummary` + - Aggregate `AccountMetrics` from all handles; compute `anyFallbackActive` and `totalReconnects` + - _Requirements: 6.1, 6.4_ + - [x] 8.3 Implement `onModuleDestroy` + - Iterate all handles; close streams and cancel all timers + - _Requirements: 1.4_ + - [x] 8.4 Write unit tests for `subscribe`, `unsubscribe`, and `onModuleDestroy` + - Assert `subscribe` opens a stream and stores the handle + - Assert `unsubscribe` closes the stream and removes the handle + - Assert `onModuleDestroy` closes all open streams and cancels all timers + - _Requirements: 1.2, 1.4_ + +- [x] 9. Checkpoint — Ensure all tests pass + - Ensure all tests pass, ask the user if questions arise. + +- [x] 10. Implement metrics persistence to `ProtocolMetrics` + - [x] 10.1 Extend `ProtocolMetrics` entity with `connectionMetrics` column + - Add `@Column('jsonb', { nullable: true }) connectionMetrics: ConnectionMetricsSummary | null` to `backend/src/modules/admin-analytics/entities/protocol-metrics.entity.ts` + - _Requirements: 6.5_ + - [x] 10.2 Add private `persistMetrics` method and schedule it in `onModuleInit` + - Use `setInterval` with `metricsPersistIntervalMs`; call `getMetricsSummary()` and upsert into `ProtocolMetrics` + - Cancel the interval in `onModuleDestroy` + - _Requirements: 6.5_ + +- [x] 11. Integrate `BalanceSyncService` into `BlockchainModule` + - [x] 11.1 Register and export `BalanceSyncService` in `BlockchainModule` + - Add to `providers` and `exports` arrays in `backend/src/modules/blockchain/blockchain.module.ts` + - Import `EventEmitterModule` if not already present; import `TypeOrmModule.forFeature([ProtocolMetrics])` if needed + - _Requirements: 7.4_ + - [x] 11.2 Expose `getMetricsSummary()` via `BlockchainController` + - Add a `GET /blockchain/balance-sync/metrics` endpoint that calls `balanceSyncService.getMetricsSummary()` + - _Requirements: 6.4_ + +- [x] 12. Integrate cache-aside reads into `SavingsService` + - [x] 12.1 Update `SavingsService.getWalletBalance()` with cache-aside logic + - Check `balance:{publicKey}:{asset ?? 'native'}` in cache; return parsed balance on hit + - On miss: fetch from Horizon (existing logic), write to cache, return balance + - _Requirements: 7.1, 7.2_ + - [x] 12.2 Update `SavingsService.getUserSavingsBalance()` with cache-aside logic + - For each asset, check cache before hitting Horizon + - _Requirements: 7.3_ + - [ ]* 12.3 Write property test for cache-aside hit (Property 7) + - `// Feature: realtime-balance-sync, Property 7: Cache-aside read returns cached value on hit` + - Seed cache mock with a value; call `getWalletBalance()`; assert Horizon mock not called + - **Property 7: Cache-aside read returns cached value on hit** + - **Validates: Requirements 7.1, 7.3** + - [ ]* 12.4 Write property test for cache-aside miss populates cache (Property 8) + - `// Feature: realtime-balance-sync, Property 8: Cache-aside populates cache on miss` + - Empty cache; call `getWalletBalance()`; assert cache contains an entry equal to the returned balance + - **Property 8: Cache-aside miss populates cache** + - **Validates: Requirements 7.2** + - [x] 12.5 Write unit tests for `SavingsService` cache-aside behaviour + - Assert cached value returned without calling Horizon + - Assert Horizon called and cache populated on miss + - _Requirements: 7.1, 7.2_ + +- [x] 13. Final checkpoint — Ensure all tests pass + - Ensure all tests pass, ask the user if questions arise. + +## Notes + +- Tasks marked with `*` are optional and can be skipped for a faster MVP +- Each task references specific requirements for traceability +- Property tests require `fast-check` (installed in task 1.1) and use Jest as the runner +- Each property test must include the comment `// Feature: realtime-balance-sync, Property N: ` +- Checkpoints ensure incremental validation before moving to the next phase diff --git a/backend/package.json b/backend/package.json index b744bb725..5a051bcc8 100644 --- a/backend/package.json +++ b/backend/package.json @@ -79,6 +79,7 @@ "eslint": "^9.39.3", "eslint-config-prettier": "^10.0.1", "eslint-plugin-prettier": "^5.2.2", + "fast-check": "^4.6.0", "globals": "^16.0.0", "jest": "^29.7.0", "pino-pretty": "^13.1.3", diff --git a/backend/pnpm-lock.yaml b/backend/pnpm-lock.yaml index 2eb5519a5..b70bb1f63 100644 --- a/backend/pnpm-lock.yaml +++ b/backend/pnpm-lock.yaml @@ -177,6 +177,9 @@ importers: eslint-plugin-prettier: specifier: ^5.2.2 version: 5.5.5(@types/eslint@9.6.1)(eslint-config-prettier@10.1.8(eslint@9.39.3(jiti@2.6.1)))(eslint@9.39.3(jiti@2.6.1))(prettier@3.8.1) + fast-check: + specifier: ^4.6.0 + version: 4.6.0 globals: specifier: ^16.0.0 version: 16.5.0 @@ -2634,6 +2637,10 @@ packages: extend-object@1.0.0: resolution: {integrity: sha512-0dHDIXC7y7LDmCh/lp1oYkmv73K25AMugQI07r8eFopkW6f7Ufn1q+ETMsJjnV9Am14SlElkqy3O92r6xEaxPw==} + fast-check@4.6.0: + resolution: {integrity: sha512-h7H6Dm0Fy+H4ciQYFxFjXnXkzR2kr9Fb22c0UBpHnm59K2zpr2t13aPTHlltFiNT6zuxp6HMPAVVvgur4BLdpA==} + engines: {node: '>=12.17.0'} + fast-copy@4.0.2: resolution: {integrity: sha512-ybA6PDXIXOXivLJK/z9e+Otk7ve13I4ckBvGO5I2RRmBU1gMHLVDJYEuJYhGwez7YNlYji2M2DvVU+a9mSFDlw==} @@ -4145,6 +4152,9 @@ packages: pure-rand@6.1.0: resolution: {integrity: sha512-bVWawvoZoBYpp6yIoQtQXHZjmz35RSVHnUOTefl8Vcjr8snTPY1wnpSPMWekcFwbxI6gtmT7rSYPFvz71ldiOA==} + pure-rand@8.4.0: + resolution: {integrity: sha512-IoM8YF/jY0hiugFo/wOWqfmarlE6J0wc6fDK1PhftMk7MGhVZl88sZimmqBBFomLOCSmcCCpsfj7wXASCpvK9A==} + qs@6.15.0: resolution: {integrity: sha512-mAZTtNCeetKMH+pSjrb76NAM8V9a05I9aBZOHztWy/UqcJdQYNsf59vrRKWnojAT9Y+GbIvoTBC++CPHqpDBhQ==} engines: {node: '>=0.6'} @@ -7711,6 +7721,10 @@ snapshots: extend-object@1.0.0: optional: true + fast-check@4.6.0: + dependencies: + pure-rand: 8.4.0 + fast-copy@4.0.2: {} fast-deep-equal@3.1.3: {} @@ -9727,6 +9741,8 @@ snapshots: pure-rand@6.1.0: {} + pure-rand@8.4.0: {} + qs@6.15.0: dependencies: side-channel: 1.1.0 diff --git a/backend/src/config/configuration.ts b/backend/src/config/configuration.ts index f4b573c89..d95615f3b 100644 --- a/backend/src/config/configuration.ts +++ b/backend/src/config/configuration.ts @@ -78,4 +78,11 @@ export default () => ({ 10, ), }, + balanceSync: { + cacheTtlSeconds: parseInt(process.env.BALANCE_CACHE_TTL_SECONDS || '300', 10), + pollIntervalMs: parseInt(process.env.BALANCE_POLL_INTERVAL_MS || '5000', 10), + reconnectInitialDelayMs: parseInt(process.env.BALANCE_RECONNECT_INIT_MS || '1000', 10), + reconnectMaxDelayMs: parseInt(process.env.BALANCE_RECONNECT_MAX_MS || '60000', 10), + metricsPersistIntervalMs: parseInt(process.env.BALANCE_METRICS_PERSIST_MS || '60000', 10), + }, }); diff --git a/backend/src/modules/admin-analytics/entities/protocol-metrics.entity.ts b/backend/src/modules/admin-analytics/entities/protocol-metrics.entity.ts index 8519c97f9..cfda1847b 100644 --- a/backend/src/modules/admin-analytics/entities/protocol-metrics.entity.ts +++ b/backend/src/modules/admin-analytics/entities/protocol-metrics.entity.ts @@ -27,6 +27,9 @@ export class ProtocolMetrics { @Column('jsonb', { nullable: true }) productBreakdown: Record | null; + @Column('jsonb', { nullable: true }) + connectionMetrics: Record | null; + @CreateDateColumn() createdAt: Date; } diff --git a/backend/src/modules/blockchain/balance-sync.service.spec.ts b/backend/src/modules/blockchain/balance-sync.service.spec.ts new file mode 100644 index 000000000..3189b90b1 --- /dev/null +++ b/backend/src/modules/blockchain/balance-sync.service.spec.ts @@ -0,0 +1,189 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { ConfigService } from '@nestjs/config'; +import { CACHE_MANAGER } from '@nestjs/cache-manager'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { getRepositoryToken } from '@nestjs/typeorm'; +import { BalanceSyncService } from './balance-sync.service'; +import { StellarService } from './stellar.service'; +import { ProtocolMetrics } from '../admin-analytics/entities/protocol-metrics.entity'; + +// Requirements: 1.2, 1.4 + +const MOCK_PUBLIC_KEY_A = 'GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN'; +const MOCK_PUBLIC_KEY_B = 'GBCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC'; + +function buildConfigValues(): Record { + return { + 'balanceSync.cacheTtlSeconds': 300, + 'balanceSync.pollIntervalMs': 5000, + 'balanceSync.reconnectInitialDelayMs': 1000, + 'balanceSync.reconnectMaxDelayMs': 60000, + 'balanceSync.metricsPersistIntervalMs': 60000, + }; +} + +function makeStreamMock() { + const closeFn = jest.fn(); + // Returns a close function, simulating the Stellar SDK stream() API + const streamFn = jest.fn().mockReturnValue(closeFn); + return { closeFn, streamFn }; +} + +function buildHorizonServerMock(streamFn: jest.Mock) { + return { + accounts: jest.fn().mockReturnValue({ + accountId: jest.fn().mockReturnValue({ + stream: streamFn, + call: jest.fn().mockResolvedValue({ account_id: MOCK_PUBLIC_KEY_A, balances: [] }), + }), + }), + }; +} + +describe('BalanceSyncService', () => { + let service: BalanceSyncService; + let closeFnA: jest.Mock; + let closeFnB: jest.Mock; + let streamFnA: jest.Mock; + + async function buildModule(streamFn?: jest.Mock) { + const { closeFn, streamFn: defaultStreamFn } = makeStreamMock(); + closeFnA = closeFn; + streamFnA = streamFn ?? defaultStreamFn; + + const horizonServerMock = buildHorizonServerMock(streamFnA); + + const configValues = buildConfigValues(); + const mockConfigService = { + get: jest.fn((key: string) => configValues[key]), + }; + + const mockCacheManager = { + get: jest.fn().mockResolvedValue(null), + set: jest.fn().mockResolvedValue(undefined), + }; + + const mockEventEmitter = { + emit: jest.fn(), + }; + + const mockStellarService = { + getHorizonServer: jest.fn().mockReturnValue(horizonServerMock), + }; + + const mockProtocolMetricsRepo = { + findOne: jest.fn().mockResolvedValue(null), + save: jest.fn().mockResolvedValue({}), + }; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + BalanceSyncService, + { provide: ConfigService, useValue: mockConfigService }, + { provide: CACHE_MANAGER, useValue: mockCacheManager }, + { provide: EventEmitter2, useValue: mockEventEmitter }, + { provide: StellarService, useValue: mockStellarService }, + { + provide: getRepositoryToken(ProtocolMetrics), + useValue: mockProtocolMetricsRepo, + }, + ], + }).compile(); + + service = module.get(BalanceSyncService); + // Trigger onModuleInit to load config + service.onModuleInit(); + + return module; + } + + afterEach(() => { + jest.clearAllMocks(); + }); + + describe('subscribe()', () => { + it('opens a stream and stores the handle', async () => { + await buildModule(); + + service.subscribe(MOCK_PUBLIC_KEY_A); + + const summary = service.getMetricsSummary(); + expect(summary.accounts).toHaveLength(1); + expect(summary.accounts[0].publicKey).toBe(MOCK_PUBLIC_KEY_A); + expect(streamFnA).toHaveBeenCalledTimes(1); + }); + + it('is idempotent — calling twice does not open two streams', async () => { + await buildModule(); + + service.subscribe(MOCK_PUBLIC_KEY_A); + service.subscribe(MOCK_PUBLIC_KEY_A); + + expect(streamFnA).toHaveBeenCalledTimes(1); + expect(service.getMetricsSummary().accounts).toHaveLength(1); + }); + }); + + describe('unsubscribe()', () => { + it('closes the stream and removes the handle', async () => { + await buildModule(); + + service.subscribe(MOCK_PUBLIC_KEY_A); + service.unsubscribe(MOCK_PUBLIC_KEY_A); + + expect(closeFnA).toHaveBeenCalledTimes(1); + expect(service.getMetricsSummary().accounts).toHaveLength(0); + }); + + it('is idempotent — calling on unknown key does not throw', async () => { + await buildModule(); + + expect(() => service.unsubscribe('UNKNOWN_KEY')).not.toThrow(); + }); + }); + + describe('onModuleDestroy()', () => { + it('closes all open streams and cancels all timers', async () => { + // Build a module where each accountId gets its own close function + const closeFnForA = jest.fn(); + const closeFnForB = jest.fn(); + let callCount = 0; + const multiStreamFn = jest.fn().mockImplementation(() => { + callCount++; + return callCount === 1 ? closeFnForA : closeFnForB; + }); + + const horizonServerMock = buildHorizonServerMock(multiStreamFn); + + const configValues = buildConfigValues(); + const mockConfigService = { + get: jest.fn((key: string) => configValues[key]), + }; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + BalanceSyncService, + { provide: ConfigService, useValue: mockConfigService }, + { provide: CACHE_MANAGER, useValue: { get: jest.fn().mockResolvedValue(null), set: jest.fn() } }, + { provide: EventEmitter2, useValue: { emit: jest.fn() } }, + { provide: StellarService, useValue: { getHorizonServer: jest.fn().mockReturnValue(horizonServerMock) } }, + { provide: getRepositoryToken(ProtocolMetrics), useValue: { findOne: jest.fn(), save: jest.fn() } }, + ], + }).compile(); + + service = module.get(BalanceSyncService); + service.onModuleInit(); + + service.subscribe(MOCK_PUBLIC_KEY_A); + service.subscribe(MOCK_PUBLIC_KEY_B); + + expect(service.getMetricsSummary().accounts).toHaveLength(2); + + service.onModuleDestroy(); + + expect(closeFnForA).toHaveBeenCalledTimes(1); + expect(closeFnForB).toHaveBeenCalledTimes(1); + expect(service.getMetricsSummary().accounts).toHaveLength(0); + }); + }); +}); diff --git a/backend/src/modules/blockchain/balance-sync.service.ts b/backend/src/modules/blockchain/balance-sync.service.ts new file mode 100644 index 000000000..4b42e67b6 --- /dev/null +++ b/backend/src/modules/blockchain/balance-sync.service.ts @@ -0,0 +1,421 @@ +import { Injectable, Logger, OnModuleDestroy, OnModuleInit, Inject } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository } from 'typeorm'; +import { CACHE_MANAGER } from '@nestjs/cache-manager'; +import { Cache } from 'cache-manager'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { Horizon } from '@stellar/stellar-sdk'; +import { StellarService } from './stellar.service'; +import { ProtocolMetrics } from '../admin-analytics/entities/protocol-metrics.entity'; +import { + BalanceChangedEvent, + BALANCE_CHANGED_EVENT, + ConnectionMetricsSummary, + StreamHandle, +} from './balance-sync.types'; + +const CONFIG_DEFAULTS = { + cacheTtlSeconds: 300, + pollIntervalMs: 5000, + reconnectInitialDelayMs: 1000, + reconnectMaxDelayMs: 60000, + metricsPersistIntervalMs: 60000, +} as const; + +@Injectable() +export class BalanceSyncService implements OnModuleInit, OnModuleDestroy { + private readonly logger = new Logger(BalanceSyncService.name); + + private handles: Map = new Map(); + + private cacheTtlSeconds: number; + private pollIntervalMs: number; + private reconnectInitialDelayMs: number; + private reconnectMaxDelayMs: number; + private metricsPersistIntervalMs: number; + private metricsTimer: NodeJS.Timeout | null = null; + + constructor( + private readonly configService: ConfigService, + @Inject(CACHE_MANAGER) private readonly cacheManager: Cache, + private readonly eventEmitter: EventEmitter2, + private readonly stellarService: StellarService, + @InjectRepository(ProtocolMetrics) + private readonly protocolMetricsRepo: Repository, + ) {} + + onModuleInit(): void { + this.cacheTtlSeconds = this.resolveConfig( + 'balanceSync.cacheTtlSeconds', + CONFIG_DEFAULTS.cacheTtlSeconds, + ); + + this.pollIntervalMs = this.resolveConfig( + 'balanceSync.pollIntervalMs', + CONFIG_DEFAULTS.pollIntervalMs, + ); + + this.reconnectInitialDelayMs = this.resolveConfig( + 'balanceSync.reconnectInitialDelayMs', + CONFIG_DEFAULTS.reconnectInitialDelayMs, + ); + + this.reconnectMaxDelayMs = this.resolveConfig( + 'balanceSync.reconnectMaxDelayMs', + CONFIG_DEFAULTS.reconnectMaxDelayMs, + ); + + this.metricsPersistIntervalMs = this.resolveConfig( + 'balanceSync.metricsPersistIntervalMs', + CONFIG_DEFAULTS.metricsPersistIntervalMs, + ); + + // Validate pollIntervalMs range (Requirement 8.3) + if (this.pollIntervalMs <= 0 || this.pollIntervalMs > 60000) { + this.logger.error( + `pollIntervalMs value ${this.pollIntervalMs} is out of range (must be > 0 and <= 60000). ` + + `Substituting default: ${CONFIG_DEFAULTS.pollIntervalMs}`, + ); + this.pollIntervalMs = CONFIG_DEFAULTS.pollIntervalMs; + } + + this.logger.log( + `BalanceSyncService initialised with config: ` + + `cacheTtlSeconds=${this.cacheTtlSeconds}, ` + + `pollIntervalMs=${this.pollIntervalMs}, ` + + `reconnectInitialDelayMs=${this.reconnectInitialDelayMs}, ` + + `reconnectMaxDelayMs=${this.reconnectMaxDelayMs}, ` + + `metricsPersistIntervalMs=${this.metricsPersistIntervalMs}`, + ); + + this.metricsTimer = setInterval(() => { + void this.persistMetrics(); + }, this.metricsPersistIntervalMs); + } + + onModuleDestroy(): void { + this.logger.log('BalanceSyncService destroying'); + + if (this.metricsTimer !== null) { + clearInterval(this.metricsTimer); + this.metricsTimer = null; + } + + const accountCount = this.handles.size; + for (const publicKey of this.handles.keys()) { + this.unsubscribe(publicKey); + } + + this.logger.log(`Cleaned up ${accountCount} account(s)`); + } + + subscribe(publicKey: string): void { + if (this.handles.has(publicKey)) { + this.logger.debug(`Already subscribed to account ${publicKey}, skipping`); + return; + } + + const handle: StreamHandle = { + close: () => {}, + connected: false, + reconnect: { + delayMs: this.reconnectInitialDelayMs, + attempt: 0, + timer: null, + }, + pollTimer: null, + metrics: { + publicKey, + streamUptimeSeconds: 0, + reconnectCount: 0, + fallbackActive: false, + connectedAt: null, + }, + }; + + this.handles.set(publicKey, handle); + this.openStream(publicKey); + } + + unsubscribe(publicKey: string): void { + if (!this.handles.has(publicKey)) { + this.logger.debug(`unsubscribe called for unknown account ${publicKey}, skipping`); + return; + } + + const handle = this.handles.get(publicKey)!; + + handle.close(); + + if (handle.reconnect.timer !== null) { + clearTimeout(handle.reconnect.timer); + } + + this.deactivatePollingFallback(publicKey); + this.handles.delete(publicKey); + + this.logger.log(`Unsubscribed account ${publicKey}`); + } + + getMetricsSummary(): ConnectionMetricsSummary { + const accounts = Array.from(this.handles.entries()).map(([, handle]) => { + const streamUptimeSeconds = + handle.connected && handle.metrics.connectedAt + ? Math.floor((Date.now() - handle.metrics.connectedAt.getTime()) / 1000) + : handle.metrics.streamUptimeSeconds; + + return { + ...handle.metrics, + streamUptimeSeconds, + }; + }); + + const anyFallbackActive = accounts.some((a) => a.fallbackActive); + const totalReconnects = accounts.reduce((sum, a) => sum + a.reconnectCount, 0); + + return { accounts, anyFallbackActive, totalReconnects }; + } + + /** + * Process an incoming account record from the Horizon stream. + * For each asset balance, compare against the cached value and emit + * a BalanceChangedEvent if the balance has changed (Requirements 3.1, 3.3, 3.4). + */ + private async processAccountUpdate(accountRecord: Horizon.AccountResponse): Promise { + const accountId = accountRecord.account_id; + + for (const balance of accountRecord.balances) { + const assetCode = + balance.asset_type === 'native' + ? 'native' + : (balance as Horizon.HorizonApi.BalanceLineAsset).asset_code; + + const newBalance = balance.balance; + const previousBalance = await this.readBalanceFromCache(accountId, assetCode); + + if (newBalance !== previousBalance) { + await this.writeBalanceToCache(accountId, assetCode, newBalance); + + const event = new BalanceChangedEvent(); + event.accountId = accountId; + event.assetCode = assetCode; + event.previousBalance = previousBalance ?? '0'; + event.newBalance = newBalance; + event.changedAt = new Date(); + + this.eventEmitter.emit(BALANCE_CHANGED_EVENT, event); + } + } + } + + /** + * Write a balance entry to the cache. + * Key: `balance:{publicKey}:{assetCode}` + * On any error: log at warn level and do not rethrow (Requirement 2.4). + */ + private async writeBalanceToCache( + publicKey: string, + assetCode: string, + balance: string, + ): Promise { + const key = `balance:${publicKey}:${assetCode}`; + try { + const value = JSON.stringify({ balance, updatedAt: new Date().toISOString() }); + const ttl = this.cacheTtlSeconds * 1000; // cache-manager uses milliseconds + await this.cacheManager.set(key, value, ttl); + } catch (err) { + this.logger.warn( + `Failed to write balance cache for key "${key}": ${(err as Error).message}`, + ); + } + } + + /** + * Read a balance entry from the cache. + * Returns the `balance` field from the stored JSON, or null on miss/error. + */ + private async readBalanceFromCache( + publicKey: string, + assetCode: string, + ): Promise { + const key = `balance:${publicKey}:${assetCode}`; + try { + const result = await this.cacheManager.get(key); + if (typeof result === 'string') { + const parsed = JSON.parse(result) as { balance: string }; + return parsed.balance; + } + return null; + } catch (err) { + this.logger.warn( + `Failed to read balance cache for key "${key}": ${(err as Error).message}`, + ); + return null; + } + } + + /** + * Open a Horizon SSE stream for the given account. + * Stores the SDK-returned close function on the handle. + * Requirements: 1.1, 1.2, 1.3 + */ + private openStream(publicKey: string): void { + const handle = this.handles.get(publicKey); + if (!handle) return; + + const horizonServer = this.stellarService.getHorizonServer(); + + const closeStream = horizonServer + .accounts() + .accountId(publicKey) + .stream({ + onmessage: (accountRecord) => { + handle.connected = true; + handle.metrics.connectedAt = handle.metrics.connectedAt ?? new Date(); + this.processAccountUpdate(accountRecord as unknown as Horizon.AccountResponse).catch((err) => + this.logger.error( + `Error processing account update for ${publicKey}: ${(err as Error).message}`, + ), + ); + }, + onerror: (err) => { + const message = err instanceof Error ? err.message : String(err); + this.logger.warn(`Stream error for ${publicKey}: ${message}`); + handle.connected = false; + this.scheduleReconnect(publicKey); + this.activatePollingFallback(publicKey); + }, + }); + + handle.close = closeStream; + this.logger.log(`Opened Horizon SSE stream for account ${publicKey}`); + } + + /** + * Schedule an exponential back-off reconnect attempt for the given account. + * Requirements: 4.1, 4.2, 4.3, 4.4 + */ + private scheduleReconnect(publicKey: string): void { + const handle = this.handles.get(publicKey); + if (!handle) return; + + // Already scheduled — don't double-schedule + if (handle.reconnect.timer !== null) return; + + const delay = Math.min( + this.reconnectInitialDelayMs * Math.pow(2, handle.reconnect.attempt), + this.reconnectMaxDelayMs, + ); + handle.reconnect.delayMs = delay; + + this.logger.log( + `Scheduling reconnect for ${publicKey}: attempt ${handle.reconnect.attempt + 1}, delay ${delay} ms`, + ); + + handle.reconnect.timer = setTimeout(() => { + handle.reconnect.timer = null; + handle.reconnect.attempt++; + handle.metrics.reconnectCount++; + + try { + this.openStream(publicKey); + // Stream opened without throwing — reset back-off state + handle.reconnect.attempt = 0; + handle.reconnect.delayMs = this.reconnectInitialDelayMs; + this.logger.log(`Stream recovered for account ${publicKey}`); + this.deactivatePollingFallback(publicKey); + } catch (err) { + this.logger.error( + `Reconnect attempt failed for ${publicKey}: ${(err as Error).message}`, + ); + this.scheduleReconnect(publicKey); + } + }, delay); + } + + /** + * Activate the polling fallback for an account whose stream is down. + * Requirements: 5.1, 5.2, 5.3, 5.4 + */ + private activatePollingFallback(publicKey: string): void { + const handle = this.handles.get(publicKey); + if (!handle) return; + + // Idempotent — already polling + if (handle.pollTimer !== null) return; + + handle.metrics.fallbackActive = true; + this.logger.log(`Activating polling fallback for account ${publicKey}`); + + handle.pollTimer = setInterval(async () => { + try { + const horizonServer = this.stellarService.getHorizonServer(); + const account = await horizonServer.accounts().accountId(publicKey).call(); + await this.processAccountUpdate(account as unknown as Horizon.AccountResponse); + } catch (err) { + this.logger.warn( + `Polling fallback error for ${publicKey}: ${(err as Error).message}`, + ); + } + }, this.pollIntervalMs); + } + + /** + * Deactivate the polling fallback once the stream is re-established. + * Requirements: 5.1, 5.2, 5.3, 5.4 + */ + private deactivatePollingFallback(publicKey: string): void { + const handle = this.handles.get(publicKey); + if (!handle) return; + + // Idempotent — not polling + if (handle.pollTimer === null) return; + + clearInterval(handle.pollTimer); + handle.pollTimer = null; + handle.metrics.fallbackActive = false; + this.logger.log(`Polling fallback deactivated for account ${publicKey}`); + } + + /** + * Persist the current connection metrics snapshot to ProtocolMetrics. + * Upserts into the most recent record, or creates a minimal one if none exists. + * Requirements: 6.5 + */ + private async persistMetrics(): Promise { + try { + const summary = this.getMetricsSummary(); + let record = await this.protocolMetricsRepo.findOne({ where: {}, order: { createdAt: 'DESC' } }); + if (record) { + record.connectionMetrics = summary as any; + await this.protocolMetricsRepo.save(record); + } else { + const newRecord = this.protocolMetricsRepo.create({ + snapshotDate: new Date(), + totalValueLockedUsd: 0, + totalValueLockedXlm: 0, + savingsProductCount: 0, + connectionMetrics: summary as any, + }); + await this.protocolMetricsRepo.save(newRecord); + } + } catch (err) { + this.logger.warn(`Failed to persist connection metrics: ${(err as Error).message}`); + } + } + + /** + * Resolve a config value, logging a warning and using the default if absent. + */ + private resolveConfig(key: string, defaultValue: T): T { + const value = this.configService.get(key); + if (value === undefined || value === null) { + this.logger.warn( + `Config key "${key}" is absent. Using default: ${defaultValue}`, + ); + return defaultValue; + } + return value; + } +} diff --git a/backend/src/modules/blockchain/balance-sync.types.ts b/backend/src/modules/blockchain/balance-sync.types.ts new file mode 100644 index 000000000..c2be99c51 --- /dev/null +++ b/backend/src/modules/blockchain/balance-sync.types.ts @@ -0,0 +1,61 @@ +export const BALANCE_CHANGED_EVENT = 'balance.changed'; + +/** + * Emitted via @nestjs/event-emitter whenever a balance change is detected + * for a subscribed Stellar account. + */ +export class BalanceChangedEvent { + /** Stellar G... public key of the account */ + accountId: string; + /** Asset code: 'native' for XLM, otherwise the asset code string */ + assetCode: string; + /** Balance before this update (string to avoid float precision loss) */ + previousBalance: string; + /** Balance after this update */ + newBalance: string; + /** UTC timestamp of the change */ + changedAt: Date; +} + +/** + * Per-account connection health counters tracked by BalanceSyncService. + * Requirement 6.1 + */ +export interface AccountMetrics { + publicKey: string; + streamUptimeSeconds: number; + reconnectCount: number; + fallbackActive: boolean; + connectedAt: Date | null; +} + +/** + * Aggregated metrics summary returned by getMetricsSummary(). + * Requirement 6.1, 6.4 + */ +export interface ConnectionMetricsSummary { + accounts: AccountMetrics[]; + anyFallbackActive: boolean; + totalReconnects: number; +} + +/** + * In-memory handle for a single subscribed account's stream state. + * Never persisted — lives only in the BalanceSyncService Map. + */ +export interface StreamHandle { + /** Close function returned by Stellar SDK stream() */ + close: () => void; + /** Whether the stream is currently considered connected */ + connected: boolean; + /** Reconnect back-off state */ + reconnect: { + delayMs: number; + attempt: number; + timer: NodeJS.Timeout | null; + }; + /** Polling fallback interval handle */ + pollTimer: NodeJS.Timeout | null; + /** Metrics for this account */ + metrics: AccountMetrics; +} diff --git a/backend/src/modules/blockchain/blockchain.controller.ts b/backend/src/modules/blockchain/blockchain.controller.ts index 18e004119..c2aac7df7 100644 --- a/backend/src/modules/blockchain/blockchain.controller.ts +++ b/backend/src/modules/blockchain/blockchain.controller.ts @@ -1,12 +1,16 @@ import { Controller, Get, Param, Post } from '@nestjs/common'; import { ApiOperation, ApiParam, ApiResponse, ApiTags } from '@nestjs/swagger'; import { StellarService } from './stellar.service'; +import { BalanceSyncService } from './balance-sync.service'; import { TransactionDto } from './dto/transaction.dto'; @ApiTags('Blockchain') @Controller('blockchain') export class BlockchainController { - constructor(private readonly stellarService: StellarService) {} + constructor( + private readonly stellarService: StellarService, + private readonly balanceSyncService: BalanceSyncService, + ) {} @Post('wallets/generate') @ApiOperation({ summary: 'Generate a new Stellar keypair' }) @@ -48,4 +52,11 @@ export class BlockchainController { getRpcStatus() { return this.stellarService.getEndpointsStatus(); } + + @Get('balance-sync/metrics') + @ApiOperation({ summary: 'Get WebSocket connection health metrics for balance sync' }) + @ApiResponse({ status: 200, description: 'Connection metrics summary for all subscribed accounts' }) + getBalanceSyncMetrics() { + return this.balanceSyncService.getMetricsSummary(); + } } diff --git a/backend/src/modules/blockchain/blockchain.module.ts b/backend/src/modules/blockchain/blockchain.module.ts index db9b78c91..981da5701 100644 --- a/backend/src/modules/blockchain/blockchain.module.ts +++ b/backend/src/modules/blockchain/blockchain.module.ts @@ -20,6 +20,8 @@ import { DepositHandler } from './event-handlers/deposit.handler'; import { WithdrawHandler } from './event-handlers/withdraw.handler'; import { YieldHandler } from './event-handlers/yield.handler'; import { IndexerService } from './indexer.service'; +import { BalanceSyncService } from './balance-sync.service'; +import { ProtocolMetrics } from '../admin-analytics/entities/protocol-metrics.entity'; @Global() @Module({ @@ -38,6 +40,7 @@ import { IndexerService } from './indexer.service'; User, UserSubscription, SavingsProduct, + ProtocolMetrics, ]), ], controllers: [BlockchainController, StellarEventListenerController], @@ -50,6 +53,7 @@ import { IndexerService } from './indexer.service'; DepositHandler, WithdrawHandler, YieldHandler, + BalanceSyncService, ], exports: [ StellarService, @@ -60,6 +64,7 @@ import { IndexerService } from './indexer.service'; DepositHandler, WithdrawHandler, YieldHandler, + BalanceSyncService, ], }) export class BlockchainModule {} diff --git a/backend/src/modules/blockchain/savings.service.spec.ts b/backend/src/modules/blockchain/savings.service.spec.ts new file mode 100644 index 000000000..dfb7bff03 --- /dev/null +++ b/backend/src/modules/blockchain/savings.service.spec.ts @@ -0,0 +1,122 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { CACHE_MANAGER } from '@nestjs/cache-manager'; +import { SavingsService } from './savings.service'; +import { StellarService } from './stellar.service'; + +// Requirements: 7.1, 7.2, 7.3 + +const MOCK_PUBLIC_KEY = 'GAAZI4TCR3TY5OJHCTJC2A4QSY6CJWJH5IAJTGKIN2ER7LBNVKOCCWN'; + +function buildAccountMock(nativeBalance: string) { + return { + balances: [{ balance: nativeBalance, asset_type: 'native' }], + }; +} + +async function buildModule( + cacheGetImpl: jest.Mock, + cacheSetImpl: jest.Mock, + horizonCallImpl: jest.Mock, +): Promise { + const mockCacheManager = { + get: cacheGetImpl, + set: cacheSetImpl, + }; + + const mockHorizonServer = { + accounts: jest.fn().mockReturnValue({ + accountId: jest.fn().mockReturnValue({ + call: horizonCallImpl, + }), + }), + }; + + const mockStellarService = { + getHorizonServer: jest.fn().mockReturnValue(mockHorizonServer), + }; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + SavingsService, + { provide: StellarService, useValue: mockStellarService }, + { provide: CACHE_MANAGER, useValue: mockCacheManager }, + ], + }).compile(); + + return module.get(SavingsService); +} + +describe('SavingsService', () => { + afterEach(() => jest.clearAllMocks()); + + describe('getWalletBalance()', () => { + it('returns cached value without calling Horizon on cache hit', async () => { + const cachedPayload = JSON.stringify({ + balance: '100.0000000', + updatedAt: '2024-01-01T00:00:00.000Z', + }); + const cacheGet = jest.fn().mockResolvedValue(cachedPayload); + const cacheSet = jest.fn().mockResolvedValue(undefined); + const horizonCall = jest.fn(); + + const service = await buildModule(cacheGet, cacheSet, horizonCall); + const result = await service.getWalletBalance(MOCK_PUBLIC_KEY, 'native'); + + expect(result).toBe(1_000_000_000); // 100 XLM * 10_000_000 + expect(horizonCall).not.toHaveBeenCalled(); + }); + + it('calls Horizon and populates cache on cache miss', async () => { + const cacheGet = jest.fn().mockResolvedValue(null); + const cacheSet = jest.fn().mockResolvedValue(undefined); + const horizonCall = jest + .fn() + .mockResolvedValue(buildAccountMock('50.0000000')); + + const service = await buildModule(cacheGet, cacheSet, horizonCall); + const result = await service.getWalletBalance(MOCK_PUBLIC_KEY, 'native'); + + expect(result).toBe(500_000_000); // 50 XLM * 10_000_000 + expect(horizonCall).toHaveBeenCalledTimes(1); + expect(cacheSet).toHaveBeenCalledWith( + `balance:${MOCK_PUBLIC_KEY}:native`, + expect.stringContaining('"balance":"50.0000000"'), + 300_000, + ); + }); + }); + + describe('getUserSavingsBalance()', () => { + it('returns cached native balance without calling Horizon on cache hit', async () => { + const cachedPayload = JSON.stringify({ + balance: '200.0000000', + updatedAt: '2024-01-01T00:00:00.000Z', + }); + const cacheGet = jest.fn().mockResolvedValue(cachedPayload); + const cacheSet = jest.fn().mockResolvedValue(undefined); + const horizonCall = jest.fn(); + + const service = await buildModule(cacheGet, cacheSet, horizonCall); + const result = await service.getUserSavingsBalance(MOCK_PUBLIC_KEY); + + expect(result.flexible).toBe(2_000_000_000); // 200 XLM * 10_000_000 + expect(result.locked).toBe(0); + expect(result.total).toBe(2_000_000_000); + expect(horizonCall).not.toHaveBeenCalled(); + }); + + it('falls through to Horizon on cache miss', async () => { + const cacheGet = jest.fn().mockResolvedValue(null); + const cacheSet = jest.fn().mockResolvedValue(undefined); + const horizonCall = jest + .fn() + .mockResolvedValue(buildAccountMock('10.0000000')); + + const service = await buildModule(cacheGet, cacheSet, horizonCall); + const result = await service.getUserSavingsBalance(MOCK_PUBLIC_KEY); + + expect(horizonCall).toHaveBeenCalledTimes(1); + expect(result).toEqual({ flexible: 0, locked: 0, total: 0 }); + }); + }); +}); diff --git a/backend/src/modules/blockchain/savings.service.ts b/backend/src/modules/blockchain/savings.service.ts index 009b0f9d9..5484c5193 100644 --- a/backend/src/modules/blockchain/savings.service.ts +++ b/backend/src/modules/blockchain/savings.service.ts @@ -1,4 +1,6 @@ -import { Injectable, Logger } from '@nestjs/common'; +import { Injectable, Logger, Inject } from '@nestjs/common'; +import { CACHE_MANAGER } from '@nestjs/cache-manager'; +import { Cache } from 'cache-manager'; import { Account, Address, @@ -21,7 +23,10 @@ export interface SavingsBalance { export class SavingsService { private readonly logger = new Logger(SavingsService.name); - constructor(private readonly stellarService: StellarService) {} + constructor( + private readonly stellarService: StellarService, + @Inject(CACHE_MANAGER) private readonly cacheManager: Cache, + ) {} /** * Fetch total assets from a Soroban vault contract @@ -63,17 +68,37 @@ export class SavingsService { * @returns Object containing flexible, locked, and total savings balances */ async getUserSavingsBalance(publicKey: string): Promise { + // Check cache for native balance first + const cacheKey = `balance:${publicKey}:native`; + try { + const cached = await this.cacheManager.get(cacheKey); + if (typeof cached === 'string') { + try { + const parsed = JSON.parse(cached) as { balance: string }; + const flexibleBalance = Math.floor( + parseFloat(parsed.balance) * 10_000_000, + ); + return { + flexible: flexibleBalance, + locked: 0, + total: flexibleBalance, + }; + } catch { + // fall through to Horizon + } + } + } catch (cacheErr) { + this.logger.warn( + `Cache read failed for ${cacheKey}: ${(cacheErr as Error).message}`, + ); + } + try { const horizonServer = this.stellarService.getHorizonServer(); // Fetch account to get current state - const account = await horizonServer - .accounts() - .accountId(publicKey) - .call(); + await horizonServer.accounts().accountId(publicKey).call(); - // For now, return a structure based on available data - // In a production system, this would query the actual Soroban contract const flexibleBalance = 0; const lockedBalance = 0; @@ -84,7 +109,7 @@ export class SavingsService { }; } catch (error) { this.logger.warn( - `Could not fetch savings for ${publicKey}: ${error.message}`, + `Could not fetch savings for ${publicKey}: ${(error as Error).message}`, ); return { flexible: 0, @@ -156,6 +181,25 @@ export class SavingsService { publicKey: string, asset: string = 'native', ): Promise { + const cacheKey = `balance:${publicKey}:${asset ?? 'native'}`; + + // Cache-aside: check cache first + try { + const cached = await this.cacheManager.get(cacheKey); + if (typeof cached === 'string') { + try { + const parsed = JSON.parse(cached) as { balance: string }; + return Math.floor(parseFloat(parsed.balance) * 10_000_000); + } catch { + // fall through to Horizon + } + } + } catch (cacheErr) { + this.logger.warn( + `Cache read failed for ${cacheKey}: ${(cacheErr as Error).message}`, + ); + } + try { const horizonServer = this.stellarService.getHorizonServer(); const account = await horizonServer @@ -163,24 +207,42 @@ export class SavingsService { .accountId(publicKey) .call(); + let balance: number; + if (asset === 'native') { // Return native balance in stroops (1 XLM = 10,000,000 stroops) - return Math.floor(parseFloat(account.balances[0].balance) * 10_000_000); + balance = Math.floor( + parseFloat(account.balances[0].balance) * 10_000_000, + ); + } else { + // Find specific asset balance + const assetBalance = account.balances.find( + (b) => 'asset_code' in b && b.asset_code === asset, + ); + balance = + assetBalance && 'balance' in assetBalance + ? Math.floor(parseFloat(assetBalance.balance) * 10_000_000) + : 0; } - // Find specific asset balance - const assetBalance = account.balances.find( - (balance) => 'asset_code' in balance && balance.asset_code === asset, - ); - - if (assetBalance && 'balance' in assetBalance) { - return Math.floor(parseFloat(assetBalance.balance) * 10_000_000); + // Populate cache after Horizon fetch + try { + const balanceStr = (balance / 10_000_000).toFixed(7); + await this.cacheManager.set( + cacheKey, + JSON.stringify({ balance: balanceStr, updatedAt: new Date().toISOString() }), + 300_000, + ); + } catch (cacheErr) { + this.logger.warn( + `Cache write failed for ${cacheKey}: ${(cacheErr as Error).message}`, + ); } - return 0; + return balance; } catch (error) { this.logger.warn( - `Could not fetch wallet balance for ${publicKey}: ${error.message}`, + `Could not fetch wallet balance for ${publicKey}: ${(error as Error).message}`, ); return 0; }