Skip to content

Commit 7d38be8

Browse files
authored
Fix incompatible type in IS NOT DISTINCT filter for PIVOT (#25)
* Fix incompattible type in IS NOT DISTINCT filter * Cargo fmt * Cargo clippy
1 parent 6007430 commit 7d38be8

File tree

2 files changed

+33
-5
lines changed

2 files changed

+33
-5
lines changed

datafusion/core/src/physical_planner.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,8 @@ use datafusion_expr::expr::{
7676
use datafusion_expr::expr_rewriter::unnormalize_cols;
7777
use datafusion_expr::logical_plan::builder::wrap_projection_for_join_if_necessary;
7878
use datafusion_expr::{
79-
Analyze, BinaryExpr, DescribeTable, DmlStatement, Explain, ExplainFormat, Extension,
80-
FetchType, Filter, JoinType, LogicalPlanBuilder, RecursiveQuery, SkipType,
79+
Analyze, BinaryExpr, Cast, DescribeTable, DmlStatement, Explain, ExplainFormat,
80+
Extension, FetchType, Filter, JoinType, LogicalPlanBuilder, RecursiveQuery, SkipType,
8181
StringifiedPlan, WindowFrame, WindowFrameBound, WriteOp,
8282
};
8383
use datafusion_physical_expr::aggregate::{AggregateExprBuilder, AggregateFunctionExpr};
@@ -1773,16 +1773,31 @@ pub fn transform_pivot_to_aggregate(
17731773
.map(|col: datafusion_common::Column| Expr::Column(col))
17741774
.collect();
17751775

1776-
let builder = LogicalPlanBuilder::from(Arc::unwrap_or_clone(input));
1776+
let builder = LogicalPlanBuilder::from(Arc::unwrap_or_clone(Arc::clone(&input)));
17771777

17781778
// Create the aggregate plan with filtered aggregates
17791779
let mut aggregate_exprs = Vec::new();
17801780

1781+
let input_schema = input.schema();
1782+
let pivot_col_idx = match input_schema.index_of_column(pivot_column) {
1783+
Ok(idx) => idx,
1784+
Err(_) => {
1785+
return plan_err!(
1786+
"Pivot column '{}' does not exist in input schema",
1787+
pivot_column
1788+
)
1789+
}
1790+
};
1791+
let pivot_col_type = input_schema.field(pivot_col_idx).data_type();
1792+
17811793
for value in &pivot_values {
17821794
let filter_condition = Expr::BinaryExpr(BinaryExpr::new(
17831795
Box::new(Expr::Column(pivot_column.clone())),
17841796
Operator::IsNotDistinctFrom,
1785-
Box::new(Expr::Literal(value.clone())),
1797+
Box::new(Expr::Cast(Cast::new(
1798+
Box::new(Expr::Literal(value.clone())),
1799+
pivot_col_type.clone(),
1800+
))),
17861801
));
17871802

17881803
let filtered_agg = match aggregate_expr {

datafusion/sqllogictest/test_files/pivot.slt

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,19 @@ ORDER BY empid;
6868
2 39500 90700
6969
3 1001 1001
7070

71+
# PIVOT with cast to pivot column type
72+
query TIII
73+
SELECT *
74+
FROM quarterly_sales
75+
PIVOT(SUM(amount) FOR empid IN (1,2,3))
76+
ORDER BY quarter;
77+
----
78+
2023_Q1 10400 39500 NULL
79+
2023_Q2 8000 90700 NULL
80+
2023_Q3 11000 12000 2700
81+
2023_Q4 18000 5300 28900
82+
83+
7184
# PIVOT with automatic detection of all distinct column values using ANY
7285
query TIII
7386
SELECT *
@@ -119,7 +132,7 @@ ORDER BY empid;
119132

120133

121134
# Non-existent column in the FOR clause
122-
query error DataFusion error: Schema error: No field named non_existent_column\. Valid fields are quarterly_sales\.empid, quarterly_sales\.amount, quarterly_sales\.quarter\.
135+
query error DataFusion error: Error during planning: Pivot column 'non_existent_column' does not exist in input schema
123136
SELECT *
124137
FROM quarterly_sales
125138
PIVOT(SUM(amount) FOR non_existent_column IN ('2023_Q1', '2023_Q2'))

0 commit comments

Comments
 (0)