diff --git a/inc/Abilities/Engine/DrainJobAbility.php b/inc/Abilities/Engine/DrainJobAbility.php
new file mode 100644
index 00000000..dc3095d6
--- /dev/null
+++ b/inc/Abilities/Engine/DrainJobAbility.php
@@ -0,0 +1,338 @@
+initDatabases();
+
+		$this->registerAbility();
+	}
+
+	/**
+	 * Register the datamachine/drain-job ability.
+	 *
+	 * Registers immediately when called while `wp_abilities_api_init` is firing and
+	 * defers to that hook when it has not fired yet. If the hook has already
+	 * finished, nothing is registered.
+	 */
+	private function registerAbility(): void {
+		$register_callback = function () {
+			wp_register_ability(
+				'datamachine/drain-job',
+				array(
+					'label' => __( 'Drain Job', 'data-machine' ),
+					'description' => __( 'Synchronously drain due Action Scheduler work for one Data Machine job until terminal or budgeted.', 'data-machine' ),
+					'category' => 'datamachine-jobs',
+					'input_schema' => array(
+						'type' => 'object',
+						'required' => array( 'job_id' ),
+						'properties' => array(
+							'job_id' => array(
+								'type' => 'integer',
+								'description' => __( 'Job ID to drain.', 'data-machine' ),
+							),
+							'step_budget' => array(
+								'type' => 'integer',
+								'default' => self::DEFAULT_STEP_BUDGET,
+								'description' => __( 'Maximum Action Scheduler actions to execute before stopping.', 'data-machine' ),
+							),
+							'time_budget_ms' => array(
+								'type' => 'integer',
+								'default' => self::DEFAULT_TIME_BUDGET_MS,
+								'description' => __( 'Maximum wall-clock milliseconds to drain before stopping.', 'data-machine' ),
+							),
+						),
+					),
+					'output_schema' => array(
+						'type' => 'object',
+						'properties' => array(
+							'success' => array( 'type' => 'boolean' ),
+							'job_id' => array( 'type' => 'integer' ),
+							'terminal_state' => array( 'type' => array( 'string', 'null' ) ),
+							'steps_run' => array( 'type' => 'integer' ),
+							'actions_drained' => array( 'type' => 'integer' ),
+							'wall_time_ms' => array( 'type' => 'integer' ),
+							'remaining_actions' => array( 'type' => 'integer' ),
+							'budget_exhausted' => array( 'type' => 'boolean' ),
+							'last_error' => array( 'type' => array( 'string', 'null' ) ),
+							'error' => array( 'type' => 'string' ),
+							'error_type' => array( 'type' => 'string' ),
+						),
+					),
+					'execute_callback' => array( $this, 'execute' ),
+					'permission_callback' => array( $this, 'checkPermission' ),
+					'meta' => array(
+						'show_in_rest' => false,
+						'annotations' => array(
+							'readonly' => false,
+							'destructive' => false,
+							'idempotent' => false,
+						),
+					),
+				)
+			);
+		};
+
+		if ( doing_action( 'wp_abilities_api_init' ) ) {
+			$register_callback();
+		} elseif ( ! did_action( 'wp_abilities_api_init' ) ) {
+			add_action( 'wp_abilities_api_init', $register_callback );
+		}
+	}
+
+	/**
+	 * Execute the drain-job ability.
+	 *
+	 * Processes the job's due pending actions one at a time until the job reaches
+	 * a final status, no due action is left, or the step or time budget is used
+	 * up. Budgets below 1 are raised to 1.
+	 *
+	 * @param array $input Input with job_id and optional budgets.
+	 * @return array Result with terminal status and drain stats. Early failures
+	 *               return only success, error and, where known, job_id and error_type.
+	 */
+	public function execute( array $input ): array {
+		$job_id = (int) ( $input['job_id'] ?? 0 );
+		$step_budget = max( 1, (int) ( $input['step_budget'] ?? self::DEFAULT_STEP_BUDGET ) );
+		$time_budget_ms = max( 1, (int) ( $input['time_budget_ms'] ?? self::DEFAULT_TIME_BUDGET_MS ) );
+		$started_at = microtime( true );
+		$actions_drained = 0;
+		$last_error = null;
+
+		if ( $job_id <= 0 ) {
+			return array(
+				'success' => false,
+				'error' => 'A valid job_id is required.',
+			);
+		}
+
+		$job = $this->db_jobs->get_job( $job_id );
+		if ( ! $job ) {
+			return array(
+				'success' => false,
+				'job_id' => $job_id,
+				'error' => sprintf( 'Job %d not found.', $job_id ),
+			);
+		}
+
+		$guard_error = $this->getActionSchedulerGuardError();
+		if ( null !== $guard_error ) {
+			return array(
+				'success' => false,
+				'job_id' => $job_id,
+				'error' => $guard_error,
+				'error_type' => 'action_scheduler_unavailable',
+			);
+		}
+
+		// Re-read the job and the due-action list on every pass so changes made by the previous action are seen.
+		while ( true ) {
+			$job = $this->db_jobs->get_job( $job_id );
+			$status = (string) ( $job['status'] ?? '' );
+			if ( JobStatus::isStatusFinal( $status ) ) {
+				break;
+			}
+
+			// Budgets are checked before starting an action; an action already running is not interrupted.
+			if ( $actions_drained >= $step_budget || $this->elapsedMs( $started_at ) >= $time_budget_ms ) {
+				break;
+			}
+
+			$action_id = $this->getNextDuePendingActionId( $job_id );
+			if ( ! $action_id ) {
+				break;
+			}
+
+			try {
+				\ActionScheduler::runner()->process_action( $action_id, 'Data Machine drain-job ability' );
+			} catch ( \Throwable $e ) {
+				// Keep draining; only the most recent failure message is reported.
+				$last_error = $e->getMessage();
+			}
+
+			// A failed attempt still counts against the step budget.
+			++$actions_drained;
+		}
+
+		$job = $this->db_jobs->get_job( $job_id );
+		$status = (string) ( $job['status'] ?? '' );
+		$terminal_state = JobStatus::isStatusFinal( $status ) ? $status : null;
+		$remaining_actions = $this->countDuePendingActions( $job_id );
+		$wall_time_ms = $this->elapsedMs( $started_at );
+		// Flagged only when the job is still non-terminal and a budget was reached.
+		$budget_exhausted = null === $terminal_state && ( $actions_drained >= $step_budget || $wall_time_ms >= $time_budget_ms );
+
+		return array(
+			'success' => null !== $terminal_state,
+			'job_id' => $job_id,
+			'terminal_state' => $terminal_state,
+			'steps_run' => $actions_drained,
+			'actions_drained' => $actions_drained,
+			'wall_time_ms' => $wall_time_ms,
+			'remaining_actions' => $remaining_actions,
+			'budget_exhausted' => $budget_exhausted,
+			'last_error' => $last_error,
+		);
+	}
+
+	/**
+	 * Return a typed guard error when Action Scheduler cannot be drained.
+	 *
+	 * @return string|null Error message when the ActionScheduler class, its runner() or the
+	 *                     runner's process_action() is missing; null when draining can proceed.
+	 */
+	private function getActionSchedulerGuardError(): ?string {
+		if ( ! class_exists( '\ActionScheduler' ) || ! method_exists( '\ActionScheduler', 'runner' ) ) {
+			return 'Action Scheduler queue runner is not available.';
+		}
+
+		$runner = \ActionScheduler::runner();
+		if ( ! is_object( $runner ) || ! method_exists( $runner, 'process_action' ) ) {
+			return 'Action Scheduler action processor is not available.';
+		}
+
+		return null;
+	}
+
+	/**
+	 * Get the next due pending Data Machine step action for a single job.
+	 *
+	 * @param int $job_id Job ID.
+	 * @return int First due action ID in schedule order, or 0 when none is due.
+	 */
+	private function getNextDuePendingActionId( int $job_id ): int {
+		$ids = $this->getDuePendingActionIds( $job_id );
+
+		return $ids[0] ?? 0;
+	}
+
+	/**
+	 * Count due pending Data Machine step actions for a single job.
+	 *
+	 * @param int $job_id Job ID.
+	 * @return int Number of due pending actions.
+	 */
+	private function countDuePendingActions( int $job_id ): int {
+		return count( $this->getDuePendingActionIds( $job_id ) );
+	}
+
+	/**
+	 * Query due pending actions and filter by decoded job_id args.
+	 *
+	 * SQL narrows rows to the self::HOOK_EXECUTE_STEP hook, the self::GROUP group, a due
+	 * schedule time and an args payload mentioning "job_id". The exact job match
+	 * is made in PHP on the decoded args, so every candidate row is fetched.
+	 *
+	 * NOTE(review): only the `args` column and `status = 'pending'` are checked.
+	 * Confirm against the Action Scheduler store whether long payloads kept in
+	 * `extended_args` or actions already claimed by another runner need handling.
+	 *
+	 * @param int $job_id Job ID the actions must belong to.
+	 * @return int[] Action IDs ordered by scheduled date, then action ID.
+	 */
+	private function getDuePendingActionIds( int $job_id ): array {
+		global $wpdb;
+
+		$actions_table = $wpdb->prefix . 'actionscheduler_actions';
+		$groups_table = $wpdb->prefix . 'actionscheduler_groups';
+
+		// phpcs:disable WordPress.DB.PreparedSQL.InterpolatedNotPrepared -- Table names are generated from the WP prefix.
+		$rows = $wpdb->get_results(
+			$wpdb->prepare(
+				"SELECT a.action_id, a.args
+				FROM {$actions_table} a
+				INNER JOIN {$groups_table} g ON g.group_id = a.group_id
+				WHERE a.hook = %s
+				AND a.status = 'pending'
+				AND g.slug = %s
+				AND a.scheduled_date_gmt <= %s
+				AND a.args LIKE %s
+				ORDER BY a.scheduled_date_gmt ASC, a.action_id ASC",
+				self::HOOK_EXECUTE_STEP,
+				self::GROUP,
+				gmdate( 'Y-m-d H:i:s' ),
+				'%"job_id"%'
+			)
+		);
+		// phpcs:enable WordPress.DB.PreparedSQL.InterpolatedNotPrepared
+
+		// get_results() is documented to return array|object|null; anything but a row list means "no actions".
+		if ( ! is_array( $rows ) ) {
+			return array();
+		}
+
+		$ids = array();
+		foreach ( $rows as $row ) {
+			if ( $this->extractActionJobId( (string) $row->args ) !== $job_id ) {
+				continue;
+			}
+
+			$ids[] = (int) $row->action_id;
+		}
+
+		return $ids;
+	}
+
+	/**
+	 * Extract job_id from Action Scheduler's JSON-encoded args column.
+	 *
+	 * Looks for a top-level `job_id` key first, then for `job_id` inside any
+	 * array-valued top-level entry.
+	 *
+	 * @param string $args_json Raw args column value.
+	 * @return int Job ID, or 0 when the value is not a JSON array/object or has no job_id.
+	 */
+	private function extractActionJobId( string $args_json ): int {
+		$args = json_decode( $args_json, true );
+		if ( ! is_array( $args ) ) {
+			return 0;
+		}
+
+		if ( isset( $args['job_id'] ) ) {
+			return (int) $args['job_id'];
+		}
+
+		foreach ( $args as $value ) {
+			if ( is_array( $value ) && isset( $value['job_id'] ) ) {
+				return (int) $value['job_id'];
+			}
+		}
+
+		return 0;
+	}
+
+	/**
+	 * Return elapsed wall-clock milliseconds.
+	 *
+	 * @param float $started_at Start time as returned by microtime( true ).
+	 * @return int Milliseconds elapsed since $started_at, rounded to the nearest integer.
+	 */
+	private function elapsedMs( float $started_at ): int {
+		return (int) round( ( microtime( true ) - $started_at ) * 1000 );
+	}
+}
diff --git a/inc/Abilities/EngineAbilities.php b/inc/Abilities/EngineAbilities.php
index d7b30014..fa950eb3 100644
--- a/inc/Abilities/EngineAbilities.php
+++ b/inc/Abilities/EngineAbilities.php
@@ -13,6 +13,7 @@ namespace DataMachine\Abilities;
 use DataMachine\Abilities\Engine\RunFlowAbility;
+use DataMachine\Abilities\Engine\DrainJobAbility;
 use DataMachine\Abilities\Engine\ExecuteStepAbility;
 use DataMachine\Abilities\Engine\ScheduleNextStepAbility;
 use DataMachine\Abilities\Engine\ScheduleFlowAbility;
@@ -24,6 +25,7 @@ class EngineAbilities {
 	private static bool $registered = false;
 	private RunFlowAbility $run_flow;
+	private DrainJobAbility $drain_job;
 	private ExecuteStepAbility $execute_step;
 	private ScheduleNextStepAbility $schedule_next_step;
 	private ScheduleFlowAbility $schedule_flow;
@@ -34,6 +36,7 @@ public function __construct() {
 		}
 		$this->run_flow = new RunFlowAbility();
+		$this->drain_job = new DrainJobAbility();
 		$this->execute_step = new ExecuteStepAbility();
 		$this->schedule_next_step = new ScheduleNextStepAbility();
 		$this->schedule_flow = new ScheduleFlowAbility();
diff --git a/tests/Unit/Abilities/Engine/DrainJobAbilityTest.php b/tests/Unit/Abilities/Engine/DrainJobAbilityTest.php
new file mode 100644
index 00000000..0d9a9eff
--- /dev/null
+++ b/tests/Unit/Abilities/Engine/DrainJobAbilityTest.php
@@ -0,0 +1,141 @@
+user->create(array('role' => 'administrator'));
+        wp_set_current_user($user_id);
+
+        $this->handler_filter = function (array $handlers, ?string $step_type = null): array {
+            if (null === $step_type || 'fetch' === $step_type) {
+                $handlers['drain_job_empty_fetch'] = array(
+                    'label' => 'Drain Job Empty Fetch',
+                    'type' => 'fetch',
+                    'class' => DrainJobEmptyFetchHandler::class,
+                );
+            }
+
+            return $handlers;
+        };
+        add_filter('datamachine_handlers', $this->handler_filter, 10, 2);
+    }
+
+    public function tear_down(): void
+    {
+        remove_filter('datamachine_handlers', $this->handler_filter, 10);
+        wp_set_current_user(0);
+
+        parent::tear_down();
+    }
+
+    public function test_drain_job_runs_scheduled_flow_to_terminal_state(): void
+    {
+        $this->assertTrue(class_exists('\ActionScheduler'), 'Action Scheduler must be loaded for synchronous draining.');
+        $this->assertNotFalse(
+            has_action('datamachine_execute_step'),
+            'Execution engine execute-step bridge should be registered.'
+        );
+
+        $pipeline_id = (new Pipelines())->create_pipeline(
+            array(
+                'pipeline_name' => 'Drain Job Ability Pipeline',
+                'pipeline_config' => array(),
+                'user_id' => get_current_user_id(),
+            )
+        );
+        $this->assertIsInt($pipeline_id);
+
+        $flow_id = (new Flows())->create_flow(
+            array(
+                'pipeline_id' => $pipeline_id,
+                'flow_name' => 'Drain Job Ability Flow',
+                'flow_config' => array(),
+                'scheduling_config' => array('enabled' => true),
+                'user_id' => get_current_user_id(),
+            )
+        );
+        $this->assertIsInt($flow_id);
+
+        $flow_config = array(
+            'flow_fetch' => FlowStepConfigFactory::build(
+                array(
+                    'flow_step_id' => 'flow_fetch',
+                    'pipeline_step_id' => 'pipeline_fetch',
+                    'step_type' => 'fetch',
+                    'execution_order' => 0,
+                    'pipeline_id' => $pipeline_id,
+                    'flow_id' => $flow_id,
+                    'handler_slug' => 'drain_job_empty_fetch',
+                    'handler_config' => array(),
+                    'queue_mode' => 'static',
+                )
+            ),
+        );
+        $this->assertTrue((new Flows())->update_flow($flow_id, array('flow_config' => $flow_config)));
+
+        $run_result = (new RunFlowAbility())->execute(array('flow_id' => $flow_id));
+        $this->assertTrue($run_result['success'] ?? false);
+
+        $job_id = (int) $run_result['job_id'];
+        $this->assertGreaterThan(0, $job_id);
+        $this->assertSame(JobStatus::PROCESSING, (new Jobs())->get_job($job_id)['status'] ?? '');
+
+        $drain_result = (new DrainJobAbility())->execute(
+            array(
+                'job_id' => $job_id,
+                'step_budget' => 5,
+                'time_budget_ms' => 10000,
+            )
+        );
+
+        $this->assertTrue($drain_result['success'] ?? false);
+        $this->assertSame(JobStatus::COMPLETED_NO_ITEMS, $drain_result['terminal_state'] ?? null);
+        $this->assertSame(1, $drain_result['actions_drained'] ?? null);
+        $this->assertSame(0, $drain_result['remaining_actions'] ?? null);
+        $this->assertFalse($drain_result['budget_exhausted'] ?? true);
+    }
+}
+
+class DrainJobEmptyFetchHandler
+{
+    /**
+     * Return no data so the single-step flow reaches completed_no_items.
+     *
+     * @param int|string $pipeline_id Pipeline ID.
+     * @param array $handler_settings Handler settings.
+     * @param string $job_id Job ID.
+     * @return array Empty packet list.
+     */
+    // phpcs:ignore WordPress.NamingConventions.ValidFunctionName.MethodNameInvalid
+    public function get_fetch_data($pipeline_id, array $handler_settings, string $job_id): array
+    {
+        $pipeline_id;
+        $handler_settings;
+        $job_id;
+
+        return array();
+    }
+}
diff --git a/tests/drain-job-ability-smoke.php b/tests/drain-job-ability-smoke.php
new file mode 100644
index 00000000..d0b9b23b
--- /dev/null
+++ b/tests/drain-job-ability-smoke.php
@@ -0,0 +1,59 @@
+ array( 'job_id' )", $ability_source, 'job_id is required input' );
+assert_drain_job_contains( 'DEFAULT_STEP_BUDGET = 50', $ability_source, 'sane default step budget is declared' );
+assert_drain_job_contains( 'DEFAULT_TIME_BUDGET_MS = 300000', $ability_source, 'sane default wall-clock budget is declared' );
+assert_drain_job_contains( "'step_budget'", $ability_source, 'step budget is exposed in input schema' );
+assert_drain_job_contains( "'time_budget_ms'", $ability_source, 'wall-clock budget is exposed in input schema' );
+assert_drain_job_contains( "'error_type' => 'action_scheduler_unavailable'", $ability_source, 'AS unavailability returns a typed error' );
+assert_drain_job_contains( "\\ActionScheduler::runner()->process_action", $ability_source, 'ability uses Action Scheduler public action processor' );
+assert_drain_job_contains( "WHERE a.hook = %s", $ability_source, 'query scopes to execute-step actions' );
+assert_drain_job_contains( "AND a.status = 'pending'", $ability_source, 'query ignores already-claimed in-progress actions' );
+assert_drain_job_contains( "AND g.slug = %s", $ability_source, 'query scopes to Data Machine AS group' );
+assert_drain_job_contains( "extractActionJobId", $ability_source, 'query results are filtered to one job_id' );
+assert_drain_job_contains( 'JobStatus::isStatusFinal', $ability_source, 'drain stops on terminal job status' );
+assert_drain_job_contains( "'actions_drained'", $ability_source, 'output reports actions drained' );
+assert_drain_job_contains( "'wall_time_ms'", $ability_source, 'output reports elapsed wall time' );
+assert_drain_job_contains( "use DataMachine\\Abilities\\Engine\\DrainJobAbility;", $registry_source, 'EngineAbilities imports DrainJobAbility' );
+assert_drain_job_contains( '$this->drain_job = new DrainJobAbility();', $registry_source, 'EngineAbilities instantiates DrainJobAbility' );
+assert_drain_job_not_contains( 'WP_CLI::', $ability_source, 'drain-job ability does not add a WP-CLI execution path' );
+assert_drain_job_not_contains( 'as_run_queue', $ability_source, 'drain-job avoids unscoped queue draining' );
+
+$query_start = strpos( $ability_source, 'private function getDuePendingActionIds' );
+assert_drain_job_true( false !== $query_start, 'job-scoped AS query helper exists' );
+$query_source = false === $query_start ? '' : substr( $ability_source, $query_start );
+assert_drain_job_not_contains( 'LIMIT 1', $query_source, 'job filtering is not defeated by pre-filter SQL limiting' );
+
+echo "OK ({$assertions} assertions)\n";