@@ -153,122 +153,153 @@ async fn migration_stream(
153153) -> anyhow:: Result < Option < LogStreamMetadata > > {
154154 let mut arrow_schema: Schema = Schema :: empty ( ) ;
155155
156- //check if schema exists for the node
157- //if not, create schema from querier schema from storage
158- //if not present with querier, create schema from ingestor schema from storage
156+ let schema = fetch_or_create_schema ( stream, storage) . await ?;
157+ let stream_metadata = fetch_or_create_stream_metadata ( stream, storage) . await ?;
158+
159+ let mut stream_meta_found = true ;
160+ if stream_metadata. is_empty ( ) {
161+ if PARSEABLE . options . mode != Mode :: Ingest {
162+ return Ok ( None ) ;
163+ }
164+ stream_meta_found = false ;
165+ }
166+
167+ let mut stream_metadata_value = Value :: Null ;
168+ if stream_meta_found {
169+ stream_metadata_value =
170+ serde_json:: from_slice ( & stream_metadata) . expect ( "stream.json is valid json" ) ;
171+ stream_metadata_value =
172+ migrate_stream_metadata ( stream_metadata_value, stream, storage, & schema) . await ?;
173+ }
174+
175+ if arrow_schema. fields ( ) . is_empty ( ) {
176+ arrow_schema = serde_json:: from_slice ( & schema) ?;
177+ }
178+
179+ let metadata =
180+ setup_logstream_metadata ( stream, & mut arrow_schema, stream_metadata_value) . await ?;
181+ Ok ( Some ( metadata) )
182+ }
183+
184+ async fn fetch_or_create_schema (
185+ stream : & str ,
186+ storage : & dyn ObjectStorage ,
187+ ) -> anyhow:: Result < Bytes > {
159188 let schema_path = schema_path ( stream) ;
160- let schema = if let Ok ( schema) = storage. get_object ( & schema_path) . await {
161- schema
189+ if let Ok ( schema) = storage. get_object ( & schema_path) . await {
190+ Ok ( schema)
162191 } else {
163192 let querier_schema = storage
164193 . create_schema_from_querier ( stream)
165194 . await
166195 . unwrap_or_default ( ) ;
167196 if !querier_schema. is_empty ( ) {
168- querier_schema
197+ Ok ( querier_schema)
169198 } else {
170- storage
199+ Ok ( storage
171200 . create_schema_from_ingestor ( stream)
172201 . await
173- . unwrap_or_default ( )
202+ . unwrap_or_default ( ) )
174203 }
175- } ;
204+ }
205+ }
176206
177- //check if stream.json exists for the node
178- //if not, create stream.json from querier stream.json from storage
179- //if not present with querier, create from ingestor stream.json from storage
207+ async fn fetch_or_create_stream_metadata (
208+ stream : & str ,
209+ storage : & dyn ObjectStorage ,
210+ ) -> anyhow:: Result < Bytes > {
180211 let path = stream_json_path ( stream) ;
181- let stream_metadata = if let Ok ( stream_metadata) = storage. get_object ( & path) . await {
182- stream_metadata
212+ if let Ok ( stream_metadata) = storage. get_object ( & path) . await {
213+ Ok ( stream_metadata)
183214 } else {
184215 let querier_stream = storage
185216 . create_stream_from_querier ( stream)
186217 . await
187218 . unwrap_or_default ( ) ;
188219 if !querier_stream. is_empty ( ) {
189- querier_stream
220+ Ok ( querier_stream)
190221 } else {
191- storage
222+ Ok ( storage
192223 . create_stream_from_ingestor ( stream)
193224 . await
194- . unwrap_or_default ( )
195- }
196- } ;
197-
198- let mut stream_meta_found = true ;
199- if stream_metadata. is_empty ( ) {
200- if PARSEABLE . options . mode != Mode :: Ingest {
201- return Ok ( None ) ;
225+ . unwrap_or_default ( ) )
202226 }
203- stream_meta_found = false ;
204227 }
205- let mut stream_metadata_value = Value :: Null ;
206- if stream_meta_found {
207- stream_metadata_value =
208- serde_json:: from_slice ( & stream_metadata) . expect ( "stream.json is valid json" ) ;
209- let version = stream_metadata_value
210- . as_object ( )
211- . and_then ( |meta| meta. get ( "version" ) )
212- . and_then ( |version| version. as_str ( ) ) ;
228+ }
213229
214- match version {
215- Some ( "v1" ) => {
216- stream_metadata_value = stream_metadata_migration:: v1_v4 ( stream_metadata_value) ;
217- stream_metadata_value =
218- stream_metadata_migration:: v4_v5 ( stream_metadata_value, stream) ;
219- storage
220- . put_object ( & path, to_bytes ( & stream_metadata_value) )
221- . await ?;
222- let schema = serde_json:: from_slice ( & schema) . ok ( ) ;
223- arrow_schema = schema_migration:: v1_v4 ( schema) ?;
224- storage
225- . put_object ( & schema_path, to_bytes ( & arrow_schema) )
226- . await ?;
227- }
228- Some ( "v2" ) => {
229- stream_metadata_value = stream_metadata_migration:: v2_v4 ( stream_metadata_value) ;
230- stream_metadata_value =
231- stream_metadata_migration:: v4_v5 ( stream_metadata_value, stream) ;
232- storage
233- . put_object ( & path, to_bytes ( & stream_metadata_value) )
234- . await ?;
235-
236- let schema = serde_json:: from_slice ( & schema) ?;
237- arrow_schema = schema_migration:: v2_v4 ( schema) ?;
238- storage
239- . put_object ( & schema_path, to_bytes ( & arrow_schema) )
240- . await ?;
241- }
242- Some ( "v3" ) => {
243- stream_metadata_value = stream_metadata_migration:: v3_v4 ( stream_metadata_value) ;
244- stream_metadata_value =
245- stream_metadata_migration:: v4_v5 ( stream_metadata_value, stream) ;
246- storage
247- . put_object ( & path, to_bytes ( & stream_metadata_value) )
248- . await ?;
249- }
250- Some ( "v4" ) => {
251- stream_metadata_value =
252- stream_metadata_migration:: v4_v5 ( stream_metadata_value, stream) ;
253- storage
254- . put_object ( & path, to_bytes ( & stream_metadata_value) )
255- . await ?;
256- }
257- Some ( "v5" ) => {
258- stream_metadata_value = stream_metadata_migration:: v5_v6 ( stream_metadata_value) ;
259- storage
260- . put_object ( & path, to_bytes ( & stream_metadata_value) )
261- . await ?;
262- }
263- _ => ( ) ,
230+ async fn migrate_stream_metadata (
231+ mut stream_metadata_value : Value ,
232+ stream : & str ,
233+ storage : & dyn ObjectStorage ,
234+ schema : & Bytes ,
235+ ) -> anyhow:: Result < Value > {
236+ let path = stream_json_path ( stream) ;
237+ let schema_path = schema_path ( stream) ;
238+
239+ let version = stream_metadata_value
240+ . as_object ( )
241+ . and_then ( |meta| meta. get ( "version" ) )
242+ . and_then ( |version| version. as_str ( ) ) ;
243+
244+ match version {
245+ Some ( "v1" ) => {
246+ stream_metadata_value = stream_metadata_migration:: v1_v4 ( stream_metadata_value) ;
247+ stream_metadata_value = stream_metadata_migration:: v4_v5 ( stream_metadata_value, stream) ;
248+ stream_metadata_value = stream_metadata_migration:: v5_v6 ( stream_metadata_value) ;
249+ storage
250+ . put_object ( & path, to_bytes ( & stream_metadata_value) )
251+ . await ?;
252+ let schema = serde_json:: from_slice ( schema) . ok ( ) ;
253+ let arrow_schema = schema_migration:: v1_v4 ( schema) ?;
254+ storage
255+ . put_object ( & schema_path, to_bytes ( & arrow_schema) )
256+ . await ?;
257+ }
258+ Some ( "v2" ) => {
259+ stream_metadata_value = stream_metadata_migration:: v2_v4 ( stream_metadata_value) ;
260+ stream_metadata_value = stream_metadata_migration:: v4_v5 ( stream_metadata_value, stream) ;
261+ stream_metadata_value = stream_metadata_migration:: v5_v6 ( stream_metadata_value) ;
262+ storage
263+ . put_object ( & path, to_bytes ( & stream_metadata_value) )
264+ . await ?;
265+ let schema = serde_json:: from_slice ( schema) ?;
266+ let arrow_schema = schema_migration:: v2_v4 ( schema) ?;
267+ storage
268+ . put_object ( & schema_path, to_bytes ( & arrow_schema) )
269+ . await ?;
270+ }
271+ Some ( "v3" ) => {
272+ stream_metadata_value = stream_metadata_migration:: v3_v4 ( stream_metadata_value) ;
273+ stream_metadata_value = stream_metadata_migration:: v4_v5 ( stream_metadata_value, stream) ;
274+ stream_metadata_value = stream_metadata_migration:: v5_v6 ( stream_metadata_value) ;
275+ storage
276+ . put_object ( & path, to_bytes ( & stream_metadata_value) )
277+ . await ?;
278+ }
279+ Some ( "v4" ) => {
280+ stream_metadata_value = stream_metadata_migration:: v4_v5 ( stream_metadata_value, stream) ;
281+ stream_metadata_value = stream_metadata_migration:: v5_v6 ( stream_metadata_value) ;
282+ storage
283+ . put_object ( & path, to_bytes ( & stream_metadata_value) )
284+ . await ?;
285+ }
286+ Some ( "v5" ) => {
287+ stream_metadata_value = stream_metadata_migration:: v5_v6 ( stream_metadata_value) ;
288+ storage
289+ . put_object ( & path, to_bytes ( & stream_metadata_value) )
290+ . await ?;
264291 }
292+ _ => ( ) ,
265293 }
266294
267- if arrow_schema. fields ( ) . is_empty ( ) {
268- arrow_schema = serde_json:: from_slice ( & schema) ?;
269- }
295+ Ok ( stream_metadata_value)
296+ }
270297
271- // Setup logstream meta on startup
298+ async fn setup_logstream_metadata (
299+ stream : & str ,
300+ arrow_schema : & mut Schema ,
301+ stream_metadata_value : Value ,
302+ ) -> anyhow:: Result < LogStreamMetadata > {
272303 let ObjectStoreFormat {
273304 schema_version,
274305 created_at,
@@ -285,19 +316,17 @@ async fn migration_stream(
285316 log_source,
286317 ..
287318 } = serde_json:: from_value ( stream_metadata_value) . unwrap_or_default ( ) ;
319+
288320 let storage = PARSEABLE . storage . get_object_store ( ) ;
289321
290- // update the schema and store it back
291- // NOTE: write could be saved, but the cost is cheap, given the low possibilities of being called multiple times
292- update_data_type_time_partition ( & mut arrow_schema, time_partition. as_ref ( ) ) . await ?;
293- storage. put_schema ( stream, & arrow_schema) . await ?;
294- //load stats from storage
322+ update_data_type_time_partition ( arrow_schema, time_partition. as_ref ( ) ) . await ?;
323+ storage. put_schema ( stream, arrow_schema) . await ?;
295324 fetch_stats_from_storage ( stream, stats) . await ;
296325 load_daily_metrics ( & snapshot. manifest_list , stream) ;
297326
298327 let schema = PARSEABLE
299328 . get_or_create_stream ( stream)
300- . updated_schema ( arrow_schema) ;
329+ . updated_schema ( arrow_schema. clone ( ) ) ;
301330 let schema = HashMap :: from_iter (
302331 schema
303332 . fields
@@ -320,7 +349,7 @@ async fn migration_stream(
320349 log_source,
321350 } ;
322351
323- Ok ( Some ( metadata) )
352+ Ok ( metadata)
324353}
325354
326355#[ inline( always) ]
0 commit comments