-
Notifications
You must be signed in to change notification settings - Fork 2.7k
client/authority-discovery: Throttle DHT requests #7018
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -35,6 +35,7 @@ use libp2p::{core::multiaddr, multihash::Multihash}; | |
| use log::{debug, error, log_enabled}; | ||
| use prometheus_endpoint::{Counter, CounterVec, Gauge, Opts, U64, register}; | ||
| use prost::Message; | ||
| use rand::{seq::SliceRandom, thread_rng}; | ||
| use sc_client_api::blockchain::HeaderBackend; | ||
| use sc_network::{ | ||
| config::MultiaddrWithPeerId, | ||
|
|
@@ -70,6 +71,9 @@ const AUTHORITIES_PRIORITY_GROUP_NAME: &'static str = "authorities"; | |
| /// Maximum number of addresses cached per authority. Additional addresses are discarded. | ||
| const MAX_ADDRESSES_PER_AUTHORITY: usize = 10; | ||
|
|
||
| /// Maximum number of in-flight DHT lookups at any given point in time. | ||
| const MAX_IN_FLIGHT_LOOKUPS: usize = 8; | ||
|
|
||
| /// Role an authority discovery module can run as. | ||
| pub enum Role { | ||
| /// Actual authority as well as a reference to its key store. | ||
|
|
@@ -137,12 +141,17 @@ where | |
|
|
||
| /// Interval to be proactive, publishing own addresses. | ||
| publish_interval: Interval, | ||
| /// Interval on which to query for addresses of other authorities. | ||
| /// Interval at which to request addresses of authorities, refilling the pending lookups queue. | ||
| query_interval: Interval, | ||
| /// Interval on which to set the peerset priority group to a new random | ||
| /// set of addresses. | ||
| priority_group_set_interval: Interval, | ||
|
|
||
| /// Queue of throttled lookups pending to be passed to the network. | ||
| pending_lookups: Vec<AuthorityId>, | ||
| /// Set of in-flight lookups. | ||
| in_flight_lookups: HashMap<libp2p::kad::record::Key, AuthorityId>, | ||
|
|
||
| addr_cache: addr_cache::AddrCache, | ||
|
|
||
| metrics: Option<Metrics>, | ||
|
|
@@ -183,8 +192,8 @@ where | |
| Duration::from_secs(12 * 60 * 60), | ||
| ); | ||
|
|
||
| // External addresses of other authorities can change at any given point in time. The | ||
| // interval on which to query for external addresses of other authorities is a trade off | ||
| // External addresses of remote authorities can change at any given point in time. The | ||
| // interval on which to trigger new queries for the current authorities is a trade off | ||
| // between efficiency and performance. | ||
| let query_interval_start = Instant::now() + LIBP2P_KADEMLIA_BOOTSTRAP_TIME; | ||
| let query_interval_duration = Duration::from_secs(10 * 60); | ||
|
|
@@ -193,9 +202,9 @@ where | |
| // Querying 500 [`AuthorityId`]s takes ~1m on the Kusama DHT (10th of August 2020) when | ||
| // comparing `authority_discovery_authority_addresses_requested_total` and | ||
| // `authority_discovery_dht_event_received`. With that in mind set the peerset priority | ||
| // group on the same interval as the [`query_interval`] above, just delayed by 2 minutes. | ||
| // group on the same interval as the [`query_interval`] above, just delayed by 5 minutes. | ||
| let priority_group_set_interval = interval_at( | ||
| query_interval_start + Duration::from_secs(2 * 60), | ||
| query_interval_start + Duration::from_secs(5 * 60), | ||
| query_interval_duration, | ||
| ); | ||
|
|
||
|
|
@@ -229,6 +238,8 @@ where | |
| publish_interval, | ||
| query_interval, | ||
| priority_group_set_interval, | ||
| pending_lookups: Vec::new(), | ||
| in_flight_lookups: HashMap::new(), | ||
| addr_cache, | ||
| role, | ||
| metrics, | ||
|
|
@@ -270,7 +281,9 @@ where | |
|
|
||
| if let Some(metrics) = &self.metrics { | ||
| metrics.publish.inc(); | ||
| metrics.amount_last_published.set(addresses.len() as u64); | ||
| metrics.amount_addresses_last_published.set( | ||
| addresses.len().try_into().unwrap_or(std::u64::MAX), | ||
| ); | ||
| } | ||
|
|
||
| let mut serialized_addresses = vec![]; | ||
|
|
@@ -314,15 +327,9 @@ where | |
| Ok(()) | ||
| } | ||
|
|
||
| fn request_addresses_of_others(&mut self) -> Result<()> { | ||
| fn refill_pending_lookups_queue(&mut self) -> Result<()> { | ||
| let id = BlockId::hash(self.client.info().best_hash); | ||
|
|
||
| let authorities = self | ||
| .client | ||
| .runtime_api() | ||
| .authorities(&id) | ||
| .map_err(Error::CallingRuntime)?; | ||
|
|
||
| let local_keys = match &self.role { | ||
| Role::Authority(key_store) => { | ||
| key_store.read() | ||
|
|
@@ -333,21 +340,52 @@ where | |
| Role::Sentry => HashSet::new(), | ||
| }; | ||
|
|
||
| for authority_id in authorities.iter() { | ||
| // Make sure we don't look up our own keys. | ||
| if !local_keys.contains(authority_id.as_ref()) { | ||
| if let Some(metrics) = &self.metrics { | ||
| metrics.request.inc(); | ||
| } | ||
| let mut authorities = self | ||
| .client | ||
| .runtime_api() | ||
| .authorities(&id) | ||
| .map_err(Error::CallingRuntime)? | ||
| .into_iter() | ||
| .filter(|id| !local_keys.contains(id.as_ref())) | ||
| .collect(); | ||
|
|
||
| self.network | ||
| .get_value(&hash_authority_id(authority_id.as_ref())); | ||
| } | ||
| self.addr_cache.retain_ids(&authorities); | ||
|
|
||
| authorities.shuffle(&mut thread_rng()); | ||
| self.pending_lookups = authorities; | ||
| // Ignore all still in-flight lookups. Those that are still in-flight are likely stalled as | ||
| // query interval ticks are far enough apart for all lookups to succeed. | ||
| self.in_flight_lookups.clear(); | ||
|
|
||
| if let Some(metrics) = &self.metrics { | ||
| metrics.requests_pending.set( | ||
| self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX), | ||
| ); | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
|
|
||
| fn start_new_lookups(&mut self) { | ||
| while self.in_flight_lookups.len() < MAX_IN_FLIGHT_LOOKUPS { | ||
| let authority_id = match self.pending_lookups.pop() { | ||
| Some(authority) => authority, | ||
| None => return, | ||
| }; | ||
| let hash = hash_authority_id(authority_id.as_ref()); | ||
| self.network | ||
| .get_value(&hash); | ||
| self.in_flight_lookups.insert(hash, authority_id); | ||
|
|
||
| if let Some(metrics) = &self.metrics { | ||
| metrics.requests.inc(); | ||
| metrics.requests_pending.set( | ||
| self.pending_lookups.len().try_into().unwrap_or(std::u64::MAX), | ||
| ); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Handle incoming Dht events. | ||
| /// | ||
| /// Returns either: | ||
|
|
@@ -385,10 +423,17 @@ where | |
| metrics.dht_event_received.with_label_values(&["value_not_found"]).inc(); | ||
| } | ||
|
|
||
| debug!( | ||
| target: LOG_TARGET, | ||
| "Value for hash '{:?}' not found on Dht.", hash | ||
| ) | ||
| if self.in_flight_lookups.remove(&hash).is_some() { | ||
| debug!( | ||
| target: LOG_TARGET, | ||
| "Value for hash '{:?}' not found on Dht.", hash | ||
| ) | ||
| } else { | ||
| debug!( | ||
| target: LOG_TARGET, | ||
| "Received 'ValueNotFound' for unexpected hash '{:?}'.", hash | ||
| ) | ||
| } | ||
| }, | ||
| Some(DhtEvent::ValuePut(hash)) => { | ||
| if let Some(metrics) = &self.metrics { | ||
|
|
@@ -434,23 +479,9 @@ where | |
| } | ||
| })?.ok_or(Error::ReceivingDhtValueFoundEventWithNoRecords)?; | ||
|
|
||
| let authorities = { | ||
| let block_id = BlockId::hash(self.client.info().best_hash); | ||
| // From the Dht we only get the hashed authority id. In order to retrieve the actual | ||
| // authority id and to ensure it is actually an authority, we match the hash against the | ||
| // hash of the authority id of all other authorities. | ||
| let authorities = self.client.runtime_api().authorities(&block_id)?; | ||
| self.addr_cache.retain_ids(&authorities); | ||
| authorities | ||
| .into_iter() | ||
| .map(|id| (hash_authority_id(id.as_ref()), id)) | ||
| .collect::<HashMap<_, _>>() | ||
| }; | ||
|
|
||
| // Check if the event origins from an authority in the current or next authority set. | ||
| let authority_id: &AuthorityId = authorities | ||
| .get(&remote_key) | ||
| .ok_or(Error::MatchingHashedAuthorityIdWithAuthorityId)?; | ||
| let authority_id: AuthorityId = self.in_flight_lookups | ||
| .remove(&remote_key) | ||
| .ok_or(Error::ReceivingUnexpectedRecord)?; | ||
|
|
||
| let local_peer_id = self.network.local_peer_id(); | ||
|
|
||
|
|
@@ -463,7 +494,7 @@ where | |
| let signature = AuthoritySignature::decode(&mut &signature[..]) | ||
| .map_err(Error::EncodingDecodingScale)?; | ||
|
|
||
| if !AuthorityPair::verify(&signature, &addresses, authority_id) { | ||
| if !AuthorityPair::verify(&signature, &addresses, &authority_id) { | ||
| return Err(Error::VerifyingDhtPayload); | ||
| } | ||
|
|
||
|
|
@@ -503,7 +534,7 @@ where | |
| .collect(); | ||
|
|
||
| if !remote_addresses.is_empty() { | ||
| self.addr_cache.insert(authority_id.clone(), remote_addresses); | ||
| self.addr_cache.insert(authority_id, remote_addresses); | ||
| if let Some(metrics) = &self.metrics { | ||
| metrics.known_authorities_count.set( | ||
| self.addr_cache.num_ids().try_into().unwrap_or(std::u64::MAX) | ||
|
|
@@ -610,15 +641,15 @@ where | |
| } | ||
| } | ||
|
|
||
| // Request addresses of authorities. | ||
| // Request addresses of authorities, refilling the pending lookups queue. | ||
| if let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) { | ||
| // Register waker of underlying task for next interval. | ||
| while let Poll::Ready(_) = self.query_interval.poll_next_unpin(cx) {} | ||
romanb marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if let Err(e) = self.request_addresses_of_others() { | ||
| if let Err(e) = self.refill_pending_lookups_queue() { | ||
| error!( | ||
| target: LOG_TARGET, | ||
| "Failed to request addresses of authorities: {:?}", e, | ||
| "Failed to refill pending lookups queue: {:?}", e, | ||
| ); | ||
| } | ||
| } | ||
|
|
@@ -652,6 +683,8 @@ where | |
| } | ||
| } | ||
|
|
||
| self.start_new_lookups(); | ||
|
|
||
| Poll::Pending | ||
| } | ||
| } | ||
|
|
@@ -712,8 +745,9 @@ fn interval_at(start: Instant, duration: Duration) -> Interval { | |
| #[derive(Clone)] | ||
| pub(crate) struct Metrics { | ||
| publish: Counter<U64>, | ||
| amount_last_published: Gauge<U64>, | ||
| request: Counter<U64>, | ||
| amount_addresses_last_published: Gauge<U64>, | ||
| requests: Counter<U64>, | ||
| requests_pending: Gauge<U64>, | ||
| dht_event_received: CounterVec<U64>, | ||
| handle_value_found_event_failure: Counter<U64>, | ||
| known_authorities_count: Gauge<U64>, | ||
|
|
@@ -730,22 +764,29 @@ impl Metrics { | |
| )?, | ||
| registry, | ||
| )?, | ||
| amount_last_published: register( | ||
| amount_addresses_last_published: register( | ||
| Gauge::new( | ||
| "authority_discovery_amount_external_addresses_last_published", | ||
| "Number of external addresses published when authority discovery last \ | ||
| published addresses." | ||
| )?, | ||
| registry, | ||
| )?, | ||
| request: register( | ||
| requests: register( | ||
| Counter::new( | ||
| "authority_discovery_authority_addresses_requested_total", | ||
| "Number of times authority discovery has requested external addresses of a \ | ||
| single authority." | ||
| )?, | ||
| registry, | ||
| )?, | ||
| requests_pending: register( | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Isn't it better to split this between two counters There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure I understand your question correctly. Let me try. With this pull request authority discovery publishes (among others) the following 3 metrics:
Your suggested We could split Does that make sense @tomaka? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If the value of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, now I get the confusion. There is a difference between pending, in-flight and finished lookups. Every 10 minutes the authority discovery module requests the current and next authority set from the runtime and overrides
Does that make sense @tomaka? |
||
| Gauge::new( | ||
| "authority_discovery_authority_address_requests_pending", | ||
| "Number of pending authority address requests." | ||
| )?, | ||
| registry, | ||
| )?, | ||
| dht_event_received: register( | ||
| CounterVec::new( | ||
| Opts::new( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.