From 4b60d660da036ff9fabae4dfd7b30541adae5c75 Mon Sep 17 00:00:00 2001
From: Matt Corallo
Date: Fri, 19 Sep 2025 14:12:31 +0000
Subject: [PATCH] Limit outbound gossip buffer by size, rather than length

In 686a586c96aae6901d533646fc135379f825eb0d we stopped punishing peers
for slowly draining the gossip forwarding buffer and thereby delaying
their responses to our ping messages. While that change was nice on its
own, it also now allows us to be a bit more flexible with what enters
the `gossip_broadcast_buffer`.

Because we no longer count pending messages in `gossip_broadcast_buffer`
against the peer's ping-response timer, there's no reason to continue
limiting it based on `msgs_sent_since_pong`. Thus, we drop that
restriction here.

However, in practice, the vast majority of gossip forwarding drops on my
node are caused by the 24-message total queue limit, rather than the
`msgs_sent_since_pong` limit. That limit bent over backwards to avoid
counting message buffer sizes while keeping peer message buffers small.
In practice, there is really no reason to do that: summing the capacity
of tens of buffers is a negligible cost and allows us to be much more
flexible with how many messages we queue.

Here we do so, limiting the total outbound message buffer to 128 KiB per
peer (rather than 24 messages) before gossip forwards are dropped. In
practice, this appears to almost entirely eliminate gossip forward drops
on my node.
---
 lightning/src/ln/peer_channel_encryptor.rs |  5 ++
 lightning/src/ln/peer_handler.rs           | 59 +++++++++++++---------
 2 files changed, 39 insertions(+), 25 deletions(-)

diff --git a/lightning/src/ln/peer_channel_encryptor.rs b/lightning/src/ln/peer_channel_encryptor.rs
index c7a7ba51be4..09b970a9ab2 100644
--- a/lightning/src/ln/peer_channel_encryptor.rs
+++ b/lightning/src/ln/peer_channel_encryptor.rs
@@ -641,6 +641,11 @@ impl PeerChannelEncryptor {
 /// padding to allow for future encryption/MACing.
 pub struct MessageBuf(Vec<u8>);
 impl MessageBuf {
+	/// The total allocated space for this message
+	pub fn capacity(&self) -> usize {
+		self.0.capacity()
+	}
+
 	/// Creates a new buffer from an encoded message (i.e. the two message-type bytes followed by
 	/// the message contents).
 	///
diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs
index c41fe1fc8c0..3cf6c6cc2ad 100644
--- a/lightning/src/ln/peer_handler.rs
+++ b/lightning/src/ln/peer_handler.rs
@@ -718,19 +718,11 @@ enum MessageBatchImpl {
 	CommitmentSigned(Vec<msgs::CommitmentSigned>),
 }
 
-/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
-/// forwarding gossip messages to peers altogether.
-const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;
-
 /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
 /// we have fewer than this many messages in the outbound buffer again.
 /// We also use this as the target number of outbound gossip messages to keep in the write buffer,
 /// refilled as we send bytes.
 const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 12;
-/// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
-/// the peer.
-const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize =
-	OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;
 
 /// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
 /// the socket receive buffer before receiving the ping.
@@ -754,10 +746,20 @@ const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4;
 /// process before the next ping.
 ///
 /// Note that we continue responding to other messages even after we've sent this many messages, so
-/// it's more of a general guideline used for gossip backfill (and gossip forwarding, times
-/// [`FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO`]) than a hard limit.
+/// this really limits gossip broadcast, gossip backfill, and onion message relay.
 const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
 
+/// The maximum number of bytes which we allow in a peer's outbound buffers before we start
+/// dropping outbound gossip forwards.
+///
+/// This is currently 128KiB, or two messages at the maximum message size (though in practice we
+/// refuse to forward gossip messages which are substantially larger than we expect, so this is
+/// closer to ~85 messages if all queued messages are maximum-sized channel announcements).
+///
+/// Note that as we always drain the gossip forwarding queue before continuing gossip backfill,
+/// the equivalent maximum buffer size for gossip backfill is zero.
+const OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP: usize = 64 * 1024 * 2;
+
 struct Peer {
 	channel_encryptor: PeerChannelEncryptor,
 	/// We cache a `NodeId` here to avoid serializing peers' keys every time we forward gossip
@@ -889,12 +891,11 @@ impl Peer {
 	/// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
 	fn buffer_full_drop_gossip_broadcast(&self) -> bool {
-		let total_outbound_buffered =
-			self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len();
+		let total_outbound_buffered: usize =
+			self.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::<usize>()
+				+ self.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::<usize>();
 
-		total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-			|| self.msgs_sent_since_pong
-				> BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
+		total_outbound_buffered > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP
 	}
 
 	fn set_their_node_id(&mut self, node_id: PublicKey) {
@@ -4693,22 +4694,27 @@ mod tests {
 		let secp_ctx = Secp256k1::new();
 		let key = SecretKey::from_slice(&[1; 32]).unwrap();
 		let msg = channel_announcement(&key, &key, ChannelFeatures::empty(), 42, &secp_ctx);
+		// The message buffer size is the message length plus two 16-byte MACs plus a 2-byte length
+		// and 2-byte type.
+		let encoded_size = msg.serialized_length() + 16 * 2 + 2 + 2;
 		let msg_ev = MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg: None };
 
 		fd_a.hang_writes.store(true, Ordering::Relaxed);
 
 		// Now push an arbitrarily large number of messages and check that only
-		// `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP` messages end up in the queue.
-		for _ in 0..OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP * 2 {
+		// `OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP` message bytes end up in the queue.
+		for _ in 0..OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size {
 			cfgs[0].routing_handler.pending_events.lock().unwrap().push(msg_ev.clone());
 			peers[0].process_events();
 		}
 
 		{
 			let peer_a_lock = peers[0].peers.read().unwrap();
-			let buf_len =
-				peer_a_lock.get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len();
-			assert_eq!(buf_len, OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
+			let peer = peer_a_lock.get(&fd_a).unwrap().lock().unwrap();
+			let buf_len = peer.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::<usize>()
+				+ peer.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::<usize>();
+			assert!(buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP - encoded_size);
+			assert!(buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP);
 		}
 
 		// Check that if a broadcast message comes in from the channel handler (i.e. it is an
@@ -4718,14 +4724,17 @@ mod tests {
 
 		{
 			let peer_a_lock = peers[0].peers.read().unwrap();
-			let buf_len =
-				peer_a_lock.get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len();
-			assert_eq!(buf_len, OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 1);
+			let peer = peer_a_lock.get(&fd_a).unwrap().lock().unwrap();
+			let buf_len = peer.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::<usize>()
+				+ peer.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::<usize>();
+			assert!(buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP);
+			assert!(buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP + encoded_size);
 		}
 
 		// Finally, deliver all the messages and make sure we got the right count. Note that there
 		// was an extra message that had already moved from the broadcast queue to the encrypted
-		// message queue so we actually receive `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2` messages.
+		// message queue, so we actually receive
+		// `OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size + 1` messages.
 		fd_a.hang_writes.store(false, Ordering::Relaxed);
 		cfgs[1].routing_handler.chan_anns_recvd.store(0, Ordering::Relaxed);
 		peers[0].write_buffer_space_avail(&mut fd_a).unwrap();
@@ -4740,7 +4749,7 @@ mod tests {
 
 		assert_eq!(
 			cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Relaxed),
-			OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2
+			OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size + 1
 		);
 	}
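
For readers skimming the diff, the heart of the change is the switch from counting queued
messages to summing their allocated bytes. Below is a minimal, self-contained Rust sketch of
that size-based drop check. The struct, field, and function names are illustrative stand-ins
rather than LDK's actual `Peer` API, and the ~1.5 KiB announcement size is an assumed figure
used only to reproduce the "~85 messages" arithmetic from the new constant's documentation.

// A minimal sketch of the size-based drop check described in the commit message.
// The types and names below are illustrative stand-ins, not LDK's actual API.
use std::collections::VecDeque;

// 128 KiB cap, matching the new constant added by this patch.
const OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP: usize = 64 * 1024 * 2;

struct PeerBuffers {
    // Messages already encrypted and waiting to be written to the socket.
    pending_outbound_buffer: VecDeque<Vec<u8>>,
    // Gossip forwards waiting to be encrypted and sent.
    gossip_broadcast_buffer: VecDeque<Vec<u8>>,
}

impl PeerBuffers {
    // Sum the allocated capacity of every queued message. Walking a few dozen entries per
    // enqueue decision is a negligible cost, which is the argument the commit message makes.
    fn total_outbound_buffered(&self) -> usize {
        self.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::<usize>()
            + self.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::<usize>()
    }

    // Drop new gossip forwards once the buffered bytes exceed the per-peer cap, rather than
    // once a fixed number of messages (previously 24) are queued.
    fn should_drop_gossip_forward(&self) -> bool {
        self.total_outbound_buffered() > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP
    }
}

fn main() {
    let mut peer = PeerBuffers {
        pending_outbound_buffer: VecDeque::new(),
        gossip_broadcast_buffer: VecDeque::new(),
    };
    // Assume ~1.5 KiB per maximum-relay-size channel_announcement; 128 KiB / ~1.5 KiB is
    // where the "~85 messages" figure in the new constant's docs comes from.
    let announcement = vec![0u8; 1536];
    while !peer.should_drop_gossip_forward() {
        peer.gossip_broadcast_buffer.push_back(announcement.clone());
    }
    println!(
        "queued {} forwards (~{} KiB) before hitting the cap",
        peer.gossip_broadcast_buffer.len(),
        peer.total_outbound_buffered() / 1024
    );
}

With those assumptions the loop stops after roughly 85 queued forwards, which lines up with
the figure quoted in the new constant's documentation.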