Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 79cbe8b

Browse files
committed
multithreaded pow mining worker
1 parent 17ce41a commit 79cbe8b

File tree

4 files changed

+208
-248
lines changed

4 files changed

+208
-248
lines changed

Cargo.lock

Lines changed: 8 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

client/consensus/pow/Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,3 +31,6 @@ parking_lot = "0.11.1"
3131
derive_more = "0.99.2"
3232
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.9.0"}
3333
async-trait = "0.1.50"
34+
tokio = { version = "1.10.1", features = ["sync"] }
35+
futures-lite= "1.12.0"
36+
tokio-stream = { version = "0.1.7", features = ['sync'] }

client/consensus/pow/src/lib.rs

Lines changed: 184 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -41,24 +41,24 @@
4141
4242
mod worker;
4343

44-
pub use crate::worker::{MiningBuild, MiningMetadata, MiningWorker};
44+
pub use crate::worker::{MiningBuild, MiningData, MiningMetadata};
4545

46-
use crate::worker::UntilImportedOrTimeout;
4746
use codec::{Decode, Encode};
48-
use futures::{Future, StreamExt};
47+
use futures::{executor::block_on, Future, StreamExt};
4948
use log::*;
50-
use parking_lot::Mutex;
5149
use prometheus_endpoint::Registry;
5250
use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents};
5351
use sc_consensus::{
5452
BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport,
55-
BoxJustificationImport, ForkChoiceStrategy, ImportResult, Verifier,
53+
BoxJustificationImport, ForkChoiceStrategy, ImportResult, StateAction, StorageChanges,
54+
Verifier,
5655
};
5756
use sp_api::ProvideRuntimeApi;
5857
use sp_block_builder::BlockBuilder as BlockBuilderApi;
5958
use sp_blockchain::{well_known_cache_keys::Id as CacheKeyId, HeaderBackend, ProvideCache};
6059
use sp_consensus::{
61-
CanAuthorWith, Environment, Error as ConsensusError, Proposer, SelectChain, SyncOracle,
60+
BlockOrigin, CanAuthorWith, Environment, Error as ConsensusError, Proposer, SelectChain,
61+
SyncOracle,
6262
};
6363
use sp_consensus_pow::{Seal, TotalDifficulty, POW_ENGINE_ID};
6464
use sp_core::ExecutionContext;
@@ -69,9 +69,10 @@ use sp_runtime::{
6969
RuntimeString,
7070
};
7171
use std::{
72-
borrow::Cow, cmp::Ordering, collections::HashMap, marker::PhantomData, sync::Arc,
72+
borrow::Cow, cmp::Ordering, collections::HashMap, hint, marker::PhantomData, sync::Arc, thread,
7373
time::Duration,
7474
};
75+
use tokio_stream::wrappers::ReceiverStream;
7576

7677
#[derive(derive_more::Display, Debug)]
7778
pub enum Error<B: BlockT> {
@@ -440,7 +441,7 @@ impl<B: BlockT, Algorithm> PowVerifier<B, Algorithm> {
440441
if id == POW_ENGINE_ID {
441442
(DigestItem::Seal(id, seal.clone()), seal)
442443
} else {
443-
return Err(Error::WrongEngine(id))
444+
return Err(Error::WrongEngine(id));
444445
},
445446
_ => return Err(Error::HeaderUnsealed(hash)),
446447
};
@@ -502,6 +503,7 @@ where
502503
Ok(BasicQueue::new(verifier, block_import, justification_import, spawner, registry))
503504
}
504505

506+
type SealStream = ReceiverStream<Seal>;
505507
/// Start the mining worker for PoW. This function provides the necessary helper functions that can
506508
/// be used to implement a miner. However, it does not do the CPU-intensive mining itself.
507509
///
@@ -511,55 +513,78 @@ where
511513
///
512514
/// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted
513515
/// for blocks being built. This can encode authorship information, or just be a graffiti.
514-
pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, L, CIDP, CAW>(
515-
block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
516+
pub fn start_mining_worker<B, C, S, A, E, SO, L, CIDP, CAW, M>(
517+
mut block_import: BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
516518
client: Arc<C>,
517519
select_chain: S,
518-
algorithm: Algorithm,
520+
algorithm: A,
519521
mut env: E,
520522
mut sync_oracle: SO,
521-
justification_sync_link: L,
523+
mut justification_sync_link: L,
522524
pre_runtime: Option<Vec<u8>>,
523525
create_inherent_data_providers: CIDP,
524-
timeout: Duration,
525526
build_time: Duration,
526527
can_author_with: CAW,
527-
) -> (
528-
Arc<Mutex<MiningWorker<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>>>,
529-
impl Future<Output = ()>,
530-
)
528+
thread_count: u32,
529+
compute: M,
530+
) -> impl Future<Output = ()>
531531
where
532-
Block: BlockT,
533-
C: ProvideRuntimeApi<Block> + BlockchainEvents<Block> + 'static,
534-
S: SelectChain<Block> + 'static,
535-
Algorithm: PowAlgorithm<Block> + Clone,
536-
Algorithm::Difficulty: Send + 'static,
537-
E: Environment<Block> + Send + Sync + 'static,
532+
B: BlockT,
533+
B::Hash: Unpin,
534+
C: ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
535+
S: SelectChain<B> + 'static,
536+
A: PowAlgorithm<B> + Clone,
537+
A::Difficulty: Send + Sync + Unpin + 'static,
538+
E: Environment<B> + Send + Sync + 'static,
538539
E::Error: std::fmt::Debug,
539-
E::Proposer: Proposer<Block, Transaction = sp_api::TransactionFor<C, Block>>,
540+
E::Proposer: Proposer<B, Transaction = sp_api::TransactionFor<C, B>>,
540541
SO: SyncOracle + Clone + Send + Sync + 'static,
541-
L: sc_consensus::JustificationSyncLink<Block>,
542-
CIDP: CreateInherentDataProviders<Block, ()>,
543-
CAW: CanAuthorWith<Block> + Clone + Send + 'static,
542+
L: sc_consensus::JustificationSyncLink<B>,
543+
CIDP: CreateInherentDataProviders<B, ()>,
544+
CAW: CanAuthorWith<B> + Clone + Send + 'static,
545+
M: Fn(&MiningData<B::Hash, A::Difficulty>) -> Option<Seal> + Send + Copy + 'static,
544546
{
545-
let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
546-
let worker = Arc::new(Mutex::new(MiningWorker {
547-
build: None,
548-
algorithm: algorithm.clone(),
549-
block_import,
550-
justification_sync_link,
551-
}));
552-
let worker_ret = worker.clone();
547+
use futures::future::Either;
553548

549+
// Create a spmc channel here
550+
let (producer, consumer) = tokio::sync::watch::channel(None);
551+
552+
// Create channel for receiving a seal from the node
553+
let mut seal_channel: Option<SealStream> = None;
554+
let mut import_stream = client.import_notification_stream();
555+
let mut build = None;
556+
557+
// authorship
554558
let task = async move {
555559
loop {
556-
if timer.next().await.is_none() {
557-
break
558-
}
560+
if let Some(mut channel) = seal_channel.take() {
561+
let result = futures::future::select(channel.next(), import_stream.next()).await;
562+
563+
match result {
564+
// we only care about these two cases.
565+
Either::Left((Some(seal), _)) => {
566+
if let Some(mining_build) = build.take() {
567+
do_import_block(
568+
seal,
569+
mining_build,
570+
&algorithm,
571+
&mut block_import,
572+
&mut justification_sync_link,
573+
)
574+
.await
575+
}
576+
}
577+
Either::Right((Some(block), _)) => {
578+
if matches!(block.origin, BlockOrigin::Own) {
579+
continue;
580+
}
581+
}
582+
_ => {}
583+
}
584+
};
559585

560586
if sync_oracle.is_major_syncing() {
561587
debug!(target: "pow", "Skipping proposal due to sync.");
562-
worker.lock().on_major_syncing();
563588
continue
564589
}
565590

@@ -587,10 +612,6 @@ where
587612
continue
588613
}
589614

590-
if worker.lock().best_hash() == Some(best_hash) {
591-
continue
592-
}
593-
594615
// The worker is locked for the duration of the whole proposing period. Within this
595616
// period, the mining target is outdated and useless anyway.
596617

@@ -636,7 +657,7 @@ where
636657
},
637658
};
638659

639-
let mut inherent_digest = Digest::<Block::Hash>::default();
660+
let mut inherent_digest = Digest::<B::Hash>::default();
640661
if let Some(pre_runtime) = &pre_runtime {
641662
inherent_digest.push(DigestItem::PreRuntime(POW_ENGINE_ID, pre_runtime.to_vec()));
642663
}
@@ -672,7 +693,11 @@ where
672693
},
673694
};
674695

675-
let build = MiningBuild::<Block, Algorithm, C, _> {
696+
let (sender, consumer) = tokio::sync::mpsc::channel(10);
697+
698+
seal_channel = Some(ReceiverStream::new(consumer));
699+
700+
let mining_build = MiningBuild::<B, A, C, _> {
676701
metadata: MiningMetadata {
677702
best_hash,
678703
pre_hash: proposal.block.header().hash(),
@@ -682,11 +707,49 @@ where
682707
proposal,
683708
};
684709

685-
worker.lock().on_build(build);
710+
let _res =
711+
producer.send(Some(MiningData { metadata: mining_build.metadata.clone(), sender }));
712+
713+
build = Some(mining_build);
686714
}
687715
};
688716

689-
(worker_ret, task)
717+
// mining
718+
for _ in 0..thread_count {
719+
let rx = consumer.clone();
720+
thread::spawn(move || {
721+
use futures_lite::future::poll_once;
722+
let mut stream = tokio_stream::wrappers::WatchStream::new(rx);
723+
let mut item = futures::executor::block_on(stream.next()).flatten();
724+
725+
block_on(async {
726+
loop {
727+
// figured it out, we simply have to check once if there's a new item
728+
// in the stream, otherwise we run compute in a hot loop
729+
// this ensures that when a new block comes in, we immediately start building on it
730+
match poll_once(stream.next()).await {
731+
Some(Some(new_item)) => {
732+
item = new_item;
733+
}
734+
// stream has ended, shutdown this thread
735+
Some(None) => return,
736+
_ => {}
737+
}
738+
739+
if let Some(ref build) = item {
740+
if let Some(seal) = compute(build) {
741+
let _ = build.sender.send(seal).await;
742+
}
743+
}
744+
// machine instruction that tells the cpu, we're in a hot loop.
745+
// and cpu can optimize for it.
746+
hint::spin_loop();
747+
}
748+
});
749+
});
750+
}
751+
752+
task
690753
}
691754

692755
/// Find PoW pre-runtime.
@@ -717,8 +780,81 @@ fn fetch_seal<B: BlockT>(
717780
if id == &POW_ENGINE_ID {
718781
Ok(seal.clone())
719782
} else {
720-
return Err(Error::<B>::WrongEngine(*id).into())
783+
return Err(Error::<B>::WrongEngine(*id).into());
721784
},
722785
_ => return Err(Error::<B>::HeaderUnsealed(hash).into()),
723786
}
724787
}
788+
789+
pub async fn do_import_block<B, C, A, P, L>(
790+
seal: Seal,
791+
build: MiningBuild<B, A, C, P>,
792+
algorithm: &A,
793+
block_import: &mut BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
794+
justification_sync_link: &mut L,
795+
) where
796+
B: BlockT,
797+
C: ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
798+
A: PowAlgorithm<B> + Clone,
799+
A::Difficulty: Send + 'static,
800+
L: sc_consensus::JustificationSyncLink<B>,
801+
{
802+
match algorithm.verify(
803+
&BlockId::Hash(build.metadata.best_hash),
804+
&build.metadata.pre_hash,
805+
build.metadata.pre_runtime.as_ref().map(|v| &v[..]),
806+
&seal,
807+
build.metadata.difficulty,
808+
) {
809+
Ok(true) => (),
810+
Ok(false) => {
811+
warn!(
812+
target: "pow",
813+
"Unable to import mined block: seal is invalid",
814+
);
815+
}
816+
Err(err) => {
817+
warn!(
818+
target: "pow",
819+
"Unable to import mined block: {:?}",
820+
err,
821+
);
822+
}
823+
}
824+
825+
let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
826+
let (header, body) = build.proposal.block.deconstruct();
827+
828+
let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
829+
import_block.post_digests.push(seal);
830+
import_block.body = Some(body);
831+
import_block.state_action =
832+
StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));
833+
834+
let intermediate =
835+
PowIntermediate::<A::Difficulty> { difficulty: Some(build.metadata.difficulty) };
836+
837+
import_block
838+
.intermediates
839+
.insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>);
840+
841+
let header = import_block.post_header();
842+
match block_import.import_block(import_block, HashMap::default()).await {
843+
Ok(res) => {
844+
res.handle_justification(&header.hash(), *header.number(), justification_sync_link);
845+
846+
info!(
847+
target: "pow",
848+
"✅ Successfully mined block on top of: {}",
849+
build.metadata.best_hash
850+
);
851+
}
852+
Err(err) => {
853+
warn!(
854+
target: "pow",
855+
"Unable to import mined block: {:?}",
856+
err,
857+
);
858+
}
859+
}
860+
}

0 commit comments

Comments
 (0)