From 641be4af33dfc5a382e7c1f69c431c60eb816f81 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sat, 25 Jan 2025 22:55:58 +0530 Subject: [PATCH 1/2] refactor: simply store `StreamType` --- src/event/format/json.rs | 4 ++-- src/event/format/mod.rs | 7 ++----- src/handlers/http/logstream.rs | 12 +++++++++--- src/handlers/http/modal/ingest_server.rs | 2 +- .../http/modal/query/querier_logstream.rs | 7 ++++--- src/handlers/http/modal/utils/logstream_utils.rs | 5 +---- src/metadata.rs | 15 ++++++++------- src/storage/mod.rs | 6 +++--- src/storage/object_storage.rs | 2 +- 9 files changed, 31 insertions(+), 29 deletions(-) diff --git a/src/event/format/json.rs b/src/event/format/json.rs index 71fcaffc7..5006be142 100644 --- a/src/event/format/json.rs +++ b/src/event/format/json.rs @@ -94,8 +94,8 @@ impl EventFormat for Event { }; if value_arr - .iter() - .any(|value| fields_mismatch(&schema, value, schema_version)) + .iter() + .any(|value| fields_mismatch(&schema, value, schema_version)) { return Err(anyhow!( "Could not process this event due to mismatch in datatype" diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs index 2b2c2a0b3..c0a2ec323 100644 --- a/src/event/format/mod.rs +++ b/src/event/format/mod.rs @@ -112,11 +112,8 @@ pub trait EventFormat: Sized { time_partition: Option<&String>, schema_version: SchemaVersion, ) -> Result<(RecordBatch, bool), AnyError> { - let (data, mut schema, is_first) = self.to_data( - storage_schema, - time_partition, - schema_version, - )?; + let (data, mut schema, is_first) = + self.to_data(storage_schema, time_partition, schema_version)?; if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() { return Err(anyhow!( diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 903b51ebd..23fa6451d 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -570,7 +570,7 @@ pub async fn get_stream_info(stream_name: Path) -> Result Get info for given log stream diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs index 622383964..0b7cdb4f4 100644 --- a/src/handlers/http/modal/query/querier_logstream.rs +++ b/src/handlers/http/modal/query/querier_logstream.rs @@ -167,9 +167,10 @@ pub async fn get_stats( let stats = stats::get_current_stats(&stream_name, "json") .ok_or(StreamError::StreamNotFound(stream_name.clone()))?; - let ingestor_stats = if STREAM_INFO.stream_type(&stream_name).unwrap() - == Some(StreamType::UserDefined.to_string()) - { + let ingestor_stats = if matches!( + STREAM_INFO.stream_type(&stream_name), + Ok(Some(StreamType::UserDefined)) + ) { Some(fetch_stats_from_ingestors(&stream_name).await?) } else { None diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs index cdc338ad8..c3e5858ed 100644 --- a/src/handlers/http/modal/utils/logstream_utils.rs +++ b/src/handlers/http/modal/utils/logstream_utils.rs @@ -484,10 +484,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result< .and_then(|limit| limit.parse().ok()); let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or(""); let static_schema_flag = stream_metadata.static_schema_flag; - let stream_type = stream_metadata - .stream_type - .map(|s| StreamType::from(s.as_str())) - .unwrap_or_default(); + let stream_type = stream_metadata.stream_type.unwrap_or_default(); let schema_version = stream_metadata.schema_version; let log_source = stream_metadata.log_source; metadata::STREAM_INFO.add_stream( diff --git a/src/metadata.rs b/src/metadata.rs index 5c18aa329..984fe0217 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -75,7 +75,7 @@ pub struct LogStreamMetadata { pub custom_partition: Option, pub static_schema_flag: bool, pub hot_tier_enabled: Option, - pub stream_type: Option, + pub stream_type: Option, pub log_source: LogSource, } @@ -299,7 +299,7 @@ impl StreamInfo { } else { static_schema }, - stream_type: Some(stream_type.to_string()), + stream_type: Some(stream_type), schema_version, log_source, ..Default::default() @@ -324,16 +324,17 @@ impl StreamInfo { self.read() .expect(LOCK_EXPECT) .iter() - .filter(|(_, v)| v.stream_type.clone().unwrap() == StreamType::Internal.to_string()) + .filter(|(_, v)| v.stream_type == Some(StreamType::Internal)) .map(|(k, _)| k.clone()) .collect() } - pub fn stream_type(&self, stream_name: &str) -> Result, MetadataError> { - let map = self.read().expect(LOCK_EXPECT); - map.get(stream_name) + pub fn stream_type(&self, stream_name: &str) -> Result, MetadataError> { + self.read() + .expect(LOCK_EXPECT) + .get(stream_name) .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) - .map(|metadata| metadata.stream_type.clone()) + .map(|metadata| metadata.stream_type) } pub fn update_stats( diff --git a/src/storage/mod.rs b/src/storage/mod.rs index f86b55757..2f0bdb423 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -115,7 +115,7 @@ pub struct ObjectStoreFormat { pub static_schema_flag: bool, #[serde(skip_serializing_if = "Option::is_none")] pub hot_tier_enabled: Option, - pub stream_type: Option, + pub stream_type: Option, #[serde(default)] pub log_source: LogSource, } @@ -140,7 +140,7 @@ pub struct StreamInfo { skip_serializing_if = "std::ops::Not::not" )] pub static_schema_flag: bool, - pub stream_type: Option, + pub stream_type: Option, pub log_source: LogSource, } @@ -205,7 +205,7 @@ impl Default for ObjectStoreFormat { version: CURRENT_SCHEMA_VERSION.to_string(), schema_version: SchemaVersion::V1, // Newly created streams should be v1 objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(), - stream_type: Some(StreamType::UserDefined.to_string()), + stream_type: Some(StreamType::UserDefined), created_at: Local::now().to_rfc3339(), first_event_at: None, owner: Owner::new("".to_string(), "".to_string()), diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 16d526c8a..5ed9f45cc 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -162,7 +162,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let format = ObjectStoreFormat { created_at: Local::now().to_rfc3339(), permissions: vec![Permisssion::new(CONFIG.options.username.clone())], - stream_type: Some(stream_type.to_string()), + stream_type: Some(stream_type), time_partition: (!time_partition.is_empty()).then(|| time_partition.to_string()), time_partition_limit: time_partition_limit.map(|limit| limit.to_string()), custom_partition: (!custom_partition.is_empty()).then(|| custom_partition.to_string()), From 513325b2916fe1a09a2f0dd0fa2924f0d06e4992 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 28 Jan 2025 16:59:38 +0530 Subject: [PATCH 2/2] refactor: serde default --- src/handlers/http/logstream.rs | 18 +++++++++--------- .../http/modal/query/querier_logstream.rs | 8 ++++---- .../http/modal/utils/logstream_utils.rs | 2 +- src/metadata.rs | 8 ++++---- src/storage/mod.rs | 8 +++++--- src/storage/object_storage.rs | 2 +- 6 files changed, 24 insertions(+), 22 deletions(-) diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs index 30e5a15d1..0239c5fd0 100644 --- a/src/handlers/http/logstream.rs +++ b/src/handlers/http/logstream.rs @@ -568,7 +568,7 @@ pub async fn get_stream_info(stream_name: Path) -> Result Result< .and_then(|limit| limit.parse().ok()); let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or(""); let static_schema_flag = stream_metadata.static_schema_flag; - let stream_type = stream_metadata.stream_type.unwrap_or_default(); + let stream_type = stream_metadata.stream_type; let schema_version = stream_metadata.schema_version; let log_source = stream_metadata.log_source; metadata::STREAM_INFO.add_stream( diff --git a/src/metadata.rs b/src/metadata.rs index 603b56568..4aa15e66d 100644 --- a/src/metadata.rs +++ b/src/metadata.rs @@ -75,7 +75,7 @@ pub struct LogStreamMetadata { pub custom_partition: Option, pub static_schema_flag: bool, pub hot_tier_enabled: bool, - pub stream_type: Option, + pub stream_type: StreamType, pub log_source: LogSource, } @@ -332,7 +332,7 @@ impl StreamInfo { } else { static_schema }, - stream_type: Some(stream_type), + stream_type, schema_version, log_source, ..Default::default() @@ -357,12 +357,12 @@ impl StreamInfo { self.read() .expect(LOCK_EXPECT) .iter() - .filter(|(_, v)| v.stream_type == Some(StreamType::Internal)) + .filter(|(_, v)| v.stream_type == StreamType::Internal) .map(|(k, _)| k.clone()) .collect() } - pub fn stream_type(&self, stream_name: &str) -> Result, MetadataError> { + pub fn stream_type(&self, stream_name: &str) -> Result { self.read() .expect(LOCK_EXPECT) .get(stream_name) diff --git a/src/storage/mod.rs b/src/storage/mod.rs index a9cdb9e92..3a490617a 100644 --- a/src/storage/mod.rs +++ b/src/storage/mod.rs @@ -115,7 +115,8 @@ pub struct ObjectStoreFormat { pub static_schema_flag: bool, #[serde(default)] pub hot_tier_enabled: bool, - pub stream_type: Option, + #[serde(default)] + pub stream_type: StreamType, #[serde(default)] pub log_source: LogSource, } @@ -140,7 +141,8 @@ pub struct StreamInfo { skip_serializing_if = "std::ops::Not::not" )] pub static_schema_flag: bool, - pub stream_type: Option, + #[serde(default)] + pub stream_type: StreamType, pub log_source: LogSource, } @@ -205,7 +207,7 @@ impl Default for ObjectStoreFormat { version: CURRENT_SCHEMA_VERSION.to_string(), schema_version: SchemaVersion::V1, // Newly created streams should be v1 objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(), - stream_type: Some(StreamType::UserDefined), + stream_type: StreamType::UserDefined, created_at: Local::now().to_rfc3339(), first_event_at: None, owner: Owner::new("".to_string(), "".to_string()), diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index b82b9889c..b9dc59dcd 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -162,7 +162,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static { let format = ObjectStoreFormat { created_at: Local::now().to_rfc3339(), permissions: vec![Permisssion::new(CONFIG.options.username.clone())], - stream_type: Some(stream_type), + stream_type, time_partition: (!time_partition.is_empty()).then(|| time_partition.to_string()), time_partition_limit: time_partition_limit.map(|limit| limit.to_string()), custom_partition: (!custom_partition.is_empty()).then(|| custom_partition.to_string()),