Skip to content

Commit f8c83bd

Browse files
romanbmxinden
andauthored
Add support for sourced metrics. (paritytech#6895)
* Add support for sourced metrics. A sourced metric is a metric that obtains its values from an existing source, rather than the values being independently recorded. It thus allows collecting metrics from existing counters or gauges without having to duplicate them in a dedicated prometheus counter or gauge (and hence another atomic value). The first use-case is to feed the bandwidth counters from libp2p directly into prometheus. * Tabs, not spaces. * Tweak bandwidth counter registration. * Add debug assertion for variable labels and values. * Document monotonicity requirement for sourced counters. * CI * Update client/network/src/service.rs Co-authored-by: Max Inden <[email protected]> Co-authored-by: Max Inden <[email protected]>
1 parent 8e1ed7d commit f8c83bd

File tree

3 files changed

+187
-18
lines changed

3 files changed

+187
-18
lines changed

client/network/src/service.rs

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ use parking_lot::Mutex;
5353
use prometheus_endpoint::{
5454
register, Counter, CounterVec, Gauge, GaugeVec, Histogram, HistogramOpts, HistogramVec, Opts,
5555
PrometheusError, Registry, U64,
56+
SourcedCounter, MetricSource
5657
};
5758
use sc_peerset::PeersetHandle;
5859
use sp_consensus::import_queue::{BlockImportError, BlockImportResult, ImportQueue, Link};
@@ -240,12 +241,6 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
240241
local_peer_id_legacy
241242
);
242243

243-
// Initialize the metrics.
244-
let metrics = match &params.metrics_registry {
245-
Some(registry) => Some(Metrics::register(&registry)?),
246-
None => None
247-
};
248-
249244
let checker = params.on_demand.as_ref()
250245
.map(|od| od.checker().clone())
251246
.unwrap_or_else(|| Arc::new(AlwaysBadChecker));
@@ -353,6 +348,17 @@ impl<B: BlockT + 'static, H: ExHashT> NetworkWorker<B, H> {
353348
(builder.build(), bandwidth)
354349
};
355350

351+
// Initialize the metrics.
352+
let metrics = match &params.metrics_registry {
353+
Some(registry) => {
354+
// Sourced metrics.
355+
BandwidthCounters::register(registry, bandwidth.clone())?;
356+
// Other (i.e. new) metrics.
357+
Some(Metrics::register(registry)?)
358+
}
359+
None => None
360+
};
361+
356362
// Listen on multiaddresses.
357363
for addr in &params.network_config.listen_addresses {
358364
if let Err(err) = Swarm::<B, H>::listen_on(&mut swarm, addr.clone()) {
@@ -1152,9 +1158,6 @@ struct Metrics {
11521158
kbuckets_num_nodes: GaugeVec<U64>,
11531159
listeners_local_addresses: Gauge<U64>,
11541160
listeners_errors_total: Counter<U64>,
1155-
// Note: `network_bytes_total` is a monotonic gauge obtained by
1156-
// sampling an existing counter.
1157-
network_bytes_total: GaugeVec<U64>,
11581161
notifications_sizes: HistogramVec,
11591162
notifications_streams_closed_total: CounterVec<U64>,
11601163
notifications_streams_opened_total: CounterVec<U64>,
@@ -1168,6 +1171,35 @@ struct Metrics {
11681171
requests_out_started_total: CounterVec<U64>,
11691172
}
11701173

1174+
/// The source for bandwidth metrics.
1175+
#[derive(Clone)]
1176+
struct BandwidthCounters(Arc<transport::BandwidthSinks>);
1177+
1178+
impl BandwidthCounters {
1179+
fn register(registry: &Registry, sinks: Arc<transport::BandwidthSinks>)
1180+
-> Result<(), PrometheusError>
1181+
{
1182+
register(SourcedCounter::new(
1183+
&Opts::new(
1184+
"sub_libp2p_network_bytes_total",
1185+
"Total bandwidth usage"
1186+
).variable_label("direction"),
1187+
BandwidthCounters(sinks),
1188+
)?, registry)?;
1189+
1190+
Ok(())
1191+
}
1192+
}
1193+
1194+
impl MetricSource for BandwidthCounters {
1195+
type N = u64;
1196+
1197+
fn collect(&self, mut set: impl FnMut(&[&str], Self::N)) {
1198+
set(&[&"in"], self.0.total_inbound());
1199+
set(&[&"out"], self.0.total_outbound());
1200+
}
1201+
}
1202+
11711203
impl Metrics {
11721204
fn register(registry: &Registry) -> Result<Self, PrometheusError> {
11731205
Ok(Self {
@@ -1271,13 +1303,6 @@ impl Metrics {
12711303
"sub_libp2p_listeners_errors_total",
12721304
"Total number of non-fatal errors reported by a listener"
12731305
)?, registry)?,
1274-
network_bytes_total: register(GaugeVec::new(
1275-
Opts::new(
1276-
"sub_libp2p_network_bytes_total",
1277-
"Total bandwidth usage"
1278-
),
1279-
&["direction"]
1280-
)?, registry)?,
12811306
notifications_sizes: register(HistogramVec::new(
12821307
HistogramOpts {
12831308
common_opts: Opts::new(
@@ -1725,8 +1750,6 @@ impl<B: BlockT + 'static, H: ExHashT> Future for NetworkWorker<B, H> {
17251750
this.is_major_syncing.store(is_major_syncing, Ordering::Relaxed);
17261751

17271752
if let Some(metrics) = this.metrics.as_ref() {
1728-
metrics.network_bytes_total.with_label_values(&["in"]).set(this.service.bandwidth.total_inbound());
1729-
metrics.network_bytes_total.with_label_values(&["out"]).set(this.service.bandwidth.total_outbound());
17301753
metrics.is_major_syncing.set(is_major_syncing as u64);
17311754
for (proto, num_entries) in this.network_service.num_kbuckets_entries() {
17321755
let proto = maybe_utf8_bytes_to_string(proto.as_bytes());

utils/prometheus/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@ use std::net::SocketAddr;
3131

3232
#[cfg(not(target_os = "unknown"))]
3333
mod networking;
34+
mod sourced;
35+
36+
pub use sourced::{SourcedCounter, SourcedGauge, MetricSource};
3437

3538
#[cfg(target_os = "unknown")]
3639
pub use unknown_os::init_prometheus;

utils/prometheus/src/sourced.rs

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright 2020 Parity Technologies (UK) Ltd.
2+
// This file is part of Substrate.
3+
4+
// Substrate is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU General Public License as published by
6+
// the Free Software Foundation, either version 3 of the License, or
7+
// (at your option) any later version.
8+
9+
// Substrate is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU General Public License for more details.
13+
14+
// You should have received a copy of the GNU General Public License
15+
// along with Substrate. If not, see <http://www.gnu.org/licenses/>.
16+
17+
//! Metrics that are collected from existing sources.
18+
19+
use prometheus::core::{Collector, Desc, Describer, Number, Opts};
20+
use prometheus::proto;
21+
use std::{cmp::Ordering, marker::PhantomData};
22+
23+
/// A counter whose values are obtained from an existing source.
24+
///
25+
/// > **Note*: The counter values provided by the source `S`
26+
/// > must be monotonically increasing. Otherwise use a
27+
/// > [`SourcedGauge`] instead.
28+
pub type SourcedCounter<S> = SourcedMetric<Counter, S>;
29+
30+
/// A gauge whose values are obtained from an existing source.
31+
pub type SourcedGauge<S> = SourcedMetric<Gauge, S>;
32+
33+
/// The type of a sourced counter.
34+
#[derive(Copy, Clone)]
35+
pub enum Counter {}
36+
37+
/// The type of a sourced gauge.
38+
#[derive(Copy, Clone)]
39+
pub enum Gauge {}
40+
41+
/// A metric whose values are obtained from an existing source,
42+
/// instead of being independently recorded.
43+
#[derive(Debug, Clone)]
44+
pub struct SourcedMetric<T, S> {
45+
source: S,
46+
desc: Desc,
47+
_type: PhantomData<T>,
48+
}
49+
50+
/// A source of values for a [`SourcedMetric`].
51+
pub trait MetricSource: Sync + Send + Clone {
52+
/// The type of the collected values.
53+
type N: Number;
54+
/// Collects the current values of the metrics from the source.
55+
fn collect(&self, set: impl FnMut(&[&str], Self::N));
56+
}
57+
58+
impl<T: SourcedType, S: MetricSource> SourcedMetric<T, S> {
59+
/// Creates a new metric that obtains its values from the given source.
60+
pub fn new(opts: &Opts, source: S) -> prometheus::Result<Self> {
61+
let desc = opts.describe()?;
62+
Ok(Self { source, desc, _type: PhantomData })
63+
}
64+
}
65+
66+
impl<T: SourcedType, S: MetricSource> Collector for SourcedMetric<T, S> {
67+
fn desc(&self) -> Vec<&Desc> {
68+
vec![&self.desc]
69+
}
70+
71+
fn collect(&self) -> Vec<proto::MetricFamily> {
72+
let mut counters = Vec::new();
73+
74+
self.source.collect(|label_values, value| {
75+
let mut m = proto::Metric::default();
76+
77+
match T::proto() {
78+
proto::MetricType::COUNTER => {
79+
let mut c = proto::Counter::default();
80+
c.set_value(value.into_f64());
81+
m.set_counter(c);
82+
}
83+
proto::MetricType::GAUGE => {
84+
let mut g = proto::Gauge::default();
85+
g.set_value(value.into_f64());
86+
m.set_gauge(g);
87+
}
88+
t => {
89+
log::error!("Unsupported sourced metric type: {:?}", t);
90+
}
91+
}
92+
93+
debug_assert_eq!(self.desc.variable_labels.len(), label_values.len());
94+
match self.desc.variable_labels.len().cmp(&label_values.len()) {
95+
Ordering::Greater =>
96+
log::warn!("Missing label values for sourced metric {}", self.desc.fq_name),
97+
Ordering::Less =>
98+
log::warn!("Too many label values for sourced metric {}", self.desc.fq_name),
99+
Ordering::Equal => {}
100+
}
101+
102+
m.set_label(self.desc.variable_labels.iter().zip(label_values)
103+
.map(|(l_name, l_value)| {
104+
let mut l = proto::LabelPair::default();
105+
l.set_name(l_name.to_string());
106+
l.set_value(l_value.to_string());
107+
l
108+
})
109+
.chain(self.desc.const_label_pairs.iter().cloned())
110+
.collect::<Vec<_>>());
111+
112+
counters.push(m);
113+
});
114+
115+
let mut m = proto::MetricFamily::default();
116+
m.set_name(self.desc.fq_name.clone());
117+
m.set_help(self.desc.help.clone());
118+
m.set_field_type(T::proto());
119+
m.set_metric(counters);
120+
121+
vec![m]
122+
}
123+
}
124+
125+
/// Types of metrics that can obtain their values from an existing source.
126+
pub trait SourcedType: private::Sealed + Sync + Send {
127+
#[doc(hidden)]
128+
fn proto() -> proto::MetricType;
129+
}
130+
131+
impl SourcedType for Counter {
132+
fn proto() -> proto::MetricType { proto::MetricType::COUNTER }
133+
}
134+
135+
impl SourcedType for Gauge {
136+
fn proto() -> proto::MetricType { proto::MetricType::GAUGE }
137+
}
138+
139+
mod private {
140+
pub trait Sealed {}
141+
impl Sealed for super::Counter {}
142+
impl Sealed for super::Gauge {}
143+
}

0 commit comments

Comments
 (0)