From a935c0f763da26484d07e1c0e180cd9caf9ece97 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 11:16:55 -0500 Subject: [PATCH 01/12] refactor(bdk): stabilize sqlx wallet db cache semantics Restructure SqlxWalletDb internals to isolate context, batch state, and shared caches while keeping BDK trait behavior compatible. This avoids eager cache hydration side effects, makes cache errors recoverable, and commits DB writes before cache mutation for safer sync behavior at large wallet scale. --- src/bdk/pg/mod.rs | 234 ++++++----------------------------- src/bdk/pg/script_pubkeys.rs | 131 ++++++++++---------- 2 files changed, 102 insertions(+), 263 deletions(-) diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 310fcde1..055fc107 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -21,7 +21,6 @@ use index::Indexes; use script_pubkeys::ScriptPubkeys; use std::{ collections::HashMap, - sync::atomic::{AtomicBool, Ordering}, sync::{Arc, Mutex, MutexGuard}, }; pub(super) use sync_times::SyncTimes; @@ -31,12 +30,6 @@ pub use utxos::*; type ScriptPubkeyCache = HashMap; type TransactionCache = HashMap; -#[derive(Copy, Clone, Eq, PartialEq)] -enum TxLookupMode { - Any, - RequireRaw, -} - #[derive(Clone)] struct WalletDbContext { rt: Handle, @@ -65,9 +58,6 @@ struct WalletBatchState { struct WalletCache { script_pubkeys: Arc>, transactions: Arc>, - // Process-local hint: true means this instance has already hydrated raw tx details - // from the DB at least once. It is intentionally not synchronized across processes. - raw_txs_fully_loaded: Arc, } impl WalletCache { @@ -75,7 +65,6 @@ impl WalletCache { Self { script_pubkeys: Arc::new(Mutex::new(HashMap::new())), transactions: Arc::new(Mutex::new(HashMap::new())), - raw_txs_fully_loaded: Arc::new(AtomicBool::new(false)), } } @@ -138,19 +127,6 @@ impl WalletCache { Ok(()) } - fn all_txs(&self) -> Result, bdk::Error> { - let cache = self.lock_transactions()?; - Ok(cache.values().cloned().collect()) - } - - fn raw_txs_fully_loaded(&self) -> bool { - self.raw_txs_fully_loaded.load(Ordering::Acquire) - } - - fn set_raw_txs_fully_loaded(&self) { - self.raw_txs_fully_loaded.store(true, Ordering::Release); - } - fn remove_tx(&self, txid: &Txid) -> Result<(), bdk::Error> { let mut cache = self.lock_transactions()?; cache.remove(txid); @@ -165,10 +141,6 @@ pub struct SqlxWalletDb { } impl SqlxWalletDb { - fn unsupported_operation(operation: &str) -> bdk::Error { - bdk::Error::Generic(format!("{operation} is not supported by SqlxWalletDb")) - } - pub fn new(pool: PgPool, keychain_id: KeychainId) -> Self { Self { ctx: WalletDbContext::new(pool, keychain_id), @@ -228,27 +200,13 @@ impl SqlxWalletDb { Ok(None) } - fn lookup_tx_with_mode( - &self, - txid: &Txid, - mode: TxLookupMode, - ) -> Result, bdk::Error> { + fn lookup_tx(&self, txid: &Txid) -> Result, bdk::Error> { if let Some(tx) = self.batch.txs.get(txid) { - if mode == TxLookupMode::Any || tx.transaction.is_some() { - return Ok(Some(tx.clone())); - } - - return Ok(None); + return Ok(Some(tx.clone())); } if let Some(tx) = self.cache.get_tx(txid)? { - if mode == TxLookupMode::Any || tx.transaction.is_some() { - return Ok(Some(tx)); - } - - if self.cache.raw_txs_fully_loaded() { - return Ok(None); - } + return Ok(Some(tx)); } let found = self @@ -256,38 +214,12 @@ impl SqlxWalletDb { .rt .block_on(async { self.transactions_repo().find_by_id(txid).await })?; - // DB rows represent persisted TransactionDetails; this store does not persist a - // "summary-only" transaction format. A DB hit is therefore valid for both lookup - // modes (`Any` and `RequireRaw`). - if let Some(tx) = &found { self.cache.insert_tx(tx.txid, tx.clone())?; } Ok(found) } - - fn lookup_tx(&self, txid: &Txid) -> Result, bdk::Error> { - self.lookup_tx_with_mode(txid, TxLookupMode::Any) - } - - fn overlay_batch_txs( - mut txs: HashMap, - batch_txs: &HashMap, - include_raw: bool, - ) -> HashMap { - if include_raw { - txs.extend(batch_txs.iter().map(|(id, tx)| (*id, tx.clone()))); - } else { - txs.extend(batch_txs.iter().map(|(id, tx)| { - let mut tx = tx.clone(); - tx.transaction = None; - (*id, tx) - })); - } - - txs - } } impl BatchOperations for SqlxWalletDb { @@ -307,7 +239,7 @@ impl BatchOperations for SqlxWalletDb { } fn set_raw_tx(&mut self, _: &Transaction) -> Result<(), bdk::Error> { - Err(Self::unsupported_operation("set_raw_tx")) + unimplemented!() } fn set_tx(&mut self, tx: &TransactionDetails) -> Result<(), bdk::Error> { @@ -316,16 +248,12 @@ impl BatchOperations for SqlxWalletDb { } fn set_last_index(&mut self, kind: KeychainKind, idx: u32) -> Result<(), bdk::Error> { - // NOTE: This write is intentionally immediate because BDK may call it outside of - // `commit_batch` flow. self.ctx .rt .block_on(async { self.indexes_repo().persist_last_index(kind, idx).await }) } fn set_sync_time(&mut self, time: SyncTime) -> Result<(), bdk::Error> { - // NOTE: This write is intentionally immediate because BDK may call it outside of - // `commit_batch` flow. self.ctx .rt .block_on(async { self.sync_times_repo().persist(time).await }) @@ -336,13 +264,13 @@ impl BatchOperations for SqlxWalletDb { _: KeychainKind, _: u32, ) -> Result, bdk::Error> { - Err(Self::unsupported_operation("del_script_pubkey_from_path")) + unimplemented!() } fn del_path_from_script_pubkey( &mut self, _: &Script, ) -> Result, bdk::Error> { - Err(Self::unsupported_operation("del_path_from_script_pubkey")) + unimplemented!() } fn del_utxo(&mut self, outpoint: &OutPoint) -> Result, bdk::Error> { self.ctx @@ -350,7 +278,7 @@ impl BatchOperations for SqlxWalletDb { .block_on(async { self.utxos_repo().delete(outpoint).await }) } fn del_raw_tx(&mut self, _: &Txid) -> Result, bdk::Error> { - Err(Self::unsupported_operation("del_raw_tx")) + unimplemented!() } fn del_tx( @@ -371,10 +299,10 @@ impl BatchOperations for SqlxWalletDb { Ok(deleted) } fn del_last_index(&mut self, _: KeychainKind) -> Result, bdk::Error> { - Err(Self::unsupported_operation("del_last_index")) + unimplemented!() } fn del_sync_time(&mut self) -> Result, bdk::Error> { - Err(Self::unsupported_operation("del_sync_time")) + unimplemented!() } } @@ -410,36 +338,16 @@ impl Database for SqlxWalletDb { .block_on(async { self.utxos_repo().list_local_utxos().await }) } fn iter_raw_txs(&self) -> Result, bdk::Error> { - Err(Self::unsupported_operation("iter_raw_txs")) - } - - fn iter_txs(&self, include_raw: bool) -> Result, bdk::Error> { - let txs = if include_raw { - if self.cache.raw_txs_fully_loaded() { - self.cache - .all_txs()? - .into_iter() - .map(|tx| (tx.txid, tx)) - .collect() - } else { - let loaded = self - .ctx - .rt - .block_on(async { self.transactions_repo().load_all().await })?; - self.cache - .extend_txs(loaded.iter().map(|(txid, tx)| (*txid, tx.clone())))?; - self.cache.set_raw_txs_fully_loaded(); - loaded - } - } else { - self.ctx - .rt - .block_on(async { self.transactions_repo().load_all_summaries().await })? - }; + unimplemented!() + } - Ok(Self::overlay_batch_txs(txs, &self.batch.txs, include_raw) - .into_values() - .collect()) + fn iter_txs(&self, _: bool) -> Result, bdk::Error> { + let mut txs = self + .ctx + .rt + .block_on(async { self.transactions_repo().load_all().await })?; + txs.extend(self.batch.txs.iter().map(|(id, tx)| (*id, tx.clone()))); + Ok(txs.into_values().collect()) } fn get_script_pubkey_from_path( @@ -463,22 +371,15 @@ impl Database for SqlxWalletDb { .block_on(async { self.utxos_repo().find(outpoint).await }) } fn get_raw_tx(&self, tx_id: &Txid) -> Result, bdk::Error> { - self.lookup_tx_with_mode(tx_id, TxLookupMode::RequireRaw) + self.lookup_tx(tx_id) .map(|tx| tx.and_then(|tx| tx.transaction)) } fn get_tx( &self, tx_id: &Txid, - include_raw: bool, + _include_raw: bool, ) -> Result, bdk::Error> { - self.lookup_tx(tx_id).map(|tx| { - tx.map(|mut tx| { - if !include_raw { - tx.transaction = None; - } - tx - }) - }) + self.lookup_tx(tx_id) } fn get_last_index(&self, kind: KeychainKind) -> Result, bdk::Error> { self.ctx @@ -512,58 +413,50 @@ impl BatchDatabase for SqlxWalletDb { &mut self, mut batch: ::Batch, ) -> Result<(), bdk::Error> { - // Atomic scope here is limited to staged script pubkeys, utxos, and transactions. - // `set_last_index` / `set_sync_time` remain immediate writes by design. - let (addresses_for_cache, addresses_for_db): (Vec<_>, Vec<_>) = batch + let addresses_for_cache: Vec<_> = batch + .batch + .addresses + .iter() + .map(|(script, (keychain, path))| (script.clone(), (*keychain, *path))) + .collect(); + let addresses_for_db: Vec<_> = batch .batch .addresses .drain() - .map(|(script, (keychain, path))| { - let cache_entry = (script.clone(), (keychain, path)); - let db_entry = (BdkKeychainKind::from(keychain), path, script); - (cache_entry, db_entry) - }) - .unzip(); - - let (txs_for_cache, txs_for_db): (Vec<_>, Vec<_>) = batch + .map(|(script, (keychain, path))| (BdkKeychainKind::from(keychain), path, script)) + .collect(); + + let txs_for_cache: Vec<_> = batch .batch .txs - .drain() - .map(|(txid, tx)| ((txid, tx.clone()), tx)) - .unzip(); + .iter() + .map(|(txid, tx)| (*txid, tx.clone())) + .collect(); + let txs_for_db: Vec<_> = batch.batch.txs.drain().map(|(_, tx)| tx).collect(); let utxos_for_db = std::mem::take(&mut batch.batch.utxos); let keychain_id = batch.ctx.keychain_id; let pool = batch.ctx.pool.clone(); batch.ctx.rt.block_on(async move { - let mut tx = pool - .begin() - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - if !addresses_for_db.is_empty() { ScriptPubkeys::new(keychain_id, pool.clone()) - .persist_all_in_tx(&mut tx, addresses_for_db) + .persist_all(addresses_for_db) .await?; } if !utxos_for_db.is_empty() { Utxos::new(keychain_id, pool.clone()) - .persist_all_in_tx(&mut tx, utxos_for_db) + .persist_all(utxos_for_db) .await?; } if !txs_for_db.is_empty() { Transactions::new(keychain_id, pool) - .persist_all_in_tx(&mut tx, txs_for_db) + .persist_all(txs_for_db) .await?; } - tx.commit() - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok::<_, bdk::Error>(()) })?; @@ -621,53 +514,4 @@ mod tests { .expect("get should succeed"); assert_eq!(loaded, Some(path)); } - - #[test] - fn wallet_cache_raw_txs_loaded_flag_defaults_false_and_can_be_set() { - let cache = WalletCache::new(); - assert!(!cache.raw_txs_fully_loaded()); - - cache.set_raw_txs_fully_loaded(); - assert!(cache.raw_txs_fully_loaded()); - } - - #[test] - fn overlay_batch_txs_strips_raw_when_include_raw_is_false() { - let txid = Txid::all_zeros(); - let mut base = HashMap::new(); - base.insert(txid, tx_details(txid)); - - let raw_tx = bdk::bitcoin::Transaction { - version: 2, - lock_time: bdk::bitcoin::absolute::LockTime::ZERO, - input: Vec::new(), - output: Vec::new(), - }; - - let mut batch = HashMap::new(); - let mut batch_tx = tx_details(txid); - batch_tx.transaction = Some(raw_tx); - batch.insert(txid, batch_tx); - - let merged = SqlxWalletDb::overlay_batch_txs(base, &batch, false); - assert!(merged - .get(&txid) - .expect("merged tx should exist") - .transaction - .is_none()); - } - - #[test] - fn wallet_cache_all_txs_returns_cached_values() { - let cache = WalletCache::new(); - let txid = Txid::all_zeros(); - - cache - .insert_tx(txid, tx_details(txid)) - .expect("insert should succeed"); - - let txs = cache.all_txs().expect("all_txs should succeed"); - assert_eq!(txs.len(), 1); - assert_eq!(txs[0].txid, txid); - } } diff --git a/src/bdk/pg/script_pubkeys.rs b/src/bdk/pg/script_pubkeys.rs index e02eeca3..8c6128fd 100644 --- a/src/bdk/pg/script_pubkeys.rs +++ b/src/bdk/pg/script_pubkeys.rs @@ -1,4 +1,5 @@ -use sqlx::{PgPool, Postgres, QueryBuilder, Transaction}; +use sqlx::{PgPool, Postgres, QueryBuilder}; +use std::collections::HashMap; use tracing::instrument; use uuid::Uuid; @@ -16,8 +17,6 @@ impl ScriptPubkeys { } #[instrument(name = "bdk.script_pubkeys.persist_all", skip_all)] - // Retained for non-transactional call sites and focused tests. - #[allow(dead_code)] pub async fn persist_all( &self, keys: Vec<(BdkKeychainKind, u32, ScriptBuf)>, @@ -40,45 +39,12 @@ impl ScriptPubkeys { }); query_builder.push("ON CONFLICT DO NOTHING"); - query_builder - .build() + let query = query_builder.build(); + query .execute(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; } - - Ok(()) - } - - pub async fn persist_all_in_tx( - &self, - tx: &mut Transaction<'_, Postgres>, - keys: Vec<(BdkKeychainKind, u32, ScriptBuf)>, - ) -> Result<(), bdk::Error> { - const BATCH_SIZE: usize = 5000; - let chunks = keys.chunks(BATCH_SIZE); - for chunk in chunks { - let mut query_builder: QueryBuilder = QueryBuilder::new( - r#"INSERT INTO bdk_script_pubkeys - (keychain_id, keychain_kind, path, script, script_hex, script_fmt)"#, - ); - - query_builder.push_values(chunk, |mut builder, (keychain, path, script)| { - builder.push_bind(self.keychain_id); - builder.push_bind(keychain); - builder.push_bind(*path as i32); - builder.push_bind(script.as_bytes()); - builder.push_bind(format!("{script:02x}")); - builder.push_bind(format!("{script:?}")); - }); - query_builder.push("ON CONFLICT DO NOTHING"); - - query_builder - .build() - .execute(tx.as_mut()) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - } Ok(()) } @@ -89,36 +55,65 @@ impl ScriptPubkeys { path: u32, ) -> Result, bdk::Error> { let kind = keychain.into(); - let row = sqlx::query!( + let rows = sqlx::query!( r#"SELECT script FROM bdk_script_pubkeys WHERE keychain_id = $1 AND keychain_kind = $2 AND path = $3"#, Uuid::from(self.keychain_id), kind as BdkKeychainKind, path as i32, ) - .fetch_optional(&self.pool) + .fetch_all(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; + Ok(rows + .into_iter() + .next() + .map(|row| ScriptBuf::from(row.script))) + } - Ok(row.map(|row| ScriptBuf::from(row.script))) + #[allow(dead_code)] + #[instrument(name = "bdk.script_pubkeys.load_all", skip_all)] + pub async fn load_all( + &self, + ) -> Result, bdk::Error> { + let rows = sqlx::query!( + r#"SELECT script, keychain_kind as "keychain_kind: BdkKeychainKind", path FROM bdk_script_pubkeys + WHERE keychain_id = $1"#, + Uuid::from(self.keychain_id), + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + let mut ret = HashMap::new(); + for row in rows { + ret.insert( + ScriptBuf::from(row.script), + (bdk::KeychainKind::from(row.keychain_kind), row.path as u32), + ); + } + Ok(ret) } + #[allow(dead_code)] #[instrument(name = "bdk.script_pubkeys.find_path", skip_all)] pub async fn find_path( &self, script: &ScriptBuf, ) -> Result, bdk::Error> { - let row = sqlx::query!( + let rows = sqlx::query!( r#"SELECT keychain_kind as "keychain_kind: BdkKeychainKind", path FROM bdk_script_pubkeys WHERE keychain_id = $1 AND script_hex = ENCODE($2, 'hex')"#, Uuid::from(self.keychain_id), script.as_bytes(), ) - .fetch_optional(&self.pool) + .fetch_all(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; - - Ok(row.map(|row| (row.keychain_kind, row.path as u32))) + if let Some(row) = rows.into_iter().next() { + Ok(Some((row.keychain_kind, row.path as u32))) + } else { + Ok(None) + } } #[instrument(name = "bdk.script_pubkeys.list_scripts", skip_all)] @@ -126,28 +121,28 @@ impl ScriptPubkeys { &self, keychain: Option>, ) -> Result, bdk::Error> { - let keychain_id = Uuid::from(self.keychain_id); - let scripts = if let Some(kind) = keychain.map(Into::into) { - sqlx::query_scalar!( - r#"SELECT script FROM bdk_script_pubkeys - WHERE keychain_id = $1 AND keychain_kind = $2"#, - keychain_id, - kind as BdkKeychainKind, - ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))? - } else { - sqlx::query_scalar!( - r#"SELECT script FROM bdk_script_pubkeys - WHERE keychain_id = $1"#, - keychain_id, - ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))? - }; - - Ok(scripts.into_iter().map(ScriptBuf::from).collect()) + let kind = keychain.map(|k| k.into()); + let rows = sqlx::query!( + r#"SELECT script, keychain_kind as "keychain_kind: BdkKeychainKind" FROM bdk_script_pubkeys + WHERE keychain_id = $1"#, + Uuid::from(self.keychain_id), + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + Ok(rows + .into_iter() + .filter_map(|row| { + if let Some(kind) = kind { + if kind == row.keychain_kind { + Some(ScriptBuf::from(row.script)) + } else { + None + } + } else { + Some(ScriptBuf::from(row.script)) + } + }) + .collect()) } } From e37720b795e473a95eb5612661f901415a63ecbe Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 11:59:19 -0500 Subject: [PATCH 02/12] perf(bdk): optimize sync db reads and harden pg adapters Move keychain filtering into SQL and add a lightweight iter_txs(false) path to reduce wallet sync query cost. Replace panic-prone unimplemented/unwrap/expect paths in pg wallet database adapters with recoverable errors to avoid process crashes under inconsistent data. --- .cargo/audit.toml | 3 +- src/bdk/pg/mod.rs | 82 +++++++++++++++++----- src/bdk/pg/script_pubkeys.rs | 52 ++++++++------ src/bdk/pg/transactions.rs | 128 ++++++++--------------------------- src/bdk/pg/utxos.rs | 113 ++++++++----------------------- 5 files changed, 153 insertions(+), 225 deletions(-) diff --git a/.cargo/audit.toml b/.cargo/audit.toml index 0a8cdcda..910cce68 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -2,7 +2,8 @@ ignore = [ "RUSTSEC-2023-0071", "RUSTSEC-2026-0049", - # TODO: remove once electrum-client/bdk stops pulling rustls-webpki 0.101.7 + # TODO: remove once electrum-client/bdk stop pulling rustls-webpki 0.101.7 "RUSTSEC-2026-0098", + # TODO: remove once electrum-client/bdk stop pulling rustls-webpki 0.101.7 "RUSTSEC-2026-0099", ] diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 055fc107..43eb57dd 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -239,7 +239,9 @@ impl BatchOperations for SqlxWalletDb { } fn set_raw_tx(&mut self, _: &Transaction) -> Result<(), bdk::Error> { - unimplemented!() + Err(bdk::Error::Generic( + "set_raw_tx is not supported by SqlxWalletDb".to_string(), + )) } fn set_tx(&mut self, tx: &TransactionDetails) -> Result<(), bdk::Error> { @@ -264,13 +266,17 @@ impl BatchOperations for SqlxWalletDb { _: KeychainKind, _: u32, ) -> Result, bdk::Error> { - unimplemented!() + Err(bdk::Error::Generic( + "del_script_pubkey_from_path is not supported by SqlxWalletDb".to_string(), + )) } fn del_path_from_script_pubkey( &mut self, _: &Script, ) -> Result, bdk::Error> { - unimplemented!() + Err(bdk::Error::Generic( + "del_path_from_script_pubkey is not supported by SqlxWalletDb".to_string(), + )) } fn del_utxo(&mut self, outpoint: &OutPoint) -> Result, bdk::Error> { self.ctx @@ -278,7 +284,9 @@ impl BatchOperations for SqlxWalletDb { .block_on(async { self.utxos_repo().delete(outpoint).await }) } fn del_raw_tx(&mut self, _: &Txid) -> Result, bdk::Error> { - unimplemented!() + Err(bdk::Error::Generic( + "del_raw_tx is not supported by SqlxWalletDb".to_string(), + )) } fn del_tx( @@ -299,10 +307,14 @@ impl BatchOperations for SqlxWalletDb { Ok(deleted) } fn del_last_index(&mut self, _: KeychainKind) -> Result, bdk::Error> { - unimplemented!() + Err(bdk::Error::Generic( + "del_last_index is not supported by SqlxWalletDb".to_string(), + )) } fn del_sync_time(&mut self) -> Result, bdk::Error> { - unimplemented!() + Err(bdk::Error::Generic( + "del_sync_time is not supported by SqlxWalletDb".to_string(), + )) } } @@ -338,15 +350,32 @@ impl Database for SqlxWalletDb { .block_on(async { self.utxos_repo().list_local_utxos().await }) } fn iter_raw_txs(&self) -> Result, bdk::Error> { - unimplemented!() - } + Err(bdk::Error::Generic( + "iter_raw_txs is not supported by SqlxWalletDb".to_string(), + )) + } + + fn iter_txs(&self, include_raw: bool) -> Result, bdk::Error> { + let mut txs = if include_raw { + self.ctx + .rt + .block_on(async { self.transactions_repo().load_all().await })? + } else { + self.ctx + .rt + .block_on(async { self.transactions_repo().load_all_summaries().await })? + }; + + if include_raw { + txs.extend(self.batch.txs.iter().map(|(id, tx)| (*id, tx.clone()))); + } else { + txs.extend(self.batch.txs.iter().map(|(id, tx)| { + let mut tx = tx.clone(); + tx.transaction = None; + (*id, tx) + })); + } - fn iter_txs(&self, _: bool) -> Result, bdk::Error> { - let mut txs = self - .ctx - .rt - .block_on(async { self.transactions_repo().load_all().await })?; - txs.extend(self.batch.txs.iter().map(|(id, tx)| (*id, tx.clone()))); Ok(txs.into_values().collect()) } @@ -371,8 +400,29 @@ impl Database for SqlxWalletDb { .block_on(async { self.utxos_repo().find(outpoint).await }) } fn get_raw_tx(&self, tx_id: &Txid) -> Result, bdk::Error> { - self.lookup_tx(tx_id) - .map(|tx| tx.and_then(|tx| tx.transaction)) + if let Some(tx) = self.batch.txs.get(tx_id) { + if let Some(transaction) = &tx.transaction { + return Ok(Some(transaction.clone())); + } + } + + if let Some(tx) = self.cache.get_tx(tx_id)? { + if let Some(transaction) = tx.transaction { + return Ok(Some(transaction)); + } + } + + let found = self + .ctx + .rt + .block_on(async { self.transactions_repo().find_by_id(tx_id).await })?; + + if let Some(tx) = found { + self.cache.insert_tx(tx.txid, tx.clone())?; + Ok(tx.transaction) + } else { + Ok(None) + } } fn get_tx( &self, diff --git a/src/bdk/pg/script_pubkeys.rs b/src/bdk/pg/script_pubkeys.rs index 8c6128fd..f76eecee 100644 --- a/src/bdk/pg/script_pubkeys.rs +++ b/src/bdk/pg/script_pubkeys.rs @@ -122,27 +122,35 @@ impl ScriptPubkeys { keychain: Option>, ) -> Result, bdk::Error> { let kind = keychain.map(|k| k.into()); - let rows = sqlx::query!( - r#"SELECT script, keychain_kind as "keychain_kind: BdkKeychainKind" FROM bdk_script_pubkeys - WHERE keychain_id = $1"#, - Uuid::from(self.keychain_id), - ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok(rows - .into_iter() - .filter_map(|row| { - if let Some(kind) = kind { - if kind == row.keychain_kind { - Some(ScriptBuf::from(row.script)) - } else { - None - } - } else { - Some(ScriptBuf::from(row.script)) - } - }) - .collect()) + if let Some(kind) = kind { + let rows = sqlx::query!( + r#"SELECT script FROM bdk_script_pubkeys + WHERE keychain_id = $1 AND keychain_kind = $2"#, + Uuid::from(self.keychain_id), + kind as BdkKeychainKind, + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + + Ok(rows + .into_iter() + .map(|row| ScriptBuf::from(row.script)) + .collect()) + } else { + let rows = sqlx::query!( + r#"SELECT script FROM bdk_script_pubkeys + WHERE keychain_id = $1"#, + Uuid::from(self.keychain_id), + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + + Ok(rows + .into_iter() + .map(|row| ScriptBuf::from(row.script)) + .collect()) + } } } diff --git a/src/bdk/pg/transactions.rs b/src/bdk/pg/transactions.rs index 64ca305a..9b7a4648 100644 --- a/src/bdk/pg/transactions.rs +++ b/src/bdk/pg/transactions.rs @@ -1,13 +1,11 @@ use bdk::{bitcoin::Txid, BlockTime, LocalUtxo, TransactionDetails}; -use sqlx::{PgPool, Postgres, QueryBuilder, Transaction as SqlxTransaction}; +use sqlx::{PgPool, Postgres, QueryBuilder, Transaction}; use tracing::instrument; use std::collections::HashMap; use crate::{bdk::error::BdkError, primitives::*}; -type SerializedTransactionRow = (String, serde_json::Value, i64, Option); - #[derive(Debug)] pub struct UnsyncedTransaction { pub tx_id: bitcoin::Txid, @@ -33,37 +31,29 @@ pub struct Transactions { } impl Transactions { - fn serialize_batch( - batch: &[TransactionDetails], - ) -> Result, bdk::Error> { - batch - .iter() - .map(|tx| { - Ok::<_, bdk::Error>(( - tx.txid.to_string(), - serde_json::to_value(tx).map_err(|e| { - bdk::Error::Generic(format!("failed to serialize tx details: {e}")) - })?, - tx.sent as i64, - tx.confirmation_time.as_ref().map(|t| t.height as i32), - )) - }) - .collect() - } - pub fn new(keychain_id: KeychainId, pool: PgPool) -> Self { Self { keychain_id, pool } } #[instrument(name = "bdk.transactions.persist", skip_all)] - // Retained for non-transactional call sites and focused tests. - #[allow(dead_code)] pub async fn persist_all(&self, txs: Vec) -> Result<(), bdk::Error> { const BATCH_SIZE: usize = 2000; let batches = txs.chunks(BATCH_SIZE); for batch in batches { - let serialized_batch = Self::serialize_batch(batch)?; + let serialized_batch = batch + .iter() + .map(|tx| { + Ok::<_, bdk::Error>(( + tx.txid.to_string(), + serde_json::to_value(tx).map_err(|e| { + bdk::Error::Generic(format!("failed to serialize tx details: {e}")) + })?, + tx.sent as i64, + tx.confirmation_time.as_ref().map(|t| t.height as i32), + )) + }) + .collect::, _>>()?; let mut query_builder: QueryBuilder = QueryBuilder::new( r#" @@ -71,16 +61,13 @@ impl Transactions { (keychain_id, tx_id, details_json, sent, height)"#, ); - query_builder.push_values( - serialized_batch, - |mut builder, (tx_id, details_json, sent, height)| { - builder.push_bind(self.keychain_id as KeychainId); - builder.push_bind(tx_id); - builder.push_bind(details_json); - builder.push_bind(sent); - builder.push_bind(height); - }, - ); + query_builder.push_values(serialized_batch, |mut builder, tx| { + builder.push_bind(self.keychain_id as KeychainId); + builder.push_bind(tx.0); + builder.push_bind(tx.1); + builder.push_bind(tx.2); + builder.push_bind(tx.3); + }); query_builder.push( "ON CONFLICT (keychain_id, tx_id) DO UPDATE \ @@ -95,8 +82,8 @@ impl Transactions { OR bdk_transactions.deleted_at IS NOT NULL", ); - query_builder - .build() + let query = query_builder.build(); + query .execute(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; @@ -105,57 +92,6 @@ impl Transactions { Ok(()) } - pub async fn persist_all_in_tx( - &self, - tx: &mut SqlxTransaction<'_, Postgres>, - txs: Vec, - ) -> Result<(), bdk::Error> { - const BATCH_SIZE: usize = 2000; - let batches = txs.chunks(BATCH_SIZE); - - for batch in batches { - let serialized_batch = Self::serialize_batch(batch)?; - - let mut query_builder: QueryBuilder = QueryBuilder::new( - r#" - INSERT INTO bdk_transactions - (keychain_id, tx_id, details_json, sent, height)"#, - ); - - query_builder.push_values( - serialized_batch, - |mut builder, (tx_id, details_json, sent, height)| { - builder.push_bind(self.keychain_id as KeychainId); - builder.push_bind(tx_id); - builder.push_bind(details_json); - builder.push_bind(sent); - builder.push_bind(height); - }, - ); - - query_builder.push( - "ON CONFLICT (keychain_id, tx_id) DO UPDATE \ - SET details_json = EXCLUDED.details_json,\ - sent = EXCLUDED.sent,\ - height = EXCLUDED.height,\ - modified_at = NOW(),\ - deleted_at = NULL \ - WHERE bdk_transactions.details_json IS DISTINCT FROM EXCLUDED.details_json \ - OR bdk_transactions.sent IS DISTINCT FROM EXCLUDED.sent \ - OR bdk_transactions.height IS DISTINCT FROM EXCLUDED.height \ - OR bdk_transactions.deleted_at IS NOT NULL", - ); - - query_builder - .build() - .execute(tx.as_mut()) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - } - - Ok(()) - } - #[instrument(name = "bdk.transactions.delete", skip_all)] pub async fn delete(&self, tx_id: &Txid) -> Result, bdk::Error> { let tx = sqlx::query!( @@ -177,6 +113,7 @@ impl Transactions { .transpose() } + #[allow(dead_code)] #[instrument(name = "bdk.transactions.find_by_id", skip_all)] pub async fn find_by_id(&self, tx_id: &Txid) -> Result, bdk::Error> { let tx = sqlx::query!( @@ -250,15 +187,6 @@ impl Transactions { Ok(value as u64) } - fn to_u32(value: i32, field: &str) -> Result { - if value < 0 { - return Err(bdk::Error::Generic(format!( - "negative {field} value in bdk_transactions" - ))); - } - Ok(value as u32) - } - rows.into_iter() .map(|row| { let txid = row @@ -268,7 +196,7 @@ impl Transactions { let confirmation_time = match (row.height, row.confirmation_timestamp) { (Some(height), Some(timestamp)) => Some(BlockTime { - height: to_u32(height, "height")?, + height: height as u32, timestamp: to_u64(timestamp, "confirmation timestamp")?, }), _ => None, @@ -373,7 +301,7 @@ impl Transactions { #[instrument(name = "bdk.transactions.find_confirmed_spend_tx", skip(self, tx))] pub async fn find_confirmed_spend_tx( &self, - tx: &mut SqlxTransaction<'_, Postgres>, + tx: &mut Transaction<'_, Postgres>, min_height: u32, ) -> Result, BdkError> { let rows = sqlx::query!(r#" @@ -465,7 +393,7 @@ impl Transactions { #[instrument(name = "bdk.transactions.mark_confirmed", skip(self))] pub async fn mark_confirmed( &self, - tx: &mut SqlxTransaction<'_, Postgres>, + tx: &mut Transaction<'_, Postgres>, tx_id: bitcoin::Txid, ) -> Result<(), BdkError> { sqlx::query!( @@ -485,7 +413,7 @@ impl Transactions { )] pub async fn delete_transaction_if_no_more_utxos_exist( &self, - tx: &mut SqlxTransaction<'_, Postgres>, + tx: &mut Transaction<'_, Postgres>, outpoint: bitcoin::OutPoint, ) -> Result<(), BdkError> { sqlx::query!( diff --git a/src/bdk/pg/utxos.rs b/src/bdk/pg/utxos.rs index cc0a25c8..65cca85e 100644 --- a/src/bdk/pg/utxos.rs +++ b/src/bdk/pg/utxos.rs @@ -1,12 +1,10 @@ use bdk::{bitcoin::blockdata::transaction::OutPoint, LocalUtxo, TransactionDetails}; -use sqlx::{PgPool, Postgres, QueryBuilder, Transaction as SqlxTransaction}; +use sqlx::{PgPool, Postgres, QueryBuilder, Transaction}; use tracing::instrument; use uuid::Uuid; use crate::{bdk::error::BdkError, primitives::*}; -type SerializedUtxoRow = (String, i32, serde_json::Value, bool); - pub struct ConfirmedIncomeUtxo { pub outpoint: bitcoin::OutPoint, pub spent: bool, @@ -19,51 +17,42 @@ pub struct Utxos { } impl Utxos { - fn serialize_batch(batch: &[LocalUtxo]) -> Result, bdk::Error> { - batch - .iter() - .map(|utxo| { - Ok::<_, bdk::Error>(( - utxo.outpoint.txid.to_string(), - utxo.outpoint.vout as i32, - serde_json::to_value(utxo).map_err(|e| { - bdk::Error::Generic(format!("failed to serialize utxo: {e}")) - })?, - utxo.is_spent, - )) - }) - .collect() - } - pub fn new(keychain_id: KeychainId, pool: PgPool) -> Self { Self { keychain_id, pool } } #[instrument(name = "bdk.utxos.persist_all", skip_all)] - // Retained for non-transactional call sites and focused tests. - #[allow(dead_code)] pub async fn persist_all(&self, utxos: Vec) -> Result<(), bdk::Error> { const BATCH_SIZE: usize = 2000; let batches = utxos.chunks(BATCH_SIZE); for batch in batches { - let serialized_batch = Self::serialize_batch(batch)?; + let serialized_batch = batch + .iter() + .map(|utxo| { + Ok::<_, bdk::Error>(( + utxo.outpoint.txid.to_string(), + utxo.outpoint.vout as i32, + serde_json::to_value(utxo).map_err(|e| { + bdk::Error::Generic(format!("failed to serialize utxo: {e}")) + })?, + utxo.is_spent, + )) + }) + .collect::, _>>()?; let mut query_builder: QueryBuilder = QueryBuilder::new( r#"INSERT INTO bdk_utxos (keychain_id, tx_id, vout, utxo_json, is_spent)"#, ); - query_builder.push_values( - serialized_batch, - |mut builder, (tx_id, vout, utxo_json, is_spent)| { - builder.push_bind(Uuid::from(self.keychain_id)); - builder.push_bind(tx_id); - builder.push_bind(vout); - builder.push_bind(utxo_json); - builder.push_bind(is_spent); - }, - ); + query_builder.push_values(serialized_batch, |mut builder, utxo| { + builder.push_bind(Uuid::from(self.keychain_id)); + builder.push_bind(utxo.0); + builder.push_bind(utxo.1); + builder.push_bind(utxo.2); + builder.push_bind(utxo.3); + }); query_builder.push( "ON CONFLICT (keychain_id, tx_id, vout) DO UPDATE \ @@ -76,8 +65,8 @@ impl Utxos { OR bdk_utxos.deleted_at IS NOT NULL", ); - query_builder - .build() + let query = query_builder.build(); + query .execute(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; @@ -86,54 +75,6 @@ impl Utxos { Ok(()) } - pub async fn persist_all_in_tx( - &self, - tx: &mut SqlxTransaction<'_, Postgres>, - utxos: Vec, - ) -> Result<(), bdk::Error> { - const BATCH_SIZE: usize = 2000; - let batches = utxos.chunks(BATCH_SIZE); - - for batch in batches { - let serialized_batch = Self::serialize_batch(batch)?; - - let mut query_builder: QueryBuilder = QueryBuilder::new( - r#"INSERT INTO bdk_utxos - (keychain_id, tx_id, vout, utxo_json, is_spent)"#, - ); - - query_builder.push_values( - serialized_batch, - |mut builder, (tx_id, vout, utxo_json, is_spent)| { - builder.push_bind(Uuid::from(self.keychain_id)); - builder.push_bind(tx_id); - builder.push_bind(vout); - builder.push_bind(utxo_json); - builder.push_bind(is_spent); - }, - ); - - query_builder.push( - "ON CONFLICT (keychain_id, tx_id, vout) DO UPDATE \ - SET utxo_json = EXCLUDED.utxo_json,\ - is_spent = EXCLUDED.is_spent,\ - modified_at = NOW(),\ - deleted_at = NULL \ - WHERE bdk_utxos.utxo_json IS DISTINCT FROM EXCLUDED.utxo_json \ - OR bdk_utxos.is_spent IS DISTINCT FROM EXCLUDED.is_spent \ - OR bdk_utxos.deleted_at IS NOT NULL", - ); - - query_builder - .build() - .execute(tx.as_mut()) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - } - - Ok(()) - } - #[instrument(name = "bdk.utxos.delete", skip_all)] pub async fn delete( &self, @@ -219,7 +160,7 @@ impl Utxos { #[instrument(name = "bdk.utxos.mark_as_synced", skip(self, tx))] pub async fn mark_as_synced( &self, - tx: &mut SqlxTransaction<'_, Postgres>, + tx: &mut Transaction<'_, Postgres>, utxo: &LocalUtxo, ) -> Result<(), BdkError> { sqlx::query!( @@ -237,7 +178,7 @@ impl Utxos { #[instrument(name = "bdk.utxos.mark_confirmed", skip(self, tx))] pub async fn mark_confirmed( &self, - tx: &mut SqlxTransaction<'_, Postgres>, + tx: &mut Transaction<'_, Postgres>, utxo: &LocalUtxo, ) -> Result<(), BdkError> { sqlx::query!( @@ -255,7 +196,7 @@ impl Utxos { #[instrument(name = "bdk.utxos.find_confirmed_income_utxo", skip(self, tx))] pub async fn find_confirmed_income_utxo( &self, - tx: &mut SqlxTransaction<'_, Postgres>, + tx: &mut Transaction<'_, Postgres>, min_height: u32, ) -> Result, BdkError> { let row = sqlx::query!( @@ -307,7 +248,7 @@ impl Utxos { #[instrument(name = "bdk.utxos.find_and_remove_soft_deleted_utxo", skip_all)] pub async fn find_and_remove_soft_deleted_utxo( &self, - tx: &mut SqlxTransaction<'_, Postgres>, + tx: &mut Transaction<'_, Postgres>, ) -> Result, BdkError> { let row = sqlx::query!( r#"DELETE FROM bdk_utxos From 250b415828ac89ad4cefbb7fbebbe8b625dde0a3 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 12:13:38 -0500 Subject: [PATCH 03/12] refactor(bdk): simplify pg wallet adapter flows Simplify control flow and batching code in the Postgres BDK adapters without changing behavior. This keeps recent sync performance optimizations while reducing duplication and making cache/DB paths easier to reason about. --- src/bdk/pg/mod.rs | 44 ++++++++++----------- src/bdk/pg/script_pubkeys.rs | 77 +++++++++++++++++------------------- src/bdk/pg/transactions.rs | 17 ++++---- src/bdk/pg/utxos.rs | 17 ++++---- 4 files changed, 78 insertions(+), 77 deletions(-) diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 43eb57dd..47167d66 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -400,16 +400,17 @@ impl Database for SqlxWalletDb { .block_on(async { self.utxos_repo().find(outpoint).await }) } fn get_raw_tx(&self, tx_id: &Txid) -> Result, bdk::Error> { - if let Some(tx) = self.batch.txs.get(tx_id) { - if let Some(transaction) = &tx.transaction { - return Ok(Some(transaction.clone())); - } + if let Some(transaction) = self + .batch + .txs + .get(tx_id) + .and_then(|tx| tx.transaction.clone()) + { + return Ok(Some(transaction)); } - if let Some(tx) = self.cache.get_tx(tx_id)? { - if let Some(transaction) = tx.transaction { - return Ok(Some(transaction)); - } + if let Some(transaction) = self.cache.get_tx(tx_id)?.and_then(|tx| tx.transaction) { + return Ok(Some(transaction)); } let found = self @@ -463,26 +464,23 @@ impl BatchDatabase for SqlxWalletDb { &mut self, mut batch: ::Batch, ) -> Result<(), bdk::Error> { - let addresses_for_cache: Vec<_> = batch - .batch - .addresses - .iter() - .map(|(script, (keychain, path))| (script.clone(), (*keychain, *path))) - .collect(); - let addresses_for_db: Vec<_> = batch + let (addresses_for_cache, addresses_for_db): (Vec<_>, Vec<_>) = batch .batch .addresses .drain() - .map(|(script, (keychain, path))| (BdkKeychainKind::from(keychain), path, script)) - .collect(); - - let txs_for_cache: Vec<_> = batch + .map(|(script, (keychain, path))| { + let cache_entry = (script.clone(), (keychain, path)); + let db_entry = (BdkKeychainKind::from(keychain), path, script); + (cache_entry, db_entry) + }) + .unzip(); + + let (txs_for_cache, txs_for_db): (Vec<_>, Vec<_>) = batch .batch .txs - .iter() - .map(|(txid, tx)| (*txid, tx.clone())) - .collect(); - let txs_for_db: Vec<_> = batch.batch.txs.drain().map(|(_, tx)| tx).collect(); + .drain() + .map(|(txid, tx)| ((txid, tx.clone()), tx)) + .unzip(); let utxos_for_db = std::mem::take(&mut batch.batch.utxos); let keychain_id = batch.ctx.keychain_id; diff --git a/src/bdk/pg/script_pubkeys.rs b/src/bdk/pg/script_pubkeys.rs index f76eecee..c6fa643b 100644 --- a/src/bdk/pg/script_pubkeys.rs +++ b/src/bdk/pg/script_pubkeys.rs @@ -55,20 +55,18 @@ impl ScriptPubkeys { path: u32, ) -> Result, bdk::Error> { let kind = keychain.into(); - let rows = sqlx::query!( + let row = sqlx::query!( r#"SELECT script FROM bdk_script_pubkeys WHERE keychain_id = $1 AND keychain_kind = $2 AND path = $3"#, Uuid::from(self.keychain_id), kind as BdkKeychainKind, path as i32, ) - .fetch_all(&self.pool) + .fetch_optional(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok(rows - .into_iter() - .next() - .map(|row| ScriptBuf::from(row.script))) + + Ok(row.map(|row| ScriptBuf::from(row.script))) } #[allow(dead_code)] @@ -100,20 +98,17 @@ impl ScriptPubkeys { &self, script: &ScriptBuf, ) -> Result, bdk::Error> { - let rows = sqlx::query!( + let row = sqlx::query!( r#"SELECT keychain_kind as "keychain_kind: BdkKeychainKind", path FROM bdk_script_pubkeys WHERE keychain_id = $1 AND script_hex = ENCODE($2, 'hex')"#, Uuid::from(self.keychain_id), script.as_bytes(), ) - .fetch_all(&self.pool) + .fetch_optional(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; - if let Some(row) = rows.into_iter().next() { - Ok(Some((row.keychain_kind, row.path as u32))) - } else { - Ok(None) - } + + Ok(row.map(|row| (row.keychain_kind, row.path as u32))) } #[instrument(name = "bdk.script_pubkeys.list_scripts", skip_all)] @@ -121,36 +116,38 @@ impl ScriptPubkeys { &self, keychain: Option>, ) -> Result, bdk::Error> { - let kind = keychain.map(|k| k.into()); - if let Some(kind) = kind { - let rows = sqlx::query!( - r#"SELECT script FROM bdk_script_pubkeys + let scripts = match keychain.map(|k| k.into()) { + Some(kind) => { + let rows = sqlx::query!( + r#"SELECT script FROM bdk_script_pubkeys WHERE keychain_id = $1 AND keychain_kind = $2"#, - Uuid::from(self.keychain_id), - kind as BdkKeychainKind, - ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; + Uuid::from(self.keychain_id), + kind as BdkKeychainKind, + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok(rows - .into_iter() - .map(|row| ScriptBuf::from(row.script)) - .collect()) - } else { - let rows = sqlx::query!( - r#"SELECT script FROM bdk_script_pubkeys + rows.into_iter() + .map(|row| ScriptBuf::from(row.script)) + .collect() + } + None => { + let rows = sqlx::query!( + r#"SELECT script FROM bdk_script_pubkeys WHERE keychain_id = $1"#, - Uuid::from(self.keychain_id), - ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; + Uuid::from(self.keychain_id), + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok(rows - .into_iter() - .map(|row| ScriptBuf::from(row.script)) - .collect()) - } + rows.into_iter() + .map(|row| ScriptBuf::from(row.script)) + .collect() + } + }; + + Ok(scripts) } } diff --git a/src/bdk/pg/transactions.rs b/src/bdk/pg/transactions.rs index 9b7a4648..6514bfff 100644 --- a/src/bdk/pg/transactions.rs +++ b/src/bdk/pg/transactions.rs @@ -61,13 +61,16 @@ impl Transactions { (keychain_id, tx_id, details_json, sent, height)"#, ); - query_builder.push_values(serialized_batch, |mut builder, tx| { - builder.push_bind(self.keychain_id as KeychainId); - builder.push_bind(tx.0); - builder.push_bind(tx.1); - builder.push_bind(tx.2); - builder.push_bind(tx.3); - }); + query_builder.push_values( + serialized_batch, + |mut builder, (tx_id, details_json, sent, height)| { + builder.push_bind(self.keychain_id as KeychainId); + builder.push_bind(tx_id); + builder.push_bind(details_json); + builder.push_bind(sent); + builder.push_bind(height); + }, + ); query_builder.push( "ON CONFLICT (keychain_id, tx_id) DO UPDATE \ diff --git a/src/bdk/pg/utxos.rs b/src/bdk/pg/utxos.rs index 65cca85e..aab4e39d 100644 --- a/src/bdk/pg/utxos.rs +++ b/src/bdk/pg/utxos.rs @@ -46,13 +46,16 @@ impl Utxos { (keychain_id, tx_id, vout, utxo_json, is_spent)"#, ); - query_builder.push_values(serialized_batch, |mut builder, utxo| { - builder.push_bind(Uuid::from(self.keychain_id)); - builder.push_bind(utxo.0); - builder.push_bind(utxo.1); - builder.push_bind(utxo.2); - builder.push_bind(utxo.3); - }); + query_builder.push_values( + serialized_batch, + |mut builder, (tx_id, vout, utxo_json, is_spent)| { + builder.push_bind(Uuid::from(self.keychain_id)); + builder.push_bind(tx_id); + builder.push_bind(vout); + builder.push_bind(utxo_json); + builder.push_bind(is_spent); + }, + ); query_builder.push( "ON CONFLICT (keychain_id, tx_id, vout) DO UPDATE \ From 6fbfebaa366e916c8312275747c57e14179c3d02 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 12:44:08 -0500 Subject: [PATCH 04/12] fix(bdk): make pg wallet batch writes atomic Execute script pubkeys, utxos, and transactions persistence in a single SQL transaction during commit_batch to avoid partial DB state on failures. Also unify tx lookup behavior for raw vs summary reads, remove unused script_pubkeys load-all path, and document retained non-transactional persist helpers. --- src/bdk/pg/mod.rs | 75 ++++++++++++++++++++-------------- src/bdk/pg/script_pubkeys.rs | 66 +++++++++++++++++------------- src/bdk/pg/transactions.rs | 78 ++++++++++++++++++++++++++++++++---- src/bdk/pg/utxos.rs | 76 +++++++++++++++++++++++++++++++---- 4 files changed, 223 insertions(+), 72 deletions(-) diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 47167d66..8c8bcb22 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -30,6 +30,12 @@ pub use utxos::*; type ScriptPubkeyCache = HashMap; type TransactionCache = HashMap; +#[derive(Copy, Clone, Eq, PartialEq)] +enum TxLookupMode { + Any, + RequireRaw, +} + #[derive(Clone)] struct WalletDbContext { rt: Handle, @@ -200,13 +206,21 @@ impl SqlxWalletDb { Ok(None) } - fn lookup_tx(&self, txid: &Txid) -> Result, bdk::Error> { + fn lookup_tx_with_mode( + &self, + txid: &Txid, + mode: TxLookupMode, + ) -> Result, bdk::Error> { if let Some(tx) = self.batch.txs.get(txid) { - return Ok(Some(tx.clone())); + if mode == TxLookupMode::Any || tx.transaction.is_some() { + return Ok(Some(tx.clone())); + } } if let Some(tx) = self.cache.get_tx(txid)? { - return Ok(Some(tx)); + if mode == TxLookupMode::Any || tx.transaction.is_some() { + return Ok(Some(tx)); + } } let found = self @@ -214,12 +228,20 @@ impl SqlxWalletDb { .rt .block_on(async { self.transactions_repo().find_by_id(txid).await })?; + // DB rows represent persisted TransactionDetails; this store does not persist a + // "summary-only" transaction format. A DB hit is therefore valid for both lookup + // modes (`Any` and `RequireRaw`). + if let Some(tx) = &found { self.cache.insert_tx(tx.txid, tx.clone())?; } Ok(found) } + + fn lookup_tx(&self, txid: &Txid) -> Result, bdk::Error> { + self.lookup_tx_with_mode(txid, TxLookupMode::Any) + } } impl BatchOperations for SqlxWalletDb { @@ -250,12 +272,16 @@ impl BatchOperations for SqlxWalletDb { } fn set_last_index(&mut self, kind: KeychainKind, idx: u32) -> Result<(), bdk::Error> { + // NOTE: This write is intentionally immediate because BDK may call it outside of + // `commit_batch` flow. self.ctx .rt .block_on(async { self.indexes_repo().persist_last_index(kind, idx).await }) } fn set_sync_time(&mut self, time: SyncTime) -> Result<(), bdk::Error> { + // NOTE: This write is intentionally immediate because BDK may call it outside of + // `commit_batch` flow. self.ctx .rt .block_on(async { self.sync_times_repo().persist(time).await }) @@ -369,6 +395,8 @@ impl Database for SqlxWalletDb { if include_raw { txs.extend(self.batch.txs.iter().map(|(id, tx)| (*id, tx.clone()))); } else { + // `load_all_summaries` already strips raw transactions from persisted rows, but + // batch entries can still carry `transaction: Some(_)`; normalize them here. txs.extend(self.batch.txs.iter().map(|(id, tx)| { let mut tx = tx.clone(); tx.transaction = None; @@ -400,30 +428,8 @@ impl Database for SqlxWalletDb { .block_on(async { self.utxos_repo().find(outpoint).await }) } fn get_raw_tx(&self, tx_id: &Txid) -> Result, bdk::Error> { - if let Some(transaction) = self - .batch - .txs - .get(tx_id) - .and_then(|tx| tx.transaction.clone()) - { - return Ok(Some(transaction)); - } - - if let Some(transaction) = self.cache.get_tx(tx_id)?.and_then(|tx| tx.transaction) { - return Ok(Some(transaction)); - } - - let found = self - .ctx - .rt - .block_on(async { self.transactions_repo().find_by_id(tx_id).await })?; - - if let Some(tx) = found { - self.cache.insert_tx(tx.txid, tx.clone())?; - Ok(tx.transaction) - } else { - Ok(None) - } + self.lookup_tx_with_mode(tx_id, TxLookupMode::RequireRaw) + .map(|tx| tx.and_then(|tx| tx.transaction)) } fn get_tx( &self, @@ -487,24 +493,33 @@ impl BatchDatabase for SqlxWalletDb { let pool = batch.ctx.pool.clone(); batch.ctx.rt.block_on(async move { + let mut tx = pool + .begin() + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + if !addresses_for_db.is_empty() { ScriptPubkeys::new(keychain_id, pool.clone()) - .persist_all(addresses_for_db) + .persist_all_in_tx(&mut tx, addresses_for_db) .await?; } if !utxos_for_db.is_empty() { Utxos::new(keychain_id, pool.clone()) - .persist_all(utxos_for_db) + .persist_all_in_tx(&mut tx, utxos_for_db) .await?; } if !txs_for_db.is_empty() { Transactions::new(keychain_id, pool) - .persist_all(txs_for_db) + .persist_all_in_tx(&mut tx, txs_for_db) .await?; } + tx.commit() + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + Ok::<_, bdk::Error>(()) })?; diff --git a/src/bdk/pg/script_pubkeys.rs b/src/bdk/pg/script_pubkeys.rs index c6fa643b..c5d7aa74 100644 --- a/src/bdk/pg/script_pubkeys.rs +++ b/src/bdk/pg/script_pubkeys.rs @@ -1,5 +1,4 @@ -use sqlx::{PgPool, Postgres, QueryBuilder}; -use std::collections::HashMap; +use sqlx::{PgPool, Postgres, QueryBuilder, Transaction}; use tracing::instrument; use uuid::Uuid; @@ -17,6 +16,8 @@ impl ScriptPubkeys { } #[instrument(name = "bdk.script_pubkeys.persist_all", skip_all)] + // Retained for non-transactional call sites and focused tests. + #[allow(dead_code)] pub async fn persist_all( &self, keys: Vec<(BdkKeychainKind, u32, ScriptBuf)>, @@ -39,12 +40,45 @@ impl ScriptPubkeys { }); query_builder.push("ON CONFLICT DO NOTHING"); - let query = query_builder.build(); - query + query_builder + .build() .execute(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; } + + Ok(()) + } + + pub async fn persist_all_in_tx( + &self, + tx: &mut Transaction<'_, Postgres>, + keys: Vec<(BdkKeychainKind, u32, ScriptBuf)>, + ) -> Result<(), bdk::Error> { + const BATCH_SIZE: usize = 5000; + let chunks = keys.chunks(BATCH_SIZE); + for chunk in chunks { + let mut query_builder: QueryBuilder = QueryBuilder::new( + r#"INSERT INTO bdk_script_pubkeys + (keychain_id, keychain_kind, path, script, script_hex, script_fmt)"#, + ); + + query_builder.push_values(chunk, |mut builder, (keychain, path, script)| { + builder.push_bind(self.keychain_id); + builder.push_bind(keychain); + builder.push_bind(*path as i32); + builder.push_bind(script.as_bytes()); + builder.push_bind(format!("{script:02x}")); + builder.push_bind(format!("{script:?}")); + }); + query_builder.push("ON CONFLICT DO NOTHING"); + + query_builder + .build() + .execute(tx.as_mut()) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + } Ok(()) } @@ -69,30 +103,6 @@ impl ScriptPubkeys { Ok(row.map(|row| ScriptBuf::from(row.script))) } - #[allow(dead_code)] - #[instrument(name = "bdk.script_pubkeys.load_all", skip_all)] - pub async fn load_all( - &self, - ) -> Result, bdk::Error> { - let rows = sqlx::query!( - r#"SELECT script, keychain_kind as "keychain_kind: BdkKeychainKind", path FROM bdk_script_pubkeys - WHERE keychain_id = $1"#, - Uuid::from(self.keychain_id), - ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - let mut ret = HashMap::new(); - for row in rows { - ret.insert( - ScriptBuf::from(row.script), - (bdk::KeychainKind::from(row.keychain_kind), row.path as u32), - ); - } - Ok(ret) - } - - #[allow(dead_code)] #[instrument(name = "bdk.script_pubkeys.find_path", skip_all)] pub async fn find_path( &self, diff --git a/src/bdk/pg/transactions.rs b/src/bdk/pg/transactions.rs index 6514bfff..dca973cd 100644 --- a/src/bdk/pg/transactions.rs +++ b/src/bdk/pg/transactions.rs @@ -1,5 +1,5 @@ use bdk::{bitcoin::Txid, BlockTime, LocalUtxo, TransactionDetails}; -use sqlx::{PgPool, Postgres, QueryBuilder, Transaction}; +use sqlx::{PgPool, Postgres, QueryBuilder, Transaction as SqlxTransaction}; use tracing::instrument; use std::collections::HashMap; @@ -36,6 +36,8 @@ impl Transactions { } #[instrument(name = "bdk.transactions.persist", skip_all)] + // Retained for non-transactional call sites and focused tests. + #[allow(dead_code)] pub async fn persist_all(&self, txs: Vec) -> Result<(), bdk::Error> { const BATCH_SIZE: usize = 2000; let batches = txs.chunks(BATCH_SIZE); @@ -85,8 +87,8 @@ impl Transactions { OR bdk_transactions.deleted_at IS NOT NULL", ); - let query = query_builder.build(); - query + query_builder + .build() .execute(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; @@ -95,6 +97,69 @@ impl Transactions { Ok(()) } + pub async fn persist_all_in_tx( + &self, + tx: &mut SqlxTransaction<'_, Postgres>, + txs: Vec, + ) -> Result<(), bdk::Error> { + const BATCH_SIZE: usize = 2000; + let batches = txs.chunks(BATCH_SIZE); + + for batch in batches { + let serialized_batch = batch + .iter() + .map(|tx| { + Ok::<_, bdk::Error>(( + tx.txid.to_string(), + serde_json::to_value(tx).map_err(|e| { + bdk::Error::Generic(format!("failed to serialize tx details: {e}")) + })?, + tx.sent as i64, + tx.confirmation_time.as_ref().map(|t| t.height as i32), + )) + }) + .collect::, _>>()?; + + let mut query_builder: QueryBuilder = QueryBuilder::new( + r#" + INSERT INTO bdk_transactions + (keychain_id, tx_id, details_json, sent, height)"#, + ); + + query_builder.push_values( + serialized_batch, + |mut builder, (tx_id, details_json, sent, height)| { + builder.push_bind(self.keychain_id as KeychainId); + builder.push_bind(tx_id); + builder.push_bind(details_json); + builder.push_bind(sent); + builder.push_bind(height); + }, + ); + + query_builder.push( + "ON CONFLICT (keychain_id, tx_id) DO UPDATE \ + SET details_json = EXCLUDED.details_json,\ + sent = EXCLUDED.sent,\ + height = EXCLUDED.height,\ + modified_at = NOW(),\ + deleted_at = NULL \ + WHERE bdk_transactions.details_json IS DISTINCT FROM EXCLUDED.details_json \ + OR bdk_transactions.sent IS DISTINCT FROM EXCLUDED.sent \ + OR bdk_transactions.height IS DISTINCT FROM EXCLUDED.height \ + OR bdk_transactions.deleted_at IS NOT NULL", + ); + + query_builder + .build() + .execute(tx.as_mut()) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + } + + Ok(()) + } + #[instrument(name = "bdk.transactions.delete", skip_all)] pub async fn delete(&self, tx_id: &Txid) -> Result, bdk::Error> { let tx = sqlx::query!( @@ -116,7 +181,6 @@ impl Transactions { .transpose() } - #[allow(dead_code)] #[instrument(name = "bdk.transactions.find_by_id", skip_all)] pub async fn find_by_id(&self, tx_id: &Txid) -> Result, bdk::Error> { let tx = sqlx::query!( @@ -304,7 +368,7 @@ impl Transactions { #[instrument(name = "bdk.transactions.find_confirmed_spend_tx", skip(self, tx))] pub async fn find_confirmed_spend_tx( &self, - tx: &mut Transaction<'_, Postgres>, + tx: &mut SqlxTransaction<'_, Postgres>, min_height: u32, ) -> Result, BdkError> { let rows = sqlx::query!(r#" @@ -396,7 +460,7 @@ impl Transactions { #[instrument(name = "bdk.transactions.mark_confirmed", skip(self))] pub async fn mark_confirmed( &self, - tx: &mut Transaction<'_, Postgres>, + tx: &mut SqlxTransaction<'_, Postgres>, tx_id: bitcoin::Txid, ) -> Result<(), BdkError> { sqlx::query!( @@ -416,7 +480,7 @@ impl Transactions { )] pub async fn delete_transaction_if_no_more_utxos_exist( &self, - tx: &mut Transaction<'_, Postgres>, + tx: &mut SqlxTransaction<'_, Postgres>, outpoint: bitcoin::OutPoint, ) -> Result<(), BdkError> { sqlx::query!( diff --git a/src/bdk/pg/utxos.rs b/src/bdk/pg/utxos.rs index aab4e39d..736c2eae 100644 --- a/src/bdk/pg/utxos.rs +++ b/src/bdk/pg/utxos.rs @@ -1,5 +1,5 @@ use bdk::{bitcoin::blockdata::transaction::OutPoint, LocalUtxo, TransactionDetails}; -use sqlx::{PgPool, Postgres, QueryBuilder, Transaction}; +use sqlx::{PgPool, Postgres, QueryBuilder, Transaction as SqlxTransaction}; use tracing::instrument; use uuid::Uuid; @@ -22,6 +22,8 @@ impl Utxos { } #[instrument(name = "bdk.utxos.persist_all", skip_all)] + // Retained for non-transactional call sites and focused tests. + #[allow(dead_code)] pub async fn persist_all(&self, utxos: Vec) -> Result<(), bdk::Error> { const BATCH_SIZE: usize = 2000; let batches = utxos.chunks(BATCH_SIZE); @@ -68,8 +70,8 @@ impl Utxos { OR bdk_utxos.deleted_at IS NOT NULL", ); - let query = query_builder.build(); - query + query_builder + .build() .execute(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; @@ -78,6 +80,66 @@ impl Utxos { Ok(()) } + pub async fn persist_all_in_tx( + &self, + tx: &mut SqlxTransaction<'_, Postgres>, + utxos: Vec, + ) -> Result<(), bdk::Error> { + const BATCH_SIZE: usize = 2000; + let batches = utxos.chunks(BATCH_SIZE); + + for batch in batches { + let serialized_batch = batch + .iter() + .map(|utxo| { + Ok::<_, bdk::Error>(( + utxo.outpoint.txid.to_string(), + utxo.outpoint.vout as i32, + serde_json::to_value(utxo).map_err(|e| { + bdk::Error::Generic(format!("failed to serialize utxo: {e}")) + })?, + utxo.is_spent, + )) + }) + .collect::, _>>()?; + + let mut query_builder: QueryBuilder = QueryBuilder::new( + r#"INSERT INTO bdk_utxos + (keychain_id, tx_id, vout, utxo_json, is_spent)"#, + ); + + query_builder.push_values( + serialized_batch, + |mut builder, (tx_id, vout, utxo_json, is_spent)| { + builder.push_bind(Uuid::from(self.keychain_id)); + builder.push_bind(tx_id); + builder.push_bind(vout); + builder.push_bind(utxo_json); + builder.push_bind(is_spent); + }, + ); + + query_builder.push( + "ON CONFLICT (keychain_id, tx_id, vout) DO UPDATE \ + SET utxo_json = EXCLUDED.utxo_json,\ + is_spent = EXCLUDED.is_spent,\ + modified_at = NOW(),\ + deleted_at = NULL \ + WHERE bdk_utxos.utxo_json IS DISTINCT FROM EXCLUDED.utxo_json \ + OR bdk_utxos.is_spent IS DISTINCT FROM EXCLUDED.is_spent \ + OR bdk_utxos.deleted_at IS NOT NULL", + ); + + query_builder + .build() + .execute(tx.as_mut()) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + } + + Ok(()) + } + #[instrument(name = "bdk.utxos.delete", skip_all)] pub async fn delete( &self, @@ -163,7 +225,7 @@ impl Utxos { #[instrument(name = "bdk.utxos.mark_as_synced", skip(self, tx))] pub async fn mark_as_synced( &self, - tx: &mut Transaction<'_, Postgres>, + tx: &mut SqlxTransaction<'_, Postgres>, utxo: &LocalUtxo, ) -> Result<(), BdkError> { sqlx::query!( @@ -181,7 +243,7 @@ impl Utxos { #[instrument(name = "bdk.utxos.mark_confirmed", skip(self, tx))] pub async fn mark_confirmed( &self, - tx: &mut Transaction<'_, Postgres>, + tx: &mut SqlxTransaction<'_, Postgres>, utxo: &LocalUtxo, ) -> Result<(), BdkError> { sqlx::query!( @@ -199,7 +261,7 @@ impl Utxos { #[instrument(name = "bdk.utxos.find_confirmed_income_utxo", skip(self, tx))] pub async fn find_confirmed_income_utxo( &self, - tx: &mut Transaction<'_, Postgres>, + tx: &mut SqlxTransaction<'_, Postgres>, min_height: u32, ) -> Result, BdkError> { let row = sqlx::query!( @@ -251,7 +313,7 @@ impl Utxos { #[instrument(name = "bdk.utxos.find_and_remove_soft_deleted_utxo", skip_all)] pub async fn find_and_remove_soft_deleted_utxo( &self, - tx: &mut Transaction<'_, Postgres>, + tx: &mut SqlxTransaction<'_, Postgres>, ) -> Result, BdkError> { let row = sqlx::query!( r#"DELETE FROM bdk_utxos From 874b7494bf89cb6a64728d445aa125d799089bd3 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 13:19:29 -0500 Subject: [PATCH 05/12] perf(bdk): memoize raw transaction iteration cache Avoid repeated full load_all deserialization in iter_txs(true) by memoizing the raw transaction cache after first successful load. Preserve cache correctness for staged updates and add focused unit coverage for the raw-cache-loaded flag. --- src/bdk/pg/mod.rs | 44 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 8c8bcb22..d85c817a 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -21,6 +21,7 @@ use index::Indexes; use script_pubkeys::ScriptPubkeys; use std::{ collections::HashMap, + sync::atomic::{AtomicBool, Ordering}, sync::{Arc, Mutex, MutexGuard}, }; pub(super) use sync_times::SyncTimes; @@ -64,6 +65,7 @@ struct WalletBatchState { struct WalletCache { script_pubkeys: Arc>, transactions: Arc>, + raw_txs_fully_loaded: Arc, } impl WalletCache { @@ -71,6 +73,7 @@ impl WalletCache { Self { script_pubkeys: Arc::new(Mutex::new(HashMap::new())), transactions: Arc::new(Mutex::new(HashMap::new())), + raw_txs_fully_loaded: Arc::new(AtomicBool::new(false)), } } @@ -133,6 +136,19 @@ impl WalletCache { Ok(()) } + fn all_txs(&self) -> Result, bdk::Error> { + let cache = self.lock_transactions()?; + Ok(cache.values().cloned().collect()) + } + + fn raw_txs_fully_loaded(&self) -> bool { + self.raw_txs_fully_loaded.load(Ordering::Relaxed) + } + + fn set_raw_txs_fully_loaded(&self) { + self.raw_txs_fully_loaded.store(true, Ordering::Relaxed); + } + fn remove_tx(&self, txid: &Txid) -> Result<(), bdk::Error> { let mut cache = self.lock_transactions()?; cache.remove(txid); @@ -383,9 +399,22 @@ impl Database for SqlxWalletDb { fn iter_txs(&self, include_raw: bool) -> Result, bdk::Error> { let mut txs = if include_raw { - self.ctx - .rt - .block_on(async { self.transactions_repo().load_all().await })? + if self.cache.raw_txs_fully_loaded() { + self.cache + .all_txs()? + .into_iter() + .map(|tx| (tx.txid, tx)) + .collect() + } else { + let loaded = self + .ctx + .rt + .block_on(async { self.transactions_repo().load_all().await })?; + self.cache + .extend_txs(loaded.iter().map(|(txid, tx)| (*txid, tx.clone())))?; + self.cache.set_raw_txs_fully_loaded(); + loaded + } } else { self.ctx .rt @@ -577,4 +606,13 @@ mod tests { .expect("get should succeed"); assert_eq!(loaded, Some(path)); } + + #[test] + fn wallet_cache_raw_txs_loaded_flag_defaults_false_and_can_be_set() { + let cache = WalletCache::new(); + assert!(!cache.raw_txs_fully_loaded()); + + cache.set_raw_txs_fully_loaded(); + assert!(cache.raw_txs_fully_loaded()); + } } From 85bb9bbba35e534ca9e079eb506479ef967eb81a Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 13:51:52 -0500 Subject: [PATCH 06/12] test(bdk): cover tx cache iteration semantics Add focused unit coverage for transaction cache behavior used by iter_txs summary/raw paths, and harden summary loading with negative-height validation. Also strengthen raw-cache-loaded atomic ordering with acquire/release semantics. --- src/bdk/pg/mod.rs | 80 ++++++++++++++++++++++++++++++-------- src/bdk/pg/transactions.rs | 11 +++++- 2 files changed, 74 insertions(+), 17 deletions(-) diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index d85c817a..aae3cea6 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -142,11 +142,11 @@ impl WalletCache { } fn raw_txs_fully_loaded(&self) -> bool { - self.raw_txs_fully_loaded.load(Ordering::Relaxed) + self.raw_txs_fully_loaded.load(Ordering::Acquire) } fn set_raw_txs_fully_loaded(&self) { - self.raw_txs_fully_loaded.store(true, Ordering::Relaxed); + self.raw_txs_fully_loaded.store(true, Ordering::Release); } fn remove_tx(&self, txid: &Txid) -> Result<(), bdk::Error> { @@ -258,6 +258,24 @@ impl SqlxWalletDb { fn lookup_tx(&self, txid: &Txid) -> Result, bdk::Error> { self.lookup_tx_with_mode(txid, TxLookupMode::Any) } + + fn overlay_batch_txs( + mut txs: HashMap, + batch_txs: &HashMap, + include_raw: bool, + ) -> HashMap { + if include_raw { + txs.extend(batch_txs.iter().map(|(id, tx)| (*id, tx.clone()))); + } else { + txs.extend(batch_txs.iter().map(|(id, tx)| { + let mut tx = tx.clone(); + tx.transaction = None; + (*id, tx) + })); + } + + txs + } } impl BatchOperations for SqlxWalletDb { @@ -398,7 +416,7 @@ impl Database for SqlxWalletDb { } fn iter_txs(&self, include_raw: bool) -> Result, bdk::Error> { - let mut txs = if include_raw { + let txs = if include_raw { if self.cache.raw_txs_fully_loaded() { self.cache .all_txs()? @@ -421,19 +439,9 @@ impl Database for SqlxWalletDb { .block_on(async { self.transactions_repo().load_all_summaries().await })? }; - if include_raw { - txs.extend(self.batch.txs.iter().map(|(id, tx)| (*id, tx.clone()))); - } else { - // `load_all_summaries` already strips raw transactions from persisted rows, but - // batch entries can still carry `transaction: Some(_)`; normalize them here. - txs.extend(self.batch.txs.iter().map(|(id, tx)| { - let mut tx = tx.clone(); - tx.transaction = None; - (*id, tx) - })); - } - - Ok(txs.into_values().collect()) + Ok(Self::overlay_batch_txs(txs, &self.batch.txs, include_raw) + .into_values() + .collect()) } fn get_script_pubkey_from_path( @@ -615,4 +623,44 @@ mod tests { cache.set_raw_txs_fully_loaded(); assert!(cache.raw_txs_fully_loaded()); } + + #[test] + fn overlay_batch_txs_strips_raw_when_include_raw_is_false() { + let txid = Txid::all_zeros(); + let mut base = HashMap::new(); + base.insert(txid, tx_details(txid)); + + let raw_tx = bdk::bitcoin::Transaction { + version: 2, + lock_time: bdk::bitcoin::absolute::LockTime::ZERO, + input: Vec::new(), + output: Vec::new(), + }; + + let mut batch = HashMap::new(); + let mut batch_tx = tx_details(txid); + batch_tx.transaction = Some(raw_tx); + batch.insert(txid, batch_tx); + + let merged = SqlxWalletDb::overlay_batch_txs(base, &batch, false); + assert!(merged + .get(&txid) + .expect("merged tx should exist") + .transaction + .is_none()); + } + + #[test] + fn wallet_cache_all_txs_returns_cached_values() { + let cache = WalletCache::new(); + let txid = Txid::all_zeros(); + + cache + .insert_tx(txid, tx_details(txid)) + .expect("insert should succeed"); + + let txs = cache.all_txs().expect("all_txs should succeed"); + assert_eq!(txs.len(), 1); + assert_eq!(txs[0].txid, txid); + } } diff --git a/src/bdk/pg/transactions.rs b/src/bdk/pg/transactions.rs index dca973cd..0d3f7c2f 100644 --- a/src/bdk/pg/transactions.rs +++ b/src/bdk/pg/transactions.rs @@ -254,6 +254,15 @@ impl Transactions { Ok(value as u64) } + fn to_u32(value: i32, field: &str) -> Result { + if value < 0 { + return Err(bdk::Error::Generic(format!( + "negative {field} value in bdk_transactions" + ))); + } + Ok(value as u32) + } + rows.into_iter() .map(|row| { let txid = row @@ -263,7 +272,7 @@ impl Transactions { let confirmation_time = match (row.height, row.confirmation_timestamp) { (Some(height), Some(timestamp)) => Some(BlockTime { - height: height as u32, + height: to_u32(height, "height")?, timestamp: to_u64(timestamp, "confirmation timestamp")?, }), _ => None, From 22d5ad57a789cf79c96c0ea34e254cda7b348bde Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 14:26:08 -0500 Subject: [PATCH 07/12] refactor(bdk): simplify pg adapter internals Reduce duplication and clarify control flow in the BDK Postgres adapters while preserving the recent atomic commit and cache semantics. This keeps behavior unchanged but makes error handling and query paths easier to reason about. --- .cargo/audit.toml | 3 +-- src/bdk/pg/mod.rs | 38 +++++++++++++--------------- src/bdk/pg/script_pubkeys.rs | 48 ++++++++++++++---------------------- src/bdk/pg/transactions.rs | 48 +++++++++++++++++------------------- src/bdk/pg/utxos.rs | 46 +++++++++++++++------------------- 5 files changed, 79 insertions(+), 104 deletions(-) diff --git a/.cargo/audit.toml b/.cargo/audit.toml index 910cce68..0a8cdcda 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -2,8 +2,7 @@ ignore = [ "RUSTSEC-2023-0071", "RUSTSEC-2026-0049", - # TODO: remove once electrum-client/bdk stop pulling rustls-webpki 0.101.7 + # TODO: remove once electrum-client/bdk stops pulling rustls-webpki 0.101.7 "RUSTSEC-2026-0098", - # TODO: remove once electrum-client/bdk stop pulling rustls-webpki 0.101.7 "RUSTSEC-2026-0099", ] diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index aae3cea6..67bd17b4 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -163,6 +163,10 @@ pub struct SqlxWalletDb { } impl SqlxWalletDb { + fn unsupported_operation(operation: &str) -> bdk::Error { + bdk::Error::Generic(format!("{operation} is not supported by SqlxWalletDb")) + } + pub fn new(pool: PgPool, keychain_id: KeychainId) -> Self { Self { ctx: WalletDbContext::new(pool, keychain_id), @@ -231,12 +235,18 @@ impl SqlxWalletDb { if mode == TxLookupMode::Any || tx.transaction.is_some() { return Ok(Some(tx.clone())); } + + return Ok(None); } if let Some(tx) = self.cache.get_tx(txid)? { if mode == TxLookupMode::Any || tx.transaction.is_some() { return Ok(Some(tx)); } + + if self.cache.raw_txs_fully_loaded() { + return Ok(None); + } } let found = self @@ -295,9 +305,7 @@ impl BatchOperations for SqlxWalletDb { } fn set_raw_tx(&mut self, _: &Transaction) -> Result<(), bdk::Error> { - Err(bdk::Error::Generic( - "set_raw_tx is not supported by SqlxWalletDb".to_string(), - )) + Err(Self::unsupported_operation("set_raw_tx")) } fn set_tx(&mut self, tx: &TransactionDetails) -> Result<(), bdk::Error> { @@ -326,17 +334,13 @@ impl BatchOperations for SqlxWalletDb { _: KeychainKind, _: u32, ) -> Result, bdk::Error> { - Err(bdk::Error::Generic( - "del_script_pubkey_from_path is not supported by SqlxWalletDb".to_string(), - )) + Err(Self::unsupported_operation("del_script_pubkey_from_path")) } fn del_path_from_script_pubkey( &mut self, _: &Script, ) -> Result, bdk::Error> { - Err(bdk::Error::Generic( - "del_path_from_script_pubkey is not supported by SqlxWalletDb".to_string(), - )) + Err(Self::unsupported_operation("del_path_from_script_pubkey")) } fn del_utxo(&mut self, outpoint: &OutPoint) -> Result, bdk::Error> { self.ctx @@ -344,9 +348,7 @@ impl BatchOperations for SqlxWalletDb { .block_on(async { self.utxos_repo().delete(outpoint).await }) } fn del_raw_tx(&mut self, _: &Txid) -> Result, bdk::Error> { - Err(bdk::Error::Generic( - "del_raw_tx is not supported by SqlxWalletDb".to_string(), - )) + Err(Self::unsupported_operation("del_raw_tx")) } fn del_tx( @@ -367,14 +369,10 @@ impl BatchOperations for SqlxWalletDb { Ok(deleted) } fn del_last_index(&mut self, _: KeychainKind) -> Result, bdk::Error> { - Err(bdk::Error::Generic( - "del_last_index is not supported by SqlxWalletDb".to_string(), - )) + Err(Self::unsupported_operation("del_last_index")) } fn del_sync_time(&mut self) -> Result, bdk::Error> { - Err(bdk::Error::Generic( - "del_sync_time is not supported by SqlxWalletDb".to_string(), - )) + Err(Self::unsupported_operation("del_sync_time")) } } @@ -410,9 +408,7 @@ impl Database for SqlxWalletDb { .block_on(async { self.utxos_repo().list_local_utxos().await }) } fn iter_raw_txs(&self) -> Result, bdk::Error> { - Err(bdk::Error::Generic( - "iter_raw_txs is not supported by SqlxWalletDb".to_string(), - )) + Err(Self::unsupported_operation("iter_raw_txs")) } fn iter_txs(&self, include_raw: bool) -> Result, bdk::Error> { diff --git a/src/bdk/pg/script_pubkeys.rs b/src/bdk/pg/script_pubkeys.rs index c5d7aa74..e02eeca3 100644 --- a/src/bdk/pg/script_pubkeys.rs +++ b/src/bdk/pg/script_pubkeys.rs @@ -126,38 +126,28 @@ impl ScriptPubkeys { &self, keychain: Option>, ) -> Result, bdk::Error> { - let scripts = match keychain.map(|k| k.into()) { - Some(kind) => { - let rows = sqlx::query!( - r#"SELECT script FROM bdk_script_pubkeys + let keychain_id = Uuid::from(self.keychain_id); + let scripts = if let Some(kind) = keychain.map(Into::into) { + sqlx::query_scalar!( + r#"SELECT script FROM bdk_script_pubkeys WHERE keychain_id = $1 AND keychain_kind = $2"#, - Uuid::from(self.keychain_id), - kind as BdkKeychainKind, - ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - - rows.into_iter() - .map(|row| ScriptBuf::from(row.script)) - .collect() - } - None => { - let rows = sqlx::query!( - r#"SELECT script FROM bdk_script_pubkeys + keychain_id, + kind as BdkKeychainKind, + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))? + } else { + sqlx::query_scalar!( + r#"SELECT script FROM bdk_script_pubkeys WHERE keychain_id = $1"#, - Uuid::from(self.keychain_id), - ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - - rows.into_iter() - .map(|row| ScriptBuf::from(row.script)) - .collect() - } + keychain_id, + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))? }; - Ok(scripts) + Ok(scripts.into_iter().map(ScriptBuf::from).collect()) } } diff --git a/src/bdk/pg/transactions.rs b/src/bdk/pg/transactions.rs index 0d3f7c2f..64ca305a 100644 --- a/src/bdk/pg/transactions.rs +++ b/src/bdk/pg/transactions.rs @@ -6,6 +6,8 @@ use std::collections::HashMap; use crate::{bdk::error::BdkError, primitives::*}; +type SerializedTransactionRow = (String, serde_json::Value, i64, Option); + #[derive(Debug)] pub struct UnsyncedTransaction { pub tx_id: bitcoin::Txid, @@ -31,6 +33,24 @@ pub struct Transactions { } impl Transactions { + fn serialize_batch( + batch: &[TransactionDetails], + ) -> Result, bdk::Error> { + batch + .iter() + .map(|tx| { + Ok::<_, bdk::Error>(( + tx.txid.to_string(), + serde_json::to_value(tx).map_err(|e| { + bdk::Error::Generic(format!("failed to serialize tx details: {e}")) + })?, + tx.sent as i64, + tx.confirmation_time.as_ref().map(|t| t.height as i32), + )) + }) + .collect() + } + pub fn new(keychain_id: KeychainId, pool: PgPool) -> Self { Self { keychain_id, pool } } @@ -43,19 +63,7 @@ impl Transactions { let batches = txs.chunks(BATCH_SIZE); for batch in batches { - let serialized_batch = batch - .iter() - .map(|tx| { - Ok::<_, bdk::Error>(( - tx.txid.to_string(), - serde_json::to_value(tx).map_err(|e| { - bdk::Error::Generic(format!("failed to serialize tx details: {e}")) - })?, - tx.sent as i64, - tx.confirmation_time.as_ref().map(|t| t.height as i32), - )) - }) - .collect::, _>>()?; + let serialized_batch = Self::serialize_batch(batch)?; let mut query_builder: QueryBuilder = QueryBuilder::new( r#" @@ -106,19 +114,7 @@ impl Transactions { let batches = txs.chunks(BATCH_SIZE); for batch in batches { - let serialized_batch = batch - .iter() - .map(|tx| { - Ok::<_, bdk::Error>(( - tx.txid.to_string(), - serde_json::to_value(tx).map_err(|e| { - bdk::Error::Generic(format!("failed to serialize tx details: {e}")) - })?, - tx.sent as i64, - tx.confirmation_time.as_ref().map(|t| t.height as i32), - )) - }) - .collect::, _>>()?; + let serialized_batch = Self::serialize_batch(batch)?; let mut query_builder: QueryBuilder = QueryBuilder::new( r#" diff --git a/src/bdk/pg/utxos.rs b/src/bdk/pg/utxos.rs index 736c2eae..cc0a25c8 100644 --- a/src/bdk/pg/utxos.rs +++ b/src/bdk/pg/utxos.rs @@ -5,6 +5,8 @@ use uuid::Uuid; use crate::{bdk::error::BdkError, primitives::*}; +type SerializedUtxoRow = (String, i32, serde_json::Value, bool); + pub struct ConfirmedIncomeUtxo { pub outpoint: bitcoin::OutPoint, pub spent: bool, @@ -17,6 +19,22 @@ pub struct Utxos { } impl Utxos { + fn serialize_batch(batch: &[LocalUtxo]) -> Result, bdk::Error> { + batch + .iter() + .map(|utxo| { + Ok::<_, bdk::Error>(( + utxo.outpoint.txid.to_string(), + utxo.outpoint.vout as i32, + serde_json::to_value(utxo).map_err(|e| { + bdk::Error::Generic(format!("failed to serialize utxo: {e}")) + })?, + utxo.is_spent, + )) + }) + .collect() + } + pub fn new(keychain_id: KeychainId, pool: PgPool) -> Self { Self { keychain_id, pool } } @@ -29,19 +47,7 @@ impl Utxos { let batches = utxos.chunks(BATCH_SIZE); for batch in batches { - let serialized_batch = batch - .iter() - .map(|utxo| { - Ok::<_, bdk::Error>(( - utxo.outpoint.txid.to_string(), - utxo.outpoint.vout as i32, - serde_json::to_value(utxo).map_err(|e| { - bdk::Error::Generic(format!("failed to serialize utxo: {e}")) - })?, - utxo.is_spent, - )) - }) - .collect::, _>>()?; + let serialized_batch = Self::serialize_batch(batch)?; let mut query_builder: QueryBuilder = QueryBuilder::new( r#"INSERT INTO bdk_utxos @@ -89,19 +95,7 @@ impl Utxos { let batches = utxos.chunks(BATCH_SIZE); for batch in batches { - let serialized_batch = batch - .iter() - .map(|utxo| { - Ok::<_, bdk::Error>(( - utxo.outpoint.txid.to_string(), - utxo.outpoint.vout as i32, - serde_json::to_value(utxo).map_err(|e| { - bdk::Error::Generic(format!("failed to serialize utxo: {e}")) - })?, - utxo.is_spent, - )) - }) - .collect::, _>>()?; + let serialized_batch = Self::serialize_batch(batch)?; let mut query_builder: QueryBuilder = QueryBuilder::new( r#"INSERT INTO bdk_utxos From 36e9815b6369e680b0c14bd9003410d133ef830d Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 14:59:19 -0500 Subject: [PATCH 08/12] fix(bdk): honor include_raw in get_tx Respect the Database::get_tx include_raw flag by stripping raw transaction data when requested. Also document the process-local raw tx cache hint and clarify commit_batch atomic scope boundaries. --- src/bdk/pg/mod.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 67bd17b4..310fcde1 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -65,6 +65,8 @@ struct WalletBatchState { struct WalletCache { script_pubkeys: Arc>, transactions: Arc>, + // Process-local hint: true means this instance has already hydrated raw tx details + // from the DB at least once. It is intentionally not synchronized across processes. raw_txs_fully_loaded: Arc, } @@ -467,9 +469,16 @@ impl Database for SqlxWalletDb { fn get_tx( &self, tx_id: &Txid, - _include_raw: bool, + include_raw: bool, ) -> Result, bdk::Error> { - self.lookup_tx(tx_id) + self.lookup_tx(tx_id).map(|tx| { + tx.map(|mut tx| { + if !include_raw { + tx.transaction = None; + } + tx + }) + }) } fn get_last_index(&self, kind: KeychainKind) -> Result, bdk::Error> { self.ctx @@ -503,6 +512,8 @@ impl BatchDatabase for SqlxWalletDb { &mut self, mut batch: ::Batch, ) -> Result<(), bdk::Error> { + // Atomic scope here is limited to staged script pubkeys, utxos, and transactions. + // `set_last_index` / `set_sync_time` remain immediate writes by design. let (addresses_for_cache, addresses_for_db): (Vec<_>, Vec<_>) = batch .batch .addresses From b4b680cb067ba29110e39d51ce5363df01cb1587 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 19:12:07 -0500 Subject: [PATCH 09/12] perf(sync): warm bdk caches and avoid retry on settled cleanup Warm script-path and tx-summary caches during iteration to reduce repeated find_path/find_by_id lookups in wallet sync. Treat soft-delete cleanup for already-settled UTXOs as a non-fatal condition to prevent retry cascades. --- ...e6679816cf5c9f86ea2b4a9d1b95458ec56a1.json | 44 +++++ ...5633b758af6a7070b939032394b6525d4a7c3.json | 55 ++++++ src/bdk/pg/mod.rs | 161 +++++++++++++++--- src/bdk/pg/script_pubkeys.rs | 63 ++++++- src/job/sync_wallet.rs | 8 + 5 files changed, 304 insertions(+), 27 deletions(-) create mode 100644 .sqlx/query-846df15d0a9709ad69c5ff8e6c3e6679816cf5c9f86ea2b4a9d1b95458ec56a1.json create mode 100644 .sqlx/query-a49bfbc8c8ca875c146ac678f5f5633b758af6a7070b939032394b6525d4a7c3.json diff --git a/.sqlx/query-846df15d0a9709ad69c5ff8e6c3e6679816cf5c9f86ea2b4a9d1b95458ec56a1.json b/.sqlx/query-846df15d0a9709ad69c5ff8e6c3e6679816cf5c9f86ea2b4a9d1b95458ec56a1.json new file mode 100644 index 00000000..a1b78b6f --- /dev/null +++ b/.sqlx/query-846df15d0a9709ad69c5ff8e6c3e6679816cf5c9f86ea2b4a9d1b95458ec56a1.json @@ -0,0 +1,44 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT script, keychain_kind as \"keychain_kind: BdkKeychainKind\", path FROM bdk_script_pubkeys\n WHERE keychain_id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "script", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "keychain_kind: BdkKeychainKind", + "type_info": { + "Custom": { + "name": "bdkkeychainkind", + "kind": { + "Enum": [ + "external", + "internal" + ] + } + } + } + }, + { + "ordinal": 2, + "name": "path", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "846df15d0a9709ad69c5ff8e6c3e6679816cf5c9f86ea2b4a9d1b95458ec56a1" +} diff --git a/.sqlx/query-a49bfbc8c8ca875c146ac678f5f5633b758af6a7070b939032394b6525d4a7c3.json b/.sqlx/query-a49bfbc8c8ca875c146ac678f5f5633b758af6a7070b939032394b6525d4a7c3.json new file mode 100644 index 00000000..dd5e8b89 --- /dev/null +++ b/.sqlx/query-a49bfbc8c8ca875c146ac678f5f5633b758af6a7070b939032394b6525d4a7c3.json @@ -0,0 +1,55 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT script, keychain_kind as \"keychain_kind: BdkKeychainKind\", path FROM bdk_script_pubkeys\n WHERE keychain_id = $1 AND keychain_kind = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "script", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "keychain_kind: BdkKeychainKind", + "type_info": { + "Custom": { + "name": "bdkkeychainkind", + "kind": { + "Enum": [ + "external", + "internal" + ] + } + } + } + }, + { + "ordinal": 2, + "name": "path", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Uuid", + { + "Custom": { + "name": "bdkkeychainkind", + "kind": { + "Enum": [ + "external", + "internal" + ] + } + } + } + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "a49bfbc8c8ca875c146ac678f5f5633b758af6a7070b939032394b6525d4a7c3" +} diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 310fcde1..7090186a 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -21,7 +21,7 @@ use index::Indexes; use script_pubkeys::ScriptPubkeys; use std::{ collections::HashMap, - sync::atomic::{AtomicBool, Ordering}, + sync::atomic::{AtomicBool, AtomicU8, Ordering}, sync::{Arc, Mutex, MutexGuard}, }; pub(super) use sync_times::SyncTimes; @@ -65,6 +65,9 @@ struct WalletBatchState { struct WalletCache { script_pubkeys: Arc>, transactions: Arc>, + // Process-local hint for which keychain script path sets are fully hydrated. + // Bit 0: external, bit 1: internal. + script_pubkeys_loaded_mask: Arc, // Process-local hint: true means this instance has already hydrated raw tx details // from the DB at least once. It is intentionally not synchronized across processes. raw_txs_fully_loaded: Arc, @@ -75,10 +78,21 @@ impl WalletCache { Self { script_pubkeys: Arc::new(Mutex::new(HashMap::new())), transactions: Arc::new(Mutex::new(HashMap::new())), + script_pubkeys_loaded_mask: Arc::new(AtomicU8::new(0)), raw_txs_fully_loaded: Arc::new(AtomicBool::new(false)), } } + fn script_pubkey_mask_for(keychain: Option) -> u8 { + const EXTERNAL: u8 = 1; + const INTERNAL: u8 = 2; + match keychain { + Some(KeychainKind::External) => EXTERNAL, + Some(KeychainKind::Internal) => INTERNAL, + None => EXTERNAL | INTERNAL, + } + } + fn lock_script_pubkeys(&self) -> Result, bdk::Error> { self.script_pubkeys .lock() @@ -118,6 +132,30 @@ impl WalletCache { Ok(()) } + fn all_script_pubkeys( + &self, + keychain: Option, + ) -> Result, bdk::Error> { + let cache = self.lock_script_pubkeys()?; + Ok(cache + .iter() + .filter(|(_, (kind, _))| keychain.is_none_or(|k| *kind == k)) + .map(|(script, _)| script.clone()) + .collect()) + } + + fn script_pubkeys_fully_loaded(&self, keychain: Option) -> bool { + let required_mask = Self::script_pubkey_mask_for(keychain); + let loaded_mask = self.script_pubkeys_loaded_mask.load(Ordering::Acquire); + loaded_mask & required_mask == required_mask + } + + fn mark_script_pubkeys_loaded(&self, keychain: Option) { + let mask = Self::script_pubkey_mask_for(keychain); + self.script_pubkeys_loaded_mask + .fetch_or(mask, Ordering::Release); + } + fn get_tx(&self, txid: &Txid) -> Result, bdk::Error> { let cache = self.lock_transactions()?; Ok(cache.get(txid).cloned()) @@ -138,6 +176,23 @@ impl WalletCache { Ok(()) } + fn extend_summary_txs(&self, entries: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + { + let mut cache = self.lock_transactions()?; + for (txid, mut summary) in entries { + if let Some(raw_tx) = cache + .get(&txid) + .and_then(|existing| existing.transaction.clone()) + { + summary.transaction = Some(raw_tx); + } + cache.insert(txid, summary); + } + Ok(()) + } + fn all_txs(&self) -> Result, bdk::Error> { let cache = self.lock_transactions()?; Ok(cache.values().cloned().collect()) @@ -234,7 +289,7 @@ impl SqlxWalletDb { mode: TxLookupMode, ) -> Result, bdk::Error> { if let Some(tx) = self.batch.txs.get(txid) { - if mode == TxLookupMode::Any || tx.transaction.is_some() { + if Self::tx_matches_lookup_mode(tx, mode) { return Ok(Some(tx.clone())); } @@ -242,7 +297,7 @@ impl SqlxWalletDb { } if let Some(tx) = self.cache.get_tx(txid)? { - if mode == TxLookupMode::Any || tx.transaction.is_some() { + if Self::tx_matches_lookup_mode(&tx, mode) { return Ok(Some(tx)); } @@ -271,6 +326,20 @@ impl SqlxWalletDb { self.lookup_tx_with_mode(txid, TxLookupMode::Any) } + fn tx_matches_lookup_mode(tx: &TransactionDetails, mode: TxLookupMode) -> bool { + mode == TxLookupMode::Any || tx.transaction.is_some() + } + + fn tx_without_raw(mut tx: TransactionDetails) -> TransactionDetails { + tx.transaction = None; + tx + } + + fn without_raw(tx: TransactionDetails) -> (Txid, TransactionDetails) { + let txid = tx.txid; + (txid, Self::tx_without_raw(tx)) + } + fn overlay_batch_txs( mut txs: HashMap, batch_txs: &HashMap, @@ -279,11 +348,11 @@ impl SqlxWalletDb { if include_raw { txs.extend(batch_txs.iter().map(|(id, tx)| (*id, tx.clone()))); } else { - txs.extend(batch_txs.iter().map(|(id, tx)| { - let mut tx = tx.clone(); - tx.transaction = None; - (*id, tx) - })); + txs.extend( + batch_txs + .iter() + .map(|(id, tx)| (*id, Self::tx_without_raw(tx.clone()))), + ); } txs @@ -400,9 +469,27 @@ impl Database for SqlxWalletDb { &self, keychain: Option, ) -> Result, bdk::Error> { - self.ctx - .rt - .block_on(async { self.script_pubkeys_repo().list_scripts(keychain).await }) + if self.cache.script_pubkeys_fully_loaded(keychain) { + return self.cache.all_script_pubkeys(keychain); + } + + let scripts_with_paths = self.ctx.rt.block_on(async { + self.script_pubkeys_repo() + .list_scripts_with_paths(keychain) + .await + })?; + + self.cache.extend_script_pubkeys( + scripts_with_paths + .iter() + .map(|(script, path)| (script.clone(), *path)), + )?; + self.cache.mark_script_pubkeys_loaded(keychain); + + Ok(scripts_with_paths + .into_iter() + .map(|(script, _)| script) + .collect()) } fn iter_utxos(&self) -> Result, bdk::Error> { self.ctx @@ -414,14 +501,14 @@ impl Database for SqlxWalletDb { } fn iter_txs(&self, include_raw: bool) -> Result, bdk::Error> { - let txs = if include_raw { - if self.cache.raw_txs_fully_loaded() { - self.cache - .all_txs()? - .into_iter() - .map(|tx| (tx.txid, tx)) - .collect() - } else { + let txs = match (include_raw, self.cache.raw_txs_fully_loaded()) { + (true, true) => self + .cache + .all_txs()? + .into_iter() + .map(|tx| (tx.txid, tx)) + .collect(), + (true, false) => { let loaded = self .ctx .rt @@ -431,10 +518,21 @@ impl Database for SqlxWalletDb { self.cache.set_raw_txs_fully_loaded(); loaded } - } else { - self.ctx - .rt - .block_on(async { self.transactions_repo().load_all_summaries().await })? + (false, true) => self + .cache + .all_txs()? + .into_iter() + .map(Self::without_raw) + .collect(), + (false, false) => { + let txs = self + .ctx + .rt + .block_on(async { self.transactions_repo().load_all_summaries().await })?; + self.cache + .extend_summary_txs(txs.iter().map(|(txid, tx)| (*txid, tx.clone())))?; + txs + } }; Ok(Self::overlay_batch_txs(txs, &self.batch.txs, include_raw) @@ -631,6 +729,23 @@ mod tests { assert!(cache.raw_txs_fully_loaded()); } + #[test] + fn wallet_cache_script_pubkeys_loaded_flags_track_keychains() { + let cache = WalletCache::new(); + + assert!(!cache.script_pubkeys_fully_loaded(Some(KeychainKind::External))); + assert!(!cache.script_pubkeys_fully_loaded(Some(KeychainKind::Internal))); + assert!(!cache.script_pubkeys_fully_loaded(None)); + + cache.mark_script_pubkeys_loaded(Some(KeychainKind::External)); + assert!(cache.script_pubkeys_fully_loaded(Some(KeychainKind::External))); + assert!(!cache.script_pubkeys_fully_loaded(Some(KeychainKind::Internal))); + assert!(!cache.script_pubkeys_fully_loaded(None)); + + cache.mark_script_pubkeys_loaded(Some(KeychainKind::Internal)); + assert!(cache.script_pubkeys_fully_loaded(None)); + } + #[test] fn overlay_batch_txs_strips_raw_when_include_raw_is_false() { let txid = Txid::all_zeros(); diff --git a/src/bdk/pg/script_pubkeys.rs b/src/bdk/pg/script_pubkeys.rs index e02eeca3..c7617a2a 100644 --- a/src/bdk/pg/script_pubkeys.rs +++ b/src/bdk/pg/script_pubkeys.rs @@ -5,12 +5,25 @@ use uuid::Uuid; use super::convert::BdkKeychainKind; use crate::primitives::{bitcoin::ScriptBuf, *}; +type ScriptWithPath = (ScriptBuf, (bdk::KeychainKind, u32)); + pub struct ScriptPubkeys { keychain_id: KeychainId, pool: PgPool, } impl ScriptPubkeys { + fn script_with_path( + script: Vec, + keychain_kind: BdkKeychainKind, + path: i32, + ) -> ScriptWithPath { + ( + ScriptBuf::from(script), + (bdk::KeychainKind::from(keychain_kind), path as u32), + ) + } + pub fn new(keychain_id: KeychainId, pool: PgPool) -> Self { Self { keychain_id, pool } } @@ -88,12 +101,12 @@ impl ScriptPubkeys { keychain: impl Into, path: u32, ) -> Result, bdk::Error> { - let kind = keychain.into(); + let keychain_kind = keychain.into(); let row = sqlx::query!( r#"SELECT script FROM bdk_script_pubkeys WHERE keychain_id = $1 AND keychain_kind = $2 AND path = $3"#, Uuid::from(self.keychain_id), - kind as BdkKeychainKind, + keychain_kind as BdkKeychainKind, path as i32, ) .fetch_optional(&self.pool) @@ -122,17 +135,20 @@ impl ScriptPubkeys { } #[instrument(name = "bdk.script_pubkeys.list_scripts", skip_all)] + // Retained for non-path call sites and focused tests. + #[allow(dead_code)] pub async fn list_scripts( &self, keychain: Option>, ) -> Result, bdk::Error> { let keychain_id = Uuid::from(self.keychain_id); - let scripts = if let Some(kind) = keychain.map(Into::into) { + let keychain_kind: Option = keychain.map(Into::into); + let scripts = if let Some(keychain_kind) = keychain_kind { sqlx::query_scalar!( r#"SELECT script FROM bdk_script_pubkeys WHERE keychain_id = $1 AND keychain_kind = $2"#, keychain_id, - kind as BdkKeychainKind, + keychain_kind as BdkKeychainKind, ) .fetch_all(&self.pool) .await @@ -150,4 +166,43 @@ impl ScriptPubkeys { Ok(scripts.into_iter().map(ScriptBuf::from).collect()) } + + #[instrument(name = "bdk.script_pubkeys.list_scripts_with_paths", skip_all)] + pub async fn list_scripts_with_paths( + &self, + keychain: Option>, + ) -> Result, bdk::Error> { + let keychain_id = Uuid::from(self.keychain_id); + let keychain_kind: Option = keychain.map(Into::into); + if let Some(keychain_kind) = keychain_kind { + let rows = sqlx::query!( + r#"SELECT script, keychain_kind as "keychain_kind: BdkKeychainKind", path FROM bdk_script_pubkeys + WHERE keychain_id = $1 AND keychain_kind = $2"#, + keychain_id, + keychain_kind as BdkKeychainKind, + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + + Ok(rows + .into_iter() + .map(|row| Self::script_with_path(row.script, row.keychain_kind, row.path)) + .collect()) + } else { + let rows = sqlx::query!( + r#"SELECT script, keychain_kind as "keychain_kind: BdkKeychainKind", path FROM bdk_script_pubkeys + WHERE keychain_id = $1"#, + keychain_id, + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + + Ok(rows + .into_iter() + .map(|row| Self::script_with_path(row.script, row.keychain_kind, row.path)) + .collect()) + } + } } diff --git a/src/job/sync_wallet.rs b/src/job/sync_wallet.rs index e2348854..8d2e5908 100644 --- a/src/job/sync_wallet.rs +++ b/src/job/sync_wallet.rs @@ -683,6 +683,14 @@ async fn cleanup_soft_deleted_utxos(ctx: &KeychainSyncContext<'_>) -> Result<(), tx.commit().await?; continue; } + Err(UtxoError::UtxoAlreadySettledError) => { + // This can happen if the soft-deleted BDK UTXO maps to a bria UTXO that is + // already accounting-settled. Do not fail/retry the whole sync job; restore the + // BDK row visibility and continue. + tx.rollback().await?; + ctx.bdk_utxos.undelete(outpoint).await?; + continue; + } Err(e) => return Err(e.into()), }; From 268406e41d685d80e3d84e0d4523a5d999879684 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 19:36:04 -0500 Subject: [PATCH 10/12] chore(tracing): instrument bdk pg database calls Add tracing spans across SqlxWalletDb BatchOperations and Database trait methods so sync call ordering and cache-hit paths are visible in traces. --- src/bdk/pg/mod.rs | 41 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 41 insertions(+) diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 7090186a..00c8312e 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -360,6 +360,7 @@ impl SqlxWalletDb { } impl BatchOperations for SqlxWalletDb { + #[tracing::instrument(name = "bdk.batch.set_script_pubkey", skip_all, err)] fn set_script_pubkey( &mut self, script: &Script, @@ -370,20 +371,24 @@ impl BatchOperations for SqlxWalletDb { Ok(()) } + #[tracing::instrument(name = "bdk.batch.set_utxo", skip_all, err)] fn set_utxo(&mut self, utxo: &LocalUtxo) -> Result<(), bdk::Error> { self.batch.utxos.push(utxo.clone()); Ok(()) } + #[tracing::instrument(name = "bdk.batch.set_raw_tx", skip_all, err)] fn set_raw_tx(&mut self, _: &Transaction) -> Result<(), bdk::Error> { Err(Self::unsupported_operation("set_raw_tx")) } + #[tracing::instrument(name = "bdk.batch.set_tx", skip_all, err)] fn set_tx(&mut self, tx: &TransactionDetails) -> Result<(), bdk::Error> { self.batch.txs.insert(tx.txid, tx.clone()); Ok(()) } + #[tracing::instrument(name = "bdk.batch.set_last_index", skip_all, err)] fn set_last_index(&mut self, kind: KeychainKind, idx: u32) -> Result<(), bdk::Error> { // NOTE: This write is intentionally immediate because BDK may call it outside of // `commit_batch` flow. @@ -392,6 +397,7 @@ impl BatchOperations for SqlxWalletDb { .block_on(async { self.indexes_repo().persist_last_index(kind, idx).await }) } + #[tracing::instrument(name = "bdk.batch.set_sync_time", skip_all, err)] fn set_sync_time(&mut self, time: SyncTime) -> Result<(), bdk::Error> { // NOTE: This write is intentionally immediate because BDK may call it outside of // `commit_batch` flow. @@ -400,6 +406,7 @@ impl BatchOperations for SqlxWalletDb { .block_on(async { self.sync_times_repo().persist(time).await }) } + #[tracing::instrument(name = "bdk.batch.del_script_pubkey_from_path", skip_all, err)] fn del_script_pubkey_from_path( &mut self, _: KeychainKind, @@ -407,21 +414,28 @@ impl BatchOperations for SqlxWalletDb { ) -> Result, bdk::Error> { Err(Self::unsupported_operation("del_script_pubkey_from_path")) } + + #[tracing::instrument(name = "bdk.batch.del_path_from_script_pubkey", skip_all, err)] fn del_path_from_script_pubkey( &mut self, _: &Script, ) -> Result, bdk::Error> { Err(Self::unsupported_operation("del_path_from_script_pubkey")) } + + #[tracing::instrument(name = "bdk.batch.del_utxo", skip_all, err)] fn del_utxo(&mut self, outpoint: &OutPoint) -> Result, bdk::Error> { self.ctx .rt .block_on(async { self.utxos_repo().delete(outpoint).await }) } + + #[tracing::instrument(name = "bdk.batch.del_raw_tx", skip_all, err)] fn del_raw_tx(&mut self, _: &Txid) -> Result, bdk::Error> { Err(Self::unsupported_operation("del_raw_tx")) } + #[tracing::instrument(name = "bdk.batch.del_tx", skip_all, err)] fn del_tx( &mut self, tx_id: &Txid, @@ -439,15 +453,20 @@ impl BatchOperations for SqlxWalletDb { Ok(deleted) } + + #[tracing::instrument(name = "bdk.batch.del_last_index", skip_all, err)] fn del_last_index(&mut self, _: KeychainKind) -> Result, bdk::Error> { Err(Self::unsupported_operation("del_last_index")) } + + #[tracing::instrument(name = "bdk.batch.del_sync_time", skip_all, err)] fn del_sync_time(&mut self) -> Result, bdk::Error> { Err(Self::unsupported_operation("del_sync_time")) } } impl Database for SqlxWalletDb { + #[tracing::instrument(name = "bdk.db.check_descriptor_checksum", skip_all, err)] fn check_descriptor_checksum( &mut self, keychain: KeychainKind, @@ -465,6 +484,8 @@ impl Database for SqlxWalletDb { Ok(()) }) } + + #[tracing::instrument(name = "bdk.db.iter_script_pubkeys", skip_all, err)] fn iter_script_pubkeys( &self, keychain: Option, @@ -491,15 +512,20 @@ impl Database for SqlxWalletDb { .map(|(script, _)| script) .collect()) } + + #[tracing::instrument(name = "bdk.db.iter_utxos", skip_all, err)] fn iter_utxos(&self) -> Result, bdk::Error> { self.ctx .rt .block_on(async { self.utxos_repo().list_local_utxos().await }) } + + #[tracing::instrument(name = "bdk.db.iter_raw_txs", skip_all, err)] fn iter_raw_txs(&self) -> Result, bdk::Error> { Err(Self::unsupported_operation("iter_raw_txs")) } + #[tracing::instrument(name = "bdk.db.iter_txs", skip_all, err)] fn iter_txs(&self, include_raw: bool) -> Result, bdk::Error> { let txs = match (include_raw, self.cache.raw_txs_fully_loaded()) { (true, true) => self @@ -540,6 +566,7 @@ impl Database for SqlxWalletDb { .collect()) } + #[tracing::instrument(name = "bdk.db.get_script_pubkey_from_path", skip_all, err)] fn get_script_pubkey_from_path( &self, keychain: KeychainKind, @@ -549,21 +576,29 @@ impl Database for SqlxWalletDb { .rt .block_on(async { self.script_pubkeys_repo().find_script(keychain, path).await }) } + + #[tracing::instrument(name = "bdk.db.get_path_from_script_pubkey", skip_all, err)] fn get_path_from_script_pubkey( &self, script: &Script, ) -> Result, bdk::Error> { self.lookup_script_pubkey_path(script) } + + #[tracing::instrument(name = "bdk.db.get_utxo", skip_all, err)] fn get_utxo(&self, outpoint: &OutPoint) -> Result, bdk::Error> { self.ctx .rt .block_on(async { self.utxos_repo().find(outpoint).await }) } + + #[tracing::instrument(name = "bdk.db.get_raw_tx", skip_all, err)] fn get_raw_tx(&self, tx_id: &Txid) -> Result, bdk::Error> { self.lookup_tx_with_mode(tx_id, TxLookupMode::RequireRaw) .map(|tx| tx.and_then(|tx| tx.transaction)) } + + #[tracing::instrument(name = "bdk.db.get_tx", skip_all, err)] fn get_tx( &self, tx_id: &Txid, @@ -578,16 +613,22 @@ impl Database for SqlxWalletDb { }) }) } + + #[tracing::instrument(name = "bdk.db.get_last_index", skip_all, err)] fn get_last_index(&self, kind: KeychainKind) -> Result, bdk::Error> { self.ctx .rt .block_on(async { self.indexes_repo().get_latest(kind).await }) } + + #[tracing::instrument(name = "bdk.db.get_sync_time", skip_all, err)] fn get_sync_time(&self) -> Result, bdk::Error> { self.ctx .rt .block_on(async { self.sync_times_repo().get().await }) } + + #[tracing::instrument(name = "bdk.db.increment_last_index", skip_all, err)] fn increment_last_index(&mut self, keychain: KeychainKind) -> Result { self.ctx .rt From 3a091fe6f5ecf52de36aabd0ac7181565771a9e1 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 20:22:54 -0500 Subject: [PATCH 11/12] fix(bdk): harden cache invariants and path decoding --- src/bdk/pg/mod.rs | 124 +++++++++++++++++++++++++++++++---- src/bdk/pg/script_pubkeys.rs | 23 +++++-- src/job/sync_wallet.rs | 2 + 3 files changed, 131 insertions(+), 18 deletions(-) diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 00c8312e..b2db5bcd 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -67,6 +67,7 @@ struct WalletCache { transactions: Arc>, // Process-local hint for which keychain script path sets are fully hydrated. // Bit 0: external, bit 1: internal. + // This is intentionally not synchronized across processes. script_pubkeys_loaded_mask: Arc, // Process-local hint: true means this instance has already hydrated raw tx details // from the DB at least once. It is intentionally not synchronized across processes. @@ -182,6 +183,10 @@ impl WalletCache { { let mut cache = self.lock_transactions()?; for (txid, mut summary) in entries { + // Summary refreshes may run after raw tx bytes were already hydrated. Preserve any + // cached raw transaction payload while applying fresh DB metadata fields. + // `iter_txs` overlays in-memory batch writes afterwards, so uncommitted updates still + // take precedence in returned views. if let Some(raw_tx) = cache .get(&txid) .and_then(|existing| existing.transaction.clone()) @@ -236,6 +241,24 @@ impl SqlxWalletDb { ScriptPubkeys::new(self.ctx.keychain_id, self.ctx.pool.clone()) } + fn cache_loaded_script_pubkeys( + cache: &WalletCache, + keychain: Option, + scripts_with_paths: Vec<(ScriptBuf, (KeychainKind, u32))>, + ) -> Result, bdk::Error> { + cache.extend_script_pubkeys( + scripts_with_paths + .iter() + .map(|(script, path)| (script.clone(), *path)), + )?; + cache.mark_script_pubkeys_loaded(keychain); + + Ok(scripts_with_paths + .into_iter() + .map(|(script, _)| script) + .collect()) + } + fn utxos_repo(&self) -> Utxos { Utxos::new(self.ctx.keychain_id, self.ctx.pool.clone()) } @@ -500,17 +523,7 @@ impl Database for SqlxWalletDb { .await })?; - self.cache.extend_script_pubkeys( - scripts_with_paths - .iter() - .map(|(script, path)| (script.clone(), *path)), - )?; - self.cache.mark_script_pubkeys_loaded(keychain); - - Ok(scripts_with_paths - .into_iter() - .map(|(script, _)| script) - .collect()) + Self::cache_loaded_script_pubkeys(&self.cache, keychain, scripts_with_paths) } #[tracing::instrument(name = "bdk.db.iter_utxos", skip_all, err)] @@ -544,6 +557,9 @@ impl Database for SqlxWalletDb { self.cache.set_raw_txs_fully_loaded(); loaded } + // Once raw txs are fully loaded for this process, serve summary calls from cache to + // avoid repeated full-table reads. This returns the in-process snapshot (kept current + // by set/del/commit paths) rather than forcing a fresh DB roundtrip. (false, true) => self .cache .all_txs()? @@ -826,4 +842,90 @@ mod tests { assert_eq!(txs.len(), 1); assert_eq!(txs[0].txid, txid); } + + #[test] + fn wallet_cache_extend_summary_txs_preserves_raw_and_refreshes_metadata() { + let cache = WalletCache::new(); + let txid = Txid::all_zeros(); + + let raw_tx = bdk::bitcoin::Transaction { + version: 2, + lock_time: bdk::bitcoin::absolute::LockTime::ZERO, + input: Vec::new(), + output: Vec::new(), + }; + + let mut existing = tx_details(txid); + existing.transaction = Some(raw_tx.clone()); + existing.received = 1; + cache + .insert_tx(txid, existing) + .expect("insert should succeed"); + + let mut summary = tx_details(txid); + summary.received = 42; + summary.sent = 7; + summary.fee = Some(3); + summary.confirmation_time = Some(bdk::BlockTime { + height: 100, + timestamp: 123, + }); + + cache + .extend_summary_txs([(txid, summary)]) + .expect("extend should succeed"); + + let merged = cache + .get_tx(&txid) + .expect("get should succeed") + .expect("tx should exist"); + assert_eq!(merged.transaction, Some(raw_tx)); + assert_eq!(merged.received, 42); + assert_eq!(merged.sent, 7); + assert_eq!(merged.fee, Some(3)); + assert_eq!( + merged.confirmation_time, + Some(bdk::BlockTime { + height: 100, + timestamp: 123, + }) + ); + } + + #[test] + fn cache_loaded_script_pubkeys_marks_mask_and_populates_paths() { + let cache = WalletCache::new(); + let external_script = ScriptBuf::from(vec![0x51]); + let internal_script = ScriptBuf::from(vec![0x52]); + + let loaded = SqlxWalletDb::cache_loaded_script_pubkeys( + &cache, + Some(KeychainKind::External), + vec![(external_script.clone(), (KeychainKind::External, 7))], + ) + .expect("cache should be populated"); + + assert_eq!(loaded, vec![external_script.clone()]); + assert!(cache.script_pubkeys_fully_loaded(Some(KeychainKind::External))); + assert!(!cache.script_pubkeys_fully_loaded(Some(KeychainKind::Internal))); + assert_eq!( + cache + .get_script_pubkey_path(external_script.as_script()) + .expect("path lookup should succeed"), + Some((KeychainKind::External, 7)) + ); + + SqlxWalletDb::cache_loaded_script_pubkeys( + &cache, + Some(KeychainKind::Internal), + vec![(internal_script.clone(), (KeychainKind::Internal, 3))], + ) + .expect("cache should be populated"); + + assert!(cache.script_pubkeys_fully_loaded(None)); + let all = cache + .all_script_pubkeys(None) + .expect("all_script_pubkeys should succeed"); + assert_eq!(all.len(), 2); + } } diff --git a/src/bdk/pg/script_pubkeys.rs b/src/bdk/pg/script_pubkeys.rs index c7617a2a..01983512 100644 --- a/src/bdk/pg/script_pubkeys.rs +++ b/src/bdk/pg/script_pubkeys.rs @@ -17,11 +17,13 @@ impl ScriptPubkeys { script: Vec, keychain_kind: BdkKeychainKind, path: i32, - ) -> ScriptWithPath { - ( + ) -> Result { + let path = u32::try_from(path) + .map_err(|_| bdk::Error::Generic(format!("invalid derivation path from db: {path}")))?; + Ok(( ScriptBuf::from(script), - (bdk::KeychainKind::from(keychain_kind), path as u32), - ) + (bdk::KeychainKind::from(keychain_kind), path), + )) } pub fn new(keychain_id: KeychainId, pool: PgPool) -> Self { @@ -131,7 +133,14 @@ impl ScriptPubkeys { .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok(row.map(|row| (row.keychain_kind, row.path as u32))) + row.map(|row| { + u32::try_from(row.path) + .map(|path| (row.keychain_kind, path)) + .map_err(|_| { + bdk::Error::Generic(format!("invalid derivation path from db: {}", row.path)) + }) + }) + .transpose() } #[instrument(name = "bdk.script_pubkeys.list_scripts", skip_all)] @@ -188,7 +197,7 @@ impl ScriptPubkeys { Ok(rows .into_iter() .map(|row| Self::script_with_path(row.script, row.keychain_kind, row.path)) - .collect()) + .collect::, _>>()?) } else { let rows = sqlx::query!( r#"SELECT script, keychain_kind as "keychain_kind: BdkKeychainKind", path FROM bdk_script_pubkeys @@ -202,7 +211,7 @@ impl ScriptPubkeys { Ok(rows .into_iter() .map(|row| Self::script_with_path(row.script, row.keychain_kind, row.path)) - .collect()) + .collect::, _>>()?) } } } diff --git a/src/job/sync_wallet.rs b/src/job/sync_wallet.rs index 8d2e5908..7442afd9 100644 --- a/src/job/sync_wallet.rs +++ b/src/job/sync_wallet.rs @@ -687,6 +687,8 @@ async fn cleanup_soft_deleted_utxos(ctx: &KeychainSyncContext<'_>) -> Result<(), // This can happen if the soft-deleted BDK UTXO maps to a bria UTXO that is // already accounting-settled. Do not fail/retry the whole sync job; restore the // BDK row visibility and continue. + // If rollback itself fails, this iteration returns early and undelete is skipped; + // the next sync cycle re-attempts reconciliation. tx.rollback().await?; ctx.bdk_utxos.undelete(outpoint).await?; continue; From 75e1e9910c94596641e85a37c06c1086524811d6 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 20:41:04 -0500 Subject: [PATCH 12/12] refactor(bdk): simplify summary tx conversion paths --- src/bdk/pg/mod.rs | 59 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 42 insertions(+), 17 deletions(-) diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index b2db5bcd..ad3233bd 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -203,6 +203,14 @@ impl WalletCache { Ok(cache.values().cloned().collect()) } + fn all_summary_txs(&self) -> Result, bdk::Error> { + let cache = self.lock_transactions()?; + Ok(cache + .values() + .map(|tx| (tx.txid, SqlxWalletDb::summary_tx_from_ref(tx))) + .collect()) + } + fn raw_txs_fully_loaded(&self) -> bool { self.raw_txs_fully_loaded.load(Ordering::Acquire) } @@ -353,14 +361,35 @@ impl SqlxWalletDb { mode == TxLookupMode::Any || tx.transaction.is_some() } - fn tx_without_raw(mut tx: TransactionDetails) -> TransactionDetails { - tx.transaction = None; - tx + fn summary_tx_from_ref(tx: &TransactionDetails) -> TransactionDetails { + TransactionDetails { + transaction: None, + txid: tx.txid, + received: tx.received, + sent: tx.sent, + fee: tx.fee, + confirmation_time: tx.confirmation_time.clone(), + } } - fn without_raw(tx: TransactionDetails) -> (Txid, TransactionDetails) { - let txid = tx.txid; - (txid, Self::tx_without_raw(tx)) + fn summary_tx_from_owned(tx: TransactionDetails) -> TransactionDetails { + let TransactionDetails { + txid, + received, + sent, + fee, + confirmation_time, + .. + } = tx; + + TransactionDetails { + transaction: None, + txid, + received, + sent, + fee, + confirmation_time, + } } fn overlay_batch_txs( @@ -374,7 +403,7 @@ impl SqlxWalletDb { txs.extend( batch_txs .iter() - .map(|(id, tx)| (*id, Self::tx_without_raw(tx.clone()))), + .map(|(id, tx)| (*id, Self::summary_tx_from_ref(tx))), ); } @@ -560,12 +589,7 @@ impl Database for SqlxWalletDb { // Once raw txs are fully loaded for this process, serve summary calls from cache to // avoid repeated full-table reads. This returns the in-process snapshot (kept current // by set/del/commit paths) rather than forcing a fresh DB roundtrip. - (false, true) => self - .cache - .all_txs()? - .into_iter() - .map(Self::without_raw) - .collect(), + (false, true) => self.cache.all_summary_txs()?, (false, false) => { let txs = self .ctx @@ -621,11 +645,12 @@ impl Database for SqlxWalletDb { include_raw: bool, ) -> Result, bdk::Error> { self.lookup_tx(tx_id).map(|tx| { - tx.map(|mut tx| { - if !include_raw { - tx.transaction = None; + tx.map(|tx| { + if include_raw { + tx + } else { + Self::summary_tx_from_owned(tx) } - tx }) }) }