@@ -21,16 +21,19 @@ mod metadata_migration;
2121mod schema_migration;
2222mod stream_metadata_migration;
2323
24- use std:: fs:: OpenOptions ;
24+ use std:: { fs:: OpenOptions , sync :: Arc } ;
2525
2626use bytes:: Bytes ;
27+ use itertools:: Itertools ;
2728use relative_path:: RelativePathBuf ;
2829use serde:: Serialize ;
2930
3031use crate :: {
3132 option:: Config ,
3233 storage:: {
33- object_storage:: { parseable_json_path, stream_json_path} , ObjectStorage , ObjectStorageError , SCHEMA_FILE_NAME ,
34+ object_storage:: { parseable_json_path, stream_json_path} ,
35+ ObjectStorage , ObjectStorageError , PARSEABLE_METADATA_FILE_NAME , PARSEABLE_ROOT_DIRECTORY ,
36+ SCHEMA_FILE_NAME , STREAM_ROOT_DIRECTORY ,
3437 } ,
3538} ;
3639
@@ -120,7 +123,8 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
120123 . put_object ( & path, to_bytes ( & new_stream_metadata) )
121124 . await ?;
122125
123- let schema_path = RelativePathBuf :: from_iter ( [ stream, SCHEMA_FILE_NAME ] ) ;
126+ let schema_path =
127+ RelativePathBuf :: from_iter ( [ stream, STREAM_ROOT_DIRECTORY , SCHEMA_FILE_NAME ] ) ;
124128 let schema = storage. get_object ( & schema_path) . await ?;
125129 let schema = serde_json:: from_slice ( & schema) . ok ( ) ;
126130 let map = schema_migration:: v1_v3 ( schema) ?;
@@ -132,7 +136,8 @@ async fn migration_stream(stream: &str, storage: &dyn ObjectStorage) -> anyhow::
132136 . put_object ( & path, to_bytes ( & new_stream_metadata) )
133137 . await ?;
134138
135- let schema_path = RelativePathBuf :: from_iter ( [ stream, SCHEMA_FILE_NAME ] ) ;
139+ let schema_path =
140+ RelativePathBuf :: from_iter ( [ stream, STREAM_ROOT_DIRECTORY , SCHEMA_FILE_NAME ] ) ;
136141 let schema = storage. get_object ( & schema_path) . await ?;
137142 let schema = serde_json:: from_slice ( & schema) ?;
138143 let map = schema_migration:: v2_v3 ( schema) ?;
@@ -195,7 +200,6 @@ pub async fn put_remote_metadata(
195200
196201pub fn put_staging_metadata ( config : & Config , metadata : & serde_json:: Value ) -> anyhow:: Result < ( ) > {
197202 let path = parseable_json_path ( ) . to_path ( config. staging_dir ( ) ) ;
198- //config.staging_dir().join(PARSEABLE_METADATA_FILE_NAME);
199203 let mut file = OpenOptions :: new ( )
200204 . create ( true )
201205 . truncate ( true )
@@ -204,3 +208,92 @@ pub fn put_staging_metadata(config: &Config, metadata: &serde_json::Value) -> an
204208 serde_json:: to_writer ( & mut file, metadata) ?;
205209 Ok ( ( ) )
206210}
211+
212+ pub async fn run_file_migration ( config : & Config ) -> anyhow:: Result < ( ) > {
213+ let object_store = config. storage ( ) . get_object_store ( ) ;
214+
215+ let old_meta_file_path = RelativePathBuf :: from ( PARSEABLE_METADATA_FILE_NAME ) ;
216+
217+ // if this errors that means migrations is already done
218+ if let Err ( err) = object_store. get_object ( & old_meta_file_path) . await {
219+ if matches ! ( err, ObjectStorageError :: NoSuchKey ( _) ) {
220+ return Ok ( ( ) ) ;
221+ }
222+ return Err ( err. into ( ) ) ;
223+ }
224+
225+ run_meta_file_migration ( & object_store, old_meta_file_path) . await ?;
226+ run_stream_files_migration ( object_store) . await ?;
227+
228+ Ok ( ( ) )
229+ }
230+
231+ async fn run_meta_file_migration (
232+ object_store : & Arc < dyn ObjectStorage + Send > ,
233+ old_meta_file_path : RelativePathBuf ,
234+ ) -> anyhow:: Result < ( ) > {
235+ log:: info!( "Migrating metadata files to new location" ) ;
236+
237+ // get the list of all meta files
238+ let mut meta_files = object_store. get_ingester_meta_file_paths ( ) . await ?;
239+ meta_files. push ( old_meta_file_path) ;
240+
241+ for file in meta_files {
242+ match object_store. get_object ( & file) . await {
243+ Ok ( bytes) => {
244+ // we can unwrap here because we know the file exists
245+ let new_path = RelativePathBuf :: from_iter ( [
246+ PARSEABLE_ROOT_DIRECTORY ,
247+ file. file_name ( ) . unwrap ( ) ,
248+ ] ) ;
249+ object_store. put_object ( & new_path, bytes) . await ?;
250+ object_store. delete_object ( & file) . await ?;
251+ }
252+ Err ( err) => {
253+ // if error is not a no such key error, something weird happened
254+ // so return the error
255+ if !matches ! ( err, ObjectStorageError :: NoSuchKey ( _) ) {
256+ return Err ( err. into ( ) ) ;
257+ }
258+ }
259+ }
260+ }
261+
262+ Ok ( ( ) )
263+ }
264+
265+ async fn run_stream_files_migration (
266+ object_store : Arc < dyn ObjectStorage + Send > ,
267+ ) -> anyhow:: Result < ( ) > {
268+ let streams = object_store
269+ . list_old_streams ( )
270+ . await ?
271+ . into_iter ( )
272+ . map ( |stream| stream. name )
273+ . collect_vec ( ) ;
274+
275+ for stream in streams {
276+ let paths = object_store. get_stream_file_paths ( & stream) . await ?;
277+
278+ for path in paths {
279+ match object_store. get_object ( & path) . await {
280+ Ok ( bytes) => {
281+ let new_path = RelativePathBuf :: from_iter ( [
282+ stream. as_str ( ) ,
283+ STREAM_ROOT_DIRECTORY ,
284+ path. file_name ( ) . unwrap ( ) ,
285+ ] ) ;
286+ object_store. put_object ( & new_path, bytes) . await ?;
287+ object_store. delete_object ( & path) . await ?;
288+ }
289+ Err ( err) => {
290+ if !matches ! ( err, ObjectStorageError :: NoSuchKey ( _) ) {
291+ return Err ( err. into ( ) ) ;
292+ }
293+ }
294+ }
295+ }
296+ }
297+
298+ Ok ( ( ) )
299+ }
0 commit comments