Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions network/src/collator_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,11 @@ impl CollatorPool {
let now = Instant::now();
self.collations.retain(|&(ref h, _), slot| chain_head != Some(h) && slot.stay_alive(now));
}

/// Convert the given `CollatorId` to a `PeerId`.
pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> {
self.collators.get(collator_id).map(|ids| &ids.1)
}
}

#[cfg(test)]
Expand Down
5 changes: 5 additions & 0 deletions network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,11 @@ impl PolkadotProtocol {
}
}
}

/// Convert the given `CollatorId` to a `PeerId`.
pub fn collator_id_to_peer_id(&self, collator_id: &CollatorId) -> Option<&PeerId> {
self.collators.collator_id_to_peer_id(collator_id)
}
}

impl Specialization<Block> for PolkadotProtocol {
Expand Down
38 changes: 24 additions & 14 deletions network/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ use polkadot_validation::{
SharedTable, TableRouter, SignedStatement, GenericStatement, ParachainWork, Validated
};
use polkadot_primitives::{Block, Hash};
use polkadot_primitives::parachain::{Extrinsic, CandidateReceipt, ParachainHost,
ValidatorIndex, Collation, PoVBlock,
use polkadot_primitives::parachain::{
Extrinsic, CandidateReceipt, ParachainHost, ValidatorIndex, Collation, PoVBlock,
};
use crate::gossip::{RegisteredMessageValidator, GossipMessage, GossipStatement};

Expand All @@ -51,6 +51,23 @@ pub(crate) fn attestation_topic(parent_hash: Hash) -> Hash {
BlakeTwo256::hash(&v[..])
}

/// Create a `Stream` of checked statements.
///
/// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub(crate) fn checked_statements<N: NetworkService>(network: &N, topic: Hash) ->
impl Stream<Item=SignedStatement, Error=()> {
// spin up a task in the background that processes all incoming statements
// validation has been done already by the gossip validator.
// this will block internally until the gossip messages stream is obtained.
network.gossip_messages_for(topic)
.filter_map(|msg| match msg.0 {
GossipMessage::Statement(s) => Some(s.signed_statement),
_ => None
})
}

/// Table routing implementation.
pub struct Router<P, E, N: NetworkService, T> {
table: Arc<SharedTable>,
Expand All @@ -76,21 +93,14 @@ impl<P, E, N: NetworkService, T> Router<P, E, N, T> {
}
}

/// Return a future of checked messages. These should be imported into the router
/// Return a `Stream` of checked messages. These should be imported into the router
/// with `import_statement`.
///
/// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub(crate) fn checked_statements(&self) -> impl Stream<Item=SignedStatement, Error=()> {
// spin up a task in the background that processes all incoming statements
// validation has been done already by the gossip validator.
// this will block internally until the gossip messages stream is obtained.
self.network().gossip_messages_for(self.attestation_topic)
.filter_map(|msg| match msg.0 {
GossipMessage::Statement(s) => Some(s.signed_statement),
_ => None
})
checked_statements(&**self.network(), self.attestation_topic)
}

fn parent_hash(&self) -> Hash {
Expand All @@ -107,7 +117,7 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for Router<P, E, N, T> {
Router {
table: self.table.clone(),
fetcher: self.fetcher.clone(),
attestation_topic: self.attestation_topic.clone(),
attestation_topic: self.attestation_topic,
deferred_statements: self.deferred_statements.clone(),
message_validator: self.message_validator.clone(),
}
Expand Down Expand Up @@ -177,7 +187,7 @@ impl<P: ProvideRuntimeApi + Send + Sync + 'static, E, N, T> Router<P, E, N, T> w
let table = self.table.clone();
let network = self.network().clone();
let knowledge = self.fetcher.knowledge().clone();
let attestation_topic = self.attestation_topic.clone();
let attestation_topic = self.attestation_topic;
let parent_hash = self.parent_hash();

producer.prime(self.fetcher.api().clone())
Expand Down Expand Up @@ -232,7 +242,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> TableRouter for Router<P, E, N, T> wh

impl<P, E, N: NetworkService, T> Drop for Router<P, E, N, T> {
fn drop(&mut self) {
let parent_hash = self.parent_hash().clone();
let parent_hash = self.parent_hash();
self.network().with_spec(move |spec, _| { spec.remove_validation_session(parent_hash); });
}
}
Expand Down
36 changes: 29 additions & 7 deletions network/src/validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@ use substrate_network::{PeerId, Context as NetContext};
use substrate_network::consensus_gossip::{
self, TopicNotification, MessageRecipient as GossipMessageRecipient, ConsensusMessage,
};
use polkadot_validation::{Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement};
use polkadot_validation::{
Network as ParachainNetwork, SharedTable, Collators, Statement, GenericStatement, SignedStatement,
};
use polkadot_primitives::{Block, BlockId, Hash, SessionKey};
use polkadot_primitives::parachain::{
Id as ParaId, Collation, Extrinsic, ParachainHost, CandidateReceipt, CollatorId,
Expand Down Expand Up @@ -286,6 +288,26 @@ impl<P, E, N, T> ValidationNetwork<P, E, N, T> where

rx
}

/// Convert the given `CollatorId` to a `PeerId`.
pub fn collator_id_to_peer_id(&self, collator_id: CollatorId) ->
impl Future<Item=Option<PeerId>, Error=()> + Send
{
let (send, recv) = oneshot::channel();
self.network.with_spec(move |spec, _| {
let _ = send.send(spec.collator_id_to_peer_id(&collator_id).cloned());
});
recv.map_err(|_| ())
}

/// Create a `Stream` of checked statements for the given `relay_parent`.
///
/// The returned stream will not terminate, so it is required to make sure that the stream is
/// dropped when it is not required anymore. Otherwise, it will stick around in memory
/// infinitely.
pub fn checked_statements(&self, relay_parent: Hash) -> impl Stream<Item=SignedStatement, Error=()> {
crate::router::checked_statements(&*self.network, crate::router::attestation_topic(relay_parent))
}
}

/// A long-lived network which can create parachain statement routing processes on demand.
Expand All @@ -305,7 +327,7 @@ impl<P, E, N, T> ParachainNetwork for ValidationNetwork<P, E, N, T> where
table: Arc<SharedTable>,
authorities: &[ValidatorId],
) -> Self::BuildTableRouter {
let parent_hash = table.consensus_parent_hash().clone();
let parent_hash = *table.consensus_parent_hash();
let local_session_key = table.session_key();

let build_fetcher = self.instantiate_session(SessionParams {
Expand Down Expand Up @@ -343,7 +365,7 @@ pub struct NetworkDown;

/// A future that resolves when a collation is received.
pub struct AwaitingCollation {
outer: ::futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver<Collation>>,
outer: futures::sync::oneshot::Receiver<::futures::sync::oneshot::Receiver<Collation>>,
inner: Option<::futures::sync::oneshot::Receiver<Collation>>
}

Expand Down Expand Up @@ -576,7 +598,7 @@ impl LiveValidationSessions {
&mut self,
params: SessionParams,
) -> (ValidationSession, Option<ValidatorId>) {
let parent_hash = params.parent_hash.clone();
let parent_hash = params.parent_hash;

let key = params.local_session_key.clone();
let recent = &mut self.recent;
Expand Down Expand Up @@ -703,7 +725,7 @@ pub struct SessionDataFetcher<P, E, N: NetworkService, T> {
impl<P, E, N: NetworkService, T> SessionDataFetcher<P, E, N, T> {
/// Get the parent hash.
pub(crate) fn parent_hash(&self) -> Hash {
self.parent_hash.clone()
self.parent_hash
}

/// Get the shared knowledge.
Expand Down Expand Up @@ -738,7 +760,7 @@ impl<P, E: Clone, N: NetworkService, T: Clone> Clone for SessionDataFetcher<P, E
network: self.network.clone(),
api: self.api.clone(),
task_executor: self.task_executor.clone(),
parent_hash: self.parent_hash.clone(),
parent_hash: self.parent_hash,
knowledge: self.knowledge.clone(),
exit: self.exit.clone(),
message_validator: self.message_validator.clone(),
Expand All @@ -754,7 +776,7 @@ impl<P: ProvideRuntimeApi + Send, E, N, T> SessionDataFetcher<P, E, N, T> where
{
/// Fetch PoV block for the given candidate receipt.
pub fn fetch_pov_block(&self, candidate: &CandidateReceipt) -> PoVReceiver {
let parachain = candidate.parachain_index.clone();
let parachain = candidate.parachain_index;
let parent_hash = self.parent_hash;

let canon_roots = self.api.runtime_api().ingress(&BlockId::hash(parent_hash), parachain)
Expand Down