diff --git a/server/src/catalog.rs b/server/src/catalog.rs
index f44159612..c4c720933 100644
--- a/server/src/catalog.rs
+++ b/server/src/catalog.rs
@@ -210,7 +210,7 @@ pub async fn get_first_event(
     // get current snapshot
     let mut meta = storage.get_object_store_format(stream_name).await?;
     let manifests = &mut meta.snapshot.manifest_list;
-
+    let time_partition = meta.time_partition;
     if manifests.is_empty() {
         log::info!("No manifest found for stream {stream_name}");
         return Err(ObjectStorageError::Custom("No manifest found".to_string()));
@@ -232,9 +232,15 @@ pub async fn get_first_event(
     };
 
     if let Some(first_event) = manifest.files.first() {
-        let (lower_bound, _) = get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
-        let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
-        return Ok(Some(first_event_at));
+        if let Some(time_partition) = time_partition {
+            let (lower_bound, _) = get_file_bounds(first_event, time_partition);
+            let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
+            return Ok(Some(first_event_at));
+        } else {
+            let (lower_bound, _) = get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
+            let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
+            return Ok(Some(first_event_at));
+        }
     }
     Ok(None)
 }
diff --git a/server/src/metadata.rs b/server/src/metadata.rs
index a6e7f601c..b20478dbc 100644
--- a/server/src/metadata.rs
+++ b/server/src/metadata.rs
@@ -92,6 +92,13 @@ impl StreamInfo {
             .map(|metadata| metadata.cache_enabled)
     }
 
+    pub fn get_time_partition(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
+        let map = self.read().expect(LOCK_EXPECT);
+        map.get(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| metadata.time_partition.clone())
+    }
+
     pub fn set_stream_cache(&self, stream_name: &str, enable: bool) -> Result<(), MetadataError> {
         let mut map = self.write().expect(LOCK_EXPECT);
         let stream = map
diff --git a/server/src/query.rs b/server/src/query.rs
index 120143f6a..9c9d45570 100644
--- a/server/src/query.rs
+++ b/server/src/query.rs
@@ -130,6 +130,7 @@ impl Query {
         // this can be eliminated in later version of datafusion but with slight caveat
         // transform cannot modify stringified plans by itself
         // we by knowing this plan is not in the optimization procees chose to overwrite the stringified plan
+
         match self.raw_logical_plan.clone() {
             LogicalPlan::Explain(plan) => {
                 let transformed = transform(
@@ -221,7 +222,7 @@ fn transform(
         .clone();
 
     let mut new_filters = vec![];
-    if !table_contains_any_time_filters(&table) {
+    if !table_contains_any_time_filters(&table, &time_partition) {
         let mut _start_time_filter: Expr;
         let mut _end_time_filter: Expr;
         match time_partition {
@@ -276,7 +277,10 @@ fn transform(
     .expect("transform only transforms the tablescan")
 }
 
-fn table_contains_any_time_filters(table: &datafusion::logical_expr::TableScan) -> bool {
+fn table_contains_any_time_filters(
+    table: &datafusion::logical_expr::TableScan,
+    time_partition: &Option<String>,
+) -> bool {
     table
         .filters
         .iter()
@@ -287,7 +291,11 @@ fn table_contains_any_time_filters(table: &datafusion::logical_expr::TableScan)
                 None
             }
         })
-        .any(|expr| matches!(&*expr.left, Expr::Column(Column { name, .. }) if (name == event::DEFAULT_TIMESTAMP_KEY)))
+        .any(|expr| {
+            matches!(&*expr.left, Expr::Column(Column { name, .. })
+            if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) ||
+            (!time_partition.is_some() && name == event::DEFAULT_TIMESTAMP_KEY)))
+        })
 }
 
 #[allow(dead_code)]
diff --git a/server/src/query/listing_table_builder.rs b/server/src/query/listing_table_builder.rs
index 59bf05a3a..669d53c61 100644
--- a/server/src/query/listing_table_builder.rs
+++ b/server/src/query/listing_table_builder.rs
@@ -25,7 +25,7 @@ use datafusion::{
         listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
     },
     error::DataFusionError,
-    logical_expr::col,
+    logical_expr::{col, Expr},
 };
 use futures_util::{future, stream::FuturesUnordered, Future, TryStreamExt};
 use itertools::Itertools;
@@ -183,13 +183,19 @@ impl ListingTableBuilder {
         self,
         schema: Arc<Schema>,
         map: impl Fn(Vec<String>) -> Vec<ListingTableUrl>,
+        time_partition: Option<String>,
     ) -> Result<Option<Arc<ListingTable>>, DataFusionError> {
         if self.listing.is_empty() {
             return Ok(None);
         }
-
+        let file_sort_order: Vec<Vec<Expr>>;
         let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
-        let file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]];
+        if let Some(time_partition) = time_partition {
+            file_sort_order = vec![vec![col(time_partition).sort(true, false)]];
+        } else {
+            file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]];
+        }
+
         let listing_options = ListingOptions::new(Arc::new(file_format))
             .with_file_extension(".parquet")
             .with_file_sort_order(file_sort_order)
diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs
index 3a99b3623..5599dce53 100644
--- a/server/src/query/stream_schema_provider.rs
+++ b/server/src/query/stream_schema_provider.rs
@@ -16,8 +16,6 @@
  *
  */
 
-use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
-
 use arrow_array::RecordBatch;
 use arrow_schema::{Schema, SchemaRef, SortOptions};
 use bytes::Bytes;
@@ -46,6 +44,7 @@ use datafusion::{
 use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
 use itertools::Itertools;
 use object_store::{path::Path, ObjectStore};
+use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
 use url::Url;
 
 use crate::{
@@ -114,6 +113,7 @@ async fn create_parquet_physical_plan(
     filters: &[Expr],
     limit: Option<usize>,
     state: &SessionState,
+    time_partition: Option<String>,
 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
     let filters = if let Some(expr) = conjunction(filters.to_vec()) {
         let table_df_schema = schema.as_ref().clone().to_dfschema()?;
@@ -125,14 +125,18 @@ async fn create_parquet_physical_plan(
     };
 
     let sort_expr = PhysicalSortExpr {
-        expr: physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &schema)?,
+        expr: if let Some(time_partition) = time_partition {
+            physical_plan::expressions::col(&time_partition, &schema)?
+        } else {
+            physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &schema)?
+        },
         options: SortOptions {
             descending: true,
             nulls_first: true,
         },
     };
-
     let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
+
     // create the execution plan
     let plan = file_format
         .create_physical_plan(
@@ -151,7 +155,6 @@ async fn create_parquet_physical_plan(
             filters.as_ref(),
         )
         .await?;
-
     Ok(plan)
 }
 
@@ -209,7 +212,6 @@ fn partitioned_files(
     let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
     let mut column_statistics = HashMap::<String, Option<TypedStatistics>>::new();
     let mut count = 0;
-
     for (index, file) in manifest_files
         .into_iter()
         .enumerate()
@@ -221,7 +223,6 @@ fn partitioned_files(
             columns,
             ..
         } = file;
-
         partitioned_files[index].push(PartitionedFile::new(file_path, file.file_size));
         columns.into_iter().for_each(|col| {
             column_statistics
@@ -235,7 +236,6 @@ fn partitioned_files(
             });
         count += num_rows;
     }
-
     let statistics = table_schema
         .fields()
         .iter()
@@ -304,7 +304,7 @@ impl TableProvider for StandardTableProvider {
             return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string()));
         }
 
-        if include_now(filters, time_partition) {
+        if include_now(filters, time_partition.clone()) {
             if let Some(records) =
                 event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema)
             {
@@ -333,6 +333,7 @@ impl TableProvider for StandardTableProvider {
                 projection,
                 filters,
                 limit,
+                time_partition.clone(),
             )
             .await;
         }
@@ -375,6 +376,7 @@ impl TableProvider for StandardTableProvider {
                 filters,
                 limit,
                 state,
+                time_partition.clone(),
             )
             .await?;
 
@@ -400,6 +402,7 @@ impl TableProvider for StandardTableProvider {
                 filters,
                 limit,
                 state,
+                time_partition.clone(),
             )
             .await?;
 
@@ -437,11 +440,16 @@ async fn legacy_listing_table(
     projection: Option<&Vec<usize>>,
     filters: &[Expr],
     limit: Option<usize>,
+    time_partition: Option<String>,
 ) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
     let remote_table = ListingTableBuilder::new(stream)
         .populate_via_listing(glob_storage.clone(), object_store, time_filters)
         .and_then(|builder| async {
-            let table = builder.build(schema.clone(), |x| glob_storage.query_prefixes(x))?;
+            let table = builder.build(
+                schema.clone(),
+                |x| glob_storage.query_prefixes(x),
+                time_partition,
+            )?;
             let res = match table {
                 Some(table) => Some(table.scan(state, projection, filters, limit).await?),
                 _ => None,
@@ -459,6 +467,7 @@ fn final_plan(
     schema: Arc<Schema>,
 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
     let mut execution_plans = execution_plans.into_iter().flatten().collect_vec();
+
     let exec: Arc<dyn ExecutionPlan> = if execution_plans.is_empty() {
         let schema = match projection {
             Some(projection) => Arc::new(schema.project(projection)?),
@@ -470,7 +479,6 @@ fn final_plan(
         }
     } else {
         Arc::new(UnionExec::new(execution_plans))
     };
-
     Ok(exec)
 }
 
@@ -557,17 +565,6 @@ impl PartialTimeFilter {
             ))))),
         ))
     }
-
-    fn is_greater_than(&self, other: &NaiveDateTime) -> bool {
-        match self {
-            PartialTimeFilter::Low(Bound::Excluded(time)) => time >= other,
-            PartialTimeFilter::Low(Bound::Included(time))
-            | PartialTimeFilter::High(Bound::Excluded(time))
-            | PartialTimeFilter::High(Bound::Included(time)) => time > other,
-            PartialTimeFilter::Eq(time) => time > other,
-            _ => unimplemented!(),
-        }
-    }
 }
 
 fn is_overlapping_query(
@@ -574,17 +571,27 @@ fn is_overlapping_query(
     manifest_list: &[ManifestItem],
     time_filters: &[PartialTimeFilter],
 ) -> bool {
     // This is for backwards compatiblity. Older table format relies on listing.
-    // if the time is lower than upper bound of first file then we consider it overlapping
-    let Some(first_entry_upper_bound) =
-        manifest_list.iter().map(|file| file.time_upper_bound).min()
+    // if the start time is lower than lower bound of first file then we consider it overlapping
+    let Some(first_entry_lower_bound) =
+        manifest_list.iter().map(|file| file.time_lower_bound).min()
     else {
         return true;
     };
 
-    !time_filters
-        .iter()
-        .all(|filter| filter.is_greater_than(&first_entry_upper_bound.naive_utc()))
+    for filter in time_filters {
+        match filter {
+            PartialTimeFilter::Low(Bound::Excluded(time))
+            | PartialTimeFilter::Low(Bound::Included(time)) => {
+                if time < &first_entry_lower_bound.naive_utc() {
+                    return true;
+                }
+            }
+            _ => {}
+        }
+    }
+
+    false
 }
 
 fn include_now(filters: &[Expr], time_partition: Option<String>) -> bool {
@@ -862,7 +869,7 @@ mod tests {
         let res = is_overlapping_query(
             &manifest_items(),
             &[PartialTimeFilter::Low(std::ops::Bound::Included(
-                datetime_min(2023, 12, 15).naive_utc(),
+                datetime_min(2023, 12, 14).naive_utc(),
             ))],
         );
 
@@ -874,7 +881,7 @@ mod tests {
         let res = is_overlapping_query(
             &manifest_items(),
             &[PartialTimeFilter::Low(std::ops::Bound::Included(
-                datetime_min(2023, 12, 15)
+                datetime_min(2023, 12, 14)
                     .naive_utc()
                     .add(Duration::hours(3)),
             ))],
diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs
index 9f016f5a9..ddd057ebb 100644
--- a/server/src/storage/object_storage.rs
+++ b/server/src/storage/object_storage.rs
@@ -335,8 +335,11 @@ pub trait ObjectStorage: Sync + 'static {
             let cache_enabled = STREAM_INFO
                 .cache_enabled(stream)
                 .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
+            let time_partition = STREAM_INFO
+                .get_time_partition(stream)
+                .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
             let dir = StorageDir::new(stream);
-            let schema = convert_disk_files_to_parquet(stream, &dir)
+            let schema = convert_disk_files_to_parquet(stream, &dir, time_partition)
                 .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
 
             if let Some(schema) = schema {
diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs
index c07d0b24b..5388d130d 100644
--- a/server/src/storage/staging.rs
+++ b/server/src/storage/staging.rs
@@ -43,7 +43,7 @@ use crate::{
     storage::OBJECT_STORE_DATA_GRANULARITY,
     utils::{self, arrow::merged_reader::MergedReverseRecordReader},
 };
-use rand::Rng;
+use rand::distributions::DistString;
 
 const ARROW_FILE_EXTENSION: &str = "data.arrows";
 const PARQUET_FILE_EXTENSION: &str = "data.parquet";
@@ -163,10 +163,10 @@ impl StorageDir {
 
 fn arrow_path_to_parquet(path: &Path) -> PathBuf {
     let file_stem = path.file_stem().unwrap().to_str().unwrap();
-    let mut rng = rand::thread_rng();
-    let random_number: u64 = rng.gen();
+    let random_string =
+        rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 20);
     let (_, filename) = file_stem.split_once('.').unwrap();
-    let filename_with_random_number = format!("{}.{}.{}", filename, random_number, "arrows");
+    let filename_with_random_number = format!("{}.{}.{}", filename, random_string, "arrows");
     let mut parquet_path = path.to_owned();
     parquet_path.set_file_name(filename_with_random_number);
     parquet_path.set_extension("parquet");
@@ -185,6 +185,7 @@ pub fn to_parquet_path(stream_name: &str, time: NaiveDateTime) -> PathBuf {
 pub fn convert_disk_files_to_parquet(
     stream: &str,
     dir: &StorageDir,
+    time_partition: Option<String>,
 ) -> Result<Option<Schema>, MoveDataError> {
     let mut schemas = Vec::new();
 
@@ -209,10 +210,15 @@ pub fn convert_disk_files_to_parquet(
         }
 
         let record_reader = MergedReverseRecordReader::try_new(&files).unwrap();
+        let merged_schema = record_reader.merged_schema();
+        let mut index_time_partition: usize = 0;
+        if let Some(time_partition) = time_partition.as_ref() {
+            index_time_partition = merged_schema.index_of(time_partition).unwrap();
+        }
 
         let parquet_file = fs::File::create(&parquet_path).map_err(|_| MoveDataError::Create)?;
-        let props = parquet_writer_props().build();
-        let merged_schema = record_reader.merged_schema();
+        let props = parquet_writer_props(time_partition.clone(), index_time_partition).build();
+
         schemas.push(merged_schema.clone());
         let schema = Arc::new(merged_schema);
         let mut writer = ArrowWriter::try_new(parquet_file, schema.clone(), Some(props))?;
@@ -238,19 +244,39 @@ pub fn convert_disk_files_to_parquet(
     }
 }
 
-fn parquet_writer_props() -> WriterPropertiesBuilder {
-    WriterProperties::builder()
-        .set_max_row_group_size(CONFIG.parseable.row_group_size)
-        .set_compression(CONFIG.parseable.parquet_compression.into())
-        .set_column_encoding(
-            ColumnPath::new(vec![DEFAULT_TIMESTAMP_KEY.to_string()]),
-            Encoding::DELTA_BINARY_PACKED,
-        )
-        .set_sorting_columns(Some(vec![SortingColumn {
-            column_idx: 0,
-            descending: true,
-            nulls_first: true,
-        }]))
+fn parquet_writer_props(
+    time_partition: Option<String>,
+    index_time_partition: usize,
+) -> WriterPropertiesBuilder {
+    let index_time_partition: i32 = index_time_partition as i32;
+
+    if let Some(time_partition) = time_partition {
+        WriterProperties::builder()
+            .set_max_row_group_size(CONFIG.parseable.row_group_size)
+            .set_compression(CONFIG.parseable.parquet_compression.into())
+            .set_column_encoding(
+                ColumnPath::new(vec![time_partition]),
+                Encoding::DELTA_BYTE_ARRAY,
+            )
+            .set_sorting_columns(Some(vec![SortingColumn {
+                column_idx: index_time_partition,
+                descending: true,
+                nulls_first: true,
+            }]))
+    } else {
+        WriterProperties::builder()
+            .set_max_row_group_size(CONFIG.parseable.row_group_size)
+            .set_compression(CONFIG.parseable.parquet_compression.into())
+            .set_column_encoding(
+                ColumnPath::new(vec![DEFAULT_TIMESTAMP_KEY.to_string()]),
+                Encoding::DELTA_BINARY_PACKED,
+            )
+            .set_sorting_columns(Some(vec![SortingColumn {
+                column_idx: index_time_partition,
+                descending: true,
+                nulls_first: true,
+            }]))
+    }
 }
 
 #[derive(Debug, thiserror::Error)]
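The recurring choice across these hunks is to sort and encode on the stream's configured time-partition column when one exists, and to fall back to the default ingestion timestamp key otherwise. A minimal standalone sketch of that fallback follows; it is illustrative only: the `sort_column` helper is hypothetical (the patch inlines this decision at each call site), and `"p_timestamp"` is only assumed here as the value of `DEFAULT_TIMESTAMP_KEY`.

// Hypothetical helper, not part of the patch.
const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; // assumed value of the real constant

fn sort_column(time_partition: Option<&str>) -> &str {
    // Prefer the user-configured time partition column; otherwise use the
    // server-generated ingestion timestamp key.
    time_partition.unwrap_or(DEFAULT_TIMESTAMP_KEY)
}

fn main() {
    // Stream created with a time partition column:
    assert_eq!(sort_column(Some("event_time")), "event_time");
    // Stream without one falls back to the default key:
    assert_eq!(sort_column(None), DEFAULT_TIMESTAMP_KEY);
}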