Skip to content

Commit 4019f9f

Browse files
author
Devdutt Shenoi
committed
explain and refactor scan
1 parent d76dd5a commit 4019f9f

File tree

1 file changed

+29
-22
lines changed

1 file changed

+29
-22
lines changed

src/query/stream_schema_provider.rs

Lines changed: 29 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::catalog::manifest::File;
2020
use crate::hottier::HotTierManager;
2121
use crate::option::Mode;
2222
use crate::{
23-
catalog::snapshot::{self, Snapshot},
23+
catalog::snapshot::Snapshot,
2424
storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
2525
};
2626
use arrow_array::RecordBatch;
@@ -342,17 +342,20 @@ impl TableProvider for StandardTableProvider {
342342
.get_store(&self.url)
343343
.unwrap();
344344
let glob_storage = CONFIG.storage().get_object_store();
345-
346345
let object_store_format = glob_storage
347346
.get_object_store_format(&self.stream)
348347
.await
349348
.map_err(|err| DataFusionError::Plan(err.to_string()))?;
350349
let time_partition = object_store_format.time_partition;
350+
351351
let mut time_filters = extract_primary_filter(filters, time_partition.clone());
352352
if time_filters.is_empty() {
353-
return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string()));
353+
return Err(DataFusionError::Plan(
354+
"Table scanning requires at least one time bound for the query.".to_string(),
355+
));
354356
}
355357

358+
// Memory Execution Plan (for current stream data in memory)
356359
if include_now(filters, time_partition.clone()) {
357360
if let Some(records) =
358361
event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema)
@@ -364,31 +367,30 @@ impl TableProvider for StandardTableProvider {
364367
.await?,
365368
);
366369
}
367-
};
368-
let mut merged_snapshot: snapshot::Snapshot = Snapshot::default();
369-
if CONFIG.parseable.mode == Mode::Query {
370+
}
371+
372+
// Merged snapshot creation for query mode
373+
let merged_snapshot = if CONFIG.parseable.mode == Mode::Query {
370374
let path = RelativePathBuf::from_iter([&self.stream, STREAM_ROOT_DIRECTORY]);
371-
let obs = glob_storage
375+
glob_storage
372376
.get_objects(
373377
Some(&path),
374378
Box::new(|file_name| file_name.ends_with("stream.json")),
375379
)
376-
.await;
377-
if let Ok(obs) = obs {
378-
for ob in obs {
379-
if let Ok(object_store_format) =
380-
serde_json::from_slice::<ObjectStoreFormat>(&ob)
381-
{
382-
let snapshot = object_store_format.snapshot;
383-
for manifest in snapshot.manifest_list {
384-
merged_snapshot.manifest_list.push(manifest);
385-
}
386-
}
387-
}
388-
}
380+
.await
381+
.ok()
382+
.map(|obs| {
383+
obs.into_iter()
384+
.fold(Snapshot::default(), |mut snapshot, ob| {
385+
let format = serde_json::from_slice::<ObjectStoreFormat>(&ob).unwrap();
386+
snapshot.manifest_list.extend(format.snapshot.manifest_list);
387+
snapshot
388+
})
389+
})
390+
.unwrap_or_default()
389391
} else {
390-
merged_snapshot = object_store_format.snapshot;
391-
}
392+
object_store_format.snapshot
393+
};
392394

393395
// Is query timerange is overlapping with older data.
394396
// if true, then get listing table time filters and execution plan separately
@@ -415,6 +417,7 @@ impl TableProvider for StandardTableProvider {
415417
};
416418
}
417419

420+
// Manifest file collection
418421
let mut manifest_files = collect_from_snapshot(
419422
&merged_snapshot,
420423
&time_filters,
@@ -465,6 +468,8 @@ impl TableProvider for StandardTableProvider {
465468
.await?;
466469
}
467470
}
471+
472+
// Found everything locally
468473
if manifest_files.is_empty() {
469474
QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc();
470475
return final_plan(
@@ -474,6 +479,7 @@ impl TableProvider for StandardTableProvider {
474479
);
475480
}
476481

482+
// Remote Execution Plan
477483
let (partitioned_files, statistics) = partitioned_files(manifest_files, &self.schema);
478484
let remote_exec = create_parquet_physical_plan(
479485
ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(),
@@ -488,6 +494,7 @@ impl TableProvider for StandardTableProvider {
488494
)
489495
.await?;
490496

497+
// Combine all execution plans
491498
Ok(final_plan(
492499
vec![
493500
listing_exec,

0 commit comments

Comments
 (0)