From 6c5241ca46983c6a1d750a086eedc3ca104f9ba6 Mon Sep 17 00:00:00 2001 From: Chris Huber Date: Wed, 6 May 2026 16:27:29 -0400 Subject: [PATCH] fix(job): restore queue backups only for drain mode --- inc/Abilities/Job/JobHelpers.php | 72 +++++++++ inc/Abilities/Job/RecoverStuckJobsAbility.php | 42 +---- inc/Abilities/Job/RetryJobAbility.php | 47 +----- tests/job-queue-backup-restore-smoke.php | 146 ++++++++++++++++++ 4 files changed, 224 insertions(+), 83 deletions(-) create mode 100644 tests/job-queue-backup-restore-smoke.php diff --git a/inc/Abilities/Job/JobHelpers.php b/inc/Abilities/Job/JobHelpers.php index 0ae90102a..72b8a5f2b 100644 --- a/inc/Abilities/Job/JobHelpers.php +++ b/inc/Abilities/Job/JobHelpers.php @@ -12,6 +12,7 @@ namespace DataMachine\Abilities\Job; use DataMachine\Abilities\PermissionHelper; +use DataMachine\Abilities\Flow\QueueAbility; use DataMachine\Core\Admin\DateFormatter; use DataMachine\Core\Database\Flows\Flows; @@ -189,6 +190,77 @@ protected function createJob( int $flow_id, int $pipeline_id = 0 ): ?int { return $job_id; } + /** + * Restore a queue item backup when retry/recovery needs to undo a drain-mode consume. + * + * Loop mode already rotates the consumed item back into the queue, and static mode never + * mutates queue storage, so only drain-mode backups are appended. + * + * @param int $flow_id Flow ID containing the queued step. + * @param array $backup queued_prompt_backup payload from job engine_data. + * @return bool Whether the backup was restored to the flow config. + */ + protected function restoreQueuedPromptBackup( int $flow_id, array $backup ): bool { + $flow = $this->db_flows->get_flow( $flow_id ); + if ( ! $flow || ! isset( $flow['flow_config'] ) || ! is_array( $flow['flow_config'] ) ) { + return false; + } + + $flow_config = $flow['flow_config']; + if ( ! $this->restoreQueuedPromptBackupToFlowConfig( $flow_config, $backup ) ) { + return false; + } + + return (bool) $this->db_flows->update_flow( $flow_id, array( 'flow_config' => $flow_config ) ); + } + + /** + * Apply queue-backup restoration semantics to an in-memory flow config. + * + * @param array $flow_config Flow config to mutate when restoration is needed. + * @param array $backup queued_prompt_backup payload from job engine_data. + * @return bool Whether an entry was appended. + */ + protected function restoreQueuedPromptBackupToFlowConfig( array &$flow_config, array $backup ): bool { + $mode = $backup['mode'] ?? 'drain'; + if ( 'drain' !== $mode || empty( $backup['flow_step_id'] ) ) { + return false; + } + + $step_id = $backup['flow_step_id']; + if ( ! isset( $flow_config[ $step_id ] ) || ! is_array( $flow_config[ $step_id ] ) ) { + return false; + } + + $slot = $backup['slot'] ?? QueueAbility::SLOT_PROMPT_QUEUE; + $entry = null; + + if ( QueueAbility::SLOT_CONFIG_PATCH_QUEUE === $slot && isset( $backup['patch'] ) && is_array( $backup['patch'] ) ) { + $entry = array( + 'patch' => $backup['patch'], + 'added_at' => $backup['added_at'] ?? gmdate( 'c' ), + ); + } elseif ( isset( $backup['prompt'] ) ) { + $slot = QueueAbility::SLOT_PROMPT_QUEUE; + $entry = array( + 'prompt' => $backup['prompt'], + 'added_at' => $backup['added_at'] ?? gmdate( 'c' ), + ); + } + + if ( null === $entry ) { + return false; + } + + if ( ! isset( $flow_config[ $step_id ][ $slot ] ) || ! is_array( $flow_config[ $step_id ][ $slot ] ) ) { + $flow_config[ $step_id ][ $slot ] = array(); + } + + $flow_config[ $step_id ][ $slot ][] = $entry; + + return true; + } + /** * Delete jobs based on criteria. * diff --git a/inc/Abilities/Job/RecoverStuckJobsAbility.php b/inc/Abilities/Job/RecoverStuckJobsAbility.php index 2d4780bca..7b0628d47 100644 --- a/inc/Abilities/Job/RecoverStuckJobsAbility.php +++ b/inc/Abilities/Job/RecoverStuckJobsAbility.php @@ -241,46 +241,10 @@ public function execute( array $input ): array { do_action( 'datamachine_job_complete', $job_id, 'failed' ); - // Check for queued_prompt_backup and requeue if found. - // Slot-aware: AI backups go back to prompt_queue, - // fetch backups go back to config_patch_queue. + // Restore drain-mode queued_prompt_backup if the prior run removed an entry. $backup = $engine_data['queued_prompt_backup'] ?? array(); - if ( ! empty( $backup ) && isset( $backup['flow_step_id'] ) ) { - $slot = $backup['slot'] ?? \DataMachine\Abilities\Flow\QueueAbility::SLOT_PROMPT_QUEUE; - $flow = $this->db_flows->get_flow( $job_flow_id ); - if ( $flow && isset( $flow['flow_config'] ) ) { - $flow_config = $flow['flow_config']; - $step_id = $backup['flow_step_id']; - - if ( isset( $flow_config[ $step_id ] ) ) { - $entry = null; - - if ( \DataMachine\Abilities\Flow\QueueAbility::SLOT_CONFIG_PATCH_QUEUE === $slot && isset( $backup['patch'] ) && is_array( $backup['patch'] ) ) { - $entry = array( - 'patch' => $backup['patch'], - 'added_at' => gmdate( 'c' ), - ); - } elseif ( isset( $backup['prompt'] ) ) { - $entry = array( - 'prompt' => $backup['prompt'], - 'added_at' => gmdate( 'c' ), - ); - $slot = \DataMachine\Abilities\Flow\QueueAbility::SLOT_PROMPT_QUEUE; - } - - if ( null !== $entry ) { - if ( ! isset( $flow_config[ $step_id ][ $slot ] ) || ! is_array( $flow_config[ $step_id ][ $slot ] ) ) { - $flow_config[ $step_id ][ $slot ] = array(); - } - $flow_config[ $step_id ][ $slot ][] = $entry; - - $update_result = $this->db_flows->update_flow( $job_flow_id, array( 'flow_config' => $flow_config ) ); - if ( $update_result ) { - ++$requeued; - } - } - } - } + if ( ! empty( $backup ) && $this->restoreQueuedPromptBackup( $job_flow_id, $backup ) ) { + ++$requeued; } } else { $jobs[] = array( diff --git a/inc/Abilities/Job/RetryJobAbility.php b/inc/Abilities/Job/RetryJobAbility.php index 64e2483ec..f7b839965 100644 --- a/inc/Abilities/Job/RetryJobAbility.php +++ b/inc/Abilities/Job/RetryJobAbility.php @@ -120,54 +120,13 @@ public function execute( array $input ): array { do_action( 'datamachine_job_complete', $job_id, 'failed' ); - // Check for queued_prompt_backup and requeue if found. The - // `slot` field on the backup tells us which queue it came from - // (prompt_queue for AI, config_patch_queue for Fetch). Pre-#1292 - // backups have no `slot` field; treat them as prompt_queue for - // backward compat — those jobs predate the split. + // Restore drain-mode queued_prompt_backup if the prior run removed an entry. $prompt_requeued = false; $job_flow_id = (int) ( $job['flow_id'] ?? 0 ); $backup = $engine_data['queued_prompt_backup'] ?? array(); - if ( ! empty( $backup ) && isset( $backup['flow_step_id'] ) ) { - $slot = $backup['slot'] ?? \DataMachine\Abilities\Flow\QueueAbility::SLOT_PROMPT_QUEUE; - - $flow = $this->db_flows->get_flow( $job_flow_id ); - - if ( $flow && isset( $flow['flow_config'] ) ) { - $flow_config = $flow['flow_config']; - $step_id = $backup['flow_step_id']; - - if ( isset( $flow_config[ $step_id ] ) ) { - $entry = null; - - if ( \DataMachine\Abilities\Flow\QueueAbility::SLOT_CONFIG_PATCH_QUEUE === $slot && isset( $backup['patch'] ) && is_array( $backup['patch'] ) ) { - $entry = array( - 'patch' => $backup['patch'], - 'added_at' => gmdate( 'c' ), - ); - } elseif ( isset( $backup['prompt'] ) ) { - $entry = array( - 'prompt' => $backup['prompt'], - 'added_at' => gmdate( 'c' ), - ); - $slot = \DataMachine\Abilities\Flow\QueueAbility::SLOT_PROMPT_QUEUE; - } - - if ( null !== $entry ) { - if ( ! isset( $flow_config[ $step_id ][ $slot ] ) || ! is_array( $flow_config[ $step_id ][ $slot ] ) ) { - $flow_config[ $step_id ][ $slot ] = array(); - } - $flow_config[ $step_id ][ $slot ][] = $entry; - - $update_result = $this->db_flows->update_flow( $job_flow_id, array( 'flow_config' => $flow_config ) ); - - if ( $update_result ) { - $prompt_requeued = true; - } - } - } - } + if ( ! empty( $backup ) && $job_flow_id > 0 ) { + $prompt_requeued = $this->restoreQueuedPromptBackup( $job_flow_id, $backup ); } do_action( diff --git a/tests/job-queue-backup-restore-smoke.php b/tests/job-queue-backup-restore-smoke.php new file mode 100644 index 000000000..ee5a2cbaf --- /dev/null +++ b/tests/job-queue-backup-restore-smoke.php @@ -0,0 +1,146 @@ +restoreQueuedPromptBackupToFlowConfig( $flow_config, $backup ); + } +} + +$helper = new JobQueueBackupRestoreHarness(); + +echo "Case 1: loop-mode config_patch_queue backup is not duplicated\n"; +$flow_config = array( + 'step1' => array( + 'queue_mode' => 'loop', + 'config_patch_queue' => array( + array( 'patch' => array( 'slug' => 'second' ), 'added_at' => 't1' ), + array( 'patch' => array( 'slug' => 'first' ), 'added_at' => 't0' ), + ), + ), +); +$restored = $helper->apply( + $flow_config, + array( + 'slot' => DataMachine\Abilities\Flow\QueueAbility::SLOT_CONFIG_PATCH_QUEUE, + 'mode' => 'loop', + 'patch' => array( 'slug' => 'first' ), + 'flow_step_id' => 'step1', + 'added_at' => 't0', + ) +); +assert_job_queue_restore_smoke( 'loop config patch restore returns false', false === $restored ); +assert_job_queue_restore_smoke( 'loop config patch queue keeps rotated two entries', 2 === count( $flow_config['step1']['config_patch_queue'] ) ); +assert_job_queue_restore_smoke( 'loop config patch queue tail is still the consumed item', 'first' === ( $flow_config['step1']['config_patch_queue'][1]['patch']['slug'] ?? null ) ); + +echo "Case 2: manual retry and recover-stuck use the shared helper\n"; +$retry_src = file_get_contents( __DIR__ . '/../inc/Abilities/Job/RetryJobAbility.php' ) ?: ''; +$recover_src = file_get_contents( __DIR__ . '/../inc/Abilities/Job/RecoverStuckJobsAbility.php' ) ?: ''; +assert_job_queue_restore_smoke( 'manual retry calls restoreQueuedPromptBackup()', str_contains( $retry_src, 'restoreQueuedPromptBackup( $job_flow_id, $backup )' ) ); +assert_job_queue_restore_smoke( 'recover-stuck calls restoreQueuedPromptBackup()', str_contains( $recover_src, 'restoreQueuedPromptBackup( $job_flow_id, $backup )' ) ); +assert_job_queue_restore_smoke( 'manual retry no longer appends queue backups inline', ! str_contains( $retry_src, '$flow_config[ $step_id ][ $slot ][] = $entry;' ) ); +assert_job_queue_restore_smoke( 'recover-stuck no longer appends queue backups inline', ! str_contains( $recover_src, '$flow_config[ $step_id ][ $slot ][] = $entry;' ) ); + +echo "Case 3: drain-mode config_patch_queue backup is restored\n"; +$flow_config = array( + 'step1' => array( + 'queue_mode' => 'drain', + 'config_patch_queue' => array( + array( 'patch' => array( 'slug' => 'second' ), 'added_at' => 't1' ), + ), + ), +); +$restored = $helper->apply( + $flow_config, + array( + 'slot' => DataMachine\Abilities\Flow\QueueAbility::SLOT_CONFIG_PATCH_QUEUE, + 'mode' => 'drain', + 'patch' => array( 'slug' => 'first' ), + 'flow_step_id' => 'step1', + 'added_at' => 't0', + ) +); +assert_job_queue_restore_smoke( 'drain config patch restore returns true', true === $restored ); +assert_job_queue_restore_smoke( 'drain config patch queue appends consumed item', 2 === count( $flow_config['step1']['config_patch_queue'] ) ); +assert_job_queue_restore_smoke( 'drain config patch preserves backup payload', 'first' === ( $flow_config['step1']['config_patch_queue'][1]['patch']['slug'] ?? null ) ); + +echo "Case 4: prompt_queue follows the same mode semantics\n"; +$flow_config = array( + 'step1' => array( + 'queue_mode' => 'loop', + 'prompt_queue' => array( + array( 'prompt' => 'second', 'added_at' => 't1' ), + array( 'prompt' => 'first', 'added_at' => 't0' ), + ), + ), +); +$restored = $helper->apply( + $flow_config, + array( + 'slot' => DataMachine\Abilities\Flow\QueueAbility::SLOT_PROMPT_QUEUE, + 'mode' => 'loop', + 'prompt' => 'first', + 'flow_step_id' => 'step1', + 'added_at' => 't0', + ) +); +assert_job_queue_restore_smoke( 'loop prompt restore returns false', false === $restored ); +assert_job_queue_restore_smoke( 'loop prompt queue keeps rotated two entries', 2 === count( $flow_config['step1']['prompt_queue'] ) ); + +$flow_config = array( 'step1' => array( 'queue_mode' => 'drain', 'prompt_queue' => array() ) ); +$restored = $helper->apply( + $flow_config, + array( + 'slot' => DataMachine\Abilities\Flow\QueueAbility::SLOT_PROMPT_QUEUE, + 'mode' => 'drain', + 'prompt' => 'first', + 'flow_step_id' => 'step1', + 'added_at' => 't0', + ) +); +assert_job_queue_restore_smoke( 'drain prompt restore returns true', true === $restored ); +assert_job_queue_restore_smoke( 'drain prompt queue appends consumed prompt', 'first' === ( $flow_config['step1']['prompt_queue'][0]['prompt'] ?? null ) ); + +echo "Case 5: static-mode backups are ignored\n"; +$flow_config = array( 'step1' => array( 'queue_mode' => 'static', 'prompt_queue' => array( array( 'prompt' => 'first', 'added_at' => 't0' ) ) ) ); +$restored = $helper->apply( + $flow_config, + array( + 'slot' => DataMachine\Abilities\Flow\QueueAbility::SLOT_PROMPT_QUEUE, + 'mode' => 'static', + 'prompt' => 'first', + 'flow_step_id' => 'step1', + ) +); +assert_job_queue_restore_smoke( 'static prompt restore returns false', false === $restored ); +assert_job_queue_restore_smoke( 'static prompt queue is unchanged', 1 === count( $flow_config['step1']['prompt_queue'] ) ); + +echo "\nJob queue backup restore smoke complete: {$total} assertions, {$failed} failures.\n"; +if ( $failed > 0 ) { + exit( 1 ); +}