diff --git a/src/bin/main.rs b/src/bin/main.rs index 9b964b07..40b5536c 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -68,7 +68,6 @@ async fn connect( mut wallet: Wallet, settings: &Settings, status: fn(&str), - block: bool, ) -> Wallet where F: SecureWalletFile + std::fmt::Debug, @@ -78,7 +77,6 @@ where &settings.state.to_string(), &settings.prover.to_string(), status, - block, ) .await; @@ -169,7 +167,10 @@ async fn exec() -> anyhow::Result<()> { _ => {} }; + // get the file version early let file_version = dat::read_file_version(&wallet_path); + // we register for background sync if its interactive + let mut is_interactive = false; // get our wallet ready let mut wallet: Wallet = match cmd { @@ -202,7 +203,7 @@ async fn exec() -> anyhow::Result<()> { Command::Restore { file } => { let (mut w, pwd) = match file { Some(file) => { - // if we restore and old version file make sure we + // if we restore an old version file make sure we // know the corrrect version before asking for the // password let file_version = dat::read_file_version(file)?; @@ -264,6 +265,7 @@ async fn exec() -> anyhow::Result<()> { } }, None => { + is_interactive = true; // load a wallet in interactive mode interactive::load_wallet(&wallet_path, &settings, file_version)? } @@ -275,10 +277,12 @@ async fn exec() -> anyhow::Result<()> { false => status::interactive, }; - // we block until we connect and sync if its not a interactive command - let block = cmd.is_some(); + wallet = connect(wallet, &settings, status_cb).await; - wallet = connect(wallet, &settings, status_cb, block).await; + if is_interactive { + // register for a async-background sync if we are in interactive + wallet.register_sync().await?; + } // run command match cmd { diff --git a/src/clients.rs b/src/clients.rs index b290ad01..49f8dc62 100644 --- a/src/clients.rs +++ b/src/clients.rs @@ -22,6 +22,7 @@ use phoenix_core::{Crossover, Fee, Note}; use poseidon_merkle::Opening as PoseidonOpening; use tokio::time::{sleep, Duration}; +use std::fmt::Debug; use std::path::Path; use std::sync::{Arc, Mutex}; @@ -32,7 +33,7 @@ use super::cache::Cache; use crate::rusk::{RuskHttpClient, RuskRequest}; use crate::store::LocalStore; -use crate::Error; +use crate::{Error, SecureWalletFile, Wallet}; const STCT_INPUT_SIZE: usize = Fee::SIZE + Crossover::SIZE @@ -187,12 +188,13 @@ struct InnerState { impl StateStore { /// Creates a new state instance. Should only be called once. - pub(crate) fn new( + pub(crate) fn new( client: RuskHttpClient, data_dir: &Path, - store: LocalStore, + wallet: &mut Wallet, status: fn(&str), ) -> Result { + let store = wallet.store.clone(); let cache = Arc::new(Cache::new(data_dir, &store, status)?); let inner = Mutex::new(InnerState { client, cache }); @@ -222,7 +224,8 @@ impl StateStore { let _ = sender.send("Syncing..".to_string()); if let Err(e) = - sync_db(&mut client, &store, cache.as_ref(), status).await + sync_db(&mut client, &store, cache.as_ref(), status, &[]) + .await { // Sender should not panic and if it does something is wrong // and we should abort only when there's an error because it @@ -240,14 +243,44 @@ impl StateStore { Ok(()) } + /// Only blocking sync adds new addresses in the wallet if it detects some + /// are missing. We call this during recovery and when we connect first to + /// the network to ensure that all addresses with funds are created. + /// + /// Wallet isn't send so we cannot add addresses to it in async task for + /// register_sync + /// + /// This gets called automatically when you call `wallet.connect` #[allow(clippy::await_holding_lock)] - pub async fn sync(&self) -> Result<(), Error> { + pub async fn sync(&self, wallet: &mut Wallet) -> Result<(), Error> + where + F: SecureWalletFile + Debug, + { let state = self.inner.lock().unwrap(); let status = self.status; let store = self.store.clone(); let mut client = state.client.clone(); - sync_db(&mut client, &store, state.cache.as_ref(), status).await + let existing_addresses = wallet.addresses().len(); + + let num_of_addresses = sync_db( + &mut client, + &store, + state.cache.as_ref(), + status, + wallet.addresses(), + ) + .await?; + + for _ in existing_addresses..=num_of_addresses { + // create addresses which are not there + wallet.new_address(); + } + + // save the new address count in the wallet file + wallet.save()?; + + Ok(()) } } diff --git a/src/clients/sync.rs b/src/clients/sync.rs index 170789f6..8ba892b5 100644 --- a/src/clients/sync.rs +++ b/src/clients/sync.rs @@ -11,7 +11,7 @@ use futures::StreamExt; use phoenix_core::transaction::{ArchivedTreeLeaf, TreeLeaf}; use crate::{ - clients::Cache, rusk::RuskHttpClient, store::LocalStore, Error, + clients::Cache, rusk::RuskHttpClient, store::LocalStore, Address, Error, RuskRequest, MAX_ADDRESSES, }; @@ -24,7 +24,8 @@ pub(crate) async fn sync_db( store: &LocalStore, cache: &Cache, status: fn(&str), -) -> Result<(), Error> { + existing_addresses: &[Address], +) -> Result { let addresses: Vec<_> = (0..MAX_ADDRESSES) .flat_map(|i| store.retrieve_ssk(i as u64)) .map(|ssk| { @@ -63,6 +64,8 @@ pub(crate) async fn sync_db( // This buffer is needed because `.bytes_stream();` introduce additional // spliting of chunks according to it's own buffer let mut buffer = vec![]; + // This stores the number of addresses we need to create after a sync-up + let mut addresses_to_create = 0; while let Some(http_chunk) = stream.next().await { buffer.extend_from_slice(&http_chunk?); @@ -75,8 +78,12 @@ pub(crate) async fn sync_db( last_height = std::cmp::max(last_height, block_height); - for (ssk, vk, psk) in addresses.iter() { + for (i, (ssk, vk, psk)) in addresses.iter().enumerate() { if vk.owns(¬e) { + if existing_addresses.get(i).is_none() { + addresses_to_create = i; + } + let note_data = (note, note.gen_nullifier(ssk)); cache.insert(psk, block_height, note_data)?; @@ -92,5 +99,5 @@ pub(crate) async fn sync_db( cache.insert_last_height(last_height)?; - Ok(()) + Ok(addresses_to_create) } diff --git a/src/wallet.rs b/src/wallet.rs index c74a7438..764def68 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -61,7 +61,7 @@ use crate::store; pub struct Wallet { wallet: Option>, addresses: Vec
, - store: LocalStore, + pub(crate) store: LocalStore, file: Option, file_version: Option, status: fn(status: &str), @@ -235,7 +235,6 @@ impl Wallet { rusk_addr: S, prov_addr: S, status: fn(&str), - block: bool, ) -> Result<(), Error> where S: Into, @@ -256,29 +255,17 @@ impl Wallet { } }; - let (sync_tx, sync_rx) = flume::unbounded::(); - // create a state client - let state = StateStore::new( - rusk.state, - &cache_dir, - self.store.clone(), - status, - )?; - - if block { - state.sync().await?; - } else { - state.register_sync(sync_tx).await?; - } + let state = StateStore::new(rusk.state, &cache_dir, self, status)?; + // Do a blocking sync as we connect, register for background sync + // externally + state.sync(self).await?; // create wallet instance self.wallet = Some(WalletCore::new(self.store.clone(), state, prover)); // set our own status callback self.status = status; - // set sync reciever to notify successful sync - self.sync_rx = Some(sync_rx); Ok(()) } @@ -687,6 +674,22 @@ impl Wallet { Err(Error::WalletFileMissing) } } + + /// Helper function to register for async-sync outside of connect + pub async fn register_sync(&mut self) -> Result<(), Error> { + if let Some(core_wallet) = &self.wallet { + let state = core_wallet.state(); + let (sync_tx, sync_rx) = flume::unbounded::(); + + state.register_sync(sync_tx).await?; + + self.sync_rx = Some(sync_rx); + + Ok(()) + } else { + Err(Error::Offline) + } + } } /// This structs represent a Note decoded enriched with useful chain information