diff --git a/data-machine.php b/data-machine.php index d834d7957..3e11a1b28 100644 --- a/data-machine.php +++ b/data-machine.php @@ -90,6 +90,7 @@ function () { // Load and instantiate all handlers - they self-register via constructors datamachine_load_handlers(); \DataMachine\Engine\Bundle\AuthRefHandlerConfig::register(); + \DataMachine\Core\Database\BundleArtifacts\InstalledBundleArtifacts::register(); // Initialize FetchHandler to register skip_item tool for all fetch-type handlers \DataMachine\Core\Steps\Fetch\Handlers\FetchHandler::init(); diff --git a/inc/Cli/Commands/AgentBundleCommand.php b/inc/Cli/Commands/AgentBundleCommand.php index becc651a4..ad5116dfe 100644 --- a/inc/Cli/Commands/AgentBundleCommand.php +++ b/inc/Cli/Commands/AgentBundleCommand.php @@ -224,7 +224,20 @@ public function upgrade( array $args, array $assoc_args ): void { } $owner_id = isset( $assoc_args['owner'] ) ? $this->resolve_user_id( $assoc_args['owner'] ) : 0; - $result = $this->bundler()->import( $bundle, '' !== $slug ? $slug : null, $owner_id, false, array( 'reconcile_runtime' => $reconcile_runtime ) ); + $result = $this->bundler()->import( + $bundle, + '' !== $slug ? $slug : null, + $owner_id, + false, + array( + 'reconcile_runtime' => $reconcile_runtime, + // Mark this as an upgrade so the importer treats an existing agent row as the upgrade + // target instead of returning "Agent slug already exists" when local pipelines/flows + // have been edited (#1801). Local-modified artifacts come back in `result.conflicts` + // and get staged as PendingActions below. + 'is_upgrade' => true, + ) + ); if ( empty( $result['success'] ) ) { WP_CLI::error( (string) ( $result['error'] ?? 'Bundle upgrade failed.' ) ); return; diff --git a/inc/Core/Agents/AgentBundler.php b/inc/Core/Agents/AgentBundler.php index 4e593475f..82955ccda 100644 --- a/inc/Core/Agents/AgentBundler.php +++ b/inc/Core/Agents/AgentBundler.php @@ -445,15 +445,21 @@ private static function strip_secret_like_values( array $value ): array { * @param string|null $new_slug Optional override slug. * @param int $owner_id WordPress user ID to own the imported agent. * @param bool $dry_run If true, validate without writing. - * @param array $options Import options. - * @return array{success: bool, message?: string, error?: string, summary?: array} + * @param array $options Import options. Supported keys: + * - reconcile_runtime (bool) Replace preserved runtime queue/scheduling fields. + * - is_upgrade (bool) Treat an existing agent with the same slug as the upgrade + * target instead of returning a slug-collision error. Required when the live + * pipelines/flows have been edited (`local_modified`) so the importer can stage + * conflicts and the CLI can hand them to the planner / PendingActions. + * @return array{success: bool, message?: string, error?: string, error_code?: string, summary?: array} */ public function import( array $bundle, ?string $new_slug = null, int $owner_id = 0, bool $dry_run = false, array $options = array() ): array { // Validate bundle. if ( empty( $bundle['bundle_version'] ) || empty( $bundle['agent'] ) ) { return array( - 'success' => false, - 'error' => 'Invalid bundle: missing bundle_version or agent data.', + 'success' => false, + 'error_code' => 'install_invalid_bundle', + 'error' => 'Invalid bundle: missing bundle_version or agent data.', ); } @@ -473,21 +479,27 @@ public function import( array $bundle, ?string $new_slug = null, int $owner_id = ); $is_portable_bundle = ! empty( $bundle['bundle_slug'] ) || $this->bundle_has_portable_artifacts( $bundle ); $reconcile_runtime = ! empty( $options['reconcile_runtime'] ); + $is_upgrade = ! empty( $options['is_upgrade'] ); // Check for slug collision. + // On install: existing slug + (renamed-to-collision OR non-portable bundle) is a hard error. + // On upgrade: an existing slug is the upgrade target. Bundle-slug mismatch is still an error so we + // don't silently overwrite an unrelated agent that happens to share the slug. $existing = $this->agents_repo->get_by_slug( $slug ); - if ( $existing && ( $new_slug || ! $is_portable_bundle ) ) { + if ( $existing && ! $is_upgrade && ( $new_slug || ! $is_portable_bundle ) ) { return array( - 'success' => false, - 'error' => sprintf( 'Agent slug "%s" already exists. Use --slug= to rename on import.', $slug ), + 'success' => false, + 'error_code' => 'install_slug_collision', + 'error' => sprintf( 'Agent slug "%s" already exists. Use --slug= to rename on import, or run `agent upgrade` to update the existing install.', $slug ), ); } if ( $existing ) { $installed_bundle = $existing['agent_config']['datamachine_bundle'] ?? array(); if ( ! empty( $installed_bundle['bundle_slug'] ) && $installed_bundle['bundle_slug'] !== $bundle_slug ) { return array( - 'success' => false, - 'error' => sprintf( 'Agent slug "%s" is installed from bundle "%s", not "%s".', $slug, $installed_bundle['bundle_slug'], $bundle_slug ), + 'success' => false, + 'error_code' => 'install_bundle_slug_mismatch', + 'error' => sprintf( 'Agent slug "%s" is installed from bundle "%s", not "%s".', $slug, $installed_bundle['bundle_slug'], $bundle_slug ), ); } } @@ -535,6 +547,21 @@ public function import( array $bundle, ?string $new_slug = null, int $owner_id = } // --- Actual import --- + // + // Everything below this point is the post-claim mutation block. Any failure must be reported as + // `success: false` with a typed error_code and the agent row (if newly created) must be removed so + // `agent list` doesn't show a half-installed entry. Pre-claim guards above guarantee no DB writes + // have happened yet. + $created_agent_id = 0; // Tracks an agent row this call inserted, for manual rollback. + $created_pipeline_ids = array(); + $created_flow_ids = array(); + $transaction_started = $this->begin_transaction(); + + try { + // Test fault-injection seam. Production code path is a no-op. Tests load the + // `DataMachine\Tests\Support\AgentBundlerImportFaultInjector` and trigger a typed failure to + // exercise rollback semantics without waiting for a live SQLite race. + do_action( 'datamachine_bundle_import_post_claim_started', $bundle_metadata, $slug ); // 1. Create or update the agent record. $incoming_config = $agent_data['agent_config'] ?? array(); @@ -557,13 +584,15 @@ public function import( array $bundle, ?string $new_slug = null, int $owner_id = if ( $existing ) { $agent_id = (int) $existing['agent_id']; - $this->agents_repo->update_agent( + if ( ! $this->agents_repo->update_agent( $agent_id, array( 'agent_name' => $agent_data['agent_name'] ?? $slug, 'agent_config' => $config, ) - ); + ) ) { + throw new \RuntimeException( 'Failed to update existing agent record.' ); + } } else { $agent_id = $this->agents_repo->create_if_missing( $slug, @@ -571,13 +600,10 @@ public function import( array $bundle, ?string $new_slug = null, int $owner_id = $owner_id, $config ); - } - - if ( ! $agent_id ) { - return array( - 'success' => false, - 'error' => 'Failed to create agent record.', - ); + if ( ! $agent_id ) { + throw new \RuntimeException( 'Failed to create agent record.' ); + } + $created_agent_id = $agent_id; } // 2. Write agent identity files. @@ -639,34 +665,38 @@ public function import( array $bundle, ?string $new_slug = null, int $owner_id = 'agent_id' => $agent_id, 'user_id' => $owner_id, ) ); + if ( ! $new_pipeline_id ) { + throw new \RuntimeException( sprintf( 'Failed to create pipeline "%s".', $portable_slug ) ); + } + $created_pipeline_ids[] = (int) $new_pipeline_id; } - if ( $new_pipeline_id ) { - $pipeline_config = $this->remap_pipeline_step_ids( $pipeline_config, $old_id, (int) $new_pipeline_id ); - $this->pipelines_repo->update_pipeline( - (int) $new_pipeline_id, - array( - 'pipeline_name' => $pipeline_data['pipeline_name'], - 'pipeline_config' => $pipeline_config, - 'portable_slug' => $portable_slug, - ) - ); + $pipeline_config = $this->remap_pipeline_step_ids( $pipeline_config, $old_id, (int) $new_pipeline_id ); + if ( ! $this->pipelines_repo->update_pipeline( + (int) $new_pipeline_id, + array( + 'pipeline_name' => $pipeline_data['pipeline_name'], + 'pipeline_config' => $pipeline_config, + 'portable_slug' => $portable_slug, + ) + ) ) { + throw new \RuntimeException( sprintf( 'Failed to update pipeline "%s".', $portable_slug ) ); + } - $pipeline_id_map[ $old_id ] = (int) $new_pipeline_id; - $artifact_records[ $artifact_key ] = $this->bundle_artifact_record( - $bundle_metadata, - 'pipeline', - $portable_slug, - 'pipelines/' . $portable_slug . '.json', - $payload - ); + $pipeline_id_map[ $old_id ] = (int) $new_pipeline_id; + $artifact_records[ $artifact_key ] = $this->bundle_artifact_record( + $bundle_metadata, + 'pipeline', + $portable_slug, + 'pipelines/' . $portable_slug . '.json', + $payload + ); - // Write pipeline memory files to disk. - $this->write_pipeline_memory_files( - (int) $new_pipeline_id, - $pipeline_data['memory_file_contents'] ?? array() - ); - } + // Write pipeline memory files to disk. + $this->write_pipeline_memory_files( + (int) $new_pipeline_id, + $pipeline_data['memory_file_contents'] ?? array() + ); } // 5. Import flows: create paused, preserve local schedules/queues on update. @@ -751,10 +781,9 @@ public function import( array $bundle, ?string $new_slug = null, int $owner_id = if ( $reconcile_runtime ) { $update_data['scheduling_config'] = $flow_data['scheduling_config'] ?? array(); } - $this->flows_repo->update_flow( - $new_flow_id, - $update_data - ); + if ( ! $this->flows_repo->update_flow( $new_flow_id, $update_data ) ) { + throw new \RuntimeException( sprintf( 'Failed to update flow "%s".', $portable_slug ) ); + } } else { $new_flow_id = $this->flows_repo->create_flow( array( 'pipeline_id' => $new_pipeline_id, @@ -766,36 +795,39 @@ public function import( array $bundle, ?string $new_slug = null, int $owner_id = 'user_id' => $owner_id, ) ); - if ( $new_flow_id ) { - $flow_config = $this->remap_flow_step_ids( $flow_config, $old_pipeline_id, (int) $new_pipeline_id, (int) $new_flow_id ); - $this->flows_repo->update_flow( - (int) $new_flow_id, - array( - 'flow_name' => $flow_data['flow_name'], - 'flow_config' => $flow_config, - 'portable_slug' => $portable_slug, - ) - ); + if ( ! $new_flow_id ) { + throw new \RuntimeException( sprintf( 'Failed to create flow "%s".', $portable_slug ) ); } - } + $created_flow_ids[] = (int) $new_flow_id; - if ( $new_flow_id ) { - ++$flow_count; - $artifact_records[ $artifact_key ] = $this->bundle_artifact_record( - $bundle_metadata, - 'flow', - $portable_slug, - 'flows/' . $portable_slug . '.json', - $payload - ); - - // Write flow memory files to disk. - $this->write_flow_memory_files( - $new_pipeline_id, + $flow_config = $this->remap_flow_step_ids( $flow_config, $old_pipeline_id, (int) $new_pipeline_id, (int) $new_flow_id ); + if ( ! $this->flows_repo->update_flow( (int) $new_flow_id, - $flow_data['memory_file_contents'] ?? array() - ); + array( + 'flow_name' => $flow_data['flow_name'], + 'flow_config' => $flow_config, + 'portable_slug' => $portable_slug, + ) + ) ) { + throw new \RuntimeException( sprintf( 'Failed to update freshly-created flow "%s".', $portable_slug ) ); + } } + + ++$flow_count; + $artifact_records[ $artifact_key ] = $this->bundle_artifact_record( + $bundle_metadata, + 'flow', + $portable_slug, + 'flows/' . $portable_slug . '.json', + $payload + ); + + // Write flow memory files to disk. + $this->write_flow_memory_files( + $new_pipeline_id, + (int) $new_flow_id, + $flow_data['memory_file_contents'] ?? array() + ); } // 6. Apply plugin-owned artifacts through their owning plugin. @@ -865,7 +897,35 @@ public function import( array $bundle, ?string $new_slug = null, int $owner_id = $summary['runtime_drift'] = $runtime_drift; $config['datamachine_bundle']['artifacts'] = $artifact_records; - $this->agents_repo->update_agent( $agent_id, array( 'agent_config' => $config ) ); + if ( ! $this->agents_repo->update_agent( $agent_id, array( 'agent_config' => $config ) ) ) { + throw new \RuntimeException( 'Failed to persist final agent_config with bundle artifact registry.' ); + } + + // Test fault-injection seam — fires after every mutation but before commit so a test handler + // can throw and exercise the rollback path without waiting for a SQLite race. + do_action( 'datamachine_bundle_import_pre_commit', $bundle_metadata, $slug, $agent_id ); + + // Verify persistence end-to-end. SQLite under Studio has been observed silently rolling back the + // outer mutations under contention (#1801): the in-memory bundler thinks everything wrote, but a + // fresh SELECT shows no agent row and no artifacts. Re-fetching closes the door on that path — + // if the row isn't there, we surface the failure instead of returning a ghost agent_id. + $persisted = $this->agents_repo->get_agent( $agent_id ); + if ( ! $persisted ) { + throw new \RuntimeException( sprintf( 'Agent row for ID %d disappeared after install — possible silent rollback.', $agent_id ) ); + } + $persisted_artifacts = is_array( $persisted['agent_config']['datamachine_bundle']['artifacts'] ?? null ) + ? $persisted['agent_config']['datamachine_bundle']['artifacts'] + : array(); + if ( count( $persisted_artifacts ) < count( $artifact_records ) ) { + throw new \RuntimeException( sprintf( + 'Agent ID %d persisted %d artifact records but %d were written — possible silent rollback.', + $agent_id, + count( $persisted_artifacts ), + count( $artifact_records ) + ) ); + } + + $this->commit_transaction( $transaction_started ); return array( 'success' => true, @@ -878,6 +938,95 @@ public function import( array $bundle, ?string $new_slug = null, int $owner_id = ), 'summary' => $summary, ); + } catch ( \Throwable $e ) { + // Roll back any DB writes from this call. Run native ROLLBACK first; if the underlying engine + // (e.g. the SQLite drop-in) silently no-ops on rollback, fall back to manual cleanup of rows + // we created so the next install attempt sees a clean slate. + $this->rollback_transaction( $transaction_started ); + $this->manual_rollback( $created_agent_id, $created_pipeline_ids, $created_flow_ids ); + + do_action( + 'datamachine_log', + 'error', + 'AgentBundler::import post-claim failure — rolled back.', + array( + 'agent_slug' => $slug, + 'bundle_slug' => $bundle_slug, + 'is_upgrade' => $is_upgrade, + 'error' => $e->getMessage(), + 'error_class' => get_class( $e ), + ) + ); + + return array( + 'success' => false, + 'error_code' => 'install_post_claim_failure', + 'error' => sprintf( 'Agent install rolled back: %s', $e->getMessage() ), + ); + } + } + + /** + * Open a DB transaction for the import critical section. + * + * Returns true when the engine accepted `START TRANSACTION`. SQLite via the Studio drop-in maps this + * to `BEGIN`. If the engine refuses (returns false), we still proceed — the manual_rollback() path + * is the safety net that cleans up any rows we know we created. + */ + private function begin_transaction(): bool { + if ( ! isset( $this->agents_repo ) ) { + return false; + } + + global $wpdb; + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.NotPrepared + $result = $wpdb->query( 'START TRANSACTION' ); + + return false !== $result; + } + + private function commit_transaction( bool $started ): void { + if ( ! $started ) { + return; + } + global $wpdb; + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.NotPrepared + $wpdb->query( 'COMMIT' ); + } + + private function rollback_transaction( bool $started ): void { + if ( ! $started ) { + return; + } + global $wpdb; + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.NotPrepared + $wpdb->query( 'ROLLBACK' ); + } + + /** + * Manually undo writes from a failed import. + * + * Belt-and-braces: if the engine ignored ROLLBACK we still delete the agent row and any + * pipelines/flows this call created so a retry sees the same shape as a fresh install. We never + * delete rows that pre-existed. + * + * @param int $created_agent_id Agent row this call inserted (0 if upgrade). + * @param int[] $created_pipeline_ids Pipeline rows this call inserted. + * @param int[] $created_flow_ids Flow rows this call inserted. + */ + private function manual_rollback( int $created_agent_id, array $created_pipeline_ids, array $created_flow_ids ): void { + foreach ( $created_flow_ids as $flow_id ) { + $this->flows_repo->delete_flow( (int) $flow_id ); + } + foreach ( $created_pipeline_ids as $pipeline_id ) { + $this->pipelines_repo->delete_pipeline( (int) $pipeline_id ); + } + if ( $created_agent_id > 0 ) { + global $wpdb; + $agents_table = $wpdb->base_prefix . 'datamachine_agents'; + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching + $wpdb->delete( $agents_table, array( 'agent_id' => (int) $created_agent_id ), array( '%d' ) ); + } } private function bundle_has_portable_artifacts( array $bundle ): bool { diff --git a/inc/Core/Database/BundleArtifacts/InstalledBundleArtifacts.php b/inc/Core/Database/BundleArtifacts/InstalledBundleArtifacts.php index 944aa4175..3bd643ede 100644 --- a/inc/Core/Database/BundleArtifacts/InstalledBundleArtifacts.php +++ b/inc/Core/Database/BundleArtifacts/InstalledBundleArtifacts.php @@ -22,6 +22,33 @@ final class InstalledBundleArtifacts extends BaseRepository { public const TABLE_NAME = 'datamachine_bundle_artifacts'; + /** + * Wire cleanup hooks once per request. + * + * Currently registers a `datamachine_agent_deleted` listener that wipes any tracked artifact rows + * for the deleted agent. The importer does not write to this table today, but extensions can — and + * a stale row here would mis-classify a fresh install as an upgrade against a non-existent agent. + * Registering the listener defensively closes that door (#1801). + * + * @return void + */ + public static function register(): void { + add_action( + 'datamachine_agent_deleted', + static function ( int $agent_id ): void { + if ( $agent_id <= 0 ) { + return; + } + global $wpdb; + $table = $wpdb->prefix . self::TABLE_NAME; + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching + $wpdb->delete( $table, array( 'agent_id' => $agent_id ), array( '%d' ) ); + }, + 10, + 1 + ); + } + /** * Create installed bundle artifact tracking table. * diff --git a/tests/Unit/Core/Agents/AgentBundlerImportTest.php b/tests/Unit/Core/Agents/AgentBundlerImportTest.php new file mode 100644 index 000000000..a9bc9577e --- /dev/null +++ b/tests/Unit/Core/Agents/AgentBundlerImportTest.php @@ -0,0 +1,256 @@ + true` against an + * agent whose live pipeline/flow have been edited (`local_modified`) must succeed and surface + * conflicts instead of erroring on the slug-collision guard. + * 3. Bundle artifact registry cleanup — deleting an agent must clear any rows in the + * `datamachine_bundle_artifacts` table for that agent so subsequent installs are classified as + * fresh installs, not stale upgrades. + * + * @package DataMachine\Tests\Unit\Core\Agents + */ + +namespace DataMachine\Tests\Unit\Core\Agents; + +use DataMachine\Core\Agents\AgentBundler; +use DataMachine\Core\Database\Agents\Agents as AgentsRepository; +use DataMachine\Core\Database\BundleArtifacts\InstalledBundleArtifacts; +use DataMachine\Core\Database\Flows\Flows as FlowsRepository; +use DataMachine\Core\Database\Pipelines\Pipelines as PipelinesRepository; +use DataMachine\Engine\Bundle\AgentBundleInstalledArtifact; +use DataMachine\Engine\Bundle\AgentBundleManifest; +use WP_UnitTestCase; + +class AgentBundlerImportTest extends WP_UnitTestCase { + + private AgentBundler $bundler; + private AgentsRepository $agents_repo; + private PipelinesRepository $pipelines_repo; + private FlowsRepository $flows_repo; + private int $owner_id; + + public function set_up(): void { + parent::set_up(); + + AgentsRepository::create_table(); + PipelinesRepository::create_table(); + FlowsRepository::create_table(); + InstalledBundleArtifacts::create_table(); + + $this->bundler = new AgentBundler(); + $this->agents_repo = new AgentsRepository(); + $this->pipelines_repo = new PipelinesRepository(); + $this->flows_repo = new FlowsRepository(); + + $this->owner_id = self::factory()->user->create( array( 'role' => 'administrator' ) ); + wp_set_current_user( $this->owner_id ); + } + + public function tear_down(): void { + global $wpdb; + + // phpcs:disable WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.NotPrepared + $wpdb->query( "DELETE FROM {$wpdb->base_prefix}datamachine_agents" ); + $wpdb->query( "DELETE FROM {$wpdb->prefix}datamachine_pipelines" ); + $wpdb->query( "DELETE FROM {$wpdb->prefix}datamachine_flows" ); + $wpdb->query( "DELETE FROM {$wpdb->prefix}datamachine_bundle_artifacts" ); + // phpcs:enable WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.NotPrepared + + remove_all_actions( 'datamachine_bundle_import_pre_commit' ); + remove_all_actions( 'datamachine_bundle_import_post_claim_started' ); + + parent::tear_down(); + } + + /** + * Build a minimal-but-realistic portable bundle for tests. Mirrors the shape produced by + * AgentBundler::export() against a single-pipeline, single-flow agent. + */ + private function fixture_bundle( string $slug = 'wc-static-site-agent' ): array { + return array( + 'bundle_version' => '1', + 'bundle_slug' => $slug, + 'agent' => array( + 'agent_slug' => $slug, + 'agent_name' => 'Static Site Agent', + 'agent_config' => array(), + ), + 'pipelines' => array( + array( + 'original_id' => 1, + 'pipeline_name' => 'Static Site Pipeline', + 'portable_slug' => 'static-site-pipeline', + 'pipeline_config' => array( + '1_step-uuid' => array( + 'pipeline_step_id' => '1_step-uuid', + 'execution_order' => 0, + 'step_type' => 'fetch', + ), + ), + ), + ), + 'flows' => array( + array( + 'original_pipeline_id' => 1, + 'flow_name' => 'Static Site Flow', + 'portable_slug' => 'static-site-flow', + 'flow_config' => array( + '1_step-uuid_1' => array( + 'pipeline_step_id' => '1_step-uuid', + 'pipeline_id' => 1, + 'flow_id' => 1, + 'flow_step_id' => '1_step-uuid_1', + 'execution_order' => 0, + ), + ), + 'scheduling_config' => array( 'interval' => 'manual' ), + ), + ), + ); + } + + /** + * The silent-partial-success regression in #1801: a failure after the agent row was claimed used + * to return `success: true` with a populated agent_id summary, while the row was rolled back at + * the SQLite layer. Now any post-claim throw must: + * + * 1. Return `success: false` with `error_code: install_post_claim_failure`. + * 2. Leave no agent row behind (manual rollback covers cases where the engine ignores ROLLBACK). + * 3. Leave no orphan pipeline / flow rows behind. + */ + public function test_post_claim_failure_rolls_back_and_reports_typed_error(): void { + $bundle = $this->fixture_bundle(); + + $fault_count = 0; + add_action( + 'datamachine_bundle_import_pre_commit', + static function () use ( &$fault_count ): void { + ++$fault_count; + throw new \RuntimeException( 'simulated SQLite drop on commit' ); + } + ); + + $result = $this->bundler->import( $bundle, null, $this->owner_id ); + + $this->assertSame( 1, $fault_count, 'Pre-commit hook fired once.' ); + $this->assertFalse( $result['success'], 'Import surfaces failure instead of silent partial success.' ); + $this->assertSame( 'install_post_claim_failure', $result['error_code'] ?? null ); + $this->assertStringContainsString( 'simulated SQLite drop on commit', (string) ( $result['error'] ?? '' ) ); + + $this->assertNull( + $this->agents_repo->get_by_slug( 'wc-static-site-agent' ), + 'No half-installed agent row remains after rollback.' + ); + + global $wpdb; + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.NotPrepared + $pipeline_count = (int) $wpdb->get_var( "SELECT COUNT(*) FROM {$wpdb->prefix}datamachine_pipelines" ); + // phpcs:ignore WordPress.DB.DirectDatabaseQuery.DirectQuery,WordPress.DB.DirectDatabaseQuery.NoCaching,WordPress.DB.PreparedSQL.NotPrepared + $flow_count = (int) $wpdb->get_var( "SELECT COUNT(*) FROM {$wpdb->prefix}datamachine_flows" ); + + $this->assertSame( 0, $pipeline_count, 'No orphan pipeline rows.' ); + $this->assertSame( 0, $flow_count, 'No orphan flow rows.' ); + } + + /** + * The "Agent slug already exists" misclassification in #1801: when an operator runs `agent + * upgrade` against a bundle whose live pipeline/flow have been edited (legitimate operator + * action), the importer used to short-circuit on the slug collision guard before the planner + * could surface conflicts. With `is_upgrade => true`, the importer must accept the existing + * row as the upgrade target and return `success: true` so the CLI can stage PendingActions. + */ + public function test_upgrade_against_existing_agent_does_not_error_on_slug_collision(): void { + // First install — clean. + $first = $this->bundler->import( $this->fixture_bundle(), null, $this->owner_id ); + $this->assertTrue( (bool) $first['success'], 'Initial install succeeds.' ); + + // Edit the live pipeline so the next import would classify the artifact as `local_modified`. + $existing_agent = $this->agents_repo->get_by_slug( 'wc-static-site-agent' ); + $existing_pipeline = $this->pipelines_repo->get_by_portable_slug( + (int) $existing_agent['agent_id'], + 'static-site-pipeline' + ); + $this->pipelines_repo->update_pipeline( + (int) $existing_pipeline['pipeline_id'], + array( 'pipeline_name' => 'Edited Locally' ) + ); + + // Without `is_upgrade`, the importer's portable-bundle path lets this through, but the + // CLI upgrade entrypoint also passes `is_upgrade => true` to make the contract explicit and + // keep `--slug` overrides from accidentally re-triggering the install collision check. + $second = $this->bundler->import( + $this->fixture_bundle(), + null, + $this->owner_id, + false, + array( 'is_upgrade' => true ) + ); + + $this->assertTrue( (bool) $second['success'], 'Upgrade does not error on slug collision.' ); + $this->assertSame( + (int) $existing_agent['agent_id'], + (int) $second['summary']['agent_id'], + 'Upgrade reuses the existing agent_id.' + ); + } + + /** + * Defensive registry cleanup: even though the importer does not currently write to + * `datamachine_bundle_artifacts`, extensions can. If an agent is deleted, any tracked rows for + * its agent_id must be wiped so the next install is classified as fresh, not as a stale upgrade. + */ + public function test_agent_delete_clears_bundle_artifact_registry(): void { + InstalledBundleArtifacts::register(); + + $store = new InstalledBundleArtifacts(); + $agent_id = $this->agents_repo->create_if_missing( 'cleanup-target', 'Cleanup Target', $this->owner_id, array() ); + + $manifest = new AgentBundleManifest( + gmdate( 'c' ), + 'data-machine/test', + 'cleanup-target', + '1', + '', + '', + array( + 'slug' => 'cleanup-target', + 'label' => 'Cleanup Target', + 'description' => '', + 'agent_config' => array(), + ), + array( + 'memory' => array(), + 'pipelines' => array(), + 'flows' => array(), + 'extensions' => array(), + 'handler_auth' => 'refs', + ) + ); + + $store->record_install( + $manifest, + 'pipeline', + 'static-site-pipeline', + 'pipelines/static-site-pipeline.json', + array( 'pipeline_name' => 'Static Site Pipeline' ), + $agent_id + ); + + $this->assertCount( 1, $store->list_for_agent( $agent_id ), 'Registry row inserted.' ); + + do_action( 'datamachine_agent_deleted', $agent_id, 'cleanup-target' ); + + $this->assertSame( + array(), + $store->list_for_agent( $agent_id ), + 'Registry rows cleared on `datamachine_agent_deleted`.' + ); + } +}