@@ -718,19 +718,11 @@ enum MessageBatchImpl {
718
718
CommitmentSigned ( Vec < msgs:: CommitmentSigned > ) ,
719
719
}
720
720
721
- /// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
722
- /// forwarding gossip messages to peers altogether.
723
- const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO : usize = 2 ;
724
-
725
721
/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
726
722
/// we have fewer than this many messages in the outbound buffer again.
727
723
/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
728
724
/// refilled as we send bytes.
729
725
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE : usize = 12 ;
730
- /// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
731
- /// the peer.
732
- const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP : usize =
733
- OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO ;
734
726
735
727
/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
736
728
/// the socket receive buffer before receiving the ping.
@@ -754,10 +746,20 @@ const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4;
754
746
/// process before the next ping.
755
747
///
756
748
/// Note that we continue responding to other messages even after we've sent this many messages, so
757
- /// it's more of a general guideline used for gossip backfill (and gossip forwarding, times
758
- /// [`FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO`]) than a hard limit.
749
+ /// this really limits gossip broadcast, gossip backfill, and onion message relay.
759
750
const BUFFER_DRAIN_MSGS_PER_TICK : usize = 32 ;
760
751
752
+ /// The maximum number of bytes which we allow in a peer's outbound buffers before we start
753
+ /// dropping outbound gossip forwards.
754
+ ///
755
+ /// This is currently 128KiB, or two messages at the maximum message size (though in practice we
756
+ /// refuse to forward gossip messages which are substantially larger than we expect, so this is
757
+ /// closer to ~85 messages if all queued messages are maximum-sized channel announcements).
758
+ ///
759
+ /// Note that as we always drain the gossip forwarding queue before continuing gossip backfill,
760
+ /// the equivalent maximum buffer size for gossip backfill is zero.
761
+ const OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP : usize = 64 * 1024 * 2 ;
762
+
761
763
struct Peer {
762
764
channel_encryptor : PeerChannelEncryptor ,
763
765
/// We cache a `NodeId` here to avoid serializing peers' keys every time we forward gossip
@@ -889,12 +891,11 @@ impl Peer {
889
891
890
892
/// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
891
893
fn buffer_full_drop_gossip_broadcast ( & self ) -> bool {
892
- let total_outbound_buffered =
893
- self . gossip_broadcast_buffer . len ( ) + self . pending_outbound_buffer . len ( ) ;
894
+ let total_outbound_buffered: usize =
895
+ self . gossip_broadcast_buffer . iter ( ) . map ( |m| m. capacity ( ) ) . sum :: < usize > ( )
896
+ + self . pending_outbound_buffer . iter ( ) . map ( |m| m. capacity ( ) ) . sum :: < usize > ( ) ;
894
897
895
- total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
896
- || self . msgs_sent_since_pong
897
- > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
898
+ total_outbound_buffered > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP
898
899
}
899
900
900
901
fn set_their_node_id ( & mut self , node_id : PublicKey ) {
@@ -4693,22 +4694,27 @@ mod tests {
4693
4694
let secp_ctx = Secp256k1 :: new ( ) ;
4694
4695
let key = SecretKey :: from_slice ( & [ 1 ; 32 ] ) . unwrap ( ) ;
4695
4696
let msg = channel_announcement ( & key, & key, ChannelFeatures :: empty ( ) , 42 , & secp_ctx) ;
4697
+ // The message bufer size is the message length plus two 16-byte MACs plus a 2-byte length
4698
+ // and 2-byte type.
4699
+ let encoded_size = msg. serialized_length ( ) + 16 * 2 + 2 + 2 ;
4696
4700
let msg_ev = MessageSendEvent :: BroadcastChannelAnnouncement { msg, update_msg : None } ;
4697
4701
4698
4702
fd_a. hang_writes . store ( true , Ordering :: Relaxed ) ;
4699
4703
4700
4704
// Now push an arbitrarily large number of messages and check that only
4701
- // `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP` messages end up in the queue.
4702
- for _ in 0 ..OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP * 2 {
4705
+ // `OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP` message bytes end up in the queue.
4706
+ for _ in 0 ..OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size {
4703
4707
cfgs[ 0 ] . routing_handler . pending_events . lock ( ) . unwrap ( ) . push ( msg_ev. clone ( ) ) ;
4704
4708
peers[ 0 ] . process_events ( ) ;
4705
4709
}
4706
4710
4707
4711
{
4708
4712
let peer_a_lock = peers[ 0 ] . peers . read ( ) . unwrap ( ) ;
4709
- let buf_len =
4710
- peer_a_lock. get ( & fd_a) . unwrap ( ) . lock ( ) . unwrap ( ) . gossip_broadcast_buffer . len ( ) ;
4711
- assert_eq ! ( buf_len, OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ) ;
4713
+ let peer = peer_a_lock. get ( & fd_a) . unwrap ( ) . lock ( ) . unwrap ( ) ;
4714
+ let buf_len = peer. pending_outbound_buffer . iter ( ) . map ( |m| m. capacity ( ) ) . sum :: < usize > ( )
4715
+ + peer. gossip_broadcast_buffer . iter ( ) . map ( |m| m. capacity ( ) ) . sum :: < usize > ( ) ;
4716
+ assert ! ( buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP - encoded_size) ;
4717
+ assert ! ( buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP ) ;
4712
4718
}
4713
4719
4714
4720
// Check that if a broadcast message comes in from the channel handler (i.e. it is an
@@ -4718,14 +4724,17 @@ mod tests {
4718
4724
4719
4725
{
4720
4726
let peer_a_lock = peers[ 0 ] . peers . read ( ) . unwrap ( ) ;
4721
- let buf_len =
4722
- peer_a_lock. get ( & fd_a) . unwrap ( ) . lock ( ) . unwrap ( ) . gossip_broadcast_buffer . len ( ) ;
4723
- assert_eq ! ( buf_len, OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 1 ) ;
4727
+ let peer = peer_a_lock. get ( & fd_a) . unwrap ( ) . lock ( ) . unwrap ( ) ;
4728
+ let buf_len = peer. pending_outbound_buffer . iter ( ) . map ( |m| m. capacity ( ) ) . sum :: < usize > ( )
4729
+ + peer. gossip_broadcast_buffer . iter ( ) . map ( |m| m. capacity ( ) ) . sum :: < usize > ( ) ;
4730
+ assert ! ( buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP ) ;
4731
+ assert ! ( buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP + encoded_size) ;
4724
4732
}
4725
4733
4726
4734
// Finally, deliver all the messages and make sure we got the right count. Note that there
4727
4735
// was an extra message that had already moved from the broadcast queue to the encrypted
4728
- // message queue so we actually receive `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2` messages.
4736
+ // message queue so we actually receive `OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP + 2`
4737
+ // message bytes.
4729
4738
fd_a. hang_writes . store ( false , Ordering :: Relaxed ) ;
4730
4739
cfgs[ 1 ] . routing_handler . chan_anns_recvd . store ( 0 , Ordering :: Relaxed ) ;
4731
4740
peers[ 0 ] . write_buffer_space_avail ( & mut fd_a) . unwrap ( ) ;
@@ -4740,7 +4749,7 @@ mod tests {
4740
4749
4741
4750
assert_eq ! (
4742
4751
cfgs[ 1 ] . routing_handler. chan_anns_recvd. load( Ordering :: Relaxed ) ,
4743
- OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2
4752
+ OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size + 1
4744
4753
) ;
4745
4754
}
4746
4755
0 commit comments