diff --git a/core/src/lib.rs b/core/src/lib.rs index 6056a35926..fc3a0dc0e9 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -76,4 +76,4 @@ pub use crate::service::ClientService; pub use crate::transaction::{ LocalizedTransaction, PendingSignedTransactions, SignedTransaction, UnverifiedTransaction, }; -pub use crate::types::{BlockId, TransactionId}; +pub use crate::types::{BlockId, BlockStatus, TransactionId}; diff --git a/spec/Block-Synchronization-Extension.md b/spec/Block-Synchronization-Extension.md index fb5a106f75..1cfc536f83 100644 --- a/spec/Block-Synchronization-Extension.md +++ b/spec/Block-Synchronization-Extension.md @@ -19,13 +19,14 @@ Message := ### Status ``` -Status(total_score, best_hash, genesis_hash) +Status(seq, best_hash, genesis_hash) ``` Send current chain status to peer. * Identifier: 0x01 -* Restriction: None +* Restriction: + * `seq` SHOULD be monotonically increasing every time the message is sent. ## Request messages @@ -53,32 +54,17 @@ Request corresponding bodies for each hash. * Restriction: * MUST include at least one item - -### GetStateHead - -``` -GetStateHead(block_hash) -``` - -Request corresponding state head for block of `block_hash`. - -* Identifier: 0x06 -* Restriction: Block number of requested block MUST be multiple of 214. - - ### GetStateChunk ``` -GetStateChunk(block_hash, tree_root) +GetStateChunk(block_hash, [...chunk_roots]) ``` -Request entire subtree starting from `tree_root`. +Request corresponding snapshot chunk for each `chunk_root`. -* Identifier: 0x08 +* Identifier: 0x0a * Restriction: - * Block number of requested block MUST be multiple of 214. - * `tree_root` MUST be included in requested block’s state trie. - * Depth of `tree_root` inside state trie MUST be equal to 2. (Depth of state root is 0) + * All values in `[...chunk_roots]` MUST be included in requested block’s state trie. ## Response messages @@ -113,30 +99,15 @@ Response to `GetBodies` message. Snappy algorithm is used to compress content. * If received body is zero-length array, it means either body value is [], or sender doesn’t have body for requested hash -### StateHead - -``` -StateHead(compressed((key_0, value_0), …) | []) -``` - -Response to `GetStateHead` message. Key and value included in this messages are raw value stored in state trie. Snappy algorithm is used for compression of content. - -* Identifier: 0x07 -* Restriction: - * State root of requested block MUST be included - * For all nodes with depth of less than 2 included in this message, all of its child MUST also be included. - * Content MUST be empty array if sender didn’t have requested data - - ### StateChunk ``` -StateChunk(compressed((key_0, value_0), …) | []) +StateChunk([compressed([terminal_0, …]) | EMPTY_BYTE, ...]) ``` -Response to `GetStateChunk` message. Details of message is same as `StateHead` message. +Response to `GetStateChunk` message. Snappy algorithm is used for compression of content. -* Identifier: 0x09 +* Identifier: 0x0b * Restriction: - * Node corresponding to tree_root in request MUST be included - * Every nodes included in message MUST have all of its child in same message. - * Content MUST be empty array if sender didn’t have requested data + * Number and order of chunks included in this message MUST be equal to request information. + * Node corresponding to `chunk_root` in request MUST be included + * If sender doesn’t have a chunk for the requested hash, corresponding chunk MUST be an uncompressed empty byte string, not omitted. diff --git a/sync/src/block/downloader/header.rs b/sync/src/block/downloader/header.rs index 963c2135e5..e8343682ad 100644 --- a/sync/src/block/downloader/header.rs +++ b/sync/src/block/downloader/header.rs @@ -31,21 +31,14 @@ const MAX_HEADER_QUEUE_LENGTH: usize = 1024; const MAX_RETRY: usize = 3; const MAX_WAIT: u64 = 15; -#[derive(Clone)] -struct Pivot { - hash: BlockHash, - total_score: U256, -} - #[derive(Clone)] pub struct HeaderDownloader { // NOTE: Use this member as minimum as possible. client: Arc, - total_score: U256, + seq: U256, best_hash: BlockHash, - - pivot: Pivot, + pivot: BlockHash, request_time: Option, downloaded: HashMap, queued: HashMap, @@ -53,24 +46,15 @@ pub struct HeaderDownloader { } impl HeaderDownloader { - pub fn total_score(&self) -> U256 { - self.total_score - } - - pub fn new(client: Arc, total_score: U256, best_hash: BlockHash) -> Self { + pub fn new(client: Arc, seq: U256, best_hash: BlockHash) -> Self { let best_header_hash = client.best_block_header().hash(); - let best_score = client.block_total_score(&BlockId::Latest).expect("Best block always exist"); Self { client, - total_score, + seq, best_hash, - - pivot: Pivot { - hash: best_header_hash, - total_score: best_score, - }, + pivot: best_header_hash, request_time: None, downloaded: HashMap::new(), queued: HashMap::new(), @@ -78,18 +62,19 @@ impl HeaderDownloader { } } - pub fn update(&mut self, total_score: U256, best_hash: BlockHash) -> bool { - match self.total_score.cmp(&total_score) { + pub fn best_hash(&self) -> BlockHash { + self.best_hash + } + + pub fn update(&mut self, seq: U256, best_hash: BlockHash) -> bool { + match self.seq.cmp(&seq) { Ordering::Equal => true, Ordering::Less => { - self.total_score = total_score; + self.seq = seq; self.best_hash = best_hash; if self.client.block_header(&BlockId::Hash(best_hash)).is_some() { - self.pivot = Pivot { - hash: best_hash, - total_score, - } + self.pivot = best_hash; } true } @@ -108,25 +93,25 @@ impl HeaderDownloader { /// Find header from queued headers, downloaded cache and then from blockchain /// Panics if header dosn't exist fn pivot_header(&self) -> Header { - match self.queued.get(&self.pivot.hash) { + match self.queued.get(&self.pivot) { Some(header) => header.clone(), - None => match self.downloaded.get(&self.pivot.hash) { + None => match self.downloaded.get(&self.pivot) { Some(header) => header.clone(), - None => self.client.block_header(&BlockId::Hash(self.pivot.hash)).unwrap(), + None => self.client.block_header(&BlockId::Hash(self.pivot)).unwrap(), }, } } - pub fn pivot_score(&self) -> U256 { - self.pivot.total_score - } - pub fn is_idle(&self) -> bool { - let can_request = self.request_time.is_none() && self.total_score > self.pivot.total_score; + let can_request = self.request_time.is_none() && self.best_hash != self.pivot; self.is_valid() && (can_request || self.is_expired()) } + pub fn is_caught_up(&self) -> bool { + self.pivot == self.best_hash + } + pub fn create_request(&mut self) -> Option { if !self.is_idle() { return None @@ -154,19 +139,15 @@ impl HeaderDownloader { let pivot_header = self.pivot_header(); // This happens when best_hash is imported by other peer. - if self.best_hash == self.pivot.hash { + if self.best_hash == self.pivot { ctrace!(SYNC, "Ignore received headers, pivot already reached the best hash"); - } else if first_header_hash == self.pivot.hash { + } else if first_header_hash == self.pivot { for header in headers.iter() { self.downloaded.insert(header.hash(), header.clone()); } // FIXME: skip known headers - let new_scores = headers[1..].iter().fold(U256::zero(), |acc, header| acc + header.score()); - self.pivot = Pivot { - hash: headers.last().expect("Last downloaded header must exist").hash(), - total_score: self.pivot.total_score + new_scores, - } + self.pivot = headers.last().expect("Last downloaded header must exist").hash(); } else if first_header_number < pivot_header.number() { ctrace!( SYNC, @@ -174,17 +155,14 @@ impl HeaderDownloader { ); } else if first_header_number == pivot_header.number() { if pivot_header.number() != 0 { - self.pivot = Pivot { - hash: pivot_header.parent_hash(), - total_score: self.pivot.total_score - pivot_header.score(), - } + self.pivot = pivot_header.parent_hash(); } } else { cerror!( SYNC, - "Invalid header update state. best_hash: {}, self.pivot.hash: {}, first_header_hash: {}", + "Invalid header update state. best_hash: {}, self.pivot: {}, first_header_hash: {}", self.best_hash, - self.pivot.hash, + self.pivot, first_header_hash ); } @@ -203,10 +181,7 @@ impl HeaderDownloader { self.downloaded.remove(&hash); if self.best_hash == hash { - self.pivot = Pivot { - hash, - total_score: self.total_score, - } + self.pivot = hash; } } self.queued.shrink_to_fit(); diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 0b40141598..a40640092c 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -21,8 +21,8 @@ use std::time::Duration; use ccore::encoded::Header as EncodedHeader; use ccore::{ - Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, ChainNotify, Client, ImportBlock, ImportError, - UnverifiedTransaction, + Block, BlockChainClient, BlockChainTrait, BlockId, BlockImportError, BlockStatus, ChainNotify, Client, ImportBlock, + ImportError, UnverifiedTransaction, }; use cnetwork::{Api, EventSender, NetworkExtension, NodeId}; use cstate::FindActionHandler; @@ -47,8 +47,6 @@ const SYNC_EXPIRE_TOKEN_END: TimerToken = SYNC_EXPIRE_TOKEN_BEGIN + SYNC_EXPIRE_ const SYNC_TIMER_INTERVAL: u64 = 1000; const SYNC_EXPIRE_REQUEST_INTERVAL: u64 = 15000; -const SNAPSHOT_PERIOD: u64 = (1 << 14); - #[derive(Debug, PartialEq)] pub struct TokenInfo { node_id: NodeId, @@ -66,6 +64,7 @@ pub struct Extension { client: Arc, api: Box, last_request: u64, + seq: u64, } impl Extension { @@ -100,6 +99,7 @@ impl Extension { client, api, last_request: Default::default(), + seq: Default::default(), } } @@ -115,13 +115,14 @@ impl Extension { id, Arc::new( Message::Status { - total_score: chain_info.best_proposal_score, + seq: U256::from(self.seq), best_hash: chain_info.best_proposal_block_hash, genesis_hash: chain_info.genesis_hash, } .rlp_bytes(), ), ); + self.seq += 1; } fn send_status_broadcast(&mut self) { @@ -131,13 +132,14 @@ impl Extension { id, Arc::new( Message::Status { - total_score: chain_info.best_proposal_score, + seq: U256::from(self.seq), best_hash: chain_info.best_proposal_block_hash, genesis_hash: chain_info.genesis_hash, } .rlp_bytes(), ), ); + self.seq += 1; } } @@ -152,6 +154,14 @@ impl Extension { } fn send_body_request(&mut self, id: &NodeId) { + if let Some(downloader) = self.header_downloaders.get(&id) { + if self.client.block_status(&BlockId::Hash(downloader.best_hash())) == BlockStatus::InChain { + // Peer is lagging behind the local blockchain. + // We don't need to request block bodies to this peer + return + } + } + self.check_sync_variable(); if let Some(requests) = self.requests.get_mut(id) { let have_body_request = { @@ -294,10 +304,10 @@ impl NetworkExtension for Extension { if let Ok(received_message) = Rlp::new(data).as_val() { match received_message { Message::Status { - total_score, + seq, best_hash, genesis_hash, - } => self.on_peer_status(id, total_score, best_hash, genesis_hash), + } => self.on_peer_status(id, seq, best_hash, genesis_hash), Message::Request(request_id, request) => self.on_peer_request(id, request_id, request), Message::Response(request_id, response) => self.on_peer_response(id, request_id, response), } @@ -309,7 +319,6 @@ impl NetworkExtension for Extension { fn on_timeout(&mut self, token: TimerToken) { match token { SYNC_TIMER_TOKEN => { - let best_proposal_score = self.client.chain_info().best_proposal_score; let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect(); peer_ids.shuffle(&mut thread_rng()); @@ -322,15 +331,7 @@ impl NetworkExtension for Extension { } for id in peer_ids { - let peer_score = if let Some(peer) = self.header_downloaders.get(&id) { - peer.total_score() - } else { - U256::zero() - }; - - if peer_score > best_proposal_score { - self.send_body_request(&id); - } + self.send_body_request(&id); } } SYNC_EXPIRE_TOKEN_BEGIN..=SYNC_EXPIRE_TOKEN_END => { @@ -444,7 +445,7 @@ impl Extension { } impl Extension { - fn on_peer_status(&mut self, from: &NodeId, total_score: U256, best_hash: BlockHash, genesis_hash: BlockHash) { + fn on_peer_status(&mut self, from: &NodeId, seq: U256, best_hash: BlockHash, genesis_hash: BlockHash) { // Validity check if genesis_hash != self.client.chain_info().genesis_hash { cinfo!(SYNC, "Genesis hash mismatch with peer {}", from); @@ -453,17 +454,17 @@ impl Extension { match self.header_downloaders.entry(*from) { Entry::Occupied(mut peer) => { - if !peer.get_mut().update(total_score, best_hash) { + if !peer.get_mut().update(seq, best_hash) { // FIXME: It should be an error level if the consensus is PoW. cdebug!(SYNC, "Peer #{} status updated but score is less than before", from); return } } Entry::Vacant(e) => { - e.insert(HeaderDownloader::new(self.client.clone(), total_score, best_hash)); + e.insert(HeaderDownloader::new(self.client.clone(), seq, best_hash)); } } - cinfo!(SYNC, "Peer #{} status update: total_score: {}, best_hash: {}", from, total_score, best_hash); + cinfo!(SYNC, "Peer #{} status update: seq: {}, best_hash: {}", from, seq, best_hash); } fn on_peer_request(&self, from: &NodeId, id: u64, request: RequestMessage) { @@ -489,11 +490,9 @@ impl Extension { ctrace!(SYNC, "Received body request from {}", from); self.create_bodies_response(hashes) } - RequestMessage::StateHead(hash) => self.create_state_head_response(hash), - RequestMessage::StateChunk { - block_hash, - tree_root, - } => self.create_state_chunk_response(block_hash, tree_root), + RequestMessage::StateChunk(block_hash, chunk_root) => { + self.create_state_chunk_response(block_hash, chunk_root) + } }; self.api.send(from, Arc::new(Message::Response(id, response).rlp_bytes())); @@ -505,21 +504,9 @@ impl Extension { .. } => true, RequestMessage::Bodies(hashes) => !hashes.is_empty(), - RequestMessage::StateHead(hash) => match self.client.block_number(&BlockId::Hash(*hash)) { - Some(number) if number % SNAPSHOT_PERIOD == 0 => true, - _ => false, - }, RequestMessage::StateChunk { - block_hash, .. - } => { - let _is_checkpoint = match self.client.block_number(&BlockId::Hash(*block_hash)) { - Some(number) if number % SNAPSHOT_PERIOD == 0 => true, - _ => false, - }; - // FIXME: check tree_root - unimplemented!() - } + } => unimplemented!(), } } @@ -557,11 +544,7 @@ impl Extension { ResponseMessage::Bodies(bodies) } - fn create_state_head_response(&self, _hash: BlockHash) -> ResponseMessage { - unimplemented!() - } - - fn create_state_chunk_response(&self, _hash: BlockHash, _tree_root: H256) -> ResponseMessage { + fn create_state_chunk_response(&self, _hash: BlockHash, _tree_root: Vec) -> ResponseMessage { unimplemented!() } @@ -602,7 +585,7 @@ impl Extension { self.on_body_response(hashes, bodies); self.check_sync_variable(); } - _ => unimplemented!(), + ResponseMessage::StateChunk(..) => unimplemented!(), } } } @@ -656,7 +639,6 @@ impl Extension { } true } - (RequestMessage::StateHead(..), ResponseMessage::StateHead(..)) => unimplemented!(), ( RequestMessage::StateChunk { .. @@ -672,14 +654,12 @@ impl Extension { fn on_header_response(&mut self, from: &NodeId, headers: &[Header]) { ctrace!(SYNC, "Received header response from({}) with length({})", from, headers.len()); - let (mut completed, pivot_score_changed) = if let Some(peer) = self.header_downloaders.get_mut(from) { - let before_pivot_score = peer.pivot_score(); + let (mut completed, peer_is_caught_up) = if let Some(peer) = self.header_downloaders.get_mut(from) { let encoded: Vec<_> = headers.iter().map(|h| EncodedHeader::new(h.rlp_bytes().to_vec())).collect(); peer.import_headers(&encoded); - let after_pivot_score = peer.pivot_score(); - (peer.downloaded(), before_pivot_score != after_pivot_score) + (peer.downloaded(), peer.is_caught_up()) } else { - (Vec::new(), false) + (Vec::new(), true) }; completed.sort_unstable_by_key(EncodedHeader::number); @@ -705,7 +685,7 @@ impl Extension { peer.mark_as_imported(exists); peer.create_request() }); - if pivot_score_changed { + if !peer_is_caught_up { if let Some(request) = request { self.send_header_request(from, request); } @@ -745,20 +725,11 @@ impl Extension { } } - let total_score = self.client.chain_info().best_proposal_score; let mut peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect(); peer_ids.shuffle(&mut thread_rng()); for id in peer_ids { - let peer_score = if let Some(peer) = self.header_downloaders.get(&id) { - peer.total_score() - } else { - U256::zero() - }; - - if peer_score > total_score { - self.send_body_request(&id); - } + self.send_body_request(&id); } } } diff --git a/sync/src/block/message/mod.rs b/sync/src/block/message/mod.rs index dfeab779c1..5c19dad4bd 100644 --- a/sync/src/block/message/mod.rs +++ b/sync/src/block/message/mod.rs @@ -29,15 +29,13 @@ const MESSAGE_ID_GET_HEADERS: u8 = 0x02; const MESSAGE_ID_HEADERS: u8 = 0x03; const MESSAGE_ID_GET_BODIES: u8 = 0x04; const MESSAGE_ID_BODIES: u8 = 0x05; -const MESSAGE_ID_GET_STATE_HEAD: u8 = 0x06; -const MESSAGE_ID_STATE_HEAD: u8 = 0x07; -const MESSAGE_ID_GET_STATE_CHUNK: u8 = 0x08; -const MESSAGE_ID_STATE_CHUNK: u8 = 0x09; +const MESSAGE_ID_GET_STATE_CHUNK: u8 = 0x0a; +const MESSAGE_ID_STATE_CHUNK: u8 = 0x0b; #[derive(Debug, PartialEq)] pub enum Message { Status { - total_score: U256, + seq: U256, best_hash: BlockHash, genesis_hash: BlockHash, }, @@ -49,7 +47,7 @@ impl Encodable for Message { fn rlp_append(&self, s: &mut RlpStream) { match self { Message::Status { - total_score, + seq, best_hash, genesis_hash, } => { @@ -57,7 +55,7 @@ impl Encodable for Message { s.append(&MESSAGE_ID_STATUS); s.begin_list(3); - s.append(total_score); + s.append(seq); s.append(best_hash); s.append(genesis_hash); } @@ -99,7 +97,7 @@ impl Decodable for Message { } Ok(Message::Status { - total_score: message.val_at(0)?, + seq: message.val_at(0)?, best_hash: message.val_at(1)?, genesis_hash: message.val_at(2)?, }) @@ -114,11 +112,10 @@ impl Decodable for Message { let request_id = rlp.val_at(1)?; let message = rlp.at(2)?; match id { - MESSAGE_ID_GET_HEADERS - | MESSAGE_ID_GET_BODIES - | MESSAGE_ID_GET_STATE_HEAD - | MESSAGE_ID_GET_STATE_CHUNK => Ok(Message::Request(request_id, RequestMessage::decode(id, &message)?)), - MESSAGE_ID_HEADERS | MESSAGE_ID_BODIES | MESSAGE_ID_STATE_HEAD | MESSAGE_ID_STATE_CHUNK => { + MESSAGE_ID_GET_HEADERS | MESSAGE_ID_GET_BODIES | MESSAGE_ID_GET_STATE_CHUNK => { + Ok(Message::Request(request_id, RequestMessage::decode(id, &message)?)) + } + MESSAGE_ID_HEADERS | MESSAGE_ID_BODIES | MESSAGE_ID_STATE_CHUNK => { Ok(Message::Response(request_id, ResponseMessage::decode(id, &message)?)) } _ => Err(DecoderError::Custom("Unknown message id detected")), @@ -137,7 +134,7 @@ mod tests { #[test] fn status_message_rlp() { rlp_encode_and_decode_test!(Message::Status { - total_score: U256::default(), + seq: U256::zero(), best_hash: H256::default().into(), genesis_hash: H256::default().into(), }); @@ -148,10 +145,4 @@ mod tests { let request_id = 10; rlp_encode_and_decode_test!(Message::Request(request_id, RequestMessage::Bodies(vec![]))); } - - #[test] - fn request_state_head_rlp() { - let request_id = 10; - rlp_encode_and_decode_test!(Message::Request(request_id, RequestMessage::StateHead(H256::random().into()))); - } } diff --git a/sync/src/block/message/request.rs b/sync/src/block/message/request.rs index 2238e0331c..a2efc6b65d 100644 --- a/sync/src/block/message/request.rs +++ b/sync/src/block/message/request.rs @@ -25,11 +25,7 @@ pub enum RequestMessage { max_count: u64, }, Bodies(Vec), - StateHead(BlockHash), - StateChunk { - block_hash: BlockHash, - tree_root: H256, - }, + StateChunk(BlockHash, Vec), } impl Encodable for RequestMessage { @@ -46,17 +42,10 @@ impl Encodable for RequestMessage { RequestMessage::Bodies(hashes) => { s.append_list(hashes); } - RequestMessage::StateHead(block_hash) => { - s.begin_list(1); - s.append(block_hash); - } - RequestMessage::StateChunk { - block_hash, - tree_root, - } => { + RequestMessage::StateChunk(block_hash, merkle_roots) => { s.begin_list(2); s.append(block_hash); - s.append(tree_root); + s.append_list(merkle_roots); } }; } @@ -69,7 +58,6 @@ impl RequestMessage { .. } => super::MESSAGE_ID_GET_HEADERS, RequestMessage::Bodies(..) => super::MESSAGE_ID_GET_BODIES, - RequestMessage::StateHead(..) => super::MESSAGE_ID_GET_STATE_HEAD, RequestMessage::StateChunk { .. } => super::MESSAGE_ID_GET_STATE_CHUNK, @@ -92,16 +80,6 @@ impl RequestMessage { } } super::MESSAGE_ID_GET_BODIES => RequestMessage::Bodies(rlp.as_list()?), - super::MESSAGE_ID_GET_STATE_HEAD => { - let item_count = rlp.item_count()?; - if item_count != 1 { - return Err(DecoderError::RlpIncorrectListLen { - got: item_count, - expected: 1, - }) - } - RequestMessage::StateHead(rlp.val_at(0)?) - } super::MESSAGE_ID_GET_STATE_CHUNK => { let item_count = rlp.item_count()?; if item_count != 2 { @@ -110,10 +88,7 @@ impl RequestMessage { expected: 2, }) } - RequestMessage::StateChunk { - block_hash: rlp.val_at(0)?, - tree_root: rlp.val_at(1)?, - } + RequestMessage::StateChunk(rlp.val_at(0)?, rlp.list_at(1)?) } _ => return Err(DecoderError::Custom("Unknown message id detected")), }; @@ -149,18 +124,9 @@ mod tests { assert_eq!(message, decode_bytes(message.message_id(), message.rlp_bytes().as_ref())); } - #[test] - fn request_state_head_message_rlp() { - let message = RequestMessage::StateHead(H256::default().into()); - assert_eq!(message, decode_bytes(message.message_id(), message.rlp_bytes().as_ref())); - } - #[test] fn request_state_chunk_message_rlp() { - let message = RequestMessage::StateChunk { - block_hash: H256::default().into(), - tree_root: H256::default(), - }; + let message = RequestMessage::StateChunk(H256::default().into(), vec![H256::default()]); assert_eq!(message, decode_bytes(message.message_id(), message.rlp_bytes().as_ref())); } } diff --git a/sync/src/block/message/response.rs b/sync/src/block/message/response.rs index 0c870b09a5..4529e51878 100644 --- a/sync/src/block/message/response.rs +++ b/sync/src/block/message/response.rs @@ -24,8 +24,7 @@ use ctypes::Header; pub enum ResponseMessage { Headers(Vec
), Bodies(Vec>), - StateHead(Vec), - StateChunk(Vec), + StateChunk(Vec>), } impl Encodable for ResponseMessage { @@ -53,13 +52,8 @@ impl Encodable for ResponseMessage { s.append(&compressed); } - ResponseMessage::StateHead(bytes) => { - s.begin_list(1); - s.append(bytes); - } - ResponseMessage::StateChunk(bytes) => { - s.begin_list(1); - s.append(bytes); + ResponseMessage::StateChunk(chunks) => { + s.append_list::, Vec>(chunks); } }; } @@ -72,7 +66,6 @@ impl ResponseMessage { .. } => super::MESSAGE_ID_HEADERS, ResponseMessage::Bodies(..) => super::MESSAGE_ID_BODIES, - ResponseMessage::StateHead(..) => super::MESSAGE_ID_STATE_HEAD, ResponseMessage::StateChunk { .. } => super::MESSAGE_ID_STATE_CHUNK, @@ -109,26 +102,7 @@ impl ResponseMessage { } ResponseMessage::Bodies(bodies) } - super::MESSAGE_ID_STATE_HEAD => { - let item_count = rlp.item_count()?; - if item_count != 1 { - return Err(DecoderError::RlpIncorrectListLen { - got: item_count, - expected: 1, - }) - } - ResponseMessage::StateHead(rlp.val_at(0)?) - } - super::MESSAGE_ID_STATE_CHUNK => { - let item_count = rlp.item_count()?; - if item_count != 1 { - return Err(DecoderError::RlpIncorrectListLen { - got: item_count, - expected: 1, - }) - } - ResponseMessage::StateChunk(rlp.val_at(0)?) - } + super::MESSAGE_ID_STATE_CHUNK => ResponseMessage::StateChunk(rlp.as_list()?), _ => return Err(DecoderError::Custom("Unknown message id detected")), }; @@ -184,12 +158,6 @@ mod tests { assert_eq!(message, decode_bytes(message.message_id(), message.rlp_bytes().as_ref())); } - #[test] - fn state_head_message_rlp() { - let message = ResponseMessage::StateHead(vec![]); - assert_eq!(message, decode_bytes(message.message_id(), message.rlp_bytes().as_ref())); - } - #[test] fn state_chunk_message_rlp() { let message = ResponseMessage::StateChunk(vec![]); diff --git a/test/src/helper/mock/blockSyncMessage.ts b/test/src/helper/mock/blockSyncMessage.ts index d064d825b4..19385edd0d 100644 --- a/test/src/helper/mock/blockSyncMessage.ts +++ b/test/src/helper/mock/blockSyncMessage.ts @@ -64,7 +64,9 @@ export class BlockSyncMessage { if (msgId === MessageType.MESSAGE_ID_STATUS) { Emitter.emit("status"); const msg = decodedmsg[1]; - const totalScore = new U256(parseInt(msg[0].toString("hex"), 16)); + const totalScore = new U256( + parseInt(msg[0].toString("hex"), 16) || 0 + ); const bestHash = new H256(msg[1].toString("hex")); const genesisHash = new H256(msg[2].toString("hex")); return new BlockSyncMessage({