diff --git a/client/cli/src/params/network_params.rs b/client/cli/src/params/network_params.rs index e36bcfca49ef7..d0ec3fb9f3f20 100644 --- a/client/cli/src/params/network_params.rs +++ b/client/cli/src/params/network_params.rs @@ -95,6 +95,11 @@ pub struct NetworkParams { /// By default this option is true for `--dev` and false otherwise. #[structopt(long)] pub discover_local: bool, + + /// Use the legacy "pre-mainnet-launch" networking protocol. Enable if things seem broken. + /// This option will be removed in the future. + #[structopt(long)] + pub legacy_network_protocol: bool, } impl NetworkParams { @@ -147,7 +152,8 @@ impl NetworkParams { use_yamux_flow_control: !self.no_yamux_flow_control, }, max_parallel_downloads: self.max_parallel_downloads, - allow_non_globals_in_dht: self.discover_local || is_dev + allow_non_globals_in_dht: self.discover_local || is_dev, + use_new_block_requests_protocol: !self.legacy_network_protocol, } } } diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 14b2245be0a12..3186801e5fd21 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -19,13 +19,16 @@ use crate::{ debug_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut}, Event, ObservedRole, DhtEvent, ExHashT, }; -use crate::protocol::{self, light_client_handler, message::Roles, CustomMessageOutcome, Protocol}; +use crate::protocol::{ + self, block_requests, light_client_handler, finality_requests, + message::{self, Roles}, CustomMessageOutcome, Protocol +}; use codec::Encode as _; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; use libp2p::kad::record; -use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, toggle::Toggle}; +use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; use log::debug; use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}}; use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId, Justification}; @@ -46,7 +49,7 @@ pub struct Behaviour { /// Block request handling. block_requests: protocol::BlockRequests, /// Finality proof request handling. - finality_proof_requests: Toggle>, + finality_proof_requests: protocol::FinalityProofRequests, /// Light client request handling. light_client_handler: protocol::LightClientHandler, @@ -77,7 +80,7 @@ impl Behaviour { user_agent: String, local_public_key: PublicKey, block_requests: protocol::BlockRequests, - finality_proof_requests: Option>, + finality_proof_requests: protocol::FinalityProofRequests, light_client_handler: protocol::LightClientHandler, disco_config: DiscoveryConfig, ) -> Self { @@ -86,7 +89,7 @@ impl Behaviour { debug_info: debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()), discovery: disco_config.finish(), block_requests, - finality_proof_requests: From::from(finality_proof_requests), + finality_proof_requests, light_client_handler, events: Vec::new(), role, @@ -216,6 +219,12 @@ Behaviour { self.events.push(BehaviourOut::JustificationImport(origin, hash, nb, justification)), CustomMessageOutcome::FinalityProofImport(origin, hash, nb, proof) => self.events.push(BehaviourOut::FinalityProofImport(origin, hash, nb, proof)), + CustomMessageOutcome::BlockRequest { target, request } => { + self.block_requests.send_request(&target, request); + }, + CustomMessageOutcome::FinalityProofRequest { target, block_hash, request } => { + self.finality_proof_requests.send_request(&target, block_hash, request); + }, CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles } => { let role = reported_roles_to_observed_role(&self.role, &remote, roles); for engine_id in protocols { @@ -245,6 +254,37 @@ Behaviour { } } +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: block_requests::Event) { + match event { + block_requests::Event::Response { peer, original_request, response } => { + let ev = self.substrate.on_block_response(peer, original_request, response); + self.inject_event(ev); + } + } + } +} + +impl NetworkBehaviourEventProcess> for Behaviour { + fn inject_event(&mut self, event: finality_requests::Event) { + match event { + finality_requests::Event::Response { peer, block_hash, proof } => { + let response = message::FinalityProofResponse { + id: 0, + block: block_hash, + proof: if !proof.is_empty() { + Some(proof) + } else { + None + }, + }; + let ev = self.substrate.on_finality_proof_response(peer, response); + self.inject_event(ev); + } + } + } +} + impl NetworkBehaviourEventProcess for Behaviour { fn inject_event(&mut self, event: debug_info::DebugInfoEvent) { diff --git a/client/network/src/config.rs b/client/network/src/config.rs index 84e2da7018a02..66800aeeaf8d2 100644 --- a/client/network/src/config.rs +++ b/client/network/src/config.rs @@ -399,7 +399,10 @@ pub struct NetworkConfiguration { /// Maximum number of peers to ask the same blocks in parallel. pub max_parallel_downloads: u32, /// Should we insert non-global addresses into the DHT? - pub allow_non_globals_in_dht: bool + pub allow_non_globals_in_dht: bool, + /// If true, uses the `//block-requests/` experimental protocol rather than + /// the legacy substream. This option is meant to be hard-wired to `true` in the future. + pub use_new_block_requests_protocol: bool, } impl NetworkConfiguration { @@ -430,7 +433,8 @@ impl NetworkConfiguration { use_yamux_flow_control: false, }, max_parallel_downloads: 5, - allow_non_globals_in_dht: false + allow_non_globals_in_dht: false, + use_new_block_requests_protocol: true, } } } diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index dca74c8d60749..8222767e1a1e6 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -237,6 +237,9 @@ pub struct Protocol { metrics: Option, /// The `PeerId`'s of all boot nodes. boot_node_ids: Arc>, + /// If true, we send back requests as `CustomMessageOutcome` events. If false, we directly + /// dispatch requests using the legacy substream. + use_new_block_requests_protocol: bool, } #[derive(Default)] @@ -357,6 +360,7 @@ impl Protocol { block_announce_validator: Box + Send>, metrics_registry: Option<&Registry>, boot_node_ids: Arc>, + use_new_block_requests_protocol: bool, queue_size_report: Option, ) -> error::Result<(Protocol, sc_peerset::PeersetHandle)> { let info = chain.info(); @@ -433,6 +437,7 @@ impl Protocol { None }, boot_node_ids, + use_new_block_requests_protocol, }; Ok((protocol, peerset_handle)) @@ -517,6 +522,8 @@ impl Protocol { self.sync.num_sync_requests() } + /// Accepts a response from the legacy substream and determines what the corresponding + /// request was. fn handle_response( &mut self, who: PeerId, @@ -806,7 +813,9 @@ impl Protocol { self.peerset_handle.report_peer(who, reputation) } - fn on_block_response( + /// Must be called in response to a [`CustomMessageOutcome::BlockRequest`] being emitted. + /// Must contain the same `PeerId` and request that have been emitted. + pub fn on_block_response( &mut self, peer: PeerId, request: message::BlockRequest, @@ -857,8 +866,15 @@ impl Protocol { Ok(sync::OnBlockData::Import(origin, blocks)) => CustomMessageOutcome::BlockImport(origin, blocks), Ok(sync::OnBlockData::Request(peer, req)) => { - self.send_request(&peer, GenericMessage::BlockRequest(req)); - CustomMessageOutcome::None + if self.use_new_block_requests_protocol { + CustomMessageOutcome::BlockRequest { + target: peer, + request: req, + } + } else { + self.send_request(&peer, GenericMessage::BlockRequest(req)); + CustomMessageOutcome::None + } } Err(sync::BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id); @@ -1025,7 +1041,16 @@ impl Protocol { if info.roles.is_full() { match self.sync.new_peer(who.clone(), info.best_hash, info.best_number) { Ok(None) => (), - Ok(Some(req)) => self.send_request(&who, GenericMessage::BlockRequest(req)), + Ok(Some(req)) => { + if self.use_new_block_requests_protocol { + self.pending_messages.push_back(CustomMessageOutcome::BlockRequest { + target: who.clone(), + request: req, + }); + } else { + self.send_request(&who, GenericMessage::BlockRequest(req)) + } + }, Err(sync::BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id); self.peerset_handle.report_peer(id, repu) @@ -1327,29 +1352,30 @@ impl Protocol { ], }, ); + + if is_their_best { + self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who, number)); + } + match blocks_to_import { Ok(sync::OnBlockData::Import(origin, blocks)) => { - if is_their_best { - self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who, number)); - } CustomMessageOutcome::BlockImport(origin, blocks) }, Ok(sync::OnBlockData::Request(peer, req)) => { - self.send_request(&peer, GenericMessage::BlockRequest(req)); - if is_their_best { - CustomMessageOutcome::PeerNewBest(who, number) + if self.use_new_block_requests_protocol { + CustomMessageOutcome::BlockRequest { + target: peer, + request: req, + } } else { + self.send_request(&peer, GenericMessage::BlockRequest(req)); CustomMessageOutcome::None } } Err(sync::BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id); self.peerset_handle.report_peer(id, repu); - if is_their_best { - CustomMessageOutcome::PeerNewBest(who, number) - } else { - CustomMessageOutcome::None - } + CustomMessageOutcome::None } } } @@ -1443,14 +1469,21 @@ impl Protocol { for result in results { match result { Ok((id, req)) => { - let msg = GenericMessage::BlockRequest(req); - send_request( - &mut self.behaviour, - &mut self.context_data.stats, - &mut self.context_data.peers, - &id, - msg - ) + if self.use_new_block_requests_protocol { + self.pending_messages.push_back(CustomMessageOutcome::BlockRequest { + target: id, + request: req, + }); + } else { + let msg = GenericMessage::BlockRequest(req); + send_request( + &mut self.behaviour, + &mut self.context_data.stats, + &mut self.context_data.peers, + &id, + msg + ) + } } Err(sync::BadPeer(id, repu)) => { self.behaviour.disconnect_peer(&id); @@ -1719,7 +1752,9 @@ impl Protocol { ); } - fn on_finality_proof_response( + /// Must be called after a [`CustomMessageOutcome::FinalityProofRequest`] has been emitted, + /// to notify of the response having arrived. + pub fn on_finality_proof_response( &mut self, who: PeerId, response: message::FinalityProofResponse, @@ -1798,6 +1833,7 @@ impl Protocol { /// Outcome of an incoming custom message. #[derive(Debug)] +#[must_use] pub enum CustomMessageOutcome { BlockImport(BlockOrigin, Vec>), JustificationImport(Origin, B::Hash, NumberFor, Justification), @@ -1808,6 +1844,18 @@ pub enum CustomMessageOutcome { NotificationStreamClosed { remote: PeerId, protocols: Vec }, /// Messages have been received on one or more notifications protocols. NotificationsReceived { remote: PeerId, messages: Vec<(ConsensusEngineId, Bytes)> }, + /// A new block request must be emitted. + /// Once you have the response, you must call `Protocol::on_block_response`. + /// It is the responsibility of the handler to ensure that a timeout exists. + /// If the request times out, or the peer responds in an invalid way, the peer has to be + /// disconnect. This will inform the state machine that the request it has emitted is stale. + BlockRequest { target: PeerId, request: message::BlockRequest }, + /// A new finality proof request must be emitted. + /// Once you have the response, you must call `Protocol::on_finality_proof_response`. + /// It is the responsibility of the handler to ensure that a timeout exists. + /// If the request times out, or the peer responds in an invalid way, the peer has to be + /// disconnect. This will inform the state machine that the request it has emitted is stale. + FinalityProofRequest { target: PeerId, block_hash: B::Hash, request: Vec }, /// Peer has a reported a new head of chain. PeerNewBest(PeerId, NumberFor), None, @@ -1912,30 +1960,55 @@ impl NetworkBehaviour for Protocol { } for (id, r) in self.sync.block_requests() { - send_request( - &mut self.behaviour, - &mut self.context_data.stats, - &mut self.context_data.peers, - &id, - GenericMessage::BlockRequest(r) - ) + if self.use_new_block_requests_protocol { + let event = CustomMessageOutcome::BlockRequest { + target: id, + request: r, + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } else { + send_request( + &mut self.behaviour, + &mut self.context_data.stats, + &mut self.context_data.peers, + &id, + GenericMessage::BlockRequest(r) + ) + } } for (id, r) in self.sync.justification_requests() { - send_request( - &mut self.behaviour, - &mut self.context_data.stats, - &mut self.context_data.peers, - &id, - GenericMessage::BlockRequest(r) - ) + if self.use_new_block_requests_protocol { + let event = CustomMessageOutcome::BlockRequest { + target: id, + request: r, + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } else { + send_request( + &mut self.behaviour, + &mut self.context_data.stats, + &mut self.context_data.peers, + &id, + GenericMessage::BlockRequest(r) + ) + } } for (id, r) in self.sync.finality_proof_requests() { - send_request( - &mut self.behaviour, - &mut self.context_data.stats, - &mut self.context_data.peers, - &id, - GenericMessage::FinalityProofRequest(r)) + if self.use_new_block_requests_protocol { + let event = CustomMessageOutcome::FinalityProofRequest { + target: id, + block_hash: r.block, + request: r.request, + }; + return Poll::Ready(NetworkBehaviourAction::GenerateEvent(event)); + } else { + send_request( + &mut self.behaviour, + &mut self.context_data.stats, + &mut self.context_data.peers, + &id, + GenericMessage::FinalityProofRequest(r)) + } } let event = match self.behaviour.poll(cx, params) { @@ -2082,6 +2155,7 @@ mod tests { Box::new(DefaultBlockAnnounceValidator::new(client.clone())), None, Default::default(), + true, None, ).unwrap(); diff --git a/client/network/src/protocol/block_requests.rs b/client/network/src/protocol/block_requests.rs index 6af5023d39fe6..f1981171af33d 100644 --- a/client/network/src/protocol/block_requests.rs +++ b/client/network/src/protocol/block_requests.rs @@ -27,7 +27,7 @@ use codec::{Encode, Decode}; use crate::{ chain::Client, config::ProtocolId, - protocol::{api, message::BlockAttributes} + protocol::{api, message::{self, BlockAttributes}} }; use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered}; use libp2p::{ @@ -36,13 +36,14 @@ use libp2p::{ Multiaddr, PeerId, connection::ConnectionId, - upgrade::{InboundUpgrade, ReadOneError, UpgradeInfo, Negotiated}, + upgrade::{InboundUpgrade, OutboundUpgrade, ReadOneError, UpgradeInfo, Negotiated}, upgrade::{DeniedUpgrade, read_one, write_one} }, swarm::{ NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, + NotifyHandler, OneShotHandler, OneShotHandlerConfig, PollParameters, @@ -53,8 +54,10 @@ use prost::Message; use sp_runtime::{generic::BlockId, traits::{Block, Header, One, Zero}}; use std::{ cmp::min, + collections::VecDeque, io, iter, + marker::PhantomData, sync::Arc, time::Duration, task::{Context, Poll} @@ -64,11 +67,24 @@ use void::{Void, unreachable}; // Type alias for convenience. pub type Error = Box; +/// Event generated by the block requests behaviour. +#[derive(Debug)] +pub enum Event { + /// A response to a block request has arrived. + Response { + peer: PeerId, + /// The original request passed to `send_request`. + original_request: message::BlockRequest, + response: message::BlockResponse, + }, +} + /// Configuration options for `BlockRequests`. #[derive(Debug, Clone)] pub struct Config { max_block_data_response: u32, max_request_len: usize, + max_response_len: usize, inactivity_timeout: Duration, protocol: Bytes, } @@ -78,11 +94,13 @@ impl Config { /// /// - max. block data in response = 128 /// - max. request size = 1 MiB + /// - max. response size = 16 MiB /// - inactivity timeout = 15s pub fn new(id: &ProtocolId) -> Self { let mut c = Config { max_block_data_response: 128, max_request_len: 1024 * 1024, + max_response_len: 16 * 1024 * 1024, inactivity_timeout: Duration::from_secs(15), protocol: Bytes::new(), }; @@ -102,6 +120,12 @@ impl Config { self } + /// Limit the max. size of responses to our block requests. + pub fn set_max_response_len(&mut self, v: usize) -> &mut Self { + self.max_response_len = v; + self + } + /// Limit the max. duration the substream may remain inactive before closing it. pub fn set_inactivity_timeout(&mut self, v: Duration) -> &mut Self { self.inactivity_timeout = v; @@ -127,6 +151,8 @@ pub struct BlockRequests { chain: Arc>, /// Futures sending back the block request response. outgoing: FuturesUnordered>, + /// Events to return as soon as possible from `poll`. + pending_events: VecDeque, Event>>, } impl BlockRequests @@ -138,9 +164,50 @@ where config: cfg, chain, outgoing: FuturesUnordered::new(), + pending_events: VecDeque::new(), } } + /// Issue a new block request. + /// + /// If the response doesn't arrive in time, or if the remote answers improperly, the target + /// will be disconnected. + pub fn send_request(&mut self, target: &PeerId, req: message::BlockRequest) { + let protobuf_rq = api::v1::BlockRequest { + fields: u32::from_be_bytes([req.fields.bits(), 0, 0, 0]), + from_block: match req.from { + message::FromBlock::Hash(h) => + Some(api::v1::block_request::FromBlock::Hash(h.encode())), + message::FromBlock::Number(n) => + Some(api::v1::block_request::FromBlock::Number(n.encode())), + }, + to_block: req.to.map(|h| h.encode()).unwrap_or_default(), + direction: match req.direction { + message::Direction::Ascending => api::v1::Direction::Ascending as i32, + message::Direction::Descending => api::v1::Direction::Descending as i32, + }, + max_blocks: req.max.unwrap_or(0), + }; + + let mut buf = Vec::with_capacity(protobuf_rq.encoded_len()); + if let Err(err) = protobuf_rq.encode(&mut buf) { + log::warn!("failed to encode block request {:?}: {:?}", protobuf_rq, err); + return; + } + + log::trace!("enqueueing block request to {:?}: {:?}", target, protobuf_rq); + self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: target.clone(), + handler: NotifyHandler::Any, + event: OutboundProtocol { + request: buf, + original_request: req, + max_response_size: self.config.max_response_len, + protocol: self.config.protocol.clone(), + }, + }); + } + /// Callback, invoked when a new block request has been received from remote. fn on_block_request ( &mut self @@ -202,6 +269,12 @@ where let number = header.number().clone(); let hash = header.hash(); let parent_hash = header.parent_hash().clone(); + let justification = if get_justification { + self.chain.justification(&BlockId::Hash(hash))? + } else { + None + }; + let is_empty_justification = justification.as_ref().map(|j| j.is_empty()).unwrap_or(false); let block_data = api::v1::BlockData { hash: hash.encode(), @@ -221,11 +294,8 @@ where }, receipt: Vec::new(), message_queue: Vec::new(), - justification: if get_justification { - self.chain.justification(&BlockId::Hash(hash))?.unwrap_or(Vec::new()) - } else { - Vec::new() - } + justification: justification.unwrap_or(Vec::new()), + is_empty_justification, }; blocks.push(block_data); @@ -251,13 +321,14 @@ impl NetworkBehaviour for BlockRequests where B: Block { - type ProtocolsHandler = OneShotHandler>; - type OutEvent = Void; + type ProtocolsHandler = OneShotHandler, OutboundProtocol, NodeEvent>; + type OutEvent = Event; fn new_handler(&mut self) -> Self::ProtocolsHandler { - let p = Protocol { + let p = InboundProtocol { max_request_len: self.config.max_request_len, protocol: self.config.protocol.clone(), + marker: PhantomData, }; let mut cfg = OneShotHandlerConfig::default(); cfg.inactive_timeout = self.config.inactivity_timeout; @@ -278,44 +349,102 @@ where &mut self, peer: PeerId, connection: ConnectionId, - Request(request, mut stream): Request + node_event: NodeEvent ) { - match self.on_block_request(&peer, &request) { - Ok(res) => { - log::trace!("enqueueing block response for peer {} with {} blocks", peer, res.blocks.len()); - let mut data = Vec::with_capacity(res.encoded_len()); - if let Err(e) = res.encode(&mut data) { - log::debug!("error encoding block response for peer {}: {}", peer, e) - } else { - let future = async move { - if let Err(e) = write_one(&mut stream, data).await { - log::debug!("error writing block response: {}", e) + match node_event { + NodeEvent::Request(request, mut stream) => { + match self.on_block_request(&peer, &request) { + Ok(res) => { + log::trace!("enqueueing block response for peer {} with {} blocks", peer, res.blocks.len()); + let mut data = Vec::with_capacity(res.encoded_len()); + if let Err(e) = res.encode(&mut data) { + log::debug!("error encoding block response for peer {}: {}", peer, e) + } else { + let future = async move { + if let Err(e) = write_one(&mut stream, data).await { + log::debug!("error writing block response: {}", e) + } + }; + self.outgoing.push(future.boxed()) } - }; - self.outgoing.push(future.boxed()) + } + Err(e) => log::debug!("error handling block request from peer {}: {}", peer, e) + } + } + NodeEvent::Response(original_request, response) => { + log::trace!("received block response from peer {} with {} blocks", peer, response.blocks.len()); + let blocks = response.blocks.into_iter().map(|block_data| { + Ok(message::BlockData:: { + hash: Decode::decode(&mut block_data.hash.as_ref())?, + header: if !block_data.header.is_empty() { + Some(Decode::decode(&mut block_data.header.as_ref())?) + } else { + None + }, + body: if original_request.fields.contains(message::BlockAttributes::BODY) { + Some(block_data.body.iter().map(|body| { + Decode::decode(&mut body.as_ref()) + }).collect::, _>>()?) + } else { + None + }, + receipt: if !block_data.message_queue.is_empty() { + Some(block_data.receipt) + } else { + None + }, + message_queue: if !block_data.message_queue.is_empty() { + Some(block_data.message_queue) + } else { + None + }, + justification: if !block_data.justification.is_empty() { + Some(block_data.justification) + } else if block_data.is_empty_justification { + Some(Vec::new()) + } else { + None + }, + }) + }).collect::, codec::Error>>(); + + match blocks { + Ok(blocks) => { + let id = original_request.id; + let ev = Event::Response { + peer, + original_request, + response: message::BlockResponse:: { id, blocks }, + }; + self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev)); + } + Err(err) => { + log::debug!("failed to decode block response from peer {}: {}", peer, err); + } } } - Err(e) => log::debug!("error handling block request from peer {}: {}", peer, e) } } - fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters) -> Poll> { + fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters) + -> Poll, Event>> + { + if let Some(ev) = self.pending_events.pop_front() { + return Poll::Ready(ev); + } + while let Poll::Ready(Some(_)) = self.outgoing.poll_next_unpin(cx) {} Poll::Pending } } -/// The incoming block request. -/// -/// Holds the protobuf value and the connection substream which made the -/// request and over which to send the response. +/// Output type of inbound and outbound substream upgrades. #[derive(Debug)] -pub struct Request(api::v1::BlockRequest, T); - -impl From for Request { - fn from(v: Void) -> Self { - unreachable(v) - } +pub enum NodeEvent { + /// Incoming request from remote and substream to use for the response. + Request(api::v1::BlockRequest, T), + /// Incoming response from remote. + Response(message::BlockRequest, api::v1::BlockResponse), } /// Substream upgrade protocol. @@ -325,36 +454,39 @@ impl From for Request { /// will become visible via `inject_node_event` which then dispatches to the /// relevant callback to process the message and prepare a response. #[derive(Debug, Clone)] -pub struct Protocol { +pub struct InboundProtocol { /// The max. request length in bytes. max_request_len: usize, /// The protocol to use during upgrade negotiation. protocol: Bytes, + /// Type of the block. + marker: PhantomData, } -impl UpgradeInfo for Protocol { - type Info = Bytes; - type InfoIter = iter::Once; +impl UpgradeInfo for InboundProtocol { + type Info = Bytes; + type InfoIter = iter::Once; - fn protocol_info(&self) -> Self::InfoIter { - iter::once(self.protocol.clone()) - } + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.protocol.clone()) + } } -impl InboundUpgrade for Protocol +impl InboundUpgrade for InboundProtocol where + B: Block, T: AsyncRead + AsyncWrite + Unpin + Send + 'static { - type Output = Request; - type Error = ReadOneError; - type Future = BoxFuture<'static, Result>; + type Output = NodeEvent; + type Error = ReadOneError; + type Future = BoxFuture<'static, Result>; - fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future { + fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future { let future = async move { let len = self.max_request_len; let vec = read_one(&mut s, len).await?; match api::v1::BlockRequest::decode(&vec[..]) { - Ok(r) => Ok(Request(r, s)), + Ok(r) => Ok(NodeEvent::Request(r, s)), Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e))) } }; @@ -362,3 +494,49 @@ where } } +/// Substream upgrade protocol. +/// +/// Sends a request to remote and awaits the response. +#[derive(Debug, Clone)] +pub struct OutboundProtocol { + /// The serialized protobuf request. + request: Vec, + /// The original request. Passed back through the API when the response comes back. + original_request: message::BlockRequest, + /// The max. response length in bytes. + max_response_size: usize, + /// The protocol to use for upgrade negotiation. + protocol: Bytes, +} + +impl UpgradeInfo for OutboundProtocol { + type Info = Bytes; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.protocol.clone()) + } +} + +impl OutboundUpgrade for OutboundProtocol +where + B: Block, + T: AsyncRead + AsyncWrite + Unpin + Send + 'static +{ + type Output = NodeEvent; + type Error = ReadOneError; + type Future = BoxFuture<'static, Result>; + + fn upgrade_outbound(self, mut s: T, _: Self::Info) -> Self::Future { + async move { + write_one(&mut s, &self.request).await?; + let vec = read_one(&mut s, self.max_response_size).await?; + + api::v1::BlockResponse::decode(&vec[..]) + .map(|r| NodeEvent::Response(self.original_request, r)) + .map_err(|e| { + ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)) + }) + }.boxed() + } +} diff --git a/client/network/src/protocol/finality_requests.rs b/client/network/src/protocol/finality_requests.rs index b12b79f41bc9a..0616759617954 100644 --- a/client/network/src/protocol/finality_requests.rs +++ b/client/network/src/protocol/finality_requests.rs @@ -27,7 +27,7 @@ use codec::{Encode, Decode}; use crate::{ chain::FinalityProofProvider, config::ProtocolId, - protocol::{api, message::BlockAttributes} + protocol::{api, message} }; use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered}; use libp2p::{ @@ -36,13 +36,14 @@ use libp2p::{ Multiaddr, PeerId, connection::ConnectionId, - upgrade::{InboundUpgrade, ReadOneError, UpgradeInfo, Negotiated}, + upgrade::{InboundUpgrade, OutboundUpgrade, ReadOneError, UpgradeInfo, Negotiated}, upgrade::{DeniedUpgrade, read_one, write_one} }, swarm::{ NegotiatedSubstream, NetworkBehaviour, NetworkBehaviourAction, + NotifyHandler, OneShotHandler, OneShotHandlerConfig, PollParameters, @@ -53,8 +54,10 @@ use prost::Message; use sp_runtime::{generic::BlockId, traits::{Block, Header, One, Zero}}; use std::{ cmp::min, + collections::VecDeque, io, iter, + marker::PhantomData, sync::Arc, time::Duration, task::{Context, Poll} @@ -64,10 +67,24 @@ use void::{Void, unreachable}; // Type alias for convenience. pub type Error = Box; +/// Event generated by the finality proof requests behaviour. +#[derive(Debug)] +pub enum Event { + /// A response to a finality proof request has arrived. + Response { + peer: PeerId, + /// Block hash originally passed to `send_request`. + block_hash: B::Hash, + /// Finality proof returned by the remote. + proof: Vec, + }, +} + /// Configuration options for `FinalityProofRequests`. #[derive(Debug, Clone)] pub struct Config { max_request_len: usize, + max_response_len: usize, inactivity_timeout: Duration, protocol: Bytes, } @@ -76,10 +93,12 @@ impl Config { /// Create a fresh configuration with the following options: /// /// - max. request size = 1 MiB + /// - max. response size = 1 MiB /// - inactivity timeout = 15s pub fn new(id: &ProtocolId) -> Self { let mut c = Config { max_request_len: 1024 * 1024, + max_response_len: 1024 * 1024, inactivity_timeout: Duration::from_secs(15), protocol: Bytes::new(), }; @@ -93,6 +112,12 @@ impl Config { self } + /// Limit the max. length of incoming finality proof response bytes. + pub fn set_max_response_len(&mut self, v: usize) -> &mut Self { + self.max_response_len = v; + self + } + /// Limit the max. duration the substream may remain inactive before closing it. pub fn set_inactivity_timeout(&mut self, v: Duration) -> &mut Self { self.inactivity_timeout = v; @@ -115,9 +140,11 @@ pub struct FinalityProofRequests { /// This behaviour's configuration. config: Config, /// How to construct finality proofs. - finality_proof_provider: Arc>, + finality_proof_provider: Option>>, /// Futures sending back the finality proof request responses. outgoing: FuturesUnordered>, + /// Events to return as soon as possible from `poll`. + pending_events: VecDeque, Event>>, } impl FinalityProofRequests @@ -125,14 +152,47 @@ where B: Block, { /// Initializes the behaviour. - pub fn new(cfg: Config, finality_proof_provider: Arc>) -> Self { + /// + /// If the proof provider is `None`, then the behaviour will not support the finality proof + /// requests protocol. + pub fn new(cfg: Config, finality_proof_provider: Option>>) -> Self { FinalityProofRequests { config: cfg, finality_proof_provider, outgoing: FuturesUnordered::new(), + pending_events: VecDeque::new(), } } + /// Issue a new finality proof request. + /// + /// If the response doesn't arrive in time, or if the remote answers improperly, the target + /// will be disconnected. + pub fn send_request(&mut self, target: &PeerId, block_hash: B::Hash, request: Vec) { + let protobuf_rq = api::v1::finality::FinalityProofRequest { + block_hash: block_hash.encode(), + request, + }; + + let mut buf = Vec::with_capacity(protobuf_rq.encoded_len()); + if let Err(err) = protobuf_rq.encode(&mut buf) { + log::warn!("failed to encode finality proof request {:?}: {:?}", protobuf_rq, err); + return; + } + + log::trace!("enqueueing finality proof request to {:?}: {:?}", target, protobuf_rq); + self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler { + peer_id: target.clone(), + handler: NotifyHandler::Any, + event: OutboundProtocol { + request: buf, + block_hash, + max_response_size: self.config.max_response_len, + protocol: self.config.protocol.clone(), + }, + }); + } + /// Callback, invoked when a new finality request has been received from remote. fn on_finality_request(&mut self, peer: &PeerId, request: &api::v1::finality::FinalityProofRequest) -> Result @@ -141,10 +201,15 @@ where log::trace!(target: "sync", "Finality proof request from {} for {}", peer, block_hash); - let finality_proof = self.finality_proof_provider - .prove_finality(block_hash, &request.request)? - .unwrap_or(Vec::new()); // Note that an empty Vec is sent if no proof is available. + let finality_proof = if let Some(provider) = &self.finality_proof_provider { + provider + .prove_finality(block_hash, &request.request)? + .unwrap_or(Vec::new()) + } else { + log::error!("Answering a finality proof request while finality provider is empty"); + return Err(From::from("Empty finality proof provider".to_string())) + }; Ok(api::v1::finality::FinalityProofResponse { proof: finality_proof }) } @@ -154,13 +219,18 @@ impl NetworkBehaviour for FinalityProofRequests where B: Block { - type ProtocolsHandler = OneShotHandler>; - type OutEvent = Void; + type ProtocolsHandler = OneShotHandler, OutboundProtocol, NodeEvent>; + type OutEvent = Event; fn new_handler(&mut self) -> Self::ProtocolsHandler { - let p = Protocol { + let p = InboundProtocol { max_request_len: self.config.max_request_len, - protocol: self.config.protocol.clone(), + protocol: if self.finality_proof_provider.is_some() { + Some(self.config.protocol.clone()) + } else { + None + }, + marker: PhantomData, }; let mut cfg = OneShotHandlerConfig::default(); cfg.inactive_timeout = self.config.inactivity_timeout; @@ -181,44 +251,58 @@ where &mut self, peer: PeerId, connection: ConnectionId, - Request(request, mut stream): Request + event: NodeEvent ) { - match self.on_finality_request(&peer, &request) { - Ok(res) => { - log::trace!("enqueueing finality response for peer {}", peer); - let mut data = Vec::with_capacity(res.encoded_len()); - if let Err(e) = res.encode(&mut data) { - log::debug!("error encoding finality response for peer {}: {}", peer, e) - } else { - let future = async move { - if let Err(e) = write_one(&mut stream, data).await { - log::debug!("error writing finality response: {}", e) + match event { + NodeEvent::Request(request, mut stream) => { + match self.on_finality_request(&peer, &request) { + Ok(res) => { + log::trace!("enqueueing finality response for peer {}", peer); + let mut data = Vec::with_capacity(res.encoded_len()); + if let Err(e) = res.encode(&mut data) { + log::debug!("error encoding finality response for peer {}: {}", peer, e) + } else { + let future = async move { + if let Err(e) = write_one(&mut stream, data).await { + log::debug!("error writing finality response: {}", e) + } + }; + self.outgoing.push(future.boxed()) } - }; - self.outgoing.push(future.boxed()) + } + Err(e) => log::debug!("error handling finality request from peer {}: {}", peer, e) } } - Err(e) => log::debug!("error handling finality request from peer {}: {}", peer, e) + NodeEvent::Response(response, block_hash) => { + let ev = Event::Response { + peer, + block_hash, + proof: response.proof, + }; + self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(ev)); + } } } - fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters) -> Poll> { + fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters) + -> Poll, Event>> + { + if let Some(ev) = self.pending_events.pop_front() { + return Poll::Ready(ev); + } + while let Poll::Ready(Some(_)) = self.outgoing.poll_next_unpin(cx) {} Poll::Pending } } -/// The incoming finality proof request. -/// -/// Holds the protobuf value and the connection substream which made the -/// request and over which to send the response. +/// Output type of inbound and outbound substream upgrades. #[derive(Debug)] -pub struct Request(api::v1::finality::FinalityProofRequest, T); - -impl From for Request { - fn from(v: Void) -> Self { - unreachable(v) - } +pub enum NodeEvent { + /// Incoming request from remote and substream to use for the response. + Request(api::v1::finality::FinalityProofRequest, T), + /// Incoming response from remote. + Response(api::v1::finality::FinalityProofResponse, B::Hash), } /// Substream upgrade protocol. @@ -228,27 +312,33 @@ impl From for Request { /// will become visible via `inject_node_event` which then dispatches to the /// relevant callback to process the message and prepare a response. #[derive(Debug, Clone)] -pub struct Protocol { +pub struct InboundProtocol { /// The max. request length in bytes. max_request_len: usize, - /// The protocol to use during upgrade negotiation. - protocol: Bytes, + /// The protocol to use during upgrade negotiation. If `None`, then the incoming protocol + /// is simply disabled. + protocol: Option, + /// Marker to pin the block type. + marker: PhantomData, } -impl UpgradeInfo for Protocol { +impl UpgradeInfo for InboundProtocol { type Info = Bytes; - type InfoIter = iter::Once; + // This iterator will return either 0 elements if `self.protocol` is `None`, or 1 element if + // it is `Some`. + type InfoIter = std::option::IntoIter; fn protocol_info(&self) -> Self::InfoIter { - iter::once(self.protocol.clone()) + self.protocol.clone().into_iter() } } -impl InboundUpgrade for Protocol +impl InboundUpgrade for InboundProtocol where + B: Block, T: AsyncRead + AsyncWrite + Unpin + Send + 'static { - type Output = Request; + type Output = NodeEvent; type Error = ReadOneError; type Future = BoxFuture<'static, Result>; @@ -257,10 +347,56 @@ where let len = self.max_request_len; let vec = read_one(&mut s, len).await?; match api::v1::finality::FinalityProofRequest::decode(&vec[..]) { - Ok(r) => Ok(Request(r, s)), + Ok(r) => Ok(NodeEvent::Request(r, s)), Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e))) } }.boxed() } } +/// Substream upgrade protocol. +/// +/// Sends a request to remote and awaits the response. +#[derive(Debug, Clone)] +pub struct OutboundProtocol { + /// The serialized protobuf request. + request: Vec, + /// Block hash that has been requested. + block_hash: B::Hash, + /// The max. response length in bytes. + max_response_size: usize, + /// The protocol to use for upgrade negotiation. + protocol: Bytes, +} + +impl UpgradeInfo for OutboundProtocol { + type Info = Bytes; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.protocol.clone()) + } +} + +impl OutboundUpgrade for OutboundProtocol +where + B: Block, + T: AsyncRead + AsyncWrite + Unpin + Send + 'static +{ + type Output = NodeEvent; + type Error = ReadOneError; + type Future = BoxFuture<'static, Result>; + + fn upgrade_outbound(self, mut s: T, _: Self::Info) -> Self::Future { + async move { + write_one(&mut s, &self.request).await?; + let vec = read_one(&mut s, self.max_response_size).await?; + + api::v1::finality::FinalityProofResponse::decode(&vec[..]) + .map(|r| NodeEvent::Response(r, self.block_hash)) + .map_err(|e| { + ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e)) + }) + }.boxed() + } +} diff --git a/client/network/src/protocol/schema/api.v1.proto b/client/network/src/protocol/schema/api.v1.proto index ccbf49d666115..a933c5811c109 100644 --- a/client/network/src/protocol/schema/api.v1.proto +++ b/client/network/src/protocol/schema/api.v1.proto @@ -51,5 +51,10 @@ message BlockData { bytes message_queue = 5; // optional // Justification if requested. bytes justification = 6; // optional + // True if justification should be treated as present but empty. + // This hack is unfortunately necessary because shortcomings in the protobuf format otherwise + // doesn't make in possible to differentiate between a lack of justification and an empty + // justification. + bool is_empty_justification = 7; // optional, false if absent } diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 1391f8e06cc87..f811266a13445 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -211,6 +211,7 @@ impl NetworkWorker { params.block_announce_validator, params.metrics_registry.as_ref(), boot_node_ids.clone(), + params.network_config.use_new_block_requests_protocol, metrics.as_ref().map(|m| m.notifications_queues_size.clone()), )?; @@ -225,11 +226,9 @@ impl NetworkWorker { let config = protocol::block_requests::Config::new(¶ms.protocol_id); protocol::BlockRequests::new(config, params.chain.clone()) }; - let finality_proof_requests = if let Some(pb) = ¶ms.finality_proof_provider { + let finality_proof_requests = { let config = protocol::finality_requests::Config::new(¶ms.protocol_id); - Some(protocol::FinalityProofRequests::new(config, pb.clone())) - } else { - None + protocol::FinalityProofRequests::new(config, params.finality_proof_provider.clone()) }; let light_client_handler = { let config = protocol::light_client_handler::Config::new(¶ms.protocol_id);