Skip to content

Commit 8a66d37

Browse files
committed
Add autoremoving expired snapshot feature
1 parent 6a7e7e6 commit 8a66d37

File tree

5 files changed

+103
-20
lines changed

5 files changed

+103
-20
lines changed

codechain/config/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -284,6 +284,8 @@ fn default_enable_devel_api() -> bool {
284284
pub struct Snapshot {
285285
pub disable: Option<bool>,
286286
pub path: Option<String>,
287+
// Snapshot's age in blocks
288+
pub expiration: Option<u64>,
287289
}
288290

289291
#[derive(Deserialize)]
@@ -679,6 +681,9 @@ impl Snapshot {
679681
if other.path.is_some() {
680682
self.path = other.path.clone();
681683
}
684+
if other.expiration.is_some() {
685+
self.expiration = other.expiration;
686+
}
682687
}
683688

684689
pub fn overwrite_with(&mut self, matches: &clap::ArgMatches) -> Result<(), String> {

codechain/config/presets/config.dev.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ max_connections = 100
5252
[snapshot]
5353
disable = false
5454
path = "snapshot"
55+
expiration = 100000 # blocks. About a week
5556

5657
[stratum]
5758
disable = false

codechain/config/presets/config.prod.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ max_connections = 100
5252
[snapshot]
5353
disable = true
5454
path = "snapshot"
55+
expiration = 100000 # blocks. About a week
5556

5657
[stratum]
5758
disable = true

codechain/run_node.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -371,7 +371,8 @@ pub fn run_node(matches: &ArgMatches) -> Result<(), String> {
371371
client.engine().register_snapshot_notify_sender(tx);
372372

373373
if !config.snapshot.disable.unwrap() {
374-
let service = Arc::new(SnapshotService::new(client, rx, config.snapshot.path.unwrap()));
374+
let service =
375+
Arc::new(SnapshotService::new(client, rx, config.snapshot.path.unwrap(), config.snapshot.expiration));
375376
Some(service)
376377
} else {
377378
None

sync/src/snapshot/mod.rs

Lines changed: 94 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -14,18 +14,19 @@
1414
// You should have received a copy of the GNU Affero General Public License
1515
// along with this program. If not, see <https://www.gnu.org/licenses/>.
1616

17-
use std::fs::{create_dir_all, File};
17+
18+
use std::fs;
1819
use std::path::PathBuf;
20+
use std::str::FromStr;
1921
use std::sync::Arc;
2022
use std::thread::{spawn, JoinHandle};
2123

2224
use ccore::snapshot_notify::{NotifyReceiverSource, ReceiverCanceller};
23-
use ccore::{BlockChainTrait, BlockId, Client};
25+
use ccore::{BlockChainClient, BlockChainTrait, BlockId, Client};
2426
use cmerkle::snapshot::{ChunkCompressor, Error as SnapshotError, Snapshot};
2527
use ctypes::BlockHash;
2628
use hashdb::{AsHashDB, HashDB};
2729
use primitives::H256;
28-
use std::ops::Deref;
2930

3031
pub struct Service {
3132
join_handle: Option<JoinHandle<()>>,
@@ -35,7 +36,7 @@ pub struct Service {
3536
pub fn snapshot_dir(root_dir: &str, block: &BlockHash) -> PathBuf {
3637
let mut path = PathBuf::new();
3738
path.push(root_dir);
38-
path.push(format!("{:x}", block.deref()));
39+
path.push(format!("{:x}", **block));
3940
path
4041
}
4142

@@ -46,7 +47,12 @@ pub fn snapshot_path(root_dir: &str, block: &BlockHash, chunk_root: &H256) -> Pa
4647
}
4748

4849
impl Service {
49-
pub fn new(client: Arc<Client>, notify_receiver_source: NotifyReceiverSource, root_dir: String) -> Self {
50+
pub fn new(
51+
client: Arc<Client>,
52+
notify_receiver_source: NotifyReceiverSource,
53+
root_dir: String,
54+
expiration: Option<u64>,
55+
) -> Self {
5056
let NotifyReceiverSource(canceller, receiver) = notify_receiver_source;
5157
let join_handle = spawn(move || {
5258
cinfo!(SYNC, "Snapshot service is on");
@@ -58,19 +64,26 @@ impl Service {
5864
cerror!(SYNC, "There isn't corresponding header for the requested block hash: {}", block_hash,);
5965
continue
6066
};
61-
let db_lock = client.state_db().read();
62-
if let Some(err) = snapshot(db_lock.as_hashdb(), block_hash, state_root, &root_dir).err() {
63-
cerror!(
64-
SYNC,
65-
"Snapshot request failed for block: {}, chunk_root: {}, err: {}",
66-
block_hash,
67-
state_root,
68-
err
69-
);
70-
} else {
71-
cinfo!(SYNC, "Snapshot is ready for block: {}", block_hash)
67+
{
68+
let db_lock = client.state_db().read();
69+
if let Err(err) = snapshot(db_lock.as_hashdb(), block_hash, state_root, &root_dir) {
70+
cerror!(
71+
SYNC,
72+
"Snapshot request failed for block: {}, chunk_root: {}, err: {}",
73+
block_hash,
74+
state_root,
75+
err
76+
);
77+
} else {
78+
cinfo!(SYNC, "Snapshot is ready for block: {}", block_hash)
79+
}
80+
}
81+
82+
if let Some(expiration) = expiration {
83+
if let Err(err) = cleanup_expired(&client, &root_dir, expiration) {
84+
cerror!(SYNC, "Snapshot cleanup error after block hash {}, err: {}", block_hash, err);
85+
}
7286
}
73-
// TODO: Prune old snapshots
7487
}
7588
cinfo!(SYNC, "Snapshot service is stopped")
7689
});
@@ -84,18 +97,80 @@ impl Service {
8497

8598
fn snapshot(db: &dyn HashDB, block_hash: BlockHash, chunk_root: H256, root_dir: &str) -> Result<(), SnapshotError> {
8699
let snapshot_dir = snapshot_dir(root_dir, &block_hash);
87-
create_dir_all(snapshot_dir)?;
100+
fs::create_dir_all(snapshot_dir)?;
88101

89102
for chunk in Snapshot::from_hashdb(db, chunk_root) {
90103
let chunk_path = snapshot_path(root_dir, &block_hash, &chunk.root);
91-
let chunk_file = File::create(chunk_path)?;
104+
let chunk_file = fs::File::create(chunk_path)?;
92105
let compressor = ChunkCompressor::new(chunk_file);
93106
compressor.compress_chunk(&chunk)?;
94107
}
95108

96109
Ok(())
97110
}
98111

112+
fn cleanup_expired(client: &Client, root_dir: &str, expiration: u64) -> Result<(), SnapshotError> {
113+
for entry in fs::read_dir(root_dir)? {
114+
let entry = match entry {
115+
Ok(entry) => entry,
116+
Err(err) => {
117+
cerror!(SYNC, "Snapshot cleanup can't retrieve entry. err: {}", err);
118+
continue
119+
}
120+
};
121+
let path = entry.path();
122+
123+
match entry.file_type().map(|x| x.is_dir()) {
124+
Ok(true) => {}
125+
Ok(false) => continue,
126+
Err(err) => {
127+
cerror!(SYNC, "Snapshot cleanup can't retrieve file info: {}, err: {}", path.to_string_lossy(), err);
128+
continue
129+
}
130+
}
131+
132+
let name = match path.file_name().expect("Directories always have file name").to_str() {
133+
Some(n) => n,
134+
None => continue,
135+
};
136+
let hash = match H256::from_str(name) {
137+
Ok(h) => BlockHash::from(h),
138+
Err(_) => continue,
139+
};
140+
let number = if let Some(number) = client.block_number(&BlockId::Hash(hash)) {
141+
number
142+
} else {
143+
cerror!(SYNC, "Snapshot cleanup can't retrieve block number for block_hash: {}", hash);
144+
continue
145+
};
146+
147+
if number + expiration < client.best_block_header().number() {
148+
cleanup_snapshot(root_dir, hash)
149+
}
150+
}
151+
Ok(())
152+
}
153+
154+
/// Remove all files in `root_dir/block_hash`
155+
fn cleanup_snapshot(root_dir: &str, block_hash: BlockHash) {
156+
let path = snapshot_dir(root_dir, &block_hash);
157+
let rename_to = PathBuf::from(root_dir).join(format!("{:x}.old", *block_hash));
158+
// It is okay to ignore errors. We just wanted them to be removed.
159+
match fs::rename(path, &rename_to) {
160+
Ok(()) => {}
161+
Err(err) => {
162+
cerror!(SYNC, "Snapshot cleanup: renaming {} failed, reason: {}", block_hash, err);
163+
}
164+
}
165+
// Ignore the error. Cleanup failure is not a critical error.
166+
match fs::remove_dir_all(rename_to) {
167+
Ok(()) => {}
168+
Err(err) => {
169+
cerror!(SYNC, "Snapshot cleanup: removing {} failed, reason: {}", block_hash, err);
170+
}
171+
}
172+
}
173+
99174
impl Drop for Service {
100175
fn drop(&mut self) {
101176
if let Some(canceller) = self.canceller.take() {

0 commit comments

Comments
 (0)