Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 18 additions & 9 deletions inc/Steps/EventImport/Handlers/EventFlyer/EventFlyer.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace DataMachineEvents\Steps\EventImport\Handlers\EventFlyer;

use DataMachine\Core\ExecutionContext;
use DataMachine\Core\Steps\Fetch\FreshCandidateCollector;
use DataMachineEvents\Steps\EventImport\Handlers\EventImportHandler;
use DataMachineEvents\Steps\EventImport\Handlers\VenueFieldsTrait;
use DataMachineEvents\Utilities\EventIdentifierGenerator;
Expand Down Expand Up @@ -140,6 +141,12 @@ private function getNextUnprocessedImage( ExecutionContext $context ): ?array {

$image_extensions = array( 'jpg', 'jpeg', 'png', 'gif', 'webp' );

// Selection-time prefilter via Data Machine core primitive.
// FreshCandidateCollector skips already-processed and currently-claimed
// images consistently with FetchHandler's authoritative dedup. Authoritative
// dedup/claim/cap still runs in FetchHandler::get_fetch_data() after this.
$collector = new FreshCandidateCollector( $context, 1 );

foreach ( $repo_files as $file ) {
$extension = strtolower( pathinfo( $file['filename'], PATHINFO_EXTENSION ) );

Expand All @@ -148,23 +155,25 @@ private function getNextUnprocessedImage( ExecutionContext $context ): ?array {
}

$file_identifier = $file['path'];

// Pre-filter: skip already-processed images to find the next one.
// This is a selection mechanism, not dedup — dedup happens in FetchHandler::dedup().
if ( $context->isItemProcessed( $file_identifier ) ) {
continue;
}

return array(
$image = array(
'original_name' => $file['filename'],
'persistent_path' => $file['path'],
'size' => $file['size'],
'mime_type' => $this->getMimeType( $file['path'] ),
'uploaded_at' => gmdate( 'Y-m-d H:i:s', $file['modified'] ),
);

$collector->offer( $file_identifier, $image );

if ( $collector->isFull() ) {
break;
}
}

return null;
$collector->markExhausted();

$accepted = $collector->getAccepted();
return $accepted[0] ?? null;
}

private function getMimeType( string $file_path ): string {
Expand Down
33 changes: 0 additions & 33 deletions inc/Steps/EventImport/Handlers/EventImportHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -110,39 +110,6 @@ public function isPastEvent( string $start_date ): bool {
return strtotime( $start_date ) < strtotime( 'today' );
}

/**
* Check if item has been processed (uses ExecutionContext).
*
* @deprecated Use metadata['item_identifier'] instead. Dedup is now centralized in FetchHandler::dedup().
*
* @param ExecutionContext $context Execution context
* @param string $item_identifier Item identifier
* @return bool True if already processed
*/
public function checkItemProcessed( ExecutionContext $context, string $item_identifier ): bool {
return $context->isItemProcessed( $item_identifier );
}

/**
* Mark item as processed (uses ExecutionContext).
*
* Also stores item context in engine data for the skip_item tool.
*
* @deprecated Use metadata['item_identifier'] instead. Dedup is now centralized in FetchHandler::dedup().
*
* @param ExecutionContext $context Execution context
* @param string $item_identifier Item identifier
*/
public function markItemAsProcessed( ExecutionContext $context, string $item_identifier ): void {
$context->markItemProcessed( $item_identifier );

// Store item context for skip_item tool
$job_id = $context->getJobId();
if ( $job_id ) {
EventEngineData::storeItemContext( (int) $job_id, $item_identifier, $this->handler_type );
}
}

/**
* Called by FetchHandler::dedup() after marking an item as processed.
*
Expand Down
19 changes: 10 additions & 9 deletions inc/Steps/EventImport/Handlers/WebScraper/EventSectionFinder.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
namespace DataMachineEvents\Steps\EventImport\Handlers\WebScraper;

use DataMachine\Core\ExecutionContext;
use DataMachine\Core\Steps\Fetch\FreshCandidateCollector;

if ( ! defined( 'ABSPATH' ) ) {
exit;
Expand All @@ -19,9 +20,9 @@
final class EventSectionFinder {

/**
* @var callable(string): bool
* @var FreshCandidateCollector
*/
private $is_item_processed;
private FreshCandidateCollector $fresh_candidates;

/**
* @var callable(string): string
Expand All @@ -34,12 +35,12 @@ final class EventSectionFinder {
private $is_past_event;

/**
* @param callable(string): bool $is_item_processed Check if item identifier has been processed
* @param FreshCandidateCollector $fresh_candidates Selection-time fresh candidate collector.
* @param callable(string): string $clean_html_for_ai Clean HTML for AI processing
* @param callable(string): bool $is_past_event Check if Y-m-d date is in the past
*/
public function __construct( callable $is_item_processed, callable $clean_html_for_ai, callable $is_past_event ) {
$this->is_item_processed = $is_item_processed;
public function __construct( FreshCandidateCollector $fresh_candidates, callable $clean_html_for_ai, callable $is_past_event ) {
$this->fresh_candidates = $fresh_candidates;
$this->clean_html_for_ai = $clean_html_for_ai;
$this->is_past_event = $is_past_event;
}
Expand Down Expand Up @@ -96,7 +97,7 @@ public function find_first_eligible_section( string $html_content, string $url,
// Build stable identifier from decoded data
$event_identifier = md5( $url . $summary . ( $event_data['start'] ?? '' ) );

if ( call_user_func( $this->is_item_processed, $event_identifier ) ) {
if ( ! $this->fresh_candidates->offer( $event_identifier ) ) {
continue;
}

Expand Down Expand Up @@ -128,12 +129,12 @@ public function find_first_eligible_section( string $html_content, string $url,
$content_hash = md5( $raw_html );
$event_identifier = md5( $url . $content_hash );

if ( call_user_func( $this->is_item_processed, $event_identifier ) ) {
$cleaned_html = call_user_func( $this->clean_html_for_ai, $raw_html );
if ( empty( $cleaned_html ) || strlen( $cleaned_html ) < 30 ) {
continue;
}

$cleaned_html = call_user_func( $this->clean_html_for_ai, $raw_html );
if ( empty( $cleaned_html ) || strlen( $cleaned_html ) < 30 ) {
if ( ! $this->fresh_candidates->offer( $event_identifier ) ) {
continue;
}

Expand Down
21 changes: 12 additions & 9 deletions inc/Steps/EventImport/Handlers/WebScraper/UniversalWebScraper.php
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
namespace DataMachineEvents\Steps\EventImport\Handlers\WebScraper;

use DataMachine\Core\ExecutionContext;
use DataMachine\Core\Steps\Fetch\FreshCandidateCollector;
use DataMachineEvents\Steps\EventImport\Handlers\EventImportHandler;
use DataMachineEvents\Steps\EventImport\Handlers\WebScraper\Extractors\ExtractorInterface;
use DataMachineEvents\Steps\EventImport\Handlers\WebScraper\Extractors\WixEventsExtractor;
Expand Down Expand Up @@ -469,12 +470,18 @@ private function tryHtmlSectionExtraction(
ExecutionContext $context,
int $current_page
): ?array {
$skipped_identifiers = array();
// Selection-time prefilter via Data Machine core primitive.
// The collector skips processed/claimed/duplicate sections while this
// method keeps event-specific filters (HTML validity, title keywords,
// include/exclude keywords). FetchHandler remains authoritative after
// executeFetch() returns.
$section_collector = new FreshCandidateCollector( $context );

while ( true ) {
$event_section = $this->extract_event_sections( $html_content, $current_url, $context, $skipped_identifiers );
$event_section = $this->extract_event_sections( $html_content, $current_url, $context, $section_collector );

if ( empty( $event_section ) ) {
$section_collector->markExhausted();
break;
}

Expand All @@ -490,13 +497,11 @@ private function tryHtmlSectionExtraction(
$raw_html_data = $this->extract_raw_html_section( $event_section['raw_html'], $current_url, $context, $config );

if ( ! $raw_html_data ) {
$skipped_identifiers[ $event_section['identifier'] ] = true;
continue;
}

$section_title = $this->extract_section_title( $raw_html_data );
if ( '' !== $section_title && $this->shouldSkipEventTitle( $section_title ) ) {
$skipped_identifiers[ $event_section['identifier'] ] = true;
continue;
}

Expand All @@ -511,7 +516,6 @@ private function tryHtmlSectionExtraction(
'source_url' => $current_url,
)
);
$skipped_identifiers[ $event_section['identifier'] ] = true;
continue;
}

Expand All @@ -524,7 +528,6 @@ private function tryHtmlSectionExtraction(
'source_url' => $current_url,
)
);
$skipped_identifiers[ $event_section['identifier'] ] = true;
continue;
}

Expand Down Expand Up @@ -740,11 +743,11 @@ private function fetch_html( string $url, ExecutionContext $context ): string {
}

/**
* Extract first non-processed event HTML section from content.
* Extract first fresh event HTML section from content.
*/
private function extract_event_sections( string $html_content, string $url, ExecutionContext $context, array $skipped_identifiers = array() ): ?array {
private function extract_event_sections( string $html_content, string $url, ExecutionContext $context, FreshCandidateCollector $section_collector ): ?array {
$finder = new EventSectionFinder(
fn ( string $identifier ): bool => isset( $skipped_identifiers[ $identifier ] ) || $context->isItemProcessed( $identifier ),
$section_collector,
fn ( string $html ): string => $this->clean_html_for_ai( $html ),
fn ( string $ymd ): bool => $this->isPastEvent( $ymd )
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
namespace DataMachineEvents\Steps\EventImport\Handlers\WebScraper;

use DataMachine\Core\ExecutionContext;
use DataMachine\Core\Steps\Fetch\FreshCandidateCollector;
use DataMachineEvents\Steps\EventImport\Handlers\EventImportHandler;
use DataMachineEvents\Steps\EventImport\Handlers\WebScraper\Extractors\VisionExtractor;
use DataMachineEvents\Steps\EventImport\EventEngineData;
Expand Down Expand Up @@ -76,41 +77,57 @@ public function process(
)
);

// Selection-time prefilter via Data Machine core primitive.
// FreshCandidateCollector skips already-processed and currently-claimed
// images consistently with FetchHandler's authoritative dedup. Authoritative
// dedup/claim/cap still runs in FetchHandler::get_fetch_data() after this.
$collector = new FreshCandidateCollector( $context, 1 );
foreach ( $candidates as $candidate ) {
$image_url = $candidate['url'];

// Generate content-based identifier for cross-run tracking.
$image_url = $candidate['url'];
// Content-based identifier for cross-run tracking.
$image_identifier = md5( $url . $image_url );

// Pre-filter: skip already-processed images to find the next candidate.
// This is a selection mechanism, not dedup — dedup happens in FetchHandler::dedup().
if ( $context->isItemProcessed( $image_identifier ) ) {
$context->log(
'debug',
'VisionExtractor: Skipping processed image',
array( 'image_url' => $image_url )
);
continue;
$collector->offer(
$image_identifier,
array(
'image_url' => $image_url,
'image_identifier' => $image_identifier,
'score' => $candidate['score'] ?? 0,
)
);

if ( $collector->isFull() ) {
break;
}
}
$collector->markExhausted();

$accepted = $collector->getAccepted();
if ( ! empty( $accepted ) ) {
$selected = $accepted[0];
$image_url = $selected['image_url'];
$image_identifier = $selected['image_identifier'];

$context->log(
'debug',
'VisionExtractor: Processing image candidate',
array(
'image_url' => $image_url,
'score' => $candidate['score'],
'score' => $selected['score'],
)
);

// Download to persistent storage.
$file_path = $this->downloadImageToPersistentStorage( $image_url, $context );

// Mark as processed AFTER download attempt (success or fail).
// Direct call — this is a pre-filter, not dedup. Centralized dedup
// in FetchHandler::dedup() runs after executeFetch() returns.
$context->markItemProcessed( $image_identifier );

if ( ! $file_path ) {
// Mark this candidate as processed even though download failed,
// so the next run advances to the next candidate instead of
// retrying a permanently broken URL. This is a retry-prevention
// concern separate from candidate prefiltering - the collector
// only reads processed state, it does not mark.
$context->markItemProcessed( $image_identifier );

$context->log(
'warning',
'VisionExtractor: Failed to download image, will try next candidate on next run',
Expand Down
59 changes: 59 additions & 0 deletions tests/Unit/EventSectionFinderTest.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
<?php
/**
* EventSectionFinder tests.
*
* @package DataMachineEvents\Tests\Unit
*/

namespace DataMachineEvents\Tests\Unit;

use DataMachine\Core\ExecutionContext;
use DataMachine\Core\Steps\Fetch\FreshCandidateCollector;
use DataMachineEvents\Steps\EventImport\Handlers\WebScraper\EventSectionFinder;
use WP_UnitTestCase;

class EventSectionFinderTest extends WP_UnitTestCase {

public function test_processed_first_section_does_not_stop_scan_for_later_fresh_section(): void {
$context = $this->createMock( ExecutionContext::class );
$context->method( 'isDirect' )->willReturn( true );
$context->expects( $this->exactly( 2 ) )
->method( 'isItemProcessed' )
->willReturnOnConsecutiveCalls( true, false );
$context->expects( $this->once() )
->method( 'isItemClaimed' )
->willReturn( false );

$collector = new FreshCandidateCollector( $context );
$finder = new EventSectionFinder(
$collector,
fn ( string $html ): string => trim( $html ),
fn ( string $ymd ): bool => false
);

$html = '<html><body>'
. '<div class="event"><h2>Processed Show</h2>'
. '<p>This processed event section has enough content to pass the length checks.</p></div>'
. '<div class="event"><h2>Fresh Show</h2>'
. '<p>This fresh event section has enough content to pass the length checks.</p></div>'
. '</body></html>';

$section = $finder->find_first_eligible_section( $html, 'https://example.com/events', $context );

$this->assertNotNull( $section );
$this->assertStringContainsString( 'Fresh Show', $section['raw_html'] );
$this->assertSame(
array(
'raw_seen' => 2,
'accepted' => 1,
'processed_skipped' => 1,
'claimed_skipped' => 0,
'duplicate_skipped' => 0,
'reprocess_accepted' => 0,
'max_items' => 0,
'source_exhausted' => false,
),
$collector->getDiagnostics()
);
}
}
Loading