
Log extract-1 queue jobs to CSV during fanout cycle #347

@murderteeth

Description


When the indexer runs a full "abi fanout" cycle it queues up significantly more jobs than expected. It might be fine — everything could be working as designed — but we currently have no structured way to see exactly what's being produced. Console logs are transient and don't give a clear picture of the full cascade. We need a "job inventory": a way to capture every job that lands in a queue during a fanout cycle so we can review it offline.

As a first step, we're scoping this to just the mainnet extract queue (extract-1) since that's where the bulk of work lands.

Context

Current implementation: The mq.add() function in packages/lib/mq.ts resolves queue names at line 71 — jobs with bychain: true go to extract-{chainId}. For mainnet (chainId 1), this means extract.evmlog, extract.snapshot, extract.timeseries, and extract.block jobs all land in the extract-1 queue.
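The routing described above can be sketched as follows; the `bychain` option comes from this issue's description, while the helper name and signature are hypothetical, not the repo's actual code:

```typescript
// Sketch of queue-name resolution: jobs flagged `bychain` are routed to a
// per-chain queue (e.g. extract-1 for mainnet), others to the base queue.
function resolveQueueName(base: string, options: { bychain?: boolean }, chainId: number): string {
  return options.bychain ? `${base}-${chainId}` : base
}
```

Under this model, `resolveQueueName('extract', { bychain: true }, 1)` yields `'extract-1'`, which is the queue this issue targets.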

What we're measuring: The full set of extract jobs produced for mainnet during one fanout cycle — job name, target address, block ranges, abiPath, etc.

Relevant files:

  • packages/lib/mq.ts:70-74 — the add() function where we intercept
  • packages/ingest/fanout/abis.ts — primary fanout
  • packages/ingest/fanout/events.ts — creates extract.evmlog jobs per stride chunk
  • packages/ingest/fanout/timeseries.ts — creates extract.timeseries jobs per missing date

Tasks

1. Add CSV logging to mq.add()

  • In packages/lib/mq.ts, add a conditional in add() that appends to a CSV file when the resolved queue name is extract-1
  • Gate behind an env var like MQ_INVENTORY=true so it's opt-in
  • CSV columns: jobName, abiPath, address, chainId, fromBlock, toBlock, outputLabel
    • Pull fields from the job data object, defaulting to empty when not present
  • Write to output/extract-1-inventory.csv (or configurable via env)
  • Log the job to CSV and still add it to the queue normally — this runs alongside a real fanout, not instead of one
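A minimal sketch of the interception, assuming `add()` has the resolved queue name, job name, and job data in scope at the point of the hook; the helper names (`logJobToCsv`, `escapeCsv`) and the `MQ_INVENTORY_PATH` override are illustrative, not existing repo API:

```typescript
import fs from 'fs'
import path from 'path'

// CSV columns per the inventory spec in this issue.
const COLUMNS = ['jobName', 'abiPath', 'address', 'chainId', 'fromBlock', 'toBlock', 'outputLabel']

// Quote a value so commas, quotes, or newlines in job data don't break the CSV.
function escapeCsv(value: unknown): string {
  const s = value == null ? '' : String(value)
  return /[",\n]/.test(s) ? `"${s.replace(/"/g, '""')}"` : s
}

// Append one job row to the inventory CSV, writing the header on first use.
// Intended to be called from mq.add() just before the job is enqueued;
// it never blocks or replaces the normal enqueue.
export function logJobToCsv(queueName: string, jobName: string, data: Record<string, unknown>) {
  if (process.env.MQ_INVENTORY !== 'true' || queueName !== 'extract-1') return
  const file = process.env.MQ_INVENTORY_PATH ?? path.join('output', 'extract-1-inventory.csv')
  fs.mkdirSync(path.dirname(file), { recursive: true })
  if (!fs.existsSync(file)) fs.appendFileSync(file, COLUMNS.join(',') + '\n')
  const row = COLUMNS.map(col => escapeCsv(col === 'jobName' ? jobName : data[col])).join(',')
  fs.appendFileSync(file, row + '\n') // sync append, per the technical notes below
}
```

Because the guard returns early unless `MQ_INVENTORY=true` and the queue is `extract-1`, normal fanout paths pay essentially nothing when the flag is off.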

2. Print summary on process exit

  • Register a process.on('beforeExit') or similar handler that, when MQ_INVENTORY is set, prints a summary: total jobs, breakdown by jobName, breakdown by abiPath
  • Alternatively, just rely on the CSV and analyze externally
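If the in-process summary is worth having, it could look roughly like this; the tally structure and function names are illustrative and would be incremented alongside the CSV append:

```typescript
// In-memory tallies for the run's extract-1 jobs (names are hypothetical).
const totals = {
  jobs: 0,
  byJobName: new Map<string, number>(),
  byAbiPath: new Map<string, number>(),
}

// Record one job; call this wherever the CSV row is written.
function tallyJob(jobName: string, abiPath: string) {
  totals.jobs++
  totals.byJobName.set(jobName, (totals.byJobName.get(jobName) ?? 0) + 1)
  totals.byAbiPath.set(abiPath, (totals.byAbiPath.get(abiPath) ?? 0) + 1)
}

// Print the summary once the event loop drains, only in inventory mode.
if (process.env.MQ_INVENTORY === 'true') {
  process.on('beforeExit', () => {
    console.log(`extract-1 inventory: ${totals.jobs} jobs total`)
    for (const [name, count] of totals.byJobName) console.log(`  ${name}: ${count}`)
    for (const [abi, count] of totals.byAbiPath) console.log(`  ${abi}: ${count}`)
  })
}
```

Note `beforeExit` does not fire on `process.exit()` or fatal signals, so if the fanout entrypoint exits explicitly, the summary would need to be printed from that call site instead.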

Acceptance Criteria

  • Setting MQ_INVENTORY=true causes every job added to extract-1 to be logged to a CSV
  • CSV captures: jobName, abiPath, address, chainId, fromBlock, toBlock, outputLabel
  • Normal fanout operation is not affected — jobs still get added to Redis as usual
  • CSV file is created/appended correctly across the full fanout cycle

Technical Notes

  • Keep changes minimal — a few lines in mq.add() gated behind an env var
  • The CSV append should be sync (fs.appendFileSync) to avoid race conditions and keep it simple
  • No need to capture load queue jobs or other chains — just extract-1
