Skip to content

Commit b3e2238

Browse files
remagpieforiequal0
authored andcommitted
Decide the sync extension's state with the snapshot target
1 parent f718e4a commit b3e2238

File tree

3 files changed

+77
-47
lines changed

3 files changed

+77
-47
lines changed

sync/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ codechain-network = { path = "../network" }
1515
codechain-state = { path = "../state" }
1616
codechain-timer = { path = "../util/timer" }
1717
codechain-types = { path = "../types" }
18+
hashdb = { path = "../util/hashdb" }
1819
journaldb = { path = "../util/journaldb" }
1920
kvdb = "0.1"
2021
log = "0.4.6"
@@ -29,7 +30,6 @@ token-generator = "0.1.0"
2930
util-error = { path = "../util/error" }
3031

3132
[dev-dependencies]
32-
hashdb = { path = "../util/hashdb" }
3333
kvdb-memorydb = "0.1"
3434
tempfile = "3.0.4"
3535
trie-standardmap = { path = "../util/trie-standardmap" }

sync/src/block/extension.rs

Lines changed: 76 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@ use ccore::{
2424
Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, ChainNotify, Client, ImportBlock, ImportError,
2525
UnverifiedTransaction,
2626
};
27+
use cmerkle::TrieFactory;
2728
use cnetwork::{Api, EventSender, NetworkExtension, NodeId};
2829
use cstate::FindActionHandler;
2930
use ctimer::TimerToken;
3031
use ctypes::header::{Header, Seal};
3132
use ctypes::transaction::Action;
3233
use ctypes::{BlockHash, BlockNumber};
34+
use hashdb::AsHashDB;
3335
use primitives::{H256, U256};
3436
use rand::prelude::SliceRandom;
3537
use rand::thread_rng;
@@ -55,7 +57,14 @@ pub struct TokenInfo {
5557
request_id: Option<u64>,
5658
}
5759

60+
enum State {
61+
SnapshotHeader(H256),
62+
SnapshotChunk(H256),
63+
Full,
64+
}
65+
5866
pub struct Extension {
67+
state: State,
5968
requests: HashMap<NodeId, Vec<(u64, RequestMessage)>>,
6069
connected_nodes: HashSet<NodeId>,
6170
header_downloaders: HashMap<NodeId, HeaderDownloader>,
@@ -69,9 +78,22 @@ pub struct Extension {
6978
}
7079

7180
impl Extension {
72-
pub fn new(client: Arc<Client>, api: Box<dyn Api>, _snapshot_target: Option<(H256, u64)>) -> Extension {
81+
pub fn new(client: Arc<Client>, api: Box<dyn Api>, snapshot_target: Option<(H256, u64)>) -> Extension {
7382
api.set_timer(SYNC_TIMER_TOKEN, Duration::from_millis(SYNC_TIMER_INTERVAL)).expect("Timer set succeeds");
7483

84+
let state = match snapshot_target {
85+
Some((hash, num)) => match client.block_header(&BlockId::Number(num)) {
86+
Some(ref header) if *header.hash() == hash => {
87+
let state_db = client.state_db().read();
88+
match TrieFactory::readonly(state_db.as_hashdb(), &header.state_root()) {
89+
Ok(ref trie) if trie.is_complete() => State::Full,
90+
_ => State::SnapshotChunk(*header.hash()),
91+
}
92+
}
93+
_ => State::SnapshotHeader(hash),
94+
},
95+
None => State::Full,
96+
};
7597
let mut header = client.best_header();
7698
let mut hollow_headers = vec![header.decode()];
7799
while client.block_body(&BlockId::Hash(header.hash())).is_none() {
@@ -90,6 +112,7 @@ impl Extension {
90112
}
91113
cinfo!(SYNC, "Sync extension initialized");
92114
Extension {
115+
state,
93116
requests: Default::default(),
94117
connected_nodes: Default::default(),
95118
header_downloaders: Default::default(),
@@ -308,31 +331,35 @@ impl NetworkExtension<Event> for Extension {
308331

309332
fn on_timeout(&mut self, token: TimerToken) {
310333
match token {
311-
SYNC_TIMER_TOKEN => {
312-
let best_proposal_score = self.client.chain_info().best_proposal_score;
313-
let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect();
314-
peer_ids.shuffle(&mut thread_rng());
315-
316-
for id in &peer_ids {
317-
let request = self.header_downloaders.get_mut(id).and_then(HeaderDownloader::create_request);
318-
if let Some(request) = request {
319-
self.send_header_request(id, request);
320-
break
334+
SYNC_TIMER_TOKEN => match self.state {
335+
State::SnapshotHeader(..) => unimplemented!(),
336+
State::SnapshotChunk(..) => unimplemented!(),
337+
State::Full => {
338+
let best_proposal_score = self.client.chain_info().best_proposal_score;
339+
let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect();
340+
peer_ids.shuffle(&mut thread_rng());
341+
342+
for id in &peer_ids {
343+
let request = self.header_downloaders.get_mut(id).and_then(HeaderDownloader::create_request);
344+
if let Some(request) = request {
345+
self.send_header_request(id, request);
346+
break
347+
}
321348
}
322-
}
323349

324-
for id in peer_ids {
325-
let peer_score = if let Some(peer) = self.header_downloaders.get(&id) {
326-
peer.total_score()
327-
} else {
328-
U256::zero()
329-
};
350+
for id in peer_ids {
351+
let peer_score = if let Some(peer) = self.header_downloaders.get(&id) {
352+
peer.total_score()
353+
} else {
354+
U256::zero()
355+
};
330356

331-
if peer_score > best_proposal_score {
332-
self.send_body_request(&id);
357+
if peer_score > best_proposal_score {
358+
self.send_body_request(&id);
359+
}
333360
}
334361
}
335-
}
362+
},
336363
SYNC_EXPIRE_TOKEN_BEGIN..=SYNC_EXPIRE_TOKEN_END => {
337364
self.check_sync_variable();
338365
let (id, request_id) = {
@@ -576,33 +603,37 @@ impl Extension {
576603
return
577604
}
578605

579-
match response {
580-
ResponseMessage::Headers(headers) => {
581-
self.dismiss_request(from, id);
582-
self.on_header_response(from, &headers)
583-
}
584-
ResponseMessage::Bodies(bodies) => {
585-
self.check_sync_variable();
586-
let hashes = match request {
587-
RequestMessage::Bodies(hashes) => hashes,
588-
_ => unreachable!(),
589-
};
590-
assert_eq!(bodies.len(), hashes.len());
591-
if let Some(token) = self.tokens.get(from) {
592-
if let Some(token_info) = self.tokens_info.get_mut(token) {
593-
if token_info.request_id.is_none() {
594-
ctrace!(SYNC, "Expired before handling response");
595-
return
606+
match self.state {
607+
State::SnapshotHeader(..) => unimplemented!(),
608+
State::SnapshotChunk(..) => unimplemented!(),
609+
State::Full => match response {
610+
ResponseMessage::Headers(headers) => {
611+
self.dismiss_request(from, id);
612+
self.on_header_response(from, &headers)
613+
}
614+
ResponseMessage::Bodies(bodies) => {
615+
self.check_sync_variable();
616+
let hashes = match request {
617+
RequestMessage::Bodies(hashes) => hashes,
618+
_ => unreachable!(),
619+
};
620+
assert_eq!(bodies.len(), hashes.len());
621+
if let Some(token) = self.tokens.get(from) {
622+
if let Some(token_info) = self.tokens_info.get_mut(token) {
623+
if token_info.request_id.is_none() {
624+
ctrace!(SYNC, "Expired before handling response");
625+
return
626+
}
627+
self.api.clear_timer(*token).expect("Timer clear succeed");
628+
token_info.request_id = None;
596629
}
597-
self.api.clear_timer(*token).expect("Timer clear succeed");
598-
token_info.request_id = None;
599630
}
631+
self.dismiss_request(from, id);
632+
self.on_body_response(hashes, bodies);
633+
self.check_sync_variable();
600634
}
601-
self.dismiss_request(from, id);
602-
self.on_body_response(hashes, bodies);
603-
self.check_sync_variable();
604-
}
605-
_ => unimplemented!(),
635+
_ => unimplemented!(),
636+
},
606637
}
607638
}
608639
}

sync/src/lib.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ extern crate codechain_state as cstate;
2525
extern crate codechain_timer as ctimer;
2626
extern crate codechain_types as ctypes;
2727

28-
#[cfg(test)]
2928
extern crate hashdb;
3029
extern crate journaldb;
3130
extern crate kvdb;

0 commit comments

Comments
 (0)