Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions src/parseable/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use std::{
use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use arrow_schema::{Field, Fields, Schema};
use chrono::{NaiveDateTime, Timelike, Utc};
use chrono::{NaiveDateTime, Utc};
use derive_more::{Deref, DerefMut};
use itertools::Itertools;
use parquet::{
Expand All @@ -52,7 +52,7 @@ use crate::{
storage::{
object_storage::to_bytes, retention::Retention, StreamType, OBJECT_STORE_DATA_GRANULARITY,
},
utils::minute_to_slot,
utils::time::Minute,
LOCK_EXPECT,
};

Expand Down Expand Up @@ -157,11 +157,10 @@ impl Stream {
hostname.push_str(id);
}
let filename = format!(
"{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
"{}{stream_hash}.{}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}",
Utc::now().format("%Y%m%dT%H%M"),
parsed_timestamp.date(),
parsed_timestamp.hour(),
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
parsed_timestamp.format("date=%Y-%m-%d.hour=%H"),
Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
custom_partition_values
.iter()
.sorted_by_key(|v| v.0)
Expand Down Expand Up @@ -860,11 +859,10 @@ mod tests {
);

let expected_path = staging.data_path.join(format!(
"{}{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}",
"{}{stream_hash}.{}.minute={}.{}.{ARROW_FILE_EXTENSION}",
Utc::now().format("%Y%m%dT%H%M"),
parsed_timestamp.date(),
parsed_timestamp.hour(),
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
parsed_timestamp.format("date=%Y-%m-%d.hour=%H"),
Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
hostname::get().unwrap().into_string().unwrap()
));

Expand Down Expand Up @@ -895,11 +893,10 @@ mod tests {
);

let expected_path = staging.data_path.join(format!(
"{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
"{}{stream_hash}.{}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}",
Utc::now().format("%Y%m%dT%H%M"),
parsed_timestamp.date(),
parsed_timestamp.hour(),
minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(),
parsed_timestamp.format("date=%Y-%m-%d.hour=%H"),
Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY),
hostname::get().unwrap().into_string().unwrap()
));

Expand Down
17 changes: 0 additions & 17 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,23 +35,6 @@ use regex::Regex;
use sha2::{Digest, Sha256};
use tracing::debug;

/// Convert minutes to a slot range
/// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19"
pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option<String> {
if minute >= 60 {
return None;
}

let block_n = minute / data_granularity;
let block_start = block_n * data_granularity;
if data_granularity == 1 {
return Some(format!("{block_start:02}"));
}

let block_end = (block_n + 1) * data_granularity - 1;
Some(format!("{block_start:02}-{block_end:02}"))
}

pub fn get_ingestor_id() -> String {
let now = Utc::now().to_rfc3339();
let id = get_hash(&now).to_string().split_at(15).0.to_string();
Expand Down
59 changes: 54 additions & 5 deletions src/utils/time.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,7 @@
*
*/

use chrono::{DateTime, NaiveDate, TimeDelta, Timelike, Utc};

use super::minute_to_slot;
use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeDelta, Timelike, Utc};

#[derive(Debug, thiserror::Error)]
pub enum TimeParseError {
Expand Down Expand Up @@ -231,8 +229,8 @@ impl TimeRange {
prefixes: &mut Vec<String>,
) {
let mut push_prefix = |block: u32| {
if let Some(minute_slot) = minute_to_slot(block * data_granularity, data_granularity) {
let prefix = format!("{hour_prefix}minute={minute_slot}/");
if let Ok(minute) = Minute::try_from(block * data_granularity) {
let prefix = format!("{hour_prefix}minute={}/", minute.to_slot(data_granularity));
prefixes.push(prefix);
}
};
Expand Down Expand Up @@ -266,6 +264,57 @@ impl TimeRange {
}
}

/// Represents a minute value (0-59) and provides methods for converting it to a slot range.
///
/// # Examples
///
/// ```
/// use crate::utils::time::Minute;
///
/// let minute = Minute::try_from(15).unwrap();
/// assert_eq!(minute.to_slot(10), "10-19");
/// ```
#[derive(Debug, Clone, Copy)]
pub struct Minute {
block: u32,
}

impl TryFrom<u32> for Minute {
type Error = u32;

/// Returns a Minute if block is an acceptable minute value, else returns it as is
fn try_from(block: u32) -> Result<Self, Self::Error> {
if block >= 60 {
return Err(block);
}

Ok(Self { block })
}
}

impl From<NaiveDateTime> for Minute {
fn from(timestamp: NaiveDateTime) -> Self {
Self {
block: timestamp.minute(),
}
}
}

impl Minute {
/// Convert minutes to a slot range
/// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19"
pub fn to_slot(self, data_granularity: u32) -> String {
let block_n = self.block / data_granularity;
let block_start = block_n * data_granularity;
if data_granularity == 1 {
return format!("{block_start:02}");
}

let block_end = (block_n + 1) * data_granularity - 1;
format!("{block_start:02}-{block_end:02}")
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading