diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs
index aa46afb4f..878568e34 100644
--- a/src/catalog/mod.rs
+++ b/src/catalog/mod.rs
@@ -279,7 +279,9 @@ async fn create_manifest(
             }
         };
         first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
-        if let Err(err) = STREAM_INFO.set_first_event_at(stream_name, first_event_at.clone()) {
+        if let Err(err) =
+            STREAM_INFO.set_first_event_at(stream_name, first_event_at.as_ref().unwrap())
+        {
             error!(
                 "Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
                 stream_name
@@ -330,8 +332,8 @@ pub async fn remove_manifest_from_snapshot(
         let manifests = &mut meta.snapshot.manifest_list;
         // Filter out items whose manifest_path contains any of the dates_to_delete
         manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
+        STREAM_INFO.reset_first_event_at(stream_name)?;
         meta.first_event_at = None;
-        STREAM_INFO.set_first_event_at(stream_name, None)?;
         storage.put_snapshot(stream_name, meta.snapshot).await?;
     }
     match CONFIG.options.mode {
@@ -391,7 +393,7 @@ pub async fn get_first_event(
            first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
            meta.first_event_at = Some(first_event_at.clone());
            storage.put_stream_manifest(stream_name, &meta).await?;
-           STREAM_INFO.set_first_event_at(stream_name, Some(first_event_at.clone()))?;
+           STREAM_INFO.set_first_event_at(stream_name, &first_event_at)?;
        }
    }
 }
diff --git a/src/event/format/json.rs b/src/event/format/json.rs
index 71fcaffc7..5006be142 100644
--- a/src/event/format/json.rs
+++ b/src/event/format/json.rs
@@ -94,8 +94,8 @@ impl EventFormat for Event {
         };

         if value_arr
-            .iter()
-            .any(|value| fields_mismatch(&schema, value, schema_version))
+            .iter()
+            .any(|value| fields_mismatch(&schema, value, schema_version))
         {
             return Err(anyhow!(
                 "Could not process this event due to mismatch in datatype"
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index 2b2c2a0b3..c0a2ec323 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -112,11 +112,8 @@ pub trait EventFormat: Sized {
         time_partition: Option<&String>,
         schema_version: SchemaVersion,
     ) -> Result<(RecordBatch, bool), AnyError> {
-        let (data, mut schema, is_first) = self.to_data(
-            storage_schema,
-            time_partition,
-            schema_version,
-        )?;
+        let (data, mut schema, is_first) =
+            self.to_data(storage_schema, time_partition, schema_version)?;

         if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
             return Err(anyhow!(
diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index 903b51ebd..a4c25eb45 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -21,11 +21,10 @@ use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, St
 use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
 use super::ingest::create_stream_if_not_exists;
 use super::modal::utils::logstream_utils::{
-    create_stream_and_schema_from_storage, create_update_stream,
+    create_stream_and_schema_from_storage, create_update_stream, update_first_event_at,
 };
 use super::query::update_schema_when_distributed;
 use crate::alerts::Alerts;
-use crate::catalog::get_first_event;
 use crate::event::format::{override_data_type, LogSource};
 use crate::handlers::STREAM_TYPE_KEY;
 use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
@@ -57,7 +56,7 @@ use std::fs;
 use std::num::NonZeroU32;
 use std::str::FromStr;
 use std::sync::Arc;
-use tracing::{error, warn};
+use tracing::warn;

 pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
@@ -550,19 +549,19 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
-    let dates: Vec<String> = Vec::new();
-    if let Ok(Some(first_event_at)) = get_first_event(store, &stream_name, dates).await {
-        if let Err(err) =
-            metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
+    let storage = CONFIG.storage().get_object_store();
+    // if first_event_at is not found in memory map, check if it exists in the storage
+    // if it exists in the storage, update the first_event_at in memory map
+    let stream_first_event_at =
+        if let Ok(Some(first_event_at)) = STREAM_INFO.get_first_event(&stream_name) {
+            Some(first_event_at)
+        } else if let Ok(Some(first_event_at)) =
+            storage.get_first_event_from_storage(&stream_name).await
         {
-            error!(
-                "Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
-                stream_name
-            );
-        }
-    }
+            update_first_event_at(&stream_name, &first_event_at).await
+        } else {
+            None
+        };

     let hash_map = STREAM_INFO.read().unwrap();
     let stream_meta = &hash_map
@@ -572,7 +571,7 @@ pub async fn get_stream_info(stream_name: Path) -> Result) -> Result Get info for given log stream
diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs
index cdc338ad8..ab7726488 100644
--- a/src/handlers/http/modal/utils/logstream_utils.rs
+++ b/src/handlers/http/modal/utils/logstream_utils.rs
@@ -36,6 +36,7 @@ use crate::{
     storage::{LogStream, ObjectStoreFormat, StreamType},
     validator,
 };
+use tracing::error;

 pub async fn create_update_stream(
     headers: &HeaderMap,
@@ -508,3 +509,56 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
     Ok(true)
 }
+
+/// Updates the first-event-at in storage and logstream metadata for the specified stream.
+///
+/// This function updates the `first-event-at` in both the object store and the stream info metadata.
+/// If either update fails, an error is logged, but the function will still return the `first-event-at`.
+///
+/// # Arguments
+///
+/// * `stream_name` - The name of the stream to update.
+/// * `first_event_at` - The value of first-event-at.
+///
+/// # Returns
+///
+/// * `Option<String>` - Returns `Some(String)` containing the provided timestamp; failures while
+///   updating storage or stream info are logged and do not change the return value.
+///
+/// # Errors
+///
+/// This function logs an error if:
+/// * The `first-event-at` cannot be updated in the object store.
+/// * The `first-event-at` cannot be updated in the stream info.
+///
+/// # Examples
+/// ```ignore
+/// use parseable::handlers::http::modal::utils::logstream_utils::update_first_event_at;
+/// let result = update_first_event_at("my_stream", "2023-01-01T00:00:00Z").await;
+/// match result {
+///     Some(timestamp) => println!("first-event-at: {}", timestamp),
+///     None => eprintln!("Failed to update first-event-at"),
+/// }
+/// ```
+pub async fn update_first_event_at(stream_name: &str, first_event_at: &str) -> Option<String> {
+    let storage = CONFIG.storage().get_object_store();
+    if let Err(err) = storage
+        .update_first_event_in_stream(stream_name, first_event_at)
+        .await
+    {
+        error!(
+            "Failed to update first_event_at in storage for stream {:?}: {err:?}",
+            stream_name
+        );
+    }
+
+    if let Err(err) = metadata::STREAM_INFO.set_first_event_at(stream_name, first_event_at) {
+        error!(
+            "Failed to update first_event_at in stream info for stream {:?}: {err:?}",
+            stream_name
+        );
+    }
+
+    Some(first_event_at.to_string())
+}
diff --git a/src/metadata.rs b/src/metadata.rs
index 5c18aa329..182bc610a 100644
--- a/src/metadata.rs
+++ b/src/metadata.rs
@@ -212,13 +212,46 @@ impl StreamInfo {
     pub fn set_first_event_at(
         &self,
         stream_name: &str,
-        first_event_at: Option<String>,
+        first_event_at: &str,
     ) -> Result<(), MetadataError> {
         let mut map = self.write().expect(LOCK_EXPECT);
         map.get_mut(stream_name)
             .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
             .map(|metadata| {
-                metadata.first_event_at = first_event_at;
+                metadata.first_event_at = Some(first_event_at.to_owned());
             })
     }
+
+    /// Removes the `first_event_at` timestamp for the specified stream from the LogStreamMetadata.
+    ///
+    /// This function is called during the retention task, when the parquet files along with the manifest files are deleted from the storage.
+    /// The manifest path is removed from the snapshot in the stream.json
+    /// and the first_event_at value in the stream.json is removed.
+    ///
+    /// # Arguments
+    ///
+    /// * `stream_name` - The name of the stream for which the `first_event_at` timestamp is to be removed.
+    ///
+    /// # Returns
+    ///
+    /// * `Result<(), MetadataError>` - Returns `Ok(())` if the `first_event_at` timestamp is successfully removed,
+    ///   or a `MetadataError` if the stream metadata is not found.
+    ///
+    /// # Examples
+    /// ```ignore
+    /// let result = metadata.reset_first_event_at("my_stream");
+    /// match result {
+    ///     Ok(()) => println!("first-event-at removed successfully"),
+    ///     Err(e) => eprintln!("Error removing first-event-at from STREAM_INFO: {}", e),
+    /// }
+    /// ```
+    pub fn reset_first_event_at(&self, stream_name: &str) -> Result<(), MetadataError> {
+        let mut map = self.write().expect(LOCK_EXPECT);
+        map.get_mut(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| {
+                metadata.first_event_at.take();
+            })
+    }

diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs
index 16d526c8a..50cef439c 100644
--- a/src/storage/object_storage.rs
+++ b/src/storage/object_storage.rs
@@ -44,7 +44,7 @@ use actix_web_prometheus::PrometheusMetrics;
 use arrow_schema::Schema;
 use async_trait::async_trait;
 use bytes::Bytes;
-use chrono::Local;
+use chrono::{DateTime, Local, Utc};
 use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::RuntimeEnvBuilder};
 use once_cell::sync::OnceCell;
 use relative_path::RelativePath;
@@ -217,6 +217,42 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         Ok(())
     }

+    /// Updates the first event timestamp in the object store for the specified stream.
+    ///
+    /// This function retrieves the current object-store format for the given stream,
+    /// updates the `first_event_at` field with the provided timestamp, and then
+    /// stores the updated format back in the object store.
+    ///
+    /// # Arguments
+    ///
+    /// * `stream_name` - The name of the stream to update.
+    /// * `first_event` - The timestamp of the first event to set.
+    ///
+    /// # Returns
+    ///
+    /// * `Result<(), ObjectStorageError>` - Returns `Ok(())` if the update is successful,
+    ///   or an `ObjectStorageError` if an error occurs.
+    ///
+    /// # Examples
+    /// ```ignore
+    /// let result = object_store.update_first_event_in_stream("my_stream", "2023-01-01T00:00:00Z").await;
+    /// assert!(result.is_ok());
+    /// ```
+    async fn update_first_event_in_stream(
+        &self,
+        stream_name: &str,
+        first_event: &str,
+    ) -> Result<(), ObjectStorageError> {
+        let mut format = self.get_object_store_format(stream_name).await?;
+        format.first_event_at = Some(first_event.to_string());
+        let format_json = to_bytes(&format);
+        self.put_object(&stream_json_path(stream_name), format_json)
+            .await?;
+
+        Ok(())
+    }
+
     async fn put_alerts(
         &self,
         stream_name: &str,
@@ -623,6 +659,78 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
         Ok(())
     }

+    async fn get_stream_meta_from_storage(
+        &self,
+        stream_name: &str,
+    ) -> Result<Vec<ObjectStoreFormat>, ObjectStorageError> {
+        let mut stream_metas = vec![];
+        let stream_meta_bytes = self
+            .get_objects(
+                Some(&RelativePathBuf::from_iter([
+                    stream_name,
+                    STREAM_ROOT_DIRECTORY,
+                ])),
+                Box::new(|file_name| file_name.ends_with("stream.json")),
+            )
+            .await;
+        if let Ok(stream_meta_bytes) = stream_meta_bytes {
+            for stream_meta in stream_meta_bytes {
+                let stream_meta_ob = serde_json::from_slice::<ObjectStoreFormat>(&stream_meta)?;
+                stream_metas.push(stream_meta_ob);
+            }
+        }
+
+        Ok(stream_metas)
+    }
+
+    /// Retrieves the earliest first-event-at from the storage for the specified stream.
+    ///
+    /// This function fetches the object-store format from all the stream.json files for the given stream from the storage,
+    /// extracts the `first_event_at` timestamps, and returns the earliest `first_event_at`.
+    ///
+    /// # Arguments
+    ///
+    /// * `stream_name` - The name of the stream for which `first_event_at` is to be retrieved.
+    ///
+    /// # Returns
+    ///
+    /// * `Result<Option<String>, ObjectStorageError>` - Returns `Ok(Some(String))` with the earliest
+    ///   first event timestamp if found, `Ok(None)` if no timestamps are found, or an `ObjectStorageError`
+    ///   if an error occurs.
+    ///
+    /// # Examples
+    /// ```ignore
+    /// let result = get_first_event_from_storage("my_stream").await;
+    /// match result {
+    ///     Ok(Some(first_event)) => println!("first-event-at: {}", first_event),
+    ///     Ok(None) => println!("first-event-at not found"),
+    ///     Err(err) => println!("Error: {:?}", err),
+    /// }
+    /// ```
+    async fn get_first_event_from_storage(
+        &self,
+        stream_name: &str,
+    ) -> Result<Option<String>, ObjectStorageError> {
+        let mut all_first_events = vec![];
+        let stream_metas = self.get_stream_meta_from_storage(stream_name).await;
+        if let Ok(stream_metas) = stream_metas {
+            for stream_meta in stream_metas.iter() {
+                if let Some(first_event) = &stream_meta.first_event_at {
+                    let first_event = DateTime::parse_from_rfc3339(first_event).unwrap();
+                    let first_event = first_event.with_timezone(&Utc);
+                    all_first_events.push(first_event);
+                }
+            }
+        }
+
+        if all_first_events.is_empty() {
+            return Ok(None);
+        }
+        let first_event_at = all_first_events.iter().min().unwrap().to_rfc3339();
+        Ok(Some(first_event_at))
+    }
+
     // pick a better name
     fn get_bucket_name(&self) -> String;
 }
diff --git a/src/storage/retention.rs b/src/storage/retention.rs
index 24ffbf199..2cbee48c7 100644
--- a/src/storage/retention.rs
+++ b/src/storage/retention.rs
@@ -218,9 +218,9 @@ mod action {
                     return;
                 }
             }
-            if let Ok(first_event_at) = res_remove_manifest {
+            if let Ok(Some(first_event_at)) = res_remove_manifest {
                 if let Err(err) =
-                    metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at)
+                    metadata::STREAM_INFO.set_first_event_at(&stream_name, &first_event_at)
                 {
                     error!(
                         "Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
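Taken together, these changes make `set_first_event_at` take a `&str`, add an explicit `reset_first_event_at`, and fall back to the `stream.json` files in object storage when the in-memory value is missing. The following is an illustrative sketch (not part of the diff) of how a caller could resolve `first_event_at` the way the reworked `get_stream_info` handler does; it assumes the items introduced above (`STREAM_INFO::get_first_event`, `ObjectStorage::get_first_event_from_storage`, `update_first_event_at`) are in scope, and the function name is hypothetical.

```rust
// Illustrative sketch only: resolve first_event_at for a stream, preferring the
// in-memory map and falling back to the stream.json metadata in object storage.
async fn resolve_first_event_at(stream_name: &str) -> Option<String> {
    // Fast path: the timestamp is already cached in the in-memory stream metadata.
    if let Ok(Some(first_event_at)) = STREAM_INFO.get_first_event(stream_name) {
        return Some(first_event_at);
    }

    // Slow path: read the earliest first_event_at recorded in storage, then write it
    // back to both storage and the in-memory map via update_first_event_at.
    let storage = CONFIG.storage().get_object_store();
    match storage.get_first_event_from_storage(stream_name).await {
        Ok(Some(first_event_at)) => update_first_event_at(stream_name, &first_event_at).await,
        _ => None,
    }
}
```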