@@ -35,6 +35,7 @@ use libp2p::{core::multiaddr, multihash::Multihash};
3535use log:: { debug, error, log_enabled} ;
3636use prometheus_endpoint:: { Counter , CounterVec , Gauge , Opts , U64 , register} ;
3737use prost:: Message ;
38+ use rand:: { seq:: SliceRandom , thread_rng} ;
3839use sc_client_api:: blockchain:: HeaderBackend ;
3940use sc_network:: {
4041 config:: MultiaddrWithPeerId ,
@@ -70,6 +71,9 @@ const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities";
7071/// Maximum number of addresses cached per authority. Additional addresses are discarded.
7172const MAX_ADDRESSES_PER_AUTHORITY : usize = 10 ;
7273
74+ /// Maximum number of in-flight DHT lookups at any given point in time.
75+ const MAX_IN_FLIGHT_LOOKUPS : usize = 8 ;
76+
7377/// Role an authority discovery module can run as.
7478pub enum Role {
7579 /// Actual authority as well as a reference to its key store.
@@ -137,12 +141,17 @@ where
137141
138142 /// Interval to be proactive, publishing own addresses.
139143 publish_interval : Interval ,
140- /// Interval on which to query for addresses of other authorities.
144+ /// Interval at which to request addresses of authorities, refilling the pending lookups queue.
141145 query_interval : Interval ,
142146 /// Interval on which to set the peerset priority group to a new random
143147 /// set of addresses.
144148 priority_group_set_interval : Interval ,
145149
150+ /// Queue of throttled lookups pending to be passed to the network.
151+ pending_lookups : Vec < AuthorityId > ,
152+ /// Set of in-flight lookups.
153+ in_flight_lookups : HashMap < libp2p:: kad:: record:: Key , AuthorityId > ,
154+
146155 addr_cache : addr_cache:: AddrCache ,
147156
148157 metrics : Option < Metrics > ,
@@ -183,8 +192,8 @@ where
183192 Duration :: from_secs ( 12 * 60 * 60 ) ,
184193 ) ;
185194
186- // External addresses of other authorities can change at any given point in time. The
187- // interval on which to query for external addresses of other authorities is a trade off
195+ // External addresses of remote authorities can change at any given point in time. The
196+ // interval on which to trigger new queries for the current authorities is a trade off
188197 // between efficiency and performance.
189198 let query_interval_start = Instant :: now ( ) + LIBP2P_KADEMLIA_BOOTSTRAP_TIME ;
190199 let query_interval_duration = Duration :: from_secs ( 10 * 60 ) ;
@@ -193,9 +202,9 @@ where
193202 // Querying 500 [`AuthorityId`]s takes ~1m on the Kusama DHT (10th of August 2020) when
194203 // comparing `authority_discovery_authority_addresses_requested_total` and
195204 // `authority_discovery_dht_event_received`. With that in mind set the peerset priority
196- // group on the same interval as the [`query_interval`] above, just delayed by 2 minutes.
205+ // group on the same interval as the [`query_interval`] above, just delayed by 5 minutes.
197206 let priority_group_set_interval = interval_at (
198- query_interval_start + Duration :: from_secs ( 2 * 60 ) ,
207+ query_interval_start + Duration :: from_secs ( 5 * 60 ) ,
199208 query_interval_duration,
200209 ) ;
201210
@@ -229,6 +238,8 @@ where
229238 publish_interval,
230239 query_interval,
231240 priority_group_set_interval,
241+ pending_lookups : Vec :: new ( ) ,
242+ in_flight_lookups : HashMap :: new ( ) ,
232243 addr_cache,
233244 role,
234245 metrics,
@@ -270,7 +281,9 @@ where
270281
271282 if let Some ( metrics) = & self . metrics {
272283 metrics. publish . inc ( ) ;
273- metrics. amount_last_published . set ( addresses. len ( ) as u64 ) ;
284+ metrics. amount_addresses_last_published . set (
285+ addresses. len ( ) . try_into ( ) . unwrap_or ( std:: u64:: MAX ) ,
286+ ) ;
274287 }
275288
276289 let mut serialized_addresses = vec ! [ ] ;
@@ -314,15 +327,9 @@ where
314327 Ok ( ( ) )
315328 }
316329
317- fn request_addresses_of_others ( & mut self ) -> Result < ( ) > {
330+ fn refill_pending_lookups_queue ( & mut self ) -> Result < ( ) > {
318331 let id = BlockId :: hash ( self . client . info ( ) . best_hash ) ;
319332
320- let authorities = self
321- . client
322- . runtime_api ( )
323- . authorities ( & id)
324- . map_err ( Error :: CallingRuntime ) ?;
325-
326333 let local_keys = match & self . role {
327334 Role :: Authority ( key_store) => {
328335 key_store. read ( )
@@ -333,21 +340,52 @@ where
333340 Role :: Sentry => HashSet :: new ( ) ,
334341 } ;
335342
336- for authority_id in authorities. iter ( ) {
337- // Make sure we don't look up our own keys.
338- if !local_keys. contains ( authority_id. as_ref ( ) ) {
339- if let Some ( metrics) = & self . metrics {
340- metrics. request . inc ( ) ;
341- }
343+ let mut authorities = self
344+ . client
345+ . runtime_api ( )
346+ . authorities ( & id)
347+ . map_err ( Error :: CallingRuntime ) ?
348+ . into_iter ( )
349+ . filter ( |id| !local_keys. contains ( id. as_ref ( ) ) )
350+ . collect ( ) ;
342351
343- self . network
344- . get_value ( & hash_authority_id ( authority_id. as_ref ( ) ) ) ;
345- }
352+ self . addr_cache . retain_ids ( & authorities) ;
353+
354+ authorities. shuffle ( & mut thread_rng ( ) ) ;
355+ self . pending_lookups = authorities;
356+ // Ignore all still in-flight lookups. Those that are still in-flight are likely stalled as
357+ // query interval ticks are far enough apart for all lookups to succeed.
358+ self . in_flight_lookups . clear ( ) ;
359+
360+ if let Some ( metrics) = & self . metrics {
361+ metrics. requests_pending . set (
362+ self . pending_lookups . len ( ) . try_into ( ) . unwrap_or ( std:: u64:: MAX ) ,
363+ ) ;
346364 }
347365
348366 Ok ( ( ) )
349367 }
350368
369+ fn start_new_lookups ( & mut self ) {
370+ while self . in_flight_lookups . len ( ) < MAX_IN_FLIGHT_LOOKUPS {
371+ let authority_id = match self . pending_lookups . pop ( ) {
372+ Some ( authority) => authority,
373+ None => return ,
374+ } ;
375+ let hash = hash_authority_id ( authority_id. as_ref ( ) ) ;
376+ self . network
377+ . get_value ( & hash) ;
378+ self . in_flight_lookups . insert ( hash, authority_id) ;
379+
380+ if let Some ( metrics) = & self . metrics {
381+ metrics. requests . inc ( ) ;
382+ metrics. requests_pending . set (
383+ self . pending_lookups . len ( ) . try_into ( ) . unwrap_or ( std:: u64:: MAX ) ,
384+ ) ;
385+ }
386+ }
387+ }
388+
351389 /// Handle incoming Dht events.
352390 ///
353391 /// Returns either:
@@ -385,10 +423,17 @@ where
385423 metrics. dht_event_received . with_label_values ( & [ "value_not_found" ] ) . inc ( ) ;
386424 }
387425
388- debug ! (
389- target: LOG_TARGET ,
390- "Value for hash '{:?}' not found on Dht." , hash
391- )
426+ if self . in_flight_lookups . remove ( & hash) . is_some ( ) {
427+ debug ! (
428+ target: LOG_TARGET ,
429+ "Value for hash '{:?}' not found on Dht." , hash
430+ )
431+ } else {
432+ debug ! (
433+ target: LOG_TARGET ,
434+ "Received 'ValueNotFound' for unexpected hash '{:?}'." , hash
435+ )
436+ }
392437 } ,
393438 Some ( DhtEvent :: ValuePut ( hash) ) => {
394439 if let Some ( metrics) = & self . metrics {
@@ -434,23 +479,9 @@ where
434479 }
435480 } ) ?. ok_or ( Error :: ReceivingDhtValueFoundEventWithNoRecords ) ?;
436481
437- let authorities = {
438- let block_id = BlockId :: hash ( self . client . info ( ) . best_hash ) ;
439- // From the Dht we only get the hashed authority id. In order to retrieve the actual
440- // authority id and to ensure it is actually an authority, we match the hash against the
441- // hash of the authority id of all other authorities.
442- let authorities = self . client . runtime_api ( ) . authorities ( & block_id) ?;
443- self . addr_cache . retain_ids ( & authorities) ;
444- authorities
445- . into_iter ( )
446- . map ( |id| ( hash_authority_id ( id. as_ref ( ) ) , id) )
447- . collect :: < HashMap < _ , _ > > ( )
448- } ;
449-
450- // Check if the event origins from an authority in the current or next authority set.
451- let authority_id: & AuthorityId = authorities
452- . get ( & remote_key)
453- . ok_or ( Error :: MatchingHashedAuthorityIdWithAuthorityId ) ?;
482+ let authority_id: AuthorityId = self . in_flight_lookups
483+ . remove ( & remote_key)
484+ . ok_or ( Error :: ReceivingUnexpectedRecord ) ?;
454485
455486 let local_peer_id = self . network . local_peer_id ( ) ;
456487
@@ -463,7 +494,7 @@ where
463494 let signature = AuthoritySignature :: decode ( & mut & signature[ ..] )
464495 . map_err ( Error :: EncodingDecodingScale ) ?;
465496
466- if !AuthorityPair :: verify ( & signature, & addresses, authority_id) {
497+ if !AuthorityPair :: verify ( & signature, & addresses, & authority_id) {
467498 return Err ( Error :: VerifyingDhtPayload ) ;
468499 }
469500
@@ -503,7 +534,7 @@ where
503534 . collect ( ) ;
504535
505536 if !remote_addresses. is_empty ( ) {
506- self . addr_cache . insert ( authority_id. clone ( ) , remote_addresses) ;
537+ self . addr_cache . insert ( authority_id, remote_addresses) ;
507538 if let Some ( metrics) = & self . metrics {
508539 metrics. known_authorities_count . set (
509540 self . addr_cache . num_ids ( ) . try_into ( ) . unwrap_or ( std:: u64:: MAX )
@@ -610,15 +641,15 @@ where
610641 }
611642 }
612643
613- // Request addresses of authorities.
644+ // Request addresses of authorities, refilling the pending lookups queue.
614645 if let Poll :: Ready ( _) = self . query_interval . poll_next_unpin ( cx) {
615646 // Register waker of underlying task for next interval.
616647 while let Poll :: Ready ( _) = self . query_interval . poll_next_unpin ( cx) { }
617648
618- if let Err ( e) = self . request_addresses_of_others ( ) {
649+ if let Err ( e) = self . refill_pending_lookups_queue ( ) {
619650 error ! (
620651 target: LOG_TARGET ,
621- "Failed to request addresses of authorities : {:?}" , e,
652+ "Failed to refill pending lookups queue : {:?}" , e,
622653 ) ;
623654 }
624655 }
@@ -652,6 +683,8 @@ where
652683 }
653684 }
654685
686+ self . start_new_lookups ( ) ;
687+
655688 Poll :: Pending
656689 }
657690}
@@ -712,8 +745,9 @@ fn interval_at(start: Instant, duration: Duration) -> Interval {
712745#[ derive( Clone ) ]
713746pub ( crate ) struct Metrics {
714747 publish : Counter < U64 > ,
715- amount_last_published : Gauge < U64 > ,
716- request : Counter < U64 > ,
748+ amount_addresses_last_published : Gauge < U64 > ,
749+ requests : Counter < U64 > ,
750+ requests_pending : Gauge < U64 > ,
717751 dht_event_received : CounterVec < U64 > ,
718752 handle_value_found_event_failure : Counter < U64 > ,
719753 known_authorities_count : Gauge < U64 > ,
@@ -730,22 +764,29 @@ impl Metrics {
730764 ) ?,
731765 registry,
732766 ) ?,
733- amount_last_published : register (
767+ amount_addresses_last_published : register (
734768 Gauge :: new (
735769 "authority_discovery_amount_external_addresses_last_published" ,
736770 "Number of external addresses published when authority discovery last \
737771 published addresses."
738772 ) ?,
739773 registry,
740774 ) ?,
741- request : register (
775+ requests : register (
742776 Counter :: new (
743777 "authority_discovery_authority_addresses_requested_total" ,
744778 "Number of times authority discovery has requested external addresses of a \
745779 single authority."
746780 ) ?,
747781 registry,
748782 ) ?,
783+ requests_pending : register (
784+ Gauge :: new (
785+ "authority_discovery_authority_address_requests_pending" ,
786+ "Number of pending authority address requests."
787+ ) ?,
788+ registry,
789+ ) ?,
749790 dht_event_received : register (
750791 CounterVec :: new (
751792 Opts :: new (
0 commit comments