Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion server/src/event/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@ pub trait EventFormat: Sized {
fn to_data(
self,
schema: HashMap<String, Arc<Field>>,
time_partition: Option<String>,
) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;
fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
fn into_recordbatch(
self,
schema: HashMap<String, Arc<Field>>,
time_partition: Option<String>,
) -> Result<(RecordBatch, bool), AnyError> {
let (data, mut schema, is_first, tags, metadata) = self.to_data(schema)?;
let (data, mut schema, is_first, tags, metadata) = self.to_data(schema, time_partition)?;

if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
return Err(anyhow!("field {} is a reserved field", DEFAULT_TAGS_KEY));
Expand Down
3 changes: 2 additions & 1 deletion server/src/event/format/json.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,9 @@ impl EventFormat for Event {
fn to_data(
self,
schema: HashMap<String, Arc<Field>>,
time_partition: Option<String>,
) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
let data = flatten_json_body(self.data)?;
let data = flatten_json_body(self.data, time_partition)?;
let stream_schema = schema;

// incoming event may be a single json or a json array
Expand Down
1 change: 1 addition & 0 deletions server/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const PREFIX_TAGS: &str = "x-p-tag-";
const PREFIX_META: &str = "x-p-meta-";
const STREAM_NAME_HEADER_KEY: &str = "x-p-stream";
const LOG_SOURCE_KEY: &str = "x-p-log-source";
const TIME_PARTITION_KEY: &str = "x-p-time-partition";
const AUTHORIZATION_KEY: &str = "authorization";
const SEPARATOR: char = '^';

Expand Down
70 changes: 53 additions & 17 deletions server/src/handlers/http/ingest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,14 @@ async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result
.ok_or(PostError::StreamNotFound(stream_name.clone()))?
.schema
.clone();
into_event_batch(req, body, schema)?

let time_partition = hash_map
.get(&stream_name)
.ok_or(PostError::StreamNotFound(stream_name.clone()))?
.time_partition
.clone();

into_event_batch(req, body, schema, time_partition)?
};

event::Event {
Expand All @@ -119,6 +126,7 @@ fn into_event_batch(
req: HttpRequest,
body: Bytes,
schema: HashMap<String, Arc<Field>>,
time_partition: Option<String>,
) -> Result<(usize, arrow_array::RecordBatch, bool), PostError> {
let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?;
let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?;
Expand All @@ -129,7 +137,7 @@ fn into_event_batch(
tags,
metadata,
};
let (rb, is_first) = event.into_recordbatch(schema)?;
let (rb, is_first) = event.into_recordbatch(schema, time_partition)?;
Ok((size, rb, is_first))
}

Expand All @@ -138,7 +146,7 @@ pub async fn create_stream_if_not_exists(stream_name: &str) -> Result<(), PostEr
if STREAM_INFO.stream_exists(stream_name) {
return Ok(());
}
super::logstream::create_stream(stream_name.to_string()).await?;
super::logstream::create_stream(stream_name.to_string(), "").await?;
Ok(())
}

Expand Down Expand Up @@ -241,6 +249,7 @@ mod tests {
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
HashMap::default(),
None,
)
.unwrap();

Expand Down Expand Up @@ -287,6 +296,7 @@ mod tests {
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
HashMap::default(),
None,
)
.unwrap();

Expand Down Expand Up @@ -320,8 +330,13 @@ mod tests {

let req = TestRequest::default().to_http_request();

let (_, rb, _) =
into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap();
let (_, rb, _) = into_event_batch(
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
schema,
None,
)
.unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 5);
Expand Down Expand Up @@ -353,10 +368,13 @@ mod tests {

let req = TestRequest::default().to_http_request();

assert!(
into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,)
.is_err()
);
assert!(into_event_batch(
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
schema,
None
)
.is_err());
}

#[test]
Expand All @@ -374,8 +392,13 @@ mod tests {

let req = TestRequest::default().to_http_request();

let (_, rb, _) =
into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap();
let (_, rb, _) = into_event_batch(
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
schema,
None,
)
.unwrap();

assert_eq!(rb.num_rows(), 1);
assert_eq!(rb.num_columns(), 3);
Expand All @@ -391,6 +414,7 @@ mod tests {
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
HashMap::default(),
None
)
.is_err())
}
Expand Down Expand Up @@ -419,6 +443,7 @@ mod tests {
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
HashMap::default(),
None,
)
.unwrap();

Expand Down Expand Up @@ -472,6 +497,7 @@ mod tests {
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
HashMap::default(),
None,
)
.unwrap();

Expand Down Expand Up @@ -521,8 +547,13 @@ mod tests {
);
let req = TestRequest::default().to_http_request();

let (_, rb, _) =
into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema).unwrap();
let (_, rb, _) = into_event_batch(
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
schema,
None,
)
.unwrap();

assert_eq!(rb.num_rows(), 3);
assert_eq!(rb.num_columns(), 6);
Expand Down Expand Up @@ -566,6 +597,7 @@ mod tests {
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
HashMap::default(),
None,
)
.unwrap();

Expand Down Expand Up @@ -612,10 +644,13 @@ mod tests {
.into_iter(),
);

assert!(
into_event_batch(req, Bytes::from(serde_json::to_vec(&json).unwrap()), schema,)
.is_err()
);
assert!(into_event_batch(
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
schema,
None
)
.is_err());
}

#[test]
Expand Down Expand Up @@ -647,6 +682,7 @@ mod tests {
req,
Bytes::from(serde_json::to_vec(&json).unwrap()),
HashMap::default(),
None,
)
.unwrap();

Expand Down
35 changes: 26 additions & 9 deletions server/src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@

use std::fs;

use actix_web::http::StatusCode;
use actix_web::{web, HttpRequest, Responder};
use chrono::Utc;
use serde_json::Value;

use self::error::{CreateStreamError, StreamError};
use crate::alerts::Alerts;
use crate::handlers::TIME_PARTITION_KEY;
use crate::metadata::STREAM_INFO;
use crate::option::CONFIG;
use crate::storage::{retention::Retention, LogStream, StorageDir};
use crate::{catalog, event, stats};
use crate::{metadata, validator};
use actix_web::http::StatusCode;
use actix_web::{web, HttpRequest, Responder};
use chrono::Utc;
use serde_json::Value;

pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
Expand Down Expand Up @@ -108,6 +108,16 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>
}

pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError> {
let time_partition = if let Some((_, time_partition_name)) = req
.headers()
.iter()
.find(|&(key, _)| key == TIME_PARTITION_KEY)
{
time_partition_name.to_str().unwrap()
} else {
""
};

let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();

if metadata::STREAM_INFO.stream_exists(&stream_name) {
Expand All @@ -119,7 +129,7 @@ pub async fn put_stream(req: HttpRequest) -> Result<impl Responder, StreamError>
status: StatusCode::BAD_REQUEST,
});
} else {
create_stream(stream_name).await?;
create_stream(stream_name, time_partition).await?;
}

Ok(("log stream created", StatusCode::OK))
Expand Down Expand Up @@ -326,13 +336,16 @@ fn remove_id_from_alerts(value: &mut Value) {
}
}

pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError> {
pub async fn create_stream(
stream_name: String,
time_partition: &str,
) -> Result<(), CreateStreamError> {
// fail to proceed if invalid stream name
validator::stream_name(&stream_name)?;

// Proceed to create log stream if it doesn't exist
let storage = CONFIG.storage().get_object_store();
if let Err(err) = storage.create_stream(&stream_name).await {
if let Err(err) = storage.create_stream(&stream_name, time_partition).await {
return Err(CreateStreamError::Storage { stream_name, err });
}

Expand All @@ -344,7 +357,11 @@ pub async fn create_stream(stream_name: String) -> Result<(), CreateStreamError>
let stream_meta = stream_meta.unwrap();
let created_at = stream_meta.created_at;

metadata::STREAM_INFO.add_stream(stream_name.to_string(), created_at);
metadata::STREAM_INFO.add_stream(
stream_name.to_string(),
created_at,
time_partition.to_string(),
);

Ok(())
}
Expand Down
14 changes: 13 additions & 1 deletion server/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,13 @@ impl StreamInfo {
.map(|metadata| metadata.cache_enabled)
}

pub fn get_time_partition(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
let map = self.read().expect(LOCK_EXPECT);
map.get(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| metadata.time_partition.clone())
}

pub fn set_stream_cache(&self, stream_name: &str, enable: bool) -> Result<(), MetadataError> {
let mut map = self.write().expect(LOCK_EXPECT);
let stream = map
Expand Down Expand Up @@ -143,14 +150,19 @@ impl StreamInfo {
})
}

pub fn add_stream(&self, stream_name: String, created_at: String) {
pub fn add_stream(&self, stream_name: String, created_at: String, time_partition: String) {
let mut map = self.write().expect(LOCK_EXPECT);
let metadata = LogStreamMetadata {
created_at: if created_at.is_empty() {
Local::now().to_rfc3339()
} else {
created_at
},
time_partition: if time_partition.is_empty() {
None
} else {
Some(time_partition)
},
..Default::default()
};
map.insert(stream_name, metadata);
Expand Down
4 changes: 1 addition & 3 deletions server/src/query/filter_optimizer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,7 @@ impl FilterOptimizerRule {
Expr::Column(Column::from_name(&self.column)).like(lit(format!("%{}%", literal)))
});

let Some(mut filter_expr) = patterns.next() else {
return None;
};
let mut filter_expr = patterns.next()?;
for expr in patterns {
filter_expr = or(filter_expr, expr)
}
Expand Down
4 changes: 1 addition & 3 deletions server/src/rbac/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,7 @@ impl Sessions {

// remove a specific session
pub fn remove_session(&mut self, key: &SessionKey) -> Option<String> {
let Some((user, _)) = self.active_sessions.remove(key) else {
return None;
};
let (user, _) = self.active_sessions.remove(key)?;

if let Some(items) = self.user_sessions.get_mut(&user) {
items.retain(|(session, _)| session != key);
Expand Down
16 changes: 14 additions & 2 deletions server/src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,20 @@ pub trait ObjectStorage: Sync + 'static {
Ok(())
}

async fn create_stream(&self, stream_name: &str) -> Result<(), ObjectStorageError> {
async fn create_stream(
&self,
stream_name: &str,
time_partition: &str,
) -> Result<(), ObjectStorageError> {
let mut format = ObjectStoreFormat::default();
format.set_id(CONFIG.parseable.username.clone());
let permission = Permisssion::new(CONFIG.parseable.username.clone());
format.permissions = vec![permission];
if time_partition.is_empty() {
format.time_partition = None;
} else {
format.time_partition = Some(time_partition.to_string());
}
let format_json = to_bytes(&format);
self.put_object(&schema_path(stream_name), to_bytes(&Schema::empty()))
.await?;
Expand Down Expand Up @@ -325,8 +334,11 @@ pub trait ObjectStorage: Sync + 'static {
let cache_enabled = STREAM_INFO
.cache_enabled(stream)
.map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
let time_partition = STREAM_INFO
.get_time_partition(stream)
.map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
let dir = StorageDir::new(stream);
let schema = convert_disk_files_to_parquet(stream, &dir)
let schema = convert_disk_files_to_parquet(stream, &dir, time_partition)
.map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;

if let Some(schema) = schema {
Expand Down
Loading