From e457a2973ef841cdeec2fff4f8fcdf206b13d0bb Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Mon, 9 Jul 2018 17:33:54 +0100 Subject: [PATCH 01/11] skeleton of collators object --- polkadot/network/src/collators.rs | 157 ++++++++++++++++++++++++ polkadot/network/src/lib.rs | 30 ++--- substrate/network/src/protocol.rs | 5 + substrate/network/src/specialization.rs | 3 + 4 files changed, 180 insertions(+), 15 deletions(-) create mode 100644 polkadot/network/src/collators.rs diff --git a/polkadot/network/src/collators.rs b/polkadot/network/src/collators.rs new file mode 100644 index 0000000000000..0ce0f3c5270fd --- /dev/null +++ b/polkadot/network/src/collators.rs @@ -0,0 +1,157 @@ +// Copyright 2018 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Bridge between the network and consensus service for getting collations to it. + +use polkadot_primitives::{AccountId, Hash}; +use polkadot_primitives::parachain::{Id as ParaId, Collation}; +use substrate_network::{PeerId, Context}; + +use futures::prelude::*; +use futures::sync::oneshot; + +use std::collections::hash_map::{HashMap, Entry}; +use std::sync::Arc; +use parking_lot::Mutex; + +/// The role of the collator. Whether they're the primary or backup for this parachain. +pub enum Role { + /// Primary collators should send collations whenever it's time. + Primary, + /// Backup collators should not. + Backup, +} + +/// A maintenance action for the collator set. +pub enum Action { + /// Disconnect the given collator. + Disconnect(AccountId), + /// Give the collator a new role. + NewRole(AccountId, Role), +} + +/// Manages connected collators and role assignments from the perspective of a validator. +#[derive(Clone)] +pub struct Collators { + inner: Arc>, +} + +impl Collators { + /// Create a new `Collators` object. + pub fn new() -> Self { + Collators { + inner: Arc::new(Mutex::new(Inner { + collators: HashMap::new(), + bad_collators: Vec::new(), + parachain_collators: HashMap::new(), + })) + } + } + + /// Call when a new collator is authenticated. Returns the role. + pub fn on_new_collator(&self, account_id: AccountId, para_id: ParaId) -> Role { + let mut inner = self.inner.lock(); + + inner.collators.insert(account_id.clone(), para_id); + match inner.parachain_collators.entry(para_id) { + Entry::Vacant(mut vacant) => { + vacant.insert(ParachainCollators { + primary: account_id, + backup: Vec::new(), + collations: HashMap::new(), + }); + + Role::Primary + }, + Entry::Occupied(mut occupied) => { + occupied.get_mut().backup.push(account_id); + + Role::Backup + } + } + } + + /// Called when a collator disconnects. If it was the primary, returns a new primary for that + /// parachain. + pub fn on_disconnect(&self, account_id: AccountId) -> Option<(AccountId, ParaId)> { + self.inner.lock().on_disconnect(account_id) + } + + /// Call periodically to perform collator set maintenance. + /// Returns a set of actions. + pub fn maintain_peers(&self) -> Vec { + // get rid of all bad peers. + let mut inner = self.inner.lock(); + let mut actions = Vec::new(); + let bad = ::std::mem::replace(&mut inner.bad_collators, Vec::new()); + for account in bad { + actions.push(Action::Disconnect(account)); + if let Some((new_primary, _)) = inner.on_disconnect(account) { + actions.push(Action::NewRole(new_primary, Role::Primary)); + } + } + + // TODO: put underperforming collators on the back-burner. + + actions + } +} + +struct Inner { + collators: HashMap, + bad_collators: Vec, + parachain_collators: HashMap, +} + +impl Inner { + fn on_disconnect(&mut self, account_id: AccountId) -> Option<(AccountId, ParaId)> { + self.collators.remove(&account_id).and_then(|para_id| match self.parachain_collators.entry(para_id) { + Entry::Vacant(_) => None, + Entry::Occupied(mut occ) => { + if occ.get().primary == account_id { + if occ.get().backup.is_empty() { + occ.remove(); + None + } else { + let mut collators = occ.get_mut(); + collators.primary = collators.backup.pop().expect("backup non-empty; qed"); + Some((collators.primary, para_id)) + } + } else { + None + } + } + }) + } +} + +enum CollationSlot { + // not queried yet + Pending(Vec), + // waiting for next to arrive. + Awaiting(oneshot::Sender), +} + +struct ParachainCollators { + primary: AccountId, + backup: Vec, + collations: HashMap, +} + +#[cfg(test)] +mod tests { + +} diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 91c53338e6571..441af041fb601 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -43,6 +43,7 @@ extern crate rhododendron; #[macro_use] extern crate log; +mod collators; mod router; pub mod consensus; @@ -57,10 +58,12 @@ use substrate_network::consensus_gossip::ConsensusGossip; use substrate_network::{message, generic_message}; use substrate_network::specialization::Specialization; use substrate_network::StatusMessage as GenericFullStatus; +use self::collators::Collators; use std::collections::{HashMap, HashSet}; use std::sync::Arc; + #[cfg(test)] mod tests; @@ -75,16 +78,16 @@ pub type NetworkService = ::substrate_network::Service; /// Status of a Polkadot node. #[derive(Debug, PartialEq, Eq, Clone)] pub struct Status { - collating_for: Option, + collating_for: Option<(AccountId, ParaId)>, } impl Slicable for Status { fn encode(&self) -> Vec { let mut v = Vec::new(); match self.collating_for { - Some(ref id) => { + Some(ref details) => { v.push(1); - id.using_encoded(|s| v.extend(s)); + details.using_encoded(|s| v.extend(s)); } None => { v.push(0); @@ -96,7 +99,7 @@ impl Slicable for Status { fn decode(input: &mut I) -> Option { let collating_for = match input.read_byte()? { 0 => None, - 1 => Some(ParaId::decode(input)?), + 1 => Some(Slicable::decode(input)?), _ => return None, }; Some(Status { collating_for }) @@ -207,8 +210,7 @@ fn send_polkadot_message(ctx: &mut Context, to: PeerId, message: Message) pub struct PolkadotProtocol { peers: HashMap, consensus_gossip: ConsensusGossip, - collators: HashMap>, - collating_for: Option, + collators: Collators, live_consensus: Option, in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>, pending: Vec, @@ -221,8 +223,7 @@ impl PolkadotProtocol { PolkadotProtocol { peers: HashMap::new(), consensus_gossip: ConsensusGossip::new(), - collators: HashMap::new(), - collating_for: None, + collators: Collators::new(), live_consensus: None, in_flight: HashMap::new(), pending: Vec::new(), @@ -398,10 +399,8 @@ impl Specialization for PolkadotProtocol { } }; - if let Some(ref para_id) = local_status.collating_for { - self.collators.entry(para_id.clone()) - .or_insert_with(Vec::new) - .push(peer_id); + if let Some((ref acc_id, ref para_id)) = local_status.collating_for { + let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone()); } let validator = status.roles.iter().any(|r| *r == message::Role::Authority); @@ -426,9 +425,10 @@ impl Specialization for PolkadotProtocol { fn on_disconnect(&mut self, ctx: &mut Context, peer_id: PeerId) { if let Some(info) = self.peers.remove(&peer_id) { - if let Some(collators) = info.status.collating_for.and_then(|id| self.collators.get_mut(&id)) { - if let Some(pos) = collators.iter().position(|x| x == &peer_id) { - collators.swap_remove(pos); + if let Some((acc_id, _)) = info.status.collating_for { + if let Some((new_primary, _)) = self.collators.on_disconnect(acc_id) { + // TODO: send new primary a role-change message. + unimplemented!() } } diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index e77ad6307e9be..3986a979f1914 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -556,6 +556,11 @@ impl> Protocol where B::Header: HeaderT: Send + Sync + 'static { /// Called periodically to maintain peers and handle timeouts. fn maintain_peers(&mut self, _ctx: &mut Context) { } + + /// Called when a block is _imported_ at the head of the chain (not during major sync). + fn on_block_imported(&mut self, _ctx: &mut Context, hash: B::Hash, header: &B::Header) { } } From b04aa4e33c287568024b3db8ed46dcde56c2a314 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 10 Jul 2018 12:44:52 +0100 Subject: [PATCH 02/11] awaiting and handling collations. rename `collators` to CollationPool --- polkadot/collator/src/lib.rs | 5 +- .../src/{collators.rs => collator_pool.rs} | 168 +++++++++++------- polkadot/network/src/lib.rs | 39 +++- polkadot/primitives/src/parachain.rs | 13 +- substrate/network/src/specialization.rs | 2 +- 5 files changed, 152 insertions(+), 75 deletions(-) rename polkadot/network/src/{collators.rs => collator_pool.rs} (52%) diff --git a/polkadot/collator/src/lib.rs b/polkadot/collator/src/lib.rs index f7557f353e195..a4629c57938ec 100644 --- a/polkadot/collator/src/lib.rs +++ b/polkadot/collator/src/lib.rs @@ -157,7 +157,8 @@ pub fn collate<'a, R, P>( ingress.0.iter().flat_map(|&(id, ref msgs)| msgs.iter().cloned().map(move |msg| (id, msg))) ); - let signature = key.sign(&block_data.0[..]).into(); + let block_data_hash = block_data.hash(); + let signature = key.sign(&block_data_hash.0[..]).into(); let pubkey_bytes: [u8; 32] = key.public().into(); let receipt = parachain::CandidateReceipt { @@ -168,7 +169,7 @@ pub fn collate<'a, R, P>( balance_uploads: Vec::new(), egress_queue_roots: Vec::new(), fees: 0, - block_data_hash: block_data.hash(), + block_data_hash, }; parachain::Collation { diff --git a/polkadot/network/src/collators.rs b/polkadot/network/src/collator_pool.rs similarity index 52% rename from polkadot/network/src/collators.rs rename to polkadot/network/src/collator_pool.rs index 0ce0f3c5270fd..10c3c0cf5721a 100644 --- a/polkadot/network/src/collators.rs +++ b/polkadot/network/src/collator_pool.rs @@ -18,14 +18,10 @@ use polkadot_primitives::{AccountId, Hash}; use polkadot_primitives::parachain::{Id as ParaId, Collation}; -use substrate_network::{PeerId, Context}; -use futures::prelude::*; use futures::sync::oneshot; use std::collections::hash_map::{HashMap, Entry}; -use std::sync::Arc; -use parking_lot::Mutex; /// The role of the collator. Whether they're the primary or backup for this parachain. pub enum Role { @@ -43,35 +39,85 @@ pub enum Action { NewRole(AccountId, Role), } +enum CollationSlot { + Blank, + // not queried yet + Pending(Vec), + // waiting for next to arrive. + Awaiting(Vec>), +} + +impl CollationSlot { + fn received_collation(&mut self, collation: Collation) { + *self = match ::std::mem::replace(self, CollationSlot::Blank) { + CollationSlot::Blank => CollationSlot::Pending(vec![collation]), + CollationSlot::Pending(mut cs) => { + cs.push(collation); + CollationSlot::Pending(cs) + } + CollationSlot::Awaiting(senders) => { + for sender in senders { + let _ = sender.send(collation.clone()); + } + + CollationSlot::Blank + } + }; + } + + fn await_with(&mut self, sender: oneshot::Sender) { + *self = match ::std::mem::replace(self, CollationSlot::Blank) { + CollationSlot::Blank => CollationSlot::Awaiting(vec![sender]), + CollationSlot::Awaiting(mut senders) => { + senders.push(sender); + CollationSlot::Awaiting(senders) + } + CollationSlot::Pending(mut cs) => { + let next_collation = cs.pop().expect("empty variant is always `Blank`; qed"); + let _ = sender.send(next_collation); + + if cs.is_empty() { + CollationSlot::Blank + } else { + CollationSlot::Pending(cs) + } + } + }; + } +} + +struct ParachainCollators { + primary: AccountId, + backup: Vec, +} + /// Manages connected collators and role assignments from the perspective of a validator. -#[derive(Clone)] -pub struct Collators { - inner: Arc>, +pub struct CollatorPool { + collators: HashMap, + bad_collators: Vec, + parachain_collators: HashMap, + collations: HashMap<(Hash, ParaId), CollationSlot>, } -impl Collators { - /// Create a new `Collators` object. +impl CollatorPool { + /// Create a new `CollatorPool` object. pub fn new() -> Self { - Collators { - inner: Arc::new(Mutex::new(Inner { - collators: HashMap::new(), - bad_collators: Vec::new(), - parachain_collators: HashMap::new(), - })) + CollatorPool { + collators: HashMap::new(), + bad_collators: Vec::new(), + parachain_collators: HashMap::new(), + collations: HashMap::new(), } } /// Call when a new collator is authenticated. Returns the role. - pub fn on_new_collator(&self, account_id: AccountId, para_id: ParaId) -> Role { - let mut inner = self.inner.lock(); - - inner.collators.insert(account_id.clone(), para_id); - match inner.parachain_collators.entry(para_id) { + pub fn on_new_collator(&mut self, account_id: AccountId, para_id: ParaId) -> Role { + self.collators.insert(account_id.clone(), para_id); + match self.parachain_collators.entry(para_id) { Entry::Vacant(mut vacant) => { vacant.insert(ParachainCollators { primary: account_id, backup: Vec::new(), - collations: HashMap::new(), }); Role::Primary @@ -86,38 +132,7 @@ impl Collators { /// Called when a collator disconnects. If it was the primary, returns a new primary for that /// parachain. - pub fn on_disconnect(&self, account_id: AccountId) -> Option<(AccountId, ParaId)> { - self.inner.lock().on_disconnect(account_id) - } - - /// Call periodically to perform collator set maintenance. - /// Returns a set of actions. - pub fn maintain_peers(&self) -> Vec { - // get rid of all bad peers. - let mut inner = self.inner.lock(); - let mut actions = Vec::new(); - let bad = ::std::mem::replace(&mut inner.bad_collators, Vec::new()); - for account in bad { - actions.push(Action::Disconnect(account)); - if let Some((new_primary, _)) = inner.on_disconnect(account) { - actions.push(Action::NewRole(new_primary, Role::Primary)); - } - } - - // TODO: put underperforming collators on the back-burner. - - actions - } -} - -struct Inner { - collators: HashMap, - bad_collators: Vec, - parachain_collators: HashMap, -} - -impl Inner { - fn on_disconnect(&mut self, account_id: AccountId) -> Option<(AccountId, ParaId)> { + pub fn on_disconnect(&mut self, account_id: AccountId) -> Option<(AccountId, ParaId)> { self.collators.remove(&account_id).and_then(|para_id| match self.parachain_collators.entry(para_id) { Entry::Vacant(_) => None, Entry::Occupied(mut occ) => { @@ -136,19 +151,44 @@ impl Inner { } }) } -} -enum CollationSlot { - // not queried yet - Pending(Vec), - // waiting for next to arrive. - Awaiting(oneshot::Sender), -} + /// Called when a collation is received. + /// The collator should be registered for the parachain of the collation as a precondition of this function. + /// The collation should have been checked for integrity of signature before passing to this function. + pub fn on_collation(&mut self, account_id: AccountId, relay_parent: Hash, collation: Collation) { + if let Some(para_id) = self.collators.get(&account_id) { + debug_assert_eq!(para_id, &collation.receipt.parachain_index); -struct ParachainCollators { - primary: AccountId, - backup: Vec, - collations: HashMap, + self.collations.entry((relay_parent, para_id.clone())) + .or_insert_with(|| CollationSlot::Blank) + .received_collation(collation); + } + } + + /// Wait for a collation from a parachain. + pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender) { + self.collations.entry((relay_parent, para_id)) + .or_insert_with(|| CollationSlot::Blank) + .await_with(sender); + } + + /// Call periodically to perform collator set maintenance. + /// Returns a set of actions to perform on the network level. + pub fn maintain_peers(&mut self) -> Vec { + // get rid of all bad peers. + let mut actions = Vec::new(); + let bad = ::std::mem::replace(&mut self.bad_collators, Vec::new()); + for account in bad { + actions.push(Action::Disconnect(account)); + if let Some((new_primary, _)) = self.on_disconnect(account) { + actions.push(Action::NewRole(new_primary, Role::Primary)); + } + } + + // TODO: put underperforming collators on the back-burner. + + actions + } } #[cfg(test)] diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 441af041fb601..6c481c540ab94 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -43,7 +43,7 @@ extern crate rhododendron; #[macro_use] extern crate log; -mod collators; +mod collator_pool; mod router; pub mod consensus; @@ -51,14 +51,14 @@ use codec::Slicable; use futures::sync::oneshot; use parking_lot::Mutex; use polkadot_consensus::{Statement, SignedStatement, GenericStatement}; -use polkadot_primitives::{Block, SessionKey, Hash}; -use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt}; +use polkadot_primitives::{AccountId, Block, SessionKey, Hash}; +use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt, Collation}; use substrate_network::{PeerId, RequestId, Context}; use substrate_network::consensus_gossip::ConsensusGossip; use substrate_network::{message, generic_message}; use substrate_network::specialization::Specialization; use substrate_network::StatusMessage as GenericFullStatus; -use self::collators::Collators; +use self::collator_pool::CollatorPool; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -199,6 +199,8 @@ pub enum Message { RequestBlockData(RequestId, Hash), /// Provide block data by candidate hash or nothing if unknown. BlockData(RequestId, Option), + /// A collation provided by a peer. Relay parent and collation. + Collation(Hash, Collation), } fn send_polkadot_message(ctx: &mut Context, to: PeerId, message: Message) { @@ -210,7 +212,7 @@ fn send_polkadot_message(ctx: &mut Context, to: PeerId, message: Message) pub struct PolkadotProtocol { peers: HashMap, consensus_gossip: ConsensusGossip, - collators: Collators, + collators: CollatorPool, live_consensus: Option, in_flight: HashMap<(RequestId, PeerId), BlockDataRequest>, pending: Vec, @@ -223,7 +225,7 @@ impl PolkadotProtocol { PolkadotProtocol { peers: HashMap::new(), consensus_gossip: ConsensusGossip::new(), - collators: Collators::new(), + collators: CollatorPool::new(), live_consensus: None, in_flight: HashMap::new(), pending: Vec::new(), @@ -364,6 +366,7 @@ impl PolkadotProtocol { send_polkadot_message(ctx, peer_id, Message::BlockData(req_id, block_data)); } Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data), + Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation), } } @@ -387,7 +390,7 @@ impl PolkadotProtocol { impl Specialization for PolkadotProtocol { fn status(&self) -> Vec { - Status { collating_for: self.collating_for.clone() }.encode() + Status { collating_for: None }.encode() } fn on_connect(&mut self, ctx: &mut Context, peer_id: PeerId, status: FullStatus) { @@ -487,3 +490,25 @@ impl Specialization for PolkadotProtocol { self.dispatch_pending_requests(ctx); } } + +impl PolkadotProtocol { + // we received a collation from a peer + fn on_collation(&mut self, ctx: &mut Context, from: PeerId, relay_parent: Hash, collation: Collation) { + let collation_para = collation.receipt.parachain_index; + let collated_acc = collation.receipt.collator; + + match self.peers.get(&from) { + None => ctx.disconnect_peer(from), + Some(peer_info) => match peer_info.status.collating_for { + None => ctx.disable_peer(from), + Some((ref acc_id, ref para_id)) + if para_id != &collation_para || acc_id != &collated_acc || collation.receipt.check_signature().is_err() => ctx.disable_peer(from), + Some((ref acc_id, _)) => self.collators.on_collation(acc_id.clone(), relay_parent, collation), + }, + } + } + + fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender) { + self.collators.await_collation(relay_parent, para_id, sender); + } +} diff --git a/polkadot/primitives/src/parachain.rs b/polkadot/primitives/src/parachain.rs index dc77669785d04..bb87f7d3261ac 100644 --- a/polkadot/primitives/src/parachain.rs +++ b/polkadot/primitives/src/parachain.rs @@ -144,7 +144,7 @@ pub struct CandidateReceipt { pub parachain_index: Id, /// The collator's relay-chain account ID pub collator: super::AccountId, - /// Signature on block data by collator. + /// Signature on blake2-256 of the block data by collator. pub signature: CandidateSignature, /// The head-data pub head_data: HeadData, @@ -195,6 +195,17 @@ impl CandidateReceipt { use runtime_primitives::traits::{BlakeTwo256, Hashing}; BlakeTwo256::hash_of(self) } + + /// Check integrity vs. provided block data. + pub fn check_signature(&self) -> Result<(), ()> { + use runtime_primitives::traits::Verify; + + if self.signature.verify(&self.signature.0[..], &self.collator) { + Ok(()) + } else { + Err(()) + } + } } impl PartialOrd for CandidateReceipt { diff --git a/substrate/network/src/specialization.rs b/substrate/network/src/specialization.rs index 57f027794b3d5..999c545291d88 100644 --- a/substrate/network/src/specialization.rs +++ b/substrate/network/src/specialization.rs @@ -44,5 +44,5 @@ pub trait Specialization: Send + Sync + 'static { fn maintain_peers(&mut self, _ctx: &mut Context) { } /// Called when a block is _imported_ at the head of the chain (not during major sync). - fn on_block_imported(&mut self, _ctx: &mut Context, hash: B::Hash, header: &B::Header) { } + fn on_block_imported(&mut self, _ctx: &mut Context, _hash: B::Hash, _header: &B::Header) { } } From e9a3ace74a4ef0ea2eaa9bd7815009b98cafd9cd Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 10 Jul 2018 15:38:33 +0100 Subject: [PATCH 03/11] add some tests --- polkadot/network/src/collator_pool.rs | 59 ++++++++++++++++++++++++++- 1 file changed, 58 insertions(+), 1 deletion(-) diff --git a/polkadot/network/src/collator_pool.rs b/polkadot/network/src/collator_pool.rs index 10c3c0cf5721a..4bdebb8543bee 100644 --- a/polkadot/network/src/collator_pool.rs +++ b/polkadot/network/src/collator_pool.rs @@ -24,6 +24,7 @@ use futures::sync::oneshot; use std::collections::hash_map::{HashMap, Entry}; /// The role of the collator. Whether they're the primary or backup for this parachain. +#[derive(PartialEq, Debug)] pub enum Role { /// Primary collators should send collations whenever it's time. Primary, @@ -32,6 +33,7 @@ pub enum Role { } /// A maintenance action for the collator set. +#[derive(PartialEq, Debug)] pub enum Action { /// Disconnect the given collator. Disconnect(AccountId), @@ -114,7 +116,7 @@ impl CollatorPool { pub fn on_new_collator(&mut self, account_id: AccountId, para_id: ParaId) -> Role { self.collators.insert(account_id.clone(), para_id); match self.parachain_collators.entry(para_id) { - Entry::Vacant(mut vacant) => { + Entry::Vacant(vacant) => { vacant.insert(ParachainCollators { primary: account_id, backup: Vec::new(), @@ -159,6 +161,8 @@ impl CollatorPool { if let Some(para_id) = self.collators.get(&account_id) { debug_assert_eq!(para_id, &collation.receipt.parachain_index); + // TODO: punish if not primary? + self.collations.entry((relay_parent, para_id.clone())) .or_insert_with(|| CollationSlot::Blank) .received_collation(collation); @@ -189,9 +193,62 @@ impl CollatorPool { actions } + + /// Note a bad collator. + pub fn note_bad(&mut self, collator: AccountId) { + self.bad_collators.push(collator); + } } #[cfg(test)] mod tests { + use super::*; + use polkadot_primitives::parachain::{CandidateReceipt, BlockData, HeadData}; + use substrate_primitives::H512; + use futures::Future; + + #[test] + fn note_bad_primary_gives_new_primary() { + let mut pool = CollatorPool::new(); + let para_id: ParaId = 5.into(); + let bad_primary = [0; 32].into(); + let good_backup = [1; 32].into(); + + assert_eq!(pool.on_new_collator(bad_primary, para_id.clone()), Role::Primary); + assert_eq!(pool.on_new_collator(good_backup, para_id.clone()), Role::Backup); + + pool.note_bad(bad_primary); + + assert_eq!(pool.maintain_peers(), vec![ + Action::Disconnect(bad_primary), + Action::NewRole(good_backup, Role::Primary), + ]); + } + #[test] + fn await_before_collation() { + let mut pool = CollatorPool::new(); + let para_id: ParaId = 5.into(); + let primary = [0; 32].into(); + let relay_parent = [1; 32].into(); + + assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary); + let (tx, rx) = oneshot::channel(); + pool.await_collation(relay_parent, para_id, tx); + pool.on_collation(primary, relay_parent, Collation { + receipt: CandidateReceipt { + parachain_index: para_id, + collator: primary.into(), + signature: H512::from([2; 64]).into(), + head_data: HeadData(vec![1, 2, 3]), + balance_uploads: vec![], + egress_queue_roots: vec![], + fees: 0, + block_data_hash: [3; 32].into(), + }, + block_data: BlockData(vec![4, 5, 6]), + }); + + rx.wait().unwrap(); + } } From c152a0c7fe4314ec986fafd7ba2403f9d1775fa7 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 10 Jul 2018 15:50:40 +0100 Subject: [PATCH 04/11] add tests --- polkadot/network/src/collator_pool.rs | 35 ++++++--------------------- 1 file changed, 7 insertions(+), 28 deletions(-) diff --git a/polkadot/network/src/collator_pool.rs b/polkadot/network/src/collator_pool.rs index 4bdebb8543bee..9d0790f430dbd 100644 --- a/polkadot/network/src/collator_pool.rs +++ b/polkadot/network/src/collator_pool.rs @@ -134,7 +134,7 @@ impl CollatorPool { /// Called when a collator disconnects. If it was the primary, returns a new primary for that /// parachain. - pub fn on_disconnect(&mut self, account_id: AccountId) -> Option<(AccountId, ParaId)> { + pub fn on_disconnect(&mut self, account_id: AccountId) -> Option { self.collators.remove(&account_id).and_then(|para_id| match self.parachain_collators.entry(para_id) { Entry::Vacant(_) => None, Entry::Occupied(mut occ) => { @@ -145,7 +145,7 @@ impl CollatorPool { } else { let mut collators = occ.get_mut(); collators.primary = collators.backup.pop().expect("backup non-empty; qed"); - Some((collators.primary, para_id)) + Some(collators.primary) } } else { None @@ -179,24 +179,8 @@ impl CollatorPool { /// Call periodically to perform collator set maintenance. /// Returns a set of actions to perform on the network level. pub fn maintain_peers(&mut self) -> Vec { - // get rid of all bad peers. - let mut actions = Vec::new(); - let bad = ::std::mem::replace(&mut self.bad_collators, Vec::new()); - for account in bad { - actions.push(Action::Disconnect(account)); - if let Some((new_primary, _)) = self.on_disconnect(account) { - actions.push(Action::NewRole(new_primary, Role::Primary)); - } - } - - // TODO: put underperforming collators on the back-burner. - - actions - } - - /// Note a bad collator. - pub fn note_bad(&mut self, collator: AccountId) { - self.bad_collators.push(collator); + // TODO: rearrange periodically to new primary, evaluate based on latency etc. + Vec::new() } } @@ -208,7 +192,7 @@ mod tests { use futures::Future; #[test] - fn note_bad_primary_gives_new_primary() { + fn disconnect_primary_gives_new_primary() { let mut pool = CollatorPool::new(); let para_id: ParaId = 5.into(); let bad_primary = [0; 32].into(); @@ -216,13 +200,8 @@ mod tests { assert_eq!(pool.on_new_collator(bad_primary, para_id.clone()), Role::Primary); assert_eq!(pool.on_new_collator(good_backup, para_id.clone()), Role::Backup); - - pool.note_bad(bad_primary); - - assert_eq!(pool.maintain_peers(), vec![ - Action::Disconnect(bad_primary), - Action::NewRole(good_backup, Role::Primary), - ]); + assert_eq!(pool.on_disconnect(bad_primary), Some(good_backup)); + assert_eq!(pool.on_disconnect(good_backup), None); } #[test] From 352565cbd82a9253b6fcf5b4f2b7860dd13e36e3 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 10 Jul 2018 16:03:54 +0100 Subject: [PATCH 05/11] implement Collators trait for ConsensusNetwork --- polkadot/network/src/consensus.rs | 37 ++++++++++++++++++++++++++----- polkadot/network/src/lib.rs | 20 ++++++++++++++--- 2 files changed, 48 insertions(+), 9 deletions(-) diff --git a/polkadot/network/src/consensus.rs b/polkadot/network/src/consensus.rs index 0eb14d9381aaa..7cd79abfdab11 100644 --- a/polkadot/network/src/consensus.rs +++ b/polkadot/network/src/consensus.rs @@ -27,7 +27,7 @@ use polkadot_consensus::{Network, SharedTable, Collators}; use polkadot_primitives::{AccountId, Block, Hash, SessionKey}; use polkadot_primitives::parachain::{Id as ParaId, Collation}; -use futures::{future, prelude::*}; +use futures::prelude::*; use futures::sync::mpsc; use std::sync::Arc; @@ -304,13 +304,38 @@ impl Network for ConsensusNetwork

>); + +impl Future for AwaitingCollation { + type Item = Collation; + type Error = NetworkDown; + + fn poll(&mut self) -> Poll { + match self.0.poll().map_err(|_| NetworkDown)? { + Async::Ready(None) => Err(NetworkDown), + Async::Ready(Some(x)) => Ok(Async::Ready(x)), + Async::NotReady => Ok(Async::NotReady), + } + } +} + + impl Collators for ConsensusNetwork

{ - type Error = (); - type Collation = future::Empty; + type Error = NetworkDown; + type Collation = AwaitingCollation; - fn collate(&self, _parachain: ParaId, _relay_parent: Hash) -> Self::Collation { - future::empty() + fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation { + AwaitingCollation( + self.network.with_spec(|spec, _| spec.await_collation(relay_parent, parachain)) + ) } - fn note_bad_collator(&self, _collator: AccountId) { } + fn note_bad_collator(&self, collator: AccountId) { + self.network.with_spec(|spec, ctx| spec.disconnect_collator(ctx, collator)); + } } diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 6c481c540ab94..acd014deddda6 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -429,7 +429,7 @@ impl Specialization for PolkadotProtocol { fn on_disconnect(&mut self, ctx: &mut Context, peer_id: PeerId) { if let Some(info) = self.peers.remove(&peer_id) { if let Some((acc_id, _)) = info.status.collating_for { - if let Some((new_primary, _)) = self.collators.on_disconnect(acc_id) { + if let Some(new_primary) = self.collators.on_disconnect(acc_id) { // TODO: send new primary a role-change message. unimplemented!() } @@ -508,7 +508,21 @@ impl PolkadotProtocol { } } - fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender) { - self.collators.await_collation(relay_parent, para_id, sender); + fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId) -> oneshot::Receiver { + let (tx, rx) = oneshot::channel(); + self.collators.await_collation(relay_parent, para_id, tx); + rx + } + + // disconnect a collator by account-id. + fn disconnect_collator(&mut self, ctx: &mut Context, account_id: AccountId) { + let bad_peers = self.peers + .iter() + .filter(|&(_, info)| info.status.collating_for.as_ref().map_or(false, |&(ref acc_id, _)| acc_id == &account_id)) + .map(|(peer_id, _)| *peer_id); + + for peer in bad_peers { + ctx.disable_peer(peer); + } } } From 11875ec74e5dc371ef8fc5a78439d8bd311a2dd9 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Tue, 10 Jul 2018 16:49:04 +0100 Subject: [PATCH 06/11] plug collators into main polkadot-network --- polkadot/network/src/collator_pool.rs | 5 +-- polkadot/network/src/consensus.rs | 2 +- polkadot/network/src/lib.rs | 63 ++++++++++++++++++++++----- 3 files changed, 54 insertions(+), 16 deletions(-) diff --git a/polkadot/network/src/collator_pool.rs b/polkadot/network/src/collator_pool.rs index 9d0790f430dbd..12568941f1308 100644 --- a/polkadot/network/src/collator_pool.rs +++ b/polkadot/network/src/collator_pool.rs @@ -24,7 +24,7 @@ use futures::sync::oneshot; use std::collections::hash_map::{HashMap, Entry}; /// The role of the collator. Whether they're the primary or backup for this parachain. -#[derive(PartialEq, Debug)] +#[derive(PartialEq, Debug, Serialize, Deserialize)] pub enum Role { /// Primary collators should send collations whenever it's time. Primary, @@ -34,6 +34,7 @@ pub enum Role { /// A maintenance action for the collator set. #[derive(PartialEq, Debug)] +#[allow(dead_code)] pub enum Action { /// Disconnect the given collator. Disconnect(AccountId), @@ -96,7 +97,6 @@ struct ParachainCollators { /// Manages connected collators and role assignments from the perspective of a validator. pub struct CollatorPool { collators: HashMap, - bad_collators: Vec, parachain_collators: HashMap, collations: HashMap<(Hash, ParaId), CollationSlot>, } @@ -106,7 +106,6 @@ impl CollatorPool { pub fn new() -> Self { CollatorPool { collators: HashMap::new(), - bad_collators: Vec::new(), parachain_collators: HashMap::new(), collations: HashMap::new(), } diff --git a/polkadot/network/src/consensus.rs b/polkadot/network/src/consensus.rs index 7cd79abfdab11..3fe22acd5aacf 100644 --- a/polkadot/network/src/consensus.rs +++ b/polkadot/network/src/consensus.rs @@ -336,6 +336,6 @@ impl Collators for ConsensusNetwork } fn note_bad_collator(&self, collator: AccountId) { - self.network.with_spec(|spec, ctx| spec.disconnect_collator(ctx, collator)); + self.network.with_spec(|spec, ctx| spec.disconnect_bad_collator(ctx, collator)); } } diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index acd014deddda6..e5341885b134a 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -58,7 +58,7 @@ use substrate_network::consensus_gossip::ConsensusGossip; use substrate_network::{message, generic_message}; use substrate_network::specialization::Specialization; use substrate_network::StatusMessage as GenericFullStatus; -use self::collator_pool::CollatorPool; +use self::collator_pool::{CollatorPool, Role, Action}; use std::collections::{HashMap, HashSet}; use std::sync::Arc; @@ -199,6 +199,8 @@ pub enum Message { RequestBlockData(RequestId, Hash), /// Provide block data by candidate hash or nothing if unknown. BlockData(RequestId, Option), + /// Tell a collator their role. + CollatorRole(Role), /// A collation provided by a peer. Relay parent and collation. Collation(Hash, Collation), } @@ -263,7 +265,10 @@ impl PolkadotProtocol { let parent_hash = consensus.parent_hash; let old_parent = self.live_consensus.as_ref().map(|c| c.parent_hash); - for (id, info) in self.peers.iter_mut().filter(|&(_, ref info)| info.validator) { + // TODO: optimize for when session key changes and only send to collators who are relevant in next few blocks. + for (id, info) in self.peers.iter_mut() + .filter(|&(_, ref info)| info.validator || info.status.collating_for.is_some()) + { send_polkadot_message( ctx, *id, @@ -367,6 +372,7 @@ impl PolkadotProtocol { } Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data), Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation), + Message::CollatorRole(_) => unimplemented!(), } } @@ -398,12 +404,22 @@ impl Specialization for PolkadotProtocol { Some(status) => status, None => { ctx.disable_peer(peer_id); - return; + return } }; if let Some((ref acc_id, ref para_id)) = local_status.collating_for { + if self.collator_peer_id(acc_id.clone()).is_some() { + ctx.disable_peer(peer_id); + return + } + let collator_role = self.collators.on_new_collator(acc_id.clone(), para_id.clone()); + send_polkadot_message( + ctx, + peer_id, + Message::CollatorRole(collator_role), + ); } let validator = status.roles.iter().any(|r| *r == message::Role::Authority); @@ -429,9 +445,15 @@ impl Specialization for PolkadotProtocol { fn on_disconnect(&mut self, ctx: &mut Context, peer_id: PeerId) { if let Some(info) = self.peers.remove(&peer_id) { if let Some((acc_id, _)) = info.status.collating_for { - if let Some(new_primary) = self.collators.on_disconnect(acc_id) { - // TODO: send new primary a role-change message. - unimplemented!() + let new_primary = self.collators.on_disconnect(acc_id) + .and_then(|new_primary| self.collator_peer_id(new_primary)); + + if let Some(new_primary) = new_primary { + send_polkadot_message( + ctx, + new_primary, + Message::CollatorRole(Role::Primary), + ) } } @@ -488,6 +510,19 @@ impl Specialization for PolkadotProtocol { fn maintain_peers(&mut self, ctx: &mut Context) { self.consensus_gossip.collect_garbage(None); self.dispatch_pending_requests(ctx); + + for collator_action in self.collators.maintain_peers() { + match collator_action { + Action::Disconnect(collator) => self.disconnect_bad_collator(ctx, collator), + Action::NewRole(account_id, role) => if let Some(collator) = self.collator_peer_id(account_id) { + send_polkadot_message( + ctx, + collator, + Message::CollatorRole(role), + ) + }, + } + } } } @@ -514,15 +549,19 @@ impl PolkadotProtocol { rx } - // disconnect a collator by account-id. - fn disconnect_collator(&mut self, ctx: &mut Context, account_id: AccountId) { - let bad_peers = self.peers + // get connected peer with given account ID for collation. + fn collator_peer_id(&self, account_id: AccountId) -> Option { + self.peers .iter() .filter(|&(_, info)| info.status.collating_for.as_ref().map_or(false, |&(ref acc_id, _)| acc_id == &account_id)) - .map(|(peer_id, _)| *peer_id); + .map(|(peer_id, _)| *peer_id) + .next() + } - for peer in bad_peers { - ctx.disable_peer(peer); + // disconnect a collator by account-id. + fn disconnect_bad_collator(&self, ctx: &mut Context, account_id: AccountId) { + if let Some(peer_id) = self.collator_peer_id(account_id) { + ctx.disable_peer(peer_id) } } } From d2366634b1313c4346510bc61f20c048cab5de50 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 11 Jul 2018 10:42:23 +0100 Subject: [PATCH 07/11] ignore collator role message --- polkadot/network/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index b0d31499be528..3bf77a52123be 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -372,7 +372,7 @@ impl PolkadotProtocol { } Message::BlockData(req_id, data) => self.on_block_data(ctx, peer_id, req_id, data), Message::Collation(relay_parent, collation) => self.on_collation(ctx, peer_id, relay_parent, collation), - Message::CollatorRole(_) => unimplemented!(), + Message::CollatorRole(_) => {}, } } From bd70053cbd5e21eddbb2cf32fa447e2e57e1630c Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 11 Jul 2018 10:57:18 +0100 Subject: [PATCH 08/11] add a couple more tests --- polkadot/network/src/collator_pool.rs | 35 +++++++++++++++++++++++++-- polkadot/network/src/lib.rs | 5 ++-- polkadot/network/src/tests.rs | 28 ++++++++++++++++++--- 3 files changed, 61 insertions(+), 7 deletions(-) diff --git a/polkadot/network/src/collator_pool.rs b/polkadot/network/src/collator_pool.rs index 12568941f1308..2d05e02c8ce5a 100644 --- a/polkadot/network/src/collator_pool.rs +++ b/polkadot/network/src/collator_pool.rs @@ -211,8 +211,37 @@ mod tests { let relay_parent = [1; 32].into(); assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary); - let (tx, rx) = oneshot::channel(); - pool.await_collation(relay_parent, para_id, tx); + let (tx1, rx1) = oneshot::channel(); + let (tx2, rx2) = oneshot::channel(); + pool.await_collation(relay_parent, para_id, tx1); + pool.await_collation(relay_parent, para_id, tx2); + pool.on_collation(primary, relay_parent, Collation { + receipt: CandidateReceipt { + parachain_index: para_id, + collator: primary.into(), + signature: H512::from([2; 64]).into(), + head_data: HeadData(vec![1, 2, 3]), + balance_uploads: vec![], + egress_queue_roots: vec![], + fees: 0, + block_data_hash: [3; 32].into(), + }, + block_data: BlockData(vec![4, 5, 6]), + }); + + rx1.wait().unwrap(); + rx2.wait().unwrap(); + } + + #[test] + fn collate_before_await() { + let mut pool = CollatorPool::new(); + let para_id: ParaId = 5.into(); + let primary = [0; 32].into(); + let relay_parent = [1; 32].into(); + + assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary); + pool.on_collation(primary, relay_parent, Collation { receipt: CandidateReceipt { parachain_index: para_id, @@ -227,6 +256,8 @@ mod tests { block_data: BlockData(vec![4, 5, 6]), }); + let (tx, rx) = oneshot::channel(); + pool.await_collation(relay_parent, para_id, tx); rx.wait().unwrap(); } } diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 3bf77a52123be..d149a1ed320ee 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -422,6 +422,8 @@ impl Specialization for PolkadotProtocol { } let validator = status.roles.iter().any(|r| *r == message::Role::Authority); + let send_key = validator || local_status.collating_for.is_some(); + self.peers.insert(peer_id, PeerInfo { status: local_status, session_keys: Default::default(), @@ -429,8 +431,7 @@ impl Specialization for PolkadotProtocol { }); self.consensus_gossip.new_peer(ctx, peer_id, &status.roles); - - if let (true, &Some(ref consensus)) = (validator, &self.live_consensus) { + if let (true, &Some(ref consensus)) = (send_key, &self.live_consensus) { send_polkadot_message( ctx, peer_id, diff --git a/polkadot/network/src/tests.rs b/polkadot/network/src/tests.rs index 19db9890ccb97..5e3eca4651646 100644 --- a/polkadot/network/src/tests.rs +++ b/polkadot/network/src/tests.rs @@ -110,11 +110,12 @@ fn sends_session_key() { let parent_hash = [0; 32].into(); let local_key = [1; 32].into(); - let status = Status { collating_for: None }; + let validator_status = Status { collating_for: None }; + let collator_status = Status { collating_for: Some(([2; 32].into(), 5.into())) }; { let mut ctx = TestContext::default(); - protocol.on_connect(&mut ctx, peer_a, make_status(&status, vec![Role::Authority])); + protocol.on_connect(&mut ctx, peer_a, make_status(&validator_status, vec![Role::Authority])); assert!(ctx.messages.is_empty()); } @@ -128,7 +129,7 @@ fn sends_session_key() { { let mut ctx = TestContext::default(); - protocol.on_connect(&mut ctx, peer_b, make_status(&status, vec![Role::Authority])); + protocol.on_connect(&mut ctx, peer_b, make_status(&collator_status, vec![])); assert!(ctx.has_message(peer_b, Message::SessionKey(parent_hash, local_key))); } } @@ -207,3 +208,24 @@ fn fetches_from_those_with_knowledge() { assert_eq!(recv.wait().unwrap(), block_data); } } + +#[test] +fn remove_bad_collator() { + let mut protocol = PolkadotProtocol::new(); + + let peer_id = 1; + let account_id = [2; 32].into(); + + let status = Status { collating_for: Some((account_id, 5.into())) }; + + { + let mut ctx = TestContext::default(); + protocol.on_connect(&mut ctx, peer_id, make_status(&status, vec![])); + } + + { + let mut ctx = TestContext::default(); + protocol.disconnect_bad_collator(&mut ctx, account_id); + assert!(ctx.disabled.contains(&peer_id)); + } +} From 2a03637388183a8a78406faa17fa407cf30f1d6b Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 11 Jul 2018 11:44:31 +0100 Subject: [PATCH 09/11] garbage collection for collations --- polkadot/consensus/src/collation.rs | 6 +++ polkadot/network/src/collator_pool.rs | 74 +++++++++++++++++++++------ polkadot/network/src/lib.rs | 7 ++- 3 files changed, 69 insertions(+), 18 deletions(-) diff --git a/polkadot/consensus/src/collation.rs b/polkadot/consensus/src/collation.rs index db490a0eb17d8..f7db48db619bb 100644 --- a/polkadot/consensus/src/collation.rs +++ b/polkadot/consensus/src/collation.rs @@ -37,6 +37,12 @@ pub trait Collators: Clone { type Collation: IntoFuture; /// Collate on a specific parachain, building on a given relay chain parent hash. + /// + /// The returned collation should be checked for basic validity in the signature + /// and will be checked for state-transition validity by the consumer of this trait. + /// + /// This does not have to guarantee local availability, as a valid collation + /// will be passed to the `TableRouter` instance. fn collate(&self, parachain: ParaId, relay_parent: Hash) -> Self::Collation; /// Note a bad collator. TODO: take proof diff --git a/polkadot/network/src/collator_pool.rs b/polkadot/network/src/collator_pool.rs index 2d05e02c8ce5a..76df8098219b3 100644 --- a/polkadot/network/src/collator_pool.rs +++ b/polkadot/network/src/collator_pool.rs @@ -22,6 +22,9 @@ use polkadot_primitives::parachain::{Id as ParaId, Collation}; use futures::sync::oneshot; use std::collections::hash_map::{HashMap, Entry}; +use std::time::{Duration, Instant}; + +const COLLATION_LIFETIME: Duration = Duration::from_secs(60 * 5); /// The role of the collator. Whether they're the primary or backup for this parachain. #[derive(PartialEq, Debug, Serialize, Deserialize)] @@ -42,7 +45,25 @@ pub enum Action { NewRole(AccountId, Role), } -enum CollationSlot { +struct CollationSlot { + live_at: Instant, + entries: SlotEntries, +} + +impl CollationSlot { + fn blank_now() -> Self { + CollationSlot { + live_at: Instant::now(), + entries: SlotEntries::Blank, + } + } + + fn stay_alive(&self, now: Instant) -> bool { + self.live_at + COLLATION_LIFETIME > now + } +} + +enum SlotEntries { Blank, // not queried yet Pending(Vec), @@ -50,39 +71,39 @@ enum CollationSlot { Awaiting(Vec>), } -impl CollationSlot { +impl SlotEntries { fn received_collation(&mut self, collation: Collation) { - *self = match ::std::mem::replace(self, CollationSlot::Blank) { - CollationSlot::Blank => CollationSlot::Pending(vec![collation]), - CollationSlot::Pending(mut cs) => { + *self = match ::std::mem::replace(self, SlotEntries::Blank) { + SlotEntries::Blank => SlotEntries::Pending(vec![collation]), + SlotEntries::Pending(mut cs) => { cs.push(collation); - CollationSlot::Pending(cs) + SlotEntries::Pending(cs) } - CollationSlot::Awaiting(senders) => { + SlotEntries::Awaiting(senders) => { for sender in senders { let _ = sender.send(collation.clone()); } - CollationSlot::Blank + SlotEntries::Blank } }; } fn await_with(&mut self, sender: oneshot::Sender) { - *self = match ::std::mem::replace(self, CollationSlot::Blank) { - CollationSlot::Blank => CollationSlot::Awaiting(vec![sender]), - CollationSlot::Awaiting(mut senders) => { + *self = match ::std::mem::replace(self, SlotEntries::Blank) { + SlotEntries::Blank => SlotEntries::Awaiting(vec![sender]), + SlotEntries::Awaiting(mut senders) => { senders.push(sender); - CollationSlot::Awaiting(senders) + SlotEntries::Awaiting(senders) } - CollationSlot::Pending(mut cs) => { + SlotEntries::Pending(mut cs) => { let next_collation = cs.pop().expect("empty variant is always `Blank`; qed"); let _ = sender.send(next_collation); if cs.is_empty() { - CollationSlot::Blank + SlotEntries::Blank } else { - CollationSlot::Pending(cs) + SlotEntries::Pending(cs) } } }; @@ -163,7 +184,8 @@ impl CollatorPool { // TODO: punish if not primary? self.collations.entry((relay_parent, para_id.clone())) - .or_insert_with(|| CollationSlot::Blank) + .or_insert_with(CollationSlot::blank_now) + .entries .received_collation(collation); } } @@ -171,7 +193,8 @@ impl CollatorPool { /// Wait for a collation from a parachain. pub fn await_collation(&mut self, relay_parent: Hash, para_id: ParaId, sender: oneshot::Sender) { self.collations.entry((relay_parent, para_id)) - .or_insert_with(|| CollationSlot::Blank) + .or_insert_with(CollationSlot::blank_now) + .entries .await_with(sender); } @@ -181,6 +204,12 @@ impl CollatorPool { // TODO: rearrange periodically to new primary, evaluate based on latency etc. Vec::new() } + + /// called when a block with given hash has been imported. + pub fn collect_garbage(&mut self, chain_head: Option<&Hash>) { + let now = Instant::now(); + self.collations.retain(|&(ref h, _), slot| chain_head != Some(h) && slot.stay_alive(now)); + } } #[cfg(test)] @@ -260,4 +289,15 @@ mod tests { pool.await_collation(relay_parent, para_id, tx); rx.wait().unwrap(); } + + #[test] + fn slot_stay_alive() { + let slot = CollationSlot::blank_now(); + let now = slot.live_at; + + assert!(slot.stay_alive(now)); + assert!(slot.stay_alive(now + Duration::from_secs(10))); + assert!(!slot.stay_alive(now + COLLATION_LIFETIME)); + assert!(!slot.stay_alive(now + COLLATION_LIFETIME + Duration::from_secs(10))); + } } diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index d149a1ed320ee..9ed1b1a579df3 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -51,7 +51,7 @@ use codec::Slicable; use futures::sync::oneshot; use parking_lot::Mutex; use polkadot_consensus::{Statement, SignedStatement, GenericStatement}; -use polkadot_primitives::{AccountId, Block, SessionKey, Hash}; +use polkadot_primitives::{AccountId, Block, SessionKey, Hash, Header}; use polkadot_primitives::parachain::{Id as ParaId, BlockData, Extrinsic, CandidateReceipt, Collation}; use substrate_network::{PeerId, RequestId, Context}; use substrate_network::consensus_gossip::ConsensusGossip; @@ -509,6 +509,7 @@ impl Specialization for PolkadotProtocol { fn maintain_peers(&mut self, ctx: &mut Context) { self.consensus_gossip.collect_garbage(None); + self.collators.collect_garbage(None); self.dispatch_pending_requests(ctx); for collator_action in self.collators.maintain_peers() { @@ -524,6 +525,10 @@ impl Specialization for PolkadotProtocol { } } } + + fn on_block_imported(&mut self, _ctx: &mut Context, hash: Hash, _header: &Header) { + self.collators.collect_garbage(Some(&hash)); + } } impl PolkadotProtocol { From 43e7f916653cc89146b0d7802a36d877460301db Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 11 Jul 2018 17:57:20 +0100 Subject: [PATCH 10/11] ensure disconnected backup collator is removed from pool --- polkadot/network/src/collator_pool.rs | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/polkadot/network/src/collator_pool.rs b/polkadot/network/src/collator_pool.rs index 76df8098219b3..12ddade1de1e8 100644 --- a/polkadot/network/src/collator_pool.rs +++ b/polkadot/network/src/collator_pool.rs @@ -168,6 +168,10 @@ impl CollatorPool { Some(collators.primary) } } else { + let pos = occ.get().backup.iter().position(|a| a == &account_id) + .expect("registered collator always present in backup if not primary; qed"); + + occ.get_mut().backup.remove(pos); None } } @@ -232,6 +236,19 @@ mod tests { assert_eq!(pool.on_disconnect(good_backup), None); } + #[test] + fn disconnect_backup_removes_from_pool() { + let mut pool = CollatorPool::new(); + let para_id: ParaId = 5.into(); + let primary = [0; 32].into(); + let backup = [1; 32].into(); + + assert_eq!(pool.on_new_collator(primary, para_id.clone()), Role::Primary); + assert_eq!(pool.on_new_collator(backup, para_id.clone()), Role::Backup); + assert_eq!(pool.on_disconnect(backup), None); + assert!(pool.parachain_collators.get(¶_id).unwrap().backup.is_empty()); + } + #[test] fn await_before_collation() { let mut pool = CollatorPool::new(); From 0f35d0b551a6c397e8e547c6488022b52fa658e9 Mon Sep 17 00:00:00 2001 From: Robert Habermeier Date: Wed, 11 Jul 2018 18:06:45 +0100 Subject: [PATCH 11/11] address other grumbles --- polkadot/network/src/lib.rs | 19 +++++++++++++++---- polkadot/primitives/src/parachain.rs | 2 +- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/polkadot/network/src/lib.rs b/polkadot/network/src/lib.rs index 9ed1b1a579df3..1a9b995e5f30e 100644 --- a/polkadot/network/src/lib.rs +++ b/polkadot/network/src/lib.rs @@ -541,9 +541,14 @@ impl PolkadotProtocol { None => ctx.disconnect_peer(from), Some(peer_info) => match peer_info.status.collating_for { None => ctx.disable_peer(from), - Some((ref acc_id, ref para_id)) - if para_id != &collation_para || acc_id != &collated_acc || collation.receipt.check_signature().is_err() => ctx.disable_peer(from), - Some((ref acc_id, _)) => self.collators.on_collation(acc_id.clone(), relay_parent, collation), + Some((ref acc_id, ref para_id)) => { + let structurally_valid = para_id == &collation_para && acc_id == &collated_acc; + if structurally_valid && collation.receipt.check_signature().is_ok() { + self.collators.on_collation(acc_id.clone(), relay_parent, collation) + } else { + ctx.disable_peer(from) + }; + } }, } } @@ -556,9 +561,15 @@ impl PolkadotProtocol { // get connected peer with given account ID for collation. fn collator_peer_id(&self, account_id: AccountId) -> Option { + let check_info = |info: &PeerInfo| info + .status + .collating_for + .as_ref() + .map_or(false, |&(ref acc_id, _)| acc_id == &account_id); + self.peers .iter() - .filter(|&(_, info)| info.status.collating_for.as_ref().map_or(false, |&(ref acc_id, _)| acc_id == &account_id)) + .filter(|&(_, info)| check_info(info)) .map(|(peer_id, _)| *peer_id) .next() } diff --git a/polkadot/primitives/src/parachain.rs b/polkadot/primitives/src/parachain.rs index d95a4e2134f22..bbf9591ee0b52 100644 --- a/polkadot/primitives/src/parachain.rs +++ b/polkadot/primitives/src/parachain.rs @@ -200,7 +200,7 @@ impl CandidateReceipt { pub fn check_signature(&self) -> Result<(), ()> { use runtime_primitives::traits::Verify; - if self.signature.verify(&self.signature.0[..], &self.collator) { + if self.signature.verify(&self.block_data_hash.0[..], &self.collator) { Ok(()) } else { Err(())