@@ -95,7 +95,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
9595 metadata : String :: default ( ) ,
9696 } ;
9797 // For internal streams, use old schema
98- event. into_recordbatch ( & schema, None , None , SchemaVersion :: V0 ) ?
98+ event. into_recordbatch ( & schema, None , None , SchemaVersion :: V0 , "" ) ?
9999 } ;
100100 event:: Event {
101101 rb,
@@ -127,7 +127,8 @@ pub async fn handle_otel_logs_ingestion(
127127 let Some ( log_source) = req. headers ( ) . get ( LOG_SOURCE_KEY ) else {
128128 return Err ( PostError :: Header ( ParseHeaderError :: MissingLogSource ) ) ;
129129 } ;
130- if log_source. to_str ( ) . unwrap ( ) != LOG_SOURCE_OTEL_LOGS {
130+ let log_source = log_source. to_str ( ) . unwrap ( ) ;
131+ if log_source != LOG_SOURCE_OTEL_LOGS {
131132 return Err ( PostError :: Invalid ( anyhow:: anyhow!(
132133 "Please use x-p-log-source: otel-logs for ingesting otel logs"
133134 ) ) ) ;
@@ -141,7 +142,7 @@ pub async fn handle_otel_logs_ingestion(
141142 let mut json = flatten_otel_logs ( & logs) ;
142143 for record in json. iter_mut ( ) {
143144 let body: Bytes = serde_json:: to_vec ( record) . unwrap ( ) . into ( ) ;
144- push_logs ( & stream_name, & req, & body) . await ?;
145+ push_logs ( & stream_name, & req, & body, log_source ) . await ?;
145146 }
146147
147148 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
@@ -160,7 +161,8 @@ pub async fn handle_otel_metrics_ingestion(
160161 let Some ( log_source) = req. headers ( ) . get ( LOG_SOURCE_KEY ) else {
161162 return Err ( PostError :: Header ( ParseHeaderError :: MissingLogSource ) ) ;
162163 } ;
163- if log_source. to_str ( ) . unwrap ( ) != LOG_SOURCE_OTEL_METRICS {
164+ let log_source = log_source. to_str ( ) . unwrap ( ) ;
165+ if log_source != LOG_SOURCE_OTEL_METRICS {
164166 return Err ( PostError :: Invalid ( anyhow:: anyhow!(
165167 "Please use x-p-log-source: otel-metrics for ingesting otel metrics"
166168 ) ) ) ;
@@ -173,7 +175,7 @@ pub async fn handle_otel_metrics_ingestion(
173175 let mut json = flatten_otel_metrics ( metrics) ;
174176 for record in json. iter_mut ( ) {
175177 let body: Bytes = serde_json:: to_vec ( record) . unwrap ( ) . into ( ) ;
176- push_logs ( & stream_name, & req, & body) . await ?;
178+ push_logs ( & stream_name, & req, & body, log_source ) . await ?;
177179 }
178180
179181 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
@@ -193,7 +195,8 @@ pub async fn handle_otel_traces_ingestion(
193195 let Some ( log_source) = req. headers ( ) . get ( LOG_SOURCE_KEY ) else {
194196 return Err ( PostError :: Header ( ParseHeaderError :: MissingLogSource ) ) ;
195197 } ;
196- if log_source. to_str ( ) . unwrap ( ) != LOG_SOURCE_OTEL_TRACES {
198+ let log_source = log_source. to_str ( ) . unwrap ( ) ;
199+ if log_source != LOG_SOURCE_OTEL_TRACES {
197200 return Err ( PostError :: Invalid ( anyhow:: anyhow!(
198201 "Please use x-p-log-source: otel-traces for ingesting otel traces"
199202 ) ) ) ;
@@ -206,7 +209,7 @@ pub async fn handle_otel_traces_ingestion(
206209 let mut json = flatten_otel_traces ( & traces) ;
207210 for record in json. iter_mut ( ) {
208211 let body: Bytes = serde_json:: to_vec ( record) . unwrap ( ) . into ( ) ;
209- push_logs ( & stream_name, & req, & body) . await ?;
212+ push_logs ( & stream_name, & req, & body, log_source ) . await ?;
210213 }
211214
212215 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
@@ -417,6 +420,7 @@ mod tests {
417420 None ,
418421 None ,
419422 SchemaVersion :: V0 ,
423+ "" ,
420424 )
421425 . unwrap ( ) ;
422426
@@ -467,6 +471,7 @@ mod tests {
467471 None ,
468472 None ,
469473 SchemaVersion :: V0 ,
474+ "" ,
470475 )
471476 . unwrap ( ) ;
472477
@@ -500,7 +505,8 @@ mod tests {
500505
501506 let req = TestRequest :: default ( ) . to_http_request ( ) ;
502507
503- let ( rb, _) = into_event_batch ( & req, & json, schema, None , None , SchemaVersion :: V0 ) . unwrap ( ) ;
508+ let ( rb, _) =
509+ into_event_batch ( & req, & json, schema, None , None , SchemaVersion :: V0 , "" ) . unwrap ( ) ;
504510
505511 assert_eq ! ( rb. num_rows( ) , 1 ) ;
506512 assert_eq ! ( rb. num_columns( ) , 5 ) ;
@@ -532,7 +538,7 @@ mod tests {
532538
533539 let req = TestRequest :: default ( ) . to_http_request ( ) ;
534540
535- assert ! ( into_event_batch( & req, & json, schema, None , None , SchemaVersion :: V0 ) . is_err( ) ) ;
541+ assert ! ( into_event_batch( & req, & json, schema, None , None , SchemaVersion :: V0 , "" ) . is_err( ) ) ;
536542 }
537543
538544 #[ test]
@@ -550,7 +556,8 @@ mod tests {
550556
551557 let req = TestRequest :: default ( ) . to_http_request ( ) ;
552558
553- let ( rb, _) = into_event_batch ( & req, & json, schema, None , None , SchemaVersion :: V0 ) . unwrap ( ) ;
559+ let ( rb, _) =
560+ into_event_batch ( & req, & json, schema, None , None , SchemaVersion :: V0 , "" ) . unwrap ( ) ;
554561
555562 assert_eq ! ( rb. num_rows( ) , 1 ) ;
556563 assert_eq ! ( rb. num_columns( ) , 3 ) ;
@@ -568,7 +575,8 @@ mod tests {
568575 HashMap :: default ( ) ,
569576 None ,
570577 None ,
571- SchemaVersion :: V0
578+ SchemaVersion :: V0 ,
579+ ""
572580 )
573581 . is_err( ) )
574582 }
@@ -600,6 +608,7 @@ mod tests {
600608 None ,
601609 None ,
602610 SchemaVersion :: V0 ,
611+ "" ,
603612 )
604613 . unwrap ( ) ;
605614
@@ -656,6 +665,7 @@ mod tests {
656665 None ,
657666 None ,
658667 SchemaVersion :: V0 ,
668+ "" ,
659669 )
660670 . unwrap ( ) ;
661671
@@ -705,7 +715,8 @@ mod tests {
705715 ) ;
706716 let req = TestRequest :: default ( ) . to_http_request ( ) ;
707717
708- let ( rb, _) = into_event_batch ( & req, & json, schema, None , None , SchemaVersion :: V0 ) . unwrap ( ) ;
718+ let ( rb, _) =
719+ into_event_batch ( & req, & json, schema, None , None , SchemaVersion :: V0 , "" ) . unwrap ( ) ;
709720
710721 assert_eq ! ( rb. num_rows( ) , 3 ) ;
711722 assert_eq ! ( rb. num_columns( ) , 6 ) ;
@@ -754,7 +765,7 @@ mod tests {
754765 . into_iter ( ) ,
755766 ) ;
756767
757- assert ! ( into_event_batch( & req, & json, schema, None , None , SchemaVersion :: V0 ) . is_err( ) ) ;
768+ assert ! ( into_event_batch( & req, & json, schema, None , None , SchemaVersion :: V0 , "" ) . is_err( ) ) ;
758769 }
759770
760771 #[ test]
@@ -789,6 +800,7 @@ mod tests {
789800 None ,
790801 None ,
791802 SchemaVersion :: V0 ,
803+ "" ,
792804 )
793805 . unwrap ( ) ;
794806
@@ -869,6 +881,7 @@ mod tests {
869881 None ,
870882 None ,
871883 SchemaVersion :: V1 ,
884+ "" ,
872885 )
873886 . unwrap ( ) ;
874887
0 commit comments