From 27a5e8b5812d9a29d1befb695bb159062b649561 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Fri, 17 Apr 2026 13:31:54 -0500 Subject: [PATCH 1/8] perf(bdk): batch wallet sync tx lookups --- ...b005975efe12df569e823b49bbd7b2688d52b.json | 23 --- ...a581cde4aee4a44070b237493999ab7d52c2f.json | 22 +++ src/bdk/pg/cache.rs | 65 ++++++- src/bdk/pg/db_traits.rs | 2 +- src/bdk/pg/lookups.rs | 158 +++++++++++++----- src/bdk/pg/mod.rs | 44 ++++- src/bdk/pg/transactions.rs | 32 ++-- src/wallet/keychain/wallet.rs | 23 +++ 8 files changed, 275 insertions(+), 94 deletions(-) delete mode 100644 .sqlx/query-5256bb536a5c8b3421a642b8e4eb005975efe12df569e823b49bbd7b2688d52b.json create mode 100644 .sqlx/query-c3fc3bf10aaefb6411ef73538d7a581cde4aee4a44070b237493999ab7d52c2f.json diff --git a/.sqlx/query-5256bb536a5c8b3421a642b8e4eb005975efe12df569e823b49bbd7b2688d52b.json b/.sqlx/query-5256bb536a5c8b3421a642b8e4eb005975efe12df569e823b49bbd7b2688d52b.json deleted file mode 100644 index 62675cca..00000000 --- a/.sqlx/query-5256bb536a5c8b3421a642b8e4eb005975efe12df569e823b49bbd7b2688d52b.json +++ /dev/null @@ -1,23 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT details_json FROM bdk_transactions WHERE keychain_id = $1 AND tx_id = $2 AND deleted_at IS NULL", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "details_json", - "type_info": "Jsonb" - } - ], - "parameters": { - "Left": [ - "Uuid", - "Text" - ] - }, - "nullable": [ - false - ] - }, - "hash": "5256bb536a5c8b3421a642b8e4eb005975efe12df569e823b49bbd7b2688d52b" -} diff --git a/.sqlx/query-c3fc3bf10aaefb6411ef73538d7a581cde4aee4a44070b237493999ab7d52c2f.json b/.sqlx/query-c3fc3bf10aaefb6411ef73538d7a581cde4aee4a44070b237493999ab7d52c2f.json new file mode 100644 index 00000000..3ede1da4 --- /dev/null +++ b/.sqlx/query-c3fc3bf10aaefb6411ef73538d7a581cde4aee4a44070b237493999ab7d52c2f.json @@ -0,0 +1,22 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT COUNT(*) AS \"count!\"\n FROM bdk_transactions\n WHERE keychain_id = $1\n AND deleted_at IS NULL", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count!", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [ + "Uuid" + ] + }, + "nullable": [ + null + ] + }, + "hash": "c3fc3bf10aaefb6411ef73538d7a581cde4aee4a44070b237493999ab7d52c2f" +} diff --git a/src/bdk/pg/cache.rs b/src/bdk/pg/cache.rs index 4a94b8fe..6b915c84 100644 --- a/src/bdk/pg/cache.rs +++ b/src/bdk/pg/cache.rs @@ -17,7 +17,12 @@ pub(super) struct WalletCache { missing_script_pubkeys: Arc>>, missing_txids: Arc>>, pending_script_misses: Arc>>, + // Txids confirmed absent from the DB in this process. These are retried in batches to + // recover from races where rows appear after an earlier miss. pending_tx_misses: Arc>>, + // Txids not yet seen in the in-process cache and not yet known-missing. These are batched + // before we fall back to recording a miss. + pending_tx_lookups: Arc>>, // Process-local hint for which keychain script path sets are fully hydrated. // Bit 0: external, bit 1: internal. // This is intentionally not synchronized across processes. @@ -47,6 +52,7 @@ impl WalletCache { missing_txids: Arc::new(Mutex::new(HashSet::new())), pending_script_misses: Arc::new(Mutex::new(HashSet::new())), pending_tx_misses: Arc::new(Mutex::new(HashSet::new())), + pending_tx_lookups: Arc::new(Mutex::new(HashSet::new())), script_pubkeys_loaded_mask: Arc::new(AtomicU8::new(0)), raw_txs_fully_loaded: Arc::new(AtomicBool::new(false)), summary_txs_fully_loaded: Arc::new(AtomicBool::new(false)), @@ -99,6 +105,10 @@ impl WalletCache { self.lock_with_error(&self.pending_tx_misses, "pending tx misses cache") } + fn lock_pending_tx_lookups(&self) -> Result>, bdk::Error> { + self.lock_with_error(&self.pending_tx_lookups, "pending tx lookups cache") + } + pub(super) fn get_script_pubkey_path( &self, script: &Script, @@ -173,15 +183,6 @@ impl WalletCache { Ok(cache.get(txid).cloned()) } - pub(super) fn insert_tx(&self, txid: Txid, tx: TransactionDetails) -> Result<(), bdk::Error> { - { - let mut cache = self.lock_transactions()?; - cache.insert(txid, tx); - } - self.mark_txid_not_missing(&txid)?; - Ok(()) - } - fn clear_tx_miss_tracking<'a, I>(&self, txids: I) -> Result<(), bdk::Error> where I: IntoIterator, @@ -195,12 +196,24 @@ impl WalletCache { Ok(()) } + fn clear_pending_tx_lookups<'a, I>(&self, txids: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + { + let mut pending = self.lock_pending_tx_lookups()?; + for txid in txids { + pending.remove(txid); + } + Ok(()) + } + pub(super) fn extend_txs(&self, entries: I) -> Result<(), bdk::Error> where I: IntoIterator, { let entries: Vec<_> = entries.into_iter().collect(); self.clear_tx_miss_tracking(entries.iter().map(|(txid, _)| txid))?; + self.clear_pending_tx_lookups(entries.iter().map(|(txid, _)| txid))?; let mut cache = self.lock_transactions()?; cache.extend(entries); Ok(()) @@ -212,6 +225,7 @@ impl WalletCache { { let entries: Vec<_> = entries.into_iter().collect(); self.clear_tx_miss_tracking(entries.iter().map(|(txid, _)| txid))?; + self.clear_pending_tx_lookups(entries.iter().map(|(txid, _)| txid))?; let mut cache = self.lock_transactions()?; for (txid, mut summary) in entries { // Summary refreshes may run after raw tx bytes were already hydrated. Preserve any @@ -333,6 +347,30 @@ impl WalletCache { pub(super) fn mark_txid_not_missing(&self, txid: &Txid) -> Result<(), bdk::Error> { self.lock_missing_txids()?.remove(txid); self.lock_pending_tx_misses()?.remove(txid); + self.lock_pending_tx_lookups()?.remove(txid); + Ok(()) + } + + pub(super) fn enqueue_pending_tx_lookup(&self, txid: Txid) -> Result<(), bdk::Error> { + self.lock_pending_tx_lookups()?.insert(txid); + Ok(()) + } + + pub(super) fn drain_pending_tx_lookups(&self, max: usize) -> Result, bdk::Error> { + let mut pending = self.lock_pending_tx_lookups()?; + let drained: Vec<_> = pending.iter().take(max).copied().collect(); + for txid in &drained { + pending.remove(txid); + } + Ok(drained) + } + + pub(super) fn requeue_pending_tx_lookups(&self, txids: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + { + let mut pending = self.lock_pending_tx_lookups()?; + pending.extend(txids); Ok(()) } @@ -370,6 +408,14 @@ impl WalletCache { Ok(pending.len() >= threshold) } + pub(super) fn should_batch_resolve_tx_lookups( + &self, + threshold: usize, + ) -> Result { + let pending = self.lock_pending_tx_lookups()?; + Ok(pending.len() >= threshold) + } + pub(super) fn invalidate(&self) { Self::clear_even_if_poisoned(&self.script_pubkeys); Self::clear_even_if_poisoned(&self.transactions); @@ -377,6 +423,7 @@ impl WalletCache { Self::clear_even_if_poisoned(&self.missing_txids); Self::clear_even_if_poisoned(&self.pending_script_misses); Self::clear_even_if_poisoned(&self.pending_tx_misses); + Self::clear_even_if_poisoned(&self.pending_tx_lookups); self.script_pubkeys_loaded_mask.store(0, Ordering::Release); self.raw_txs_fully_loaded.store(false, Ordering::Release); diff --git a/src/bdk/pg/db_traits.rs b/src/bdk/pg/db_traits.rs index e9866772..07f1a097 100644 --- a/src/bdk/pg/db_traits.rs +++ b/src/bdk/pg/db_traits.rs @@ -324,7 +324,7 @@ mod tests { let txid = Txid::all_zeros(); db.cache - .insert_tx(txid, tx_details(txid)) + .extend_txs([(txid, tx_details(txid))]) .expect("insert should succeed"); db.cache.set_raw_txs_fully_loaded(); diff --git a/src/bdk/pg/lookups.rs b/src/bdk/pg/lookups.rs index 229f9e8c..ba49fa09 100644 --- a/src/bdk/pg/lookups.rs +++ b/src/bdk/pg/lookups.rs @@ -18,31 +18,62 @@ pub(super) enum TxLookupMode { #[derive(Copy, Clone)] pub(super) struct MissResolutionPolicy { - pub threshold: usize, - pub batch_size: usize, + pub tx_lookup_threshold: usize, + pub tx_lookup_batch_size: usize, + pub tx_miss_threshold: usize, + pub tx_miss_batch_size: usize, + pub script_miss_threshold: usize, + pub script_miss_batch_size: usize, } impl Default for MissResolutionPolicy { fn default() -> Self { Self { - threshold: 64, - batch_size: 512, + tx_lookup_threshold: 64, + tx_lookup_batch_size: 512, + tx_miss_threshold: 64, + tx_miss_batch_size: 512, + script_miss_threshold: 64, + script_miss_batch_size: 512, } } } impl SqlxWalletDb { + fn lookup_cached_tx_with_mode( + &self, + txid: &Txid, + mode: TxLookupMode, + hit_source: LookupSource, + mode_miss_source: LookupSource, + ) -> Result, bdk::Error> { + let Some(tx) = self.cache.get_tx(txid)? else { + return Ok(None); + }; + + if Self::tx_matches_lookup_mode(&tx, mode) { + return Ok(Some((Some(tx), hit_source))); + } + + if self.cache.raw_txs_fully_loaded() { + self.cache.record_missing_txid(*txid)?; + return Ok(Some((None, mode_miss_source))); + } + + Ok(None) + } + fn resolve_pending_script_misses(&self) -> Result<(), bdk::Error> { if !self .cache - .should_batch_resolve_script_misses(self.miss_resolution.threshold)? + .should_batch_resolve_script_misses(self.miss_resolution.script_miss_threshold)? { return Ok(()); } let pending = self .cache - .drain_pending_script_misses(self.miss_resolution.batch_size)?; + .drain_pending_script_misses(self.miss_resolution.script_miss_batch_size)?; if pending.is_empty() { return Ok(()); } @@ -73,14 +104,14 @@ impl SqlxWalletDb { fn resolve_pending_tx_misses(&self) -> Result<(), bdk::Error> { if !self .cache - .should_batch_resolve_tx_misses(self.miss_resolution.threshold)? + .should_batch_resolve_tx_misses(self.miss_resolution.tx_miss_threshold)? { return Ok(()); } let pending = self .cache - .drain_pending_tx_misses(self.miss_resolution.batch_size)?; + .drain_pending_tx_misses(self.miss_resolution.tx_miss_batch_size)?; if pending.is_empty() { return Ok(()); } @@ -104,6 +135,54 @@ impl SqlxWalletDb { Ok(()) } + fn resolve_pending_tx_lookups_internal( + &self, + force: bool, + ) -> Result, bdk::Error> { + if !force + && !self + .cache + .should_batch_resolve_tx_lookups(self.miss_resolution.tx_lookup_threshold)? + { + return Ok(None); + } + + let pending = self + .cache + .drain_pending_tx_lookups(self.miss_resolution.tx_lookup_batch_size)?; + if pending.is_empty() { + return Ok(None); + } + + let found = match self + .ctx + .rt + .block_on(async { self.transactions_repo().find_by_ids(&pending).await }) + { + Ok(found) => found, + Err(error) => { + self.cache.requeue_pending_tx_lookups(pending)?; + return Err(error); + } + }; + let n_found = found.len(); + + if n_found > 0 { + self.cache.extend_txs(found)?; + } + + Ok(Some(n_found)) + } + + fn resolve_pending_tx_lookups(&self) -> Result<(), bdk::Error> { + let _ = self.resolve_pending_tx_lookups_internal(false)?; + Ok(()) + } + + fn resolve_pending_tx_lookups_force(&self) -> Result, bdk::Error> { + self.resolve_pending_tx_lookups_internal(true) + } + pub(super) fn lookup_script_pubkey_path( &self, script: &Script, @@ -163,15 +242,10 @@ impl SqlxWalletDb { return Ok((None, "batch_mode_miss")); } - if let Some(tx) = self.cache.get_tx(txid)? { - if Self::tx_matches_lookup_mode(&tx, mode) { - return Ok((Some(tx), "cache")); - } - - if self.cache.raw_txs_fully_loaded() { - self.cache.record_missing_txid(*txid)?; - return Ok((None, "cache_mode_miss")); - } + if let Some(result) = + self.lookup_cached_tx_with_mode(txid, mode, "cache", "cache_mode_miss")? + { + return Ok(result); } if self.cache.txid_marked_missing(txid)? { @@ -185,36 +259,34 @@ impl SqlxWalletDb { return Ok((None, "fully_loaded_miss")); } - self.resolve_pending_tx_misses()?; - if let Some(tx) = self.cache.get_tx(txid)? { - if Self::tx_matches_lookup_mode(&tx, mode) { - return Ok((Some(tx), "batch_resolve")); - } - if self.cache.raw_txs_fully_loaded() { - self.cache.record_missing_txid(*txid)?; - return Ok((None, "batch_resolve_mode_miss")); - } + self.cache.enqueue_pending_tx_lookup(*txid)?; + self.resolve_pending_tx_lookups()?; + if let Some(result) = + self.lookup_cached_tx_with_mode(txid, mode, "batch_lookup", "batch_lookup_mode_miss")? + { + return Ok(result); } - let found = self - .ctx - .rt - .block_on(async { self.transactions_repo().find_by_id(txid).await })?; - - // DB rows represent persisted TransactionDetails. In `RequireRaw` mode, a summary-only - // row is a mode miss (not an existence miss). - - if let Some(tx) = &found { - self.cache.insert_tx(tx.txid, tx.clone())?; - if Self::tx_matches_lookup_mode(tx, mode) { - Ok((found, "db_hit")) - } else { - Ok((None, "db_mode_miss")) + if self.resolve_pending_tx_lookups_force()?.is_some() { + if let Some(result) = self.lookup_cached_tx_with_mode( + txid, + mode, + "forced_batch_lookup", + "forced_batch_lookup_mode_miss", + )? { + return Ok(result); } - } else { - self.cache.record_and_enqueue_missing_txid(*txid)?; - Ok((None, "db_miss")) } + + self.resolve_pending_tx_misses()?; + if let Some(result) = + self.lookup_cached_tx_with_mode(txid, mode, "batch_resolve", "batch_resolve_mode_miss")? + { + return Ok(result); + } + + self.cache.record_and_enqueue_missing_txid(*txid)?; + Ok((None, "db_miss")) } pub(super) fn lookup_tx(&self, txid: &Txid) -> Result { diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 7c8ea844..f0cddf7c 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -75,6 +75,18 @@ impl SqlxWalletDb { } } + pub fn tx_count(&self) -> Result { + self.ctx + .rt + .block_on(async { self.transactions_repo().count_active().await }) + } + + pub fn prewarm_raw_txs(&self) -> Result { + use bdk::database::Database; + + Ok(Database::iter_txs(self, true)?.len()) + } + fn script_pubkeys_repo(&self) -> ScriptPubkeys { ScriptPubkeys::new(self.ctx.keychain_id, self.ctx.pool.clone()) } @@ -141,7 +153,7 @@ mod tests { let details = tx_details(txid); cache - .insert_tx(txid, details.clone()) + .extend_txs([(txid, details.clone())]) .expect("insert should succeed"); let loaded = cache.get_tx(&txid).expect("get should succeed"); assert_eq!(loaded, Some(details)); @@ -236,7 +248,7 @@ mod tests { let txid = Txid::all_zeros(); cache - .insert_tx(txid, tx_details(txid)) + .extend_txs([(txid, tx_details(txid))]) .expect("insert should succeed"); let txs = cache.all_txs().expect("all_txs should succeed"); @@ -260,7 +272,7 @@ mod tests { existing.transaction = Some(raw_tx.clone()); existing.received = 1; cache - .insert_tx(txid, existing) + .extend_txs([(txid, existing)]) .expect("insert should succeed"); let mut summary = tx_details(txid); @@ -293,6 +305,32 @@ mod tests { ); } + #[test] + fn wallet_cache_extend_txs_clears_pending_lookup_entries() { + let cache = WalletCache::new(); + let txid = Txid::all_zeros(); + + cache + .enqueue_pending_tx_lookup(txid) + .expect("enqueue should succeed"); + let drained = cache + .drain_pending_tx_lookups(10) + .expect("drain should succeed"); + assert_eq!(drained, vec![txid]); + + cache + .requeue_pending_tx_lookups(vec![txid]) + .expect("requeue should succeed"); + cache + .extend_txs([(txid, tx_details(txid))]) + .expect("extend should succeed"); + + let drained_after_insert = cache + .drain_pending_tx_lookups(10) + .expect("drain should succeed"); + assert!(drained_after_insert.is_empty()); + } + #[test] fn cache_loaded_script_pubkeys_marks_mask_and_populates_paths() { let cache = WalletCache::new(); diff --git a/src/bdk/pg/transactions.rs b/src/bdk/pg/transactions.rs index 6a136eaf..7a3e4b2d 100644 --- a/src/bdk/pg/transactions.rs +++ b/src/bdk/pg/transactions.rs @@ -110,6 +110,23 @@ impl Transactions { Self { keychain_id, pool } } + #[instrument(name = "bdk.transactions.count_active", skip(self))] + pub async fn count_active(&self) -> Result { + let row = sqlx::query!( + r#" + SELECT COUNT(*) AS "count!" + FROM bdk_transactions + WHERE keychain_id = $1 + AND deleted_at IS NULL"#, + self.keychain_id as KeychainId, + ) + .fetch_one(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + + Ok(row.count) + } + pub async fn persist_all_in_tx( &self, tx: &mut SqlxTransaction<'_, Postgres>, @@ -182,21 +199,6 @@ impl Transactions { .transpose() } - #[instrument(name = "bdk.transactions.find_by_id", skip_all)] - pub async fn find_by_id(&self, tx_id: &Txid) -> Result, bdk::Error> { - let tx = sqlx::query!( - r#" - SELECT details_json FROM bdk_transactions WHERE keychain_id = $1 AND tx_id = $2 AND deleted_at IS NULL"#, - self.keychain_id as KeychainId, - tx_id.to_string(), - ) - .fetch_optional(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - tx.map(|tx| Self::deserialize_details(tx.details_json)) - .transpose() - } - #[instrument(name = "bdk.transactions.find_by_ids", skip_all, fields(n_requested = tx_ids.len(), n_found))] pub async fn find_by_ids( &self, diff --git a/src/wallet/keychain/wallet.rs b/src/wallet/keychain/wallet.rs index 03ba47b4..cd217bc5 100644 --- a/src/wallet/keychain/wallet.rs +++ b/src/wallet/keychain/wallet.rs @@ -52,6 +52,7 @@ impl Default for SyncProgressContext { const PROGRESS_BUCKET_SIZE_PCT: u8 = 10; const PROGRESS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30); +const TX_PREWARM_THRESHOLD: i64 = 5_000; const fn completion_bucket() -> u8 { 100 / PROGRESS_BUCKET_SIZE_PCT @@ -147,6 +148,10 @@ fn should_emit_progress( elapsed_since_last_emit >= PROGRESS_HEARTBEAT_INTERVAL } +fn should_prewarm_raw_txs(tx_count: i64) -> bool { + tx_count >= TX_PREWARM_THRESHOLD +} + impl KeychainWallet { pub fn new( pool: PgPool, @@ -244,6 +249,18 @@ impl KeychainWallet { let max_last_index = last_external.max(last_internal); let _ = wallet.ensure_addresses_cached(max_last_index.saturating_add(1))?; + + let tx_count = wallet.database().tx_count()?; + if should_prewarm_raw_txs(tx_count) { + let prewarmed_txs = wallet.database().prewarm_raw_txs()?; + tracing::info!( + tx_count, + prewarmed_txs, + threshold = TX_PREWARM_THRESHOLD, + "prewarmed raw tx cache before wallet sync" + ); + } + let progress = TracingBdkProgress::new(context, wallet_id, keychain_id); wallet.sync( &blockchain, @@ -345,4 +362,10 @@ mod tests { Duration::from_secs(1) )); } + + #[test] + fn prewarm_enabled_for_large_wallets_only() { + assert!(!should_prewarm_raw_txs(4_999)); + assert!(should_prewarm_raw_txs(5_000)); + } } From fed35827f9ef2549566fc5772648243c334c07e8 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Fri, 17 Apr 2026 14:46:27 -0500 Subject: [PATCH 2/8] fix(bdk): harden tx lookup retry semantics --- src/bdk/pg/cache.rs | 48 +++++++++++- src/bdk/pg/lookups.rs | 133 ++++++++++++++++++++++++++++------ src/bdk/pg/mod.rs | 69 +++++++++++++++++- src/wallet/keychain/wallet.rs | 6 +- 4 files changed, 225 insertions(+), 31 deletions(-) diff --git a/src/bdk/pg/cache.rs b/src/bdk/pg/cache.rs index 6b915c84..539f3957 100644 --- a/src/bdk/pg/cache.rs +++ b/src/bdk/pg/cache.rs @@ -338,12 +338,16 @@ impl WalletCache { Ok(()) } - pub(super) fn record_and_enqueue_missing_txid(&self, txid: Txid) -> Result<(), bdk::Error> { - self.lock_missing_txids()?.insert(txid); + pub(super) fn enqueue_pending_tx_miss(&self, txid: Txid) -> Result<(), bdk::Error> { self.lock_pending_tx_misses()?.insert(txid); Ok(()) } + pub(super) fn pending_tx_miss_queued(&self, txid: &Txid) -> Result { + let pending = self.lock_pending_tx_misses()?; + Ok(pending.contains(txid)) + } + pub(super) fn mark_txid_not_missing(&self, txid: &Txid) -> Result<(), bdk::Error> { self.lock_missing_txids()?.remove(txid); self.lock_pending_tx_misses()?.remove(txid); @@ -365,6 +369,26 @@ impl WalletCache { Ok(drained) } + pub(super) fn drain_pending_tx_lookups_including( + &self, + required_txid: Txid, + max: usize, + ) -> Result, bdk::Error> { + let mut pending = self.lock_pending_tx_lookups()?; + let mut drained = Vec::with_capacity(max.max(1)); + drained.push(required_txid); + pending.remove(&required_txid); + + let additional = max.max(1).saturating_sub(1); + let rest: Vec<_> = pending.iter().take(additional).copied().collect(); + for txid in &rest { + pending.remove(txid); + } + drained.extend(rest); + + Ok(drained) + } + pub(super) fn requeue_pending_tx_lookups(&self, txids: I) -> Result<(), bdk::Error> where I: IntoIterator, @@ -383,6 +407,26 @@ impl WalletCache { Ok(drained) } + pub(super) fn drain_pending_tx_misses_including( + &self, + required_txid: Txid, + max: usize, + ) -> Result, bdk::Error> { + let mut pending = self.lock_pending_tx_misses()?; + let mut drained = Vec::with_capacity(max.max(1)); + drained.push(required_txid); + pending.remove(&required_txid); + + let additional = max.max(1).saturating_sub(1); + let rest: Vec<_> = pending.iter().take(additional).copied().collect(); + for txid in &rest { + pending.remove(txid); + } + drained.extend(rest); + + Ok(drained) + } + pub(super) fn requeue_pending_tx_misses(&self, txids: I) -> Result<(), bdk::Error> where I: IntoIterator, diff --git a/src/bdk/pg/lookups.rs b/src/bdk/pg/lookups.rs index ba49fa09..2b762d13 100644 --- a/src/bdk/pg/lookups.rs +++ b/src/bdk/pg/lookups.rs @@ -40,6 +40,16 @@ impl Default for MissResolutionPolicy { } impl SqlxWalletDb { + fn unresolved_txids( + pending: Vec, + found: &HashMap, + ) -> Vec { + pending + .into_iter() + .filter(|txid| !found.contains_key(txid)) + .collect() + } + fn lookup_cached_tx_with_mode( &self, txid: &Txid, @@ -101,19 +111,27 @@ impl SqlxWalletDb { Ok(()) } - fn resolve_pending_tx_misses(&self) -> Result<(), bdk::Error> { - if !self - .cache - .should_batch_resolve_tx_misses(self.miss_resolution.tx_miss_threshold)? + fn resolve_pending_tx_misses_internal( + &self, + force_txid: Option, + ) -> Result { + if force_txid.is_none() + && !self + .cache + .should_batch_resolve_tx_misses(self.miss_resolution.tx_miss_threshold)? { - return Ok(()); + return Ok(false); } - let pending = self - .cache - .drain_pending_tx_misses(self.miss_resolution.tx_miss_batch_size)?; + let pending = if let Some(txid) = force_txid { + self.cache + .drain_pending_tx_misses_including(txid, self.miss_resolution.tx_miss_batch_size)? + } else { + self.cache + .drain_pending_tx_misses(self.miss_resolution.tx_miss_batch_size)? + }; if pending.is_empty() { - return Ok(()); + return Ok(false); } let found = match self @@ -128,30 +146,51 @@ impl SqlxWalletDb { } }; + let unresolved = Self::unresolved_txids(pending, &found); + if !found.is_empty() { self.cache.extend_txs(found)?; } + if !unresolved.is_empty() { + self.cache.requeue_pending_tx_misses(unresolved)?; + } + + Ok(true) + } + + fn resolve_pending_tx_misses(&self) -> Result<(), bdk::Error> { + let _ = self.resolve_pending_tx_misses_internal(None)?; Ok(()) } + fn resolve_pending_tx_misses_force(&self, txid: Txid) -> Result { + self.resolve_pending_tx_misses_internal(Some(txid)) + } + fn resolve_pending_tx_lookups_internal( &self, - force: bool, - ) -> Result, bdk::Error> { - if !force + force_txid: Option, + ) -> Result { + if force_txid.is_none() && !self .cache .should_batch_resolve_tx_lookups(self.miss_resolution.tx_lookup_threshold)? { - return Ok(None); + return Ok(false); } - let pending = self - .cache - .drain_pending_tx_lookups(self.miss_resolution.tx_lookup_batch_size)?; + let pending = if let Some(txid) = force_txid { + self.cache.drain_pending_tx_lookups_including( + txid, + self.miss_resolution.tx_lookup_batch_size, + )? + } else { + self.cache + .drain_pending_tx_lookups(self.miss_resolution.tx_lookup_batch_size)? + }; if pending.is_empty() { - return Ok(None); + return Ok(false); } let found = match self @@ -171,16 +210,16 @@ impl SqlxWalletDb { self.cache.extend_txs(found)?; } - Ok(Some(n_found)) + Ok(true) } fn resolve_pending_tx_lookups(&self) -> Result<(), bdk::Error> { - let _ = self.resolve_pending_tx_lookups_internal(false)?; + let _ = self.resolve_pending_tx_lookups_internal(None)?; Ok(()) } - fn resolve_pending_tx_lookups_force(&self) -> Result, bdk::Error> { - self.resolve_pending_tx_lookups_internal(true) + fn resolve_pending_tx_lookups_force(&self, txid: Txid) -> Result { + self.resolve_pending_tx_lookups_internal(Some(txid)) } pub(super) fn lookup_script_pubkey_path( @@ -249,6 +288,19 @@ impl SqlxWalletDb { } if self.cache.txid_marked_missing(txid)? { + if self.cache.pending_tx_miss_queued(txid)? + && self.resolve_pending_tx_misses_force(*txid)? + { + if let Some(result) = self.lookup_cached_tx_with_mode( + txid, + mode, + "forced_miss_resolve", + "forced_miss_resolve_mode_miss", + )? { + return Ok(result); + } + } + tracing::trace!("tx miss cache hit"); return Ok((None, "miss_cache")); } @@ -267,7 +319,7 @@ impl SqlxWalletDb { return Ok(result); } - if self.resolve_pending_tx_lookups_force()?.is_some() { + if self.resolve_pending_tx_lookups_force(*txid)? { if let Some(result) = self.lookup_cached_tx_with_mode( txid, mode, @@ -276,6 +328,10 @@ impl SqlxWalletDb { )? { return Ok(result); } + + // Mark this txid as missing immediately after the forced batch query so concurrent + // callers don't re-enqueue it while this lookup continues through miss resolution. + self.cache.record_missing_txid(*txid)?; } self.resolve_pending_tx_misses()?; @@ -285,7 +341,8 @@ impl SqlxWalletDb { return Ok(result); } - self.cache.record_and_enqueue_missing_txid(*txid)?; + self.cache.record_missing_txid(*txid)?; + self.cache.enqueue_pending_tx_miss(*txid)?; Ok((None, "db_miss")) } @@ -346,3 +403,33 @@ impl SqlxWalletDb { txs } } + +#[cfg(test)] +mod tests { + use super::*; + use bdk::bitcoin::hashes::Hash; + + fn tx_details(txid: Txid) -> TransactionDetails { + TransactionDetails { + transaction: None, + txid, + received: 0, + sent: 0, + fee: None, + confirmation_time: None, + } + } + + #[test] + fn unresolved_txids_keeps_only_not_found_entries() { + let first = Txid::all_zeros(); + let second = Txid::from_slice(&[1; 32]).expect("valid txid"); + let third = Txid::from_slice(&[2; 32]).expect("valid txid"); + + let mut found = HashMap::new(); + found.insert(second, tx_details(second)); + + let unresolved = SqlxWalletDb::unresolved_txids(vec![first, second, third], &found); + assert_eq!(unresolved, vec![first, third]); + } +} diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index f0cddf7c..70bb63fa 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -331,6 +331,60 @@ mod tests { assert!(drained_after_insert.is_empty()); } + #[test] + fn wallet_cache_forced_lookup_drain_includes_requested_txid() { + let cache = WalletCache::new(); + let required = Txid::all_zeros(); + let first = Txid::from_slice(&[1; 32]).expect("valid txid"); + let second = Txid::from_slice(&[2; 32]).expect("valid txid"); + + cache + .enqueue_pending_tx_lookup(first) + .expect("enqueue should succeed"); + cache + .enqueue_pending_tx_lookup(second) + .expect("enqueue should succeed"); + + let drained = cache + .drain_pending_tx_lookups_including(required, 2) + .expect("forced drain should succeed"); + assert_eq!(drained[0], required); + assert_eq!(drained.len(), 2); + assert!(drained.contains(&first) || drained.contains(&second)); + + let remaining = cache + .drain_pending_tx_lookups(10) + .expect("drain should succeed"); + assert_eq!(remaining.len(), 1); + } + + #[test] + fn wallet_cache_forced_tx_miss_drain_includes_requested_txid() { + let cache = WalletCache::new(); + let required = Txid::all_zeros(); + let first = Txid::from_slice(&[1; 32]).expect("valid txid"); + let second = Txid::from_slice(&[2; 32]).expect("valid txid"); + + cache + .enqueue_pending_tx_miss(first) + .expect("enqueue should succeed"); + cache + .enqueue_pending_tx_miss(second) + .expect("enqueue should succeed"); + + let drained = cache + .drain_pending_tx_misses_including(required, 2) + .expect("forced drain should succeed"); + assert_eq!(drained[0], required); + assert_eq!(drained.len(), 2); + assert!(drained.contains(&first) || drained.contains(&second)); + + let remaining = cache + .drain_pending_tx_misses(10) + .expect("drain should succeed"); + assert_eq!(remaining.len(), 1); + } + #[test] fn cache_loaded_script_pubkeys_marks_mask_and_populates_paths() { let cache = WalletCache::new(); @@ -375,11 +429,17 @@ mod tests { let another = Txid::from_slice(&[1; 32]).expect("valid txid"); cache - .record_and_enqueue_missing_txid(txid) + .record_missing_txid(txid) .expect("mark should succeed"); cache - .record_and_enqueue_missing_txid(another) + .enqueue_pending_tx_miss(txid) + .expect("enqueue should succeed"); + cache + .record_missing_txid(another) .expect("mark should succeed"); + cache + .enqueue_pending_tx_miss(another) + .expect("enqueue should succeed"); assert!(cache .txid_marked_missing(&txid) .expect("lookup should succeed")); @@ -438,8 +498,11 @@ mod tests { let txid = Txid::all_zeros(); cache - .record_and_enqueue_missing_txid(txid) + .record_missing_txid(txid) .expect("mark should succeed"); + cache + .enqueue_pending_tx_miss(txid) + .expect("enqueue should succeed"); assert!(cache .txid_marked_missing(&txid) .expect("lookup should succeed")); diff --git a/src/wallet/keychain/wallet.rs b/src/wallet/keychain/wallet.rs index cd217bc5..da36636c 100644 --- a/src/wallet/keychain/wallet.rs +++ b/src/wallet/keychain/wallet.rs @@ -52,7 +52,7 @@ impl Default for SyncProgressContext { const PROGRESS_BUCKET_SIZE_PCT: u8 = 10; const PROGRESS_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(30); -const TX_PREWARM_THRESHOLD: i64 = 5_000; +const TX_PREWARM_THRESHOLD: i64 = 1_000; const fn completion_bucket() -> u8 { 100 / PROGRESS_BUCKET_SIZE_PCT @@ -365,7 +365,7 @@ mod tests { #[test] fn prewarm_enabled_for_large_wallets_only() { - assert!(!should_prewarm_raw_txs(4_999)); - assert!(should_prewarm_raw_txs(5_000)); + assert!(!should_prewarm_raw_txs(999)); + assert!(should_prewarm_raw_txs(1_000)); } } From 3806bf9a658a55eaba1d0b1a9f07d3aa2eeb2711 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Fri, 17 Apr 2026 16:17:44 -0500 Subject: [PATCH 3/8] fix(bdk): preserve mode misses in forced tx lookups --- src/bdk/pg/lookups.rs | 79 +++++++++++++++++++++++++++++++------------ 1 file changed, 58 insertions(+), 21 deletions(-) diff --git a/src/bdk/pg/lookups.rs b/src/bdk/pg/lookups.rs index 2b762d13..9fbce231 100644 --- a/src/bdk/pg/lookups.rs +++ b/src/bdk/pg/lookups.rs @@ -10,6 +10,13 @@ type LookupSource = &'static str; type ScriptPathLookup = (Option<(KeychainKind, u32)>, LookupSource); type TxLookup = (Option, LookupSource); +enum ForcedTxLookupOutcome { + NotQueried, + NotFound, + FoundModeMatch(TransactionDetails), + FoundModeMiss, +} + #[derive(Copy, Clone, Eq, PartialEq)] pub(super) enum TxLookupMode { Any, @@ -171,13 +178,14 @@ impl SqlxWalletDb { fn resolve_pending_tx_lookups_internal( &self, force_txid: Option, - ) -> Result { + mode: TxLookupMode, + ) -> Result { if force_txid.is_none() && !self .cache .should_batch_resolve_tx_lookups(self.miss_resolution.tx_lookup_threshold)? { - return Ok(false); + return Ok(ForcedTxLookupOutcome::NotQueried); } let pending = if let Some(txid) = force_txid { @@ -190,7 +198,7 @@ impl SqlxWalletDb { .drain_pending_tx_lookups(self.miss_resolution.tx_lookup_batch_size)? }; if pending.is_empty() { - return Ok(false); + return Ok(ForcedTxLookupOutcome::NotQueried); } let found = match self @@ -204,22 +212,38 @@ impl SqlxWalletDb { return Err(error); } }; - let n_found = found.len(); - if n_found > 0 { + let forced_outcome = force_txid.map_or(ForcedTxLookupOutcome::NotQueried, |txid| { + found + .get(&txid) + .cloned() + .map_or(ForcedTxLookupOutcome::NotFound, |tx| { + if Self::tx_matches_lookup_mode(&tx, mode) { + ForcedTxLookupOutcome::FoundModeMatch(tx) + } else { + ForcedTxLookupOutcome::FoundModeMiss + } + }) + }); + + if !found.is_empty() { self.cache.extend_txs(found)?; } - Ok(true) + Ok(forced_outcome) } fn resolve_pending_tx_lookups(&self) -> Result<(), bdk::Error> { - let _ = self.resolve_pending_tx_lookups_internal(None)?; + let _ = self.resolve_pending_tx_lookups_internal(None, TxLookupMode::Any)?; Ok(()) } - fn resolve_pending_tx_lookups_force(&self, txid: Txid) -> Result { - self.resolve_pending_tx_lookups_internal(Some(txid)) + fn resolve_pending_tx_lookups_force( + &self, + txid: Txid, + mode: TxLookupMode, + ) -> Result { + self.resolve_pending_tx_lookups_internal(Some(txid), mode) } pub(super) fn lookup_script_pubkey_path( @@ -319,19 +343,21 @@ impl SqlxWalletDb { return Ok(result); } - if self.resolve_pending_tx_lookups_force(*txid)? { - if let Some(result) = self.lookup_cached_tx_with_mode( - txid, - mode, - "forced_batch_lookup", - "forced_batch_lookup_mode_miss", - )? { - return Ok(result); + match self.resolve_pending_tx_lookups_force(*txid, mode)? { + ForcedTxLookupOutcome::FoundModeMatch(tx) => { + return Ok((Some(tx), "forced_batch_lookup")); + } + ForcedTxLookupOutcome::FoundModeMiss => { + return Ok((None, "forced_batch_lookup_mode_miss")); + } + ForcedTxLookupOutcome::NotFound => { + // Mark this txid as missing immediately after the forced batch query so concurrent + // callers don't re-enqueue it while this lookup continues through miss resolution. + self.cache.record_missing_txid(*txid)?; + } + ForcedTxLookupOutcome::NotQueried => { + debug_assert!(false, "forced lookup should always query requested txid"); } - - // Mark this txid as missing immediately after the forced batch query so concurrent - // callers don't re-enqueue it while this lookup continues through miss resolution. - self.cache.record_missing_txid(*txid)?; } self.resolve_pending_tx_misses()?; @@ -432,4 +458,15 @@ mod tests { let unresolved = SqlxWalletDb::unresolved_txids(vec![first, second, third], &found); assert_eq!(unresolved, vec![first, third]); } + + #[test] + fn summary_tx_in_require_raw_mode_is_mode_miss_not_absent() { + let tx = tx_details(Txid::all_zeros()); + + assert!(!SqlxWalletDb::tx_matches_lookup_mode( + &tx, + TxLookupMode::RequireRaw + )); + assert!(SqlxWalletDb::tx_matches_lookup_mode(&tx, TxLookupMode::Any)); + } } From 2078e3a1c7deb38be4195b3bbbf8a9bbc437cfbd Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Fri, 17 Apr 2026 19:36:03 -0500 Subject: [PATCH 4/8] fix(bdk): avoid repeated forced miss retries --- src/bdk/pg/cache.rs | 24 ++++++++++++++++++++++ src/bdk/pg/lookups.rs | 9 ++++++--- src/bdk/pg/mod.rs | 46 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 76 insertions(+), 3 deletions(-) diff --git a/src/bdk/pg/cache.rs b/src/bdk/pg/cache.rs index 539f3957..8f386183 100644 --- a/src/bdk/pg/cache.rs +++ b/src/bdk/pg/cache.rs @@ -23,6 +23,9 @@ pub(super) struct WalletCache { // Txids not yet seen in the in-process cache and not yet known-missing. These are batched // before we fall back to recording a miss. pending_tx_lookups: Arc>>, + // Tracks txids that already consumed their one targeted miss-cache retry in this + // SqlxWalletDb lifetime, while still allowing later threshold-driven batch retries. + forced_tx_miss_retries: Arc>>, // Process-local hint for which keychain script path sets are fully hydrated. // Bit 0: external, bit 1: internal. // This is intentionally not synchronized across processes. @@ -53,6 +56,7 @@ impl WalletCache { pending_script_misses: Arc::new(Mutex::new(HashSet::new())), pending_tx_misses: Arc::new(Mutex::new(HashSet::new())), pending_tx_lookups: Arc::new(Mutex::new(HashSet::new())), + forced_tx_miss_retries: Arc::new(Mutex::new(HashSet::new())), script_pubkeys_loaded_mask: Arc::new(AtomicU8::new(0)), raw_txs_fully_loaded: Arc::new(AtomicBool::new(false)), summary_txs_fully_loaded: Arc::new(AtomicBool::new(false)), @@ -109,6 +113,10 @@ impl WalletCache { self.lock_with_error(&self.pending_tx_lookups, "pending tx lookups cache") } + fn lock_forced_tx_miss_retries(&self) -> Result>, bdk::Error> { + self.lock_with_error(&self.forced_tx_miss_retries, "forced tx miss retries cache") + } + pub(super) fn get_script_pubkey_path( &self, script: &Script, @@ -189,9 +197,11 @@ impl WalletCache { { let mut missing = self.lock_missing_txids()?; let mut pending = self.lock_pending_tx_misses()?; + let mut forced_retries = self.lock_forced_tx_miss_retries()?; for txid in txids { missing.remove(txid); pending.remove(txid); + forced_retries.remove(txid); } Ok(()) } @@ -278,6 +288,7 @@ impl WalletCache { let mut cache = self.lock_transactions()?; cache.remove(txid); } + self.lock_forced_tx_miss_retries()?.remove(txid); self.record_missing_txid(*txid)?; Ok(()) } @@ -348,10 +359,22 @@ impl WalletCache { Ok(pending.contains(txid)) } + #[cfg(test)] + pub(super) fn forced_tx_miss_retry_recorded(&self, txid: &Txid) -> Result { + let forced_retries = self.lock_forced_tx_miss_retries()?; + Ok(forced_retries.contains(txid)) + } + + pub(super) fn claim_forced_tx_miss_retry(&self, txid: Txid) -> Result { + let mut forced_retries = self.lock_forced_tx_miss_retries()?; + Ok(forced_retries.insert(txid)) + } + pub(super) fn mark_txid_not_missing(&self, txid: &Txid) -> Result<(), bdk::Error> { self.lock_missing_txids()?.remove(txid); self.lock_pending_tx_misses()?.remove(txid); self.lock_pending_tx_lookups()?.remove(txid); + self.lock_forced_tx_miss_retries()?.remove(txid); Ok(()) } @@ -468,6 +491,7 @@ impl WalletCache { Self::clear_even_if_poisoned(&self.pending_script_misses); Self::clear_even_if_poisoned(&self.pending_tx_misses); Self::clear_even_if_poisoned(&self.pending_tx_lookups); + Self::clear_even_if_poisoned(&self.forced_tx_miss_retries); self.script_pubkeys_loaded_mask.store(0, Ordering::Release); self.raw_txs_fully_loaded.store(false, Ordering::Release); diff --git a/src/bdk/pg/lookups.rs b/src/bdk/pg/lookups.rs index 9fbce231..3a964d77 100644 --- a/src/bdk/pg/lookups.rs +++ b/src/bdk/pg/lookups.rs @@ -312,9 +312,12 @@ impl SqlxWalletDb { } if self.cache.txid_marked_missing(txid)? { - if self.cache.pending_tx_miss_queued(txid)? - && self.resolve_pending_tx_misses_force(*txid)? - { + let should_force_miss_retry = if self.cache.pending_tx_miss_queued(txid)? { + self.cache.claim_forced_tx_miss_retry(*txid)? + } else { + false + }; + if should_force_miss_retry && self.resolve_pending_tx_misses_force(*txid)? { if let Some(result) = self.lookup_cached_tx_with_mode( txid, mode, diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 70bb63fa..a80e85d1 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -385,6 +385,52 @@ mod tests { assert_eq!(remaining.len(), 1); } + #[test] + fn wallet_cache_clears_forced_tx_miss_retry_tracking_when_tx_is_loaded() { + let cache = WalletCache::new(); + let txid = Txid::all_zeros(); + + cache + .claim_forced_tx_miss_retry(txid) + .expect("record should succeed"); + cache + .record_missing_txid(txid) + .expect("mark should succeed"); + cache + .enqueue_pending_tx_miss(txid) + .expect("enqueue should succeed"); + + cache + .extend_summary_txs([(txid, tx_details(txid))]) + .expect("extend should succeed"); + + assert!(!cache + .forced_tx_miss_retry_recorded(&txid) + .expect("lookup should succeed")); + } + + #[test] + fn wallet_cache_remove_tx_resets_forced_tx_miss_retry_tracking() { + let cache = WalletCache::new(); + let txid = Txid::all_zeros(); + + cache + .claim_forced_tx_miss_retry(txid) + .expect("record should succeed"); + cache + .extend_txs([(txid, tx_details(txid))]) + .expect("extend should succeed"); + cache + .claim_forced_tx_miss_retry(txid) + .expect("record should succeed"); + + cache.remove_tx(&txid).expect("remove should succeed"); + + assert!(!cache + .forced_tx_miss_retry_recorded(&txid) + .expect("lookup should succeed")); + } + #[test] fn cache_loaded_script_pubkeys_marks_mask_and_populates_paths() { let cache = WalletCache::new(); From aa727c213b3b01f9478c8b54b63b3eb33d822dc5 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Fri, 17 Apr 2026 19:51:59 -0500 Subject: [PATCH 5/8] fix(sync): make tx prewarm best effort --- src/wallet/keychain/wallet.rs | 38 ++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/src/wallet/keychain/wallet.rs b/src/wallet/keychain/wallet.rs index da36636c..138c3507 100644 --- a/src/wallet/keychain/wallet.rs +++ b/src/wallet/keychain/wallet.rs @@ -250,15 +250,35 @@ impl KeychainWallet { let _ = wallet.ensure_addresses_cached(max_last_index.saturating_add(1))?; - let tx_count = wallet.database().tx_count()?; - if should_prewarm_raw_txs(tx_count) { - let prewarmed_txs = wallet.database().prewarm_raw_txs()?; - tracing::info!( - tx_count, - prewarmed_txs, - threshold = TX_PREWARM_THRESHOLD, - "prewarmed raw tx cache before wallet sync" - ); + match wallet.database().tx_count() { + Ok(tx_count) if should_prewarm_raw_txs(tx_count) => { + match wallet.database().prewarm_raw_txs() { + Ok(prewarmed_txs) => { + tracing::info!( + tx_count, + prewarmed_txs, + threshold = TX_PREWARM_THRESHOLD, + "prewarmed raw tx cache before wallet sync" + ); + } + Err(error) => { + tracing::warn!( + ?error, + tx_count, + threshold = TX_PREWARM_THRESHOLD, + "failed to prewarm raw tx cache before wallet sync; continuing" + ); + } + } + } + Ok(_) => {} + Err(error) => { + tracing::warn!( + ?error, + threshold = TX_PREWARM_THRESHOLD, + "failed to load tx count for raw tx cache prewarm; continuing without prewarm" + ); + } } let progress = TracingBdkProgress::new(context, wallet_id, keychain_id); From 389f45d3780ffe08d1c698f103d4bfc9a4c06de1 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Fri, 17 Apr 2026 20:02:28 -0500 Subject: [PATCH 6/8] fix(bdk): avoid lock inversion in remove_tx --- src/bdk/pg/cache.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/bdk/pg/cache.rs b/src/bdk/pg/cache.rs index 8f386183..f371ad0e 100644 --- a/src/bdk/pg/cache.rs +++ b/src/bdk/pg/cache.rs @@ -288,7 +288,9 @@ impl WalletCache { let mut cache = self.lock_transactions()?; cache.remove(txid); } - self.lock_forced_tx_miss_retries()?.remove(txid); + { + self.lock_forced_tx_miss_retries()?.remove(txid); + } self.record_missing_txid(*txid)?; Ok(()) } From 1811f3d615336bfd47e5abae402a56013f27d608 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Fri, 17 Apr 2026 20:15:24 -0500 Subject: [PATCH 7/8] perf(bdk): avoid extra prewarm tx allocation --- src/bdk/pg/mod.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index a80e85d1..20c9437a 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -82,9 +82,14 @@ impl SqlxWalletDb { } pub fn prewarm_raw_txs(&self) -> Result { - use bdk::database::Database; - - Ok(Database::iter_txs(self, true)?.len()) + let loaded = self + .ctx + .rt + .block_on(async { self.transactions_repo().load_all().await })?; + let loaded_count = loaded.len(); + self.cache.extend_txs(loaded)?; + self.cache.set_raw_txs_fully_loaded(); + Ok(loaded_count) } fn script_pubkeys_repo(&self) -> ScriptPubkeys { From c5e875bf4c7026fad946a0b2eb35e21686707daa Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Fri, 17 Apr 2026 20:37:29 -0500 Subject: [PATCH 8/8] fix(bdk): queue forced misses before retry resolution --- src/bdk/pg/lookups.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/bdk/pg/lookups.rs b/src/bdk/pg/lookups.rs index 3a964d77..6790cf66 100644 --- a/src/bdk/pg/lookups.rs +++ b/src/bdk/pg/lookups.rs @@ -357,6 +357,7 @@ impl SqlxWalletDb { // Mark this txid as missing immediately after the forced batch query so concurrent // callers don't re-enqueue it while this lookup continues through miss resolution. self.cache.record_missing_txid(*txid)?; + self.cache.enqueue_pending_tx_miss(*txid)?; } ForcedTxLookupOutcome::NotQueried => { debug_assert!(false, "forced lookup should always query requested txid");