@@ -330,9 +330,9 @@ where
330330 }
331331
332332 fn handle_dht_events ( & mut self , cx : & mut Context ) -> Result < ( ) > {
333- while let Poll :: Ready ( Some ( event ) ) = self . dht_event_rx . poll_next_unpin ( cx ) {
334- match event {
335- DhtEvent :: ValueFound ( v) => {
333+ loop {
334+ match self . dht_event_rx . poll_next_unpin ( cx ) {
335+ Poll :: Ready ( Some ( DhtEvent :: ValueFound ( v) ) ) => {
336336 if let Some ( metrics) = & self . metrics {
337337 metrics. dht_event_received . with_label_values ( & [ "value_found" ] ) . inc ( ) ;
338338 }
@@ -347,7 +347,7 @@ where
347347
348348 self . handle_dht_value_found_event ( v) ?;
349349 }
350- DhtEvent :: ValueNotFound ( hash) => {
350+ Poll :: Ready ( Some ( DhtEvent :: ValueNotFound ( hash) ) ) => {
351351 if let Some ( metrics) = & self . metrics {
352352 metrics. dht_event_received . with_label_values ( & [ "value_not_found" ] ) . inc ( ) ;
353353 }
@@ -357,7 +357,7 @@ where
357357 "Value for hash '{:?}' not found on Dht." , hash
358358 )
359359 } ,
360- DhtEvent :: ValuePut ( hash) => {
360+ Poll :: Ready ( Some ( DhtEvent :: ValuePut ( hash) ) ) => {
361361 if let Some ( metrics) = & self . metrics {
362362 metrics. dht_event_received . with_label_values ( & [ "value_put" ] ) . inc ( ) ;
363363 }
@@ -367,7 +367,7 @@ where
367367 "Successfully put hash '{:?}' on Dht." , hash,
368368 )
369369 } ,
370- DhtEvent :: ValuePutFailed ( hash) => {
370+ Poll :: Ready ( Some ( DhtEvent :: ValuePutFailed ( hash) ) ) => {
371371 if let Some ( metrics) = & self . metrics {
372372 metrics. dht_event_received . with_label_values ( & [ "value_put_failed" ] ) . inc ( ) ;
373373 }
@@ -377,10 +377,12 @@ where
377377 "Failed to put hash '{:?}' on Dht." , hash
378378 )
379379 } ,
380+ // The sender side of the dht event stream has been closed, likely due to the
381+ // network terminating.
382+ Poll :: Ready ( None ) => return Err ( Error :: DhtEventStreamTerminated ) ,
383+ Poll :: Pending => return Ok ( ( ) ) ,
380384 }
381385 }
382-
383- Ok ( ( ) )
384386 }
385387
386388 fn handle_dht_value_found_event (
@@ -483,7 +485,6 @@ where
483485 }
484486
485487 /// Update the peer set 'authority' priority group.
486- //
487488 fn update_peer_set_priority_group ( & self ) -> Result < ( ) > {
488489 let addresses = self . addr_cache . get_subset ( ) ;
489490
@@ -539,11 +540,18 @@ where
539540
540541 match inner ( ) {
541542 Ok ( ( ) ) => { }
543+
544+ // Handle fatal errors.
545+ //
546+ // Given that the network likely terminated authority discovery should do the same.
547+ Err ( Error :: DhtEventStreamTerminated ) => return Poll :: Ready ( ( ) ) ,
548+
549+ // Handle non-fatal errors.
542550 Err ( e) => error ! ( target: "sub-authority-discovery" , "Poll failure: {:?}" , e) ,
543551 } ;
544552
545- // Make sure to always return NotReady as this is a long running task with the same lifetime
546- // as the node itself.
553+ // Return Poll::Pending as this is a long running task with the same lifetime as the node
554+ // itself.
547555 Poll :: Pending
548556 }
549557}
0 commit comments