Commit a04dd4f

fix: Query Optimization (#696)
- Modified the function that checks whether a query's start time falls before the first manifest's lower bound, so the server answers count(*) from manifests when the start time is at or after the manifest creation date
- Fixed ingestion with a time partition
- Fixed queries with a time partition
- Fixed query optimization
- Fixed the get_first_event call for time partition
- Changed the random-number generation logic used when creating a parquet file name
1 parent 94c95fc commit a04dd4f
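
The first point is the core of the change: manifest entries already carry per-file row counts (see `count += num_rows` in `partitioned_files` below), so once a query's range is known to start at or after the earliest manifest lower bound, a `count(*)` can be served from those counts instead of listing and scanning parquet files. A minimal illustration of that idea, with hypothetical stand-in types rather than the server's own:

```rust
// Illustration only: manifest entries carry row counts, so a count(*) over a
// time range fully covered by manifests reduces to a sum. `ManifestEntry` is
// a stand-in, not Parseable's actual type.
struct ManifestEntry {
    num_rows: u64,
}

fn count_from_manifests(entries: &[ManifestEntry]) -> u64 {
    entries.iter().map(|e| e.num_rows).sum()
}

fn main() {
    let entries = [ManifestEntry { num_rows: 1_000 }, ManifestEntry { num_rows: 2_500 }];
    assert_eq!(count_from_manifests(&entries), 3_500);
}
```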

7 files changed, +123 -60 lines

server/src/catalog.rs

Lines changed: 10 additions & 4 deletions
```diff
@@ -210,7 +210,7 @@ pub async fn get_first_event(
     // get current snapshot
     let mut meta = storage.get_object_store_format(stream_name).await?;
     let manifests = &mut meta.snapshot.manifest_list;
-
+    let time_partition = meta.time_partition;
     if manifests.is_empty() {
         log::info!("No manifest found for stream {stream_name}");
         return Err(ObjectStorageError::Custom("No manifest found".to_string()));
@@ -232,9 +232,15 @@ pub async fn get_first_event(
     };

     if let Some(first_event) = manifest.files.first() {
-        let (lower_bound, _) = get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
-        let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
-        return Ok(Some(first_event_at));
+        if let Some(time_partition) = time_partition {
+            let (lower_bound, _) = get_file_bounds(first_event, time_partition);
+            let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
+            return Ok(Some(first_event_at));
+        } else {
+            let (lower_bound, _) = get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
+            let first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
+            return Ok(Some(first_event_at));
+        }
     }
     Ok(None)
 }
```
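
Both arms of the new `if let` differ only in which column name is handed to `get_file_bounds`. A possible simplification, sketched here under the assumption that the fallback constant is the server's `DEFAULT_TIMESTAMP_KEY` (`file_lower_bound` is a hypothetical stand-in for `get_file_bounds`):

```rust
use chrono::{DateTime, Local, Utc};

const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; // assumed value of the server's constant

// Hypothetical stand-in: the server reads this lower bound from the
// manifest's column statistics for the given column.
fn file_lower_bound(_column: &str) -> DateTime<Utc> {
    Utc::now()
}

// The column choice can be factored out ahead of a single call, since the
// two committed branches are otherwise identical.
fn first_event_at(time_partition: Option<String>) -> String {
    let column = time_partition.unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.to_string());
    file_lower_bound(&column).with_timezone(&Local).to_rfc3339()
}

fn main() {
    println!("{}", first_event_at(None));
    println!("{}", first_event_at(Some("event_ts".to_string())));
}
```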

server/src/metadata.rs

Lines changed: 7 additions & 0 deletions
```diff
@@ -92,6 +92,13 @@ impl StreamInfo {
             .map(|metadata| metadata.cache_enabled)
     }

+    pub fn get_time_partition(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
+        let map = self.read().expect(LOCK_EXPECT);
+        map.get(stream_name)
+            .ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
+            .map(|metadata| metadata.time_partition.clone())
+    }
+
     pub fn set_stream_cache(&self, stream_name: &str, enable: bool) -> Result<(), MetadataError> {
         let mut map = self.write().expect(LOCK_EXPECT);
         let stream = map
```
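
The new accessor follows the same pattern as `cache_enabled`: take a read lock on the stream map, look up the stream, and clone the field out. A standalone sketch of that pattern, with stand-in types in place of the server's stream metadata:

```rust
use std::collections::HashMap;
use std::sync::RwLock;

// Stand-ins for the server's metadata and error types.
#[derive(Debug)]
struct StreamMetaNotFound(String);

struct Metadata {
    time_partition: Option<String>,
}

struct StreamInfo(RwLock<HashMap<String, Metadata>>);

impl StreamInfo {
    fn get_time_partition(&self, stream_name: &str) -> Result<Option<String>, StreamMetaNotFound> {
        // read lock only: the accessor never mutates the map
        let map = self.0.read().expect("RwLock poisoned");
        map.get(stream_name)
            .ok_or(StreamMetaNotFound(stream_name.to_string()))
            .map(|metadata| metadata.time_partition.clone())
    }
}

fn main() {
    let info = StreamInfo(RwLock::new(HashMap::from([(
        "app_logs".to_string(),
        Metadata { time_partition: Some("event_ts".to_string()) },
    )])));
    assert_eq!(
        info.get_time_partition("app_logs").unwrap(),
        Some("event_ts".to_string())
    );
}
```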

server/src/query.rs

Lines changed: 11 additions & 3 deletions
```diff
@@ -130,6 +130,7 @@ impl Query {
         // this can be eliminated in later version of datafusion but with slight caveat
         // transform cannot modify stringified plans by itself
         // we by knowing this plan is not in the optimization procees chose to overwrite the stringified plan
+
         match self.raw_logical_plan.clone() {
             LogicalPlan::Explain(plan) => {
                 let transformed = transform(
@@ -221,7 +222,7 @@ fn transform(
         .clone();

     let mut new_filters = vec![];
-    if !table_contains_any_time_filters(&table) {
+    if !table_contains_any_time_filters(&table, &time_partition) {
         let mut _start_time_filter: Expr;
         let mut _end_time_filter: Expr;
         match time_partition {
@@ -276,7 +277,10 @@ fn transform(
         .expect("transform only transforms the tablescan")
 }

-fn table_contains_any_time_filters(table: &datafusion::logical_expr::TableScan) -> bool {
+fn table_contains_any_time_filters(
+    table: &datafusion::logical_expr::TableScan,
+    time_partition: &Option<String>,
+) -> bool {
     table
         .filters
         .iter()
@@ -287,7 +291,11 @@ fn table_contains_any_time_filters(table: &datafusion::logical_expr::TableScan)
                 None
             }
         })
-        .any(|expr| matches!(&*expr.left, Expr::Column(Column { name, .. }) if (name == event::DEFAULT_TIMESTAMP_KEY)))
+        .any(|expr| {
+            matches!(&*expr.left, Expr::Column(Column { name, .. })
+                if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) ||
+                (!time_partition.is_some() && name == event::DEFAULT_TIMESTAMP_KEY)))
+        })
 }

 #[allow(dead_code)]
```
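
The updated predicate treats the time-partition column, when one is set, as the time filter column, and falls back to the default timestamp key otherwise. A standalone sketch of that column check; `map_or` expresses the same fallback as the committed `is_some()`/`unwrap()` pair (the key value is an assumption):

```rust
const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; // assumed default key

fn is_time_filter_column(name: &str, time_partition: &Option<String>) -> bool {
    time_partition
        .as_deref()
        .map_or(name == DEFAULT_TIMESTAMP_KEY, |partition| name == partition)
}

fn main() {
    assert!(is_time_filter_column("p_timestamp", &None));
    assert!(is_time_filter_column("event_ts", &Some("event_ts".to_string())));
    // with a time partition set, the default key no longer counts as a time filter
    assert!(!is_time_filter_column("p_timestamp", &Some("event_ts".to_string())));
}
```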

server/src/query/listing_table_builder.rs

Lines changed: 9 additions & 3 deletions
```diff
@@ -25,7 +25,7 @@ use datafusion::{
         listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
     },
     error::DataFusionError,
-    logical_expr::col,
+    logical_expr::{col, Expr},
 };
 use futures_util::{future, stream::FuturesUnordered, Future, TryStreamExt};
 use itertools::Itertools;
@@ -183,13 +183,19 @@ impl ListingTableBuilder {
         self,
         schema: Arc<Schema>,
         map: impl Fn(Vec<String>) -> Vec<ListingTableUrl>,
+        time_partition: Option<String>,
     ) -> Result<Option<Arc<ListingTable>>, DataFusionError> {
         if self.listing.is_empty() {
             return Ok(None);
         }
-
+        let file_sort_order: Vec<Vec<Expr>>;
         let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
-        let file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]];
+        if let Some(time_partition) = time_partition {
+            file_sort_order = vec![vec![col(time_partition).sort(true, false)]];
+        } else {
+            file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]];
+        }
+
         let listing_options = ListingOptions::new(Arc::new(file_format))
             .with_file_extension(".parquet")
             .with_file_sort_order(file_sort_order)
```
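
The deferred `let file_sort_order` plus `if let` can also be written as a single expression. A sketch of that alternative, assuming the same DataFusion version as the server (where `col` accepts a `String` and `Expr::sort` takes ascending and nulls-first flags, as the diff itself relies on):

```rust
use datafusion::logical_expr::{col, Expr};

const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp"; // assumed fallback key

fn file_sort_order(time_partition: Option<String>) -> Vec<Vec<Expr>> {
    let sort_col = time_partition.unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.to_string());
    // ascending with nulls last, matching .sort(true, false) in the diff
    vec![vec![col(sort_col).sort(true, false)]]
}

fn main() {
    assert_eq!(file_sort_order(None).len(), 1);
    assert_eq!(file_sort_order(Some("event_ts".to_string())).len(), 1);
}
```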

server/src/query/stream_schema_provider.rs

Lines changed: 37 additions & 30 deletions
```diff
@@ -16,8 +16,6 @@
  *
  */

-use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
-
 use arrow_array::RecordBatch;
 use arrow_schema::{Schema, SchemaRef, SortOptions};
 use bytes::Bytes;
@@ -46,6 +44,7 @@ use datafusion::{
 use futures_util::{stream::FuturesOrdered, StreamExt, TryFutureExt, TryStreamExt};
 use itertools::Itertools;
 use object_store::{path::Path, ObjectStore};
+use std::{any::Any, collections::HashMap, ops::Bound, sync::Arc};
 use url::Url;

 use crate::{
@@ -114,6 +113,7 @@ async fn create_parquet_physical_plan(
     filters: &[Expr],
     limit: Option<usize>,
     state: &SessionState,
+    time_partition: Option<String>,
 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
     let filters = if let Some(expr) = conjunction(filters.to_vec()) {
         let table_df_schema = schema.as_ref().clone().to_dfschema()?;
@@ -125,14 +125,18 @@
     };

     let sort_expr = PhysicalSortExpr {
-        expr: physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &schema)?,
+        expr: if let Some(time_partition) = time_partition {
+            physical_plan::expressions::col(&time_partition, &schema)?
+        } else {
+            physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &schema)?
+        },
         options: SortOptions {
             descending: true,
             nulls_first: true,
         },
     };
-
     let file_format = ParquetFormat::default().with_enable_pruning(Some(true));
+
     // create the execution plan
     let plan = file_format
         .create_physical_plan(
@@ -151,7 +155,6 @@
             filters.as_ref(),
         )
         .await?;
-
     Ok(plan)
 }

@@ -209,7 +212,6 @@ fn partitioned_files(
     let mut partitioned_files = Vec::from_iter((0..target_partition).map(|_| Vec::new()));
     let mut column_statistics = HashMap::<String, Option<catalog::column::TypedStatistics>>::new();
     let mut count = 0;
-
     for (index, file) in manifest_files
         .into_iter()
         .enumerate()
@@ -221,7 +223,6 @@
             columns,
             ..
         } = file;
-
         partitioned_files[index].push(PartitionedFile::new(file_path, file.file_size));
         columns.into_iter().for_each(|col| {
             column_statistics
@@ -235,7 +236,6 @@
         });
         count += num_rows;
     }
-
     let statistics = table_schema
         .fields()
         .iter()
@@ -304,7 +304,7 @@ impl TableProvider for StandardTableProvider {
             return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string()));
         }

-        if include_now(filters, time_partition) {
+        if include_now(filters, time_partition.clone()) {
            if let Some(records) =
                event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema)
            {
@@ -333,6 +333,7 @@ impl TableProvider for StandardTableProvider {
                 projection,
                 filters,
                 limit,
+                time_partition.clone(),
             )
             .await;
         }
@@ -375,6 +376,7 @@ impl TableProvider for StandardTableProvider {
                 filters,
                 limit,
                 state,
+                time_partition.clone(),
             )
             .await?;

@@ -400,6 +402,7 @@ impl TableProvider for StandardTableProvider {
             filters,
             limit,
             state,
+            time_partition.clone(),
         )
         .await?;

@@ -437,11 +440,16 @@ async fn legacy_listing_table(
     projection: Option<&Vec<usize>>,
     filters: &[Expr],
     limit: Option<usize>,
+    time_partition: Option<String>,
 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
     let remote_table = ListingTableBuilder::new(stream)
         .populate_via_listing(glob_storage.clone(), object_store, time_filters)
         .and_then(|builder| async {
-            let table = builder.build(schema.clone(), |x| glob_storage.query_prefixes(x))?;
+            let table = builder.build(
+                schema.clone(),
+                |x| glob_storage.query_prefixes(x),
+                time_partition,
+            )?;
             let res = match table {
                 Some(table) => Some(table.scan(state, projection, filters, limit).await?),
                 _ => None,
@@ -459,6 +467,7 @@ fn final_plan(
     schema: Arc<Schema>,
 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
     let mut execution_plans = execution_plans.into_iter().flatten().collect_vec();
+
     let exec: Arc<dyn ExecutionPlan> = if execution_plans.is_empty() {
         let schema = match projection {
             Some(projection) => Arc::new(schema.project(projection)?),
@@ -470,7 +479,6 @@
     } else {
         Arc::new(UnionExec::new(execution_plans))
     };
-
     Ok(exec)
 }

@@ -557,34 +565,33 @@ impl PartialTimeFilter {
         ))))),
         ))
     }
-
-    fn is_greater_than(&self, other: &NaiveDateTime) -> bool {
-        match self {
-            PartialTimeFilter::Low(Bound::Excluded(time)) => time >= other,
-            PartialTimeFilter::Low(Bound::Included(time))
-            | PartialTimeFilter::High(Bound::Excluded(time))
-            | PartialTimeFilter::High(Bound::Included(time)) => time > other,
-            PartialTimeFilter::Eq(time) => time > other,
-            _ => unimplemented!(),
-        }
-    }
 }

 fn is_overlapping_query(
     manifest_list: &[ManifestItem],
     time_filters: &[PartialTimeFilter],
 ) -> bool {
     // This is for backwards compatiblity. Older table format relies on listing.
-    // if the time is lower than upper bound of first file then we consider it overlapping
-    let Some(first_entry_upper_bound) =
-        manifest_list.iter().map(|file| file.time_upper_bound).min()
+    // if the start time is lower than lower bound of first file then we consider it overlapping
+    let Some(first_entry_lower_bound) =
+        manifest_list.iter().map(|file| file.time_lower_bound).min()
     else {
         return true;
     };

-    !time_filters
-        .iter()
-        .all(|filter| filter.is_greater_than(&first_entry_upper_bound.naive_utc()))
+    for filter in time_filters {
+        match filter {
+            PartialTimeFilter::Low(Bound::Excluded(time))
+            | PartialTimeFilter::Low(Bound::Included(time)) => {
+                if time < &first_entry_lower_bound.naive_utc() {
+                    return true;
+                }
+            }
+            _ => {}
+        }
+    }

+    false
 }

 fn include_now(filters: &[Expr], time_partition: Option<String>) -> bool {
@@ -862,7 +869,7 @@ mod tests {
         let res = is_overlapping_query(
             &manifest_items(),
             &[PartialTimeFilter::Low(std::ops::Bound::Included(
-                datetime_min(2023, 12, 15).naive_utc(),
+                datetime_min(2023, 12, 14).naive_utc(),
             ))],
         );
@@ -874,7 +881,7 @@
         let res = is_overlapping_query(
             &manifest_items(),
             &[PartialTimeFilter::Low(std::ops::Bound::Included(
-                datetime_min(2023, 12, 15)
+                datetime_min(2023, 12, 14)
                     .naive_utc()
                     .add(Duration::hours(3)),
             ))],
```
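
The rework changes the boundary semantics: the old check compared start filters against the first manifest's upper bound, while the new loop only falls back to listing when a start bound is strictly before the earliest lower bound. That is what lets a count(*) starting at or after manifest creation stay on the manifest path, and why the tests above move from 2023-12-15 to 2023-12-14. A standalone sketch of just that boundary rule, with stand-in types in place of `ManifestItem` and `PartialTimeFilter`:

```rust
use chrono::{NaiveDate, NaiveDateTime};
use std::ops::Bound;

// Only a start time strictly before the earliest manifest lower bound forces
// the legacy listing path; anything at or after it is covered by manifests.
fn start_forces_listing(start: Bound<NaiveDateTime>, first_lower_bound: NaiveDateTime) -> bool {
    match start {
        Bound::Included(t) | Bound::Excluded(t) => t < first_lower_bound,
        Bound::Unbounded => false,
    }
}

fn main() {
    let lower_bound = NaiveDate::from_ymd_opt(2023, 12, 15)
        .unwrap()
        .and_hms_opt(0, 0, 0)
        .unwrap();
    // start exactly on the bound: manifests cover it, no listing needed
    assert!(!start_forces_listing(Bound::Included(lower_bound), lower_bound));
    // start a day earlier: predates the first manifest, fall back to listing
    let earlier = NaiveDate::from_ymd_opt(2023, 12, 14)
        .unwrap()
        .and_hms_opt(0, 0, 0)
        .unwrap();
    assert!(start_forces_listing(Bound::Included(earlier), lower_bound));
}
```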

server/src/storage/object_storage.rs

Lines changed: 4 additions & 1 deletion
```diff
@@ -335,8 +335,11 @@ pub trait ObjectStorage: Sync + 'static {
         let cache_enabled = STREAM_INFO
             .cache_enabled(stream)
             .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
+        let time_partition = STREAM_INFO
+            .get_time_partition(stream)
+            .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;
         let dir = StorageDir::new(stream);
-        let schema = convert_disk_files_to_parquet(stream, &dir)
+        let schema = convert_disk_files_to_parquet(stream, &dir, time_partition)
             .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))?;

         if let Some(schema) = schema {
```
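
Like the `cache_enabled` lookup above it, the new call boxes its `MetadataError` into `ObjectStorageError::UnhandledError` so the sync path keeps a single error type. A self-contained sketch of that mapping pattern, with stand-in types rather than the server's own:

```rust
use std::fmt;

// Stand-in for the server's metadata error.
#[derive(Debug)]
struct MetadataError(String);

impl fmt::Display for MetadataError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "stream metadata: {}", self.0)
    }
}

impl std::error::Error for MetadataError {}

// Stand-in for the server's storage error: any source error gets boxed.
#[derive(Debug)]
enum ObjectStorageError {
    UnhandledError(Box<dyn std::error::Error + Send + Sync + 'static>),
}

fn get_time_partition(stream: &str) -> Result<Option<String>, MetadataError> {
    match stream {
        "app_logs" => Ok(Some("event_ts".to_string())),
        other => Err(MetadataError(format!("{other} not found"))),
    }
}

// Same shape as the diff: map_err into the storage error type so the caller
// can propagate it with `?`.
fn prepare_sync(stream: &str) -> Result<Option<String>, ObjectStorageError> {
    get_time_partition(stream)
        .map_err(|err| ObjectStorageError::UnhandledError(Box::new(err)))
}

fn main() {
    assert!(prepare_sync("app_logs").is_ok());
    assert!(prepare_sync("missing").is_err());
}
```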
