From 56015fd37060b981bcb571f3cce9863fabe67aa4 Mon Sep 17 00:00:00 2001 From: Eshan Chatterjee Date: Tue, 19 Mar 2024 15:20:19 +0530 Subject: [PATCH] fix: sync streams on ingest server start sync the streams that are present, when a new Ingest Server is started --- server/src/metadata.rs | 2 +- server/src/storage/object_storage.rs | 25 +++++++++++++++++++++++++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/server/src/metadata.rs b/server/src/metadata.rs index e8a250719..cafcf820f 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -168,7 +168,7 @@ impl StreamInfo { for stream in storage.list_streams().await? { let alerts = storage.get_alerts(&stream.name).await?; - let schema = storage.get_schema(&stream.name).await?; + let schema = storage.get_schema_for_the_first_time(&stream.name).await?; let meta = storage.get_stream_metadata(&stream.name).await?; let schema = update_schema_from_staging(&stream.name, schema); diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 486fd8403..352541e5c 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -173,6 +173,15 @@ pub trait ObjectStorage: Sync + 'static { .await } + async fn get_schema_for_the_first_time( + &self, + stream_name: &str, + ) -> Result { + let schema_path = RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME]); + let schema_map = self.get_object(&schema_path).await?; + Ok(serde_json::from_slice(&schema_map)?) + } + async fn get_schema(&self, stream_name: &str) -> Result { let schema_map = self.get_object(&schema_path(stream_name)).await?; Ok(serde_json::from_slice(&schema_map)?) @@ -231,6 +240,22 @@ pub trait ObjectStorage: Sync + 'static { self.put_object(&path, to_bytes(manifest)).await } + /// for future use + async fn get_stats_for_first_time( + &self, + stream_name: &str, + ) -> Result { + let path = RelativePathBuf::from_iter([stream_name, STREAM_METADATA_FILE_NAME]); + let stream_metadata = self.get_object(&path).await?; + let stream_metadata: Value = + serde_json::from_slice(&stream_metadata).expect("parseable config is valid json"); + let stats = &stream_metadata["stats"]; + + let stats = serde_json::from_value(stats.clone()).unwrap_or_default(); + + Ok(stats) + } + async fn get_stats(&self, stream_name: &str) -> Result { let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?; let stream_metadata: Value =