@@ -21,13 +21,14 @@ use crate::planner::{ContextProvider, PlannerContext, SqlToRel};
2121
2222use datafusion_common:: tree_node:: { Transformed , TreeNode } ;
2323use datafusion_common:: {
24- not_impl_err, plan_err, Column , DFSchema , Diagnostic , Result , ScalarValue , Span , Spans , TableReference
24+ not_impl_err, plan_err, Column , DFSchema , Diagnostic , Result , ScalarValue , Span ,
25+ Spans , TableReference ,
2526} ;
27+ use datafusion_expr:: binary:: comparison_coercion;
2628use datafusion_expr:: builder:: subquery_alias;
2729use datafusion_expr:: { expr:: Unnest , Expr , LogicalPlan , LogicalPlanBuilder } ;
2830use datafusion_expr:: { Subquery , SubqueryAlias } ;
29- use sqlparser:: ast:: { FunctionArg , FunctionArgExpr , Spanned , TableFactor , NullInclusion } ;
30- use datafusion_expr:: binary:: comparison_coercion;
31+ use sqlparser:: ast:: { FunctionArg , FunctionArgExpr , NullInclusion , Spanned , TableFactor } ;
3132
3233mod join;
3334
@@ -309,87 +310,96 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
309310 } => {
310311 let base_plan = self . create_relation ( * table, planner_context) ?;
311312 let base_schema = base_plan. schema ( ) ;
312-
313+
313314 let value_column = value. value . clone ( ) ;
314315 let name_column = name. value . clone ( ) ;
315-
316+
316317 let mut unpivot_column_indices = Vec :: new ( ) ;
317318 let mut unpivot_column_names = Vec :: new ( ) ;
318-
319+
319320 let mut common_type = None ;
320-
321+
321322 for column_ident in & columns {
322323 let column_name = column_ident. value . clone ( ) ;
323-
324- if let Some ( idx) = base_schema. index_of_column_by_name ( None , & column_name) {
324+
325+ if let Some ( idx) =
326+ base_schema. index_of_column_by_name ( None , & column_name)
327+ {
325328 let field = base_schema. field ( idx) ;
326329 let field_type = field. data_type ( ) ;
327-
330+
328331 // Verify all unpivot columns have compatible types
329332 if let Some ( current_type) = & common_type {
330333 if comparison_coercion ( current_type, field_type) . is_none ( ) {
331334 return plan_err ! (
332- "UNPIVOT columns must have the same data type. Found {} and {} " ,
333- current_type , field_type
335+ "The type of column '{}' conflicts with the type of other columns in the UNPIVOT list. " ,
336+ column_name . to_uppercase ( )
334337 ) ;
335338 }
336339 } else {
337340 common_type = Some ( field_type. clone ( ) ) ;
338341 }
339-
342+
340343 unpivot_column_indices. push ( idx) ;
341344 unpivot_column_names. push ( column_name) ;
342345 } else {
343346 return plan_err ! ( "Column '{}' not found in input" , column_name) ;
344347 }
345348 }
346-
349+
347350 if unpivot_column_names. is_empty ( ) {
348351 return plan_err ! ( "UNPIVOT requires at least one column to unpivot" ) ;
349352 }
350-
353+
351354 let non_pivot_exprs: Vec < Expr > = base_schema
352355 . fields ( )
353356 . iter ( )
354357 . enumerate ( )
355358 . filter ( |( i, _) | !unpivot_column_indices. contains ( i) )
356359 . map ( |( _, f) | Expr :: Column ( Column :: new ( None :: < & str > , f. name ( ) ) ) )
357360 . collect ( ) ;
358-
361+
359362 let mut union_inputs = Vec :: with_capacity ( unpivot_column_names. len ( ) ) ;
360-
363+
361364 for col_name in & unpivot_column_names {
362365 let mut projection_exprs = non_pivot_exprs. clone ( ) ;
363-
364- let name_expr = Expr :: Literal ( ScalarValue :: Utf8 ( Some ( col_name. clone ( ) ) ) )
365- . alias ( name_column. clone ( ) ) ;
366-
367- let value_expr = Expr :: Column ( Column :: new ( None :: < & str > , col_name. clone ( ) ) )
368- . alias ( value_column. clone ( ) ) ;
369-
366+
367+ let name_expr =
368+ Expr :: Literal ( ScalarValue :: Utf8 ( Some ( col_name. clone ( ) ) ) )
369+ . alias ( name_column. clone ( ) ) ;
370+
371+ let value_expr =
372+ Expr :: Column ( Column :: new ( None :: < & str > , col_name. clone ( ) ) )
373+ . alias ( value_column. clone ( ) ) ;
374+
370375 projection_exprs. push ( name_expr) ;
371376 projection_exprs. push ( value_expr) ;
372-
377+
373378 let mut builder = LogicalPlanBuilder :: from ( base_plan. clone ( ) )
374379 . project ( projection_exprs) ?;
375-
376- if null_inclusion. clone ( ) . unwrap_or ( NullInclusion :: ExcludeNulls ) == NullInclusion :: ExcludeNulls {
380+
381+ if null_inclusion
382+ . clone ( )
383+ . unwrap_or ( NullInclusion :: ExcludeNulls )
384+ == NullInclusion :: ExcludeNulls
385+ {
377386 let col = Column :: new ( None :: < & str > , value_column. clone ( ) ) ;
378- builder = builder. filter ( Expr :: IsNotNull ( Box :: new ( Expr :: Column ( col) ) ) ) ?;
387+ builder = builder
388+ . filter ( Expr :: IsNotNull ( Box :: new ( Expr :: Column ( col) ) ) ) ?;
379389 }
380-
390+
381391 union_inputs. push ( builder. build ( ) ?) ;
382392 }
383-
393+
384394 let first = union_inputs. remove ( 0 ) ;
385395 let mut union_builder = LogicalPlanBuilder :: from ( first) ;
386-
396+
387397 for plan in union_inputs {
388398 union_builder = union_builder. union ( plan) ?;
389399 }
390-
400+
391401 let unpivot_plan = union_builder. build ( ) ?;
392-
402+
393403 ( unpivot_plan, alias)
394404 }
395405 // @todo: Support TableFactory::TableFunction
0 commit comments