From e81cf196e407406639f43463eea91d7a91d20144 Mon Sep 17 00:00:00 2001 From: Chris Huber Date: Wed, 6 May 2026 19:34:41 -0400 Subject: [PATCH] fix: shorten AI transport retry delays --- inc/Core/JobRetryPolicy.php | 80 +++++++++++++++++++++++++++++++- inc/Core/Steps/AI/AIStep.php | 3 ++ tests/job-retry-policy-smoke.php | 25 +++++++++- 3 files changed, 104 insertions(+), 4 deletions(-) diff --git a/inc/Core/JobRetryPolicy.php b/inc/Core/JobRetryPolicy.php index dc3ec79b..3d7d87e6 100644 --- a/inc/Core/JobRetryPolicy.php +++ b/inc/Core/JobRetryPolicy.php @@ -26,6 +26,11 @@ class JobRetryPolicy { */ private const DEFAULT_BASE_DELAY = 60; + /** + * Short base delay in seconds for cheap AI transport/connect retries. + */ + private const AI_TRANSPORT_BASE_DELAY = 15; + /** * Default maximum delay in seconds. */ @@ -131,6 +136,7 @@ public static function maybeRetry( int $job_id, string $reason, array $context_d 'attempt' => $attempt, 'max_attempts' => $max_attempts, 'delay_seconds' => $delay_seconds, + 'retry_class' => $policy['retry_class'] ?? null, 'action_id' => $action_id, 'reason' => $reason, 'provider' => $context_data['ai_provider'] ?? $engine_data['provider'] ?? null, @@ -145,6 +151,7 @@ public static function maybeRetry( int $job_id, string $reason, array $context_d 'max_attempts' => $max_attempts, 'next_retry_at' => gmdate( 'c', $timestamp ), 'retry_after' => $delay_seconds, + 'retry_class' => $policy['retry_class'] ?? 'generic', ); } @@ -159,7 +166,8 @@ public static function maybeRetry( int $job_id, string $reason, array $context_d * @return array */ private static function resolvePolicy( int $job_id, string $reason, array $context_data, array $engine_data, array $job ): array { - $retryable = self::isRetryableFailure( $reason, $context_data ); + $retry_class = self::classifyFailure( $reason, $context_data ); + $retryable = self::isRetryableFailure( $reason, $context_data ); /** * Filter whether a job failure is retryable. @@ -176,11 +184,12 @@ private static function resolvePolicy( int $job_id, string $reason, array $conte $policy = array( 'retryable' => $retryable, 'max_attempts' => self::DEFAULT_MAX_ATTEMPTS, - 'base_delay' => self::DEFAULT_BASE_DELAY, + 'base_delay' => self::isShortTransportRetryClass( $retry_class ) ? self::AI_TRANSPORT_BASE_DELAY : self::DEFAULT_BASE_DELAY, 'max_delay' => self::DEFAULT_MAX_DELAY, 'backoff' => 'exponential', 'jitter' => 0, 'retry_after' => self::extractRetryAfter( $context_data ), + 'retry_class' => $retry_class, 'provider' => $context_data['ai_provider'] ?? $engine_data['provider'] ?? null, 'source_type' => $engine_data['source_type'] ?? null, 'pipeline_id' => $job['pipeline_id'] ?? ( $engine_data['job']['pipeline_id'] ?? null ), @@ -222,6 +231,10 @@ private static function isRetryableFailure( string $reason, array $context_data return true; } + if ( self::isShortTransportRetryClass( self::classifyFailure( $reason, $context_data ) ) ) { + return true; + } + $message = strtolower( implode( ' ', array_filter( array( $reason, $context_data['error_message'] ?? '', @@ -238,6 +251,67 @@ private static function isRetryableFailure( string $reason, array $context_data return false; } + /** + * Classify retryable failures so cheap transport blips can retry sooner. + * + * @param string $reason Failure reason. + * @param array $context_data Failure context. + * @return string + */ + private static function classifyFailure( string $reason, array $context_data ): string { + $message = strtolower( implode( ' ', array_filter( array( + $reason, + $context_data['error_code'] ?? '', + $context_data['error_message'] ?? '', + $context_data['exception_message'] ?? '', + $context_data['ai_error'] ?? '', + ) ) ) ); + + if ( '' === $message ) { + return 'generic'; + } + + foreach ( array( 'rate limit', 'rate-limit', 'too many requests', '429', 'throttle', 'throttled' ) as $needle ) { + if ( str_contains( $message, $needle ) ) { + return 'provider_rate_limit'; + } + } + + foreach ( array( 'overloaded', 'overload', '503', '502', '504', 'service unavailable' ) as $needle ) { + if ( str_contains( $message, $needle ) ) { + return 'provider_overload'; + } + } + + foreach ( array( 'could not resolve host', 'couldn\'t resolve host', 'name or service not known', 'curl error 6', 'dns' ) as $needle ) { + if ( str_contains( $message, $needle ) ) { + return 'transport_dns'; + } + } + + foreach ( array( 'curl error 7', 'failed to connect', 'connection refused', 'connection reset', 'network is unreachable', 'no route to host' ) as $needle ) { + if ( str_contains( $message, $needle ) ) { + return 'transport_network'; + } + } + + if ( str_contains( $message, 'curl error 28' ) || str_contains( $message, 'connection timed out' ) || str_contains( $message, 'connect timed out' ) ) { + return 'transport_connect_timeout'; + } + + return 'generic'; + } + + /** + * Whether a retry class should use the short AI transport delay. + * + * @param string $retry_class Retry classification. + * @return bool + */ + private static function isShortTransportRetryClass( string $retry_class ): bool { + return in_array( $retry_class, array( 'transport_connect_timeout', 'transport_dns', 'transport_network' ), true ); + } + /** * Resolve delay with Retry-After and provider/source throttling hooks. * @@ -382,6 +456,7 @@ private static function recordRetry( int $job_id, string $reason, array $context 'flow_step_id' => $context_data['flow_step_id'] ?? null, 'provider' => $policy['provider'] ?? null, 'source_type' => $policy['source_type'] ?? null, + 'retry_class' => $policy['retry_class'] ?? null, 'delay_seconds' => $delay_seconds, 'next_retry_at' => gmdate( 'c', $timestamp ), 'recorded_at' => gmdate( 'c' ), @@ -399,6 +474,7 @@ private static function recordRetry( int $job_id, string $reason, array $context 'delay_seconds' => $delay_seconds, 'last_reason' => $reason, 'last_retryable' => true, + 'retry_class' => $policy['retry_class'] ?? null, 'provider' => $policy['provider'] ?? null, 'source_type' => $policy['source_type'] ?? null, 'history' => $history, diff --git a/inc/Core/Steps/AI/AIStep.php b/inc/Core/Steps/AI/AIStep.php index 0adafc5a..b435a18e 100644 --- a/inc/Core/Steps/AI/AIStep.php +++ b/inc/Core/Steps/AI/AIStep.php @@ -388,6 +388,7 @@ protected function executeStep(): array { // Check for errors if ( isset( $loop_result['error'] ) ) { + $request_metadata = is_array( $loop_result['request_metadata'] ?? null ) ? $loop_result['request_metadata'] : array(); // Record the transcript on the failure path too so operators // can `wp datamachine jobs transcript ` and see exactly // what the model received before the AI request died. This @@ -408,6 +409,8 @@ protected function executeStep(): array { 'flow_step_id' => $this->flow_step_id, 'ai_error' => $loop_result['error'], 'ai_provider' => $provider_name, + 'request_metadata' => $request_metadata, + 'transport_profile' => is_array( $request_metadata['transport'] ?? null ) ? $request_metadata['transport'] : array(), 'retry_after' => $loop_result['retry_after'] ?? null, 'retry_after_seconds' => $loop_result['retry_after_seconds'] ?? null, 'headers' => is_array( $loop_result['headers'] ?? null ) ? $loop_result['headers'] : array(), diff --git a/tests/job-retry-policy-smoke.php b/tests/job-retry-policy-smoke.php index ec9ef330..6eda94ee 100644 --- a/tests/job-retry-policy-smoke.php +++ b/tests/job-retry-policy-smoke.php @@ -46,6 +46,8 @@ function wp_rand( int $min, int $max ): int { $normalize_delay = $reflection->getMethod( 'normalizeDelay' ); $is_retryable = $reflection->getMethod( 'isRetryableFailure' ); $resolve_delay = $reflection->getMethod( 'resolveDelay' ); +$classify_failure = $reflection->getMethod( 'classifyFailure' ); +$resolve_policy = $reflection->getMethod( 'resolvePolicy' ); echo "Case 1: Retry-After values are normalized\n"; assert_retry_policy_smoke( 'numeric Retry-After is seconds', 90 === $extract_retry_after->invoke( null, array( 'retry_after' => '90' ) ) ); @@ -56,6 +58,7 @@ function wp_rand( int $min, int $max ): int { assert_retry_policy_smoke( 'explicit retryable flag wins', true === $is_retryable->invoke( null, 'custom_failure', array( 'retryable' => true ) ) ); assert_retry_policy_smoke( 'Retry-After implies retryable', true === $is_retryable->invoke( null, 'provider_error', array( 'retry_after' => 10 ) ) ); assert_retry_policy_smoke( 'rate-limit text implies retryable', true === $is_retryable->invoke( null, 'ai_processing_failed', array( 'ai_error' => 'Provider returned 429 rate limit' ) ) ); +assert_retry_policy_smoke( 'cURL 28 connect timeout text implies retryable', true === $is_retryable->invoke( null, 'ai_processing_failed', array( 'ai_error' => 'cURL error 28: Connection timed out after 15000 milliseconds' ) ) ); assert_retry_policy_smoke( 'validation-style failures are not retryable by default', false === $is_retryable->invoke( null, 'missing_flow_id_in_step_config', array() ) ); echo "Case 3: Backoff composes with Retry-After\n"; @@ -72,7 +75,24 @@ function wp_rand( int $min, int $max ): int { ); assert_retry_policy_smoke( 'Retry-After is a floor over exponential backoff', 300 === $delay, 'delay was ' . $delay ); -echo "Case 4: Production code exposes retry/backoff hooks and metadata\n"; +echo "Case 4: AI retry classification preserves provider backoff and shortens transport retries\n"; +$transport_context = array( 'ai_error' => 'cURL error 28: Connection timed out after 15000 milliseconds' ); +$rate_context = array( 'ai_error' => 'Provider returned 429 rate limit' ); +$generic_context = array( 'ai_error' => 'Provider temporarily unavailable, try again later' ); +$transport_policy = $resolve_policy->invoke( null, 123, 'ai_processing_failed', $transport_context, array(), array() ); +$rate_policy = $resolve_policy->invoke( null, 123, 'ai_processing_failed', $rate_context, array(), array() ); +$generic_policy = $resolve_policy->invoke( null, 123, 'ai_processing_failed', $generic_context, array(), array() ); +$transport_delay = $resolve_delay->invoke( null, 1, $transport_policy, array() ); +$rate_delay = $resolve_delay->invoke( null, 1, $rate_policy, array() ); +$generic_delay = $resolve_delay->invoke( null, 1, $generic_policy, array() ); + +assert_retry_policy_smoke( 'cURL 28 is classified as transport connect timeout', 'transport_connect_timeout' === $classify_failure->invoke( null, 'ai_processing_failed', $transport_context ) ); +assert_retry_policy_smoke( 'cURL 28 uses short transport base delay', 15 === $transport_delay, 'delay was ' . $transport_delay ); +assert_retry_policy_smoke( 'rate limit is classified separately', 'provider_rate_limit' === $rate_policy['retry_class'] ); +assert_retry_policy_smoke( 'rate limit keeps default base delay', 60 === $rate_delay, 'delay was ' . $rate_delay ); +assert_retry_policy_smoke( 'generic retryable AI failure keeps default base delay', 60 === $generic_delay, 'delay was ' . $generic_delay ); + +echo "Case 5: Production code exposes retry/backoff hooks and metadata\n"; $policy_src = file_get_contents( __DIR__ . '/../inc/Core/JobRetryPolicy.php' ) ?: ''; $fail_src = file_get_contents( __DIR__ . '/../inc/Engine/Actions/Handlers/FailJobHandler.php' ) ?: ''; $ai_src = file_get_contents( __DIR__ . '/../inc/Core/Steps/AI/AIStep.php' ) ?: ''; @@ -81,9 +101,10 @@ function wp_rand( int $min, int $max ): int { assert_retry_policy_smoke( 'policy exposes retry policy hook', str_contains( $policy_src, 'datamachine_job_retry_policy' ) ); assert_retry_policy_smoke( 'policy exposes provider/source throttle hook', str_contains( $policy_src, 'datamachine_job_retry_throttle_delay' ) ); assert_retry_policy_smoke( 'policy records retry metadata', str_contains( $policy_src, "'retry'" ) && str_contains( $policy_src, "'history'" ) ); +assert_retry_policy_smoke( 'policy records retry classification metadata', str_contains( $policy_src, "'retry_class'" ) ); assert_retry_policy_smoke( 'policy records poison item isolation metadata', str_contains( $policy_src, "'poison_item'" ) ); assert_retry_policy_smoke( 'fail handler tries retry before final failure', strpos( $fail_src, 'maybeRetry' ) < strpos( $fail_src, 'complete_job' ) ); -assert_retry_policy_smoke( 'AI failures pass retry-after context', str_contains( $ai_src, "'retry_after'" ) && str_contains( $ai_src, "'headers'" ) ); +assert_retry_policy_smoke( 'AI failures pass retry and transport context', str_contains( $ai_src, "'retry_after'" ) && str_contains( $ai_src, "'headers'" ) && str_contains( $ai_src, "'transport_profile'" ) ); echo "\nJob retry policy smoke complete: {$total} assertions, {$failed} failures.\n"; if ( $failed > 0 ) {