diff --git a/client/network/src/transport.rs b/client/network/src/transport.rs index c2b2cda9e5846..883f828e7db51 100644 --- a/client/network/src/transport.rs +++ b/client/network/src/transport.rs @@ -55,15 +55,15 @@ pub fn build_transport( // Build the base layer of the transport. let transport = if !memory_only { let desktop_trans = tcp::TcpConfig::new().nodelay(true); - let desktop_trans = - websocket::WsConfig::new(desktop_trans).or_transport(tcp::TcpConfig::new().nodelay(true)); + let desktop_trans = websocket::WsConfig::new(desktop_trans) + .or_transport(tcp::TcpConfig::new().nodelay(true)); let dns_init = futures::executor::block_on(dns::DnsConfig::system(desktop_trans)); EitherTransport::Left(if let Ok(dns) = dns_init { EitherTransport::Left(dns) } else { let desktop_trans = tcp::TcpConfig::new().nodelay(true); - let desktop_trans = - websocket::WsConfig::new(desktop_trans).or_transport(tcp::TcpConfig::new().nodelay(true)); + let desktop_trans = websocket::WsConfig::new(desktop_trans) + .or_transport(tcp::TcpConfig::new().nodelay(true)); EitherTransport::Right(desktop_trans.map_err(dns::DnsErr::Transport)) }) } else { diff --git a/client/telemetry/src/lib.rs b/client/telemetry/src/lib.rs index fc40f999a6779..ab57072272a51 100644 --- a/client/telemetry/src/lib.rs +++ b/client/telemetry/src/lib.rs @@ -42,7 +42,10 @@ use log::{error, warn}; use parking_lot::Mutex; use serde::Serialize; use std::{ - collections::HashMap, + collections::{ + hash_map::Entry::{Occupied, Vacant}, + HashMap, + }, sync::{atomic, Arc}, }; @@ -147,7 +150,6 @@ pub struct TelemetryWorker { register_receiver: mpsc::UnboundedReceiver, register_sender: mpsc::UnboundedSender, id_counter: Arc, - transport: WsTrans, } impl TelemetryWorker { @@ -155,7 +157,7 @@ impl TelemetryWorker { /// /// Only one is needed per process. pub fn new(buffer_size: usize) -> Result { - let transport = initialize_transport()?; + let _transport = initialize_transport()?; let (message_sender, message_receiver) = mpsc::channel(buffer_size); let (register_sender, register_receiver) = mpsc::unbounded(); @@ -165,7 +167,6 @@ impl TelemetryWorker { register_receiver, register_sender, id_counter: Arc::new(atomic::AtomicU64::new(1)), - transport, }) } @@ -200,7 +201,6 @@ impl TelemetryWorker { &mut node_pool, &mut node_map, &mut pending_connection_notifications, - self.transport.clone(), ).await, } } @@ -211,7 +211,6 @@ impl TelemetryWorker { node_pool: &mut HashMap>, node_map: &mut HashMap>, pending_connection_notifications: &mut Vec<(Multiaddr, ConnectionNotifierSender)>, - transport: WsTrans, ) { let input = input.expect("the stream is never closed; qed"); @@ -248,9 +247,24 @@ impl TelemetryWorker { ); node_map.entry(id).or_default().push((verbosity, addr.clone())); - let node = node_pool.entry(addr.clone()).or_insert_with(|| { - Node::new(transport.clone(), addr.clone(), Vec::new(), Vec::new()) - }); + let node = match node_pool.entry(addr.clone()) { + Occupied(entry) => entry.into_mut(), + Vacant(entry) => { + let transport = initialize_transport(); + let transport = match transport { + Ok(t) => t, + Err(err) => { + log::error!( + target: "telemetry", + "Could not initialise transport: {}", + err, + ); + continue + }, + }; + entry.insert(Node::new(transport, addr.clone(), Vec::new(), Vec::new())) + }, + }; node.connection_messages.extend(connection_message.clone()); diff --git a/client/telemetry/src/node.rs b/client/telemetry/src/node.rs index aa0f5a3843d33..0d71a363a1b26 100644 --- a/client/telemetry/src/node.rs +++ b/client/telemetry/src/node.rs @@ -110,7 +110,6 @@ impl Node { impl Node where - TTrans: Clone + Unpin, TTrans::Dial: Unpin, TTrans::Output: Sink, Error = TSinkErr> + Stream, TSinkErr>> + Unpin, @@ -137,7 +136,7 @@ pub(crate) enum Infallible {} impl Sink for Node where - TTrans: Clone + Unpin, + TTrans: Unpin, TTrans::Dial: Unpin, TTrans::Output: Sink, Error = TSinkErr> + Stream, TSinkErr>> + Unpin, @@ -228,15 +227,18 @@ where socket = NodeSocket::wait_reconnect(); }, }, - NodeSocket::ReconnectNow => match self.transport.clone().dial(self.addr.clone()) { - Ok(d) => { - log::trace!(target: "telemetry", "Re-dialing {}", self.addr); - socket = NodeSocket::Dialing(d); - }, - Err(err) => { - log::warn!(target: "telemetry", "❌ Error while re-dialing {}: {:?}", self.addr, err); - socket = NodeSocket::wait_reconnect(); - }, + NodeSocket::ReconnectNow => { + let addr = self.addr.clone(); + match self.transport.dial(addr) { + Ok(d) => { + log::trace!(target: "telemetry", "Re-dialing {}", self.addr); + socket = NodeSocket::Dialing(d); + }, + Err(err) => { + log::warn!(target: "telemetry", "❌ Error while re-dialing {}: {:?}", self.addr, err); + socket = NodeSocket::wait_reconnect(); + }, + } }, NodeSocket::WaitingReconnect(mut s) => { if Future::poll(Pin::new(&mut s), cx).is_ready() {