From 849a6234a6d9322c58e6fb5ff166c58e45a2b7e8 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Thu, 9 Jul 2020 17:37:18 +0200 Subject: [PATCH 01/11] Implement request-responses protocols --- Cargo.lock | 27 + client/network/Cargo.toml | 3 +- client/network/src/behaviour.rs | 98 +++- client/network/src/config.rs | 13 +- client/network/src/error.rs | 9 +- client/network/src/lib.rs | 10 +- client/network/src/request_responses.rs | 742 ++++++++++++++++++++++++ client/network/src/service.rs | 219 +++++-- client/service/src/builder.rs | 2 +- 9 files changed, 1056 insertions(+), 67 deletions(-) create mode 100644 client/network/src/request_responses.rs diff --git a/Cargo.lock b/Cargo.lock index 0b24f9ef57218..1282137c8c026 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -265,6 +265,17 @@ dependencies = [ "webpki-roots 0.19.0", ] +[[package]] +name = "async-trait" +version = "0.1.36" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a265e3abeffdce30b2e26b7a11b222fe37c6067404001b434101457d0385eb92" +dependencies = [ + "proc-macro2", + "quote 1.0.6", + "syn 1.0.33", +] + [[package]] name = "atty" version = "0.2.14" @@ -2743,6 +2754,7 @@ dependencies = [ "libp2p-mplex", "libp2p-noise", "libp2p-ping", + "libp2p-request-response", "libp2p-secio", "libp2p-swarm", "libp2p-tcp", @@ -2929,6 +2941,20 @@ dependencies = [ "wasm-timer", ] +[[package]] +name = "libp2p-request-response" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f1a4cc57489fa519b98c068277b9032f04d382e8da8c0ca040d5aa872d3bfd7" +dependencies = [ + "async-trait", + "futures 0.3.5", + "libp2p-core", + "libp2p-swarm", + "smallvec 1.4.1", + "wasm-timer", +] + [[package]] name = "libp2p-secio" version = "0.20.0" @@ -6637,6 +6663,7 @@ version = "0.8.0-rc4" dependencies = [ "assert_matches", "async-std", + "async-trait", "bitflags", "bs58", "bytes 0.5.4", diff --git a/client/network/Cargo.toml b/client/network/Cargo.toml index 85f76437259dc..4b8f27729010f 100644 --- a/client/network/Cargo.toml +++ b/client/network/Cargo.toml @@ -18,6 +18,7 @@ targets = ["x86_64-unknown-linux-gnu"] prost-build = "0.6.1" [dependencies] +async-trait = "0.1" bitflags = "1.2.0" bs58 = "0.3.1" bytes = "0.5.0" @@ -65,7 +66,7 @@ zeroize = "1.0.0" [dependencies.libp2p] version = "0.21.1" default-features = false -features = ["identify", "kad", "mdns", "mplex", "noise", "ping", "tcp-async-std", "websocket", "yamux"] +features = ["identify", "kad", "mdns", "mplex", "noise", "ping", "request-response", "tcp-async-std", "websocket", "yamux"] [dev-dependencies] async-std = "1.6.2" diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 5967613b98e46..04a779106c64f 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -16,9 +16,9 @@ use crate::{ config::{ProtocolId, Role}, block_requests, light_client_handler, finality_requests, - peer_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, + request_responses, peer_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, protocol::{message::{self, Roles}, CustomMessageOutcome, Protocol}, - Event, ObservedRole, DhtEvent, ExHashT, + service::maybe_utf8_bytes_to_string, Event, ObservedRole, DhtEvent, ExHashT, }; use codec::Encode as _; @@ -37,6 +37,10 @@ use std::{ time::Duration, }; +pub use crate::request_responses::{ + InboundError, InboundFailure, OutboundFailure, RequestId, SendRequestError +}; + /// General behaviour of the network. Combines all protocols together. #[derive(NetworkBehaviour)] #[behaviour(out_event = "BehaviourOut", poll_method = "poll")] @@ -48,6 +52,8 @@ pub struct Behaviour { peer_info: peer_info::PeerInfoBehaviour, /// Discovers nodes of the network. discovery: DiscoveryBehaviour, + /// Generic request-reponse protocols. + request_responses: request_responses::RequestResponsesBehaviour, /// Block request handling. block_requests: block_requests::BlockRequests, /// Finality proof request handling. @@ -74,22 +80,40 @@ pub enum BehaviourOut { RandomKademliaStarted(ProtocolId), /// We have received a request from a peer and answered it. - AnsweredRequest { + /// + /// This event is generated for statistics purposes. + InboundRequest { /// Peer which sent us a request. peer: PeerId, /// Protocol name of the request. - protocol: Vec, - /// Time it took to build the response. - build_time: Duration, + protocol: Cow<'static, str>, + /// If `Ok`, contains the time elapsed between when we received the request and when we + /// sent back the response. If `Err`, the error that happened. + outcome: Result, }, + + /// A request initiated using [`Behaviour::send_request`] has succeeded or failed. + RequestFinished { + /// Request that has succeeded. + request_id: RequestId, + /// Response sent by the remote or reason for failure. + outcome: Result, OutboundFailure>, + }, + /// Started a new request with the given node. - RequestStarted { + /// + /// This event is for statistics purposes only. The request and response handling are entirely + /// internal to the behaviour. + OpaqueRequestStarted { peer: PeerId, /// Protocol name of the request. protocol: Vec, }, /// Finished, successfully or not, a previously-started request. - RequestFinished { + /// + /// This event is for statistics purposes only. The request and response handling are entirely + /// internal to the behaviour. + OpaqueRequestFinished { /// Who we were requesting. peer: PeerId, /// Protocol name of the request. @@ -116,17 +140,20 @@ impl Behaviour { finality_proof_requests: finality_requests::FinalityProofRequests, light_client_handler: light_client_handler::LightClientHandler, disco_config: DiscoveryConfig, - ) -> Self { - Behaviour { + request_response_protocols: Vec, + ) -> Result { + Ok(Behaviour { substrate, peer_info: peer_info::PeerInfoBehaviour::new(user_agent, local_public_key), discovery: disco_config.finish(), + request_responses: + request_responses::RequestResponsesBehaviour::new(request_response_protocols.into_iter())?, block_requests, finality_proof_requests, light_client_handler, events: VecDeque::new(), role, - } + }) } /// Returns the list of nodes that we know exist in the network. @@ -163,6 +190,16 @@ impl Behaviour { self.peer_info.node(peer_id) } + /// Initiates sending a request. + /// + /// An error is returned if we are not connected to the target peer of if the protocol doesn't + /// match one that has been registered. + pub fn send_request(&mut self, target: &PeerId, protocol: &str, request: Vec) + -> Result + { + self.request_responses.send_request(target, protocol, request) + } + /// Registers a new notifications protocol. /// /// After that, you can call `write_notifications`. @@ -255,18 +292,18 @@ Behaviour { CustomMessageOutcome::BlockRequest { target, request } => { match self.block_requests.send_request(&target, request) { block_requests::SendRequestOutcome::Ok => { - self.events.push_back(BehaviourOut::RequestStarted { + self.events.push_back(BehaviourOut::OpaqueRequestStarted { peer: target, protocol: self.block_requests.protocol_name().to_vec(), }); }, block_requests::SendRequestOutcome::Replaced { request_duration, .. } => { - self.events.push_back(BehaviourOut::RequestFinished { + self.events.push_back(BehaviourOut::OpaqueRequestFinished { peer: target.clone(), protocol: self.block_requests.protocol_name().to_vec(), request_duration, }); - self.events.push_back(BehaviourOut::RequestStarted { + self.events.push_back(BehaviourOut::OpaqueRequestStarted { peer: target, protocol: self.block_requests.protocol_name().to_vec(), }); @@ -307,18 +344,41 @@ Behaviour { } } +impl NetworkBehaviourEventProcess for Behaviour { + fn inject_event(&mut self, event: request_responses::Event) { + match event { + request_responses::Event::InboundRequest { peer, protocol, outcome } => { + self.events.push_back(BehaviourOut::InboundRequest { + peer, + protocol, + outcome, + }); + } + + request_responses::Event::OutboundFinished { request_id, outcome } => { + self.events.push_back(BehaviourOut::RequestFinished { + request_id, + outcome, + }); + }, + } + } +} + impl NetworkBehaviourEventProcess> for Behaviour { fn inject_event(&mut self, event: block_requests::Event) { match event { block_requests::Event::AnsweredRequest { peer, total_handling_time } => { - self.events.push_back(BehaviourOut::AnsweredRequest { + let protocol = maybe_utf8_bytes_to_string(&self.block_requests.protocol_name()) + .into_owned(); + self.events.push_back(BehaviourOut::InboundRequest { peer, - protocol: self.block_requests.protocol_name().to_vec(), - build_time: total_handling_time, + protocol: From::from(protocol), + outcome: Ok(total_handling_time), }); }, block_requests::Event::Response { peer, original_request: _, response, request_duration } => { - self.events.push_back(BehaviourOut::RequestFinished { + self.events.push_back(BehaviourOut::OpaqueRequestFinished { peer: peer.clone(), protocol: self.block_requests.protocol_name().to_vec(), request_duration, @@ -330,7 +390,7 @@ impl NetworkBehaviourEventProcess { // There doesn't exist any mechanism to report cancellations or timeouts yet, so // we process them by disconnecting the node. - self.events.push_back(BehaviourOut::RequestFinished { + self.events.push_back(BehaviourOut::OpaqueRequestFinished { peer: peer.clone(), protocol: self.block_requests.protocol_name().to_vec(), request_duration, diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 94b2993b4e6bd..8280f57356da5 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -23,6 +23,7 @@ pub use crate::chain::{Client, FinalityProofProvider}; pub use crate::on_demand_layer::{AlwaysBadChecker, OnDemand}; +pub use crate::request_responses::{IncomingRequest, ProtocolConfig as RequestResponseConfig}; pub use libp2p::{identity, core::PublicKey, wasm_ext::ExtTransport, build_multiaddr}; // Note: this re-export shouldn't be part of the public API of the crate and will be removed in @@ -34,9 +35,10 @@ use crate::ExHashT; use core::{fmt, iter}; use futures::future; -use libp2p::identity::{ed25519, Keypair}; -use libp2p::wasm_ext; -use libp2p::{multiaddr, Multiaddr, PeerId}; +use libp2p::{ + identity::{ed25519, Keypair}, + multiaddr, wasm_ext, Multiaddr, PeerId, +}; use prometheus_endpoint::Registry; use sp_consensus::{block_validation::BlockAnnounceValidator, import_queue::ImportQueue}; use sp_runtime::{traits::Block as BlockT, ConsensusEngineId}; @@ -407,6 +409,8 @@ pub struct NetworkConfiguration { /// List of notifications protocols that the node supports. Must also include a /// `ConsensusEngineId` for backwards-compatibility. pub notifications_protocols: Vec<(ConsensusEngineId, Cow<'static, [u8]>)>, + /// List of request-response protocols that the node supports. + pub request_response_protocols: Vec, /// Maximum allowed number of incoming connections. pub in_peers: u32, /// Number of outgoing connections we're trying to maintain. @@ -442,6 +446,7 @@ impl NetworkConfiguration { boot_nodes: Vec::new(), node_key, notifications_protocols: Vec::new(), + request_response_protocols: Vec::new(), in_peers: 25, out_peers: 75, reserved_nodes: Vec::new(), @@ -458,9 +463,7 @@ impl NetworkConfiguration { allow_non_globals_in_dht: false, } } -} -impl NetworkConfiguration { /// Create new default configuration for localhost-only connection with random port (useful for testing) pub fn new_local() -> NetworkConfiguration { let mut config = NetworkConfiguration::new( diff --git a/client/network/src/error.rs b/client/network/src/error.rs index b87e495983eaf..a1fc4b03acb7e 100644 --- a/client/network/src/error.rs +++ b/client/network/src/error.rs @@ -21,7 +21,7 @@ use crate::config::TransportConfig; use libp2p::{PeerId, Multiaddr}; -use std::fmt; +use std::{borrow::Cow, fmt}; /// Result type alias for the network. pub type Result = std::result::Result; @@ -61,6 +61,12 @@ pub enum Error { /// The invalid addresses. addresses: Vec, }, + /// The same request-response protocol has been registered multiple times. + #[display(fmt = "Request-response protocol registered multiple times: {}", protocol)] + DuplicateRequestResponseProtocol { + /// Name of the protocol registered multiple times. + protocol: Cow<'static, str>, + }, } // Make `Debug` use the `Display` implementation. @@ -78,6 +84,7 @@ impl std::error::Error for Error { Error::DuplicateBootnode { .. } => None, Error::Prometheus(ref err) => Some(err), Error::AddressesForAnotherTransport { .. } => None, + Error::DuplicateRequestResponseProtocol { .. } => None, } } } diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index b8e5d7582b96b..ad57a118ffd7b 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -251,6 +251,7 @@ mod finality_requests; mod light_client_handler; mod on_demand_layer; mod protocol; +mod request_responses; mod schema; mod service; mod transport; @@ -260,13 +261,10 @@ pub mod config; pub mod error; pub mod network_state; -pub use service::{NetworkService, NetworkWorker}; -pub use protocol::PeerInfo; -pub use protocol::event::{Event, DhtEvent, ObservedRole}; -pub use protocol::sync::SyncState; -pub use libp2p::{Multiaddr, PeerId}; #[doc(inline)] -pub use libp2p::multiaddr; +pub use libp2p::{multiaddr, Multiaddr, PeerId}; +pub use protocol::{event::{DhtEvent, Event, ObservedRole}, sync::SyncState, PeerInfo}; +pub use service::{NetworkService, NetworkWorker, OutboundFailure}; pub use sc_peerset::ReputationChange; use sp_runtime::traits::{Block as BlockT, NumberFor}; diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs new file mode 100644 index 0000000000000..07c7b66d31d79 --- /dev/null +++ b/client/network/src/request_responses.rs @@ -0,0 +1,742 @@ +// Copyright 2019-2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. + +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! Collection of request-response protocols. +//! +//! The [`RequestResponses`] struct defined in this module provides support for zero or more +//! so-called "request-response" protocols. +//! +//! A request-response protocol works in the following way: +//! +//! - For every emitted request, a new substream is open and the protocol is negotiated. If the +//! remote supports the protocol, the size of the request is sent as a LEB128 number, followed +//! with the request itself. The remote then sends the size of the response as a LEB128 number, +//! followed with the response. +//! +//! - Requests have a certain time limit before they time out. This time includes the time it +//! takes to send/receive the request and response. +//! +//! - If provided, a ["requests processing"](RequestResponseConfig::requests_processing) channel +//! is used to handle incoming requests. +//! + +use futures::{channel::{mpsc, oneshot}, prelude::*}; +use libp2p::{ + core::{ + connection::{ConnectionId, ListenerId}, + ConnectedPoint, Multiaddr, PeerId, + }, + request_response::{ + RequestResponse, RequestResponseCodec, RequestResponseConfig, RequestResponseEvent, + RequestResponseMessage, ResponseChannel, ProtocolSupport + }, + swarm::{ + protocols_handler::multi::MultiHandler, NetworkBehaviour, NetworkBehaviourAction, + PollParameters, ProtocolsHandler, + }, +}; +use std::{ + borrow::Cow, collections::{hash_map::Entry, HashMap}, io, iter, pin::Pin, + task::{Context, Poll}, time::Duration, +}; + +pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId}; + +/// Configuration for a single request-response protocol. +#[derive(Debug, Clone)] +pub struct ProtocolConfig { + /// Name of the protocol on the wire. Should be something like `/foo/bar`. + pub name: Cow<'static, str>, + + /// Maximum allowed size, in bytes, of a request. + /// + /// Any request larger than this value will be declined as a way to avoid allocating too + /// much memory for the it. + pub max_request_size: usize, + + /// Maximum allowed size, in bytes, of a response. + /// + /// Any response larger than this value will be declined as a way to avoid allocating too + /// much memory for the it. + pub max_response_size: usize, + + /// Duration after which emitted requests are considered timed out. + /// + /// If you expect the response to come back quickly, you should set this to a smaller duration. + pub request_timeout: Duration, + + /// Channel on which the networking service will send incoming requests. + /// + /// Every time a peer sends a request to the local node using this protocol, the networking + /// service will push an element on this channel. The receiving side of this channel then has + /// to pull this element, process the request, and send back the response to send back to the + /// peer. + /// + /// The size of the channel has to be carefully chosen. If the channel is full, the networking + /// service will discard the incoming request send back an error to the peer. Consequently, + /// the channel being full is an indicator that the node is overloaded. + /// + /// You can typically set the size of the channel to `T / d`, where `T` is the + /// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to + /// build a response. + /// + /// Can be `None` if the local node does not support answering incoming requests. + /// If this is `None`, then the local node will not advertise support for this protocol towards + /// other peers. If this is `Some` but the channel is closed, then the local node will + /// advertise support for this protocol, but any incoming request will lead to an error being + /// sent back. + pub requests_processing: Option>, +} + +/// A single request received by a peer on a request-response protocol. +#[derive(Debug)] +pub struct IncomingRequest { + /// Who sent the request. + pub origin: PeerId, + + /// Request sent by the remote. Will always be smaller than + /// [`RequestResponseConfig::max_response_size`]. + pub request_bytes: Vec, + + /// Channel to send back the response to. + pub answer: oneshot::Sender>, +} + +/// Event generated by the [`RequestResponsesBehaviour`]. +#[derive(Debug)] +pub enum Event { + /// A remote sent a request and either we have successfully answered it or an error happened. + /// + /// This event is generated for statistics purposes. + InboundRequest { + /// Peer which has emitted the request. + peer: PeerId, + /// Name of the protocol in question. + protocol: Cow<'static, str>, + /// If `Ok`, contains the time elapsed between when we received the request and when we + /// sent back the response. If `Err`, the error that happened. + outcome: Result, + }, + + /// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or + /// failed. + OutboundFinished { + /// Request that has succeeded. + request_id: RequestId, + /// Response sent by the remote or reason for failure. + outcome: Result, OutboundFailure>, + }, +} + +/// Implementation of `NetworkBehaviour` that provides support for request-response protocols. +pub struct RequestResponsesBehaviour { + /// The multiple sub-protocols, by name. + /// Contains the underlying libp2p `RequestResponse` behaviour, plus an optional + /// "response builder" used to build responses to incoming requests. + protocols: HashMap< + Cow<'static, str>, + (RequestResponse, Option>) + >, + + /// Whenever an incoming request arrives, a `Future` is added to this list and will yield the + /// response to send back to the remote. + pending_responses: stream::FuturesUnordered< + Pin + Send>> + >, +} + +/// Generated by the response builder and waiting to be processed. +enum RequestProcessingOutcome { + PendingResponse { + protocol: Cow<'static, str>, + inner_channel: ResponseChannel>, + response: Vec, + }, + Busy { + peer: PeerId, + protocol: Cow<'static, str>, + }, +} + +impl RequestResponsesBehaviour { + /// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if + /// the same protocol is passed twice. + pub fn new(list: impl Iterator) -> Result { + let mut protocols = HashMap::new(); + for protocol in list { + let mut cfg = RequestResponseConfig::default(); + cfg.set_connection_keep_alive(Duration::from_secs(10)); + cfg.set_request_timeout(protocol.request_timeout); + + let protocol_support = if protocol.requests_processing.is_some() { + ProtocolSupport::Full + } else { + ProtocolSupport::Outbound + }; + + let rq_rp = RequestResponse::new(GenericCodec { + max_request_size: protocol.max_request_size, + max_response_size: protocol.max_response_size, + }, iter::once((protocol.name.as_bytes().to_vec(), protocol_support)), cfg); + + match protocols.entry(protocol.name) { + Entry::Vacant(e) => e.insert((rq_rp, protocol.requests_processing)), + Entry::Occupied(e) => + return Err(RegisterError::DuplicateProtocol(e.key().clone())), + }; + } + + Ok(Self { + protocols, + pending_responses: stream::FuturesUnordered::new(), + }) + } + + /// Initiates sending a request. + /// + /// An error is returned if we are not connected to the target peer of if the protocol doesn't + /// match one that has been registered. + pub fn send_request(&mut self, target: &PeerId, protocol: &str, request: Vec) + -> Result + { + if let Some((protocol, _)) = self.protocols.get_mut(protocol) { + if protocol.is_connected(target) { + Ok(protocol.send_request(target, request)) + } else { + Err(SendRequestError::NotConnected) + } + } else { + Err(SendRequestError::UnknownProtocol) + } + } +} + +impl NetworkBehaviour for RequestResponsesBehaviour { + type ProtocolsHandler = MultiHandler< + String, + as NetworkBehaviour>::ProtocolsHandler, + >; + type OutEvent = Event; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + let iter = self.protocols.iter_mut() + .map(|(p, (r, _))| (p.to_string(), NetworkBehaviour::new_handler(r))); + + MultiHandler::try_from_iter(iter) + .expect("Protocols are in a HashMap and there can be at most one handler per \ + protocol name, which is the only possible error; qed") + } + + fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { + Vec::new() + } + + fn inject_connection_established( + &mut self, + peer_id: &PeerId, + conn: &ConnectionId, + endpoint: &ConnectedPoint, + ) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_connection_established(p, peer_id, conn, endpoint) + } + } + + fn inject_connected(&mut self, peer_id: &PeerId) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_connected(p, peer_id) + } + } + + fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_connection_closed(p, peer_id, conn, endpoint) + } + } + + fn inject_disconnected(&mut self, peer_id: &PeerId) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_disconnected(p, peer_id) + } + } + + fn inject_addr_reach_failure( + &mut self, + peer_id: Option<&PeerId>, + addr: &Multiaddr, + error: &dyn std::error::Error + ) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_addr_reach_failure(p, peer_id, addr, error) + } + } + + fn inject_event( + &mut self, + peer_id: PeerId, + connection: ConnectionId, + (p_name, event): ::OutEvent, + ) { + if let Some((proto, _)) = self.protocols.get_mut(&*p_name) { + return proto.inject_event(peer_id, connection, event) + } + + log::warn!(target: "sub-libp2p", + "inject_node_event: no request-response instance registered for protocol {:?}", + p_name) + } + + fn inject_new_external_addr(&mut self, addr: &Multiaddr) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_new_external_addr(p, addr) + } + } + + fn inject_expired_listen_addr(&mut self, addr: &Multiaddr) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_expired_listen_addr(p, addr) + } + } + + fn inject_dial_failure(&mut self, peer_id: &PeerId) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_dial_failure(p, peer_id) + } + } + + fn inject_new_listen_addr(&mut self, addr: &Multiaddr) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_new_listen_addr(p, addr) + } + } + + fn inject_listener_error(&mut self, id: ListenerId, err: &(dyn std::error::Error + 'static)) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_listener_error(p, id, err) + } + } + + fn inject_listener_closed(&mut self, id: ListenerId, reason: Result<(), &io::Error>) { + for (p, _) in self.protocols.values_mut() { + NetworkBehaviour::inject_listener_closed(p, id, reason) + } + } + + fn poll( + &mut self, + cx: &mut Context, + params: &mut impl PollParameters, + ) -> Poll< + NetworkBehaviourAction< + ::InEvent, + Self::OutEvent, + >, + > { + loop { + // Poll to see if any response is ready to be sent back. + // We need to check `is_empty` first, otherwise polling would return `None`. + if !self.pending_responses.is_empty() { + while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) { + match outcome { + RequestProcessingOutcome::PendingResponse { + protocol, inner_channel, response + } => { + if let Some((protocol, _)) = self.protocols.get_mut(&*protocol) { + protocol.send_response(inner_channel, response); + } + } + RequestProcessingOutcome::Busy { peer, protocol } => { + let out = Event::InboundRequest { + peer, + protocol, + outcome: Err(InboundError::Busy), + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); + } + } + } + } + + // Poll request-responses protocols. + for (protocol, (behaviour, resp_builder)) in &mut self.protocols { + while let Poll::Ready(ev) = behaviour.poll(cx, params) { + let ev = match ev { + // Main events we are interested in. + NetworkBehaviourAction::GenerateEvent(ev) => ev, + + // Other events generated by the underlying behaviour are transparently + // passed through. + NetworkBehaviourAction::DialAddress { address } => { + return Poll::Ready(NetworkBehaviourAction::DialAddress { address }) + } + NetworkBehaviourAction::DialPeer { peer_id, condition } => { + return Poll::Ready(NetworkBehaviourAction::DialPeer { + peer_id, + condition, + }) + } + NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event, + } => { + return Poll::Ready(NetworkBehaviourAction::NotifyHandler { + peer_id, + handler, + event: ((*protocol).to_string(), event), + }) + } + NetworkBehaviourAction::ReportObservedAddr { address } => { + return Poll::Ready(NetworkBehaviourAction::ReportObservedAddr { + address, + }) + } + }; + + match ev { + // Received a request from a remote. + RequestResponseEvent::Message { + peer, + message: RequestResponseMessage::Request { request, channel }, + } => { + let (tx, rx) = oneshot::channel(); + + // Submit the request to the "response builder" passed by the user at + // initialization. + if let Some(resp_builder) = resp_builder { + // If the response builder is too busy, silently drop `tx`. + // This will be reported as a `Busy` error. + let _ = resp_builder.try_send(IncomingRequest { + origin: peer.clone(), + request_bytes: request, + answer: tx, + }); + } + + let protocol = protocol.clone(); + self.pending_responses.push(Box::pin(async move { + // The `tx` created above can be dropped if we are not capable of + // processing this request, which is reflected as a "Busy" error. + if let Ok(response) = rx.await { + RequestProcessingOutcome::PendingResponse { + protocol, inner_channel: channel, response + } + } else { + RequestProcessingOutcome::Busy { peer, protocol } + } + })); + + // This `continue` makres sure that `pending_responses` gets polled + // after we have added the new element. + continue; + } + + // Received a response from a remote to one of our requests. + RequestResponseEvent::Message { + message: + RequestResponseMessage::Response { + request_id, + response, + }, + .. + } => { + let out = Event::OutboundFinished { + request_id, + outcome: Ok(response), + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); + } + + // One of our requests has failed. + RequestResponseEvent::OutboundFailure { + request_id, + error, + .. + } => { + let out = Event::OutboundFinished { + request_id, + outcome: Err(error), + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); + } + + // Remote has tried to send a request but failed. + RequestResponseEvent::InboundFailure { peer, error } => { + let out = Event::InboundRequest { + peer, + protocol: protocol.clone(), + outcome: Err(InboundError::Network(error)), + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); + } + }; + } + } + + break Poll::Pending; + } + } +} + +/// Error when registering a protocol. +#[derive(Debug, derive_more::Display, derive_more::Error)] +pub enum RegisterError { + /// A protocol has been specified multiple times. + DuplicateProtocol(#[error(ignore)] Cow<'static, str>), +} + +/// Error when sending a request. +#[derive(Debug, derive_more::Display, derive_more::Error)] +pub enum SendRequestError { + /// We are not currently connected to the requested peer. + NotConnected, + /// Given protocol hasn't been registered. + UnknownProtocol, +} + +/// Error when processing a request sent by a remote. +#[derive(Debug, derive_more::Display, derive_more::Error)] +pub enum InboundError { + /// Internal response builder is too busy to process this request. + Busy, + /// Problem on the network. + #[display(fmt = "Problem on the network")] + Network(#[error(ignore)] InboundFailure), +} + +/// Implements the libp2p [`RequestResponseCodec`] trait. Defines how streams of bytes are turned +/// into requests and responses and vice-versa. +#[derive(Debug, Clone)] +#[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler. +pub struct GenericCodec { + max_request_size: usize, + max_response_size: usize, +} + +#[async_trait::async_trait] +impl RequestResponseCodec for GenericCodec { + type Protocol = Vec; + type Request = Vec; + type Response = Vec; + + async fn read_request( + &mut self, + _: &Self::Protocol, + mut io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + // Read the length. + let length = unsigned_varint::aio::read_usize(&mut io).await + .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; + if length > self.max_request_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("Request size exceeds limit: {} > {}", length, self.max_request_size) + )); + } + + // Read the payload. + let mut buffer = vec![0; length]; + io.read_exact(&mut buffer).await?; + Ok(buffer) + } + + async fn read_response( + &mut self, + _: &Self::Protocol, + mut io: &mut T, + ) -> io::Result + where + T: AsyncRead + Unpin + Send, + { + // Read the length. + let length = unsigned_varint::aio::read_usize(&mut io).await + .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; + if length > self.max_response_size { + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + format!("Response size exceeds limit: {} > {}", length, self.max_response_size) + )); + } + + // Read the payload. + let mut buffer = vec![0; length]; + io.read_exact(&mut buffer).await?; + Ok(buffer) + } + + async fn write_request( + &mut self, + _: &Self::Protocol, + io: &mut T, + req: Self::Request, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + // TODO: check the length? + // Write the length. + { + let mut buffer = unsigned_varint::encode::usize_buffer(); + io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?; + } + + // Write the payload. + io.write_all(&req).await?; + + io.close().await?; + Ok(()) + } + + async fn write_response( + &mut self, + _: &Self::Protocol, + io: &mut T, + res: Self::Response, + ) -> io::Result<()> + where + T: AsyncWrite + Unpin + Send, + { + // TODO: check the length? + // Write the length. + { + let mut buffer = unsigned_varint::encode::usize_buffer(); + io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?; + } + + // Write the payload. + io.write_all(&res).await?; + + io.close().await?; + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::config::ProtocolId; + use futures::{channel::mpsc, prelude::*}; + use libp2p::identity::Keypair; + use libp2p::Multiaddr; + use libp2p::core::upgrade; + use libp2p::core::transport::{Transport, MemoryTransport}; + use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt}; + use libp2p::swarm::Swarm; + use std::{collections::HashSet, iter, task::Poll, time::Duration}; + + #[test] + fn discovery_working() { + let mut user_defined = Vec::new(); + + let protocol_name = "/test/req-rep/1"; + + // Build swarms whose behaviour is `RequestResponsesBehaviour`. + let mut swarms = (0..2) + .map(|_| { + let keypair = Keypair::generate_ed25519(); + let keypair2 = keypair.clone(); + + let transport = MemoryTransport + .and_then(move |out, endpoint| { + let secio = libp2p::secio::SecioConfig::new(keypair2); + libp2p::core::upgrade::apply( + out, + secio, + endpoint, + upgrade::Version::V1 + ) + }) + .and_then(move |(peer_id, stream), endpoint| { + let peer_id2 = peer_id.clone(); + let upgrade = libp2p::yamux::Config::default() + .map_inbound(move |muxer| (peer_id, muxer)) + .map_outbound(move |muxer| (peer_id2, muxer)); + upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1) + }); + + let behaviour = { + let (tx, rx) = mpsc::channel(64); + + async_std::task::spawn(async move { + while let Some(rq) = rx.next().await { + assert_eq!(rq.request_bytes, b"this is a request"); + let _ = rq.answer.send(b"this is a response".to_vec()); + } + }); + + super::RequestResponsesBehaviour::new(iter::once(super::ProtocolConfig { + name: From::from(protocol_name), + max_request_size: 1024, + max_response_size: 1024 * 1024, + request_timeout: Duration::from_secs(30), + requests_processing: Some(tx), + })) + }; + + let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id()); + let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); + + if user_defined.is_empty() { + user_defined.push((keypair.public().into_peer_id(), listen_addr.clone())); + } + + Swarm::listen_on(&mut swarm, listen_addr.clone()).unwrap(); + (swarm, listen_addr) + }) + .collect::>(); + + let fut = futures::future::poll_fn(move |cx| { + 'polling: loop { + for swarm_n in 0..swarms.len() { + match swarms[swarm_n].0.poll_next_unpin(cx) { + Poll::Ready(Some(e)) => { + match e { + super::Event::UnroutablePeer(other) => { + // Call `add_self_reported_address` to simulate identify happening. + let addr = swarms.iter().find_map(|(s, a)| + if s.local_peer_id == other { + Some(a.clone()) + } else { + None + }) + .unwrap(); + swarms[swarm_n].0.add_self_reported_address(&other, addr); + }, + super::Event::Discovered(other) => { + to_discover[swarm_n].remove(&other); + } + _ => {} + } + continue 'polling + } + _ => {} + } + } + break; + } + + if to_discover.iter().all(|l| l.is_empty()) { + Poll::Ready(()) + } else { + Poll::Pending + } + }); + + futures::executor::block_on(fut); + } +} diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 7d4135de6b9ed..be82668522f92 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -29,7 +29,7 @@ use crate::{ ExHashT, NetworkStateInfo, - behaviour::{Behaviour, BehaviourOut}, + behaviour::{self, Behaviour, BehaviourOut}, config::{parse_addr, parse_str_addr, NonReservedPeerMode, Params, Role, TransportConfig}, discovery::DiscoveryConfig, error::Error, @@ -41,7 +41,7 @@ use crate::{ protocol::{self, event::Event, LegacyConnectionKillError, sync::SyncState, PeerInfo, Protocol}, transport, ReputationChange, }; -use futures::prelude::*; +use futures::{channel::oneshot, prelude::*}; use libp2p::{PeerId, Multiaddr}; use libp2p::core::{ConnectedPoint, Executor, connection::{ConnectionError, PendingConnectionError}, either::EitherError}; use libp2p::kad::record; @@ -61,7 +61,7 @@ use sp_runtime::{ use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender}; use std::{ borrow::{Borrow, Cow}, - collections::HashSet, + collections::{HashMap, HashSet}, fs, marker::PhantomData, num:: NonZeroUsize, @@ -73,6 +73,9 @@ use std::{ }, task::Poll, }; +use wasm_timer::Instant; + +pub use behaviour::{InboundError, InboundFailure, OutboundFailure}; mod out_events; #[cfg(test)] @@ -291,16 +294,28 @@ impl NetworkWorker { config }; - let mut behaviour = Behaviour::new( - protocol, - params.role, - user_agent, - local_public, - block_requests, - finality_proof_requests, - light_client_handler, - discovery_config - ); + let mut behaviour = { + let result = Behaviour::new( + protocol, + params.role, + user_agent, + local_public, + block_requests, + finality_proof_requests, + light_client_handler, + discovery_config, + params.network_config.request_response_protocols, + ); + + match result { + Ok(b) => b, + Err(crate::request_responses::RegisterError::DuplicateProtocol(proto)) => { + return Err(Error::DuplicateRequestResponseProtocol { + protocol: proto, + }) + }, + } + }; for (engine_id, protocol_name) in ¶ms.network_config.notifications_protocols { behaviour.register_notifications_protocol(*engine_id, protocol_name.clone()); @@ -366,6 +381,7 @@ impl NetworkWorker { event_streams: out_events::OutChannels::new(params.metrics_registry.as_ref())?, metrics, boot_node_ids, + pending_requests: HashMap::with_capacity(128), }) } @@ -571,12 +587,50 @@ impl NetworkService { /// parameter is a `&'static str`, and not a `String`, in order to avoid accidentally having /// an unbounded set of Prometheus metrics, which would be quite bad in terms of memory pub fn event_stream(&self, name: &'static str) -> impl Stream { - // Note: when transitioning to stable futures, remove the `Error` entirely let (tx, rx) = out_events::channel(name); let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::EventStream(tx)); rx } + /// Sends a single targeted request to a specific peer. On success, returns the response of + /// the peer. + /// + /// Request-response protocols are a way to complement notifications protocols, but + /// notifications should remain the default ways of communicating information. For example, a + /// peer can announce something through a notification, after which the recipient can obtain + /// more information by performing a request. + /// As such, this function is meant be called only with peers we are already connected to. + /// Calling this method with a `target` we are not connected to will *not* attempt to connect + /// to said peer. + /// + /// Contrary to notifications, requests are guaranteed to not be interrupted under normal + /// circumstances. + /// + /// The protocol must have been registered through + /// [`NetworkConfiguration::request_response_protocols`]. + pub async fn request( + &self, + target: PeerId, + protocol: impl Into>, + request: Vec + ) -> Result, OutboundFailure> { + let (tx, rx) = oneshot::channel(); + let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request { + target, + protocol: protocol.into(), + request, + response: tx + }); + + match rx.await { + Ok(v) => v, + // The channel can only be closed if the network worker no longer exists. If the + // network worker no longer exists, then all connections to `target` are necessarily + // closed, and we legitimately report this situation as a "ConnectionClosed". + Err(_) => Err(OutboundFailure::ConnectionClosed), + } + } + /// Registers a new notifications protocol. /// /// After a protocol has been registered, you can call `write_notifications`. @@ -831,6 +885,12 @@ enum ServiceToWorkerMsg { engine_id: ConsensusEngineId, target: PeerId, }, + Request { + target: PeerId, + protocol: Cow<'static, str>, + request: Vec, + response: oneshot::Sender, OutboundFailure>>, + }, RegisterNotifProtocol { engine_id: ConsensusEngineId, protocol_name: Cow<'static, [u8]>, @@ -867,6 +927,13 @@ pub struct NetworkWorker { metrics: Option, /// The `PeerId`'s of all boot nodes. boot_node_ids: Arc>, + /// Requests started using [`NetworkService::request`]. Includes the channel to send back the + /// response, when the request has started, and the name of the protocol for diagnostic + /// purposes. + pending_requests: HashMap< + behaviour::RequestId, + (oneshot::Sender, OutboundFailure>>, Instant, String) + >, } struct Metrics { @@ -898,8 +965,10 @@ struct Metrics { peerset_num_requested: Gauge, pending_connections: Gauge, pending_connections_errors_total: CounterVec, - requests_in_total: HistogramVec, - requests_out_finished: HistogramVec, + requests_in_failure_total: CounterVec, + requests_in_success_total: HistogramVec, + requests_out_failure_total: CounterVec, + requests_out_success_total: HistogramVec, requests_out_started_total: CounterVec, } @@ -1057,10 +1126,17 @@ impl Metrics { ), &["reason"] )?, registry)?, - requests_in_total: register(HistogramVec::new( + requests_in_failure_total: register(CounterVec::new( + Opts::new( + "sub_libp2p_requests_in_failure_total", + "Total number of incoming requests that the node has failed to answer" + ), + &["protocol", "reason"] + )?, registry)?, + requests_in_success_total: register(HistogramVec::new( HistogramOpts { common_opts: Opts::new( - "sub_libp2p_requests_in_total", + "sub_libp2p_requests_in_success_total", "Total number of requests received and answered" ), buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16) @@ -1068,11 +1144,18 @@ impl Metrics { }, &["protocol"] )?, registry)?, - requests_out_finished: register(HistogramVec::new( + requests_out_failure_total: register(CounterVec::new( + Opts::new( + "sub_libp2p_requests_out_failure_total", + "Total number of requests that have resulted in a failure" + ), + &["protocol", "reason"] + )?, registry)?, + requests_out_success_total: register(HistogramVec::new( HistogramOpts { common_opts: Opts::new( - "sub_libp2p_requests_out_finished", - "Time between a request's start and finish (successful or not)" + "sub_libp2p_requests_out_success_total", + "For successful requests, time between a request's start and finish" ), buckets: prometheus_endpoint::exponential_buckets(0.001, 2.0, 16) .expect("parameters are always valid values; qed"), @@ -1168,7 +1251,31 @@ impl Future for NetworkWorker { .with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)]) .observe(message.len() as f64); } - this.network_service.user_protocol_mut().write_notification(target, engine_id, message) + this.network_service.user_protocol_mut() + .write_notification(target, engine_id, message); + }, + ServiceToWorkerMsg::Request { target, protocol, request, response } => { + // Calling `send_request` can fail immediately in some circumstances. + // This is handled by sending back an error on the channel. + match this.network_service.send_request(&target, &protocol, request) { + Ok(request_id) => { + if let Some(metrics) = this.metrics.as_ref() { + metrics.requests_out_started_total + .with_label_values(&[&protocol]) + .inc(); + } + this.pending_requests.insert( + request_id, + (response, Instant::now(), protocol.to_string()) + ); + }, + Err(behaviour::SendRequestError::NotConnected) => { + let _ = response.send(Err(OutboundFailure::ConnectionClosed)); + }, + Err(behaviour::SendRequestError::UnknownProtocol) => { + let _ = response.send(Err(OutboundFailure::UnsupportedProtocols)); + }, + } }, ServiceToWorkerMsg::RegisterNotifProtocol { engine_id, protocol_name } => { this.network_service @@ -1209,23 +1316,67 @@ impl Future for NetworkWorker { } this.import_queue.import_finality_proof(origin, hash, nb, proof); }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::AnsweredRequest { protocol, build_time, .. })) => { + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, outcome, .. })) => { if let Some(metrics) = this.metrics.as_ref() { - metrics.requests_in_total - .with_label_values(&[&maybe_utf8_bytes_to_string(&protocol)]) - .observe(build_time.as_secs_f64()); + match outcome { + Ok(serve_time) => { + metrics.requests_in_success_total + .with_label_values(&[&protocol]) + .observe(serve_time.as_secs_f64()); + } + Err(err) => { + let reason = match err { + InboundError::Busy => "busy", + InboundError::Network(InboundFailure::Timeout) => "timeout", + InboundError::Network(InboundFailure::UnsupportedProtocols) => + "unsupported", + }; + + metrics.requests_in_failure_total + .with_label_values(&[&protocol, reason]) + .inc(); + } + } + } + }, + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { request_id, outcome })) => { + if let Some((send_back, started, protocol)) = this.pending_requests.remove(&request_id) { + if let Some(metrics) = this.metrics.as_ref() { + match &outcome { + Ok(_) => { + metrics.requests_out_success_total + .with_label_values(&[&protocol]) + .observe(started.elapsed().as_secs_f64()); + } + Err(err) => { + let reason = match err { + OutboundFailure::DialFailure => "dial-failure", + OutboundFailure::Timeout => "timeout", + OutboundFailure::ConnectionClosed => "connection-closed", + OutboundFailure::UnsupportedProtocols => "unsupported", + }; + + metrics.requests_out_failure_total + .with_label_values(&[&protocol, reason]) + .inc(); + } + } + } + let _ = send_back.send(outcome); + } else { + error!("Request not in pending_requests"); } }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestStarted { protocol, .. })) => { + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::OpaqueRequestStarted { protocol, .. })) => { if let Some(metrics) = this.metrics.as_ref() { metrics.requests_out_started_total .with_label_values(&[&maybe_utf8_bytes_to_string(&protocol)]) .inc(); } }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { protocol, request_duration, .. })) => { + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::OpaqueRequestFinished { protocol, request_duration, .. })) => { if let Some(metrics) = this.metrics.as_ref() { - metrics.requests_out_finished + metrics.requests_out_success_total .with_label_values(&[&maybe_utf8_bytes_to_string(&protocol)]) .observe(request_duration.as_secs_f64()); } @@ -1268,11 +1419,11 @@ impl Future for NetworkWorker { let reason = match cause { ConnectionError::IO(_) => "transport-error", ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( - EitherError::A(EitherError::A(EitherError::B( - EitherError::A(PingFailure::Timeout)))))))) => "ping-timeout", + EitherError::A(EitherError::A(EitherError::A(EitherError::B( + EitherError::A(PingFailure::Timeout))))))))) => "ping-timeout", ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( - EitherError::A(EitherError::A(EitherError::A( - EitherError::B(LegacyConnectionKillError)))))))) => "force-closed", + EitherError::A(EitherError::A(EitherError::A(EitherError::A( + EitherError::B(LegacyConnectionKillError))))))))) => "force-closed", ConnectionError::Handler(NodeHandlerWrapperError::Handler(_)) => "protocol-error", ConnectionError::Handler(NodeHandlerWrapperError::KeepAliveTimeout) => "keep-alive-timeout", }; @@ -1434,7 +1585,7 @@ impl Unpin for NetworkWorker { /// Turns bytes that are potentially UTF-8 into a reasonable representable string. /// /// Meant to be used only for debugging or metrics-reporting purposes. -fn maybe_utf8_bytes_to_string(id: &[u8]) -> Cow { +pub(crate) fn maybe_utf8_bytes_to_string(id: &[u8]) -> Cow { if let Ok(s) = std::str::from_utf8(&id[..]) { Cow::Borrowed(s) } else { diff --git a/client/service/src/builder.rs b/client/service/src/builder.rs index 49d2d61f9c2bb..0959567b3217d 100644 --- a/client/service/src/builder.rs +++ b/client/service/src/builder.rs @@ -24,7 +24,7 @@ use crate::{ config::{Configuration, KeystoreConfig, PrometheusConfig, OffchainWorkerConfig}, }; use sc_client_api::{ - self, light::RemoteBlockchain, execution_extensions::ExtensionsFactory, ExecutorProvider, + self, light::RemoteBlockchain, execution_extensions::ExtensionsFactory, ExecutorProvider, ForkBlocks, BadBlocks, CloneableSpawn, UsageProvider, backend::RemoteBackend, }; use sp_utils::mpsc::{tracing_unbounded, TracingUnboundedSender, TracingUnboundedReceiver}; From de1efccd31eeaf674211ad368a605a0e985d6c7e Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 13 Jul 2020 13:37:14 +0200 Subject: [PATCH 02/11] Add tests --- client/network/src/request_responses.rs | 276 +++++++++++++++++------- 1 file changed, 200 insertions(+), 76 deletions(-) diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index 07c7b66d31d79..4a9ca0e797701 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -629,20 +629,17 @@ impl RequestResponseCodec for GenericCodec { #[cfg(test)] mod tests { - use crate::config::ProtocolId; use futures::{channel::mpsc, prelude::*}; use libp2p::identity::Keypair; use libp2p::Multiaddr; use libp2p::core::upgrade; use libp2p::core::transport::{Transport, MemoryTransport}; use libp2p::core::upgrade::{InboundUpgradeExt, OutboundUpgradeExt}; - use libp2p::swarm::Swarm; - use std::{collections::HashSet, iter, task::Poll, time::Duration}; + use libp2p::swarm::{Swarm, SwarmEvent}; + use std::{iter, time::Duration}; #[test] - fn discovery_working() { - let mut user_defined = Vec::new(); - + fn basic_request_response_works() { let protocol_name = "/test/req-rep/1"; // Build swarms whose behaviour is `RequestResponsesBehaviour`. @@ -652,91 +649,218 @@ mod tests { let keypair2 = keypair.clone(); let transport = MemoryTransport - .and_then(move |out, endpoint| { - let secio = libp2p::secio::SecioConfig::new(keypair2); - libp2p::core::upgrade::apply( - out, - secio, - endpoint, - upgrade::Version::V1 - ) - }) - .and_then(move |(peer_id, stream), endpoint| { - let peer_id2 = peer_id.clone(); - let upgrade = libp2p::yamux::Config::default() - .map_inbound(move |muxer| (peer_id, muxer)) - .map_outbound(move |muxer| (peer_id2, muxer)); - upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1) - }); - - let behaviour = { - let (tx, rx) = mpsc::channel(64); - - async_std::task::spawn(async move { - while let Some(rq) = rx.next().await { - assert_eq!(rq.request_bytes, b"this is a request"); - let _ = rq.answer.send(b"this is a response".to_vec()); - } - }); - - super::RequestResponsesBehaviour::new(iter::once(super::ProtocolConfig { - name: From::from(protocol_name), - max_request_size: 1024, - max_response_size: 1024 * 1024, - request_timeout: Duration::from_secs(30), - requests_processing: Some(tx), - })) - }; + .and_then(move |out, endpoint| { + let secio = libp2p::secio::SecioConfig::new(keypair2); + libp2p::core::upgrade::apply( + out, + secio, + endpoint, + upgrade::Version::V1 + ) + }) + .and_then(move |(peer_id, stream), endpoint| { + let peer_id2 = peer_id.clone(); + let upgrade = libp2p::yamux::Config::default() + .map_inbound(move |muxer| (peer_id, muxer)) + .map_outbound(move |muxer| (peer_id2, muxer)); + upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1) + }); + + let behaviour = { + let (tx, mut rx) = mpsc::channel(64); + + let b = super::RequestResponsesBehaviour::new(iter::once(super::ProtocolConfig { + name: From::from(protocol_name), + max_request_size: 1024, + max_response_size: 1024 * 1024, + request_timeout: Duration::from_secs(30), + requests_processing: Some(tx), + })).unwrap(); + + async_std::task::spawn(async move { + while let Some(rq) = rx.next().await { + assert_eq!(rq.request_bytes, b"this is a request"); + let _ = rq.answer.send(b"this is a response".to_vec()); + } + }); - let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id()); - let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); + b + }; - if user_defined.is_empty() { - user_defined.push((keypair.public().into_peer_id(), listen_addr.clone())); - } + let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id()); + let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); Swarm::listen_on(&mut swarm, listen_addr.clone()).unwrap(); (swarm, listen_addr) }) .collect::>(); - let fut = futures::future::poll_fn(move |cx| { - 'polling: loop { - for swarm_n in 0..swarms.len() { - match swarms[swarm_n].0.poll_next_unpin(cx) { - Poll::Ready(Some(e)) => { - match e { - super::Event::UnroutablePeer(other) => { - // Call `add_self_reported_address` to simulate identify happening. - let addr = swarms.iter().find_map(|(s, a)| - if s.local_peer_id == other { - Some(a.clone()) - } else { - None - }) - .unwrap(); - swarms[swarm_n].0.add_self_reported_address(&other, addr); - }, - super::Event::Discovered(other) => { - to_discover[swarm_n].remove(&other); - } - _ => {} - } - continue 'polling - } + // Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in + // this test, so they wouldn't connect to each other. + { + let dial_addr = swarms[1].1.clone(); + Swarm::dial_addr(&mut swarms[0].0, dial_addr).unwrap(); + } + + // Running `swarm[0]` in the background until a `InboundRequest` event happens, + // which is a hint about the test having ended. + async_std::task::spawn({ + let (mut swarm, _) = swarms.remove(0); + async move { + loop { + match swarm.next_event().await { + SwarmEvent::Behaviour(super::Event::InboundRequest { outcome, .. }) => { + assert!(outcome.is_ok()); + break + }, _ => {} } } - break; } + }); + + // Remove and run the remaining swarm. + let (mut swarm, _) = swarms.remove(0); + async_std::task::block_on(async move { + let mut sent_request_id = None; + + loop { + match swarm.next_event().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let id = swarm.send_request( + &peer_id, + protocol_name, + b"this is a request".to_vec() + ).unwrap(); + assert!(sent_request_id.is_none()); + sent_request_id = Some(id); + } + SwarmEvent::Behaviour(super::Event::OutboundFinished { + request_id, + outcome, + }) => { + assert_eq!(Some(request_id), sent_request_id); + let outcome = outcome.unwrap(); + assert_eq!(outcome, b"this is a response"); + break; + } + _ => {} + } + } + }); + } - if to_discover.iter().all(|l| l.is_empty()) { - Poll::Ready(()) - } else { - Poll::Pending + #[test] + fn max_response_size_exceeded() { + let protocol_name = "/test/req-rep/1"; + + // Build swarms whose behaviour is `RequestResponsesBehaviour`. + let mut swarms = (0..2) + .map(|_| { + let keypair = Keypair::generate_ed25519(); + let keypair2 = keypair.clone(); + + let transport = MemoryTransport + .and_then(move |out, endpoint| { + let secio = libp2p::secio::SecioConfig::new(keypair2); + libp2p::core::upgrade::apply( + out, + secio, + endpoint, + upgrade::Version::V1 + ) + }) + .and_then(move |(peer_id, stream), endpoint| { + let peer_id2 = peer_id.clone(); + let upgrade = libp2p::yamux::Config::default() + .map_inbound(move |muxer| (peer_id, muxer)) + .map_outbound(move |muxer| (peer_id2, muxer)); + upgrade::apply(stream, upgrade, endpoint, upgrade::Version::V1) + }); + + let behaviour = { + let (tx, mut rx) = mpsc::channel(64); + + let b = super::RequestResponsesBehaviour::new(iter::once(super::ProtocolConfig { + name: From::from(protocol_name), + max_request_size: 1024, + max_response_size: 8, // <-- important for the test + request_timeout: Duration::from_secs(30), + requests_processing: Some(tx), + })).unwrap(); + + async_std::task::spawn(async move { + while let Some(rq) = rx.next().await { + assert_eq!(rq.request_bytes, b"this is a request"); + let _ = rq.answer.send(b"this response exceeds the limit".to_vec()); + } + }); + + b + }; + + let mut swarm = Swarm::new(transport, behaviour, keypair.public().into_peer_id()); + let listen_addr: Multiaddr = format!("/memory/{}", rand::random::()).parse().unwrap(); + + Swarm::listen_on(&mut swarm, listen_addr.clone()).unwrap(); + (swarm, listen_addr) + }) + .collect::>(); + + // Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in + // this test, so they wouldn't connect to each other. + { + let dial_addr = swarms[1].1.clone(); + Swarm::dial_addr(&mut swarms[0].0, dial_addr).unwrap(); + } + + // Running `swarm[0]` in the background until a `InboundRequest` event happens, + // which is a hint about the test having ended. + async_std::task::spawn({ + let (mut swarm, _) = swarms.remove(0); + async move { + loop { + match swarm.next_event().await { + SwarmEvent::Behaviour(super::Event::InboundRequest { outcome, .. }) => { + assert!(outcome.is_ok()); + break + }, + _ => {} + } + } } }); - futures::executor::block_on(fut); + // Remove and run the remaining swarm. + let (mut swarm, _) = swarms.remove(0); + async_std::task::block_on(async move { + let mut sent_request_id = None; + + loop { + match swarm.next_event().await { + SwarmEvent::ConnectionEstablished { peer_id, .. } => { + let id = swarm.send_request( + &peer_id, + protocol_name, + b"this is a request".to_vec() + ).unwrap(); + assert!(sent_request_id.is_none()); + sent_request_id = Some(id); + } + SwarmEvent::Behaviour(super::Event::OutboundFinished { + request_id, + outcome, + }) => { + assert_eq!(Some(request_id), sent_request_id); + match outcome { + Err(super::OutboundFailure::ConnectionClosed) => {}, + _ => panic!() + } + break; + } + _ => {} + } + } + }); } } From 2b774e7a0e9bf7a2bdb4bc6d03a54641c69e451b Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 13 Jul 2020 13:50:58 +0200 Subject: [PATCH 03/11] Fix sc-cli --- client/cli/src/params/network_params.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/client/cli/src/params/network_params.rs b/client/cli/src/params/network_params.rs index 253585544d260..51a1a5da00b78 100644 --- a/client/cli/src/params/network_params.rs +++ b/client/cli/src/params/network_params.rs @@ -147,6 +147,7 @@ impl NetworkParams { listen_addresses, public_addresses, notifications_protocols: Vec::new(), + request_response_protocols: Vec::new(), node_key, node_name: node_name.to_string(), client_version: client_id.to_string(), From 418bb6170ac2e1011e409283c2731e2667c4621a Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 14 Jul 2020 09:57:22 +0200 Subject: [PATCH 04/11] Apply suggestions from code review Co-authored-by: Max Inden --- client/network/src/request_responses.rs | 12 ++++++------ client/network/src/service.rs | 4 ++-- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index 4a9ca0e797701..6f5215564f9d9 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -64,13 +64,13 @@ pub struct ProtocolConfig { /// Maximum allowed size, in bytes, of a request. /// /// Any request larger than this value will be declined as a way to avoid allocating too - /// much memory for the it. + /// much memory for it. pub max_request_size: usize, /// Maximum allowed size, in bytes, of a response. /// /// Any response larger than this value will be declined as a way to avoid allocating too - /// much memory for the it. + /// much memory for it. pub max_response_size: usize, /// Duration after which emitted requests are considered timed out. @@ -108,7 +108,7 @@ pub struct IncomingRequest { pub origin: PeerId, /// Request sent by the remote. Will always be smaller than - /// [`RequestResponseConfig::max_response_size`]. + /// [`RequestResponseConfig::max_request_size`]. pub request_bytes: Vec, /// Channel to send back the response to. @@ -145,7 +145,7 @@ pub enum Event { pub struct RequestResponsesBehaviour { /// The multiple sub-protocols, by name. /// Contains the underlying libp2p `RequestResponse` behaviour, plus an optional - /// "response builder" used to build responses to incoming requests. + /// "response builder" used to build responses for incoming requests. protocols: HashMap< Cow<'static, str>, (RequestResponse, Option>) @@ -207,7 +207,7 @@ impl RequestResponsesBehaviour { /// Initiates sending a request. /// - /// An error is returned if we are not connected to the target peer of if the protocol doesn't + /// An error is returned if we are not connected to the target peer or if the protocol doesn't /// match one that has been registered. pub fn send_request(&mut self, target: &PeerId, protocol: &str, request: Vec) -> Result @@ -439,7 +439,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } })); - // This `continue` makres sure that `pending_responses` gets polled + // This `continue` makes sure that `pending_responses` gets polled // after we have added the new element. continue; } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index be82668522f92..7f2dd7e95a342 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -599,7 +599,7 @@ impl NetworkService { /// notifications should remain the default ways of communicating information. For example, a /// peer can announce something through a notification, after which the recipient can obtain /// more information by performing a request. - /// As such, this function is meant be called only with peers we are already connected to. + /// As such, this function is meant to be called only with peers we are already connected to. /// Calling this method with a `target` we are not connected to will *not* attempt to connect /// to said peer. /// @@ -1147,7 +1147,7 @@ impl Metrics { requests_out_failure_total: register(CounterVec::new( Opts::new( "sub_libp2p_requests_out_failure_total", - "Total number of requests that have resulted in a failure" + "Total number of requests that have failed" ), &["protocol", "reason"] )?, registry)?, From 9083df925a965ce12516f017fc0920bb12f9a9c8 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 14 Jul 2020 17:08:29 +0200 Subject: [PATCH 05/11] Fix naming --- client/network/src/behaviour.rs | 16 +++--- client/network/src/request_responses.rs | 73 +++++++++++++------------ client/network/src/service.rs | 30 +++++----- 3 files changed, 60 insertions(+), 59 deletions(-) diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 04a779106c64f..a044dadceb148 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -38,7 +38,7 @@ use std::{ }; pub use crate::request_responses::{ - InboundError, InboundFailure, OutboundFailure, RequestId, SendRequestError + ResponseFailure, InboundFailure, OutboundFailure, RequestId, SendRequestError }; /// General behaviour of the network. Combines all protocols together. @@ -89,7 +89,7 @@ pub enum BehaviourOut { protocol: Cow<'static, str>, /// If `Ok`, contains the time elapsed between when we received the request and when we /// sent back the response. If `Err`, the error that happened. - outcome: Result, + result: Result, }, /// A request initiated using [`Behaviour::send_request`] has succeeded or failed. @@ -97,7 +97,7 @@ pub enum BehaviourOut { /// Request that has succeeded. request_id: RequestId, /// Response sent by the remote or reason for failure. - outcome: Result, OutboundFailure>, + result: Result, OutboundFailure>, }, /// Started a new request with the given node. @@ -347,18 +347,18 @@ Behaviour { impl NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: request_responses::Event) { match event { - request_responses::Event::InboundRequest { peer, protocol, outcome } => { + request_responses::Event::InboundRequest { peer, protocol, result } => { self.events.push_back(BehaviourOut::InboundRequest { peer, protocol, - outcome, + result, }); } - request_responses::Event::OutboundFinished { request_id, outcome } => { + request_responses::Event::RequestFinished { request_id, result } => { self.events.push_back(BehaviourOut::RequestFinished { request_id, - outcome, + result, }); }, } @@ -374,7 +374,7 @@ impl NetworkBehaviourEventProcess { diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index 6f5215564f9d9..2284f751a4a25 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -105,14 +105,14 @@ pub struct ProtocolConfig { #[derive(Debug)] pub struct IncomingRequest { /// Who sent the request. - pub origin: PeerId, + pub peer: PeerId, /// Request sent by the remote. Will always be smaller than /// [`RequestResponseConfig::max_request_size`]. - pub request_bytes: Vec, + pub payload: Vec, /// Channel to send back the response to. - pub answer: oneshot::Sender>, + pub pending_response: oneshot::Sender>, } /// Event generated by the [`RequestResponsesBehaviour`]. @@ -128,16 +128,16 @@ pub enum Event { protocol: Cow<'static, str>, /// If `Ok`, contains the time elapsed between when we received the request and when we /// sent back the response. If `Err`, the error that happened. - outcome: Result, + result: Result, }, /// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or /// failed. - OutboundFinished { + RequestFinished { /// Request that has succeeded. request_id: RequestId, /// Response sent by the remote or reason for failure. - outcome: Result, OutboundFailure>, + result: Result, OutboundFailure>, }, } @@ -160,7 +160,7 @@ pub struct RequestResponsesBehaviour { /// Generated by the response builder and waiting to be processed. enum RequestProcessingOutcome { - PendingResponse { + Response { protocol: Cow<'static, str>, inner_channel: ResponseChannel>, response: Vec, @@ -349,9 +349,9 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Poll to see if any response is ready to be sent back. // We need to check `is_empty` first, otherwise polling would return `None`. if !self.pending_responses.is_empty() { - while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) { - match outcome { - RequestProcessingOutcome::PendingResponse { + while let Poll::Ready(Some(result)) = self.pending_responses.poll_next_unpin(cx) { + match result { + RequestProcessingOutcome::Response { protocol, inner_channel, response } => { if let Some((protocol, _)) = self.protocols.get_mut(&*protocol) { @@ -362,7 +362,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { let out = Event::InboundRequest { peer, protocol, - outcome: Err(InboundError::Busy), + result: Err(ResponseFailure::Busy), }; return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); } @@ -380,6 +380,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Other events generated by the underlying behaviour are transparently // passed through. NetworkBehaviourAction::DialAddress { address } => { + log::error!("The request-reponse isn't supposed to start dialing peers"); return Poll::Ready(NetworkBehaviourAction::DialAddress { address }) } NetworkBehaviourAction::DialPeer { peer_id, condition } => { @@ -420,9 +421,9 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // If the response builder is too busy, silently drop `tx`. // This will be reported as a `Busy` error. let _ = resp_builder.try_send(IncomingRequest { - origin: peer.clone(), - request_bytes: request, - answer: tx, + peer: peer.clone(), + payload: request, + pending_response: tx, }); } @@ -431,7 +432,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // The `tx` created above can be dropped if we are not capable of // processing this request, which is reflected as a "Busy" error. if let Ok(response) = rx.await { - RequestProcessingOutcome::PendingResponse { + RequestProcessingOutcome::Response { protocol, inner_channel: channel, response } } else { @@ -453,9 +454,9 @@ impl NetworkBehaviour for RequestResponsesBehaviour { }, .. } => { - let out = Event::OutboundFinished { + let out = Event::RequestFinished { request_id, - outcome: Ok(response), + result: Ok(response), }; return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); } @@ -466,9 +467,9 @@ impl NetworkBehaviour for RequestResponsesBehaviour { error, .. } => { - let out = Event::OutboundFinished { + let out = Event::RequestFinished { request_id, - outcome: Err(error), + result: Err(error), }; return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); } @@ -478,7 +479,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { let out = Event::InboundRequest { peer, protocol: protocol.clone(), - outcome: Err(InboundError::Network(error)), + result: Err(ResponseFailure::Network(error)), }; return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); } @@ -509,7 +510,7 @@ pub enum SendRequestError { /// Error when processing a request sent by a remote. #[derive(Debug, derive_more::Display, derive_more::Error)] -pub enum InboundError { +pub enum ResponseFailure { /// Internal response builder is too busy to process this request. Busy, /// Problem on the network. @@ -679,8 +680,8 @@ mod tests { async_std::task::spawn(async move { while let Some(rq) = rx.next().await { - assert_eq!(rq.request_bytes, b"this is a request"); - let _ = rq.answer.send(b"this is a response".to_vec()); + assert_eq!(rq.payload, b"this is a request"); + let _ = rq.pending_response.send(b"this is a response".to_vec()); } }); @@ -709,8 +710,8 @@ mod tests { async move { loop { match swarm.next_event().await { - SwarmEvent::Behaviour(super::Event::InboundRequest { outcome, .. }) => { - assert!(outcome.is_ok()); + SwarmEvent::Behaviour(super::Event::InboundRequest { result, .. }) => { + assert!(result.is_ok()); break }, _ => {} @@ -735,13 +736,13 @@ mod tests { assert!(sent_request_id.is_none()); sent_request_id = Some(id); } - SwarmEvent::Behaviour(super::Event::OutboundFinished { + SwarmEvent::Behaviour(super::Event::RequestFinished { request_id, - outcome, + result, }) => { assert_eq!(Some(request_id), sent_request_id); - let outcome = outcome.unwrap(); - assert_eq!(outcome, b"this is a response"); + let result = result.unwrap(); + assert_eq!(result, b"this is a response"); break; } _ => {} @@ -791,8 +792,8 @@ mod tests { async_std::task::spawn(async move { while let Some(rq) = rx.next().await { - assert_eq!(rq.request_bytes, b"this is a request"); - let _ = rq.answer.send(b"this response exceeds the limit".to_vec()); + assert_eq!(rq.payload, b"this is a request"); + let _ = rq.pending_response.send(b"this response exceeds the limit".to_vec()); } }); @@ -821,8 +822,8 @@ mod tests { async move { loop { match swarm.next_event().await { - SwarmEvent::Behaviour(super::Event::InboundRequest { outcome, .. }) => { - assert!(outcome.is_ok()); + SwarmEvent::Behaviour(super::Event::InboundRequest { result, .. }) => { + assert!(result.is_ok()); break }, _ => {} @@ -847,12 +848,12 @@ mod tests { assert!(sent_request_id.is_none()); sent_request_id = Some(id); } - SwarmEvent::Behaviour(super::Event::OutboundFinished { + SwarmEvent::Behaviour(super::Event::RequestFinished { request_id, - outcome, + result, }) => { assert_eq!(Some(request_id), sent_request_id); - match outcome { + match result { Err(super::OutboundFailure::ConnectionClosed) => {}, _ => panic!() } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 7f2dd7e95a342..d34137f877ea6 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -75,7 +75,7 @@ use std::{ }; use wasm_timer::Instant; -pub use behaviour::{InboundError, InboundFailure, OutboundFailure}; +pub use behaviour::{ResponseFailure, InboundFailure, OutboundFailure}; mod out_events; #[cfg(test)] @@ -619,7 +619,7 @@ impl NetworkService { target, protocol: protocol.into(), request, - response: tx + pending_response: tx }); match rx.await { @@ -889,7 +889,7 @@ enum ServiceToWorkerMsg { target: PeerId, protocol: Cow<'static, str>, request: Vec, - response: oneshot::Sender, OutboundFailure>>, + pending_response: oneshot::Sender, OutboundFailure>>, }, RegisterNotifProtocol { engine_id: ConsensusEngineId, @@ -1254,7 +1254,7 @@ impl Future for NetworkWorker { this.network_service.user_protocol_mut() .write_notification(target, engine_id, message); }, - ServiceToWorkerMsg::Request { target, protocol, request, response } => { + ServiceToWorkerMsg::Request { target, protocol, request, pending_response } => { // Calling `send_request` can fail immediately in some circumstances. // This is handled by sending back an error on the channel. match this.network_service.send_request(&target, &protocol, request) { @@ -1266,14 +1266,14 @@ impl Future for NetworkWorker { } this.pending_requests.insert( request_id, - (response, Instant::now(), protocol.to_string()) + (pending_response, Instant::now(), protocol.to_string()) ); }, Err(behaviour::SendRequestError::NotConnected) => { - let _ = response.send(Err(OutboundFailure::ConnectionClosed)); + let _ = pending_response.send(Err(OutboundFailure::ConnectionClosed)); }, Err(behaviour::SendRequestError::UnknownProtocol) => { - let _ = response.send(Err(OutboundFailure::UnsupportedProtocols)); + let _ = pending_response.send(Err(OutboundFailure::UnsupportedProtocols)); }, } }, @@ -1316,9 +1316,9 @@ impl Future for NetworkWorker { } this.import_queue.import_finality_proof(origin, hash, nb, proof); }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, outcome, .. })) => { + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::InboundRequest { protocol, result, .. })) => { if let Some(metrics) = this.metrics.as_ref() { - match outcome { + match result { Ok(serve_time) => { metrics.requests_in_success_total .with_label_values(&[&protocol]) @@ -1326,9 +1326,9 @@ impl Future for NetworkWorker { } Err(err) => { let reason = match err { - InboundError::Busy => "busy", - InboundError::Network(InboundFailure::Timeout) => "timeout", - InboundError::Network(InboundFailure::UnsupportedProtocols) => + ResponseFailure::Busy => "busy", + ResponseFailure::Network(InboundFailure::Timeout) => "timeout", + ResponseFailure::Network(InboundFailure::UnsupportedProtocols) => "unsupported", }; @@ -1339,10 +1339,10 @@ impl Future for NetworkWorker { } } }, - Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { request_id, outcome })) => { + Poll::Ready(SwarmEvent::Behaviour(BehaviourOut::RequestFinished { request_id, result })) => { if let Some((send_back, started, protocol)) = this.pending_requests.remove(&request_id) { if let Some(metrics) = this.metrics.as_ref() { - match &outcome { + match &result { Ok(_) => { metrics.requests_out_success_total .with_label_values(&[&protocol]) @@ -1362,7 +1362,7 @@ impl Future for NetworkWorker { } } } - let _ = send_back.send(outcome); + let _ = send_back.send(result); } else { error!("Request not in pending_requests"); } From 44c0203100cf0c43e5e9cace4fb91ccd9ddffa96 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 14 Jul 2020 17:09:23 +0200 Subject: [PATCH 06/11] Fix other issues --- client/network/src/request_responses.rs | 37 ++++++++++++------------- 1 file changed, 17 insertions(+), 20 deletions(-) diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index 2284f751a4a25..a9aa17d8e0a1a 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -345,28 +345,25 @@ impl NetworkBehaviour for RequestResponsesBehaviour { Self::OutEvent, >, > { - loop { + 'poll_all: loop { // Poll to see if any response is ready to be sent back. - // We need to check `is_empty` first, otherwise polling would return `None`. - if !self.pending_responses.is_empty() { - while let Poll::Ready(Some(result)) = self.pending_responses.poll_next_unpin(cx) { - match result { - RequestProcessingOutcome::Response { - protocol, inner_channel, response - } => { - if let Some((protocol, _)) = self.protocols.get_mut(&*protocol) { - protocol.send_response(inner_channel, response); - } - } - RequestProcessingOutcome::Busy { peer, protocol } => { - let out = Event::InboundRequest { - peer, - protocol, - result: Err(ResponseFailure::Busy), - }; - return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); + while let Poll::Ready(Some(result)) = self.pending_responses.poll_next_unpin(cx) { + match result { + RequestProcessingOutcome::Response { + protocol, inner_channel, response + } => { + if let Some((protocol, _)) = self.protocols.get_mut(&*protocol) { + protocol.send_response(inner_channel, response); } } + RequestProcessingOutcome::Busy { peer, protocol } => { + let out = Event::InboundRequest { + peer, + protocol, + result: Err(ResponseFailure::Busy), + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); + } } } @@ -442,7 +439,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // This `continue` makes sure that `pending_responses` gets polled // after we have added the new element. - continue; + continue 'poll_all; } // Received a response from a remote to one of our requests. From d0d98105820c96b7e08a56eedbd3c2f7b403e4a9 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 14 Jul 2020 17:10:22 +0200 Subject: [PATCH 07/11] Other naming fix --- client/network/src/request_responses.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index a9aa17d8e0a1a..b2f5d59c9e144 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -29,7 +29,7 @@ //! - Requests have a certain time limit before they time out. This time includes the time it //! takes to send/receive the request and response. //! -//! - If provided, a ["requests processing"](RequestResponseConfig::requests_processing) channel +//! - If provided, a ["requests processing"](RequestResponseConfig::inbound_queue) channel //! is used to handle incoming requests. //! @@ -98,7 +98,7 @@ pub struct ProtocolConfig { /// other peers. If this is `Some` but the channel is closed, then the local node will /// advertise support for this protocol, but any incoming request will lead to an error being /// sent back. - pub requests_processing: Option>, + pub inbound_queue: Option>, } /// A single request received by a peer on a request-response protocol. @@ -181,7 +181,7 @@ impl RequestResponsesBehaviour { cfg.set_connection_keep_alive(Duration::from_secs(10)); cfg.set_request_timeout(protocol.request_timeout); - let protocol_support = if protocol.requests_processing.is_some() { + let protocol_support = if protocol.inbound_queue.is_some() { ProtocolSupport::Full } else { ProtocolSupport::Outbound @@ -193,7 +193,7 @@ impl RequestResponsesBehaviour { }, iter::once((protocol.name.as_bytes().to_vec(), protocol_support)), cfg); match protocols.entry(protocol.name) { - Entry::Vacant(e) => e.insert((rq_rp, protocol.requests_processing)), + Entry::Vacant(e) => e.insert((rq_rp, protocol.inbound_queue)), Entry::Occupied(e) => return Err(RegisterError::DuplicateProtocol(e.key().clone())), }; @@ -672,7 +672,7 @@ mod tests { max_request_size: 1024, max_response_size: 1024 * 1024, request_timeout: Duration::from_secs(30), - requests_processing: Some(tx), + inbound_queue: Some(tx), })).unwrap(); async_std::task::spawn(async move { @@ -784,7 +784,7 @@ mod tests { max_request_size: 1024, max_response_size: 8, // <-- important for the test request_timeout: Duration::from_secs(30), - requests_processing: Some(tx), + inbound_queue: Some(tx), })).unwrap(); async_std::task::spawn(async move { From 47fa2ac2bd77a7af4528ad7026adb7bec93a7beb Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 15 Jul 2020 10:18:36 +0200 Subject: [PATCH 08/11] Fix error logging --- client/network/src/request_responses.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index b2f5d59c9e144..b57dfcea926be 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -377,10 +377,11 @@ impl NetworkBehaviour for RequestResponsesBehaviour { // Other events generated by the underlying behaviour are transparently // passed through. NetworkBehaviourAction::DialAddress { address } => { - log::error!("The request-reponse isn't supposed to start dialing peers"); + log::error!("The request-response isn't supposed to start dialing peers"); return Poll::Ready(NetworkBehaviourAction::DialAddress { address }) } NetworkBehaviourAction::DialPeer { peer_id, condition } => { + log::error!("The request-response isn't supposed to start dialing peers"); return Poll::Ready(NetworkBehaviourAction::DialPeer { peer_id, condition, From 13ed8315983b57e4d1ad793d13aa356b2b6aab04 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 15 Jul 2020 11:04:37 +0200 Subject: [PATCH 09/11] Max sizes to u64 --- client/network/src/request_responses.rs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index b57dfcea926be..6f8770d0cbbc8 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -49,8 +49,8 @@ use libp2p::{ }, }; use std::{ - borrow::Cow, collections::{hash_map::Entry, HashMap}, io, iter, pin::Pin, - task::{Context, Poll}, time::Duration, + borrow::Cow, collections::{hash_map::Entry, HashMap}, convert::TryFrom as _, io, iter, + pin::Pin, task::{Context, Poll}, time::Duration, }; pub use libp2p::request_response::{InboundFailure, OutboundFailure, RequestId}; @@ -65,13 +65,13 @@ pub struct ProtocolConfig { /// /// Any request larger than this value will be declined as a way to avoid allocating too /// much memory for it. - pub max_request_size: usize, + pub max_request_size: u64, /// Maximum allowed size, in bytes, of a response. /// /// Any response larger than this value will be declined as a way to avoid allocating too /// much memory for it. - pub max_response_size: usize, + pub max_response_size: u64, /// Duration after which emitted requests are considered timed out. /// @@ -521,8 +521,8 @@ pub enum ResponseFailure { #[derive(Debug, Clone)] #[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler. pub struct GenericCodec { - max_request_size: usize, - max_response_size: usize, + max_request_size: u64, + max_response_size: u64, } #[async_trait::async_trait] @@ -542,7 +542,7 @@ impl RequestResponseCodec for GenericCodec { // Read the length. let length = unsigned_varint::aio::read_usize(&mut io).await .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; - if length > self.max_request_size { + if length > usize::try_from(self.max_request_size).unwrap_or(usize::max_value()) { return Err(io::Error::new( io::ErrorKind::InvalidInput, format!("Request size exceeds limit: {} > {}", length, self.max_request_size) @@ -566,7 +566,7 @@ impl RequestResponseCodec for GenericCodec { // Read the length. let length = unsigned_varint::aio::read_usize(&mut io).await .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; - if length > self.max_response_size { + if length > usize::try_from(self.max_response_size).unwrap_or(usize::max_value()) { return Err(io::Error::new( io::ErrorKind::InvalidInput, format!("Response size exceeds limit: {} > {}", length, self.max_response_size) From 3f914b7fdbc11c7b347bbf235640684ca7892e88 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 15 Jul 2020 11:21:33 +0200 Subject: [PATCH 10/11] Don't kill connections on refusal to process --- client/network/src/behaviour.rs | 4 +- client/network/src/lib.rs | 2 +- client/network/src/request_responses.rs | 63 ++++++++++++++++++------- client/network/src/service.rs | 29 +++++++----- 4 files changed, 66 insertions(+), 32 deletions(-) diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index a044dadceb148..dd5ad027b694e 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -38,7 +38,7 @@ use std::{ }; pub use crate::request_responses::{ - ResponseFailure, InboundFailure, OutboundFailure, RequestId, SendRequestError + ResponseFailure, InboundFailure, RequestFailure, OutboundFailure, RequestId, SendRequestError }; /// General behaviour of the network. Combines all protocols together. @@ -97,7 +97,7 @@ pub enum BehaviourOut { /// Request that has succeeded. request_id: RequestId, /// Response sent by the remote or reason for failure. - result: Result, OutboundFailure>, + result: Result, RequestFailure>, }, /// Started a new request with the given node. diff --git a/client/network/src/lib.rs b/client/network/src/lib.rs index ad57a118ffd7b..132f37bef4abe 100644 --- a/client/network/src/lib.rs +++ b/client/network/src/lib.rs @@ -264,7 +264,7 @@ pub mod network_state; #[doc(inline)] pub use libp2p::{multiaddr, Multiaddr, PeerId}; pub use protocol::{event::{DhtEvent, Event, ObservedRole}, sync::SyncState, PeerInfo}; -pub use service::{NetworkService, NetworkWorker, OutboundFailure}; +pub use service::{NetworkService, NetworkWorker, RequestFailure, OutboundFailure}; pub use sc_peerset::ReputationChange; use sp_runtime::traits::{Block as BlockT, NumberFor}; diff --git a/client/network/src/request_responses.rs b/client/network/src/request_responses.rs index 6f8770d0cbbc8..92233c77d6bd1 100644 --- a/client/network/src/request_responses.rs +++ b/client/network/src/request_responses.rs @@ -137,7 +137,7 @@ pub enum Event { /// Request that has succeeded. request_id: RequestId, /// Response sent by the remote or reason for failure. - result: Result, OutboundFailure>, + result: Result, RequestFailure>, }, } @@ -162,7 +162,7 @@ pub struct RequestResponsesBehaviour { enum RequestProcessingOutcome { Response { protocol: Cow<'static, str>, - inner_channel: ResponseChannel>, + inner_channel: ResponseChannel, ()>>, response: Vec, }, Busy { @@ -353,7 +353,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { protocol, inner_channel, response } => { if let Some((protocol, _)) = self.protocols.get_mut(&*protocol) { - protocol.send_response(inner_channel, response); + protocol.send_response(inner_channel, Ok(response)); } } RequestProcessingOutcome::Busy { peer, protocol } => { @@ -454,7 +454,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } => { let out = Event::RequestFinished { request_id, - result: Ok(response), + result: response.map_err(|()| RequestFailure::Refused), }; return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); } @@ -467,7 +467,7 @@ impl NetworkBehaviour for RequestResponsesBehaviour { } => { let out = Event::RequestFinished { request_id, - result: Err(error), + result: Err(RequestFailure::Network(error)), }; return Poll::Ready(NetworkBehaviourAction::GenerateEvent(out)); } @@ -506,6 +506,17 @@ pub enum SendRequestError { UnknownProtocol, } +/// Error in a request. +#[derive(Debug, derive_more::Display, derive_more::Error)] +pub enum RequestFailure { + /// Remote has closed the substream before answering, thereby signaling that it considers the + /// request as valid, but refused to answer it. + Refused, + /// Problem on the network. + #[display(fmt = "Problem on the network")] + Network(#[error(ignore)] OutboundFailure), +} + /// Error when processing a request sent by a remote. #[derive(Debug, derive_more::Display, derive_more::Error)] pub enum ResponseFailure { @@ -529,7 +540,7 @@ pub struct GenericCodec { impl RequestResponseCodec for GenericCodec { type Protocol = Vec; type Request = Vec; - type Response = Vec; + type Response = Result, ()>; async fn read_request( &mut self, @@ -563,9 +574,22 @@ impl RequestResponseCodec for GenericCodec { where T: AsyncRead + Unpin + Send, { + // Note that this function returns a `Result>`. Returning an `Err` is + // considered as a protocol error and will result in the entire connection being closed. + // Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and + // that this response is an error. + // Read the length. - let length = unsigned_varint::aio::read_usize(&mut io).await - .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?; + let length = match unsigned_varint::aio::read_usize(&mut io).await { + Ok(l) => l, + Err(unsigned_varint::io::ReadError::Io(err)) + if matches!(err.kind(), io::ErrorKind::UnexpectedEof) => + { + return Ok(Err(())); + } + Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)), + }; + if length > usize::try_from(self.max_response_size).unwrap_or(usize::max_value()) { return Err(io::Error::new( io::ErrorKind::InvalidInput, @@ -576,7 +600,7 @@ impl RequestResponseCodec for GenericCodec { // Read the payload. let mut buffer = vec![0; length]; io.read_exact(&mut buffer).await?; - Ok(buffer) + Ok(Ok(buffer)) } async fn write_request( @@ -611,15 +635,18 @@ impl RequestResponseCodec for GenericCodec { where T: AsyncWrite + Unpin + Send, { - // TODO: check the length? - // Write the length. - { - let mut buffer = unsigned_varint::encode::usize_buffer(); - io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?; - } + // If `res` is an `Err`, we jump to closing the substream without writing anything on it. + if let Ok(res) = res { + // TODO: check the length? + // Write the length. + { + let mut buffer = unsigned_varint::encode::usize_buffer(); + io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?; + } - // Write the payload. - io.write_all(&res).await?; + // Write the payload. + io.write_all(&res).await?; + } io.close().await?; Ok(()) @@ -852,7 +879,7 @@ mod tests { }) => { assert_eq!(Some(request_id), sent_request_id); match result { - Err(super::OutboundFailure::ConnectionClosed) => {}, + Err(super::RequestFailure::Network(super::OutboundFailure::ConnectionClosed)) => {}, _ => panic!() } break; diff --git a/client/network/src/service.rs b/client/network/src/service.rs index d34137f877ea6..f5e484a81dab6 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -75,7 +75,7 @@ use std::{ }; use wasm_timer::Instant; -pub use behaviour::{ResponseFailure, InboundFailure, OutboundFailure}; +pub use behaviour::{ResponseFailure, InboundFailure, RequestFailure, OutboundFailure}; mod out_events; #[cfg(test)] @@ -613,7 +613,7 @@ impl NetworkService { target: PeerId, protocol: impl Into>, request: Vec - ) -> Result, OutboundFailure> { + ) -> Result, RequestFailure> { let (tx, rx) = oneshot::channel(); let _ = self.to_worker.unbounded_send(ServiceToWorkerMsg::Request { target, @@ -627,7 +627,7 @@ impl NetworkService { // The channel can only be closed if the network worker no longer exists. If the // network worker no longer exists, then all connections to `target` are necessarily // closed, and we legitimately report this situation as a "ConnectionClosed". - Err(_) => Err(OutboundFailure::ConnectionClosed), + Err(_) => Err(RequestFailure::Network(OutboundFailure::ConnectionClosed)), } } @@ -889,7 +889,7 @@ enum ServiceToWorkerMsg { target: PeerId, protocol: Cow<'static, str>, request: Vec, - pending_response: oneshot::Sender, OutboundFailure>>, + pending_response: oneshot::Sender, RequestFailure>>, }, RegisterNotifProtocol { engine_id: ConsensusEngineId, @@ -932,7 +932,7 @@ pub struct NetworkWorker { /// purposes. pending_requests: HashMap< behaviour::RequestId, - (oneshot::Sender, OutboundFailure>>, Instant, String) + (oneshot::Sender, RequestFailure>>, Instant, String) >, } @@ -1270,10 +1270,12 @@ impl Future for NetworkWorker { ); }, Err(behaviour::SendRequestError::NotConnected) => { - let _ = pending_response.send(Err(OutboundFailure::ConnectionClosed)); + let err = RequestFailure::Network(OutboundFailure::ConnectionClosed); + let _ = pending_response.send(Err(err)); }, Err(behaviour::SendRequestError::UnknownProtocol) => { - let _ = pending_response.send(Err(OutboundFailure::UnsupportedProtocols)); + let err = RequestFailure::Network(OutboundFailure::UnsupportedProtocols); + let _ = pending_response.send(Err(err)); }, } }, @@ -1350,10 +1352,15 @@ impl Future for NetworkWorker { } Err(err) => { let reason = match err { - OutboundFailure::DialFailure => "dial-failure", - OutboundFailure::Timeout => "timeout", - OutboundFailure::ConnectionClosed => "connection-closed", - OutboundFailure::UnsupportedProtocols => "unsupported", + RequestFailure::Refused => "refused", + RequestFailure::Network(OutboundFailure::DialFailure) => + "dial-failure", + RequestFailure::Network(OutboundFailure::Timeout) => + "timeout", + RequestFailure::Network(OutboundFailure::ConnectionClosed) => + "connection-closed", + RequestFailure::Network(OutboundFailure::UnsupportedProtocols) => + "unsupported", }; metrics.requests_out_failure_total From 12878bb1884cd61c88a13f07cd9356b34b19c66d Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 15 Jul 2020 11:34:41 +0200 Subject: [PATCH 11/11] Adjust comment --- client/network/src/service.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/network/src/service.rs b/client/network/src/service.rs index f5e484a81dab6..d154de637814c 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -603,8 +603,8 @@ impl NetworkService { /// Calling this method with a `target` we are not connected to will *not* attempt to connect /// to said peer. /// - /// Contrary to notifications, requests are guaranteed to not be interrupted under normal - /// circumstances. + /// No limit or throttling of concurrent outbound requests per peer and protocol are enforced. + /// Such restrictions, if desired, need to be enforced at the call site(s). /// /// The protocol must have been registered through /// [`NetworkConfiguration::request_response_protocols`].