41 changes: 24 additions & 17 deletions src/parseable/streams.rs
@@ -24,12 +24,13 @@ use std::{
     path::{Path, PathBuf},
     process,
     sync::{Arc, Mutex, RwLock},
+    time::{SystemTime, UNIX_EPOCH},
 };
 
 use arrow_array::RecordBatch;
 use arrow_ipc::writer::StreamWriter;
 use arrow_schema::{Field, Fields, Schema};
-use chrono::{NaiveDateTime, Timelike, Utc};
+use chrono::{NaiveDateTime, Timelike};
 use derive_more::{Deref, DerefMut};
 use itertools::Itertools;
 use parquet::{
@@ -72,6 +73,14 @@ const ARROW_FILE_EXTENSION: &str = "data.arrows";
 
 pub type StreamRef = Arc<Stream>;
 
+/// Gets the unix timestamp for the minute as described by the `SystemTime`
+fn minute_from_system_time(time: SystemTime) -> u128 {
+    time.duration_since(UNIX_EPOCH)
+        .expect("Legitimate time")
+        .as_millis()
+        / 60000
+}
+
 /// All state associated with a single logstream in Parseable.
 pub struct Stream {
     pub stream_name: String,
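The bucketing comes entirely from integer division of milliseconds by 60000, so a minimal standalone check (a sketch; the helper body is copied from the hunk above and the instants are synthetic) confirms that instants within one minute collapse to the same value and the next minute lands exactly one bucket later:

use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Same helper as in the diff, reproduced so this example compiles on its own.
fn minute_from_system_time(time: SystemTime) -> u128 {
    time.duration_since(UNIX_EPOCH)
        .expect("Legitimate time")
        .as_millis()
        / 60000
}

fn main() {
    // 12:00:01 and 12:00:59 of the same day fall in the same minute bucket.
    let t0 = UNIX_EPOCH + Duration::from_secs(12 * 3600 + 1);
    let t1 = UNIX_EPOCH + Duration::from_secs(12 * 3600 + 59);
    assert_eq!(minute_from_system_time(t0), minute_from_system_time(t1));

    // 12:01:00 falls in the next bucket.
    let t2 = UNIX_EPOCH + Duration::from_secs(12 * 3600 + 60);
    assert_eq!(minute_from_system_time(t2), minute_from_system_time(t0) + 1);
}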
Expand Down Expand Up @@ -156,8 +165,7 @@ impl Stream {
hostname.push_str(id);
}
let filename = format!(
"{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
Utc::now().format("%Y%m%dT%H%M"),
"{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
parsed_timestamp.date(),
parsed_timestamp.hour(),
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
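The wall-clock prefix (Utc::now() formatted as "%Y%m%dT%H%M") is dropped, so the name now depends only on the event's parsed timestamp plus the stream's partition and host fields. A quick sketch of the resulting shape, with entirely made-up values; the stream hash, minute slot, partition string, and hostname are all hypothetical stand-ins for what the real code computes:

fn main() {
    // All values below are hypothetical, for illustration only.
    let stream_hash = "ab12cd34";
    let hostname = "ingest-0";
    let date = "2025-01-14";
    let hour = 9u32;
    let slot = "30"; // stand-in for minute_to_slot(..)
    let partitions = ""; // e.g. "key1=value1.key2=value2." with custom partitions

    let filename = format!(
        "{stream_hash}.date={date}.hour={hour:02}.minute={slot}.{partitions}{hostname}.data.arrows"
    );
    assert_eq!(
        filename,
        "ab12cd34.date=2025-01-14.hour=09.minute=30.ingest-0.data.arrows"
    );
    // Before this change the same name carried a "20250114T0930"-style prefix,
    // which varied with the flush wall clock rather than with the event time.
}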
Expand Down Expand Up @@ -192,7 +200,7 @@ impl Stream {
/// Only includes ones starting from the previous minute
pub fn arrow_files_grouped_exclude_time(
&self,
exclude: NaiveDateTime,
exclude: SystemTime,
shutdown_signal: bool,
) -> HashMap<PathBuf, Vec<PathBuf>> {
let mut grouped_arrow_file: HashMap<PathBuf, Vec<PathBuf>> = HashMap::new();
@@ -202,12 +210,13 @@
         // don't keep the ones for the current minute
         if !shutdown_signal {
             arrow_files.retain(|path| {
-                !path
-                    .file_name()
-                    .unwrap()
-                    .to_str()
-                    .unwrap()
-                    .starts_with(&exclude.format("%Y%m%dT%H%M").to_string())
+                let creation = path
+                    .metadata()
+                    .expect("Arrow file should exist on disk")
+                    .created()
+                    .expect("Creation time should be accessible");
+                // Compare if creation time is actually from previous minute
+                minute_from_system_time(creation) < minute_from_system_time(exclude)
             });
         }
 
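Retention now keys off filesystem creation time instead of parsing the minute back out of the file name. A hedged sketch of the same predicate as a free function (the name created_before_minute_of is hypothetical), written to propagate errors rather than panic, and assuming the platform reports file birth time; std::fs::Metadata::created returns Err on filesystems that do not, which the expect in the diff would turn into a panic:

use std::{
    io,
    path::Path,
    time::{Duration, SystemTime, UNIX_EPOCH},
};

/// Same bucketing helper as in the diff, repeated for self-containment.
fn minute_from_system_time(time: SystemTime) -> u128 {
    time.duration_since(UNIX_EPOCH)
        .expect("Legitimate time")
        .as_millis()
        / 60000
}

/// Free-function form of the retain closure above.
fn created_before_minute_of(path: &Path, exclude: SystemTime) -> io::Result<bool> {
    let creation = path.metadata()?.created()?;
    Ok(minute_from_system_time(creation) < minute_from_system_time(exclude))
}

fn main() -> io::Result<()> {
    let probe = std::env::temp_dir().join("probe.data.arrows");
    std::fs::write(&probe, b"")?;
    let creation = probe.metadata()?.created()?;

    // Same minute bucket as its own creation time: kept back from conversion.
    assert!(!created_before_minute_of(&probe, creation)?);
    // One full minute later: eligible for conversion.
    assert!(created_before_minute_of(&probe, creation + Duration::from_millis(60_000))?);

    std::fs::remove_file(&probe)
}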
Expand Down Expand Up @@ -429,8 +438,8 @@ impl Stream {
) -> Result<Option<Schema>, StagingError> {
let mut schemas = Vec::new();

let time = chrono::Utc::now().naive_utc();
let staging_files = self.arrow_files_grouped_exclude_time(time, shutdown_signal);
let now = SystemTime::now();
let staging_files = self.arrow_files_grouped_exclude_time(now, shutdown_signal);
if staging_files.is_empty() {
metrics::STAGING_FILES
.with_label_values(&[&self.stream_name])
Expand Down Expand Up @@ -757,7 +766,7 @@ mod tests {

use arrow_array::{Int32Array, StringArray, TimestampMillisecondArray};
use arrow_schema::{DataType, Field, TimeUnit};
use chrono::{NaiveDate, TimeDelta};
use chrono::{NaiveDate, TimeDelta, Utc};
use temp_dir::TempDir;
use tokio::time::sleep;

Expand Down Expand Up @@ -874,8 +883,7 @@ mod tests {
);

let expected_path = staging.data_path.join(format!(
"{}{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
Utc::now().format("%Y%m%dT%H%M"),
"{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
parsed_timestamp.date(),
parsed_timestamp.hour(),
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
Expand Down Expand Up @@ -909,8 +917,7 @@ mod tests {
);

let expected_path = staging.data_path.join(format!(
"{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
Utc::now().format("%Y%m%dT%H%M"),
"{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
parsed_timestamp.date(),
parsed_timestamp.hour(),
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),