@@ -337,6 +337,9 @@ struct Peer {
337337
338338 pending_outbound_buffer : LinkedList < Vec < u8 > > ,
339339 pending_outbound_buffer_first_msg_offset : usize ,
340+ // Queue gossip broadcasts separately from `pending_outbound_buffer` so we can easily prioritize
341+ // channel messages over them.
342+ gossip_broadcast_buffer : LinkedList < Vec < u8 > > ,
340343 awaiting_write_event : bool ,
341344
342345 pending_read_buffer : Vec < u8 > ,
@@ -389,21 +392,27 @@ impl Peer {
389392 self . pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
390393 }
391394
392- /// Determines if we should push additional gossip messages onto a peer's outbound buffer for
393- /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
394- /// been drained.
395+ /// Determines if we should push additional gossip background sync (aka "backfill") onto a peer's
396+ /// outbound buffer. This is checked every time the peer's buffer may have been drained.
395397 fn should_buffer_gossip_backfill ( & self ) -> bool {
396- self . pending_outbound_buffer . is_empty ( ) &&
397- self . msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
398+ self . pending_outbound_buffer . is_empty ( ) && self . gossip_broadcast_buffer . is_empty ( )
399+ && self . msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
398400 }
399401
400- /// Returns whether this peer's buffer is full and we should drop gossip messages.
401- fn buffer_full_drop_gossip ( & self ) -> bool {
402- if self . pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
403- || self . msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
404- return false
405- }
406- true
402+ /// Determines if we should push additional gossip broadcast messages onto a peer's outbound
403+ /// buffer. This is checked every time the peer's buffer may have been drained.
404+ fn should_buffer_gossip_broadcast ( & self ) -> bool {
405+ self . pending_outbound_buffer . is_empty ( )
406+ && self . msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
407+ }
408+
409+ /// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts.
410+ fn buffer_full_drop_gossip_broadcast ( & self ) -> bool {
411+ let total_outbound_buffered =
412+ self . gossip_broadcast_buffer . len ( ) + self . pending_outbound_buffer . len ( ) ;
413+
414+ total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
415+ self . msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
407416 }
408417}
409418
@@ -671,6 +680,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
671680
672681 pending_outbound_buffer : LinkedList :: new ( ) ,
673682 pending_outbound_buffer_first_msg_offset : 0 ,
683+ gossip_broadcast_buffer : LinkedList :: new ( ) ,
674684 awaiting_write_event : false ,
675685
676686 pending_read_buffer,
@@ -717,6 +727,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
717727
718728 pending_outbound_buffer : LinkedList :: new ( ) ,
719729 pending_outbound_buffer_first_msg_offset : 0 ,
730+ gossip_broadcast_buffer : LinkedList :: new ( ) ,
720731 awaiting_write_event : false ,
721732
722733 pending_read_buffer,
@@ -737,6 +748,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
737748
738749 fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer ) {
739750 while !peer. awaiting_write_event {
751+ if peer. should_buffer_gossip_broadcast ( ) {
752+ if let Some ( msg) = peer. gossip_broadcast_buffer . pop_front ( ) {
753+ peer. pending_outbound_buffer . push_back ( msg) ;
754+ }
755+ }
740756 if peer. should_buffer_gossip_backfill ( ) {
741757 match peer. sync_status {
742758 InitSyncTracker :: NoSyncRequested => { } ,
@@ -851,12 +867,6 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
851867 }
852868 }
853869
854- /// Append a message to a peer's pending outbound/write buffer
855- fn enqueue_encoded_message ( & self , peer : & mut Peer , encoded_message : & Vec < u8 > ) {
856- peer. msgs_sent_since_pong += 1 ;
857- peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_message[ ..] ) ) ;
858- }
859-
860870 /// Append a message to a peer's pending outbound/write buffer
861871 fn enqueue_message < M : wire:: Type > ( & self , peer : & mut Peer , message : & M ) {
862872 let mut buffer = VecWriter ( Vec :: with_capacity ( 2048 ) ) ;
@@ -867,7 +877,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
867877 } else {
868878 log_trace ! ( self . logger, "Enqueueing message {:?} to {}" , message, log_pubkey!( peer. their_node_id. unwrap( ) ) )
869879 }
870- self . enqueue_encoded_message ( peer, & buffer. 0 ) ;
880+ peer. msgs_sent_since_pong += 1 ;
881+ peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & buffer. 0 [ ..] ) ) ;
882+ }
883+
884+ /// Append a message to a peer's pending outbound/write gossip broadcast buffer
885+ fn enqueue_encoded_gossip_broadcast ( & self , peer : & mut Peer , encoded_message : & Vec < u8 > ) {
886+ peer. msgs_sent_since_pong += 1 ;
887+ peer. gossip_broadcast_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_message[ ..] ) ) ;
871888 }
872889
873890 fn do_read_event ( & self , peer_descriptor : & mut Descriptor , data : & [ u8 ] ) -> Result < bool , PeerHandleError > {
@@ -1325,7 +1342,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13251342 !peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
13261343 continue
13271344 }
1328- if peer. buffer_full_drop_gossip ( ) {
1345+ if peer. buffer_full_drop_gossip_broadcast ( ) {
13291346 log_gossip ! ( self . logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
13301347 continue ;
13311348 }
@@ -1336,7 +1353,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13361353 if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) == except_node {
13371354 continue ;
13381355 }
1339- self . enqueue_encoded_message ( & mut * peer, & encoded_msg) ;
1356+ self . enqueue_encoded_gossip_broadcast ( & mut * peer, & encoded_msg) ;
13401357 }
13411358 } ,
13421359 wire:: Message :: NodeAnnouncement ( ref msg) => {
@@ -1349,7 +1366,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13491366 !peer. should_forward_node_announcement ( msg. contents . node_id ) {
13501367 continue
13511368 }
1352- if peer. buffer_full_drop_gossip ( ) {
1369+ if peer. buffer_full_drop_gossip_broadcast ( ) {
13531370 log_gossip ! ( self . logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
13541371 continue ;
13551372 }
@@ -1359,7 +1376,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13591376 if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) == except_node {
13601377 continue ;
13611378 }
1362- self . enqueue_encoded_message ( & mut * peer, & encoded_msg) ;
1379+ self . enqueue_encoded_gossip_broadcast ( & mut * peer, & encoded_msg) ;
13631380 }
13641381 } ,
13651382 wire:: Message :: ChannelUpdate ( ref msg) => {
@@ -1372,14 +1389,14 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13721389 !peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
13731390 continue
13741391 }
1375- if peer. buffer_full_drop_gossip ( ) {
1392+ if peer. buffer_full_drop_gossip_broadcast ( ) {
13761393 log_gossip ! ( self . logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
13771394 continue ;
13781395 }
13791396 if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) == except_node {
13801397 continue ;
13811398 }
1382- self . enqueue_encoded_message ( & mut * peer, & encoded_msg) ;
1399+ self . enqueue_encoded_gossip_broadcast ( & mut * peer, & encoded_msg) ;
13831400 }
13841401 } ,
13851402 _ => debug_assert ! ( false , "We shouldn't attempt to forward anything but gossip messages" ) ,
0 commit comments