1717 */
1818
1919use crate :: alerts:: Alerts ;
20- use crate :: metadata:: Stats ;
20+ use crate :: metadata:: { Stats , STREAM_INFO } ;
2121use crate :: option:: CONFIG ;
2222use crate :: query:: Query ;
2323use crate :: utils;
@@ -32,7 +32,7 @@ use std::fmt::Debug;
3232use std:: fs;
3333use std:: io;
3434use std:: iter:: Iterator ;
35- use std:: path:: Path ;
35+ use std:: path:: { Path , PathBuf } ;
3636
3737extern crate walkdir;
3838use walkdir:: WalkDir ;
@@ -72,34 +72,31 @@ pub trait ObjectStorage: Sync + 'static {
7272 return Ok ( ( ) ) ;
7373 }
7474
75- let entries = fs:: read_dir ( & CONFIG . parseable . local_disk_path ) ?
76- . map ( |res| res. map ( |e| e. path ( ) ) )
77- . collect :: < Result < Vec < _ > , io:: Error > > ( ) ?;
75+ let streams = STREAM_INFO . list_streams ( ) ;
7876
7977 // entries here means all the streams present on local disk
80- for entry in entries {
81- let path = entry. into_os_string ( ) . into_string ( ) . unwrap ( ) ;
82- let init_sync = StorageSync :: new ( path) ;
78+ for stream in streams {
79+ let sync = StorageSync :: new ( stream. clone ( ) ) ;
8380
8481 // if data.parquet file not present, skip this stream
85- if !init_sync . parquet_path_exists ( ) {
82+ if !sync . dir . parquet_path_exists ( ) {
8683 continue ;
8784 }
8885
89- let dir = init_sync. get_dir_name ( ) ;
90- if let Err ( e) = dir. create_dir_name_tmp ( ) {
86+ if let Err ( e) = sync. dir . create_temp_dir ( ) {
9187 log:: error!(
92- "Error copying parquet file {} due to error [{}]" ,
93- dir . parquet_path ,
88+ "Error creating tmp directory for {} due to error [{}]" ,
89+ & stream ,
9490 e
9591 ) ;
9692 continue ;
9793 }
9894
99- if let Err ( e) = dir . move_parquet_to_tmp ( ) {
95+ if let Err ( e) = sync . move_parquet_to_temp ( ) {
10096 log:: error!(
101- "Error copying parquet from stream dir to tmp in path {} due to error [{}]" ,
102- dir. dir_name_local,
97+ "Error copying parquet from stream directory in [{}] to tmp directory [{}] due to error [{}]" ,
98+ sync. dir. data_path. to_string_lossy( ) ,
99+ sync. dir. temp_dir. to_string_lossy( ) ,
103100 e
104101 ) ;
105102 continue ;
@@ -114,35 +111,30 @@ pub trait ObjectStorage: Sync + 'static {
114111 return Ok ( ( ) ) ;
115112 }
116113
117- let entries = fs:: read_dir ( & CONFIG . parseable . local_disk_path ) ?
118- . map ( |res| res. map ( |e| e. path ( ) ) )
119- . collect :: < Result < Vec < _ > , io:: Error > > ( ) ?;
114+ let streams = STREAM_INFO . list_streams ( ) ;
120115
121- for entry in entries {
122- let path = entry. into_os_string ( ) . into_string ( ) . unwrap ( ) ;
123- let init_sync = StorageSync :: new ( path) ;
116+ for stream in streams {
117+ let dir = StorageDir :: new ( stream. clone ( ) ) ;
124118
125- let dir = init_sync . get_dir_name ( ) ;
126-
127- for file in WalkDir :: new ( & format ! ( "{}/tmp" , & dir . dir_name_local ) )
119+ for file in WalkDir :: new ( dir . temp_dir )
120+ . min_depth ( 1 )
121+ . max_depth ( 1 )
128122 . into_iter ( )
129123 . filter_map ( |file| file. ok ( ) )
124+ . map ( |file| file. path ( ) . to_path_buf ( ) )
125+ . filter ( |file| file. is_file ( ) )
130126 {
131- if file. metadata ( ) . unwrap ( ) . is_file ( ) {
132- let file_local = format ! ( "{}" , file. path( ) . display( ) ) ;
133- let file_s3 = file_local. replace ( "/tmp" , "" ) ;
134- let final_s3_path =
135- file_s3. replace ( & format ! ( "{}/" , CONFIG . parseable. local_disk_path) , "" ) ;
136- let f_path = str:: replace ( & final_s3_path, "." , "/" ) ;
137- let f_new_path = f_path. replace ( "/parquet" , ".parquet" ) ;
138- let _put_parquet_file = self . upload_file ( & f_new_path, & file_local) . await ?;
139- if let Err ( e) = dir. delete_parquet_file ( file_local. clone ( ) ) {
140- log:: error!(
141- "Error deleting parquet file in path {} due to error [{}]" ,
142- file_local,
143- e
144- ) ;
145- }
127+ let filename = file. file_name ( ) . unwrap ( ) . to_str ( ) . unwrap ( ) ;
128+ let file_suffix = str:: replacen ( filename, "." , "/" , 3 ) ;
129+ let s3_path = format ! ( "{}/{}" , stream, file_suffix) ;
130+
131+ let _put_parquet_file = self . upload_file ( & s3_path, file. to_str ( ) . unwrap ( ) ) . await ?;
132+ if let Err ( e) = fs:: remove_file ( & file) {
133+ log:: error!(
134+ "Error deleting parquet file in path {} due to error [{}]" ,
135+ file. to_string_lossy( ) ,
136+ e
137+ ) ;
146138 }
147139 }
148140 }
@@ -156,82 +148,59 @@ pub struct LogStream {
156148}
157149
158150#[ derive( Debug ) ]
159- struct DirName {
160- dir_name_tmp_local : String ,
161- dir_name_local : String ,
162- parquet_path : String ,
163- parquet_file_local : String ,
151+ struct StorageDir {
152+ pub data_path : PathBuf ,
153+ pub temp_dir : PathBuf ,
164154}
165155
166- impl DirName {
167- fn move_parquet_to_tmp ( & self ) -> io:: Result < ( ) > {
168- fs:: rename (
169- & self . parquet_path ,
170- format ! ( "{}/{}" , self . dir_name_tmp_local, self . parquet_file_local) ,
171- )
156+ impl StorageDir {
157+ fn new ( stream_name : String ) -> Self {
158+ let data_path = CONFIG . parseable . local_stream_data_path ( & stream_name) ;
159+ let temp_dir = data_path. join ( "tmp" ) ;
160+
161+ Self {
162+ data_path,
163+ temp_dir,
164+ }
172165 }
173166
174- fn create_dir_name_tmp ( & self ) -> io:: Result < ( ) > {
175- fs:: create_dir_all ( & self . dir_name_tmp_local )
167+ fn create_temp_dir ( & self ) -> io:: Result < ( ) > {
168+ fs:: create_dir_all ( & self . temp_dir )
176169 }
177170
178- fn delete_parquet_file ( & self , path : String ) -> io:: Result < ( ) > {
179- fs:: remove_file ( path)
171+ fn move_parquet_to_temp ( & self , filename : String ) -> io:: Result < ( ) > {
172+ fs:: rename (
173+ self . data_path . join ( "data.parquet" ) ,
174+ self . temp_dir . join ( filename) ,
175+ )
176+ }
177+
178+ fn parquet_path_exists ( & self ) -> bool {
179+ self . data_path . join ( "data.parquet" ) . exists ( )
180180 }
181181}
182182
183183struct StorageSync {
184- path : String ,
184+ pub dir : StorageDir ,
185185 time : chrono:: DateTime < Utc > ,
186186}
187187
188188impl StorageSync {
189- fn new ( path : String ) -> Self {
190- Self {
191- path,
192- time : Utc :: now ( ) ,
193- }
189+ fn new ( stream_name : String ) -> Self {
190+ let dir = StorageDir :: new ( stream_name) ;
191+ let time = Utc :: now ( ) ;
192+ Self { dir, time }
194193 }
195194
196- fn parquet_path_exists ( & self ) -> bool {
197- let new_parquet_path = format ! ( "{}/data.parquet" , & self . path) ;
198-
199- Path :: new ( & new_parquet_path) . exists ( )
200- }
201-
202- fn get_dir_name ( & self ) -> DirName {
203- let local_path = format ! ( "{}/" , CONFIG . parseable. local_disk_path) ;
204- let _storage_path = format ! ( "{}/" , CONFIG . storage. bucket_name( ) ) ;
205- let stream_name = self . path . replace ( & local_path, "" ) ;
206- let parquet_path = format ! ( "{}/data.parquet" , self . path) ;
207- // subtract OBJECT_STORE_DATA_GRANULARITY from current time here,
208- // this is because, when we're creating this file
209- // the data in the file is from OBJECT_STORE_DATA_GRANULARITY time ago.
195+ fn move_parquet_to_temp ( & self ) -> io:: Result < ( ) > {
210196 let time = self . time - Duration :: minutes ( OBJECT_STORE_DATA_GRANULARITY as i64 ) ;
211197 let uri = utils:: date_to_prefix ( time. date ( ) )
212198 + & utils:: hour_to_prefix ( time. hour ( ) )
213199 + & utils:: minute_to_prefix ( time. minute ( ) , OBJECT_STORE_DATA_GRANULARITY ) . unwrap ( ) ;
214-
215200 let local_uri = str:: replace ( & uri, "/" , "." ) ;
216-
217- let dir_name_tmp_local = format ! ( "{}{}/tmp" , local_path, stream_name) ;
218-
219- let storage_dir_name_s3 = format ! ( "{}/{}" , stream_name, uri) ;
220-
221- let random_string = utils:: random_string ( ) ;
222-
223- let parquet_file_local = format ! ( "{}{}.parquet" , local_uri, random_string) ;
224-
225- let _parquet_file_s3 = format ! ( "{}{}.parquet" , storage_dir_name_s3, random_string) ;
226-
227- let dir_name_local = local_path + & stream_name;
228-
229- DirName {
230- dir_name_tmp_local,
231- dir_name_local,
232- parquet_path,
233- parquet_file_local,
234- }
201+ let hostname = utils:: hostname_unchecked ( ) ;
202+ let parquet_file_local = format ! ( "{}{}.data.parquet" , local_uri, hostname) ;
203+ self . dir . move_parquet_to_temp ( parquet_file_local)
235204 }
236205}
237206
0 commit comments