Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions client/network/src/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,15 @@ pub fn build_transport(
// Build the base layer of the transport.
let transport = if !memory_only {
let desktop_trans = tcp::TcpConfig::new().nodelay(true);
let desktop_trans =
websocket::WsConfig::new(desktop_trans).or_transport(tcp::TcpConfig::new().nodelay(true));
let desktop_trans = websocket::WsConfig::new(desktop_trans)
.or_transport(tcp::TcpConfig::new().nodelay(true));
let dns_init = futures::executor::block_on(dns::DnsConfig::system(desktop_trans));
EitherTransport::Left(if let Ok(dns) = dns_init {
EitherTransport::Left(dns)
} else {
let desktop_trans = tcp::TcpConfig::new().nodelay(true);
let desktop_trans =
websocket::WsConfig::new(desktop_trans).or_transport(tcp::TcpConfig::new().nodelay(true));
let desktop_trans = websocket::WsConfig::new(desktop_trans)
.or_transport(tcp::TcpConfig::new().nodelay(true));
EitherTransport::Right(desktop_trans.map_err(dns::DnsErr::Transport))
})
} else {
Expand Down
32 changes: 23 additions & 9 deletions client/telemetry/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ use log::{error, warn};
use parking_lot::Mutex;
use serde::Serialize;
use std::{
collections::HashMap,
collections::{
hash_map::Entry::{Occupied, Vacant},
HashMap,
},
sync::{atomic, Arc},
};

Expand Down Expand Up @@ -147,15 +150,14 @@ pub struct TelemetryWorker {
register_receiver: mpsc::UnboundedReceiver<Register>,
register_sender: mpsc::UnboundedSender<Register>,
id_counter: Arc<atomic::AtomicU64>,
transport: WsTrans,
}

impl TelemetryWorker {
/// Instantiate a new [`TelemetryWorker`] which can run in background.
///
/// Only one is needed per process.
pub fn new(buffer_size: usize) -> Result<Self> {
let transport = initialize_transport()?;
let _transport = initialize_transport()?;
let (message_sender, message_receiver) = mpsc::channel(buffer_size);
let (register_sender, register_receiver) = mpsc::unbounded();

Expand All @@ -165,7 +167,6 @@ impl TelemetryWorker {
register_receiver,
register_sender,
id_counter: Arc::new(atomic::AtomicU64::new(1)),
transport,
})
}

Expand Down Expand Up @@ -200,7 +201,6 @@ impl TelemetryWorker {
&mut node_pool,
&mut node_map,
&mut pending_connection_notifications,
self.transport.clone(),
).await,
}
}
Expand All @@ -211,7 +211,6 @@ impl TelemetryWorker {
node_pool: &mut HashMap<Multiaddr, Node<WsTrans>>,
node_map: &mut HashMap<Id, Vec<(VerbosityLevel, Multiaddr)>>,
pending_connection_notifications: &mut Vec<(Multiaddr, ConnectionNotifierSender)>,
transport: WsTrans,
) {
let input = input.expect("the stream is never closed; qed");

Expand Down Expand Up @@ -248,9 +247,24 @@ impl TelemetryWorker {
);
node_map.entry(id).or_default().push((verbosity, addr.clone()));

let node = node_pool.entry(addr.clone()).or_insert_with(|| {
Node::new(transport.clone(), addr.clone(), Vec::new(), Vec::new())
});
let node = match node_pool.entry(addr.clone()) {
Occupied(entry) => entry.into_mut(),
Vacant(entry) => {
let transport = initialize_transport();
let transport = match transport {
Ok(t) => t,
Err(err) => {
log::error!(
target: "telemetry",
"Could not initialise transport: {}",
err,
);
continue
},
};
entry.insert(Node::new(transport, addr.clone(), Vec::new(), Vec::new()))
},
};

node.connection_messages.extend(connection_message.clone());

Expand Down
24 changes: 13 additions & 11 deletions client/telemetry/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ impl<TTrans: Transport> Node<TTrans> {

impl<TTrans: Transport, TSinkErr> Node<TTrans>
where
TTrans: Clone + Unpin,
TTrans::Dial: Unpin,
TTrans::Output:
Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
Expand All @@ -137,7 +136,7 @@ pub(crate) enum Infallible {}

impl<TTrans: Transport, TSinkErr> Sink<TelemetryPayload> for Node<TTrans>
where
TTrans: Clone + Unpin,
TTrans: Unpin,
TTrans::Dial: Unpin,
TTrans::Output:
Sink<Vec<u8>, Error = TSinkErr> + Stream<Item = Result<Vec<u8>, TSinkErr>> + Unpin,
Expand Down Expand Up @@ -228,15 +227,18 @@ where
socket = NodeSocket::wait_reconnect();
},
},
NodeSocket::ReconnectNow => match self.transport.clone().dial(self.addr.clone()) {
Ok(d) => {
log::trace!(target: "telemetry", "Re-dialing {}", self.addr);
socket = NodeSocket::Dialing(d);
},
Err(err) => {
log::warn!(target: "telemetry", "❌ Error while re-dialing {}: {:?}", self.addr, err);
socket = NodeSocket::wait_reconnect();
},
NodeSocket::ReconnectNow => {
let addr = self.addr.clone();
match self.transport.dial(addr) {
Ok(d) => {
log::trace!(target: "telemetry", "Re-dialing {}", self.addr);
socket = NodeSocket::Dialing(d);
},
Err(err) => {
log::warn!(target: "telemetry", "❌ Error while re-dialing {}: {:?}", self.addr, err);
socket = NodeSocket::wait_reconnect();
},
}
},
NodeSocket::WaitingReconnect(mut s) => {
if Future::poll(Pin::new(&mut s), cx).is_ready() {
Expand Down