From dd35f21f195566fc0f01982f6d3872f6ac3abdd5 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Thu, 4 Jun 2020 18:05:46 +0300 Subject: [PATCH 1/5] refactor ready set size calc --- client/transaction-pool/graph/src/ready.rs | 69 +++++++++++++++++++++- 1 file changed, 66 insertions(+), 3 deletions(-) diff --git a/client/transaction-pool/graph/src/ready.rs b/client/transaction-pool/graph/src/ready.rs index b5807ffce4480..d94dd20cb1a69 100644 --- a/client/transaction-pool/graph/src/ready.rs +++ b/client/transaction-pool/graph/src/ready.rs @@ -106,6 +106,69 @@ Hence every hash retrieved from `provided_tags` is always present in `ready`; qed "#; +#[derive(Debug, parity_util_mem::MallocSizeOf)] +pub struct ReadySet { + index: HashMap>, + bytes: usize, +} + +impl Default for ReadySet { + fn default() -> Self { + ReadySet { + index: HashMap::default(), + bytes: 0, + } + } +} + +impl ReadySet { + pub fn insert(&mut self, key: Hash, val: ReadyTx) { + let new_bytes = val.transaction.transaction.bytes; + self.bytes += new_bytes; + self.index.insert(key, val); + } + + pub fn remove(&mut self, key: &Hash) -> Option> { + let val = self.index.remove(key); + let deduced_bytes = val.as_ref().map(|val| val.transaction.transaction.bytes).unwrap_or(0); + if self.bytes < deduced_bytes { + log::warn!( + "Some consistent data in ready set limit, bytes = {} while removing tx of size {}", + self.bytes, + deduced_bytes, + ); + self.bytes = 0; + } else { + self.bytes -= deduced_bytes; + } + val + } + + pub fn contains_key(&self, key: &Hash) -> bool { + self.index.contains_key(key) + } + + pub fn get(&self, key: &Hash) -> Option<&ReadyTx> { + self.index.get(key) + } + + pub fn get_mut(&mut self, key: &Hash) -> Option<&mut ReadyTx> { + self.index.get_mut(key) + } + + pub fn len(&self) -> usize { + self.index.len() + } + + pub fn bytes(&self) -> usize { + self.bytes + } + + pub fn values(&self) -> std::collections::hash_map::Values> { + self.index.values() + } +} + #[derive(Debug, parity_util_mem::MallocSizeOf)] pub struct ReadyTransactions { /// Insertion id @@ -113,7 +176,7 @@ pub struct ReadyTransactions { /// tags that are provided by Ready transactions provided_tags: HashMap, /// Transactions that are ready (i.e. don't have any requirements external to the pool) - ready: Arc>>>, + ready: Arc>>, /// Best transactions that are ready to be included to the block without any other previous transaction. best: BTreeSet>, } @@ -473,13 +536,13 @@ impl ReadyTransactions { /// Returns sum of encoding lengths of all transactions in this queue. pub fn bytes(&self) -> usize { - self.ready.read().values().fold(0, |acc, tx| acc + tx.transaction.transaction.bytes) + self.ready.read().bytes() } } /// Iterator of ready transactions ordered by priority. pub struct BestIterator { - all: Arc>>>, + all: Arc>>, awaiting: HashMap)>, best: BTreeSet>, } From 0a9929d5cb23493d1ba82558da26ee62480a0fdd Mon Sep 17 00:00:00 2001 From: Nikolay Volf Date: Thu, 4 Jun 2020 19:00:10 +0300 Subject: [PATCH 2/5] Update client/transaction-pool/graph/src/ready.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- client/transaction-pool/graph/src/ready.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/client/transaction-pool/graph/src/ready.rs b/client/transaction-pool/graph/src/ready.rs index d94dd20cb1a69..a5c27ee65926d 100644 --- a/client/transaction-pool/graph/src/ready.rs +++ b/client/transaction-pool/graph/src/ready.rs @@ -133,6 +133,7 @@ impl ReadySet { let deduced_bytes = val.as_ref().map(|val| val.transaction.transaction.bytes).unwrap_or(0); if self.bytes < deduced_bytes { log::warn!( + target: "txpool", "Some consistent data in ready set limit, bytes = {} while removing tx of size {}", self.bytes, deduced_bytes, From 141b8fc63c0c064482dac3f300d850f05d08b335 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Thu, 4 Jun 2020 19:07:00 +0300 Subject: [PATCH 3/5] remove pub --- client/transaction-pool/graph/src/ready.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/transaction-pool/graph/src/ready.rs b/client/transaction-pool/graph/src/ready.rs index d94dd20cb1a69..9f9aeabbaf758 100644 --- a/client/transaction-pool/graph/src/ready.rs +++ b/client/transaction-pool/graph/src/ready.rs @@ -107,7 +107,7 @@ qed "#; #[derive(Debug, parity_util_mem::MallocSizeOf)] -pub struct ReadySet { +struct ReadySet { index: HashMap>, bytes: usize, } From 6886bff8d57ea256b673d90e180fdcbf483608a8 Mon Sep 17 00:00:00 2001 From: NikVolf Date: Mon, 8 Jun 2020 22:47:58 +0300 Subject: [PATCH 4/5] update to new variat --- client/transaction-pool/graph/src/lib.rs | 1 + client/transaction-pool/graph/src/ready.rs | 86 ++------ .../transaction-pool/graph/src/tracked_map.rs | 189 ++++++++++++++++++ 3 files changed, 205 insertions(+), 71 deletions(-) create mode 100644 client/transaction-pool/graph/src/tracked_map.rs diff --git a/client/transaction-pool/graph/src/lib.rs b/client/transaction-pool/graph/src/lib.rs index 04e5d0d3fbe9f..4a6e7b6f2a1c0 100644 --- a/client/transaction-pool/graph/src/lib.rs +++ b/client/transaction-pool/graph/src/lib.rs @@ -32,6 +32,7 @@ mod pool; mod ready; mod rotator; mod validated_pool; +mod tracked_map; pub mod base_pool; pub mod watcher; diff --git a/client/transaction-pool/graph/src/ready.rs b/client/transaction-pool/graph/src/ready.rs index 82b7170416364..8afdab34db4de 100644 --- a/client/transaction-pool/graph/src/ready.rs +++ b/client/transaction-pool/graph/src/ready.rs @@ -25,15 +25,17 @@ use std::{ use serde::Serialize; use log::trace; -use parking_lot::RwLock; use sp_runtime::traits::Member; use sp_runtime::transaction_validity::{ TransactionTag as Tag, }; use sp_transaction_pool::error; -use crate::future::WaitingTransaction; -use crate::base_pool::Transaction; +use crate::{ + base_pool::Transaction, + future::WaitingTransaction, + tracked_map::{ReadOnlyTrackedMap, TrackedMap, TrackedSize}, +}; /// An in-pool transaction reference. /// @@ -106,70 +108,6 @@ Hence every hash retrieved from `provided_tags` is always present in `ready`; qed "#; -#[derive(Debug, parity_util_mem::MallocSizeOf)] -struct ReadySet { - index: HashMap>, - bytes: usize, -} - -impl Default for ReadySet { - fn default() -> Self { - ReadySet { - index: HashMap::default(), - bytes: 0, - } - } -} - -impl ReadySet { - pub fn insert(&mut self, key: Hash, val: ReadyTx) { - let new_bytes = val.transaction.transaction.bytes; - self.bytes += new_bytes; - self.index.insert(key, val); - } - - pub fn remove(&mut self, key: &Hash) -> Option> { - let val = self.index.remove(key); - let deduced_bytes = val.as_ref().map(|val| val.transaction.transaction.bytes).unwrap_or(0); - if self.bytes < deduced_bytes { - log::warn!( - target: "txpool", - "Some consistent data in ready set limit, bytes = {} while removing tx of size {}", - self.bytes, - deduced_bytes, - ); - self.bytes = 0; - } else { - self.bytes -= deduced_bytes; - } - val - } - - pub fn contains_key(&self, key: &Hash) -> bool { - self.index.contains_key(key) - } - - pub fn get(&self, key: &Hash) -> Option<&ReadyTx> { - self.index.get(key) - } - - pub fn get_mut(&mut self, key: &Hash) -> Option<&mut ReadyTx> { - self.index.get_mut(key) - } - - pub fn len(&self) -> usize { - self.index.len() - } - - pub fn bytes(&self) -> usize { - self.bytes - } - - pub fn values(&self) -> std::collections::hash_map::Values> { - self.index.values() - } -} - #[derive(Debug, parity_util_mem::MallocSizeOf)] pub struct ReadyTransactions { /// Insertion id @@ -177,11 +115,17 @@ pub struct ReadyTransactions { /// tags that are provided by Ready transactions provided_tags: HashMap, /// Transactions that are ready (i.e. don't have any requirements external to the pool) - ready: Arc>>, + ready: TrackedMap>, /// Best transactions that are ready to be included to the block without any other previous transaction. best: BTreeSet>, } +impl TrackedSize for ReadyTx { + fn tracked_size(&self) -> usize { + self.transaction.transaction.bytes + } +} + impl Default for ReadyTransactions { fn default() -> Self { ReadyTransactions { @@ -532,18 +476,18 @@ impl ReadyTransactions { /// Returns number of transactions in this queue. pub fn len(&self) -> usize { - self.ready.read().len() + self.ready.len() } /// Returns sum of encoding lengths of all transactions in this queue. pub fn bytes(&self) -> usize { - self.ready.read().bytes() + self.ready.bytes() } } /// Iterator of ready transactions ordered by priority. pub struct BestIterator { - all: Arc>>, + all: ReadOnlyTrackedMap>, awaiting: HashMap)>, best: BTreeSet>, } diff --git a/client/transaction-pool/graph/src/tracked_map.rs b/client/transaction-pool/graph/src/tracked_map.rs new file mode 100644 index 0000000000000..2073200d0c2d0 --- /dev/null +++ b/client/transaction-pool/graph/src/tracked_map.rs @@ -0,0 +1,189 @@ +// This file is part of Substrate. + +// Copyright (C) 2018-2020 Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use std::{ + collections::HashMap, + sync::{Arc, atomic::{AtomicIsize, Ordering as AtomicOrdering}}, +}; +use parking_lot::{RwLock, RwLockWriteGuard, RwLockReadGuard}; + +/// Something that can report it's size. +pub trait TrackedSize { + fn tracked_size(&self) -> usize; +} + +/// Map with size tracking. +/// +/// Size reported might be slightly off and only approximately true. +#[derive(Debug, parity_util_mem::MallocSizeOf)] +pub struct TrackedMap { + index: Arc>>, + bytes: AtomicIsize, + length: AtomicIsize, +} + +impl Default for TrackedMap { + fn default() -> Self { + Self { + index: Arc::new(HashMap::default().into()), + bytes: 0.into(), + length: 0.into(), + } + } +} + +impl TrackedMap { + /// Current tracked length of the content. + pub fn len(&self) -> usize { + std::cmp::max(self.length.load(AtomicOrdering::Relaxed), 0) as usize + } + + /// Current sum of content length. + pub fn bytes(&self) -> usize { + std::cmp::max(self.bytes.load(AtomicOrdering::Relaxed), 0) as usize + } + + /// Read-only clone of the interior. + pub fn clone(&self) -> ReadOnlyTrackedMap { + ReadOnlyTrackedMap(self.index.clone()) + } + + /// Lock map for read. + pub fn read<'a>(&'a self) -> TrackedMapReadAccess<'a, K, V> { + TrackedMapReadAccess { + inner_guard: self.index.read(), + } + } + + /// Lock map for write. + pub fn write<'a>(&'a self) -> TrackedMapWriteAccess<'a, K, V> { + TrackedMapWriteAccess { + inner_guard: self.index.write(), + bytes: &self.bytes, + length: &self.length, + } + } +} + +/// Read-only access to map. +/// +/// The only thing can be done is .read(). +pub struct ReadOnlyTrackedMap(Arc>>); + +impl ReadOnlyTrackedMap +where + K: Eq + std::hash::Hash +{ + /// Lock map for read. + pub fn read<'a>(&'a self) -> TrackedMapReadAccess<'a, K, V> { + TrackedMapReadAccess { + inner_guard: self.0.read(), + } + } +} + +pub struct TrackedMapReadAccess<'a, K, V> { + inner_guard: RwLockReadGuard<'a, HashMap>, +} + +impl<'a, K, V> TrackedMapReadAccess<'a, K, V> +where + K: Eq + std::hash::Hash +{ + /// Returns true if map contains key. + pub fn contains_key(&self, key: &K) -> bool { + self.inner_guard.contains_key(key) + } + + /// Returns reference to the contained value by key, if exists. + pub fn get(&self, key: &K) -> Option<&V> { + self.inner_guard.get(key) + } + + /// Returns iterator over all values. + pub fn values(&self) -> std::collections::hash_map::Values { + self.inner_guard.values() + } +} + +pub struct TrackedMapWriteAccess<'a, K, V> { + bytes: &'a AtomicIsize, + length: &'a AtomicIsize, + inner_guard: RwLockWriteGuard<'a, HashMap>, +} + +impl<'a, K, V> TrackedMapWriteAccess<'a, K, V> +where + K: Eq + std::hash::Hash, V: TrackedSize +{ + /// Insert value and return previous (if any). + pub fn insert(&mut self, key: K, val: V) -> Option { + let new_bytes = val.tracked_size(); + self.bytes.fetch_add(new_bytes as isize, AtomicOrdering::Relaxed); + self.length.fetch_add(1, AtomicOrdering::Relaxed); + self.inner_guard.insert(key, val).and_then(|old_val| { + self.bytes.fetch_sub(old_val.tracked_size() as isize, AtomicOrdering::Relaxed); + self.length.fetch_sub(1, AtomicOrdering::Relaxed); + Some(old_val) + }) + } + + /// Remove value by key. + pub fn remove(&mut self, key: &K) -> Option { + let val = self.inner_guard.remove(key); + if let Some(size) = val.as_ref().map(TrackedSize::tracked_size) { + self.bytes.fetch_sub(size as isize, AtomicOrdering::Relaxed); + self.length.fetch_sub(1, AtomicOrdering::Relaxed); + } + val + } + + /// Returns mutable reference to the contained value by key, if exists. + pub fn get_mut(&mut self, key: &K) -> Option<&mut V> { + self.inner_guard.get_mut(key) + } +} + +#[cfg(test)] +mod tests { + + use super::*; + + impl TrackedSize for i32 { + fn tracked_size(&self) -> usize { *self as usize / 10 } + } + + #[test] + fn basic() { + let map = TrackedMap::default(); + map.write().insert(5, 10); + map.write().insert(6, 20); + + assert_eq!(map.bytes(), 3); + assert_eq!(map.len(), 2); + + map.write().insert(6, 30); + + assert_eq!(map.bytes(), 4); + assert_eq!(map.len(), 2); + + map.write().remove(&6); + assert_eq!(map.bytes(), 1); + assert_eq!(map.len(), 1); + } +} \ No newline at end of file From 483de27f2215ff3a08b3391e62eb6c878390152d Mon Sep 17 00:00:00 2001 From: NikVolf Date: Tue, 9 Jun 2020 15:46:28 +0300 Subject: [PATCH 5/5] rename --- client/transaction-pool/graph/src/ready.rs | 6 +++--- client/transaction-pool/graph/src/tracked_map.rs | 16 ++++++++-------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/client/transaction-pool/graph/src/ready.rs b/client/transaction-pool/graph/src/ready.rs index 8afdab34db4de..47289f26f02a9 100644 --- a/client/transaction-pool/graph/src/ready.rs +++ b/client/transaction-pool/graph/src/ready.rs @@ -34,7 +34,7 @@ use sp_transaction_pool::error; use crate::{ base_pool::Transaction, future::WaitingTransaction, - tracked_map::{ReadOnlyTrackedMap, TrackedMap, TrackedSize}, + tracked_map::{self, ReadOnlyTrackedMap, TrackedMap}, }; /// An in-pool transaction reference. @@ -120,8 +120,8 @@ pub struct ReadyTransactions { best: BTreeSet>, } -impl TrackedSize for ReadyTx { - fn tracked_size(&self) -> usize { +impl tracked_map::Size for ReadyTx { + fn size(&self) -> usize { self.transaction.transaction.bytes } } diff --git a/client/transaction-pool/graph/src/tracked_map.rs b/client/transaction-pool/graph/src/tracked_map.rs index 2073200d0c2d0..c799eb0b96ea1 100644 --- a/client/transaction-pool/graph/src/tracked_map.rs +++ b/client/transaction-pool/graph/src/tracked_map.rs @@ -23,8 +23,8 @@ use std::{ use parking_lot::{RwLock, RwLockWriteGuard, RwLockReadGuard}; /// Something that can report it's size. -pub trait TrackedSize { - fn tracked_size(&self) -> usize; +pub trait Size { + fn size(&self) -> usize; } /// Map with size tracking. @@ -129,15 +129,15 @@ pub struct TrackedMapWriteAccess<'a, K, V> { impl<'a, K, V> TrackedMapWriteAccess<'a, K, V> where - K: Eq + std::hash::Hash, V: TrackedSize + K: Eq + std::hash::Hash, V: Size { /// Insert value and return previous (if any). pub fn insert(&mut self, key: K, val: V) -> Option { - let new_bytes = val.tracked_size(); + let new_bytes = val.size(); self.bytes.fetch_add(new_bytes as isize, AtomicOrdering::Relaxed); self.length.fetch_add(1, AtomicOrdering::Relaxed); self.inner_guard.insert(key, val).and_then(|old_val| { - self.bytes.fetch_sub(old_val.tracked_size() as isize, AtomicOrdering::Relaxed); + self.bytes.fetch_sub(old_val.size() as isize, AtomicOrdering::Relaxed); self.length.fetch_sub(1, AtomicOrdering::Relaxed); Some(old_val) }) @@ -146,7 +146,7 @@ where /// Remove value by key. pub fn remove(&mut self, key: &K) -> Option { let val = self.inner_guard.remove(key); - if let Some(size) = val.as_ref().map(TrackedSize::tracked_size) { + if let Some(size) = val.as_ref().map(Size::size) { self.bytes.fetch_sub(size as isize, AtomicOrdering::Relaxed); self.length.fetch_sub(1, AtomicOrdering::Relaxed); } @@ -164,8 +164,8 @@ mod tests { use super::*; - impl TrackedSize for i32 { - fn tracked_size(&self) -> usize { *self as usize / 10 } + impl Size for i32 { + fn size(&self) -> usize { *self as usize / 10 } } #[test]