feat(fetch): add FreshCandidateCollector primitive for selection-time filtering #1808

Merged

chubes4 merged 1 commit into main from feat/fresh-candidate-collector on May 6, 2026
Conversation

chubes4 (Member) commented on May 6, 2026

Summary

Adds a source-agnostic Data Machine core primitive for fresh candidate collection before fetch dedupe, per #1807.

Fetch handlers that page through external sources need to skip already-processed and currently-claimed candidates while continuing to scan until they find schedulable work. Today that behavior lives as ad-hoc shims in Intelligence MCP, data-machine-socials Reddit, and data-machine-events. This PR adds the generic primitive those consumers can converge on, with no backwards-compatibility shims.

Located at DataMachine\Core\Steps\Fetch\FreshCandidateCollector.

Design

Handlers drive the collector imperatively as they paginate:

```php
$collector = new FreshCandidateCollector( $context, $max_items );

foreach ( $this->paginate( $config ) as $candidate ) {
	$collector->offer( (string) $candidate['id'], $normalized_item );
	if ( $collector->isFull() ) {
		break; // Enough fresh candidates collected; stop paginating.
	}
}
$collector->markExhausted();

return array( 'items' => $collector->getAccepted() );
```

The collector reuses ExecutionContext::isItemProcessed() and isItemClaimed(), so datamachine_should_reprocess_item semantics stay consistent with the existing core dedupe path.
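For reviewers skimming without the diff, here is a minimal sketch of the decision flow `offer()` implies. Everything beyond `isItemProcessed()`/`isItemClaimed()` (internal property names, the bool return, the elided reprocess-override accounting) is an assumption for illustration, not the actual implementation:

```php
// Hypothetical sketch; the real code lives in
// DataMachine\Core\Steps\Fetch\FreshCandidateCollector.
public function offer( string $identifier, array $item ): bool {
	if ( '' === $identifier ) {
		return false; // Rejected without touching any counters.
	}
	$this->diagnostics['raw_seen']++;

	if ( isset( $this->seen[ $identifier ] ) ) {
		$this->diagnostics['duplicate_skipped']++; // Same id twice in one scan.
		return false;
	}
	$this->seen[ $identifier ] = true;

	// Same checks as the core dedupe path, so the
	// datamachine_should_reprocess_item filter applies consistently.
	// (The reprocess_accepted accounting is elided from this sketch.)
	if ( $this->context->isItemProcessed( $identifier ) ) {
		$this->diagnostics['processed_skipped']++;
		return false;
	}
	if ( $this->context->isItemClaimed( $identifier ) ) {
		$this->diagnostics['claimed_skipped']++;
		return false;
	}

	$this->accepted[ $identifier ] = $item;
	$this->diagnostics['accepted']++;
	return true;
}
```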

Selection-time, not authoritative

Final centralized dedupe/claim/cap in FetchHandler::get_fetch_data() is unchanged. The collector is purely a selection-time aid that lets handlers stop scanning once they have enough fresh work. Items it accepts still flow through the standard FetchHandler pipeline where the authoritative claim happens.

Diagnostics

getDiagnostics() returns raw_seen, accepted, processed_skipped, claimed_skipped, duplicate_skipped, reprocess_accepted, max_items, and source_exhausted. Cheap integers + bool — handlers can log them verbatim or surface them through engine data.
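For illustration, a handler-side log call might see a payload shaped like this (the key set comes from the list above; the values are made up):

```php
$diagnostics = $collector->getDiagnostics();
// Example shape (values illustrative):
// array(
//     'raw_seen'           => 120,
//     'accepted'           => 10,
//     'processed_skipped'  => 95,
//     'claimed_skipped'    => 8,
//     'duplicate_skipped'  => 5,
//     'reprocess_accepted' => 2,
//     'max_items'          => 10,
//     'source_exhausted'   => false,
// );
```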

Tests

15 new unit tests under tests/Unit/Core/Steps/Fetch/FreshCandidateCollectorTest.php covering:

  • Processed candidate skip with diagnostic accounting.
  • Claimed candidate skip with diagnostic accounting.
  • Reprocess override accepted (filter forces a processed row back through).
  • Max candidate collection / isFull() short-circuit.
  • Source exhaustion flag (with idempotency).
  • Duplicate identifier guard within a single scan.
  • Empty identifier rejection without counter pollution.
  • Unlimited mode (max_items = 0).
  • Stable diagnostic shape contract.

Stubs ExecutionContext via PHPUnit mocks plus a small subclass so the primitive runs without a database — matches the pattern already used in FetchHandlerDataPacketTest.
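A representative test following that stubbing pattern might look like the sketch below. The test name and the assertion details are assumptions; only `FreshCandidateCollector`, `ExecutionContext`, `offer()`, `getAccepted()`, and `getDiagnostics()` come from this PR:

```php
// Hypothetical sketch of the "processed candidate skip" case.
public function test_processed_candidate_is_skipped(): void {
	$context = $this->createMock( ExecutionContext::class );
	$context->method( 'isItemProcessed' )->willReturn( true );
	$context->method( 'isItemClaimed' )->willReturn( false );

	$collector = new FreshCandidateCollector( $context, 5 );
	$collector->offer( 'abc', array( 'id' => 'abc' ) );

	// Processed item is not accepted, but the skip is accounted for.
	$this->assertSame( array(), $collector->getAccepted() );
	$this->assertSame( 1, $collector->getDiagnostics()['processed_skipped'] );
}
```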

Test results

homeboy test data-machine --skip-lint:

  • Before this PR: 1215 tests, 1 pre-existing failure (ImageGenerationPromptRefinementTest::test_refine_prompt_includes_post_context_when_provided).
  • After this PR: 1230 tests (+15 new), same single pre-existing failure, no regressions.
  • All 15 new tests pass.

The pre-existing failure is unrelated to this change and reproduces on origin/main.

vendor/bin/phpcs runs clean on both new files. Repo-wide homeboy lint reports 578 pre-existing JS findings — none in the new PHP files.

Follow-ups (separate PRs, not in this scope)

  • Automattic/intelligence: replace MCP fetch candidate window with this primitive.
  • Extra-Chill/data-machine-socials: replace Reddit pagination/processed-list shim.
  • Extra-Chill/data-machine-events: replace event/image/web-scraper candidate prefilters.

Closes #1807 once the follow-up repo migrations land.

AI assistance

  • AI assistance: Yes
  • Tool(s): OpenCode (openai/gpt-5.5)
  • Used for: Implementing the core fresh-candidate selection primitive, tests, and PR preparation. Chris remains responsible for review and merge.

Commit message: feat(fetch): add FreshCandidateCollector primitive for selection-time filtering

Source-agnostic helper for fetch handlers that paginate or scan an external
source and need to skip already-processed and currently-claimed candidates
while continuing to look for fresh work.

Handlers feed candidate identifiers to the collector imperatively. The
collector reuses ExecutionContext::isItemProcessed/isItemClaimed (so the
datamachine_should_reprocess_item filter applies consistently with final
fetch dedupe) and exposes diagnostics: raw_seen, accepted, processed_skipped,
claimed_skipped, duplicate_skipped, reprocess_accepted, max_items, and
source_exhausted.

Selection-time filtering only — FetchHandler::get_fetch_data() still owns
final dedupe/claim/cap. Replaces the per-extension overfetch / pagination /
processed-list workarounds in Intelligence MCP, data-machine-socials Reddit,
and data-machine-events. Refs #1807.
chubes4 merged commit a8bc102 into main on May 6, 2026. 3 checks passed.
chubes4 deleted the feat/fresh-candidate-collector branch on May 6, 2026 at 17:35.

Development

Successfully merging this pull request may close these issues:

  • Add core primitive for fresh candidate collection before fetch dedupe
