Skip to content

Commit 988c060

Browse files
fix: fetch first-event-at from storage
current: the query server fetches first-event-at from all the live ingestors, but this logic fails when the ingestor that ingested the events for a stream is no longer reachable; in that case the query server gets `None`. change: fetch first-event-at from all of the stream's stream.json files in storage, find the earliest value, and update it in the server's in-memory map.
1 parent cdfe4a0 commit 988c060

File tree

6 files changed

+227
-25
lines changed

6 files changed

+227
-25
lines changed

src/catalog/mod.rs

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ use crate::option::{Mode, CONFIG};
2727
use crate::stats::{
2828
event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats,
2929
};
30+
use crate::storage::object_storage::get_stream_meta_from_storage;
3031
use crate::{
3132
catalog::manifest::Manifest,
3233
event::DEFAULT_TIMESTAMP_KEY,
@@ -280,7 +281,9 @@ async fn create_manifest(
280281
}
281282
};
282283
first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339());
283-
if let Err(err) = STREAM_INFO.set_first_event_at(stream_name, first_event_at.clone()) {
284+
if let Err(err) =
285+
STREAM_INFO.set_first_event_at(stream_name, first_event_at.as_ref().unwrap())
286+
{
284287
error!(
285288
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
286289
stream_name
@@ -331,8 +334,8 @@ pub async fn remove_manifest_from_snapshot(
331334
let manifests = &mut meta.snapshot.manifest_list;
332335
// Filter out items whose manifest_path contains any of the dates_to_delete
333336
manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date)));
337+
STREAM_INFO.reset_first_event_at(stream_name)?;
334338
meta.first_event_at = None;
335-
STREAM_INFO.set_first_event_at(stream_name, None)?;
336339
storage.put_snapshot(stream_name, meta.snapshot).await?;
337340
}
338341
match CONFIG.options.mode {
@@ -392,7 +395,7 @@ pub async fn get_first_event(
392395
first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
393396
meta.first_event_at = Some(first_event_at.clone());
394397
storage.put_stream_manifest(stream_name, &meta).await?;
395-
STREAM_INFO.set_first_event_at(stream_name, Some(first_event_at.clone()))?;
398+
STREAM_INFO.set_first_event_at(stream_name, &first_event_at)?;
396399
}
397400
}
398401
}
@@ -435,6 +438,58 @@ pub async fn get_first_event(
435438
Ok(Some(first_event_at))
436439
}
437440

441+
/// Retrieves the earliest first-event-at from the storage for the specified stream.
442+
///
443+
/// This function fetches the object-store format from all the stream.json files for the given stream from the storage,
444+
/// extracts the `first_event_at` timestamps, and returns the earliest `first_event_at`.
445+
///
446+
/// # Arguments
447+
///
448+
/// * `stream_name` - The name of the stream for which `first_event_at` is to be retrieved.
449+
///
450+
/// # Returns
451+
///
452+
/// * `Result<Option<String>, ObjectStorageError>` - Returns `Ok(Some(String))` with the earliest
453+
/// first event timestamp if found, `Ok(None)` if no timestamps are found, or an `ObjectStorageError`
454+
/// if an error occurs.
455+
///
456+
/// # Errors
457+
///
458+
/// This function will return an error if:
459+
/// * The stream metadata cannot be retrieved from the storage.
460+
///
461+
/// # Examples
462+
/// ```ignore
463+
/// ```rust
464+
/// let result = get_first_event_from_storage("my_stream").await;
465+
/// match result {
466+
/// Ok(Some(first_event)) => println!("first-event-at: {}", first_event),
467+
/// Ok(None) => println!("first-event-at not found"),
468+
/// Err(e) => eprintln!("Error retrieving first-event-at from storage: {}", e),
469+
/// }
470+
/// ```
471+
pub async fn get_first_event_from_storage(
472+
stream_name: &str,
473+
) -> Result<Option<String>, ObjectStorageError> {
474+
let mut all_first_events = vec![];
475+
let stream_metas = get_stream_meta_from_storage(stream_name).await;
476+
if let Ok(stream_metas) = stream_metas {
477+
for stream_meta in stream_metas.iter() {
478+
if let Some(first_event) = &stream_meta.first_event_at {
479+
let first_event = DateTime::parse_from_rfc3339(first_event).unwrap();
480+
let first_event = first_event.with_timezone(&Utc);
481+
all_first_events.push(first_event);
482+
}
483+
}
484+
}
485+
486+
if all_first_events.is_empty() {
487+
return Ok(None);
488+
}
489+
let first_event_at = all_first_events.iter().min().unwrap().to_rfc3339();
490+
Ok(Some(first_event_at))
491+
}
492+
438493
/// Partition the path to which this manifest belongs.
439494
/// Useful when uploading the manifest file.
440495
pub fn partition_path(

src/handlers/http/logstream.rs

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, St
2121
use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME};
2222
use super::ingest::create_stream_if_not_exists;
2323
use super::modal::utils::logstream_utils::{
24-
create_stream_and_schema_from_storage, create_update_stream,
24+
create_stream_and_schema_from_storage, create_update_stream, update_first_event_at,
2525
};
2626
use super::query::update_schema_when_distributed;
2727
use crate::alerts::Alerts;
28-
use crate::catalog::get_first_event;
28+
use crate::catalog::get_first_event_from_storage;
2929
use crate::event::format::{override_data_type, LogSource};
3030
use crate::handlers::STREAM_TYPE_KEY;
3131
use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
@@ -56,7 +56,7 @@ use std::fs;
5656
use std::num::NonZeroU32;
5757
use std::str::FromStr;
5858
use std::sync::Arc;
59-
use tracing::{error, warn};
59+
use tracing::warn;
6060

6161
pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
6262
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
@@ -559,18 +559,16 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
559559
}
560560
}
561561

562-
let store = CONFIG.storage().get_object_store();
563-
let dates: Vec<String> = Vec::new();
564-
if let Ok(Some(first_event_at)) = get_first_event(store, &stream_name, dates).await {
565-
if let Err(err) =
566-
metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at))
567-
{
568-
error!(
569-
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",
570-
stream_name
571-
);
572-
}
573-
}
562+
// if first_event_at is not found in memory map, check if it exists in the storage
563+
// if it exists in the storage, update the first_event_at in memory map
564+
let stream_first_event_at =
565+
if let Ok(Some(first_event_at)) = STREAM_INFO.get_first_event(&stream_name) {
566+
Some(first_event_at)
567+
} else if let Ok(Some(first_event_at)) = get_first_event_from_storage(&stream_name).await {
568+
update_first_event_at(&stream_name, &first_event_at).await
569+
} else {
570+
None
571+
};
574572

575573
let hash_map = STREAM_INFO.read().unwrap();
576574
let stream_meta = &hash_map
@@ -580,7 +578,7 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
580578
let stream_info: StreamInfo = StreamInfo {
581579
stream_type: stream_meta.stream_type.clone(),
582580
created_at: stream_meta.created_at.clone(),
583-
first_event_at: stream_meta.first_event_at.clone(),
581+
first_event_at: stream_first_event_at,
584582
time_partition: stream_meta.time_partition.clone(),
585583
time_partition_limit: stream_meta
586584
.time_partition_limit
@@ -590,8 +588,6 @@ pub async fn get_stream_info(req: HttpRequest) -> Result<impl Responder, StreamE
590588
log_source: stream_meta.log_source.clone(),
591589
};
592590

593-
// get the other info from
594-
595591
Ok((web::Json(stream_info), StatusCode::OK))
596592
}
597593

src/handlers/http/modal/utils/logstream_utils.rs

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ use crate::{
3636
storage::{LogStream, ObjectStoreFormat, StreamType},
3737
validator,
3838
};
39+
use tracing::error;
3940

4041
pub async fn create_update_stream(
4142
req: &HttpRequest,
@@ -507,3 +508,56 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
507508

508509
Ok(true)
509510
}
511+
512+
/// Updates the first-event-at in storage and logstream metadata for the specified stream.
513+
///
514+
/// This function updates the `first-event-at` in both the object store and the stream info metadata.
515+
/// If either update fails, an error is logged, but the function will still return the `first-event-at`.
516+
///
517+
/// # Arguments
518+
///
519+
/// * `stream_name` - The name of the stream to update.
520+
/// * `first_event_at` - The value of first-event-at.
521+
///
522+
/// # Returns
523+
///
524+
/// * `Option<String>` - Returns `Some(String)` with the provided timestamp if the update is successful,
525+
/// or `None` if an error occurs.
526+
///
527+
/// # Errors
528+
///
529+
/// This function logs an error if:
530+
/// * The `first-event-at` cannot be updated in the object store.
531+
/// * The `first-event-at` cannot be updated in the stream info.
532+
///
533+
/// # Examples
534+
///```ignore
535+
/// ```rust
536+
/// use parseable::handlers::http::modal::utils::logstream_utils::update_first_event_at;
537+
/// let result = update_first_event_at("my_stream", "2023-01-01T00:00:00Z").await;
538+
/// match result {
539+
/// Some(timestamp) => println!("first-event-at: {}", timestamp),
540+
/// None => eprintln!("Failed to update first-event-at"),
541+
/// }
542+
/// ```
543+
pub async fn update_first_event_at(stream_name: &str, first_event_at: &str) -> Option<String> {
544+
let storage = CONFIG.storage().get_object_store();
545+
if let Err(err) = storage
546+
.update_first_event_in_stream(stream_name, first_event_at)
547+
.await
548+
{
549+
error!(
550+
"Failed to update first_event_at in storage for stream {:?}: {err:?}",
551+
stream_name
552+
);
553+
}
554+
555+
if let Err(err) = metadata::STREAM_INFO.set_first_event_at(stream_name, first_event_at) {
556+
error!(
557+
"Failed to update first_event_at in stream info for stream {:?}: {err:?}",
558+
stream_name
559+
);
560+
}
561+
562+
Some(first_event_at.to_string())
563+
}

src/metadata.rs

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,13 +212,50 @@ impl StreamInfo {
212212
pub fn set_first_event_at(
213213
&self,
214214
stream_name: &str,
215-
first_event_at: Option<String>,
215+
first_event_at: &str,
216216
) -> Result<(), MetadataError> {
217217
let mut map = self.write().expect(LOCK_EXPECT);
218218
map.get_mut(stream_name)
219219
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
220220
.map(|metadata| {
221-
metadata.first_event_at = first_event_at;
221+
metadata.first_event_at = Some(first_event_at.to_owned());
222+
})
223+
}
224+
225+
/// Removes the `first_event_at` timestamp for the specified stream from the LogStreamMetadata.
226+
///
227+
/// This function acquires a write lock, retrieves the LogStreamMetadata for the given stream,
228+
/// and removes the `first_event_at` timestamp if it exists.
229+
///
230+
/// # Arguments
231+
///
232+
/// * `stream_name` - The name of the stream for which the `first_event_at` timestamp is to be removed.
233+
///
234+
/// # Returns
235+
///
236+
/// * `Result<(), MetadataError>` - Returns `Ok(())` if the `first_event_at` timestamp is successfully removed,
237+
/// or a `MetadataError` if the stream metadata is not found.
238+
///
239+
/// # Errors
240+
///
241+
/// This function will return an error if:
242+
/// * The stream metadata cannot be found.
243+
///
244+
/// # Examples
245+
/// ```ignore
246+
/// ```rust
247+
/// let result = metadata.remove_first_event_at("my_stream");
248+
/// match result {
249+
/// Ok(()) => println!("first-event-at removed successfully"),
250+
/// Err(e) => eprintln!("Error removing first-event-at from STREAM_INFO: {}", e),
251+
/// }
252+
/// ```
253+
pub fn reset_first_event_at(&self, stream_name: &str) -> Result<(), MetadataError> {
254+
let mut map = self.write().expect(LOCK_EXPECT);
255+
map.get_mut(stream_name)
256+
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
257+
.map(|metadata| {
258+
metadata.first_event_at.take();
222259
})
223260
}
224261

src/storage/object_storage.rs

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,42 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
217217
Ok(())
218218
}
219219

220+
/// Updates the first event timestamp in the object store for the specified stream.
221+
///
222+
/// This function retrieves the current object-store format for the given stream,
223+
/// updates the `first_event_at` field with the provided timestamp, and then
224+
/// stores the updated format back in the object store.
225+
///
226+
/// # Arguments
227+
///
228+
/// * `stream_name` - The name of the stream to update.
229+
/// * `first_event` - The timestamp of the first event to set.
230+
///
231+
/// # Returns
232+
///
233+
/// * `Result<(), ObjectStorageError>` - Returns `Ok(())` if the update is successful,
234+
/// or an `ObjectStorageError` if an error occurs.
235+
///
236+
/// # Examples
237+
/// ```ignore
238+
/// ```rust
239+
/// let result = object_store.update_first_event_in_stream("my_stream", "2023-01-01T00:00:00Z").await;
240+
/// assert!(result.is_ok());
241+
/// ```
242+
async fn update_first_event_in_stream(
243+
&self,
244+
stream_name: &str,
245+
first_event: &str,
246+
) -> Result<(), ObjectStorageError> {
247+
let mut format = self.get_object_store_format(stream_name).await?;
248+
format.first_event_at = Some(first_event.to_string());
249+
let format_json = to_bytes(&format);
250+
self.put_object(&stream_json_path(stream_name), format_json)
251+
.await?;
252+
253+
Ok(())
254+
}
255+
220256
async fn put_alerts(
221257
&self,
222258
stream_name: &str,
@@ -747,3 +783,27 @@ pub fn ingestor_metadata_path(id: Option<&str>) -> RelativePathBuf {
747783
&format!("ingestor.{}.json", INGESTOR_META.get_ingestor_id()),
748784
])
749785
}
786+
787+
pub async fn get_stream_meta_from_storage(
788+
stream_name: &str,
789+
) -> Result<Vec<ObjectStoreFormat>, ObjectStorageError> {
790+
let storage = CONFIG.storage().get_object_store();
791+
let mut stream_metas = vec![];
792+
let stream_meta_bytes = storage
793+
.get_objects(
794+
Some(&RelativePathBuf::from_iter([
795+
stream_name,
796+
STREAM_ROOT_DIRECTORY,
797+
])),
798+
Box::new(|file_name| file_name.ends_with("stream.json")),
799+
)
800+
.await;
801+
if let Ok(stream_meta_bytes) = stream_meta_bytes {
802+
for stream_meta in stream_meta_bytes {
803+
let stream_meta_ob = serde_json::from_slice::<ObjectStoreFormat>(&stream_meta)?;
804+
stream_metas.push(stream_meta_ob);
805+
}
806+
}
807+
808+
Ok(stream_metas)
809+
}

src/storage/retention.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -218,9 +218,9 @@ mod action {
218218
return;
219219
}
220220
}
221-
if let Ok(first_event_at) = res_remove_manifest {
221+
if let Ok(Some(first_event_at)) = res_remove_manifest {
222222
if let Err(err) =
223-
metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at)
223+
metadata::STREAM_INFO.set_first_event_at(&stream_name, &first_event_at)
224224
{
225225
error!(
226226
"Failed to update first_event_at in streaminfo for stream {:?} {err:?}",

0 commit comments

Comments
 (0)