From ffd878581ab0ccc2c57069813a4ca4af0f996d76 Mon Sep 17 00:00:00 2001 From: Daksh Date: Tue, 22 Aug 2023 14:29:06 -0400 Subject: [PATCH 1/3] Create addresses which are found in the network but are not in wallet-cli --- src/bin/main.rs | 16 ++++++++++------ src/clients.rs | 42 ++++++++++++++++++++++++++++++++++++------ src/clients/sync.rs | 19 ++++++++++++++----- src/wallet.rs | 39 +++++++++++++++++++++------------------ 4 files changed, 81 insertions(+), 35 deletions(-) diff --git a/src/bin/main.rs b/src/bin/main.rs index 9b964b07..40b5536c 100644 --- a/src/bin/main.rs +++ b/src/bin/main.rs @@ -68,7 +68,6 @@ async fn connect( mut wallet: Wallet, settings: &Settings, status: fn(&str), - block: bool, ) -> Wallet where F: SecureWalletFile + std::fmt::Debug, @@ -78,7 +77,6 @@ where &settings.state.to_string(), &settings.prover.to_string(), status, - block, ) .await; @@ -169,7 +167,10 @@ async fn exec() -> anyhow::Result<()> { _ => {} }; + // get the file version early let file_version = dat::read_file_version(&wallet_path); + // we register for background sync if its interactive + let mut is_interactive = false; // get our wallet ready let mut wallet: Wallet = match cmd { @@ -202,7 +203,7 @@ async fn exec() -> anyhow::Result<()> { Command::Restore { file } => { let (mut w, pwd) = match file { Some(file) => { - // if we restore and old version file make sure we + // if we restore an old version file make sure we // know the corrrect version before asking for the // password let file_version = dat::read_file_version(file)?; @@ -264,6 +265,7 @@ async fn exec() -> anyhow::Result<()> { } }, None => { + is_interactive = true; // load a wallet in interactive mode interactive::load_wallet(&wallet_path, &settings, file_version)? } @@ -275,10 +277,12 @@ async fn exec() -> anyhow::Result<()> { false => status::interactive, }; - // we block until we connect and sync if its not a interactive command - let block = cmd.is_some(); + wallet = connect(wallet, &settings, status_cb).await; - wallet = connect(wallet, &settings, status_cb, block).await; + if is_interactive { + // register for a async-background sync if we are in interactive + wallet.register_sync().await?; + } // run command match cmd { diff --git a/src/clients.rs b/src/clients.rs index b290ad01..6b12730b 100644 --- a/src/clients.rs +++ b/src/clients.rs @@ -22,6 +22,7 @@ use phoenix_core::{Crossover, Fee, Note}; use poseidon_merkle::Opening as PoseidonOpening; use tokio::time::{sleep, Duration}; +use std::fmt::Debug; use std::path::Path; use std::sync::{Arc, Mutex}; @@ -32,7 +33,7 @@ use super::cache::Cache; use crate::rusk::{RuskHttpClient, RuskRequest}; use crate::store::LocalStore; -use crate::Error; +use crate::{Address, Error, SecureWalletFile, Wallet}; const STCT_INPUT_SIZE: usize = Fee::SIZE + Crossover::SIZE @@ -187,12 +188,13 @@ struct InnerState { impl StateStore { /// Creates a new state instance. Should only be called once. - pub(crate) fn new( + pub(crate) fn new( client: RuskHttpClient, data_dir: &Path, - store: LocalStore, + wallet: &mut Wallet, status: fn(&str), ) -> Result { + let store = wallet.store.clone(); let cache = Arc::new(Cache::new(data_dir, &store, status)?); let inner = Mutex::new(InnerState { client, cache }); @@ -222,7 +224,8 @@ impl StateStore { let _ = sender.send("Syncing..".to_string()); if let Err(e) = - sync_db(&mut client, &store, cache.as_ref(), status).await + sync_db(&mut client, &store, cache.as_ref(), status, None) + .await { // Sender should not panic and if it does something is wrong // and we should abort only when there's an error because it @@ -240,14 +243,41 @@ impl StateStore { Ok(()) } + /// Only blocking sync adds new addresses in the wallet if it detects some + /// are missing. We call this during recovery and when we connect first to + /// the network to ensure that all addresses with funds are created. + /// + /// Wallet isn't send so we cannot add addresses to it in async task for + /// register_sync + /// + /// This gets called automatically when you call `wallet.connect` #[allow(clippy::await_holding_lock)] - pub async fn sync(&self) -> Result<(), Error> { + pub async fn sync(&self, wallet: &mut Wallet) -> Result<(), Error> + where + F: SecureWalletFile + Debug, + { let state = self.inner.lock().unwrap(); let status = self.status; let store = self.store.clone(); let mut client = state.client.clone(); - sync_db(&mut client, &store, state.cache.as_ref(), status).await + let num_of_addresses = sync_db( + &mut client, + &store, + state.cache.as_ref(), + status, + Some(wallet.addresses()), + ) + .await?; + + for _ in 0..num_of_addresses { + // create addresses which are not there + wallet.new_address(); + } + // save the new address count in the wallet file + wallet.save()?; + + Ok(()) } } diff --git a/src/clients/sync.rs b/src/clients/sync.rs index 170789f6..1edca473 100644 --- a/src/clients/sync.rs +++ b/src/clients/sync.rs @@ -4,14 +4,14 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. -use std::mem::size_of; +use std::{mem::size_of, ops::AddAssign}; use dusk_wallet_core::Store; use futures::StreamExt; use phoenix_core::transaction::{ArchivedTreeLeaf, TreeLeaf}; use crate::{ - clients::Cache, rusk::RuskHttpClient, store::LocalStore, Error, + clients::Cache, rusk::RuskHttpClient, store::LocalStore, Address, Error, RuskRequest, MAX_ADDRESSES, }; @@ -24,7 +24,8 @@ pub(crate) async fn sync_db( store: &LocalStore, cache: &Cache, status: fn(&str), -) -> Result<(), Error> { + existing_addresses: Option<&[Address]>, +) -> Result { let addresses: Vec<_> = (0..MAX_ADDRESSES) .flat_map(|i| store.retrieve_ssk(i as u64)) .map(|ssk| { @@ -63,6 +64,8 @@ pub(crate) async fn sync_db( // This buffer is needed because `.bytes_stream();` introduce additional // spliting of chunks according to it's own buffer let mut buffer = vec![]; + // This stores the number of addresses we need to create after a sync-up + let mut addresses_to_create = 0u8; while let Some(http_chunk) = stream.next().await { buffer.extend_from_slice(&http_chunk?); @@ -75,8 +78,14 @@ pub(crate) async fn sync_db( last_height = std::cmp::max(last_height, block_height); - for (ssk, vk, psk) in addresses.iter() { + for (i, (ssk, vk, psk)) in addresses.iter().enumerate() { if vk.owns(¬e) { + if let Some(existing_addresses) = existing_addresses { + if existing_addresses.get(i).is_none() { + addresses_to_create.add_assign(1); + } + } + let note_data = (note, note.gen_nullifier(ssk)); cache.insert(psk, block_height, note_data)?; @@ -92,5 +101,5 @@ pub(crate) async fn sync_db( cache.insert_last_height(last_height)?; - Ok(()) + Ok(addresses_to_create) } diff --git a/src/wallet.rs b/src/wallet.rs index c74a7438..764def68 100644 --- a/src/wallet.rs +++ b/src/wallet.rs @@ -61,7 +61,7 @@ use crate::store; pub struct Wallet { wallet: Option>, addresses: Vec
, - store: LocalStore, + pub(crate) store: LocalStore, file: Option, file_version: Option, status: fn(status: &str), @@ -235,7 +235,6 @@ impl Wallet { rusk_addr: S, prov_addr: S, status: fn(&str), - block: bool, ) -> Result<(), Error> where S: Into, @@ -256,29 +255,17 @@ impl Wallet { } }; - let (sync_tx, sync_rx) = flume::unbounded::(); - // create a state client - let state = StateStore::new( - rusk.state, - &cache_dir, - self.store.clone(), - status, - )?; - - if block { - state.sync().await?; - } else { - state.register_sync(sync_tx).await?; - } + let state = StateStore::new(rusk.state, &cache_dir, self, status)?; + // Do a blocking sync as we connect, register for background sync + // externally + state.sync(self).await?; // create wallet instance self.wallet = Some(WalletCore::new(self.store.clone(), state, prover)); // set our own status callback self.status = status; - // set sync reciever to notify successful sync - self.sync_rx = Some(sync_rx); Ok(()) } @@ -687,6 +674,22 @@ impl Wallet { Err(Error::WalletFileMissing) } } + + /// Helper function to register for async-sync outside of connect + pub async fn register_sync(&mut self) -> Result<(), Error> { + if let Some(core_wallet) = &self.wallet { + let state = core_wallet.state(); + let (sync_tx, sync_rx) = flume::unbounded::(); + + state.register_sync(sync_tx).await?; + + self.sync_rx = Some(sync_rx); + + Ok(()) + } else { + Err(Error::Offline) + } + } } /// This structs represent a Note decoded enriched with useful chain information From beabe21b10e4587ebcdbfc0a1f326c2b2ed112c5 Mon Sep 17 00:00:00 2001 From: Daksh Date: Tue, 22 Aug 2023 14:41:55 -0400 Subject: [PATCH 2/3] Fix clippy error --- src/clients.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/clients.rs b/src/clients.rs index 6b12730b..42ecf87a 100644 --- a/src/clients.rs +++ b/src/clients.rs @@ -33,7 +33,7 @@ use super::cache::Cache; use crate::rusk::{RuskHttpClient, RuskRequest}; use crate::store::LocalStore; -use crate::{Address, Error, SecureWalletFile, Wallet}; +use crate::{Error, SecureWalletFile, Wallet}; const STCT_INPUT_SIZE: usize = Fee::SIZE + Crossover::SIZE From e66fb4779b83c724211e4f74ce2cee0bbedb3874 Mon Sep 17 00:00:00 2001 From: Daksh Date: Fri, 25 Aug 2023 16:09:31 -0400 Subject: [PATCH 3/3] Don't count previous addresses --- src/clients.rs | 9 ++++++--- src/clients/sync.rs | 14 ++++++-------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/src/clients.rs b/src/clients.rs index 42ecf87a..49f8dc62 100644 --- a/src/clients.rs +++ b/src/clients.rs @@ -224,7 +224,7 @@ impl StateStore { let _ = sender.send("Syncing..".to_string()); if let Err(e) = - sync_db(&mut client, &store, cache.as_ref(), status, None) + sync_db(&mut client, &store, cache.as_ref(), status, &[]) .await { // Sender should not panic and if it does something is wrong @@ -261,19 +261,22 @@ impl StateStore { let store = self.store.clone(); let mut client = state.client.clone(); + let existing_addresses = wallet.addresses().len(); + let num_of_addresses = sync_db( &mut client, &store, state.cache.as_ref(), status, - Some(wallet.addresses()), + wallet.addresses(), ) .await?; - for _ in 0..num_of_addresses { + for _ in existing_addresses..=num_of_addresses { // create addresses which are not there wallet.new_address(); } + // save the new address count in the wallet file wallet.save()?; diff --git a/src/clients/sync.rs b/src/clients/sync.rs index 1edca473..8ba892b5 100644 --- a/src/clients/sync.rs +++ b/src/clients/sync.rs @@ -4,7 +4,7 @@ // // Copyright (c) DUSK NETWORK. All rights reserved. -use std::{mem::size_of, ops::AddAssign}; +use std::mem::size_of; use dusk_wallet_core::Store; use futures::StreamExt; @@ -24,8 +24,8 @@ pub(crate) async fn sync_db( store: &LocalStore, cache: &Cache, status: fn(&str), - existing_addresses: Option<&[Address]>, -) -> Result { + existing_addresses: &[Address], +) -> Result { let addresses: Vec<_> = (0..MAX_ADDRESSES) .flat_map(|i| store.retrieve_ssk(i as u64)) .map(|ssk| { @@ -65,7 +65,7 @@ pub(crate) async fn sync_db( // spliting of chunks according to it's own buffer let mut buffer = vec![]; // This stores the number of addresses we need to create after a sync-up - let mut addresses_to_create = 0u8; + let mut addresses_to_create = 0; while let Some(http_chunk) = stream.next().await { buffer.extend_from_slice(&http_chunk?); @@ -80,10 +80,8 @@ pub(crate) async fn sync_db( for (i, (ssk, vk, psk)) in addresses.iter().enumerate() { if vk.owns(¬e) { - if let Some(existing_addresses) = existing_addresses { - if existing_addresses.get(i).is_none() { - addresses_to_create.add_assign(1); - } + if existing_addresses.get(i).is_none() { + addresses_to_create = i; } let note_data = (note, note.gen_nullifier(ssk));