Skip to content

Commit 758561d

Browse files
WIP: separate gossip broadcasts into their own queue in PeerManager
This allows us to better prioritize channel messages over gossip broadcasts and lays groundwork for rate limiting onion messages more simply.
1 parent f47258f commit 758561d

File tree

1 file changed

+47
-22
lines changed

1 file changed

+47
-22
lines changed

lightning/src/ln/peer_handler.rs

Lines changed: 47 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,8 @@ struct Peer {
337337

338338
pending_outbound_buffer: LinkedList<Vec<u8>>,
339339
pending_outbound_buffer_first_msg_offset: usize,
340+
gossip_broadcast_buffer: LinkedList<Vec<u8>>,
341+
gossip_broadcast_buffer_first_msg_offset: usize,
340342
awaiting_write_event: bool,
341343

342344
pending_read_buffer: Vec<u8>,
@@ -393,14 +395,18 @@ impl Peer {
393395
/// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
394396
/// been drained.
395397
fn should_buffer_gossip_backfill(&self) -> bool {
396-
self.pending_outbound_buffer.is_empty() &&
397-
self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
398+
self.pending_outbound_buffer.is_empty() && !self.buffer_full_drop_gossip_broadcast() // TODO: are broadcasts higher priority than background sync? seems like maybe they should be
399+
&& self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
398400
}
399401

400-
/// Returns whether this peer's buffer is full and we should drop gossip messages.
402+
/// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
401403
fn buffer_full_drop_gossip_broadcast(&self) -> bool {
402-
if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
403-
|| self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
404+
let total_outbound_buffered =
405+
self.gossip_broadcast_buffer.len() + self.pending_outbound_buffer.len();
406+
407+
if total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
408+
self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
409+
{
404410
return true
405411
}
406412
false
@@ -671,6 +677,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
671677

672678
pending_outbound_buffer: LinkedList::new(),
673679
pending_outbound_buffer_first_msg_offset: 0,
680+
gossip_broadcast_buffer: LinkedList::new(),
681+
gossip_broadcast_buffer_first_msg_offset: 0,
674682
awaiting_write_event: false,
675683

676684
pending_read_buffer,
@@ -717,6 +725,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
717725

718726
pending_outbound_buffer: LinkedList::new(),
719727
pending_outbound_buffer_first_msg_offset: 0,
728+
gossip_broadcast_buffer: LinkedList::new(),
729+
gossip_broadcast_buffer_first_msg_offset: 0,
720730
awaiting_write_event: false,
721731

722732
pending_read_buffer,
@@ -779,20 +789,29 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
779789
self.maybe_send_extra_ping(peer);
780790
}
781791

782-
let next_buff = match peer.pending_outbound_buffer.front() {
783-
None => return,
784-
Some(buff) => buff,
785-
};
786-
787-
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
788-
let data_sent = descriptor.send_data(pending, peer.should_read());
789-
peer.pending_outbound_buffer_first_msg_offset += data_sent;
790-
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
791-
peer.pending_outbound_buffer_first_msg_offset = 0;
792-
peer.pending_outbound_buffer.pop_front();
793-
} else {
794-
peer.awaiting_write_event = true;
795-
}
792+
// TODO: break into methods in `peer` or somehow DRY
793+
// We prioritize channel messages over gossip broadcasts
794+
if let Some(next_buff) = peer.pending_outbound_buffer.front() {
795+
let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
796+
let data_sent = descriptor.send_data(pending, peer.should_read());
797+
peer.pending_outbound_buffer_first_msg_offset += data_sent;
798+
if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
799+
peer.pending_outbound_buffer_first_msg_offset = 0;
800+
peer.pending_outbound_buffer.pop_front();
801+
} else {
802+
peer.awaiting_write_event = true;
803+
}
804+
} else if let Some(next_buff) = peer.gossip_broadcast_buffer.front() {
805+
let pending = &next_buff[peer.gossip_broadcast_buffer_first_msg_offset..];
806+
let data_sent = descriptor.send_data(pending, peer.should_read());
807+
peer.gossip_broadcast_buffer_first_msg_offset += data_sent;
808+
if peer.gossip_broadcast_buffer_first_msg_offset == next_buff.len() {
809+
peer.gossip_broadcast_buffer_first_msg_offset = 0;
810+
peer.gossip_broadcast_buffer.pop_front();
811+
} else {
812+
peer.awaiting_write_event = true;
813+
}
814+
} else { return }
796815
}
797816
}
798817

@@ -857,6 +876,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
857876
peer.pending_outbound_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
858877
}
859878

879+
/// Append a message to a peer's pending outbound/write gossip broadcast buffer
880+
fn enqueue_encoded_gossip_broadcast(&self, peer: &mut Peer, encoded_message: &Vec<u8>) {
881+
peer.msgs_sent_since_pong += 1;
882+
peer.gossip_broadcast_buffer.push_back(peer.channel_encryptor.encrypt_message(&encoded_message[..]));
883+
}
884+
860885
/// Append a message to a peer's pending outbound/write buffer
861886
fn enqueue_message<M: wire::Type>(&self, peer: &mut Peer, message: &M) {
862887
let mut buffer = VecWriter(Vec::with_capacity(2048));
@@ -1336,7 +1361,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13361361
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
13371362
continue;
13381363
}
1339-
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
1364+
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
13401365
}
13411366
},
13421367
wire::Message::NodeAnnouncement(ref msg) => {
@@ -1359,7 +1384,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13591384
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
13601385
continue;
13611386
}
1362-
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
1387+
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
13631388
}
13641389
},
13651390
wire::Message::ChannelUpdate(ref msg) => {
@@ -1379,7 +1404,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13791404
if except_node.is_some() && peer.their_node_id.as_ref() == except_node {
13801405
continue;
13811406
}
1382-
self.enqueue_encoded_message(&mut *peer, &encoded_msg);
1407+
self.enqueue_encoded_gossip_broadcast(&mut *peer, &encoded_msg);
13831408
}
13841409
},
13851410
_ => debug_assert!(false, "We shouldn't attempt to forward anything but gossip messages"),

0 commit comments

Comments
 (0)