From 7711cedefe32f2c3f3b94529ff9a13bb3f978c90 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Fri, 15 Nov 2019 19:08:47 -0800
Subject: [PATCH 1/7] initial checkin

---
 .../execution/basicPhysicalOperators.scala    | 32 ++++++++++++++++++-
 .../spark/sql/sources/BucketedReadSuite.scala | 19 ++++++++++-
 2 files changed, 49 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 3ed42f359c0a4..6bd02af4013a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -81,7 +81,37 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  override def outputPartitioning: Partitioning = {
+    def checkProjectList(attr: AttributeReference): Option[Attribute] = {
+      val result = projectList.map{
+        case alias: Alias => alias.child match {
+          case childAttr: AttributeReference =>
+            if (childAttr.semanticEquals(attr)) {
+              Some(alias.toAttribute)
+            } else {
+              None
+            }
+          case _ => None
+        }
+        case _ => None
+      }.filter(_.nonEmpty)
+
+      require(result.size <= 1)
+      result.headOption.getOrElse(None)
+    }
+
+    val outputPartitioning = child.outputPartitioning
+    outputPartitioning match {
+      case h @ HashPartitioning(expressions, numPartitions) =>
+        val newExpressions = expressions.map{
+          case a: AttributeReference =>
+            checkProjectList(a).getOrElse(a)
+          case other => other
+        }
+        HashPartitioning(newExpressions, numPartitions)
+      case other => other
+    }
+  }
 
   override def verboseStringWithOperatorId(): String = {
     s"""
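The change above can be read as: when the project list aliases an attribute that the child's HashPartitioning clusters on, report the partitioning in terms of the alias instead. A minimal, self-contained Scala sketch of the same idea, using simplified stand-in types rather than Catalyst's real Expression hierarchy (SimpleAttr, SimpleAlias and rewritePartitioning are illustrative names, not part of the patch):

    // Simplified stand-ins for AttributeReference, Alias and HashPartitioning.
    case class SimpleAttr(name: String)
    case class SimpleAlias(child: SimpleAttr, name: String)
    case class SimpleHashPartitioning(expressions: Seq[SimpleAttr], numPartitions: Int)

    // If the project list aliases an attribute used by the child's hash partitioning,
    // report the partitioning in terms of the alias, mirroring checkProjectList above.
    def rewritePartitioning(
        projectList: Seq[Any],
        childPartitioning: SimpleHashPartitioning): SimpleHashPartitioning = {
      def aliasFor(attr: SimpleAttr): Option[SimpleAttr] = projectList.collectFirst {
        case SimpleAlias(child, name) if child == attr => SimpleAttr(name)
      }
      childPartitioning.copy(
        expressions = childPartitioning.expressions.map(a => aliasFor(a).getOrElse(a)))
    }

    // SELECT i AS i2 FROM t2, where t2 is hash-partitioned by i into 8 partitions:
    val rewritten = rewritePartitioning(
      Seq(SimpleAlias(SimpleAttr("i"), "i2")),
      SimpleHashPartitioning(Seq(SimpleAttr("i")), numPartitions = 8))
    assert(rewritten == SimpleHashPartitioning(Seq(SimpleAttr("i2")), 8))

With the partitioning expressed over the alias, a downstream operator that requires clustering by i2 sees a compatible distribution and EnsureRequirements does not need to insert an exchange.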
where table1.i = table2.i") + // df.explain(true) + // df.show + } + test("avoid shuffle when grouping keys are a super-set of bucket keys") { withTable("bucketed_table") { df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table") From d3559b47392df2989219285be4c0b25d4ed5e31e Mon Sep 17 00:00:00 2001 From: Terry Kim Date: Wed, 18 Dec 2019 21:46:45 -0800 Subject: [PATCH 2/7] clean up --- .../execution/basicPhysicalOperators.scala | 38 +++++++------------ .../spark/sql/sources/BucketedReadSuite.scala | 34 +++++++++-------- 2 files changed, 32 insertions(+), 40 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala index 5e1894b4aefad..185b88b2cdb0f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala @@ -81,30 +81,14 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) override def outputOrdering: Seq[SortOrder] = child.outputOrdering override def outputPartitioning: Partitioning = { - def checkProjectList(attr: AttributeReference): Option[Attribute] = { - val result = projectList.map{ - case alias: Alias => alias.child match { - case childAttr: AttributeReference => - if (childAttr.semanticEquals(attr)) { - Some(alias.toAttribute) - } else { - None - } - case _ => None - } - case _ => None - }.filter(_.nonEmpty) - - require(result.size <= 1) - result.headOption.getOrElse(None) - } - - val outputPartitioning = child.outputPartitioning - outputPartitioning match { - case h @ HashPartitioning(expressions, numPartitions) => - val newExpressions = expressions.map{ + child.outputPartitioning match { + case HashPartitioning(expressions, numPartitions) => + // Replace aliases in projectList with its child expressions for any matching + // expressions in HashPartitioning. This is to ensure that outputPartitioning + // correctly matches witch requiredChildDistribution. + val newExpressions = expressions.map { case a: AttributeReference => - checkProjectList(a).getOrElse(a) + removeAlias(a).getOrElse(a) case other => other } HashPartitioning(newExpressions, numPartitions) @@ -119,8 +103,14 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) |Input : ${child.output.mkString("[", ", ", "]")} """.stripMargin } -} + private def removeAlias(attr: AttributeReference): Option[Attribute] = { + projectList.collectFirst { + case a @ Alias(child @ AttributeReference(_, _, _, _), _) if child.semanticEquals(attr) => + a.toAttribute + } + } +} /** Physical plan for Filter. 
From d3559b47392df2989219285be4c0b25d4ed5e31e Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Wed, 18 Dec 2019 21:46:45 -0800
Subject: [PATCH 2/7] clean up

---
 .../execution/basicPhysicalOperators.scala    | 38 +++++++------------
 .../spark/sql/sources/BucketedReadSuite.scala | 34 +++++++++--------
 2 files changed, 32 insertions(+), 40 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 5e1894b4aefad..185b88b2cdb0f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -81,30 +81,14 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
   override def outputPartitioning: Partitioning = {
-    def checkProjectList(attr: AttributeReference): Option[Attribute] = {
-      val result = projectList.map{
-        case alias: Alias => alias.child match {
-          case childAttr: AttributeReference =>
-            if (childAttr.semanticEquals(attr)) {
-              Some(alias.toAttribute)
-            } else {
-              None
-            }
-          case _ => None
-        }
-        case _ => None
-      }.filter(_.nonEmpty)
-
-      require(result.size <= 1)
-      result.headOption.getOrElse(None)
-    }
-
-    val outputPartitioning = child.outputPartitioning
-    outputPartitioning match {
-      case h @ HashPartitioning(expressions, numPartitions) =>
-        val newExpressions = expressions.map{
+    child.outputPartitioning match {
+      case HashPartitioning(expressions, numPartitions) =>
+        // Replace aliases in projectList with its child expressions for any matching
+        // expressions in HashPartitioning. This is to ensure that outputPartitioning
+        // correctly matches with requiredChildDistribution.
+        val newExpressions = expressions.map {
           case a: AttributeReference =>
-            checkProjectList(a).getOrElse(a)
+            removeAlias(a).getOrElse(a)
           case other => other
         }
         HashPartitioning(newExpressions, numPartitions)
       case other => other
     }
   }
@@ -119,8 +103,14 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
 |Input : ${child.output.mkString("[", ", ", "]")}
      """.stripMargin
   }
-}
 
+  private def removeAlias(attr: AttributeReference): Option[Attribute] = {
+    projectList.collectFirst {
+      case a @ Alias(child @ AttributeReference(_, _, _, _), _) if child.semanticEquals(attr) =>
+        a.toAttribute
+    }
+  }
+}
 
 /** Physical plan for Filter. */
 case class FilterExec(condition: Expression, child: SparkPlan)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index c45bff66cbf34..1405ed12cb970 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -47,13 +47,12 @@ class BucketedReadWithoutHiveSupportSuite extends BucketedReadSuite with SharedS
 }
 
-class BucketedReadSuite extends QueryTest with SQLTestUtils with SharedSparkSession {
+abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
   import testImplicits._
 
   protected override def beforeAll(): Unit = {
     super.beforeAll()
     spark.sessionState.conf.setConf(SQLConf.LEGACY_BUCKETED_TABLE_SCAN_OUTPUT_ORDERING, true)
-    assume(spark.sparkContext.conf.get(CATALOG_IMPLEMENTATION) == "in-memory")
   }
 
   protected override def afterAll(): Unit = {
@@ -605,20 +604,23 @@ class BucketedReadSuite extends QueryTest with SQLTestUtils with SharedSparkSess
     }
   }
 
-  test("terry") {
-    df1.write.format("parquet").bucketBy(8, "i").sortBy("i", "j").saveAsTable("t1")
-    df1.write.format("parquet").bucketBy(8, "i").sortBy("i", "j").saveAsTable("t2")
-    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")
-    sql("describe extended t1").show
-    sql("describe extended t2").show
-    sql("with t3 as (select i as i2 from t2) select * from t1, t3 where t1.i = t3.i2").explain(true)
-    // sql("select * from t1, t2 where t1.i = t2.i").explain(true)
-    // val table1 = spark.table("table1")
-    // val table2 = spark.table("table2")
-    // table1.join(table2, "i").explain(true)
-    // val df = spark.sql("select * from table1, table2 where table1.i = table2.i")
-    // df.explain(true)
-    // df.show
+  test("SPARK-30298: bucket join should work with SubqueryAlias plan") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
+      withTable("t") {
+        withView("v") {
+          val df = (0 until 20).map(i => (i, i)).toDF("i", "j").as("df")
+          df.write.format("parquet").bucketBy(8, "i").saveAsTable("t")
+
+          sql("CREATE VIEW v AS SELECT * FROM t").collect()
+
+          val plan1 = sql("SELECT * FROM t a JOIN t b ON a.i = b.i").queryExecution.executedPlan
+          assert(plan1.collect { case exchange: ShuffleExchangeExec => exchange }.isEmpty)
+
+          val plan2 = sql("SELECT * FROM t a JOIN v b ON a.i = b.i").queryExecution.executedPlan
+          assert(plan2.collect { case exchange: ShuffleExchangeExec => exchange }.isEmpty)
+        }
+      }
+    }
   }
 
   test("avoid shuffle when grouping keys are a super-set of bucket keys") {
     withTable("bucketed_table") {
       df1.write.format("parquet").bucketBy(8, "i").saveAsTable("bucketed_table")
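The new regression test corresponds to the SPARK-30298 report: a view (or any other SubqueryAlias) on top of a bucketed table should not force a shuffle when it is joined back on the bucketing key. A hedged sketch of the user-facing behaviour being asserted, in the same spark-shell style as the earlier snippet (table and view names are only examples):

    // t is bucketed by "i" into 8 buckets, as in the test above.
    spark.sql("CREATE VIEW v AS SELECT * FROM t")
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "0")

    // After the fix, neither side of this sort-merge join needs a ShuffleExchange:
    // the view's Project reports the bucketed hash partitioning of t.
    spark.sql("SELECT * FROM t a JOIN v b ON a.i = b.i").explain()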
From c24789d7ac2fe23d0e04d7086559621f25ded644 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Thu, 2 Jan 2020 16:45:00 -0800
Subject: [PATCH 3/7] address comments

---
 .../AliasAwareOutputPartitioning.scala        | 48 +++++++++++++++++++
 .../aggregate/HashAggregateExec.scala         |  4 +-
 .../aggregate/ObjectHashAggregateExec.scala   |  4 +-
 .../aggregate/SortAggregateExec.scala         |  6 +--
 .../execution/basicPhysicalOperators.scala    | 25 +---------
 .../spark/sql/execution/PlannerSuite.scala    | 29 +++++++++++
 .../spark/sql/sources/BucketedReadSuite.scala |  2 +-
 7 files changed, 87 insertions(+), 31 deletions(-)
 create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala
new file mode 100644
index 0000000000000..e93984748c20d
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, NamedExpression}
+import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
+
+/**
+ * A trait that handles aliases in the `outputExpressions` to produce `outputPartitioning`
+ * that satisfies output distribution requirements.
+ */
+trait AliasAwareOutputPartitioning extends UnaryExecNode {
+  protected def outputExpressions: Seq[NamedExpression]
+
+  final override def outputPartitioning: Partitioning = {
+    child.outputPartitioning match {
+      case HashPartitioning(expressions, numPartitions) =>
+        val newExpressions = expressions.map {
+          case a: AttributeReference =>
+            removeAlias(a).getOrElse(a)
+          case other => other
+        }
+        HashPartitioning(newExpressions, numPartitions)
+      case other => other
+    }
+  }
+
+  private def removeAlias(attr: AttributeReference): Option[Attribute] = {
+    outputExpressions.collectFirst {
+      case a @ Alias(child: AttributeReference, _) if child.semanticEquals(attr) =>
+        a.toAttribute
+    }
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
index 7f19d2754673d..f73e214a6b41f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/HashAggregateExec.scala
@@ -53,7 +53,7 @@ case class HashAggregateExec(
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
     child: SparkPlan)
-  extends UnaryExecNode with BlockingOperatorWithCodegen {
+  extends UnaryExecNode with BlockingOperatorWithCodegen with AliasAwareOutputPartitioning {
 
   private[this] val aggregateBufferAttributes = {
     aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
@@ -75,7 +75,7 @@ case class HashAggregateExec(
 
   override def output: Seq[Attribute] = resultExpressions.map(_.toAttribute)
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  override protected def outputExpressions: Seq[NamedExpression] = resultExpressions
 
   override def producedAttributes: AttributeSet =
     AttributeSet(aggregateAttributes) ++
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
index 953622afebf89..4376f6b6edd57 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/ObjectHashAggregateExec.scala
@@ -67,7 +67,7 @@ case class ObjectHashAggregateExec(
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
     child: SparkPlan)
-  extends UnaryExecNode {
+  extends UnaryExecNode with AliasAwareOutputPartitioning {
 
   private[this] val aggregateBufferAttributes = {
     aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
@@ -97,7 +97,7 @@ case class ObjectHashAggregateExec(
     }
   }
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  override protected def outputExpressions: Seq[NamedExpression] = resultExpressions
 
   protected override def doExecute(): RDD[InternalRow] = attachTree(this, "execute") {
     val numOutputRows = longMetric("numOutputRows")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
index 0ddf95771d5b2..b6e684e62ea5c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/SortAggregateExec.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.{AliasAwareOutputPartitioning, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.metric.SQLMetrics
 
 /**
@@ -38,7 +38,7 @@ case class SortAggregateExec(
     initialInputBufferOffset: Int,
     resultExpressions: Seq[NamedExpression],
     child: SparkPlan)
-  extends UnaryExecNode {
+  extends UnaryExecNode with AliasAwareOutputPartitioning {
 
   private[this] val aggregateBufferAttributes = {
     aggregateExpressions.flatMap(_.aggregateFunction.aggBufferAttributes)
@@ -66,7 +66,7 @@ case class SortAggregateExec(
     groupingExpressions.map(SortOrder(_, Ascending)) :: Nil
   }
 
-  override def outputPartitioning: Partitioning = child.outputPartitioning
+  override protected def outputExpressions: Seq[NamedExpression] = resultExpressions
 
   override def outputOrdering: Seq[SortOrder] = {
     groupingExpressions.map(SortOrder(_, Ascending))
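With the three aggregate operators now mixing in AliasAwareOutputPartitioning, a rename applied after an aggregation no longer hides the aggregate's hash partitioning from a later join. A hedged DataFrame-level sketch of the case the new planner test below asserts (assumes a SparkSession named spark is in scope; only the two exchanges feeding the aggregates should remain in the plan):

    import org.apache.spark.sql.functions.col

    val t1 = spark.range(10).selectExpr("floor(id/4) as k1")
    val t2 = spark.range(10).selectExpr("floor(id/4) as k2")

    val agg1 = t1.groupBy("k1").count()
    // withColumnRenamed introduces an Alias over the grouping key k2.
    val agg2 = t2.groupBy("k2").count().withColumnRenamed("k2", "k3")

    // Each aggregate still needs its own exchange, but the join on k1 = k3 should
    // not add a third one: the renamed output keeps the hash partitioning on k2/k3.
    agg1.join(agg2, col("k1") === col("k3")).explain()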
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 185b88b2cdb0f..02c5571e60ea8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -37,7 +37,7 @@ import org.apache.spark.util.random.{BernoulliCellSampler, PoissonSampler}
 
 /** Physical plan for Project. */
 case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
-  extends UnaryExecNode with CodegenSupport {
+  extends UnaryExecNode with CodegenSupport with AliasAwareOutputPartitioning {
 
   override def output: Seq[Attribute] = projectList.map(_.toAttribute)
 
@@ -80,21 +80,7 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
 
   override def outputOrdering: Seq[SortOrder] = child.outputOrdering
 
-  override def outputPartitioning: Partitioning = {
-    child.outputPartitioning match {
-      case HashPartitioning(expressions, numPartitions) =>
-        // Replace aliases in projectList with its child expressions for any matching
-        // expressions in HashPartitioning. This is to ensure that outputPartitioning
-        // correctly matches with requiredChildDistribution.
-        val newExpressions = expressions.map {
-          case a: AttributeReference =>
-            removeAlias(a).getOrElse(a)
-          case other => other
-        }
-        HashPartitioning(newExpressions, numPartitions)
-      case other => other
-    }
-  }
+  override protected def outputExpressions: Seq[NamedExpression] = projectList
 
   override def verboseStringWithOperatorId(): String = {
     s"""
@@ -103,13 +89,6 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan)
 |Input : ${child.output.mkString("[", ", ", "]")}
      """.stripMargin
   }
-
-  private def removeAlias(attr: AttributeReference): Option[Attribute] = {
-    projectList.collectFirst {
-      case a @ Alias(child @ AttributeReference(_, _, _, _), _) if child.semanticEquals(attr) =>
-        a.toAttribute
-    }
-  }
 }
 
 /** Physical plan for Filter. */
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 017e548809413..022ede05a071a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -926,6 +926,35 @@ class PlannerSuite extends SharedSparkSession {
       }
     }
   }
+
+  test("aliases in the project should not introduce extra shuffle.") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df1")
+      spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df2")
+      val planned = sql("""
+        SELECT * FROM
+          (SELECT key AS k from df1) t1
+        INNER JOIN
+          (SELECT key AS k from df2) t2
+        ON t1.k = t2.k
+        """).queryExecution.executedPlan
+      val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
+      assert(exchanges.size == 1)
+    }
+  }
+
+  test("aliases in the aggregate expressions should not introduce extra shuffle.") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      val t1 = spark.range(10).selectExpr("floor(id/4) as k1")
+      val t2 = spark.range(10).selectExpr("floor(id/4) as k2")
+
+      val agg1 = t1.groupBy("k1").agg(count(lit("1")).as("cnt1"))
+      val agg2 = t2.groupBy("k2").agg(count(lit("1")).as("cnt2")).withColumnRenamed("k2", "k3")
+      val planned = agg1.join(agg2, $"k1" === $"k3").queryExecution.executedPlan
+      val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
+      assert(exchanges.size == 2)
+    }
+  }
 }
 
 // Used for unit-testing EnsureRequirements
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 1405ed12cb970..0838a0ab03076 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -604,7 +604,7 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
     }
   }
 
-  test("SPARK-30298: bucket join should work with SubqueryAlias plan") {
+  test("bucket join should work with SubqueryAlias plan") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
       withTable("t") {
         withView("v") {

From fcd2186c050fa3a87d9d20d31bda7f307516506d Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Mon, 6 Jan 2020 17:20:26 -0800
Subject: [PATCH 4/7] address PR comments

---
 .../AliasAwareOutputPartitioning.scala        |  4 +-
 .../spark/sql/execution/PlannerSuite.scala    | 55 ++++++++++++++-----
 2 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala
index e93984748c20d..4b9a702dccd04 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala
@@ -31,7 +31,7 @@ trait AliasAwareOutputPartitioning extends UnaryExecNode {
       case HashPartitioning(expressions, numPartitions) =>
         val newExpressions = expressions.map {
           case a: AttributeReference =>
-            removeAlias(a).getOrElse(a)
+            replaceAlias(a).getOrElse(a)
           case other => other
         }
         HashPartitioning(newExpressions, numPartitions)
@@ -39,7 +39,7 @@ trait AliasAwareOutputPartitioning extends UnaryExecNode {
     }
   }
 
-  private def removeAlias(attr: AttributeReference): Option[Attribute] = {
+  private def replaceAlias(attr: AttributeReference): Option[Attribute] = {
     outputExpressions.collectFirst {
       case a @ Alias(child: AttributeReference, _) if child.semanticEquals(attr) =>
         a.toAttribute
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 022ede05a071a..7cd357ad64b92 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -927,23 +927,52 @@ class PlannerSuite extends SharedSparkSession {
     }
   }
 
-  test("aliases in the project should not introduce extra shuffle.") {
+  test("aliases in the project should not introduce extra shuffle") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
-      spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df1")
-      spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df2")
-      val planned = sql("""
-        SELECT * FROM
-          (SELECT key AS k from df1) t1
-        INNER JOIN
-          (SELECT key AS k from df2) t2
-        ON t1.k = t2.k
-        """).queryExecution.executedPlan
-      val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
-      assert(exchanges.size == 1)
+      withTempView("df1", "df2") {
+        spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df1")
+        spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df2")
+        val planned = sql(
+          """
+            |SELECT * FROM
+            | (SELECT key AS k from df1) t1
+            |INNER JOIN
+            | (SELECT key AS k from df2) t2
+            |ON t1.k = t2.k
+          """.stripMargin).queryExecution.executedPlan
+        val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
+        assert(exchanges.size == 1)
+      }
+    }
+  }
+
+  test("aliases to expressions should not be replaced") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      withTempView("df1", "df2") {
+        spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df1")
+        spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df2")
+        val planned = sql(
+          """
+            |SELECT * FROM
+            | (SELECT key + 1 AS k1 from df1) t1
+            |INNER JOIN
+            | (SELECT key + 1 AS k2 from df2) t2
+            |ON t1.k1 = t2.k2
+          """.stripMargin).queryExecution.executedPlan
+        val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
+
+        // Make sure aliases to an expression (key + 1) are not replaced.
+        Seq("k1", "k2").foreach { alias =>
+          assert(exchanges.exists(_.outputPartitioning match {
+            case HashPartitioning(Seq(a: AttributeReference), _) => a.name == alias
+            case _ => false
+          }))
+        }
+      }
     }
   }
 
-  test("aliases in the aggregate expressions should not introduce extra shuffle.") {
+  test("aliases in the aggregate expressions should not introduce extra shuffle") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       val t1 = spark.range(10).selectExpr("floor(id/4) as k1")
       val t2 = spark.range(10).selectExpr("floor(id/4) as k2")

From 1762a9636b91e07c93c82bc648190956fdc24bcf Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Mon, 6 Jan 2020 17:23:00 -0800
Subject: [PATCH 5/7] fix tests

---
 .../scala/org/apache/spark/sql/execution/PlannerSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 7cd357ad64b92..26384ec408fde 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -958,7 +958,7 @@ class PlannerSuite extends SharedSparkSession {
             |INNER JOIN
             | (SELECT key + 1 AS k2 from df2) t2
             |ON t1.k1 = t2.k2
-          """.stripMargin).queryExecution.executedPlan
+            |""".stripMargin).queryExecution.executedPlan
         val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
 
         // Make sure aliases to an expression (key + 1) are not replaced.
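The test added in patch 4 (and touched up here) pins down an intentional limitation: only aliases of bare attribute references are rewritten, so an alias over a computed expression such as key + 1 keeps its own name in the partitioning and both sides of such a join still shuffle. A hedged sketch of the distinction, assuming the repartitioned df1/df2 temp views from the tests are in scope:

    // Aliases of plain columns are rewritten, so this join needs no exchange beyond
    // the repartition($"key") already present on each input side:
    spark.sql("SELECT * FROM (SELECT key AS k FROM df1) t1 " +
      "JOIN (SELECT key AS k FROM df2) t2 ON t1.k = t2.k").explain()

    // Aliases of computed expressions are left alone, so here each side is
    // hash-partitioned by its own k1 / k2 output attribute and both sides shuffle:
    spark.sql("SELECT * FROM (SELECT key + 1 AS k1 FROM df1) t1 " +
      "JOIN (SELECT key + 1 AS k2 FROM df2) t2 ON t1.k1 = t2.k2").explain()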
From b877de72055241c6c2480a2550e9c9d8d3867388 Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Wed, 22 Jan 2020 21:14:14 -0800
Subject: [PATCH 6/7] address PR comments

---
 .../AliasAwareOutputPartitioning.scala        | 25 ++++++++++------
 .../spark/sql/execution/PlannerSuite.scala    | 30 +++++++++++++++++++
 .../spark/sql/sources/BucketedReadSuite.scala | 11 ++-----
 3 files changed, 49 insertions(+), 17 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala
index 4b9a702dccd04..2c7faea019322 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/AliasAwareOutputPartitioning.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.execution
 
-import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, NamedExpression}
 import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
 
 /**
@@ -27,14 +27,21 @@ trait AliasAwareOutputPartitioning extends UnaryExecNode {
   protected def outputExpressions: Seq[NamedExpression]
 
   final override def outputPartitioning: Partitioning = {
-    child.outputPartitioning match {
-      case HashPartitioning(expressions, numPartitions) =>
-        val newExpressions = expressions.map {
-          case a: AttributeReference =>
-            replaceAlias(a).getOrElse(a)
-          case other => other
-        }
-        HashPartitioning(newExpressions, numPartitions)
+    if (hasAlias) {
+      child.outputPartitioning match {
+        case h: HashPartitioning => h.copy(expressions = replaceAliases(h.expressions))
+        case other => other
+      }
+    } else {
+      child.outputPartitioning
+    }
+  }
+
+  private def hasAlias: Boolean = outputExpressions.collectFirst { case _: Alias => }.isDefined
+
+  private def replaceAliases(exprs: Seq[Expression]): Seq[Expression] = {
+    exprs.map {
+      case a: AttributeReference => replaceAlias(a).getOrElse(a)
       case other => other
     }
   }
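The hasAlias guard added above short-circuits the common case: when the output expressions contain no Alias at all, the child's partitioning is passed through untouched instead of being rebuilt. In plain Scala the check is just a collectFirst/isDefined over the output expressions; a tiny illustrative snippet with stand-in types (not Catalyst's real classes):

    sealed trait NamedExpr
    case class AttrExpr(name: String) extends NamedExpr
    case class AliasExpr(child: AttrExpr, name: String) extends NamedExpr

    def hasAlias(outputExpressions: Seq[NamedExpr]): Boolean =
      outputExpressions.collectFirst { case _: AliasExpr => }.isDefined

    assert(!hasAlias(Seq(AttrExpr("i"), AttrExpr("j"))))   // pass-through path
    assert(hasAlias(Seq(AliasExpr(AttrExpr("i"), "i2"))))  // rewrite path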
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index e830a2add5e06..4738bcb7caaa4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan, Range, Repartition, Sort, Union}
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
+import org.apache.spark.sql.execution.aggregate.{HashAggregateExec, ObjectHashAggregateExec, SortAggregateExec}
 import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}
 import org.apache.spark.sql.execution.exchange.{EnsureRequirements, ReusedExchangeExec, ReuseExchange, ShuffleExchangeExec}
 import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, SortMergeJoinExec}
@@ -990,11 +991,40 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
 
       val agg1 = t1.groupBy("k1").agg(count(lit("1")).as("cnt1"))
       val agg2 = t2.groupBy("k2").agg(count(lit("1")).as("cnt2")).withColumnRenamed("k2", "k3")
+
       val planned = agg1.join(agg2, $"k1" === $"k3").queryExecution.executedPlan
+
+      assert(planned.collect { case h: HashAggregateExec => h }.nonEmpty)
+
       val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
       assert(exchanges.size == 2)
     }
   }
+
+  test("aliases in the object hash/sort aggregate expressions should not introduce extra shuffle") {
+    withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      Seq(true, false).foreach { useObjectHashAgg =>
+        withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> useObjectHashAgg.toString) {
+          val t1 = spark.range(10).selectExpr("floor(id/4) as k1")
+          val t2 = spark.range(10).selectExpr("floor(id/4) as k2")
+
+          val agg1 = t1.groupBy("k1").agg(collect_list("k1"))
+          val agg2 = t2.groupBy("k2").agg(collect_list("k2")).withColumnRenamed("k2", "k3")
+
+          val planned = agg1.join(agg2, $"k1" === $"k3").queryExecution.executedPlan
+
+          if (useObjectHashAgg) {
+            assert(planned.collect { case o: ObjectHashAggregateExec => o }.nonEmpty)
+          } else {
+            assert(planned.collect { case s: SortAggregateExec => s }.nonEmpty)
+          }
+
+          val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
+          assert(exchanges.size == 2)
+        }
+      }
+    }
+  }
 }
 
 // Used for unit-testing EnsureRequirements
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
index 0838a0ab03076..c7266c886128c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/BucketedReadSuite.scala
@@ -608,16 +608,11 @@ abstract class BucketedReadSuite extends QueryTest with SQLTestUtils {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "0") {
       withTable("t") {
         withView("v") {
-          val df = (0 until 20).map(i => (i, i)).toDF("i", "j").as("df")
-          df.write.format("parquet").bucketBy(8, "i").saveAsTable("t")
-
+          spark.range(20).selectExpr("id as i").write.bucketBy(8, "i").saveAsTable("t")
           sql("CREATE VIEW v AS SELECT * FROM t").collect()
-          val plan1 = sql("SELECT * FROM t a JOIN t b ON a.i = b.i").queryExecution.executedPlan
-          assert(plan1.collect { case exchange: ShuffleExchangeExec => exchange }.isEmpty)
-
-          val plan2 = sql("SELECT * FROM t a JOIN v b ON a.i = b.i").queryExecution.executedPlan
-          assert(plan2.collect { case exchange: ShuffleExchangeExec => exchange }.isEmpty)
+          val plan = sql("SELECT * FROM t a JOIN v b ON a.i = b.i").queryExecution.executedPlan
+          assert(plan.collect { case exchange: ShuffleExchangeExec => exchange }.isEmpty)
         }
       }
     }
From fbafedf8793554366135b4d41189e06feb6f875e Mon Sep 17 00:00:00 2001
From: Terry Kim
Date: Thu, 23 Jan 2020 10:57:30 -0800
Subject: [PATCH 7/7] address PR comments

---
 .../org/apache/spark/sql/execution/PlannerSuite.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
index 4738bcb7caaa4..94ce3559bb44b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala
@@ -943,7 +943,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       withTempView("df1", "df2") {
         spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df1")
-        spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df2")
+        spark.range(20).selectExpr("id AS key", "0").repartition($"key").createTempView("df2")
         val planned = sql(
           """
            |SELECT * FROM
            | (SELECT key AS k from df1) t1
            |INNER JOIN
            | (SELECT key AS k from df2) t2
            |ON t1.k = t2.k
          """.stripMargin).queryExecution.executedPlan
         val exchanges = planned.collect { case s: ShuffleExchangeExec => s }
-        assert(exchanges.size == 1)
+        assert(exchanges.size == 2)
       }
     }
   }
@@ -962,7 +962,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       withTempView("df1", "df2") {
         spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df1")
-        spark.range(10).selectExpr("id AS key", "0").repartition($"key").createTempView("df2")
+        spark.range(20).selectExpr("id AS key", "0").repartition($"key").createTempView("df2")
         val planned = sql(
           """
            |SELECT * FROM
@@ -987,7 +987,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
   test("aliases in the aggregate expressions should not introduce extra shuffle") {
     withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
       val t1 = spark.range(10).selectExpr("floor(id/4) as k1")
-      val t2 = spark.range(10).selectExpr("floor(id/4) as k2")
+      val t2 = spark.range(20).selectExpr("floor(id/4) as k2")
 
       val agg1 = t1.groupBy("k1").agg(count(lit("1")).as("cnt1"))
       val agg2 = t2.groupBy("k2").agg(count(lit("1")).as("cnt2")).withColumnRenamed("k2", "k3")
@@ -1006,7 +1006,7 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
     Seq(true, false).foreach { useObjectHashAgg =>
       withSQLConf(SQLConf.USE_OBJECT_HASH_AGG.key -> useObjectHashAgg.toString) {
         val t1 = spark.range(10).selectExpr("floor(id/4) as k1")
-        val t2 = spark.range(10).selectExpr("floor(id/4) as k2")
+        val t2 = spark.range(20).selectExpr("floor(id/4) as k2")
 
         val agg1 = t1.groupBy("k1").agg(collect_list("k1"))
         val agg2 = t2.groupBy("k2").agg(collect_list("k2")).withColumnRenamed("k2", "k3")