diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index d8d3a2bf4..26af2c068 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -41,7 +41,6 @@ use itertools::kmerge_by; use lazy_static::__Deref; use relative_path::RelativePath; use relative_path::RelativePathBuf; -use serde::Serialize; use serde_json::Value; use std::{ @@ -149,9 +148,29 @@ pub trait ObjectStorage: Sync + 'static { &self, stream_name: &str, ) -> Result>, ObjectStorageError> { - let schema = self.get_object(&schema_path(stream_name)).await?; - let schema = serde_json::from_slice(&schema).expect("schema map is valid json"); - Ok(schema) + let schema_map = self.get_object(&schema_path(stream_name)).await?; + if let Ok(schema_map) = serde_json::from_slice(&schema_map) { + Ok(schema_map) + } else { + // fix for schema metadata serialize + let mut schema_map: serde_json::Value = + serde_json::from_slice(&schema_map).expect("valid json"); + + for schema in schema_map + .as_object_mut() + .expect("schema map is json object") + .values_mut() + { + let map = schema.as_object_mut().unwrap(); + map.insert( + "metadata".to_string(), + Value::Object(serde_json::Map::new()), + ); + } + + Ok(serde_json::from_value(schema_map) + .expect("should be deserializable after alteration")) + } } async fn get_alerts(&self, stream_name: &str) -> Result { @@ -380,7 +399,7 @@ impl MergedRecordReader { } #[inline(always)] -fn to_bytes(any: &(impl ?Sized + Serialize)) -> Bytes { +fn to_bytes(any: &(impl ?Sized + serde::Serialize)) -> Bytes { serde_json::to_vec(any) .map(|any| any.into()) .expect("serialize cannot fail")