diff --git a/src/bdk/pg/cache.rs b/src/bdk/pg/cache.rs index f371ad0e..65d5a551 100644 --- a/src/bdk/pg/cache.rs +++ b/src/bdk/pg/cache.rs @@ -3,29 +3,229 @@ use bdk::{ KeychainKind, TransactionDetails, }; use std::{ + borrow::Borrow, collections::{HashMap, HashSet}, + hash::Hash, sync::atomic::{AtomicBool, AtomicU8, Ordering}, sync::{Arc, Mutex, MutexGuard, PoisonError}, }; use super::{ScriptPubkeyCache, SqlxWalletDb, TransactionCache}; +struct Tracked(Mutex); + +impl Tracked { + fn new(value: T) -> Self { + Self(Mutex::new(value)) + } + + fn lock(&self, context: &'static str) -> Result, bdk::Error> { + self.0 + .lock() + .map_err(|_| bdk::Error::Generic(format!("{context} lock poisoned"))) + } +} + +impl Tracked +where + T: Default, +{ + fn clear_even_if_poisoned(&self) { + let mut guard = self.0.lock().unwrap_or_else(PoisonError::into_inner); + *guard = T::default(); + self.0.clear_poison(); + } +} + +struct PendingSet { + inner: Tracked>, + context: &'static str, +} + +impl PendingSet +where + T: Eq + Hash, +{ + fn new(context: &'static str) -> Self { + Self { + inner: Tracked::new(HashSet::new()), + context, + } + } + + fn lock(&self) -> Result>, bdk::Error> { + self.inner.lock(self.context) + } + + fn insert(&self, value: T) -> Result<(), bdk::Error> { + self.inner.lock(self.context)?.insert(value); + Ok(()) + } + + fn contains(&self, value: &Q) -> Result + where + T: Borrow, + Q: Eq + Hash + ?Sized, + { + Ok(self.inner.lock(self.context)?.contains(value)) + } + + fn drain(&self, max: usize) -> Result, bdk::Error> + where + T: Clone, + { + let mut pending = self.inner.lock(self.context)?; + let drained: Vec<_> = pending.iter().take(max).cloned().collect(); + for value in &drained { + pending.remove(value); + } + Ok(drained) + } + + fn requeue(&self, values: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + { + self.inner.lock(self.context)?.extend(values); + Ok(()) + } + + fn should_batch(&self, threshold: usize) -> Result { + Ok(self.inner.lock(self.context)?.len() >= threshold) + } + + fn clear_even_if_poisoned(&self) { + self.inner.clear_even_if_poisoned(); + } +} + +struct MissTracker { + confirmed: Tracked>, + confirmed_context: &'static str, + pending: PendingSet, +} + +impl MissTracker +where + T: Eq + Hash, +{ + fn new(confirmed_context: &'static str, pending_context: &'static str) -> Self { + Self { + confirmed: Tracked::new(HashSet::new()), + confirmed_context, + pending: PendingSet::new(pending_context), + } + } + + fn is_missing(&self, value: &Q) -> Result + where + T: Borrow, + Q: Eq + Hash + ?Sized, + { + Ok(self.confirmed.lock(self.confirmed_context)?.contains(value)) + } + + fn record(&self, value: T) -> Result<(), bdk::Error> { + self.confirmed.lock(self.confirmed_context)?.insert(value); + Ok(()) + } + + fn enqueue_pending(&self, value: T) -> Result<(), bdk::Error> { + self.pending.insert(value) + } + + fn pending_contains(&self, value: &Q) -> Result + where + T: Borrow, + Q: Eq + Hash + ?Sized, + { + self.pending.contains(value) + } + + fn record_and_enqueue(&self, value: T) -> Result<(), bdk::Error> + where + T: Clone, + { + let (mut confirmed, mut pending) = self.lock_both()?; + confirmed.insert(value.clone()); + pending.insert(value); + Ok(()) + } + + fn clear(&self, value: &Q) -> Result<(), bdk::Error> + where + T: Borrow, + Q: Eq + Hash + ?Sized, + { + let (mut confirmed, mut pending) = self.lock_both()?; + confirmed.remove(value); + pending.remove(value); + Ok(()) + } + + fn clear_many<'a, I>(&self, values: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + T: 'a, + { + let (mut confirmed, mut pending) = self.lock_both()?; + for value in values { + confirmed.remove(value); + pending.remove(value); + } + Ok(()) + } + + // LOCK ORDER INVARIANT: always acquire confirmed before pending. + // Use this method whenever both guards are needed simultaneously. + fn lock_both( + &self, + ) -> Result<(MutexGuard<'_, HashSet>, MutexGuard<'_, HashSet>), bdk::Error> { + let confirmed = self.confirmed.lock(self.confirmed_context)?; + let pending = self.pending.lock()?; + Ok((confirmed, pending)) + } + + fn lock_pending(&self) -> Result>, bdk::Error> { + self.pending.lock() + } + + fn drain_pending(&self, max: usize) -> Result, bdk::Error> + where + T: Clone, + { + self.pending.drain(max) + } + + fn requeue_pending(&self, values: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + { + self.pending.requeue(values) + } + + fn should_batch_pending(&self, threshold: usize) -> Result { + self.pending.should_batch(threshold) + } + + fn clear_even_if_poisoned(&self) { + self.confirmed.clear_even_if_poisoned(); + self.pending.clear_even_if_poisoned(); + } +} + #[derive(Clone)] pub(super) struct WalletCache { - script_pubkeys: Arc>, - transactions: Arc>, - missing_script_pubkeys: Arc>>, - missing_txids: Arc>>, - pending_script_misses: Arc>>, - // Txids confirmed absent from the DB in this process. These are retried in batches to - // recover from races where rows appear after an earlier miss. - pending_tx_misses: Arc>>, + script_pubkeys: Arc>, + transactions: Arc>, + script_misses: Arc>, + tx_misses: Arc>, // Txids not yet seen in the in-process cache and not yet known-missing. These are batched // before we fall back to recording a miss. - pending_tx_lookups: Arc>>, + pending_tx_lookups: Arc>>, // Tracks txids that already consumed their one targeted miss-cache retry in this // SqlxWalletDb lifetime, while still allowing later threshold-driven batch retries. - forced_tx_miss_retries: Arc>>, + forced_tx_miss_retries: Arc>>, // Process-local hint for which keychain script path sets are fully hydrated. // Bit 0: external, bit 1: internal. // This is intentionally not synchronized across processes. @@ -38,41 +238,26 @@ pub(super) struct WalletCache { } impl WalletCache { - fn clear_even_if_poisoned(mutex: &Mutex) - where - T: Default, - { - let mut guard = mutex.lock().unwrap_or_else(PoisonError::into_inner); - *guard = T::default(); - mutex.clear_poison(); - } - pub(super) fn new() -> Self { Self { - script_pubkeys: Arc::new(Mutex::new(HashMap::new())), - transactions: Arc::new(Mutex::new(HashMap::new())), - missing_script_pubkeys: Arc::new(Mutex::new(HashSet::new())), - missing_txids: Arc::new(Mutex::new(HashSet::new())), - pending_script_misses: Arc::new(Mutex::new(HashSet::new())), - pending_tx_misses: Arc::new(Mutex::new(HashSet::new())), - pending_tx_lookups: Arc::new(Mutex::new(HashSet::new())), - forced_tx_miss_retries: Arc::new(Mutex::new(HashSet::new())), + script_pubkeys: Arc::new(Tracked::new(HashMap::new())), + transactions: Arc::new(Tracked::new(HashMap::new())), + script_misses: Arc::new(MissTracker::new( + "missing script pubkeys cache", + "pending script misses cache", + )), + tx_misses: Arc::new(MissTracker::new( + "missing txids cache", + "pending tx misses cache", + )), + pending_tx_lookups: Arc::new(Tracked::new(HashSet::new())), + forced_tx_miss_retries: Arc::new(Tracked::new(HashSet::new())), script_pubkeys_loaded_mask: Arc::new(AtomicU8::new(0)), raw_txs_fully_loaded: Arc::new(AtomicBool::new(false)), summary_txs_fully_loaded: Arc::new(AtomicBool::new(false)), } } - fn lock_with_error<'a, T>( - &self, - mutex: &'a Mutex, - context: &'static str, - ) -> Result, bdk::Error> { - mutex - .lock() - .map_err(|_| bdk::Error::Generic(format!("{context} lock poisoned"))) - } - fn script_pubkey_mask_for(keychain: Option) -> u8 { const EXTERNAL: u8 = 1; const INTERNAL: u8 = 2; @@ -84,37 +269,24 @@ impl WalletCache { } fn lock_script_pubkeys(&self) -> Result, bdk::Error> { - self.lock_with_error(&self.script_pubkeys, "script pubkeys cache") + self.script_pubkeys.lock("script pubkeys cache") } fn lock_transactions(&self) -> Result, bdk::Error> { - self.lock_with_error(&self.transactions, "transactions cache") - } - - fn lock_missing_script_pubkeys( - &self, - ) -> Result>, bdk::Error> { - self.lock_with_error(&self.missing_script_pubkeys, "missing script pubkeys cache") - } - - fn lock_missing_txids(&self) -> Result>, bdk::Error> { - self.lock_with_error(&self.missing_txids, "missing txids cache") - } - - fn lock_pending_script_misses(&self) -> Result>, bdk::Error> { - self.lock_with_error(&self.pending_script_misses, "pending script misses cache") + self.transactions.lock("transactions cache") } fn lock_pending_tx_misses(&self) -> Result>, bdk::Error> { - self.lock_with_error(&self.pending_tx_misses, "pending tx misses cache") + self.tx_misses.lock_pending() } fn lock_pending_tx_lookups(&self) -> Result>, bdk::Error> { - self.lock_with_error(&self.pending_tx_lookups, "pending tx lookups cache") + self.pending_tx_lookups.lock("pending tx lookups cache") } fn lock_forced_tx_miss_retries(&self) -> Result>, bdk::Error> { - self.lock_with_error(&self.forced_tx_miss_retries, "forced tx miss retries cache") + self.forced_tx_miss_retries + .lock("forced tx miss retries cache") } pub(super) fn get_script_pubkey_path( @@ -142,13 +314,7 @@ impl WalletCache { where I: IntoIterator, { - let mut missing = self.lock_missing_script_pubkeys()?; - let mut pending = self.lock_pending_script_misses()?; - for script in scripts { - missing.remove(script.as_script()); - pending.remove(script); - } - Ok(()) + self.script_misses.clear_many(scripts) } pub(super) fn extend_script_pubkeys(&self, entries: I) -> Result<(), bdk::Error> @@ -156,7 +322,11 @@ impl WalletCache { I: IntoIterator, { let entries: Vec<_> = entries.into_iter().collect(); + // Miss tracking is cleared before cache insertion. Concurrent readers may briefly + // observe a mismatch between miss-tracking state and cache contents. self.clear_script_miss_tracking(entries.iter().map(|(script, _)| script))?; + #[cfg(test)] + test_support::pause_at(test_support::HookPoint::BeforeExtendScriptPubkeysInsert); let mut cache = self.lock_script_pubkeys()?; cache.extend(entries); Ok(()) @@ -191,16 +361,24 @@ impl WalletCache { Ok(cache.get(txid).cloned()) } + #[cfg(test)] + pub(super) fn insert_tx(&self, txid: Txid, tx: TransactionDetails) -> Result<(), bdk::Error> { + { + let mut cache = self.lock_transactions()?; + cache.insert(txid, tx); + } + self.mark_txid_not_missing(&txid)?; + Ok(()) + } + fn clear_tx_miss_tracking<'a, I>(&self, txids: I) -> Result<(), bdk::Error> where I: IntoIterator, { - let mut missing = self.lock_missing_txids()?; - let mut pending = self.lock_pending_tx_misses()?; + let txids: Vec<_> = txids.into_iter().copied().collect(); + self.tx_misses.clear_many(txids.iter())?; let mut forced_retries = self.lock_forced_tx_miss_retries()?; - for txid in txids { - missing.remove(txid); - pending.remove(txid); + for txid in &txids { forced_retries.remove(txid); } Ok(()) @@ -222,8 +400,12 @@ impl WalletCache { I: IntoIterator, { let entries: Vec<_> = entries.into_iter().collect(); + // Miss tracking is cleared before cache insertion. Concurrent readers may briefly + // observe a mismatch between miss-tracking state and cache contents. self.clear_tx_miss_tracking(entries.iter().map(|(txid, _)| txid))?; self.clear_pending_tx_lookups(entries.iter().map(|(txid, _)| txid))?; + #[cfg(test)] + test_support::pause_at(test_support::HookPoint::BeforeExtendTxsInsert); let mut cache = self.lock_transactions()?; cache.extend(entries); Ok(()) @@ -234,8 +416,12 @@ impl WalletCache { I: IntoIterator, { let entries: Vec<_> = entries.into_iter().collect(); + // Miss tracking is cleared before cache insertion. Concurrent readers may briefly + // observe a mismatch between miss-tracking state and cache contents. self.clear_tx_miss_tracking(entries.iter().map(|(txid, _)| txid))?; self.clear_pending_tx_lookups(entries.iter().map(|(txid, _)| txid))?; + #[cfg(test)] + test_support::pause_at(test_support::HookPoint::BeforeExtendSummaryTxsInsert); let mut cache = self.lock_transactions()?; for (txid, mut summary) in entries { // Summary refreshes may run after raw tx bytes were already hydrated. Preserve any @@ -296,69 +482,57 @@ impl WalletCache { } pub(super) fn script_marked_missing(&self, script: &Script) -> Result { - let missing = self.lock_missing_script_pubkeys()?; - Ok(missing.contains(script)) + self.script_misses.is_missing(script) } pub(super) fn record_missing_script(&self, script: ScriptBuf) -> Result<(), bdk::Error> { - self.lock_missing_script_pubkeys()?.insert(script); - Ok(()) + self.script_misses.record(script) } pub(super) fn record_and_enqueue_missing_script( &self, script: ScriptBuf, ) -> Result<(), bdk::Error> { - self.lock_missing_script_pubkeys()?.insert(script.clone()); - self.lock_pending_script_misses()?.insert(script); - Ok(()) + self.script_misses.record_and_enqueue(script) } pub(super) fn mark_script_not_missing(&self, script: &Script) -> Result<(), bdk::Error> { - self.lock_missing_script_pubkeys()?.remove(script); - self.lock_pending_script_misses()?.remove(script); - Ok(()) + self.script_misses.clear(script) } pub(super) fn drain_pending_script_misses( &self, max: usize, ) -> Result, bdk::Error> { - let mut pending = self.lock_pending_script_misses()?; - let drained: Vec<_> = pending.iter().take(max).cloned().collect(); - for script in &drained { - pending.remove(script); - } - Ok(drained) + self.script_misses.drain_pending(max) } pub(super) fn requeue_pending_script_misses(&self, scripts: I) -> Result<(), bdk::Error> where I: IntoIterator, { - let mut pending = self.lock_pending_script_misses()?; - pending.extend(scripts); - Ok(()) + self.script_misses.requeue_pending(scripts) } pub(super) fn txid_marked_missing(&self, txid: &Txid) -> Result { - let missing = self.lock_missing_txids()?; - Ok(missing.contains(txid)) + self.tx_misses.is_missing(txid) } pub(super) fn record_missing_txid(&self, txid: Txid) -> Result<(), bdk::Error> { - self.lock_missing_txids()?.insert(txid); - Ok(()) + self.tx_misses.record(txid) } pub(super) fn enqueue_pending_tx_miss(&self, txid: Txid) -> Result<(), bdk::Error> { - self.lock_pending_tx_misses()?.insert(txid); - Ok(()) + self.tx_misses.enqueue_pending(txid) + } + + #[cfg(test)] + pub(super) fn record_and_enqueue_missing_txid(&self, txid: Txid) -> Result<(), bdk::Error> { + self.tx_misses.record_and_enqueue(txid) } pub(super) fn pending_tx_miss_queued(&self, txid: &Txid) -> Result { - let pending = self.lock_pending_tx_misses()?; - Ok(pending.contains(txid)) + self.tx_misses.pending_contains(txid) } #[cfg(test)] @@ -373,8 +547,7 @@ impl WalletCache { } pub(super) fn mark_txid_not_missing(&self, txid: &Txid) -> Result<(), bdk::Error> { - self.lock_missing_txids()?.remove(txid); - self.lock_pending_tx_misses()?.remove(txid); + self.tx_misses.clear(txid)?; self.lock_pending_tx_lookups()?.remove(txid); self.lock_forced_tx_miss_retries()?.remove(txid); Ok(()) @@ -424,12 +597,7 @@ impl WalletCache { } pub(super) fn drain_pending_tx_misses(&self, max: usize) -> Result, bdk::Error> { - let mut pending = self.lock_pending_tx_misses()?; - let drained: Vec<_> = pending.iter().take(max).copied().collect(); - for txid in &drained { - pending.remove(txid); - } - Ok(drained) + self.tx_misses.drain_pending(max) } pub(super) fn drain_pending_tx_misses_including( @@ -456,25 +624,21 @@ impl WalletCache { where I: IntoIterator, { - let mut pending = self.lock_pending_tx_misses()?; - pending.extend(txids); - Ok(()) + self.tx_misses.requeue_pending(txids) } pub(super) fn should_batch_resolve_script_misses( &self, threshold: usize, ) -> Result { - let pending = self.lock_pending_script_misses()?; - Ok(pending.len() >= threshold) + self.script_misses.should_batch_pending(threshold) } pub(super) fn should_batch_resolve_tx_misses( &self, threshold: usize, ) -> Result { - let pending = self.lock_pending_tx_misses()?; - Ok(pending.len() >= threshold) + self.tx_misses.should_batch_pending(threshold) } pub(super) fn should_batch_resolve_tx_lookups( @@ -486,14 +650,12 @@ impl WalletCache { } pub(super) fn invalidate(&self) { - Self::clear_even_if_poisoned(&self.script_pubkeys); - Self::clear_even_if_poisoned(&self.transactions); - Self::clear_even_if_poisoned(&self.missing_script_pubkeys); - Self::clear_even_if_poisoned(&self.missing_txids); - Self::clear_even_if_poisoned(&self.pending_script_misses); - Self::clear_even_if_poisoned(&self.pending_tx_misses); - Self::clear_even_if_poisoned(&self.pending_tx_lookups); - Self::clear_even_if_poisoned(&self.forced_tx_miss_retries); + self.script_pubkeys.clear_even_if_poisoned(); + self.transactions.clear_even_if_poisoned(); + self.script_misses.clear_even_if_poisoned(); + self.tx_misses.clear_even_if_poisoned(); + self.pending_tx_lookups.clear_even_if_poisoned(); + self.forced_tx_miss_retries.clear_even_if_poisoned(); self.script_pubkeys_loaded_mask.store(0, Ordering::Release); self.raw_txs_fully_loaded.store(false, Ordering::Release); @@ -501,3 +663,128 @@ impl WalletCache { .store(false, Ordering::Release); } } + +#[cfg(test)] +pub(crate) mod test_support { + use super::*; + use std::sync::{Condvar, OnceLock}; + + #[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)] + pub(crate) enum HookPoint { + BeforeExtendScriptPubkeysInsert, + BeforeExtendTxsInsert, + BeforeExtendSummaryTxsInsert, + } + + #[derive(Default)] + struct HookState { + reached: bool, + released: bool, + } + + struct InstalledChannels { + state: Mutex, + signal: Condvar, + } + + fn hooks() -> &'static Mutex>> { + static HOOKS: OnceLock>>> = OnceLock::new(); + HOOKS.get_or_init(|| Mutex::new(HashMap::new())) + } + + fn serial() -> &'static Mutex<()> { + static SERIAL: OnceLock> = OnceLock::new(); + SERIAL.get_or_init(|| Mutex::new(())) + } + + pub(crate) struct InstalledHook { + point: HookPoint, + _serial: MutexGuard<'static, ()>, + } + + impl InstalledHook { + pub(crate) fn wait_until_reached(&self) { + let hook = hooks() + .lock() + .expect("hooks lock should succeed") + .get(&self.point) + .cloned() + .expect("hook should stay installed while waiting"); + let mut state = hook.state.lock().expect("state lock should succeed"); + while !state.reached { + state = hook.signal.wait(state).expect("wait should succeed"); + } + } + + pub(crate) fn release(&mut self) { + let hook = hooks() + .lock() + .expect("hooks lock should succeed") + .get(&self.point) + .cloned(); + let Some(hook) = hook else { + return; + }; + + let mut state = hook.state.lock().expect("state lock should succeed"); + state.released = true; + hook.signal.notify_all(); + } + } + + impl Drop for InstalledHook { + fn drop(&mut self) { + if let Some(hook) = hooks() + .lock() + .expect("hooks lock should succeed") + .remove(&self.point) + { + let mut state = hook.state.lock().expect("state lock should succeed"); + state.released = true; + hook.signal.notify_all(); + } + } + } + + pub(crate) fn install_hook(point: HookPoint) -> InstalledHook { + let serial = serial().lock().unwrap_or_else(PoisonError::into_inner); + let hook = Arc::new(InstalledChannels { + state: Mutex::new(HookState::default()), + signal: Condvar::new(), + }); + + let previous = hooks() + .lock() + .expect("hooks lock should succeed") + .insert(point, hook); + assert!( + previous.is_none(), + "test hook already installed for {point:?}" + ); + + InstalledHook { + point, + _serial: serial, + } + } + + pub(super) fn pause_at(point: HookPoint) { + let hook = hooks() + .lock() + .expect("hooks lock should succeed") + .get(&point) + .cloned(); + let Some(hook) = hook else { + return; + }; + + let mut state = hook.state.lock().expect("state lock should succeed"); + if !state.reached { + state.reached = true; + hook.signal.notify_all(); + } + while !state.released { + state = hook.signal.wait(state).expect("wait should succeed"); + } + } +} diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index 20c9437a..8651ccfa 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -138,7 +138,10 @@ impl SqlxWalletDb { #[cfg(test)] mod tests { use super::*; + use crate::bdk::pg::cache::test_support::{install_hook, HookPoint}; use bdk::bitcoin::hashes::Hash; + use std::collections::HashSet; + use std::thread; fn tx_details(txid: Txid) -> TransactionDetails { TransactionDetails { @@ -570,4 +573,300 @@ mod tests { .expect("drain should succeed"); assert!(drained.is_empty()); } + + #[test] + fn wallet_cache_extend_script_pubkeys_clears_script_miss_tracking() { + let cache = WalletCache::new(); + let script = ScriptBuf::from(vec![0x51]); + + cache + .record_and_enqueue_missing_script(script.clone()) + .expect("mark should succeed"); + assert!(cache + .script_marked_missing(script.as_script()) + .expect("lookup should succeed")); + + cache + .extend_script_pubkeys([(script.clone(), (KeychainKind::External, 7))]) + .expect("extend should succeed"); + + assert!(!cache + .script_marked_missing(script.as_script()) + .expect("lookup should succeed")); + let drained = cache + .drain_pending_script_misses(1) + .expect("drain should succeed"); + assert!(drained.is_empty()); + } + + #[test] + fn wallet_cache_insert_script_pubkey_clears_script_miss_tracking() { + let cache = WalletCache::new(); + let script = ScriptBuf::from(vec![0x53]); + + cache + .record_and_enqueue_missing_script(script.clone()) + .expect("mark should succeed"); + + cache + .insert_script_pubkey(script.clone(), (KeychainKind::External, 11)) + .expect("insert should succeed"); + + assert!(!cache + .script_marked_missing(script.as_script()) + .expect("lookup should succeed")); + let drained = cache + .drain_pending_script_misses(10) + .expect("drain should succeed"); + assert!(drained.is_empty()); + } + + #[test] + fn wallet_cache_extend_txs_clears_tx_miss_tracking() { + let cache = WalletCache::new(); + let txid = Txid::from_slice(&[2; 32]).expect("valid txid"); + + cache + .record_and_enqueue_missing_txid(txid) + .expect("mark should succeed"); + + cache + .extend_txs([(txid, tx_details(txid))]) + .expect("extend should succeed"); + + assert!(!cache + .txid_marked_missing(&txid) + .expect("lookup should succeed")); + let drained = cache + .drain_pending_tx_misses(10) + .expect("drain should succeed"); + assert!(drained.is_empty()); + } + + #[test] + fn wallet_cache_remove_tx_records_miss() { + let cache = WalletCache::new(); + let txid = Txid::from_slice(&[3; 32]).expect("valid txid"); + + cache + .insert_tx(txid, tx_details(txid)) + .expect("insert should succeed"); + assert!(!cache + .txid_marked_missing(&txid) + .expect("lookup should succeed")); + + cache.remove_tx(&txid).expect("remove should succeed"); + + assert!(cache + .txid_marked_missing(&txid) + .expect("lookup should succeed")); + } + + #[test] + fn wallet_cache_requeue_and_drain_roundtrip_scripts() { + let cache = WalletCache::new(); + let scripts = [ + ScriptBuf::from(vec![0x61]), + ScriptBuf::from(vec![0x62]), + ScriptBuf::from(vec![0x63]), + ]; + + for script in scripts.iter().cloned() { + cache + .record_and_enqueue_missing_script(script) + .expect("mark should succeed"); + } + + let drained = cache + .drain_pending_script_misses(2) + .expect("drain should succeed"); + assert_eq!(drained.len(), 2); + + cache + .requeue_pending_script_misses(drained) + .expect("requeue should succeed"); + + let roundtrip = cache + .drain_pending_script_misses(10) + .expect("drain should succeed"); + let expected: HashSet<_> = scripts.into_iter().collect(); + let actual: HashSet<_> = roundtrip.into_iter().collect(); + assert_eq!(actual, expected); + } + + #[test] + fn wallet_cache_invalidate_resets_all_state() { + let cache = WalletCache::new(); + let script = ScriptBuf::from(vec![0x71]); + let missing_script = ScriptBuf::from(vec![0x72]); + let txid = Txid::from_slice(&[4; 32]).expect("valid txid"); + let missing_txid = Txid::from_slice(&[5; 32]).expect("valid txid"); + + cache + .insert_script_pubkey(script.clone(), (KeychainKind::External, 21)) + .expect("insert should succeed"); + cache + .insert_tx(txid, tx_details(txid)) + .expect("insert should succeed"); + cache + .record_and_enqueue_missing_script(missing_script.clone()) + .expect("mark should succeed"); + cache + .record_and_enqueue_missing_txid(missing_txid) + .expect("mark should succeed"); + cache.mark_script_pubkeys_loaded(None); + cache.set_raw_txs_fully_loaded(); + + cache.invalidate(); + + assert_eq!( + cache + .get_script_pubkey_path(script.as_script()) + .expect("lookup should succeed"), + None + ); + assert_eq!(cache.get_tx(&txid).expect("lookup should succeed"), None); + assert!(!cache + .script_marked_missing(missing_script.as_script()) + .expect("lookup should succeed")); + assert!(!cache + .txid_marked_missing(&missing_txid) + .expect("lookup should succeed")); + assert!(cache + .drain_pending_script_misses(10) + .expect("drain should succeed") + .is_empty()); + assert!(cache + .drain_pending_tx_misses(10) + .expect("drain should succeed") + .is_empty()); + assert!(!cache.script_pubkeys_fully_loaded(None)); + assert!(!cache.raw_txs_fully_loaded()); + assert!(!cache.summary_txs_fully_loaded()); + } + + #[test] + fn wallet_cache_extend_script_pubkeys_preserves_late_rerecorded_script_miss() { + let cache = WalletCache::new(); + let script = ScriptBuf::from(vec![0x81]); + let path = (KeychainKind::External, 31); + let mut hook = install_hook(HookPoint::BeforeExtendScriptPubkeysInsert); + + let worker_cache = cache.clone(); + let worker_script = script.clone(); + let handle = thread::spawn(move || { + worker_cache + .extend_script_pubkeys([(worker_script, path)]) + .expect("extend should succeed"); + }); + + hook.wait_until_reached(); + cache + .record_and_enqueue_missing_script(script.clone()) + .expect("mark should succeed"); + hook.release(); + handle.join().expect("worker should join"); + + assert_eq!( + cache + .get_script_pubkey_path(script.as_script()) + .expect("lookup should succeed"), + Some(path) + ); + assert!(cache + .script_marked_missing(script.as_script()) + .expect("lookup should succeed")); + let drained = cache + .drain_pending_script_misses(10) + .expect("drain should succeed"); + assert_eq!(drained, vec![script]); + } + + #[test] + fn wallet_cache_extend_txs_preserves_late_rerecorded_tx_miss() { + let cache = WalletCache::new(); + let txid = Txid::from_slice(&[6; 32]).expect("valid txid"); + let mut hook = install_hook(HookPoint::BeforeExtendTxsInsert); + + let worker_cache = cache.clone(); + let handle = thread::spawn(move || { + worker_cache + .extend_txs([(txid, tx_details(txid))]) + .expect("extend should succeed"); + }); + + hook.wait_until_reached(); + cache + .record_and_enqueue_missing_txid(txid) + .expect("mark should succeed"); + hook.release(); + handle.join().expect("worker should join"); + + assert_eq!( + cache.get_tx(&txid).expect("lookup should succeed"), + Some(tx_details(txid)) + ); + assert!(cache + .txid_marked_missing(&txid) + .expect("lookup should succeed")); + let drained = cache + .drain_pending_tx_misses(10) + .expect("drain should succeed"); + assert_eq!(drained, vec![txid]); + } + + #[test] + fn wallet_cache_extend_summary_txs_preserves_raw_tx_during_concurrent_style_update() { + let cache = WalletCache::new(); + let txid = Txid::from_slice(&[7; 32]).expect("valid txid"); + let mut hook = install_hook(HookPoint::BeforeExtendSummaryTxsInsert); + + let mut summary = tx_details(txid); + summary.received = 99; + summary.sent = 12; + summary.fee = Some(5); + summary.confirmation_time = Some(bdk::BlockTime { + height: 200, + timestamp: 456, + }); + + let worker_cache = cache.clone(); + let handle = thread::spawn(move || { + worker_cache + .extend_summary_txs([(txid, summary)]) + .expect("extend should succeed"); + }); + + hook.wait_until_reached(); + let raw_tx = bdk::bitcoin::Transaction { + version: 2, + lock_time: bdk::bitcoin::absolute::LockTime::ZERO, + input: Vec::new(), + output: Vec::new(), + }; + let mut raw_details = tx_details(txid); + raw_details.transaction = Some(raw_tx.clone()); + raw_details.received = 1; + cache + .insert_tx(txid, raw_details) + .expect("insert should succeed"); + hook.release(); + handle.join().expect("worker should join"); + + let merged = cache + .get_tx(&txid) + .expect("lookup should succeed") + .expect("tx should exist"); + assert_eq!(merged.transaction, Some(raw_tx)); + assert_eq!(merged.received, 99); + assert_eq!(merged.sent, 12); + assert_eq!(merged.fee, Some(5)); + assert_eq!( + merged.confirmation_time, + Some(bdk::BlockTime { + height: 200, + timestamp: 456, + }) + ); + } }