From c26b6b1cb873f40211684b0bf192e24ae697e220 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Tue, 21 Jan 2025 21:43:54 -0500 Subject: [PATCH 1/3] fix: fetch first-event-at from storage current: query server fetches first-event-at from all the live ingestors but, this logic fails when ingestor which ingested the events for a stream is not reachable anymore, query server gets `None` change: fetch the first-event-at from all the stream jsons from storage find the earliest value, and update in server's memory map --- src/catalog/mod.rs | 61 ++++++++++++++++++- src/handlers/http/logstream.rs | 32 +++++----- .../http/modal/utils/logstream_utils.rs | 54 ++++++++++++++++ src/metadata.rs | 41 ++++++++++++- src/storage/object_storage.rs | 60 ++++++++++++++++++ src/storage/retention.rs | 4 +- 6 files changed, 227 insertions(+), 25 deletions(-) diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index aa46afb4f..0f4c44b19 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -27,6 +27,7 @@ use crate::option::{Mode, CONFIG}; use crate::stats::{ event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats, }; +use crate::storage::object_storage::get_stream_meta_from_storage; use crate::{ catalog::manifest::Manifest, event::DEFAULT_TIMESTAMP_KEY, @@ -279,7 +280,9 @@ async fn create_manifest( } }; first_event_at = Some(lower_bound.with_timezone(&Local).to_rfc3339()); - if let Err(err) = STREAM_INFO.set_first_event_at(stream_name, first_event_at.clone()) { + if let Err(err) = + STREAM_INFO.set_first_event_at(stream_name, first_event_at.as_ref().unwrap()) + { error!( "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", stream_name @@ -330,8 +333,8 @@ pub async fn remove_manifest_from_snapshot( let manifests = &mut meta.snapshot.manifest_list; // Filter out items whose manifest_path contains any of the dates_to_delete manifests.retain(|item| !dates.iter().any(|date| item.manifest_path.contains(date))); + STREAM_INFO.reset_first_event_at(stream_name)?; meta.first_event_at = None; - STREAM_INFO.set_first_event_at(stream_name, None)?; storage.put_snapshot(stream_name, meta.snapshot).await?; } match CONFIG.options.mode { @@ -391,7 +394,7 @@ pub async fn get_first_event( first_event_at = lower_bound.with_timezone(&Local).to_rfc3339(); meta.first_event_at = Some(first_event_at.clone()); storage.put_stream_manifest(stream_name, &meta).await?; - STREAM_INFO.set_first_event_at(stream_name, Some(first_event_at.clone()))?; + STREAM_INFO.set_first_event_at(stream_name, &first_event_at)?; } } } @@ -432,6 +435,58 @@ pub async fn get_first_event( Ok(Some(first_event_at)) } +/// Retrieves the earliest first-event-at from the storage for the specified stream. +/// +/// This function fetches the object-store format from all the stream.json files for the given stream from the storage, +/// extracts the `first_event_at` timestamps, and returns the earliest `first_event_at`. +/// +/// # Arguments +/// +/// * `stream_name` - The name of the stream for which `first_event_at` is to be retrieved. +/// +/// # Returns +/// +/// * `Result, ObjectStorageError>` - Returns `Ok(Some(String))` with the earliest +/// first event timestamp if found, `Ok(None)` if no timestamps are found, or an `ObjectStorageError` +/// if an error occurs. +/// +/// # Errors +/// +/// This function will return an error if: +/// * The stream metadata cannot be retrieved from the storage. +/// +/// # Examples +/// ```ignore +/// ```rust +/// let result = get_first_event_from_storage("my_stream").await; +/// match result { +/// Ok(Some(first_event)) => println!("first-event-at: {}", first_event), +/// Ok(None) => println!("first-event-at not found"), +/// Err(e) => eprintln!("Error retrieving first-event-at from storage: {}", e), +/// } +/// ``` +pub async fn get_first_event_from_storage( + stream_name: &str, +) -> Result, ObjectStorageError> { + let mut all_first_events = vec![]; + let stream_metas = get_stream_meta_from_storage(stream_name).await; + if let Ok(stream_metas) = stream_metas { + for stream_meta in stream_metas.iter() { + if let Some(first_event) = &stream_meta.first_event_at { + let first_event = DateTime::parse_from_rfc3339(first_event).unwrap(); + let first_event = first_event.with_timezone(&Utc); + all_first_events.push(first_event); + } + } + } + + if all_first_events.is_empty() { + return Ok(None); + } + let first_event_at = all_first_events.iter().min().unwrap().to_rfc3339(); + Ok(Some(first_event_at)) +} + /// Partition the path to which this manifest belongs. /// Useful when uploading the manifest file. pub fn partition_path( diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 903b51ebd..4b4151bc0 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -21,11 +21,11 @@ use super::cluster::utils::{merge_quried_stats, IngestionStats, QueriedStats, St use super::cluster::{sync_streams_with_ingestors, INTERNAL_STREAM_NAME}; use super::ingest::create_stream_if_not_exists; use super::modal::utils::logstream_utils::{ - create_stream_and_schema_from_storage, create_update_stream, + create_stream_and_schema_from_storage, create_update_stream, update_first_event_at, }; use super::query::update_schema_when_distributed; use crate::alerts::Alerts; -use crate::catalog::get_first_event; +use crate::catalog::get_first_event_from_storage; use crate::event::format::{override_data_type, LogSource}; use crate::handlers::STREAM_TYPE_KEY; use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; @@ -57,7 +57,7 @@ use std::fs; use std::num::NonZeroU32; use std::str::FromStr; use std::sync::Arc; -use tracing::{error, warn}; +use tracing::warn; pub async fn delete(stream_name: Path) -> Result { let stream_name = stream_name.into_inner(); @@ -551,18 +551,16 @@ pub async fn get_stream_info(stream_name: Path) -> Result = Vec::new(); - if let Ok(Some(first_event_at)) = get_first_event(store, &stream_name, dates).await { - if let Err(err) = - metadata::STREAM_INFO.set_first_event_at(&stream_name, Some(first_event_at)) - { - error!( - "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", - stream_name - ); - } - } + // if first_event_at is not found in memory map, check if it exists in the storage + // if it exists in the storage, update the first_event_at in memory map + let stream_first_event_at = + if let Ok(Some(first_event_at)) = STREAM_INFO.get_first_event(&stream_name) { + Some(first_event_at) + } else if let Ok(Some(first_event_at)) = get_first_event_from_storage(&stream_name).await { + update_first_event_at(&stream_name, &first_event_at).await + } else { + None + }; let hash_map = STREAM_INFO.read().unwrap(); let stream_meta = &hash_map @@ -572,7 +570,7 @@ pub async fn get_stream_info(stream_name: Path) -> Result) -> Result Result< Ok(true) } + +/// Updates the first-event-at in storage and logstream metadata for the specified stream. +/// +/// This function updates the `first-event-at` in both the object store and the stream info metadata. +/// If either update fails, an error is logged, but the function will still return the `first-event-at`. +/// +/// # Arguments +/// +/// * `stream_name` - The name of the stream to update. +/// * `first_event_at` - The value of first-event-at. +/// +/// # Returns +/// +/// * `Option` - Returns `Some(String)` with the provided timestamp if the update is successful, +/// or `None` if an error occurs. +/// +/// # Errors +/// +/// This function logs an error if: +/// * The `first-event-at` cannot be updated in the object store. +/// * The `first-event-at` cannot be updated in the stream info. +/// +/// # Examples +///```ignore +/// ```rust +/// use parseable::handlers::http::modal::utils::logstream_utils::update_first_event_at; +/// let result = update_first_event_at("my_stream", "2023-01-01T00:00:00Z").await; +/// match result { +/// Some(timestamp) => println!("first-event-at: {}", timestamp), +/// None => eprintln!("Failed to update first-event-at"), +/// } +/// ``` +pub async fn update_first_event_at(stream_name: &str, first_event_at: &str) -> Option { + let storage = CONFIG.storage().get_object_store(); + if let Err(err) = storage + .update_first_event_in_stream(stream_name, first_event_at) + .await + { + error!( + "Failed to update first_event_at in storage for stream {:?}: {err:?}", + stream_name + ); + } + + if let Err(err) = metadata::STREAM_INFO.set_first_event_at(stream_name, first_event_at) { + error!( + "Failed to update first_event_at in stream info for stream {:?}: {err:?}", + stream_name + ); + } + + Some(first_event_at.to_string()) +} diff --git a/src/metadata.rs b/src/metadata.rs index 5c18aa329..8f24a1c15 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -212,13 +212,50 @@ impl StreamInfo { pub fn set_first_event_at( &self, stream_name: &str, - first_event_at: Option, + first_event_at: &str, ) -> Result<(), MetadataError> { let mut map = self.write().expect(LOCK_EXPECT); map.get_mut(stream_name) .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) .map(|metadata| { - metadata.first_event_at = first_event_at; + metadata.first_event_at = Some(first_event_at.to_owned()); + }) + } + + /// Removes the `first_event_at` timestamp for the specified stream from the LogStreamMetadata. + /// + /// This function acquires a write lock, retrieves the LogStreamMetadata for the given stream, + /// and removes the `first_event_at` timestamp if it exists. + /// + /// # Arguments + /// + /// * `stream_name` - The name of the stream for which the `first_event_at` timestamp is to be removed. + /// + /// # Returns + /// + /// * `Result<(), MetadataError>` - Returns `Ok(())` if the `first_event_at` timestamp is successfully removed, + /// or a `MetadataError` if the stream metadata is not found. + /// + /// # Errors + /// + /// This function will return an error if: + /// * The stream metadata cannot be found. + /// + /// # Examples + /// ```ignore + /// ```rust + /// let result = metadata.remove_first_event_at("my_stream"); + /// match result { + /// Ok(()) => println!("first-event-at removed successfully"), + /// Err(e) => eprintln!("Error removing first-event-at from STREAM_INFO: {}", e), + /// } + /// ``` + pub fn reset_first_event_at(&self, stream_name: &str) -> Result<(), MetadataError> { + let mut map = self.write().expect(LOCK_EXPECT); + map.get_mut(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| { + metadata.first_event_at.take(); }) } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 16d526c8a..d99094ea0 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -217,6 +217,42 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { Ok(()) } + /// Updates the first event timestamp in the object store for the specified stream. + /// + /// This function retrieves the current object-store format for the given stream, + /// updates the `first_event_at` field with the provided timestamp, and then + /// stores the updated format back in the object store. + /// + /// # Arguments + /// + /// * `stream_name` - The name of the stream to update. + /// * `first_event` - The timestamp of the first event to set. + /// + /// # Returns + /// + /// * `Result<(), ObjectStorageError>` - Returns `Ok(())` if the update is successful, + /// or an `ObjectStorageError` if an error occurs. + /// + /// # Examples + /// ```ignore + /// ```rust + /// let result = object_store.update_first_event_in_stream("my_stream", "2023-01-01T00:00:00Z").await; + /// assert!(result.is_ok()); + /// ``` + async fn update_first_event_in_stream( + &self, + stream_name: &str, + first_event: &str, + ) -> Result<(), ObjectStorageError> { + let mut format = self.get_object_store_format(stream_name).await?; + format.first_event_at = Some(first_event.to_string()); + let format_json = to_bytes(&format); + self.put_object(&stream_json_path(stream_name), format_json) + .await?; + + Ok(()) + } + async fn put_alerts( &self, stream_name: &str, @@ -747,3 +783,27 @@ pub fn ingestor_metadata_path(id: Option<&str>) -> RelativePathBuf { &format!("ingestor.{}.json", INGESTOR_META.get_ingestor_id()), ]) } + +pub async fn get_stream_meta_from_storage( + stream_name: &str, +) -> Result, ObjectStorageError> { + let storage = CONFIG.storage().get_object_store(); + let mut stream_metas = vec![]; + let stream_meta_bytes = storage + .get_objects( + Some(&RelativePathBuf::from_iter([ + stream_name, + STREAM_ROOT_DIRECTORY, + ])), + Box::new(|file_name| file_name.ends_with("stream.json")), + ) + .await; + if let Ok(stream_meta_bytes) = stream_meta_bytes { + for stream_meta in stream_meta_bytes { + let stream_meta_ob = serde_json::from_slice::(&stream_meta)?; + stream_metas.push(stream_meta_ob); + } + } + + Ok(stream_metas) +} diff --git a/src/storage/retention.rs b/src/storage/retention.rs index 24ffbf199..2cbee48c7 100644 --- a/src/storage/retention.rs +++ b/src/storage/retention.rs @@ -218,9 +218,9 @@ mod action { return; } } - if let Ok(first_event_at) = res_remove_manifest { + if let Ok(Some(first_event_at)) = res_remove_manifest { if let Err(err) = - metadata::STREAM_INFO.set_first_event_at(&stream_name, first_event_at) + metadata::STREAM_INFO.set_first_event_at(&stream_name, &first_event_at) { error!( "Failed to update first_event_at in streaminfo for stream {:?} {err:?}", From b618c4713fd7c41d533daf45fa592c03ae6b85f1 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 24 Jan 2025 12:55:03 -0500 Subject: [PATCH 2/3] updated comments and styling changes --- src/catalog/mod.rs | 7 +------ src/event/format/json.rs | 4 ++-- src/event/format/mod.rs | 7 ++----- src/handlers/http/modal/ingest_server.rs | 2 +- src/metadata.rs | 10 +++------- 5 files changed, 9 insertions(+), 21 deletions(-) diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index 0f4c44b19..c1c877afd 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -450,11 +450,6 @@ pub async fn get_first_event( /// first event timestamp if found, `Ok(None)` if no timestamps are found, or an `ObjectStorageError` /// if an error occurs. /// -/// # Errors -/// -/// This function will return an error if: -/// * The stream metadata cannot be retrieved from the storage. -/// /// # Examples /// ```ignore /// ```rust @@ -462,7 +457,7 @@ pub async fn get_first_event( /// match result { /// Ok(Some(first_event)) => println!("first-event-at: {}", first_event), /// Ok(None) => println!("first-event-at not found"), -/// Err(e) => eprintln!("Error retrieving first-event-at from storage: {}", e), +/// Err(err) => println!("Error: {:?}", err), /// } /// ``` pub async fn get_first_event_from_storage( diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 71fcaffc7..5006be142 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -94,8 +94,8 @@ impl EventFormat for Event { }; if value_arr - .iter() - .any(|value| fields_mismatch(&schema, value, schema_version)) + .iter() + .any(|value| fields_mismatch(&schema, value, schema_version)) { return Err(anyhow!( "Could not process this event due to mismatch in datatype" diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 2b2c2a0b3..c0a2ec323 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -112,11 +112,8 @@ pub trait EventFormat: Sized { time_partition: Option<&String>, schema_version: SchemaVersion, ) -> Result<(RecordBatch, bool), AnyError> { - let (data, mut schema, is_first) = self.to_data( - storage_schema, - time_partition, - schema_version, - )?; + let (data, mut schema, is_first) = + self.to_data(storage_schema, time_partition, schema_version)?; if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() { return Err(anyhow!( diff --git a/src/handlers/http/modal/ingest_server.rs b/src/handlers/http/modal/ingest_server.rs index ce7ec20b2..215f79478 100644 --- a/src/handlers/http/modal/ingest_server.rs +++ b/src/handlers/http/modal/ingest_server.rs @@ -249,7 +249,7 @@ impl IngestServer { web::put() .to(ingestor_logstream::put_stream) .authorize_for_stream(Action::CreateStream), - ) + ), ) .service( // GET "/logstream/{logstream}/info" ==> Get info for given log stream diff --git a/src/metadata.rs b/src/metadata.rs index 8f24a1c15..182bc610a 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -224,8 +224,9 @@ impl StreamInfo { /// Removes the `first_event_at` timestamp for the specified stream from the LogStreamMetadata. /// - /// This function acquires a write lock, retrieves the LogStreamMetadata for the given stream, - /// and removes the `first_event_at` timestamp if it exists. + /// This function is called during the retention task, when the parquet files along with the manifest files are deleted from the storage. + /// The manifest path is removed from the snapshot in the stream.json + /// and the first_event_at value in the stream.json is removed. /// /// # Arguments /// @@ -236,11 +237,6 @@ impl StreamInfo { /// * `Result<(), MetadataError>` - Returns `Ok(())` if the `first_event_at` timestamp is successfully removed, /// or a `MetadataError` if the stream metadata is not found. /// - /// # Errors - /// - /// This function will return an error if: - /// * The stream metadata cannot be found. - /// /// # Examples /// ```ignore /// ```rust From 3700cb2545a1faddef1499a7a3a3398764e084cf Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 24 Jan 2025 13:26:58 -0500 Subject: [PATCH 3/3] moved to ObjectStorage trait --- src/catalog/mod.rs | 48 ----------------- src/handlers/http/logstream.rs | 7 +-- src/storage/object_storage.rs | 98 +++++++++++++++++++++++++--------- 3 files changed, 77 insertions(+), 76 deletions(-) diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index c1c877afd..878568e34 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -27,7 +27,6 @@ use crate::option::{Mode, CONFIG}; use crate::stats::{ event_labels_date, get_current_stats, storage_size_labels_date, update_deleted_stats, }; -use crate::storage::object_storage::get_stream_meta_from_storage; use crate::{ catalog::manifest::Manifest, event::DEFAULT_TIMESTAMP_KEY, @@ -435,53 +434,6 @@ pub async fn get_first_event( Ok(Some(first_event_at)) } -/// Retrieves the earliest first-event-at from the storage for the specified stream. -/// -/// This function fetches the object-store format from all the stream.json files for the given stream from the storage, -/// extracts the `first_event_at` timestamps, and returns the earliest `first_event_at`. -/// -/// # Arguments -/// -/// * `stream_name` - The name of the stream for which `first_event_at` is to be retrieved. -/// -/// # Returns -/// -/// * `Result, ObjectStorageError>` - Returns `Ok(Some(String))` with the earliest -/// first event timestamp if found, `Ok(None)` if no timestamps are found, or an `ObjectStorageError` -/// if an error occurs. -/// -/// # Examples -/// ```ignore -/// ```rust -/// let result = get_first_event_from_storage("my_stream").await; -/// match result { -/// Ok(Some(first_event)) => println!("first-event-at: {}", first_event), -/// Ok(None) => println!("first-event-at not found"), -/// Err(err) => println!("Error: {:?}", err), -/// } -/// ``` -pub async fn get_first_event_from_storage( - stream_name: &str, -) -> Result, ObjectStorageError> { - let mut all_first_events = vec![]; - let stream_metas = get_stream_meta_from_storage(stream_name).await; - if let Ok(stream_metas) = stream_metas { - for stream_meta in stream_metas.iter() { - if let Some(first_event) = &stream_meta.first_event_at { - let first_event = DateTime::parse_from_rfc3339(first_event).unwrap(); - let first_event = first_event.with_timezone(&Utc); - all_first_events.push(first_event); - } - } - } - - if all_first_events.is_empty() { - return Ok(None); - } - let first_event_at = all_first_events.iter().min().unwrap().to_rfc3339(); - Ok(Some(first_event_at)) -} - /// Partition the path to which this manifest belongs. /// Useful when uploading the manifest file. pub fn partition_path( diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 4b4151bc0..a4c25eb45 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -25,7 +25,6 @@ use super::modal::utils::logstream_utils::{ }; use super::query::update_schema_when_distributed; use crate::alerts::Alerts; -use crate::catalog::get_first_event_from_storage; use crate::event::format::{override_data_type, LogSource}; use crate::handlers::STREAM_TYPE_KEY; use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION}; @@ -550,13 +549,15 @@ pub async fn get_stream_info(stream_name: Path) -> Result Result, ObjectStorageError> { + let mut stream_metas = vec![]; + let stream_meta_bytes = self + .get_objects( + Some(&RelativePathBuf::from_iter([ + stream_name, + STREAM_ROOT_DIRECTORY, + ])), + Box::new(|file_name| file_name.ends_with("stream.json")), + ) + .await; + if let Ok(stream_meta_bytes) = stream_meta_bytes { + for stream_meta in stream_meta_bytes { + let stream_meta_ob = serde_json::from_slice::(&stream_meta)?; + stream_metas.push(stream_meta_ob); + } + } + + Ok(stream_metas) + } + + /// Retrieves the earliest first-event-at from the storage for the specified stream. + /// + /// This function fetches the object-store format from all the stream.json files for the given stream from the storage, + /// extracts the `first_event_at` timestamps, and returns the earliest `first_event_at`. + /// + /// # Arguments + /// + /// * `stream_name` - The name of the stream for which `first_event_at` is to be retrieved. + /// + /// # Returns + /// + /// * `Result, ObjectStorageError>` - Returns `Ok(Some(String))` with the earliest + /// first event timestamp if found, `Ok(None)` if no timestamps are found, or an `ObjectStorageError` + /// if an error occurs. + /// + /// # Examples + /// ```ignore + /// ```rust + /// let result = get_first_event_from_storage("my_stream").await; + /// match result { + /// Ok(Some(first_event)) => println!("first-event-at: {}", first_event), + /// Ok(None) => println!("first-event-at not found"), + /// Err(err) => println!("Error: {:?}", err), + /// } + /// ``` + async fn get_first_event_from_storage( + &self, + stream_name: &str, + ) -> Result, ObjectStorageError> { + let mut all_first_events = vec![]; + let stream_metas = self.get_stream_meta_from_storage(stream_name).await; + if let Ok(stream_metas) = stream_metas { + for stream_meta in stream_metas.iter() { + if let Some(first_event) = &stream_meta.first_event_at { + let first_event = DateTime::parse_from_rfc3339(first_event).unwrap(); + let first_event = first_event.with_timezone(&Utc); + all_first_events.push(first_event); + } + } + } + + if all_first_events.is_empty() { + return Ok(None); + } + let first_event_at = all_first_events.iter().min().unwrap().to_rfc3339(); + Ok(Some(first_event_at)) + } + // pick a better name fn get_bucket_name(&self) -> String; } @@ -783,27 +855,3 @@ pub fn ingestor_metadata_path(id: Option<&str>) -> RelativePathBuf { &format!("ingestor.{}.json", INGESTOR_META.get_ingestor_id()), ]) } - -pub async fn get_stream_meta_from_storage( - stream_name: &str, -) -> Result, ObjectStorageError> { - let storage = CONFIG.storage().get_object_store(); - let mut stream_metas = vec![]; - let stream_meta_bytes = storage - .get_objects( - Some(&RelativePathBuf::from_iter([ - stream_name, - STREAM_ROOT_DIRECTORY, - ])), - Box::new(|file_name| file_name.ends_with("stream.json")), - ) - .await; - if let Ok(stream_meta_bytes) = stream_meta_bytes { - for stream_meta in stream_meta_bytes { - let stream_meta_ob = serde_json::from_slice::(&stream_meta)?; - stream_metas.push(stream_meta_ob); - } - } - - Ok(stream_metas) -}