diff --git a/core/src/lib.rs b/core/src/lib.rs index 26cf864d8f..74e2674680 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -99,4 +99,4 @@ pub use crate::service::ClientService; pub use crate::transaction::{ LocalizedTransaction, PendingSignedTransactions, SignedTransaction, UnverifiedTransaction, }; -pub use crate::types::{BlockId, TransactionId}; +pub use crate::types::{BlockId, BlockStatus, TransactionId}; diff --git a/spec/Block-Synchronization-Extension.md b/spec/Block-Synchronization-Extension.md index fb5a106f75..4ae1e0fc44 100644 --- a/spec/Block-Synchronization-Extension.md +++ b/spec/Block-Synchronization-Extension.md @@ -19,13 +19,14 @@ Message := ### Status ``` -Status(total_score, best_hash, genesis_hash) +Status(nonce, best_hash, genesis_hash) ``` Send current chain status to peer. * Identifier: 0x01 -* Restriction: None +* Restriction: + * `nonce` SHOULD be monotonically increasing every time the message is sent. ## Request messages diff --git a/sync/src/block/downloader/header.rs b/sync/src/block/downloader/header.rs index 963c2135e5..1422c1acf9 100644 --- a/sync/src/block/downloader/header.rs +++ b/sync/src/block/downloader/header.rs @@ -31,21 +31,14 @@ const MAX_HEADER_QUEUE_LENGTH: usize = 1024; const MAX_RETRY: usize = 3; const MAX_WAIT: u64 = 15; -#[derive(Clone)] -struct Pivot { - hash: BlockHash, - total_score: U256, -} - #[derive(Clone)] pub struct HeaderDownloader { // NOTE: Use this member as minimum as possible. client: Arc, - total_score: U256, + nonce: U256, best_hash: BlockHash, - - pivot: Pivot, + pivot: BlockHash, request_time: Option, downloaded: HashMap, queued: HashMap, @@ -53,24 +46,15 @@ pub struct HeaderDownloader { } impl HeaderDownloader { - pub fn total_score(&self) -> U256 { - self.total_score - } - - pub fn new(client: Arc, total_score: U256, best_hash: BlockHash) -> Self { + pub fn new(client: Arc, nonce: U256, best_hash: BlockHash) -> Self { let best_header_hash = client.best_block_header().hash(); - let best_score = client.block_total_score(&BlockId::Latest).expect("Best block always exist"); Self { client, - total_score, + nonce, best_hash, - - pivot: Pivot { - hash: best_header_hash, - total_score: best_score, - }, + pivot: best_header_hash, request_time: None, downloaded: HashMap::new(), queued: HashMap::new(), @@ -78,18 +62,19 @@ impl HeaderDownloader { } } - pub fn update(&mut self, total_score: U256, best_hash: BlockHash) -> bool { - match self.total_score.cmp(&total_score) { + pub fn best_hash(&self) -> BlockHash { + self.best_hash + } + + pub fn update(&mut self, nonce: U256, best_hash: BlockHash) -> bool { + match self.nonce.cmp(&nonce) { Ordering::Equal => true, Ordering::Less => { - self.total_score = total_score; + self.nonce = nonce; self.best_hash = best_hash; if self.client.block_header(&BlockId::Hash(best_hash)).is_some() { - self.pivot = Pivot { - hash: best_hash, - total_score, - } + self.pivot = best_hash; } true } @@ -108,25 +93,25 @@ impl HeaderDownloader { /// Find header from queued headers, downloaded cache and then from blockchain /// Panics if header dosn't exist fn pivot_header(&self) -> Header { - match self.queued.get(&self.pivot.hash) { + match self.queued.get(&self.pivot) { Some(header) => header.clone(), - None => match self.downloaded.get(&self.pivot.hash) { + None => match self.downloaded.get(&self.pivot) { Some(header) => header.clone(), - None => self.client.block_header(&BlockId::Hash(self.pivot.hash)).unwrap(), + None => self.client.block_header(&BlockId::Hash(self.pivot)).unwrap(), }, } } - pub fn pivot_score(&self) -> U256 { - self.pivot.total_score - } - pub fn is_idle(&self) -> bool { - let can_request = self.request_time.is_none() && self.total_score > self.pivot.total_score; + let can_request = self.request_time.is_none() && self.best_hash != self.pivot; self.is_valid() && (can_request || self.is_expired()) } + pub fn is_caught_up(&self) -> bool { + self.pivot == self.best_hash + } + pub fn create_request(&mut self) -> Option { if !self.is_idle() { return None @@ -154,19 +139,15 @@ impl HeaderDownloader { let pivot_header = self.pivot_header(); // This happens when best_hash is imported by other peer. - if self.best_hash == self.pivot.hash { + if self.best_hash == self.pivot { ctrace!(SYNC, "Ignore received headers, pivot already reached the best hash"); - } else if first_header_hash == self.pivot.hash { + } else if first_header_hash == self.pivot { for header in headers.iter() { self.downloaded.insert(header.hash(), header.clone()); } // FIXME: skip known headers - let new_scores = headers[1..].iter().fold(U256::zero(), |acc, header| acc + header.score()); - self.pivot = Pivot { - hash: headers.last().expect("Last downloaded header must exist").hash(), - total_score: self.pivot.total_score + new_scores, - } + self.pivot = headers.last().expect("Last downloaded header must exist").hash(); } else if first_header_number < pivot_header.number() { ctrace!( SYNC, @@ -174,17 +155,14 @@ impl HeaderDownloader { ); } else if first_header_number == pivot_header.number() { if pivot_header.number() != 0 { - self.pivot = Pivot { - hash: pivot_header.parent_hash(), - total_score: self.pivot.total_score - pivot_header.score(), - } + self.pivot = pivot_header.parent_hash(); } } else { cerror!( SYNC, - "Invalid header update state. best_hash: {}, self.pivot.hash: {}, first_header_hash: {}", + "Invalid header update state. best_hash: {}, self.pivot: {}, first_header_hash: {}", self.best_hash, - self.pivot.hash, + self.pivot, first_header_hash ); } @@ -203,10 +181,7 @@ impl HeaderDownloader { self.downloaded.remove(&hash); if self.best_hash == hash { - self.pivot = Pivot { - hash, - total_score: self.total_score, - } + self.pivot = hash; } } self.queued.shrink_to_fit(); diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 28e356c3e8..798660562e 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -21,8 +21,8 @@ use std::time::Duration; use ccore::encoded::Header as EncodedHeader; use ccore::{ - Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, ChainNotify, Client, ImportBlock, ImportError, - UnverifiedTransaction, + Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, BlockStatus, ChainNotify, Client, ImportBlock, + ImportError, UnverifiedTransaction, }; use cmerkle::TrieFactory; use cnetwork::{Api, EventSender, NetworkExtension, NodeId}; @@ -76,6 +76,7 @@ pub struct Extension { client: Arc, api: Box, last_request: u64, + nonce: u64, } impl Extension { @@ -126,6 +127,7 @@ impl Extension { client, api, last_request: Default::default(), + nonce: Default::default(), } } @@ -146,6 +148,14 @@ impl Extension { } fn send_body_request(&mut self, id: &NodeId) { + if let Some(downloader) = self.header_downloaders.get(&id) { + if self.client.block_status(&BlockId::Hash(downloader.best_hash())) == BlockStatus::InChain { + // Peer is lagging behind the local blockchain. + // We don't need to request block bodies to this peer + return + } + } + self.check_sync_variable(); if let Some(requests) = self.requests.get_mut(id) { let have_body_request = { @@ -243,7 +253,7 @@ impl NetworkExtension for Extension { id, Arc::new( Message::Status { - total_score: chain_info.best_proposal_score, + nonce: U256::from(self.nonce), best_hash: chain_info.best_proposal_block_hash, genesis_hash: chain_info.genesis_hash, } @@ -251,6 +261,7 @@ impl NetworkExtension for Extension { .into_vec(), ), ); + self.nonce += 1; let t = self.connected_nodes.insert(*id); debug_assert!(t, "{} is already added to peer list", id); @@ -299,10 +310,10 @@ impl NetworkExtension for Extension { if let Ok(received_message) = UntrustedRlp::new(data).as_val() { match received_message { Message::Status { - total_score, + nonce, best_hash, genesis_hash, - } => self.on_peer_status(id, total_score, best_hash, genesis_hash), + } => self.on_peer_status(id, nonce, best_hash, genesis_hash), Message::Request(request_id, request) => self.on_peer_request(id, request_id, request), Message::Response(request_id, response) => self.on_peer_response(id, request_id, response), } @@ -328,7 +339,6 @@ impl NetworkExtension for Extension { } State::SnapshotChunk(..) => unimplemented!(), State::Full => { - let best_proposal_score = self.client.chain_info().best_proposal_score; for id in &peer_ids { let request = self.header_downloaders.get_mut(id).and_then(HeaderDownloader::create_request); @@ -339,15 +349,7 @@ impl NetworkExtension for Extension { } for id in peer_ids { - let peer_score = if let Some(peer) = self.header_downloaders.get(&id) { - peer.total_score() - } else { - U256::zero() - }; - - if peer_score > best_proposal_score { - self.send_body_request(&id); - } + self.send_body_request(&id); } } } @@ -501,7 +503,7 @@ impl Extension { id, Arc::new( Message::Status { - total_score: chain_info.best_proposal_score, + nonce: U256::from(self.nonce), best_hash: chain_info.best_proposal_block_hash, genesis_hash: chain_info.genesis_hash, } @@ -509,12 +511,13 @@ impl Extension { .into_vec(), ), ); + self.nonce += 1; } } } impl Extension { - fn on_peer_status(&mut self, from: &NodeId, total_score: U256, best_hash: BlockHash, genesis_hash: BlockHash) { + fn on_peer_status(&mut self, from: &NodeId, nonce: U256, best_hash: BlockHash, genesis_hash: BlockHash) { // Validity check if genesis_hash != self.client.chain_info().genesis_hash { cinfo!(SYNC, "Genesis hash mismatch with peer {}", from); @@ -523,17 +526,17 @@ impl Extension { match self.header_downloaders.entry(*from) { Entry::Occupied(mut peer) => { - if !peer.get_mut().update(total_score, best_hash) { + if !peer.get_mut().update(nonce, best_hash) { // FIXME: It should be an error level if the consensus is PoW. cdebug!(SYNC, "Peer #{} status updated but score is less than before", from); return } } Entry::Vacant(e) => { - e.insert(HeaderDownloader::new(self.client.clone(), total_score, best_hash)); + e.insert(HeaderDownloader::new(self.client.clone(), nonce, best_hash)); } } - cinfo!(SYNC, "Peer #{} status update: total_score: {}, best_hash: {}", from, total_score, best_hash); + cinfo!(SYNC, "Peer #{} status update: nonce: {}, best_hash: {}", from, nonce, best_hash); } fn on_peer_request(&self, from: &NodeId, id: u64, request: RequestMessage) { @@ -759,14 +762,12 @@ impl Extension { } State::SnapshotChunk(..) => {} State::Full => { - let (mut completed, pivot_score_changed) = if let Some(peer) = self.header_downloaders.get_mut(from) { - let before_pivot_score = peer.pivot_score(); + let (mut completed, peer_is_caught_up) = if let Some(peer) = self.header_downloaders.get_mut(from) { let encoded: Vec<_> = headers.iter().map(|h| EncodedHeader::new(h.rlp_bytes().to_vec())).collect(); peer.import_headers(&encoded); - let after_pivot_score = peer.pivot_score(); - (peer.downloaded(), before_pivot_score != after_pivot_score) + (peer.downloaded(), peer.is_caught_up()) } else { - (Vec::new(), false) + (Vec::new(), true) }; completed.sort_unstable_by_key(EncodedHeader::number); @@ -792,7 +793,7 @@ impl Extension { peer.mark_as_imported(exists); peer.create_request() }); - if pivot_score_changed { + if !peer_is_caught_up { if let Some(request) = request { self.send_header_request(from, request); } @@ -834,20 +835,11 @@ impl Extension { } } - let total_score = self.client.chain_info().best_proposal_score; let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect(); peer_ids.shuffle(&mut thread_rng()); for id in peer_ids { - let peer_score = if let Some(peer) = self.header_downloaders.get(&id) { - peer.total_score() - } else { - U256::zero() - }; - - if peer_score > total_score { - self.send_body_request(&id); - } + self.send_body_request(&id); } } } diff --git a/sync/src/block/message/mod.rs b/sync/src/block/message/mod.rs index 1abffe9b30..5bfa4eaea5 100644 --- a/sync/src/block/message/mod.rs +++ b/sync/src/block/message/mod.rs @@ -37,7 +37,7 @@ const MESSAGE_ID_STATE_CHUNK: u8 = 0x09; #[derive(Debug, PartialEq)] pub enum Message { Status { - total_score: U256, + nonce: U256, best_hash: BlockHash, genesis_hash: BlockHash, }, @@ -49,7 +49,7 @@ impl Encodable for Message { fn rlp_append(&self, s: &mut RlpStream) { match self { Message::Status { - total_score, + nonce, best_hash, genesis_hash, } => { @@ -57,7 +57,7 @@ impl Encodable for Message { s.append(&MESSAGE_ID_STATUS); s.begin_list(3); - s.append(total_score); + s.append(nonce); s.append(best_hash); s.append(genesis_hash); } @@ -99,7 +99,7 @@ impl Decodable for Message { } Ok(Message::Status { - total_score: message.val_at(0)?, + nonce: message.val_at(0)?, best_hash: message.val_at(1)?, genesis_hash: message.val_at(2)?, }) @@ -137,7 +137,7 @@ mod tests { #[test] fn status_message_rlp() { rlp_encode_and_decode_test!(Message::Status { - total_score: U256::default(), + nonce: U256::zero(), best_hash: H256::default().into(), genesis_hash: H256::default().into(), }); diff --git a/test/src/helper/mock/blockSyncMessage.ts b/test/src/helper/mock/blockSyncMessage.ts index 3377f0c3df..b25f1aad02 100644 --- a/test/src/helper/mock/blockSyncMessage.ts +++ b/test/src/helper/mock/blockSyncMessage.ts @@ -64,7 +64,9 @@ export class BlockSyncMessage { if (msgId === MessageType.MESSAGE_ID_STATUS) { Emitter.emit("status"); const msg = decodedmsg[1]; - const totalScore = new U256(parseInt(msg[0].toString("hex"), 16)); + const totalScore = new U256( + parseInt(msg[0].toString("hex"), 16) || 0 + ); const bestHash = new H256(msg[1].toString("hex")); const genesisHash = new H256(msg[2].toString("hex")); return new BlockSyncMessage({