diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index e1b1b987a..0239c5fd0 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -568,8 +568,8 @@ pub async fn get_stream_info(stream_name: Path) -> Result Result< .and_then(|limit| limit.parse().ok()); let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or(""); let static_schema_flag = stream_metadata.static_schema_flag; - let stream_type = stream_metadata - .stream_type - .map(|s| StreamType::from(s.as_str())) - .unwrap_or_default(); + let stream_type = stream_metadata.stream_type; let schema_version = stream_metadata.schema_version; let log_source = stream_metadata.log_source; metadata::STREAM_INFO.add_stream( diff --git a/src/metadata.rs b/src/metadata.rs index 87af45d65..4aa15e66d 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -75,7 +75,7 @@ pub struct LogStreamMetadata { pub custom_partition: Option, pub static_schema_flag: bool, pub hot_tier_enabled: bool, - pub stream_type: Option, + pub stream_type: StreamType, pub log_source: LogSource, } @@ -332,7 +332,7 @@ impl StreamInfo { } else { static_schema }, - stream_type: Some(stream_type.to_string()), + stream_type, schema_version, log_source, ..Default::default() @@ -357,16 +357,17 @@ impl StreamInfo { self.read() .expect(LOCK_EXPECT) .iter() - .filter(|(_, v)| v.stream_type.clone().unwrap() == StreamType::Internal.to_string()) + .filter(|(_, v)| v.stream_type == StreamType::Internal) .map(|(k, _)| k.clone()) .collect() } - pub fn stream_type(&self, stream_name: &str) -> Result, MetadataError> { - let map = self.read().expect(LOCK_EXPECT); - map.get(stream_name) + pub fn stream_type(&self, stream_name: &str) -> Result { + self.read() + .expect(LOCK_EXPECT) + .get(stream_name) .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) - .map(|metadata| metadata.stream_type.clone()) + .map(|metadata| metadata.stream_type) } pub fn update_stats( diff --git a/src/storage/mod.rs b/src/storage/mod.rs index 7098b85d3..3a490617a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -115,7 +115,8 @@ pub struct ObjectStoreFormat { pub static_schema_flag: bool, #[serde(default)] pub hot_tier_enabled: bool, - pub stream_type: Option, + #[serde(default)] + pub stream_type: StreamType, #[serde(default)] pub log_source: LogSource, } @@ -140,7 +141,8 @@ pub struct StreamInfo { skip_serializing_if = "std::ops::Not::not" )] pub static_schema_flag: bool, - pub stream_type: Option, + #[serde(default)] + pub stream_type: StreamType, pub log_source: LogSource, } @@ -205,7 +207,7 @@ impl Default for ObjectStoreFormat { version: CURRENT_SCHEMA_VERSION.to_string(), schema_version: SchemaVersion::V1, // Newly created streams should be v1 objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(), - stream_type: Some(StreamType::UserDefined.to_string()), + stream_type: StreamType::UserDefined, created_at: Local::now().to_rfc3339(), first_event_at: None, owner: Owner::new("".to_string(), "".to_string()), diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 8c24ad2f9..b9dc59dcd 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -162,7 +162,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let format = ObjectStoreFormat { created_at: Local::now().to_rfc3339(), permissions: vec![Permisssion::new(CONFIG.options.username.clone())], - stream_type: Some(stream_type.to_string()), + stream_type, time_partition: (!time_partition.is_empty()).then(|| time_partition.to_string()), time_partition_limit: time_partition_limit.map(|limit| limit.to_string()), custom_partition: (!custom_partition.is_empty()).then(|| custom_partition.to_string()),