@@ -24,13 +24,13 @@ use std::{
2424 path:: { Path , PathBuf } ,
2525 process,
2626 sync:: { Arc , Mutex , RwLock } ,
27- time:: UNIX_EPOCH ,
27+ time:: { SystemTime , UNIX_EPOCH } ,
2828} ;
2929
3030use arrow_array:: RecordBatch ;
3131use arrow_ipc:: writer:: StreamWriter ;
3232use arrow_schema:: { Field , Fields , Schema } ;
33- use chrono:: { DateTime , NaiveDateTime , Timelike , Utc } ;
33+ use chrono:: { NaiveDateTime , Timelike , Utc } ;
3434use derive_more:: { Deref , DerefMut } ;
3535use itertools:: Itertools ;
3636use parquet:: {
@@ -73,6 +73,14 @@ const ARROW_FILE_EXTENSION: &str = "data.arrows";
7373
7474pub type StreamRef = Arc < Stream > ;
7575
76+ /// Gets the unix timestamp for the minute as described by the `SystemTime`
77+ fn minute_from_system_time ( time : SystemTime ) -> u128 {
78+ time. duration_since ( UNIX_EPOCH )
79+ . expect ( "Legitimate time" )
80+ . as_millis ( )
81+ / 60000
82+ }
83+
7684/// All state associated with a single logstream in Parseable.
7785pub struct Stream {
7886 pub stream_name : String ,
@@ -193,7 +201,7 @@ impl Stream {
193201 /// Only includes ones starting from the previous minute
194202 pub fn arrow_files_grouped_exclude_time (
195203 & self ,
196- exclude : DateTime < Utc > ,
204+ exclude : SystemTime ,
197205 shutdown_signal : bool ,
198206 ) -> HashMap < PathBuf , Vec < PathBuf > > {
199207 let mut grouped_arrow_file: HashMap < PathBuf , Vec < PathBuf > > = HashMap :: new ( ) ;
@@ -203,14 +211,13 @@ impl Stream {
203211 // don't keep the ones for the current minute
204212 if !shutdown_signal {
205213 arrow_files. retain ( |path| {
206- path. metadata ( )
214+ let creation = path
215+ . metadata ( )
207216 . expect ( "Arrow file should exist on disk" )
208217 . created ( )
209- . expect ( "Creation time should be accessible" )
210- . duration_since ( UNIX_EPOCH )
211- . expect ( "Unix Timestamp Duration" )
212- . as_millis ( )
213- < exclude. timestamp_millis ( ) as u128
218+ . expect ( "Creation time should be accessible" ) ;
219+ // Compare if creation time is actually from previous minute
220+ minute_from_system_time ( creation) < minute_from_system_time ( exclude)
214221 } ) ;
215222 }
216223
@@ -432,7 +439,8 @@ impl Stream {
432439 ) -> Result < Option < Schema > , StagingError > {
433440 let mut schemas = Vec :: new ( ) ;
434441
435- let staging_files = self . arrow_files_grouped_exclude_time ( Utc :: now ( ) , shutdown_signal) ;
442+ let now = SystemTime :: now ( ) ;
443+ let staging_files = self . arrow_files_grouped_exclude_time ( now, shutdown_signal) ;
436444 if staging_files. is_empty ( ) {
437445 metrics:: STAGING_FILES
438446 . with_label_values ( & [ & self . stream_name ] )
0 commit comments