Skip to content

Commit d12b3d7

Browse files
committed
Fetch snapshot header from peers
1 parent afda67d commit d12b3d7

File tree

1 file changed

+173
-113
lines changed

1 file changed

+173
-113
lines changed

sync/src/block/extension.rs

Lines changed: 173 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -57,8 +57,9 @@ pub struct TokenInfo {
5757
request_id: Option<u64>,
5858
}
5959

60+
#[derive(Debug)]
6061
enum State {
61-
SnapshotHeader(H256),
62+
SnapshotHeader(BlockHash, u64),
6263
SnapshotChunk(H256),
6364
Full,
6465
}
@@ -90,10 +91,11 @@ impl Extension {
9091
_ => State::SnapshotChunk(*header.hash()),
9192
}
9293
}
93-
_ => State::SnapshotHeader(hash),
94+
_ => State::SnapshotHeader(hash.into(), num),
9495
},
9596
None => State::Full,
9697
};
98+
cdebug!(SYNC, "Initial state is {:?}", state);
9799
let mut header = client.best_header();
98100
let mut hollow_headers = vec![header.decode()];
99101
while client.block_body(&BlockId::Hash(header.hash())).is_none() {
@@ -309,35 +311,45 @@ impl NetworkExtension<Event> for Extension {
309311

310312
fn on_timeout(&mut self, token: TimerToken) {
311313
match token {
312-
SYNC_TIMER_TOKEN => match self.state {
313-
State::SnapshotHeader(..) => unimplemented!(),
314-
State::SnapshotChunk(..) => unimplemented!(),
315-
State::Full => {
316-
let best_proposal_score = self.client.chain_info().best_proposal_score;
317-
let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect();
318-
peer_ids.shuffle(&mut thread_rng());
319-
320-
for id in &peer_ids {
321-
let request = self.header_downloaders.get_mut(id).and_then(HeaderDownloader::create_request);
322-
if let Some(request) = request {
323-
self.send_header_request(id, request);
324-
break
314+
SYNC_TIMER_TOKEN => {
315+
let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect();
316+
peer_ids.shuffle(&mut thread_rng());
317+
318+
match self.state {
319+
State::SnapshotHeader(_, num) => {
320+
for id in &peer_ids {
321+
self.send_header_request(id, RequestMessage::Headers {
322+
start_number: num,
323+
max_count: 1,
324+
});
325325
}
326326
}
327+
State::SnapshotChunk(..) => unimplemented!(),
328+
State::Full => {
329+
let best_proposal_score = self.client.chain_info().best_proposal_score;
330+
for id in &peer_ids {
331+
let request =
332+
self.header_downloaders.get_mut(id).and_then(HeaderDownloader::create_request);
333+
if let Some(request) = request {
334+
self.send_header_request(id, request);
335+
break
336+
}
337+
}
327338

328-
for id in peer_ids {
329-
let peer_score = if let Some(peer) = self.header_downloaders.get(&id) {
330-
peer.total_score()
331-
} else {
332-
U256::zero()
333-
};
339+
for id in peer_ids {
340+
let peer_score = if let Some(peer) = self.header_downloaders.get(&id) {
341+
peer.total_score()
342+
} else {
343+
U256::zero()
344+
};
334345

335-
if peer_score > best_proposal_score {
336-
self.send_body_request(&id);
346+
if peer_score > best_proposal_score {
347+
self.send_body_request(&id);
348+
}
337349
}
338350
}
339351
}
340-
},
352+
}
341353
SYNC_EXPIRE_TOKEN_BEGIN..=SYNC_EXPIRE_TOKEN_END => {
342354
self.check_sync_variable();
343355
let (id, request_id) = {
@@ -413,39 +425,72 @@ pub enum Event {
413425

414426
impl Extension {
415427
fn new_headers(&mut self, imported: Vec<BlockHash>, enacted: Vec<BlockHash>, retracted: Vec<BlockHash>) {
416-
let peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect();
417-
for id in peer_ids {
418-
if let Some(peer) = self.header_downloaders.get_mut(&id) {
419-
peer.mark_as_imported(imported.clone());
428+
if let Some(next_state) = match self.state {
429+
State::SnapshotHeader(hash, ..) => {
430+
if imported.contains(&hash) {
431+
let header = self.client.block_header(&BlockId::Hash(hash)).expect("Imported header must exist");
432+
Some(State::SnapshotChunk(header.state_root()))
433+
} else {
434+
None
435+
}
420436
}
437+
State::SnapshotChunk(..) => unimplemented!(),
438+
State::Full => {
439+
let peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect();
440+
for id in peer_ids {
441+
if let Some(peer) = self.header_downloaders.get_mut(&id) {
442+
peer.mark_as_imported(imported.clone());
443+
}
444+
}
445+
let mut headers_to_download: Vec<_> = enacted
446+
.into_iter()
447+
.map(|hash| self.client.block_header(&BlockId::Hash(hash)).expect("Enacted header must exist"))
448+
.collect();
449+
headers_to_download.sort_unstable_by_key(EncodedHeader::number);
450+
#[allow(clippy::redundant_closure)]
451+
// False alarm. https://github.com/rust-lang/rust-clippy/issues/1439
452+
headers_to_download.dedup_by_key(|h| h.hash());
453+
454+
let headers: Vec<_> = headers_to_download
455+
.into_iter()
456+
.filter(|header| self.client.block_body(&BlockId::Hash(header.hash())).is_none())
457+
.collect(); // FIXME: No need to collect here if self is not borrowed.
458+
for header in headers {
459+
let parent = self
460+
.client
461+
.block_header(&BlockId::Hash(header.parent_hash()))
462+
.expect("Enacted header must have parent");
463+
self.body_downloader.add_target(&header.decode(), &parent.decode());
464+
}
465+
self.body_downloader.remove_target(&retracted);
466+
None
467+
}
468+
} {
469+
cdebug!(SYNC, "Transitioning state to {:?}", next_state);
470+
self.state = next_state;
421471
}
422-
let mut headers_to_download: Vec<_> = enacted
423-
.into_iter()
424-
.map(|hash| self.client.block_header(&BlockId::Hash(hash)).expect("Enacted header must exist"))
425-
.collect();
426-
headers_to_download.sort_unstable_by_key(EncodedHeader::number);
427-
#[allow(clippy::redundant_closure)]
428-
// False alarm. https://github.com/rust-lang/rust-clippy/issues/1439
429-
headers_to_download.dedup_by_key(|h| h.hash());
430-
431-
let headers: Vec<_> = headers_to_download
432-
.into_iter()
433-
.filter(|header| self.client.block_body(&BlockId::Hash(header.hash())).is_none())
434-
.collect(); // FIXME: No need to collect here if self is not borrowed.
435-
for header in headers {
436-
let parent = self
437-
.client
438-
.block_header(&BlockId::Hash(header.parent_hash()))
439-
.expect("Enacted header must have parent");
440-
self.body_downloader.add_target(&header.decode(), &parent.decode());
441-
}
442-
self.body_downloader.remove_target(&retracted);
443472
}
444473

445474
fn new_blocks(&mut self, imported: Vec<BlockHash>, invalid: Vec<BlockHash>) {
446-
self.body_downloader.remove_target(&imported);
447-
self.body_downloader.remove_target(&invalid);
448-
475+
if let Some(next_state) = match self.state {
476+
State::SnapshotHeader(hash, ..) => {
477+
if imported.contains(&hash) {
478+
let header = self.client.block_header(&BlockId::Hash(hash)).expect("Imported header must exist");
479+
Some(State::SnapshotChunk(header.state_root()))
480+
} else {
481+
None
482+
}
483+
}
484+
State::SnapshotChunk(..) => None,
485+
State::Full => {
486+
self.body_downloader.remove_target(&imported);
487+
self.body_downloader.remove_target(&invalid);
488+
None
489+
}
490+
} {
491+
cdebug!(SYNC, "Transitioning state to {:?}", next_state);
492+
self.state = next_state;
493+
}
449494

450495
let chain_info = self.client.chain_info();
451496

@@ -599,37 +644,33 @@ impl Extension {
599644
return
600645
}
601646

602-
match self.state {
603-
State::SnapshotHeader(..) => unimplemented!(),
604-
State::SnapshotChunk(..) => unimplemented!(),
605-
State::Full => match response {
606-
ResponseMessage::Headers(headers) => {
607-
self.dismiss_request(from, id);
608-
self.on_header_response(from, &headers)
609-
}
610-
ResponseMessage::Bodies(bodies) => {
611-
self.check_sync_variable();
612-
let hashes = match request {
613-
RequestMessage::Bodies(hashes) => hashes,
614-
_ => unreachable!(),
615-
};
616-
assert_eq!(bodies.len(), hashes.len());
617-
if let Some(token) = self.tokens.get(from) {
618-
if let Some(token_info) = self.tokens_info.get_mut(token) {
619-
if token_info.request_id.is_none() {
620-
ctrace!(SYNC, "Expired before handling response");
621-
return
622-
}
623-
self.api.clear_timer(*token).expect("Timer clear succeed");
624-
token_info.request_id = None;
647+
match response {
648+
ResponseMessage::Headers(headers) => {
649+
self.dismiss_request(from, id);
650+
self.on_header_response(from, &headers)
651+
}
652+
ResponseMessage::Bodies(bodies) => {
653+
self.check_sync_variable();
654+
let hashes = match request {
655+
RequestMessage::Bodies(hashes) => hashes,
656+
_ => unreachable!(),
657+
};
658+
assert_eq!(bodies.len(), hashes.len());
659+
if let Some(token) = self.tokens.get(from) {
660+
if let Some(token_info) = self.tokens_info.get_mut(token) {
661+
if token_info.request_id.is_none() {
662+
ctrace!(SYNC, "Expired before handling response");
663+
return
625664
}
665+
self.api.clear_timer(*token).expect("Timer clear succeed");
666+
token_info.request_id = None;
626667
}
627-
self.dismiss_request(from, id);
628-
self.on_body_response(hashes, bodies);
629-
self.check_sync_variable();
630668
}
631-
_ => unimplemented!(),
632-
},
669+
self.dismiss_request(from, id);
670+
self.on_body_response(hashes, bodies);
671+
self.check_sync_variable();
672+
}
673+
_ => unimplemented!(),
633674
}
634675
}
635676
}
@@ -699,42 +740,61 @@ impl Extension {
699740

700741
fn on_header_response(&mut self, from: &NodeId, headers: &[Header]) {
701742
ctrace!(SYNC, "Received header response from({}) with length({})", from, headers.len());
702-
let (mut completed, pivot_score_changed) = if let Some(peer) = self.header_downloaders.get_mut(from) {
703-
let before_pivot_score = peer.pivot_score();
704-
let encoded: Vec<_> = headers.iter().map(|h| EncodedHeader::new(h.rlp_bytes().to_vec())).collect();
705-
peer.import_headers(&encoded);
706-
let after_pivot_score = peer.pivot_score();
707-
(peer.downloaded(), before_pivot_score != after_pivot_score)
708-
} else {
709-
(Vec::new(), false)
710-
};
711-
completed.sort_unstable_by_key(EncodedHeader::number);
712-
713-
let mut exists = Vec::new();
714-
let mut queued = Vec::new();
715-
716-
for header in completed {
717-
let hash = header.hash();
718-
match self.client.import_header(header.clone().into_inner()) {
719-
Err(BlockImportError::Import(ImportError::AlreadyInChain)) => exists.push(hash),
720-
Err(BlockImportError::Import(ImportError::AlreadyQueued)) => queued.push(hash),
721-
// FIXME: handle import errors
722-
Err(err) => {
723-
cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err);
724-
break
743+
match self.state {
744+
State::SnapshotHeader(..) => {
745+
for header in headers {
746+
match self.client.import_header(header.rlp_bytes().to_vec()) {
747+
Err(BlockImportError::Import(ImportError::AlreadyInChain)) => {}
748+
Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {}
749+
// FIXME: handle import errors
750+
Err(err) => {
751+
cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err);
752+
break
753+
}
754+
_ => {}
755+
}
725756
}
726-
_ => {}
727757
}
728-
}
758+
State::SnapshotChunk(..) => {}
759+
State::Full => {
760+
let (mut completed, pivot_score_changed) = if let Some(peer) = self.header_downloaders.get_mut(from) {
761+
let before_pivot_score = peer.pivot_score();
762+
let encoded: Vec<_> = headers.iter().map(|h| EncodedHeader::new(h.rlp_bytes().to_vec())).collect();
763+
peer.import_headers(&encoded);
764+
let after_pivot_score = peer.pivot_score();
765+
(peer.downloaded(), before_pivot_score != after_pivot_score)
766+
} else {
767+
(Vec::new(), false)
768+
};
769+
completed.sort_unstable_by_key(EncodedHeader::number);
729770

730-
let request = self.header_downloaders.get_mut(from).and_then(|peer| {
731-
peer.mark_as_queued(queued);
732-
peer.mark_as_imported(exists);
733-
peer.create_request()
734-
});
735-
if pivot_score_changed {
736-
if let Some(request) = request {
737-
self.send_header_request(from, request);
771+
let mut exists = Vec::new();
772+
let mut queued = Vec::new();
773+
774+
for header in completed {
775+
let hash = header.hash();
776+
match self.client.import_header(header.clone().into_inner()) {
777+
Err(BlockImportError::Import(ImportError::AlreadyInChain)) => exists.push(hash),
778+
Err(BlockImportError::Import(ImportError::AlreadyQueued)) => queued.push(hash),
779+
// FIXME: handle import errors
780+
Err(err) => {
781+
cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err);
782+
break
783+
}
784+
_ => {}
785+
}
786+
}
787+
788+
let request = self.header_downloaders.get_mut(from).and_then(|peer| {
789+
peer.mark_as_queued(queued);
790+
peer.mark_as_imported(exists);
791+
peer.create_request()
792+
});
793+
if pivot_score_changed {
794+
if let Some(request) = request {
795+
self.send_header_request(from, request);
796+
}
797+
}
738798
}
739799
}
740800
}

0 commit comments

Comments
 (0)