Skip to content
This repository was archived by the owner on Jul 4, 2022. It is now read-only.

Commit 5afc777

Browse files
mxindenrphmeier
authored andcommitted
client/finality-grandpa: Make round_communication use bounded channel (#4691)
* clinet/finality-grandpa: Make round_communication use bounded channel `round_communication` returns a `Sink` and a `Stream` for outgoing and incoming messages. The messages send into the `Sink` are forwarded down to the network as well as send back into the `Stream` to ensure the node processes its own messages. So far, to send messages into the `Sink` back into the `Stream`, an unbounded channel was used. This patch updates `round_communication` and `OutgoingMessages` to use a bounded channel. This is part of a greater effort to reduce the number of owners of components within `finality-grandpa` and `network` as well as to reduce the amount of unbounded channels. For details see d4fbb89 and f0c1852. * client/finality-grandpa: Import futures03::future::ready at the top * client/finality-grandpa: Make tests use compat of future 03 * client/finality-grandpa: Do not import ready into scope Instead of importing futures03::future::ready into the scope, only import futures::future03 into scope and call ready as furure03::ready.
1 parent 7bd0dbf commit 5afc777

File tree

3 files changed

+76
-47
lines changed

3 files changed

+76
-47
lines changed

client/finality-grandpa/src/communication/mod.rs

Lines changed: 48 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -27,12 +27,13 @@
2727
//! In the future, there will be a fallback for allowing sending the same message
2828
//! under certain conditions that are used to un-stick the protocol.
2929
30-
use futures::{prelude::*, sync::mpsc};
30+
use futures::prelude::*;
3131
use futures03::{
3232
channel::mpsc as mpsc03,
3333
compat::Compat,
34-
future::{Future as Future03},
35-
stream::StreamExt,
34+
future::{self as future03, Future as Future03},
35+
sink::Sink as Sink03,
36+
stream::{Stream as Stream03, StreamExt},
3637
};
3738
use log::{debug, trace};
3839
use parking_lot::Mutex;
@@ -276,8 +277,8 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
276277
local_key: Option<AuthorityPair>,
277278
has_voted: HasVoted<B>,
278279
) -> (
279-
impl Stream<Item=SignedMessage<B>,Error=Error>,
280-
impl Sink<SinkItem=Message<B>,SinkError=Error>,
280+
impl Stream03<Item=SignedMessage<B>> + Unpin,
281+
OutgoingMessages<B>,
281282
) {
282283
self.note_round(
283284
round,
@@ -295,22 +296,20 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
295296
});
296297

297298
let topic = round_topic::<B>(round.0, set_id.0);
298-
let incoming = Compat::new(self.gossip_engine.messages_for(topic)
299-
.map(|item| Ok::<_, ()>(item)))
300-
.filter_map(|notification| {
299+
let incoming = self.gossip_engine.messages_for(topic)
300+
.filter_map(move |notification| {
301301
let decoded = GossipMessage::<B>::decode(&mut &notification.message[..]);
302-
if let Err(ref e) = decoded {
303-
debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e);
304-
}
305-
decoded.ok()
306-
})
307-
.and_then(move |msg| {
308-
match msg {
309-
GossipMessage::Vote(msg) => {
302+
303+
match decoded {
304+
Err(ref e) => {
305+
debug!(target: "afg", "Skipping malformed message {:?}: {}", notification, e);
306+
return future03::ready(None);
307+
}
308+
Ok(GossipMessage::Vote(msg)) => {
310309
// check signature.
311310
if !voters.contains_key(&msg.message.id) {
312311
debug!(target: "afg", "Skipping message from unknown voter {}", msg.message.id);
313-
return Ok(None);
312+
return future03::ready(None);
314313
}
315314

316315
if voters.len() <= TELEMETRY_VOTERS_LIMIT {
@@ -339,18 +338,16 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
339338
};
340339
}
341340

342-
Ok(Some(msg.message))
341+
future03::ready(Some(msg.message))
343342
}
344343
_ => {
345344
debug!(target: "afg", "Skipping unknown message type");
346-
return Ok(None);
345+
return future03::ready(None);
347346
}
348347
}
349-
})
350-
.filter_map(|x| x)
351-
.map_err(|()| Error::Network(format!("Failed to receive message on unbounded stream")));
348+
});
352349

353-
let (tx, out_rx) = mpsc::unbounded();
350+
let (tx, out_rx) = mpsc03::channel(0);
354351
let outgoing = OutgoingMessages::<B> {
355352
round: round.0,
356353
set_id: set_id.0,
@@ -360,14 +357,10 @@ impl<B: BlockT, N: Network<B>> NetworkBridge<B, N> {
360357
has_voted,
361358
};
362359

363-
let out_rx = out_rx.map_err(move |()| Error::Network(
364-
format!("Failed to receive on unbounded receiver for round {}", round.0)
365-
));
366-
367360
// Combine incoming votes from external GRANDPA nodes with outgoing
368361
// votes from our own GRANDPA voter to have a single
369362
// vote-import-pipeline.
370-
let incoming = incoming.select(out_rx);
363+
let incoming = futures03::stream::select(incoming, out_rx);
371364

372365
(incoming, outgoing)
373366
}
@@ -690,21 +683,29 @@ pub(crate) fn check_message_sig_with_buffer<Block: BlockT>(
690683
/// use the same raw message and key to sign. This is currently true for
691684
/// `ed25519` and `BLS` signatures (which we might use in the future), care must
692685
/// be taken when switching to different key types.
693-
struct OutgoingMessages<Block: BlockT> {
686+
pub(crate) struct OutgoingMessages<Block: BlockT> {
694687
round: RoundNumber,
695688
set_id: SetIdNumber,
696689
locals: Option<(AuthorityPair, AuthorityId)>,
697-
sender: mpsc::UnboundedSender<SignedMessage<Block>>,
690+
sender: mpsc03::Sender<SignedMessage<Block>>,
698691
network: GossipEngine<Block>,
699692
has_voted: HasVoted<Block>,
700693
}
701694

702-
impl<Block: BlockT> Sink for OutgoingMessages<Block>
695+
impl<B: BlockT> Unpin for OutgoingMessages<B> {}
696+
697+
impl<Block: BlockT> Sink03<Message<Block>> for OutgoingMessages<Block>
703698
{
704-
type SinkItem = Message<Block>;
705-
type SinkError = Error;
699+
type Error = Error;
700+
701+
fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
702+
Sink03::poll_ready(Pin::new(&mut self.sender), cx)
703+
.map(|elem| { elem.map_err(|e| {
704+
Error::Network(format!("Failed to poll_ready channel sender: {:?}", e))
705+
})})
706+
}
706707

707-
fn start_send(&mut self, mut msg: Message<Block>) -> StartSend<Message<Block>, Error> {
708+
fn start_send(mut self: Pin<&mut Self>, mut msg: Message<Block>) -> Result<(), Self::Error> {
708709
// if we've voted on this round previously under the same key, send that vote instead
709710
match &mut msg {
710711
finality_grandpa::Message::PrimaryPropose(ref mut vote) =>
@@ -760,17 +761,23 @@ impl<Block: BlockT> Sink for OutgoingMessages<Block>
760761
self.network.gossip_message(topic, message.encode(), false);
761762

762763
// forward the message to the inner sender.
763-
let _ = self.sender.unbounded_send(signed);
764-
}
764+
return self.sender.start_send(signed).map_err(|e| {
765+
Error::Network(format!("Failed to start_send on channel sender: {:?}", e))
766+
});
767+
};
765768

766-
Ok(AsyncSink::Ready)
769+
Ok(())
767770
}
768771

769-
fn poll_complete(&mut self) -> Poll<(), Error> { Ok(Async::Ready(())) }
772+
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
773+
Poll03::Ready(Ok(()))
774+
}
770775

771-
fn close(&mut self) -> Poll<(), Error> {
772-
// ignore errors since we allow this inner sender to be closed already.
773-
self.sender.close().or_else(|_| Ok(Async::Ready(())))
776+
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll03<Result<(), Self::Error>> {
777+
Sink03::poll_close(Pin::new(&mut self.sender), cx)
778+
.map(|elem| { elem.map_err(|e| {
779+
Error::Network(format!("Failed to poll_close channel sender: {:?}", e))
780+
})})
774781
}
775782
}
776783

client/finality-grandpa/src/environment.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,11 @@ use std::time::Duration;
2222
use log::{debug, warn, info};
2323
use parity_scale_codec::{Decode, Encode};
2424
use futures::prelude::*;
25-
use futures03::future::{FutureExt as _, TryFutureExt as _};
25+
use futures03::{
26+
compat::{Compat, CompatSink},
27+
future::{FutureExt as _, TryFutureExt as _},
28+
stream::StreamExt as _,
29+
};
2630
use futures_timer::Delay;
2731
use parking_lot::RwLock;
2832
use sp_blockchain::{HeaderBackend, Error as ClientError};
@@ -608,6 +612,9 @@ where
608612
has_voted,
609613
);
610614

615+
let incoming = Compat::new(incoming.map(|item| Ok::<_, Error>(item)));
616+
let outgoing = CompatSink::new(outgoing);
617+
611618
// schedule incoming messages from the network to be held until
612619
// corresponding blocks are imported.
613620
let incoming = Box::new(UntilVoteTargetImported::new(

client/finality-grandpa/src/tests.rs

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,17 @@ use sp_consensus::{
3737
BlockOrigin, ForkChoiceStrategy, ImportedAux, BlockImportParams, ImportResult, BlockImport,
3838
import_queue::{BoxJustificationImport, BoxFinalityProofImport},
3939
};
40-
use std::collections::{HashMap, HashSet};
41-
use std::result;
40+
use std::{
41+
collections::{HashMap, HashSet},
42+
result,
43+
pin::Pin, task,
44+
};
4245
use parity_scale_codec::Decode;
43-
use sp_runtime::traits::{Header as HeaderT, HasherFor};
46+
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, HasherFor};
4447
use sp_runtime::generic::{BlockId, DigestItem};
4548
use sp_core::{H256, NativeOrEncoded, ExecutionContext, crypto::Public};
4649
use sp_finality_grandpa::{GRANDPA_ENGINE_ID, AuthorityList, GrandpaApi};
4750
use sp_state_machine::{InMemoryBackend, prove_read, read_proof_check};
48-
use std::{pin::Pin, task};
4951

5052
use authorities::AuthoritySet;
5153
use finality_proof::{
@@ -1282,6 +1284,9 @@ fn voter_persists_its_votes() {
12821284
HasVoted::No,
12831285
);
12841286

1287+
let round_rx = futures03::compat::Compat::new(round_rx.map(|item| Ok::<_, Error>(item)));
1288+
let round_tx = futures03::compat::CompatSink::new(round_tx);
1289+
12851290
let round_tx = Arc::new(Mutex::new(round_tx));
12861291
let exit_tx = Arc::new(Mutex::new(Some(exit_tx)));
12871292

@@ -1332,7 +1337,17 @@ fn voter_persists_its_votes() {
13321337
target_hash: block_30_hash,
13331338
};
13341339

1335-
round_tx.lock().start_send(finality_grandpa::Message::Prevote(prevote)).unwrap();
1340+
// One should either be calling `Sink::send` or `Sink::start_send` followed
1341+
// by `Sink::poll_complete` to make sure items are being flushed. Given that
1342+
// we send in a loop including a delay until items are received, this can be
1343+
// ignored for the sake of reduced complexity.
1344+
if !round_tx.lock()
1345+
.start_send(finality_grandpa::Message::Prevote(prevote))
1346+
.unwrap()
1347+
.is_ready() {
1348+
panic!("expected sink to be ready to write to.");
1349+
}
1350+
13361351
Ok(())
13371352
}).map_err(|_| panic!()))
13381353

0 commit comments

Comments
 (0)