diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index 74ba8b34c..b3bd78334 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -107,6 +107,10 @@ pub async fn sync_streams_with_ingestors( let client = reqwest::Client::new(); for ingestor in ingestor_infos.iter() { + if !utils::check_liveness(&ingestor.domain_name).await { + log::warn!("Ingestor {} is not live", ingestor.domain_name); + continue; + } let url = format!( "{}{}/logstream/{}", ingestor.domain_name, diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs index ac51a2724..7161d6168 100644 --- a/server/src/handlers/http/logstream.rs +++ b/server/src/handlers/http/logstream.rs @@ -253,33 +253,30 @@ fn validate_static_schema( custom_partition: &str, static_schema_flag: &str, ) -> Result, CreateStreamError> { - let mut schema = Arc::new(Schema::empty()); - if !body.is_empty() && static_schema_flag == "true" { - let static_schema: StaticSchema = serde_json::from_slice(body)?; - - let parsed_schema = convert_static_schema_to_arrow_schema( - static_schema.clone(), - time_partition, - custom_partition, - ); - if let Ok(parsed_schema) = parsed_schema { - schema = parsed_schema; - } else { + if static_schema_flag == "true" { + if body.is_empty() { return Err(CreateStreamError::Custom { - msg: format!("Unable to commit static schema, logstream {stream_name} not created"), + msg: format!( + "Please provide schema in the request body for static schema logstream {stream_name}" + ), status: StatusCode::BAD_REQUEST, }); } - } else if body.is_empty() && static_schema_flag == "true" { - return Err(CreateStreamError::Custom { + + let static_schema: StaticSchema = serde_json::from_slice(body)?; + let parsed_schema = + convert_static_schema_to_arrow_schema(static_schema, time_partition, custom_partition) + .map_err(|_| CreateStreamError::Custom { msg: format!( - "Please provide schema in the request body for static schema logstream {stream_name}" + "Unable to commit static schema, logstream {stream_name} not created" ), status: StatusCode::BAD_REQUEST, - }); + })?; + + return Ok(parsed_schema); } - Ok(schema) + Ok(Arc::new(Schema::empty())) } async fn create_update_stream( @@ -291,62 +288,55 @@ async fn create_update_stream( fetch_headers_from_put_stream_request(req); if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream != "true" { - // Error if the log stream already exists return Err(StreamError::Custom { - msg: format!( - "Logstream {stream_name} already exists, please create a new log stream with unique name" - ), - status: StatusCode::BAD_REQUEST, - }); - } - - if !time_partition.is_empty() && update_stream == "true" { - return Err(StreamError::Custom { - msg: "Altering the time partition of an existing stream is restricted.".to_string(), + msg: format!( + "Logstream {stream_name} already exists, please create a new log stream with unique name" + ), status: StatusCode::BAD_REQUEST, }); } - let mut time_partition_in_days: &str = ""; - if !time_partition_limit.is_empty() { - let time_partition_days = validate_time_partition_limit(&time_partition_limit); - if let Err(err) = time_partition_days { - return Err(StreamError::CreateStream(err)); - } else { - time_partition_in_days = time_partition_days.unwrap(); - if update_stream == "true" { - if let Err(err) = update_time_partition_limit_in_stream( - stream_name.to_string(), - time_partition_in_days, - ) - .await - { - return Err(StreamError::CreateStream(err)); - } - return Ok(()); - } + + if update_stream == "true" { + if !STREAM_INFO.stream_exists(stream_name) { + return Err(StreamError::StreamNotFound(stream_name.to_string())); + } + if !time_partition.is_empty() { + return Err(StreamError::Custom { + msg: "Altering the time partition of an existing stream is restricted.".to_string(), + status: StatusCode::BAD_REQUEST, + }); } - } - if !static_schema_flag.is_empty() && update_stream == "true" { - return Err(StreamError::Custom { - msg: "Altering the schema of an existing stream is restricted.".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } + if !static_schema_flag.is_empty() { + return Err(StreamError::Custom { + msg: "Altering the schema of an existing stream is restricted.".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } - if !custom_partition.is_empty() { - if let Err(err) = validate_custom_partition(&custom_partition) { - return Err(StreamError::CreateStream(err)); + if !time_partition_limit.is_empty() { + let time_partition_days = validate_time_partition_limit(&time_partition_limit)?; + update_time_partition_limit_in_stream(stream_name.to_string(), time_partition_days) + .await?; + return Ok(()); } - if update_stream == "true" { - if let Err(err) = - update_custom_partition_in_stream(stream_name.to_string(), &custom_partition).await - { - return Err(StreamError::CreateStream(err)); - } + + if !custom_partition.is_empty() { + validate_custom_partition(&custom_partition)?; + update_custom_partition_in_stream(stream_name.to_string(), &custom_partition).await?; + return Ok(()); + } else { + update_custom_partition_in_stream(stream_name.to_string(), "").await?; return Ok(()); } } + let mut time_partition_in_days = ""; + if !time_partition_limit.is_empty() { + time_partition_in_days = validate_time_partition_limit(&time_partition_limit)?; + } + if !custom_partition.is_empty() { + validate_custom_partition(&custom_partition)?; + } let schema = validate_static_schema( body, @@ -354,10 +344,7 @@ async fn create_update_stream( &time_partition, &custom_partition, &static_schema_flag, - ); - if let Err(err) = schema { - return Err(StreamError::CreateStream(err)); - } + )?; create_stream( stream_name.to_string(), @@ -365,7 +352,7 @@ async fn create_update_stream( time_partition_in_days, &custom_partition, &static_schema_flag, - schema.unwrap(), + schema, false, ) .await?; @@ -753,6 +740,36 @@ pub async fn update_custom_partition_in_stream( stream_name: String, custom_partition: &str, ) -> Result<(), CreateStreamError> { + let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name).unwrap(); + if static_schema_flag.is_some() { + let schema = STREAM_INFO.schema(&stream_name).unwrap(); + + if !custom_partition.is_empty() { + let custom_partition_list = custom_partition.split(',').collect::>(); + let custom_partition_exists: HashMap<_, _> = custom_partition_list + .iter() + .map(|&partition| { + ( + partition.to_string(), + schema + .fields() + .iter() + .any(|field| field.name() == partition), + ) + }) + .collect(); + + for partition in &custom_partition_list { + if !custom_partition_exists[*partition] { + return Err(CreateStreamError::Custom { + msg: format!("custom partition field {} does not exist in the schema for the stream {}", partition, stream_name), + status: StatusCode::BAD_REQUEST, + }); + } + } + } + } + let storage = CONFIG.storage().get_object_store(); if let Err(err) = storage .update_custom_partition_in_stream(&stream_name, custom_partition) diff --git a/server/src/metadata.rs b/server/src/metadata.rs index fb4429a94..06eb995e7 100644 --- a/server/src/metadata.rs +++ b/server/src/metadata.rs @@ -224,6 +224,10 @@ impl StreamInfo { map.get_mut(stream_name) .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string())) .map(|metadata| { + if custom_partition.is_empty() { + metadata.custom_partition = None; + return; + } metadata.custom_partition = Some(custom_partition); }) } diff --git a/server/src/static_schema.rs b/server/src/static_schema.rs index 5f9f60fcf..62ea258b6 100644 --- a/server/src/static_schema.rs +++ b/server/src/static_schema.rs @@ -65,27 +65,25 @@ pub fn convert_static_schema_to_arrow_schema( fields: Vec::new(), metadata: HashMap::new(), }; - let mut time_partition_exists: bool = false; + let mut time_partition_exists = false; if !custom_partition.is_empty() { let custom_partition_list = custom_partition.split(',').collect::>(); - let mut custom_partition_exists: HashMap = - HashMap::with_capacity(custom_partition_list.len()); + let mut custom_partition_exists = HashMap::with_capacity(custom_partition_list.len()); for partition in &custom_partition_list { - for field in &static_schema.fields { - if &field.name == partition { - custom_partition_exists.insert(partition.to_string(), true); - } + if static_schema + .fields + .iter() + .any(|field| &field.name == partition) + { + custom_partition_exists.insert(partition.to_string(), true); } } - for partition in custom_partition_list { - if !custom_partition_exists.contains_key(partition) { - return Err(anyhow! { - format!( - "custom partition field {partition} does not exist in the schema for the static schema logstream" - ), - }); + + for partition in &custom_partition_list { + if !custom_partition_exists.contains_key(*partition) { + return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream")); } } } @@ -135,11 +133,7 @@ pub fn convert_static_schema_to_arrow_schema( ), }); } - let schema = add_parseable_fields_to_static_schema(parsed_schema); - if schema.is_err() { - return Err(schema.err().unwrap()); - } - Ok(schema.unwrap()) + add_parseable_fields_to_static_schema(parsed_schema) } fn add_parseable_fields_to_static_schema(