@@ -24,7 +24,6 @@ use arrow_array::RecordBatch;
2424use bytes:: Bytes ;
2525use chrono:: Utc ;
2626use http:: StatusCode ;
27- use serde_json:: Value ;
2827
2928use crate :: event:: error:: EventError ;
3029use crate :: event:: format:: known_schema:: { self , KNOWN_SCHEMA_LIST } ;
@@ -39,7 +38,7 @@ use crate::otel::traces::OTEL_TRACES_KNOWN_FIELD_LIST;
3938use crate :: parseable:: { StreamNotFound , PARSEABLE } ;
4039use crate :: storage:: { ObjectStorageError , StreamType } ;
4140use crate :: utils:: header_parsing:: ParseHeaderError ;
42- use crate :: utils:: json:: flatten:: JsonFlattenError ;
41+ use crate :: utils:: json:: { flatten:: JsonFlattenError , strict :: StrictValue } ;
4342
4443use super :: logstream:: error:: { CreateStreamError , StreamError } ;
4544use super :: modal:: utils:: ingest_utils:: { flatten_and_push_logs, get_custom_fields_from_header} ;
@@ -51,7 +50,7 @@ use super::users::filters::FiltersError;
5150// creates if stream does not exist
5251pub async fn ingest (
5352 req : HttpRequest ,
54- Json ( mut json) : Json < Value > ,
53+ Json ( json) : Json < StrictValue > ,
5554) -> Result < HttpResponse , PostError > {
5655 let Some ( stream_name) = req. headers ( ) . get ( STREAM_NAME_HEADER_KEY ) else {
5756 return Err ( PostError :: Header ( ParseHeaderError :: MissingStreamName ) ) ;
@@ -83,6 +82,8 @@ pub async fn ingest(
8382
8483 let mut p_custom_fields = get_custom_fields_from_header ( & req) ;
8584
85+ let mut json = json. into_inner ( ) ;
86+
8687 let fields = match & log_source {
8788 LogSource :: Custom ( src) => KNOWN_SCHEMA_LIST . extract_from_inline_log (
8889 & mut json,
@@ -127,13 +128,13 @@ pub async fn ingest(
127128
128129pub async fn ingest_internal_stream ( stream_name : String , body : Bytes ) -> Result < ( ) , PostError > {
129130 let size: usize = body. len ( ) ;
130- let json: Value = serde_json:: from_slice ( & body) ?;
131+ let json: StrictValue = serde_json:: from_slice ( & body) ?;
131132 let schema = PARSEABLE . get_stream ( & stream_name) ?. get_schema_raw ( ) ;
132133 let mut p_custom_fields = HashMap :: new ( ) ;
133134 p_custom_fields. insert ( USER_AGENT_KEY . to_string ( ) , "parseable" . to_string ( ) ) ;
134135 p_custom_fields. insert ( FORMAT_KEY . to_string ( ) , LogSource :: Pmeta . to_string ( ) ) ;
135136 // For internal streams, use old schema
136- format:: json:: Event :: new ( json)
137+ format:: json:: Event :: new ( json. into_inner ( ) )
137138 . into_event (
138139 stream_name,
139140 size as u64 ,
@@ -155,7 +156,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
155156// creates if stream does not exist
156157pub async fn handle_otel_logs_ingestion (
157158 req : HttpRequest ,
158- Json ( json) : Json < Value > ,
159+ Json ( json) : Json < StrictValue > ,
159160) -> Result < HttpResponse , PostError > {
160161 let Some ( stream_name) = req. headers ( ) . get ( STREAM_NAME_HEADER_KEY ) else {
161162 return Err ( PostError :: Header ( ParseHeaderError :: MissingStreamName ) ) ;
@@ -205,7 +206,13 @@ pub async fn handle_otel_logs_ingestion(
205206
206207 let p_custom_fields = get_custom_fields_from_header ( & req) ;
207208
208- flatten_and_push_logs ( json, & stream_name, & log_source, & p_custom_fields) . await ?;
209+ flatten_and_push_logs (
210+ json. into_inner ( ) ,
211+ & stream_name,
212+ & log_source,
213+ & p_custom_fields,
214+ )
215+ . await ?;
209216
210217 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
211218}
@@ -215,7 +222,7 @@ pub async fn handle_otel_logs_ingestion(
215222// creates if stream does not exist
216223pub async fn handle_otel_metrics_ingestion (
217224 req : HttpRequest ,
218- Json ( json) : Json < Value > ,
225+ Json ( json) : Json < StrictValue > ,
219226) -> Result < HttpResponse , PostError > {
220227 let Some ( stream_name) = req. headers ( ) . get ( STREAM_NAME_HEADER_KEY ) else {
221228 return Err ( PostError :: Header ( ParseHeaderError :: MissingStreamName ) ) ;
@@ -263,7 +270,13 @@ pub async fn handle_otel_metrics_ingestion(
263270
264271 let p_custom_fields = get_custom_fields_from_header ( & req) ;
265272
266- flatten_and_push_logs ( json, & stream_name, & log_source, & p_custom_fields) . await ?;
273+ flatten_and_push_logs (
274+ json. into_inner ( ) ,
275+ & stream_name,
276+ & log_source,
277+ & p_custom_fields,
278+ )
279+ . await ?;
267280
268281 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
269282}
@@ -273,7 +286,7 @@ pub async fn handle_otel_metrics_ingestion(
273286// creates if stream does not exist
274287pub async fn handle_otel_traces_ingestion (
275288 req : HttpRequest ,
276- Json ( json) : Json < Value > ,
289+ Json ( json) : Json < StrictValue > ,
277290) -> Result < HttpResponse , PostError > {
278291 let Some ( stream_name) = req. headers ( ) . get ( STREAM_NAME_HEADER_KEY ) else {
279292 return Err ( PostError :: Header ( ParseHeaderError :: MissingStreamName ) ) ;
@@ -322,7 +335,13 @@ pub async fn handle_otel_traces_ingestion(
322335
323336 let p_custom_fields = get_custom_fields_from_header ( & req) ;
324337
325- flatten_and_push_logs ( json, & stream_name, & log_source, & p_custom_fields) . await ?;
338+ flatten_and_push_logs (
339+ json. into_inner ( ) ,
340+ & stream_name,
341+ & log_source,
342+ & p_custom_fields,
343+ )
344+ . await ?;
326345
327346 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
328347}
@@ -333,7 +352,7 @@ pub async fn handle_otel_traces_ingestion(
333352pub async fn post_event (
334353 req : HttpRequest ,
335354 stream_name : Path < String > ,
336- Json ( mut json) : Json < Value > ,
355+ Json ( json) : Json < StrictValue > ,
337356) -> Result < HttpResponse , PostError > {
338357 let stream_name = stream_name. into_inner ( ) ;
339358
@@ -369,6 +388,7 @@ pub async fn post_event(
369388 . get ( EXTRACT_LOG_KEY )
370389 . and_then ( |h| h. to_str ( ) . ok ( ) ) ;
371390 let mut p_custom_fields = get_custom_fields_from_header ( & req) ;
391+ let mut json = json. into_inner ( ) ;
372392 match & log_source {
373393 LogSource :: OtelLogs | LogSource :: OtelMetrics | LogSource :: OtelTraces => {
374394 return Err ( PostError :: OtelNotSupported )
0 commit comments