diff --git a/inc/Cli/Commands/AICommand.php b/inc/Cli/Commands/AICommand.php index a0c709650..8ddd1ca69 100644 --- a/inc/Cli/Commands/AICommand.php +++ b/inc/Cli/Commands/AICommand.php @@ -104,6 +104,14 @@ private function renderTableSummary( array $result ): void { 'metric' => 'conversation_user_message_bytes', 'value' => (int) $result['conversation_user_message_bytes'], ), + array( + 'metric' => 'canonical_packet_json_bytes', + 'value' => (int) $result['canonical_packet_json_bytes'], + ), + array( + 'metric' => 'projected_packet_json_bytes', + 'value' => (int) $result['projected_packet_json_bytes'], + ), array( 'metric' => 'conversation_packet_json_bytes', 'value' => (int) $result['conversation_packet_json_bytes'], diff --git a/inc/Core/Steps/AI/AIStep.php b/inc/Core/Steps/AI/AIStep.php index 75adf1c57..0adafc5a0 100644 --- a/inc/Core/Steps/AI/AIStep.php +++ b/inc/Core/Steps/AI/AIStep.php @@ -12,6 +12,7 @@ use DataMachine\Core\Steps\StepTypeRegistrationTrait; use DataMachine\Core\Steps\QueueableTrait; use DataMachine\Engine\AI\ConversationManager; +use DataMachine\Engine\AI\DataPacketPromptProjector; use DataMachine\Engine\AI\PipelineTranscriptPolicy; use DataMachine\Engine\AI\Tools\ToolExecutor; use DataMachine\Engine\AI\Tools\ToolPolicyResolver; @@ -188,10 +189,25 @@ protected function executeStep(): array { $mime_type = is_string( $file_info['type'] ) ? $file_info['type'] : ''; } + $pipeline_step_id = $this->flow_step_config['pipeline_step_id']; + + // Resolve user_id and agent_id from engine snapshot (set by RunFlowAbility). + $job_snapshot = $this->engine->get( 'job' ); + $agent_id = (int) ( $job_snapshot['agent_id'] ?? 0 ); + $user_id = (int) ( $job_snapshot['user_id'] ?? 0 ); + + $packet_projection_context = array( + 'job_id' => $this->job_id, + 'pipeline_id' => $job_snapshot['pipeline_id'] ?? null, + 'flow_id' => $job_snapshot['flow_id'] ?? null, + 'flow_step_id' => $this->flow_step_id, + 'pipeline_step_id' => $pipeline_step_id, + ); + $messages = array(); if ( ! empty( $this->dataPackets ) ) { - $data_packet_content = wp_json_encode( array( 'data_packets' => self::sanitizeDataPacketsForAi( $this->dataPackets ) ), JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE ); + $data_packet_content = wp_json_encode( array( 'data_packets' => DataPacketPromptProjector::project( $this->dataPackets, $packet_projection_context ) ), JSON_UNESCAPED_UNICODE ); $messages[] = ConversationManager::buildConversationMessage( 'user', false === $data_packet_content ? '' : $data_packet_content @@ -215,17 +231,10 @@ protected function executeStep(): array { $messages[] = ConversationManager::buildConversationMessage( 'user', $user_message ); } - $pipeline_step_id = $this->flow_step_config['pipeline_step_id']; - $pipeline_step_config = $this->engine->getPipelineStepConfig( $pipeline_step_id ); $max_turns = PluginSettings::get( 'max_turns', PluginSettings::DEFAULT_MAX_TURNS ); - // Resolve user_id and agent_id from engine snapshot (set by RunFlowAbility). - $job_snapshot = $this->engine->get( 'job' ); - $agent_id = (int) ( $job_snapshot['agent_id'] ?? 0 ); - $user_id = (int) ( $job_snapshot['user_id'] ?? 0 ); - // Resolve transcript persistence policy once per AI step invocation. // Resolution order: flow > pipeline > site option (default false). // The boolean is threaded through $payload so the loop doesn't need @@ -431,58 +440,16 @@ protected function executeStep(): array { } /** - * Remove local-only file paths before serializing data packets to AI. + * Project data packets before serializing them to AI. * - * Fetch handlers may include file_info.file_path so downstream runtime steps - * can attach images or access files. That internal path should not be exposed - * in the AI-visible JSON payload because models can copy it into generated - * content. The original packets remain unchanged for runtime use. + * Kept as a compatibility wrapper for older tests/call sites. Canonical + * packets remain unchanged for runtime and storage use. * * @param array $data_packets Original data packets. - * @return array Sanitized copy safe for AI serialization. + * @return array Projected copy safe for AI serialization. */ public static function sanitizeDataPacketsForAi( array $data_packets ): array { - $sanitized_packets = array(); - - foreach ( $data_packets as $packet ) { - if ( ! is_array( $packet ) ) { - $sanitized_packets[] = $packet; - continue; - } - - $sanitized_packet = $packet; - - if ( isset( $sanitized_packet['data'] ) && is_array( $sanitized_packet['data'] ) ) { - $sanitized_packet['data'] = self::sanitizePacketDataForAi( $sanitized_packet['data'] ); - } - - $sanitized_packets[] = $sanitized_packet; - } - - return $sanitized_packets; - } - - /** - * Remove internal file path fields from packet data. - * - * @param array $packet_data Packet data array. - * @return array Sanitized packet data. - */ - private static function sanitizePacketDataForAi( array $packet_data ): array { - if ( ! isset( $packet_data['file_info'] ) || ! is_array( $packet_data['file_info'] ) ) { - return $packet_data; - } - - $sanitized_file_info = $packet_data['file_info']; - unset( $sanitized_file_info['file_path'] ); - - if ( empty( $sanitized_file_info ) ) { - unset( $packet_data['file_info'] ); - return $packet_data; - } - - $packet_data['file_info'] = $sanitized_file_info; - return $packet_data; + return DataPacketPromptProjector::project( $data_packets ); } /** diff --git a/inc/Engine/AI/DataPacketPromptProjector.php b/inc/Engine/AI/DataPacketPromptProjector.php new file mode 100644 index 000000000..af16d6e68 --- /dev/null +++ b/inc/Engine/AI/DataPacketPromptProjector.php @@ -0,0 +1,89 @@ +retrieveDataPackets( $job_id, $engine ); - $messages = $this->buildInitialMessages( $data_packets, $engine, $flow_step_config ); - $payload = $this->buildPayload( $job_id, $flow_step_id, $pipeline_step_id, $data_packets, $engine, $job ); + $data_packets = $this->retrieveDataPackets( $job_id, $engine ); + $packet_projection_context = $this->buildProjectionContext( $job_id, $flow_step_id, $pipeline_step_id, $engine, $job ); + $messages = $this->buildInitialMessages( $data_packets, $engine, $flow_step_config, $packet_projection_context ); + $payload = $this->buildPayload( $job_id, $flow_step_id, $pipeline_step_id, $data_packets, $engine, $job ); $previous_step_config = $this->getAdjacentStepConfig( $engine, $flow_step_id, $payload, 'previous' ); $next_step_config = $this->getAdjacentStepConfig( $engine, $flow_step_id, $payload, 'next' ); @@ -161,7 +161,7 @@ public function inspectPipelineJob( int $job_id, ?string $flow_step_id = null ): 'model' => $model, 'mode' => ToolPolicyResolver::MODE_PIPELINE, ), - $this->measure( $assembled, $data_packets, $messages ) + $this->measure( $assembled, $data_packets, $messages, $packet_projection_context ) ); } @@ -187,11 +187,11 @@ private function retrieveDataPackets( int $job_id, EngineData $engine ): array { return ( new FileRetrieval() )->retrieve_data_by_job_id( $job_id, $context ); } - private function buildInitialMessages( array $data_packets, EngineData $engine, array $flow_step_config ): array { + private function buildInitialMessages( array $data_packets, EngineData $engine, array $flow_step_config, array $packet_projection_context ): array { $messages = array(); if ( ! empty( $data_packets ) ) { - $data_packet_content = wp_json_encode( array( 'data_packets' => AIStep::sanitizeDataPacketsForAi( $data_packets ) ), JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE ); + $data_packet_content = wp_json_encode( array( 'data_packets' => DataPacketPromptProjector::project( $data_packets, $packet_projection_context ) ), JSON_UNESCAPED_UNICODE ); $messages[] = ConversationManager::buildConversationMessage( 'user', false === $data_packet_content ? '' : $data_packet_content @@ -231,6 +231,18 @@ private function peekPromptQueueValue( array $flow_step_config ): string { return trim( (string) ( $queue[0]['prompt'] ?? '' ) ); } + private function buildProjectionContext( int $job_id, string $flow_step_id, string $pipeline_step_id, EngineData $engine, array $job ): array { + $job_snapshot = $engine->getJobContext(); + + return array( + 'job_id' => $job_id, + 'pipeline_id' => $job_snapshot['pipeline_id'] ?? ( $job['pipeline_id'] ?? null ), + 'flow_id' => $job_snapshot['flow_id'] ?? ( $job['flow_id'] ?? null ), + 'flow_step_id' => $flow_step_id, + 'pipeline_step_id' => $pipeline_step_id, + ); + } + private function buildPayload( int $job_id, string $flow_step_id, @@ -266,12 +278,14 @@ private function getAdjacentStepConfig( EngineData $engine, string $flow_step_id return $adjacent_id ? $engine->getFlowStepConfig( $adjacent_id ) : null; } - private function measure( array $assembled, array $data_packets, array $initial_messages ): array { + private function measure( array $assembled, array $data_packets, array $initial_messages, array $packet_projection_context ): array { $request = $assembled['request']; $structured_tools = $assembled['structured_tools']; $messages = $request['messages'] ?? array(); $tools = $request['tools'] ?? array(); + $projected_packets = DataPacketPromptProjector::project( $data_packets, $packet_projection_context ); + return array( 'message_count' => count( $messages ), 'initial_message_count' => count( $initial_messages ), @@ -279,7 +293,9 @@ private function measure( array $assembled, array $data_packets, array $initial_ 'messages_json_bytes' => self::jsonBytes( $messages ), 'tools_json_bytes' => self::jsonBytes( $tools ), 'conversation_user_message_bytes' => self::sumUserMessageBytes( $initial_messages ), - 'conversation_packet_json_bytes' => self::jsonBytes( AIStep::sanitizeDataPacketsForAi( $data_packets ) ), + 'canonical_packet_json_bytes' => self::jsonBytes( $data_packets ), + 'projected_packet_json_bytes' => self::jsonBytes( $projected_packets ), + 'conversation_packet_json_bytes' => self::jsonBytes( $projected_packets ), 'directives' => $assembled['directive_breakdown'], 'tool_count' => count( $structured_tools ), 'largest_tools' => $this->largestTools( $structured_tools ), diff --git a/tests/Unit/Core/Steps/AI/AIStepTest.php b/tests/Unit/Core/Steps/AI/AIStepTest.php index 943582dbc..7552ebba4 100644 --- a/tests/Unit/Core/Steps/AI/AIStepTest.php +++ b/tests/Unit/Core/Steps/AI/AIStepTest.php @@ -8,6 +8,7 @@ namespace DataMachine\Tests\Unit\Core\Steps\AI; use DataMachine\Core\Steps\AI\AIStep; +use DataMachine\Engine\AI\DataPacketPromptProjector; use DataMachine\Engine\AI\Tools\ToolResultFinder; use PHPUnit\Framework\TestCase; use ReflectionMethod; @@ -79,6 +80,126 @@ public function test_sanitize_data_packets_for_ai_leaves_packets_without_file_in $this->assertSame( $data_packets, AIStep::sanitizeDataPacketsForAi( $data_packets ) ); } + public function test_prompt_projection_generic_fallback_preserves_unknown_packet_shape(): void { + $canonical = array( + array( + 'type' => 'fetch', + 'data' => array( + 'title' => 'RSS item', + 'body' => 'Keep body', + 'file_info' => array( + 'file_path' => '/tmp/runtime-only.jpg', + 'mime_type' => 'image/jpeg', + ), + ), + 'metadata' => array( + 'source_type' => 'rss', + 'custom_key' => 'custom value', + ), + ), + ); + + $projected = DataPacketPromptProjector::project( $canonical ); + + $this->assertSame( 'RSS item', $projected[0]['data']['title'] ); + $this->assertSame( 'Keep body', $projected[0]['data']['body'] ); + $this->assertSame( 'rss', $projected[0]['metadata']['source_type'] ); + $this->assertSame( 'custom value', $projected[0]['metadata']['custom_key'] ); + $this->assertArrayNotHasKey( 'file_path', $projected[0]['data']['file_info'] ); + $this->assertSame( '/tmp/runtime-only.jpg', $canonical[0]['data']['file_info']['file_path'] ); + } + + public function test_prompt_projection_does_not_flatten_unknown_json_body_packets(): void { + $canonical = array( + array( + 'type' => 'fetch', + 'data' => array( + 'title' => 'Unknown JSON packet', + 'body' => '{"title":"Nested title","custom":"important"}', + ), + 'metadata' => array( 'source_type' => 'custom_json_feed' ), + ), + ); + + $this->assertSame( $canonical, DataPacketPromptProjector::project( $canonical ) ); + } + + public function test_prompt_projection_filter_can_replace_prompt_packet_without_mutating_canonical(): void { + $canonical = array( + array( + 'type' => 'fetch', + 'data' => array( + 'title' => 'Verbose packet', + 'body' => 'Long source-specific body that an integration understands.', + ), + 'metadata' => array( + 'source_type' => 'integration_owned_source', + 'raw_payload' => array( 'duplicated' => true ), + ), + ), + ); + + add_filter( + 'datamachine_ai_project_data_packet', + static function ( array $projected, array $packet ): array { + if ( 'integration_owned_source' !== ( $packet['metadata']['source_type'] ?? '' ) ) { + return $projected; + } + + return array( + 'type' => $packet['type'], + 'data' => array( 'title' => $packet['data']['title'] ), + 'metadata' => array( 'source_type' => $packet['metadata']['source_type'] ), + ); + }, + 10, + 2 + ); + + $canonical_before = $canonical; + $projected = DataPacketPromptProjector::project( $canonical ); + + $this->assertSame( $canonical_before, $canonical ); + $this->assertSame( 'Verbose packet', $projected[0]['data']['title'] ); + $this->assertArrayNotHasKey( 'body', $projected[0]['data'] ); + $this->assertArrayNotHasKey( 'raw_payload', $projected[0]['metadata'] ); + } + + public function test_prompt_projection_filter_receives_source_agnostic_context(): void { + $canonical = array( + array( + 'type' => 'fetch', + 'data' => array( 'title' => 'Context packet' ), + 'metadata' => array( 'source_type' => 'context_source' ), + ), + ); + $context = array( + 'job_id' => 1799, + 'pipeline_id' => 3, + 'flow_id' => 2, + 'flow_step_id' => 'flow_step_ai', + 'pipeline_step_id' => 'pipeline_step_ai', + ); + $received = array(); + + add_filter( + 'datamachine_ai_project_data_packet', + static function ( array $projected, array $packet, array $filter_context ) use ( &$received ): array { + if ( 'context_source' === ( $packet['metadata']['source_type'] ?? '' ) ) { + $received = $filter_context; + } + + return $projected; + }, + 10, + 3 + ); + + DataPacketPromptProjector::project( $canonical, $context ); + + $this->assertSame( $context, $received ); + } + /** * Test that processLoopResults does NOT carry forward input DataPackets. * diff --git a/tests/ai-packet-projection-smoke.php b/tests/ai-packet-projection-smoke.php new file mode 100644 index 000000000..96eee7f4d --- /dev/null +++ b/tests/ai-packet-projection-smoke.php @@ -0,0 +1,165 @@ + $callback, + 'accepted_args' => $_accepted_args, + ); +} + +function apply_filters( string $hook, $value, ...$args ) { + global $test_filters; + if ( empty( $test_filters[ $hook ] ) ) { + return $value; + } + + ksort( $test_filters[ $hook ] ); + foreach ( $test_filters[ $hook ] as $callbacks ) { + foreach ( $callbacks as $filter ) { + $accepted_args = max( 1, (int) $filter['accepted_args'] ); + $filter_args = array_slice( array_merge( array( $value ), $args ), 0, $accepted_args ); + $value = $filter['callback']( ...$filter_args ); + } + } + + return $value; +} + +function wp_json_encode( $value, int $flags = 0 ) { + return json_encode( $value, $flags ); +} + +require_once __DIR__ . '/../inc/Engine/AI/DataPacketPromptProjector.php'; + +$failed = 0; +$total = 0; + +function assert_projection( string $name, bool $condition, string $detail = '' ): void { + global $failed, $total; + ++$total; + if ( $condition ) { + echo " [PASS] $name\n"; + return; + } + + echo " [FAIL] $name" . ( $detail ? " - $detail" : '' ) . "\n"; + ++$failed; +} + +echo "AI DataPacket prompt projection smoke\n"; + +$canonical = array( + array( + 'type' => 'fetch', + 'timestamp' => 1770000000, + 'data' => array( + 'title' => 'Generic packet', + 'body' => 'Plain source text', + 'file_info' => array( + 'file_path' => '/tmp/runtime-only.jpg', + 'mime_type' => 'image/jpeg', + ), + ), + 'metadata' => array( + 'source_type' => 'generic_source', + 'custom_key' => 'custom value', + ), + ), +); + +$canonical_before = $canonical; +$projected = \DataMachine\Engine\AI\DataPacketPromptProjector::project( $canonical ); + +assert_projection( 'canonical packet unchanged after generic projection', $canonical_before === $canonical ); +assert_projection( 'generic title preserved', 'Generic packet' === ( $projected[0]['data']['title'] ?? '' ) ); +assert_projection( 'generic body preserved', 'Plain source text' === ( $projected[0]['data']['body'] ?? '' ) ); +assert_projection( 'generic metadata preserved', 'custom value' === ( $projected[0]['metadata']['custom_key'] ?? '' ) ); +assert_projection( 'runtime file_path stripped from prompt data', ! array_key_exists( 'file_path', $projected[0]['data']['file_info'] ?? array() ) ); + +add_filter( + 'datamachine_ai_project_data_packet', + static function ( array $projected_packet, array $canonical_packet ): array { + if ( 'integration_owned_source' !== ( $canonical_packet['metadata']['source_type'] ?? '' ) ) { + return $projected_packet; + } + + return array( + 'type' => $canonical_packet['type'], + 'data' => array( + 'title' => $canonical_packet['data']['title'], + 'snippet' => 'Source-specific compact projection', + ), + 'metadata' => array( 'source_type' => $canonical_packet['metadata']['source_type'] ), + ); + }, + 10, + 2 +); + +$source_specific = array( + array( + 'type' => 'fetch', + 'data' => array( + 'title' => 'Verbose integration packet', + 'body' => str_repeat( 'Long duplicated source text. ', 20 ), + ), + 'metadata' => array( + 'source_type' => 'integration_owned_source', + 'raw_payload' => array( 'duplicated' => true ), + ), + ), +); +$source_specific_before = $source_specific; +$context = array( + 'job_id' => 1799, + 'pipeline_id' => 3, + 'flow_id' => 2, + 'flow_step_id' => 'flow_step_ai', + 'pipeline_step_id' => 'pipeline_step_ai', +); +$received_context = array(); + +add_filter( + 'datamachine_ai_project_data_packet', + static function ( array $projected_packet, array $_canonical_packet, array $filter_context ) use ( &$received_context ): array { + $received_context = $filter_context; + return $projected_packet; + }, + 20, + 3 +); + +$compact = \DataMachine\Engine\AI\DataPacketPromptProjector::project( $source_specific, $context ); + +assert_projection( 'filter projection leaves canonical source packet unchanged', $source_specific_before === $source_specific ); +assert_projection( 'filter projection can remove verbose body', ! array_key_exists( 'body', $compact[0]['data'] ?? array() ) ); +assert_projection( 'filter projection can remove source-specific raw metadata', ! array_key_exists( 'raw_payload', $compact[0]['metadata'] ?? array() ) ); +assert_projection( 'filter projection keeps compact source text', 'Source-specific compact projection' === ( $compact[0]['data']['snippet'] ?? '' ) ); +assert_projection( 'three-argument filter receives projection context', $context === $received_context ); + +$canonical_bytes = strlen( wp_json_encode( $source_specific, JSON_UNESCAPED_UNICODE ) ); +$projected_bytes = strlen( wp_json_encode( $compact, JSON_UNESCAPED_UNICODE ) ); + +assert_projection( 'filter-projected packet JSON is smaller than canonical JSON', $projected_bytes < $canonical_bytes, "canonical=$canonical_bytes projected=$projected_bytes" ); + +$prompt_json = wp_json_encode( array( 'data_packets' => $compact ), JSON_UNESCAPED_UNICODE ); +assert_projection( 'prompt JSON is compact by default', ! str_contains( $prompt_json, "\n" ) ); + +echo "\n$total assertions, $failed failures\n"; +if ( $failed > 0 ) { + exit( 1 ); +} diff --git a/tests/ai-request-inspector-smoke.php b/tests/ai-request-inspector-smoke.php index 843c0fbe4..87219a69f 100644 --- a/tests/ai-request-inspector-smoke.php +++ b/tests/ai-request-inspector-smoke.php @@ -202,6 +202,8 @@ function ( array $directives ): array { assert_test( '--job option documented', false !== strpos( $command, '--job=' ) ); assert_test( '--step option documented', false !== strpos( $command, '--step=' ) ); assert_test( 'json output path exists', false !== strpos( $command, "'json' === \$format" ) ); +assert_test( 'table output includes canonical packet bytes', false !== strpos( $command, 'canonical_packet_json_bytes' ) ); +assert_test( 'table output includes projected packet bytes', false !== strpos( $command, 'projected_packet_json_bytes' ) ); assert_test( 'table output includes directive section', false !== strpos( $command, "Directives" ) ); assert_test( 'table output includes largest tools section', false !== strpos( $command, "Largest tools" ) );