1414// You should have received a copy of the GNU Affero General Public License
1515// along with this program. If not, see <https://www.gnu.org/licenses/>.
1616
17- use std:: fs:: { create_dir_all, File } ;
17+
18+ use std:: fs;
1819use std:: path:: PathBuf ;
20+ use std:: str:: FromStr ;
1921use std:: sync:: Arc ;
2022use std:: thread:: { spawn, JoinHandle } ;
2123
2224use ccore:: snapshot_notify:: { NotifyReceiverSource , ReceiverCanceller } ;
23- use ccore:: { BlockChainTrait , BlockId , Client } ;
25+ use ccore:: { BlockChainClient , BlockChainTrait , BlockId , Client } ;
2426use cdb:: { AsHashDB , HashDB } ;
2527use cmerkle:: snapshot:: { ChunkCompressor , Error as SnapshotError , Snapshot } ;
2628use ctypes:: BlockHash ;
2729use primitives:: H256 ;
28- use std:: ops:: Deref ;
2930
3031pub struct Service {
3132 join_handle : Option < JoinHandle < ( ) > > ,
3233 canceller : Option < ReceiverCanceller > ,
3334}
3435
36+ pub fn snapshot_dir ( root_dir : & str , block : & BlockHash ) -> PathBuf {
37+ let mut path = PathBuf :: new ( ) ;
38+ path. push ( root_dir) ;
39+ path. push ( format ! ( "{:x}" , * * block) ) ;
40+ path
41+ }
42+
43+ pub fn snapshot_path ( root_dir : & str , block : & BlockHash , chunk_root : & H256 ) -> PathBuf {
44+ let mut path = snapshot_dir ( root_dir, block) ;
45+ path. push ( format ! ( "{:x}" , chunk_root) ) ;
46+ path
47+ }
48+
3549impl Service {
36- pub fn new ( client : Arc < Client > , notify_receiver_source : NotifyReceiverSource , root_dir : String ) -> Self {
50+ pub fn new (
51+ client : Arc < Client > ,
52+ notify_receiver_source : NotifyReceiverSource ,
53+ root_dir : String ,
54+ expiration : Option < u64 > ,
55+ ) -> Self {
3756 let NotifyReceiverSource ( canceller, receiver) = notify_receiver_source;
3857 let join_handle = spawn ( move || {
3958 cinfo ! ( SYNC , "Snapshot service is on" ) ;
@@ -45,17 +64,25 @@ impl Service {
4564 cerror ! ( SYNC , "There isn't corresponding header for the requested block hash: {}" , block_hash, ) ;
4665 continue
4766 } ;
48- let db_lock = client. state_db ( ) . read ( ) ;
49- if let Some ( err) = snapshot ( db_lock. as_hashdb ( ) , block_hash, state_root, & root_dir) . err ( ) {
50- cerror ! (
51- SYNC ,
52- "Snapshot request failed for block: {}, chunk_root: {}, err: {}" ,
53- block_hash,
54- state_root,
55- err
56- ) ;
57- } else {
58- cinfo ! ( SYNC , "Snapshot is ready for block: {}" , block_hash)
67+ {
68+ let db_lock = client. state_db ( ) . read ( ) ;
69+ if let Err ( err) = snapshot ( db_lock. as_hashdb ( ) , block_hash, state_root, & root_dir) {
70+ cerror ! (
71+ SYNC ,
72+ "Snapshot request failed for block: {}, chunk_root: {}, err: {}" ,
73+ block_hash,
74+ state_root,
75+ err
76+ ) ;
77+ } else {
78+ cinfo ! ( SYNC , "Snapshot is ready for block: {}" , block_hash)
79+ }
80+ }
81+
82+ if let Some ( expiration) = expiration {
83+ if let Err ( err) = cleanup_expired ( & client, & root_dir, expiration) {
84+ cerror ! ( SYNC , "Snapshot cleanup error after block hash {}, err: {}" , block_hash, err) ;
85+ }
5986 }
6087 // TODO: Prune old snapshots
6188 }
@@ -70,25 +97,81 @@ impl Service {
7097}
7198
7299fn snapshot ( db : & dyn HashDB , block_hash : BlockHash , chunk_root : H256 , root_dir : & str ) -> Result < ( ) , SnapshotError > {
73- let snapshot_dir = {
74- let mut res = PathBuf :: new ( ) ;
75- res. push ( root_dir) ;
76- res. push ( format ! ( "{:x}" , block_hash. deref( ) ) ) ;
77- res
78- } ;
79- create_dir_all ( & snapshot_dir) ?;
100+ let snapshot_dir = snapshot_dir ( root_dir, & block_hash) ;
101+ fs:: create_dir_all ( snapshot_dir) ?;
80102
81103 for chunk in Snapshot :: from_hashdb ( db, chunk_root) {
82- let mut chunk_path = snapshot_dir. clone ( ) ;
83- chunk_path. push ( format ! ( "{:x}" , chunk. root) ) ;
84- let chunk_file = File :: create ( chunk_path) ?;
104+ let chunk_path = snapshot_path ( root_dir, & block_hash, & chunk. root ) ;
105+ let chunk_file = fs:: File :: create ( chunk_path) ?;
85106 let compressor = ChunkCompressor :: new ( chunk_file) ;
86107 compressor. compress_chunk ( & chunk) ?;
87108 }
88109
89110 Ok ( ( ) )
90111}
91112
113+ fn cleanup_expired ( client : & Client , root_dir : & str , expiration : u64 ) -> Result < ( ) , SnapshotError > {
114+ for entry in fs:: read_dir ( root_dir) ? {
115+ let entry = match entry {
116+ Ok ( entry) => entry,
117+ Err ( err) => {
118+ cerror ! ( SYNC , "Snapshot cleanup can't retrieve entry. err: {}" , err) ;
119+ continue
120+ }
121+ } ;
122+ let path = entry. path ( ) ;
123+
124+ match entry. file_type ( ) . map ( |x| x. is_dir ( ) ) {
125+ Ok ( true ) => { }
126+ Ok ( false ) => continue ,
127+ Err ( err) => {
128+ cerror ! ( SYNC , "Snapshot cleanup can't retrieve file info: {}, err: {}" , path. to_string_lossy( ) , err) ;
129+ continue
130+ }
131+ }
132+
133+ let name = match path. file_name ( ) . expect ( "Directories always have file name" ) . to_str ( ) {
134+ Some ( n) => n,
135+ None => continue ,
136+ } ;
137+ let hash = match H256 :: from_str ( name) {
138+ Ok ( h) => BlockHash :: from ( h) ,
139+ Err ( _) => continue ,
140+ } ;
141+ let number = if let Some ( number) = client. block_number ( & BlockId :: Hash ( hash) ) {
142+ number
143+ } else {
144+ cerror ! ( SYNC , "Snapshot cleanup can't retrieve block number for block_hash: {}" , hash) ;
145+ continue
146+ } ;
147+
148+ if number + expiration < client. best_block_header ( ) . number ( ) {
149+ cleanup_snapshot ( root_dir, hash)
150+ }
151+ }
152+ Ok ( ( ) )
153+ }
154+
155+ /// Remove all files in `root_dir/block_hash`
156+ fn cleanup_snapshot ( root_dir : & str , block_hash : BlockHash ) {
157+ let path = snapshot_dir ( root_dir, & block_hash) ;
158+ let rename_to = PathBuf :: from ( root_dir) . join ( format ! ( "{:x}.old" , * block_hash) ) ;
159+ // It is okay to ignore errors. We just wanted them to be removed.
160+ match fs:: rename ( path, & rename_to) {
161+ Ok ( ( ) ) => { }
162+ Err ( err) => {
163+ cerror ! ( SYNC , "Snapshot cleanup: renaming {} failed, reason: {}" , block_hash, err) ;
164+ }
165+ }
166+ // Ignore the error. Cleanup failure is not a critical error.
167+ match fs:: remove_dir_all ( rename_to) {
168+ Ok ( ( ) ) => { }
169+ Err ( err) => {
170+ cerror ! ( SYNC , "Snapshot cleanup: removing {} failed, reason: {}" , block_hash, err) ;
171+ }
172+ }
173+ }
174+
92175impl Drop for Service {
93176 fn drop ( & mut self ) {
94177 if let Some ( canceller) = self . canceller . take ( ) {
0 commit comments