Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions backend/src/queue/processors/payout.consumer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
import { Processor, WorkerHost, OnWorkerEvent } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import { QUEUE_NAMES } from '../queue.constants';
import { PayoutJobData } from '../queue.interfaces';
import { PrismaService } from '../../prisma/prisma.service';

@Processor(QUEUE_NAMES.BOUNTY_PAYOUTS, {
concurrency: 3,
})
export class PayoutConsumer extends WorkerHost {
private readonly logger = new Logger(PayoutConsumer.name);

constructor(private prisma: PrismaService) {
super();
}

async process(job: Job<PayoutJobData>): Promise<void> {
const { bountyId, approvedAddresses } = job.data;

this.logger.log(
`[PayoutConsumer] Processing payout for bounty ${bountyId} to ${approvedAddresses.length} addresses`,
);

// Simulate processing signatures (2-second sleep as specified)
await this.simulateSignatureProcessing(bountyId, approvedAddresses);

// Update bounty status to COMPLETED after successful payout
try {
await this.prisma.bounty.update({
where: { id: bountyId },
data: { status: 'COMPLETED' },
});
this.logger.log(
`[PayoutConsumer] Bounty ${bountyId} status updated to COMPLETED`,
);
} catch (err: any) {
this.logger.warn(
`[PayoutConsumer] Could not update bounty ${bountyId} status: ${err.message}`,
);
}

this.logger.log(`[PayoutConsumer] Payout job ${job.id} completed`);
}

private async simulateSignatureProcessing(
bountyId: string,
addresses: string[],
): Promise<void> {
// Simulate multi-signature coordination / heavy calculations
const processingTime = 2000;
this.logger.debug(
`[PayoutConsumer] Simulating signature processing for ${bountyId} (${processingTime}ms)`,
);
await new Promise((resolve) => setTimeout(resolve, processingTime));

this.logger.debug(
`[PayoutConsumer] Signatures processed for bounty ${bountyId}, addresses: ${addresses.join(', ')}`,
);
}

@OnWorkerEvent('active')
onActive(job: Job<PayoutJobData>) {
this.logger.debug(`[PayoutConsumer] Job ${job.id} is now active`);
}

@OnWorkerEvent('completed')
onCompleted(job: Job<PayoutJobData>) {
this.logger.debug(`[PayoutConsumer] Job ${job.id} has completed`);
}

@OnQueueFailed()
onFailed(job: Job<PayoutJobData>, error: Error) {
this.logger.error(
`[PayoutConsumer] Job ${job.id} failed: ${error.message}`,
);

// Update local Database status back to FAILED on queue failure
if (job?.data?.bountyId) {
this.prisma.bounty
.update({
where: { id: job.data.bountyId },
data: { status: 'FAILED' },
})
.catch((err) => {
this.logger.error(
`[PayoutConsumer] Failed to update bounty status to FAILED: ${err.message}`,
);
});
}
}
}
5 changes: 5 additions & 0 deletions backend/src/queue/queue.constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ export const QUEUE_NAMES = {
* Dummy queue for testing/proof of concept
*/
DUMMY: 'dummy',

/**
* Queue for bounty payout processing
*/
BOUNTY_PAYOUTS: 'bounty-payouts',
} as const;

export type QueueName = (typeof QUEUE_NAMES)[keyof typeof QUEUE_NAMES];
15 changes: 15 additions & 0 deletions backend/src/queue/queue.interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,18 @@ export interface OnChainEventJobData {
*/
blockHeight: number;
}

/**
* Interface for bounty payout job data
*/
export interface PayoutJobData {
/**
* Bounty ID to process payout for
*/
bountyId: string;

/**
* List of approved wallet addresses for payout
*/
approvedAddresses: string[];
}
6 changes: 5 additions & 1 deletion backend/src/queue/queue.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { BullModule } from '@nestjs/bullmq';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { QUEUE_NAMES } from './queue.constants';
import { DummyProcessor } from './processors/dummy.processor';
import { PayoutConsumer } from './processors/payout.consumer';
import { QueueService } from './queue.service';
import { QueueController } from './queue.controller';

Expand Down Expand Up @@ -46,10 +47,13 @@ import { QueueController } from './queue.controller';
{
name: QUEUE_NAMES.ON_CHAIN_EVENTS,
},
{
name: QUEUE_NAMES.BOUNTY_PAYOUTS,
},
),
],
controllers: [QueueController],
providers: [QueueService, DummyProcessor],
providers: [QueueService, DummyProcessor, PayoutConsumer],
exports: [QueueService, BullModule],
})
export class QueueModule {}
12 changes: 11 additions & 1 deletion backend/src/queue/queue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Injectable, Logger } from '@nestjs/common';
import { InjectQueue } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
import { QUEUE_NAMES } from './queue.constants';
import { DummyJobData, EmailJobData, OnChainEventJobData } from './queue.interfaces';
import { DummyJobData, EmailJobData, OnChainEventJobData, PayoutJobData } from './queue.interfaces';

@Injectable()
export class QueueService {
Expand All @@ -13,6 +13,8 @@ export class QueueService {
@InjectQueue(QUEUE_NAMES.EMAIL) private readonly emailQueue: Queue,
@InjectQueue(QUEUE_NAMES.ON_CHAIN_EVENTS)
private readonly onChainEventsQueue: Queue,
@InjectQueue(QUEUE_NAMES.BOUNTY_PAYOUTS)
private readonly bountyPayoutsQueue: Queue,
) {}

/**
Expand Down Expand Up @@ -64,6 +66,14 @@ export class QueueService {
this.logger.log(`Added on-chain event job ${job.id} to queue`);
}

/**
* Add a bounty payout job to the queue
*/
async addPayoutJob(data: PayoutJobData): Promise<void> {
const job = await this.bountyPayoutsQueue.add('process-payout', data);
this.logger.log(`Added payout job ${job.id} to bounty-payouts queue`);
}

/**
* Get queue statistics
*/
Expand Down