diff --git a/src/connectors/kafka/processor.rs b/src/connectors/kafka/processor.rs
index 191396660..15bb47c11 100644
--- a/src/connectors/kafka/processor.rs
+++ b/src/connectors/kafka/processor.rs
@@ -42,10 +42,7 @@ use super::{config::BufferConfig, ConsumerRecord, StreamConsumer, TopicPartition
 pub struct ParseableSinkProcessor;
 
 impl ParseableSinkProcessor {
-    async fn build_event_from_chunk(
-        &self,
-        records: &[ConsumerRecord],
-    ) -> anyhow::Result<ParseableEvent> {
+    async fn process_event_from_chunk(&self, records: &[ConsumerRecord]) -> anyhow::Result<()> {
         let stream_name = records
             .first()
             .map(|r| r.topic.as_str())
@@ -73,7 +70,7 @@ impl ParseableSinkProcessor {
             schema_version,
         )?;
 
-        let p_event = ParseableEvent {
+        ParseableEvent {
             rb,
             stream_name: stream_name.to_string(),
             origin_format: "json",
@@ -83,9 +80,10 @@ impl ParseableSinkProcessor {
             time_partition: None,
             custom_partition_values: HashMap::new(),
             stream_type: StreamType::UserDefined,
-        };
+        }
+        .process(&stream)?;
 
-        Ok(p_event)
+        Ok(())
     }
 
     fn json_vec(records: &[ConsumerRecord]) -> (Vec<Value>, u64) {
@@ -109,7 +107,7 @@ impl Processor<Vec<ConsumerRecord>, ()> for ParseableSinkProcessor {
         let len = records.len();
         debug!("Processing {} records", len);
 
-        self.build_event_from_chunk(&records).await?.process()?;
+        self.process_event_from_chunk(&records).await?;
 
         debug!("Processed {} records", len);
         Ok(())
diff --git a/src/event/mod.rs b/src/event/mod.rs
index 42b6514f9..c503f80a1 100644
--- a/src/event/mod.rs
+++ b/src/event/mod.rs
@@ -17,17 +17,17 @@
  *
  */
 
-pub mod format;
+use std::{collections::HashMap, sync::Arc};
 
 use arrow_array::RecordBatch;
-use arrow_schema::{Field, Fields, Schema};
+use arrow_schema::Field;
+use chrono::NaiveDateTime;
+use error::EventError;
 use itertools::Itertools;
-use std::sync::Arc;
 
-use self::error::EventError;
-use crate::{metadata::update_stats, parseable::PARSEABLE, storage::StreamType, LOCK_EXPECT};
-use chrono::NaiveDateTime;
-use std::collections::HashMap;
+use crate::{metadata::update_stats, parseable::Stream, storage::StreamType};
+
+pub mod format;
 
 pub const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";
 
@@ -46,7 +46,7 @@ pub struct Event {
 
 // Events holds the schema related to a each event for a single log stream
 impl Event {
-    pub fn process(self) -> Result<(), EventError> {
+    pub fn process(self, stream: &Stream) -> Result<(), EventError> {
         let mut key = get_schema_key(&self.rb.schema().fields);
         if self.time_partition.is_some() {
             let parsed_timestamp_to_min = self.parsed_timestamp.format("%Y%m%dT%H%M").to_string();
@@ -60,10 +60,10 @@ impl Event {
         }
 
         if self.is_first_event {
-            commit_schema(&self.stream_name, self.rb.schema())?;
+            stream.commit_schema(self.rb.schema())?;
         }
 
-        PARSEABLE.get_or_create_stream(&self.stream_name).push(
+        stream.push(
             &key,
             &self.rb,
             self.parsed_timestamp,
@@ -84,10 +84,10 @@ impl Event {
         Ok(())
     }
 
-    pub fn process_unchecked(&self) -> Result<(), EventError> {
+    pub fn process_unchecked(&self, stream: &Stream) -> Result<(), EventError> {
         let key = get_schema_key(&self.rb.schema().fields);
 
-        PARSEABLE.get_or_create_stream(&self.stream_name).push(
+        stream.push(
             &key,
             &self.rb,
             self.parsed_timestamp,
@@ -109,23 +109,6 @@ pub fn get_schema_key(fields: &[Arc<Field>]) -> String {
     format!("{hash:x}")
 }
 
-pub fn commit_schema(stream_name: &str, schema: Arc<Schema>) -> Result<(), EventError> {
-    let mut stream_metadata = PARSEABLE.streams.write().expect("lock poisoned");
-
-    let map = &mut stream_metadata
-        .get_mut(stream_name)
-        .expect("map has entry for this stream name")
-        .metadata
-        .write()
-        .expect(LOCK_EXPECT)
-        .schema;
-    let current_schema = Schema::new(map.values().cloned().collect::<Fields>());
-    let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
-    map.clear();
-    map.extend(schema.fields.iter().map(|f| (f.name().clone(), f.clone())));
-    Ok(())
-}
-
 pub mod error {
     use arrow_schema::ArrowError;
 
diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs
index 6706470a2..5990b2d34 100644
--- a/src/handlers/http/ingest.rs
+++ b/src/handlers/http/ingest.rs
@@ -26,8 +26,9 @@ use chrono::Utc;
 use http::StatusCode;
 use serde_json::Value;
 
+use crate::event;
 use crate::event::error::EventError;
-use crate::event::format::{self, EventFormat, LogSource};
+use crate::event::format::{json, EventFormat, LogSource};
 use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY};
 use crate::metadata::SchemaVersion;
 use crate::option::Mode;
@@ -35,10 +36,8 @@ use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::storage::{ObjectStorageError, StreamType};
 use crate::utils::header_parsing::ParseHeaderError;
 use crate::utils::json::flatten::JsonFlattenError;
-use crate::{event, LOCK_EXPECT};
 
 use super::logstream::error::{CreateStreamError, StreamError};
-use super::modal::utils::ingest_utils::flatten_and_push_logs;
 use super::users::dashboards::DashboardError;
 use super::users::filters::FiltersError;
 
@@ -72,7 +71,10 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
         .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, log_source.clone())
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    PARSEABLE
+        .get_stream(&stream_name)?
+        .flatten_and_push_logs(json, &log_source)
+        .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -80,21 +82,12 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
 pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<(), PostError> {
     let size: usize = body.len();
     let parsed_timestamp = Utc::now().naive_utc();
-    let (rb, is_first) = {
-        let body_val: Value = serde_json::from_slice(&body)?;
-        let hash_map = PARSEABLE.streams.read().unwrap();
-        let schema = hash_map
-            .get(&stream_name)
-            .ok_or_else(|| StreamNotFound(stream_name.clone()))?
-            .metadata
-            .read()
-            .expect(LOCK_EXPECT)
-            .schema
-            .clone();
-        let event = format::json::Event { data: body_val };
-        // For internal streams, use old schema
-        event.into_recordbatch(&schema, false, None, SchemaVersion::V0)?
-    };
+    let stream = PARSEABLE.get_stream(&stream_name)?;
+    let body_val: Value = serde_json::from_slice(&body)?;
+    let schema = stream.get_schema_raw();
+    let event = json::Event { data: body_val };
+    // For internal streams, use old schema
+    let (rb, is_first) = event.into_recordbatch(&schema, false, None, SchemaVersion::V0)?;
     event::Event {
         rb,
         stream_name,
@@ -106,7 +99,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
         custom_partition_values: HashMap::new(),
         stream_type: StreamType::Internal,
     }
-    .process()?;
+    .process(&stream)?;
 
     Ok(())
 }
@@ -135,7 +128,10 @@ pub async fn handle_otel_logs_ingestion(
         .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelLogs)
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    PARSEABLE
+        .get_stream(&stream_name)?
+        .flatten_and_push_logs(json, &log_source)
+        .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -166,7 +162,10 @@ pub async fn handle_otel_metrics_ingestion(
         )
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    PARSEABLE
+        .get_stream(&stream_name)?
+        .flatten_and_push_logs(json, &log_source)
+        .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -194,7 +193,10 @@
         .create_stream_if_not_exists(&stream_name, StreamType::UserDefined, LogSource::OtelTraces)
         .await?;
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    PARSEABLE
+        .get_stream(&stream_name)?
+        .flatten_and_push_logs(json, &log_source)
+        .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -243,7 +245,10 @@ pub async fn post_event(
         return Err(PostError::OtelNotSupported);
     }
 
-    flatten_and_push_logs(json, &stream_name, &log_source).await?;
+    PARSEABLE
+        .get_stream(&stream_name)?
+        .flatten_and_push_logs(json, &log_source)
+        .await?;
 
     Ok(HttpResponse::Ok().finish())
 }
@@ -252,6 +257,7 @@ pub async fn push_logs_unchecked(
     batches: RecordBatch,
     stream_name: &str,
 ) -> Result<event::Event, PostError> {
+    let stream = PARSEABLE.get_stream(stream_name)?;
     let unchecked_event = event::Event {
         rb: batches,
         stream_name: stream_name.to_string(),
@@ -263,7 +269,7 @@
         custom_partition_values: HashMap::new(), // should be an empty map for unchecked push
         stream_type: StreamType::UserDefined,
     };
-    unchecked_event.process_unchecked()?;
+    unchecked_event.process_unchecked(&stream)?;
 
     Ok(unchecked_event)
 }
@@ -355,7 +361,7 @@ mod tests {
     use std::{collections::HashMap, sync::Arc};
 
     use crate::{
-        handlers::http::modal::utils::ingest_utils::into_event_batch,
+        event::format::{json, EventFormat},
        metadata::SchemaVersion,
         utils::json::{convert_array_to_object, flatten::convert_to_array},
     };
@@ -392,8 +398,9 @@ mod tests {
             "b": "hello",
         });
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event { data: json }
+            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 4);
@@ -419,8 +426,9 @@ mod tests {
             "c": null
         });
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event { data: json }
+            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -450,7 +458,9 @@ mod tests {
             .into_iter(),
         );
 
-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event { data: json }
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 3);
@@ -480,7 +490,9 @@ mod tests {
             .into_iter(),
         );
 
-        assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
+        assert!(json::Event { data: json }
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .is_err());
     }
 
     #[test]
@@ -496,7 +508,9 @@ mod tests {
             .into_iter(),
         );
 
-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event { data: json }
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 1);
         assert_eq!(rb.num_columns(), 1);
@@ -535,8 +549,9 @@ mod tests {
             },
         ]);
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event { data: json }
+            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -582,8 +597,9 @@ mod tests {
             },
         ]);
 
-        let (rb, _) =
-            into_event_batch(json, HashMap::default(), false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event { data: json }
+            .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -630,7 +646,9 @@ mod tests {
             .into_iter(),
         );
 
-        let (rb, _) = into_event_batch(json, schema, false, None, SchemaVersion::V0).unwrap();
+        let (rb, _) = json::Event { data: json }
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .unwrap();
 
         assert_eq!(rb.num_rows(), 3);
         assert_eq!(rb.num_columns(), 4);
@@ -677,7 +695,9 @@ mod tests {
             .into_iter(),
        );
 
-        assert!(into_event_batch(json, schema, false, None, SchemaVersion::V0,).is_err());
+        assert!(json::Event { data: json }
+            .into_recordbatch(&schema, false, None, SchemaVersion::V0)
+            .is_err());
     }
 
     #[test]
@@ -715,13 +735,10 @@ mod tests {
         )
         .unwrap();
 
-        let (rb, _) = into_event_batch(
-            flattened_json,
-            HashMap::default(),
-            false,
-            None,
-            SchemaVersion::V0,
-        )
+        let (rb, _) = json::Event {
+            data: flattened_json,
+        }
+        .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V0)
         .unwrap();
 
         assert_eq!(rb.num_rows(), 4);
         assert_eq!(rb.num_columns(), 5);
@@ -803,13 +820,10 @@ mod tests {
         )
         .unwrap();
 
-        let (rb, _) = into_event_batch(
-            flattened_json,
-            HashMap::default(),
-            false,
-            None,
-            SchemaVersion::V1,
-        )
+        let (rb, _) = json::Event {
+            data: flattened_json,
+        }
+        .into_recordbatch(&HashMap::default(), false, None, SchemaVersion::V1)
         .unwrap();
 
         assert_eq!(rb.num_rows(), 4);
 
diff --git a/src/handlers/http/mod.rs b/src/handlers/http/mod.rs
index f1f702d4b..4bdf85adf 100644
--- a/src/handlers/http/mod.rs
+++ b/src/handlers/http/mod.rs
@@ -34,7 +34,6 @@ pub mod cluster;
 pub mod correlation;
 pub mod health_check;
 pub mod ingest;
-mod kinesis;
 pub mod llm;
 pub mod logstream;
 pub mod middleware;
diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs
deleted file mode 100644
index 005b38a91..000000000
--- a/src/handlers/http/modal/utils/ingest_utils.rs
+++ /dev/null
@@ -1,245 +0,0 @@
-/*
- * Parseable Server (C) 2022 - 2024 Parseable, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation, either version 3 of the
- * License, or (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- *
- */
-
-use arrow_schema::Field;
-use chrono::{DateTime, NaiveDateTime, Utc};
-use itertools::Itertools;
-use opentelemetry_proto::tonic::{
-    logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
-};
-use serde_json::Value;
-use std::{collections::HashMap, sync::Arc};
-
-use crate::{
-    event::{
-        format::{json, EventFormat, LogSource},
-        Event,
-    },
-    handlers::http::{
-        ingest::PostError,
-        kinesis::{flatten_kinesis_logs, Message},
-    },
-    metadata::SchemaVersion,
-    otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
-    parseable::{StreamNotFound, PARSEABLE},
-    storage::StreamType,
-    utils::json::{convert_array_to_object, flatten::convert_to_array},
-    LOCK_EXPECT,
-};
-
-pub async fn flatten_and_push_logs(
-    json: Value,
-    stream_name: &str,
-    log_source: &LogSource,
-) -> Result<(), PostError> {
-    match log_source {
-        LogSource::Kinesis => {
-            //custom flattening required for Amazon Kinesis
-            let message: Message = serde_json::from_value(json)?;
-            for record in flatten_kinesis_logs(message) {
-                push_logs(stream_name, record, &LogSource::default()).await?;
-            }
-        }
-        LogSource::OtelLogs => {
-            //custom flattening required for otel logs
-            let logs: LogsData = serde_json::from_value(json)?;
-            for record in flatten_otel_logs(&logs) {
-                push_logs(stream_name, record, log_source).await?;
-            }
-        }
-        LogSource::OtelTraces => {
-            //custom flattening required for otel traces
-            let traces: TracesData = serde_json::from_value(json)?;
-            for record in flatten_otel_traces(&traces) {
-                push_logs(stream_name, record, log_source).await?;
-            }
-        }
-        LogSource::OtelMetrics => {
-            //custom flattening required for otel metrics
-            let metrics: MetricsData = serde_json::from_value(json)?;
-            for record in flatten_otel_metrics(metrics) {
-                push_logs(stream_name, record, log_source).await?;
-            }
-        }
-        _ => {
-            push_logs(stream_name, json, log_source).await?;
-        }
-    }
-    Ok(())
-}
-
-async fn push_logs(
-    stream_name: &str,
-    json: Value,
-    log_source: &LogSource,
-) -> Result<(), PostError> {
-    let stream = PARSEABLE.get_stream(stream_name)?;
-    let time_partition = stream.get_time_partition();
-    let time_partition_limit = PARSEABLE
-        .get_stream(stream_name)?
-        .get_time_partition_limit();
-    let static_schema_flag = stream.get_static_schema_flag();
-    let custom_partition = stream.get_custom_partition();
-    let schema_version = stream.get_schema_version();
-
-    let data = if time_partition.is_some() || custom_partition.is_some() {
-        convert_array_to_object(
-            json,
-            time_partition.as_ref(),
-            time_partition_limit,
-            custom_partition.as_ref(),
-            schema_version,
-            log_source,
-        )?
-    } else {
-        vec![convert_to_array(convert_array_to_object(
-            json,
-            None,
-            None,
-            None,
-            schema_version,
-            log_source,
-        )?)?]
-    };
-
-    for value in data {
-        let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length
-        let parsed_timestamp = match time_partition.as_ref() {
-            Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
-            _ => Utc::now().naive_utc(),
-        };
-        let custom_partition_values = match custom_partition.as_ref() {
-            Some(custom_partition) => {
-                let custom_partitions = custom_partition.split(',').collect_vec();
-                get_custom_partition_values(&value, &custom_partitions)
-            }
-            None => HashMap::new(),
-        };
-        let schema = PARSEABLE
-            .streams
-            .read()
-            .unwrap()
-            .get(stream_name)
-            .ok_or_else(|| StreamNotFound(stream_name.to_owned()))?
-            .metadata
-            .read()
-            .expect(LOCK_EXPECT)
-            .schema
-            .clone();
-        let (rb, is_first_event) = into_event_batch(
-            value,
-            schema,
-            static_schema_flag,
-            time_partition.as_ref(),
-            schema_version,
-        )?;
-
-        Event {
-            rb,
-            stream_name: stream_name.to_owned(),
-            origin_format: "json",
-            origin_size,
-            is_first_event,
-            parsed_timestamp,
-            time_partition: time_partition.clone(),
-            custom_partition_values,
-            stream_type: StreamType::UserDefined,
-        }
-        .process()?;
-    }
-    Ok(())
-}
-
-pub fn into_event_batch(
-    data: Value,
-    schema: HashMap<String, Arc<Field>>,
-    static_schema_flag: bool,
-    time_partition: Option<&String>,
-    schema_version: SchemaVersion,
-) -> Result<(arrow_array::RecordBatch, bool), PostError> {
-    let (rb, is_first) = json::Event { data }.into_recordbatch(
-        &schema,
-        static_schema_flag,
-        time_partition,
-        schema_version,
-    )?;
-    Ok((rb, is_first))
-}
-
-pub fn get_custom_partition_values(
-    json: &Value,
-    custom_partition_list: &[&str],
-) -> HashMap<String, String> {
-    let mut custom_partition_values: HashMap<String, String> = HashMap::new();
-    for custom_partition_field in custom_partition_list {
-        let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
-        let custom_partition_value = match custom_partition_value {
-            e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
-            Value::String(s) => s,
-            _ => "".to_string(),
-        };
-        custom_partition_values.insert(
-            custom_partition_field.trim().to_string(),
-            custom_partition_value,
-        );
-    }
-    custom_partition_values
-}
-
-fn get_parsed_timestamp(json: &Value, time_partition: &str) -> Result<NaiveDateTime, PostError> {
-    let current_time = json
-        .get(time_partition)
-        .ok_or_else(|| PostError::MissingTimePartition(time_partition.to_string()))?;
-    let parsed_time: DateTime<Utc> = serde_json::from_value(current_time.clone())?;
-
-    Ok(parsed_time.naive_utc())
-}
-
-#[cfg(test)]
-mod tests {
-    use std::str::FromStr;
-
-    use serde_json::json;
-
-    use super::*;
-
-    #[test]
-    fn parse_time_parition_from_value() {
-        let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
-        let parsed = get_parsed_timestamp(&json, "timestamp");
-
-        let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap();
-        assert_eq!(parsed.unwrap(), expected);
-    }
-
-    #[test]
-    fn time_parition_not_in_json() {
-        let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
-        let parsed = get_parsed_timestamp(&json, "timestamp");
-
-        matches!(parsed, Err(PostError::MissingTimePartition(_)));
-    }
-
-    #[test]
-    fn time_parition_not_parseable_as_datetime() {
-        let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
-        let parsed = get_parsed_timestamp(&json, "timestamp");
-
-        matches!(parsed, Err(PostError::SerdeError(_)));
-    }
-}
diff --git a/src/handlers/http/modal/utils/mod.rs b/src/handlers/http/modal/utils/mod.rs
index 61930d43d..1d0a3767b 100644
--- a/src/handlers/http/modal/utils/mod.rs
+++ b/src/handlers/http/modal/utils/mod.rs
@@ -16,6 +16,5 @@
  *
  */
 
-pub mod ingest_utils;
 pub mod logstream_utils;
 pub mod rbac_utils;
diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs
index 9e5d405b0..09d6bd088 100644
--- a/src/handlers/http/query.rs
+++ b/src/handlers/http/query.rs
@@ -19,6 +19,7 @@
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
 use actix_web::{FromRequest, HttpRequest, HttpResponse, Responder};
+use arrow_schema::ArrowError;
 use chrono::{DateTime, Utc};
 use datafusion::common::tree_node::TreeNode;
 use datafusion::error::DataFusionError;
@@ -36,10 +37,9 @@ use tracing::error;
 
 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
-use crate::event::commit_schema;
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::option::Mode;
-use crate::parseable::PARSEABLE;
+use crate::parseable::{StreamNotFound, PARSEABLE};
 use crate::query::error::ExecuteError;
 use crate::query::{CountsRequest, CountsResponse, Query as LogicalQuery};
 use crate::query::{TableScanVisitor, QUERY_SESSION};
@@ -174,7 +174,9 @@ pub async fn update_schema_when_distributed(tables: &Vec<String>) -> Result<(),
                 // commit schema merges the schema internally and updates the schema in storage.
                 commit_schema_to_storage(table, new_schema.clone()).await?;
 
-                commit_schema(table, Arc::new(new_schema))?;
+                PARSEABLE
+                    .get_stream(table)?
+                    .commit_schema(Arc::new(new_schema))?;
             }
         }
     }
@@ -318,6 +320,10 @@ Description: {0}"#
     ActixError(#[from] actix_web::Error),
     #[error("Error: {0}")]
     Anyhow(#[from] anyhow::Error),
+    #[error("Missing stream {0}")]
+    StreamNotFound(#[from] StreamNotFound),
+    #[error("Arrow error: {0}")]
+    Arrow(#[from] ArrowError),
 }
 
 impl actix_web::ResponseError for QueryError {
diff --git a/src/handlers/http/kinesis.rs b/src/kinesis.rs
similarity index 100%
rename from src/handlers/http/kinesis.rs
rename to src/kinesis.rs
diff --git a/src/lib.rs b/src/lib.rs
index 91fa0a405..fa28f7051 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -29,6 +29,7 @@ pub mod correlation;
 mod event;
 pub mod handlers;
 pub mod hottier;
+mod kinesis;
 mod livetail;
 mod metadata;
 pub mod metrics;
diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs
index 2fba1d5b5..7f6e4fbf0 100644
--- a/src/parseable/mod.rs
+++ b/src/parseable/mod.rs
@@ -28,7 +28,7 @@ use http::{header::CONTENT_TYPE, HeaderName, HeaderValue, StatusCode};
 use once_cell::sync::Lazy;
 pub use staging::StagingError;
 use streams::StreamRef;
-pub use streams::{StreamNotFound, Streams};
+pub use streams::{Stream, StreamNotFound, Streams};
 use tracing::error;
 
 #[cfg(feature = "kafka")]
diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 7d8decdca..2ee4a9da4 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -28,10 +28,13 @@ use std::{
 
 use arrow_array::RecordBatch;
 use arrow_ipc::writer::StreamWriter;
-use arrow_schema::{Field, Fields, Schema};
-use chrono::{NaiveDateTime, Timelike, Utc};
+use arrow_schema::{ArrowError, Field, Fields, Schema};
+use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
+use opentelemetry_proto::tonic::{
+    logs::v1::LogsData, metrics::v1::MetricsData, trace::v1::TracesData,
+};
 use parquet::{
     arrow::ArrowWriter,
     basic::Encoding,
@@ -41,18 +44,28 @@ use parquet::{
 };
 use rand::distributions::DistString;
 use relative_path::RelativePathBuf;
+use serde_json::Value;
 use tracing::{error, info, trace, warn};
 
 use crate::{
     cli::Options,
-    event::DEFAULT_TIMESTAMP_KEY,
+    event::{
+        format::{json, EventFormat, LogSource},
+        Event, DEFAULT_TIMESTAMP_KEY,
+    },
+    handlers::http::ingest::PostError,
+    kinesis::{flatten_kinesis_logs, Message},
     metadata::{LogStreamMetadata, SchemaVersion},
     metrics,
     option::Mode,
+    otel::{logs::flatten_otel_logs, metrics::flatten_otel_metrics, traces::flatten_otel_traces},
     storage::{
         object_storage::to_bytes, retention::Retention, StreamType, OBJECT_STORE_DATA_GRANULARITY,
     },
-    utils::minute_to_slot,
+    utils::{
+        json::{convert_array_to_object, flatten::convert_to_array},
+        minute_to_slot,
+    },
     LOCK_EXPECT,
 };
 
@@ -103,6 +116,112 @@ impl Stream {
         })
     }
 
+    pub async fn flatten_and_push_logs(
+        &self,
+        json: Value,
+        log_source: &LogSource,
+    ) -> Result<(), PostError> {
+        match log_source {
+            LogSource::Kinesis => {
+                //custom flattening required for Amazon Kinesis
+                let message: Message = serde_json::from_value(json)?;
+                for record in flatten_kinesis_logs(message) {
+                    self.push_logs(record, &LogSource::default()).await?;
+                }
+            }
+            LogSource::OtelLogs => {
+                //custom flattening required for otel logs
+                let logs: LogsData = serde_json::from_value(json)?;
+                for record in flatten_otel_logs(&logs) {
+                    self.push_logs(record, log_source).await?;
+                }
+            }
+            LogSource::OtelTraces => {
+                //custom flattening required for otel traces
+                let traces: TracesData = serde_json::from_value(json)?;
+                for record in flatten_otel_traces(&traces) {
+                    self.push_logs(record, log_source).await?;
+                }
+            }
+            LogSource::OtelMetrics => {
+                //custom flattening required for otel metrics
+                let metrics: MetricsData = serde_json::from_value(json)?;
+                for record in flatten_otel_metrics(metrics) {
+                    self.push_logs(record, log_source).await?;
+                }
+            }
+            _ => {
+                self.push_logs(json, log_source).await?;
+            }
+        }
+        Ok(())
+    }
+
+    pub async fn push_logs(&self, json: Value, log_source: &LogSource) -> Result<(), PostError> {
+        let time_partition = self.get_time_partition();
+        let time_partition_limit = self.get_time_partition_limit();
+        let static_schema_flag = self.get_static_schema_flag();
+        let custom_partition = self.get_custom_partition();
+        let schema_version = self.get_schema_version();
+
+        let data = if time_partition.is_some() || custom_partition.is_some() {
+            convert_array_to_object(
+                json,
+                time_partition.as_ref(),
+                time_partition_limit,
+                custom_partition.as_ref(),
+                schema_version,
+                log_source,
+            )?
+        } else {
+            vec![convert_to_array(convert_array_to_object(
+                json,
+                None,
+                None,
+                None,
+                schema_version,
+                log_source,
+            )?)?]
+        };
+
+        for value in data {
+            let origin_size = serde_json::to_vec(&value).unwrap().len() as u64; // string length need not be the same as byte length
+            let parsed_timestamp = match time_partition.as_ref() {
+                Some(time_partition) => get_parsed_timestamp(&value, time_partition)?,
+                _ => Utc::now().naive_utc(),
+            };
+            let custom_partition_values = match custom_partition.as_ref() {
+                Some(custom_partition) => {
+                    let custom_partitions = custom_partition.split(',').collect_vec();
+                    get_custom_partition_values(&value, &custom_partitions)
+                }
+                None => HashMap::new(),
+            };
+            let schema = self.metadata.read().expect(LOCK_EXPECT).schema.clone();
+            let (rb, is_first_event) = json::Event { data: value }.into_recordbatch(
+                &schema,
+                static_schema_flag,
+                time_partition.as_ref(),
+                schema_version,
+            )?;
+
+            Event {
+                rb,
+                stream_name: self.stream_name.to_owned(),
+                origin_format: "json",
+                origin_size,
+                is_first_event,
+                parsed_timestamp,
+                time_partition: time_partition.clone(),
+                custom_partition_values,
+                stream_type: StreamType::UserDefined,
+            }
+            .process(self)?;
+        }
+
+        Ok(())
+    }
+
     // Concatenates record batches and puts them in memory store for each event.
     pub fn push(
         &self,
@@ -653,6 +772,22 @@ impl Stream {
     pub fn get_stream_type(&self) -> StreamType {
         self.metadata.read().expect(LOCK_EXPECT).stream_type
     }
+
+    pub fn commit_schema(&self, schema: Arc<Schema>) -> Result<(), ArrowError> {
+        let map = &mut self.metadata.write().expect(LOCK_EXPECT).schema;
+        // Construct updated schema
+        let current_schema = Schema::new(map.values().cloned().collect::<Fields>());
+        let schema = Schema::try_merge(vec![current_schema, schema.as_ref().clone()])?;
+        let updated_schema: HashMap<String, Arc<Field>> = schema
+            .fields
+            .iter()
+            .map(|f| (f.name().clone(), f.clone()))
+            .collect();
+        // Update schema
+        _ = std::mem::replace(map, updated_schema);
+
+        Ok(())
+    }
 }
 
 #[derive(Deref, DerefMut, Default)]
@@ -737,18 +872,73 @@ impl Streams {
         }
     }
 }
 
+fn get_custom_partition_values(
+    json: &Value,
+    custom_partition_list: &[&str],
+) -> HashMap<String, String> {
+    let mut custom_partition_values: HashMap<String, String> = HashMap::new();
+    for custom_partition_field in custom_partition_list {
+        let custom_partition_value = json.get(custom_partition_field.trim()).unwrap().to_owned();
+        let custom_partition_value = match custom_partition_value {
+            e @ Value::Number(_) | e @ Value::Bool(_) => e.to_string(),
+            Value::String(s) => s,
+            _ => "".to_string(),
+        };
+        custom_partition_values.insert(
+            custom_partition_field.trim().to_string(),
+            custom_partition_value,
+        );
+    }
+    custom_partition_values
+}
+
+fn get_parsed_timestamp(json: &Value, time_partition: &str) -> Result<NaiveDateTime, PostError> {
+    let current_time = json
+        .get(time_partition)
+        .ok_or_else(|| PostError::MissingTimePartition(time_partition.to_string()))?;
+    let parsed_time: DateTime<Utc> = serde_json::from_value(current_time.clone())?;
+
+    Ok(parsed_time.naive_utc())
+}
+
 #[cfg(test)]
 mod tests {
-    use std::time::Duration;
+    use std::{str::FromStr, time::Duration};
 
     use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray};
     use arrow_schema::{DataType, Field, TimeUnit};
     use chrono::{NaiveDate, TimeDelta};
+    use serde_json::json;
     use temp_dir::TempDir;
     use tokio::time::sleep;
 
     use super::*;
 
+    #[test]
+    fn parse_time_parition_from_value() {
+        let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
+        let parsed = get_parsed_timestamp(&json, "timestamp");
+
+        let expected = NaiveDateTime::from_str("2025-05-15T15:30:00").unwrap();
+        assert_eq!(parsed.unwrap(), expected);
+    }
+
+    #[test]
+    fn time_parition_not_in_json() {
+        let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
+        let parsed = get_parsed_timestamp(&json, "timestamp");
+
+        matches!(parsed, Err(PostError::MissingTimePartition(_)));
+    }
+
+    #[test]
+    fn time_parition_not_parseable_as_datetime() {
+        let json = json!({"timestamp": "2025-05-15T15:30:00Z"});
+        let parsed = get_parsed_timestamp(&json, "timestamp");
+
+        matches!(parsed, Err(PostError::SerdeError(_)));
+    }
+
     #[test]
     fn test_staging_new_with_valid_stream() {
         let stream_name = "test_stream";