Skip to content

Commit 8afcad4

Browse files
remagpie, foriequal0
authored and committed
Send and receive snapshot chunk requests
1 parent 412eeaa commit 8afcad4

File tree

4 files changed

+189
-36
lines changed

4 files changed

+189
-36
lines changed

spec/Block-Synchronization-Extension.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,4 +110,4 @@ Response to `GetStateChunk` message. Snappy algorithm is used for compression of
110110
* Restriction:
111111
* Number and order of chunks included in this message MUST be equal to request information.
112112
* Node corresponding to `chunk_root` in request MUST be included
113-
* If sender doesn’t have a chunk for the requested hash, corresponding chunk MUST be compressed([]), not omitted.
113+
* If sender doesn’t have a chunk for the requested hash, corresponding chunk MUST be `[]`(uncompressed), not omitted.

sync/src/block/extension.rs

Lines changed: 169 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ use ccore::{
2424
Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, BlockStatus, ChainNotify, Client, ImportBlock,
2525
ImportError, UnverifiedTransaction,
2626
};
27+
use cmerkle::snapshot::ChunkDecompressor;
28+
use cmerkle::snapshot::Restore as SnapshotRestore;
2729
use cmerkle::TrieFactory;
2830
use cnetwork::{Api, EventSender, NetworkExtension, NodeId};
2931
use cstate::FindActionHandler;
@@ -32,6 +34,7 @@ use ctypes::header::{Header, Seal};
3234
use ctypes::transaction::Action;
3335
use ctypes::{BlockHash, BlockNumber};
3436
use hashdb::AsHashDB;
37+
use kvdb::DBTransaction;
3538
use primitives::{H256, U256};
3639
use rand::prelude::SliceRandom;
3740
use rand::thread_rng;
@@ -58,7 +61,10 @@ pub struct TokenInfo {
5861
#[derive(Debug)]
5962
enum State {
6063
SnapshotHeader(BlockHash, u64),
61-
SnapshotChunk(H256),
64+
SnapshotChunk {
65+
block: BlockHash,
66+
restore: SnapshotRestore,
67+
},
6268
Full,
6369
}
6470

@@ -85,9 +91,13 @@ impl Extension {
8591
Some((hash, num)) => match client.block_header(&BlockId::Number(num)) {
8692
Some(ref header) if *header.hash() == hash => {
8793
let state_db = client.state_db().read();
88-
match TrieFactory::readonly(state_db.as_hashdb(), &header.state_root()) {
94+
let state_root = header.state_root();
95+
match TrieFactory::readonly(state_db.as_hashdb(), &state_root) {
8996
Ok(ref trie) if trie.is_complete() => State::Full,
90-
_ => State::SnapshotChunk(*header.hash()),
97+
_ => State::SnapshotChunk {
98+
block: hash.into(),
99+
restore: SnapshotRestore::new(state_root),
100+
},
91101
}
92102
}
93103
_ => State::SnapshotHeader(hash.into(), num),
@@ -187,6 +197,37 @@ impl Extension {
187197
self.check_sync_variable();
188198
}
189199

200+
fn send_chunk_request(&mut self, block: &BlockHash, root: &H256) {
201+
let have_chunk_request = self.requests.values().flatten().any(|r| match r {
202+
(_, RequestMessage::StateChunk(..)) => true,
203+
_ => false,
204+
});
205+
206+
if !have_chunk_request {
207+
let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect();
208+
peer_ids.shuffle(&mut thread_rng());
209+
if let Some(id) = peer_ids.first() {
210+
if let Some(requests) = self.requests.get_mut(&id) {
211+
let req = RequestMessage::StateChunk(*block, vec![*root]);
212+
cdebug!(SYNC, "Request chunk to {} {:?}", id, req);
213+
let request_id = self.last_request;
214+
self.last_request += 1;
215+
requests.push((request_id, req.clone()));
216+
self.api.send(id, Arc::new(Message::Request(request_id, req).rlp_bytes().into_vec()));
217+
218+
let token = &self.tokens[id];
219+
let token_info = self.tokens_info.get_mut(token).unwrap();
220+
221+
let _ = self.api.clear_timer(*token);
222+
self.api
223+
.set_timer_once(*token, Duration::from_millis(SYNC_EXPIRE_REQUEST_INTERVAL))
224+
.expect("Timer set succeeds");
225+
token_info.request_id = Some(request_id);
226+
}
227+
}
228+
}
229+
}
230+
190231
fn check_sync_variable(&self) {
191232
let mut has_error = false;
192233
for id in self.header_downloaders.keys() {
@@ -203,6 +244,14 @@ impl Extension {
203244
})
204245
.collect();
205246

247+
let chunk_requests: Vec<RequestMessage> = requests
248+
.iter()
249+
.filter_map(|r| match r {
250+
(_, RequestMessage::StateChunk(..)) => Some(r.1.clone()),
251+
_ => None,
252+
})
253+
.collect();
254+
206255
if body_requests.len() > 1 {
207256
cerror!(SYNC, "Body request length {} > 1, body_requests: {:?}", body_requests.len(), body_requests);
208257
has_error = true;
@@ -211,16 +260,18 @@ impl Extension {
211260
let token = &self.tokens[id];
212261
let token_info = &self.tokens_info[token];
213262

214-
match (token_info.request_id, body_requests.len()) {
263+
match (token_info.request_id, body_requests.len() + chunk_requests.len()) {
215264
(Some(_), 1) => {}
216265
(None, 0) => {}
217266
_ => {
218267
cerror!(
219268
SYNC,
220-
"request_id: {:?}, body_requests.len(): {}, body_requests: {:?}",
269+
"request_id: {:?}, body_requests.len(): {}, body_requests: {:?}, chunk_requests.len(): {}, chunk_requests: {:?}",
221270
token_info.request_id,
222271
body_requests.len(),
223-
body_requests
272+
body_requests,
273+
chunk_requests.len(),
274+
chunk_requests
224275
);
225276
has_error = true;
226277
}
@@ -335,7 +386,17 @@ impl NetworkExtension<Event> for Extension {
335386
});
336387
}
337388
}
338-
State::SnapshotChunk(..) => unimplemented!(),
389+
State::SnapshotChunk {
390+
block,
391+
ref mut restore,
392+
} => {
393+
if let Some(root) = restore.next_to_feed() {
394+
self.send_chunk_request(&block, &root);
395+
} else {
396+
cdebug!(SYNC, "Transitioning state to {:?}", State::Full);
397+
self.state = State::Full;
398+
}
399+
}
339400
State::Full => {
340401
for id in &peer_ids {
341402
let request =
@@ -431,12 +492,17 @@ impl Extension {
431492
State::SnapshotHeader(hash, ..) => {
432493
if imported.contains(&hash) {
433494
let header = self.client.block_header(&BlockId::Hash(hash)).expect("Imported header must exist");
434-
Some(State::SnapshotChunk(header.state_root()))
495+
Some(State::SnapshotChunk {
496+
block: hash,
497+
restore: SnapshotRestore::new(header.state_root()),
498+
})
435499
} else {
436500
None
437501
}
438502
}
439-
State::SnapshotChunk(..) => unimplemented!(),
503+
State::SnapshotChunk {
504+
..
505+
} => None,
440506
State::Full => {
441507
let peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect();
442508
for id in peer_ids {
@@ -478,12 +544,17 @@ impl Extension {
478544
State::SnapshotHeader(hash, ..) => {
479545
if imported.contains(&hash) {
480546
let header = self.client.block_header(&BlockId::Hash(hash)).expect("Imported header must exist");
481-
Some(State::SnapshotChunk(header.state_root()))
547+
Some(State::SnapshotChunk {
548+
block: hash,
549+
restore: SnapshotRestore::new(header.state_root()),
550+
})
482551
} else {
483552
None
484553
}
485554
}
486-
State::SnapshotChunk(..) => None,
555+
State::SnapshotChunk {
556+
..
557+
} => None,
487558
State::Full => {
488559
self.body_downloader.remove_target(&imported);
489560
self.body_downloader.remove_target(&invalid);
@@ -576,7 +647,7 @@ impl Extension {
576647
RequestMessage::Bodies(hashes) => !hashes.is_empty(),
577648
RequestMessage::StateChunk {
578649
..
579-
} => unimplemented!(),
650+
} => true,
580651
}
581652
}
582653

@@ -655,7 +726,24 @@ impl Extension {
655726
self.on_body_response(hashes, bodies);
656727
self.check_sync_variable();
657728
}
658-
ResponseMessage::StateChunk(..) => unimplemented!(),
729+
ResponseMessage::StateChunk(chunks) => {
730+
let roots = match request {
731+
RequestMessage::StateChunk(_, roots) => roots,
732+
_ => unreachable!(),
733+
};
734+
if let Some(token) = self.tokens.get(from) {
735+
if let Some(token_info) = self.tokens_info.get_mut(token) {
736+
if token_info.request_id.is_none() {
737+
ctrace!(SYNC, "Expired before handling response");
738+
return
739+
}
740+
self.api.clear_timer(*token).expect("Timer clear succeed");
741+
token_info.request_id = None;
742+
}
743+
}
744+
self.dismiss_request(from, id);
745+
self.on_chunk_response(from, &roots, &chunks);
746+
}
659747
}
660748
}
661749
}
@@ -709,12 +797,10 @@ impl Extension {
709797
}
710798
true
711799
}
712-
(
713-
RequestMessage::StateChunk {
714-
..
715-
},
716-
ResponseMessage::StateChunk(..),
717-
) => unimplemented!(),
800+
(RequestMessage::StateChunk(_, roots), ResponseMessage::StateChunk(chunks)) => {
801+
// Check length
802+
roots.len() == chunks.len()
803+
}
718804
_ => {
719805
cwarn!(SYNC, "Invalid response type");
720806
false
@@ -729,7 +815,10 @@ impl Extension {
729815
[header] if header.hash() == hash => {
730816
match self.client.import_bootstrap_header(&header) {
731817
Err(BlockImportError::Import(ImportError::AlreadyInChain)) => {
732-
self.state = State::SnapshotChunk(*header.state_root());
818+
self.state = State::SnapshotChunk {
819+
block: hash,
820+
restore: SnapshotRestore::new(*header.state_root()),
821+
};
733822
}
734823
Err(BlockImportError::Import(ImportError::AlreadyQueued)) => {}
735824
// FIXME: handle import errors
@@ -747,7 +836,9 @@ impl Extension {
747836
headers.len()
748837
),
749838
},
750-
State::SnapshotChunk(..) => {}
839+
State::SnapshotChunk {
840+
..
841+
} => {}
751842
State::Full => {
752843
let (mut completed, peer_is_caught_up) = if let Some(peer) = self.header_downloaders.get_mut(from) {
753844
let encoded: Vec<_> = headers.iter().map(|h| EncodedHeader::new(h.rlp_bytes().to_vec())).collect();
@@ -829,6 +920,63 @@ impl Extension {
829920
self.send_body_request(&id);
830921
}
831922
}
923+
924+
fn on_chunk_response(&mut self, from: &NodeId, roots: &[H256], chunks: &[Vec<u8>]) {
925+
if let State::SnapshotChunk {
926+
block,
927+
ref mut restore,
928+
} = self.state
929+
{
930+
for (r, c) in roots.iter().zip(chunks) {
931+
if c.is_empty() {
932+
cdebug!(SYNC, "Peer {} sent empty response for chunk request {}", from, r);
933+
continue
934+
}
935+
let decompressor = ChunkDecompressor::from_slice(c);
936+
let raw_chunk = match decompressor.decompress() {
937+
Ok(chunk) => chunk,
938+
Err(e) => {
939+
cwarn!(SYNC, "Decode failed for chunk response from peer {}: {}", from, e);
940+
continue
941+
}
942+
};
943+
let recovered = match raw_chunk.recover(*r) {
944+
Ok(chunk) => chunk,
945+
Err(e) => {
946+
cwarn!(SYNC, "Invalid chunk response from peer {}: {}", from, e);
947+
continue
948+
}
949+
};
950+
951+
let batch = {
952+
let mut state_db = self.client.state_db().write();
953+
let hash_db = state_db.as_hashdb_mut();
954+
restore.feed(hash_db, recovered);
955+
956+
let mut batch = DBTransaction::new();
957+
match state_db.journal_under(&mut batch, 0, H256::zero()) {
958+
Ok(_) => batch,
959+
Err(e) => {
960+
cwarn!(SYNC, "Failed to write state chunk to database: {}", e);
961+
continue
962+
}
963+
}
964+
};
965+
self.client.db().write_buffered(batch);
966+
match self.client.db().flush() {
967+
Ok(_) => cdebug!(SYNC, "Wrote state chunk to database: {}", r),
968+
Err(e) => cwarn!(SYNC, "Failed to flush database: {}", e),
969+
}
970+
}
971+
972+
if let Some(root) = restore.next_to_feed() {
973+
self.send_chunk_request(&block, &root);
974+
} else {
975+
cdebug!(SYNC, "Transitioning state to {:?}", State::Full);
976+
self.state = State::Full;
977+
}
978+
}
979+
}
832980
}
833981

834982
pub struct BlockSyncSender(EventSender<Event>);

util/merkle/src/snapshot/compress.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ impl<R> ChunkDecompressor<R> {
3535
}
3636

3737
impl<'a> ChunkDecompressor<Cursor<&'a [u8]>> {
38-
fn from_slice(slice: &'a [u8]) -> Self {
38+
pub fn from_slice(slice: &'a [u8]) -> Self {
3939
ChunkDecompressor::new(Cursor::new(slice))
4040
}
4141
}

0 commit comments

Comments
 (0)