Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions codechain/run_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,10 +363,11 @@ pub fn run_node(matches: &ArgMatches) -> Result<(), String> {
}

let _snapshot_service = {
let client = client.client();
let (tx, rx) = snapshot_notify::create();
client.engine().register_snapshot_notify_sender(tx);
Comment on lines +366 to +368
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why did you move this?

Copy link
Contributor Author

@foriequal0 foriequal0 Nov 19, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It has a little bit of a complicated story. In Tendermint, Worker waits for some register_* functions to be called before it fires up its event loop. It is implemented with crossbeam::channel and it requires to be called orderly and thoroughly. So I moved it there so the last register_ function is called unconditionally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These weird initialization steps need to be cleaned up, but it should be an another PR.


if !config.snapshot.disable.unwrap() {
let client = client.client();
let (tx, rx) = snapshot_notify::create();
client.engine().register_snapshot_notify_sender(tx);
let service = Arc::new(SnapshotService::new(client, rx, config.snapshot.path.unwrap()));
Some(service)
} else {
Expand All @@ -376,6 +377,7 @@ pub fn run_node(matches: &ArgMatches) -> Result<(), String> {

// drop the scheme to free up genesis state.
drop(scheme);
client.client().engine().register_is_done();

cinfo!(TEST_SCRIPT, "Initialization complete");

Expand Down
13 changes: 3 additions & 10 deletions core/src/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ use primitives::{Bytes, H256};
use rlp::{Decodable, DecoderError, Encodable, RlpStream, UntrustedRlp};

use super::invoice::Invoice;
use crate::client::TermInfoExt;
use crate::client::{EngineInfo, TermInfo};
use crate::consensus::CodeChainEngine;
use crate::error::{BlockError, Error};
use crate::transaction::{SignedTransaction, UnverifiedTransaction};
use crate::BlockId;

/// A block, encoded as it is on the block chain.
#[derive(Debug, Clone, PartialEq)]
Expand Down Expand Up @@ -504,16 +506,7 @@ pub fn enact<C: ChainTimeInfo + EngineInfo + FindActionHandler + TermInfo>(
b.push_transactions(transactions, client, parent.number(), parent.timestamp())?;

let parent_common_params = client.common_params((*header.parent_hash()).into()).unwrap();
let term_common_params = {
let block_number = client
.last_term_finished_block_num((*header.parent_hash()).into())
.expect("The block of the parent hash should exist");
if block_number == 0 {
None
} else {
Some(client.common_params((block_number).into()).expect("Common params should exist"))
}
};
let term_common_params = client.term_common_params(BlockId::Hash(*header.parent_hash()));
b.close_and_lock(parent, &parent_common_params, term_common_params.as_ref())
}

Expand Down
21 changes: 20 additions & 1 deletion core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub trait EngineClient: Sync + Send + BlockChainTrait + ImportBlock {
fn get_kvdb(&self) -> Arc<dyn KeyValueDB>;
}

pub trait ConsensusClient: BlockChainClient + EngineClient + EngineInfo + TermInfo + StateInfo {}
pub trait ConsensusClient: BlockChainClient + EngineClient + EngineInfo + TermInfo + StateInfo + TermInfoExt {}

pub trait TermInfo {
fn last_term_finished_block_num(&self, id: BlockId) -> Option<BlockNumber>;
Expand Down Expand Up @@ -351,3 +351,22 @@ pub trait StateInfo {
pub trait SnapshotClient {
fn notify_snapshot(&self, id: BlockId);
}


pub trait TermInfoExt {
fn term_common_params(&self, id: BlockId) -> Option<CommonParams>;
}

impl<C> TermInfoExt for C
where
C: TermInfo + EngineInfo,
{
fn term_common_params(&self, id: BlockId) -> Option<CommonParams> {
let block_number = self.last_term_finished_block_num(id).expect("The block of the parent hash should exist");
if block_number == 0 {
None
} else {
Some(self.common_params(block_number.into()).expect("Common params should exist"))
}
}
}
2 changes: 2 additions & 0 deletions core/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,8 @@ pub trait ConsensusEngine: Sync + Send {

fn register_snapshot_notify_sender(&self, _sender: SnapshotNotifySender) {}

fn register_is_done(&self) {}

fn send_snapshot_notify(&self, _block_hash: BlockHash) {}

fn get_best_block_from_best_proposal_header(&self, header: &HeaderView) -> BlockHash {
Expand Down
17 changes: 12 additions & 5 deletions core/src/consensus/tendermint/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::encoded;
use crate::error::Error;
use crate::views::HeaderView;
use crate::BlockId;
use client::snapshot_notify::NotifySender as SnapshotNotifySender;
use rlp::Encodable;

impl ConsensusEngine for Tendermint {
Expand Down Expand Up @@ -281,10 +282,6 @@ impl ConsensusEngine for Tendermint {
let extension = service.register_extension(move |api| TendermintExtension::new(inner, timeouts, api));
let client = Weak::clone(self.client.read().as_ref().unwrap());
self.extension_initializer.send((extension, client)).unwrap();

let (result, receiver) = crossbeam::bounded(1);
self.inner.send(worker::Event::Restore(result)).unwrap();
receiver.recv().unwrap();
}

fn register_time_gap_config_to_worker(&self, time_gap_params: TimeGapParams) {
Expand All @@ -303,6 +300,16 @@ impl ConsensusEngine for Tendermint {
client.add_notify(Arc::downgrade(&self.chain_notify) as Weak<dyn ChainNotify>);
}

fn register_snapshot_notify_sender(&self, sender: SnapshotNotifySender) {
self.snapshot_notify_sender_initializer.send(sender).unwrap();
}

fn register_is_done(&self) {
let (result, receiver) = crossbeam::bounded(1);
self.inner.send(worker::Event::Restore(result)).unwrap();
receiver.recv().unwrap();
}

fn get_best_block_from_best_proposal_header(&self, header: &HeaderView) -> BlockHash {
header.parent_hash()
}
Expand Down Expand Up @@ -343,7 +350,7 @@ impl ConsensusEngine for Tendermint {
}
}

fn block_number_if_term_changed(
pub(crate) fn block_number_if_term_changed(
header: &Header,
parent_header: &Header,
common_params: &CommonParams,
Expand Down
13 changes: 11 additions & 2 deletions core/src/consensus/tendermint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub use self::types::{Height, Step, View};
use super::{stake, ValidatorSet};
use crate::client::ConsensusClient;
use crate::codechain_machine::CodeChainMachine;
use crate::snapshot_notify::NotifySender as SnapshotNotifySender;
use ChainNotify;

/// Timer token representing the consensus step timeouts.
Expand All @@ -58,6 +59,7 @@ pub struct Tendermint {
client: RwLock<Option<Weak<dyn ConsensusClient>>>,
external_params_initializer: crossbeam::Sender<TimeGapParams>,
extension_initializer: crossbeam::Sender<(crossbeam::Sender<network::Event>, Weak<dyn ConsensusClient>)>,
snapshot_notify_sender_initializer: crossbeam::Sender<SnapshotNotifySender>,
timeouts: TimeoutParams,
join: Option<JoinHandle<()>>,
quit_tendermint: crossbeam::Sender<()>,
Expand Down Expand Up @@ -93,15 +95,22 @@ impl Tendermint {
let timeouts = our_params.timeouts;
let machine = Arc::new(machine);

let (join, external_params_initializer, extension_initializer, inner, quit_tendermint) =
worker::spawn(our_params.validators);
let (
join,
external_params_initializer,
extension_initializer,
snapshot_notify_sender_initializer,
inner,
quit_tendermint,
) = worker::spawn(our_params.validators);
let action_handlers: Vec<Arc<dyn ActionHandler>> = vec![stake.clone()];
let chain_notify = Arc::new(TendermintChainNotify::new(inner.clone()));

Arc::new(Tendermint {
client: Default::default(),
external_params_initializer,
extension_initializer,
snapshot_notify_sender_initializer,
timeouts,
join: Some(join),
quit_tendermint,
Expand Down
53 changes: 51 additions & 2 deletions core/src/consensus/tendermint/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::consensus::validator_set::{DynamicValidator, ValidatorSet};
use crate::consensus::{EngineError, Seal};
use crate::encoded;
use crate::error::{BlockError, Error};
use crate::snapshot_notify::NotifySender as SnapshotNotifySender;
use crate::transaction::{SignedTransaction, UnverifiedTransaction};
use crate::views::BlockView;
use crate::BlockId;
Expand All @@ -59,6 +60,7 @@ type SpawnResult = (
JoinHandle<()>,
crossbeam::Sender<TimeGapParams>,
crossbeam::Sender<(crossbeam::Sender<network::Event>, Weak<dyn ConsensusClient>)>,
crossbeam::Sender<SnapshotNotifySender>,
crossbeam::Sender<Event>,
crossbeam::Sender<()>,
);
Expand Down Expand Up @@ -97,6 +99,7 @@ struct Worker {
time_gap_params: TimeGapParams,
timeout_token_nonce: usize,
vote_regression_checker: VoteRegressionChecker,
snapshot_notify_sender: SnapshotNotifySender,
}

pub enum Event {
Expand Down Expand Up @@ -180,6 +183,7 @@ impl Worker {
extension: EventSender<network::Event>,
client: Weak<dyn ConsensusClient>,
time_gap_params: TimeGapParams,
snapshot_notify_sender: SnapshotNotifySender,
) -> Self {
Worker {
client,
Expand All @@ -198,6 +202,7 @@ impl Worker {
time_gap_params,
timeout_token_nonce: ENGINE_TIMEOUT_TOKEN_NONCE_BASE,
vote_regression_checker: VoteRegressionChecker::new(),
snapshot_notify_sender,
}
}

Expand All @@ -206,6 +211,7 @@ impl Worker {
let (quit, quit_receiver) = crossbeam::bounded(1);
let (external_params_initializer, external_params_receiver) = crossbeam::bounded(1);
let (extension_initializer, extension_receiver) = crossbeam::bounded(1);
let (snapshot_notify_sender_initializer, snapshot_notify_sender_receiver) = crossbeam::bounded(1);
let join = Builder::new()
.name("tendermint".to_string())
.spawn(move || {
Expand Down Expand Up @@ -249,8 +255,29 @@ impl Worker {
return
}
};
// TODO: Make initialization steps to order insensitive.
let snapshot_notify_sender = crossbeam::select! {
recv(snapshot_notify_sender_receiver) -> msg => {
match msg {
Ok(sender) => sender,
Err(crossbeam::RecvError) => {
cerror!(ENGINE, "The tendermint extension is not initalized.");
return
}
}
}
recv(quit_receiver) -> msg => {
match msg {
Ok(()) => {},
Err(crossbeam::RecvError) => {
cerror!(ENGINE, "The quit channel for tendermint thread had been closed.");
}
}
return
}
};
validators.register_client(Weak::clone(&client));
let mut inner = Self::new(validators, extension, client, time_gap_params);
let mut inner = Self::new(validators, extension, client, time_gap_params, snapshot_notify_sender);
loop {
crossbeam::select! {
recv(receiver) -> msg => {
Expand Down Expand Up @@ -374,7 +401,7 @@ impl Worker {
}
})
.unwrap();
(join, external_params_initializer, extension_initializer, sender, quit)
(join, external_params_initializer, extension_initializer, snapshot_notify_sender_initializer, sender, quit)
}

/// The client is a thread-safe struct. Using it in multi-threads is safe.
Expand Down Expand Up @@ -1630,6 +1657,28 @@ impl Worker {
}
}

let mut last_term_end = None;
for block_hash in &enacted {
let header = c.block_header(&BlockId::Hash(*block_hash)).expect("Block is enacted").decode();
if header.number() == 0 {
continue
}
let parent_header =
c.block_header(&BlockId::Hash(*header.parent_hash())).expect("Parent block should be enacted").decode();
let term_common_params = if let Some(p) = c.term_common_params(parent_header.hash().into()) {
p
} else {
continue
};
if super::engine::block_number_if_term_changed(&header, &parent_header, &term_common_params).is_some() {
last_term_end = Some(*block_hash);
}
}
if let Some(last_term_end) = last_term_end {
// TODO: Reduce the snapshot frequency.
self.snapshot_notify_sender.notify(last_term_end);
}

if let Some((last, rest)) = imported.split_last() {
let (imported, last_proposal_header) = {
let header =
Expand Down
12 changes: 2 additions & 10 deletions core/src/miner/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::account_provider::{AccountProvider, Error as AccountProviderError};
use crate::block::{Block, ClosedBlock, IsBlock};
use crate::client::{
AccountData, BlockChainTrait, BlockProducer, Client, EngineInfo, ImportBlock, MiningBlockChainClient, TermInfo,
TermInfoExt,
};
use crate::codechain_machine::CodeChainMachine;
use crate::consensus::{CodeChainEngine, EngineType};
Expand Down Expand Up @@ -599,16 +600,7 @@ impl Miner {
(parent_header.decode(), parent_hash)
};
let parent_common_params = chain.common_params(parent_hash.into()).unwrap();
let term_common_params = {
let block_number = chain
.last_term_finished_block_num(parent_hash.into())
.expect("The block of the parent hash should exist");
if block_number == 0 {
None
} else {
Some(chain.common_params((block_number).into()).expect("Common params should exist"))
}
};
let term_common_params = chain.term_common_params(parent_hash.into());
let block = open_block.close(&parent_header, &parent_common_params, term_common_params.as_ref())?;

let fetch_seq = |p: &Public| {
Expand Down
3 changes: 3 additions & 0 deletions sync/src/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ impl Service {
state_root,
err
);
} else {
cinfo!(SYNC, "Snapshot is ready for block: {}", block_hash)
}
// TODO: Prune old snapshots
}
cinfo!(SYNC, "Snapshot service is stopped")
});
Expand Down
Loading