Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 117 additions & 118 deletions client/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use sp_consensus::{
block_validation::BlockAnnounceValidator,
import_queue::{BlockImportResult, BlockImportError, IncomingBlock, Origin}
};
use codec::{Decode, Encode};
use codec::{Decode, DecodeAll, Encode};
use sp_runtime::{generic::BlockId, ConsensusEngineId, Justification};
use sp_runtime::traits::{
Block as BlockT, Header as HeaderT, NumberFor, Zero, CheckedSub
Expand All @@ -53,7 +53,7 @@ use std::borrow::Cow;
use std::collections::{HashMap, HashSet, VecDeque, hash_map::Entry};
use std::sync::Arc;
use std::fmt::Write;
use std::{io, num::NonZeroUsize, pin::Pin, task::Poll, time};
use std::{io, iter, num::NonZeroUsize, pin::Pin, task::Poll, time};
use log::{log, Level, trace, debug, warn, error};
use wasm_timer::Instant;

Expand Down Expand Up @@ -271,8 +271,6 @@ struct Peer<B: BlockT, H: ExHashT> {
pub struct PeerInfo<B: BlockT> {
/// Roles
pub roles: Roles,
/// Protocol version
pub protocol_version: u32,
/// Peer best block hash
pub best_hash: B::Hash,
/// Peer best block number
Expand Down Expand Up @@ -391,14 +389,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
};

let (peerset, peerset_handle) = sc_peerset::Peerset::from_config(peerset_config);
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
let mut behaviour = GenericProto::new(
local_peer_id,
protocol_id.clone(),
versions,
build_status_message(&config, &chain),
peerset,
);

let mut legacy_equiv_by_name = HashMap::new();

Expand All @@ -409,7 +399,6 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
proto.push_str("/transactions/1");
proto
});
behaviour.register_notif_protocol(transactions_protocol.clone(), Vec::new());
legacy_equiv_by_name.insert(transactions_protocol.clone(), Fallback::Transactions);

let block_announces_protocol: Cow<'static, str> = Cow::from({
Expand All @@ -419,12 +408,24 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
proto.push_str("/block-announces/1");
proto
});
behaviour.register_notif_protocol(
block_announces_protocol.clone(),
BlockAnnouncesHandshake::build(&config, &chain).encode()
);
legacy_equiv_by_name.insert(block_announces_protocol.clone(), Fallback::BlockAnnounce);

let behaviour = {
let versions = &((MIN_VERSION as u8)..=(CURRENT_VERSION as u8)).collect::<Vec<u8>>();
let block_announces_handshake = BlockAnnouncesHandshake::build(&config, &chain).encode();
GenericProto::new(
local_peer_id,
protocol_id.clone(),
versions,
build_status_message(&config, &chain),
peerset,
// As documented in `GenericProto`, the first protocol in the list is always the
// one carrying the handshake reported in the `CustomProtocolOpen` event.
iter::once((block_announces_protocol.clone(), block_announces_handshake))
.chain(iter::once((transactions_protocol.clone(), vec![]))),
)
};

let protocol = Protocol {
tick_timeout: Box::pin(interval(TICK_TIMEOUT)),
propagate_timeout: Box::pin(interval(PROPAGATE_TIMEOUT)),
Expand Down Expand Up @@ -829,99 +830,86 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
}
}

/// Called on receipt of a status message via the legacy protocol on the first connection between two peers.
pub fn on_peer_connected(
/// Called on the first connection between two peers, after their exchange of handshake.
fn on_peer_connected(
&mut self,
who: PeerId,
status: message::Status<B>,
status: BlockAnnouncesHandshake<B>,
notifications_sink: NotificationsSink,
) -> CustomMessageOutcome<B> {
trace!(target: "sync", "New peer {} {:?}", who, status);
let _protocol_version = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The diff here is quite large because I've remove an indentation level, sorry for that.

if self.context_data.peers.contains_key(&who) {
debug!(target: "sync", "Ignoring duplicate status packet from {}", who);
return CustomMessageOutcome::None;
}
if status.genesis_hash != self.genesis_hash {
log!(
target: "sync",
if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace },
"Peer is on different chain (our genesis: {} theirs: {})",
self.genesis_hash, status.genesis_hash
);
self.peerset_handle.report_peer(who.clone(), rep::GENESIS_MISMATCH);
self.behaviour.disconnect_peer(&who);

if self.boot_node_ids.contains(&who) {
error!(
target: "sync",
"Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})",
who,
self.genesis_hash,
status.genesis_hash,
);
}
if self.context_data.peers.contains_key(&who) {
debug!(target: "sync", "Ignoring duplicate status packet from {}", who);
return CustomMessageOutcome::None;
}
if status.genesis_hash != self.genesis_hash {
log!(
target: "sync",
if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace },
"Peer is on different chain (our genesis: {} theirs: {})",
self.genesis_hash, status.genesis_hash
);
self.peerset_handle.report_peer(who.clone(), rep::GENESIS_MISMATCH);
self.behaviour.disconnect_peer(&who);

return CustomMessageOutcome::None;
}
if status.version < MIN_VERSION && CURRENT_VERSION < status.min_supported_version {
log!(
if self.boot_node_ids.contains(&who) {
error!(
target: "sync",
if self.important_peers.contains(&who) { Level::Warn } else { Level::Trace },
"Peer {:?} using unsupported protocol version {}", who, status.version
"Bootnode with peer id `{}` is on a different chain (our genesis: {} theirs: {})",
who,
self.genesis_hash,
status.genesis_hash,
);
self.peerset_handle.report_peer(who.clone(), rep::BAD_PROTOCOL);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}

if self.config.roles.is_light() {
// we're not interested in light peers
if status.roles.is_light() {
debug!(target: "sync", "Peer {} is unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::BAD_ROLE);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}
return CustomMessageOutcome::None;
}

// we don't interested in peers that are far behind us
let self_best_block = self
.context_data
.chain
.info()
.best_number;
let blocks_difference = self_best_block
.checked_sub(&status.best_number)
.unwrap_or_else(Zero::zero)
.saturated_into::<u64>();
if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE {
debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::PEER_BEHIND_US_LIGHT);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}
if self.config.roles.is_light() {
// we're not interested in light peers
if status.roles.is_light() {
debug!(target: "sync", "Peer {} is unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::BAD_ROLE);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}

let peer = Peer {
info: PeerInfo {
protocol_version: status.version,
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number
},
block_request: None,
known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS)
.expect("Constant is nonzero")),
known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS)
.expect("Constant is nonzero")),
next_request_id: 0,
obsolete_requests: HashMap::new(),
};
self.context_data.peers.insert(who.clone(), peer);
// we don't interested in peers that are far behind us
let self_best_block = self
.context_data
.chain
.info()
.best_number;
let blocks_difference = self_best_block
.checked_sub(&status.best_number)
.unwrap_or_else(Zero::zero)
.saturated_into::<u64>();
if blocks_difference > LIGHT_MAXIMAL_BLOCKS_DIFFERENCE {
debug!(target: "sync", "Peer {} is far behind us and will unable to serve light requests", who);
self.peerset_handle.report_peer(who.clone(), rep::PEER_BEHIND_US_LIGHT);
self.behaviour.disconnect_peer(&who);
return CustomMessageOutcome::None;
}
}

debug!(target: "sync", "Connected {}", who);
status.version
let peer = Peer {
info: PeerInfo {
roles: status.roles,
best_hash: status.best_hash,
best_number: status.best_number
},
block_request: None,
known_transactions: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_TRANSACTIONS)
.expect("Constant is nonzero")),
known_blocks: LruHashSet::new(NonZeroUsize::new(MAX_KNOWN_BLOCKS)
.expect("Constant is nonzero")),
next_request_id: 0,
obsolete_requests: HashMap::new(),
};
self.context_data.peers.insert(who.clone(), peer);

debug!(target: "sync", "Connected {}", who);

let info = self.context_data.peers.get(&who).expect("We just inserted above; QED").info.clone();
self.pending_messages.push_back(CustomMessageOutcome::PeerNewBest(who.clone(), status.best_number));
Expand Down Expand Up @@ -1151,20 +1139,12 @@ impl<B: BlockT, H: ExHashT> Protocol<B, H> {
if inserted || force {
let message = message::BlockAnnounce {
header: header.clone(),
state: if peer.info.protocol_version >= 4 {
if is_best {
Some(message::BlockState::Best)
} else {
Some(message::BlockState::Normal)
}
} else {
None
},
data: if peer.info.protocol_version >= 4 {
Some(data.clone())
state: if is_best {
Some(message::BlockState::Best)
} else {
None
Some(message::BlockState::Normal)
},
data: Some(data.clone()),
};

self.behaviour.write_notification(
Expand Down Expand Up @@ -1588,9 +1568,20 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {

let outcome = match event {
GenericProtoOut::CustomProtocolOpen { peer_id, received_handshake, notifications_sink, .. } => {
match <Message<B> as Decode>::decode(&mut &received_handshake[..]) {
Ok(GenericMessage::Status(handshake)) =>
self.on_peer_connected(peer_id, handshake, notifications_sink),
// `received_handshake` can be either a `Status` message if received from the
// legacy substream ,or a `BlockAnnouncesHandshake` if received from the block
// announces substream.
match <Message<B> as DecodeAll>::decode_all(&mut &received_handshake[..]) {
Ok(GenericMessage::Status(handshake)) => {
let handshake = BlockAnnouncesHandshake {
roles: handshake.roles,
best_number: handshake.best_number,
best_hash: handshake.best_hash,
genesis_hash: handshake.genesis_hash,
};

self.on_peer_connected(peer_id, handshake, notifications_sink)
},
Ok(msg) => {
debug!(
target: "sync",
Expand All @@ -1602,15 +1593,23 @@ impl<B: BlockT, H: ExHashT> NetworkBehaviour for Protocol<B, H> {
CustomMessageOutcome::None
}
Err(err) => {
debug!(
target: "sync",
"Couldn't decode handshake sent by {}: {:?}: {}",
peer_id,
received_handshake,
err.what()
);
self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
CustomMessageOutcome::None
match <BlockAnnouncesHandshake<B> as DecodeAll>::decode_all(&mut &received_handshake[..]) {
Ok(handshake) => {
self.on_peer_connected(peer_id, handshake, notifications_sink)
}
Err(err2) => {
debug!(
target: "sync",
"Couldn't decode handshake sent by {}: {:?}: {} & {}",
peer_id,
received_handshake,
err.what(),
err2,
);
self.peerset_handle.report_peer(peer_id, rep::BAD_MESSAGE);
CustomMessageOutcome::None
}
}
}
}
}
Expand Down
9 changes: 8 additions & 1 deletion client/network/src/protocol/generic_proto/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -336,14 +336,21 @@ impl GenericProto {
versions: &[u8],
handshake_message: Vec<u8>,
peerset: sc_peerset::Peerset,
notif_protocols: impl Iterator<Item = (Cow<'static, str>, Vec<u8>)>,
) -> Self {
let notif_protocols = notif_protocols
.map(|(n, hs)| (n, Arc::new(RwLock::new(hs))))
.collect::<Vec<_>>();

assert!(!notif_protocols.is_empty());

let legacy_handshake_message = Arc::new(RwLock::new(handshake_message));
let legacy_protocol = RegisteredProtocol::new(protocol, versions, legacy_handshake_message);

GenericProto {
local_peer_id,
legacy_protocol,
notif_protocols: Vec::new(),
notif_protocols,
peerset,
peers: FnvHashMap::default(),
delays: Default::default(),
Expand Down
Loading