Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

139 changes: 128 additions & 11 deletions src/bdk/pg/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,15 @@ pub(super) struct WalletCache {
missing_script_pubkeys: Arc<Mutex<HashSet<ScriptBuf>>>,
missing_txids: Arc<Mutex<HashSet<Txid>>>,
pending_script_misses: Arc<Mutex<HashSet<ScriptBuf>>>,
// Txids confirmed absent from the DB in this process. These are retried in batches to
// recover from races where rows appear after an earlier miss.
pending_tx_misses: Arc<Mutex<HashSet<Txid>>>,
// Txids not yet seen in the in-process cache and not yet known-missing. These are batched
// before we fall back to recording a miss.
pending_tx_lookups: Arc<Mutex<HashSet<Txid>>>,
// Tracks txids that already consumed their one targeted miss-cache retry in this
// SqlxWalletDb lifetime, while still allowing later threshold-driven batch retries.
forced_tx_miss_retries: Arc<Mutex<HashSet<Txid>>>,
// Process-local hint for which keychain script path sets are fully hydrated.
// Bit 0: external, bit 1: internal.
// This is intentionally not synchronized across processes.
Expand Down Expand Up @@ -47,6 +55,8 @@ impl WalletCache {
missing_txids: Arc::new(Mutex::new(HashSet::new())),
pending_script_misses: Arc::new(Mutex::new(HashSet::new())),
pending_tx_misses: Arc::new(Mutex::new(HashSet::new())),
pending_tx_lookups: Arc::new(Mutex::new(HashSet::new())),
forced_tx_miss_retries: Arc::new(Mutex::new(HashSet::new())),
script_pubkeys_loaded_mask: Arc::new(AtomicU8::new(0)),
raw_txs_fully_loaded: Arc::new(AtomicBool::new(false)),
summary_txs_fully_loaded: Arc::new(AtomicBool::new(false)),
Expand Down Expand Up @@ -99,6 +109,14 @@ impl WalletCache {
self.lock_with_error(&self.pending_tx_misses, "pending tx misses cache")
}

fn lock_pending_tx_lookups(&self) -> Result<MutexGuard<'_, HashSet<Txid>>, bdk::Error> {
self.lock_with_error(&self.pending_tx_lookups, "pending tx lookups cache")
}

fn lock_forced_tx_miss_retries(&self) -> Result<MutexGuard<'_, HashSet<Txid>>, bdk::Error> {
self.lock_with_error(&self.forced_tx_miss_retries, "forced tx miss retries cache")
}

pub(super) fn get_script_pubkey_path(
&self,
script: &Script,
Expand Down Expand Up @@ -173,24 +191,28 @@ impl WalletCache {
Ok(cache.get(txid).cloned())
}

pub(super) fn insert_tx(&self, txid: Txid, tx: TransactionDetails) -> Result<(), bdk::Error> {
{
let mut cache = self.lock_transactions()?;
cache.insert(txid, tx);
}
self.mark_txid_not_missing(&txid)?;
Ok(())
}

fn clear_tx_miss_tracking<'a, I>(&self, txids: I) -> Result<(), bdk::Error>
where
I: IntoIterator<Item = &'a Txid>,
{
let mut missing = self.lock_missing_txids()?;
let mut pending = self.lock_pending_tx_misses()?;
let mut forced_retries = self.lock_forced_tx_miss_retries()?;
for txid in txids {
missing.remove(txid);
pending.remove(txid);
forced_retries.remove(txid);
}
Ok(())
}

fn clear_pending_tx_lookups<'a, I>(&self, txids: I) -> Result<(), bdk::Error>
where
I: IntoIterator<Item = &'a Txid>,
{
let mut pending = self.lock_pending_tx_lookups()?;
for txid in txids {
pending.remove(txid);
}
Ok(())
}
Expand All @@ -201,6 +223,7 @@ impl WalletCache {
{
let entries: Vec<_> = entries.into_iter().collect();
self.clear_tx_miss_tracking(entries.iter().map(|(txid, _)| txid))?;
self.clear_pending_tx_lookups(entries.iter().map(|(txid, _)| txid))?;
let mut cache = self.lock_transactions()?;
cache.extend(entries);
Ok(())
Expand All @@ -212,6 +235,7 @@ impl WalletCache {
{
let entries: Vec<_> = entries.into_iter().collect();
self.clear_tx_miss_tracking(entries.iter().map(|(txid, _)| txid))?;
self.clear_pending_tx_lookups(entries.iter().map(|(txid, _)| txid))?;
let mut cache = self.lock_transactions()?;
for (txid, mut summary) in entries {
// Summary refreshes may run after raw tx bytes were already hydrated. Preserve any
Expand Down Expand Up @@ -264,6 +288,9 @@ impl WalletCache {
let mut cache = self.lock_transactions()?;
cache.remove(txid);
}
{
self.lock_forced_tx_miss_retries()?.remove(txid);
}
self.record_missing_txid(*txid)?;
Ok(())
}
Expand Down Expand Up @@ -324,15 +351,75 @@ impl WalletCache {
Ok(())
}

pub(super) fn record_and_enqueue_missing_txid(&self, txid: Txid) -> Result<(), bdk::Error> {
self.lock_missing_txids()?.insert(txid);
pub(super) fn enqueue_pending_tx_miss(&self, txid: Txid) -> Result<(), bdk::Error> {
self.lock_pending_tx_misses()?.insert(txid);
Ok(())
}

pub(super) fn pending_tx_miss_queued(&self, txid: &Txid) -> Result<bool, bdk::Error> {
let pending = self.lock_pending_tx_misses()?;
Ok(pending.contains(txid))
}

#[cfg(test)]
pub(super) fn forced_tx_miss_retry_recorded(&self, txid: &Txid) -> Result<bool, bdk::Error> {
let forced_retries = self.lock_forced_tx_miss_retries()?;
Ok(forced_retries.contains(txid))
}

pub(super) fn claim_forced_tx_miss_retry(&self, txid: Txid) -> Result<bool, bdk::Error> {
let mut forced_retries = self.lock_forced_tx_miss_retries()?;
Ok(forced_retries.insert(txid))
}

pub(super) fn mark_txid_not_missing(&self, txid: &Txid) -> Result<(), bdk::Error> {
self.lock_missing_txids()?.remove(txid);
self.lock_pending_tx_misses()?.remove(txid);
self.lock_pending_tx_lookups()?.remove(txid);
self.lock_forced_tx_miss_retries()?.remove(txid);
Ok(())
}

pub(super) fn enqueue_pending_tx_lookup(&self, txid: Txid) -> Result<(), bdk::Error> {
self.lock_pending_tx_lookups()?.insert(txid);
Ok(())
}

pub(super) fn drain_pending_tx_lookups(&self, max: usize) -> Result<Vec<Txid>, bdk::Error> {
let mut pending = self.lock_pending_tx_lookups()?;
let drained: Vec<_> = pending.iter().take(max).copied().collect();
for txid in &drained {
pending.remove(txid);
}
Ok(drained)
}

pub(super) fn drain_pending_tx_lookups_including(
&self,
required_txid: Txid,
max: usize,
) -> Result<Vec<Txid>, bdk::Error> {
let mut pending = self.lock_pending_tx_lookups()?;
let mut drained = Vec::with_capacity(max.max(1));
drained.push(required_txid);
pending.remove(&required_txid);

let additional = max.max(1).saturating_sub(1);
let rest: Vec<_> = pending.iter().take(additional).copied().collect();
for txid in &rest {
pending.remove(txid);
}
drained.extend(rest);

Ok(drained)
}

pub(super) fn requeue_pending_tx_lookups<I>(&self, txids: I) -> Result<(), bdk::Error>
where
I: IntoIterator<Item = Txid>,
{
let mut pending = self.lock_pending_tx_lookups()?;
pending.extend(txids);
Ok(())
}

Expand All @@ -345,6 +432,26 @@ impl WalletCache {
Ok(drained)
}

pub(super) fn drain_pending_tx_misses_including(
&self,
required_txid: Txid,
max: usize,
) -> Result<Vec<Txid>, bdk::Error> {
let mut pending = self.lock_pending_tx_misses()?;
let mut drained = Vec::with_capacity(max.max(1));
drained.push(required_txid);
pending.remove(&required_txid);

let additional = max.max(1).saturating_sub(1);
let rest: Vec<_> = pending.iter().take(additional).copied().collect();
for txid in &rest {
pending.remove(txid);
}
drained.extend(rest);

Ok(drained)
}

pub(super) fn requeue_pending_tx_misses<I>(&self, txids: I) -> Result<(), bdk::Error>
where
I: IntoIterator<Item = Txid>,
Expand All @@ -370,13 +477,23 @@ impl WalletCache {
Ok(pending.len() >= threshold)
}

pub(super) fn should_batch_resolve_tx_lookups(
&self,
threshold: usize,
) -> Result<bool, bdk::Error> {
let pending = self.lock_pending_tx_lookups()?;
Ok(pending.len() >= threshold)
}

pub(super) fn invalidate(&self) {
Self::clear_even_if_poisoned(&self.script_pubkeys);
Self::clear_even_if_poisoned(&self.transactions);
Self::clear_even_if_poisoned(&self.missing_script_pubkeys);
Self::clear_even_if_poisoned(&self.missing_txids);
Self::clear_even_if_poisoned(&self.pending_script_misses);
Self::clear_even_if_poisoned(&self.pending_tx_misses);
Self::clear_even_if_poisoned(&self.pending_tx_lookups);
Self::clear_even_if_poisoned(&self.forced_tx_miss_retries);

self.script_pubkeys_loaded_mask.store(0, Ordering::Release);
self.raw_txs_fully_loaded.store(false, Ordering::Release);
Expand Down
2 changes: 1 addition & 1 deletion src/bdk/pg/db_traits.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ mod tests {

let txid = Txid::all_zeros();
db.cache
.insert_tx(txid, tx_details(txid))
.extend_txs([(txid, tx_details(txid))])
.expect("insert should succeed");
db.cache.set_raw_txs_fully_loaded();

Expand Down
Loading
Loading