From adbae200b3dff488d1d88f84d884605389a0d0a0 Mon Sep 17 00:00:00 2001 From: gurjotkaur20 Date: Mon, 22 Jan 2024 12:33:15 +0530 Subject: [PATCH 1/6] fix: add stream creation time in get stats api --- server/src/handlers/http/logstream.rs | 17 ++++++++++++++++- server/src/metadata.rs | 10 +++++++++- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index ebcd41189..e5c5a224b 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -268,10 +268,17 @@ pub async fn get_stats(req: HttpRequest) -> Result let stats = stats::get_current_stats(&stream_name, "json") .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; + let hash_map = STREAM_INFO.read().unwrap(); + let stream_creation_time = &hash_map + .get(&stream_name) + .ok_or(StreamError::StreamNotFound(stream_name.clone()))? + .created_at; + let time = Utc::now(); let stats = serde_json::json!({ "stream": stream_name, + "creation_time": stream_creation_time, "time": time, "ingestion": { "count": stats.events, @@ -307,7 +314,15 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> if let Err(err) = storage.create_stream(&stream_name).await { return Err(CreateStreamError::Storage { stream_name, err }); } - metadata::STREAM_INFO.add_stream(stream_name.to_string()); + + let stream_meta = CONFIG + .storage() + .get_object_store() + .get_stream_metadata(&stream_name) + .await; + let created_at = stream_meta.unwrap().created_at; + + metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at); Ok(()) } diff --git a/server/src/metadata.rs b/server/src/metadata.rs index b57cc710a..3f12bcc61 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -18,6 +18,7 @@ use arrow_array::RecordBatch; use arrow_schema::{Field, Fields, Schema}; +use chrono::Local; use itertools::Itertools; use once_cell::sync::Lazy; use std::collections::HashMap; @@ -43,6 +44,7 @@ pub struct LogStreamMetadata { pub schema: HashMap>, pub alerts: Alerts, pub cache_enabled: bool, + pub created_at: String, } // It is very unlikely that panic will occur when dealing with metadata. @@ -126,9 +128,14 @@ impl StreamInfo { }) } - pub fn add_stream(&self, stream_name: String) { + pub fn add_stream(&self, stream_name: String, created_at: String) { let mut map = self.write().expect(LOCK_EXPECT); let metadata = LogStreamMetadata { + created_at: if created_at.is_empty() { + Local::now().to_rfc3339() + } else { + created_at.clone() + }, ..Default::default() }; map.insert(stream_name, metadata); @@ -162,6 +169,7 @@ impl StreamInfo { schema, alerts, cache_enabled: meta.cache_enabled, + created_at: meta.created_at, }; let mut map = self.write().expect(LOCK_EXPECT); From 4c5228e3f8422eab83f47485c364e5daef3ac0cd Mon Sep 17 00:00:00 2001 From: gurjotkaur20 Date: Sun, 11 Feb 2024 14:09:28 +0530 Subject: [PATCH 2/6] fix: adding first_event_at timestamp in the stats api --- server/src/catalog.rs | 98 ++++++++++++++++++++------- server/src/handlers/http/ingest.rs | 38 +++++++++++ server/src/handlers/http/logstream.rs | 10 +-- server/src/metadata.rs | 15 ++++ server/src/storage.rs | 3 + server/src/storage/object_storage.rs | 17 +++++ server/src/storage/retention.rs | 32 ++++++++- 7 files changed, 184 insertions(+), 29 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index f8adad1ca..91d93d430 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -18,7 +18,7 @@ use std::sync::Arc; -use chrono::{DateTime, NaiveDateTime, NaiveTime, Utc}; +use chrono::{DateTime, Local, NaiveDateTime, NaiveTime, Utc}; use relative_path::RelativePathBuf; use crate::{ @@ -69,33 +69,33 @@ impl ManifestFile for manifest::File { } } +fn get_file_bounds(file: &manifest::File) -> (DateTime, DateTime) { + match file + .columns() + .iter() + .find(|col| col.name == "p_timestamp") + .unwrap() + .stats + .clone() + .unwrap() + { + column::TypedStatistics::Int(stats) => ( + NaiveDateTime::from_timestamp_millis(stats.min) + .unwrap() + .and_utc(), + NaiveDateTime::from_timestamp_millis(stats.min) + .unwrap() + .and_utc(), + ), + _ => unreachable!(), + } +} + pub async fn update_snapshot( storage: Arc, stream_name: &str, change: manifest::File, ) -> Result<(), ObjectStorageError> { - fn get_file_bounds(file: &manifest::File) -> (DateTime, DateTime) { - match file - .columns() - .iter() - .find(|col| col.name == "p_timestamp") - .unwrap() - .stats - .clone() - .unwrap() - { - column::TypedStatistics::Int(stats) => ( - NaiveDateTime::from_timestamp_millis(stats.min) - .unwrap() - .and_utc(), - NaiveDateTime::from_timestamp_millis(stats.min) - .unwrap() - .and_utc(), - ), - _ => unreachable!(), - } - } - // get current snapshot let mut meta = storage.get_snapshot(stream_name).await?; let manifests = &mut meta.manifest_list; @@ -154,6 +154,58 @@ pub async fn update_snapshot( Ok(()) } +pub async fn remove_manifest_from_snapshot( + storage: Arc, + stream_name: &str, + dates: Vec, +) -> Result<(), ObjectStorageError> { + + // get current snapshot + let mut meta = storage.get_snapshot(stream_name).await?; + let manifests = &mut meta.manifest_list; + + // Filter out items whose manifest_path contains any of the dates_to_delete + manifests.retain(|item| { + !dates.iter().any(|date| item.manifest_path.contains(date)) + }); + + storage.put_snapshot(stream_name, meta).await?; + Ok(()) +} + +pub async fn get_first_event( + storage: Arc, + stream_name: &str +) -> Result, ObjectStorageError> { + + // get current snapshot + let mut meta = storage.get_snapshot(stream_name).await?; + let manifests = &mut meta.manifest_list; + + if manifests.is_empty() { + log::info!("No manifest found for stream {stream_name}"); + return Err(ObjectStorageError::Custom("No manifest found".to_string())); + } + + let manifest = &manifests[0]; + + let path = partition_path(stream_name, manifest.time_lower_bound, manifest.time_upper_bound); + let Some(manifest) = storage.get_manifest(&path).await? else { + return Err(ObjectStorageError::UnhandledError( + "Manifest found in snapshot but not in object-storage" + .to_string() + .into(), + )); + }; + + if let Some(first_event) = manifest.files.first() { + let (lower_bound, _) = get_file_bounds(&first_event); + let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339(); + return Ok(Some(first_event_at)); + } + Ok(None) +} + /// Partition the path to which this manifest belongs. /// Useful when uploading the manifest file. fn partition_path( diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 429c3ffcd..a5b5dd428 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -24,6 +24,7 @@ use serde_json::Value; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; +use crate::{catalog, metadata}; use crate::event::error::EventError; use crate::event::format::EventFormat; use crate::event::{self, format}; @@ -32,6 +33,7 @@ use crate::handlers::{ STREAM_NAME_HEADER_KEY, }; use crate::metadata::STREAM_INFO; +use crate::option::CONFIG; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; use super::kinesis; @@ -49,6 +51,31 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result Result<(), PostEr Ok(()) } +// Check if the first_event_at is empty +pub fn first_event_at_empty(stream_name: &str) -> bool { + let hash_map = STREAM_INFO.read().unwrap(); + if let Some(stream_info) = hash_map.get(stream_name) { + if let Some(first_event_at) = &stream_info.first_event_at { + return first_event_at.is_empty(); + } + } + true +} + #[derive(Debug, thiserror::Error)] pub enum PostError { #[error("Stream {0} not found")] diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 2ef3b7435..89a2d19b0 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -267,16 +267,16 @@ pub async fn get_stats(req: HttpRequest) -> Result .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; let hash_map = STREAM_INFO.read().unwrap(); - let stream_creation_time = &hash_map + let stream_meta = &hash_map .get(&stream_name) - .ok_or(StreamError::StreamNotFound(stream_name.clone()))? - .created_at; - + .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; + let time = Utc::now(); let stats = serde_json::json!({ "stream": stream_name, - "creation_time": stream_creation_time, + "creation_time": &stream_meta.created_at, + "first_event_at": Some(&stream_meta.first_event_at), "time": time, "ingestion": { "count": stats.events, diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 3f12bcc61..e8a250719 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -45,6 +45,7 @@ pub struct LogStreamMetadata { pub alerts: Alerts, pub cache_enabled: bool, pub created_at: String, + pub first_event_at: Option, } // It is very unlikely that panic will occur when dealing with metadata. @@ -128,6 +129,19 @@ impl StreamInfo { }) } + pub fn set_first_event_at( + &self, + stream_name: &str, + first_event_at: Option, + ) -> Result<(), MetadataError> { + let mut map = self.write().expect(LOCK_EXPECT); + map.get_mut(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| { + metadata.first_event_at = first_event_at; + }) + } + pub fn add_stream(&self, stream_name: String, created_at: String) { let mut map = self.write().expect(LOCK_EXPECT); let metadata = LogStreamMetadata { @@ -170,6 +184,7 @@ impl StreamInfo { alerts, cache_enabled: meta.cache_enabled, created_at: meta.created_at, + first_event_at: meta.first_event_at, }; let mut map = self.write().expect(LOCK_EXPECT); diff --git a/server/src/storage.rs b/server/src/storage.rs index 975fcf445..6396f9874 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -69,6 +69,8 @@ pub struct ObjectStoreFormat { pub objectstore_format: String, #[serde(rename = "created-at")] pub created_at: String, + #[serde(rename = "first-event-at")] + pub first_event_at: Option, pub owner: Owner, pub permissions: Vec, pub stats: Stats, @@ -113,6 +115,7 @@ impl Default for ObjectStoreFormat { version: CURRENT_SCHEMA_VERSION.to_string(), objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(), created_at: Local::now().to_rfc3339(), + first_event_at: None, owner: Owner::new("".to_string(), "".to_string()), permissions: vec![Permisssion::new("parseable".to_string())], stats: Stats::default(), diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 7494d16e1..effec6455 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -161,6 +161,23 @@ pub trait ObjectStorage: Sync + 'static { self.put_object(&path, to_bytes(&stream_metadata)).await } + async fn put_first_event_at( + &self, + stream_name: &str, + first_event_at: &str, + ) -> Result<(), ObjectStorageError> { + let path = stream_json_path(stream_name); + let stream_metadata = self.get_object(&path).await?; + let first_event_ts = + serde_json::to_value(first_event_at).expect("first_event_at is perfectly serializable"); + let mut stream_metadata: serde_json::Value = + serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); + + stream_metadata["first-event-at"] = first_event_ts; + + self.put_object(&path, to_bytes(&stream_metadata)).await + } + async fn put_metadata( &self, parseable_metadata: &StorageMetadata, diff --git a/server/src/storage/retention.rs b/server/src/storage/retention.rs index 82062575a..6b3c8d4c2 100644 --- a/server/src/storage/retention.rs +++ b/server/src/storage/retention.rs @@ -187,7 +187,7 @@ mod action { use itertools::Itertools; use relative_path::RelativePathBuf; - use crate::option::CONFIG; + use crate::{catalog::{self, remove_manifest_from_snapshot}, metadata, option::CONFIG}; pub(super) async fn delete(stream_name: String, days: u32) { log::info!("running retention task - delete for stream={stream_name}"); @@ -206,6 +206,7 @@ mod action { .into_iter() .filter(|date| string_to_date(date) < retain_until) .collect_vec(); + let dates = dates_to_delete.clone(); let delete_tasks = FuturesUnordered::new(); for date in dates_to_delete { @@ -226,6 +227,35 @@ mod action { log::error!("Failed to run delete task {err:?}") } } + + let store = CONFIG.storage().get_object_store(); + let res = remove_manifest_from_snapshot(store.clone(), &stream_name, dates).await; + if let Err(err) = res { + log::error!("Failed to update manifest list in the snapshot {err:?}") + } + + if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await { + if let Err(err) = CONFIG + .storage() + .get_object_store() + .put_first_event_at(&stream_name, &first_event_at) + .await + { + log::error!( + "Failed to update first_event_at in metadata for stream {:?} {err:?}", + stream_name + ); + } + + if let Err(err) = metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) + { + log::error!( + "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", + stream_name + ); + } + } + } fn get_retain_until(current_date: NaiveDate, days: u64) -> NaiveDate { From 56851f98a922512d8eb6e3edbe3eb452072e2b5e Mon Sep 17 00:00:00 2001 From: gurjotkaur20 Date: Sun, 11 Feb 2024 14:10:25 +0530 Subject: [PATCH 3/6] fix: cargo fmt check --- server/src/catalog.rs | 14 +++++++------- server/src/handlers/http/ingest.rs | 19 ++++++++++--------- server/src/handlers/http/logstream.rs | 2 +- server/src/storage.rs | 2 +- server/src/storage/object_storage.rs | 4 ++-- server/src/storage/retention.rs | 18 +++++++++++------- 6 files changed, 32 insertions(+), 27 deletions(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index 91d93d430..09c665e01 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -159,15 +159,12 @@ pub async fn remove_manifest_from_snapshot( stream_name: &str, dates: Vec, ) -> Result<(), ObjectStorageError> { - // get current snapshot let mut meta = storage.get_snapshot(stream_name).await?; let manifests = &mut meta.manifest_list; // Filter out items whose manifest_path contains any of the dates_to_delete - manifests.retain(|item| { - !dates.iter().any(|date| item.manifest_path.contains(date)) - }); + manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date))); storage.put_snapshot(stream_name, meta).await?; Ok(()) @@ -175,9 +172,8 @@ pub async fn remove_manifest_from_snapshot( pub async fn get_first_event( storage: Arc, - stream_name: &str + stream_name: &str, ) -> Result, ObjectStorageError> { - // get current snapshot let mut meta = storage.get_snapshot(stream_name).await?; let manifests = &mut meta.manifest_list; @@ -189,7 +185,11 @@ pub async fn get_first_event( let manifest = &manifests[0]; - let path = partition_path(stream_name, manifest.time_lower_bound, manifest.time_upper_bound); + let path = partition_path( + stream_name, + manifest.time_lower_bound, + manifest.time_upper_bound, + ); let Some(manifest) = storage.get_manifest(&path).await? else { return Err(ObjectStorageError::UnhandledError( "Manifest found in snapshot but not in object-storage" diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index a5b5dd428..35f86c1ea 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -24,7 +24,6 @@ use serde_json::Value; use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; -use crate::{catalog, metadata}; use crate::event::error::EventError; use crate::event::format::EventFormat; use crate::event::{self, format}; @@ -35,6 +34,7 @@ use crate::handlers::{ use crate::metadata::STREAM_INFO; use crate::option::CONFIG; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; +use crate::{catalog, metadata}; use super::kinesis; use super::logstream::error::CreateStreamError; @@ -53,20 +53,21 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result Result Result let stream_meta = &hash_map .get(&stream_name) .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; - + let time = Utc::now(); let stats = serde_json::json!({ diff --git a/server/src/storage.rs b/server/src/storage.rs index 6396f9874..63e9577c1 100644 --- a/server/src/storage.rs +++ b/server/src/storage.rs @@ -70,7 +70,7 @@ pub struct ObjectStoreFormat { #[serde(rename = "created-at")] pub created_at: String, #[serde(rename = "first-event-at")] - pub first_event_at: Option, + pub first_event_at: Option, pub owner: Owner, pub permissions: Vec, pub stats: Stats, diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index effec6455..0d867da94 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -172,8 +172,8 @@ pub trait ObjectStorage: Sync + 'static { serde_json::to_value(first_event_at).expect("first_event_at is perfectly serializable"); let mut stream_metadata: serde_json::Value = serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); - - stream_metadata["first-event-at"] = first_event_ts; + + stream_metadata["first-event-at"] = first_event_ts; self.put_object(&path, to_bytes(&stream_metadata)).await } diff --git a/server/src/storage/retention.rs b/server/src/storage/retention.rs index 6b3c8d4c2..aedae6a4c 100644 --- a/server/src/storage/retention.rs +++ b/server/src/storage/retention.rs @@ -187,7 +187,11 @@ mod action { use itertools::Itertools; use relative_path::RelativePathBuf; - use crate::{catalog::{self, remove_manifest_from_snapshot}, metadata, option::CONFIG}; + use crate::{ + catalog::{self, remove_manifest_from_snapshot}, + metadata, + option::CONFIG, + }; pub(super) async fn delete(stream_name: String, days: u32) { log::info!("running retention task - delete for stream={stream_name}"); @@ -236,10 +240,10 @@ mod action { if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await { if let Err(err) = CONFIG - .storage() - .get_object_store() - .put_first_event_at(&stream_name, &first_event_at) - .await + .storage() + .get_object_store() + .put_first_event_at(&stream_name, &first_event_at) + .await { log::error!( "Failed to update first_event_at in metadata for stream {:?} {err:?}", @@ -247,7 +251,8 @@ mod action { ); } - if let Err(err) = metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) + if let Err(err) = + metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) { log::error!( "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", @@ -255,7 +260,6 @@ mod action { ); } } - } fn get_retain_until(current_date: NaiveDate, days: u64) -> NaiveDate { From a4173b9701959541f19e448e915dd1c287d1eeb9 Mon Sep 17 00:00:00 2001 From: gurjotkaur20 Date: Sun, 11 Feb 2024 14:18:28 +0530 Subject: [PATCH 4/6] fix: cargo check --- server/src/catalog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index 09c665e01..8a8d86d59 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -199,7 +199,7 @@ pub async fn get_first_event( }; if let Some(first_event) = manifest.files.first() { - let (lower_bound, _) = get_file_bounds(&first_event); + let (lower_bound, _) = get_file_bounds(first_event); let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339(); return Ok(Some(first_event_at)); } From b808525623b9ea70b323b457ec3ddc430a9b19ce Mon Sep 17 00:00:00 2001 From: gurjotkaur20 Date: Thu, 15 Feb 2024 18:26:34 +0530 Subject: [PATCH 5/6] fix: modify min to max --- server/src/catalog.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/catalog.rs b/server/src/catalog.rs index 8a8d86d59..3ffdd21a1 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -83,7 +83,7 @@ fn get_file_bounds(file: &manifest::File) -> (DateTime, DateTime) { NaiveDateTime::from_timestamp_millis(stats.min) .unwrap() .and_utc(), - NaiveDateTime::from_timestamp_millis(stats.min) + NaiveDateTime::from_timestamp_millis(stats.max) .unwrap() .and_utc(), ), From 58b3ee90c45290c5b155ae4eacab6bcd3a30755c Mon Sep 17 00:00:00 2001 From: gurjotkaur20 Date: Sun, 18 Feb 2024 18:48:29 +0530 Subject: [PATCH 6/6] fix: move checking first_event_at_empty flow to stats api --- server/src/handlers/http/ingest.rs | 39 --------------------------- server/src/handlers/http/logstream.rs | 39 ++++++++++++++++++++++++++- 2 files changed, 38 insertions(+), 40 deletions(-) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 35f86c1ea..429c3ffcd 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -32,9 +32,7 @@ use crate::handlers::{ STREAM_NAME_HEADER_KEY, }; use crate::metadata::STREAM_INFO; -use crate::option::CONFIG; use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError}; -use crate::{catalog, metadata}; use super::kinesis; use super::logstream::error::CreateStreamError; @@ -51,32 +49,6 @@ pub async fn ingest(req: HttpRequest, body: Bytes) -> Result Result<(), PostEr Ok(()) } -// Check if the first_event_at is empty -pub fn first_event_at_empty(stream_name: &str) -> bool { - let hash_map = STREAM_INFO.read().unwrap(); - if let Some(stream_info) = hash_map.get(stream_name) { - if let Some(first_event_at) = &stream_info.first_event_at { - return first_event_at.is_empty(); - } - } - true -} - #[derive(Debug, thiserror::Error)] pub enum PostError { #[error("Stream {0} not found")] diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index efef85c3b..541b19ae4 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -28,7 +28,7 @@ use crate::metadata::STREAM_INFO; use crate::option::CONFIG; use crate::storage::retention::Retention; use crate::storage::{LogStream, StorageDir}; -use crate::{event, stats}; +use crate::{catalog, event, stats}; use crate::{metadata, validator}; use self::error::{CreateStreamError, StreamError}; @@ -263,6 +263,32 @@ pub async fn get_stats(req: HttpRequest) -> Result return Err(StreamError::StreamNotFound(stream_name)); } + if first_event_at_empty(&stream_name) { + let store = CONFIG.storage().get_object_store(); + if let Ok(Some(first_event_at)) = catalog::get_first_event(store, &stream_name).await { + if let Err(err) = CONFIG + .storage() + .get_object_store() + .put_first_event_at(&stream_name, &first_event_at) + .await + { + log::error!( + "Failed to update first_event_at in metadata for stream {:?} {err:?}", + stream_name + ); + } + + if let Err(err) = + metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) + { + log::error!( + "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", + stream_name + ); + } + } + } + let stats = stats::get_current_stats(&stream_name, "json") .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; @@ -292,6 +318,17 @@ pub async fn get_stats(req: HttpRequest) -> Result Ok((web::Json(stats), StatusCode::OK)) } +// Check if the first_event_at is empty +pub fn first_event_at_empty(stream_name: &str) -> bool { + let hash_map = STREAM_INFO.read().unwrap(); + if let Some(stream_info) = hash_map.get(stream_name) { + if let Some(first_event_at) = &stream_info.first_event_at { + return first_event_at.is_empty(); + } + } + true +} + fn remove_id_from_alerts(value: &mut Value) { if let Some(Value::Array(alerts)) = value.get_mut("alerts") { alerts