From 28ed502725c55bce04b461e912b97d831a665ad4 Mon Sep 17 00:00:00 2001 From: Joonmo Yang Date: Tue, 12 Nov 2019 18:00:02 +0900 Subject: [PATCH 1/2] Fetch snapshot header from peers --- sync/src/block/extension.rs | 286 ++++++++++++++++++++++-------------- 1 file changed, 173 insertions(+), 113 deletions(-) diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 76c6c8c957..5f441af6d3 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -57,8 +57,9 @@ pub struct TokenInfo { request_id: Option, } +#[derive(Debug)] enum State { - SnapshotHeader(H256), + SnapshotHeader(BlockHash, u64), SnapshotChunk(H256), Full, } @@ -90,10 +91,11 @@ impl Extension { _ => State::SnapshotChunk(*header.hash()), } } - _ => State::SnapshotHeader(hash), + _ => State::SnapshotHeader(hash.into(), num), }, None => State::Full, }; + cdebug!(SYNC, "Initial state is {:?}", state); let mut header = client.best_header(); let mut hollow_headers = vec![header.decode()]; while client.block_body(&BlockId::Hash(header.hash())).is_none() { @@ -309,35 +311,45 @@ impl NetworkExtension for Extension { fn on_timeout(&mut self, token: TimerToken) { match token { - SYNC_TIMER_TOKEN => match self.state { - State::SnapshotHeader(..) => unimplemented!(), - State::SnapshotChunk(..) => unimplemented!(), - State::Full => { - let best_proposal_score = self.client.chain_info().best_proposal_score; - let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect(); - peer_ids.shuffle(&mut thread_rng()); - - for id in &peer_ids { - let request = self.header_downloaders.get_mut(id).and_then(HeaderDownloader::create_request); - if let Some(request) = request { - self.send_header_request(id, request); - break + SYNC_TIMER_TOKEN => { + let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect(); + peer_ids.shuffle(&mut thread_rng()); + + match self.state { + State::SnapshotHeader(_, num) => { + for id in &peer_ids { + self.send_header_request(id, RequestMessage::Headers { + start_number: num, + max_count: 1, + }); } } + State::SnapshotChunk(..) => unimplemented!(), + State::Full => { + let best_proposal_score = self.client.chain_info().best_proposal_score; + for id in &peer_ids { + let request = + self.header_downloaders.get_mut(id).and_then(HeaderDownloader::create_request); + if let Some(request) = request { + self.send_header_request(id, request); + break + } + } - for id in peer_ids { - let peer_score = if let Some(peer) = self.header_downloaders.get(&id) { - peer.total_score() - } else { - U256::zero() - }; + for id in peer_ids { + let peer_score = if let Some(peer) = self.header_downloaders.get(&id) { + peer.total_score() + } else { + U256::zero() + }; - if peer_score > best_proposal_score { - self.send_body_request(&id); + if peer_score > best_proposal_score { + self.send_body_request(&id); + } } } } - }, + } SYNC_EXPIRE_TOKEN_BEGIN..=SYNC_EXPIRE_TOKEN_END => { self.check_sync_variable(); let (id, request_id) = { @@ -413,39 +425,72 @@ pub enum Event { impl Extension { fn new_headers(&mut self, imported: Vec, enacted: Vec, retracted: Vec) { - let peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect(); - for id in peer_ids { - if let Some(peer) = self.header_downloaders.get_mut(&id) { - peer.mark_as_imported(imported.clone()); + if let Some(next_state) = match self.state { + State::SnapshotHeader(hash, ..) => { + if imported.contains(&hash) { + let header = self.client.block_header(&BlockId::Hash(hash)).expect("Imported header must exist"); + Some(State::SnapshotChunk(header.state_root())) + } else { + None + } } + State::SnapshotChunk(..) => unimplemented!(), + State::Full => { + let peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect(); + for id in peer_ids { + if let Some(peer) = self.header_downloaders.get_mut(&id) { + peer.mark_as_imported(imported.clone()); + } + } + let mut headers_to_download: Vec<_> = enacted + .into_iter() + .map(|hash| self.client.block_header(&BlockId::Hash(hash)).expect("Enacted header must exist")) + .collect(); + headers_to_download.sort_unstable_by_key(EncodedHeader::number); + #[allow(clippy::redundant_closure)] + // False alarm. https://github.com/rust-lang/rust-clippy/issues/1439 + headers_to_download.dedup_by_key(|h| h.hash()); + + let headers: Vec<_> = headers_to_download + .into_iter() + .filter(|header| self.client.block_body(&BlockId::Hash(header.hash())).is_none()) + .collect(); // FIXME: No need to collect here if self is not borrowed. + for header in headers { + let parent = self + .client + .block_header(&BlockId::Hash(header.parent_hash())) + .expect("Enacted header must have parent"); + self.body_downloader.add_target(&header.decode(), &parent.decode()); + } + self.body_downloader.remove_target(&retracted); + None + } + } { + cdebug!(SYNC, "Transitioning state to {:?}", next_state); + self.state = next_state; } - let mut headers_to_download: Vec<_> = enacted - .into_iter() - .map(|hash| self.client.block_header(&BlockId::Hash(hash)).expect("Enacted header must exist")) - .collect(); - headers_to_download.sort_unstable_by_key(EncodedHeader::number); - #[allow(clippy::redundant_closure)] - // False alarm. https://github.com/rust-lang/rust-clippy/issues/1439 - headers_to_download.dedup_by_key(|h| h.hash()); - - let headers: Vec<_> = headers_to_download - .into_iter() - .filter(|header| self.client.block_body(&BlockId::Hash(header.hash())).is_none()) - .collect(); // FIXME: No need to collect here if self is not borrowed. - for header in headers { - let parent = self - .client - .block_header(&BlockId::Hash(header.parent_hash())) - .expect("Enacted header must have parent"); - self.body_downloader.add_target(&header.decode(), &parent.decode()); - } - self.body_downloader.remove_target(&retracted); } fn new_blocks(&mut self, imported: Vec, invalid: Vec) { - self.body_downloader.remove_target(&imported); - self.body_downloader.remove_target(&invalid); - + if let Some(next_state) = match self.state { + State::SnapshotHeader(hash, ..) => { + if imported.contains(&hash) { + let header = self.client.block_header(&BlockId::Hash(hash)).expect("Imported header must exist"); + Some(State::SnapshotChunk(header.state_root())) + } else { + None + } + } + State::SnapshotChunk(..) => None, + State::Full => { + self.body_downloader.remove_target(&imported); + self.body_downloader.remove_target(&invalid); + None + } + } { + cdebug!(SYNC, "Transitioning state to {:?}", next_state); + self.state = next_state; + } let chain_info = self.client.chain_info(); @@ -599,37 +644,33 @@ impl Extension { return } - match self.state { - State::SnapshotHeader(..) => unimplemented!(), - State::SnapshotChunk(..) => unimplemented!(), - State::Full => match response { - ResponseMessage::Headers(headers) => { - self.dismiss_request(from, id); - self.on_header_response(from, &headers) - } - ResponseMessage::Bodies(bodies) => { - self.check_sync_variable(); - let hashes = match request { - RequestMessage::Bodies(hashes) => hashes, - _ => unreachable!(), - }; - assert_eq!(bodies.len(), hashes.len()); - if let Some(token) = self.tokens.get(from) { - if let Some(token_info) = self.tokens_info.get_mut(token) { - if token_info.request_id.is_none() { - ctrace!(SYNC, "Expired before handling response"); - return - } - self.api.clear_timer(*token).expect("Timer clear succeed"); - token_info.request_id = None; + match response { + ResponseMessage::Headers(headers) => { + self.dismiss_request(from, id); + self.on_header_response(from, &headers) + } + ResponseMessage::Bodies(bodies) => { + self.check_sync_variable(); + let hashes = match request { + RequestMessage::Bodies(hashes) => hashes, + _ => unreachable!(), + }; + assert_eq!(bodies.len(), hashes.len()); + if let Some(token) = self.tokens.get(from) { + if let Some(token_info) = self.tokens_info.get_mut(token) { + if token_info.request_id.is_none() { + ctrace!(SYNC, "Expired before handling response"); + return } + self.api.clear_timer(*token).expect("Timer clear succeed"); + token_info.request_id = None; } - self.dismiss_request(from, id); - self.on_body_response(hashes, bodies); - self.check_sync_variable(); } - _ => unimplemented!(), - }, + self.dismiss_request(from, id); + self.on_body_response(hashes, bodies); + self.check_sync_variable(); + } + _ => unimplemented!(), } } } @@ -699,42 +740,61 @@ impl Extension { fn on_header_response(&mut self, from: &NodeId, headers: &[Header]) { ctrace!(SYNC, "Received header response from({}) with length({})", from, headers.len()); - let (mut completed, pivot_score_changed) = if let Some(peer) = self.header_downloaders.get_mut(from) { - let before_pivot_score = peer.pivot_score(); - let encoded: Vec<_> = headers.iter().map(|h| EncodedHeader::new(h.rlp_bytes().to_vec())).collect(); - peer.import_headers(&encoded); - let after_pivot_score = peer.pivot_score(); - (peer.downloaded(), before_pivot_score != after_pivot_score) - } else { - (Vec::new(), false) - }; - completed.sort_unstable_by_key(EncodedHeader::number); - - let mut exists = Vec::new(); - let mut queued = Vec::new(); - - for header in completed { - let hash = header.hash(); - match self.client.import_header(header.clone().into_inner()) { - Err(BlockImportError::Import(ImportError::AlreadyInChain)) => exists.push(hash), - Err(BlockImportError::Import(ImportError::AlreadyQueued)) => queued.push(hash), - // FIXME: handle import errors - Err(err) => { - cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err); - break + match self.state { + State::SnapshotHeader(..) => { + for header in headers { + match self.client.import_header(header.rlp_bytes().to_vec()) { + Err(BlockImportError::Import(ImportError::AlreadyInChain)) => {} + Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {} + // FIXME: handle import errors + Err(err) => { + cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err); + break + } + _ => {} + } } - _ => {} } - } + State::SnapshotChunk(..) => {} + State::Full => { + let (mut completed, pivot_score_changed) = if let Some(peer) = self.header_downloaders.get_mut(from) { + let before_pivot_score = peer.pivot_score(); + let encoded: Vec<_> = headers.iter().map(|h| EncodedHeader::new(h.rlp_bytes().to_vec())).collect(); + peer.import_headers(&encoded); + let after_pivot_score = peer.pivot_score(); + (peer.downloaded(), before_pivot_score != after_pivot_score) + } else { + (Vec::new(), false) + }; + completed.sort_unstable_by_key(EncodedHeader::number); - let request = self.header_downloaders.get_mut(from).and_then(|peer| { - peer.mark_as_queued(queued); - peer.mark_as_imported(exists); - peer.create_request() - }); - if pivot_score_changed { - if let Some(request) = request { - self.send_header_request(from, request); + let mut exists = Vec::new(); + let mut queued = Vec::new(); + + for header in completed { + let hash = header.hash(); + match self.client.import_header(header.clone().into_inner()) { + Err(BlockImportError::Import(ImportError::AlreadyInChain)) => exists.push(hash), + Err(BlockImportError::Import(ImportError::AlreadyQueued)) => queued.push(hash), + // FIXME: handle import errors + Err(err) => { + cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err); + break + } + _ => {} + } + } + + let request = self.header_downloaders.get_mut(from).and_then(|peer| { + peer.mark_as_queued(queued); + peer.mark_as_imported(exists); + peer.create_request() + }); + if pivot_score_changed { + if let Some(request) = request { + self.send_header_request(from, request); + } + } } } } From 91087564836735848cd1f7dbfcffb75dc39c2eb2 Mon Sep 17 00:00:00 2001 From: Joonmo Yang Date: Thu, 14 Nov 2019 11:49:55 +0900 Subject: [PATCH 2/2] Make the snapshot header importable without parent --- codechain/run_node.rs | 2 +- core/src/blockchain/blockchain.rs | 12 +++++++++ core/src/blockchain/headerchain.rs | 42 ++++++++++++++++++++++++++++++ core/src/client/client.rs | 11 +++++++- core/src/client/importer.rs | 20 +++++++++++++- core/src/client/mod.rs | 6 ++++- core/src/client/test_client.rs | 4 +++ sync/src/block/extension.rs | 12 +++++---- 8 files changed, 100 insertions(+), 9 deletions(-) diff --git a/codechain/run_node.rs b/codechain/run_node.rs index acb37e572c..ea31281846 100644 --- a/codechain/run_node.rs +++ b/codechain/run_node.rs @@ -287,7 +287,7 @@ pub fn run_node(matches: &ArgMatches) -> Result<(), String> { let network_config = config.network_config()?; // XXX: What should we do if the network id has been changed. let c = client.client(); - let network_id = c.common_params(BlockId::Latest).unwrap().network_id(); + let network_id = c.common_params(BlockId::Number(0)).unwrap().network_id(); let routing_table = RoutingTable::new(); let service = network_start(network_id, timer_loop, &network_config, Arc::clone(&routing_table))?; diff --git a/core/src/blockchain/blockchain.rs b/core/src/blockchain/blockchain.rs index 4d7ee18845..4bd57be9ec 100644 --- a/core/src/blockchain/blockchain.rs +++ b/core/src/blockchain/blockchain.rs @@ -98,6 +98,18 @@ impl BlockChain { } } + pub fn insert_bootstrap_header(&self, batch: &mut DBTransaction, header: &HeaderView) { + self.headerchain.insert_bootstrap_header(batch, header); + + let hash = header.hash(); + + *self.pending_best_block_hash.write() = Some(hash); + batch.put(db::COL_EXTRA, BEST_BLOCK_KEY, &hash); + + *self.pending_best_proposal_block_hash.write() = Some(hash); + batch.put(db::COL_EXTRA, BEST_PROPOSAL_BLOCK_KEY, &hash); + } + pub fn insert_header( &self, batch: &mut DBTransaction, diff --git a/core/src/blockchain/headerchain.rs b/core/src/blockchain/headerchain.rs index 2451bfc0b6..9bd88f4787 100644 --- a/core/src/blockchain/headerchain.rs +++ b/core/src/blockchain/headerchain.rs @@ -115,6 +115,48 @@ impl HeaderChain { } } + /// Inserts a bootstrap header into backing cache database. + /// Makes the imported header the best header. + /// Expects the header to be valid and already verified. + /// If the header is already known, does nothing. + // FIXME: Find better return type. Returning `None` at duplication is not natural + pub fn insert_bootstrap_header(&self, batch: &mut DBTransaction, header: &HeaderView) { + let hash = header.hash(); + + ctrace!(HEADERCHAIN, "Inserting bootstrap block header #{}({}) to the headerchain.", header.number(), hash); + + if self.is_known_header(&hash) { + ctrace!(HEADERCHAIN, "Block header #{}({}) is already known.", header.number(), hash); + return + } + + assert!(self.pending_best_header_hash.read().is_none()); + assert!(self.pending_best_proposal_block_hash.read().is_none()); + + let compressed_header = compress(header.rlp().as_raw(), blocks_swapper()); + batch.put(db::COL_HEADERS, &hash, &compressed_header); + + let mut new_hashes = HashMap::new(); + new_hashes.insert(header.number(), hash); + let mut new_details = HashMap::new(); + new_details.insert(hash, BlockDetails { + number: header.number(), + total_score: 0.into(), + parent: header.parent_hash(), + }); + + batch.put(db::COL_EXTRA, BEST_HEADER_KEY, &hash); + *self.pending_best_header_hash.write() = Some(hash); + batch.put(db::COL_EXTRA, BEST_PROPOSAL_HEADER_KEY, &hash); + *self.pending_best_proposal_block_hash.write() = Some(hash); + + let mut pending_hashes = self.pending_hashes.write(); + let mut pending_details = self.pending_details.write(); + + batch.extend_with_cache(db::COL_EXTRA, &mut *pending_details, new_details, CacheUpdatePolicy::Overwrite); + batch.extend_with_cache(db::COL_EXTRA, &mut *pending_hashes, new_hashes, CacheUpdatePolicy::Overwrite); + } + /// Inserts the header into backing cache database. /// Expects the header to be valid and already verified. /// If the header is already known, does nothing. diff --git a/core/src/client/client.rs b/core/src/client/client.rs index add839f9fb..3f6306bc66 100644 --- a/core/src/client/client.rs +++ b/core/src/client/client.rs @@ -28,7 +28,7 @@ use cstate::{ }; use ctimer::{TimeoutHandler, TimerApi, TimerScheduleError, TimerToken}; use ctypes::transaction::{AssetTransferInput, PartialHashing, ShardTransaction}; -use ctypes::{BlockHash, BlockNumber, CommonParams, ShardId, Tracker, TxHash}; +use ctypes::{BlockHash, BlockNumber, CommonParams, Header, ShardId, Tracker, TxHash}; use cvm::{decode, execute, ChainTimeInfo, ScriptResult, VMConfig}; use hashdb::AsHashDB; use journaldb; @@ -664,6 +664,15 @@ impl ImportBlock for Client { Ok(self.importer.header_queue.import(unverified)?) } + fn import_bootstrap_header(&self, header: &Header) -> Result { + if self.block_chain().is_known_header(&header.hash()) { + return Err(BlockImportError::Import(ImportError::AlreadyInChain)) + } + let import_lock = self.importer.import_lock.lock(); + self.importer.import_bootstrap_header(header, self, &import_lock); + Ok(header.hash()) + } + fn import_sealed_block(&self, block: &SealedBlock) -> ImportResult { let h = block.header().hash(); let start = Instant::now(); diff --git a/core/src/client/importer.rs b/core/src/client/importer.rs index b9bf9479a5..36a253965e 100644 --- a/core/src/client/importer.rs +++ b/core/src/client/importer.rs @@ -101,12 +101,13 @@ impl Importer { } { - let headers: Vec<&Header> = blocks.iter().map(|block| &block.header).collect(); + let headers: Vec<_> = blocks.iter().map(|block| &block.header).collect(); self.import_headers(headers, client, &import_lock); } let start = Instant::now(); + // NOTE: There are no situation importing "trusted block"s. for block in blocks { let header = &block.header; ctrace!(CLIENT, "Importing block {}", header.number()); @@ -370,6 +371,23 @@ impl Importer { imported.len() } + pub fn import_bootstrap_header<'a>(&'a self, header: &'a Header, client: &Client, _importer_lock: &MutexGuard<()>) { + let hash = header.hash(); + ctrace!(CLIENT, "Importing bootstrap header {}-{:?}", header.number(), hash); + + { + let chain = client.block_chain(); + let mut batch = DBTransaction::new(); + chain.insert_bootstrap_header(&mut batch, &HeaderView::new(&header.rlp_bytes())); + client.db().write_buffered(batch); + chain.commit(); + } + + client.new_headers(&[hash], &[], &[hash], &[], &[], 0, Some(hash)); + + client.db().flush().expect("DB flush failed."); + } + fn check_header(&self, header: &Header, parent: &Header) -> bool { // FIXME: self.verifier.verify_block_family if let Err(e) = self.engine.verify_block_family(&header, &parent) { diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 6b565e903a..ac14869656 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -37,7 +37,7 @@ use cmerkle::Result as TrieResult; use cnetwork::NodeId; use cstate::{AssetScheme, FindActionHandler, OwnedAsset, StateResult, Text, TopLevelState, TopStateView}; use ctypes::transaction::{AssetTransferInput, PartialHashing, ShardTransaction}; -use ctypes::{BlockHash, BlockNumber, CommonParams, ShardId, Tracker, TxHash}; +use ctypes::{BlockHash, BlockNumber, CommonParams, Header, ShardId, Tracker, TxHash}; use cvm::ChainTimeInfo; use kvdb::KeyValueDB; use primitives::{Bytes, H160, H256, U256}; @@ -200,6 +200,10 @@ pub trait ImportBlock { /// Import a header into the blockchain fn import_header(&self, bytes: Bytes) -> Result; + /// Import a trusted bootstrap header into the blockchain + /// Bootstrap headers don't execute any verifications + fn import_bootstrap_header(&self, bytes: &Header) -> Result; + /// Import sealed block. Skips all verifications. fn import_sealed_block(&self, block: &SealedBlock) -> ImportResult; diff --git a/core/src/client/test_client.rs b/core/src/client/test_client.rs index 8b61a8f443..ed7e69d834 100644 --- a/core/src/client/test_client.rs +++ b/core/src/client/test_client.rs @@ -510,6 +510,10 @@ impl ImportBlock for TestBlockChainClient { unimplemented!() } + fn import_bootstrap_header(&self, _header: &BlockHeader) -> Result { + unimplemented!() + } + fn import_sealed_block(&self, _block: &SealedBlock) -> ImportResult { Ok(H256::default().into()) } diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 5f441af6d3..28e356c3e8 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -99,10 +99,12 @@ impl Extension { let mut header = client.best_header(); let mut hollow_headers = vec![header.decode()]; while client.block_body(&BlockId::Hash(header.hash())).is_none() { - header = client - .block_header(&BlockId::Hash(header.parent_hash())) - .expect("Every imported header must have parent"); - hollow_headers.push(header.decode()); + if let Some(h) = client.block_header(&BlockId::Hash(header.parent_hash())) { + header = h; + hollow_headers.push(header.decode()); + } else { + break + } } let mut body_downloader = BodyDownloader::default(); for neighbors in hollow_headers.windows(2).rev() { @@ -743,7 +745,7 @@ impl Extension { match self.state { State::SnapshotHeader(..) => { for header in headers { - match self.client.import_header(header.rlp_bytes().to_vec()) { + match self.client.import_bootstrap_header(&header) { Err(BlockImportError::Import(ImportError::AlreadyInChain)) => {} Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {} // FIXME: handle import errors