From c5cc70a85689051802b56eb08907f3505068bbe3 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 11 May 2018 14:01:21 +0200 Subject: [PATCH 1/5] Networking and backend fixes --- substrate/client/db/src/lib.rs | 4 +- substrate/network/src/blocks.rs | 2 +- substrate/network/src/consensus.rs | 64 ++++++++++++++++++++---------- substrate/network/src/message.rs | 1 + substrate/network/src/protocol.rs | 12 ++++-- substrate/network/src/sync.rs | 11 +++-- 6 files changed, 61 insertions(+), 33 deletions(-) diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index d5f295205eeaa..1eca05fae7a19 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -243,9 +243,7 @@ impl client::blockchain::Backend for BlockchainDb { } fn hash(&self, number: block::Number) -> Result, client::error::Error> { - Ok(self.db.get(columns::BLOCK_INDEX, &number_to_db_key(number)) - .map_err(db_err)? - .map(|hash| block::HeaderHash::from_slice(&hash))) + Ok(self.header(BlockId::Number(number))?.map(|hdr| hdr.blake2_256().into())) } } diff --git a/substrate/network/src/blocks.rs b/substrate/network/src/blocks.rs index f111d431049fb..5cff7abd98dd0 100644 --- a/substrate/network/src/blocks.rs +++ b/substrate/network/src/blocks.rs @@ -125,7 +125,7 @@ impl BlockCollection { }; // crop to peers best - if range.start >= peer_best { + if range.start > peer_best { trace!(target: "sync", "Out of range for peer {} ({} vs {})", peer_id, range.start, peer_best); return None; } diff --git a/substrate/network/src/consensus.rs b/substrate/network/src/consensus.rs index af43e396322e1..929f6af3c6f69 100644 --- a/substrate/network/src/consensus.rs +++ b/substrate/network/src/consensus.rs @@ -18,18 +18,14 @@ use std::collections::{HashMap, HashSet}; use futures::sync::{oneshot, mpsc}; -use std::time::{Instant, Duration}; -use std::collections::hash_map::Entry; use io::SyncIo; use protocol::Protocol; use network::PeerId; -use primitives::Hash; +use primitives::{Hash, block::HeaderHash, block::Id as BlockId}; +use client::BlockStatus; use message::{self, Message}; use runtime_support::Hashable; -// TODO: Add additional spam/DoS attack protection. -const MESSAGE_LIFETIME_SECONDS: u64 = 600; - struct CandidateRequest { id: message::RequestId, completion: oneshot::Sender>, @@ -47,7 +43,8 @@ pub struct Consensus { our_candidate: Option<(Hash, Vec)>, statement_sink: Option>, bft_message_sink: Option<(mpsc::UnboundedSender, Hash)>, - messages: HashMap, + messages: Vec<(Hash, message::Message)>, + message_hashes: HashSet, } impl Consensus { @@ -59,6 +56,7 @@ impl Consensus { statement_sink: None, bft_message_sink: None, messages: Default::default(), + message_hashes: Default::default(), } } @@ -75,9 +73,9 @@ impl Consensus { // Send out all known messages. // TODO: limit by size let mut known_messages = HashSet::new(); - for (hash, &(_, ref m)) in self.messages.iter() { + for &(ref hash, ref message) in self.messages.iter() { known_messages.insert(hash.clone()); - protocol.send_message(io, peer_id, m.clone()); + protocol.send_message(io, peer_id, message.clone()); } self.peers.insert(peer_id, PeerConsensus { candidate_fetch: None, @@ -96,13 +94,13 @@ impl Consensus { } fn register_message(&mut self, hash: Hash, message: message::Message) { - if let Entry::Vacant(entry) = self.messages.entry(hash) { - entry.insert((Instant::now(), message)); + if self.message_hashes.insert(hash) { + self.messages.push((hash, message)); } } pub fn on_statement(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, statement: message::Statement, hash: Hash) { - if self.messages.contains_key(&hash) { + if self.message_hashes.contains(&hash) { trace!(target:"sync", "Ignored already known statement from {}", peer_id); } if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { @@ -137,11 +135,24 @@ impl Consensus { } pub fn on_bft_message(&mut self, io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, message: message::LocalizedBftMessage, hash: Hash) { - if self.messages.contains_key(&hash) { + if self.message_hashes.contains(&hash) { trace!(target:"sync", "Ignored already known BFT message from {}", peer_id); return; } + match protocol.chain().block_status(&BlockId::Hash(message.parent_hash)) { + Err(e) => { + debug!(target:"sync", "Error reading blockchain: {:?}", e); + return; + }, + Ok(status) => { + if status != BlockStatus::InChain { + trace!(target:"sync", "Ignored unknown parent BFT message from {}, hash={}", peer_id, message.parent_hash); + return; + } + }, + } + if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { peer.known_messages.insert(hash); // TODO: validate signature? @@ -168,9 +179,9 @@ impl Consensus { pub fn bft_messages(&mut self, parent_hash: Hash) -> mpsc::UnboundedReceiver{ let (sink, stream) = mpsc::unbounded(); - for (_, message) in self.messages.iter() { + for &(_, ref message) in self.messages.iter() { let bft_message = match *message { - (_, Message::BftMessage(ref msg)) => msg, + Message::BftMessage(ref msg) => msg, _ => continue, }; @@ -266,17 +277,28 @@ impl Consensus { self.peers.remove(&peer_id); } - pub fn collect_garbage(&mut self) { - let expiration = Duration::from_secs(MESSAGE_LIFETIME_SECONDS); - let now = Instant::now(); + pub fn collect_garbage(&mut self, best_block_parent: Option) { let before = self.messages.len(); - self.messages.retain(|_, &mut (timestamp, _)| timestamp < now + expiration); + let hashes = &mut self.message_hashes; + self.messages.retain(|&(ref hash, ref message)| { + best_block_parent.map_or(true, |parent_hash| { + if match *message { + Message::BftMessage(ref msg) => msg.parent_hash != parent_hash, + Message::Statement(ref msg) => msg.parent_hash != parent_hash, + _ => true, + } { + hashes.remove(hash); + true + } else { + false + } + }) + }); if self.messages.len() != before { trace!(target:"sync", "Cleaned up {} stale messages", before - self.messages.len()); } - let messages = &self.messages; for (_, ref mut peer) in self.peers.iter_mut() { - peer.known_messages.retain(|h| messages.contains_key(h)); + peer.known_messages.retain(|h| hashes.contains(h)); } } } diff --git a/substrate/network/src/message.rs b/substrate/network/src/message.rs index 5ab6c2b6a381b..8e1fe804dda05 100644 --- a/substrate/network/src/message.rs +++ b/substrate/network/src/message.rs @@ -225,6 +225,7 @@ pub enum SignedConsensusMessage { /// A vote. Vote(SignedConsensusVote), } + #[derive(Debug, PartialEq, Eq, Clone, Serialize, Deserialize)] /// A network message. pub enum Message { diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index 75498f37d6505..eb23a057db0db 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -36,7 +36,7 @@ use io::SyncIo; use error; use super::header_hash; -const REQUEST_TIMEOUT_SEC: u64 = 15; +const REQUEST_TIMEOUT_SEC: u64 = 40; const PROTOCOL_VERSION: u32 = 0; // Maximum allowed entries in `BlockResponse` @@ -344,7 +344,7 @@ impl Protocol { /// Perform time based maintenance. pub fn tick(&self, io: &mut SyncIo) { self.maintain_peers(io); - self.consensus.lock().collect_garbage(); + self.consensus.lock().collect_garbage(None); } fn maintain_peers(&self, io: &mut SyncIo) { @@ -387,6 +387,8 @@ impl Protocol { return; } + let mut sync = self.sync.write(); + let mut consensus = self.consensus.lock(); { let mut peers = self.peers.write(); let mut handshaking_peers = self.handshaking_peers.write(); @@ -420,8 +422,8 @@ impl Protocol { handshaking_peers.remove(&peer_id); debug!(target: "sync", "Connected {} {}", peer_id, io.peer_info(peer_id)); } - self.sync.write().new_peer(io, self, peer_id); - self.consensus.lock().new_peer(io, self, peer_id, &status.roles); + sync.new_peer(io, self, peer_id); + consensus.new_peer(io, self, peer_id, &status.roles); } /// Called when peer sends us new transactions @@ -511,6 +513,8 @@ impl Protocol { })); } } + + self.consensus.lock().collect_garbage(Some(header.parent_hash)); } pub fn transactions_stats(&self) -> BTreeMap { diff --git a/substrate/network/src/sync.rs b/substrate/network/src/sync.rs index 1599e162860de..eef04494a7f51 100644 --- a/substrate/network/src/sync.rs +++ b/substrate/network/src/sync.rs @@ -114,13 +114,13 @@ impl ChainSync { io.disable_peer(peer_id); }, (Ok(BlockStatus::Unknown), 0) => { - debug!(target:"sync", "New peer with unkown genesis hash {} ({}).", info.best_hash, info.best_number); + debug!(target:"sync", "New peer with unknown genesis hash {} ({}).", info.best_hash, info.best_number); io.disable_peer(peer_id); }, (Ok(BlockStatus::Unknown), _) => { let our_best = self.best_queued_number; if our_best > 0 { - debug!(target:"sync", "New peer with unkown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number); + debug!(target:"sync", "New peer with unknown best hash {} ({}), searching for common ancestor.", info.best_hash, info.best_number); self.peers.insert(peer_id, PeerSync { common_hash: self.genesis_hash, common_number: 0, @@ -186,13 +186,15 @@ impl ChainSync { trace!(target:"sync", "Found common ancestor for peer {}: {} ({})", peer_id, block.hash, n); vec![] }, - Ok(_) if n > 0 => { + Ok(our_best) if n > 0 => { + trace!(target:"sync", "Ancestry block mismatch for peer {}: theirs: {} ({}), ours: {:?}", peer_id, block.hash, n, our_best); let n = n - 1; peer.state = PeerSyncState::AncestorSearch(n); Self::request_ancestry(io, protocol, peer_id, n); return; }, Ok(_) => { // genesis mismatch + trace!(target:"sync", "Ancestry search: genesis mismatch for peer {}", peer_id); io.disable_peer(peer_id); return; }, @@ -325,7 +327,7 @@ impl ChainSync { let stale = header.number <= self.best_queued_number; if stale { if !self.is_known_or_already_downloading(protocol, &header.parent_hash) { - trace!(target: "sync", "Ignoring unkown stale block announce from {}: {} {:?}", peer_id, hash, header); + trace!(target: "sync", "Ignoring unknown stale block announce from {}: {} {:?}", peer_id, hash, header); } else { trace!(target: "sync", "Downloading new stale block announced from {}: {} {:?}", peer_id, hash, header); self.download_stale(io, protocol, peer_id, &hash); @@ -423,6 +425,7 @@ impl ChainSync { } fn request_ancestry(io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, block: BlockNumber) { + trace!(target: "sync", "Requesting potential common ancestor {} from {}", block, peer_id); let request = message::BlockRequest { id: 0, fields: vec![message::BlockAttribute::Header, message::BlockAttribute::Justification], From 6111c62a842a412679ea3b35784c5045bedb4a73 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 11 May 2018 14:42:35 +0200 Subject: [PATCH 2/5] Fixed test --- substrate/network/src/test/consensus.rs | 6 ++++-- substrate/network/src/test/mod.rs | 7 ++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/substrate/network/src/test/consensus.rs b/substrate/network/src/test/consensus.rs index 0733e6d37cc51..9cf7dad818722 100644 --- a/substrate/network/src/test/consensus.rs +++ b/substrate/network/src/test/consensus.rs @@ -34,16 +34,18 @@ fn bft_messages_include_those_sent_before_asking_for_stream() { signature: Default::default(), })); + let parent_hash = peer.genesis_hash(); + let localized = LocalizedBftMessage { message: bft_message, - parent_hash: [1; 32].into(), + parent_hash: parent_hash, }; let as_bytes = ::serde_json::to_vec(&Message::BftMessage(localized.clone())).unwrap(); peer.sync.handle_packet(&mut io, 1, &as_bytes[..]); - let stream = peer.sync.bft_messages([1; 32].into()); + let stream = peer.sync.bft_messages(parent_hash); assert_eq!(stream.wait().next(), Some(Ok(localized))); } diff --git a/substrate/network/src/test/mod.rs b/substrate/network/src/test/mod.rs index 96b56ce45a93f..a62bf5177ca0f 100644 --- a/substrate/network/src/test/mod.rs +++ b/substrate/network/src/test/mod.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use parking_lot::RwLock; use client; use client::block_builder::BlockBuilder; -use primitives::block::{Id as BlockId, ExtrinsicHash}; +use primitives::block::{Id as BlockId, ExtrinsicHash, HeaderHash}; use primitives; use io::SyncIo; use protocol::Protocol; @@ -187,6 +187,11 @@ impl Peer { self.generate_blocks(count, |_| ()); } } + + pub fn genesis_hash(&self) -> HeaderHash { + let info = self.client.info().expect("In-mem client does not fail"); + info.chain.genesis_hash + } } struct EmptyTransactionPool; From eeaf64525ce9566eb98edfeb580211d519ac11b3 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 11 May 2018 14:52:46 +0200 Subject: [PATCH 3/5] Reverted DB fix --- substrate/client/db/src/lib.rs | 6 ++++-- substrate/network/src/sync.rs | 1 - 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index 1eca05fae7a19..c3a36736dc06d 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -243,7 +243,9 @@ impl client::blockchain::Backend for BlockchainDb { } fn hash(&self, number: block::Number) -> Result, client::error::Error> { - Ok(self.header(BlockId::Number(number))?.map(|hdr| hdr.blake2_256().into())) + Ok(self.db.get(columns::BLOCK_INDEX, &number_to_db_key(number)) + .map_err(db_err)? + .map(|hash| block::HeaderHash::from_slice(&hash))) } } @@ -360,7 +362,7 @@ impl client::backend::Backend for Backend { if let Some(justification) = pending_block.justification { transaction.put(columns::JUSTIFICATION, &key, &justification.encode()); } - transaction.put(columns::BLOCK_INDEX, &hash, &key); + transaction.put(columns::BLOCK_INDEX, &key, &hash); if pending_block.is_best { transaction.put(columns::META, meta::BEST_BLOCK, &key); } diff --git a/substrate/network/src/sync.rs b/substrate/network/src/sync.rs index eef04494a7f51..f3d6ddea8cb81 100644 --- a/substrate/network/src/sync.rs +++ b/substrate/network/src/sync.rs @@ -425,7 +425,6 @@ impl ChainSync { } fn request_ancestry(io: &mut SyncIo, protocol: &Protocol, peer_id: PeerId, block: BlockNumber) { - trace!(target: "sync", "Requesting potential common ancestor {} from {}", block, peer_id); let request = message::BlockRequest { id: 0, fields: vec![message::BlockAttribute::Header, message::BlockAttribute::Justification], From 168ff05db91ab18e09c2f3407a2a5de07b71ffbb Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 11 May 2018 15:58:46 +0200 Subject: [PATCH 4/5] Reverted DB fix properly --- substrate/client/db/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/substrate/client/db/src/lib.rs b/substrate/client/db/src/lib.rs index c3a36736dc06d..d5f295205eeaa 100644 --- a/substrate/client/db/src/lib.rs +++ b/substrate/client/db/src/lib.rs @@ -362,7 +362,7 @@ impl client::backend::Backend for Backend { if let Some(justification) = pending_block.justification { transaction.put(columns::JUSTIFICATION, &key, &justification.encode()); } - transaction.put(columns::BLOCK_INDEX, &key, &hash); + transaction.put(columns::BLOCK_INDEX, &hash, &key); if pending_block.is_best { transaction.put(columns::META, meta::BEST_BLOCK, &key); } From 1c2311b3155832e7e15b2f4def6322696108a8a9 Mon Sep 17 00:00:00 2001 From: arkpar Date: Fri, 11 May 2018 19:19:56 +0200 Subject: [PATCH 5/5] Preserve messages with unknown parent_hash --- substrate/network/src/consensus.rs | 76 +++++++++++++++++++----------- substrate/network/src/protocol.rs | 5 +- 2 files changed, 51 insertions(+), 30 deletions(-) diff --git a/substrate/network/src/consensus.rs b/substrate/network/src/consensus.rs index 929f6af3c6f69..5c4689384deb3 100644 --- a/substrate/network/src/consensus.rs +++ b/substrate/network/src/consensus.rs @@ -18,14 +18,17 @@ use std::collections::{HashMap, HashSet}; use futures::sync::{oneshot, mpsc}; +use std::time::{Instant, Duration}; use io::SyncIo; use protocol::Protocol; use network::PeerId; -use primitives::{Hash, block::HeaderHash, block::Id as BlockId}; -use client::BlockStatus; +use primitives::{Hash, block::HeaderHash, block::Id as BlockId, block::Header}; use message::{self, Message}; use runtime_support::Hashable; +// TODO: Add additional spam/DoS attack protection. +const MESSAGE_LIFETIME_SECONDS: u64 = 600; + struct CandidateRequest { id: message::RequestId, completion: oneshot::Sender>, @@ -43,13 +46,14 @@ pub struct Consensus { our_candidate: Option<(Hash, Vec)>, statement_sink: Option>, bft_message_sink: Option<(mpsc::UnboundedSender, Hash)>, - messages: Vec<(Hash, message::Message)>, + messages: Vec<(Hash, Instant, message::Message)>, message_hashes: HashSet, + last_block_hash: HeaderHash, } impl Consensus { /// Create a new instance. - pub fn new() -> Consensus { + pub fn new(best_hash: HeaderHash) -> Consensus { Consensus { peers: HashMap::new(), our_candidate: None, @@ -57,6 +61,7 @@ impl Consensus { bft_message_sink: None, messages: Default::default(), message_hashes: Default::default(), + last_block_hash: best_hash, } } @@ -73,7 +78,7 @@ impl Consensus { // Send out all known messages. // TODO: limit by size let mut known_messages = HashSet::new(); - for &(ref hash, ref message) in self.messages.iter() { + for &(ref hash, _, ref message) in self.messages.iter() { known_messages.insert(hash.clone()); protocol.send_message(io, peer_id, message.clone()); } @@ -95,7 +100,7 @@ impl Consensus { fn register_message(&mut self, hash: Hash, message: message::Message) { if self.message_hashes.insert(hash) { - self.messages.push((hash, message)); + self.messages.push((hash, Instant::now(), message)); } } @@ -140,17 +145,18 @@ impl Consensus { return; } - match protocol.chain().block_status(&BlockId::Hash(message.parent_hash)) { - Err(e) => { + match (protocol.chain().info(), protocol.chain().header(&BlockId::Hash(message.parent_hash))) { + (_, Err(e)) | (Err(e), _) => { debug!(target:"sync", "Error reading blockchain: {:?}", e); return; }, - Ok(status) => { - if status != BlockStatus::InChain { - trace!(target:"sync", "Ignored unknown parent BFT message from {}, hash={}", peer_id, message.parent_hash); + (Ok(info), Ok(Some(header))) => { + if header.number < info.chain.best_number { + trace!(target:"sync", "Ignored ancient BFT message from {}, hash={}", peer_id, message.parent_hash); return; } }, + (Ok(_), Ok(None)) => {}, } if let Some(ref mut peer) = self.peers.get_mut(&peer_id) { @@ -179,7 +185,7 @@ impl Consensus { pub fn bft_messages(&mut self, parent_hash: Hash) -> mpsc::UnboundedReceiver{ let (sink, stream) = mpsc::unbounded(); - for &(_, ref message) in self.messages.iter() { + for &(_, _, ref message) in self.messages.iter() { let bft_message = match *message { Message::BftMessage(ref msg) => msg, _ => continue, @@ -277,23 +283,37 @@ impl Consensus { self.peers.remove(&peer_id); } - pub fn collect_garbage(&mut self, best_block_parent: Option) { - let before = self.messages.len(); + pub fn collect_garbage(&mut self, best_hash_and_header: Option<(HeaderHash, &Header)>) { let hashes = &mut self.message_hashes; - self.messages.retain(|&(ref hash, ref message)| { - best_block_parent.map_or(true, |parent_hash| { - if match *message { - Message::BftMessage(ref msg) => msg.parent_hash != parent_hash, - Message::Statement(ref msg) => msg.parent_hash != parent_hash, - _ => true, - } { - hashes.remove(hash); - true - } else { - false - } - }) - }); + let last_block_hash = &mut self.last_block_hash; + let before = self.messages.len(); + let (best_hash, best_header) = best_hash_and_header.map(|(h, header)| (Some(h), Some(header))).unwrap_or((None, None)); + if best_header.as_ref().map_or(false, |header| header.parent_hash != *last_block_hash) { + trace!(target:"sync", "Clearing conensus message cache"); + self.messages.clear(); + hashes.clear(); + } else { + let expiration = Duration::from_secs(MESSAGE_LIFETIME_SECONDS); + let now = Instant::now(); + if let Some(hash) = best_hash { + *last_block_hash = hash; + } + self.messages.retain(|&(ref hash, timestamp, ref message)| { + timestamp < now + expiration || + best_header.map_or(true, |header| { + if match *message { + Message::BftMessage(ref msg) => msg.parent_hash != header.parent_hash, + Message::Statement(ref msg) => msg.parent_hash != header.parent_hash, + _ => true, + } { + hashes.remove(hash); + true + } else { + false + } + }) + }); + } if self.messages.len() != before { trace!(target:"sync", "Cleaned up {} stale messages", before - self.messages.len()); } diff --git a/substrate/network/src/protocol.rs b/substrate/network/src/protocol.rs index eb23a057db0db..5a66b21f0ed16 100644 --- a/substrate/network/src/protocol.rs +++ b/substrate/network/src/protocol.rs @@ -114,12 +114,13 @@ impl Protocol { /// Create a new instance. pub fn new(config: ProtocolConfig, chain: Arc, transaction_pool: Arc) -> error::Result { let info = chain.info()?; + let best_hash = info.chain.best_hash; let protocol = Protocol { config: config, chain: chain, genesis_hash: info.chain.genesis_hash, sync: RwLock::new(ChainSync::new(&info)), - consensus: Mutex::new(Consensus::new()), + consensus: Mutex::new(Consensus::new(best_hash)), peers: RwLock::new(HashMap::new()), handshaking_peers: RwLock::new(HashMap::new()), transaction_pool: transaction_pool, @@ -514,7 +515,7 @@ impl Protocol { } } - self.consensus.lock().collect_garbage(Some(header.parent_hash)); + self.consensus.lock().collect_garbage(Some((hash, &header))); } pub fn transactions_stats(&self) -> BTreeMap {