diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 3186801e5fd21..ce92114c53011 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -261,6 +261,16 @@ impl NetworkBehaviourEventProcess { + // There doesn't exist any mechanism to report cancellations yet. + // We would normally disconnect the node, but this event happens as the result of + // a disconnect, so there's nothing more to do. + } + block_requests::Event::RequestTimeout { peer, .. } => { + // There doesn't exist any mechanism to report timeouts yet, so we process them by + // disconnecting the node. + self.substrate.disconnect_peer(&peer); + } } } } diff --git a/client/network/src/protocol/block_requests.rs b/client/network/src/protocol/block_requests.rs index f1981171af33d..3c511538d9999 100644 --- a/client/network/src/protocol/block_requests.rs +++ b/client/network/src/protocol/block_requests.rs @@ -30,6 +30,7 @@ use crate::{ protocol::{api, message::{self, BlockAttributes}} }; use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered}; +use futures_timer::Delay; use libp2p::{ core::{ ConnectedPoint, @@ -54,10 +55,11 @@ use prost::Message; use sp_runtime::{generic::BlockId, traits::{Block, Header, One, Zero}}; use std::{ cmp::min, - collections::VecDeque, + collections::{HashMap, VecDeque}, io, iter, marker::PhantomData, + pin::Pin, sync::Arc, time::Duration, task::{Context, Poll} @@ -77,6 +79,19 @@ pub enum Event { original_request: message::BlockRequest, response: message::BlockResponse, }, + /// A request has been cancelled because the peer has disconnected. + /// Disconnects can also happen as a result of violating the network protocol. + RequestCancelled { + peer: PeerId, + /// The original request passed to `send_request`. + original_request: message::BlockRequest, + }, + /// A request has timed out. + RequestTimeout { + peer: PeerId, + /// The original request passed to `send_request`. + original_request: message::BlockRequest, + } } /// Configuration options for `BlockRequests`. @@ -86,6 +101,7 @@ pub struct Config { max_request_len: usize, max_response_len: usize, inactivity_timeout: Duration, + request_timeout: Duration, protocol: Bytes, } @@ -96,12 +112,14 @@ impl Config { /// - max. request size = 1 MiB /// - max. response size = 16 MiB /// - inactivity timeout = 15s + /// - request timeout = 40s pub fn new(id: &ProtocolId) -> Self { let mut c = Config { max_block_data_response: 128, max_request_len: 1024 * 1024, max_response_len: 16 * 1024 * 1024, inactivity_timeout: Duration::from_secs(15), + request_timeout: Duration::from_secs(40), protocol: Bytes::new(), }; c.set_protocol(id); @@ -149,12 +167,27 @@ pub struct BlockRequests { config: Config, /// Blockchain client. chain: Arc>, + /// List of all active connections and the requests we've sent. + peers: HashMap>>, /// Futures sending back the block request response. outgoing: FuturesUnordered>, /// Events to return as soon as possible from `poll`. pending_events: VecDeque, Event>>, } +/// Local tracking of a libp2p connection. +#[derive(Debug)] +struct Connection { + id: ConnectionId, + ongoing_request: Option>, +} + +#[derive(Debug)] +struct OngoingRequest { + request: message::BlockRequest, + timeout: Delay, +} + impl BlockRequests where B: Block, @@ -163,6 +196,7 @@ where BlockRequests { config: cfg, chain, + peers: HashMap::new(), outgoing: FuturesUnordered::new(), pending_events: VecDeque::new(), } @@ -170,9 +204,30 @@ where /// Issue a new block request. /// + /// Cancels any existing request targeting the same `PeerId`. + /// /// If the response doesn't arrive in time, or if the remote answers improperly, the target /// will be disconnected. pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest) { + // Determine which connection to send the request to. + let connection = if let Some(peer) = self.peers.get_mut(target) { + // We don't want to have multiple requests for any given node, so in priority try to + // find a connection with an existing request, to override it. + if let Some(entry) = peer.iter_mut().find(|c| c.ongoing_request.is_some()) { + entry + } else if let Some(entry) = peer.get_mut(0) { + entry + } else { + log::error!( + target: "sync", + "State inconsistency: empty list of peer connections" + ); + return; + } + } else { + return; + }; + let protobuf_rq = api::v1::BlockRequest { fields: u32::from_be_bytes([req.fields.bits(), 0, 0, 0]), from_block: match req.from { @@ -191,14 +246,31 @@ where let mut buf = Vec::with_capacity(protobuf_rq.encoded_len()); if let Err(err) = protobuf_rq.encode(&mut buf) { - log::warn!("failed to encode block request {:?}: {:?}", protobuf_rq, err); + log::warn!( + target: "sync", + "Failed to encode block request {:?}: {:?}", + protobuf_rq, + err + ); return; } - log::trace!("enqueueing block request to {:?}: {:?}", target, protobuf_rq); + if let Some(rq) = &connection.ongoing_request { + log::debug!( + target: "sync", + "Replacing existing block request on connection {:?}", + connection.id + ); + } + connection.ongoing_request = Some(OngoingRequest { + request: req.clone(), + timeout: Delay::new(self.config.request_timeout), + }); + + log::trace!(target: "sync", "Enqueueing block request to {:?}: {:?}", target, protobuf_rq); self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler { peer_id: target.clone(), - handler: NotifyHandler::Any, + handler: NotifyHandler::One(connection.id), event: OutboundProtocol { request: buf, original_request: req, @@ -215,7 +287,9 @@ where , request: &api::v1::BlockRequest ) -> Result { - log::trace!("block request from peer {}: from block {:?} to block {:?}, max blocks {:?}", + log::trace!( + target: "sync", + "Block request from peer {}: from block {:?} to block {:?}, max blocks {:?}", peer, request.from_block, request.to_block, @@ -332,6 +406,7 @@ where }; let mut cfg = OneShotHandlerConfig::default(); cfg.inactive_timeout = self.config.inactivity_timeout; + cfg.substream_timeout = self.config.request_timeout; OneShotHandler::new(SubstreamProtocol::new(p), cfg) } @@ -345,34 +420,138 @@ where fn inject_disconnected(&mut self, _peer: &PeerId) { } + fn inject_connection_established(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) { + self.peers.entry(peer_id.clone()) + .or_default() + .push(Connection { + id: *id, + ongoing_request: None, + }); + } + + fn inject_connection_closed(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) { + let mut needs_remove = false; + if let Some(entry) = self.peers.get_mut(peer_id) { + if let Some(pos) = entry.iter().position(|i| i.id == *id) { + let ongoing_request = entry.remove(pos).ongoing_request; + if let Some(ongoing_request) = ongoing_request { + log::debug!( + target: "sync", + "Connection {:?} with {} closed with ongoing sync request: {:?}", + id, + peer_id, + ongoing_request + ); + let ev = Event::RequestCancelled { + peer: peer_id.clone(), + original_request: ongoing_request.request.clone(), + }; + self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev)); + } + if entry.is_empty() { + needs_remove = true; + } + } else { + log::error!( + target: "sync", + "State inconsistency: connection id not found in list" + ); + } + } else { + log::error!( + target: "sync", + "State inconsistency: peer_id not found in list of connections" + ); + } + if needs_remove { + self.peers.remove(peer_id); + } + } + fn inject_event( &mut self, peer: PeerId, - connection: ConnectionId, + connection_id: ConnectionId, node_event: NodeEvent ) { match node_event { NodeEvent::Request(request, mut stream) => { match self.on_block_request(&peer, &request) { Ok(res) => { - log::trace!("enqueueing block response for peer {} with {} blocks", peer, res.blocks.len()); + log::trace!( + target: "sync", + "Enqueueing block response for peer {} with {} blocks", + peer, res.blocks.len() + ); let mut data = Vec::with_capacity(res.encoded_len()); if let Err(e) = res.encode(&mut data) { - log::debug!("error encoding block response for peer {}: {}", peer, e) + log::debug!( + target: "sync", + "Error encoding block response for peer {}: {}", + peer, e + ) } else { let future = async move { if let Err(e) = write_one(&mut stream, data).await { - log::debug!("error writing block response: {}", e) + log::debug!( + target: "sync", + "Error writing block response: {}", + e + ); } }; self.outgoing.push(future.boxed()) } } - Err(e) => log::debug!("error handling block request from peer {}: {}", peer, e) + Err(e) => log::debug!( + target: "sync", + "Error handling block request from peer {}: {}", peer, e + ) } } NodeEvent::Response(original_request, response) => { - log::trace!("received block response from peer {} with {} blocks", peer, response.blocks.len()); + log::trace!( + target: "sync", + "Received block response from peer {} with {} blocks", + peer, response.blocks.len() + ); + if let Some(connections) = self.peers.get_mut(&peer) { + if let Some(connection) = connections.iter_mut().find(|c| c.id == connection_id) { + if let Some(ongoing_request) = &mut connection.ongoing_request { + if ongoing_request.request == original_request { + connection.ongoing_request = None; + } else { + // We're no longer interested in that request. + log::debug!( + target: "sync", + "Received response from {} to obsolete block request {:?}", + peer, + original_request + ); + return; + } + } else { + // We remove from `self.peers` requests we're no longer interested in, + // so this can legitimately happen. + return; + } + } else { + log::error!( + target: "sync", + "State inconsistency: response on non-existing connection {:?}", + connection_id + ); + return; + } + } else { + log::error!( + target: "sync", + "State inconsistency: response on non-connected peer {}", + peer + ); + return; + } + let blocks = response.blocks.into_iter().map(|block_data| { Ok(message::BlockData:: { hash: Decode::decode(&mut block_data.hash.as_ref())?, @@ -419,7 +598,10 @@ where self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev)); } Err(err) => { - log::debug!("failed to decode block response from peer {}: {}", peer, err); + log::debug!( + target: "sync", + "Failed to decode block response from peer {}: {}", peer, err + ); } } } @@ -433,6 +615,31 @@ where return Poll::Ready(ev); } + // Check the request timeouts. + for (peer, connections) in &mut self.peers { + for connection in connections { + let ongoing_request = match &mut connection.ongoing_request { + Some(rq) => rq, + None => continue, + }; + + if let Poll::Ready(_) = Pin::new(&mut ongoing_request.timeout).poll(cx) { + let original_request = ongoing_request.request.clone(); + connection.ongoing_request = None; + log::debug!( + target: "sync", + "Request timeout for {}: {:?}", + peer, original_request + ); + let ev = Event::RequestTimeout { + peer: peer.clone(), + original_request, + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); + } + } + } + while let Poll::Ready(Some(_)) = self.outgoing.poll_next_unpin(cx) {} Poll::Pending }