From a03a31dca664d4b8448db48d06ff9029c0bfa684 Mon Sep 17 00:00:00 2001 From: jjy Date: Fri, 13 Mar 2026 14:48:42 +0800 Subject: [PATCH 01/15] fix: enforce no-channel inbound peer budget on connect --- crates/fiber-lib/src/fiber/config.rs | 6 +- crates/fiber-lib/src/fiber/network.rs | 106 +++++++++++++++----- crates/fiber-lib/src/fiber/tests/network.rs | 86 ++++++++++++++++ 3 files changed, 170 insertions(+), 28 deletions(-) diff --git a/crates/fiber-lib/src/fiber/config.rs b/crates/fiber-lib/src/fiber/config.rs index d800ebf18..229a16d5d 100644 --- a/crates/fiber-lib/src/fiber/config.rs +++ b/crates/fiber-lib/src/fiber/config.rs @@ -235,13 +235,13 @@ pub struct FiberConfig { )] pub(crate) gossip_network_maintenance_interval_ms: Option, - /// Maximal number of inbound connections. The node will disconnect inbound connections - /// when the number of inbound connection exceeds this number. [default: 16] + /// Maximal number of inbound peers without channels. The node will disconnect the oldest + /// inbound peers without channels when their number exceeds this budget. [default: 16] #[arg( name = "FIBER_MAX_INBOUND_PEERS", long = "fiber-max-inbound-peers", env, - help = "Maximal number of inbound connections. The node will disconnect inbound connections when the number of inbound connection exceeds this number. [default: 16]" + help = "Maximal number of inbound peers without channels. The node will disconnect the oldest inbound peers without channels when their number exceeds this budget. [default: 16]" )] pub(crate) max_inbound_peers: Option, diff --git a/crates/fiber-lib/src/fiber/network.rs b/crates/fiber-lib/src/fiber/network.rs index 2e8f1ea63..7a307e7d0 100644 --- a/crates/fiber-lib/src/fiber/network.rs +++ b/crates/fiber-lib/src/fiber/network.rs @@ -1147,35 +1147,31 @@ where } } - let mut inbound_peer_sessions = state.inbound_peer_sessions(); - let num_inbound_peers = inbound_peer_sessions.len(); + let inbound_no_channel_peers = state.inbound_no_channel_peers_in_connected_order(); + let num_inbound_no_channel_peers = inbound_no_channel_peers.len(); let num_outbound_peers = state.num_of_outbound_peers(); - debug!("Maintaining network connections ticked: current num inbound peers {}, current num outbound peers {}", num_inbound_peers, num_outbound_peers); + debug!( + "Maintaining network connections ticked: current num inbound no-channel peers {}, current num outbound peers {}", + num_inbound_no_channel_peers, num_outbound_peers + ); - if num_inbound_peers > state.max_inbound_peers { + if num_inbound_no_channel_peers > state.max_inbound_peers { debug!( - "Already connected to {} inbound peers, only wants {} peers, disconnecting some", - num_inbound_peers, state.max_inbound_peers - ); - inbound_peer_sessions.retain(|k| !state.session_channels_map.contains_key(k)); - let sessions_to_disconnect = if inbound_peer_sessions.len() - < num_inbound_peers - state.max_inbound_peers - { - warn!( - "Wants to disconnect more {} inbound peers, but all peers except {:?} have channels, will not disconnect any peer with channels", - num_inbound_peers - state.max_inbound_peers, &inbound_peer_sessions - ); - &inbound_peer_sessions[..] - } else { - &inbound_peer_sessions[..num_inbound_peers - state.max_inbound_peers] - }; + "Already connected to {} inbound no-channel peers, only wants {} peers, disconnecting some", + num_inbound_no_channel_peers, state.max_inbound_peers + ); + let sessions_to_disconnect = inbound_no_channel_peers + .iter() + .take(num_inbound_no_channel_peers - state.max_inbound_peers) + .map(|(_, session_id)| *session_id) + .collect::>(); debug!( - "Disconnecting inbound peer sessions {:?}", + "Disconnecting inbound no-channel peer sessions {:?}", sessions_to_disconnect ); for session in sessions_to_disconnect { - if let Err(err) = state.control.disconnect(*session).await { + if let Err(err) = state.control.disconnect(session).await { error!("Failed to disconnect session: {}", err); } } @@ -2926,6 +2922,7 @@ pub struct NetworkActorState { gossip_actor: Option>, max_inbound_peers: usize, min_outbound_peers: usize, + next_connected_peer_order: u64, // The features of the node, used to indicate the capabilities of the node. features: FeatureVector, channel_ephemeral_config: ChannelEphemeralConfig, @@ -2938,6 +2935,7 @@ pub struct NetworkActorState { pub struct ConnectedPeer { pub session_id: SessionId, pub session_type: SessionType, + pub connected_order: u64, pub address: Multiaddr, pub features: Option, } @@ -3578,13 +3576,55 @@ where return Ok(()); } - fn inbound_peer_sessions(&self) -> Vec { - self.peer_session_map - .values() - .filter_map(|s| (s.session_type == SessionType::Inbound).then_some(s.session_id)) + fn inbound_no_channel_peers_in_connected_order(&self) -> Vec<(Pubkey, SessionId)> { + let mut peers = self + .peer_session_map + .iter() + .filter_map(|(pubkey, peer)| { + (peer.session_type == SessionType::Inbound + && !self.session_channels_map.contains_key(&peer.session_id)) + .then_some((peer.connected_order, *pubkey, peer.session_id)) + }) + .collect::>(); + peers.sort_by_key(|(connected_order, _, _)| *connected_order); + peers + .into_iter() + .map(|(_, pubkey, session_id)| (pubkey, session_id)) .collect() } + async fn enforce_inbound_peer_budget(&mut self) { + let inbound_no_channel_peers = self.inbound_no_channel_peers_in_connected_order(); + if inbound_no_channel_peers.len() <= self.max_inbound_peers { + return; + } + let excess_peers = inbound_no_channel_peers.len() - self.max_inbound_peers; + + for (pubkey, session_id) in inbound_no_channel_peers.into_iter().take(excess_peers) { + debug!( + "Disconnecting inbound no-channel peer {:?} on session {:?} immediately after connect", + pubkey, session_id + ); + match self.control.disconnect(session_id).await { + Ok(()) => { + if matches!( + self.peer_session_map.get(&pubkey), + Some(peer) if peer.session_id == session_id + ) { + self.peer_session_map.remove(&pubkey); + } + self.session_channels_map.remove(&session_id); + } + Err(err) => { + error!( + "Failed to disconnect inbound no-channel peer {:?} on session {:?}: {}", + pubkey, session_id, err + ); + } + } + } + } + fn num_of_outbound_peers(&self) -> usize { self.peer_session_map .values() @@ -3833,11 +3873,14 @@ where async fn on_peer_connected(&mut self, remote_pubkey: Pubkey, session: &SessionContext) { debug!("Peer {:?} connected", remote_pubkey); + let connected_order = self.next_connected_peer_order; + self.next_connected_peer_order = self.next_connected_peer_order.saturating_add(1); self.peer_session_map.insert( remote_pubkey, ConnectedPeer { session_id: session.id, session_type: session.ty, + connected_order, address: session.address.clone(), features: None, }, @@ -3856,6 +3899,18 @@ where } } + self.enforce_inbound_peer_budget().await; + if !matches!( + self.peer_session_map.get(&remote_pubkey), + Some(peer) if peer.session_id == session.id + ) { + debug!( + "Peer {:?} session {:?} was disconnected by inbound peer admission control", + remote_pubkey, session.id + ); + return; + } + if self.auto_announce { let message = self.get_or_create_new_node_announcement_message(); debug!( @@ -4657,6 +4712,7 @@ where gossip_actor, max_inbound_peers: config.max_inbound_peers(), min_outbound_peers: config.min_outbound_peers(), + next_connected_peer_order: 0, features, channel_ephemeral_config: ChannelEphemeralConfig { funding_timeout_seconds: config.funding_timeout_seconds, diff --git a/crates/fiber-lib/src/fiber/tests/network.rs b/crates/fiber-lib/src/fiber/tests/network.rs index 63eaabef6..51ef73717 100644 --- a/crates/fiber-lib/src/fiber/tests/network.rs +++ b/crates/fiber-lib/src/fiber/tests/network.rs @@ -774,6 +774,92 @@ async fn test_persisting_bootnode() { assert!(peers.is_empty()); } +#[tokio::test] +async fn test_exceeding_inbound_peer_budget_evicts_oldest_no_channel_peer_immediately() { + init_tracing(); + + let mut target = NetworkNode::new_with_config( + NetworkNodeConfigBuilder::new() + .fiber_config_updater(|config| { + config.max_inbound_peers = Some(1); + config.min_outbound_peers = Some(0); + }) + .build(), + ) + .await; + let mut peer1 = NetworkNode::new().await; + let mut peer2 = NetworkNode::new().await; + + peer1.connect_to(&mut target).await; + + peer2.connect_to(&mut target).await; + + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + let peers = call!(target.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::ListPeers((), rpc_reply)) + }) + .expect("target alive") + .expect("list peers"); + + assert_eq!( + peers.len(), + 1, + "target should have already evicted one no-channel inbound peer before maintenance tick", + ); + assert_eq!( + peers[0].pubkey, peer2.pubkey, + "target should retain the newest inbound no-channel peer" + ); +} + +#[tokio::test] +async fn test_inbound_peer_with_channel_does_not_consume_no_channel_peer_budget() { + init_tracing(); + + let mut target = NetworkNode::new_with_config( + NetworkNodeConfigBuilder::new() + .fiber_config_updater(|config| { + config.max_inbound_peers = Some(1); + config.min_outbound_peers = Some(0); + }) + .build(), + ) + .await; + let mut peer_with_channel = NetworkNode::new().await; + let mut peer_without_channel = NetworkNode::new().await; + + peer_with_channel.connect_to(&mut target).await; + establish_channel_between_nodes( + &mut target, + &mut peer_with_channel, + ChannelParameters::new(100_000_000_000, 11_800_000_000), + ) + .await; + + peer_without_channel.connect_to_nonblocking(&target).await; + + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + let peers = call!(target.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::ListPeers((), rpc_reply)) + }) + .expect("target alive") + .expect("list peers"); + + assert_eq!( + peers.len(), + 2, + "a peer with a channel should not consume the no-channel inbound peer budget", + ); + assert!(peers + .iter() + .any(|peer| peer.pubkey == peer_with_channel.pubkey)); + assert!(peers + .iter() + .any(|peer| peer.pubkey == peer_without_channel.pubkey)); +} + #[tokio::test] async fn test_persisting_announced_nodes() { init_tracing(); From dcfe71bd834b5b6165e64d8c58b53c6497de15c4 Mon Sep 17 00:00:00 2001 From: jjy Date: Fri, 13 Mar 2026 15:46:21 +0800 Subject: [PATCH 02/15] fix: single-flight pending inbound channel opens --- crates/fiber-lib/src/fiber/config.rs | 8 +- crates/fiber-lib/src/fiber/network.rs | 31 ++- crates/fiber-lib/src/fiber/tests/network.rs | 213 ++++++++++++++++++-- 3 files changed, 221 insertions(+), 31 deletions(-) diff --git a/crates/fiber-lib/src/fiber/config.rs b/crates/fiber-lib/src/fiber/config.rs index 229a16d5d..673c8e40c 100644 --- a/crates/fiber-lib/src/fiber/config.rs +++ b/crates/fiber-lib/src/fiber/config.rs @@ -341,21 +341,21 @@ pub struct FiberConfig { #[arg(skip)] pub wasm_key_pair: Option, - /// Max allowed number of channels to be accepted from one peer. [default: 20] + /// Max allowed total number of pending inbound channels awaiting acceptance. [default: 20] #[arg( name = "FIBER_TO_BE_ACCEPTED_CHANNELS_NUMBER_LIMIT", long = "fiber-to-be-accepted-channels-number-limit", env, - help = "Max allowed number of channels to be accepted from one peer. [default: 20]" + help = "Max allowed total number of pending inbound channels awaiting acceptance. [default: 20]" )] pub to_be_accepted_channels_number_limit: Option, - /// Max allowed storage bytes of channels to be accepted from one peer. [default: 50KB] + /// Max allowed total storage bytes of pending inbound channels awaiting acceptance. [default: 50KB] #[arg( name = "FIBER_TO_BE_ACCEPTED_CHANNELS_BYTESS_LIMIT", long = "fiber-to-be-accepted-channels-bytes-limit", env, - help = "Max allowed bytes of channels to be accepted from one peer. [default: 50KB]" + help = "Max allowed total bytes of pending inbound channels awaiting acceptance. [default: 50KB]" )] pub to_be_accepted_channels_bytes_limit: Option, diff --git a/crates/fiber-lib/src/fiber/network.rs b/crates/fiber-lib/src/fiber/network.rs index 7a307e7d0..f6aa9eacc 100644 --- a/crates/fiber-lib/src/fiber/network.rs +++ b/crates/fiber-lib/src/fiber/network.rs @@ -5102,7 +5102,7 @@ impl ToBeAcceptedChannels { self.map.remove(id) } - // insert and apply throttle control + // insert and apply single-flight and aggregate throttle control fn try_insert( &mut self, id: Hash256, @@ -5118,18 +5118,27 @@ impl ToBeAcceptedChannels { return Err(ProcessingChannelError::RepeatedProcessing(err_message)); } + if let Some((existing_channel_id, _)) = self + .map + .iter() + .find(|(_, (saved_pubkey, _))| *saved_pubkey == pubkey) + { + return Err(ProcessingChannelError::ToBeAcceptedChannelsExceedLimit( + format!( + "Peer {:?} already has a pending inbound channel awaiting acceptance: {:?}", + pubkey, existing_channel_id + ), + )); + } + // The map should be small because of the flow control, so calculate the total number and // bytes on the fly. - let (total_number, total_bytes) = self - .map - .values() - .filter(|(saved_pubkey, _)| *saved_pubkey == pubkey) - .fold( - (1, open_channel.mem_size()), - |(count, size), (_, saved_open_channel)| { - (count + 1, size + saved_open_channel.mem_size()) - }, - ); + let (total_number, total_bytes) = self.map.values().fold( + (1, open_channel.mem_size()), + |(count, size), (_, saved_open_channel)| { + (count + 1, size + saved_open_channel.mem_size()) + }, + ); if total_number > self.total_number_limit { return Err(ProcessingChannelError::ToBeAcceptedChannelsExceedLimit( diff --git a/crates/fiber-lib/src/fiber/tests/network.rs b/crates/fiber-lib/src/fiber/tests/network.rs index 51ef73717..b5b672f2b 100644 --- a/crates/fiber-lib/src/fiber/tests/network.rs +++ b/crates/fiber-lib/src/fiber/tests/network.rs @@ -1455,11 +1455,94 @@ async fn test_to_be_accepted_channels_number_limit() { .build(), ) .await; + let mut peer1 = NetworkNode::new().await; + let mut peer2 = NetworkNode::new().await; + let mut peer3 = NetworkNode::new().await; + node.connect_to(&mut peer1).await; + node.connect_to(&mut peer2).await; + node.connect_to(&mut peer3).await; + + let node_pubkey = node.pubkey; + + let message = |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::OpenChannel( + OpenChannelCommand { + pubkey: node_pubkey, + public: true, + one_way: false, + shutdown_script: None, + funding_amount, + funding_udt_type_script: None, + commitment_fee_rate: None, + commitment_delay_epoch: None, + funding_fee_rate: None, + tlc_expiry_delta: None, + tlc_min_value: None, + tlc_fee_proportional_millionths: None, + max_tlc_number_in_flight: None, + max_tlc_value_in_flight: None, + }, + rpc_reply, + )) + }; + + call!(peer1.network_actor, message) + .expect("peer1 alive") + .expect("open channel"); + node.expect_event(|event| match event { + NetworkServiceEvent::ChannelPendingToBeAccepted(pubkey, _channel_id) => { + assert_eq!(pubkey, &peer1.pubkey); + true + } + _ => false, + }) + .await; + + call!(peer2.network_actor, message) + .expect("peer2 alive") + .expect("open channel"); + node.expect_event(|event| match event { + NetworkServiceEvent::ChannelPendingToBeAccepted(pubkey, _channel_id) => { + assert_eq!(pubkey, &peer2.pubkey); + true + } + _ => false, + }) + .await; + + call!(peer3.network_actor, message) + .expect("peer3 alive") + .expect("open channel"); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + let pending = call!(node.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::GetPendingAcceptChannels(rpc_reply)) + }) + .expect("node alive") + .expect("pending accept channels"); + assert_eq!(pending.len(), 2); + assert!(pending.iter().any(|channel| channel.pubkey == peer1.pubkey)); + assert!(pending.iter().any(|channel| channel.pubkey == peer2.pubkey)); + assert!(!pending.iter().any(|channel| channel.pubkey == peer3.pubkey)); +} + +#[tokio::test] +async fn test_same_peer_second_pending_inbound_channel_is_rejected() { + let funding_amount = 9_900_000_000u128; + let open_channel_auto_accept_min_ckb_funding_amount = Some(funding_amount as u64 + 1); + let mut node = NetworkNode::new_with_config( + NetworkNodeConfigBuilder::new() + .fiber_config_updater(move |config| { + config.open_channel_auto_accept_min_ckb_funding_amount = + open_channel_auto_accept_min_ckb_funding_amount; + }) + .build(), + ) + .await; let mut peer = NetworkNode::new().await; node.connect_to(&mut peer).await; let node_pubkey = node.pubkey; - let message = |rpc_reply| { NetworkActorMessage::Command(NetworkActorCommand::OpenChannel( OpenChannelCommand { @@ -1494,22 +1577,106 @@ async fn test_to_be_accepted_channels_number_limit() { }) .await; + call!(node.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::GetPendingAcceptChannels(rpc_reply)) + }) + .expect("node alive") + .expect("pending accept channels") + .iter() + .find(|pending| pending.pubkey == peer.pubkey) + .expect("first pending channel exists"); + call!(peer.network_actor, message) .expect("peer alive") .expect("open channel"); + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + let pending = call!(node.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::GetPendingAcceptChannels(rpc_reply)) + }) + .expect("node alive") + .expect("pending accept channels"); + assert_eq!( + pending.len(), + 1, + "second pending inbound open from same peer should be rejected" + ); + assert_eq!(pending[0].pubkey, peer.pubkey); +} + +#[tokio::test] +async fn test_different_peers_can_each_have_one_pending_inbound_channel() { + let funding_amount = 9_900_000_000u128; + let open_channel_auto_accept_min_ckb_funding_amount = Some(funding_amount as u64 + 1); + let mut node = NetworkNode::new_with_config( + NetworkNodeConfigBuilder::new() + .fiber_config_updater(move |config| { + config.open_channel_auto_accept_min_ckb_funding_amount = + open_channel_auto_accept_min_ckb_funding_amount; + }) + .build(), + ) + .await; + let mut peer1 = NetworkNode::new().await; + let mut peer2 = NetworkNode::new().await; + node.connect_to(&mut peer1).await; + node.connect_to(&mut peer2).await; + + let node_pubkey = node.pubkey; + let message = |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::OpenChannel( + OpenChannelCommand { + pubkey: node_pubkey, + public: true, + one_way: false, + shutdown_script: None, + funding_amount, + funding_udt_type_script: None, + commitment_fee_rate: None, + commitment_delay_epoch: None, + funding_fee_rate: None, + tlc_expiry_delta: None, + tlc_min_value: None, + tlc_fee_proportional_millionths: None, + max_tlc_number_in_flight: None, + max_tlc_value_in_flight: None, + }, + rpc_reply, + )) + }; + + call!(peer1.network_actor, message) + .expect("peer1 alive") + .expect("open channel"); node.expect_event(|event| match event { NetworkServiceEvent::ChannelPendingToBeAccepted(pubkey, _channel_id) => { - assert_eq!(pubkey, &peer.pubkey); + assert_eq!(pubkey, &peer1.pubkey); true } _ => false, }) .await; - call!(peer.network_actor, message) - .expect("peer alive") + call!(peer2.network_actor, message) + .expect("peer2 alive") .expect("open channel"); - node.expect_debug_event("ChannelPendingToBeRejected").await; + node.expect_event(|event| match event { + NetworkServiceEvent::ChannelPendingToBeAccepted(pubkey, _channel_id) => { + assert_eq!(pubkey, &peer2.pubkey); + true + } + _ => false, + }) + .await; + + let pending = call!(node.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::GetPendingAcceptChannels(rpc_reply)) + }) + .expect("node alive") + .expect("pending accept channels"); + assert_eq!(pending.len(), 2); + assert!(pending.iter().any(|channel| channel.pubkey == peer1.pubkey)); + assert!(pending.iter().any(|channel| channel.pubkey == peer2.pubkey)); } #[tokio::test] @@ -1562,8 +1729,12 @@ async fn test_to_be_accepted_channels_bytes_limit() { .build(), ) .await; - let mut peer = NetworkNode::new().await; - node.connect_to(&mut peer).await; + let mut peer1 = NetworkNode::new().await; + let mut peer2 = NetworkNode::new().await; + let mut peer3 = NetworkNode::new().await; + node.connect_to(&mut peer1).await; + node.connect_to(&mut peer2).await; + node.connect_to(&mut peer3).await; let node_pubkey = node.pubkey; @@ -1589,32 +1760,42 @@ async fn test_to_be_accepted_channels_bytes_limit() { )) }; - call!(peer.network_actor, message) - .expect("peer alive") + call!(peer1.network_actor, message) + .expect("peer1 alive") .expect("open channel"); node.expect_event(|event| match event { NetworkServiceEvent::ChannelPendingToBeAccepted(pubkey, _channel_id) => { - assert_eq!(pubkey, &peer.pubkey); + assert_eq!(pubkey, &peer1.pubkey); true } _ => false, }) .await; - call!(peer.network_actor, message) - .expect("peer alive") + call!(peer2.network_actor, message) + .expect("peer2 alive") .expect("open channel"); node.expect_event(|event| match event { NetworkServiceEvent::ChannelPendingToBeAccepted(pubkey, _channel_id) => { - assert_eq!(pubkey, &peer.pubkey); + assert_eq!(pubkey, &peer2.pubkey); true } _ => false, }) .await; - call!(peer.network_actor, message) - .expect("peer alive") + call!(peer3.network_actor, message) + .expect("peer3 alive") .expect("open channel"); - node.expect_debug_event("ChannelPendingToBeRejected").await; + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + + let pending = call!(node.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::GetPendingAcceptChannels(rpc_reply)) + }) + .expect("node alive") + .expect("pending accept channels"); + assert_eq!(pending.len(), 2); + assert!(pending.iter().any(|channel| channel.pubkey == peer1.pubkey)); + assert!(pending.iter().any(|channel| channel.pubkey == peer2.pubkey)); + assert!(!pending.iter().any(|channel| channel.pubkey == peer3.pubkey)); } From 6526d5c0a1a7fd42b9fa106fa127ad6168b7ad47 Mon Sep 17 00:00:00 2001 From: jjy Date: Fri, 13 Mar 2026 16:10:50 +0800 Subject: [PATCH 03/15] fix: emit channel ready when reestablish completes --- crates/fiber-lib/src/fiber/channel.rs | 10 ++-- crates/fiber-lib/src/fiber/tests/channel.rs | 65 +++++++++++++++++++++ 2 files changed, 69 insertions(+), 6 deletions(-) diff --git a/crates/fiber-lib/src/fiber/channel.rs b/crates/fiber-lib/src/fiber/channel.rs index f208cfbaa..1765acf81 100644 --- a/crates/fiber-lib/src/fiber/channel.rs +++ b/crates/fiber-lib/src/fiber/channel.rs @@ -115,7 +115,6 @@ pub const COMMITMENT_CELL_WITNESS_LEN: usize = 16 + 1 + 32 + 64; // triggered 10 times per second, plus we also trigger `apply_retryable_tlc_operations` when // receiving ACK from peer, so it's a reason number for 20 TPS const RETRYABLE_TLC_OPS_INTERVAL: Duration = Duration::from_millis(100); -const WAITING_REESTABLISH_FINISH_TIMEOUT: Duration = Duration::from_millis(4000); // if a important TLC operation is not acked in 30 seconds, we will try to disconnect the peer. #[cfg(not(any(test, feature = "bench")))] @@ -5970,11 +5969,10 @@ impl ChannelActorState { let channel_id = self.get_id(); let pubkey = self.get_remote_pubkey(); self.network() - .send_after(WAITING_REESTABLISH_FINISH_TIMEOUT, move || { - NetworkActorMessage::new_event(NetworkActorEvent::ChannelReady( - channel_id, pubkey, outpoint, - )) - }); + .send_message(NetworkActorMessage::new_event( + NetworkActorEvent::ChannelReady(channel_id, pubkey, outpoint), + )) + .expect(ASSUME_NETWORK_ACTOR_ALIVE); self.on_owned_channel_updated(myself, false); } diff --git a/crates/fiber-lib/src/fiber/tests/channel.rs b/crates/fiber-lib/src/fiber/tests/channel.rs index 831700c00..44daaaba0 100644 --- a/crates/fiber-lib/src/fiber/tests/channel.rs +++ b/crates/fiber-lib/src/fiber/tests/channel.rs @@ -4390,6 +4390,71 @@ async fn test_connect_to_peers_with_mutual_channel_on_restart_1() { assert!(node_b.get_triggered_unexpected_events().await.is_empty()); } +#[tokio::test] +async fn test_reestablished_channel_ready_notification_is_not_delayed() { + init_tracing(); + + let node_a_funding_amount = 100000000000; + let node_b_funding_amount = 11800000000; + + let (mut node_a, mut node_b, channel_id, _) = + NetworkNode::new_2_nodes_with_established_channel( + node_a_funding_amount, + node_b_funding_amount, + true, + ) + .await; + + node_a.restart().await; + + node_a + .expect_event( + |event| matches!(event, NetworkServiceEvent::PeerConnected(id, _addr) if id == &node_b.pubkey), + ) + .await; + + let deadline = tokio::time::Instant::now() + Duration::from_secs(1); + let mut saw_reestablished = false; + let mut saw_channel_ready = false; + + while tokio::time::Instant::now() < deadline && !(saw_reestablished && saw_channel_ready) { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + let event = tokio::time::timeout(remaining, node_a.event_emitter.recv()) + .await + .expect("timed out while waiting for post-reconnect events") + .expect("event emitter unexpectedly stopped"); + + match event { + NetworkServiceEvent::DebugEvent(DebugEvent::Common(message)) + if message == "Reestablished channel in ChannelReady" => + { + saw_reestablished = true; + } + NetworkServiceEvent::ChannelReady(pubkey, ready_channel_id, _funding_tx_outpoint) + if pubkey == node_b.pubkey && ready_channel_id == channel_id => + { + saw_channel_ready = true; + } + _ => {} + } + } + + assert!( + saw_reestablished, + "expected reestablish completion debug event within 1s after reconnect" + ); + assert!( + saw_channel_ready, + "expected ChannelReady notification within 1s after reconnect completed" + ); + + node_b + .expect_debug_event("Reestablished channel in ChannelReady") + .await; + assert!(node_a.get_triggered_unexpected_events().await.is_empty()); + assert!(node_b.get_triggered_unexpected_events().await.is_empty()); +} + #[tokio::test] async fn test_connect_to_peers_with_mutual_channel_on_restart_2() { init_tracing(); From c4f50bc0c939cbb6f0d9e77fe47d97fbbfe6c386 Mon Sep 17 00:00:00 2001 From: jjy Date: Fri, 13 Mar 2026 17:28:20 +0800 Subject: [PATCH 04/15] fix: bound deferred replay tlc updates --- crates/fiber-lib/src/fiber/channel.rs | 307 +++++++++++++++++++++++++- 1 file changed, 304 insertions(+), 3 deletions(-) diff --git a/crates/fiber-lib/src/fiber/channel.rs b/crates/fiber-lib/src/fiber/channel.rs index 1765acf81..eb100a699 100644 --- a/crates/fiber-lib/src/fiber/channel.rs +++ b/crates/fiber-lib/src/fiber/channel.rs @@ -564,14 +564,17 @@ where } FiberChannelMessage::AddTlc(add_tlc) => { if state.defer_peer_tlc_updates { - state.queue_deferred_peer_tlc_update(DeferredPeerTlcUpdate::Add(add_tlc)); + state + .try_queue_deferred_peer_tlc_update(DeferredPeerTlcUpdate::Add(add_tlc))?; return Ok(()); } self.handle_add_tlc_peer_message(state, add_tlc) } FiberChannelMessage::RemoveTlc(remove_tlc) => { if state.defer_peer_tlc_updates { - state.queue_deferred_peer_tlc_update(DeferredPeerTlcUpdate::Remove(remove_tlc)); + state.try_queue_deferred_peer_tlc_update(DeferredPeerTlcUpdate::Remove( + remove_tlc, + ))?; return Ok(()); } self.handle_remove_tlc_peer_message(state, remove_tlc) @@ -3622,13 +3625,33 @@ impl ChannelActorState { } } - fn queue_deferred_peer_tlc_update(&mut self, update: DeferredPeerTlcUpdate) { + fn max_deferred_peer_tlc_updates(&self) -> usize { + self.local_constraints + .max_tlc_number_in_flight + .saturating_add(self.remote_constraints.max_tlc_number_in_flight) as usize + } + + fn try_queue_deferred_peer_tlc_update( + &mut self, + update: DeferredPeerTlcUpdate, + ) -> ProcessingChannelResult { + let max_deferred_updates = self.max_deferred_peer_tlc_updates(); + if self.deferred_peer_tlc_updates.len() >= max_deferred_updates { + return Err(ProcessingChannelError::InvalidState(format!( + "Too many deferred peer TLC updates while replaying channel {}: queued {}, limit {}", + self.get_id(), + self.deferred_peer_tlc_updates.len(), + max_deferred_updates + ))); + } + self.deferred_peer_tlc_updates.push_back(update); debug!( "Deferred peer TLC update for channel {} (queued={})", self.get_id(), self.deferred_peer_tlc_updates.len() ); + Ok(()) } fn log_ack_state(&self, context: &str) { @@ -7533,3 +7556,281 @@ impl From<&AcceptChannel> for ChannelBasePublicKeys { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::gen_rand_sha256_hash; + use crate::invoice::{CkbInvoice, CkbInvoiceStatus, InvoiceError, InvoiceStore, PreimageStore}; + use crate::now_timestamp_as_millis_u64; + use crate::tests::gen_utils::{gen_rand_fiber_private_key, gen_rand_fiber_public_key}; + use ckb_types::packed::{OutPoint, Script}; + use fiber_types::HashAlgorithm; + use std::cell::RefCell; + + struct NoopNetworkActor; + + #[async_trait::async_trait] + impl Actor for NoopNetworkActor { + type Msg = NetworkActorMessage; + type State = (); + type Arguments = (); + + async fn pre_start( + &self, + _myself: ActorRef, + _args: Self::Arguments, + ) -> Result { + Ok(()) + } + + async fn handle( + &self, + _myself: ActorRef, + _message: Self::Msg, + _state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + Ok(()) + } + } + + struct NoopChannelMailboxActor; + + #[async_trait::async_trait] + impl Actor for NoopChannelMailboxActor { + type Msg = ChannelActorMessage; + type State = (); + type Arguments = (); + + async fn pre_start( + &self, + _myself: ActorRef, + _args: Self::Arguments, + ) -> Result { + Ok(()) + } + + async fn handle( + &self, + _myself: ActorRef, + _message: Self::Msg, + _state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + Ok(()) + } + } + + #[derive(Default)] + struct MockStore { + channel_states: RefCell>, + preimages: RefCell>, + } + + impl InvoiceStore for MockStore { + fn get_invoice(&self, _id: &Hash256) -> Option { + None + } + + fn insert_invoice( + &self, + _invoice: CkbInvoice, + _preimage: Option, + ) -> Result<(), InvoiceError> { + Ok(()) + } + + fn update_invoice_status( + &self, + _id: &Hash256, + _status: CkbInvoiceStatus, + ) -> Result<(), InvoiceError> { + Ok(()) + } + + fn get_invoice_status(&self, _id: &Hash256) -> Option { + None + } + } + + impl PreimageStore for MockStore { + fn insert_preimage(&self, payment_hash: Hash256, preimage: Hash256) { + self.preimages.borrow_mut().insert(payment_hash, preimage); + } + + fn remove_preimage(&self, payment_hash: &Hash256) { + self.preimages.borrow_mut().remove(payment_hash); + } + + fn get_preimage(&self, payment_hash: &Hash256) -> Option { + self.preimages.borrow().get(payment_hash).copied() + } + } + + impl ChannelActorStateStore for MockStore { + fn get_channel_actor_state(&self, id: &Hash256) -> Option { + self.channel_states.borrow().get(id).cloned() + } + + fn insert_channel_actor_state(&self, state: ChannelActorState) { + self.channel_states.borrow_mut().insert(state.id, state); + } + + fn delete_channel_actor_state(&self, id: &Hash256) { + self.channel_states.borrow_mut().remove(id); + } + + fn get_channel_ids_by_pubkey(&self, _pubkey: &Pubkey) -> Vec { + self.channel_states.borrow().keys().copied().collect() + } + + fn get_channel_states( + &self, + _pubkey: Option, + ) -> Vec<(Pubkey, Hash256, ChannelState)> { + vec![] + } + + fn get_channel_state_by_outpoint(&self, _id: &OutPoint) -> Option { + None + } + + fn insert_payment_custom_records( + &self, + _payment_hash: &Hash256, + _custom_records: PaymentCustomRecords, + ) { + } + + fn get_payment_custom_records( + &self, + _payment_hash: &Hash256, + ) -> Option { + None + } + + fn insert_payment_hold_tlc(&self, _payment_hash: Hash256, _hold_tlc: HoldTlc) {} + + fn remove_payment_hold_tlc( + &self, + _payment_hash: &Hash256, + _channel_id: &Hash256, + _tlc_id: u64, + ) { + } + + fn get_payment_hold_tlcs(&self, _payment_hash: Hash256) -> Vec { + vec![] + } + + fn get_node_hold_tlcs(&self) -> HashMap> { + HashMap::default() + } + + fn is_tlc_settled(&self, _channel_id: &Hash256, _payment_hash: &Hash256) -> bool { + false + } + + fn store_pending_commit_diff(&self, _channel_id: &Hash256, _diff: &CommitDiff) {} + + fn get_pending_commit_diff(&self, _channel_id: &Hash256) -> Option { + None + } + + fn delete_pending_commit_diff(&self, _channel_id: &Hash256) {} + } + + fn create_test_state(network: ActorRef) -> ChannelActorState { + let local_private_key = gen_rand_fiber_private_key(); + let local_pubkey = local_private_key.pubkey(); + let remote_pubkey = gen_rand_fiber_public_key(); + let mut state = ChannelActorState::new_outbound_channel( + None, + false, + &[7; 32], + local_pubkey, + remote_pubkey, + 1_000_000, + 0, + DEFAULT_COMMITMENT_FEE_RATE, + DEFAULT_COMMITMENT_DELAY_EPOCHS, + DEFAULT_FEE_RATE, + None, + Script::default(), + DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, + 3, + ChannelTlcInfo::default(), + network, + local_private_key, + ); + state.update_state(ChannelState::ChannelReady); + state.remote_constraints = ChannelConstraints::new(DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, 4); + state.start_defer_peer_tlc_updates(); + state + } + + fn create_test_add_tlc(channel_id: Hash256, tlc_id: u64) -> AddTlc { + AddTlc { + channel_id, + tlc_id, + amount: 1, + payment_hash: gen_rand_sha256_hash(), + expiry: now_timestamp_as_millis_u64() + 60_000, + hash_algorithm: HashAlgorithm::CkbHash, + onion_packet: None, + } + } + + #[tokio::test] + async fn test_deferred_peer_tlc_updates_are_bounded_by_channel_constraints() { + let network = Actor::spawn(None, NoopNetworkActor, ()) + .await + .expect("start noop network actor") + .0; + let myself = Actor::spawn(None, NoopChannelMailboxActor, ()) + .await + .expect("start noop mailbox actor") + .0; + let store = MockStore::default(); + + let mut state = create_test_state(network.clone()); + let actor = ChannelActor::new( + state.get_local_pubkey(), + state.get_remote_pubkey(), + network, + store, + ); + let channel_id = state.get_id(); + + let max_deferred_updates = state.local_constraints.max_tlc_number_in_flight + + state.remote_constraints.max_tlc_number_in_flight; + + for tlc_id in 0..max_deferred_updates { + actor + .handle_peer_message( + &myself, + &mut state, + FiberChannelMessage::AddTlc(create_test_add_tlc(channel_id, tlc_id)), + ) + .await + .expect("updates within channel limits should be deferred"); + } + + let overflow = actor + .handle_peer_message( + &myself, + &mut state, + FiberChannelMessage::AddTlc(create_test_add_tlc(channel_id, max_deferred_updates)), + ) + .await; + + assert!( + overflow.is_err(), + "updates beyond the negotiated TLC limits should be rejected while deferring replay" + ); + assert_eq!( + state.deferred_peer_tlc_updates.len(), + max_deferred_updates as usize, + "deferred replay queue should stop growing once it reaches the negotiated TLC budget" + ); + } +} From 4e78629e2b0f1f781007dd2ade6b2be8a0a1125e Mon Sep 17 00:00:00 2001 From: jjy Date: Fri, 13 Mar 2026 18:14:45 +0800 Subject: [PATCH 05/15] fix: delay channel ready until reestablish ack --- crates/fiber-lib/src/fiber/channel.rs | 141 +++++++++++++++++- .../tests/settle_tlc_set_command_tests.rs | 1 + .../src/store/sample/sample_channel.rs | 2 + crates/fiber-lib/src/store/tests/store.rs | 2 + 4 files changed, 143 insertions(+), 3 deletions(-) diff --git a/crates/fiber-lib/src/fiber/channel.rs b/crates/fiber-lib/src/fiber/channel.rs index eb100a699..1e4512a42 100644 --- a/crates/fiber-lib/src/fiber/channel.rs +++ b/crates/fiber-lib/src/fiber/channel.rs @@ -531,6 +531,10 @@ where self.apply_retryable_tlc_operations(myself, state, false) .await; } + if state.finish_pending_reestablish_channel_ready(myself) { + state.schedule_next_retry_task(myself); + debug_event!(self.network, "Reestablished channel in ChannelReady"); + } Ok(()) } FiberChannelMessage::ChannelReady(_channel_ready) => { @@ -3254,6 +3258,10 @@ pub struct ChannelActorState { pub core: ChannelActorData, // --- Runtime-only fields (not serialized) --- + /// Reestablish replay has resumed message flow, but we still owe the network actor a + /// `ChannelReady` notification once the missing peer acknowledgment arrives. + #[doc = "skip_store"] + pub pending_reestablish_channel_ready: bool, /// Temporarily defer peer TLC updates while replaying dual-owed state. #[doc = "skip_store"] pub defer_peer_tlc_updates: bool, @@ -3317,6 +3325,7 @@ impl<'de> Deserialize<'de> for ChannelActorState { network: None, scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: VecDeque::new(), ephemeral_config: Default::default(), @@ -4078,6 +4087,7 @@ impl ChannelActorState { network: Some(network), scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: VecDeque::new(), ephemeral_config: Default::default(), @@ -4168,6 +4178,7 @@ impl ChannelActorState { network: Some(network), scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: VecDeque::new(), ephemeral_config: Default::default(), @@ -5985,6 +5996,7 @@ impl ChannelActorState { return; }; + self.pending_reestablish_channel_ready = false; self.reestablishing = false; // If the channel is already ready, we should notify the network actor. @@ -5999,6 +6011,18 @@ impl ChannelActorState { self.on_owned_channel_updated(myself, false); } + fn finish_pending_reestablish_channel_ready( + &mut self, + myself: &ActorRef, + ) -> bool { + if !self.pending_reestablish_channel_ready { + return false; + } + + self.on_reestablished_channel_ready(myself); + true + } + fn resume_funding(&mut self, myself: &ActorRef) { match self.state { ChannelState::AwaitingTxSignatures(mut flags) => { @@ -6277,12 +6301,14 @@ impl ChannelActorState { } ChannelState::ChannelReady => { self.clear_waiting_peer_response(); + self.pending_reestablish_channel_ready = false; let my_local_commitment_number = self.get_local_commitment_number(); let my_remote_commitment_number = self.get_remote_commitment_number(); let my_waiting_ack = self.tlc_state.waiting_ack; let peer_local_commitment_number = reestablish_channel.local_commitment_number; let peer_remote_commitment_number = reestablish_channel.remote_commitment_number; + let mut reestablish_complete = true; warn!( "peer: {:?} \ @@ -6382,11 +6408,18 @@ impl ChannelActorState { self.resend_tlcs_on_reestablish(true)?; } } else { - // ignore, waiting for remote peer to resend revoke_and_ack + // Wait for the peer to resend the missing revoke_and_ack before declaring the + // channel ready again. We must resume normal message handling so that ack can + // be processed, but we delay the ready notification until then. + self.reestablishing = false; + self.pending_reestablish_channel_ready = true; + reestablish_complete = false; } - self.on_reestablished_channel_ready(myself); - debug_event!(network, "Reestablished channel in ChannelReady"); + if reestablish_complete { + self.on_reestablished_channel_ready(myself); + debug_event!(network, "Reestablished channel in ChannelReady"); + } } ChannelState::ShuttingDown(flags) => { // Resend the shutdown message to the peer if we have not received the peer's shutdown message. @@ -7567,6 +7600,7 @@ mod tests { use ckb_types::packed::{OutPoint, Script}; use fiber_types::HashAlgorithm; use std::cell::RefCell; + use tokio::sync::mpsc; struct NoopNetworkActor; @@ -7594,6 +7628,35 @@ mod tests { } } + struct RecordingNetworkActor { + sender: mpsc::UnboundedSender, + } + + #[async_trait::async_trait] + impl Actor for RecordingNetworkActor { + type Msg = NetworkActorMessage; + type State = (); + type Arguments = mpsc::UnboundedSender; + + async fn pre_start( + &self, + _myself: ActorRef, + _sender: Self::Arguments, + ) -> Result { + Ok(()) + } + + async fn handle( + &self, + _myself: ActorRef, + message: Self::Msg, + _state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + self.sender.send(message).expect("record network message"); + Ok(()) + } + } + struct NoopChannelMailboxActor; #[async_trait::async_trait] @@ -7768,6 +7831,16 @@ mod tests { state } + fn create_ready_test_state(network: ActorRef) -> ChannelActorState { + let mut state = create_test_state(network); + state.defer_peer_tlc_updates = false; + state.deferred_peer_tlc_updates.clear(); + state.funding_tx = Some(Transaction::default()); + state.funding_tx_confirmed_at = + Some((Default::default(), 0, now_timestamp_as_millis_u64())); + state + } + fn create_test_add_tlc(channel_id: Hash256, tlc_id: u64) -> AddTlc { AddTlc { channel_id, @@ -7833,4 +7906,66 @@ mod tests { "deferred replay queue should stop growing once it reaches the negotiated TLC budget" ); } + + #[tokio::test] + async fn test_reestablish_does_not_complete_while_waiting_for_peer_revoke_and_ack() { + let (tx, mut rx) = mpsc::unbounded_channel(); + let network = Actor::spawn(None, RecordingNetworkActor { sender: tx.clone() }, tx) + .await + .expect("start recording network actor") + .0; + let myself = Actor::spawn(None, NoopChannelMailboxActor, ()) + .await + .expect("start noop mailbox actor") + .0; + let store = MockStore::default(); + + let mut state = create_ready_test_state(network.clone()); + state.increment_local_commitment_number(); + state.reestablishing = true; + + let actor = ChannelActor::new( + state.get_local_pubkey(), + state.get_remote_pubkey(), + network, + store, + ); + let reestablish = ReestablishChannel { + channel_id: state.get_id(), + local_commitment_number: state.get_remote_commitment_number(), + remote_commitment_number: state.get_remote_commitment_number(), + }; + + while rx.try_recv().is_ok() {} + + actor + .handle_peer_message( + &myself, + &mut state, + FiberChannelMessage::ReestablishChannel(reestablish), + ) + .await + .expect("peer reestablish should be processed"); + + let mut saw_channel_ready = false; + let deadline = tokio::time::Instant::now() + Duration::from_millis(100); + while tokio::time::Instant::now() < deadline { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + let Ok(Some(message)) = tokio::time::timeout(remaining, rx.recv()).await else { + break; + }; + if matches!( + message, + NetworkActorMessage::Event(NetworkActorEvent::ChannelReady(..)) + ) { + saw_channel_ready = true; + break; + } + } + + assert!( + !saw_channel_ready, + "channel should not emit ChannelReady before the missing revoke_and_ack arrives" + ); + } } diff --git a/crates/fiber-lib/src/fiber/tests/settle_tlc_set_command_tests.rs b/crates/fiber-lib/src/fiber/tests/settle_tlc_set_command_tests.rs index 61242f185..80a801e7e 100644 --- a/crates/fiber-lib/src/fiber/tests/settle_tlc_set_command_tests.rs +++ b/crates/fiber-lib/src/fiber/tests/settle_tlc_set_command_tests.rs @@ -306,6 +306,7 @@ fn create_test_channel_state_with_tlc( network: None, scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: std::collections::VecDeque::new(), ephemeral_config: Default::default(), diff --git a/crates/fiber-lib/src/store/sample/sample_channel.rs b/crates/fiber-lib/src/store/sample/sample_channel.rs index e93e39513..519ecb571 100644 --- a/crates/fiber-lib/src/store/sample/sample_channel.rs +++ b/crates/fiber-lib/src/store/sample/sample_channel.rs @@ -32,6 +32,7 @@ impl ChannelActorState { network: None, scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: VecDeque::new(), ephemeral_config: Default::default(), @@ -53,6 +54,7 @@ impl ChannelActorState { network: None, scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: VecDeque::new(), ephemeral_config: Default::default(), diff --git a/crates/fiber-lib/src/store/tests/store.rs b/crates/fiber-lib/src/store/tests/store.rs index 00d6e64d7..4e0fcf8da 100644 --- a/crates/fiber-lib/src/store/tests/store.rs +++ b/crates/fiber-lib/src/store/tests/store.rs @@ -569,6 +569,7 @@ fn test_channel_actor_state_store() { network: None, scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: Default::default(), ephemeral_config: Default::default(), @@ -703,6 +704,7 @@ fn test_serde_channel_actor_state_ciborium() { network: None, scheduled_channel_update_handle: None, pending_notify_settle_tlcs: vec![], + pending_reestablish_channel_ready: false, defer_peer_tlc_updates: false, deferred_peer_tlc_updates: Default::default(), ephemeral_config: Default::default(), From a82dd94f29f9f302b2ba03539e64198266f94f1e Mon Sep 17 00:00:00 2001 From: jjy Date: Fri, 13 Mar 2026 19:28:25 +0800 Subject: [PATCH 06/15] test: move reestablish regression into channel test module --- crates/fiber-lib/src/fiber/channel.rs | 103 -------------------- crates/fiber-lib/src/fiber/tests/channel.rs | 58 ++++++++++- 2 files changed, 57 insertions(+), 104 deletions(-) diff --git a/crates/fiber-lib/src/fiber/channel.rs b/crates/fiber-lib/src/fiber/channel.rs index 1e4512a42..a322cf32e 100644 --- a/crates/fiber-lib/src/fiber/channel.rs +++ b/crates/fiber-lib/src/fiber/channel.rs @@ -7600,8 +7600,6 @@ mod tests { use ckb_types::packed::{OutPoint, Script}; use fiber_types::HashAlgorithm; use std::cell::RefCell; - use tokio::sync::mpsc; - struct NoopNetworkActor; #[async_trait::async_trait] @@ -7628,35 +7626,6 @@ mod tests { } } - struct RecordingNetworkActor { - sender: mpsc::UnboundedSender, - } - - #[async_trait::async_trait] - impl Actor for RecordingNetworkActor { - type Msg = NetworkActorMessage; - type State = (); - type Arguments = mpsc::UnboundedSender; - - async fn pre_start( - &self, - _myself: ActorRef, - _sender: Self::Arguments, - ) -> Result { - Ok(()) - } - - async fn handle( - &self, - _myself: ActorRef, - message: Self::Msg, - _state: &mut Self::State, - ) -> Result<(), ActorProcessingErr> { - self.sender.send(message).expect("record network message"); - Ok(()) - } - } - struct NoopChannelMailboxActor; #[async_trait::async_trait] @@ -7831,16 +7800,6 @@ mod tests { state } - fn create_ready_test_state(network: ActorRef) -> ChannelActorState { - let mut state = create_test_state(network); - state.defer_peer_tlc_updates = false; - state.deferred_peer_tlc_updates.clear(); - state.funding_tx = Some(Transaction::default()); - state.funding_tx_confirmed_at = - Some((Default::default(), 0, now_timestamp_as_millis_u64())); - state - } - fn create_test_add_tlc(channel_id: Hash256, tlc_id: u64) -> AddTlc { AddTlc { channel_id, @@ -7906,66 +7865,4 @@ mod tests { "deferred replay queue should stop growing once it reaches the negotiated TLC budget" ); } - - #[tokio::test] - async fn test_reestablish_does_not_complete_while_waiting_for_peer_revoke_and_ack() { - let (tx, mut rx) = mpsc::unbounded_channel(); - let network = Actor::spawn(None, RecordingNetworkActor { sender: tx.clone() }, tx) - .await - .expect("start recording network actor") - .0; - let myself = Actor::spawn(None, NoopChannelMailboxActor, ()) - .await - .expect("start noop mailbox actor") - .0; - let store = MockStore::default(); - - let mut state = create_ready_test_state(network.clone()); - state.increment_local_commitment_number(); - state.reestablishing = true; - - let actor = ChannelActor::new( - state.get_local_pubkey(), - state.get_remote_pubkey(), - network, - store, - ); - let reestablish = ReestablishChannel { - channel_id: state.get_id(), - local_commitment_number: state.get_remote_commitment_number(), - remote_commitment_number: state.get_remote_commitment_number(), - }; - - while rx.try_recv().is_ok() {} - - actor - .handle_peer_message( - &myself, - &mut state, - FiberChannelMessage::ReestablishChannel(reestablish), - ) - .await - .expect("peer reestablish should be processed"); - - let mut saw_channel_ready = false; - let deadline = tokio::time::Instant::now() + Duration::from_millis(100); - while tokio::time::Instant::now() < deadline { - let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); - let Ok(Some(message)) = tokio::time::timeout(remaining, rx.recv()).await else { - break; - }; - if matches!( - message, - NetworkActorMessage::Event(NetworkActorEvent::ChannelReady(..)) - ) { - saw_channel_ready = true; - break; - } - } - - assert!( - !saw_channel_ready, - "channel should not emit ChannelReady before the missing revoke_and_ack arrives" - ); - } } diff --git a/crates/fiber-lib/src/fiber/tests/channel.rs b/crates/fiber-lib/src/fiber/tests/channel.rs index 44daaaba0..3c9bec2d9 100644 --- a/crates/fiber-lib/src/fiber/tests/channel.rs +++ b/crates/fiber-lib/src/fiber/tests/channel.rs @@ -12,7 +12,8 @@ use crate::fiber::graph::ChannelInfo; use crate::fiber::network::{DebugEvent, FiberMessageWithTarget, PeerDisconnectReason}; use crate::fiber::payment::SendPaymentCommand; use crate::fiber::types::{ - AddTlc, FiberMessage, Hash256, Init, PeeledPaymentOnionPacket, Pubkey, TlcErr, + AddTlc, FiberMessage, Hash256, Init, PeeledPaymentOnionPacket, Pubkey, ReestablishChannel, + TlcErr, }; use crate::invoice::{CkbInvoiceStatus, Currency, InvoiceBuilder}; use crate::test_utils::{init_tracing, NetworkNode}; @@ -6597,6 +6598,61 @@ async fn test_reestablish_bidirectional_pending() { ); } +#[tokio::test] +async fn test_reestablish_does_not_complete_while_waiting_for_peer_revoke_and_ack() { + init_tracing(); + let (mut node_a, node_b, channel_id, _) = + NetworkNode::new_2_nodes_with_established_channel(100000000000, 100000000000, true).await; + + let mut state = node_a.get_channel_actor_state(channel_id); + let peer_commitment_number = state.get_remote_commitment_number(); + state.increment_local_commitment_number(); + state.reestablishing = true; + node_a.update_channel_actor_state(state, None).await; + + while tokio::time::timeout(Duration::from_millis(25), node_a.event_emitter.recv()) + .await + .is_ok() + {} + + node_b + .network_actor + .send_message(NetworkActorMessage::Command( + NetworkActorCommand::SendFiberMessage(FiberMessageWithTarget::new( + node_a.pubkey, + FiberMessage::reestablish_channel(ReestablishChannel { + channel_id, + local_commitment_number: peer_commitment_number, + remote_commitment_number: peer_commitment_number, + }), + )), + )) + .expect("send reestablish message"); + + let mut saw_channel_ready = false; + let deadline = tokio::time::Instant::now() + Duration::from_millis(100); + while tokio::time::Instant::now() < deadline { + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + let Ok(Some(event)) = tokio::time::timeout(remaining, node_a.event_emitter.recv()).await + else { + break; + }; + if matches!( + event, + NetworkServiceEvent::ChannelReady(pubkey, ready_channel_id, _) + if pubkey == node_b.pubkey && ready_channel_id == channel_id + ) { + saw_channel_ready = true; + break; + } + } + + assert!( + !saw_channel_ready, + "channel should not emit ChannelReady before the missing revoke_and_ack arrives" + ); +} + /// Stress test with multiple payments and restarts. /// Tests repeated restart cycles with payments. #[tokio::test] From 50c697c6c2d71a4dcaec55dc88340bc59850fb27 Mon Sep 17 00:00:00 2001 From: jjy Date: Fri, 13 Mar 2026 19:45:29 +0800 Subject: [PATCH 07/15] test: move channel actor coverage into fiber test module --- crates/fiber-lib/src/fiber/channel.rs | 277 ------------------ crates/fiber-lib/src/fiber/tests/channel.rs | 299 +++++++++++++++++++- 2 files changed, 284 insertions(+), 292 deletions(-) diff --git a/crates/fiber-lib/src/fiber/channel.rs b/crates/fiber-lib/src/fiber/channel.rs index a322cf32e..39bb94b09 100644 --- a/crates/fiber-lib/src/fiber/channel.rs +++ b/crates/fiber-lib/src/fiber/channel.rs @@ -7589,280 +7589,3 @@ impl From<&AcceptChannel> for ChannelBasePublicKeys { } } } - -#[cfg(test)] -mod tests { - use super::*; - use crate::gen_rand_sha256_hash; - use crate::invoice::{CkbInvoice, CkbInvoiceStatus, InvoiceError, InvoiceStore, PreimageStore}; - use crate::now_timestamp_as_millis_u64; - use crate::tests::gen_utils::{gen_rand_fiber_private_key, gen_rand_fiber_public_key}; - use ckb_types::packed::{OutPoint, Script}; - use fiber_types::HashAlgorithm; - use std::cell::RefCell; - struct NoopNetworkActor; - - #[async_trait::async_trait] - impl Actor for NoopNetworkActor { - type Msg = NetworkActorMessage; - type State = (); - type Arguments = (); - - async fn pre_start( - &self, - _myself: ActorRef, - _args: Self::Arguments, - ) -> Result { - Ok(()) - } - - async fn handle( - &self, - _myself: ActorRef, - _message: Self::Msg, - _state: &mut Self::State, - ) -> Result<(), ActorProcessingErr> { - Ok(()) - } - } - - struct NoopChannelMailboxActor; - - #[async_trait::async_trait] - impl Actor for NoopChannelMailboxActor { - type Msg = ChannelActorMessage; - type State = (); - type Arguments = (); - - async fn pre_start( - &self, - _myself: ActorRef, - _args: Self::Arguments, - ) -> Result { - Ok(()) - } - - async fn handle( - &self, - _myself: ActorRef, - _message: Self::Msg, - _state: &mut Self::State, - ) -> Result<(), ActorProcessingErr> { - Ok(()) - } - } - - #[derive(Default)] - struct MockStore { - channel_states: RefCell>, - preimages: RefCell>, - } - - impl InvoiceStore for MockStore { - fn get_invoice(&self, _id: &Hash256) -> Option { - None - } - - fn insert_invoice( - &self, - _invoice: CkbInvoice, - _preimage: Option, - ) -> Result<(), InvoiceError> { - Ok(()) - } - - fn update_invoice_status( - &self, - _id: &Hash256, - _status: CkbInvoiceStatus, - ) -> Result<(), InvoiceError> { - Ok(()) - } - - fn get_invoice_status(&self, _id: &Hash256) -> Option { - None - } - } - - impl PreimageStore for MockStore { - fn insert_preimage(&self, payment_hash: Hash256, preimage: Hash256) { - self.preimages.borrow_mut().insert(payment_hash, preimage); - } - - fn remove_preimage(&self, payment_hash: &Hash256) { - self.preimages.borrow_mut().remove(payment_hash); - } - - fn get_preimage(&self, payment_hash: &Hash256) -> Option { - self.preimages.borrow().get(payment_hash).copied() - } - } - - impl ChannelActorStateStore for MockStore { - fn get_channel_actor_state(&self, id: &Hash256) -> Option { - self.channel_states.borrow().get(id).cloned() - } - - fn insert_channel_actor_state(&self, state: ChannelActorState) { - self.channel_states.borrow_mut().insert(state.id, state); - } - - fn delete_channel_actor_state(&self, id: &Hash256) { - self.channel_states.borrow_mut().remove(id); - } - - fn get_channel_ids_by_pubkey(&self, _pubkey: &Pubkey) -> Vec { - self.channel_states.borrow().keys().copied().collect() - } - - fn get_channel_states( - &self, - _pubkey: Option, - ) -> Vec<(Pubkey, Hash256, ChannelState)> { - vec![] - } - - fn get_channel_state_by_outpoint(&self, _id: &OutPoint) -> Option { - None - } - - fn insert_payment_custom_records( - &self, - _payment_hash: &Hash256, - _custom_records: PaymentCustomRecords, - ) { - } - - fn get_payment_custom_records( - &self, - _payment_hash: &Hash256, - ) -> Option { - None - } - - fn insert_payment_hold_tlc(&self, _payment_hash: Hash256, _hold_tlc: HoldTlc) {} - - fn remove_payment_hold_tlc( - &self, - _payment_hash: &Hash256, - _channel_id: &Hash256, - _tlc_id: u64, - ) { - } - - fn get_payment_hold_tlcs(&self, _payment_hash: Hash256) -> Vec { - vec![] - } - - fn get_node_hold_tlcs(&self) -> HashMap> { - HashMap::default() - } - - fn is_tlc_settled(&self, _channel_id: &Hash256, _payment_hash: &Hash256) -> bool { - false - } - - fn store_pending_commit_diff(&self, _channel_id: &Hash256, _diff: &CommitDiff) {} - - fn get_pending_commit_diff(&self, _channel_id: &Hash256) -> Option { - None - } - - fn delete_pending_commit_diff(&self, _channel_id: &Hash256) {} - } - - fn create_test_state(network: ActorRef) -> ChannelActorState { - let local_private_key = gen_rand_fiber_private_key(); - let local_pubkey = local_private_key.pubkey(); - let remote_pubkey = gen_rand_fiber_public_key(); - let mut state = ChannelActorState::new_outbound_channel( - None, - false, - &[7; 32], - local_pubkey, - remote_pubkey, - 1_000_000, - 0, - DEFAULT_COMMITMENT_FEE_RATE, - DEFAULT_COMMITMENT_DELAY_EPOCHS, - DEFAULT_FEE_RATE, - None, - Script::default(), - DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, - 3, - ChannelTlcInfo::default(), - network, - local_private_key, - ); - state.update_state(ChannelState::ChannelReady); - state.remote_constraints = ChannelConstraints::new(DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, 4); - state.start_defer_peer_tlc_updates(); - state - } - - fn create_test_add_tlc(channel_id: Hash256, tlc_id: u64) -> AddTlc { - AddTlc { - channel_id, - tlc_id, - amount: 1, - payment_hash: gen_rand_sha256_hash(), - expiry: now_timestamp_as_millis_u64() + 60_000, - hash_algorithm: HashAlgorithm::CkbHash, - onion_packet: None, - } - } - - #[tokio::test] - async fn test_deferred_peer_tlc_updates_are_bounded_by_channel_constraints() { - let network = Actor::spawn(None, NoopNetworkActor, ()) - .await - .expect("start noop network actor") - .0; - let myself = Actor::spawn(None, NoopChannelMailboxActor, ()) - .await - .expect("start noop mailbox actor") - .0; - let store = MockStore::default(); - - let mut state = create_test_state(network.clone()); - let actor = ChannelActor::new( - state.get_local_pubkey(), - state.get_remote_pubkey(), - network, - store, - ); - let channel_id = state.get_id(); - - let max_deferred_updates = state.local_constraints.max_tlc_number_in_flight - + state.remote_constraints.max_tlc_number_in_flight; - - for tlc_id in 0..max_deferred_updates { - actor - .handle_peer_message( - &myself, - &mut state, - FiberChannelMessage::AddTlc(create_test_add_tlc(channel_id, tlc_id)), - ) - .await - .expect("updates within channel limits should be deferred"); - } - - let overflow = actor - .handle_peer_message( - &myself, - &mut state, - FiberChannelMessage::AddTlc(create_test_add_tlc(channel_id, max_deferred_updates)), - ) - .await; - - assert!( - overflow.is_err(), - "updates beyond the negotiated TLC limits should be rejected while deferring replay" - ); - assert_eq!( - state.deferred_peer_tlc_updates.len(), - max_deferred_updates as usize, - "deferred replay queue should stop growing once it reaches the negotiated TLC budget" - ); - } -} diff --git a/crates/fiber-lib/src/fiber/tests/channel.rs b/crates/fiber-lib/src/fiber/tests/channel.rs index 3c9bec2d9..9e9002a0e 100644 --- a/crates/fiber-lib/src/fiber/tests/channel.rs +++ b/crates/fiber-lib/src/fiber/tests/channel.rs @@ -1,7 +1,9 @@ use crate::ckb::tests::test_utils::complete_commitment_tx; use crate::fiber::channel::{ - AddTlcResponse, ReplayOrderHint, UpdateCommand, MAX_COMMITMENT_DELAY_EPOCHS, - MIN_COMMITMENT_DELAY_EPOCHS, XUDT_COMPATIBLE_WITNESS, + AddTlcResponse, ChannelActor, ChannelActorMessage, ChannelActorState, ChannelActorStateStore, + CommitDiff, ReplayOrderHint, UpdateCommand, DEFAULT_COMMITMENT_FEE_RATE, DEFAULT_FEE_RATE, + DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, MAX_COMMITMENT_DELAY_EPOCHS, MIN_COMMITMENT_DELAY_EPOCHS, + XUDT_COMPATIBLE_WITNESS, }; use crate::fiber::config::{ DEFAULT_COMMITMENT_DELAY_EPOCHS, DEFAULT_FINAL_TLC_EXPIRY_DELTA, DEFAULT_TLC_EXPIRY_DELTA, @@ -12,19 +14,19 @@ use crate::fiber::graph::ChannelInfo; use crate::fiber::network::{DebugEvent, FiberMessageWithTarget, PeerDisconnectReason}; use crate::fiber::payment::SendPaymentCommand; use crate::fiber::types::{ - AddTlc, FiberMessage, Hash256, Init, PeeledPaymentOnionPacket, Pubkey, ReestablishChannel, - TlcErr, + AddTlc, FiberChannelMessage, FiberMessage, Hash256, HoldTlc, Init, PeeledPaymentOnionPacket, + Pubkey, ReestablishChannel, TlcErr, +}; +use crate::invoice::{ + CkbInvoice, CkbInvoiceStatus, Currency, InvoiceBuilder, InvoiceError, InvoiceStore, + PreimageStore, }; -use crate::invoice::{CkbInvoiceStatus, Currency, InvoiceBuilder}; use crate::test_utils::{init_tracing, NetworkNode}; use crate::tests::test_utils::*; use crate::{ ckb::contracts::{get_cell_deps, Contract}, fiber::{ - channel::{ - ChannelActorStateStore, ChannelCommand, ChannelCommandWithId, RemoveTlcCommand, - ShutdownCommand, DEFAULT_COMMITMENT_FEE_RATE, - }, + channel::{ChannelCommand, ChannelCommandWithId, RemoveTlcCommand, ShutdownCommand}, config::DEFAULT_AUTO_ACCEPT_CHANNEL_CKB_FUNDING_AMOUNT, network::{AcceptChannelCommand, OpenChannelCommand}, NetworkActorCommand, NetworkActorMessage, @@ -35,23 +37,231 @@ use crate::{ use ckb_types::core::EpochNumberWithFraction; use ckb_types::{ core::{tx_pool::TxStatus, FeeRate}, - packed::{CellInput, Script, Transaction}, + packed::{CellInput, OutPoint, Script, Transaction}, prelude::{AsTransactionBuilder, Builder, Entity, Pack, Unpack}, }; use fiber_types::{ - derive_private_key, derive_tlc_pubkey, AddTlcCommand, ChannelState, HashAlgorithm, - InMemorySigner, NegotiatingFundingFlags, OutboundTlcStatus, PaymentHopData, PaymentStatus, - Privkey, RemoveTlcFulfill, RemoveTlcReason, TLCId, TlcErrorCode, TlcStatus, NO_SHARED_SECRET, + derive_private_key, derive_tlc_pubkey, AddTlcCommand, ChannelConstraints, ChannelState, + ChannelTlcInfo, HashAlgorithm, InMemorySigner, NegotiatingFundingFlags, OutboundTlcStatus, + PaymentCustomRecords, PaymentHopData, PaymentStatus, Privkey, RemoveTlcFulfill, + RemoveTlcReason, TLCId, TlcErrorCode, TlcStatus, NO_SHARED_SECRET, }; use fiber_types::{CloseFlags, FeatureVector}; use musig2::secp::Point; use musig2::KeyAggContext; -use ractor::call; +use ractor::{call, Actor, ActorProcessingErr, ActorRef}; use secp256k1::SECP256K1; -use std::collections::HashSet; +use std::cell::RefCell; +use std::collections::{HashMap, HashSet}; use std::time::Duration; use tracing::{debug, error}; +struct NoopNetworkActor; + +#[async_trait::async_trait] +impl Actor for NoopNetworkActor { + type Msg = NetworkActorMessage; + type State = (); + type Arguments = (); + + async fn pre_start( + &self, + _myself: ActorRef, + _args: Self::Arguments, + ) -> Result { + Ok(()) + } + + async fn handle( + &self, + _myself: ActorRef, + _message: Self::Msg, + _state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + Ok(()) + } +} + +struct NoopChannelMailboxActor; + +#[async_trait::async_trait] +impl Actor for NoopChannelMailboxActor { + type Msg = ChannelActorMessage; + type State = (); + type Arguments = (); + + async fn pre_start( + &self, + _myself: ActorRef, + _args: Self::Arguments, + ) -> Result { + Ok(()) + } + + async fn handle( + &self, + _myself: ActorRef, + _message: Self::Msg, + _state: &mut Self::State, + ) -> Result<(), ActorProcessingErr> { + Ok(()) + } +} + +#[derive(Default)] +struct DeferredReplayMockStore { + channel_states: RefCell>, + preimages: RefCell>, +} + +impl InvoiceStore for DeferredReplayMockStore { + fn get_invoice(&self, _id: &Hash256) -> Option { + None + } + + fn insert_invoice( + &self, + _invoice: CkbInvoice, + _preimage: Option, + ) -> Result<(), InvoiceError> { + Ok(()) + } + + fn update_invoice_status( + &self, + _id: &Hash256, + _status: CkbInvoiceStatus, + ) -> Result<(), InvoiceError> { + Ok(()) + } + + fn get_invoice_status(&self, _id: &Hash256) -> Option { + None + } +} + +impl PreimageStore for DeferredReplayMockStore { + fn insert_preimage(&self, payment_hash: Hash256, preimage: Hash256) { + self.preimages.borrow_mut().insert(payment_hash, preimage); + } + + fn remove_preimage(&self, payment_hash: &Hash256) { + self.preimages.borrow_mut().remove(payment_hash); + } + + fn get_preimage(&self, payment_hash: &Hash256) -> Option { + self.preimages.borrow().get(payment_hash).copied() + } +} + +impl ChannelActorStateStore for DeferredReplayMockStore { + fn get_channel_actor_state(&self, id: &Hash256) -> Option { + self.channel_states.borrow().get(id).cloned() + } + + fn insert_channel_actor_state(&self, state: ChannelActorState) { + self.channel_states.borrow_mut().insert(state.id, state); + } + + fn delete_channel_actor_state(&self, id: &Hash256) { + self.channel_states.borrow_mut().remove(id); + } + + fn get_channel_ids_by_pubkey(&self, _pubkey: &Pubkey) -> Vec { + self.channel_states.borrow().keys().copied().collect() + } + + fn get_channel_states(&self, _pubkey: Option) -> Vec<(Pubkey, Hash256, ChannelState)> { + vec![] + } + + fn get_channel_state_by_outpoint(&self, _id: &OutPoint) -> Option { + None + } + + fn insert_payment_custom_records( + &self, + _payment_hash: &Hash256, + _custom_records: PaymentCustomRecords, + ) { + } + + fn get_payment_custom_records(&self, _payment_hash: &Hash256) -> Option { + None + } + + fn insert_payment_hold_tlc(&self, _payment_hash: Hash256, _hold_tlc: HoldTlc) {} + + fn remove_payment_hold_tlc( + &self, + _payment_hash: &Hash256, + _channel_id: &Hash256, + _tlc_id: u64, + ) { + } + + fn get_payment_hold_tlcs(&self, _payment_hash: Hash256) -> Vec { + vec![] + } + + fn get_node_hold_tlcs(&self) -> HashMap> { + HashMap::default() + } + + fn is_tlc_settled(&self, _channel_id: &Hash256, _payment_hash: &Hash256) -> bool { + false + } + + fn store_pending_commit_diff(&self, _channel_id: &Hash256, _diff: &CommitDiff) {} + + fn get_pending_commit_diff(&self, _channel_id: &Hash256) -> Option { + None + } + + fn delete_pending_commit_diff(&self, _channel_id: &Hash256) {} +} + +fn create_deferred_replay_test_state(network: ActorRef) -> ChannelActorState { + let local_private_key = gen_rand_fiber_private_key(); + let local_pubkey = local_private_key.pubkey(); + let remote_pubkey = gen_rand_fiber_public_key(); + let mut state = ChannelActorState::new_outbound_channel( + None, + false, + &[7; 32], + local_pubkey, + remote_pubkey, + 1_000_000, + 0, + DEFAULT_COMMITMENT_FEE_RATE, + DEFAULT_COMMITMENT_DELAY_EPOCHS, + DEFAULT_FEE_RATE, + None, + Script::default(), + DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, + 3, + ChannelTlcInfo::default(), + network, + local_private_key, + ); + state.update_state(ChannelState::ChannelReady); + state.remote_constraints = ChannelConstraints::new(DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, 4); + state.defer_peer_tlc_updates = true; + state +} + +fn create_deferred_replay_test_add_tlc(channel_id: Hash256, tlc_id: u64) -> AddTlc { + AddTlc { + channel_id, + tlc_id, + amount: 1, + payment_hash: gen_rand_sha256_hash(), + expiry: now_timestamp_as_millis_u64() + 60_000, + hash_algorithm: HashAlgorithm::CkbHash, + onion_packet: None, + } +} + #[tokio::test] // Not supported on wasm: require filesystem access async fn test_connect_to_other_node() { @@ -6598,6 +6808,65 @@ async fn test_reestablish_bidirectional_pending() { ); } +#[tokio::test] +async fn test_deferred_peer_tlc_updates_are_bounded_by_channel_constraints() { + let network = Actor::spawn(None, NoopNetworkActor, ()) + .await + .expect("start noop network actor") + .0; + let myself = Actor::spawn(None, NoopChannelMailboxActor, ()) + .await + .expect("start noop mailbox actor") + .0; + let store = DeferredReplayMockStore::default(); + + let mut state = create_deferred_replay_test_state(network.clone()); + let actor = ChannelActor::new( + state.get_local_pubkey(), + state.get_remote_pubkey(), + network, + store, + ); + let channel_id = state.get_id(); + + let max_deferred_updates = state.local_constraints.max_tlc_number_in_flight + + state.remote_constraints.max_tlc_number_in_flight; + + for tlc_id in 0..max_deferred_updates { + actor + .handle_peer_message( + &myself, + &mut state, + FiberChannelMessage::AddTlc(create_deferred_replay_test_add_tlc( + channel_id, tlc_id, + )), + ) + .await + .expect("updates within channel limits should be deferred"); + } + + let overflow = actor + .handle_peer_message( + &myself, + &mut state, + FiberChannelMessage::AddTlc(create_deferred_replay_test_add_tlc( + channel_id, + max_deferred_updates, + )), + ) + .await; + + assert!( + overflow.is_err(), + "updates beyond the negotiated TLC limits should be rejected while deferring replay" + ); + assert_eq!( + state.deferred_peer_tlc_updates.len(), + max_deferred_updates as usize, + "deferred replay queue should stop growing once it reaches the negotiated TLC budget" + ); +} + #[tokio::test] async fn test_reestablish_does_not_complete_while_waiting_for_peer_revoke_and_ack() { init_tracing(); From 5dcf70458590fad476374ab22110a342c0ffe014 Mon Sep 17 00:00:00 2001 From: jjy Date: Fri, 13 Mar 2026 20:49:30 +0800 Subject: [PATCH 08/15] fix: scope pending inbound limits per peer --- crates/fiber-lib/src/fiber/config.rs | 8 ++++---- crates/fiber-lib/src/fiber/network.rs | 18 +++++++++++------- crates/fiber-lib/src/fiber/tests/network.rs | 12 ++++++------ 3 files changed, 21 insertions(+), 17 deletions(-) diff --git a/crates/fiber-lib/src/fiber/config.rs b/crates/fiber-lib/src/fiber/config.rs index 673c8e40c..2057ef5b0 100644 --- a/crates/fiber-lib/src/fiber/config.rs +++ b/crates/fiber-lib/src/fiber/config.rs @@ -341,21 +341,21 @@ pub struct FiberConfig { #[arg(skip)] pub wasm_key_pair: Option, - /// Max allowed total number of pending inbound channels awaiting acceptance. [default: 20] + /// Max allowed number of pending inbound channels awaiting acceptance from one peer. [default: 20] #[arg( name = "FIBER_TO_BE_ACCEPTED_CHANNELS_NUMBER_LIMIT", long = "fiber-to-be-accepted-channels-number-limit", env, - help = "Max allowed total number of pending inbound channels awaiting acceptance. [default: 20]" + help = "Max allowed number of pending inbound channels awaiting acceptance from one peer. [default: 20]" )] pub to_be_accepted_channels_number_limit: Option, - /// Max allowed total storage bytes of pending inbound channels awaiting acceptance. [default: 50KB] + /// Max allowed storage bytes of pending inbound channels awaiting acceptance from one peer. [default: 50KB] #[arg( name = "FIBER_TO_BE_ACCEPTED_CHANNELS_BYTESS_LIMIT", long = "fiber-to-be-accepted-channels-bytes-limit", env, - help = "Max allowed total bytes of pending inbound channels awaiting acceptance. [default: 50KB]" + help = "Max allowed bytes of pending inbound channels awaiting acceptance from one peer. [default: 50KB]" )] pub to_be_accepted_channels_bytes_limit: Option, diff --git a/crates/fiber-lib/src/fiber/network.rs b/crates/fiber-lib/src/fiber/network.rs index f6aa9eacc..c5fee72d5 100644 --- a/crates/fiber-lib/src/fiber/network.rs +++ b/crates/fiber-lib/src/fiber/network.rs @@ -5102,7 +5102,7 @@ impl ToBeAcceptedChannels { self.map.remove(id) } - // insert and apply single-flight and aggregate throttle control + // insert and apply single-flight and per-peer throttle control fn try_insert( &mut self, id: Hash256, @@ -5133,12 +5133,16 @@ impl ToBeAcceptedChannels { // The map should be small because of the flow control, so calculate the total number and // bytes on the fly. - let (total_number, total_bytes) = self.map.values().fold( - (1, open_channel.mem_size()), - |(count, size), (_, saved_open_channel)| { - (count + 1, size + saved_open_channel.mem_size()) - }, - ); + let (total_number, total_bytes) = self + .map + .values() + .filter(|(saved_pubkey, _)| *saved_pubkey == pubkey) + .fold( + (1, open_channel.mem_size()), + |(count, size), (_, saved_open_channel)| { + (count + 1, size + saved_open_channel.mem_size()) + }, + ); if total_number > self.total_number_limit { return Err(ProcessingChannelError::ToBeAcceptedChannelsExceedLimit( diff --git a/crates/fiber-lib/src/fiber/tests/network.rs b/crates/fiber-lib/src/fiber/tests/network.rs index b5b672f2b..4e062c96c 100644 --- a/crates/fiber-lib/src/fiber/tests/network.rs +++ b/crates/fiber-lib/src/fiber/tests/network.rs @@ -1441,7 +1441,7 @@ async fn test_abort_funding_on_sign_funding_tx_failure() { } #[tokio::test] -async fn test_to_be_accepted_channels_number_limit() { +async fn test_to_be_accepted_channels_number_limit_is_per_peer() { let funding_amount = 9_900_000_000u128; let open_channel_auto_accept_min_ckb_funding_amount = Some(funding_amount as u64 + 1); let mut node = NetworkNode::new_with_config( @@ -1520,10 +1520,10 @@ async fn test_to_be_accepted_channels_number_limit() { }) .expect("node alive") .expect("pending accept channels"); - assert_eq!(pending.len(), 2); + assert_eq!(pending.len(), 3); assert!(pending.iter().any(|channel| channel.pubkey == peer1.pubkey)); assert!(pending.iter().any(|channel| channel.pubkey == peer2.pubkey)); - assert!(!pending.iter().any(|channel| channel.pubkey == peer3.pubkey)); + assert!(pending.iter().any(|channel| channel.pubkey == peer3.pubkey)); } #[tokio::test] @@ -1680,7 +1680,7 @@ async fn test_different_peers_can_each_have_one_pending_inbound_channel() { } #[tokio::test] -async fn test_to_be_accepted_channels_bytes_limit() { +async fn test_to_be_accepted_channels_bytes_limit_is_per_peer() { init_tracing(); let rand_privkey = gen_rand_fiber_private_key(); @@ -1794,8 +1794,8 @@ async fn test_to_be_accepted_channels_bytes_limit() { }) .expect("node alive") .expect("pending accept channels"); - assert_eq!(pending.len(), 2); + assert_eq!(pending.len(), 3); assert!(pending.iter().any(|channel| channel.pubkey == peer1.pubkey)); assert!(pending.iter().any(|channel| channel.pubkey == peer2.pubkey)); - assert!(!pending.iter().any(|channel| channel.pubkey == peer3.pubkey)); + assert!(pending.iter().any(|channel| channel.pubkey == peer3.pubkey)); } From 6ceebd3966b15ab9d856813f1b382809e7f5c891 Mon Sep 17 00:00:00 2001 From: jjy Date: Sun, 15 Mar 2026 23:40:26 +0800 Subject: [PATCH 09/15] test: use real channel actors in deferred replay test --- crates/fiber-lib/src/fiber/channel.rs | 15 + crates/fiber-lib/src/fiber/tests/channel.rs | 325 ++++---------------- 2 files changed, 82 insertions(+), 258 deletions(-) diff --git a/crates/fiber-lib/src/fiber/channel.rs b/crates/fiber-lib/src/fiber/channel.rs index 39bb94b09..f06e2051d 100644 --- a/crates/fiber-lib/src/fiber/channel.rs +++ b/crates/fiber-lib/src/fiber/channel.rs @@ -177,6 +177,8 @@ pub enum ChannelCommand { Update(UpdateCommand, RpcReplyPort>), NotifyEvent(ChannelEvent), #[cfg(any(test, feature = "bench"))] + SetDeferPeerTlcUpdates(bool), + #[cfg(any(test, feature = "bench"))] ReloadState(ReloadParams), } @@ -193,6 +195,10 @@ impl Display for ChannelCommand { ChannelCommand::Update(_, _) => write!(f, "Update"), ChannelCommand::NotifyEvent(event) => write!(f, "NotifyEvent [{:?}]", event), #[cfg(any(test, feature = "bench"))] + ChannelCommand::SetDeferPeerTlcUpdates(enabled) => { + write!(f, "SetDeferPeerTlcUpdates [{enabled}]") + } + #[cfg(any(test, feature = "bench"))] ChannelCommand::ReloadState(_) => write!(f, "ReloadState"), } } @@ -2340,6 +2346,15 @@ where } ChannelCommand::NotifyEvent(event) => self.handle_event(myself, state, event).await, #[cfg(any(test, feature = "bench"))] + ChannelCommand::SetDeferPeerTlcUpdates(enabled) => { + if enabled { + state.start_defer_peer_tlc_updates(); + } else { + state.stop_defer_peer_tlc_updates(); + } + Ok(()) + } + #[cfg(any(test, feature = "bench"))] ChannelCommand::ReloadState(reload_params) => { let private_key = state.private_key.clone(); *state = self diff --git a/crates/fiber-lib/src/fiber/tests/channel.rs b/crates/fiber-lib/src/fiber/tests/channel.rs index 9e9002a0e..c59a1d1e2 100644 --- a/crates/fiber-lib/src/fiber/tests/channel.rs +++ b/crates/fiber-lib/src/fiber/tests/channel.rs @@ -1,9 +1,8 @@ use crate::ckb::tests::test_utils::complete_commitment_tx; use crate::fiber::channel::{ - AddTlcResponse, ChannelActor, ChannelActorMessage, ChannelActorState, ChannelActorStateStore, - CommitDiff, ReplayOrderHint, UpdateCommand, DEFAULT_COMMITMENT_FEE_RATE, DEFAULT_FEE_RATE, - DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, MAX_COMMITMENT_DELAY_EPOCHS, MIN_COMMITMENT_DELAY_EPOCHS, - XUDT_COMPATIBLE_WITNESS, + AddTlcResponse, ChannelActorStateStore, ReloadParams, ReplayOrderHint, UpdateCommand, + DEFAULT_COMMITMENT_FEE_RATE, DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, MAX_COMMITMENT_DELAY_EPOCHS, + MIN_COMMITMENT_DELAY_EPOCHS, XUDT_COMPATIBLE_WITNESS, }; use crate::fiber::config::{ DEFAULT_COMMITMENT_DELAY_EPOCHS, DEFAULT_FINAL_TLC_EXPIRY_DELTA, DEFAULT_TLC_EXPIRY_DELTA, @@ -14,13 +13,10 @@ use crate::fiber::graph::ChannelInfo; use crate::fiber::network::{DebugEvent, FiberMessageWithTarget, PeerDisconnectReason}; use crate::fiber::payment::SendPaymentCommand; use crate::fiber::types::{ - AddTlc, FiberChannelMessage, FiberMessage, Hash256, HoldTlc, Init, PeeledPaymentOnionPacket, - Pubkey, ReestablishChannel, TlcErr, -}; -use crate::invoice::{ - CkbInvoice, CkbInvoiceStatus, Currency, InvoiceBuilder, InvoiceError, InvoiceStore, - PreimageStore, + AddTlc, FiberMessage, Hash256, Init, PeeledPaymentOnionPacket, Pubkey, ReestablishChannel, + TlcErr, }; +use crate::invoice::{CkbInvoiceStatus, Currency, InvoiceBuilder}; use crate::test_utils::{init_tracing, NetworkNode}; use crate::tests::test_utils::*; use crate::{ @@ -37,219 +33,24 @@ use crate::{ use ckb_types::core::EpochNumberWithFraction; use ckb_types::{ core::{tx_pool::TxStatus, FeeRate}, - packed::{CellInput, OutPoint, Script, Transaction}, + packed::{CellInput, Script, Transaction}, prelude::{AsTransactionBuilder, Builder, Entity, Pack, Unpack}, }; use fiber_types::{ derive_private_key, derive_tlc_pubkey, AddTlcCommand, ChannelConstraints, ChannelState, - ChannelTlcInfo, HashAlgorithm, InMemorySigner, NegotiatingFundingFlags, OutboundTlcStatus, - PaymentCustomRecords, PaymentHopData, PaymentStatus, Privkey, RemoveTlcFulfill, - RemoveTlcReason, TLCId, TlcErrorCode, TlcStatus, NO_SHARED_SECRET, + HashAlgorithm, InMemorySigner, NegotiatingFundingFlags, OutboundTlcStatus, PaymentHopData, + PaymentStatus, Privkey, RemoveTlcFulfill, RemoveTlcReason, TLCId, TlcErrorCode, TlcStatus, + NO_SHARED_SECRET, }; use fiber_types::{CloseFlags, FeatureVector}; use musig2::secp::Point; use musig2::KeyAggContext; -use ractor::{call, Actor, ActorProcessingErr, ActorRef}; +use ractor::call; use secp256k1::SECP256K1; -use std::cell::RefCell; -use std::collections::{HashMap, HashSet}; +use std::collections::HashSet; use std::time::Duration; use tracing::{debug, error}; -struct NoopNetworkActor; - -#[async_trait::async_trait] -impl Actor for NoopNetworkActor { - type Msg = NetworkActorMessage; - type State = (); - type Arguments = (); - - async fn pre_start( - &self, - _myself: ActorRef, - _args: Self::Arguments, - ) -> Result { - Ok(()) - } - - async fn handle( - &self, - _myself: ActorRef, - _message: Self::Msg, - _state: &mut Self::State, - ) -> Result<(), ActorProcessingErr> { - Ok(()) - } -} - -struct NoopChannelMailboxActor; - -#[async_trait::async_trait] -impl Actor for NoopChannelMailboxActor { - type Msg = ChannelActorMessage; - type State = (); - type Arguments = (); - - async fn pre_start( - &self, - _myself: ActorRef, - _args: Self::Arguments, - ) -> Result { - Ok(()) - } - - async fn handle( - &self, - _myself: ActorRef, - _message: Self::Msg, - _state: &mut Self::State, - ) -> Result<(), ActorProcessingErr> { - Ok(()) - } -} - -#[derive(Default)] -struct DeferredReplayMockStore { - channel_states: RefCell>, - preimages: RefCell>, -} - -impl InvoiceStore for DeferredReplayMockStore { - fn get_invoice(&self, _id: &Hash256) -> Option { - None - } - - fn insert_invoice( - &self, - _invoice: CkbInvoice, - _preimage: Option, - ) -> Result<(), InvoiceError> { - Ok(()) - } - - fn update_invoice_status( - &self, - _id: &Hash256, - _status: CkbInvoiceStatus, - ) -> Result<(), InvoiceError> { - Ok(()) - } - - fn get_invoice_status(&self, _id: &Hash256) -> Option { - None - } -} - -impl PreimageStore for DeferredReplayMockStore { - fn insert_preimage(&self, payment_hash: Hash256, preimage: Hash256) { - self.preimages.borrow_mut().insert(payment_hash, preimage); - } - - fn remove_preimage(&self, payment_hash: &Hash256) { - self.preimages.borrow_mut().remove(payment_hash); - } - - fn get_preimage(&self, payment_hash: &Hash256) -> Option { - self.preimages.borrow().get(payment_hash).copied() - } -} - -impl ChannelActorStateStore for DeferredReplayMockStore { - fn get_channel_actor_state(&self, id: &Hash256) -> Option { - self.channel_states.borrow().get(id).cloned() - } - - fn insert_channel_actor_state(&self, state: ChannelActorState) { - self.channel_states.borrow_mut().insert(state.id, state); - } - - fn delete_channel_actor_state(&self, id: &Hash256) { - self.channel_states.borrow_mut().remove(id); - } - - fn get_channel_ids_by_pubkey(&self, _pubkey: &Pubkey) -> Vec { - self.channel_states.borrow().keys().copied().collect() - } - - fn get_channel_states(&self, _pubkey: Option) -> Vec<(Pubkey, Hash256, ChannelState)> { - vec![] - } - - fn get_channel_state_by_outpoint(&self, _id: &OutPoint) -> Option { - None - } - - fn insert_payment_custom_records( - &self, - _payment_hash: &Hash256, - _custom_records: PaymentCustomRecords, - ) { - } - - fn get_payment_custom_records(&self, _payment_hash: &Hash256) -> Option { - None - } - - fn insert_payment_hold_tlc(&self, _payment_hash: Hash256, _hold_tlc: HoldTlc) {} - - fn remove_payment_hold_tlc( - &self, - _payment_hash: &Hash256, - _channel_id: &Hash256, - _tlc_id: u64, - ) { - } - - fn get_payment_hold_tlcs(&self, _payment_hash: Hash256) -> Vec { - vec![] - } - - fn get_node_hold_tlcs(&self) -> HashMap> { - HashMap::default() - } - - fn is_tlc_settled(&self, _channel_id: &Hash256, _payment_hash: &Hash256) -> bool { - false - } - - fn store_pending_commit_diff(&self, _channel_id: &Hash256, _diff: &CommitDiff) {} - - fn get_pending_commit_diff(&self, _channel_id: &Hash256) -> Option { - None - } - - fn delete_pending_commit_diff(&self, _channel_id: &Hash256) {} -} - -fn create_deferred_replay_test_state(network: ActorRef) -> ChannelActorState { - let local_private_key = gen_rand_fiber_private_key(); - let local_pubkey = local_private_key.pubkey(); - let remote_pubkey = gen_rand_fiber_public_key(); - let mut state = ChannelActorState::new_outbound_channel( - None, - false, - &[7; 32], - local_pubkey, - remote_pubkey, - 1_000_000, - 0, - DEFAULT_COMMITMENT_FEE_RATE, - DEFAULT_COMMITMENT_DELAY_EPOCHS, - DEFAULT_FEE_RATE, - None, - Script::default(), - DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, - 3, - ChannelTlcInfo::default(), - network, - local_private_key, - ); - state.update_state(ChannelState::ChannelReady); - state.remote_constraints = ChannelConstraints::new(DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, 4); - state.defer_peer_tlc_updates = true; - state -} - fn create_deferred_replay_test_add_tlc(channel_id: Hash256, tlc_id: u64) -> AddTlc { AddTlc { channel_id, @@ -6810,61 +6611,69 @@ async fn test_reestablish_bidirectional_pending() { #[tokio::test] async fn test_deferred_peer_tlc_updates_are_bounded_by_channel_constraints() { - let network = Actor::spawn(None, NoopNetworkActor, ()) - .await - .expect("start noop network actor") - .0; - let myself = Actor::spawn(None, NoopChannelMailboxActor, ()) - .await - .expect("start noop mailbox actor") - .0; - let store = DeferredReplayMockStore::default(); - - let mut state = create_deferred_replay_test_state(network.clone()); - let actor = ChannelActor::new( - state.get_local_pubkey(), - state.get_remote_pubkey(), - network, - store, - ); - let channel_id = state.get_id(); + init_tracing(); + let (mut node_a, node_b, channel_id, _) = + NetworkNode::new_2_nodes_with_established_channel(100000000000, 100000000000, true).await; - let max_deferred_updates = state.local_constraints.max_tlc_number_in_flight - + state.remote_constraints.max_tlc_number_in_flight; + let mut state = node_a.get_channel_actor_state(channel_id); + state.local_constraints = ChannelConstraints::new(DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, 3); + state.remote_constraints = ChannelConstraints::new(DEFAULT_MAX_TLC_VALUE_IN_FLIGHT, 4); + node_a + .update_channel_actor_state( + state, + Some(ReloadParams { + notify_changes: false, + }), + ) + .await; + + let max_deferred_updates = 7; + + node_a + .network_actor + .send_message(NetworkActorMessage::Command( + NetworkActorCommand::ControlFiberChannel(ChannelCommandWithId { + channel_id, + command: ChannelCommand::SetDeferPeerTlcUpdates(true), + }), + )) + .expect("enable deferred peer TLC replay"); for tlc_id in 0..max_deferred_updates { - actor - .handle_peer_message( - &myself, - &mut state, - FiberChannelMessage::AddTlc(create_deferred_replay_test_add_tlc( - channel_id, tlc_id, + node_b + .network_actor + .send_message(NetworkActorMessage::Command( + NetworkActorCommand::SendFiberMessage(FiberMessageWithTarget::new( + node_a.pubkey, + FiberMessage::add_tlc(create_deferred_replay_test_add_tlc(channel_id, tlc_id)), )), - ) - .await - .expect("updates within channel limits should be deferred"); + )) + .expect("send deferred add_tlc message"); } - let overflow = actor - .handle_peer_message( - &myself, - &mut state, - FiberChannelMessage::AddTlc(create_deferred_replay_test_add_tlc( - channel_id, - max_deferred_updates, + node_b + .network_actor + .send_message(NetworkActorMessage::Command( + NetworkActorCommand::SendFiberMessage(FiberMessageWithTarget::new( + node_a.pubkey, + FiberMessage::add_tlc(create_deferred_replay_test_add_tlc( + channel_id, + max_deferred_updates, + )), )), - ) - .await; + )) + .expect("send overflow deferred add_tlc message"); - assert!( - overflow.is_err(), - "updates beyond the negotiated TLC limits should be rejected while deferring replay" - ); - assert_eq!( - state.deferred_peer_tlc_updates.len(), - max_deferred_updates as usize, - "deferred replay queue should stop growing once it reaches the negotiated TLC budget" - ); + node_a + .expect_to_process_event(|event| match event { + NetworkServiceEvent::DebugEvent(DebugEvent::Common(message)) + if message.contains("Too many deferred peer TLC updates") => + { + Some(()) + } + _ => None, + }) + .await; } #[tokio::test] From 71c123dfd907d6c6fa37adecaae1da7185cc1250 Mon Sep 17 00:00:00 2001 From: jjy Date: Sun, 15 Mar 2026 23:54:59 +0800 Subject: [PATCH 10/15] test: fix reestablish send nonce assertion --- crates/fiber-lib/src/fiber/tests/channel.rs | 33 ++++++++++++++++----- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/crates/fiber-lib/src/fiber/tests/channel.rs b/crates/fiber-lib/src/fiber/tests/channel.rs index c59a1d1e2..c2837214b 100644 --- a/crates/fiber-lib/src/fiber/tests/channel.rs +++ b/crates/fiber-lib/src/fiber/tests/channel.rs @@ -6537,6 +6537,14 @@ async fn test_reestablish_restores_send_nonce() { " Verify Nonce: {:?}", state.remote_revocation_nonce_for_verify.is_some() ); + assert!( + state.remote_revocation_nonce_for_send.is_some(), + "node_b should restore send nonce after reestablish" + ); + assert!( + state.remote_revocation_nonce_for_verify.is_some(), + "node_b should retain verify nonce after reestablish" + ); let state_a = node_a.get_channel_actor_state(channel_id); println!("Node A State:"); @@ -6556,16 +6564,25 @@ async fn test_reestablish_restores_send_nonce() { " Verify Nonce: {:?}", state_a.remote_revocation_nonce_for_verify.is_some() ); + assert!( + state_a.remote_revocation_nonce_for_send.is_some(), + "node_a should have a send nonce after reestablish completes" + ); + assert!( + state_a.remote_revocation_nonce_for_verify.is_some(), + "node_a should have a verify nonce after reestablish completes" + ); // Further verification: A can send another payment. - // Use a new payment to differentiate. - let res = node_a.send_payment_keysend(&node_b, 2000, false).await; - let err_string = res.unwrap_err().to_string(); - println!("err: {err_string}"); - // check error string - assert!(err_string.contains( - "Send payment first hop error: Failed to send onion packet with error UnknownNextPeer" - )); + let second_payment_hash = node_a + .send_payment_keysend(&node_b, 2000, false) + .await + .expect("send should succeed after send nonce is restored") + .payment_hash; + node_a.wait_until_success(second_payment_hash).await; + node_a + .assert_payment_status(second_payment_hash, PaymentStatus::Success, None) + .await; } /// Bidirectional pending operations during reestablish. From 85100460b2743c0c54d99dfbde26365a2fa38bb5 Mon Sep 17 00:00:00 2001 From: jjy Date: Mon, 16 Mar 2026 11:16:04 +0800 Subject: [PATCH 11/15] test: cover inbound peer replacement during channel open --- crates/fiber-lib/src/fiber/tests/network.rs | 82 +++++++++++++++++++++ 1 file changed, 82 insertions(+) diff --git a/crates/fiber-lib/src/fiber/tests/network.rs b/crates/fiber-lib/src/fiber/tests/network.rs index 4e062c96c..cb268c0e3 100644 --- a/crates/fiber-lib/src/fiber/tests/network.rs +++ b/crates/fiber-lib/src/fiber/tests/network.rs @@ -860,6 +860,88 @@ async fn test_inbound_peer_with_channel_does_not_consume_no_channel_peer_budget( .any(|peer| peer.pubkey == peer_without_channel.pubkey)); } +#[tokio::test] +async fn test_new_inbound_peer_can_open_channel_after_replacing_oldest_no_channel_peer() { + init_tracing(); + + let funding_amount = 9_900_000_000u128; + let open_channel_auto_accept_min_ckb_funding_amount = Some(funding_amount as u64 + 1); + + let mut target = NetworkNode::new_with_config( + NetworkNodeConfigBuilder::new() + .fiber_config_updater(move |config| { + config.max_inbound_peers = Some(1); + config.min_outbound_peers = Some(0); + config.open_channel_auto_accept_min_ckb_funding_amount = + open_channel_auto_accept_min_ckb_funding_amount; + }) + .build(), + ) + .await; + let mut old_peer = NetworkNode::new().await; + let mut new_peer = NetworkNode::new().await; + + old_peer.connect_to(&mut target).await; + new_peer.connect_to(&mut target).await; + + old_peer + .expect_event( + |event| matches!(event, NetworkServiceEvent::PeerDisConnected(id, _reason) if id == &target.pubkey), + ) + .await; + + let peers = call!(target.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::ListPeers((), rpc_reply)) + }) + .expect("target alive") + .expect("list peers"); + assert_eq!( + peers.len(), + 1, + "target should only retain the newest inbound no-channel peer", + ); + assert_eq!( + peers[0].pubkey, new_peer.pubkey, + "target should retain the new inbound peer after evicting the oldest no-channel peer", + ); + + let target_pubkey = target.pubkey; + let message = |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::OpenChannel( + OpenChannelCommand { + pubkey: target_pubkey, + public: true, + one_way: false, + shutdown_script: None, + funding_amount, + funding_udt_type_script: None, + commitment_fee_rate: None, + commitment_delay_epoch: None, + funding_fee_rate: None, + tlc_expiry_delta: None, + tlc_min_value: None, + tlc_fee_proportional_millionths: None, + max_tlc_number_in_flight: None, + max_tlc_value_in_flight: None, + }, + rpc_reply, + )) + }; + + call!(new_peer.network_actor, message) + .expect("new peer alive") + .expect("open channel"); + target + .expect_event(|event| match event { + NetworkServiceEvent::ChannelPendingToBeAccepted(pubkey, _channel_id) => { + assert_eq!(pubkey, &new_peer.pubkey); + true + } + _ => false, + }) + .await; +} + #[tokio::test] async fn test_persisting_announced_nodes() { init_tracing(); From 53911853f4c11c1092eba1312dd198939bc90488 Mon Sep 17 00:00:00 2001 From: jjy Date: Tue, 17 Mar 2026 11:00:50 +0800 Subject: [PATCH 12/15] fix: remove duplicate pending-open peer limit --- crates/fiber-lib/src/fiber/config.rs | 14 +- crates/fiber-lib/src/fiber/network.rs | 15 +- crates/fiber-lib/src/fiber/tests/network.rs | 217 ++------------------ 3 files changed, 26 insertions(+), 220 deletions(-) diff --git a/crates/fiber-lib/src/fiber/config.rs b/crates/fiber-lib/src/fiber/config.rs index 2057ef5b0..d800ebf18 100644 --- a/crates/fiber-lib/src/fiber/config.rs +++ b/crates/fiber-lib/src/fiber/config.rs @@ -235,13 +235,13 @@ pub struct FiberConfig { )] pub(crate) gossip_network_maintenance_interval_ms: Option, - /// Maximal number of inbound peers without channels. The node will disconnect the oldest - /// inbound peers without channels when their number exceeds this budget. [default: 16] + /// Maximal number of inbound connections. The node will disconnect inbound connections + /// when the number of inbound connection exceeds this number. [default: 16] #[arg( name = "FIBER_MAX_INBOUND_PEERS", long = "fiber-max-inbound-peers", env, - help = "Maximal number of inbound peers without channels. The node will disconnect the oldest inbound peers without channels when their number exceeds this budget. [default: 16]" + help = "Maximal number of inbound connections. The node will disconnect inbound connections when the number of inbound connection exceeds this number. [default: 16]" )] pub(crate) max_inbound_peers: Option, @@ -341,21 +341,21 @@ pub struct FiberConfig { #[arg(skip)] pub wasm_key_pair: Option, - /// Max allowed number of pending inbound channels awaiting acceptance from one peer. [default: 20] + /// Max allowed number of channels to be accepted from one peer. [default: 20] #[arg( name = "FIBER_TO_BE_ACCEPTED_CHANNELS_NUMBER_LIMIT", long = "fiber-to-be-accepted-channels-number-limit", env, - help = "Max allowed number of pending inbound channels awaiting acceptance from one peer. [default: 20]" + help = "Max allowed number of channels to be accepted from one peer. [default: 20]" )] pub to_be_accepted_channels_number_limit: Option, - /// Max allowed storage bytes of pending inbound channels awaiting acceptance from one peer. [default: 50KB] + /// Max allowed storage bytes of channels to be accepted from one peer. [default: 50KB] #[arg( name = "FIBER_TO_BE_ACCEPTED_CHANNELS_BYTESS_LIMIT", long = "fiber-to-be-accepted-channels-bytes-limit", env, - help = "Max allowed bytes of pending inbound channels awaiting acceptance from one peer. [default: 50KB]" + help = "Max allowed bytes of channels to be accepted from one peer. [default: 50KB]" )] pub to_be_accepted_channels_bytes_limit: Option, diff --git a/crates/fiber-lib/src/fiber/network.rs b/crates/fiber-lib/src/fiber/network.rs index c5fee72d5..7a307e7d0 100644 --- a/crates/fiber-lib/src/fiber/network.rs +++ b/crates/fiber-lib/src/fiber/network.rs @@ -5102,7 +5102,7 @@ impl ToBeAcceptedChannels { self.map.remove(id) } - // insert and apply single-flight and per-peer throttle control + // insert and apply throttle control fn try_insert( &mut self, id: Hash256, @@ -5118,19 +5118,6 @@ impl ToBeAcceptedChannels { return Err(ProcessingChannelError::RepeatedProcessing(err_message)); } - if let Some((existing_channel_id, _)) = self - .map - .iter() - .find(|(_, (saved_pubkey, _))| *saved_pubkey == pubkey) - { - return Err(ProcessingChannelError::ToBeAcceptedChannelsExceedLimit( - format!( - "Peer {:?} already has a pending inbound channel awaiting acceptance: {:?}", - pubkey, existing_channel_id - ), - )); - } - // The map should be small because of the flow control, so calculate the total number and // bytes on the fly. let (total_number, total_bytes) = self diff --git a/crates/fiber-lib/src/fiber/tests/network.rs b/crates/fiber-lib/src/fiber/tests/network.rs index cb268c0e3..0ce11bd6e 100644 --- a/crates/fiber-lib/src/fiber/tests/network.rs +++ b/crates/fiber-lib/src/fiber/tests/network.rs @@ -1523,7 +1523,7 @@ async fn test_abort_funding_on_sign_funding_tx_failure() { } #[tokio::test] -async fn test_to_be_accepted_channels_number_limit_is_per_peer() { +async fn test_to_be_accepted_channels_number_limit() { let funding_amount = 9_900_000_000u128; let open_channel_auto_accept_min_ckb_funding_amount = Some(funding_amount as u64 + 1); let mut node = NetworkNode::new_with_config( @@ -1537,94 +1537,11 @@ async fn test_to_be_accepted_channels_number_limit_is_per_peer() { .build(), ) .await; - let mut peer1 = NetworkNode::new().await; - let mut peer2 = NetworkNode::new().await; - let mut peer3 = NetworkNode::new().await; - node.connect_to(&mut peer1).await; - node.connect_to(&mut peer2).await; - node.connect_to(&mut peer3).await; - - let node_pubkey = node.pubkey; - - let message = |rpc_reply| { - NetworkActorMessage::Command(NetworkActorCommand::OpenChannel( - OpenChannelCommand { - pubkey: node_pubkey, - public: true, - one_way: false, - shutdown_script: None, - funding_amount, - funding_udt_type_script: None, - commitment_fee_rate: None, - commitment_delay_epoch: None, - funding_fee_rate: None, - tlc_expiry_delta: None, - tlc_min_value: None, - tlc_fee_proportional_millionths: None, - max_tlc_number_in_flight: None, - max_tlc_value_in_flight: None, - }, - rpc_reply, - )) - }; - - call!(peer1.network_actor, message) - .expect("peer1 alive") - .expect("open channel"); - node.expect_event(|event| match event { - NetworkServiceEvent::ChannelPendingToBeAccepted(pubkey, _channel_id) => { - assert_eq!(pubkey, &peer1.pubkey); - true - } - _ => false, - }) - .await; - - call!(peer2.network_actor, message) - .expect("peer2 alive") - .expect("open channel"); - node.expect_event(|event| match event { - NetworkServiceEvent::ChannelPendingToBeAccepted(pubkey, _channel_id) => { - assert_eq!(pubkey, &peer2.pubkey); - true - } - _ => false, - }) - .await; - - call!(peer3.network_actor, message) - .expect("peer3 alive") - .expect("open channel"); - tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; - - let pending = call!(node.network_actor, |rpc_reply| { - NetworkActorMessage::Command(NetworkActorCommand::GetPendingAcceptChannels(rpc_reply)) - }) - .expect("node alive") - .expect("pending accept channels"); - assert_eq!(pending.len(), 3); - assert!(pending.iter().any(|channel| channel.pubkey == peer1.pubkey)); - assert!(pending.iter().any(|channel| channel.pubkey == peer2.pubkey)); - assert!(pending.iter().any(|channel| channel.pubkey == peer3.pubkey)); -} - -#[tokio::test] -async fn test_same_peer_second_pending_inbound_channel_is_rejected() { - let funding_amount = 9_900_000_000u128; - let open_channel_auto_accept_min_ckb_funding_amount = Some(funding_amount as u64 + 1); - let mut node = NetworkNode::new_with_config( - NetworkNodeConfigBuilder::new() - .fiber_config_updater(move |config| { - config.open_channel_auto_accept_min_ckb_funding_amount = - open_channel_auto_accept_min_ckb_funding_amount; - }) - .build(), - ) - .await; let mut peer = NetworkNode::new().await; node.connect_to(&mut peer).await; let node_pubkey = node.pubkey; + let message = |rpc_reply| { NetworkActorMessage::Command(NetworkActorCommand::OpenChannel( OpenChannelCommand { @@ -1659,110 +1576,26 @@ async fn test_same_peer_second_pending_inbound_channel_is_rejected() { }) .await; - call!(node.network_actor, |rpc_reply| { - NetworkActorMessage::Command(NetworkActorCommand::GetPendingAcceptChannels(rpc_reply)) - }) - .expect("node alive") - .expect("pending accept channels") - .iter() - .find(|pending| pending.pubkey == peer.pubkey) - .expect("first pending channel exists"); - call!(peer.network_actor, message) .expect("peer alive") .expect("open channel"); - tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; - - let pending = call!(node.network_actor, |rpc_reply| { - NetworkActorMessage::Command(NetworkActorCommand::GetPendingAcceptChannels(rpc_reply)) - }) - .expect("node alive") - .expect("pending accept channels"); - assert_eq!( - pending.len(), - 1, - "second pending inbound open from same peer should be rejected" - ); - assert_eq!(pending[0].pubkey, peer.pubkey); -} - -#[tokio::test] -async fn test_different_peers_can_each_have_one_pending_inbound_channel() { - let funding_amount = 9_900_000_000u128; - let open_channel_auto_accept_min_ckb_funding_amount = Some(funding_amount as u64 + 1); - let mut node = NetworkNode::new_with_config( - NetworkNodeConfigBuilder::new() - .fiber_config_updater(move |config| { - config.open_channel_auto_accept_min_ckb_funding_amount = - open_channel_auto_accept_min_ckb_funding_amount; - }) - .build(), - ) - .await; - let mut peer1 = NetworkNode::new().await; - let mut peer2 = NetworkNode::new().await; - node.connect_to(&mut peer1).await; - node.connect_to(&mut peer2).await; - - let node_pubkey = node.pubkey; - let message = |rpc_reply| { - NetworkActorMessage::Command(NetworkActorCommand::OpenChannel( - OpenChannelCommand { - pubkey: node_pubkey, - public: true, - one_way: false, - shutdown_script: None, - funding_amount, - funding_udt_type_script: None, - commitment_fee_rate: None, - commitment_delay_epoch: None, - funding_fee_rate: None, - tlc_expiry_delta: None, - tlc_min_value: None, - tlc_fee_proportional_millionths: None, - max_tlc_number_in_flight: None, - max_tlc_value_in_flight: None, - }, - rpc_reply, - )) - }; - - call!(peer1.network_actor, message) - .expect("peer1 alive") - .expect("open channel"); node.expect_event(|event| match event { NetworkServiceEvent::ChannelPendingToBeAccepted(pubkey, _channel_id) => { - assert_eq!(pubkey, &peer1.pubkey); + assert_eq!(pubkey, &peer.pubkey); true } _ => false, }) .await; - call!(peer2.network_actor, message) - .expect("peer2 alive") + call!(peer.network_actor, message) + .expect("peer alive") .expect("open channel"); - node.expect_event(|event| match event { - NetworkServiceEvent::ChannelPendingToBeAccepted(pubkey, _channel_id) => { - assert_eq!(pubkey, &peer2.pubkey); - true - } - _ => false, - }) - .await; - - let pending = call!(node.network_actor, |rpc_reply| { - NetworkActorMessage::Command(NetworkActorCommand::GetPendingAcceptChannels(rpc_reply)) - }) - .expect("node alive") - .expect("pending accept channels"); - assert_eq!(pending.len(), 2); - assert!(pending.iter().any(|channel| channel.pubkey == peer1.pubkey)); - assert!(pending.iter().any(|channel| channel.pubkey == peer2.pubkey)); + node.expect_debug_event("ChannelPendingToBeRejected").await; } #[tokio::test] -async fn test_to_be_accepted_channels_bytes_limit_is_per_peer() { +async fn test_to_be_accepted_channels_bytes_limit() { init_tracing(); let rand_privkey = gen_rand_fiber_private_key(); @@ -1811,12 +1644,8 @@ async fn test_to_be_accepted_channels_bytes_limit_is_per_peer() { .build(), ) .await; - let mut peer1 = NetworkNode::new().await; - let mut peer2 = NetworkNode::new().await; - let mut peer3 = NetworkNode::new().await; - node.connect_to(&mut peer1).await; - node.connect_to(&mut peer2).await; - node.connect_to(&mut peer3).await; + let mut peer = NetworkNode::new().await; + node.connect_to(&mut peer).await; let node_pubkey = node.pubkey; @@ -1842,42 +1671,32 @@ async fn test_to_be_accepted_channels_bytes_limit_is_per_peer() { )) }; - call!(peer1.network_actor, message) - .expect("peer1 alive") + call!(peer.network_actor, message) + .expect("peer alive") .expect("open channel"); node.expect_event(|event| match event { NetworkServiceEvent::ChannelPendingToBeAccepted(pubkey, _channel_id) => { - assert_eq!(pubkey, &peer1.pubkey); + assert_eq!(pubkey, &peer.pubkey); true } _ => false, }) .await; - call!(peer2.network_actor, message) - .expect("peer2 alive") + call!(peer.network_actor, message) + .expect("peer alive") .expect("open channel"); node.expect_event(|event| match event { NetworkServiceEvent::ChannelPendingToBeAccepted(pubkey, _channel_id) => { - assert_eq!(pubkey, &peer2.pubkey); + assert_eq!(pubkey, &peer.pubkey); true } _ => false, }) .await; - call!(peer3.network_actor, message) - .expect("peer3 alive") + call!(peer.network_actor, message) + .expect("peer alive") .expect("open channel"); - tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; - - let pending = call!(node.network_actor, |rpc_reply| { - NetworkActorMessage::Command(NetworkActorCommand::GetPendingAcceptChannels(rpc_reply)) - }) - .expect("node alive") - .expect("pending accept channels"); - assert_eq!(pending.len(), 3); - assert!(pending.iter().any(|channel| channel.pubkey == peer1.pubkey)); - assert!(pending.iter().any(|channel| channel.pubkey == peer2.pubkey)); - assert!(pending.iter().any(|channel| channel.pubkey == peer3.pubkey)); + node.expect_debug_event("ChannelPendingToBeRejected").await; } From 53f82f5eddc3416b3035f40a9883ca74885040c9 Mon Sep 17 00:00:00 2001 From: jjy Date: Tue, 17 Mar 2026 11:23:50 +0800 Subject: [PATCH 13/15] fix: treat empty session channel sets as no-channel peers --- crates/fiber-lib/src/fiber/network.rs | 8 ++- crates/fiber-lib/src/fiber/tests/network.rs | 54 +++++++++++++++++++++ 2 files changed, 61 insertions(+), 1 deletion(-) diff --git a/crates/fiber-lib/src/fiber/network.rs b/crates/fiber-lib/src/fiber/network.rs index 7a307e7d0..d019bcc03 100644 --- a/crates/fiber-lib/src/fiber/network.rs +++ b/crates/fiber-lib/src/fiber/network.rs @@ -3576,13 +3576,19 @@ where return Ok(()); } + fn session_has_channels(&self, session_id: &SessionId) -> bool { + self.session_channels_map + .get(session_id) + .is_some_and(|channels| !channels.is_empty()) + } + fn inbound_no_channel_peers_in_connected_order(&self) -> Vec<(Pubkey, SessionId)> { let mut peers = self .peer_session_map .iter() .filter_map(|(pubkey, peer)| { (peer.session_type == SessionType::Inbound - && !self.session_channels_map.contains_key(&peer.session_id)) + && !self.session_has_channels(&peer.session_id)) .then_some((peer.connected_order, *pubkey, peer.session_id)) }) .collect::>(); diff --git a/crates/fiber-lib/src/fiber/tests/network.rs b/crates/fiber-lib/src/fiber/tests/network.rs index 0ce11bd6e..3b3123fa9 100644 --- a/crates/fiber-lib/src/fiber/tests/network.rs +++ b/crates/fiber-lib/src/fiber/tests/network.rs @@ -860,6 +860,60 @@ async fn test_inbound_peer_with_channel_does_not_consume_no_channel_peer_budget( .any(|peer| peer.pubkey == peer_without_channel.pubkey)); } +#[tokio::test] +async fn test_inbound_peer_with_only_closed_channels_consumes_no_channel_peer_budget() { + init_tracing(); + + let mut target = NetworkNode::new_with_config( + NetworkNodeConfigBuilder::new() + .fiber_config_updater(|config| { + config.max_inbound_peers = Some(1); + config.min_outbound_peers = Some(0); + }) + .build(), + ) + .await; + let mut peer_with_closed_channel = NetworkNode::new().await; + let mut new_peer = NetworkNode::new().await; + + peer_with_closed_channel.connect_to(&mut target).await; + let (channel_id, _funding_tx_hash) = establish_channel_between_nodes( + &mut target, + &mut peer_with_closed_channel, + ChannelParameters::new(100_000_000_000, 11_800_000_000), + ) + .await; + + target + .send_shutdown(channel_id, true) + .await + .expect("force shutdown channel"); + tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; + target + .send_channel_shutdown_tx_confirmed_event(peer_with_closed_channel.pubkey, channel_id, true) + .await; + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + new_peer.connect_to_nonblocking(&target).await; + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + + let peers = call!(target.network_actor, |rpc_reply| { + NetworkActorMessage::Command(NetworkActorCommand::ListPeers((), rpc_reply)) + }) + .expect("target alive") + .expect("list peers"); + + assert_eq!( + peers.len(), + 1, + "a peer whose last channel is closed should count as a no-channel peer and be evicted when over budget", + ); + assert_eq!( + peers[0].pubkey, new_peer.pubkey, + "target should retain the newest inbound no-channel peer after the old peer's last channel closes", + ); +} + #[tokio::test] async fn test_new_inbound_peer_can_open_channel_after_replacing_oldest_no_channel_peer() { init_tracing(); From c1babee9c3742587505ce6f745d12cc27569dee4 Mon Sep 17 00:00:00 2001 From: jjy Date: Mon, 23 Mar 2026 19:15:13 +0800 Subject: [PATCH 14/15] Clean duplicated code --- crates/fiber-lib/src/fiber/network.rs | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/crates/fiber-lib/src/fiber/network.rs b/crates/fiber-lib/src/fiber/network.rs index d019bcc03..efa91d234 100644 --- a/crates/fiber-lib/src/fiber/network.rs +++ b/crates/fiber-lib/src/fiber/network.rs @@ -1156,27 +1156,6 @@ where num_inbound_no_channel_peers, num_outbound_peers ); - if num_inbound_no_channel_peers > state.max_inbound_peers { - debug!( - "Already connected to {} inbound no-channel peers, only wants {} peers, disconnecting some", - num_inbound_no_channel_peers, state.max_inbound_peers - ); - let sessions_to_disconnect = inbound_no_channel_peers - .iter() - .take(num_inbound_no_channel_peers - state.max_inbound_peers) - .map(|(_, session_id)| *session_id) - .collect::>(); - debug!( - "Disconnecting inbound no-channel peer sessions {:?}", - sessions_to_disconnect - ); - for session in sessions_to_disconnect { - if let Err(err) = state.control.disconnect(session).await { - error!("Failed to disconnect session: {}", err); - } - } - } - if num_outbound_peers >= state.min_outbound_peers { debug!( "Already connected to {} outbound peers, wants a minimal of {} peers, skipping connecting to more peers", From 9a1d4328c86e7f8debfaddab8674758cd9dee5a2 Mon Sep 17 00:00:00 2001 From: jjy Date: Mon, 23 Mar 2026 19:56:16 +0800 Subject: [PATCH 15/15] Remove peer connected_order --- crates/fiber-lib/src/fiber/network.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/crates/fiber-lib/src/fiber/network.rs b/crates/fiber-lib/src/fiber/network.rs index efa91d234..4fa185538 100644 --- a/crates/fiber-lib/src/fiber/network.rs +++ b/crates/fiber-lib/src/fiber/network.rs @@ -2901,7 +2901,6 @@ pub struct NetworkActorState { gossip_actor: Option>, max_inbound_peers: usize, min_outbound_peers: usize, - next_connected_peer_order: u64, // The features of the node, used to indicate the capabilities of the node. features: FeatureVector, channel_ephemeral_config: ChannelEphemeralConfig, @@ -2914,7 +2913,6 @@ pub struct NetworkActorState { pub struct ConnectedPeer { pub session_id: SessionId, pub session_type: SessionType, - pub connected_order: u64, pub address: Multiaddr, pub features: Option, } @@ -3568,14 +3566,11 @@ where .filter_map(|(pubkey, peer)| { (peer.session_type == SessionType::Inbound && !self.session_has_channels(&peer.session_id)) - .then_some((peer.connected_order, *pubkey, peer.session_id)) + .then_some((*pubkey, peer.session_id)) }) .collect::>(); - peers.sort_by_key(|(connected_order, _, _)| *connected_order); + peers.sort_by_key(|(_, session_id)| *session_id); peers - .into_iter() - .map(|(_, pubkey, session_id)| (pubkey, session_id)) - .collect() } async fn enforce_inbound_peer_budget(&mut self) { @@ -3858,14 +3853,11 @@ where async fn on_peer_connected(&mut self, remote_pubkey: Pubkey, session: &SessionContext) { debug!("Peer {:?} connected", remote_pubkey); - let connected_order = self.next_connected_peer_order; - self.next_connected_peer_order = self.next_connected_peer_order.saturating_add(1); self.peer_session_map.insert( remote_pubkey, ConnectedPeer { session_id: session.id, session_type: session.ty, - connected_order, address: session.address.clone(), features: None, }, @@ -4697,7 +4689,6 @@ where gossip_actor, max_inbound_peers: config.max_inbound_peers(), min_outbound_peers: config.min_outbound_peers(), - next_connected_peer_order: 0, features, channel_ephemeral_config: ChannelEphemeralConfig { funding_timeout_seconds: config.funding_timeout_seconds,