Skip to content
This repository was archived by the owner on Aug 31, 2023. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
244 changes: 129 additions & 115 deletions rust/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ use crate::offer::Offer;
use crate::wallet;
use crate::wallet::Balance;
use crate::wallet::LightningTransaction;
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Result;
use flutter_rust_bridge::StreamSink;
Expand All @@ -22,13 +21,14 @@ use lightning_invoice::Invoice;
use lightning_invoice::InvoiceDescription;
use rust_decimal::prelude::ToPrimitive;
use rust_decimal::Decimal;
use state::Storage;
use std::ops::Add;
use std::path::Path;
use std::str::FromStr;
use std::time::SystemTime;
use time::Duration;
pub use time::OffsetDateTime;
use tokio::try_join;
use tokio::runtime::Runtime;

pub struct Address {
pub address: String,
Expand Down Expand Up @@ -168,91 +168,114 @@ impl WalletInfo {
Ok(tx_history)
}
}
/// Lazily creates a multi threaded runtime with the the number of worker threads corresponding to
/// the number of available cores.
fn runtime() -> Result<&'static Runtime> {
static RUNTIME: Storage<Runtime> = Storage::new();

#[tokio::main(flavor = "current_thread")]
pub async fn refresh_wallet_info() -> Result<WalletInfo> {
wallet::sync()?;
WalletInfo::build_wallet_info().await
if RUNTIME.try_get().is_none() {
let runtime = Runtime::new()?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 I think we should expect here. When we didn't have to deal with building the Runtime explicitly (when we were using #[tokio::main], we must have been doing it implicitly anyway.

This can only be called once, so it is unfortunate that we need to deal with a possibly fallible function because of that. Furthermore, if we can't build the Runtime we cannot easily recover from this AFAIK. The best strategy in such a situation would probably be to crash the application anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this can be a problem if it makes us return Results in places where it wasn't already necessary, but I think all the functions which use this were already returning Results in the first place. So it doesn't matter so much in practice.

RUNTIME.set(runtime);
}
Comment on lines +176 to +179
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

❓ Can't we make get_or_set work. I think that's what the library would want you to use here.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried, but I wasn't able to get it to work with a Result - do you have a suggestion how this could work?


Ok(RUNTIME.get())
}

pub fn refresh_wallet_info() -> Result<WalletInfo> {
runtime()?.block_on(async {
wallet::sync()?;
WalletInfo::build_wallet_info().await
})
}

#[tokio::main(flavor = "current_thread")]
pub async fn run(stream: StreamSink<Event>, app_dir: String) -> Result<()> {
pub fn run(stream: StreamSink<Event>, app_dir: String) -> Result<()> {
let network = config::network();
anyhow::ensure!(!app_dir.is_empty(), "app_dir must not be empty");
stream.add(Event::Init(format!("Initialising {network} wallet")));
wallet::init_wallet(Path::new(app_dir.as_str()))?;

stream.add(Event::Init("Initialising database".to_string()));
db::init_db(
&Path::new(app_dir.as_str())
.join(network.to_string())
.join("taker.sqlite"),
)
.await?;

stream.add(Event::Init("Starting full ldk node".to_string()));
let background_processor = wallet::run_ldk().await?;

stream.add(Event::Init("Fetching an offer".to_string()));
stream.add(Event::Offer(offer::get_offer().await.ok()));

stream.add(Event::Init("Fetching your balance".to_string()));
stream.add(Event::WalletInfo(
WalletInfo::build_wallet_info().await.ok(),
));
stream.add(Event::Init("Checking channel state".to_string()));
stream.add(Event::ChannelState(get_channel_state()));

stream.add(Event::Init("Ready".to_string()));
stream.add(Event::Ready);

// spawn a connection task keeping the connection with the maker alive.
let peer_manager = wallet::get_peer_manager()?;
let connection_handle = connection::spawn(peer_manager);

// sync offers every 5 seconds
let offer_handle = offer::spawn(stream.clone());

// sync wallet every 60 seconds
let wallet_sync_handle = tokio::spawn(async {
loop {
wallet::sync().unwrap_or_else(|e| tracing::error!(?e, "Failed to sync wallet"));
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}
});

// sync wallet info every 10 seconds
let wallet_info_stream = stream.clone();
let wallet_info_sync_handle = tokio::spawn(async move {
loop {
match WalletInfo::build_wallet_info().await {
Ok(wallet_info) => {
let _ = wallet_info_stream.add(Event::WalletInfo(Some(wallet_info)));
let runtime = runtime()?;
runtime.block_on(async move {
stream.add(Event::Init(format!("Initialising {network} wallet")));
wallet::init_wallet(Path::new(app_dir.as_str()))?;

stream.add(Event::Init("Initialising database".to_string()));
db::init_db(
&Path::new(app_dir.as_str())
.join(network.to_string())
.join("taker.sqlite"),
)
.await?;

stream.add(Event::Init("Starting full ldk node".to_string()));
let background_processor = wallet::run_ldk()?;

stream.add(Event::Init("Fetching an offer".to_string()));
stream.add(Event::Offer(offer::get_offer().await.ok()));

stream.add(Event::Init("Fetching your balance".to_string()));
stream.add(Event::WalletInfo(
WalletInfo::build_wallet_info().await.ok(),
));
stream.add(Event::Init("Checking channel state".to_string()));
stream.add(Event::ChannelState(get_channel_state()));

stream.add(Event::Init("Ready".to_string()));
stream.add(Event::Ready);
Comment on lines +196 to +221
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤡 I am okay with this pragmatic approach where we just explicitly call all of these once so that we can return Ready afterwards and let the UI know that we are done with the startup. But eventually it might be nice to not need to explicitly send a Ready event.

Perhaps the frontend could expect a few events to be emitted before considering the app ready, without needing to be told.


// spawn a connection task keeping the connection with the maker alive.
runtime.spawn(async move {
let peer_info = config::maker_peer_info();
loop {
let peer_manager = wallet::get_peer_manager();
Comment on lines +225 to +227
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 Is this possible? That way we only call it once.

Suggested change
let peer_info = config::maker_peer_info();
loop {
let peer_manager = wallet::get_peer_manager();
let peer_info = config::maker_peer_info();
let peer_manager = wallet::get_peer_manager();
loop {

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had it like this before, but I think clippy complained about the peer_manager being moved or something.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can probably add .clone() to it to bypass clippy

connection::connect(peer_manager, peer_info).await;
// add a delay before retrying to connect
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});
Comment on lines +223 to +232
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙃 We could extract the bodies of the spawned tasks into functions. It might aid readability.

Suggested change
// spawn a connection task keeping the connection with the maker alive.
runtime.spawn(async move {
let peer_info = config::maker_peer_info();
loop {
let peer_manager = wallet::get_peer_manager();
connection::connect(peer_manager, peer_info).await;
// add a delay before retrying to connect
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});
runtime.spawn(stay_connected_to_maker());
// ...
fn async stay_connected_to_maker() {
let peer_info = config::maker_peer_info();
loop {
let peer_manager = wallet::get_peer_manager();
connection::connect(peer_manager, peer_info).await;
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}


let offer_stream = stream.clone();
runtime.spawn(async move {
loop {
offer_stream.add(Event::Offer(offer::get_offer().await.ok()));
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});

runtime.spawn(async {
loop {
wallet::sync().unwrap_or_else(|e| tracing::error!(?e, "Failed to sync wallet"));
tokio::time::sleep(std::time::Duration::from_secs(60)).await;
}
});

let wallet_info_stream = stream.clone();
runtime.spawn(async move {
loop {
match WalletInfo::build_wallet_info().await {
Ok(wallet_info) => {
let _ = wallet_info_stream.add(Event::WalletInfo(Some(wallet_info)));
}
Err(e) => tracing::error!(?e, "Failed to build wallet info"),
}
Err(e) => tracing::error!(?e, "Failed to build wallet info"),
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
}
});

// sync channel state every 5 seconds
let channel_state_stream = stream.clone();
let channel_state_handle = tokio::spawn(async move {
loop {
channel_state_stream.add(Event::ChannelState(get_channel_state()));
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});
});

try_join!(
connection_handle,
offer_handle,
wallet_sync_handle,
wallet_info_sync_handle,
channel_state_handle,
)?;
let channel_state_stream = stream.clone();
runtime.spawn(async move {
loop {
channel_state_stream.add(Event::ChannelState(get_channel_state()));
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
}
});

background_processor.join().map_err(|e| anyhow!(e))
runtime.spawn_blocking(move || {
// background processor joins on a sync thread, meaning that join here will block a
// full thread, which is dis-encouraged to do in async code.
if let Err(err) = background_processor.join() {
tracing::error!(?err, "Background processor stopped unexpected");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔧 It's an adverb.

Suggested change
tracing::error!(?err, "Background processor stopped unexpected");
tracing::error!(?err, "Background processor stopped unexpectedly");

}
});
Ok(())
})
}

pub fn get_balance() -> Result<Balance> {
Expand All @@ -279,16 +302,18 @@ pub fn network() -> SyncReturn<String> {
SyncReturn(config::network().to_string())
}

#[tokio::main(flavor = "current_thread")]
pub async fn open_channel(taker_amount: u64) -> Result<()> {
let peer_info = config::maker_peer_info();
wallet::open_channel(peer_info, taker_amount).await
pub fn open_channel(taker_amount: u64) -> Result<()> {
runtime()?.block_on(async {
let peer_info = config::maker_peer_info();
wallet::open_channel(peer_info, taker_amount).await
})
}

#[tokio::main(flavor = "current_thread")]
pub async fn close_channel() -> Result<()> {
let peer_info = config::maker_peer_info();
wallet::close_channel(peer_info.pubkey, false).await
pub fn close_channel() -> Result<()> {
runtime()?.block_on(async {
let peer_info = config::maker_peer_info();
wallet::close_channel(peer_info.pubkey, false).await
})
}

pub fn send_to_address(address: String, amount: u64) -> Result<String> {
Expand All @@ -301,42 +326,32 @@ pub fn send_to_address(address: String, amount: u64) -> Result<String> {
Ok(txid)
}

#[tokio::main(flavor = "current_thread")]
pub async fn list_cfds() -> Result<Vec<Cfd>> {
let mut conn = db::acquire().await?;
cfd::load_cfds(&mut conn).await
pub fn list_cfds() -> Result<Vec<Cfd>> {
runtime()?.block_on(async {
let mut conn = db::acquire().await?;
cfd::load_cfds(&mut conn).await
})
}

#[tokio::main(flavor = "current_thread")]
pub async fn open_cfd(order: Order) -> Result<()> {
cfd::open(&order).await
pub fn open_cfd(order: Order) -> Result<()> {
runtime()?.block_on(async { cfd::open(&order).await })
}

#[tokio::main(flavor = "current_thread")]
pub async fn call_faucet(address: String) -> Result<String> {
pub fn call_faucet(address: String) -> Result<String> {
anyhow::ensure!(
!address.is_empty(),
"Cannot call faucet because of empty address"
);
faucet::call_faucet(address).await
runtime()?.block_on(async { faucet::call_faucet(address).await })
}

#[tokio::main(flavor = "current_thread")]
pub async fn get_fee_recommendation() -> Result<u32> {
let fee_recommendation = wallet::get_fee_recommendation()?;

Ok(fee_recommendation)
pub fn get_fee_recommendation() -> Result<u32> {
wallet::get_fee_recommendation()
}

/// Settles a CFD with the given taker and maker amounts in sats
#[tokio::main(flavor = "current_thread")]
pub async fn settle_cfd(cfd: Cfd, offer: Offer) -> Result<()> {
cfd::settle(&cfd, &offer).await
}

#[tokio::main(flavor = "current_thread")]
pub async fn get_lightning_tx_history() -> Result<Vec<LightningTransaction>> {
wallet::get_lightning_history().await
pub fn settle_cfd(cfd: Cfd, offer: Offer) -> Result<()> {
runtime()?.block_on(async { cfd::settle(&cfd, &offer).await })
}

/// Initialise logging infrastructure for Rust
Expand All @@ -350,19 +365,18 @@ pub fn get_seed_phrase() -> Vec<String> {
wallet::get_seed_phrase()
}

#[tokio::main(flavor = "current_thread")]
pub async fn send_lightning_payment(invoice: String) -> Result<()> {
pub fn send_lightning_payment(invoice: String) -> Result<()> {
anyhow::ensure!(!invoice.is_empty(), "Cannot pay empty invoice");
wallet::send_lightning_payment(&invoice).await
runtime()?.block_on(async { wallet::send_lightning_payment(&invoice).await })
}

#[tokio::main(flavor = "current_thread")]
pub async fn create_lightning_invoice(
pub fn create_lightning_invoice(
amount_sats: u64,
expiry_secs: u32,
description: String,
) -> Result<String> {
wallet::create_invoice(amount_sats, expiry_secs, description).await
runtime()?
.block_on(async { wallet::create_invoice(amount_sats, expiry_secs, description).await })
}

// Note, this implementation has to be on the api level as otherwise it wouldn't be generated
Expand Down
56 changes: 24 additions & 32 deletions rust/src/connection.rs
Original file line number Diff line number Diff line change
@@ -1,44 +1,36 @@
use crate::config;
use crate::lightning::PeerInfo;
use crate::lightning::PeerManager;
use bdk::bitcoin::secp256k1::PublicKey;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;

pub fn spawn(peer_manager: Arc<PeerManager>) -> JoinHandle<()> {
// keep connection with maker alive!
tokio::spawn(async move {
let peer_info = config::maker_peer_info();
loop {
tracing::info!("Connecting to {peer_info}");
match lightning_net_tokio::connect_outbound(
Arc::clone(&peer_manager),
peer_info.pubkey,
peer_info.peer_addr,
)
.await
{
Some(connection_closed_future) => {
let mut connection_closed_future = Box::pin(connection_closed_future);
while !is_connected(&peer_manager, peer_info.pubkey) {
if futures::poll!(&mut connection_closed_future).is_ready() {
tracing::warn!("Peer disconnected before we finished the handshake! Retrying in 5 seconds.");
tokio::time::sleep(Duration::from_secs(5)).await;
return;
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
tracing::info!("Successfully connected to {peer_info}");
connection_closed_future.await;
tracing::warn!("Lost connection to maker, retrying immediately.")
}
None => {
tracing::warn!("Failed to connect to maker! Retrying in 5 seconds.");
pub async fn connect(peer_manager: Arc<PeerManager>, peer_info: PeerInfo) {
tracing::info!("Connecting to {peer_info}");
match lightning_net_tokio::connect_outbound(
Arc::clone(&peer_manager),
peer_info.pubkey,
peer_info.peer_addr,
)
.await
{
Some(connection_closed_future) => {
let mut connection_closed_future = Box::pin(connection_closed_future);
while !is_connected(&peer_manager, peer_info.pubkey) {
if futures::poll!(&mut connection_closed_future).is_ready() {
tracing::warn!("Peer disconnected before we finished the handshake! Retrying in 5 seconds.");
tokio::time::sleep(Duration::from_secs(5)).await;
return;
}
tokio::time::sleep(Duration::from_secs(5)).await;
}
tracing::info!("Successfully connected to {peer_info}");
connection_closed_future.await;
tracing::warn!("Lost connection to maker, retrying immediately.")
}
None => {
tracing::warn!("Failed to connect to maker! Retrying.");
}
})
}
}

fn is_connected(peer_manager: &Arc<PeerManager>, pubkey: PublicKey) -> bool {
Expand Down
Loading