Skip to content

Commit 443a178

Browse files
committed
Add SnapshotNotifySender to Tendermint worker
1 parent b3cc277 commit 443a178

File tree

4 files changed

+49
-7
lines changed

4 files changed

+49
-7
lines changed

codechain/run_node.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -366,10 +366,11 @@ pub fn run_node(matches: &ArgMatches) -> Result<(), String> {
366366
}
367367

368368
let _snapshot_service = {
369+
let client = client.client();
370+
let (tx, rx) = snapshot_notify::create();
371+
client.engine().register_snapshot_notify_sender(tx);
372+
369373
if !config.snapshot.disable.unwrap() {
370-
let client = client.client();
371-
let (tx, rx) = snapshot_notify::create();
372-
client.engine().register_snapshot_notify_sender(tx);
373374
let service = Arc::new(SnapshotService::new(client, rx, config.snapshot.path.unwrap()));
374375
Some(service)
375376
} else {

core/src/consensus/tendermint/engine.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ use super::worker;
3737
use super::{ChainNotify, Tendermint, SEAL_FIELDS};
3838
use crate::account_provider::AccountProvider;
3939
use crate::block::*;
40+
use crate::client::snapshot_notify::NotifySender as SnapshotNotifySender;
4041
use crate::client::{Client, ConsensusClient};
4142
use crate::codechain_machine::CodeChainMachine;
4243
use crate::consensus::tendermint::params::TimeGapParams;
@@ -359,6 +360,10 @@ impl ConsensusEngine for Tendermint {
359360
client.add_notify(Arc::downgrade(&self.chain_notify) as Weak<dyn ChainNotify>);
360361
}
361362

363+
fn register_snapshot_notify_sender(&self, sender: SnapshotNotifySender) {
364+
self.snapshot_notify_sender_initializer.send(sender).unwrap();
365+
}
366+
362367
fn complete_register(&self) {
363368
let (result, receiver) = crossbeam::bounded(1);
364369
self.inner.send(worker::Event::Restore(result)).unwrap();

core/src/consensus/tendermint/mod.rs

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ pub use self::types::{Height, Step, View};
4141
use super::{stake, ValidatorSet};
4242
use crate::client::ConsensusClient;
4343
use crate::codechain_machine::CodeChainMachine;
44+
use crate::snapshot_notify::NotifySender as SnapshotNotifySender;
4445
use crate::ChainNotify;
4546

4647
/// Timer token representing the consensus step timeouts.
@@ -58,6 +59,7 @@ pub struct Tendermint {
5859
client: RwLock<Option<Weak<dyn ConsensusClient>>>,
5960
external_params_initializer: crossbeam::Sender<TimeGapParams>,
6061
extension_initializer: crossbeam::Sender<(crossbeam::Sender<network::Event>, Weak<dyn ConsensusClient>)>,
62+
snapshot_notify_sender_initializer: crossbeam::Sender<SnapshotNotifySender>,
6163
timeouts: TimeoutParams,
6264
join: Option<JoinHandle<()>>,
6365
quit_tendermint: crossbeam::Sender<()>,
@@ -93,15 +95,22 @@ impl Tendermint {
9395
let timeouts = our_params.timeouts;
9496
let machine = Arc::new(machine);
9597

96-
let (join, external_params_initializer, extension_initializer, inner, quit_tendermint) =
97-
worker::spawn(our_params.validators);
98+
let (
99+
join,
100+
external_params_initializer,
101+
extension_initializer,
102+
snapshot_notify_sender_initializer,
103+
inner,
104+
quit_tendermint,
105+
) = worker::spawn(our_params.validators);
98106
let action_handlers: Vec<Arc<dyn ActionHandler>> = vec![stake.clone()];
99107
let chain_notify = Arc::new(TendermintChainNotify::new(inner.clone()));
100108

101109
Arc::new(Tendermint {
102110
client: Default::default(),
103111
external_params_initializer,
104112
extension_initializer,
113+
snapshot_notify_sender_initializer,
105114
timeouts,
106115
join: Some(join),
107116
quit_tendermint,

core/src/consensus/tendermint/worker.rs

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use crate::consensus::validator_set::{DynamicValidator, ValidatorSet};
5050
use crate::consensus::{EngineError, Seal};
5151
use crate::encoded;
5252
use crate::error::{BlockError, Error};
53+
use crate::snapshot_notify::NotifySender as SnapshotNotifySender;
5354
use crate::transaction::{SignedTransaction, UnverifiedTransaction};
5455
use crate::views::BlockView;
5556
use crate::BlockId;
@@ -59,6 +60,7 @@ type SpawnResult = (
5960
JoinHandle<()>,
6061
crossbeam::Sender<TimeGapParams>,
6162
crossbeam::Sender<(crossbeam::Sender<network::Event>, Weak<dyn ConsensusClient>)>,
63+
crossbeam::Sender<SnapshotNotifySender>,
6264
crossbeam::Sender<Event>,
6365
crossbeam::Sender<()>,
6466
);
@@ -97,6 +99,7 @@ struct Worker {
9799
time_gap_params: TimeGapParams,
98100
timeout_token_nonce: usize,
99101
vote_regression_checker: VoteRegressionChecker,
102+
snapshot_notify_sender: SnapshotNotifySender,
100103
}
101104

102105
pub enum Event {
@@ -180,6 +183,7 @@ impl Worker {
180183
extension: EventSender<network::Event>,
181184
client: Weak<dyn ConsensusClient>,
182185
time_gap_params: TimeGapParams,
186+
snapshot_notify_sender: SnapshotNotifySender,
183187
) -> Self {
184188
Worker {
185189
client,
@@ -198,6 +202,7 @@ impl Worker {
198202
time_gap_params,
199203
timeout_token_nonce: ENGINE_TIMEOUT_TOKEN_NONCE_BASE,
200204
vote_regression_checker: VoteRegressionChecker::new(),
205+
snapshot_notify_sender,
201206
}
202207
}
203208

@@ -206,6 +211,7 @@ impl Worker {
206211
let (quit, quit_receiver) = crossbeam::bounded(1);
207212
let (external_params_initializer, external_params_receiver) = crossbeam::bounded(1);
208213
let (extension_initializer, extension_receiver) = crossbeam::bounded(1);
214+
let (snapshot_notify_sender_initializer, snapshot_notify_sender_receiver) = crossbeam::bounded(1);
209215
let join = Builder::new()
210216
.name("tendermint".to_string())
211217
.spawn(move || {
@@ -249,8 +255,29 @@ impl Worker {
249255
return
250256
}
251257
};
258+
// TODO: Make initialization steps to order insensitive.
259+
let snapshot_notify_sender = crossbeam::select! {
260+
recv(snapshot_notify_sender_receiver) -> msg => {
261+
match msg {
262+
Ok(sender) => sender,
263+
Err(crossbeam::RecvError) => {
264+
cerror!(ENGINE, "The tendermint extension is not initalized.");
265+
return
266+
}
267+
}
268+
}
269+
recv(quit_receiver) -> msg => {
270+
match msg {
271+
Ok(()) => {},
272+
Err(crossbeam::RecvError) => {
273+
cerror!(ENGINE, "The quit channel for tendermint thread had been closed.");
274+
}
275+
}
276+
return
277+
}
278+
};
252279
validators.register_client(Weak::clone(&client));
253-
let mut inner = Self::new(validators, extension, client, time_gap_params);
280+
let mut inner = Self::new(validators, extension, client, time_gap_params, snapshot_notify_sender);
254281
loop {
255282
crossbeam::select! {
256283
recv(receiver) -> msg => {
@@ -374,7 +401,7 @@ impl Worker {
374401
}
375402
})
376403
.unwrap();
377-
(join, external_params_initializer, extension_initializer, sender, quit)
404+
(join, external_params_initializer, extension_initializer, snapshot_notify_sender_initializer, sender, quit)
378405
}
379406

380407
/// The client is a thread-safe struct. Using it in multi-threads is safe.

0 commit comments

Comments
 (0)