diff --git a/Cargo.lock b/Cargo.lock index 65f5935a1e900..402dfb6ec9e1f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6911,6 +6911,7 @@ version = "0.8.0-rc6" dependencies = [ "assert_matches", "async-std", + "async-trait", "bitflags", "bs58", "bytes 0.5.6", diff --git a/client/cli/src/params/network_params.rs b/client/cli/src/params/network_params.rs index 4a33644e8934e..faaf2c2bd210d 100644 --- a/client/cli/src/params/network_params.rs +++ b/client/cli/src/params/network_params.rs @@ -148,6 +148,7 @@ impl NetworkParams { listen_addresses, public_addresses, notifications_protocols: Vec::new(), + request_response_protocols: Vec::new(), node_key, node_name: node_name.to_string(), client_version: client_id.to_string(), diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index de885bc65a45c..d5729ae06b2ca 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -16,6 +16,7 @@ targets = ["x86_64-unknown-linux-gnu"] prost-build = "0.6.1" [dependencies] +async-trait = "0.1" async-std = { version = "1.6.2", features = ["unstable"] } bitflags = "1.2.0" bs58 = "0.3.1" @@ -64,7 +65,7 @@ zeroize = "1.0.0" [dependencies.libp2p] version = "0.24.0" default-features = false -features = ["identify", "kad", "mdns-async-std", "mplex", "noise", "ping", "tcp-async-std", "websocket", "yamux"] +features = ["identify", "kad", "mdns-async-std", "mplex", "noise", "ping", "request-response", "tcp-async-std", "websocket", "yamux"] [dev-dependencies] assert_matches = "1.3" diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 20b5adf76b832..4a47a26f55c24 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -16,7 +16,7 @@ use crate::{ config::{ProtocolId, Role}, block_requests, light_client_handler, finality_requests, - peer_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, + peer_info, request_responses, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, protocol::{message::{self, Roles}, CustomMessageOutcome, NotificationsSink, Protocol}, ObservedRole, DhtEvent, ExHashT, }; @@ -39,6 +39,10 @@ use std::{ time::Duration, }; +pub use crate::request_responses::{ + ResponseFailure, InboundFailure, RequestFailure, OutboundFailure, RequestId, SendRequestError +}; + /// General behaviour of the network. Combines all protocols together. #[derive(NetworkBehaviour)] #[behaviour(out_event = "BehaviourOut", poll_method = "poll")] @@ -50,6 +54,8 @@ pub struct Behaviour { peer_info: peer_info::PeerInfoBehaviour, /// Discovers nodes of the network. discovery: DiscoveryBehaviour, + /// Generic request-reponse protocols. + request_responses: request_responses::RequestResponsesBehaviour, /// Block request handling. block_requests: block_requests::BlockRequests, /// Finality proof request handling. @@ -76,22 +82,40 @@ pub enum BehaviourOut { RandomKademliaStarted(ProtocolId), /// We have received a request from a peer and answered it. - AnsweredRequest { + /// + /// This event is generated for statistics purposes. + InboundRequest { /// Peer which sent us a request. peer: PeerId, /// Protocol name of the request. - protocol: String, - /// Time it took to build the response. - build_time: Duration, + protocol: Cow<'static, str>, + /// If `Ok`, contains the time elapsed between when we received the request and when we + /// sent back the response. If `Err`, the error that happened. + result: Result, }, + + /// A request initiated using [`Behaviour::send_request`] has succeeded or failed. + RequestFinished { + /// Request that has succeeded. + request_id: RequestId, + /// Response sent by the remote or reason for failure. + result: Result, RequestFailure>, + }, + /// Started a new request with the given node. - RequestStarted { + /// + /// This event is for statistics purposes only. The request and response handling are entirely + /// internal to the behaviour. + OpaqueRequestStarted { peer: PeerId, /// Protocol name of the request. protocol: String, }, /// Finished, successfully or not, a previously-started request. - RequestFinished { + /// + /// This event is for statistics purposes only. The request and response handling are entirely + /// internal to the behaviour. + OpaqueRequestFinished { /// Who we were requesting. peer: PeerId, /// Protocol name of the request. @@ -161,17 +185,20 @@ impl Behaviour { finality_proof_requests: finality_requests::FinalityProofRequests, light_client_handler: light_client_handler::LightClientHandler, disco_config: DiscoveryConfig, - ) -> Self { - Behaviour { + request_response_protocols: Vec, + ) -> Result { + Ok(Behaviour { substrate, peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key), discovery: disco_config.finish(), + request_responses: + request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?, block_requests, finality_proof_requests, light_client_handler, events: VecDeque::new(), role, - } + }) } /// Returns the list of nodes that we know exist in the network. @@ -208,6 +235,16 @@ impl Behaviour { self.peer_info.node(peer_id) } + /// Initiates sending a request. + /// + /// An error is returned if we are not connected to the target peer of if the protocol doesn't + /// match one that has been registered. + pub fn send_request(&mut self, target: &PeerId, protocol: &str, request: Vec) + -> Result + { + self.request_responses.send_request(target, protocol, request) + } + /// Registers a new notifications protocol. /// /// Please call `event_stream` before registering a protocol, otherwise you may miss events @@ -298,18 +335,18 @@ Behaviour { CustomMessageOutcome::BlockRequest { target, request } => { match self.block_requests.send_request(&target, request) { block_requests::SendRequestOutcome::Ok => { - self.events.push_back(BehaviourOut::RequestStarted { + self.events.push_back(BehaviourOut::OpaqueRequestStarted { peer: target, protocol: self.block_requests.protocol_name().to_owned(), }); }, block_requests::SendRequestOutcome::Replaced { request_duration, .. } => { - self.events.push_back(BehaviourOut::RequestFinished { + self.events.push_back(BehaviourOut::OpaqueRequestFinished { peer: target.clone(), protocol: self.block_requests.protocol_name().to_owned(), request_duration, }); - self.events.push_back(BehaviourOut::RequestStarted { + self.events.push_back(BehaviourOut::OpaqueRequestStarted { peer: target, protocol: self.block_requests.protocol_name().to_owned(), }); @@ -358,18 +395,39 @@ Behaviour { } } +impl NetworkBehaviourEventProcess for Behaviour { + fn inject_event(&mut self, event: request_responses::Event) { + match event { + request_responses::Event::InboundRequest { peer, protocol, result } => { + self.events.push_back(BehaviourOut::InboundRequest { + peer, + protocol, + result, + }); + } + + request_responses::Event::RequestFinished { request_id, result } => { + self.events.push_back(BehaviourOut::RequestFinished { + request_id, + result, + }); + }, + } + } +} + impl NetworkBehaviourEventProcess> for Behaviour { fn inject_event(&mut self, event: block_requests::Event) { match event { block_requests::Event::AnsweredRequest { peer, total_handling_time } => { - self.events.push_back(BehaviourOut::AnsweredRequest { + self.events.push_back(BehaviourOut::InboundRequest { peer, - protocol: self.block_requests.protocol_name().to_owned(), - build_time: total_handling_time, + protocol: self.block_requests.protocol_name().to_owned().into(), + result: Ok(total_handling_time), }); }, block_requests::Event::Response { peer, original_request: _, response, request_duration } => { - self.events.push_back(BehaviourOut::RequestFinished { + self.events.push_back(BehaviourOut::OpaqueRequestFinished { peer: peer.clone(), protocol: self.block_requests.protocol_name().to_owned(), request_duration, @@ -381,7 +439,7 @@ impl NetworkBehaviourEventProcess { // There doesn't exist any mechanism to report cancellations or timeouts yet, so // we process them by disconnecting the node. - self.events.push_back(BehaviourOut::RequestFinished { + self.events.push_back(BehaviourOut::OpaqueRequestFinished { peer: peer.clone(), protocol: self.block_requests.protocol_name().to_owned(), request_duration, diff --git a/client/network/src/config.rs b/client/network/src/config.rs index bde34a0a5716a..5185befacf5ae 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -23,6 +23,7 @@ pub use crate::chain::{Client, FinalityProofProvider}; pub use crate::on_demand_layer::{AlwaysBadChecker, OnDemand}; +pub use crate::request_responses::{IncomingRequest, ProtocolConfig as RequestResponseConfig}; pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multiaddr}; // Note: this re-export shouldn't be part of the public API of the crate and will be removed in @@ -34,9 +35,10 @@ use crate::ExHashT; use core::{fmt, iter}; use futures::future; -use libp2p::identity::{ed25519, Keypair}; -use libp2p::wasm_ext; -use libp2p::{multiaddr, Multiaddr, PeerId}; +use libp2p::{ + identity::{ed25519, Keypair}, + multiaddr, wasm_ext, Multiaddr, PeerId, +}; use prometheus_endpoint::Registry; use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue}; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; @@ -414,6 +416,8 @@ pub struct NetworkConfiguration { /// List of notifications protocols that the node supports. Must also include a /// `ConsensusEngineId` for backwards-compatibility. pub notifications_protocols: Vec<(ConsensusEngineId, Cow<'static, [u8]>)>, + /// List of request-response protocols that the node supports. + pub request_response_protocols: Vec, /// Maximum allowed number of incoming connections. pub in_peers: u32, /// Number of outgoing connections we're trying to maintain. @@ -449,6 +453,7 @@ impl NetworkConfiguration { boot_nodes: Vec::new(), node_key, notifications_protocols: Vec::new(), + request_response_protocols: Vec::new(), in_peers: 25, out_peers: 75, reserved_nodes: Vec::new(), @@ -465,9 +470,7 @@ impl NetworkConfiguration { allow_non_globals_in_dht: false, } } -} -impl NetworkConfiguration { /// Create new default configuration for localhost-only connection with random port (useful for testing) pub fn new_local() -> NetworkConfiguration { let mut config = NetworkConfiguration::new( diff --git a/client/network/src/error.rs b/client/network/src/error.rs index d5a4024ef53d7..7d7603ce92aab 100644 --- a/client/network/src/error.rs +++ b/client/network/src/error.rs @@ -21,7 +21,7 @@ use crate::config::TransportConfig; use libp2p::{PeerId, Multiaddr}; -use std::fmt; +use std::{borrow::Cow, fmt}; /// Result type alias for the network. pub type Result = std::result::Result; @@ -61,6 +61,12 @@ pub enum Error { /// The invalid addresses. addresses: Vec, }, + /// The same request-response protocol has been registered multiple times. + #[display(fmt = "Request-response protocol registered multiple times: {}", protocol)] + DuplicateRequestResponseProtocol { + /// Name of the protocol registered multiple times. + protocol: Cow<'static, str>, + }, } // Make `Debug` use the `Display` implementation. @@ -78,6 +84,7 @@ impl std::error::Error for Error { Error::DuplicateBootnode { .. } => None, Error::Prometheus(ref err) => Some(err), Error::AddressesForAnotherTransport { .. } => None, + Error::DuplicateRequestResponseProtocol { .. } => None, } } } diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index e01b260263566..326d73c372110 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -253,6 +253,7 @@ mod finality_requests; mod light_client_handler; mod on_demand_layer; mod protocol; +mod request_responses; mod schema; mod service; mod transport; @@ -263,13 +264,10 @@ pub mod error; pub mod gossip; pub mod network_state; -pub use service::{NetworkService, NetworkWorker}; -pub use protocol::PeerInfo; -pub use protocol::event::{Event, DhtEvent, ObservedRole}; -pub use protocol::sync::SyncState; -pub use libp2p::{Multiaddr, PeerId}; #[doc(inline)] -pub use libp2p::multiaddr; +pub use libp2p::{multiaddr, Multiaddr, PeerId}; +pub use protocol::{event::{DhtEvent, Event, ObservedRole}, sync::SyncState, PeerInfo}; +pub use service::{NetworkService, NetworkWorker, RequestFailure, OutboundFailure}; pub use sc_peerset::ReputationChange; use sp_runtime::traits::{Block as BlockT, NumberFor}; diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs new file mode 100644 index 0000000000000..92233c77d6bd1 --- /dev/null +++ b/client/network/src/request_responses.rs @@ -0,0 +1,892 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Collection of request-response protocols. +//! +//! The [`RequestResponses`] struct defined in this module provides support for zero or more +//! so-called "request-response" protocols. +//! +//! A request-response protocol works in the following way: +//! +//! - For every emitted request, a new substream is open and the protocol is negotiated. If the +//! remote supports the protocol, the size of the request is sent as a LEB128 number, followed +//! with the request itself. The remote then sends the size of the response as a LEB128 number, +//! followed with the response. +//! +//! - Requests have a certain time limit before they time out. This time includes the time it +//! takes to send/receive the request and response. +//! +//! - If provided, a ["requests processing"](RequestResponseConfig::inbound_queue) channel +//! is used to handle incoming requests. +//! + +use futures::{channel::{mpsc, oneshot}, prelude::*}; +use libp2p::{ + core::{ + connection::{ConnectionId, ListenerId}, + ConnectedPoint, Multiaddr, PeerId, + }, + request_response::{ + RequestResponse, RequestResponseCodec, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, ResponseChannel, ProtocolSupport + }, + swarm::{ + protocols_handler::multi::MultiHandler, NetworkBehaviour, NetworkBehaviourAction, + PollParameters, ProtocolsHandler, + }, +}; +use std::{ + borrow::Cow, collections::{hash_map::Entry, HashMap}, convert::TryFrom as _, io, iter, + pin::Pin, task::{Context, Poll}, time::Duration, +}; + +pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId}; + +/// Configuration for a single request-response protocol. +#[derive(Debug, Clone)] +pub struct ProtocolConfig { + /// Name of the protocol on the wire. Should be something like `/foo/bar`. + pub name: Cow<'static, str>, + + /// Maximum allowed size, in bytes, of a request. + /// + /// Any request larger than this value will be declined as a way to avoid allocating too + /// much memory for it. + pub max_request_size: u64, + + /// Maximum allowed size, in bytes, of a response. + /// + /// Any response larger than this value will be declined as a way to avoid allocating too + /// much memory for it. + pub max_response_size: u64, + + /// Duration after which emitted requests are considered timed out. + /// + /// If you expect the response to come back quickly, you should set this to a smaller duration. + pub request_timeout: Duration, + + /// Channel on which the networking service will send incoming requests. + /// + /// Every time a peer sends a request to the local node using this protocol, the networking + /// service will push an element on this channel. The receiving side of this channel then has + /// to pull this element, process the request, and send back the response to send back to the + /// peer. + /// + /// The size of the channel has to be carefully chosen. If the channel is full, the networking + /// service will discard the incoming request send back an error to the peer. Consequently, + /// the channel being full is an indicator that the node is overloaded. + /// + /// You can typically set the size of the channel to `T / d`, where `T` is the + /// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to + /// build a response. + /// + /// Can be `None` if the local node does not support answering incoming requests. + /// If this is `None`, then the local node will not advertise support for this protocol towards + /// other peers. If this is `Some` but the channel is closed, then the local node will + /// advertise support for this protocol, but any incoming request will lead to an error being + /// sent back. + pub inbound_queue: Option>, +} + +/// A single request received by a peer on a request-response protocol. +#[derive(Debug)] +pub struct IncomingRequest { + /// Who sent the request. + pub peer: PeerId, + + /// Request sent by the remote. Will always be smaller than + /// [`RequestResponseConfig::max_request_size`]. + pub payload: Vec, + + /// Channel to send back the response to. + pub pending_response: oneshot::Sender>, +} + +/// Event generated by the [`RequestResponsesBehaviour`]. +#[derive(Debug)] +pub enum Event { + /// A remote sent a request and either we have successfully answered it or an error happened. + /// + /// This event is generated for statistics purposes. + InboundRequest { + /// Peer which has emitted the request. + peer: PeerId, + /// Name of the protocol in question. + protocol: Cow<'static, str>, + /// If `Ok`, contains the time elapsed between when we received the request and when we + /// sent back the response. If `Err`, the error that happened. + result: Result, + }, + + /// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or + /// failed. + RequestFinished { + /// Request that has succeeded. + request_id: RequestId, + /// Response sent by the remote or reason for failure. + result: Result, RequestFailure>, + }, +} + +/// Implementation of `NetworkBehaviour` that provides support for request-response protocols. +pub struct RequestResponsesBehaviour { + /// The multiple sub-protocols, by name. + /// Contains the underlying libp2p `RequestResponse` behaviour, plus an optional + /// "response builder" used to build responses for incoming requests. + protocols: HashMap< + Cow<'static, str>, + (RequestResponse, Option>) + >, + + /// Whenever an incoming request arrives, a `Future` is added to this list and will yield the + /// response to send back to the remote. + pending_responses: stream::FuturesUnordered< + Pin + Send>> + >, +} + +/// Generated by the response builder and waiting to be processed. +enum RequestProcessingOutcome { + Response { + protocol: Cow<'static, str>, + inner_channel: ResponseChannel, ()>>, + response: Vec, + }, + Busy { + peer: PeerId, + protocol: Cow<'static, str>, + }, +} + +impl RequestResponsesBehaviour { + /// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if + /// the same protocol is passed twice. + pub fn new(list: impl Iterator) -> Result { + let mut protocols = HashMap::new(); + for protocol in list { + let mut cfg = RequestResponseConfig::default(); + cfg.set_connection_keep_alive(Duration::from_secs(10)); + cfg.set_request_timeout(protocol.request_timeout); + + let protocol_support = if protocol.inbound_queue.is_some() { + ProtocolSupport::Full + } else { + ProtocolSupport::Outbound + }; + + let rq_rp = RequestResponse::new(GenericCodec { + max_request_size: protocol.max_request_size, + max_response_size: protocol.max_response_size, + }, iter::once((protocol.name.as_bytes().to_vec(), protocol_support)), cfg); + + match protocols.entry(protocol.name) { + Entry::Vacant(e) => e.insert((rq_rp, protocol.inbound_queue)), + Entry::Occupied(e) => + return Err(RegisterError::DuplicateProtocol(e.key().clone())), + }; + } + + Ok(Self { + protocols, + pending_responses: stream::FuturesUnordered::new(), + }) + } + + /// Initiates sending a request. + /// + /// An error is returned if we are not connected to the target peer or if the protocol doesn't + /// match one that has been registered. + pub fn send_request(&mut self, target: &PeerId, protocol: &str, request: Vec) + -> Result + { + if let Some((protocol, _)) = self.protocols.get_mut(protocol) { + if protocol.is_connected(target) { + Ok(protocol.send_request(target, request)) + } else { + Err(SendRequestError::NotConnected) + } + } else { + Err(SendRequestError::UnknownProtocol) + } + } +} + +impl NetworkBehaviour for RequestResponsesBehaviour { + type ProtocolsHandler = MultiHandler< + String, + as NetworkBehaviour>::ProtocolsHandler, + >; + type OutEvent = Event; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + let iter = self.protocols.iter_mut() + .map(|(p, (r, _))| (p.to_string(), NetworkBehaviour::new_handler(r))); + + MultiHandler::try_from_iter(iter) + .expect("Protocols are in a HashMap and there can be at most one handler per \ + protocol name, which is the only possible error; qed") + } + + fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { + Vec::new() + } + + fn inject_connection_established( + &mut self, + peer_id: &PeerId, + conn: &ConnectionId, + endpoint: &ConnectedPoint, + ) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_connection_established(p, peer_id, conn, endpoint) + } + } + + fn inject_connected(&mut self, peer_id: &PeerId) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_connected(p, peer_id) + } + } + + fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_connection_closed(p, peer_id, conn, endpoint) + } + } + + fn inject_disconnected(&mut self, peer_id: &PeerId) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_disconnected(p, peer_id) + } + } + + fn inject_addr_reach_failure( + &mut self, + peer_id: Option<&PeerId>, + addr: &Multiaddr, + error: &dyn std::error::Error + ) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_addr_reach_failure(p, peer_id, addr, error) + } + } + + fn inject_event( + &mut self, + peer_id: PeerId, + connection: ConnectionId, + (p_name, event): ::OutEvent, + ) { + if let Some((proto, _)) = self.protocols.get_mut(&*p_name) { + return proto.inject_event(peer_id, connection, event) + } + + log::warn!(target: "sub-libp2p", + "inject_node_event: no request-response instance registered for protocol {:?}", + p_name) + } + + fn inject_new_external_addr(&mut self, addr: &Multiaddr) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_new_external_addr(p, addr) + } + } + + fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_expired_listen_addr(p, addr) + } + } + + fn inject_dial_failure(&mut self, peer_id: &PeerId) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_dial_failure(p, peer_id) + } + } + + fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_new_listen_addr(p, addr) + } + } + + fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_listener_error(p, id, err) + } + } + + fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_listener_closed(p, id, reason) + } + } + + fn poll( + &mut self, + cx: &mut Context, + params: &mut impl PollParameters, + ) -> Poll< + NetworkBehaviourAction< + ::InEvent, + Self::OutEvent, + >, + > { + 'poll_all: loop { + // Poll to see if any response is ready to be sent back. + while let Poll::Ready(Some(result)) = self.pending_responses.poll_next_unpin(cx) { + match result { + RequestProcessingOutcome::Response { + protocol, inner_channel, response + } => { + if let Some((protocol, _)) = self.protocols.get_mut(&*protocol) { + protocol.send_response(inner_channel, Ok(response)); + } + } + RequestProcessingOutcome::Busy { peer, protocol } => { + let out = Event::InboundRequest { + peer, + protocol, + result: Err(ResponseFailure::Busy), + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); + } + } + } + + // Poll request-responses protocols. + for (protocol, (behaviour, resp_builder)) in &mut self.protocols { + while let Poll::Ready(ev) = behaviour.poll(cx, params) { + let ev = match ev { + // Main events we are interested in. + NetworkBehaviourAction::GenerateEvent(ev) => ev, + + // Other events generated by the underlying behaviour are transparently + // passed through. + NetworkBehaviourAction::DialAddress { address } => { + log::error!("The request-response isn't supposed to start dialing peers"); + return Poll::Ready(NetworkBehaviourAction::DialAddress { address }) + } + NetworkBehaviourAction::DialPeer { peer_id, condition } => { + log::error!("The request-response isn't supposed to start dialing peers"); + return Poll::Ready(NetworkBehaviourAction::DialPeer { + peer_id, + condition, + }) + } + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event, + } => { + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event: ((*protocol).to_string(), event), + }) + } + NetworkBehaviourAction::ReportObservedAddr { address } => { + return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { + address, + }) + } + }; + + match ev { + // Received a request from a remote. + RequestResponseEvent::Message { + peer, + message: RequestResponseMessage::Request { request, channel }, + } => { + let (tx, rx) = oneshot::channel(); + + // Submit the request to the "response builder" passed by the user at + // initialization. + if let Some(resp_builder) = resp_builder { + // If the response builder is too busy, silently drop `tx`. + // This will be reported as a `Busy` error. + let _ = resp_builder.try_send(IncomingRequest { + peer: peer.clone(), + payload: request, + pending_response: tx, + }); + } + + let protocol = protocol.clone(); + self.pending_responses.push(Box::pin(async move { + // The `tx` created above can be dropped if we are not capable of + // processing this request, which is reflected as a "Busy" error. + if let Ok(response) = rx.await { + RequestProcessingOutcome::Response { + protocol, inner_channel: channel, response + } + } else { + RequestProcessingOutcome::Busy { peer, protocol } + } + })); + + // This `continue` makes sure that `pending_responses` gets polled + // after we have added the new element. + continue 'poll_all; + } + + // Received a response from a remote to one of our requests. + RequestResponseEvent::Message { + message: + RequestResponseMessage::Response { + request_id, + response, + }, + .. + } => { + let out = Event::RequestFinished { + request_id, + result: response.map_err(|()| RequestFailure::Refused), + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); + } + + // One of our requests has failed. + RequestResponseEvent::OutboundFailure { + request_id, + error, + .. + } => { + let out = Event::RequestFinished { + request_id, + result: Err(RequestFailure::Network(error)), + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); + } + + // Remote has tried to send a request but failed. + RequestResponseEvent::InboundFailure { peer, error } => { + let out = Event::InboundRequest { + peer, + protocol: protocol.clone(), + result: Err(ResponseFailure::Network(error)), + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); + } + }; + } + } + + break Poll::Pending; + } + } +} + +/// Error when registering a protocol. +#[derive(Debug, derive_more::Display, derive_more::Error)] +pub enum RegisterError { + /// A protocol has been specified multiple times. + DuplicateProtocol(#[error(ignore)] Cow<'static, str>), +} + +/// Error when sending a request. +#[derive(Debug, derive_more::Display, derive_more::Error)] +pub enum SendRequestError { + /// We are not currently connected to the requested peer. + NotConnected, + /// Given protocol hasn't been registered. + UnknownProtocol, +} + +/// Error in a request. +#[derive(Debug, derive_more::Display, derive_more::Error)] +pub enum RequestFailure { + /// Remote has closed the substream before answering, thereby signaling that it considers the + /// request as valid, but refused to answer it. + Refused, + /// Problem on the network. + #[display(fmt = "Problem on the network")] + Network(#[error(ignore)] OutboundFailure), +} + +/// Error when processing a request sent by a remote. +#[derive(Debug, derive_more::Display, derive_more::Error)] +pub enum ResponseFailure { + /// Internal response builder is too busy to process this request. + Busy, + /// Problem on the network. + #[display(fmt = "Problem on the network")] + Network(#[error(ignore)] InboundFailure), +} + +/// Implements the libp2p [`RequestResponseCodec`] trait. Defines how streams of bytes are turned +/// into requests and responses and vice-versa. +#[derive(Debug, Clone)] +#[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler. +pub struct GenericCodec { + max_request_size: u64, + max_response_size: u64, +} + +#[async_trait::async_trait] +impl RequestResponseCodec for GenericCodec { + type Protocol = Vec; + type Request = Vec; + type Response = Result, ()>; + + async fn read_request( + &mut self, + _: &Self::Protocol, + mut io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + // Read the length. + let length = unsigned_varint::aio::read_usize(&mut io).await + .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; + if length > usize::try_from(self.max_request_size).unwrap_or(usize::max_value()) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("Request size exceeds limit: {} > {}", length, self.max_request_size) + )); + } + + // Read the payload. + let mut buffer = vec![0; length]; + io.read_exact(&mut buffer).await?; + Ok(buffer) + } + + async fn read_response( + &mut self, + _: &Self::Protocol, + mut io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + // Note that this function returns a `Result>`. Returning an `Err` is + // considered as a protocol error and will result in the entire connection being closed. + // Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and + // that this response is an error. + + // Read the length. + let length = match unsigned_varint::aio::read_usize(&mut io).await { + Ok(l) => l, + Err(unsigned_varint::io::ReadError::Io(err)) + if matches!(err.kind(), io::ErrorKind::UnexpectedEof) => + { + return Ok(Err(())); + } + Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)), + }; + + if length > usize::try_from(self.max_response_size).unwrap_or(usize::max_value()) { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("Response size exceeds limit: {} > {}", length, self.max_response_size) + )); + } + + // Read the payload. + let mut buffer = vec![0; length]; + io.read_exact(&mut buffer).await?; + Ok(Ok(buffer)) + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + // TODO: check the length? + // Write the length. + { + let mut buffer = unsigned_varint::encode::usize_buffer(); + io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?; + } + + // Write the payload. + io.write_all(&req).await?; + + io.close().await?; + Ok(()) + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + // If `res` is an `Err`, we jump to closing the substream without writing anything on it. + if let Ok(res) = res { + // TODO: check the length? + // Write the length. + { + let mut buffer = unsigned_varint::encode::usize_buffer(); + io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?; + } + + // Write the payload. + io.write_all(&res).await?; + } + + io.close().await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use futures::{channel::mpsc, prelude::*}; + use libp2p::identity::Keypair; + use libp2p::Multiaddr; + use libp2p::core::upgrade; + use libp2p::core::transport::{Transport, MemoryTransport}; + use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt}; + use libp2p::swarm::{Swarm, SwarmEvent}; + use std::{iter, time::Duration}; + + #[test] + fn basic_request_response_works() { + let protocol_name = "/test/req-rep/1"; + + // Build swarms whose behaviour is `RequestResponsesBehaviour`. + let mut swarms = (0..2) + .map(|_| { + let keypair = Keypair::generate_ed25519(); + let keypair2 = keypair.clone(); + + let transport = MemoryTransport + .and_then(move |out, endpoint| { + let secio = libp2p::secio::SecioConfig::new(keypair2); + libp2p::core::upgrade::apply( + out, + secio, + endpoint, + upgrade::Version::V1 + ) + }) + .and_then(move |(peer_id, stream), endpoint| { + let peer_id2 = peer_id.clone(); + let upgrade = libp2p::yamux::Config::default() + .map_inbound(move |muxer| (peer_id, muxer)) + .map_outbound(move |muxer| (peer_id2, muxer)); + upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1) + }); + + let behaviour = { + let (tx, mut rx) = mpsc::channel(64); + + let b = super::RequestResponsesBehaviour::new(iter::once(super::ProtocolConfig { + name: From::from(protocol_name), + max_request_size: 1024, + max_response_size: 1024 * 1024, + request_timeout: Duration::from_secs(30), + inbound_queue: Some(tx), + })).unwrap(); + + async_std::task::spawn(async move { + while let Some(rq) = rx.next().await { + assert_eq!(rq.payload, b"this is a request"); + let _ = rq.pending_response.send(b"this is a response".to_vec()); + } + }); + + b + }; + + let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id()); + let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); + + Swarm::listen_on(&mut swarm, listen_addr.clone()).unwrap(); + (swarm, listen_addr) + }) + .collect::>(); + + // Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in + // this test, so they wouldn't connect to each other. + { + let dial_addr = swarms[1].1.clone(); + Swarm::dial_addr(&mut swarms[0].0, dial_addr).unwrap(); + } + + // Running `swarm[0]` in the background until a `InboundRequest` event happens, + // which is a hint about the test having ended. + async_std::task::spawn({ + let (mut swarm, _) = swarms.remove(0); + async move { + loop { + match swarm.next_event().await { + SwarmEvent::Behaviour(super::Event::InboundRequest { result, .. }) => { + assert!(result.is_ok()); + break + }, + _ => {} + } + } + } + }); + + // Remove and run the remaining swarm. + let (mut swarm, _) = swarms.remove(0); + async_std::task::block_on(async move { + let mut sent_request_id = None; + + loop { + match swarm.next_event().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let id = swarm.send_request( + &peer_id, + protocol_name, + b"this is a request".to_vec() + ).unwrap(); + assert!(sent_request_id.is_none()); + sent_request_id = Some(id); + } + SwarmEvent::Behaviour(super::Event::RequestFinished { + request_id, + result, + }) => { + assert_eq!(Some(request_id), sent_request_id); + let result = result.unwrap(); + assert_eq!(result, b"this is a response"); + break; + } + _ => {} + } + } + }); + } + + #[test] + fn max_response_size_exceeded() { + let protocol_name = "/test/req-rep/1"; + + // Build swarms whose behaviour is `RequestResponsesBehaviour`. + let mut swarms = (0..2) + .map(|_| { + let keypair = Keypair::generate_ed25519(); + let keypair2 = keypair.clone(); + + let transport = MemoryTransport + .and_then(move |out, endpoint| { + let secio = libp2p::secio::SecioConfig::new(keypair2); + libp2p::core::upgrade::apply( + out, + secio, + endpoint, + upgrade::Version::V1 + ) + }) + .and_then(move |(peer_id, stream), endpoint| { + let peer_id2 = peer_id.clone(); + let upgrade = libp2p::yamux::Config::default() + .map_inbound(move |muxer| (peer_id, muxer)) + .map_outbound(move |muxer| (peer_id2, muxer)); + upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1) + }); + + let behaviour = { + let (tx, mut rx) = mpsc::channel(64); + + let b = super::RequestResponsesBehaviour::new(iter::once(super::ProtocolConfig { + name: From::from(protocol_name), + max_request_size: 1024, + max_response_size: 8, // <-- important for the test + request_timeout: Duration::from_secs(30), + inbound_queue: Some(tx), + })).unwrap(); + + async_std::task::spawn(async move { + while let Some(rq) = rx.next().await { + assert_eq!(rq.payload, b"this is a request"); + let _ = rq.pending_response.send(b"this response exceeds the limit".to_vec()); + } + }); + + b + }; + + let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id()); + let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); + + Swarm::listen_on(&mut swarm, listen_addr.clone()).unwrap(); + (swarm, listen_addr) + }) + .collect::>(); + + // Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in + // this test, so they wouldn't connect to each other. + { + let dial_addr = swarms[1].1.clone(); + Swarm::dial_addr(&mut swarms[0].0, dial_addr).unwrap(); + } + + // Running `swarm[0]` in the background until a `InboundRequest` event happens, + // which is a hint about the test having ended. + async_std::task::spawn({ + let (mut swarm, _) = swarms.remove(0); + async move { + loop { + match swarm.next_event().await { + SwarmEvent::Behaviour(super::Event::InboundRequest { result, .. }) => { + assert!(result.is_ok()); + break + }, + _ => {} + } + } + } + }); + + // Remove and run the remaining swarm. + let (mut swarm, _) = swarms.remove(0); + async_std::task::block_on(async move { + let mut sent_request_id = None; + + loop { + match swarm.next_event().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let id = swarm.send_request( + &peer_id, + protocol_name, + b"this is a request".to_vec() + ).unwrap(); + assert!(sent_request_id.is_none()); + sent_request_id = Some(id); + } + SwarmEvent::Behaviour(super::Event::RequestFinished { + request_id, + result, + }) => { + assert_eq!(Some(request_id), sent_request_id); + match result { + Err(super::RequestFailure::Network(super::OutboundFailure::ConnectionClosed)) => {}, + _ => panic!() + } + break; + } + _ => {} + } + } + }); + } +} diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 6f7751f430751..754b5b184c096 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -29,7 +29,7 @@ use crate::{ ExHashT, NetworkStateInfo, - behaviour::{Behaviour, BehaviourOut}, + behaviour::{self, Behaviour, BehaviourOut}, config::{parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig}, DhtEvent, discovery::DiscoveryConfig, @@ -42,7 +42,7 @@ use crate::{ protocol::{self, event::Event, NotifsHandlerError, LegacyConnectionKillError, NotificationsSink, Ready, sync::SyncState, PeerInfo, Protocol}, transport, ReputationChange, }; -use futures::prelude::*; +use futures::{channel::oneshot, prelude::*}; use libp2p::{PeerId, multiaddr, Multiaddr}; use libp2p::core::{ConnectedPoint, Executor, connection::{ConnectionError, PendingConnectionError}, either::EitherError}; use libp2p::kad::record; @@ -76,6 +76,9 @@ use std::{ }, task::Poll, }; +use wasm_timer::Instant; + +pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure}; mod out_events; #[cfg(test)] @@ -309,16 +312,28 @@ impl NetworkWorker { config }; - let mut behaviour = Behaviour::new( - protocol, - params.role, - user_agent, - local_public, - block_requests, - finality_proof_requests, - light_client_handler, - discovery_config - ); + let mut behaviour = { + let result = Behaviour::new( + protocol, + params.role, + user_agent, + local_public, + block_requests, + finality_proof_requests, + light_client_handler, + discovery_config, + params.network_config.request_response_protocols, + ); + + match result { + Ok(b) => b, + Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) => { + return Err(Error::DuplicateRequestResponseProtocol { + protocol: proto, + }) + }, + } + }; for (engine_id, protocol_name) in ¶ms.network_config.notifications_protocols { behaviour.register_notifications_protocol(*engine_id, protocol_name.clone()); @@ -404,6 +419,7 @@ impl NetworkWorker { peers_notifications_sinks, metrics, boot_node_ids, + pending_requests: HashMap::with_capacity(128), }) } @@ -752,12 +768,50 @@ impl NetworkService { /// parameter is a `&'static str`, and not a `String`, in order to avoid accidentally having /// an unbounded set of Prometheus metrics, which would be quite bad in terms of memory pub fn event_stream(&self, name: &'static str) -> impl Stream { - // Note: when transitioning to stable futures, remove the `Error` entirely let (tx, rx) = out_events::channel(name); let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx)); rx } + /// Sends a single targeted request to a specific peer. On success, returns the response of + /// the peer. + /// + /// Request-response protocols are a way to complement notifications protocols, but + /// notifications should remain the default ways of communicating information. For example, a + /// peer can announce something through a notification, after which the recipient can obtain + /// more information by performing a request. + /// As such, this function is meant to be called only with peers we are already connected to. + /// Calling this method with a `target` we are not connected to will *not* attempt to connect + /// to said peer. + /// + /// No limit or throttling of concurrent outbound requests per peer and protocol are enforced. + /// Such restrictions, if desired, need to be enforced at the call site(s). + /// + /// The protocol must have been registered through + /// [`NetworkConfiguration::request_response_protocols`]. + pub async fn request( + &self, + target: PeerId, + protocol: impl Into>, + request: Vec + ) -> Result, RequestFailure> { + let (tx, rx) = oneshot::channel(); + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request { + target, + protocol: protocol.into(), + request, + pending_response: tx + }); + + match rx.await { + Ok(v) => v, + // The channel can only be closed if the network worker no longer exists. If the + // network worker no longer exists, then all connections to `target` are necessarily + // closed, and we legitimately report this situation as a "ConnectionClosed". + Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)), + } + } + /// Registers a new notifications protocol. /// /// After a protocol has been registered, you can call `write_notifications`. @@ -1096,6 +1150,12 @@ enum ServiceToWorkerMsg { AddKnownAddress(PeerId, Multiaddr), SyncFork(Vec, B::Hash, NumberFor), EventStream(out_events::Sender), + Request { + target: PeerId, + protocol: Cow<'static, str>, + request: Vec, + pending_response: oneshot::Sender, RequestFailure>>, + }, RegisterNotifProtocol { engine_id: ConsensusEngineId, protocol_name: Cow<'static, [u8]>, @@ -1132,6 +1192,13 @@ pub struct NetworkWorker { metrics: Option, /// The `PeerId`'s of all boot nodes. boot_node_ids: Arc>, + /// Requests started using [`NetworkService::request`]. Includes the channel to send back the + /// response, when the request has started, and the name of the protocol for diagnostic + /// purposes. + pending_requests: HashMap< + behaviour::RequestId, + (oneshot::Sender, RequestFailure>>, Instant, String) + >, /// For each peer and protocol combination, an object that allows sending notifications to /// that peer. Shared with the [`NetworkService`]. peers_notifications_sinks: Arc>>, @@ -1165,8 +1232,10 @@ struct Metrics { peerset_num_requested: Gauge, pending_connections: Gauge, pending_connections_errors_total: CounterVec, - requests_in_total: HistogramVec, - requests_out_finished: HistogramVec, + requests_in_failure_total: CounterVec, + requests_in_success_total: HistogramVec, + requests_out_failure_total: CounterVec, + requests_out_success_total: HistogramVec, requests_out_started_total: CounterVec, } @@ -1347,10 +1416,17 @@ impl Metrics { ), &["reason"] )?, registry)?, - requests_in_total: register(HistogramVec::new( + requests_in_failure_total: register(CounterVec::new( + Opts::new( + "sub_libp2p_requests_in_failure_total", + "Total number of incoming requests that the node has failed to answer" + ), + &["protocol", "reason"] + )?, registry)?, + requests_in_success_total: register(HistogramVec::new( HistogramOpts { common_opts: Opts::new( - "sub_libp2p_requests_in_total", + "sub_libp2p_requests_in_success_total", "Total number of requests received and answered" ), buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16) @@ -1358,11 +1434,18 @@ impl Metrics { }, &["protocol"] )?, registry)?, - requests_out_finished: register(HistogramVec::new( + requests_out_failure_total: register(CounterVec::new( + Opts::new( + "sub_libp2p_requests_out_failure_total", + "Total number of requests that have failed" + ), + &["protocol", "reason"] + )?, registry)?, + requests_out_success_total: register(HistogramVec::new( HistogramOpts { common_opts: Opts::new( - "sub_libp2p_requests_out_finished", - "Time between a request's start and finish (successful or not)" + "sub_libp2p_requests_out_success_total", + "For successful requests, time between a request's start and finish" ), buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16) .expect("parameters are always valid values; qed"), @@ -1446,6 +1529,31 @@ impl Future for NetworkWorker { this.network_service.user_protocol_mut().set_sync_fork_request(peer_ids, &hash, number), ServiceToWorkerMsg::EventStream(sender) => this.event_streams.push(sender), + ServiceToWorkerMsg::Request { target, protocol, request, pending_response } => { + // Calling `send_request` can fail immediately in some circumstances. + // This is handled by sending back an error on the channel. + match this.network_service.send_request(&target, &protocol, request) { + Ok(request_id) => { + if let Some(metrics) = this.metrics.as_ref() { + metrics.requests_out_started_total + .with_label_values(&[&protocol]) + .inc(); + } + this.pending_requests.insert( + request_id, + (pending_response, Instant::now(), protocol.to_string()) + ); + }, + Err(behaviour::SendRequestError::NotConnected) => { + let err = RequestFailure::Network(OutboundFailure::ConnectionClosed); + let _ = pending_response.send(Err(err)); + }, + Err(behaviour::SendRequestError::UnknownProtocol) => { + let err = RequestFailure::Network(OutboundFailure::UnsupportedProtocols); + let _ = pending_response.send(Err(err)); + }, + } + }, ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => { this.network_service .register_notifications_protocol(engine_id, protocol_name); @@ -1494,23 +1602,72 @@ impl Future for NetworkWorker { } this.import_queue.import_finality_proof(origin, hash, nb, proof); }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::AnsweredRequest { protocol, build_time, .. })) => { + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. })) => { if let Some(metrics) = this.metrics.as_ref() { - metrics.requests_in_total - .with_label_values(&[&protocol]) - .observe(build_time.as_secs_f64()); + match result { + Ok(serve_time) => { + metrics.requests_in_success_total + .with_label_values(&[&protocol]) + .observe(serve_time.as_secs_f64()); + } + Err(err) => { + let reason = match err { + ResponseFailure::Busy => "busy", + ResponseFailure::Network(InboundFailure::Timeout) => "timeout", + ResponseFailure::Network(InboundFailure::UnsupportedProtocols) => + "unsupported", + }; + + metrics.requests_in_failure_total + .with_label_values(&[&protocol, reason]) + .inc(); + } + } + } + }, + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { request_id, result })) => { + if let Some((send_back, started, protocol)) = this.pending_requests.remove(&request_id) { + if let Some(metrics) = this.metrics.as_ref() { + match &result { + Ok(_) => { + metrics.requests_out_success_total + .with_label_values(&[&protocol]) + .observe(started.elapsed().as_secs_f64()); + } + Err(err) => { + let reason = match err { + RequestFailure::Refused => "refused", + RequestFailure::Network(OutboundFailure::DialFailure) => + "dial-failure", + RequestFailure::Network(OutboundFailure::Timeout) => + "timeout", + RequestFailure::Network(OutboundFailure::ConnectionClosed) => + "connection-closed", + RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => + "unsupported", + }; + + metrics.requests_out_failure_total + .with_label_values(&[&protocol, reason]) + .inc(); + } + } + } + let _ = send_back.send(result); + } else { + error!("Request not in pending_requests"); } }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestStarted { protocol, .. })) => { + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::OpaqueRequestStarted { protocol, .. })) => { if let Some(metrics) = this.metrics.as_ref() { metrics.requests_out_started_total .with_label_values(&[&protocol]) .inc(); } }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { protocol, request_duration, .. })) => { + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::OpaqueRequestFinished { protocol, request_duration, .. })) => { if let Some(metrics) = this.metrics.as_ref() { - metrics.requests_out_finished + metrics.requests_out_success_total .with_label_values(&[&protocol]) .observe(request_duration.as_secs_f64()); } @@ -1635,14 +1792,14 @@ impl Future for NetworkWorker { let reason = match cause { Some(ConnectionError::IO(_)) => "transport-error", Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( - EitherError::A(EitherError::A(EitherError::B( - EitherError::A(PingFailure::Timeout))))))))) => "ping-timeout", + EitherError::A(EitherError::A(EitherError::A(EitherError::B( + EitherError::A(PingFailure::Timeout)))))))))) => "ping-timeout", Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( - EitherError::A(EitherError::A(EitherError::A( - NotifsHandlerError::Legacy(LegacyConnectionKillError))))))))) => "force-closed", + EitherError::A(EitherError::A(EitherError::A(EitherError::A( + NotifsHandlerError::Legacy(LegacyConnectionKillError)))))))))) => "force-closed", Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( - EitherError::A(EitherError::A(EitherError::A( - NotifsHandlerError::SyncNotificationsClogged)))))))) => "sync-notifications-clogged", + EitherError::A(EitherError::A(EitherError::A(EitherError::A( + NotifsHandlerError::SyncNotificationsClogged))))))))) => "sync-notifications-clogged", Some(ConnectionError::Handler(NodeHandlerWrapperError::Handler(_))) => "protocol-error", Some(ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout)) => "keep-alive-timeout", None => "actively-closed", @@ -1800,7 +1957,7 @@ impl Unpin for NetworkWorker { /// Turns bytes that are potentially UTF-8 into a reasonable representable string. /// /// Meant to be used only for debugging or metrics-reporting purposes. -fn maybe_utf8_bytes_to_string(id: &[u8]) -> Cow { +pub(crate) fn maybe_utf8_bytes_to_string(id: &[u8]) -> Cow { if let Ok(s) = std::str::from_utf8(&id[..]) { Cow::Borrowed(s) } else {