From de4464cf73abf2838f910851df92fa5060415f2d Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 16 Feb 2025 11:38:05 +0530 Subject: [PATCH 1/5] refactor: move kinesis up in lib --- src/handlers/http/mod.rs | 1 - src/handlers/http/modal/utils/ingest_utils.rs | 6 ++---- src/{handlers/http => }/kinesis.rs | 0 src/lib.rs | 1 + 4 files changed, 3 insertions(+), 5 deletions(-) rename src/{handlers/http => }/kinesis.rs (100%) diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs index f1f702d4b..4bdf85adf 100644 --- a/src/handlers/http/mod.rs +++ b/src/handlers/http/mod.rs @@ -34,7 +34,6 @@ pub mod cluster; pub mod correlation; pub mod health_check; pub mod ingest; -mod kinesis; pub mod llm; pub mod logstream; pub mod middleware; diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 402f4d3df..94679d613 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -27,10 +27,8 @@ use crate::{ format::{json, EventFormat, LogSource}, Event, }, - handlers::http::{ - ingest::PostError, - kinesis::{flatten_kinesis_logs, Message}, - }, + handlers::http::ingest::PostError, + kinesis::{flatten_kinesis_logs, Message}, metadata::SchemaVersion, parseable::{StreamNotFound, PARSEABLE}, storage::StreamType, diff --git a/src/handlers/http/kinesis.rs b/src/kinesis.rs similarity index 100% rename from src/handlers/http/kinesis.rs rename to src/kinesis.rs diff --git a/src/lib.rs b/src/lib.rs index d26e37974..0863d09a2 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -29,6 +29,7 @@ pub mod correlation; mod event; pub mod handlers; pub mod hottier; +mod kinesis; mod livetail; mod metadata; pub mod metrics; From e1a24a5cf0ea23eb96385bb2a9f58644192275ce Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 16 Feb 2025 11:38:58 +0530 Subject: [PATCH 2/5] refactor: `process_event_from_chunk` --- src/connectors/kafka/processor.rs | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index 218af7b25..d066b2658 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -42,10 +42,7 @@ use super::{config::BufferConfig, ConsumerRecord, StreamConsumer, TopicPartition pub struct ParseableSinkProcessor; impl ParseableSinkProcessor { - async fn build_event_from_chunk( - &self, - records: &[ConsumerRecord], - ) -> anyhow::Result { + async fn process_event_from_chunk(&self, records: &[ConsumerRecord]) -> anyhow::Result<()> { let stream_name = records .first() .map(|r| r.topic.as_str()) @@ -73,7 +70,7 @@ impl ParseableSinkProcessor { schema_version, )?; - let p_event = ParseableEvent { + ParseableEvent { rb, stream_name: stream_name.to_string(), origin_format: "json", @@ -83,9 +80,11 @@ impl ParseableSinkProcessor { time_partition: None, custom_partition_values: HashMap::new(), stream_type: StreamType::UserDefined, - }; + } + .process() + .await?; - Ok(p_event) + Ok(()) } fn json_vec(records: &[ConsumerRecord]) -> (Vec, u64) { @@ -109,10 +108,7 @@ impl Processor, ()> for ParseableSinkProcessor { let len = records.len(); debug!("Processing {} records", len); - self.build_event_from_chunk(&records) - .await? 
- .process() - .await?; + self.process_event_from_chunk(&records).await?; debug!("Processed {} records", len); Ok(()) From 373d8c760ce6e1ae3a351b4bccd0ab57ad250df2 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 16 Feb 2025 11:41:30 +0530 Subject: [PATCH 3/5] refactor: associate `commit_schema` with stream --- src/event/mod.rs | 36 ++++++++++-------------------------- src/handlers/http/query.rs | 12 +++++++++--- src/parseable/streams.rs | 18 +++++++++++++++++- 3 files changed, 36 insertions(+), 30 deletions(-) diff --git a/src/event/mod.rs b/src/event/mod.rs index 8b599d4b8..dae4c0c1c 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -17,17 +17,17 @@ * */ -pub mod format; +use std::{collections::HashMap, sync::Arc}; use arrow_array::RecordBatch; -use arrow_schema::{Field, Fields, Schema}; +use arrow_schema::Field; +use chrono::NaiveDateTime; +use error::EventError; use itertools::Itertools; -use std::sync::Arc; -use self::error::EventError; -use crate::{metadata::update_stats, parseable::PARSEABLE, storage::StreamType, LOCK_EXPECT}; -use chrono::NaiveDateTime; -use std::collections::HashMap; +use crate::{metadata::update_stats, parseable::PARSEABLE, storage::StreamType}; + +pub mod format; pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; @@ -59,11 +59,12 @@ impl Event { } } + let stream = PARSEABLE.get_or_create_stream(&self.stream_name); if self.is_first_event { - commit_schema(&self.stream_name, self.rb.schema())?; + stream.commit_schema(self.rb.schema())?; } - PARSEABLE.get_or_create_stream(&self.stream_name).push( + stream.push( &key, &self.rb, self.parsed_timestamp, @@ -109,23 +110,6 @@ pub fn get_schema_key(fields: &[Arc]) -> String { format!("{hash:x}") } -pub fn commit_schema(stream_name: &str, schema: Arc) -> Result<(), EventError> { - let mut stream_metadata = PARSEABLE.streams.write().expect("lock poisoned"); - - let map = &mut stream_metadata - .get_mut(stream_name) - .expect("map has entry for this stream name") - .metadata - .write() - .expect(LOCK_EXPECT) - .schema; - let current_schema = Schema::new(map.values().cloned().collect::()); - let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?; - map.clear(); - map.extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone()))); - Ok(()) -} - pub mod error { use arrow_schema::ArrowError; diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index 9e5d405b0..09d6bd088 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -19,6 +19,7 @@ use actix_web::http::header::ContentType; use actix_web::web::{self, Json}; use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder}; +use arrow_schema::ArrowError; use chrono::{DateTime, Utc}; use datafusion::common::tree_node::TreeNode; use datafusion::error::DataFusionError; @@ -36,10 +37,9 @@ use tracing::error; use crate::event::error::EventError; use crate::handlers::http::fetch_schema; -use crate::event::commit_schema; use crate::metrics::QUERY_EXECUTE_TIME; use crate::option::Mode; -use crate::parseable::PARSEABLE; +use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::query::error::ExecuteError; use crate::query::{CountsRequest, CountsResponse, Query as LogicalQuery}; use crate::query::{TableScanVisitor, QUERY_SESSION}; @@ -174,7 +174,9 @@ pub async fn update_schema_when_distributed(tables: &Vec) -> Result<(), // commit schema merges the schema internally and updates the schema in storage. 
commit_schema_to_storage(table, new_schema.clone()).await?; - commit_schema(table, Arc::new(new_schema))?; + PARSEABLE + .get_stream(table)? + .commit_schema(Arc::new(new_schema))?; } } } @@ -318,6 +320,10 @@ Description: {0}"# ActixError(#[from] actix_web::Error), #[error("Error: {0}")] Anyhow(#[from] anyhow::Error), + #[error("Missing stream {0}")] + StreamNotFound(#[from] StreamNotFound), + #[error("Arrow error: {0}")] + Arrow(#[from] ArrowError), } impl actix_web::ResponseError for QueryError { diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 7d8decdca..563a4b5df 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -28,7 +28,7 @@ use std::{ use arrow_array::RecordBatch; use arrow_ipc::writer::StreamWriter; -use arrow_schema::{Field, Fields, Schema}; +use arrow_schema::{ArrowError, Field, Fields, Schema}; use chrono::{NaiveDateTime, Timelike, Utc}; use derive_more::{Deref, DerefMut}; use itertools::Itertools; @@ -653,6 +653,22 @@ impl Stream { pub fn get_stream_type(&self) -> StreamType { self.metadata.read().expect(LOCK_EXPECT).stream_type } + + pub fn commit_schema(&self, schema: Arc) -> Result<(), ArrowError> { + let map = &mut self.metadata.write().expect(LOCK_EXPECT).schema; + // Construct updated schema + let current_schema = Schema::new(map.values().cloned().collect::()); + let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?; + let updated_schema: HashMap> = schema + .fields + .iter() + .map(|f| (f.name().clone(), f.clone())) + .collect(); + // Update schema + _ = std::mem::replace(map, updated_schema); + + Ok(()) + } } #[derive(Deref, DerefMut, Default)] From 2376fa54a44a8d2de166c5698b28218a48d7ba00 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 16 Feb 2025 11:43:11 +0530 Subject: [PATCH 4/5] refactor: process events with singular stream context --- src/connectors/kafka/processor.rs | 2 +- src/event/mod.rs | 9 +++--- src/handlers/http/ingest.rs | 30 +++++++------------ src/handlers/http/modal/utils/ingest_utils.rs | 2 +- src/parseable/mod.rs | 2 +- 5 files changed, 18 insertions(+), 27 deletions(-) diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs index d066b2658..86f613ab9 100644 --- a/src/connectors/kafka/processor.rs +++ b/src/connectors/kafka/processor.rs @@ -81,7 +81,7 @@ impl ParseableSinkProcessor { custom_partition_values: HashMap::new(), stream_type: StreamType::UserDefined, } - .process() + .process(&stream) .await?; Ok(()) diff --git a/src/event/mod.rs b/src/event/mod.rs index dae4c0c1c..d8d860303 100644 --- a/src/event/mod.rs +++ b/src/event/mod.rs @@ -25,7 +25,7 @@ use chrono::NaiveDateTime; use error::EventError; use itertools::Itertools; -use crate::{metadata::update_stats, parseable::PARSEABLE, storage::StreamType}; +use crate::{metadata::update_stats, parseable::Stream, storage::StreamType}; pub mod format; @@ -46,7 +46,7 @@ pub struct Event { // Events holds the schema related to a each event for a single log stream impl Event { - pub async fn process(self) -> Result<(), EventError> { + pub async fn process(self, stream: &Stream) -> Result<(), EventError> { let mut key = get_schema_key(&self.rb.schema().fields); if self.time_partition.is_some() { let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string(); @@ -59,7 +59,6 @@ impl Event { } } - let stream = PARSEABLE.get_or_create_stream(&self.stream_name); if self.is_first_event { stream.commit_schema(self.rb.schema())?; } - PARSEABLE.get_or_create_stream(&self.stream_name).push( + stream.push( &key, &self.rb, self.parsed_timestamp, @@ -85,10 +84,10 @@ impl Event { Ok(()) } - pub fn 
process_unchecked(&self) -> Result<(), EventError> { + pub fn process_unchecked(&self, stream: &Stream) -> Result<(), EventError> { let key = get_schema_key(&self.rb.schema().fields); - PARSEABLE.get_or_create_stream(&self.stream_name).push( + stream.push( &key, &self.rb, self.parsed_timestamp, diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index dab4f84df..e11db7061 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -29,8 +29,9 @@ use opentelemetry_proto::tonic::metrics::v1::MetricsData; use opentelemetry_proto::tonic::trace::v1::TracesData; use serde_json::Value; +use crate::event; use crate::event::error::EventError; -use crate::event::format::{self, EventFormat, LogSource}; +use crate::event::format::{json, EventFormat, LogSource}; use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; use crate::metadata::SchemaVersion; use crate::option::Mode; @@ -41,7 +42,6 @@ use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; use crate::utils::json::flatten::JsonFlattenError; -use crate::{event, LOCK_EXPECT}; use super::logstream::error::{CreateStreamError, StreamError}; use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs}; @@ -78,21 +78,12 @@ pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result Result<(), PostError> { let size: usize = body.len(); let parsed_timestamp = Utc::now().naive_utc(); - let (rb, is_first) = { - let body_val: Value = serde_json::from_slice(&body)?; - let hash_map = PARSEABLE.streams.read().unwrap(); - let schema = hash_map - .get(&stream_name) - .ok_or_else(|| StreamNotFound(stream_name.clone()))? - .metadata - .read() - .expect(LOCK_EXPECT) - .schema - .clone(); - let event = format::json::Event { data: body_val }; - // For internal streams, use old schema - event.into_recordbatch(&schema, false, None, SchemaVersion::V0)? 
- }; + let stream = PARSEABLE.get_stream(&stream_name)?; + let body_val: Value = serde_json::from_slice(&body)?; + let schema = stream.get_schema_raw(); + let event = json::Event { data: body_val }; + // For internal streams, use old schema + let (rb, is_first) = event.into_recordbatch(&schema, false, None, SchemaVersion::V0)?; event::Event { rb, stream_name, @@ -104,7 +95,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result< custom_partition_values: HashMap::new(), stream_type: StreamType::Internal, } - .process() + .process(&stream) .await?; Ok(()) } @@ -254,6 +245,7 @@ pub async fn push_logs_unchecked( batches: RecordBatch, stream_name: &str, ) -> Result { + let stream = PARSEABLE.get_stream(stream_name)?; let unchecked_event = event::Event { rb: batches, stream_name: stream_name.to_string(), @@ -265,7 +257,7 @@ pub async fn push_logs_unchecked( custom_partition_values: HashMap::new(), // should be an empty map for unchecked push stream_type: StreamType::UserDefined, }; - unchecked_event.process_unchecked()?; + unchecked_event.process_unchecked(&stream)?; Ok(unchecked_event) } diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 94679d613..78bf64fda 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -136,7 +136,7 @@ pub async fn push_logs( custom_partition_values, stream_type: StreamType::UserDefined, } - .process() + .process(&stream) .await?; } Ok(()) diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 2fba1d5b5..7f6e4fbf0 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -28,7 +28,7 @@ use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, StatusCode}; use once_cell::sync::Lazy; pub use staging::StagingError; use streams::StreamRef; -pub use streams::{StreamNotFound, Streams}; +pub use streams::{Stream, StreamNotFound, Streams}; use tracing::error; #[cfg(feature = "kafka")] From b7dd383c034910bd84dfa7dcd4c2c89b21dbe9e6 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Sun, 16 Feb 2025 11:35:47 +0530 Subject: [PATCH 5/5] refactor: retire ingest utils --- src/handlers/http/ingest.rs | 84 ++++--- src/handlers/http/modal/utils/ingest_utils.rs | 222 ------------------ src/handlers/http/modal/utils/mod.rs | 1 - src/parseable/streams.rs | 160 ++++++++++++- 4 files changed, 206 insertions(+), 261 deletions(-) delete mode 100644 src/handlers/http/modal/utils/ingest_utils.rs diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index e11db7061..11e7bcfde 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -44,7 +44,6 @@ use crate::utils::header_parsing::ParseHeaderError; use crate::utils::json::flatten::JsonFlattenError; use super::logstream::error::{CreateStreamError, StreamError}; -use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs}; use super::users::dashboards::DashboardError; use super::users::filters::FiltersError; @@ -70,7 +69,10 @@ pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result. 
- * - */ - -use arrow_schema::Field; -use chrono::{DateTime, NaiveDateTime, Utc}; -use itertools::Itertools; -use serde_json::Value; -use std::{collections::HashMap, sync::Arc}; - -use crate::{ - event::{ - format::{json, EventFormat, LogSource}, - Event, - }, - handlers::http::ingest::PostError, - kinesis::{flatten_kinesis_logs, Message}, - metadata::SchemaVersion, - parseable::{StreamNotFound, PARSEABLE}, - storage::StreamType, - utils::json::{convert_array_to_object, flatten::convert_to_array}, - LOCK_EXPECT, -}; - -pub async fn flatten_and_push_logs( - json: Value, - stream_name: &str, - log_source: &LogSource, -) -> Result<(), PostError> { - match log_source { - LogSource::Kinesis => { - let message: Message = serde_json::from_value(json)?; - let json = flatten_kinesis_logs(message); - for record in json { - push_logs(stream_name, record, &LogSource::default()).await?; - } - } - LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => { - return Err(PostError::OtelNotSupported); - } - _ => { - push_logs(stream_name, json, log_source).await?; - } - } - Ok(()) -} - -pub async fn push_logs( - stream_name: &str, - json: Value, - log_source: &LogSource, -) -> Result<(), PostError> { - let stream = PARSEABLE.get_stream(stream_name)?; - let time_partition = stream.get_time_partition(); - let time_partition_limit = PARSEABLE - .get_stream(stream_name)? - .get_time_partition_limit(); - let static_schema_flag = stream.get_static_schema_flag(); - let custom_partition = stream.get_custom_partition(); - let schema_version = stream.get_schema_version(); - - let data = if time_partition.is_some() || custom_partition.is_some() { - convert_array_to_object( - json, - time_partition.as_ref(), - time_partition_limit, - custom_partition.as_ref(), - schema_version, - log_source, - )? - } else { - vec![convert_to_array(convert_array_to_object( - json, - None, - None, - None, - schema_version, - log_source, - )?)?] - }; - - for value in data { - let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length - let parsed_timestamp = match time_partition.as_ref() { - Some(time_partition) => get_parsed_timestamp(&value, time_partition)?, - _ => Utc::now().naive_utc(), - }; - let custom_partition_values = match custom_partition.as_ref() { - Some(custom_partition) => { - let custom_partitions = custom_partition.split(',').collect_vec(); - get_custom_partition_values(&value, &custom_partitions) - } - None => HashMap::new(), - }; - let schema = PARSEABLE - .streams - .read() - .unwrap() - .get(stream_name) - .ok_or_else(|| StreamNotFound(stream_name.to_owned()))? 
- .metadata - .read() - .expect(LOCK_EXPECT) - .schema - .clone(); - let (rb, is_first_event) = into_event_batch( - value, - schema, - static_schema_flag, - time_partition.as_ref(), - schema_version, - )?; - - Event { - rb, - stream_name: stream_name.to_owned(), - origin_format: "json", - origin_size, - is_first_event, - parsed_timestamp, - time_partition: time_partition.clone(), - custom_partition_values, - stream_type: StreamType::UserDefined, - } - .process(&stream) - .await?; - } - Ok(()) -} - -pub fn into_event_batch( - data: Value, - schema: HashMap>, - static_schema_flag: bool, - time_partition: Option<&String>, - schema_version: SchemaVersion, -) -> Result<(arrow_array::RecordBatch, bool), PostError> { - let (rb, is_first) = json::Event { data }.into_recordbatch( - &schema, - static_schema_flag, - time_partition, - schema_version, - )?; - Ok((rb, is_first)) -} - -pub fn get_custom_partition_values( - json: &Value, - custom_partition_list: &[&str], -) -> HashMap { - let mut custom_partition_values: HashMap = HashMap::new(); - for custom_partition_field in custom_partition_list { - let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned(); - let custom_partition_value = match custom_partition_value { - e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), - Value::String(s) => s, - _ => "".to_string(), - }; - custom_partition_values.insert( - custom_partition_field.trim().to_string(), - custom_partition_value, - ); - } - custom_partition_values -} - -fn get_parsed_timestamp(json: &Value, time_partition: &str) -> Result { - let current_time = json - .get(time_partition) - .ok_or_else(|| PostError::MissingTimePartition(time_partition.to_string()))?; - let parsed_time: DateTime = serde_json::from_value(current_time.clone())?; - - Ok(parsed_time.naive_utc()) -} - -#[cfg(test)] -mod tests { - use std::str::FromStr; - - use serde_json::json; - - use super::*; - - #[test] - fn parse_time_parition_from_value() { - let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); - let parsed = get_parsed_timestamp(&json, "timestamp"); - - let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap(); - assert_eq!(parsed.unwrap(), expected); - } - - #[test] - fn time_parition_not_in_json() { - let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); - let parsed = get_parsed_timestamp(&json, "timestamp"); - - matches!(parsed, Err(PostError::MissingTimePartition(_))); - } - - #[test] - fn time_parition_not_parseable_as_datetime() { - let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); - let parsed = get_parsed_timestamp(&json, "timestamp"); - - matches!(parsed, Err(PostError::SerdeError(_))); - } -} diff --git a/src/handlers/http/modal/utils/mod.rs b/src/handlers/http/modal/utils/mod.rs index 61930d43d..1d0a3767b 100644 --- a/src/handlers/http/modal/utils/mod.rs +++ b/src/handlers/http/modal/utils/mod.rs @@ -16,6 +16,5 @@ * */ -pub mod ingest_utils; pub mod logstream_utils; pub mod rbac_utils; diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 563a4b5df..f31614c8f 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -29,7 +29,7 @@ use std::{ use arrow_array::RecordBatch; use arrow_ipc::writer::StreamWriter; use arrow_schema::{ArrowError, Field, Fields, Schema}; -use chrono::{NaiveDateTime, Timelike, Utc}; +use chrono::{DateTime, NaiveDateTime, Timelike, Utc}; use derive_more::{Deref, DerefMut}; use itertools::Itertools; use parquet::{ @@ -41,18 +41,27 @@ use parquet::{ }; use rand::distributions::DistString; use 
relative_path::RelativePathBuf; +use serde_json::Value; use tracing::{error, info, trace, warn}; use crate::{ cli::Options, - event::DEFAULT_TIMESTAMP_KEY, + event::{ + format::{json, EventFormat, LogSource}, + Event, DEFAULT_TIMESTAMP_KEY, + }, + handlers::http::ingest::PostError, + kinesis::{flatten_kinesis_logs, Message}, metadata::{LogStreamMetadata, SchemaVersion}, metrics, option::Mode, storage::{ object_storage::to_bytes, retention::Retention, StreamType, OBJECT_STORE_DATA_GRANULARITY, }, - utils::minute_to_slot, + utils::{ + json::{convert_array_to_object, flatten::convert_to_array}, + minute_to_slot, + }, LOCK_EXPECT, }; @@ -103,6 +112,94 @@ impl Stream { }) } + pub async fn flatten_and_push_logs( + &self, + json: Value, + log_source: &LogSource, + ) -> Result<(), PostError> { + match log_source { + LogSource::Kinesis => { + let message: Message = serde_json::from_value(json)?; + let json = flatten_kinesis_logs(message); + for record in json { + self.push_logs(record, &LogSource::default()).await?; + } + } + LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => { + return Err(PostError::OtelNotSupported); + } + _ => { + self.push_logs(json, log_source).await?; + } + } + Ok(()) + } + + pub async fn push_logs(&self, json: Value, log_source: &LogSource) -> Result<(), PostError> { + let time_partition = self.get_time_partition(); + let time_partition_limit = self.get_time_partition_limit(); + let static_schema_flag = self.get_static_schema_flag(); + let custom_partition = self.get_custom_partition(); + let schema_version = self.get_schema_version(); + + let data = if time_partition.is_some() || custom_partition.is_some() { + convert_array_to_object( + json, + time_partition.as_ref(), + time_partition_limit, + custom_partition.as_ref(), + schema_version, + log_source, + )? + } else { + vec![convert_to_array(convert_array_to_object( + json, + None, + None, + None, + schema_version, + log_source, + )?)?] + }; + + for value in data { + let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length + let parsed_timestamp = match time_partition.as_ref() { + Some(time_partition) => get_parsed_timestamp(&value, time_partition)?, + _ => Utc::now().naive_utc(), + }; + let custom_partition_values = match custom_partition.as_ref() { + Some(custom_partition) => { + let custom_partitions = custom_partition.split(',').collect_vec(); + get_custom_partition_values(&value, &custom_partitions) + } + None => HashMap::new(), + }; + let schema = self.metadata.read().expect(LOCK_EXPECT).schema.clone(); + let (rb, is_first_event) = json::Event { data: value }.into_recordbatch( + &schema, + static_schema_flag, + time_partition.as_ref(), + schema_version, + )?; + + Event { + rb, + stream_name: self.stream_name.to_owned(), + origin_format: "json", + origin_size, + is_first_event, + parsed_timestamp, + time_partition: time_partition.clone(), + custom_partition_values, + stream_type: StreamType::UserDefined, + } + .process(self) + .await?; + } + Ok(()) + } + // Concatenates record batches and puts them in memory store for each event. 
pub fn push( &self, @@ -753,18 +850,73 @@ impl Streams { } } +fn get_custom_partition_values( + json: &Value, + custom_partition_list: &[&str], +) -> HashMap { + let mut custom_partition_values: HashMap = HashMap::new(); + for custom_partition_field in custom_partition_list { + let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned(); + let custom_partition_value = match custom_partition_value { + e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(), + Value::String(s) => s, + _ => "".to_string(), + }; + custom_partition_values.insert( + custom_partition_field.trim().to_string(), + custom_partition_value, + ); + } + custom_partition_values +} + +fn get_parsed_timestamp(json: &Value, time_partition: &str) -> Result { + let current_time = json + .get(time_partition) + .ok_or_else(|| PostError::MissingTimePartition(time_partition.to_string()))?; + let parsed_time: DateTime = serde_json::from_value(current_time.clone())?; + + Ok(parsed_time.naive_utc()) +} + #[cfg(test)] mod tests { - use std::time::Duration; + use std::{str::FromStr, time::Duration}; use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray}; use arrow_schema::{DataType, Field, TimeUnit}; use chrono::{NaiveDate, TimeDelta}; + use serde_json::json; use temp_dir::TempDir; use tokio::time::sleep; use super::*; + #[test] + fn parse_time_parition_from_value() { + let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); + let parsed = get_parsed_timestamp(&json, "timestamp"); + + let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap(); + assert_eq!(parsed.unwrap(), expected); + } + + #[test] + fn time_parition_not_in_json() { + let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); + let parsed = get_parsed_timestamp(&json, "timestamp"); + + matches!(parsed, Err(PostError::MissingTimePartition(_))); + } + + #[test] + fn time_parition_not_parseable_as_datetime() { + let json = json!({"timestamp": "2025-05-15T15:30:00Z"}); + let parsed = get_parsed_timestamp(&json, "timestamp"); + + matches!(parsed, Err(PostError::SerdeError(_))); + } + #[test] fn test_staging_new_with_valid_stream() { let stream_name = "test_stream";
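
A note on the resulting API (not part of the patch series itself): taken together, these commits resolve the target `Stream` once and thread that handle through flattening, schema commits and event processing, instead of re-locking the global `PARSEABLE` stream map at each step. Below is a minimal sketch of the call shape this enables, assuming the plumbing shown in the diffs above; the function name and sample payload are illustrative only.

    use serde_json::json;

    use crate::{event::format::LogSource, handlers::http::ingest::PostError, parseable::PARSEABLE};

    // Hypothetical caller: resolve the stream handle once, then let it own
    // flattening, schema commit and staging writes via flatten_and_push_logs
    // (added to `Stream` in PATCH 5/5).
    async fn ingest_example() -> Result<(), PostError> {
        let stream = PARSEABLE.get_or_create_stream("demo");
        stream
            .flatten_and_push_logs(json!({"level": "info", "msg": "hello"}), &LogSource::default())
            .await
    }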