Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion inc/Abilities/Engine/ExecuteStepAbility.php
Original file line number Diff line number Diff line change
Expand Up @@ -692,7 +692,7 @@ private function getPriorTerminalStatus( int $job_id ): ?string {
return $status;
}

if ( JobStatus::PENDING === JobStatus::fromString( $status )->getBaseStatus() && $this->hasPendingRetry( $job_id ) ) {
if ( JobStatus::PENDING === JobStatus::fromString( $status )->getBaseStatus() && ( $this->hasPendingRetry( $job_id ) || $this->hasPendingAIConcurrencyThrottle( $job_id ) ) ) {
return $status;
}

Expand Down Expand Up @@ -721,6 +721,27 @@ private function hasPendingRetry( int $job_id ): bool {
return ! empty( $retry['next_retry_at'] );
}

/**
* Check whether the job has a future AI concurrency defer recorded.
*
* AIStep parks above-limit work back to pending and reschedules the same
* step. Treat that as an already-routed outcome, not as an empty-packet
* failure, so throttling remains distinct from provider/model errors.
*
* @param int $job_id Job ID.
* @return bool
*/
private function hasPendingAIConcurrencyThrottle( int $job_id ): bool {
if ( ! function_exists( 'datamachine_get_engine_data' ) ) {
return false;
}

$engine_data = datamachine_get_engine_data( $job_id );
$throttle = is_array( $engine_data['ai_concurrency_throttle'] ?? null ) ? $engine_data['ai_concurrency_throttle'] : array();

return ! empty( $throttle['next_retry_at'] );
}

/**
* Mark a completed job's source item as processed.
*
Expand Down
25 changes: 15 additions & 10 deletions inc/Core/PluginSettings.php
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@

class PluginSettings {

public const DEFAULT_MAX_TURNS = 25;
public const DEFAULT_WP_AI_CLIENT_CONNECT_TIMEOUT = 15.0;
public const DEFAULT_WP_AI_CLIENT_REQUEST_TIMEOUT = 300.0;
public const MAX_WP_AI_CLIENT_CONNECT_TIMEOUT = 300.0;
public const MAX_WP_AI_CLIENT_REQUEST_TIMEOUT = 900.0;
public const DEFAULT_MAX_TURNS = 25;
public const DEFAULT_PIPELINE_AI_CONCURRENCY_LIMIT = 1;
public const DEFAULT_PIPELINE_AI_THROTTLE_DELAY = 10;
public const DEFAULT_WP_AI_CLIENT_CONNECT_TIMEOUT = 15.0;
public const DEFAULT_WP_AI_CLIENT_REQUEST_TIMEOUT = 300.0;
public const MAX_WP_AI_CLIENT_CONNECT_TIMEOUT = 300.0;
public const MAX_WP_AI_CLIENT_REQUEST_TIMEOUT = 900.0;

private static ?array $cache = null;
private static array $agent_model_cache = array();
Expand Down Expand Up @@ -58,14 +60,17 @@ public static function getDefaultQueueTuning(): array {
/**
* Get centralized plugin defaults used by backend and admin UI.
*
* @return array{max_turns:int,wp_ai_client_connect_timeout:float,wp_ai_client_request_timeout:float,queue_tuning:array{concurrent_batches:int,batch_size:int,time_limit:int,chunk_size:int,chunk_delay:int}}
* @return array<string,mixed> Default settings.
*/
public static function getDefaults(): array {
return array(
'max_turns' => self::DEFAULT_MAX_TURNS,
'wp_ai_client_connect_timeout' => self::DEFAULT_WP_AI_CLIENT_CONNECT_TIMEOUT,
'wp_ai_client_request_timeout' => self::DEFAULT_WP_AI_CLIENT_REQUEST_TIMEOUT,
'queue_tuning' => self::getDefaultQueueTuning(),
'max_turns' => self::DEFAULT_MAX_TURNS,
'pipeline_ai_concurrency_limit' => self::DEFAULT_PIPELINE_AI_CONCURRENCY_LIMIT,
'pipeline_ai_throttle_delay' => self::DEFAULT_PIPELINE_AI_THROTTLE_DELAY,
'pipeline_ai_provider_concurrency_limits' => array(),
'wp_ai_client_connect_timeout' => self::DEFAULT_WP_AI_CLIENT_CONNECT_TIMEOUT,
'wp_ai_client_request_timeout' => self::DEFAULT_WP_AI_CLIENT_REQUEST_TIMEOUT,
'queue_tuning' => self::getDefaultQueueTuning(),
);
}

Expand Down
126 changes: 113 additions & 13 deletions inc/Core/Steps/AI/AIStep.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
use DataMachine\Abilities\PermissionHelper;
use DataMachine\Core\Database\Agents\Agents;
use DataMachine\Core\DataPacket;
use DataMachine\Core\Database\Jobs\Jobs;
use DataMachine\Core\PluginSettings;
use DataMachine\Core\Steps\Step;
use DataMachine\Core\Steps\FlowStepConfig;
Expand All @@ -13,6 +14,8 @@
use DataMachine\Core\Steps\QueueableTrait;
use DataMachine\Engine\AI\ConversationManager;
use DataMachine\Engine\AI\DataPacketPromptProjector;
use DataMachine\Engine\AI\PipelineAIConcurrencyLease;
use DataMachine\Engine\AI\PipelineAIConcurrencyLimiter;
use DataMachine\Engine\AI\PipelineTranscriptPolicy;
use DataMachine\Engine\AI\Tools\ToolExecutor;
use DataMachine\Engine\AI\Tools\ToolPolicyResolver;
Expand Down Expand Up @@ -141,7 +144,39 @@ protected function executeStep(): array {
return $this->dataPackets;
}

// AIStep reads a single prompt slot — `prompt_queue` — under one
$pipeline_step_id = $this->flow_step_config['pipeline_step_id'];

// Resolve user_id and agent_id from engine snapshot (set by RunFlowAbility).
$job_snapshot = $this->engine->get( 'job' );
$agent_id = (int) ( $job_snapshot['agent_id'] ?? 0 );
$user_id = (int) ( $job_snapshot['user_id'] ?? 0 );

// Model/provider resolved exclusively via mode system — pipeline config is ignored.
$mode_model = PluginSettings::resolveModelForAgentMode( $agent_id, 'pipeline' );
$provider_name = $mode_model['provider'];
$model_name = $mode_model['model'];

$lease_result = PipelineAIConcurrencyLimiter::acquire(
$provider_name,
array(
'job_id' => $this->job_id,
'flow_step_id' => $this->flow_step_id,
'pipeline_step_id' => $pipeline_step_id,
'pipeline_id' => $job_snapshot['pipeline_id'] ?? null,
'flow_id' => $job_snapshot['flow_id'] ?? null,
'mode' => 'pipeline',
)
);

if ( empty( $lease_result['acquired'] ) ) {
$this->deferForAIConcurrency( $provider_name, $lease_result );
return array();
}

$ai_concurrency_lease = $lease_result['lease'] ?? null;

try {
// AIStep reads a single prompt slot — `prompt_queue` — under one
// of three access modes (#1291). Pre-collapse this branched on
// `queue_enabled` plus a `user_message` fallback; post-collapse
// the mode picks the access pattern and the queue head is the
Expand Down Expand Up @@ -189,13 +224,6 @@ protected function executeStep(): array {
$mime_type = is_string( $file_info['type'] ) ? $file_info['type'] : '';
}

$pipeline_step_id = $this->flow_step_config['pipeline_step_id'];

// Resolve user_id and agent_id from engine snapshot (set by RunFlowAbility).
$job_snapshot = $this->engine->get( 'job' );
$agent_id = (int) ( $job_snapshot['agent_id'] ?? 0 );
$user_id = (int) ( $job_snapshot['user_id'] ?? 0 );

$packet_projection_context = array(
'job_id' => $this->job_id,
'pipeline_id' => $job_snapshot['pipeline_id'] ?? null,
Expand Down Expand Up @@ -323,11 +351,6 @@ protected function executeStep(): array {
}
}

// Model/provider resolved exclusively via mode system — pipeline config is ignored.
$mode_model = PluginSettings::resolveModelForAgentMode( $agent_id, 'pipeline' );
$provider_name = $mode_model['provider'];
$model_name = $mode_model['model'];

// Establish agent execution context before firing the conversation loop.
//
// Pipelines run inside the Action Scheduler queue where no WordPress user
Expand Down Expand Up @@ -440,6 +463,83 @@ protected function executeStep(): array {

// Process loop results into data packets
return self::processLoopResults( $loop_result, $this->dataPackets, $payload, $available_tools );
} finally {
if ( $ai_concurrency_lease instanceof PipelineAIConcurrencyLease ) {
$ai_concurrency_lease->release();
}
}
}

/**
* Reschedule this AI step because provider/site concurrency is saturated.
*
* @param string $provider_name Provider slug.
* @param array $lease_result Limiter result.
*/
private function deferForAIConcurrency( string $provider_name, array $lease_result ): void {
$delay_seconds = max( 1, (int) ( $lease_result['delay'] ?? 10 ) );
$timestamp = time() + $delay_seconds;
$action_id = false;

if ( function_exists( 'as_schedule_single_action' ) ) {
$action_id = as_schedule_single_action(
$timestamp,
'datamachine_execute_step',
array(
'job_id' => $this->job_id,
'flow_step_id' => $this->flow_step_id,
),
'data-machine'
);
}

if ( false === $action_id ) {
do_action(
'datamachine_fail_job',
$this->job_id,
'ai_concurrency_defer_failed',
array(
'flow_step_id' => $this->flow_step_id,
'ai_provider' => $provider_name,
'error_message' => 'AI concurrency throttle could not reschedule the step.',
)
);
return;
}

( new Jobs() )->update_job_status( $this->job_id, 'pending' );

datamachine_merge_engine_data(
$this->job_id,
array(
'ai_concurrency_throttle' => array(
'next_retry_at' => gmdate( 'c', $timestamp ),
'rescheduled_for_seconds' => $delay_seconds,
'reason' => 'ai_concurrency_limit',
'provider' => $provider_name,
'flow_step_id' => $this->flow_step_id,
'limit' => (int) ( $lease_result['limit'] ?? 0 ),
'active' => (int) ( $lease_result['active'] ?? 0 ),
'action_id' => $action_id,
),
)
);

do_action(
'datamachine_log',
'warning',
'Pipeline AI step deferred by concurrency limit',
array(
'job_id' => $this->job_id,
'flow_step_id' => $this->flow_step_id,
'provider' => $provider_name,
'reason' => 'ai_concurrency_limit',
'limit' => (int) ( $lease_result['limit'] ?? 0 ),
'active' => (int) ( $lease_result['active'] ?? 0 ),
'rescheduled_for_seconds' => $delay_seconds,
'action_id' => $action_id,
)
);
}

/**
Expand Down
47 changes: 47 additions & 0 deletions inc/Engine/AI/PipelineAIConcurrencyLease.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
<?php
/**
* Pipeline AI concurrency lease.
*
* @package DataMachine\Engine\AI
*/

namespace DataMachine\Engine\AI;

defined( 'ABSPATH' ) || exit;

/**
* Releases one or more acquired AI concurrency slots.
*/
class PipelineAIConcurrencyLease {

/**
* @var string[]
*/
private array $option_names;

/**
* @var string
*/
private string $token;

/**
* @param string[] $option_names Acquired option names.
* @param string $token Lease token.
*/
public function __construct( array $option_names, string $token ) {
$this->option_names = $option_names;
$this->token = $token;
}

/**
* Release all slots owned by this lease.
*/
public function release(): void {
foreach ( $this->option_names as $option_name ) {
$lease = get_option( $option_name, false );
if ( is_array( $lease ) && ( $lease['token'] ?? '' ) === $this->token ) {
delete_option( $option_name );
}
}
}
}
Loading
Loading