Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit afe91b0

Browse files
committed
multithreaded worker
1 parent 17ce41a commit afe91b0

File tree

4 files changed

+156
-239
lines changed

4 files changed

+156
-239
lines changed

Cargo.lock

Lines changed: 4 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/consensus/pow/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,5 @@ parking_lot = "0.11.1"
3131
derive_more = "0.99.2"
3232
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.9.0"}
3333
async-trait = "0.1.50"
34+
tokio = { version = "1.10.1", features = ["sync"] }
35+
tokio-stream = "0.1.7"

client/consensus/pow/src/lib.rs

Lines changed: 137 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -41,24 +41,24 @@
4141
4242
mod worker;
4343

44-
pub use crate::worker::{MiningBuild, MiningMetadata, MiningWorker};
44+
pub use crate::worker::{MiningBuild, MiningData, MiningMetadata};
4545

46-
use crate::worker::UntilImportedOrTimeout;
4746
use codec::{Decode, Encode};
4847
use futures::{Future, StreamExt};
4948
use log::*;
50-
use parking_lot::Mutex;
5149
use prometheus_endpoint::Registry;
5250
use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents};
5351
use sc_consensus::{
5452
BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport,
55-
BoxJustificationImport, ForkChoiceStrategy, ImportResult, Verifier,
53+
BoxJustificationImport, ForkChoiceStrategy, ImportResult, StateAction, StorageChanges,
54+
Verifier,
5655
};
5756
use sp_api::ProvideRuntimeApi;
5857
use sp_block_builder::BlockBuilder as BlockBuilderApi;
5958
use sp_blockchain::{well_known_cache_keys::Id as CacheKeyId, HeaderBackend, ProvideCache};
6059
use sp_consensus::{
61-
CanAuthorWith, Environment, Error as ConsensusError, Proposer, SelectChain, SyncOracle,
60+
BlockOrigin, CanAuthorWith, Environment, Error as ConsensusError, Proposer, SelectChain,
61+
SyncOracle,
6262
};
6363
use sp_consensus_pow::{Seal, TotalDifficulty, POW_ENGINE_ID};
6464
use sp_core::ExecutionContext;
@@ -72,6 +72,7 @@ use std::{
7272
borrow::Cow, cmp::Ordering, collections::HashMap, marker::PhantomData, sync::Arc,
7373
time::Duration,
7474
};
75+
use tokio_stream::wrappers::ReceiverStream;
7576

7677
#[derive(derive_more::Display, Debug)]
7778
pub enum Error<B: BlockT> {
@@ -502,6 +503,7 @@ where
502503
Ok(BasicQueue::new(verifier, block_import, justification_import, spawner, registry))
503504
}
504505

506+
type SealStream = ReceiverStream<Seal>;
505507
/// Start the mining worker for PoW. This function provides the necessary helper functions that can
506508
/// be used to implement a miner. However, it does not do the CPU-intensive mining itself.
507509
///
@@ -511,55 +513,71 @@ where
511513
///
512514
/// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted
513515
/// for blocks being built. This can encode authorship information, or just be a graffiti.
514-
pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, L, CIDP, CAW>(
515-
block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
516+
pub fn start_mining_worker<B, C, S, A, E, SO, L, CIDP, CAW>(
517+
mut block_import: BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
516518
client: Arc<C>,
517519
select_chain: S,
518-
algorithm: Algorithm,
520+
algorithm: A,
519521
mut env: E,
520522
mut sync_oracle: SO,
521-
justification_sync_link: L,
523+
mut justification_sync_link: L,
522524
pre_runtime: Option<Vec<u8>>,
523525
create_inherent_data_providers: CIDP,
524-
timeout: Duration,
525526
build_time: Duration,
526527
can_author_with: CAW,
527528
) -> (
528-
Arc<Mutex<MiningWorker<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>>>,
529+
tokio::sync::watch::Receiver<Option<MiningData<B::Hash, A::Difficulty>>>,
529530
impl Future<Output = ()>,
530531
)
531532
where
532-
Block: BlockT,
533-
C: ProvideRuntimeApi<Block> + BlockchainEvents<Block> + 'static,
534-
S: SelectChain<Block> + 'static,
535-
Algorithm: PowAlgorithm<Block> + Clone,
536-
Algorithm::Difficulty: Send + 'static,
537-
E: Environment<Block> + Send + Sync + 'static,
533+
B: BlockT,
534+
C: ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
535+
S: SelectChain<B> + 'static,
536+
A: PowAlgorithm<B> + Clone,
537+
A::Difficulty: Send + 'static,
538+
E: Environment<B> + Send + Sync + 'static,
538539
E::Error: std::fmt::Debug,
539-
E::Proposer: Proposer<Block, Transaction = sp_api::TransactionFor<C, Block>>,
540+
E::Proposer: Proposer<B, Transaction = sp_api::TransactionFor<C, B>>,
540541
SO: SyncOracle + Clone + Send + Sync + 'static,
541-
L: sc_consensus::JustificationSyncLink<Block>,
542-
CIDP: CreateInherentDataProviders<Block, ()>,
543-
CAW: CanAuthorWith<Block> + Clone + Send + 'static,
542+
L: sc_consensus::JustificationSyncLink<B>,
543+
CIDP: CreateInherentDataProviders<B, ()>,
544+
CAW: CanAuthorWith<B> + Clone + Send + 'static,
544545
{
545-
let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
546-
let worker = Arc::new(Mutex::new(MiningWorker {
547-
build: None,
548-
algorithm: algorithm.clone(),
549-
block_import,
550-
justification_sync_link,
551-
}));
552-
let worker_ret = worker.clone();
546+
use futures::future::Either;
547+
548+
// Create a spmc channel here
549+
let (producer, consumer) = tokio::sync::watch::channel(None);
550+
551+
// Create channel for receiving a seal from the node
552+
let mut seal_channel: Option<SealStream> = None;
553+
let mut import_stream = client.import_notification_stream();
554+
let mut build = None;
553555

554556
let task = async move {
555557
loop {
556-
if timer.next().await.is_none() {
557-
break
558-
}
558+
if let Some(ref mut channel) = seal_channel {
559+
let result = futures::future::select(channel.next(), import_stream.next()).await;
560+
561+
match (result, build.take()) {
562+
// we only care about these two cases.
563+
(Either::Left((Some(seal), _)), Some(mining_build)) => {
564+
do_import_block(
565+
seal,
566+
mining_build,
567+
&algorithm,
568+
&mut block_import,
569+
&mut justification_sync_link,
570+
)
571+
.await
572+
}
573+
_ => {}
574+
}
575+
// we're done,
576+
seal_channel = None;
577+
};
559578

560579
if sync_oracle.is_major_syncing() {
561580
debug!(target: "pow", "Skipping proposal due to sync.");
562-
worker.lock().on_major_syncing();
563581
continue
564582
}
565583

@@ -587,10 +605,6 @@ where
587605
continue
588606
}
589607

590-
if worker.lock().best_hash() == Some(best_hash) {
591-
continue
592-
}
593-
594608
// The worker is locked for the duration of the whole proposing period. Within this
595609
// period, the mining target is outdated and useless anyway.
596610

@@ -636,7 +650,7 @@ where
636650
},
637651
};
638652

639-
let mut inherent_digest = Digest::<Block::Hash>::default();
653+
let mut inherent_digest = Digest::<B::Hash>::default();
640654
if let Some(pre_runtime) = &pre_runtime {
641655
inherent_digest.push(DigestItem::PreRuntime(POW_ENGINE_ID, pre_runtime.to_vec()));
642656
}
@@ -672,7 +686,11 @@ where
672686
},
673687
};
674688

675-
let build = MiningBuild::<Block, Algorithm, C, _> {
689+
let (sender, consumer) = tokio::sync::mpsc::channel(10);
690+
691+
seal_channel = Some(ReceiverStream::new(consumer));
692+
693+
let mining_build = MiningBuild::<B, A, C, _> {
676694
metadata: MiningMetadata {
677695
best_hash,
678696
pre_hash: proposal.block.header().hash(),
@@ -682,11 +700,16 @@ where
682700
proposal,
683701
};
684702

685-
worker.lock().on_build(build);
703+
let _res = producer.send(Some(MiningData {
704+
metadata: mining_build.metadata.clone(),
705+
sender: sender.clone(),
706+
}));
707+
708+
build = Some(mining_build);
686709
}
687710
};
688711

689-
(worker_ret, task)
712+
(consumer, task)
690713
}
691714

692715
/// Find PoW pre-runtime.
@@ -722,3 +745,76 @@ fn fetch_seal<B: BlockT>(
722745
_ => return Err(Error::<B>::HeaderUnsealed(hash).into()),
723746
}
724747
}
748+
749+
pub async fn do_import_block<B, C, A, P, L>(
750+
seal: Seal,
751+
build: MiningBuild<B, A, C, P>,
752+
algorithm: &A,
753+
block_import: &mut BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
754+
justification_sync_link: &mut L,
755+
) where
756+
B: BlockT,
757+
C: ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
758+
A: PowAlgorithm<B> + Clone,
759+
A::Difficulty: Send + 'static,
760+
L: sc_consensus::JustificationSyncLink<B>,
761+
{
762+
match algorithm.verify(
763+
&BlockId::Hash(build.metadata.best_hash),
764+
&build.metadata.pre_hash,
765+
build.metadata.pre_runtime.as_ref().map(|v| &v[..]),
766+
&seal,
767+
build.metadata.difficulty,
768+
) {
769+
Ok(true) => (),
770+
Ok(false) => {
771+
warn!(
772+
target: "pow",
773+
"Unable to import mined block: seal is invalid",
774+
);
775+
}
776+
Err(err) => {
777+
warn!(
778+
target: "pow",
779+
"Unable to import mined block: {:?}",
780+
err,
781+
);
782+
}
783+
}
784+
785+
let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
786+
let (header, body) = build.proposal.block.deconstruct();
787+
788+
let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
789+
import_block.post_digests.push(seal);
790+
import_block.body = Some(body);
791+
import_block.state_action =
792+
StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));
793+
794+
let intermediate =
795+
PowIntermediate::<A::Difficulty> { difficulty: Some(build.metadata.difficulty) };
796+
797+
import_block
798+
.intermediates
799+
.insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>);
800+
801+
let header = import_block.post_header();
802+
match block_import.import_block(import_block, HashMap::default()).await {
803+
Ok(res) => {
804+
res.handle_justification(&header.hash(), *header.number(), justification_sync_link);
805+
806+
info!(
807+
target: "pow",
808+
"✅ Successfully mined block on top of: {}",
809+
build.metadata.best_hash
810+
);
811+
}
812+
Err(err) => {
813+
warn!(
814+
target: "pow",
815+
"Unable to import mined block: {:?}",
816+
err,
817+
);
818+
}
819+
}
820+
}

0 commit comments

Comments
 (0)