From b566818bf17c2b69d06fed2e1500e608a25bb3b7 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Tue, 6 Nov 2018 16:55:37 +0100 Subject: [PATCH 01/24] [SPARK-25951][SQL] Remove Alias when canonicalize --- .../sql/catalyst/expressions/Canonicalize.scala | 8 +++++++- .../spark/sql/execution/PlannerSuite.scala | 17 +++++++++++++++++ 2 files changed, 24 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala index fe6db8b344d3..81ea4d6bbf8e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala @@ -34,7 +34,7 @@ package org.apache.spark.sql.catalyst.expressions */ object Canonicalize { def execute(e: Expression): Expression = { - expressionReorder(ignoreNamesTypes(e)) + expressionReorder(ignoreNamesTypes(removeAlias(e))) } /** Remove names and nullability from types. */ @@ -91,4 +91,10 @@ object Canonicalize { case _ => e } + + /** Remove Aliases, */ + private def removeAlias(e: Expression): Expression = e match { + case a: Alias => a.child + case _ => e + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index e4e224df7607..828fda999c7e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -780,6 +780,23 @@ class PlannerSuite extends SharedSQLContext { classOf[PartitioningCollection]) } } + + test("SPARK-25951: avoid redundant shuffle on rename") { + val renamedA = Alias(exprA, "a")() + val doubleRename = Alias(Alias(exprA, "a")(), "b")() + val plan1 = ShuffleExchangeExec( + HashPartitioning(exprA :: Nil, 5), + DummySparkPlan(outputPartitioning = HashPartitioning(doubleRename :: Nil, 5))) + val plan2 = ShuffleExchangeExec( + HashPartitioning(exprA :: Nil, 5), + DummySparkPlan(outputPartitioning = HashPartitioning(renamedA :: Nil, 5))) + val smjExec = SortMergeJoinExec( + doubleRename :: Nil, renamedA :: Nil, Inner, None, plan1, plan2) + + val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(smjExec) + assertDistributionRequirementsAreSatisfied(outputPlan) + assert(outputPlan.collect { case _: ShuffleExchangeExec => true }.isEmpty) + } } // Used for unit-testing EnsureRequirements From 2b00f351fbb8f33cf01b30297f3785956b1b3697 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 7 Nov 2018 16:02:47 +0100 Subject: [PATCH 02/24] introduce sameResult --- .../sql/catalyst/expressions/Canonicalize.scala | 8 +------- .../sql/catalyst/expressions/Expression.scala | 14 +++++++++++++- .../spark/sql/catalyst/expressions/SortOrder.scala | 2 +- .../catalyst/expressions/namedExpressions.scala | 4 ++++ .../sql/catalyst/plans/physical/partitioning.scala | 13 ++++++++++--- .../execution/exchange/EnsureRequirements.scala | 2 +- 6 files changed, 30 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala index 81ea4d6bbf8e..fe6db8b344d3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala +++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Canonicalize.scala @@ -34,7 +34,7 @@ package org.apache.spark.sql.catalyst.expressions */ object Canonicalize { def execute(e: Expression): Expression = { - expressionReorder(ignoreNamesTypes(removeAlias(e))) + expressionReorder(ignoreNamesTypes(e)) } /** Remove names and nullability from types. */ @@ -91,10 +91,4 @@ object Canonicalize { case _ => e } - - /** Remove Aliases, */ - private def removeAlias(e: Expression): Expression = e match { - case a: Alias => a.child - case _ => e - } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index ccc5b9043a0a..e75247f07d31 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -195,7 +195,7 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same result, even if they differ + * Returns true when two expressions will always compute the same output, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. @@ -203,6 +203,18 @@ abstract class Expression extends TreeNode[Expression] { def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized + /** + * Returns true when two expressions will always compute the same result, even if the output may + * be different, because of different names or similar differences. + * Usually this means they their canonicalized form equals, but it may also not be the case, as + * different output expressions can evaluate to the same result as well (eg. when an expression + * is aliased). + */ + def sameResult(other: Expression): Boolean = other match { + case a: Alias => sameResult(a.child) + case _ => this.semanticEquals(other) + } + /** * Returns a `hashCode` for the calculation performed by this expression. Unlike the standard * `hashCode`, an attempt has been made to eliminate cosmetic differences. 
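A minimal sketch of the contract introduced above (illustrative only, not part of the patch; it assumes the catalyst `AttributeReference` and `Alias` constructors, and the names are made up):

    import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference}
    import org.apache.spark.sql.types.IntegerType

    val a = AttributeReference("a", IntegerType)()  // an input column
    val renamed = Alias(a, "b")()                   // the same value under a new name

    // The rename changes the output attribute, so semanticEquals stays false,
    // but both expressions always evaluate to the same value, so sameResult holds.
    assert(!a.semanticEquals(renamed))
    assert(a.sameResult(renamed))
    assert(renamed.sameResult(a))

That is, `semanticEquals` compares what an expression is (and whether one can replace the other), while `sameResult` only compares what it evaluates to, looking through aliases. The SubexpressionEliminationSuite additions later in this series assert exactly this behaviour.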
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index 536276b5cb29..915a4667981a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -86,7 +86,7 @@ case class SortOrder( def isAscending: Boolean = direction == Ascending def satisfies(required: SortOrder): Boolean = { - (sameOrderExpressions + child).exists(required.child.semanticEquals) && + (sameOrderExpressions + child).exists(required.child.sameResult) && direction == required.direction && nullOrdering == required.nullOrdering } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 584a2946bd56..395a715598bd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -167,6 +167,10 @@ case class Alias(child: Expression, name: String)( } } + override def sameResult(other: Expression): Boolean = { + this.child.sameResult(other) + } + def newInstance(): NamedExpression = Alias(child, name)(qualifier = qualifier, explicitMetadata = explicitMetadata) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index cc1a5e835d9c..87a002ec57ff 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -223,15 +223,22 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) required match { case h: HashClusteredDistribution => expressions.length == h.expressions.length && expressions.zip(h.expressions).forall { - case (l, r) => l.semanticEquals(r) + case (l, r) => l.sameResult(r) } case ClusteredDistribution(requiredClustering, _) => - expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) + expressions.forall(x => requiredClustering.exists(_.sameResult(x))) case _ => false } } } + override def sameResult(other: Expression): Boolean = other match { + case HashPartitioning(exprs, _) => expressions.zip(exprs).forall { + case (l, r) => l.sameResult(r) + } + case _ => false + } + /** * Returns an expression that will produce a valid partition ID(i.e. non-negative and is less * than numPartitions) based on hashing expressions. 
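The redundant-shuffle scenario this series is after, sketched along the lines of the DatasetSuite test added later in the series (illustrative only; it assumes a SparkSession `spark` with `spark.implicits._` in scope, and the column names mirror the test):

    val t1 = spark.range(10).selectExpr("floor(id/4) as k1")
    val t2 = spark.range(10).selectExpr("floor(id/4) as k2")

    val agg1 = t1.groupBy("k1").count()
    // agg2 comes out of its aggregate hash-partitioned by k2; renaming the
    // column should not hide that partitioning from the join below.
    val agg2 = t2.groupBy("k2").count().withColumnRenamed("k2", "k3")

    // Goal of the change: joining on the renamed column reuses the two shuffles
    // the aggregates already introduce, instead of adding a third ShuffleExchangeExec.
    agg1.join(agg2, $"k1" === $"k3")

The DatasetSuite test below asserts that exactly two exchanges remain and that none of them partitions by the renamed column.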
@@ -265,7 +272,7 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) val minSize = Seq(requiredOrdering.size, ordering.size).min requiredOrdering.take(minSize) == ordering.take(minSize) case ClusteredDistribution(requiredClustering, _) => - ordering.map(_.child).forall(x => requiredClustering.exists(_.semanticEquals(x))) + ordering.map(_.child).forall(x => requiredClustering.exists(_.sameResult(x))) case _ => false } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index d2d5011bbcb9..c03d2da1ddee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -297,7 +297,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { // TODO: remove this after we create a physical operator for `RepartitionByExpression`. case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) => child.outputPartitioning match { - case lower: HashPartitioning if upper.semanticEquals(lower) => child + case lower: HashPartitioning if upper.sameResult(lower) => child case _ => operator } case operator: SparkPlan => From 049124971eb9d0f84eb98cc33c4cccd65d7a42b7 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 28 Nov 2018 14:14:02 +0100 Subject: [PATCH 03/24] address comment: add comments --- .../spark/sql/catalyst/expressions/Expression.scala | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index e75247f07d31..a3ed40c0d47a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -199,6 +199,12 @@ abstract class Expression extends TreeNode[Expression] { * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. + * + * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the + * same and one can replace the other (eg. in Optimizer/Analyzer rules when we want to replace + * equivalent expressions). It should not be used (and `sameResult` should be used instead) when + * comparing if 2 expressions produce the same output (in this case `semanticEquals` can be too + * strict). */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized @@ -206,9 +212,14 @@ abstract class Expression extends TreeNode[Expression] { /** * Returns true when two expressions will always compute the same result, even if the output may * be different, because of different names or similar differences. - * Usually this means they their canonicalized form equals, but it may also not be the case, as + * Usually this means that their canonicalized form equals, but it may also not be the case, as * different output expressions can evaluate to the same result as well (eg. when an expression * is aliased). + * + * This method should be used (instead of `semanticEquals`) when checking if 2 expressions + * produce the same output (eg. as in the case we are interested to check if the ordering is the + * same). 
It should not be used (and `semanticEquals` should be used instead) when comparing if 2 + * expressions are the same and one can replace the other. */ def sameResult(other: Expression): Boolean = other match { case a: Alias => sameResult(a.child) From 3831be0aba7eaf83bb03152de129c11286290586 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 28 Nov 2018 14:40:20 +0100 Subject: [PATCH 04/24] address comment --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 2 +- .../spark/sql/catalyst/expressions/Expression.scala | 8 +++----- .../spark/sql/catalyst/expressions/namedExpressions.scala | 4 ---- .../spark/sql/catalyst/plans/physical/partitioning.scala | 7 ------- 4 files changed, 4 insertions(+), 17 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index c2d22c5e7ce6..fdf3784fe4a1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2542,7 +2542,7 @@ object EliminateUnions extends Rule[LogicalPlan] { * rule can't work for those parameters. */ object CleanupAliases extends Rule[LogicalPlan] { - private def trimAliases(e: Expression): Expression = { + private[catalyst] def trimAliases(e: Expression): Expression = { e.transformDown { case Alias(child, _) => child case MultiAlias(child, _) => child diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index a3ed40c0d47a..14f130cc3920 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions import java.util.Locale import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion} +import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, TypeCheckResult, TypeCoercion} import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ @@ -221,10 +221,8 @@ abstract class Expression extends TreeNode[Expression] { * same). It should not be used (and `semanticEquals` should be used instead) when comparing if 2 * expressions are the same and one can replace the other. */ - def sameResult(other: Expression): Boolean = other match { - case a: Alias => sameResult(a.child) - case _ => this.semanticEquals(other) - } + final def sameResult(other: Expression): Boolean = + CleanupAliases.trimAliases(this) semanticEquals CleanupAliases.trimAliases(other) /** * Returns a `hashCode` for the calculation performed by this expression. 
Unlike the standard diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 395a715598bd..584a2946bd56 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -167,10 +167,6 @@ case class Alias(child: Expression, name: String)( } } - override def sameResult(other: Expression): Boolean = { - this.child.sameResult(other) - } - def newInstance(): NamedExpression = Alias(child, name)(qualifier = qualifier, explicitMetadata = explicitMetadata) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 87a002ec57ff..abca856acc18 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -232,13 +232,6 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) } } - override def sameResult(other: Expression): Boolean = other match { - case HashPartitioning(exprs, _) => expressions.zip(exprs).forall { - case (l, r) => l.sameResult(r) - } - case _ => false - } - /** * Returns an expression that will produce a valid partition ID(i.e. non-negative and is less * than numPartitions) based on hashing expressions. From 6c93e708df203cc5a2f3085340ed61bf5c8d90fe Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 28 Nov 2018 14:51:37 +0100 Subject: [PATCH 05/24] improve comments --- .../apache/spark/sql/catalyst/expressions/Expression.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 14f130cc3920..eb01f4e25f84 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -201,9 +201,9 @@ abstract class Expression extends TreeNode[Expression] { * See [[Canonicalize]] for more details. * * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the - * same and one can replace the other (eg. in Optimizer/Analyzer rules when we want to replace + * same and one can replace the other (eg. in Optimizer/Analyzer rules where we want to replace * equivalent expressions). It should not be used (and `sameResult` should be used instead) when - * comparing if 2 expressions produce the same output (in this case `semanticEquals` can be too + * comparing if 2 expressions produce the same results (in this case `semanticEquals` can be too * strict). */ def semanticEquals(other: Expression): Boolean = @@ -217,7 +217,7 @@ abstract class Expression extends TreeNode[Expression] { * is aliased). * * This method should be used (instead of `semanticEquals`) when checking if 2 expressions - * produce the same output (eg. as in the case we are interested to check if the ordering is the + * produce the same results (eg. as in the case we are interested to check if the ordering is the * same). 
It should not be used (and `semanticEquals` should be used instead) when comparing if 2 * expressions are the same and one can replace the other. */ From a306465d3fc0ef2c8ce43cfd14277d3aab4550e2 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 29 Nov 2018 12:10:47 +0100 Subject: [PATCH 06/24] fix recursive aliases --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index fdf3784fe4a1..65631ed3f0ee 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2544,8 +2544,8 @@ object EliminateUnions extends Rule[LogicalPlan] { object CleanupAliases extends Rule[LogicalPlan] { private[catalyst] def trimAliases(e: Expression): Expression = { e.transformDown { - case Alias(child, _) => child - case MultiAlias(child, _) => child + case Alias(child, _) => trimAliases(child) + case MultiAlias(child, _) => trimAliases(child) } } From 13aef71df5662f4f8389ece87c4270597de93db5 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 29 Nov 2018 16:40:48 +0100 Subject: [PATCH 07/24] fix trimAliases --- .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 65631ed3f0ee..d8728bad7297 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2543,9 +2543,9 @@ object EliminateUnions extends Rule[LogicalPlan] { */ object CleanupAliases extends Rule[LogicalPlan] { private[catalyst] def trimAliases(e: Expression): Expression = { - e.transformDown { - case Alias(child, _) => trimAliases(child) - case MultiAlias(child, _) => trimAliases(child) + e.transformUp { + case Alias(child, _) => child + case MultiAlias(child, _) => child } } From 5c6b9fc52d825c5c6c680d38f8ef8c621dded964 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 30 Nov 2018 16:46:53 +0100 Subject: [PATCH 08/24] address comments --- .../sql/catalyst/expressions/Expression.scala | 4 ++-- .../plans/physical/partitioning.scala | 18 +++++++++++++++ .../exchange/EnsureRequirements.scala | 13 ++++++++--- .../org/apache/spark/sql/DatasetSuite.scala | 23 +++++++++++++++++++ 4 files changed, 53 insertions(+), 5 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index eb01f4e25f84..55bcb65be00b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -210,8 +210,8 @@ abstract class Expression extends TreeNode[Expression] { deterministic && other.deterministic && canonicalized == other.canonicalized /** - * Returns true when two expressions will always compute the same result, even if the output may - * be different, because of different names or similar differences. 
+ * Returns true when two expressions will always compute the same result, even if the output from + * plan perspective may be different, because of different names or similar differences. * Usually this means that their canonicalized form equals, but it may also not be the case, as * different output expressions can evaluate to the same result as well (eg. when an expression * is aliased). diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index abca856acc18..a4db1329eb43 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -42,6 +42,12 @@ sealed trait Distribution { * matching the given number of partitions. */ def createPartitioning(numPartitions: Int): Partitioning + + /** + * Returns a new `Distribution` with its `Expression`s transformed according to the provided + * function. + */ + def mapExpressions(f: Expression => Expression): Distribution = this } /** @@ -89,6 +95,10 @@ case class ClusteredDistribution( s"the actual number of partitions is $numPartitions.") HashPartitioning(clustering, numPartitions) } + + override def mapExpressions(f: Expression => Expression): ClusteredDistribution = { + copy(clustering = clustering.map(f)) + } } /** @@ -114,6 +124,10 @@ case class HashClusteredDistribution( s"the actual number of partitions is $numPartitions.") HashPartitioning(expressions, numPartitions) } + + override def mapExpressions(f: Expression => Expression): HashClusteredDistribution = { + copy(expressions = expressions.map(f)) + } } /** @@ -135,6 +149,10 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution { override def createPartitioning(numPartitions: Int): Partitioning = { RangePartitioning(ordering, numPartitions) } + + override def mapExpressions(f: Expression => Expression): OrderedDistribution = { + copy(ordering = ordering.map(f(_).asInstanceOf[SortOrder])) + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index c03d2da1ddee..67c0950a5157 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.internal.SQLConf * each operator by inserting [[ShuffleExchangeExec]] Operators where required. Also ensure that * the input partition ordering requirements are met. 
*/ -case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { +case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] with PredicateHelper { private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions private def targetPostShuffleInputSize: Long = conf.targetPostShuffleInputSize @@ -139,8 +139,15 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { } private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = { - val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution - val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering + val aliasMap = AttributeMap(operator.children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a.child) + })) + val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution.map { + _.mapExpressions(replaceAlias(_, aliasMap)) + } + val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering.map { + _.map(replaceAlias(_, aliasMap).asInstanceOf[SortOrder]) + } var children: Seq[SparkPlan] = operator.children assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 82d3b22a4867..e1be20ac10a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -22,7 +22,10 @@ import java.sql.{Date, Timestamp} import org.apache.spark.SparkException import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder} +import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} +import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning + import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} @@ -1556,6 +1559,26 @@ class DatasetSuite extends QueryTest with SharedSQLContext { df.where($"city".contains(new java.lang.Character('A'))), Seq(Row("Amsterdam"))) } + + test("SPARK-25951: avoid redundant shuffle on rename") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + val N = 10 + val t1 = spark.range(N).selectExpr("floor(id/4) as k1") + val t2 = spark.range(N).selectExpr("floor(id/4) as k2") + + val agg1 = t1.groupBy("k1").agg(count(lit("1")).as("cnt1")) + val agg2 = t2.groupBy("k2").agg(count(lit("1")).as("cnt2")).withColumnRenamed("k2", "k3") + val finalPlan = agg1.join(agg2, $"k1" === $"k3") + val exchanges = finalPlan.queryExecution.executedPlan.collect { + case se: ShuffleExchangeExec => se + } + assert(exchanges.size == 2) + assert(!exchanges.exists(_.newPartitioning match { + case HashPartitioning(Seq(a: AttributeReference), _) => a.name == "k3" + case _ => false + })) + } + } } case class TestDataUnion(x: Int, y: Int, z: Int) From 0aaedd88377bb6761bddb1bbf85c3f6857d5a094 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 30 Nov 2018 16:48:20 +0100 Subject: [PATCH 09/24] fix import --- sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 
e1be20ac10a5..730c118e7976 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -25,7 +25,6 @@ import org.apache.spark.sql.catalyst.encoders.{OuterScopes, RowEncoder} import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning - import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} From 5bca5e348f9ee6d1f00a5f68666e991160498ed7 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 30 Nov 2018 19:10:55 +0100 Subject: [PATCH 10/24] fix --- .../exchange/EnsureRequirements.scala | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 67c0950a5157..beda197ee9c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -139,22 +139,20 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] with Predic } private def ensureDistributionAndOrdering(operator: SparkPlan): SparkPlan = { - val aliasMap = AttributeMap(operator.children.flatMap(_.expressions.collect { - case a: Alias => (a.toAttribute, a.child) - })) - val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution.map { - _.mapExpressions(replaceAlias(_, aliasMap)) - } - val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering.map { - _.map(replaceAlias(_, aliasMap).asInstanceOf[SortOrder]) - } + val requiredChildDistributions: Seq[Distribution] = operator.requiredChildDistribution + val requiredChildOrderings: Seq[Seq[SortOrder]] = operator.requiredChildOrdering var children: Seq[SparkPlan] = operator.children assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) + val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { + case a: Alias => (a.toAttribute, a) + })) + // Ensure that the operator's children satisfy their output distribution requirements. children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies(distribution) => + case (child, distribution) if child.outputPartitioning.satisfies( + distribution.mapExpressions(replaceAlias(_, aliasMap))) => child case (child, BroadcastDistribution(mode)) => BroadcastExchangeExec(mode, child) @@ -216,8 +214,10 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] with Predic // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings: children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) => + val requiredOrderingWithAlias = + requiredOrdering.map(replaceAlias(_, aliasMap).asInstanceOf[SortOrder]) // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort. 
- if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) { + if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrderingWithAlias)) { child } else { SortExec(requiredOrdering, global = false, child = child) From 1f797dff9c28adccec3f32acaf361e3b94596ba0 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 3 Dec 2018 15:37:04 +0100 Subject: [PATCH 11/24] add tests --- .../SubexpressionEliminationSuite.scala | 22 +++++++++++++++++++ .../org/apache/spark/sql/DatasetSuite.scala | 12 ++++++++++ 2 files changed, 34 insertions(+) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala index 1fa185cc77eb..a23b4e82610b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala @@ -35,13 +35,16 @@ class SubexpressionEliminationSuite extends SparkFunSuite { assert(b1 != b2) assert(a != b1) assert(b1.semanticEquals(b2)) + assert(b1.sameResult(b2)) assert(!b1.semanticEquals(a)) + assert(!b1.sameResult(a)) assert(a.hashCode != b1.hashCode) assert(b1.hashCode != b2.hashCode) assert(b1.semanticHash() == b2.semanticHash()) assert(a != b3) assert(a.hashCode != b3.hashCode) assert(a.semanticEquals(b3)) + assert(a.sameResult(b3)) } test("Expression Equivalence - basic") { @@ -161,6 +164,25 @@ class SubexpressionEliminationSuite extends SparkFunSuite { // only ifExpr and its predicate expression assert(equivalence.getAllEquivalentExprs.count(_.size == 1) == 2) } + + test("SPARK-25951: Aliases handling") { + val a = AttributeReference("a", IntegerType)() + val oneAlias = Alias(a, "a1")() + val twoAlias = Alias(Alias(a, "a2")(), "a1")() + + assert(!a.semanticEquals(oneAlias)) + assert(a.sameResult(oneAlias) && oneAlias.sameResult(a)) + assert(!a.semanticEquals(twoAlias)) + assert(a.sameResult(twoAlias) && twoAlias.sameResult(a)) + assert(!oneAlias.semanticEquals(twoAlias)) + assert(oneAlias.sameResult(twoAlias) && twoAlias.sameResult(oneAlias)) + + val a2 = AttributeReference("a", IntegerType)() + assert(!a.semanticEquals(a2)) + assert(!a2.sameResult(a)) + assert(!a2.sameResult(oneAlias)) + assert(!a2.sameResult(twoAlias)) + } } case class CodegenFallbackExpression(child: Expression) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index b6790219cef6..2c227388f2f5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -1568,6 +1568,18 @@ class DatasetSuite extends QueryTest with SharedSQLContext { case HashPartitioning(Seq(a: AttributeReference), _) => a.name == "k3" case _ => false })) + + // In this case the requirement is not satisfied + val agg3 = t2.groupBy("k2").agg(count(lit("1")).as("cnt2")).withColumn("k3", $"k2" + 1) + val finalPlan2 = agg1.join(agg3, $"k1" === $"k3") + val exchanges2 = finalPlan2.queryExecution.executedPlan.collect { + case se: ShuffleExchangeExec => se + } + assert(exchanges2.size == 3) + assert(exchanges2.exists(_.newPartitioning match { + case HashPartitioning(Seq(a: AttributeReference), _) => a.name == "k3" + case _ => false + })) } } From bf1d04a819855737d1096b61b1c3d46010f50dee Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: 
Tue, 4 Dec 2018 19:35:33 +0100 Subject: [PATCH 12/24] switch approach: update outputPartitioning --- .../plans/physical/partitioning.scala | 72 ++++++++++++++----- .../aggregate/HashAggregateExec.scala | 5 +- .../aggregate/ObjectHashAggregateExec.scala | 5 +- .../aggregate/SortAggregateExec.scala | 5 +- .../execution/basicPhysicalOperators.scala | 4 +- .../exchange/EnsureRequirements.scala | 12 +--- 6 files changed, 72 insertions(+), 31 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index a4db1329eb43..21dc045dd84a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -42,12 +42,6 @@ sealed trait Distribution { * matching the given number of partitions. */ def createPartitioning(numPartitions: Int): Partitioning - - /** - * Returns a new `Distribution` with its `Expression`s transformed according to the provided - * function. - */ - def mapExpressions(f: Expression => Expression): Distribution = this } /** @@ -95,10 +89,6 @@ case class ClusteredDistribution( s"the actual number of partitions is $numPartitions.") HashPartitioning(clustering, numPartitions) } - - override def mapExpressions(f: Expression => Expression): ClusteredDistribution = { - copy(clustering = clustering.map(f)) - } } /** @@ -124,10 +114,6 @@ case class HashClusteredDistribution( s"the actual number of partitions is $numPartitions.") HashPartitioning(expressions, numPartitions) } - - override def mapExpressions(f: Expression => Expression): HashClusteredDistribution = { - copy(expressions = expressions.map(f)) - } } /** @@ -149,10 +135,6 @@ case class OrderedDistribution(ordering: Seq[SortOrder]) extends Distribution { override def createPartitioning(numPartitions: Int): Partitioning = { RangePartitioning(ordering, numPartitions) } - - override def mapExpressions(f: Expression => Expression): OrderedDistribution = { - copy(ordering = ordering.map(f(_).asInstanceOf[SortOrder])) - } } /** @@ -206,6 +188,60 @@ trait Partitioning { } } +object Partitioning { + /** + * Gets as input the `Partitioning` of an expressions and returns the valid `Partitioning`s for + * the node w.r.t its output and its expressions. 
+ */ + def updatePartitioningWithNewOutput( + inputPartitioning: Partitioning, + expressions: Seq[NamedExpression], + outputSet: AttributeSet): Partitioning = { + inputPartitioning match { + case partitioning: Expression => + val invalidReferences = partitioning.references.filterNot(outputSet.contains) + val exprToEquiv = invalidReferences.map { e => e -> expressions.filter(_.sameResult(e)) } + val initValue = partitioning match { + case PartitioningCollection(partitionings) => partitionings + case other => Seq(other) + } + val validPartitionings = exprToEquiv.foldLeft(initValue) { + case (partitionings, (toReplace, equivalents)) => + if (equivalents.isEmpty) { + partitionings.map { + case hp: HashPartitioning if hp.references.contains(toReplace) => + UnknownPartitioning(hp.numPartitions) + case rp: RangePartitioning if rp.references.contains(toReplace) => + val validExprs = rp.children.takeWhile(!_.references.contains(toReplace)) + if (validExprs.isEmpty) { + UnknownPartitioning(rp.numPartitions) + } else { + RangePartitioning(validExprs, rp.numPartitions) + } + case other => other + } + } else { + partitionings.flatMap { + case p: Expression if p.references.contains(toReplace) => + equivalents.map { equiv => + p.transformDown { + case e if e == toReplace => equiv.toAttribute + }.asInstanceOf[Partitioning] + } + case other => Seq(other) + } + } + } + if (validPartitionings.size == 1) { + validPartitionings.head + } else { + PartitioningCollection(validPartitionings) + } + case other => other + } + } +} + case class UnknownPartitioning(numPartitions: Int) extends Partitioning /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 4827f838fc51..c74e08b68264 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -67,7 +67,10 @@ case class HashAggregateExec( override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute) - override def outputPartitioning: Partitioning = child.outputPartitioning + override def outputPartitioning: Partitioning = { + Partitioning.updatePartitioningWithNewOutput( + child.outputPartitioning, resultExpressions, outputSet) + } override def producedAttributes: AttributeSet = AttributeSet(aggregateAttributes) ++ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala index 7145bb03028d..3e1638c236d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala @@ -95,7 +95,10 @@ case class ObjectHashAggregateExec( } } - override def outputPartitioning: Partitioning = child.outputPartitioning + override def outputPartitioning: Partitioning = { + Partitioning.updatePartitioningWithNewOutput( + child.outputPartitioning, resultExpressions, outputSet) + } protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { val numOutputRows = longMetric("numOutputRows") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index 
d732b905dcdd..3e3c00c6c2a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -66,7 +66,10 @@ case class SortAggregateExec( groupingExpressions.map(SortOrder(_, Ascending)) :: Nil } - override def outputPartitioning: Partitioning = child.outputPartitioning + override def outputPartitioning: Partitioning = { + Partitioning.updatePartitioningWithNewOutput( + child.outputPartitioning, resultExpressions, outputSet) + } override def outputOrdering: Seq[SortOrder] = { groupingExpressions.map(SortOrder(_, Ascending)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 09effe087e19..fb6c71327032 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -76,7 +76,9 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def outputPartitioning: Partitioning = child.outputPartitioning + override def outputPartitioning: Partitioning = { + Partitioning.updatePartitioningWithNewOutput(child.outputPartitioning, projectList, outputSet) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index beda197ee9c8..434f6bbb3356 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -35,7 +35,7 @@ import org.apache.spark.sql.internal.SQLConf * each operator by inserting [[ShuffleExchangeExec]] Operators where required. Also ensure that * the input partition ordering requirements are met. */ -case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] with PredicateHelper { +case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { private def defaultNumPreShufflePartitions: Int = conf.numShufflePartitions private def targetPostShuffleInputSize: Long = conf.targetPostShuffleInputSize @@ -145,14 +145,10 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] with Predic assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) - val aliasMap = AttributeMap[Expression](children.flatMap(_.expressions.collect { - case a: Alias => (a.toAttribute, a) - })) // Ensure that the operator's children satisfy their output distribution requirements. 
children = children.zip(requiredChildDistributions).map { - case (child, distribution) if child.outputPartitioning.satisfies( - distribution.mapExpressions(replaceAlias(_, aliasMap))) => + case (child, distribution) if child.outputPartitioning.satisfies(distribution) => child case (child, BroadcastDistribution(mode)) => BroadcastExchangeExec(mode, child) @@ -214,10 +210,8 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] with Predic // Now that we've performed any necessary shuffles, add sorts to guarantee output orderings: children = children.zip(requiredChildOrderings).map { case (child, requiredOrdering) => - val requiredOrderingWithAlias = - requiredOrdering.map(replaceAlias(_, aliasMap).asInstanceOf[SortOrder]) // If child.outputOrdering already satisfies the requiredOrdering, we do not need to sort. - if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrderingWithAlias)) { + if (SortOrder.orderingSatisfies(child.outputOrdering, requiredOrdering)) { child } else { SortExec(requiredOrdering, global = false, child = child) From e4f617fc7e47d7c49f3d773ac2d91c5508c0a239 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 5 Dec 2018 12:20:54 +0100 Subject: [PATCH 13/24] fix ut error --- .../spark/sql/catalyst/plans/physical/partitioning.scala | 9 +++++++-- .../sql/execution/exchange/EnsureRequirements.scala | 1 - 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 21dc045dd84a..09ed2cb768b3 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -231,11 +231,16 @@ object Partitioning { case other => Seq(other) } } - } + }.distinct if (validPartitionings.size == 1) { validPartitionings.head } else { - PartitioningCollection(validPartitionings) + validPartitionings.filterNot(_.isInstanceOf[UnknownPartitioning]) match { + case Seq() => PartitioningCollection(validPartitionings) + case Seq(knownPartitioning) => knownPartitioning + case knownPartitionings => PartitioningCollection(knownPartitionings) + } + } case other => other } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index 434f6bbb3356..c03d2da1ddee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -145,7 +145,6 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { assert(requiredChildDistributions.length == children.length) assert(requiredChildOrderings.length == children.length) - // Ensure that the operator's children satisfy their output distribution requirements. 
children = children.zip(requiredChildDistributions).map { case (child, distribution) if child.outputPartitioning.satisfies(distribution) => From bca3e873b3193bcccd68b4e042fec5c985718d5b Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 7 Dec 2018 18:06:35 +0100 Subject: [PATCH 14/24] Add test suite dedicated to partitioning --- .../plans/physical/partitioning.scala | 7 +- .../aggregate/HashAggregateExec.scala | 2 +- .../sql/execution/PartitioningSuite.scala | 247 ++++++++++++++++++ 3 files changed, 253 insertions(+), 3 deletions(-) create mode 100644 sql/core/src/test/scala/org/apache/spark/sql/execution/PartitioningSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 09ed2cb768b3..9befcda4c3a2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -199,8 +199,11 @@ object Partitioning { outputSet: AttributeSet): Partitioning = { inputPartitioning match { case partitioning: Expression => - val invalidReferences = partitioning.references.filterNot(outputSet.contains) - val exprToEquiv = invalidReferences.map { e => e -> expressions.filter(_.sameResult(e)) } + val exprToEquiv = partitioning.references.map { attr => + attr -> expressions.filter(_.sameResult(attr)) + }.filterNot { case (attr, exprs) => + exprs.size == 1 && exprs.forall(_ == attr) + } val initValue = partitioning match { case PartitioningCollection(partitionings) => partitionings case other => Seq(other) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index c74e08b68264..962024512c4d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -87,7 +87,7 @@ case class HashAggregateExec( // This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash // map and/or the sort-based aggregation once it has processed a given number of input rows. - private val testFallbackStartsAt: Option[(Int, Int)] = { + lazy val testFallbackStartsAt: Option[(Int, Int)] = { sqlContext.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null) match { case null | "" => None case fallbackStartsAt => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PartitioningSuite.scala new file mode 100644 index 000000000000..2ab987372f64 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PartitioningSuite.scala @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.expressions.{Alias, Ascending, Attribute, AttributeReference, Expression, NamedExpression, SortOrder} +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, RangePartitioning, UnknownPartitioning} +import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec} +import org.apache.spark.sql.types.IntegerType + +class PartitioningSuite extends SparkFunSuite { + + private val attr1 = AttributeReference("attr1", IntegerType)() + private val attr2 = AttributeReference("attr2", IntegerType)() + private val aliasedAttr1 = Alias(attr1, "alias_attr1")() + private val aliasedAttr2 = Alias(attr2, "alias_attr2")() + private val aliasedAttr1Twice = Alias(Alias(attr1, "alias_attr1")(), "alias_attr1_2")() + + private val planHashPartitioned1Attr = PartitionedSparkPlan( + output = Seq(attr1), outputPartitioning = HashPartitioning(Seq(attr1), 10)) + private val planHashPartitioned2Attr = PartitionedSparkPlan( + output = Seq(attr1, attr2), outputPartitioning = HashPartitioning(Seq(attr1, attr2), 10)) + private val planRangePartitioned1Attr = PartitionedSparkPlan( + output = Seq(attr1), outputPartitioning = simpleRangePartitioning(Seq(attr1), 10)) + private val planRangePartitioned2Attr = PartitionedSparkPlan( + output = Seq(attr1, attr2), + outputPartitioning = simpleRangePartitioning(Seq(attr1, attr2), 10)) + + def testPartitioning( + outputExpressions: Seq[NamedExpression], + inputPlan: SparkPlan, + expectedPartitioning: Partitioning): Unit = { + testProjectPartitioning(outputExpressions, inputPlan, expectedPartitioning) + testAggregatePartitioning(outputExpressions, inputPlan, expectedPartitioning) + } + + def testProjectPartitioning( + projectList: Seq[NamedExpression], + inputPlan: SparkPlan, + expectedPartitioning: Partitioning): Unit = { + assert(ProjectExec(projectList, inputPlan).outputPartitioning == expectedPartitioning) + } + + def testAggregatePartitioning( + groupingExprs: Seq[NamedExpression], + inputPlan: SparkPlan, + expectedPartitioning: Partitioning): Unit = { + val hashAgg = HashAggregateExec(requiredChildDistributionExpressions = None, + groupingExpressions = groupingExprs, + aggregateExpressions = Seq.empty, + aggregateAttributes = Seq.empty, + initialInputBufferOffset = 0, + resultExpressions = groupingExprs, + child = inputPlan) + val sortAgg = SortAggregateExec(requiredChildDistributionExpressions = None, + groupingExpressions = groupingExprs, + aggregateExpressions = Seq.empty, + aggregateAttributes = Seq.empty, + initialInputBufferOffset = 0, + resultExpressions = groupingExprs, + child = inputPlan) + val objAgg = ObjectHashAggregateExec(requiredChildDistributionExpressions = None, + groupingExpressions = groupingExprs, + aggregateExpressions = Seq.empty, + aggregateAttributes = Seq.empty, + initialInputBufferOffset = 0, + resultExpressions = groupingExprs, + child = inputPlan) + assert(hashAgg.outputPartitioning == expectedPartitioning) + 
assert(sortAgg.outputPartitioning == expectedPartitioning) + assert(objAgg.outputPartitioning == expectedPartitioning) + } + + def simpleRangePartitioning(exprs: Seq[Expression], numPartitions: Int): RangePartitioning = { + RangePartitioning(exprs.map(e => SortOrder(e, Ascending)), numPartitions) + } + + test("HashPartitioning with simple attribute rename") { + testPartitioning( + Seq(aliasedAttr1), + planHashPartitioned1Attr, + HashPartitioning(Seq(aliasedAttr1.toAttribute), 10)) + testPartitioning( + Seq(aliasedAttr1Twice), + planHashPartitioned1Attr, + HashPartitioning(Seq(aliasedAttr1Twice.toAttribute), 10)) + + testPartitioning( + Seq(aliasedAttr1, attr2), + planHashPartitioned2Attr, + HashPartitioning(Seq(aliasedAttr1.toAttribute, attr2), 10)) + testPartitioning( + Seq(aliasedAttr1Twice, attr2), + planHashPartitioned2Attr, + HashPartitioning(Seq(aliasedAttr1Twice.toAttribute, attr2), 10)) + + testPartitioning( + Seq(aliasedAttr1, aliasedAttr2), + planHashPartitioned2Attr, + HashPartitioning(Seq(aliasedAttr1.toAttribute, aliasedAttr2.toAttribute), 10)) + testPartitioning( + Seq(aliasedAttr1Twice, aliasedAttr2), + planHashPartitioned2Attr, + HashPartitioning(Seq(aliasedAttr1Twice.toAttribute, aliasedAttr2.toAttribute), 10)) + } + + test("HashPartitioning with double attribute rename") { + testPartitioning( + Seq(aliasedAttr1, aliasedAttr1Twice), + planHashPartitioned1Attr, + PartitioningCollection(Seq( + HashPartitioning(Seq(aliasedAttr1.toAttribute), 10), + HashPartitioning(Seq(aliasedAttr1Twice.toAttribute), 10)))) + testPartitioning( + Seq(aliasedAttr1, aliasedAttr1Twice, attr2), + planHashPartitioned2Attr, + PartitioningCollection(Seq( + HashPartitioning(Seq(aliasedAttr1.toAttribute, attr2), 10), + HashPartitioning(Seq(aliasedAttr1Twice.toAttribute, attr2), 10)))) + testPartitioning( + Seq(aliasedAttr1, aliasedAttr1Twice, attr2, aliasedAttr2), + planHashPartitioned2Attr, + PartitioningCollection(Seq( + HashPartitioning(Seq(aliasedAttr1.toAttribute, attr2), 10), + HashPartitioning(Seq(aliasedAttr1.toAttribute, aliasedAttr2.toAttribute), 10), + HashPartitioning(Seq(aliasedAttr1Twice.toAttribute, attr2), 10), + HashPartitioning(Seq(aliasedAttr1Twice.toAttribute, aliasedAttr2.toAttribute), 10)))) + } + + test("HashPartitioning without attribute in output") { + testPartitioning( + Seq(attr2), + planHashPartitioned1Attr, + UnknownPartitioning(10)) + testPartitioning( + Seq(attr1), + planHashPartitioned2Attr, + UnknownPartitioning(10)) + } + + test("HashPartitioning without renaming") { + testPartitioning( + Seq(attr1), + planHashPartitioned1Attr, + HashPartitioning(Seq(attr1), 10)) + testPartitioning( + Seq(attr1, attr2), + planHashPartitioned2Attr, + HashPartitioning(Seq(attr1, attr2), 10)) + } + + test("RangePartitioning with simple attribute rename") { + testPartitioning( + Seq(aliasedAttr1), + planRangePartitioned1Attr, + simpleRangePartitioning(Seq(aliasedAttr1.toAttribute), 10)) + testPartitioning( + Seq(aliasedAttr1Twice), + planRangePartitioned1Attr, + simpleRangePartitioning(Seq(aliasedAttr1Twice.toAttribute), 10)) + + testPartitioning( + Seq(aliasedAttr1, attr2), + planRangePartitioned2Attr, + simpleRangePartitioning(Seq(aliasedAttr1.toAttribute, attr2), 10)) + testPartitioning( + Seq(aliasedAttr1Twice, attr2), + planRangePartitioned2Attr, + simpleRangePartitioning(Seq(aliasedAttr1Twice.toAttribute, attr2), 10)) + + testPartitioning( + Seq(aliasedAttr1, aliasedAttr2), + planRangePartitioned2Attr, + simpleRangePartitioning(Seq(aliasedAttr1.toAttribute, aliasedAttr2.toAttribute), 
10)) + testPartitioning( + Seq(aliasedAttr1Twice, aliasedAttr2), + planRangePartitioned2Attr, + simpleRangePartitioning(Seq(aliasedAttr1Twice.toAttribute, aliasedAttr2.toAttribute), 10)) + } + + test("RangePartitioning with double attribute rename") { + testPartitioning( + Seq(aliasedAttr1, aliasedAttr1Twice), + planRangePartitioned1Attr, + PartitioningCollection(Seq( + simpleRangePartitioning(Seq(aliasedAttr1.toAttribute), 10), + simpleRangePartitioning(Seq(aliasedAttr1Twice.toAttribute), 10)))) + testPartitioning( + Seq(aliasedAttr1, aliasedAttr1Twice, attr2), + planRangePartitioned2Attr, + PartitioningCollection(Seq( + simpleRangePartitioning(Seq(aliasedAttr1.toAttribute, attr2), 10), + simpleRangePartitioning(Seq(aliasedAttr1Twice.toAttribute, attr2), 10)))) + testPartitioning( + Seq(aliasedAttr1, aliasedAttr1Twice, attr2, aliasedAttr2), + planRangePartitioned2Attr, + PartitioningCollection(Seq( + simpleRangePartitioning(Seq(aliasedAttr1.toAttribute, attr2), 10), + simpleRangePartitioning(Seq(aliasedAttr1.toAttribute, aliasedAttr2.toAttribute), 10), + simpleRangePartitioning(Seq(aliasedAttr1Twice.toAttribute, attr2), 10), + simpleRangePartitioning(Seq(aliasedAttr1Twice.toAttribute, aliasedAttr2.toAttribute), 10)))) + } + + test("RangePartitioning without attribute in output") { + testPartitioning( + Seq(attr2), + planRangePartitioned2Attr, + UnknownPartitioning(10)) + testPartitioning( + Seq(attr1), + planRangePartitioned2Attr, + simpleRangePartitioning(Seq(attr1), 10)) + } + + test("RangePartitioning without renaming") { + testPartitioning( + Seq(attr1), + planRangePartitioned1Attr, + simpleRangePartitioning(Seq(attr1), 10)) + testPartitioning( + Seq(attr1, attr2), + planRangePartitioned2Attr, + simpleRangePartitioning(Seq(attr1, attr2), 10)) + } +} + +private case class PartitionedSparkPlan( + override val output: Seq[Attribute] = Seq.empty, + override val outputPartitioning: Partitioning = UnknownPartitioning(0), + override val children: Seq[SparkPlan] = Nil) extends SparkPlan { + override protected def doExecute() = throw new UnsupportedOperationException +} From 0f68a41beb15da35c6839ecef18ec69d29e133dc Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 7 Dec 2018 20:32:19 +0100 Subject: [PATCH 15/24] fix UT failures --- .../execution/aggregate/HashAggregateExec.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 962024512c4d..f03977db32fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -87,12 +87,16 @@ case class HashAggregateExec( // This is for testing. We force TungstenAggregationIterator to fall back to the unsafe row hash // map and/or the sort-based aggregation once it has processed a given number of input rows. 
- lazy val testFallbackStartsAt: Option[(Int, Int)] = { - sqlContext.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null) match { - case null | "" => None - case fallbackStartsAt => - val splits = fallbackStartsAt.split(",").map(_.trim) - Some((splits.head.toInt, splits.last.toInt)) + private val testFallbackStartsAt: Option[(Int, Int)] = { + if (Utils.isTesting && sqlContext == null) { + None + } else { + sqlContext.getConf("spark.sql.TungstenAggregate.testFallbackStartsAt", null) match { + case null | "" => None + case fallbackStartsAt => + val splits = fallbackStartsAt.split(",").map(_.trim) + Some((splits.head.toInt, splits.last.toInt)) + } } } From 9af290ef546852f895672d65046fbe779ed09efb Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Wed, 30 Jan 2019 15:36:06 +0100 Subject: [PATCH 16/24] fix merge --- sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala index 902fa9e7ee5b..507411dc1411 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala @@ -27,7 +27,6 @@ import org.apache.spark.sql.catalyst.expressions.AttributeReference import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi} import org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject import org.apache.spark.sql.catalyst.plans.physical.HashPartitioning -import org.apache.spark.sql.catalyst.plans.logical.SerializeFromObject import org.apache.spark.sql.catalyst.util.sideBySide import org.apache.spark.sql.execution.{LogicalRDD, RDDScanExec, SQLExecution} import org.apache.spark.sql.execution.exchange.{BroadcastExchangeExec, ShuffleExchangeExec} From 5904cf9c9776b7b1cb6a2b06791a0302fadbcbc1 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 31 Jan 2019 17:55:45 +0100 Subject: [PATCH 17/24] address comments --- .../sql/catalyst/expressions/Expression.scala | 23 +------------------ .../sql/catalyst/expressions/SortOrder.scala | 2 +- .../plans/physical/partitioning.scala | 10 ++++---- .../SubexpressionEliminationSuite.scala | 22 ------------------ .../exchange/EnsureRequirements.scala | 2 +- .../spark/sql/execution/PlannerSuite.scala | 17 -------------- 6 files changed, 9 insertions(+), 67 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 57436947bfea..0de57bb2fd78 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -223,35 +223,14 @@ abstract class Expression extends TreeNode[Expression] { } /** - * Returns true when two expressions will always compute the same output, even if they differ + * Returns true when two expressions will always compute the same result, even if they differ * cosmetically (i.e. capitalization of names in attributes may be different). * * See [[Canonicalize]] for more details. - * - * This method should be used (instead of `sameResult`) when comparing if 2 expressions are the - * same and one can replace the other (eg. in Optimizer/Analyzer rules where we want to replace - * equivalent expressions). 
It should not be used (and `sameResult` should be used instead) when - * comparing if 2 expressions produce the same results (in this case `semanticEquals` can be too - * strict). */ def semanticEquals(other: Expression): Boolean = deterministic && other.deterministic && canonicalized == other.canonicalized - /** - * Returns true when two expressions will always compute the same result, even if the output from - * plan perspective may be different, because of different names or similar differences. - * Usually this means that their canonicalized form equals, but it may also not be the case, as - * different output expressions can evaluate to the same result as well (eg. when an expression - * is aliased). - * - * This method should be used (instead of `semanticEquals`) when checking if 2 expressions - * produce the same results (eg. as in the case we are interested to check if the ordering is the - * same). It should not be used (and `semanticEquals` should be used instead) when comparing if 2 - * expressions are the same and one can replace the other. - */ - final def sameResult(other: Expression): Boolean = - CleanupAliases.trimAliases(this) semanticEquals CleanupAliases.trimAliases(other) - /** * Returns a `hashCode` for the calculation performed by this expression. Unlike the standard * `hashCode`, an attempt has been made to eliminate cosmetic differences. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala index 915a4667981a..536276b5cb29 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SortOrder.scala @@ -86,7 +86,7 @@ case class SortOrder( def isAscending: Boolean = direction == Ascending def satisfies(required: SortOrder): Boolean = { - (sameOrderExpressions + child).exists(required.child.sameResult) && + (sameOrderExpressions + child).exists(required.child.semanticEquals) && direction == required.direction && nullOrdering == required.nullOrdering } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 33af3353ca15..a1d26a6d4ac2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.catalyst.plans.physical +import org.apache.spark.sql.catalyst.analysis.CleanupAliases import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.types.{DataType, IntegerType} @@ -198,7 +199,8 @@ object Partitioning { inputPartitioning match { case partitioning: Expression => val exprToEquiv = partitioning.references.map { attr => - attr -> expressions.filter(_.sameResult(attr)) + attr -> expressions.filter(e => + CleanupAliases.trimAliases(e).semanticEquals(attr)) }.filterNot { case (attr, exprs) => exprs.size == 1 && exprs.forall(_ == attr) } @@ -283,10 +285,10 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) required match { case h: HashClusteredDistribution => expressions.length == h.expressions.length && expressions.zip(h.expressions).forall { - case (l, r) => l.sameResult(r) + case (l, r) => l.semanticEquals(r) } case 
ClusteredDistribution(requiredClustering, _) => - expressions.forall(x => requiredClustering.exists(_.sameResult(x))) + expressions.forall(x => requiredClustering.exists(_.semanticEquals(x))) case _ => false } } @@ -341,7 +343,7 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) val minSize = Seq(requiredOrdering.size, ordering.size).min requiredOrdering.take(minSize) == ordering.take(minSize) case ClusteredDistribution(requiredClustering, _) => - ordering.map(_.child).forall(x => requiredClustering.exists(_.sameResult(x))) + ordering.map(_.child).forall(x => requiredClustering.exists(_.semanticEquals(x))) case _ => false } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala index a23b4e82610b..1fa185cc77eb 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/SubexpressionEliminationSuite.scala @@ -35,16 +35,13 @@ class SubexpressionEliminationSuite extends SparkFunSuite { assert(b1 != b2) assert(a != b1) assert(b1.semanticEquals(b2)) - assert(b1.sameResult(b2)) assert(!b1.semanticEquals(a)) - assert(!b1.sameResult(a)) assert(a.hashCode != b1.hashCode) assert(b1.hashCode != b2.hashCode) assert(b1.semanticHash() == b2.semanticHash()) assert(a != b3) assert(a.hashCode != b3.hashCode) assert(a.semanticEquals(b3)) - assert(a.sameResult(b3)) } test("Expression Equivalence - basic") { @@ -164,25 +161,6 @@ class SubexpressionEliminationSuite extends SparkFunSuite { // only ifExpr and its predicate expression assert(equivalence.getAllEquivalentExprs.count(_.size == 1) == 2) } - - test("SPARK-25951: Aliases handling") { - val a = AttributeReference("a", IntegerType)() - val oneAlias = Alias(a, "a1")() - val twoAlias = Alias(Alias(a, "a2")(), "a1")() - - assert(!a.semanticEquals(oneAlias)) - assert(a.sameResult(oneAlias) && oneAlias.sameResult(a)) - assert(!a.semanticEquals(twoAlias)) - assert(a.sameResult(twoAlias) && twoAlias.sameResult(a)) - assert(!oneAlias.semanticEquals(twoAlias)) - assert(oneAlias.sameResult(twoAlias) && twoAlias.sameResult(oneAlias)) - - val a2 = AttributeReference("a", IntegerType)() - assert(!a.semanticEquals(a2)) - assert(!a2.sameResult(a)) - assert(!a2.sameResult(oneAlias)) - assert(!a2.sameResult(twoAlias)) - } } case class CodegenFallbackExpression(child: Expression) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index c03d2da1ddee..d2d5011bbcb9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -297,7 +297,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] { // TODO: remove this after we create a physical operator for `RepartitionByExpression`. 
case operator @ ShuffleExchangeExec(upper: HashPartitioning, child, _) => child.outputPartitioning match { - case lower: HashPartitioning if upper.sameResult(lower) => child + case lower: HashPartitioning if upper.semanticEquals(lower) => child case _ => operator } case operator: SparkPlan => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 83ce93ca9655..142ab6170a73 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -780,23 +780,6 @@ class PlannerSuite extends SharedSQLContext { classOf[PartitioningCollection]) } } - - test("SPARK-25951: avoid redundant shuffle on rename") { - val renamedA = Alias(exprA, "a")() - val doubleRename = Alias(Alias(exprA, "a")(), "b")() - val plan1 = ShuffleExchangeExec( - HashPartitioning(exprA :: Nil, 5), - DummySparkPlan(outputPartitioning = HashPartitioning(doubleRename :: Nil, 5))) - val plan2 = ShuffleExchangeExec( - HashPartitioning(exprA :: Nil, 5), - DummySparkPlan(outputPartitioning = HashPartitioning(renamedA :: Nil, 5))) - val smjExec = SortMergeJoinExec( - doubleRename :: Nil, renamedA :: Nil, Inner, None, plan1, plan2) - - val outputPlan = EnsureRequirements(spark.sessionState.conf).apply(smjExec) - assertDistributionRequirementsAreSatisfied(outputPlan) - assert(outputPlan.collect { case _: ShuffleExchangeExec => true }.isEmpty) - } } // Used for unit-testing EnsureRequirements From 205f1b7f6a01ecab3d8ced4ea98c1bd17bd2960f Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Thu, 31 Jan 2019 17:57:16 +0100 Subject: [PATCH 18/24] cleanup --- .../org/apache/spark/sql/catalyst/expressions/Expression.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 0de57bb2fd78..d5d119543da7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.catalyst.expressions import java.util.Locale import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.analysis.{CleanupAliases, TypeCheckResult, TypeCoercion} +import org.apache.spark.sql.catalyst.analysis.{TypeCheckResult, TypeCoercion} import org.apache.spark.sql.catalyst.expressions.aggregate.DeclarativeAggregate import org.apache.spark.sql.catalyst.expressions.codegen._ import org.apache.spark.sql.catalyst.expressions.codegen.Block._ From f47d5df0336aa0a61356502140968979e2ff6d3c Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Fri, 1 Feb 2019 09:20:50 +0100 Subject: [PATCH 19/24] address comment --- .../plans/physical/partitioning.scala | 49 ++++++++++++------- .../aggregate/HashAggregateExec.scala | 3 +- .../aggregate/ObjectHashAggregateExec.scala | 3 +- .../aggregate/SortAggregateExec.scala | 3 +- .../execution/basicPhysicalOperators.scala | 2 +- 5 files changed, 34 insertions(+), 26 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index a1d26a6d4ac2..e3fee1a4508e 100644 --- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -185,18 +185,19 @@ trait Partitioning { case AllTuples => numPartitions == 1 case _ => false } -} -object Partitioning { /** - * Gets as input the `Partitioning` of an expressions and returns the valid `Partitioning`s for - * the node w.r.t its output and its expressions. + * Returns a version of this [[Partitioning]] amended by the invalid [[Attribute]]. + */ + protected def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = this + + /** + * Returns the valid `Partitioning`s for the node w.r.t its output and its expressions. */ - def updatePartitioningWithNewOutput( - inputPartitioning: Partitioning, + final def updatePartitioningWithNewOutput( expressions: Seq[NamedExpression], outputSet: AttributeSet): Partitioning = { - inputPartitioning match { + this match { case partitioning: Expression => val exprToEquiv = partitioning.references.map { attr => attr -> expressions.filter(e => @@ -211,18 +212,7 @@ object Partitioning { val validPartitionings = exprToEquiv.foldLeft(initValue) { case (partitionings, (toReplace, equivalents)) => if (equivalents.isEmpty) { - partitionings.map { - case hp: HashPartitioning if hp.references.contains(toReplace) => - UnknownPartitioning(hp.numPartitions) - case rp: RangePartitioning if rp.references.contains(toReplace) => - val validExprs = rp.children.takeWhile(!_.references.contains(toReplace)) - if (validExprs.isEmpty) { - UnknownPartitioning(rp.numPartitions) - } else { - RangePartitioning(validExprs, rp.numPartitions) - } - case other => other - } + partitionings.map(_.pruneInvalidAttribute(toReplace)) } else { partitionings.flatMap { case p: Expression if p.references.contains(toReplace) => @@ -299,6 +289,14 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) * than numPartitions) based on hashing expressions. 
*/ def partitionIdExpression: Expression = Pmod(new Murmur3Hash(expressions), Literal(numPartitions)) + + override protected def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = { + if (this.references.contains(invalidAttr)) { + UnknownPartitioning(numPartitions) + } else { + this + } + } } /** @@ -348,6 +346,19 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) } } } + + override protected def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = { + if (this.references.contains(invalidAttr)) { + val validExprs = this.children.takeWhile(!_.references.contains(invalidAttr)) + if (validExprs.isEmpty) { + UnknownPartitioning(numPartitions) + } else { + RangePartitioning(validExprs, numPartitions) + } + } else { + this + } + } } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 113f52b22aa4..537720ae5d24 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -70,8 +70,7 @@ case class HashAggregateExec( override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute) override def outputPartitioning: Partitioning = { - Partitioning.updatePartitioningWithNewOutput( - child.outputPartitioning, resultExpressions, outputSet) + child.outputPartitioning.updatePartitioningWithNewOutput(resultExpressions, outputSet) } override def producedAttributes: AttributeSet = diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala index 8efc07d9476d..89feb60a4ea9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala @@ -96,8 +96,7 @@ case class ObjectHashAggregateExec( } override def outputPartitioning: Partitioning = { - Partitioning.updatePartitioningWithNewOutput( - child.outputPartitioning, resultExpressions, outputSet) + child.outputPartitioning.updatePartitioningWithNewOutput(resultExpressions, outputSet) } protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index bb3874a3babc..cc755a754cee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -67,8 +67,7 @@ case class SortAggregateExec( } override def outputPartitioning: Partitioning = { - Partitioning.updatePartitioningWithNewOutput( - child.outputPartitioning, resultExpressions, outputSet) + child.outputPartitioning.updatePartitioningWithNewOutput(resultExpressions, outputSet) } override def outputOrdering: Seq[SortOrder] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 53dbe450b235..f101cda1c640 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -78,7 +78,7 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) override def outputOrdering: Seq[SortOrder] = child.outputOrdering override def outputPartitioning: Partitioning = { - Partitioning.updatePartitioningWithNewOutput(child.outputPartitioning, projectList, outputSet) + child.outputPartitioning.updatePartitioningWithNewOutput(projectList, outputSet) } } From 09b99815e6cd34fd58d4fbee30b2cebf2b863d10 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sun, 10 Feb 2019 12:44:56 +0100 Subject: [PATCH 20/24] use maropu's approach --- .../sql/catalyst/analysis/Analyzer.scala | 2 +- .../plans/physical/partitioning.scala | 50 +-------- .../AliasAwareOutputPartitioning.scala | 105 ++++++++++++++++++ .../aggregate/HashAggregateExec.scala | 6 +- .../aggregate/ObjectHashAggregateExec.scala | 6 +- .../aggregate/SortAggregateExec.scala | 8 +- .../execution/basicPhysicalOperators.scala | 6 +- 7 files changed, 116 insertions(+), 67 deletions(-) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 4adfaf2bd489..7d540fdb1d57 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -2554,7 +2554,7 @@ object EliminateUnions extends Rule[LogicalPlan] { * rule can't work for those parameters. */ object CleanupAliases extends Rule[LogicalPlan] { - private[catalyst] def trimAliases(e: Expression): Expression = { + private[spark] def trimAliases(e: Expression): Expression = { e.transformUp { case Alias(child, _) => child case MultiAlias(child, _) => child diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index e3fee1a4508e..40b43dd41760 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -189,55 +189,7 @@ trait Partitioning { /** * Returns a version of this [[Partitioning]] amended by the invalid [[Attribute]]. */ - protected def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = this - - /** - * Returns the valid `Partitioning`s for the node w.r.t its output and its expressions. 
- */ - final def updatePartitioningWithNewOutput( - expressions: Seq[NamedExpression], - outputSet: AttributeSet): Partitioning = { - this match { - case partitioning: Expression => - val exprToEquiv = partitioning.references.map { attr => - attr -> expressions.filter(e => - CleanupAliases.trimAliases(e).semanticEquals(attr)) - }.filterNot { case (attr, exprs) => - exprs.size == 1 && exprs.forall(_ == attr) - } - val initValue = partitioning match { - case PartitioningCollection(partitionings) => partitionings - case other => Seq(other) - } - val validPartitionings = exprToEquiv.foldLeft(initValue) { - case (partitionings, (toReplace, equivalents)) => - if (equivalents.isEmpty) { - partitionings.map(_.pruneInvalidAttribute(toReplace)) - } else { - partitionings.flatMap { - case p: Expression if p.references.contains(toReplace) => - equivalents.map { equiv => - p.transformDown { - case e if e == toReplace => equiv.toAttribute - }.asInstanceOf[Partitioning] - } - case other => Seq(other) - } - } - }.distinct - if (validPartitionings.size == 1) { - validPartitionings.head - } else { - validPartitionings.filterNot(_.isInstanceOf[UnknownPartitioning]) match { - case Seq() => PartitioningCollection(validPartitionings) - case Seq(knownPartitioning) => knownPartitioning - case knownPartitionings => PartitioningCollection(knownPartitionings) - } - - } - case other => other - } - } + private[spark] def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = this } case class UnknownPartitioning(numPartitions: Int) extends Partitioning diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala new file mode 100644 index 000000000000..63d9f89fc04b --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.apache.spark.sql.catalyst.analysis.CleanupAliases +import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningCollection, UnknownPartitioning} + +/** + * Trait for plans which can produce an output partitioned by aliased attributes of their child. + * It rewrites the partitioning attributes of the child with the corresponding new ones which are + * exposed in the output of this plan. It can avoid the presence of redundant shuffles in queries + * caused by the rename of an attribute among the partitioning ones, eg. 
+ * + * spark.range(10).selectExpr("id AS key", "0").repartition($"key").write.saveAsTable("df1") + * spark.range(10).selectExpr("id AS key", "0").repartition($"key").write.saveAsTable("df2") + * sql(""" + * SELECT * FROM + * (SELECT key AS k from df1) t1 + * INNER JOIN + * (SELECT key AS k from df2) t2 + * ON t1.k = t2.k + * """).explain + * + * == Physical Plan == + * *SortMergeJoin [k#56L], [k#57L], Inner + * :- *Sort [k#56L ASC NULLS FIRST], false, 0 + * : +- Exchange hashpartitioning(k#56L, 200) // <--- Unnecessary shuffle operation + * : +- *Project [key#39L AS k#56L] + * : +- Exchange hashpartitioning(key#39L, 200) + * : +- *Project [id#36L AS key#39L] + * : +- *Range (0, 10, step=1, splits=Some(4)) + * +- *Sort [k#57L ASC NULLS FIRST], false, 0 + * +- ReusedExchange [k#57L], Exchange hashpartitioning(k#56L, 200) + */ +trait AliasAwareOutputPartitioning extends UnaryExecNode { + + protected def outputExpressions: Seq[NamedExpression] + + private def hasAlias(exprs: Seq[NamedExpression]): Boolean = + exprs.exists(_.collectFirst { case _: Alias => true }.isDefined) + + final override def outputPartitioning: Partitioning = { + if (hasAlias(outputExpressions)) { + // Returns the valid `Partitioning`s for the node w.r.t its output and its expressions. + child.outputPartitioning match { + case partitioning: Expression => + val exprToEquiv = partitioning.references.map { attr => + attr -> outputExpressions.filter(e => + CleanupAliases.trimAliases(e).semanticEquals(attr)) + }.filterNot { case (attr, exprs) => + exprs.size == 1 && exprs.forall(_ == attr) + } + val initValue = partitioning match { + case PartitioningCollection(partitionings) => partitionings + case other => Seq(other) + } + val validPartitionings = exprToEquiv.foldLeft(initValue) { + case (partitionings, (toReplace, equivalents)) => + if (equivalents.isEmpty) { + partitionings.map(_.pruneInvalidAttribute(toReplace)) + } else { + partitionings.flatMap { + case p: Expression if p.references.contains(toReplace) => + equivalents.map { equiv => + p.transformDown { + case e if e == toReplace => equiv.toAttribute + }.asInstanceOf[Partitioning] + } + case other => Seq(other) + } + } + }.distinct + if (validPartitionings.size == 1) { + validPartitionings.head + } else { + validPartitionings.filterNot(_.isInstanceOf[UnknownPartitioning]) match { + case Seq() => PartitioningCollection(validPartitionings) + case Seq(knownPartitioning) => knownPartitioning + case knownPartitionings => PartitioningCollection(knownPartitionings) + } + + } + case other => other + } + } else { + child.outputPartitioning + } + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 537720ae5d24..8959c8378b87 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -47,7 +47,7 @@ case class HashAggregateExec( initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], child: SparkPlan) - extends UnaryExecNode with BlockingOperatorWithCodegen { + extends UnaryExecNode with BlockingOperatorWithCodegen with AliasAwareOutputPartitioning { private[this] val aggregateBufferAttributes = { aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) @@ -69,9 +69,7 @@ case class HashAggregateExec( override def output: Seq[Attribute] = 
resultExpressions.map(_.toAttribute) - override def outputPartitioning: Partitioning = { - child.outputPartitioning.updatePartitioningWithNewOutput(resultExpressions, outputSet) - } + override def outputExpressions: Seq[NamedExpression] = resultExpressions override def producedAttributes: AttributeSet = AttributeSet(aggregateAttributes) ++ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala index 89feb60a4ea9..abd7fdff0ac2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala @@ -65,7 +65,7 @@ case class ObjectHashAggregateExec( initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], child: SparkPlan) - extends UnaryExecNode { + extends UnaryExecNode with AliasAwareOutputPartitioning { private[this] val aggregateBufferAttributes = { aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) @@ -95,9 +95,7 @@ case class ObjectHashAggregateExec( } } - override def outputPartitioning: Partitioning = { - child.outputPartitioning.updatePartitioningWithNewOutput(resultExpressions, outputSet) - } + override def outputExpressions: Seq[NamedExpression] = resultExpressions protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") { val numOutputRows = longMetric("numOutputRows") diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index cc755a754cee..dbd698345d8c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode} +import org.apache.spark.sql.execution.{AliasAwareOutputPartitioning, SparkPlan, UnaryExecNode} import org.apache.spark.sql.execution.metric.SQLMetrics /** @@ -38,7 +38,7 @@ case class SortAggregateExec( initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], child: SparkPlan) - extends UnaryExecNode { + extends UnaryExecNode with AliasAwareOutputPartitioning { private[this] val aggregateBufferAttributes = { aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) @@ -66,9 +66,7 @@ case class SortAggregateExec( groupingExpressions.map(SortOrder(_, Ascending)) :: Nil } - override def outputPartitioning: Partitioning = { - child.outputPartitioning.updatePartitioningWithNewOutput(resultExpressions, outputSet) - } + override def outputExpressions: Seq[NamedExpression] = resultExpressions override def outputOrdering: Seq[SortOrder] = { groupingExpressions.map(SortOrder(_, Ascending)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index f101cda1c640..bc5312c51c41 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -34,7 +34,7 @@ import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler} /** Physical plan for Project. */ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) - extends UnaryExecNode with CodegenSupport { + extends UnaryExecNode with CodegenSupport with AliasAwareOutputPartitioning { override def output: Seq[Attribute] = projectList.map(_.toAttribute) @@ -77,9 +77,7 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override def outputPartitioning: Partitioning = { - child.outputPartitioning.updatePartitioningWithNewOutput(projectList, outputSet) - } + override def outputExpressions: Seq[NamedExpression] = projectList } From 69f9d5e74d87a9d08e24fdcbe3926b066fe5a3a3 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sun, 10 Feb 2019 13:12:10 +0100 Subject: [PATCH 21/24] fix --- .../spark/sql/catalyst/plans/physical/partitioning.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 40b43dd41760..ccf6caa9b61b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -242,7 +242,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) */ def partitionIdExpression: Expression = Pmod(new Murmur3Hash(expressions), Literal(numPartitions)) - override protected def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = { + override private[spark] def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = { if (this.references.contains(invalidAttr)) { UnknownPartitioning(numPartitions) } else { @@ -299,7 +299,7 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) } } - override protected def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = { + override private[spark] def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = { if (this.references.contains(invalidAttr)) { val validExprs = this.children.takeWhile(!_.references.contains(invalidAttr)) if (validExprs.isEmpty) { From 75ef545f32403acfb0c7b9efface2b62d8f8eb0f Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Sun, 10 Feb 2019 17:22:30 +0100 Subject: [PATCH 22/24] fix ut failures --- .../AliasAwareOutputPartitioning.scala | 86 +++++++++---------- 1 file changed, 42 insertions(+), 44 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala index 63d9f89fc04b..596601d6cdb9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala @@ -50,56 +50,54 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningC */ trait AliasAwareOutputPartitioning extends UnaryExecNode { + /** + * `Seq` of `Expression`s which define the output of the node. 
+ */ protected def outputExpressions: Seq[NamedExpression] - private def hasAlias(exprs: Seq[NamedExpression]): Boolean = - exprs.exists(_.collectFirst { case _: Alias => true }.isDefined) - + /** + * Returns the valid `Partitioning`s for the node w.r.t its output and its expressions. + */ final override def outputPartitioning: Partitioning = { - if (hasAlias(outputExpressions)) { - // Returns the valid `Partitioning`s for the node w.r.t its output and its expressions. - child.outputPartitioning match { - case partitioning: Expression => - val exprToEquiv = partitioning.references.map { attr => - attr -> outputExpressions.filter(e => - CleanupAliases.trimAliases(e).semanticEquals(attr)) - }.filterNot { case (attr, exprs) => - exprs.size == 1 && exprs.forall(_ == attr) - } - val initValue = partitioning match { - case PartitioningCollection(partitionings) => partitionings - case other => Seq(other) - } - val validPartitionings = exprToEquiv.foldLeft(initValue) { - case (partitionings, (toReplace, equivalents)) => - if (equivalents.isEmpty) { - partitionings.map(_.pruneInvalidAttribute(toReplace)) - } else { - partitionings.flatMap { - case p: Expression if p.references.contains(toReplace) => - equivalents.map { equiv => - p.transformDown { - case e if e == toReplace => equiv.toAttribute - }.asInstanceOf[Partitioning] - } - case other => Seq(other) - } + child.outputPartitioning match { + case partitioning: Expression => + val exprToEquiv = partitioning.references.map { attr => + attr -> outputExpressions.filter(e => + CleanupAliases.trimAliases(e).semanticEquals(attr)) + }.filterNot { case (attr, exprs) => + exprs.size == 1 && exprs.forall(_ == attr) + } + val initValue = partitioning match { + case PartitioningCollection(partitionings) => partitionings + case other => Seq(other) + } + val validPartitionings = exprToEquiv.foldLeft(initValue) { + case (partitionings, (toReplace, equivalents)) => + if (equivalents.isEmpty) { + partitionings.map(_.pruneInvalidAttribute(toReplace)) + } else { + partitionings.flatMap { + case p: Expression if p.references.contains(toReplace) => + equivalents.map { equiv => + p.transformDown { + case e if e == toReplace => equiv.toAttribute + }.asInstanceOf[Partitioning] + } + case other => Seq(other) } - }.distinct - if (validPartitionings.size == 1) { - validPartitionings.head - } else { - validPartitionings.filterNot(_.isInstanceOf[UnknownPartitioning]) match { - case Seq() => PartitioningCollection(validPartitionings) - case Seq(knownPartitioning) => knownPartitioning - case knownPartitionings => PartitioningCollection(knownPartitionings) } - + }.distinct + if (validPartitionings.size == 1) { + validPartitionings.head + } else { + validPartitionings.filterNot(_.isInstanceOf[UnknownPartitioning]) match { + case Seq() => PartitioningCollection(validPartitionings) + case Seq(knownPartitioning) => knownPartitioning + case knownPartitionings => PartitioningCollection(knownPartitionings) } - case other => other - } - } else { - child.outputPartitioning + + } + case other => other } } } From df3394c010d3d58893dca887262a68c5a70e1fee Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 11 Feb 2019 10:33:38 +0100 Subject: [PATCH 23/24] adress comments --- .../plans/physical/partitioning.scala | 9 ++++- .../AliasAwareOutputPartitioning.scala | 35 +++++++++++++------ 2 files changed, 33 insertions(+), 11 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index ccf6caa9b61b..9bc3e071b4f9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -242,6 +242,13 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int) */ def partitionIdExpression: Expression = Pmod(new Murmur3Hash(expressions), Literal(numPartitions)) + /** + * If the HashPartitioning contains an attribute which is not present in the output expressions, + * the returned partitioning is `UnknownPartitioning` instead of the `HashPartitioning` of the + * remaining attributes, which would be wrong. + * Eg. `HashPartitioning('a, 'b)` with output expressions `'a as 'a1` should produce + * `UnknownPartitioning` instead of `HashPartitioning('a1)` + */ override private[spark] def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = { if (this.references.contains(invalidAttr)) { UnknownPartitioning(numPartitions) @@ -301,7 +308,7 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) override private[spark] def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = { if (this.references.contains(invalidAttr)) { - val validExprs = this.children.takeWhile(!_.references.contains(invalidAttr)) + val validExprs = ordering.takeWhile(!_.references.contains(invalidAttr)) if (validExprs.isEmpty) { UnknownPartitioning(numPartitions) } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala index 596601d6cdb9..505d64bded30 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala @@ -27,8 +27,9 @@ import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, PartitioningC * exposed in the output of this plan. It can avoid the presence of redundant shuffles in queries * caused by the rename of an attribute among the partitioning ones, eg. 
* - * spark.range(10).selectExpr("id AS key", "0").repartition($"key").write.saveAsTable("df1") - * spark.range(10).selectExpr("id AS key", "0").repartition($"key").write.saveAsTable("df2") + * spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df1") + * spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df2") + * sql("set spark.sql.autoBroadcastJoinThreshold=-1") * sql(""" * SELECT * FROM * (SELECT key AS k from df1) t1 * INNER JOIN * (SELECT key AS k from df2) t2 * ON t1.k = t2.k * """).explain * * == Physical Plan == - * *SortMergeJoin [k#56L], [k#57L], Inner - * :- *Sort [k#56L ASC NULLS FIRST], false, 0 - * : +- Exchange hashpartitioning(k#56L, 200) // <--- Unnecessary shuffle operation - * : +- *Project [key#39L AS k#56L] - * : +- Exchange hashpartitioning(key#39L, 200) - * : +- *Project [id#36L AS key#39L] + * *SortMergeJoin [k#21L], [k#22L], Inner + * :- *Sort [k#21L ASC NULLS FIRST], false, 0 + * : +- Exchange hashpartitioning(k#21L, 200) // <--- Unnecessary shuffle operation + * : +- *Project [key#2L AS k#21L] + * : +- Exchange hashpartitioning(key#2L, 200) + * : +- *Project [id#0L AS key#2L] * : +- *Range (0, 10, step=1, splits=Some(4)) - * +- *Sort [k#57L ASC NULLS FIRST], false, 0 - * +- ReusedExchange [k#57L], Exchange hashpartitioning(k#56L, 200) + * +- *(4) Sort [k#22L ASC NULLS FIRST], false, 0 + * +- *(4) Project [key#8L AS k#22L] + * +- ReusedExchange [key#8L], Exchange hashpartitioning(key#2L, 200) */ trait AliasAwareOutputPartitioning extends UnaryExecNode { @@ -61,6 +63,13 @@ trait AliasAwareOutputPartitioning extends UnaryExecNode { final override def outputPartitioning: Partitioning = { child.outputPartitioning match { case partitioning: Expression => + // Creates a sequence of tuples where the first element is an `Attribute` referenced in the + // partitioning expression of the child and the second is a sequence of all its aliased + // occurrences in the node output. If there is no occurrence of an attribute in the output, + // the second element of the tuple for it will be an empty `Seq`. If the attribute, + // instead, is only present as is in the output, there will be no entry for it. + // Eg. if the partitioning is RangePartitioning('a) and the node output is "'a, 'a as a1, + // 'a as a2", then exprToEquiv will contain the tuple ('a, Seq('a, 'a as a1, 'a as a2)). val exprToEquiv = partitioning.references.map { attr => attr -> outputExpressions.filter(e => CleanupAliases.trimAliases(e).semanticEquals(attr)) }.filterNot { case (attr, exprs) => exprs.size == 1 && exprs.forall(_ == attr) } val initValue = partitioning match { case PartitioningCollection(partitionings) => partitionings case other => Seq(other) } + // Replace all the aliased expressions detected earlier with all their corresponding + // occurrences. This may produce many valid partitioning expressions from a single one. + // Eg. in the example above, this would produce a `Seq` of 3 `RangePartitioning`, namely: + // `RangePartitioning('a)`, `RangePartitioning('a1)`, `RangePartitioning('a2)`. 
val validPartitionings = exprToEquiv.foldLeft(initValue) { case (partitionings, (toReplace, equivalents)) => if (equivalents.isEmpty) { + // Remove from the partitioning expression the attribute which is not present in the + // node output partitionings.map(_.pruneInvalidAttribute(toReplace)) } else { partitionings.flatMap { From 78d92bce7b56ff33f2a509ab6f7add5706686e00 Mon Sep 17 00:00:00 2001 From: Marco Gaido Date: Mon, 11 Feb 2019 17:06:16 +0100 Subject: [PATCH 24/24] fix rangepartitioning --- .../sql/catalyst/plans/physical/partitioning.scala | 14 +++++++------- .../spark/sql/execution/PartitioningSuite.scala | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala index 9bc3e071b4f9..b740f3d6e835 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala @@ -306,14 +306,14 @@ case class RangePartitioning(ordering: Seq[SortOrder], numPartitions: Int) } } + /** + * Returns `UnknownPartitioning` if the first ordering expression is not valid anymore, + * otherwise it performs no modification, because pruning the invalid expressions may cause + * errors when comparing with `ClusteredDistribution`s. + */ override private[spark] def pruneInvalidAttribute(invalidAttr: Attribute): Partitioning = { - if (this.references.contains(invalidAttr)) { - val validExprs = ordering.takeWhile(!_.references.contains(invalidAttr)) - if (validExprs.isEmpty) { - UnknownPartitioning(numPartitions) - } else { - RangePartitioning(validExprs, numPartitions) - } + if (ordering.headOption.forall(_.references.contains(invalidAttr))) { + UnknownPartitioning(numPartitions) } else { this } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PartitioningSuite.scala index 2ab987372f64..945fce16f863 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PartitioningSuite.scala @@ -224,7 +224,7 @@ class PartitioningSuite extends SparkFunSuite { testPartitioning( Seq(attr1), planRangePartitioned2Attr, - simpleRangePartitioning(Seq(attr1), 10)) + simpleRangePartitioning(Seq(attr1, attr2), 10)) } test("RangePartitioning without renaming") {