diff --git a/inc/Steps/EventImport/Handlers/EventFlyer/EventFlyer.php b/inc/Steps/EventImport/Handlers/EventFlyer/EventFlyer.php index 904dfdc..3446946 100644 --- a/inc/Steps/EventImport/Handlers/EventFlyer/EventFlyer.php +++ b/inc/Steps/EventImport/Handlers/EventFlyer/EventFlyer.php @@ -12,6 +12,7 @@ namespace DataMachineEvents\Steps\EventImport\Handlers\EventFlyer; use DataMachine\Core\ExecutionContext; +use DataMachine\Core\Steps\Fetch\FreshCandidateCollector; use DataMachineEvents\Steps\EventImport\Handlers\EventImportHandler; use DataMachineEvents\Steps\EventImport\Handlers\VenueFieldsTrait; use DataMachineEvents\Utilities\EventIdentifierGenerator; @@ -140,6 +141,12 @@ private function getNextUnprocessedImage( ExecutionContext $context ): ?array { $image_extensions = array( 'jpg', 'jpeg', 'png', 'gif', 'webp' ); + // Selection-time prefilter via Data Machine core primitive. + // FreshCandidateCollector skips already-processed and currently-claimed + // images consistently with FetchHandler's authoritative dedup. Authoritative + // dedup/claim/cap still runs in FetchHandler::get_fetch_data() after this. + $collector = new FreshCandidateCollector( $context, 1 ); + foreach ( $repo_files as $file ) { $extension = strtolower( pathinfo( $file['filename'], PATHINFO_EXTENSION ) ); @@ -148,23 +155,25 @@ private function getNextUnprocessedImage( ExecutionContext $context ): ?array { } $file_identifier = $file['path']; - - // Pre-filter: skip already-processed images to find the next one. - // This is a selection mechanism, not dedup — dedup happens in FetchHandler::dedup(). - if ( $context->isItemProcessed( $file_identifier ) ) { - continue; - } - - return array( + $image = array( 'original_name' => $file['filename'], 'persistent_path' => $file['path'], 'size' => $file['size'], 'mime_type' => $this->getMimeType( $file['path'] ), 'uploaded_at' => gmdate( 'Y-m-d H:i:s', $file['modified'] ), ); + + $collector->offer( $file_identifier, $image ); + + if ( $collector->isFull() ) { + break; + } } - return null; + $collector->markExhausted(); + + $accepted = $collector->getAccepted(); + return $accepted[0] ?? null; } private function getMimeType( string $file_path ): string { diff --git a/inc/Steps/EventImport/Handlers/EventImportHandler.php b/inc/Steps/EventImport/Handlers/EventImportHandler.php index 8a59e25..1da29d8 100644 --- a/inc/Steps/EventImport/Handlers/EventImportHandler.php +++ b/inc/Steps/EventImport/Handlers/EventImportHandler.php @@ -110,39 +110,6 @@ public function isPastEvent( string $start_date ): bool { return strtotime( $start_date ) < strtotime( 'today' ); } - /** - * Check if item has been processed (uses ExecutionContext). - * - * @deprecated Use metadata['item_identifier'] instead. Dedup is now centralized in FetchHandler::dedup(). - * - * @param ExecutionContext $context Execution context - * @param string $item_identifier Item identifier - * @return bool True if already processed - */ - public function checkItemProcessed( ExecutionContext $context, string $item_identifier ): bool { - return $context->isItemProcessed( $item_identifier ); - } - - /** - * Mark item as processed (uses ExecutionContext). - * - * Also stores item context in engine data for the skip_item tool. - * - * @deprecated Use metadata['item_identifier'] instead. Dedup is now centralized in FetchHandler::dedup(). - * - * @param ExecutionContext $context Execution context - * @param string $item_identifier Item identifier - */ - public function markItemAsProcessed( ExecutionContext $context, string $item_identifier ): void { - $context->markItemProcessed( $item_identifier ); - - // Store item context for skip_item tool - $job_id = $context->getJobId(); - if ( $job_id ) { - EventEngineData::storeItemContext( (int) $job_id, $item_identifier, $this->handler_type ); - } - } - /** * Called by FetchHandler::dedup() after marking an item as processed. * diff --git a/inc/Steps/EventImport/Handlers/WebScraper/EventSectionFinder.php b/inc/Steps/EventImport/Handlers/WebScraper/EventSectionFinder.php index be69eca..a82997f 100644 --- a/inc/Steps/EventImport/Handlers/WebScraper/EventSectionFinder.php +++ b/inc/Steps/EventImport/Handlers/WebScraper/EventSectionFinder.php @@ -11,6 +11,7 @@ namespace DataMachineEvents\Steps\EventImport\Handlers\WebScraper; use DataMachine\Core\ExecutionContext; +use DataMachine\Core\Steps\Fetch\FreshCandidateCollector; if ( ! defined( 'ABSPATH' ) ) { exit; @@ -19,9 +20,9 @@ final class EventSectionFinder { /** - * @var callable(string): bool + * @var FreshCandidateCollector */ - private $is_item_processed; + private FreshCandidateCollector $fresh_candidates; /** * @var callable(string): string @@ -34,12 +35,12 @@ final class EventSectionFinder { private $is_past_event; /** - * @param callable(string): bool $is_item_processed Check if item identifier has been processed + * @param FreshCandidateCollector $fresh_candidates Selection-time fresh candidate collector. * @param callable(string): string $clean_html_for_ai Clean HTML for AI processing * @param callable(string): bool $is_past_event Check if Y-m-d date is in the past */ - public function __construct( callable $is_item_processed, callable $clean_html_for_ai, callable $is_past_event ) { - $this->is_item_processed = $is_item_processed; + public function __construct( FreshCandidateCollector $fresh_candidates, callable $clean_html_for_ai, callable $is_past_event ) { + $this->fresh_candidates = $fresh_candidates; $this->clean_html_for_ai = $clean_html_for_ai; $this->is_past_event = $is_past_event; } @@ -96,7 +97,7 @@ public function find_first_eligible_section( string $html_content, string $url, // Build stable identifier from decoded data $event_identifier = md5( $url . $summary . ( $event_data['start'] ?? '' ) ); - if ( call_user_func( $this->is_item_processed, $event_identifier ) ) { + if ( ! $this->fresh_candidates->offer( $event_identifier ) ) { continue; } @@ -128,12 +129,12 @@ public function find_first_eligible_section( string $html_content, string $url, $content_hash = md5( $raw_html ); $event_identifier = md5( $url . $content_hash ); - if ( call_user_func( $this->is_item_processed, $event_identifier ) ) { + $cleaned_html = call_user_func( $this->clean_html_for_ai, $raw_html ); + if ( empty( $cleaned_html ) || strlen( $cleaned_html ) < 30 ) { continue; } - $cleaned_html = call_user_func( $this->clean_html_for_ai, $raw_html ); - if ( empty( $cleaned_html ) || strlen( $cleaned_html ) < 30 ) { + if ( ! $this->fresh_candidates->offer( $event_identifier ) ) { continue; } diff --git a/inc/Steps/EventImport/Handlers/WebScraper/UniversalWebScraper.php b/inc/Steps/EventImport/Handlers/WebScraper/UniversalWebScraper.php index 30b3be3..90b4553 100644 --- a/inc/Steps/EventImport/Handlers/WebScraper/UniversalWebScraper.php +++ b/inc/Steps/EventImport/Handlers/WebScraper/UniversalWebScraper.php @@ -48,6 +48,7 @@ namespace DataMachineEvents\Steps\EventImport\Handlers\WebScraper; use DataMachine\Core\ExecutionContext; +use DataMachine\Core\Steps\Fetch\FreshCandidateCollector; use DataMachineEvents\Steps\EventImport\Handlers\EventImportHandler; use DataMachineEvents\Steps\EventImport\Handlers\WebScraper\Extractors\ExtractorInterface; use DataMachineEvents\Steps\EventImport\Handlers\WebScraper\Extractors\WixEventsExtractor; @@ -469,12 +470,18 @@ private function tryHtmlSectionExtraction( ExecutionContext $context, int $current_page ): ?array { - $skipped_identifiers = array(); + // Selection-time prefilter via Data Machine core primitive. + // The collector skips processed/claimed/duplicate sections while this + // method keeps event-specific filters (HTML validity, title keywords, + // include/exclude keywords). FetchHandler remains authoritative after + // executeFetch() returns. + $section_collector = new FreshCandidateCollector( $context ); while ( true ) { - $event_section = $this->extract_event_sections( $html_content, $current_url, $context, $skipped_identifiers ); + $event_section = $this->extract_event_sections( $html_content, $current_url, $context, $section_collector ); if ( empty( $event_section ) ) { + $section_collector->markExhausted(); break; } @@ -490,13 +497,11 @@ private function tryHtmlSectionExtraction( $raw_html_data = $this->extract_raw_html_section( $event_section['raw_html'], $current_url, $context, $config ); if ( ! $raw_html_data ) { - $skipped_identifiers[ $event_section['identifier'] ] = true; continue; } $section_title = $this->extract_section_title( $raw_html_data ); if ( '' !== $section_title && $this->shouldSkipEventTitle( $section_title ) ) { - $skipped_identifiers[ $event_section['identifier'] ] = true; continue; } @@ -511,7 +516,6 @@ private function tryHtmlSectionExtraction( 'source_url' => $current_url, ) ); - $skipped_identifiers[ $event_section['identifier'] ] = true; continue; } @@ -524,7 +528,6 @@ private function tryHtmlSectionExtraction( 'source_url' => $current_url, ) ); - $skipped_identifiers[ $event_section['identifier'] ] = true; continue; } @@ -740,11 +743,11 @@ private function fetch_html( string $url, ExecutionContext $context ): string { } /** - * Extract first non-processed event HTML section from content. + * Extract first fresh event HTML section from content. */ - private function extract_event_sections( string $html_content, string $url, ExecutionContext $context, array $skipped_identifiers = array() ): ?array { + private function extract_event_sections( string $html_content, string $url, ExecutionContext $context, FreshCandidateCollector $section_collector ): ?array { $finder = new EventSectionFinder( - fn ( string $identifier ): bool => isset( $skipped_identifiers[ $identifier ] ) || $context->isItemProcessed( $identifier ), + $section_collector, fn ( string $html ): string => $this->clean_html_for_ai( $html ), fn ( string $ymd ): bool => $this->isPastEvent( $ymd ) ); diff --git a/inc/Steps/EventImport/Handlers/WebScraper/VisionExtractionProcessor.php b/inc/Steps/EventImport/Handlers/WebScraper/VisionExtractionProcessor.php index f28f597..6ac644b 100644 --- a/inc/Steps/EventImport/Handlers/WebScraper/VisionExtractionProcessor.php +++ b/inc/Steps/EventImport/Handlers/WebScraper/VisionExtractionProcessor.php @@ -13,6 +13,7 @@ namespace DataMachineEvents\Steps\EventImport\Handlers\WebScraper; use DataMachine\Core\ExecutionContext; +use DataMachine\Core\Steps\Fetch\FreshCandidateCollector; use DataMachineEvents\Steps\EventImport\Handlers\EventImportHandler; use DataMachineEvents\Steps\EventImport\Handlers\WebScraper\Extractors\VisionExtractor; use DataMachineEvents\Steps\EventImport\EventEngineData; @@ -76,41 +77,57 @@ public function process( ) ); + // Selection-time prefilter via Data Machine core primitive. + // FreshCandidateCollector skips already-processed and currently-claimed + // images consistently with FetchHandler's authoritative dedup. Authoritative + // dedup/claim/cap still runs in FetchHandler::get_fetch_data() after this. + $collector = new FreshCandidateCollector( $context, 1 ); foreach ( $candidates as $candidate ) { - $image_url = $candidate['url']; - - // Generate content-based identifier for cross-run tracking. + $image_url = $candidate['url']; + // Content-based identifier for cross-run tracking. $image_identifier = md5( $url . $image_url ); - // Pre-filter: skip already-processed images to find the next candidate. - // This is a selection mechanism, not dedup — dedup happens in FetchHandler::dedup(). - if ( $context->isItemProcessed( $image_identifier ) ) { - $context->log( - 'debug', - 'VisionExtractor: Skipping processed image', - array( 'image_url' => $image_url ) - ); - continue; + $collector->offer( + $image_identifier, + array( + 'image_url' => $image_url, + 'image_identifier' => $image_identifier, + 'score' => $candidate['score'] ?? 0, + ) + ); + + if ( $collector->isFull() ) { + break; } + } + $collector->markExhausted(); + + $accepted = $collector->getAccepted(); + if ( ! empty( $accepted ) ) { + $selected = $accepted[0]; + $image_url = $selected['image_url']; + $image_identifier = $selected['image_identifier']; $context->log( 'debug', 'VisionExtractor: Processing image candidate', array( 'image_url' => $image_url, - 'score' => $candidate['score'], + 'score' => $selected['score'], ) ); // Download to persistent storage. $file_path = $this->downloadImageToPersistentStorage( $image_url, $context ); - // Mark as processed AFTER download attempt (success or fail). - // Direct call — this is a pre-filter, not dedup. Centralized dedup - // in FetchHandler::dedup() runs after executeFetch() returns. - $context->markItemProcessed( $image_identifier ); - if ( ! $file_path ) { + // Mark this candidate as processed even though download failed, + // so the next run advances to the next candidate instead of + // retrying a permanently broken URL. This is a retry-prevention + // concern separate from candidate prefiltering - the collector + // only reads processed state, it does not mark. + $context->markItemProcessed( $image_identifier ); + $context->log( 'warning', 'VisionExtractor: Failed to download image, will try next candidate on next run', diff --git a/tests/Unit/EventSectionFinderTest.php b/tests/Unit/EventSectionFinderTest.php new file mode 100644 index 0000000..065fcb4 --- /dev/null +++ b/tests/Unit/EventSectionFinderTest.php @@ -0,0 +1,59 @@ +createMock( ExecutionContext::class ); + $context->method( 'isDirect' )->willReturn( true ); + $context->expects( $this->exactly( 2 ) ) + ->method( 'isItemProcessed' ) + ->willReturnOnConsecutiveCalls( true, false ); + $context->expects( $this->once() ) + ->method( 'isItemClaimed' ) + ->willReturn( false ); + + $collector = new FreshCandidateCollector( $context ); + $finder = new EventSectionFinder( + $collector, + fn ( string $html ): string => trim( $html ), + fn ( string $ymd ): bool => false + ); + + $html = '' + . '

Processed Show

' + . '

This processed event section has enough content to pass the length checks.

' + . '

Fresh Show

' + . '

This fresh event section has enough content to pass the length checks.

' + . ''; + + $section = $finder->find_first_eligible_section( $html, 'https://example.com/events', $context ); + + $this->assertNotNull( $section ); + $this->assertStringContainsString( 'Fresh Show', $section['raw_html'] ); + $this->assertSame( + array( + 'raw_seen' => 2, + 'accepted' => 1, + 'processed_skipped' => 1, + 'claimed_skipped' => 0, + 'duplicate_skipped' => 0, + 'reprocess_accepted' => 0, + 'max_items' => 0, + 'source_exhausted' => false, + ), + $collector->getDiagnostics() + ); + } +}