
Commit d76dd5a

Author: Devdutt Shenoi

explain and refactor physical plan creation

1 parent a146c98 · commit d76dd5a

File tree: 2 files changed, +39 −23 lines

src/hottier.rs

Lines changed: 1 addition & 0 deletions
@@ -65,6 +65,7 @@ pub struct StreamHotTier {
     pub oldest_date_time_entry: Option<String>,
 }
 
+#[derive(Debug)]
 pub struct HotTierManager {
     filesystem: LocalFileSystem,
     hot_tier_path: PathBuf,
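The only change in this file is the new #[derive(Debug)] on HotTierManager, which makes the manager printable with the {:?} formatter for logging and debugging. A minimal sketch of what the derive enables, using a hypothetical simplified struct rather than the real one (which also holds a LocalFileSystem):

// Hypothetical, simplified stand-in for the real HotTierManager.
#[derive(Debug)]
struct HotTierManager {
    hot_tier_path: std::path::PathBuf,
}

fn main() {
    let manager = HotTierManager {
        hot_tier_path: std::path::PathBuf::from("/tmp/hot-tier"),
    };
    // `{:?}` formatting is only available because of #[derive(Debug)].
    println!("{manager:?}");
}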

src/query/stream_schema_provider.rs

Lines changed: 38 additions & 23 deletions
@@ -29,7 +29,6 @@ use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
 use datafusion::catalog::Session;
 use datafusion::common::stats::Precision;
-use datafusion::logical_expr::utils::conjunction;
 use datafusion::{
     catalog::SchemaProvider,
     common::{
@@ -126,44 +125,60 @@ async fn create_parquet_physical_plan(
     state: &dyn Session,
     time_partition: Option<String>,
 ) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
-    let filters = if let Some(expr) = conjunction(filters.to_vec()) {
-        let table_df_schema = schema.as_ref().clone().to_dfschema()?;
-        let filters = create_physical_expr(&expr, &table_df_schema, state.execution_props())?;
-        Some(filters)
+    // Convert filters to physical expressions if applicable
+    let filters = filters
+        .iter()
+        .cloned()
+        .reduce(|a, b| a.and(b))
+        .map(|expr| {
+            let table_df_schema = schema.as_ref().clone().to_dfschema()?;
+            create_physical_expr(&expr, &table_df_schema, state.execution_props())
+        })
+        .transpose()?;
+
+    // Determine the column for sorting
+    let sort_column = if let Some(partition_col) = &time_partition {
+        physical_plan::expressions::col(partition_col, &schema)?
     } else {
-        None
+        physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &schema)?
     };
 
+    // Create the sort expression
     let sort_expr = PhysicalSortExpr {
-        expr: if let Some(time_partition) = time_partition {
-            physical_plan::expressions::col(&time_partition, &schema)?
-        } else {
-            physical_plan::expressions::col(DEFAULT_TIMESTAMP_KEY, &schema)?
-        },
+        expr: sort_column,
         options: SortOptions {
             descending: true,
             nulls_first: true,
         },
     };
+
+    // Configure the Parquet file format
     let file_format = ParquetFormat::default().with_enable_pruning(true);
 
-    // create the execution plan
+    // Prepare the file scan configuration
+    let file_scan_config = FileScanConfig {
+        object_store_url,
+        file_schema: schema.clone(),
+        file_groups: partitions,
+        statistics,
+        projection: projection.cloned(),
+        limit,
+        output_ordering: vec![vec![sort_expr]],
+        table_partition_cols: Vec::new(),
+    };
+
+    // Generate the physical execution plan
     let plan = file_format
        .create_physical_plan(
-            state.as_any().downcast_ref::<SessionState>().unwrap(), // Remove this when ParquetFormat catches up
-            FileScanConfig {
-                object_store_url,
-                file_schema: schema.clone(),
-                file_groups: partitions,
-                statistics,
-                projection: projection.cloned(),
-                limit,
-                output_ordering: vec![vec![sort_expr]],
-                table_partition_cols: Vec::new(),
-            },
+            state
+                .as_any()
+                .downcast_ref::<SessionState>()
+                .expect("Session must be a SessionState"),
+            file_scan_config,
             filters.as_ref(),
         )
         .await?;
+
     Ok(plan)
 }
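The core of the refactor is replacing the removed conjunction helper with a plain iterator fold: all pushed-down filters are AND-ed together with reduce, and the Option<Result<_>> produced by mapping create_physical_expr over the result is flipped with transpose so ? can propagate planning errors while still allowing the no-filter case. A minimal sketch of that fold on DataFusion logical expressions, with hypothetical column names and literals (not taken from the commit):

use datafusion::prelude::{col, lit, Expr};

fn main() {
    // Hypothetical pushed-down predicates standing in for the `filters` slice.
    let filters: Vec<Expr> = vec![
        col("p_timestamp").gt(lit("2024-01-01T00:00:00")),
        col("status_code").eq(lit(200)),
    ];

    // Same fold as in the diff: collapse every predicate into one conjunction.
    // An empty slice simply yields `None`, matching the old `conjunction` behaviour.
    let combined: Option<Expr> = filters.into_iter().reduce(|a, b| a.and(b));
    println!("{combined:?}");

    // In the planner, `.map(...).transpose()?` then turns the resulting
    // Option<Result<PhysicalExpr>> into Result<Option<PhysicalExpr>>, so a
    // planning error is propagated with `?` while `None` stays `None`.
}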
