From 6585ea92e6e3d17a912f4aaabd3c7afcc2fca742 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Mon, 6 Apr 2026 10:20:30 -0500 Subject: [PATCH 1/2] fix(job): add wallet sync timeout recovery Add warning and hard timeout thresholds around BDK sync so hung wallet sync attempts fail and retry instead of requiring pod restarts. Improve observability by logging sync_wallet spawn failures with wallet/account context. --- src/job/error.rs | 8 ++++++++ src/job/mod.rs | 11 ++++++++++- src/job/sync_wallet.rs | 29 ++++++++++++++++++++++++++++- 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/src/job/error.rs b/src/job/error.rs index c9fc2e98..714ce380 100644 --- a/src/job/error.rs +++ b/src/job/error.rs @@ -58,6 +58,14 @@ pub enum JobError { PsbtMissingInSigningSessions, #[error("JobError - psbt::Error: {0}")] PsbtError(#[from] psbt::Error), + #[error( + "JobError - sync_wallet bdk sync timed out after {timeout_secs}s for wallet_id={wallet_id} keychain_id={keychain_id}" + )] + BdkSyncTimeout { + timeout_secs: u64, + wallet_id: String, + keychain_id: String, + }, } impl JobExecutionError for JobError {} diff --git a/src/job/mod.rs b/src/job/mod.rs index 84395d86..06007f41 100644 --- a/src/job/mod.rs +++ b/src/job/mod.rs @@ -278,7 +278,16 @@ async fn sync_all_wallets( .expect("couldn't build JobExecutor") .execute(|_| async move { for (account_id, wallet_id) in wallets.all_ids().await? { - let _ = spawn_sync_wallet(&pool, SyncWalletData::new(account_id, wallet_id)).await; + if let Err(err) = + spawn_sync_wallet(&pool, SyncWalletData::new(account_id, wallet_id)).await + { + tracing::error!( + account_id = %account_id, + wallet_id = %wallet_id, + error = %err, + "failed to spawn sync_wallet" + ); + } } Ok::<(), JobError>(()) }) diff --git a/src/job/sync_wallet.rs b/src/job/sync_wallet.rs index ec14e931..fad6efcc 100644 --- a/src/job/sync_wallet.rs +++ b/src/job/sync_wallet.rs @@ -19,6 +19,10 @@ use crate::{ wallet::*, }; use std::collections::HashMap; +use std::time::Duration; + +const BDK_SYNC_WARN_AFTER: Duration = Duration::from_secs(20 * 60); +const BDK_SYNC_HARD_TIMEOUT: Duration = Duration::from_secs(30 * 60); #[derive(Debug, Clone, Serialize, Deserialize)] pub struct SyncWalletData { @@ -105,7 +109,30 @@ pub async fn execute( let (blockchain, current_height) = init_electrum(&deps.blockchain_cfg.electrum_url).await?; span.record("current_height", current_height); let latest_change_settle_height = wallet.config.latest_change_settle_height(current_height); - keychain_wallet.sync(blockchain).await?; + let sync_start = tokio::time::Instant::now(); + let bdk_sync_outcome = + tokio::time::timeout(BDK_SYNC_HARD_TIMEOUT, keychain_wallet.sync(blockchain)).await; + let sync_elapsed = sync_start.elapsed(); + if sync_elapsed >= BDK_SYNC_WARN_AFTER { + tracing::warn!( + wallet_id = %data.wallet_id, + keychain_id = %keychain_id, + elapsed_secs = sync_elapsed.as_secs(), + warn_after_secs = BDK_SYNC_WARN_AFTER.as_secs(), + "bdk sync exceeded warning threshold" + ); + } + match bdk_sync_outcome { + Ok(Ok(())) => {} + Ok(Err(e)) => return Err(e.into()), + Err(_) => { + return Err(JobError::BdkSyncTimeout { + timeout_secs: BDK_SYNC_HARD_TIMEOUT.as_secs(), + wallet_id: data.wallet_id.to_string(), + keychain_id: keychain_id.to_string(), + }) + } + } let bdk_txs = Transactions::new(keychain_id, pool.clone()); let bdk_utxos = BdkUtxos::new(keychain_id, pool.clone()); let mut txs_to_skip = Vec::new(); From be2190af34e507b55ca815d31c3dc35304eccbd9 Mon Sep 17 00:00:00 2001 From: Juan P Lopez Date: Mon, 6 Apr 2026 10:44:30 -0500 Subject: [PATCH 2/2] fix(job): avoid aborting blocking wallet sync on timeout Switch sync timeout handling to threshold logging without early return so spawn_blocking sync work is not orphaned in the background. Keep warning and hard-threshold visibility while awaiting completion of the active keychain sync. --- src/job/error.rs | 8 ----- src/job/sync_wallet.rs | 68 +++++++++++++++++++++++++++++------------- 2 files changed, 47 insertions(+), 29 deletions(-) diff --git a/src/job/error.rs b/src/job/error.rs index 714ce380..c9fc2e98 100644 --- a/src/job/error.rs +++ b/src/job/error.rs @@ -58,14 +58,6 @@ pub enum JobError { PsbtMissingInSigningSessions, #[error("JobError - psbt::Error: {0}")] PsbtError(#[from] psbt::Error), - #[error( - "JobError - sync_wallet bdk sync timed out after {timeout_secs}s for wallet_id={wallet_id} keychain_id={keychain_id}" - )] - BdkSyncTimeout { - timeout_secs: u64, - wallet_id: String, - keychain_id: String, - }, } impl JobExecutionError for JobError {} diff --git a/src/job/sync_wallet.rs b/src/job/sync_wallet.rs index fad6efcc..19378e3e 100644 --- a/src/job/sync_wallet.rs +++ b/src/job/sync_wallet.rs @@ -110,27 +110,53 @@ pub async fn execute( span.record("current_height", current_height); let latest_change_settle_height = wallet.config.latest_change_settle_height(current_height); let sync_start = tokio::time::Instant::now(); - let bdk_sync_outcome = - tokio::time::timeout(BDK_SYNC_HARD_TIMEOUT, keychain_wallet.sync(blockchain)).await; - let sync_elapsed = sync_start.elapsed(); - if sync_elapsed >= BDK_SYNC_WARN_AFTER { - tracing::warn!( - wallet_id = %data.wallet_id, - keychain_id = %keychain_id, - elapsed_secs = sync_elapsed.as_secs(), - warn_after_secs = BDK_SYNC_WARN_AFTER.as_secs(), - "bdk sync exceeded warning threshold" - ); - } - match bdk_sync_outcome { - Ok(Ok(())) => {} - Ok(Err(e)) => return Err(e.into()), - Err(_) => { - return Err(JobError::BdkSyncTimeout { - timeout_secs: BDK_SYNC_HARD_TIMEOUT.as_secs(), - wallet_id: data.wallet_id.to_string(), - keychain_id: keychain_id.to_string(), - }) + let sync_fut = keychain_wallet.sync(blockchain); + tokio::pin!(sync_fut); + + let warn_timer = tokio::time::sleep(BDK_SYNC_WARN_AFTER); + tokio::pin!(warn_timer); + + let hard_timer = tokio::time::sleep(BDK_SYNC_HARD_TIMEOUT); + tokio::pin!(hard_timer); + + let mut warned = false; + let mut hard_threshold_reached = false; + + loop { + tokio::select! { + res = &mut sync_fut => { + if hard_threshold_reached { + tracing::info!( + wallet_id = %data.wallet_id, + keychain_id = %keychain_id, + elapsed_secs = sync_start.elapsed().as_secs(), + hard_timeout_secs = BDK_SYNC_HARD_TIMEOUT.as_secs(), + "bdk sync finished after hard-timeout threshold" + ); + } + res?; + break; + } + _ = &mut warn_timer, if !warned => { + warned = true; + tracing::warn!( + wallet_id = %data.wallet_id, + keychain_id = %keychain_id, + elapsed_secs = sync_start.elapsed().as_secs(), + warn_after_secs = BDK_SYNC_WARN_AFTER.as_secs(), + "bdk sync exceeded warning threshold" + ); + } + _ = &mut hard_timer, if !hard_threshold_reached => { + hard_threshold_reached = true; + tracing::error!( + wallet_id = %data.wallet_id, + keychain_id = %keychain_id, + elapsed_secs = sync_start.elapsed().as_secs(), + hard_timeout_secs = BDK_SYNC_HARD_TIMEOUT.as_secs(), + "bdk sync exceeded hard-timeout threshold; waiting for sync completion" + ); + } } } let bdk_txs = Transactions::new(keychain_id, pool.clone());