@@ -24,27 +24,21 @@ use arrow_array::RecordBatch;
2424use bytes:: Bytes ;
2525use chrono:: Utc ;
2626use http:: StatusCode ;
27- use opentelemetry_proto:: tonic:: logs:: v1:: LogsData ;
28- use opentelemetry_proto:: tonic:: metrics:: v1:: MetricsData ;
29- use opentelemetry_proto:: tonic:: trace:: v1:: TracesData ;
3027use serde_json:: Value ;
3128
3229use crate :: event:: error:: EventError ;
3330use crate :: event:: format:: { self , EventFormat , LogSource } ;
3431use crate :: handlers:: { LOG_SOURCE_KEY , STREAM_NAME_HEADER_KEY } ;
3532use crate :: metadata:: SchemaVersion ;
3633use crate :: option:: Mode ;
37- use crate :: otel:: logs:: flatten_otel_logs;
38- use crate :: otel:: metrics:: flatten_otel_metrics;
39- use crate :: otel:: traces:: flatten_otel_traces;
4034use crate :: parseable:: { StreamNotFound , PARSEABLE } ;
4135use crate :: storage:: { ObjectStorageError , StreamType } ;
4236use crate :: utils:: header_parsing:: ParseHeaderError ;
4337use crate :: utils:: json:: flatten:: JsonFlattenError ;
4438use crate :: { event, LOCK_EXPECT } ;
4539
4640use super :: logstream:: error:: { CreateStreamError , StreamError } ;
47- use super :: modal:: utils:: ingest_utils:: { flatten_and_push_logs, push_logs } ;
41+ use super :: modal:: utils:: ingest_utils:: flatten_and_push_logs;
4842use super :: users:: dashboards:: DashboardError ;
4943use super :: users:: filters:: FiltersError ;
5044
@@ -70,6 +64,14 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
7064 . get ( LOG_SOURCE_KEY )
7165 . and_then ( |h| h. to_str ( ) . ok ( ) )
7266 . map_or ( LogSource :: default ( ) , LogSource :: from) ;
67+
68+ if matches ! (
69+ log_source,
70+ LogSource :: OtelLogs | LogSource :: OtelMetrics | LogSource :: OtelTraces
71+ ) {
72+ return Err ( PostError :: OtelNotSupported ) ;
73+ }
74+
7375 flatten_and_push_logs ( json, & stream_name, & log_source) . await ?;
7476
7577 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
@@ -133,11 +135,7 @@ pub async fn handle_otel_logs_ingestion(
133135 . create_stream_if_not_exists ( & stream_name, StreamType :: UserDefined , LogSource :: OtelLogs )
134136 . await ?;
135137
136- //custom flattening required for otel logs
137- let logs: LogsData = serde_json:: from_value ( json) ?;
138- for record in flatten_otel_logs ( & logs) {
139- push_logs ( & stream_name, record, & log_source) . await ?;
140- }
138+ flatten_and_push_logs ( json, & stream_name, & log_source) . await ?;
141139
142140 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
143141}
@@ -168,11 +166,7 @@ pub async fn handle_otel_metrics_ingestion(
168166 )
169167 . await ?;
170168
171- //custom flattening required for otel metrics
172- let metrics: MetricsData = serde_json:: from_value ( json) ?;
173- for record in flatten_otel_metrics ( metrics) {
174- push_logs ( & stream_name, record, & log_source) . await ?;
175- }
169+ flatten_and_push_logs ( json, & stream_name, & log_source) . await ?;
176170
177171 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
178172}
@@ -200,11 +194,7 @@ pub async fn handle_otel_traces_ingestion(
200194 . create_stream_if_not_exists ( & stream_name, StreamType :: UserDefined , LogSource :: OtelTraces )
201195 . await ?;
202196
203- //custom flattening required for otel traces
204- let traces: TracesData = serde_json:: from_value ( json) ?;
205- for record in flatten_otel_traces ( & traces) {
206- push_logs ( & stream_name, record, & log_source) . await ?;
207- }
197+ flatten_and_push_logs ( json, & stream_name, & log_source) . await ?;
208198
209199 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
210200}
@@ -245,6 +235,14 @@ pub async fn post_event(
245235 . get ( LOG_SOURCE_KEY )
246236 . and_then ( |h| h. to_str ( ) . ok ( ) )
247237 . map_or ( LogSource :: default ( ) , LogSource :: from) ;
238+
239+ if matches ! (
240+ log_source,
241+ LogSource :: OtelLogs | LogSource :: OtelMetrics | LogSource :: OtelTraces
242+ ) {
243+ return Err ( PostError :: OtelNotSupported ) ;
244+ }
245+
248246 flatten_and_push_logs ( json, & stream_name, & log_source) . await ?;
249247
250248 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
0 commit comments