Skip to content
This repository was archived by the owner on Dec 16, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/bin/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ async fn connect<F>(
mut wallet: Wallet<F>,
settings: &Settings,
status: fn(&str),
block: bool,
) -> Wallet<F>
where
F: SecureWalletFile + std::fmt::Debug,
Expand All @@ -78,7 +77,6 @@ where
&settings.state.to_string(),
&settings.prover.to_string(),
status,
block,
)
.await;

Expand Down Expand Up @@ -169,7 +167,10 @@ async fn exec() -> anyhow::Result<()> {
_ => {}
};

// get the file version early
let file_version = dat::read_file_version(&wallet_path);
// we register for background sync if its interactive
let mut is_interactive = false;

// get our wallet ready
let mut wallet: Wallet<WalletFile> = match cmd {
Expand Down Expand Up @@ -202,7 +203,7 @@ async fn exec() -> anyhow::Result<()> {
Command::Restore { file } => {
let (mut w, pwd) = match file {
Some(file) => {
// if we restore and old version file make sure we
// if we restore an old version file make sure we
// know the corrrect version before asking for the
// password
let file_version = dat::read_file_version(file)?;
Expand Down Expand Up @@ -264,6 +265,7 @@ async fn exec() -> anyhow::Result<()> {
}
},
None => {
is_interactive = true;
// load a wallet in interactive mode
interactive::load_wallet(&wallet_path, &settings, file_version)?
}
Expand All @@ -275,10 +277,12 @@ async fn exec() -> anyhow::Result<()> {
false => status::interactive,
};

// we block until we connect and sync if its not a interactive command
let block = cmd.is_some();
wallet = connect(wallet, &settings, status_cb).await;

wallet = connect(wallet, &settings, status_cb, block).await;
if is_interactive {
// register for a async-background sync if we are in interactive
wallet.register_sync().await?;
}

// run command
match cmd {
Expand Down
45 changes: 39 additions & 6 deletions src/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use phoenix_core::{Crossover, Fee, Note};
use poseidon_merkle::Opening as PoseidonOpening;
use tokio::time::{sleep, Duration};

use std::fmt::Debug;
use std::path::Path;
use std::sync::{Arc, Mutex};

Expand All @@ -32,7 +33,7 @@ use super::cache::Cache;

use crate::rusk::{RuskHttpClient, RuskRequest};
use crate::store::LocalStore;
use crate::Error;
use crate::{Error, SecureWalletFile, Wallet};

const STCT_INPUT_SIZE: usize = Fee::SIZE
+ Crossover::SIZE
Expand Down Expand Up @@ -187,12 +188,13 @@ struct InnerState {

impl StateStore {
/// Creates a new state instance. Should only be called once.
pub(crate) fn new(
pub(crate) fn new<F: SecureWalletFile + Debug>(
client: RuskHttpClient,
data_dir: &Path,
store: LocalStore,
wallet: &mut Wallet<F>,
status: fn(&str),
) -> Result<Self, Error> {
let store = wallet.store.clone();
let cache = Arc::new(Cache::new(data_dir, &store, status)?);
let inner = Mutex::new(InnerState { client, cache });

Expand Down Expand Up @@ -222,7 +224,8 @@ impl StateStore {
let _ = sender.send("Syncing..".to_string());

if let Err(e) =
sync_db(&mut client, &store, cache.as_ref(), status).await
sync_db(&mut client, &store, cache.as_ref(), status, &[])
.await
{
// Sender should not panic and if it does something is wrong
// and we should abort only when there's an error because it
Expand All @@ -240,14 +243,44 @@ impl StateStore {
Ok(())
}

/// Only blocking sync adds new addresses in the wallet if it detects some
/// are missing. We call this during recovery and when we connect first to
/// the network to ensure that all addresses with funds are created.
///
/// Wallet isn't send so we cannot add addresses to it in async task for
/// register_sync
///
/// This gets called automatically when you call `wallet.connect`
#[allow(clippy::await_holding_lock)]
pub async fn sync(&self) -> Result<(), Error> {
pub async fn sync<F>(&self, wallet: &mut Wallet<F>) -> Result<(), Error>
where
F: SecureWalletFile + Debug,
{
let state = self.inner.lock().unwrap();
let status = self.status;
let store = self.store.clone();
let mut client = state.client.clone();

sync_db(&mut client, &store, state.cache.as_ref(), status).await
let existing_addresses = wallet.addresses().len();

let num_of_addresses = sync_db(
&mut client,
&store,
state.cache.as_ref(),
status,
wallet.addresses(),
)
.await?;

for _ in existing_addresses..=num_of_addresses {
// create addresses which are not there
wallet.new_address();
}

// save the new address count in the wallet file
wallet.save()?;

Ok(())
}
}

Expand Down
15 changes: 11 additions & 4 deletions src/clients/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use futures::StreamExt;
use phoenix_core::transaction::{ArchivedTreeLeaf, TreeLeaf};

use crate::{
clients::Cache, rusk::RuskHttpClient, store::LocalStore, Error,
clients::Cache, rusk::RuskHttpClient, store::LocalStore, Address, Error,
RuskRequest, MAX_ADDRESSES,
};

Expand All @@ -24,7 +24,8 @@ pub(crate) async fn sync_db(
store: &LocalStore,
cache: &Cache,
status: fn(&str),
) -> Result<(), Error> {
existing_addresses: &[Address],
) -> Result<usize, Error> {
let addresses: Vec<_> = (0..MAX_ADDRESSES)
.flat_map(|i| store.retrieve_ssk(i as u64))
.map(|ssk| {
Expand Down Expand Up @@ -63,6 +64,8 @@ pub(crate) async fn sync_db(
// This buffer is needed because `.bytes_stream();` introduce additional
// spliting of chunks according to it's own buffer
let mut buffer = vec![];
// This stores the number of addresses we need to create after a sync-up
let mut addresses_to_create = 0;

while let Some(http_chunk) = stream.next().await {
buffer.extend_from_slice(&http_chunk?);
Expand All @@ -75,8 +78,12 @@ pub(crate) async fn sync_db(

last_height = std::cmp::max(last_height, block_height);

for (ssk, vk, psk) in addresses.iter() {
for (i, (ssk, vk, psk)) in addresses.iter().enumerate() {
if vk.owns(&note) {
if existing_addresses.get(i).is_none() {
addresses_to_create = i;
}

let note_data = (note, note.gen_nullifier(ssk));
cache.insert(psk, block_height, note_data)?;

Expand All @@ -92,5 +99,5 @@ pub(crate) async fn sync_db(

cache.insert_last_height(last_height)?;

Ok(())
Ok(addresses_to_create)
}
39 changes: 21 additions & 18 deletions src/wallet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ use crate::store;
pub struct Wallet<F: SecureWalletFile + Debug> {
wallet: Option<WalletCore<LocalStore, StateStore, Prover>>,
addresses: Vec<Address>,
store: LocalStore,
pub(crate) store: LocalStore,
file: Option<F>,
file_version: Option<DatFileVersion>,
status: fn(status: &str),
Expand Down Expand Up @@ -235,7 +235,6 @@ impl<F: SecureWalletFile + Debug> Wallet<F> {
rusk_addr: S,
prov_addr: S,
status: fn(&str),
block: bool,
) -> Result<(), Error>
where
S: Into<String>,
Expand All @@ -256,29 +255,17 @@ impl<F: SecureWalletFile + Debug> Wallet<F> {
}
};

let (sync_tx, sync_rx) = flume::unbounded::<String>();

// create a state client
let state = StateStore::new(
rusk.state,
&cache_dir,
self.store.clone(),
status,
)?;

if block {
state.sync().await?;
} else {
state.register_sync(sync_tx).await?;
}
let state = StateStore::new(rusk.state, &cache_dir, self, status)?;
// Do a blocking sync as we connect, register for background sync
// externally
state.sync(self).await?;

// create wallet instance
self.wallet = Some(WalletCore::new(self.store.clone(), state, prover));

// set our own status callback
self.status = status;
// set sync reciever to notify successful sync
self.sync_rx = Some(sync_rx);

Ok(())
}
Expand Down Expand Up @@ -687,6 +674,22 @@ impl<F: SecureWalletFile + Debug> Wallet<F> {
Err(Error::WalletFileMissing)
}
}

/// Helper function to register for async-sync outside of connect
pub async fn register_sync(&mut self) -> Result<(), Error> {
if let Some(core_wallet) = &self.wallet {
let state = core_wallet.state();
let (sync_tx, sync_rx) = flume::unbounded::<String>();

state.register_sync(sync_tx).await?;

self.sync_rx = Some(sync_rx);

Ok(())
} else {
Err(Error::Offline)
}
}
}

/// This structs represent a Note decoded enriched with useful chain information
Expand Down