diff --git a/backend/src/queue/processors/payout.consumer.ts b/backend/src/queue/processors/payout.consumer.ts new file mode 100644 index 0000000..f0a83d0 --- /dev/null +++ b/backend/src/queue/processors/payout.consumer.ts @@ -0,0 +1,92 @@ +import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq'; +import { Logger } from '@nestjs/common'; +import { Job } from 'bullmq'; +import { QUEUE_NAMES } from '../queue.constants'; +import { PayoutJobData } from '../queue.interfaces'; +import { PrismaService } from '../../prisma/prisma.service'; + +@Processor(QUEUE_NAMES.BOUNTY_PAYOUTS, { + concurrency: 3, +}) +export class PayoutConsumer extends WorkerHost { + private readonly logger = new Logger(PayoutConsumer.name); + + constructor(private prisma: PrismaService) { + super(); + } + + async process(job: Job): Promise { + const { bountyId, approvedAddresses } = job.data; + + this.logger.log( + `[PayoutConsumer] Processing payout for bounty ${bountyId} to ${approvedAddresses.length} addresses`, + ); + + // Simulate processing signatures (2-second sleep as specified) + await this.simulateSignatureProcessing(bountyId, approvedAddresses); + + // Update bounty status to COMPLETED after successful payout + try { + await this.prisma.bounty.update({ + where: { id: bountyId }, + data: { status: 'COMPLETED' }, + }); + this.logger.log( + `[PayoutConsumer] Bounty ${bountyId} status updated to COMPLETED`, + ); + } catch (err: any) { + this.logger.warn( + `[PayoutConsumer] Could not update bounty ${bountyId} status: ${err.message}`, + ); + } + + this.logger.log(`[PayoutConsumer] Payout job ${job.id} completed`); + } + + private async simulateSignatureProcessing( + bountyId: string, + addresses: string[], + ): Promise { + // Simulate multi-signature coordination / heavy calculations + const processingTime = 2000; + this.logger.debug( + `[PayoutConsumer] Simulating signature processing for ${bountyId} (${processingTime}ms)`, + ); + await new Promise((resolve) => setTimeout(resolve, processingTime)); + + this.logger.debug( + `[PayoutConsumer] Signatures processed for bounty ${bountyId}, addresses: ${addresses.join(', ')}`, + ); + } + + @OnWorkerEvent('active') + onActive(job: Job) { + this.logger.debug(`[PayoutConsumer] Job ${job.id} is now active`); + } + + @OnWorkerEvent('completed') + onCompleted(job: Job) { + this.logger.debug(`[PayoutConsumer] Job ${job.id} has completed`); + } + + @OnQueueFailed() + onFailed(job: Job, error: Error) { + this.logger.error( + `[PayoutConsumer] Job ${job.id} failed: ${error.message}`, + ); + + // Update local Database status back to FAILED on queue failure + if (job?.data?.bountyId) { + this.prisma.bounty + .update({ + where: { id: job.data.bountyId }, + data: { status: 'FAILED' }, + }) + .catch((err) => { + this.logger.error( + `[PayoutConsumer] Failed to update bounty status to FAILED: ${err.message}`, + ); + }); + } + } +} diff --git a/backend/src/queue/queue.constants.ts b/backend/src/queue/queue.constants.ts index a3ea54b..942fbc7 100644 --- a/backend/src/queue/queue.constants.ts +++ b/backend/src/queue/queue.constants.ts @@ -16,6 +16,11 @@ export const QUEUE_NAMES = { * Dummy queue for testing/proof of concept */ DUMMY: 'dummy', + + /** + * Queue for bounty payout processing + */ + BOUNTY_PAYOUTS: 'bounty-payouts', } as const; export type QueueName = (typeof QUEUE_NAMES)[keyof typeof QUEUE_NAMES]; diff --git a/backend/src/queue/queue.interfaces.ts b/backend/src/queue/queue.interfaces.ts index b38bf28..12ef690 100644 --- a/backend/src/queue/queue.interfaces.ts +++ b/backend/src/queue/queue.interfaces.ts @@ -72,3 +72,18 @@ export interface OnChainEventJobData { */ blockHeight: number; } + +/** + * Interface for bounty payout job data + */ +export interface PayoutJobData { + /** + * Bounty ID to process payout for + */ + bountyId: string; + + /** + * List of approved wallet addresses for payout + */ + approvedAddresses: string[]; +} diff --git a/backend/src/queue/queue.module.ts b/backend/src/queue/queue.module.ts index 19788a8..0bff284 100644 --- a/backend/src/queue/queue.module.ts +++ b/backend/src/queue/queue.module.ts @@ -3,6 +3,7 @@ import { BullModule } from '@nestjs/bullmq'; import { ConfigModule, ConfigService } from '@nestjs/config'; import { QUEUE_NAMES } from './queue.constants'; import { DummyProcessor } from './processors/dummy.processor'; +import { PayoutConsumer } from './processors/payout.consumer'; import { QueueService } from './queue.service'; import { QueueController } from './queue.controller'; @@ -46,10 +47,13 @@ import { QueueController } from './queue.controller'; { name: QUEUE_NAMES.ON_CHAIN_EVENTS, }, + { + name: QUEUE_NAMES.BOUNTY_PAYOUTS, + }, ), ], controllers: [QueueController], - providers: [QueueService, DummyProcessor], + providers: [QueueService, DummyProcessor, PayoutConsumer], exports: [QueueService, BullModule], }) export class QueueModule {} diff --git a/backend/src/queue/queue.service.ts b/backend/src/queue/queue.service.ts index b02f1f3..69ae07c 100644 --- a/backend/src/queue/queue.service.ts +++ b/backend/src/queue/queue.service.ts @@ -2,7 +2,7 @@ import { Injectable, Logger } from '@nestjs/common'; import { InjectQueue } from '@nestjs/bullmq'; import { Queue } from 'bullmq'; import { QUEUE_NAMES } from './queue.constants'; -import { DummyJobData, EmailJobData, OnChainEventJobData } from './queue.interfaces'; +import { DummyJobData, EmailJobData, OnChainEventJobData, PayoutJobData } from './queue.interfaces'; @Injectable() export class QueueService { @@ -13,6 +13,8 @@ export class QueueService { @InjectQueue(QUEUE_NAMES.EMAIL) private readonly emailQueue: Queue, @InjectQueue(QUEUE_NAMES.ON_CHAIN_EVENTS) private readonly onChainEventsQueue: Queue, + @InjectQueue(QUEUE_NAMES.BOUNTY_PAYOUTS) + private readonly bountyPayoutsQueue: Queue, ) {} /** @@ -64,6 +66,14 @@ export class QueueService { this.logger.log(`Added on-chain event job ${job.id} to queue`); } + /** + * Add a bounty payout job to the queue + */ + async addPayoutJob(data: PayoutJobData): Promise { + const job = await this.bountyPayoutsQueue.add('process-payout', data); + this.logger.log(`Added payout job ${job.id} to bounty-payouts queue`); + } + /** * Get queue statistics */