Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 71 additions & 20 deletions client/network/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
use crate::{
config::{ProtocolId, Role}, block_requests, light_client_handler, finality_requests,
peer_info, discovery::{DiscoveryBehaviour, DiscoveryConfig, DiscoveryOut},
protocol::{message::{self, Roles}, CustomMessageOutcome, Protocol},
Event, ObservedRole, DhtEvent, ExHashT,
protocol::{message::{self, Roles}, CustomMessageOutcome, NotificationsSink, Protocol},
ObservedRole, DhtEvent, ExHashT,
};

use bytes::Bytes;
use codec::Encode as _;
use libp2p::NetworkBehaviour;
use libp2p::core::{Multiaddr, PeerId, PublicKey};
Expand Down Expand Up @@ -98,11 +99,53 @@ pub enum BehaviourOut<B: BlockT> {
request_duration: Duration,
},

/// Any event represented by the [`Event`] enum.
/// Opened a substream with the given node with the given notifications protocol.
///
/// > **Note**: The [`Event`] enum contains the events that are available through the public
/// > API of the library.
Event(Event),
/// The protocol is always one of the notification protocols that have been registered.
NotificationStreamOpened {
/// Node we opened the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
engine_id: ConsensusEngineId,
/// Object that permits sending notifications to the peer.
notifications_sink: NotificationsSink,
/// Role of the remote.
role: ObservedRole,
},

/// The [`NotificationsSink`] object used to send notifications with the given peer must be
/// replaced with a new one.
///
/// This event is typically emitted when a transport-level connection is closed and we fall
/// back to a secondary connection.
NotificationStreamReplaced {
/// Id of the peer we are connected to.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
engine_id: ConsensusEngineId,
/// Replacement for the previous [`NotificationsSink`].
notifications_sink: NotificationsSink,
},

/// Closed a substream with the given node. Always matches a corresponding previous
/// `NotificationStreamOpened` message.
NotificationStreamClosed {
/// Node we closed the substream with.
remote: PeerId,
/// The concerned protocol. Each protocol uses a different substream.
engine_id: ConsensusEngineId,
},

/// Received one or more messages from the given node using the given protocol.
NotificationsReceived {
/// Node we received the message from.
remote: PeerId,
/// Concerned protocol and associated message.
messages: Vec<(ConsensusEngineId, Bytes)>,
},

/// Event generated by a DHT.
Dht(DhtEvent),
}

impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
Expand Down Expand Up @@ -184,12 +227,12 @@ impl<B: BlockT, H: ExHashT> Behaviour<B, H> {
let list = self.substrate.register_notifications_protocol(engine_id, protocol_name, handshake_message);
for (remote, roles) in list {
let role = reported_roles_to_observed_role(&self.role, remote, roles);
let ev = Event::NotificationStreamOpened {
self.events.push_back(BehaviourOut::NotificationStreamOpened {
remote: remote.clone(),
engine_id,
role,
};
self.events.push_back(BehaviourOut::Event(ev));
notifications_sink: todo!(), // TODO: uuuuggghhh
});
}
}

Expand Down Expand Up @@ -278,26 +321,34 @@ Behaviour<B, H> {
CustomMessageOutcome::FinalityProofRequest { target, block_hash, request } => {
self.finality_proof_requests.send_request(&target, block_hash, request);
},
CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles } => {
CustomMessageOutcome::NotificationStreamOpened { remote, protocols, roles, notifications_sink } => {
let role = reported_roles_to_observed_role(&self.role, &remote, roles);
for engine_id in protocols {
self.events.push_back(BehaviourOut::Event(Event::NotificationStreamOpened {
self.events.push_back(BehaviourOut::NotificationStreamOpened {
remote: remote.clone(),
engine_id,
role: role.clone(),
}));
notifications_sink: notifications_sink.clone(),
});
}
},
CustomMessageOutcome::NotificationStreamReplaced { remote, protocols, notifications_sink } =>
for engine_id in protocols {
self.events.push_back(BehaviourOut::NotificationStreamReplaced {
remote: remote.clone(),
engine_id,
notifications_sink: notifications_sink.clone(),
});
},
CustomMessageOutcome::NotificationStreamClosed { remote, protocols } =>
for engine_id in protocols {
self.events.push_back(BehaviourOut::Event(Event::NotificationStreamClosed {
self.events.push_back(BehaviourOut::NotificationStreamClosed {
remote: remote.clone(),
engine_id,
}));
});
},
CustomMessageOutcome::NotificationsReceived { remote, messages } => {
let ev = Event::NotificationsReceived { remote, messages };
self.events.push_back(BehaviourOut::Event(ev));
self.events.push_back(BehaviourOut::NotificationsReceived { remote, messages });
},
CustomMessageOutcome::PeerNewBest(peer_id, number) => {
self.light_client_handler.update_best_block(&peer_id, number);
Expand Down Expand Up @@ -393,16 +444,16 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviourEventProcess<DiscoveryOut>
self.substrate.add_discovered_nodes(iter::once(peer_id));
}
DiscoveryOut::ValueFound(results) => {
self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValueFound(results))));
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueFound(results)));
}
DiscoveryOut::ValueNotFound(key) => {
self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValueNotFound(key))));
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValueNotFound(key)));
}
DiscoveryOut::ValuePut(key) => {
self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePut(key))));
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePut(key)));
}
DiscoveryOut::ValuePutFailed(key) => {
self.events.push_back(BehaviourOut::Event(Event::Dht(DhtEvent::ValuePutFailed(key))));
self.events.push_back(BehaviourOut::Dht(DhtEvent::ValuePutFailed(key)));
}
DiscoveryOut::RandomKademliaStarted(protocols) => {
for protocol in protocols {
Expand Down
40 changes: 33 additions & 7 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ pub mod message;
pub mod event;
pub mod sync;

pub use generic_proto::LegacyConnectionKillError;
pub use generic_proto::{NotificationsSink, Ready, LegacyConnectionKillError};

const REQUEST_TIMEOUT_SEC: u64 = 40;
/// Interval at which we perform time based maintenance
Expand Down Expand Up @@ -939,7 +939,12 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}

/// Called on receipt of a status message via the legacy protocol on the first connection between two peers.
pub fn on_peer_connected(&mut self, who: PeerId, status: message::Status<B>) -> CustomMessageOutcome<B> {
pub fn on_peer_connected(
&mut self,
who: PeerId,
status: message::Status<B>,
notifications_sink: NotificationsSink,
) -> CustomMessageOutcome<B> {
trace!(target: "sync", "New peer {} {:?}", who, status);
let _protocol_version = {
if self.context_data.peers.contains_key(&who) {
Expand Down Expand Up @@ -1051,10 +1056,12 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
remote: who,
protocols: self.protocol_name_by_engine.keys().cloned().collect(),
roles: info.roles,
notifications_sink,
}
}

/// Send a notification to the given peer we're connected to.
// TODO: remove
/*/// Send a notification to the given peer we're connected to.
///
/// Doesn't do anything if we don't have a notifications substream for that protocol with that
/// peer.
Expand All @@ -1078,7 +1085,7 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
engine_id
);
}
}
}*/

/// Registers a new notifications protocol.
///
Expand Down Expand Up @@ -1829,7 +1836,18 @@ pub enum CustomMessageOutcome<B: BlockT> {
JustificationImport(Origin, B::Hash, NumberFor<B>, Justification),
FinalityProofImport(Origin, B::Hash, NumberFor<B>, Vec<u8>),
/// Notification protocols have been opened with a remote.
NotificationStreamOpened { remote: PeerId, protocols: Vec<ConsensusEngineId>, roles: Roles },
NotificationStreamOpened {
remote: PeerId,
protocols: Vec<ConsensusEngineId>,
roles: Roles,
notifications_sink: NotificationsSink
},
/// The [`NotificationsSink`] of some notification protocols need an update.
NotificationStreamReplaced {
remote: PeerId,
protocols: Vec<ConsensusEngineId>,
notifications_sink: NotificationsSink,
},
/// Notification protocols have been closed with a remote.
NotificationStreamClosed { remote: PeerId, protocols: Vec<ConsensusEngineId> },
/// Messages have been received on one or more notifications protocols.
Expand Down Expand Up @@ -1994,9 +2012,10 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
};

let outcome = match event {
GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, .. } => {
GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, notifications_sink, .. } => {
match <Message<B> as Decode>::decode(&mut &received_handshake[..]) {
Ok(GenericMessage::Status(handshake)) => self.on_peer_connected(peer_id, handshake),
Ok(GenericMessage::Status(handshake)) =>
self.on_peer_connected(peer_id, handshake, notifications_sink),
Ok(msg) => {
debug!(
target: "sync",
Expand All @@ -2020,6 +2039,13 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
}
}
}
GenericProtoOut::CustomProtocolReplaced { peer_id, notifications_sink, .. } => {
CustomMessageOutcome::NotificationStreamReplaced {
remote: peer_id,
protocols: self.protocol_name_by_engine.keys().cloned().collect(),
notifications_sink,
}
},
GenericProtoOut::CustomProtocolClosed { peer_id, .. } => {
self.on_peer_disconnected(peer_id)
},
Expand Down
2 changes: 1 addition & 1 deletion client/network/src/protocol/generic_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
//! network, then performs the Substrate protocol handling on top.

pub use self::behaviour::{GenericProto, GenericProtoOut};
pub use self::handler::LegacyConnectionKillError;
pub use self::handler::{NotificationsSink, Ready, LegacyConnectionKillError};

mod behaviour;
mod handler;
Expand Down
Loading