diff --git a/docs/development/fresh-candidate-collector.md b/docs/development/fresh-candidate-collector.md
new file mode 100644
index 000000000..1d8b6ea50
--- /dev/null
+++ b/docs/development/fresh-candidate-collector.md
@@ -0,0 +1,108 @@
+# Fresh Candidate Collector
+
+Source-agnostic primitive for fetch handlers that paginate or scan an external
+source and need to skip already-processed and currently-claimed candidates
+while continuing to look for fresh work.
+
+Located at `DataMachine\Core\Steps\Fetch\FreshCandidateCollector`.
+
+## Why this exists
+
+Data Machine core owns final dedupe/claim/cap in
+`FetchHandler::get_fetch_data()`. That layer is authoritative — but it only
+runs *after* a handler returns. When the top of a paginated feed is full of
+items the flow has already processed, naive handlers come back with "no new
+work" even when fresh items live two pages deeper.
+
+Multiple downstream consumers grew their own ad-hoc workarounds for the same
+shape — overfetch windows, ability-level pagination filters, handler-level
+processed-list prefilters. The fresh candidate collector is the generic core
+primitive those consumers can converge on.
+
+## Selection-time vs authoritative dedupe
+
+| Layer | Where | What it does |
+|---|---|---|
+| Selection-time (this primitive) | Inside `executeFetch()` | Lets the handler stop scanning once it has enough fresh candidates. Skips items that are processed or actively claimed. |
+| Authoritative dedupe + claim + cap | `FetchHandler::get_fetch_data()` | Final filter that runs after the handler returns. Re-checks processed/claim state, applies `max_items`, atomically claims. |
+
+The two layers run on the same `ExecutionContext` and the same
+`datamachine_should_reprocess_item` filter, so they agree on what counts as
+"fresh". The collector is purely an early-exit aid — handlers stay correct
+even if they skip it entirely.
+
+## Usage
+
+```php
+use DataMachine\Core\Steps\Fetch\FreshCandidateCollector;
+
+protected function executeFetch( array $config, ExecutionContext $context ): array {
+    $max_items = (int) ( $config['max_items'] ?? 5 );
+    $collector = new FreshCandidateCollector( $context, $max_items );
+
+    foreach ( $this->paginate( $config ) as $candidate ) {
+        $collector->offer(
+            (string) $candidate['id'],
+            array(
+                'title'    => $candidate['title'],
+                'content'  => $candidate['body'],
+                'metadata' => array( 'item_identifier' => (string) $candidate['id'] ),
+            )
+        );
+
+        if ( $collector->isFull() ) {
+            break;
+        }
+    }
+
+    // Mark exhaustion only when pagination ended naturally, not after the early break above.
+    if ( ! $collector->isFull() ) {
+        $collector->markExhausted();
+    }
+
+    $context->log( 'debug', 'Fresh candidate scan complete', $collector->getDiagnostics() );
+
+    return array(
+        'items' => $collector->getAccepted(),
+    );
+}
+```
+
+## API summary
+
+- `offer( string $identifier, mixed $payload = null ): bool` — submit a candidate. Returns true when accepted.
+- `isFull(): bool` — true when `max_items` reached. Always false when `max_items` is 0 (unlimited).
+- `markExhausted(): void` — handler signals that pagination walked the source end-to-end.
+- `isExhausted(): bool`
+- `count(): int` — accepted count.
+- `getMaxItems(): int`
+- `getAccepted(): array` — payloads in offer order.
+- `getDiagnostics(): array` — counters and exhaustion flag (see below).
+
+## Diagnostics
+
+`getDiagnostics()` returns:
+
+| Key | Meaning |
+|---|---|
+| `raw_seen` | Total candidates offered (excluding empty identifiers), including offers rejected because the collector was already full. |
+| `accepted` | Candidates that passed all checks. |
+| `processed_skipped` | Candidates skipped by `ExecutionContext::isItemProcessed()`. |
+| `claimed_skipped` | Candidates skipped by `ExecutionContext::isItemClaimed()`. |
+| `duplicate_skipped` | Same identifier offered twice in this scan. |
+| `reprocess_accepted` | Accepted candidates whose row already existed in the processed table — i.e.
the `datamachine_should_reprocess_item` filter forced a revisit. |
+| `max_items` | Configured target. 0 = unlimited. |
+| `source_exhausted` | True when the handler called `markExhausted()`. |
+
+These are intentionally cheap integers + a bool. Handlers can log them
+verbatim, push them into engine data, or surface them through the fetch
+result envelope without further shaping.
+
+## Non-goals
+
+- Not a replacement for final fetch dedupe — `FetchHandler` still claims and
+  caps after `executeFetch()` returns.
+- Not aware of source-specific concepts (Reddit subreddits, MGS event types,
+  RSS GUID formats, etc.). Identifiers are opaque strings.
+- Not concerned with how the handler paginates. The collector only sees the
+  candidates the handler chooses to offer.
diff --git a/inc/Core/Steps/Fetch/FreshCandidateCollector.php b/inc/Core/Steps/Fetch/FreshCandidateCollector.php
new file mode 100644
index 000000000..6956d377c
--- /dev/null
+++ b/inc/Core/Steps/Fetch/FreshCandidateCollector.php
@@ -0,0 +1,263 @@
+<?php
+/**
+ * Fresh Candidate Collector.
+ *
+ * Source-agnostic, selection-time helper for fetch handlers that paginate or
+ * scan an external source. It skips candidates that are already processed,
+ * currently claimed, or repeated within the scan, and reports when enough
+ * fresh candidates have been gathered so the handler can stop early.
+ *
+ * Typical use inside a handler's `executeFetch()`:
+ *
+ *     $collector = new FreshCandidateCollector( $context, $max_items );
+ *
+ *     foreach ( $this->paginate( $config ) as $candidate ) {
+ *         $collector->offer( $candidate['id'], $candidate );
+ *         if ( $collector->isFull() ) {
+ *             break;
+ *         }
+ *     }
+ *     // Mark exhaustion only when pagination ended naturally, not after the early break.
+ *     if ( ! $collector->isFull() ) {
+ *         $collector->markExhausted();
+ *     }
+ *
+ *     return array(
+ *         'items' => $collector->getAccepted(),
+ *     );
+ *
+ * Items returned from `getAccepted()` still flow through the standard
+ * `FetchHandler` pipeline, where final dedupe + claim + max_items cap apply.
+ *
+ * @package DataMachine\Core\Steps\Fetch
+ * @since 0.105.0
+ */
+
+namespace DataMachine\Core\Steps\Fetch;
+
+use DataMachine\Core\ExecutionContext;
+
+if ( ! defined( 'ABSPATH' ) ) {
+	exit;
+}
+
+class FreshCandidateCollector {
+
+	// Skip-reason labels. Not referenced inside this class — presumably for consumers; confirm.
+	public const SKIP_PROCESSED = 'processed';
+	public const SKIP_CLAIMED   = 'claimed';
+	public const SKIP_DUPLICATE = 'duplicate';
+
+	/** Execution context that answers the processed/claimed questions. */
+	private ExecutionContext $context;
+
+	/**
+	 * Target number of fresh candidates. 0 means unlimited — the handler
+	 * decides when to stop scanning (typically by pagination exhaustion).
+	 */
+	private int $max_items;
+
+	/** @var array<int, mixed> Accepted candidate payloads in the order they were offered. */
+	private array $accepted = array();
+
+	/** @var array<array-key, true> Identifier set to detect duplicates within the current scan. */
+	private array $seen_identifiers = array();
+
+	/**
+	 * Lazily created processed-items repository, reused across `offer()` calls.
+	 *
+	 * @var \DataMachine\Core\Database\ProcessedItems\ProcessedItems|null
+	 */
+	private ?\DataMachine\Core\Database\ProcessedItems\ProcessedItems $processed_items = null;
+
+	private int $raw_seen           = 0;
+	private int $processed_skipped  = 0;
+	private int $claimed_skipped    = 0;
+	private int $duplicate_skipped  = 0;
+	private int $reprocess_accepted = 0;
+	private bool $source_exhausted  = false;
+
+	/**
+	 * @param ExecutionContext $context   Context used for processed/claimed lookups.
+	 * @param int              $max_items Target number of fresh candidates; values below 0 are treated as 0 (unlimited).
+	 */
+	public function __construct( ExecutionContext $context, int $max_items = 0 ) {
+		$this->context   = $context;
+		$this->max_items = max( 0, $max_items );
+	}
+
+	/**
+	 * Offer a candidate identifier (and optional payload) to the collector.
+	 *
+	 * The collector does the exact same processed/claim checks as
+	 * `FetchHandler::filterProcessed()` so that selection-time filtering and
+	 * final fetch-time filtering agree on what counts as "fresh".
+	 *
+	 * Checks run in order — empty identifier, collector full, duplicate within
+	 * this scan, processed, claimed — and the first one that matches decides
+	 * the outcome, so at most one skip counter moves per offer.
+	 *
+	 * @param string $identifier Stable, unique identifier for the source item.
+	 * @param mixed  $payload    Optional candidate payload to retain. When omitted
+	 *                           (or null), the identifier itself is stored. Handlers
+	 *                           commonly pass the normalized item shape that
+	 *                           `FetchHandler` expects (`title`, `content`, `metadata`, ...).
+	 * @return bool True when the candidate was accepted, false when skipped or full.
+	 */
+	public function offer( string $identifier, mixed $payload = null ): bool {
+		if ( '' === $identifier ) {
+			// Identifier-less items cannot be deduped at selection time, so
+			// the collector neither counts nor stores them. The caller must
+			// decide whether to forward such an item itself.
+			return false;
+		}
+
+		// Counted even when the collector is already full.
+		++$this->raw_seen;
+
+		if ( $this->isFull() ) {
+			return false;
+		}
+
+		if ( isset( $this->seen_identifiers[ $identifier ] ) ) {
+			++$this->duplicate_skipped;
+			return false;
+		}
+
+		if ( $this->context->isItemProcessed( $identifier ) ) {
+			++$this->processed_skipped;
+			$this->seen_identifiers[ $identifier ] = true;
+			return false;
+		}
+
+		if ( $this->context->isItemClaimed( $identifier ) ) {
+			++$this->claimed_skipped;
+			$this->seen_identifiers[ $identifier ] = true;
+			return false;
+		}
+
+		// Selection-time accept. If the row exists in the processed table and
+		// only got past `isItemProcessed()` because the
+		// `datamachine_should_reprocess_item` filter overrode the default
+		// skip, surface that as a separate diagnostic so consumers can
+		// distinguish "fresh discovery" from "scheduled revisit". The lookup
+		// runs once per accepted candidate, so it is bounded by `max_items`
+		// only when a limit is set (`max_items` 0 means no bound).
+		if ( $this->rawIsProcessed( $identifier ) ) {
+			++$this->reprocess_accepted;
+		}
+
+		$this->seen_identifiers[ $identifier ] = true;
+		$this->accepted[]                      = ( null === $payload ) ? $identifier : $payload;
+		return true;
+	}
+
+	/**
+	 * True when the collector has reached its `max_items` target.
+	 *
+	 * Always false when `max_items` is 0 (unlimited).
+	 */
+	public function isFull(): bool {
+		return $this->max_items > 0 && count( $this->accepted ) >= $this->max_items;
+	}
+
+	/**
+	 * Mark the source as exhausted — used when the handler's pagination has
+	 * walked every available candidate without filling the collector.
+	 *
+	 * Idempotent.
+	 */
+	public function markExhausted(): void {
+		$this->source_exhausted = true;
+	}
+
+	/**
+	 * Whether the source has been marked exhausted.
+	 */
+	public function isExhausted(): bool {
+		return $this->source_exhausted;
+	}
+
+	/**
+	 * Number of accepted (fresh) candidates so far.
+	 */
+	public function count(): int {
+		return count( $this->accepted );
+	}
+
+	/**
+	 * Configured target. 0 means unlimited.
+	 */
+	public function getMaxItems(): int {
+		return $this->max_items;
+	}
+
+	/**
+	 * Accepted candidate payloads in offer order.
+	 *
+	 * @return array
+	 */
+	public function getAccepted(): array {
+		return $this->accepted;
+	}
+
+	/**
+	 * Diagnostic snapshot suitable for logging or returning alongside the
+	 * accepted list. All counters are integers; `source_exhausted` is a bool.
+	 *
+	 * @return array{
+	 *     raw_seen:int,
+	 *     accepted:int,
+	 *     processed_skipped:int,
+	 *     claimed_skipped:int,
+	 *     duplicate_skipped:int,
+	 *     reprocess_accepted:int,
+	 *     max_items:int,
+	 *     source_exhausted:bool
+	 * }
+	 */
+	public function getDiagnostics(): array {
+		return array(
+			'raw_seen'           => $this->raw_seen,
+			'accepted'           => count( $this->accepted ),
+			'processed_skipped'  => $this->processed_skipped,
+			'claimed_skipped'    => $this->claimed_skipped,
+			'duplicate_skipped'  => $this->duplicate_skipped,
+			'reprocess_accepted' => $this->reprocess_accepted,
+			'max_items'          => $this->max_items,
+			'source_exhausted'   => $this->source_exhausted,
+		);
+	}
+
+	/**
+	 * Inspect the persisted processed-state without invoking the reprocess
+	 * filter, so we can detect filter-driven overrides.
+	 *
+	 * Mirrors the early-out logic of `ExecutionContext::isItemProcessed()` for
+	 * direct/standalone modes, where there is no persistence layer at all.
+	 *
+	 * Protected so tests can override without touching the database.
+	 *
+	 * @param string $identifier Identifier to look up.
+	 * @return bool Result of the repository's `has_item_been_processed()` lookup; false in direct/standalone mode or without a flow step id.
+	 */
+	protected function rawIsProcessed( string $identifier ): bool {
+		if ( $this->context->isDirect() || $this->context->isStandalone() ) {
+			return false;
+		}
+
+		$flow_step_id = $this->context->getFlowStepId();
+		if ( ! $flow_step_id ) {
+			return false;
+		}
+
+		// Build the repository once and reuse it for every later lookup.
+		$this->processed_items ??= new \DataMachine\Core\Database\ProcessedItems\ProcessedItems();
+
+		return $this->processed_items->has_item_been_processed(
+			$flow_step_id,
+			$this->context->getHandlerType(),
+			$identifier
+		);
+	}
+}
diff --git a/tests/Unit/Core/Steps/Fetch/FreshCandidateCollectorTest.php b/tests/Unit/Core/Steps/Fetch/FreshCandidateCollectorTest.php
new file mode 100644
index 000000000..e417bfca2
--- /dev/null
+++ b/tests/Unit/Core/Steps/Fetch/FreshCandidateCollectorTest.php
@@ -0,0 +1,290 @@
+<?php
+/**
+ * Unit tests for FreshCandidateCollector.
+ *
+ * NOTE(review): this file's opening (namespace, imports, stub class header)
+ * was lost when the diff was extracted and has been reconstructed from how
+ * the names are used below. Confirm the namespace and the TestCase import
+ * against the original file.
+ */
+
+namespace DataMachine\Tests\Unit\Core\Steps\Fetch;
+
+use DataMachine\Core\ExecutionContext;
+use DataMachine\Core\Steps\Fetch\FreshCandidateCollector;
+use PHPUnit\Framework\TestCase;
+
+/**
+ * Collector variant whose raw processed-state lookup is served from an
+ * in-memory map, so reprocess-override tests never touch the database.
+ */
+class StubFreshCandidateCollector extends FreshCandidateCollector {
+
+	/**
+	 * Canned answers for rawIsProcessed(), keyed by identifier.
+	 *
+	 * @var array<string, bool>
+	 */
+	private array $raw_processed_map = array();
+
+	public function setRawProcessed( string $identifier, bool $processed ): void {
+		$this->raw_processed_map[ $identifier ] = $processed;
+	}
+
+	protected function rawIsProcessed( string $identifier ): bool {
+		return $this->raw_processed_map[ $identifier ] ?? false;
+	}
+}
+
+class FreshCandidateCollectorTest extends TestCase {
+
+	// -----------------------------------------------------------------
+	// Acceptance — fresh items pass through.
+	// -----------------------------------------------------------------
+
+	public function test_fresh_candidate_is_accepted(): void {
+		$subject = new FreshCandidateCollector( $this->buildContext(), 5 );
+
+		self::assertTrue( $subject->offer( 'item-1', array( 'id' => 1 ) ) );
+		self::assertSame( 1, $subject->count() );
+		self::assertSame( array( array( 'id' => 1 ) ), $subject->getAccepted() );
+	}
+
+	public function test_payload_defaults_to_identifier(): void {
+		$subject = new FreshCandidateCollector( $this->buildContext(), 5 );
+
+		$subject->offer( 'plain-id' );
+		self::assertSame( array( 'plain-id' ), $subject->getAccepted() );
+	}
+
+	public function test_empty_identifier_is_rejected_without_counting(): void {
+		$subject = new FreshCandidateCollector( $this->buildContext(), 5 );
+
+		self::assertFalse( $subject->offer( '' ) );
+		self::assertSame( 0, $subject->count() );
+		self::assertSame( 0, $subject->getDiagnostics()['raw_seen'] );
+	}
+
+	// -----------------------------------------------------------------
+	// Skipping candidates the context reports as processed.
+	// -----------------------------------------------------------------
+
+	public function test_processed_candidate_is_skipped_with_diagnostic(): void {
+		$ctx     = $this->buildContext( array( 'item-2' => true ) );
+		$subject = new FreshCandidateCollector( $ctx, 5 );
+
+		self::assertTrue( $subject->offer( 'item-1' ) );
+		self::assertFalse( $subject->offer( 'item-2' ) );
+		self::assertTrue( $subject->offer( 'item-3' ) );
+
+		$stats = $subject->getDiagnostics();
+		self::assertSame( 3, $stats['raw_seen'] );
+		self::assertSame( 2, $stats['accepted'] );
+		self::assertSame( 1, $stats['processed_skipped'] );
+		self::assertSame( 0, $stats['claimed_skipped'] );
+	}
+
+	// -----------------------------------------------------------------
+	// Skipping candidates the context reports as claimed.
+	// -----------------------------------------------------------------
+
+	public function test_claimed_candidate_is_skipped_with_diagnostic(): void {
+		$ctx     = $this->buildContext( array(), array( 'item-2' => true ) );
+		$subject = new FreshCandidateCollector( $ctx, 5 );
+
+		self::assertTrue( $subject->offer( 'item-1' ) );
+		self::assertFalse( $subject->offer( 'item-2' ) );
+
+		$stats = $subject->getDiagnostics();
+		self::assertSame( 0, $stats['processed_skipped'] );
+		self::assertSame( 1, $stats['claimed_skipped'] );
+		self::assertSame( 1, $stats['accepted'] );
+	}
+
+	public function test_processed_check_runs_before_claim_check(): void {
+		// When an identifier is flagged as processed AND claimed, the skip
+		// must land in the processed bucket: the processed check comes first
+		// in offer(), so the claim check is never reached.
+		$ctx = $this->buildContext(
+			array( 'item-1' => true ),
+			array( 'item-1' => true )
+		);
+		$subject = new FreshCandidateCollector( $ctx, 5 );
+
+		$subject->offer( 'item-1' );
+		$stats = $subject->getDiagnostics();
+		self::assertSame( 1, $stats['processed_skipped'] );
+		self::assertSame( 0, $stats['claimed_skipped'] );
+	}
+
+	// -----------------------------------------------------------------
+	// Reprocess override — a stored processed row is let back through.
+	// -----------------------------------------------------------------
+
+	public function test_reprocess_override_is_accepted_and_diagnosed(): void {
+		// The context says "not processed" (as it would after a reprocess
+		// override), while the stubbed raw lookup says the row still exists.
+		$ctx     = $this->buildContext( array(), array() );
+		$subject = new StubFreshCandidateCollector( $ctx, 5 );
+		$subject->setRawProcessed( 'revisit-me', true );
+
+		self::assertTrue( $subject->offer( 'revisit-me', array( 'id' => 'revisit-me' ) ) );
+
+		$stats = $subject->getDiagnostics();
+		self::assertSame( 1, $stats['accepted'] );
+		self::assertSame( 1, $stats['reprocess_accepted'] );
+		self::assertSame( 0, $stats['processed_skipped'] );
+	}
+
+	public function test_truly_fresh_candidate_does_not_count_as_reprocess(): void {
+		$ctx     = $this->buildContext();
+		$subject = new StubFreshCandidateCollector( $ctx, 5 );
+		$subject->setRawProcessed( 'never-seen', false );
+
+		$subject->offer( 'never-seen' );
+		self::assertSame( 0, $subject->getDiagnostics()['reprocess_accepted'] );
+	}
+
+	// -----------------------------------------------------------------
+	// Honouring the max_items target.
+	// -----------------------------------------------------------------
+
+	public function test_collector_stops_accepting_once_full(): void {
+		$subject = new FreshCandidateCollector( $this->buildContext(), 2 );
+
+		self::assertTrue( $subject->offer( 'item-1' ) );
+		self::assertTrue( $subject->offer( 'item-2' ) );
+		self::assertTrue( $subject->isFull() );
+		self::assertFalse( $subject->offer( 'item-3' ) );
+		self::assertSame( 2, $subject->count() );
+	}
+
+	public function test_max_items_zero_means_unlimited(): void {
+		$subject = new FreshCandidateCollector( $this->buildContext(), 0 );
+
+		foreach ( range( 0, 49 ) as $n ) {
+			$subject->offer( "item-{$n}" );
+		}
+
+		self::assertSame( 50, $subject->count() );
+		self::assertFalse( $subject->isFull() );
+	}
+
+	public function test_full_collector_does_not_increment_processed_skipped(): void {
+		// A full collector returns before asking the context anything, so a
+		// processed item offered afterwards only bumps raw_seen.
+		$ctx     = $this->buildContext( array( 'item-3' => true ) );
+		$subject = new FreshCandidateCollector( $ctx, 2 );
+
+		$subject->offer( 'item-1' );
+		$subject->offer( 'item-2' );
+		$subject->offer( 'item-3' ); // processed per the context, but the collector is already full
+		$subject->offer( 'item-4' );
+
+		$stats = $subject->getDiagnostics();
+		self::assertSame( 2, $stats['accepted'] );
+		self::assertSame( 0, $stats['processed_skipped'] );
+		self::assertSame( 4, $stats['raw_seen'] );
+	}
+
+	// -----------------------------------------------------------------
+	// Recording source exhaustion.
+	// -----------------------------------------------------------------
+
+	public function test_source_exhaustion_is_recorded(): void {
+		$subject = new FreshCandidateCollector( $this->buildContext(), 5 );
+
+		$subject->offer( 'item-1' );
+		self::assertFalse( $subject->isExhausted() );
+		self::assertFalse( $subject->getDiagnostics()['source_exhausted'] );
+
+		$subject->markExhausted();
+
+		self::assertTrue( $subject->isExhausted() );
+		self::assertTrue( $subject->getDiagnostics()['source_exhausted'] );
+	}
+
+	public function test_mark_exhausted_is_idempotent(): void {
+		$subject = new FreshCandidateCollector( $this->buildContext(), 5 );
+
+		$subject->markExhausted();
+		$subject->markExhausted();
+
+		self::assertTrue( $subject->isExhausted() );
+	}
+
+	// -----------------------------------------------------------------
+	// Repeated identifiers inside one scan.
+	// -----------------------------------------------------------------
+
+	public function test_duplicate_identifier_is_skipped_within_scan(): void {
+		$subject = new FreshCandidateCollector( $this->buildContext(), 5 );
+
+		self::assertTrue( $subject->offer( 'item-1' ) );
+		self::assertFalse( $subject->offer( 'item-1' ) );
+
+		$stats = $subject->getDiagnostics();
+		self::assertSame( 1, $stats['accepted'] );
+		self::assertSame( 1, $stats['duplicate_skipped'] );
+	}
+
+	// -----------------------------------------------------------------
+	// Shape of the diagnostics array.
+	// -----------------------------------------------------------------
+
+	public function test_diagnostics_shape_is_stable(): void {
+		$subject = new FreshCandidateCollector( $this->buildContext(), 7 );
+
+		$stats = $subject->getDiagnostics();
+
+		self::assertSame(
+			array(
+				'raw_seen',
+				'accepted',
+				'processed_skipped',
+				'claimed_skipped',
+				'duplicate_skipped',
+				'reprocess_accepted',
+				'max_items',
+				'source_exhausted',
+			),
+			array_keys( $stats )
+		);
+		self::assertSame( 7, $stats['max_items'] );
+	}
+
+	// -----------------------------------------------------------------
+	// Test fixtures.
+	// -----------------------------------------------------------------
+
+	/**
+	 * Create a mocked ExecutionContext whose processed/claimed answers come
+	 * from the given lookup maps; identifiers missing from a map yield false.
+	 *
+	 * @param array $processed_map identifier => value returned by isItemProcessed()
+	 * @param array $claimed_map   identifier => value returned by isItemClaimed()
+	 */
+	private function buildContext( array $processed_map = array(), array $claimed_map = array() ): ExecutionContext {
+		$mock = $this->getMockBuilder( ExecutionContext::class )
+			->disableOriginalConstructor()
+			->onlyMethods( array( 'isItemProcessed', 'isItemClaimed', 'isDirect', 'isStandalone', 'getFlowStepId', 'getHandlerType' ) )
+			->getMock();
+
+		$mock->method( 'isItemProcessed' )->willReturnCallback(
+			static function ( string $id ) use ( $processed_map ): bool {
+				return $processed_map[ $id ] ?? false;
+			}
+		);
+		$mock->method( 'isItemClaimed' )->willReturnCallback(
+			static function ( string $id ) use ( $claimed_map ): bool {
+				return $claimed_map[ $id ] ?? false;
+			}
+		);
+		$mock->method( 'isDirect' )->willReturn( false );
+		$mock->method( 'isStandalone' )->willReturn( false );
+		$mock->method( 'getFlowStepId' )->willReturn( 'flow-step-1' );
+		$mock->method( 'getHandlerType' )->willReturn( 'test_source' );
+
+		return $mock;
+	}
+}