From 004774ee3792f08cd68543f8154d045c17c3aa55 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 11:16:55 -0500 Subject: [PATCH 1/8] refactor(bdk): stabilize sqlx wallet db cache semantics Restructure SqlxWalletDb internals to isolate context, batch state, and shared caches while keeping BDK trait behavior compatible. This avoids eager cache hydration side effects, makes cache errors recoverable, and commits DB writes before cache mutation for safer sync behavior at large wallet scale. --- .gitignore | 1 + src/bdk/pg/mod.rs | 475 ++++++++++++++++++++++++----------- src/bdk/pg/script_pubkeys.rs | 1 + 3 files changed, 330 insertions(+), 147 deletions(-) diff --git a/.gitignore b/.gitignore index f3af7694..ca3fc711 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ /target .bria +.bats-e2e .e2e-logs - .direnv diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index b1e5ca45..055fc107 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -21,60 +21,204 @@ use index::Indexes; use script_pubkeys::ScriptPubkeys; use std::{ collections::HashMap, - sync::{Arc, Mutex}, + sync::{Arc, Mutex, MutexGuard}, }; pub(super) use sync_times::SyncTimes; pub use transactions::*; pub use utxos::*; -pub struct SqlxWalletDb { +type ScriptPubkeyCache = HashMap; +type TransactionCache = HashMap; + +#[derive(Clone)] +struct WalletDbContext { rt: Handle, pool: PgPool, keychain_id: KeychainId, - utxos: Option>, - cached_spks: Arc>>, - addresses: HashMap, - cached_txs: Arc>>, - txs: HashMap, } -impl SqlxWalletDb { - pub fn new(pool: PgPool, keychain_id: KeychainId) -> Self { +impl WalletDbContext { + fn new(pool: PgPool, keychain_id: KeychainId) -> Self { Self { rt: Handle::current(), - keychain_id, pool, - utxos: None, - addresses: HashMap::new(), - cached_spks: Arc::new(Mutex::new(HashMap::new())), - txs: HashMap::new(), - cached_txs: Arc::new(Mutex::new(HashMap::new())), + keychain_id, } } +} + +#[derive(Default)] +struct WalletBatchState { + utxos: Vec, + addresses: 
ScriptPubkeyCache, + txs: TransactionCache, +} - fn load_all_txs(&self) -> Result<(), bdk::Error> { - let mut txs = self.cached_txs.lock().expect("poisoned txs cache lock"); - if txs.is_empty() { - let loaded = self.rt.block_on(async { - let txs = Transactions::new(self.keychain_id, self.pool.clone()); - txs.load_all().await - })?; - *txs = loaded; +#[derive(Clone)] +struct WalletCache { + script_pubkeys: Arc>, + transactions: Arc>, +} + +impl WalletCache { + fn new() -> Self { + Self { + script_pubkeys: Arc::new(Mutex::new(HashMap::new())), + transactions: Arc::new(Mutex::new(HashMap::new())), } + } + + fn lock_script_pubkeys(&self) -> Result, bdk::Error> { + self.script_pubkeys + .lock() + .map_err(|_| bdk::Error::Generic("script pubkeys cache lock poisoned".to_string())) + } + + fn lock_transactions(&self) -> Result, bdk::Error> { + self.transactions + .lock() + .map_err(|_| bdk::Error::Generic("transactions cache lock poisoned".to_string())) + } + + fn get_script_pubkey_path( + &self, + script: &Script, + ) -> Result, bdk::Error> { + let cache = self.lock_script_pubkeys()?; + Ok(cache.get(script).copied()) + } + + fn insert_script_pubkey( + &self, + script: ScriptBuf, + path: (KeychainKind, u32), + ) -> Result<(), bdk::Error> { + let mut cache = self.lock_script_pubkeys()?; + cache.insert(script, path); + Ok(()) + } + + fn extend_script_pubkeys(&self, entries: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + { + let mut cache = self.lock_script_pubkeys()?; + cache.extend(entries); Ok(()) } + fn get_tx(&self, txid: &Txid) -> Result, bdk::Error> { + let cache = self.lock_transactions()?; + Ok(cache.get(txid).cloned()) + } + + fn insert_tx(&self, txid: Txid, tx: TransactionDetails) -> Result<(), bdk::Error> { + let mut cache = self.lock_transactions()?; + cache.insert(txid, tx); + Ok(()) + } + + fn extend_txs(&self, entries: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + { + let mut cache = self.lock_transactions()?; + cache.extend(entries); + 
Ok(()) + } + + fn remove_tx(&self, txid: &Txid) -> Result<(), bdk::Error> { + let mut cache = self.lock_transactions()?; + cache.remove(txid); + Ok(()) + } +} + +pub struct SqlxWalletDb { + ctx: WalletDbContext, + cache: WalletCache, + batch: WalletBatchState, +} + +impl SqlxWalletDb { + pub fn new(pool: PgPool, keychain_id: KeychainId) -> Self { + Self { + ctx: WalletDbContext::new(pool, keychain_id), + cache: WalletCache::new(), + batch: WalletBatchState::default(), + } + } + + fn script_pubkeys_repo(&self) -> ScriptPubkeys { + ScriptPubkeys::new(self.ctx.keychain_id, self.ctx.pool.clone()) + } + + fn utxos_repo(&self) -> Utxos { + Utxos::new(self.ctx.keychain_id, self.ctx.pool.clone()) + } + + fn transactions_repo(&self) -> Transactions { + Transactions::new(self.ctx.keychain_id, self.ctx.pool.clone()) + } + + fn indexes_repo(&self) -> Indexes { + Indexes::new(self.ctx.keychain_id, self.ctx.pool.clone()) + } + + fn sync_times_repo(&self) -> SyncTimes { + SyncTimes::new(self.ctx.keychain_id, self.ctx.pool.clone()) + } + + fn descriptor_checksums_repo(&self) -> DescriptorChecksums { + DescriptorChecksums::new(self.ctx.keychain_id, self.ctx.pool.clone()) + } + + fn lookup_script_pubkey_path( + &self, + script: &Script, + ) -> Result, bdk::Error> { + if let Some(path) = self.batch.addresses.get(script) { + return Ok(Some(*path)); + } + + if let Some(path) = self.cache.get_script_pubkey_path(script)? 
{ + return Ok(Some(path)); + } + + let script_pubkey = script.to_owned(); + let found = self + .ctx + .rt + .block_on(async { self.script_pubkeys_repo().find_path(&script_pubkey).await })?; + + if let Some((kind, path)) = found { + let value = (KeychainKind::from(kind), path); + self.cache.insert_script_pubkey(script_pubkey, value)?; + return Ok(Some(value)); + } + + Ok(None) + } + fn lookup_tx(&self, txid: &Txid) -> Result, bdk::Error> { - if let Some(tx) = self.txs.get(txid) { + if let Some(tx) = self.batch.txs.get(txid) { return Ok(Some(tx.clone())); } - self.load_all_txs()?; - Ok(self - .cached_txs - .lock() - .expect("poisoned txs cache lock") - .get(txid) - .cloned()) + + if let Some(tx) = self.cache.get_tx(txid)? { + return Ok(Some(tx)); + } + + let found = self + .ctx + .rt + .block_on(async { self.transactions_repo().find_by_id(txid).await })?; + + if let Some(tx) = &found { + self.cache.insert_tx(tx.txid, tx.clone())?; + } + + Ok(found) } } @@ -85,15 +229,12 @@ impl BatchOperations for SqlxWalletDb { keychain: KeychainKind, path: u32, ) -> Result<(), bdk::Error> { - self.addresses.insert(script.into(), (keychain, path)); + self.batch.addresses.insert(script.into(), (keychain, path)); Ok(()) } fn set_utxo(&mut self, utxo: &LocalUtxo) -> Result<(), bdk::Error> { - if self.utxos.is_none() { - self.utxos = Some(Vec::new()); - } - self.utxos.as_mut().unwrap().push(utxo.clone()); + self.batch.utxos.push(utxo.clone()); Ok(()) } @@ -102,22 +243,20 @@ impl BatchOperations for SqlxWalletDb { } fn set_tx(&mut self, tx: &TransactionDetails) -> Result<(), bdk::Error> { - self.txs.insert(tx.txid, tx.clone()); + self.batch.txs.insert(tx.txid, tx.clone()); Ok(()) } fn set_last_index(&mut self, kind: KeychainKind, idx: u32) -> Result<(), bdk::Error> { - self.rt.block_on(async { - let indexes = Indexes::new(self.keychain_id, self.pool.clone()); - indexes.persist_last_index(kind, idx).await - }) + self.ctx + .rt + .block_on(async { 
self.indexes_repo().persist_last_index(kind, idx).await }) } fn set_sync_time(&mut self, time: SyncTime) -> Result<(), bdk::Error> { - self.rt.block_on(async { - let sync_times = SyncTimes::new(self.keychain_id, self.pool.clone()); - sync_times.persist(time).await - }) + self.ctx + .rt + .block_on(async { self.sync_times_repo().persist(time).await }) } fn del_script_pubkey_from_path( @@ -134,11 +273,9 @@ impl BatchOperations for SqlxWalletDb { unimplemented!() } fn del_utxo(&mut self, outpoint: &OutPoint) -> Result, bdk::Error> { - self.rt.block_on(async { - Utxos::new(self.keychain_id, self.pool.clone()) - .delete(outpoint) - .await - }) + self.ctx + .rt + .block_on(async { self.utxos_repo().delete(outpoint).await }) } fn del_raw_tx(&mut self, _: &Txid) -> Result, bdk::Error> { unimplemented!() @@ -149,10 +286,17 @@ impl BatchOperations for SqlxWalletDb { tx_id: &Txid, _include_raw: bool, ) -> Result, bdk::Error> { - self.rt.block_on(async { - let txs = Transactions::new(self.keychain_id, self.pool.clone()); - txs.delete(tx_id).await - }) + let deleted = self + .ctx + .rt + .block_on(async { self.transactions_repo().delete(tx_id).await })?; + + if deleted.is_some() { + self.batch.txs.remove(tx_id); + self.cache.remove_tx(tx_id)?; + } + + Ok(deleted) } fn del_last_index(&mut self, _: KeychainKind) -> Result, bdk::Error> { unimplemented!() @@ -171,8 +315,8 @@ impl Database for SqlxWalletDb { where B: AsRef<[u8]>, { - self.rt.block_on(async { - let checksums = DescriptorChecksums::new(self.keychain_id, self.pool.clone()); + self.ctx.rt.block_on(async { + let checksums = self.descriptor_checksums_repo(); checksums .check_or_persist_descriptor_checksum(keychain, script_bytes.as_ref()) .await?; @@ -184,32 +328,26 @@ impl Database for SqlxWalletDb { &self, keychain: Option, ) -> Result, bdk::Error> { - self.rt.block_on(async { - let script_pubkeys = ScriptPubkeys::new(self.keychain_id, self.pool.clone()); - let scripts = script_pubkeys.list_scripts(keychain).await?; - 
Ok(scripts) - }) + self.ctx + .rt + .block_on(async { self.script_pubkeys_repo().list_scripts(keychain).await }) } fn iter_utxos(&self) -> Result, bdk::Error> { - self.rt.block_on(async { - Utxos::new(self.keychain_id, self.pool.clone()) - .list_local_utxos() - .await - }) + self.ctx + .rt + .block_on(async { self.utxos_repo().list_local_utxos().await }) } fn iter_raw_txs(&self) -> Result, bdk::Error> { unimplemented!() } fn iter_txs(&self, _: bool) -> Result, bdk::Error> { - self.load_all_txs()?; - Ok(self - .cached_txs - .lock() - .expect("poisoned txs cache lock") - .values() - .cloned() - .collect()) + let mut txs = self + .ctx + .rt + .block_on(async { self.transactions_repo().load_all().await })?; + txs.extend(self.batch.txs.iter().map(|(id, tx)| (*id, tx.clone()))); + Ok(txs.into_values().collect()) } fn get_script_pubkey_from_path( @@ -217,38 +355,20 @@ impl Database for SqlxWalletDb { keychain: KeychainKind, path: u32, ) -> Result, bdk::Error> { - self.rt.block_on(async { - let script_pubkeys = ScriptPubkeys::new(self.keychain_id, self.pool.clone()); - script_pubkeys.find_script(keychain, path).await - }) + self.ctx + .rt + .block_on(async { self.script_pubkeys_repo().find_script(keychain, path).await }) } fn get_path_from_script_pubkey( &self, script: &Script, ) -> Result, bdk::Error> { - let mut cache = self.cached_spks.lock().expect("poisoned spk cache lock"); - if cache.is_empty() { - let loaded = self.rt.block_on(async { - let script_pubkeys = ScriptPubkeys::new(self.keychain_id, self.pool.clone()); - script_pubkeys.load_all().await - })?; - *cache = loaded; - } - - if let Some(res) = cache.get(script) { - Ok(Some(*res)) - } else if let Some(res) = self.addresses.get(script) { - Ok(Some(*res)) - } else { - Ok(None) - } + self.lookup_script_pubkey_path(script) } fn get_utxo(&self, outpoint: &OutPoint) -> Result, bdk::Error> { - self.rt.block_on(async { - Utxos::new(self.keychain_id, self.pool.clone()) - .find(outpoint) - .await - }) + self.ctx + .rt + 
.block_on(async { self.utxos_repo().find(outpoint).await }) } fn get_raw_tx(&self, tx_id: &Txid) -> Result, bdk::Error> { self.lookup_tx(tx_id) @@ -262,22 +382,19 @@ impl Database for SqlxWalletDb { self.lookup_tx(tx_id) } fn get_last_index(&self, kind: KeychainKind) -> Result, bdk::Error> { - self.rt.block_on(async { - let last_indexes = Indexes::new(self.keychain_id, self.pool.clone()); - last_indexes.get_latest(kind).await - }) + self.ctx + .rt + .block_on(async { self.indexes_repo().get_latest(kind).await }) } fn get_sync_time(&self) -> Result, bdk::Error> { - self.rt.block_on(async { - let sync_times = SyncTimes::new(self.keychain_id, self.pool.clone()); - sync_times.get().await - }) + self.ctx + .rt + .block_on(async { self.sync_times_repo().get().await }) } fn increment_last_index(&mut self, keychain: KeychainKind) -> Result { - self.rt.block_on(async { - let indexes = Indexes::new(self.keychain_id, self.pool.clone()); - indexes.increment(keychain).await - }) + self.ctx + .rt + .block_on(async { self.indexes_repo().increment(keychain).await }) } } @@ -285,52 +402,116 @@ impl BatchDatabase for SqlxWalletDb { type Batch = Self; fn begin_batch(&self) -> ::Batch { - let mut res = SqlxWalletDb::new(self.pool.clone(), self.keychain_id); - res.cached_spks = Arc::clone(&self.cached_spks); - res.cached_txs = Arc::clone(&self.cached_txs); - res + SqlxWalletDb { + ctx: self.ctx.clone(), + cache: self.cache.clone(), + batch: WalletBatchState::default(), + } } fn commit_batch( &mut self, mut batch: ::Batch, ) -> Result<(), bdk::Error> { - self.cached_spks - .lock() - .expect("poisoned spk cache lock") - .extend( - batch - .addresses - .iter() - .map(|(s, (k, p))| (s.clone(), (*k, *p))), - ); - - self.cached_txs - .lock() - .expect("poisoned txs cache lock") - .extend(batch.txs.iter().map(|(id, tx)| (*id, tx.clone()))); - - self.rt.block_on(async move { - if !batch.addresses.is_empty() { - let addresses: Vec<_> = batch - .addresses - .drain() - .map(|(s, (k, p))| 
(BdkKeychainKind::from(k), p, s)) - .collect(); - let repo = ScriptPubkeys::new(batch.keychain_id, batch.pool.clone()); - repo.persist_all(addresses).await?; + let addresses_for_cache: Vec<_> = batch + .batch + .addresses + .iter() + .map(|(script, (keychain, path))| (script.clone(), (*keychain, *path))) + .collect(); + let addresses_for_db: Vec<_> = batch + .batch + .addresses + .drain() + .map(|(script, (keychain, path))| (BdkKeychainKind::from(keychain), path, script)) + .collect(); + + let txs_for_cache: Vec<_> = batch + .batch + .txs + .iter() + .map(|(txid, tx)| (*txid, tx.clone())) + .collect(); + let txs_for_db: Vec<_> = batch.batch.txs.drain().map(|(_, tx)| tx).collect(); + + let utxos_for_db = std::mem::take(&mut batch.batch.utxos); + let keychain_id = batch.ctx.keychain_id; + let pool = batch.ctx.pool.clone(); + + batch.ctx.rt.block_on(async move { + if !addresses_for_db.is_empty() { + ScriptPubkeys::new(keychain_id, pool.clone()) + .persist_all(addresses_for_db) + .await?; } - if let Some(utxos) = batch.utxos.take() { - let repo = Utxos::new(batch.keychain_id, batch.pool.clone()); - repo.persist_all(utxos).await?; + if !utxos_for_db.is_empty() { + Utxos::new(keychain_id, pool.clone()) + .persist_all(utxos_for_db) + .await?; } - if !batch.txs.is_empty() { - let txs = batch.txs.drain().map(|(_, tx)| tx).collect(); - let repo = Transactions::new(batch.keychain_id, batch.pool.clone()); - repo.persist_all(txs).await?; + + if !txs_for_db.is_empty() { + Transactions::new(keychain_id, pool) + .persist_all(txs_for_db) + .await?; } + Ok::<_, bdk::Error>(()) - }) + })?; + + self.cache.extend_script_pubkeys(addresses_for_cache)?; + self.cache.extend_txs(txs_for_cache)?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bdk::bitcoin::hashes::Hash; + + fn tx_details(txid: Txid) -> TransactionDetails { + TransactionDetails { + transaction: None, + txid, + received: 0, + sent: 0, + fee: None, + confirmation_time: None, + } + } + + #[test] + fn 
wallet_cache_can_insert_get_and_remove_transactions() { + let cache = WalletCache::new(); + let txid = Txid::all_zeros(); + let details = tx_details(txid); + + cache + .insert_tx(txid, details.clone()) + .expect("insert should succeed"); + let loaded = cache.get_tx(&txid).expect("get should succeed"); + assert_eq!(loaded, Some(details)); + + cache.remove_tx(&txid).expect("remove should succeed"); + let loaded = cache.get_tx(&txid).expect("get should succeed"); + assert_eq!(loaded, None); + } + + #[test] + fn wallet_cache_can_insert_and_get_script_pubkey_paths() { + let cache = WalletCache::new(); + let script = ScriptBuf::new(); + let path = (KeychainKind::External, 42); + + cache + .insert_script_pubkey(script.clone(), path) + .expect("insert should succeed"); + + let loaded = cache + .get_script_pubkey_path(script.as_script()) + .expect("get should succeed"); + assert_eq!(loaded, Some(path)); } } diff --git a/src/bdk/pg/script_pubkeys.rs b/src/bdk/pg/script_pubkeys.rs index 18f11085..8c6128fd 100644 --- a/src/bdk/pg/script_pubkeys.rs +++ b/src/bdk/pg/script_pubkeys.rs @@ -71,6 +71,7 @@ impl ScriptPubkeys { .map(|row| ScriptBuf::from(row.script))) } + #[allow(dead_code)] #[instrument(name = "bdk.script_pubkeys.load_all", skip_all)] pub async fn load_all( &self, From 746819d5020fe49427faba21c35a7f5e22fcca2d Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 11:59:19 -0500 Subject: [PATCH 2/8] perf(bdk): optimize sync db reads and harden pg adapters Move keychain filtering into SQL and add a lightweight iter_txs(false) path to reduce wallet sync query cost. Replace panic-prone unimplemented/unwrap/expect paths in pg wallet database adapters with recoverable errors to avoid process crashes under inconsistent data. 
--- .cargo/audit.toml | 9 +- ...d50bedd904b0e8934a065bad8ef339b3d41eb.json | 22 +++ ...6840136afd1c44dbe358dd1d414f32d01cb4.json} | 21 +-- ...6b23f3f84c9e843addc1b05160fd0e28fdf2c.json | 52 ++++++ src/bdk/pg/mod.rs | 82 +++++++-- src/bdk/pg/script_pubkeys.rs | 52 +++--- src/bdk/pg/transactions.rs | 157 +++++++++++++++--- src/bdk/pg/utxos.rs | 85 ++++++---- 8 files changed, 372 insertions(+), 108 deletions(-) create mode 100644 .sqlx/query-561f73d1c3ae19876eef43a68dad50bedd904b0e8934a065bad8ef339b3d41eb.json rename .sqlx/{query-79eff690e77e488dccc3c066b265e7718128987e44d97f408059cca30fc5124c.json => query-90eecb0b6744da1d056aae9b32706840136afd1c44dbe358dd1d414f32d01cb4.json} (56%) create mode 100644 .sqlx/query-dfa815aeb090f27d4fd45326c706b23f3f84c9e843addc1b05160fd0e28fdf2c.json diff --git a/.cargo/audit.toml b/.cargo/audit.toml index 159a1505..910cce68 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -1,2 +1,9 @@ [advisories] -ignore = ["RUSTSEC-2023-0071", "RUSTSEC-2026-0049"] +ignore = [ + "RUSTSEC-2023-0071", + "RUSTSEC-2026-0049", + # TODO: remove once electrum-client/bdk stop pulling rustls-webpki 0.101.7 + "RUSTSEC-2026-0098", + # TODO: remove once electrum-client/bdk stop pulling rustls-webpki 0.101.7 + "RUSTSEC-2026-0099", +] diff --git a/.sqlx/query-561f73d1c3ae19876eef43a68dad50bedd904b0e8934a065bad8ef339b3d41eb.json b/.sqlx/query-561f73d1c3ae19876eef43a68dad50bedd904b0e8934a065bad8ef339b3d41eb.json new file mode 100644 index 00000000..37cdcf73 --- /dev/null +++ b/.sqlx/query-561f73d1c3ae19876eef43a68dad50bedd904b0e8934a065bad8ef339b3d41eb.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT script FROM bdk_script_pubkeys\n WHERE keychain_id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "script", + "type_info": "Bytea" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false + ] + }, + "hash": "561f73d1c3ae19876eef43a68dad50bedd904b0e8934a065bad8ef339b3d41eb" +} diff --git 
a/.sqlx/query-79eff690e77e488dccc3c066b265e7718128987e44d97f408059cca30fc5124c.json b/.sqlx/query-90eecb0b6744da1d056aae9b32706840136afd1c44dbe358dd1d414f32d01cb4.json similarity index 56% rename from .sqlx/query-79eff690e77e488dccc3c066b265e7718128987e44d97f408059cca30fc5124c.json rename to .sqlx/query-90eecb0b6744da1d056aae9b32706840136afd1c44dbe358dd1d414f32d01cb4.json index e2572075..02f7a0d2 100644 --- a/.sqlx/query-79eff690e77e488dccc3c066b265e7718128987e44d97f408059cca30fc5124c.json +++ b/.sqlx/query-90eecb0b6744da1d056aae9b32706840136afd1c44dbe358dd1d414f32d01cb4.json @@ -1,17 +1,18 @@ { "db_name": "PostgreSQL", - "query": "SELECT script, keychain_kind as \"keychain_kind: BdkKeychainKind\" FROM bdk_script_pubkeys\n WHERE keychain_id = $1", + "query": "SELECT script FROM bdk_script_pubkeys\n WHERE keychain_id = $1 AND keychain_kind = $2", "describe": { "columns": [ { "ordinal": 0, "name": "script", "type_info": "Bytea" - }, - { - "ordinal": 1, - "name": "keychain_kind: BdkKeychainKind", - "type_info": { + } + ], + "parameters": { + "Left": [ + "Uuid", + { "Custom": { "name": "bdkkeychainkind", "kind": { @@ -22,17 +23,11 @@ } } } - } - ], - "parameters": { - "Left": [ - "Uuid" ] }, "nullable": [ - false, false ] }, - "hash": "79eff690e77e488dccc3c066b265e7718128987e44d97f408059cca30fc5124c" + "hash": "90eecb0b6744da1d056aae9b32706840136afd1c44dbe358dd1d414f32d01cb4" } diff --git a/.sqlx/query-dfa815aeb090f27d4fd45326c706b23f3f84c9e843addc1b05160fd0e28fdf2c.json b/.sqlx/query-dfa815aeb090f27d4fd45326c706b23f3f84c9e843addc1b05160fd0e28fdf2c.json new file mode 100644 index 00000000..2e2c29dd --- /dev/null +++ b/.sqlx/query-dfa815aeb090f27d4fd45326c706b23f3f84c9e843addc1b05160fd0e28fdf2c.json @@ -0,0 +1,52 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT tx_id, sent, height,\n (details_json->>'received')::BIGINT AS \"received?\",\n (details_json->>'fee')::BIGINT AS \"fee?\",\n (details_json->'confirmation_time'->>'timestamp')::BIGINT AS 
\"confirmation_timestamp?\"\n FROM bdk_transactions\n WHERE keychain_id = $1 AND deleted_at IS NULL", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "tx_id", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "sent", + "type_info": "Int8" + }, + { + "ordinal": 2, + "name": "height", + "type_info": "Int4" + }, + { + "ordinal": 3, + "name": "received?", + "type_info": "Int8" + }, + { + "ordinal": 4, + "name": "fee?", + "type_info": "Int8" + }, + { + "ordinal": 5, + "name": "confirmation_timestamp?", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + true, + null, + null, + null + ] + }, + "hash": "dfa815aeb090f27d4fd45326c706b23f3f84c9e843addc1b05160fd0e28fdf2c" +} diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 055fc107..43eb57dd 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -239,7 +239,9 @@ impl BatchOperations for SqlxWalletDb { } fn set_raw_tx(&mut self, _: &Transaction) -> Result<(), bdk::Error> { - unimplemented!() + Err(bdk::Error::Generic( + "set_raw_tx is not supported by SqlxWalletDb".to_string(), + )) } fn set_tx(&mut self, tx: &TransactionDetails) -> Result<(), bdk::Error> { @@ -264,13 +266,17 @@ impl BatchOperations for SqlxWalletDb { _: KeychainKind, _: u32, ) -> Result, bdk::Error> { - unimplemented!() + Err(bdk::Error::Generic( + "del_script_pubkey_from_path is not supported by SqlxWalletDb".to_string(), + )) } fn del_path_from_script_pubkey( &mut self, _: &Script, ) -> Result, bdk::Error> { - unimplemented!() + Err(bdk::Error::Generic( + "del_path_from_script_pubkey is not supported by SqlxWalletDb".to_string(), + )) } fn del_utxo(&mut self, outpoint: &OutPoint) -> Result, bdk::Error> { self.ctx @@ -278,7 +284,9 @@ impl BatchOperations for SqlxWalletDb { .block_on(async { self.utxos_repo().delete(outpoint).await }) } fn del_raw_tx(&mut self, _: &Txid) -> Result, bdk::Error> { - unimplemented!() + Err(bdk::Error::Generic( + "del_raw_tx is not 
supported by SqlxWalletDb".to_string(), + )) } fn del_tx( @@ -299,10 +307,14 @@ impl BatchOperations for SqlxWalletDb { Ok(deleted) } fn del_last_index(&mut self, _: KeychainKind) -> Result, bdk::Error> { - unimplemented!() + Err(bdk::Error::Generic( + "del_last_index is not supported by SqlxWalletDb".to_string(), + )) } fn del_sync_time(&mut self) -> Result, bdk::Error> { - unimplemented!() + Err(bdk::Error::Generic( + "del_sync_time is not supported by SqlxWalletDb".to_string(), + )) } } @@ -338,15 +350,32 @@ impl Database for SqlxWalletDb { .block_on(async { self.utxos_repo().list_local_utxos().await }) } fn iter_raw_txs(&self) -> Result, bdk::Error> { - unimplemented!() - } + Err(bdk::Error::Generic( + "iter_raw_txs is not supported by SqlxWalletDb".to_string(), + )) + } + + fn iter_txs(&self, include_raw: bool) -> Result, bdk::Error> { + let mut txs = if include_raw { + self.ctx + .rt + .block_on(async { self.transactions_repo().load_all().await })? + } else { + self.ctx + .rt + .block_on(async { self.transactions_repo().load_all_summaries().await })? 
+ }; + + if include_raw { + txs.extend(self.batch.txs.iter().map(|(id, tx)| (*id, tx.clone()))); + } else { + txs.extend(self.batch.txs.iter().map(|(id, tx)| { + let mut tx = tx.clone(); + tx.transaction = None; + (*id, tx) + })); + } - fn iter_txs(&self, _: bool) -> Result, bdk::Error> { - let mut txs = self - .ctx - .rt - .block_on(async { self.transactions_repo().load_all().await })?; - txs.extend(self.batch.txs.iter().map(|(id, tx)| (*id, tx.clone()))); Ok(txs.into_values().collect()) } @@ -371,8 +400,29 @@ impl Database for SqlxWalletDb { .block_on(async { self.utxos_repo().find(outpoint).await }) } fn get_raw_tx(&self, tx_id: &Txid) -> Result, bdk::Error> { - self.lookup_tx(tx_id) - .map(|tx| tx.and_then(|tx| tx.transaction)) + if let Some(tx) = self.batch.txs.get(tx_id) { + if let Some(transaction) = &tx.transaction { + return Ok(Some(transaction.clone())); + } + } + + if let Some(tx) = self.cache.get_tx(tx_id)? { + if let Some(transaction) = tx.transaction { + return Ok(Some(transaction)); + } + } + + let found = self + .ctx + .rt + .block_on(async { self.transactions_repo().find_by_id(tx_id).await })?; + + if let Some(tx) = found { + self.cache.insert_tx(tx.txid, tx.clone())?; + Ok(tx.transaction) + } else { + Ok(None) + } } fn get_tx( &self, diff --git a/src/bdk/pg/script_pubkeys.rs b/src/bdk/pg/script_pubkeys.rs index 8c6128fd..f76eecee 100644 --- a/src/bdk/pg/script_pubkeys.rs +++ b/src/bdk/pg/script_pubkeys.rs @@ -122,27 +122,35 @@ impl ScriptPubkeys { keychain: Option>, ) -> Result, bdk::Error> { let kind = keychain.map(|k| k.into()); - let rows = sqlx::query!( - r#"SELECT script, keychain_kind as "keychain_kind: BdkKeychainKind" FROM bdk_script_pubkeys - WHERE keychain_id = $1"#, - Uuid::from(self.keychain_id), - ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok(rows - .into_iter() - .filter_map(|row| { - if let Some(kind) = kind { - if kind == row.keychain_kind { - Some(ScriptBuf::from(row.script)) - } 
else { - None - } - } else { - Some(ScriptBuf::from(row.script)) - } - }) - .collect()) + if let Some(kind) = kind { + let rows = sqlx::query!( + r#"SELECT script FROM bdk_script_pubkeys + WHERE keychain_id = $1 AND keychain_kind = $2"#, + Uuid::from(self.keychain_id), + kind as BdkKeychainKind, + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + + Ok(rows + .into_iter() + .map(|row| ScriptBuf::from(row.script)) + .collect()) + } else { + let rows = sqlx::query!( + r#"SELECT script FROM bdk_script_pubkeys + WHERE keychain_id = $1"#, + Uuid::from(self.keychain_id), + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + + Ok(rows + .into_iter() + .map(|row| ScriptBuf::from(row.script)) + .collect()) + } } } diff --git a/src/bdk/pg/transactions.rs b/src/bdk/pg/transactions.rs index 34324a8f..9b7a4648 100644 --- a/src/bdk/pg/transactions.rs +++ b/src/bdk/pg/transactions.rs @@ -1,4 +1,4 @@ -use bdk::{bitcoin::Txid, LocalUtxo, TransactionDetails}; +use bdk::{bitcoin::Txid, BlockTime, LocalUtxo, TransactionDetails}; use sqlx::{PgPool, Postgres, QueryBuilder, Transaction}; use tracing::instrument; @@ -41,18 +41,32 @@ impl Transactions { let batches = txs.chunks(BATCH_SIZE); for batch in batches { + let serialized_batch = batch + .iter() + .map(|tx| { + Ok::<_, bdk::Error>(( + tx.txid.to_string(), + serde_json::to_value(tx).map_err(|e| { + bdk::Error::Generic(format!("failed to serialize tx details: {e}")) + })?, + tx.sent as i64, + tx.confirmation_time.as_ref().map(|t| t.height as i32), + )) + }) + .collect::, _>>()?; + let mut query_builder: QueryBuilder = QueryBuilder::new( r#" INSERT INTO bdk_transactions (keychain_id, tx_id, details_json, sent, height)"#, ); - query_builder.push_values(batch, |mut builder, tx| { + query_builder.push_values(serialized_batch, |mut builder, tx| { builder.push_bind(self.keychain_id as KeychainId); - builder.push_bind(tx.txid.to_string()); - 
builder.push_bind(serde_json::to_value(tx).unwrap()); - builder.push_bind(tx.sent as i64); - builder.push_bind(tx.confirmation_time.as_ref().map(|t| t.height as i32)); + builder.push_bind(tx.0); + builder.push_bind(tx.1); + builder.push_bind(tx.2); + builder.push_bind(tx.3); }); query_builder.push( @@ -92,9 +106,11 @@ impl Transactions { .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok(tx.map(|tx| { - serde_json::from_value(tx.details_json).expect("could not deserialize tx details") - })) + tx.map(|tx| { + serde_json::from_value(tx.details_json) + .map_err(|e| bdk::Error::Generic(format!("could not deserialize tx details: {e}"))) + }) + .transpose() } #[allow(dead_code)] @@ -109,7 +125,11 @@ impl Transactions { .fetch_optional(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok(tx.map(|tx| serde_json::from_value(tx.details_json).unwrap())) + tx.map(|tx| { + serde_json::from_value(tx.details_json) + .map_err(|e| bdk::Error::Generic(format!("could not deserialize tx details: {e}"))) + }) + .transpose() } #[instrument(name = "bdk.transactions.load_all", skip(self), fields(n_rows))] @@ -123,13 +143,77 @@ impl Transactions { .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; tracing::Span::current().record("n_rows", txs.len()); - Ok(txs - .into_iter() + txs.into_iter() .map(|tx| { - let tx = serde_json::from_value::(tx.details_json).unwrap(); - (tx.txid, tx) + serde_json::from_value::(tx.details_json) + .map(|tx| (tx.txid, tx)) + .map_err(|e| { + bdk::Error::Generic(format!("could not deserialize tx details: {e}")) + }) }) - .collect()) + .collect() + } + + #[instrument( + name = "bdk.transactions.load_all_summaries", + skip(self), + fields(n_rows) + )] + pub async fn load_all_summaries( + &self, + ) -> Result, bdk::Error> { + let rows = sqlx::query!( + r#" + SELECT tx_id, sent, height, + (details_json->>'received')::BIGINT AS "received?", + (details_json->>'fee')::BIGINT AS "fee?", + 
(details_json->'confirmation_time'->>'timestamp')::BIGINT AS "confirmation_timestamp?" + FROM bdk_transactions + WHERE keychain_id = $1 AND deleted_at IS NULL"#, + self.keychain_id as KeychainId, + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + + tracing::Span::current().record("n_rows", rows.len()); + + fn to_u64(value: i64, field: &str) -> Result { + if value < 0 { + return Err(bdk::Error::Generic(format!( + "negative {field} value in bdk_transactions" + ))); + } + Ok(value as u64) + } + + rows.into_iter() + .map(|row| { + let txid = row + .tx_id + .parse::() + .map_err(|e| bdk::Error::Generic(format!("invalid tx_id in db: {e}")))?; + + let confirmation_time = match (row.height, row.confirmation_timestamp) { + (Some(height), Some(timestamp)) => Some(BlockTime { + height: height as u32, + timestamp: to_u64(timestamp, "confirmation timestamp")?, + }), + _ => None, + }; + + let details = TransactionDetails { + txid, + transaction: None, + received: to_u64(row.received.unwrap_or_default(), "received")?, + sent: to_u64(row.sent, "sent")?, + fee: row.fee.map(|f| to_u64(f, "fee")).transpose()?, + confirmation_time, + }; + + Ok((txid, details)) + }) + .collect() } #[instrument(name = "bdk.transactions.find_unsynced_tx", skip(self), fields(n_rows))] @@ -184,11 +268,22 @@ impl Transactions { inputs.push((utxo, row.path as u32)); } if tx_id.is_none() { - tx_id = Some(row.tx_id.parse().expect("couldn't parse tx_id")); + tx_id = Some(row.tx_id.parse().map_err(|e| { + bdk::Error::Generic(format!("invalid tx id from bdk_transactions: {e}")) + })?); let details: TransactionDetails = serde_json::from_value(row.details_json)?; total_utxo_in_sats = Satoshis::from(details.sent); - fee_sats = Satoshis::from(details.fee.expect("Fee")); - vsize = details.transaction.expect("transaction").vsize() as u64; + fee_sats = Satoshis::from(details.fee.ok_or_else(|| { + bdk::Error::Generic("missing fee in unsynced transaction details".to_string()) + 
})?); + vsize = details + .transaction + .ok_or_else(|| { + bdk::Error::Generic( + "missing raw transaction in unsynced transaction details".to_string(), + ) + })? + .vsize() as u64; confirmation_time = details.confirmation_time; } } @@ -257,19 +352,29 @@ impl Transactions { inputs.push(utxo); } if tx_id.is_none() { - tx_id = Some(row.tx_id.parse().expect("couldn't parse tx_id")); + tx_id = Some(row.tx_id.parse().map_err(|e| { + bdk::Error::Generic(format!("invalid tx id from bdk_transactions: {e}")) + })?); let details: TransactionDetails = serde_json::from_value(row.details_json)?; confirmation_time = details.confirmation_time; } } - Ok(tx_id.map(|tx_id| ConfirmedSpendTransaction { - tx_id, - confirmation_time: confirmation_time - .expect("query should always return confirmation_time"), - inputs, - outputs, - })) + if let Some(tx_id) = tx_id { + let confirmation_time = confirmation_time.ok_or_else(|| { + bdk::Error::Generic( + "missing confirmation_time in confirmed spend transaction details".to_string(), + ) + })?; + Ok(Some(ConfirmedSpendTransaction { + tx_id, + confirmation_time, + inputs, + outputs, + })) + } else { + Ok(None) + } } #[instrument(name = "bdk.transactions.mark_as_synced", skip(self))] diff --git a/src/bdk/pg/utxos.rs b/src/bdk/pg/utxos.rs index 60628a19..65cca85e 100644 --- a/src/bdk/pg/utxos.rs +++ b/src/bdk/pg/utxos.rs @@ -27,17 +27,31 @@ impl Utxos { let batches = utxos.chunks(BATCH_SIZE); for batch in batches { + let serialized_batch = batch + .iter() + .map(|utxo| { + Ok::<_, bdk::Error>(( + utxo.outpoint.txid.to_string(), + utxo.outpoint.vout as i32, + serde_json::to_value(utxo).map_err(|e| { + bdk::Error::Generic(format!("failed to serialize utxo: {e}")) + })?, + utxo.is_spent, + )) + }) + .collect::, _>>()?; + let mut query_builder: QueryBuilder = QueryBuilder::new( r#"INSERT INTO bdk_utxos (keychain_id, tx_id, vout, utxo_json, is_spent)"#, ); - query_builder.push_values(batch, |mut builder, utxo| { + 
query_builder.push_values(serialized_batch, |mut builder, utxo| { builder.push_bind(Uuid::from(self.keychain_id)); - builder.push_bind(utxo.outpoint.txid.to_string()); - builder.push_bind(utxo.outpoint.vout as i32); - builder.push_bind(serde_json::to_value(utxo).unwrap()); - builder.push_bind(utxo.is_spent); + builder.push_bind(utxo.0); + builder.push_bind(utxo.1); + builder.push_bind(utxo.2); + builder.push_bind(utxo.3); }); query_builder.push( @@ -78,9 +92,11 @@ impl Utxos { .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok(row.map(|row| { - serde_json::from_value::(row.utxo_json).expect("Could not deserialize utxo") - })) + row.map(|row| { + serde_json::from_value::(row.utxo_json) + .map_err(|e| bdk::Error::Generic(format!("could not deserialize utxo: {e}"))) + }) + .transpose() } #[instrument(name = "bdk.utxos.undelete", skip_all)] @@ -116,9 +132,11 @@ impl Utxos { .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok(utxo.map(|utxo| { - serde_json::from_value(utxo.utxo_json).expect("Could not deserialize utxo") - })) + utxo.map(|utxo| { + serde_json::from_value(utxo.utxo_json) + .map_err(|e| bdk::Error::Generic(format!("could not deserialize utxo: {e}"))) + }) + .transpose() } #[instrument(name = "bdk.utxos.list_local_utxos", skip_all)] @@ -130,10 +148,13 @@ impl Utxos { .fetch_all(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok(utxos + utxos .into_iter() - .map(|utxo| serde_json::from_value(utxo.utxo_json).expect("Could not deserialize utxo")) - .collect()) + .map(|utxo| { + serde_json::from_value(utxo.utxo_json) + .map_err(|e| bdk::Error::Generic(format!("could not deserialize utxo: {e}"))) + }) + .collect() } #[instrument(name = "bdk.utxos.mark_as_synced", skip(self, tx))] @@ -206,19 +227,22 @@ impl Utxos { .fetch_optional(&mut **tx) .await?; - Ok(row.map(|row| { - let local_utxo = serde_json::from_value::(row.utxo_json) - .expect("Could not deserialize utxo"); - let tx_details = 
serde_json::from_value::(row.details_json) - .expect("Could not deserialize tx details"); - ConfirmedIncomeUtxo { + if let Some(row) = row { + let local_utxo = serde_json::from_value::(row.utxo_json)?; + let tx_details = serde_json::from_value::(row.details_json)?; + let confirmation_time = tx_details.confirmation_time.ok_or_else(|| { + bdk::Error::Generic( + "missing confirmation_time in confirmed income transaction details".to_string(), + ) + })?; + Ok(Some(ConfirmedIncomeUtxo { outpoint: local_utxo.outpoint, spent: local_utxo.is_spent, - confirmation_time: tx_details - .confirmation_time - .expect("query should always return confirmation_time"), - } - })) + confirmation_time, + })) + } else { + Ok(None) + } } #[instrument(name = "bdk.utxos.find_and_remove_soft_deleted_utxo", skip_all)] @@ -238,11 +262,12 @@ impl Utxos { ) .fetch_optional(&mut **tx) .await?; - Ok(row.map(|row| { - let local_utxo = serde_json::from_value::(row.utxo_json) - .expect("Could not deserialize the utxo"); + if let Some(row) = row { + let local_utxo = serde_json::from_value::(row.utxo_json)?; let keychain_id = KeychainId::from(row.keychain_id); - (local_utxo.outpoint, keychain_id) - })) + Ok(Some((local_utxo.outpoint, keychain_id))) + } else { + Ok(None) + } } } From bd3b0d44e06b2ededaec8247a582a896c522a032 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 12:13:38 -0500 Subject: [PATCH 3/8] refactor(bdk): simplify pg wallet adapter flows Simplify control flow and batching code in the Postgres BDK adapters without changing behavior. This keeps recent sync performance optimizations while reducing duplication and making cache/DB paths easier to reason about. 
--- src/bdk/pg/mod.rs | 44 ++++++++++----------- src/bdk/pg/script_pubkeys.rs | 77 +++++++++++++++++------------------- src/bdk/pg/transactions.rs | 17 ++++---- src/bdk/pg/utxos.rs | 17 ++++---- 4 files changed, 78 insertions(+), 77 deletions(-) diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 43eb57dd..47167d66 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -400,16 +400,17 @@ impl Database for SqlxWalletDb { .block_on(async { self.utxos_repo().find(outpoint).await }) } fn get_raw_tx(&self, tx_id: &Txid) -> Result, bdk::Error> { - if let Some(tx) = self.batch.txs.get(tx_id) { - if let Some(transaction) = &tx.transaction { - return Ok(Some(transaction.clone())); - } + if let Some(transaction) = self + .batch + .txs + .get(tx_id) + .and_then(|tx| tx.transaction.clone()) + { + return Ok(Some(transaction)); } - if let Some(tx) = self.cache.get_tx(tx_id)? { - if let Some(transaction) = tx.transaction { - return Ok(Some(transaction)); - } + if let Some(transaction) = self.cache.get_tx(tx_id)?.and_then(|tx| tx.transaction) { + return Ok(Some(transaction)); } let found = self @@ -463,26 +464,23 @@ impl BatchDatabase for SqlxWalletDb { &mut self, mut batch: ::Batch, ) -> Result<(), bdk::Error> { - let addresses_for_cache: Vec<_> = batch - .batch - .addresses - .iter() - .map(|(script, (keychain, path))| (script.clone(), (*keychain, *path))) - .collect(); - let addresses_for_db: Vec<_> = batch + let (addresses_for_cache, addresses_for_db): (Vec<_>, Vec<_>) = batch .batch .addresses .drain() - .map(|(script, (keychain, path))| (BdkKeychainKind::from(keychain), path, script)) - .collect(); - - let txs_for_cache: Vec<_> = batch + .map(|(script, (keychain, path))| { + let cache_entry = (script.clone(), (keychain, path)); + let db_entry = (BdkKeychainKind::from(keychain), path, script); + (cache_entry, db_entry) + }) + .unzip(); + + let (txs_for_cache, txs_for_db): (Vec<_>, Vec<_>) = batch .batch .txs - .iter() - .map(|(txid, tx)| (*txid, tx.clone())) - 
.collect(); - let txs_for_db: Vec<_> = batch.batch.txs.drain().map(|(_, tx)| tx).collect(); + .drain() + .map(|(txid, tx)| ((txid, tx.clone()), tx)) + .unzip(); let utxos_for_db = std::mem::take(&mut batch.batch.utxos); let keychain_id = batch.ctx.keychain_id; diff --git a/src/bdk/pg/script_pubkeys.rs b/src/bdk/pg/script_pubkeys.rs index f76eecee..c6fa643b 100644 --- a/src/bdk/pg/script_pubkeys.rs +++ b/src/bdk/pg/script_pubkeys.rs @@ -55,20 +55,18 @@ impl ScriptPubkeys { path: u32, ) -> Result, bdk::Error> { let kind = keychain.into(); - let rows = sqlx::query!( + let row = sqlx::query!( r#"SELECT script FROM bdk_script_pubkeys WHERE keychain_id = $1 AND keychain_kind = $2 AND path = $3"#, Uuid::from(self.keychain_id), kind as BdkKeychainKind, path as i32, ) - .fetch_all(&self.pool) + .fetch_optional(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok(rows - .into_iter() - .next() - .map(|row| ScriptBuf::from(row.script))) + + Ok(row.map(|row| ScriptBuf::from(row.script))) } #[allow(dead_code)] @@ -100,20 +98,17 @@ impl ScriptPubkeys { &self, script: &ScriptBuf, ) -> Result, bdk::Error> { - let rows = sqlx::query!( + let row = sqlx::query!( r#"SELECT keychain_kind as "keychain_kind: BdkKeychainKind", path FROM bdk_script_pubkeys WHERE keychain_id = $1 AND script_hex = ENCODE($2, 'hex')"#, Uuid::from(self.keychain_id), script.as_bytes(), ) - .fetch_all(&self.pool) + .fetch_optional(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; - if let Some(row) = rows.into_iter().next() { - Ok(Some((row.keychain_kind, row.path as u32))) - } else { - Ok(None) - } + + Ok(row.map(|row| (row.keychain_kind, row.path as u32))) } #[instrument(name = "bdk.script_pubkeys.list_scripts", skip_all)] @@ -121,36 +116,38 @@ impl ScriptPubkeys { &self, keychain: Option>, ) -> Result, bdk::Error> { - let kind = keychain.map(|k| k.into()); - if let Some(kind) = kind { - let rows = sqlx::query!( - r#"SELECT script FROM bdk_script_pubkeys + let scripts = 
match keychain.map(|k| k.into()) { + Some(kind) => { + let rows = sqlx::query!( + r#"SELECT script FROM bdk_script_pubkeys WHERE keychain_id = $1 AND keychain_kind = $2"#, - Uuid::from(self.keychain_id), - kind as BdkKeychainKind, - ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; + Uuid::from(self.keychain_id), + kind as BdkKeychainKind, + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok(rows - .into_iter() - .map(|row| ScriptBuf::from(row.script)) - .collect()) - } else { - let rows = sqlx::query!( - r#"SELECT script FROM bdk_script_pubkeys + rows.into_iter() + .map(|row| ScriptBuf::from(row.script)) + .collect() + } + None => { + let rows = sqlx::query!( + r#"SELECT script FROM bdk_script_pubkeys WHERE keychain_id = $1"#, - Uuid::from(self.keychain_id), - ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; + Uuid::from(self.keychain_id), + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok(rows - .into_iter() - .map(|row| ScriptBuf::from(row.script)) - .collect()) - } + rows.into_iter() + .map(|row| ScriptBuf::from(row.script)) + .collect() + } + }; + + Ok(scripts) } } diff --git a/src/bdk/pg/transactions.rs b/src/bdk/pg/transactions.rs index 9b7a4648..6514bfff 100644 --- a/src/bdk/pg/transactions.rs +++ b/src/bdk/pg/transactions.rs @@ -61,13 +61,16 @@ impl Transactions { (keychain_id, tx_id, details_json, sent, height)"#, ); - query_builder.push_values(serialized_batch, |mut builder, tx| { - builder.push_bind(self.keychain_id as KeychainId); - builder.push_bind(tx.0); - builder.push_bind(tx.1); - builder.push_bind(tx.2); - builder.push_bind(tx.3); - }); + query_builder.push_values( + serialized_batch, + |mut builder, (tx_id, details_json, sent, height)| { + builder.push_bind(self.keychain_id as KeychainId); + builder.push_bind(tx_id); + builder.push_bind(details_json); + builder.push_bind(sent); + 
builder.push_bind(height); + }, + ); query_builder.push( "ON CONFLICT (keychain_id, tx_id) DO UPDATE \ diff --git a/src/bdk/pg/utxos.rs b/src/bdk/pg/utxos.rs index 65cca85e..aab4e39d 100644 --- a/src/bdk/pg/utxos.rs +++ b/src/bdk/pg/utxos.rs @@ -46,13 +46,16 @@ impl Utxos { (keychain_id, tx_id, vout, utxo_json, is_spent)"#, ); - query_builder.push_values(serialized_batch, |mut builder, utxo| { - builder.push_bind(Uuid::from(self.keychain_id)); - builder.push_bind(utxo.0); - builder.push_bind(utxo.1); - builder.push_bind(utxo.2); - builder.push_bind(utxo.3); - }); + query_builder.push_values( + serialized_batch, + |mut builder, (tx_id, vout, utxo_json, is_spent)| { + builder.push_bind(Uuid::from(self.keychain_id)); + builder.push_bind(tx_id); + builder.push_bind(vout); + builder.push_bind(utxo_json); + builder.push_bind(is_spent); + }, + ); query_builder.push( "ON CONFLICT (keychain_id, tx_id, vout) DO UPDATE \ From a5615da0268d8f0bc5a1baf83090e2c98974e82e Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 12:44:08 -0500 Subject: [PATCH 4/8] fix(bdk): make pg wallet batch writes atomic Execute script pubkeys, utxos, and transactions persistence in a single SQL transaction during commit_batch to avoid partial DB state on failures. Also unify tx lookup behavior for raw vs summary reads, remove unused script_pubkeys load-all path, and document retained non-transactional persist helpers. 
--- ...358ceeb651c8f971f92951b5e2d1e113b1286.json | 44 ----------- src/bdk/pg/mod.rs | 75 +++++++++++------- src/bdk/pg/script_pubkeys.rs | 66 +++++++++------- src/bdk/pg/transactions.rs | 78 +++++++++++++++++-- src/bdk/pg/utxos.rs | 76 ++++++++++++++++-- 5 files changed, 223 insertions(+), 116 deletions(-) delete mode 100644 .sqlx/query-6c30dc4e7c409d797b957058f3f358ceeb651c8f971f92951b5e2d1e113b1286.json diff --git a/.sqlx/query-6c30dc4e7c409d797b957058f3f358ceeb651c8f971f92951b5e2d1e113b1286.json b/.sqlx/query-6c30dc4e7c409d797b957058f3f358ceeb651c8f971f92951b5e2d1e113b1286.json deleted file mode 100644 index 9021ebaf..00000000 --- a/.sqlx/query-6c30dc4e7c409d797b957058f3f358ceeb651c8f971f92951b5e2d1e113b1286.json +++ /dev/null @@ -1,44 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "SELECT script, keychain_kind as \"keychain_kind: BdkKeychainKind\", path FROM bdk_script_pubkeys\n WHERE keychain_id = $1", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "script", - "type_info": "Bytea" - }, - { - "ordinal": 1, - "name": "keychain_kind: BdkKeychainKind", - "type_info": { - "Custom": { - "name": "bdkkeychainkind", - "kind": { - "Enum": [ - "external", - "internal" - ] - } - } - } - }, - { - "ordinal": 2, - "name": "path", - "type_info": "Int4" - } - ], - "parameters": { - "Left": [ - "Uuid" - ] - }, - "nullable": [ - false, - false, - false - ] - }, - "hash": "6c30dc4e7c409d797b957058f3f358ceeb651c8f971f92951b5e2d1e113b1286" -} diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 47167d66..8c8bcb22 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -30,6 +30,12 @@ pub use utxos::*; type ScriptPubkeyCache = HashMap; type TransactionCache = HashMap; +#[derive(Copy, Clone, Eq, PartialEq)] +enum TxLookupMode { + Any, + RequireRaw, +} + #[derive(Clone)] struct WalletDbContext { rt: Handle, @@ -200,13 +206,21 @@ impl SqlxWalletDb { Ok(None) } - fn lookup_tx(&self, txid: &Txid) -> Result, bdk::Error> { + fn lookup_tx_with_mode( + &self, + txid: 
&Txid, + mode: TxLookupMode, + ) -> Result, bdk::Error> { if let Some(tx) = self.batch.txs.get(txid) { - return Ok(Some(tx.clone())); + if mode == TxLookupMode::Any || tx.transaction.is_some() { + return Ok(Some(tx.clone())); + } } if let Some(tx) = self.cache.get_tx(txid)? { - return Ok(Some(tx)); + if mode == TxLookupMode::Any || tx.transaction.is_some() { + return Ok(Some(tx)); + } } let found = self @@ -214,12 +228,20 @@ impl SqlxWalletDb { .rt .block_on(async { self.transactions_repo().find_by_id(txid).await })?; + // DB rows represent persisted TransactionDetails; this store does not persist a + // "summary-only" transaction format. A DB hit is therefore valid for both lookup + // modes (`Any` and `RequireRaw`). + if let Some(tx) = &found { self.cache.insert_tx(tx.txid, tx.clone())?; } Ok(found) } + + fn lookup_tx(&self, txid: &Txid) -> Result, bdk::Error> { + self.lookup_tx_with_mode(txid, TxLookupMode::Any) + } } impl BatchOperations for SqlxWalletDb { @@ -250,12 +272,16 @@ impl BatchOperations for SqlxWalletDb { } fn set_last_index(&mut self, kind: KeychainKind, idx: u32) -> Result<(), bdk::Error> { + // NOTE: This write is intentionally immediate because BDK may call it outside of + // `commit_batch` flow. self.ctx .rt .block_on(async { self.indexes_repo().persist_last_index(kind, idx).await }) } fn set_sync_time(&mut self, time: SyncTime) -> Result<(), bdk::Error> { + // NOTE: This write is intentionally immediate because BDK may call it outside of + // `commit_batch` flow. self.ctx .rt .block_on(async { self.sync_times_repo().persist(time).await }) @@ -369,6 +395,8 @@ impl Database for SqlxWalletDb { if include_raw { txs.extend(self.batch.txs.iter().map(|(id, tx)| (*id, tx.clone()))); } else { + // `load_all_summaries` already strips raw transactions from persisted rows, but + // batch entries can still carry `transaction: Some(_)`; normalize them here. 
txs.extend(self.batch.txs.iter().map(|(id, tx)| { let mut tx = tx.clone(); tx.transaction = None; @@ -400,30 +428,8 @@ impl Database for SqlxWalletDb { .block_on(async { self.utxos_repo().find(outpoint).await }) } fn get_raw_tx(&self, tx_id: &Txid) -> Result, bdk::Error> { - if let Some(transaction) = self - .batch - .txs - .get(tx_id) - .and_then(|tx| tx.transaction.clone()) - { - return Ok(Some(transaction)); - } - - if let Some(transaction) = self.cache.get_tx(tx_id)?.and_then(|tx| tx.transaction) { - return Ok(Some(transaction)); - } - - let found = self - .ctx - .rt - .block_on(async { self.transactions_repo().find_by_id(tx_id).await })?; - - if let Some(tx) = found { - self.cache.insert_tx(tx.txid, tx.clone())?; - Ok(tx.transaction) - } else { - Ok(None) - } + self.lookup_tx_with_mode(tx_id, TxLookupMode::RequireRaw) + .map(|tx| tx.and_then(|tx| tx.transaction)) } fn get_tx( &self, @@ -487,24 +493,33 @@ impl BatchDatabase for SqlxWalletDb { let pool = batch.ctx.pool.clone(); batch.ctx.rt.block_on(async move { + let mut tx = pool + .begin() + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + if !addresses_for_db.is_empty() { ScriptPubkeys::new(keychain_id, pool.clone()) - .persist_all(addresses_for_db) + .persist_all_in_tx(&mut tx, addresses_for_db) .await?; } if !utxos_for_db.is_empty() { Utxos::new(keychain_id, pool.clone()) - .persist_all(utxos_for_db) + .persist_all_in_tx(&mut tx, utxos_for_db) .await?; } if !txs_for_db.is_empty() { Transactions::new(keychain_id, pool) - .persist_all(txs_for_db) + .persist_all_in_tx(&mut tx, txs_for_db) .await?; } + tx.commit() + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + Ok::<_, bdk::Error>(()) })?; diff --git a/src/bdk/pg/script_pubkeys.rs b/src/bdk/pg/script_pubkeys.rs index c6fa643b..c5d7aa74 100644 --- a/src/bdk/pg/script_pubkeys.rs +++ b/src/bdk/pg/script_pubkeys.rs @@ -1,5 +1,4 @@ -use sqlx::{PgPool, Postgres, QueryBuilder}; -use std::collections::HashMap; +use sqlx::{PgPool, Postgres, 
QueryBuilder, Transaction}; use tracing::instrument; use uuid::Uuid; @@ -17,6 +16,8 @@ impl ScriptPubkeys { } #[instrument(name = "bdk.script_pubkeys.persist_all", skip_all)] + // Retained for non-transactional call sites and focused tests. + #[allow(dead_code)] pub async fn persist_all( &self, keys: Vec<(BdkKeychainKind, u32, ScriptBuf)>, @@ -39,12 +40,45 @@ impl ScriptPubkeys { }); query_builder.push("ON CONFLICT DO NOTHING"); - let query = query_builder.build(); - query + query_builder + .build() .execute(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; } + + Ok(()) + } + + pub async fn persist_all_in_tx( + &self, + tx: &mut Transaction<'_, Postgres>, + keys: Vec<(BdkKeychainKind, u32, ScriptBuf)>, + ) -> Result<(), bdk::Error> { + const BATCH_SIZE: usize = 5000; + let chunks = keys.chunks(BATCH_SIZE); + for chunk in chunks { + let mut query_builder: QueryBuilder = QueryBuilder::new( + r#"INSERT INTO bdk_script_pubkeys + (keychain_id, keychain_kind, path, script, script_hex, script_fmt)"#, + ); + + query_builder.push_values(chunk, |mut builder, (keychain, path, script)| { + builder.push_bind(self.keychain_id); + builder.push_bind(keychain); + builder.push_bind(*path as i32); + builder.push_bind(script.as_bytes()); + builder.push_bind(format!("{script:02x}")); + builder.push_bind(format!("{script:?}")); + }); + query_builder.push("ON CONFLICT DO NOTHING"); + + query_builder + .build() + .execute(tx.as_mut()) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + } Ok(()) } @@ -69,30 +103,6 @@ impl ScriptPubkeys { Ok(row.map(|row| ScriptBuf::from(row.script))) } - #[allow(dead_code)] - #[instrument(name = "bdk.script_pubkeys.load_all", skip_all)] - pub async fn load_all( - &self, - ) -> Result, bdk::Error> { - let rows = sqlx::query!( - r#"SELECT script, keychain_kind as "keychain_kind: BdkKeychainKind", path FROM bdk_script_pubkeys - WHERE keychain_id = $1"#, - Uuid::from(self.keychain_id), - ) - .fetch_all(&self.pool) - .await - 
.map_err(|e| bdk::Error::Generic(e.to_string()))?; - let mut ret = HashMap::new(); - for row in rows { - ret.insert( - ScriptBuf::from(row.script), - (bdk::KeychainKind::from(row.keychain_kind), row.path as u32), - ); - } - Ok(ret) - } - - #[allow(dead_code)] #[instrument(name = "bdk.script_pubkeys.find_path", skip_all)] pub async fn find_path( &self, diff --git a/src/bdk/pg/transactions.rs b/src/bdk/pg/transactions.rs index 6514bfff..dca973cd 100644 --- a/src/bdk/pg/transactions.rs +++ b/src/bdk/pg/transactions.rs @@ -1,5 +1,5 @@ use bdk::{bitcoin::Txid, BlockTime, LocalUtxo, TransactionDetails}; -use sqlx::{PgPool, Postgres, QueryBuilder, Transaction}; +use sqlx::{PgPool, Postgres, QueryBuilder, Transaction as SqlxTransaction}; use tracing::instrument; use std::collections::HashMap; @@ -36,6 +36,8 @@ impl Transactions { } #[instrument(name = "bdk.transactions.persist", skip_all)] + // Retained for non-transactional call sites and focused tests. + #[allow(dead_code)] pub async fn persist_all(&self, txs: Vec) -> Result<(), bdk::Error> { const BATCH_SIZE: usize = 2000; let batches = txs.chunks(BATCH_SIZE); @@ -85,8 +87,8 @@ impl Transactions { OR bdk_transactions.deleted_at IS NOT NULL", ); - let query = query_builder.build(); - query + query_builder + .build() .execute(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; @@ -95,6 +97,69 @@ impl Transactions { Ok(()) } + pub async fn persist_all_in_tx( + &self, + tx: &mut SqlxTransaction<'_, Postgres>, + txs: Vec, + ) -> Result<(), bdk::Error> { + const BATCH_SIZE: usize = 2000; + let batches = txs.chunks(BATCH_SIZE); + + for batch in batches { + let serialized_batch = batch + .iter() + .map(|tx| { + Ok::<_, bdk::Error>(( + tx.txid.to_string(), + serde_json::to_value(tx).map_err(|e| { + bdk::Error::Generic(format!("failed to serialize tx details: {e}")) + })?, + tx.sent as i64, + tx.confirmation_time.as_ref().map(|t| t.height as i32), + )) + }) + .collect::, _>>()?; + + let mut query_builder: 
QueryBuilder = QueryBuilder::new( + r#" + INSERT INTO bdk_transactions + (keychain_id, tx_id, details_json, sent, height)"#, + ); + + query_builder.push_values( + serialized_batch, + |mut builder, (tx_id, details_json, sent, height)| { + builder.push_bind(self.keychain_id as KeychainId); + builder.push_bind(tx_id); + builder.push_bind(details_json); + builder.push_bind(sent); + builder.push_bind(height); + }, + ); + + query_builder.push( + "ON CONFLICT (keychain_id, tx_id) DO UPDATE \ + SET details_json = EXCLUDED.details_json,\ + sent = EXCLUDED.sent,\ + height = EXCLUDED.height,\ + modified_at = NOW(),\ + deleted_at = NULL \ + WHERE bdk_transactions.details_json IS DISTINCT FROM EXCLUDED.details_json \ + OR bdk_transactions.sent IS DISTINCT FROM EXCLUDED.sent \ + OR bdk_transactions.height IS DISTINCT FROM EXCLUDED.height \ + OR bdk_transactions.deleted_at IS NOT NULL", + ); + + query_builder + .build() + .execute(tx.as_mut()) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + } + + Ok(()) + } + #[instrument(name = "bdk.transactions.delete", skip_all)] pub async fn delete(&self, tx_id: &Txid) -> Result, bdk::Error> { let tx = sqlx::query!( @@ -116,7 +181,6 @@ impl Transactions { .transpose() } - #[allow(dead_code)] #[instrument(name = "bdk.transactions.find_by_id", skip_all)] pub async fn find_by_id(&self, tx_id: &Txid) -> Result, bdk::Error> { let tx = sqlx::query!( @@ -304,7 +368,7 @@ impl Transactions { #[instrument(name = "bdk.transactions.find_confirmed_spend_tx", skip(self, tx))] pub async fn find_confirmed_spend_tx( &self, - tx: &mut Transaction<'_, Postgres>, + tx: &mut SqlxTransaction<'_, Postgres>, min_height: u32, ) -> Result, BdkError> { let rows = sqlx::query!(r#" @@ -396,7 +460,7 @@ impl Transactions { #[instrument(name = "bdk.transactions.mark_confirmed", skip(self))] pub async fn mark_confirmed( &self, - tx: &mut Transaction<'_, Postgres>, + tx: &mut SqlxTransaction<'_, Postgres>, tx_id: bitcoin::Txid, ) -> Result<(), BdkError> { 
sqlx::query!( @@ -416,7 +480,7 @@ impl Transactions { )] pub async fn delete_transaction_if_no_more_utxos_exist( &self, - tx: &mut Transaction<'_, Postgres>, + tx: &mut SqlxTransaction<'_, Postgres>, outpoint: bitcoin::OutPoint, ) -> Result<(), BdkError> { sqlx::query!( diff --git a/src/bdk/pg/utxos.rs b/src/bdk/pg/utxos.rs index aab4e39d..736c2eae 100644 --- a/src/bdk/pg/utxos.rs +++ b/src/bdk/pg/utxos.rs @@ -1,5 +1,5 @@ use bdk::{bitcoin::blockdata::transaction::OutPoint, LocalUtxo, TransactionDetails}; -use sqlx::{PgPool, Postgres, QueryBuilder, Transaction}; +use sqlx::{PgPool, Postgres, QueryBuilder, Transaction as SqlxTransaction}; use tracing::instrument; use uuid::Uuid; @@ -22,6 +22,8 @@ impl Utxos { } #[instrument(name = "bdk.utxos.persist_all", skip_all)] + // Retained for non-transactional call sites and focused tests. + #[allow(dead_code)] pub async fn persist_all(&self, utxos: Vec) -> Result<(), bdk::Error> { const BATCH_SIZE: usize = 2000; let batches = utxos.chunks(BATCH_SIZE); @@ -68,8 +70,8 @@ impl Utxos { OR bdk_utxos.deleted_at IS NOT NULL", ); - let query = query_builder.build(); - query + query_builder + .build() .execute(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; @@ -78,6 +80,66 @@ impl Utxos { Ok(()) } + pub async fn persist_all_in_tx( + &self, + tx: &mut SqlxTransaction<'_, Postgres>, + utxos: Vec, + ) -> Result<(), bdk::Error> { + const BATCH_SIZE: usize = 2000; + let batches = utxos.chunks(BATCH_SIZE); + + for batch in batches { + let serialized_batch = batch + .iter() + .map(|utxo| { + Ok::<_, bdk::Error>(( + utxo.outpoint.txid.to_string(), + utxo.outpoint.vout as i32, + serde_json::to_value(utxo).map_err(|e| { + bdk::Error::Generic(format!("failed to serialize utxo: {e}")) + })?, + utxo.is_spent, + )) + }) + .collect::, _>>()?; + + let mut query_builder: QueryBuilder = QueryBuilder::new( + r#"INSERT INTO bdk_utxos + (keychain_id, tx_id, vout, utxo_json, is_spent)"#, + ); + + query_builder.push_values( + 
serialized_batch, + |mut builder, (tx_id, vout, utxo_json, is_spent)| { + builder.push_bind(Uuid::from(self.keychain_id)); + builder.push_bind(tx_id); + builder.push_bind(vout); + builder.push_bind(utxo_json); + builder.push_bind(is_spent); + }, + ); + + query_builder.push( + "ON CONFLICT (keychain_id, tx_id, vout) DO UPDATE \ + SET utxo_json = EXCLUDED.utxo_json,\ + is_spent = EXCLUDED.is_spent,\ + modified_at = NOW(),\ + deleted_at = NULL \ + WHERE bdk_utxos.utxo_json IS DISTINCT FROM EXCLUDED.utxo_json \ + OR bdk_utxos.is_spent IS DISTINCT FROM EXCLUDED.is_spent \ + OR bdk_utxos.deleted_at IS NOT NULL", + ); + + query_builder + .build() + .execute(tx.as_mut()) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + } + + Ok(()) + } + #[instrument(name = "bdk.utxos.delete", skip_all)] pub async fn delete( &self, @@ -163,7 +225,7 @@ impl Utxos { #[instrument(name = "bdk.utxos.mark_as_synced", skip(self, tx))] pub async fn mark_as_synced( &self, - tx: &mut Transaction<'_, Postgres>, + tx: &mut SqlxTransaction<'_, Postgres>, utxo: &LocalUtxo, ) -> Result<(), BdkError> { sqlx::query!( @@ -181,7 +243,7 @@ impl Utxos { #[instrument(name = "bdk.utxos.mark_confirmed", skip(self, tx))] pub async fn mark_confirmed( &self, - tx: &mut Transaction<'_, Postgres>, + tx: &mut SqlxTransaction<'_, Postgres>, utxo: &LocalUtxo, ) -> Result<(), BdkError> { sqlx::query!( @@ -199,7 +261,7 @@ impl Utxos { #[instrument(name = "bdk.utxos.find_confirmed_income_utxo", skip(self, tx))] pub async fn find_confirmed_income_utxo( &self, - tx: &mut Transaction<'_, Postgres>, + tx: &mut SqlxTransaction<'_, Postgres>, min_height: u32, ) -> Result, BdkError> { let row = sqlx::query!( @@ -251,7 +313,7 @@ impl Utxos { #[instrument(name = "bdk.utxos.find_and_remove_soft_deleted_utxo", skip_all)] pub async fn find_and_remove_soft_deleted_utxo( &self, - tx: &mut Transaction<'_, Postgres>, + tx: &mut SqlxTransaction<'_, Postgres>, ) -> Result, BdkError> { let row = sqlx::query!( r#"DELETE FROM 
bdk_utxos From b3ff38885513f967b525075607e56ae892895a76 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 13:19:29 -0500 Subject: [PATCH 5/8] perf(bdk): memoize raw transaction iteration cache Avoid repeated full load_all deserialization in iter_txs(true) by memoizing the raw transaction cache after first successful load. Preserve cache correctness for staged updates and add focused unit coverage for the raw-cache-loaded flag. --- src/bdk/pg/mod.rs | 44 +++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 41 insertions(+), 3 deletions(-) diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 8c8bcb22..d85c817a 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -21,6 +21,7 @@ use index::Indexes; use script_pubkeys::ScriptPubkeys; use std::{ collections::HashMap, + sync::atomic::{AtomicBool, Ordering}, sync::{Arc, Mutex, MutexGuard}, }; pub(super) use sync_times::SyncTimes; @@ -64,6 +65,7 @@ struct WalletBatchState { struct WalletCache { script_pubkeys: Arc>, transactions: Arc>, + raw_txs_fully_loaded: Arc, } impl WalletCache { @@ -71,6 +73,7 @@ impl WalletCache { Self { script_pubkeys: Arc::new(Mutex::new(HashMap::new())), transactions: Arc::new(Mutex::new(HashMap::new())), + raw_txs_fully_loaded: Arc::new(AtomicBool::new(false)), } } @@ -133,6 +136,19 @@ impl WalletCache { Ok(()) } + fn all_txs(&self) -> Result, bdk::Error> { + let cache = self.lock_transactions()?; + Ok(cache.values().cloned().collect()) + } + + fn raw_txs_fully_loaded(&self) -> bool { + self.raw_txs_fully_loaded.load(Ordering::Relaxed) + } + + fn set_raw_txs_fully_loaded(&self) { + self.raw_txs_fully_loaded.store(true, Ordering::Relaxed); + } + fn remove_tx(&self, txid: &Txid) -> Result<(), bdk::Error> { let mut cache = self.lock_transactions()?; cache.remove(txid); @@ -383,9 +399,22 @@ impl Database for SqlxWalletDb { fn iter_txs(&self, include_raw: bool) -> Result, bdk::Error> { let mut txs = if include_raw { - self.ctx - .rt - .block_on(async { 
self.transactions_repo().load_all().await })? + if self.cache.raw_txs_fully_loaded() { + self.cache + .all_txs()? + .into_iter() + .map(|tx| (tx.txid, tx)) + .collect() + } else { + let loaded = self + .ctx + .rt + .block_on(async { self.transactions_repo().load_all().await })?; + self.cache + .extend_txs(loaded.iter().map(|(txid, tx)| (*txid, tx.clone())))?; + self.cache.set_raw_txs_fully_loaded(); + loaded + } } else { self.ctx .rt @@ -577,4 +606,13 @@ mod tests { .expect("get should succeed"); assert_eq!(loaded, Some(path)); } + + #[test] + fn wallet_cache_raw_txs_loaded_flag_defaults_false_and_can_be_set() { + let cache = WalletCache::new(); + assert!(!cache.raw_txs_fully_loaded()); + + cache.set_raw_txs_fully_loaded(); + assert!(cache.raw_txs_fully_loaded()); + } } From 53933d07883066bd50b84795d95d8eaec1104aca Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 13:51:52 -0500 Subject: [PATCH 6/8] test(bdk): cover tx cache iteration semantics Add focused unit coverage for transaction cache behavior used by iter_txs summary/raw paths, and harden summary loading with negative-height validation. Also strengthen raw-cache-loaded atomic ordering with acquire/release semantics. 
--- src/bdk/pg/mod.rs | 80 ++++++++++++++++++++++++++++++-------- src/bdk/pg/transactions.rs | 11 +++++- 2 files changed, 74 insertions(+), 17 deletions(-) diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index d85c817a..aae3cea6 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -142,11 +142,11 @@ impl WalletCache { } fn raw_txs_fully_loaded(&self) -> bool { - self.raw_txs_fully_loaded.load(Ordering::Relaxed) + self.raw_txs_fully_loaded.load(Ordering::Acquire) } fn set_raw_txs_fully_loaded(&self) { - self.raw_txs_fully_loaded.store(true, Ordering::Relaxed); + self.raw_txs_fully_loaded.store(true, Ordering::Release); } fn remove_tx(&self, txid: &Txid) -> Result<(), bdk::Error> { @@ -258,6 +258,24 @@ impl SqlxWalletDb { fn lookup_tx(&self, txid: &Txid) -> Result, bdk::Error> { self.lookup_tx_with_mode(txid, TxLookupMode::Any) } + + fn overlay_batch_txs( + mut txs: HashMap, + batch_txs: &HashMap, + include_raw: bool, + ) -> HashMap { + if include_raw { + txs.extend(batch_txs.iter().map(|(id, tx)| (*id, tx.clone()))); + } else { + txs.extend(batch_txs.iter().map(|(id, tx)| { + let mut tx = tx.clone(); + tx.transaction = None; + (*id, tx) + })); + } + + txs + } } impl BatchOperations for SqlxWalletDb { @@ -398,7 +416,7 @@ impl Database for SqlxWalletDb { } fn iter_txs(&self, include_raw: bool) -> Result, bdk::Error> { - let mut txs = if include_raw { + let txs = if include_raw { if self.cache.raw_txs_fully_loaded() { self.cache .all_txs()? @@ -421,19 +439,9 @@ impl Database for SqlxWalletDb { .block_on(async { self.transactions_repo().load_all_summaries().await })? }; - if include_raw { - txs.extend(self.batch.txs.iter().map(|(id, tx)| (*id, tx.clone()))); - } else { - // `load_all_summaries` already strips raw transactions from persisted rows, but - // batch entries can still carry `transaction: Some(_)`; normalize them here. 
- txs.extend(self.batch.txs.iter().map(|(id, tx)| { - let mut tx = tx.clone(); - tx.transaction = None; - (*id, tx) - })); - } - - Ok(txs.into_values().collect()) + Ok(Self::overlay_batch_txs(txs, &self.batch.txs, include_raw) + .into_values() + .collect()) } fn get_script_pubkey_from_path( @@ -615,4 +623,44 @@ mod tests { cache.set_raw_txs_fully_loaded(); assert!(cache.raw_txs_fully_loaded()); } + + #[test] + fn overlay_batch_txs_strips_raw_when_include_raw_is_false() { + let txid = Txid::all_zeros(); + let mut base = HashMap::new(); + base.insert(txid, tx_details(txid)); + + let raw_tx = bdk::bitcoin::Transaction { + version: 2, + lock_time: bdk::bitcoin::absolute::LockTime::ZERO, + input: Vec::new(), + output: Vec::new(), + }; + + let mut batch = HashMap::new(); + let mut batch_tx = tx_details(txid); + batch_tx.transaction = Some(raw_tx); + batch.insert(txid, batch_tx); + + let merged = SqlxWalletDb::overlay_batch_txs(base, &batch, false); + assert!(merged + .get(&txid) + .expect("merged tx should exist") + .transaction + .is_none()); + } + + #[test] + fn wallet_cache_all_txs_returns_cached_values() { + let cache = WalletCache::new(); + let txid = Txid::all_zeros(); + + cache + .insert_tx(txid, tx_details(txid)) + .expect("insert should succeed"); + + let txs = cache.all_txs().expect("all_txs should succeed"); + assert_eq!(txs.len(), 1); + assert_eq!(txs[0].txid, txid); + } } diff --git a/src/bdk/pg/transactions.rs b/src/bdk/pg/transactions.rs index dca973cd..0d3f7c2f 100644 --- a/src/bdk/pg/transactions.rs +++ b/src/bdk/pg/transactions.rs @@ -254,6 +254,15 @@ impl Transactions { Ok(value as u64) } + fn to_u32(value: i32, field: &str) -> Result { + if value < 0 { + return Err(bdk::Error::Generic(format!( + "negative {field} value in bdk_transactions" + ))); + } + Ok(value as u32) + } + rows.into_iter() .map(|row| { let txid = row @@ -263,7 +272,7 @@ impl Transactions { let confirmation_time = match (row.height, row.confirmation_timestamp) { (Some(height), 
Some(timestamp)) => Some(BlockTime { - height: height as u32, + height: to_u32(height, "height")?, timestamp: to_u64(timestamp, "confirmation timestamp")?, }), _ => None, From 1d518009a2f8678e1462c2b829ca2ae97137e094 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Thu, 16 Apr 2026 14:26:08 -0500 Subject: [PATCH 7/8] refactor(bdk): simplify pg adapter internals Reduce duplication and clarify control flow in the BDK Postgres adapters while preserving the recent atomic commit and cache semantics. This keeps behavior unchanged but makes error handling and query paths easier to reason about. --- .cargo/audit.toml | 3 +-- src/bdk/pg/mod.rs | 38 +++++++++++++--------------- src/bdk/pg/script_pubkeys.rs | 48 ++++++++++++++---------------------- src/bdk/pg/transactions.rs | 48 +++++++++++++++++------------------- src/bdk/pg/utxos.rs | 46 +++++++++++++++------------------- 5 files changed, 79 insertions(+), 104 deletions(-) diff --git a/.cargo/audit.toml b/.cargo/audit.toml index 910cce68..0a8cdcda 100644 --- a/.cargo/audit.toml +++ b/.cargo/audit.toml @@ -2,8 +2,7 @@ ignore = [ "RUSTSEC-2023-0071", "RUSTSEC-2026-0049", - # TODO: remove once electrum-client/bdk stop pulling rustls-webpki 0.101.7 + # TODO: remove once electrum-client/bdk stops pulling rustls-webpki 0.101.7 "RUSTSEC-2026-0098", - # TODO: remove once electrum-client/bdk stop pulling rustls-webpki 0.101.7 "RUSTSEC-2026-0099", ] diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index aae3cea6..67bd17b4 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -163,6 +163,10 @@ pub struct SqlxWalletDb { } impl SqlxWalletDb { + fn unsupported_operation(operation: &str) -> bdk::Error { + bdk::Error::Generic(format!("{operation} is not supported by SqlxWalletDb")) + } + pub fn new(pool: PgPool, keychain_id: KeychainId) -> Self { Self { ctx: WalletDbContext::new(pool, keychain_id), @@ -231,12 +235,18 @@ impl SqlxWalletDb { if mode == TxLookupMode::Any || tx.transaction.is_some() { return Ok(Some(tx.clone())); } + + 
return Ok(None); } if let Some(tx) = self.cache.get_tx(txid)? { if mode == TxLookupMode::Any || tx.transaction.is_some() { return Ok(Some(tx)); } + + if self.cache.raw_txs_fully_loaded() { + return Ok(None); + } } let found = self @@ -295,9 +305,7 @@ impl BatchOperations for SqlxWalletDb { } fn set_raw_tx(&mut self, _: &Transaction) -> Result<(), bdk::Error> { - Err(bdk::Error::Generic( - "set_raw_tx is not supported by SqlxWalletDb".to_string(), - )) + Err(Self::unsupported_operation("set_raw_tx")) } fn set_tx(&mut self, tx: &TransactionDetails) -> Result<(), bdk::Error> { @@ -326,17 +334,13 @@ impl BatchOperations for SqlxWalletDb { _: KeychainKind, _: u32, ) -> Result, bdk::Error> { - Err(bdk::Error::Generic( - "del_script_pubkey_from_path is not supported by SqlxWalletDb".to_string(), - )) + Err(Self::unsupported_operation("del_script_pubkey_from_path")) } fn del_path_from_script_pubkey( &mut self, _: &Script, ) -> Result, bdk::Error> { - Err(bdk::Error::Generic( - "del_path_from_script_pubkey is not supported by SqlxWalletDb".to_string(), - )) + Err(Self::unsupported_operation("del_path_from_script_pubkey")) } fn del_utxo(&mut self, outpoint: &OutPoint) -> Result, bdk::Error> { self.ctx @@ -344,9 +348,7 @@ impl BatchOperations for SqlxWalletDb { .block_on(async { self.utxos_repo().delete(outpoint).await }) } fn del_raw_tx(&mut self, _: &Txid) -> Result, bdk::Error> { - Err(bdk::Error::Generic( - "del_raw_tx is not supported by SqlxWalletDb".to_string(), - )) + Err(Self::unsupported_operation("del_raw_tx")) } fn del_tx( @@ -367,14 +369,10 @@ impl BatchOperations for SqlxWalletDb { Ok(deleted) } fn del_last_index(&mut self, _: KeychainKind) -> Result, bdk::Error> { - Err(bdk::Error::Generic( - "del_last_index is not supported by SqlxWalletDb".to_string(), - )) + Err(Self::unsupported_operation("del_last_index")) } fn del_sync_time(&mut self) -> Result, bdk::Error> { - Err(bdk::Error::Generic( - "del_sync_time is not supported by SqlxWalletDb".to_string(), - )) + 
Err(Self::unsupported_operation("del_sync_time")) } } @@ -410,9 +408,7 @@ impl Database for SqlxWalletDb { .block_on(async { self.utxos_repo().list_local_utxos().await }) } fn iter_raw_txs(&self) -> Result, bdk::Error> { - Err(bdk::Error::Generic( - "iter_raw_txs is not supported by SqlxWalletDb".to_string(), - )) + Err(Self::unsupported_operation("iter_raw_txs")) } fn iter_txs(&self, include_raw: bool) -> Result, bdk::Error> { diff --git a/src/bdk/pg/script_pubkeys.rs b/src/bdk/pg/script_pubkeys.rs index c5d7aa74..e02eeca3 100644 --- a/src/bdk/pg/script_pubkeys.rs +++ b/src/bdk/pg/script_pubkeys.rs @@ -126,38 +126,28 @@ impl ScriptPubkeys { &self, keychain: Option>, ) -> Result, bdk::Error> { - let scripts = match keychain.map(|k| k.into()) { - Some(kind) => { - let rows = sqlx::query!( - r#"SELECT script FROM bdk_script_pubkeys + let keychain_id = Uuid::from(self.keychain_id); + let scripts = if let Some(kind) = keychain.map(Into::into) { + sqlx::query_scalar!( + r#"SELECT script FROM bdk_script_pubkeys WHERE keychain_id = $1 AND keychain_kind = $2"#, - Uuid::from(self.keychain_id), - kind as BdkKeychainKind, - ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - - rows.into_iter() - .map(|row| ScriptBuf::from(row.script)) - .collect() - } - None => { - let rows = sqlx::query!( - r#"SELECT script FROM bdk_script_pubkeys + keychain_id, + kind as BdkKeychainKind, + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))? + } else { + sqlx::query_scalar!( + r#"SELECT script FROM bdk_script_pubkeys WHERE keychain_id = $1"#, - Uuid::from(self.keychain_id), - ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - - rows.into_iter() - .map(|row| ScriptBuf::from(row.script)) - .collect() - } + keychain_id, + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))? 
}; - Ok(scripts) + Ok(scripts.into_iter().map(ScriptBuf::from).collect()) } } diff --git a/src/bdk/pg/transactions.rs b/src/bdk/pg/transactions.rs index 0d3f7c2f..64ca305a 100644 --- a/src/bdk/pg/transactions.rs +++ b/src/bdk/pg/transactions.rs @@ -6,6 +6,8 @@ use std::collections::HashMap; use crate::{bdk::error::BdkError, primitives::*}; +type SerializedTransactionRow = (String, serde_json::Value, i64, Option); + #[derive(Debug)] pub struct UnsyncedTransaction { pub tx_id: bitcoin::Txid, @@ -31,6 +33,24 @@ pub struct Transactions { } impl Transactions { + fn serialize_batch( + batch: &[TransactionDetails], + ) -> Result, bdk::Error> { + batch + .iter() + .map(|tx| { + Ok::<_, bdk::Error>(( + tx.txid.to_string(), + serde_json::to_value(tx).map_err(|e| { + bdk::Error::Generic(format!("failed to serialize tx details: {e}")) + })?, + tx.sent as i64, + tx.confirmation_time.as_ref().map(|t| t.height as i32), + )) + }) + .collect() + } + pub fn new(keychain_id: KeychainId, pool: PgPool) -> Self { Self { keychain_id, pool } } @@ -43,19 +63,7 @@ impl Transactions { let batches = txs.chunks(BATCH_SIZE); for batch in batches { - let serialized_batch = batch - .iter() - .map(|tx| { - Ok::<_, bdk::Error>(( - tx.txid.to_string(), - serde_json::to_value(tx).map_err(|e| { - bdk::Error::Generic(format!("failed to serialize tx details: {e}")) - })?, - tx.sent as i64, - tx.confirmation_time.as_ref().map(|t| t.height as i32), - )) - }) - .collect::, _>>()?; + let serialized_batch = Self::serialize_batch(batch)?; let mut query_builder: QueryBuilder = QueryBuilder::new( r#" @@ -106,19 +114,7 @@ impl Transactions { let batches = txs.chunks(BATCH_SIZE); for batch in batches { - let serialized_batch = batch - .iter() - .map(|tx| { - Ok::<_, bdk::Error>(( - tx.txid.to_string(), - serde_json::to_value(tx).map_err(|e| { - bdk::Error::Generic(format!("failed to serialize tx details: {e}")) - })?, - tx.sent as i64, - tx.confirmation_time.as_ref().map(|t| t.height as i32), - )) - }) - 
.collect::, _>>()?; + let serialized_batch = Self::serialize_batch(batch)?; let mut query_builder: QueryBuilder = QueryBuilder::new( r#" diff --git a/src/bdk/pg/utxos.rs b/src/bdk/pg/utxos.rs index 736c2eae..cc0a25c8 100644 --- a/src/bdk/pg/utxos.rs +++ b/src/bdk/pg/utxos.rs @@ -5,6 +5,8 @@ use uuid::Uuid; use crate::{bdk::error::BdkError, primitives::*}; +type SerializedUtxoRow = (String, i32, serde_json::Value, bool); + pub struct ConfirmedIncomeUtxo { pub outpoint: bitcoin::OutPoint, pub spent: bool, @@ -17,6 +19,22 @@ pub struct Utxos { } impl Utxos { + fn serialize_batch(batch: &[LocalUtxo]) -> Result, bdk::Error> { + batch + .iter() + .map(|utxo| { + Ok::<_, bdk::Error>(( + utxo.outpoint.txid.to_string(), + utxo.outpoint.vout as i32, + serde_json::to_value(utxo).map_err(|e| { + bdk::Error::Generic(format!("failed to serialize utxo: {e}")) + })?, + utxo.is_spent, + )) + }) + .collect() + } + pub fn new(keychain_id: KeychainId, pool: PgPool) -> Self { Self { keychain_id, pool } } @@ -29,19 +47,7 @@ impl Utxos { let batches = utxos.chunks(BATCH_SIZE); for batch in batches { - let serialized_batch = batch - .iter() - .map(|utxo| { - Ok::<_, bdk::Error>(( - utxo.outpoint.txid.to_string(), - utxo.outpoint.vout as i32, - serde_json::to_value(utxo).map_err(|e| { - bdk::Error::Generic(format!("failed to serialize utxo: {e}")) - })?, - utxo.is_spent, - )) - }) - .collect::, _>>()?; + let serialized_batch = Self::serialize_batch(batch)?; let mut query_builder: QueryBuilder = QueryBuilder::new( r#"INSERT INTO bdk_utxos @@ -89,19 +95,7 @@ impl Utxos { let batches = utxos.chunks(BATCH_SIZE); for batch in batches { - let serialized_batch = batch - .iter() - .map(|utxo| { - Ok::<_, bdk::Error>(( - utxo.outpoint.txid.to_string(), - utxo.outpoint.vout as i32, - serde_json::to_value(utxo).map_err(|e| { - bdk::Error::Generic(format!("failed to serialize utxo: {e}")) - })?, - utxo.is_spent, - )) - }) - .collect::, _>>()?; + let serialized_batch = Self::serialize_batch(batch)?; 
let mut query_builder: QueryBuilder = QueryBuilder::new(
                r#"INSERT INTO bdk_utxos
From 6315caf7c717c3a64cb59483a00175f030a0e581 Mon Sep 17 00:00:00 2001
From: Juan P Lopez
Date: Thu, 16 Apr 2026 14:59:19 -0500
Subject: [PATCH 8/8] fix(bdk): honor include_raw in get_tx

Respect the Database::get_tx include_raw flag by stripping raw
transaction data when it is not requested. Also document the
process-local raw tx cache hint and clarify commit_batch atomic scope
boundaries.
---
 src/bdk/pg/mod.rs | 15 +++++++++++++--
 1 file changed, 13 insertions(+), 2 deletions(-)

diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs
index 67bd17b4..310fcde1 100644
--- a/src/bdk/pg/mod.rs
+++ b/src/bdk/pg/mod.rs
@@ -65,6 +65,8 @@ struct WalletBatchState {
 struct WalletCache {
     script_pubkeys: Arc>,
     transactions: Arc>,
+    // Process-local hint: true means this instance has already hydrated raw tx details
+    // from the DB at least once. It is intentionally not synchronized across processes.
     raw_txs_fully_loaded: Arc,
 }
 
@@ -467,9 +469,16 @@ impl Database for SqlxWalletDb {
     fn get_tx(
         &self,
         tx_id: &Txid,
-        _include_raw: bool,
+        include_raw: bool,
     ) -> Result, bdk::Error> {
-        self.lookup_tx(tx_id)
+        self.lookup_tx(tx_id).map(|tx| {
+            tx.map(|mut tx| {
+                if !include_raw {
+                    tx.transaction = None;
+                }
+                tx
+            })
+        })
     }
     fn get_last_index(&self, kind: KeychainKind) -> Result, bdk::Error> {
         self.ctx
@@ -503,6 +512,8 @@ impl BatchDatabase for SqlxWalletDb {
         &mut self,
         mut batch: ::Batch,
     ) -> Result<(), bdk::Error> {
+        // Atomic scope here is limited to staged script pubkeys, utxos, and transactions.
+        // `set_last_index` / `set_sync_time` remain immediate writes by design.
        let (addresses_for_cache, addresses_for_db): (Vec<_>, Vec<_>) = batch
            .batch
            .addresses