Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ pub fn new_full(config: Configuration) -> Result<impl AbstractService, ServiceEr
let provider = client as Arc<dyn StorageAndProofProvider<_, _>>;
Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, provider)) as _)
})?
.build()?;
.build_full()?;

if role.is_authority() {
let proposer = sc_basic_authorship::ProposerFactory::new(
Expand Down Expand Up @@ -264,5 +264,5 @@ pub fn new_light(config: Configuration) -> Result<impl AbstractService, ServiceE
let provider = client as Arc<dyn StorageAndProofProvider<_, _>>;
Ok(Arc::new(GrandpaFinalityProofProvider::new(backend, provider)) as _)
})?
.build()
.build_light()
}
4 changes: 2 additions & 2 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ macro_rules! new_full {
let provider = client as Arc<dyn grandpa::StorageAndProofProvider<_, _>>;
Ok(Arc::new(grandpa::FinalityProofProvider::new(backend, provider)) as _)
})?
.build()?;
.build_full()?;

let (block_import, grandpa_link, babe_link) = import_setup.take()
.expect("Link Half and Block Import are present for Full Services or setup failed before. qed");
Expand Down Expand Up @@ -405,7 +405,7 @@ pub fn new_light(config: Configuration)

Ok(node_rpc::create_light(light_deps))
})?
.build()?;
.build_light()?;

Ok(service)
}
Expand Down
87 changes: 80 additions & 7 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ use std::{
};
use wasm_timer::SystemTime;
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
use sp_transaction_pool::MaintainedTransactionPool;
use sp_transaction_pool::{LocalTransactionPool, MaintainedTransactionPool};
use prometheus_endpoint::Registry;
use sc_client_db::{Backend, DatabaseSettings};
use sp_core::traits::CodeExecutor;
Expand Down Expand Up @@ -959,8 +959,7 @@ ServiceBuilder<
Ok(self)
}

/// Builds the service.
pub fn build(self) -> Result<Service<
fn build_common(self) -> Result<Service<
TBl,
Client<TBackend, TExec, TBl, TRtApi>,
TSc,
Expand Down Expand Up @@ -1016,10 +1015,6 @@ ServiceBuilder<
"best" => ?chain_info.best_hash
);

// make transaction pool available for off-chain runtime calls.
client.execution_extensions()
.register_transaction_pool(Arc::downgrade(&transaction_pool) as _);

let transaction_pool_adapter = Arc::new(TransactionPoolAdapter {
imports_external_transactions: !matches!(config.role, Role::Light),
pool: transaction_pool.clone(),
Expand Down Expand Up @@ -1421,4 +1416,82 @@ ServiceBuilder<
_base_path: config.base_path.map(Arc::new),
})
}

/// Builds the light service.
pub fn build_light(self) -> Result<Service<
TBl,
Client<TBackend, TExec, TBl, TRtApi>,
TSc,
NetworkStatus<TBl>,
NetworkService<TBl, <TBl as BlockT>::Hash>,
TExPool,
sc_offchain::OffchainWorkers<
Client<TBackend, TExec, TBl, TRtApi>,
TBackend::OffchainStorage,
TBl
>,
>, Error>
where TExec: CallExecutor<TBl, Backend = TBackend>,
{
self.build_common()
}
}

impl<TBl, TRtApi, TBackend, TExec, TSc, TImpQu, TExPool, TRpc>
ServiceBuilder<
TBl,
TRtApi,
Client<TBackend, TExec, TBl, TRtApi>,
Arc<OnDemand<TBl>>,
TSc,
TImpQu,
BoxFinalityProofRequestBuilder<TBl>,
Arc<dyn FinalityProofProvider<TBl>>,
TExPool,
TRpc,
TBackend,
> where
Client<TBackend, TExec, TBl, TRtApi>: ProvideRuntimeApi<TBl>,
<Client<TBackend, TExec, TBl, TRtApi> as ProvideRuntimeApi<TBl>>::Api:
sp_api::Metadata<TBl> +
sc_offchain::OffchainWorkerApi<TBl> +
sp_transaction_pool::runtime_api::TaggedTransactionQueue<TBl> +
sp_session::SessionKeys<TBl> +
sp_api::ApiErrorExt<Error = sp_blockchain::Error> +
sp_api::ApiExt<TBl, StateBackend = TBackend::State>,
TBl: BlockT,
TRtApi: 'static + Send + Sync,
TBackend: 'static + sc_client_api::backend::Backend<TBl> + Send,
TExec: 'static + CallExecutor<TBl> + Send + Sync + Clone,
TSc: Clone,
TImpQu: 'static + ImportQueue<TBl>,
TExPool: MaintainedTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> +
LocalTransactionPool<Block = TBl, Hash = <TBl as BlockT>::Hash> +
MallocSizeOfWasm +
'static,
TRpc: sc_rpc::RpcExtension<sc_rpc::Metadata>,
{

/// Builds the full service.
pub fn build_full(self) -> Result<Service<
TBl,
Client<TBackend, TExec, TBl, TRtApi>,
TSc,
NetworkStatus<TBl>,
NetworkService<TBl, <TBl as BlockT>::Hash>,
TExPool,
sc_offchain::OffchainWorkers<
Client<TBackend, TExec, TBl, TRtApi>,
TBackend::OffchainStorage,
TBl
>,
>, Error>
where TExec: CallExecutor<TBl, Backend = TBackend>,
{
// make transaction pool available for off-chain runtime calls.
self.client.execution_extensions()
.register_transaction_pool(Arc::downgrade(&self.transaction_pool) as _);

self.build_common()
}
}
88 changes: 65 additions & 23 deletions client/transaction-pool/src/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,29 +87,15 @@ where
let client = self.client.clone();
let at = at.clone();

self.pool.spawn_ok(futures_diagnose::diagnose("validate-transaction", async move {
sp_tracing::enter_span!("validate_transaction");
let runtime_api = client.runtime_api();
let has_v2 = sp_tracing::tracing_span! { "check_version";
runtime_api
.has_api_with::<dyn TaggedTransactionQueue<Self::Block, Error=()>, _>(
&at, |v| v >= 2,
)
.unwrap_or_default()
};

sp_tracing::enter_span!("runtime::validate_transaction");
let res = if has_v2 {
runtime_api.validate_transaction(&at, source, uxt)
} else {
#[allow(deprecated)] // old validate_transaction
runtime_api.validate_transaction_before_version_2(&at, uxt)
};
let res = res.map_err(|e| Error::RuntimeApi(e.to_string()));
if let Err(e) = tx.send(res) {
log::warn!("Unable to send a validate transaction result: {:?}", e);
}
}));
self.pool.spawn_ok(futures_diagnose::diagnose(
"validate-transaction",
async move {
let res = validate_transaction_blocking(&*client, &at, source, uxt);
if let Err(e) = tx.send(res) {
log::warn!("Unable to send a validate transaction result: {:?}", e);
}
},
));

Box::pin(async move {
match rx.await {
Expand Down Expand Up @@ -143,6 +129,62 @@ where
}
}

/// Helper function to validate a transaction using a full chain API.
/// This method will call into the runtime to perform the validation.
fn validate_transaction_blocking<Client, Block>(
client: &Client,
at: &BlockId<Block>,
source: TransactionSource,
uxt: sc_transaction_graph::ExtrinsicFor<FullChainApi<Client, Block>>,
) -> error::Result<TransactionValidity>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
sp_api::ApiErrorFor<Client, Block>: Send + std::fmt::Display,
{
sp_tracing::enter_span!("validate_transaction");
let runtime_api = client.runtime_api();
let has_v2 = sp_tracing::tracing_span! { "check_version";
runtime_api
.has_api_with::<dyn TaggedTransactionQueue<Block, Error=()>, _>(&at, |v| v >= 2)
.unwrap_or_default()
};

sp_tracing::enter_span!("runtime::validate_transaction");
let res = if has_v2 {
runtime_api.validate_transaction(&at, source, uxt)
} else {
#[allow(deprecated)] // old validate_transaction
runtime_api.validate_transaction_before_version_2(&at, uxt)
};

res.map_err(|e| Error::RuntimeApi(e.to_string()))
}

impl<Client, Block> FullChainApi<Client, Block>
where
Block: BlockT,
Client: ProvideRuntimeApi<Block> + BlockBackend<Block> + BlockIdTo<Block>,
Client: Send + Sync + 'static,
Client::Api: TaggedTransactionQueue<Block>,
sp_api::ApiErrorFor<Client, Block>: Send + std::fmt::Display,
{
/// Validates a transaction by calling into the runtime, same as
/// `validate_transaction` but blocks the current thread when performing
/// validation. Only implemented for `FullChainApi` since we can call into
/// the runtime locally.
pub fn validate_transaction_blocking(
&self,
at: &BlockId<Block>,
source: TransactionSource,
uxt: sc_transaction_graph::ExtrinsicFor<Self>,
) -> error::Result<TransactionValidity> {
validate_transaction_blocking(&*self.client, at, source, uxt)
}
}

/// The transaction pool logic for light client.
pub struct LightChainApi<Client, F, Block> {
client: Arc<Client>,
Expand Down
53 changes: 53 additions & 0 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,59 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
}
}

impl<Block, Client> sp_transaction_pool::LocalTransactionPool
for BasicPool<FullChainApi<Client, Block>, Block>
where
Block: BlockT,
Client: sp_api::ProvideRuntimeApi<Block>
+ sc_client_api::BlockBackend<Block>
+ sp_runtime::traits::BlockIdTo<Block>,
Client: Send + Sync + 'static,
Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
sp_api::ApiErrorFor<Client, Block>: Send + std::fmt::Display,
{
type Block = Block;
type Hash = sc_transaction_graph::ExtrinsicHash<FullChainApi<Client, Block>>;
type Error = <FullChainApi<Client, Block> as ChainApi>::Error;

fn submit_local(
&self,
at: &BlockId<Self::Block>,
xt: sp_transaction_pool::LocalTransactionFor<Self>,
) -> Result<Self::Hash, Self::Error> {
use sc_transaction_graph::ValidatedTransaction;
use sp_runtime::traits::SaturatedConversion;
use sp_runtime::transaction_validity::TransactionValidityError;

let validity = self
.api
.validate_transaction_blocking(at, TransactionSource::Local, xt.clone())?
.map_err(|e| {
Self::Error::Pool(match e {
TransactionValidityError::Invalid(i) => i.into(),
TransactionValidityError::Unknown(u) => u.into(),
})
})?;

let (hash, bytes) = self.pool.validated_pool().api().hash_and_length(&xt);
let block_number = self
.api
.block_id_to_number(at)?
.ok_or_else(|| error::Error::BlockIdConversion(format!("{:?}", at)))?;

let validated = ValidatedTransaction::valid_at(
block_number.saturated_into::<u64>(),
hash.clone(),
TransactionSource::Local,
xt,
bytes,
validity,
);

self.pool.validated_pool().submit(vec![validated]).remove(0)
}
}

#[cfg_attr(test, derive(Debug))]
enum RevalidationStatus<N> {
/// The revalidation has never been completed.
Expand Down
37 changes: 30 additions & 7 deletions primitives/transaction-pool/src/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ pub type BlockHash<P> = <<P as TransactionPool>::Block as BlockT>::Hash;
pub type TransactionFor<P> = <<P as TransactionPool>::Block as BlockT>::Extrinsic;
/// Type of transactions event stream for a pool.
pub type TransactionStatusStreamFor<P> = TransactionStatusStream<TxHash<P>, BlockHash<P>>;
/// Transaction type for a local pool.
pub type LocalTransactionFor<P> = <<P as LocalTransactionPool>::Block as BlockT>::Extrinsic;

/// Typical future type used in transaction pool api.
pub type PoolFuture<T, E> = std::pin::Pin<Box<dyn Future<Output=Result<T, E>> + Send>>;
Expand Down Expand Up @@ -273,6 +275,28 @@ pub trait MaintainedTransactionPool: TransactionPool {
fn maintain(&self, event: ChainEvent<Self::Block>) -> Pin<Box<dyn Future<Output=()> + Send>>;
}

/// Transaction pool interface for submitting local transactions that exposes a
/// blocking interface for submission.
pub trait LocalTransactionPool: Send + Sync {
/// Block type.
type Block: BlockT;
/// Transaction hash type.
type Hash: Hash + Eq + Member + Serialize;
/// Error type.
type Error: From<crate::error::Error> + crate::error::IntoPoolError;

/// Submits the given local unverified transaction to the pool blocking the
/// current thread for any necessary pre-verification.
/// NOTE: It MUST NOT be used for transactions that originate from the
/// network or RPC, since the validation is performed with
/// `TransactionSource::Local`.
fn submit_local(
&self,
at: &BlockId<Self::Block>,
xt: LocalTransactionFor<Self>,
) -> Result<Self::Hash, Self::Error>;
}

/// An abstraction for transaction pool.
///
/// This trait is used by offchain calls to be able to submit transactions.
Expand All @@ -291,7 +315,7 @@ pub trait OffchainSubmitTransaction<Block: BlockT>: Send + Sync {
) -> Result<(), ()>;
}

impl<TPool: TransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
impl<TPool: LocalTransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
fn submit_at(
&self,
at: &BlockId<TPool::Block>,
Expand All @@ -303,15 +327,14 @@ impl<TPool: TransactionPool> OffchainSubmitTransaction<TPool::Block> for TPool {
extrinsic
);

let result = futures::executor::block_on(self.submit_one(
&at, TransactionSource::Local, extrinsic,
));
let result = self.submit_local(&at, extrinsic);

result.map(|_| ())
.map_err(|e| log::warn!(
result.map(|_| ()).map_err(|e| {
log::warn!(
target: "txpool",
"(offchain call) Error submitting a transaction to the pool: {:?}",
e
))
)
})
}
}