From 8b7f4094ea94d711d51d64572f53160ee77fba71 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 28 Aug 2020 16:03:40 +0200 Subject: [PATCH 1/6] Ensure that handshake is sent back even in case of back-pressure --- .../protocol/generic_proto/handler/group.rs | 10 ++++ .../generic_proto/handler/notif_in.rs | 20 +++++++- .../generic_proto/upgrade/notifications.rs | 46 ++++++++++++++++++- 3 files changed, 73 insertions(+), 3 deletions(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index bcdbaf848511f..ce747dd495f6b 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -706,6 +706,16 @@ impl ProtocolsHandler for NotifsHandler { } } } + + } else { + // If `self.pending_legacy_handshake` is `None`, we don't want to accept events from + // the ingoing notifications substreams, but still want handshakes to go forward. + for (handler, _) in &mut self.in_handlers { + match handler.poll_process(cx) { + Poll::Pending => {}, + Poll::Ready(v) => match v {} + } + } } for (handler_num, (handler, _)) in self.out_handlers.iter_mut().enumerate() { diff --git a/client/network/src/protocol/generic_proto/handler/notif_in.rs b/client/network/src/protocol/generic_proto/handler/notif_in.rs index ddd78566fcd2a..6775138f881e8 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_in.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_in.rs @@ -37,7 +37,7 @@ use libp2p::swarm::{ NegotiatedSubstream, }; use log::{error, warn}; -use std::{borrow::Cow, collections::VecDeque, fmt, pin::Pin, task::{Context, Poll}}; +use std::{borrow::Cow, collections::VecDeque, convert::Infallible, fmt, pin::Pin, task::{Context, Poll}}; /// Implements the `IntoProtocolsHandler` trait of libp2p. /// @@ -139,6 +139,24 @@ impl NotifsInHandler { pub fn protocol_name(&self) -> &[u8] { self.in_protocol.protocol_name() } + + /// Equivalent to the `poll` method of `ProtocolsHandler`, except that it only drives the + /// handshake process. + /// + /// Use this method in situations where it is not desirable to receive events but still + /// necessary to drive any potential incoming handshake or request. + pub fn poll_process(&mut self, cx: &mut Context) -> Poll { + match self.substream.as_mut().map(|s| NotificationsInSubstream::poll_process(Pin::new(s), cx)) { + None | Some(Poll::Pending) => {}, + Some(Poll::Ready(Ok(v))) => match v {}, + Some(Poll::Ready(Err(_))) => { + self.substream = None; + self.events_queue.push_back(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed)); + }, + } + + Poll::Pending + } } impl ProtocolsHandler for NotifsInHandler { diff --git a/client/network/src/protocol/generic_proto/upgrade/notifications.rs b/client/network/src/protocol/generic_proto/upgrade/notifications.rs index 80fd7761f8088..7113e90e91121 100644 --- a/client/network/src/protocol/generic_proto/upgrade/notifications.rs +++ b/client/network/src/protocol/generic_proto/upgrade/notifications.rs @@ -39,7 +39,7 @@ use futures::prelude::*; use futures_codec::Framed; use libp2p::core::{UpgradeInfo, InboundUpgrade, OutboundUpgrade, upgrade}; use log::error; -use std::{borrow::Cow, io, iter, mem, pin::Pin, task::{Context, Poll}}; +use std::{borrow::Cow, convert::Infallible, io, iter, mem, pin::Pin, task::{Context, Poll}}; use unsigned_varint::codec::UviBytes; /// Maximum allowed size of the two handshake messages, in bytes. @@ -158,7 +158,7 @@ where TSubstream: AsyncRead + AsyncWrite + Unpin + Send + 'static, } impl NotificationsInSubstream -where TSubstream: AsyncRead + AsyncWrite, +where TSubstream: AsyncRead + AsyncWrite + Unpin, { /// Sends the handshake in order to inform the remote that we accept the substream. pub fn send_handshake(&mut self, message: impl Into>) { @@ -169,6 +169,48 @@ where TSubstream: AsyncRead + AsyncWrite, self.handshake = NotificationsInSubstreamHandshake::PendingSend(message.into()); } + + /// Equivalent to `Stream::poll_next`, except that it only drives the handshake and is + /// guaranteed to not generate any notification. + pub fn poll_process(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut this = self.project(); + + loop { + match mem::replace(this.handshake, NotificationsInSubstreamHandshake::Sent) { + NotificationsInSubstreamHandshake::PendingSend(msg) => + match Sink::poll_ready(this.socket.as_mut(), cx) { + Poll::Ready(_) => { + *this.handshake = NotificationsInSubstreamHandshake::Flush; + match Sink::start_send(this.socket.as_mut(), io::Cursor::new(msg)) { + Ok(()) => {}, + Err(err) => return Poll::Ready(Err(err)), + } + }, + Poll::Pending => { + *this.handshake = NotificationsInSubstreamHandshake::PendingSend(msg); + return Poll::Pending + } + }, + NotificationsInSubstreamHandshake::Flush => + match Sink::poll_flush(this.socket.as_mut(), cx)? { + Poll::Ready(()) => + *this.handshake = NotificationsInSubstreamHandshake::Sent, + Poll::Pending => { + *this.handshake = NotificationsInSubstreamHandshake::Flush; + return Poll::Pending + } + }, + + st @ NotificationsInSubstreamHandshake::NotSent | + st @ NotificationsInSubstreamHandshake::Sent | + st @ NotificationsInSubstreamHandshake::ClosingInResponseToRemote | + st @ NotificationsInSubstreamHandshake::BothSidesClosed => { + *this.handshake = st; + return Poll::Pending; + } + } + } + } } impl Stream for NotificationsInSubstream From e8a73ba95a2473671a124f7fdcfbf6a2bd791668 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Fri, 28 Aug 2020 17:11:31 +0200 Subject: [PATCH 2/6] Update client/network/src/protocol/generic_proto/handler/group.rs Co-authored-by: Max Inden --- client/network/src/protocol/generic_proto/handler/group.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index ce747dd495f6b..10e20158d4bea 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -709,7 +709,7 @@ impl ProtocolsHandler for NotifsHandler { } else { // If `self.pending_legacy_handshake` is `None`, we don't want to accept events from - // the ingoing notifications substreams, but still want handshakes to go forward. + // the incoming notifications substreams, but still want handshakes to go forward. for (handler, _) in &mut self.in_handlers { match handler.poll_process(cx) { Poll::Pending => {}, From f14a792ed73e4915cee568f3c3dddbc8e079625c Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 31 Aug 2020 10:51:02 +0200 Subject: [PATCH 3/6] Also process OpenRequest and Closed --- .../protocol/generic_proto/handler/group.rs | 76 ++++++++++--------- .../generic_proto/handler/notif_in.rs | 17 ++++- 2 files changed, 52 insertions(+), 41 deletions(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 10e20158d4bea..0694390a5d9e1 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -674,46 +674,48 @@ impl ProtocolsHandler for NotifsHandler { return Poll::Ready(ProtocolsHandlerEvent::Close(NotifsHandlerError::Legacy(err))), } } + } + + for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() { + loop { + let poll = if self.pending_legacy_handshake.is_none() { + handler.poll(cx) + } else { + handler.poll_process(cx) + }; + + let ev = match poll { + Poll::Ready(e) => e, + Poll::Pending => break, + }; - for (handler_num, (handler, handshake_message)) in self.in_handlers.iter_mut().enumerate() { - while let Poll::Ready(ev) = handler.poll(cx) { - match ev { - ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } => - error!("Incoming substream handler tried to open a substream"), - ProtocolsHandlerEvent::Close(err) => void::unreachable(err), - ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) => - match self.enabled { - EnabledState::Initial => self.pending_in.push(handler_num), - EnabledState::Enabled => { - // We create `handshake_message` on a separate line to be sure - // that the lock is released as soon as possible. - let handshake_message = handshake_message.read().clone(); - handler.inject_event(NotifsInHandlerIn::Accept(handshake_message)) - }, - EnabledState::Disabled => - handler.inject_event(NotifsInHandlerIn::Refuse), + match ev { + ProtocolsHandlerEvent::OutboundSubstreamRequest { .. } => + error!("Incoming substream handler tried to open a substream"), + ProtocolsHandlerEvent::Close(err) => void::unreachable(err), + ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::OpenRequest(_)) => + match self.enabled { + EnabledState::Initial => self.pending_in.push(handler_num), + EnabledState::Enabled => { + // We create `handshake_message` on a separate line to be sure + // that the lock is released as soon as possible. + let handshake_message = handshake_message.read().clone(); + handler.inject_event(NotifsInHandlerIn::Accept(handshake_message)) }, - ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {}, - ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => { - if self.notifications_sink_rx.is_some() { - let msg = NotifsHandlerOut::Notification { - message, - protocol_name: handler.protocol_name().to_owned().into(), - }; - return Poll::Ready(ProtocolsHandlerEvent::Custom(msg)); - } + EnabledState::Disabled => + handler.inject_event(NotifsInHandlerIn::Refuse), }, - } - } - } - - } else { - // If `self.pending_legacy_handshake` is `None`, we don't want to accept events from - // the incoming notifications substreams, but still want handshakes to go forward. - for (handler, _) in &mut self.in_handlers { - match handler.poll_process(cx) { - Poll::Pending => {}, - Poll::Ready(v) => match v {} + ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed) => {}, + ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Notif(message)) => { + debug_assert!(self.pending_legacy_handshake.is_none()); + if self.notifications_sink_rx.is_some() { + let msg = NotifsHandlerOut::Notification { + message, + protocol_name: handler.protocol_name().clone(), + }; + return Poll::Ready(ProtocolsHandlerEvent::Custom(msg)); + } + }, } } } diff --git a/client/network/src/protocol/generic_proto/handler/notif_in.rs b/client/network/src/protocol/generic_proto/handler/notif_in.rs index 6775138f881e8..2cc7dca2978bc 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_in.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_in.rs @@ -37,7 +37,7 @@ use libp2p::swarm::{ NegotiatedSubstream, }; use log::{error, warn}; -use std::{borrow::Cow, collections::VecDeque, convert::Infallible, fmt, pin::Pin, task::{Context, Poll}}; +use std::{borrow::Cow, collections::VecDeque, fmt, pin::Pin, task::{Context, Poll}}; /// Implements the `IntoProtocolsHandler` trait of libp2p. /// @@ -140,12 +140,21 @@ impl NotifsInHandler { self.in_protocol.protocol_name() } - /// Equivalent to the `poll` method of `ProtocolsHandler`, except that it only drives the - /// handshake process. + /// Equivalent to the `poll` method of `ProtocolsHandler`, except that it is guaranteed to + /// never generate [`NotifsInHandlerOut::Notif`]. /// /// Use this method in situations where it is not desirable to receive events but still /// necessary to drive any potential incoming handshake or request. - pub fn poll_process(&mut self, cx: &mut Context) -> Poll { + pub fn poll_process( + &mut self, + cx: &mut Context + ) -> Poll< + ProtocolsHandlerEvent + > { + if let Some(event) = self.events_queue.pop_front() { + return Poll::Ready(event) + } + match self.substream.as_mut().map(|s| NotificationsInSubstream::poll_process(Pin::new(s), cx)) { None | Some(Poll::Pending) => {}, Some(Poll::Ready(Ok(v))) => match v {}, From e0f9faf3e6f3141c9168088c50fb0288a126cc3b Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 31 Aug 2020 11:04:49 +0200 Subject: [PATCH 4/6] Fix bad merge --- client/network/src/protocol/generic_proto/handler/group.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 0694390a5d9e1..a13c96883d859 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -711,7 +711,7 @@ impl ProtocolsHandler for NotifsHandler { if self.notifications_sink_rx.is_some() { let msg = NotifsHandlerOut::Notification { message, - protocol_name: handler.protocol_name().clone(), + protocol_name: handler.protocol_name().to_owned().into(), }; return Poll::Ready(ProtocolsHandlerEvent::Custom(msg)); } From fdee2d7112a0880c129319b88ae437bad418c72f Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 31 Aug 2020 11:08:35 +0200 Subject: [PATCH 5/6] God I'm so lost with all these merges --- client/network/src/protocol/generic_proto/handler/group.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/handler/group.rs b/client/network/src/protocol/generic_proto/handler/group.rs index 498368bea3fed..6804dd3c789da 100644 --- a/client/network/src/protocol/generic_proto/handler/group.rs +++ b/client/network/src/protocol/generic_proto/handler/group.rs @@ -711,7 +711,7 @@ impl ProtocolsHandler for NotifsHandler { if self.notifications_sink_rx.is_some() { let msg = NotifsHandlerOut::Notification { message, - protocol_name: handler.protocol_name().to_owned().into(), + protocol_name: handler.protocol_name().clone(), }; return Poll::Ready(ProtocolsHandlerEvent::Custom(msg)); } From 2d9cfe1050aad8d2dd8648d7078009a6626ba592 Mon Sep 17 00:00:00 2001 From: Pierre Krieger Date: Mon, 31 Aug 2020 12:14:38 +0200 Subject: [PATCH 6/6] Immediately return Closed --- client/network/src/protocol/generic_proto/handler/notif_in.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/network/src/protocol/generic_proto/handler/notif_in.rs b/client/network/src/protocol/generic_proto/handler/notif_in.rs index 52d3fb0c638b8..5a50cce268117 100644 --- a/client/network/src/protocol/generic_proto/handler/notif_in.rs +++ b/client/network/src/protocol/generic_proto/handler/notif_in.rs @@ -160,7 +160,7 @@ impl NotifsInHandler { Some(Poll::Ready(Ok(v))) => match v {}, Some(Poll::Ready(Err(_))) => { self.substream = None; - self.events_queue.push_back(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed)); + return Poll::Ready(ProtocolsHandlerEvent::Custom(NotifsInHandlerOut::Closed)); }, }