Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 44 additions & 16 deletions inc/Core/Steps/Fetch/Handlers/FetchHandler.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@
*
* Uses ExecutionContext for deduplication, engine data, file storage, and logging.
*
* Also registers the skip_item tool available to all fetch-type handlers, allowing
* the pipeline agent to explicitly skip items that don't meet processing criteria.
* Also registers explicit reject/defer tools available to all fetch-type handlers,
* allowing the pipeline agent to mark source rejections or release retriable items.
*
* @package Data_Machine
* @subpackage Core\Steps\Fetch\Handlers
Expand All @@ -21,7 +21,7 @@
use DataMachine\Core\DataPacket;
use DataMachine\Core\ExecutionContext;
use DataMachine\Core\FilesRepository\FileStorage;
use DataMachine\Core\Steps\Fetch\Tools\SkipItemTool;
use DataMachine\Core\Steps\Fetch\Tools\FetchItemDispositionTool;
use DataMachine\Core\Steps\Handlers\HttpRequestHelpers;

if ( ! defined( 'ABSPATH' ) ) {
Expand Down Expand Up @@ -227,7 +227,7 @@ private function markProcessed( array $items, ExecutionContext $context ): void
*
* Override in subclasses to add handler-specific side effects.
* For example, EventImportHandler stores item context in engine data
* for the skip_item AI tool.
* for the fetch disposition AI tools.
*
* @param ExecutionContext $context Execution context.
* @param array $item The item that was just marked as processed.
Expand Down Expand Up @@ -412,7 +412,7 @@ protected function getAuthProvider( string $provider_key ): ?object {
/**
* Initialize FetchHandler static functionality.
*
* Registers the skip_item tool in the unified `datamachine_tools` registry
* Registers fetch disposition tools in the unified `datamachine_tools` registry
* as a cross-cutting handler tool — ToolPolicyResolver resolves it for any
* adjacent step whose handler type is `fetch` or `event_import`.
*
Expand All @@ -422,8 +422,8 @@ public static function init(): void {
add_filter(
'datamachine_tools',
static function ( array $tools ): array {
$tools['__handler_tools_skip_item'] = array(
'_handler_callable' => array( self::class, 'resolveSkipItemTool' ),
$tools['__handler_tools_fetch_dispositions'] = array(
'_handler_callable' => array( self::class, 'resolveFetchDispositionTools' ),
'handler_types' => array( 'fetch', 'event_import' ),
'modes' => array( 'pipeline' ),
'access_level' => 'admin',
Expand All @@ -434,7 +434,7 @@ static function ( array $tools ): array {
}

/**
* Resolve the skip_item tool for a specific fetch-type handler.
* Resolve fetch disposition tools for a specific fetch-type handler.
*
* Invoked lazily by ToolManager::resolveHandlerTools() when ANY adjacent
* step handler's registered type is `fetch` or `event_import`. The tool
Expand All @@ -444,38 +444,66 @@ static function ( array $tools ): array {
* @param string $handler_slug Resolved adjacent-step handler slug.
* @param array $handler_config Handler configuration.
* @param array $engine_data Engine data snapshot (unused).
* @return array{skip_item: array} Tool map with the skip_item definition.
* @return array{reject_source: array, defer_item: array} Tool map with disposition definitions.
* @since 0.9.7
*/
public static function resolveSkipItemTool(
public static function resolveFetchDispositionTools(
string $handler_slug,
array $handler_config,
array $engine_data
): array {
unset( $engine_data );
return array(
'skip_item' => self::getSkipItemToolDefinition( $handler_slug, $handler_config ),
'reject_source' => self::getRejectSourceToolDefinition( $handler_slug, $handler_config ),
'defer_item' => self::getDeferItemToolDefinition( $handler_slug, $handler_config ),
);
}

/**
* Get the skip_item tool definition.
* Get the reject_source tool definition.
*
* @param string $handler_slug Handler slug to associate with
* @param array $handler_config Handler configuration
* @return array Tool definition
* @since 0.9.7
*/
private static function getSkipItemToolDefinition( string $handler_slug, array $handler_config ): array {
private static function getRejectSourceToolDefinition( string $handler_slug, array $handler_config ): array {
return array(
'class' => SkipItemTool::class,
'class' => FetchItemDispositionTool::class,
'method' => 'handle_tool_call',
'handler' => $handler_slug,
'description' => 'Skip processing this item. Use when the item does not meet quality or relevance criteria defined in the pipeline rules (RULES.md). The item will be marked as processed and will not be refetched on subsequent runs.',
'disposition' => 'reject_source',
'description' => 'Reject this fetched source item after reasoned content/source evaluation. Use when the source itself is irrelevant, too thin, duplicate, noisy, spammy, or otherwise fails the pipeline quality or relevance criteria. The source item will be marked as processed and will not normally be refetched.',
'parameters' => array(
'reason' => array(
'type' => 'string',
'description' => 'Concise 2-5 word categorical skip reason, following the vocabulary defined in the pipeline system prompt or RULES.md. Do NOT write sentences — just the category.',
'description' => 'Concise categorical rejection reason, following the vocabulary defined in the pipeline system prompt or RULES.md.',
'required' => true,
),
),
'handler_config' => $handler_config,
);
}

/**
* Get the defer_item tool definition.
*
* @param string $handler_slug Handler slug to associate with.
* @param array $handler_config Handler configuration.
* @return array Tool definition.
* @since 0.9.7
*/
private static function getDeferItemToolDefinition( string $handler_slug, array $handler_config ): array {
return array(
'class' => FetchItemDispositionTool::class,
'method' => 'handle_tool_call',
'handler' => $handler_slug,
'disposition' => 'defer_item',
'description' => 'Defer this fetched source item when the agent cannot safely complete processing now because of runtime failures, tool errors, missing context, uncertainty, or temporary limitations. The source claim will be released and the item will remain eligible to be fetched and retried later; it will not be marked processed.',
'parameters' => array(
'reason' => array(
'type' => 'string',
'description' => 'Concise reason explaining why processing cannot safely complete now and should be retried later.',
'required' => true,
),
),
Expand Down
266 changes: 266 additions & 0 deletions inc/Core/Steps/Fetch/Tools/FetchItemDispositionTool.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,266 @@
<?php
/**
* Fetch Item Disposition Tool
*
* Handler tool that allows the pipeline agent to explicitly reject or defer
* a fetched source item. Rejections mark the source as processed so it will
* not be refetched; deferrals release the source claim so the item remains
* eligible for a later retry.
*
* This provides a safety net when keyword exclusions or other filters
* miss items that shouldn't be processed (e.g., non-music events).
*
* @package DataMachine\Core\Steps\Fetch\Tools
* @since 0.9.7
*/

namespace DataMachine\Core\Steps\Fetch\Tools;

use DataMachine\Core\JobStatus;

defined( 'ABSPATH' ) || exit;

class FetchItemDispositionTool {

private const DISPOSITION_REJECT_SOURCE = 'reject_source';
private const DISPOSITION_DEFER_ITEM = 'defer_item';

/**
* Handle the reject_source/defer_item tool call.
*
* @param array $parameters Tool parameters from AI (reason required).
* @param array $tool_def Tool definition with handler_config.
* @return array Tool result with success status.
*/
public function handle_tool_call( array $parameters, array $tool_def = array() ): array {
$disposition = $this->getDisposition( $tool_def );
if ( self::DISPOSITION_DEFER_ITEM === $disposition ) {
return $this->deferItem( $parameters, $tool_def );
}

return $this->rejectSource( $parameters, $tool_def );
}

/**
* Mark the current source item as rejected/processed.
*
* @param array $parameters Tool parameters from AI.
* @param array $tool_def Tool definition with handler_config.
* @return array Tool result with success status.
*/
private function rejectSource( array $parameters, array $tool_def ): array {
unset( $tool_def );

$reason = trim( $parameters['reason'] ?? '' );
$tool_name = self::DISPOSITION_REJECT_SOURCE;

if ( empty( $reason ) ) {
return array(
'success' => false,
'error' => 'reason parameter is required - explain why this source is being rejected',
'tool_name' => $tool_name,
);
}

$job_id = (int) ( $parameters['job_id'] ?? 0 );
if ( ! $job_id ) {
return array(
'success' => false,
'error' => 'job_id is required for reject_source operations',
'tool_name' => $tool_name,
);
}

// Get engine data for item identification
$engine = $parameters['engine'] ?? null;
if ( ! $engine ) {
return array(
'success' => false,
'error' => 'Engine context not available',
'tool_name' => $tool_name,
);
}

// Get item identifier and source type from engine data (set by fetch handler)
$item_identifier = $engine->get( 'item_identifier' );
$source_type = $engine->get( 'source_type' );
$flow_step_id = $this->resolveFetchFlowStepId( $engine ) ?? ( $parameters['flow_step_id'] ?? $engine->get( 'flow_step_id' ) );

// Mark item as processed so it won't be refetched
if ( $flow_step_id && $item_identifier && $source_type ) {
do_action(
'datamachine_mark_item_processed',
$flow_step_id,
$source_type,
$item_identifier,
$job_id
);

do_action(
'datamachine_log',
'info',
'FetchItemDispositionTool: Source rejected and marked as processed',
array(
'job_id' => $job_id,
'flow_step_id' => $flow_step_id,
'item_identifier' => $item_identifier,
'source_type' => $source_type,
'reason' => $reason,
)
);
} else {
do_action(
'datamachine_log',
'warning',
'FetchItemDispositionTool: Could not mark rejected source as processed - missing identifiers',
array(
'job_id' => $job_id,
'flow_step_id' => $flow_step_id,
'item_identifier' => $item_identifier,
'source_type' => $source_type,
'reason' => $reason,
)
);
}

// Set job status override for engine to use at completion
$status = JobStatus::agentSkipped( 'source-rejected' );
datamachine_merge_engine_data( $job_id, array( 'job_status' => $status->toString() ) );

do_action(
'datamachine_log',
'info',
'FetchItemDispositionTool: Job status set to source rejected',
array(
'job_id' => $job_id,
'status' => $status->toString(),
'reason' => $reason,
)
);

return array(
'success' => true,
'message' => "Source rejected: {$reason}",
'status' => $status->toString(),
'item_identifier' => $item_identifier,
'tool_name' => $tool_name,
'disposition' => self::DISPOSITION_REJECT_SOURCE,
'reason' => $reason,
);
}

/**
* Release the current source item claim without marking it processed.
*
* @param array $parameters Tool parameters from AI.
* @param array $tool_def Tool definition with handler_config.
* @return array Tool result with success status.
*/
private function deferItem( array $parameters, array $tool_def ): array {
unset( $tool_def );

$reason = trim( $parameters['reason'] ?? '' );
$tool_name = self::DISPOSITION_DEFER_ITEM;

if ( empty( $reason ) ) {
return array(
'success' => false,
'error' => 'reason parameter is required - explain why this item cannot be safely completed now',
'tool_name' => $tool_name,
);
}

$job_id = (int) ( $parameters['job_id'] ?? 0 );
if ( ! $job_id ) {
return array(
'success' => false,
'error' => 'job_id is required for defer_item operations',
'tool_name' => $tool_name,
);
}

$engine = $parameters['engine'] ?? null;
if ( ! $engine ) {
return array(
'success' => false,
'error' => 'Engine context not available',
'tool_name' => $tool_name,
);
}

$item_identifier = $engine->get( 'item_identifier' );
$source_type = $engine->get( 'source_type' );
$flow_step_id = $this->resolveFetchFlowStepId( $engine ) ?? ( $parameters['flow_step_id'] ?? $engine->get( 'flow_step_id' ) );
$released = null;

if ( $flow_step_id && $item_identifier && $source_type ) {
$released = ( new \DataMachine\Core\Database\ProcessedItems\ProcessedItems() )->release_claim( $flow_step_id, (string) $source_type, (string) $item_identifier );
}

$status = JobStatus::failed( 'item-deferred' );
datamachine_merge_engine_data( $job_id, array( 'job_status' => $status->toString() ) );

do_action(
'datamachine_log',
'info',
'FetchItemDispositionTool: Item deferred and source claim released',
array(
'job_id' => $job_id,
'flow_step_id' => $flow_step_id,
'item_identifier' => $item_identifier,
'source_type' => $source_type,
'reason' => $reason,
'released' => $released,
'status' => $status->toString(),
)
);

return array(
'success' => true,
'message' => "Item deferred for retry: {$reason}",
'status' => $status->toString(),
'item_identifier' => $item_identifier,
'tool_name' => $tool_name,
'disposition' => self::DISPOSITION_DEFER_ITEM,
'reason' => $reason,
'released' => $released,
);
}

/**
* Return the disposition encoded by the resolved tool definition.
*
* @param array $tool_def Tool definition.
* @return string
*/
private function getDisposition( array $tool_def ): string {
$disposition = $tool_def['disposition'] ?? self::DISPOSITION_REJECT_SOURCE;

return self::DISPOSITION_DEFER_ITEM === $disposition ? self::DISPOSITION_DEFER_ITEM : self::DISPOSITION_REJECT_SOURCE;
}

/**
* Resolve the source fetch/event_import step ID from engine flow config.
*
* @param object $engine Engine data wrapper.
* @return string|null Fetch step ID, or null when unavailable.
*/
private function resolveFetchFlowStepId( object $engine ): ?string {
$flow_config = $engine->get( 'flow_config' );
if ( ! is_array( $flow_config ) ) {
return null;
}

foreach ( $flow_config as $step_id => $config ) {
if ( ! is_array( $config ) ) {
continue;
}

if ( in_array( $config['step_type'] ?? '', array( 'fetch', 'event_import' ), true ) ) {
return (string) $step_id;
}
}

return null;
}
}
Loading
Loading