From 6e1dd5cffaaa1a0fb9b86b5dea03b4b412a6b1cf Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 15:14:28 +0530 Subject: [PATCH 1/6] refactor: `Minute` --- src/parseable/streams.rs | 29 +++++++++++------------ src/utils/mod.rs | 17 -------------- src/utils/time.rs | 50 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 58 insertions(+), 38 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 7d8decdca..6ac2443cc 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -29,7 +29,7 @@ use std::{ use arrow_array::RecordBatch; use arrow_ipc::writer::StreamWriter; use arrow_schema::{Field, Fields, Schema}; -use chrono::{NaiveDateTime, Timelike, Utc}; +use chrono::{NaiveDateTime, Utc}; use derive_more::{Deref, DerefMut}; use itertools::Itertools; use parquet::{ @@ -52,7 +52,7 @@ use crate::{ storage::{ object_storage::to_bytes, retention::Retention, StreamType, OBJECT_STORE_DATA_GRANULARITY, }, - utils::minute_to_slot, + utils::time::Minute, LOCK_EXPECT, }; @@ -157,16 +157,15 @@ impl Stream { hostname.push_str(id); } let filename = format!( - "{}{stream_hash}.date={}.hour={:02}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}", + "{}{stream_hash}.{}.minute={}.{}.{hostname}.{ARROW_FILE_EXTENSION}", Utc::now().format("%Y%m%dT%H%M"), - parsed_timestamp.date(), - parsed_timestamp.hour(), - minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(), + parsed_timestamp.format("date=%Y-%m-%d.hour=%H"), + Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY), custom_partition_values .iter() .sorted_by_key(|v| v.0) - .map(|(key, value)| format!("{key}={value}.")) - .join("") + .map(|(key, value)| format!("{key}={value}")) + .join(".") ); self.data_path.join(filename) } @@ -860,11 +859,10 @@ mod tests { ); let expected_path = staging.data_path.join(format!( - "{}{stream_hash}.date={}.hour={:02}.minute={}.{}.{ARROW_FILE_EXTENSION}", + 
"{}{stream_hash}.{}.minute={}.{}.{ARROW_FILE_EXTENSION}", Utc::now().format("%Y%m%dT%H%M"), - parsed_timestamp.date(), - parsed_timestamp.hour(), - minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(), + parsed_timestamp.format("date=%Y-%m-%d.hour=%H"), + Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY), hostname::get().unwrap().into_string().unwrap() )); @@ -895,11 +893,10 @@ mod tests { ); let expected_path = staging.data_path.join(format!( - "{}{stream_hash}.date={}.hour={:02}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}", + "{}{stream_hash}.{}.minute={}.key1=value1.key2=value2.{}.{ARROW_FILE_EXTENSION}", Utc::now().format("%Y%m%dT%H%M"), - parsed_timestamp.date(), - parsed_timestamp.hour(), - minute_to_slot(parsed_timestamp.minute(), OBJECT_STORE_DATA_GRANULARITY).unwrap(), + parsed_timestamp.format("date=%Y-%m-%d.hour=%H"), + Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY), hostname::get().unwrap().into_string().unwrap() )); diff --git a/src/utils/mod.rs b/src/utils/mod.rs index 46335da6d..021d1bf77 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -35,23 +35,6 @@ use regex::Regex; use sha2::{Digest, Sha256}; use tracing::debug; -/// Convert minutes to a slot range -/// e.g. 
given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19" -pub fn minute_to_slot(minute: u32, data_granularity: u32) -> Option<String> { - if minute >= 60 { - return None; - } - - let block_n = minute / data_granularity; - let block_start = block_n * data_granularity; - if data_granularity == 1 { - return Some(format!("{block_start:02}")); - } - - let block_end = (block_n + 1) * data_granularity - 1; - Some(format!("{block_start:02}-{block_end:02}")) -} - pub fn get_ingestor_id() -> String { let now = Utc::now().to_rfc3339(); let id = get_hash(&now).to_string().split_at(15).0.to_string(); diff --git a/src/utils/time.rs b/src/utils/time.rs index 8dc808bd4..a56f4bb29 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -16,9 +16,7 @@ * */ -use chrono::{DateTime, NaiveDate, TimeDelta, Timelike, Utc}; - -use super::minute_to_slot; +use chrono::{DateTime, NaiveDate, NaiveDateTime, TimeDelta, Timelike, Utc}; #[derive(Debug, thiserror::Error)] pub enum TimeParseError { @@ -231,8 +229,8 @@ impl TimeRange { prefixes: &mut Vec<String>, ) { let mut push_prefix = |block: u32| { - if let Some(minute_slot) = minute_to_slot(block * data_granularity, data_granularity) { - let prefix = format!("{hour_prefix}minute={minute_slot}/"); + if let Ok(minute) = Minute::try_from(block * data_granularity) { + let prefix = format!("{hour_prefix}minute={}/", minute.to_slot(data_granularity)); prefixes.push(prefix); } }; @@ -266,6 +264,48 @@ impl TimeRange { } } +/// Describes a minute block +#[derive(Debug, Clone, Copy)] +pub struct Minute { + block: u32, +} + +impl TryFrom<u32> for Minute { + type Error = u32; + + /// Returns a Minute if block is an acceptable minute value, else returns it as is + fn try_from(block: u32) -> Result<Self, Self::Error> { + if block >= 60 { + return Err(block); + } + + Ok(Self { block }) + } +} + +impl From<NaiveDateTime> for Minute { + fn from(timestamp: NaiveDateTime) -> Self { + Self { + block: timestamp.minute(), + } + } +} + +impl Minute { + /// Convert minutes to a slot range + /// e.g. 
given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19" + pub fn to_slot(self, data_granularity: u32) -> String { + let block_n = self.block / data_granularity; + let block_start = block_n * data_granularity; + if data_granularity == 1 { + return format!("{block_start:02}"); + } + + let block_end = (block_n + 1) * data_granularity - 1; + format!("{block_start:02}-{block_end:02}") + } +} + #[cfg(test)] mod tests { use super::*; From 2a9b1894ef298865c20bb9976073228fa07df00d Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 15:22:32 +0530 Subject: [PATCH 2/6] fix: no custom-partitions --- src/parseable/streams.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index 6ac2443cc..22430ddde 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -157,15 +157,15 @@ impl Stream { hostname.push_str(id); } let filename = format!( - "{}{stream_hash}.{}.minute={}.{}.{hostname}.{ARROW_FILE_EXTENSION}", + "{}{stream_hash}.{}.minute={}.{}{hostname}.{ARROW_FILE_EXTENSION}", Utc::now().format("%Y%m%dT%H%M"), parsed_timestamp.format("date=%Y-%m-%d.hour=%H"), Minute::from(parsed_timestamp).to_slot(OBJECT_STORE_DATA_GRANULARITY), custom_partition_values .iter() .sorted_by_key(|v| v.0) - .map(|(key, value)| format!("{key}={value}")) - .join(".") + .map(|(key, value)| format!("{key}={value}.")) + .join("") ); self.data_path.join(filename) } From 8b4868082c04231c20b3de42da04fc708f6fb852 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 15:24:00 +0530 Subject: [PATCH 3/6] doc: `Minute` --- src/utils/time.rs | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/src/utils/time.rs b/src/utils/time.rs index a56f4bb29..b72924198 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -264,7 +264,16 @@ impl TimeRange { } } -/// Describes a minute block +/// Represents a minute value (0-59) and provides methods for converting 
it to a slot range. +/// +/// # Examples +/// +/// ``` +/// use crate::utils::time::Minute; +/// +/// let minute = Minute::try_from(15).unwrap(); +/// assert_eq!(minute.to_slot(10), "10-19"); +/// ``` #[derive(Debug, Clone, Copy)] pub struct Minute { block: u32, From 5a8dcb0b6ca8c262b11b5af61d2296c87c247ee5 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 15:36:17 +0530 Subject: [PATCH 4/6] test: minute and slotting --- src/utils/time.rs | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/src/utils/time.rs b/src/utils/time.rs index b72924198..0b2f5d993 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -470,4 +470,34 @@ mod tests { let left = prefixes.iter().map(String::as_str).collect::>(); assert_eq!(left.as_slice(), right); } + + #[test] + fn valid_minute_to_minute_slot() { + let res = Minute::try_from(10); + assert!(res.is_ok()); + assert_eq!(res.unwrap().to_slot(1), "10"); + } + + #[test] + fn invalid_minute() { + assert!(Minute::try_from(100).is_err()); + } + + #[test] + fn minute_from_timestamp() { + let timestamp = NaiveDateTime::parse_from_str("2025-01-01 02:03", "%Y-%m-%d %H:%M").unwrap(); + assert_eq!(Minute::from(timestamp).to_slot(1), "03"); + } + + #[test] + fn slot_5_min_from_timestamp() { + let timestamp = NaiveDateTime::parse_from_str("2025-01-01 02:03", "%Y-%m-%d %H:%M").unwrap(); + assert_eq!(Minute::from(timestamp).to_slot(5), "00-04"); + } + + #[test] + fn slot_30_min_from_timestamp() { + let timestamp = NaiveDateTime::parse_from_str("2025-01-01 02:33", "%Y-%m-%d %H:%M").unwrap(); + assert_eq!(Minute::from(timestamp).to_slot(30), "30-59"); + } } From 853a2fe8869ba2153907e587478637d6e248e855 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 15:45:48 +0530 Subject: [PATCH 5/6] panic on unacceptable granularity --- src/utils/time.rs | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/utils/time.rs b/src/utils/time.rs index 0b2f5d993..c325f71db 100644 --- 
a/src/utils/time.rs +++ b/src/utils/time.rs @@ -303,7 +303,11 @@ impl From<NaiveDateTime> for Minute { impl Minute { /// Convert minutes to a slot range /// e.g. given minute = 15 and OBJECT_STORE_DATA_GRANULARITY = 10 returns "10-19" + /// + /// ### PANICS + /// If 60 isn't cleanly divisible by the provided `data_granularity` value pub fn to_slot(self, data_granularity: u32) -> String { + assert!(data_granularity % 60 == 0); let block_n = self.block / data_granularity; let block_start = block_n * data_granularity; if data_granularity == 1 { @@ -500,4 +504,10 @@ mod tests { let timestamp = NaiveDateTime::parse_from_str("2025-01-01 02:33", "%Y-%m-%d %H:%M").unwrap(); assert_eq!(Minute::from(timestamp).to_slot(30), "30-59"); } + + #[test] + #[should_panic] + fn illegal_slot_granularity() { + Minute::try_from(0).unwrap().to_slot(40); + } } From 83f7756030cd6633735da7f19d8a6ebaf3696396 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 21 Feb 2025 16:05:18 +0530 Subject: [PATCH 6/6] fix: divisbility --- src/utils/time.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/utils/time.rs b/src/utils/time.rs index c325f71db..c1fd7a96b 100644 --- a/src/utils/time.rs +++ b/src/utils/time.rs @@ -307,7 +307,7 @@ impl Minute { /// ### PANICS /// If 60 isn't cleanly divisible by the provided `data_granularity` value pub fn to_slot(self, data_granularity: u32) -> String { - assert!(data_granularity % 60 == 0); + assert!(60 % data_granularity == 0); let block_n = self.block / data_granularity; let block_start = block_n * data_granularity; if data_granularity == 1 {