diff --git a/Cargo.lock b/Cargo.lock index e7ef0c5b7c..fb1bfb5f43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -296,7 +296,6 @@ dependencies = [ "log 0.4.6 (registry+https://github.com/rust-lang/crates.io-index)", "memorydb 0.1.1", "num-rational 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)", - "num_cpus 1.8.0 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.6.4 (registry+https://github.com/rust-lang/crates.io-index)", "primitives 0.4.0 (git+https://github.com/CodeChain-io/rust-codechain-primitives.git)", "rand 0.6.1 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/core/Cargo.toml b/core/Cargo.toml index 8b0043cd0d..a64f993f2c 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -24,7 +24,6 @@ hyper = { git = "https://github.com/paritytech/hyper", default-features = false journaldb = { path = "../util/journaldb" } linked-hash-map = "0.5" log = "0.4.6" -num_cpus = "1.8" kvdb = { path = "../util/kvdb" } kvdb-rocksdb = { path = "../util/kvdb-rocksdb" } kvdb-memorydb = { path = "../util/kvdb-memorydb" } diff --git a/core/src/client/importer.rs b/core/src/client/importer.rs index f5b40fac73..d49b405645 100644 --- a/core/src/client/importer.rs +++ b/core/src/client/importer.rs @@ -305,7 +305,7 @@ impl Importer { /// This is triggered by a message coming from a header queue when the header is ready for insertion pub fn import_verified_headers(&self, client: &Client) -> usize { - const MAX_HEADERS_TO_IMPORT: usize = 10_000; + const MAX_HEADERS_TO_IMPORT: usize = 1_000; let lock = self.import_lock.lock(); let headers = self.header_queue.drain(MAX_HEADERS_TO_IMPORT); self.import_headers(&headers, client, &lock) diff --git a/core/src/lib.rs b/core/src/lib.rs index 63b2909d1f..b0d8bd0016 100644 --- a/core/src/lib.rs +++ b/core/src/lib.rs @@ -39,7 +39,6 @@ extern crate kvdb_memorydb; extern crate kvdb_rocksdb; extern crate linked_hash_map; extern crate memorydb; -extern crate num_cpus; extern crate num_rational; extern crate primitives; extern crate rand; diff --git a/core/src/verification/queue/mod.rs b/core/src/verification/queue/mod.rs index 0605dc4848..b089a4c3be 100644 --- a/core/src/verification/queue/mod.rs +++ b/core/src/verification/queue/mod.rs @@ -23,7 +23,6 @@ use std::sync::{Arc, Condvar as SCondvar, Mutex as SMutex}; use std::thread::{self, JoinHandle}; use cio::IoChannel; -use num_cpus; use parking_lot::{Mutex, RwLock}; use primitives::{H256, U256}; @@ -36,8 +35,8 @@ use crate::types::{BlockStatus as Status, VerificationQueueInfo as QueueInfo}; const MIN_MEM_LIMIT: usize = 16384; const MIN_QUEUE_LIMIT: usize = 512; -// maximum possible number of verification threads. -const MAX_VERIFIERS: usize = 8; +// number of verification threads. +const NUM_VERIFIERS: usize = 2; /// Type alias for block queue convenience. pub type BlockQueue = VerificationQueue; @@ -150,10 +149,9 @@ impl VerificationQueue { let empty = Arc::new(SCondvar::new()); let more_to_verify = Arc::new(SCondvar::new()); - let num_verifiers = cmp::min(num_cpus::get(), MAX_VERIFIERS); - let mut verifier_handles = Vec::with_capacity(num_verifiers); + let mut verifier_handles = Vec::with_capacity(NUM_VERIFIERS); - for i in 0..num_verifiers { + for i in 0..NUM_VERIFIERS { let engine = engine.clone(); let verification = verification.clone(); let more_to_verify = more_to_verify.clone(); diff --git a/sync/src/block/downloader/header.rs b/sync/src/block/downloader/header.rs index 695a502ede..76f88a02f0 100644 --- a/sync/src/block/downloader/header.rs +++ b/sync/src/block/downloader/header.rs @@ -45,6 +45,7 @@ pub struct HeaderDownloader { pivot: Pivot, request_time: Option, downloaded: HashMap, + queued: HashMap, trial: usize, } @@ -69,6 +70,7 @@ impl HeaderDownloader { }, request_time: None, downloaded: HashMap::new(), + queued: HashMap::new(), trial: 0, } } @@ -100,12 +102,15 @@ impl HeaderDownloader { self.request_time.map_or(false, |time| (Instant::now() - time).as_secs() > MAX_WAIT) } - /// Find header from download cache, and then from blockchain + /// Find header from queued headers, downloaded cache and then from blockchain /// Panics if header dosn't exist fn pivot_header(&self) -> Header { - match self.downloaded.get(&self.pivot.hash) { + match self.queued.get(&self.pivot.hash) { Some(header) => header.clone(), - None => self.client.block_header(&BlockId::Hash(self.pivot.hash)).unwrap(), + None => match self.downloaded.get(&self.pivot.hash) { + Some(header) => header.clone(), + None => self.client.block_header(&BlockId::Hash(self.pivot.hash)).unwrap(), + }, } } @@ -173,7 +178,7 @@ impl HeaderDownloader { pub fn mark_as_imported(&mut self, hashes: Vec) { for hash in hashes { - self.downloaded.remove(&hash); + self.queued.remove(&hash); if self.best_hash == hash { self.pivot = Pivot { @@ -183,4 +188,12 @@ impl HeaderDownloader { } } } + + pub fn mark_as_queued(&mut self, hashes: Vec) { + for hash in hashes { + if let Some(header) = self.downloaded.remove(&hash) { + self.queued.insert(hash, header); + } + } + } } diff --git a/sync/src/block/extension.rs b/sync/src/block/extension.rs index 914df74241..01cdb59852 100644 --- a/sync/src/block/extension.rs +++ b/sync/src/block/extension.rs @@ -680,9 +680,13 @@ impl Extension { completed.sort_unstable_by_key(EncodedHeader::number); let mut exists = Vec::new(); + let mut queued = Vec::new(); + for header in completed { + let hash = header.hash(); match self.client.import_header(header.clone().into_inner()) { - Err(BlockImportError::Import(ImportError::AlreadyInChain)) => exists.push(header.hash()), + Err(BlockImportError::Import(ImportError::AlreadyInChain)) => exists.push(hash), + Err(BlockImportError::Import(ImportError::AlreadyQueued)) => queued.push(hash), // FIXME: handle import errors Err(err) => { cwarn!(SYNC, "Cannot import header({}): {:?}", header.hash(), err); @@ -693,6 +697,7 @@ impl Extension { } let request = self.header_downloaders.get_mut(from).and_then(|peer| { + peer.mark_as_queued(queued); peer.mark_as_imported(exists); peer.create_request() });