diff --git a/server/src/catalog.rs b/server/src/catalog.rs
index 7fd4aff93..eb1237727 100644
--- a/server/src/catalog.rs
+++ b/server/src/catalog.rs
@@ -23,6 +23,7 @@ use crate::handlers::http::base_path_without_preceding_slash;
 use crate::option::CONFIG;
 use crate::{
     catalog::manifest::Manifest,
+    event::DEFAULT_TIMESTAMP_KEY,
     query::PartialTimeFilter,
     storage::{object_storage::manifest_path, ObjectStorage, ObjectStorageError},
 };
@@ -41,8 +42,11 @@ pub trait Snapshot {
 }
 
 pub trait ManifestFile {
+    #[allow(unused)]
     fn file_name(&self) -> &str;
+    #[allow(unused)]
     fn ingestion_size(&self) -> u64;
+    #[allow(unused)]
     fn file_size(&self) -> u64;
     fn num_rows(&self) -> u64;
     fn columns(&self) -> &[Column];
@@ -70,14 +74,17 @@ impl ManifestFile for manifest::File {
     }
 }
 
-fn get_file_bounds(file: &manifest::File) -> (DateTime<Utc>, DateTime<Utc>) {
+fn get_file_bounds(
+    file: &manifest::File,
+    partition_column: String,
+) -> (DateTime<Utc>, DateTime<Utc>) {
     match file
         .columns()
         .iter()
-        .find(|col| col.name == "p_timestamp")
+        .find(|col| col.name == partition_column)
         .unwrap()
         .stats
-        .clone()
+        .as_ref()
         .unwrap()
     {
         column::TypedStatistics::Int(stats) => (
@@ -95,8 +102,19 @@ pub async fn update_snapshot(
 ) -> Result<(), ObjectStorageError> {
     // get current snapshot
     let mut meta = storage.get_object_store_format(stream_name).await?;
+    let meta_clone = meta.clone();
     let manifests = &mut meta.snapshot.manifest_list;
-    let (lower_bound, _) = get_file_bounds(&change);
+    let time_partition: Option<String> = meta_clone.time_partition;
+    let lower_bound = match time_partition {
+        Some(time_partition) => {
+            let (lower_bound, _) = get_file_bounds(&change, time_partition);
+            lower_bound
+        }
+        None => {
+            let (lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string());
+            lower_bound
+        }
+    };
     let pos = manifests.iter().position(|item| {
         item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound
     });
@@ -239,6 +257,7 @@ pub async fn get_first_event(
     // get current snapshot
     let mut meta = storage.get_object_store_format(stream_name).await?;
     let manifests = &mut meta.snapshot.manifest_list;
+    let time_partition = meta.time_partition;
     if manifests.is_empty() {
         log::info!("No manifest found for stream {stream_name}");
         return Err(ObjectStorageError::Custom("No manifest found".to_string()));
@@ -257,7 +276,17 @@ pub async fn get_first_event(
         ));
     };
     if let Some(first_event) = manifest.files.first() {
-        let (lower_bound, _) = get_file_bounds(first_event);
+        let lower_bound = match time_partition {
+            Some(time_partition) => {
+                let (lower_bound, _) = get_file_bounds(first_event, time_partition);
+                lower_bound
+            }
+            None => {
+                let (lower_bound, _) =
+                    get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
+                lower_bound
+            }
+        };
         first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
     }
 }
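// Reviewer note, not part of the patch: update_snapshot now resolves the partition column
// (the stream's time partition if set, otherwise DEFAULT_TIMESTAMP_KEY) and then looks for
// the manifest whose half-open time window contains the event's lower bound. A minimal
// standalone sketch of that lookup over chrono values; `find_window` and the `windows`
// slice are hypothetical names used only for illustration.
use chrono::{DateTime, Utc};

fn find_window(
    windows: &[(DateTime<Utc>, DateTime<Utc>)],
    lower_bound: DateTime<Utc>,
) -> Option<usize> {
    // Same predicate as the manifest search above: lower <= bound < upper.
    windows
        .iter()
        .position(|(lower, upper)| *lower <= lower_bound && lower_bound < *upper)
}
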
diff --git a/server/src/event.rs b/server/src/event.rs
index 98774daf2..eeb95e0b0 100644
--- a/server/src/event.rs
+++ b/server/src/event.rs
@@ -29,6 +29,7 @@ use std::sync::Arc;
 use self::error::EventError;
 pub use self::writer::STREAM_WRITERS;
 use crate::metadata;
+use chrono::NaiveDateTime;
 
 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
 pub const DEFAULT_TAGS_KEY: &str = "p_tags";
@@ -41,19 +42,30 @@ pub struct Event {
     pub origin_format: &'static str,
     pub origin_size: u64,
     pub is_first_event: bool,
+    pub parsed_timestamp: NaiveDateTime,
+    pub time_partition: Option<String>,
 }
 
 // Events holds the schema related to a each event for a single log stream
 impl Event {
     pub async fn process(self) -> Result<(), EventError> {
-        let key = get_schema_key(&self.rb.schema().fields);
-        let num_rows = self.rb.num_rows() as u64;
+        let mut key = get_schema_key(&self.rb.schema().fields);
+        if self.time_partition.is_some() {
+            let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
+            key = format!("{key}{parsed_timestamp_to_min}");
+        }
+        let num_rows = self.rb.num_rows() as u64;
 
         if self.is_first_event {
             commit_schema(&self.stream_name, self.rb.schema())?;
         }
-        Self::process_event(&self.stream_name, &key, self.rb.clone())?;
+        Self::process_event(
+            &self.stream_name,
+            &key,
+            self.rb.clone(),
+            self.parsed_timestamp,
+        )?;
 
         metadata::STREAM_INFO.update_stats(
             &self.stream_name,
@@ -80,8 +92,9 @@ impl Event {
         stream_name: &str,
         schema_key: &str,
         rb: RecordBatch,
+        parsed_timestamp: NaiveDateTime,
     ) -> Result<(), EventError> {
-        STREAM_WRITERS.append_to_local(stream_name, schema_key, rb)?;
+        STREAM_WRITERS.append_to_local(stream_name, schema_key, rb, parsed_timestamp)?;
         Ok(())
     }
 }
@@ -90,7 +103,7 @@ pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
     // Fields must be sorted
     let mut hasher = xxhash_rust::xxh3::Xxh3::new();
     for field in fields.iter().sorted_by_key(|v| v.name()) {
-        hasher.update(field.name().as_bytes())
+        hasher.update(field.name().as_bytes());
     }
     let hash = hasher.digest();
     format!("{hash:x}")
diff --git a/server/src/event/format.rs b/server/src/event/format.rs
index 169f35e23..8f5971a13 100644
--- a/server/src/event/format.rs
+++ b/server/src/event/format.rs
@@ -41,20 +41,20 @@ pub trait EventFormat: Sized {
     fn to_data(
         self,
         schema: HashMap<String, Arc<Field>>,
-        time_partition: Option<String>,
         static_schema_flag: Option<String>,
+        time_partition: Option<String>,
     ) -> Result<(Self::Data, EventSchema, bool, Tags, Metadata), AnyError>;
 
     fn decode(data: Self::Data, schema: Arc<Schema>) -> Result<RecordBatch, AnyError>;
 
     fn into_recordbatch(
         self,
         storage_schema: HashMap<String, Arc<Field>>,
-        time_partition: Option<String>,
         static_schema_flag: Option<String>,
+        time_partition: Option<String>,
     ) -> Result<(RecordBatch, bool), AnyError> {
         let (data, mut schema, is_first, tags, metadata) = self.to_data(
             storage_schema.clone(),
-            time_partition,
             static_schema_flag.clone(),
+            time_partition.clone(),
         )?;
 
         if get_field(&schema, DEFAULT_TAGS_KEY).is_some() {
@@ -96,10 +96,11 @@ pub trait EventFormat: Sized {
         )));
 
         // prepare the record batch and new fields to be added
-        let new_schema = Arc::new(Schema::new(schema));
+        let mut new_schema = Arc::new(Schema::new(schema));
         if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
             return Err(anyhow!("Schema mismatch"));
         }
+        new_schema = update_field_type_in_schema(new_schema, time_partition);
         let rb = Self::decode(data, new_schema.clone())?;
         let tags_arr = StringArray::from_iter_values(std::iter::repeat(&tags).take(rb.num_rows()));
         let metadata_arr =
@@ -143,3 +144,30 @@ pub trait EventFormat: Sized {
         true
     }
 }
+
+pub fn update_field_type_in_schema(
+    schema: Arc<Schema>,
+    time_partition: Option<String>,
+) -> Arc<Schema> {
+    if time_partition.is_none() {
+        return schema;
+    }
+    let field_name = time_partition.unwrap();
+    let new_schema: Vec<Field> = schema
+        .fields()
+        .iter()
+        .map(|field| {
+            if *field.name() == field_name {
+                if field.data_type() == &DataType::Utf8 {
+                    let new_data_type = DataType::Timestamp(TimeUnit::Millisecond, None);
+                    Field::new(field.name().clone(), new_data_type, true)
+                } else {
+                    Field::new(field.name(), field.data_type().clone(), true)
+                }
+            } else {
+                Field::new(field.name(), field.data_type().clone(), true)
+            }
+        })
+        .collect();
+    Arc::new(Schema::new(new_schema))
+}
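// Reviewer note, not part of the patch: update_field_type_in_schema rebuilds the inferred
// schema so that a string-typed time-partition column decodes as a millisecond timestamp.
// A standalone sketch of the same promotion against arrow_schema; `promote_time_partition`
// and the column name are made up for the example.
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema, TimeUnit};

fn promote_time_partition(schema: &Schema, partition_column: &str) -> Arc<Schema> {
    let fields: Vec<Field> = schema
        .fields()
        .iter()
        .map(|field| {
            if field.name() == partition_column && field.data_type() == &DataType::Utf8 {
                // Promote the partition column from Utf8 to Timestamp(Millisecond, None).
                Field::new(
                    field.name(),
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    true,
                )
            } else {
                Field::new(field.name(), field.data_type().clone(), true)
            }
        })
        .collect();
    Arc::new(Schema::new(fields))
}
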
diff --git a/server/src/event/format/json.rs b/server/src/event/format/json.rs
index b31acbedc..eac24c1da 100644
--- a/server/src/event/format/json.rs
+++ b/server/src/event/format/json.rs
@@ -45,10 +45,10 @@ impl EventFormat for Event {
     fn to_data(
         self,
         schema: HashMap<String, Arc<Field>>,
-        time_partition: Option<String>,
         static_schema_flag: Option<String>,
+        time_partition: Option<String>,
     ) -> Result<(Self::Data, Vec<Arc<Field>>, bool, Tags, Metadata), anyhow::Error> {
-        let data = flatten_json_body(self.data, time_partition)?;
+        let data = flatten_json_body(self.data, None, false)?;
         let stream_schema = schema;
 
         // incoming event may be a single json or a json array
@@ -68,7 +68,12 @@ impl EventFormat for Event {
         let schema = match derive_arrow_schema(&stream_schema, fields) {
             Ok(schema) => schema,
             Err(_) => match infer_json_schema_from_iterator(value_arr.iter().map(Ok)) {
-                Ok(infer_schema) => {
+                Ok(mut infer_schema) => {
+                    let new_infer_schema = super::super::format::update_field_type_in_schema(
+                        Arc::new(infer_schema),
+                        time_partition,
+                    );
+                    infer_schema = Schema::new(new_infer_schema.fields().clone());
                     if let Err(err) = Schema::try_merge(vec![
                         Schema::new(stream_schema.values().cloned().collect::<Fields>()),
                         infer_schema.clone(),
diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs
index 2d1b46d4e..737a0a514 100644
--- a/server/src/event/writer.rs
+++ b/server/src/event/writer.rs
@@ -25,11 +25,11 @@ use std::{
     sync::{Arc, Mutex, RwLock},
 };
 
-use crate::utils;
-
 use self::{errors::StreamWriterError, file_writer::FileWriter, mem_writer::MemWriter};
+use crate::utils;
 use arrow_array::{RecordBatch, TimestampMillisecondArray};
 use arrow_schema::Schema;
+use chrono::NaiveDateTime;
 use chrono::Utc;
 use derive_more::{Deref, DerefMut};
 use once_cell::sync::Lazy;
@@ -48,6 +48,7 @@ impl Writer {
         stream_name: &str,
         schema_key: &str,
         rb: RecordBatch,
+        parsed_timestamp: NaiveDateTime,
     ) -> Result<(), StreamWriterError> {
         let rb = utils::arrow::replace_columns(
             rb.schema(),
@@ -56,7 +57,8 @@ impl Writer {
             &[Arc::new(get_timestamp_array(rb.num_rows()))],
         );
 
-        self.disk.push(stream_name, schema_key, &rb)?;
+        self.disk
+            .push(stream_name, schema_key, &rb, parsed_timestamp)?;
         self.mem.push(schema_key, rb);
         Ok(())
     }
@@ -72,15 +74,18 @@ impl WriterTable {
         stream_name: &str,
         schema_key: &str,
         record: RecordBatch,
+        parsed_timestamp: NaiveDateTime,
     ) -> Result<(), StreamWriterError> {
         let hashmap_guard = self.read().unwrap();
 
         match hashmap_guard.get(stream_name) {
             Some(stream_writer) => {
-                stream_writer
-                    .lock()
-                    .unwrap()
-                    .push(stream_name, schema_key, record)?;
+                stream_writer.lock().unwrap().push(
+                    stream_name,
+                    schema_key,
+                    record,
+                    parsed_timestamp,
+                )?;
             }
             None => {
                 drop(hashmap_guard);
@@ -88,13 +93,15 @@ impl WriterTable {
                 // check for race condition
                 // if map contains entry then just
                 if let Some(writer) = map.get(stream_name) {
-                    writer
-                        .lock()
-                        .unwrap()
-                        .push(stream_name, schema_key, record)?;
+                    writer.lock().unwrap().push(
+                        stream_name,
+                        schema_key,
+                        record,
+                        parsed_timestamp,
+                    )?;
                 } else {
                     let mut writer = Writer::default();
-                    writer.push(stream_name, schema_key, record)?;
+                    writer.push(stream_name, schema_key, record, parsed_timestamp)?;
                     map.insert(stream_name.to_owned(), Mutex::new(writer));
                 }
             }
diff --git a/server/src/event/writer/file_writer.rs b/server/src/event/writer/file_writer.rs
index d90e361e3..1b193eb4c 100644
--- a/server/src/event/writer/file_writer.rs
+++ b/server/src/event/writer/file_writer.rs
@@ -24,9 +24,9 @@ use std::collections::HashMap;
 use std::fs::{File, OpenOptions};
 use std::path::PathBuf;
 
-use crate::storage::staging::StorageDir;
-
 use super::errors::StreamWriterError;
+use crate::storage::staging::StorageDir;
+use chrono::NaiveDateTime;
 
 pub struct ArrowWriter {
     pub file_path: PathBuf,
@@ -43,6 +43,7 @@ impl FileWriter {
         stream_name: &str,
         schema_key: &str,
         record: &RecordBatch,
+        parsed_timestamp: NaiveDateTime,
     ) -> Result<(), StreamWriterError> {
         match self.get_mut(schema_key) {
             Some(writer) => {
@@ -54,7 +55,8 @@ impl FileWriter {
             // entry is not present thus we create it
             None => {
                 // this requires mutable borrow of the map so we drop this read lock and wait for write lock
-                let (path, writer) = init_new_stream_writer_file(stream_name, schema_key, record)?;
+                let (path, writer) =
+                    init_new_stream_writer_file(stream_name, schema_key, record, parsed_timestamp)?;
                 self.insert(
                     schema_key.to_owned(),
                     ArrowWriter {
@@ -79,9 +81,10 @@ fn init_new_stream_writer_file(
     stream_name: &str,
     schema_key: &str,
     record: &RecordBatch,
+    parsed_timestamp: NaiveDateTime,
 ) -> Result<(PathBuf, StreamWriter<File>), StreamWriterError> {
     let dir = StorageDir::new(stream_name);
-    let path = dir.path_by_current_time(schema_key);
+    let path = dir.path_by_current_time(schema_key, parsed_timestamp);
     std::fs::create_dir_all(dir.data_path)?;
 
     let file = OpenOptions::new().create(true).append(true).open(&path)?;
diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs
index d878b7d5a..edfc75efa 100644
--- a/server/src/handlers/http/ingest.rs
+++ b/server/src/handlers/http/ingest.rs
@@ -31,9 +31,11 @@ use crate::metadata::{self, STREAM_INFO};
 use crate::option::{Mode, CONFIG};
 use crate::storage::{LogStream, ObjectStorageError};
 use crate::utils::header_parsing::{collect_labelled_headers, ParseHeaderError};
+use crate::utils::json::convert_array_to_object;
 use actix_web::{http::header::ContentType, HttpRequest, HttpResponse};
 use arrow_schema::{Field, Schema};
 use bytes::Bytes;
+use chrono::{DateTime, Utc};
 use http::StatusCode;
 use serde_json::Value;
 use std::collections::{BTreeMap, HashMap};
@@ -95,58 +97,106 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result<HttpResponse, PostError> {
 }
 
 async fn push_logs(stream_name: String, req: HttpRequest, body: Bytes) -> Result<(), PostError> {
-    let (size, rb, is_first_event) = {
-        let hash_map = STREAM_INFO.read().unwrap();
-        let schema = hash_map
-            .get(&stream_name)
-            .ok_or(PostError::StreamNotFound(stream_name.clone()))?
-            .schema
-            .clone();
-        let time_partition = hash_map
-            .get(&stream_name)
-            .ok_or(PostError::StreamNotFound(stream_name.clone()))?
-            .time_partition
-            .clone();
-        let static_schema_flag = hash_map
-            .get(&stream_name)
-            .ok_or(PostError::StreamNotFound(stream_name.clone()))?
-            .static_schema_flag
-            .clone();
-
-        into_event_batch(req, body, schema, time_partition, static_schema_flag)?
-    };
-
-    event::Event {
-        rb,
-        stream_name,
-        origin_format: "json",
-        origin_size: size as u64,
-        is_first_event,
+    let glob_storage = CONFIG.storage().get_object_store();
+    let object_store_format = glob_storage
+        .get_object_store_format(&stream_name)
+        .await
+        .map_err(|_err| PostError::StreamNotFound(stream_name.clone()))?;
+
+    let time_partition = object_store_format.time_partition;
+    let static_schema_flag = object_store_format.static_schema_flag;
+    let body_val: Value = serde_json::from_slice(&body)?;
+    let size: usize = body.len();
+    let mut parsed_timestamp = Utc::now().naive_utc();
+    if time_partition.is_none() {
+        let stream = stream_name.clone();
+        let (rb, is_first_event) = get_stream_schema(
+            stream.clone(),
+            req,
+            body_val,
+            static_schema_flag,
+            time_partition.clone(),
+        )?;
+        event::Event {
+            rb,
+            stream_name: stream,
+            origin_format: "json",
+            origin_size: size as u64,
+            is_first_event,
+            parsed_timestamp,
+            time_partition,
+        }
+        .process()
+        .await?;
+    } else {
+        let data = convert_array_to_object(body_val.clone(), time_partition.clone())?;
+        for value in data {
+            let body_timestamp = value.get(&time_partition.clone().unwrap().to_string());
+            parsed_timestamp = body_timestamp
+                .unwrap()
+                .to_owned()
+                .as_str()
+                .unwrap()
+                .parse::<DateTime<Utc>>()
+                .unwrap()
+                .naive_utc();
+
+            let (rb, is_first_event) = get_stream_schema(
+                stream_name.clone(),
+                req.clone(),
+                value.clone(),
+                static_schema_flag.clone(),
+                time_partition.clone(),
+            )?;
+            event::Event {
+                rb,
+                stream_name: stream_name.clone(),
+                origin_format: "json",
+                origin_size: value.to_string().into_bytes().len() as u64,
+                is_first_event,
+                parsed_timestamp,
+                time_partition: time_partition.clone(),
+            }
+            .process()
+            .await?;
+        }
     }
-    .process()
-    .await?;
 
     Ok(())
 }
 
+fn get_stream_schema(
+    stream_name: String,
+    req: HttpRequest,
+    body: Value,
+    static_schema_flag: Option<String>,
+    time_partition: Option<String>,
+) -> Result<(arrow_array::RecordBatch, bool), PostError> {
+    let hash_map = STREAM_INFO.read().unwrap();
+    let schema = hash_map
+        .get(&stream_name)
+        .ok_or(PostError::StreamNotFound(stream_name))?
+        .schema
+        .clone();
+    into_event_batch(req, body, schema, static_schema_flag, time_partition)
+}
+
 fn into_event_batch(
     req: HttpRequest,
-    body: Bytes,
+    body: Value,
     schema: HashMap<String, Arc<Field>>,
-    time_partition: Option<String>,
     static_schema_flag: Option<String>,
-) -> Result<(usize, arrow_array::RecordBatch, bool), PostError> {
+    time_partition: Option<String>,
+) -> Result<(arrow_array::RecordBatch, bool), PostError> {
     let tags = collect_labelled_headers(&req, PREFIX_TAGS, SEPARATOR)?;
     let metadata = collect_labelled_headers(&req, PREFIX_META, SEPARATOR)?;
-    let size = body.len();
-    let body: Value = serde_json::from_slice(&body)?;
     let event = format::json::Event {
         data: body,
         tags,
         metadata,
     };
-    let (rb, is_first) = event.into_recordbatch(schema, time_partition, static_schema_flag)?;
-    Ok((size, rb, is_first))
+    let (rb, is_first) = event.into_recordbatch(schema, static_schema_flag, time_partition)?;
+    Ok((rb, is_first))
 }
 
 // Check if the stream exists and create a new stream if doesn't exist
@@ -249,7 +299,6 @@ mod tests {
         types::Int64Type, ArrayRef, Float64Array, Int64Array, ListArray, StringArray,
     };
     use arrow_schema::{DataType, Field};
-    use bytes::Bytes;
     use serde_json::json;
 
     use crate::{
@@ -296,16 +345,8 @@ mod tests {
             .append_header((PREFIX_META.to_string() + "C", "meta1"))
             .to_http_request();
 
-        let (size, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            HashMap::default(),
-            None,
-            None,
-        )
-        .unwrap();
-
-        assert_eq!(size, 28);
+        let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
+
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 6);
         assert_eq!(
@@ -344,14 +385,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            HashMap::default(),
-            None,
-            None,
-        )
-        .unwrap();
+        let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 6);
@@ -383,14 +417,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            schema,
-            None,
-            None,
-        )
-        .unwrap();
+        let (rb, _) = into_event_batch(req, json, schema, None, None).unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 5);
@@ -422,14 +449,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        assert!(into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            schema,
-            None,
-            None
-        )
-        .is_err());
+        assert!(into_event_batch(req, json, schema, None, None).is_err());
     }
 
     #[test]
@@ -447,14 +467,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            schema,
-            None,
-            None,
-        )
-        .unwrap();
+        let (rb, _) = into_event_batch(req, json, schema, None, None).unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -466,14 +479,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        assert!(into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            HashMap::default(),
-            None,
-            None
-        )
-        .is_err())
+        assert!(into_event_batch(req, json, HashMap::default(), None, None).is_err())
     }
 
     #[test]
@@ -496,14 +502,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            HashMap::default(),
-            None,
-            None,
-        )
-        .unwrap();
+        let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
@@ -551,14 +550,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            HashMap::default(),
-            None,
-            None,
-        )
-        .unwrap();
+        let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
@@ -606,14 +598,7 @@ mod tests {
         );
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            schema,
-            None,
-            None,
-        )
-        .unwrap();
+        let (rb, _) = into_event_batch(req, json, schema, None, None).unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 6);
@@ -662,14 +647,7 @@ mod tests {
             .into_iter(),
         );
 
-        assert!(into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            schema,
-            None,
-            None
-        )
-        .is_err());
+        assert!(into_event_batch(req, json, schema, None, None).is_err());
     }
 
     #[test]
@@ -697,14 +675,7 @@ mod tests {
 
         let req = TestRequest::default().to_http_request();
 
-        let (_, rb, _) = into_event_batch(
-            req,
-            Bytes::from(serde_json::to_vec(&json).unwrap()),
-            HashMap::default(),
-            None,
-            None,
-        )
-        .unwrap();
+        let (rb, _) = into_event_batch(req, json, HashMap::default(), None, None).unwrap();
 
         assert_eq!(rb.num_rows(), 4);
         assert_eq!(rb.num_columns(), 7);
diff --git a/server/src/handlers/http/logstream.rs b/server/src/handlers/http/logstream.rs
index 433ec3651..1496a403c 100644
--- a/server/src/handlers/http/logstream.rs
+++ b/server/src/handlers/http/logstream.rs
@@ -215,7 +215,9 @@ pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result
diff --git a/server/src/option.rs b/server/src/option.rs
-    fn to_string(&self) -> String {
-        self.to_str().to_string()
+impl fmt::Display for Mode {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        write!(f, "{}", self.to_str())
     }
 }
diff --git a/server/src/query.rs b/server/src/query.rs
index 13e603f21..aaac8d1cf 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -229,13 +229,13 @@ fn transform(
         Some(time_partition) => {
             _start_time_filter =
                 PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
-                    .binary_expr_timestamp_partition_key(Expr::Column(Column::new(
+                    .binary_expr(Expr::Column(Column::new(
                         Some(table.table_name.to_owned_reference()),
                         time_partition.clone(),
                     )));
             _end_time_filter =
                 PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
-                    .binary_expr_timestamp_partition_key(Expr::Column(Column::new(
+                    .binary_expr(Expr::Column(Column::new(
                         Some(table.table_name.to_owned_reference()),
                         time_partition,
                     )));
@@ -243,13 +243,13 @@ fn transform(
         None => {
             _start_time_filter =
                 PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
-                    .binary_expr_default_timestamp_key(Expr::Column(Column::new(
+                    .binary_expr(Expr::Column(Column::new(
                         Some(table.table_name.to_owned_reference()),
                         event::DEFAULT_TIMESTAMP_KEY,
                     )));
             _end_time_filter =
                 PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
-                    .binary_expr_default_timestamp_key(Expr::Column(Column::new(
+                    .binary_expr(Expr::Column(Column::new(
                         Some(table.table_name.to_owned_reference()),
                         event::DEFAULT_TIMESTAMP_KEY,
                     )));
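// Reviewer note, not part of the patch: with the Utf8-literal variant removed, both the
// default p_timestamp column and a user-defined time partition go through the same
// binary_expr, which compares the column against a millisecond-timestamp literal. A rough
// standalone equivalent using DataFusion expressions is sketched below; the function name
// is invented and the exact imports depend on the DataFusion version pinned by the repo.
use chrono::NaiveDateTime;
use datafusion::logical_expr::{BinaryExpr, Expr, Operator};
use datafusion::prelude::col;
use datafusion::scalar::ScalarValue;

fn lower_bound_filter(column: &str, start: NaiveDateTime) -> Expr {
    // column >= start, expressed as a TimestampMillisecond literal comparison.
    Expr::BinaryExpr(BinaryExpr::new(
        Box::new(col(column)),
        Operator::GtEq,
        Box::new(Expr::Literal(ScalarValue::TimestampMillisecond(
            Some(start.and_utc().timestamp_millis()),
            None,
        ))),
    ))
}
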
diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs
index 649f55ea2..a2627bf1e 100644
--- a/server/src/query/stream_schema_provider.rs
+++ b/server/src/query/stream_schema_provider.rs
@@ -551,7 +551,7 @@ impl PartialTimeFilter {
         Some(value)
     }
 
-    pub fn binary_expr_default_timestamp_key(&self, left: Expr) -> Expr {
+    pub fn binary_expr(&self, left: Expr) -> Expr {
         let (op, right) = match self {
             PartialTimeFilter::Low(Bound::Excluded(time)) => {
                 (Operator::Gt, time.and_utc().timestamp_millis())
             }
@@ -578,26 +578,6 @@ impl PartialTimeFilter {
             ))),
         ))
     }
-
-    pub fn binary_expr_timestamp_partition_key(&self, left: Expr) -> Expr {
-        let (op, right) = match self {
-            PartialTimeFilter::Low(Bound::Excluded(time)) => (Operator::Gt, time),
-            PartialTimeFilter::Low(Bound::Included(time)) => (Operator::GtEq, time),
-            PartialTimeFilter::High(Bound::Excluded(time)) => (Operator::Lt, time),
-            PartialTimeFilter::High(Bound::Included(time)) => (Operator::LtEq, time),
-            PartialTimeFilter::Eq(time) => (Operator::Eq, time),
-            _ => unimplemented!(),
-        };
-
-        Expr::BinaryExpr(BinaryExpr::new(
-            Box::new(left),
-            op,
-            Box::new(Expr::Literal(ScalarValue::Utf8(Some(format!(
-                "{:?}",
-                right
-            ))))),
-        ))
-    }
 }
 
 fn is_overlapping_query(
diff --git a/server/src/static_schema.rs b/server/src/static_schema.rs
index cbf48b7ae..b7305da88 100644
--- a/server/src/static_schema.rs
+++ b/server/src/static_schema.rs
@@ -6,16 +6,17 @@ use std::str;
 
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
 use std::{collections::HashMap, sync::Arc};
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct StaticSchema {
     fields: Vec<SchemaFields>,
 }
 
-#[derive(Serialize, Deserialize, Debug)]
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 pub struct SchemaFields {
     name: String,
     data_type: String,
 }
+
 #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct ParsedSchema {
@@ -39,14 +40,21 @@ pub struct Fields {
 pub struct Metadata {}
 pub fn convert_static_schema_to_arrow_schema(
     static_schema: StaticSchema,
+    time_partition: &str,
 ) -> Result<Arc<Schema>, AnyError> {
     let mut parsed_schema = ParsedSchema {
         fields: Vec::new(),
         metadata: HashMap::new(),
     };
-    for field in static_schema.fields.iter() {
+    let mut time_partition_exists: bool = false;
+    for mut field in static_schema.fields {
+        if !time_partition.is_empty() && field.name == time_partition {
+            time_partition_exists = true;
+            field.data_type = "datetime".to_string();
+        }
         let parsed_field = Fields {
             name: field.name.clone(),
+
             data_type: {
                 match field.data_type.as_str() {
                     "int" => DataType::Int64,
@@ -74,8 +82,16 @@
             dict_is_ordered: default_dict_is_ordered(),
             metadata: HashMap::new(),
         };
+
         parsed_schema.fields.push(parsed_field);
     }
+    if !time_partition.is_empty() && !time_partition_exists {
+        return Err(anyhow! {
+            format!(
+                "time partition field {time_partition} does not exist in the schema for the static schema logstream"
+            ),
+        });
+    }
     let schema = add_parseable_fields_to_static_schema(parsed_schema);
     if schema.is_err() {
         return Err(schema.err().unwrap());
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 77eb9f20d..82d35ecc3 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -293,6 +293,7 @@ pub trait ObjectStorage: Sync + 'static {
     }
 
     /// for future use
+    #[allow(dead_code)]
     async fn get_stats_for_first_time(
         &self,
         stream_name: &str,
diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs
index fa689f0b8..abeac7062 100644
--- a/server/src/storage/staging.rs
+++ b/server/src/storage/staging.rs
@@ -38,7 +38,7 @@ use crate::{
 };
 use arrow_schema::{ArrowError, Schema};
 use base64::Engine;
-use chrono::{NaiveDateTime, Timelike, Utc};
+use chrono::{NaiveDateTime, Timelike};
 use parquet::{
     arrow::ArrowWriter,
     basic::Encoding,
@@ -47,6 +47,7 @@ use parquet::{
     format::SortingColumn,
     schema::types::ColumnPath,
 };
+use rand::distributions::DistString;
 
 const ARROW_FILE_EXTENSION: &str = "data.arrows";
 const PARQUET_FILE_EXTENSION: &str = "data.parquet";
@@ -85,14 +86,19 @@ impl StorageDir {
         )
     }
 
-    fn filename_by_current_time(stream_hash: &str) -> String {
-        let datetime = Utc::now();
-        Self::filename_by_time(stream_hash, datetime.naive_utc())
+    fn filename_by_current_time(stream_hash: &str, parsed_timestamp: NaiveDateTime) -> String {
+        Self::filename_by_time(stream_hash, parsed_timestamp)
     }
 
-    pub fn path_by_current_time(&self, stream_hash: &str) -> PathBuf {
-        self.data_path
-            .join(Self::filename_by_current_time(stream_hash))
+    pub fn path_by_current_time(
+        &self,
+        stream_hash: &str,
+        parsed_timestamp: NaiveDateTime,
+    ) -> PathBuf {
+        self.data_path.join(Self::filename_by_current_time(
+            stream_hash,
+            parsed_timestamp,
+        ))
     }
 
     pub fn arrow_files(&self) -> Vec<PathBuf> {
@@ -166,11 +172,13 @@ impl StorageDir {
 }
 
 fn arrow_path_to_parquet(path: &Path) -> PathBuf {
-    let filename = path.file_name().unwrap().to_str().unwrap();
+    let filename = path.file_stem().unwrap().to_str().unwrap();
     let (_, filename) = filename.split_once('.').unwrap();
     let filename = filename.rsplit_once('.').expect("contains the delim `.`");
     let filename = format!("{}.{}", filename.0, filename.1);
-
+    let random_string =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 15);
+    let filename_with_random_number = format!("{}.{}.{}", filename, random_string, "arrows");
     /*
         let file_stem = path.file_stem().unwrap().to_str().unwrap();
         let random_string =
@@ -180,7 +188,7 @@ impl StorageDir {
     */
 
     let mut parquet_path = path.to_owned();
-    parquet_path.set_file_name(filename);
+    parquet_path.set_file_name(filename_with_random_number);
     parquet_path.set_extension("parquet");
     parquet_path
 }
diff --git a/server/src/utils/json.rs b/server/src/utils/json.rs
index 082b4e823..c55ff362e 100644
--- a/server/src/utils/json.rs
+++ b/server/src/utils/json.rs
@@ -24,8 +24,22 @@ pub mod flatten;
 pub fn flatten_json_body(
     body: serde_json::Value,
     time_partition: Option<String>,
+    validation_required: bool,
 ) -> Result<Value, anyhow::Error> {
-    flatten::flatten(body, "_", time_partition)
+    flatten::flatten(body, "_", time_partition, validation_required)
+}
+
+pub fn convert_array_to_object(
+    body: Value,
+    time_partition: Option<String>,
+) -> Result<Vec<Value>, anyhow::Error> {
+    let data = flatten_json_body(body, time_partition, true)?;
+    let value_arr = match data {
+        Value::Array(arr) => arr,
+        value @ Value::Object(_) => vec![value],
+        _ => unreachable!("flatten would have failed beforehand"),
+    };
+    Ok(value_arr)
 }
 
 pub fn convert_to_string(value: &Value) -> Value {
diff --git a/server/src/utils/json/flatten.rs b/server/src/utils/json/flatten.rs
index bc7a33a21..b41cec228 100644
--- a/server/src/utils/json/flatten.rs
+++ b/server/src/utils/json/flatten.rs
@@ -17,7 +17,7 @@
  */
 
 use anyhow::anyhow;
-use chrono::{DateTime, Timelike, Utc};
+use chrono::{DateTime, Duration, Utc};
 use itertools::Itertools;
 use serde_json::map::Map;
 use serde_json::value::Value;
@@ -26,26 +26,47 @@ pub fn flatten(
     nested_value: Value,
     separator: &str,
     time_partition: Option<String>,
+    validation_required: bool,
 ) -> Result<Value, anyhow::Error> {
     match nested_value {
         Value::Object(nested_dict) => {
-            let validate_time_partition_result =
-                validate_time_partition(Value::Object(nested_dict.clone()), time_partition.clone());
-            if validate_time_partition_result.is_ok() {
+            if validation_required {
+                let validate_time_partition_result = validate_time_partition(
+                    Value::Object(nested_dict.clone()),
+                    time_partition.clone(),
+                );
+                if validate_time_partition_result.is_ok() {
+                    let mut map = Map::new();
+                    flatten_object(&mut map, None, nested_dict, separator)?;
+                    Ok(Value::Object(map))
+                } else {
+                    Err(anyhow!(validate_time_partition_result.unwrap_err()))
+                }
+            } else {
                 let mut map = Map::new();
                 flatten_object(&mut map, None, nested_dict, separator)?;
                 Ok(Value::Object(map))
-            } else {
-                Err(anyhow!(validate_time_partition_result.unwrap_err()))
             }
         }
         Value::Array(mut arr) => {
             for _value in &mut arr {
                 let value: Value = _value.clone();
-                let validate_time_partition_result =
-                    validate_time_partition(value, time_partition.clone());
-
-                if validate_time_partition_result.is_ok() {
+                if validation_required {
+                    let validate_time_partition_result =
+                        validate_time_partition(value, time_partition.clone());
+
+                    if validate_time_partition_result.is_ok() {
+                        let value = std::mem::replace(_value, Value::Null);
+                        let mut map = Map::new();
+                        let Value::Object(obj) = value else {
+                            return Err(anyhow!("Expected object in array of objects"));
+                        };
+                        flatten_object(&mut map, None, obj, separator)?;
+                        *_value = Value::Object(map);
+                    } else {
+                        return Err(anyhow!(validate_time_partition_result.unwrap_err()));
+                    }
+                } else {
                     let value = std::mem::replace(_value, Value::Null);
                     let mut map = Map::new();
                     let Value::Object(obj) = value else {
@@ -53,8 +74,6 @@ pub fn flatten(
                     };
                     flatten_object(&mut map, None, obj, separator)?;
                     *_value = Value::Object(map);
-                } else {
-                    return Err(anyhow!(validate_time_partition_result.unwrap_err()));
                 }
             }
             Ok(Value::Array(arr))
@@ -89,14 +108,11 @@ pub fn validate_time_partition(
         .unwrap()
         .naive_utc();
 
-    if parsed_timestamp.date() == Utc::now().naive_utc().date()
-        && parsed_timestamp.hour() == Utc::now().naive_utc().hour()
-        && parsed_timestamp.minute() == Utc::now().naive_utc().minute()
-    {
+    if parsed_timestamp >= Utc::now().naive_utc() - Duration::days(30) {
         Ok(true)
     } else {
         Err(anyhow!(format!(
-            "field {} and server time are not same",
+            "field {} value is more than a month old",
             time_partition.unwrap()
         )))
     }
@@ -229,19 +245,19 @@ mod tests {
     #[test]
     fn flatten_single_key_string() {
         let obj = json!({"key": "value"});
-        assert_eq!(obj.clone(), flatten(obj, "_", None).unwrap());
+        assert_eq!(obj.clone(), flatten(obj, "_", None, false).unwrap());
     }
 
     #[test]
     fn flatten_single_key_int() {
         let obj = json!({"key": 1});
-        assert_eq!(obj.clone(), flatten(obj, "_", None).unwrap());
+        assert_eq!(obj.clone(), flatten(obj, "_", None, false).unwrap());
     }
 
     #[test]
     fn flatten_multiple_key_value() {
         let obj = json!({"key1": 1, "key2": "value2"});
-        assert_eq!(obj.clone(), flatten(obj, "_", None).unwrap());
+        assert_eq!(obj.clone(), flatten(obj, "_", None, false).unwrap());
     }
 
@@ -249,7 +265,7 @@ mod tests {
         let obj = json!({"key": "value", "nested_key": {"key":"value"}});
         assert_eq!(
             json!({"key": "value", "nested_key.key": "value"}),
-            flatten(obj, ".", None).unwrap()
+            flatten(obj, ".", None, false).unwrap()
         );
     }
 
@@ -258,7 +274,7 @@ mod tests {
         let obj = json!({"key": "value", "nested_key": {"key1":"value1", "key2": "value2"}});
         assert_eq!(
             json!({"key": "value", "nested_key.key1": "value1", "nested_key.key2": "value2"}),
-            flatten(obj, ".", None).unwrap()
+            flatten(obj, ".", None, false).unwrap()
         );
     }
 
@@ -267,7 +283,7 @@ mod tests {
         let obj = json!({"key": "value", "nested_key": {"key1":[1,2,3]}});
         assert_eq!(
             json!({"key": "value", "nested_key.key1": [1,2,3]}),
-            flatten(obj, ".", None).unwrap()
+            flatten(obj, ".", None, false).unwrap()
         );
     }
 
@@ -276,7 +292,7 @@ mod tests {
         let obj = json!({"key": [{"a": "value0"}, {"a": "value1"}]});
         assert_eq!(
             json!({"key.a": ["value0", "value1"]}),
-            flatten(obj, ".", None).unwrap()
+            flatten(obj, ".", None, false).unwrap()
         );
     }
 
@@ -285,7 +301,7 @@ mod tests {
         let obj = json!({"key": [{"a": "value0"}, {"a": "value1", "b": "value1"}]});
         assert_eq!(
             json!({"key.a": ["value0", "value1"], "key.b": [null, "value1"]}),
-            flatten(obj, ".", None).unwrap()
+            flatten(obj, ".", None, false).unwrap()
         );
     }
flatten(obj, "_", None, false).unwrap()); } #[test] fn flatten_multiple_key_value() { let obj = json!({"key1": 1, "key2": "value2"}); - assert_eq!(obj.clone(), flatten(obj, "_", None).unwrap()); + assert_eq!(obj.clone(), flatten(obj, "_", None, false).unwrap()); } #[test] @@ -249,7 +265,7 @@ mod tests { let obj = json!({"key": "value", "nested_key": {"key":"value"}}); assert_eq!( json!({"key": "value", "nested_key.key": "value"}), - flatten(obj, ".", None).unwrap() + flatten(obj, ".", None, false).unwrap() ); } @@ -258,7 +274,7 @@ mod tests { let obj = json!({"key": "value", "nested_key": {"key1":"value1", "key2": "value2"}}); assert_eq!( json!({"key": "value", "nested_key.key1": "value1", "nested_key.key2": "value2"}), - flatten(obj, ".", None).unwrap() + flatten(obj, ".", None, false).unwrap() ); } @@ -267,7 +283,7 @@ mod tests { let obj = json!({"key": "value", "nested_key": {"key1":[1,2,3]}}); assert_eq!( json!({"key": "value", "nested_key.key1": [1,2,3]}), - flatten(obj, ".", None).unwrap() + flatten(obj, ".", None, false).unwrap() ); } @@ -276,7 +292,7 @@ mod tests { let obj = json!({"key": [{"a": "value0"}, {"a": "value1"}]}); assert_eq!( json!({"key.a": ["value0", "value1"]}), - flatten(obj, ".", None).unwrap() + flatten(obj, ".", None, false).unwrap() ); } @@ -285,7 +301,7 @@ mod tests { let obj = json!({"key": [{"a": "value0"}, {"a": "value1", "b": "value1"}]}); assert_eq!( json!({"key.a": ["value0", "value1"], "key.b": [null, "value1"]}), - flatten(obj, ".", None).unwrap() + flatten(obj, ".", None, false).unwrap() ); } @@ -294,7 +310,7 @@ mod tests { let obj = json!({"key": [{"a": "value0", "b": "value0"}, {"a": "value1"}]}); assert_eq!( json!({"key.a": ["value0", "value1"], "key.b": ["value0", null]}), - flatten(obj, ".", None).unwrap() + flatten(obj, ".", None, false).unwrap() ); } @@ -303,7 +319,7 @@ mod tests { let obj = json!({"key": [{"a": {"p": 0}, "b": "value0"}, {"b": "value1"}]}); assert_eq!( json!({"key.a.p": [0, null], "key.b": ["value0", "value1"]}), - flatten(obj, ".", None).unwrap() + flatten(obj, ".", None, false).unwrap() ); } @@ -312,14 +328,14 @@ mod tests { let obj = json!({"key": [{"a": [{"p": "value0", "q": "value0"}, {"p": "value1", "q": null}], "b": "value0"}, {"b": "value1"}]}); assert_eq!( json!({"key.a.p": [["value0", "value1"], null], "key.a.q": [["value0", null], null], "key.b": ["value0", "value1"]}), - flatten(obj, ".", None).unwrap() + flatten(obj, ".", None, false).unwrap() ); } #[test] fn flatten_mixed_object() { let obj = json!({"a": 42, "arr": ["1", {"key": "2"}, {"key": {"nested": "3"}}]}); - assert!(flatten(obj, ".", None).is_err()); + assert!(flatten(obj, ".", None, false).is_err()); } #[test]