Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions inc/Cli/Commands/AICommand.php
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,14 @@ private function renderTableSummary( array $result ): void {
'metric' => 'conversation_user_message_bytes',
'value' => (int) $result['conversation_user_message_bytes'],
),
array(
'metric' => 'canonical_packet_json_bytes',
'value' => (int) $result['canonical_packet_json_bytes'],
),
array(
'metric' => 'projected_packet_json_bytes',
'value' => (int) $result['projected_packet_json_bytes'],
),
array(
'metric' => 'conversation_packet_json_bytes',
'value' => (int) $result['conversation_packet_json_bytes'],
Expand Down
77 changes: 22 additions & 55 deletions inc/Core/Steps/AI/AIStep.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
use DataMachine\Core\Steps\StepTypeRegistrationTrait;
use DataMachine\Core\Steps\QueueableTrait;
use DataMachine\Engine\AI\ConversationManager;
use DataMachine\Engine\AI\DataPacketPromptProjector;
use DataMachine\Engine\AI\PipelineTranscriptPolicy;
use DataMachine\Engine\AI\Tools\ToolExecutor;
use DataMachine\Engine\AI\Tools\ToolPolicyResolver;
Expand Down Expand Up @@ -188,10 +189,25 @@ protected function executeStep(): array {
$mime_type = is_string( $file_info['type'] ) ? $file_info['type'] : '';
}

$pipeline_step_id = $this->flow_step_config['pipeline_step_id'];

// Resolve user_id and agent_id from engine snapshot (set by RunFlowAbility).
$job_snapshot = $this->engine->get( 'job' );
$agent_id = (int) ( $job_snapshot['agent_id'] ?? 0 );
$user_id = (int) ( $job_snapshot['user_id'] ?? 0 );

$packet_projection_context = array(
'job_id' => $this->job_id,
'pipeline_id' => $job_snapshot['pipeline_id'] ?? null,
'flow_id' => $job_snapshot['flow_id'] ?? null,
'flow_step_id' => $this->flow_step_id,
'pipeline_step_id' => $pipeline_step_id,
);

$messages = array();

if ( ! empty( $this->dataPackets ) ) {
$data_packet_content = wp_json_encode( array( 'data_packets' => self::sanitizeDataPacketsForAi( $this->dataPackets ) ), JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE );
$data_packet_content = wp_json_encode( array( 'data_packets' => DataPacketPromptProjector::project( $this->dataPackets, $packet_projection_context ) ), JSON_UNESCAPED_UNICODE );
$messages[] = ConversationManager::buildConversationMessage(
'user',
false === $data_packet_content ? '' : $data_packet_content
Expand All @@ -215,17 +231,10 @@ protected function executeStep(): array {
$messages[] = ConversationManager::buildConversationMessage( 'user', $user_message );
}

$pipeline_step_id = $this->flow_step_config['pipeline_step_id'];

$pipeline_step_config = $this->engine->getPipelineStepConfig( $pipeline_step_id );

$max_turns = PluginSettings::get( 'max_turns', PluginSettings::DEFAULT_MAX_TURNS );

// Resolve user_id and agent_id from engine snapshot (set by RunFlowAbility).
$job_snapshot = $this->engine->get( 'job' );
$agent_id = (int) ( $job_snapshot['agent_id'] ?? 0 );
$user_id = (int) ( $job_snapshot['user_id'] ?? 0 );

// Resolve transcript persistence policy once per AI step invocation.
// Resolution order: flow > pipeline > site option (default false).
// The boolean is threaded through $payload so the loop doesn't need
Expand Down Expand Up @@ -431,58 +440,16 @@ protected function executeStep(): array {
}

/**
* Remove local-only file paths before serializing data packets to AI.
* Project data packets before serializing them to AI.
*
* Fetch handlers may include file_info.file_path so downstream runtime steps
* can attach images or access files. That internal path should not be exposed
* in the AI-visible JSON payload because models can copy it into generated
* content. The original packets remain unchanged for runtime use.
* Kept as a compatibility wrapper for older tests/call sites. Canonical
* packets remain unchanged for runtime and storage use.
*
* @param array $data_packets Original data packets.
* @return array Sanitized copy safe for AI serialization.
* @return array Projected copy safe for AI serialization.
*/
public static function sanitizeDataPacketsForAi( array $data_packets ): array {
$sanitized_packets = array();

foreach ( $data_packets as $packet ) {
if ( ! is_array( $packet ) ) {
$sanitized_packets[] = $packet;
continue;
}

$sanitized_packet = $packet;

if ( isset( $sanitized_packet['data'] ) && is_array( $sanitized_packet['data'] ) ) {
$sanitized_packet['data'] = self::sanitizePacketDataForAi( $sanitized_packet['data'] );
}

$sanitized_packets[] = $sanitized_packet;
}

return $sanitized_packets;
}

/**
* Remove internal file path fields from packet data.
*
* @param array $packet_data Packet data array.
* @return array Sanitized packet data.
*/
private static function sanitizePacketDataForAi( array $packet_data ): array {
if ( ! isset( $packet_data['file_info'] ) || ! is_array( $packet_data['file_info'] ) ) {
return $packet_data;
}

$sanitized_file_info = $packet_data['file_info'];
unset( $sanitized_file_info['file_path'] );

if ( empty( $sanitized_file_info ) ) {
unset( $packet_data['file_info'] );
return $packet_data;
}

$packet_data['file_info'] = $sanitized_file_info;
return $packet_data;
return DataPacketPromptProjector::project( $data_packets );
}

/**
Expand Down
89 changes: 89 additions & 0 deletions inc/Engine/AI/DataPacketPromptProjector.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
<?php
/**
* DataPacket prompt projection.
*
* @package DataMachine\Engine\AI
*/

namespace DataMachine\Engine\AI;

defined( 'ABSPATH' ) || exit;

/**
* Builds prompt-facing packet copies without changing canonical packets.
*/
class DataPacketPromptProjector {

	/**
	 * Project canonical DataPackets for AI prompt serialization.
	 *
	 * Data Machine's default projection is intentionally source-agnostic. Source
	 * integrations that understand handler-specific packet shapes can replace or
	 * compact the prompt-facing packet with the datamachine_ai_project_data_packet
	 * filter while canonical storage/engine packets remain unchanged.
	 *
	 * The returned list is re-indexed (packets are appended in iteration order).
	 * Entries that are not arrays are passed through untouched and are never
	 * handed to the filter.
	 *
	 * @param array $data_packets Canonical packets from storage/engine state.
	 * @param array $context      Source-agnostic runtime context for projection filters.
	 * @return array Prompt-facing packet copies.
	 */
	public static function project( array $data_packets, array $context = array() ): array {
		$projected_packets = array();

		foreach ( $data_packets as $packet ) {
			if ( ! is_array( $packet ) ) {
				$projected_packets[] = $packet;
				continue;
			}

			$projected_packets[] = self::projectPacket( $packet, $context );
		}

		return $projected_packets;
	}

	/**
	 * Project one packet using the generic default and filter extension point.
	 *
	 * The filter receives the sanitized default projection as its value, plus
	 * the canonical packet and the runtime context. A non-array filter result is
	 * ignored and the default projection is used instead.
	 *
	 * @param array $packet  Canonical packet.
	 * @param array $context Source-agnostic runtime context for projection filters.
	 * @return array Prompt-facing packet.
	 */
	private static function projectPacket( array $packet, array $context ): array {
		$projected = self::stripInternalFields( $packet );

		// Allows the projector to run outside WordPress, where no filter API is loaded.
		if ( ! function_exists( 'apply_filters' ) ) {
			return $projected;
		}

		$filtered = apply_filters( 'datamachine_ai_project_data_packet', $projected, $packet, $context );
		if ( ! is_array( $filtered ) ) {
			return $projected;
		}

		// The filter is also handed the canonical packet, which still carries
		// file_info.file_path. Sanitize again so a filter that rebuilds its
		// output from the canonical packet cannot re-expose the local path.
		return self::stripInternalFields( $filtered );
	}

	/**
	 * Sanitize a packet's `data` entry when it is present and is an array.
	 *
	 * @param array $packet Packet to sanitize (received by value; the caller's copy is untouched).
	 * @return array Packet with internal fields removed from `data`.
	 */
	private static function stripInternalFields( array $packet ): array {
		if ( isset( $packet['data'] ) && is_array( $packet['data'] ) ) {
			$packet['data'] = self::sanitizePacketData( $packet['data'] );
		}

		return $packet;
	}

	/**
	 * Remove internal fields from prompt-facing data.
	 *
	 * Drops `file_info.file_path`. When that leaves `file_info` empty, the whole
	 * `file_info` entry is removed as well.
	 *
	 * @param array $packet_data Packet data.
	 * @return array Sanitized packet data.
	 */
	private static function sanitizePacketData( array $packet_data ): array {
		if ( ! isset( $packet_data['file_info'] ) || ! is_array( $packet_data['file_info'] ) ) {
			return $packet_data;
		}

		$sanitized_file_info = $packet_data['file_info'];
		unset( $sanitized_file_info['file_path'] );

		if ( empty( $sanitized_file_info ) ) {
			unset( $packet_data['file_info'] );
			return $packet_data;
		}

		$packet_data['file_info'] = $sanitized_file_info;
		return $packet_data;
	}
}
34 changes: 25 additions & 9 deletions inc/Engine/AI/RequestInspector.php
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
use DataMachine\Core\EngineData;
use DataMachine\Core\FilesRepository\FileRetrieval;
use DataMachine\Core\PluginSettings;
use DataMachine\Core\Steps\AI\AIStep;
use DataMachine\Core\Steps\AI\ToolPolicy\PipelineToolPolicyArgs;
use DataMachine\Core\Steps\FlowStepConfig;
use DataMachine\Abilities\Flow\QueueAbility;
Expand Down Expand Up @@ -89,9 +88,10 @@ public function inspectPipelineJob( int $job_id, ?string $flow_step_id = null ):
);
}

$data_packets = $this->retrieveDataPackets( $job_id, $engine );
$messages = $this->buildInitialMessages( $data_packets, $engine, $flow_step_config );
$payload = $this->buildPayload( $job_id, $flow_step_id, $pipeline_step_id, $data_packets, $engine, $job );
$data_packets = $this->retrieveDataPackets( $job_id, $engine );
$packet_projection_context = $this->buildProjectionContext( $job_id, $flow_step_id, $pipeline_step_id, $engine, $job );
$messages = $this->buildInitialMessages( $data_packets, $engine, $flow_step_config, $packet_projection_context );
$payload = $this->buildPayload( $job_id, $flow_step_id, $pipeline_step_id, $data_packets, $engine, $job );

$previous_step_config = $this->getAdjacentStepConfig( $engine, $flow_step_id, $payload, 'previous' );
$next_step_config = $this->getAdjacentStepConfig( $engine, $flow_step_id, $payload, 'next' );
Expand Down Expand Up @@ -161,7 +161,7 @@ public function inspectPipelineJob( int $job_id, ?string $flow_step_id = null ):
'model' => $model,
'mode' => ToolPolicyResolver::MODE_PIPELINE,
),
$this->measure( $assembled, $data_packets, $messages )
$this->measure( $assembled, $data_packets, $messages, $packet_projection_context )
);
}

Expand All @@ -187,11 +187,11 @@ private function retrieveDataPackets( int $job_id, EngineData $engine ): array {
return ( new FileRetrieval() )->retrieve_data_by_job_id( $job_id, $context );
}

private function buildInitialMessages( array $data_packets, EngineData $engine, array $flow_step_config ): array {
private function buildInitialMessages( array $data_packets, EngineData $engine, array $flow_step_config, array $packet_projection_context ): array {
$messages = array();

if ( ! empty( $data_packets ) ) {
$data_packet_content = wp_json_encode( array( 'data_packets' => AIStep::sanitizeDataPacketsForAi( $data_packets ) ), JSON_PRETTY_PRINT | JSON_UNESCAPED_UNICODE );
$data_packet_content = wp_json_encode( array( 'data_packets' => DataPacketPromptProjector::project( $data_packets, $packet_projection_context ) ), JSON_UNESCAPED_UNICODE );
$messages[] = ConversationManager::buildConversationMessage(
'user',
false === $data_packet_content ? '' : $data_packet_content
Expand Down Expand Up @@ -231,6 +231,18 @@ private function peekPromptQueueValue( array $flow_step_config ): string {
return trim( (string) ( $queue[0]['prompt'] ?? '' ) );
}

/**
 * Assemble the runtime context array for data packet projection.
 *
 * Pipeline and flow IDs are read from the engine's job context first, then
 * from the supplied job array, and default to null when neither has them.
 *
 * @param int        $job_id           Job ID.
 * @param string     $flow_step_id     Flow step ID.
 * @param string     $pipeline_step_id Pipeline step ID.
 * @param EngineData $engine           Engine data for the job.
 * @param array      $job              Job array used as a fallback source for IDs.
 * @return array Context with job_id, pipeline_id, flow_id, flow_step_id and pipeline_step_id.
 */
private function buildProjectionContext( int $job_id, string $flow_step_id, string $pipeline_step_id, EngineData $engine, array $job ): array {
	$snapshot = $engine->getJobContext();

	$pipeline_id = $snapshot['pipeline_id'] ?? $job['pipeline_id'] ?? null;
	$flow_id     = $snapshot['flow_id'] ?? $job['flow_id'] ?? null;

	$context                     = array();
	$context['job_id']           = $job_id;
	$context['pipeline_id']      = $pipeline_id;
	$context['flow_id']          = $flow_id;
	$context['flow_step_id']     = $flow_step_id;
	$context['pipeline_step_id'] = $pipeline_step_id;

	return $context;
}

private function buildPayload(
int $job_id,
string $flow_step_id,
Expand Down Expand Up @@ -266,20 +278,24 @@ private function getAdjacentStepConfig( EngineData $engine, string $flow_step_id
return $adjacent_id ? $engine->getFlowStepConfig( $adjacent_id ) : null;
}

private function measure( array $assembled, array $data_packets, array $initial_messages ): array {
private function measure( array $assembled, array $data_packets, array $initial_messages, array $packet_projection_context ): array {
$request = $assembled['request'];
$structured_tools = $assembled['structured_tools'];
$messages = $request['messages'] ?? array();
$tools = $request['tools'] ?? array();

$projected_packets = DataPacketPromptProjector::project( $data_packets, $packet_projection_context );

return array(
'message_count' => count( $messages ),
'initial_message_count' => count( $initial_messages ),
'total_request_json_bytes' => self::jsonBytes( $request ),
'messages_json_bytes' => self::jsonBytes( $messages ),
'tools_json_bytes' => self::jsonBytes( $tools ),
'conversation_user_message_bytes' => self::sumUserMessageBytes( $initial_messages ),
'conversation_packet_json_bytes' => self::jsonBytes( AIStep::sanitizeDataPacketsForAi( $data_packets ) ),
'canonical_packet_json_bytes' => self::jsonBytes( $data_packets ),
'projected_packet_json_bytes' => self::jsonBytes( $projected_packets ),
'conversation_packet_json_bytes' => self::jsonBytes( $projected_packets ),
'directives' => $assembled['directive_breakdown'],
'tool_count' => count( $structured_tools ),
'largest_tools' => $this->largestTools( $structured_tools ),
Expand Down
Loading
Loading