Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 8 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions client/consensus/pow/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,6 @@ parking_lot = "0.11.1"
derive_more = "0.99.2"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.9.0"}
async-trait = "0.1.50"
tokio = { version = "1.10.1", features = ["sync"] }
tokio-stream = { version = "0.1.7", features = ["sync"] }
futures-core = "0.3.16"
193 changes: 145 additions & 48 deletions client/consensus/pow/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,24 +41,24 @@

mod worker;

pub use crate::worker::{MiningBuild, MiningMetadata, MiningWorker};
pub use crate::worker::{MiningBuild, MiningData, MiningMetadata, MiningDataStream};

use crate::worker::UntilImportedOrTimeout;
use codec::{Decode, Encode};
use futures::{Future, StreamExt};
use futures::{Future, StreamExt, future};
use log::*;
use parking_lot::Mutex;
use prometheus_endpoint::Registry;
use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents};
use sc_consensus::{
BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport,
BoxJustificationImport, ForkChoiceStrategy, ImportResult, Verifier,
BoxJustificationImport, ForkChoiceStrategy, ImportResult, StateAction, StorageChanges,
Verifier,
};
use sp_api::ProvideRuntimeApi;
use sp_block_builder::BlockBuilder as BlockBuilderApi;
use sp_blockchain::{well_known_cache_keys::Id as CacheKeyId, HeaderBackend, ProvideCache};
use sp_consensus::{
CanAuthorWith, Environment, Error as ConsensusError, Proposer, SelectChain, SyncOracle,
BlockOrigin, CanAuthorWith, Environment, Error as ConsensusError, Proposer, SelectChain,
SyncOracle,
};
use sp_consensus_pow::{Seal, TotalDifficulty, POW_ENGINE_ID};
use sp_core::ExecutionContext;
Expand All @@ -72,6 +72,7 @@ use std::{
borrow::Cow, cmp::Ordering, collections::HashMap, marker::PhantomData, sync::Arc,
time::Duration,
};
use tokio_stream::wrappers::ReceiverStream;

#[derive(derive_more::Display, Debug)]
pub enum Error<B: BlockT> {
Expand Down Expand Up @@ -502,6 +503,7 @@ where
Ok(BasicQueue::new(verifier, block_import, justification_import, spawner, registry))
}

type SealStream = ReceiverStream<Seal>;
/// Start the mining worker for PoW. This function provides the necessary helper functions that can
/// be used to implement a miner. However, it does not do the CPU-intensive mining itself.
///
Expand All @@ -511,55 +513,72 @@ where
///
/// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted
/// for blocks being built. This can encode authorship information, or just be a graffiti.
pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, L, CIDP, CAW>(
block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
pub fn start_mining_worker<B, C, S, A, E, SO, L, CIDP, CAW>(
mut block_import: BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
client: Arc<C>,
select_chain: S,
algorithm: Algorithm,
algorithm: A,
mut env: E,
mut sync_oracle: SO,
justification_sync_link: L,
mut justification_sync_link: L,
pre_runtime: Option<Vec<u8>>,
create_inherent_data_providers: CIDP,
timeout: Duration,
build_time: Duration,
can_author_with: CAW,
) -> (
Arc<Mutex<MiningWorker<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>>>,
impl Future<Output = ()>,
)
) -> (MiningDataStream<<B as BlockT>::Hash, A::Difficulty>, impl Future<Output = ()>)
where
Block: BlockT,
C: ProvideRuntimeApi<Block> + BlockchainEvents<Block> + 'static,
S: SelectChain<Block> + 'static,
Algorithm: PowAlgorithm<Block> + Clone,
Algorithm::Difficulty: Send + 'static,
E: Environment<Block> + Send + Sync + 'static,
B: BlockT,
B::Hash: Unpin,
C: ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
S: SelectChain<B> + 'static,
A: PowAlgorithm<B> + Clone,
A::Difficulty: Send + Sync + Unpin + 'static,
E: Environment<B> + Send + Sync + 'static,
E::Error: std::fmt::Debug,
E::Proposer: Proposer<Block, Transaction = sp_api::TransactionFor<C, Block>>,
E::Proposer: Proposer<B, Transaction = sp_api::TransactionFor<C, B>>,
SO: SyncOracle + Clone + Send + Sync + 'static,
L: sc_consensus::JustificationSyncLink<Block>,
CIDP: CreateInherentDataProviders<Block, ()>,
CAW: CanAuthorWith<Block> + Clone + Send + 'static,
L: sc_consensus::JustificationSyncLink<B>,
CIDP: CreateInherentDataProviders<B, ()>,
CAW: CanAuthorWith<B> + Clone + Send + 'static,
{
let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
let worker = Arc::new(Mutex::new(MiningWorker {
build: None,
algorithm: algorithm.clone(),
block_import,
justification_sync_link,
}));
let worker_ret = worker.clone();
use futures::future::Either;

// Create a spmc channel here
let (producer, mining_data_stream) = MiningDataStream::new();

// Create channel for receiving a seal from the node
let mut seal_channel: Option<SealStream> = None;
let mut import_stream = client.import_notification_stream()
.filter(|block| future::ready(!matches!(block.origin, BlockOrigin::Own)));

let mut build = None;

// authorship
let task = async move {
loop {
if timer.next().await.is_none() {
break
}
if let Some(mut channel) = seal_channel.take() {
let result = futures::future::select(channel.next(), import_stream.next()).await;

match result {
// we only care about this case.
Either::Left((Some(seal), _)) => {
if let Some(mining_build) = build.take() {
do_import_block(
seal,
mining_build,
&algorithm,
&mut block_import,
&mut justification_sync_link,
)
.await
}
}
_ => {}
}
};

if sync_oracle.is_major_syncing() {
debug!(target: "pow", "Skipping proposal due to sync.");
worker.lock().on_major_syncing();
continue
}

Expand Down Expand Up @@ -587,13 +606,6 @@ where
continue
}

if worker.lock().best_hash() == Some(best_hash) {
continue
}

// The worker is locked for the duration of the whole proposing period. Within this
// period, the mining target is outdated and useless anyway.

let difficulty = match algorithm.difficulty(best_hash) {
Ok(x) => x,
Err(err) => {
Expand Down Expand Up @@ -636,7 +648,7 @@ where
},
};

let mut inherent_digest = Digest::<Block::Hash>::default();
let mut inherent_digest = Digest::<B::Hash>::default();
if let Some(pre_runtime) = &pre_runtime {
inherent_digest.push(DigestItem::PreRuntime(POW_ENGINE_ID, pre_runtime.to_vec()));
}
Expand Down Expand Up @@ -672,7 +684,11 @@ where
},
};

let build = MiningBuild::<Block, Algorithm, C, _> {
let (sender, consumer) = tokio::sync::mpsc::channel(10);

seal_channel = Some(ReceiverStream::new(consumer));

let mining_build = MiningBuild::<B, A, C, _> {
metadata: MiningMetadata {
best_hash,
pre_hash: proposal.block.header().hash(),
Expand All @@ -682,11 +698,19 @@ where
proposal,
};

worker.lock().on_build(build);
let result =
producer.send(Some(MiningData { metadata: mining_build.metadata.clone(), sender }));

if result.is_err() {
// Terminate the task: all receivers have been dropped, meaning every mining thread has shut down.
return
}

build = Some(mining_build);
}
};

(worker_ret, task)
(mining_data_stream, task)
}

/// Find PoW pre-runtime.
Expand Down Expand Up @@ -722,3 +746,76 @@ fn fetch_seal<B: BlockT>(
_ => return Err(Error::<B>::HeaderUnsealed(hash).into()),
}
}

/// Verify a mined `seal` against the given mining `build` and, if the seal is
/// valid, import the sealed block into the chain.
///
/// Steps:
/// 1. Check the seal with [`PowAlgorithm::verify`] against the metadata the
///    block was built on. An invalid or unverifiable seal aborts the import
///    (previously the block was imported regardless of the verification
///    result — that fall-through is the bug fixed here).
/// 2. Append the seal as a post-runtime digest, attach the block body and the
///    proposal's storage changes, and record the difficulty as a PoW
///    intermediate.
/// 3. Hand the block to `block_import` with `BlockOrigin::Own`; on success,
///    forward any justification handling to `justification_sync_link`.
///
/// Errors are not returned to the caller; failures are logged via `warn!`,
/// matching the fire-and-forget style of the mining task that calls this.
pub async fn do_import_block<B, C, A, P, L>(
    seal: Seal,
    build: MiningBuild<B, A, C, P>,
    algorithm: &A,
    block_import: &mut BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
    justification_sync_link: &mut L,
) where
    B: BlockT,
    C: ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
    A: PowAlgorithm<B> + Clone,
    A::Difficulty: Send + 'static,
    L: sc_consensus::JustificationSyncLink<B>,
{
    // Reject a bad seal *before* doing any import work. Without the early
    // returns below, an invalid seal would still be imported as a block.
    match algorithm.verify(
        &BlockId::Hash(build.metadata.best_hash),
        &build.metadata.pre_hash,
        build.metadata.pre_runtime.as_ref().map(|v| &v[..]),
        &seal,
        build.metadata.difficulty,
    ) {
        Ok(true) => (),
        Ok(false) => {
            warn!(
                target: "pow",
                "Unable to import mined block: seal is invalid",
            );
            return
        }
        Err(err) => {
            warn!(
                target: "pow",
                "Unable to import mined block: {:?}",
                err,
            );
            return
        }
    }

    // The seal becomes a post-runtime digest item on the sealed header.
    let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
    let (header, body) = build.proposal.block.deconstruct();

    let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
    import_block.post_digests.push(seal);
    import_block.body = Some(body);
    // Reuse the storage changes computed while proposing, instead of
    // re-executing the block on import.
    import_block.state_action =
        StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));

    // Record the difficulty so the PoW block-import pipeline can pick it up.
    let intermediate =
        PowIntermediate::<A::Difficulty> { difficulty: Some(build.metadata.difficulty) };

    import_block
        .intermediates
        .insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>);

    // Compute the post-header first: `import_block` is consumed by
    // `import_block()` below, but we still need the hash/number for
    // justification handling and logging.
    let header = import_block.post_header();
    match block_import.import_block(import_block, HashMap::default()).await {
        Ok(res) => {
            res.handle_justification(&header.hash(), *header.number(), justification_sync_link);

            info!(
                target: "pow",
                "✅ Successfully mined block on top of: {}",
                build.metadata.best_hash
            );
        }
        Err(err) => {
            warn!(
                target: "pow",
                "Unable to import mined block: {:?}",
                err,
            );
        }
    }
}
Loading