From 16d3cab1ce437880b74661f5b3330e4f9bb7e89e Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 28 Aug 2020 10:56:48 +0200 Subject: [PATCH 01/11] Stop sending messages on legacy substream altogether --- client/network/src/protocol.rs | 176 ++---------------- .../src/protocol/generic_proto/behaviour.rs | 24 --- .../protocol/generic_proto/handler/group.rs | 40 ---- .../protocol/generic_proto/handler/legacy.rs | 20 -- .../src/protocol/generic_proto/tests.rs | 150 +-------------- .../protocol/generic_proto/upgrade/legacy.rs | 9 - client/network/src/service.rs | 35 +--- 7 files changed, 27 insertions(+), 427 deletions(-) diff --git a/client/network/src/protocol.rs b/client/network/src/protocol.rs index b1945b4dba47d..1cf6ddd7f95ce 100644 --- a/client/network/src/protocol.rs +++ b/client/network/src/protocol.rs @@ -39,13 +39,13 @@ use sp_consensus::{ use codec::{Decode, Encode}; use sp_runtime::{generic::BlockId, ConsensusEngineId, Justification}; use sp_runtime::traits::{ - Block as BlockT, Header as HeaderT, NumberFor, One, Zero, CheckedSub + Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub }; use sp_arithmetic::traits::SaturatedConversion; use message::{BlockAnnounce, Message}; use message::generic::{Message as GenericMessage, Roles}; use prometheus_endpoint::{ - Registry, Gauge, Counter, CounterVec, GaugeVec, + Registry, Gauge, Counter, GaugeVec, PrometheusError, Opts, register, U64 }; use sync::{ChainSync, SyncState}; @@ -53,7 +53,7 @@ use std::borrow::Cow; use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry}; use std::sync::Arc; use std::fmt::Write; -use std::{cmp, io, num::NonZeroUsize, pin::Pin, task::Poll, time}; +use std::{io, num::NonZeroUsize, pin::Pin, task::Poll, time}; use log::{log, Level, trace, debug, warn, error}; use wasm_timer::Instant; @@ -86,11 +86,6 @@ pub(crate) const CURRENT_VERSION: u32 = 6; /// Lowest version we support pub(crate) const MIN_VERSION: u32 = 3; -// Maximum allowed entries in `BlockResponse` -const MAX_BLOCK_DATA_RESPONSE: u32 = 128; -// Maximum total bytes allowed for block bodies in `BlockResponse` -const MAX_BODIES_BYTES: usize = 8 * 1024 * 1024; - /// When light node connects to the full node and the full node is behind light node /// for at least `LIGHT_MAXIMAL_BLOCKS_DIFFERENCE` blocks, we consider it not useful /// and disconnect to free connection slot. @@ -119,8 +114,6 @@ mod rep { pub const UNEXPECTED_RESPONSE: Rep = Rep::new_fatal("Unexpected response packet"); /// We received an unexpected transaction packet. pub const UNEXPECTED_TRANSACTIONS: Rep = Rep::new_fatal("Unexpected transactions packet"); - /// We received an unexpected light node request. - pub const UNEXPECTED_REQUEST: Rep = Rep::new_fatal("Unexpected block request packet"); /// Peer has different genesis. pub const GENESIS_MISMATCH: Rep = Rep::new_fatal("Genesis mismatch"); /// Peer is on unsupported protocol version. @@ -139,7 +132,6 @@ struct Metrics { finality_proofs: GaugeVec, justifications: GaugeVec, propagated_transactions: Counter, - legacy_requests_received: CounterVec, } impl Metrics { @@ -185,13 +177,6 @@ impl Metrics { "sync_propagated_transactions", "Number of transactions propagated to at least one peer", )?, r)?, - legacy_requests_received: register(CounterVec::new( - Opts::new( - "sync_legacy_requests_received", - "Number of block/finality/light-client requests received on the legacy substream", - ), - &["kind"] - )?, r)?, }) } } @@ -602,12 +587,6 @@ impl Protocol { match message { GenericMessage::Status(_) => debug!(target: "sub-libp2p", "Received unexpected Status"), - GenericMessage::BlockRequest(r) => self.on_block_request(who, r), - GenericMessage::BlockResponse(r) => { - let outcome = self.on_block_response(who.clone(), r); - self.update_peer_info(&who); - return outcome - }, GenericMessage::BlockAnnounce(announce) => { let outcome = self.on_block_announce(who.clone(), announce); self.update_peer_info(&who); @@ -615,6 +594,8 @@ impl Protocol { }, GenericMessage::Transactions(m) => self.on_transactions(who, m), + GenericMessage::BlockResponse(_) => + warn!(target: "sub-libp2p", "Received unexpected BlockResponse"), GenericMessage::RemoteCallResponse(_) => warn!(target: "sub-libp2p", "Received unexpected RemoteCallResponse"), GenericMessage::RemoteReadResponse(_) => @@ -625,6 +606,7 @@ impl Protocol { warn!(target: "sub-libp2p", "Received unexpected RemoteChangesResponse"), GenericMessage::FinalityProofResponse(_) => warn!(target: "sub-libp2p", "Received unexpected FinalityProofResponse"), + GenericMessage::BlockRequest(_) | GenericMessage::FinalityProofRequest(_) | GenericMessage::RemoteReadChildRequest(_) | GenericMessage::RemoteCallRequest(_) | @@ -676,21 +658,6 @@ impl Protocol { CustomMessageOutcome::None } - fn send_message( - &mut self, - who: &PeerId, - message: Option<(Cow<'static, [u8]>, Vec)>, - legacy: Message, - ) { - send_message::( - &mut self.behaviour, - &mut self.context_data.stats, - who, - message, - legacy, - ); - } - fn update_peer_request(&mut self, who: &PeerId, request: &mut message::BlockRequest) { update_peer_request::(&mut self.context_data.peers, who, request) } @@ -716,92 +683,6 @@ impl Protocol { } } - fn on_block_request(&mut self, peer: PeerId, request: message::BlockRequest) { - if let Some(metrics) = &self.metrics { - metrics.legacy_requests_received.with_label_values(&["block-request"]).inc(); - } - - trace!(target: "sync", "BlockRequest {} from {}: from {:?} to {:?} max {:?} for {:?}", - request.id, - peer, - request.from, - request.to, - request.max, - request.fields, - ); - - // sending block requests to the node that is unable to serve it is considered a bad behavior - if !self.config.roles.is_full() { - trace!(target: "sync", "Peer {} is trying to sync from the light node", peer); - self.behaviour.disconnect_peer(&peer); - self.peerset_handle.report_peer(peer, rep::UNEXPECTED_REQUEST); - return; - } - - let mut blocks = Vec::new(); - let mut id = match request.from { - message::FromBlock::Hash(h) => BlockId::Hash(h), - message::FromBlock::Number(n) => BlockId::Number(n), - }; - let max = cmp::min(request.max.unwrap_or(u32::max_value()), MAX_BLOCK_DATA_RESPONSE) as usize; - let get_header = request.fields.contains(message::BlockAttributes::HEADER); - let get_body = request.fields.contains(message::BlockAttributes::BODY); - let get_justification = request - .fields - .contains(message::BlockAttributes::JUSTIFICATION); - let mut total_size = 0; - while let Some(header) = self.context_data.chain.header(id).unwrap_or(None) { - if blocks.len() >= max || (blocks.len() >= 1 && total_size > MAX_BODIES_BYTES) { - break; - } - let number = *header.number(); - let hash = header.hash(); - let parent_hash = *header.parent_hash(); - let justification = if get_justification { - self.context_data.chain.justification(&BlockId::Hash(hash)).unwrap_or(None) - } else { - None - }; - let block_data = message::generic::BlockData { - hash, - header: if get_header { Some(header) } else { None }, - body: if get_body { - self.context_data - .chain - .block_body(&BlockId::Hash(hash)) - .unwrap_or(None) - } else { - None - }, - receipt: None, - message_queue: None, - justification, - }; - // Stop if we don't have requested block body - if get_body && block_data.body.is_none() { - trace!(target: "sync", "Missing data for block request."); - break; - } - total_size += block_data.body.as_ref().map_or(0, |b| b.len()); - blocks.push(block_data); - match request.direction { - message::Direction::Ascending => id = BlockId::Number(number + One::one()), - message::Direction::Descending => { - if number.is_zero() { - break; - } - id = BlockId::Hash(parent_hash) - } - } - } - let response = message::generic::BlockResponse { - id: request.id, - blocks, - }; - trace!(target: "sync", "Sending BlockResponse with {} blocks", response.blocks.len()); - self.send_message(&peer, None, GenericMessage::BlockResponse(response)) - } - /// Adjusts the reputation of a node. pub fn report_peer(&self, who: PeerId, reputation: sc_peerset::ReputationChange) { self.peerset_handle.report_peer(who, reputation) @@ -1205,14 +1086,11 @@ impl Protocol { .push(who.to_base58()); } trace!(target: "sync", "Sending {} transactions to {}", to_send.len(), who); - let encoded = to_send.encode(); - send_message:: ( - &mut self.behaviour, - &mut self.context_data.stats, - &who, - Some((self.transactions_protocol.clone(), encoded)), - GenericMessage::Transactions(to_send) - ) + self.behaviour.write_notification( + who, + self.transactions_protocol.clone(), + to_send.encode() + ); } } @@ -1287,15 +1165,11 @@ impl Protocol { }, }; - let encoded = message.encode(); - - send_message:: ( - &mut self.behaviour, - &mut self.context_data.stats, - &who, - Some((self.block_announces_protocol.clone(), encoded)), - Message::::BlockAnnounce(message), - ) + self.behaviour.write_notification( + who, + self.block_announces_protocol.clone(), + message.encode() + ); } } } @@ -1603,24 +1477,6 @@ fn update_peer_request( } } -fn send_message( - behaviour: &mut GenericProto, - stats: &mut HashMap<&'static str, PacketStats>, - who: &PeerId, - message: Option<(Cow<'static, [u8]>, Vec)>, - legacy_message: Message, -) { - let encoded = legacy_message.encode(); - let mut stats = stats.entry(legacy_message.id()).or_default(); - stats.bytes_out += encoded.len() as u64; - stats.count_out += 1; - if let Some((proto, msg)) = message { - behaviour.write_notification(who, proto, msg, encoded); - } else { - behaviour.send_packet(who, encoded); - } -} - impl NetworkBehaviour for Protocol { type ProtocolsHandler = ::ProtocolsHandler; type OutEvent = CustomMessageOutcome; diff --git a/client/network/src/protocol/generic_proto/behaviour.rs b/client/network/src/protocol/generic_proto/behaviour.rs index f965980640ad6..ff25d245402a5 100644 --- a/client/network/src/protocol/generic_proto/behaviour.rs +++ b/client/network/src/protocol/generic_proto/behaviour.rs @@ -553,7 +553,6 @@ impl GenericProto { target: &PeerId, protocol_name: Cow<'static, [u8]>, message: impl Into>, - encoded_fallback_message: Vec, ) { let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) { None => { @@ -574,33 +573,10 @@ impl GenericProto { trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target); notifs_sink.send_sync_notification( &protocol_name, - encoded_fallback_message, message ); } - /// Sends a message to a peer. - /// - /// Has no effect if the custom protocol is not open with the given peer. - /// - /// Also note that even we have a valid open substream, it may in fact be already closed - /// without us knowing, in which case the packet will not be received. - pub fn send_packet(&mut self, target: &PeerId, message: Vec) { - let notifs_sink = match self.peers.get(target).and_then(|p| p.get_open()) { - None => { - debug!(target: "sub-libp2p", - "Tried to sent packet to {:?} without an open channel.", - target); - return - } - Some(sink) => sink - }; - - trace!(target: "sub-libp2p", "External API => Packet for {:?}", target); - trace!(target: "sub-libp2p", "Handler({:?}) <= Packet", target); - notifs_sink.send_legacy(message); - } - /// Returns the state of the peerset manager, for debugging purposes. pub fn peerset_debug_info(&mut self) -> serde_json::Value { self.peerset.debug_info() diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index bcdbaf848511f..cf43f029e2039 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -262,16 +262,10 @@ struct NotificationsSinkInner { /// dedicated to the peer. #[derive(Debug)] enum NotificationsSinkMessage { - /// Message emitted by [`NotificationsSink::send_legacy`]. - Legacy { - message: Vec, - }, - /// Message emitted by [`NotificationsSink::reserve_notification`] and /// [`NotificationsSink::write_notification_now`]. Notification { protocol_name: Vec, - encoded_fallback_message: Vec, message: Vec, }, @@ -280,26 +274,6 @@ enum NotificationsSinkMessage { } impl NotificationsSink { - /// Sends a message to the peer using the legacy substream. - /// - /// If too many messages are already buffered, the message is silently discarded and the - /// connection to the peer will be closed shortly after. - /// - /// This method will be removed in a future version. - pub fn send_legacy<'a>(&'a self, message: impl Into>) { - let mut lock = self.inner.sync_channel.lock(); - let result = lock.try_send(NotificationsSinkMessage::Legacy { - message: message.into() - }); - - if result.is_err() { - // Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the - // buffer, and therefore that `try_send` will succeed. - let _result2 = lock.clone().try_send(NotificationsSinkMessage::ForceClose); - debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected())); - } - } - /// Sends a notification to the peer. /// /// If too many messages are already buffered, the notification is silently discarded and the @@ -312,13 +286,11 @@ impl NotificationsSink { pub fn send_sync_notification<'a>( &'a self, protocol_name: &[u8], - encoded_fallback_message: impl Into>, message: impl Into> ) { let mut lock = self.inner.sync_channel.lock(); let result = lock.try_send(NotificationsSinkMessage::Notification { protocol_name: protocol_name.to_owned(), - encoded_fallback_message: encoded_fallback_message.into(), message: message.into() }); @@ -364,12 +336,10 @@ impl<'a> Ready<'a> { /// Returns an error if the substream has been closed. pub fn send( mut self, - encoded_fallback_message: impl Into>, notification: impl Into> ) -> Result<(), ()> { self.lock.start_send(NotificationsSinkMessage::Notification { protocol_name: self.protocol_name, - encoded_fallback_message: encoded_fallback_message.into(), message: notification.into(), }).map_err(|_| ()) } @@ -602,14 +572,8 @@ impl ProtocolsHandler for NotifsHandler { }; match message { - NotificationsSinkMessage::Legacy { message } => { - self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { - message - }); - } NotificationsSinkMessage::Notification { protocol_name, - encoded_fallback_message, message } => { for (handler, _) in &mut self.out_handlers { @@ -618,10 +582,6 @@ impl ProtocolsHandler for NotifsHandler { continue 'poll_notifs_sink; } } - - self.legacy.inject_event(LegacyProtoHandlerIn::SendCustomMessage { - message: encoded_fallback_message, - }); } NotificationsSinkMessage::ForceClose => { return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged)); diff --git a/client/network/src/protocol/generic_proto/handler/legacy.rs b/client/network/src/protocol/generic_proto/handler/legacy.rs index 7d31ed323a43b..d98d864dfc6fa 100644 --- a/client/network/src/protocol/generic_proto/handler/legacy.rs +++ b/client/network/src/protocol/generic_proto/handler/legacy.rs @@ -204,12 +204,6 @@ pub enum LegacyProtoHandlerIn { /// The node should stop using custom protocols. Disable, - - /// Sends a message through a custom protocol substream. - SendCustomMessage { - /// The message to send. - message: Vec, - }, } /// Event that can be emitted by a `LegacyProtoHandler`. @@ -495,17 +489,6 @@ impl LegacyProtoHandler { ProtocolState::KillAsap => ProtocolState::KillAsap, }; } - - /// Sends a message to the remote. - fn send_message(&mut self, message: Vec) { - match self.state { - ProtocolState::Normal { ref mut substreams, .. } => - substreams[0].send_message(message), - - _ => debug!(target: "sub-libp2p", "Tried to send message over closed protocol \ - with {:?}", self.remote_peer_id) - } - } } impl ProtocolsHandler for LegacyProtoHandler { @@ -539,12 +522,9 @@ impl ProtocolsHandler for LegacyProtoHandler { match message { LegacyProtoHandlerIn::Disable => self.disable(), LegacyProtoHandlerIn::Enable => self.enable(), - LegacyProtoHandlerIn::SendCustomMessage { message } => - self.send_message(message), } } - #[inline] fn inject_dial_upgrade_error(&mut self, _: (), err: ProtocolsHandlerUpgrErr) { let is_severe = match err { ProtocolsHandlerUpgrErr::Upgrade(_) => true, diff --git a/client/network/src/protocol/generic_proto/tests.rs b/client/network/src/protocol/generic_proto/tests.rs index 15c4a17df8d3a..dbe02c350100f 100644 --- a/client/network/src/protocol/generic_proto/tests.rs +++ b/client/network/src/protocol/generic_proto/tests.rs @@ -16,19 +16,16 @@ #![cfg(test)] -use futures::{prelude::*, ready}; -use codec::{Encode, Decode}; -use libp2p::core::connection::{ConnectionId, ListenerId}; -use libp2p::core::ConnectedPoint; -use libp2p::swarm::{Swarm, ProtocolsHandler, IntoProtocolsHandler}; -use libp2p::swarm::{PollParameters, NetworkBehaviour, NetworkBehaviourAction}; +use crate::protocol::generic_proto::{GenericProto, GenericProtoOut}; + +use futures::prelude::*; use libp2p::{PeerId, Multiaddr, Transport}; -use rand::seq::SliceRandom; +use libp2p::core::{connection::{ConnectionId, ListenerId}, ConnectedPoint}; +use libp2p::swarm::{ + Swarm, ProtocolsHandler, IntoProtocolsHandler, PollParameters, + NetworkBehaviour, NetworkBehaviourAction +}; use std::{error, io, task::Context, task::Poll, time::Duration}; -use std::collections::HashSet; -use crate::protocol::message::{generic::BlockResponse, Message}; -use crate::protocol::generic_proto::{GenericProto, GenericProtoOut}; -use sp_test_primitives::Block; /// Builds two nodes that have each other as bootstrap nodes. /// This is to be used only for testing, and a panic will happen if something goes wrong. @@ -216,137 +213,6 @@ impl NetworkBehaviour for CustomProtoWithAddr { } } -#[test] -fn two_nodes_transfer_lots_of_packets() { - // We spawn two nodes, then make the first one send lots of packets to the second one. The test - // ends when the second one has received all of them. - - // This test consists in transferring this given number of packets. Considering that (by - // design) the connection gets closed if one of the remotes can't follow the pace, this number - // should not exceed the size of the buffer of pending notifications. - const NUM_PACKETS: u32 = 512; - - let (mut service1, mut service2) = build_nodes(); - - let fut1 = future::poll_fn(move |cx| -> Poll<()> { - loop { - match ready!(service1.poll_next_unpin(cx)) { - Some(GenericProtoOut::CustomProtocolOpen { peer_id, .. }) => { - for n in 0 .. NUM_PACKETS { - service1.send_packet( - &peer_id, - Message::::BlockResponse(BlockResponse { - id: n as _, - blocks: Vec::new(), - }).encode() - ); - } - }, - // An empty handshake is being sent after opening. - Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {}, - _ => panic!(), - } - } - }); - - let mut packet_counter = 0u32; - let fut2 = future::poll_fn(move |cx| { - loop { - match ready!(service2.poll_next_unpin(cx)) { - Some(GenericProtoOut::CustomProtocolOpen { .. }) => {}, - // An empty handshake is being sent after opening. - Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {}, - Some(GenericProtoOut::LegacyMessage { message, .. }) => { - match Message::::decode(&mut &message[..]).unwrap() { - Message::::BlockResponse(BlockResponse { id: _, blocks }) => { - assert!(blocks.is_empty()); - packet_counter += 1; - if packet_counter == NUM_PACKETS { - return Poll::Ready(()) - } - }, - _ => panic!(), - } - } - _ => panic!(), - } - } - }); - - futures::executor::block_on(async move { - future::select(fut1, fut2).await; - }); -} - -#[test] -fn basic_two_nodes_requests_in_parallel() { - let (mut service1, mut service2) = build_nodes(); - - // Generate random messages with or without a request id. - let mut to_send = { - let mut to_send = Vec::new(); - let mut existing_ids = HashSet::new(); - for _ in 0..200 { // Note: don't make that number too high or the CPU usage will explode. - let req_id = loop { - let req_id = rand::random::(); - - // ensure uniqueness - odds of randomly sampling collisions - // is unlikely, but possible to cause spurious test failures. - if existing_ids.insert(req_id) { - break req_id; - } - }; - - to_send.push(Message::::BlockResponse( - BlockResponse { id: req_id, blocks: Vec::new() } - )); - } - to_send - }; - - // Clone `to_send` in `to_receive`. Below we will remove from `to_receive` the messages we - // receive, until the list is empty. - let mut to_receive = to_send.clone(); - to_send.shuffle(&mut rand::thread_rng()); - - let fut1 = future::poll_fn(move |cx| -> Poll<()> { - loop { - match ready!(service1.poll_next_unpin(cx)) { - Some(GenericProtoOut::CustomProtocolOpen { peer_id, .. }) => { - for msg in to_send.drain(..) { - service1.send_packet(&peer_id, msg.encode()); - } - }, - // An empty handshake is being sent after opening. - Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {}, - _ => panic!(), - } - } - }); - - let fut2 = future::poll_fn(move |cx| { - loop { - match ready!(service2.poll_next_unpin(cx)) { - Some(GenericProtoOut::CustomProtocolOpen { .. }) => {}, - // An empty handshake is being sent after opening. - Some(GenericProtoOut::LegacyMessage { message, .. }) if message.is_empty() => {}, - Some(GenericProtoOut::LegacyMessage { message, .. }) => { - let pos = to_receive.iter().position(|m| m.encode() == message).unwrap(); - to_receive.remove(pos); - if to_receive.is_empty() { - return Poll::Ready(()) - } - } - _ => panic!(), - } - } - }); - - futures::executor::block_on(async move { - future::select(fut1, fut2).await; - }); -} - #[test] fn reconnect_after_disconnect() { // We connect two nodes together, then force a disconnect (through the API of the `Service`), diff --git a/client/network/src/protocol/generic_proto/upgrade/legacy.rs b/client/network/src/protocol/generic_proto/upgrade/legacy.rs index 0937a7798be98..1b2b97253d1ae 100644 --- a/client/network/src/protocol/generic_proto/upgrade/legacy.rs +++ b/client/network/src/protocol/generic_proto/upgrade/legacy.rs @@ -123,15 +123,6 @@ impl RegisteredProtocolSubstream { self.is_closing = true; self.send_queue.clear(); } - - /// Sends a message to the substream. - pub fn send_message(&mut self, data: Vec) { - if self.is_closing { - return - } - - self.send_queue.push_back(From::from(&data[..])); - } } /// Event produced by the `RegisteredProtocolSubstream`. diff --git a/client/network/src/service.rs b/client/network/src/service.rs index 754b5b184c096..8b4d7174273aa 100644 --- a/client/network/src/service.rs +++ b/client/network/src/service.rs @@ -635,18 +635,7 @@ impl NetworkService { // Determine the wire protocol name corresponding to this `engine_id`. let protocol_name = self.protocol_name_by_engine.lock().get(&engine_id).cloned(); if let Some(protocol_name) = protocol_name { - // For backwards-compatibility reason, we have to duplicate the message and pass it - // in the situation where the remote still uses the legacy substream. - let fallback = codec::Encode::encode(&{ - protocol::message::generic::Message::<(), (), (), ()>::Consensus({ - protocol::message::generic::ConsensusMessage { - engine_id, - data: message.clone(), - } - }) - }); - - sink.send_sync_notification(&protocol_name, fallback, message); + sink.send_sync_notification(&protocol_name, message); } else { return; } @@ -751,7 +740,6 @@ impl NetworkService { Ok(NotificationSender { sink, protocol_name, - engine_id, notification_size_metric: self.notifications_sizes_metric.as_ref().map(|histogram| { histogram.with_label_values(&["out", &maybe_utf8_bytes_to_string(&engine_id)]) }), @@ -1064,9 +1052,6 @@ pub struct NotificationSender { /// Name of the protocol on the wire. protocol_name: Cow<'static, [u8]>, - /// Engine ID used for the fallback message. - engine_id: ConsensusEngineId, - /// Field extracted from the [`Metrics`] struct and necessary to report the /// notifications-related metrics. notification_size_metric: Option, @@ -1080,7 +1065,6 @@ impl NotificationSender { Ok(r) => r, Err(()) => return Err(NotificationSenderError::Closed), }, - engine_id: self.engine_id, notification_size_metric: self.notification_size_metric.clone(), }) } @@ -1091,9 +1075,6 @@ impl NotificationSender { pub struct NotificationSenderReady<'a> { ready: Ready<'a>, - /// Engine ID used for the fallback message. - engine_id: ConsensusEngineId, - /// Field extracted from the [`Metrics`] struct and necessary to report the /// notifications-related metrics. notification_size_metric: Option, @@ -1108,18 +1089,8 @@ impl<'a> NotificationSenderReady<'a> { notification_size_metric.observe(notification.len() as f64); } - // For backwards-compatibility reason, we have to duplicate the message and pass it - // in the situation where the remote still uses the legacy substream. - let fallback = codec::Encode::encode(&{ - protocol::message::generic::Message::<(), (), (), ()>::Consensus({ - protocol::message::generic::ConsensusMessage { - engine_id: self.engine_id, - data: notification.clone(), - } - }) - }); - - self.ready.send(fallback, notification) + self.ready + .send(notification) .map_err(|()| NotificationSenderError::Closed) } } From 8b7f4094ea94d711d51d64572f53160ee77fba71 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 28 Aug 2020 16:03:40 +0200 Subject: [PATCH 02/11] Ensure that handshake is sent back even in case of back-pressure --- .../protocol/generic_proto/handler/group.rs | 10 ++++ .../generic_proto/handler/notif_in.rs | 20 +++++++- .../generic_proto/upgrade/notifications.rs | 46 ++++++++++++++++++- 3 files changed, 73 insertions(+), 3 deletions(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index bcdbaf848511f..ce747dd495f6b 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -706,6 +706,16 @@ impl ProtocolsHandler for NotifsHandler { } } } + + } else { + // If `self.pending_legacy_handshake` is `None`, we don't want to accept events from + // the ingoing notifications substreams, but still want handshakes to go forward. + for (handler, _) in &mut self.in_handlers { + match handler.poll_process(cx) { + Poll::Pending => {}, + Poll::Ready(v) => match v {} + } + } } for (handler_num, (handler, _)) in self.out_handlers.iter_mut().enumerate() { diff --git a/client/network/src/protocol/generic_proto/handler/notif_in.rs b/client/network/src/protocol/generic_proto/handler/notif_in.rs index ddd78566fcd2a..6775138f881e8 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_in.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_in.rs @@ -37,7 +37,7 @@ use libp2p::swarm::{ NegotiatedSubstream, }; use log::{error, warn}; -use std::{borrow::Cow, collections::VecDeque, fmt, pin::Pin, task::{Context, Poll}}; +use std::{borrow::Cow, collections::VecDeque, convert::Infallible, fmt, pin::Pin, task::{Context, Poll}}; /// Implements the `IntoProtocolsHandler` trait of libp2p. /// @@ -139,6 +139,24 @@ impl NotifsInHandler { pub fn protocol_name(&self) -> &[u8] { self.in_protocol.protocol_name() } + + /// Equivalent to the `poll` method of `ProtocolsHandler`, except that it only drives the + /// handshake process. + /// + /// Use this method in situations where it is not desirable to receive events but still + /// necessary to drive any potential incoming handshake or request. + pub fn poll_process(&mut self, cx: &mut Context) -> Poll { + match self.substream.as_mut().map(|s| NotificationsInSubstream::poll_process(Pin::new(s), cx)) { + None | Some(Poll::Pending) => {}, + Some(Poll::Ready(Ok(v))) => match v {}, + Some(Poll::Ready(Err(_))) => { + self.substream = None; + self.events_queue.push_back(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed)); + }, + } + + Poll::Pending + } } impl ProtocolsHandler for NotifsInHandler { diff --git a/client/network/src/protocol/generic_proto/upgrade/notifications.rs b/client/network/src/protocol/generic_proto/upgrade/notifications.rs index 80fd7761f8088..7113e90e91121 100644 --- a/client/network/src/protocol/generic_proto/upgrade/notifications.rs +++ b/client/network/src/protocol/generic_proto/upgrade/notifications.rs @@ -39,7 +39,7 @@ use futures::prelude::*; use futures_codec::Framed; use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade}; use log::error; -use std::{borrow::Cow, io, iter, mem, pin::Pin, task::{Context, Poll}}; +use std::{borrow::Cow, convert::Infallible, io, iter, mem, pin::Pin, task::{Context, Poll}}; use unsigned_varint::codec::UviBytes; /// Maximum allowed size of the two handshake messages, in bytes. @@ -158,7 +158,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, } impl NotificationsInSubstream -where TSubstream: AsyncRead + AsyncWrite, +where TSubstream: AsyncRead + AsyncWrite + Unpin, { /// Sends the handshake in order to inform the remote that we accept the substream. pub fn send_handshake(&mut self, message: impl Into>) { @@ -169,6 +169,48 @@ where TSubstream: AsyncRead + AsyncWrite, self.handshake = NotificationsInSubstreamHandshake::PendingSend(message.into()); } + + /// Equivalent to `Stream::poll_next`, except that it only drives the handshake and is + /// guaranteed to not generate any notification. + pub fn poll_process(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut this = self.project(); + + loop { + match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) { + NotificationsInSubstreamHandshake::PendingSend(msg) => + match Sink::poll_ready(this.socket.as_mut(), cx) { + Poll::Ready(_) => { + *this.handshake = NotificationsInSubstreamHandshake::Flush; + match Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg)) { + Ok(()) => {}, + Err(err) => return Poll::Ready(Err(err)), + } + }, + Poll::Pending => { + *this.handshake = NotificationsInSubstreamHandshake::PendingSend(msg); + return Poll::Pending + } + }, + NotificationsInSubstreamHandshake::Flush => + match Sink::poll_flush(this.socket.as_mut(), cx)? { + Poll::Ready(()) => + *this.handshake = NotificationsInSubstreamHandshake::Sent, + Poll::Pending => { + *this.handshake = NotificationsInSubstreamHandshake::Flush; + return Poll::Pending + } + }, + + st @ NotificationsInSubstreamHandshake::NotSent | + st @ NotificationsInSubstreamHandshake::Sent | + st @ NotificationsInSubstreamHandshake::ClosingInResponseToRemote | + st @ NotificationsInSubstreamHandshake::BothSidesClosed => { + *this.handshake = st; + return Poll::Pending; + } + } + } + } } impl Stream for NotificationsInSubstream From e8a73ba95a2473671a124f7fdcfbf6a2bd791668 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 28 Aug 2020 17:11:31 +0200 Subject: [PATCH 03/11] Update client/network/src/protocol/generic_proto/handler/group.rs Co-authored-by: Max Inden --- client/network/src/protocol/generic_proto/handler/group.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index ce747dd495f6b..10e20158d4bea 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -709,7 +709,7 @@ impl ProtocolsHandler for NotifsHandler { } else { // If `self.pending_legacy_handshake` is `None`, we don't want to accept events from - // the ingoing notifications substreams, but still want handshakes to go forward. + // the incoming notifications substreams, but still want handshakes to go forward. for (handler, _) in &mut self.in_handlers { match handler.poll_process(cx) { Poll::Pending => {}, From 17b3f19a4d65a451d00e71ceab465cdf927c7618 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 31 Aug 2020 10:51:02 +0200 Subject: [PATCH 04/11] Also process OpenRequest and Closed --- .../protocol/generic_proto/handler/group.rs | 76 ++++++++++--------- .../generic_proto/handler/notif_in.rs | 17 ++++- 2 files changed, 52 insertions(+), 41 deletions(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index b3b16a63eb628..93fea19836ae5 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -634,46 +634,48 @@ impl ProtocolsHandler for NotifsHandler { return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))), } } + } + + for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() { + loop { + let poll = if self.pending_legacy_handshake.is_none() { + handler.poll(cx) + } else { + handler.poll_process(cx) + }; + + let ev = match poll { + Poll::Ready(e) => e, + Poll::Pending => break, + }; - for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() { - while let Poll::Ready(ev) = handler.poll(cx) { - match ev { - ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } => - error!("Incoming substream handler tried to open a substream"), - ProtocolsHandlerEvent::Close(err) => void::unreachable(err), - ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) => - match self.enabled { - EnabledState::Initial => self.pending_in.push(handler_num), - EnabledState::Enabled => { - // We create `handshake_message` on a separate line to be sure - // that the lock is released as soon as possible. - let handshake_message = handshake_message.read().clone(); - handler.inject_event(NotifsInHandlerIn::Accept(handshake_message)) - }, - EnabledState::Disabled => - handler.inject_event(NotifsInHandlerIn::Refuse), + match ev { + ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } => + error!("Incoming substream handler tried to open a substream"), + ProtocolsHandlerEvent::Close(err) => void::unreachable(err), + ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) => + match self.enabled { + EnabledState::Initial => self.pending_in.push(handler_num), + EnabledState::Enabled => { + // We create `handshake_message` on a separate line to be sure + // that the lock is released as soon as possible. + let handshake_message = handshake_message.read().clone(); + handler.inject_event(NotifsInHandlerIn::Accept(handshake_message)) }, - ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {}, - ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => { - if self.notifications_sink_rx.is_some() { - let msg = NotifsHandlerOut::Notification { - message, - protocol_name: handler.protocol_name().clone(), - }; - return Poll::Ready(ProtocolsHandlerEvent::Custom(msg)); - } + EnabledState::Disabled => + handler.inject_event(NotifsInHandlerIn::Refuse), }, - } - } - } - - } else { - // If `self.pending_legacy_handshake` is `None`, we don't want to accept events from - // the ingoing notifications substreams, but still want handshakes to go forward. - for (handler, _) in &mut self.in_handlers { - match handler.poll_process(cx) { - Poll::Pending => {}, - Poll::Ready(v) => match v {} + ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {}, + ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => { + debug_assert!(self.pending_legacy_handshake.is_none()); + if self.notifications_sink_rx.is_some() { + let msg = NotifsHandlerOut::Notification { + message, + protocol_name: handler.protocol_name().clone(), + }; + return Poll::Ready(ProtocolsHandlerEvent::Custom(msg)); + } + }, } } } diff --git a/client/network/src/protocol/generic_proto/handler/notif_in.rs b/client/network/src/protocol/generic_proto/handler/notif_in.rs index e47c2469615e7..52d3fb0c638b8 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_in.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_in.rs @@ -37,7 +37,7 @@ use libp2p::swarm::{ NegotiatedSubstream, }; use log::{error, warn}; -use std::{borrow::Cow, collections::VecDeque, convert::Infallible, fmt, pin::Pin, task::{Context, Poll}}; +use std::{borrow::Cow, collections::VecDeque, fmt, pin::Pin, task::{Context, Poll}}; /// Implements the `IntoProtocolsHandler` trait of libp2p. /// @@ -140,12 +140,21 @@ impl NotifsInHandler { self.in_protocol.protocol_name() } - /// Equivalent to the `poll` method of `ProtocolsHandler`, except that it only drives the - /// handshake process. + /// Equivalent to the `poll` method of `ProtocolsHandler`, except that it is guaranteed to + /// never generate [`NotifsInHandlerOut::Notif`]. /// /// Use this method in situations where it is not desirable to receive events but still /// necessary to drive any potential incoming handshake or request. - pub fn poll_process(&mut self, cx: &mut Context) -> Poll { + pub fn poll_process( + &mut self, + cx: &mut Context + ) -> Poll< + ProtocolsHandlerEvent + > { + if let Some(event) = self.events_queue.pop_front() { + return Poll::Ready(event) + } + match self.substream.as_mut().map(|s| NotificationsInSubstream::poll_process(Pin::new(s), cx)) { None | Some(Poll::Pending) => {}, Some(Poll::Ready(Ok(v))) => match v {}, From f14a792ed73e4915cee568f3c3dddbc8e079625c Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 31 Aug 2020 10:51:02 +0200 Subject: [PATCH 05/11] Also process OpenRequest and Closed --- .../protocol/generic_proto/handler/group.rs | 76 ++++++++++--------- .../generic_proto/handler/notif_in.rs | 17 ++++- 2 files changed, 52 insertions(+), 41 deletions(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 10e20158d4bea..0694390a5d9e1 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -674,46 +674,48 @@ impl ProtocolsHandler for NotifsHandler { return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))), } } + } + + for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() { + loop { + let poll = if self.pending_legacy_handshake.is_none() { + handler.poll(cx) + } else { + handler.poll_process(cx) + }; + + let ev = match poll { + Poll::Ready(e) => e, + Poll::Pending => break, + }; - for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() { - while let Poll::Ready(ev) = handler.poll(cx) { - match ev { - ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } => - error!("Incoming substream handler tried to open a substream"), - ProtocolsHandlerEvent::Close(err) => void::unreachable(err), - ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) => - match self.enabled { - EnabledState::Initial => self.pending_in.push(handler_num), - EnabledState::Enabled => { - // We create `handshake_message` on a separate line to be sure - // that the lock is released as soon as possible. - let handshake_message = handshake_message.read().clone(); - handler.inject_event(NotifsInHandlerIn::Accept(handshake_message)) - }, - EnabledState::Disabled => - handler.inject_event(NotifsInHandlerIn::Refuse), + match ev { + ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } => + error!("Incoming substream handler tried to open a substream"), + ProtocolsHandlerEvent::Close(err) => void::unreachable(err), + ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) => + match self.enabled { + EnabledState::Initial => self.pending_in.push(handler_num), + EnabledState::Enabled => { + // We create `handshake_message` on a separate line to be sure + // that the lock is released as soon as possible. + let handshake_message = handshake_message.read().clone(); + handler.inject_event(NotifsInHandlerIn::Accept(handshake_message)) }, - ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {}, - ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => { - if self.notifications_sink_rx.is_some() { - let msg = NotifsHandlerOut::Notification { - message, - protocol_name: handler.protocol_name().to_owned().into(), - }; - return Poll::Ready(ProtocolsHandlerEvent::Custom(msg)); - } + EnabledState::Disabled => + handler.inject_event(NotifsInHandlerIn::Refuse), }, - } - } - } - - } else { - // If `self.pending_legacy_handshake` is `None`, we don't want to accept events from - // the incoming notifications substreams, but still want handshakes to go forward. - for (handler, _) in &mut self.in_handlers { - match handler.poll_process(cx) { - Poll::Pending => {}, - Poll::Ready(v) => match v {} + ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {}, + ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => { + debug_assert!(self.pending_legacy_handshake.is_none()); + if self.notifications_sink_rx.is_some() { + let msg = NotifsHandlerOut::Notification { + message, + protocol_name: handler.protocol_name().clone(), + }; + return Poll::Ready(ProtocolsHandlerEvent::Custom(msg)); + } + }, } } } diff --git a/client/network/src/protocol/generic_proto/handler/notif_in.rs b/client/network/src/protocol/generic_proto/handler/notif_in.rs index 6775138f881e8..2cc7dca2978bc 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_in.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_in.rs @@ -37,7 +37,7 @@ use libp2p::swarm::{ NegotiatedSubstream, }; use log::{error, warn}; -use std::{borrow::Cow, collections::VecDeque, convert::Infallible, fmt, pin::Pin, task::{Context, Poll}}; +use std::{borrow::Cow, collections::VecDeque, fmt, pin::Pin, task::{Context, Poll}}; /// Implements the `IntoProtocolsHandler` trait of libp2p. /// @@ -140,12 +140,21 @@ impl NotifsInHandler { self.in_protocol.protocol_name() } - /// Equivalent to the `poll` method of `ProtocolsHandler`, except that it only drives the - /// handshake process. + /// Equivalent to the `poll` method of `ProtocolsHandler`, except that it is guaranteed to + /// never generate [`NotifsInHandlerOut::Notif`]. /// /// Use this method in situations where it is not desirable to receive events but still /// necessary to drive any potential incoming handshake or request. - pub fn poll_process(&mut self, cx: &mut Context) -> Poll { + pub fn poll_process( + &mut self, + cx: &mut Context + ) -> Poll< + ProtocolsHandlerEvent + > { + if let Some(event) = self.events_queue.pop_front() { + return Poll::Ready(event) + } + match self.substream.as_mut().map(|s| NotificationsInSubstream::poll_process(Pin::new(s), cx)) { None | Some(Poll::Pending) => {}, Some(Poll::Ready(Ok(v))) => match v {}, From e0f9faf3e6f3141c9168088c50fb0288a126cc3b Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 31 Aug 2020 11:04:49 +0200 Subject: [PATCH 06/11] Fix bad merge --- client/network/src/protocol/generic_proto/handler/group.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 0694390a5d9e1..a13c96883d859 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -711,7 +711,7 @@ impl ProtocolsHandler for NotifsHandler { if self.notifications_sink_rx.is_some() { let msg = NotifsHandlerOut::Notification { message, - protocol_name: handler.protocol_name().clone(), + protocol_name: handler.protocol_name().to_owned().into(), }; return Poll::Ready(ProtocolsHandlerEvent::Custom(msg)); } From fdee2d7112a0880c129319b88ae437bad418c72f Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 31 Aug 2020 11:08:35 +0200 Subject: [PATCH 07/11] God I'm so lost with all these merges --- client/network/src/protocol/generic_proto/handler/group.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 498368bea3fed..6804dd3c789da 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -711,7 +711,7 @@ impl ProtocolsHandler for NotifsHandler { if self.notifications_sink_rx.is_some() { let msg = NotifsHandlerOut::Notification { message, - protocol_name: handler.protocol_name().to_owned().into(), + protocol_name: handler.protocol_name().clone(), }; return Poll::Ready(ProtocolsHandlerEvent::Custom(msg)); } From 2d9cfe1050aad8d2dd8648d7078009a6626ba592 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 31 Aug 2020 12:14:38 +0200 Subject: [PATCH 08/11] Immediately return Closed --- client/network/src/protocol/generic_proto/handler/notif_in.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/handler/notif_in.rs b/client/network/src/protocol/generic_proto/handler/notif_in.rs index 52d3fb0c638b8..5a50cce268117 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_in.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_in.rs @@ -160,7 +160,7 @@ impl NotifsInHandler { Some(Poll::Ready(Ok(v))) => match v {}, Some(Poll::Ready(Err(_))) => { self.substream = None; - self.events_queue.push_back(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed)); + return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed)); }, } From b1fade9cb996d5485130fa79bc5f0940a59b5c0d Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 31 Aug 2020 13:44:48 +0200 Subject: [PATCH 09/11] Add warning for sending on non-registered protocol --- .../protocol/generic_proto/handler/group.rs | 28 +++++++++++++++++-- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 93fea19836ae5..6640d9286eff9 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -576,12 +576,34 @@ impl ProtocolsHandler for NotifsHandler { protocol_name, message } => { + let mut found_any_with_name = false; + for (handler, _) in &mut self.out_handlers { - if *handler.protocol_name() == protocol_name && handler.is_open() { - handler.send_or_discard(message); - continue 'poll_notifs_sink; + if *handler.protocol_name() == protocol_name { + found_any_with_name = true; + if handler.is_open() { + handler.send_or_discard(message); + continue 'poll_notifs_sink; + } } } + + // If this code is reached, there exists two possibilities: + // + // - User tried to send a notification on a non-existing protocol. This + // most likely relates to https://github.com/paritytech/substrate/issues/6827 + // - User tried to send a notification to a peer we're not or no longer + // connected to. This happens in a normal scenario due to the racy nature + // of connections and disconnections, and is benign. + // + // We print a warning in the former condition. + if !found_any_with_name { + log::warn!( + target: "sub-libp2p", + "Tried to send a notification on non-registered protocol: {:?}", + protocol_name + ); + } } NotificationsSinkMessage::ForceClose => { return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::SyncNotificationsClogged)); From c05c04ba5dcf0f0413a7cd3ca8a0eef576fd7422 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 31 Aug 2020 14:14:10 +0200 Subject: [PATCH 10/11] Register GrandPa protocol in tests --- client/finality-grandpa/src/tests.rs | 11 ++++++++++- client/network/test/src/lib.rs | 10 ++++++++-- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/client/finality-grandpa/src/tests.rs b/client/finality-grandpa/src/tests.rs index d2905e4da4453..6e8def57f50ea 100644 --- a/client/finality-grandpa/src/tests.rs +++ b/client/finality-grandpa/src/tests.rs @@ -23,7 +23,7 @@ use assert_matches::assert_matches; use environment::HasVoted; use sc_network_test::{ Block, BlockImportAdapter, Hash, PassThroughVerifier, Peer, PeersClient, PeersFullClient, - TestClient, TestNetFactory, + TestClient, TestNetFactory, FullPeerConfig, }; use sc_network::config::{ProtocolConfig, BoxFinalityProofRequestBuilder}; use parking_lot::Mutex; @@ -94,6 +94,15 @@ impl TestNetFactory for GrandpaTestNet { ProtocolConfig::default() } + fn add_full_peer(&mut self) { + self.add_full_peer_with_config(FullPeerConfig { + notifications_protocols: vec![ + (communication::GRANDPA_ENGINE_ID, communication::GRANDPA_PROTOCOL_NAME.into()) + ], + ..Default::default() + }) + } + fn make_verifier( &self, _client: PeersClient, diff --git a/client/network/test/src/lib.rs b/client/network/test/src/lib.rs index d269842386cdd..587feebe55c14 100644 --- a/client/network/test/src/lib.rs +++ b/client/network/test/src/lib.rs @@ -22,7 +22,10 @@ mod block_import; #[cfg(test)] mod sync; -use std::{collections::HashMap, pin::Pin, sync::Arc, marker::PhantomData, task::{Poll, Context as FutureContext}}; +use std::{ + borrow::Cow, collections::HashMap, pin::Pin, sync::Arc, marker::PhantomData, + task::{Poll, Context as FutureContext} +}; use libp2p::build_multiaddr; use log::trace; @@ -55,7 +58,7 @@ use sp_core::H256; use sc_network::config::ProtocolConfig; use sp_runtime::generic::{BlockId, OpaqueDigestItemId}; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, NumberFor}; -use sp_runtime::Justification; +use sp_runtime::{ConsensusEngineId, Justification}; use substrate_test_runtime_client::{self, AccountKeyring}; use sc_service::client::Client; pub use sc_network::config::EmptyTransactionPool; @@ -553,6 +556,8 @@ pub struct FullPeerConfig { pub keep_blocks: Option, /// Block announce validator. pub block_announce_validator: Option + Send + Sync>>, + /// List of notification protocols that the network must support. + pub notifications_protocols: Vec<(ConsensusEngineId, Cow<'static, str>)>, } pub trait TestNetFactory: Sized { @@ -663,6 +668,7 @@ pub trait TestNetFactory: Sized { network_config.transport = TransportConfig::MemoryOnly; network_config.listen_addresses = vec![listen_addr.clone()]; network_config.allow_non_globals_in_dht = true; + network_config.notifications_protocols = config.notifications_protocols; let network = NetworkWorker::new(sc_network::config::Params { role: Role::Full, From def99c500d12161461821a2ccb37fa85e26b4b53 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Wed, 2 Sep 2020 17:02:27 +0200 Subject: [PATCH 11/11] Update client/network/src/protocol/generic_proto/handler/group.rs Co-authored-by: Max Inden --- client/network/src/protocol/generic_proto/handler/group.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 6640d9286eff9..acb241af2ad2d 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -588,7 +588,7 @@ impl ProtocolsHandler for NotifsHandler { } } - // If this code is reached, there exists two possibilities: + // This code can be reached via the following scenarios: // // - User tried to send a notification on a non-existing protocol. This // most likely relates to https://github.com/paritytech/substrate/issues/6827