Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2381,13 +2381,16 @@ class Analyzer(override val catalogManager: CatalogManager)
val unresolvedSortOrders = sortOrder.filter { s =>
!s.resolved || !s.references.subsetOf(aggregate.outputSet) || containsAggregate(s)
}
val aliasedOrdering =
unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")())
val aggregatedOrdering = aggregate.copy(aggregateExpressions = aliasedOrdering)
val aliasedOrdering = unresolvedSortOrders.map(o => Alias(o.child, "aggOrder")())

val aggregateWithExtraOrdering = aggregate.copy(
aggregateExpressions = aggregate.aggregateExpressions ++ aliasedOrdering)

val resolvedAggregate: Aggregate =
executeSameContext(aggregatedOrdering).asInstanceOf[Aggregate]
val resolvedAliasedOrdering: Seq[Alias] =
resolvedAggregate.aggregateExpressions.asInstanceOf[Seq[Alias]]
executeSameContext(aggregateWithExtraOrdering).asInstanceOf[Aggregate]

val (reResolvedAggExprs, resolvedAliasedOrdering) =
resolvedAggregate.aggregateExpressions.splitAt(aggregate.aggregateExpressions.length)

// If we pass the analysis check, then the ordering expressions should only reference to
// aggregate expressions or grouping expressions, and it's safe to push them down to
Expand All @@ -2401,24 +2404,25 @@ class Analyzer(override val catalogManager: CatalogManager)
// expression instead.
val needsPushDown = ArrayBuffer.empty[NamedExpression]
val orderToAlias = unresolvedSortOrders.zip(aliasedOrdering)
val evaluatedOrderings = resolvedAliasedOrdering.zip(orderToAlias).map {
case (evaluated, (order, aliasOrder)) =>
val index = originalAggExprs.indexWhere {
case Alias(child, _) => child semanticEquals evaluated.child
case other => other semanticEquals evaluated.child
}
val evaluatedOrderings =
resolvedAliasedOrdering.asInstanceOf[Seq[Alias]].zip(orderToAlias).map {
case (evaluated, (order, aliasOrder)) =>
val index = reResolvedAggExprs.indexWhere {
case Alias(child, _) => child semanticEquals evaluated.child
case other => other semanticEquals evaluated.child
}

if (index == -1) {
if (CharVarcharUtils.getRawType(evaluated.metadata).nonEmpty) {
needsPushDown += aliasOrder
order.copy(child = aliasOrder)
if (index == -1) {
if (hasCharVarchar(evaluated)) {
needsPushDown += aliasOrder
order.copy(child = aliasOrder)
} else {
needsPushDown += evaluated
order.copy(child = evaluated.toAttribute)
}
} else {
needsPushDown += evaluated
order.copy(child = evaluated.toAttribute)
order.copy(child = originalAggExprs(index).toAttribute)
}
} else {
order.copy(child = originalAggExprs(index).toAttribute)
}
}

val sortOrdersMap = unresolvedSortOrders
Expand All @@ -2443,6 +2447,13 @@ class Analyzer(override val catalogManager: CatalogManager)
}
}

def hasCharVarchar(expr: Alias): Boolean = {
expr.find {
case ne: NamedExpression => CharVarcharUtils.getRawType(ne.metadata).nonEmpty
case _ => false
}.nonEmpty
}

def containsAggregate(condition: Expression): Boolean = {
condition.find(_.isInstanceOf[AggregateExpression]).isDefined
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ Input [8]: [ws_sold_date_sk#1, ws_item_sk#2, ws_web_page_sk#3, ws_order_number#4

(10) Exchange
Input [6]: [ws_sold_date_sk#1, ws_item_sk#2, ws_order_number#4, ws_quantity#5, ws_sales_price#6, ws_net_profit#7]
Arguments: hashpartitioning(cast(ws_item_sk#2 as bigint), cast(ws_order_number#4 as bigint), 5), true, [id=#10]
Arguments: hashpartitioning(cast(ws_item_sk#2 as bigint), cast(ws_order_number#4 as bigint), 5), ENSURE_REQUIREMENTS, [id=#10]

(11) Sort [codegen id : 3]
Input [6]: [ws_sold_date_sk#1, ws_item_sk#2, ws_order_number#4, ws_quantity#5, ws_sales_price#6, ws_net_profit#7]
Expand All @@ -123,7 +123,7 @@ Condition : (((((isnotnull(wr_item_sk#11) AND isnotnull(wr_order_number#16)) AND

(15) Exchange
Input [8]: [wr_item_sk#11, wr_refunded_cdemo_sk#12, wr_refunded_addr_sk#13, wr_returning_cdemo_sk#14, wr_reason_sk#15, wr_order_number#16, wr_fee#17, wr_refunded_cash#18]
Arguments: hashpartitioning(wr_item_sk#11, wr_order_number#16, 5), true, [id=#19]
Arguments: hashpartitioning(wr_item_sk#11, wr_order_number#16, 5), ENSURE_REQUIREMENTS, [id=#19]

(16) Sort [codegen id : 5]
Input [8]: [wr_item_sk#11, wr_refunded_cdemo_sk#12, wr_refunded_addr_sk#13, wr_returning_cdemo_sk#14, wr_reason_sk#15, wr_order_number#16, wr_fee#17, wr_refunded_cash#18]
Expand Down Expand Up @@ -229,7 +229,7 @@ Input [11]: [ws_quantity#5, ws_sales_price#6, ws_net_profit#7, wr_refunded_cdemo

(39) Exchange
Input [7]: [ws_quantity#5, ws_sales_price#6, wr_refunded_cdemo_sk#12, wr_returning_cdemo_sk#14, wr_fee#17, wr_refunded_cash#18, r_reason_desc#24]
Arguments: hashpartitioning(wr_refunded_cdemo_sk#12, wr_returning_cdemo_sk#14, 5), true, [id=#30]
Arguments: hashpartitioning(wr_refunded_cdemo_sk#12, wr_returning_cdemo_sk#14, 5), ENSURE_REQUIREMENTS, [id=#30]

(40) Sort [codegen id : 10]
Input [7]: [ws_quantity#5, ws_sales_price#6, wr_refunded_cdemo_sk#12, wr_returning_cdemo_sk#14, wr_fee#17, wr_refunded_cash#18, r_reason_desc#24]
Expand Down Expand Up @@ -278,7 +278,7 @@ Input [6]: [cd_demo_sk#31, cd_marital_status#32, cd_education_status#33, cd_demo

(50) Exchange
Input [4]: [cd_demo_sk#31, cd_marital_status#32, cd_education_status#33, cd_demo_sk#35]
Arguments: hashpartitioning(cast(cd_demo_sk#31 as bigint), cast(cd_demo_sk#35 as bigint), 5), true, [id=#38]
Arguments: hashpartitioning(cast(cd_demo_sk#31 as bigint), cast(cd_demo_sk#35 as bigint), 5), ENSURE_REQUIREMENTS, [id=#38]

(51) Sort [codegen id : 13]
Input [4]: [cd_demo_sk#31, cd_marital_status#32, cd_education_status#33, cd_demo_sk#35]
Expand All @@ -302,16 +302,16 @@ Results [7]: [r_reason_desc#24, sum#45, count#46, sum#47, count#48, sum#49, coun

(55) Exchange
Input [7]: [r_reason_desc#24, sum#45, count#46, sum#47, count#48, sum#49, count#50]
Arguments: hashpartitioning(r_reason_desc#24, 5), true, [id=#51]
Arguments: hashpartitioning(r_reason_desc#24, 5), ENSURE_REQUIREMENTS, [id=#51]

(56) HashAggregate [codegen id : 15]
Input [7]: [r_reason_desc#24, sum#45, count#46, sum#47, count#48, sum#49, count#50]
Keys [1]: [r_reason_desc#24]
Functions [3]: [avg(cast(ws_quantity#5 as bigint)), avg(UnscaledValue(wr_refunded_cash#18)), avg(UnscaledValue(wr_fee#17))]
Aggregate Attributes [3]: [avg(cast(ws_quantity#5 as bigint))#52, avg(UnscaledValue(wr_refunded_cash#18))#53, avg(UnscaledValue(wr_fee#17))#54]
Results [5]: [substr(r_reason_desc#24, 1, 20) AS substr(r_reason_desc, 1, 20)#55, avg(cast(ws_quantity#5 as bigint))#52 AS avg(ws_quantity)#56, cast((avg(UnscaledValue(wr_refunded_cash#18))#53 / 100.0) as decimal(11,6)) AS avg(wr_refunded_cash)#57, cast((avg(UnscaledValue(wr_fee#17))#54 / 100.0) as decimal(11,6)) AS avg(wr_fee)#58, avg(cast(ws_quantity#5 as bigint))#52 AS aggOrder#59]
Results [4]: [substr(r_reason_desc#24, 1, 20) AS substr(r_reason_desc, 1, 20)#55, avg(cast(ws_quantity#5 as bigint))#52 AS avg(ws_quantity)#56, cast((avg(UnscaledValue(wr_refunded_cash#18))#53 / 100.0) as decimal(11,6)) AS avg(wr_refunded_cash)#57, cast((avg(UnscaledValue(wr_fee#17))#54 / 100.0) as decimal(11,6)) AS avg(wr_fee)#58]

(57) TakeOrderedAndProject
Input [5]: [substr(r_reason_desc, 1, 20)#55, avg(ws_quantity)#56, avg(wr_refunded_cash)#57, avg(wr_fee)#58, aggOrder#59]
Arguments: 100, [substr(r_reason_desc, 1, 20)#55 ASC NULLS FIRST, aggOrder#59 ASC NULLS FIRST, avg(wr_refunded_cash)#57 ASC NULLS FIRST, avg(wr_fee)#58 ASC NULLS FIRST], [substr(r_reason_desc, 1, 20)#55, avg(ws_quantity)#56, avg(wr_refunded_cash)#57, avg(wr_fee)#58]
Input [4]: [substr(r_reason_desc, 1, 20)#55, avg(ws_quantity)#56, avg(wr_refunded_cash)#57, avg(wr_fee)#58]
Arguments: 100, [substr(r_reason_desc, 1, 20)#55 ASC NULLS FIRST, avg(ws_quantity)#56 ASC NULLS FIRST, avg(wr_refunded_cash)#57 ASC NULLS FIRST, avg(wr_fee)#58 ASC NULLS FIRST], [substr(r_reason_desc, 1, 20)#55, avg(ws_quantity)#56, avg(wr_refunded_cash)#57, avg(wr_fee)#58]

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TakeOrderedAndProject [substr(r_reason_desc, 1, 20),aggOrder,avg(wr_refunded_cash),avg(wr_fee),avg(ws_quantity)]
TakeOrderedAndProject [substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee)]
WholeStageCodegen (15)
HashAggregate [r_reason_desc,sum,count,sum,count,sum,count] [avg(cast(ws_quantity as bigint)),avg(UnscaledValue(wr_refunded_cash)),avg(UnscaledValue(wr_fee)),substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee),aggOrder,sum,count,sum,count,sum,count]
HashAggregate [r_reason_desc,sum,count,sum,count,sum,count] [avg(cast(ws_quantity as bigint)),avg(UnscaledValue(wr_refunded_cash)),avg(UnscaledValue(wr_fee)),substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee),sum,count,sum,count,sum,count]
InputAdapter
Exchange [r_reason_desc] #1
WholeStageCodegen (14)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,16 +272,16 @@ Results [7]: [r_reason_desc#35, sum#43, count#44, sum#45, count#46, sum#47, coun

(49) Exchange
Input [7]: [r_reason_desc#35, sum#43, count#44, sum#45, count#46, sum#47, count#48]
Arguments: hashpartitioning(r_reason_desc#35, 5), true, [id=#49]
Arguments: hashpartitioning(r_reason_desc#35, 5), ENSURE_REQUIREMENTS, [id=#49]

(50) HashAggregate [codegen id : 9]
Input [7]: [r_reason_desc#35, sum#43, count#44, sum#45, count#46, sum#47, count#48]
Keys [1]: [r_reason_desc#35]
Functions [3]: [avg(cast(ws_quantity#5 as bigint)), avg(UnscaledValue(wr_refunded_cash#15)), avg(UnscaledValue(wr_fee#14))]
Aggregate Attributes [3]: [avg(cast(ws_quantity#5 as bigint))#50, avg(UnscaledValue(wr_refunded_cash#15))#51, avg(UnscaledValue(wr_fee#14))#52]
Results [5]: [substr(r_reason_desc#35, 1, 20) AS substr(r_reason_desc, 1, 20)#53, avg(cast(ws_quantity#5 as bigint))#50 AS avg(ws_quantity)#54, cast((avg(UnscaledValue(wr_refunded_cash#15))#51 / 100.0) as decimal(11,6)) AS avg(wr_refunded_cash)#55, cast((avg(UnscaledValue(wr_fee#14))#52 / 100.0) as decimal(11,6)) AS avg(wr_fee)#56, avg(cast(ws_quantity#5 as bigint))#50 AS aggOrder#57]
Results [4]: [substr(r_reason_desc#35, 1, 20) AS substr(r_reason_desc, 1, 20)#53, avg(cast(ws_quantity#5 as bigint))#50 AS avg(ws_quantity)#54, cast((avg(UnscaledValue(wr_refunded_cash#15))#51 / 100.0) as decimal(11,6)) AS avg(wr_refunded_cash)#55, cast((avg(UnscaledValue(wr_fee#14))#52 / 100.0) as decimal(11,6)) AS avg(wr_fee)#56]

(51) TakeOrderedAndProject
Input [5]: [substr(r_reason_desc, 1, 20)#53, avg(ws_quantity)#54, avg(wr_refunded_cash)#55, avg(wr_fee)#56, aggOrder#57]
Arguments: 100, [substr(r_reason_desc, 1, 20)#53 ASC NULLS FIRST, aggOrder#57 ASC NULLS FIRST, avg(wr_refunded_cash)#55 ASC NULLS FIRST, avg(wr_fee)#56 ASC NULLS FIRST], [substr(r_reason_desc, 1, 20)#53, avg(ws_quantity)#54, avg(wr_refunded_cash)#55, avg(wr_fee)#56]
Input [4]: [substr(r_reason_desc, 1, 20)#53, avg(ws_quantity)#54, avg(wr_refunded_cash)#55, avg(wr_fee)#56]
Arguments: 100, [substr(r_reason_desc, 1, 20)#53 ASC NULLS FIRST, avg(ws_quantity)#54 ASC NULLS FIRST, avg(wr_refunded_cash)#55 ASC NULLS FIRST, avg(wr_fee)#56 ASC NULLS FIRST], [substr(r_reason_desc, 1, 20)#53, avg(ws_quantity)#54, avg(wr_refunded_cash)#55, avg(wr_fee)#56]

Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TakeOrderedAndProject [substr(r_reason_desc, 1, 20),aggOrder,avg(wr_refunded_cash),avg(wr_fee),avg(ws_quantity)]
TakeOrderedAndProject [substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee)]
WholeStageCodegen (9)
HashAggregate [r_reason_desc,sum,count,sum,count,sum,count] [avg(cast(ws_quantity as bigint)),avg(UnscaledValue(wr_refunded_cash)),avg(UnscaledValue(wr_fee)),substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee),aggOrder,sum,count,sum,count,sum,count]
HashAggregate [r_reason_desc,sum,count,sum,count,sum,count] [avg(cast(ws_quantity as bigint)),avg(UnscaledValue(wr_refunded_cash)),avg(UnscaledValue(wr_fee)),substr(r_reason_desc, 1, 20),avg(ws_quantity),avg(wr_refunded_cash),avg(wr_fee),sum,count,sum,count,sum,count]
InputAdapter
Exchange [r_reason_desc] #1
WholeStageCodegen (8)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,17 @@ trait CharVarcharTestSuite extends QueryTest with SQLTestUtils {
checkAnswer(sql("SELECT v, sum(i) FROM t GROUP BY v ORDER BY v"), Row("c", 1))
}
}

test("SPARK-34003: fix char/varchar fails w/ order by functions") {
withTable("t") {
sql(s"CREATE TABLE t(v VARCHAR(3), i INT) USING $format")
sql("INSERT INTO t VALUES ('c', 1)")
checkAnswer(sql("SELECT substr(v, 1, 2), sum(i) FROM t GROUP BY v ORDER BY substr(v, 1, 2)"),
Row("c", 1))
checkAnswer(sql("SELECT sum(i) FROM t GROUP BY v ORDER BY substr(v, 1, 2)"),
Row(1))
}
}
}

// Some basic char/varchar tests which doesn't rely on table implementation.
Expand Down