server/src/handlers.rs (2 changes: 1 addition & 1 deletion)
@@ -33,7 +33,7 @@ const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition";
const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag";
const AUTHORIZATION_KEY: &str = "authorization";
const SEPARATOR: char = '^';

const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
const OIDC_SCOPE: &str = "openid profile email";
const COOKIE_AGE_DAYS: usize = 7;
const SESSION_COOKIE_NAME: &str = "session";
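The only change in handlers.rs is the new UPDATE_STREAM_KEY constant. As a rough illustration (not part of the diff), the sketch below shows how that header could be read from the actix-web HttpRequest these handlers receive; the helper name is hypothetical.

```rust
use actix_web::HttpRequest;

const UPDATE_STREAM_KEY: &str = "x-p-update-stream";

// Hypothetical helper: returns true only when the client explicitly sent
// `x-p-update-stream: true`; any other value (or a missing header) keeps
// the default create-stream behaviour.
fn is_update_request(req: &HttpRequest) -> bool {
    req.headers()
        .get(UPDATE_STREAM_KEY)
        .and_then(|value| value.to_str().ok())
        .map(|value| value == "true")
        .unwrap_or(false)
}
```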
server/src/handlers/http/logstream.rs (288 changes: 214 additions & 74 deletions)
@@ -23,6 +23,7 @@ use super::cluster::{fetch_stats_from_ingestors, INTERNAL_STREAM_NAME};
use crate::alerts::Alerts;
use crate::handlers::{
CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY,
UPDATE_STREAM_KEY,
};
use crate::metadata::STREAM_INFO;
use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
@@ -187,77 +188,166 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>
}

pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
let time_partition = if let Some((_, time_partition_name)) = req
.headers()
.iter()
.find(|&(key, _)| key == TIME_PARTITION_KEY)
{
time_partition_name.to_str().unwrap()
} else {
""
};
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
let (time_partition, time_partition_limit, custom_partition, static_schema_flag, update_stream) =
fetch_headers_from_put_stream_request(&req);

if metadata::STREAM_INFO.stream_exists(&stream_name) && update_stream != "true" {
// Error if the log stream already exists
return Err(StreamError::Custom {
msg: format!(
"Logstream {stream_name} already exists, please create a new log stream with unique name"
),
status: StatusCode::BAD_REQUEST,
});
}

if !time_partition.is_empty() && update_stream == "true" {
return Err(StreamError::Custom {
msg: "Altering the time partition of an existing stream is restricted.".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
let mut time_partition_in_days: &str = "";
if let Some((_, time_partition_limit_name)) = req
.headers()
.iter()
.find(|&(key, _)| key == TIME_PARTITION_LIMIT_KEY)
{
let time_partition_limit = time_partition_limit_name.to_str().unwrap();
if !time_partition_limit.ends_with('d') {
return Err(StreamError::Custom {
msg: "missing 'd' suffix for duration value".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
let days = &time_partition_limit[0..time_partition_limit.len() - 1];
if days.parse::<NonZeroU32>().is_err() {
return Err(StreamError::Custom {
msg: "could not convert duration to an unsigned number".to_string(),
status: StatusCode::BAD_REQUEST,
});
if !time_partition_limit.is_empty() {
let time_partition_days = validate_time_partition_limit(&time_partition_limit);
if let Err(err) = time_partition_days {
return Err(StreamError::CreateStream(err));
} else {
time_partition_in_days = days;
time_partition_in_days = time_partition_days.unwrap();
if update_stream == "true" {
if let Err(err) = update_time_partition_limit_in_stream(
stream_name.clone(),
time_partition_in_days,
)
.await
{
return Err(StreamError::CreateStream(err));
}
return Ok(("Log stream updated", StatusCode::OK));
}
}
}
let static_schema_flag = if let Some((_, static_schema_flag)) = req
.headers()
.iter()
.find(|&(key, _)| key == STATIC_SCHEMA_FLAG)
{
static_schema_flag.to_str().unwrap()
} else {
""
};
let mut custom_partition: &str = "";
if let Some((_, custom_partition_key)) = req
.headers()
.iter()
.find(|&(key, _)| key == CUSTOM_PARTITION_KEY)
{
custom_partition = custom_partition_key.to_str().unwrap();
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
if custom_partition_list.len() > 3 {
return Err(StreamError::Custom {
msg: "maximum 3 custom partition keys are supported".to_string(),
status: StatusCode::BAD_REQUEST,
});

if !static_schema_flag.is_empty() && update_stream == "true" {
return Err(StreamError::Custom {
msg: "Altering the schema of an existing stream is restricted.".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

if !custom_partition.is_empty() {
if let Err(err) = validate_custom_partition(&custom_partition) {
return Err(StreamError::CreateStream(err));
}
if update_stream == "true" {
if let Err(err) =
update_custom_partition_in_stream(stream_name.clone(), &custom_partition).await
{
return Err(StreamError::CreateStream(err));
}
return Ok(("Log stream updated", StatusCode::OK));
}
}

let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
let mut schema = Arc::new(Schema::empty());
if metadata::STREAM_INFO.stream_exists(&stream_name) {
// Error if the log stream already exists
return Err(StreamError::Custom {
msg: format!(
"logstream {stream_name} already exists, please create a new log stream with unique name"
),
let schema = validate_static_schema(
&body,
&stream_name,
&time_partition,
&custom_partition,
&static_schema_flag,
);
if let Err(err) = schema {
return Err(StreamError::CreateStream(err));
}

create_stream(
stream_name,
&time_partition,
time_partition_in_days,
&custom_partition,
&static_schema_flag,
schema.unwrap(),
)
.await?;

Ok(("Log stream created", StatusCode::OK))
}

fn fetch_headers_from_put_stream_request(
req: &HttpRequest,
) -> (String, String, String, String, String) {
let mut time_partition = String::default();
let mut time_partition_limit = String::default();
let mut custom_partition = String::default();
let mut static_schema_flag = String::default();
let mut update_stream = String::default();
req.headers().iter().for_each(|(key, value)| {
if key == TIME_PARTITION_KEY {
time_partition = value.to_str().unwrap().to_string();
}
if key == TIME_PARTITION_LIMIT_KEY {
time_partition_limit = value.to_str().unwrap().to_string();
}
if key == CUSTOM_PARTITION_KEY {
custom_partition = value.to_str().unwrap().to_string();
}
if key == STATIC_SCHEMA_FLAG {
static_schema_flag = value.to_str().unwrap().to_string();
}
if key == UPDATE_STREAM_KEY {
update_stream = value.to_str().unwrap().to_string();
}
});

(
time_partition,
time_partition_limit,
custom_partition,
static_schema_flag,
update_stream,
)
}

fn validate_time_partition_limit(time_partition_limit: &str) -> Result<&str, CreateStreamError> {
if !time_partition_limit.ends_with('d') {
return Err(CreateStreamError::Custom {
msg: "Missing 'd' suffix for duration value".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
let days = &time_partition_limit[0..time_partition_limit.len() - 1];
if days.parse::<NonZeroU32>().is_err() {
return Err(CreateStreamError::Custom {
msg: "Could not convert duration to an unsigned number".to_string(),
status: StatusCode::BAD_REQUEST,
});
}

Ok(days)
}

fn validate_custom_partition(custom_partition: &str) -> Result<(), CreateStreamError> {
let custom_partition_list = custom_partition.split(',').collect::<Vec<&str>>();
if custom_partition_list.len() > 3 {
return Err(CreateStreamError::Custom {
msg: "Maximum 3 custom partition keys are supported".to_string(),
status: StatusCode::BAD_REQUEST,
});
}
Ok(())
}

fn validate_static_schema(
body: &Bytes,
stream_name: &str,
time_partition: &str,
custom_partition: &str,
static_schema_flag: &str,
) -> Result<Arc<Schema>, CreateStreamError> {
let mut schema = Arc::new(Schema::empty());
if !body.is_empty() && static_schema_flag == "true" {
let static_schema: StaticSchema = serde_json::from_slice(&body)?;
let static_schema: StaticSchema = serde_json::from_slice(body)?;

let parsed_schema = convert_static_schema_to_arrow_schema(
static_schema.clone(),
@@ -267,31 +357,21 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder,
if let Ok(parsed_schema) = parsed_schema {
schema = parsed_schema;
} else {
return Err(StreamError::Custom {
msg: format!("unable to commit static schema, logstream {stream_name} not created"),
return Err(CreateStreamError::Custom {
msg: format!("Unable to commit static schema, logstream {stream_name} not created"),
status: StatusCode::BAD_REQUEST,
});
}
} else if body.is_empty() && static_schema_flag == "true" {
return Err(StreamError::Custom {
return Err(CreateStreamError::Custom {
msg: format!(
"please provide schema in the request body for static schema logstream {stream_name}"
"Please provide schema in the request body for static schema logstream {stream_name}"
),
status: StatusCode::BAD_REQUEST,
});
}

create_stream(
stream_name,
time_partition,
time_partition_in_days,
custom_partition,
static_schema_flag,
schema,
)
.await?;

Ok(("log stream created", StatusCode::OK))
Ok(schema)
}

pub async fn put_alert(
@@ -626,6 +706,56 @@ fn remove_id_from_alerts(value: &mut Value) {
}
}

pub async fn update_time_partition_limit_in_stream(
stream_name: String,
time_partition_limit: &str,
) -> Result<(), CreateStreamError> {
let storage = CONFIG.storage().get_object_store();
if let Err(err) = storage
.update_time_partition_limit_in_stream(&stream_name, time_partition_limit)
.await
{
return Err(CreateStreamError::Storage { stream_name, err });
}

if metadata::STREAM_INFO
.update_time_partition_limit(&stream_name, time_partition_limit.to_string())
.is_err()
{
return Err(CreateStreamError::Custom {
msg: "failed to update time partition limit in metadata".to_string(),
status: StatusCode::EXPECTATION_FAILED,
});
}

Ok(())
}

pub async fn update_custom_partition_in_stream(
stream_name: String,
custom_partition: &str,
) -> Result<(), CreateStreamError> {
let storage = CONFIG.storage().get_object_store();
if let Err(err) = storage
.update_custom_partition_in_stream(&stream_name, custom_partition)
.await
{
return Err(CreateStreamError::Storage { stream_name, err });
}

if metadata::STREAM_INFO
.update_custom_partition(&stream_name, custom_partition.to_string())
.is_err()
{
return Err(CreateStreamError::Custom {
msg: "failed to update custom partition in metadata".to_string(),
status: StatusCode::EXPECTATION_FAILED,
});
}

Ok(())
}

pub async fn create_stream(
stream_name: String,
time_partition: &str,
@@ -757,6 +887,10 @@ pub mod error {
stream_name: String,
err: ObjectStorageError,
},
#[error("{msg}")]
Custom { msg: String, status: StatusCode },
#[error("Could not deserialize into JSON object, {0}")]
SerdeError(#[from] serde_json::Error),
}

#[derive(Debug, thiserror::Error)]
@@ -809,6 +943,12 @@ pub mod error {
StreamError::CreateStream(CreateStreamError::Storage { .. }) => {
StatusCode::INTERNAL_SERVER_ERROR
}
StreamError::CreateStream(CreateStreamError::Custom { .. }) => {
StatusCode::INTERNAL_SERVER_ERROR
}
StreamError::CreateStream(CreateStreamError::SerdeError(_)) => {
StatusCode::BAD_REQUEST
}
StreamError::CacheNotEnabled(_) => StatusCode::BAD_REQUEST,
StreamError::StreamNotFound(_) => StatusCode::NOT_FOUND,
StreamError::Custom { status, .. } => *status,
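Taken together, the logstream.rs changes let a client PUT to an existing logstream with `x-p-update-stream: true` to change its time partition limit or custom partition, while rejecting attempts to alter the time partition or static schema of an existing stream. A hedged end-to-end sketch follows; it assumes a Parseable server on localhost:8000, default admin credentials, the usual `/api/v1/logstream/{name}` route, and `x-p-time-partition-limit` as the header behind TIME_PARTITION_LIMIT_KEY, none of which are confirmed by this diff.

```rust
use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    // Update the time partition limit of an existing stream instead of
    // creating a new one; the limit must be a non-zero number of days with
    // a 'd' suffix, per validate_time_partition_limit above.
    let response = Client::new()
        .put("http://localhost:8000/api/v1/logstream/demo")
        .basic_auth("admin", Some("admin"))
        .header("x-p-update-stream", "true")
        .header("x-p-time-partition-limit", "30d")
        .send()
        .await?;

    // On success the handler returns 200 OK with "Log stream updated";
    // a missing 'd' suffix or a zero value comes back as 400 Bad Request.
    println!("{}", response.status());
    Ok(())
}
```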
server/src/metadata.rs (27 changes: 27 additions & 0 deletions)
@@ -171,6 +171,33 @@ impl StreamInfo {
metadata.first_event_at = first_event_at;
})
}

pub fn update_time_partition_limit(
&self,
stream_name: &str,
time_partition_limit: String,
) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
map.get_mut(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| {
metadata.time_partition_limit = Some(time_partition_limit);
})
}

pub fn update_custom_partition(
&self,
stream_name: &str,
custom_partition: String,
) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
map.get_mut(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| {
metadata.custom_partition = Some(custom_partition);
})
}

#[allow(clippy::too_many_arguments)]
pub fn add_stream(
&self,
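The two new StreamInfo methods follow the same lock-then-mutate pattern. Below is a self-contained sketch of that pattern using simplified stand-in types (the names LogStreamMetadata and MetadataError mirror the diff, but the real types carry more fields and variants).

```rust
use std::collections::HashMap;
use std::sync::RwLock;

#[derive(Default)]
struct LogStreamMetadata {
    custom_partition: Option<String>,
}

#[derive(Debug)]
enum MetadataError {
    StreamMetaNotFound(String),
}

fn update_custom_partition(
    map: &RwLock<HashMap<String, LogStreamMetadata>>,
    stream_name: &str,
    custom_partition: String,
) -> Result<(), MetadataError> {
    // Take the write lock, look the stream up, and set the optional field;
    // an unknown stream surfaces as StreamMetaNotFound.
    let mut map = map.write().expect("lock poisoned");
    map.get_mut(stream_name)
        .ok_or_else(|| MetadataError::StreamMetaNotFound(stream_name.to_string()))
        .map(|metadata| metadata.custom_partition = Some(custom_partition))
}

fn main() {
    let map = RwLock::new(HashMap::from([(
        "demo".to_string(),
        LogStreamMetadata::default(),
    )]));
    assert!(update_custom_partition(&map, "demo", "status,region".to_string()).is_ok());
    assert!(update_custom_partition(&map, "missing", "status".to_string()).is_err());
}
```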