diff --git a/codechain/rpc.rs b/codechain/rpc.rs index 5a3f696ca4..b3219e25d8 100644 --- a/codechain/rpc.rs +++ b/codechain/rpc.rs @@ -15,8 +15,8 @@ // along with this program. If not, see . use std::io; -use std::net::SocketAddr; +use crate::config::Config; use crate::rpc_apis; use crpc::{ jsonrpc_core, start_http, start_ipc, start_ws, HttpServer, IpcServer, MetaIoHandler, Middleware, WsError, WsServer, @@ -33,38 +33,27 @@ pub struct RpcHttpConfig { } pub fn rpc_http_start( - cfg: RpcHttpConfig, - enable_devel_api: bool, - deps: &rpc_apis::ApiDependencies, + server: MetaIoHandler<(), impl Middleware<()>>, + config: RpcHttpConfig, ) -> Result { - let url = format!("{}:{}", cfg.interface, cfg.port); + let url = format!("{}:{}", config.interface, config.port); let addr = url.parse().map_err(|_| format!("Invalid JSONRPC listen host/port given: {}", url))?; - let server = setup_http_rpc_server(&addr, cfg.cors.clone(), cfg.hosts.clone(), enable_devel_api, deps)?; - cinfo!(RPC, "RPC Listening on {}", url); - if let Some(hosts) = cfg.hosts { - cinfo!(RPC, "Allowed hosts are {:?}", hosts); - } - if let Some(cors) = cfg.cors { - cinfo!(RPC, "CORS domains are {:?}", cors); - } - Ok(server) -} - -fn setup_http_rpc_server( - url: &SocketAddr, - cors_domains: Option>, - allowed_hosts: Option>, - enable_devel_api: bool, - deps: &rpc_apis::ApiDependencies, -) -> Result { - let server = setup_rpc_server(enable_devel_api, deps); - let start_result = start_http(url, cors_domains, allowed_hosts, server); + let start_result = start_http(&addr, config.cors.clone(), config.hosts.clone(), server); match start_result { Err(ref err) if err.kind() == io::ErrorKind::AddrInUse => { Err(format!("RPC address {} is already in use, make sure that another instance of a CodeChain node is not running or change the address using the --jsonrpc-port option.", url)) }, Err(e) => Err(format!("RPC error: {:?}", e)), - Ok(server) => Ok(server), + Ok(server) => { + cinfo!(RPC, "RPC Listening on {}", url); + if let Some(hosts) = config.hosts { + cinfo!(RPC, "Allowed hosts are {:?}", hosts); + } + if let Some(cors) = config.cors { + cinfo!(RPC, "CORS domains are {:?}", cors); + } + Ok(server) + }, } } @@ -74,19 +63,17 @@ pub struct RpcIpcConfig { } pub fn rpc_ipc_start( - cfg: &RpcIpcConfig, - enable_devel_api: bool, - deps: &rpc_apis::ApiDependencies, + server: MetaIoHandler<(), impl Middleware<()>>, + config: RpcIpcConfig, ) -> Result { - let server = setup_rpc_server(enable_devel_api, deps); - let start_result = start_ipc(&cfg.socket_addr, server); + let start_result = start_ipc(&config.socket_addr, server); match start_result { Err(ref err) if err.kind() == io::ErrorKind::AddrInUse => { - Err(format!("IPC address {} is already in use, make sure that another instance of a Codechain node is not running or change the address using the --ipc-path options.", cfg.socket_addr)) + Err(format!("IPC address {} is already in use, make sure that another instance of a Codechain node is not running or change the address using the --ipc-path options.", config.socket_addr)) }, Err(e) => Err(format!("IPC error: {:?}", e)), Ok(server) => { - cinfo!(RPC, "IPC Listening on {}", cfg.socket_addr); + cinfo!(RPC, "IPC Listening on {}", config.socket_addr); Ok(server) }, } @@ -99,15 +86,10 @@ pub struct RpcWsConfig { pub max_connections: usize, } -pub fn rpc_ws_start( - cfg: &RpcWsConfig, - enable_devel_api: bool, - deps: &rpc_apis::ApiDependencies, -) -> Result { - let server = setup_rpc_server(enable_devel_api, deps); - let url = format!("{}:{}", cfg.interface, cfg.port); +pub fn rpc_ws_start(server: MetaIoHandler<(), impl Middleware<()>>, config: RpcWsConfig) -> Result { + let url = format!("{}:{}", config.interface, config.port); let addr = url.parse().map_err(|_| format!("Invalid WebSockets listen host/port given: {}", url))?; - let start_result = start_ws(&addr, server, cfg.max_connections); + let start_result = start_ws(&addr, server, config.max_connections); match start_result { Err(WsError::Io(ref err)) if err.kind() == io::ErrorKind::AddrInUse => { Err(format!("WebSockets address {} is already in use, make sure that another instance of a Codechain node is not running or change the address using the --ws-port options.", addr)) @@ -120,12 +102,9 @@ pub fn rpc_ws_start( } } -fn setup_rpc_server( - enable_devel_api: bool, - deps: &rpc_apis::ApiDependencies, -) -> MetaIoHandler<(), impl Middleware<()>> { +pub fn setup_rpc_server(config: &Config, deps: &rpc_apis::ApiDependencies) -> MetaIoHandler<(), impl Middleware<()>> { let mut handler = MetaIoHandler::with_middleware(LogMiddleware::new()); - deps.extend_api(enable_devel_api, &mut handler); + deps.extend_api(config, &mut handler); rpc_apis::setup_rpc(handler) } diff --git a/codechain/rpc_apis.rs b/codechain/rpc_apis.rs index 3f4d3c016a..a447b43b42 100644 --- a/codechain/rpc_apis.rs +++ b/codechain/rpc_apis.rs @@ -22,6 +22,8 @@ use cnetwork::{EventSender, NetworkControl}; use crpc::{MetaIoHandler, Middleware, Params, Value}; use csync::BlockSyncEvent; +use crate::config::Config; + pub struct ApiDependencies { pub client: Arc, pub miner: Arc, @@ -31,11 +33,11 @@ pub struct ApiDependencies { } impl ApiDependencies { - pub fn extend_api(&self, enable_devel_api: bool, handler: &mut MetaIoHandler<(), impl Middleware<()>>) { + pub fn extend_api(&self, config: &Config, handler: &mut MetaIoHandler<(), impl Middleware<()>>) { use crpc::v1::*; handler.extend_with(ChainClient::new(Arc::clone(&self.client)).to_delegate()); handler.extend_with(MempoolClient::new(Arc::clone(&self.client)).to_delegate()); - if enable_devel_api { + if config.rpc.enable_devel_api { handler.extend_with( DevelClient::new(Arc::clone(&self.client), Arc::clone(&self.miner), self.block_sync.clone()) .to_delegate(), diff --git a/codechain/run_node.rs b/codechain/run_node.rs index 9ec3646293..85255e6e29 100644 --- a/codechain/run_node.rs +++ b/codechain/run_node.rs @@ -42,7 +42,7 @@ use crate::config::{self, load_config}; use crate::constants::{DEFAULT_DB_PATH, DEFAULT_KEYS_PATH}; use crate::dummy_network_service::DummyNetworkService; use crate::json::PasswordFile; -use crate::rpc::{rpc_http_start, rpc_ipc_start, rpc_ws_start}; +use crate::rpc::{rpc_http_start, rpc_ipc_start, rpc_ws_start, setup_rpc_server}; use crate::rpc_apis::ApiDependencies; fn network_start( @@ -316,36 +316,43 @@ pub fn run_node(matches: &ArgMatches) -> Result<(), String> { } }; - let rpc_apis_deps = ApiDependencies { - client: client.client(), - miner: Arc::clone(&miner), - network_control: Arc::clone(&network_service), - account_provider: ap, - block_sync: maybe_sync_sender, - }; + let (rpc_server, ipc_server, ws_server) = { + let rpc_apis_deps = ApiDependencies { + client: client.client(), + miner: Arc::clone(&miner), + network_control: Arc::clone(&network_service), + account_provider: ap, + block_sync: maybe_sync_sender, + }; + + let rpc_server = { + if !config.rpc.disable.unwrap() { + let server = setup_rpc_server(&config, &rpc_apis_deps); + Some(rpc_http_start(server, config.rpc_http_config())?) + } else { + None + } + }; - let rpc_server = { - if !config.rpc.disable.unwrap() { - Some(rpc_http_start(config.rpc_http_config(), config.rpc.enable_devel_api, &rpc_apis_deps)?) - } else { - None - } - }; + let ipc_server = { + if !config.ipc.disable.unwrap() { + let server = setup_rpc_server(&config, &rpc_apis_deps); + Some(rpc_ipc_start(server, config.rpc_ipc_config())?) + } else { + None + } + }; - let ipc_server = { - if !config.ipc.disable.unwrap() { - Some(rpc_ipc_start(&config.rpc_ipc_config(), config.rpc.enable_devel_api, &rpc_apis_deps)?) - } else { - None - } - }; + let ws_server = { + if !config.ws.disable.unwrap() { + let server = setup_rpc_server(&config, &rpc_apis_deps); + Some(rpc_ws_start(server, config.rpc_ws_config())?) + } else { + None + } + }; - let ws_server = { - if !config.ws.disable.unwrap() { - Some(rpc_ws_start(&config.rpc_ws_config(), config.rpc.enable_devel_api, &rpc_apis_deps)?) - } else { - None - } + (rpc_server, ipc_server, ws_server) }; if (!config.stratum.disable.unwrap()) && (miner.engine_type() == EngineType::PoW) { @@ -367,6 +374,7 @@ pub fn run_node(matches: &ArgMatches) -> Result<(), String> { // drop the scheme to free up genesis state. drop(scheme); + client.client().engine().complete_register(); cinfo!(TEST_SCRIPT, "Initialization complete"); diff --git a/core/src/block.rs b/core/src/block.rs index 9a6584d090..ab35efe3cf 100644 --- a/core/src/block.rs +++ b/core/src/block.rs @@ -313,6 +313,10 @@ impl<'x> OpenBlock<'x> { self.block.header.set_seal(seal); Ok(()) } + + pub fn inner_mut(&mut self) -> &mut ExecutedBlock { + &mut self.block + } } /// Just like `OpenBlock`, except that we've applied `Engine::on_close_block`, finished up the non-seal header fields. @@ -492,6 +496,7 @@ pub fn enact( let mut b = OpenBlock::try_new(engine, db, parent, Address::default(), vec![])?; b.populate_from(header); + engine.on_open_block(b.inner_mut())?; b.push_transactions(transactions, client, parent.number(), parent.timestamp())?; let term_common_params = client.term_common_params(BlockId::Hash(*header.parent_hash())); diff --git a/core/src/consensus/mod.rs b/core/src/consensus/mod.rs index cd7d72b1f7..37c2703fca 100644 --- a/core/src/consensus/mod.rs +++ b/core/src/consensus/mod.rs @@ -222,6 +222,11 @@ pub trait ConsensusEngine: Sync + Send { /// Stops any services that the may hold the Engine and makes it safe to drop. fn stop(&self) {} + /// Block transformation functions, before the transactions. + fn on_open_block(&self, _block: &mut ExecutedBlock) -> Result<(), Error> { + Ok(()) + } + /// Block transformation functions, after the transactions. fn on_close_block( &self, @@ -261,6 +266,8 @@ pub trait ConsensusEngine: Sync + Send { fn register_chain_notify(&self, _: &Client) {} + fn complete_register(&self) {} + fn get_best_block_from_best_proposal_header(&self, header: &HeaderView) -> BlockHash { header.hash() } diff --git a/core/src/consensus/tendermint/engine.rs b/core/src/consensus/tendermint/engine.rs index dbfbf86198..ca17232e7d 100644 --- a/core/src/consensus/tendermint/engine.rs +++ b/core/src/consensus/tendermint/engine.rs @@ -141,6 +141,15 @@ impl ConsensusEngine for Tendermint { fn stop(&self) {} + /// Block transformation functions, before the transactions. + fn on_open_block(&self, block: &mut ExecutedBlock) -> Result<(), Error> { + let metadata = block.state().metadata()?.expect("Metadata must exist"); + if block.header().number() == metadata.last_term_finished_block_num() + 1 { + // FIXME: on_term_open + } + Ok(()) + } + fn on_close_block( &self, block: &mut ExecutedBlock, @@ -269,10 +278,6 @@ impl ConsensusEngine for Tendermint { let extension = service.register_extension(move |api| TendermintExtension::new(inner, timeouts, api)); let client = Arc::downgrade(&self.client().unwrap()); self.extension_initializer.send((extension, client)).unwrap(); - - let (result, receiver) = crossbeam::bounded(1); - self.inner.send(worker::Event::Restore(result)).unwrap(); - receiver.recv().unwrap(); } fn register_time_gap_config_to_worker(&self, time_gap_params: TimeGapParams) { @@ -291,6 +296,12 @@ impl ConsensusEngine for Tendermint { client.add_notify(Arc::downgrade(&self.chain_notify) as Weak); } + fn complete_register(&self) { + let (result, receiver) = crossbeam::bounded(1); + self.inner.send(worker::Event::Restore(result)).unwrap(); + receiver.recv().unwrap(); + } + fn get_best_block_from_best_proposal_header(&self, header: &HeaderView) -> BlockHash { header.parent_hash() } diff --git a/core/src/miner/miner.rs b/core/src/miner/miner.rs index 08a2ba0a99..5b7c42d865 100644 --- a/core/src/miner/miner.rs +++ b/core/src/miner/miner.rs @@ -527,6 +527,7 @@ impl Miner { return Ok(None) } } + self.engine.on_open_block(open_block.inner_mut())?; let mut invalid_transactions = Vec::new(); diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 1ea7eb4c5b..0b40141598 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -109,6 +109,38 @@ impl Extension { } } + fn send_status(&mut self, id: &NodeId) { + let chain_info = self.client.chain_info(); + self.api.send( + id, + Arc::new( + Message::Status { + total_score: chain_info.best_proposal_score, + best_hash: chain_info.best_proposal_block_hash, + genesis_hash: chain_info.genesis_hash, + } + .rlp_bytes(), + ), + ); + } + + fn send_status_broadcast(&mut self) { + let chain_info = self.client.chain_info(); + for id in self.connected_nodes.iter() { + self.api.send( + id, + Arc::new( + Message::Status { + total_score: chain_info.best_proposal_score, + best_hash: chain_info.best_proposal_block_hash, + genesis_hash: chain_info.genesis_hash, + } + .rlp_bytes(), + ), + ); + } + } + fn send_header_request(&mut self, id: &NodeId, request: RequestMessage) { if let Some(requests) = self.requests.get_mut(id) { ctrace!(SYNC, "Send header request to {}", id); @@ -212,18 +244,8 @@ impl NetworkExtension for Extension { fn on_node_added(&mut self, id: &NodeId, _version: u64) { cinfo!(SYNC, "New peer detected #{}", id); - let chain_info = self.client.chain_info(); - self.api.send( - id, - Arc::new( - Message::Status { - total_score: chain_info.best_proposal_score, - best_hash: chain_info.best_proposal_block_hash, - genesis_hash: chain_info.genesis_hash, - } - .rlp_bytes(), - ), - ); + self.send_status(id); + let t = self.connected_nodes.insert(*id); debug_assert!(t, "{} is already added to peer list", id); @@ -386,11 +408,8 @@ pub enum Event { impl Extension { fn new_headers(&mut self, imported: Vec, enacted: Vec, retracted: Vec) { - let peer_ids: Vec<_> = self.header_downloaders.keys().cloned().collect(); - for id in peer_ids { - if let Some(peer) = self.header_downloaders.get_mut(&id) { - peer.mark_as_imported(imported.clone()); - } + for peer in self.header_downloaders.values_mut() { + peer.mark_as_imported(imported.clone()); } let mut headers_to_download: Vec<_> = enacted .into_iter() @@ -420,22 +439,7 @@ impl Extension { self.body_downloader.remove_target(&imported); self.body_downloader.remove_target(&invalid); - - let chain_info = self.client.chain_info(); - - for id in &self.connected_nodes { - self.api.send( - id, - Arc::new( - Message::Status { - total_score: chain_info.best_proposal_score, - best_hash: chain_info.best_proposal_block_hash, - genesis_hash: chain_info.genesis_hash, - } - .rlp_bytes(), - ), - ); - } + self.send_status_broadcast(); } } diff --git a/test/src/e2e.dynval/setup.ts b/test/src/e2e.dynval/setup.ts index 493f6ef602..2c4aefa645 100644 --- a/test/src/e2e.dynval/setup.ts +++ b/test/src/e2e.dynval/setup.ts @@ -451,7 +451,7 @@ interface TermWaiter { target: number; termPeriods: number; } - ): Promise; + ): Promise; } export function setTermTestTimeout( @@ -483,7 +483,7 @@ export function setTermTestTimeout( termPeriods: number; } ) { - await node.waitForTermChange( + return await node.waitForTermChange( waiterParams.target, termPeriodsToTime(waiterParams.termPeriods, 0.5) ); diff --git a/test/src/helper/spawn.ts b/test/src/helper/spawn.ts index 4b2eb4bc95..f2d1cf396b 100644 --- a/test/src/helper/spawn.ts +++ b/test/src/helper/spawn.ts @@ -859,14 +859,16 @@ export default class CodeChain { public async waitForTermChange(target: number, timeout?: number) { const start = Date.now(); while (true) { - const termMetadata = await stake.getTermMetadata(this.sdk); + const termMetadata = (await stake.getTermMetadata(this.sdk))!; if (termMetadata && termMetadata.currentTermId >= target) { - break; + return termMetadata; } await wait(1000); if (timeout) { if (Date.now() - start > timeout * 1000) { - throw new Error(`Term didn't changed in ${timeout} s`); + throw new Error( + `Term didn't changed to ${target} in ${timeout} s. It is ${termMetadata.currentTermId} now` + ); } } } diff --git a/util/merkle/src/triedb.rs b/util/merkle/src/triedb.rs index c15ff43047..9b536753f5 100644 --- a/util/merkle/src/triedb.rs +++ b/util/merkle/src/triedb.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use ccrypto::blake256; +use ccrypto::{blake256, BLAKE_NULL_RLP}; use hashdb::HashDB; use primitives::H256; @@ -103,6 +103,27 @@ impl<'db> TrieDB<'db> { None => Ok(None), } } + + /// Check if every leaf of the trie exists + #[allow(dead_code)] + pub fn is_complete(&self) -> bool { + *self.root == BLAKE_NULL_RLP || self.is_complete_aux(self.root) + } + + /// Check if every leaf of the trie starting from `hash` exists + fn is_complete_aux(&self, hash: &H256) -> bool { + if let Some(node_rlp) = self.db.get(hash) { + match RlpNode::decoded(node_rlp.as_ref()) { + Some(RlpNode::Branch(.., children)) => { + children.iter().flatten().all(|child| self.is_complete_aux(child)) + } + Some(RlpNode::Leaf(..)) => true, + None => false, + } + } else { + false + } + } } impl<'db> Trie for TrieDB<'db> { @@ -124,6 +145,19 @@ mod tests { use crate::*; use memorydb::*; + fn delete_any_child(db: &mut MemoryDB, root: &H256) { + let node_rlp = db.get(root).unwrap(); + match RlpNode::decoded(&node_rlp).unwrap() { + RlpNode::Leaf(..) => { + db.remove(root); + } + RlpNode::Branch(.., children) => { + let first_child = children.iter().find(|c| c.is_some()).unwrap().unwrap(); + db.remove(&first_child); + } + } + } + #[test] fn get() { let mut memdb = MemoryDB::new(); @@ -139,4 +173,33 @@ mod tests { assert_eq!(t.get(b"B"), Ok(Some(b"ABCBA".to_vec()))); assert_eq!(t.get(b"C"), Ok(None)); } + + #[test] + fn is_complete_success() { + let mut memdb = MemoryDB::new(); + let mut root = H256::new(); + { + let mut t = TrieDBMut::new(&mut memdb, &mut root); + t.insert(b"A", b"ABC").unwrap(); + t.insert(b"B", b"ABCBA").unwrap(); + } + + let t = TrieDB::try_new(&memdb, &root).unwrap(); + assert!(t.is_complete()); + } + + #[test] + fn is_complete_fail() { + let mut memdb = MemoryDB::new(); + let mut root = H256::new(); + { + let mut t = TrieDBMut::new(&mut memdb, &mut root); + t.insert(b"A", b"ABC").unwrap(); + t.insert(b"B", b"ABCBA").unwrap(); + } + delete_any_child(&mut memdb, &root); + + let t = TrieDB::try_new(&memdb, &root).unwrap(); + assert!(!t.is_complete()); + } }