Merged
5 changes: 5 additions & 0 deletions lightning/src/ln/peer_channel_encryptor.rs
@@ -641,6 +641,11 @@ impl PeerChannelEncryptor {
/// padding to allow for future encryption/MACing.
pub struct MessageBuf(Vec<u8>);
impl MessageBuf {
/// The total allocated space for this message
pub fn capacity(&self) -> usize {
self.0.capacity()
}

/// Creates a new buffer from an encoded message (i.e. the two message-type bytes followed by
/// the message contents).
///
59 changes: 34 additions & 25 deletions lightning/src/ln/peer_handler.rs
@@ -718,19 +718,11 @@ enum MessageBatchImpl {
CommitmentSigned(Vec<msgs::CommitmentSigned>),
}

/// The ratio between buffer sizes at which we stop sending initial sync messages vs when we stop
/// forwarding gossip messages to peers altogether.
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;

/// When the outbound buffer has this many messages, we'll stop reading bytes from the peer until
/// we have fewer than this many messages in the outbound buffer again.
/// We also use this as the target number of outbound gossip messages to keep in the write buffer,
/// refilled as we send bytes.
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 12;
/// When the outbound buffer has this many messages, we'll simply skip relaying gossip messages to
/// the peer.
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize =
OUTBOUND_BUFFER_LIMIT_READ_PAUSE * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO;

/// If we've sent a ping, and are still awaiting a response, we may need to churn our way through
/// the socket receive buffer before receiving the ping.
@@ -754,10 +746,20 @@ const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4;
/// process before the next ping.
///
/// Note that we continue responding to other messages even after we've sent this many messages, so
/// it's more of a general guideline used for gossip backfill (and gossip forwarding, times
/// [`FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO`]) than a hard limit.
/// this really limits gossip broadcast, gossip backfill, and onion message relay.
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;

/// The maximum number of bytes which we allow in a peer's outbound buffers before we start
/// dropping outbound gossip forwards.
///
/// This is currently 128KiB, or two messages at the maximum message size (though in practice we
/// refuse to forward gossip messages which are substantially larger than we expect, so this is
/// closer to ~85 messages if all queued messages are maximum-sized channel announcements).
///
/// Note that as we always drain the gossip forwarding queue before continuing gossip backfill,
/// the equivalent maximum buffer size for gossip backfill is zero.
const OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP: usize = 64 * 1024 * 2;
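As a rough, hedged sanity check on the arithmetic in that doc comment (not part of the diff; the per-message ceiling below is an assumption chosen only for illustration, not an LDK constant):

```rust
// Back-of-the-envelope check of the numbers quoted in the doc comment above.
const OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP: usize = 64 * 1024 * 2; // 131_072 bytes = 128 KiB

fn main() {
    // Assumed ceiling on a forwarded channel announcement; a hypothetical value used
    // only to illustrate the "~85 messages" figure, not a constant taken from LDK.
    let assumed_forward_ceiling_bytes = 1_540;
    let approx_msgs = OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / assumed_forward_ceiling_bytes;
    println!("the budget holds ~{approx_msgs} maximum-sized forwarded announcements"); // ~85
}
```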

struct Peer {
channel_encryptor: PeerChannelEncryptor,
/// We cache a `NodeId` here to avoid serializing peers' keys every time we forward gossip
@@ -889,12 +891,11 @@ impl Peer {

/// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
fn buffer_full_drop_gossip_broadcast(&self) -> bool {
let total_outbound_buffered =
self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len();
let total_outbound_buffered: usize =
self.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::<usize>()
+ self.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::<usize>();

total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
|| self.msgs_sent_since_pong
> BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
total_outbound_buffered > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP
}

fn set_their_node_id(&mut self, node_id: PublicKey) {
@@ -4693,22 +4694,27 @@ mod tests {
let secp_ctx = Secp256k1::new();
let key = SecretKey::from_slice(&[1; 32]).unwrap();
let msg = channel_announcement(&key, &key, ChannelFeatures::empty(), 42, &secp_ctx);
// The message buffer size is the message length plus two 16-byte MACs plus a 2-byte length
// and 2-byte type.
let encoded_size = msg.serialized_length() + 16 * 2 + 2 + 2;
let msg_ev = MessageSendEvent::BroadcastChannelAnnouncement { msg, update_msg: None };

fd_a.hang_writes.store(true, Ordering::Relaxed);

// Now push an arbitrarily large number of messages and check that only
// `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP` messages end up in the queue.
for _ in 0..OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP * 2 {
// `OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP` message bytes end up in the queue.
for _ in 0..OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size {
cfgs[0].routing_handler.pending_events.lock().unwrap().push(msg_ev.clone());
peers[0].process_events();
}
Comment on lines +4706 to 4709
Contributor
Sorry, I should have raised this earlier, but it looks like increasing the number of iterations here breaks the test. Is that expected?

Collaborator Author
Yes, absolutely. This loop pushes exactly the new buffer limit's worth of messages into the gossip_broadcast_buffer, and then we test that they are handled correctly later in the test.
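To spell out that point, here is a hedged, standalone sketch (the announcement size below is an assumed stand-in, not the value computed in the test): after `OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size` pushes the summed buffers sit just under the limit, so the drop check has not yet fired; the next push would still be accepted and take the total past the limit, tripping the `buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP` assertion below.

```rust
// Illustrative arithmetic only; `encoded_size` here is an assumed stand-in value.
fn main() {
    let limit = 64 * 1024 * 2; // OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP
    let encoded_size = 430; // assumed size of one encoded channel_announcement
    let pushed = limit / encoded_size; // iterations in the test loop
    let buffered = pushed * encoded_size; // bytes queued once the loop finishes

    // Mirrors the test's bounds: just under the limit, within one message of it.
    assert!(buffered > limit - encoded_size && buffered < limit);
    // One more accepted push would exceed the limit, which is why adding
    // iterations breaks the assertions that follow.
    assert!(buffered + encoded_size > limit);
}
```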


{
let peer_a_lock = peers[0].peers.read().unwrap();
let buf_len =
peer_a_lock.get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len();
assert_eq!(buf_len, OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP);
let peer = peer_a_lock.get(&fd_a).unwrap().lock().unwrap();
let buf_len = peer.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::<usize>()
+ peer.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::<usize>();
assert!(buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP - encoded_size);
assert!(buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP);
}

// Check that if a broadcast message comes in from the channel handler (i.e. it is an
@@ -4718,14 +4724,17 @@

{
let peer_a_lock = peers[0].peers.read().unwrap();
let buf_len =
peer_a_lock.get(&fd_a).unwrap().lock().unwrap().gossip_broadcast_buffer.len();
assert_eq!(buf_len, OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 1);
let peer = peer_a_lock.get(&fd_a).unwrap().lock().unwrap();
let buf_len = peer.pending_outbound_buffer.iter().map(|m| m.capacity()).sum::<usize>()
+ peer.gossip_broadcast_buffer.iter().map(|m| m.capacity()).sum::<usize>();
assert!(buf_len > OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP);
assert!(buf_len < OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP + encoded_size);
}

// Finally, deliver all the messages and make sure we got the right count. Note that there
// was an extra message that had already moved from the broadcast queue to the encrypted
// message queue so we actually receive `OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2` messages.
// message queue so we actually receive `OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size + 1`
// messages (the loop's pushes plus the channel handler's broadcast below).
fd_a.hang_writes.store(false, Ordering::Relaxed);
cfgs[1].routing_handler.chan_anns_recvd.store(0, Ordering::Relaxed);
peers[0].write_buffer_space_avail(&mut fd_a).unwrap();
@@ -4740,7 +4749,7 @@

assert_eq!(
cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Relaxed),
OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP + 2
OUTBOUND_BUFFER_SIZE_LIMIT_DROP_GOSSIP / encoded_size + 1
);
}
