@@ -632,16 +632,15 @@ pub trait SocketDescriptor: cmp::Eq + hash::Hash + Clone {
632632 ///
633633 /// If the returned size is smaller than `data.len()`, a
634634 /// [`PeerManager::write_buffer_space_avail`] call must be made the next time more data can be
635- /// written. Additionally, until a `send_data` event completes fully, no further
636- /// [`PeerManager::read_event`] calls should be made for the same peer! Because this is to
637- /// prevent denial-of-service issues, you should not read or buffer any data from the socket
638- /// until then.
635+ /// written.
639636 ///
640- /// If a [`PeerManager::read_event`] call on this descriptor had previously returned true
641- /// (indicating that read events should be paused to prevent DoS in the send buffer),
642- /// `resume_read` may be set indicating that read events on this descriptor should resume. A
643- /// `resume_read` of false carries no meaning, and should not cause any action.
644- fn send_data ( & mut self , data : & [ u8 ] , resume_read : bool ) -> usize ;
637+ /// If `continue_read` is *not* set, further [`PeerManager::read_event`] calls should be
638+ /// avoided until another call is made with it set. This allows us to pause read if there are
639+ /// too many outgoing messages queued for a peer to avoid DoS issues where a peer fills our
640+ /// buffer by sending us messages that need response without reading the responses.
641+ ///
642+ /// Note that calls may be made with an empty `data` to update the `continue_read` flag.
643+ fn send_data ( & mut self , data : & [ u8 ] , continue_read : bool ) -> usize ;
645644 /// Disconnect the socket pointed to by this SocketDescriptor.
646645 ///
647646 /// You do *not* need to call [`PeerManager::socket_disconnected`] with this socket after this
@@ -1664,7 +1663,7 @@ where
16641663 Some ( peer_mutex) => {
16651664 let mut peer = peer_mutex. lock ( ) . unwrap ( ) ;
16661665 peer. awaiting_write_event = false ;
1667- self . do_attempt_write_data ( descriptor, & mut peer, false ) ;
1666+ self . do_attempt_write_data ( descriptor, & mut peer, true ) ;
16681667 } ,
16691668 } ;
16701669 Ok ( ( ) )
@@ -1676,11 +1675,9 @@ where
16761675 ///
16771676 /// Will *not* call back into [`send_data`] on any descriptors to avoid reentrancy complexity.
16781677 /// Thus, however, you should call [`process_events`] after any `read_event` to generate
1679- /// [`send_data`] calls to handle responses.
1680- ///
1681- /// If `Ok(true)` is returned, further read_events should not be triggered until a
1682- /// [`send_data`] call on this descriptor has `resume_read` set (preventing DoS issues in the
1683- /// send buffer).
1678+ /// [`send_data`] calls to handle responses. This is also important to give [`send_data`] calls
1679+ /// a chance to pause reads if too many messages have been queued in response allowing a peer
1680+ /// to bloat our memory.
16841681 ///
16851682 /// In order to avoid processing too many messages at once per peer, `data` should be on the
16861683 /// order of 4KiB.
@@ -1689,7 +1686,7 @@ where
16891686 /// [`process_events`]: PeerManager::process_events
16901687 pub fn read_event (
16911688 & self , peer_descriptor : & mut Descriptor , data : & [ u8 ] ,
1692- ) -> Result < bool , PeerHandleError > {
1689+ ) -> Result < ( ) , PeerHandleError > {
16931690 match self . do_read_event ( peer_descriptor, data) {
16941691 Ok ( res) => Ok ( res) ,
16951692 Err ( e) => {
@@ -1718,8 +1715,7 @@ where
17181715
17191716 fn do_read_event (
17201717 & self , peer_descriptor : & mut Descriptor , data : & [ u8 ] ,
1721- ) -> Result < bool , PeerHandleError > {
1722- let mut pause_read = false ;
1718+ ) -> Result < ( ) , PeerHandleError > {
17231719 let peers = self . peers . read ( ) . unwrap ( ) ;
17241720 let mut msgs_to_forward = Vec :: new ( ) ;
17251721 let mut peer_node_id = None ;
@@ -1994,7 +1990,6 @@ where
19941990 } ,
19951991 }
19961992 }
1997- pause_read = !self . peer_should_read ( peer) ;
19981993
19991994 if let Some ( message) = msg_to_handle {
20001995 match self . handle_message ( & peer_mutex, peer_lock, message) {
@@ -2027,7 +2022,7 @@ where
20272022 ) ;
20282023 }
20292024
2030- Ok ( pause_read )
2025+ Ok ( ( ) )
20312026 }
20322027
20332028 /// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
@@ -3942,8 +3937,8 @@ mod tests {
39423937 ) -> (
39433938 FileDescriptor ,
39443939 FileDescriptor ,
3945- Result < bool , PeerHandleError > ,
3946- Result < bool , PeerHandleError > ,
3940+ Result < ( ) , PeerHandleError > ,
3941+ Result < ( ) , PeerHandleError > ,
39473942 ) {
39483943 let addr_a = SocketAddress :: TcpIpV4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1000 } ;
39493944 let addr_b = SocketAddress :: TcpIpV4 { addr : [ 127 , 0 , 0 , 1 ] , port : 1001 } ;
@@ -3958,11 +3953,11 @@ mod tests {
39583953 let initial_data =
39593954 peer_b. new_outbound_connection ( id_a, fd_b. clone ( ) , Some ( addr_a. clone ( ) ) ) . unwrap ( ) ;
39603955 peer_a. new_inbound_connection ( fd_a. clone ( ) , Some ( addr_b. clone ( ) ) ) . unwrap ( ) ;
3961- assert_eq ! ( peer_a. read_event( & mut fd_a, & initial_data) . unwrap( ) , false ) ;
3956+ peer_a. read_event ( & mut fd_a, & initial_data) . unwrap ( ) ;
39623957 peer_a. process_events ( ) ;
39633958
39643959 let a_data = fd_a. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
3965- assert_eq ! ( peer_b. read_event( & mut fd_b, & a_data) . unwrap( ) , false ) ;
3960+ peer_b. read_event ( & mut fd_b, & a_data) . unwrap ( ) ;
39663961
39673962 peer_b. process_events ( ) ;
39683963 let b_data = fd_b. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
@@ -3989,8 +3984,8 @@ mod tests {
39893984
39903985 let ( fd_a, fd_b, a_refused, b_refused) = try_establish_connection ( peer_a, peer_b) ;
39913986
3992- assert_eq ! ( a_refused. unwrap( ) , false ) ;
3993- assert_eq ! ( b_refused. unwrap( ) , false ) ;
3987+ a_refused. unwrap ( ) ;
3988+ b_refused. unwrap ( ) ;
39943989
39953990 assert_eq ! ( peer_a. peer_by_node_id( & id_b) . unwrap( ) . counterparty_node_id, id_b) ;
39963991 assert_eq ! ( peer_a. peer_by_node_id( & id_b) . unwrap( ) . socket_address, Some ( addr_b) ) ;
@@ -4113,11 +4108,11 @@ mod tests {
41134108 let initial_data =
41144109 peer_b. new_outbound_connection ( id_a, fd_b. clone ( ) , Some ( addr_a. clone ( ) ) ) . unwrap ( ) ;
41154110 peer_a. new_inbound_connection ( fd_a. clone ( ) , Some ( addr_b. clone ( ) ) ) . unwrap ( ) ;
4116- assert_eq ! ( peer_a. read_event( & mut fd_a, & initial_data) . unwrap( ) , false ) ;
4111+ peer_a. read_event ( & mut fd_a, & initial_data) . unwrap ( ) ;
41174112 peer_a. process_events ( ) ;
41184113
41194114 let a_data = fd_a. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
4120- assert_eq ! ( peer_b. read_event( & mut fd_b, & a_data) . unwrap( ) , false ) ;
4115+ peer_b. read_event ( & mut fd_b, & a_data) . unwrap ( ) ;
41214116
41224117 peer_b. process_events ( ) ;
41234118 let b_data = fd_b. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
@@ -4144,11 +4139,11 @@ mod tests {
41444139 let initial_data =
41454140 peer_b. new_outbound_connection ( id_a, fd_b. clone ( ) , Some ( addr_a. clone ( ) ) ) . unwrap ( ) ;
41464141 peer_a. new_inbound_connection ( fd_a. clone ( ) , Some ( addr_b. clone ( ) ) ) . unwrap ( ) ;
4147- assert_eq ! ( peer_a. read_event( & mut fd_a, & initial_data) . unwrap( ) , false ) ;
4142+ peer_a. read_event ( & mut fd_a, & initial_data) . unwrap ( ) ;
41484143 peer_a. process_events ( ) ;
41494144
41504145 let a_data = fd_a. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
4151- assert_eq ! ( peer_b. read_event( & mut fd_b, & a_data) . unwrap( ) , false ) ;
4146+ peer_b. read_event ( & mut fd_b, & a_data) . unwrap ( ) ;
41524147
41534148 peer_b. process_events ( ) ;
41544149 let b_data = fd_b. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
@@ -4220,7 +4215,7 @@ mod tests {
42204215 peers[ 0 ] . process_events ( ) ;
42214216
42224217 let a_data = fd_a. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
4223- assert_eq ! ( peers[ 1 ] . read_event( & mut fd_b, & a_data) . unwrap( ) , false ) ;
4218+ peers[ 1 ] . read_event ( & mut fd_b, & a_data) . unwrap ( ) ;
42244219 }
42254220
42264221 #[ test]
@@ -4240,13 +4235,13 @@ mod tests {
42404235 let mut dup_encryptor =
42414236 PeerChannelEncryptor :: new_outbound ( id_a, SecretKey :: from_slice ( & [ 42 ; 32 ] ) . unwrap ( ) ) ;
42424237 let initial_data = dup_encryptor. get_act_one ( & peers[ 1 ] . secp_ctx ) ;
4243- assert_eq ! ( peers[ 0 ] . read_event( & mut fd_dup, & initial_data) . unwrap( ) , false ) ;
4238+ peers[ 0 ] . read_event ( & mut fd_dup, & initial_data) . unwrap ( ) ;
42444239 peers[ 0 ] . process_events ( ) ;
42454240
42464241 let a_data = fd_dup. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
42474242 let ( act_three, _) =
42484243 dup_encryptor. process_act_two ( & a_data[ ..] , & & cfgs[ 1 ] . node_signer ) . unwrap ( ) ;
4249- assert_eq ! ( peers[ 0 ] . read_event( & mut fd_dup, & act_three) . unwrap( ) , false ) ;
4244+ peers[ 0 ] . read_event ( & mut fd_dup, & act_three) . unwrap ( ) ;
42504245
42514246 let not_init_msg = msgs:: Ping { ponglen : 4 , byteslen : 0 } ;
42524247 let msg_bytes = dup_encryptor. encrypt_message ( & not_init_msg) ;
@@ -4504,10 +4499,10 @@ mod tests {
45044499 assert_eq ! ( peers_len, 1 ) ;
45054500 }
45064501
4507- assert_eq ! ( peers[ 0 ] . read_event( & mut fd_a, & initial_data) . unwrap( ) , false ) ;
4502+ peers[ 0 ] . read_event ( & mut fd_a, & initial_data) . unwrap ( ) ;
45084503 peers[ 0 ] . process_events ( ) ;
45094504 let a_data = fd_a. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
4510- assert_eq ! ( peers[ 1 ] . read_event( & mut fd_b, & a_data) . unwrap( ) , false ) ;
4505+ peers[ 1 ] . read_event ( & mut fd_b, & a_data) . unwrap ( ) ;
45114506 peers[ 1 ] . process_events ( ) ;
45124507
45134508 // ...but if we get a second timer tick, we should disconnect the peer
@@ -4557,11 +4552,11 @@ mod tests {
45574552 let act_one = peer_b. new_outbound_connection ( a_id, fd_b. clone ( ) , None ) . unwrap ( ) ;
45584553 peer_a. new_inbound_connection ( fd_a. clone ( ) , None ) . unwrap ( ) ;
45594554
4560- assert_eq ! ( peer_a. read_event( & mut fd_a, & act_one) . unwrap( ) , false ) ;
4555+ peer_a. read_event ( & mut fd_a, & act_one) . unwrap ( ) ;
45614556 peer_a. process_events ( ) ;
45624557
45634558 let act_two = fd_a. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
4564- assert_eq ! ( peer_b. read_event( & mut fd_b, & act_two) . unwrap( ) , false ) ;
4559+ peer_b. read_event ( & mut fd_b, & act_two) . unwrap ( ) ;
45654560 peer_b. process_events ( ) ;
45664561
45674562 // Calling this here triggers the race on inbound connections.
@@ -4575,7 +4570,7 @@ mod tests {
45754570 assert ! ( !handshake_complete) ;
45764571 }
45774572
4578- assert_eq ! ( peer_a. read_event( & mut fd_a, & act_three_with_init_b) . unwrap( ) , false ) ;
4573+ peer_a. read_event ( & mut fd_a, & act_three_with_init_b) . unwrap ( ) ;
45794574 peer_a. process_events ( ) ;
45804575
45814576 {
@@ -4595,7 +4590,7 @@ mod tests {
45954590 assert ! ( !handshake_complete) ;
45964591 }
45974592
4598- assert_eq ! ( peer_b. read_event( & mut fd_b, & init_a) . unwrap( ) , false ) ;
4593+ peer_b. read_event ( & mut fd_b, & init_a) . unwrap ( ) ;
45994594 peer_b. process_events ( ) ;
46004595
46014596 {
@@ -4632,7 +4627,7 @@ mod tests {
46324627 peer_a. process_events ( ) ;
46334628 let msg = fd_a. outbound_data . lock ( ) . unwrap ( ) . split_off ( 0 ) ;
46344629 assert ! ( !msg. is_empty( ) ) ;
4635- assert_eq ! ( peer_b. read_event( & mut fd_b, & msg) . unwrap( ) , false ) ;
4630+ peer_b. read_event ( & mut fd_b, & msg) . unwrap ( ) ;
46364631 peer_b. process_events ( ) ;
46374632 } ;
46384633
@@ -4675,12 +4670,12 @@ mod tests {
46754670
46764671 let msg = fd_a. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ;
46774672 if !msg. is_empty( ) {
4678- assert_eq! ( peers[ 1 ] . read_event( & mut fd_b, & msg) . unwrap( ) , false ) ;
4673+ peers[ 1 ] . read_event( & mut fd_b, & msg) . unwrap( ) ;
46794674 continue ;
46804675 }
46814676 let msg = fd_b. outbound_data. lock( ) . unwrap( ) . split_off( 0 ) ;
46824677 if !msg. is_empty( ) {
4683- assert_eq! ( peers[ 0 ] . read_event( & mut fd_a, & msg) . unwrap( ) , false ) ;
4678+ peers[ 0 ] . read_event( & mut fd_a, & msg) . unwrap( ) ;
46844679 continue ;
46854680 }
46864681 break ;
0 commit comments