@@ -20,16 +20,14 @@ use super::logstream::error::{CreateStreamError, StreamError};
2020use super :: modal:: utils:: ingest_utils:: { flatten_and_push_logs, push_logs} ;
2121use super :: users:: dashboards:: DashboardError ;
2222use super :: users:: filters:: FiltersError ;
23+ use crate :: event:: format:: LogSource ;
2324use crate :: event:: {
2425 self ,
2526 error:: EventError ,
2627 format:: { self , EventFormat } ,
2728} ;
2829use crate :: handlers:: http:: modal:: utils:: logstream_utils:: create_stream_and_schema_from_storage;
29- use crate :: handlers:: {
30- LOG_SOURCE_KEY , LOG_SOURCE_OTEL_LOGS , LOG_SOURCE_OTEL_METRICS , LOG_SOURCE_OTEL_TRACES ,
31- STREAM_NAME_HEADER_KEY ,
32- } ;
30+ use crate :: handlers:: { LOG_SOURCE_KEY , STREAM_NAME_HEADER_KEY } ;
3331use crate :: metadata:: error:: stream_info:: MetadataError ;
3432use crate :: metadata:: { SchemaVersion , STREAM_INFO } ;
3533use crate :: option:: { Mode , CONFIG } ;
@@ -95,7 +93,7 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
9593 metadata : String :: default ( ) ,
9694 } ;
9795 // For internal streams, use old schema
98- event. into_recordbatch ( & schema, None , None , SchemaVersion :: V0 , "" ) ?
96+ event. into_recordbatch ( & schema, None , None , SchemaVersion :: V0 , & LogSource :: default ( ) ) ?
9997 } ;
10098 event:: Event {
10199 rb,
@@ -127,8 +125,8 @@ pub async fn handle_otel_logs_ingestion(
127125 let Some ( log_source) = req. headers ( ) . get ( LOG_SOURCE_KEY ) else {
128126 return Err ( PostError :: Header ( ParseHeaderError :: MissingLogSource ) ) ;
129127 } ;
130- let log_source = log_source. to_str ( ) . unwrap ( ) ;
131- if log_source != LOG_SOURCE_OTEL_LOGS {
128+ let log_source = LogSource :: from ( log_source. to_str ( ) . unwrap ( ) ) ;
129+ if log_source != LogSource :: OtelLogs {
132130 return Err ( PostError :: Invalid ( anyhow:: anyhow!(
133131 "Please use x-p-log-source: otel-logs for ingesting otel logs"
134132 ) ) ) ;
@@ -142,7 +140,7 @@ pub async fn handle_otel_logs_ingestion(
142140 let mut json = flatten_otel_logs ( & logs) ;
143141 for record in json. iter_mut ( ) {
144142 let body: Bytes = serde_json:: to_vec ( record) . unwrap ( ) . into ( ) ;
145- push_logs ( & stream_name, & req, & body, log_source) . await ?;
143+ push_logs ( & stream_name, & req, & body, & log_source) . await ?;
146144 }
147145
148146 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
@@ -161,8 +159,8 @@ pub async fn handle_otel_metrics_ingestion(
161159 let Some ( log_source) = req. headers ( ) . get ( LOG_SOURCE_KEY ) else {
162160 return Err ( PostError :: Header ( ParseHeaderError :: MissingLogSource ) ) ;
163161 } ;
164- let log_source = log_source. to_str ( ) . unwrap ( ) ;
165- if log_source != LOG_SOURCE_OTEL_METRICS {
162+ let log_source = LogSource :: from ( log_source. to_str ( ) . unwrap ( ) ) ;
163+ if log_source != LogSource :: OtelMetrics {
166164 return Err ( PostError :: Invalid ( anyhow:: anyhow!(
167165 "Please use x-p-log-source: otel-metrics for ingesting otel metrics"
168166 ) ) ) ;
@@ -175,7 +173,7 @@ pub async fn handle_otel_metrics_ingestion(
175173 let mut json = flatten_otel_metrics ( metrics) ;
176174 for record in json. iter_mut ( ) {
177175 let body: Bytes = serde_json:: to_vec ( record) . unwrap ( ) . into ( ) ;
178- push_logs ( & stream_name, & req, & body, log_source) . await ?;
176+ push_logs ( & stream_name, & req, & body, & log_source) . await ?;
179177 }
180178
181179 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
@@ -195,8 +193,8 @@ pub async fn handle_otel_traces_ingestion(
195193 let Some ( log_source) = req. headers ( ) . get ( LOG_SOURCE_KEY ) else {
196194 return Err ( PostError :: Header ( ParseHeaderError :: MissingLogSource ) ) ;
197195 } ;
198- let log_source = log_source. to_str ( ) . unwrap ( ) ;
199- if log_source != LOG_SOURCE_OTEL_TRACES {
196+ let log_source = LogSource :: from ( log_source. to_str ( ) . unwrap ( ) ) ;
197+ if log_source != LogSource :: OtelTraces {
200198 return Err ( PostError :: Invalid ( anyhow:: anyhow!(
201199 "Please use x-p-log-source: otel-traces for ingesting otel traces"
202200 ) ) ) ;
@@ -209,7 +207,7 @@ pub async fn handle_otel_traces_ingestion(
209207 let mut json = flatten_otel_traces ( & traces) ;
210208 for record in json. iter_mut ( ) {
211209 let body: Bytes = serde_json:: to_vec ( record) . unwrap ( ) . into ( ) ;
212- push_logs ( & stream_name, & req, & body, log_source) . await ?;
210+ push_logs ( & stream_name, & req, & body, & log_source) . await ?;
213211 }
214212
215213 Ok ( HttpResponse :: Ok ( ) . finish ( ) )
@@ -371,7 +369,7 @@ mod tests {
371369 use serde_json:: json;
372370
373371 use crate :: {
374- event,
372+ event:: { self , format :: LogSource } ,
375373 handlers:: { http:: modal:: utils:: ingest_utils:: into_event_batch, PREFIX_META , PREFIX_TAGS } ,
376374 metadata:: SchemaVersion ,
377375 } ;
@@ -420,7 +418,7 @@ mod tests {
420418 None ,
421419 None ,
422420 SchemaVersion :: V0 ,
423- "" ,
421+ & LogSource :: default ( ) ,
424422 )
425423 . unwrap ( ) ;
426424
@@ -471,7 +469,7 @@ mod tests {
471469 None ,
472470 None ,
473471 SchemaVersion :: V0 ,
474- "" ,
472+ & LogSource :: default ( ) ,
475473 )
476474 . unwrap ( ) ;
477475
@@ -505,8 +503,16 @@ mod tests {
505503
506504 let req = TestRequest :: default ( ) . to_http_request ( ) ;
507505
508- let ( rb, _) =
509- into_event_batch ( & req, & json, schema, None , None , SchemaVersion :: V0 , "" ) . unwrap ( ) ;
506+ let ( rb, _) = into_event_batch (
507+ & req,
508+ & json,
509+ schema,
510+ None ,
511+ None ,
512+ SchemaVersion :: V0 ,
513+ & LogSource :: default ( ) ,
514+ )
515+ . unwrap ( ) ;
510516
511517 assert_eq ! ( rb. num_rows( ) , 1 ) ;
512518 assert_eq ! ( rb. num_columns( ) , 5 ) ;
@@ -538,7 +544,16 @@ mod tests {
538544
539545 let req = TestRequest :: default ( ) . to_http_request ( ) ;
540546
541- assert ! ( into_event_batch( & req, & json, schema, None , None , SchemaVersion :: V0 , "" ) . is_err( ) ) ;
547+ assert ! ( into_event_batch(
548+ & req,
549+ & json,
550+ schema,
551+ None ,
552+ None ,
553+ SchemaVersion :: V0 ,
554+ & LogSource :: default ( )
555+ )
556+ . is_err( ) ) ;
542557 }
543558
544559 #[ test]
@@ -556,8 +571,16 @@ mod tests {
556571
557572 let req = TestRequest :: default ( ) . to_http_request ( ) ;
558573
559- let ( rb, _) =
560- into_event_batch ( & req, & json, schema, None , None , SchemaVersion :: V0 , "" ) . unwrap ( ) ;
574+ let ( rb, _) = into_event_batch (
575+ & req,
576+ & json,
577+ schema,
578+ None ,
579+ None ,
580+ SchemaVersion :: V0 ,
581+ & LogSource :: default ( ) ,
582+ )
583+ . unwrap ( ) ;
561584
562585 assert_eq ! ( rb. num_rows( ) , 1 ) ;
563586 assert_eq ! ( rb. num_columns( ) , 3 ) ;
@@ -576,7 +599,7 @@ mod tests {
576599 None ,
577600 None ,
578601 SchemaVersion :: V0 ,
579- ""
602+ & LogSource :: default ( )
580603 )
581604 . is_err( ) )
582605 }
@@ -608,7 +631,7 @@ mod tests {
608631 None ,
609632 None ,
610633 SchemaVersion :: V0 ,
611- "" ,
634+ & LogSource :: default ( ) ,
612635 )
613636 . unwrap ( ) ;
614637
@@ -665,7 +688,7 @@ mod tests {
665688 None ,
666689 None ,
667690 SchemaVersion :: V0 ,
668- "" ,
691+ & LogSource :: default ( ) ,
669692 )
670693 . unwrap ( ) ;
671694
@@ -715,8 +738,16 @@ mod tests {
715738 ) ;
716739 let req = TestRequest :: default ( ) . to_http_request ( ) ;
717740
718- let ( rb, _) =
719- into_event_batch ( & req, & json, schema, None , None , SchemaVersion :: V0 , "" ) . unwrap ( ) ;
741+ let ( rb, _) = into_event_batch (
742+ & req,
743+ & json,
744+ schema,
745+ None ,
746+ None ,
747+ SchemaVersion :: V0 ,
748+ & LogSource :: default ( ) ,
749+ )
750+ . unwrap ( ) ;
720751
721752 assert_eq ! ( rb. num_rows( ) , 3 ) ;
722753 assert_eq ! ( rb. num_columns( ) , 6 ) ;
@@ -765,7 +796,16 @@ mod tests {
765796 . into_iter ( ) ,
766797 ) ;
767798
768- assert ! ( into_event_batch( & req, & json, schema, None , None , SchemaVersion :: V0 , "" ) . is_err( ) ) ;
799+ assert ! ( into_event_batch(
800+ & req,
801+ & json,
802+ schema,
803+ None ,
804+ None ,
805+ SchemaVersion :: V0 ,
806+ & LogSource :: default ( )
807+ )
808+ . is_err( ) ) ;
769809 }
770810
771811 #[ test]
@@ -800,7 +840,7 @@ mod tests {
800840 None ,
801841 None ,
802842 SchemaVersion :: V0 ,
803- "" ,
843+ & LogSource :: default ( ) ,
804844 )
805845 . unwrap ( ) ;
806846
@@ -881,7 +921,7 @@ mod tests {
881921 None ,
882922 None ,
883923 SchemaVersion :: V1 ,
884- "" ,
924+ & LogSource :: default ( ) ,
885925 )
886926 . unwrap ( ) ;
887927
0 commit comments