Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions sync/src/snapshot/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

use std::collections::HashMap;
use std::io::ErrorKind;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::thread::spawn;
use std::thread::{spawn, JoinHandle};

use ccore::{BlockChainClient, BlockChainTrait, BlockId, ChainNotify, Client, DatabaseClient};
use ctypes::BlockHash;
use parking_lot::Mutex;

use super::error::Error;
use super::snapshot::{Snapshot, WriteSnapshot};
Expand All @@ -31,6 +34,8 @@ pub struct Service {
root_dir: String,
/// Snapshot creation period in unit of block numbers
period: u64,
thread_ids: AtomicUsize,
joins: Arc<Mutex<HashMap<usize, JoinHandle<()>>>>,
}

impl Service {
Expand All @@ -39,6 +44,8 @@ impl Service {
client,
root_dir,
period,
thread_ids: AtomicUsize::new(0),
joins: Default::default(),
})
}
}
Expand Down Expand Up @@ -66,11 +73,26 @@ impl ChainNotify for Service {
let path: PathBuf = [self.root_dir.clone(), format!("{:x}", *header.hash())].iter().collect();
let root = header.state_root();
// FIXME: The db can be corrupted because the CodeChain doesn't wait child threads end on exit.
spawn(move || match Snapshot::try_new(path).map(|s| s.write_snapshot(db.as_ref(), &root)) {
Ok(_) => {}
Err(Error::FileError(ErrorKind::AlreadyExists)) => {}
Err(e) => cerror!(SNAPSHOT, "{}", e),
let id = self.thread_ids.fetch_add(1, Ordering::SeqCst);
let joins = Arc::clone(&self.joins);
let join = spawn(move || {
match Snapshot::try_new(path).map(|s| s.write_snapshot(db.as_ref(), &root)) {
Ok(_) => {}
Err(Error::FileError(ErrorKind::AlreadyExists)) => {}
Err(e) => cerror!(SNAPSHOT, "{}", e),
}
joins.lock().remove(&id);
});
self.joins.lock().insert(id, join);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There might be a race condition where the spawned thread exits first, so joins.remove() is called first and joins.insert() is called later.

How about cleaning it lazily?

fn new_blocks() {
	...
	let arc = Arc::new(());
	let weak = Arc::downgrade(&arc);
	let join = spawn(move || {
		let arc = arc; // If the stack dies, the arc dies. 
	    ...
	}
	let mut joins = joins.lock();
	joins.push((weak, join));
	joins.retain(|(weak, _)| Weak::upgrade(weak).is_some());
	...
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it can be solved by locking self.joins in the upper line.

let joins = Arc::clone(&self.joins);
let self_join = self.joins.lock();
let join = spawn(move || { ... });
self_join.insert(id, join);

The variable name is a bit awkward, but I think you'll get the point.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMO, @remagpie's looks simple. @foriequal0 how about it?

}
}
}

impl Drop for Service {
fn drop(&mut self) {
let mut joins = self.joins.lock();
for (_, join) in joins.drain() {
join.join().unwrap();
}
}
}