diff --git a/backend/src/modules/blockchain/event-handlers/deposit.handler.spec.ts b/backend/src/modules/blockchain/event-handlers/deposit.handler.spec.ts new file mode 100644 index 000000000..51e53b849 --- /dev/null +++ b/backend/src/modules/blockchain/event-handlers/deposit.handler.spec.ts @@ -0,0 +1,136 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { DataSource } from 'typeorm'; +import { xdr, nativeToScVal } from '@stellar/stellar-sdk'; +import { createHash } from 'crypto'; +import { DepositHandler } from './deposit.handler'; +import { UserSubscription, SubscriptionStatus } from '../../savings/entities/user-subscription.entity'; +import { User } from '../../user/entities/user.entity'; +import { LedgerTransaction, LedgerTransactionType } from '../entities/transaction.entity'; +import { SavingsProduct } from '../../savings/entities/savings-product.entity'; + +describe('DepositHandler', () => { + let handler: DepositHandler; + let dataSource: any; + let entityManager: any; + + const DEPOSIT_HASH = createHash('sha256').update('Deposit').digest('hex'); + + beforeEach(async () => { + entityManager = { + getRepository: jest.fn().mockImplementation((entity) => { + if (entity === User) return userRepo; + if (entity === LedgerTransaction) return txRepo; + if (entity === UserSubscription) return subRepo; + if (entity === SavingsProduct) return productRepo; + return null; + }), + }; + + dataSource = { + transaction: jest.fn().mockImplementation((cb) => cb(entityManager)), + }; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + DepositHandler, + { provide: DataSource, useValue: dataSource }, + ], + }).compile(); + + handler = module.get(DepositHandler); + }); + + const userRepo = { + findOne: jest.fn(), + }; + const txRepo = { + findOne: jest.fn(), + save: jest.fn(), + create: jest.fn().mockImplementation((v) => v), + }; + const subRepo = { + findOne: jest.fn(), + save: jest.fn(), + create: jest.fn().mockImplementation((v) => v), + }; + const productRepo = { + findOne: jest.fn(), + }; + + it('should be defined', () => { + expect(handler).toBeDefined(); + }); + + describe('handle', () => { + const mockUser = { id: 'user-id', publicKey: 'G...', defaultSavingsProductId: 'prod-id' }; + const mockProduct = { id: 'prod-id', isActive: true }; + const mockEvent = { + id: 'event-1', + topic: [Buffer.from(DEPOSIT_HASH, 'hex').toString('base64')], + value: nativeToScVal({ to: 'G...', amount: BigInt(500) }).toXDR('base64'), + ledger: 100, + txHash: 'tx-hash', + }; + + beforeEach(() => { + jest.clearAllMocks(); + }); + + it('should return false if topic does not match', async () => { + const wrongEvent = { ...mockEvent, topic: ['AAAA'] }; + const result = await handler.handle(wrongEvent); + expect(result).toBe(false); + expect(dataSource.transaction).not.toHaveBeenCalled(); + }); + + it('should process deposit successfully and update subscription', async () => { + userRepo.findOne.mockResolvedValue(mockUser); + txRepo.findOne.mockResolvedValue(null); + subRepo.findOne.mockResolvedValue({ userId: 'user-id', amount: 1000, status: SubscriptionStatus.ACTIVE }); + + const result = await handler.handle(mockEvent); + + expect(result).toBe(true); + expect(txRepo.save).toHaveBeenCalledWith(expect.objectContaining({ + type: LedgerTransactionType.DEPOSIT, + amount: '500', + })); + expect(subRepo.save).toHaveBeenCalledWith(expect.objectContaining({ + amount: 1500, + })); + }); + + it('should create new subscription if one does not exist', async () => { + userRepo.findOne.mockResolvedValue(mockUser); + txRepo.findOne.mockResolvedValue(null); + subRepo.findOne.mockResolvedValue(null); + productRepo.findOne.mockResolvedValue(mockProduct); + + const result = await handler.handle(mockEvent); + + expect(result).toBe(true); + expect(subRepo.create).toHaveBeenCalled(); + expect(subRepo.save).toHaveBeenCalledWith(expect.objectContaining({ + amount: 500, + })); + }); + + it('should match topic by symbol', async () => { + const symbolEvent = { + ...mockEvent, + topic: [nativeToScVal('Deposit', { type: 'symbol' }).toXDR('base64')], + }; + userRepo.findOne.mockResolvedValue(mockUser); + subRepo.findOne.mockResolvedValue({ userId: 'user-id', amount: 100, status: SubscriptionStatus.ACTIVE }); + + const result = await handler.handle(symbolEvent); + expect(result).toBe(true); + expect(txRepo.save).toHaveBeenCalled(); + }); + + it('should throw error if user not found', async () => { + userRepo.findOne.mockResolvedValue(null); + await expect(handler.handle(mockEvent)).rejects.toThrow('Cannot map deposit payload publicKey to user'); + }); + }); +}); diff --git a/backend/src/modules/blockchain/event-handlers/deposit.handler.ts b/backend/src/modules/blockchain/event-handlers/deposit.handler.ts index df76ec227..367c1b5db 100644 --- a/backend/src/modules/blockchain/event-handlers/deposit.handler.ts +++ b/backend/src/modules/blockchain/event-handlers/deposit.handler.ts @@ -141,7 +141,26 @@ export class DepositHandler { const first = topic[0]; const normalized = this.toHex(first); - return normalized === DepositHandler.DEPOSIT_HASH_HEX; + + // Some contracts emit the symbol 'Deposit' directly, others emit its SHA256 hash + // We handle both cases for robustness + if (normalized === DepositHandler.DEPOSIT_HASH_HEX) { + return true; + } + + // Check if it's the symbol 'Deposit' (base64 XDR for Symbol("Deposit") is 'AAAADAAAAAAHrgAA') + if (typeof first === 'string') { + try { + const scVal = xdr.ScVal.fromXDR(first, 'base64'); + if (scValToNative(scVal) === 'Deposit') { + return true; + } + } catch { + // Not XDR, ignore + } + } + + return false; } private extractPayload(value: unknown): DepositPayload { @@ -154,8 +173,10 @@ export class DepositHandler { 'userPublicKey', 'user', 'address', + 'to', ]) ?? ''; - const amountRaw = asRecord['amount']; + + const amountRaw = asRecord['amount'] ?? asRecord['value'] ?? asRecord['amt']; const amount = typeof amountRaw === 'bigint' @@ -168,7 +189,7 @@ export class DepositHandler { if (!publicKey || !amount || Number.isNaN(Number(amount))) { throw new Error( - 'Invalid Deposit payload: expected publicKey + numeric amount', + `Invalid Deposit payload: expected publicKey + numeric amount. Got: PK=${publicKey}, Amt=${amount}`, ); } diff --git a/backend/src/modules/blockchain/indexer.service.spec.ts b/backend/src/modules/blockchain/indexer.service.spec.ts new file mode 100644 index 000000000..8d7545087 --- /dev/null +++ b/backend/src/modules/blockchain/indexer.service.spec.ts @@ -0,0 +1,134 @@ +import { Test, TestingModule } from '@nestjs/testing'; +import { getRepositoryToken } from '@nestjs/typeorm'; +import { ConfigService } from '@nestjs/config'; +import { Logger } from '@nestjs/common'; +import { IndexerService } from './indexer.service'; +import { IndexerState } from './entities/indexer-state.entity'; +import { DeadLetterEvent } from './entities/dead-letter-event.entity'; +import { SavingsProduct } from '../savings/entities/savings-product.entity'; +import { StellarService } from './stellar.service'; +import { DepositHandler } from './event-handlers/deposit.handler'; +import { YieldHandler } from './event-handlers/yield.handler'; + +describe('IndexerService', () => { + let service: IndexerService; + let stellarService: StellarService; + let indexerStateRepo: any; + let savingsProductRepo: any; + let deadLetterRepo: any; + let depositHandler: any; + let yieldHandler: any; + + const mockIndexerState = { + id: 'uuid', + lastProcessedLedger: 100, + totalEventsProcessed: 0, + totalEventsFailed: 0, + updatedAt: new Date(), + }; + + const mockSavingsProducts = [ + { contractId: 'CC1', isActive: true }, + { contractId: 'CC2', isActive: true }, + ]; + + beforeEach(async () => { + indexerStateRepo = { + findOne: jest.fn().mockResolvedValue(mockIndexerState), + save: jest.fn().mockImplementation((val) => Promise.resolve(val)), + create: jest.fn().mockImplementation((val) => val), + }; + + savingsProductRepo = { + find: jest.fn().mockResolvedValue(mockSavingsProducts), + }; + + deadLetterRepo = { + save: jest.fn().mockImplementation((val) => Promise.resolve(val)), + create: jest.fn().mockImplementation((val) => val), + }; + + stellarService = { + getRpcServer: jest.fn().mockReturnValue({ + getEvents: jest.fn(), + }), + getEvents: jest.fn(), + } as any; + + depositHandler = { handle: jest.fn().mockResolvedValue(true) }; + yieldHandler = { handle: jest.fn().mockResolvedValue(false) }; + + const module: TestingModule = await Test.createTestingModule({ + providers: [ + IndexerService, + { provide: ConfigService, useValue: { get: jest.fn() } }, + { provide: StellarService, useValue: stellarService }, + { provide: getRepositoryToken(IndexerState), useValue: indexerStateRepo }, + { provide: getRepositoryToken(DeadLetterEvent), useValue: deadLetterRepo }, + { provide: getRepositoryToken(SavingsProduct), useValue: savingsProductRepo }, + { provide: DepositHandler, useValue: depositHandler }, + { provide: YieldHandler, useValue: yieldHandler }, + ], + }).compile(); + + service = module.get(IndexerService); + // Suppress logger output during tests + jest.spyOn(Logger.prototype, 'log').mockImplementation(() => null); + jest.spyOn(Logger.prototype, 'error').mockImplementation(() => null); + jest.spyOn(Logger.prototype, 'debug').mockImplementation(() => null); + jest.spyOn(Logger.prototype, 'warn').mockImplementation(() => null); + }); + + it('should be defined', () => { + expect(service).toBeDefined(); + }); + + describe('onModuleInit', () => { + it('should initialize state and load contract IDs', async () => { + await service.onModuleInit(); + expect(indexerStateRepo.findOne).toHaveBeenCalled(); + expect(savingsProductRepo.find).toHaveBeenCalledWith({ where: { isActive: true } }); + expect(service.getMonitoredContracts()).toContain('CC1'); + expect(service.getMonitoredContracts()).toContain('CC2'); + }); + }); + + describe('runIndexerCycle', () => { + beforeEach(async () => { + await service.onModuleInit(); + }); + + it('should fetch and process events successfully', async () => { + const mockEvents = [ + { id: '1', ledger: '101', topic: ['deposit'], value: '100', txHash: 'hash1' }, + ]; + (stellarService.getEvents as jest.Mock).mockResolvedValue(mockEvents); + + await service.runIndexerCycle(); + + expect(stellarService.getEvents).toHaveBeenCalledWith(101, ['CC1', 'CC2']); + expect(depositHandler.handle).toHaveBeenCalled(); + expect(indexerStateRepo.save).toHaveBeenCalled(); + expect(service.getIndexerState()?.lastProcessedLedger).toBe(101); + }); + + it('should handle failed events by logging to dead letter queue', async () => { + const mockEvents = [ + { id: '1', ledger: '101', topic: ['deposit'], value: 'fail', txHash: 'hash1' }, + ]; + (stellarService.getEvents as jest.Mock).mockResolvedValue(mockEvents); + depositHandler.handle.mockRejectedValue(new Error('Processing failed')); + + await service.runIndexerCycle(); + + expect(deadLetterRepo.save).toHaveBeenCalled(); + expect(service.getIndexerState()?.totalEventsFailed).toBe(1); + }); + + it('should skip cycle if no active contracts', async () => { + savingsProductRepo.find.mockResolvedValue([]); + await service.runIndexerCycle(); + expect(stellarService.getEvents).not.toHaveBeenCalled(); + }); + }); +}); diff --git a/backend/src/modules/blockchain/indexer.service.ts b/backend/src/modules/blockchain/indexer.service.ts index 604c29890..dfdb5327e 100644 --- a/backend/src/modules/blockchain/indexer.service.ts +++ b/backend/src/modules/blockchain/indexer.service.ts @@ -62,36 +62,46 @@ export class IndexerService implements OnModuleInit { @Cron(CronExpression.EVERY_5_SECONDS) async runIndexerCycle(): Promise { if (!this.indexerState) return; - if (this.contractIds.size === 0) return; - let events: SorobanEvent[] = []; - - try { - events = await this.fetchEvents(); - } catch (err) { - this.logger.error(`Failed to fetch events: ${(err as Error).message}`); - this.indexerState.updatedAt = new Date(); - await this.saveIndexerState(); + // Reload contract IDs to ensure we're watching any new active products + await this.loadContractIds(); + if (this.contractIds.size === 0) { + this.logger.debug('No active contracts to monitor'); return; } - let processed = 0; - let failed = 0; + try { + const events = await this.fetchEvents(); - for (const event of events) { - const ok = await this.processEvent(event); - if (ok) { - processed++; - } else { - failed++; + if (events.length === 0) { + this.logger.debug('No new events found'); + return; } - } - this.indexerState.totalEventsProcessed += processed; - this.indexerState.totalEventsFailed += failed; - this.indexerState.updatedAt = new Date(); + let processed = 0; + let failed = 0; + + for (const event of events) { + const ok = await this.processEvent(event); + if (ok) { + processed++; + } else { + failed++; + } + } + + this.logger.log( + `Processed ${processed} events (Failed: ${failed}) from ledger ${events[0].ledger} to ${events[events.length - 1].ledger}`, + ); + + this.indexerState.totalEventsProcessed += processed; + this.indexerState.totalEventsFailed += failed; + this.indexerState.updatedAt = new Date(); - await this.saveIndexerState(); + await this.saveIndexerState(); + } catch (err) { + this.logger.error(`Indexer cycle failed: ${(err as Error).message}`); + } } private async initializeIndexerState() { @@ -116,11 +126,12 @@ export class IndexerService implements OnModuleInit { where: { isActive: true }, }); - this.contractIds.clear(); - + const newSet = new Set(); for (const p of products) { - if (p.contractId) this.contractIds.add(p.contractId); + if (p.contractId) newSet.add(p.contractId); } + + this.contractIds = newSet; } private async saveIndexerState() { @@ -144,6 +155,9 @@ export class IndexerService implements OnModuleInit { return true; } catch (err) { const msg = (err as Error).message; + this.logger.error( + `Failed to process event ${event.id} from ledger ${event.ledger}: ${msg}`, + ); await this.dlqRepo.save( this.dlqRepo.create({ @@ -165,28 +179,22 @@ export class IndexerService implements OnModuleInit { } private async fetchEvents(): Promise { - if (!this.rpcServer || !this.indexerState) return []; - - const results: SorobanEvent[] = []; - - for (const contractId of this.contractIds) { - const rpcEvents = await (this.rpcServer as any).getEvents({ - startLedger: this.indexerState.lastProcessedLedger + 1, - filters: [{ contractIds: [contractId] }], - }); - - for (const e of rpcEvents.events || []) { - results.push({ - id: e.id, - ledger: parseInt(e.ledger, 10), - topic: e.topic, - value: e.value, - txHash: e.txHash, - }); - } - } + if (!this.indexerState) return []; + + const rpcEvents = await this.stellarService.getEvents( + this.indexerState.lastProcessedLedger + 1, + Array.from(this.contractIds), + ); - return results.sort((a, b) => a.ledger - b.ledger); + return rpcEvents + .map((e) => ({ + id: e.id, + ledger: parseInt(e.ledger, 10), + topic: e.topic, + value: e.value, + txHash: e.txHash, + })) + .sort((a, b) => a.ledger - b.ledger); } getIndexerState() { diff --git a/backend/src/modules/blockchain/stellar.service.ts b/backend/src/modules/blockchain/stellar.service.ts index ae6e5446e..7a317e09e 100644 --- a/backend/src/modules/blockchain/stellar.service.ts +++ b/backend/src/modules/blockchain/stellar.service.ts @@ -180,6 +180,29 @@ export class StellarService implements OnModuleInit { } } + async getEvents(startLedger: number, contractIds: string[]): Promise { + try { + return await this.rpcClient.executeWithRetry(async (client) => { + const rpcServer = client as rpc.Server; + const response = await rpcServer.getEvents({ + startLedger, + filters: [ + { + contractIds, + }, + ], + }); + return response.events || []; + }, 'rpc'); + } catch (error) { + this.logger.error( + `Failed to fetch events from ledger ${startLedger}: ${(error as Error).message}`, + error, + ); + throw error; + } + } + async getDelegationForUser(publicKey: string): Promise { const contractId = this.configService.get('stellar.contractId'); if (!contractId || !publicKey) {