Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions codechain/run_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::path::Path;
use std::sync::{Arc, Weak};
use std::time::{SystemTime, UNIX_EPOCH};

use ccore::snapshot_notify;
use ccore::{
AccountProvider, AccountProviderError, BlockId, ChainNotify, Client, ClientConfig, ClientService, EngineInfo,
EngineType, Miner, MinerService, Scheme, Stratum, StratumConfig, StratumError, NUM_COLUMNS,
Expand All @@ -30,6 +31,7 @@ use ckeystore::KeyStore;
use clap::ArgMatches;
use clogger::{self, EmailAlarm, EmailAlarmConfig, LoggerConfig};
use cnetwork::{Filters, NetworkConfig, NetworkControl, NetworkService, RoutingTable, SocketAddr};
use csync::snapshot::Service as SnapshotService;
use csync::{BlockSyncExtension, BlockSyncSender, TransactionSyncExtension};
use ctimer::TimerLoop;
use ctrlc::CtrlC;
Expand Down Expand Up @@ -360,6 +362,18 @@ pub fn run_node(matches: &ArgMatches) -> Result<(), String> {
stratum_start(&config.stratum_config(), &miner, client.client())?
}

let _snapshot_service = {
if !config.snapshot.disable.unwrap() {
let client = client.client();
let (tx, rx) = snapshot_notify::create();
client.engine().register_snapshot_notify_sender(tx);
let service = Arc::new(SnapshotService::new(client, rx, config.snapshot.path.unwrap()));
Some(service)
} else {
None
}
};

// drop the scheme to free up genesis state.
drop(scheme);

Expand Down
10 changes: 9 additions & 1 deletion core/src/client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ use super::{
};
use crate::block::{ClosedBlock, IsBlock, OpenBlock, SealedBlock};
use crate::blockchain::{BlockChain, BlockProvider, BodyProvider, HeaderProvider, InvoiceProvider, TransactionAddress};
use crate::client::{ConsensusClient, TermInfo};
use crate::client::{ConsensusClient, SnapshotClient, TermInfo};
use crate::consensus::{CodeChainEngine, EngineError};
use crate::encoded;
use crate::error::{BlockImportError, Error, ImportError, SchemeError};
Expand Down Expand Up @@ -948,3 +948,11 @@ impl FindActionHandler for Client {
self.engine.find_action_handler_for(id)
}
}

impl SnapshotClient for Client {
fn notify_snapshot(&self, id: BlockId) {
if let Some(header) = self.block_header(&id) {
self.engine.send_snapshot_notify(header.hash())
}
}
}
5 changes: 5 additions & 0 deletions core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod client;
mod config;
mod error;
mod importer;
pub mod snapshot_notify;
mod test_client;

pub use self::chain_notify::ChainNotify;
Expand Down Expand Up @@ -346,3 +347,7 @@ pub trait StateInfo {
/// is unknown.
fn state_at(&self, id: BlockId) -> Option<TopLevelState>;
}

pub trait SnapshotClient {
fn notify_snapshot(&self, id: BlockId);
}
81 changes: 81 additions & 0 deletions core/src/client/snapshot_notify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use ctypes::BlockHash;

use parking_lot::RwLock;
use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender};
use std::sync::{Arc, Weak};

pub fn create() -> (NotifySender, NotifyReceiverSource) {
let (tx, rx) = sync_channel(1);
let tx = Arc::new(RwLock::new(Some(tx)));
let tx_weak = Arc::downgrade(&tx);
(
NotifySender {
tx,
},
NotifyReceiverSource(
ReceiverCanceller {
tx: tx_weak,
},
NotifyReceiver {
rx,
},
),
)
}

pub struct NotifySender {
tx: Arc<RwLock<Option<SyncSender<BlockHash>>>>,
}

impl NotifySender {
pub fn notify(&self, block_hash: BlockHash) {
let guard = self.tx.read();
if let Some(tx) = guard.as_ref() {
// TODO: Ignore the error. Receiver thread might be terminated or congested.
let _ = tx.try_send(block_hash);
} else {
// ReceiverCanceller is dropped.
}
}
}

pub struct NotifyReceiverSource(pub ReceiverCanceller, pub NotifyReceiver);

/// Dropping this makes the receiver stopped.
///
/// `recv()` method of the `Receiver` will stop and return `RecvError` when corresponding `Sender` is dropped.
/// This is an inherited behaviour of `std::sync::mpsc::{Sender, Receiver}`.
/// However, we need another way to stop the `Receiver`, since `Sender` is usually shared throughout our codes.
/// We can't collect them all and destory one by one. We need a kill switch.
///
/// `ReceiverCanceller` holds weak reference to the `Sender`, so it doesn't prohibit the default behaviour.
/// Then, we can upgrade the weak reference and get the shared reference to `Sender` itself, and manually drop it with this.
pub struct ReceiverCanceller {
tx: Weak<RwLock<Option<SyncSender<BlockHash>>>>,
}

impl Drop for ReceiverCanceller {
fn drop(&mut self) {
if let Some(tx) = self.tx.upgrade() {
let mut guard = tx.write();
if let Some(sender) = guard.take() {
drop(sender)
}
} else {
// All NotifySender is dropped. No droppable Sender.
}
}
}

/// Receiver is dropped when
/// 1. There are no NotifySenders out there.
/// 2. ReceiverCanceller is dropped. See the comment of `ReceiverCanceller`.
pub struct NotifyReceiver {
rx: Receiver<BlockHash>,
}

impl NotifyReceiver {
pub fn recv(&self) -> Result<BlockHash, RecvError> {
self.rx.recv()
}
}
5 changes: 5 additions & 0 deletions core/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ use primitives::{Bytes, U256};
use self::bit_set::BitSet;
use crate::account_provider::AccountProvider;
use crate::block::{ExecutedBlock, SealedBlock};
use crate::client::snapshot_notify::NotifySender as SnapshotNotifySender;
use crate::client::ConsensusClient;
use crate::codechain_machine::CodeChainMachine;
use crate::error::Error;
Expand Down Expand Up @@ -262,6 +263,10 @@ pub trait ConsensusEngine: Sync + Send {

fn register_chain_notify(&self, _: &Client) {}

fn register_snapshot_notify_sender(&self, _sender: SnapshotNotifySender) {}

fn send_snapshot_notify(&self, _block_hash: BlockHash) {}

fn get_best_block_from_best_proposal_header(&self, header: &HeaderView) -> BlockHash {
header.hash()
}
Expand Down
18 changes: 17 additions & 1 deletion core/src/consensus/solo/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ use std::sync::Arc;

use ckey::Address;
use cstate::{ActionHandler, HitHandler};
use ctypes::{CommonParams, Header};
use ctypes::{BlockHash, CommonParams, Header};
use parking_lot::RwLock;

use self::params::SoloParams;
use super::stake;
use super::{ConsensusEngine, Seal};
use crate::block::{ExecutedBlock, IsBlock};
use crate::client::snapshot_notify::NotifySender;
use crate::codechain_machine::CodeChainMachine;
use crate::consensus::{EngineError, EngineType};
use crate::error::Error;
Expand All @@ -35,6 +37,7 @@ pub struct Solo {
params: SoloParams,
machine: CodeChainMachine,
action_handlers: Vec<Arc<dyn ActionHandler>>,
snapshot_notify_sender: Arc<RwLock<Option<NotifySender>>>,
}

impl Solo {
Expand All @@ -50,6 +53,7 @@ impl Solo {
params,
machine,
action_handlers,
snapshot_notify_sender: Arc::new(RwLock::new(None)),
}
}
}
Expand Down Expand Up @@ -135,6 +139,18 @@ impl ConsensusEngine for Solo {
1
}

fn register_snapshot_notify_sender(&self, sender: NotifySender) {
let mut guard = self.snapshot_notify_sender.write();
assert!(guard.is_none(), "snapshot_notify_sender is registered twice");
*guard = Some(sender);
}

fn send_snapshot_notify(&self, block_hash: BlockHash) {
if let Some(sender) = self.snapshot_notify_sender.read().as_ref() {
sender.notify(block_hash)
}
}

fn action_handlers(&self) -> &[Arc<dyn ActionHandler>] {
&self.action_handlers
}
Expand Down
5 changes: 3 additions & 2 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,12 @@ mod tests;

pub use crate::account_provider::{AccountProvider, Error as AccountProviderError};
pub use crate::block::Block;
pub use crate::client::snapshot_notify;
pub use crate::client::Error::Database;
pub use crate::client::{
AccountData, AssetClient, BlockChainClient, BlockChainTrait, ChainNotify, Client, ClientConfig, DatabaseClient,
EngineClient, EngineInfo, ExecuteClient, ImportBlock, MiningBlockChainClient, Shard, StateInfo, TermInfo,
TestBlockChainClient, TextClient,
EngineClient, EngineInfo, ExecuteClient, ImportBlock, MiningBlockChainClient, Shard, SnapshotClient, StateInfo,
TermInfo, TestBlockChainClient, TextClient,
};
pub use crate::consensus::{EngineType, TimeGapParams};
pub use crate::db::{COL_STATE, NUM_COLUMNS};
Expand Down
11 changes: 8 additions & 3 deletions rpc/src/v1/impls/devel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use std::vec::Vec;

use ccore::{
BlockId, DatabaseClient, EngineClient, EngineInfo, MinerService, MiningBlockChainClient, SignedTransaction,
TermInfo, COL_STATE,
SnapshotClient, TermInfo, COL_STATE,
};
use ccrypto::Blake;
use cjson::bytes::Bytes;
Expand All @@ -33,7 +33,7 @@ use csync::BlockSyncEvent;
use ctypes::transaction::{
Action, AssetMintOutput, AssetOutPoint, AssetTransferInput, AssetTransferOutput, Transaction,
};
use ctypes::{Tracker, TxHash};
use ctypes::{BlockHash, Tracker, TxHash};
use jsonrpc_core::Result;
use kvdb::KeyValueDB;
use primitives::{H160, H256};
Expand Down Expand Up @@ -70,7 +70,7 @@ where

impl<C, M> Devel for DevelClient<C, M>
where
C: DatabaseClient + EngineInfo + EngineClient + MiningBlockChainClient + TermInfo + 'static,
C: DatabaseClient + EngineInfo + EngineClient + MiningBlockChainClient + TermInfo + SnapshotClient + 'static,
M: MinerService + 'static,
{
fn get_state_trie_keys(&self, offset: usize, limit: usize) -> Result<Vec<H256>> {
Expand Down Expand Up @@ -108,6 +108,11 @@ where
}
}

fn snapshot(&self, block_hash: BlockHash) -> Result<()> {
self.client.notify_snapshot(BlockId::Hash(block_hash));
Ok(())
}

fn test_tps(&self, setting: TPSTestSetting) -> Result<f64> {
let common_params = self.client.common_params(BlockId::Latest).unwrap();
let mint_fee = common_params.min_asset_mint_cost();
Expand Down
4 changes: 4 additions & 0 deletions rpc/src/v1/traits/devel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
use std::net::SocketAddr;

use cjson::bytes::Bytes;
use ctypes::BlockHash;
use jsonrpc_core::Result;
use primitives::H256;

Expand All @@ -39,6 +40,9 @@ pub trait Devel {
#[rpc(name = "devel_getBlockSyncPeers")]
fn get_block_sync_peers(&self) -> Result<Vec<SocketAddr>>;

#[rpc(name = "devel_snapshot")]
fn snapshot(&self, hash: BlockHash) -> Result<()>;

#[rpc(name = "devel_testTPS")]
fn test_tps(&self, setting: TPSTestSetting) -> Result<f64>;
}
28 changes: 28 additions & 0 deletions spec/JSON-RPC.md
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,7 @@ When `Transaction` is included in any response, there will be an additional fiel
***
* [devel_getStateTrieKeys](#devel_getstatetriekeys)
* [devel_getStateTrieValue](#devel_getstatetrievalue)
* [devel_snapshot](#devel_snapshot)
* [devel_startSealing](#devel_startsealing)
* [devel_stopSealing](#devel_stopsealing)
* [devel_getBlockSyncPeers](#devel_getblocksyncpeers)
Expand Down Expand Up @@ -2979,6 +2980,33 @@ Gets the value of the state trie with the given key.

[Back to **List of methods**](#list-of-methods)

## devel_snapshot
Snapshot the state of the given block hash.

### Params
1. key: `H256`

### Returns

### Request Example
```
curl \
-H 'Content-Type: application/json' \
-d '{"jsonrpc": "2.0", "method": "devel_snapshot", "params": ["0xfc196ede542b03b55aee9f106004e7e3d7ea6a9600692e964b4735a260356b50"], "id": null}' \
localhost:8080
```

### Response Example
```
{
"jsonrpc":"2.0",
"result":[],
"id":null
}
```

[Back to **List of methods**](#list-of-methods)

## devel_startSealing
Starts and enables sealing blocks by the miner.

Expand Down
1 change: 1 addition & 0 deletions sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ extern crate trie_standardmap;
extern crate util_error;

mod block;
pub mod snapshot;
mod transaction;

pub use crate::block::{BlockSyncEvent, BlockSyncExtension, BlockSyncSender};
Expand Down
Loading