diff --git a/crates/fiber-lib/src/fiber/channel.rs b/crates/fiber-lib/src/fiber/channel.rs index f208cfbaa..f06e2051d 100644 --- a/crates/fiber-lib/src/fiber/channel.rs +++ b/crates/fiber-lib/src/fiber/channel.rs @@ -115,7 +115,6 @@ pub const COMMITMENT_CELL_WITNESS_LEN: usize = 16 + 1 + 32 + 64; // triggered 10 times per second, plus we also trigger `apply_retryable_tlc_operations` when // receiving ACK from peer, so it's a reason number for 20 TPS const RETRYABLE_TLC_OPS_INTERVAL: Duration = Duration::from_millis(100); -const WAITING_REESTABLISH_FINISH_TIMEOUT: Duration = Duration::from_millis(4000); // if a important TLC operation is not acked in 30 seconds, we will try to disconnect the peer. #[cfg(not(any(test, feature = "bench")))] @@ -178,6 +177,8 @@ pub enum ChannelCommand { Update(UpdateCommand, RpcReplyPort>), NotifyEvent(ChannelEvent), #[cfg(any(test, feature = "bench"))] + SetDeferPeerTlcUpdates(bool), + #[cfg(any(test, feature = "bench"))] ReloadState(ReloadParams), } @@ -194,6 +195,10 @@ impl Display for ChannelCommand { ChannelCommand::Update(_, _) => write!(f, "Update"), ChannelCommand::NotifyEvent(event) => write!(f, "NotifyEvent [{:?}]", event), #[cfg(any(test, feature = "bench"))] + ChannelCommand::SetDeferPeerTlcUpdates(enabled) => { + write!(f, "SetDeferPeerTlcUpdates [{enabled}]") + } + #[cfg(any(test, feature = "bench"))] ChannelCommand::ReloadState(_) => write!(f, "ReloadState"), } } @@ -532,6 +537,10 @@ where self.apply_retryable_tlc_operations(myself, state, false) .await; } + if state.finish_pending_reestablish_channel_ready(myself) { + state.schedule_next_retry_task(myself); + debug_event!(self.network, "Reestablished channel in ChannelReady"); + } Ok(()) } FiberChannelMessage::ChannelReady(_channel_ready) => { @@ -565,14 +574,17 @@ where } FiberChannelMessage::AddTlc(add_tlc) => { if state.defer_peer_tlc_updates { - state.queue_deferred_peer_tlc_update(DeferredPeerTlcUpdate::Add(add_tlc)); + state + .try_queue_deferred_peer_tlc_update(DeferredPeerTlcUpdate::Add(add_tlc))?; return Ok(()); } self.handle_add_tlc_peer_message(state, add_tlc) } FiberChannelMessage::RemoveTlc(remove_tlc) => { if state.defer_peer_tlc_updates { - state.queue_deferred_peer_tlc_update(DeferredPeerTlcUpdate::Remove(remove_tlc)); + state.try_queue_deferred_peer_tlc_update(DeferredPeerTlcUpdate::Remove( + remove_tlc, + ))?; return Ok(()); } self.handle_remove_tlc_peer_message(state, remove_tlc) @@ -2334,6 +2346,15 @@ where } ChannelCommand::NotifyEvent(event) => self.handle_event(myself, state, event).await, #[cfg(any(test, feature = "bench"))] + ChannelCommand::SetDeferPeerTlcUpdates(enabled) => { + if enabled { + state.start_defer_peer_tlc_updates(); + } else { + state.stop_defer_peer_tlc_updates(); + } + Ok(()) + } + #[cfg(any(test, feature = "bench"))] ChannelCommand::ReloadState(reload_params) => { let private_key = state.private_key.clone(); *state = self @@ -3252,6 +3273,10 @@ pub struct ChannelActorState { pub core: ChannelActorData, // --- Runtime-only fields (not serialized) --- + /// Reestablish replay has resumed message flow, but we still owe the network actor a + /// `ChannelReady` notification once the missing peer acknowledgment arrives. + #[doc = "skip_store"] + pub pending_reestablish_channel_ready: bool, /// Temporarily defer peer TLC updates while replaying dual-owed state. #[doc = "skip_store"] pub defer_peer_tlc_updates: bool, @@ -3315,6 +3340,7 @@ impl<'de> Deserialize<'de> for ChannelActorState { network: None, scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: VecDeque::new(), ephemeral_config: Default::default(), @@ -3623,13 +3649,33 @@ impl ChannelActorState { } } - fn queue_deferred_peer_tlc_update(&mut self, update: DeferredPeerTlcUpdate) { + fn max_deferred_peer_tlc_updates(&self) -> usize { + self.local_constraints + .max_tlc_number_in_flight + .saturating_add(self.remote_constraints.max_tlc_number_in_flight) as usize + } + + fn try_queue_deferred_peer_tlc_update( + &mut self, + update: DeferredPeerTlcUpdate, + ) -> ProcessingChannelResult { + let max_deferred_updates = self.max_deferred_peer_tlc_updates(); + if self.deferred_peer_tlc_updates.len() >= max_deferred_updates { + return Err(ProcessingChannelError::InvalidState(format!( + "Too many deferred peer TLC updates while replaying channel {}: queued {}, limit {}", + self.get_id(), + self.deferred_peer_tlc_updates.len(), + max_deferred_updates + ))); + } + self.deferred_peer_tlc_updates.push_back(update); debug!( "Deferred peer TLC update for channel {} (queued={})", self.get_id(), self.deferred_peer_tlc_updates.len() ); + Ok(()) } fn log_ack_state(&self, context: &str) { @@ -4056,6 +4102,7 @@ impl ChannelActorState { network: Some(network), scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: VecDeque::new(), ephemeral_config: Default::default(), @@ -4146,6 +4193,7 @@ impl ChannelActorState { network: Some(network), scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: VecDeque::new(), ephemeral_config: Default::default(), @@ -5963,6 +6011,7 @@ impl ChannelActorState { return; }; + self.pending_reestablish_channel_ready = false; self.reestablishing = false; // If the channel is already ready, we should notify the network actor. @@ -5970,14 +6019,25 @@ impl ChannelActorState { let channel_id = self.get_id(); let pubkey = self.get_remote_pubkey(); self.network() - .send_after(WAITING_REESTABLISH_FINISH_TIMEOUT, move || { - NetworkActorMessage::new_event(NetworkActorEvent::ChannelReady( - channel_id, pubkey, outpoint, - )) - }); + .send_message(NetworkActorMessage::new_event( + NetworkActorEvent::ChannelReady(channel_id, pubkey, outpoint), + )) + .expect(ASSUME_NETWORK_ACTOR_ALIVE); self.on_owned_channel_updated(myself, false); } + fn finish_pending_reestablish_channel_ready( + &mut self, + myself: &ActorRef, + ) -> bool { + if !self.pending_reestablish_channel_ready { + return false; + } + + self.on_reestablished_channel_ready(myself); + true + } + fn resume_funding(&mut self, myself: &ActorRef) { match self.state { ChannelState::AwaitingTxSignatures(mut flags) => { @@ -6256,12 +6316,14 @@ impl ChannelActorState { } ChannelState::ChannelReady => { self.clear_waiting_peer_response(); + self.pending_reestablish_channel_ready = false; let my_local_commitment_number = self.get_local_commitment_number(); let my_remote_commitment_number = self.get_remote_commitment_number(); let my_waiting_ack = self.tlc_state.waiting_ack; let peer_local_commitment_number = reestablish_channel.local_commitment_number; let peer_remote_commitment_number = reestablish_channel.remote_commitment_number; + let mut reestablish_complete = true; warn!( "peer: {:?} \ @@ -6361,11 +6423,18 @@ impl ChannelActorState { self.resend_tlcs_on_reestablish(true)?; } } else { - // ignore, waiting for remote peer to resend revoke_and_ack + // Wait for the peer to resend the missing revoke_and_ack before declaring the + // channel ready again. We must resume normal message handling so that ack can + // be processed, but we delay the ready notification until then. + self.reestablishing = false; + self.pending_reestablish_channel_ready = true; + reestablish_complete = false; } - self.on_reestablished_channel_ready(myself); - debug_event!(network, "Reestablished channel in ChannelReady"); + if reestablish_complete { + self.on_reestablished_channel_ready(myself); + debug_event!(network, "Reestablished channel in ChannelReady"); + } } ChannelState::ShuttingDown(flags) => { // Resend the shutdown message to the peer if we have not received the peer's shutdown message. diff --git a/crates/fiber-lib/src/fiber/network.rs b/crates/fiber-lib/src/fiber/network.rs index 2e8f1ea63..4fa185538 100644 --- a/crates/fiber-lib/src/fiber/network.rs +++ b/crates/fiber-lib/src/fiber/network.rs @@ -1147,39 +1147,14 @@ where } } - let mut inbound_peer_sessions = state.inbound_peer_sessions(); - let num_inbound_peers = inbound_peer_sessions.len(); + let inbound_no_channel_peers = state.inbound_no_channel_peers_in_connected_order(); + let num_inbound_no_channel_peers = inbound_no_channel_peers.len(); let num_outbound_peers = state.num_of_outbound_peers(); - debug!("Maintaining network connections ticked: current num inbound peers {}, current num outbound peers {}", num_inbound_peers, num_outbound_peers); - - if num_inbound_peers > state.max_inbound_peers { - debug!( - "Already connected to {} inbound peers, only wants {} peers, disconnecting some", - num_inbound_peers, state.max_inbound_peers - ); - inbound_peer_sessions.retain(|k| !state.session_channels_map.contains_key(k)); - let sessions_to_disconnect = if inbound_peer_sessions.len() - < num_inbound_peers - state.max_inbound_peers - { - warn!( - "Wants to disconnect more {} inbound peers, but all peers except {:?} have channels, will not disconnect any peer with channels", - num_inbound_peers - state.max_inbound_peers, &inbound_peer_sessions - ); - &inbound_peer_sessions[..] - } else { - &inbound_peer_sessions[..num_inbound_peers - state.max_inbound_peers] - }; - debug!( - "Disconnecting inbound peer sessions {:?}", - sessions_to_disconnect - ); - for session in sessions_to_disconnect { - if let Err(err) = state.control.disconnect(*session).await { - error!("Failed to disconnect session: {}", err); - } - } - } + debug!( + "Maintaining network connections ticked: current num inbound no-channel peers {}, current num outbound peers {}", + num_inbound_no_channel_peers, num_outbound_peers + ); if num_outbound_peers >= state.min_outbound_peers { debug!( @@ -3578,11 +3553,56 @@ where return Ok(()); } - fn inbound_peer_sessions(&self) -> Vec { - self.peer_session_map - .values() - .filter_map(|s| (s.session_type == SessionType::Inbound).then_some(s.session_id)) - .collect() + fn session_has_channels(&self, session_id: &SessionId) -> bool { + self.session_channels_map + .get(session_id) + .is_some_and(|channels| !channels.is_empty()) + } + + fn inbound_no_channel_peers_in_connected_order(&self) -> Vec<(Pubkey, SessionId)> { + let mut peers = self + .peer_session_map + .iter() + .filter_map(|(pubkey, peer)| { + (peer.session_type == SessionType::Inbound + && !self.session_has_channels(&peer.session_id)) + .then_some((*pubkey, peer.session_id)) + }) + .collect::>(); + peers.sort_by_key(|(_, session_id)| *session_id); + peers + } + + async fn enforce_inbound_peer_budget(&mut self) { + let inbound_no_channel_peers = self.inbound_no_channel_peers_in_connected_order(); + if inbound_no_channel_peers.len() <= self.max_inbound_peers { + return; + } + let excess_peers = inbound_no_channel_peers.len() - self.max_inbound_peers; + + for (pubkey, session_id) in inbound_no_channel_peers.into_iter().take(excess_peers) { + debug!( + "Disconnecting inbound no-channel peer {:?} on session {:?} immediately after connect", + pubkey, session_id + ); + match self.control.disconnect(session_id).await { + Ok(()) => { + if matches!( + self.peer_session_map.get(&pubkey), + Some(peer) if peer.session_id == session_id + ) { + self.peer_session_map.remove(&pubkey); + } + self.session_channels_map.remove(&session_id); + } + Err(err) => { + error!( + "Failed to disconnect inbound no-channel peer {:?} on session {:?}: {}", + pubkey, session_id, err + ); + } + } + } } fn num_of_outbound_peers(&self) -> usize { @@ -3856,6 +3876,18 @@ where } } + self.enforce_inbound_peer_budget().await; + if !matches!( + self.peer_session_map.get(&remote_pubkey), + Some(peer) if peer.session_id == session.id + ) { + debug!( + "Peer {:?} session {:?} was disconnected by inbound peer admission control", + remote_pubkey, session.id + ); + return; + } + if self.auto_announce { let message = self.get_or_create_new_node_announcement_message(); debug!( diff --git a/crates/fiber-lib/src/fiber/tests/channel.rs b/crates/fiber-lib/src/fiber/tests/channel.rs index 831700c00..c2837214b 100644 --- a/crates/fiber-lib/src/fiber/tests/channel.rs +++ b/crates/fiber-lib/src/fiber/tests/channel.rs @@ -1,6 +1,7 @@ use crate::ckb::tests::test_utils::complete_commitment_tx; use crate::fiber::channel::{ - AddTlcResponse, ReplayOrderHint, UpdateCommand, MAX_COMMITMENT_DELAY_EPOCHS, + AddTlcResponse, ChannelActorStateStore, ReloadParams, ReplayOrderHint, UpdateCommand, + DEFAULT_COMMITMENT_FEE_RATE, DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, MAX_COMMITMENT_DELAY_EPOCHS, MIN_COMMITMENT_DELAY_EPOCHS, XUDT_COMPATIBLE_WITNESS, }; use crate::fiber::config::{ @@ -12,7 +13,8 @@ use crate::fiber::graph::ChannelInfo; use crate::fiber::network::{DebugEvent, FiberMessageWithTarget, PeerDisconnectReason}; use crate::fiber::payment::SendPaymentCommand; use crate::fiber::types::{ - AddTlc, FiberMessage, Hash256, Init, PeeledPaymentOnionPacket, Pubkey, TlcErr, + AddTlc, FiberMessage, Hash256, Init, PeeledPaymentOnionPacket, Pubkey, ReestablishChannel, + TlcErr, }; use crate::invoice::{CkbInvoiceStatus, Currency, InvoiceBuilder}; use crate::test_utils::{init_tracing, NetworkNode}; @@ -20,10 +22,7 @@ use crate::tests::test_utils::*; use crate::{ ckb::contracts::{get_cell_deps, Contract}, fiber::{ - channel::{ - ChannelActorStateStore, ChannelCommand, ChannelCommandWithId, RemoveTlcCommand, - ShutdownCommand, DEFAULT_COMMITMENT_FEE_RATE, - }, + channel::{ChannelCommand, ChannelCommandWithId, RemoveTlcCommand, ShutdownCommand}, config::DEFAULT_AUTO_ACCEPT_CHANNEL_CKB_FUNDING_AMOUNT, network::{AcceptChannelCommand, OpenChannelCommand}, NetworkActorCommand, NetworkActorMessage, @@ -38,9 +37,10 @@ use ckb_types::{ prelude::{AsTransactionBuilder, Builder, Entity, Pack, Unpack}, }; use fiber_types::{ - derive_private_key, derive_tlc_pubkey, AddTlcCommand, ChannelState, HashAlgorithm, - InMemorySigner, NegotiatingFundingFlags, OutboundTlcStatus, PaymentHopData, PaymentStatus, - Privkey, RemoveTlcFulfill, RemoveTlcReason, TLCId, TlcErrorCode, TlcStatus, NO_SHARED_SECRET, + derive_private_key, derive_tlc_pubkey, AddTlcCommand, ChannelConstraints, ChannelState, + HashAlgorithm, InMemorySigner, NegotiatingFundingFlags, OutboundTlcStatus, PaymentHopData, + PaymentStatus, Privkey, RemoveTlcFulfill, RemoveTlcReason, TLCId, TlcErrorCode, TlcStatus, + NO_SHARED_SECRET, }; use fiber_types::{CloseFlags, FeatureVector}; use musig2::secp::Point; @@ -51,6 +51,18 @@ use std::collections::HashSet; use std::time::Duration; use tracing::{debug, error}; +fn create_deferred_replay_test_add_tlc(channel_id: Hash256, tlc_id: u64) -> AddTlc { + AddTlc { + channel_id, + tlc_id, + amount: 1, + payment_hash: gen_rand_sha256_hash(), + expiry: now_timestamp_as_millis_u64() + 60_000, + hash_algorithm: HashAlgorithm::CkbHash, + onion_packet: None, + } +} + #[tokio::test] // Not supported on wasm: require filesystem access async fn test_connect_to_other_node() { @@ -4390,6 +4402,71 @@ async fn test_connect_to_peers_with_mutual_channel_on_restart_1() { assert!(node_b.get_triggered_unexpected_events().await.is_empty()); } +#[tokio::test] +async fn test_reestablished_channel_ready_notification_is_not_delayed() { + init_tracing(); + + let node_a_funding_amount = 100000000000; + let node_b_funding_amount = 11800000000; + + let (mut node_a, mut node_b, channel_id, _) = + NetworkNode::new_2_nodes_with_established_channel( + node_a_funding_amount, + node_b_funding_amount, + true, + ) + .await; + + node_a.restart().await; + + node_a + .expect_event( + |event| matches!(event, NetworkServiceEvent::PeerConnected(id, _addr) if id == &node_b.pubkey), + ) + .await; + + let deadline = tokio::time::Instant::now() + Duration::from_secs(1); + let mut saw_reestablished = false; + let mut saw_channel_ready = false; + + while tokio::time::Instant::now() < deadline && !(saw_reestablished && saw_channel_ready) { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + let event = tokio::time::timeout(remaining, node_a.event_emitter.recv()) + .await + .expect("timed out while waiting for post-reconnect events") + .expect("event emitter unexpectedly stopped"); + + match event { + NetworkServiceEvent::DebugEvent(DebugEvent::Common(message)) + if message == "Reestablished channel in ChannelReady" => + { + saw_reestablished = true; + } + NetworkServiceEvent::ChannelReady(pubkey, ready_channel_id, _funding_tx_outpoint) + if pubkey == node_b.pubkey && ready_channel_id == channel_id => + { + saw_channel_ready = true; + } + _ => {} + } + } + + assert!( + saw_reestablished, + "expected reestablish completion debug event within 1s after reconnect" + ); + assert!( + saw_channel_ready, + "expected ChannelReady notification within 1s after reconnect completed" + ); + + node_b + .expect_debug_event("Reestablished channel in ChannelReady") + .await; + assert!(node_a.get_triggered_unexpected_events().await.is_empty()); + assert!(node_b.get_triggered_unexpected_events().await.is_empty()); +} + #[tokio::test] async fn test_connect_to_peers_with_mutual_channel_on_restart_2() { init_tracing(); @@ -6460,6 +6537,14 @@ async fn test_reestablish_restores_send_nonce() { " Verify Nonce: {:?}", state.remote_revocation_nonce_for_verify.is_some() ); + assert!( + state.remote_revocation_nonce_for_send.is_some(), + "node_b should restore send nonce after reestablish" + ); + assert!( + state.remote_revocation_nonce_for_verify.is_some(), + "node_b should retain verify nonce after reestablish" + ); let state_a = node_a.get_channel_actor_state(channel_id); println!("Node A State:"); @@ -6479,16 +6564,25 @@ async fn test_reestablish_restores_send_nonce() { " Verify Nonce: {:?}", state_a.remote_revocation_nonce_for_verify.is_some() ); + assert!( + state_a.remote_revocation_nonce_for_send.is_some(), + "node_a should have a send nonce after reestablish completes" + ); + assert!( + state_a.remote_revocation_nonce_for_verify.is_some(), + "node_a should have a verify nonce after reestablish completes" + ); // Further verification: A can send another payment. - // Use a new payment to differentiate. - let res = node_a.send_payment_keysend(&node_b, 2000, false).await; - let err_string = res.unwrap_err().to_string(); - println!("err: {err_string}"); - // check error string - assert!(err_string.contains( - "Send payment first hop error: Failed to send onion packet with error UnknownNextPeer" - )); + let second_payment_hash = node_a + .send_payment_keysend(&node_b, 2000, false) + .await + .expect("send should succeed after send nonce is restored") + .payment_hash; + node_a.wait_until_success(second_payment_hash).await; + node_a + .assert_payment_status(second_payment_hash, PaymentStatus::Success, None) + .await; } /// Bidirectional pending operations during reestablish. @@ -6532,6 +6626,128 @@ async fn test_reestablish_bidirectional_pending() { ); } +#[tokio::test] +async fn test_deferred_peer_tlc_updates_are_bounded_by_channel_constraints() { + init_tracing(); + let (mut node_a, node_b, channel_id, _) = + NetworkNode::new_2_nodes_with_established_channel(100000000000, 100000000000, true).await; + + let mut state = node_a.get_channel_actor_state(channel_id); + state.local_constraints = ChannelConstraints::new(DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, 3); + state.remote_constraints = ChannelConstraints::new(DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, 4); + node_a + .update_channel_actor_state( + state, + Some(ReloadParams { + notify_changes: false, + }), + ) + .await; + + let max_deferred_updates = 7; + + node_a + .network_actor + .send_message(NetworkActorMessage::Command( + NetworkActorCommand::ControlFiberChannel(ChannelCommandWithId { + channel_id, + command: ChannelCommand::SetDeferPeerTlcUpdates(true), + }), + )) + .expect("enable deferred peer TLC replay"); + + for tlc_id in 0..max_deferred_updates { + node_b + .network_actor + .send_message(NetworkActorMessage::Command( + NetworkActorCommand::SendFiberMessage(FiberMessageWithTarget::new( + node_a.pubkey, + FiberMessage::add_tlc(create_deferred_replay_test_add_tlc(channel_id, tlc_id)), + )), + )) + .expect("send deferred add_tlc message"); + } + + node_b + .network_actor + .send_message(NetworkActorMessage::Command( + NetworkActorCommand::SendFiberMessage(FiberMessageWithTarget::new( + node_a.pubkey, + FiberMessage::add_tlc(create_deferred_replay_test_add_tlc( + channel_id, + max_deferred_updates, + )), + )), + )) + .expect("send overflow deferred add_tlc message"); + + node_a + .expect_to_process_event(|event| match event { + NetworkServiceEvent::DebugEvent(DebugEvent::Common(message)) + if message.contains("Too many deferred peer TLC updates") => + { + Some(()) + } + _ => None, + }) + .await; +} + +#[tokio::test] +async fn test_reestablish_does_not_complete_while_waiting_for_peer_revoke_and_ack() { + init_tracing(); + let (mut node_a, node_b, channel_id, _) = + NetworkNode::new_2_nodes_with_established_channel(100000000000, 100000000000, true).await; + + let mut state = node_a.get_channel_actor_state(channel_id); + let peer_commitment_number = state.get_remote_commitment_number(); + state.increment_local_commitment_number(); + state.reestablishing = true; + node_a.update_channel_actor_state(state, None).await; + + while tokio::time::timeout(Duration::from_millis(25), node_a.event_emitter.recv()) + .await + .is_ok() + {} + + node_b + .network_actor + .send_message(NetworkActorMessage::Command( + NetworkActorCommand::SendFiberMessage(FiberMessageWithTarget::new( + node_a.pubkey, + FiberMessage::reestablish_channel(ReestablishChannel { + channel_id, + local_commitment_number: peer_commitment_number, + remote_commitment_number: peer_commitment_number, + }), + )), + )) + .expect("send reestablish message"); + + let mut saw_channel_ready = false; + let deadline = tokio::time::Instant::now() + Duration::from_millis(100); + while tokio::time::Instant::now() < deadline { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + let Ok(Some(event)) = tokio::time::timeout(remaining, node_a.event_emitter.recv()).await + else { + break; + }; + if matches!( + event, + NetworkServiceEvent::ChannelReady(pubkey, ready_channel_id, _) + if pubkey == node_b.pubkey && ready_channel_id == channel_id + ) { + saw_channel_ready = true; + break; + } + } + + assert!( + !saw_channel_ready, + "channel should not emit ChannelReady before the missing revoke_and_ack arrives" + ); +} + /// Stress test with multiple payments and restarts. /// Tests repeated restart cycles with payments. #[tokio::test] diff --git a/crates/fiber-lib/src/fiber/tests/network.rs b/crates/fiber-lib/src/fiber/tests/network.rs index 63eaabef6..3b3123fa9 100644 --- a/crates/fiber-lib/src/fiber/tests/network.rs +++ b/crates/fiber-lib/src/fiber/tests/network.rs @@ -774,6 +774,228 @@ async fn test_persisting_bootnode() { assert!(peers.is_empty()); } +#[tokio::test] +async fn test_exceeding_inbound_peer_budget_evicts_oldest_no_channel_peer_immediately() { + init_tracing(); + + let mut target = NetworkNode::new_with_config( + NetworkNodeConfigBuilder::new() + .fiber_config_updater(|config| { + config.max_inbound_peers = Some(1); + config.min_outbound_peers = Some(0); + }) + .build(), + ) + .await; + let mut peer1 = NetworkNode::new().await; + let mut peer2 = NetworkNode::new().await; + + peer1.connect_to(&mut target).await; + + peer2.connect_to(&mut target).await; + + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + let peers = call!(target.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::ListPeers((), rpc_reply)) + }) + .expect("target alive") + .expect("list peers"); + + assert_eq!( + peers.len(), + 1, + "target should have already evicted one no-channel inbound peer before maintenance tick", + ); + assert_eq!( + peers[0].pubkey, peer2.pubkey, + "target should retain the newest inbound no-channel peer" + ); +} + +#[tokio::test] +async fn test_inbound_peer_with_channel_does_not_consume_no_channel_peer_budget() { + init_tracing(); + + let mut target = NetworkNode::new_with_config( + NetworkNodeConfigBuilder::new() + .fiber_config_updater(|config| { + config.max_inbound_peers = Some(1); + config.min_outbound_peers = Some(0); + }) + .build(), + ) + .await; + let mut peer_with_channel = NetworkNode::new().await; + let mut peer_without_channel = NetworkNode::new().await; + + peer_with_channel.connect_to(&mut target).await; + establish_channel_between_nodes( + &mut target, + &mut peer_with_channel, + ChannelParameters::new(100_000_000_000, 11_800_000_000), + ) + .await; + + peer_without_channel.connect_to_nonblocking(&target).await; + + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + let peers = call!(target.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::ListPeers((), rpc_reply)) + }) + .expect("target alive") + .expect("list peers"); + + assert_eq!( + peers.len(), + 2, + "a peer with a channel should not consume the no-channel inbound peer budget", + ); + assert!(peers + .iter() + .any(|peer| peer.pubkey == peer_with_channel.pubkey)); + assert!(peers + .iter() + .any(|peer| peer.pubkey == peer_without_channel.pubkey)); +} + +#[tokio::test] +async fn test_inbound_peer_with_only_closed_channels_consumes_no_channel_peer_budget() { + init_tracing(); + + let mut target = NetworkNode::new_with_config( + NetworkNodeConfigBuilder::new() + .fiber_config_updater(|config| { + config.max_inbound_peers = Some(1); + config.min_outbound_peers = Some(0); + }) + .build(), + ) + .await; + let mut peer_with_closed_channel = NetworkNode::new().await; + let mut new_peer = NetworkNode::new().await; + + peer_with_closed_channel.connect_to(&mut target).await; + let (channel_id, _funding_tx_hash) = establish_channel_between_nodes( + &mut target, + &mut peer_with_closed_channel, + ChannelParameters::new(100_000_000_000, 11_800_000_000), + ) + .await; + + target + .send_shutdown(channel_id, true) + .await + .expect("force shutdown channel"); + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + target + .send_channel_shutdown_tx_confirmed_event(peer_with_closed_channel.pubkey, channel_id, true) + .await; + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + new_peer.connect_to_nonblocking(&target).await; + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + let peers = call!(target.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::ListPeers((), rpc_reply)) + }) + .expect("target alive") + .expect("list peers"); + + assert_eq!( + peers.len(), + 1, + "a peer whose last channel is closed should count as a no-channel peer and be evicted when over budget", + ); + assert_eq!( + peers[0].pubkey, new_peer.pubkey, + "target should retain the newest inbound no-channel peer after the old peer's last channel closes", + ); +} + +#[tokio::test] +async fn test_new_inbound_peer_can_open_channel_after_replacing_oldest_no_channel_peer() { + init_tracing(); + + let funding_amount = 9_900_000_000u128; + let open_channel_auto_accept_min_ckb_funding_amount = Some(funding_amount as u64 + 1); + + let mut target = NetworkNode::new_with_config( + NetworkNodeConfigBuilder::new() + .fiber_config_updater(move |config| { + config.max_inbound_peers = Some(1); + config.min_outbound_peers = Some(0); + config.open_channel_auto_accept_min_ckb_funding_amount = + open_channel_auto_accept_min_ckb_funding_amount; + }) + .build(), + ) + .await; + let mut old_peer = NetworkNode::new().await; + let mut new_peer = NetworkNode::new().await; + + old_peer.connect_to(&mut target).await; + new_peer.connect_to(&mut target).await; + + old_peer + .expect_event( + |event| matches!(event, NetworkServiceEvent::PeerDisConnected(id, _reason) if id == &target.pubkey), + ) + .await; + + let peers = call!(target.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::ListPeers((), rpc_reply)) + }) + .expect("target alive") + .expect("list peers"); + assert_eq!( + peers.len(), + 1, + "target should only retain the newest inbound no-channel peer", + ); + assert_eq!( + peers[0].pubkey, new_peer.pubkey, + "target should retain the new inbound peer after evicting the oldest no-channel peer", + ); + + let target_pubkey = target.pubkey; + let message = |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::OpenChannel( + OpenChannelCommand { + pubkey: target_pubkey, + public: true, + one_way: false, + shutdown_script: None, + funding_amount, + funding_udt_type_script: None, + commitment_fee_rate: None, + commitment_delay_epoch: None, + funding_fee_rate: None, + tlc_expiry_delta: None, + tlc_min_value: None, + tlc_fee_proportional_millionths: None, + max_tlc_number_in_flight: None, + max_tlc_value_in_flight: None, + }, + rpc_reply, + )) + }; + + call!(new_peer.network_actor, message) + .expect("new peer alive") + .expect("open channel"); + target + .expect_event(|event| match event { + NetworkServiceEvent::ChannelPendingToBeAccepted(pubkey, _channel_id) => { + assert_eq!(pubkey, &new_peer.pubkey); + true + } + _ => false, + }) + .await; +} + #[tokio::test] async fn test_persisting_announced_nodes() { init_tracing(); diff --git a/crates/fiber-lib/src/fiber/tests/settle_tlc_set_command_tests.rs b/crates/fiber-lib/src/fiber/tests/settle_tlc_set_command_tests.rs index 61242f185..80a801e7e 100644 --- a/crates/fiber-lib/src/fiber/tests/settle_tlc_set_command_tests.rs +++ b/crates/fiber-lib/src/fiber/tests/settle_tlc_set_command_tests.rs @@ -306,6 +306,7 @@ fn create_test_channel_state_with_tlc( network: None, scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: std::collections::VecDeque::new(), ephemeral_config: Default::default(), diff --git a/crates/fiber-lib/src/store/sample/sample_channel.rs b/crates/fiber-lib/src/store/sample/sample_channel.rs index e93e39513..519ecb571 100644 --- a/crates/fiber-lib/src/store/sample/sample_channel.rs +++ b/crates/fiber-lib/src/store/sample/sample_channel.rs @@ -32,6 +32,7 @@ impl ChannelActorState { network: None, scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: VecDeque::new(), ephemeral_config: Default::default(), @@ -53,6 +54,7 @@ impl ChannelActorState { network: None, scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: VecDeque::new(), ephemeral_config: Default::default(), diff --git a/crates/fiber-lib/src/store/tests/store.rs b/crates/fiber-lib/src/store/tests/store.rs index 00d6e64d7..4e0fcf8da 100644 --- a/crates/fiber-lib/src/store/tests/store.rs +++ b/crates/fiber-lib/src/store/tests/store.rs @@ -569,6 +569,7 @@ fn test_channel_actor_state_store() { network: None, scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: Default::default(), ephemeral_config: Default::default(), @@ -703,6 +704,7 @@ fn test_serde_channel_actor_state_ciborium() { network: None, scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: Default::default(), ephemeral_config: Default::default(),