Skip to content

Commit 4b078e8

Browse files
committed
Polishing
1 parent 4037baa commit 4b078e8

File tree

10 files changed

+89
-213
lines changed

10 files changed

+89
-213
lines changed

datafusion/core/src/execution/session_state.rs

Lines changed: 0 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -554,28 +554,6 @@ impl SessionState {
554554

555555
/// Optimizes the logical plan by applying optimizer rules.
556556
pub fn optimize(&self, plan: &LogicalPlan) -> datafusion_common::Result<LogicalPlan> {
557-
// Special case for Pivot nodes with subqueries
558-
/*if let LogicalPlan::Pivot(p) = plan {
559-
if let Some(subquery) = &p.value_subquery {
560-
// Optimize the subquery first
561-
let optimized_subquery = self.optimizer.optimize(
562-
subquery.as_ref().clone(),
563-
self,
564-
|_, _| {},
565-
)?;
566-
567-
// Create a new Pivot with the optimized subquery
568-
return Ok(LogicalPlan::Pivot(datafusion_expr::Pivot {
569-
input: p.input.clone(),
570-
aggregate_expr: p.aggregate_expr.clone(),
571-
pivot_column: p.pivot_column.clone(),
572-
pivot_values: p.pivot_values.clone(),
573-
schema: p.schema.clone(),
574-
value_subquery: Some(Arc::new(optimized_subquery)),
575-
}));
576-
}
577-
}*/
578-
579557
if let LogicalPlan::Explain(e) = plan {
580558
let mut stringified_plans = e.stringified_plans.clone();
581559

@@ -665,9 +643,7 @@ impl SessionState {
665643
&self,
666644
logical_plan: &LogicalPlan,
667645
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
668-
println!("logical_plan_beore_optPIVOT328: {:#?}", logical_plan);
669646
let logical_plan = self.optimize(logical_plan)?;
670-
println!("logical_plan_PIVOT329: {:#?}", logical_plan);
671647
self.query_planner
672648
.create_physical_plan(&logical_plan, self)
673649
.await

datafusion/core/src/physical_planner.rs

Lines changed: 80 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -65,8 +65,8 @@ use arrow::datatypes::{Schema, SchemaRef};
6565
use datafusion_common::display::ToStringifiedPlan;
6666
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
6767
use datafusion_common::{
68-
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, DFSchemaRef,
69-
ScalarValue, Column, TableReference,
68+
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
69+
ScalarValue, Column,
7070
};
7171
use datafusion_datasource::memory::MemorySourceConfig;
7272
use datafusion_expr::dml::{CopyTo, InsertOp, DmlStatement, WriteOp};
@@ -76,12 +76,7 @@ use datafusion_expr::expr::{
7676
};
7777
use datafusion_expr::expr_rewriter::unnormalize_cols;
7878
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
79-
use datafusion_expr::{
80-
Analyze, DescribeTable, Explain, ExplainFormat, Extension, FetchType,
81-
Filter, JoinType, RecursiveQuery, SkipType, SortExpr, StringifiedPlan, WindowFrame,
82-
WindowFrameBound, SubqueryAlias,
83-
};
84-
use datafusion_execution::FunctionRegistry;
79+
use datafusion_expr::{Analyze, DescribeTable, Explain, ExplainFormat, Extension, FetchType, Filter, JoinType, RecursiveQuery, SkipType, SortExpr, StringifiedPlan, WindowFrame, WindowFrameBound, LogicalPlanBuilder, BinaryExpr};
8580
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
8681
use datafusion_physical_expr::expressions::Literal;
8782
use datafusion_physical_expr::LexOrdering;
@@ -97,8 +92,7 @@ use itertools::{multiunzip, Itertools};
9792
use log::{debug, trace};
9893
use sqlparser::ast::NullTreatment;
9994
use tokio::sync::Mutex;
100-
101-
use datafusion_sql::transform_pivot_to_aggregate;
95+
use datafusion_expr_common::operator::Operator;
10296

10397
use datafusion_physical_plan::collect;
10498

@@ -309,10 +303,8 @@ impl DefaultPhysicalPlanner {
309303
}
310304
1 => NodeState::ZeroOrOneChild,
311305
_ => {
312-
313306
let ready_children = Vec::with_capacity(node.inputs().len());
314307
let ready_children = Mutex::new(ready_children);
315-
316308
NodeState::TwoOrMoreChildren(ready_children)
317309
}
318310
};
@@ -924,21 +916,18 @@ impl DefaultPhysicalPlanner {
924916
} else {
925917
pivot.pivot_values.clone()
926918
};
927-
928-
if !pivot_values.is_empty() {
929-
// Transform Pivot into Aggregate plan with the resolved pivot values
919+
920+
return if !pivot_values.is_empty() {
930921
let agg_plan = transform_pivot_to_aggregate(
931922
Arc::new(pivot.input.as_ref().clone()),
932923
&pivot.aggregate_expr,
933924
&pivot.pivot_column,
934-
Some(pivot_values),
935-
None,
925+
pivot_values,
936926
)?;
937-
938-
// The schema information is already preserved in the agg_plan
939-
return self.create_physical_plan(&agg_plan, session_state).await;
927+
928+
self.create_physical_plan(&agg_plan, session_state).await
940929
} else {
941-
return plan_err!("PIVOT operation requires at least one value to pivot on");
930+
plan_err!("PIVOT operation requires at least one value to pivot on")
942931
}
943932
}
944933
// 2 Children
@@ -1764,6 +1753,76 @@ pub fn create_physical_sort_exprs(
17641753
.collect::<Result<LexOrdering>>()
17651754
}
17661755

1756+
/// Transforms a PIVOT operation over `input` into a plain Aggregate plan.
1757+
/// Each pivot value yields one copy of the aggregate, filtered with "IS NOT DISTINCT FROM" on the pivot column and aliased to that value.
1758+
/// Input columns other than the pivot column and those referenced by the aggregate become the GROUP BY keys; a non-`AggregateFunction` aggregate is a plan error.
1759+
/// For example, for PIVOT(SUM(amount) FOR quarter IN ('2023_Q1', '2023_Q2')), we create:
1760+
/// - SUM(amount) FILTER (WHERE quarter IS NOT DISTINCT FROM '2023_Q1') AS "2023_Q1"
1761+
/// - SUM(amount) FILTER (WHERE quarter IS NOT DISTINCT FROM '2023_Q2') AS "2023_Q2"
1762+
///
1763+
pub fn transform_pivot_to_aggregate(
1764+
input: Arc<LogicalPlan>,
1765+
aggregate_expr: &Expr,
1766+
pivot_column: &Column,
1767+
pivot_values: Vec<ScalarValue>,
1768+
) -> Result<LogicalPlan> {
1769+
let df_schema = input.schema();
1770+
1771+
let all_columns: Vec<Column> = df_schema.columns();
1772+
1773+
// Keep only the input columns that act as GROUP BY keys:
1774+
// drop the pivot column and every column the aggregate expression references (compared by unqualified name only)
1775+
let group_by_columns: Vec<Expr> = all_columns
1776+
.into_iter()
1777+
.filter(|col| {
1778+
col.name != pivot_column.name
1779+
&& !aggregate_expr.column_refs().iter().any(|agg_col| agg_col.name == col.name)
1780+
})
1781+
.map(|col| Expr::Column(col))
1782+
.collect();
1783+
1784+
let builder = LogicalPlanBuilder::from(Arc::unwrap_or_clone(input.clone())); // NOTE(review): the extra Arc clone keeps the refcount above one, so this always deep-clones the plan
1785+
1786+
let mut aggregate_exprs = Vec::new();
1787+
1788+
for value in &pivot_values {
1789+
let filter_condition = Expr::BinaryExpr(BinaryExpr::new(
1790+
Box::new(Expr::Column(pivot_column.clone())),
1791+
Operator::IsNotDistinctFrom,
1792+
Box::new(Expr::Literal(value.clone()))
1793+
));
1794+
1795+
let filtered_agg = match aggregate_expr {
1796+
Expr::AggregateFunction(agg) => {
1797+
let mut new_params = agg.params.clone();
1798+
new_params.filter = Some(Box::new(filter_condition));
1799+
Expr::AggregateFunction(AggregateFunction {
1800+
func: agg.func.clone(),
1801+
params: new_params,
1802+
})
1803+
},
1804+
_ => {
1805+
return plan_err!("Unsupported aggregate expression should always be AggregateFunction");
1806+
}
1807+
};
1808+
1809+
// Name the output column after the pivot value's display string, with surrounding single quotes trimmed
1810+
let field_name = value.to_string().trim_matches('\'').to_string();
1811+
let aliased_agg = Expr::Alias(Alias {
1812+
expr: Box::new(filtered_agg),
1813+
relation: None,
1814+
name: field_name,
1815+
metadata: None,
1816+
});
1817+
1818+
aggregate_exprs.push(aliased_agg);
1819+
}
1820+
1821+
let aggregate_plan = builder.aggregate(group_by_columns, aggregate_exprs)?.build()?;
1822+
1823+
Ok(aggregate_plan)
1824+
}
1825+
17671826
impl DefaultPhysicalPlanner {
17681827
/// Handles capturing the various plans for EXPLAIN queries
17691828
///

datafusion/expr/src/logical_plan/display.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::{
2424
expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr,
2525
Filter, Join, Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery,
2626
Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan,
27-
Unnest, Values, Window, Pivot,
27+
Unnest, Values, Window,
2828
};
2929

3030
use crate::dml::CopyTo;

datafusion/expr/src/logical_plan/plan.rs

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020
use std::cmp::Ordering;
2121
use std::collections::{BTreeMap, HashMap, HashSet};
2222
use std::fmt::{self, Debug, Display, Formatter};
23-
use std::fs::metadata;
2423
use std::hash::{Hash, Hasher};
2524
use std::str::FromStr;
2625
use std::sync::{Arc, LazyLock};
@@ -1179,10 +1178,10 @@ impl LogicalPlan {
11791178
Ok(new_plan)
11801179
}
11811180
LogicalPlan::Pivot(Pivot {
1182-
aggregate_expr,
1181+
aggregate_expr: _,
11831182
pivot_column,
11841183
pivot_values,
1185-
schema,
1184+
schema: _,
11861185
value_subquery,
11871186
..
11881187
}) => {
@@ -2345,7 +2344,6 @@ impl Pivot {
23452344
pivot_column: Column,
23462345
value_subquery: Arc<LogicalPlan>,
23472346
) -> Result<Self> {
2348-
// Create an empty schema - will be filled in when the subquery is executed
23492347
let schema = pivot_schema_without_values(
23502348
input.schema(),
23512349
&aggregate_expr,
@@ -2356,15 +2354,13 @@ impl Pivot {
23562354
input,
23572355
aggregate_expr,
23582356
pivot_column,
2359-
pivot_values: Vec::new(), // Will be populated during physical planning
2357+
pivot_values: Vec::new(),
23602358
schema: Arc::new(schema),
23612359
value_subquery: Some(value_subquery),
23622360
})
23632361
}
23642362
}
23652363

2366-
/// Create a pivot schema without knowing the pivot values
2367-
/// This is used when we have a subquery for pivot values
23682364
fn pivot_schema_without_values(
23692365
input_schema: &DFSchemaRef,
23702366
aggregate_expr: &Expr,
@@ -2387,7 +2383,6 @@ fn pivot_schema_without_values(
23872383
DFSchema::new_with_metadata(fields_with_table_ref, input_schema.metadata().clone())
23882384
}
23892385

2390-
/// Create a pivot schema with known pivot values
23912386
fn pivot_schema(
23922387
input_schema: &DFSchemaRef,
23932388
aggregate_expr: &Expr,
@@ -2396,14 +2391,12 @@ fn pivot_schema(
23962391
) -> Result<DFSchema> {
23972392
let mut fields = vec![];
23982393

2399-
// Include all fields except pivot and value columns
24002394
for field in input_schema.fields() {
24012395
if !aggregate_expr.column_refs().iter().any(|col| col.name() == field.name()) && field.name() != pivot_column.name() {
24022396
fields.push(field.clone());
24032397
}
24042398
}
24052399

2406-
// Add new fields for each pivot value
24072400
for pivot_value in pivot_values {
24082401
let field_name = format!("{}", pivot_value);
24092402
let data_type = aggregate_expr.get_type(input_schema)?;
@@ -5135,4 +5128,3 @@ digraph {
51355128
Ok(())
51365129
}
51375130
}
5138-

datafusion/optimizer/src/optimize_projections/mod.rs

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -383,23 +383,8 @@ fn optimize_projections(
383383
dependency_indices.clone(),
384384
)]
385385
}
386-
LogicalPlan::Pivot(pivot) => {
387-
return Ok(Transformed::no(plan)); // TODO: Implement this
388-
/*
389-
// For PIVOT operations, we need columns from the aggregate expression and the pivot column
390-
let mut pivot_indices = Vec::new();
391-
392-
// Add required indices for the pivot column
393-
if let Ok(idx) = pivot.input.schema().index_of_column(&pivot.pivot_column) {
394-
pivot_indices.push(idx);
395-
}
396-
397-
// Create RequiredIndices with these indices and add dependency from aggregate expression
398-
let required = RequiredIndices::new_from_indices(pivot_indices)
399-
.with_exprs(pivot.input.schema(), std::iter::once(&pivot.aggregate_expr))
400-
.with_projection_beneficial();
401-
402-
vec![required]*/
386+
LogicalPlan::Pivot(_) => {
387+
return Ok(Transformed::no(plan));
403388
},
404389

405390
};

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 0 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1809,19 +1809,6 @@ impl AsLogicalPlan for LogicalPlanNode {
18091809
})
18101810
}
18111811
LogicalPlan::Pivot(_) => {
1812-
/*let input =
1813-
LogicalPlanNode::try_from_logical_plan(pivot.input.as_ref(), extension_codec)?;
1814-
Ok(LogicalPlanNode {
1815-
logical_plan_type: Some(LogicalPlanType::Pivot(Box::new(
1816-
protobuf::PivotNode {
1817-
input: Some(Box::new(input)),
1818-
aggregate_expr: pivot.aggregate_expr.clone(),
1819-
pivot_column: pivot.pivot_column.clone(),
1820-
pivot_values: pivot.pivot_values.clone(),
1821-
schema: convert_required!(*pivot.schema)?,
1822-
},
1823-
))),
1824-
})*/
18251812
Err(proto_error(
18261813
"LogicalPlan serde is not yet implemented for Statement",
18271814
))

datafusion/sql/src/lib.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,3 @@ mod values;
6262
)]
6363
pub use datafusion_common::{ResolvedTableReference, TableReference};
6464
pub use sqlparser;
65-
66-
// Re-export the transform_pivot_to_aggregate function
67-
pub use relation::transform_pivot_to_aggregate;

0 commit comments

Comments
 (0)