1414// You should have received a copy of the GNU Affero General Public License
1515// along with this program. If not, see <https://www.gnu.org/licenses/>.
1616
17- use std:: fs:: { create_dir_all, File } ;
17+
18+ use std:: fs;
19+ use std:: ops:: Deref ;
1820use std:: path:: PathBuf ;
21+ use std:: str:: FromStr ;
1922use std:: sync:: Arc ;
2023use std:: thread:: { spawn, JoinHandle } ;
2124
2225use ccore:: snapshot_notify:: { NotifyReceiverSource , ReceiverCanceller } ;
23- use ccore:: { BlockChainTrait , BlockId , Client } ;
26+ use ccore:: { BlockChainClient , BlockChainTrait , BlockId , Client } ;
2427use cmerkle:: snapshot:: { ChunkCompressor , Error as SnapshotError , Snapshot } ;
2528use ctypes:: BlockHash ;
2629use hashdb:: { AsHashDB , HashDB } ;
2730use primitives:: H256 ;
28- use std:: ops:: Deref ;
2931
3032pub struct Service {
3133 join_handle : Option < JoinHandle < ( ) > > ,
@@ -46,7 +48,12 @@ pub fn snapshot_path(root_dir: &str, block: &BlockHash, chunk_root: &H256) -> Pa
4648}
4749
4850impl Service {
49- pub fn new ( client : Arc < Client > , notify_receiver_source : NotifyReceiverSource , root_dir : String ) -> Self {
51+ pub fn new (
52+ client : Arc < Client > ,
53+ notify_receiver_source : NotifyReceiverSource ,
54+ root_dir : String ,
55+ expiration : Option < u64 > ,
56+ ) -> Self {
5057 let NotifyReceiverSource ( canceller, receiver) = notify_receiver_source;
5158 let join_handle = spawn ( move || {
5259 cinfo ! ( SYNC , "Snapshot service is on" ) ;
@@ -58,19 +65,26 @@ impl Service {
5865 cerror ! ( SYNC , "There isn't corresponding header for the requested block hash: {}" , block_hash, ) ;
5966 continue
6067 } ;
61- let db_lock = client. state_db ( ) . read ( ) ;
62- if let Some ( err) = snapshot ( db_lock. as_hashdb ( ) , block_hash, state_root, & root_dir) . err ( ) {
63- cerror ! (
64- SYNC ,
65- "Snapshot request failed for block: {}, chunk_root: {}, err: {}" ,
66- block_hash,
67- state_root,
68- err
69- ) ;
70- } else {
71- cinfo ! ( SYNC , "Snapshot is ready for block: {}" , block_hash)
68+ {
69+ let db_lock = client. state_db ( ) . read ( ) ;
70+ if let Err ( err) = snapshot ( db_lock. as_hashdb ( ) , block_hash, state_root, & root_dir) {
71+ cerror ! (
72+ SYNC ,
73+ "Snapshot request failed for block: {}, chunk_root: {}, err: {}" ,
74+ block_hash,
75+ state_root,
76+ err
77+ ) ;
78+ } else {
79+ cinfo ! ( SYNC , "Snapshot is ready for block: {}" , block_hash)
80+ }
81+ }
82+
83+ if let Some ( expiration) = expiration {
84+ if let Err ( err) = cleanup_expired ( & client, & root_dir, expiration) {
85+ cerror ! ( SYNC , "Snapshot cleanup error after block hash {}, err: {}" , block_hash, err) ;
86+ }
7287 }
73- // TODO: Prune old snapshots
7488 }
7589 cinfo ! ( SYNC , "Snapshot service is stopped" )
7690 } ) ;
@@ -84,18 +98,80 @@ impl Service {
8498
8599fn snapshot ( db : & dyn HashDB , block_hash : BlockHash , chunk_root : H256 , root_dir : & str ) -> Result < ( ) , SnapshotError > {
86100 let snapshot_dir = snapshot_dir ( root_dir, & block_hash) ;
87- create_dir_all ( snapshot_dir) ?;
101+ fs :: create_dir_all ( snapshot_dir) ?;
88102
89103 for chunk in Snapshot :: from_hashdb ( db, chunk_root) {
90104 let chunk_path = snapshot_path ( root_dir, & block_hash, & chunk. root ) ;
91- let chunk_file = File :: create ( chunk_path) ?;
105+ let chunk_file = fs :: File :: create ( chunk_path) ?;
92106 let compressor = ChunkCompressor :: new ( chunk_file) ;
93107 compressor. compress_chunk ( & chunk) ?;
94108 }
95109
96110 Ok ( ( ) )
97111}
98112
113+ fn cleanup_expired ( client : & Client , root_dir : & str , expiration : u64 ) -> Result < ( ) , SnapshotError > {
114+ for entry in fs:: read_dir ( root_dir) ? {
115+ let entry = match entry {
116+ Ok ( entry) => entry,
117+ Err ( err) => {
118+ cerror ! ( SYNC , "Snapshot cleanup can't retrieve entry. err: {}" , err) ;
119+ continue
120+ }
121+ } ;
122+ let path = entry. path ( ) ;
123+
124+ match entry. file_type ( ) . map ( |x| x. is_dir ( ) ) {
125+ Ok ( true ) => { }
126+ Ok ( false ) => continue ,
127+ Err ( err) => {
128+ cerror ! ( SYNC , "Snapshot cleanup can't retrieve file info: {}, err: {}" , path. to_string_lossy( ) , err) ;
129+ continue
130+ }
131+ }
132+
133+ let name = match path. file_name ( ) . expect ( "Directories always have file name" ) . to_str ( ) {
134+ Some ( n) => n,
135+ None => continue ,
136+ } ;
137+ let hash = match H256 :: from_str ( name) {
138+ Ok ( h) => BlockHash :: from ( h) ,
139+ Err ( _) => continue ,
140+ } ;
141+ let number = if let Some ( number) = client. block_number ( & BlockId :: Hash ( hash) ) {
142+ number
143+ } else {
144+ cerror ! ( SYNC , "Snapshot cleanup can't retrieve block number for block_hash: {}" , hash) ;
145+ continue
146+ } ;
147+
148+ if number + expiration < client. best_block_header ( ) . number ( ) {
149+ cleanup_snapshot ( root_dir, hash)
150+ }
151+ }
152+ Ok ( ( ) )
153+ }
154+
155+ /// Remove all files in `root_dir/block_hash`
156+ fn cleanup_snapshot ( root_dir : & str , block_hash : BlockHash ) {
157+ let path = snapshot_dir ( root_dir, & block_hash) ;
158+ let rename_to = PathBuf :: from ( root_dir) . join ( format ! ( "{:x}.old" , block_hash. deref( ) ) ) ;
159+ // It is okay to ignore errors. We just wanted them to be removed.
160+ match fs:: rename ( path, & rename_to) {
161+ Ok ( ( ) ) => { }
162+ Err ( err) => {
163+ cerror ! ( SYNC , "Snapshot cleanup: renaming {} failed, reason: {}" , block_hash, err) ;
164+ }
165+ }
166+ // Ignore the error. Cleanup failure is not a critical error.
167+ match fs:: remove_dir_all ( rename_to) {
168+ Ok ( ( ) ) => { }
169+ Err ( err) => {
170+ cerror ! ( SYNC , "Snapshot cleanup: removing {} failed, reason: {}" , block_hash, err) ;
171+ }
172+ }
173+ }
174+
99175impl Drop for Service {
100176 fn drop ( & mut self ) {
101177 if let Some ( canceller) = self . canceller . take ( ) {
0 commit comments