@@ -67,9 +67,9 @@ impl RoutingMessageHandler for IgnoringMessageHandler {
 	fn handle_node_announcement(&self, _msg: &msgs::NodeAnnouncement) -> Result<bool, LightningError> { Ok(false) }
 	fn handle_channel_announcement(&self, _msg: &msgs::ChannelAnnouncement) -> Result<bool, LightningError> { Ok(false) }
 	fn handle_channel_update(&self, _msg: &msgs::ChannelUpdate) -> Result<bool, LightningError> { Ok(false) }
-	fn get_next_channel_announcements(&self, _starting_point: u64, _batch_amount: u8) ->
-		Vec<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { Vec::new() }
-	fn get_next_node_announcements(&self, _starting_point: Option<&PublicKey>, _batch_amount: u8) -> Vec<msgs::NodeAnnouncement> { Vec::new() }
+	fn get_next_channel_announcement(&self, _starting_point: u64) ->
+		Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)> { None }
+	fn get_next_node_announcement(&self, _starting_point: Option<&PublicKey>) -> Option<msgs::NodeAnnouncement> { None }
 	fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init) {}
 	fn handle_reply_channel_range(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyChannelRange) -> Result<(), LightningError> { Ok(()) }
 	fn handle_reply_short_channel_ids_end(&self, _their_node_id: &PublicKey, _msg: msgs::ReplyShortChannelIdsEnd) -> Result<(), LightningError> { Ok(()) }
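The gossip accessors above now hand back at most one item per call instead of a caller-sized batch. For a custom RoutingMessageHandler backed by ordered maps, the new signatures can be implemented as a simple cursor. A minimal sketch follows; the SimpleGossipStore type and its fields are hypothetical (not part of this change), and PublicKey and msgs refer to the same imports the surrounding file already uses.

use std::collections::BTreeMap;
use std::ops::Bound;

// Hypothetical in-memory store; a real handler would read from its network graph.
struct SimpleGossipStore {
	// short_channel_id -> (announcement, update for direction A, update for direction B)
	channels: BTreeMap<u64, (msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)>,
	// node_id -> announcement, kept ordered so iteration can resume after a given key
	nodes: BTreeMap<PublicKey, msgs::NodeAnnouncement>,
}

impl SimpleGossipStore {
	// First channel announcement at or after `starting_point`, if any remain.
	fn get_next_channel_announcement(&self, starting_point: u64)
		-> Option<(msgs::ChannelAnnouncement, Option<msgs::ChannelUpdate>, Option<msgs::ChannelUpdate>)>
	{
		self.channels.range(starting_point..).next().map(|(_, entry)| entry.clone())
	}

	// First node announcement strictly after `starting_point`, or the first one overall,
	// so a caller passing back the last node_id it saw never receives a repeat.
	fn get_next_node_announcement(&self, starting_point: Option<&PublicKey>) -> Option<msgs::NodeAnnouncement> {
		match starting_point {
			Some(pk) => self.nodes.range((Bound::Excluded(*pk), Bound::Unbounded))
				.next().map(|(_, ann)| ann.clone()),
			None => self.nodes.values().next().cloned(),
		}
	}
}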
@@ -323,6 +323,10 @@ const MAX_BUFFER_DRAIN_TICK_INTERVALS_PER_PEER: i8 = 4;
 /// tick. Once we have sent this many messages since the last ping, we send a ping right away to
 /// ensure we don't just fill up our send buffer and leave the peer with too many messages to
 /// process before the next ping.
+///
+/// Note that we continue responding to other messages even after we've sent this many messages, so
+/// it's more of a general guideline used for gossip backfill (and gossip forwarding, times
+/// [`FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO`]) than a hard limit.
 const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
 
 struct Peer {
@@ -378,6 +382,29 @@ impl Peer {
 			InitSyncTracker::NodesSyncing(pk) => pk < node_id,
 		}
 	}
+
+	/// Returns whether we should be reading bytes from this peer, based on whether its outbound
+	/// buffer still has space and we don't need to pause reads to get some writes out.
+	fn should_read(&self) -> bool {
+		self.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE
+	}
+
+	/// Determines if we should push additional gossip messages onto a peer's outbound buffer for
+	/// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
+	/// been drained.
+	fn should_buffer_gossip_backfill(&self) -> bool {
+		self.pending_outbound_buffer.is_empty() &&
+			self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
+	}
+
+	/// Returns whether this peer's outbound buffer is full and we should drop gossip messages.
+	fn buffer_full_drop_gossip(&self) -> bool {
+		if self.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
+			|| self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
+				return true
+		}
+		false
+	}
 }
 
 /// SimpleArcPeerManager is useful when you need a PeerManager with a static lifetime, e.g.
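Taken together, these predicates split the outbound queue into three regimes: reads stay enabled while the queue is short, gossip backfill only happens once the queue has fully drained and the per-ping budget (BUFFER_DRAIN_MSGS_PER_TICK) is not yet spent, and forwarded gossip is dropped once either the queue or the larger forwarding budget overflows. A self-contained model of that split follows; only BUFFER_DRAIN_MSGS_PER_TICK = 32 is given above, the other constant values are illustrative stand-ins, and BufferState is a hypothetical mirror of the relevant Peer fields.

// Assumed values for illustration only; the real limits are defined elsewhere in this file.
const OUTBOUND_BUFFER_LIMIT_READ_PAUSE: usize = 10;
const OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP: usize = 20;
const BUFFER_DRAIN_MSGS_PER_TICK: usize = 32;
const FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO: usize = 2;

struct BufferState { queued_msgs: usize, msgs_sent_since_pong: usize }

impl BufferState {
	// Mirrors Peer::should_read(): keep reading while the outbound queue is short.
	fn should_read(&self) -> bool { self.queued_msgs < OUTBOUND_BUFFER_LIMIT_READ_PAUSE }
	// Mirrors Peer::should_buffer_gossip_backfill(): only backfill once the queue has
	// drained completely and we have not yet sent a full tick's worth of messages.
	fn should_buffer_gossip_backfill(&self) -> bool {
		self.queued_msgs == 0 && self.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
	}
	// Mirrors Peer::buffer_full_drop_gossip(): drop forwarded gossip once either the
	// queue or the per-ping forwarding budget is exhausted.
	fn buffer_full_drop_gossip(&self) -> bool {
		self.queued_msgs > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
			|| self.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
	}
}

fn main() {
	let idle = BufferState { queued_msgs: 0, msgs_sent_since_pong: 0 };
	assert!(idle.should_read() && idle.should_buffer_gossip_backfill() && !idle.buffer_full_drop_gossip());
	let busy = BufferState { queued_msgs: 25, msgs_sent_since_pong: 40 };
	assert!(!busy.should_read() && !busy.should_buffer_gossip_backfill() && busy.buffer_full_drop_gossip());
}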
@@ -710,46 +737,39 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 
 	fn do_attempt_write_data(&self, descriptor: &mut Descriptor, peer: &mut Peer) {
 		while !peer.awaiting_write_event {
-			if peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE && peer.msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK {
+			if peer.should_buffer_gossip_backfill() {
 				match peer.sync_status {
 					InitSyncTracker::NoSyncRequested => {},
 					InitSyncTracker::ChannelsSyncing(c) if c < 0xffff_ffff_ffff_ffff => {
-						let steps = ((OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len() + 2) / 3) as u8;
-						let all_messages = self.message_handler.route_handler.get_next_channel_announcements(c, steps);
-						for &(ref announce, ref update_a_option, ref update_b_option) in all_messages.iter() {
-							self.enqueue_message(peer, announce);
-							if let &Some(ref update_a) = update_a_option {
-								self.enqueue_message(peer, update_a);
+						if let Some((announce, update_a_option, update_b_option)) =
+							self.message_handler.route_handler.get_next_channel_announcement(c)
+						{
+							self.enqueue_message(peer, &announce);
+							if let Some(update_a) = update_a_option {
+								self.enqueue_message(peer, &update_a);
 							}
-							if let &Some(ref update_b) = update_b_option {
-								self.enqueue_message(peer, update_b);
+							if let Some(update_b) = update_b_option {
+								self.enqueue_message(peer, &update_b);
 							}
 							peer.sync_status = InitSyncTracker::ChannelsSyncing(announce.contents.short_channel_id + 1);
-						}
-						if all_messages.is_empty() || all_messages.len() != steps as usize {
+						} else {
 							peer.sync_status = InitSyncTracker::ChannelsSyncing(0xffff_ffff_ffff_ffff);
 						}
 					},
 					InitSyncTracker::ChannelsSyncing(c) if c == 0xffff_ffff_ffff_ffff => {
-						let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
-						let all_messages = self.message_handler.route_handler.get_next_node_announcements(None, steps);
-						for msg in all_messages.iter() {
-							self.enqueue_message(peer, msg);
+						if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(None) {
+							self.enqueue_message(peer, &msg);
 							peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
-						}
-						if all_messages.is_empty() || all_messages.len() != steps as usize {
+						} else {
 							peer.sync_status = InitSyncTracker::NoSyncRequested;
 						}
 					},
 					InitSyncTracker::ChannelsSyncing(_) => unreachable!(),
 					InitSyncTracker::NodesSyncing(key) => {
-						let steps = (OUTBOUND_BUFFER_LIMIT_READ_PAUSE - peer.pending_outbound_buffer.len()) as u8;
-						let all_messages = self.message_handler.route_handler.get_next_node_announcements(Some(&key), steps);
-						for msg in all_messages.iter() {
-							self.enqueue_message(peer, msg);
+						if let Some(msg) = self.message_handler.route_handler.get_next_node_announcement(Some(&key)) {
+							self.enqueue_message(peer, &msg);
 							peer.sync_status = InitSyncTracker::NodesSyncing(msg.contents.node_id);
-						}
-						if all_messages.is_empty() || all_messages.len() != steps as usize {
+						} else {
 							peer.sync_status = InitSyncTracker::NoSyncRequested;
 						}
 					},
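The refactored arms above advance the backfill one announcement per pass through the outer loop, using the Option return to detect when each source has run dry and the sync should move to its next phase. A simplified model of the InitSyncTracker transitions this drives is sketched below; u64 stands in for PublicKey in the node phase, and next_chan/next_node stand in for the route_handler accessors.

// Simplified stand-in for InitSyncTracker; names mirror the real enum for readability.
#[derive(Debug, PartialEq)]
enum Tracker { NoSyncRequested, ChannelsSyncing(u64), NodesSyncing(u64) }

// One backfill step: advance the cursor past whatever was just sent, or move to the
// next phase (channels -> nodes -> done) when the current source returns None.
fn step(t: Tracker, next_chan: Option<u64>, next_node: Option<u64>) -> Tracker {
	match t {
		Tracker::NoSyncRequested => Tracker::NoSyncRequested,
		Tracker::ChannelsSyncing(c) if c < u64::MAX => match next_chan {
			Some(scid) => Tracker::ChannelsSyncing(scid + 1), // resume after this channel
			None => Tracker::ChannelsSyncing(u64::MAX),       // channels exhausted, nodes next
		},
		Tracker::ChannelsSyncing(_) => match next_node {
			Some(id) => Tracker::NodesSyncing(id),
			None => Tracker::NoSyncRequested,
		},
		Tracker::NodesSyncing(_) => match next_node {
			Some(id) => Tracker::NodesSyncing(id),
			None => Tracker::NoSyncRequested, // backfill complete
		},
	}
}

fn main() {
	// Channels first, then nodes, then done: the same phase ordering as the loop above.
	assert_eq!(step(Tracker::ChannelsSyncing(0), Some(42), None), Tracker::ChannelsSyncing(43));
	assert_eq!(step(Tracker::ChannelsSyncing(u64::MAX), None, Some(7)), Tracker::NodesSyncing(7));
	assert_eq!(step(Tracker::NodesSyncing(7), None, None), Tracker::NoSyncRequested);
}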
@@ -759,18 +779,15 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 				self.maybe_send_extra_ping(peer);
 			}
 
-			if {
-				let next_buff = match peer.pending_outbound_buffer.front() {
-					None => return,
-					Some(buff) => buff,
-				};
+			let next_buff = match peer.pending_outbound_buffer.front() {
+				None => return,
+				Some(buff) => buff,
+			};
 
-				let should_be_reading = peer.pending_outbound_buffer.len() < OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
-				let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
-				let data_sent = descriptor.send_data(pending, should_be_reading);
-				peer.pending_outbound_buffer_first_msg_offset += data_sent;
-				if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() { true } else { false }
-			} {
+			let pending = &next_buff[peer.pending_outbound_buffer_first_msg_offset..];
+			let data_sent = descriptor.send_data(pending, peer.should_read());
+			peer.pending_outbound_buffer_first_msg_offset += data_sent;
+			if peer.pending_outbound_buffer_first_msg_offset == next_buff.len() {
 				peer.pending_outbound_buffer_first_msg_offset = 0;
 				peer.pending_outbound_buffer.pop_front();
 			} else {
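With the send path flattened, the second argument to send_data is simply peer.should_read(): the same call that flushes bytes also tells the transport whether it may keep (or resume) reading from this peer. A minimal sketch of a descriptor honoring that flag follows, assuming the SocketDescriptor trait shape this module defines (send_data and disconnect_socket); VecDescriptor and its fields are hypothetical test scaffolding.

use std::cell::{Cell, RefCell};
use std::hash::{Hash, Hasher};
use std::rc::Rc;

#[derive(Clone)]
struct VecDescriptor {
	id: u64,
	written: Rc<RefCell<Vec<u8>>>,  // bytes handed to the "socket"
	read_paused: Rc<Cell<bool>>,    // whether we have stopped reading from the peer
}

impl PartialEq for VecDescriptor { fn eq(&self, other: &Self) -> bool { self.id == other.id } }
impl Eq for VecDescriptor {}
impl Hash for VecDescriptor { fn hash<H: Hasher>(&self, h: &mut H) { self.id.hash(h) } }

impl SocketDescriptor for VecDescriptor {
	fn send_data(&mut self, data: &[u8], resume_read: bool) -> usize {
		// Mirror the peer's wishes: pause reads while its outbound buffer is long,
		// resume them once should_read() reports there is room again.
		self.read_paused.set(!resume_read);
		self.written.borrow_mut().extend_from_slice(data);
		data.len() // pretend the transport accepted everything
	}
	fn disconnect_socket(&mut self) {
		// A real transport would close the underlying connection here.
	}
}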
@@ -1045,7 +1062,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 					}
 				}
 			}
-			pause_read = peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_READ_PAUSE;
+			pause_read = !peer.should_read();
 
 			if let Some(message) = msg_to_handle {
 				match self.handle_message(&peer_mutex, peer_lock, message) {
@@ -1308,9 +1325,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 						!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 						continue
 					}
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -1334,9 +1349,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 						!peer.should_forward_node_announcement(msg.contents.node_id) {
 						continue
 					}
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -1359,9 +1372,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
 						!peer.should_forward_channel_announcement(msg.contents.short_channel_id) {
 						continue
 					}
-					if peer.pending_outbound_buffer.len() > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
-						|| peer.msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
-					{
+					if peer.buffer_full_drop_gossip() {
 						log_gossip!(self.logger, "Skipping broadcast message to {:?} as its outbound buffer is full", peer.their_node_id);
 						continue;
 					}
@@ -2060,10 +2071,10 @@ mod tests {
 
 		// Check that each peer has received the expected number of channel updates and channel
 		// announcements.
-		assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
-		assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
-		assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 100);
-		assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 50);
+		assert_eq!(cfgs[0].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+		assert_eq!(cfgs[0].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
+		assert_eq!(cfgs[1].routing_handler.chan_upds_recvd.load(Ordering::Acquire), 108);
+		assert_eq!(cfgs[1].routing_handler.chan_anns_recvd.load(Ordering::Acquire), 54);
 	}
 
 	#[test]