diff --git a/lightning/src/ln/peer_channel_encryptor.rs b/lightning/src/ln/peer_channel_encryptor.rs index c7a7ba51be4..09b970a9ab2 100644 --- a/lightning/src/ln/peer_channel_encryptor.rs +++ b/lightning/src/ln/peer_channel_encryptor.rs @@ -641,6 +641,11 @@ impl PeerChannelEncryptor { /// padding to allow for future encryption/MACing. pub struct MessageBuf(Vec); impl MessageBuf { + /// The total allocated space for this message + pub fn capacity(&self) -> usize { + self.0.capacity() + } + /// Creates a new buffer from an encoded message (i.e. the two message-type bytes followed by /// the message contents). /// diff --git a/lightning/src/ln/peer_handler.rs b/lightning/src/ln/peer_handler.rs index c41fe1fc8c0..3cf6c6cc2ad 100644 --- a/lightning/src/ln/peer_handler.rs +++ b/lightning/src/ln/peer_handler.rs @@ -718,19 +718,11 @@ enum MessageBatchImpl { CommitmentSigned(Vec), } -/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop -/// forwarding gossip messages to peers altogether. -const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2; - /// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until /// we have fewer than this many messages in the outbound buffer again. /// We also use this as the target number of outbound gossip messages to keep in the write buffer, /// refilled as we send bytes. const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 12; -/// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to -/// the peer. -const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = - OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO; /// If we've sent a ping, and are still awaiting a response, we may need to churn our way through /// the socket receive buffer before receiving the ping. @@ -754,10 +746,20 @@ const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4; /// process before the next ping. /// /// Note that we continue responding to other messages even after we've sent this many messages, so -/// it's more of a general guideline used for gossip backfill (and gossip forwarding, times -/// [`FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO`]) than a hard limit. +/// this really limits gossip broadcast, gossip backfill, and onion message relay. const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32; +/// The maximum number of bytes which we allow in a peer's outbound buffers before we start +/// dropping outbound gossip forwards. +/// +/// This is currently 128KiB, or two messages at the maximum message size (though in practice we +/// refuse to forward gossip messages which are substantially larger than we expect, so this is +/// closer to ~85 messages if all queued messages are maximum-sized channel announcements). +/// +/// Note that as we always drain the gossip forwarding queue before continuing gossip backfill, +/// the equivalent maximum buffer size for gossip backfill is zero. +const OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP: usize = 64 * 1024 * 2; + struct Peer { channel_encryptor: PeerChannelEncryptor, /// We cache a `NodeId` here to avoid serializing peers' keys every time we forward gossip @@ -889,12 +891,11 @@ impl Peer { /// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts. fn buffer_full_drop_gossip_broadcast(&self) -> bool { - let total_outbound_buffered = - self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len(); + let total_outbound_buffered: usize = + self.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::() + + self.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::(); - total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP - || self.msgs_sent_since_pong - > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO + total_outbound_buffered > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP } fn set_their_node_id(&mut self, node_id: PublicKey) { @@ -4693,22 +4694,27 @@ mod tests { let secp_ctx = Secp256k1::new(); let key = SecretKey::from_slice(&[1; 32]).unwrap(); let msg = channel_announcement(&key, &key, ChannelFeatures::empty(), 42, &secp_ctx); + // The message bufer size is the message length plus two 16-byte MACs plus a 2-byte length + // and 2-byte type. + let encoded_size = msg.serialized_length() + 16 * 2 + 2 + 2; let msg_ev = MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg: None }; fd_a.hang_writes.store(true, Ordering::Relaxed); // Now push an arbitrarily large number of messages and check that only - // `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP` messages end up in the queue. - for _ in 0..OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP * 2 { + // `OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP` message bytes end up in the queue. + for _ in 0..OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size { cfgs[0].routing_handler.pending_events.lock().unwrap().push(msg_ev.clone()); peers[0].process_events(); } { let peer_a_lock = peers[0].peers.read().unwrap(); - let buf_len = - peer_a_lock.get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(); - assert_eq!(buf_len, OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP); + let peer = peer_a_lock.get(&fd_a).unwrap().lock().unwrap(); + let buf_len = peer.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::() + + peer.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::(); + assert!(buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP - encoded_size); + assert!(buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP); } // Check that if a broadcast message comes in from the channel handler (i.e. it is an @@ -4718,14 +4724,17 @@ mod tests { { let peer_a_lock = peers[0].peers.read().unwrap(); - let buf_len = - peer_a_lock.get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len(); - assert_eq!(buf_len, OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 1); + let peer = peer_a_lock.get(&fd_a).unwrap().lock().unwrap(); + let buf_len = peer.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::() + + peer.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::(); + assert!(buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP); + assert!(buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP + encoded_size); } // Finally, deliver all the messages and make sure we got the right count. Note that there // was an extra message that had already moved from the broadcast queue to the encrypted - // message queue so we actually receive `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2` messages. + // message queue so we actually receive `OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP + 2` + // message bytes. fd_a.hang_writes.store(false, Ordering::Relaxed); cfgs[1].routing_handler.chan_anns_recvd.store(0, Ordering::Relaxed); peers[0].write_buffer_space_avail(&mut fd_a).unwrap(); @@ -4740,7 +4749,7 @@ mod tests { assert_eq!( cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Relaxed), - OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2 + OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size + 1 ); }