@@ -21,16 +21,9 @@ use anyhow::anyhow;
2121use arrow_schema:: Field ;
2222use bytes:: Bytes ;
2323use chrono:: { DateTime , NaiveDateTime , Utc } ;
24- use nom:: AsBytes ;
25- use opentelemetry_proto:: tonic:: {
26- logs:: v1:: LogsData , metrics:: v1:: MetricsData , trace:: v1:: TracesData ,
27- } ;
2824use itertools:: Itertools ;
2925use serde_json:: Value ;
30- use std:: {
31- collections:: { BTreeMap , HashMap } ,
32- sync:: Arc ,
33- } ;
26+ use std:: { collections:: HashMap , sync:: Arc } ;
3427
3528use crate :: {
3629 event:: {
@@ -39,11 +32,9 @@ use crate::{
3932 } ,
4033 handlers:: {
4134 http:: { ingest:: PostError , kinesis} ,
42- LOG_SOURCE_KEY , LOG_SOURCE_KINESIS , LOG_SOURCE_OTEL_LOGS , LOG_SOURCE_OTEL_METRICS ,
43- LOG_SOURCE_OTEL_TRACES , PREFIX_META , PREFIX_TAGS , SEPARATOR ,
35+ LOG_SOURCE_KEY , LOG_SOURCE_KINESIS , PREFIX_META , PREFIX_TAGS , SEPARATOR ,
4436 } ,
4537 metadata:: { SchemaVersion , STREAM_INFO } ,
46- otel:: { logs:: flatten_otel_logs, metrics:: flatten_otel_metrics, traces:: flatten_otel_traces} ,
4738 storage:: StreamType ,
4839 utils:: { header_parsing:: collect_labelled_headers, json:: convert_array_to_object} ,
4940} ;
@@ -57,32 +48,19 @@ pub async fn flatten_and_push_logs(
5748 push_logs ( stream_name, & req, & body) . await ?;
5849 return Ok ( ( ) ) ;
5950 } ;
60- let mut json: Vec < BTreeMap < String , Value > > = Vec :: new ( ) ;
61- match log_source. to_str ( ) . unwrap ( ) {
62- LOG_SOURCE_KINESIS => json = kinesis:: flatten_kinesis_logs ( & body) ,
63- //custom flattening required for otel logs
64- LOG_SOURCE_OTEL_LOGS => {
65- let logs: LogsData = serde_json:: from_slice ( body. as_bytes ( ) ) ?;
66- json = flatten_otel_logs ( & logs) ;
67- }
68- //custom flattening required for otel metrics
69- LOG_SOURCE_OTEL_METRICS => {
70- let metrics: MetricsData = serde_json:: from_slice ( body. as_bytes ( ) ) ?;
71- json = flatten_otel_metrics ( metrics) ;
72- }
73- //custom flattening required for otel traces
74- LOG_SOURCE_OTEL_TRACES => {
75- let traces: TracesData = serde_json:: from_slice ( body. as_bytes ( ) ) ?;
76- json = flatten_otel_traces ( & traces) ;
77- }
78- log_source => {
79- tracing:: warn!( "Unknown log source: {}" , log_source) ;
51+ let log_source = log_source. to_str ( ) . unwrap ( ) ;
52+ if log_source == LOG_SOURCE_KINESIS {
53+ let json = kinesis:: flatten_kinesis_logs ( & body) ;
54+ for record in json. iter ( ) {
55+ let body: Bytes = serde_json:: to_vec ( record) . unwrap ( ) . into ( ) ;
8056 push_logs ( stream_name, & req, & body) . await ?;
8157 }
82- }
83-
84- for record in json. iter_mut ( ) {
85- let body: Bytes = serde_json:: to_vec ( record) . unwrap ( ) . into ( ) ;
58+ } else if log_source. contains ( "otel" ) {
59+ return Err ( PostError :: Invalid ( anyhow ! (
60+ "Please use endpoints `/v1/logs` for otel logs, `/v1/metrics` for otel metrics and `/v1/traces` for otel traces"
61+ ) ) ) ;
62+ } else {
63+ tracing:: warn!( "Unknown log source: {}" , log_source) ;
8664 push_logs ( stream_name, & req, & body) . await ?;
8765 }
8866
0 commit comments