diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 57d24a3fb..d9379a5b7 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -52,7 +52,7 @@ use crate::{ metrics, option::Mode, storage::{object_storage::to_bytes, retention::Retention, StreamType}, - utils::minute_to_slot, + utils::time::Minute, LOCK_EXPECT, OBJECT_STORE_DATA_GRANULARITY, }; @@ -168,7 +168,7 @@ impl Stream { "{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}", parsed_timestamp.date(), parsed_timestamp.hour(), - minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(), + Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY), custom_partition_values .iter() .sorted_by_key(|v| v.0) @@ -886,7 +886,7 @@ mod tests { "{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}", parsed_timestamp.date(), parsed_timestamp.hour(), - minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(), + Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY), hostname::get().unwrap().into_string().unwrap() )); @@ -920,7 +920,7 @@ mod tests { "{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}", parsed_timestamp.date(), parsed_timestamp.hour(), - minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(), + Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY), hostname::get().unwrap().into_string().unwrap() )); diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 46335da6d..021d1bf77 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -35,23 +35,6 @@ use regex::Regex; use sha2::{Digest, Sha256}; use tracing::debug; -/// Convert minutes to a slot range -/// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19" -pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option { - if minute >= 60 { - return None; - } - - let block_n = minute / data_granularity; - let block_start = block_n * data_granularity; - if data_granularity == 1 { - return Some(format!("{block_start:02}")); - } - - let block_end = (block_n + 1) * data_granularity - 1; - Some(format!("{block_start:02}-{block_end:02}")) -} - pub fn get_ingestor_id() -> String { let now = Utc::now().to_rfc3339(); let id = get_hash(&now).to_string().split_at(15).0.to_string(); diff --git a/src/utils/time.rs b/src/utils/time.rs index 8dc808bd4..c153999f6 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -16,9 +16,7 @@ * */ -use chrono::{DateTime, NaiveDate, TimeDelta, Timelike, Utc}; - -use super::minute_to_slot; +use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeDelta, Timelike, Utc}; #[derive(Debug, thiserror::Error)] pub enum TimeParseError { @@ -231,8 +229,8 @@ impl TimeRange { prefixes: &mut Vec, ) { let mut push_prefix = |block: u32| { - if let Some(minute_slot) = minute_to_slot(block * data_granularity, data_granularity) { - let prefix = format!("{hour_prefix}minute={minute_slot}/"); + if let Ok(minute) = Minute::try_from(block * data_granularity) { + let prefix = format!("{hour_prefix}minute={}/", minute.to_slot(data_granularity)); prefixes.push(prefix); } }; @@ -266,6 +264,61 @@ impl TimeRange { } } +/// Represents a minute value (0-59) and provides methods for converting it to a slot range. +/// +/// # Examples +/// +/// ``` +/// use crate::utils::time::Minute; +/// +/// let minute = Minute::try_from(15).unwrap(); +/// assert_eq!(minute.to_slot(10), "10-19"); +/// ``` +#[derive(Debug, Clone, Copy)] +pub struct Minute { + block: u32, +} + +impl TryFrom for Minute { + type Error = u32; + + /// Returns a Minute if block is an acceptable minute value, else returns it as is + fn try_from(block: u32) -> Result { + if block >= 60 { + return Err(block); + } + + Ok(Self { block }) + } +} + +impl From for Minute { + fn from(timestamp: NaiveDateTime) -> Self { + Self { + block: timestamp.minute(), + } + } +} + +impl Minute { + /// Convert minutes to a slot range + /// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19" + /// + /// ### PANICS + /// If the provided `data_granularity` value isn't cleanly divisble from 60 + pub fn to_slot(self, data_granularity: u32) -> String { + assert!(60 % data_granularity == 0); + let block_n = self.block / data_granularity; + let block_start = block_n * data_granularity; + if data_granularity == 1 { + return format!("{block_start:02}"); + } + + let block_end = (block_n + 1) * data_granularity - 1; + format!("{block_start:02}-{block_end:02}") + } +} + #[cfg(test)] mod tests { use super::*; @@ -421,4 +474,43 @@ mod tests { let left = prefixes.iter().map(String::as_str).collect::>(); assert_eq!(left.as_slice(), right); } + + #[test] + fn valid_minute_to_minute_slot() { + let res = Minute::try_from(10); + assert!(res.is_ok()); + assert_eq!(res.unwrap().to_slot(1), "10"); + } + + #[test] + fn invalid_minute() { + assert!(Minute::try_from(100).is_err()); + } + + #[test] + fn minute_from_timestamp() { + let timestamp = + NaiveDateTime::parse_from_str("2025-01-01 02:03", "%Y-%m-%d %H:%M").unwrap(); + assert_eq!(Minute::from(timestamp).to_slot(1), "03"); + } + + #[test] + fn slot_5_min_from_timestamp() { + let timestamp = + NaiveDateTime::parse_from_str("2025-01-01 02:03", "%Y-%m-%d %H:%M").unwrap(); + assert_eq!(Minute::from(timestamp).to_slot(5), "00-04"); + } + + #[test] + fn slot_30_min_from_timestamp() { + let timestamp = + NaiveDateTime::parse_from_str("2025-01-01 02:33", "%Y-%m-%d %H:%M").unwrap(); + assert_eq!(Minute::from(timestamp).to_slot(30), "30-59"); + } + + #[test] + #[should_panic] + fn illegal_slot_granularity() { + Minute::try_from(0).unwrap().to_slot(40); + } }