From 14381dbaf5e7344c7490a5e363c11f52aa933cd5 Mon Sep 17 00:00:00 2001
From: anant
Date: Mon, 26 Aug 2024 14:47:27 +0530
Subject: [PATCH 1/3] split time filters for listing+manifest

---
 server/src/query/stream_schema_provider.rs | 120 +++++++++++++++++----
 1 file changed, 98 insertions(+), 22 deletions(-)

diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs
index 75440f91f..b472fa8ae 100644
--- a/server/src/query/stream_schema_provider.rs
+++ b/server/src/query/stream_schema_provider.rs
@@ -295,6 +295,7 @@ impl TableProvider for StandardTableProvider {
         let mut memory_exec = None;
         let mut cache_exec = None;
         let mut hot_tier_exec = None;
+        let mut remote_table = None;
         let object_store = state
             .runtime_env()
             .object_store_registry
@@ -307,7 +308,7 @@ impl TableProvider for StandardTableProvider {
             .await
             .map_err(|err| DataFusionError::Plan(err.to_string()))?;
         let time_partition = object_store_format.time_partition;
-        let time_filters = extract_primary_filter(filters, time_partition.clone());
+        let mut time_filters = extract_primary_filter(filters, time_partition.clone());
         if time_filters.is_empty() {
             return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires at least one time bound".to_string()));
         }
@@ -350,21 +351,31 @@ impl TableProvider for StandardTableProvider {
         }
 
         // Check if the query time range overlaps with older data.
+        // if true, then get listing table time filters and execution plan separately
         if is_overlapping_query(&merged_snapshot.manifest_list, &time_filters) {
-            return legacy_listing_table(
-                self.stream.clone(),
-                memory_exec,
-                glob_storage,
-                object_store,
-                &time_filters,
-                self.schema.clone(),
-                state,
-                projection,
-                filters,
-                limit,
-                time_partition.clone(),
-            )
-            .await;
+            let (listing_time_filters, new_time_filters) =
+                return_listing_time_filters(&merged_snapshot.manifest_list, time_filters);
+
+            let listing_tables = if let Some(listing_time_filter) = listing_time_filters {
+                legacy_listing_table(
+                    self.stream.clone(),
+                    glob_storage.clone(),
+                    object_store.clone(),
+                    &listing_time_filter,
+                    self.schema.clone(),
+                    state,
+                    projection,
+                    filters,
+                    limit,
+                    time_partition.clone(),
+                )
+                .await?
+            } else {
+                None
+            };
+            time_filters = new_time_filters;
+
+            remote_table = listing_tables;
         }
 
         let mut manifest_files = collect_from_snapshot(
@@ -377,7 +388,11 @@ impl TableProvider for StandardTableProvider {
         .await?;
 
         if manifest_files.is_empty() {
-            return final_plan(vec![memory_exec], projection, self.schema.clone());
+            return final_plan(
+                vec![remote_table, memory_exec],
+                projection,
+                self.schema.clone(),
+            );
         }
 
         // Based on entries in the manifest files, find them in the cache and create a physical plan.
@@ -464,7 +479,7 @@ impl TableProvider for StandardTableProvider {
         if manifest_files.is_empty() {
             QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc();
             return final_plan(
-                vec![memory_exec, cache_exec, hot_tier_exec],
+                vec![remote_table, memory_exec, cache_exec, hot_tier_exec],
                 projection,
                 self.schema.clone(),
             );
@@ -485,7 +500,13 @@ impl TableProvider for StandardTableProvider {
         .await?;
 
         Ok(final_plan(
-            vec![memory_exec, cache_exec, hot_tier_exec, Some(remote_exec)],
+            vec![
+                remote_table,
+                memory_exec,
+                cache_exec,
+                hot_tier_exec,
+                Some(remote_exec),
+            ],
             projection,
             self.schema.clone(),
         )?)
@@ -519,7 +540,6 @@ impl TableProvider for StandardTableProvider {
 #[allow(clippy::too_many_arguments)]
 async fn legacy_listing_table(
     stream: String,
-    mem_exec: Option<Arc<dyn ExecutionPlan>>,
     glob_storage: Arc<dyn ObjectStorage>,
     object_store: Arc<dyn ObjectStore>,
     time_filters: &[PartialTimeFilter],
@@ -529,7 +549,7 @@ async fn legacy_listing_table(
     filters: &[Expr],
     limit: Option<usize>,
     time_partition: Option<String>,
-) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
+) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
     let remote_table = ListingTableBuilder::new(stream)
         .populate_via_listing(glob_storage.clone(), object_store, time_filters)
         .and_then(|builder| async {
@@ -546,7 +566,7 @@ impl TableProvider for StandardTableProvider {
         })
         .await?;
 
-    final_plan(vec![mem_exec, remote_table], projection, schema)
+    Ok(remote_table)
 }
 
 fn final_plan(
@@ -581,7 +601,7 @@ fn reversed_mem_table(
     MemTable::try_new(schema, vec![records])
 }
 
-#[derive(Debug)]
+#[derive(Debug, Clone)]
 pub enum PartialTimeFilter {
     Low(Bound<NaiveDateTime>),
     High(Bound<NaiveDateTime>),
@@ -662,6 +682,62 @@ fn is_overlapping_query(
     false
 }
 
+/// Accepts the time filters provided with the query and splits them into
+/// listing-table time filters and manifest time filters.
+/// This keeps Parseable backwards compatible with data created before manifest files existed.
+/// Logic:
+/// Control reaches this function only when there is data without manifest files.
+/// Two new time filter vectors are built.
+/// For the listing-table time filters, we use the original lower bound and either the
+/// original upper bound or the manifest lower bound, whichever comes first.
+/// For the manifest time filters, we use the manifest lower bound and the original upper bound.
+fn return_listing_time_filters(
+    manifest_list: &[ManifestItem],
+    time_filters: Vec<PartialTimeFilter>,
+) -> (Option<Vec<PartialTimeFilter>>, Vec<PartialTimeFilter>) {
+    // vec to hold timestamps for listing
+    let mut vec_listing_timestamps = Vec::new();
+
+    let Some(first_entry_lower_bound) =
+        manifest_list.iter().map(|file| file.time_lower_bound).min()
+    else {
+        return (None, time_filters);
+    };
+
+    let mut new_time_filters = vec![PartialTimeFilter::Low(Bound::Included(
+        first_entry_lower_bound.naive_utc(),
+    ))];
+
+    time_filters.into_iter().for_each(|filter| {
+        match filter {
+            // since we've already determined that there is a need to list tables,
+            // we just need to check whether the filter's upper bound is < manifest lower bound
+            PartialTimeFilter::High(Bound::Included(upper))
+            | PartialTimeFilter::High(Bound::Excluded(upper)) => {
+                if upper.lt(&first_entry_lower_bound.naive_utc()) {
+                    // filter upper bound is less than manifest lower bound, continue using filter upper bound
+                    vec_listing_timestamps.push(filter.clone());
+                } else {
+                    // use manifest lower bound as the excluded upper bound
+                    vec_listing_timestamps.push(PartialTimeFilter::High(Bound::Excluded(
+                        first_entry_lower_bound.naive_utc(),
+                    )));
+                }
+                new_time_filters.push(filter.clone());
+            }
+            _ => {
+                vec_listing_timestamps.push(filter);
+            }
+        }
+    });
+
+    if !vec_listing_timestamps.is_empty() {
+        (Some(vec_listing_timestamps), new_time_filters)
+    } else {
+        (None, new_time_filters)
+    }
+}
+
 pub fn include_now(filters: &[Expr], time_partition: Option<String>) -> bool {
     let current_minute = Utc::now()
         .with_second(0)

From 03c4e06318dbf4b1bfa9a14e9f4b90020a59c0e7 Mon Sep 17 00:00:00 2001
From: anant
Date: Tue, 27 Aug 2024 15:28:42 +0530
Subject: [PATCH 2/3] separate fns for hottier and cache execution plan

---
 server/src/query/stream_schema_provider.rs | 165 ++++++++++++++-------
 1 file changed, 109 insertions(+), 56 deletions(-)

diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs
index b472fa8ae..b2041ffbf 100644
--- a/server/src/query/stream_schema_provider.rs
+++ b/server/src/query/stream_schema_provider.rs
@@ -16,6 +16,7 @@
  *
  */
 
+use crate::catalog::manifest::File;
 use crate::hottier::HotTierManager;
 use crate::Mode;
 use crate::{
@@ -397,30 +398,10 @@ impl TableProvider for StandardTableProvider {
 
         // Based on entries in the manifest files, find them in the cache and create a physical plan.
         if let Some(cache_manager) = LocalCacheManager::global() {
-            let (cached, remainder) = cache_manager
-                .partition_on_cached(&self.stream, manifest_files, |file| &file.file_path)
-                .await
-                .map_err(|err| DataFusionError::External(Box::new(err)))?;
-
-            // Assign remaining entries back to manifest list
-            // This is to be used for remote query
-            manifest_files = remainder;
-
-            let cached = cached
-                .into_iter()
-                .map(|(mut file, cache_path)| {
-                    let cache_path =
-                        object_store::path::Path::from_absolute_path(cache_path).unwrap();
-                    file.file_path = cache_path.to_string();
-                    file
-                })
-                .collect();
-
-            let (partitioned_files, statistics) = partitioned_files(cached, &self.schema, 1);
-            let plan = create_parquet_physical_plan(
-                ObjectStoreUrl::parse("file:///").unwrap(),
-                partitioned_files,
-                statistics,
+            cache_exec = get_cache_execution_plan(
+                cache_manager,
+                &self.stream,
+                &mut manifest_files,
                 self.schema.clone(),
                 projection,
                 filters,
@@ -429,41 +410,15 @@ impl TableProvider for StandardTableProvider {
                 time_partition.clone(),
             )
             .await?;
-
-            cache_exec = Some(plan)
         }
 
         // Hot tier data fetch
         if let Some(hot_tier_manager) = HotTierManager::global() {
             if hot_tier_manager.check_stream_hot_tier_exists(&self.stream) {
-                let (hot_tier_files, remainder) = hot_tier_manager
-                    .get_hot_tier_manifest_files(&self.stream, manifest_files)
-                    .await
-                    .map_err(|err| DataFusionError::External(Box::new(err)))?;
-                // Assign remaining entries back to manifest list
-                // This is to be used for remote query
-                manifest_files = remainder;
-
-                let hot_tier_files = hot_tier_files
-                    .into_iter()
-                    .map(|mut file| {
-                        let path = CONFIG
-                            .parseable
-                            .hot_tier_storage_path
-                            .as_ref()
-                            .unwrap()
-                            .join(&file.file_path);
-                        file.file_path = path.to_str().unwrap().to_string();
-                        file
-                    })
-                    .collect();
-
-                let (partitioned_files, statistics) =
-                    partitioned_files(hot_tier_files, &self.schema, 1);
-                let plan = create_parquet_physical_plan(
-                    ObjectStoreUrl::parse("file:///").unwrap(),
-                    partitioned_files,
-                    statistics,
+                hot_tier_exec = get_hottier_execution_plan(
+                    hot_tier_manager,
+                    &self.stream,
+                    &mut manifest_files,
                     self.schema.clone(),
                     projection,
                     filters,
@@ -472,8 +427,6 @@ impl TableProvider for StandardTableProvider {
                     time_partition.clone(),
                 )
                 .await?;
-
-                hot_tier_exec = Some(plan)
             }
         }
         if manifest_files.is_empty() {
@@ -537,6 +490,106 @@ impl TableProvider for StandardTableProvider {
     }
 }
 
+#[allow(clippy::too_many_arguments)]
+async fn get_cache_execution_plan(
+    cache_manager: &LocalCacheManager,
+    stream: &str,
+    manifest_files: &mut Vec<File>,
+    schema: Arc<Schema>,
+    projection: Option<&Vec<usize>>,
+    filters: &[Expr],
+    limit: Option<usize>,
+    state: &SessionState,
+    time_partition: Option<String>,
+) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
+    let (cached, remainder) = cache_manager
+        .partition_on_cached(stream, manifest_files.clone(), |file: &File| {
+            &file.file_path
+        })
+        .await
+        .map_err(|err| DataFusionError::External(Box::new(err)))?;
+
+    // Assign remaining entries back to manifest list
+    // This is to be used for remote query
+    *manifest_files = remainder;
+
+    let cached = cached
+        .into_iter()
+        .map(|(mut file, cache_path)| {
+            let cache_path =
+                object_store::path::Path::from_absolute_path(cache_path).unwrap();
+            file.file_path = cache_path.to_string();
+            file
+        })
+        .collect();
+
+    let (partitioned_files, statistics) = partitioned_files(cached, &schema, 1);
+    let plan = create_parquet_physical_plan(
+        ObjectStoreUrl::parse("file:///").unwrap(),
+        partitioned_files,
+        statistics,
+        schema.clone(),
+        projection,
+        filters,
+        limit,
+        state,
+        time_partition.clone(),
+    )
+    .await?;
+
+    Ok(Some(plan))
+}
+
+#[allow(clippy::too_many_arguments)]
+async fn get_hottier_execution_plan(
+    hot_tier_manager: &HotTierManager,
+    stream: &str,
+    manifest_files: &mut Vec<File>,
+    schema: Arc<Schema>,
+    projection: Option<&Vec<usize>>,
+    filters: &[Expr],
+    limit: Option<usize>,
+    state: &SessionState,
+    time_partition: Option<String>,
+) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
+    let (hot_tier_files, remainder) = hot_tier_manager
+        .get_hot_tier_manifest_files(stream, manifest_files.clone())
+        .await
+        .map_err(|err| DataFusionError::External(Box::new(err)))?;
+    // Assign remaining entries back to manifest list
+    // This is to be used for remote query
+    *manifest_files = remainder;
+
+    let hot_tier_files = hot_tier_files
+        .into_iter()
+        .map(|mut file| {
+            let path = CONFIG
+                .parseable
+                .hot_tier_storage_path
+                .as_ref()
+                .unwrap()
+                .join(&file.file_path);
+            file.file_path = path.to_str().unwrap().to_string();
+            file
+        })
+        .collect();
+
+    let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &schema, 1);
+    let plan = create_parquet_physical_plan(
+        ObjectStoreUrl::parse("file:///").unwrap(),
+        partitioned_files,
+        statistics,
+        schema.clone(),
+        projection,
+        filters,
+        limit,
+        state,
+        time_partition.clone(),
+    )
+    .await?;
+
+    Ok(Some(plan))
+}
+
 #[allow(clippy::too_many_arguments)]
 async fn legacy_listing_table(
     stream: String,

From 625e5575c4aa7084f00531ca132f7031b67614a4 Mon Sep 17 00:00:00 2001
From: anant
Date: Thu, 29 Aug 2024 22:02:43 +0530
Subject: [PATCH 3/3] code cleanup

---
 server/src/query/stream_schema_provider.rs | 47 +++++++++++-----------
 1 file changed, 23 insertions(+), 24 deletions(-)

diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs
index b2041ffbf..e5d6055d5 100644
--- a/server/src/query/stream_schema_provider.rs
+++ b/server/src/query/stream_schema_provider.rs
@@ -296,7 +296,7 @@ impl TableProvider for StandardTableProvider {
         let mut memory_exec = None;
         let mut cache_exec = None;
         let mut hot_tier_exec = None;
-        let mut remote_table = None;
+        let mut listing_exec = None;
         let object_store = state
             .runtime_env()
             .object_store_registry
@@ -354,10 +354,10 @@ impl TableProvider for StandardTableProvider {
         // Check if the query time range overlaps with older data.
         // if true, then get listing table time filters and execution plan separately
         if is_overlapping_query(&merged_snapshot.manifest_list, &time_filters) {
-            let (listing_time_filters, new_time_filters) =
-                return_listing_time_filters(&merged_snapshot.manifest_list, time_filters);
+            let listing_time_filters =
+                return_listing_time_filters(&merged_snapshot.manifest_list, &mut time_filters);
 
-            let listing_tables = if let Some(listing_time_filter) = listing_time_filters {
+            listing_exec = if let Some(listing_time_filter) = listing_time_filters {
                 legacy_listing_table(
                     self.stream.clone(),
                     glob_storage.clone(),
                     object_store.clone(),
@@ -374,9 +374,6 @@ impl TableProvider for StandardTableProvider {
             } else {
                 None
             };
-            time_filters = new_time_filters;
-
-            remote_table = listing_tables;
         }
 
         let mut manifest_files = collect_from_snapshot(
@@ -390,7 +387,7 @@ impl TableProvider for StandardTableProvider {
 
         if manifest_files.is_empty() {
             return final_plan(
-                vec![remote_table, memory_exec],
+                vec![listing_exec, memory_exec],
                 projection,
                 self.schema.clone(),
             );
@@ -432,7 +429,7 @@ impl TableProvider for StandardTableProvider {
         if manifest_files.is_empty() {
             QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc();
             return final_plan(
-                vec![remote_table, memory_exec, cache_exec, hot_tier_exec],
+                vec![listing_exec, memory_exec, cache_exec, hot_tier_exec],
                 projection,
                 self.schema.clone(),
             );
@@ -454,7 +451,7 @@ impl TableProvider for StandardTableProvider {
 
         Ok(final_plan(
             vec![
-                remote_table,
+                listing_exec,
                 memory_exec,
                 cache_exec,
                 hot_tier_exec,
                 Some(remote_exec),
             ],
             projection,
             self.schema.clone(),
         )?)
@@ -746,48 +743,50 @@ fn is_overlapping_query(
 /// For the manifest time filters, we use the manifest lower bound and the original upper bound.
 fn return_listing_time_filters(
     manifest_list: &[ManifestItem],
-    time_filters: Vec<PartialTimeFilter>,
-) -> (Option<Vec<PartialTimeFilter>>, Vec<PartialTimeFilter>) {
+    time_filters: &mut Vec<PartialTimeFilter>,
+) -> Option<Vec<PartialTimeFilter>> {
     // vec to hold timestamps for listing
     let mut vec_listing_timestamps = Vec::new();
 
-    let Some(first_entry_lower_bound) =
-        manifest_list.iter().map(|file| file.time_lower_bound).min()
-    else {
-        return (None, time_filters);
-    };
+    let first_entry_lower_bound = manifest_list
+        .iter()
+        .map(|file| file.time_lower_bound.naive_utc())
+        .min()?;
 
     let mut new_time_filters = vec![PartialTimeFilter::Low(Bound::Included(
-        first_entry_lower_bound.naive_utc(),
+        first_entry_lower_bound,
     ))];
 
-    time_filters.into_iter().for_each(|filter| {
+    time_filters.iter().for_each(|filter| {
         match filter {
             // since we've already determined that there is a need to list tables,
             // we just need to check whether the filter's upper bound is < manifest lower bound
             PartialTimeFilter::High(Bound::Included(upper))
             | PartialTimeFilter::High(Bound::Excluded(upper)) => {
-                if upper.lt(&first_entry_lower_bound.naive_utc()) {
+                if *upper < first_entry_lower_bound {
                     // filter upper bound is less than manifest lower bound, continue using filter upper bound
                     vec_listing_timestamps.push(filter.clone());
                 } else {
                     // use manifest lower bound as the excluded upper bound
                     vec_listing_timestamps.push(PartialTimeFilter::High(Bound::Excluded(
-                        first_entry_lower_bound.naive_utc(),
+                        first_entry_lower_bound,
                     )));
                 }
                 new_time_filters.push(filter.clone());
             }
             _ => {
-                vec_listing_timestamps.push(filter);
+                vec_listing_timestamps.push(filter.clone());
             }
         }
     });
 
+    // update time_filters
+    *time_filters = new_time_filters;
+
     if !vec_listing_timestamps.is_empty() {
-        (Some(vec_listing_timestamps), new_time_filters)
+        Some(vec_listing_timestamps)
     } else {
-        (None, new_time_filters)
+        None
     }
 }
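The split performed by `return_listing_time_filters` can be illustrated with a minimal, self-contained sketch. Note that `split_filters`, the integer `i64` timestamps, and the trimmed-down `PartialTimeFilter` below are simplified stand-ins for illustration only, not the crate's actual types or API:

use std::ops::Bound;

// Simplified stand-in: integer timestamps instead of chrono::NaiveDateTime,
// and only the filter variants the split cares about.
#[derive(Debug, Clone)]
enum PartialTimeFilter {
    Low(Bound<i64>),
    High(Bound<i64>),
}

// Split the query's time filters at the earliest manifest lower bound:
// everything strictly before it goes to the legacy listing table,
// everything from it onwards goes to the manifest-based scan.
fn split_filters(
    earliest_manifest_lower_bound: i64,
    time_filters: &mut Vec<PartialTimeFilter>,
) -> Option<Vec<PartialTimeFilter>> {
    let mut listing_filters = Vec::new();
    // The manifest scan starts at the earliest manifest lower bound.
    let mut manifest_filters = vec![PartialTimeFilter::Low(Bound::Included(
        earliest_manifest_lower_bound,
    ))];

    for filter in time_filters.iter() {
        match filter {
            PartialTimeFilter::High(Bound::Included(upper))
            | PartialTimeFilter::High(Bound::Excluded(upper)) => {
                if *upper < earliest_manifest_lower_bound {
                    // Query ends before manifests begin: keep the original upper bound.
                    listing_filters.push(filter.clone());
                } else {
                    // Cap the listing range just below where manifests take over.
                    listing_filters.push(PartialTimeFilter::High(Bound::Excluded(
                        earliest_manifest_lower_bound,
                    )));
                }
                // The manifest side keeps the original upper bound.
                manifest_filters.push(filter.clone());
            }
            // Lower bounds apply to the listing side unchanged.
            other => listing_filters.push(other.clone()),
        }
    }

    // Hand the manifest-side filters back through the &mut parameter,
    // mirroring how the final patch updates `time_filters` in place.
    *time_filters = manifest_filters;
    (!listing_filters.is_empty()).then_some(listing_filters)
}

fn main() {
    // Query spans 100..=500; the earliest manifest entry starts at 300.
    let mut filters = vec![
        PartialTimeFilter::Low(Bound::Included(100)),
        PartialTimeFilter::High(Bound::Included(500)),
    ];
    let listing = split_filters(300, &mut filters);
    println!("listing side:  {listing:?}");
    println!("manifest side: {filters:?}");
}

With this input, the listing side receives [Low(Included(100)), High(Excluded(300))] and the manifest side [Low(Included(300)), High(Included(500))], which is the handover described in the function's doc comment.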