From b190a8d9a18904c121c4bb2488ce4d68d2dcc9ec Mon Sep 17 00:00:00 2001 From: Seulgi Kim Date: Mon, 20 May 2019 20:58:35 +0900 Subject: [PATCH] Implement Drop for SnapshotService --- sync/src/snapshot/service.rs | 32 +++++++++++++++++++++++++++----- 1 file changed, 27 insertions(+), 5 deletions(-) diff --git a/sync/src/snapshot/service.rs b/sync/src/snapshot/service.rs index e076dc3661..615f19f808 100644 --- a/sync/src/snapshot/service.rs +++ b/sync/src/snapshot/service.rs @@ -14,13 +14,16 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +use std::collections::HashMap; use std::io::ErrorKind; use std::path::PathBuf; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; -use std::thread::spawn; +use std::thread::{spawn, JoinHandle}; use ccore::{BlockChainClient, BlockChainTrait, BlockId, ChainNotify, Client, DatabaseClient}; use ctypes::BlockHash; +use parking_lot::Mutex; use super::error::Error; use super::snapshot::{Snapshot, WriteSnapshot}; @@ -31,6 +34,8 @@ pub struct Service { root_dir: String, /// Snapshot creation period in unit of block numbers period: u64, + thread_ids: AtomicUsize, + joins: Arc>>>, } impl Service { @@ -39,6 +44,8 @@ impl Service { client, root_dir, period, + thread_ids: AtomicUsize::new(0), + joins: Default::default(), }) } } @@ -66,11 +73,26 @@ impl ChainNotify for Service { let path: PathBuf = [self.root_dir.clone(), format!("{:x}", *header.hash())].iter().collect(); let root = header.state_root(); // FIXME: The db can be corrupted because the CodeChain doesn't wait child threads end on exit. - spawn(move || match Snapshot::try_new(path).map(|s| s.write_snapshot(db.as_ref(), &root)) { - Ok(_) => {} - Err(Error::FileError(ErrorKind::AlreadyExists)) => {} - Err(e) => cerror!(SNAPSHOT, "{}", e), + let id = self.thread_ids.fetch_add(1, Ordering::SeqCst); + let joins = Arc::clone(&self.joins); + let join = spawn(move || { + match Snapshot::try_new(path).map(|s| s.write_snapshot(db.as_ref(), &root)) { + Ok(_) => {} + Err(Error::FileError(ErrorKind::AlreadyExists)) => {} + Err(e) => cerror!(SNAPSHOT, "{}", e), + } + joins.lock().remove(&id); }); + self.joins.lock().insert(id, join); + } + } +} + +impl Drop for Service { + fn drop(&mut self) { + let mut joins = self.joins.lock(); + for (_, join) in joins.drain() { + join.join().unwrap(); } } }