Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions core/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod client;
mod config;
mod error;
mod importer;
pub mod snapshot_notify;
mod test_client;

pub use self::chain_notify::ChainNotify;
Expand Down
97 changes: 97 additions & 0 deletions core/src/client/snapshot_notify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Copyright 2018-2019 Kodebox, Inc.
// This file is part of CodeChain.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::sync::mpsc::{sync_channel, Receiver, RecvError, SyncSender};
use std::sync::{Arc, Weak};

use ctypes::BlockHash;
use parking_lot::RwLock;

pub fn create() -> (NotifySender, NotifyReceiverSource) {
let (tx, rx) = sync_channel(1);
let tx = Arc::new(RwLock::new(Some(tx)));
let tx_weak = Arc::downgrade(&tx);
(
NotifySender {
tx,
},
NotifyReceiverSource(
ReceiverCanceller {
tx: tx_weak,
},
NotifyReceiver {
rx,
},
),
)
}

pub struct NotifySender {
tx: Arc<RwLock<Option<SyncSender<BlockHash>>>>,
}

impl NotifySender {
pub fn notify(&self, block_hash: BlockHash) {
let guard = self.tx.read();
if let Some(tx) = guard.as_ref() {
// TODO: Ignore the error. Receiver thread might be terminated or congested.
let _ = tx.try_send(block_hash);
} else {
// ReceiverCanceller is dropped.
}
}
}

pub struct NotifyReceiverSource(pub ReceiverCanceller, pub NotifyReceiver);

/// Dropping this makes the receiver stopped.
///
/// `recv()` method of the `Receiver` will stop and return `RecvError` when corresponding `Sender` is dropped.
/// This is an inherited behaviour of `std::sync::mpsc::{Sender, Receiver}`.
/// However, we need another way to stop the `Receiver`, since `Sender` is usually shared throughout our codes.
/// We can't collect them all and destory one by one. We need a kill switch.
///
/// `ReceiverCanceller` holds weak reference to the `Sender`, so it doesn't prohibit the default behaviour.
/// Then, we can upgrade the weak reference and get the shared reference to `Sender` itself, and manually drop it with this.
pub struct ReceiverCanceller {
tx: Weak<RwLock<Option<SyncSender<BlockHash>>>>,
}

impl Drop for ReceiverCanceller {
fn drop(&mut self) {
if let Some(tx) = self.tx.upgrade() {
let mut guard = tx.write();
if let Some(sender) = guard.take() {
drop(sender)
}
} else {
// All NotifySender is dropped. No droppable Sender.
}
}
}

/// Receiver is dropped when
/// 1. There are no NotifySenders out there.
/// 2. ReceiverCanceller is dropped. See the comment of `ReceiverCanceller`.
pub struct NotifyReceiver {
rx: Receiver<BlockHash>,
}

impl NotifyReceiver {
pub fn recv(&self) -> Result<BlockHash, RecvError> {
self.rx.recv()
}
}
5 changes: 5 additions & 0 deletions core/src/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use primitives::Bytes;
use self::bit_set::BitSet;
use crate::account_provider::AccountProvider;
use crate::block::{ExecutedBlock, SealedBlock};
use crate::client::snapshot_notify::NotifySender as SnapshotNotifySender;
use crate::client::ConsensusClient;
use crate::codechain_machine::CodeChainMachine;
use crate::error::Error;
Expand Down Expand Up @@ -236,6 +237,10 @@ pub trait ConsensusEngine: Sync + Send {

fn register_chain_notify(&self, _: &Client) {}

fn complete_register(&self) {}

fn register_snapshot_notify_sender(&self, _sender: SnapshotNotifySender) {}

fn get_best_block_from_best_proposal_header(&self, header: &HeaderView<'_>) -> BlockHash {
header.hash()
}
Expand Down
17 changes: 12 additions & 5 deletions core/src/consensus/tendermint/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use super::worker;
use super::{ChainNotify, Tendermint, SEAL_FIELDS};
use crate::account_provider::AccountProvider;
use crate::block::*;
use crate::client::snapshot_notify::NotifySender as SnapshotNotifySender;
use crate::client::{Client, ConsensusClient};
use crate::codechain_machine::CodeChainMachine;
use crate::consensus::tendermint::params::TimeGapParams;
Expand Down Expand Up @@ -290,10 +291,6 @@ impl ConsensusEngine for Tendermint {
let extension = service.register_extension(move |api| TendermintExtension::new(inner, timeouts, api));
let client = Arc::downgrade(&self.client().unwrap());
self.extension_initializer.send((extension, client)).unwrap();

let (result, receiver) = crossbeam::bounded(1);
self.inner.send(worker::Event::Restore(result)).unwrap();
receiver.recv().unwrap();
}

fn register_time_gap_config_to_worker(&self, time_gap_params: TimeGapParams) {
Expand All @@ -312,6 +309,16 @@ impl ConsensusEngine for Tendermint {
client.add_notify(Arc::downgrade(&self.chain_notify) as Weak<dyn ChainNotify>);
}

fn complete_register(&self) {
let (result, receiver) = crossbeam::bounded(1);
self.inner.send(worker::Event::Restore(result)).unwrap();
receiver.recv().unwrap();
}

fn register_snapshot_notify_sender(&self, sender: SnapshotNotifySender) {
self.snapshot_notify_sender_initializer.send(sender).unwrap();
}

fn get_best_block_from_best_proposal_header(&self, header: &HeaderView<'_>) -> BlockHash {
header.parent_hash()
}
Expand Down Expand Up @@ -346,7 +353,7 @@ impl ConsensusEngine for Tendermint {
}
}

fn is_term_changed(header: &Header, parent: &Header, term_seconds: u64) -> bool {
pub(super) fn is_term_changed(header: &Header, parent: &Header, term_seconds: u64) -> bool {
if term_seconds == 0 {
return false
}
Expand Down
13 changes: 11 additions & 2 deletions core/src/consensus/tendermint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ pub use self::types::{Height, Step, View};
use super::{stake, ValidatorSet};
use crate::client::ConsensusClient;
use crate::codechain_machine::CodeChainMachine;
use crate::snapshot_notify::NotifySender as SnapshotNotifySender;
use crate::ChainNotify;

/// Timer token representing the consensus step timeouts.
Expand All @@ -58,6 +59,7 @@ pub struct Tendermint {
client: RwLock<Option<Weak<dyn ConsensusClient>>>,
external_params_initializer: crossbeam::Sender<TimeGapParams>,
extension_initializer: crossbeam::Sender<(crossbeam::Sender<network::Event>, Weak<dyn ConsensusClient>)>,
snapshot_notify_sender_initializer: crossbeam::Sender<SnapshotNotifySender>,
timeouts: TimeoutParams,
join: Option<JoinHandle<()>>,
quit_tendermint: crossbeam::Sender<()>,
Expand Down Expand Up @@ -93,15 +95,22 @@ impl Tendermint {
let timeouts = our_params.timeouts;
let machine = Arc::new(machine);

let (join, external_params_initializer, extension_initializer, inner, quit_tendermint) =
worker::spawn(our_params.validators);
let (
join,
external_params_initializer,
extension_initializer,
snapshot_notify_sender_initializer,
inner,
quit_tendermint,
) = worker::spawn(our_params.validators);
let action_handlers: Vec<Arc<dyn ActionHandler>> = vec![stake.clone()];
let chain_notify = Arc::new(TendermintChainNotify::new(inner.clone()));

Arc::new(Tendermint {
client: Default::default(),
external_params_initializer,
extension_initializer,
snapshot_notify_sender_initializer,
timeouts,
join: Some(join),
quit_tendermint,
Expand Down
53 changes: 51 additions & 2 deletions core/src/consensus/tendermint/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::consensus::validator_set::{DynamicValidator, ValidatorSet};
use crate::consensus::{EngineError, Seal};
use crate::encoded;
use crate::error::{BlockError, Error};
use crate::snapshot_notify::NotifySender as SnapshotNotifySender;
use crate::transaction::{SignedTransaction, UnverifiedTransaction};
use crate::views::BlockView;
use crate::BlockId;
Expand All @@ -59,6 +60,7 @@ type SpawnResult = (
JoinHandle<()>,
crossbeam::Sender<TimeGapParams>,
crossbeam::Sender<(crossbeam::Sender<network::Event>, Weak<dyn ConsensusClient>)>,
crossbeam::Sender<SnapshotNotifySender>,
crossbeam::Sender<Event>,
crossbeam::Sender<()>,
);
Expand Down Expand Up @@ -97,6 +99,7 @@ struct Worker {
time_gap_params: TimeGapParams,
timeout_token_nonce: usize,
vote_regression_checker: VoteRegressionChecker,
snapshot_notify_sender: SnapshotNotifySender,
}

pub enum Event {
Expand Down Expand Up @@ -180,6 +183,7 @@ impl Worker {
extension: EventSender<network::Event>,
client: Weak<dyn ConsensusClient>,
time_gap_params: TimeGapParams,
snapshot_notify_sender: SnapshotNotifySender,
) -> Self {
Worker {
client,
Expand All @@ -198,6 +202,7 @@ impl Worker {
time_gap_params,
timeout_token_nonce: ENGINE_TIMEOUT_TOKEN_NONCE_BASE,
vote_regression_checker: VoteRegressionChecker::new(),
snapshot_notify_sender,
}
}

Expand All @@ -206,6 +211,7 @@ impl Worker {
let (quit, quit_receiver) = crossbeam::bounded(1);
let (external_params_initializer, external_params_receiver) = crossbeam::bounded(1);
let (extension_initializer, extension_receiver) = crossbeam::bounded(1);
let (snapshot_notify_sender_initializer, snapshot_notify_sender_receiver) = crossbeam::bounded(1);
let join = Builder::new()
.name("tendermint".to_string())
.spawn(move || {
Expand Down Expand Up @@ -249,8 +255,29 @@ impl Worker {
return
}
};
// TODO: Make initialization steps to order insensitive.
let snapshot_notify_sender = crossbeam::select! {
recv(snapshot_notify_sender_receiver) -> msg => {
match msg {
Ok(sender) => sender,
Err(crossbeam::RecvError) => {
cerror!(ENGINE, "The tendermint extension is not initalized.");
return
}
}
}
recv(quit_receiver) -> msg => {
match msg {
Ok(()) => {},
Err(crossbeam::RecvError) => {
cerror!(ENGINE, "The quit channel for tendermint thread had been closed.");
}
}
return
}
};
validators.register_client(Weak::clone(&client));
let mut inner = Self::new(validators, extension, client, time_gap_params);
let mut inner = Self::new(validators, extension, client, time_gap_params, snapshot_notify_sender);
loop {
crossbeam::select! {
recv(receiver) -> msg => {
Expand Down Expand Up @@ -374,7 +401,7 @@ impl Worker {
}
})
.unwrap();
(join, external_params_initializer, extension_initializer, sender, quit)
(join, external_params_initializer, extension_initializer, snapshot_notify_sender_initializer, sender, quit)
}

/// The client is a thread-safe struct. Using it in multi-threads is safe.
Expand Down Expand Up @@ -1620,6 +1647,8 @@ impl Worker {
}
};

self.send_snapshot_notify(c.as_ref(), enacted.as_slice());

if self.step.is_commit() && (imported.len() + enacted.len() == 1) {
let (_, committed_block_hash) = self.step.committed().expect("Commit state always has block_hash");
if imported.first() == Some(&committed_block_hash) {
Expand Down Expand Up @@ -1671,6 +1700,26 @@ impl Worker {
}
}

// Notify once for the latest block even if multiple blocks have been enacted.
fn send_snapshot_notify(&mut self, c: &dyn ConsensusClient, enacted: &[BlockHash]) {
let mut last_snapshot_point = None;
for block_hash in enacted.iter().rev() {
let block_id = BlockId::Hash(*block_hash);

if c.current_term_id(block_id).expect("State trie should exist for enacted block") > 0 {
let last_term_finished_block_num = c.last_term_finished_block_num(block_id).expect("Block is enacted");
let block_number = c.block_number(&block_id).expect("Block number should exist for enacted block");
if last_term_finished_block_num + 1 == block_number {
last_snapshot_point = Some(block_hash);
}
}
}
if let Some(last_snapshot_point) = last_snapshot_point {
// TODO: Reduce the snapshot frequency.
self.snapshot_notify_sender.notify(*last_snapshot_point);
}
}

fn send_proposal_block(
&self,
signature: SchnorrSignature,
Expand Down
1 change: 1 addition & 0 deletions core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ mod tests;

pub use crate::account_provider::{AccountProvider, Error as AccountProviderError};
pub use crate::block::Block;
pub use crate::client::snapshot_notify;
pub use crate::client::{
AccountData, AssetClient, BlockChainClient, BlockChainTrait, ChainNotify, Client, ClientConfig, DatabaseClient,
EngineClient, EngineInfo, ExecuteClient, ImportBlock, MiningBlockChainClient, Shard, StateInfo, TermInfo,
Expand Down
5 changes: 5 additions & 0 deletions foundry/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,8 @@ fn default_enable_devel_api() -> bool {
pub struct Snapshot {
pub disable: Option<bool>,
pub path: Option<String>,
// Snapshot's age in blocks
pub expiration: Option<u64>,
}

#[derive(Deserialize)]
Expand Down Expand Up @@ -702,6 +704,9 @@ impl Snapshot {
if other.path.is_some() {
self.path = other.path.clone();
}
if other.expiration.is_some() {
self.expiration = other.expiration;
}
}

pub fn overwrite_with(&mut self, matches: &clap::ArgMatches<'_>) -> Result<(), String> {
Expand Down
1 change: 1 addition & 0 deletions foundry/config/presets/config.dev.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ max_connections = 100
[snapshot]
disable = false
path = "snapshot"
expiration = 100000 # blocks. About a week

[email_alarm]
disable = true
1 change: 1 addition & 0 deletions foundry/config/presets/config.prod.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ max_connections = 100
[snapshot]
disable = true
path = "snapshot"
expiration = 100000 # blocks. About a week

[email_alarm]
disable = true
Loading