From 1ae5445458868c4a83ea6768e5b054e68c565f0f Mon Sep 17 00:00:00 2001 From: Chris Huber Date: Wed, 6 May 2026 20:14:17 -0400 Subject: [PATCH] fix: throttle concurrent pipeline AI steps --- inc/Abilities/Engine/ExecuteStepAbility.php | 23 +- inc/Core/PluginSettings.php | 25 ++- inc/Core/Steps/AI/AIStep.php | 126 +++++++++-- inc/Engine/AI/PipelineAIConcurrencyLease.php | 47 ++++ .../AI/PipelineAIConcurrencyLimiter.php | 208 ++++++++++++++++++ tests/ai-step-backpressure-smoke.php | 118 ++++++++++ 6 files changed, 523 insertions(+), 24 deletions(-) create mode 100644 inc/Engine/AI/PipelineAIConcurrencyLease.php create mode 100644 inc/Engine/AI/PipelineAIConcurrencyLimiter.php create mode 100644 tests/ai-step-backpressure-smoke.php diff --git a/inc/Abilities/Engine/ExecuteStepAbility.php b/inc/Abilities/Engine/ExecuteStepAbility.php index 36ccc916..e8c1c594 100644 --- a/inc/Abilities/Engine/ExecuteStepAbility.php +++ b/inc/Abilities/Engine/ExecuteStepAbility.php @@ -692,7 +692,7 @@ private function getPriorTerminalStatus( int $job_id ): ?string { return $status; } - if ( JobStatus::PENDING === JobStatus::fromString( $status )->getBaseStatus() && $this->hasPendingRetry( $job_id ) ) { + if ( JobStatus::PENDING === JobStatus::fromString( $status )->getBaseStatus() && ( $this->hasPendingRetry( $job_id ) || $this->hasPendingAIConcurrencyThrottle( $job_id ) ) ) { return $status; } @@ -721,6 +721,27 @@ private function hasPendingRetry( int $job_id ): bool { return ! empty( $retry['next_retry_at'] ); } + /** + * Check whether the job has an AI concurrency defer recorded. + * + * AIStep parks above-limit work back to pending and reschedules the same + * step. Treat that as an already-routed outcome, not as an empty-packet + * failure, so throttling remains distinct from provider/model errors. + * + * @param int $job_id Job ID. + * @return bool True when engine data holds a non-empty ai_concurrency_throttle.next_retry_at; the timestamp is not compared against the current time. + */ + private function hasPendingAIConcurrencyThrottle( int $job_id ): bool { + if ( ! 
function_exists( 'datamachine_get_engine_data' ) ) { + return false; + } + + $engine_data = datamachine_get_engine_data( $job_id ); + $throttle = is_array( $engine_data['ai_concurrency_throttle'] ?? null ) ? $engine_data['ai_concurrency_throttle'] : array(); + + return ! empty( $throttle['next_retry_at'] ); + } + /** * Mark a completed job's source item as processed. * diff --git a/inc/Core/PluginSettings.php b/inc/Core/PluginSettings.php index 457b6ade..ce85c922 100644 --- a/inc/Core/PluginSettings.php +++ b/inc/Core/PluginSettings.php @@ -20,11 +20,13 @@ class PluginSettings { - public const DEFAULT_MAX_TURNS = 25; - public const DEFAULT_WP_AI_CLIENT_CONNECT_TIMEOUT = 15.0; - public const DEFAULT_WP_AI_CLIENT_REQUEST_TIMEOUT = 300.0; - public const MAX_WP_AI_CLIENT_CONNECT_TIMEOUT = 300.0; - public const MAX_WP_AI_CLIENT_REQUEST_TIMEOUT = 900.0; + public const DEFAULT_MAX_TURNS = 25; + public const DEFAULT_PIPELINE_AI_CONCURRENCY_LIMIT = 1; + public const DEFAULT_PIPELINE_AI_THROTTLE_DELAY = 10; + public const DEFAULT_WP_AI_CLIENT_CONNECT_TIMEOUT = 15.0; + public const DEFAULT_WP_AI_CLIENT_REQUEST_TIMEOUT = 300.0; + public const MAX_WP_AI_CLIENT_CONNECT_TIMEOUT = 300.0; + public const MAX_WP_AI_CLIENT_REQUEST_TIMEOUT = 900.0; private static ?array $cache = null; private static array $agent_model_cache = array(); @@ -58,14 +60,17 @@ public static function getDefaultQueueTuning(): array { /** * Get centralized plugin defaults used by backend and admin UI. * - * @return array{max_turns:int,wp_ai_client_connect_timeout:float,wp_ai_client_request_timeout:float,queue_tuning:array{concurrent_batches:int,batch_size:int,time_limit:int,chunk_size:int,chunk_delay:int}} + * @return array{max_turns:int,pipeline_ai_concurrency_limit:int,pipeline_ai_throttle_delay:int,pipeline_ai_provider_concurrency_limits:array,wp_ai_client_connect_timeout:float,wp_ai_client_request_timeout:float,queue_tuning:array{concurrent_batches:int,batch_size:int,time_limit:int,chunk_size:int,chunk_delay:int}} Default settings.
*/ public static function getDefaults(): array { return array( - 'max_turns' => self::DEFAULT_MAX_TURNS, - 'wp_ai_client_connect_timeout' => self::DEFAULT_WP_AI_CLIENT_CONNECT_TIMEOUT, - 'wp_ai_client_request_timeout' => self::DEFAULT_WP_AI_CLIENT_REQUEST_TIMEOUT, - 'queue_tuning' => self::getDefaultQueueTuning(), + 'max_turns' => self::DEFAULT_MAX_TURNS, + 'pipeline_ai_concurrency_limit' => self::DEFAULT_PIPELINE_AI_CONCURRENCY_LIMIT, + 'pipeline_ai_throttle_delay' => self::DEFAULT_PIPELINE_AI_THROTTLE_DELAY, + 'pipeline_ai_provider_concurrency_limits' => array(), + 'wp_ai_client_connect_timeout' => self::DEFAULT_WP_AI_CLIENT_CONNECT_TIMEOUT, + 'wp_ai_client_request_timeout' => self::DEFAULT_WP_AI_CLIENT_REQUEST_TIMEOUT, + 'queue_tuning' => self::getDefaultQueueTuning(), ); } diff --git a/inc/Core/Steps/AI/AIStep.php b/inc/Core/Steps/AI/AIStep.php index b435a18e..5e213735 100644 --- a/inc/Core/Steps/AI/AIStep.php +++ b/inc/Core/Steps/AI/AIStep.php @@ -5,6 +5,7 @@ use DataMachine\Abilities\PermissionHelper; use DataMachine\Core\Database\Agents\Agents; use DataMachine\Core\DataPacket; +use DataMachine\Core\Database\Jobs\Jobs; use DataMachine\Core\PluginSettings; use DataMachine\Core\Steps\Step; use DataMachine\Core\Steps\FlowStepConfig; @@ -13,6 +14,8 @@ use DataMachine\Core\Steps\QueueableTrait; use DataMachine\Engine\AI\ConversationManager; use DataMachine\Engine\AI\DataPacketPromptProjector; +use DataMachine\Engine\AI\PipelineAIConcurrencyLease; +use DataMachine\Engine\AI\PipelineAIConcurrencyLimiter; use DataMachine\Engine\AI\PipelineTranscriptPolicy; use DataMachine\Engine\AI\Tools\ToolExecutor; use DataMachine\Engine\AI\Tools\ToolPolicyResolver; @@ -141,7 +144,39 @@ protected function executeStep(): array { return $this->dataPackets; } - // AIStep reads a single prompt slot — `prompt_queue` — under one + $pipeline_step_id = $this->flow_step_config['pipeline_step_id']; + + // Resolve user_id and agent_id from engine snapshot (set by RunFlowAbility). 
+ $job_snapshot = $this->engine->get( 'job' ); + $agent_id = (int) ( $job_snapshot['agent_id'] ?? 0 ); + $user_id = (int) ( $job_snapshot['user_id'] ?? 0 ); + + // Model/provider resolved exclusively via mode system — pipeline config is ignored. + $mode_model = PluginSettings::resolveModelForAgentMode( $agent_id, 'pipeline' ); + $provider_name = $mode_model['provider']; + $model_name = $mode_model['model']; + + $lease_result = PipelineAIConcurrencyLimiter::acquire( + $provider_name, + array( + 'job_id' => $this->job_id, + 'flow_step_id' => $this->flow_step_id, + 'pipeline_step_id' => $pipeline_step_id, + 'pipeline_id' => $job_snapshot['pipeline_id'] ?? null, + 'flow_id' => $job_snapshot['flow_id'] ?? null, + 'mode' => 'pipeline', + ) + ); + + if ( empty( $lease_result['acquired'] ) ) { + $this->deferForAIConcurrency( $provider_name, $lease_result ); + return array(); + } + + $ai_concurrency_lease = $lease_result['lease'] ?? null; + + try { + // AIStep reads a single prompt slot — `prompt_queue` — under one // of three access modes (#1291). Pre-collapse this branched on // `queue_enabled` plus a `user_message` fallback; post-collapse // the mode picks the access pattern and the queue head is the @@ -189,13 +224,6 @@ protected function executeStep(): array { $mime_type = is_string( $file_info['type'] ) ? $file_info['type'] : ''; } - $pipeline_step_id = $this->flow_step_config['pipeline_step_id']; - - // Resolve user_id and agent_id from engine snapshot (set by RunFlowAbility). - $job_snapshot = $this->engine->get( 'job' ); - $agent_id = (int) ( $job_snapshot['agent_id'] ?? 0 ); - $user_id = (int) ( $job_snapshot['user_id'] ?? 0 ); - $packet_projection_context = array( 'job_id' => $this->job_id, 'pipeline_id' => $job_snapshot['pipeline_id'] ?? null, @@ -323,11 +351,6 @@ protected function executeStep(): array { } } - // Model/provider resolved exclusively via mode system — pipeline config is ignored. 
- $mode_model = PluginSettings::resolveModelForAgentMode( $agent_id, 'pipeline' ); - $provider_name = $mode_model['provider']; - $model_name = $mode_model['model']; - // Establish agent execution context before firing the conversation loop. // // Pipelines run inside the Action Scheduler queue where no WordPress user @@ -440,6 +463,83 @@ protected function executeStep(): array { // Process loop results into data packets return self::processLoopResults( $loop_result, $this->dataPackets, $payload, $available_tools ); + } finally { + if ( $ai_concurrency_lease instanceof PipelineAIConcurrencyLease ) { + $ai_concurrency_lease->release(); + } + } + } + + /** + * Reschedule this AI step because provider/site concurrency is saturated. + * + * @param string $provider_name Provider slug. + * @param array $lease_result Limiter result. + */ + private function deferForAIConcurrency( string $provider_name, array $lease_result ): void { + $delay_seconds = max( 1, (int) ( $lease_result['delay'] ?? 10 ) ); + $timestamp = time() + $delay_seconds; + $action_id = false; + + if ( function_exists( 'as_schedule_single_action' ) ) { + $action_id = as_schedule_single_action( + $timestamp, + 'datamachine_execute_step', + array( + 'job_id' => $this->job_id, + 'flow_step_id' => $this->flow_step_id, + ), + 'data-machine' + ); + } + + if ( empty( $action_id ) ) { // Action Scheduler returns 0, not false, when scheduling fails. + do_action( + 'datamachine_fail_job', + $this->job_id, + 'ai_concurrency_defer_failed', + array( + 'flow_step_id' => $this->flow_step_id, + 'ai_provider' => $provider_name, + 'error_message' => 'AI concurrency throttle could not reschedule the step.', + ) + ); + return; + } + + ( new Jobs() )->update_job_status( $this->job_id, 'pending' ); + + datamachine_merge_engine_data( + $this->job_id, + array( + 'ai_concurrency_throttle' => array( + 'next_retry_at' => gmdate( 'c', $timestamp ), + 'rescheduled_for_seconds' => $delay_seconds, + 'reason' => 'ai_concurrency_limit', + 'provider' => $provider_name, + 'flow_step_id' => 
$this->flow_step_id, + 'limit' => (int) ( $lease_result['limit'] ?? 0 ), + 'active' => (int) ( $lease_result['active'] ?? 0 ), + 'action_id' => $action_id, + ), + ) + ); + + do_action( + 'datamachine_log', + 'warning', + 'Pipeline AI step deferred by concurrency limit', + array( + 'job_id' => $this->job_id, + 'flow_step_id' => $this->flow_step_id, + 'provider' => $provider_name, + 'reason' => 'ai_concurrency_limit', + 'limit' => (int) ( $lease_result['limit'] ?? 0 ), + 'active' => (int) ( $lease_result['active'] ?? 0 ), + 'rescheduled_for_seconds' => $delay_seconds, + 'action_id' => $action_id, + ) + ); } /** diff --git a/inc/Engine/AI/PipelineAIConcurrencyLease.php b/inc/Engine/AI/PipelineAIConcurrencyLease.php new file mode 100644 index 00000000..99bfcf27 --- /dev/null +++ b/inc/Engine/AI/PipelineAIConcurrencyLease.php @@ -0,0 +1,47 @@ +option_names = $option_names; + $this->token = $token; + } + + /** + * Release all slots owned by this lease. + */ + public function release(): void { + foreach ( $this->option_names as $option_name ) { + $lease = get_option( $option_name, false ); + if ( is_array( $lease ) && ( $lease['token'] ?? '' ) === $this->token ) { + delete_option( $option_name ); + } + } + } +} diff --git a/inc/Engine/AI/PipelineAIConcurrencyLimiter.php b/inc/Engine/AI/PipelineAIConcurrencyLimiter.php new file mode 100644 index 00000000..332c3ffa --- /dev/null +++ b/inc/Engine/AI/PipelineAIConcurrencyLimiter.php @@ -0,0 +1,208 @@ +release(); + + return array( + 'acquired' => false, + 'reason' => 'ai_concurrency_limit', + 'limit' => $result['limit'], + 'active' => $result['active'], + 'delay' => self::throttleDelay( $provider, $context ), + 'provider' => $provider, + ); + } + + $acquired[] = $result['option_name']; + } + + return array( + 'acquired' => true, + 'lease' => new PipelineAIConcurrencyLease( $acquired, $token ), + 'limit' => $scopes[0]['limit'] ?? 
self::DEFAULT_LIMIT, + 'active' => count( $acquired ), + 'delay' => self::throttleDelay( $provider, $context ), + 'provider' => $provider, + ); + } + + /** + * Resolve site and optional provider scopes. + * + * @return array[] Scope definitions, each array{name:string,limit:int}; the site scope is always first. + */ + private static function resolveScopes( string $provider, array $context ): array { + $site_limit = max( 1, (int) PluginSettings::resolve( 'pipeline_ai_concurrency_limit', self::DEFAULT_LIMIT ) ); + + /** + * Filter site-wide pipeline AI concurrency. + * + * @param int $site_limit Site-wide limit. + * @param string $provider Provider slug. + * @param array $context Execution context. + */ + $site_limit = max( 1, (int) apply_filters( 'datamachine_pipeline_ai_concurrency_limit', $site_limit, $provider, $context ) ); + + $scopes = array( + array( + 'name' => 'site', + 'limit' => $site_limit, + ), + ); + + $provider_limits = PluginSettings::resolve( 'pipeline_ai_provider_concurrency_limits', array() ); + $provider_limit = is_array( $provider_limits ) ? (int) ( $provider_limits[ $provider ] ?? 0 ) : 0; + + /** + * Filter provider-specific pipeline AI concurrency. Return 0 to disable. + * + * @param int $provider_limit Provider limit, or 0 when disabled. + * @param string $provider Provider slug. + * @param array $context Execution context. + */ + $provider_limit = (int) apply_filters( 'datamachine_pipeline_ai_provider_concurrency_limit', $provider_limit, $provider, $context ); + + if ( $provider_limit > 0 ) { + $scopes[] = array( + 'name' => 'provider_' . $provider, + 'limit' => $provider_limit, + ); + } + + return $scopes; + } + + /** + * Acquire one slot in a scope, reclaiming slots whose stored lease is expired or malformed.
+ * + * @return array{acquired:bool,limit:int,active:int,option_name?:string} + */ + private static function acquireScope( string $scope, int $limit, string $token, string $provider, array $context ): array { + $active = 0; + $now = time(); + + for ( $slot = 1; $slot <= $limit; ++$slot ) { + $option_name = self::optionName( $scope, $slot ); + $existing = get_option( $option_name, false ); + + if ( false !== $existing && ( ! is_array( $existing ) || (int) ( $existing['expires_at'] ?? 0 ) <= $now ) ) { // Reclaim expired leases and malformed (non-array) records so neither can pin a slot forever. + delete_option( $option_name ); + $existing = false; + } + + if ( false !== $existing ) { + ++$active; + continue; + } + + $lease = array( + 'token' => $token, + 'provider' => $provider, + 'created_at' => $now, + 'expires_at' => $now + self::ttl( $provider, $context ), + 'job_id' => (int) ( $context['job_id'] ?? 0 ), + 'flow_step_id' => (string) ( $context['flow_step_id'] ?? '' ), + ); + + if ( add_option( $option_name, $lease, '', 'no' ) ) { // NOTE(review): confirm add_option() cannot overwrite a lease inserted concurrently (core issues INSERT ... ON DUPLICATE KEY UPDATE). + return array( + 'acquired' => true, + 'limit' => $limit, + 'active' => $active + 1, + 'option_name' => $option_name, + ); + } + + ++$active; + } + + return array( + 'acquired' => false, + 'limit' => $limit, + 'active' => $active, + ); + } + + /** + * Resolve throttle delay. + */ + private static function throttleDelay( string $provider, array $context ): int { + $delay = max( 1, (int) PluginSettings::resolve( 'pipeline_ai_throttle_delay', self::DEFAULT_THROTTLE_DELAY ) ); + + /** + * Filter delay for jobs deferred by pipeline AI concurrency. + * + * @param int $delay Delay in seconds. + * @param string $provider Provider slug. + * @param array $context Execution context. + */ + return max( 1, (int) apply_filters( 'datamachine_pipeline_ai_throttle_delay', $delay, $provider, $context ) ); + } + + /** + * Resolve stale lease TTL. + */ + private static function ttl( string $provider, array $context ): int { + /** + * Filter pipeline AI lease TTL. + * + * @param int $ttl TTL in seconds. + * @param string $provider Provider slug.
+ * @param array $context Execution context. + */ + return max( 30, (int) apply_filters( 'datamachine_pipeline_ai_concurrency_lease_ttl', self::DEFAULT_TTL, $provider, $context ) ); + } + + /** + * Build option name for a slot. + */ + private static function optionName( string $scope, int $slot ): string { + return 'datamachine_pipeline_ai_lease_' . md5( $scope ) . '_' . $slot; + } + + /** + * Build a per-request lease token. + */ + private static function token(): string { + try { + return bin2hex( random_bytes( 16 ) ); + } catch ( \Throwable ) { + return md5( uniqid( 'datamachine_ai_lease_', true ) ); + } + } +} diff --git a/tests/ai-step-backpressure-smoke.php b/tests/ai-step-backpressure-smoke.php new file mode 100644 index 00000000..fb08e422 --- /dev/null +++ b/tests/ai-step-backpressure-smoke.php @@ -0,0 +1,118 @@ + 101, 'flow_step_id' => 'ai-1' ) ); +assert_ai_backpressure_smoke( 'first acquire succeeds', true === $first['acquired'] ); +assert_ai_backpressure_smoke( 'first acquire returns lease', ( $first['lease'] ?? null ) instanceof PipelineAIConcurrencyLease ); + +echo "Case 2: second concurrent AI step is throttled\n"; +$second = PipelineAIConcurrencyLimiter::acquire( 'openai', array( 'job_id' => 102, 'flow_step_id' => 'ai-1' ) ); +assert_ai_backpressure_smoke( 'second acquire is denied', false === $second['acquired'] ); +assert_ai_backpressure_smoke( 'throttle reason is distinct', 'ai_concurrency_limit' === ( $second['reason'] ?? '' ) ); +assert_ai_backpressure_smoke( 'throttle delay is filterable metadata', 7 === ( $second['delay'] ?? 0 ) ); +assert_ai_backpressure_smoke( 'active count is reported', 1 === ( $second['active'] ?? 0 ) ); + +echo "Case 3: release happens on exception/finally paths\n"; +try { + throw new RuntimeException( 'synthetic failure after slot acquisition' ); +} catch ( RuntimeException ) { + // Mirrors AIStep's outer finally: release is independent of model outcome.
+} finally { + $first['lease']->release(); +} + +$after_release = PipelineAIConcurrencyLimiter::acquire( 'openai', array( 'job_id' => 103, 'flow_step_id' => 'ai-1' ) ); +assert_ai_backpressure_smoke( 'slot can be acquired after release', true === $after_release['acquired'] ); +$after_release['lease']->release(); + +echo "Case 4: production wiring preserves pipeline semantics\n"; +$ai_src = file_get_contents( __DIR__ . '/../inc/Core/Steps/AI/AIStep.php' ) ?: ''; +$engine_src = file_get_contents( __DIR__ . '/../inc/Abilities/Engine/ExecuteStepAbility.php' ) ?: ''; +$retry_src = file_get_contents( __DIR__ . '/../inc/Core/JobRetryPolicy.php' ) ?: ''; +$fetch_src = file_get_contents( __DIR__ . '/../inc/Core/Steps/Fetch/FetchStep.php' ) ?: ''; +$upsert_files = glob( __DIR__ . '/../inc/Core/Steps/Upsert/*.php' ) ?: array(); +$upsert_src = implode( "\n", array_map( static fn( string $path ): string => file_get_contents( $path ) ?: '', $upsert_files ) ); + +assert_ai_backpressure_smoke( 'AIStep acquires before prompt queue consumption', str_contains( $ai_src, 'PipelineAIConcurrencyLimiter::acquire' ) && str_contains( $ai_src, 'consumeFromPromptQueue' ) && strpos( $ai_src, 'PipelineAIConcurrencyLimiter::acquire' ) < strpos( $ai_src, 'consumeFromPromptQueue' ) ); // Guard both needles: strpos() returns false when one is missing, and false compares below any offset. +assert_ai_backpressure_smoke( 'AIStep throttles by rescheduling same step', str_contains( $ai_src, 'deferForAIConcurrency' ) && str_contains( $ai_src, 'datamachine_execute_step' ) ); +assert_ai_backpressure_smoke( 'executor treats AI throttle as routed pending work', str_contains( $engine_src, 'hasPendingAIConcurrencyThrottle' ) && str_contains( $engine_src, 'ai_concurrency_throttle' ) ); +assert_ai_backpressure_smoke( 'fetch steps do not use AI limiter', ! str_contains( $fetch_src, 'PipelineAIConcurrencyLimiter' ) ); +assert_ai_backpressure_smoke( 'upsert steps do not use AI limiter', ! 
str_contains( $upsert_src, 'PipelineAIConcurrencyLimiter' ) ); +assert_ai_backpressure_smoke( 'existing transport retry classifier remains intact', str_contains( $retry_src, 'transport_connect_timeout' ) && str_contains( $retry_src, 'AI_TRANSPORT_BASE_DELAY' ) ); +assert_ai_backpressure_smoke( 'throttle log includes requested metadata', str_contains( $ai_src, 'rescheduled_for_seconds' ) && str_contains( $ai_src, "'active'" ) && str_contains( $ai_src, "'limit'" ) ); + +echo "\nAI step backpressure smoke complete: {$total} assertions, {$failed} failures.\n"; +if ( $failed > 0 ) { + exit( 1 ); +}