Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 3 additions & 7 deletions src/query/listing_table_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use object_store::{path::Path, ObjectMeta, ObjectStore};
use crate::{
event::DEFAULT_TIMESTAMP_KEY,
storage::{ObjectStorage, OBJECT_STORE_DATA_GRANULARITY},
utils::TimePeriod,
utils::time::TimeRange,
};

use super::PartialTimeFilter;
Expand Down Expand Up @@ -88,12 +88,8 @@ impl ListingTableBuilder {
};

// Generate prefixes for the given time range
let prefixes = TimePeriod::new(
start_time.and_utc(),
end_time.and_utc(),
OBJECT_STORE_DATA_GRANULARITY,
)
.generate_prefixes();
let prefixes = TimeRange::new(start_time.and_utc(), end_time.and_utc())
.generate_prefixes(OBJECT_STORE_DATA_GRANULARITY);

// Categorizes prefixes into "minute" and general resolve lists.
let mut minute_resolve = HashMap::<String, Vec<String>>::new();
Expand Down
232 changes: 1 addition & 231 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use crate::rbac::role::{Action, Permission};
use crate::rbac::Users;
use actix::extract_session_key_from_req;
use actix_web::HttpRequest;
use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime, Timelike, Utc};
use chrono::{NaiveDate, NaiveDateTime, NaiveTime, Utc};
use itertools::Itertools;
use regex::Regex;
use sha2::{Digest, Sha256};
Expand Down Expand Up @@ -85,153 +85,6 @@ pub fn minute_to_prefix(minute: u32, data_granularity: u32) -> Option<String> {
))
}

pub struct TimePeriod {
start: DateTime<Utc>,
end: DateTime<Utc>,
data_granularity: u32,
}

impl TimePeriod {
pub fn new(start: DateTime<Utc>, end: DateTime<Utc>, data_granularity: u32) -> Self {
Self {
data_granularity,
start,
end,
}
}

pub fn generate_prefixes(&self) -> Vec<String> {
let end_minute = self.end.minute() + u32::from(self.end.second() > 0);
self.generate_date_prefixes(
self.start.date_naive(),
self.end.date_naive(),
(self.start.hour(), self.start.minute()),
(self.end.hour(), end_minute),
)
}

pub fn generate_minute_prefixes(
&self,
prefix: &str,
start_minute: u32,
end_minute: u32,
) -> Vec<String> {
if start_minute == end_minute {
return vec![];
}

let (start_block, end_block) = (
start_minute / self.data_granularity,
end_minute / self.data_granularity,
);

let forbidden_block = 60 / self.data_granularity;

// ensure both start and end are within the same hour, else return prefix as is
if end_block - start_block >= forbidden_block {
return vec![prefix.to_owned()];
}

let mut prefixes = vec![];

let push_prefix = |block: u32, prefixes: &mut Vec<_>| {
if let Some(minute_prefix) =
minute_to_prefix(block * self.data_granularity, self.data_granularity)
{
let prefix = prefix.to_owned() + &minute_prefix;
prefixes.push(prefix);
}
};

for block in start_block..end_block {
push_prefix(block, &mut prefixes);
}

// NOTE: for block sizes larger than a minute ensure
// ensure last block is considered
if self.data_granularity > 1 {
push_prefix(end_block, &mut prefixes);
}

prefixes
}

pub fn generate_hour_prefixes(
&self,
prefix: &str,
start_hour: u32,
start_minute: u32,
end_hour: u32,
end_minute: u32,
) -> Vec<String> {
// ensure both start and end are within the same day
if end_hour - start_hour >= 24 {
return vec![prefix.to_owned()];
}

let mut prefixes = vec![];

for hour in start_hour..=end_hour {
if hour == 24 {
break;
}
let prefix = prefix.to_owned() + &hour_to_prefix(hour);
let is_start = hour == start_hour;
let is_end = hour == end_hour;

if is_start || is_end {
let minute_prefixes = self.generate_minute_prefixes(
&prefix,
if is_start { start_minute } else { 0 },
if is_end { end_minute } else { 60 },
);
prefixes.extend(minute_prefixes);
} else {
prefixes.push(prefix);
}
}

prefixes
}

pub fn generate_date_prefixes(
&self,
start_date: NaiveDate,
end_date: NaiveDate,
start_time: (u32, u32),
end_time: (u32, u32),
) -> Vec<String> {
let mut prefixes = vec![];
let mut date = start_date;

while date <= end_date {
let prefix = date_to_prefix(date);
let is_start = date == start_date;
let is_end = date == end_date;

if is_start || is_end {
let ((start_hour, start_minute), (end_hour, end_minute)) = (
if is_start { start_time } else { (0, 0) },
if is_end { end_time } else { (24, 60) },
);
let hour_prefixes = self.generate_hour_prefixes(
&prefix,
start_hour,
start_minute,
end_hour,
end_minute,
);
prefixes.extend(hour_prefixes);
} else {
prefixes.push(prefix);
}
date = date.succ_opt().unwrap();
}

prefixes
}
}

pub fn get_url() -> Url {
if CONFIG.options.ingestor_endpoint.is_empty() {
return format!(
Expand Down Expand Up @@ -373,86 +226,3 @@ pub fn user_auth_for_query(

Ok(())
}

#[cfg(test)]
mod tests {
use chrono::DateTime;
use rstest::*;

use super::TimePeriod;

fn time_period_from_str(start: &str, end: &str) -> TimePeriod {
TimePeriod::new(
DateTime::parse_from_rfc3339(start).unwrap().into(),
DateTime::parse_from_rfc3339(end).unwrap().into(),
1,
)
}

#[rstest]
#[case::same_minute(
"2022-06-11T16:30:00+00:00", "2022-06-11T16:30:59+00:00",
&["date=2022-06-11/hour=16/minute=30/"]
)]
#[case::same_hour_different_minute(
"2022-06-11T16:57:00+00:00", "2022-06-11T16:59:00+00:00",
&[
"date=2022-06-11/hour=16/minute=57/",
"date=2022-06-11/hour=16/minute=58/"
]
)]
#[case::same_hour_with_00_to_59_minute_block(
"2022-06-11T16:00:00+00:00", "2022-06-11T16:59:59+00:00",
&["date=2022-06-11/hour=16/"]
)]
#[case::same_date_different_hours_coherent_minute(
"2022-06-11T15:00:00+00:00", "2022-06-11T17:00:00+00:00",
&[
"date=2022-06-11/hour=15/",
"date=2022-06-11/hour=16/"
]
)]
#[case::same_date_different_hours_incoherent_minutes(
"2022-06-11T15:59:00+00:00", "2022-06-11T16:01:00+00:00",
&[
"date=2022-06-11/hour=15/minute=59/",
"date=2022-06-11/hour=16/minute=00/"
]
)]
#[case::same_date_different_hours_whole_hours_between_incoherent_minutes(
"2022-06-11T15:59:00+00:00", "2022-06-11T17:01:00+00:00",
&[
"date=2022-06-11/hour=15/minute=59/",
"date=2022-06-11/hour=16/",
"date=2022-06-11/hour=17/minute=00/"
]
)]
#[case::different_date_coherent_hours_and_minutes(
"2022-06-11T00:00:00+00:00", "2022-06-13T00:00:00+00:00",
&[
"date=2022-06-11/",
"date=2022-06-12/"
]
)]
#[case::different_date_incoherent_hours_coherent_minutes(
"2022-06-11T23:00:01+00:00", "2022-06-12T01:59:59+00:00",
&[
"date=2022-06-11/hour=23/",
"date=2022-06-12/hour=00/",
"date=2022-06-12/hour=01/"
]
)]
#[case::different_date_incoherent_hours_incoherent_minutes(
"2022-06-11T23:59:59+00:00", "2022-06-12T00:01:00+00:00",
&[
"date=2022-06-11/hour=23/minute=59/",
"date=2022-06-12/hour=00/minute=00/"
]
)]
fn prefix_generation(#[case] start: &str, #[case] end: &str, #[case] right: &[&str]) {
let time_period = time_period_from_str(start, end);
let prefixes = time_period.generate_prefixes();
let left = prefixes.iter().map(String::as_str).collect::<Vec<&str>>();
assert_eq!(left.as_slice(), right);
}
}
Loading
Loading