diff --git a/Cargo.lock b/Cargo.lock
index e883ba8c606e7..59ccceac9a6bf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2216,9 +2216,9 @@ checksum = "0b0e06c393068f3a6ef246c75cdca793d6a46347e75286933e5e75fd2fd11582"
 [[package]]
 name = "futures-lite"
-version = "1.11.3"
+version = "1.12.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b4481d0cd0de1d204a4fa55e7d45f07b1d958abcb06714b3446438e2eff695fb"
+checksum = "7694489acd39452c77daa48516b894c153f192c3578d5a839b62c58099fcbf48"
 dependencies = [
  "fastrand",
  "futures-core",
@@ -7634,6 +7634,7 @@ dependencies = [
  "async-trait",
  "derive_more",
  "futures 0.3.16",
+ "futures-core",
  "futures-timer 3.0.2",
  "log 0.4.14",
  "parity-scale-codec",
@@ -7649,6 +7650,8 @@ dependencies = [
  "sp-inherents",
  "sp-runtime",
  "substrate-prometheus-endpoint",
+ "tokio",
+ "tokio-stream",
 ]

 [[package]]
@@ -10186,9 +10189,9 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c"
 [[package]]
 name = "tokio"
-version = "1.10.0"
+version = "1.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "01cf844b23c6131f624accf65ce0e4e9956a8bb329400ea5bcc26ae3a5c20b0b"
+checksum = "92036be488bb6594459f2e03b60e42df6f937fe6ca5c5ffdcb539c6b84dc40f5"
 dependencies = [
  "autocfg 1.0.1",
  "bytes 1.0.1",
@@ -10296,6 +10299,7 @@ dependencies = [
  "futures-core",
  "pin-project-lite 0.2.6",
  "tokio",
+ "tokio-util",
 ]

 [[package]]
diff --git a/client/consensus/pow/Cargo.toml b/client/consensus/pow/Cargo.toml
index c71e11aef275e..6c1b526ef0325 100644
--- a/client/consensus/pow/Cargo.toml
+++ b/client/consensus/pow/Cargo.toml
@@ -31,3 +31,6 @@ parking_lot = "0.11.1"
 derive_more = "0.99.2"
 prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../../utils/prometheus", version = "0.9.0"}
 async-trait = "0.1.50"
+tokio = { version = "1.10.1", features = ["sync"] }
+tokio-stream = { version = "0.1.7", features = ["sync"] }
+futures-core = "0.3.16"
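For context on the new dependencies: the reworked worker broadcasts fresh mining metadata over a `tokio::sync::watch` channel (wrapped in `tokio-stream`'s `WatchStream`, hence the `sync` features on both crates) and receives seals back over a `tokio::sync::mpsc` channel. Below is a minimal sketch of that round trip, assuming only `tokio` and `tokio-stream`; all names and types here are illustrative, not part of this diff.

```rust
use tokio_stream::{wrappers::WatchStream, StreamExt};

#[tokio::main]
async fn main() {
    // watch = single-producer, multi-consumer: every miner observes the
    // latest mining metadata (here just a difficulty number).
    let (meta_tx, meta_rx) = tokio::sync::watch::channel(None::<u32>);
    // mpsc = multi-producer, single-consumer: whichever miner finds a seal
    // first sends it back to the authorship side.
    let (seal_tx, mut seal_rx) = tokio::sync::mpsc::channel::<Vec<u8>>(10);

    let miner = tokio::spawn(async move {
        let mut metadata = WatchStream::new(meta_rx);
        while let Some(update) = metadata.next().await {
            if let Some(difficulty) = update {
                // A real miner would search for a valid nonce here.
                let _ = seal_tx.send(difficulty.to_le_bytes().to_vec()).await;
                break
            }
        }
    });

    meta_tx.send(Some(42)).unwrap();
    assert_eq!(seal_rx.recv().await, Some(42u32.to_le_bytes().to_vec()));
    miner.await.unwrap();
}
```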
diff --git a/client/consensus/pow/src/lib.rs b/client/consensus/pow/src/lib.rs
index 1f5781434ef71..ded5de03b91c7 100644
--- a/client/consensus/pow/src/lib.rs
+++ b/client/consensus/pow/src/lib.rs
@@ -41,24 +41,24 @@
 mod worker;

-pub use crate::worker::{MiningBuild, MiningMetadata, MiningWorker};
+pub use crate::worker::{MiningBuild, MiningData, MiningDataStream, MiningMetadata};

-use crate::worker::UntilImportedOrTimeout;
 use codec::{Decode, Encode};
-use futures::{Future, StreamExt};
+use futures::{Future, StreamExt, future};
 use log::*;
-use parking_lot::Mutex;
 use prometheus_endpoint::Registry;
 use sc_client_api::{self, backend::AuxStore, BlockOf, BlockchainEvents};
 use sc_consensus::{
 	BasicQueue, BlockCheckParams, BlockImport, BlockImportParams, BoxBlockImport,
-	BoxJustificationImport, ForkChoiceStrategy, ImportResult, Verifier,
+	BoxJustificationImport, ForkChoiceStrategy, ImportResult, StateAction, StorageChanges,
+	Verifier,
 };
 use sp_api::ProvideRuntimeApi;
 use sp_block_builder::BlockBuilder as BlockBuilderApi;
 use sp_blockchain::{well_known_cache_keys::Id as CacheKeyId, HeaderBackend, ProvideCache};
 use sp_consensus::{
-	CanAuthorWith, Environment, Error as ConsensusError, Proposer, SelectChain, SyncOracle,
+	BlockOrigin, CanAuthorWith, Environment, Error as ConsensusError, Proposer, SelectChain,
+	SyncOracle,
 };
 use sp_consensus_pow::{Seal, TotalDifficulty, POW_ENGINE_ID};
 use sp_core::ExecutionContext;
@@ -72,6 +72,7 @@ use std::{
 	borrow::Cow, cmp::Ordering, collections::HashMap, marker::PhantomData, sync::Arc,
 	time::Duration,
 };
+use tokio_stream::wrappers::ReceiverStream;

 #[derive(derive_more::Display, Debug)]
 pub enum Error<B: BlockT> {
@@ -502,6 +503,7 @@ where
 	Ok(BasicQueue::new(verifier, block_import, justification_import, spawner, registry))
 }

+type SealStream = ReceiverStream<Seal>;
+
 /// Start the mining worker for PoW. This function provides the necessary helper functions that can
 /// be used to implement a miner. However, it does not do the CPU-intensive mining itself.
 ///
@@ -511,55 +513,72 @@
 ///
 /// `pre_runtime` is a parameter that allows a custom additional pre-runtime digest to be inserted
 /// for blocks being built. This can encode authorship information, or just be a graffiti.
-pub fn start_mining_worker<Block, C, S, Algorithm, E, SO, L, CIDP, CAW>(
-	block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
+pub fn start_mining_worker<B, C, S, A, E, SO, L, CIDP, CAW>(
+	mut block_import: BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
 	client: Arc<C>,
 	select_chain: S,
-	algorithm: Algorithm,
+	algorithm: A,
 	mut env: E,
 	mut sync_oracle: SO,
-	justification_sync_link: L,
+	mut justification_sync_link: L,
 	pre_runtime: Option<Vec<u8>>,
 	create_inherent_data_providers: CIDP,
-	timeout: Duration,
 	build_time: Duration,
 	can_author_with: CAW,
-) -> (
-	Arc<Mutex<MiningWorker<Block, Algorithm, C, L, <E::Proposer as Proposer<Block>>::Proof>>>,
-	impl Future<Output = ()>,
-)
+) -> (MiningDataStream<<B as BlockT>::Hash, A::Difficulty>, impl Future<Output = ()>)
 where
-	Block: BlockT,
-	C: ProvideRuntimeApi<Block> + BlockchainEvents<Block> + 'static,
-	S: SelectChain<Block> + 'static,
-	Algorithm: PowAlgorithm<Block> + Clone,
-	Algorithm::Difficulty: Send + 'static,
-	E: Environment<Block> + Send + Sync + 'static,
+	B: BlockT,
+	B::Hash: Unpin,
+	C: ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
+	S: SelectChain<B> + 'static,
+	A: PowAlgorithm<B> + Clone,
+	A::Difficulty: Send + Sync + Unpin + 'static,
+	E: Environment<B> + Send + Sync + 'static,
 	E::Error: std::fmt::Debug,
-	E::Proposer: Proposer<Block, Transaction = sp_api::TransactionFor<C, Block>>,
+	E::Proposer: Proposer<B, Transaction = sp_api::TransactionFor<C, B>>,
 	SO: SyncOracle + Clone + Send + Sync + 'static,
-	L: sc_consensus::JustificationSyncLink<Block>,
-	CIDP: CreateInherentDataProviders<Block, ()>,
-	CAW: CanAuthorWith<Block> + Clone + Send + 'static,
+	L: sc_consensus::JustificationSyncLink<B>,
+	CIDP: CreateInherentDataProviders<B, ()>,
+	CAW: CanAuthorWith<B> + Clone + Send + 'static,
 {
-	let mut timer = UntilImportedOrTimeout::new(client.import_notification_stream(), timeout);
-	let worker = Arc::new(Mutex::new(MiningWorker {
-		build: None,
-		algorithm: algorithm.clone(),
-		block_import,
-		justification_sync_link,
-	}));
-	let worker_ret = worker.clone();
+	use futures::future::Either;
+
+	// Create the spmc (single-producer, multi-consumer) channel that broadcasts
+	// fresh mining metadata to every miner.
+	let (producer, mining_data_stream) = MiningDataStream::new();
+
+	// Channel on which a miner sends a seal back to this task. A fresh one is
+	// created per build, so seals for stale builds are simply dropped.
+	let mut seal_channel: Option<SealStream> = None;
+	let mut import_stream = client
+		.import_notification_stream()
+		.filter(|block| future::ready(!matches!(block.origin, BlockOrigin::Own)));
+
+	let mut build = None;

+	// The authorship task.
 	let task = async move {
 		loop {
-			if timer.next().await.is_none() {
-				break
-			}
+			if let Some(mut channel) = seal_channel.take() {
+				let result =
+					futures::future::select(channel.next(), import_stream.next()).await;
+
+				match result {
+					// A miner sent back a seal for the current build: verify and
+					// import the sealed block.
+					Either::Left((Some(seal), _)) => {
+						if let Some(mining_build) = build.take() {
+							do_import_block(
+								seal,
+								mining_build,
+								&algorithm,
+								&mut block_import,
+								&mut justification_sync_link,
+							)
+							.await
+						}
+					},
+					// Otherwise a new block arrived (or a channel closed): the
+					// current build is stale, so fall through and re-propose.
+					_ => {},
+				}
+			}

 			if sync_oracle.is_major_syncing() {
 				debug!(target: "pow", "Skipping proposal due to sync.");
-				worker.lock().on_major_syncing();
 				continue
 			}
@@ -587,13 +606,6 @@ where
 				continue
 			}

-			if worker.lock().best_hash() == Some(best_hash) {
-				continue
-			}
-
-			// The worker is locked for the duration of the whole proposing period. Within this
-			// period, the mining target is outdated and useless anyway.
-
 			let difficulty = match algorithm.difficulty(best_hash) {
 				Ok(x) => x,
 				Err(err) => {
@@ -636,7 +648,7 @@ where
 				},
 			};

-			let mut inherent_digest = Digest::<Block::Hash>::default();
+			let mut inherent_digest = Digest::<B::Hash>::default();
 			if let Some(pre_runtime) = &pre_runtime {
 				inherent_digest.push(DigestItem::PreRuntime(POW_ENGINE_ID, pre_runtime.to_vec()));
 			}
@@ -672,7 +684,11 @@ where
 				},
 			};

-			let build = MiningBuild::<Block, Algorithm, C, _> {
+			let (sender, consumer) = tokio::sync::mpsc::channel(10);
+
+			seal_channel = Some(ReceiverStream::new(consumer));
+
+			let mining_build = MiningBuild::<B, A, C, _> {
 				metadata: MiningMetadata {
 					best_hash,
 					pre_hash: proposal.block.header().hash(),
@@ -682,11 +698,19 @@ where
 				proposal,
 			};

-			worker.lock().on_build(build);
+			let result =
+				producer.send(Some(MiningData { metadata: mining_build.metadata.clone(), sender }));
+
+			if result.is_err() {
+				// All receivers have been dropped, i.e. every mining thread has
+				// shut down, so terminate the authorship task as well.
+				return
+			}
+
+			build = Some(mining_build);
 		}
 	};

-	(worker_ret, task)
+	(mining_data_stream, task)
 }

 /// Find PoW pre-runtime.
@@ -722,3 +746,76 @@ fn fetch_seal<B: BlockT>(
 		_ => return Err(Error::<B>::HeaderUnsealed(hash).into()),
 	}
 }
+
+pub async fn do_import_block<B, C, A, L, P>(
+	seal: Seal,
+	build: MiningBuild<B, A, C, P>,
+	algorithm: &A,
+	block_import: &mut BoxBlockImport<B, sp_api::TransactionFor<C, B>>,
+	justification_sync_link: &mut L,
+) where
+	B: BlockT,
+	C: ProvideRuntimeApi<B> + BlockchainEvents<B> + 'static,
+	A: PowAlgorithm<B> + Clone,
+	A::Difficulty: Send + 'static,
+	L: sc_consensus::JustificationSyncLink<B>,
+{
+	match algorithm.verify(
+		&BlockId::Hash(build.metadata.best_hash),
+		&build.metadata.pre_hash,
+		build.metadata.pre_runtime.as_ref().map(|v| &v[..]),
+		&seal,
+		build.metadata.difficulty,
+	) {
+		Ok(true) => (),
+		Ok(false) => {
+			warn!(
+				target: "pow",
+				"Unable to import mined block: seal is invalid",
+			);
+			return
+		},
+		Err(err) => {
+			warn!(
+				target: "pow",
+				"Unable to import mined block: {:?}",
+				err,
+			);
+			return
+		},
+	}
+
+	let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
+	let (header, body) = build.proposal.block.deconstruct();
+
+	let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
+	import_block.post_digests.push(seal);
+	import_block.body = Some(body);
+	import_block.state_action =
+		StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));
+
+	let intermediate =
+		PowIntermediate::<A::Difficulty> { difficulty: Some(build.metadata.difficulty) };
+
+	import_block
+		.intermediates
+		.insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>);
+
+	let header = import_block.post_header();
+	match block_import.import_block(import_block, HashMap::default()).await {
+		Ok(res) => {
+			res.handle_justification(&header.hash(), *header.number(), justification_sync_link);
+
+			info!(
+				target: "pow",
+				"✅ Successfully mined block on top of: {}",
+				build.metadata.best_hash
+			);
+		},
+		Err(err) => {
+			warn!(
+				target: "pow",
+				"Unable to import mined block: {:?}",
+				err,
+			);
+		},
+	}
+}
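With this change a miner no longer calls `MiningWorker::submit`; it consumes the cloneable `MiningDataStream` returned by `start_mining_worker` and answers through the per-build `sender`. A hedged sketch of what a consumer could look like — `spawn_miners` and `solve` are hypothetical helpers, not part of this diff:

```rust
use futures::StreamExt;
use sc_consensus_pow::{MiningData, MiningDataStream, MiningMetadata};
use sp_consensus_pow::Seal;

/// Spawn `threads` miners that all share one `MiningDataStream`. `solve` is a
/// hypothetical stand-in for the CPU-intensive seal search.
pub fn spawn_miners<H, D, F>(stream: MiningDataStream<H, D>, threads: usize, solve: F)
where
    H: Clone + Send + Sync + Unpin + 'static,
    D: Clone + Send + Sync + Unpin + 'static,
    F: Fn(&MiningMetadata<H, D>) -> Option<Seal> + Clone + Send + 'static,
{
    for _ in 0..threads {
        // The stream is cloneable; every clone yields the same builds.
        let mut stream = stream.clone();
        let solve = solve.clone();
        tokio::spawn(async move {
            while let Some(MiningData { metadata, sender }) = stream.next().await {
                if let Some(seal) = solve(&metadata) {
                    // Fails only if the authorship task already moved on to a
                    // newer build; a stale seal can simply be dropped.
                    let _ = sender.send(seal).await;
                }
            }
        });
    }
}
```

Since the seal search is CPU-bound, a real miner would likely run it on dedicated threads (for example via `tokio::task::spawn_blocking`) rather than directly on the async executor as sketched here.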
diff --git a/client/consensus/pow/src/worker.rs b/client/consensus/pow/src/worker.rs
index c0ca16ccad3aa..b9914bd97d2e6 100644
--- a/client/consensus/pow/src/worker.rs
+++ b/client/consensus/pow/src/worker.rs
@@ -16,23 +16,15 @@
 // You should have received a copy of the GNU General Public License
 // along with this program. If not, see <https://www.gnu.org/licenses/>.

-use futures::{
-	prelude::*,
-	task::{Context, Poll},
-};
-use futures_timer::Delay;
-use log::*;
-use sc_client_api::ImportNotifications;
-use sc_consensus::{BlockImportParams, BoxBlockImport, StateAction, StorageChanges};
-use sp_consensus::{BlockOrigin, Proposal};
-use sp_runtime::{
-	generic::BlockId,
-	traits::{Block as BlockT, Header as HeaderT},
-	DigestItem,
-};
-use std::{borrow::Cow, collections::HashMap, pin::Pin, time::Duration};
-
-use crate::{PowAlgorithm, PowIntermediate, Seal, INTERMEDIATE_KEY, POW_ENGINE_ID};
+use futures::{ready, Stream, StreamExt};
+use sp_consensus::Proposal;
+use sp_runtime::traits::Block as BlockT;
+use std::pin::Pin;
+use std::task::{Context, Poll};
+use tokio_stream::wrappers::WatchStream;
+
+use crate::{PowAlgorithm, Seal};

 /// Mining metadata. This is the information needed to start an actual mining loop.
 #[derive(Clone, Eq, PartialEq)]
 pub struct MiningMetadata<H, D> {
@@ -47,187 +39,82 @@ pub struct MiningMetadata<H, D> {
 	pub difficulty: D,
 }

+#[derive(Clone)]
+pub struct MiningData<H, D> {
+	pub metadata: MiningMetadata<H, D>,
+	/// Sink used by a miner to send the seal back to the authorship task.
+	pub sender: tokio::sync::mpsc::Sender<Seal>,
+}
+
 /// A build of mining, containing the metadata and the block proposal.
-pub struct MiningBuild<
-	Block: BlockT,
-	Algorithm: PowAlgorithm<Block>,
-	C: sp_api::ProvideRuntimeApi<Block>,
-	Proof,
-> {
+pub struct MiningBuild<B: BlockT, A: PowAlgorithm<B>, C: sp_api::ProvideRuntimeApi<B>, P> {
 	/// Mining metadata.
-	pub metadata: MiningMetadata<Block::Hash, Algorithm::Difficulty>,
+	pub metadata: MiningMetadata<B::Hash, A::Difficulty>,
 	/// Mining proposal.
-	pub proposal: Proposal<Block, sp_api::TransactionFor<C, Block>, Proof>,
+	pub proposal: Proposal<B, sp_api::TransactionFor<C, B>, P>,
 }

-/// Mining worker that exposes structs to query the current mining build and submit mined blocks.
-pub struct MiningWorker<
-	Block: BlockT,
-	Algorithm: PowAlgorithm<Block>,
-	C: sp_api::ProvideRuntimeApi<Block>,
-	L: sc_consensus::JustificationSyncLink<Block>,
-	Proof,
-> {
-	pub(crate) build: Option<MiningBuild<Block, Algorithm, C, Proof>>,
-	pub(crate) algorithm: Algorithm,
-	pub(crate) block_import: BoxBlockImport<Block, sp_api::TransactionFor<C, Block>>,
-	pub(crate) justification_sync_link: L,
+type MetadataReceiver<H, D> = tokio::sync::watch::Receiver<Option<MiningData<H, D>>>;
+type MetadataProducer<H, D> = tokio::sync::watch::Sender<Option<MiningData<H, D>>>;
+
+// Unlike `watch::Receiver`, the `WatchStream` wrapper is `Unpin`.
+type MetadataWatchStream<H, D> = WatchStream<Option<MiningData<H, D>>>;
+
+/// A cloneable stream that yields [`MiningData`].
+/// Every instance of the stream will receive the same data, as the stream is
+/// implemented on top of an spmc channel; see [`tokio::sync::watch::Receiver`].
+pub struct MiningDataStream<H, D> {
+	consumer: MetadataReceiver<H, D>,
+	inner: MetadataWatchStream<H, D>,
+	version: usize,
 }

-impl<Block, Algorithm, C, L, Proof> MiningWorker<Block, Algorithm, C, L, Proof>
+impl<H, D> Clone for MiningDataStream<H, D>
 where
-	Block: BlockT,
-	C: sp_api::ProvideRuntimeApi<Block>,
-	Algorithm: PowAlgorithm<Block>,
-	Algorithm::Difficulty: 'static + Send,
-	L: sc_consensus::JustificationSyncLink<Block>,
-	sp_api::TransactionFor<C, Block>: Send + 'static,
+	H: 'static + Clone + Send + Sync + Unpin,
+	D: 'static + Clone + Send + Sync + Unpin,
 {
-	/// Get the current best hash. `None` if the worker has just started or the client is doing
-	/// major syncing.
-	pub fn best_hash(&self) -> Option<Block::Hash> {
-		self.build.as_ref().map(|b| b.metadata.best_hash)
-	}
-
-	pub(crate) fn on_major_syncing(&mut self) {
-		self.build = None;
-	}
+	fn clone(&self) -> Self {
+		let consumer = self.consumer.clone();
+		let inner = WatchStream::new(consumer.clone());

-	pub(crate) fn on_build(&mut self, build: MiningBuild<Block, Algorithm, C, Proof>) {
-		self.build = Some(build);
-	}
-
-	/// Get a copy of the current mining metadata, if available.
-	pub fn metadata(&self) -> Option<MiningMetadata<Block::Hash, Algorithm::Difficulty>> {
-		self.build.as_ref().map(|b| b.metadata.clone())
-	}
+		// A fresh `WatchStream` yields the channel's current value first, so a
+		// cloned stream also starts at `version: 1` and skips the initial `None`.
+		Self { consumer, inner, version: 1 }
+	}
+}
-	/// Submit a mined seal. The seal will be validated again. Returns true if the submission is
-	/// successful.
-	pub async fn submit(&mut self, seal: Seal) -> bool {
-		if let Some(build) = self.build.take() {
-			match self.algorithm.verify(
-				&BlockId::Hash(build.metadata.best_hash),
-				&build.metadata.pre_hash,
-				build.metadata.pre_runtime.as_ref().map(|v| &v[..]),
-				&seal,
-				build.metadata.difficulty,
-			) {
-				Ok(true) => (),
-				Ok(false) => {
-					warn!(
-						target: "pow",
-						"Unable to import mined block: seal is invalid",
-					);
-					return false
-				},
-				Err(err) => {
-					warn!(
-						target: "pow",
-						"Unable to import mined block: {:?}",
-						err,
-					);
-					return false
-				},
-			}
-
-			let seal = DigestItem::Seal(POW_ENGINE_ID, seal);
-			let (header, body) = build.proposal.block.deconstruct();
-
-			let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
-			import_block.post_digests.push(seal);
-			import_block.body = Some(body);
-			import_block.state_action =
-				StateAction::ApplyChanges(StorageChanges::Changes(build.proposal.storage_changes));
-
-			let intermediate = PowIntermediate::<Algorithm::Difficulty> {
-				difficulty: Some(build.metadata.difficulty),
-			};
-
-			import_block
-				.intermediates
-				.insert(Cow::from(INTERMEDIATE_KEY), Box::new(intermediate) as Box<_>);
-
-			let header = import_block.post_header();
-			match self.block_import.import_block(import_block, HashMap::default()).await {
-				Ok(res) => {
-					res.handle_justification(
-						&header.hash(),
-						*header.number(),
-						&mut self.justification_sync_link,
-					);
-
-					info!(
-						target: "pow",
-						"✅ Successfully mined block on top of: {}",
-						build.metadata.best_hash
-					);
-					true
-				},
-				Err(err) => {
-					warn!(
-						target: "pow",
-						"Unable to import mined block: {:?}",
-						err,
-					);
-					false
-				},
-			}
-		} else {
-			warn!(
-				target: "pow",
-				"Unable to import mined block: build does not exist",
-			);
-			false
-		}
-	}
-}
-
-/// A stream that waits for a block import or timeout.
-pub struct UntilImportedOrTimeout<Block: BlockT> {
-	import_notifications: ImportNotifications<Block>,
-	timeout: Duration,
-	inner_delay: Option<Delay>,
-}
+impl<H, D> MiningDataStream<H, D>
+where
+	H: 'static + Clone + Send + Sync + Unpin,
+	D: 'static + Clone + Send + Sync + Unpin,
+{

-impl<Block: BlockT> UntilImportedOrTimeout<Block> {
-	/// Create a new stream using the given import notification and timeout duration.
-	pub fn new(import_notifications: ImportNotifications<Block>, timeout: Duration) -> Self {
-		Self { import_notifications, timeout, inner_delay: None }
+	pub fn new() -> (MetadataProducer<H, D>, Self) {
+		// The channel starts out holding `None`: no build exists until the
+		// authorship task produces the first proposal.
+		let (producer, consumer): (MetadataProducer<H, D>, MetadataReceiver<H, D>) =
+			tokio::sync::watch::channel(None);
+
+		let inner = WatchStream::new(consumer.clone());
+
+		(producer, Self { consumer, inner, version: 1 })
 	}
 }

-impl<Block: BlockT> Stream for UntilImportedOrTimeout<Block> {
-	type Item = ();
-
-	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<()>> {
-		let mut fire = false;
-
-		loop {
-			match Stream::poll_next(Pin::new(&mut self.import_notifications), cx) {
-				Poll::Pending => break,
-				Poll::Ready(Some(_)) => {
-					fire = true;
-				},
-				Poll::Ready(None) => return Poll::Ready(None),
-			}
-		}
-
-		let timeout = self.timeout.clone();
-		let inner_delay = self.inner_delay.get_or_insert_with(|| Delay::new(timeout));
-
-		match Future::poll(Pin::new(inner_delay), cx) {
-			Poll::Pending => (),
-			Poll::Ready(()) => {
-				fire = true;
-			},
-		}
-
-		if fire {
-			self.inner_delay = None;
-			Poll::Ready(Some(()))
-		} else {
-			Poll::Pending
-		}
+impl<H, D> Stream for MiningDataStream<H, D>
+where
+	H: 'static + Clone + Send + Sync + Unpin,
+	D: 'static + Clone + Send + Sync + Unpin,
+{
+	type Item = MiningData<H, D>;
+
+	fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
+		loop {
+			match ready!(self.inner.poll_next_unpin(cx)) {
+				Some(Some(item)) => return Poll::Ready(Some(item)),
+				Some(None) => {
+					// The first yield is the channel's initial `None` placeholder.
+					// Skip it by re-polling (which also keeps the waker registered)
+					// instead of treating it as end-of-stream.
+					if self.version == 1 {
+						self.version += 1;
+						continue
+					}
+					return Poll::Ready(None)
+				},
+				None => return Poll::Ready(None),
+			}
+		}
 	}
 }
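Two behaviours of `tokio::sync::watch` drive the design above: every clone of the receiver is an independent cursor that observes only the channel's latest value, and a fresh `WatchStream` first yields whatever the channel currently holds — which is why the channel is created holding `None` and `poll_next` swallows exactly one `None` via the `version` field. A standalone illustration (values and types are illustrative, not part of this diff):

```rust
use tokio_stream::{wrappers::WatchStream, StreamExt};

#[tokio::main]
async fn main() {
    // The channel starts out holding `None`: no build exists yet.
    let (tx, rx) = tokio::sync::watch::channel(None::<u32>);

    // Clones are independent cursors over the same channel.
    let mut a = WatchStream::new(rx.clone());
    let mut b = WatchStream::new(rx);

    // A fresh `WatchStream` yields the current value first -- the initial
    // `None` placeholder that `MiningDataStream::poll_next` skips.
    assert_eq!(a.next().await, Some(None));

    tx.send(Some(1)).unwrap();
    tx.send(Some(2)).unwrap();

    // Only the latest value is observed; intermediate builds are skipped
    // entirely, which is exactly what a miner wants.
    assert_eq!(a.next().await, Some(Some(2)));
    assert_eq!(b.next().await, Some(Some(2)));
}
```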