@@ -378,6 +378,31 @@ impl Peer {
378378 InitSyncTracker :: NodesSyncing ( pk) => pk < node_id,
379379 }
380380 }
381+
382+ /// Returns the number of gossip messages we can fit in this peer's buffer.
383+ fn gossip_buffer_slots_available ( & self ) -> usize {
384+ OUTBOUND_BUFFER_LIMIT_READ_PAUSE . saturating_sub ( self . pending_outbound_buffer . len ( ) )
385+ }
386+
387+ /// Returns whether we should be reading bytes from this peer, based on whether its outbound
388+ /// buffer still has space and we don't need to pause reads to get some writes out.
389+ fn should_read ( & self ) -> bool {
390+ self . pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
391+ }
392+
393+ fn should_backfill_gossip ( & self ) -> bool {
394+ self . pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE &&
395+ self . msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
396+ }
397+
398+ /// Returns whether this peer's buffer is full and we should drop gossip messages.
399+ fn buffer_full_drop_gossip ( & self ) -> bool {
400+ if self . pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
401+ || self . msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
402+ return false
403+ }
404+ true
405+ }
381406}
382407
383408/// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
@@ -710,11 +735,11 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
710735
711736 fn do_attempt_write_data ( & self , descriptor : & mut Descriptor , peer : & mut Peer ) {
712737 while !peer. awaiting_write_event {
713- if peer. pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer . msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
738+ if peer. should_backfill_gossip ( ) {
714739 match peer. sync_status {
715740 InitSyncTracker :: NoSyncRequested => { } ,
716741 InitSyncTracker :: ChannelsSyncing ( c) if c < 0xffff_ffff_ffff_ffff => {
717- let steps = ( ( OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer. pending_outbound_buffer . len ( ) + 2 ) / 3 ) as u8 ;
742+ let steps = ( ( peer. gossip_buffer_slots_available ( ) + 2 ) / 3 ) as u8 ;
718743 let all_messages = self . message_handler . route_handler . get_next_channel_announcements ( c, steps) ;
719744 for & ( ref announce, ref update_a_option, ref update_b_option) in all_messages. iter ( ) {
720745 self . enqueue_message ( peer, announce) ;
@@ -731,7 +756,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
731756 }
732757 } ,
733758 InitSyncTracker :: ChannelsSyncing ( c) if c == 0xffff_ffff_ffff_ffff => {
734- let steps = ( OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer. pending_outbound_buffer . len ( ) ) as u8 ;
759+ let steps = peer. gossip_buffer_slots_available ( ) as u8 ;
735760 let all_messages = self . message_handler . route_handler . get_next_node_announcements ( None , steps) ;
736761 for msg in all_messages. iter ( ) {
737762 self . enqueue_message ( peer, msg) ;
@@ -743,7 +768,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
743768 } ,
744769 InitSyncTracker :: ChannelsSyncing ( _) => unreachable ! ( ) ,
745770 InitSyncTracker :: NodesSyncing ( key) => {
746- let steps = ( OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer. pending_outbound_buffer . len ( ) ) as u8 ;
771+ let steps = peer. gossip_buffer_slots_available ( ) as u8 ;
747772 let all_messages = self . message_handler . route_handler . get_next_node_announcements ( Some ( & key) , steps) ;
748773 for msg in all_messages. iter ( ) {
749774 self . enqueue_message ( peer, msg) ;
@@ -765,9 +790,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
765790 Some ( buff) => buff,
766791 } ;
767792
768- let should_be_reading = peer. pending_outbound_buffer . len ( ) < OUTBOUND_BUFFER_LIMIT_READ_PAUSE ;
769793 let pending = & next_buff[ peer. pending_outbound_buffer_first_msg_offset ..] ;
770- let data_sent = descriptor. send_data ( pending, should_be_reading ) ;
794+ let data_sent = descriptor. send_data ( pending, peer . should_read ( ) ) ;
771795 peer. pending_outbound_buffer_first_msg_offset += data_sent;
772796 if peer. pending_outbound_buffer_first_msg_offset == next_buff. len ( ) { true } else { false }
773797 } {
@@ -1045,7 +1069,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
10451069 }
10461070 }
10471071 }
1048- pause_read = peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_READ_PAUSE ;
1072+ pause_read = ! peer. should_read ( ) ;
10491073
10501074 if let Some ( message) = msg_to_handle {
10511075 match self . handle_message ( & peer_mutex, peer_lock, message) {
@@ -1308,9 +1332,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13081332 !peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
13091333 continue
13101334 }
1311- if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
1312- || peer. msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
1313- {
1335+ if peer. buffer_full_drop_gossip ( ) {
13141336 log_gossip ! ( self . logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
13151337 continue ;
13161338 }
@@ -1334,9 +1356,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13341356 !peer. should_forward_node_announcement ( msg. contents . node_id ) {
13351357 continue
13361358 }
1337- if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
1338- || peer. msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
1339- {
1359+ if peer. buffer_full_drop_gossip ( ) {
13401360 log_gossip ! ( self . logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
13411361 continue ;
13421362 }
@@ -1359,9 +1379,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13591379 !peer. should_forward_channel_announcement ( msg. contents . short_channel_id ) {
13601380 continue
13611381 }
1362- if peer. pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
1363- || peer. msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
1364- {
1382+ if peer. buffer_full_drop_gossip ( ) {
13651383 log_gossip ! ( self . logger, "Skipping broadcast message to {:?} as its outbound buffer is full" , peer. their_node_id) ;
13661384 continue ;
13671385 }
0 commit comments