@@ -79,7 +79,7 @@ pub async fn ingest(req: HttpRequest, Json(json): Json<Value>) -> Result<HttpRes
7979
8080pub  async  fn  ingest_internal_stream ( stream_name :  String ,  body :  Bytes )  -> Result < ( ) ,  PostError >  { 
8181    let  size:  usize  = body. len ( ) ; 
82-     let  parsed_timestamp  = Utc :: now ( ) . naive_utc ( ) ; 
82+     let  now  = Utc :: now ( ) ; 
8383    let  ( rb,  is_first)  = { 
8484        let  body_val:  Value  = serde_json:: from_slice ( & body) ?; 
8585        let  hash_map = PARSEABLE . streams . read ( ) . unwrap ( ) ; 
@@ -93,15 +93,15 @@ pub async fn ingest_internal_stream(stream_name: String, body: Bytes) -> Result<
9393            . clone ( ) ; 
9494        let  event = format:: json:: Event  {  data :  body_val } ; 
9595        // For internal streams, use old schema 
96-         event. into_recordbatch ( & schema,  false ,  None ,  SchemaVersion :: V0 ) ?
96+         event. into_recordbatch ( & schema,  now ,   false ,  None ,  SchemaVersion :: V0 ) ?
9797    } ; 
9898    event:: Event  { 
9999        rb, 
100100        stream_name, 
101101        origin_format :  "json" , 
102102        origin_size :  size as  u64 , 
103103        is_first_event :  is_first, 
104-         parsed_timestamp, 
104+         parsed_timestamp :  now . naive_utc ( ) , 
105105        time_partition :  None , 
106106        custom_partition_values :  HashMap :: new ( ) , 
107107        stream_type :  StreamType :: Internal , 
@@ -351,6 +351,7 @@ mod tests {
351351    use  arrow:: datatypes:: Int64Type ; 
352352    use  arrow_array:: { ArrayRef ,  Float64Array ,  Int64Array ,  ListArray ,  StringArray } ; 
353353    use  arrow_schema:: { DataType ,  Field } ; 
354+     use  chrono:: Utc ; 
354355    use  serde_json:: json; 
355356    use  std:: { collections:: HashMap ,  sync:: Arc } ; 
356357
@@ -392,8 +393,15 @@ mod tests {
392393            "b" :  "hello" , 
393394        } ) ; 
394395
395-         let  ( rb,  _)  =
396-             into_event_batch ( json,  HashMap :: default ( ) ,  false ,  None ,  SchemaVersion :: V0 ) . unwrap ( ) ; 
396+         let  ( rb,  _)  = into_event_batch ( 
397+             json, 
398+             HashMap :: default ( ) , 
399+             Utc :: now ( ) , 
400+             false , 
401+             None , 
402+             SchemaVersion :: V0 , 
403+         ) 
404+         . unwrap ( ) ; 
397405
398406        assert_eq ! ( rb. num_rows( ) ,  1 ) ; 
399407        assert_eq ! ( rb. num_columns( ) ,  4 ) ; 
@@ -419,8 +427,15 @@ mod tests {
419427            "c" :  null
420428        } ) ; 
421429
422-         let  ( rb,  _)  =
423-             into_event_batch ( json,  HashMap :: default ( ) ,  false ,  None ,  SchemaVersion :: V0 ) . unwrap ( ) ; 
430+         let  ( rb,  _)  = into_event_batch ( 
431+             json, 
432+             HashMap :: default ( ) , 
433+             Utc :: now ( ) , 
434+             false , 
435+             None , 
436+             SchemaVersion :: V0 , 
437+         ) 
438+         . unwrap ( ) ; 
424439
425440        assert_eq ! ( rb. num_rows( ) ,  1 ) ; 
426441        assert_eq ! ( rb. num_columns( ) ,  3 ) ; 
@@ -450,7 +465,8 @@ mod tests {
450465            . into_iter ( ) , 
451466        ) ; 
452467
453-         let  ( rb,  _)  = into_event_batch ( json,  schema,  false ,  None ,  SchemaVersion :: V0 ) . unwrap ( ) ; 
468+         let  ( rb,  _)  =
469+             into_event_batch ( json,  schema,  Utc :: now ( ) ,  false ,  None ,  SchemaVersion :: V0 ) . unwrap ( ) ; 
454470
455471        assert_eq ! ( rb. num_rows( ) ,  1 ) ; 
456472        assert_eq ! ( rb. num_columns( ) ,  3 ) ; 
@@ -480,7 +496,9 @@ mod tests {
480496            . into_iter ( ) , 
481497        ) ; 
482498
483-         assert ! ( into_event_batch( json,  schema,  false ,  None ,  SchemaVersion :: V0 , ) . is_err( ) ) ; 
499+         assert ! ( 
500+             into_event_batch( json,  schema,  Utc :: now( ) ,  false ,  None ,  SchemaVersion :: V0 , ) . is_err( ) 
501+         ) ; 
484502    } 
485503
486504    #[ test]  
@@ -496,7 +514,8 @@ mod tests {
496514            . into_iter ( ) , 
497515        ) ; 
498516
499-         let  ( rb,  _)  = into_event_batch ( json,  schema,  false ,  None ,  SchemaVersion :: V0 ) . unwrap ( ) ; 
517+         let  ( rb,  _)  =
518+             into_event_batch ( json,  schema,  Utc :: now ( ) ,  false ,  None ,  SchemaVersion :: V0 ) . unwrap ( ) ; 
500519
501520        assert_eq ! ( rb. num_rows( ) ,  1 ) ; 
502521        assert_eq ! ( rb. num_columns( ) ,  1 ) ; 
@@ -535,8 +554,15 @@ mod tests {
535554            } , 
536555        ] ) ; 
537556
538-         let  ( rb,  _)  =
539-             into_event_batch ( json,  HashMap :: default ( ) ,  false ,  None ,  SchemaVersion :: V0 ) . unwrap ( ) ; 
557+         let  ( rb,  _)  = into_event_batch ( 
558+             json, 
559+             HashMap :: default ( ) , 
560+             Utc :: now ( ) , 
561+             false , 
562+             None , 
563+             SchemaVersion :: V0 , 
564+         ) 
565+         . unwrap ( ) ; 
540566
541567        assert_eq ! ( rb. num_rows( ) ,  3 ) ; 
542568        assert_eq ! ( rb. num_columns( ) ,  4 ) ; 
@@ -582,8 +608,15 @@ mod tests {
582608            } , 
583609        ] ) ; 
584610
585-         let  ( rb,  _)  =
586-             into_event_batch ( json,  HashMap :: default ( ) ,  false ,  None ,  SchemaVersion :: V0 ) . unwrap ( ) ; 
611+         let  ( rb,  _)  = into_event_batch ( 
612+             json, 
613+             HashMap :: default ( ) , 
614+             Utc :: now ( ) , 
615+             false , 
616+             None , 
617+             SchemaVersion :: V0 , 
618+         ) 
619+         . unwrap ( ) ; 
587620
588621        assert_eq ! ( rb. num_rows( ) ,  3 ) ; 
589622        assert_eq ! ( rb. num_columns( ) ,  4 ) ; 
@@ -630,7 +663,8 @@ mod tests {
630663            . into_iter ( ) , 
631664        ) ; 
632665
633-         let  ( rb,  _)  = into_event_batch ( json,  schema,  false ,  None ,  SchemaVersion :: V0 ) . unwrap ( ) ; 
666+         let  ( rb,  _)  =
667+             into_event_batch ( json,  schema,  Utc :: now ( ) ,  false ,  None ,  SchemaVersion :: V0 ) . unwrap ( ) ; 
634668
635669        assert_eq ! ( rb. num_rows( ) ,  3 ) ; 
636670        assert_eq ! ( rb. num_columns( ) ,  4 ) ; 
@@ -677,7 +711,9 @@ mod tests {
677711            . into_iter ( ) , 
678712        ) ; 
679713
680-         assert ! ( into_event_batch( json,  schema,  false ,  None ,  SchemaVersion :: V0 , ) . is_err( ) ) ; 
714+         assert ! ( 
715+             into_event_batch( json,  schema,  Utc :: now( ) ,  false ,  None ,  SchemaVersion :: V0 , ) . is_err( ) 
716+         ) ; 
681717    } 
682718
683719    #[ test]  
@@ -718,6 +754,7 @@ mod tests {
718754        let  ( rb,  _)  = into_event_batch ( 
719755            flattened_json, 
720756            HashMap :: default ( ) , 
757+             Utc :: now ( ) , 
721758            false , 
722759            None , 
723760            SchemaVersion :: V0 , 
@@ -806,6 +843,7 @@ mod tests {
806843        let  ( rb,  _)  = into_event_batch ( 
807844            flattened_json, 
808845            HashMap :: default ( ) , 
846+             Utc :: now ( ) , 
809847            false , 
810848            None , 
811849            SchemaVersion :: V1 , 
0 commit comments