diff --git a/inc/Core/Steps/Fetch/Handlers/FetchHandler.php b/inc/Core/Steps/Fetch/Handlers/FetchHandler.php index e3b44706a..2c0aa0e8a 100644 --- a/inc/Core/Steps/Fetch/Handlers/FetchHandler.php +++ b/inc/Core/Steps/Fetch/Handlers/FetchHandler.php @@ -7,8 +7,8 @@ * * Uses ExecutionContext for deduplication, engine data, file storage, and logging. * - * Also registers the skip_item tool available to all fetch-type handlers, allowing - * the pipeline agent to explicitly skip items that don't meet processing criteria. + * Also registers explicit reject/defer tools available to all fetch-type handlers, + * allowing the pipeline agent to mark source rejections or release retriable items. * * @package Data_Machine * @subpackage Core\Steps\Fetch\Handlers @@ -21,7 +21,7 @@ use DataMachine\Core\DataPacket; use DataMachine\Core\ExecutionContext; use DataMachine\Core\FilesRepository\FileStorage; -use DataMachine\Core\Steps\Fetch\Tools\SkipItemTool; +use DataMachine\Core\Steps\Fetch\Tools\FetchItemDispositionTool; use DataMachine\Core\Steps\Handlers\HttpRequestHelpers; if ( ! defined( 'ABSPATH' ) ) { @@ -227,7 +227,7 @@ private function markProcessed( array $items, ExecutionContext $context ): void * * Override in subclasses to add handler-specific side effects. * For example, EventImportHandler stores item context in engine data - * for the skip_item AI tool. + * for the fetch disposition AI tools. * * @param ExecutionContext $context Execution context. * @param array $item The item that was just marked as processed. @@ -412,7 +412,7 @@ protected function getAuthProvider( string $provider_key ): ?object { /** * Initialize FetchHandler static functionality. * - * Registers the skip_item tool in the unified `datamachine_tools` registry + * Registers fetch disposition tools in the unified `datamachine_tools` registry * as a cross-cutting handler tool — ToolPolicyResolver resolves it for any * adjacent step whose handler type is `fetch` or `event_import`. * @@ -422,8 +422,8 @@ public static function init(): void { add_filter( 'datamachine_tools', static function ( array $tools ): array { - $tools['__handler_tools_skip_item'] = array( - '_handler_callable' => array( self::class, 'resolveSkipItemTool' ), + $tools['__handler_tools_fetch_dispositions'] = array( + '_handler_callable' => array( self::class, 'resolveFetchDispositionTools' ), 'handler_types' => array( 'fetch', 'event_import' ), 'modes' => array( 'pipeline' ), 'access_level' => 'admin', @@ -434,7 +434,7 @@ static function ( array $tools ): array { } /** - * Resolve the skip_item tool for a specific fetch-type handler. + * Resolve fetch disposition tools for a specific fetch-type handler. * * Invoked lazily by ToolManager::resolveHandlerTools() when ANY adjacent * step handler's registered type is `fetch` or `event_import`. The tool @@ -444,38 +444,66 @@ static function ( array $tools ): array { * @param string $handler_slug Resolved adjacent-step handler slug. * @param array $handler_config Handler configuration. * @param array $engine_data Engine data snapshot (unused). - * @return array{skip_item: array} Tool map with the skip_item definition. + * @return array{reject_source: array, defer_item: array} Tool map with disposition definitions. * @since 0.9.7 */ - public static function resolveSkipItemTool( + public static function resolveFetchDispositionTools( string $handler_slug, array $handler_config, array $engine_data ): array { unset( $engine_data ); return array( - 'skip_item' => self::getSkipItemToolDefinition( $handler_slug, $handler_config ), + 'reject_source' => self::getRejectSourceToolDefinition( $handler_slug, $handler_config ), + 'defer_item' => self::getDeferItemToolDefinition( $handler_slug, $handler_config ), ); } /** - * Get the skip_item tool definition. + * Get the reject_source tool definition. * * @param string $handler_slug Handler slug to associate with * @param array $handler_config Handler configuration * @return array Tool definition * @since 0.9.7 */ - private static function getSkipItemToolDefinition( string $handler_slug, array $handler_config ): array { + private static function getRejectSourceToolDefinition( string $handler_slug, array $handler_config ): array { return array( - 'class' => SkipItemTool::class, + 'class' => FetchItemDispositionTool::class, 'method' => 'handle_tool_call', 'handler' => $handler_slug, - 'description' => 'Skip processing this item. Use when the item does not meet quality or relevance criteria defined in the pipeline rules (RULES.md). The item will be marked as processed and will not be refetched on subsequent runs.', + 'disposition' => 'reject_source', + 'description' => 'Reject this fetched source item after reasoned content/source evaluation. Use when the source itself is irrelevant, too thin, duplicate, noisy, spammy, or otherwise fails the pipeline quality or relevance criteria. The source item will be marked as processed and will not normally be refetched.', 'parameters' => array( 'reason' => array( 'type' => 'string', - 'description' => 'Concise 2-5 word categorical skip reason, following the vocabulary defined in the pipeline system prompt or RULES.md. Do NOT write sentences — just the category.', + 'description' => 'Concise categorical rejection reason, following the vocabulary defined in the pipeline system prompt or RULES.md.', + 'required' => true, + ), + ), + 'handler_config' => $handler_config, + ); + } + + /** + * Get the defer_item tool definition. + * + * @param string $handler_slug Handler slug to associate with. + * @param array $handler_config Handler configuration. + * @return array Tool definition. + * @since 0.9.7 + */ + private static function getDeferItemToolDefinition( string $handler_slug, array $handler_config ): array { + return array( + 'class' => FetchItemDispositionTool::class, + 'method' => 'handle_tool_call', + 'handler' => $handler_slug, + 'disposition' => 'defer_item', + 'description' => 'Defer this fetched source item when the agent cannot safely complete processing now because of runtime failures, tool errors, missing context, uncertainty, or temporary limitations. The source claim will be released and the item will remain eligible to be fetched and retried later; it will not be marked processed.', + 'parameters' => array( + 'reason' => array( + 'type' => 'string', + 'description' => 'Concise reason explaining why processing cannot safely complete now and should be retried later.', 'required' => true, ), ), diff --git a/inc/Core/Steps/Fetch/Tools/FetchItemDispositionTool.php b/inc/Core/Steps/Fetch/Tools/FetchItemDispositionTool.php new file mode 100644 index 000000000..c332c7bb0 --- /dev/null +++ b/inc/Core/Steps/Fetch/Tools/FetchItemDispositionTool.php @@ -0,0 +1,266 @@ +getDisposition( $tool_def ); + if ( self::DISPOSITION_DEFER_ITEM === $disposition ) { + return $this->deferItem( $parameters, $tool_def ); + } + + return $this->rejectSource( $parameters, $tool_def ); + } + + /** + * Mark the current source item as rejected/processed. + * + * @param array $parameters Tool parameters from AI. + * @param array $tool_def Tool definition with handler_config. + * @return array Tool result with success status. + */ + private function rejectSource( array $parameters, array $tool_def ): array { + unset( $tool_def ); + + $reason = trim( $parameters['reason'] ?? '' ); + $tool_name = self::DISPOSITION_REJECT_SOURCE; + + if ( empty( $reason ) ) { + return array( + 'success' => false, + 'error' => 'reason parameter is required - explain why this source is being rejected', + 'tool_name' => $tool_name, + ); + } + + $job_id = (int) ( $parameters['job_id'] ?? 0 ); + if ( ! $job_id ) { + return array( + 'success' => false, + 'error' => 'job_id is required for reject_source operations', + 'tool_name' => $tool_name, + ); + } + + // Get engine data for item identification + $engine = $parameters['engine'] ?? null; + if ( ! $engine ) { + return array( + 'success' => false, + 'error' => 'Engine context not available', + 'tool_name' => $tool_name, + ); + } + + // Get item identifier and source type from engine data (set by fetch handler) + $item_identifier = $engine->get( 'item_identifier' ); + $source_type = $engine->get( 'source_type' ); + $flow_step_id = $this->resolveFetchFlowStepId( $engine ) ?? ( $parameters['flow_step_id'] ?? $engine->get( 'flow_step_id' ) ); + + // Mark item as processed so it won't be refetched + if ( $flow_step_id && $item_identifier && $source_type ) { + do_action( + 'datamachine_mark_item_processed', + $flow_step_id, + $source_type, + $item_identifier, + $job_id + ); + + do_action( + 'datamachine_log', + 'info', + 'FetchItemDispositionTool: Source rejected and marked as processed', + array( + 'job_id' => $job_id, + 'flow_step_id' => $flow_step_id, + 'item_identifier' => $item_identifier, + 'source_type' => $source_type, + 'reason' => $reason, + ) + ); + } else { + do_action( + 'datamachine_log', + 'warning', + 'FetchItemDispositionTool: Could not mark rejected source as processed - missing identifiers', + array( + 'job_id' => $job_id, + 'flow_step_id' => $flow_step_id, + 'item_identifier' => $item_identifier, + 'source_type' => $source_type, + 'reason' => $reason, + ) + ); + } + + // Set job status override for engine to use at completion + $status = JobStatus::agentSkipped( 'source-rejected' ); + datamachine_merge_engine_data( $job_id, array( 'job_status' => $status->toString() ) ); + + do_action( + 'datamachine_log', + 'info', + 'FetchItemDispositionTool: Job status set to source rejected', + array( + 'job_id' => $job_id, + 'status' => $status->toString(), + 'reason' => $reason, + ) + ); + + return array( + 'success' => true, + 'message' => "Source rejected: {$reason}", + 'status' => $status->toString(), + 'item_identifier' => $item_identifier, + 'tool_name' => $tool_name, + 'disposition' => self::DISPOSITION_REJECT_SOURCE, + 'reason' => $reason, + ); + } + + /** + * Release the current source item claim without marking it processed. + * + * @param array $parameters Tool parameters from AI. + * @param array $tool_def Tool definition with handler_config. + * @return array Tool result with success status. + */ + private function deferItem( array $parameters, array $tool_def ): array { + unset( $tool_def ); + + $reason = trim( $parameters['reason'] ?? '' ); + $tool_name = self::DISPOSITION_DEFER_ITEM; + + if ( empty( $reason ) ) { + return array( + 'success' => false, + 'error' => 'reason parameter is required - explain why this item cannot be safely completed now', + 'tool_name' => $tool_name, + ); + } + + $job_id = (int) ( $parameters['job_id'] ?? 0 ); + if ( ! $job_id ) { + return array( + 'success' => false, + 'error' => 'job_id is required for defer_item operations', + 'tool_name' => $tool_name, + ); + } + + $engine = $parameters['engine'] ?? null; + if ( ! $engine ) { + return array( + 'success' => false, + 'error' => 'Engine context not available', + 'tool_name' => $tool_name, + ); + } + + $item_identifier = $engine->get( 'item_identifier' ); + $source_type = $engine->get( 'source_type' ); + $flow_step_id = $this->resolveFetchFlowStepId( $engine ) ?? ( $parameters['flow_step_id'] ?? $engine->get( 'flow_step_id' ) ); + $released = null; + + if ( $flow_step_id && $item_identifier && $source_type ) { + $released = ( new \DataMachine\Core\Database\ProcessedItems\ProcessedItems() )->release_claim( $flow_step_id, (string) $source_type, (string) $item_identifier ); + } + + $status = JobStatus::failed( 'item-deferred' ); + datamachine_merge_engine_data( $job_id, array( 'job_status' => $status->toString() ) ); + + do_action( + 'datamachine_log', + 'info', + 'FetchItemDispositionTool: Item deferred and source claim released', + array( + 'job_id' => $job_id, + 'flow_step_id' => $flow_step_id, + 'item_identifier' => $item_identifier, + 'source_type' => $source_type, + 'reason' => $reason, + 'released' => $released, + 'status' => $status->toString(), + ) + ); + + return array( + 'success' => true, + 'message' => "Item deferred for retry: {$reason}", + 'status' => $status->toString(), + 'item_identifier' => $item_identifier, + 'tool_name' => $tool_name, + 'disposition' => self::DISPOSITION_DEFER_ITEM, + 'reason' => $reason, + 'released' => $released, + ); + } + + /** + * Return the disposition encoded by the resolved tool definition. + * + * @param array $tool_def Tool definition. + * @return string + */ + private function getDisposition( array $tool_def ): string { + $disposition = $tool_def['disposition'] ?? self::DISPOSITION_REJECT_SOURCE; + + return self::DISPOSITION_DEFER_ITEM === $disposition ? self::DISPOSITION_DEFER_ITEM : self::DISPOSITION_REJECT_SOURCE; + } + + /** + * Resolve the source fetch/event_import step ID from engine flow config. + * + * @param object $engine Engine data wrapper. + * @return string|null Fetch step ID, or null when unavailable. + */ + private function resolveFetchFlowStepId( object $engine ): ?string { + $flow_config = $engine->get( 'flow_config' ); + if ( ! is_array( $flow_config ) ) { + return null; + } + + foreach ( $flow_config as $step_id => $config ) { + if ( ! is_array( $config ) ) { + continue; + } + + if ( in_array( $config['step_type'] ?? '', array( 'fetch', 'event_import' ), true ) ) { + return (string) $step_id; + } + } + + return null; + } +} diff --git a/inc/Core/Steps/Fetch/Tools/SkipItemTool.php b/inc/Core/Steps/Fetch/Tools/SkipItemTool.php deleted file mode 100644 index d3a56e73c..000000000 --- a/inc/Core/Steps/Fetch/Tools/SkipItemTool.php +++ /dev/null @@ -1,130 +0,0 @@ - false, - 'error' => 'reason parameter is required - explain why this item is being skipped', - 'tool_name' => 'skip_item', - ); - } - - $job_id = (int) ( $parameters['job_id'] ?? 0 ); - if ( ! $job_id ) { - return array( - 'success' => false, - 'error' => 'job_id is required for skip_item operations', - 'tool_name' => 'skip_item', - ); - } - - // Get engine data for item identification - $engine = $parameters['engine'] ?? null; - if ( ! $engine ) { - return array( - 'success' => false, - 'error' => 'Engine context not available', - 'tool_name' => 'skip_item', - ); - } - - // Get item identifier and source type from engine data (set by fetch handler) - $item_identifier = $engine->get( 'item_identifier' ); - $source_type = $engine->get( 'source_type' ); - $flow_step_id = $parameters['flow_step_id'] ?? $engine->get( 'flow_step_id' ); - - // Mark item as processed so it won't be refetched - if ( $flow_step_id && $item_identifier && $source_type ) { - do_action( - 'datamachine_mark_item_processed', - $flow_step_id, - $source_type, - $item_identifier, - $job_id - ); - - do_action( - 'datamachine_log', - 'info', - 'SkipItemTool: Item marked as processed (skipped)', - array( - 'job_id' => $job_id, - 'flow_step_id' => $flow_step_id, - 'item_identifier' => $item_identifier, - 'source_type' => $source_type, - 'reason' => $reason, - ) - ); - } else { - do_action( - 'datamachine_log', - 'warning', - 'SkipItemTool: Could not mark item as processed - missing identifiers', - array( - 'job_id' => $job_id, - 'flow_step_id' => $flow_step_id, - 'item_identifier' => $item_identifier, - 'source_type' => $source_type, - 'reason' => $reason, - ) - ); - } - - // Set job status override for engine to use at completion - $status = JobStatus::agentSkipped( $reason ); - datamachine_merge_engine_data( $job_id, array( 'job_status' => $status->toString() ) ); - - do_action( - 'datamachine_log', - 'info', - 'SkipItemTool: Job status set to agent_skipped', - array( - 'job_id' => $job_id, - 'status' => $status->toString(), - 'reason' => $reason, - ) - ); - - return array( - 'success' => true, - 'message' => "Item skipped: {$reason}", - 'status' => $status->toString(), - 'item_identifier' => $item_identifier, - 'tool_name' => 'skip_item', - ); - } -} diff --git a/tests/fetch-item-dispositions-smoke.php b/tests/fetch-item-dispositions-smoke.php new file mode 100644 index 000000000..9555a41db --- /dev/null +++ b/tests/fetch-item-dispositions-smoke.php @@ -0,0 +1,134 @@ + */ + private array $data; + + /** + * @param array $data Engine data. + */ + public function __construct( array $data ) { + $this->data = $data; + } + + public function get( string $key ): mixed { + return $this->data[ $key ] ?? null; + } + } + + $engine = new FetchDispositionSmokeEngine( + array( + 'item_identifier' => 'source-123', + 'source_type' => 'rss', + 'flow_config' => array( + 'fetch-step_7' => array( 'step_type' => 'fetch' ), + 'ai-step_7' => array( 'step_type' => 'ai' ), + ), + ) + ); + + $tool = new DataMachine\Core\Steps\Fetch\Tools\FetchItemDispositionTool(); + + echo "Case 1: reject_source marks processed\n"; + $reject = $tool->handle_tool_call( + array( + 'job_id' => 1814, + 'flow_step_id' => 'ai-step_7', + 'engine' => $engine, + 'reason' => 'duplicate-source', + ), + array( 'disposition' => 'reject_source' ) + ); + assert_fetch_disposition_smoke( 'reject_source succeeds', true === ( $reject['success'] ?? false ) ); + assert_fetch_disposition_smoke( 'reject_source reports explicit tool name', 'reject_source' === ( $reject['tool_name'] ?? '' ) ); + assert_fetch_disposition_smoke( 'reject_source marks fetch step processed', array( 'fetch-step_7', 'rss', 'source-123', 1814 ) === ( $GLOBALS['fetch_disposition_smoke_processed'][0] ?? null ) ); + assert_fetch_disposition_smoke( 'reject_source sets source-rejected status', 'agent_skipped - source-rejected' === ( $GLOBALS['fetch_disposition_smoke_engine'][1814]['job_status'] ?? '' ) ); + + echo "Case 2: defer_item releases claim without marking processed\n"; + $GLOBALS['fetch_disposition_smoke_processed'] = array(); + $defer = $tool->handle_tool_call( + array( + 'job_id' => 1815, + 'flow_step_id' => 'ai-step_7', + 'engine' => $engine, + 'reason' => 'tool-error', + ), + array( 'disposition' => 'defer_item' ) + ); + assert_fetch_disposition_smoke( 'defer_item succeeds', true === ( $defer['success'] ?? false ) ); + assert_fetch_disposition_smoke( 'defer_item reports explicit tool name', 'defer_item' === ( $defer['tool_name'] ?? '' ) ); + assert_fetch_disposition_smoke( 'defer_item releases fetch step claim', array( 'fetch-step_7', 'rss', 'source-123' ) === ( $GLOBALS['fetch_disposition_smoke_released'][0] ?? null ) ); + assert_fetch_disposition_smoke( 'defer_item does not mark processed', array() === $GLOBALS['fetch_disposition_smoke_processed'] ); + assert_fetch_disposition_smoke( 'tool-error deferral remains retry eligible', 'failed - item-deferred' === ( $GLOBALS['fetch_disposition_smoke_engine'][1815]['job_status'] ?? '' ) ); + + echo "Case 3: production tool surface exposes positive affordances\n"; + $fetch_handler = file_get_contents( __DIR__ . '/../inc/Core/Steps/Fetch/Handlers/FetchHandler.php' ); + $execute_step = file_get_contents( __DIR__ . '/../inc/Abilities/Engine/ExecuteStepAbility.php' ); + assert_fetch_disposition_smoke( 'fetch surface exposes reject_source', str_contains( $fetch_handler, "'reject_source'" ) ); + assert_fetch_disposition_smoke( 'fetch surface exposes defer_item', str_contains( $fetch_handler, "'defer_item'" ) ); + assert_fetch_disposition_smoke( 'fetch surface avoids legacy skip tool registration', ! str_contains( $fetch_handler, "'skip_item'" ) ); + assert_fetch_disposition_smoke( 'reject_source describes reasoned content/source rejection', str_contains( $fetch_handler, 'reasoned content/source evaluation' ) ); + assert_fetch_disposition_smoke( 'defer_item describes safe completion and retry eligibility', str_contains( $fetch_handler, 'cannot safely complete processing now' ) && str_contains( $fetch_handler, 'remain eligible' ) ); + assert_fetch_disposition_smoke( 'normal completion still marks processed', str_contains( $execute_step, '$this->markCompletedItemProcessed( $job_id );' ) && str_contains( $execute_step, 'JobStatus::COMPLETED' ) ); + + echo "\nFetch item dispositions smoke complete: {$total} assertions, {$failed} failures.\n"; + if ( $failed > 0 ) { + exit( 1 ); + } +}