Skip to content

Commit 66ef929

Browse files
committed
Rename total_score to nonce in sync message
When CodeChain is synchronized by snapshot sync, `total_score` doesn't mean the accumulated score from the genesis block. To handle this, the sync extension is updated to not rely on `total_score` for deciding whether a peer is leading or not, and use the value only as a monotonically increasing nonce.
1 parent ea56c0e commit 66ef929

File tree

6 files changed

+68
-98
lines changed

6 files changed

+68
-98
lines changed

core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,4 +99,4 @@ pub use crate::service::ClientService;
9999
pub use crate::transaction::{
100100
LocalizedTransaction, PendingSignedTransactions, SignedTransaction, UnverifiedTransaction,
101101
};
102-
pub use crate::types::{BlockId, TransactionId};
102+
pub use crate::types::{BlockId, BlockStatus, TransactionId};

spec/Block-Synchronization-Extension.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,14 @@ Message :=
1919
### Status
2020

2121
```
22-
Status(total_score, best_hash, genesis_hash)
22+
Status(nonce, best_hash, genesis_hash)
2323
```
2424

2525
Send current chain status to peer.
2626

2727
* Identifier: 0x01
28-
* Restriction: None
28+
* Restriction:
29+
* `nonce` SHOULD be monotonically increasing every time the message is sent.
2930

3031
## Request messages
3132

sync/src/block/downloader/header.rs

Lines changed: 28 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -31,65 +31,50 @@ const MAX_HEADER_QUEUE_LENGTH: usize = 1024;
3131
const MAX_RETRY: usize = 3;
3232
const MAX_WAIT: u64 = 15;
3333

34-
#[derive(Clone)]
35-
struct Pivot {
36-
hash: BlockHash,
37-
total_score: U256,
38-
}
39-
4034
#[derive(Clone)]
4135
pub struct HeaderDownloader {
4236
// NOTE: Use this member as minimum as possible.
4337
client: Arc<dyn BlockChainClient>,
4438

45-
total_score: U256,
39+
nonce: U256,
4640
best_hash: BlockHash,
47-
48-
pivot: Pivot,
41+
pivot: BlockHash,
4942
request_time: Option<Instant>,
5043
downloaded: HashMap<BlockHash, Header>,
5144
queued: HashMap<BlockHash, Header>,
5245
trial: usize,
5346
}
5447

5548
impl HeaderDownloader {
56-
pub fn total_score(&self) -> U256 {
57-
self.total_score
58-
}
59-
60-
pub fn new(client: Arc<dyn BlockChainClient>, total_score: U256, best_hash: BlockHash) -> Self {
49+
pub fn new(client: Arc<dyn BlockChainClient>, nonce: U256, best_hash: BlockHash) -> Self {
6150
let best_header_hash = client.best_block_header().hash();
62-
let best_score = client.block_total_score(&BlockId::Latest).expect("Best block always exist");
6351

6452
Self {
6553
client,
6654

67-
total_score,
55+
nonce,
6856
best_hash,
69-
70-
pivot: Pivot {
71-
hash: best_header_hash,
72-
total_score: best_score,
73-
},
57+
pivot: best_header_hash,
7458
request_time: None,
7559
downloaded: HashMap::new(),
7660
queued: HashMap::new(),
7761
trial: 0,
7862
}
7963
}
8064

81-
pub fn update(&mut self, total_score: U256, best_hash: BlockHash) -> bool {
82-
match self.total_score.cmp(&total_score) {
65+
pub fn best_hash(&self) -> BlockHash {
66+
self.best_hash
67+
}
68+
69+
pub fn update(&mut self, nonce: U256, best_hash: BlockHash) -> bool {
70+
match self.nonce.cmp(&nonce) {
8371
Ordering::Equal => true,
8472
Ordering::Less => {
85-
self.total_score = total_score;
73+
self.nonce = nonce;
8674
self.best_hash = best_hash;
8775

8876
if self.client.block_header(&BlockId::Hash(best_hash)).is_some() {
89-
self.pivot = Pivot {
90-
hash: best_hash,
91-
total_score,
92-
}
77+
self.pivot = best_hash;
9378
}
9479
true
9580
}
@@ -108,25 +93,25 @@ impl HeaderDownloader {
10893
/// Find header from queued headers, downloaded cache and then from blockchain
10994
/// Panics if header dosn't exist
11095
fn pivot_header(&self) -> Header {
111-
match self.queued.get(&self.pivot.hash) {
96+
match self.queued.get(&self.pivot) {
11297
Some(header) => header.clone(),
113-
None => match self.downloaded.get(&self.pivot.hash) {
98+
None => match self.downloaded.get(&self.pivot) {
11499
Some(header) => header.clone(),
115-
None => self.client.block_header(&BlockId::Hash(self.pivot.hash)).unwrap(),
100+
None => self.client.block_header(&BlockId::Hash(self.pivot)).unwrap(),
116101
},
117102
}
118103
}
119104

120-
pub fn pivot_score(&self) -> U256 {
121-
self.pivot.total_score
122-
}
123-
124105
pub fn is_idle(&self) -> bool {
125-
let can_request = self.request_time.is_none() && self.total_score > self.pivot.total_score;
106+
let can_request = self.request_time.is_none() && self.best_hash != self.pivot;
126107

127108
self.is_valid() && (can_request || self.is_expired())
128109
}
129110

111+
pub fn is_caught_up(&self) -> bool {
112+
self.pivot == self.best_hash
113+
}
114+
130115
pub fn create_request(&mut self) -> Option<RequestMessage> {
131116
if !self.is_idle() {
132117
return None
@@ -154,37 +139,30 @@ impl HeaderDownloader {
154139
let pivot_header = self.pivot_header();
155140

156141
// This happens when best_hash is imported by other peer.
157-
if self.best_hash == self.pivot.hash {
142+
if self.best_hash == self.pivot {
158143
ctrace!(SYNC, "Ignore received headers, pivot already reached the best hash");
159-
} else if first_header_hash == self.pivot.hash {
144+
} else if first_header_hash == self.pivot {
160145
for header in headers.iter() {
161146
self.downloaded.insert(header.hash(), header.clone());
162147
}
163148

164149
// FIXME: skip known headers
165-
let new_scores = headers[1..].iter().fold(U256::zero(), |acc, header| acc + header.score());
166-
self.pivot = Pivot {
167-
hash: headers.last().expect("Last downloaded header must exist").hash(),
168-
total_score: self.pivot.total_score + new_scores,
169-
}
150+
self.pivot = headers.last().expect("Last downloaded header must exist").hash();
170151
} else if first_header_number < pivot_header.number() {
171152
ctrace!(
172153
SYNC,
173154
"Ignore received headers, pivot is already updated since headers are imported by other peers"
174155
);
175156
} else if first_header_number == pivot_header.number() {
176157
if pivot_header.number() != 0 {
177-
self.pivot = Pivot {
178-
hash: pivot_header.parent_hash(),
179-
total_score: self.pivot.total_score - pivot_header.score(),
180-
}
158+
self.pivot = pivot_header.parent_hash();
181159
}
182160
} else {
183161
cerror!(
184162
SYNC,
185-
"Invalid header update state. best_hash: {}, self.pivot.hash: {}, first_header_hash: {}",
163+
"Invalid header update state. best_hash: {}, self.pivot: {}, first_header_hash: {}",
186164
self.best_hash,
187-
self.pivot.hash,
165+
self.pivot,
188166
first_header_hash
189167
);
190168
}
@@ -203,10 +181,7 @@ impl HeaderDownloader {
203181
self.downloaded.remove(&hash);
204182

205183
if self.best_hash == hash {
206-
self.pivot = Pivot {
207-
hash,
208-
total_score: self.total_score,
209-
}
184+
self.pivot = hash;
210185
}
211186
}
212187
self.queued.shrink_to_fit();

sync/src/block/extension.rs

Lines changed: 28 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use std::time::Duration;
2121

2222
use ccore::encoded::Header as EncodedHeader;
2323
use ccore::{
24-
Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, ChainNotify, Client, ImportBlock, ImportError,
25-
UnverifiedTransaction,
24+
Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, BlockStatus, ChainNotify, Client, ImportBlock,
25+
ImportError, UnverifiedTransaction,
2626
};
2727
use cmerkle::TrieFactory;
2828
use cnetwork::{Api, EventSender, NetworkExtension, NodeId};
@@ -76,6 +76,7 @@ pub struct Extension {
7676
client: Arc<Client>,
7777
api: Box<dyn Api>,
7878
last_request: u64,
79+
nonce: u64,
7980
}
8081

8182
impl Extension {
@@ -126,6 +127,7 @@ impl Extension {
126127
client,
127128
api,
128129
last_request: Default::default(),
130+
nonce: Default::default(),
129131
}
130132
}
131133

@@ -146,6 +148,14 @@ impl Extension {
146148
}
147149

148150
fn send_body_request(&mut self, id: &NodeId) {
151+
if let Some(downloader) = self.header_downloaders.get(&id) {
152+
if self.client.block_status(&BlockId::Hash(downloader.best_hash())) == BlockStatus::InChain {
153+
// Peer is lagging behind the local blockchain.
154+
// We don't need to request block bodies to this peer
155+
return
156+
}
157+
}
158+
149159
self.check_sync_variable();
150160
if let Some(requests) = self.requests.get_mut(id) {
151161
let have_body_request = {
@@ -243,14 +253,15 @@ impl NetworkExtension<Event> for Extension {
243253
id,
244254
Arc::new(
245255
Message::Status {
246-
total_score: chain_info.best_proposal_score,
256+
nonce: U256::from(self.nonce),
247257
best_hash: chain_info.best_proposal_block_hash,
248258
genesis_hash: chain_info.genesis_hash,
249259
}
250260
.rlp_bytes()
251261
.into_vec(),
252262
),
253263
);
264+
self.nonce += 1;
254265
let t = self.connected_nodes.insert(*id);
255266
debug_assert!(t, "{} is already added to peer list", id);
256267

@@ -299,10 +310,10 @@ impl NetworkExtension<Event> for Extension {
299310
if let Ok(received_message) = UntrustedRlp::new(data).as_val() {
300311
match received_message {
301312
Message::Status {
302-
total_score,
313+
nonce,
303314
best_hash,
304315
genesis_hash,
305-
} => self.on_peer_status(id, total_score, best_hash, genesis_hash),
316+
} => self.on_peer_status(id, nonce, best_hash, genesis_hash),
306317
Message::Request(request_id, request) => self.on_peer_request(id, request_id, request),
307318
Message::Response(request_id, response) => self.on_peer_response(id, request_id, response),
308319
}
@@ -328,7 +339,6 @@ impl NetworkExtension<Event> for Extension {
328339
}
329340
State::SnapshotChunk(..) => unimplemented!(),
330341
State::Full => {
331-
let best_proposal_score = self.client.chain_info().best_proposal_score;
332342
for id in &peer_ids {
333343
let request =
334344
self.header_downloaders.get_mut(id).and_then(HeaderDownloader::create_request);
@@ -339,15 +349,7 @@ impl NetworkExtension<Event> for Extension {
339349
}
340350

341351
for id in peer_ids {
342-
let peer_score = if let Some(peer) = self.header_downloaders.get(&id) {
343-
peer.total_score()
344-
} else {
345-
U256::zero()
346-
};
347-
348-
if peer_score > best_proposal_score {
349-
self.send_body_request(&id);
350-
}
352+
self.send_body_request(&id);
351353
}
352354
}
353355
}
@@ -501,20 +503,21 @@ impl Extension {
501503
id,
502504
Arc::new(
503505
Message::Status {
504-
total_score: chain_info.best_proposal_score,
506+
nonce: U256::from(self.nonce),
505507
best_hash: chain_info.best_proposal_block_hash,
506508
genesis_hash: chain_info.genesis_hash,
507509
}
508510
.rlp_bytes()
509511
.into_vec(),
510512
),
511513
);
514+
self.nonce += 1;
512515
}
513516
}
514517
}
515518

516519
impl Extension {
517-
fn on_peer_status(&mut self, from: &NodeId, total_score: U256, best_hash: BlockHash, genesis_hash: BlockHash) {
520+
fn on_peer_status(&mut self, from: &NodeId, nonce: U256, best_hash: BlockHash, genesis_hash: BlockHash) {
518521
// Validity check
519522
if genesis_hash != self.client.chain_info().genesis_hash {
520523
cinfo!(SYNC, "Genesis hash mismatch with peer {}", from);
@@ -523,17 +526,17 @@ impl Extension {
523526

524527
match self.header_downloaders.entry(*from) {
525528
Entry::Occupied(mut peer) => {
526-
if !peer.get_mut().update(total_score, best_hash) {
529+
if !peer.get_mut().update(nonce, best_hash) {
527530
// FIXME: It should be an error level if the consensus is PoW.
528531
cdebug!(SYNC, "Peer #{} status updated but score is less than before", from);
529532
return
530533
}
531534
}
532535
Entry::Vacant(e) => {
533-
e.insert(HeaderDownloader::new(self.client.clone(), total_score, best_hash));
536+
e.insert(HeaderDownloader::new(self.client.clone(), nonce, best_hash));
534537
}
535538
}
536-
cinfo!(SYNC, "Peer #{} status update: total_score: {}, best_hash: {}", from, total_score, best_hash);
539+
cinfo!(SYNC, "Peer #{} status update: nonce: {}, best_hash: {}", from, nonce, best_hash);
537540
}
538541

539542
fn on_peer_request(&self, from: &NodeId, id: u64, request: RequestMessage) {
@@ -759,14 +762,12 @@ impl Extension {
759762
}
760763
State::SnapshotChunk(..) => {}
761764
State::Full => {
762-
let (mut completed, pivot_score_changed) = if let Some(peer) = self.header_downloaders.get_mut(from) {
763-
let before_pivot_score = peer.pivot_score();
765+
let (mut completed, peer_is_caught_up) = if let Some(peer) = self.header_downloaders.get_mut(from) {
764766
let encoded: Vec<_> = headers.iter().map(|h| EncodedHeader::new(h.rlp_bytes().to_vec())).collect();
765767
peer.import_headers(&encoded);
766-
let after_pivot_score = peer.pivot_score();
767-
(peer.downloaded(), before_pivot_score != after_pivot_score)
768+
(peer.downloaded(), peer.is_caught_up())
768769
} else {
769-
(Vec::new(), false)
770+
(Vec::new(), true)
770771
};
771772
completed.sort_unstable_by_key(EncodedHeader::number);
772773

@@ -792,7 +793,7 @@ impl Extension {
792793
peer.mark_as_imported(exists);
793794
peer.create_request()
794795
});
795-
if pivot_score_changed {
796+
if !peer_is_caught_up {
796797
if let Some(request) = request {
797798
self.send_header_request(from, request);
798799
}
@@ -834,20 +835,11 @@ impl Extension {
834835
}
835836
}
836837

837-
let total_score = self.client.chain_info().best_proposal_score;
838838
let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect();
839839
peer_ids.shuffle(&mut thread_rng());
840840

841841
for id in peer_ids {
842-
let peer_score = if let Some(peer) = self.header_downloaders.get(&id) {
843-
peer.total_score()
844-
} else {
845-
U256::zero()
846-
};
847-
848-
if peer_score > total_score {
849-
self.send_body_request(&id);
850-
}
842+
self.send_body_request(&id);
851843
}
852844
}
853845
}

0 commit comments

Comments
 (0)