diff --git a/.sqlx/query-1c966653b607ef9cd8a305aac131cf64a17ab0803fdc55db8b55a14fced7178d.json b/.sqlx/query-1c966653b607ef9cd8a305aac131cf64a17ab0803fdc55db8b55a14fced7178d.json deleted file mode 100644 index 18d2d33c..00000000 --- a/.sqlx/query-1c966653b607ef9cd8a305aac131cf64a17ab0803fdc55db8b55a14fced7178d.json +++ /dev/null @@ -1,22 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "\n SELECT details_json FROM bdk_transactions WHERE keychain_id = $1 AND deleted_at IS NULL", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "details_json", - "type_info": "Jsonb" - } - ], - "parameters": { - "Left": [ - "Uuid" - ] - }, - "nullable": [ - false - ] - }, - "hash": "1c966653b607ef9cd8a305aac131cf64a17ab0803fdc55db8b55a14fced7178d" -} diff --git a/.sqlx/query-68df480b792fa014a5d66aa05f2d7caa1305d81382f32f2b55c36d31fa8e3583.json b/.sqlx/query-68df480b792fa014a5d66aa05f2d7caa1305d81382f32f2b55c36d31fa8e3583.json new file mode 100644 index 00000000..e68533b2 --- /dev/null +++ b/.sqlx/query-68df480b792fa014a5d66aa05f2d7caa1305d81382f32f2b55c36d31fa8e3583.json @@ -0,0 +1,30 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT tx_id, details_json\n FROM bdk_transactions\n WHERE keychain_id = $1\n AND deleted_at IS NULL\n AND ($2::TEXT IS NULL OR tx_id > $2)\n ORDER BY tx_id ASC\n LIMIT $3", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "tx_id", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "details_json", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "Text", + "Int8" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "68df480b792fa014a5d66aa05f2d7caa1305d81382f32f2b55c36d31fa8e3583" +} diff --git a/.sqlx/query-846df15d0a9709ad69c5ff8e6c3e6679816cf5c9f86ea2b4a9d1b95458ec56a1.json b/.sqlx/query-7024ed177438c56be05fb4c9e2b221f33154ca07f31a5b376db94d1bd6b26257.json similarity index 76% rename from .sqlx/query-846df15d0a9709ad69c5ff8e6c3e6679816cf5c9f86ea2b4a9d1b95458ec56a1.json rename to .sqlx/query-7024ed177438c56be05fb4c9e2b221f33154ca07f31a5b376db94d1bd6b26257.json index a1b78b6f..13469b4f 100644 --- a/.sqlx/query-846df15d0a9709ad69c5ff8e6c3e6679816cf5c9f86ea2b4a9d1b95458ec56a1.json +++ b/.sqlx/query-7024ed177438c56be05fb4c9e2b221f33154ca07f31a5b376db94d1bd6b26257.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT script, keychain_kind as \"keychain_kind: BdkKeychainKind\", path FROM bdk_script_pubkeys\n WHERE keychain_id = $1", + "query": "SELECT script, keychain_kind as \"keychain_kind: BdkKeychainKind\", path\n FROM bdk_script_pubkeys\n WHERE keychain_id = $1 AND script_hex = ANY($2)", "describe": { "columns": [ { @@ -31,7 +31,8 @@ ], "parameters": { "Left": [ - "Uuid" + "Uuid", + "TextArray" ] }, "nullable": [ @@ -40,5 +41,5 @@ false ] }, - "hash": "846df15d0a9709ad69c5ff8e6c3e6679816cf5c9f86ea2b4a9d1b95458ec56a1" + "hash": "7024ed177438c56be05fb4c9e2b221f33154ca07f31a5b376db94d1bd6b26257" } diff --git a/.sqlx/query-dfa815aeb090f27d4fd45326c706b23f3f84c9e843addc1b05160fd0e28fdf2c.json b/.sqlx/query-8d9a4cd5b4a81777199efd165ab69068c41d743c4768e992745d334acf147ef5.json similarity index 55% rename from .sqlx/query-dfa815aeb090f27d4fd45326c706b23f3f84c9e843addc1b05160fd0e28fdf2c.json rename to .sqlx/query-8d9a4cd5b4a81777199efd165ab69068c41d743c4768e992745d334acf147ef5.json index 2e2c29dd..dd1109d9 100644 --- a/.sqlx/query-dfa815aeb090f27d4fd45326c706b23f3f84c9e843addc1b05160fd0e28fdf2c.json +++ b/.sqlx/query-8d9a4cd5b4a81777199efd165ab69068c41d743c4768e992745d334acf147ef5.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n SELECT tx_id, sent, height,\n (details_json->>'received')::BIGINT AS \"received?\",\n (details_json->>'fee')::BIGINT AS \"fee?\",\n (details_json->'confirmation_time'->>'timestamp')::BIGINT AS \"confirmation_timestamp?\"\n FROM bdk_transactions\n WHERE keychain_id = $1 AND deleted_at IS NULL", + "query": "\n SELECT tx_id, sent, height,\n (details_json->>'received')::BIGINT AS \"received?\",\n (details_json->>'fee')::BIGINT AS \"fee?\",\n (details_json->'confirmation_time'->>'timestamp')::BIGINT AS \"confirmation_timestamp?\"\n FROM bdk_transactions\n WHERE keychain_id = $1\n AND deleted_at IS NULL\n AND ($2::TEXT IS NULL OR tx_id > $2)\n ORDER BY tx_id ASC\n LIMIT $3", "describe": { "columns": [ { @@ -36,7 +36,9 @@ ], "parameters": { "Left": [ - "Uuid" + "Uuid", + "Text", + "Int8" ] }, "nullable": [ @@ -48,5 +50,5 @@ null ] }, - "hash": "dfa815aeb090f27d4fd45326c706b23f3f84c9e843addc1b05160fd0e28fdf2c" + "hash": "8d9a4cd5b4a81777199efd165ab69068c41d743c4768e992745d334acf147ef5" } diff --git a/.sqlx/query-a49bfbc8c8ca875c146ac678f5f5633b758af6a7070b939032394b6525d4a7c3.json b/.sqlx/query-de37549b14b9fd94158abdabeb2f5ed88144d562a5eae47e8a3f1e9d16e7c92a.json similarity index 72% rename from .sqlx/query-a49bfbc8c8ca875c146ac678f5f5633b758af6a7070b939032394b6525d4a7c3.json rename to .sqlx/query-de37549b14b9fd94158abdabeb2f5ed88144d562a5eae47e8a3f1e9d16e7c92a.json index dd5e8b89..19997f9a 100644 --- a/.sqlx/query-a49bfbc8c8ca875c146ac678f5f5633b758af6a7070b939032394b6525d4a7c3.json +++ b/.sqlx/query-de37549b14b9fd94158abdabeb2f5ed88144d562a5eae47e8a3f1e9d16e7c92a.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "SELECT script, keychain_kind as \"keychain_kind: BdkKeychainKind\", path FROM bdk_script_pubkeys\n WHERE keychain_id = $1 AND keychain_kind = $2", + "query": "SELECT script, keychain_kind as \"keychain_kind: BdkKeychainKind\", path\n FROM bdk_script_pubkeys\n WHERE keychain_id = $1\n AND keychain_kind = $2\n AND ($3::INT4 IS NULL OR path > $3)\n ORDER BY path ASC\n LIMIT $4", "describe": { "columns": [ { @@ -42,7 +42,9 @@ ] } } - } + }, + "Int4", + "Int8" ] }, "nullable": [ @@ -51,5 +53,5 @@ false ] }, - "hash": "a49bfbc8c8ca875c146ac678f5f5633b758af6a7070b939032394b6525d4a7c3" + "hash": "de37549b14b9fd94158abdabeb2f5ed88144d562a5eae47e8a3f1e9d16e7c92a" } diff --git a/.sqlx/query-fbfacf3613c2dbe990a8c02ea2755ea2c981b717f87a42e83cd934d2abf33443.json b/.sqlx/query-fbfacf3613c2dbe990a8c02ea2755ea2c981b717f87a42e83cd934d2abf33443.json new file mode 100644 index 00000000..032f5977 --- /dev/null +++ b/.sqlx/query-fbfacf3613c2dbe990a8c02ea2755ea2c981b717f87a42e83cd934d2abf33443.json @@ -0,0 +1,29 @@ +{ + "db_name": "PostgreSQL", + "query": "\n SELECT tx_id, details_json\n FROM bdk_transactions\n WHERE keychain_id = $1\n AND deleted_at IS NULL\n AND tx_id = ANY($2)", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "tx_id", + "type_info": "Varchar" + }, + { + "ordinal": 1, + "name": "details_json", + "type_info": "Jsonb" + } + ], + "parameters": { + "Left": [ + "Uuid", + "TextArray" + ] + }, + "nullable": [ + false, + false + ] + }, + "hash": "fbfacf3613c2dbe990a8c02ea2755ea2c981b717f87a42e83cd934d2abf33443" +} diff --git a/src/bdk/pg/cache.rs b/src/bdk/pg/cache.rs new file mode 100644 index 00000000..4a94b8fe --- /dev/null +++ b/src/bdk/pg/cache.rs @@ -0,0 +1,386 @@ +use bdk::{ + bitcoin::{Script, ScriptBuf, Txid}, + KeychainKind, TransactionDetails, +}; +use std::{ + collections::{HashMap, HashSet}, + sync::atomic::{AtomicBool, AtomicU8, Ordering}, + sync::{Arc, Mutex, MutexGuard, PoisonError}, +}; + +use super::{ScriptPubkeyCache, SqlxWalletDb, TransactionCache}; + +#[derive(Clone)] +pub(super) struct WalletCache { + script_pubkeys: Arc>, + transactions: Arc>, + missing_script_pubkeys: Arc>>, + missing_txids: Arc>>, + pending_script_misses: Arc>>, + pending_tx_misses: Arc>>, + // Process-local hint for which keychain script path sets are fully hydrated. + // Bit 0: external, bit 1: internal. + // This is intentionally not synchronized across processes. + script_pubkeys_loaded_mask: Arc, + // Process-local hint: true means this instance has already hydrated raw tx details + // from the DB at least once. It is intentionally not synchronized across processes. + raw_txs_fully_loaded: Arc, + // Process-local hint: true means summary tx details were fully hydrated once. + summary_txs_fully_loaded: Arc, +} + +impl WalletCache { + fn clear_even_if_poisoned(mutex: &Mutex) + where + T: Default, + { + let mut guard = mutex.lock().unwrap_or_else(PoisonError::into_inner); + *guard = T::default(); + mutex.clear_poison(); + } + + pub(super) fn new() -> Self { + Self { + script_pubkeys: Arc::new(Mutex::new(HashMap::new())), + transactions: Arc::new(Mutex::new(HashMap::new())), + missing_script_pubkeys: Arc::new(Mutex::new(HashSet::new())), + missing_txids: Arc::new(Mutex::new(HashSet::new())), + pending_script_misses: Arc::new(Mutex::new(HashSet::new())), + pending_tx_misses: Arc::new(Mutex::new(HashSet::new())), + script_pubkeys_loaded_mask: Arc::new(AtomicU8::new(0)), + raw_txs_fully_loaded: Arc::new(AtomicBool::new(false)), + summary_txs_fully_loaded: Arc::new(AtomicBool::new(false)), + } + } + + fn lock_with_error<'a, T>( + &self, + mutex: &'a Mutex, + context: &'static str, + ) -> Result, bdk::Error> { + mutex + .lock() + .map_err(|_| bdk::Error::Generic(format!("{context} lock poisoned"))) + } + + fn script_pubkey_mask_for(keychain: Option) -> u8 { + const EXTERNAL: u8 = 1; + const INTERNAL: u8 = 2; + match keychain { + Some(KeychainKind::External) => EXTERNAL, + Some(KeychainKind::Internal) => INTERNAL, + None => EXTERNAL | INTERNAL, + } + } + + fn lock_script_pubkeys(&self) -> Result, bdk::Error> { + self.lock_with_error(&self.script_pubkeys, "script pubkeys cache") + } + + fn lock_transactions(&self) -> Result, bdk::Error> { + self.lock_with_error(&self.transactions, "transactions cache") + } + + fn lock_missing_script_pubkeys( + &self, + ) -> Result>, bdk::Error> { + self.lock_with_error(&self.missing_script_pubkeys, "missing script pubkeys cache") + } + + fn lock_missing_txids(&self) -> Result>, bdk::Error> { + self.lock_with_error(&self.missing_txids, "missing txids cache") + } + + fn lock_pending_script_misses(&self) -> Result>, bdk::Error> { + self.lock_with_error(&self.pending_script_misses, "pending script misses cache") + } + + fn lock_pending_tx_misses(&self) -> Result>, bdk::Error> { + self.lock_with_error(&self.pending_tx_misses, "pending tx misses cache") + } + + pub(super) fn get_script_pubkey_path( + &self, + script: &Script, + ) -> Result, bdk::Error> { + let cache = self.lock_script_pubkeys()?; + Ok(cache.get(script).copied()) + } + + pub(super) fn insert_script_pubkey( + &self, + script: ScriptBuf, + path: (KeychainKind, u32), + ) -> Result<(), bdk::Error> { + { + let mut cache = self.lock_script_pubkeys()?; + cache.insert(script.clone(), path); + } + self.mark_script_not_missing(&script)?; + Ok(()) + } + + fn clear_script_miss_tracking<'a, I>(&self, scripts: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + { + let mut missing = self.lock_missing_script_pubkeys()?; + let mut pending = self.lock_pending_script_misses()?; + for script in scripts { + missing.remove(script.as_script()); + pending.remove(script); + } + Ok(()) + } + + pub(super) fn extend_script_pubkeys(&self, entries: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + { + let entries: Vec<_> = entries.into_iter().collect(); + self.clear_script_miss_tracking(entries.iter().map(|(script, _)| script))?; + let mut cache = self.lock_script_pubkeys()?; + cache.extend(entries); + Ok(()) + } + + pub(super) fn all_script_pubkeys( + &self, + keychain: Option, + ) -> Result, bdk::Error> { + let cache = self.lock_script_pubkeys()?; + Ok(cache + .iter() + .filter(|(_, (kind, _))| keychain.is_none_or(|k| *kind == k)) + .map(|(script, _)| script.clone()) + .collect()) + } + + pub(super) fn script_pubkeys_fully_loaded(&self, keychain: Option) -> bool { + let required_mask = Self::script_pubkey_mask_for(keychain); + let loaded_mask = self.script_pubkeys_loaded_mask.load(Ordering::Acquire); + loaded_mask & required_mask == required_mask + } + + pub(super) fn mark_script_pubkeys_loaded(&self, keychain: Option) { + let mask = Self::script_pubkey_mask_for(keychain); + self.script_pubkeys_loaded_mask + .fetch_or(mask, Ordering::Release); + } + + pub(super) fn get_tx(&self, txid: &Txid) -> Result, bdk::Error> { + let cache = self.lock_transactions()?; + Ok(cache.get(txid).cloned()) + } + + pub(super) fn insert_tx(&self, txid: Txid, tx: TransactionDetails) -> Result<(), bdk::Error> { + { + let mut cache = self.lock_transactions()?; + cache.insert(txid, tx); + } + self.mark_txid_not_missing(&txid)?; + Ok(()) + } + + fn clear_tx_miss_tracking<'a, I>(&self, txids: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + { + let mut missing = self.lock_missing_txids()?; + let mut pending = self.lock_pending_tx_misses()?; + for txid in txids { + missing.remove(txid); + pending.remove(txid); + } + Ok(()) + } + + pub(super) fn extend_txs(&self, entries: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + { + let entries: Vec<_> = entries.into_iter().collect(); + self.clear_tx_miss_tracking(entries.iter().map(|(txid, _)| txid))?; + let mut cache = self.lock_transactions()?; + cache.extend(entries); + Ok(()) + } + + pub(super) fn extend_summary_txs(&self, entries: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + { + let entries: Vec<_> = entries.into_iter().collect(); + self.clear_tx_miss_tracking(entries.iter().map(|(txid, _)| txid))?; + let mut cache = self.lock_transactions()?; + for (txid, mut summary) in entries { + // Summary refreshes may run after raw tx bytes were already hydrated. Preserve any + // cached raw transaction payload while applying fresh DB metadata fields. + // `iter_txs` overlays in-memory batch writes afterwards, so uncommitted updates still + // take precedence in returned views. + if let Some(raw_tx) = cache + .get(&txid) + .and_then(|existing| existing.transaction.clone()) + { + summary.transaction = Some(raw_tx); + } + cache.insert(txid, summary); + } + Ok(()) + } + + pub(super) fn all_txs(&self) -> Result, bdk::Error> { + let cache = self.lock_transactions()?; + Ok(cache.values().cloned().collect()) + } + + pub(super) fn all_summary_txs(&self) -> Result, bdk::Error> { + let cache = self.lock_transactions()?; + Ok(cache + .values() + .map(|tx| (tx.txid, SqlxWalletDb::summary_tx_from_ref(tx))) + .collect()) + } + + pub(super) fn raw_txs_fully_loaded(&self) -> bool { + self.raw_txs_fully_loaded.load(Ordering::Acquire) + } + + pub(super) fn set_raw_txs_fully_loaded(&self) { + self.raw_txs_fully_loaded.store(true, Ordering::Release); + self.summary_txs_fully_loaded.store(true, Ordering::Release); + } + + pub(super) fn summary_txs_fully_loaded(&self) -> bool { + self.summary_txs_fully_loaded.load(Ordering::Acquire) + } + + pub(super) fn set_summary_txs_fully_loaded(&self) { + self.summary_txs_fully_loaded.store(true, Ordering::Release); + } + + pub(super) fn remove_tx(&self, txid: &Txid) -> Result<(), bdk::Error> { + { + let mut cache = self.lock_transactions()?; + cache.remove(txid); + } + self.record_missing_txid(*txid)?; + Ok(()) + } + + pub(super) fn script_marked_missing(&self, script: &Script) -> Result { + let missing = self.lock_missing_script_pubkeys()?; + Ok(missing.contains(script)) + } + + pub(super) fn record_missing_script(&self, script: ScriptBuf) -> Result<(), bdk::Error> { + self.lock_missing_script_pubkeys()?.insert(script); + Ok(()) + } + + pub(super) fn record_and_enqueue_missing_script( + &self, + script: ScriptBuf, + ) -> Result<(), bdk::Error> { + self.lock_missing_script_pubkeys()?.insert(script.clone()); + self.lock_pending_script_misses()?.insert(script); + Ok(()) + } + + pub(super) fn mark_script_not_missing(&self, script: &Script) -> Result<(), bdk::Error> { + self.lock_missing_script_pubkeys()?.remove(script); + self.lock_pending_script_misses()?.remove(script); + Ok(()) + } + + pub(super) fn drain_pending_script_misses( + &self, + max: usize, + ) -> Result, bdk::Error> { + let mut pending = self.lock_pending_script_misses()?; + let drained: Vec<_> = pending.iter().take(max).cloned().collect(); + for script in &drained { + pending.remove(script); + } + Ok(drained) + } + + pub(super) fn requeue_pending_script_misses(&self, scripts: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + { + let mut pending = self.lock_pending_script_misses()?; + pending.extend(scripts); + Ok(()) + } + + pub(super) fn txid_marked_missing(&self, txid: &Txid) -> Result { + let missing = self.lock_missing_txids()?; + Ok(missing.contains(txid)) + } + + pub(super) fn record_missing_txid(&self, txid: Txid) -> Result<(), bdk::Error> { + self.lock_missing_txids()?.insert(txid); + Ok(()) + } + + pub(super) fn record_and_enqueue_missing_txid(&self, txid: Txid) -> Result<(), bdk::Error> { + self.lock_missing_txids()?.insert(txid); + self.lock_pending_tx_misses()?.insert(txid); + Ok(()) + } + + pub(super) fn mark_txid_not_missing(&self, txid: &Txid) -> Result<(), bdk::Error> { + self.lock_missing_txids()?.remove(txid); + self.lock_pending_tx_misses()?.remove(txid); + Ok(()) + } + + pub(super) fn drain_pending_tx_misses(&self, max: usize) -> Result, bdk::Error> { + let mut pending = self.lock_pending_tx_misses()?; + let drained: Vec<_> = pending.iter().take(max).copied().collect(); + for txid in &drained { + pending.remove(txid); + } + Ok(drained) + } + + pub(super) fn requeue_pending_tx_misses(&self, txids: I) -> Result<(), bdk::Error> + where + I: IntoIterator, + { + let mut pending = self.lock_pending_tx_misses()?; + pending.extend(txids); + Ok(()) + } + + pub(super) fn should_batch_resolve_script_misses( + &self, + threshold: usize, + ) -> Result { + let pending = self.lock_pending_script_misses()?; + Ok(pending.len() >= threshold) + } + + pub(super) fn should_batch_resolve_tx_misses( + &self, + threshold: usize, + ) -> Result { + let pending = self.lock_pending_tx_misses()?; + Ok(pending.len() >= threshold) + } + + pub(super) fn invalidate(&self) { + Self::clear_even_if_poisoned(&self.script_pubkeys); + Self::clear_even_if_poisoned(&self.transactions); + Self::clear_even_if_poisoned(&self.missing_script_pubkeys); + Self::clear_even_if_poisoned(&self.missing_txids); + Self::clear_even_if_poisoned(&self.pending_script_misses); + Self::clear_even_if_poisoned(&self.pending_tx_misses); + + self.script_pubkeys_loaded_mask.store(0, Ordering::Release); + self.raw_txs_fully_loaded.store(false, Ordering::Release); + self.summary_txs_fully_loaded + .store(false, Ordering::Release); + } +} diff --git a/src/bdk/pg/db_traits.rs b/src/bdk/pg/db_traits.rs new file mode 100644 index 00000000..e9866772 --- /dev/null +++ b/src/bdk/pg/db_traits.rs @@ -0,0 +1,433 @@ +use bdk::{ + bitcoin::{blockdata::transaction::OutPoint, Script, ScriptBuf, Transaction, Txid}, + database::{BatchDatabase, BatchOperations, Database, SyncTime}, + KeychainKind, LocalUtxo, TransactionDetails, +}; + +use super::{ + convert::BdkKeychainKind, lookups::TxLookupMode, ScriptPubkeys, SqlxWalletDb, Transactions, + Utxos, WalletBatchState, +}; + +impl BatchOperations for SqlxWalletDb { + #[tracing::instrument(name = "bdk.batch.set_script_pubkey", skip_all, err)] + fn set_script_pubkey( + &mut self, + script: &Script, + keychain: KeychainKind, + path: u32, + ) -> Result<(), bdk::Error> { + self.batch.addresses.insert(script.into(), (keychain, path)); + self.cache.mark_script_not_missing(script)?; + Ok(()) + } + + #[tracing::instrument(name = "bdk.batch.set_utxo", skip_all, err)] + fn set_utxo(&mut self, utxo: &LocalUtxo) -> Result<(), bdk::Error> { + self.batch.utxos.push(utxo.clone()); + Ok(()) + } + + #[tracing::instrument(name = "bdk.batch.set_raw_tx", skip_all, err)] + fn set_raw_tx(&mut self, _: &Transaction) -> Result<(), bdk::Error> { + Err(Self::unsupported_operation("set_raw_tx")) + } + + #[tracing::instrument(name = "bdk.batch.set_tx", skip_all, err)] + fn set_tx(&mut self, tx: &TransactionDetails) -> Result<(), bdk::Error> { + self.batch.txs.insert(tx.txid, tx.clone()); + self.cache.mark_txid_not_missing(&tx.txid)?; + Ok(()) + } + + #[tracing::instrument(name = "bdk.batch.set_last_index", skip_all, err)] + fn set_last_index(&mut self, kind: KeychainKind, idx: u32) -> Result<(), bdk::Error> { + // NOTE: This write is intentionally immediate because BDK may call it outside of + // `commit_batch` flow. + self.ctx + .rt + .block_on(async { self.indexes_repo().persist_last_index(kind, idx).await }) + } + + #[tracing::instrument(name = "bdk.batch.set_sync_time", skip_all, err)] + fn set_sync_time(&mut self, time: SyncTime) -> Result<(), bdk::Error> { + // NOTE: This write is intentionally immediate because BDK may call it outside of + // `commit_batch` flow. + self.ctx + .rt + .block_on(async { self.sync_times_repo().persist(time).await }) + } + + #[tracing::instrument(name = "bdk.batch.del_script_pubkey_from_path", skip_all, err)] + fn del_script_pubkey_from_path( + &mut self, + _: KeychainKind, + _: u32, + ) -> Result, bdk::Error> { + Err(Self::unsupported_operation("del_script_pubkey_from_path")) + } + + #[tracing::instrument(name = "bdk.batch.del_path_from_script_pubkey", skip_all, err)] + fn del_path_from_script_pubkey( + &mut self, + _: &Script, + ) -> Result, bdk::Error> { + Err(Self::unsupported_operation("del_path_from_script_pubkey")) + } + + #[tracing::instrument(name = "bdk.batch.del_utxo", skip_all, err)] + fn del_utxo(&mut self, outpoint: &OutPoint) -> Result, bdk::Error> { + self.ctx + .rt + .block_on(async { self.utxos_repo().delete(outpoint).await }) + } + + #[tracing::instrument(name = "bdk.batch.del_raw_tx", skip_all, err)] + fn del_raw_tx(&mut self, _: &Txid) -> Result, bdk::Error> { + Err(Self::unsupported_operation("del_raw_tx")) + } + + #[tracing::instrument(name = "bdk.batch.del_tx", skip_all, err)] + fn del_tx( + &mut self, + tx_id: &Txid, + _include_raw: bool, + ) -> Result, bdk::Error> { + let deleted = self + .ctx + .rt + .block_on(async { self.transactions_repo().delete(tx_id).await })?; + + self.batch.txs.remove(tx_id); + + if deleted.is_some() { + self.cache.remove_tx(tx_id)?; + } + + Ok(deleted) + } + + #[tracing::instrument(name = "bdk.batch.del_last_index", skip_all, err)] + fn del_last_index(&mut self, _: KeychainKind) -> Result, bdk::Error> { + Err(Self::unsupported_operation("del_last_index")) + } + + #[tracing::instrument(name = "bdk.batch.del_sync_time", skip_all, err)] + fn del_sync_time(&mut self) -> Result, bdk::Error> { + Err(Self::unsupported_operation("del_sync_time")) + } +} + +impl Database for SqlxWalletDb { + #[tracing::instrument(name = "bdk.db.check_descriptor_checksum", skip_all, err)] + fn check_descriptor_checksum( + &mut self, + keychain: KeychainKind, + script_bytes: B, + ) -> Result<(), bdk::Error> + where + B: AsRef<[u8]>, + { + self.ctx.rt.block_on(async { + let checksums = self.descriptor_checksums_repo(); + checksums + .check_or_persist_descriptor_checksum(keychain, script_bytes.as_ref()) + .await?; + + Ok(()) + }) + } + + #[tracing::instrument(name = "bdk.db.iter_script_pubkeys", skip_all, err)] + fn iter_script_pubkeys( + &self, + keychain: Option, + ) -> Result, bdk::Error> { + if self.cache.script_pubkeys_fully_loaded(keychain) { + return self.cache.all_script_pubkeys(keychain); + } + + let scripts_with_paths = self.ctx.rt.block_on(async { + self.script_pubkeys_repo() + .list_scripts_with_paths(keychain) + .await + })?; + + Self::cache_loaded_script_pubkeys(&self.cache, keychain, scripts_with_paths) + } + + #[tracing::instrument(name = "bdk.db.iter_utxos", skip_all, err)] + fn iter_utxos(&self) -> Result, bdk::Error> { + self.ctx + .rt + .block_on(async { self.utxos_repo().list_local_utxos().await }) + } + + #[tracing::instrument(name = "bdk.db.iter_raw_txs", skip_all, err)] + fn iter_raw_txs(&self) -> Result, bdk::Error> { + Err(Self::unsupported_operation("iter_raw_txs")) + } + + #[tracing::instrument(name = "bdk.db.iter_txs", skip_all, err)] + fn iter_txs(&self, include_raw: bool) -> Result, bdk::Error> { + let raw_loaded = self.cache.raw_txs_fully_loaded(); + let summary_loaded = self.cache.summary_txs_fully_loaded(); + let txs = if include_raw { + if raw_loaded { + self.cache + .all_txs()? + .into_iter() + .map(|tx| (tx.txid, tx)) + .collect() + } else { + let loaded = self + .ctx + .rt + .block_on(async { self.transactions_repo().load_all().await })?; + self.cache + .extend_txs(loaded.iter().map(|(txid, tx)| (*txid, tx.clone())))?; + self.cache.set_raw_txs_fully_loaded(); + loaded + } + } else if raw_loaded || summary_loaded { + // Once raw txs are fully loaded for this process, serve summary calls from cache to + // avoid repeated full-table reads. This returns the in-process snapshot (kept current + // by set/del/commit paths) rather than forcing a fresh DB roundtrip. + self.cache.all_summary_txs()? + } else { + let txs = self + .ctx + .rt + .block_on(async { self.transactions_repo().load_all_summaries().await })?; + self.cache + .extend_summary_txs(txs.iter().map(|(txid, tx)| (*txid, tx.clone())))?; + self.cache.set_summary_txs_fully_loaded(); + txs + }; + + Ok(Self::overlay_batch_txs(txs, &self.batch.txs, include_raw) + .into_values() + .collect()) + } + + #[tracing::instrument(name = "bdk.db.get_script_pubkey_from_path", skip_all, err)] + fn get_script_pubkey_from_path( + &self, + keychain: KeychainKind, + path: u32, + ) -> Result, bdk::Error> { + self.ctx + .rt + .block_on(async { self.script_pubkeys_repo().find_script(keychain, path).await }) + } + + #[tracing::instrument( + name = "bdk.db.get_path_from_script_pubkey", + skip_all, + err, + fields(source) + )] + fn get_path_from_script_pubkey( + &self, + script: &Script, + ) -> Result, bdk::Error> { + let (result, source) = self.lookup_script_pubkey_path(script)?; + tracing::Span::current().record("source", tracing::field::display(source)); + Ok(result) + } + + #[tracing::instrument(name = "bdk.db.get_utxo", skip_all, err)] + fn get_utxo(&self, outpoint: &OutPoint) -> Result, bdk::Error> { + self.ctx + .rt + .block_on(async { self.utxos_repo().find(outpoint).await }) + } + + #[tracing::instrument(name = "bdk.db.get_raw_tx", skip_all, err, fields(source))] + fn get_raw_tx(&self, tx_id: &Txid) -> Result, bdk::Error> { + let (tx, source) = self.lookup_tx_with_mode(tx_id, TxLookupMode::RequireRaw)?; + tracing::Span::current().record("source", tracing::field::display(source)); + Ok(tx.and_then(|tx| tx.transaction)) + } + + #[tracing::instrument( + name = "bdk.db.get_tx", + skip_all, + err, + fields(source, include_raw = include_raw) + )] + fn get_tx( + &self, + tx_id: &Txid, + include_raw: bool, + ) -> Result, bdk::Error> { + let (tx, source) = if include_raw { + self.lookup_tx_with_mode(tx_id, TxLookupMode::RequireRaw)? + } else { + self.lookup_tx(tx_id)? + }; + tracing::Span::current().record("source", tracing::field::display(source)); + Ok(tx.map(|tx| { + if include_raw { + tx + } else { + Self::summary_tx_from_owned(tx) + } + })) + } + + #[tracing::instrument(name = "bdk.db.get_last_index", skip_all, err)] + fn get_last_index(&self, kind: KeychainKind) -> Result, bdk::Error> { + self.ctx + .rt + .block_on(async { self.indexes_repo().get_latest(kind).await }) + } + + #[tracing::instrument(name = "bdk.db.get_sync_time", skip_all, err)] + fn get_sync_time(&self) -> Result, bdk::Error> { + self.ctx + .rt + .block_on(async { self.sync_times_repo().get().await }) + } + + #[tracing::instrument(name = "bdk.db.increment_last_index", skip_all, err)] + fn increment_last_index(&mut self, keychain: KeychainKind) -> Result { + self.ctx + .rt + .block_on(async { self.indexes_repo().increment(keychain).await }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bdk::bitcoin::hashes::Hash; + use sqlx::postgres::PgPoolOptions; + + fn tx_details(txid: Txid) -> TransactionDetails { + TransactionDetails { + transaction: None, + txid, + received: 0, + sent: 0, + fee: None, + confirmation_time: None, + } + } + + #[tokio::test] + async fn get_tx_include_raw_does_not_return_summary_only_cache_entry() { + let pool = PgPoolOptions::new() + .connect_lazy("postgres://localhost/test") + .expect("lazy pool should be created"); + let db = SqlxWalletDb::new(pool, uuid::Uuid::nil().into()); + + let txid = Txid::all_zeros(); + db.cache + .insert_tx(txid, tx_details(txid)) + .expect("insert should succeed"); + db.cache.set_raw_txs_fully_loaded(); + + let tx = Database::get_tx(&db, &txid, true).expect("get_tx should succeed"); + assert!(tx.is_none()); + + let summary = Database::get_tx(&db, &txid, false).expect("get_tx should succeed"); + assert_eq!(summary.map(|s| s.txid), Some(txid)); + } +} + +impl BatchDatabase for SqlxWalletDb { + type Batch = Self; + + fn begin_batch(&self) -> ::Batch { + SqlxWalletDb { + ctx: self.ctx.clone(), + cache: self.cache.clone(), + batch: WalletBatchState::default(), + miss_resolution: self.miss_resolution, + } + } + + fn commit_batch( + &mut self, + mut batch: ::Batch, + ) -> Result<(), bdk::Error> { + // Atomic scope here is limited to staged script pubkeys, utxos, and transactions. + // `set_last_index` / `set_sync_time` remain immediate writes by design. + let (addresses_for_cache, addresses_for_db): (Vec<_>, Vec<_>) = batch + .batch + .addresses + .drain() + .map(|(script, (keychain, path))| { + let cache_entry = (script.clone(), (keychain, path)); + let db_entry = (BdkKeychainKind::from(keychain), path, script); + (cache_entry, db_entry) + }) + .unzip(); + + let (txs_for_cache, txs_for_db): (Vec<_>, Vec<_>) = batch + .batch + .txs + .drain() + .map(|(txid, tx)| ((txid, tx.clone()), tx)) + .unzip(); + + let utxos_for_db = std::mem::take(&mut batch.batch.utxos); + let keychain_id = batch.ctx.keychain_id; + let pool = batch.ctx.pool.clone(); + + batch.ctx.rt.block_on(async move { + let mut tx = pool + .begin() + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + + if !addresses_for_db.is_empty() { + ScriptPubkeys::new(keychain_id, pool.clone()) + .persist_all_in_tx(&mut tx, addresses_for_db) + .await?; + } + + if !utxos_for_db.is_empty() { + Utxos::new(keychain_id, pool.clone()) + .persist_all_in_tx(&mut tx, utxos_for_db) + .await?; + } + + if !txs_for_db.is_empty() { + Transactions::new(keychain_id, pool) + .persist_all_in_tx(&mut tx, txs_for_db) + .await?; + } + + tx.commit() + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + + Ok::<_, bdk::Error>(()) + })?; + + let mut cache_degraded = false; + if let Err(error) = self.cache.extend_script_pubkeys(addresses_for_cache) { + tracing::warn!( + phase = "script_pubkeys", + error = %error, + "cache update failed after successful commit; invalidating cache" + ); + cache_degraded = true; + } + if let Err(error) = self.cache.extend_txs(txs_for_cache) { + tracing::warn!( + phase = "txs", + error = %error, + "cache update failed after successful commit; invalidating cache" + ); + cache_degraded = true; + } + if cache_degraded { + self.cache.invalidate(); + } + + Ok(()) + } +} diff --git a/src/bdk/pg/lookups.rs b/src/bdk/pg/lookups.rs new file mode 100644 index 00000000..229f9e8c --- /dev/null +++ b/src/bdk/pg/lookups.rs @@ -0,0 +1,276 @@ +use bdk::{ + bitcoin::{Script, Txid}, + KeychainKind, TransactionDetails, +}; +use std::collections::HashMap; + +use super::SqlxWalletDb; + +type LookupSource = &'static str; +type ScriptPathLookup = (Option<(KeychainKind, u32)>, LookupSource); +type TxLookup = (Option, LookupSource); + +#[derive(Copy, Clone, Eq, PartialEq)] +pub(super) enum TxLookupMode { + Any, + RequireRaw, +} + +#[derive(Copy, Clone)] +pub(super) struct MissResolutionPolicy { + pub threshold: usize, + pub batch_size: usize, +} + +impl Default for MissResolutionPolicy { + fn default() -> Self { + Self { + threshold: 64, + batch_size: 512, + } + } +} + +impl SqlxWalletDb { + fn resolve_pending_script_misses(&self) -> Result<(), bdk::Error> { + if !self + .cache + .should_batch_resolve_script_misses(self.miss_resolution.threshold)? + { + return Ok(()); + } + + let pending = self + .cache + .drain_pending_script_misses(self.miss_resolution.batch_size)?; + if pending.is_empty() { + return Ok(()); + } + + let found = match self.ctx.rt.block_on(async { + self.script_pubkeys_repo() + .find_paths_for_scripts(&pending) + .await + }) { + Ok(found) => found, + Err(error) => { + self.cache.requeue_pending_script_misses(pending)?; + return Err(error); + } + }; + + if !found.is_empty() { + self.cache.extend_script_pubkeys( + found + .into_iter() + .map(|(script, (kind, path))| (script, (KeychainKind::from(kind), path))), + )?; + } + + Ok(()) + } + + fn resolve_pending_tx_misses(&self) -> Result<(), bdk::Error> { + if !self + .cache + .should_batch_resolve_tx_misses(self.miss_resolution.threshold)? + { + return Ok(()); + } + + let pending = self + .cache + .drain_pending_tx_misses(self.miss_resolution.batch_size)?; + if pending.is_empty() { + return Ok(()); + } + + let found = match self + .ctx + .rt + .block_on(async { self.transactions_repo().find_by_ids(&pending).await }) + { + Ok(found) => found, + Err(error) => { + self.cache.requeue_pending_tx_misses(pending)?; + return Err(error); + } + }; + + if !found.is_empty() { + self.cache.extend_txs(found)?; + } + + Ok(()) + } + + pub(super) fn lookup_script_pubkey_path( + &self, + script: &Script, + ) -> Result { + if let Some(path) = self.batch.addresses.get(script) { + return Ok((Some(*path), "batch")); + } + + if let Some(path) = self.cache.get_script_pubkey_path(script)? { + return Ok((Some(path), "cache")); + } + + if self.cache.script_marked_missing(script)? { + tracing::trace!("script path miss cache hit"); + return Ok((None, "miss_cache")); + } + + // Once both keychains are fully hydrated in this process, a cache miss is definitive. + if self.cache.script_pubkeys_fully_loaded(None) { + self.cache.record_missing_script(script.to_owned())?; + return Ok((None, "fully_loaded_miss")); + } + + self.resolve_pending_script_misses()?; + if let Some(path) = self.cache.get_script_pubkey_path(script)? { + return Ok((Some(path), "batch_resolve")); + } + + let script_pubkey = script.to_owned(); + let found = self + .ctx + .rt + .block_on(async { self.script_pubkeys_repo().find_path(&script_pubkey).await })?; + + if let Some((kind, path)) = found { + let value = (KeychainKind::from(kind), path); + self.cache.insert_script_pubkey(script_pubkey, value)?; + return Ok((Some(value), "db_hit")); + } + + self.cache + .record_and_enqueue_missing_script(script_pubkey)?; + + Ok((None, "db_miss")) + } + + pub(super) fn lookup_tx_with_mode( + &self, + txid: &Txid, + mode: TxLookupMode, + ) -> Result { + if let Some(tx) = self.batch.txs.get(txid) { + if Self::tx_matches_lookup_mode(tx, mode) { + return Ok((Some(tx.clone()), "batch")); + } + + return Ok((None, "batch_mode_miss")); + } + + if let Some(tx) = self.cache.get_tx(txid)? { + if Self::tx_matches_lookup_mode(&tx, mode) { + return Ok((Some(tx), "cache")); + } + + if self.cache.raw_txs_fully_loaded() { + self.cache.record_missing_txid(*txid)?; + return Ok((None, "cache_mode_miss")); + } + } + + if self.cache.txid_marked_missing(txid)? { + tracing::trace!("tx miss cache hit"); + return Ok((None, "miss_cache")); + } + + // Once raw txs are fully loaded in this process, a miss is definitive. + if self.cache.raw_txs_fully_loaded() { + self.cache.record_missing_txid(*txid)?; + return Ok((None, "fully_loaded_miss")); + } + + self.resolve_pending_tx_misses()?; + if let Some(tx) = self.cache.get_tx(txid)? { + if Self::tx_matches_lookup_mode(&tx, mode) { + return Ok((Some(tx), "batch_resolve")); + } + if self.cache.raw_txs_fully_loaded() { + self.cache.record_missing_txid(*txid)?; + return Ok((None, "batch_resolve_mode_miss")); + } + } + + let found = self + .ctx + .rt + .block_on(async { self.transactions_repo().find_by_id(txid).await })?; + + // DB rows represent persisted TransactionDetails. In `RequireRaw` mode, a summary-only + // row is a mode miss (not an existence miss). + + if let Some(tx) = &found { + self.cache.insert_tx(tx.txid, tx.clone())?; + if Self::tx_matches_lookup_mode(tx, mode) { + Ok((found, "db_hit")) + } else { + Ok((None, "db_mode_miss")) + } + } else { + self.cache.record_and_enqueue_missing_txid(*txid)?; + Ok((None, "db_miss")) + } + } + + pub(super) fn lookup_tx(&self, txid: &Txid) -> Result { + self.lookup_tx_with_mode(txid, TxLookupMode::Any) + } + + pub(super) fn tx_matches_lookup_mode(tx: &TransactionDetails, mode: TxLookupMode) -> bool { + mode == TxLookupMode::Any || tx.transaction.is_some() + } + + pub(super) fn summary_tx_from_ref(tx: &TransactionDetails) -> TransactionDetails { + TransactionDetails { + transaction: None, + txid: tx.txid, + received: tx.received, + sent: tx.sent, + fee: tx.fee, + confirmation_time: tx.confirmation_time.clone(), + } + } + + pub(super) fn summary_tx_from_owned(tx: TransactionDetails) -> TransactionDetails { + let TransactionDetails { + txid, + received, + sent, + fee, + confirmation_time, + .. + } = tx; + + TransactionDetails { + transaction: None, + txid, + received, + sent, + fee, + confirmation_time, + } + } + + pub(super) fn overlay_batch_txs( + mut txs: HashMap, + batch_txs: &HashMap, + include_raw: bool, + ) -> HashMap { + if include_raw { + txs.extend(batch_txs.iter().map(|(id, tx)| (*id, tx.clone()))); + } else { + txs.extend( + batch_txs + .iter() + .map(|(id, tx)| (*id, Self::summary_tx_from_ref(tx))), + ); + } + + txs + } +} diff --git a/src/bdk/pg/mod.rs b/src/bdk/pg/mod.rs index ad3233bd..7c8ea844 100644 --- a/src/bdk/pg/mod.rs +++ b/src/bdk/pg/mod.rs @@ -1,29 +1,28 @@ +mod cache; mod convert; +mod db_traits; mod descriptor_checksum; mod index; +mod lookups; mod script_pubkeys; mod sync_times; mod transactions; mod utxos; use bdk::{ - bitcoin::{blockdata::transaction::OutPoint, Script, ScriptBuf, Transaction, Txid}, - database::{BatchDatabase, BatchOperations, Database, SyncTime}, + bitcoin::{ScriptBuf, Txid}, KeychainKind, LocalUtxo, TransactionDetails, }; use sqlx::PgPool; use tokio::runtime::Handle; use crate::primitives::*; -use convert::BdkKeychainKind; +use cache::WalletCache; use descriptor_checksum::DescriptorChecksums; use index::Indexes; +use lookups::MissResolutionPolicy; use script_pubkeys::ScriptPubkeys; -use std::{ - collections::HashMap, - sync::atomic::{AtomicBool, AtomicU8, Ordering}, - sync::{Arc, Mutex, MutexGuard}, -}; +use std::collections::HashMap; pub(super) use sync_times::SyncTimes; pub use transactions::*; pub use utxos::*; @@ -31,12 +30,6 @@ pub use utxos::*; type ScriptPubkeyCache = HashMap; type TransactionCache = HashMap; -#[derive(Copy, Clone, Eq, PartialEq)] -enum TxLookupMode { - Any, - RequireRaw, -} - #[derive(Clone)] struct WalletDbContext { rt: Handle, @@ -61,175 +54,11 @@ struct WalletBatchState { txs: TransactionCache, } -#[derive(Clone)] -struct WalletCache { - script_pubkeys: Arc>, - transactions: Arc>, - // Process-local hint for which keychain script path sets are fully hydrated. - // Bit 0: external, bit 1: internal. - // This is intentionally not synchronized across processes. - script_pubkeys_loaded_mask: Arc, - // Process-local hint: true means this instance has already hydrated raw tx details - // from the DB at least once. It is intentionally not synchronized across processes. - raw_txs_fully_loaded: Arc, -} - -impl WalletCache { - fn new() -> Self { - Self { - script_pubkeys: Arc::new(Mutex::new(HashMap::new())), - transactions: Arc::new(Mutex::new(HashMap::new())), - script_pubkeys_loaded_mask: Arc::new(AtomicU8::new(0)), - raw_txs_fully_loaded: Arc::new(AtomicBool::new(false)), - } - } - - fn script_pubkey_mask_for(keychain: Option) -> u8 { - const EXTERNAL: u8 = 1; - const INTERNAL: u8 = 2; - match keychain { - Some(KeychainKind::External) => EXTERNAL, - Some(KeychainKind::Internal) => INTERNAL, - None => EXTERNAL | INTERNAL, - } - } - - fn lock_script_pubkeys(&self) -> Result, bdk::Error> { - self.script_pubkeys - .lock() - .map_err(|_| bdk::Error::Generic("script pubkeys cache lock poisoned".to_string())) - } - - fn lock_transactions(&self) -> Result, bdk::Error> { - self.transactions - .lock() - .map_err(|_| bdk::Error::Generic("transactions cache lock poisoned".to_string())) - } - - fn get_script_pubkey_path( - &self, - script: &Script, - ) -> Result, bdk::Error> { - let cache = self.lock_script_pubkeys()?; - Ok(cache.get(script).copied()) - } - - fn insert_script_pubkey( - &self, - script: ScriptBuf, - path: (KeychainKind, u32), - ) -> Result<(), bdk::Error> { - let mut cache = self.lock_script_pubkeys()?; - cache.insert(script, path); - Ok(()) - } - - fn extend_script_pubkeys(&self, entries: I) -> Result<(), bdk::Error> - where - I: IntoIterator, - { - let mut cache = self.lock_script_pubkeys()?; - cache.extend(entries); - Ok(()) - } - - fn all_script_pubkeys( - &self, - keychain: Option, - ) -> Result, bdk::Error> { - let cache = self.lock_script_pubkeys()?; - Ok(cache - .iter() - .filter(|(_, (kind, _))| keychain.is_none_or(|k| *kind == k)) - .map(|(script, _)| script.clone()) - .collect()) - } - - fn script_pubkeys_fully_loaded(&self, keychain: Option) -> bool { - let required_mask = Self::script_pubkey_mask_for(keychain); - let loaded_mask = self.script_pubkeys_loaded_mask.load(Ordering::Acquire); - loaded_mask & required_mask == required_mask - } - - fn mark_script_pubkeys_loaded(&self, keychain: Option) { - let mask = Self::script_pubkey_mask_for(keychain); - self.script_pubkeys_loaded_mask - .fetch_or(mask, Ordering::Release); - } - - fn get_tx(&self, txid: &Txid) -> Result, bdk::Error> { - let cache = self.lock_transactions()?; - Ok(cache.get(txid).cloned()) - } - - fn insert_tx(&self, txid: Txid, tx: TransactionDetails) -> Result<(), bdk::Error> { - let mut cache = self.lock_transactions()?; - cache.insert(txid, tx); - Ok(()) - } - - fn extend_txs(&self, entries: I) -> Result<(), bdk::Error> - where - I: IntoIterator, - { - let mut cache = self.lock_transactions()?; - cache.extend(entries); - Ok(()) - } - - fn extend_summary_txs(&self, entries: I) -> Result<(), bdk::Error> - where - I: IntoIterator, - { - let mut cache = self.lock_transactions()?; - for (txid, mut summary) in entries { - // Summary refreshes may run after raw tx bytes were already hydrated. Preserve any - // cached raw transaction payload while applying fresh DB metadata fields. - // `iter_txs` overlays in-memory batch writes afterwards, so uncommitted updates still - // take precedence in returned views. - if let Some(raw_tx) = cache - .get(&txid) - .and_then(|existing| existing.transaction.clone()) - { - summary.transaction = Some(raw_tx); - } - cache.insert(txid, summary); - } - Ok(()) - } - - fn all_txs(&self) -> Result, bdk::Error> { - let cache = self.lock_transactions()?; - Ok(cache.values().cloned().collect()) - } - - fn all_summary_txs(&self) -> Result, bdk::Error> { - let cache = self.lock_transactions()?; - Ok(cache - .values() - .map(|tx| (tx.txid, SqlxWalletDb::summary_tx_from_ref(tx))) - .collect()) - } - - fn raw_txs_fully_loaded(&self) -> bool { - self.raw_txs_fully_loaded.load(Ordering::Acquire) - } - - fn set_raw_txs_fully_loaded(&self) { - self.raw_txs_fully_loaded.store(true, Ordering::Release); - } - - fn remove_tx(&self, txid: &Txid) -> Result<(), bdk::Error> { - let mut cache = self.lock_transactions()?; - cache.remove(txid); - Ok(()) - } -} - pub struct SqlxWalletDb { ctx: WalletDbContext, cache: WalletCache, batch: WalletBatchState, + miss_resolution: MissResolutionPolicy, } impl SqlxWalletDb { @@ -242,6 +71,7 @@ impl SqlxWalletDb { ctx: WalletDbContext::new(pool, keychain_id), cache: WalletCache::new(), batch: WalletBatchState::default(), + miss_resolution: MissResolutionPolicy::default(), } } @@ -286,471 +116,6 @@ impl SqlxWalletDb { fn descriptor_checksums_repo(&self) -> DescriptorChecksums { DescriptorChecksums::new(self.ctx.keychain_id, self.ctx.pool.clone()) } - - fn lookup_script_pubkey_path( - &self, - script: &Script, - ) -> Result, bdk::Error> { - if let Some(path) = self.batch.addresses.get(script) { - return Ok(Some(*path)); - } - - if let Some(path) = self.cache.get_script_pubkey_path(script)? { - return Ok(Some(path)); - } - - let script_pubkey = script.to_owned(); - let found = self - .ctx - .rt - .block_on(async { self.script_pubkeys_repo().find_path(&script_pubkey).await })?; - - if let Some((kind, path)) = found { - let value = (KeychainKind::from(kind), path); - self.cache.insert_script_pubkey(script_pubkey, value)?; - return Ok(Some(value)); - } - - Ok(None) - } - - fn lookup_tx_with_mode( - &self, - txid: &Txid, - mode: TxLookupMode, - ) -> Result, bdk::Error> { - if let Some(tx) = self.batch.txs.get(txid) { - if Self::tx_matches_lookup_mode(tx, mode) { - return Ok(Some(tx.clone())); - } - - return Ok(None); - } - - if let Some(tx) = self.cache.get_tx(txid)? { - if Self::tx_matches_lookup_mode(&tx, mode) { - return Ok(Some(tx)); - } - - if self.cache.raw_txs_fully_loaded() { - return Ok(None); - } - } - - let found = self - .ctx - .rt - .block_on(async { self.transactions_repo().find_by_id(txid).await })?; - - // DB rows represent persisted TransactionDetails; this store does not persist a - // "summary-only" transaction format. A DB hit is therefore valid for both lookup - // modes (`Any` and `RequireRaw`). - - if let Some(tx) = &found { - self.cache.insert_tx(tx.txid, tx.clone())?; - } - - Ok(found) - } - - fn lookup_tx(&self, txid: &Txid) -> Result, bdk::Error> { - self.lookup_tx_with_mode(txid, TxLookupMode::Any) - } - - fn tx_matches_lookup_mode(tx: &TransactionDetails, mode: TxLookupMode) -> bool { - mode == TxLookupMode::Any || tx.transaction.is_some() - } - - fn summary_tx_from_ref(tx: &TransactionDetails) -> TransactionDetails { - TransactionDetails { - transaction: None, - txid: tx.txid, - received: tx.received, - sent: tx.sent, - fee: tx.fee, - confirmation_time: tx.confirmation_time.clone(), - } - } - - fn summary_tx_from_owned(tx: TransactionDetails) -> TransactionDetails { - let TransactionDetails { - txid, - received, - sent, - fee, - confirmation_time, - .. - } = tx; - - TransactionDetails { - transaction: None, - txid, - received, - sent, - fee, - confirmation_time, - } - } - - fn overlay_batch_txs( - mut txs: HashMap, - batch_txs: &HashMap, - include_raw: bool, - ) -> HashMap { - if include_raw { - txs.extend(batch_txs.iter().map(|(id, tx)| (*id, tx.clone()))); - } else { - txs.extend( - batch_txs - .iter() - .map(|(id, tx)| (*id, Self::summary_tx_from_ref(tx))), - ); - } - - txs - } -} - -impl BatchOperations for SqlxWalletDb { - #[tracing::instrument(name = "bdk.batch.set_script_pubkey", skip_all, err)] - fn set_script_pubkey( - &mut self, - script: &Script, - keychain: KeychainKind, - path: u32, - ) -> Result<(), bdk::Error> { - self.batch.addresses.insert(script.into(), (keychain, path)); - Ok(()) - } - - #[tracing::instrument(name = "bdk.batch.set_utxo", skip_all, err)] - fn set_utxo(&mut self, utxo: &LocalUtxo) -> Result<(), bdk::Error> { - self.batch.utxos.push(utxo.clone()); - Ok(()) - } - - #[tracing::instrument(name = "bdk.batch.set_raw_tx", skip_all, err)] - fn set_raw_tx(&mut self, _: &Transaction) -> Result<(), bdk::Error> { - Err(Self::unsupported_operation("set_raw_tx")) - } - - #[tracing::instrument(name = "bdk.batch.set_tx", skip_all, err)] - fn set_tx(&mut self, tx: &TransactionDetails) -> Result<(), bdk::Error> { - self.batch.txs.insert(tx.txid, tx.clone()); - Ok(()) - } - - #[tracing::instrument(name = "bdk.batch.set_last_index", skip_all, err)] - fn set_last_index(&mut self, kind: KeychainKind, idx: u32) -> Result<(), bdk::Error> { - // NOTE: This write is intentionally immediate because BDK may call it outside of - // `commit_batch` flow. - self.ctx - .rt - .block_on(async { self.indexes_repo().persist_last_index(kind, idx).await }) - } - - #[tracing::instrument(name = "bdk.batch.set_sync_time", skip_all, err)] - fn set_sync_time(&mut self, time: SyncTime) -> Result<(), bdk::Error> { - // NOTE: This write is intentionally immediate because BDK may call it outside of - // `commit_batch` flow. - self.ctx - .rt - .block_on(async { self.sync_times_repo().persist(time).await }) - } - - #[tracing::instrument(name = "bdk.batch.del_script_pubkey_from_path", skip_all, err)] - fn del_script_pubkey_from_path( - &mut self, - _: KeychainKind, - _: u32, - ) -> Result, bdk::Error> { - Err(Self::unsupported_operation("del_script_pubkey_from_path")) - } - - #[tracing::instrument(name = "bdk.batch.del_path_from_script_pubkey", skip_all, err)] - fn del_path_from_script_pubkey( - &mut self, - _: &Script, - ) -> Result, bdk::Error> { - Err(Self::unsupported_operation("del_path_from_script_pubkey")) - } - - #[tracing::instrument(name = "bdk.batch.del_utxo", skip_all, err)] - fn del_utxo(&mut self, outpoint: &OutPoint) -> Result, bdk::Error> { - self.ctx - .rt - .block_on(async { self.utxos_repo().delete(outpoint).await }) - } - - #[tracing::instrument(name = "bdk.batch.del_raw_tx", skip_all, err)] - fn del_raw_tx(&mut self, _: &Txid) -> Result, bdk::Error> { - Err(Self::unsupported_operation("del_raw_tx")) - } - - #[tracing::instrument(name = "bdk.batch.del_tx", skip_all, err)] - fn del_tx( - &mut self, - tx_id: &Txid, - _include_raw: bool, - ) -> Result, bdk::Error> { - let deleted = self - .ctx - .rt - .block_on(async { self.transactions_repo().delete(tx_id).await })?; - - if deleted.is_some() { - self.batch.txs.remove(tx_id); - self.cache.remove_tx(tx_id)?; - } - - Ok(deleted) - } - - #[tracing::instrument(name = "bdk.batch.del_last_index", skip_all, err)] - fn del_last_index(&mut self, _: KeychainKind) -> Result, bdk::Error> { - Err(Self::unsupported_operation("del_last_index")) - } - - #[tracing::instrument(name = "bdk.batch.del_sync_time", skip_all, err)] - fn del_sync_time(&mut self) -> Result, bdk::Error> { - Err(Self::unsupported_operation("del_sync_time")) - } -} - -impl Database for SqlxWalletDb { - #[tracing::instrument(name = "bdk.db.check_descriptor_checksum", skip_all, err)] - fn check_descriptor_checksum( - &mut self, - keychain: KeychainKind, - script_bytes: B, - ) -> Result<(), bdk::Error> - where - B: AsRef<[u8]>, - { - self.ctx.rt.block_on(async { - let checksums = self.descriptor_checksums_repo(); - checksums - .check_or_persist_descriptor_checksum(keychain, script_bytes.as_ref()) - .await?; - - Ok(()) - }) - } - - #[tracing::instrument(name = "bdk.db.iter_script_pubkeys", skip_all, err)] - fn iter_script_pubkeys( - &self, - keychain: Option, - ) -> Result, bdk::Error> { - if self.cache.script_pubkeys_fully_loaded(keychain) { - return self.cache.all_script_pubkeys(keychain); - } - - let scripts_with_paths = self.ctx.rt.block_on(async { - self.script_pubkeys_repo() - .list_scripts_with_paths(keychain) - .await - })?; - - Self::cache_loaded_script_pubkeys(&self.cache, keychain, scripts_with_paths) - } - - #[tracing::instrument(name = "bdk.db.iter_utxos", skip_all, err)] - fn iter_utxos(&self) -> Result, bdk::Error> { - self.ctx - .rt - .block_on(async { self.utxos_repo().list_local_utxos().await }) - } - - #[tracing::instrument(name = "bdk.db.iter_raw_txs", skip_all, err)] - fn iter_raw_txs(&self) -> Result, bdk::Error> { - Err(Self::unsupported_operation("iter_raw_txs")) - } - - #[tracing::instrument(name = "bdk.db.iter_txs", skip_all, err)] - fn iter_txs(&self, include_raw: bool) -> Result, bdk::Error> { - let txs = match (include_raw, self.cache.raw_txs_fully_loaded()) { - (true, true) => self - .cache - .all_txs()? - .into_iter() - .map(|tx| (tx.txid, tx)) - .collect(), - (true, false) => { - let loaded = self - .ctx - .rt - .block_on(async { self.transactions_repo().load_all().await })?; - self.cache - .extend_txs(loaded.iter().map(|(txid, tx)| (*txid, tx.clone())))?; - self.cache.set_raw_txs_fully_loaded(); - loaded - } - // Once raw txs are fully loaded for this process, serve summary calls from cache to - // avoid repeated full-table reads. This returns the in-process snapshot (kept current - // by set/del/commit paths) rather than forcing a fresh DB roundtrip. - (false, true) => self.cache.all_summary_txs()?, - (false, false) => { - let txs = self - .ctx - .rt - .block_on(async { self.transactions_repo().load_all_summaries().await })?; - self.cache - .extend_summary_txs(txs.iter().map(|(txid, tx)| (*txid, tx.clone())))?; - txs - } - }; - - Ok(Self::overlay_batch_txs(txs, &self.batch.txs, include_raw) - .into_values() - .collect()) - } - - #[tracing::instrument(name = "bdk.db.get_script_pubkey_from_path", skip_all, err)] - fn get_script_pubkey_from_path( - &self, - keychain: KeychainKind, - path: u32, - ) -> Result, bdk::Error> { - self.ctx - .rt - .block_on(async { self.script_pubkeys_repo().find_script(keychain, path).await }) - } - - #[tracing::instrument(name = "bdk.db.get_path_from_script_pubkey", skip_all, err)] - fn get_path_from_script_pubkey( - &self, - script: &Script, - ) -> Result, bdk::Error> { - self.lookup_script_pubkey_path(script) - } - - #[tracing::instrument(name = "bdk.db.get_utxo", skip_all, err)] - fn get_utxo(&self, outpoint: &OutPoint) -> Result, bdk::Error> { - self.ctx - .rt - .block_on(async { self.utxos_repo().find(outpoint).await }) - } - - #[tracing::instrument(name = "bdk.db.get_raw_tx", skip_all, err)] - fn get_raw_tx(&self, tx_id: &Txid) -> Result, bdk::Error> { - self.lookup_tx_with_mode(tx_id, TxLookupMode::RequireRaw) - .map(|tx| tx.and_then(|tx| tx.transaction)) - } - - #[tracing::instrument(name = "bdk.db.get_tx", skip_all, err)] - fn get_tx( - &self, - tx_id: &Txid, - include_raw: bool, - ) -> Result, bdk::Error> { - self.lookup_tx(tx_id).map(|tx| { - tx.map(|tx| { - if include_raw { - tx - } else { - Self::summary_tx_from_owned(tx) - } - }) - }) - } - - #[tracing::instrument(name = "bdk.db.get_last_index", skip_all, err)] - fn get_last_index(&self, kind: KeychainKind) -> Result, bdk::Error> { - self.ctx - .rt - .block_on(async { self.indexes_repo().get_latest(kind).await }) - } - - #[tracing::instrument(name = "bdk.db.get_sync_time", skip_all, err)] - fn get_sync_time(&self) -> Result, bdk::Error> { - self.ctx - .rt - .block_on(async { self.sync_times_repo().get().await }) - } - - #[tracing::instrument(name = "bdk.db.increment_last_index", skip_all, err)] - fn increment_last_index(&mut self, keychain: KeychainKind) -> Result { - self.ctx - .rt - .block_on(async { self.indexes_repo().increment(keychain).await }) - } -} - -impl BatchDatabase for SqlxWalletDb { - type Batch = Self; - - fn begin_batch(&self) -> ::Batch { - SqlxWalletDb { - ctx: self.ctx.clone(), - cache: self.cache.clone(), - batch: WalletBatchState::default(), - } - } - - fn commit_batch( - &mut self, - mut batch: ::Batch, - ) -> Result<(), bdk::Error> { - // Atomic scope here is limited to staged script pubkeys, utxos, and transactions. - // `set_last_index` / `set_sync_time` remain immediate writes by design. - let (addresses_for_cache, addresses_for_db): (Vec<_>, Vec<_>) = batch - .batch - .addresses - .drain() - .map(|(script, (keychain, path))| { - let cache_entry = (script.clone(), (keychain, path)); - let db_entry = (BdkKeychainKind::from(keychain), path, script); - (cache_entry, db_entry) - }) - .unzip(); - - let (txs_for_cache, txs_for_db): (Vec<_>, Vec<_>) = batch - .batch - .txs - .drain() - .map(|(txid, tx)| ((txid, tx.clone()), tx)) - .unzip(); - - let utxos_for_db = std::mem::take(&mut batch.batch.utxos); - let keychain_id = batch.ctx.keychain_id; - let pool = batch.ctx.pool.clone(); - - batch.ctx.rt.block_on(async move { - let mut tx = pool - .begin() - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - - if !addresses_for_db.is_empty() { - ScriptPubkeys::new(keychain_id, pool.clone()) - .persist_all_in_tx(&mut tx, addresses_for_db) - .await?; - } - - if !utxos_for_db.is_empty() { - Utxos::new(keychain_id, pool.clone()) - .persist_all_in_tx(&mut tx, utxos_for_db) - .await?; - } - - if !txs_for_db.is_empty() { - Transactions::new(keychain_id, pool) - .persist_all_in_tx(&mut tx, txs_for_db) - .await?; - } - - tx.commit() - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - - Ok::<_, bdk::Error>(()) - })?; - - self.cache.extend_script_pubkeys(addresses_for_cache)?; - self.cache.extend_txs(txs_for_cache)?; - Ok(()) - } } #[cfg(test)] @@ -806,9 +171,20 @@ mod tests { fn wallet_cache_raw_txs_loaded_flag_defaults_false_and_can_be_set() { let cache = WalletCache::new(); assert!(!cache.raw_txs_fully_loaded()); + assert!(!cache.summary_txs_fully_loaded()); cache.set_raw_txs_fully_loaded(); assert!(cache.raw_txs_fully_loaded()); + assert!(cache.summary_txs_fully_loaded()); + } + + #[test] + fn wallet_cache_summary_txs_loaded_flag_defaults_false_and_can_be_set() { + let cache = WalletCache::new(); + assert!(!cache.summary_txs_fully_loaded()); + + cache.set_summary_txs_fully_loaded(); + assert!(cache.summary_txs_fully_loaded()); } #[test] @@ -953,4 +329,93 @@ mod tests { .expect("all_script_pubkeys should succeed"); assert_eq!(all.len(), 2); } + + #[test] + fn wallet_cache_tracks_missing_txids_and_pending_drain() { + let cache = WalletCache::new(); + let txid = Txid::all_zeros(); + let another = Txid::from_slice(&[1; 32]).expect("valid txid"); + + cache + .record_and_enqueue_missing_txid(txid) + .expect("mark should succeed"); + cache + .record_and_enqueue_missing_txid(another) + .expect("mark should succeed"); + assert!(cache + .txid_marked_missing(&txid) + .expect("lookup should succeed")); + assert!(cache + .should_batch_resolve_tx_misses(1) + .expect("threshold check should succeed")); + + let drained = cache + .drain_pending_tx_misses(1) + .expect("drain should succeed"); + assert_eq!(drained.len(), 1); + + cache + .mark_txid_not_missing(&txid) + .expect("clear should succeed"); + assert!(!cache + .txid_marked_missing(&txid) + .expect("lookup should succeed")); + } + + #[test] + fn wallet_cache_tracks_missing_scripts_and_pending_drain() { + let cache = WalletCache::new(); + let first = ScriptBuf::from(vec![0x51]); + let second = ScriptBuf::from(vec![0x52]); + + cache + .record_and_enqueue_missing_script(first.clone()) + .expect("mark should succeed"); + cache + .record_and_enqueue_missing_script(second.clone()) + .expect("mark should succeed"); + assert!(cache + .script_marked_missing(first.as_script()) + .expect("lookup should succeed")); + assert!(cache + .should_batch_resolve_script_misses(1) + .expect("threshold check should succeed")); + + let drained = cache + .drain_pending_script_misses(1) + .expect("drain should succeed"); + assert_eq!(drained.len(), 1); + + cache + .mark_script_not_missing(first.as_script()) + .expect("clear should succeed"); + assert!(!cache + .script_marked_missing(first.as_script()) + .expect("lookup should succeed")); + } + + #[test] + fn wallet_cache_extend_summary_txs_clears_tx_miss_tracking() { + let cache = WalletCache::new(); + let txid = Txid::all_zeros(); + + cache + .record_and_enqueue_missing_txid(txid) + .expect("mark should succeed"); + assert!(cache + .txid_marked_missing(&txid) + .expect("lookup should succeed")); + + cache + .extend_summary_txs([(txid, tx_details(txid))]) + .expect("extend should succeed"); + + assert!(!cache + .txid_marked_missing(&txid) + .expect("lookup should succeed")); + let drained = cache + .drain_pending_tx_misses(1) + .expect("drain should succeed"); + assert!(drained.is_empty()); + } } diff --git a/src/bdk/pg/script_pubkeys.rs b/src/bdk/pg/script_pubkeys.rs index 01983512..c360bba3 100644 --- a/src/bdk/pg/script_pubkeys.rs +++ b/src/bdk/pg/script_pubkeys.rs @@ -1,7 +1,10 @@ +use futures::{TryStream, TryStreamExt}; use sqlx::{PgPool, Postgres, QueryBuilder, Transaction}; use tracing::instrument; use uuid::Uuid; +use std::collections::HashMap; + use super::convert::BdkKeychainKind; use crate::primitives::{bitcoin::ScriptBuf, *}; @@ -13,6 +16,8 @@ pub struct ScriptPubkeys { } impl ScriptPubkeys { + const LIST_WITH_PATHS_BATCH_SIZE: i64 = 10_000; + fn script_with_path( script: Vec, keychain_kind: BdkKeychainKind, @@ -26,43 +31,23 @@ impl ScriptPubkeys { )) } - pub fn new(keychain_id: KeychainId, pool: PgPool) -> Self { - Self { keychain_id, pool } + async fn next_stream_row(stream: &mut S) -> Result, bdk::Error> + where + S: TryStream + Unpin, + { + stream + .try_next() + .await + .map_err(|e| bdk::Error::Generic(e.to_string())) } - #[instrument(name = "bdk.script_pubkeys.persist_all", skip_all)] - // Retained for non-transactional call sites and focused tests. - #[allow(dead_code)] - pub async fn persist_all( - &self, - keys: Vec<(BdkKeychainKind, u32, ScriptBuf)>, - ) -> Result<(), bdk::Error> { - const BATCH_SIZE: usize = 5000; - let chunks = keys.chunks(BATCH_SIZE); - for chunk in chunks { - let mut query_builder: QueryBuilder = QueryBuilder::new( - r#"INSERT INTO bdk_script_pubkeys - (keychain_id, keychain_kind, path, script, script_hex, script_fmt)"#, - ); - - query_builder.push_values(chunk, |mut builder, (keychain, path, script)| { - builder.push_bind(self.keychain_id); - builder.push_bind(keychain); - builder.push_bind(*path as i32); - builder.push_bind(script.as_bytes()); - builder.push_bind(format!("{script:02x}")); - builder.push_bind(format!("{script:?}")); - }); - query_builder.push("ON CONFLICT DO NOTHING"); - - query_builder - .build() - .execute(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - } + fn record_list_with_paths_row(last_path: &mut Option, batch_rows: &mut usize, path: i32) { + *last_path = Some(path); + *batch_rows += 1; + } - Ok(()) + pub fn new(keychain_id: KeychainId, pool: PgPool) -> Self { + Self { keychain_id, pool } } pub async fn persist_all_in_tx( @@ -143,6 +128,43 @@ impl ScriptPubkeys { .transpose() } + #[instrument(name = "bdk.script_pubkeys.find_paths_for_scripts", skip_all, fields(n_requested = scripts.len(), n_found))] + pub async fn find_paths_for_scripts( + &self, + scripts: &[ScriptBuf], + ) -> Result, bdk::Error> { + if scripts.is_empty() { + return Ok(HashMap::new()); + } + + let script_hexes: Vec = scripts + .iter() + .map(|script| format!("{script:02x}")) + .collect(); + let rows = sqlx::query!( + r#"SELECT script, keychain_kind as "keychain_kind: BdkKeychainKind", path + FROM bdk_script_pubkeys + WHERE keychain_id = $1 AND script_hex = ANY($2)"#, + Uuid::from(self.keychain_id), + &script_hexes, + ) + .fetch_all(&self.pool) + .await + .map_err(|e| bdk::Error::Generic(e.to_string()))?; + + tracing::Span::current().record("n_found", rows.len()); + + rows.into_iter() + .map(|row| { + let keychain_kind_raw: BdkKeychainKind = row.keychain_kind; + let path: i32 = row.path; + let script: Vec = row.script; + let (script, (_, path)) = Self::script_with_path(script, keychain_kind_raw, path)?; + Ok((script, (keychain_kind_raw, path))) + }) + .collect() + } + #[instrument(name = "bdk.script_pubkeys.list_scripts", skip_all)] // Retained for non-path call sites and focused tests. #[allow(dead_code)] @@ -181,37 +203,58 @@ impl ScriptPubkeys { &self, keychain: Option>, ) -> Result, bdk::Error> { - let keychain_id = Uuid::from(self.keychain_id); let keychain_kind: Option = keychain.map(Into::into); if let Some(keychain_kind) = keychain_kind { - let rows = sqlx::query!( - r#"SELECT script, keychain_kind as "keychain_kind: BdkKeychainKind", path FROM bdk_script_pubkeys - WHERE keychain_id = $1 AND keychain_kind = $2"#, + self.list_scripts_with_paths_for_keychain(keychain_kind) + .await + } else { + let (mut all, internal) = tokio::try_join!( + self.list_scripts_with_paths_for_keychain(BdkKeychainKind::External), + self.list_scripts_with_paths_for_keychain(BdkKeychainKind::Internal), + )?; + all.extend(internal); + Ok(all) + } + } + + async fn list_scripts_with_paths_for_keychain( + &self, + keychain_kind: BdkKeychainKind, + ) -> Result, bdk::Error> { + let keychain_id = Uuid::from(self.keychain_id); + let mut last_path: Option = None; + let mut all = Vec::new(); + + loop { + let mut stream = sqlx::query!( + r#"SELECT script, keychain_kind as "keychain_kind: BdkKeychainKind", path + FROM bdk_script_pubkeys + WHERE keychain_id = $1 + AND keychain_kind = $2 + AND ($3::INT4 IS NULL OR path > $3) + ORDER BY path ASC + LIMIT $4"#, keychain_id, keychain_kind as BdkKeychainKind, + last_path, + Self::LIST_WITH_PATHS_BATCH_SIZE, ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; + .fetch(&self.pool); - Ok(rows - .into_iter() - .map(|row| Self::script_with_path(row.script, row.keychain_kind, row.path)) - .collect::, _>>()?) - } else { - let rows = sqlx::query!( - r#"SELECT script, keychain_kind as "keychain_kind: BdkKeychainKind", path FROM bdk_script_pubkeys - WHERE keychain_id = $1"#, - keychain_id, - ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; + let mut batch_rows = 0usize; + while let Some(row) = Self::next_stream_row(&mut stream).await? { + let path: i32 = row.path; + let script: Vec = row.script; + let row_keychain_kind: BdkKeychainKind = row.keychain_kind; + Self::record_list_with_paths_row(&mut last_path, &mut batch_rows, path); + all.push(Self::script_with_path(script, row_keychain_kind, path)?); + } - Ok(rows - .into_iter() - .map(|row| Self::script_with_path(row.script, row.keychain_kind, row.path)) - .collect::, _>>()?) + if batch_rows == 0 { + break; + } } + + Ok(all) } } diff --git a/src/bdk/pg/transactions.rs b/src/bdk/pg/transactions.rs index 64ca305a..6a136eaf 100644 --- a/src/bdk/pg/transactions.rs +++ b/src/bdk/pg/transactions.rs @@ -1,4 +1,5 @@ use bdk::{bitcoin::Txid, BlockTime, LocalUtxo, TransactionDetails}; +use futures::{TryStream, TryStreamExt}; use sqlx::{PgPool, Postgres, QueryBuilder, Transaction as SqlxTransaction}; use tracing::instrument; @@ -33,6 +34,60 @@ pub struct Transactions { } impl Transactions { + const LOAD_BATCH_SIZE: i64 = 10_000; + + fn parse_txid(tx_id: &str) -> Result { + tx_id + .parse::() + .map_err(|e| bdk::Error::Generic(format!("invalid tx_id in db '{tx_id}': {e}"))) + } + + fn deserialize_details( + details_json: serde_json::Value, + ) -> Result { + serde_json::from_value::(details_json) + .map_err(|e| bdk::Error::Generic(format!("could not deserialize tx details: {e}"))) + } + + fn to_u64(value: i64, field: &str) -> Result { + if value < 0 { + return Err(bdk::Error::Generic(format!( + "negative {field} value in bdk_transactions" + ))); + } + Ok(value as u64) + } + + fn to_u32(value: i32, field: &str) -> Result { + if value < 0 { + return Err(bdk::Error::Generic(format!( + "negative {field} value in bdk_transactions" + ))); + } + Ok(value as u32) + } + + async fn next_stream_row(stream: &mut S) -> Result, bdk::Error> + where + S: TryStream + Unpin, + { + stream + .try_next() + .await + .map_err(|e| bdk::Error::Generic(e.to_string())) + } + + fn record_loaded_row( + last_tx_id: &mut Option, + total_rows: &mut usize, + batch_rows: &mut usize, + tx_id: String, + ) { + *last_tx_id = Some(tx_id); + *total_rows += 1; + *batch_rows += 1; + } + fn serialize_batch( batch: &[TransactionDetails], ) -> Result, bdk::Error> { @@ -55,56 +110,6 @@ impl Transactions { Self { keychain_id, pool } } - #[instrument(name = "bdk.transactions.persist", skip_all)] - // Retained for non-transactional call sites and focused tests. - #[allow(dead_code)] - pub async fn persist_all(&self, txs: Vec) -> Result<(), bdk::Error> { - const BATCH_SIZE: usize = 2000; - let batches = txs.chunks(BATCH_SIZE); - - for batch in batches { - let serialized_batch = Self::serialize_batch(batch)?; - - let mut query_builder: QueryBuilder = QueryBuilder::new( - r#" - INSERT INTO bdk_transactions - (keychain_id, tx_id, details_json, sent, height)"#, - ); - - query_builder.push_values( - serialized_batch, - |mut builder, (tx_id, details_json, sent, height)| { - builder.push_bind(self.keychain_id as KeychainId); - builder.push_bind(tx_id); - builder.push_bind(details_json); - builder.push_bind(sent); - builder.push_bind(height); - }, - ); - - query_builder.push( - "ON CONFLICT (keychain_id, tx_id) DO UPDATE \ - SET details_json = EXCLUDED.details_json,\ - sent = EXCLUDED.sent,\ - height = EXCLUDED.height,\ - modified_at = NOW(),\ - deleted_at = NULL \ - WHERE bdk_transactions.details_json IS DISTINCT FROM EXCLUDED.details_json \ - OR bdk_transactions.sent IS DISTINCT FROM EXCLUDED.sent \ - OR bdk_transactions.height IS DISTINCT FROM EXCLUDED.height \ - OR bdk_transactions.deleted_at IS NOT NULL", - ); - - query_builder - .build() - .execute(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - } - - Ok(()) - } - pub async fn persist_all_in_tx( &self, tx: &mut SqlxTransaction<'_, Postgres>, @@ -188,35 +193,96 @@ impl Transactions { .fetch_optional(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; - tx.map(|tx| { - serde_json::from_value(tx.details_json) - .map_err(|e| bdk::Error::Generic(format!("could not deserialize tx details: {e}"))) - }) - .transpose() + tx.map(|tx| Self::deserialize_details(tx.details_json)) + .transpose() } - #[instrument(name = "bdk.transactions.load_all", skip(self), fields(n_rows))] - pub async fn load_all(&self) -> Result, bdk::Error> { - let txs = sqlx::query!( + #[instrument(name = "bdk.transactions.find_by_ids", skip_all, fields(n_requested = tx_ids.len(), n_found))] + pub async fn find_by_ids( + &self, + tx_ids: &[Txid], + ) -> Result, bdk::Error> { + if tx_ids.is_empty() { + return Ok(HashMap::new()); + } + + let tx_ids_text: Vec = tx_ids.iter().map(ToString::to_string).collect(); + let rows = sqlx::query!( r#" - SELECT details_json FROM bdk_transactions WHERE keychain_id = $1 AND deleted_at IS NULL"#, + SELECT tx_id, details_json + FROM bdk_transactions + WHERE keychain_id = $1 + AND deleted_at IS NULL + AND tx_id = ANY($2)"#, self.keychain_id as KeychainId, + &tx_ids_text, ) .fetch_all(&self.pool) .await .map_err(|e| bdk::Error::Generic(e.to_string()))?; - tracing::Span::current().record("n_rows", txs.len()); - txs.into_iter() - .map(|tx| { - serde_json::from_value::(tx.details_json) - .map(|tx| (tx.txid, tx)) - .map_err(|e| { - bdk::Error::Generic(format!("could not deserialize tx details: {e}")) - }) + + tracing::Span::current().record("n_found", rows.len()); + + rows.into_iter() + .map(|row| { + let txid = Self::parse_txid(&row.tx_id)?; + let tx = Self::deserialize_details(row.details_json)?; + if tx.txid != txid { + return Err(bdk::Error::Generic(format!( + "mismatched tx ids in bdk_transactions: tx_id column {} != details_json txid {}", + txid, tx.txid + ))); + } + Ok((tx.txid, tx)) }) .collect() } + #[instrument(name = "bdk.transactions.load_all", skip(self), fields(n_rows))] + pub async fn load_all(&self) -> Result, bdk::Error> { + let mut count = 0usize; + let mut out = HashMap::new(); + let mut last_tx_id: Option = None; + + loop { + let mut stream = sqlx::query!( + r#" + SELECT tx_id, details_json + FROM bdk_transactions + WHERE keychain_id = $1 + AND deleted_at IS NULL + AND ($2::TEXT IS NULL OR tx_id > $2) + ORDER BY tx_id ASC + LIMIT $3"#, + self.keychain_id as KeychainId, + last_tx_id.as_deref(), + Self::LOAD_BATCH_SIZE, + ) + .fetch(&self.pool); + + let mut batch_rows = 0usize; + while let Some(row) = Self::next_stream_row(&mut stream).await? { + let txid = Self::parse_txid(&row.tx_id)?; + let details = Self::deserialize_details(row.details_json)?; + if txid != details.txid { + return Err(bdk::Error::Generic(format!( + "mismatched tx ids in bdk_transactions: tx_id column {} != details_json txid {}", + txid, details.txid + ))); + } + Self::record_loaded_row(&mut last_tx_id, &mut count, &mut batch_rows, row.tx_id); + out.insert(txid, details); + } + + if batch_rows == 0 { + break; + } + } + + tracing::Span::current().record("n_rows", count); + Ok(out) + } + #[instrument( name = "bdk.transactions.load_all_summaries", skip(self), @@ -225,51 +291,37 @@ impl Transactions { pub async fn load_all_summaries( &self, ) -> Result, bdk::Error> { - let rows = sqlx::query!( - r#" - SELECT tx_id, sent, height, - (details_json->>'received')::BIGINT AS "received?", - (details_json->>'fee')::BIGINT AS "fee?", - (details_json->'confirmation_time'->>'timestamp')::BIGINT AS "confirmation_timestamp?" - FROM bdk_transactions - WHERE keychain_id = $1 AND deleted_at IS NULL"#, - self.keychain_id as KeychainId, - ) - .fetch_all(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; + let mut count = 0usize; + let mut out = HashMap::new(); + let mut last_tx_id: Option = None; - tracing::Span::current().record("n_rows", rows.len()); - - fn to_u64(value: i64, field: &str) -> Result { - if value < 0 { - return Err(bdk::Error::Generic(format!( - "negative {field} value in bdk_transactions" - ))); - } - Ok(value as u64) - } - - fn to_u32(value: i32, field: &str) -> Result { - if value < 0 { - return Err(bdk::Error::Generic(format!( - "negative {field} value in bdk_transactions" - ))); - } - Ok(value as u32) - } + loop { + let mut stream = sqlx::query!( + r#" + SELECT tx_id, sent, height, + (details_json->>'received')::BIGINT AS "received?", + (details_json->>'fee')::BIGINT AS "fee?", + (details_json->'confirmation_time'->>'timestamp')::BIGINT AS "confirmation_timestamp?" + FROM bdk_transactions + WHERE keychain_id = $1 + AND deleted_at IS NULL + AND ($2::TEXT IS NULL OR tx_id > $2) + ORDER BY tx_id ASC + LIMIT $3"#, + self.keychain_id as KeychainId, + last_tx_id.as_deref(), + Self::LOAD_BATCH_SIZE, + ) + .fetch(&self.pool); - rows.into_iter() - .map(|row| { - let txid = row - .tx_id - .parse::() - .map_err(|e| bdk::Error::Generic(format!("invalid tx_id in db: {e}")))?; + let mut batch_rows = 0usize; + while let Some(row) = Self::next_stream_row(&mut stream).await? { + let txid = Self::parse_txid(&row.tx_id)?; let confirmation_time = match (row.height, row.confirmation_timestamp) { (Some(height), Some(timestamp)) => Some(BlockTime { - height: to_u32(height, "height")?, - timestamp: to_u64(timestamp, "confirmation timestamp")?, + height: Self::to_u32(height, "height")?, + timestamp: Self::to_u64(timestamp, "confirmation timestamp")?, }), _ => None, }; @@ -277,15 +329,23 @@ impl Transactions { let details = TransactionDetails { txid, transaction: None, - received: to_u64(row.received.unwrap_or_default(), "received")?, - sent: to_u64(row.sent, "sent")?, - fee: row.fee.map(|f| to_u64(f, "fee")).transpose()?, + received: Self::to_u64(row.received.unwrap_or_default(), "received")?, + sent: Self::to_u64(row.sent, "sent")?, + fee: row.fee.map(|f| Self::to_u64(f, "fee")).transpose()?, confirmation_time, }; - Ok((txid, details)) - }) - .collect() + Self::record_loaded_row(&mut last_tx_id, &mut count, &mut batch_rows, row.tx_id); + out.insert(txid, details); + } + + if batch_rows == 0 { + break; + } + } + + tracing::Span::current().record("n_rows", count); + Ok(out) } #[instrument(name = "bdk.transactions.find_unsynced_tx", skip(self), fields(n_rows))] diff --git a/src/bdk/pg/utxos.rs b/src/bdk/pg/utxos.rs index cc0a25c8..38812399 100644 --- a/src/bdk/pg/utxos.rs +++ b/src/bdk/pg/utxos.rs @@ -39,53 +39,6 @@ impl Utxos { Self { keychain_id, pool } } - #[instrument(name = "bdk.utxos.persist_all", skip_all)] - // Retained for non-transactional call sites and focused tests. - #[allow(dead_code)] - pub async fn persist_all(&self, utxos: Vec) -> Result<(), bdk::Error> { - const BATCH_SIZE: usize = 2000; - let batches = utxos.chunks(BATCH_SIZE); - - for batch in batches { - let serialized_batch = Self::serialize_batch(batch)?; - - let mut query_builder: QueryBuilder = QueryBuilder::new( - r#"INSERT INTO bdk_utxos - (keychain_id, tx_id, vout, utxo_json, is_spent)"#, - ); - - query_builder.push_values( - serialized_batch, - |mut builder, (tx_id, vout, utxo_json, is_spent)| { - builder.push_bind(Uuid::from(self.keychain_id)); - builder.push_bind(tx_id); - builder.push_bind(vout); - builder.push_bind(utxo_json); - builder.push_bind(is_spent); - }, - ); - - query_builder.push( - "ON CONFLICT (keychain_id, tx_id, vout) DO UPDATE \ - SET utxo_json = EXCLUDED.utxo_json,\ - is_spent = EXCLUDED.is_spent,\ - modified_at = NOW(),\ - deleted_at = NULL \ - WHERE bdk_utxos.utxo_json IS DISTINCT FROM EXCLUDED.utxo_json \ - OR bdk_utxos.is_spent IS DISTINCT FROM EXCLUDED.is_spent \ - OR bdk_utxos.deleted_at IS NOT NULL", - ); - - query_builder - .build() - .execute(&self.pool) - .await - .map_err(|e| bdk::Error::Generic(e.to_string()))?; - } - - Ok(()) - } - pub async fn persist_all_in_tx( &self, tx: &mut SqlxTransaction<'_, Postgres>,