Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion substrate/network/src/blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl BlockCollection {
};

// crop to peers best
if range.start >= peer_best {
if range.start > peer_best {
trace!(target: "sync", "Out of range for peer {} ({} vs {})", peer_id, range.start, peer_best);
return None;
}
Expand Down
78 changes: 60 additions & 18 deletions substrate/network/src/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
use std::collections::{HashMap, HashSet};
use futures::sync::{oneshot, mpsc};
use std::time::{Instant, Duration};
use std::collections::hash_map::Entry;
use io::SyncIo;
use protocol::Protocol;
use network::PeerId;
use primitives::Hash;
use primitives::{Hash, block::HeaderHash, block::Id as BlockId, block::Header};
use message::{self, Message};
use runtime_support::Hashable;

Expand All @@ -47,18 +46,22 @@ pub struct Consensus {
our_candidate: Option<(Hash, Vec<u8>)>,
statement_sink: Option<mpsc::UnboundedSender<message::Statement>>,
bft_message_sink: Option<(mpsc::UnboundedSender<message::LocalizedBftMessage>, Hash)>,
messages: HashMap<Hash, (Instant, message::Message)>,
messages: Vec<(Hash, Instant, message::Message)>,
message_hashes: HashSet<Hash>,
last_block_hash: HeaderHash,
}

impl Consensus {
/// Create a new instance.
pub fn new() -> Consensus {
pub fn new(best_hash: HeaderHash) -> Consensus {
Consensus {
peers: HashMap::new(),
our_candidate: None,
statement_sink: None,
bft_message_sink: None,
messages: Default::default(),
message_hashes: Default::default(),
last_block_hash: best_hash,
}
}

Expand All @@ -75,9 +78,9 @@ impl Consensus {
// Send out all known messages.
// TODO: limit by size
let mut known_messages = HashSet::new();
for (hash, &(_, ref m)) in self.messages.iter() {
for &(ref hash, _, ref message) in self.messages.iter() {
known_messages.insert(hash.clone());
protocol.send_message(io, peer_id, m.clone());
protocol.send_message(io, peer_id, message.clone());
}
self.peers.insert(peer_id, PeerConsensus {
candidate_fetch: None,
Expand All @@ -96,13 +99,13 @@ impl Consensus {
}

fn register_message(&mut self, hash: Hash, message: message::Message) {
if let Entry::Vacant(entry) = self.messages.entry(hash) {
entry.insert((Instant::now(), message));
if self.message_hashes.insert(hash) {
self.messages.push((hash, Instant::now(), message));
}
}

pub fn on_statement(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, statement: message::Statement, hash: Hash) {
if self.messages.contains_key(&hash) {
if self.message_hashes.contains(&hash) {
trace!(target:"sync", "Ignored already known statement from {}", peer_id);
}
if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
Expand Down Expand Up @@ -137,11 +140,25 @@ impl Consensus {
}

pub fn on_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, message: message::LocalizedBftMessage, hash: Hash) {
if self.messages.contains_key(&hash) {
if self.message_hashes.contains(&hash) {
trace!(target:"sync", "Ignored already known BFT message from {}", peer_id);
return;
}

match (protocol.chain().info(), protocol.chain().header(&BlockId::Hash(message.parent_hash))) {
(_, Err(e)) | (Err(e), _) => {
debug!(target:"sync", "Error reading blockchain: {:?}", e);
return;
},
(Ok(info), Ok(Some(header))) => {
if header.number < info.chain.best_number {
trace!(target:"sync", "Ignored ancient BFT message from {}, hash={}", peer_id, message.parent_hash);
return;
}
},
(Ok(_), Ok(None)) => {},
}

if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
peer.known_messages.insert(hash);
// TODO: validate signature?
Expand All @@ -168,9 +185,9 @@ impl Consensus {
pub fn bft_messages(&mut self, parent_hash: Hash) -> mpsc::UnboundedReceiver<message::LocalizedBftMessage>{
let (sink, stream) = mpsc::unbounded();

for (_, message) in self.messages.iter() {
for &(_, _, ref message) in self.messages.iter() {
let bft_message = match *message {
(_, Message::BftMessage(ref msg)) => msg,
Message::BftMessage(ref msg) => msg,
_ => continue,
};

Expand Down Expand Up @@ -266,17 +283,42 @@ impl Consensus {
self.peers.remove(&peer_id);
}

pub fn collect_garbage(&mut self) {
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SECONDS);
let now = Instant::now();
pub fn collect_garbage(&mut self, best_hash_and_header: Option<(HeaderHash, &Header)>) {
let hashes = &mut self.message_hashes;
let last_block_hash = &mut self.last_block_hash;
let before = self.messages.len();
self.messages.retain(|_, &mut (timestamp, _)| timestamp < now + expiration);
let (best_hash, best_header) = best_hash_and_header.map(|(h, header)| (Some(h), Some(header))).unwrap_or((None, None));
if best_header.as_ref().map_or(false, |header| header.parent_hash != *last_block_hash) {
trace!(target:"sync", "Clearing conensus message cache");
self.messages.clear();
hashes.clear();
} else {
let expiration = Duration::from_secs(MESSAGE_LIFETIME_SECONDS);
let now = Instant::now();
if let Some(hash) = best_hash {
*last_block_hash = hash;
}
self.messages.retain(|&(ref hash, timestamp, ref message)| {
timestamp < now + expiration ||
best_header.map_or(true, |header| {
if match *message {
Message::BftMessage(ref msg) => msg.parent_hash != header.parent_hash,
Message::Statement(ref msg) => msg.parent_hash != header.parent_hash,
_ => true,
} {
hashes.remove(hash);
true
} else {
false
}
})
});
}
if self.messages.len() != before {
trace!(target:"sync", "Cleaned up {} stale messages", before - self.messages.len());
}
let messages = &self.messages;
for (_, ref mut peer) in self.peers.iter_mut() {
peer.known_messages.retain(|h| messages.contains_key(h));
peer.known_messages.retain(|h| hashes.contains(h));
}
}
}
1 change: 1 addition & 0 deletions substrate/network/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ pub enum SignedConsensusMessage {
/// A vote.
Vote(SignedConsensusVote),
}

#[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)]
/// A network message.
pub enum Message {
Expand Down
15 changes: 10 additions & 5 deletions substrate/network/src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use io::SyncIo;
use error;
use super::header_hash;

const REQUEST_TIMEOUT_SEC: u64 = 15;
const REQUEST_TIMEOUT_SEC: u64 = 40;
const PROTOCOL_VERSION: u32 = 0;

// Maximum allowed entries in `BlockResponse`
Expand Down Expand Up @@ -114,12 +114,13 @@ impl Protocol {
/// Create a new instance.
pub fn new(config: ProtocolConfig, chain: Arc<Client>, transaction_pool: Arc<TransactionPool>) -> error::Result<Protocol> {
let info = chain.info()?;
let best_hash = info.chain.best_hash;
let protocol = Protocol {
config: config,
chain: chain,
genesis_hash: info.chain.genesis_hash,
sync: RwLock::new(ChainSync::new(&info)),
consensus: Mutex::new(Consensus::new()),
consensus: Mutex::new(Consensus::new(best_hash)),
peers: RwLock::new(HashMap::new()),
handshaking_peers: RwLock::new(HashMap::new()),
transaction_pool: transaction_pool,
Expand Down Expand Up @@ -344,7 +345,7 @@ impl Protocol {
/// Perform time based maintenance.
pub fn tick(&self, io: &mut SyncIo) {
self.maintain_peers(io);
self.consensus.lock().collect_garbage();
self.consensus.lock().collect_garbage(None);
}

fn maintain_peers(&self, io: &mut SyncIo) {
Expand Down Expand Up @@ -387,6 +388,8 @@ impl Protocol {
return;
}

let mut sync = self.sync.write();
let mut consensus = self.consensus.lock();
{
let mut peers = self.peers.write();
let mut handshaking_peers = self.handshaking_peers.write();
Expand Down Expand Up @@ -420,8 +423,8 @@ impl Protocol {
handshaking_peers.remove(&peer_id);
debug!(target: "sync", "Connected {} {}", peer_id, io.peer_info(peer_id));
}
self.sync.write().new_peer(io, self, peer_id);
self.consensus.lock().new_peer(io, self, peer_id, &status.roles);
sync.new_peer(io, self, peer_id);
consensus.new_peer(io, self, peer_id, &status.roles);
}

/// Called when peer sends us new transactions
Expand Down Expand Up @@ -511,6 +514,8 @@ impl Protocol {
}));
}
}

self.consensus.lock().collect_garbage(Some((hash, &header)));
}

pub fn transactions_stats(&self) -> BTreeMap<ExtrinsicHash, TransactionStats> {
Expand Down
10 changes: 6 additions & 4 deletions substrate/network/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,13 @@ impl ChainSync {
io.disable_peer(peer_id);
},
(Ok(BlockStatus::Unknown), 0) => {
debug!(target:"sync", "New peer with unkown genesis hash {} ({}).", info.best_hash, info.best_number);
debug!(target:"sync", "New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number);
io.disable_peer(peer_id);
},
(Ok(BlockStatus::Unknown), _) => {
let our_best = self.best_queued_number;
if our_best > 0 {
debug!(target:"sync", "New peer with unkown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number);
debug!(target:"sync", "New peer with unknown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number);
self.peers.insert(peer_id, PeerSync {
common_hash: self.genesis_hash,
common_number: 0,
Expand Down Expand Up @@ -187,13 +187,15 @@ impl ChainSync {
trace!(target:"sync", "Found common ancestor for peer {}: {} ({})", peer_id, block.hash, n);
vec![]
},
Ok(_) if n > 0 => {
Ok(our_best) if n > 0 => {
trace!(target:"sync", "Ancestry block mismatch for peer {}: theirs: {} ({}), ours: {:?}", peer_id, block.hash, n, our_best);
let n = n - 1;
peer.state = PeerSyncState::AncestorSearch(n);
Self::request_ancestry(io, protocol, peer_id, n);
return;
},
Ok(_) => { // genesis mismatch
trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", peer_id);
io.disable_peer(peer_id);
return;
},
Expand Down Expand Up @@ -326,7 +328,7 @@ impl ChainSync {
let stale = header.number <= self.best_queued_number;
if stale {
if !self.is_known_or_already_downloading(protocol, &header.parent_hash) {
trace!(target: "sync", "Ignoring unkown stale block announce from {}: {} {:?}", peer_id, hash, header);
trace!(target: "sync", "Ignoring unknown stale block announce from {}: {} {:?}", peer_id, hash, header);
} else {
trace!(target: "sync", "Downloading new stale block announced from {}: {} {:?}", peer_id, hash, header);
self.download_stale(io, protocol, peer_id, &hash);
Expand Down
6 changes: 4 additions & 2 deletions substrate/network/src/test/consensus.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,16 +34,18 @@ fn bft_messages_include_those_sent_before_asking_for_stream() {
signature: Default::default(),
}));

let parent_hash = peer.genesis_hash();

let localized = LocalizedBftMessage {
message: bft_message,
parent_hash: [1; 32].into(),
parent_hash: parent_hash,
};


let as_bytes = ::serde_json::to_vec(&Message::BftMessage(localized.clone())).unwrap();
peer.sync.handle_packet(&mut io, 1, &as_bytes[..]);

let stream = peer.sync.bft_messages([1; 32].into());
let stream = peer.sync.bft_messages(parent_hash);

assert_eq!(stream.wait().next(), Some(Ok(localized)));
}
7 changes: 6 additions & 1 deletion substrate/network/src/test/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::sync::Arc;
use parking_lot::RwLock;
use client;
use client::block_builder::BlockBuilder;
use primitives::block::{Id as BlockId, ExtrinsicHash};
use primitives::block::{Id as BlockId, ExtrinsicHash, HeaderHash};
use primitives;
use io::SyncIo;
use protocol::Protocol;
Expand Down Expand Up @@ -187,6 +187,11 @@ impl Peer {
self.generate_blocks(count, |_| ());
}
}

pub fn genesis_hash(&self) -> HeaderHash {
let info = self.client.info().expect("In-mem client does not fail");
info.chain.genesis_hash
}
}

struct EmptyTransactionPool;
Expand Down