Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 78 additions & 2 deletions inc/Core/JobRetryPolicy.php
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@ class JobRetryPolicy {
*/
private const DEFAULT_BASE_DELAY = 60;

/**
* Short base delay in seconds for cheap AI transport/connect retries.
*/
private const AI_TRANSPORT_BASE_DELAY = 15;

/**
* Default maximum delay in seconds.
*/
Expand Down Expand Up @@ -131,6 +136,7 @@ public static function maybeRetry( int $job_id, string $reason, array $context_d
'attempt' => $attempt,
'max_attempts' => $max_attempts,
'delay_seconds' => $delay_seconds,
'retry_class' => $policy['retry_class'] ?? null,
'action_id' => $action_id,
'reason' => $reason,
'provider' => $context_data['ai_provider'] ?? $engine_data['provider'] ?? null,
Expand All @@ -145,6 +151,7 @@ public static function maybeRetry( int $job_id, string $reason, array $context_d
'max_attempts' => $max_attempts,
'next_retry_at' => gmdate( 'c', $timestamp ),
'retry_after' => $delay_seconds,
'retry_class' => $policy['retry_class'] ?? 'generic',
);
}

Expand All @@ -159,7 +166,8 @@ public static function maybeRetry( int $job_id, string $reason, array $context_d
* @return array
*/
private static function resolvePolicy( int $job_id, string $reason, array $context_data, array $engine_data, array $job ): array {
$retryable = self::isRetryableFailure( $reason, $context_data );
$retry_class = self::classifyFailure( $reason, $context_data );
$retryable = self::isRetryableFailure( $reason, $context_data );

/**
* Filter whether a job failure is retryable.
Expand All @@ -176,11 +184,12 @@ private static function resolvePolicy( int $job_id, string $reason, array $conte
$policy = array(
'retryable' => $retryable,
'max_attempts' => self::DEFAULT_MAX_ATTEMPTS,
'base_delay' => self::DEFAULT_BASE_DELAY,
'base_delay' => self::isShortTransportRetryClass( $retry_class ) ? self::AI_TRANSPORT_BASE_DELAY : self::DEFAULT_BASE_DELAY,
'max_delay' => self::DEFAULT_MAX_DELAY,
'backoff' => 'exponential',
'jitter' => 0,
'retry_after' => self::extractRetryAfter( $context_data ),
'retry_class' => $retry_class,
'provider' => $context_data['ai_provider'] ?? $engine_data['provider'] ?? null,
'source_type' => $engine_data['source_type'] ?? null,
'pipeline_id' => $job['pipeline_id'] ?? ( $engine_data['job']['pipeline_id'] ?? null ),
Expand Down Expand Up @@ -222,6 +231,10 @@ private static function isRetryableFailure( string $reason, array $context_data
return true;
}

if ( self::isShortTransportRetryClass( self::classifyFailure( $reason, $context_data ) ) ) {
return true;
}

$message = strtolower( implode( ' ', array_filter( array(
$reason,
$context_data['error_message'] ?? '',
Expand All @@ -238,6 +251,67 @@ private static function isRetryableFailure( string $reason, array $context_data
return false;
}

/**
 * Classify a failure so cheap transport blips can retry sooner.
 *
 * The reason and the string-convertible values of the `error_code`,
 * `error_message`, `exception_message` and `ai_error` context keys are joined
 * into one lowercase message, which is checked against each class in priority
 * order: provider rate limit, provider overload, DNS, network, connect timeout.
 *
 * Numeric tokens (HTTP status codes and cURL error numbers) only match when
 * they are not part of a longer number. That keeps "cURL error 60" (an SSL
 * certificate failure) from being read as "cURL error 6" (DNS), and keeps a
 * duration such as "14290 milliseconds" from being read as HTTP 429.
 *
 * @param string $reason Failure reason.
 * @param array $context_data Failure context.
 * @return string One of 'provider_rate_limit', 'provider_overload',
 *                'transport_dns', 'transport_network',
 *                'transport_connect_timeout' or 'generic'.
 */
private static function classifyFailure( string $reason, array $context_data ): string {
	$parts = array( $reason );

	foreach ( array( 'error_code', 'error_message', 'exception_message', 'ai_error' ) as $key ) {
		$value = $context_data[ $key ] ?? '';

		// Arrays and non-stringable objects cannot be joined into the message
		// without a warning or error, so they are left out of classification.
		if ( is_scalar( $value ) || $value instanceof \Stringable ) {
			$parts[] = (string) $value;
		}
	}

	$message = strtolower( implode( ' ', array_filter( $parts ) ) );

	if ( '' === $message ) {
		return 'generic';
	}

	// Order matters: the first matching class wins. The message is already
	// lowercase, so the patterns are written in lowercase without the `i` flag.
	$patterns = array(
		'provider_rate_limit'       => '~rate[ -]limit|too many requests|throttle|(?<!\d)429(?!\d)~',
		'provider_overload'         => '~overload|service unavailable|(?<!\d)50[234](?!\d)~',
		'transport_dns'             => '~could not resolve host|couldn\'t resolve host|name or service not known|curl error 6(?!\d)|dns~',
		'transport_network'         => '~curl error 7(?!\d)|failed to connect|connection refused|connection reset|network is unreachable|no route to host~',
		'transport_connect_timeout' => '~curl error 28(?!\d)|connection timed out|connect timed out~',
	);

	foreach ( $patterns as $retry_class => $pattern ) {
		if ( 1 === preg_match( $pattern, $message ) ) {
			return $retry_class;
		}
	}

	return 'generic';
}

/**
 * Tell whether a retry classification uses the short AI transport delay.
 *
 * Only the three transport classes qualify; every other value, including
 * the provider classes and 'generic', does not.
 *
 * @param string $retry_class Retry classification.
 * @return bool True for a transport class, false otherwise.
 */
private static function isShortTransportRetryClass( string $retry_class ): bool {
	// `match` compares with strict identity, like the strict in_array() it replaces.
	return match ( $retry_class ) {
		'transport_connect_timeout',
		'transport_dns',
		'transport_network' => true,
		default             => false,
	};
}

/**
* Resolve delay with Retry-After and provider/source throttling hooks.
*
Expand Down Expand Up @@ -382,6 +456,7 @@ private static function recordRetry( int $job_id, string $reason, array $context
'flow_step_id' => $context_data['flow_step_id'] ?? null,
'provider' => $policy['provider'] ?? null,
'source_type' => $policy['source_type'] ?? null,
'retry_class' => $policy['retry_class'] ?? null,
'delay_seconds' => $delay_seconds,
'next_retry_at' => gmdate( 'c', $timestamp ),
'recorded_at' => gmdate( 'c' ),
Expand All @@ -399,6 +474,7 @@ private static function recordRetry( int $job_id, string $reason, array $context
'delay_seconds' => $delay_seconds,
'last_reason' => $reason,
'last_retryable' => true,
'retry_class' => $policy['retry_class'] ?? null,
'provider' => $policy['provider'] ?? null,
'source_type' => $policy['source_type'] ?? null,
'history' => $history,
Expand Down
3 changes: 3 additions & 0 deletions inc/Core/Steps/AI/AIStep.php
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ protected function executeStep(): array {

// Check for errors
if ( isset( $loop_result['error'] ) ) {
$request_metadata = is_array( $loop_result['request_metadata'] ?? null ) ? $loop_result['request_metadata'] : array();
// Record the transcript on the failure path too so operators
// can `wp datamachine jobs transcript <id>` and see exactly
// what the model received before the AI request died. This
Expand All @@ -408,6 +409,8 @@ protected function executeStep(): array {
'flow_step_id' => $this->flow_step_id,
'ai_error' => $loop_result['error'],
'ai_provider' => $provider_name,
'request_metadata' => $request_metadata,
'transport_profile' => is_array( $request_metadata['transport'] ?? null ) ? $request_metadata['transport'] : array(),
'retry_after' => $loop_result['retry_after'] ?? null,
'retry_after_seconds' => $loop_result['retry_after_seconds'] ?? null,
'headers' => is_array( $loop_result['headers'] ?? null ) ? $loop_result['headers'] : array(),
Expand Down
25 changes: 23 additions & 2 deletions tests/job-retry-policy-smoke.php
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ function wp_rand( int $min, int $max ): int {
$normalize_delay = $reflection->getMethod( 'normalizeDelay' );
$is_retryable = $reflection->getMethod( 'isRetryableFailure' );
$resolve_delay = $reflection->getMethod( 'resolveDelay' );
$classify_failure = $reflection->getMethod( 'classifyFailure' );
$resolve_policy = $reflection->getMethod( 'resolvePolicy' );

echo "Case 1: Retry-After values are normalized\n";
assert_retry_policy_smoke( 'numeric Retry-After is seconds', 90 === $extract_retry_after->invoke( null, array( 'retry_after' => '90' ) ) );
Expand All @@ -56,6 +58,7 @@ function wp_rand( int $min, int $max ): int {
assert_retry_policy_smoke( 'explicit retryable flag wins', true === $is_retryable->invoke( null, 'custom_failure', array( 'retryable' => true ) ) );
assert_retry_policy_smoke( 'Retry-After implies retryable', true === $is_retryable->invoke( null, 'provider_error', array( 'retry_after' => 10 ) ) );
assert_retry_policy_smoke( 'rate-limit text implies retryable', true === $is_retryable->invoke( null, 'ai_processing_failed', array( 'ai_error' => 'Provider returned 429 rate limit' ) ) );
assert_retry_policy_smoke( 'cURL 28 connect timeout text implies retryable', true === $is_retryable->invoke( null, 'ai_processing_failed', array( 'ai_error' => 'cURL error 28: Connection timed out after 15000 milliseconds' ) ) );
assert_retry_policy_smoke( 'validation-style failures are not retryable by default', false === $is_retryable->invoke( null, 'missing_flow_id_in_step_config', array() ) );

echo "Case 3: Backoff composes with Retry-After\n";
Expand All @@ -72,7 +75,24 @@ function wp_rand( int $min, int $max ): int {
);
assert_retry_policy_smoke( 'Retry-After is a floor over exponential backoff', 300 === $delay, 'delay was ' . $delay );

echo "Case 4: Production code exposes retry/backoff hooks and metadata\n";
echo "Case 4: AI retry classification preserves provider backoff and shortens transport retries\n";
$transport_context = array( 'ai_error' => 'cURL error 28: Connection timed out after 15000 milliseconds' );
$rate_context = array( 'ai_error' => 'Provider returned 429 rate limit' );
$generic_context = array( 'ai_error' => 'Provider temporarily unavailable, try again later' );
$transport_policy = $resolve_policy->invoke( null, 123, 'ai_processing_failed', $transport_context, array(), array() );
$rate_policy = $resolve_policy->invoke( null, 123, 'ai_processing_failed', $rate_context, array(), array() );
$generic_policy = $resolve_policy->invoke( null, 123, 'ai_processing_failed', $generic_context, array(), array() );
$transport_delay = $resolve_delay->invoke( null, 1, $transport_policy, array() );
$rate_delay = $resolve_delay->invoke( null, 1, $rate_policy, array() );
$generic_delay = $resolve_delay->invoke( null, 1, $generic_policy, array() );

assert_retry_policy_smoke( 'cURL 28 is classified as transport connect timeout', 'transport_connect_timeout' === $classify_failure->invoke( null, 'ai_processing_failed', $transport_context ) );
assert_retry_policy_smoke( 'cURL 28 uses short transport base delay', 15 === $transport_delay, 'delay was ' . $transport_delay );
assert_retry_policy_smoke( 'rate limit is classified separately', 'provider_rate_limit' === $rate_policy['retry_class'] );
assert_retry_policy_smoke( 'rate limit keeps default base delay', 60 === $rate_delay, 'delay was ' . $rate_delay );
assert_retry_policy_smoke( 'generic retryable AI failure keeps default base delay', 60 === $generic_delay, 'delay was ' . $generic_delay );

echo "Case 5: Production code exposes retry/backoff hooks and metadata\n";
$policy_src = file_get_contents( __DIR__ . '/../inc/Core/JobRetryPolicy.php' ) ?: '';
$fail_src = file_get_contents( __DIR__ . '/../inc/Engine/Actions/Handlers/FailJobHandler.php' ) ?: '';
$ai_src = file_get_contents( __DIR__ . '/../inc/Core/Steps/AI/AIStep.php' ) ?: '';
Expand All @@ -81,9 +101,10 @@ function wp_rand( int $min, int $max ): int {
assert_retry_policy_smoke( 'policy exposes retry policy hook', str_contains( $policy_src, 'datamachine_job_retry_policy' ) );
assert_retry_policy_smoke( 'policy exposes provider/source throttle hook', str_contains( $policy_src, 'datamachine_job_retry_throttle_delay' ) );
assert_retry_policy_smoke( 'policy records retry metadata', str_contains( $policy_src, "'retry'" ) && str_contains( $policy_src, "'history'" ) );
assert_retry_policy_smoke( 'policy records retry classification metadata', str_contains( $policy_src, "'retry_class'" ) );
assert_retry_policy_smoke( 'policy records poison item isolation metadata', str_contains( $policy_src, "'poison_item'" ) );
assert_retry_policy_smoke( 'fail handler tries retry before final failure', strpos( $fail_src, 'maybeRetry' ) < strpos( $fail_src, 'complete_job' ) );
assert_retry_policy_smoke( 'AI failures pass retry-after context', str_contains( $ai_src, "'retry_after'" ) && str_contains( $ai_src, "'headers'" ) );
assert_retry_policy_smoke( 'AI failures pass retry and transport context', str_contains( $ai_src, "'retry_after'" ) && str_contains( $ai_src, "'headers'" ) && str_contains( $ai_src, "'transport_profile'" ) );

echo "\nJob retry policy smoke complete: {$total} assertions, {$failed} failures.\n";
if ( $failed > 0 ) {
Expand Down
Loading