Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 191 additions & 46 deletions crates/atomic-core/src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,68 @@ impl Database {
path: &Path,
create: bool,
pool_size: usize,
) -> Result<Self, AtomicCoreError> {
Self::open_with_pool_size_and_settings_cleanup(path, create, pool_size, false)
}

/// Acquire a read-only connection from the pool.
/// Tries each pooled connection via try_lock; if all are busy, creates a fresh one.
pub fn read_conn(&self) -> Result<ReadConn<'_>, AtomicCoreError> {
for slot in &self.read_pool {
if let Ok(guard) = slot.try_lock() {
return Ok(ReadConn::Pooled(guard));
}
}
// All pool slots busy — create a temporary connection
let conn = Connection::open(&self.db_path)?;
conn.set_prepared_statement_cache_capacity(STMT_CACHE_CAPACITY);
conn.execute_batch(&format!("{} PRAGMA query_only=ON;", BASE_PRAGMAS))?;
Ok(ReadConn::Temp(conn))
}

/// Create a new connection to the same database.
/// Registers sqlite-vec so the connection can query vec_chunks.
pub fn new_connection(&self) -> Result<Connection, AtomicCoreError> {
// sqlite-vec is registered via sqlite3_auto_extension in open_internal,
// which applies to all connections opened after that call.
let conn = Connection::open(&self.db_path)?;
conn.set_prepared_statement_cache_capacity(STMT_CACHE_CAPACITY);
conn.execute_batch(BASE_PRAGMAS)?;
Ok(conn)
}

/// Open with a larger read pool sized for server workloads.
/// Creates the DB and parent directories if they don't exist.
pub fn open_for_server(path: impl AsRef<Path>) -> Result<Self, AtomicCoreError> {
Self::open_with_pool_size(path.as_ref(), true, SERVER_READ_POOL_SIZE)
}

/// Open a registry-backed data database with a larger read pool.
///
/// This is the same as `open_for_server`, but runs the one migration step
/// that is only valid when this SQLite file is a per-database store whose
/// defaults live in registry.db.
pub fn open_for_server_with_registry(path: impl AsRef<Path>) -> Result<Self, AtomicCoreError> {
Self::open_with_pool_size_and_settings_cleanup(
path.as_ref(),
true,
SERVER_READ_POOL_SIZE,
true,
)
}

fn open_with_pool_size_and_settings_cleanup(
path: &Path,
create: bool,
pool_size: usize,
cleanup_legacy_seed_settings: bool,
) -> Result<Self, AtomicCoreError> {
// Register sqlite-vec extension
unsafe {
#[allow(clippy::missing_transmute_annotations)]
sqlite3_auto_extension(Some(std::mem::transmute(sqlite3_vec_init as *const ())));
}

// Create parent directory if needed
if create {
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent)?;
Expand All @@ -84,28 +138,18 @@ impl Database {
let conn = Connection::open(path)?;
conn.set_prepared_statement_cache_capacity(STMT_CACHE_CAPACITY);

// Base PRAGMAs + WAL size limit (64 MB) to prevent unbounded WAL growth
conn.execute_batch(&format!(
"{} PRAGMA journal_size_limit=67108864;",
BASE_PRAGMAS
))?;

// Checkpoint any WAL from a previous run to start clean
conn.execute_batch("PRAGMA wal_checkpoint(TRUNCATE);")?;

// Always run migrations — idempotent, handles both new and existing DBs
Self::run_migrations(&conn)?;
Self::run_migrations_internal(&conn, cleanup_legacy_seed_settings)?;

// Update query planner statistics so the optimizer has fresh data
conn.execute_batch("PRAGMA optimize=0x10002;")?;

// Warm the OS page cache and SQLite page cache by touching the indexes
// and table pages that the most common queries need on startup.
Self::warm_cache(&conn);

let db_path = path.to_path_buf();

// Pre-open read connections for the pool
let mut read_pool = Vec::with_capacity(pool_size);
for _ in 0..pool_size {
let rc = Connection::open(&db_path)?;
Expand All @@ -121,38 +165,6 @@ impl Database {
})
}

/// Acquire a read-only connection from the pool.
/// Tries each pooled connection via try_lock; if all are busy, creates a fresh one.
pub fn read_conn(&self) -> Result<ReadConn<'_>, AtomicCoreError> {
for slot in &self.read_pool {
if let Ok(guard) = slot.try_lock() {
return Ok(ReadConn::Pooled(guard));
}
}
// All pool slots busy — create a temporary connection
let conn = Connection::open(&self.db_path)?;
conn.set_prepared_statement_cache_capacity(STMT_CACHE_CAPACITY);
conn.execute_batch(&format!("{} PRAGMA query_only=ON;", BASE_PRAGMAS))?;
Ok(ReadConn::Temp(conn))
}

/// Create a new connection to the same database.
/// Registers sqlite-vec so the connection can query vec_chunks.
pub fn new_connection(&self) -> Result<Connection, AtomicCoreError> {
// sqlite-vec is registered via sqlite3_auto_extension in open_internal,
// which applies to all connections opened after that call.
let conn = Connection::open(&self.db_path)?;
conn.set_prepared_statement_cache_capacity(STMT_CACHE_CAPACITY);
conn.execute_batch(BASE_PRAGMAS)?;
Ok(conn)
}

/// Open with a larger read pool sized for server workloads.
/// Creates the DB and parent directories if they don't exist.
pub fn open_for_server(path: impl AsRef<Path>) -> Result<Self, AtomicCoreError> {
Self::open_with_pool_size(path.as_ref(), true, SERVER_READ_POOL_SIZE)
}

/// Walk the hot indexes and table pages into the OS + SQLite page caches.
/// Called once at startup so the first real queries don't pay cold-cache costs.
fn warm_cache(conn: &Connection) {
Expand Down Expand Up @@ -199,9 +211,20 @@ impl Database {
/// 1. Add a new `if version < N` block at the end (before the virtual-table section)
/// 2. End the block with `PRAGMA user_version = N;`
/// 3. Bump LATEST_VERSION
const LATEST_VERSION: i32 = 14;
const LATEST_VERSION: i32 = 15;

pub fn run_migrations(conn: &Connection) -> Result<(), AtomicCoreError> {
Self::run_migrations_internal(conn, false)
}

pub fn run_migrations_for_registry(conn: &Connection) -> Result<(), AtomicCoreError> {
Self::run_migrations_internal(conn, true)
}

fn run_migrations_internal(
conn: &Connection,
cleanup_legacy_seed_settings: bool,
) -> Result<(), AtomicCoreError> {
let version: i32 = conn.query_row("PRAGMA user_version", [], |row| row.get(0))?;

// --- V0 → V1: Baseline schema (tables + indexes) ---
Expand Down Expand Up @@ -733,6 +756,47 @@ impl Database {
)?;
}

// --- V14 → V15: Finish settings resolver migration ---
//
// Pre-this-version, every per-DB settings table was seeded with the
// entire `DEFAULT_SETTINGS` map at open time. The override-aware
// resolver in `AtomicCore::get_settings_with_source` treats any
// per-DB row as an override on top of the registry default. As long
// as those legacy rows match the registry value they look harmless
// — but the moment the user edits a setting at N=1 (which writes
// only to the registry), the registry diverges from the stale
// per-DB row and the row starts shadowing the new value, making
// edits appear to do nothing. Wipe the seeds so future resolver
// reads fall through to the registry cleanly.
//
// This cleanup is only valid for registry-backed data DBs. Standalone
// SQLite `AtomicCore::open_or_create` DBs store their canonical
// settings in this table, so plain Database opens preserve rows and
// only advance the schema version.
//
// In registry-backed deployments before this version, settings writes
// routed to the registry, so a per-DB row for one of these keys was
// definitionally a seed (never user-set). Per CLAUDE.md pre-alpha
// policy we accept erasing any in-flight per-DB overrides written by
// builds of this branch before the resolver landed.
if version < 15 {
if cleanup_legacy_seed_settings {
let keys: Vec<&str> = crate::settings::DEFAULT_SETTINGS
.iter()
.map(|(k, _)| *k)
.collect();
if !keys.is_empty() {
let placeholders = std::iter::repeat("?")
.take(keys.len())
.collect::<Vec<_>>()
.join(",");
let sql = format!("DELETE FROM settings WHERE key IN ({})", placeholders);
conn.execute(&sql, rusqlite::params_from_iter(keys.iter()))?;
}
}
conn.execute_batch("PRAGMA user_version = 15;")?;
}

// --- Triggers (recreated every startup to stay current) ---
conn.execute_batch(
"DROP TRIGGER IF EXISTS atom_tags_insert_count;
Expand Down Expand Up @@ -921,7 +985,13 @@ impl Database {
)?;
}

crate::settings::migrate_settings(conn)?;
// Per-DB settings tables intentionally start empty under the
// registry-defaults + per-DB-overrides model. Defaults live in
// `registry.db` (seeded by `Registry::open` via `migrate_settings_to`);
// per-DB rows only exist when the user explicitly overrides a value.
// Registry-backed opens run the V14→V15 cleanup above to purge any
// legacy seed rows so the resolver's "any per-DB row is an override"
// rule stays correct.

Ok(())
}
Expand Down Expand Up @@ -998,6 +1068,81 @@ mod tests {
use super::*;
use tempfile::NamedTempFile;

#[test]
fn test_v15_wipes_legacy_per_db_seed_rows() {
// Pre-V15, every per-DB connection got the entire DEFAULT_SETTINGS
// map seeded into its `settings` table. The resolver treats per-DB
// rows as overrides on top of registry defaults — the moment the
// user edits a setting at N=1 (writes to registry only), the
// registry diverges from the stale per-DB row and the row starts
// shadowing the new value, making edits look broken. V15 deletes
// those rows.
//
// Simulate the pre-V15 state by opening a fresh DB (which now runs
// *all* migrations including V15), forcibly downgrading the
// user_version, re-inserting seed rows, then re-running the
// migration.
let temp_file = NamedTempFile::new().unwrap();
let db = Database::open_or_create(temp_file.path()).unwrap();

{
let conn = db.conn.lock().unwrap();
// Force the schema back to V14 and re-seed the legacy rows.
conn.execute_batch("PRAGMA user_version = 14;").unwrap();
crate::settings::migrate_settings(&conn).unwrap();
let pre_count: i64 = conn
.query_row("SELECT COUNT(*) FROM settings", [], |row| row.get(0))
.unwrap();
assert!(
pre_count > 0,
"test setup: legacy rows should be present before V15"
);

// Run migrations again — V15 should fire and wipe the seed rows.
Database::run_migrations_for_registry(&conn).unwrap();

let version: i32 = conn
.query_row("PRAGMA user_version", [], |row| row.get(0))
.unwrap();
assert!(
version >= 15,
"user_version should be ≥15 after migration, got {version}"
);

// Every key in DEFAULT_SETTINGS must be gone — they were seeds.
for (key, _) in crate::settings::DEFAULT_SETTINGS {
let exists: bool = conn
.query_row("SELECT 1 FROM settings WHERE key = ?1", [key], |_| Ok(true))
.unwrap_or(false);
assert!(
!exists,
"V15 should have removed legacy seed row for '{key}'"
);
}
}
}

#[test]
fn test_v15_preserves_standalone_settings_rows() {
// Registry-less SQLite cores use the data DB's settings table as the
// canonical settings store. The V15 seed cleanup is only valid when a
// registry is attached, so the plain migration path must preserve
// those rows.
let temp_file = NamedTempFile::new().unwrap();
let db = Database::open_or_create(temp_file.path()).unwrap();

{
let conn = db.conn.lock().unwrap();
conn.execute_batch("PRAGMA user_version = 14;").unwrap();
crate::settings::set_setting(&conn, "chat_model", "custom/model").unwrap();

Database::run_migrations(&conn).unwrap();

let value = crate::settings::get_setting(&conn, "chat_model").unwrap();
assert_eq!(value, "custom/model");
}
}

#[test]
fn test_create_database() {
let temp_file = NamedTempFile::new().unwrap();
Expand Down
Loading
Loading