284 changes: 206 additions & 78 deletions server/src/query/stream_schema_provider.rs
@@ -16,6 +16,7 @@
*
*/

use crate::catalog::manifest::File;
use crate::hottier::HotTierManager;
use crate::Mode;
use crate::{
@@ -295,6 +296,7 @@ impl TableProvider for StandardTableProvider {
let mut memory_exec = None;
let mut cache_exec = None;
let mut hot_tier_exec = None;
let mut listing_exec = None;
let object_store = state
.runtime_env()
.object_store_registry
@@ -307,7 +309,7 @@ impl TableProvider for StandardTableProvider {
.await
.map_err(|err| DataFusionError::Plan(err.to_string()))?;
let time_partition = object_store_format.time_partition;
let time_filters = extract_primary_filter(filters, time_partition.clone());
let mut time_filters = extract_primary_filter(filters, time_partition.clone());
if time_filters.is_empty() {
return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires at least one time bound".to_string()));
}
@@ -350,21 +352,28 @@
}

// Check whether the query time range overlaps with data older than the first manifest.
// If it does, derive listing table time filters and build that execution plan separately
if is_overlapping_query(&merged_snapshot.manifest_list, &time_filters) {
return legacy_listing_table(
self.stream.clone(),
memory_exec,
glob_storage,
object_store,
&time_filters,
self.schema.clone(),
state,
projection,
filters,
limit,
time_partition.clone(),
)
.await;
let listing_time_filters =
return_listing_time_filters(&merged_snapshot.manifest_list, &mut time_filters);

listing_exec = if let Some(listing_time_filter) = listing_time_filters {
legacy_listing_table(
self.stream.clone(),
glob_storage.clone(),
object_store.clone(),
&listing_time_filter,
self.schema.clone(),
state,
projection,
filters,
limit,
time_partition.clone(),
)
.await?
} else {
None
};
}
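// Note: from this point the scan can contribute up to five partial plans:
// listing_exec (pre-manifest data discovered by listing the object store),
// memory_exec, cache_exec, hot_tier_exec, and the remote parquet plan.
// Whichever of these get built are handed to final_plan together further below.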

let mut manifest_files = collect_from_snapshot(
@@ -377,35 +386,19 @@
.await?;

if manifest_files.is_empty() {
return final_plan(vec![memory_exec], projection, self.schema.clone());
return final_plan(
vec![listing_exec, memory_exec],
projection,
self.schema.clone(),
);
}

// Based on entries in the manifest files, find them in the cache and create a physical plan.
if let Some(cache_manager) = LocalCacheManager::global() {
let (cached, remainder) = cache_manager
.partition_on_cached(&self.stream, manifest_files, |file| &file.file_path)
.await
.map_err(|err| DataFusionError::External(Box::new(err)))?;

// Assign remaining entries back to manifest list
// This is to be used for remote query
manifest_files = remainder;

let cached = cached
.into_iter()
.map(|(mut file, cache_path)| {
let cache_path =
object_store::path::Path::from_absolute_path(cache_path).unwrap();
file.file_path = cache_path.to_string();
file
})
.collect();

let (partitioned_files, statistics) = partitioned_files(cached, &self.schema, 1);
let plan = create_parquet_physical_plan(
ObjectStoreUrl::parse("file:///").unwrap(),
partitioned_files,
statistics,
cache_exec = get_cache_execution_plan(
cache_manager,
&self.stream,
&mut manifest_files,
self.schema.clone(),
projection,
filters,
@@ -414,41 +407,15 @@
time_partition.clone(),
)
.await?;

cache_exec = Some(plan)
}

// Hot tier data fetch
if let Some(hot_tier_manager) = HotTierManager::global() {
if hot_tier_manager.check_stream_hot_tier_exists(&self.stream) {
let (hot_tier_files, remainder) = hot_tier_manager
.get_hot_tier_manifest_files(&self.stream, manifest_files)
.await
.map_err(|err| DataFusionError::External(Box::new(err)))?;
// Assign remaining entries back to manifest list
// This is to be used for remote query
manifest_files = remainder;

let hot_tier_files = hot_tier_files
.into_iter()
.map(|mut file| {
let path = CONFIG
.parseable
.hot_tier_storage_path
.as_ref()
.unwrap()
.join(&file.file_path);
file.file_path = path.to_str().unwrap().to_string();
file
})
.collect();

let (partitioned_files, statistics) =
partitioned_files(hot_tier_files, &self.schema, 1);
let plan = create_parquet_physical_plan(
ObjectStoreUrl::parse("file:///").unwrap(),
partitioned_files,
statistics,
hot_tier_exec = get_hottier_execution_plan(
hot_tier_manager,
&self.stream,
&mut manifest_files,
self.schema.clone(),
projection,
filters,
@@ -457,14 +424,12 @@
time_partition.clone(),
)
.await?;

hot_tier_exec = Some(plan)
}
}
if manifest_files.is_empty() {
QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc();
return final_plan(
vec![memory_exec, cache_exec, hot_tier_exec],
vec![listing_exec, memory_exec, cache_exec, hot_tier_exec],
projection,
self.schema.clone(),
);
@@ -485,7 +450,13 @@
.await?;

Ok(final_plan(
vec![memory_exec, cache_exec, hot_tier_exec, Some(remote_exec)],
vec![
listing_exec,
memory_exec,
cache_exec,
hot_tier_exec,
Some(remote_exec),
],
projection,
self.schema.clone(),
)?)
@@ -516,10 +487,109 @@ impl TableProvider for StandardTableProvider {
}
}

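/// Partitions the collected manifest files into locally cached parquet files and a
/// remainder. Cached entries are rewritten to their absolute local paths and scanned
/// via a file:/// parquet plan, while the remainder is written back into `manifest_files`
/// for the remote plan.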
#[allow(clippy::too_many_arguments)]
async fn get_cache_execution_plan(
cache_manager: &LocalCacheManager,
stream: &str,
manifest_files: &mut Vec<File>,
schema: Arc<Schema>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
state: &SessionState,
time_partition: Option<String>,
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
let (cached, remainder) = cache_manager
.partition_on_cached(stream, manifest_files.clone(), |file: &File| {
&file.file_path
})
.await
.map_err(|err| DataFusionError::External(Box::new(err)))?;

// Assign remaining entries back to manifest list
// This is to be used for remote query
*manifest_files = remainder;

let cached = cached
.into_iter()
.map(|(mut file, cache_path)| {
let cache_path = object_store::path::Path::from_absolute_path(cache_path).unwrap();
file.file_path = cache_path.to_string();
file
})
.collect();

let (partitioned_files, statistics) = partitioned_files(cached, &schema, 1);
let plan = create_parquet_physical_plan(
ObjectStoreUrl::parse("file:///").unwrap(),
partitioned_files,
statistics,
schema.clone(),
projection,
filters,
limit,
state,
time_partition.clone(),
)
.await?;

Ok(Some(plan))
}

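/// Splits the manifest files into entries present in the hot tier and a remainder.
/// Hot tier entries are resolved against the configured hot tier storage path and scanned
/// via a file:/// parquet plan, while the remainder is written back into `manifest_files`
/// for the remote plan.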
#[allow(clippy::too_many_arguments)]
async fn get_hottier_execution_plan(
hot_tier_manager: &HotTierManager,
stream: &str,
manifest_files: &mut Vec<File>,
schema: Arc<Schema>,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
state: &SessionState,
time_partition: Option<String>,
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
let (hot_tier_files, remainder) = hot_tier_manager
.get_hot_tier_manifest_files(stream, manifest_files.clone())
.await
.map_err(|err| DataFusionError::External(Box::new(err)))?;
// Assign remaining entries back to manifest list
// This is to be used for remote query
*manifest_files = remainder;

let hot_tier_files = hot_tier_files
.into_iter()
.map(|mut file| {
let path = CONFIG
.parseable
.hot_tier_storage_path
.as_ref()
.unwrap()
.join(&file.file_path);
file.file_path = path.to_str().unwrap().to_string();
file
})
.collect();

let (partitioned_files, statistics) = partitioned_files(hot_tier_files, &schema, 1);
let plan = create_parquet_physical_plan(
ObjectStoreUrl::parse("file:///").unwrap(),
partitioned_files,
statistics,
schema.clone(),
projection,
filters,
limit,
state,
time_partition.clone(),
)
.await?;

Ok(Some(plan))
}

#[allow(clippy::too_many_arguments)]
async fn legacy_listing_table(
stream: String,
mem_exec: Option<Arc<dyn ExecutionPlan>>,
glob_storage: Arc<dyn ObjectStorage + Send>,
object_store: Arc<dyn ObjectStore>,
time_filters: &[PartialTimeFilter],
@@ -529,7 +599,7 @@ async fn legacy_listing_table(
filters: &[Expr],
limit: Option<usize>,
time_partition: Option<String>,
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
let remote_table = ListingTableBuilder::new(stream)
.populate_via_listing(glob_storage.clone(), object_store, time_filters)
.and_then(|builder| async {
@@ -546,7 +616,7 @@
})
.await?;

final_plan(vec![mem_exec, remote_table], projection, schema)
Ok(remote_table)
}

fn final_plan(
Expand Down Expand Up @@ -581,7 +651,7 @@ fn reversed_mem_table(
MemTable::try_new(schema, vec![records])
}

#[derive(Debug)]
#[derive(Debug, Clone)]
pub enum PartialTimeFilter {
Low(Bound<NaiveDateTime>),
High(Bound<NaiveDateTime>),
@@ -662,6 +732,64 @@ fn is_overlapping_query(
false
}

/// This function accepts the time filters provided to the query and splits them
/// into listing table time filters and manifest time filters.
/// This keeps Parseable backwards compatible with data written before manifest files existed.
/// Logic:
/// Control flow only reaches this function if there exists data without manifest files.
/// Two new time filter vectors are created.
/// The listing table time filters use the original lower bound and either the original
/// upper bound or the manifest lower bound, whichever comes first.
/// The manifest time filters use the manifest lower bound and the original upper bound.
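///
/// Illustrative example (hypothetical timestamps): for a query bounded by
/// 2024-01-01T00:00 and 2024-01-10T00:00, with the earliest manifest lower bound at
/// 2024-01-05T00:00, the listing filters become
/// [Low(Included(2024-01-01T00:00)), High(Excluded(2024-01-05T00:00))] and the
/// manifest filters become [Low(Included(2024-01-05T00:00)), High(Included(2024-01-10T00:00))].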
fn return_listing_time_filters(
manifest_list: &[ManifestItem],
time_filters: &mut Vec<PartialTimeFilter>,
) -> Option<Vec<PartialTimeFilter>> {
// vec to hold time filters for the listing table
let mut vec_listing_timestamps = Vec::new();

let first_entry_lower_bound = manifest_list
.iter()
.map(|file| file.time_lower_bound.naive_utc())
.min()?;

let mut new_time_filters = vec![PartialTimeFilter::Low(Bound::Included(
first_entry_lower_bound,
))];

time_filters.iter_mut().for_each(|filter| {
match filter {
// since we've already determined that there is a need to list tables,
// we just need to check whether the filter's upper bound is < manifest lower bound
PartialTimeFilter::High(Bound::Included(upper))
| PartialTimeFilter::High(Bound::Excluded(upper)) => {
if *upper < first_entry_lower_bound {
// filter upper bound is less than manifest lower bound, continue using filter upper bound
vec_listing_timestamps.push(filter.clone());
} else {
// use manifest lower bound as excluded
vec_listing_timestamps.push(PartialTimeFilter::High(Bound::Excluded(
first_entry_lower_bound,
)));
}
new_time_filters.push(filter.clone());
}
_ => {
vec_listing_timestamps.push(filter.clone());
}
}
});

// update time_filters
*time_filters = new_time_filters;

if !vec_listing_timestamps.is_empty() {
Some(vec_listing_timestamps)
} else {
None
}
}

pub fn include_now(filters: &[Expr], time_partition: Option<String>) -> bool {
let current_minute = Utc::now()
.with_second(0)