diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index dab4f84df..42de1cb4f 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -24,9 +24,6 @@ use arrow_array::RecordBatch; use bytes::Bytes; use chrono::Utc; use http::StatusCode; -use opentelemetry_proto::tonic::logs::v1::LogsData; -use opentelemetry_proto::tonic::metrics::v1::MetricsData; -use opentelemetry_proto::tonic::trace::v1::TracesData; use serde_json::Value; use crate::event::error::EventError; @@ -34,9 +31,6 @@ use crate::event::format::{self, EventFormat, LogSource}; use crate::handlers::{LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY}; use crate::metadata::SchemaVersion; use crate::option::Mode; -use crate::otel::logs::flatten_otel_logs; -use crate::otel::metrics::flatten_otel_metrics; -use crate::otel::traces::flatten_otel_traces; use crate::parseable::{StreamNotFound, PARSEABLE}; use crate::storage::{ObjectStorageError, StreamType}; use crate::utils::header_parsing::ParseHeaderError; @@ -44,7 +38,7 @@ use crate::utils::json::flatten::JsonFlattenError; use crate::{event, LOCK_EXPECT}; use super::logstream::error::{CreateStreamError, StreamError}; -use super::modal::utils::ingest_utils::{flatten_and_push_logs, push_logs}; +use super::modal::utils::ingest_utils::flatten_and_push_logs; use super::users::dashboards::DashboardError; use super::users::filters::FiltersError; @@ -70,6 +64,14 @@ pub async fn ingest(req: HttpRequest, Json(json): Json) -> Result Result<(), PostError> { match log_source { LogSource::Kinesis => { + //custom flattening required for Amazon Kinesis let message: Message = serde_json::from_value(json)?; - let json = flatten_kinesis_logs(message); - for record in json { + for record in flatten_kinesis_logs(message) { push_logs(stream_name, record, &LogSource::default()).await?; } } - LogSource::OtelLogs | LogSource::OtelMetrics | LogSource::OtelTraces => { - return Err(PostError::OtelNotSupported); + LogSource::OtelLogs => { + //custom flattening required for otel logs + let logs: LogsData = serde_json::from_value(json)?; + for record in flatten_otel_logs(&logs) { + push_logs(stream_name, record, log_source).await?; + } + } + LogSource::OtelTraces => { + //custom flattening required for otel traces + let traces: TracesData = serde_json::from_value(json)?; + for record in flatten_otel_traces(&traces) { + push_logs(stream_name, record, log_source).await?; + } + } + LogSource::OtelMetrics => { + //custom flattening required for otel metrics + let metrics: MetricsData = serde_json::from_value(json)?; + for record in flatten_otel_metrics(metrics) { + push_logs(stream_name, record, log_source).await?; + } } _ => { push_logs(stream_name, json, log_source).await?; @@ -61,7 +83,7 @@ pub async fn flatten_and_push_logs( Ok(()) } -pub async fn push_logs( +async fn push_logs( stream_name: &str, json: Value, log_source: &LogSource,