diff --git a/.sqlx/query-846df15d0a9709ad69c5ff8e6c3e6679816cf5c9f86ea2b4a9d1b95458ec56a1.json b/.sqlx/query-846df15d0a9709ad69c5ff8e6c3e6679816cf5c9f86ea2b4a9d1b95458ec56a1.json new file mode 100644 index 00000000..a1b78b6f --- /dev/null +++ b/.sqlx/query-846df15d0a9709ad69c5ff8e6c3e6679816cf5c9f86ea2b4a9d1b95458ec56a1.json @@ -0,0 +1,44 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT script, keychain_kind as \"keychain_kind: BdkKeychainKind\", path FROM bdk_script_pubkeys\n WHERE keychain_id = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "script", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "keychain_kind: BdkKeychainKind", + "type_info": { + "Custom": { + "name": "bdkkeychainkind", + "kind": { + "Enum": [ + "external", + "internal" + ] + } + } + } + }, + { + "ordinal": 2, + "name": "path", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "846df15d0a9709ad69c5ff8e6c3e6679816cf5c9f86ea2b4a9d1b95458ec56a1" +} diff --git a/.sqlx/query-a49bfbc8c8ca875c146ac678f5f5633b758af6a7070b939032394b6525d4a7c3.json b/.sqlx/query-a49bfbc8c8ca875c146ac678f5f5633b758af6a7070b939032394b6525d4a7c3.json new file mode 100644 index 00000000..dd5e8b89 --- /dev/null +++ b/.sqlx/query-a49bfbc8c8ca875c146ac678f5f5633b758af6a7070b939032394b6525d4a7c3.json @@ -0,0 +1,55 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT script, keychain_kind as \"keychain_kind: BdkKeychainKind\", path FROM bdk_script_pubkeys\n WHERE keychain_id = $1 AND keychain_kind = $2", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "script", + "type_info": "Bytea" + }, + { + "ordinal": 1, + "name": "keychain_kind: BdkKeychainKind", + "type_info": { + "Custom": { + "name": "bdkkeychainkind", + "kind": { + "Enum": [ + "external", + "internal" + ] + } + } + } + }, + { + "ordinal": 2, + "name": "path", + "type_info": "Int4" + } + ], + "parameters": { + "Left": [ + "Uuid", + { + "Custom": { + "name": "bdkkeychainkind", + "kind": { + "Enum": [ + "external", + "internal" + ] + } + } + } + ] + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "a49bfbc8c8ca875c146ac678f5f5633b758af6a7070b939032394b6525d4a7c3" +} diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 310fcde1..ad3233bd 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -21,7 +21,7 @@ use index::Indexes; use script_pubkeys::ScriptPubkeys; use std::{ collections::HashMap, - sync::atomic::{AtomicBool, Ordering}, + sync::atomic::{AtomicBool, AtomicU8, Ordering}, sync::{Arc, Mutex, MutexGuard}, }; pub(super) use sync_times::SyncTimes; @@ -65,6 +65,10 @@ struct WalletBatchState { struct WalletCache { script_pubkeys: Arc>, transactions: Arc>, + // Process-local hint for which keychain script path sets are fully hydrated. + // Bit 0: external, bit 1: internal. + // This is intentionally not synchronized across processes. + script_pubkeys_loaded_mask: Arc, // Process-local hint: true means this instance has already hydrated raw tx details // from the DB at least once. It is intentionally not synchronized across processes. raw_txs_fully_loaded: Arc, @@ -75,10 +79,21 @@ impl WalletCache { Self { script_pubkeys: Arc::new(Mutex::new(HashMap::new())), transactions: Arc::new(Mutex::new(HashMap::new())), + script_pubkeys_loaded_mask: Arc::new(AtomicU8::new(0)), raw_txs_fully_loaded: Arc::new(AtomicBool::new(false)), } } + fn script_pubkey_mask_for(keychain: Option) -> u8 { + const EXTERNAL: u8 = 1; + const INTERNAL: u8 = 2; + match keychain { + Some(KeychainKind::External) => EXTERNAL, + Some(KeychainKind::Internal) => INTERNAL, + None => EXTERNAL | INTERNAL, + } + } + fn lock_script_pubkeys(&self) -> Result, bdk::Error> { self.script_pubkeys .lock() @@ -118,6 +133,30 @@ impl WalletCache { Ok(()) } + fn all_script_pubkeys( + &self, + keychain: Option, + ) -> Result, bdk::Error> { + let cache = self.lock_script_pubkeys()?; + Ok(cache + .iter() + .filter(|(_, (kind, _))| keychain.is_none_or(|k| *kind == k)) + .map(|(script, _)| script.clone()) + .collect()) + } + + fn script_pubkeys_fully_loaded(&self, keychain: Option) -> bool { + let required_mask = Self::script_pubkey_mask_for(keychain); + let loaded_mask = self.script_pubkeys_loaded_mask.load(Ordering::Acquire); + loaded_mask & required_mask == required_mask + } + + fn mark_script_pubkeys_loaded(&self, keychain: Option) { + let mask = Self::script_pubkey_mask_for(keychain); + self.script_pubkeys_loaded_mask + .fetch_or(mask, Ordering::Release); + } + fn get_tx(&self, txid: &Txid) -> Result, bdk::Error> { let cache = self.lock_transactions()?; Ok(cache.get(txid).cloned()) @@ -138,11 +177,40 @@ impl WalletCache { Ok(()) } + fn extend_summary_txs(&self, entries: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + { + let mut cache = self.lock_transactions()?; + for (txid, mut summary) in entries { + // Summary refreshes may run after raw tx bytes were already hydrated. Preserve any + // cached raw transaction payload while applying fresh DB metadata fields. + // `iter_txs` overlays in-memory batch writes afterwards, so uncommitted updates still + // take precedence in returned views. + if let Some(raw_tx) = cache + .get(&txid) + .and_then(|existing| existing.transaction.clone()) + { + summary.transaction = Some(raw_tx); + } + cache.insert(txid, summary); + } + Ok(()) + } + fn all_txs(&self) -> Result, bdk::Error> { let cache = self.lock_transactions()?; Ok(cache.values().cloned().collect()) } + fn all_summary_txs(&self) -> Result, bdk::Error> { + let cache = self.lock_transactions()?; + Ok(cache + .values() + .map(|tx| (tx.txid, SqlxWalletDb::summary_tx_from_ref(tx))) + .collect()) + } + fn raw_txs_fully_loaded(&self) -> bool { self.raw_txs_fully_loaded.load(Ordering::Acquire) } @@ -181,6 +249,24 @@ impl SqlxWalletDb { ScriptPubkeys::new(self.ctx.keychain_id, self.ctx.pool.clone()) } + fn cache_loaded_script_pubkeys( + cache: &WalletCache, + keychain: Option, + scripts_with_paths: Vec<(ScriptBuf, (KeychainKind, u32))>, + ) -> Result, bdk::Error> { + cache.extend_script_pubkeys( + scripts_with_paths + .iter() + .map(|(script, path)| (script.clone(), *path)), + )?; + cache.mark_script_pubkeys_loaded(keychain); + + Ok(scripts_with_paths + .into_iter() + .map(|(script, _)| script) + .collect()) + } + fn utxos_repo(&self) -> Utxos { Utxos::new(self.ctx.keychain_id, self.ctx.pool.clone()) } @@ -234,7 +320,7 @@ impl SqlxWalletDb { mode: TxLookupMode, ) -> Result, bdk::Error> { if let Some(tx) = self.batch.txs.get(txid) { - if mode == TxLookupMode::Any || tx.transaction.is_some() { + if Self::tx_matches_lookup_mode(tx, mode) { return Ok(Some(tx.clone())); } @@ -242,7 +328,7 @@ impl SqlxWalletDb { } if let Some(tx) = self.cache.get_tx(txid)? { - if mode == TxLookupMode::Any || tx.transaction.is_some() { + if Self::tx_matches_lookup_mode(&tx, mode) { return Ok(Some(tx)); } @@ -271,6 +357,41 @@ impl SqlxWalletDb { self.lookup_tx_with_mode(txid, TxLookupMode::Any) } + fn tx_matches_lookup_mode(tx: &TransactionDetails, mode: TxLookupMode) -> bool { + mode == TxLookupMode::Any || tx.transaction.is_some() + } + + fn summary_tx_from_ref(tx: &TransactionDetails) -> TransactionDetails { + TransactionDetails { + transaction: None, + txid: tx.txid, + received: tx.received, + sent: tx.sent, + fee: tx.fee, + confirmation_time: tx.confirmation_time.clone(), + } + } + + fn summary_tx_from_owned(tx: TransactionDetails) -> TransactionDetails { + let TransactionDetails { + txid, + received, + sent, + fee, + confirmation_time, + .. + } = tx; + + TransactionDetails { + transaction: None, + txid, + received, + sent, + fee, + confirmation_time, + } + } + fn overlay_batch_txs( mut txs: HashMap, batch_txs: &HashMap, @@ -279,11 +400,11 @@ impl SqlxWalletDb { if include_raw { txs.extend(batch_txs.iter().map(|(id, tx)| (*id, tx.clone()))); } else { - txs.extend(batch_txs.iter().map(|(id, tx)| { - let mut tx = tx.clone(); - tx.transaction = None; - (*id, tx) - })); + txs.extend( + batch_txs + .iter() + .map(|(id, tx)| (*id, Self::summary_tx_from_ref(tx))), + ); } txs @@ -291,6 +412,7 @@ impl SqlxWalletDb { } impl BatchOperations for SqlxWalletDb { + #[tracing::instrument(name = "bdk.batch.set_script_pubkey", skip_all, err)] fn set_script_pubkey( &mut self, script: &Script, @@ -301,20 +423,24 @@ impl BatchOperations for SqlxWalletDb { Ok(()) } + #[tracing::instrument(name = "bdk.batch.set_utxo", skip_all, err)] fn set_utxo(&mut self, utxo: &LocalUtxo) -> Result<(), bdk::Error> { self.batch.utxos.push(utxo.clone()); Ok(()) } + #[tracing::instrument(name = "bdk.batch.set_raw_tx", skip_all, err)] fn set_raw_tx(&mut self, _: &Transaction) -> Result<(), bdk::Error> { Err(Self::unsupported_operation("set_raw_tx")) } + #[tracing::instrument(name = "bdk.batch.set_tx", skip_all, err)] fn set_tx(&mut self, tx: &TransactionDetails) -> Result<(), bdk::Error> { self.batch.txs.insert(tx.txid, tx.clone()); Ok(()) } + #[tracing::instrument(name = "bdk.batch.set_last_index", skip_all, err)] fn set_last_index(&mut self, kind: KeychainKind, idx: u32) -> Result<(), bdk::Error> { // NOTE: This write is intentionally immediate because BDK may call it outside of // `commit_batch` flow. @@ -323,6 +449,7 @@ impl BatchOperations for SqlxWalletDb { .block_on(async { self.indexes_repo().persist_last_index(kind, idx).await }) } + #[tracing::instrument(name = "bdk.batch.set_sync_time", skip_all, err)] fn set_sync_time(&mut self, time: SyncTime) -> Result<(), bdk::Error> { // NOTE: This write is intentionally immediate because BDK may call it outside of // `commit_batch` flow. @@ -331,6 +458,7 @@ impl BatchOperations for SqlxWalletDb { .block_on(async { self.sync_times_repo().persist(time).await }) } + #[tracing::instrument(name = "bdk.batch.del_script_pubkey_from_path", skip_all, err)] fn del_script_pubkey_from_path( &mut self, _: KeychainKind, @@ -338,21 +466,28 @@ impl BatchOperations for SqlxWalletDb { ) -> Result, bdk::Error> { Err(Self::unsupported_operation("del_script_pubkey_from_path")) } + + #[tracing::instrument(name = "bdk.batch.del_path_from_script_pubkey", skip_all, err)] fn del_path_from_script_pubkey( &mut self, _: &Script, ) -> Result, bdk::Error> { Err(Self::unsupported_operation("del_path_from_script_pubkey")) } + + #[tracing::instrument(name = "bdk.batch.del_utxo", skip_all, err)] fn del_utxo(&mut self, outpoint: &OutPoint) -> Result, bdk::Error> { self.ctx .rt .block_on(async { self.utxos_repo().delete(outpoint).await }) } + + #[tracing::instrument(name = "bdk.batch.del_raw_tx", skip_all, err)] fn del_raw_tx(&mut self, _: &Txid) -> Result, bdk::Error> { Err(Self::unsupported_operation("del_raw_tx")) } + #[tracing::instrument(name = "bdk.batch.del_tx", skip_all, err)] fn del_tx( &mut self, tx_id: &Txid, @@ -370,15 +505,20 @@ impl BatchOperations for SqlxWalletDb { Ok(deleted) } + + #[tracing::instrument(name = "bdk.batch.del_last_index", skip_all, err)] fn del_last_index(&mut self, _: KeychainKind) -> Result, bdk::Error> { Err(Self::unsupported_operation("del_last_index")) } + + #[tracing::instrument(name = "bdk.batch.del_sync_time", skip_all, err)] fn del_sync_time(&mut self) -> Result, bdk::Error> { Err(Self::unsupported_operation("del_sync_time")) } } impl Database for SqlxWalletDb { + #[tracing::instrument(name = "bdk.db.check_descriptor_checksum", skip_all, err)] fn check_descriptor_checksum( &mut self, keychain: KeychainKind, @@ -396,32 +536,47 @@ impl Database for SqlxWalletDb { Ok(()) }) } + + #[tracing::instrument(name = "bdk.db.iter_script_pubkeys", skip_all, err)] fn iter_script_pubkeys( &self, keychain: Option, ) -> Result, bdk::Error> { - self.ctx - .rt - .block_on(async { self.script_pubkeys_repo().list_scripts(keychain).await }) + if self.cache.script_pubkeys_fully_loaded(keychain) { + return self.cache.all_script_pubkeys(keychain); + } + + let scripts_with_paths = self.ctx.rt.block_on(async { + self.script_pubkeys_repo() + .list_scripts_with_paths(keychain) + .await + })?; + + Self::cache_loaded_script_pubkeys(&self.cache, keychain, scripts_with_paths) } + + #[tracing::instrument(name = "bdk.db.iter_utxos", skip_all, err)] fn iter_utxos(&self) -> Result, bdk::Error> { self.ctx .rt .block_on(async { self.utxos_repo().list_local_utxos().await }) } + + #[tracing::instrument(name = "bdk.db.iter_raw_txs", skip_all, err)] fn iter_raw_txs(&self) -> Result, bdk::Error> { Err(Self::unsupported_operation("iter_raw_txs")) } + #[tracing::instrument(name = "bdk.db.iter_txs", skip_all, err)] fn iter_txs(&self, include_raw: bool) -> Result, bdk::Error> { - let txs = if include_raw { - if self.cache.raw_txs_fully_loaded() { - self.cache - .all_txs()? - .into_iter() - .map(|tx| (tx.txid, tx)) - .collect() - } else { + let txs = match (include_raw, self.cache.raw_txs_fully_loaded()) { + (true, true) => self + .cache + .all_txs()? + .into_iter() + .map(|tx| (tx.txid, tx)) + .collect(), + (true, false) => { let loaded = self .ctx .rt @@ -431,10 +586,19 @@ impl Database for SqlxWalletDb { self.cache.set_raw_txs_fully_loaded(); loaded } - } else { - self.ctx - .rt - .block_on(async { self.transactions_repo().load_all_summaries().await })? + // Once raw txs are fully loaded for this process, serve summary calls from cache to + // avoid repeated full-table reads. This returns the in-process snapshot (kept current + // by set/del/commit paths) rather than forcing a fresh DB roundtrip. + (false, true) => self.cache.all_summary_txs()?, + (false, false) => { + let txs = self + .ctx + .rt + .block_on(async { self.transactions_repo().load_all_summaries().await })?; + self.cache + .extend_summary_txs(txs.iter().map(|(txid, tx)| (*txid, tx.clone())))?; + txs + } }; Ok(Self::overlay_batch_txs(txs, &self.batch.txs, include_raw) @@ -442,6 +606,7 @@ impl Database for SqlxWalletDb { .collect()) } + #[tracing::instrument(name = "bdk.db.get_script_pubkey_from_path", skip_all, err)] fn get_script_pubkey_from_path( &self, keychain: KeychainKind, @@ -451,45 +616,60 @@ impl Database for SqlxWalletDb { .rt .block_on(async { self.script_pubkeys_repo().find_script(keychain, path).await }) } + + #[tracing::instrument(name = "bdk.db.get_path_from_script_pubkey", skip_all, err)] fn get_path_from_script_pubkey( &self, script: &Script, ) -> Result, bdk::Error> { self.lookup_script_pubkey_path(script) } + + #[tracing::instrument(name = "bdk.db.get_utxo", skip_all, err)] fn get_utxo(&self, outpoint: &OutPoint) -> Result, bdk::Error> { self.ctx .rt .block_on(async { self.utxos_repo().find(outpoint).await }) } + + #[tracing::instrument(name = "bdk.db.get_raw_tx", skip_all, err)] fn get_raw_tx(&self, tx_id: &Txid) -> Result, bdk::Error> { self.lookup_tx_with_mode(tx_id, TxLookupMode::RequireRaw) .map(|tx| tx.and_then(|tx| tx.transaction)) } + + #[tracing::instrument(name = "bdk.db.get_tx", skip_all, err)] fn get_tx( &self, tx_id: &Txid, include_raw: bool, ) -> Result, bdk::Error> { self.lookup_tx(tx_id).map(|tx| { - tx.map(|mut tx| { - if !include_raw { - tx.transaction = None; + tx.map(|tx| { + if include_raw { + tx + } else { + Self::summary_tx_from_owned(tx) } - tx }) }) } + + #[tracing::instrument(name = "bdk.db.get_last_index", skip_all, err)] fn get_last_index(&self, kind: KeychainKind) -> Result, bdk::Error> { self.ctx .rt .block_on(async { self.indexes_repo().get_latest(kind).await }) } + + #[tracing::instrument(name = "bdk.db.get_sync_time", skip_all, err)] fn get_sync_time(&self) -> Result, bdk::Error> { self.ctx .rt .block_on(async { self.sync_times_repo().get().await }) } + + #[tracing::instrument(name = "bdk.db.increment_last_index", skip_all, err)] fn increment_last_index(&mut self, keychain: KeychainKind) -> Result { self.ctx .rt @@ -631,6 +811,23 @@ mod tests { assert!(cache.raw_txs_fully_loaded()); } + #[test] + fn wallet_cache_script_pubkeys_loaded_flags_track_keychains() { + let cache = WalletCache::new(); + + assert!(!cache.script_pubkeys_fully_loaded(Some(KeychainKind::External))); + assert!(!cache.script_pubkeys_fully_loaded(Some(KeychainKind::Internal))); + assert!(!cache.script_pubkeys_fully_loaded(None)); + + cache.mark_script_pubkeys_loaded(Some(KeychainKind::External)); + assert!(cache.script_pubkeys_fully_loaded(Some(KeychainKind::External))); + assert!(!cache.script_pubkeys_fully_loaded(Some(KeychainKind::Internal))); + assert!(!cache.script_pubkeys_fully_loaded(None)); + + cache.mark_script_pubkeys_loaded(Some(KeychainKind::Internal)); + assert!(cache.script_pubkeys_fully_loaded(None)); + } + #[test] fn overlay_batch_txs_strips_raw_when_include_raw_is_false() { let txid = Txid::all_zeros(); @@ -670,4 +867,90 @@ mod tests { assert_eq!(txs.len(), 1); assert_eq!(txs[0].txid, txid); } + + #[test] + fn wallet_cache_extend_summary_txs_preserves_raw_and_refreshes_metadata() { + let cache = WalletCache::new(); + let txid = Txid::all_zeros(); + + let raw_tx = bdk::bitcoin::Transaction { + version: 2, + lock_time: bdk::bitcoin::absolute::LockTime::ZERO, + input: Vec::new(), + output: Vec::new(), + }; + + let mut existing = tx_details(txid); + existing.transaction = Some(raw_tx.clone()); + existing.received = 1; + cache + .insert_tx(txid, existing) + .expect("insert should succeed"); + + let mut summary = tx_details(txid); + summary.received = 42; + summary.sent = 7; + summary.fee = Some(3); + summary.confirmation_time = Some(bdk::BlockTime { + height: 100, + timestamp: 123, + }); + + cache + .extend_summary_txs([(txid, summary)]) + .expect("extend should succeed"); + + let merged = cache + .get_tx(&txid) + .expect("get should succeed") + .expect("tx should exist"); + assert_eq!(merged.transaction, Some(raw_tx)); + assert_eq!(merged.received, 42); + assert_eq!(merged.sent, 7); + assert_eq!(merged.fee, Some(3)); + assert_eq!( + merged.confirmation_time, + Some(bdk::BlockTime { + height: 100, + timestamp: 123, + }) + ); + } + + #[test] + fn cache_loaded_script_pubkeys_marks_mask_and_populates_paths() { + let cache = WalletCache::new(); + let external_script = ScriptBuf::from(vec![0x51]); + let internal_script = ScriptBuf::from(vec![0x52]); + + let loaded = SqlxWalletDb::cache_loaded_script_pubkeys( + &cache, + Some(KeychainKind::External), + vec![(external_script.clone(), (KeychainKind::External, 7))], + ) + .expect("cache should be populated"); + + assert_eq!(loaded, vec![external_script.clone()]); + assert!(cache.script_pubkeys_fully_loaded(Some(KeychainKind::External))); + assert!(!cache.script_pubkeys_fully_loaded(Some(KeychainKind::Internal))); + assert_eq!( + cache + .get_script_pubkey_path(external_script.as_script()) + .expect("path lookup should succeed"), + Some((KeychainKind::External, 7)) + ); + + SqlxWalletDb::cache_loaded_script_pubkeys( + &cache, + Some(KeychainKind::Internal), + vec![(internal_script.clone(), (KeychainKind::Internal, 3))], + ) + .expect("cache should be populated"); + + assert!(cache.script_pubkeys_fully_loaded(None)); + let all = cache + .all_script_pubkeys(None) + .expect("all_script_pubkeys should succeed"); + assert_eq!(all.len(), 2); + } } diff --git a/src/bdk/pg/script_pubkeys.rs b/src/bdk/pg/script_pubkeys.rs index e02eeca3..01983512 100644 --- a/src/bdk/pg/script_pubkeys.rs +++ b/src/bdk/pg/script_pubkeys.rs @@ -5,12 +5,27 @@ use uuid::Uuid; use super::convert::BdkKeychainKind; use crate::primitives::{bitcoin::ScriptBuf, *}; +type ScriptWithPath = (ScriptBuf, (bdk::KeychainKind, u32)); + pub struct ScriptPubkeys { keychain_id: KeychainId, pool: PgPool, } impl ScriptPubkeys { + fn script_with_path( + script: Vec, + keychain_kind: BdkKeychainKind, + path: i32, + ) -> Result { + let path = u32::try_from(path) + .map_err(|_| bdk::Error::Generic(format!("invalid derivation path from db: {path}")))?; + Ok(( + ScriptBuf::from(script), + (bdk::KeychainKind::from(keychain_kind), path), + )) + } + pub fn new(keychain_id: KeychainId, pool: PgPool) -> Self { Self { keychain_id, pool } } @@ -88,12 +103,12 @@ impl ScriptPubkeys { keychain: impl Into, path: u32, ) -> Result, bdk::Error> { - let kind = keychain.into(); + let keychain_kind = keychain.into(); let row = sqlx::query!( r#"SELECT script FROM bdk_script_pubkeys WHERE keychain_id = $1 AND keychain_kind = $2 AND path = $3"#, Uuid::from(self.keychain_id), - kind as BdkKeychainKind, + keychain_kind as BdkKeychainKind, path as i32, ) .fetch_optional(&self.pool) @@ -118,21 +133,31 @@ impl ScriptPubkeys { .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; - Ok(row.map(|row| (row.keychain_kind, row.path as u32))) + row.map(|row| { + u32::try_from(row.path) + .map(|path| (row.keychain_kind, path)) + .map_err(|_| { + bdk::Error::Generic(format!("invalid derivation path from db: {}", row.path)) + }) + }) + .transpose() } #[instrument(name = "bdk.script_pubkeys.list_scripts", skip_all)] + // Retained for non-path call sites and focused tests. + #[allow(dead_code)] pub async fn list_scripts( &self, keychain: Option>, ) -> Result, bdk::Error> { let keychain_id = Uuid::from(self.keychain_id); - let scripts = if let Some(kind) = keychain.map(Into::into) { + let keychain_kind: Option = keychain.map(Into::into); + let scripts = if let Some(keychain_kind) = keychain_kind { sqlx::query_scalar!( r#"SELECT script FROM bdk_script_pubkeys WHERE keychain_id = $1 AND keychain_kind = $2"#, keychain_id, - kind as BdkKeychainKind, + keychain_kind as BdkKeychainKind, ) .fetch_all(&self.pool) .await @@ -150,4 +175,43 @@ impl ScriptPubkeys { Ok(scripts.into_iter().map(ScriptBuf::from).collect()) } + + #[instrument(name = "bdk.script_pubkeys.list_scripts_with_paths", skip_all)] + pub async fn list_scripts_with_paths( + &self, + keychain: Option>, + ) -> Result, bdk::Error> { + let keychain_id = Uuid::from(self.keychain_id); + let keychain_kind: Option = keychain.map(Into::into); + if let Some(keychain_kind) = keychain_kind { + let rows = sqlx::query!( + r#"SELECT script, keychain_kind as "keychain_kind: BdkKeychainKind", path FROM bdk_script_pubkeys + WHERE keychain_id = $1 AND keychain_kind = $2"#, + keychain_id, + keychain_kind as BdkKeychainKind, + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + + Ok(rows + .into_iter() + .map(|row| Self::script_with_path(row.script, row.keychain_kind, row.path)) + .collect::, _>>()?) + } else { + let rows = sqlx::query!( + r#"SELECT script, keychain_kind as "keychain_kind: BdkKeychainKind", path FROM bdk_script_pubkeys + WHERE keychain_id = $1"#, + keychain_id, + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + + Ok(rows + .into_iter() + .map(|row| Self::script_with_path(row.script, row.keychain_kind, row.path)) + .collect::, _>>()?) + } + } } diff --git a/src/job/sync_wallet.rs b/src/job/sync_wallet.rs index e2348854..7442afd9 100644 --- a/src/job/sync_wallet.rs +++ b/src/job/sync_wallet.rs @@ -683,6 +683,16 @@ async fn cleanup_soft_deleted_utxos(ctx: &KeychainSyncContext<'_>) -> Result<(), tx.commit().await?; continue; } + Err(UtxoError::UtxoAlreadySettledError) => { + // This can happen if the soft-deleted BDK UTXO maps to a bria UTXO that is + // already accounting-settled. Do not fail/retry the whole sync job; restore the + // BDK row visibility and continue. + // If rollback itself fails, this iteration returns early and undelete is skipped; + // the next sync cycle re-attempts reconciliation. + tx.rollback().await?; + ctx.bdk_utxos.undelete(outpoint).await?; + continue; + } Err(e) => return Err(e.into()), };