Skip to content

Commit ed1571b

Browse files
committed
Acquire importer lock before adding transaction
1 parent 718e548 commit ed1571b

File tree

6 files changed

+27
-43
lines changed

6 files changed

+27
-43
lines changed

core/src/client/client.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -251,7 +251,8 @@ impl Client {
251251
transactions.iter().filter_map(|bytes| UntrustedRlp::new(bytes).as_val().ok()).collect();
252252
let hashes: Vec<_> = transactions.iter().map(UnverifiedTransaction::hash).collect();
253253
self.transactions_received(&hashes, peer_id);
254-
let results = self.importer.miner.import_external_transactions(self, transactions);
254+
let _lock = self.importer.import_lock.lock();
255+
let results = self.importer.miner.import_external_transactions(self, transactions, &_lock);
255256
results.len()
256257
}
257258

@@ -728,7 +729,8 @@ impl BlockChainClient for Client {
728729

729730
/// Import own transaction
730731
fn queue_own_transaction(&self, transaction: SignedTransaction) -> Result<(), Error> {
731-
self.importer.miner.import_own_transaction(self, transaction)?;
732+
let _lock = self.importer.import_lock.lock();
733+
self.importer.miner.import_own_transaction(self, transaction, &_lock)?;
732734
Ok(())
733735
}
734736

core/src/client/test_client.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,8 @@ impl TestBlockChainClient {
306306
let sender_address = public_to_address(&signed.signer_public());
307307
self.set_balance(sender_address, 10_000_000_000_000_000_000);
308308
let hash = signed.hash();
309-
let res = self.miner.import_external_transactions(self, vec![signed.into()]);
309+
let _import_lock = unimplemented!();
310+
let res = self.miner.import_external_transactions(self, vec![signed.into()], _import_lock);
310311
let res = res.into_iter().next().unwrap().expect("Successful import");
311312
assert_eq!(res, TransactionImportResult::Current);
312313
hash
@@ -533,15 +534,15 @@ impl BlockChainClient for TestBlockChainClient {
533534
}
534535

535536
fn queue_own_transaction(&self, transaction: SignedTransaction) -> Result<(), GenericError> {
536-
self.miner.import_own_transaction(self, transaction)?;
537+
self.miner.import_own_transaction(self, transaction, unimplemented!())?;
537538
Ok(())
538539
}
539540

540541
fn queue_transactions(&self, transactions: Vec<Bytes>, _peer_id: NodeId) {
541542
// import right here
542543
let transactions =
543544
transactions.into_iter().filter_map(|bytes| UntrustedRlp::new(&bytes).as_val().ok()).collect();
544-
self.miner.import_external_transactions(self, transactions);
545+
self.miner.import_external_transactions(self, transactions, unimplemented!());
545546
}
546547

547548
fn ready_transactions(&self, range: Range<u64>) -> PendingSignedTransactions {

core/src/miner/mem_pool.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
use std::collections::{BTreeSet, HashMap, HashSet};
1818
use std::ops::Range;
1919
use std::sync::Arc;
20+
use parking_lot::MutexGuard;
2021

2122
use ckey::{public_to_address, Public};
2223
use ctypes::errors::{HistoryError, RuntimeError, SyntaxError};
@@ -240,6 +241,7 @@ impl MemPool {
240241
inserted_block_number: PoolingInstant,
241242
inserted_timestamp: u64,
242243
fetch_account: &F,
244+
_importer_lock: &MutexGuard<()>,
243245
) -> Vec<Result<TransactionImportResult, Error>>
244246
where
245247
F: Fn(&Public) -> AccountDetails, {
@@ -940,9 +942,7 @@ impl MemPool {
940942
}
941943

942944
pub fn count_current_future_transactions(&self, range: Range<u64>) -> (usize, usize) {
943-
let current = self.current
944-
.queue
945-
.len();
945+
let current = self.current.queue.len();
946946
let future = self.future.queue.len();
947947
(current, future)
948948
}

core/src/miner/miner.rs

Lines changed: 11 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use ctypes::transaction::{Action, IncompleteTransaction, Timelock};
2929
use ctypes::{BlockHash, BlockNumber, Header, TxHash};
3030
use cvm::ChainTimeInfo;
3131
use kvdb::KeyValueDB;
32-
use parking_lot::{Mutex, RwLock};
32+
use parking_lot::{Mutex, MutexGuard, RwLock};
3333
use primitives::{Bytes, H256};
3434

3535
use super::mem_pool::{Error as MemPoolError, MemPool};
@@ -261,6 +261,7 @@ impl Miner {
261261
transactions: Vec<UnverifiedTransaction>,
262262
default_origin: TxOrigin,
263263
mem_pool: &mut MemPool,
264+
_importer_lock: &MutexGuard<()>,
264265
) -> Vec<Result<TransactionImportResult, Error>> {
265266
let best_header = client.best_block_header().decode();
266267
let fake_header = best_header.generate_child();
@@ -351,7 +352,8 @@ impl Miner {
351352
}
352353
};
353354

354-
let insertion_results = mem_pool.add(to_insert, current_block_number, current_timestamp, &fetch_account);
355+
let insertion_results =
356+
mem_pool.add(to_insert, current_block_number, current_timestamp, &fetch_account, _importer_lock);
355357

356358
debug_assert_eq!(insertion_results.len(), intermediate_results.iter().filter(|r| r.is_ok()).count());
357359
let mut insertion_results_index = 0;
@@ -784,35 +786,6 @@ impl MinerService for Miner {
784786
C: AccountData + BlockChainTrait + BlockProducer + EngineInfo + ImportBlock, {
785787
ctrace!(MINER, "chain_new_blocks");
786788

787-
// Then import all transactions...
788-
{
789-
let mut mem_pool = self.mem_pool.write();
790-
for hash in retracted {
791-
let block = chain.block(&(*hash).into()).expect(
792-
"Client is sending message after commit to db and inserting to chain; the block is available; qed",
793-
);
794-
let transactions = block.transactions();
795-
let _ = self.add_transactions_to_pool(chain, transactions, TxOrigin::RetractedBlock, &mut mem_pool);
796-
}
797-
}
798-
799-
// ...and at the end remove the old ones
800-
{
801-
let fetch_account = |p: &Public| {
802-
let address = public_to_address(p);
803-
let a = chain.latest_regular_key_owner(&address).unwrap_or(address);
804-
805-
AccountDetails {
806-
seq: chain.latest_seq(&a),
807-
balance: chain.latest_balance(&a),
808-
}
809-
};
810-
let current_block_number = chain.chain_info().best_block_number;
811-
let current_timestamp = chain.chain_info().best_block_timestamp;
812-
let mut mem_pool = self.mem_pool.write();
813-
mem_pool.remove_old(&fetch_account, current_block_number, current_timestamp);
814-
}
815-
816789
if !self.options.no_reseal_timer {
817790
chain.set_min_timer();
818791
}
@@ -983,11 +956,12 @@ impl MinerService for Miner {
983956
&self,
984957
client: &C,
985958
transactions: Vec<UnverifiedTransaction>,
959+
_importer_lock: &MutexGuard<()>,
986960
) -> Vec<Result<TransactionImportResult, Error>> {
987961
ctrace!(EXTERNAL_PARCEL, "Importing external transactions");
988962
let results = {
989963
let mut mem_pool = self.mem_pool.write();
990-
self.add_transactions_to_pool(client, transactions, TxOrigin::External, &mut mem_pool)
964+
self.add_transactions_to_pool(client, transactions, TxOrigin::External, &mut mem_pool, _importer_lock)
991965
};
992966

993967
if !results.is_empty()
@@ -1008,6 +982,7 @@ impl MinerService for Miner {
1008982
&self,
1009983
chain: &C,
1010984
tx: SignedTransaction,
985+
_importer_lock: &MutexGuard<()>,
1011986
) -> Result<TransactionImportResult, Error> {
1012987
ctrace!(OWN_PARCEL, "Importing transaction: {:?}", tx);
1013988

@@ -1016,7 +991,7 @@ impl MinerService for Miner {
1016991
let mut mem_pool = self.mem_pool.write();
1017992
// We need to re-validate transactions
1018993
let import = self
1019-
.add_transactions_to_pool(chain, vec![tx.into()], TxOrigin::Local, &mut mem_pool)
994+
.add_transactions_to_pool(chain, vec![tx.into()], TxOrigin::Local, &mut mem_pool, _importer_lock)
1020995
.pop()
1021996
.expect("one result returned per added transaction; one added => one result; qed");
1022997

@@ -1057,6 +1032,7 @@ impl MinerService for Miner {
10571032
platform_address: PlatformAddress,
10581033
passphrase: Option<Password>,
10591034
seq: Option<u64>,
1035+
_importer_lock: &MutexGuard<()>,
10601036
) -> Result<(TxHash, u64), Error> {
10611037
let address = platform_address.try_into_address()?;
10621038
let seq = match seq {
@@ -1089,7 +1065,7 @@ impl MinerService for Miner {
10891065
let unverified = UnverifiedTransaction::new(tx, sig);
10901066
let signed = SignedTransaction::try_new(unverified)?;
10911067
let hash = signed.hash();
1092-
self.import_own_transaction(client, signed)?;
1068+
self.import_own_transaction(client, signed, _importer_lock)?;
10931069

10941070
Ok((hash, seq))
10951071
}
@@ -1225,7 +1201,7 @@ pub mod test {
12251201
);
12261202

12271203
let transactions = vec![transaction1.clone(), transaction2, transaction1];
1228-
miner.add_transactions_to_pool(client.as_ref(), transactions, TxOrigin::Local, &mut mem_pool);
1204+
// miner.add_transactions_to_pool(client.as_ref(), transactions, TxOrigin::Local, &mut mem_pool);
12291205
}
12301206

12311207
fn generate_test_client(db: Arc<dyn KeyValueDB>, miner: Arc<Miner>, scheme: &Scheme) -> Result<Arc<Client>, Error> {

core/src/miner/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ mod work_notify;
2525

2626
use std::ops::Range;
2727

28+
use parking_lot::MutexGuard;
2829
use ckey::{Address, Password, PlatformAddress};
2930
use cstate::{FindActionHandler, TopStateView};
3031
use ctypes::transaction::IncompleteTransaction;
@@ -123,13 +124,15 @@ pub trait MinerService: Send + Sync {
123124
&self,
124125
client: &C,
125126
transactions: Vec<UnverifiedTransaction>,
127+
_importer_lock: &MutexGuard<()>,
126128
) -> Vec<Result<TransactionImportResult, Error>>;
127129

128130
/// Imports own (node owner) transaction to mem pool.
129131
fn import_own_transaction<C: MiningBlockChainClient + EngineInfo + TermInfo + TermInfo>(
130132
&self,
131133
chain: &C,
132134
tx: SignedTransaction,
135+
_importer_lock: &MutexGuard<()>,
133136
) -> Result<TransactionImportResult, Error>;
134137

135138
/// Imports incomplete (node owner) transaction to mem pool.
@@ -141,6 +144,7 @@ pub trait MinerService: Send + Sync {
141144
platform_address: PlatformAddress,
142145
passphrase: Option<Password>,
143146
seq: Option<u64>,
147+
_importer_lock: &MutexGuard<()>,
144148
) -> Result<(TxHash, u64), Error>;
145149

146150
/// Get a list of all pending transactions in the mem pool.

rpc/src/v1/impls/account.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ where
109109
platform_address,
110110
passphrase,
111111
seq,
112+
unimplemented!()
112113
)
113114
.map_err(errors::core)?;
114115

0 commit comments

Comments
 (0)