Skip to content

Commit ecdaa3a

Browse files
committed
Implement snapshot service
1 parent a8627c2 commit ecdaa3a

File tree

9 files changed

+232
-0
lines changed

9 files changed

+232
-0
lines changed

codechain/run_node.rs

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use std::path::Path;
1919
use std::sync::{Arc, Weak};
2020
use std::time::{SystemTime, UNIX_EPOCH};
2121

22+
use ccore::snapshot_notify;
2223
use ccore::{
2324
AccountProvider, AccountProviderError, BlockId, ChainNotify, Client, ClientConfig, ClientService, EngineInfo,
2425
EngineType, Miner, MinerService, Scheme, Stratum, StratumConfig, StratumError, NUM_COLUMNS,
@@ -30,6 +31,7 @@ use ckeystore::KeyStore;
3031
use clap::ArgMatches;
3132
use clogger::{self, EmailAlarm, LoggerConfig};
3233
use cnetwork::{Filters, NetworkConfig, NetworkControl, NetworkService, RoutingTable, SocketAddr};
34+
use csync::snapshot::Service as SnapshotService;
3335
use csync::{BlockSyncExtension, BlockSyncSender, TransactionSyncExtension};
3436
use ctimer::TimerLoop;
3537
use ctrlc::CtrlC;
@@ -363,6 +365,18 @@ pub fn run_node(matches: &ArgMatches) -> Result<(), String> {
363365
stratum_start(&config.stratum_config(), &miner, client.client())?
364366
}
365367

368+
let _snapshot_service = {
369+
if !config.snapshot.disable.unwrap() {
370+
let client = client.client();
371+
let (tx, rx) = snapshot_notify::create();
372+
client.engine().register_snapshot_notify_sender(tx);
373+
let service = Arc::new(SnapshotService::new(client, rx, config.snapshot.path.unwrap()));
374+
Some(service)
375+
} else {
376+
None
377+
}
378+
};
379+
366380
// drop the scheme to free up genesis state.
367381
drop(scheme);
368382
client.client().engine().complete_register();

core/src/client/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ mod client;
2020
mod config;
2121
mod error;
2222
mod importer;
23+
pub mod snapshot_notify;
2324
mod test_client;
2425

2526
pub use self::chain_notify::ChainNotify;

core/src/client/snapshot_notify.rs

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
use ctypes::BlockHash;
2+
3+
use parking_lot::RwLock;
4+
use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender};
5+
use std::sync::{Arc, Weak};
6+
7+
pub fn create() -> (NotifySender, NotifyReceiverSource) {
8+
let (tx, rx) = sync_channel(1);
9+
let tx = Arc::new(RwLock::new(Some(tx)));
10+
let tx_weak = Arc::downgrade(&tx);
11+
(
12+
NotifySender {
13+
tx,
14+
},
15+
NotifyReceiverSource(
16+
ReceiverCanceller {
17+
tx: tx_weak,
18+
},
19+
NotifyReceiver {
20+
rx,
21+
},
22+
),
23+
)
24+
}
25+
26+
pub struct NotifySender {
27+
tx: Arc<RwLock<Option<SyncSender<BlockHash>>>>,
28+
}
29+
30+
impl NotifySender {
31+
pub fn notify(&self, block_hash: BlockHash) {
32+
let guard = self.tx.read();
33+
if let Some(tx) = guard.as_ref() {
34+
// TODO: Ignore the error. Receiver thread might be terminated or congested.
35+
let _ = tx.try_send(block_hash);
36+
} else {
37+
// ReceiverCanceller is dropped.
38+
}
39+
}
40+
}
41+
42+
pub struct NotifyReceiverSource(pub ReceiverCanceller, pub NotifyReceiver);
43+
44+
/// Dropping this makes the receiver stopped.
45+
///
46+
/// `recv()` method of the `Receiver` will stop and return `RecvError` when corresponding `Sender` is dropped.
47+
/// This is an inherited behaviour of `std::sync::mpsc::{Sender, Receiver}`.
48+
/// However, we need another way to stop the `Receiver`, since `Sender` is usually shared throughout our codes.
49+
/// We can't collect them all and destory one by one. We need a kill switch.
50+
///
51+
/// `ReceiverCanceller` holds weak reference to the `Sender`, so it doesn't prohibit the default behaviour.
52+
/// Then, we can upgrade the weak reference and get the shared reference to `Sender` itself, and manually drop it with this.
53+
pub struct ReceiverCanceller {
54+
tx: Weak<RwLock<Option<SyncSender<BlockHash>>>>,
55+
}
56+
57+
impl Drop for ReceiverCanceller {
58+
fn drop(&mut self) {
59+
if let Some(tx) = self.tx.upgrade() {
60+
let mut guard = tx.write();
61+
if let Some(sender) = guard.take() {
62+
drop(sender)
63+
}
64+
} else {
65+
// All NotifySender is dropped. No droppable Sender.
66+
}
67+
}
68+
}
69+
70+
/// Receiver is dropped when
71+
/// 1. There are no NotifySenders out there.
72+
/// 2. ReceiverCanceller is dropped. See the comment of `ReceiverCanceller`.
73+
pub struct NotifyReceiver {
74+
rx: Receiver<BlockHash>,
75+
}
76+
77+
impl NotifyReceiver {
78+
pub fn recv(&self) -> Result<BlockHash, RecvError> {
79+
self.rx.recv()
80+
}
81+
}

core/src/consensus/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ use primitives::{Bytes, U256};
5151
use self::bit_set::BitSet;
5252
use crate::account_provider::AccountProvider;
5353
use crate::block::{ExecutedBlock, SealedBlock};
54+
use crate::client::snapshot_notify::NotifySender as SnapshotNotifySender;
5455
use crate::client::ConsensusClient;
5556
use crate::codechain_machine::CodeChainMachine;
5657
use crate::error::Error;
@@ -265,6 +266,8 @@ pub trait ConsensusEngine: Sync + Send {
265266

266267
fn register_chain_notify(&self, _: &Client) {}
267268

269+
fn register_snapshot_notify_sender(&self, _sender: SnapshotNotifySender) {}
270+
268271
fn complete_register(&self) {}
269272

270273
fn get_best_block_from_best_proposal_header(&self, header: &HeaderView) -> BlockHash {

core/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ mod tests;
8484

8585
pub use crate::account_provider::{AccountProvider, Error as AccountProviderError};
8686
pub use crate::block::Block;
87+
pub use crate::client::snapshot_notify;
8788
pub use crate::client::{
8889
AccountData, AssetClient, BlockChainClient, BlockChainTrait, ChainNotify, Client, ClientConfig, DatabaseClient,
8990
EngineClient, EngineInfo, ExecuteClient, ImportBlock, MiningBlockChainClient, Shard, StateInfo, TermInfo,

sync/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ extern crate trie_standardmap;
4646
extern crate util_error;
4747

4848
mod block;
49+
pub mod snapshot;
4950
mod transaction;
5051

5152
pub use crate::block::{BlockSyncEvent, BlockSyncExtension, BlockSyncSender};

sync/src/snapshot/mod.rs

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
// Copyright 2019 Kodebox, Inc.
2+
// This file is part of CodeChain.
3+
//
4+
// This program is free software: you can redistribute it and/or modify
5+
// it under the terms of the GNU Affero General Public License as
6+
// published by the Free Software Foundation, either version 3 of the
7+
// License, or (at your option) any later version.
8+
//
9+
// This program is distributed in the hope that it will be useful,
10+
// but WITHOUT ANY WARRANTY; without even the implied warranty of
11+
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12+
// GNU Affero General Public License for more details.
13+
//
14+
// You should have received a copy of the GNU Affero General Public License
15+
// along with this program. If not, see <https://www.gnu.org/licenses/>.
16+
17+
use std::fs::{create_dir_all, File};
18+
use std::path::PathBuf;
19+
use std::sync::Arc;
20+
use std::thread::{spawn, JoinHandle};
21+
22+
use ccore::snapshot_notify::{NotifyReceiverSource, ReceiverCanceller};
23+
use ccore::{BlockChainTrait, BlockId, Client};
24+
use cmerkle::snapshot::{ChunkCompressor, Error as SnapshotError, Snapshot};
25+
use ctypes::BlockHash;
26+
use hashdb::{AsHashDB, HashDB};
27+
use primitives::H256;
28+
use std::ops::Deref;
29+
30+
pub struct Service {
31+
join_handle: Option<JoinHandle<()>>,
32+
canceller: Option<ReceiverCanceller>,
33+
}
34+
35+
impl Service {
36+
pub fn new(client: Arc<Client>, notify_receiver_source: NotifyReceiverSource, root_dir: String) -> Self {
37+
let NotifyReceiverSource(canceller, receiver) = notify_receiver_source;
38+
let join_handle = spawn(move || {
39+
cinfo!(SYNC, "Snapshot service is on");
40+
while let Ok(block_hash) = receiver.recv() {
41+
cinfo!(SYNC, "Snapshot is requested for block: {}", block_hash);
42+
let state_root = if let Some(header) = client.block_header(&BlockId::Hash(block_hash)) {
43+
header.state_root()
44+
} else {
45+
cerror!(SYNC, "There isn't corresponding header for the requested block hash: {}", block_hash,);
46+
continue
47+
};
48+
let db_lock = client.state_db().read();
49+
if let Some(err) = snapshot(db_lock.as_hashdb(), block_hash, state_root, &root_dir).err() {
50+
cerror!(
51+
SYNC,
52+
"Snapshot request failed for block: {}, chunk_root: {}, err: {}",
53+
block_hash,
54+
state_root,
55+
err
56+
);
57+
} else {
58+
cinfo!(SYNC, "Snapshot is ready for block: {}", block_hash)
59+
}
60+
}
61+
cinfo!(SYNC, "Snapshot service is stopped")
62+
});
63+
64+
Self {
65+
canceller: Some(canceller),
66+
join_handle: Some(join_handle),
67+
}
68+
}
69+
}
70+
71+
fn snapshot(db: &dyn HashDB, block_hash: BlockHash, chunk_root: H256, root_dir: &str) -> Result<(), SnapshotError> {
72+
let snapshot_dir = {
73+
let mut res = PathBuf::new();
74+
res.push(root_dir);
75+
res.push(format!("{:x}", block_hash.deref()));
76+
res
77+
};
78+
create_dir_all(&snapshot_dir)?;
79+
80+
for chunk in Snapshot::from_hashdb(db, chunk_root) {
81+
let mut chunk_path = snapshot_dir.clone();
82+
chunk_path.push(format!("{:x}", chunk.root));
83+
let chunk_file = File::create(chunk_path)?;
84+
let compressor = ChunkCompressor::new(chunk_file);
85+
compressor.compress_chunk(&chunk)?;
86+
}
87+
88+
Ok(())
89+
}
90+
91+
impl Drop for Service {
92+
fn drop(&mut self) {
93+
if let Some(canceller) = self.canceller.take() {
94+
// The thread corresponding to the `self.join_handle` waits for the `self.canceller` is dropped.
95+
// It must be dropped first not to make deadlock at `handle.join()`.
96+
drop(canceller);
97+
}
98+
99+
if let Some(handle) = self.join_handle.take() {
100+
handle.join().expect("Snapshot service thread shouldn't panic");
101+
}
102+
}
103+
}

util/merkle/src/snapshot/error.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use primitives::H256;
2020
use rlp::DecoderError as RlpDecoderError;
2121

2222
use crate::TrieError;
23+
use std::fmt::{Display, Formatter};
2324

2425
#[derive(Debug)]
2526
pub enum Error {
@@ -53,6 +54,17 @@ impl From<ChunkError> for Error {
5354
}
5455
}
5556

57+
impl Display for Error {
58+
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
59+
match self {
60+
Error::IoError(err) => write!(f, "IoError: {}", err),
61+
Error::RlpDecoderError(err) => write!(f, "RlpDecoderError: {}", err),
62+
Error::TrieError(err) => write!(f, "TrieError: {}", err),
63+
Error::ChunkError(err) => write!(f, "ChunkError: {}", err),
64+
}
65+
}
66+
}
67+
5668
#[derive(Debug)]
5769
pub enum ChunkError {
5870
TooBig,
@@ -63,3 +75,17 @@ pub enum ChunkError {
6375
},
6476
InvalidContent,
6577
}
78+
79+
impl Display for ChunkError {
80+
fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), std::fmt::Error> {
81+
match self {
82+
ChunkError::TooBig => write!(f, "Chunk has too many elements"),
83+
ChunkError::InvalidHeight => write!(f, "Chunk height is unexpected height"),
84+
ChunkError::ChunkRootMismatch {
85+
expected,
86+
actual,
87+
} => write!(f, "Chunk root is different from expected. expected: {}, actual: {}", expected, actual),
88+
ChunkError::InvalidContent => write!(f, "Chunk content is invalid"),
89+
}
90+
}
91+
}

util/merkle/src/snapshot/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@ use hashdb::HashDB;
2626
use primitives::H256;
2727

2828
use self::chunk::{Chunk, RecoveredChunk, UnresolvedChunk};
29+
pub use self::compress::{ChunkCompressor, ChunkDecompressor};
30+
pub use self::error::Error;
2931
use self::ordered_heap::OrderedHeap;
3032
use crate::nibbleslice::NibbleSlice;
3133

0 commit comments

Comments
 (0)