From 793a5fd2da810978dff891986ccf1a0da97faad0 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 16 Dec 2022 18:13:33 +0300 Subject: [PATCH 01/14] minor: remove event count argument from `Metrics::event_in()` that is always `1` --- client/network/src/service/out_events.rs | 32 ++++++++---------------- 1 file changed, 11 insertions(+), 21 deletions(-) diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index 4144d7f19551e..b09efaff9056c 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -165,7 +165,7 @@ impl OutChannels { if let Some(metrics) = &*self.metrics { for ev in &self.event_streams { - metrics.event_in(&event, 1, ev.name); + metrics.event_in(&event, ev.name); } } } @@ -232,45 +232,35 @@ impl Metrics { }) } - fn event_in(&self, event: &Event, num: u64, name: &str) { + fn event_in(&self, event: &Event, name: &str) { match event { Event::Dht(_) => { - self.events_total.with_label_values(&["dht", "sent", name]).inc_by(num); + self.events_total.with_label_values(&["dht", "sent", name]).inc(); }, Event::SyncConnected { .. } => { - self.events_total - .with_label_values(&["sync-connected", "sent", name]) - .inc_by(num); + self.events_total.with_label_values(&["sync-connected", "sent", name]).inc(); }, Event::SyncDisconnected { .. } => { - self.events_total - .with_label_values(&["sync-disconnected", "sent", name]) - .inc_by(num); + self.events_total.with_label_values(&["sync-disconnected", "sent", name]).inc(); }, Event::NotificationStreamOpened { protocol, .. } => { format_label("notif-open-", protocol, |protocol_label| { - self.events_total - .with_label_values(&[protocol_label, "sent", name]) - .inc_by(num); + self.events_total.with_label_values(&[protocol_label, "sent", name]).inc(); }); }, Event::NotificationStreamClosed { protocol, .. } => { format_label("notif-closed-", protocol, |protocol_label| { - self.events_total - .with_label_values(&[protocol_label, "sent", name]) - .inc_by(num); + self.events_total.with_label_values(&[protocol_label, "sent", name]).inc(); }); }, Event::NotificationsReceived { messages, .. } => for (protocol, message) in messages { format_label("notif-", protocol, |protocol_label| { - self.events_total - .with_label_values(&[protocol_label, "sent", name]) - .inc_by(num); + self.events_total.with_label_values(&[protocol_label, "sent", name]).inc(); }); - self.notifications_sizes.with_label_values(&[protocol, "sent", name]).inc_by( - num.saturating_mul(u64::try_from(message.len()).unwrap_or(u64::MAX)), - ); + self.notifications_sizes + .with_label_values(&[protocol, "sent", name]) + .inc_by(u64::try_from(message.len()).unwrap_or(u64::MAX)); }, } } From 2b661e230903590341f58d4b6c168d5845ad2dc8 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Fri, 16 Dec 2022 21:39:58 +0300 Subject: [PATCH 02/14] OutChannel queue size warning --- client/network/src/service.rs | 2 +- client/network/src/service/out_events.rs | 42 ++++++++++++++++++++---- 2 files changed, 37 insertions(+), 7 deletions(-) diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 08e498299a1d3..f07199d0be497 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -1003,7 +1003,7 @@ where H: ExHashT, { fn event_stream(&self, name: &'static str) -> Pin + Send>> { - let (tx, rx) = out_events::channel(name); + let (tx, rx) = out_events::channel(name, 100000); let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx)); Box::pin(rx) } diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index b09efaff9056c..fc338612cc922 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -32,6 +32,7 @@ //! collection. use futures::{channel::mpsc, prelude::*, ready, stream::FusedStream}; +use log::error; use parking_lot::Mutex; use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64}; use sc_network_common::protocol::event::Event; @@ -39,18 +40,29 @@ use std::{ cell::RefCell, fmt, pin::Pin, - sync::Arc, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, + }, task::{Context, Poll}, }; /// Creates a new channel that can be associated to a [`OutChannels`]. /// /// The name is used in Prometheus reports. -pub fn channel(name: &'static str) -> (Sender, Receiver) { +pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) { let (tx, rx) = mpsc::unbounded(); let metrics = Arc::new(Mutex::new(None)); - let tx = Sender { inner: tx, name, metrics: metrics.clone() }; - let rx = Receiver { inner: rx, name, metrics }; + let queue_size = Arc::new(AtomicUsize::new(0)); + let tx = Sender { + inner: tx, + name, + queue_size: queue_size.clone(), + queue_size_warning, + warning_fired: false, + metrics: metrics.clone(), + }; + let rx = Receiver { inner: rx, name, queue_size, metrics }; (tx, rx) } @@ -63,7 +75,14 @@ pub fn channel(name: &'static str) -> (Sender, Receiver) { /// sync on drop. If someone adds a `#[derive(Clone)]` below, it is **wrong**. pub struct Sender { inner: mpsc::UnboundedSender, + /// Name to identify the channel (e.g., in Prometheus and logs). name: &'static str, + /// Number of events in the queue. Clone of [`Receiver::in_transit`]. + queue_size: Arc, + /// Threshold queue size to generate an error message in the logs. + queue_size_warning: usize, + /// We generate the error message only once to not spam the logs. + warning_fired: bool, /// Clone of [`Receiver::metrics`]. metrics: Arc>>>>, } @@ -87,6 +106,7 @@ impl Drop for Sender { pub struct Receiver { inner: mpsc::UnboundedReceiver, name: &'static str, + queue_size: Arc, /// Initially contains `None`, and will be set to a value once the corresponding [`Sender`] /// is assigned to an instance of [`OutChannels`]. metrics: Arc>>>>, @@ -97,6 +117,7 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) { + let _ = self.queue_size.fetch_sub(1, Ordering::Release); let metrics = self.metrics.lock().clone(); match metrics.as_ref().map(|m| m.as_ref()) { Some(Some(metrics)) => metrics.event_out(&ev, self.name), @@ -160,8 +181,17 @@ impl OutChannels { /// Sends an event. pub fn send(&mut self, event: Event) { - self.event_streams - .retain(|sender| sender.inner.unbounded_send(event.clone()).is_ok()); + self.event_streams.retain_mut(|sender| { + let queue_size = sender.queue_size.fetch_add(1, Ordering::Acquire); + if queue_size == sender.queue_size_warning && !sender.warning_fired { + sender.warning_fired = true; + error!( + "Number of unprocessed events in channel `{}` reached {}.", + sender.name, sender.queue_size_warning, + ); + } + sender.inner.unbounded_send(event.clone()).is_ok() + }); if let Some(metrics) = &*self.metrics { for ev in &self.event_streams { From 3144bd571d4c1e3ca90fcc47805fd434e6d25b23 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 19 Dec 2022 14:04:04 +0300 Subject: [PATCH 03/14] Attach creation backtrace to channal for queue size warning reporting --- client/network/src/service.rs | 2 +- client/network/src/service/out_events.rs | 11 +++++++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/client/network/src/service.rs b/client/network/src/service.rs index f07199d0be497..d11482b313876 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -1003,7 +1003,7 @@ where H: ExHashT, { fn event_stream(&self, name: &'static str) -> Pin + Send>> { - let (tx, rx) = out_events::channel(name, 100000); + let (tx, rx) = out_events::channel(name, 100_000); let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx)); Box::pin(rx) } diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index fc338612cc922..91f8ca2319597 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -37,6 +37,7 @@ use parking_lot::Mutex; use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64}; use sc_network_common::protocol::event::Event; use std::{ + backtrace::Backtrace, cell::RefCell, fmt, pin::Pin, @@ -60,6 +61,7 @@ pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiv queue_size: queue_size.clone(), queue_size_warning, warning_fired: false, + created_backtrace: Backtrace::force_capture(), metrics: metrics.clone(), }; let rx = Receiver { inner: rx, name, queue_size, metrics }; @@ -83,6 +85,8 @@ pub struct Sender { queue_size_warning: usize, /// We generate the error message only once to not spam the logs. warning_fired: bool, + /// Backtrace of a place where the channel was created. + created_backtrace: Backtrace, /// Clone of [`Receiver::metrics`]. metrics: Arc>>>>, } @@ -186,8 +190,11 @@ impl OutChannels { if queue_size == sender.queue_size_warning && !sender.warning_fired { sender.warning_fired = true; error!( - "Number of unprocessed events in channel `{}` reached {}.", - sender.name, sender.queue_size_warning, + "The number of unprocessed events in channel `{}` reached {}.\n\ + The channel was created at:\n{}", + sender.name, + sender.queue_size_warning, + sender.created_backtrace, ); } sender.inner.unbounded_send(event.clone()).is_ok() From 49720f283e7cc427296ea72f2cc99dc6ec9c181a Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 19 Dec 2022 17:25:52 +0300 Subject: [PATCH 04/14] rustfmt --- client/network/src/service/out_events.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index 91f8ca2319597..1152e611bca4c 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -192,9 +192,7 @@ impl OutChannels { error!( "The number of unprocessed events in channel `{}` reached {}.\n\ The channel was created at:\n{}", - sender.name, - sender.queue_size_warning, - sender.created_backtrace, + sender.name, sender.queue_size_warning, sender.created_backtrace, ); } sender.inner.unbounded_send(event.clone()).is_ok() From c640cbfc264128e5d058fa19c270df43b9343aab Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 19 Dec 2022 17:42:42 +0300 Subject: [PATCH 05/14] minor: convert tuple into struct --- client/utils/src/mpsc.rs | 55 ++++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/client/utils/src/mpsc.rs b/client/utils/src/mpsc.rs index ee3fba4a5ee67..339998e0853d6 100644 --- a/client/utils/src/mpsc.rs +++ b/client/utils/src/mpsc.rs @@ -49,67 +49,68 @@ mod inner { /// Wrapper Type around `UnboundedSender` that increases the global /// measure when a message is added - #[derive(Debug)] - pub struct TracingUnboundedSender(&'static str, UnboundedSender); - - // Strangely, deriving `Clone` requires that `T` is also `Clone`. - impl Clone for TracingUnboundedSender { - fn clone(&self) -> Self { - Self(self.0, self.1.clone()) - } + #[derive(Debug, Clone)] + pub struct TracingUnboundedSender { + inner: UnboundedSender, + name: &'static str, } /// Wrapper Type around `UnboundedReceiver` that decreases the global /// measure when a message is polled #[derive(Debug)] - pub struct TracingUnboundedReceiver(&'static str, UnboundedReceiver); + pub struct TracingUnboundedReceiver { + inner: UnboundedReceiver, + name: &'static str, + } /// Wrapper around `mpsc::unbounded` that tracks the in- and outflow via /// `UNBOUNDED_CHANNELS_COUNTER` pub fn tracing_unbounded( - key: &'static str, + name: &'static str, ) -> (TracingUnboundedSender, TracingUnboundedReceiver) { let (s, r) = mpsc::unbounded(); - (TracingUnboundedSender(key, s), TracingUnboundedReceiver(key, r)) + let sender = TracingUnboundedSender { inner: s, name }; + let receiver = TracingUnboundedReceiver { inner: r, name }; + (sender, receiver) } impl TracingUnboundedSender { /// Proxy function to mpsc::UnboundedSender pub fn poll_ready(&self, ctx: &mut Context) -> Poll> { - self.1.poll_ready(ctx) + self.inner.poll_ready(ctx) } /// Proxy function to mpsc::UnboundedSender pub fn is_closed(&self) -> bool { - self.1.is_closed() + self.inner.is_closed() } /// Proxy function to mpsc::UnboundedSender pub fn close_channel(&self) { - self.1.close_channel() + self.inner.close_channel() } /// Proxy function to mpsc::UnboundedSender pub fn disconnect(&mut self) { - self.1.disconnect() + self.inner.disconnect() } /// Proxy function to mpsc::UnboundedSender pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.1.start_send(msg) + self.inner.start_send(msg) } /// Proxy function to mpsc::UnboundedSender pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { - self.1.unbounded_send(msg).map(|s| { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, "send"]).inc(); + self.inner.unbounded_send(msg).map(|s| { + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "send"]).inc(); s }) } /// Proxy function to mpsc::UnboundedSender pub fn same_receiver(&self, other: &UnboundedSender) -> bool { - self.1.same_receiver(other) + self.inner.same_receiver(other) } } @@ -118,7 +119,7 @@ mod inner { // consume all items, make sure to reflect the updated count let mut count = 0; loop { - if self.1.is_terminated() { + if self.inner.is_terminated() { break } @@ -129,7 +130,7 @@ mod inner { } // and discount the messages if count > 0 { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, "dropped"]).inc_by(count); + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "dropped"]).inc_by(count); } } @@ -137,15 +138,15 @@ mod inner { /// that consumes all messages first and updates the counter pub fn close(&mut self) { self.consume(); - self.1.close() + self.inner.close() } /// Proxy function to mpsc::UnboundedReceiver /// that discounts the messages taken out pub fn try_next(&mut self) -> Result, TryRecvError> { - self.1.try_next().map(|s| { + self.inner.try_next().map(|s| { if s.is_some() { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.0, "received"]).inc(); + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "received"]).inc(); } s }) @@ -165,10 +166,10 @@ mod inner { fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let s = self.get_mut(); - match Pin::new(&mut s.1).poll_next(cx) { + match Pin::new(&mut s.inner).poll_next(cx) { Poll::Ready(msg) => { if msg.is_some() { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.0, "received"]).inc(); + UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, "received"]).inc(); } Poll::Ready(msg) }, @@ -179,7 +180,7 @@ mod inner { impl FusedStream for TracingUnboundedReceiver { fn is_terminated(&self) -> bool { - self.1.is_terminated() + self.inner.is_terminated() } } From 972b45298b9972d3551ec96fefce5e37e44ca190 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Mon, 19 Dec 2022 17:44:49 +0300 Subject: [PATCH 06/14] minor: renaming --- client/network/src/service/out_events.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index 1152e611bca4c..c2ef7b462ed0d 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -61,7 +61,7 @@ pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiv queue_size: queue_size.clone(), queue_size_warning, warning_fired: false, - created_backtrace: Backtrace::force_capture(), + creation_backtrace: Backtrace::force_capture(), metrics: metrics.clone(), }; let rx = Receiver { inner: rx, name, queue_size, metrics }; @@ -86,7 +86,7 @@ pub struct Sender { /// We generate the error message only once to not spam the logs. warning_fired: bool, /// Backtrace of a place where the channel was created. - created_backtrace: Backtrace, + creation_backtrace: Backtrace, /// Clone of [`Receiver::metrics`]. metrics: Arc>>>>, } @@ -192,7 +192,7 @@ impl OutChannels { error!( "The number of unprocessed events in channel `{}` reached {}.\n\ The channel was created at:\n{}", - sender.name, sender.queue_size_warning, sender.created_backtrace, + sender.name, sender.queue_size_warning, sender.creation_backtrace, ); } sender.inner.unbounded_send(event.clone()).is_ok() From 1bb9602bf7bfa47584a2ce697d9e6922f20af130 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 20 Dec 2022 13:54:11 +0300 Subject: [PATCH 07/14] Simplify queue size couting logic by using signed integer --- client/network/src/service/out_events.rs | 39 +++++++++++++++--------- 1 file changed, 24 insertions(+), 15 deletions(-) diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index c2ef7b462ed0d..0e93397b5a3a9 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -37,12 +37,12 @@ use parking_lot::Mutex; use prometheus_endpoint::{register, CounterVec, GaugeVec, Opts, PrometheusError, Registry, U64}; use sc_network_common::protocol::event::Event; use std::{ - backtrace::Backtrace, + backtrace::{Backtrace, BacktraceStatus}, cell::RefCell, fmt, pin::Pin, sync::{ - atomic::{AtomicUsize, Ordering}, + atomic::{AtomicI64, Ordering}, Arc, }, task::{Context, Poll}, @@ -51,17 +51,17 @@ use std::{ /// Creates a new channel that can be associated to a [`OutChannels`]. /// /// The name is used in Prometheus reports. -pub fn channel(name: &'static str, queue_size_warning: usize) -> (Sender, Receiver) { +pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver) { let (tx, rx) = mpsc::unbounded(); let metrics = Arc::new(Mutex::new(None)); - let queue_size = Arc::new(AtomicUsize::new(0)); + let queue_size = Arc::new(AtomicI64::new(0)); let tx = Sender { inner: tx, name, queue_size: queue_size.clone(), queue_size_warning, warning_fired: false, - creation_backtrace: Backtrace::force_capture(), + creation_backtrace: Backtrace::capture(), metrics: metrics.clone(), }; let rx = Receiver { inner: rx, name, queue_size, metrics }; @@ -80,9 +80,12 @@ pub struct Sender { /// Name to identify the channel (e.g., in Prometheus and logs). name: &'static str, /// Number of events in the queue. Clone of [`Receiver::in_transit`]. - queue_size: Arc, + // To not bother with ordering and possible underflow errors of the unsigned counter + // we just use `i64` and `Ordering::Relaxed`, and perceive `queue_size` as approximate. + // It can turn < 0 though. + queue_size: Arc, /// Threshold queue size to generate an error message in the logs. - queue_size_warning: usize, + queue_size_warning: i64, /// We generate the error message only once to not spam the logs. warning_fired: bool, /// Backtrace of a place where the channel was created. @@ -110,7 +113,7 @@ impl Drop for Sender { pub struct Receiver { inner: mpsc::UnboundedReceiver, name: &'static str, - queue_size: Arc, + queue_size: Arc, /// Initially contains `None`, and will be set to a value once the corresponding [`Sender`] /// is assigned to an instance of [`OutChannels`]. metrics: Arc>>>>, @@ -121,7 +124,7 @@ impl Stream for Receiver { fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { if let Some(ev) = ready!(Pin::new(&mut self.inner).poll_next(cx)) { - let _ = self.queue_size.fetch_sub(1, Ordering::Release); + let _ = self.queue_size.fetch_sub(1, Ordering::Relaxed); let metrics = self.metrics.lock().clone(); match metrics.as_ref().map(|m| m.as_ref()) { Some(Some(metrics)) => metrics.event_out(&ev, self.name), @@ -186,14 +189,20 @@ impl OutChannels { /// Sends an event. pub fn send(&mut self, event: Event) { self.event_streams.retain_mut(|sender| { - let queue_size = sender.queue_size.fetch_add(1, Ordering::Acquire); + let queue_size = sender.queue_size.fetch_add(1, Ordering::Relaxed); if queue_size == sender.queue_size_warning && !sender.warning_fired { sender.warning_fired = true; - error!( - "The number of unprocessed events in channel `{}` reached {}.\n\ - The channel was created at:\n{}", - sender.name, sender.queue_size_warning, sender.creation_backtrace, - ); + match sender.creation_backtrace.status() { + BacktraceStatus::Captured => error!( + "The number of unprocessed events in channel `{}` reached {}.\n\ + The channel was created at:\n{}", + sender.name, sender.queue_size_warning, sender.creation_backtrace, + ), + _ => error!( + "The number of unprocessed events in channel `{}` reached {}.", + sender.name, sender.queue_size_warning, + ), + } } sender.inner.unbounded_send(event.clone()).is_ok() }); From 69f27275e8cfb5b237076e21b7507e74aed0e93c Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 20 Dec 2022 14:00:00 +0300 Subject: [PATCH 08/14] Warn if there is clogging in utils::mpsc::tracing_unbounded() --- client/utils/src/mpsc.rs | 88 ++++++++++++++++++++++++++++++++++++---- 1 file changed, 80 insertions(+), 8 deletions(-) diff --git a/client/utils/src/mpsc.rs b/client/utils/src/mpsc.rs index 339998e0853d6..e28aad9a59b70 100644 --- a/client/utils/src/mpsc.rs +++ b/client/utils/src/mpsc.rs @@ -45,14 +45,43 @@ mod inner { stream::{FusedStream, Stream}, task::{Context, Poll}, }; - use std::pin::Pin; + use log::error; + use std::{ + backtrace::{Backtrace, BacktraceStatus}, + pin::Pin, + sync::{ + atomic::{AtomicBool, AtomicI64, Ordering}, + Arc, + }, + }; /// Wrapper Type around `UnboundedSender` that increases the global /// measure when a message is added - #[derive(Debug, Clone)] + #[derive(Debug)] pub struct TracingUnboundedSender { inner: UnboundedSender, name: &'static str, + // To not bother with ordering and possible underflow errors of the unsigned counter + // we just use `i64` and `Ordering::Relaxed`, and perceive `queue_size` as approximate. + // It can turn < 0 though. + queue_size: Arc, + queue_size_warning: i64, + warning_fired: Arc, + creation_backtrace: Arc, + } + + // Strangely, deriving `Clone` requires that `T` is also `Clone`. + impl Clone for TracingUnboundedSender { + fn clone(&self) -> Self { + Self { + inner: self.inner.clone(), + name: self.name, + queue_size: self.queue_size.clone(), + queue_size_warning: self.queue_size_warning, + warning_fired: self.warning_fired.clone(), + creation_backtrace: self.creation_backtrace.clone(), + } + } } /// Wrapper Type around `UnboundedReceiver` that decreases the global @@ -61,16 +90,27 @@ mod inner { pub struct TracingUnboundedReceiver { inner: UnboundedReceiver, name: &'static str, + queue_size: Arc, } /// Wrapper around `mpsc::unbounded` that tracks the in- and outflow via - /// `UNBOUNDED_CHANNELS_COUNTER` + /// `UNBOUNDED_CHANNELS_COUNTER` and warns if the message queue grows + /// above the warning threshold. pub fn tracing_unbounded( name: &'static str, + //queue_size_warning: i64, ) -> (TracingUnboundedSender, TracingUnboundedReceiver) { let (s, r) = mpsc::unbounded(); - let sender = TracingUnboundedSender { inner: s, name }; - let receiver = TracingUnboundedReceiver { inner: r, name }; + let queue_size = Arc::new(AtomicI64::new(0)); + let sender = TracingUnboundedSender { + inner: s, + name, + queue_size: queue_size.clone(), + queue_size_warning: 1, + warning_fired: Arc::new(AtomicBool::new(false)), + creation_backtrace: Arc::new(Backtrace::capture()), + }; + let receiver = TracingUnboundedReceiver { inner: r, name, queue_size }; (sender, receiver) } @@ -95,15 +135,39 @@ mod inner { self.inner.disconnect() } - /// Proxy function to mpsc::UnboundedSender pub fn start_send(&mut self, msg: T) -> Result<(), SendError> { - self.inner.start_send(msg) + // The underlying implementation of [`UnboundedSender::start_send`] is the same as + // [`UnboundedSender::unbounded_send`], so we just reuse the message counting and + // error reporting code from `unbounded_send`. + self.unbounded_send(msg).map_err(TrySendError::into_send_error) } /// Proxy function to mpsc::UnboundedSender pub fn unbounded_send(&self, msg: T) -> Result<(), TrySendError> { self.inner.unbounded_send(msg).map(|s| { UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "send"]).inc(); + + let queue_size = self.queue_size.fetch_add(1, Ordering::Relaxed); + if queue_size == self.queue_size_warning && + !self.warning_fired.load(Ordering::Relaxed) + { + // `warning_fired` and `queue_size` are not synchronized, so it's possible + // that the warning is fired few times before the `warning_fired` is seen + // by all threads. This seems better than introducing a mutex guarding them. + self.warning_fired.store(true, Ordering::Relaxed); + match self.creation_backtrace.status() { + BacktraceStatus::Captured => error!( + "The number of unprocessed messages in channel `{}` reached {}.\n\ + The channel was created at:\n{}", + self.name, self.queue_size_warning, self.creation_backtrace, + ), + _ => error!( + "The number of unprocessed messages in channel `{}` reached {}.", + self.name, self.queue_size_warning, + ), + } + } + s }) } @@ -130,7 +194,9 @@ mod inner { } // and discount the messages if count > 0 { - UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "dropped"]).inc_by(count); + UNBOUNDED_CHANNELS_COUNTER + .with_label_values(&[self.name, "dropped"]) + .inc_by(count); } } @@ -146,6 +212,7 @@ mod inner { pub fn try_next(&mut self) -> Result, TryRecvError> { self.inner.try_next().map(|s| { if s.is_some() { + let _ = self.queue_size.fetch_sub(1, Ordering::Relaxed); UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[self.name, "received"]).inc(); } s @@ -169,6 +236,7 @@ mod inner { match Pin::new(&mut s.inner).poll_next(cx) { Poll::Ready(msg) => { if msg.is_some() { + let _ = s.queue_size.fetch_sub(1, Ordering::Relaxed); UNBOUNDED_CHANNELS_COUNTER.with_label_values(&[s.name, "received"]).inc(); } Poll::Ready(msg) @@ -224,6 +292,10 @@ mod inner { } fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + // The difference with `TracingUnboundedSender` is intentional. The underlying + // implementation differs for `UnboundedSender` and `&UnboundedSender`: + // the latter closes the channel completely with `close_channel()`, while the former + // only closes this specific sender with `disconnect()`. self.close_channel(); Poll::Ready(Ok(())) } From cacf5b9e58f7bd5e3a29f558f596bb172d596f6e Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 20 Dec 2022 16:24:49 +0300 Subject: [PATCH 09/14] Set mpsc::tracing_unbounded() queue size warning thresholds --- client/api/src/notifications.rs | 4 +- client/beefy/rpc/src/lib.rs | 4 +- client/beefy/src/lib.rs | 2 +- client/beefy/src/tests.rs | 10 ++--- .../common/src/import_queue/basic_queue.rs | 8 ++-- .../common/src/import_queue/buffered_link.rs | 8 ++-- client/finality-grandpa/rpc/src/lib.rs | 2 +- .../src/communication/gossip.rs | 2 +- .../src/communication/periodic.rs | 1 + .../src/communication/tests.rs | 4 +- client/finality-grandpa/src/lib.rs | 3 +- client/finality-grandpa/src/observer.rs | 2 +- client/finality-grandpa/src/until_imported.rs | 8 ++-- client/network/src/service.rs | 2 +- client/network/sync/src/lib.rs | 2 +- client/network/sync/src/service/network.rs | 2 +- client/offchain/src/api/http.rs | 4 +- client/peerset/src/lib.rs | 2 +- client/rpc/src/system/tests.rs | 2 +- client/service/src/builder.rs | 2 +- client/service/src/client/client.rs | 4 +- client/service/src/task_manager/mod.rs | 3 +- client/transaction-pool/src/graph/watcher.rs | 2 +- client/transaction-pool/src/revalidation.rs | 2 +- client/utils/src/mpsc.rs | 4 +- client/utils/src/notification.rs | 4 +- client/utils/src/notification/tests.rs | 2 +- client/utils/src/pubsub.rs | 4 +- .../src/pubsub/tests/normal_operation.rs | 10 ++--- .../src/pubsub/tests/panicking_registry.rs | 37 ++++++++++--------- client/utils/src/status_sinks.rs | 4 +- 31 files changed, 79 insertions(+), 71 deletions(-) diff --git a/client/api/src/notifications.rs b/client/api/src/notifications.rs index 9fcc381f9697e..4dd23581d2622 100644 --- a/client/api/src/notifications.rs +++ b/client/api/src/notifications.rs @@ -144,7 +144,9 @@ impl StorageNotifications { filter_keys: Option<&[StorageKey]>, filter_child_keys: Option<&[(StorageKey, Option>)]>, ) -> StorageEventStream { - let receiver = self.0.subscribe(registry::SubscribeOp { filter_keys, filter_child_keys }); + let receiver = self + .0 + .subscribe(registry::SubscribeOp { filter_keys, filter_child_keys }, 100_000); StorageEventStream(receiver) } diff --git a/client/beefy/rpc/src/lib.rs b/client/beefy/rpc/src/lib.rs index 59a133b86214e..4fea98c6eb24e 100644 --- a/client/beefy/rpc/src/lib.rs +++ b/client/beefy/rpc/src/lib.rs @@ -120,7 +120,7 @@ where ) -> Result { let beefy_best_block = Arc::new(RwLock::new(None)); - let stream = best_block_stream.subscribe(); + let stream = best_block_stream.subscribe(100_000); let closure_clone = beefy_best_block.clone(); let future = stream.for_each(move |best_beefy| { let async_clone = closure_clone.clone(); @@ -141,7 +141,7 @@ where fn subscribe_justifications(&self, mut sink: SubscriptionSink) -> SubscriptionResult { let stream = self .finality_proof_stream - .subscribe() + .subscribe(100_000) .map(|vfp| notification::EncodedVersionedFinalityProof::new::(vfp)); let fut = async move { diff --git a/client/beefy/src/lib.rs b/client/beefy/src/lib.rs index a057a9fdc597d..c190f97e2623d 100644 --- a/client/beefy/src/lib.rs +++ b/client/beefy/src/lib.rs @@ -265,7 +265,7 @@ where // Subscribe to finality notifications and justifications before waiting for runtime pallet and // reuse the streams, so we don't miss notifications while waiting for pallet to be available. let mut finality_notifications = client.finality_notification_stream().fuse(); - let block_import_justif = links.from_block_import_justif_stream.subscribe().fuse(); + let block_import_justif = links.from_block_import_justif_stream.subscribe(100_000).fuse(); // Wait for BEEFY pallet to be active before starting voter. let persisted_state = diff --git a/client/beefy/src/tests.rs b/client/beefy/src/tests.rs index d2befc979b737..f068a69238c45 100644 --- a/client/beefy/src/tests.rs +++ b/client/beefy/src/tests.rs @@ -415,8 +415,8 @@ pub(crate) fn get_beefy_streams( let beefy_rpc_links = net.peer(index).data.beefy_rpc_links.lock().clone().unwrap(); let BeefyRPCLinks { from_voter_justif_stream, from_voter_best_beefy_stream } = beefy_rpc_links; - best_block_streams.push(from_voter_best_beefy_stream.subscribe()); - versioned_finality_proof_streams.push(from_voter_justif_stream.subscribe()); + best_block_streams.push(from_voter_best_beefy_stream.subscribe(100_000)); + versioned_finality_proof_streams.push(from_voter_justif_stream.subscribe(100_000)); }); (best_block_streams, versioned_finality_proof_streams) } @@ -729,7 +729,7 @@ async fn beefy_importing_blocks() { let hashof1 = block.header.hash(); // Import without justifications. - let mut justif_recv = justif_stream.subscribe(); + let mut justif_recv = justif_stream.subscribe(100_000); assert_eq!( block_import .import_block(params(block.clone(), None), HashMap::new()) @@ -772,7 +772,7 @@ async fn beefy_importing_blocks() { let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap(); let block = builder.build().unwrap().block; let hashof2 = block.header.hash(); - let mut justif_recv = justif_stream.subscribe(); + let mut justif_recv = justif_stream.subscribe(100_000); assert_eq!( block_import.import_block(params(block, justif), HashMap::new()).await.unwrap(), ImportResult::Imported(ImportedAux { @@ -816,7 +816,7 @@ async fn beefy_importing_blocks() { let builder = full_client.new_block_at(&parent_id, Default::default(), false).unwrap(); let block = builder.build().unwrap().block; let hashof3 = block.header.hash(); - let mut justif_recv = justif_stream.subscribe(); + let mut justif_recv = justif_stream.subscribe(100_000); assert_eq!( block_import.import_block(params(block, justif), HashMap::new()).await.unwrap(), ImportResult::Imported(ImportedAux { diff --git a/client/consensus/common/src/import_queue/basic_queue.rs b/client/consensus/common/src/import_queue/basic_queue.rs index b63bc192b2e77..a96b0a47e57c7 100644 --- a/client/consensus/common/src/import_queue/basic_queue.rs +++ b/client/consensus/common/src/import_queue/basic_queue.rs @@ -69,7 +69,7 @@ impl BasicQueue { spawner: &impl sp_core::traits::SpawnEssentialNamed, prometheus_registry: Option<&Registry>, ) -> Self { - let (result_sender, result_port) = buffered_link::buffered_link(); + let (result_sender, result_port) = buffered_link::buffered_link(100_000); let metrics = prometheus_registry.and_then(|r| { Metrics::register(r) @@ -276,10 +276,10 @@ impl BlockImportWorker { use worker_messages::*; let (justification_sender, mut justification_port) = - tracing_unbounded("mpsc_import_queue_worker_justification"); + tracing_unbounded("mpsc_import_queue_worker_justification", 100_000); let (block_import_sender, block_import_port) = - tracing_unbounded("mpsc_import_queue_worker_blocks"); + tracing_unbounded("mpsc_import_queue_worker_blocks", 100_000); let mut worker = BlockImportWorker { result_sender, justification_import, metrics }; @@ -595,7 +595,7 @@ mod tests { #[test] fn prioritizes_finality_work_over_block_import() { - let (result_sender, mut result_port) = buffered_link::buffered_link(); + let (result_sender, mut result_port) = buffered_link::buffered_link(100_000); let (worker, mut finality_sender, mut block_import_sender) = BlockImportWorker::new(result_sender, (), Box::new(()), Some(Box::new(())), None); diff --git a/client/consensus/common/src/import_queue/buffered_link.rs b/client/consensus/common/src/import_queue/buffered_link.rs index e6d3b212fdbac..ab9c9d7bd64cd 100644 --- a/client/consensus/common/src/import_queue/buffered_link.rs +++ b/client/consensus/common/src/import_queue/buffered_link.rs @@ -52,8 +52,10 @@ use super::BlockImportResult; /// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and /// can be used to buffer commands, and the receiver can be used to poll said commands and transfer /// them to another link. -pub fn buffered_link() -> (BufferedLinkSender, BufferedLinkReceiver) { - let (tx, rx) = tracing_unbounded("mpsc_buffered_link"); +pub fn buffered_link( + queue_size_warning: i64, +) -> (BufferedLinkSender, BufferedLinkReceiver) { + let (tx, rx) = tracing_unbounded("mpsc_buffered_link", queue_size_warning); let tx = BufferedLinkSender { tx }; let rx = BufferedLinkReceiver { rx: rx.fuse() }; (tx, rx) @@ -175,7 +177,7 @@ mod tests { #[test] fn is_closed() { - let (tx, rx) = super::buffered_link::(); + let (tx, rx) = super::buffered_link::(1); assert!(!tx.is_closed()); drop(rx); assert!(tx.is_closed()); diff --git a/client/finality-grandpa/rpc/src/lib.rs b/client/finality-grandpa/rpc/src/lib.rs index dfdad666ba8f3..70ff7ed176869 100644 --- a/client/finality-grandpa/rpc/src/lib.rs +++ b/client/finality-grandpa/rpc/src/lib.rs @@ -104,7 +104,7 @@ where } fn subscribe_justifications(&self, mut sink: SubscriptionSink) -> SubscriptionResult { - let stream = self.justification_stream.subscribe().map( + let stream = self.justification_stream.subscribe(100_000).map( |x: sc_finality_grandpa::GrandpaJustification| { JustificationNotification::from(x) }, diff --git a/client/finality-grandpa/src/communication/gossip.rs b/client/finality-grandpa/src/communication/gossip.rs index 408cbda745e56..cbcafc727d436 100644 --- a/client/finality-grandpa/src/communication/gossip.rs +++ b/client/finality-grandpa/src/communication/gossip.rs @@ -1364,7 +1364,7 @@ impl GossipValidator { None => None, }; - let (tx, rx) = tracing_unbounded("mpsc_grandpa_gossip_validator"); + let (tx, rx) = tracing_unbounded("mpsc_grandpa_gossip_validator", 100_000); let val = GossipValidator { inner: parking_lot::RwLock::new(Inner::new(config)), set_state, diff --git a/client/finality-grandpa/src/communication/periodic.rs b/client/finality-grandpa/src/communication/periodic.rs index 7e50abb96e441..c00fed1296512 100644 --- a/client/finality-grandpa/src/communication/periodic.rs +++ b/client/finality-grandpa/src/communication/periodic.rs @@ -70,6 +70,7 @@ impl NeighborPacketWorker { pub(super) fn new(rebroadcast_period: Duration) -> (Self, NeighborPacketSender) { let (tx, rx) = tracing_unbounded::<(Vec, NeighborPacket>)>( "mpsc_grandpa_neighbor_packet_worker", + 100_000, ); let delay = Delay::new(rebroadcast_period); diff --git a/client/finality-grandpa/src/communication/tests.rs b/client/finality-grandpa/src/communication/tests.rs index eab7bb2df50cf..839b2d52be651 100644 --- a/client/finality-grandpa/src/communication/tests.rs +++ b/client/finality-grandpa/src/communication/tests.rs @@ -135,7 +135,7 @@ impl NetworkEventStream for TestNetwork { &self, _name: &'static str, ) -> Pin + Send>> { - let (tx, rx) = tracing_unbounded("test"); + let (tx, rx) = tracing_unbounded("test", 100_000); let _ = self.sender.unbounded_send(Event::EventStream(tx)); Box::pin(rx) } @@ -253,7 +253,7 @@ fn voter_set_state() -> SharedVoterSetState { // needs to run in a tokio runtime. pub(crate) fn make_test_network() -> (impl Future, TestNetwork) { - let (tx, rx) = tracing_unbounded("test"); + let (tx, rx) = tracing_unbounded("test", 100_000); let net = TestNetwork { sender: tx }; #[derive(Clone)] diff --git a/client/finality-grandpa/src/lib.rs b/client/finality-grandpa/src/lib.rs index efc46d8f93a6d..1597e60bd6061 100644 --- a/client/finality-grandpa/src/lib.rs +++ b/client/finality-grandpa/src/lib.rs @@ -566,7 +566,8 @@ where } })?; - let (voter_commands_tx, voter_commands_rx) = tracing_unbounded("mpsc_grandpa_voter_command"); + let (voter_commands_tx, voter_commands_rx) = + tracing_unbounded("mpsc_grandpa_voter_command", 100_000); let (justification_sender, justification_stream) = GrandpaJustificationStream::channel(); diff --git a/client/finality-grandpa/src/observer.rs b/client/finality-grandpa/src/observer.rs index 1efb71e5903ec..96101a8eda0ab 100644 --- a/client/finality-grandpa/src/observer.rs +++ b/client/finality-grandpa/src/observer.rs @@ -437,7 +437,7 @@ mod tests { aux_schema::load_persistent(&*backend, client.info().genesis_hash, 0, || Ok(voters)) .unwrap(); - let (_tx, voter_command_rx) = tracing_unbounded(""); + let (_tx, voter_command_rx) = tracing_unbounded("test_mpsc_voter_command", 100_000); let observer = ObserverWork::new( client, diff --git a/client/finality-grandpa/src/until_imported.rs b/client/finality-grandpa/src/until_imported.rs index 95b658e92298a..776411f8fb493 100644 --- a/client/finality-grandpa/src/until_imported.rs +++ b/client/finality-grandpa/src/until_imported.rs @@ -579,7 +579,7 @@ mod tests { impl TestChainState { fn new() -> (Self, ImportNotifications) { - let (tx, rx) = tracing_unbounded("test"); + let (tx, rx) = tracing_unbounded("test", 100_000); let state = TestChainState { sender: tx, known_blocks: Arc::new(Mutex::new(HashMap::new())) }; @@ -680,7 +680,7 @@ mod tests { // enact all dependencies before importing the message enact_dependencies(&chain_state); - let (global_tx, global_rx) = tracing_unbounded("test"); + let (global_tx, global_rx) = tracing_unbounded("test", 100_000); let until_imported = UntilGlobalMessageBlocksImported::new( import_notifications, @@ -708,7 +708,7 @@ mod tests { let (chain_state, import_notifications) = TestChainState::new(); let block_status = chain_state.block_status(); - let (global_tx, global_rx) = tracing_unbounded("test"); + let (global_tx, global_rx) = tracing_unbounded("test", 100_000); let until_imported = UntilGlobalMessageBlocksImported::new( import_notifications, @@ -896,7 +896,7 @@ mod tests { let (chain_state, import_notifications) = TestChainState::new(); let block_status = chain_state.block_status(); - let (global_tx, global_rx) = tracing_unbounded("test"); + let (global_tx, global_rx) = tracing_unbounded("test", 100_000); let block_sync_requester = TestBlockSyncRequester::default(); diff --git a/client/network/src/service.rs b/client/network/src/service.rs index d11482b313876..66de263a19a3e 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -208,7 +208,7 @@ where ¶ms.network_config.transport, )?; - let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker"); + let (to_worker, from_service) = tracing_unbounded("mpsc_network_worker", 100_000); if let Some(path) = ¶ms.network_config.net_config_path { fs::create_dir_all(path)?; diff --git a/client/network/sync/src/lib.rs b/client/network/sync/src/lib.rs index 75eda91219ec8..04cb82f2b8eda 100644 --- a/client/network/sync/src/lib.rs +++ b/client/network/sync/src/lib.rs @@ -1436,7 +1436,7 @@ where state_request_protocol_name: ProtocolName, warp_sync_protocol_name: Option, ) -> Result<(Self, ChainSyncInterfaceHandle, NonDefaultSetConfig), ClientError> { - let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync"); + let (tx, service_rx) = tracing_unbounded("mpsc_chain_sync", 100_000); let block_announce_config = Self::get_block_announce_proto_config( protocol_id, fork_id, diff --git a/client/network/sync/src/service/network.rs b/client/network/sync/src/service/network.rs index c44398b0f1a9e..b81a65ae731cf 100644 --- a/client/network/sync/src/service/network.rs +++ b/client/network/sync/src/service/network.rs @@ -99,7 +99,7 @@ impl NetworkServiceHandle { impl NetworkServiceProvider { /// Create new `NetworkServiceProvider` pub fn new() -> (Self, NetworkServiceHandle) { - let (tx, rx) = tracing_unbounded("mpsc_network_service_provider"); + let (tx, rx) = tracing_unbounded("mpsc_network_service_provider", 100_000); (Self { rx }, NetworkServiceHandle::new(tx)) } diff --git a/client/offchain/src/api/http.rs b/client/offchain/src/api/http.rs index 4c97e5a47058d..a47adb3e8026e 100644 --- a/client/offchain/src/api/http.rs +++ b/client/offchain/src/api/http.rs @@ -66,8 +66,8 @@ impl SharedClient { /// Creates a pair of [`HttpApi`] and [`HttpWorker`]. pub fn http(shared_client: SharedClient) -> (HttpApi, HttpWorker) { - let (to_worker, from_api) = tracing_unbounded("mpsc_ocw_to_worker"); - let (to_api, from_worker) = tracing_unbounded("mpsc_ocw_to_api"); + let (to_worker, from_api) = tracing_unbounded("mpsc_ocw_to_worker", 100_000); + let (to_api, from_worker) = tracing_unbounded("mpsc_ocw_to_api", 100_000); let api = HttpApi { to_worker, diff --git a/client/peerset/src/lib.rs b/client/peerset/src/lib.rs index ec09835c4898e..9b1dc6a2d0276 100644 --- a/client/peerset/src/lib.rs +++ b/client/peerset/src/lib.rs @@ -275,7 +275,7 @@ pub struct Peerset { impl Peerset { /// Builds a new peerset from the given configuration. pub fn from_config(config: PeersetConfig) -> (Self, PeersetHandle) { - let (tx, rx) = tracing_unbounded("mpsc_peerset_messages"); + let (tx, rx) = tracing_unbounded("mpsc_peerset_messages", 10_000); let handle = PeersetHandle { tx: tx.clone() }; diff --git a/client/rpc/src/system/tests.rs b/client/rpc/src/system/tests.rs index 00ab9c46861e2..4da49cdd1a0c5 100644 --- a/client/rpc/src/system/tests.rs +++ b/client/rpc/src/system/tests.rs @@ -52,7 +52,7 @@ impl Default for Status { fn api>>(sync: T) -> RpcModule> { let status = sync.into().unwrap_or_default(); let should_have_peers = !status.is_dev; - let (tx, rx) = tracing_unbounded("rpc_system_tests"); + let (tx, rx) = tracing_unbounded("rpc_system_tests", 10_000); thread::spawn(move || { futures::executor::block_on(rx.for_each(move |request| { match request { diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 7153672030d6a..22cd61cc39588 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -928,7 +928,7 @@ where ); spawn_handle.spawn("import-queue", None, import_queue.run(Box::new(chain_sync_service))); - let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc"); + let (system_rpc_tx, system_rpc_rx) = tracing_unbounded("mpsc_system_rpc", 10_000); let future = build_network_future( config.role.clone(), diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs index 8ded5ec95c166..2d3615a96dae6 100644 --- a/client/service/src/client/client.rs +++ b/client/service/src/client/client.rs @@ -1909,13 +1909,13 @@ where { /// Get block import event stream. fn import_notification_stream(&self) -> ImportNotifications { - let (sink, stream) = tracing_unbounded("mpsc_import_notification_stream"); + let (sink, stream) = tracing_unbounded("mpsc_import_notification_stream", 100_000); self.import_notification_sinks.lock().push(sink); stream } fn finality_notification_stream(&self) -> FinalityNotifications { - let (sink, stream) = tracing_unbounded("mpsc_finality_notification_stream"); + let (sink, stream) = tracing_unbounded("mpsc_finality_notification_stream", 100_000); self.finality_notification_sinks.lock().push(sink); stream } diff --git a/client/service/src/task_manager/mod.rs b/client/service/src/task_manager/mod.rs index 49189dc21ce8d..23265f9672555 100644 --- a/client/service/src/task_manager/mod.rs +++ b/client/service/src/task_manager/mod.rs @@ -310,7 +310,8 @@ impl TaskManager { let (signal, on_exit) = exit_future::signal(); // A side-channel for essential tasks to communicate shutdown. - let (essential_failed_tx, essential_failed_rx) = tracing_unbounded("mpsc_essential_tasks"); + let (essential_failed_tx, essential_failed_rx) = + tracing_unbounded("mpsc_essential_tasks", 100); let metrics = prometheus_registry.map(Metrics::register).transpose()?; diff --git a/client/transaction-pool/src/graph/watcher.rs b/client/transaction-pool/src/graph/watcher.rs index 0613300c8684b..df5bb94edfe6d 100644 --- a/client/transaction-pool/src/graph/watcher.rs +++ b/client/transaction-pool/src/graph/watcher.rs @@ -62,7 +62,7 @@ impl Default for Sender { impl Sender { /// Add a new watcher to this sender object. pub fn new_watcher(&mut self, hash: H) -> Watcher { - let (tx, receiver) = tracing_unbounded("mpsc_txpool_watcher"); + let (tx, receiver) = tracing_unbounded("mpsc_txpool_watcher", 100_000); self.receivers.push(tx); Watcher { receiver, hash } } diff --git a/client/transaction-pool/src/revalidation.rs b/client/transaction-pool/src/revalidation.rs index b4b4299240a32..d8c8bea625fb3 100644 --- a/client/transaction-pool/src/revalidation.rs +++ b/client/transaction-pool/src/revalidation.rs @@ -291,7 +291,7 @@ where pool: Arc>, interval: Duration, ) -> (Self, Pin + Send>>) { - let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue"); + let (to_worker, from_queue) = tracing_unbounded("mpsc_revalidation_queue", 100_000); let worker = RevalidationWorker::new(api.clone(), pool.clone()); diff --git a/client/utils/src/mpsc.rs b/client/utils/src/mpsc.rs index e28aad9a59b70..7db5e49f5bca9 100644 --- a/client/utils/src/mpsc.rs +++ b/client/utils/src/mpsc.rs @@ -98,7 +98,7 @@ mod inner { /// above the warning threshold. pub fn tracing_unbounded( name: &'static str, - //queue_size_warning: i64, + queue_size_warning: i64, ) -> (TracingUnboundedSender, TracingUnboundedReceiver) { let (s, r) = mpsc::unbounded(); let queue_size = Arc::new(AtomicI64::new(0)); @@ -106,7 +106,7 @@ mod inner { inner: s, name, queue_size: queue_size.clone(), - queue_size_warning: 1, + queue_size_warning, warning_fired: Arc::new(AtomicBool::new(false)), creation_backtrace: Arc::new(Backtrace::capture()), }; diff --git a/client/utils/src/notification.rs b/client/utils/src/notification.rs index ff527c343f9f2..4917a43265df4 100644 --- a/client/utils/src/notification.rs +++ b/client/utils/src/notification.rs @@ -79,8 +79,8 @@ impl NotificationStream { } /// Subscribe to a channel through which the generic payload can be received. - pub fn subscribe(&self) -> NotificationReceiver { - let receiver = self.hub.subscribe(()); + pub fn subscribe(&self, queue_size_warning: i64) -> NotificationReceiver { + let receiver = self.hub.subscribe((), queue_size_warning); NotificationReceiver { receiver } } } diff --git a/client/utils/src/notification/tests.rs b/client/utils/src/notification/tests.rs index a001fa7e89e95..f813f37d29ddb 100644 --- a/client/utils/src/notification/tests.rs +++ b/client/utils/src/notification/tests.rs @@ -36,7 +36,7 @@ fn notification_channel_simple() { // Create a future to receive a single notification // from the stream and verify its payload. - let future = stream.subscribe().take(1).for_each(move |payload| { + let future = stream.subscribe(100_000).take(1).for_each(move |payload| { let test_payload = closure_payload.clone(); async move { assert_eq!(payload, test_payload); diff --git a/client/utils/src/pubsub.rs b/client/utils/src/pubsub.rs index ba6e9ddc6ca2a..f85f44b498841 100644 --- a/client/utils/src/pubsub.rs +++ b/client/utils/src/pubsub.rs @@ -164,7 +164,7 @@ impl Hub { /// Subscribe to this Hub using the `subs_key: K`. /// /// A subscription with a key `K` is possible if the Registry implements `Subscribe`. - pub fn subscribe(&self, subs_key: K) -> Receiver + pub fn subscribe(&self, subs_key: K, queue_size_warning: i64) -> Receiver where R: Subscribe + Unsubscribe, { @@ -178,7 +178,7 @@ impl Hub { // have the sink disposed. shared_borrowed.registry.subscribe(subs_key, subs_id); - let (tx, rx) = crate::mpsc::tracing_unbounded(self.tracing_key); + let (tx, rx) = crate::mpsc::tracing_unbounded(self.tracing_key, queue_size_warning); assert!(shared_borrowed.sinks.insert(subs_id, tx).is_none(), "Used IDSequence to create another ID. Should be unique until u64 is overflowed. Should be unique."); Receiver { shared: Arc::downgrade(&self.shared), subs_id, rx } diff --git a/client/utils/src/pubsub/tests/normal_operation.rs b/client/utils/src/pubsub/tests/normal_operation.rs index a13c718d74a8f..830388de32e46 100644 --- a/client/utils/src/pubsub/tests/normal_operation.rs +++ b/client/utils/src/pubsub/tests/normal_operation.rs @@ -27,7 +27,7 @@ fn positive_rx_receives_relevant_messages_and_terminates_upon_hub_drop() { // No subscribers yet. That message is not supposed to get to anyone. hub.send(0); - let mut rx_01 = hub.subscribe(SubsKey::new()); + let mut rx_01 = hub.subscribe(SubsKey::new(), 100_000); assert_eq!(hub.subs_count(), 1); // That message is sent after subscription. Should be delivered into rx_01. @@ -49,9 +49,9 @@ fn positive_subs_count_is_correct_upon_drop_of_rxs() { let hub = TestHub::new(TK); assert_eq!(hub.subs_count(), 0); - let rx_01 = hub.subscribe(SubsKey::new()); + let rx_01 = hub.subscribe(SubsKey::new(), 100_000); assert_eq!(hub.subs_count(), 1); - let rx_02 = hub.subscribe(SubsKey::new()); + let rx_02 = hub.subscribe(SubsKey::new(), 100_000); assert_eq!(hub.subs_count(), 2); std::mem::drop(rx_01); @@ -69,11 +69,11 @@ fn positive_subs_count_is_correct_upon_drop_of_rxs_on_cloned_hubs() { assert_eq!(hub_01.subs_count(), 0); assert_eq!(hub_02.subs_count(), 0); - let rx_01 = hub_02.subscribe(SubsKey::new()); + let rx_01 = hub_02.subscribe(SubsKey::new(), 100_000); assert_eq!(hub_01.subs_count(), 1); assert_eq!(hub_02.subs_count(), 1); - let rx_02 = hub_02.subscribe(SubsKey::new()); + let rx_02 = hub_02.subscribe(SubsKey::new(), 100_000); assert_eq!(hub_01.subs_count(), 2); assert_eq!(hub_02.subs_count(), 2); diff --git a/client/utils/src/pubsub/tests/panicking_registry.rs b/client/utils/src/pubsub/tests/panicking_registry.rs index 26ce63bd51b01..cfe8168d80229 100644 --- a/client/utils/src/pubsub/tests/panicking_registry.rs +++ b/client/utils/src/pubsub/tests/panicking_registry.rs @@ -30,7 +30,7 @@ fn t01() { let hub = TestHub::new(TK); assert_hub_props(&hub, 0, 0); - let rx_01 = hub.subscribe(SubsKey::new()); + let rx_01 = hub.subscribe(SubsKey::new(), 100_000); assert_hub_props(&hub, 1, 1); std::mem::drop(rx_01); @@ -45,17 +45,17 @@ fn t02() { assert_hub_props(&hub, 0, 0); // Subscribe rx-01 - let rx_01 = hub.subscribe(SubsKey::new()); + let rx_01 = hub.subscribe(SubsKey::new(), 100_000); assert_hub_props(&hub, 1, 1); // Subscribe rx-02 so that its unsubscription will lead to an attempt to drop rx-01 in the // middle of unsubscription of rx-02 - let rx_02 = hub.subscribe(SubsKey::new().with_receiver(rx_01)); + let rx_02 = hub.subscribe(SubsKey::new().with_receiver(rx_01), 100_000); assert_hub_props(&hub, 2, 2); // Subscribe rx-03 in order to see that it will receive messages after the unclean // unsubscription - let mut rx_03 = hub.subscribe(SubsKey::new()); + let mut rx_03 = hub.subscribe(SubsKey::new(), 100_000); assert_hub_props(&hub, 3, 3); // drop rx-02 leads to an attempt to unsubscribe rx-01 @@ -69,7 +69,7 @@ fn t02() { // Subscribe rx-04 in order to see that it will receive messages after the unclean // unsubscription - let mut rx_04 = hub.subscribe(SubsKey::new()); + let mut rx_04 = hub.subscribe(SubsKey::new(), 100_000); assert_hub_props(&hub, 3, 3); hub.send(2); @@ -96,8 +96,8 @@ fn t02() { } async fn add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(hub: &TestHub) { - let rx_01 = hub.subscribe(SubsKey::new()); - let rx_02 = hub.subscribe(SubsKey::new()); + let rx_01 = hub.subscribe(SubsKey::new(), 100_000); + let rx_02 = hub.subscribe(SubsKey::new(), 100_000); hub.send(1); hub.send(2); @@ -121,9 +121,8 @@ fn t03() { add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; assert_hub_props(&hub, 0, 0); - assert!(catch_unwind(AssertUnwindSafe( - || hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicBefore)) - )) + assert!(catch_unwind(AssertUnwindSafe(|| hub + .subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicBefore), 100_000))) .is_err()); assert_hub_props(&hub, 0, 0); @@ -141,9 +140,8 @@ fn t04() { add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; assert_hub_props(&hub, 0, 0); - assert!(catch_unwind(AssertUnwindSafe( - || hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicAfter)) - )) + assert!(catch_unwind(AssertUnwindSafe(|| hub + .subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnSubscribePanicAfter), 100_000))) .is_err()); // the registry has panicked after it has added a subs-id into its internal storage — the @@ -163,8 +161,8 @@ fn t05() { add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; assert_hub_props(&hub, 0, 0); - let rx_01 = - hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicBefore)); + let rx_01 = hub + .subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicBefore), 100_000); assert_hub_props(&hub, 1, 1); add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; @@ -189,7 +187,8 @@ fn t06() { add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; assert_hub_props(&hub, 0, 0); - let rx_01 = hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicAfter)); + let rx_01 = hub + .subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnUnsubscribePanicAfter), 100_000); assert_hub_props(&hub, 1, 1); add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; @@ -214,7 +213,8 @@ fn t07() { add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; assert_hub_props(&hub, 0, 0); - let rx_01 = hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicBefore)); + let rx_01 = + hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicBefore), 100_000); assert_hub_props(&hub, 1, 1); assert!(catch_unwind(AssertUnwindSafe(|| hub.send(1))).is_err()); assert_hub_props(&hub, 1, 1); @@ -235,7 +235,8 @@ fn t08() { add_some_subscribers_see_that_messages_are_delivered_and_unsubscribe(&hub).await; assert_hub_props(&hub, 0, 0); - let rx_01 = hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicAfter)); + let rx_01 = + hub.subscribe(SubsKey::new().with_panic(SubsKeyPanic::OnDispatchPanicAfter), 100_000); assert_hub_props(&hub, 1, 1); assert!(catch_unwind(AssertUnwindSafe(|| hub.send(1))).is_err()); assert_hub_props(&hub, 1, 1); diff --git a/client/utils/src/status_sinks.rs b/client/utils/src/status_sinks.rs index a1d965d08085e..c536e2c18c6a1 100644 --- a/client/utils/src/status_sinks.rs +++ b/client/utils/src/status_sinks.rs @@ -58,7 +58,7 @@ impl Default for StatusSinks { impl StatusSinks { /// Builds a new empty collection. pub fn new() -> StatusSinks { - let (entries_tx, entries_rx) = tracing_unbounded("status-sinks-entries"); + let (entries_tx, entries_rx) = tracing_unbounded("status-sinks-entries", 100_000); StatusSinks { inner: Mutex::new(Inner { entries: stream::FuturesUnordered::new(), entries_rx }), @@ -196,7 +196,7 @@ mod tests { let status_sinks = StatusSinks::new(); - let (tx, rx) = tracing_unbounded("test"); + let (tx, rx) = tracing_unbounded("test", 100_000); status_sinks.push(Duration::from_millis(100), tx); let mut val_order = 5; From 82e1b1e4d2157caaa77aeac680bbcb58abcd5219 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 20 Dec 2022 17:16:37 +0300 Subject: [PATCH 10/14] Make unbounded channel in `network/transactions` metered --- Cargo.lock | 1 + client/network/transactions/Cargo.toml | 1 + client/network/transactions/src/lib.rs | 7 ++++--- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b840cadce3e5c..321ebfa2f29b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7994,6 +7994,7 @@ dependencies = [ "pin-project", "sc-network-common", "sc-peerset", + "sc-utils", "sp-consensus", "sp-runtime", "substrate-prometheus-endpoint", diff --git a/client/network/transactions/Cargo.toml b/client/network/transactions/Cargo.toml index cb45abca02f6a..70417935a824b 100644 --- a/client/network/transactions/Cargo.toml +++ b/client/network/transactions/Cargo.toml @@ -23,5 +23,6 @@ pin-project = "1.0.12" prometheus-endpoint = { package = "substrate-prometheus-endpoint", version = "0.10.0-dev", path = "../../../utils/prometheus" } sc-network-common = { version = "0.10.0-dev", path = "../common" } sc-peerset = { version = "4.0.0-dev", path = "../../peerset" } +sc-utils = { version = "4.0.0-dev", path = "../../utils" } sp-runtime = { version = "7.0.0", path = "../../../primitives/runtime" } sp-consensus = { version = "0.10.0-dev", path = "../../../primitives/consensus/common" } diff --git a/client/network/transactions/src/lib.rs b/client/network/transactions/src/lib.rs index 4cc76507c6f16..f393cd508c934 100644 --- a/client/network/transactions/src/lib.rs +++ b/client/network/transactions/src/lib.rs @@ -40,6 +40,7 @@ use sc_network_common::{ utils::{interval, LruHashSet}, ExHashT, }; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use sp_runtime::traits::Block as BlockT; use std::{ collections::{hash_map::Entry, HashMap}, @@ -168,7 +169,7 @@ impl TransactionsHandlerPrototype { metrics_registry: Option<&Registry>, ) -> error::Result<(TransactionsHandler, TransactionsHandlerController)> { let event_stream = service.event_stream("transactions-handler"); - let (to_handler, from_controller) = mpsc::unbounded(); + let (to_handler, from_controller) = tracing_unbounded("mpsc_transactions_handler", 100_000); let handler = TransactionsHandler { protocol_name: self.protocol_name, @@ -197,7 +198,7 @@ impl TransactionsHandlerPrototype { /// Controls the behaviour of a [`TransactionsHandler`] it is connected to. pub struct TransactionsHandlerController { - to_handler: mpsc::UnboundedSender>, + to_handler: TracingUnboundedSender>, } impl TransactionsHandlerController { @@ -246,7 +247,7 @@ pub struct TransactionsHandler< // All connected peers peers: HashMap>, transaction_pool: Arc>, - from_controller: mpsc::UnboundedReceiver>, + from_controller: TracingUnboundedReceiver>, /// Prometheus metrics. metrics: Option, } From 75a3389a23bd716c9339d61b906d0ed763d8bbf2 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 20 Dec 2022 17:17:31 +0300 Subject: [PATCH 11/14] Make unbounded channel in `telemetry` metered --- Cargo.lock | 1 + client/telemetry/Cargo.toml | 1 + client/telemetry/src/lib.rs | 14 ++++++++------ 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 321ebfa2f29b4..e3f9e3b3bb869 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -8333,6 +8333,7 @@ dependencies = [ "parking_lot 0.12.1", "pin-project", "rand 0.7.3", + "sc-utils", "serde", "serde_json", "thiserror", diff --git a/client/telemetry/Cargo.toml b/client/telemetry/Cargo.toml index 0a0b9284efa24..79c990d3e07f2 100644 --- a/client/telemetry/Cargo.toml +++ b/client/telemetry/Cargo.toml @@ -21,6 +21,7 @@ log = "0.4.17" parking_lot = "0.12.1" pin-project = "1.0.12" rand = "0.7.2" +sc-utils = { version = "4.0.0-dev", path = "../utils" } serde = { version = "1.0.136", features = ["derive"] } serde_json = "1.0.85" thiserror = "1.0.30" diff --git a/client/telemetry/src/lib.rs b/client/telemetry/src/lib.rs index 503a326f76c2b..aa6b841b79164 100644 --- a/client/telemetry/src/lib.rs +++ b/client/telemetry/src/lib.rs @@ -40,6 +40,7 @@ use futures::{channel::mpsc, prelude::*}; use libp2p::Multiaddr; use log::{error, warn}; use parking_lot::Mutex; +use sc_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use serde::Serialize; use std::{ collections::{ @@ -147,8 +148,8 @@ pub struct SysInfo { pub struct TelemetryWorker { message_receiver: mpsc::Receiver, message_sender: mpsc::Sender, - register_receiver: mpsc::UnboundedReceiver, - register_sender: mpsc::UnboundedSender, + register_receiver: TracingUnboundedReceiver, + register_sender: TracingUnboundedSender, id_counter: Arc, } @@ -163,7 +164,8 @@ impl TelemetryWorker { // error as early as possible. let _transport = initialize_transport()?; let (message_sender, message_receiver) = mpsc::channel(buffer_size); - let (register_sender, register_receiver) = mpsc::unbounded(); + let (register_sender, register_receiver) = + tracing_unbounded("mpsc_telemetry_register", 10_000); Ok(Self { message_receiver, @@ -360,7 +362,7 @@ impl TelemetryWorker { #[derive(Debug, Clone)] pub struct TelemetryWorkerHandle { message_sender: mpsc::Sender, - register_sender: mpsc::UnboundedSender, + register_sender: TracingUnboundedSender, id_counter: Arc, } @@ -386,7 +388,7 @@ impl TelemetryWorkerHandle { #[derive(Debug)] pub struct Telemetry { message_sender: mpsc::Sender, - register_sender: mpsc::UnboundedSender, + register_sender: TracingUnboundedSender, id: Id, connection_notifier: TelemetryConnectionNotifier, endpoints: Option, @@ -460,7 +462,7 @@ impl TelemetryHandle { /// (re-)establishes. #[derive(Clone, Debug)] pub struct TelemetryConnectionNotifier { - register_sender: mpsc::UnboundedSender, + register_sender: TracingUnboundedSender, addresses: Vec, } From 0d527bceecd47886c893e982e29da9aa1fd58b78 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 20 Dec 2022 17:55:39 +0300 Subject: [PATCH 12/14] Fix warning --- client/network/transactions/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/transactions/src/lib.rs b/client/network/transactions/src/lib.rs index f393cd508c934..a5adb274d29de 100644 --- a/client/network/transactions/src/lib.rs +++ b/client/network/transactions/src/lib.rs @@ -28,7 +28,7 @@ use crate::config::*; use codec::{Decode, Encode}; -use futures::{channel::mpsc, prelude::*, stream::FuturesUnordered}; +use futures::{prelude::*, stream::FuturesUnordered}; use libp2p::{multiaddr, PeerId}; use log::{debug, trace, warn}; use prometheus_endpoint::{register, Counter, PrometheusError, Registry, U64}; From 709efba6830bcca54812ba32a6e961b28bba29ad Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Tue, 20 Dec 2022 18:20:43 +0300 Subject: [PATCH 13/14] Make rustdoc happy --- client/consensus/common/src/import_queue/buffered_link.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/consensus/common/src/import_queue/buffered_link.rs b/client/consensus/common/src/import_queue/buffered_link.rs index ab9c9d7bd64cd..4cc15d18d546f 100644 --- a/client/consensus/common/src/import_queue/buffered_link.rs +++ b/client/consensus/common/src/import_queue/buffered_link.rs @@ -28,7 +28,7 @@ //! # use sp_test_primitives::Block; //! # struct DummyLink; impl Link for DummyLink {} //! # let mut my_link = DummyLink; -//! let (mut tx, mut rx) = buffered_link::(); +//! let (mut tx, mut rx) = buffered_link::(100_000); //! tx.blocks_processed(0, 0, vec![]); //! //! // Calls `my_link.blocks_processed(0, 0, vec![])` when polled. From 2706277054a90d5b66fcc353b761dcfb004cdca0 Mon Sep 17 00:00:00 2001 From: Dmitry Markin Date: Thu, 22 Dec 2022 13:39:56 +0300 Subject: [PATCH 14/14] Expand doc --- client/consensus/common/src/import_queue/buffered_link.rs | 2 +- client/network/src/service/out_events.rs | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/client/consensus/common/src/import_queue/buffered_link.rs b/client/consensus/common/src/import_queue/buffered_link.rs index 4cc15d18d546f..71adcf2dc2ea9 100644 --- a/client/consensus/common/src/import_queue/buffered_link.rs +++ b/client/consensus/common/src/import_queue/buffered_link.rs @@ -51,7 +51,7 @@ use super::BlockImportResult; /// Wraps around an unbounded channel from the `futures` crate. The sender implements `Link` and /// can be used to buffer commands, and the receiver can be used to poll said commands and transfer -/// them to another link. +/// them to another link. `queue_size_warning` sets the warning threshold of the channel queue size. pub fn buffered_link( queue_size_warning: i64, ) -> (BufferedLinkSender, BufferedLinkReceiver) { diff --git a/client/network/src/service/out_events.rs b/client/network/src/service/out_events.rs index 0e93397b5a3a9..8febdd4726b37 100644 --- a/client/network/src/service/out_events.rs +++ b/client/network/src/service/out_events.rs @@ -50,7 +50,8 @@ use std::{ /// Creates a new channel that can be associated to a [`OutChannels`]. /// -/// The name is used in Prometheus reports. +/// The name is used in Prometheus reports, the queue size threshold is used +/// to warn if there are too many unprocessed events in the channel. pub fn channel(name: &'static str, queue_size_warning: i64) -> (Sender, Receiver) { let (tx, rx) = mpsc::unbounded(); let metrics = Arc::new(Mutex::new(None));