Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions client/informant/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use futures::prelude::*;
use log::{info, trace, warn};
use parity_util_mem::MallocSizeOf;
use sc_client_api::{BlockchainEvents, UsageProvider};
use sc_network::{network_state::NetworkState, NetworkStatus};
use sc_network::NetworkStatus;
use sp_blockchain::HeaderMetadata;
use sp_runtime::traits::{Block as BlockT, Header};
use sp_transaction_pool::TransactionPool;
Expand Down Expand Up @@ -81,7 +81,7 @@ impl<T: TransactionPool + MallocSizeOf> TransactionPoolAndMaybeMallogSizeOf for
/// Builds the informant and returns a `Future` that drives the informant.
pub fn build<B: BlockT, C>(
client: Arc<C>,
network_status_sinks: Arc<status_sinks::StatusSinks<(NetworkStatus<B>, NetworkState)>>,
network_status_sinks: Arc<status_sinks::StatusSinks<NetworkStatus<B>>>,
pool: Arc<impl TransactionPoolAndMaybeMallogSizeOf>,
format: OutputFormat,
) -> impl futures::Future<Output = ()>
Expand All @@ -96,7 +96,7 @@ where
network_status_sinks.push(Duration::from_millis(5000), network_status_sink);

let display_notifications = network_status_stream
.for_each(move |(net_status, _)| {
.for_each(move |net_status| {
let info = client_1.usage_info();
if let Some(ref usage) = info.usage {
trace!(target: "usage", "Usage statistics: {}", usage);
Expand Down
4 changes: 0 additions & 4 deletions client/network/src/network_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ pub struct NetworkState {
pub connected_peers: HashMap<String, Peer>,
/// List of node that we know of but that we're not connected to.
pub not_connected_peers: HashMap<String, NotConnectedPeer>,
/// The total number of bytes received.
pub total_bytes_inbound: u64,
/// The total number of bytes sent.
pub total_bytes_outbound: u64,
/// State of the peerset manager.
pub peerset: serde_json::Value,
}
Expand Down
294 changes: 21 additions & 273 deletions client/network/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
//! which is then processed by [`NetworkWorker::poll`].

use crate::{
ExHashT, NetworkStateInfo,
ExHashT, NetworkStateInfo, NetworkStatus,
behaviour::{self, Behaviour, BehaviourOut},
config::{parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig},
DhtEvent,
Expand All @@ -49,12 +49,8 @@ use libp2p::kad::record;
use libp2p::ping::handler::PingFailure;
use libp2p::swarm::{NetworkBehaviour, SwarmBuilder, SwarmEvent, protocols_handler::NodeHandlerWrapperError};
use log::{error, info, trace, warn};
use metrics::{Metrics, MetricSources, Histogram, HistogramVec};
use parking_lot::Mutex;
use prometheus_endpoint::{
register, Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramOpts, HistogramVec, Opts,
PrometheusError, Registry, U64,
SourcedCounter, MetricSource
};
use sc_peerset::PeersetHandle;
use sp_consensus::import_queue::{BlockImportError, BlockImportResult, ImportQueue, Link};
use sp_runtime::{
Expand All @@ -80,6 +76,7 @@ use wasm_timer::Instant;

pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure};

mod metrics;
mod out_events;
#[cfg(test)]
mod tests;
Expand Down Expand Up @@ -365,10 +362,11 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
// Initialize the metrics.
let metrics = match &params.metrics_registry {
Some(registry) => {
// Sourced metrics.
BandwidthCounters::register(registry, bandwidth.clone())?;
// Other (i.e. new) metrics.
Some(Metrics::register(registry)?)
Some(metrics::register(registry, MetricSources {
bandwidth: bandwidth.clone(),
major_syncing: is_major_syncing.clone(),
connected_peers: num_connected.clone(),
})?)
}
None => None
};
Expand Down Expand Up @@ -423,6 +421,19 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
})
}

/// High-level network status information.
pub fn status(&self) -> NetworkStatus<B> {
NetworkStatus {
sync_state: self.sync_state(),
best_seen_block: self.best_seen_block(),
num_sync_peers: self.num_sync_peers(),
num_connected_peers: self.num_connected_peers(),
num_active_peers: self.num_active_peers(),
total_bytes_inbound: self.total_bytes_inbound(),
total_bytes_outbound: self.total_bytes_outbound(),
}
}

/// Returns the total number of bytes received so far.
pub fn total_bytes_inbound(&self) -> u64 {
self.service.bandwidth.total_inbound()
Expand Down Expand Up @@ -562,8 +573,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
peer_id: Swarm::<B, H>::local_peer_id(&swarm).to_base58(),
listened_addresses: Swarm::<B, H>::listeners(&swarm).cloned().collect(),
external_addresses: Swarm::<B, H>::external_addresses(&swarm).cloned().collect(),
total_bytes_inbound: self.service.bandwidth.total_inbound(),
total_bytes_outbound: self.service.bandwidth.total_outbound(),
connected_peers,
not_connected_peers,
peerset: swarm.user_protocol_mut().peerset_debug_info(),
Expand Down Expand Up @@ -1204,265 +1213,6 @@ pub struct NetworkWorker<B: BlockT + 'static, H: ExHashT> {
peers_notifications_sinks: Arc<Mutex<HashMap<(PeerId, ConsensusEngineId), NotificationsSink>>>,
}

struct Metrics {
// This list is ordered alphabetically
connections_closed_total: CounterVec<U64>,
connections_opened_total: CounterVec<U64>,
distinct_peers_connections_closed_total: Counter<U64>,
distinct_peers_connections_opened_total: Counter<U64>,
import_queue_blocks_submitted: Counter<U64>,
import_queue_finality_proofs_submitted: Counter<U64>,
import_queue_justifications_submitted: Counter<U64>,
incoming_connections_errors_total: CounterVec<U64>,
incoming_connections_total: Counter<U64>,
is_major_syncing: Gauge<U64>,
issued_light_requests: Counter<U64>,
kademlia_query_duration: HistogramVec,
kademlia_random_queries_total: CounterVec<U64>,
kademlia_records_count: GaugeVec<U64>,
kademlia_records_sizes_total: GaugeVec<U64>,
kbuckets_num_nodes: GaugeVec<U64>,
listeners_local_addresses: Gauge<U64>,
listeners_errors_total: Counter<U64>,
notifications_sizes: HistogramVec,
notifications_streams_closed_total: CounterVec<U64>,
notifications_streams_opened_total: CounterVec<U64>,
peers_count: Gauge<U64>,
peerset_num_discovered: Gauge<U64>,
peerset_num_requested: Gauge<U64>,
pending_connections: Gauge<U64>,
pending_connections_errors_total: CounterVec<U64>,
requests_in_failure_total: CounterVec<U64>,
requests_in_success_total: HistogramVec,
requests_out_failure_total: CounterVec<U64>,
requests_out_success_total: HistogramVec,
requests_out_started_total: CounterVec<U64>,
}

/// The source for bandwidth metrics.
#[derive(Clone)]
struct BandwidthCounters(Arc<transport::BandwidthSinks>);

impl BandwidthCounters {
fn register(registry: &Registry, sinks: Arc<transport::BandwidthSinks>)
-> Result<(), PrometheusError>
{
register(SourcedCounter::new(
&Opts::new(
"sub_libp2p_network_bytes_total",
"Total bandwidth usage"
).variable_label("direction"),
BandwidthCounters(sinks),
)?, registry)?;

Ok(())
}
}

impl MetricSource for BandwidthCounters {
type N = u64;

fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
set(&[&"in"], self.0.total_inbound());
set(&[&"out"], self.0.total_outbound());
}
}

impl Metrics {
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
Ok(Self {
// This list is ordered alphabetically
connections_closed_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_connections_closed_total",
"Total number of connections closed, by direction and reason"
),
&["direction", "reason"]
)?, registry)?,
connections_opened_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_connections_opened_total",
"Total number of connections opened by direction"
),
&["direction"]
)?, registry)?,
distinct_peers_connections_closed_total: register(Counter::new(
"sub_libp2p_distinct_peers_connections_closed_total",
"Total number of connections closed with distinct peers"
)?, registry)?,
distinct_peers_connections_opened_total: register(Counter::new(
"sub_libp2p_distinct_peers_connections_opened_total",
"Total number of connections opened with distinct peers"
)?, registry)?,
import_queue_blocks_submitted: register(Counter::new(
"import_queue_blocks_submitted",
"Number of blocks submitted to the import queue.",
)?, registry)?,
import_queue_finality_proofs_submitted: register(Counter::new(
"import_queue_finality_proofs_submitted",
"Number of finality proofs submitted to the import queue.",
)?, registry)?,
import_queue_justifications_submitted: register(Counter::new(
"import_queue_justifications_submitted",
"Number of justifications submitted to the import queue.",
)?, registry)?,
incoming_connections_errors_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_incoming_connections_handshake_errors_total",
"Total number of incoming connections that have failed during the \
initial handshake"
),
&["reason"]
)?, registry)?,
incoming_connections_total: register(Counter::new(
"sub_libp2p_incoming_connections_total",
"Total number of incoming connections on the listening sockets"
)?, registry)?,
is_major_syncing: register(Gauge::new(
"sub_libp2p_is_major_syncing", "Whether the node is performing a major sync or not.",
)?, registry)?,
issued_light_requests: register(Counter::new(
"issued_light_requests",
"Number of light client requests that our node has issued.",
)?, registry)?,
kademlia_query_duration: register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_kademlia_query_duration",
"Duration of Kademlia queries per query type"
),
buckets: prometheus_endpoint::exponential_buckets(0.5, 2.0, 10)
.expect("parameters are always valid values; qed"),
},
&["type"]
)?, registry)?,
kademlia_random_queries_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_kademlia_random_queries_total",
"Number of random Kademlia queries started"
),
&["protocol"]
)?, registry)?,
kademlia_records_count: register(GaugeVec::new(
Opts::new(
"sub_libp2p_kademlia_records_count",
"Number of records in the Kademlia records store"
),
&["protocol"]
)?, registry)?,
kademlia_records_sizes_total: register(GaugeVec::new(
Opts::new(
"sub_libp2p_kademlia_records_sizes_total",
"Total size of all the records in the Kademlia records store"
),
&["protocol"]
)?, registry)?,
kbuckets_num_nodes: register(GaugeVec::new(
Opts::new(
"sub_libp2p_kbuckets_num_nodes",
"Number of nodes in the Kademlia k-buckets"
),
&["protocol"]
)?, registry)?,
listeners_local_addresses: register(Gauge::new(
"sub_libp2p_listeners_local_addresses", "Number of local addresses we're listening on"
)?, registry)?,
listeners_errors_total: register(Counter::new(
"sub_libp2p_listeners_errors_total",
"Total number of non-fatal errors reported by a listener"
)?, registry)?,
notifications_sizes: register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_notifications_sizes",
"Sizes of the notifications send to and received from all nodes"
),
buckets: prometheus_endpoint::exponential_buckets(64.0, 4.0, 8)
.expect("parameters are always valid values; qed"),
},
&["direction", "protocol"]
)?, registry)?,
notifications_streams_closed_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_notifications_streams_closed_total",
"Total number of notification substreams that have been closed"
),
&["protocol"]
)?, registry)?,
notifications_streams_opened_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_notifications_streams_opened_total",
"Total number of notification substreams that have been opened"
),
&["protocol"]
)?, registry)?,
peers_count: register(Gauge::new(
"sub_libp2p_peers_count", "Number of network gossip peers",
)?, registry)?,
peerset_num_discovered: register(Gauge::new(
"sub_libp2p_peerset_num_discovered", "Number of nodes stored in the peerset manager",
)?, registry)?,
peerset_num_requested: register(Gauge::new(
"sub_libp2p_peerset_num_requested", "Number of nodes that the peerset manager wants us to be connected to",
)?, registry)?,
pending_connections: register(Gauge::new(
"sub_libp2p_pending_connections",
"Number of connections in the process of being established",
)?, registry)?,
pending_connections_errors_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_pending_connections_errors_total",
"Total number of pending connection errors"
),
&["reason"]
)?, registry)?,
requests_in_failure_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_requests_in_failure_total",
"Total number of incoming requests that the node has failed to answer"
),
&["protocol", "reason"]
)?, registry)?,
requests_in_success_total: register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_requests_in_success_total",
"Total number of requests received and answered"
),
buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16)
.expect("parameters are always valid values; qed"),
},
&["protocol"]
)?, registry)?,
requests_out_failure_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_requests_out_failure_total",
"Total number of requests that have failed"
),
&["protocol", "reason"]
)?, registry)?,
requests_out_success_total: register(HistogramVec::new(
HistogramOpts {
common_opts: Opts::new(
"sub_libp2p_requests_out_success_total",
"For successful requests, time between a request's start and finish"
),
buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16)
.expect("parameters are always valid values; qed"),
},
&["protocol"]
)?, registry)?,
requests_out_started_total: register(CounterVec::new(
Opts::new(
"sub_libp2p_requests_out_started_total",
"Total number of requests emitted"
),
&["protocol"]
)?, registry)?,
})
}
}

impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
type Output = ();

Expand Down Expand Up @@ -1931,7 +1681,6 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);

if let Some(metrics) = this.metrics.as_ref() {
metrics.is_major_syncing.set(is_major_syncing as u64);
for (proto, num_entries) in this.network_service.num_kbuckets_entries() {
metrics.kbuckets_num_nodes.with_label_values(&[&proto.as_ref()]).set(num_entries as u64);
}
Expand All @@ -1941,7 +1690,6 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
for (proto, num_entries) in this.network_service.kademlia_records_total_size() {
metrics.kademlia_records_sizes_total.with_label_values(&[&proto.as_ref()]).set(num_entries as u64);
}
metrics.peers_count.set(num_connected_peers as u64);
metrics.peerset_num_discovered.set(this.network_service.user_protocol().num_discovered_peers() as u64);
metrics.peerset_num_requested.set(this.network_service.user_protocol().requested_peers().count() as u64);
metrics.pending_connections.set(Swarm::network_info(&this.network_service).num_connections_pending as u64);
Expand Down
Loading