From 897851308461ca95d2bc9b2754ef0588e7b748a5 Mon Sep 17 00:00:00 2001 From: tompro Date: Mon, 2 Mar 2026 11:52:24 +0000 Subject: [PATCH 1/2] Retry and subscription fixes --- crates/bcr-ebill-api/src/constants.rs | 1 - .../src/block_transport.rs | 37 ++++- crates/bcr-ebill-transport/src/nostr.rs | 19 +-- .../src/nostr_transport.rs | 77 +++++++-- .../src/transport_service.rs | 150 ++++++++++++++++++ crates/bcr-ebill-wasm/src/lib.rs | 44 +---- 6 files changed, 259 insertions(+), 69 deletions(-) diff --git a/crates/bcr-ebill-api/src/constants.rs b/crates/bcr-ebill-api/src/constants.rs index 7e745fbd..ee01a700 100644 --- a/crates/bcr-ebill-api/src/constants.rs +++ b/crates/bcr-ebill-api/src/constants.rs @@ -6,7 +6,6 @@ pub const MAX_FILE_NAME_CHARACTERS: usize = 200; pub const VALID_FILE_MIME_TYPES: [&str; 3] = ["image/jpeg", "image/png", "application/pdf"]; // When subscribing events we subtract this from the last received event time -pub const NOSTR_EVENT_TIME_SLACK: u64 = 3600 * 24 * 7; // 1 week pub const DEFAULT_INITIAL_SUBSCRIPTION_DELAY_SECONDS: u32 = 1; pub const NOSTR_MAX_RELAYS: usize = 200; diff --git a/crates/bcr-ebill-transport/src/block_transport.rs b/crates/bcr-ebill-transport/src/block_transport.rs index df818cfd..27032316 100644 --- a/crates/bcr-ebill-transport/src/block_transport.rs +++ b/crates/bcr-ebill-transport/src/block_transport.rs @@ -11,7 +11,10 @@ use bcr_ebill_core::application::ServiceTraitBounds; use bcr_ebill_core::application::company::Company; use bcr_ebill_core::protocol::blockchain::BlockchainType; use bcr_ebill_core::protocol::crypto::BcrKeys; -use bcr_ebill_core::protocol::event::{BillChainEvent, CompanyChainEvent, IdentityChainEvent}; +use bcr_ebill_core::protocol::event::{ + BillChainEvent, CompanyChainEvent, EventEnvelope, IdentityChainEvent, +}; +use bitcoin::base58; use log::{debug, error}; use bcr_ebill_api::service::transport_service::{Error, Result}; @@ -156,8 +159,20 @@ impl BlockTransportServiceApi for BlockTransportService { if let Some((recipient, invite)) = events.generate_company_invite_message() && let Some(identity) = self.nostr_transport.resolve_identity(&recipient).await { - node.send_private_event(&events.sender(), &identity, invite.try_into()?) - .await?; + let message: EventEnvelope = invite.try_into()?; + if let Err(e) = node + .send_private_event(&events.sender(), &identity, message.clone()) + .await + { + error!("Failed to send company invite, queuing for retry: {e}"); + self.nostr_transport + .queue_retry_message( + &events.sender(), + Some(&recipient), + base58::encode(&borsh::to_vec(&message)?), + ) + .await?; + } } Ok(()) @@ -216,8 +231,20 @@ impl BlockTransportServiceApi for BlockTransportService { if !invites.is_empty() { for (recipient, event) in invites { if let Some(identity) = self.nostr_transport.resolve_identity(&recipient).await { - node.send_private_event(&events.sender(), &identity, event.try_into()?) - .await?; + let message: EventEnvelope = event.try_into()?; + if let Err(e) = node + .send_private_event(&events.sender(), &identity, message.clone()) + .await + { + error!("Failed to send bill invite, queuing for retry: {e}"); + self.nostr_transport + .queue_retry_message( + &events.sender(), + Some(&recipient), + base58::encode(&borsh::to_vec(&message)?), + ) + .await?; + } } } } diff --git a/crates/bcr-ebill-transport/src/nostr.rs b/crates/bcr-ebill-transport/src/nostr.rs index 50a45467..89c8f77e 100644 --- a/crates/bcr-ebill-transport/src/nostr.rs +++ b/crates/bcr-ebill-transport/src/nostr.rs @@ -32,7 +32,7 @@ use std::{ }; use bcr_ebill_api::{ - constants::{NOSTR_EVENT_TIME_SLACK, NOSTR_MAX_RELAYS}, + constants::NOSTR_MAX_RELAYS, service::{ contact_service::ContactServiceApi, transport_service::{ @@ -1077,15 +1077,10 @@ pub async fn should_process( fn valid_time(kind: Kind, created: nostr::Timestamp, since: Option) -> bool { match since { - Some(ts) => { - let time = if matches!(kind, Kind::EncryptedDirectMessage | Kind::GiftWrap) { - add_time_slack(ts) - } else { - ts - }; + Some(time) if !matches!(kind, Kind::EncryptedDirectMessage | Kind::GiftWrap) => { created.as_u64() >= time.inner() } - None => true, + _ => true, } } @@ -1163,14 +1158,6 @@ async fn get_offset(db: &Arc, node_id: &NodeId) -> .unwrap_or(Timestamp::zero()) } -fn add_time_slack(ts: Timestamp) -> Timestamp { - if ts.inner() <= NOSTR_EVENT_TIME_SLACK { - ts - } else { - ts - NOSTR_EVENT_TIME_SLACK - } -} - pub async fn add_offset( db: &Arc, event_id: EventId, diff --git a/crates/bcr-ebill-transport/src/nostr_transport.rs b/crates/bcr-ebill-transport/src/nostr_transport.rs index 12ef8628..90d615e6 100644 --- a/crates/bcr-ebill-transport/src/nostr_transport.rs +++ b/crates/bcr-ebill-transport/src/nostr_transport.rs @@ -30,6 +30,13 @@ use log::{debug, error, warn}; use bcr_ebill_api::service::transport_service::{Error, Result}; use bcr_ebill_core::protocol::PostalAddress; +#[derive(borsh::BorshDeserialize)] +struct LegacyBillEventMessage { + event_type: bcr_ebill_core::protocol::event::EventType, + version: String, + data: BillChainEventPayload, +} + /// Transport implementation for Nostr pub struct NostrTransportService { nostr_client: Arc, @@ -147,18 +154,16 @@ impl NostrTransportService { let node = self.get_node_transport(sender); for (node_id, event_to_process) in events.iter() { if let Some(identity) = self.resolve_identity(node_id).await { + let message: EventEnvelope = event_to_process.clone().try_into()?; if let Err(e) = node - .send_private_event(sender, &identity, event_to_process.clone().try_into()?) + .send_private_event(sender, &identity, message.clone()) .await { error!("Failed to send block notification, will add it to retry queue: {e}"); self.queue_retry_message( sender, Some(node_id), - base58::encode( - &borsh::to_vec(event_to_process) - .map_err(|e| Error::Message(e.to_string()))?, - ), + base58::encode(&borsh::to_vec(&message)?), ) .await?; } @@ -287,7 +292,29 @@ impl NostrTransportService { continue; } }; - match borsh::from_slice::(&decoded) { + let message = match borsh::from_slice::(&decoded) { + Ok(message) => Ok(message), + Err(envelope_err) => { + match borsh::from_slice::(&decoded) { + Ok(legacy_event) => match borsh::to_vec(&legacy_event.data) { + Ok(data) => Ok(EventEnvelope { + event_type: legacy_event.event_type, + version: legacy_event.version, + data, + }), + Err(e) => Err(Error::Message(e.to_string())), + }, + Err(legacy_err) => { + error!( + "Failed to deserialize private retry payload as envelope ({envelope_err}) or legacy event ({legacy_err})" + ); + Err(Error::Message(envelope_err.to_string())) + } + } + } + }; + + match message { Ok(message) => { self.send_retry_private_message( &queued_message.sender_id, @@ -296,10 +323,7 @@ impl NostrTransportService { ) .await } - Err(e) => { - error!("Failed to deserialize private retry payload: {e}"); - Err(Error::Message(e.to_string())) - } + Err(e) => Err(e), } } None => { @@ -349,7 +373,7 @@ impl NostrTransportService { message: EventEnvelope, ) -> Result<()> { let node = self.get_node_transport(sender); - if let Some(identity) = self.resolve_identity(node_id).await { + if let Some(identity) = self.resolve_retry_identity(node_id).await { node.send_private_event(sender, &identity, message).await?; } else { warn!("Failed to resolve recipient for retry message, node_id: {node_id}"); @@ -360,6 +384,37 @@ impl NostrTransportService { Ok(()) } + async fn resolve_retry_identity(&self, node_id: &NodeId) -> Option { + if let Some(identity) = self.get_local_identity(node_id) { + return Some(identity); + } + + if let Some(identity) = self.resolve_node_contact(node_id).await { + return Some(identity); + } + + if let Ok(Some(nostr)) = self.nostr_contact_store.by_node_id(node_id).await { + let relays = if nostr.relays.is_empty() { + self.nostr_relays.clone() + } else { + nostr.relays + }; + return Some(BillParticipant::Anon(BillAnonParticipant { + node_id: node_id.to_owned(), + nostr_relays: relays, + })); + } + + if self.nostr_relays.is_empty() { + None + } else { + Some(BillParticipant::Anon(BillAnonParticipant { + node_id: node_id.to_owned(), + nostr_relays: self.nostr_relays.clone(), + })) + } + } + pub(crate) async fn connect(&self) { // With single multi-identity client, just connect it if let Err(e) = self.nostr_client.connect().await { diff --git a/crates/bcr-ebill-transport/src/transport_service.rs b/crates/bcr-ebill-transport/src/transport_service.rs index 12415113..adee4c72 100644 --- a/crates/bcr-ebill-transport/src/transport_service.rs +++ b/crates/bcr-ebill-transport/src/transport_service.rs @@ -483,6 +483,7 @@ mod tests { get_nostr_transport, signed_identity_proof_test, }; use bcr_ebill_core::application::contact::Contact; + use bcr_ebill_core::application::nostr_contact::{HandshakeStatus, NostrContact, TrustLevel}; use bcr_ebill_core::protocol::Timestamp; use bcr_ebill_core::protocol::blockchain::Blockchain; use bcr_ebill_core::protocol::blockchain::bill::block::{ @@ -2444,6 +2445,155 @@ mod tests { assert!(result.is_ok()); } + #[tokio::test] + async fn test_send_retry_private_message_uses_nostr_contact_without_trust() { + init_test_cfg(); + + let (service, _) = expect_service( + |mock_transport, + mock_contact_store, + mock_nostr_contact_store, + mock_queue, + _, + _, + _, + _| { + let recipient = node_id_test_other(); + let message_id = "private_msg_id"; + let sender = node_id_test(); + let payload = base58::encode( + &borsh::to_vec(&EventEnvelope { + version: "1.0".to_string(), + event_type: EventType::Bill, + data: vec![], + }) + .unwrap(), + ); + + let queued_message = NostrQueuedMessage { + id: message_id.to_string(), + sender_id: sender.to_owned(), + recipient: Some(recipient.to_owned()), + payload, + }; + + let nostr_contact = NostrContact { + npub: recipient.npub(), + node_id: recipient.to_owned(), + name: None, + relays: vec![url::Url::parse("wss://relay.example.com").unwrap()], + trust_level: TrustLevel::None, + handshake_status: HandshakeStatus::None, + contact_private_key: None, + mint_url: None, + }; + + mock_contact_store + .expect_get() + .with(eq(recipient.to_owned())) + .returning(|_| Ok(None)) + .once(); + + mock_nostr_contact_store + .expect_by_node_id() + .with(eq(recipient.to_owned())) + .returning(move |_| Ok(Some(nostr_contact.clone()))) + .once(); + + mock_transport + .expect_send_private_event() + .returning(|_, _, _| Ok(())) + .once(); + + mock_queue + .expect_get_retry_messages() + .with(eq(1)) + .returning(move |_| Ok(vec![queued_message.clone()])) + .once(); + mock_queue + .expect_get_retry_messages() + .with(eq(1)) + .returning(|_| Ok(vec![])) + .once(); + + mock_queue + .expect_succeed_retry() + .with(eq(message_id)) + .returning(|_| Ok(())) + .once(); + }, + ); + + let result = service.send_retry_messages().await; + assert!(result.is_ok()); + } + + #[tokio::test] + async fn test_send_retry_private_message_with_legacy_payload_format() { + init_test_cfg(); + + let (service, _) = expect_service( + |mock_transport, mock_contact_store, _, mock_queue, _, _, _, _| { + let recipient = node_id_test_other(); + let message_id = "private_legacy_msg_id"; + let sender = node_id_test(); + + let legacy_event = Event::new_bill(BillChainEventPayload { + event_type: BillEventType::BillMintingRequested, + bill_id: bill_id_test(), + action_type: Some(ActionType::CheckBill), + sum: Some(Sum::new_sat(1).unwrap()), + }); + + let payload = base58::encode(&borsh::to_vec(&legacy_event).unwrap()); + + let queued_message = NostrQueuedMessage { + id: message_id.to_string(), + sender_id: sender.to_owned(), + recipient: Some(recipient.to_owned()), + payload, + }; + + let identity = get_identity_public_data( + &recipient, + &Email::new("test@example.com").unwrap(), + vec![], + ); + + mock_contact_store + .expect_get() + .with(eq(recipient.to_owned())) + .returning(move |_| Ok(Some(as_contact(&identity)))) + .once(); + + mock_transport + .expect_send_private_event() + .returning(|_, _, _| Ok(())) + .once(); + + mock_queue + .expect_get_retry_messages() + .with(eq(1)) + .returning(move |_| Ok(vec![queued_message.clone()])) + .once(); + mock_queue + .expect_get_retry_messages() + .with(eq(1)) + .returning(|_| Ok(vec![])) + .once(); + + mock_queue + .expect_succeed_retry() + .with(eq(message_id)) + .returning(|_| Ok(())) + .once(); + }, + ); + + let result = service.send_retry_messages().await; + assert!(result.is_ok()); + } + #[tokio::test] async fn test_send_retry_private_message_with_invalid_base58() { init_test_cfg(); diff --git a/crates/bcr-ebill-wasm/src/lib.rs b/crates/bcr-ebill-wasm/src/lib.rs index 6543ffc7..9a6df1d2 100644 --- a/crates/bcr-ebill-wasm/src/lib.rs +++ b/crates/bcr-ebill-wasm/src/lib.rs @@ -147,49 +147,21 @@ async fn ensure_transport_contact_data_published(ctx: &Context, default_mint_nod return; } - if let Ok(full_identity) = ctx.identity_service.get_full_identity().await { - match ctx - .transport_service - .contact_transport() - .resolve_contact(&full_identity.identity.node_id) + if let Ok(full_identity) = ctx.identity_service.get_full_identity().await + && let Err(e) = ctx + .identity_service + .publish_contact(&full_identity.identity, &full_identity.key_pair) .await - { - Ok(None) => { - if let Err(e) = ctx - .identity_service - .publish_contact(&full_identity.identity, &full_identity.key_pair) - .await - { - warn!("Could not publish identity details to Nostr: {e}") - } - } - Ok(Some(_)) => (), - Err(e) => { - warn!("Could not resolve personal identity details on Nostr: {e}") - } - } + { + warn!("Could not publish identity details to Nostr: {e}") } if let Ok(companies) = ctx.company_service.get_list_of_companies().await { for c in companies.iter() { if let Ok((company, keys)) = ctx.company_service.get_company_and_keys_by_id(&c.id).await + && let Err(e) = ctx.company_service.publish_contact(&company, &keys).await { - match ctx - .transport_service - .contact_transport() - .resolve_contact(&company.id) - .await - { - Ok(None) => { - if let Err(e) = ctx.company_service.publish_contact(&company, &keys).await { - warn!("Could not publish company details to Nostr: {e}") - } - } - Ok(Some(_)) => (), - Err(e) => { - warn!("Could not resolve company details on Nostr: {e}") - } - } + warn!("Could not publish company details to Nostr: {e}") } } } From a285c858fb207ce4660cdae85c322b3dc3bdc855 Mon Sep 17 00:00:00 2001 From: tompro Date: Mon, 2 Mar 2026 12:46:04 +0000 Subject: [PATCH 2/2] Review fixes --- .../src/nostr_transport.rs | 33 +------- .../src/transport_service.rs | 84 ------------------- 2 files changed, 1 insertion(+), 116 deletions(-) diff --git a/crates/bcr-ebill-transport/src/nostr_transport.rs b/crates/bcr-ebill-transport/src/nostr_transport.rs index 90d615e6..5480d594 100644 --- a/crates/bcr-ebill-transport/src/nostr_transport.rs +++ b/crates/bcr-ebill-transport/src/nostr_transport.rs @@ -373,7 +373,7 @@ impl NostrTransportService { message: EventEnvelope, ) -> Result<()> { let node = self.get_node_transport(sender); - if let Some(identity) = self.resolve_retry_identity(node_id).await { + if let Some(identity) = self.resolve_identity(node_id).await { node.send_private_event(sender, &identity, message).await?; } else { warn!("Failed to resolve recipient for retry message, node_id: {node_id}"); @@ -384,37 +384,6 @@ impl NostrTransportService { Ok(()) } - async fn resolve_retry_identity(&self, node_id: &NodeId) -> Option { - if let Some(identity) = self.get_local_identity(node_id) { - return Some(identity); - } - - if let Some(identity) = self.resolve_node_contact(node_id).await { - return Some(identity); - } - - if let Ok(Some(nostr)) = self.nostr_contact_store.by_node_id(node_id).await { - let relays = if nostr.relays.is_empty() { - self.nostr_relays.clone() - } else { - nostr.relays - }; - return Some(BillParticipant::Anon(BillAnonParticipant { - node_id: node_id.to_owned(), - nostr_relays: relays, - })); - } - - if self.nostr_relays.is_empty() { - None - } else { - Some(BillParticipant::Anon(BillAnonParticipant { - node_id: node_id.to_owned(), - nostr_relays: self.nostr_relays.clone(), - })) - } - } - pub(crate) async fn connect(&self) { // With single multi-identity client, just connect it if let Err(e) = self.nostr_client.connect().await { diff --git a/crates/bcr-ebill-transport/src/transport_service.rs b/crates/bcr-ebill-transport/src/transport_service.rs index adee4c72..6e4e6b66 100644 --- a/crates/bcr-ebill-transport/src/transport_service.rs +++ b/crates/bcr-ebill-transport/src/transport_service.rs @@ -483,7 +483,6 @@ mod tests { get_nostr_transport, signed_identity_proof_test, }; use bcr_ebill_core::application::contact::Contact; - use bcr_ebill_core::application::nostr_contact::{HandshakeStatus, NostrContact, TrustLevel}; use bcr_ebill_core::protocol::Timestamp; use bcr_ebill_core::protocol::blockchain::Blockchain; use bcr_ebill_core::protocol::blockchain::bill::block::{ @@ -2445,89 +2444,6 @@ mod tests { assert!(result.is_ok()); } - #[tokio::test] - async fn test_send_retry_private_message_uses_nostr_contact_without_trust() { - init_test_cfg(); - - let (service, _) = expect_service( - |mock_transport, - mock_contact_store, - mock_nostr_contact_store, - mock_queue, - _, - _, - _, - _| { - let recipient = node_id_test_other(); - let message_id = "private_msg_id"; - let sender = node_id_test(); - let payload = base58::encode( - &borsh::to_vec(&EventEnvelope { - version: "1.0".to_string(), - event_type: EventType::Bill, - data: vec![], - }) - .unwrap(), - ); - - let queued_message = NostrQueuedMessage { - id: message_id.to_string(), - sender_id: sender.to_owned(), - recipient: Some(recipient.to_owned()), - payload, - }; - - let nostr_contact = NostrContact { - npub: recipient.npub(), - node_id: recipient.to_owned(), - name: None, - relays: vec![url::Url::parse("wss://relay.example.com").unwrap()], - trust_level: TrustLevel::None, - handshake_status: HandshakeStatus::None, - contact_private_key: None, - mint_url: None, - }; - - mock_contact_store - .expect_get() - .with(eq(recipient.to_owned())) - .returning(|_| Ok(None)) - .once(); - - mock_nostr_contact_store - .expect_by_node_id() - .with(eq(recipient.to_owned())) - .returning(move |_| Ok(Some(nostr_contact.clone()))) - .once(); - - mock_transport - .expect_send_private_event() - .returning(|_, _, _| Ok(())) - .once(); - - mock_queue - .expect_get_retry_messages() - .with(eq(1)) - .returning(move |_| Ok(vec![queued_message.clone()])) - .once(); - mock_queue - .expect_get_retry_messages() - .with(eq(1)) - .returning(|_| Ok(vec![])) - .once(); - - mock_queue - .expect_succeed_retry() - .with(eq(message_id)) - .returning(|_| Ok(())) - .once(); - }, - ); - - let result = service.send_retry_messages().await; - assert!(result.is_ok()); - } - #[tokio::test] async fn test_send_retry_private_message_with_legacy_payload_format() { init_test_cfg();