Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 81 additions & 12 deletions crates/fiber-lib/src/fiber/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,6 @@ pub const COMMITMENT_CELL_WITNESS_LEN: usize = 16 + 1 + 32 + 64;
// triggered 10 times per second, plus we also trigger `apply_retryable_tlc_operations` when
// receiving ACK from peer, so it's a reasonable number for 20 TPS
const RETRYABLE_TLC_OPS_INTERVAL: Duration = Duration::from_millis(100);
const WAITING_REESTABLISH_FINISH_TIMEOUT: Duration = Duration::from_millis(4000);

// if an important TLC operation is not acked in 30 seconds, we will try to disconnect the peer.
#[cfg(not(any(test, feature = "bench")))]
Expand Down Expand Up @@ -178,6 +177,8 @@ pub enum ChannelCommand {
Update(UpdateCommand, RpcReplyPort<Result<(), String>>),
NotifyEvent(ChannelEvent),
#[cfg(any(test, feature = "bench"))]
SetDeferPeerTlcUpdates(bool),
#[cfg(any(test, feature = "bench"))]
ReloadState(ReloadParams),
}

Expand All @@ -194,6 +195,10 @@ impl Display for ChannelCommand {
ChannelCommand::Update(_, _) => write!(f, "Update"),
ChannelCommand::NotifyEvent(event) => write!(f, "NotifyEvent [{:?}]", event),
#[cfg(any(test, feature = "bench"))]
ChannelCommand::SetDeferPeerTlcUpdates(enabled) => {
write!(f, "SetDeferPeerTlcUpdates [{enabled}]")
}
#[cfg(any(test, feature = "bench"))]
ChannelCommand::ReloadState(_) => write!(f, "ReloadState"),
}
}
Expand Down Expand Up @@ -532,6 +537,10 @@ where
self.apply_retryable_tlc_operations(myself, state, false)
.await;
}
if state.finish_pending_reestablish_channel_ready(myself) {
state.schedule_next_retry_task(myself);
debug_event!(self.network, "Reestablished channel in ChannelReady");
}
Ok(())
}
FiberChannelMessage::ChannelReady(_channel_ready) => {
Expand Down Expand Up @@ -565,14 +574,17 @@ where
}
FiberChannelMessage::AddTlc(add_tlc) => {
if state.defer_peer_tlc_updates {
state.queue_deferred_peer_tlc_update(DeferredPeerTlcUpdate::Add(add_tlc));
state
.try_queue_deferred_peer_tlc_update(DeferredPeerTlcUpdate::Add(add_tlc))?;
return Ok(());
}
self.handle_add_tlc_peer_message(state, add_tlc)
}
FiberChannelMessage::RemoveTlc(remove_tlc) => {
if state.defer_peer_tlc_updates {
state.queue_deferred_peer_tlc_update(DeferredPeerTlcUpdate::Remove(remove_tlc));
state.try_queue_deferred_peer_tlc_update(DeferredPeerTlcUpdate::Remove(
remove_tlc,
))?;
return Ok(());
}
self.handle_remove_tlc_peer_message(state, remove_tlc)
Expand Down Expand Up @@ -2334,6 +2346,15 @@ where
}
ChannelCommand::NotifyEvent(event) => self.handle_event(myself, state, event).await,
#[cfg(any(test, feature = "bench"))]
ChannelCommand::SetDeferPeerTlcUpdates(enabled) => {
if enabled {
state.start_defer_peer_tlc_updates();
} else {
state.stop_defer_peer_tlc_updates();
}
Ok(())
}
#[cfg(any(test, feature = "bench"))]
ChannelCommand::ReloadState(reload_params) => {
let private_key = state.private_key.clone();
*state = self
Expand Down Expand Up @@ -3252,6 +3273,10 @@ pub struct ChannelActorState {
pub core: ChannelActorData,

// --- Runtime-only fields (not serialized) ---
/// Reestablish replay has resumed message flow, but we still owe the network actor a
/// `ChannelReady` notification once the missing peer acknowledgment arrives.
#[doc = "skip_store"]
pub pending_reestablish_channel_ready: bool,
/// Temporarily defer peer TLC updates while replaying dual-owed state.
#[doc = "skip_store"]
pub defer_peer_tlc_updates: bool,
Expand Down Expand Up @@ -3315,6 +3340,7 @@ impl<'de> Deserialize<'de> for ChannelActorState {
network: None,
scheduled_channel_update_handle: None,
pending_notify_settle_tlcs: vec![],
pending_reestablish_channel_ready: false,
defer_peer_tlc_updates: false,
deferred_peer_tlc_updates: VecDeque::new(),
ephemeral_config: Default::default(),
Expand Down Expand Up @@ -3623,13 +3649,33 @@ impl ChannelActorState {
}
}

fn queue_deferred_peer_tlc_update(&mut self, update: DeferredPeerTlcUpdate) {
/// Upper bound on the deferred peer TLC update queue: the sum of both
/// sides' in-flight TLC limits, added with saturation so an extreme
/// configuration cannot overflow.
fn max_deferred_peer_tlc_updates(&self) -> usize {
    let local_limit = self.local_constraints.max_tlc_number_in_flight;
    let remote_limit = self.remote_constraints.max_tlc_number_in_flight;
    local_limit.saturating_add(remote_limit) as usize
}

/// Queue a peer TLC update for later replay (used while reestablishing),
/// rejecting the update with `InvalidState` once the queue has reached the
/// capacity bound computed by [`Self::max_deferred_peer_tlc_updates`].
fn try_queue_deferred_peer_tlc_update(
    &mut self,
    update: DeferredPeerTlcUpdate,
) -> ProcessingChannelResult {
    let limit = self.max_deferred_peer_tlc_updates();
    let queued = self.deferred_peer_tlc_updates.len();

    // Refuse to buffer without bound: a misbehaving peer could otherwise
    // grow this queue indefinitely while we are replaying state.
    if queued >= limit {
        return Err(ProcessingChannelError::InvalidState(format!(
            "Too many deferred peer TLC updates while replaying channel {}: queued {}, limit {}",
            self.get_id(),
            queued,
            limit
        )));
    }

    self.deferred_peer_tlc_updates.push_back(update);
    debug!(
        "Deferred peer TLC update for channel {} (queued={})",
        self.get_id(),
        self.deferred_peer_tlc_updates.len()
    );
    Ok(())
}

fn log_ack_state(&self, context: &str) {
Expand Down Expand Up @@ -4056,6 +4102,7 @@ impl ChannelActorState {
network: Some(network),
scheduled_channel_update_handle: None,
pending_notify_settle_tlcs: vec![],
pending_reestablish_channel_ready: false,
defer_peer_tlc_updates: false,
deferred_peer_tlc_updates: VecDeque::new(),
ephemeral_config: Default::default(),
Expand Down Expand Up @@ -4146,6 +4193,7 @@ impl ChannelActorState {
network: Some(network),
scheduled_channel_update_handle: None,
pending_notify_settle_tlcs: vec![],
pending_reestablish_channel_ready: false,
defer_peer_tlc_updates: false,
deferred_peer_tlc_updates: VecDeque::new(),
ephemeral_config: Default::default(),
Expand Down Expand Up @@ -5963,21 +6011,33 @@ impl ChannelActorState {
return;
};

self.pending_reestablish_channel_ready = false;
self.reestablishing = false;

// If the channel is already ready, we should notify the network actor.
// so that we update the network.outpoint_channel_map
let channel_id = self.get_id();
let pubkey = self.get_remote_pubkey();
self.network()
.send_after(WAITING_REESTABLISH_FINISH_TIMEOUT, move || {
NetworkActorMessage::new_event(NetworkActorEvent::ChannelReady(
channel_id, pubkey, outpoint,
))
});
.send_message(NetworkActorMessage::new_event(
NetworkActorEvent::ChannelReady(channel_id, pubkey, outpoint),
))
.expect(ASSUME_NETWORK_ACTOR_ALIVE);
self.on_owned_channel_updated(myself, false);
}

/// If a `ChannelReady` notification is still owed from a previous
/// reestablish (see `pending_reestablish_channel_ready`), deliver it now.
///
/// Returns `true` when the pending notification was delivered, `false`
/// when nothing was owed.
fn finish_pending_reestablish_channel_ready(
    &mut self,
    myself: &ActorRef<ChannelActorMessage>,
) -> bool {
    match self.pending_reestablish_channel_ready {
        // Nothing deferred — nothing to do.
        false => false,
        true => {
            self.on_reestablished_channel_ready(myself);
            true
        }
    }
}

fn resume_funding(&mut self, myself: &ActorRef<ChannelActorMessage>) {
match self.state {
ChannelState::AwaitingTxSignatures(mut flags) => {
Expand Down Expand Up @@ -6256,12 +6316,14 @@ impl ChannelActorState {
}
ChannelState::ChannelReady => {
self.clear_waiting_peer_response();
self.pending_reestablish_channel_ready = false;

let my_local_commitment_number = self.get_local_commitment_number();
let my_remote_commitment_number = self.get_remote_commitment_number();
let my_waiting_ack = self.tlc_state.waiting_ack;
let peer_local_commitment_number = reestablish_channel.local_commitment_number;
let peer_remote_commitment_number = reestablish_channel.remote_commitment_number;
let mut reestablish_complete = true;

warn!(
"peer: {:?} \
Expand Down Expand Up @@ -6361,11 +6423,18 @@ impl ChannelActorState {
self.resend_tlcs_on_reestablish(true)?;
}
} else {
// ignore, waiting for remote peer to resend revoke_and_ack
// Wait for the peer to resend the missing revoke_and_ack before declaring the
// channel ready again. We must resume normal message handling so that ack can
// be processed, but we delay the ready notification until then.
self.reestablishing = false;
self.pending_reestablish_channel_ready = true;
reestablish_complete = false;
}

self.on_reestablished_channel_ready(myself);
debug_event!(network, "Reestablished channel in ChannelReady");
if reestablish_complete {
self.on_reestablished_channel_ready(myself);
debug_event!(network, "Reestablished channel in ChannelReady");
}
}
ChannelState::ShuttingDown(flags) => {
// Resend the shutdown message to the peer if we have not received the peer's shutdown message.
Expand Down
104 changes: 68 additions & 36 deletions crates/fiber-lib/src/fiber/network.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1147,39 +1147,14 @@ where
}
}

let mut inbound_peer_sessions = state.inbound_peer_sessions();
let num_inbound_peers = inbound_peer_sessions.len();
let inbound_no_channel_peers = state.inbound_no_channel_peers_in_connected_order();
let num_inbound_no_channel_peers = inbound_no_channel_peers.len();
let num_outbound_peers = state.num_of_outbound_peers();

debug!("Maintaining network connections ticked: current num inbound peers {}, current num outbound peers {}", num_inbound_peers, num_outbound_peers);

if num_inbound_peers > state.max_inbound_peers {
debug!(
"Already connected to {} inbound peers, only wants {} peers, disconnecting some",
num_inbound_peers, state.max_inbound_peers
);
inbound_peer_sessions.retain(|k| !state.session_channels_map.contains_key(k));
let sessions_to_disconnect = if inbound_peer_sessions.len()
< num_inbound_peers - state.max_inbound_peers
{
warn!(
"Wants to disconnect more {} inbound peers, but all peers except {:?} have channels, will not disconnect any peer with channels",
num_inbound_peers - state.max_inbound_peers, &inbound_peer_sessions
);
&inbound_peer_sessions[..]
} else {
&inbound_peer_sessions[..num_inbound_peers - state.max_inbound_peers]
};
debug!(
"Disconnecting inbound peer sessions {:?}",
sessions_to_disconnect
);
for session in sessions_to_disconnect {
if let Err(err) = state.control.disconnect(*session).await {
error!("Failed to disconnect session: {}", err);
}
}
}
debug!(
"Maintaining network connections ticked: current num inbound no-channel peers {}, current num outbound peers {}",
num_inbound_no_channel_peers, num_outbound_peers
);

if num_outbound_peers >= state.min_outbound_peers {
debug!(
Expand Down Expand Up @@ -3578,11 +3553,56 @@ where
return Ok(());
}

fn inbound_peer_sessions(&self) -> Vec<SessionId> {
self.peer_session_map
.values()
.filter_map(|s| (s.session_type == SessionType::Inbound).then_some(s.session_id))
.collect()
/// Whether the given session currently has at least one channel associated
/// with it in `session_channels_map`.
fn session_has_channels(&self, session_id: &SessionId) -> bool {
    match self.session_channels_map.get(session_id) {
        Some(channels) => !channels.is_empty(),
        None => false,
    }
}

/// Collect the inbound peers that have no channels, sorted by ascending
/// session id.
///
/// NOTE(review): "connected order" assumes session ids increase
/// monotonically with connection time — verify against the session
/// service's id allocation.
fn inbound_no_channel_peers_in_connected_order(&self) -> Vec<(Pubkey, SessionId)> {
    let mut peers: Vec<(Pubkey, SessionId)> = Vec::new();
    for (pubkey, peer) in self.peer_session_map.iter() {
        let is_inbound = peer.session_type == SessionType::Inbound;
        if is_inbound && !self.session_has_channels(&peer.session_id) {
            peers.push((*pubkey, peer.session_id));
        }
    }
    peers.sort_by_key(|&(_, session_id)| session_id);
    peers
}

/// Enforce `max_inbound_peers` over inbound peers that have no channels:
/// if the count exceeds the budget, disconnect the excess peers and prune
/// them from the local session/peer maps.
///
/// Eviction takes the first entries of
/// `inbound_no_channel_peers_in_connected_order`, i.e. the lowest session
/// ids — presumably the longest-connected no-channel peers; TODO confirm
/// that session ids reflect connection order. Peers with channels are never
/// counted against the budget here and are never disconnected by this path.
async fn enforce_inbound_peer_budget(&mut self) {
    let inbound_no_channel_peers = self.inbound_no_channel_peers_in_connected_order();
    // Within budget — nothing to evict.
    if inbound_no_channel_peers.len() <= self.max_inbound_peers {
        return;
    }
    // Safe subtraction: the guard above ensures len > max_inbound_peers.
    let excess_peers = inbound_no_channel_peers.len() - self.max_inbound_peers;

    for (pubkey, session_id) in inbound_no_channel_peers.into_iter().take(excess_peers) {
        debug!(
            "Disconnecting inbound no-channel peer {:?} on session {:?} immediately after connect",
            pubkey, session_id
        );
        match self.control.disconnect(session_id).await {
            Ok(()) => {
                // Only drop the pubkey entry if it still maps to the session
                // we just disconnected — the peer may have reconnected on a
                // newer session while `disconnect` was awaited, and that
                // newer mapping must be preserved.
                if matches!(
                    self.peer_session_map.get(&pubkey),
                    Some(peer) if peer.session_id == session_id
                ) {
                    self.peer_session_map.remove(&pubkey);
                }
                // The session itself is gone either way.
                self.session_channels_map.remove(&session_id);
            }
            Err(err) => {
                // Best-effort: log and leave the maps untouched so a later
                // tick can retry; the peer is still connected on failure.
                error!(
                    "Failed to disconnect inbound no-channel peer {:?} on session {:?}: {}",
                    pubkey, session_id, err
                );
            }
        }
    }
}

fn num_of_outbound_peers(&self) -> usize {
Expand Down Expand Up @@ -3856,6 +3876,18 @@ where
}
}

self.enforce_inbound_peer_budget().await;
if !matches!(
self.peer_session_map.get(&remote_pubkey),
Some(peer) if peer.session_id == session.id
) {
debug!(
"Peer {:?} session {:?} was disconnected by inbound peer admission control",
remote_pubkey, session.id
);
return;
}

if self.auto_announce {
let message = self.get_or_create_new_node_announcement_message();
debug!(
Expand Down
Loading
Loading