Skip to content

Commit 91b113d

Browse files
Separate gossip broadcasts into their own queue in PeerManager
This allows us to better prioritize channel messages over gossip broadcasts and lays groundwork for rate limiting onion messages more simply, since they won't be competing with gossip broadcasts for space in the main message queue.
1 parent bf00e7e commit 91b113d

File tree

1 file changed

+33
-10
lines changed

1 file changed

+33
-10
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ struct Peer {
337337

338338
pending_outbound_buffer: LinkedList<Vec<u8>>,
339339
pending_outbound_buffer_first_msg_offset: usize,
340+
gossip_broadcast_buffer: LinkedList<Vec<u8>>,
340341
awaiting_write_event: bool,
341342

342343
pending_read_buffer: Vec<u8>,
@@ -389,17 +390,26 @@ impl Peer {
389390
self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
390391
}
391392

392-
/// Determines if we should push additional gossip messages onto a peer's outbound buffer for
393-
/// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
394-
/// been drained.
393+
/// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
394+
/// outbound buffer. This is checked every time the peer's buffer may have been drained.
395395
fn should_buffer_gossip_backfill(&self) -> bool {
396-
self.pending_outbound_buffer.is_empty() &&
397-
self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
396+
self.pending_outbound_buffer.is_empty() && self.gossip_broadcast_buffer.is_empty()
397+
&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
398398
}
399399

400-
/// Returns whether this peer's buffer is full and we should drop gossip messages.
400+
/// Determines if we should push additional gossip broadcast messages onto a peer's outbound
401+
/// buffer. This is checked every time the peer's buffer may have been drained.
402+
fn should_buffer_gossip_broadcast(&self) -> bool {
403+
self.pending_outbound_buffer.is_empty()
404+
&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
405+
}
406+
407+
/// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
401408
fn buffer_full_drop_gossip_broadcast(&self) -> bool {
402-
self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
409+
let total_outbound_buffered =
410+
self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len();
411+
412+
total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
403413
self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
404414
}
405415
}
@@ -668,6 +678,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
668678

669679
pending_outbound_buffer: LinkedList::new(),
670680
pending_outbound_buffer_first_msg_offset: 0,
681+
gossip_broadcast_buffer: LinkedList::new(),
671682
awaiting_write_event: false,
672683

673684
pending_read_buffer,
@@ -714,6 +725,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
714725

715726
pending_outbound_buffer: LinkedList::new(),
716727
pending_outbound_buffer_first_msg_offset: 0,
728+
gossip_broadcast_buffer: LinkedList::new(),
717729
awaiting_write_event: false,
718730

719731
pending_read_buffer,
@@ -734,6 +746,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
734746

735747
fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
736748
while !peer.awaiting_write_event {
749+
if peer.should_buffer_gossip_broadcast() {
750+
if let Some(msg) = peer.gossip_broadcast_buffer.pop_front() {
751+
peer.pending_outbound_buffer.push_back(msg);
752+
}
753+
}
737754
if peer.should_buffer_gossip_backfill() {
738755
match peer.sync_status {
739756
InitSyncTracker::NoSyncRequested => {},
@@ -854,6 +871,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
854871
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
855872
}
856873

874+
/// Append a message to a peer's pending outbound/write gossip broadcast buffer
875+
fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
876+
peer.msgs_sent_since_pong += 1;
877+
peer.gossip_broadcast_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
878+
}
879+
857880
/// Append a message to a peer's pending outbound/write buffer
858881
fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
859882
let mut buffer = VecWriter(Vec::with_capacity(2048));
@@ -1333,7 +1356,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13331356
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
13341357
continue;
13351358
}
1336-
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
1359+
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
13371360
}
13381361
},
13391362
wire::Message::NodeAnnouncement(ref msg) => {
@@ -1356,7 +1379,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13561379
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
13571380
continue;
13581381
}
1359-
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
1382+
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
13601383
}
13611384
},
13621385
wire::Message::ChannelUpdate(ref msg) => {
@@ -1376,7 +1399,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13761399
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
13771400
continue;
13781401
}
1379-
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
1402+
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
13801403
}
13811404
},
13821405
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),

0 commit comments

Comments
 (0)