@@ -24,17 +24,17 @@ use std::time::Duration;
2424use ccore:: encoded:: Header as EncodedHeader ;
2525use ccore:: {
2626 Block , BlockChainClient , BlockChainTrait , BlockId , BlockImportError , BlockStatus , ChainNotify , Client , ImportBlock ,
27- ImportError , UnverifiedTransaction ,
27+ ImportError , StateInfo , UnverifiedTransaction ,
2828} ;
2929use cmerkle:: snapshot:: ChunkDecompressor ;
3030use cmerkle:: snapshot:: Restore as SnapshotRestore ;
3131use cmerkle:: { skewed_merkle_root, TrieFactory } ;
3232use cnetwork:: { Api , EventSender , NetworkExtension , NodeId } ;
33- use cstate:: FindActionHandler ;
33+ use cstate:: { FindActionHandler , TopLevelState , TopStateView } ;
3434use ctimer:: TimerToken ;
3535use ctypes:: header:: { Header , Seal } ;
3636use ctypes:: transaction:: Action ;
37- use ctypes:: { BlockHash , BlockNumber } ;
37+ use ctypes:: { BlockHash , BlockNumber , ShardId } ;
3838use hashdb:: AsHashDB ;
3939use kvdb:: DBTransaction ;
4040use primitives:: { H256 , U256 } ;
@@ -68,10 +68,15 @@ enum State {
6868 header : EncodedHeader ,
6969 prev_root : H256 ,
7070 } ,
71- SnapshotChunk {
71+ SnapshotTopChunk {
7272 block : BlockHash ,
7373 restore : SnapshotRestore ,
7474 } ,
75+ SnapshotShardChunk {
76+ block : BlockHash ,
77+ shard_id : ShardId ,
78+ restore : SnapshotRestore ,
79+ } ,
7580 Full ,
7681}
7782
@@ -82,7 +87,7 @@ impl State {
8287 None => return State :: Full ,
8388 } ;
8489 let header = match client. block_header ( & num. into ( ) ) {
85- Some ( ref h) if h. hash ( ) == hash => h. clone ( ) ,
90+ Some ( h) if h. hash ( ) == hash => h,
8691 _ => return State :: SnapshotHeader ( hash, num) ,
8792 } ;
8893 if client. block_body ( & hash. into ( ) ) . is_none ( ) {
@@ -97,13 +102,35 @@ impl State {
97102
98103 let state_db = client. state_db ( ) . read ( ) ;
99104 let state_root = header. state_root ( ) ;
100- match TrieFactory :: readonly ( state_db. as_hashdb ( ) , & state_root) {
101- Ok ( ref trie ) if trie . is_complete ( ) => State :: Full ,
102- _ => State :: SnapshotChunk {
105+ let top_trie = TrieFactory :: readonly ( state_db. as_hashdb ( ) , & state_root) ;
106+ if !top_trie . map ( |t| t . is_complete ( ) ) . unwrap_or ( false ) {
107+ return State :: SnapshotTopChunk {
103108 block : hash,
104109 restore : SnapshotRestore :: new ( state_root) ,
105- } ,
110+ }
111+ }
112+
113+ let top_state = client. state_at ( hash. into ( ) ) . expect ( "Top level state at the snapshot header exists" ) ;
114+ let metadata = top_state. metadata ( ) . unwrap ( ) . expect ( "Metadata must exist for the snapshot block" ) ;
115+ let shard_num = * metadata. number_of_shards ( ) ;
116+ let empty_shard = ( 0 ..shard_num) . find_map ( |n| {
117+ let shard_root = top_state. shard_root ( n) . unwrap ( ) . expect ( "Shard root must exist" ) ;
118+ let trie = TrieFactory :: readonly ( state_db. as_hashdb ( ) , & shard_root) ;
119+ if !trie. map ( |t| t. is_complete ( ) ) . unwrap_or ( false ) {
120+ Some ( ( n, shard_root) )
121+ } else {
122+ None
123+ }
124+ } ) ;
125+ if let Some ( ( shard_id, shard_root) ) = empty_shard {
126+ return State :: SnapshotShardChunk {
127+ block : hash,
128+ shard_id,
129+ restore : SnapshotRestore :: new ( shard_root) ,
130+ }
106131 }
132+
133+ State :: Full
107134 }
108135
109136 fn next ( & self , client : & Client ) -> Self {
@@ -121,13 +148,48 @@ impl State {
121148 State :: SnapshotBody {
122149 header,
123150 ..
124- } => State :: SnapshotChunk {
151+ } => State :: SnapshotTopChunk {
125152 block : header. hash ( ) ,
126153 restore : SnapshotRestore :: new ( header. state_root ( ) ) ,
127154 } ,
128- State :: SnapshotChunk {
155+ State :: SnapshotTopChunk {
156+ block,
129157 ..
130- } => State :: Full ,
158+ } => {
159+ let header = client. block_header ( & ( * block) . into ( ) ) . expect ( "Snapshot header must exist" ) ;
160+ let state_root = header. state_root ( ) ;
161+ let state_db = client. state_db ( ) . read ( ) ;
162+ let top_state = TopLevelState :: from_existing ( state_db. clone ( & state_root) , state_root) . unwrap ( ) ;
163+ let shard_root = top_state. shard_root ( 0 ) . unwrap ( ) . expect ( "Shard 0 always exists" ) ;
164+ State :: SnapshotShardChunk {
165+ block : * block,
166+ shard_id : 0 ,
167+ restore : SnapshotRestore :: new ( shard_root) ,
168+ }
169+ }
170+ State :: SnapshotShardChunk {
171+ block,
172+ shard_id,
173+ ..
174+ } => {
175+ let top_state = client. state_at ( ( * block) . into ( ) ) . expect ( "State at the snapshot header must exist" ) ;
176+ let metadata = top_state. metadata ( ) . unwrap ( ) . expect ( "Metadata must exist for snapshot block" ) ;
177+ let shard_num = * metadata. number_of_shards ( ) ;
178+ if shard_id + 1 == shard_num {
179+ State :: Full
180+ } else {
181+ let next_shard = shard_id + 1 ;
182+ let shard_root = top_state
183+ . shard_root ( next_shard)
184+ . expect ( "Top level state must be valid" )
185+ . expect ( "Shard root must exist" ) ;
186+ State :: SnapshotShardChunk {
187+ block : * block,
188+ shard_id : next_shard,
189+ restore : SnapshotRestore :: new ( shard_root) ,
190+ }
191+ }
192+ }
131193 State :: Full => State :: Full ,
132194 }
133195 }
@@ -206,11 +268,15 @@ impl Extension {
206268 header,
207269 ..
208270 } => header. hash ( ) ,
209- State :: SnapshotChunk {
271+ State :: SnapshotTopChunk {
272+ block,
273+ ..
274+ } => * block,
275+ State :: SnapshotShardChunk {
210276 block,
211277 ..
212278 } => * block,
213- State :: Full => unreachable ! ( "Trying to transition state from State::Full" ) ,
279+ State :: Full => panic ! ( "Trying to transit the state from State::Full" ) ,
214280 } ;
215281 self . client . force_update_best_block ( & best_hash) ;
216282 for downloader in self . header_downloaders . values_mut ( ) {
@@ -522,9 +588,20 @@ impl NetworkExtension<Event> for Extension {
522588 }
523589 }
524590 }
525- State :: SnapshotChunk {
591+ State :: SnapshotTopChunk {
592+ block,
593+ ref mut restore,
594+ } => {
595+ if let Some ( root) = restore. next_to_feed ( ) {
596+ self . send_chunk_request ( & block, & root) ;
597+ } else {
598+ self . move_state ( ) ;
599+ }
600+ }
601+ State :: SnapshotShardChunk {
526602 block,
527603 ref mut restore,
604+ ..
528605 } => {
529606 if let Some ( root) = restore. next_to_feed ( ) {
530607 self . send_chunk_request ( & block, & root) ;
@@ -925,12 +1002,6 @@ impl Extension {
9251002 headers. len( )
9261003 ) ,
9271004 } ,
928- State :: SnapshotBody {
929- ..
930- } => { }
931- State :: SnapshotChunk {
932- ..
933- } => { }
9341005 State :: Full => {
9351006 let ( mut completed, peer_is_caught_up) = if let Some ( peer) = self . header_downloaders . get_mut ( from) {
9361007 let encoded: Vec < _ > = headers. iter ( ) . map ( |h| EncodedHeader :: new ( h. rlp_bytes ( ) . to_vec ( ) ) ) . collect ( ) ;
@@ -969,6 +1040,7 @@ impl Extension {
9691040 }
9701041 }
9711042 }
1043+ _ => { }
9721044 }
9731045 }
9741046
@@ -1044,12 +1116,18 @@ impl Extension {
10441116
10451117 fn on_chunk_response ( & mut self , from : & NodeId , roots : & [ H256 ] , chunks : & [ Vec < u8 > ] ) {
10461118 let ( block, restore) = match self . state {
1047- State :: SnapshotChunk {
1119+ State :: SnapshotTopChunk {
10481120 block,
10491121 ref mut restore,
10501122 } => ( block, restore) ,
1123+ State :: SnapshotShardChunk {
1124+ block,
1125+ ref mut restore,
1126+ ..
1127+ } => ( block, restore) ,
10511128 _ => return ,
10521129 } ;
1130+ assert_eq ! ( roots. len( ) , chunks. len( ) ) ;
10531131 for ( r, c) in roots. iter ( ) . zip ( chunks) {
10541132 if c. is_empty ( ) {
10551133 cdebug ! ( SYNC , "Peer {} sent empty response for chunk request {}" , from, r) ;
0 commit comments