Skip to content

Commit 0d11d84

Browse files
authored
Fix: Stream Sync on Ingest Server (#708)
1 parent 8717a26 commit 0d11d84

File tree

2 files changed

+26
-1
lines changed

2 files changed

+26
-1
lines changed

server/src/metadata.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ impl StreamInfo {
168168

169169
for stream in storage.list_streams().await? {
170170
let alerts = storage.get_alerts(&stream.name).await?;
171-
let schema = storage.get_schema(&stream.name).await?;
171+
let schema = storage.get_schema_for_the_first_time(&stream.name).await?;
172172
let meta = storage.get_stream_metadata(&stream.name).await?;
173173

174174
let schema = update_schema_from_staging(&stream.name, schema);

server/src/storage/object_storage.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,15 @@ pub trait ObjectStorage: Sync + 'static {
173173
.await
174174
}
175175

176+
async fn get_schema_for_the_first_time(
177+
&self,
178+
stream_name: &str,
179+
) -> Result<Schema, ObjectStorageError> {
180+
let schema_path = RelativePathBuf::from_iter([stream_name, SCHEMA_FILE_NAME]);
181+
let schema_map = self.get_object(&schema_path).await?;
182+
Ok(serde_json::from_slice(&schema_map)?)
183+
}
184+
176185
async fn get_schema(&self, stream_name: &str) -> Result<Schema, ObjectStorageError> {
177186
let schema_map = self.get_object(&schema_path(stream_name)).await?;
178187
Ok(serde_json::from_slice(&schema_map)?)
@@ -231,6 +240,22 @@ pub trait ObjectStorage: Sync + 'static {
231240
self.put_object(&path, to_bytes(manifest)).await
232241
}
233242

243+
/// for future use
244+
async fn get_stats_for_first_time(
245+
&self,
246+
stream_name: &str,
247+
) -> Result<Stats, ObjectStorageError> {
248+
let path = RelativePathBuf::from_iter([stream_name, STREAM_METADATA_FILE_NAME]);
249+
let stream_metadata = self.get_object(&path).await?;
250+
let stream_metadata: Value =
251+
serde_json::from_slice(&stream_metadata).expect("parseable config is valid json");
252+
let stats = &stream_metadata["stats"];
253+
254+
let stats = serde_json::from_value(stats.clone()).unwrap_or_default();
255+
256+
Ok(stats)
257+
}
258+
234259
async fn get_stats(&self, stream_name: &str) -> Result<Stats, ObjectStorageError> {
235260
let stream_metadata = self.get_object(&stream_json_path(stream_name)).await?;
236261
let stream_metadata: Value =

0 commit comments

Comments
 (0)