From 8838647a477045f699cdb5bdd6cc3c1c3cf23cb5 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 21 Apr 2020 14:10:54 +0200 Subject: [PATCH 1/3] Add a protocol that answers finality proofs --- client/network/build.rs | 1 + client/network/src/behaviour.rs | 4 + client/network/src/protocol.rs | 5 + .../network/src/protocol/finality_requests.rs | 268 ++++++++++++++++++ .../src/protocol/schema/finality.v1.proto | 19 ++ client/network/src/service.rs | 11 +- 6 files changed, 306 insertions(+), 2 deletions(-) create mode 100644 client/network/src/protocol/finality_requests.rs create mode 100644 client/network/src/protocol/schema/finality.v1.proto diff --git a/client/network/build.rs b/client/network/build.rs index 0fd1f128660e9..991b1cba5d6c8 100644 --- a/client/network/build.rs +++ b/client/network/build.rs @@ -1,5 +1,6 @@ const PROTOS: &[&str] = &[ "src/protocol/schema/api.v1.proto", + "src/protocol/schema/finality.v1.proto", "src/protocol/schema/light.v1.proto" ]; diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 880b381e669b6..47be474ff52c2 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -45,6 +45,8 @@ pub struct Behaviour { discovery: DiscoveryBehaviour, /// Block request handling. block_requests: protocol::BlockRequests, + /// Finality proof request handling. + finality_proof_requests: protocol::FinalityProofRequests, /// Light client request handling. light_client_handler: protocol::LightClientHandler, @@ -75,6 +77,7 @@ impl Behaviour { user_agent: String, local_public_key: PublicKey, block_requests: protocol::BlockRequests, + finality_proof_requests: protocol::FinalityProofRequests, light_client_handler: protocol::LightClientHandler, disco_config: DiscoveryConfig, ) -> Self { @@ -83,6 +86,7 @@ impl Behaviour { debug_info: debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()), discovery: disco_config.finish(), block_requests, + finality_proof_requests, light_client_handler, events: Vec::new(), role, diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index 42084b7ce31c1..57e7129c00099 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -62,6 +62,9 @@ use wasm_timer::Instant; pub mod api { pub mod v1 { include!(concat!(env!("OUT_DIR"), "/api.v1.rs")); + pub mod finality { + include!(concat!(env!("OUT_DIR"), "/api.v1.finality.rs")); + } pub mod light { include!(concat!(env!("OUT_DIR"), "/api.v1.light.rs")); } @@ -72,12 +75,14 @@ mod generic_proto; mod util; pub mod block_requests; +pub mod finality_requests; pub mod message; pub mod event; pub mod light_client_handler; pub mod sync; pub use block_requests::BlockRequests; +pub use finality_requests::FinalityProofRequests; pub use light_client_handler::LightClientHandler; pub use generic_proto::LegacyConnectionKillError; diff --git a/client/network/src/protocol/finality_requests.rs b/client/network/src/protocol/finality_requests.rs new file mode 100644 index 0000000000000..a13dfa58e3aa4 --- /dev/null +++ b/client/network/src/protocol/finality_requests.rs @@ -0,0 +1,268 @@ +// Copyright 2020 Parity Technologies (UK) Ltd. +// This file is part of Substrate. +// +// Substrate is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Substrate is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with Substrate. If not, see . + +//! `NetworkBehaviour` implementation which handles incoming finality proof requests. +//! +//! Every request is coming in on a separate connection substream which gets +//! closed after we have sent the response back. Incoming requests are encoded +//! as protocol buffers (cf. `finality.v1.proto`). + +#![allow(unused)] + +use bytes::Bytes; +use codec::{Encode, Decode}; +use crate::{ + chain::FinalityProofProvider, + config::ProtocolId, + protocol::{api, message::BlockAttributes} +}; +use futures::{future::BoxFuture, prelude::*, stream::FuturesUnordered}; +use libp2p::{ + core::{ + ConnectedPoint, + Multiaddr, + PeerId, + connection::ConnectionId, + upgrade::{InboundUpgrade, ReadOneError, UpgradeInfo, Negotiated}, + upgrade::{DeniedUpgrade, read_one, write_one} + }, + swarm::{ + NegotiatedSubstream, + NetworkBehaviour, + NetworkBehaviourAction, + OneShotHandler, + OneShotHandlerConfig, + PollParameters, + SubstreamProtocol + } +}; +use prost::Message; +use sp_runtime::{generic::BlockId, traits::{Block, Header, One, Zero}}; +use std::{ + cmp::min, + io, + iter, + sync::Arc, + time::Duration, + task::{Context, Poll} +}; +use void::{Void, unreachable}; + +// Type alias for convenience. +pub type Error = Box; + +/// Configuration options for `FinalityProofRequests`. +#[derive(Debug, Clone)] +pub struct Config { + max_request_len: usize, + inactivity_timeout: Duration, + protocol: Bytes, +} + +impl Config { + /// Create a fresh configuration with the following options: + /// + /// - max. request size = 1 MiB + /// - inactivity timeout = 15s + pub fn new(id: &ProtocolId) -> Self { + let mut c = Config { + max_request_len: 1024 * 1024, + inactivity_timeout: Duration::from_secs(15), + protocol: Bytes::new(), + }; + c.set_protocol(id); + c + } + + /// Limit the max. length of incoming finality proof request bytes. + pub fn set_max_request_len(&mut self, v: usize) -> &mut Self { + self.max_request_len = v; + self + } + + /// Limit the max. duration the substream may remain inactive before closing it. + pub fn set_inactivity_timeout(&mut self, v: Duration) -> &mut Self { + self.inactivity_timeout = v; + self + } + + /// Set protocol to use for upgrade negotiation. + pub fn set_protocol(&mut self, id: &ProtocolId) -> &mut Self { + let mut v = Vec::new(); + v.extend_from_slice(b"/"); + v.extend_from_slice(id.as_bytes()); + v.extend_from_slice(b"/finality-proof/1"); + self.protocol = v.into(); + self + } +} + +/// The finality proof request handling behaviour. +pub struct FinalityProofRequests { + /// This behaviour's configuration. + config: Config, + /// How to construct finality proofs. + finality_proof_provider: Option>>, + /// Futures sending back the finality proof request responses. + outgoing: FuturesUnordered>, +} + +impl FinalityProofRequests +where + B: Block, +{ + /// Initializes the behaviour. + pub fn new(cfg: Config, finality_proof_provider: Option>>) -> Self { + FinalityProofRequests { + config: cfg, + finality_proof_provider, + outgoing: FuturesUnordered::new(), + } + } + + /// Callback, invoked when a new finality request has been received from remote. + fn on_finality_request(&mut self, peer: &PeerId, request: &api::v1::finality::FinalityProofRequest) + -> Result + { + let block_hash = Decode::decode(&mut request.block_hash.as_ref())?; + + log::trace!(target: "sync", "Finality proof request from {} for {}", peer, block_hash); + + let provider = self.finality_proof_provider.as_ref() + .ok_or_else(|| String::from("Finality provider is not configured"))?; + + let finality_proof = provider.prove_finality(block_hash, &request.request)? + .unwrap_or(Vec::new()); + // Note that an empty Vec is sent if no proof is available. + + Ok(api::v1::finality::FinalityProofResponse { proof: finality_proof }) + } +} + +impl NetworkBehaviour for FinalityProofRequests +where + B: Block +{ + type ProtocolsHandler = OneShotHandler>; + type OutEvent = Void; + + fn new_handler(&mut self) -> Self::ProtocolsHandler { + let p = Protocol { + max_request_len: self.config.max_request_len, + protocol: self.config.protocol.clone(), + }; + let mut cfg = OneShotHandlerConfig::default(); + cfg.inactive_timeout = self.config.inactivity_timeout; + OneShotHandler::new(SubstreamProtocol::new(p), cfg) + } + + fn addresses_of_peer(&mut self, _: &PeerId) -> Vec { + Vec::new() + } + + fn inject_connected(&mut self, _peer: &PeerId) { + } + + fn inject_disconnected(&mut self, _peer: &PeerId) { + } + + fn inject_event( + &mut self, + peer: PeerId, + connection: ConnectionId, + Request(request, mut stream): Request + ) { + match self.on_finality_request(&peer, &request) { + Ok(res) => { + log::trace!("enqueueing finality response for peer {}", peer); + let mut data = Vec::with_capacity(res.encoded_len()); + if let Err(e) = res.encode(&mut data) { + log::debug!("error encoding finality response for peer {}: {}", peer, e) + } else { + let future = async move { + if let Err(e) = write_one(&mut stream, data).await { + log::debug!("error writing finality response: {}", e) + } + }; + self.outgoing.push(future.boxed()) + } + } + Err(e) => log::debug!("error handling finality request from peer {}: {}", peer, e) + } + } + + fn poll(&mut self, cx: &mut Context, _: &mut impl PollParameters) -> Poll> { + while let Poll::Ready(Some(_)) = self.outgoing.poll_next_unpin(cx) {} + Poll::Pending + } +} + +/// The incoming finality proof request. +/// +/// Holds the protobuf value and the connection substream which made the +/// request and over which to send the response. +#[derive(Debug)] +pub struct Request(api::v1::finality::FinalityProofRequest, T); + +impl From for Request { + fn from(v: Void) -> Self { + unreachable(v) + } +} + +/// Substream upgrade protocol. +/// +/// We attempt to parse an incoming protobuf encoded request (cf. `Request`) +/// which will be handled by the `FinalityProofRequests` behaviour, i.e. the request +/// will become visible via `inject_node_event` which then dispatches to the +/// relevant callback to process the message and prepare a response. +#[derive(Debug, Clone)] +pub struct Protocol { + /// The max. request length in bytes. + max_request_len: usize, + /// The protocol to use during upgrade negotiation. + protocol: Bytes, +} + +impl UpgradeInfo for Protocol { + type Info = Bytes; + type InfoIter = iter::Once; + + fn protocol_info(&self) -> Self::InfoIter { + iter::once(self.protocol.clone()) + } +} + +impl InboundUpgrade for Protocol +where + T: AsyncRead + AsyncWrite + Unpin + Send + 'static +{ + type Output = Request; + type Error = ReadOneError; + type Future = BoxFuture<'static, Result>; + + fn upgrade_inbound(self, mut s: T, _: Self::Info) -> Self::Future { + async move { + let len = self.max_request_len; + let vec = read_one(&mut s, len).await?; + match api::v1::finality::FinalityProofRequest::decode(&vec[..]) { + Ok(r) => Ok(Request(r, s)), + Err(e) => Err(ReadOneError::Io(io::Error::new(io::ErrorKind::Other, e))) + } + }.boxed() + } +} + diff --git a/client/network/src/protocol/schema/finality.v1.proto b/client/network/src/protocol/schema/finality.v1.proto new file mode 100644 index 0000000000000..2c8497d96a593 --- /dev/null +++ b/client/network/src/protocol/schema/finality.v1.proto @@ -0,0 +1,19 @@ +// Schema definition for finality proof request/responses. + +syntax = "proto3"; + +package api.v1.finality; + +// Request a finality proof from a peer. +message FinalityProofRequest { + // SCALE-encoded hash of the block to request. + bytes block_hash = 1; + // Opaque chain-specific additional request data. + bytes request = 2; +} + +// Response to a finality proof request. +message FinalityProofResponse { + // Opaque chain-specific additional response data. + bytes proof = 1; // optional +} diff --git a/client/network/src/service.rs b/client/network/src/service.rs index d29cb94ee8a36..20b7a482955f5 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -225,6 +225,10 @@ impl NetworkWorker { let config = protocol::block_requests::Config::new(¶ms.protocol_id); protocol::BlockRequests::new(config, params.chain.clone()) }; + let finality_proof_requests = { + let config = protocol::finality_requests::Config::new(¶ms.protocol_id); + protocol::FinalityProofRequests::new(config, params.finality_proof_provider.clone()) + }; let light_client_handler = { let config = protocol::light_client_handler::Config::new(¶ms.protocol_id); protocol::LightClientHandler::new( @@ -261,6 +265,7 @@ impl NetworkWorker { user_agent, local_public, block_requests, + finality_proof_requests, light_client_handler, discovery_config ); @@ -1113,10 +1118,12 @@ impl Future for NetworkWorker { ConnectionError::IO(_) => metrics.connections_closed_total.with_label_values(&[dir, "transport-error"]).inc(), ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( - EitherError::A(EitherError::B(EitherError::A(PingFailure::Timeout))))))) => + EitherError::A(EitherError::A(EitherError::B( + EitherError::A(PingFailure::Timeout)))))))) => metrics.connections_closed_total.with_label_values(&[dir, "ping-timeout"]).inc(), ConnectionError::Handler(NodeHandlerWrapperError::Handler(EitherError::A(EitherError::A( - EitherError::A(EitherError::A(EitherError::B(LegacyConnectionKillError))))))) => + EitherError::A(EitherError::A(EitherError::A( + EitherError::B(LegacyConnectionKillError)))))))) => metrics.connections_closed_total.with_label_values(&[dir, "force-closed"]).inc(), ConnectionError::Handler(NodeHandlerWrapperError::Handler(_)) => metrics.connections_closed_total.with_label_values(&[dir, "protocol-error"]).inc(), From 3c241ec864d17c8b8080efab2cd08f8d90fc9d54 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 21 Apr 2020 14:39:18 +0200 Subject: [PATCH 2/3] Fix documentation --- client/network/src/protocol/schema/finality.v1.proto | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/schema/finality.v1.proto b/client/network/src/protocol/schema/finality.v1.proto index 2c8497d96a593..843bc4eca0990 100644 --- a/client/network/src/protocol/schema/finality.v1.proto +++ b/client/network/src/protocol/schema/finality.v1.proto @@ -14,6 +14,6 @@ message FinalityProofRequest { // Response to a finality proof request. message FinalityProofResponse { - // Opaque chain-specific additional response data. + // Opaque chain-specific finality proof. Empty if no such proof exists. bytes proof = 1; // optional } From 03a7ad2157f57d3da71bbb9441f414a83d4b66d7 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Tue, 21 Apr 2020 17:21:27 +0200 Subject: [PATCH 3/3] Use Toggle --- client/network/src/behaviour.rs | 8 ++++---- client/network/src/protocol/finality_requests.rs | 10 ++++------ client/network/src/service.rs | 6 ++++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/client/network/src/behaviour.rs b/client/network/src/behaviour.rs index 47be474ff52c2..14b2245be0a12 100644 --- a/client/network/src/behaviour.rs +++ b/client/network/src/behaviour.rs @@ -25,7 +25,7 @@ use codec::Encode as _; use libp2p::NetworkBehaviour; use libp2p::core::{Multiaddr, PeerId, PublicKey}; use libp2p::kad::record; -use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters}; +use libp2p::swarm::{NetworkBehaviourAction, NetworkBehaviourEventProcess, PollParameters, toggle::Toggle}; use log::debug; use sp_consensus::{BlockOrigin, import_queue::{IncomingBlock, Origin}}; use sp_runtime::{traits::{Block as BlockT, NumberFor}, ConsensusEngineId, Justification}; @@ -46,7 +46,7 @@ pub struct Behaviour { /// Block request handling. block_requests: protocol::BlockRequests, /// Finality proof request handling. - finality_proof_requests: protocol::FinalityProofRequests, + finality_proof_requests: Toggle>, /// Light client request handling. light_client_handler: protocol::LightClientHandler, @@ -77,7 +77,7 @@ impl Behaviour { user_agent: String, local_public_key: PublicKey, block_requests: protocol::BlockRequests, - finality_proof_requests: protocol::FinalityProofRequests, + finality_proof_requests: Option>, light_client_handler: protocol::LightClientHandler, disco_config: DiscoveryConfig, ) -> Self { @@ -86,7 +86,7 @@ impl Behaviour { debug_info: debug_info::DebugInfoBehaviour::new(user_agent, local_public_key.clone()), discovery: disco_config.finish(), block_requests, - finality_proof_requests, + finality_proof_requests: From::from(finality_proof_requests), light_client_handler, events: Vec::new(), role, diff --git a/client/network/src/protocol/finality_requests.rs b/client/network/src/protocol/finality_requests.rs index a13dfa58e3aa4..b12b79f41bc9a 100644 --- a/client/network/src/protocol/finality_requests.rs +++ b/client/network/src/protocol/finality_requests.rs @@ -115,7 +115,7 @@ pub struct FinalityProofRequests { /// This behaviour's configuration. config: Config, /// How to construct finality proofs. - finality_proof_provider: Option>>, + finality_proof_provider: Arc>, /// Futures sending back the finality proof request responses. outgoing: FuturesUnordered>, } @@ -125,7 +125,7 @@ where B: Block, { /// Initializes the behaviour. - pub fn new(cfg: Config, finality_proof_provider: Option>>) -> Self { + pub fn new(cfg: Config, finality_proof_provider: Arc>) -> Self { FinalityProofRequests { config: cfg, finality_proof_provider, @@ -141,10 +141,8 @@ where log::trace!(target: "sync", "Finality proof request from {} for {}", peer, block_hash); - let provider = self.finality_proof_provider.as_ref() - .ok_or_else(|| String::from("Finality provider is not configured"))?; - - let finality_proof = provider.prove_finality(block_hash, &request.request)? + let finality_proof = self.finality_proof_provider + .prove_finality(block_hash, &request.request)? .unwrap_or(Vec::new()); // Note that an empty Vec is sent if no proof is available. diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 20b7a482955f5..e360a8defec9d 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -225,9 +225,11 @@ impl NetworkWorker { let config = protocol::block_requests::Config::new(¶ms.protocol_id); protocol::BlockRequests::new(config, params.chain.clone()) }; - let finality_proof_requests = { + let finality_proof_requests = if let Some(pb) = ¶ms.finality_proof_provider { let config = protocol::finality_requests::Config::new(¶ms.protocol_id); - protocol::FinalityProofRequests::new(config, params.finality_proof_provider.clone()) + Some(protocol::FinalityProofRequests::new(config, pb.clone())) + } else { + None }; let light_client_handler = { let config = protocol::light_client_handler::Config::new(¶ms.protocol_id);