diff --git a/codechain/config/mod.rs b/codechain/config/mod.rs index 63b01fb928..1cc93c2841 100644 --- a/codechain/config/mod.rs +++ b/codechain/config/mod.rs @@ -284,6 +284,8 @@ fn default_enable_devel_api() -> bool { pub struct Snapshot { pub disable: Option, pub path: Option, + // Snapshot's age in blocks + pub expiration: Option, } #[derive(Deserialize)] @@ -679,6 +681,9 @@ impl Snapshot { if other.path.is_some() { self.path = other.path.clone(); } + if other.expiration.is_some() { + self.expiration = other.expiration; + } } pub fn overwrite_with(&mut self, matches: &clap::ArgMatches) -> Result<(), String> { diff --git a/codechain/config/presets/config.dev.toml b/codechain/config/presets/config.dev.toml index 2b8890b4f7..279704592a 100644 --- a/codechain/config/presets/config.dev.toml +++ b/codechain/config/presets/config.dev.toml @@ -52,6 +52,7 @@ max_connections = 100 [snapshot] disable = false path = "snapshot" +expiration = 100000 # blocks. About a week [stratum] disable = false diff --git a/codechain/config/presets/config.prod.toml b/codechain/config/presets/config.prod.toml index b67e2746bb..013883673d 100644 --- a/codechain/config/presets/config.prod.toml +++ b/codechain/config/presets/config.prod.toml @@ -52,6 +52,7 @@ max_connections = 100 [snapshot] disable = true path = "snapshot" +expiration = 100000 # blocks. About a week [stratum] disable = true diff --git a/codechain/run_node.rs b/codechain/run_node.rs index d67e6d30cf..e81393a524 100644 --- a/codechain/run_node.rs +++ b/codechain/run_node.rs @@ -371,7 +371,8 @@ pub fn run_node(matches: &ArgMatches) -> Result<(), String> { client.engine().register_snapshot_notify_sender(tx); if !config.snapshot.disable.unwrap() { - let service = Arc::new(SnapshotService::new(client, rx, config.snapshot.path.unwrap())); + let service = + Arc::new(SnapshotService::new(client, rx, config.snapshot.path.unwrap(), config.snapshot.expiration)); Some(service) } else { None diff --git a/sync/src/snapshot/mod.rs b/sync/src/snapshot/mod.rs index 760c4469ae..7a075284a2 100644 --- a/sync/src/snapshot/mod.rs +++ b/sync/src/snapshot/mod.rs @@ -14,18 +14,19 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::fs::{create_dir_all, File}; + +use std::fs; use std::path::PathBuf; +use std::str::FromStr; use std::sync::Arc; use std::thread::{spawn, JoinHandle}; use ccore::snapshot_notify::{NotifyReceiverSource, ReceiverCanceller}; -use ccore::{BlockChainTrait, BlockId, Client}; +use ccore::{BlockChainClient, BlockChainTrait, BlockId, Client}; use cmerkle::snapshot::{ChunkCompressor, Error as SnapshotError, Snapshot}; use ctypes::BlockHash; use hashdb::{AsHashDB, HashDB}; use primitives::H256; -use std::ops::Deref; pub struct Service { join_handle: Option>, @@ -35,7 +36,7 @@ pub struct Service { pub fn snapshot_dir(root_dir: &str, block: &BlockHash) -> PathBuf { let mut path = PathBuf::new(); path.push(root_dir); - path.push(format!("{:x}", block.deref())); + path.push(format!("{:x}", **block)); path } @@ -46,7 +47,12 @@ pub fn snapshot_path(root_dir: &str, block: &BlockHash, chunk_root: &H256) -> Pa } impl Service { - pub fn new(client: Arc, notify_receiver_source: NotifyReceiverSource, root_dir: String) -> Self { + pub fn new( + client: Arc, + notify_receiver_source: NotifyReceiverSource, + root_dir: String, + expiration: Option, + ) -> Self { let NotifyReceiverSource(canceller, receiver) = notify_receiver_source; let join_handle = spawn(move || { cinfo!(SYNC, "Snapshot service is on"); @@ -58,19 +64,26 @@ impl Service { cerror!(SYNC, "There isn't corresponding header for the requested block hash: {}", block_hash,); continue }; - let db_lock = client.state_db().read(); - if let Some(err) = snapshot(db_lock.as_hashdb(), block_hash, state_root, &root_dir).err() { - cerror!( - SYNC, - "Snapshot request failed for block: {}, chunk_root: {}, err: {}", - block_hash, - state_root, - err - ); - } else { - cinfo!(SYNC, "Snapshot is ready for block: {}", block_hash) + { + let db_lock = client.state_db().read(); + if let Err(err) = snapshot(db_lock.as_hashdb(), block_hash, state_root, &root_dir) { + cerror!( + SYNC, + "Snapshot request failed for block: {}, chunk_root: {}, err: {}", + block_hash, + state_root, + err + ); + } else { + cinfo!(SYNC, "Snapshot is ready for block: {}", block_hash) + } + } + + if let Some(expiration) = expiration { + if let Err(err) = cleanup_expired(&client, &root_dir, expiration) { + cerror!(SYNC, "Snapshot cleanup error after block hash {}, err: {}", block_hash, err); + } } - // TODO: Prune old snapshots } cinfo!(SYNC, "Snapshot service is stopped") }); @@ -84,11 +97,11 @@ impl Service { fn snapshot(db: &dyn HashDB, block_hash: BlockHash, chunk_root: H256, root_dir: &str) -> Result<(), SnapshotError> { let snapshot_dir = snapshot_dir(root_dir, &block_hash); - create_dir_all(snapshot_dir)?; + fs::create_dir_all(snapshot_dir)?; for chunk in Snapshot::from_hashdb(db, chunk_root) { let chunk_path = snapshot_path(root_dir, &block_hash, &chunk.root); - let chunk_file = File::create(chunk_path)?; + let chunk_file = fs::File::create(chunk_path)?; let compressor = ChunkCompressor::new(chunk_file); compressor.compress_chunk(&chunk)?; } @@ -96,6 +109,68 @@ fn snapshot(db: &dyn HashDB, block_hash: BlockHash, chunk_root: H256, root_dir: Ok(()) } +fn cleanup_expired(client: &Client, root_dir: &str, expiration: u64) -> Result<(), SnapshotError> { + for entry in fs::read_dir(root_dir)? { + let entry = match entry { + Ok(entry) => entry, + Err(err) => { + cerror!(SYNC, "Snapshot cleanup can't retrieve entry. err: {}", err); + continue + } + }; + let path = entry.path(); + + match entry.file_type().map(|x| x.is_dir()) { + Ok(true) => {} + Ok(false) => continue, + Err(err) => { + cerror!(SYNC, "Snapshot cleanup can't retrieve file info: {}, err: {}", path.to_string_lossy(), err); + continue + } + } + + let name = match path.file_name().expect("Directories always have file name").to_str() { + Some(n) => n, + None => continue, + }; + let hash = match H256::from_str(name) { + Ok(h) => BlockHash::from(h), + Err(_) => continue, + }; + let number = if let Some(number) = client.block_number(&BlockId::Hash(hash)) { + number + } else { + cerror!(SYNC, "Snapshot cleanup can't retrieve block number for block_hash: {}", hash); + continue + }; + + if number + expiration < client.best_block_header().number() { + cleanup_snapshot(root_dir, hash) + } + } + Ok(()) +} + +/// Remove all files in `root_dir/block_hash` +fn cleanup_snapshot(root_dir: &str, block_hash: BlockHash) { + let path = snapshot_dir(root_dir, &block_hash); + let rename_to = PathBuf::from(root_dir).join(format!("{:x}.old", *block_hash)); + // It is okay to ignore errors. We just wanted them to be removed. + match fs::rename(path, &rename_to) { + Ok(()) => {} + Err(err) => { + cerror!(SYNC, "Snapshot cleanup: renaming {} failed, reason: {}", block_hash, err); + } + } + // Ignore the error. Cleanup failure is not a critical error. + match fs::remove_dir_all(rename_to) { + Ok(()) => {} + Err(err) => { + cerror!(SYNC, "Snapshot cleanup: removing {} failed, reason: {}", block_hash, err); + } + } +} + impl Drop for Service { fn drop(&mut self) { if let Some(canceller) = self.canceller.take() {