Skip to content

Commit df3394c

Browse files
committed
adress comments
1 parent 75ef545 commit df3394c

File tree

2 files changed

+33
-11
lines changed

2 files changed

+33
-11
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,6 +242,13 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
242242
*/
243243
def partitionIdExpression: Expression = Pmod(new Murmur3Hash(expressions), Literal(numPartitions))
244244

245+
/**
246+
* If the HashPartitioning contains an attribute which is not present in the output expressions,
247+
* the returned partitioning in `UnknownPartitioning` instead of the `HashPartitioning` of the
248+
* remaining attributes which is wrong.
249+
* Eg. `HashPartitioning('a, 'b)` with output expressions `'a as 'a1`, should produce
250+
* `UnknownPartitioning` instead of `HashPartitioning('a1)`
251+
*/
245252
override private[spark] def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = {
246253
if (this.references.contains(invalidAttr)) {
247254
UnknownPartitioning(numPartitions)
@@ -301,7 +308,7 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int)
301308

302309
override private[spark] def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = {
303310
if (this.references.contains(invalidAttr)) {
304-
val validExprs = this.children.takeWhile(!_.references.contains(invalidAttr))
311+
val validExprs = ordering.takeWhile(!_.references.contains(invalidAttr))
305312
if (validExprs.isEmpty) {
306313
UnknownPartitioning(numPartitions)
307314
} else {

sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningC
2727
* exposed in the output of this plan. It can avoid the presence of redundant shuffles in queries
2828
* caused by the rename of an attribute among the partitioning ones, eg.
2929
*
30-
* spark.range(10).selectExpr("id AS key", "0").repartition($"key").write.saveAsTable("df1")
31-
* spark.range(10).selectExpr("id AS key", "0").repartition($"key").write.saveAsTable("df2")
30+
* spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df1")
31+
* spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df2")
32+
* sql("set spark.sql.autoBroadcastJoinThreshold=-1")
3233
* sql("""
3334
* SELECT * FROM
3435
* (SELECT key AS k from df1) t1
@@ -38,15 +39,16 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningC
3839
* """).explain
3940
*
4041
* == Physical Plan ==
41-
* *SortMergeJoin [k#56L], [k#57L], Inner
42-
* :- *Sort [k#56L ASC NULLS FIRST], false, 0
43-
* : +- Exchange hashpartitioning(k#56L, 200) // <--- Unnecessary shuffle operation
44-
* : +- *Project [key#39L AS k#56L]
45-
* : +- Exchange hashpartitioning(key#39L, 200)
46-
* : +- *Project [id#36L AS key#39L]
42+
* *SortMergeJoin [k#21L], [k#22L], Inner
43+
* :- *Sort [k#21L ASC NULLS FIRST], false, 0
44+
* : +- Exchange hashpartitioning(k#21L, 200) // <--- Unnecessary shuffle operation
45+
* : +- *Project [key#2L AS k#21L]
46+
* : +- Exchange hashpartitioning(key#2L, 200)
47+
* : +- *Project [id#0L AS key#2L]
4748
* : +- *Range (0, 10, step=1, splits=Some(4))
48-
* +- *Sort [k#57L ASC NULLS FIRST], false, 0
49-
* +- ReusedExchange [k#57L], Exchange hashpartitioning(k#56L, 200)
49+
* +- *(4) Sort [k#22L ASC NULLS FIRST], false, 0
50+
* +- *(4) Project [key#8L AS k#22L]
51+
* +- ReusedExchange [key#8L], Exchange hashpartitioning(key#2L, 200)
5052
*/
5153
trait AliasAwareOutputPartitioning extends UnaryExecNode {
5254

@@ -61,6 +63,13 @@ trait AliasAwareOutputPartitioning extends UnaryExecNode {
6163
final override def outputPartitioning: Partitioning = {
6264
child.outputPartitioning match {
6365
case partitioning: Expression =>
66+
// Creates a sequence of tuples where the first element is an `Attribute` referenced in the
67+
// partitioning expression of the child and the second is a sequence of all its aliased
68+
// occurrences in the node output. If there is no occurrence of an attribute in the output,
69+
// the second element of the tuple for it will be an empty `Seq`. If the attribute,
70+
// instead, is only present as is in the output, there will be no entry for it.
71+
// Eg. if the partitioning is RangePartitioning('a) and the node output is "a, 'a as a1,
72+
// a' as a2", then exprToEquiv will contain the tuple ('a, Seq('a, 'a as a1, 'a as a2)).
6473
val exprToEquiv = partitioning.references.map { attr =>
6574
attr -> outputExpressions.filter(e =>
6675
CleanupAliases.trimAliases(e).semanticEquals(attr))
@@ -71,9 +80,15 @@ trait AliasAwareOutputPartitioning extends UnaryExecNode {
7180
case PartitioningCollection(partitionings) => partitionings
7281
case other => Seq(other)
7382
}
83+
// Replace all the aliased expressions detected earlier with all their corresponding
84+
// occurrences. This may produce many valid partitioning expressions from a single one.
85+
// Eg. in the example above, this would produce a `Seq` of 3 `RangePartitioning`, namely:
86+
// `RangePartitioning('a)`, `RangePartitioning('a1)`, `RangePartitioning('a2)`.
7487
val validPartitionings = exprToEquiv.foldLeft(initValue) {
7588
case (partitionings, (toReplace, equivalents)) =>
7689
if (equivalents.isEmpty) {
90+
// Remove from the partitioning expression the attribute which is not present in the
91+
// node output
7792
partitionings.map(_.pruneInvalidAttribute(toReplace))
7893
} else {
7994
partitionings.flatMap {

0 commit comments

Comments
 (0)