Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions server/src/handlers/http/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,10 @@ pub async fn sync_streams_with_ingestors(

let client = reqwest::Client::new();
for ingestor in ingestor_infos.iter() {
if !utils::check_liveness(&ingestor.domain_name).await {
log::warn!("Ingestor {} is not live", ingestor.domain_name);
continue;
}
let url = format!(
"{}{}/logstream/{}",
ingestor.domain_name,
Expand Down
153 changes: 85 additions & 68 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,33 +253,30 @@ fn validate_static_schema(
custom_partition: &str,
static_schema_flag: &str,
) -> Result<Arc<Schema>, CreateStreamError> {
let mut schema = Arc::new(Schema::empty());
if !body.is_empty() && static_schema_flag == "true" {
let static_schema: StaticSchema = serde_json::from_slice(body)?;

let parsed_schema = convert_static_schema_to_arrow_schema(
static_schema.clone(),
time_partition,
custom_partition,
);
if let Ok(parsed_schema) = parsed_schema {
schema = parsed_schema;
} else {
if static_schema_flag == "true" {
if body.is_empty() {
return Err(CreateStreamError::Custom {
msg: format!("Unable to commit static schema, logstream {stream_name} not created"),
msg: format!(
"Please provide schema in the request body for static schema logstream {stream_name}"
),
status: StatusCode::BAD_REQUEST,
});
}
} else if body.is_empty() && static_schema_flag == "true" {
return Err(CreateStreamError::Custom {

let static_schema: StaticSchema = serde_json::from_slice(body)?;
let parsed_schema =
convert_static_schema_to_arrow_schema(static_schema, time_partition, custom_partition)
.map_err(|_| CreateStreamError::Custom {
msg: format!(
"Please provide schema in the request body for static schema logstream {stream_name}"
"Unable to commit static schema, logstream {stream_name} not created"
),
status: StatusCode::BAD_REQUEST,
});
})?;

return Ok(parsed_schema);
}

Ok(schema)
Ok(Arc::new(Schema::empty()))
}

async fn create_update_stream(
Expand All @@ -291,81 +288,71 @@ async fn create_update_stream(
fetch_headers_from_put_stream_request(req);

if metadata::STREAM_INFO.stream_exists(stream_name) && update_stream != "true" {
// Error if the log stream already exists
return Err(StreamError::Custom {
msg: format!(
"Logstream {stream_name} already exists, please create a new log stream with unique name"
),
status: StatusCode::BAD_REQUEST,
});
}

if !time_partition.is_empty() && update_stream == "true" {
return Err(StreamError::Custom {
msg: "Altering the time partition of an existing stream is restricted.".to_string(),
msg: format!(
"Logstream {stream_name} already exists, please create a new log stream with unique name"
),
status: StatusCode::BAD_REQUEST,
});
}
let mut time_partition_in_days: &str = "";
if !time_partition_limit.is_empty() {
let time_partition_days = validate_time_partition_limit(&time_partition_limit);
if let Err(err) = time_partition_days {
return Err(StreamError::CreateStream(err));
} else {
time_partition_in_days = time_partition_days.unwrap();
if update_stream == "true" {
if let Err(err) = update_time_partition_limit_in_stream(
stream_name.to_string(),
time_partition_in_days,
)
.await
{
return Err(StreamError::CreateStream(err));
}
return Ok(());
}

if update_stream == "true" {
if !STREAM_INFO.stream_exists(stream_name) {
return Err(StreamError::StreamNotFound(stream_name.to_string()));
}
if !time_partition.is_empty() {
return Err(StreamError::Custom {
msg: "Altering the time partition of an existing stream is restricted.".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
}

if !static_schema_flag.is_empty() && update_stream == "true" {
return Err(StreamError::Custom {
msg: "Altering the schema of an existing stream is restricted.".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
if !static_schema_flag.is_empty() {
return Err(StreamError::Custom {
msg: "Altering the schema of an existing stream is restricted.".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

if !custom_partition.is_empty() {
if let Err(err) = validate_custom_partition(&custom_partition) {
return Err(StreamError::CreateStream(err));
if !time_partition_limit.is_empty() {
let time_partition_days = validate_time_partition_limit(&time_partition_limit)?;
update_time_partition_limit_in_stream(stream_name.to_string(), time_partition_days)
.await?;
return Ok(());
}
if update_stream == "true" {
if let Err(err) =
update_custom_partition_in_stream(stream_name.to_string(), &custom_partition).await
{
return Err(StreamError::CreateStream(err));
}

if !custom_partition.is_empty() {
validate_custom_partition(&custom_partition)?;
update_custom_partition_in_stream(stream_name.to_string(), &custom_partition).await?;
return Ok(());
} else {
update_custom_partition_in_stream(stream_name.to_string(), "").await?;
return Ok(());
}
}
let mut time_partition_in_days = "";
if !time_partition_limit.is_empty() {
time_partition_in_days = validate_time_partition_limit(&time_partition_limit)?;
}
if !custom_partition.is_empty() {
validate_custom_partition(&custom_partition)?;
}

let schema = validate_static_schema(
body,
stream_name,
&time_partition,
&custom_partition,
&static_schema_flag,
);
if let Err(err) = schema {
return Err(StreamError::CreateStream(err));
}
)?;

create_stream(
stream_name.to_string(),
&time_partition,
time_partition_in_days,
&custom_partition,
&static_schema_flag,
schema.unwrap(),
schema,
false,
)
.await?;
Expand Down Expand Up @@ -753,6 +740,36 @@ pub async fn update_custom_partition_in_stream(
stream_name: String,
custom_partition: &str,
) -> Result<(), CreateStreamError> {
let static_schema_flag = STREAM_INFO.get_static_schema_flag(&stream_name).unwrap();
if static_schema_flag.is_some() {
let schema = STREAM_INFO.schema(&stream_name).unwrap();

if !custom_partition.is_empty() {
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
let custom_partition_exists: HashMap<_, _> = custom_partition_list
.iter()
.map(|&partition| {
(
partition.to_string(),
schema
.fields()
.iter()
.any(|field| field.name() == partition),
)
})
.collect();

for partition in &custom_partition_list {
if !custom_partition_exists[*partition] {
return Err(CreateStreamError::Custom {
msg: format!("custom partition field {} does not exist in the schema for the stream {}", partition, stream_name),
status: StatusCode::BAD_REQUEST,
});
}
}
}
}

let storage = CONFIG.storage().get_object_store();
if let Err(err) = storage
.update_custom_partition_in_stream(&stream_name, custom_partition)
Expand Down
4 changes: 4 additions & 0 deletions server/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,10 @@ impl StreamInfo {
map.get_mut(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| {
if custom_partition.is_empty() {
metadata.custom_partition = None;
return;
}
metadata.custom_partition = Some(custom_partition);
})
}
Expand Down
32 changes: 13 additions & 19 deletions server/src/static_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,27 +65,25 @@ pub fn convert_static_schema_to_arrow_schema(
fields: Vec::new(),
metadata: HashMap::new(),
};
let mut time_partition_exists: bool = false;
let mut time_partition_exists = false;

if !custom_partition.is_empty() {
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
let mut custom_partition_exists: HashMap<String, bool> =
HashMap::with_capacity(custom_partition_list.len());
let mut custom_partition_exists = HashMap::with_capacity(custom_partition_list.len());

for partition in &custom_partition_list {
for field in &static_schema.fields {
if &field.name == partition {
custom_partition_exists.insert(partition.to_string(), true);
}
if static_schema
.fields
.iter()
.any(|field| &field.name == partition)
{
custom_partition_exists.insert(partition.to_string(), true);
}
}
for partition in custom_partition_list {
if !custom_partition_exists.contains_key(partition) {
return Err(anyhow! {
format!(
"custom partition field {partition} does not exist in the schema for the static schema logstream"
),
});

for partition in &custom_partition_list {
if !custom_partition_exists.contains_key(*partition) {
return Err(anyhow!("custom partition field {partition} does not exist in the schema for the static schema logstream"));
}
}
}
Expand Down Expand Up @@ -135,11 +133,7 @@ pub fn convert_static_schema_to_arrow_schema(
),
});
}
let schema = add_parseable_fields_to_static_schema(parsed_schema);
if schema.is_err() {
return Err(schema.err().unwrap());
}
Ok(schema.unwrap())
add_parseable_fields_to_static_schema(parsed_schema)
}

fn add_parseable_fields_to_static_schema(
Expand Down