@@ -27,7 +27,7 @@ use ccore::{
2727} ;
2828use cmerkle:: snapshot:: ChunkDecompressor ;
2929use cmerkle:: snapshot:: Restore as SnapshotRestore ;
30- use cmerkle:: TrieFactory ;
30+ use cmerkle:: { skewed_merkle_root , TrieFactory } ;
3131use cnetwork:: { Api , EventSender , NetworkExtension , NodeId } ;
3232use cstate:: FindActionHandler ;
3333use ctimer:: TimerToken ;
@@ -64,7 +64,7 @@ pub struct TokenInfo {
6464enum State {
6565 SnapshotHeader ( BlockHash , u64 ) ,
6666 SnapshotBody {
67- block : BlockHash ,
67+ header : EncodedHeader ,
6868 prev_root : H256 ,
6969 } ,
7070 SnapshotChunk {
@@ -151,7 +151,7 @@ impl Extension {
151151 let parent =
152152 client. block_header ( & parent_hash. into ( ) ) . expect ( "Parent header of the snapshot header must exist" ) ;
153153 return State :: SnapshotBody {
154- block : hash ,
154+ header ,
155155 prev_root : parent. transactions_root ( ) ,
156156 }
157157 }
@@ -437,8 +437,29 @@ impl NetworkExtension<Event> for Extension {
437437 }
438438 }
439439 State :: SnapshotBody {
440+ ref header,
440441 ..
441- } => unimplemented ! ( ) ,
442+ } => {
443+ for id in & peer_ids {
444+ if let Some ( requests) = self . requests . get_mut ( id) {
445+ ctrace ! ( SYNC , "Send snapshot body request to {}" , id) ;
446+ let request = RequestMessage :: Bodies ( vec ! [ header. hash( ) ] ) ;
447+ let request_id = self . last_request ;
448+ self . last_request += 1 ;
449+ requests. push ( ( request_id, request. clone ( ) ) ) ;
450+ self . api . send ( id, Arc :: new ( Message :: Request ( request_id, request) . rlp_bytes ( ) ) ) ;
451+
452+ let token = & self . tokens [ id] ;
453+ let token_info = self . tokens_info . get_mut ( token) . unwrap ( ) ;
454+
455+ let _ = self . api . clear_timer ( * token) ;
456+ self . api
457+ . set_timer_once ( * token, Duration :: from_millis ( SYNC_EXPIRE_REQUEST_INTERVAL ) )
458+ . expect ( "Timer set succeeds" ) ;
459+ token_info. request_id = Some ( request_id) ;
460+ }
461+ }
462+ }
442463 State :: SnapshotChunk {
443464 block,
444465 ref mut restore,
@@ -815,20 +836,11 @@ impl Extension {
815836 match self . state {
816837 State :: SnapshotHeader ( hash, _) => match headers {
817838 [ parent, header] if header. hash ( ) == hash => {
818- match self . client . import_bootstrap_header ( & header) {
819- Ok ( _) | Err ( BlockImportError :: Import ( ImportError :: AlreadyInChain ) ) => {
820- self . state = State :: SnapshotBody {
821- block : hash,
822- prev_root : * parent. transactions_root ( ) ,
823- } ;
824- cdebug ! ( SYNC , "Transitioning state to {:?}" , self . state) ;
825- }
826- Err ( BlockImportError :: Import ( ImportError :: AlreadyQueued ) ) => { }
827- // FIXME: handle import errors
828- Err ( err) => {
829- cwarn ! ( SYNC , "Cannot import header({}): {:?}" , header. hash( ) , err) ;
830- }
831- }
839+ self . state = State :: SnapshotBody {
840+ header : EncodedHeader :: new ( header. rlp_bytes ( ) . to_vec ( ) ) ,
841+ prev_root : * parent. transactions_root ( ) ,
842+ } ;
843+ cdebug ! ( SYNC , "Transitioning state to {:?}" , self . state) ;
832844 }
833845 _ => cdebug ! (
834846 SYNC ,
@@ -887,42 +899,75 @@ impl Extension {
887899
888900 fn on_body_response ( & mut self , hashes : Vec < BlockHash > , bodies : Vec < Vec < UnverifiedTransaction > > ) {
889901 ctrace ! ( SYNC , "Received body response with lenth({}) {:?}" , hashes. len( ) , hashes) ;
890- {
891- self . body_downloader . import_bodies ( hashes , bodies ) ;
892- let completed = self . body_downloader . drain ( ) ;
893- for ( hash , transactions ) in completed {
894- let header = self
895- . client
896- . block_header ( & BlockId :: Hash ( hash ) )
897- . expect ( "Downloaded body's header must exist" )
898- . decode ( ) ;
899- let block = Block {
900- header,
901- transactions,
902- } ;
903- cdebug ! ( SYNC , "Body download completed for #{}({})" , block . header . number ( ) , hash ) ;
904- match self . client . import_block ( block . rlp_bytes ( & Seal :: With ) ) {
905- Err ( BlockImportError :: Import ( ImportError :: AlreadyInChain ) ) => {
906- cwarn ! ( SYNC , "Downloaded already existing block({})" , hash)
907- }
908- Err ( BlockImportError :: Import ( ImportError :: AlreadyQueued ) ) => {
909- cwarn ! ( SYNC , "Downloaded already queued in the verification queue({}) " , hash )
910- }
911- Err ( err ) => {
902+
903+ match self . state {
904+ State :: SnapshotBody {
905+ ref header ,
906+ prev_root ,
907+ } => {
908+ let body = bodies . first ( ) . expect ( "Body response in SnapshotBody state has only one body" ) ;
909+ let new_root = skewed_merkle_root ( prev_root , body. iter ( ) . map ( Encodable :: rlp_bytes ) ) ;
910+ if header . transactions_root ( ) == new_root {
911+ let block = Block {
912+ header : header . decode ( ) ,
913+ transactions : body . clone ( ) ,
914+ } ;
915+ match self . client . import_bootstrap_block ( & block ) {
916+ Ok ( _ ) | Err ( BlockImportError :: Import ( ImportError :: AlreadyInChain ) ) => {
917+ self . state = State :: SnapshotChunk {
918+ block : header . hash ( ) ,
919+ restore : SnapshotRestore :: new ( header . state_root ( ) ) ,
920+ } ;
921+ cdebug ! ( SYNC , "Transitioning state to {:?} " , self . state ) ;
922+ }
923+ Err ( BlockImportError :: Import ( ImportError :: AlreadyQueued ) ) => { }
912924 // FIXME: handle import errors
913- cwarn ! ( SYNC , "Cannot import block({}): {:?}" , hash, err) ;
914- break
925+ Err ( err) => {
926+ cwarn ! ( SYNC , "Cannot import header({}): {:?}" , header. hash( ) , err) ;
927+ }
915928 }
916- _ => { }
917929 }
918930 }
919- }
931+ State :: Full => {
932+ {
933+ self . body_downloader . import_bodies ( hashes, bodies) ;
934+ let completed = self . body_downloader . drain ( ) ;
935+ for ( hash, transactions) in completed {
936+ let header = self
937+ . client
938+ . block_header ( & BlockId :: Hash ( hash) )
939+ . expect ( "Downloaded body's header must exist" )
940+ . decode ( ) ;
941+ let block = Block {
942+ header,
943+ transactions,
944+ } ;
945+ cdebug ! ( SYNC , "Body download completed for #{}({})" , block. header. number( ) , hash) ;
946+ match self . client . import_block ( block. rlp_bytes ( & Seal :: With ) ) {
947+ Err ( BlockImportError :: Import ( ImportError :: AlreadyInChain ) ) => {
948+ cwarn ! ( SYNC , "Downloaded already existing block({})" , hash)
949+ }
950+ Err ( BlockImportError :: Import ( ImportError :: AlreadyQueued ) ) => {
951+ cwarn ! ( SYNC , "Downloaded already queued in the verification queue({})" , hash)
952+ }
953+ Err ( err) => {
954+ // FIXME: handle import errors
955+ cwarn ! ( SYNC , "Cannot import block({}): {:?}" , hash, err) ;
956+ break
957+ }
958+ _ => { }
959+ }
960+ }
961+ }
920962
921- let mut peer_ids: Vec < _ > = self . header_downloaders . keys ( ) . cloned ( ) . collect ( ) ;
922- peer_ids. shuffle ( & mut thread_rng ( ) ) ;
963+ let mut peer_ids: Vec < _ > = self . header_downloaders . keys ( ) . cloned ( ) . collect ( ) ;
964+ peer_ids. shuffle ( & mut thread_rng ( ) ) ;
923965
924- for id in peer_ids {
925- self . send_body_request ( & id) ;
966+ for id in peer_ids {
967+ self . send_body_request ( & id) ;
968+ }
969+ }
970+ _ => { }
926971 }
927972 }
928973
0 commit comments