1818
1919use std:: { io:: ErrorKind , sync:: Arc } ;
2020
21- use self :: { column:: Column , snapshot:: ManifestItem } ;
22- use crate :: handlers;
23- use crate :: handlers:: http:: base_path_without_preceding_slash;
24- use crate :: metadata:: STREAM_INFO ;
25- use crate :: metrics:: { EVENTS_INGESTED_DATE , EVENTS_INGESTED_SIZE_DATE , EVENTS_STORAGE_SIZE_DATE } ;
26- use crate :: option:: { Mode , CONFIG } ;
27- use crate :: stats:: {
28- event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats,
29- } ;
30- use crate :: {
31- catalog:: manifest:: Manifest ,
32- event:: DEFAULT_TIMESTAMP_KEY ,
33- query:: PartialTimeFilter ,
34- storage:: { object_storage:: manifest_path, ObjectStorage , ObjectStorageError } ,
35- } ;
3621use chrono:: { DateTime , Local , NaiveTime , Utc } ;
22+ use column:: Column ;
23+ use manifest:: Manifest ;
3724use relative_path:: RelativePathBuf ;
25+ use snapshot:: ManifestItem ;
3826use std:: io:: Error as IOError ;
3927use tracing:: { error, info} ;
28+
29+ use crate :: {
30+ event:: DEFAULT_TIMESTAMP_KEY ,
31+ handlers:: { self , http:: base_path_without_preceding_slash} ,
32+ metrics:: { EVENTS_INGESTED_DATE , EVENTS_INGESTED_SIZE_DATE , EVENTS_STORAGE_SIZE_DATE } ,
33+ option:: Mode ,
34+ parseable:: PARSEABLE ,
35+ query:: PartialTimeFilter ,
36+ stats:: { event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats} ,
37+ storage:: {
38+ object_storage:: manifest_path, ObjectStorage , ObjectStorageError , ObjectStoreFormat ,
39+ } ,
40+ } ;
41+
4042pub mod column;
4143pub mod manifest;
4244pub mod snapshot;
43- use crate :: storage:: ObjectStoreFormat ;
44- pub use manifest:: create_from_parquet_file;
4545pub trait Snapshot {
4646 fn manifests ( & self , time_predicates : & [ PartialTimeFilter ] ) -> Vec < ManifestItem > ;
4747}
@@ -263,7 +263,7 @@ async fn create_manifest(
263263 files : vec ! [ change] ,
264264 ..Manifest :: default ( )
265265 } ;
266- let mut first_event_at = STREAM_INFO . get_first_event ( stream_name) ?;
266+ let mut first_event_at = PARSEABLE . streams . get_first_event ( stream_name) ?;
267267 if first_event_at. is_none ( ) {
268268 if let Some ( first_event) = manifest. files . first ( ) {
269269 let time_partition = & meta. time_partition ;
@@ -279,8 +279,9 @@ async fn create_manifest(
279279 }
280280 } ;
281281 first_event_at = Some ( lower_bound. with_timezone ( & Local ) . to_rfc3339 ( ) ) ;
282- if let Err ( err) =
283- STREAM_INFO . set_first_event_at ( stream_name, first_event_at. as_ref ( ) . unwrap ( ) )
282+ if let Err ( err) = PARSEABLE
283+ . streams
284+ . set_first_event_at ( stream_name, first_event_at. as_ref ( ) . unwrap ( ) )
284285 {
285286 error ! (
286287 "Failed to update first_event_at in streaminfo for stream {:?} {err:?}" ,
@@ -332,11 +333,11 @@ pub async fn remove_manifest_from_snapshot(
332333 let manifests = & mut meta. snapshot . manifest_list ;
333334 // Filter out items whose manifest_path contains any of the dates_to_delete
334335 manifests. retain ( |item| !dates. iter ( ) . any ( |date| item. manifest_path . contains ( date) ) ) ;
335- STREAM_INFO . reset_first_event_at ( stream_name) ?;
336+ PARSEABLE . streams . reset_first_event_at ( stream_name) ?;
336337 meta. first_event_at = None ;
337338 storage. put_snapshot ( stream_name, meta. snapshot ) . await ?;
338339 }
339- match CONFIG . options . mode {
340+ match PARSEABLE . options . mode {
340341 Mode :: All | Mode :: Ingest => {
341342 Ok ( get_first_event ( storage. clone ( ) , stream_name, Vec :: new ( ) ) . await ?)
342343 }
@@ -350,10 +351,10 @@ pub async fn get_first_event(
350351 dates : Vec < String > ,
351352) -> Result < Option < String > , ObjectStorageError > {
352353 let mut first_event_at: String = String :: default ( ) ;
353- match CONFIG . options . mode {
354+ match PARSEABLE . options . mode {
354355 Mode :: All | Mode :: Ingest => {
355356 // get current snapshot
356- let stream_first_event = STREAM_INFO . get_first_event ( stream_name) ?;
357+ let stream_first_event = PARSEABLE . streams . get_first_event ( stream_name) ?;
357358 if stream_first_event. is_some ( ) {
358359 first_event_at = stream_first_event. unwrap ( ) ;
359360 } else {
@@ -393,7 +394,9 @@ pub async fn get_first_event(
393394 first_event_at = lower_bound. with_timezone ( & Local ) . to_rfc3339 ( ) ;
394395 meta. first_event_at = Some ( first_event_at. clone ( ) ) ;
395396 storage. put_stream_manifest ( stream_name, & meta) . await ?;
396- STREAM_INFO . set_first_event_at ( stream_name, & first_event_at) ?;
397+ PARSEABLE
398+ . streams
399+ . set_first_event_at ( stream_name, & first_event_at) ?;
397400 }
398401 }
399402 }
0 commit comments