diff --git a/Cargo.lock b/Cargo.lock index 1ce59fa078..c6e55a743d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -424,6 +424,8 @@ dependencies = [ "primitives", "rand 0.6.1", "rlp", + "rlp_derive", + "snap", "tempfile", "trie-standardmap", ] diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index dd42b863d1..62b6d11697 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -20,6 +20,7 @@ mod client; mod config; mod error; mod importer; +pub mod snapshot_notify; mod test_client; pub use self::chain_notify::ChainNotify; diff --git a/core/src/client/snapshot_notify.rs b/core/src/client/snapshot_notify.rs new file mode 100644 index 0000000000..35844c5604 --- /dev/null +++ b/core/src/client/snapshot_notify.rs @@ -0,0 +1,97 @@ +// Copyright 2018-2019 Kodebox, Inc. +// This file is part of CodeChain. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender}; +use std::sync::{Arc, Weak}; + +use ctypes::BlockHash; +use parking_lot::RwLock; + +pub fn create() -> (NotifySender, NotifyReceiverSource) { + let (tx, rx) = sync_channel(1); + let tx = Arc::new(RwLock::new(Some(tx))); + let tx_weak = Arc::downgrade(&tx); + ( + NotifySender { + tx, + }, + NotifyReceiverSource( + ReceiverCanceller { + tx: tx_weak, + }, + NotifyReceiver { + rx, + }, + ), + ) +} + +pub struct NotifySender { + tx: Arc>>>, +} + +impl NotifySender { + pub fn notify(&self, block_hash: BlockHash) { + let guard = self.tx.read(); + if let Some(tx) = guard.as_ref() { + // TODO: Ignore the error. Receiver thread might be terminated or congested. + let _ = tx.try_send(block_hash); + } else { + // ReceiverCanceller is dropped. + } + } +} + +pub struct NotifyReceiverSource(pub ReceiverCanceller, pub NotifyReceiver); + +/// Dropping this makes the receiver stopped. +/// +/// `recv()` method of the `Receiver` will stop and return `RecvError` when corresponding `Sender` is dropped. +/// This is an inherited behaviour of `std::sync::mpsc::{Sender, Receiver}`. +/// However, we need another way to stop the `Receiver`, since `Sender` is usually shared throughout our codes. +/// We can't collect them all and destory one by one. We need a kill switch. +/// +/// `ReceiverCanceller` holds weak reference to the `Sender`, so it doesn't prohibit the default behaviour. +/// Then, we can upgrade the weak reference and get the shared reference to `Sender` itself, and manually drop it with this. +pub struct ReceiverCanceller { + tx: Weak>>>, +} + +impl Drop for ReceiverCanceller { + fn drop(&mut self) { + if let Some(tx) = self.tx.upgrade() { + let mut guard = tx.write(); + if let Some(sender) = guard.take() { + drop(sender) + } + } else { + // All NotifySender is dropped. No droppable Sender. + } + } +} + +/// Receiver is dropped when +/// 1. There are no NotifySenders out there. +/// 2. ReceiverCanceller is dropped. See the comment of `ReceiverCanceller`. +pub struct NotifyReceiver { + rx: Receiver, +} + +impl NotifyReceiver { + pub fn recv(&self) -> Result { + self.rx.recv() + } +} diff --git a/core/src/consensus/mod.rs b/core/src/consensus/mod.rs index db6f0fe413..faada8d1cb 100644 --- a/core/src/consensus/mod.rs +++ b/core/src/consensus/mod.rs @@ -47,6 +47,7 @@ use primitives::Bytes; use self::bit_set::BitSet; use crate::account_provider::AccountProvider; use crate::block::{ExecutedBlock, SealedBlock}; +use crate::client::snapshot_notify::NotifySender as SnapshotNotifySender; use crate::client::ConsensusClient; use crate::codechain_machine::CodeChainMachine; use crate::error::Error; @@ -236,6 +237,10 @@ pub trait ConsensusEngine: Sync + Send { fn register_chain_notify(&self, _: &Client) {} + fn complete_register(&self) {} + + fn register_snapshot_notify_sender(&self, _sender: SnapshotNotifySender) {} + fn get_best_block_from_best_proposal_header(&self, header: &HeaderView<'_>) -> BlockHash { header.hash() } diff --git a/core/src/consensus/tendermint/engine.rs b/core/src/consensus/tendermint/engine.rs index 67a83dc4d1..61b0eaafaf 100644 --- a/core/src/consensus/tendermint/engine.rs +++ b/core/src/consensus/tendermint/engine.rs @@ -37,6 +37,7 @@ use super::worker; use super::{ChainNotify, Tendermint, SEAL_FIELDS}; use crate::account_provider::AccountProvider; use crate::block::*; +use crate::client::snapshot_notify::NotifySender as SnapshotNotifySender; use crate::client::{Client, ConsensusClient}; use crate::codechain_machine::CodeChainMachine; use crate::consensus::tendermint::params::TimeGapParams; @@ -290,10 +291,6 @@ impl ConsensusEngine for Tendermint { let extension = service.register_extension(move |api| TendermintExtension::new(inner, timeouts, api)); let client = Arc::downgrade(&self.client().unwrap()); self.extension_initializer.send((extension, client)).unwrap(); - - let (result, receiver) = crossbeam::bounded(1); - self.inner.send(worker::Event::Restore(result)).unwrap(); - receiver.recv().unwrap(); } fn register_time_gap_config_to_worker(&self, time_gap_params: TimeGapParams) { @@ -312,6 +309,16 @@ impl ConsensusEngine for Tendermint { client.add_notify(Arc::downgrade(&self.chain_notify) as Weak); } + fn complete_register(&self) { + let (result, receiver) = crossbeam::bounded(1); + self.inner.send(worker::Event::Restore(result)).unwrap(); + receiver.recv().unwrap(); + } + + fn register_snapshot_notify_sender(&self, sender: SnapshotNotifySender) { + self.snapshot_notify_sender_initializer.send(sender).unwrap(); + } + fn get_best_block_from_best_proposal_header(&self, header: &HeaderView<'_>) -> BlockHash { header.parent_hash() } @@ -346,7 +353,7 @@ impl ConsensusEngine for Tendermint { } } -fn is_term_changed(header: &Header, parent: &Header, term_seconds: u64) -> bool { +pub(super) fn is_term_changed(header: &Header, parent: &Header, term_seconds: u64) -> bool { if term_seconds == 0 { return false } diff --git a/core/src/consensus/tendermint/mod.rs b/core/src/consensus/tendermint/mod.rs index 8a69eaeb09..8e7fdc9c7c 100644 --- a/core/src/consensus/tendermint/mod.rs +++ b/core/src/consensus/tendermint/mod.rs @@ -41,6 +41,7 @@ pub use self::types::{Height, Step, View}; use super::{stake, ValidatorSet}; use crate::client::ConsensusClient; use crate::codechain_machine::CodeChainMachine; +use crate::snapshot_notify::NotifySender as SnapshotNotifySender; use crate::ChainNotify; /// Timer token representing the consensus step timeouts. @@ -58,6 +59,7 @@ pub struct Tendermint { client: RwLock>>, external_params_initializer: crossbeam::Sender, extension_initializer: crossbeam::Sender<(crossbeam::Sender, Weak)>, + snapshot_notify_sender_initializer: crossbeam::Sender, timeouts: TimeoutParams, join: Option>, quit_tendermint: crossbeam::Sender<()>, @@ -93,8 +95,14 @@ impl Tendermint { let timeouts = our_params.timeouts; let machine = Arc::new(machine); - let (join, external_params_initializer, extension_initializer, inner, quit_tendermint) = - worker::spawn(our_params.validators); + let ( + join, + external_params_initializer, + extension_initializer, + snapshot_notify_sender_initializer, + inner, + quit_tendermint, + ) = worker::spawn(our_params.validators); let action_handlers: Vec> = vec![stake.clone()]; let chain_notify = Arc::new(TendermintChainNotify::new(inner.clone())); @@ -102,6 +110,7 @@ impl Tendermint { client: Default::default(), external_params_initializer, extension_initializer, + snapshot_notify_sender_initializer, timeouts, join: Some(join), quit_tendermint, diff --git a/core/src/consensus/tendermint/worker.rs b/core/src/consensus/tendermint/worker.rs index 5956ad6d9a..c6a93ef6b0 100644 --- a/core/src/consensus/tendermint/worker.rs +++ b/core/src/consensus/tendermint/worker.rs @@ -50,6 +50,7 @@ use crate::consensus::validator_set::{DynamicValidator, ValidatorSet}; use crate::consensus::{EngineError, Seal}; use crate::encoded; use crate::error::{BlockError, Error}; +use crate::snapshot_notify::NotifySender as SnapshotNotifySender; use crate::transaction::{SignedTransaction, UnverifiedTransaction}; use crate::views::BlockView; use crate::BlockId; @@ -59,6 +60,7 @@ type SpawnResult = ( JoinHandle<()>, crossbeam::Sender, crossbeam::Sender<(crossbeam::Sender, Weak)>, + crossbeam::Sender, crossbeam::Sender, crossbeam::Sender<()>, ); @@ -97,6 +99,7 @@ struct Worker { time_gap_params: TimeGapParams, timeout_token_nonce: usize, vote_regression_checker: VoteRegressionChecker, + snapshot_notify_sender: SnapshotNotifySender, } pub enum Event { @@ -180,6 +183,7 @@ impl Worker { extension: EventSender, client: Weak, time_gap_params: TimeGapParams, + snapshot_notify_sender: SnapshotNotifySender, ) -> Self { Worker { client, @@ -198,6 +202,7 @@ impl Worker { time_gap_params, timeout_token_nonce: ENGINE_TIMEOUT_TOKEN_NONCE_BASE, vote_regression_checker: VoteRegressionChecker::new(), + snapshot_notify_sender, } } @@ -206,6 +211,7 @@ impl Worker { let (quit, quit_receiver) = crossbeam::bounded(1); let (external_params_initializer, external_params_receiver) = crossbeam::bounded(1); let (extension_initializer, extension_receiver) = crossbeam::bounded(1); + let (snapshot_notify_sender_initializer, snapshot_notify_sender_receiver) = crossbeam::bounded(1); let join = Builder::new() .name("tendermint".to_string()) .spawn(move || { @@ -249,8 +255,29 @@ impl Worker { return } }; + // TODO: Make initialization steps to order insensitive. + let snapshot_notify_sender = crossbeam::select! { + recv(snapshot_notify_sender_receiver) -> msg => { + match msg { + Ok(sender) => sender, + Err(crossbeam::RecvError) => { + cerror!(ENGINE, "The tendermint extension is not initalized."); + return + } + } + } + recv(quit_receiver) -> msg => { + match msg { + Ok(()) => {}, + Err(crossbeam::RecvError) => { + cerror!(ENGINE, "The quit channel for tendermint thread had been closed."); + } + } + return + } + }; validators.register_client(Weak::clone(&client)); - let mut inner = Self::new(validators, extension, client, time_gap_params); + let mut inner = Self::new(validators, extension, client, time_gap_params, snapshot_notify_sender); loop { crossbeam::select! { recv(receiver) -> msg => { @@ -374,7 +401,7 @@ impl Worker { } }) .unwrap(); - (join, external_params_initializer, extension_initializer, sender, quit) + (join, external_params_initializer, extension_initializer, snapshot_notify_sender_initializer, sender, quit) } /// The client is a thread-safe struct. Using it in multi-threads is safe. @@ -1620,6 +1647,8 @@ impl Worker { } }; + self.send_snapshot_notify(c.as_ref(), enacted.as_slice()); + if self.step.is_commit() && (imported.len() + enacted.len() == 1) { let (_, committed_block_hash) = self.step.committed().expect("Commit state always has block_hash"); if imported.first() == Some(&committed_block_hash) { @@ -1671,6 +1700,26 @@ impl Worker { } } + // Notify once for the latest block even if multiple blocks have been enacted. + fn send_snapshot_notify(&mut self, c: &dyn ConsensusClient, enacted: &[BlockHash]) { + let mut last_snapshot_point = None; + for block_hash in enacted.iter().rev() { + let block_id = BlockId::Hash(*block_hash); + + if c.current_term_id(block_id).expect("State trie should exist for enacted block") > 0 { + let last_term_finished_block_num = c.last_term_finished_block_num(block_id).expect("Block is enacted"); + let block_number = c.block_number(&block_id).expect("Block number should exist for enacted block"); + if last_term_finished_block_num + 1 == block_number { + last_snapshot_point = Some(block_hash); + } + } + } + if let Some(last_snapshot_point) = last_snapshot_point { + // TODO: Reduce the snapshot frequency. + self.snapshot_notify_sender.notify(*last_snapshot_point); + } + } + fn send_proposal_block( &self, signature: SchnorrSignature, diff --git a/core/src/lib.rs b/core/src/lib.rs index fc3a0dc0e9..7f8d2e1213 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -62,6 +62,7 @@ mod tests; pub use crate::account_provider::{AccountProvider, Error as AccountProviderError}; pub use crate::block::Block; +pub use crate::client::snapshot_notify; pub use crate::client::{ AccountData, AssetClient, BlockChainClient, BlockChainTrait, ChainNotify, Client, ClientConfig, DatabaseClient, EngineClient, EngineInfo, ExecuteClient, ImportBlock, MiningBlockChainClient, Shard, StateInfo, TermInfo, diff --git a/foundry/config/mod.rs b/foundry/config/mod.rs index 0cbe1d9ce0..a373bf2aa8 100644 --- a/foundry/config/mod.rs +++ b/foundry/config/mod.rs @@ -296,6 +296,8 @@ fn default_enable_devel_api() -> bool { pub struct Snapshot { pub disable: Option, pub path: Option, + // Snapshot's age in blocks + pub expiration: Option, } #[derive(Deserialize)] @@ -702,6 +704,9 @@ impl Snapshot { if other.path.is_some() { self.path = other.path.clone(); } + if other.expiration.is_some() { + self.expiration = other.expiration; + } } pub fn overwrite_with(&mut self, matches: &clap::ArgMatches<'_>) -> Result<(), String> { diff --git a/foundry/config/presets/config.dev.toml b/foundry/config/presets/config.dev.toml index 801e10840b..53b6224325 100644 --- a/foundry/config/presets/config.dev.toml +++ b/foundry/config/presets/config.dev.toml @@ -50,6 +50,7 @@ max_connections = 100 [snapshot] disable = false path = "snapshot" +expiration = 100000 # blocks. About a week [email_alarm] disable = true diff --git a/foundry/config/presets/config.prod.toml b/foundry/config/presets/config.prod.toml index 2c45b38ccd..e54ed4f17b 100644 --- a/foundry/config/presets/config.prod.toml +++ b/foundry/config/presets/config.prod.toml @@ -50,6 +50,7 @@ max_connections = 100 [snapshot] disable = true path = "snapshot" +expiration = 100000 # blocks. About a week [email_alarm] disable = true diff --git a/foundry/run_node.rs b/foundry/run_node.rs index 2fa44e59ba..b47f77abaa 100644 --- a/foundry/run_node.rs +++ b/foundry/run_node.rs @@ -19,6 +19,7 @@ use std::path::Path; use std::sync::{Arc, Weak}; use std::time::{SystemTime, UNIX_EPOCH}; +use ccore::snapshot_notify; use ccore::{ AccountProvider, AccountProviderError, BlockId, ChainNotify, ClientConfig, ClientService, EngineInfo, EngineType, Miner, MinerService, Scheme, NUM_COLUMNS, @@ -30,7 +31,8 @@ use ckeystore::KeyStore; use clap::ArgMatches; use clogger::{self, EmailAlarm, LoggerConfig}; use cnetwork::{Filters, NetworkConfig, NetworkControl, NetworkService, RoutingTable, SocketAddr}; -use csync::{BlockSyncExtension, BlockSyncSender, SnapshotService, TransactionSyncExtension}; +use csync::snapshot::Service as SnapshotService; +use csync::{BlockSyncExtension, BlockSyncSender, TransactionSyncExtension}; use ctimer::TimerLoop; use ctrlc::CtrlC; use fdlimit::raise_fd_limit; @@ -328,12 +330,12 @@ pub fn run_node(matches: &ArgMatches<'_>) -> Result<(), String> { }; let _snapshot_service = { + let client = client.client(); + let (tx, rx) = snapshot_notify::create(); + client.engine().register_snapshot_notify_sender(tx); if !config.snapshot.disable.unwrap() { - // FIXME: Let's make it load snapshot period dynamically to support changing the period. - let client = client.client(); - let snapshot_period = client.common_params(BlockId::Latest).unwrap().snapshot_period(); - let service = SnapshotService::new(Arc::clone(&client), config.snapshot.path.unwrap(), snapshot_period); - client.add_notify(Arc::downgrade(&service) as Weak); + let service = + Arc::new(SnapshotService::new(client, rx, config.snapshot.path.unwrap(), config.snapshot.expiration)); Some(service) } else { None @@ -342,6 +344,7 @@ pub fn run_node(matches: &ArgMatches<'_>) -> Result<(), String> { // drop the scheme to free up genesis state. drop(scheme); + client.client().engine().complete_register(); cinfo!(TEST_SCRIPT, "Initialization complete"); diff --git a/sync/src/lib.rs b/sync/src/lib.rs index 108daa8ba6..b6efd70ecf 100644 --- a/sync/src/lib.rs +++ b/sync/src/lib.rs @@ -33,11 +33,10 @@ extern crate tempfile; extern crate trie_standardmap; mod block; -mod snapshot; +pub mod snapshot; mod transaction; pub use crate::block::{BlockSyncEvent, BlockSyncExtension, BlockSyncSender}; -pub use crate::snapshot::SnapshotService; pub use crate::transaction::TransactionSyncExtension; #[cfg(test)] diff --git a/sync/src/snapshot/mod.rs b/sync/src/snapshot/mod.rs index f0e8c6bedc..90cb6247af 100644 --- a/sync/src/snapshot/mod.rs +++ b/sync/src/snapshot/mod.rs @@ -1,4 +1,4 @@ -// Copyright 2018 Kodebox, Inc. +// Copyright 2019 Kodebox, Inc. // This file is part of CodeChain. // // This program is free software: you can redistribute it and/or modify @@ -14,9 +14,189 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -mod error; -mod service; -#[cfg_attr(feature = "cargo-clippy", allow(clippy::module_inception))] -mod snapshot; -pub use self::service::Service as SnapshotService; +use std::fs; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; +use std::thread::{spawn, JoinHandle}; + +use ccore::snapshot_notify::{NotifyReceiverSource, ReceiverCanceller}; +use ccore::{BlockChainClient, BlockChainTrait, BlockId, Client}; +use cdb::{AsHashDB, HashDB}; +use cmerkle::snapshot::{ChunkCompressor, Error as SnapshotError, Snapshot}; +use cstate::{StateDB, TopLevelState, TopStateView}; +use ctypes::BlockHash; +use primitives::H256; + +pub struct Service { + join_handle: Option>, + canceller: Option, +} + +pub fn snapshot_dir(root_dir: &str, block: &BlockHash) -> PathBuf { + let mut path = PathBuf::new(); + path.push(root_dir); + path.push(format!("{:x}", **block)); + path +} + +pub fn snapshot_path(root_dir: &str, block: &BlockHash, chunk_root: &H256) -> PathBuf { + let mut path = snapshot_dir(root_dir, block); + path.push(format!("{:x}", chunk_root)); + path +} + +impl Service { + pub fn new( + client: Arc, + notify_receiver_source: NotifyReceiverSource, + root_dir: String, + expiration: Option, + ) -> Self { + let NotifyReceiverSource(canceller, receiver) = notify_receiver_source; + let join_handle = spawn(move || { + cinfo!(SYNC, "Snapshot service is on"); + while let Ok(block_hash) = receiver.recv() { + cinfo!(SYNC, "Snapshot is requested for block: {}", block_hash); + let state_root = if let Some(header) = client.block_header(&BlockId::Hash(block_hash)) { + header.state_root() + } else { + cerror!(SYNC, "There isn't corresponding header for the requested block hash: {}", block_hash,); + continue + }; + { + let db_lock = client.state_db().read(); + if let Err(err) = snapshot(&db_lock, block_hash, state_root, &root_dir) { + cerror!( + SYNC, + "Snapshot request failed for block: {}, chunk_root: {}, err: {}", + block_hash, + state_root, + err + ); + } else { + cinfo!(SYNC, "Snapshot is ready for block: {}", block_hash) + } + } + + if let Some(expiration) = expiration { + if let Err(err) = cleanup_expired(&client, &root_dir, expiration) { + cerror!(SYNC, "Snapshot cleanup error after block hash {}, err: {}", block_hash, err); + } + } + // TODO: Prune old snapshots + } + cinfo!(SYNC, "Snapshot service is stopped") + }); + + Self { + canceller: Some(canceller), + join_handle: Some(join_handle), + } + } +} +fn snapshot(db: &StateDB, block_hash: BlockHash, root: H256, dir: &str) -> Result<(), SnapshotError> { + snapshot_trie(db.as_hashdb(), block_hash, root, dir)?; + + let top_state = TopLevelState::from_existing(db.clone(&root), root)?; + let shard_roots = { + let metadata = top_state.metadata()?.expect("Metadata must exist for snapshot block"); + let shard_num = *metadata.number_of_shards(); + (0..shard_num).map(|n| top_state.shard_root(n)) + }; + for sr in shard_roots { + snapshot_trie(db.as_hashdb(), block_hash, sr?.expect("Shard root must exist"), dir)?; + } + Ok(()) +} + +fn snapshot_trie(db: &dyn HashDB, block_hash: BlockHash, root: H256, root_dir: &str) -> Result<(), SnapshotError> { + let snapshot_dir = snapshot_dir(root_dir, &block_hash); + fs::create_dir_all(snapshot_dir)?; + + for chunk in Snapshot::from_hashdb(db, root) { + let chunk_path = snapshot_path(root_dir, &block_hash, &chunk.root); + let chunk_file = fs::File::create(chunk_path)?; + let compressor = ChunkCompressor::new(chunk_file); + compressor.compress_chunk(&chunk)?; + } + + Ok(()) +} + +fn cleanup_expired(client: &Client, root_dir: &str, expiration: u64) -> Result<(), SnapshotError> { + for entry in fs::read_dir(root_dir)? { + let entry = match entry { + Ok(entry) => entry, + Err(err) => { + cerror!(SYNC, "Snapshot cleanup can't retrieve entry. err: {}", err); + continue + } + }; + let path = entry.path(); + + match entry.file_type().map(|x| x.is_dir()) { + Ok(true) => {} + Ok(false) => continue, + Err(err) => { + cerror!(SYNC, "Snapshot cleanup can't retrieve file info: {}, err: {}", path.to_string_lossy(), err); + continue + } + } + + let name = match path.file_name().expect("Directories always have file name").to_str() { + Some(n) => n, + None => continue, + }; + let hash = match H256::from_str(name) { + Ok(h) => BlockHash::from(h), + Err(_) => continue, + }; + let number = if let Some(number) = client.block_number(&BlockId::Hash(hash)) { + number + } else { + cerror!(SYNC, "Snapshot cleanup can't retrieve block number for block_hash: {}", hash); + continue + }; + + if number + expiration < client.best_block_header().number() { + cleanup_snapshot(root_dir, hash) + } + } + Ok(()) +} + +/// Remove all files in `root_dir/block_hash` +fn cleanup_snapshot(root_dir: &str, block_hash: BlockHash) { + let path = snapshot_dir(root_dir, &block_hash); + let rename_to = PathBuf::from(root_dir).join(format!("{:x}.old", *block_hash)); + // It is okay to ignore errors. We just wanted them to be removed. + match fs::rename(path, &rename_to) { + Ok(()) => {} + Err(err) => { + cerror!(SYNC, "Snapshot cleanup: renaming {} failed, reason: {}", block_hash, err); + } + } + // Ignore the error. Cleanup failure is not a critical error. + match fs::remove_dir_all(rename_to) { + Ok(()) => {} + Err(err) => { + cerror!(SYNC, "Snapshot cleanup: removing {} failed, reason: {}", block_hash, err); + } + } +} + +impl Drop for Service { + fn drop(&mut self) { + if let Some(canceller) = self.canceller.take() { + // The thread corresponding to the `self.join_handle` waits for the `self.canceller` is dropped. + // It must be dropped first not to make deadlock at `handle.join()`. + drop(canceller); + } + + if let Some(handle) = self.join_handle.take() { + handle.join().expect("Snapshot service thread shouldn't panic"); + } + } +} diff --git a/sync/src/snapshot/service.rs b/sync/src/snapshot/service.rs deleted file mode 100644 index e076dc3661..0000000000 --- a/sync/src/snapshot/service.rs +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2018 Kodebox, Inc. -// This file is part of CodeChain. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use std::io::ErrorKind; -use std::path::PathBuf; -use std::sync::Arc; -use std::thread::spawn; - -use ccore::{BlockChainClient, BlockChainTrait, BlockId, ChainNotify, Client, DatabaseClient}; -use ctypes::BlockHash; - -use super::error::Error; -use super::snapshot::{Snapshot, WriteSnapshot}; - -pub struct Service { - client: Arc, - /// Snapshot root directory - root_dir: String, - /// Snapshot creation period in unit of block numbers - period: u64, -} - -impl Service { - pub fn new(client: Arc, root_dir: String, period: u64) -> Arc { - Arc::new(Self { - client, - root_dir, - period, - }) - } -} - -impl ChainNotify for Service { - /// fires when chain has new blocks. - fn new_blocks( - &self, - _imported: Vec, - _invalid: Vec, - enacted: Vec, - _retracted: Vec, - _sealed: Vec, - ) { - let best_number = self.client.chain_info().best_block_number; - let is_checkpoint = enacted - .iter() - .map(|hash| self.client.block_number(&BlockId::Hash(*hash)).expect("Enacted block must exist")) - .any(|number| number % self.period == 0); - if is_checkpoint && best_number > self.period { - let number = (best_number / self.period - 1) * self.period; - let header = self.client.block_header(&BlockId::Number(number)).expect("Snapshot target must exist"); - - let db = self.client.database(); - let path: PathBuf = [self.root_dir.clone(), format!("{:x}", *header.hash())].iter().collect(); - let root = header.state_root(); - // FIXME: The db can be corrupted because the CodeChain doesn't wait child threads end on exit. - spawn(move || match Snapshot::try_new(path).map(|s| s.write_snapshot(db.as_ref(), &root)) { - Ok(_) => {} - Err(Error::FileError(ErrorKind::AlreadyExists)) => {} - Err(e) => cerror!(SNAPSHOT, "{}", e), - }); - } - } -} diff --git a/sync/src/snapshot/snapshot.rs b/sync/src/snapshot/snapshot.rs deleted file mode 100644 index 3d2a5e099f..0000000000 --- a/sync/src/snapshot/snapshot.rs +++ /dev/null @@ -1,391 +0,0 @@ -// Copyright 2018-2019 Kodebox, Inc. -// This file is part of CodeChain. -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU Affero General Public License as -// published by the Free Software Foundation, either version 3 of the -// License, or (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Affero General Public License for more details. -// -// You should have received a copy of the GNU Affero General Public License -// along with this program. If not, see . - -use std::collections::HashSet; -use std::convert::AsRef; -use std::fs::{create_dir_all, File}; -use std::io::{Read, Write}; -use std::iter::once; -use std::path::{Path, PathBuf}; -use std::sync::Arc; - -use ccore::COL_STATE; -use cdb::{new_journaldb, Algorithm, JournalDB}; -use cmerkle::Node; -use kvdb::KeyValueDB; -use primitives::H256; -use rlp::{Rlp, RlpStream}; -use snap; - -use super::error::Error; - -pub struct Snapshot { - path: PathBuf, -} - -impl Snapshot { - pub fn try_new

(path: P) -> Result - where - P: AsRef, { - create_dir_all(&path)?; - Ok(Snapshot { - path: path.as_ref().to_owned(), - }) - } -} - -impl Snapshot { - fn file_for(&self, root: &H256) -> PathBuf { - self.path.join(format!("{:x}", root)) - } - - fn write_nodes<'a, I>(&self, root: &H256, iter: I) -> Result<(), Error> - where - I: IntoIterator)>, { - let file = File::create(self.file_for(root))?; - let mut snappy = snap::Writer::new(file); - - let mut stream = RlpStream::new(); - stream.begin_unbounded_list(); - for (key, value) in iter { - stream.begin_list(2); - stream.append(key); - stream.append(value); - } - stream.complete_unbounded_list(); - - snappy.write_all(&stream.drain())?; - Ok(()) - } - - fn read_chunk(&self, backing: Arc, root: &H256) -> Result { - let file = File::open(self.file_for(root))?; - let mut buf = Vec::new(); - let mut snappy = snap::Reader::new(file); - snappy.read_to_end(&mut buf)?; - - let rlp = Rlp::new(&buf); - let mut journal = new_journaldb(backing, Algorithm::Archive, COL_STATE); - let mut inserted_keys = HashSet::new(); - let mut referenced_keys = HashSet::new(); - referenced_keys.insert(*root); - for rlp_pair in rlp.iter() { - if rlp_pair.item_count().unwrap() != 2 { - return Err(Error::SyncError("Chunk contains invalid size of pair".to_string())) - } - - let key = rlp_pair.val_at(0).unwrap(); - let value: Vec<_> = rlp_pair.val_at(1).unwrap(); - - let node = - Node::decoded(&value).ok_or_else(|| Error::SyncError("Chunk condtains an invalid node".to_string()))?; - - if journal.contains(&key) { - cwarn!(SNAPSHOT, "Chunk contains duplicated key: {}", key); - } - - if let Node::Branch(_, childs) = node { - for child in childs.iter() { - if let Some(child) = child { - referenced_keys.insert(*child); - } - } - } - - let hash_key = journal.insert(&value); - if hash_key != key { - return Err(Error::SyncError("Chunk contains an invalid key for a value".to_string())) - } - inserted_keys.insert(hash_key); - } - - let never_referenced_keys: Vec = - inserted_keys.iter().filter(|key| !referenced_keys.contains(key)).cloned().collect(); - - Ok(Chunk { - journal, - never_referenced_keys, - }) - } -} - -struct Chunk { - journal: Box, - never_referenced_keys: Vec, -} - -impl Chunk { - fn purge(&mut self) -> bool { - if self.never_referenced_keys.is_empty() { - return false - } - for key in &self.never_referenced_keys { - self.journal.remove(key); - } - self.never_referenced_keys.clear(); - true - } - - fn is_deeper_than(&self, root: &H256, max_depth: usize) -> bool { - let mut stack = Vec::new(); - stack.push((*root, 0)); - while let Some((key, depth)) = stack.pop() { - match self.journal.get(&key) { - None => continue, - Some(_) if depth >= max_depth => return false, - Some(value) => { - if let Some(Node::Branch(_, childs)) = Node::decoded(&value) { - for child in childs.iter() { - if let Some(child) = child { - stack.push((*child, depth + 1)); - } - } - } - } - } - } - false - } - - fn missing_keys(&self, root: &H256) -> Vec { - let mut result = Vec::new(); - let mut stack = Vec::new(); - stack.push(*root); - while let Some(key) = stack.pop() { - match self.journal.get(&key) { - None => { - result.push(key); - } - Some(value) => { - if let Some(Node::Branch(_, childs)) = Node::decoded(&value) { - for child in childs.iter() { - if let Some(child) = child { - stack.push(*child); - } - } - } - } - } - } - result - } -} - -pub trait WriteSnapshot { - fn write_snapshot(&self, db: &dyn KeyValueDB, root: &H256) -> Result<(), Error>; -} - -pub trait ReadSnapshot { - fn read_snapshot(&self, db: Arc, root: &H256) -> Result<(), Error>; -} - -impl WriteSnapshot for Snapshot { - fn write_snapshot(&self, db: &dyn KeyValueDB, root: &H256) -> Result<(), Error> { - let root_val = match db.get(COL_STATE, root) { - Ok(Some(value)) => value.to_vec(), - Ok(None) => return Err(Error::SyncError("Invalid state root, or the database is empty".to_string())), - Err(e) => return Err(e.into()), - }; - - let children = children_of(db, &root_val)?; - let mut grandchildren = Vec::new(); - for (_, value) in &children { - grandchildren.extend(children_of(db, value)?); - } - - self.write_nodes(root, once(&(*root, root_val)).chain(&children))?; - for (grandchild, _) in &grandchildren { - let nodes = enumerate_subtree(db, grandchild)?; - self.write_nodes(grandchild, &nodes)?; - } - - Ok(()) - } -} - -impl ReadSnapshot for Snapshot { - fn read_snapshot(&self, db: Arc, root: &H256) -> Result<(), Error> { - let head = { - let mut head = self.read_chunk(db.clone(), root)?; - if head.purge() { - cinfo!(SNAPSHOT, "Head chunk contains garbages"); - } - - if head.is_deeper_than(root, 2) { - return Err(Error::SyncError("Head chunk has an invalid shape".to_string())) - } - - let mut transaction = db.transaction(); - head.journal.inject(&mut transaction)?; - db.write_buffered(transaction); - head - }; - - for chunk_root in head.missing_keys(root) { - let mut chunk = self.read_chunk(db.clone(), &chunk_root)?; - if chunk.purge() { - cinfo!(SNAPSHOT, "Chunk contains garbages"); - } - - if !chunk.missing_keys(&chunk_root).is_empty() { - return Err(Error::SyncError("Chunk is an incomplete trie".to_string())) - } - - let mut transaction = db.transaction(); - chunk.journal.inject(&mut transaction)?; - db.write_buffered(transaction); - } - - Ok(()) - } -} - -fn get_node(db: &dyn KeyValueDB, key: &H256) -> Result, Error> { - match db.get(COL_STATE, key) { - Ok(Some(value)) => Ok(value.to_vec()), - Ok(None) => Err(Error::NodeNotFound(*key)), - Err(e) => Err(e.into()), - } -} - -fn children_of(db: &dyn KeyValueDB, node: &[u8]) -> Result)>, Error> { - let keys = match Node::decoded(node) { - None => Vec::new(), - Some(Node::Leaf(..)) => Vec::new(), - Some(Node::Branch(_, children)) => children.iter().filter_map(|child| *child).collect(), - }; - - let mut result = Vec::new(); - for key in keys { - result.push((key, get_node(db, &key)?)); - } - Ok(result) -} - -fn enumerate_subtree(db: &dyn KeyValueDB, root: &H256) -> Result)>, Error> { - let node = get_node(db, root)?; - let children = match Node::decoded(&node) { - None => Vec::new(), - Some(Node::Leaf(..)) => Vec::new(), - Some(Node::Branch(_, children)) => children.iter().filter_map(|child| *child).collect(), - }; - let mut result: Vec<_> = vec![(*root, node)]; - for child in children { - result.extend(enumerate_subtree(db, &child)?); - } - Ok(result) -} - -#[cfg(test)] -mod tests { - use std::collections::HashSet; - use std::sync::Arc; - - use ccore::COL_STATE; - - use cmerkle::{Trie, TrieFactory, TrieMut}; - use kvdb_memorydb; - use primitives::H256; - use tempfile::tempdir; - use trie_standardmap::{Alphabet, StandardMap, ValueMode}; - - use super::*; - - #[test] - fn init() { - let snapshot_dir = tempdir().unwrap(); - let snapshot = Snapshot::try_new(&snapshot_dir).unwrap(); - let mut root = H256::new(); - - let kvdb = Arc::new(kvdb_memorydb::create(1)); - let mut jdb = new_journaldb(kvdb.clone(), Algorithm::Archive, COL_STATE); - { - let _ = TrieFactory::create(jdb.as_hashdb_mut(), &mut root); - } - /* do nothing */ - let result = snapshot.write_snapshot(kvdb.as_ref(), &root); - - assert!(result.is_err()); - } - - fn random_insert_and_restore_with_count(count: usize) { - let mut seed = H256::new(); - let x = StandardMap { - alphabet: Alphabet::Custom(b"@QWERTYUIOPASDFGHJKLZXCVBNM[/]^_".to_vec()), - min_key: 5, - journal_key: 0, - value_mode: ValueMode::Index, - count, - } - .make_with(&mut seed); - - let snapshot_dir = tempdir().unwrap(); - let snapshot = Snapshot::try_new(&snapshot_dir).unwrap(); - let mut root = H256::new(); - { - let kvdb = Arc::new(kvdb_memorydb::create(1)); - let mut jdb = new_journaldb(kvdb.clone(), Algorithm::Archive, COL_STATE); - { - let mut t = TrieFactory::create(jdb.as_hashdb_mut(), &mut root); - let mut inserted_keys = HashSet::new(); - for &(ref key, ref value) in &x { - if !inserted_keys.insert(key) { - continue - } - assert!(t.insert(key, value).unwrap().is_none()); - assert_eq!(t.insert(key, value).unwrap(), Some(value.to_vec())); - } - } - { - let mut batch = jdb.backing().transaction(); - let _ = jdb.inject(&mut batch).unwrap(); - jdb.backing().write(batch).unwrap(); - } - - snapshot.write_snapshot(kvdb.as_ref(), &root).unwrap(); - } - - { - let kvdb = Arc::new(kvdb_memorydb::create(1)); - snapshot.read_snapshot(kvdb.clone(), &root).unwrap(); - - let jdb = new_journaldb(kvdb, Algorithm::Archive, COL_STATE); - let t = TrieFactory::readonly(jdb.as_hashdb(), &root).unwrap(); - let mut inserted_keys = HashSet::new(); - for &(ref key, ref value) in &x { - if !inserted_keys.insert(key) { - continue - } - assert_eq!(t.get(key).unwrap(), Some(value.to_vec())); - } - } - } - - #[test] - fn random_insert_and_restore_1() { - random_insert_and_restore_with_count(1); - } - - #[test] - fn random_insert_and_restore_100() { - random_insert_and_restore_with_count(100); - } - - #[test] - fn random_insert_and_restore_10000() { - random_insert_and_restore_with_count(10000); - } -} diff --git a/util/merkle/Cargo.toml b/util/merkle/Cargo.toml index f95ba0ee98..75f7a0c9b8 100644 --- a/util/merkle/Cargo.toml +++ b/util/merkle/Cargo.toml @@ -10,6 +10,8 @@ codechain-crypto = { git = "https://github.com/CodeChain-io/rust-codechain-crypt codechain-db = { path = "../db" } primitives = { git = "https://github.com/CodeChain-io/rust-codechain-primitives.git", version = "0.4" } rlp = { git = "https://github.com/CodeChain-io/rlp.git", version = "0.4" } +rlp_derive = { git = "https://github.com/CodeChain-io/rlp.git", version = "0.2" } +snap = "0.2" [dev-dependencies] kvdb = "0.1" diff --git a/util/merkle/src/lib.rs b/util/merkle/src/lib.rs index e73f43491d..2e60e94304 100644 --- a/util/merkle/src/lib.rs +++ b/util/merkle/src/lib.rs @@ -16,6 +16,8 @@ extern crate codechain_crypto as ccrypto; extern crate codechain_db as cdb; +#[macro_use] +extern crate rlp_derive; #[cfg(test)] extern crate trie_standardmap as standardmap; @@ -29,6 +31,8 @@ use primitives::H256; mod nibbleslice; pub mod node; mod skewed; +#[allow(dead_code)] +pub mod snapshot; pub mod triedb; pub mod triedbmut; pub mod triehash; diff --git a/util/merkle/src/nibbleslice.rs b/util/merkle/src/nibbleslice.rs index 328cf92eeb..a97a9baaa8 100644 --- a/util/merkle/src/nibbleslice.rs +++ b/util/merkle/src/nibbleslice.rs @@ -17,7 +17,7 @@ use std::cmp::*; use std::fmt; -#[derive(Eq, Ord)] +#[derive(Eq, Ord, Copy, Clone)] pub struct NibbleSlice<'a> { pub data: &'a [u8], pub offset: usize, diff --git a/util/merkle/src/node.rs b/util/merkle/src/node.rs index 66f2704808..4d556860d6 100644 --- a/util/merkle/src/node.rs +++ b/util/merkle/src/node.rs @@ -112,4 +112,11 @@ impl<'a> Node<'a> { } } } + + pub fn mid(self, offset: usize) -> Self { + match self { + Node::Leaf(partial, value) => Node::Leaf(partial.mid(offset), value), + Node::Branch(partial, child) => Node::Branch(partial.mid(offset), child), + } + } } diff --git a/util/merkle/src/snapshot/chunk.rs b/util/merkle/src/snapshot/chunk.rs new file mode 100644 index 0000000000..e69b04fbf9 --- /dev/null +++ b/util/merkle/src/snapshot/chunk.rs @@ -0,0 +1,284 @@ +// Copyright 2019 Kodebox, Inc. +// This file is part of CodeChain. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::collections::VecDeque; +use std::convert::From; + +use ccrypto::BLAKE_NULL_RLP; +use cdb::{DBValue, HashDB, MemoryDB}; +use primitives::H256; + +use super::error::{ChunkError, Error}; +use super::{DecodedPathSlice, PathSlice, CHUNK_HEIGHT}; +use crate::nibbleslice::NibbleSlice; +use crate::{Node, TrieDBMut}; + +#[derive(RlpEncodable, RlpDecodable, Eq, PartialEq)] +pub struct TerminalNode { + // Relative path from the chunk root. + pub path_slice: PathSlice, + pub node_rlp: Vec, +} + +impl std::fmt::Debug for TerminalNode { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + let path_slice = NibbleSlice::from_encoded(&self.path_slice); + f.debug_struct("TerminalNode") + .field("path_slice", &path_slice) + .field("node_rlp", &NodeDebugAdaptor { + rlp: &self.node_rlp, + }) + .finish() + } +} + +struct NodeDebugAdaptor<'a> { + rlp: &'a [u8], +} + +impl<'a> std::fmt::Debug for NodeDebugAdaptor<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + match Node::decoded(&self.rlp) { + Some(node) => write!(f, "{:?}", &node), + None => write!(f, "{:?}", self.rlp), + } + } +} + +/// An unverified chunk from the network +#[derive(Debug)] +pub struct RawChunk { + pub nodes: Vec, +} + +/// Fully recovered, and re-hydrated chunk. +pub struct RecoveredChunk { + pub(crate) root: H256, + /// contains all nodes including non-terminal nodes and terminal nodes. + /// You can blindly pour all items in `nodes` into `HashDB`. + pub(crate) nodes: Vec<(H256, DBValue)>, + /// Their path slices are relative to this chunk root. + pub(crate) unresolved_chunks: Vec, +} + +impl RawChunk { + /// Verify and recover the chunk + pub fn recover(&self, expected_chunk_root: H256) -> Result { + let mut memorydb = MemoryDB::new(); + let mut chunk_root = H256::new(); + + { + let mut trie = TrieDBMut::new(&mut memorydb, &mut chunk_root); + for node in self.nodes.iter() { + let old_val = match Node::decoded(&node.node_rlp) { + Some(Node::Branch(slice, child)) => { + let encoded = DecodedPathSlice::from_encoded(&node.path_slice).with_slice(slice).encode(); + trie.insert_raw(Node::Branch(NibbleSlice::from_encoded(&encoded), child))? + } + Some(Node::Leaf(slice, data)) => { + let encoded = DecodedPathSlice::from_encoded(&node.path_slice).with_slice(slice).encode(); + trie.insert_raw(Node::Leaf(NibbleSlice::from_encoded(&encoded), data))? + } + None => return Err(ChunkError::InvalidContent.into()), + }; + + if let Some(old_val) = old_val { + if old_val != node.node_rlp.as_slice() { + return Err(ChunkError::InvalidContent.into()) + } + } + } + } + + // Some nodes in the chunk is different from the expected. + if chunk_root != expected_chunk_root { + return Err(ChunkError::ChunkRootMismatch { + expected: expected_chunk_root, + actual: chunk_root, + } + .into()) + } + + let mut nodes = Vec::new(); + let mut unresolved_chunks = Vec::new(); + let mut queue: VecDeque = VecDeque::from(vec![NodePath::new(chunk_root)]); + while let Some(path) = queue.pop_front() { + let node = match memorydb.get(&path.key) { + Some(x) => x, + None => { + // all unresolved should depth == CHUNK_HEIGHT + 1 + if path.depth != CHUNK_HEIGHT + 1 { + return Err(ChunkError::InvalidHeight.into()) + } + + unresolved_chunks.push(UnresolvedChunk::from(path)); + continue + } + }; + + if path.depth > CHUNK_HEIGHT { + return Err(ChunkError::InvalidHeight.into()) + } + nodes.push((path.key, node.clone())); + + let node = Node::decoded(&node).expect("Chunk root was verified; Node can't be wrong"); + if let Node::Branch(slice, children) = node { + for (index, child) in children.iter().enumerate() { + if let Some(child) = child { + queue.push_back(path.with_slice_and_index(slice, index, *child)); + } + } + } + } + + Ok(RecoveredChunk { + root: expected_chunk_root, + nodes, + unresolved_chunks, + }) + } +} + +impl std::fmt::Debug for RecoveredChunk { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + struct Adapter<'a>(&'a [(H256, DBValue)]); + impl<'a> std::fmt::Debug for Adapter<'a> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + f.debug_list() + .entries(self.0.iter().map(|(hash, rlp)| { + (hash, NodeDebugAdaptor { + rlp, + }) + })) + .finish() + } + } + + f.debug_struct("RecoveredChunk") + .field("root", &self.root) + .field("nodes", &Adapter(&self.nodes)) + .field("unresolved_chunks", &self.unresolved_chunks) + .finish() + } +} + +/// Chunk obtained from the state db. +#[derive(Debug)] +pub struct Chunk { + pub root: H256, + pub terminal_nodes: Vec, +} + +impl Chunk { + pub(crate) fn from_chunk_root(db: &dyn HashDB, chunk_root: H256) -> Chunk { + let mut unresolved: VecDeque = VecDeque::from(vec![NodePath::new(chunk_root)]); + let mut terminal_nodes: Vec = Vec::new(); + while let Some(path) = unresolved.pop_front() { + assert!(path.key != BLAKE_NULL_RLP, "Empty DB"); + assert!(path.depth <= CHUNK_HEIGHT); + let node = db.get(&path.key).expect("Can't find the node in a db. DB is inconsistent"); + let node_decoded = Node::decoded(&node).expect("Node cannot be decoded. DB is inconsistent"); + + match node_decoded { + // Continue to BFS + Node::Branch(slice, ref children) if path.depth < CHUNK_HEIGHT => { + for (i, hash) in children.iter().enumerate() { + if let Some(hash) = hash { + unresolved.push_back(path.with_slice_and_index(slice, i, *hash)); + } + } + } + // Reached the terminal node. Branch at path.depth == CHUNK_HEIGHT || Leaf + _ => terminal_nodes.push(TerminalNode { + path_slice: path.path_slice.encode(), + node_rlp: node.to_vec(), + }), + }; + } + Chunk { + root: chunk_root, + terminal_nodes, + } + } + + // Returns path slices to unresolved chunk roots relative to this chunk root + pub(crate) fn unresolved_chunks(&self) -> Vec { + let mut result = Vec::new(); + for node in self.terminal_nodes.iter() { + let decoded = Node::decoded(&node.node_rlp).expect("All terminal nodes should be valid"); + if let Node::Branch(slice, children) = decoded { + for (i, child) in children.iter().enumerate() { + if let Some(child) = child { + result.push(UnresolvedChunk { + path_slice: DecodedPathSlice::from_encoded(&node.path_slice).with_slice_and_index(slice, i), + chunk_root: *child, + }) + } + } + } + } + result + } + + #[cfg(test)] + pub(crate) fn into_raw_chunk(self) -> RawChunk { + RawChunk { + nodes: self.terminal_nodes, + } + } +} + +/// path slice to `chunk_root` is relative to the root of originating chunk. +#[derive(Debug)] +pub(crate) struct UnresolvedChunk { + pub path_slice: DecodedPathSlice, + pub chunk_root: H256, +} + +impl From for UnresolvedChunk { + fn from(path: NodePath) -> Self { + Self { + path_slice: path.path_slice, + chunk_root: path.key, + } + } +} + +#[derive(Debug)] +struct NodePath { + // path slice to the node relative to chunk_root + path_slice: DecodedPathSlice, + depth: usize, + key: H256, +} + +impl NodePath { + fn new(key: H256) -> NodePath { + NodePath { + path_slice: DecodedPathSlice::new(), + depth: 1, + key, + } + } + + fn with_slice_and_index(&self, slice: NibbleSlice, index: usize, key: H256) -> NodePath { + NodePath { + path_slice: self.path_slice.with_slice_and_index(slice, index), + depth: self.depth + 1, + key, + } + } +} diff --git a/util/merkle/src/snapshot/compress.rs b/util/merkle/src/snapshot/compress.rs new file mode 100644 index 0000000000..c03ea0cc08 --- /dev/null +++ b/util/merkle/src/snapshot/compress.rs @@ -0,0 +1,119 @@ +// Copyright 2019 Kodebox, Inc. +// This file is part of CodeChain. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::io::{Cursor, Read, Write}; + +use rlp::{Rlp, RlpStream}; + +use super::chunk::{Chunk, RawChunk}; +use super::error::{ChunkError, Error}; +use super::CHUNK_MAX_NODES; + +pub struct ChunkDecompressor { + read: R, +} + +impl ChunkDecompressor { + pub fn new(read: R) -> Self { + ChunkDecompressor { + read, + } + } +} + +impl<'a> ChunkDecompressor> { + fn from_slice(slice: &'a [u8]) -> Self { + ChunkDecompressor::new(Cursor::new(slice)) + } +} + +impl ChunkDecompressor +where + R: Read + Clone, +{ + pub fn decompress(self) -> Result { + let mut buf = Vec::new(); + + let mut snappy = snap::Reader::new(self.read); + snappy.read_to_end(&mut buf)?; + + let rlp = Rlp::new(&buf); + let len = rlp.item_count()?; + if len > CHUNK_MAX_NODES { + return Err(ChunkError::TooBig.into()) + } + + Ok(RawChunk { + nodes: rlp.as_list()?, + }) + } +} + +pub struct ChunkCompressor { + write: W, +} + +impl ChunkCompressor { + pub fn new(write: W) -> Self { + ChunkCompressor { + write, + } + } +} + +impl ChunkCompressor +where + W: Write, +{ + pub fn compress_chunk(self, chunk: &Chunk) -> Result<(), Error> { + let mut rlp = RlpStream::new_list(chunk.terminal_nodes.len()); + for node in chunk.terminal_nodes.iter() { + rlp.append(node); + } + let mut snappy = snap::Writer::new(self.write); + snappy.write_all(rlp.as_raw())?; + Ok(()) + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::snapshot::chunk::{Chunk, TerminalNode}; + + #[test] + fn test_compress_decompress() { + let chunk = Chunk { + root: Default::default(), + terminal_nodes: vec![ + (TerminalNode { + path_slice: b"12345".to_vec(), + node_rlp: b"45678".to_vec(), + }), + (TerminalNode { + path_slice: b"56789".to_vec(), + node_rlp: b"123abc".to_vec(), + }), + ], + }; + + let mut buffer = Vec::new(); + ChunkCompressor::new(&mut buffer).compress_chunk(&chunk).unwrap(); + let decompressed = ChunkDecompressor::from_slice(&buffer).decompress().unwrap(); + + assert_eq!(chunk.terminal_nodes, decompressed.nodes); + } +} diff --git a/util/merkle/src/snapshot/error.rs b/util/merkle/src/snapshot/error.rs new file mode 100644 index 0000000000..cf947d485e --- /dev/null +++ b/util/merkle/src/snapshot/error.rs @@ -0,0 +1,91 @@ +// Copyright 2019 Kodebox, Inc. +// This file is part of CodeChain. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::fmt::{Display, Formatter}; +use std::io::Error as IoError; + +use primitives::H256; +use rlp::DecoderError as RlpDecoderError; + +use crate::TrieError; + +#[derive(Debug)] +pub enum Error { + IoError(IoError), + RlpDecoderError(RlpDecoderError), + TrieError(TrieError), + ChunkError(ChunkError), +} + +impl From for Error { + fn from(err: IoError) -> Self { + Error::IoError(err) + } +} + +impl From for Error { + fn from(err: RlpDecoderError) -> Self { + Error::RlpDecoderError(err) + } +} + +impl From for Error { + fn from(err: TrieError) -> Self { + Error::TrieError(err) + } +} + +impl From for Error { + fn from(err: ChunkError) -> Self { + Error::ChunkError(err) + } +} + +impl Display for Error { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { + match self { + Error::IoError(err) => write!(f, "IoError: {}", err), + Error::RlpDecoderError(err) => write!(f, "RlpDecoderError: {}", err), + Error::TrieError(err) => write!(f, "TrieError: {}", err), + Error::ChunkError(err) => write!(f, "ChunkError: {}", err), + } + } +} + +#[derive(Debug)] +pub enum ChunkError { + TooBig, + InvalidHeight, + ChunkRootMismatch { + expected: H256, + actual: H256, + }, + InvalidContent, +} + +impl Display for ChunkError { + fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> { + match self { + ChunkError::TooBig => write!(f, "Chunk has too many elements"), + ChunkError::InvalidHeight => write!(f, "Chunk height is unexpected height"), + ChunkError::ChunkRootMismatch { + expected, + actual, + } => write!(f, "Chunk root is different from expected. expected: {}, actual: {}", expected, actual), + ChunkError::InvalidContent => write!(f, "Chunk content is invalid"), + } + } +} diff --git a/util/merkle/src/snapshot/mod.rs b/util/merkle/src/snapshot/mod.rs new file mode 100644 index 0000000000..688ce38bad --- /dev/null +++ b/util/merkle/src/snapshot/mod.rs @@ -0,0 +1,338 @@ +// Copyright 2019 Kodebox, Inc. +// This file is part of CodeChain. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +mod chunk; +mod compress; +mod error; +mod ordered_heap; + +use std::cmp::Ordering; + +use ccrypto::BLAKE_NULL_RLP; +use cdb::HashDB; +use primitives::H256; + +use self::chunk::{Chunk, RecoveredChunk, UnresolvedChunk}; +pub use self::compress::{ChunkCompressor, ChunkDecompressor}; +pub use self::error::Error; +use self::ordered_heap::OrderedHeap; +use crate::nibbleslice::NibbleSlice; + +const CHUNK_HEIGHT: usize = 3; +const CHUNK_MAX_NODES: usize = 256; // 16 ^ (CHUNK_HEIGHT-1) + +/// Example: +/// use codechain_merkle::snapshot::Restore; +/// let mut rm = Restore::new(db, root); +/// while let Some(root) = rm.next_to_feed() { +/// let raw_chunk = request(block_hash, root)?; +/// let chunk = raw_chunk.recover(root)?; +/// rm.feed(chunk); +/// } +pub struct Restore<'a> { + db: &'a mut dyn HashDB, + pending: Option, + unresolved: OrderedHeap>, +} + +impl<'a> Restore<'a> { + pub fn new(db: &'a mut dyn HashDB, merkle_root: H256) -> Self { + let mut result = Restore { + db, + pending: None, + unresolved: OrderedHeap::new(), + }; + if merkle_root != BLAKE_NULL_RLP { + result.unresolved.push(ChunkPathPrefix::new(merkle_root).into()); + } + result + } + + pub fn feed(&mut self, chunk: RecoveredChunk) { + let pending_path = self.pending.take().expect("feed() should be called after next()"); + assert_eq!(pending_path.chunk_root, chunk.root, "Unexpected chunk"); + + // Pour nodes into the DB + for (_, value) in chunk.nodes { + self.db.insert(&value); + } + + // Extend search paths + for unresolved in chunk.unresolved_chunks { + self.unresolved.push(pending_path.with_unresolved_chunk(&unresolved).into()); + } + + self.pending = None; + } + + pub fn next_to_feed(&mut self) -> Option { + if let Some(path) = self.unresolved.pop() { + assert!(self.pending.is_none(), "Previous feed() was failed"); + let chunk_root = path.chunk_root; + self.pending = Some(path.0); + + Some(chunk_root) + } else { + None + } + } +} + +/// Example: +/// use std::fs::File; +/// use codechain_merkle::snapshot::Snapshot; +/// +/// for chunk in Snapshot::from_hashdb(db, root) { +/// let mut file = File::create(format!("{}/{}", block_id, chunk.root))?; +/// let mut compressor = ChunkCompressor::new(&mut file); +/// compressor.compress(chunk); +/// } +pub struct Snapshot<'a> { + db: &'a dyn HashDB, + remaining: OrderedHeap>, +} + +impl<'a> Snapshot<'a> { + pub fn from_hashdb(db: &'a dyn HashDB, chunk_root: H256) -> Self { + let mut result = Snapshot { + db, + remaining: OrderedHeap::new(), + }; + if chunk_root != BLAKE_NULL_RLP { + result.remaining.push(ChunkPathPrefix::new(chunk_root).into()); + } + result + } +} + +impl<'a> Iterator for Snapshot<'a> { + type Item = Chunk; + + fn next(&mut self) -> Option { + if let Some(path) = self.remaining.pop() { + let chunk = Chunk::from_chunk_root(self.db, path.chunk_root); + for unresolved in chunk.unresolved_chunks() { + self.remaining.push(path.with_unresolved_chunk(&unresolved).into()); + } + Some(chunk) + } else { + None + } + } +} + + +#[derive(Debug)] +struct ChunkPathPrefix { + // Absolute path prefix of the chunk root + path_prefix: DecodedPathSlice, + depth: usize, + chunk_root: H256, +} + +impl ChunkPathPrefix { + fn new(chunk_root: H256) -> ChunkPathPrefix { + ChunkPathPrefix { + path_prefix: DecodedPathSlice::new(), + depth: 1, + chunk_root, + } + } + + fn with_unresolved_chunk(&self, unresolved: &UnresolvedChunk) -> ChunkPathPrefix { + ChunkPathPrefix { + path_prefix: self.path_prefix.with_path_slice(&unresolved.path_slice), + depth: self.depth + 1, + chunk_root: unresolved.chunk_root, + } + } +} + +impl Ord for DepthFirst { + fn cmp(&self, other: &Self) -> Ordering { + self.0.depth.cmp(&other.0.depth) + } +} + +impl From for DepthFirst { + fn from(path: ChunkPathPrefix) -> Self { + DepthFirst(path) + } +} + +/// Encoded value by NibbleSlice::encoded() +pub type PathSlice = Vec; + +/// for item i, i in 0..16 +pub(crate) struct DecodedPathSlice(Vec); + +impl DecodedPathSlice { + fn new() -> DecodedPathSlice { + DecodedPathSlice(Vec::new()) + } + + fn from_encoded(slice: &[u8]) -> DecodedPathSlice { + DecodedPathSlice(NibbleSlice::from_encoded(slice).to_vec()) + } + + fn with_slice_and_index(&self, slice: NibbleSlice, i: usize) -> DecodedPathSlice { + assert!(i < 16); + let mut v = self.0.clone(); + v.append(&mut slice.to_vec()); + v.push(i as u8); + DecodedPathSlice(v) + } + + fn with_slice(&self, slice: NibbleSlice) -> DecodedPathSlice { + let mut v = self.0.clone(); + v.append(&mut slice.to_vec()); + DecodedPathSlice(v) + } + + fn with_path_slice(&self, path_slice: &DecodedPathSlice) -> DecodedPathSlice { + let mut v = self.0.clone(); + v.extend(path_slice.0.as_slice()); + DecodedPathSlice(v) + } + + fn encode(&self) -> PathSlice { + let (encoded, _) = NibbleSlice::from_vec(&self.0); + encoded.to_vec() + } +} + +impl std::fmt::Debug for DecodedPathSlice { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> Result<(), std::fmt::Error> { + let (encoded, _) = NibbleSlice::from_vec(&self.0); + let nibble_slice = NibbleSlice::from_encoded(&encoded); + writeln!(f, "{:?}", nibble_slice) + } +} + +#[derive(Debug)] +struct DepthFirst(T); + +impl PartialOrd for DepthFirst +where + Self: Ord, +{ + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(&other)) + } +} + +impl PartialEq for DepthFirst +where + Self: Ord, +{ + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } +} + +impl Eq for DepthFirst where Self: Ord {} + +impl std::ops::Deref for DepthFirst { + type Target = T; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use std::collections::HashMap; + use std::iter::FromIterator; + + use cdb::MemoryDB; + use primitives::{Bytes, H256}; + use standardmap::{Alphabet, StandardMap, ValueMode}; + + use super::chunk::RawChunk; + use crate::{Trie, TrieDB, TrieDBMut, TrieMut}; + + fn random_insert_and_restore_with_count(count: usize) { + let standard_map = StandardMap { + alphabet: Alphabet::Custom(b"@QWERTYUIOPASDFGHJKLZXCVBNM[/]^_".to_vec()), + min_key: 5, + journal_key: 0, + value_mode: ValueMode::Index, + count, + } + .make_with(&mut H256::new()); + // Unique standard map + let unique_map: HashMap = HashMap::from_iter(standard_map.into_iter()); + + let mut root = H256::new(); + let chunks: HashMap = { + // We will throw out `db` after snapshot. + let mut db = MemoryDB::new(); + let mut trie = TrieDBMut::new(&mut db, &mut root); + for (key, value) in &unique_map { + trie.insert(key, value).unwrap(); + } + + Snapshot::from_hashdb(&db, root).map(|chunk| (chunk.root, chunk.into_raw_chunk())).collect() + }; + dbg!(chunks.len()); + + let mut db = MemoryDB::new(); + let mut recover = Restore::new(&mut db, root); + while let Some(chunk_root) = recover.next_to_feed() { + let recovered = chunks[&chunk_root].recover(chunk_root).unwrap(); + recover.feed(recovered); + } + + let trie = TrieDB::try_new(&db, &root).unwrap(); + for (key, value) in &unique_map { + assert_eq!(trie.get(key).unwrap().as_ref(), Some(value)); + } + } + + #[test] + fn random_insert_and_restore_0() { + random_insert_and_restore_with_count(0); + } + + #[test] + fn random_insert_and_restore_1() { + random_insert_and_restore_with_count(1); + } + + #[test] + fn random_insert_and_restore_2() { + random_insert_and_restore_with_count(2); + } + + #[test] + fn random_insert_and_restore_100() { + random_insert_and_restore_with_count(100); + } + + #[test] + fn random_insert_and_restore_10000() { + random_insert_and_restore_with_count(10_000); + } + + #[test] + #[ignore] + fn random_insert_and_restore_100000() { + random_insert_and_restore_with_count(100_000); + } +} diff --git a/util/merkle/src/snapshot/ordered_heap.rs b/util/merkle/src/snapshot/ordered_heap.rs new file mode 100644 index 0000000000..d83efd77c1 --- /dev/null +++ b/util/merkle/src/snapshot/ordered_heap.rs @@ -0,0 +1,76 @@ +// Copyright 2019 Kodebox, Inc. +// This file is part of CodeChain. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::cmp::{Eq, Ord, Ordering, PartialEq, PartialOrd}; +use std::collections::BinaryHeap; + +pub struct OrderedHeap { + heap: BinaryHeap>, + seq: usize, +} + +impl OrderedHeap { + pub fn new() -> OrderedHeap { + OrderedHeap { + heap: BinaryHeap::new(), + seq: 0, + } + } + + pub fn push(&mut self, value: T) { + self.heap.push(OrderedHeapEntry { + seq: self.seq, + value, + }); + self.seq += 1; + } + + pub fn pop(&mut self) -> Option { + self.heap.pop().map(|x| x.value) + } +} + +#[derive(Debug, Clone)] +struct OrderedHeapEntry { + seq: usize, + value: T, +} + +impl Ord for OrderedHeapEntry { + fn cmp(&self, other: &Self) -> Ordering { + self.value.cmp(&other.value).then(self.seq.cmp(&other.seq).reverse()) + } +} + +impl PartialOrd for OrderedHeapEntry +where + Self: Ord, +{ + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(&other)) + } +} + +impl PartialEq for OrderedHeapEntry +where + Self: Ord, +{ + fn eq(&self, other: &Self) -> bool { + self.cmp(other) == Ordering::Equal + } +} + +impl Eq for OrderedHeapEntry where Self: Ord {} diff --git a/util/merkle/src/triedbmut.rs b/util/merkle/src/triedbmut.rs index e843112f9b..86c55c7671 100644 --- a/util/merkle/src/triedbmut.rs +++ b/util/merkle/src/triedbmut.rs @@ -169,6 +169,113 @@ impl<'a> TrieDBMut<'a> { } } + pub(crate) fn insert_raw(&mut self, node: RlpNode) -> crate::Result> { + let mut old_val = None; + let cur_hash = *self.root; + *self.root = self.insert_raw_aux(node, Some(cur_hash), &mut old_val)?; + + Ok(old_val) + } + + fn insert_raw_aux( + &mut self, + node: RlpNode, + cur_node_hash: Option, + old_val: &mut Option, + ) -> crate::Result { + let path = match &node { + RlpNode::Leaf(slice, _) | RlpNode::Branch(slice, _) => slice, + }; + + match cur_node_hash { + Some(hash) => { + let existing_node_rlp = self.db.get(&hash).ok_or_else(|| TrieError::IncompleteDatabase(hash))?; + match RlpNode::decoded(&existing_node_rlp) { + Some(RlpNode::Leaf(partial, value)) => { + // Renew the Leaf + if &partial == path { + let hash = self.db.insert(&RlpNode::encoded(node)); + *old_val = Some(existing_node_rlp); + Ok(hash) + } else { + // Make branch node and insert Leaves + let common = partial.common_prefix(&path); + let mut new_child = empty_children(); + let new_partial = partial.mid(common); + let new_path = path.mid(common); + new_child[new_partial.at(0) as usize] = Some(self.insert_aux( + new_partial.mid(1), + value, + new_child[new_partial.at(0) as usize], + old_val, + )?); + new_child[new_path.at(0) as usize] = Some(self.insert_raw_aux( + node.mid(common + 1), + new_child[new_path.at(0) as usize], + old_val, + )?); + + let hash = self + .db + .insert(&RlpNode::encoded_until(RlpNode::Branch(partial, new_child.into()), common)); + + Ok(hash) + } + } + Some(RlpNode::Branch(partial, mut children)) => { + let common = partial.common_prefix(&path); + + // Make new branch node and insert leaf and branch with new path + if common < partial.len() { + let mut new_child = empty_children(); + let new_partial = partial.mid(common); + let new_path = path.mid(common); + let o_branch = RlpNode::Branch(new_partial.mid(1), children); + + let b_hash = self.db.insert(&RlpNode::encoded(o_branch)); + + new_child[new_partial.at(0) as usize] = Some(b_hash); + new_child[new_path.at(0) as usize] = Some(self.insert_raw_aux( + node.mid(common + 1), + new_child[new_path.at(0) as usize], + old_val, + )?); + + let hash = self + .db + .insert(&RlpNode::encoded_until(RlpNode::Branch(partial, new_child.into()), common)); + + Ok(hash) + } else { + // Insert leaf into the branch node + let new_path = path.mid(common); + + children[new_path.at(0) as usize] = Some(self.insert_raw_aux( + node.mid(common + 1), + children[new_path.at(0) as usize], + old_val, + )?); + + let new_branch = RlpNode::Branch(partial, children); + let node_rlp = RlpNode::encoded(new_branch); + let hash = self.db.insert(&node_rlp); + + Ok(hash) + } + } + None => { + let hash = self.db.insert(&RlpNode::encoded(node)); + Ok(hash) + } + } + } + None => { + let hash = self.db.insert(&RlpNode::encoded(node)); + Ok(hash) + } + } + } + /// Remove auxiliary fn remove_aux( &mut self,