@@ -57,8 +57,9 @@ pub struct TokenInfo {
5757 request_id : Option < u64 > ,
5858}
5959
60+ #[ derive( Debug ) ]
6061enum State {
61- SnapshotHeader ( H256 ) ,
62+ SnapshotHeader ( BlockHash , u64 ) ,
6263 SnapshotChunk ( H256 ) ,
6364 Full ,
6465}
@@ -90,10 +91,11 @@ impl Extension {
9091 _ => State :: SnapshotChunk ( * header. hash ( ) ) ,
9192 }
9293 }
93- _ => State :: SnapshotHeader ( hash) ,
94+ _ => State :: SnapshotHeader ( hash. into ( ) , num ) ,
9495 } ,
9596 None => State :: Full ,
9697 } ;
98+ cdebug ! ( SYNC , "Initial state is {:?}" , state) ;
9799 let mut header = client. best_header ( ) ;
98100 let mut hollow_headers = vec ! [ header. decode( ) ] ;
99101 while client. block_body ( & BlockId :: Hash ( header. hash ( ) ) ) . is_none ( ) {
@@ -309,35 +311,45 @@ impl NetworkExtension<Event> for Extension {
309311
310312 fn on_timeout ( & mut self , token : TimerToken ) {
311313 match token {
312- SYNC_TIMER_TOKEN => match self . state {
313- State :: SnapshotHeader ( ..) => unimplemented ! ( ) ,
314- State :: SnapshotChunk ( ..) => unimplemented ! ( ) ,
315- State :: Full => {
316- let best_proposal_score = self . client . chain_info ( ) . best_proposal_score ;
317- let mut peer_ids: Vec < _ > = self . header_downloaders . keys ( ) . cloned ( ) . collect ( ) ;
318- peer_ids. shuffle ( & mut thread_rng ( ) ) ;
319-
320- for id in & peer_ids {
321- let request = self . header_downloaders . get_mut ( id) . and_then ( HeaderDownloader :: create_request) ;
322- if let Some ( request) = request {
323- self . send_header_request ( id, request) ;
324- break
314+ SYNC_TIMER_TOKEN => {
315+ let mut peer_ids: Vec < _ > = self . header_downloaders . keys ( ) . cloned ( ) . collect ( ) ;
316+ peer_ids. shuffle ( & mut thread_rng ( ) ) ;
317+
318+ match self . state {
319+ State :: SnapshotHeader ( _, num) => {
320+ for id in & peer_ids {
321+ self . send_header_request ( id, RequestMessage :: Headers {
322+ start_number : num,
323+ max_count : 1 ,
324+ } ) ;
325325 }
326326 }
327+ State :: SnapshotChunk ( ..) => unimplemented ! ( ) ,
328+ State :: Full => {
329+ let best_proposal_score = self . client . chain_info ( ) . best_proposal_score ;
330+ for id in & peer_ids {
331+ let request =
332+ self . header_downloaders . get_mut ( id) . and_then ( HeaderDownloader :: create_request) ;
333+ if let Some ( request) = request {
334+ self . send_header_request ( id, request) ;
335+ break
336+ }
337+ }
327338
328- for id in peer_ids {
329- let peer_score = if let Some ( peer) = self . header_downloaders . get ( & id) {
330- peer. total_score ( )
331- } else {
332- U256 :: zero ( )
333- } ;
339+ for id in peer_ids {
340+ let peer_score = if let Some ( peer) = self . header_downloaders . get ( & id) {
341+ peer. total_score ( )
342+ } else {
343+ U256 :: zero ( )
344+ } ;
334345
335- if peer_score > best_proposal_score {
336- self . send_body_request ( & id) ;
346+ if peer_score > best_proposal_score {
347+ self . send_body_request ( & id) ;
348+ }
337349 }
338350 }
339351 }
340- } ,
352+ }
341353 SYNC_EXPIRE_TOKEN_BEGIN ..=SYNC_EXPIRE_TOKEN_END => {
342354 self . check_sync_variable ( ) ;
343355 let ( id, request_id) = {
@@ -413,39 +425,72 @@ pub enum Event {
413425
414426impl Extension {
415427 fn new_headers ( & mut self , imported : Vec < BlockHash > , enacted : Vec < BlockHash > , retracted : Vec < BlockHash > ) {
416- let peer_ids: Vec < _ > = self . header_downloaders . keys ( ) . cloned ( ) . collect ( ) ;
417- for id in peer_ids {
418- if let Some ( peer) = self . header_downloaders . get_mut ( & id) {
419- peer. mark_as_imported ( imported. clone ( ) ) ;
428+ if let Some ( next_state) = match self . state {
429+ State :: SnapshotHeader ( hash, ..) => {
430+ if imported. contains ( & hash) {
431+ let header = self . client . block_header ( & BlockId :: Hash ( hash) ) . expect ( "Imported header must exist" ) ;
432+ Some ( State :: SnapshotChunk ( header. state_root ( ) ) )
433+ } else {
434+ None
435+ }
420436 }
437+ State :: SnapshotChunk ( ..) => unimplemented ! ( ) ,
438+ State :: Full => {
439+ let peer_ids: Vec < _ > = self . header_downloaders . keys ( ) . cloned ( ) . collect ( ) ;
440+ for id in peer_ids {
441+ if let Some ( peer) = self . header_downloaders . get_mut ( & id) {
442+ peer. mark_as_imported ( imported. clone ( ) ) ;
443+ }
444+ }
445+ let mut headers_to_download: Vec < _ > = enacted
446+ . into_iter ( )
447+ . map ( |hash| self . client . block_header ( & BlockId :: Hash ( hash) ) . expect ( "Enacted header must exist" ) )
448+ . collect ( ) ;
449+ headers_to_download. sort_unstable_by_key ( EncodedHeader :: number) ;
450+ #[ allow( clippy:: redundant_closure) ]
451+ // False alarm. https://github.com/rust-lang/rust-clippy/issues/1439
452+ headers_to_download. dedup_by_key ( |h| h. hash ( ) ) ;
453+
454+ let headers: Vec < _ > = headers_to_download
455+ . into_iter ( )
456+ . filter ( |header| self . client . block_body ( & BlockId :: Hash ( header. hash ( ) ) ) . is_none ( ) )
457+ . collect ( ) ; // FIXME: No need to collect here if self is not borrowed.
458+ for header in headers {
459+ let parent = self
460+ . client
461+ . block_header ( & BlockId :: Hash ( header. parent_hash ( ) ) )
462+ . expect ( "Enacted header must have parent" ) ;
463+ self . body_downloader . add_target ( & header. decode ( ) , & parent. decode ( ) ) ;
464+ }
465+ self . body_downloader . remove_target ( & retracted) ;
466+ None
467+ }
468+ } {
469+ cdebug ! ( SYNC , "Transitioning state to {:?}" , next_state) ;
470+ self . state = next_state;
421471 }
422- let mut headers_to_download: Vec < _ > = enacted
423- . into_iter ( )
424- . map ( |hash| self . client . block_header ( & BlockId :: Hash ( hash) ) . expect ( "Enacted header must exist" ) )
425- . collect ( ) ;
426- headers_to_download. sort_unstable_by_key ( EncodedHeader :: number) ;
427- #[ allow( clippy:: redundant_closure) ]
428- // False alarm. https://github.com/rust-lang/rust-clippy/issues/1439
429- headers_to_download. dedup_by_key ( |h| h. hash ( ) ) ;
430-
431- let headers: Vec < _ > = headers_to_download
432- . into_iter ( )
433- . filter ( |header| self . client . block_body ( & BlockId :: Hash ( header. hash ( ) ) ) . is_none ( ) )
434- . collect ( ) ; // FIXME: No need to collect here if self is not borrowed.
435- for header in headers {
436- let parent = self
437- . client
438- . block_header ( & BlockId :: Hash ( header. parent_hash ( ) ) )
439- . expect ( "Enacted header must have parent" ) ;
440- self . body_downloader . add_target ( & header. decode ( ) , & parent. decode ( ) ) ;
441- }
442- self . body_downloader . remove_target ( & retracted) ;
443472 }
444473
445474 fn new_blocks ( & mut self , imported : Vec < BlockHash > , invalid : Vec < BlockHash > ) {
446- self . body_downloader . remove_target ( & imported) ;
447- self . body_downloader . remove_target ( & invalid) ;
448-
475+ if let Some ( next_state) = match self . state {
476+ State :: SnapshotHeader ( hash, ..) => {
477+ if imported. contains ( & hash) {
478+ let header = self . client . block_header ( & BlockId :: Hash ( hash) ) . expect ( "Imported header must exist" ) ;
479+ Some ( State :: SnapshotChunk ( header. state_root ( ) ) )
480+ } else {
481+ None
482+ }
483+ }
484+ State :: SnapshotChunk ( ..) => None ,
485+ State :: Full => {
486+ self . body_downloader . remove_target ( & imported) ;
487+ self . body_downloader . remove_target ( & invalid) ;
488+ None
489+ }
490+ } {
491+ cdebug ! ( SYNC , "Transitioning state to {:?}" , next_state) ;
492+ self . state = next_state;
493+ }
449494
450495 let chain_info = self . client . chain_info ( ) ;
451496
@@ -599,37 +644,33 @@ impl Extension {
599644 return
600645 }
601646
602- match self . state {
603- State :: SnapshotHeader ( ..) => unimplemented ! ( ) ,
604- State :: SnapshotChunk ( ..) => unimplemented ! ( ) ,
605- State :: Full => match response {
606- ResponseMessage :: Headers ( headers) => {
607- self . dismiss_request ( from, id) ;
608- self . on_header_response ( from, & headers)
609- }
610- ResponseMessage :: Bodies ( bodies) => {
611- self . check_sync_variable ( ) ;
612- let hashes = match request {
613- RequestMessage :: Bodies ( hashes) => hashes,
614- _ => unreachable ! ( ) ,
615- } ;
616- assert_eq ! ( bodies. len( ) , hashes. len( ) ) ;
617- if let Some ( token) = self . tokens . get ( from) {
618- if let Some ( token_info) = self . tokens_info . get_mut ( token) {
619- if token_info. request_id . is_none ( ) {
620- ctrace ! ( SYNC , "Expired before handling response" ) ;
621- return
622- }
623- self . api . clear_timer ( * token) . expect ( "Timer clear succeed" ) ;
624- token_info. request_id = None ;
647+ match response {
648+ ResponseMessage :: Headers ( headers) => {
649+ self . dismiss_request ( from, id) ;
650+ self . on_header_response ( from, & headers)
651+ }
652+ ResponseMessage :: Bodies ( bodies) => {
653+ self . check_sync_variable ( ) ;
654+ let hashes = match request {
655+ RequestMessage :: Bodies ( hashes) => hashes,
656+ _ => unreachable ! ( ) ,
657+ } ;
658+ assert_eq ! ( bodies. len( ) , hashes. len( ) ) ;
659+ if let Some ( token) = self . tokens . get ( from) {
660+ if let Some ( token_info) = self . tokens_info . get_mut ( token) {
661+ if token_info. request_id . is_none ( ) {
662+ ctrace ! ( SYNC , "Expired before handling response" ) ;
663+ return
625664 }
665+ self . api . clear_timer ( * token) . expect ( "Timer clear succeed" ) ;
666+ token_info. request_id = None ;
626667 }
627- self . dismiss_request ( from, id) ;
628- self . on_body_response ( hashes, bodies) ;
629- self . check_sync_variable ( ) ;
630668 }
631- _ => unimplemented ! ( ) ,
632- } ,
669+ self . dismiss_request ( from, id) ;
670+ self . on_body_response ( hashes, bodies) ;
671+ self . check_sync_variable ( ) ;
672+ }
673+ _ => unimplemented ! ( ) ,
633674 }
634675 }
635676 }
@@ -699,42 +740,61 @@ impl Extension {
699740
700741 fn on_header_response ( & mut self , from : & NodeId , headers : & [ Header ] ) {
701742 ctrace ! ( SYNC , "Received header response from({}) with length({})" , from, headers. len( ) ) ;
702- let ( mut completed, pivot_score_changed) = if let Some ( peer) = self . header_downloaders . get_mut ( from) {
703- let before_pivot_score = peer. pivot_score ( ) ;
704- let encoded: Vec < _ > = headers. iter ( ) . map ( |h| EncodedHeader :: new ( h. rlp_bytes ( ) . to_vec ( ) ) ) . collect ( ) ;
705- peer. import_headers ( & encoded) ;
706- let after_pivot_score = peer. pivot_score ( ) ;
707- ( peer. downloaded ( ) , before_pivot_score != after_pivot_score)
708- } else {
709- ( Vec :: new ( ) , false )
710- } ;
711- completed. sort_unstable_by_key ( EncodedHeader :: number) ;
712-
713- let mut exists = Vec :: new ( ) ;
714- let mut queued = Vec :: new ( ) ;
715-
716- for header in completed {
717- let hash = header. hash ( ) ;
718- match self . client . import_header ( header. clone ( ) . into_inner ( ) ) {
719- Err ( BlockImportError :: Import ( ImportError :: AlreadyInChain ) ) => exists. push ( hash) ,
720- Err ( BlockImportError :: Import ( ImportError :: AlreadyQueued ) ) => queued. push ( hash) ,
721- // FIXME: handle import errors
722- Err ( err) => {
723- cwarn ! ( SYNC , "Cannot import header({}): {:?}" , header. hash( ) , err) ;
724- break
743+ match self . state {
744+ State :: SnapshotHeader ( ..) => {
745+ for header in headers {
746+ match self . client . import_header ( header. rlp_bytes ( ) . to_vec ( ) ) {
747+ Err ( BlockImportError :: Import ( ImportError :: AlreadyInChain ) ) => { }
748+ Err ( BlockImportError :: Import ( ImportError :: AlreadyQueued ) ) => { }
749+ // FIXME: handle import errors
750+ Err ( err) => {
751+ cwarn ! ( SYNC , "Cannot import header({}): {:?}" , header. hash( ) , err) ;
752+ break
753+ }
754+ _ => { }
755+ }
725756 }
726- _ => { }
727757 }
728- }
758+ State :: SnapshotChunk ( ..) => { }
759+ State :: Full => {
760+ let ( mut completed, pivot_score_changed) = if let Some ( peer) = self . header_downloaders . get_mut ( from) {
761+ let before_pivot_score = peer. pivot_score ( ) ;
762+ let encoded: Vec < _ > = headers. iter ( ) . map ( |h| EncodedHeader :: new ( h. rlp_bytes ( ) . to_vec ( ) ) ) . collect ( ) ;
763+ peer. import_headers ( & encoded) ;
764+ let after_pivot_score = peer. pivot_score ( ) ;
765+ ( peer. downloaded ( ) , before_pivot_score != after_pivot_score)
766+ } else {
767+ ( Vec :: new ( ) , false )
768+ } ;
769+ completed. sort_unstable_by_key ( EncodedHeader :: number) ;
729770
730- let request = self . header_downloaders . get_mut ( from) . and_then ( |peer| {
731- peer. mark_as_queued ( queued) ;
732- peer. mark_as_imported ( exists) ;
733- peer. create_request ( )
734- } ) ;
735- if pivot_score_changed {
736- if let Some ( request) = request {
737- self . send_header_request ( from, request) ;
771+ let mut exists = Vec :: new ( ) ;
772+ let mut queued = Vec :: new ( ) ;
773+
774+ for header in completed {
775+ let hash = header. hash ( ) ;
776+ match self . client . import_header ( header. clone ( ) . into_inner ( ) ) {
777+ Err ( BlockImportError :: Import ( ImportError :: AlreadyInChain ) ) => exists. push ( hash) ,
778+ Err ( BlockImportError :: Import ( ImportError :: AlreadyQueued ) ) => queued. push ( hash) ,
779+ // FIXME: handle import errors
780+ Err ( err) => {
781+ cwarn ! ( SYNC , "Cannot import header({}): {:?}" , header. hash( ) , err) ;
782+ break
783+ }
784+ _ => { }
785+ }
786+ }
787+
788+ let request = self . header_downloaders . get_mut ( from) . and_then ( |peer| {
789+ peer. mark_as_queued ( queued) ;
790+ peer. mark_as_imported ( exists) ;
791+ peer. create_request ( )
792+ } ) ;
793+ if pivot_score_changed {
794+ if let Some ( request) = request {
795+ self . send_header_request ( from, request) ;
796+ }
797+ }
738798 }
739799 }
740800 }
0 commit comments