From 3a74a53fc22f3e19ff8a5bd6887a17790e3d0bb7 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Thu, 6 Jun 2024 08:36:45 +0530 Subject: [PATCH 1/3] enhancement of update stream user can update time partition limit and custom partition in the same API call --- server/src/handlers/http/ingest.rs | 1 + server/src/handlers/http/logstream.rs | 77 +++++++++++++++++++++------ server/src/metadata.rs | 27 ++++++++++ server/src/storage/object_storage.rs | 36 +++++++++++++ 4 files changed, 126 insertions(+), 15 deletions(-) diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 2cb2ffe13..c8f65151e 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -388,6 +388,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr "", "", Arc::new(Schema::empty()), + false ) .await?; } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 15e343747..b4193d977 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -185,6 +185,13 @@ pub async fn get_alert(req: HttpRequest) -> Result } pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { + + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let mut update_stream = false; + if metadata::STREAM_INFO.stream_exists(&stream_name) { + update_stream = true; + } + let time_partition = if let Some((_, time_partition_name)) = req .headers() .iter() @@ -194,6 +201,15 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result Result Result Result<(), CreateStreamError>{ + if let Err(err) = storage + .update_stream( + &stream_name, + time_partition_limit, + custom_partition, + ) + .await + { + return Err(CreateStreamError::Storage { stream_name, err }); + } +} + pub async fn create_stream( stream_name: String, time_partition: &str, @@ -589,6 +621,7 @@ pub async fn create_stream( custom_partition: &str, static_schema_flag: &str, schema: Arc, + update_stream: bool ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name if stream_name.ne(INTERNAL_STREAM_NAME) { @@ -597,7 +630,20 @@ pub async fn create_stream( // Proceed to create log stream if it doesn't exist let storage = CONFIG.storage().get_object_store(); - if let Err(err) = storage + + if update_stream { + if let Err(err) = storage + .update_stream( + &stream_name, + time_partition_limit, + custom_partition, + ) + .await + { + return Err(CreateStreamError::Storage { stream_name, err }); + } + }else{ + if let Err(err) = storage .create_stream( &stream_name, time_partition, @@ -607,9 +653,10 @@ pub async fn create_stream( schema.clone(), ) .await - { - return Err(CreateStreamError::Storage { stream_name, err }); - } + { + return Err(CreateStreamError::Storage { stream_name, err }); + } + } let stream_meta = CONFIG .storage() diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 76d5dd0da..059e33a43 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -171,6 +171,33 @@ impl StreamInfo { metadata.first_event_at = first_event_at; }) } + + pub fn update_time_partition_limit( + &self, + stream_name: &str, + time_partition_limit: Option, + )-> Result<(), MetadataError>{ + let mut map = self.write().expect(LOCK_EXPECT); + map.get_mut(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| { + metadata.time_partition_limit = time_partition_limit; + }) + } + + pub fn update_custom_partition( + &self, + stream_name: &str, + custom_partition: Option, + )-> Result<(), MetadataError>{ + let mut map = self.write().expect(LOCK_EXPECT); + map.get_mut(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| { + metadata.custom_partition = custom_partition; + }) + } + #[allow(clippy::too_many_arguments)] pub fn add_stream( &self, diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 5af74cd60..66d9d67a6 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -167,6 +167,42 @@ pub trait ObjectStorage: Sync + 'static { Ok(()) } + async fn update_time_partition_limit_in_stream( + &self, + stream_name: &str, + time_partition_limit: &str, + ) -> Result<(), ObjectStorageError> { + let mut format = self.get_object_store_format(stream_name).await?; + if time_partition_limit.is_empty() { + format.time_partition_limit = None; + } else { + format.time_partition_limit = Some(time_partition_limit.to_string()); + } + let format_json = to_bytes(&format); + self.put_object(&stream_json_path(stream_name), format_json) + .await?; + + Ok(()) + } + + async fn update_custom_partition_in_stream( + &self, + stream_name: &str, + custom_partition: &str, + ) -> Result<(), ObjectStorageError> { + let mut format = self.get_object_store_format(stream_name).await?; + if custom_partition.is_empty() { + format.custom_partition = None; + } else { + format.custom_partition = Some(custom_partition.to_string()); + } + let format_json = to_bytes(&format); + self.put_object(&stream_json_path(stream_name), format_json) + .await?; + + Ok(()) + } + async fn put_alerts( &self, stream_name: &str, From 5e30ab64cdf0a105ff5da7a06dd5a29c87e26ef8 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 7 Jun 2024 12:26:48 +0530 Subject: [PATCH 2/3] enhancement for update stream user can make below changes to the existing stream - 1. time partition limit 2. custom partition send extra header X-P-Update-Header=true to make the update --- server/src/handlers.rs | 2 +- server/src/handlers/http/ingest.rs | 1 - server/src/handlers/http/logstream.rs | 155 +++++++++++++++++--------- server/src/metadata.rs | 24 ++-- 4 files changed, 117 insertions(+), 65 deletions(-) diff --git a/server/src/handlers.rs b/server/src/handlers.rs index 5d173e1b4..78e7b2da9 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -33,7 +33,7 @@ const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition"; const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag"; const AUTHORIZATION_KEY: &str = "authorization"; const SEPARATOR: char = '^'; - +const UPDATE_STREAM_KEY: &str = "x-p-update-stream"; const OIDC_SCOPE: &str = "openid profile email"; const COOKIE_AGE_DAYS: usize = 7; const SESSION_COOKIE_NAME: &str = "session"; diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index c8f65151e..2cb2ffe13 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -388,7 +388,6 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr "", "", Arc::new(Schema::empty()), - false ) .await?; } diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index b4193d977..19d8cc999 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -23,6 +23,7 @@ use super::cluster::{fetch_stats_from_ingestors, INTERNAL_STREAM_NAME}; use crate::alerts::Alerts; use crate::handlers::{ CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, + UPDATE_STREAM_KEY, }; use crate::metadata::STREAM_INFO; use crate::option::{Mode, CONFIG}; @@ -185,11 +186,24 @@ pub async fn get_alert(req: HttpRequest) -> Result } pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - let mut update_stream = false; - if metadata::STREAM_INFO.stream_exists(&stream_name) { - update_stream = true; + let update_stream = if let Some((_, update_stream)) = req + .headers() + .iter() + .find(|&(key, _)| key == UPDATE_STREAM_KEY) + { + update_stream.to_str().unwrap() + } else { + "" + }; + if metadata::STREAM_INFO.stream_exists(&stream_name) && update_stream != "true" { + // Error if the log stream already exists + return Err(StreamError::Custom { + msg: format!( + "Logstream {stream_name} already exists, please create a new log stream with unique name" + ), + status: StatusCode::BAD_REQUEST, + }); } let time_partition = if let Some((_, time_partition_name)) = req @@ -202,11 +216,9 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result().is_err() { return Err(StreamError::Custom { - msg: "could not convert duration to an unsigned number".to_string(), + msg: "Could not convert duration to an unsigned number".to_string(), status: StatusCode::BAD_REQUEST, }); } else { time_partition_in_days = days; + if update_stream == "true" { + if let Err(err) = update_time_partition_limit_in_stream( + stream_name.clone(), + time_partition_in_days, + ) + .await + { + return Err(StreamError::CreateStream(err)); + } + return Ok(("Log stream updated", StatusCode::OK)); + } } } let static_schema_flag = if let Some((_, static_schema_flag)) = req @@ -243,11 +266,9 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result>(); if custom_partition_list.len() > 3 { return Err(StreamError::Custom { - msg: "maximum 3 custom partition keys are supported".to_string(), + msg: "Maximum 3 custom partition keys are supported".to_string(), status: StatusCode::BAD_REQUEST, }); } + if update_stream == "true" { + if let Err(err) = + update_custom_partition_in_stream(stream_name.clone(), custom_partition).await + { + return Err(StreamError::CreateStream(err)); + } + return Ok(("Log stream updated", StatusCode::OK)); + } } - + let mut schema = Arc::new(Schema::empty()); - if !body.is_empty() && static_schema_flag == "true" { let static_schema: StaticSchema = serde_json::from_slice(&body)?; @@ -283,14 +311,14 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result Result<(), CreateStreamError>{ - if let Err(err) = storage - .update_stream( - &stream_name, - time_partition_limit, - custom_partition, - ) - .await - { - return Err(CreateStreamError::Storage { stream_name, err }); - } + stream_name: String, + time_partition_limit: &str, +) -> Result<(), CreateStreamError> { + let storage = CONFIG.storage().get_object_store(); + if let Err(err) = storage + .update_time_partition_limit_in_stream(&stream_name, time_partition_limit) + .await + { + return Err(CreateStreamError::Storage { stream_name, err }); + } + + if metadata::STREAM_INFO + .update_time_partition_limit(&stream_name, time_partition_limit.to_string()) + .is_err() + { + return Err(CreateStreamError::Custom { + msg: "failed to update time partition limit in metadata".to_string(), + status: StatusCode::EXPECTATION_FAILED, + }); + } + + Ok(()) +} + +pub async fn update_custom_partition_in_stream( + stream_name: String, + custom_partition: &str, +) -> Result<(), CreateStreamError> { + let storage = CONFIG.storage().get_object_store(); + if let Err(err) = storage + .update_custom_partition_in_stream(&stream_name, custom_partition) + .await + { + return Err(CreateStreamError::Storage { stream_name, err }); + } + + if metadata::STREAM_INFO + .update_custom_partition(&stream_name, custom_partition.to_string()) + .is_err() + { + return Err(CreateStreamError::Custom { + msg: "failed to update custom partition in metadata".to_string(), + status: StatusCode::EXPECTATION_FAILED, + }); + } + + Ok(()) } pub async fn create_stream( @@ -621,7 +684,6 @@ pub async fn create_stream( custom_partition: &str, static_schema_flag: &str, schema: Arc, - update_stream: bool ) -> Result<(), CreateStreamError> { // fail to proceed if invalid stream name if stream_name.ne(INTERNAL_STREAM_NAME) { @@ -630,20 +692,7 @@ pub async fn create_stream( // Proceed to create log stream if it doesn't exist let storage = CONFIG.storage().get_object_store(); - - if update_stream { - if let Err(err) = storage - .update_stream( - &stream_name, - time_partition_limit, - custom_partition, - ) - .await - { - return Err(CreateStreamError::Storage { stream_name, err }); - } - }else{ - if let Err(err) = storage + if let Err(err) = storage .create_stream( &stream_name, time_partition, @@ -653,10 +702,9 @@ pub async fn create_stream( schema.clone(), ) .await - { - return Err(CreateStreamError::Storage { stream_name, err }); - } - } + { + return Err(CreateStreamError::Storage { stream_name, err }); + } let stream_meta = CONFIG .storage() @@ -760,6 +808,8 @@ pub mod error { stream_name: String, err: ObjectStorageError, }, + #[error("{msg}")] + Custom { msg: String, status: StatusCode }, } #[derive(Debug, thiserror::Error)] @@ -812,6 +862,9 @@ pub mod error { StreamError::CreateStream(CreateStreamError::Storage { .. }) => { StatusCode::INTERNAL_SERVER_ERROR } + StreamError::CreateStream(CreateStreamError::Custom { .. }) => { + StatusCode::INTERNAL_SERVER_ERROR + } StreamError::CacheNotEnabled(_) => StatusCode::BAD_REQUEST, StreamError::StreamNotFound(_) => StatusCode::NOT_FOUND, StreamError::Custom { status, .. } => *status, diff --git a/server/src/metadata.rs b/server/src/metadata.rs index 059e33a43..973a7453e 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -175,27 +175,27 @@ impl StreamInfo { pub fn update_time_partition_limit( &self, stream_name: &str, - time_partition_limit: Option, - )-> Result<(), MetadataError>{ + time_partition_limit: String, + ) -> Result<(), MetadataError> { let mut map = self.write().expect(LOCK_EXPECT); map.get_mut(stream_name) - .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) - .map(|metadata| { - metadata.time_partition_limit = time_partition_limit; - }) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| { + metadata.time_partition_limit = Some(time_partition_limit); + }) } pub fn update_custom_partition( &self, stream_name: &str, - custom_partition: Option, - )-> Result<(), MetadataError>{ + custom_partition: String, + ) -> Result<(), MetadataError> { let mut map = self.write().expect(LOCK_EXPECT); map.get_mut(stream_name) - .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) - .map(|metadata| { - metadata.custom_partition = custom_partition; - }) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| { + metadata.custom_partition = Some(custom_partition); + }) } #[allow(clippy::too_many_arguments)] From 3f6c8a58d68367425ec46cbef8a01baf96c97bef Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Fri, 7 Jun 2024 14:52:34 +0530 Subject: [PATCH 3/3] deep source analysis fix --- server/src/handlers/http/logstream.rs | 192 ++++++++++++++++---------- 1 file changed, 116 insertions(+), 76 deletions(-) diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 19d8cc999..87b60fd29 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -187,15 +187,9 @@ pub async fn get_alert(req: HttpRequest) -> Result pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - let update_stream = if let Some((_, update_stream)) = req - .headers() - .iter() - .find(|&(key, _)| key == UPDATE_STREAM_KEY) - { - update_stream.to_str().unwrap() - } else { - "" - }; + let (time_partition, time_partition_limit, custom_partition, static_schema_flag, update_stream) = + fetch_headers_from_put_stream_request(&req); + if metadata::STREAM_INFO.stream_exists(&stream_name) && update_stream != "true" { // Error if the log stream already exists return Err(StreamError::Custom { @@ -206,16 +200,6 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result().is_err() { - return Err(StreamError::Custom { - msg: "Could not convert duration to an unsigned number".to_string(), - status: StatusCode::BAD_REQUEST, - }); + if !time_partition_limit.is_empty() { + let time_partition_days = validate_time_partition_limit(&time_partition_limit); + if let Err(err) = time_partition_days { + return Err(StreamError::CreateStream(err)); } else { - time_partition_in_days = days; + time_partition_in_days = time_partition_days.unwrap(); if update_stream == "true" { if let Err(err) = update_time_partition_limit_in_stream( stream_name.clone(), @@ -256,15 +226,6 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result>(); - if custom_partition_list.len() > 3 { - return Err(StreamError::Custom { - msg: "Maximum 3 custom partition keys are supported".to_string(), - status: StatusCode::BAD_REQUEST, - }); + if !custom_partition.is_empty() { + if let Err(err) = validate_custom_partition(&custom_partition) { + return Err(StreamError::CreateStream(err)); } if update_stream == "true" { if let Err(err) = - update_custom_partition_in_stream(stream_name.clone(), custom_partition).await + update_custom_partition_in_stream(stream_name.clone(), &custom_partition).await { return Err(StreamError::CreateStream(err)); } @@ -297,10 +248,104 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result (String, String, String, String, String) { + let mut time_partition = String::default(); + let mut time_partition_limit = String::default(); + let mut custom_partition = String::default(); + let mut static_schema_flag = String::default(); + let mut update_stream = String::default(); + req.headers().iter().for_each(|(key, value)| { + if key == TIME_PARTITION_KEY { + time_partition = value.to_str().unwrap().to_string(); + } + if key == TIME_PARTITION_LIMIT_KEY { + time_partition_limit = value.to_str().unwrap().to_string(); + } + if key == CUSTOM_PARTITION_KEY { + custom_partition = value.to_str().unwrap().to_string(); + } + if key == STATIC_SCHEMA_FLAG { + static_schema_flag = value.to_str().unwrap().to_string(); + } + if key == UPDATE_STREAM_KEY { + update_stream = value.to_str().unwrap().to_string(); + } + }); + ( + time_partition, + time_partition_limit, + custom_partition, + static_schema_flag, + update_stream, + ) +} + +fn validate_time_partition_limit(time_partition_limit: &str) -> Result<&str, CreateStreamError> { + if !time_partition_limit.ends_with('d') { + return Err(CreateStreamError::Custom { + msg: "Missing 'd' suffix for duration value".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + let days = &time_partition_limit[0..time_partition_limit.len() - 1]; + if days.parse::().is_err() { + return Err(CreateStreamError::Custom { + msg: "Could not convert duration to an unsigned number".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + + Ok(days) +} + +fn validate_custom_partition(custom_partition: &str) -> Result<(), CreateStreamError> { + let custom_partition_list = custom_partition.split(',').collect::>(); + if custom_partition_list.len() > 3 { + return Err(CreateStreamError::Custom { + msg: "Maximum 3 custom partition keys are supported".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + Ok(()) +} + +fn validate_static_schema( + body: &Bytes, + stream_name: &str, + time_partition: &str, + custom_partition: &str, + static_schema_flag: &str, +) -> Result, CreateStreamError> { + let mut schema = Arc::new(Schema::empty()); if !body.is_empty() && static_schema_flag == "true" { - let static_schema: StaticSchema = serde_json::from_slice(&body)?; + let static_schema: StaticSchema = serde_json::from_slice(body)?; let parsed_schema = convert_static_schema_to_arrow_schema( static_schema.clone(), @@ -310,13 +355,13 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result { StatusCode::INTERNAL_SERVER_ERROR } + StreamError::CreateStream(CreateStreamError::SerdeError(_)) => { + StatusCode::BAD_REQUEST + } StreamError::CacheNotEnabled(_) => StatusCode::BAD_REQUEST, StreamError::StreamNotFound(_) => StatusCode::NOT_FOUND, StreamError::Custom { status, .. } => *status,