diff --git a/inc/Api/System/System.php b/inc/Api/System/System.php index 43af53ff8..e5498f66a 100644 --- a/inc/Api/System/System.php +++ b/inc/Api/System/System.php @@ -17,7 +17,6 @@ use DataMachine\Engine\Tasks\TaskRegistry; use DataMachine\Engine\AI\System\Tasks\SystemTask; use DataMachine\Engine\AI\System\SystemTaskPromptRegistry; -use DataMachine\Core\Database\Jobs\JobsOperations; if ( ! defined('ABSPATH') ) { exit; diff --git a/inc/Core/Database/Jobs/Jobs.php b/inc/Core/Database/Jobs/Jobs.php index 8b95d3d02..9a5b3e819 100644 --- a/inc/Core/Database/Jobs/Jobs.php +++ b/inc/Core/Database/Jobs/Jobs.php @@ -1,101 +1,622 @@ table_name = $wpdb->prefix . 'datamachine_jobs'; + /** + * Known compound status suffixes for exact-match queries. + * + * Each key maps a status prefix to the known variants that share it. + * Used by delete_old_jobs() and count_old_jobs() to build IN clauses + * instead of LIKE, which enables efficient use of the idx_status_created + * composite index. + * + * @var array + */ + private const STATUS_VARIANTS = array( + 'completed' => array( + 'completed', + 'completed_no_items', + 'agent_skipped', + ), + 'failed' => array( + 'failed', + ), + ); + + // --------------------------------------------------------------------- + // CRUD + // --------------------------------------------------------------------- - $this->operations = new JobsOperations(); - $this->status = new JobsStatus(); - } + /** + * Create a new job record. + * + * Supports three execution modes: + * - Direct execution: pipeline_id='direct', flow_id='direct' (chat/API workflows without saved pipeline/flow) + * - Database flow: pipeline_id and flow_id are numeric strings (saved pipelines and flows) + * - Standalone: pipeline_id=null, flow_id=null (jobs without pipeline/flow context) + * + * @param array $job_data Job data with optional pipeline_id and flow_id + * @return int|false Job ID on success, false on failure + */ + public function create_job( array $job_data ): int|false { + $pipeline_id = $job_data['pipeline_id'] ?? null; + $flow_id = $job_data['flow_id'] ?? null; - public function create_job( array $job_data ): int|false { - return $this->operations->create_job( $job_data ); + // Direct execution: both must be explicitly 'direct' + $is_direct_execution = ( 'direct' === $pipeline_id && 'direct' === $flow_id ); + + // Database flow: both must be valid numeric IDs > 0 + $is_database_flow = ( is_numeric( $pipeline_id ) && (int) $pipeline_id > 0 && is_numeric( $flow_id ) && (int) $flow_id > 0 ); + + // No pipeline/flow context: both are null. + $is_contextless = ( null === $pipeline_id && null === $flow_id ); + + if ( ! $is_direct_execution && ! $is_database_flow && ! $is_contextless ) { + do_action( + 'datamachine_log', + 'error', + 'Invalid job data: must provide both IDs as "direct", both as valid numeric IDs, or both as null', + array( + 'pipeline_id' => $pipeline_id, + 'flow_id' => $flow_id, + ) + ); + return false; + } + + // Normalize to string for database storage (null stays null when no context). + if ( $is_database_flow ) { + $pipeline_id = (string) absint( $pipeline_id ); + $flow_id = (string) absint( $flow_id ); + } + // Direct and contextless keep their values ('direct' or null). + + // Sanitize source — accept any string, don't gatekeep values. + $default_source = $is_contextless ? 'direct' : ( $is_direct_execution ? 'direct' : 'pipeline' ); + $source = sanitize_key( $job_data['source'] ?? $default_source ); + + $label = isset( $job_data['label'] ) ? sanitize_text_field( $job_data['label'] ) : null; + + $parent_job_id = isset( $job_data['parent_job_id'] ) ? absint( $job_data['parent_job_id'] ) : 0; + $user_id = isset( $job_data['user_id'] ) ? absint( $job_data['user_id'] ) : 0; + $agent_id = isset( $job_data['agent_id'] ) ? absint( $job_data['agent_id'] ) : null; + + $data = array( + 'user_id' => $user_id, + 'source' => $source, + 'label' => $label, + 'status' => 'pending', + ); + + $format = array( '%d', '%s', '%s', '%s' ); + + if ( null !== $agent_id && $agent_id > 0 ) { + $data['agent_id'] = $agent_id; + $format[] = '%d'; + } + + // Only include pipeline_id/flow_id when they have values (NULL omission lets DB default apply). + if ( ! $is_contextless ) { + $data['pipeline_id'] = $pipeline_id; + $data['flow_id'] = $flow_id; + $format[] = '%s'; + $format[] = '%s'; + } + + if ( $parent_job_id > 0 ) { + $data['parent_job_id'] = $parent_job_id; + $format[] = '%d'; + } + + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching + $inserted = $this->wpdb->insert( $this->table_name, $data, $format ); + + if ( false === $inserted ) { + do_action( + 'datamachine_log', + 'error', + 'Failed to insert job', + array( + 'pipeline_id' => $pipeline_id, + 'flow_id' => $flow_id, + 'db_error' => $this->wpdb->last_error, + ) + ); + return false; + } + + return $this->wpdb->insert_id; } + public function get_job( int $job_id ): ?array { + if ( empty( $job_id ) ) { + return null; + } + + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared + $job = $this->wpdb->get_row( $this->wpdb->prepare( 'SELECT * FROM %i WHERE job_id = %d', $this->table_name, $job_id ), ARRAY_A ); + + if ( $job && isset( $job['engine_data'] ) && is_string( $job['engine_data'] ) ) { + $decoded = json_decode( $job['engine_data'], true ); + if ( json_last_error() === JSON_ERROR_NONE ) { + $job['engine_data'] = $decoded; + } + } + + return $job; + } + /** + * Get jobs count with optional filtering. + * + * @param array $args Filter arguments: + * - flow_id: Filter by flow ID or 'direct' (optional) + * - pipeline_id: Filter by pipeline ID or 'direct' (optional) + * - status: Filter by status (optional) + * @return int Total count + */ public function get_jobs_count( array $args = array() ): int { - return $this->operations->get_jobs_count( $args ); + $where_clauses = array(); + $where_values = array(); + + if ( ! empty( $args['flow_id'] ) ) { + $where_clauses[] = 'flow_id = %s'; + $where_values[] = (string) $args['flow_id']; + } + + if ( ! empty( $args['pipeline_id'] ) ) { + $where_clauses[] = 'pipeline_id = %s'; + $where_values[] = (string) $args['pipeline_id']; + } + + if ( ! empty( $args['status'] ) ) { + $status_value = sanitize_text_field( $args['status'] ); + $where_clauses[] = 'status LIKE %s'; + $where_values[] = $this->wpdb->esc_like( $status_value ) . '%'; + } + + if ( ! empty( $args['source'] ) ) { + $where_clauses[] = 'source = %s'; + $where_values[] = sanitize_text_field( $args['source'] ); + } + + if ( isset( $args['user_id'] ) ) { + $where_clauses[] = 'user_id = %d'; + $where_values[] = absint( $args['user_id'] ); + } + + if ( isset( $args['agent_id'] ) ) { + $where_clauses[] = 'agent_id = %d'; + $where_values[] = absint( $args['agent_id'] ); + } + + if ( ! empty( $args['since'] ) ) { + $where_clauses[] = 'created_at >= %s'; + $where_values[] = sanitize_text_field( $args['since'] ); + } + + if ( isset( $args['parent_job_id'] ) ) { + $where_clauses[] = 'parent_job_id = %d'; + $where_values[] = absint( $args['parent_job_id'] ); + } + + if ( ! empty( $args['hide_children'] ) ) { + $where_clauses[] = '(parent_job_id IS NULL OR parent_job_id = 0)'; + } + + $where_sql = ''; + if ( ! empty( $where_clauses ) ) { + $where_sql = 'WHERE ' . implode( ' AND ', $where_clauses ); + } + + // phpcs:disable WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders.ReplacementsWrongNumber + $query = $this->wpdb->prepare( + "SELECT COUNT(job_id) FROM %i {$where_sql}", + array_merge( array( $this->table_name ), $where_values ) + ); + // phpcs:enable WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders.ReplacementsWrongNumber + + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared -- Query is prepared above + $count = $this->wpdb->get_var( $query ); + + return (int) $count; } + /** + * Get paginated jobs with pipeline and flow names. + * + * Supports filtering by flow_id, pipeline_id, and status. + * + * @param array $args Query arguments: + * - orderby: Column to order by (default: 'j.job_id') + * - order: ASC or DESC (default: 'DESC') + * - per_page: Results per page (default: 20) + * - offset: Pagination offset (default: 0) + * - flow_id: Filter by flow ID (optional) + * - pipeline_id: Filter by pipeline ID (optional) + * - status: Filter by status (optional) + * @return array Jobs with pipeline and flow names + */ public function get_jobs_for_list_table( array $args ): array { - return $this->operations->get_jobs_for_list_table( $args ); - } + $orderby = $args['orderby'] ?? 'j.job_id'; + $order = strtoupper( $args['order'] ?? 'DESC' ); + $per_page = (int) ( $args['per_page'] ?? 20 ); + $offset = (int) ( $args['offset'] ?? 0 ); + + $pipelines_table = $this->wpdb->prefix . 'datamachine_pipelines'; + $flows_table = $this->wpdb->prefix . 'datamachine_flows'; + + // Validate order direction + $order = in_array( $order, array( 'ASC', 'DESC' ), true ) ? $order : 'DESC'; + + // Validate orderby column (whitelist approach) + $valid_orderby = array( + 'j.job_id', + 'j.pipeline_id', + 'j.flow_id', + 'j.status', + 'j.created_at', + 'j.completed_at', + 'p.pipeline_name', + 'f.flow_name', + ); + if ( ! in_array( $orderby, $valid_orderby, true ) ) { + $orderby = 'j.job_id'; + } - public function start_job( int $job_id, string $status = 'processing' ): bool { - return $this->status->start_job( $job_id, $status ); - } + $where_clauses = array(); + $where_values = array(); - public function complete_job( int $job_id, string $status ): bool { - return $this->status->complete_job( $job_id, $status ); - } + if ( ! empty( $args['flow_id'] ) ) { + $where_clauses[] = 'j.flow_id = %s'; + $where_values[] = (string) $args['flow_id']; + } - public function update_job_status( int $job_id, string $status ): bool { - return $this->status->update_job_status( $job_id, $status ); + if ( ! empty( $args['pipeline_id'] ) ) { + $where_clauses[] = 'j.pipeline_id = %s'; + $where_values[] = (string) $args['pipeline_id']; + } + + if ( ! empty( $args['status'] ) ) { + $status_value = sanitize_text_field( $args['status'] ); + // Prefix match: --status=failed matches "failed", "failed:reason", etc. + $where_clauses[] = 'j.status LIKE %s'; + $where_values[] = $this->wpdb->esc_like( $status_value ) . '%'; + } + + if ( ! empty( $args['source'] ) ) { + $where_clauses[] = 'j.source = %s'; + $where_values[] = sanitize_text_field( $args['source'] ); + } + + if ( isset( $args['user_id'] ) ) { + $where_clauses[] = 'j.user_id = %d'; + $where_values[] = absint( $args['user_id'] ); + } + + if ( isset( $args['agent_id'] ) ) { + $where_clauses[] = 'j.agent_id = %d'; + $where_values[] = absint( $args['agent_id'] ); + } + + if ( ! empty( $args['since'] ) ) { + $where_clauses[] = 'j.created_at >= %s'; + $where_values[] = sanitize_text_field( $args['since'] ); + } + + if ( isset( $args['parent_job_id'] ) ) { + $where_clauses[] = 'j.parent_job_id = %d'; + $where_values[] = absint( $args['parent_job_id'] ); + } + + if ( ! empty( $args['hide_children'] ) ) { + $where_clauses[] = '(j.parent_job_id IS NULL OR j.parent_job_id = 0)'; + } + + $where_sql = ''; + if ( ! empty( $where_clauses ) ) { + $where_sql = 'WHERE ' . implode( ' AND ', $where_clauses ); + } + + // Note: orderby is validated above, so safe to interpolate. + // For direct execution jobs, LEFT JOINs will return NULL for pipeline_name/flow_name. + // JOIN uses j.pipeline_id (varchar) directly against CAST of p.pipeline_id (int) to varchar + // for index-friendly matching on the jobs table side. + // phpcs:disable WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders.ReplacementsWrongNumber + $query = $this->wpdb->prepare( + "SELECT j.*, p.pipeline_name, f.flow_name, + (SELECT COUNT(*) FROM %i c WHERE c.parent_job_id = j.job_id) AS child_count + FROM %i j + LEFT JOIN %i p ON j.pipeline_id = p.pipeline_id + LEFT JOIN %i f ON j.flow_id = f.flow_id + {$where_sql} + ORDER BY {$orderby} {$order} + LIMIT %d OFFSET %d", + array_merge( + array( $this->table_name, $this->table_name, $pipelines_table, $flows_table ), + $where_values, + array( $per_page, $offset ) + ) + ); + // phpcs:enable WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders.ReplacementsWrongNumber + + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared -- Query is prepared above + $results = $this->wpdb->get_results( $query, ARRAY_A ); + + return $results ? $results : array(); } + /** + * Get all jobs for pipeline deletion impact analysis. + */ public function get_jobs_for_pipeline( int $pipeline_id ): array { - return $this->operations->get_jobs_for_pipeline( $pipeline_id ); + if ( $pipeline_id <= 0 ) { + return array(); + } + + $db_flows = new \DataMachine\Core\Database\Flows\Flows(); + + $flows = $db_flows->get_flows_for_pipeline( $pipeline_id ); + if ( empty( $flows ) ) { + return array(); + } + + $all_jobs = array(); + foreach ( $flows as $flow ) { + $flow_id = $flow['flow_id']; + $flow_jobs = $this->get_jobs_for_flow( $flow_id ); + $all_jobs = array_merge( $all_jobs, $flow_jobs ); + } + + if ( ! empty( $all_jobs ) ) { + usort( + $all_jobs, + function ( $a, $b ) { + $time_a = is_array( $a ) ? $a['created_at'] : $a->created_at; + $time_b = is_array( $b ) ? $b['created_at'] : $b->created_at; + return strcmp( $time_b, $time_a ); // DESC order + } + ); + } + + return $all_jobs; } + /** + * Get all jobs for a flow. + * + * @param int|string $flow_id Flow ID or 'direct' + * @return array Jobs for the flow + */ public function get_jobs_for_flow( int|string $flow_id ): array { - return $this->operations->get_jobs_for_flow( $flow_id ); + + if ( empty( $flow_id ) ) { + return array(); + } + + // Skip if numeric and <= 0 (but allow 'direct' string) + if ( is_numeric( $flow_id ) && (int) $flow_id <= 0 ) { + return array(); + } + + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared + $results = $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT * FROM %i WHERE flow_id = %s ORDER BY created_at DESC', $this->table_name, (string) $flow_id ), ARRAY_A ); + + return $results ? $results : array(); } /** * Get all child jobs of a parent job, ordered by job_id ascending. * - * Used by fan-out system tasks (e.g. SystemTask::undo) to walk - * children's effects when the parent records none of its own. + * Used by fan-out system tasks (parent schedules N children via + * TaskScheduler::scheduleBatch) to walk their children's effects + * for undo, status aggregation, etc. Children are linked via the + * indexed `parent_job_id` column. + * + * Engine data is decoded from JSON to match get_job()'s shape so + * callers can treat parent and child rows uniformly. * * @since 0.83.0 * * @param int $parent_job_id Parent job ID. - * @return array Child job rows with engine_data decoded. + * @return array Array of child job rows with engine_data decoded, or empty array. */ public function get_children( int $parent_job_id ): array { - return $this->operations->get_children( $parent_job_id ); + if ( $parent_job_id <= 0 ) { + return array(); + } + + // phpcs:disable WordPress.DB.PreparedSQL.NotPrepared -- Query is prepared on the next line. + $rows = $this->wpdb->get_results( + $this->wpdb->prepare( + 'SELECT * FROM %i WHERE parent_job_id = %d ORDER BY job_id ASC', + $this->table_name, + $parent_job_id + ), + ARRAY_A + ); + // phpcs:enable WordPress.DB.PreparedSQL.NotPrepared + + if ( empty( $rows ) ) { + return array(); + } + + foreach ( $rows as &$row ) { + if ( isset( $row['engine_data'] ) && is_string( $row['engine_data'] ) && '' !== $row['engine_data'] ) { + $decoded = json_decode( $row['engine_data'], true ); + if ( json_last_error() === JSON_ERROR_NONE ) { + $row['engine_data'] = $decoded; + } else { + $row['engine_data'] = array(); + } + } else { + $row['engine_data'] = array(); + } + } + unset( $row ); + + return $rows; } + /** + * Get the latest job for each flow in a batch. + * + * Uses a subquery to efficiently get the most recent job per flow_id. + * + * @param array $flow_ids Array of flow IDs to query (numeric IDs only, not 'direct') + * @return array Map of [flow_id => job_row] for flows that have jobs + */ public function get_latest_jobs_by_flow_ids( array $flow_ids ): array { - return $this->operations->get_latest_jobs_by_flow_ids( $flow_ids ); + if ( empty( $flow_ids ) ) { + return array(); + } + + // Filter to numeric IDs only (this method is for database flows, not direct execution) + $flow_ids = array_filter( $flow_ids, fn( $id ) => is_numeric( $id ) && (int) $id > 0 ); + $flow_ids = array_map( fn( $id ) => (string) $id, $flow_ids ); + + if ( empty( $flow_ids ) ) { + return array(); + } + + $placeholders = implode( ',', array_fill( 0, count( $flow_ids ), '%s' ) ); + + // Subquery to get max job_id per flow, then join to get full row + // phpcs:disable WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders.ReplacementsWrongNumber + $query = $this->wpdb->prepare( + "SELECT j.* FROM %i j + INNER JOIN ( + SELECT flow_id, MAX(job_id) as max_job_id + FROM %i + WHERE flow_id IN ({$placeholders}) + GROUP BY flow_id + ) latest ON j.job_id = latest.max_job_id", + array_merge( array( $this->table_name, $this->table_name ), $flow_ids ) + ); + // phpcs:enable WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders.ReplacementsWrongNumber + + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared -- Query is prepared above + $results = $this->wpdb->get_results( $query, ARRAY_A ); + + if ( ! $results ) { + return array(); + } + + // Key by flow_id for easy lookup (keep as string for consistency) + $jobs_by_flow = array(); + foreach ( $results as $job ) { + $jobs_by_flow[ $job['flow_id'] ] = $job; + } + + return $jobs_by_flow; } - public function delete_jobs( array $criteria = array() ): int|false { - return $this->operations->delete_jobs( $criteria ); + /** + * Resolve a status prefix to known variants for indexed lookups. + * + * Falls back to a LIKE pattern when the prefix is not in STATUS_VARIANTS + * (e.g. custom statuses from third-party handlers). + * + * @param string $status_prefix The status prefix (e.g. 'completed', 'failed'). + * @return array{type: 'in', values: string[]} | array{type: 'like', pattern: string} + */ + private function resolve_status_match( string $status_prefix ): array { + if ( isset( self::STATUS_VARIANTS[ $status_prefix ] ) ) { + return array( + 'type' => 'in', + 'values' => self::STATUS_VARIANTS[ $status_prefix ], + ); + } + + return array( + 'type' => 'like', + 'pattern' => $this->wpdb->esc_like( $status_prefix ) . '%', + ); } /** * Delete old jobs by status and age. * + * Removes jobs matching the given status pattern that are older than + * the specified number of days. Uses LIKE matching to handle compound + * statuses (e.g., "failed - timeout"). + * * @since 0.28.0 * - * @param string $status_pattern Base status to match (e.g., 'failed'). + * @param string $status_pattern Base status to match (e.g., 'failed'). Uses LIKE prefix matching. * @param int $older_than_days Delete jobs older than this many days. * @return int|false Number of deleted rows, or false on error. */ public function delete_old_jobs( string $status_pattern, int $older_than_days ): int|false { - return $this->operations->delete_old_jobs( $status_pattern, $older_than_days ); + if ( empty( $status_pattern ) || $older_than_days < 1 ) { + return false; + } + + $cutoff_datetime = gmdate( 'Y-m-d H:i:s', time() - ( $older_than_days * DAY_IN_SECONDS ) ); + $match = $this->resolve_status_match( $status_pattern ); + + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching + // phpcs:disable WordPress.DB.PreparedSQL -- Table name from $wpdb->prefix, not user input. + if ( 'in' === $match['type'] ) { + $placeholders = implode( ',', array_fill( 0, count( $match['values'] ), '%s' ) ); + $args = array_merge( + array( "DELETE FROM %i WHERE status IN ({$placeholders}) AND created_at < %s", $this->table_name ), + $match['values'], + array( $cutoff_datetime ) + ); + $result = $this->wpdb->query( $this->wpdb->prepare( ...$args ) ); + } else { + $result = $this->wpdb->query( + $this->wpdb->prepare( + 'DELETE FROM %i WHERE status LIKE %s AND created_at < %s', + $this->table_name, + $match['pattern'], + $cutoff_datetime + ) + ); + } + // phpcs:enable WordPress.DB.PreparedSQL + + do_action( + 'datamachine_log', + 'info', + 'Deleted old jobs', + array( + 'status_pattern' => $status_pattern, + 'older_than_days' => $older_than_days, + 'cutoff_datetime' => $cutoff_datetime, + 'jobs_deleted' => false !== $result ? $result : 0, + 'success' => false !== $result, + ) + ); + + return $result; } /** @@ -108,29 +629,429 @@ public function delete_old_jobs( string $status_pattern, int $older_than_days ): * @return int Number of matching jobs. */ public function count_old_jobs( string $status_pattern, int $older_than_days ): int { - return $this->operations->count_old_jobs( $status_pattern, $older_than_days ); + if ( empty( $status_pattern ) || $older_than_days < 1 ) { + return 0; + } + + $cutoff_datetime = gmdate( 'Y-m-d H:i:s', time() - ( $older_than_days * DAY_IN_SECONDS ) ); + $match = $this->resolve_status_match( $status_pattern ); + + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching + // phpcs:disable WordPress.DB.PreparedSQL -- Table name from $wpdb->prefix, not user input. + if ( 'in' === $match['type'] ) { + $placeholders = implode( ',', array_fill( 0, count( $match['values'] ), '%s' ) ); + $args = array_merge( + array( "SELECT COUNT(*) FROM %i WHERE status IN ({$placeholders}) AND created_at < %s", $this->table_name ), + $match['values'], + array( $cutoff_datetime ) + ); + $count = $this->wpdb->get_var( $this->wpdb->prepare( ...$args ) ); + } else { + $count = $this->wpdb->get_var( + $this->wpdb->prepare( + 'SELECT COUNT(*) FROM %i WHERE status LIKE %s AND created_at < %s', + $this->table_name, + $match['pattern'], + $cutoff_datetime + ) + ); + } + // phpcs:enable WordPress.DB.PreparedSQL + + return (int) $count; + } + + /** + * Delete jobs by status criteria or all jobs. + */ + public function delete_jobs( array $criteria = array() ): int|false { + + if ( empty( $criteria ) ) { + do_action( 'datamachine_log', 'warning', 'No criteria provided for jobs deletion' ); + return false; + } + + if ( ! empty( $criteria['failed'] ) ) { + $failed_pattern = $this->wpdb->esc_like( 'failed' ) . '%'; + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared + $result = $this->wpdb->query( $this->wpdb->prepare( 'DELETE FROM %i WHERE status LIKE %s', $this->table_name, $failed_pattern ) ); + } else { + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared + $result = $this->wpdb->query( $this->wpdb->prepare( 'DELETE FROM %i', $this->table_name ) ); + } + + do_action( + 'datamachine_log', + 'debug', + 'Deleted jobs', + array( + 'criteria' => $criteria, + 'jobs_deleted' => false !== $result ? $result : 0, + 'success' => false !== $result, + ) + ); + + return $result; } + // --------------------------------------------------------------------- + // Engine data + // --------------------------------------------------------------------- + + /** + * Store engine data for centralized access via datamachine_engine_data filter. + */ public function store_engine_data( int $job_id, array $data ): bool { - return $this->operations->store_engine_data( $job_id, $data ); + if ( $job_id <= 0 ) { + do_action( 'datamachine_log', 'error', 'Invalid job ID for engine_data storage', array( 'job_id' => $job_id ) ); + return false; + } + + // Encode data as JSON for database storage. + $encoded = wp_json_encode( $data ); + $update_data = array( 'engine_data' => $encoded ); + $format = array( '%s' ); + + // Promote task_type to its own indexed column for fast lookups. + if ( isset( $data['task_type'] ) && is_string( $data['task_type'] ) ) { + $update_data['task_type'] = sanitize_key( $data['task_type'] ); + $format[] = '%s'; + } + + $result = $this->wpdb->update( + $this->table_name, + $update_data, + array( 'job_id' => $job_id ), + $format, + array( '%d' ) + ); + + if ( false === $result ) { + do_action( + 'datamachine_log', + 'error', + 'Failed to store engine_data', + array( + 'job_id' => $job_id, + 'db_error' => $this->wpdb->last_error, + ) + ); + return false; + } + + do_action( + 'datamachine_log', + 'debug', + 'Stored engine_data successfully', + array( + 'job_id' => $job_id, + 'data_keys' => array_keys( $data ), + ) + ); + + return true; } + /** + * Retrieve stored engine data for datamachine_engine_data filter access. + */ public function retrieve_engine_data( int $job_id ): array { - return $this->operations->retrieve_engine_data( $job_id ); + $job = $this->get_job( $job_id ); + + if ( $job && isset( $job['engine_data'] ) && is_array( $job['engine_data'] ) ) { + return $job['engine_data']; + } + + return array(); } - public function get_job( int $job_id ): ?array { - return $this->operations->get_job( $job_id ); + // --------------------------------------------------------------------- + // Status transitions + // --------------------------------------------------------------------- + + /** + * Update the status for a job. + * + * @param int $job_id The job ID. + * @param string $status The new status (e.g., 'processing'). + * @return bool True on success, false on failure. + */ + public function start_job( int $job_id, string $status = 'processing' ): bool { + if ( empty( $job_id ) ) { + return false; + } + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching + $updated = $this->wpdb->update( + $this->table_name, + array( + 'status' => $status, + ), + array( 'job_id' => $job_id ), + array( '%s' ), // Format for data + array( '%d' ) // Format for WHERE + ); + + if ( false !== $updated ) { + RunMetrics::start( $job_id, array( 'status' => $status ) ); + } + + return false !== $updated; } - public function get_flow_health( int|string $flow_id ): array { - return $this->operations->get_flow_health( $flow_id ); + /** + * Update the status and completed_at time for a job. + * + * Accepts compound statuses like "agent_skipped - reason" via JobStatus validation. + * + * @param int $job_id The job ID. + * @param string $status The final status (any JobStatus final status, may be compound). + * @return bool True on success, false on failure. + */ + public function complete_job( int $job_id, string $status ): bool { + // Validate using JobStatus - supports compound statuses like "agent_skipped - reason" + if ( empty( $job_id ) || ! JobStatus::isStatusFinal( $status ) ) { + return false; + } + + // Truncate to fit varchar(255) column. Full reason is preserved in engine_data. + if ( strlen( $status ) > 255 ) { + $status = substr( $status, 0, 252 ) . '...'; + } + + $update_data = array( + 'status' => $status, + 'completed_at' => current_time( 'mysql', true ), + ); + + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching + $updated = $this->wpdb->update( + $this->table_name, + $update_data, + array( 'job_id' => $job_id ), + array( '%s', '%s' ), + array( '%d' ) + ); + + if ( false !== $updated ) { + RunMetrics::complete( $job_id, $status ); + do_action( 'datamachine_job_complete', $job_id, $status ); + } + + return false !== $updated; } + /** + * Update job status. + * + * @param int $job_id The job ID. + * @param string $status The new status. + * @return bool True on success, false on failure. + */ + public function update_job_status( int $job_id, string $status ): bool { + + if ( empty( $job_id ) ) { + return false; + } + + // Truncate to fit varchar(255) column. + if ( strlen( $status ) > 255 ) { + $status = substr( $status, 0, 252 ) . '...'; + } + + $update_data = array( 'status' => $status ); + $format = array( '%s' ); + + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching + $updated = $this->wpdb->update( + $this->table_name, + $update_data, + array( 'job_id' => $job_id ), + $format, + array( '%d' ) + ); + + return false !== $updated; + } + + // --------------------------------------------------------------------- + // Flow health + // --------------------------------------------------------------------- + + /** + * Get flows with consecutive failures or no_items above threshold. + * + * Uses cached flow health data when available. + * Only checks database flows (numeric IDs), not direct execution jobs. + * + * @param int $threshold Minimum consecutive count to flag as problem. + * @return array Array of [flow_id => counts_array] for problem flows. + */ public function get_problem_flow_ids( int $threshold = 3 ): array { - return $this->operations->get_problem_flow_ids( $threshold ); + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared + $query = $this->wpdb->prepare( + "SELECT DISTINCT flow_id FROM %i WHERE flow_id != 'direct' AND flow_id REGEXP '^[0-9]+$'", + $this->table_name + ); + + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared + $flow_ids = $this->wpdb->get_col( $query ); + + if ( empty( $flow_ids ) ) { + return array(); + } + + $problem_flows = array(); + + foreach ( $flow_ids as $flow_id ) { + $counts = $this->get_flow_health( $flow_id ); + + if ( $counts['consecutive_failures'] >= $threshold || $counts['consecutive_no_items'] >= $threshold ) { + $problem_flows[ $flow_id ] = $counts; + } + } + + return $problem_flows; + } + + /** + * Get recent jobs for a flow (limited). + * + * @param int|string $flow_id Flow ID. + * @param int $limit Max jobs to return. + * @return array Recent jobs, newest first. + */ + public function get_recent_jobs_for_flow( int|string $flow_id, int $limit = 10 ): array { + if ( empty( $flow_id ) ) { + return array(); + } + + if ( is_numeric( $flow_id ) && (int) $flow_id <= 0 ) { + return array(); + } + + // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared + // phpcs:disable WordPress.DB.PreparedSQL -- Table name from $wpdb->prefix, not user input. + $results = $this->wpdb->get_results( + $this->wpdb->prepare( + 'SELECT * FROM %i WHERE flow_id = %s ORDER BY created_at DESC LIMIT %d', + $this->table_name, + (string) $flow_id, + $limit + ), + ARRAY_A + ); + // phpcs:enable WordPress.DB.PreparedSQL + + return $results ? $results : array(); + } + + /** + * Get cached flow health or compute if missing. + * + * @param int|string $flow_id Flow ID. + * @return array Counts array. + */ + public function get_flow_health( int|string $flow_id ): array { + $cached = $this->get_flow_health_cache( $flow_id ); + if ( false !== $cached ) { + return $cached; + } + + $counts = $this->compute_consecutive_counts( $flow_id ); + $this->set_flow_health_cache( $flow_id, $counts ); + + return $counts; + } + + /** + * Update flow health cache when a job completes. + * + * Called via datamachine_job_complete hook. + * + * @param int $job_id Job ID that completed. + * @param string $status Final status. + */ + public function update_flow_health_cache( int $job_id, string $status ): void { + $job = $this->get_job( $job_id ); + if ( ! $job ) { + return; + } + + $flow_id = $job['flow_id']; + + // Skip direct execution jobs + if ( 'direct' === $flow_id || ! is_numeric( $flow_id ) ) { + return; + } + + // Compute fresh counts and cache + $counts = $this->compute_consecutive_counts( $flow_id ); + $this->set_flow_health_cache( $flow_id, $counts ); + } + + /** + * Get flow health from transient cache. + * + * @param int|string $flow_id Flow ID. + * @return array|false Cached counts or false if not cached. + */ + private function get_flow_health_cache( int|string $flow_id ): array|false { + return get_transient( "datamachine_flow_health_{$flow_id}" ); + } + + /** + * Set flow health in transient cache. + * + * @param int|string $flow_id Flow ID. + * @param array $counts Counts to cache. + */ + private function set_flow_health_cache( int|string $flow_id, array $counts ): void { + set_transient( "datamachine_flow_health_{$flow_id}", $counts, DAY_IN_SECONDS ); + } + + /** + * Compute consecutive counts from recent job history. + * + * @param int|string $flow_id Flow ID. + * @return array Counts array. + */ + private function compute_consecutive_counts( int|string $flow_id ): array { + $jobs = $this->get_recent_jobs_for_flow( $flow_id, 10 ); + + $result = array( + 'consecutive_failures' => 0, + 'consecutive_no_items' => 0, + 'latest_job' => $jobs[0] ?? null, + ); + + if ( empty( $jobs ) ) { + return $result; + } + + // Count consecutive failures from most recent + foreach ( $jobs as $job ) { + if ( JobStatus::isStatusFailure( $job['status'] ) ) { + ++$result['consecutive_failures']; + } else { + break; + } + } + + // Count consecutive no_items from most recent + foreach ( $jobs as $job ) { + if ( str_starts_with( $job['status'], 'completed_no_items' ) ) { + ++$result['consecutive_no_items']; + } else { + break; + } + } + + return $result; } + // --------------------------------------------------------------------- + // Schema + // --------------------------------------------------------------------- public static function create_table() { global $wpdb; diff --git a/inc/Core/Database/Jobs/JobsOperations.php b/inc/Core/Database/Jobs/JobsOperations.php deleted file mode 100644 index 951b2b9f7..000000000 --- a/inc/Core/Database/Jobs/JobsOperations.php +++ /dev/null @@ -1,923 +0,0 @@ - 0 - $is_database_flow = ( is_numeric( $pipeline_id ) && (int) $pipeline_id > 0 && is_numeric( $flow_id ) && (int) $flow_id > 0 ); - - // No pipeline/flow context: both are null. - $is_contextless = ( null === $pipeline_id && null === $flow_id ); - - if ( ! $is_direct_execution && ! $is_database_flow && ! $is_contextless ) { - do_action( - 'datamachine_log', - 'error', - 'Invalid job data: must provide both IDs as "direct", both as valid numeric IDs, or both as null', - array( - 'pipeline_id' => $pipeline_id, - 'flow_id' => $flow_id, - ) - ); - return false; - } - - // Normalize to string for database storage (null stays null when no context). - if ( $is_database_flow ) { - $pipeline_id = (string) absint( $pipeline_id ); - $flow_id = (string) absint( $flow_id ); - } - // Direct and contextless keep their values ('direct' or null). - - // Sanitize source — accept any string, don't gatekeep values. - $default_source = $is_contextless ? 'direct' : ( $is_direct_execution ? 'direct' : 'pipeline' ); - $source = sanitize_key( $job_data['source'] ?? $default_source ); - - $label = isset( $job_data['label'] ) ? sanitize_text_field( $job_data['label'] ) : null; - - $parent_job_id = isset( $job_data['parent_job_id'] ) ? absint( $job_data['parent_job_id'] ) : 0; - $user_id = isset( $job_data['user_id'] ) ? absint( $job_data['user_id'] ) : 0; - $agent_id = isset( $job_data['agent_id'] ) ? absint( $job_data['agent_id'] ) : null; - - $data = array( - 'user_id' => $user_id, - 'source' => $source, - 'label' => $label, - 'status' => 'pending', - ); - - $format = array( '%d', '%s', '%s', '%s' ); - - if ( null !== $agent_id && $agent_id > 0 ) { - $data['agent_id'] = $agent_id; - $format[] = '%d'; - } - - // Only include pipeline_id/flow_id when they have values (NULL omission lets DB default apply). - if ( ! $is_contextless ) { - $data['pipeline_id'] = $pipeline_id; - $data['flow_id'] = $flow_id; - $format[] = '%s'; - $format[] = '%s'; - } - - if ( $parent_job_id > 0 ) { - $data['parent_job_id'] = $parent_job_id; - $format[] = '%d'; - } - - // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching - $inserted = $this->wpdb->insert( $this->table_name, $data, $format ); - - if ( false === $inserted ) { - do_action( - 'datamachine_log', - 'error', - 'Failed to insert job', - array( - 'pipeline_id' => $pipeline_id, - 'flow_id' => $flow_id, - 'db_error' => $this->wpdb->last_error, - ) - ); - return false; - } - - $job_id = $this->wpdb->insert_id; - - return $job_id; - } - - public function get_job( int $job_id ): ?array { - if ( empty( $job_id ) ) { - return null; - } - - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared - $job = $this->wpdb->get_row( $this->wpdb->prepare( 'SELECT * FROM %i WHERE job_id = %d', $this->table_name, $job_id ), ARRAY_A ); - - if ( $job && isset( $job['engine_data'] ) && is_string( $job['engine_data'] ) ) { - $decoded = json_decode( $job['engine_data'], true ); - if ( json_last_error() === JSON_ERROR_NONE ) { - $job['engine_data'] = $decoded; - } - } - - return $job; - } - - /** - * Get jobs count with optional filtering. - * - * @param array $args Filter arguments: - * - flow_id: Filter by flow ID or 'direct' (optional) - * - pipeline_id: Filter by pipeline ID or 'direct' (optional) - * - status: Filter by status (optional) - * @return int Total count - */ - public function get_jobs_count( array $args = array() ): int { - // Build WHERE clauses for filtering - $where_clauses = array(); - $where_values = array(); - - if ( ! empty( $args['flow_id'] ) ) { - $where_clauses[] = 'flow_id = %s'; - $where_values[] = (string) $args['flow_id']; - } - - if ( ! empty( $args['pipeline_id'] ) ) { - $where_clauses[] = 'pipeline_id = %s'; - $where_values[] = (string) $args['pipeline_id']; - } - - if ( ! empty( $args['status'] ) ) { - $status_value = sanitize_text_field( $args['status'] ); - $where_clauses[] = 'status LIKE %s'; - $where_values[] = $this->wpdb->esc_like( $status_value ) . '%'; - } - - if ( ! empty( $args['source'] ) ) { - $where_clauses[] = 'source = %s'; - $where_values[] = sanitize_text_field( $args['source'] ); - } - - if ( isset( $args['user_id'] ) ) { - $where_clauses[] = 'user_id = %d'; - $where_values[] = absint( $args['user_id'] ); - } - - if ( isset( $args['agent_id'] ) ) { - $where_clauses[] = 'agent_id = %d'; - $where_values[] = absint( $args['agent_id'] ); - } - - if ( ! empty( $args['since'] ) ) { - $where_clauses[] = 'created_at >= %s'; - $where_values[] = sanitize_text_field( $args['since'] ); - } - - if ( isset( $args['parent_job_id'] ) ) { - $where_clauses[] = 'parent_job_id = %d'; - $where_values[] = absint( $args['parent_job_id'] ); - } - - if ( ! empty( $args['hide_children'] ) ) { - $where_clauses[] = '(parent_job_id IS NULL OR parent_job_id = 0)'; - } - - $where_sql = ''; - if ( ! empty( $where_clauses ) ) { - $where_sql = 'WHERE ' . implode( ' AND ', $where_clauses ); - } - - // phpcs:disable WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders.ReplacementsWrongNumber - $query = $this->wpdb->prepare( - "SELECT COUNT(job_id) FROM %i {$where_sql}", - array_merge( array( $this->table_name ), $where_values ) - ); - // phpcs:enable WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders.ReplacementsWrongNumber - - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared -- Query is prepared above - $count = $this->wpdb->get_var( $query ); - - return (int) $count; - } - - /** - * Get paginated jobs with pipeline and flow names. - * - * Supports filtering by flow_id, pipeline_id, and status. - * - * @param array $args Query arguments: - * - orderby: Column to order by (default: 'j.job_id') - * - order: ASC or DESC (default: 'DESC') - * - per_page: Results per page (default: 20) - * - offset: Pagination offset (default: 0) - * - flow_id: Filter by flow ID (optional) - * - pipeline_id: Filter by pipeline ID (optional) - * - status: Filter by status (optional) - * @return array Jobs with pipeline and flow names - */ - public function get_jobs_for_list_table( array $args ): array { - $orderby = $args['orderby'] ?? 'j.job_id'; - $order = strtoupper( $args['order'] ?? 'DESC' ); - $per_page = (int) ( $args['per_page'] ?? 20 ); - $offset = (int) ( $args['offset'] ?? 0 ); - - $pipelines_table = $this->wpdb->prefix . 'datamachine_pipelines'; - $flows_table = $this->wpdb->prefix . 'datamachine_flows'; - - // Validate order direction - $order = in_array( $order, array( 'ASC', 'DESC' ), true ) ? $order : 'DESC'; - - // Validate orderby column (whitelist approach) - $valid_orderby = array( - 'j.job_id', - 'j.pipeline_id', - 'j.flow_id', - 'j.status', - 'j.created_at', - 'j.completed_at', - 'p.pipeline_name', - 'f.flow_name', - ); - if ( ! in_array( $orderby, $valid_orderby, true ) ) { - $orderby = 'j.job_id'; - } - - // Build WHERE clauses for filtering - $where_clauses = array(); - $where_values = array(); - - if ( ! empty( $args['flow_id'] ) ) { - $where_clauses[] = 'j.flow_id = %s'; - $where_values[] = (string) $args['flow_id']; - } - - if ( ! empty( $args['pipeline_id'] ) ) { - $where_clauses[] = 'j.pipeline_id = %s'; - $where_values[] = (string) $args['pipeline_id']; - } - - if ( ! empty( $args['status'] ) ) { - $status_value = sanitize_text_field( $args['status'] ); - // Prefix match: --status=failed matches "failed", "failed:reason", etc. - $where_clauses[] = 'j.status LIKE %s'; - $where_values[] = $this->wpdb->esc_like( $status_value ) . '%'; - } - - if ( ! empty( $args['source'] ) ) { - $where_clauses[] = 'j.source = %s'; - $where_values[] = sanitize_text_field( $args['source'] ); - } - - if ( isset( $args['user_id'] ) ) { - $where_clauses[] = 'j.user_id = %d'; - $where_values[] = absint( $args['user_id'] ); - } - - if ( isset( $args['agent_id'] ) ) { - $where_clauses[] = 'j.agent_id = %d'; - $where_values[] = absint( $args['agent_id'] ); - } - - if ( ! empty( $args['since'] ) ) { - $where_clauses[] = 'j.created_at >= %s'; - $where_values[] = sanitize_text_field( $args['since'] ); - } - - if ( isset( $args['parent_job_id'] ) ) { - $where_clauses[] = 'j.parent_job_id = %d'; - $where_values[] = absint( $args['parent_job_id'] ); - } - - if ( ! empty( $args['hide_children'] ) ) { - $where_clauses[] = '(j.parent_job_id IS NULL OR j.parent_job_id = 0)'; - } - - $where_sql = ''; - if ( ! empty( $where_clauses ) ) { - $where_sql = 'WHERE ' . implode( ' AND ', $where_clauses ); - } - - // Build the full query - // Note: orderby is validated above, so safe to interpolate - // For direct execution jobs, LEFT JOINs will return NULL for pipeline_name/flow_name - // JOIN uses j.pipeline_id (varchar) directly against CAST of p.pipeline_id (int) to varchar - // for index-friendly matching on the jobs table side. - // phpcs:disable WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders.ReplacementsWrongNumber - $query = $this->wpdb->prepare( - "SELECT j.*, p.pipeline_name, f.flow_name, - (SELECT COUNT(*) FROM %i c WHERE c.parent_job_id = j.job_id) AS child_count - FROM %i j - LEFT JOIN %i p ON j.pipeline_id = p.pipeline_id - LEFT JOIN %i f ON j.flow_id = f.flow_id - {$where_sql} - ORDER BY {$orderby} {$order} - LIMIT %d OFFSET %d", - array_merge( - array( $this->table_name, $this->table_name, $pipelines_table, $flows_table ), - $where_values, - array( $per_page, $offset ) - ) - ); - // phpcs:enable WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders.ReplacementsWrongNumber - - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared -- Query is prepared above - $results = $this->wpdb->get_results( $query, ARRAY_A ); - - return $results ? $results : array(); - } - - /** - * Get all jobs for pipeline deletion impact analysis. - */ - public function get_jobs_for_pipeline( int $pipeline_id ): array { - if ( $pipeline_id <= 0 ) { - return array(); - } - - $db_flows = new \DataMachine\Core\Database\Flows\Flows(); - - $flows = $db_flows->get_flows_for_pipeline( $pipeline_id ); - if ( empty( $flows ) ) { - return array(); - } - - $all_jobs = array(); - foreach ( $flows as $flow ) { - $flow_id = $flow['flow_id']; - $flow_jobs = $this->get_jobs_for_flow( $flow_id ); - $all_jobs = array_merge( $all_jobs, $flow_jobs ); - } - - if ( ! empty( $all_jobs ) ) { - usort( - $all_jobs, - function ( $a, $b ) { - $time_a = is_array( $a ) ? $a['created_at'] : $a->created_at; - $time_b = is_array( $b ) ? $b['created_at'] : $b->created_at; - return strcmp( $time_b, $time_a ); // DESC order - } - ); - } - - return $all_jobs; - } - - /** - * Get all jobs for a flow. - * - * @param int|string $flow_id Flow ID or 'direct' - * @return array Jobs for the flow - */ - public function get_jobs_for_flow( int|string $flow_id ): array { - - if ( empty( $flow_id ) ) { - return array(); - } - - // Skip if numeric and <= 0 (but allow 'direct' string) - if ( is_numeric( $flow_id ) && (int) $flow_id <= 0 ) { - return array(); - } - - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared - $results = $this->wpdb->get_results( $this->wpdb->prepare( 'SELECT * FROM %i WHERE flow_id = %s ORDER BY created_at DESC', $this->table_name, (string) $flow_id ), ARRAY_A ); - - return $results ? $results : array(); - } - - /** - * Get all child jobs of a parent job, ordered by job_id ascending. - * - * Used by fan-out system tasks (parent schedules N children via - * TaskScheduler::scheduleBatch) to walk their children's effects - * for undo, status aggregation, etc. Children are linked via the - * indexed `parent_job_id` column. - * - * Engine data is decoded from JSON to match get_job()'s shape so - * callers can treat parent and child rows uniformly. - * - * @since 0.83.0 - * - * @param int $parent_job_id Parent job ID. - * @return array Array of child job rows with engine_data decoded, or empty array. - */ - public function get_children( int $parent_job_id ): array { - if ( $parent_job_id <= 0 ) { - return array(); - } - - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared - $rows = $this->wpdb->get_results( - $this->wpdb->prepare( - 'SELECT * FROM %i WHERE parent_job_id = %d ORDER BY job_id ASC', - $this->table_name, - $parent_job_id - ), - ARRAY_A - ); - - if ( empty( $rows ) ) { - return array(); - } - - foreach ( $rows as &$row ) { - if ( isset( $row['engine_data'] ) && is_string( $row['engine_data'] ) && '' !== $row['engine_data'] ) { - $decoded = json_decode( $row['engine_data'], true ); - if ( json_last_error() === JSON_ERROR_NONE ) { - $row['engine_data'] = $decoded; - } else { - $row['engine_data'] = array(); - } - } else { - $row['engine_data'] = array(); - } - } - unset( $row ); - - return $rows; - } - - /** - * Get the latest job for each flow in a batch. - * - * Uses a subquery to efficiently get the most recent job per flow_id. - * - * @param array $flow_ids Array of flow IDs to query (numeric IDs only, not 'direct') - * @return array Map of [flow_id => job_row] for flows that have jobs - */ - public function get_latest_jobs_by_flow_ids( array $flow_ids ): array { - if ( empty( $flow_ids ) ) { - return array(); - } - - // Filter to numeric IDs only (this method is for database flows, not direct execution) - $flow_ids = array_filter( $flow_ids, fn( $id ) => is_numeric( $id ) && (int) $id > 0 ); - $flow_ids = array_map( fn( $id ) => (string) $id, $flow_ids ); - - if ( empty( $flow_ids ) ) { - return array(); - } - - $placeholders = implode( ',', array_fill( 0, count( $flow_ids ), '%s' ) ); - - // Subquery to get max job_id per flow, then join to get full row - // phpcs:disable WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders.ReplacementsWrongNumber - $query = $this->wpdb->prepare( - "SELECT j.* FROM %i j - INNER JOIN ( - SELECT flow_id, MAX(job_id) as max_job_id - FROM %i - WHERE flow_id IN ({$placeholders}) - GROUP BY flow_id - ) latest ON j.job_id = latest.max_job_id", - array_merge( array( $this->table_name, $this->table_name ), $flow_ids ) - ); - // phpcs:enable WordPress.DB.PreparedSQL.InterpolatedNotPrepared, WordPress.DB.PreparedSQLPlaceholders.ReplacementsWrongNumber - - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared -- Query is prepared above - $results = $this->wpdb->get_results( $query, ARRAY_A ); - - if ( ! $results ) { - return array(); - } - - // Key by flow_id for easy lookup (keep as string for consistency) - $jobs_by_flow = array(); - foreach ( $results as $job ) { - $jobs_by_flow[ $job['flow_id'] ] = $job; - } - - return $jobs_by_flow; - } - - /** - * Delete old jobs by status and age. - * - * Removes jobs matching the given status pattern that are older than - * the specified number of days. Uses LIKE matching to handle compound - * statuses (e.g., "failed - timeout"). - * - * @since 0.28.0 - * - * @param string $status_pattern Base status to match (e.g., 'failed'). Uses LIKE prefix matching. - * @param int $older_than_days Delete jobs older than this many days. - * @return int|false Number of deleted rows, or false on error. - */ - /** - * Known compound status suffixes for exact-match queries. - * - * Each key maps a status prefix to the known variants that share it. - * Used by delete_old_jobs() and count_old_jobs() to build IN clauses - * instead of LIKE, which enables efficient use of the idx_status_created - * composite index. - * - * @var array - */ - private const STATUS_VARIANTS = array( - 'completed' => array( - 'completed', - 'completed_no_items', - 'agent_skipped', - ), - 'failed' => array( - 'failed', - ), - ); - - /** - * Resolve a status prefix to known variants for indexed lookups. - * - * Falls back to a LIKE pattern when the prefix is not in STATUS_VARIANTS - * (e.g. custom statuses from third-party handlers). - * - * @param string $status_prefix The status prefix (e.g. 'completed', 'failed'). - * @return array{type: 'in', values: string[]} | array{type: 'like', pattern: string} - */ - private function resolve_status_match( string $status_prefix ): array { - if ( isset( self::STATUS_VARIANTS[ $status_prefix ] ) ) { - return array( - 'type' => 'in', - 'values' => self::STATUS_VARIANTS[ $status_prefix ], - ); - } - - return array( - 'type' => 'like', - 'pattern' => $this->wpdb->esc_like( $status_prefix ) . '%', - ); - } - - public function delete_old_jobs( string $status_pattern, int $older_than_days ): int|false { - if ( empty( $status_pattern ) || $older_than_days < 1 ) { - return false; - } - - $cutoff_datetime = gmdate( 'Y-m-d H:i:s', time() - ( $older_than_days * DAY_IN_SECONDS ) ); - $match = $this->resolve_status_match( $status_pattern ); - - // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching - // phpcs:disable WordPress.DB.PreparedSQL -- Table name from $wpdb->prefix, not user input. - if ( 'in' === $match['type'] ) { - $placeholders = implode( ',', array_fill( 0, count( $match['values'] ), '%s' ) ); - $args = array_merge( - array( "DELETE FROM %i WHERE status IN ({$placeholders}) AND created_at < %s", $this->table_name ), - $match['values'], - array( $cutoff_datetime ) - ); - $result = $this->wpdb->query( $this->wpdb->prepare( ...$args ) ); - } else { - $result = $this->wpdb->query( - $this->wpdb->prepare( - 'DELETE FROM %i WHERE status LIKE %s AND created_at < %s', - $this->table_name, - $match['pattern'], - $cutoff_datetime - ) - ); - } - // phpcs:enable WordPress.DB.PreparedSQL - - do_action( - 'datamachine_log', - 'info', - 'Deleted old jobs', - array( - 'status_pattern' => $status_pattern, - 'older_than_days' => $older_than_days, - 'cutoff_datetime' => $cutoff_datetime, - 'jobs_deleted' => false !== $result ? $result : 0, - 'success' => false !== $result, - ) - ); - - return $result; - } - - /** - * Count jobs matching a status pattern older than a given age. - * - * @since 0.28.0 - * - * @param string $status_pattern Base status to match (e.g., 'failed'). - * @param int $older_than_days Count jobs older than this many days. - * @return int Number of matching jobs. - */ - public function count_old_jobs( string $status_pattern, int $older_than_days ): int { - if ( empty( $status_pattern ) || $older_than_days < 1 ) { - return 0; - } - - $cutoff_datetime = gmdate( 'Y-m-d H:i:s', time() - ( $older_than_days * DAY_IN_SECONDS ) ); - $match = $this->resolve_status_match( $status_pattern ); - - // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching - // phpcs:disable WordPress.DB.PreparedSQL -- Table name from $wpdb->prefix, not user input. - if ( 'in' === $match['type'] ) { - $placeholders = implode( ',', array_fill( 0, count( $match['values'] ), '%s' ) ); - $args = array_merge( - array( "SELECT COUNT(*) FROM %i WHERE status IN ({$placeholders}) AND created_at < %s", $this->table_name ), - $match['values'], - array( $cutoff_datetime ) - ); - $count = $this->wpdb->get_var( $this->wpdb->prepare( ...$args ) ); - } else { - $count = $this->wpdb->get_var( - $this->wpdb->prepare( - 'SELECT COUNT(*) FROM %i WHERE status LIKE %s AND created_at < %s', - $this->table_name, - $match['pattern'], - $cutoff_datetime - ) - ); - } - // phpcs:enable WordPress.DB.PreparedSQL - - return (int) $count; - } - - /** - * Delete jobs by status criteria or all jobs. - */ - public function delete_jobs( array $criteria = array() ): int|false { - - if ( empty( $criteria ) ) { - do_action( 'datamachine_log', 'warning', 'No criteria provided for jobs deletion' ); - return false; - } - - if ( ! empty( $criteria['failed'] ) ) { - $failed_pattern = $this->wpdb->esc_like( 'failed' ) . '%'; - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared - $result = $this->wpdb->query( $this->wpdb->prepare( 'DELETE FROM %i WHERE status LIKE %s', $this->table_name, $failed_pattern ) ); - } else { - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared - $result = $this->wpdb->query( $this->wpdb->prepare( 'DELETE FROM %i', $this->table_name ) ); - } - - do_action( - 'datamachine_log', - 'debug', - 'Deleted jobs', - array( - 'criteria' => $criteria, - 'jobs_deleted' => false !== $result ? $result : 0, - 'success' => false !== $result, - ) - ); - - return $result; - } - - /** - * Store engine data for centralized access via datamachine_engine_data filter. - */ - public function store_engine_data( int $job_id, array $data ): bool { - if ( $job_id <= 0 ) { - do_action( 'datamachine_log', 'error', 'Invalid job ID for engine_data storage', array( 'job_id' => $job_id ) ); - return false; - } - - // Encode data as JSON for database storage. - $encoded = wp_json_encode( $data ); - $update_data = array( 'engine_data' => $encoded ); - $format = array( '%s' ); - - // Promote task_type to its own indexed column for fast lookups. - if ( isset( $data['task_type'] ) && is_string( $data['task_type'] ) ) { - $update_data['task_type'] = sanitize_key( $data['task_type'] ); - $format[] = '%s'; - } - - $result = $this->wpdb->update( - $this->table_name, - $update_data, - array( 'job_id' => $job_id ), - $format, - array( '%d' ) - ); - - if ( false === $result ) { - do_action( - 'datamachine_log', - 'error', - 'Failed to store engine_data', - array( - 'job_id' => $job_id, - 'db_error' => $this->wpdb->last_error, - ) - ); - return false; - } - - do_action( - 'datamachine_log', - 'debug', - 'Stored engine_data successfully', - array( - 'job_id' => $job_id, - 'data_keys' => array_keys( $data ), - ) - ); - - return true; - } - - /** - * Retrieve stored engine data for datamachine_engine_data filter access. - */ - public function retrieve_engine_data( int $job_id ): array { - $job = $this->get_job( $job_id ); - - if ( $job && isset( $job['engine_data'] ) && is_array( $job['engine_data'] ) ) { - return $job['engine_data']; - } - - return array(); - } - - /** - * Get flows with consecutive failures or no_items above threshold. - * - * Uses cached flow health data when available. - * Only checks database flows (numeric IDs), not direct execution jobs. - * - * @param int $threshold Minimum consecutive count to flag as problem. - * @return array Array of [flow_id => counts_array] for problem flows. - */ - public function get_problem_flow_ids( int $threshold = 3 ): array { - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared - $query = $this->wpdb->prepare( - "SELECT DISTINCT flow_id FROM %i WHERE flow_id != 'direct' AND flow_id REGEXP '^[0-9]+$'", - $this->table_name - ); - - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared - $flow_ids = $this->wpdb->get_col( $query ); - - if ( empty( $flow_ids ) ) { - return array(); - } - - $problem_flows = array(); - - foreach ( $flow_ids as $flow_id ) { - $counts = $this->get_flow_health( $flow_id ); - - if ( $counts['consecutive_failures'] >= $threshold || $counts['consecutive_no_items'] >= $threshold ) { - $problem_flows[ $flow_id ] = $counts; - } - } - - return $problem_flows; - } - - /** - * Get recent jobs for a flow (limited). - * - * @param int|string $flow_id Flow ID. - * @param int $limit Max jobs to return. - * @return array Recent jobs, newest first. - */ - public function get_recent_jobs_for_flow( int|string $flow_id, int $limit = 10 ): array { - if ( empty( $flow_id ) ) { - return array(); - } - - if ( is_numeric( $flow_id ) && (int) $flow_id <= 0 ) { - return array(); - } - - // phpcs:ignore WordPress.DB.PreparedSQL.NotPrepared - // phpcs:disable WordPress.DB.PreparedSQL -- Table name from $wpdb->prefix, not user input. - $results = $this->wpdb->get_results( - $this->wpdb->prepare( - 'SELECT * FROM %i WHERE flow_id = %s ORDER BY created_at DESC LIMIT %d', - $this->table_name, - (string) $flow_id, - $limit - ), - ARRAY_A - ); - // phpcs:enable WordPress.DB.PreparedSQL - - return $results ? $results : array(); - } - - /** - * Get cached flow health or compute if missing. - * - * @param int|string $flow_id Flow ID. - * @return array Counts array. - */ - public function get_flow_health( int|string $flow_id ): array { - $cached = $this->get_flow_health_cache( $flow_id ); - if ( false !== $cached ) { - return $cached; - } - - $counts = $this->compute_consecutive_counts( $flow_id ); - $this->set_flow_health_cache( $flow_id, $counts ); - - return $counts; - } - - /** - * Update flow health cache when a job completes. - * - * Called via datamachine_job_complete hook. - * - * @param int $job_id Job ID that completed. - * @param string $status Final status. - */ - public function update_flow_health_cache( int $job_id, string $status ): void { - $job = $this->get_job( $job_id ); - if ( ! $job ) { - return; - } - - $flow_id = $job['flow_id']; - - // Skip direct execution jobs - if ( 'direct' === $flow_id || ! is_numeric( $flow_id ) ) { - return; - } - - // Compute fresh counts and cache - $counts = $this->compute_consecutive_counts( $flow_id ); - $this->set_flow_health_cache( $flow_id, $counts ); - } - - /** - * Get flow health from transient cache. - * - * @param int|string $flow_id Flow ID. - * @return array|false Cached counts or false if not cached. - */ - private function get_flow_health_cache( int|string $flow_id ): array|false { - return get_transient( "datamachine_flow_health_{$flow_id}" ); - } - - /** - * Set flow health in transient cache. - * - * @param int|string $flow_id Flow ID. - * @param array $counts Counts to cache. - */ - private function set_flow_health_cache( int|string $flow_id, array $counts ): void { - set_transient( "datamachine_flow_health_{$flow_id}", $counts, DAY_IN_SECONDS ); - } - - /** - * Compute consecutive counts from recent job history. - * - * @param int|string $flow_id Flow ID. - * @return array Counts array. - */ - private function compute_consecutive_counts( int|string $flow_id ): array { - $jobs = $this->get_recent_jobs_for_flow( $flow_id, 10 ); - - $result = array( - 'consecutive_failures' => 0, - 'consecutive_no_items' => 0, - 'latest_job' => $jobs[0] ?? null, - ); - - if ( empty( $jobs ) ) { - return $result; - } - - // Count consecutive failures from most recent - foreach ( $jobs as $job ) { - if ( \DataMachine\Core\JobStatus::isStatusFailure( $job['status'] ) ) { - ++$result['consecutive_failures']; - } else { - break; - } - } - - // Count consecutive no_items from most recent - foreach ( $jobs as $job ) { - if ( str_starts_with( $job['status'], 'completed_no_items' ) ) { - ++$result['consecutive_no_items']; - } else { - break; - } - } - - return $result; - } -} diff --git a/inc/Core/Database/Jobs/JobsStatus.php b/inc/Core/Database/Jobs/JobsStatus.php deleted file mode 100644 index 6aab7d180..000000000 --- a/inc/Core/Database/Jobs/JobsStatus.php +++ /dev/null @@ -1,132 +0,0 @@ -wpdb->update( - $this->table_name, - array( - 'status' => $status, - ), - array( 'job_id' => $job_id ), - array( '%s' ), // Format for data - array( '%d' ) // Format for WHERE - ); - - if ( false !== $updated ) { - RunMetrics::start( $job_id, array( 'status' => $status ) ); - } - - return false !== $updated; - } - - /** - * Update the status and completed_at time for a job. - * - * Accepts compound statuses like "agent_skipped - reason" via JobStatus validation. - * - * @param int $job_id The job ID. - * @param string $status The final status (any JobStatus final status, may be compound). - * @return bool True on success, false on failure. - */ - public function complete_job( int $job_id, string $status ): bool { - // Validate using JobStatus - supports compound statuses like "agent_skipped - reason" - if ( empty( $job_id ) || ! JobStatus::isStatusFinal( $status ) ) { - return false; - } - - // Truncate to fit varchar(255) column. Full reason is preserved in engine_data. - if ( strlen( $status ) > 255 ) { - $status = substr( $status, 0, 252 ) . '...'; - } - - $update_data = array( - 'status' => $status, - 'completed_at' => current_time( 'mysql', true ), - ); - - // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching - $updated = $this->wpdb->update( - $this->table_name, - $update_data, - array( 'job_id' => $job_id ), - array( '%s', '%s' ), - array( '%d' ) - ); - - if ( false !== $updated ) { - RunMetrics::complete( $job_id, $status ); - do_action( 'datamachine_job_complete', $job_id, $status ); - } - - return false !== $updated; - } - - /** - * Update job status. - * - * @param int $job_id The job ID. - * @param string $status The new status. - * @return bool True on success, false on failure. - */ - public function update_job_status( int $job_id, string $status ): bool { - - if ( empty( $job_id ) ) { - return false; - } - - // Truncate to fit varchar(255) column. - if ( strlen( $status ) > 255 ) { - $status = substr( $status, 0, 252 ) . '...'; - } - - $update_data = array( 'status' => $status ); - $format = array( '%s' ); - - // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching - $updated = $this->wpdb->update( - $this->table_name, - $update_data, - array( 'job_id' => $job_id ), - $format, - array( '%d' ) - ); - - return false !== $updated; - } -} diff --git a/inc/Engine/Actions/Handlers/JobCompleteHandler.php b/inc/Engine/Actions/Handlers/JobCompleteHandler.php index f7fcefd93..22612d696 100644 --- a/inc/Engine/Actions/Handlers/JobCompleteHandler.php +++ b/inc/Engine/Actions/Handlers/JobCompleteHandler.php @@ -20,8 +20,8 @@ class JobCompleteHandler { * @param string $status Job completion status. */ public static function handle( $job_id, $status ) { - $jobs_ops = new \DataMachine\Core\Database\Jobs\JobsOperations(); - $jobs_ops->update_flow_health_cache( $job_id, $status ); + $jobs_db = new \DataMachine\Core\Database\Jobs\Jobs(); + $jobs_db->update_flow_health_cache( $job_id, $status ); // Revert one-time flows to manual after execution. // The Action Scheduler single action auto-completes, but the scheduling_config diff --git a/tests/Unit/Core/WordPress/PostTrackingTest.php b/tests/Unit/Core/WordPress/PostTrackingTest.php index 8f6070b6d..779c9ef02 100644 --- a/tests/Unit/Core/WordPress/PostTrackingTest.php +++ b/tests/Unit/Core/WordPress/PostTrackingTest.php @@ -12,7 +12,6 @@ use DataMachine\Core\Database\Flows\Flows; use DataMachine\Core\Database\Jobs\Jobs; -use DataMachine\Core\Database\Jobs\JobsOperations; use DataMachine\Core\WordPress\PostTracking; use WP_UnitTestCase; @@ -195,8 +194,7 @@ private function create_flow( int $pipeline_id, string $flow_name = 'Test Flow', private function create_job( int $flow_id, int $pipeline_id ): int { $jobs_db = new Jobs(); - $ops = new JobsOperations( $jobs_db ); - $job_id = $ops->create_job( + $job_id = $jobs_db->create_job( array( 'flow_id' => $flow_id, 'pipeline_id' => $pipeline_id, diff --git a/tests/jobs-get-children-smoke.php b/tests/jobs-get-children-smoke.php index 741868594..6b85bf3a0 100644 --- a/tests/jobs-get-children-smoke.php +++ b/tests/jobs-get-children-smoke.php @@ -4,7 +4,7 @@ * * Run with: php tests/jobs-get-children-smoke.php * - * Verifies the row-shaping logic that JobsOperations::get_children + * Verifies the row-shaping logic that Jobs::get_children * applies to wpdb->get_results output: * - Empty result set → empty array. * - Each row's engine_data is JSON-decoded into an array. @@ -25,7 +25,7 @@ } /** - * Mirror of the row-shaping foreach inside JobsOperations::get_children. + * Mirror of the row-shaping foreach inside Jobs::get_children. * Kept literally byte-equivalent so any drift surfaces as a diff between * this harness and the source. */