Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,4 +99,4 @@ pub use crate::service::ClientService;
pub use crate::transaction::{
LocalizedTransaction, PendingSignedTransactions, SignedTransaction, UnverifiedTransaction,
};
pub use crate::types::{BlockId, TransactionId};
pub use crate::types::{BlockId, BlockStatus, TransactionId};
5 changes: 3 additions & 2 deletions spec/Block-Synchronization-Extension.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ Message :=
### Status

```
Status(total_score, best_hash, genesis_hash)
Status(nonce, best_hash, genesis_hash)
```

Send current chain status to peer.

* Identifier: 0x01
* Restriction: None
* Restriction:
* `nonce` SHOULD be monotonically increasing every time the message is sent.

## Request messages

Expand Down
81 changes: 28 additions & 53 deletions sync/src/block/downloader/header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,65 +31,50 @@ const MAX_HEADER_QUEUE_LENGTH: usize = 1024;
const MAX_RETRY: usize = 3;
const MAX_WAIT: u64 = 15;

#[derive(Clone)]
struct Pivot {
hash: BlockHash,
total_score: U256,
}

#[derive(Clone)]
pub struct HeaderDownloader {
// NOTE: Use this member as minimum as possible.
client: Arc<dyn BlockChainClient>,

total_score: U256,
nonce: U256,
best_hash: BlockHash,

pivot: Pivot,
pivot: BlockHash,
request_time: Option<Instant>,
downloaded: HashMap<BlockHash, Header>,
queued: HashMap<BlockHash, Header>,
trial: usize,
}

impl HeaderDownloader {
pub fn total_score(&self) -> U256 {
self.total_score
}

pub fn new(client: Arc<dyn BlockChainClient>, total_score: U256, best_hash: BlockHash) -> Self {
pub fn new(client: Arc<dyn BlockChainClient>, nonce: U256, best_hash: BlockHash) -> Self {
let best_header_hash = client.best_block_header().hash();
let best_score = client.block_total_score(&BlockId::Latest).expect("Best block always exist");

Self {
client,

total_score,
nonce,
best_hash,

pivot: Pivot {
hash: best_header_hash,
total_score: best_score,
},
pivot: best_header_hash,
request_time: None,
downloaded: HashMap::new(),
queued: HashMap::new(),
trial: 0,
}
}

pub fn update(&mut self, total_score: U256, best_hash: BlockHash) -> bool {
match self.total_score.cmp(&total_score) {
pub fn best_hash(&self) -> BlockHash {
self.best_hash
}

pub fn update(&mut self, nonce: U256, best_hash: BlockHash) -> bool {
match self.nonce.cmp(&nonce) {
Ordering::Equal => true,
Ordering::Less => {
self.total_score = total_score;
self.nonce = nonce;
self.best_hash = best_hash;

if self.client.block_header(&BlockId::Hash(best_hash)).is_some() {
self.pivot = Pivot {
hash: best_hash,
total_score,
}
self.pivot = best_hash;
}
true
}
Expand All @@ -108,25 +93,25 @@ impl HeaderDownloader {
/// Find header from queued headers, downloaded cache and then from blockchain
/// Panics if header dosn't exist
fn pivot_header(&self) -> Header {
match self.queued.get(&self.pivot.hash) {
match self.queued.get(&self.pivot) {
Some(header) => header.clone(),
None => match self.downloaded.get(&self.pivot.hash) {
None => match self.downloaded.get(&self.pivot) {
Some(header) => header.clone(),
None => self.client.block_header(&BlockId::Hash(self.pivot.hash)).unwrap(),
None => self.client.block_header(&BlockId::Hash(self.pivot)).unwrap(),
},
}
}

pub fn pivot_score(&self) -> U256 {
self.pivot.total_score
}

pub fn is_idle(&self) -> bool {
let can_request = self.request_time.is_none() && self.total_score > self.pivot.total_score;
let can_request = self.request_time.is_none() && self.best_hash != self.pivot;

self.is_valid() && (can_request || self.is_expired())
}

pub fn is_caught_up(&self) -> bool {
self.pivot == self.best_hash
}

pub fn create_request(&mut self) -> Option<RequestMessage> {
if !self.is_idle() {
return None
Expand Down Expand Up @@ -154,37 +139,30 @@ impl HeaderDownloader {
let pivot_header = self.pivot_header();

// This happens when best_hash is imported by other peer.
if self.best_hash == self.pivot.hash {
if self.best_hash == self.pivot {
ctrace!(SYNC, "Ignore received headers, pivot already reached the best hash");
} else if first_header_hash == self.pivot.hash {
} else if first_header_hash == self.pivot {
for header in headers.iter() {
self.downloaded.insert(header.hash(), header.clone());
}

// FIXME: skip known headers
let new_scores = headers[1..].iter().fold(U256::zero(), |acc, header| acc + header.score());
self.pivot = Pivot {
hash: headers.last().expect("Last downloaded header must exist").hash(),
total_score: self.pivot.total_score + new_scores,
}
self.pivot = headers.last().expect("Last downloaded header must exist").hash();
} else if first_header_number < pivot_header.number() {
ctrace!(
SYNC,
"Ignore received headers, pivot is already updated since headers are imported by other peers"
);
} else if first_header_number == pivot_header.number() {
if pivot_header.number() != 0 {
self.pivot = Pivot {
hash: pivot_header.parent_hash(),
total_score: self.pivot.total_score - pivot_header.score(),
}
self.pivot = pivot_header.parent_hash();
}
} else {
cerror!(
SYNC,
"Invalid header update state. best_hash: {}, self.pivot.hash: {}, first_header_hash: {}",
"Invalid header update state. best_hash: {}, self.pivot: {}, first_header_hash: {}",
self.best_hash,
self.pivot.hash,
self.pivot,
first_header_hash
);
}
Expand All @@ -203,10 +181,7 @@ impl HeaderDownloader {
self.downloaded.remove(&hash);

if self.best_hash == hash {
self.pivot = Pivot {
hash,
total_score: self.total_score,
}
self.pivot = hash;
}
}
self.queued.shrink_to_fit();
Expand Down
64 changes: 28 additions & 36 deletions sync/src/block/extension.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ use std::time::Duration;

use ccore::encoded::Header as EncodedHeader;
use ccore::{
Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, ChainNotify, Client, ImportBlock, ImportError,
UnverifiedTransaction,
Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, BlockStatus, ChainNotify, Client, ImportBlock,
ImportError, UnverifiedTransaction,
};
use cmerkle::TrieFactory;
use cnetwork::{Api, EventSender, NetworkExtension, NodeId};
Expand Down Expand Up @@ -76,6 +76,7 @@ pub struct Extension {
client: Arc<Client>,
api: Box<dyn Api>,
last_request: u64,
nonce: u64,
}

impl Extension {
Expand Down Expand Up @@ -126,6 +127,7 @@ impl Extension {
client,
api,
last_request: Default::default(),
nonce: Default::default(),
}
}

Expand All @@ -146,6 +148,14 @@ impl Extension {
}

fn send_body_request(&mut self, id: &NodeId) {
if let Some(downloader) = self.header_downloaders.get(&id) {
if self.client.block_status(&BlockId::Hash(downloader.best_hash())) == BlockStatus::InChain {
// Peer is lagging behind the local blockchain.
// We don't need to request block bodies to this peer
return
}
}

self.check_sync_variable();
if let Some(requests) = self.requests.get_mut(id) {
let have_body_request = {
Expand Down Expand Up @@ -243,14 +253,15 @@ impl NetworkExtension<Event> for Extension {
id,
Arc::new(
Message::Status {
total_score: chain_info.best_proposal_score,
nonce: U256::from(self.nonce),
best_hash: chain_info.best_proposal_block_hash,
genesis_hash: chain_info.genesis_hash,
}
.rlp_bytes()
.into_vec(),
),
);
self.nonce += 1;
let t = self.connected_nodes.insert(*id);
debug_assert!(t, "{} is already added to peer list", id);

Expand Down Expand Up @@ -299,10 +310,10 @@ impl NetworkExtension<Event> for Extension {
if let Ok(received_message) = UntrustedRlp::new(data).as_val() {
match received_message {
Message::Status {
total_score,
nonce,
best_hash,
genesis_hash,
} => self.on_peer_status(id, total_score, best_hash, genesis_hash),
} => self.on_peer_status(id, nonce, best_hash, genesis_hash),
Message::Request(request_id, request) => self.on_peer_request(id, request_id, request),
Message::Response(request_id, response) => self.on_peer_response(id, request_id, response),
}
Expand All @@ -328,7 +339,6 @@ impl NetworkExtension<Event> for Extension {
}
State::SnapshotChunk(..) => unimplemented!(),
State::Full => {
let best_proposal_score = self.client.chain_info().best_proposal_score;
for id in &peer_ids {
let request =
self.header_downloaders.get_mut(id).and_then(HeaderDownloader::create_request);
Expand All @@ -339,15 +349,7 @@ impl NetworkExtension<Event> for Extension {
}

for id in peer_ids {
let peer_score = if let Some(peer) = self.header_downloaders.get(&id) {
peer.total_score()
} else {
U256::zero()
};

if peer_score > best_proposal_score {
self.send_body_request(&id);
}
self.send_body_request(&id);
}
}
}
Expand Down Expand Up @@ -501,20 +503,21 @@ impl Extension {
id,
Arc::new(
Message::Status {
total_score: chain_info.best_proposal_score,
nonce: U256::from(self.nonce),
best_hash: chain_info.best_proposal_block_hash,
genesis_hash: chain_info.genesis_hash,
}
.rlp_bytes()
.into_vec(),
),
);
self.nonce += 1;
}
}
}

impl Extension {
fn on_peer_status(&mut self, from: &NodeId, total_score: U256, best_hash: BlockHash, genesis_hash: BlockHash) {
fn on_peer_status(&mut self, from: &NodeId, nonce: U256, best_hash: BlockHash, genesis_hash: BlockHash) {
// Validity check
if genesis_hash != self.client.chain_info().genesis_hash {
cinfo!(SYNC, "Genesis hash mismatch with peer {}", from);
Expand All @@ -523,17 +526,17 @@ impl Extension {

match self.header_downloaders.entry(*from) {
Entry::Occupied(mut peer) => {
if !peer.get_mut().update(total_score, best_hash) {
if !peer.get_mut().update(nonce, best_hash) {
// FIXME: It should be an error level if the consensus is PoW.
cdebug!(SYNC, "Peer #{} status updated but score is less than before", from);
return
}
}
Entry::Vacant(e) => {
e.insert(HeaderDownloader::new(self.client.clone(), total_score, best_hash));
e.insert(HeaderDownloader::new(self.client.clone(), nonce, best_hash));
}
}
cinfo!(SYNC, "Peer #{} status update: total_score: {}, best_hash: {}", from, total_score, best_hash);
cinfo!(SYNC, "Peer #{} status update: nonce: {}, best_hash: {}", from, nonce, best_hash);
}

fn on_peer_request(&self, from: &NodeId, id: u64, request: RequestMessage) {
Expand Down Expand Up @@ -759,14 +762,12 @@ impl Extension {
}
State::SnapshotChunk(..) => {}
State::Full => {
let (mut completed, pivot_score_changed) = if let Some(peer) = self.header_downloaders.get_mut(from) {
let before_pivot_score = peer.pivot_score();
let (mut completed, peer_is_caught_up) = if let Some(peer) = self.header_downloaders.get_mut(from) {
let encoded: Vec<_> = headers.iter().map(|h| EncodedHeader::new(h.rlp_bytes().to_vec())).collect();
peer.import_headers(&encoded);
let after_pivot_score = peer.pivot_score();
(peer.downloaded(), before_pivot_score != after_pivot_score)
(peer.downloaded(), peer.is_caught_up())
} else {
(Vec::new(), false)
(Vec::new(), true)
};
completed.sort_unstable_by_key(EncodedHeader::number);

Expand All @@ -792,7 +793,7 @@ impl Extension {
peer.mark_as_imported(exists);
peer.create_request()
});
if pivot_score_changed {
if !peer_is_caught_up {
if let Some(request) = request {
self.send_header_request(from, request);
}
Expand Down Expand Up @@ -834,20 +835,11 @@ impl Extension {
}
}

let total_score = self.client.chain_info().best_proposal_score;
let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect();
peer_ids.shuffle(&mut thread_rng());

for id in peer_ids {
let peer_score = if let Some(peer) = self.header_downloaders.get(&id) {
peer.total_score()
} else {
U256::zero()
};

if peer_score > total_score {
self.send_body_request(&id);
}
self.send_body_request(&id);
}
}
}
Expand Down
Loading