Skip to content

Commit 4037baa

Browse files
Denys TsomenkoVedin
authored andcommitted
Pivot initial changes
1 parent d20b6d1 commit 4037baa

File tree

19 files changed

+988
-25
lines changed

19 files changed

+988
-25
lines changed

datafusion/core/src/execution/session_state.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -554,6 +554,28 @@ impl SessionState {
554554

555555
/// Optimizes the logical plan by applying optimizer rules.
556556
pub fn optimize(&self, plan: &LogicalPlan) -> datafusion_common::Result<LogicalPlan> {
557+
// Special case for Pivot nodes with subqueries
558+
/*if let LogicalPlan::Pivot(p) = plan {
559+
if let Some(subquery) = &p.value_subquery {
560+
// Optimize the subquery first
561+
let optimized_subquery = self.optimizer.optimize(
562+
subquery.as_ref().clone(),
563+
self,
564+
|_, _| {},
565+
)?;
566+
567+
// Create a new Pivot with the optimized subquery
568+
return Ok(LogicalPlan::Pivot(datafusion_expr::Pivot {
569+
input: p.input.clone(),
570+
aggregate_expr: p.aggregate_expr.clone(),
571+
pivot_column: p.pivot_column.clone(),
572+
pivot_values: p.pivot_values.clone(),
573+
schema: p.schema.clone(),
574+
value_subquery: Some(Arc::new(optimized_subquery)),
575+
}));
576+
}
577+
}*/
578+
557579
if let LogicalPlan::Explain(e) = plan {
558580
let mut stringified_plans = e.stringified_plans.clone();
559581

@@ -643,7 +665,9 @@ impl SessionState {
643665
&self,
644666
logical_plan: &LogicalPlan,
645667
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
668+
println!("logical_plan_beore_optPIVOT328: {:#?}", logical_plan);
646669
let logical_plan = self.optimize(logical_plan)?;
670+
println!("logical_plan_PIVOT329: {:#?}", logical_plan);
647671
self.query_planner
648672
.create_physical_plan(&logical_plan, self)
649673
.await

datafusion/core/src/physical_planner.rs

Lines changed: 97 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,22 +65,23 @@ use arrow::datatypes::{Schema, SchemaRef};
6565
use datafusion_common::display::ToStringifiedPlan;
6666
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
6767
use datafusion_common::{
68-
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema,
69-
ScalarValue,
68+
exec_err, internal_datafusion_err, internal_err, not_impl_err, plan_err, DFSchema, DFSchemaRef,
69+
ScalarValue, Column, TableReference,
7070
};
7171
use datafusion_datasource::memory::MemorySourceConfig;
72-
use datafusion_expr::dml::{CopyTo, InsertOp};
72+
use datafusion_expr::dml::{CopyTo, InsertOp, DmlStatement, WriteOp};
7373
use datafusion_expr::expr::{
7474
physical_name, AggregateFunction, AggregateFunctionParams, Alias, GroupingSet,
7575
WindowFunction, WindowFunctionParams,
7676
};
7777
use datafusion_expr::expr_rewriter::unnormalize_cols;
7878
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
7979
use datafusion_expr::{
80-
Analyze, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension, FetchType,
80+
Analyze, DescribeTable, Explain, ExplainFormat, Extension, FetchType,
8181
Filter, JoinType, RecursiveQuery, SkipType, SortExpr, StringifiedPlan, WindowFrame,
82-
WindowFrameBound, WriteOp,
82+
WindowFrameBound, SubqueryAlias,
8383
};
84+
use datafusion_execution::FunctionRegistry;
8485
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
8586
use datafusion_physical_expr::expressions::Literal;
8687
use datafusion_physical_expr::LexOrdering;
@@ -97,6 +98,10 @@ use log::{debug, trace};
9798
use sqlparser::ast::NullTreatment;
9899
use tokio::sync::Mutex;
99100

101+
use datafusion_sql::transform_pivot_to_aggregate;
102+
103+
use datafusion_physical_plan::collect;
104+
100105
/// Physical query planner that converts a `LogicalPlan` to an
101106
/// `ExecutionPlan` suitable for execution.
102107
#[async_trait]
@@ -304,8 +309,10 @@ impl DefaultPhysicalPlanner {
304309
}
305310
1 => NodeState::ZeroOrOneChild,
306311
_ => {
312+
307313
let ready_children = Vec::with_capacity(node.inputs().len());
308314
let ready_children = Mutex::new(ready_children);
315+
309316
NodeState::TwoOrMoreChildren(ready_children)
310317
}
311318
};
@@ -887,7 +894,53 @@ impl DefaultPhysicalPlanner {
887894
options.clone(),
888895
))
889896
}
890-
897+
LogicalPlan::Pivot(pivot) => {
898+
let pivot_values = if let Some(subquery) = &pivot.value_subquery {
899+
let optimized_subquery = session_state.optimize(subquery.as_ref())?;
900+
901+
let subquery_physical_plan = self.create_physical_plan(
902+
&optimized_subquery,
903+
session_state
904+
).await?;
905+
906+
let subquery_results = collect(subquery_physical_plan.clone(), session_state.task_ctx()).await?;
907+
908+
let mut pivot_values = Vec::new();
909+
for batch in subquery_results.iter() {
910+
if batch.num_columns() != 1 {
911+
return plan_err!("Pivot subquery must return a single column");
912+
}
913+
914+
let column = batch.column(0);
915+
for row_idx in 0..batch.num_rows() {
916+
if !column.is_null(row_idx) {
917+
pivot_values.push(
918+
ScalarValue::try_from_array(column, row_idx)?
919+
);
920+
}
921+
}
922+
}
923+
pivot_values
924+
} else {
925+
pivot.pivot_values.clone()
926+
};
927+
928+
if !pivot_values.is_empty() {
929+
// Transform Pivot into Aggregate plan with the resolved pivot values
930+
let agg_plan = transform_pivot_to_aggregate(
931+
Arc::new(pivot.input.as_ref().clone()),
932+
&pivot.aggregate_expr,
933+
&pivot.pivot_column,
934+
Some(pivot_values),
935+
None,
936+
)?;
937+
938+
// The schema information is already preserved in the agg_plan
939+
return self.create_physical_plan(&agg_plan, session_state).await;
940+
} else {
941+
return plan_err!("PIVOT operation requires at least one value to pivot on");
942+
}
943+
}
891944
// 2 Children
892945
LogicalPlan::Join(Join {
893946
left,
@@ -2067,6 +2120,44 @@ impl DefaultPhysicalPlanner {
20672120
})
20682121
.collect::<Result<Vec<_>>>()?;
20692122

2123+
// Check if input plan was transformed from a PIVOT operation
2124+
// The original logical plan will be a LogicalPlan::Pivot that has been transformed to an Aggregate
2125+
match input.as_ref() {
2126+
// Direct PIVOT
2127+
LogicalPlan::Pivot(_) => {
2128+
// When we detect a PIVOT-derived plan, ensure all generated columns are preserved
2129+
if input_exec.as_any().downcast_ref::<AggregateExec>().is_some() {
2130+
let agg_exec = input_exec.as_any().downcast_ref::<AggregateExec>().unwrap();
2131+
let schema = input_exec.schema();
2132+
let group_by_len = agg_exec.group_expr().expr().len();
2133+
2134+
// If we have aggregate expressions that correspond to pivot columns
2135+
if group_by_len < schema.fields().len() {
2136+
// This is a pivot result - we need to include all columns from the aggregate
2137+
let mut all_exprs = physical_exprs.clone();
2138+
2139+
// Add any missing pivot columns (which are all columns after the group_by columns)
2140+
for (i, field) in schema.fields().iter().enumerate().skip(group_by_len) {
2141+
// Check if this column is already included in the projection
2142+
if !physical_exprs.iter().any(|(_, name)| name == field.name()) {
2143+
// Add this pivot column to the projection
2144+
all_exprs.push((
2145+
Arc::new(crate::physical_plan::expressions::Column::new(field.name(), i)) as Arc<dyn PhysicalExpr>,
2146+
field.name().clone(),
2147+
));
2148+
}
2149+
}
2150+
2151+
return Ok(Arc::new(ProjectionExec::try_new(
2152+
all_exprs,
2153+
input_exec,
2154+
)?));
2155+
}
2156+
}
2157+
},
2158+
_ => {}
2159+
}
2160+
20702161
Ok(Arc::new(ProjectionExec::try_new(
20712162
physical_exprs,
20722163
input_exec,

datafusion/expr/src/logical_plan/builder.rs

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ use crate::logical_plan::{
3434
Aggregate, Analyze, Distinct, DistinctOn, EmptyRelation, Explain, Filter, Join,
3535
JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType, Prepare,
3636
Projection, Repartition, Sort, SubqueryAlias, TableScan, Union, Unnest, Values,
37-
Window,
37+
Window, Pivot,
3838
};
3939
use crate::select_expr::SelectExpr;
4040
use crate::utils::{
@@ -1426,6 +1426,21 @@ impl LogicalPlanBuilder {
14261426
unnest_with_options(Arc::unwrap_or_clone(self.plan), columns, options)
14271427
.map(Self::new)
14281428
}
1429+
1430+
pub fn pivot(
1431+
self,
1432+
aggregate_expr: Expr,
1433+
pivot_column: Column,
1434+
pivot_values: Vec<ScalarValue>,
1435+
) -> Result<Self> {
1436+
let pivot_plan = Pivot::try_new(
1437+
self.plan,
1438+
aggregate_expr,
1439+
pivot_column,
1440+
pivot_values,
1441+
)?;
1442+
Ok(Self::new(LogicalPlan::Pivot(pivot_plan)))
1443+
}
14291444
}
14301445

14311446
impl From<LogicalPlan> for LogicalPlanBuilder {

datafusion/expr/src/logical_plan/display.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use crate::{
2424
expr_vec_fmt, Aggregate, DescribeTable, Distinct, DistinctOn, DmlStatement, Expr,
2525
Filter, Join, Limit, LogicalPlan, Partitioning, Projection, RecursiveQuery,
2626
Repartition, Sort, Subquery, SubqueryAlias, TableProviderFilterPushDown, TableScan,
27-
Unnest, Values, Window,
27+
Unnest, Values, Window, Pivot,
2828
};
2929

3030
use crate::dml::CopyTo;
@@ -650,6 +650,7 @@ impl<'a, 'b> PgJsonVisitor<'a, 'b> {
650650
"StructColumn": expr_vec_fmt!(struct_type_columns),
651651
})
652652
}
653+
LogicalPlan::Pivot(_) => todo!(),
653654
}
654655
}
655656
}

datafusion/expr/src/logical_plan/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ pub use plan::{
4141
DistinctOn, EmptyRelation, Explain, ExplainFormat, Extension, FetchType, Filter,
4242
Join, JoinConstraint, JoinType, Limit, LogicalPlan, Partitioning, PlanType,
4343
Projection, RecursiveQuery, Repartition, SkipType, Sort, StringifiedPlan, Subquery,
44-
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window,
44+
SubqueryAlias, TableScan, ToStringifiedPlan, Union, Unnest, Values, Window, Pivot
4545
};
4646
pub use statement::{
4747
Deallocate, Execute, Prepare, SetVariable, Statement, TransactionAccessMode,

0 commit comments

Comments
 (0)