@@ -659,6 +659,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
659659 let pause_read = {
660660 let mut peers_lock = self . peers . lock ( ) . unwrap ( ) ;
661661 let peers = & mut * peers_lock;
662+ let mut msgs_to_forward = Vec :: new ( ) ;
663+ let mut peer_node_id = None ;
662664 let pause_read = match peers. peers . get_mut ( peer_descriptor) {
663665 None => panic ! ( "Descriptor for read_event is not already known to PeerManager" ) ,
664666 Some ( peer) => {
@@ -794,13 +796,18 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
794796 }
795797 } ;
796798
797- if let Err ( handling_error ) = self . handle_message ( & mut peers. peers_needing_send , peer, peer_descriptor. clone ( ) , message) {
798- match handling_error {
799+ match self . handle_message ( & mut peers. peers_needing_send , peer, peer_descriptor. clone ( ) , message) {
800+ Err ( handling_error ) => match handling_error {
799801 MessageHandlingError :: PeerHandleError ( e) => { return Err ( e) } ,
800802 MessageHandlingError :: LightningError ( e) => {
801803 try_potential_handleerror ! ( Err ( e) ) ;
802804 } ,
803- }
805+ } ,
806+ Ok ( Some ( msg) ) => {
807+ peer_node_id = Some ( peer. their_node_id . expect ( "After noise is complete, their_node_id is always set" ) ) ;
808+ msgs_to_forward. push ( msg) ;
809+ } ,
810+ Ok ( None ) => { } ,
804811 }
805812 }
806813 }
@@ -812,14 +819,19 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
812819 }
813820 } ;
814821
822+ for msg in msgs_to_forward. drain ( ..) {
823+ self . forward_broadcast_msg ( peers, & msg, peer_node_id. as_ref ( ) ) ;
824+ }
825+
815826 pause_read
816827 } ;
817828
818829 Ok ( pause_read)
819830 }
820831
821832 /// Process an incoming message and return a decision (ok, lightning error, peer handling error) regarding the next action with the peer
822- fn handle_message ( & self , peers_needing_send : & mut HashSet < Descriptor > , peer : & mut Peer , peer_descriptor : Descriptor , message : wire:: Message ) -> Result < ( ) , MessageHandlingError > {
833+ /// Returns the message back if it needs to be broadcasted to all other peers.
834+ fn handle_message ( & self , peers_needing_send : & mut HashSet < Descriptor > , peer : & mut Peer , peer_descriptor : Descriptor , message : wire:: Message ) -> Result < Option < wire:: Message > , MessageHandlingError > {
823835 log_trace ! ( self . logger, "Received message of type {} from {}" , message. type_id( ) , log_pubkey!( peer. their_node_id. unwrap( ) ) ) ;
824836
825837 // Need an Init as first message
@@ -829,6 +841,8 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
829841 return Err ( PeerHandleError { no_connection_possible : false } . into ( ) ) ;
830842 }
831843
844+ let mut should_forward = None ;
845+
832846 match message {
833847 // Setup and Control messages:
834848 wire:: Message :: Init ( msg) => {
@@ -951,34 +965,28 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
951965 self . message_handler . chan_handler . handle_announcement_signatures ( & peer. their_node_id . unwrap ( ) , & msg) ;
952966 } ,
953967 wire:: Message :: ChannelAnnouncement ( msg) => {
954- let should_forward = match self . message_handler . route_handler . handle_channel_announcement ( & msg) {
968+ if match self . message_handler . route_handler . handle_channel_announcement ( & msg) {
955969 Ok ( v) => v,
956970 Err ( e) => { return Err ( e. into ( ) ) ; } ,
957- } ;
958-
959- if should_forward {
960- // TODO: forward msg along to all our other peers!
971+ } {
972+ should_forward = Some ( wire:: Message :: ChannelAnnouncement ( msg) ) ;
961973 }
962974 } ,
963975 wire:: Message :: NodeAnnouncement ( msg) => {
964- let should_forward = match self . message_handler . route_handler . handle_node_announcement ( & msg) {
976+ if match self . message_handler . route_handler . handle_node_announcement ( & msg) {
965977 Ok ( v) => v,
966978 Err ( e) => { return Err ( e. into ( ) ) ; } ,
967- } ;
968-
969- if should_forward {
970- // TODO: forward msg along to all our other peers!
979+ } {
980+ should_forward = Some ( wire:: Message :: NodeAnnouncement ( msg) ) ;
971981 }
972982 } ,
973983 wire:: Message :: ChannelUpdate ( msg) => {
974984 self . message_handler . chan_handler . handle_channel_update ( & peer. their_node_id . unwrap ( ) , & msg) ;
975- let should_forward = match self . message_handler . route_handler . handle_channel_update ( & msg) {
985+ if match self . message_handler . route_handler . handle_channel_update ( & msg) {
976986 Ok ( v) => v,
977987 Err ( e) => { return Err ( e. into ( ) ) ; } ,
978- } ;
979-
980- if should_forward {
981- // TODO: forward msg along to all our other peers!
988+ } {
989+ should_forward = Some ( wire:: Message :: ChannelUpdate ( msg) ) ;
982990 }
983991 } ,
984992 wire:: Message :: QueryShortChannelIds ( msg) => {
@@ -1007,7 +1015,7 @@ impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, L: Deref> PeerManager<D
10071015 log_trace ! ( self . logger, "Received unknown odd message of type {}, ignoring" , msg_type) ;
10081016 }
10091017 } ;
1010- Ok ( ( ) )
1018+ Ok ( should_forward )
10111019 }
10121020
10131021 fn forward_broadcast_msg ( & self , peers : & mut PeerHolder < Descriptor > , msg : & wire:: Message , except_node : Option < & PublicKey > ) {
0 commit comments