Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions bin/node-template/node/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,9 @@ macro_rules! new_full_start {
.with_select_chain(|_config, backend| {
Ok(sc_client::LongestChain::new(backend.clone()))
})?
.with_transaction_pool(|config, client, _fetcher| {
.with_transaction_pool(|config, client, _fetcher, prometheus_registry| {
Copy link
Contributor

@tomaka tomaka Apr 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I know this is the most straight-forward solution for this PR, but ugh this API. It doesn't make sense at all.
We should really remove these closures and simply have prometheus_registry (and all the other components) be a local variable.
We can't just continue adding parameters to these closures (which is a breaking change every single time) whenever we need something that is in the builder.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not requesting any change on this PR in particular, but I thought I'd comment on that in general.

Copy link
Contributor Author

@NikVolf NikVolf Apr 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have you seen this PR?
#5557

let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api)))
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
let select_chain = select_chain.take()
Expand Down Expand Up @@ -183,13 +183,13 @@ pub fn new_light(config: Configuration)
.with_select_chain(|_config, backend| {
Ok(LongestChain::new(backend.clone()))
})?
.with_transaction_pool(|config, client, fetcher| {
.with_transaction_pool(|config, client, fetcher, prometheus_registry| {
let fetcher = fetcher
.ok_or_else(|| "Trying to start light transaction pool without active fetcher")?;

let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone());
let pool = sc_transaction_pool::BasicPool::with_revalidation_type(
config, Arc::new(pool_api), sc_transaction_pool::RevalidationType::Light,
config, Arc::new(pool_api), prometheus_registry, sc_transaction_pool::RevalidationType::Light,
);
Ok(pool)
})?
Expand Down
8 changes: 4 additions & 4 deletions bin/node/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,9 @@ macro_rules! new_full_start {
.with_select_chain(|_config, backend| {
Ok(sc_client::LongestChain::new(backend.clone()))
})?
.with_transaction_pool(|config, client, _fetcher| {
.with_transaction_pool(|config, client, _fetcher, prometheus_registry| {
let pool_api = sc_transaction_pool::FullChainApi::new(client.clone());
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api)))
Ok(sc_transaction_pool::BasicPool::new(config, std::sync::Arc::new(pool_api), prometheus_registry))
})?
.with_import_queue(|_config, client, mut select_chain, _transaction_pool| {
let select_chain = select_chain.take()
Expand Down Expand Up @@ -312,12 +312,12 @@ pub fn new_light(config: Configuration)
.with_select_chain(|_config, backend| {
Ok(LongestChain::new(backend.clone()))
})?
.with_transaction_pool(|config, client, fetcher| {
.with_transaction_pool(|config, client, fetcher, prometheus_registry| {
let fetcher = fetcher
.ok_or_else(|| "Trying to start light transaction pool without active fetcher")?;
let pool_api = sc_transaction_pool::LightChainApi::new(client.clone(), fetcher.clone());
let pool = sc_transaction_pool::BasicPool::with_revalidation_type(
config, Arc::new(pool_api), sc_transaction_pool::RevalidationType::Light,
config, Arc::new(pool_api), prometheus_registry, sc_transaction_pool::RevalidationType::Light,
);
Ok(pool)
})?
Expand Down
25 changes: 21 additions & 4 deletions client/basic-authorship/src/basic_authorship.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,11 @@ mod tests {
// given
let client = Arc::new(substrate_test_runtime_client::new());
let txpool = Arc::new(
BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0
);

futures::executor::block_on(
Expand Down Expand Up @@ -408,7 +412,11 @@ mod tests {
fn should_not_panic_when_deadline_is_reached() {
let client = Arc::new(substrate_test_runtime_client::new());
let txpool = Arc::new(
BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0
);

let mut proposer_factory = ProposerFactory::new(client.clone(), txpool.clone());
Expand Down Expand Up @@ -440,8 +448,13 @@ mod tests {
.build_with_backend();
let client = Arc::new(client);
let txpool = Arc::new(
BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0
);

let genesis_hash = client.info().best_hash;
let block_id = BlockId::Hash(genesis_hash);

Expand Down Expand Up @@ -493,7 +506,11 @@ mod tests {
// given
let mut client = Arc::new(substrate_test_runtime_client::new());
let txpool = Arc::new(
BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0
BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0
);

futures::executor::block_on(
Expand Down
2 changes: 1 addition & 1 deletion client/basic-authorship/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
//! # use substrate_test_runtime_client::{self, runtime::{Extrinsic, Transfer}, AccountKeyring};
//! # use sc_transaction_pool::{BasicPool, FullChainApi};
//! # let client = Arc::new(substrate_test_runtime_client::new());
//! # let txpool = Arc::new(BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone()))).0);
//! # let txpool = Arc::new(BasicPool::new(Default::default(), Arc::new(FullChainApi::new(client.clone())), None).0);
//! // The first step is to create a `ProposerFactory`.
//! let mut proposer_factory = ProposerFactory::new(client.clone(), txpool.clone());
//!
Expand Down
6 changes: 3 additions & 3 deletions client/consensus/manual-seal/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ mod tests {
let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(client);
let inherent_data_providers = InherentDataProviders::new();
let pool = Arc::new(BasicPool::new(Options::default(), api()).0);
let pool = Arc::new(BasicPool::new(Options::default(), api(), None).0);
let env = ProposerFactory::new(
client.clone(),
pool.clone()
Expand Down Expand Up @@ -281,7 +281,7 @@ mod tests {
let (client, select_chain) = builder.build_with_longest_chain();
let client = Arc::new(client);
let inherent_data_providers = InherentDataProviders::new();
let pool = Arc::new(BasicPool::new(Options::default(), api()).0);
let pool = Arc::new(BasicPool::new(Options::default(), api(), None).0);
let env = ProposerFactory::new(
client.clone(),
pool.clone()
Expand Down Expand Up @@ -349,7 +349,7 @@ mod tests {
let client = Arc::new(client);
let inherent_data_providers = InherentDataProviders::new();
let pool_api = api();
let pool = Arc::new(BasicPool::new(Options::default(), pool_api.clone()).0);
let pool = Arc::new(BasicPool::new(Options::default(), pool_api.clone(), None).0);
let env = ProposerFactory::new(
client.clone(),
pool.clone(),
Expand Down
1 change: 1 addition & 0 deletions client/offchain/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ mod tests {
let pool = Arc::new(TestPool(BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0));
client.execution_extensions()
.register_transaction_pool(Arc::downgrade(&pool.clone()) as _);
Expand Down
1 change: 1 addition & 0 deletions client/rpc/src/author/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ impl Default for TestSetup {
let pool = Arc::new(BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0);
TestSetup {
runtime: runtime::Runtime::new().expect("Failed to create runtime in test setup"),
Expand Down
3 changes: 3 additions & 0 deletions client/service/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ use wasm_timer::SystemTime;
use sc_telemetry::{telemetry, SUBSTRATE_INFO};
use sp_transaction_pool::{MaintainedTransactionPool, ChainEvent};
use sp_blockchain;
use prometheus_endpoint::Registry as PrometheusRegistry;

pub type BackgroundTask = Pin<Box<dyn Future<Output=()> + Send>>;

Expand Down Expand Up @@ -585,6 +586,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
sc_transaction_pool::txpool::Options,
Arc<TCl>,
Option<TFchr>,
Option<&PrometheusRegistry>,
) -> Result<(UExPool, Option<BackgroundTask>), Error>
) -> Result<ServiceBuilder<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp,
UExPool, TRpc, Backend>, Error>
Expand All @@ -593,6 +595,7 @@ impl<TBl, TRtApi, TCl, TFchr, TSc, TImpQu, TFprb, TFpp, TExPool, TRpc, Backend>
self.config.transaction_pool.clone(),
self.client.clone(),
self.fetcher.clone(),
self.config.prometheus_config.as_ref().map(|config| &config.registry),
)?;

if let Some(background_task) = background_task{
Expand Down
1 change: 1 addition & 0 deletions client/service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,6 +696,7 @@ mod tests {
let pool = Arc::new(BasicPool::new(
Default::default(),
Arc::new(FullChainApi::new(client.clone())),
None,
).0);
let source = sp_runtime::transaction_validity::TransactionSource::External;
let best = longest_chain.best_chain().unwrap();
Expand Down
1 change: 1 addition & 0 deletions client/transaction-pool/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ intervalier = "0.4.0"
log = "0.4.8"
parity-util-mem = { version = "0.6.1", default-features = false, features = ["primitive-types"] }
parking_lot = "0.10.0"
prometheus-endpoint = { package = "substrate-prometheus-endpoint", path = "../../utils/prometheus", version = "0.8.0-dev"}
sc-client-api = { version = "2.0.0-dev", path = "../api" }
sc-transaction-graph = { version = "2.0.0-dev", path = "./graph" }
sp-api = { version = "2.0.0-dev", path = "../../primitives/api" }
Expand Down
45 changes: 39 additions & 6 deletions client/transaction-pool/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,10 @@
#![warn(unused_extern_crates)]

mod api;
pub mod error;
mod revalidation;
mod metrics;

pub mod error;

#[cfg(any(feature = "test-helpers", test))]
pub mod testing;
Expand All @@ -45,6 +47,9 @@ use sp_transaction_pool::{
};
use wasm_timer::Instant;

use prometheus_endpoint::Registry as PrometheusRegistry;
use crate::metrics::MetricsLink as PrometheusMetrics;

type BoxedReadyIterator<Hash, Data> = Box<dyn Iterator<Item=Arc<sc_transaction_graph::base_pool::Transaction<Hash, Data>>> + Send>;

type ReadyIteratorFor<PoolApi> = BoxedReadyIterator<sc_transaction_graph::ExHash<PoolApi>, sc_transaction_graph::ExtrinsicFor<PoolApi>>;
Expand All @@ -62,6 +67,7 @@ pub struct BasicPool<PoolApi, Block>
revalidation_strategy: Arc<Mutex<RevalidationStrategy<NumberFor<Block>>>>,
revalidation_queue: Arc<revalidation::RevalidationQueue<PoolApi>>,
ready_poll: Arc<Mutex<ReadyPoll<ReadyIteratorFor<PoolApi>, Block>>>,
metrics: PrometheusMetrics,
}

struct ReadyPoll<T, Block: BlockT> {
Expand Down Expand Up @@ -147,8 +153,9 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
pub fn new(
options: sc_transaction_graph::Options,
pool_api: Arc<PoolApi>,
prometheus: Option<&PrometheusRegistry>,
) -> (Self, Option<Pin<Box<dyn Future<Output=()> + Send>>>) {
Self::with_revalidation_type(options, pool_api, RevalidationType::Full)
Self::with_revalidation_type(options, pool_api, prometheus, RevalidationType::Full)
}

/// Create new basic transaction pool with provided api, for tests.
Expand All @@ -166,6 +173,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
revalidation_queue: Arc::new(revalidation_queue),
revalidation_strategy: Arc::new(Mutex::new(RevalidationStrategy::Always)),
ready_poll: Default::default(),
metrics: Default::default(),
},
background_task,
notifier,
Expand All @@ -177,6 +185,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
pub fn with_revalidation_type(
options: sc_transaction_graph::Options,
pool_api: Arc<PoolApi>,
prometheus: Option<&PrometheusRegistry>,
revalidation_type: RevalidationType,
) -> (Self, Option<Pin<Box<dyn Future<Output=()> + Send>>>) {
let pool = Arc::new(sc_transaction_graph::Pool::new(options, pool_api.clone()));
Expand All @@ -187,6 +196,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
(queue, Some(background))
},
};

(
BasicPool {
api: pool_api,
Expand All @@ -199,6 +209,7 @@ impl<PoolApi, Block> BasicPool<PoolApi, Block>
}
)),
ready_poll: Default::default(),
metrics: PrometheusMetrics::new(prometheus),
},
background_task,
)
Expand Down Expand Up @@ -228,8 +239,15 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
) -> PoolFuture<Vec<Result<TxHash<Self>, Self::Error>>, Self::Error> {
let pool = self.pool.clone();
let at = *at;

self.metrics.report(|metrics| metrics.validations_scheduled.inc_by(xts.len() as u64));

let metrics = self.metrics.clone();
async move {
pool.submit_at(&at, source, xts, false).await
let tx_count = xts.len();
let res = pool.submit_at(&at, source, xts, false).await;
metrics.report(|metrics| metrics.validations_finished.inc_by(tx_count as u64));
res
}.boxed()
}

Expand All @@ -241,8 +259,16 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
) -> PoolFuture<TxHash<Self>, Self::Error> {
let pool = self.pool.clone();
let at = *at;

self.metrics.report(|metrics| metrics.validations_scheduled.inc());

let metrics = self.metrics.clone();
async move {
pool.submit_one(&at, source, xt).await
let res = pool.submit_one(&at, source, xt).await;

metrics.report(|metrics| metrics.validations_finished.inc());
res

}.boxed()
}

Expand All @@ -255,10 +281,17 @@ impl<PoolApi, Block> TransactionPool for BasicPool<PoolApi, Block>
let at = *at;
let pool = self.pool.clone();

self.metrics.report(|metrics| metrics.validations_scheduled.inc());

let metrics = self.metrics.clone();
async move {
pool.submit_and_watch(&at, source, xt)
let result = pool.submit_and_watch(&at, source, xt)
.map(|result| result.map(|watcher| Box::new(watcher.into_stream()) as _))
.await
.await;

metrics.report(|metrics| metrics.validations_finished.inc());

result
}.boxed()
}

Expand Down
Loading