@@ -337,6 +337,8 @@ struct Peer {
337337
338338 pending_outbound_buffer : LinkedList < Vec < u8 > > ,
339339 pending_outbound_buffer_first_msg_offset : usize ,
340+ gossip_broadcast_buffer : LinkedList < Vec < u8 > > ,
341+ gossip_broadcast_buffer_first_msg_offset : usize ,
340342 awaiting_write_event : bool ,
341343
342344 pending_read_buffer : Vec < u8 > ,
@@ -393,15 +395,18 @@ impl Peer {
393395 /// backfilling gossip data to the peer. This is checked every time the peer's buffer may have
394396 /// been drained.
395397 fn should_buffer_gossip_backfill ( & self ) -> bool {
396- self . pending_outbound_buffer . is_empty ( ) &&
397- self . msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
398+ self . pending_outbound_buffer . is_empty ( ) && ! self . buffer_full_drop_gossip_broadcast ( ) // TODO: are broadcasts higher priority than background sync? seems like maybe they should be
399+ && self . msgs_sent_since_pong < BUFFER_DRAIN_MSGS_PER_TICK
398400 }
399401
400- /// Returns whether this peer's buffer is full and we should drop gossip messages .
402+ /// Returns whether this peer's outbound buffers are full and we should drop gossip broadcasts .
401403 fn buffer_full_drop_gossip_broadcast ( & self ) -> bool {
402- if self . pending_outbound_buffer . len ( ) > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP
403- || self . msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO {
404- return false
404+ let total_outbound_buffered =
405+ self . gossip_broadcast_buffer . len ( ) + self . pending_outbound_buffer . len ( ) ;
406+
407+ if total_outbound_buffered > OUTBOUND_BUFFER_LIMIT_DROP_GOSSIP ||
408+ self . msgs_sent_since_pong > BUFFER_DRAIN_MSGS_PER_TICK * FORWARD_INIT_SYNC_BUFFER_LIMIT_RATIO
409+ {
405410 return true
406411 }
407412 false
@@ -672,6 +677,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
672677
673678 pending_outbound_buffer : LinkedList :: new ( ) ,
674679 pending_outbound_buffer_first_msg_offset : 0 ,
680+ gossip_broadcast_buffer : LinkedList :: new ( ) ,
681+ gossip_broadcast_buffer_first_msg_offset : 0 ,
675682 awaiting_write_event : false ,
676683
677684 pending_read_buffer,
@@ -718,6 +725,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
718725
719726 pending_outbound_buffer : LinkedList :: new ( ) ,
720727 pending_outbound_buffer_first_msg_offset : 0 ,
728+ gossip_broadcast_buffer : LinkedList :: new ( ) ,
729+ gossip_broadcast_buffer_first_msg_offset : 0 ,
721730 awaiting_write_event : false ,
722731
723732 pending_read_buffer,
@@ -780,20 +789,29 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
780789 self . maybe_send_extra_ping ( peer) ;
781790 }
782791
783- let next_buff = match peer. pending_outbound_buffer . front ( ) {
784- None => return ,
785- Some ( buff) => buff,
786- } ;
787-
788- let pending = & next_buff[ peer. pending_outbound_buffer_first_msg_offset ..] ;
789- let data_sent = descriptor. send_data ( pending, peer. should_read ( ) ) ;
790- peer. pending_outbound_buffer_first_msg_offset += data_sent;
791- if peer. pending_outbound_buffer_first_msg_offset == next_buff. len ( ) {
792- peer. pending_outbound_buffer_first_msg_offset = 0 ;
793- peer. pending_outbound_buffer . pop_front ( ) ;
794- } else {
795- peer. awaiting_write_event = true ;
796- }
792+ // TODO: break into methods in `peer` or somehow DRY
793+ // We prioritize channel messages over gossip broadcasts
794+ if let Some ( next_buff) = peer. pending_outbound_buffer . front ( ) {
795+ let pending = & next_buff[ peer. pending_outbound_buffer_first_msg_offset ..] ;
796+ let data_sent = descriptor. send_data ( pending, peer. should_read ( ) ) ;
797+ peer. pending_outbound_buffer_first_msg_offset += data_sent;
798+ if peer. pending_outbound_buffer_first_msg_offset == next_buff. len ( ) {
799+ peer. pending_outbound_buffer_first_msg_offset = 0 ;
800+ peer. pending_outbound_buffer . pop_front ( ) ;
801+ } else {
802+ peer. awaiting_write_event = true ;
803+ }
804+ } else if let Some ( next_buff) = peer. gossip_broadcast_buffer . front ( ) {
805+ let pending = & next_buff[ peer. gossip_broadcast_buffer_first_msg_offset ..] ;
806+ let data_sent = descriptor. send_data ( pending, peer. should_read ( ) ) ;
807+ peer. gossip_broadcast_buffer_first_msg_offset += data_sent;
808+ if peer. gossip_broadcast_buffer_first_msg_offset == next_buff. len ( ) {
809+ peer. gossip_broadcast_buffer_first_msg_offset = 0 ;
810+ peer. gossip_broadcast_buffer . pop_front ( ) ;
811+ } else {
812+ peer. awaiting_write_event = true ;
813+ }
814+ } else { return }
797815 }
798816 }
799817
@@ -858,6 +876,12 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
858876 peer. pending_outbound_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_message[ ..] ) ) ;
859877 }
860878
879+ /// Append a message to a peer's pending outbound/write gossip broadcast buffer
880+ fn enqueue_encoded_gossip_broadcast ( & self , peer : & mut Peer , encoded_message : & Vec < u8 > ) {
881+ peer. msgs_sent_since_pong += 1 ;
882+ peer. gossip_broadcast_buffer . push_back ( peer. channel_encryptor . encrypt_message ( & encoded_message[ ..] ) ) ;
883+ }
884+
861885 /// Append a message to a peer's pending outbound/write buffer
862886 fn enqueue_message < M : wire:: Type > ( & self , peer : & mut Peer , message : & M ) {
863887 let mut buffer = VecWriter ( Vec :: with_capacity ( 2048 ) ) ;
@@ -1337,7 +1361,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13371361 if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) == except_node {
13381362 continue ;
13391363 }
1340- self . enqueue_encoded_message ( & mut * peer, & encoded_msg) ;
1364+ self . enqueue_encoded_gossip_broadcast ( & mut * peer, & encoded_msg) ;
13411365 }
13421366 } ,
13431367 wire:: Message :: NodeAnnouncement ( ref msg) => {
@@ -1360,7 +1384,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13601384 if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) == except_node {
13611385 continue ;
13621386 }
1363- self . enqueue_encoded_message ( & mut * peer, & encoded_msg) ;
1387+ self . enqueue_encoded_gossip_broadcast ( & mut * peer, & encoded_msg) ;
13641388 }
13651389 } ,
13661390 wire:: Message :: ChannelUpdate ( ref msg) => {
@@ -1380,7 +1404,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref, CMH: Deref> P
13801404 if except_node. is_some ( ) && peer. their_node_id . as_ref ( ) == except_node {
13811405 continue ;
13821406 }
1383- self . enqueue_encoded_message ( & mut * peer, & encoded_msg) ;
1407+ self . enqueue_encoded_gossip_broadcast ( & mut * peer, & encoded_msg) ;
13841408 }
13851409 } ,
13861410 _ => debug_assert ! ( false , "We shouldn't attempt to forward anything but gossip messages" ) ,
0 commit comments