diff --git a/src/job/batch_broadcasting.rs b/src/job/batch_broadcasting.rs index 5ff02e69..0ed5a9e1 100644 --- a/src/job/batch_broadcasting.rs +++ b/src/job/batch_broadcasting.rs @@ -1,5 +1,4 @@ -use bdk::blockchain::{Blockchain, ElectrumBlockchain}; -use electrum_client::{Client, ConfigBuilder}; +use electrum_client::{Client, ConfigBuilder, ElectrumApi}; use serde::{Deserialize, Serialize}; use tracing::instrument; @@ -27,21 +26,67 @@ pub async fn execute( blockchain_cfg: BlockchainConfig, batches: Batches, ) -> Result { - let blockchain = init_electrum(&blockchain_cfg.electrum_url).await?; + let client = init_electrum(&blockchain_cfg.electrum_url).await?; let batch = batches.find_by_id(data.account_id, data.batch_id).await?; let span = tracing::Span::current(); span.record("txid", tracing::field::display(batch.bitcoin_tx_id)); if let Some(tx) = batch.get_tx_to_broadcast() { - blockchain.broadcast(&tx).map_err(BdkError::BdkLibError)?; + broadcast_or_verify(&client, &tx, &data)?; span.record("broadcast", true); } Ok(data) } -async fn init_electrum(electrum_url: &str) -> Result { - let blockchain = ElectrumBlockchain::from(Client::from_config( +fn broadcast_or_verify( + client: &Client, + tx: &bitcoin::Transaction, + data: &BatchBroadcastingData, +) -> Result<(), JobError> { + let txid = tx.txid(); + if let Err(err) = client.transaction_broadcast(tx) { + if is_tx_known(client, txid)? { + tracing::info!( + batch_id = %data.batch_id, + txid = %txid, + error = %err, + "Broadcast returned error but transaction is already known by electrum; treating as idempotent success" + ); + } else { + return Err(BdkError::ElectrumClient(err).into()); + } + } + Ok(()) +} + +fn is_tx_known(client: &Client, txid: bitcoin::Txid) -> Result { + match client.transaction_get(&txid) { + Ok(_) => Ok(true), + Err(electrum_client::Error::Protocol(value)) if is_protocol_tx_not_found(&value) => { + Ok(false) + } + Err(err) => Err(err.into()), + } +} + +fn is_protocol_tx_not_found(value: &serde_json::Value) -> bool { + let code = value.get("code").and_then(serde_json::Value::as_i64); + let msg = value + .get("message") + .and_then(serde_json::Value::as_str) + .unwrap_or_default() + .to_ascii_lowercase(); + + code == Some(-5) + || msg.contains("'code': -5") + || msg.contains("no such mempool or blockchain transaction") + || msg.contains("no such transaction") + || msg.contains("transaction not found") +} + +async fn init_electrum(electrum_url: &str) -> Result { + let client = Client::from_config( electrum_url, ConfigBuilder::new().retry(10).timeout(Some(60)).build(), - )?); - Ok(blockchain) + )?; + Ok(client) }