diff --git a/server/src/handlers.rs b/server/src/handlers.rs index 5d173e1b4..78e7b2da9 100644 --- a/server/src/handlers.rs +++ b/server/src/handlers.rs @@ -33,7 +33,7 @@ const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition"; const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag"; const AUTHORIZATION_KEY: &str = "authorization"; const SEPARATOR: char = '^'; - +const UPDATE_STREAM_KEY: &str = "x-p-update-stream"; const OIDC_SCOPE: &str = "openid profile email"; const COOKIE_AGE_DAYS: usize = 7; const SESSION_COOKIE_NAME: &str = "session"; diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index 76f847672..6366672d6 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -23,6 +23,7 @@ use super::cluster::{fetch_stats_from_ingestors, INTERNAL_STREAM_NAME}; use crate::alerts::Alerts; use crate::handlers::{ CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, + UPDATE_STREAM_KEY, }; use crate::metadata::STREAM_INFO; use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE}; @@ -187,77 +188,166 @@ pub async fn get_alert(req: HttpRequest) -> Result } pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result { - let time_partition = if let Some((_, time_partition_name)) = req - .headers() - .iter() - .find(|&(key, _)| key == TIME_PARTITION_KEY) - { - time_partition_name.to_str().unwrap() - } else { - "" - }; + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let (time_partition, time_partition_limit, custom_partition, static_schema_flag, update_stream) = + fetch_headers_from_put_stream_request(&req); + + if metadata::STREAM_INFO.stream_exists(&stream_name) && update_stream != "true" { + // Error if the log stream already exists + return Err(StreamError::Custom { + msg: format!( + "Logstream {stream_name} already exists, please create a new log stream with unique name" + ), + status: StatusCode::BAD_REQUEST, + }); + } + + if !time_partition.is_empty() && update_stream == "true" { + return Err(StreamError::Custom { + msg: "Altering the time partition of an existing stream is restricted.".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } let mut time_partition_in_days: &str = ""; - if let Some((_, time_partition_limit_name)) = req - .headers() - .iter() - .find(|&(key, _)| key == TIME_PARTITION_LIMIT_KEY) - { - let time_partition_limit = time_partition_limit_name.to_str().unwrap(); - if !time_partition_limit.ends_with('d') { - return Err(StreamError::Custom { - msg: "missing 'd' suffix for duration value".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } - let days = &time_partition_limit[0..time_partition_limit.len() - 1]; - if days.parse::().is_err() { - return Err(StreamError::Custom { - msg: "could not convert duration to an unsigned number".to_string(), - status: StatusCode::BAD_REQUEST, - }); + if !time_partition_limit.is_empty() { + let time_partition_days = validate_time_partition_limit(&time_partition_limit); + if let Err(err) = time_partition_days { + return Err(StreamError::CreateStream(err)); } else { - time_partition_in_days = days; + time_partition_in_days = time_partition_days.unwrap(); + if update_stream == "true" { + if let Err(err) = update_time_partition_limit_in_stream( + stream_name.clone(), + time_partition_in_days, + ) + .await + { + return Err(StreamError::CreateStream(err)); + } + return Ok(("Log stream updated", StatusCode::OK)); + } } } - let static_schema_flag = if let Some((_, static_schema_flag)) = req - .headers() - .iter() - .find(|&(key, _)| key == STATIC_SCHEMA_FLAG) - { - static_schema_flag.to_str().unwrap() - } else { - "" - }; - let mut custom_partition: &str = ""; - if let Some((_, custom_partition_key)) = req - .headers() - .iter() - .find(|&(key, _)| key == CUSTOM_PARTITION_KEY) - { - custom_partition = custom_partition_key.to_str().unwrap(); - let custom_partition_list = custom_partition.split(',').collect::>(); - if custom_partition_list.len() > 3 { - return Err(StreamError::Custom { - msg: "maximum 3 custom partition keys are supported".to_string(), - status: StatusCode::BAD_REQUEST, - }); + + if !static_schema_flag.is_empty() && update_stream == "true" { + return Err(StreamError::Custom { + msg: "Altering the schema of an existing stream is restricted.".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + + if !custom_partition.is_empty() { + if let Err(err) = validate_custom_partition(&custom_partition) { + return Err(StreamError::CreateStream(err)); + } + if update_stream == "true" { + if let Err(err) = + update_custom_partition_in_stream(stream_name.clone(), &custom_partition).await + { + return Err(StreamError::CreateStream(err)); + } + return Ok(("Log stream updated", StatusCode::OK)); } } - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - let mut schema = Arc::new(Schema::empty()); - if metadata::STREAM_INFO.stream_exists(&stream_name) { - // Error if the log stream already exists - return Err(StreamError::Custom { - msg: format!( - "logstream {stream_name} already exists, please create a new log stream with unique name" - ), + let schema = validate_static_schema( + &body, + &stream_name, + &time_partition, + &custom_partition, + &static_schema_flag, + ); + if let Err(err) = schema { + return Err(StreamError::CreateStream(err)); + } + + create_stream( + stream_name, + &time_partition, + time_partition_in_days, + &custom_partition, + &static_schema_flag, + schema.unwrap(), + ) + .await?; + + Ok(("Log stream created", StatusCode::OK)) +} + +fn fetch_headers_from_put_stream_request( + req: &HttpRequest, +) -> (String, String, String, String, String) { + let mut time_partition = String::default(); + let mut time_partition_limit = String::default(); + let mut custom_partition = String::default(); + let mut static_schema_flag = String::default(); + let mut update_stream = String::default(); + req.headers().iter().for_each(|(key, value)| { + if key == TIME_PARTITION_KEY { + time_partition = value.to_str().unwrap().to_string(); + } + if key == TIME_PARTITION_LIMIT_KEY { + time_partition_limit = value.to_str().unwrap().to_string(); + } + if key == CUSTOM_PARTITION_KEY { + custom_partition = value.to_str().unwrap().to_string(); + } + if key == STATIC_SCHEMA_FLAG { + static_schema_flag = value.to_str().unwrap().to_string(); + } + if key == UPDATE_STREAM_KEY { + update_stream = value.to_str().unwrap().to_string(); + } + }); + + ( + time_partition, + time_partition_limit, + custom_partition, + static_schema_flag, + update_stream, + ) +} + +fn validate_time_partition_limit(time_partition_limit: &str) -> Result<&str, CreateStreamError> { + if !time_partition_limit.ends_with('d') { + return Err(CreateStreamError::Custom { + msg: "Missing 'd' suffix for duration value".to_string(), status: StatusCode::BAD_REQUEST, }); } + let days = &time_partition_limit[0..time_partition_limit.len() - 1]; + if days.parse::().is_err() { + return Err(CreateStreamError::Custom { + msg: "Could not convert duration to an unsigned number".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + + Ok(days) +} +fn validate_custom_partition(custom_partition: &str) -> Result<(), CreateStreamError> { + let custom_partition_list = custom_partition.split(',').collect::>(); + if custom_partition_list.len() > 3 { + return Err(CreateStreamError::Custom { + msg: "Maximum 3 custom partition keys are supported".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + Ok(()) +} + +fn validate_static_schema( + body: &Bytes, + stream_name: &str, + time_partition: &str, + custom_partition: &str, + static_schema_flag: &str, +) -> Result, CreateStreamError> { + let mut schema = Arc::new(Schema::empty()); if !body.is_empty() && static_schema_flag == "true" { - let static_schema: StaticSchema = serde_json::from_slice(&body)?; + let static_schema: StaticSchema = serde_json::from_slice(body)?; let parsed_schema = convert_static_schema_to_arrow_schema( static_schema.clone(), @@ -267,31 +357,21 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result Result<(), CreateStreamError> { + let storage = CONFIG.storage().get_object_store(); + if let Err(err) = storage + .update_time_partition_limit_in_stream(&stream_name, time_partition_limit) + .await + { + return Err(CreateStreamError::Storage { stream_name, err }); + } + + if metadata::STREAM_INFO + .update_time_partition_limit(&stream_name, time_partition_limit.to_string()) + .is_err() + { + return Err(CreateStreamError::Custom { + msg: "failed to update time partition limit in metadata".to_string(), + status: StatusCode::EXPECTATION_FAILED, + }); + } + + Ok(()) +} + +pub async fn update_custom_partition_in_stream( + stream_name: String, + custom_partition: &str, +) -> Result<(), CreateStreamError> { + let storage = CONFIG.storage().get_object_store(); + if let Err(err) = storage + .update_custom_partition_in_stream(&stream_name, custom_partition) + .await + { + return Err(CreateStreamError::Storage { stream_name, err }); + } + + if metadata::STREAM_INFO + .update_custom_partition(&stream_name, custom_partition.to_string()) + .is_err() + { + return Err(CreateStreamError::Custom { + msg: "failed to update custom partition in metadata".to_string(), + status: StatusCode::EXPECTATION_FAILED, + }); + } + + Ok(()) +} + pub async fn create_stream( stream_name: String, time_partition: &str, @@ -757,6 +887,10 @@ pub mod error { stream_name: String, err: ObjectStorageError, }, + #[error("{msg}")] + Custom { msg: String, status: StatusCode }, + #[error("Could not deserialize into JSON object, {0}")] + SerdeError(#[from] serde_json::Error), } #[derive(Debug, thiserror::Error)] @@ -809,6 +943,12 @@ pub mod error { StreamError::CreateStream(CreateStreamError::Storage { .. }) => { StatusCode::INTERNAL_SERVER_ERROR } + StreamError::CreateStream(CreateStreamError::Custom { .. }) => { + StatusCode::INTERNAL_SERVER_ERROR + } + StreamError::CreateStream(CreateStreamError::SerdeError(_)) => { + StatusCode::BAD_REQUEST + } StreamError::CacheNotEnabled(_) => StatusCode::BAD_REQUEST, StreamError::StreamNotFound(_) => StatusCode::NOT_FOUND, StreamError::Custom { status, .. } => *status, diff --git a/server/src/metadata.rs b/server/src/metadata.rs index a9a8ac1dd..7d1b671ea 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -171,6 +171,33 @@ impl StreamInfo { metadata.first_event_at = first_event_at; }) } + + pub fn update_time_partition_limit( + &self, + stream_name: &str, + time_partition_limit: String, + ) -> Result<(), MetadataError> { + let mut map = self.write().expect(LOCK_EXPECT); + map.get_mut(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| { + metadata.time_partition_limit = Some(time_partition_limit); + }) + } + + pub fn update_custom_partition( + &self, + stream_name: &str, + custom_partition: String, + ) -> Result<(), MetadataError> { + let mut map = self.write().expect(LOCK_EXPECT); + map.get_mut(stream_name) + .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) + .map(|metadata| { + metadata.custom_partition = Some(custom_partition); + }) + } + #[allow(clippy::too_many_arguments)] pub fn add_stream( &self, diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 568326e94..6e11fdf6b 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -167,6 +167,42 @@ pub trait ObjectStorage: Sync + 'static { Ok(()) } + async fn update_time_partition_limit_in_stream( + &self, + stream_name: &str, + time_partition_limit: &str, + ) -> Result<(), ObjectStorageError> { + let mut format = self.get_object_store_format(stream_name).await?; + if time_partition_limit.is_empty() { + format.time_partition_limit = None; + } else { + format.time_partition_limit = Some(time_partition_limit.to_string()); + } + let format_json = to_bytes(&format); + self.put_object(&stream_json_path(stream_name), format_json) + .await?; + + Ok(()) + } + + async fn update_custom_partition_in_stream( + &self, + stream_name: &str, + custom_partition: &str, + ) -> Result<(), ObjectStorageError> { + let mut format = self.get_object_store_format(stream_name).await?; + if custom_partition.is_empty() { + format.custom_partition = None; + } else { + format.custom_partition = Some(custom_partition.to_string()); + } + let format_json = to_bytes(&format); + self.put_object(&stream_json_path(stream_name), format_json) + .await?; + + Ok(()) + } + async fn put_alerts( &self, stream_name: &str,