diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs
index 34bfc4d5a..57d24a3fb 100644
--- a/src/parseable/streams.rs
+++ b/src/parseable/streams.rs
@@ -24,12 +24,13 @@ use std::{
     path::{Path, PathBuf},
     process,
     sync::{Arc, Mutex, RwLock},
+    time::{SystemTime, UNIX_EPOCH},
 };
 
 use arrow_array::RecordBatch;
 use arrow_ipc::writer::StreamWriter;
 use arrow_schema::{Field, Fields, Schema};
-use chrono::{NaiveDateTime, Timelike, Utc};
+use chrono::{NaiveDateTime, Timelike};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
 use parquet::{
@@ -72,6 +73,14 @@ const ARROW_FILE_EXTENSION: &str = "data.arrows";
 
 pub type StreamRef = Arc<Stream>;
 
+/// Gets the unix timestamp for the minute as described by the `SystemTime`
+fn minute_from_system_time(time: SystemTime) -> u128 {
+    time.duration_since(UNIX_EPOCH)
+        .expect("Legitimate time")
+        .as_millis()
+        / 60000
+}
+
 /// All state associated with a single logstream in Parseable.
 pub struct Stream {
     pub stream_name: String,
@@ -156,8 +165,7 @@ impl Stream {
             hostname.push_str(id);
         }
         let filename = format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
-            Utc::now().format("%Y%m%dT%H%M"),
+            "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -192,7 +200,7 @@ impl Stream {
     /// Only includes ones starting from the previous minute
     pub fn arrow_files_grouped_exclude_time(
         &self,
-        exclude: NaiveDateTime,
+        exclude: SystemTime,
         shutdown_signal: bool,
     ) -> HashMap<PathBuf, Vec<PathBuf>> {
         let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
@@ -202,12 +210,13 @@ impl Stream {
         // don't keep the ones for the current minute
         if !shutdown_signal {
            arrow_files.retain(|path| {
-                !path
-                    .file_name()
-                    .unwrap()
-                    .to_str()
-                    .unwrap()
-                    .starts_with(&exclude.format("%Y%m%dT%H%M").to_string())
+                let creation = path
+                    .metadata()
+                    .expect("Arrow file should exist on disk")
+                    .created()
+                    .expect("Creation time should be accessible");
+                // Compare if creation time is actually from previous minute
+                minute_from_system_time(creation) < minute_from_system_time(exclude)
             });
         }
 
@@ -429,8 +438,8 @@ impl Stream {
     ) -> Result<Option<Schema>, StagingError> {
         let mut schemas = Vec::new();
 
-        let time = chrono::Utc::now().naive_utc();
-        let staging_files = self.arrow_files_grouped_exclude_time(time, shutdown_signal);
+        let now = SystemTime::now();
+        let staging_files = self.arrow_files_grouped_exclude_time(now, shutdown_signal);
         if staging_files.is_empty() {
             metrics::STAGING_FILES
                 .with_label_values(&[&self.stream_name])
@@ -757,7 +766,7 @@ mod tests {
 
     use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray};
     use arrow_schema::{DataType, Field, TimeUnit};
-    use chrono::{NaiveDate, TimeDelta};
+    use chrono::{NaiveDate, TimeDelta, Utc};
     use temp_dir::TempDir;
     use tokio::time::sleep;
 
@@ -874,8 +883,7 @@ mod tests {
         );
 
         let expected_path = staging.data_path.join(format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
-            Utc::now().format("%Y%m%dT%H%M"),
+            "{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
@@ -909,8 +917,7 @@ mod tests {
         );
 
         let expected_path = staging.data_path.join(format!(
-            "{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
-            Utc::now().format("%Y%m%dT%H%M"),
+            "{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
             parsed_timestamp.date(),
             parsed_timestamp.hour(),
             minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),