diff --git a/core/src/miner/miner.rs b/core/src/miner/miner.rs index 0278715cff..ccf5b791f9 100644 --- a/core/src/miner/miner.rs +++ b/core/src/miner/miner.rs @@ -16,7 +16,6 @@ use std::collections::HashSet; use std::iter::once; -use std::iter::FromIterator; use std::ops::Range; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -30,7 +29,7 @@ use ctypes::{BlockHash, BlockNumber, Header, TxHash}; use cvm::ChainTimeInfo; use kvdb::KeyValueDB; use parking_lot::{Mutex, RwLock}; -use primitives::{Bytes, H256}; +use primitives::{Bytes, H256, U256}; use super::mem_pool::{Error as MemPoolError, MemPool}; use super::mem_pool_types::{AccountDetails, MemPoolInput, TxOrigin, TxTimelock}; @@ -111,31 +110,155 @@ struct SealingWork { enabled: bool, } -type TransactionListener = Box; - pub struct Miner { mem_pool: Arc>, - transaction_listener: RwLock>, - next_allowed_reseal: Mutex, - next_mandatory_reseal: RwLock, - sealing_block_last_request: Mutex, + next_allowed_reseal: NextAllowedReseal, + next_mandatory_reseal: NextMandatoryReseal, + sealing_block_last_request: SealingBlockLastRequest, sealing_work: Mutex, - params: RwLock, + params: Params, engine: Arc, options: MinerOptions, sealing_enabled: AtomicBool, accounts: Option>, + notifiers: Notifiers, + malicious_users: Users, + immune_users: Users, +} + +struct Users { + users: RwLock>, +} + +impl Users { + pub fn new() -> Self { + Self { + users: RwLock::new(HashSet::new()), + } + } + + pub fn cloned(&self) -> Vec
{ + self.users.read().iter().map(Clone::clone).collect() + } + + pub fn contains(&self, address: &Address) -> bool { + self.users.read().contains(address) + } + + pub fn insert(&self, address: Address) -> bool { + self.users.write().insert(address) + } + + pub fn remove_users<'a>(&self, addresses: impl Iterator) { + let mut users = self.users.write(); + for address in addresses { + users.remove(address); + } + } +} + +struct Notifiers { notifiers: RwLock>>, - malicious_users: RwLock>, - immune_users: RwLock>, +} + +impl Notifiers { + pub fn new(notifiers: Vec>) -> Self { + Self { + notifiers: RwLock::new(notifiers), + } + } + + pub fn push(&self, notifier: Box) { + self.notifiers.write().push(notifier); + } + + pub fn is_empty(&self) -> bool { + self.notifiers.read().is_empty() + } + + pub fn notify(&self, pow_hash: H256, target: U256) { + // FIXME: Calling callbacks inside of lock lifetime may cause a deadlock. + for notifier in self.notifiers.read().iter() { + notifier.notify(pow_hash, target) + } + } +} + +struct SealingBlockLastRequest { + block_number: Mutex, +} + +impl SealingBlockLastRequest { + pub fn new() -> Self { + Self { + block_number: Mutex::new(0), + } + } + + pub fn get(&self) -> u64 { + *self.block_number.lock() + } + + /// Returns previous value + pub fn set(&self, block_number: u64) -> u64 { + let mut guard = self.block_number.lock(); + let prev = *guard; + *guard = block_number; + prev + } +} + +type NextAllowedReseal = NextMandatoryReseal; + +struct NextMandatoryReseal { + instant: RwLock, +} + +impl NextMandatoryReseal { + pub fn new(instant: Instant) -> Self { + Self { + instant: RwLock::new(instant), + } + } + + pub fn get(&self) -> Instant { + *self.instant.read() + } + + pub fn set(&self, instant: Instant) { + *self.instant.write() = instant; + } +} + +struct Params { + params: RwLock, +} + +impl Params { + pub fn new(params: AuthoringParams) -> Self { + Self { + params: RwLock::new(params), + } + } + + pub fn get(&self) -> AuthoringParams { + self.params.read().clone() + } + + pub fn apply(&self, f: F) + where + F: FnOnce(&mut AuthoringParams) -> (), { + let mut params = self.params.write(); + f(&mut params); + } } impl Miner { /// Push listener that will handle new jobs pub fn add_work_listener(&self, notifier: Box) { - self.notifiers.write().push(notifier); + self.notifiers.push(notifier); } pub fn new( @@ -173,11 +296,10 @@ impl Miner { Self { mem_pool, - transaction_listener: RwLock::new(vec![]), - next_allowed_reseal: Mutex::new(Instant::now()), - next_mandatory_reseal: RwLock::new(Instant::now() + options.reseal_max_period), - params: RwLock::new(AuthoringParams::default()), - sealing_block_last_request: Mutex::new(0), + next_allowed_reseal: NextAllowedReseal::new(Instant::now()), + next_mandatory_reseal: NextMandatoryReseal::new(Instant::now() + options.reseal_max_period), + params: Params::new(AuthoringParams::default()), + sealing_block_last_request: SealingBlockLastRequest::new(), sealing_work: Mutex::new(SealingWork { queue: SealingQueue::new(options.work_queue_size), enabled: options.force_sealing || scheme.engine.seals_internally().is_some(), @@ -186,9 +308,9 @@ impl Miner { options, sealing_enabled: AtomicBool::new(true), accounts, - notifiers: RwLock::new(notifiers), - malicious_users: RwLock::new(HashSet::new()), - immune_users: RwLock::new(HashSet::new()), + notifiers: Notifiers::new(notifiers), + malicious_users: Users::new(), + immune_users: Users::new(), } } @@ -196,11 +318,6 @@ impl Miner { self.mem_pool.write().recover_from_db(client); } - /// Set a callback to be notified about imported transactions' hashes. - pub fn add_transactions_listener(&self, f: Box) { - self.transaction_listener.write().push(f); - } - /// Get `Some` `clone()` of the current pending block's state or `None` if we're not sealing. pub fn pending_state(&self, latest_block_number: BlockNumber) -> Option { self.map_pending_block(|b| b.state().clone(), latest_block_number) @@ -226,7 +343,7 @@ impl Miner { let mut sealing_work = self.sealing_work.lock(); if sealing_work.enabled { ctrace!(MINER, "requires_reseal: sealing enabled"); - let last_request = *self.sealing_block_last_request.lock(); + let last_request = self.sealing_block_last_request.get(); let should_disable_sealing = !self.options.force_sealing && !has_local_transactions && self.engine.seals_internally().is_none() @@ -278,7 +395,7 @@ impl Miner { let signer_public = tx.recover_public()?; let signer_address = public_to_address(&signer_public); if default_origin.is_local() { - self.immune_users.write().insert(signer_address); + self.immune_users.insert(signer_address); } let origin = self @@ -291,7 +408,7 @@ impl Miner { }) .unwrap_or(default_origin); - if self.malicious_users.read().contains(&signer_address) { + if self.malicious_users.contains(&signer_address) { // FIXME: just to skip, think about another way. return Ok(()) } @@ -302,7 +419,6 @@ impl Miner { if !self.is_allowed_transaction(&tx.action) { cdebug!(MINER, "Rejected transaction {:?}: {:?} is not allowed transaction", hash, tx.action); } - let immune_users = self.immune_users.read(); let tx = tx .verify_basic() .map_err(From::from) @@ -313,8 +429,8 @@ impl Miner { .and_then(|_| CodeChainMachine::verify_transaction_seal(tx, &fake_header)) .map_err(|e| { match e { - Error::Syntax(_) if !origin.is_local() && !immune_users.contains(&signer_address) => { - self.malicious_users.write().insert(signer_address); + Error::Syntax(_) if !origin.is_local() && !self.immune_users.contains(&signer_address) => { + self.malicious_users.insert(signer_address); } _ => {} } @@ -325,8 +441,8 @@ impl Miner { // This check goes here because verify_transaction takes SignedTransaction parameter self.engine.machine().verify_transaction(&tx, &fake_header, client, false).map_err(|e| { match e { - Error::Syntax(_) if !origin.is_local() && !immune_users.contains(&signer_address) => { - self.malicious_users.write().insert(signer_address); + Error::Syntax(_) if !origin.is_local() && !self.immune_users.contains(&signer_address) => { + self.malicious_users.insert(signer_address); } _ => {} } @@ -355,7 +471,7 @@ impl Miner { debug_assert_eq!(insertion_results.len(), intermediate_results.iter().filter(|r| r.is_ok()).count()); let mut insertion_results_index = 0; - let results = intermediate_results + intermediate_results .into_iter() .map(|res| match res { Err(e) => Err(e), @@ -367,13 +483,7 @@ impl Miner { Ok(result) } }) - .collect(); - - for listener in &*self.transaction_listener.read() { - listener(&inserted); - } - - results + .collect() } fn calculate_timelock(&self, tx: &SignedTransaction, client: &C) -> Result { @@ -448,7 +558,7 @@ impl Miner { let is_new = original_work_hash.map_or(true, |h| *block.block().header().hash() != h); sealing_work.queue.push(block); // If push notifications are enabled we assume all work items are used. - if !self.notifiers.read().is_empty() && is_new { + if !self.notifiers.is_empty() && is_new { sealing_work.queue.use_last_ref(); } (Some((pow_hash, score, number)), is_new) @@ -465,9 +575,7 @@ impl Miner { if is_new { if let Some((pow_hash, score, _number)) = work { let target = self.engine.score_to_target(&score); - for notifier in self.notifiers.read().iter() { - notifier.notify(pow_hash, target) - } + self.notifiers.notify(pow_hash, target); } } } @@ -485,7 +593,7 @@ impl Miner { let last_work_hash = sealing_work.queue.peek_last_ref().map(|pb| *pb.block().header().hash()); ctrace!(MINER, "prepare_block: No existing work - making new block"); - let params = self.params.read().clone(); + let params = self.params.get(); let open_block = chain.prepare_open_block(parent_block_id, params.author, params.extra_data); let (block_number, parent_hash) = { let header = open_block.block().header(); @@ -529,11 +637,10 @@ impl Miner { let tx_total = transactions.len(); let mut invalid_tx_users = HashSet::new(); - let immune_users = self.immune_users.read(); for tx in transactions { let signer_public = tx.signer_public(); let signer_address = public_to_address(&signer_public); - if self.malicious_users.read().contains(&signer_address) { + if self.malicious_users.contains(&signer_address) { invalid_transactions.push(tx.hash()); continue } @@ -567,9 +674,9 @@ impl Miner { .read() .is_local_transaction(hash) .expect("The tx is clearly fetched from the mempool") - && !immune_users.contains(&signer_address) + && !self.immune_users.contains(&signer_address) { - self.malicious_users.write().insert(signer_address); + self.malicious_users.insert(signer_address); } } _ => {} @@ -635,7 +742,7 @@ impl Miner { C: BlockChainTrait + ImportBlock, { if block.transactions().is_empty() && !self.options.force_sealing - && Instant::now() <= *self.next_mandatory_reseal.read() + && Instant::now() <= self.next_mandatory_reseal.get() { cdebug!(MINER, "seal_block_internally: no sealing."); return false @@ -652,7 +759,7 @@ impl Miner { return false } - *self.next_mandatory_reseal.write() = Instant::now() + self.options.reseal_max_period; + self.next_mandatory_reseal.set(Instant::now() + self.options.reseal_max_period); let sealed = if self.engine_type().is_seal_first() { block.lock().already_sealed() } else { @@ -679,7 +786,7 @@ impl Miner { /// Are we allowed to do a non-mandatory reseal? fn transaction_reseal_allowed(&self) -> bool { - self.sealing_enabled.load(Ordering::Relaxed) && (Instant::now() > *self.next_allowed_reseal.lock()) + self.sealing_enabled.load(Ordering::Relaxed) && (Instant::now() > self.next_allowed_reseal.get()) } fn map_pending_block(&self, f: F, latest_block_number: BlockNumber) -> Option @@ -724,11 +831,11 @@ impl MinerService for Miner { } fn authoring_params(&self) -> AuthoringParams { - self.params.read().clone() + self.params.get() } fn set_author(&self, address: Address) -> Result<(), AccountProviderError> { - self.params.write().author = address; + self.params.apply(|params| params.author = address); if self.engine_type().need_signer_key() && self.engine.seals_internally().is_some() { if let Some(ref ap) = self.accounts { @@ -752,7 +859,7 @@ impl MinerService for Miner { } fn set_extra_data(&self, extra_data: Bytes) { - self.params.write().extra_data = extra_data; + self.params.apply(|params| params.extra_data = extra_data); } fn minimal_fee(&self) -> u64 { @@ -859,16 +966,16 @@ impl MinerService for Miner { } } } - let mut sealing_block_last_request = self.sealing_block_last_request.lock(); + let best_number = client.chain_info().best_block_number; - if *sealing_block_last_request != best_number { + let prev_request = self.sealing_block_last_request.set(best_number); + if prev_request != best_number { ctrace!( MINER, "prepare_work_sealing: Miner received request (was {}, now {}) - waking up.", - *sealing_block_last_request, + prev_request, best_number ); - *sealing_block_last_request = best_number; } // Return if we restarted @@ -930,7 +1037,7 @@ impl MinerService for Miner { } // Sealing successful - *self.next_allowed_reseal.lock() = Instant::now() + self.options.reseal_min_period; + self.next_allowed_reseal.set(Instant::now() + self.options.reseal_min_period); if !self.options.no_reseal_timer { chain.set_min_timer(); } @@ -1126,32 +1233,23 @@ impl MinerService for Miner { } fn get_malicious_users(&self) -> Vec
{ - Vec::from_iter(self.malicious_users.read().iter().map(Clone::clone)) + self.malicious_users.cloned() } fn release_malicious_users(&self, prisoner_vec: Vec
) { - let mut malicious_users = self.malicious_users.write(); - for address in prisoner_vec { - malicious_users.remove(&address); - } + self.malicious_users.remove_users(prisoner_vec.iter()); } fn imprison_malicious_users(&self, prisoner_vec: Vec
) { - let mut malicious_users = self.malicious_users.write(); - for address in prisoner_vec { - malicious_users.insert(address); - } + self.malicious_users.remove_users(prisoner_vec.iter()); } fn get_immune_users(&self) -> Vec
{ - Vec::from_iter(self.immune_users.read().iter().map(Clone::clone)) + self.immune_users.cloned() } fn register_immune_users(&self, immune_user_vec: Vec
) { - let mut immune_users = self.immune_users.write(); - for address in immune_user_vec { - immune_users.insert(address); - } + self.immune_users.remove_users(immune_user_vec.iter()) } }