@@ -20,9 +20,12 @@ use std::{io::ErrorKind, sync::Arc};
2020
2121use self :: { column:: Column , snapshot:: ManifestItem } ;
2222use crate :: handlers:: http:: base_path_without_preceding_slash;
23+ use crate :: metadata:: STREAM_INFO ;
2324use crate :: metrics:: { EVENTS_INGESTED_DATE , EVENTS_INGESTED_SIZE_DATE , EVENTS_STORAGE_SIZE_DATE } ;
2425use crate :: option:: CONFIG ;
25- use crate :: stats:: { event_labels_date, storage_size_labels_date, update_deleted_stats} ;
26+ use crate :: stats:: {
27+ event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats,
28+ } ;
2629use crate :: {
2730 catalog:: manifest:: Manifest ,
2831 event:: DEFAULT_TIMESTAMP_KEY ,
@@ -103,9 +106,8 @@ pub async fn update_snapshot(
103106 change : manifest:: File ,
104107) -> Result < ( ) , ObjectStorageError > {
105108 let mut meta = storage. get_object_store_format ( stream_name) . await ?;
106- let meta_clone = meta. clone ( ) ;
107109 let manifests = & mut meta. snapshot . manifest_list ;
108- let time_partition = & meta_clone . time_partition ;
110+ let time_partition = & meta . time_partition ;
109111 let lower_bound = match time_partition {
110112 Some ( time_partition) => {
111113 let ( lower_bound, _) = get_file_bounds ( & change, time_partition. to_string ( ) ) ;
@@ -174,12 +176,17 @@ pub async fn update_snapshot(
174176 }
175177 }
176178
177- meta. snapshot . manifest_list = manifests. to_vec ( ) ;
178- storage. put_snapshot ( stream_name, meta. snapshot ) . await ?;
179179 if ch {
180180 if let Some ( mut manifest) = storage. get_manifest ( & path) . await ? {
181181 manifest. apply_change ( change) ;
182182 storage. put_manifest ( & path, manifest) . await ?;
183+ let stats = get_current_stats ( stream_name, "json" ) ;
184+ if let Some ( stats) = stats {
185+ meta. stats = stats;
186+ }
187+ meta. snapshot . manifest_list = manifests. to_vec ( ) ;
188+
189+ storage. put_stream_manifest ( stream_name, & meta) . await ?;
183190 } else {
184191 //instead of returning an error, create a new manifest (otherwise local to storage sync fails)
185192 //but don't update the snapshot
@@ -189,7 +196,7 @@ pub async fn update_snapshot(
189196 storage. clone ( ) ,
190197 stream_name,
191198 false ,
192- meta_clone ,
199+ meta ,
193200 events_ingested,
194201 ingestion_size,
195202 storage_size,
@@ -203,7 +210,7 @@ pub async fn update_snapshot(
203210 storage. clone ( ) ,
204211 stream_name,
205212 true ,
206- meta_clone ,
213+ meta ,
207214 events_ingested,
208215 ingestion_size,
209216 storage_size,
@@ -217,7 +224,7 @@ pub async fn update_snapshot(
217224 storage. clone ( ) ,
218225 stream_name,
219226 true ,
220- meta_clone ,
227+ meta ,
221228 events_ingested,
222229 ingestion_size,
223230 storage_size,
@@ -256,6 +263,30 @@ async fn create_manifest(
256263 files : vec ! [ change] ,
257264 ..Manifest :: default ( )
258265 } ;
266+ let mut first_event_at = STREAM_INFO . get_first_event ( stream_name) ?;
267+ if first_event_at. is_none ( ) {
268+ if let Some ( first_event) = manifest. files . first ( ) {
269+ let time_partition = & meta. time_partition ;
270+ let lower_bound = match time_partition {
271+ Some ( time_partition) => {
272+ let ( lower_bound, _) = get_file_bounds ( first_event, time_partition. to_string ( ) ) ;
273+ lower_bound
274+ }
275+ None => {
276+ let ( lower_bound, _) =
277+ get_file_bounds ( first_event, DEFAULT_TIMESTAMP_KEY . to_string ( ) ) ;
278+ lower_bound
279+ }
280+ } ;
281+ first_event_at = Some ( lower_bound. with_timezone ( & Local ) . to_rfc3339 ( ) ) ;
282+ if let Err ( err) = STREAM_INFO . set_first_event_at ( stream_name, first_event_at. clone ( ) ) {
283+ log:: error!(
284+ "Failed to update first_event_at in streaminfo for stream {:?} {err:?}" ,
285+ stream_name
286+ ) ;
287+ }
288+ }
289+ }
259290
260291 let mainfest_file_name = manifest_path ( "" ) . to_string ( ) ;
261292 let path = partition_path ( stream_name, lower_bound, upper_bound) . join ( & mainfest_file_name) ;
@@ -275,7 +306,12 @@ async fn create_manifest(
275306 } ;
276307 manifests. push ( new_snapshot_entry) ;
277308 meta. snapshot . manifest_list = manifests;
278- storage. put_snapshot ( stream_name, meta. snapshot ) . await ?;
309+ let stats = get_current_stats ( stream_name, "json" ) ;
310+ if let Some ( stats) = stats {
311+ meta. stats = stats;
312+ }
313+ meta. first_event_at = first_event_at;
314+ storage. put_stream_manifest ( stream_name, & meta) . await ?;
279315 }
280316
281317 Ok ( ( ) )
@@ -294,6 +330,8 @@ pub async fn remove_manifest_from_snapshot(
294330 let manifests = & mut meta. snapshot . manifest_list ;
295331 // Filter out items whose manifest_path contains any of the dates_to_delete
296332 manifests. retain ( |item| !dates. iter ( ) . any ( |date| item. manifest_path . contains ( date) ) ) ;
333+ meta. first_event_at = None ;
334+ STREAM_INFO . set_first_event_at ( stream_name, None ) ?;
297335 storage. put_snapshot ( stream_name, meta. snapshot ) . await ?;
298336 }
299337 match CONFIG . parseable . mode {
@@ -313,39 +351,48 @@ pub async fn get_first_event(
313351 match CONFIG . parseable . mode {
314352 Mode :: All | Mode :: Ingest => {
315353 // get current snapshot
316- let mut meta = storage. get_object_store_format ( stream_name) . await ?;
317- let manifests = & mut meta. snapshot . manifest_list ;
318- let time_partition = meta. time_partition ;
319- if manifests. is_empty ( ) {
320- log:: info!( "No manifest found for stream {stream_name}" ) ;
321- return Err ( ObjectStorageError :: Custom ( "No manifest found" . to_string ( ) ) ) ;
322- }
323- let manifest = & manifests[ 0 ] ;
324- let path = partition_path (
325- stream_name,
326- manifest. time_lower_bound ,
327- manifest. time_upper_bound ,
328- ) ;
329- let Some ( manifest) = storage. get_manifest ( & path) . await ? else {
330- return Err ( ObjectStorageError :: UnhandledError (
331- "Manifest found in snapshot but not in object-storage"
332- . to_string ( )
333- . into ( ) ,
334- ) ) ;
335- } ;
336- if let Some ( first_event) = manifest. files . first ( ) {
337- let lower_bound = match time_partition {
338- Some ( time_partition) => {
339- let ( lower_bound, _) = get_file_bounds ( first_event, time_partition) ;
340- lower_bound
341- }
342- None => {
343- let ( lower_bound, _) =
344- get_file_bounds ( first_event, DEFAULT_TIMESTAMP_KEY . to_string ( ) ) ;
345- lower_bound
346- }
354+ let stream_first_event = STREAM_INFO . get_first_event ( stream_name) ?;
355+ if stream_first_event. is_some ( ) {
356+ first_event_at = stream_first_event. unwrap ( ) ;
357+ } else {
358+ let mut meta = storage. get_object_store_format ( stream_name) . await ?;
359+ let meta_clone = meta. clone ( ) ;
360+ let manifests = meta_clone. snapshot . manifest_list ;
361+ let time_partition = meta_clone. time_partition ;
362+ if manifests. is_empty ( ) {
363+ log:: info!( "No manifest found for stream {stream_name}" ) ;
364+ return Err ( ObjectStorageError :: Custom ( "No manifest found" . to_string ( ) ) ) ;
365+ }
366+ let manifest = & manifests[ 0 ] ;
367+ let path = partition_path (
368+ stream_name,
369+ manifest. time_lower_bound ,
370+ manifest. time_upper_bound ,
371+ ) ;
372+ let Some ( manifest) = storage. get_manifest ( & path) . await ? else {
373+ return Err ( ObjectStorageError :: UnhandledError (
374+ "Manifest found in snapshot but not in object-storage"
375+ . to_string ( )
376+ . into ( ) ,
377+ ) ) ;
347378 } ;
348- first_event_at = lower_bound. with_timezone ( & Local ) . to_rfc3339 ( ) ;
379+ if let Some ( first_event) = manifest. files . first ( ) {
380+ let lower_bound = match time_partition {
381+ Some ( time_partition) => {
382+ let ( lower_bound, _) = get_file_bounds ( first_event, time_partition) ;
383+ lower_bound
384+ }
385+ None => {
386+ let ( lower_bound, _) =
387+ get_file_bounds ( first_event, DEFAULT_TIMESTAMP_KEY . to_string ( ) ) ;
388+ lower_bound
389+ }
390+ } ;
391+ first_event_at = lower_bound. with_timezone ( & Local ) . to_rfc3339 ( ) ;
392+ meta. first_event_at = Some ( first_event_at. clone ( ) ) ;
393+ storage. put_stream_manifest ( stream_name, & meta) . await ?;
394+ STREAM_INFO . set_first_event_at ( stream_name, Some ( first_event_at. clone ( ) ) ) ?;
395+ }
349396 }
350397 }
351398 Mode :: Query => {
0 commit comments