Skip to content

Commit cf61d37

Browse files
committed
Implement snapshot service
1 parent c94918e commit cf61d37

File tree

9 files changed

+221
-0
lines changed

9 files changed

+221
-0
lines changed

codechain/run_node.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::path::Path;
1919
use std::sync::{Arc, Weak};
2020
use std::time::{SystemTime, UNIX_EPOCH};
2121

22+
use ccore::snapshot_notify;
2223
use ccore::{
2324
AccountProvider, AccountProviderError, BlockId, ChainNotify, Client, ClientConfig, ClientService, EngineInfo,
2425
EngineType, Miner, MinerService, Scheme, Stratum, StratumConfig, StratumError, NUM_COLUMNS,
@@ -30,6 +31,7 @@ use ckeystore::KeyStore;
3031
use clap::ArgMatches;
3132
use clogger::{self, EmailAlarm, EmailAlarmConfig, LoggerConfig};
3233
use cnetwork::{Filters, NetworkConfig, NetworkControl, NetworkService, RoutingTable, SocketAddr};
34+
use csync::snapshot::Service as SnapshotService;
3335
use csync::{BlockSyncExtension, BlockSyncSender, TransactionSyncExtension};
3436
use ctimer::TimerLoop;
3537
use ctrlc::CtrlC;
@@ -360,6 +362,18 @@ pub fn run_node(matches: &ArgMatches) -> Result<(), String> {
360362
stratum_start(&config.stratum_config(), &miner, client.client())?
361363
}
362364

365+
let _snapshot_service = {
366+
if !config.snapshot.disable.unwrap() {
367+
let client = client.client();
368+
let (tx, rx) = snapshot_notify::create();
369+
client.engine().register_snapshot_notify_sender(tx);
370+
let service = Arc::new(SnapshotService::new(client, rx, config.snapshot.path.unwrap()));
371+
Some(service)
372+
} else {
373+
None
374+
}
375+
};
376+
363377
// drop the scheme to free up genesis state.
364378
drop(scheme);
365379

core/src/client/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ mod client;
2020
mod config;
2121
mod error;
2222
mod importer;
23+
pub mod snapshot_notify;
2324
mod test_client;
2425

2526
pub use self::chain_notify::ChainNotify;

core/src/client/snapshot_notify.rs

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
use ctypes::BlockHash;
2+
3+
use parking_lot::RwLock;
4+
use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender};
5+
use std::sync::{Arc, Weak};
6+
7+
pub fn create() -> (NotifySender, NotifyReceiverSource) {
8+
let (tx, rx) = sync_channel(1);
9+
let tx = Arc::new(RwLock::new(Some(tx)));
10+
let tx_weak = Arc::downgrade(&tx);
11+
(
12+
NotifySender {
13+
tx,
14+
},
15+
NotifyReceiverSource(
16+
ReceiverCanceller {
17+
tx: tx_weak,
18+
},
19+
NotifyReceiver {
20+
rx,
21+
},
22+
),
23+
)
24+
}
25+
26+
pub struct NotifySender {
27+
tx: Arc<RwLock<Option<SyncSender<BlockHash>>>>,
28+
}
29+
30+
impl NotifySender {
31+
pub fn notify(&self, block_hash: BlockHash) {
32+
let guard = self.tx.read();
33+
if let Some(tx) = guard.as_ref() {
34+
// TODO: Ignore the error. Receiver thread might be terminated or congested.
35+
let _ = tx.try_send(block_hash);
36+
} else {
37+
// ReceiverCanceller is dropped.
38+
}
39+
}
40+
}
41+
42+
pub struct NotifyReceiverSource(pub ReceiverCanceller, pub NotifyReceiver);
43+
44+
/// Dropping this makes the receiver stopped.
45+
pub struct ReceiverCanceller {
46+
tx: Weak<RwLock<Option<SyncSender<BlockHash>>>>,
47+
}
48+
49+
impl Drop for ReceiverCanceller {
50+
fn drop(&mut self) {
51+
if let Some(tx) = self.tx.upgrade() {
52+
let mut guard = tx.write();
53+
if let Some(sender) = guard.take() {
54+
drop(sender)
55+
}
56+
} else {
57+
// All NotifySender is dropped. No droppable Sender.
58+
}
59+
}
60+
}
61+
62+
/// Receiver is dropped when
63+
/// 1. There are no NotifySenders out there.
64+
/// 2. ReceiverCanceller is dropped.
65+
pub struct NotifyReceiver {
66+
rx: Receiver<BlockHash>,
67+
}
68+
69+
impl NotifyReceiver {
70+
pub fn recv(&self) -> Result<BlockHash, RecvError> {
71+
self.rx.recv()
72+
}
73+
}

core/src/consensus/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use primitives::{Bytes, U256};
5151
use self::bit_set::BitSet;
5252
use crate::account_provider::AccountProvider;
5353
use crate::block::{ExecutedBlock, SealedBlock};
54+
use crate::client::snapshot_notify::NotifySender as SnapshotNotifySender;
5455
use crate::client::ConsensusClient;
5556
use crate::codechain_machine::CodeChainMachine;
5657
use crate::error::Error;
@@ -262,6 +263,8 @@ pub trait ConsensusEngine: Sync + Send {
262263

263264
fn register_chain_notify(&self, _: &Client) {}
264265

266+
fn register_snapshot_notify_sender(&self, _sender: SnapshotNotifySender) {}
267+
265268
fn get_best_block_from_best_proposal_header(&self, header: &HeaderView) -> BlockHash {
266269
header.hash()
267270
}

core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ mod tests;
8484

8585
pub use crate::account_provider::{AccountProvider, Error as AccountProviderError};
8686
pub use crate::block::Block;
87+
pub use crate::client::snapshot_notify;
8788
pub use crate::client::Error::Database;
8889
pub use crate::client::{
8990
AccountData, AssetClient, BlockChainClient, BlockChainTrait, ChainNotify, Client, ClientConfig, DatabaseClient,

sync/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ extern crate trie_standardmap;
4646
extern crate util_error;
4747

4848
mod block;
49+
pub mod snapshot;
4950
mod transaction;
5051

5152
pub use crate::block::{BlockSyncEvent, BlockSyncExtension, BlockSyncSender};

sync/src/snapshot/mod.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
// Copyright 2019 Kodebox, Inc.
2+
// This file is part of CodeChain.
3+
//
4+
// This program is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Affero General Public License as
6+
// published by the Free Software Foundation, either version 3 of the
7+
// License, or (at your option) any later version.
8+
//
9+
// This program is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Affero General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Affero General Public License
15+
// along with this program. If not, see <https://www.gnu.org/licenses/>.
16+
17+
use std::fs::{create_dir_all, File};
18+
use std::path::PathBuf;
19+
use std::sync::Arc;
20+
use std::thread::{spawn, JoinHandle};
21+
22+
use ccore::snapshot_notify::{NotifyReceiverSource, ReceiverCanceller};
23+
use ccore::{BlockChainTrait, BlockId, Client};
24+
use cmerkle::snapshot::{ChunkCompressor, Error as SnapshotError, Snapshot};
25+
use ctypes::BlockHash;
26+
use hashdb::{AsHashDB, HashDB};
27+
use primitives::H256;
28+
29+
pub struct Service {
30+
join_handle: Option<JoinHandle<()>>,
31+
canceller: Option<ReceiverCanceller>,
32+
}
33+
34+
impl Service {
35+
pub fn new(client: Arc<Client>, notify_receiver_source: NotifyReceiverSource, root_dir: String) -> Self {
36+
let NotifyReceiverSource(canceller, receiver) = notify_receiver_source;
37+
let join_handle = spawn(move || {
38+
cinfo!(SYNC, "Snapshot service is on");
39+
while let Ok(block_hash) = receiver.recv() {
40+
cinfo!(SYNC, "Snapshot is requested for block: {}", block_hash);
41+
let state_root = if let Some(header) = client.block_header(&BlockId::Hash(block_hash)) {
42+
header.state_root()
43+
} else {
44+
cerror!(SYNC, "There isn't corresponding header for the requested block hash: {}", block_hash,);
45+
continue
46+
};
47+
let db_lock = client.state_db().read();
48+
if let Some(err) = snapshot(db_lock.as_hashdb(), block_hash, state_root, &root_dir).err() {
49+
cerror!(
50+
SYNC,
51+
"Snapshot request failed for block: {}, chunk_root: {}, err: {}",
52+
block_hash,
53+
state_root,
54+
err
55+
);
56+
}
57+
}
58+
cinfo!(SYNC, "Snapshot service is stopped")
59+
});
60+
61+
Self {
62+
canceller: Some(canceller),
63+
join_handle: Some(join_handle),
64+
}
65+
}
66+
}
67+
68+
fn snapshot(db: &dyn HashDB, block_hash: BlockHash, chunk_root: H256, root_dir: &str) -> Result<(), SnapshotError> {
69+
let snapshot_dir = {
70+
let mut res = PathBuf::new();
71+
res.push(root_dir);
72+
res.push(block_hash.to_string());
73+
res
74+
};
75+
create_dir_all(&snapshot_dir)?;
76+
77+
for chunk in Snapshot::from_hashdb(db, chunk_root) {
78+
let mut chunk_path = snapshot_dir.clone();
79+
chunk_path.push(chunk.root.to_string());
80+
let chunk_file = File::create(chunk_path)?;
81+
let compressor = ChunkCompressor::new(chunk_file);
82+
compressor.compress_chunk(&chunk)?;
83+
}
84+
85+
Ok(())
86+
}
87+
88+
impl Drop for Service {
89+
fn drop(&mut self) {
90+
if let Some(canceller) = self.canceller.take() {
91+
// The thread corresponding to the `self.join_handle` waits for the `self.canceller` is dropped.
92+
// It must be dropped first not to make deadlock at `handle.join()`.
93+
drop(canceller);
94+
}
95+
96+
if let Some(handle) = self.join_handle.take() {
97+
handle.join().expect("Snapshot service thread shouldn't panic");
98+
}
99+
}
100+
}

util/merkle/src/snapshot/error.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use primitives::H256;
2020
use rlp::DecoderError as RlpDecoderError;
2121

2222
use crate::TrieError;
23+
use std::fmt::{Display, Formatter};
2324

2425
#[derive(Debug)]
2526
pub enum Error {
@@ -53,6 +54,17 @@ impl From<ChunkError> for Error {
5354
}
5455
}
5556

57+
impl Display for Error {
58+
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
59+
match self {
60+
Error::IoError(err) => write!(f, "IoError: {}", err),
61+
Error::RlpDecoderError(err) => write!(f, "RlpDecoderError: {}", err),
62+
Error::TrieError(err) => write!(f, "TrieError: {}", err),
63+
Error::ChunkError(err) => write!(f, "ChunkError: {}", err),
64+
}
65+
}
66+
}
67+
5668
#[derive(Debug)]
5769
pub enum ChunkError {
5870
TooBig,
@@ -63,3 +75,17 @@ pub enum ChunkError {
6375
},
6476
InvalidContent,
6577
}
78+
79+
impl Display for ChunkError {
80+
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
81+
match self {
82+
ChunkError::TooBig => write!(f, "Chunk has too many elements"),
83+
ChunkError::InvalidHeight => write!(f, "Chunk height is unexpected height"),
84+
ChunkError::ChunkRootMismatch {
85+
expected,
86+
actual,
87+
} => write!(f, "Chunk root is different from expected. expected: {}, actual: {}", expected, actual),
88+
ChunkError::InvalidContent => write!(f, "Chunk content is invalid"),
89+
}
90+
}
91+
}

util/merkle/src/snapshot/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ use hashdb::HashDB;
2626
use primitives::H256;
2727

2828
use self::chunk::{Chunk, RecoveredChunk, UnresolvedChunk};
29+
pub use self::compress::{ChunkCompressor, ChunkDecompressor};
30+
pub use self::error::Error;
2931
use self::ordered_heap::OrderedHeap;
3032
use crate::nibbleslice::NibbleSlice;
3133

0 commit comments

Comments
 (0)