Skip to content
This repository was archived by the owner on Nov 15, 2023. It is now read-only.

Commit 2182cd3

Browse files
committed
revalidation queue updates
1 parent bad1280 commit 2182cd3

File tree

3 files changed

+104
-20
lines changed

3 files changed

+104
-20
lines changed

client/transaction-pool/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
1919
#![warn(missing_docs)]
2020
#![warn(unused_extern_crates)]
21+
#![recursion_limit="256"]
2122

2223
mod api;
2324
pub mod error;
@@ -345,6 +346,8 @@ impl<PoolApi, Block> MaintainedTransactionPool for BasicPool<PoolApi, Block>
345346
if let Err(e) = pool.prune_known(&id, &hashes) {
346347
log::error!("Cannot prune known in the pool {:?}!", e);
347348
}
349+
350+
revalidation_queue.notify_pruned(hashes);
348351
}
349352

350353
if next_action.resubmit {

client/transaction-pool/src/revalidation.rs

Lines changed: 80 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,15 @@ pub const BACKGROUND_REVALIDATION_INTERVAL: Duration = Duration::from_millis(5);
3434

3535
const BACKGROUND_REVALIDATION_BATCH_SIZE: usize = 20;
3636

37-
/// Payload from queue to worker.
38-
struct WorkerPayload<Api: ChainApi> {
39-
at: NumberFor<Api>,
40-
transactions: Vec<ExHash<Api>>,
37+
/// Background worker notification.
38+
enum WorkerNotify<Api: ChainApi> {
39+
Add {
40+
at: NumberFor<Api>,
41+
transactions: Vec<ExHash<Api>>,
42+
},
43+
Remove {
44+
transactions: Vec<ExHash<Api>>,
45+
},
4146
}
4247

4348
/// Async revalidation worker.
@@ -89,6 +94,7 @@ async fn batch_revalidate<Api: ChainApi>(
8994
log::trace!(target: "txpool", "[{:?}]: Unknown during revalidation: {:?}", ext_hash, err);
9095
},
9196
Ok(Ok(validity)) => {
97+
log::debug!(target: "txpool", "[{:?}]: Valid during revalidation, will be resubmitted.", ext_hash);
9298
revalidated.insert(
9399
ext_hash.clone(),
94100
ValidatedTransaction::valid_at(
@@ -113,7 +119,10 @@ async fn batch_revalidate<Api: ChainApi>(
113119
}
114120

115121
pool.validated_pool().remove_invalid(&invalid_hashes);
116-
pool.resubmit(revalidated);
122+
123+
if revalidated.len() > 0 {
124+
pool.resubmit(revalidated);
125+
}
117126
}
118127

119128
impl<Api: ChainApi> RevalidationWorker<Api> {
@@ -149,6 +158,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
149158
} else {
150159
for xt in &to_queue {
151160
extrinsics.remove(xt);
161+
self.members.remove(xt);
152162
}
153163
}
154164
left -= to_queue.len();
@@ -163,14 +173,18 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
163173
queued_exts
164174
}
165175

166-
fn push(&mut self, worker_payload: WorkerPayload<Api>) {
167-
// we don't add something that already scheduled for revalidation
168-
let transactions = worker_payload.transactions;
169-
let block_number = worker_payload.at;
170-
176+
fn push(&mut self, block_number: NumberFor<Api>, transactions: Vec<ExHash<Api>>) {
171177
for ext_hash in transactions {
172178
// we don't add something that already scheduled for revalidation
173-
if self.members.contains_key(&ext_hash) { continue; }
179+
if self.members.contains_key(&ext_hash) {
180+
log::debug!(
181+
target: "txpool",
182+
"[{:?}] Skipped adding for revalidation: Already there.",
183+
ext_hash,
184+
);
185+
186+
continue;
187+
}
174188

175189
self.block_ordered.entry(block_number)
176190
.and_modify(|value| { value.insert(ext_hash.clone()); })
@@ -183,12 +197,26 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
183197
}
184198
}
185199

200+
fn remove(&mut self, transactions: Vec<ExHash<Api>>) {
201+
for ext_hash in transactions {
202+
if let Some(block_number) = self.members.remove(&ext_hash) {
203+
if let Some(block_record) = self.block_ordered.get_mut(&block_number) {
204+
block_record.remove(&ext_hash);
205+
}
206+
}
207+
}
208+
}
209+
210+
fn len(&self) -> usize {
211+
self.block_ordered.iter().map(|b| b.1.len()).sum()
212+
}
213+
186214
/// Background worker main loop.
187215
///
188216
/// It does two things: periodically tries to process some transactions
189217
/// from the queue and also accepts messages to enqueue some more
190218
/// transactions from the pool.
191-
pub async fn run(mut self, from_queue: mpsc::UnboundedReceiver<WorkerPayload<Api>>) {
219+
pub async fn run(mut self, from_queue: mpsc::UnboundedReceiver<WorkerNotify<Api>>) {
192220
let interval = interval(BACKGROUND_REVALIDATION_INTERVAL).fuse();
193221
let from_queue = from_queue.fuse();
194222
futures::pin_mut!(interval, from_queue);
@@ -198,13 +226,29 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
198226
futures::select! {
199227
_ = interval.next() => {
200228
let next_batch = this.prepare_batch();
229+
let batch_len = next_batch.len();
201230
batch_revalidate(this.pool.clone(), this.api.clone(), this.best_block, next_batch).await;
231+
if batch_len > 0 || this.len() > 0 {
232+
log::debug!(
233+
target: "txpool",
234+
"Revalidated {} transactions. Left in the queue for revalidation: {}.",
235+
batch_len,
236+
this.len(),
237+
);
238+
}
202239
},
203-
workload = from_queue.next() => {
204-
match workload {
205-
Some(worker_payload) => {
206-
this.best_block = worker_payload.at;
207-
this.push(worker_payload);
240+
notification = from_queue.next() => {
241+
match notification {
242+
Some(notification) => {
243+
match notification {
244+
WorkerNotify::Add { transactions, at } => {
245+
this.best_block = at;
246+
this.push(at, transactions);
247+
},
248+
WorkerNotify::Remove { transactions } => {
249+
this.remove(transactions);
250+
},
251+
}
208252
continue;
209253
},
210254
// R.I.P. worker!
@@ -224,7 +268,7 @@ impl<Api: ChainApi> RevalidationWorker<Api> {
224268
pub struct RevalidationQueue<Api: ChainApi> {
225269
pool: Arc<Pool<Api>>,
226270
api: Arc<Api>,
227-
background: Option<mpsc::UnboundedSender<WorkerPayload<Api>>>,
271+
background: Option<mpsc::UnboundedSender<WorkerNotify<Api>>>,
228272
}
229273

230274
impl<Api: ChainApi> RevalidationQueue<Api>
@@ -264,8 +308,11 @@ where
264308
/// If queue configured without background worker, this will resolve after
265309
/// revalidation is actually done.
266310
pub async fn revalidate_later(&self, at: NumberFor<Api>, transactions: Vec<ExHash<Api>>) {
311+
if transactions.len() > 0 {
312+
log::debug!(target: "txpool", "Added {} transactions to revalidation queue", transactions.len());
313+
}
267314
if let Some(ref to_worker) = self.background {
268-
if let Err(e) = to_worker.unbounded_send(WorkerPayload { at, transactions }) {
315+
if let Err(e) = to_worker.unbounded_send(WorkerNotify::Add { at, transactions }) {
269316
log::warn!(target: "txpool", "Failed to update background worker: {:?}", e);
270317
}
271318
return;
@@ -275,6 +322,19 @@ where
275322
batch_revalidate(pool, api, at, transactions).await
276323
}
277324
}
325+
326+
/// Notify that some transactinos are no longer required to be revalidated.
327+
pub fn notify_pruned(&self, transactions: Vec<ExHash<Api>>) {
328+
if transactions.len() > 0 {
329+
log::debug!(target: "txpool", "Removing {} transactions from revalidation queue", transactions.len());
330+
}
331+
332+
if let Some(ref to_worker) = self.background {
333+
if let Err(e) = to_worker.unbounded_send(WorkerNotify::Remove { transactions }) {
334+
log::warn!(target: "txpool", "Failed to update background worker: {:?}", e);
335+
}
336+
}
337+
}
278338
}
279339

280340
#[cfg(test)]
@@ -310,4 +370,4 @@ mod tests {
310370
// number of ready
311371
assert_eq!(pool.validated_pool().status().ready, 1);
312372
}
313-
}
373+
}

client/transaction-pool/src/testing/pool.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,27 @@ fn should_not_retain_invalid_hashes_from_retracted() {
267267
assert_eq!(pool.status().ready, 0);
268268
}
269269

270+
#[test]
271+
fn should_not_resubmit_pruned_transaction_back_to_ready() {
272+
let xt = uxt(Alice, 209);
273+
274+
let (pool, _guard) = maintained_pool();
275+
276+
block_on(pool.submit_one(&BlockId::number(0), xt.clone())).expect("1. Imported");
277+
assert_eq!(pool.status().ready, 1);
278+
279+
pool.api.push_block(1, vec![xt.clone()]);
280+
281+
let event = block_event(1);
282+
283+
block_on(pool.maintain(event));
284+
285+
// maintenance is in background
286+
block_on(futures_timer::Delay::new(BACKGROUND_REVALIDATION_INTERVAL*2));
287+
288+
assert_eq!(pool.status().ready, 0);
289+
}
290+
270291
#[test]
271292
fn should_push_watchers_during_maintaince() {
272293
fn alice_uxt(nonce: u64) -> Extrinsic {

0 commit comments

Comments
 (0)