From 6e93b9a3db889ce0e592f5570c780ced5fcac975 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 6 Mar 2020 16:19:04 -0800 Subject: [PATCH 1/6] initial commit --- ...itioning.scala => AliasAwareOutputs.scala} | 19 +++++++++++++++++-- .../aggregate/HashAggregateExec.scala | 2 +- .../aggregate/ObjectHashAggregateExec.scala | 2 +- .../aggregate/SortAggregateExec.scala | 6 +++--- .../execution/basicPhysicalOperators.scala | 4 +--- .../spark/sql/sources/BucketedReadSuite.scala | 12 ++++++++++++ 6 files changed, 35 insertions(+), 10 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/{AliasAwareOutputPartitioning.scala => AliasAwareOutputs.scala} (80%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputs.scala similarity index 80% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputs.scala index 2c7faea01932..13d39aa96bb8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputs.scala @@ -16,16 +16,18 @@ */ package org.apache.spark.sql.execution -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, NamedExpression, SortOrder} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} /** * A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` * that satisfies output distribution requirements. */ -trait AliasAwareOutputPartitioning extends UnaryExecNode { +trait AliasAwareOutputs extends UnaryExecNode { protected def outputExpressions: Seq[NamedExpression] + protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering + final override def outputPartitioning: Partitioning = { if (hasAlias) { child.outputPartitioning match { @@ -37,6 +39,19 @@ trait AliasAwareOutputPartitioning extends UnaryExecNode { } } + final override def outputOrdering: Seq[SortOrder] = { + if (hasAlias) { + orderingExpressions.map { s => + s.child match { + case a: AttributeReference => s.copy(child = replaceAlias(a).getOrElse(a)) + case _ => s + } + } + } else { + orderingExpressions + } + } + private def hasAlias: Boolean = outputExpressions.collectFirst { case _: Alias => }.isDefined private def replaceAliases(exprs: Seq[Expression]): Seq[Expression] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 7a26fd7a8541..879a50a2d32f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -53,7 +53,7 @@ case class HashAggregateExec( initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], child: SparkPlan) - extends BaseAggregateExec with BlockingOperatorWithCodegen with AliasAwareOutputPartitioning { + extends BaseAggregateExec with BlockingOperatorWithCodegen with AliasAwareOutputs { private[this] val aggregateBufferAttributes = { aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala index 3fb58eb2cc8b..a2bb8907092c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala @@ -67,7 +67,7 @@ case class ObjectHashAggregateExec( initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], child: SparkPlan) - extends BaseAggregateExec with AliasAwareOutputPartitioning { + extends BaseAggregateExec with AliasAwareOutputs { private[this] val aggregateBufferAttributes = { aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index 77ed469016fa..248db31106b5 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.execution.{AliasAwareOutputPartitioning, SparkPlan} +import org.apache.spark.sql.execution.{AliasAwareOutputs, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics /** @@ -38,7 +38,7 @@ case class SortAggregateExec( initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], child: SparkPlan) - extends BaseAggregateExec with AliasAwareOutputPartitioning { + extends BaseAggregateExec with AliasAwareOutputs { private[this] val aggregateBufferAttributes = { aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) @@ -68,7 +68,7 @@ case class SortAggregateExec( override protected def outputExpressions: Seq[NamedExpression] = resultExpressions - override def outputOrdering: Seq[SortOrder] = { + override protected def orderingExpressions: Seq[SortOrder] = { groupingExpressions.map(SortOrder(_, Ascending)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index c3e259d196ba..60bac2c65418 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -39,7 +39,7 @@ import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler} /** Physical plan for Project. */ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) - extends UnaryExecNode with CodegenSupport with AliasAwareOutputPartitioning { + extends UnaryExecNode with CodegenSupport with AliasAwareOutputs { override def output: Seq[Attribute] = projectList.map(_.toAttribute) @@ -80,8 +80,6 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) } } - override def outputOrdering: Seq[SortOrder] = child.outputOrdering - override protected def outputExpressions: Seq[NamedExpression] = projectList override def verboseStringWithOperatorId(): String = { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala index 57bbf200ed88..7242278b59ef 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala @@ -604,6 +604,18 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils { } } + test("sort should not be introduced when aliases are used") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") { + withTable("t") { + df1.repartition(1).write.format("parquet").bucketBy(8, "i").sortBy("i").saveAsTable("t") + val t1 = spark.table("t") + val t2 = t1.selectExpr("i as ii") + val plan = t1.join(t2, t1("i") === t2("ii")).queryExecution.executedPlan + assert(plan.collect { case sort: SortExec => sort }.isEmpty) + } + } + } + test("bucket join should work with SubqueryAlias plan") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") { withTable("t") { From 0dc6b087e7fd7a9cd71120ef44740863cb27ddb4 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Fri, 6 Mar 2020 16:49:32 -0800 Subject: [PATCH 2/6] update comment --- .../org/apache/spark/sql/execution/AliasAwareOutputs.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputs.scala index 13d39aa96bb8..3a6865580278 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputs.scala @@ -20,8 +20,8 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} /** - * A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` - * that satisfies output distribution requirements. + * A trait that handles aliases in the `outputExpressions` and `orderingExpressions` to produce + * `outputPartitioning` and `outputOrdering` that satisfy distribution and ordering requirements. */ trait AliasAwareOutputs extends UnaryExecNode { protected def outputExpressions: Seq[NamedExpression] From 41871b45b479edc65e914fe0f5b59a4c9814419f Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Sat, 7 Mar 2020 19:36:49 -0800 Subject: [PATCH 3/6] Rename AliasAwareOutputPartitioningAndOrdering --- ...ts.scala => AliasAwareOutputPartitioningAndOrdering.scala} | 4 ++-- .../spark/sql/execution/aggregate/HashAggregateExec.scala | 4 +++- .../sql/execution/aggregate/ObjectHashAggregateExec.scala | 2 +- .../spark/sql/execution/aggregate/SortAggregateExec.scala | 4 ++-- .../apache/spark/sql/execution/basicPhysicalOperators.scala | 2 +- 5 files changed, 9 insertions(+), 7 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/{AliasAwareOutputs.scala => AliasAwareOutputPartitioningAndOrdering.scala} (96%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioningAndOrdering.scala similarity index 96% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputs.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioningAndOrdering.scala index 3a6865580278..918cb64eb1d8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioningAndOrdering.scala @@ -23,7 +23,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition * A trait that handles aliases in the `outputExpressions` and `orderingExpressions` to produce * `outputPartitioning` and `outputOrdering` that satisfy distribution and ordering requirements. */ -trait AliasAwareOutputs extends UnaryExecNode { +trait AliasAwareOutputPartitioningAndOrdering extends UnaryExecNode { protected def outputExpressions: Seq[NamedExpression] protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering @@ -40,7 +40,7 @@ trait AliasAwareOutputs extends UnaryExecNode { } final override def outputOrdering: Seq[SortOrder] = { - if (hasAlias) { + if (false) { orderingExpressions.map { s => s.child match { case a: AttributeReference => s.copy(child = replaceAlias(a).getOrElse(a)) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index 879a50a2d32f..ea51407473fa 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -53,7 +53,9 @@ case class HashAggregateExec( initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], child: SparkPlan) - extends BaseAggregateExec with BlockingOperatorWithCodegen with AliasAwareOutputs { + extends BaseAggregateExec + with BlockingOperatorWithCodegen + with AliasAwareOutputPartitioningAndOrdering { private[this] val aggregateBufferAttributes = { aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala index a2bb8907092c..24b21057a2c6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala @@ -67,7 +67,7 @@ case class ObjectHashAggregateExec( initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], child: SparkPlan) - extends BaseAggregateExec with AliasAwareOutputs { + extends BaseAggregateExec with AliasAwareOutputPartitioningAndOrdering { private[this] val aggregateBufferAttributes = { aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index 248db31106b5..89ee265b50ad 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.execution.{AliasAwareOutputs, SparkPlan} +import org.apache.spark.sql.execution.{AliasAwareOutputPartitioningAndOrdering, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics /** @@ -38,7 +38,7 @@ case class SortAggregateExec( initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], child: SparkPlan) - extends BaseAggregateExec with AliasAwareOutputs { + extends BaseAggregateExec with AliasAwareOutputPartitioningAndOrdering { private[this] val aggregateBufferAttributes = { aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 60bac2c65418..923ba23dfd7b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -39,7 +39,7 @@ import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler} /** Physical plan for Project. */ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) - extends UnaryExecNode with CodegenSupport with AliasAwareOutputs { + extends UnaryExecNode with CodegenSupport with AliasAwareOutputPartitioningAndOrdering { override def output: Seq[Attribute] = projectList.map(_.toAttribute) From add187ae6443a8db71d24928c625794195604e0c Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Sat, 7 Mar 2020 19:38:14 -0800 Subject: [PATCH 4/6] fix --- .../sql/execution/AliasAwareOutputPartitioningAndOrdering.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioningAndOrdering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioningAndOrdering.scala index 918cb64eb1d8..e7214f1d6f73 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioningAndOrdering.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioningAndOrdering.scala @@ -40,7 +40,7 @@ trait AliasAwareOutputPartitioningAndOrdering extends UnaryExecNode { } final override def outputOrdering: Seq[SortOrder] = { - if (false) { + if (hasAlias) { orderingExpressions.map { s => s.child match { case a: AttributeReference => s.copy(child = replaceAlias(a).getOrElse(a)) From 346a6d32a2225ca532cff5a3b7937e57168db8db Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Mon, 9 Mar 2020 14:54:45 -0700 Subject: [PATCH 5/6] address PR comments --- ...asAwareOutputPartitioningAndOrdering.scala | 2 +- .../execution/basicPhysicalOperators.scala | 2 ++ .../spark/sql/execution/PlannerSuite.scala | 19 +++++++++++++++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioningAndOrdering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioningAndOrdering.scala index e7214f1d6f73..c2fc37e8f25e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioningAndOrdering.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioningAndOrdering.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partition trait AliasAwareOutputPartitioningAndOrdering extends UnaryExecNode { protected def outputExpressions: Seq[NamedExpression] - protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering + protected def orderingExpressions: Seq[SortOrder] = Nil final override def outputPartitioning: Partitioning = { if (hasAlias) { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 923ba23dfd7b..26097312f5e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -82,6 +82,8 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) override protected def outputExpressions: Seq[NamedExpression] = projectList + override protected def orderingExpressions: Seq[SortOrder] = child.outputOrdering + override def verboseStringWithOperatorId(): String = { s""" |(${ExplainUtils.getOpId(this)}) $nodeName ${ExplainUtils.getCodegenId(this)} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 0c5e2e3c7d1d..dfa8046cd622 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -975,6 +975,25 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } } } + + test("aliases in the sort aggregate expressions should not introduce extra sort") { + withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { + withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> "false") { + val t1 = spark.range(10).selectExpr("floor(id/4) as k1") + val t2 = spark.range(20).selectExpr("floor(id/4) as k2") + + val agg1 = t1.groupBy("k1").agg(collect_list("k1")).withColumnRenamed("k1", "k3") + val agg2 = t2.groupBy("k2").agg(collect_list("k2")) + + val planned = agg1.join(agg2, $"k3" === $"k2").queryExecution.executedPlan + assert(planned.collect { case s: SortAggregateExec => s }.nonEmpty) + + // We expect two SortExec nodes on each side of join. + val sorts = planned.collect { case s: SortExec => s } + assert(sorts.size == 4) + } + } + } } // Used for unit-testing EnsureRequirements From fc6bc62356ef1a4bd23a69d5a73a5c7c78995c03 Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Mon, 9 Mar 2020 19:57:09 -0700 Subject: [PATCH 6/6] Address PR comment --- ...scala => AliasAwareOutputExpression.scala} | 51 +++++++++++-------- .../aggregate/HashAggregateExec.scala | 2 +- .../aggregate/ObjectHashAggregateExec.scala | 2 +- .../aggregate/SortAggregateExec.scala | 6 ++- .../execution/basicPhysicalOperators.scala | 5 +- 5 files changed, 41 insertions(+), 25 deletions(-) rename sql/core/src/main/scala/org/apache/spark/sql/execution/{AliasAwareOutputPartitioningAndOrdering.scala => AliasAwareOutputExpression.scala} (69%) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioningAndOrdering.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala similarity index 69% rename from sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioningAndOrdering.scala rename to sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala index c2fc37e8f25e..fa41e865444d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioningAndOrdering.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputExpression.scala @@ -20,14 +20,33 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeRef import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning} /** - * A trait that handles aliases in the `outputExpressions` and `orderingExpressions` to produce - * `outputPartitioning` and `outputOrdering` that satisfy distribution and ordering requirements. + * A trait that provides functionality to handle aliases in the `outputExpressions`. */ -trait AliasAwareOutputPartitioningAndOrdering extends UnaryExecNode { +trait AliasAwareOutputExpression extends UnaryExecNode { protected def outputExpressions: Seq[NamedExpression] - protected def orderingExpressions: Seq[SortOrder] = Nil + protected def hasAlias: Boolean = outputExpressions.collectFirst { case _: Alias => }.isDefined + protected def replaceAliases(exprs: Seq[Expression]): Seq[Expression] = { + exprs.map { + case a: AttributeReference => replaceAlias(a).getOrElse(a) + case other => other + } + } + + protected def replaceAlias(attr: AttributeReference): Option[Attribute] = { + outputExpressions.collectFirst { + case a @ Alias(child: AttributeReference, _) if child.semanticEquals(attr) => + a.toAttribute + } + } +} + +/** + * A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning` that + * satisfies distribution requirements. + */ +trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression { final override def outputPartitioning: Partitioning = { if (hasAlias) { child.outputPartitioning match { @@ -38,6 +57,14 @@ trait AliasAwareOutputPartitioningAndOrdering extends UnaryExecNode { child.outputPartitioning } } +} + +/** + * A trait that handles aliases in the `orderingExpressions` to produce `outputOrdering` that + * satisfies ordering requirements. + */ +trait AliasAwareOutputOrdering extends AliasAwareOutputExpression { + protected def orderingExpressions: Seq[SortOrder] final override def outputOrdering: Seq[SortOrder] = { if (hasAlias) { @@ -51,20 +78,4 @@ trait AliasAwareOutputPartitioningAndOrdering extends UnaryExecNode { orderingExpressions } } - - private def hasAlias: Boolean = outputExpressions.collectFirst { case _: Alias => }.isDefined - - private def replaceAliases(exprs: Seq[Expression]): Seq[Expression] = { - exprs.map { - case a: AttributeReference => replaceAlias(a).getOrElse(a) - case other => other - } - } - - private def replaceAlias(attr: AttributeReference): Option[Attribute] = { - outputExpressions.collectFirst { - case a @ Alias(child: AttributeReference, _) if child.semanticEquals(attr) => - a.toAttribute - } - } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala index ea51407473fa..7f30a47ddb6f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala @@ -55,7 +55,7 @@ case class HashAggregateExec( child: SparkPlan) extends BaseAggregateExec with BlockingOperatorWithCodegen - with AliasAwareOutputPartitioningAndOrdering { + with AliasAwareOutputPartitioning { private[this] val aggregateBufferAttributes = { aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala index 24b21057a2c6..3fb58eb2cc8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala @@ -67,7 +67,7 @@ case class ObjectHashAggregateExec( initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], child: SparkPlan) - extends BaseAggregateExec with AliasAwareOutputPartitioningAndOrdering { + extends BaseAggregateExec with AliasAwareOutputPartitioning { private[this] val aggregateBufferAttributes = { aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala index 89ee265b50ad..9610eab82c7c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala @@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.expressions.aggregate._ import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.catalyst.util.truncatedString -import org.apache.spark.sql.execution.{AliasAwareOutputPartitioningAndOrdering, SparkPlan} +import org.apache.spark.sql.execution.{AliasAwareOutputOrdering, AliasAwareOutputPartitioning, SparkPlan} import org.apache.spark.sql.execution.metric.SQLMetrics /** @@ -38,7 +38,9 @@ case class SortAggregateExec( initialInputBufferOffset: Int, resultExpressions: Seq[NamedExpression], child: SparkPlan) - extends BaseAggregateExec with AliasAwareOutputPartitioningAndOrdering { + extends BaseAggregateExec + with AliasAwareOutputPartitioning + with AliasAwareOutputOrdering { private[this] val aggregateBufferAttributes = { aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 26097312f5e7..99e485fda13a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -39,7 +39,10 @@ import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler} /** Physical plan for Project. */ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) - extends UnaryExecNode with CodegenSupport with AliasAwareOutputPartitioningAndOrdering { + extends UnaryExecNode + with CodegenSupport + with AliasAwareOutputPartitioning + with AliasAwareOutputOrdering { override def output: Seq[Attribute] = projectList.map(_.toAttribute)