@@ -337,6 +337,9 @@ struct Peer {
337337
338338 pending_outbound_buffer : LinkedList < Vec < u8 > > ,
339339 pending_outbound_buffer_first_msg_offset : usize ,
340+ // Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily prioritize
341+ // channel messages over them.
342+ gossip_broadcast_buffer : LinkedList < Vec < u8 > > ,
340343 awaiting_write_event : bool ,
341344
342345 pending_read_buffer : Vec < u8 > ,
@@ -389,17 +392,26 @@ impl Peer {
389392 self . pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
390393 }
391394
392- /// Determines if we should push additional gossip messages onto a peer's outbound buffer for
393- /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
394- /// been drained.
395+ /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
396+ /// outbound buffer. This is checked every time the peer's buffer may have been drained.
395397 fn should_buffer_gossip_backfill ( & self ) -> bool {
396- self . pending_outbound_buffer . is_empty ( ) &&
397- self . msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
398+ self . pending_outbound_buffer . is_empty ( ) && self . gossip_broadcast_buffer . is_empty ( )
399+ && self . msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
398400 }
399401
400- /// Returns whether this peer's buffer is full and we should drop gossip messages.
402+ /// Determines if we should push additional gossip broadcast messages onto a peer's outbound
403+ /// buffer. This is checked every time the peer's buffer may have been drained.
404+ fn should_buffer_gossip_broadcast ( & self ) -> bool {
405+ self . pending_outbound_buffer . is_empty ( )
406+ && self . msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
407+ }
408+
409+ /// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
401410 fn buffer_full_drop_gossip_broadcast ( & self ) -> bool {
402- self . pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
411+ let total_outbound_buffered =
412+ self . gossip_broadcast_buffer . len ( ) + self . pending_outbound_buffer . len ( ) ;
413+
414+ total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
403415 self . msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
404416 }
405417}
@@ -668,6 +680,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
668680
669681 pending_outbound_buffer : LinkedList :: new ( ) ,
670682 pending_outbound_buffer_first_msg_offset : 0 ,
683+ gossip_broadcast_buffer : LinkedList :: new ( ) ,
671684 awaiting_write_event : false ,
672685
673686 pending_read_buffer,
@@ -714,6 +727,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
714727
715728 pending_outbound_buffer : LinkedList :: new ( ) ,
716729 pending_outbound_buffer_first_msg_offset : 0 ,
730+ gossip_broadcast_buffer : LinkedList :: new ( ) ,
717731 awaiting_write_event : false ,
718732
719733 pending_read_buffer,
@@ -734,6 +748,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
734748
735749 fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer ) {
736750 while !peer. awaiting_write_event {
751+ if peer. should_buffer_gossip_broadcast ( ) {
752+ if let Some ( msg) = peer. gossip_broadcast_buffer . pop_front ( ) {
753+ peer. pending_outbound_buffer . push_back ( msg) ;
754+ }
755+ }
737756 if peer. should_buffer_gossip_backfill ( ) {
738757 match peer. sync_status {
739758 InitSyncTracker :: NoSyncRequested => { } ,
@@ -848,12 +867,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
848867 }
849868 }
850869
851- /// Append a message to a peer's pending outbound/write buffer
852- fn enqueue_encoded_message ( & self , peer : & mut Peer , encoded_message : & Vec < u8 > ) {
853- peer. msgs_sent_since_pong += 1 ;
854- peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_message[ ..] ) ) ;
855- }
856-
857870 /// Append a message to a peer's pending outbound/write buffer
858871 fn enqueue_message < M : wire:: Type > ( & self , peer : & mut Peer , message : & M ) {
859872 let mut buffer = VecWriter ( Vec :: with_capacity ( 2048 ) ) ;
@@ -864,7 +877,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
864877 } else {
865878 log_trace ! ( self . logger, "Enqueueing message {:?} to {}" , message, log_pubkey!( peer. their_node_id. unwrap( ) ) )
866879 }
867- self . enqueue_encoded_message ( peer, & buffer. 0 ) ;
880+ peer. msgs_sent_since_pong += 1 ;
881+ peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & buffer. 0 [ ..] ) ) ;
882+ }
883+
884+ /// Append a message to a peer's pending outbound/write gossip broadcast buffer
885+ fn enqueue_encoded_gossip_broadcast ( & self , peer : & mut Peer , encoded_message : & Vec < u8 > ) {
886+ peer. msgs_sent_since_pong += 1 ;
887+ peer. gossip_broadcast_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_message[ ..] ) ) ;
868888 }
869889
870890 fn do_read_event ( & self , peer_descriptor : & mut Descriptor , data : & [ u8 ] ) -> Result < bool , PeerHandleError > {
@@ -1333,7 +1353,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13331353 if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) == except_node {
13341354 continue ;
13351355 }
1336- self . enqueue_encoded_message ( & mut * peer, & encoded_msg) ;
1356+ self . enqueue_encoded_gossip_broadcast ( & mut * peer, & encoded_msg) ;
13371357 }
13381358 } ,
13391359 wire:: Message :: NodeAnnouncement ( ref msg) => {
@@ -1356,7 +1376,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13561376 if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) == except_node {
13571377 continue ;
13581378 }
1359- self . enqueue_encoded_message ( & mut * peer, & encoded_msg) ;
1379+ self . enqueue_encoded_gossip_broadcast ( & mut * peer, & encoded_msg) ;
13601380 }
13611381 } ,
13621382 wire:: Message :: ChannelUpdate ( ref msg) => {
@@ -1376,7 +1396,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13761396 if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) == except_node {
13771397 continue ;
13781398 }
1379- self . enqueue_encoded_message ( & mut * peer, & encoded_msg) ;
1399+ self . enqueue_encoded_gossip_broadcast ( & mut * peer, & encoded_msg) ;
13801400 }
13811401 } ,
13821402 _ => debug_assert ! ( false , "We shouldn't attempt to forward anything but gossip messages" ) ,
0 commit comments