Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions inc/Abilities/Job/JobHelpers.php
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace DataMachine\Abilities\Job;

use DataMachine\Abilities\PermissionHelper;
use DataMachine\Abilities\Flow\QueueAbility;

use DataMachine\Core\Admin\DateFormatter;
use DataMachine\Core\Database\Flows\Flows;
Expand Down Expand Up @@ -189,6 +190,77 @@ protected function createJob( int $flow_id, int $pipeline_id = 0 ): ?int {
return $job_id;
}

/**
* Restore a queue item backup when retry/recovery needs to undo a drain-mode consume.
*
* Loop mode already rotates the consumed item back into the queue, and static mode never
* mutates queue storage, so only drain-mode backups are appended.
*
* @param int $flow_id Flow ID containing the queued step.
* @param array $backup queued_prompt_backup payload from job engine_data.
* @return bool Whether the backup was restored to the flow config.
*/
protected function restoreQueuedPromptBackup( int $flow_id, array $backup ): bool {
$flow = $this->db_flows->get_flow( $flow_id );
if ( ! $flow || ! isset( $flow['flow_config'] ) || ! is_array( $flow['flow_config'] ) ) {
return false;
}

$flow_config = $flow['flow_config'];
if ( ! $this->restoreQueuedPromptBackupToFlowConfig( $flow_config, $backup ) ) {
return false;
}

return (bool) $this->db_flows->update_flow( $flow_id, array( 'flow_config' => $flow_config ) );
}

/**
* Apply queue-backup restoration semantics to an in-memory flow config.
*
* @param array $flow_config Flow config to mutate when restoration is needed.
* @param array $backup queued_prompt_backup payload from job engine_data.
* @return bool Whether an entry was appended.
*/
protected function restoreQueuedPromptBackupToFlowConfig( array &$flow_config, array $backup ): bool {
$mode = $backup['mode'] ?? 'drain';
if ( 'drain' !== $mode || empty( $backup['flow_step_id'] ) ) {
return false;
}

$step_id = $backup['flow_step_id'];
if ( ! isset( $flow_config[ $step_id ] ) || ! is_array( $flow_config[ $step_id ] ) ) {
return false;
}

$slot = $backup['slot'] ?? QueueAbility::SLOT_PROMPT_QUEUE;
$entry = null;

if ( QueueAbility::SLOT_CONFIG_PATCH_QUEUE === $slot && isset( $backup['patch'] ) && is_array( $backup['patch'] ) ) {
$entry = array(
'patch' => $backup['patch'],
'added_at' => $backup['added_at'] ?? gmdate( 'c' ),
);
} elseif ( isset( $backup['prompt'] ) ) {
$slot = QueueAbility::SLOT_PROMPT_QUEUE;
$entry = array(
'prompt' => $backup['prompt'],
'added_at' => $backup['added_at'] ?? gmdate( 'c' ),
);
}

if ( null === $entry ) {
return false;
}

if ( ! isset( $flow_config[ $step_id ][ $slot ] ) || ! is_array( $flow_config[ $step_id ][ $slot ] ) ) {
$flow_config[ $step_id ][ $slot ] = array();
}

$flow_config[ $step_id ][ $slot ][] = $entry;

return true;
}

/**
* Delete jobs based on criteria.
*
Expand Down
42 changes: 3 additions & 39 deletions inc/Abilities/Job/RecoverStuckJobsAbility.php
Original file line number Diff line number Diff line change
Expand Up @@ -241,46 +241,10 @@ public function execute( array $input ): array {

do_action( 'datamachine_job_complete', $job_id, 'failed' );

// Check for queued_prompt_backup and requeue if found.
// Slot-aware: AI backups go back to prompt_queue,
// fetch backups go back to config_patch_queue.
// Restore drain-mode queued_prompt_backup if the prior run removed an entry.
$backup = $engine_data['queued_prompt_backup'] ?? array();
if ( ! empty( $backup ) && isset( $backup['flow_step_id'] ) ) {
$slot = $backup['slot'] ?? \DataMachine\Abilities\Flow\QueueAbility::SLOT_PROMPT_QUEUE;
$flow = $this->db_flows->get_flow( $job_flow_id );
if ( $flow && isset( $flow['flow_config'] ) ) {
$flow_config = $flow['flow_config'];
$step_id = $backup['flow_step_id'];

if ( isset( $flow_config[ $step_id ] ) ) {
$entry = null;

if ( \DataMachine\Abilities\Flow\QueueAbility::SLOT_CONFIG_PATCH_QUEUE === $slot && isset( $backup['patch'] ) && is_array( $backup['patch'] ) ) {
$entry = array(
'patch' => $backup['patch'],
'added_at' => gmdate( 'c' ),
);
} elseif ( isset( $backup['prompt'] ) ) {
$entry = array(
'prompt' => $backup['prompt'],
'added_at' => gmdate( 'c' ),
);
$slot = \DataMachine\Abilities\Flow\QueueAbility::SLOT_PROMPT_QUEUE;
}

if ( null !== $entry ) {
if ( ! isset( $flow_config[ $step_id ][ $slot ] ) || ! is_array( $flow_config[ $step_id ][ $slot ] ) ) {
$flow_config[ $step_id ][ $slot ] = array();
}
$flow_config[ $step_id ][ $slot ][] = $entry;

$update_result = $this->db_flows->update_flow( $job_flow_id, array( 'flow_config' => $flow_config ) );
if ( $update_result ) {
++$requeued;
}
}
}
}
if ( ! empty( $backup ) && $this->restoreQueuedPromptBackup( $job_flow_id, $backup ) ) {
++$requeued;
}
} else {
$jobs[] = array(
Expand Down
47 changes: 3 additions & 44 deletions inc/Abilities/Job/RetryJobAbility.php
Original file line number Diff line number Diff line change
Expand Up @@ -120,54 +120,13 @@ public function execute( array $input ): array {

do_action( 'datamachine_job_complete', $job_id, 'failed' );

// Check for queued_prompt_backup and requeue if found. The
// `slot` field on the backup tells us which queue it came from
// (prompt_queue for AI, config_patch_queue for Fetch). Pre-#1292
// backups have no `slot` field; treat them as prompt_queue for
// backward compat — those jobs predate the split.
// Restore drain-mode queued_prompt_backup if the prior run removed an entry.
$prompt_requeued = false;
$job_flow_id = (int) ( $job['flow_id'] ?? 0 );
$backup = $engine_data['queued_prompt_backup'] ?? array();

if ( ! empty( $backup ) && isset( $backup['flow_step_id'] ) ) {
$slot = $backup['slot'] ?? \DataMachine\Abilities\Flow\QueueAbility::SLOT_PROMPT_QUEUE;

$flow = $this->db_flows->get_flow( $job_flow_id );

if ( $flow && isset( $flow['flow_config'] ) ) {
$flow_config = $flow['flow_config'];
$step_id = $backup['flow_step_id'];

if ( isset( $flow_config[ $step_id ] ) ) {
$entry = null;

if ( \DataMachine\Abilities\Flow\QueueAbility::SLOT_CONFIG_PATCH_QUEUE === $slot && isset( $backup['patch'] ) && is_array( $backup['patch'] ) ) {
$entry = array(
'patch' => $backup['patch'],
'added_at' => gmdate( 'c' ),
);
} elseif ( isset( $backup['prompt'] ) ) {
$entry = array(
'prompt' => $backup['prompt'],
'added_at' => gmdate( 'c' ),
);
$slot = \DataMachine\Abilities\Flow\QueueAbility::SLOT_PROMPT_QUEUE;
}

if ( null !== $entry ) {
if ( ! isset( $flow_config[ $step_id ][ $slot ] ) || ! is_array( $flow_config[ $step_id ][ $slot ] ) ) {
$flow_config[ $step_id ][ $slot ] = array();
}
$flow_config[ $step_id ][ $slot ][] = $entry;

$update_result = $this->db_flows->update_flow( $job_flow_id, array( 'flow_config' => $flow_config ) );

if ( $update_result ) {
$prompt_requeued = true;
}
}
}
}
if ( ! empty( $backup ) && $job_flow_id > 0 ) {
$prompt_requeued = $this->restoreQueuedPromptBackup( $job_flow_id, $backup );
}

do_action(
Expand Down
146 changes: 146 additions & 0 deletions tests/job-queue-backup-restore-smoke.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
<?php
/**
* Pure-PHP smoke test for queued item restoration on job retry/recovery (#1810).
*
* Run with: php tests/job-queue-backup-restore-smoke.php
*
* @package DataMachine\Tests
*/

require_once __DIR__ . '/bootstrap-unit.php';

$failed = 0;
$total = 0;

function assert_job_queue_restore_smoke( string $name, bool $cond, string $detail = '' ): void {
global $failed, $total;
++$total;
if ( $cond ) {
echo " [PASS] $name\n";
return;
}

echo " [FAIL] $name" . ( $detail ? " - $detail" : '' ) . "\n";
++$failed;
}

class JobQueueBackupRestoreHarness {
use DataMachine\Abilities\Job\JobHelpers;

public function apply( array &$flow_config, array $backup ): bool {
return $this->restoreQueuedPromptBackupToFlowConfig( $flow_config, $backup );
}
}

$helper = new JobQueueBackupRestoreHarness();

echo "Case 1: loop-mode config_patch_queue backup is not duplicated\n";
$flow_config = array(
'step1' => array(
'queue_mode' => 'loop',
'config_patch_queue' => array(
array( 'patch' => array( 'slug' => 'second' ), 'added_at' => 't1' ),
array( 'patch' => array( 'slug' => 'first' ), 'added_at' => 't0' ),
),
),
);
$restored = $helper->apply(
$flow_config,
array(
'slot' => DataMachine\Abilities\Flow\QueueAbility::SLOT_CONFIG_PATCH_QUEUE,
'mode' => 'loop',
'patch' => array( 'slug' => 'first' ),
'flow_step_id' => 'step1',
'added_at' => 't0',
)
);
assert_job_queue_restore_smoke( 'loop config patch restore returns false', false === $restored );
assert_job_queue_restore_smoke( 'loop config patch queue keeps rotated two entries', 2 === count( $flow_config['step1']['config_patch_queue'] ) );
assert_job_queue_restore_smoke( 'loop config patch queue tail is still the consumed item', 'first' === ( $flow_config['step1']['config_patch_queue'][1]['patch']['slug'] ?? null ) );

echo "Case 2: manual retry and recover-stuck use the shared helper\n";
$retry_src = file_get_contents( __DIR__ . '/../inc/Abilities/Job/RetryJobAbility.php' ) ?: '';
$recover_src = file_get_contents( __DIR__ . '/../inc/Abilities/Job/RecoverStuckJobsAbility.php' ) ?: '';
assert_job_queue_restore_smoke( 'manual retry calls restoreQueuedPromptBackup()', str_contains( $retry_src, 'restoreQueuedPromptBackup( $job_flow_id, $backup )' ) );
assert_job_queue_restore_smoke( 'recover-stuck calls restoreQueuedPromptBackup()', str_contains( $recover_src, 'restoreQueuedPromptBackup( $job_flow_id, $backup )' ) );
assert_job_queue_restore_smoke( 'manual retry no longer appends queue backups inline', ! str_contains( $retry_src, '$flow_config[ $step_id ][ $slot ][] = $entry;' ) );
assert_job_queue_restore_smoke( 'recover-stuck no longer appends queue backups inline', ! str_contains( $recover_src, '$flow_config[ $step_id ][ $slot ][] = $entry;' ) );

echo "Case 3: drain-mode config_patch_queue backup is restored\n";
$flow_config = array(
'step1' => array(
'queue_mode' => 'drain',
'config_patch_queue' => array(
array( 'patch' => array( 'slug' => 'second' ), 'added_at' => 't1' ),
),
),
);
$restored = $helper->apply(
$flow_config,
array(
'slot' => DataMachine\Abilities\Flow\QueueAbility::SLOT_CONFIG_PATCH_QUEUE,
'mode' => 'drain',
'patch' => array( 'slug' => 'first' ),
'flow_step_id' => 'step1',
'added_at' => 't0',
)
);
assert_job_queue_restore_smoke( 'drain config patch restore returns true', true === $restored );
assert_job_queue_restore_smoke( 'drain config patch queue appends consumed item', 2 === count( $flow_config['step1']['config_patch_queue'] ) );
assert_job_queue_restore_smoke( 'drain config patch preserves backup payload', 'first' === ( $flow_config['step1']['config_patch_queue'][1]['patch']['slug'] ?? null ) );

echo "Case 4: prompt_queue follows the same mode semantics\n";
$flow_config = array(
'step1' => array(
'queue_mode' => 'loop',
'prompt_queue' => array(
array( 'prompt' => 'second', 'added_at' => 't1' ),
array( 'prompt' => 'first', 'added_at' => 't0' ),
),
),
);
$restored = $helper->apply(
$flow_config,
array(
'slot' => DataMachine\Abilities\Flow\QueueAbility::SLOT_PROMPT_QUEUE,
'mode' => 'loop',
'prompt' => 'first',
'flow_step_id' => 'step1',
'added_at' => 't0',
)
);
assert_job_queue_restore_smoke( 'loop prompt restore returns false', false === $restored );
assert_job_queue_restore_smoke( 'loop prompt queue keeps rotated two entries', 2 === count( $flow_config['step1']['prompt_queue'] ) );

$flow_config = array( 'step1' => array( 'queue_mode' => 'drain', 'prompt_queue' => array() ) );
$restored = $helper->apply(
$flow_config,
array(
'slot' => DataMachine\Abilities\Flow\QueueAbility::SLOT_PROMPT_QUEUE,
'mode' => 'drain',
'prompt' => 'first',
'flow_step_id' => 'step1',
'added_at' => 't0',
)
);
assert_job_queue_restore_smoke( 'drain prompt restore returns true', true === $restored );
assert_job_queue_restore_smoke( 'drain prompt queue appends consumed prompt', 'first' === ( $flow_config['step1']['prompt_queue'][0]['prompt'] ?? null ) );

echo "Case 5: static-mode backups are ignored\n";
$flow_config = array( 'step1' => array( 'queue_mode' => 'static', 'prompt_queue' => array( array( 'prompt' => 'first', 'added_at' => 't0' ) ) ) );
$restored = $helper->apply(
$flow_config,
array(
'slot' => DataMachine\Abilities\Flow\QueueAbility::SLOT_PROMPT_QUEUE,
'mode' => 'static',
'prompt' => 'first',
'flow_step_id' => 'step1',
)
);
assert_job_queue_restore_smoke( 'static prompt restore returns false', false === $restored );
assert_job_queue_restore_smoke( 'static prompt queue is unchanged', 1 === count( $flow_config['step1']['prompt_queue'] ) );

echo "\nJob queue backup restore smoke complete: {$total} assertions, {$failed} failures.\n";
if ( $failed > 0 ) {
exit( 1 );
}
Loading