This repository was archived by the owner on Nov 15, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 2.7k
Extra timeout handling in block_requests #5794
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -30,6 +30,7 @@ use crate::{ | |
| protocol::{api, message::{self, BlockAttributes}} | ||
| }; | ||
| use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered}; | ||
| use futures_timer::Delay; | ||
| use libp2p::{ | ||
| core::{ | ||
| ConnectedPoint, | ||
|
|
@@ -54,10 +55,11 @@ use prost::Message; | |
| use sp_runtime::{generic::BlockId, traits::{Block, Header, One, Zero}}; | ||
| use std::{ | ||
| cmp::min, | ||
| collections::VecDeque, | ||
| collections::{HashMap, VecDeque}, | ||
| io, | ||
| iter, | ||
| marker::PhantomData, | ||
| pin::Pin, | ||
| sync::Arc, | ||
| time::Duration, | ||
| task::{Context, Poll} | ||
|
|
@@ -77,6 +79,19 @@ pub enum Event<B: Block> { | |
| original_request: message::BlockRequest<B>, | ||
| response: message::BlockResponse<B>, | ||
| }, | ||
| /// A request has been cancelled because the peer has disconnected. | ||
| /// Disconnects can also happen as a result of violating the network protocol. | ||
| RequestCancelled { | ||
| peer: PeerId, | ||
| /// The original request passed to `send_request`. | ||
| original_request: message::BlockRequest<B>, | ||
| }, | ||
| /// A request has timed out. | ||
| RequestTimeout { | ||
| peer: PeerId, | ||
| /// The original request passed to `send_request`. | ||
| original_request: message::BlockRequest<B>, | ||
| } | ||
| } | ||
|
|
||
| /// Configuration options for `BlockRequests`. | ||
|
|
@@ -86,6 +101,7 @@ pub struct Config { | |
| max_request_len: usize, | ||
| max_response_len: usize, | ||
| inactivity_timeout: Duration, | ||
| request_timeout: Duration, | ||
| protocol: Bytes, | ||
| } | ||
|
|
||
|
|
@@ -96,12 +112,14 @@ impl Config { | |
| /// - max. request size = 1 MiB | ||
| /// - max. response size = 16 MiB | ||
| /// - inactivity timeout = 15s | ||
| /// - request timeout = 40s | ||
| pub fn new(id: &ProtocolId) -> Self { | ||
| let mut c = Config { | ||
| max_block_data_response: 128, | ||
| max_request_len: 1024 * 1024, | ||
| max_response_len: 16 * 1024 * 1024, | ||
| inactivity_timeout: Duration::from_secs(15), | ||
| request_timeout: Duration::from_secs(40), | ||
| protocol: Bytes::new(), | ||
| }; | ||
| c.set_protocol(id); | ||
|
|
@@ -149,12 +167,27 @@ pub struct BlockRequests<B: Block> { | |
| config: Config, | ||
| /// Blockchain client. | ||
| chain: Arc<dyn Client<B>>, | ||
| /// List of all active connections and the requests we've sent. | ||
| peers: HashMap<PeerId, Vec<Connection<B>>>, | ||
| /// Futures sending back the block request response. | ||
| outgoing: FuturesUnordered<BoxFuture<'static, ()>>, | ||
| /// Events to return as soon as possible from `poll`. | ||
| pending_events: VecDeque<NetworkBehaviourAction<OutboundProtocol<B>, Event<B>>>, | ||
| } | ||
|
|
||
| /// Local tracking of a libp2p connection. | ||
| /// | ||
| /// One entry per open connection is kept in `BlockRequests::peers`, grouped by `PeerId`. | ||
| #[derive(Debug)] | ||
| struct Connection<B: Block> { | ||
| /// Identifier of the connection, as received in `inject_connection_established`. | ||
| id: ConnectionId, | ||
| /// Request currently awaiting a response on this connection, if any. | ||
| /// Set by `send_request`; reset to `None` when the matching response arrives or when | ||
| /// the request's timeout fires in `poll`. | ||
| ongoing_request: Option<OngoingRequest<B>>, | ||
| } | ||
|
|
||
| /// A block request that has been sent on a connection and not yet answered. | ||
| #[derive(Debug)] | ||
| struct OngoingRequest<B: Block> { | ||
| /// Copy of the request passed to `send_request`; compared against the request echoed | ||
| /// back with a response to detect obsolete responses. | ||
| request: message::BlockRequest<B>, | ||
| /// Timer created from `Config::request_timeout` when the request is sent; polled in | ||
| /// `poll`, which emits `Event::RequestTimeout` once it completes. | ||
| timeout: Delay, | ||
| } | ||
|
|
||
| impl<B> BlockRequests<B> | ||
| where | ||
| B: Block, | ||
|
|
@@ -163,16 +196,38 @@ where | |
| BlockRequests { | ||
| config: cfg, | ||
| chain, | ||
| peers: HashMap::new(), | ||
| outgoing: FuturesUnordered::new(), | ||
| pending_events: VecDeque::new(), | ||
| } | ||
| } | ||
|
|
||
| /// Issue a new block request. | ||
| /// | ||
| /// Cancels any existing request targeting the same `PeerId`. | ||
| /// | ||
| /// If the response doesn't arrive in time, or if the remote answers improperly, the target | ||
| /// will be disconnected. | ||
| pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest<B>) { | ||
| // Determine which connection to send the request to. | ||
| let connection = if let Some(peer) = self.peers.get_mut(target) { | ||
| // We don't want to have multiple requests for any given node, so in priority try to | ||
| // find a connection with an existing request, to override it. | ||
| if let Some(entry) = peer.iter_mut().find(|c| c.ongoing_request.is_some()) { | ||
| entry | ||
| } else if let Some(entry) = peer.get_mut(0) { | ||
| entry | ||
| } else { | ||
| log::error!( | ||
| target: "sync", | ||
| "State inconsistency: empty list of peer connections" | ||
| ); | ||
| return; | ||
| } | ||
| } else { | ||
| return; | ||
| }; | ||
|
|
||
| let protobuf_rq = api::v1::BlockRequest { | ||
| fields: u32::from_be_bytes([req.fields.bits(), 0, 0, 0]), | ||
| from_block: match req.from { | ||
|
|
@@ -191,14 +246,31 @@ where | |
|
|
||
| let mut buf = Vec::with_capacity(protobuf_rq.encoded_len()); | ||
| if let Err(err) = protobuf_rq.encode(&mut buf) { | ||
| log::warn!("failed to encode block request {:?}: {:?}", protobuf_rq, err); | ||
| log::warn!( | ||
| target: "sync", | ||
| "Failed to encode block request {:?}: {:?}", | ||
| protobuf_rq, | ||
| err | ||
| ); | ||
| return; | ||
| } | ||
|
|
||
| log::trace!("enqueueing block request to {:?}: {:?}", target, protobuf_rq); | ||
| if let Some(rq) = &connection.ongoing_request { | ||
| log::debug!( | ||
| target: "sync", | ||
| "Replacing existing block request on connection {:?}", | ||
| connection.id | ||
| ); | ||
| } | ||
| connection.ongoing_request = Some(OngoingRequest { | ||
| request: req.clone(), | ||
| timeout: Delay::new(self.config.request_timeout), | ||
| }); | ||
|
|
||
| log::trace!(target: "sync", "Enqueueing block request to {:?}: {:?}", target, protobuf_rq); | ||
| self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler { | ||
| peer_id: target.clone(), | ||
| handler: NotifyHandler::Any, | ||
| handler: NotifyHandler::One(connection.id), | ||
| event: OutboundProtocol { | ||
| request: buf, | ||
| original_request: req, | ||
|
|
@@ -215,7 +287,9 @@ where | |
| , request: &api::v1::BlockRequest | ||
| ) -> Result<api::v1::BlockResponse, Error> | ||
| { | ||
| log::trace!("block request from peer {}: from block {:?} to block {:?}, max blocks {:?}", | ||
| log::trace!( | ||
| target: "sync", | ||
| "Block request from peer {}: from block {:?} to block {:?}, max blocks {:?}", | ||
| peer, | ||
| request.from_block, | ||
| request.to_block, | ||
|
|
@@ -332,6 +406,7 @@ where | |
| }; | ||
| let mut cfg = OneShotHandlerConfig::default(); | ||
| cfg.inactive_timeout = self.config.inactivity_timeout; | ||
| cfg.substream_timeout = self.config.request_timeout; | ||
| OneShotHandler::new(SubstreamProtocol::new(p), cfg) | ||
| } | ||
|
|
||
|
|
@@ -345,34 +420,138 @@ where | |
| fn inject_disconnected(&mut self, _peer: &PeerId) { | ||
| } | ||
|
|
||
| /// Records a newly established connection to `peer_id`. | ||
| /// | ||
| /// The connection is appended to the peer's list in `self.peers` (the list is created | ||
| /// if this is the first connection to that peer) with no ongoing request. | ||
| fn inject_connection_established(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) { | ||
| self.peers.entry(peer_id.clone()) | ||
| .or_default() | ||
| .push(Connection { | ||
| id: *id, | ||
| ongoing_request: None, | ||
| }); | ||
| } | ||
|
|
||
| /// Forgets the closed connection `id` of `peer_id`. | ||
| /// | ||
| /// If a request was still in flight on that connection, an `Event::RequestCancelled` | ||
| /// carrying the original request is queued in `pending_events`. When the peer has no | ||
| /// connection left, its entry is removed from `self.peers`. An unknown peer or | ||
| /// connection id is only logged as a state inconsistency. | ||
| fn inject_connection_closed(&mut self, peer_id: &PeerId, id: &ConnectionId, _: &ConnectedPoint) { | ||
| // Removing the peer entry is deferred until the mutable borrow of `self.peers` | ||
| // taken by `get_mut` below has ended. | ||
| let mut needs_remove = false; | ||
| if let Some(entry) = self.peers.get_mut(peer_id) { | ||
| if let Some(pos) = entry.iter().position(|i| i.id == *id) { | ||
| // Drop the connection from the list, keeping only its in-flight request (if any). | ||
| let ongoing_request = entry.remove(pos).ongoing_request; | ||
| if let Some(ongoing_request) = ongoing_request { | ||
| log::debug!( | ||
| target: "sync", | ||
| "Connection {:?} with {} closed with ongoing sync request: {:?}", | ||
| id, | ||
| peer_id, | ||
| ongoing_request | ||
| ); | ||
| // Report the request as cancelled; the event is returned by a later `poll`. | ||
| let ev = Event::RequestCancelled { | ||
| peer: peer_id.clone(), | ||
| original_request: ongoing_request.request.clone(), | ||
| }; | ||
| self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev)); | ||
| } | ||
| // That was the last connection to this peer. | ||
| if entry.is_empty() { | ||
| needs_remove = true; | ||
| } | ||
| } else { | ||
| log::error!( | ||
| target: "sync", | ||
| "State inconsistency: connection id not found in list" | ||
| ); | ||
| } | ||
| } else { | ||
| log::error!( | ||
| target: "sync", | ||
| "State inconsistency: peer_id not found in list of connections" | ||
| ); | ||
| } | ||
| if needs_remove { | ||
| self.peers.remove(peer_id); | ||
| } | ||
| } | ||
|
|
||
| fn inject_event( | ||
| &mut self, | ||
| peer: PeerId, | ||
| connection: ConnectionId, | ||
| connection_id: ConnectionId, | ||
| node_event: NodeEvent<B, NegotiatedSubstream> | ||
| ) { | ||
| match node_event { | ||
| NodeEvent::Request(request, mut stream) => { | ||
| match self.on_block_request(&peer, &request) { | ||
| Ok(res) => { | ||
| log::trace!("enqueueing block response for peer {} with {} blocks", peer, res.blocks.len()); | ||
| log::trace!( | ||
| target: "sync", | ||
| "Enqueueing block response for peer {} with {} blocks", | ||
| peer, res.blocks.len() | ||
| ); | ||
| let mut data = Vec::with_capacity(res.encoded_len()); | ||
| if let Err(e) = res.encode(&mut data) { | ||
| log::debug!("error encoding block response for peer {}: {}", peer, e) | ||
| log::debug!( | ||
| target: "sync", | ||
| "Error encoding block response for peer {}: {}", | ||
| peer, e | ||
| ) | ||
| } else { | ||
| let future = async move { | ||
| if let Err(e) = write_one(&mut stream, data).await { | ||
| log::debug!("error writing block response: {}", e) | ||
| log::debug!( | ||
| target: "sync", | ||
| "Error writing block response: {}", | ||
| e | ||
| ); | ||
| } | ||
| }; | ||
| self.outgoing.push(future.boxed()) | ||
| } | ||
| } | ||
| Err(e) => log::debug!("error handling block request from peer {}: {}", peer, e) | ||
| Err(e) => log::debug!( | ||
| target: "sync", | ||
| "Error handling block request from peer {}: {}", peer, e | ||
| ) | ||
| } | ||
| } | ||
| NodeEvent::Response(original_request, response) => { | ||
| log::trace!("received block response from peer {} with {} blocks", peer, response.blocks.len()); | ||
| log::trace!( | ||
| target: "sync", | ||
| "Received block response from peer {} with {} blocks", | ||
| peer, response.blocks.len() | ||
| ); | ||
| if let Some(connections) = self.peers.get_mut(&peer) { | ||
| if let Some(connection) = connections.iter_mut().find(|c| c.id == connection_id) { | ||
| if let Some(ongoing_request) = &mut connection.ongoing_request { | ||
| if ongoing_request.request == original_request { | ||
|
Review comment: Does this compare IDs or all of the request data? Reply: It compares all the data, but that's basically just a bitfield of the requested fields, the source block hash or number, and the number of blocks. |
||
| connection.ongoing_request = None; | ||
| } else { | ||
| // We're no longer interested in that request. | ||
| log::debug!( | ||
| target: "sync", | ||
| "Received response from {} to obsolete block request {:?}", | ||
| peer, | ||
| original_request | ||
| ); | ||
| return; | ||
| } | ||
| } else { | ||
| // We remove from `self.peers` requests we're no longer interested in, | ||
| // so this can legitimately happen. | ||
| return; | ||
| } | ||
| } else { | ||
| log::error!( | ||
| target: "sync", | ||
| "State inconsistency: response on non-existing connection {:?}", | ||
| connection_id | ||
| ); | ||
| return; | ||
| } | ||
| } else { | ||
| log::error!( | ||
| target: "sync", | ||
| "State inconsistency: response on non-connected peer {}", | ||
| peer | ||
| ); | ||
| return; | ||
| } | ||
|
|
||
| let blocks = response.blocks.into_iter().map(|block_data| { | ||
| Ok(message::BlockData::<B> { | ||
| hash: Decode::decode(&mut block_data.hash.as_ref())?, | ||
|
|
@@ -419,7 +598,10 @@ where | |
| self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev)); | ||
| } | ||
| Err(err) => { | ||
| log::debug!("failed to decode block response from peer {}: {}", peer, err); | ||
| log::debug!( | ||
| target: "sync", | ||
| "Failed to decode block response from peer {}: {}", peer, err | ||
| ); | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -433,6 +615,31 @@ where | |
| return Poll::Ready(ev); | ||
| } | ||
|
|
||
| // Check the request timeouts. | ||
| for (peer, connections) in &mut self.peers { | ||
| for connection in connections { | ||
| let ongoing_request = match &mut connection.ongoing_request { | ||
| Some(rq) => rq, | ||
| None => continue, | ||
| }; | ||
|
|
||
| if let Poll::Ready(_) = Pin::new(&mut ongoing_request.timeout).poll(cx) { | ||
| let original_request = ongoing_request.request.clone(); | ||
| connection.ongoing_request = None; | ||
| log::debug!( | ||
| target: "sync", | ||
| "Request timeout for {}: {:?}", | ||
| peer, original_request | ||
| ); | ||
| let ev = Event::RequestTimeout { | ||
| peer: peer.clone(), | ||
| original_request, | ||
| }; | ||
| return Poll::Ready(NetworkBehaviourAction::GenerateEvent(ev)); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| while let Poll::Ready(Some(_)) = self.outgoing.poll_next_unpin(cx) {} | ||
| Poll::Pending | ||
| } | ||
|
|
||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is it safe to invalidate requests to a peer if more requests are sent to the same peer?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's not a matter of safety or correctness. It's a design decision by the sync code to only allow one request per node.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
From an API user point of view, previous requests are "cancelled", but internally we just forget about them. If a response comes and it doesn't match a known request, we discard that response.