From 627ca68e940ee2d3413e648aa6474762bfa2e2bc Mon Sep 17 00:00:00 2001 From: Joonmo Yang Date: Thu, 7 Nov 2019 17:36:05 +0900 Subject: [PATCH 1/5] Implement Trie::is_complete --- util/merkle/src/lib.rs | 3 ++ util/merkle/src/triedb.rs | 63 +++++++++++++++++++++++++++++++++++- util/merkle/src/triedbmut.rs | 4 +++ 3 files changed, 69 insertions(+), 1 deletion(-) diff --git a/util/merkle/src/lib.rs b/util/merkle/src/lib.rs index 2e60e94304..f8822159ef 100644 --- a/util/merkle/src/lib.rs +++ b/util/merkle/src/lib.rs @@ -82,6 +82,9 @@ pub trait Trie { /// What is the value of the given key in this trie? fn get(&self, key: &[u8]) -> Result>; + + /// Do all the nodes in this trie exist in the underlying database? + fn is_complete(&self) -> bool; } /// A key-value datastore implemented as a database-backed modified Merkle tree. diff --git a/util/merkle/src/triedb.rs b/util/merkle/src/triedb.rs index 6293b72e63..69369d3c6c 100644 --- a/util/merkle/src/triedb.rs +++ b/util/merkle/src/triedb.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
-use ccrypto::blake256; +use ccrypto::{blake256, BLAKE_NULL_RLP}; use cdb::HashDB; use primitives::H256; @@ -101,6 +101,21 @@ impl<'db> TrieDB<'db> { None => Ok(None), } } + + /// Check if every leaf of the trie starting from `hash` exists + fn is_complete_aux(&self, hash: &H256) -> bool { + if let Some(node_rlp) = self.db.get(hash) { + match RlpNode::decoded(node_rlp.as_ref()) { + Some(RlpNode::Branch(.., children)) => { + children.iter().flatten().all(|child| self.is_complete_aux(child)) + } + Some(RlpNode::Leaf(..)) => true, + None => false, + } + } else { + false + } + } } impl<'db> Trie for TrieDB<'db> { @@ -114,6 +129,10 @@ impl<'db> Trie for TrieDB<'db> { self.get_aux(&NibbleSlice::new(&path), Some(root), &|bytes| bytes.to_vec()) } + + fn is_complete(&self) -> bool { + *self.root == BLAKE_NULL_RLP || self.is_complete_aux(self.root) + } } #[cfg(test)] @@ -123,6 +142,19 @@ mod tests { use super::*; use crate::*; + fn delete_any_child(db: &mut MemoryDB, root: &H256) { + let node_rlp = db.get(root).unwrap(); + match RlpNode::decoded(&node_rlp).unwrap() { + RlpNode::Leaf(..) 
=> { + db.remove(root); + } + RlpNode::Branch(.., children) => { + let first_child = children.iter().find(|c| c.is_some()).unwrap().unwrap(); + db.remove(&first_child); + } + } + } + #[test] fn get() { let mut memdb = MemoryDB::new(); @@ -138,4 +170,33 @@ mod tests { assert_eq!(t.get(b"B"), Ok(Some(b"ABCBA".to_vec()))); assert_eq!(t.get(b"C"), Ok(None)); } + + #[test] + fn is_complete_success() { + let mut memdb = MemoryDB::new(); + let mut root = H256::new(); + { + let mut t = TrieDBMut::new(&mut memdb, &mut root); + t.insert(b"A", b"ABC").unwrap(); + t.insert(b"B", b"ABCBA").unwrap(); + } + + let t = TrieDB::try_new(&memdb, &root).unwrap(); + assert!(t.is_complete()); + } + + #[test] + fn is_complete_fail() { + let mut memdb = MemoryDB::new(); + let mut root = H256::new(); + { + let mut t = TrieDBMut::new(&mut memdb, &mut root); + t.insert(b"A", b"ABC").unwrap(); + t.insert(b"B", b"ABCBA").unwrap(); + } + delete_any_child(&mut memdb, &root); + + let t = TrieDB::try_new(&memdb, &root).unwrap(); + assert!(!t.is_complete()); + } } diff --git a/util/merkle/src/triedbmut.rs b/util/merkle/src/triedbmut.rs index 86c55c7671..d94fd1efa1 100644 --- a/util/merkle/src/triedbmut.rs +++ b/util/merkle/src/triedbmut.rs @@ -417,6 +417,10 @@ impl<'a> Trie for TrieDBMut<'a> { t.get(key) } + + fn is_complete(&self) -> bool { + TrieDB::try_new(self.db, self.root).map(|t| t.is_complete()).unwrap_or(false) + } } impl<'a> TrieMut for TrieDBMut<'a> { From 91925810ff8d647077efc3d5458a4cf943368445 Mon Sep 17 00:00:00 2001 From: Joonmo Yang Date: Wed, 6 Nov 2019 20:17:26 +0900 Subject: [PATCH 2/5] Add configuration option for the snapshot sync target --- foundry/config/mod.rs | 15 +++++++++++++++ foundry/foundry.yml | 12 ++++++++++++ foundry/run_node.rs | 6 +++++- sync/src/block/extension.rs | 2 +- 4 files changed, 33 insertions(+), 2 deletions(-) diff --git a/foundry/config/mod.rs b/foundry/config/mod.rs index a373bf2aa8..dc92a71276 100644 --- a/foundry/config/mod.rs +++ 
b/foundry/config/mod.rs @@ -25,6 +25,7 @@ use cidr::IpCidr; use ckey::PlatformAddress; use clap; use cnetwork::{FilterEntry, NetworkConfig, SocketAddr}; +use primitives::H256; use toml; pub use self::chain_type::ChainType; @@ -257,6 +258,8 @@ pub struct Network { pub min_peers: Option, pub max_peers: Option, pub sync: Option, + pub snapshot_hash: Option, + pub snapshot_number: Option, pub transaction_relay: Option, pub discovery: Option, pub discovery_type: Option, @@ -540,6 +543,12 @@ impl Network { if other.sync.is_some() { self.sync = other.sync; } + if other.snapshot_hash.is_some() { + self.snapshot_hash = other.snapshot_hash; + } + if other.snapshot_number.is_some() { + self.snapshot_number = other.snapshot_number; + } if other.transaction_relay.is_some() { self.transaction_relay = other.transaction_relay; } @@ -592,6 +601,12 @@ impl Network { if matches.is_present("no-sync") { self.sync = Some(false); } + if let Some(snapshot_hash) = matches.value_of("snapshot-hash") { + self.snapshot_hash = Some(snapshot_hash.parse().map_err(|_| "Invalid snapshot-hash")?); + } + if let Some(snapshot_number) = matches.value_of("snapshot-number") { + self.snapshot_number = Some(snapshot_number.parse().map_err(|_| "Invalid snapshot-number")?); + } if matches.is_present("no-tx-relay") { self.transaction_relay = Some(false); } diff --git a/foundry/foundry.yml b/foundry/foundry.yml index 4269a4ca33..c385818eb4 100644 --- a/foundry/foundry.yml +++ b/foundry/foundry.yml @@ -252,6 +252,18 @@ args: takes_value: true conflicts_with: - no-discovery + - snapshot-hash: + long: snapshot-hash + value_name: HASH + help: The block hash of the snapshot target block. + requires: snapshot-number + takes_value: true + - snapshot-number: + long: snapshot-number + value_name: NUM + help: The block number of the snapshot target block. 
+ requires: snapshot-hash + takes_value: true - no-snapshot: long: no-snapshot help: Disable snapshots diff --git a/foundry/run_node.rs b/foundry/run_node.rs index b47f77abaa..cba017ee69 100644 --- a/foundry/run_node.rs +++ b/foundry/run_node.rs @@ -277,7 +277,11 @@ pub fn run_node(matches: &ArgMatches<'_>) -> Result<(), String> { if config.network.sync.unwrap() { let sync_sender = { let client = client.client(); - service.register_extension(move |api| BlockSyncExtension::new(client, api)) + let snapshot_target = match (config.network.snapshot_hash, config.network.snapshot_number) { + (Some(hash), Some(num)) => Some((hash, num)), + _ => None, + }; + service.register_extension(move |api| BlockSyncExtension::new(client, api, snapshot_target)) }; let sync = Arc::new(BlockSyncSender::from(sync_sender.clone())); client.client().add_notify(Arc::downgrade(&sync) as Weak); diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index a40640092c..de6ecf9b77 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -68,7 +68,7 @@ pub struct Extension { } impl Extension { - pub fn new(client: Arc, api: Box) -> Extension { + pub fn new(client: Arc, api: Box, _snapshot_target: Option<(H256, u64)>) -> Extension { api.set_timer(SYNC_TIMER_TOKEN, Duration::from_millis(SYNC_TIMER_INTERVAL)).expect("Timer set succeeds"); let mut header = client.best_header(); From f052d9922dbe9cab103f1ceb4d577168446a44ec Mon Sep 17 00:00:00 2001 From: Joonmo Yang Date: Thu, 7 Nov 2019 15:31:15 +0900 Subject: [PATCH 3/5] Decide the sync extension's state with the snapshot target --- sync/src/block/extension.rs | 150 ++++++++++++++++++++++++++---------- 1 file changed, 109 insertions(+), 41 deletions(-) diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index de6ecf9b77..86141b62f0 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -22,14 +22,16 @@ use std::time::Duration; use ccore::encoded::Header as EncodedHeader; use 
ccore::{ Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, BlockStatus, ChainNotify, Client, ImportBlock, - ImportError, UnverifiedTransaction, + ImportError, StateInfo, UnverifiedTransaction, }; +use cdb::AsHashDB; +use cmerkle::{Trie, TrieFactory}; use cnetwork::{Api, EventSender, NetworkExtension, NodeId}; -use cstate::FindActionHandler; +use cstate::{FindActionHandler, TopStateView}; use ctimer::TimerToken; use ctypes::header::{Header, Seal}; use ctypes::transaction::Action; -use ctypes::{BlockHash, BlockNumber}; +use ctypes::{BlockHash, BlockNumber, ShardId}; use primitives::{H256, U256}; use rand::prelude::SliceRandom; use rand::thread_rng; @@ -53,7 +55,58 @@ pub struct TokenInfo { request_id: Option, } +#[derive(Debug)] +enum State { + SnapshotHeader(BlockHash), + SnapshotBody(BlockHash), + SnapshotTopChunk(H256), + SnapshotShardChunk(ShardId, H256), + Full, +} + +impl State { + fn initial(client: &Client, snapshot_target: Option<(H256, u64)>) -> Self { + let (hash, num) = match snapshot_target { + Some((h, n)) => (h.into(), n), + None => return State::Full, + }; + let header = match client.block_header(&num.into()) { + Some(h) if h.hash() == hash => h, + _ => return State::SnapshotHeader(hash), + }; + if client.block_body(&hash.into()).is_none() { + return State::SnapshotBody(hash) + } + + let state_db = client.state_db().read(); + let state_root = header.state_root(); + let top_trie = TrieFactory::readonly(state_db.as_hashdb(), &state_root); + if !top_trie.map(|t| t.is_complete()).unwrap_or(false) { + return State::SnapshotTopChunk(state_root) + } + + let top_state = client.state_at(hash.into()).expect("Top level state at the snapshot header exists"); + let metadata = top_state.metadata().unwrap().expect("Metadata must exist for the snapshot block"); + let shard_num = *metadata.number_of_shards(); + let empty_shard = (0..shard_num).find_map(|n| { + let shard_root = top_state.shard_root(n).unwrap().expect("Shard root must exist"); + let 
trie = TrieFactory::readonly(state_db.as_hashdb(), &shard_root); + if !trie.map(|t| t.is_complete()).unwrap_or(false) { + Some((n, shard_root)) + } else { + None + } + }); + if let Some((shard_id, shard_root)) = empty_shard { + return State::SnapshotShardChunk(shard_id, shard_root) + } + + State::Full + } +} + pub struct Extension { + state: State, requests: HashMap>, connected_nodes: HashSet, header_downloaders: HashMap, @@ -68,9 +121,11 @@ pub struct Extension { } impl Extension { - pub fn new(client: Arc, api: Box, _snapshot_target: Option<(H256, u64)>) -> Extension { + pub fn new(client: Arc, api: Box, snapshot_target: Option<(H256, u64)>) -> Extension { api.set_timer(SYNC_TIMER_TOKEN, Duration::from_millis(SYNC_TIMER_INTERVAL)).expect("Timer set succeeds"); + let state = State::initial(&client, snapshot_target); + cdebug!(SYNC, "Initial state is {:?}", state); let mut header = client.best_header(); let mut hollow_headers = vec![header.decode()]; while client.block_body(&BlockId::Hash(header.hash())).is_none() { @@ -89,6 +144,7 @@ impl Extension { } cinfo!(SYNC, "Sync extension initialized"); Extension { + state, requests: Default::default(), connected_nodes: Default::default(), header_downloaders: Default::default(), @@ -318,22 +374,28 @@ impl NetworkExtension for Extension { fn on_timeout(&mut self, token: TimerToken) { match token { - SYNC_TIMER_TOKEN => { - let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect(); - peer_ids.shuffle(&mut thread_rng()); - - for id in &peer_ids { - let request = self.header_downloaders.get_mut(id).and_then(HeaderDownloader::create_request); - if let Some(request) = request { - self.send_header_request(id, request); - break + SYNC_TIMER_TOKEN => match self.state { + State::SnapshotHeader(..) => unimplemented!(), + State::SnapshotBody(..) => unimplemented!(), + State::SnapshotTopChunk(..) => unimplemented!(), + State::SnapshotShardChunk(..) 
=> unimplemented!(), + State::Full => { + let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect(); + peer_ids.shuffle(&mut thread_rng()); + + for id in &peer_ids { + let request = self.header_downloaders.get_mut(id).and_then(HeaderDownloader::create_request); + if let Some(request) = request { + self.send_header_request(id, request); + break + } } - } - for id in peer_ids { - self.send_body_request(&id); + for id in &peer_ids { + self.send_body_request(id); + } } - } + }, SYNC_EXPIRE_TOKEN_BEGIN..=SYNC_EXPIRE_TOKEN_END => { self.check_sync_variable(); let (id, request_id) = { @@ -559,33 +621,39 @@ impl Extension { return } - match response { - ResponseMessage::Headers(headers) => { - self.dismiss_request(from, id); - self.on_header_response(from, &headers) - } - ResponseMessage::Bodies(bodies) => { - self.check_sync_variable(); - let hashes = match request { - RequestMessage::Bodies(hashes) => hashes, - _ => unreachable!(), - }; - assert_eq!(bodies.len(), hashes.len()); - if let Some(token) = self.tokens.get(from) { - if let Some(token_info) = self.tokens_info.get_mut(token) { - if token_info.request_id.is_none() { - ctrace!(SYNC, "Expired before handling response"); - return + match self.state { + State::SnapshotHeader(..) => unimplemented!(), + State::SnapshotBody(..) => unimplemented!(), + State::SnapshotTopChunk(..) => unimplemented!(), + State::SnapshotShardChunk(..) 
=> unimplemented!(), + State::Full => match response { + ResponseMessage::Headers(headers) => { + self.dismiss_request(from, id); + self.on_header_response(from, &headers) + } + ResponseMessage::Bodies(bodies) => { + self.check_sync_variable(); + let hashes = match request { + RequestMessage::Bodies(hashes) => hashes, + _ => unreachable!(), + }; + assert_eq!(bodies.len(), hashes.len()); + if let Some(token) = self.tokens.get(from) { + if let Some(token_info) = self.tokens_info.get_mut(token) { + if token_info.request_id.is_none() { + ctrace!(SYNC, "Expired before handling response"); + return + } + self.api.clear_timer(*token).expect("Timer clear succeed"); + token_info.request_id = None; } - self.api.clear_timer(*token).expect("Timer clear succeed"); - token_info.request_id = None; } + self.dismiss_request(from, id); + self.on_body_response(hashes, bodies); + self.check_sync_variable(); } - self.dismiss_request(from, id); - self.on_body_response(hashes, bodies); - self.check_sync_variable(); - } - ResponseMessage::StateChunk(..) => unimplemented!(), + ResponseMessage::StateChunk(..) 
=> unimplemented!(), + }, } } } From 5b274a4eca4e183fdd5568cafa0d847b845a600c Mon Sep 17 00:00:00 2001 From: Joonmo Yang Date: Tue, 12 Nov 2019 18:00:02 +0900 Subject: [PATCH 4/5] Fetch snapshot header from peers --- sync/src/block/extension.rs | 258 +++++++++++++++++++++--------------- 1 file changed, 154 insertions(+), 104 deletions(-) diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 86141b62f0..545595de0e 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -57,7 +57,7 @@ pub struct TokenInfo { #[derive(Debug)] enum State { - SnapshotHeader(BlockHash), + SnapshotHeader(BlockHash, u64), SnapshotBody(BlockHash), SnapshotTopChunk(H256), SnapshotShardChunk(ShardId, H256), @@ -72,7 +72,7 @@ impl State { }; let header = match client.block_header(&num.into()) { Some(h) if h.hash() == hash => h, - _ => return State::SnapshotHeader(hash), + _ => return State::SnapshotHeader(hash, num), }; if client.block_body(&hash.into()).is_none() { return State::SnapshotBody(hash) @@ -374,28 +374,38 @@ impl NetworkExtension for Extension { fn on_timeout(&mut self, token: TimerToken) { match token { - SYNC_TIMER_TOKEN => match self.state { - State::SnapshotHeader(..) => unimplemented!(), - State::SnapshotBody(..) => unimplemented!(), - State::SnapshotTopChunk(..) => unimplemented!(), - State::SnapshotShardChunk(..) 
=> unimplemented!(), - State::Full => { - let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect(); - peer_ids.shuffle(&mut thread_rng()); - - for id in &peer_ids { - let request = self.header_downloaders.get_mut(id).and_then(HeaderDownloader::create_request); - if let Some(request) = request { - self.send_header_request(id, request); - break + SYNC_TIMER_TOKEN => { + let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect(); + peer_ids.shuffle(&mut thread_rng()); + + match self.state { + State::SnapshotHeader(_, num) => { + for id in &peer_ids { + self.send_header_request(id, RequestMessage::Headers { + start_number: num, + max_count: 1, + }); } } + State::SnapshotBody(..) => unimplemented!(), + State::SnapshotTopChunk(..) => unimplemented!(), + State::SnapshotShardChunk(..) => unimplemented!(), + State::Full => { + for id in &peer_ids { + let request = + self.header_downloaders.get_mut(id).and_then(HeaderDownloader::create_request); + if let Some(request) = request { + self.send_header_request(id, request); + break + } + } - for id in &peer_ids { - self.send_body_request(id); + for id in &peer_ids { + self.send_body_request(id); + } } } - }, + } SYNC_EXPIRE_TOKEN_BEGIN..=SYNC_EXPIRE_TOKEN_END => { self.check_sync_variable(); let (id, request_id) = { @@ -471,31 +481,51 @@ pub enum Event { impl Extension { fn new_headers(&mut self, imported: Vec, enacted: Vec, retracted: Vec) { - for peer in self.header_downloaders.values_mut() { - peer.mark_as_imported(imported.clone()); - } - let mut headers_to_download: Vec<_> = enacted - .into_iter() - .map(|hash| self.client.block_header(&BlockId::Hash(hash)).expect("Enacted header must exist")) - .collect(); - headers_to_download.sort_unstable_by_key(EncodedHeader::number); - #[allow(clippy::redundant_closure)] - // False alarm. 
https://github.com/rust-lang/rust-clippy/issues/1439 - headers_to_download.dedup_by_key(|h| h.hash()); + if let Some(next_state) = match self.state { + State::SnapshotHeader(hash, ..) => { + if imported.contains(&hash) { + let header = self.client.block_header(&BlockId::Hash(hash)).expect("Imported header must exist"); + Some(State::SnapshotTopChunk(header.state_root())) + } else { + None + } + } + State::SnapshotBody(..) => unimplemented!(), + State::SnapshotTopChunk(..) => unimplemented!(), + State::SnapshotShardChunk(..) => unimplemented!(), + State::Full => { + for peer in self.header_downloaders.values_mut() { + peer.mark_as_imported(imported.clone()); + } - let headers: Vec<_> = headers_to_download - .into_iter() - .filter(|header| self.client.block_body(&BlockId::Hash(header.hash())).is_none()) - .collect(); // FIXME: No need to collect here if self is not borrowed. - for header in headers { - let parent = self - .client - .block_header(&BlockId::Hash(header.parent_hash())) - .expect("Enacted header must have parent"); - let is_empty = header.transactions_root() == parent.transactions_root(); - self.body_downloader.add_target(&header.decode(), is_empty); + let mut headers_to_download: Vec<_> = enacted + .into_iter() + .map(|hash| self.client.block_header(&BlockId::Hash(hash)).expect("Enacted header must exist")) + .collect(); + headers_to_download.sort_unstable_by_key(EncodedHeader::number); + #[allow(clippy::redundant_closure)] + // False alarm. https://github.com/rust-lang/rust-clippy/issues/1439 + headers_to_download.dedup_by_key(|h| h.hash()); + + let headers: Vec<_> = headers_to_download + .into_iter() + .filter(|header| self.client.block_body(&BlockId::Hash(header.hash())).is_none()) + .collect(); // FIXME: No need to collect here if self is not borrowed. 
+ for header in headers { + let parent = self + .client + .block_header(&BlockId::Hash(header.parent_hash())) + .expect("Enacted header must have parent"); + let is_empty = header.transactions_root() == parent.transactions_root(); + self.body_downloader.add_target(&header.decode(), is_empty); + } + self.body_downloader.remove_target(&retracted); + None + } + } { + cdebug!(SYNC, "Transitioning state to {:?}", next_state); + self.state = next_state; } - self.body_downloader.remove_target(&retracted); } fn new_blocks(&mut self, imported: Vec, invalid: Vec) { @@ -621,39 +651,33 @@ impl Extension { return } - match self.state { - State::SnapshotHeader(..) => unimplemented!(), - State::SnapshotBody(..) => unimplemented!(), - State::SnapshotTopChunk(..) => unimplemented!(), - State::SnapshotShardChunk(..) => unimplemented!(), - State::Full => match response { - ResponseMessage::Headers(headers) => { - self.dismiss_request(from, id); - self.on_header_response(from, &headers) - } - ResponseMessage::Bodies(bodies) => { - self.check_sync_variable(); - let hashes = match request { - RequestMessage::Bodies(hashes) => hashes, - _ => unreachable!(), - }; - assert_eq!(bodies.len(), hashes.len()); - if let Some(token) = self.tokens.get(from) { - if let Some(token_info) = self.tokens_info.get_mut(token) { - if token_info.request_id.is_none() { - ctrace!(SYNC, "Expired before handling response"); - return - } - self.api.clear_timer(*token).expect("Timer clear succeed"); - token_info.request_id = None; + match response { + ResponseMessage::Headers(headers) => { + self.dismiss_request(from, id); + self.on_header_response(from, &headers) + } + ResponseMessage::Bodies(bodies) => { + self.check_sync_variable(); + let hashes = match request { + RequestMessage::Bodies(hashes) => hashes, + _ => unreachable!(), + }; + assert_eq!(bodies.len(), hashes.len()); + if let Some(token) = self.tokens.get(from) { + if let Some(token_info) = self.tokens_info.get_mut(token) { + if 
token_info.request_id.is_none() { + ctrace!(SYNC, "Expired before handling response"); + return } + self.api.clear_timer(*token).expect("Timer clear succeed"); + token_info.request_id = None; } - self.dismiss_request(from, id); - self.on_body_response(hashes, bodies); - self.check_sync_variable(); } - ResponseMessage::StateChunk(..) => unimplemented!(), - }, + self.dismiss_request(from, id); + self.on_body_response(hashes, bodies); + self.check_sync_variable(); + } + ResponseMessage::StateChunk(..) => unimplemented!(), } } } @@ -722,40 +746,66 @@ impl Extension { fn on_header_response(&mut self, from: &NodeId, headers: &[Header]) { ctrace!(SYNC, "Received header response from({}) with length({})", from, headers.len()); - let (mut completed, peer_is_caught_up) = if let Some(peer) = self.header_downloaders.get_mut(from) { - let encoded: Vec<_> = headers.iter().map(|h| EncodedHeader::new(h.rlp_bytes().to_vec())).collect(); - peer.import_headers(&encoded); - (peer.downloaded(), peer.is_caught_up()) - } else { - (Vec::new(), true) - }; - completed.sort_unstable_by_key(EncodedHeader::number); - - let mut exists = Vec::new(); - let mut queued = Vec::new(); - - for header in completed { - let hash = header.hash(); - match self.client.import_header(header.clone().into_inner()) { - Err(BlockImportError::Import(ImportError::AlreadyInChain)) => exists.push(hash), - Err(BlockImportError::Import(ImportError::AlreadyQueued)) => queued.push(hash), - // FIXME: handle import errors - Err(err) => { - cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err); - break + match self.state { + State::SnapshotHeader(hash, _) => match headers { + [header] if header.hash() == hash => { + match self.client.import_header(header.rlp_bytes().to_vec()) { + Ok(_) | Err(BlockImportError::Import(ImportError::AlreadyInChain)) => {} + Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {} + // FIXME: handle import errors + Err(err) => { + cwarn!(SYNC, "Cannot import header({}): 
{:?}", header.hash(), err); + } + } } - _ => {} - } - } + _ => cdebug!( + SYNC, + "Peer {} responded with a invalid response. requested hash: {}, response length: {}", + from, + hash, + headers.len() + ), + }, + State::SnapshotBody(..) => {} + State::SnapshotTopChunk(..) => {} + State::SnapshotShardChunk(..) => {} + State::Full => { + let (mut completed, peer_is_caught_up) = if let Some(peer) = self.header_downloaders.get_mut(from) { + let encoded: Vec<_> = headers.iter().map(|h| EncodedHeader::new(h.rlp_bytes().to_vec())).collect(); + peer.import_headers(&encoded); + (peer.downloaded(), peer.is_caught_up()) + } else { + (Vec::new(), true) + }; + completed.sort_unstable_by_key(EncodedHeader::number); - let request = self.header_downloaders.get_mut(from).and_then(|peer| { - peer.mark_as_queued(queued); - peer.mark_as_imported(exists); - peer.create_request() - }); - if !peer_is_caught_up { - if let Some(request) = request { - self.send_header_request(from, request); + let mut exists = Vec::new(); + let mut queued = Vec::new(); + + for header in completed { + let hash = header.hash(); + match self.client.import_header(header.clone().into_inner()) { + Err(BlockImportError::Import(ImportError::AlreadyInChain)) => exists.push(hash), + Err(BlockImportError::Import(ImportError::AlreadyQueued)) => queued.push(hash), + // FIXME: handle import errors + Err(err) => { + cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err); + break + } + _ => {} + } + } + + let request = self.header_downloaders.get_mut(from).and_then(|peer| { + peer.mark_as_queued(queued); + peer.mark_as_imported(exists); + peer.create_request() + }); + if !peer_is_caught_up { + if let Some(request) = request { + self.send_header_request(from, request); + } + } } } } From 6b904c31691e22dd6b37f5b29369bb44f7e53d13 Mon Sep 17 00:00:00 2001 From: Joonmo Yang Date: Thu, 14 Nov 2019 11:49:55 +0900 Subject: [PATCH 5/5] Make the snapshot header importable without parent --- 
core/src/blockchain/blockchain.rs | 12 +++++++ core/src/blockchain/headerchain.rs | 42 +++++++++++++++++++++++++ core/src/client/client.rs | 11 ++++++- core/src/client/importer.rs | 19 ++++++++++- core/src/client/mod.rs | 6 +++- core/src/client/test_client.rs | 4 +++ core/src/consensus/tendermint/worker.rs | 4 ++- foundry/run_node.rs | 2 +- sync/src/block/extension.rs | 12 ++++--- 9 files changed, 102 insertions(+), 10 deletions(-) diff --git a/core/src/blockchain/blockchain.rs b/core/src/blockchain/blockchain.rs index c0a8bcfb43..5bf339dcf0 100644 --- a/core/src/blockchain/blockchain.rs +++ b/core/src/blockchain/blockchain.rs @@ -98,6 +98,18 @@ impl BlockChain { } } + pub fn insert_bootstrap_header(&self, batch: &mut DBTransaction, header: &HeaderView) { + self.headerchain.insert_bootstrap_header(batch, header); + + let hash = header.hash(); + + *self.pending_best_block_hash.write() = Some(hash); + batch.put(db::COL_EXTRA, BEST_BLOCK_KEY, &hash); + + *self.pending_best_proposal_block_hash.write() = Some(hash); + batch.put(db::COL_EXTRA, BEST_PROPOSAL_BLOCK_KEY, &hash); + } + pub fn insert_header( &self, batch: &mut DBTransaction, diff --git a/core/src/blockchain/headerchain.rs b/core/src/blockchain/headerchain.rs index c5b1bb0149..bd41b882bb 100644 --- a/core/src/blockchain/headerchain.rs +++ b/core/src/blockchain/headerchain.rs @@ -115,6 +115,48 @@ impl HeaderChain { } } + /// Inserts a bootstrap header into backing cache database. + /// Makes the imported header the best header. + /// Expects the header to be valid and already verified. + /// If the header is already known, does nothing. + // FIXME: Find better return type. 
Returning `None` at duplication is not natural + pub fn insert_bootstrap_header(&self, batch: &mut DBTransaction, header: &HeaderView) { + let hash = header.hash(); + + ctrace!(HEADERCHAIN, "Inserting bootstrap block header #{}({}) to the headerchain.", header.number(), hash); + + if self.is_known_header(&hash) { + ctrace!(HEADERCHAIN, "Block header #{}({}) is already known.", header.number(), hash); + return + } + + assert!(self.pending_best_header_hash.read().is_none()); + assert!(self.pending_best_proposal_block_hash.read().is_none()); + + let compressed_header = compress(header.rlp().as_raw(), blocks_swapper()); + batch.put(db::COL_HEADERS, &hash, &compressed_header); + + let mut new_hashes = HashMap::new(); + new_hashes.insert(header.number(), hash); + let mut new_details = HashMap::new(); + new_details.insert(hash, BlockDetails { + number: header.number(), + total_score: 0.into(), + parent: header.parent_hash(), + }); + + batch.put(db::COL_EXTRA, BEST_HEADER_KEY, &hash); + *self.pending_best_header_hash.write() = Some(hash); + batch.put(db::COL_EXTRA, BEST_PROPOSAL_HEADER_KEY, &hash); + *self.pending_best_proposal_block_hash.write() = Some(hash); + + let mut pending_hashes = self.pending_hashes.write(); + let mut pending_details = self.pending_details.write(); + + batch.extend_with_cache(db::COL_EXTRA, &mut *pending_details, new_details, CacheUpdatePolicy::Overwrite); + batch.extend_with_cache(db::COL_EXTRA, &mut *pending_hashes, new_hashes, CacheUpdatePolicy::Overwrite); + } + /// Inserts the header into backing cache database. /// Expects the header to be valid and already verified. /// If the header is already known, does nothing. 
diff --git a/core/src/client/client.rs b/core/src/client/client.rs index c5dbf07bdb..282b37f9e7 100644 --- a/core/src/client/client.rs +++ b/core/src/client/client.rs @@ -28,7 +28,7 @@ use cstate::{ }; use ctimer::{TimeoutHandler, TimerApi, TimerScheduleError, TimerToken}; use ctypes::transaction::{AssetTransferInput, PartialHashing, ShardTransaction}; -use ctypes::{BlockHash, BlockNumber, CommonParams, ShardId, Tracker, TxHash}; +use ctypes::{BlockHash, BlockNumber, CommonParams, Header, ShardId, Tracker, TxHash}; use cvm::{decode, execute, ChainTimeInfo, ScriptResult, VMConfig}; use kvdb::{DBTransaction, KeyValueDB}; use parking_lot::{Mutex, RwLock, RwLockReadGuard}; @@ -634,6 +634,15 @@ impl ImportBlock for Client { Ok(self.importer.header_queue.import(unverified)?) } + fn import_bootstrap_header(&self, header: &Header) -> Result { + if self.block_chain().is_known_header(&header.hash()) { + return Err(BlockImportError::Import(ImportError::AlreadyInChain)) + } + let import_lock = self.importer.import_lock.lock(); + self.importer.import_bootstrap_header(header, self, &import_lock); + Ok(header.hash()) + } + fn import_sealed_block(&self, block: &SealedBlock) -> ImportResult { let h = block.header().hash(); let route = { diff --git a/core/src/client/importer.rs b/core/src/client/importer.rs index 51edfef328..077e88b406 100644 --- a/core/src/client/importer.rs +++ b/core/src/client/importer.rs @@ -100,7 +100,7 @@ impl Importer { } { - let headers: Vec<&Header> = blocks.iter().map(|block| &block.header).collect(); + let headers: Vec<_> = blocks.iter().map(|block| &block.header).collect(); self.import_headers(headers, client, &import_lock); } @@ -362,6 +362,23 @@ impl Importer { imported.len() } + pub fn import_bootstrap_header<'a>(&'a self, header: &'a Header, client: &Client, _importer_lock: &MutexGuard<()>) { + let hash = header.hash(); + ctrace!(CLIENT, "Importing bootstrap header {}-{:?}", header.number(), hash); + + { + let chain = client.block_chain(); + let mut 
batch = DBTransaction::new(); + chain.insert_bootstrap_header(&mut batch, &HeaderView::new(&header.rlp_bytes())); + client.db().write_buffered(batch); + chain.commit(); + } + + client.new_headers(&[hash], &[], &[hash], &[], &[], Some(hash)); + + client.db().flush().expect("DB flush failed."); + } + fn check_header(&self, header: &Header, parent: &Header) -> bool { // FIXME: self.verifier.verify_block_family if let Err(e) = self.engine.verify_block_family(&header, &parent) { diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 62b6d11697..bc19dce254 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -38,7 +38,7 @@ use cmerkle::Result as TrieResult; use cnetwork::NodeId; use cstate::{AssetScheme, FindActionHandler, OwnedAsset, StateResult, Text, TopLevelState, TopStateView}; use ctypes::transaction::{AssetTransferInput, PartialHashing, ShardTransaction}; -use ctypes::{BlockHash, BlockNumber, CommonParams, ShardId, Tracker, TxHash}; +use ctypes::{BlockHash, BlockNumber, CommonParams, Header, ShardId, Tracker, TxHash}; use cvm::ChainTimeInfo; use kvdb::KeyValueDB; use primitives::{Bytes, H160, H256, U256}; @@ -196,6 +196,10 @@ pub trait ImportBlock { /// Import a header into the blockchain fn import_header(&self, bytes: Bytes) -> Result; + /// Import a trusted bootstrap header into the blockchain + /// Bootstrap headers don't execute any verifications + fn import_bootstrap_header(&self, bytes: &Header) -> Result; + /// Import sealed block. Skips all verifications. 
fn import_sealed_block(&self, block: &SealedBlock) -> ImportResult; diff --git a/core/src/client/test_client.rs b/core/src/client/test_client.rs index f8b0252119..723a294c3d 100644 --- a/core/src/client/test_client.rs +++ b/core/src/client/test_client.rs @@ -509,6 +509,10 @@ impl ImportBlock for TestBlockChainClient { unimplemented!() } + fn import_bootstrap_header(&self, _header: &BlockHeader) -> Result { + unimplemented!() + } + fn import_sealed_block(&self, _block: &SealedBlock) -> ImportResult { Ok(H256::default().into()) } diff --git a/core/src/consensus/tendermint/worker.rs b/core/src/consensus/tendermint/worker.rs index c6a93ef6b0..7453142639 100644 --- a/core/src/consensus/tendermint/worker.rs +++ b/core/src/consensus/tendermint/worker.rs @@ -52,6 +52,7 @@ use crate::encoded; use crate::error::{BlockError, Error}; use crate::snapshot_notify::NotifySender as SnapshotNotifySender; use crate::transaction::{SignedTransaction, UnverifiedTransaction}; +use crate::types::BlockStatus; use crate::views::BlockView; use crate::BlockId; use std::cell::Cell; @@ -965,7 +966,8 @@ impl Worker { } fn on_imported_proposal(&mut self, proposal: &Header) { - if proposal.number() < 1 { + // NOTE: Only the genesis block and the snapshot target don't have the parent in the blockchain + if self.client().block_status(&BlockId::Hash(*proposal.parent_hash())) == BlockStatus::Unknown { return } diff --git a/foundry/run_node.rs b/foundry/run_node.rs index cba017ee69..af07ecc9d4 100644 --- a/foundry/run_node.rs +++ b/foundry/run_node.rs @@ -264,7 +264,7 @@ pub fn run_node(matches: &ArgMatches<'_>) -> Result<(), String> { let network_config = config.network_config()?; // XXX: What should we do if the network id has been changed. 
let c = client.client(); - let network_id = c.common_params(BlockId::Latest).unwrap().network_id(); + let network_id = c.common_params(BlockId::Number(0)).unwrap().network_id(); let routing_table = RoutingTable::new(); let service = network_start(network_id, timer_loop, &network_config, Arc::clone(&routing_table))?; diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 545595de0e..44136cd8c5 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -129,10 +129,12 @@ impl Extension { let mut header = client.best_header(); let mut hollow_headers = vec![header.decode()]; while client.block_body(&BlockId::Hash(header.hash())).is_none() { - header = client - .block_header(&BlockId::Hash(header.parent_hash())) - .expect("Every imported header must have parent"); - hollow_headers.push(header.decode()); + if let Some(h) = client.block_header(&BlockId::Hash(header.parent_hash())) { + header = h; + hollow_headers.push(header.decode()); + } else { + break + } } let mut body_downloader = BodyDownloader::default(); for neighbors in hollow_headers.windows(2).rev() { @@ -749,7 +751,7 @@ impl Extension { match self.state { State::SnapshotHeader(hash, _) => match headers { [header] if header.hash() == hash => { - match self.client.import_header(header.rlp_bytes().to_vec()) { + match self.client.import_bootstrap_header(&header) { Ok(_) | Err(BlockImportError::Import(ImportError::AlreadyInChain)) => {} Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {} // FIXME: handle import errors