From ff2831dc499153c9047b48ba8330196dc24dc1af Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 25 Jul 2024 23:38:29 +0530 Subject: [PATCH] fix for create and update stream validation added validation - time partition cannot be part of custom partition --- server/src/handlers/http/ingest.rs | 2 +- server/src/handlers/http/logstream.rs | 34 ++++++++++++++++++++++ server/src/query/stream_schema_provider.rs | 2 +- server/src/utils/json/flatten.rs | 2 +- 4 files changed, 37 insertions(+), 3 deletions(-) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 1476e1aad..0a1e9bdd8 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -307,7 +307,7 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result } fn get_parsed_timestamp(body: &Value, time_partition: &Option) -> NaiveDateTime { - let body_timestamp = body.get(&time_partition.clone().unwrap().to_string()); + let body_timestamp = body.get(time_partition.clone().unwrap().to_string()); let parsed_timestamp = body_timestamp .unwrap() .to_owned() diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 7161d6168..b13cd1679 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -246,6 +246,23 @@ fn validate_custom_partition(custom_partition: &str) -> Result<(), CreateStreamE Ok(()) } +fn validate_time_with_custom_partition( + time_partition: &str, + custom_partition: &str, +) -> Result<(), CreateStreamError> { + let custom_partition_list = custom_partition.split(',').collect::>(); + if custom_partition_list.contains(&time_partition) { + return Err(CreateStreamError::Custom { + msg: format!( + "time partition {} cannot be set as custom partition", + time_partition + ), + status: StatusCode::BAD_REQUEST, + }); + } + Ok(()) +} + fn validate_static_schema( body: &Bytes, stream_name: &str, @@ -338,6 +355,10 @@ async fn create_update_stream( validate_custom_partition(&custom_partition)?; } + if !time_partition.is_empty() && !custom_partition.is_empty() { + validate_time_with_custom_partition(&time_partition, &custom_partition)?; + } + let schema = validate_static_schema( body, stream_name, @@ -741,6 +762,7 @@ pub async fn update_custom_partition_in_stream( custom_partition: &str, ) -> Result<(), CreateStreamError> { let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name).unwrap(); + let time_partition = STREAM_INFO.get_time_partition(&stream_name).unwrap(); if static_schema_flag.is_some() { let schema = STREAM_INFO.schema(&stream_name).unwrap(); @@ -766,6 +788,18 @@ pub async fn update_custom_partition_in_stream( status: StatusCode::BAD_REQUEST, }); } + + if let Some(time_partition) = time_partition.clone() { + if time_partition == *partition { + return Err(CreateStreamError::Custom { + msg: format!( + "time partition {} cannot be set as custom partition", + partition + ), + status: StatusCode::BAD_REQUEST, + }); + } + } } } } diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index 7100ac95a..6f9eb5608 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -427,7 +427,7 @@ impl TableProvider for StandardTableProvider { let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema, 1); let remote_exec = create_parquet_physical_plan( - ObjectStoreUrl::parse(&glob_storage.store_url()).unwrap(), + ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(), partitioned_files, statistics, self.schema.clone(), diff --git a/server/src/utils/json/flatten.rs b/server/src/utils/json/flatten.rs index 60a77081d..6e84ab0b7 100644 --- a/server/src/utils/json/flatten.rs +++ b/server/src/utils/json/flatten.rs @@ -156,7 +156,7 @@ pub fn validate_time_partition( } else { 30 }; - let body_timestamp = value.get(&time_partition.clone().unwrap().to_string()); + let body_timestamp = value.get(time_partition.clone().unwrap().to_string()); if body_timestamp.is_some() && body_timestamp.unwrap().to_owned().as_str().is_some() { if body_timestamp .unwrap()