diff --git a/.gitignore b/.gitignore index b5e49e13..607e0875 100644 --- a/.gitignore +++ b/.gitignore @@ -7,3 +7,4 @@ build/ cache gql-cache* __pycache__ +output/ diff --git a/packages/lib/mq.ts b/packages/lib/mq.ts index b74fb6b9..23027d45 100644 --- a/packages/lib/mq.ts +++ b/packages/lib/mq.ts @@ -1,6 +1,11 @@ import { Queue, Worker } from 'bullmq' -import { Job } from './types' +import fs from 'fs' +import path from 'path' import chains from './chains' +import { Job } from './types' + +const MQ_INVENTORY = process.env.MQ_INVENTORY === 'true' +const MQ_INVENTORY_PATH = process.env.MQ_INVENTORY_PATH || path.resolve(process.cwd(), 'output/extract-1-inventory.csv') export const q = { fanout: 'fanout', @@ -69,6 +74,30 @@ export function connect(queueName: string) { // eslint-disable-next-line @typescript-eslint/no-explicit-any export async function add(job: Job, data: any, options?: any) { const queue = job.bychain ? `${job.queue}-${data.chainId}` : job.queue + if (MQ_INVENTORY && queue === 'extract-1') { + try { + fs.mkdirSync(path.dirname(MQ_INVENTORY_PATH), { recursive: true }) + if (!fs.existsSync(MQ_INVENTORY_PATH)) { + fs.writeFileSync(MQ_INVENTORY_PATH, 'jobName,abiPath,address,chainId,fromBlock,toBlock,outputLabel\n') + } + const csvEscape = (v: unknown) => { + const s = String(v ?? '') + return s.includes(',') || s.includes('"') ? `"${s.replace(/"/g, '""')}"` : s + } + const row = [ + job.name, + data.abiPath ?? data.abi?.abiPath ?? '', + data.address ?? data.source?.address ?? '', + data.chainId ?? data.source?.chainId ?? '', + data.from ?? data.fromBlock ?? '', + data.to ?? data.toBlock ?? '', + data.outputLabel ?? '' + ].map(csvEscape).join(',') + fs.appendFileSync(MQ_INVENTORY_PATH, row + '\n') + } catch(error) { + console.error('šŸ“‹', 'inventory write failed', error) + } + } if (!queues[queue]) { queues[queue] = connect(queue) } return await queues[queue].add(job.name, data, { priority: DEFAULT_PRIORITY, attempts: 1, ...options }) } @@ -155,3 +184,31 @@ export function computeConcurrency(jobs: number, options: ConcurrencyOptions) { export async function down() { return Promise.all(Object.values(queues).map(async queue => queue.close())) } + +if (MQ_INVENTORY) { + process.on('beforeExit', () => { + if (!fs.existsSync(MQ_INVENTORY_PATH)) return + const lines = fs.readFileSync(MQ_INVENTORY_PATH, 'utf-8').split('\n').filter(Boolean) + const total = lines.length - 1 // exclude header + if (total <= 0) return + + const byName: Record = {} + const byAbiPath: Record = {} + for (let i = 1; i < lines.length; i++) { + const [jobName, abiPath] = lines[i].split(',') + byName[jobName] = (byName[jobName] || 0) + 1 + if (abiPath) byAbiPath[abiPath] = (byAbiPath[abiPath] || 0) + 1 + } + + console.log(`\nšŸ“‹ MQ Inventory Summary (extract-1): ${total} jobs`) + console.log(' By jobName:') + for (const [k, v] of Object.entries(byName).sort((a, b) => b[1] - a[1])) { + console.log(` ${k}: ${v}`) + } + console.log(' By abiPath:') + for (const [k, v] of Object.entries(byAbiPath).sort((a, b) => b[1] - a[1])) { + console.log(` ${k}: ${v}`) + } + console.log(` CSV written to: ${MQ_INVENTORY_PATH}\n`) + }) +}