From 1a8da2a02f6ef8f48616bef1fd771fe0dc02574b Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Mon, 24 Nov 2014 20:04:05 -0800 Subject: [PATCH 1/5] add BroadcastLeftSemiJoinHash --- .../joins/BroadcastLeftSemiJoinHash.scala | 69 +++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala new file mode 100644 index 0000000000000..9d88c8d71a14f --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.joins + +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.catalyst.expressions.{Expression, Row} +import org.apache.spark.sql.catalyst.plans.physical.ClusteredDistribution +import org.apache.spark.sql.execution.{BinaryNode, SparkPlan} + +/** + * :: DeveloperApi :: + * Build the right table's join keys into a HashSet, and iteratively go through the left + * table, to find the if join keys are in the Hash set. + */ +@DeveloperApi +case class BroadcastLeftSemiJoinHash( + leftKeys: Seq[Expression], + rightKeys: Seq[Expression], + left: SparkPlan, + right: SparkPlan) extends BinaryNode with HashJoin { + + override val buildSide = BuildRight + + override def output = left.output + + override def execute() = { + + val buildIter= buildPlan.execute().map(_.copy()).collect().toIterator + val hashSet = new java.util.HashSet[Row]() + var currentRow: Row = null + + // Create a Hash set of buildKeys + while (buildIter.hasNext) { + currentRow = buildIter.next() + val rowKey = buildSideKeyGenerator(currentRow) + if (!rowKey.anyNull) { + val keyExists = hashSet.contains(rowKey) + if (!keyExists) { + hashSet.add(rowKey) + } + } + } + + val broadcastedRelation = sparkContext.broadcast(hashSet) + + streamedPlan.execute().mapPartitions { streamIter => + + val joinKeys = streamSideKeyGenerator() + streamIter.filter(current => { + !joinKeys(current).anyNull && broadcastedRelation.value.contains(joinKeys.currentValue) + }) + } + } +} From ff2e618780f4c06242e505bf7012fe518619e4ae Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Mon, 24 Nov 2014 20:09:52 -0800 Subject: [PATCH 2/5] add testsuite --- .../spark/sql/execution/SparkStrategies.scala | 6 ++ .../org/apache/spark/sql/JoinSuite.scala | 36 ++++++++++ .../spark/sql/hive/StatisticsSuite.scala | 69 ++++++++++++++++++- 3 files changed, 110 insertions(+), 1 deletion(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 2954d4ce7d2d8..0fd9206a435ee 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -33,6 +33,12 @@ private[sql] abstract class SparkStrategies extends QueryPlanner[SparkPlan] { object LeftSemiJoin extends Strategy with PredicateHelper { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { + case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) + if sqlContext.autoBroadcastJoinThreshold > 0 && + right.statistics.sizeInBytes <= sqlContext.autoBroadcastJoinThreshold => + val semiJoin = joins.BroadcastLeftSemiJoinHash( + leftKeys, rightKeys, planLater(left), planLater(right)) + condition.map(Filter(_, semiJoin)).getOrElse(semiJoin) :: Nil // Find left semi joins where at least some predicates can be evaluated by matching join keys case ExtractEquiJoinKeys(LeftSemi, leftKeys, rightKeys, condition, left, right) => val semiJoin = joins.LeftSemiJoinHash( diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 0378fd7e367f0..89f397150d92b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -48,6 +48,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { case j: LeftSemiJoinBNL => j case j: CartesianProduct => j case j: BroadcastNestedLoopJoin => j + case j: BroadcastLeftSemiJoinHash => j } assert(operators.size === 1) @@ -382,4 +383,39 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { """.stripMargin), (null, 10) :: Nil) } + test("broadcasted left semi join operator selection") { + clearCache() + sql("CACHE TABLE testData") + val tmp = autoBroadcastJoinThreshold + + sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=1000000000""") + Seq( + ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[BroadcastLeftSemiJoinHash]) + ).foreach { + case (query, joinClass) => assertJoin(query, joinClass) + } + + sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""") + + Seq( + ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[LeftSemiJoinHash]) + ).foreach { + case (query, joinClass) => assertJoin(query, joinClass) + } + + sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-$tmp""") + sql("UNCACHE TABLE testData") + } + + test("left semi join") { + val rdd = sql("SELECT * FROM testData2 LEFT SEMI JOIN testData ON key = a") + checkAnswer(rdd, + (1, 1) :: + (1, 2) :: + (2, 1) :: + (2, 2) :: + (3, 1) :: + (3, 2) :: Nil) + + } } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index ff4071d8e2f10..b337dd1fdd850 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -22,7 +22,8 @@ import org.scalatest.BeforeAndAfterAll import scala.reflect.ClassTag import org.apache.spark.sql.{SQLConf, QueryTest} -import org.apache.spark.sql.execution.joins.{BroadcastHashJoin, ShuffledHashJoin} +import org.apache.spark.sql.catalyst.plans.logical.NativeCommand +import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._ import org.apache.spark.sql.hive.execution._ @@ -193,4 +194,70 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { ) } + test("auto converts to broadcast left semi join, by size estimate of a relation") { + def mkTest( + before: () => Unit, + after: () => Unit, + query: String, + expectedAnswer: Seq[Any], + ct: ClassTag[_]) = { + before() + + var rdd = sql(query) + + // Assert src has a size smaller than the threshold. + val sizes = rdd.queryExecution.analyzed.collect { + case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes + } + assert(sizes.size === 2 && sizes(1) <= autoBroadcastJoinThreshold + && sizes(0) <= autoBroadcastJoinThreshold, + s"query should contain two relations, each of which has size smaller than autoConvertSize") + + // Using `sparkPlan` because for relevant patterns in HashJoin to be + // matched, other strategies need to be applied. + var bhj = rdd.queryExecution.sparkPlan.collect { + case j: BroadcastLeftSemiJoinHash => j + } + assert(bhj.size === 1, + s"actual query plans do not contain broadcast join: ${rdd.queryExecution}") + + checkAnswer(rdd, expectedAnswer) // check correctness of output + + TestHive.settings.synchronized { + val tmp = autoBroadcastJoinThreshold + + sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""") + rdd = sql(query) + bhj = rdd.queryExecution.sparkPlan.collect { + case j: BroadcastLeftSemiJoinHash => j + } + assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off") + + val shj = rdd.queryExecution.sparkPlan.collect { + case j: LeftSemiJoinHash => j + } + assert(shj.size === 1, + "LeftSemiJoinHash should be planned when BroadcastHashJoin is turned off") + + sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""") + } + + after() + } + + /** Tests for MetastoreRelation */ + val leftSemiJoinQuery = + """SELECT * FROM src a + |left semi JOIN src b ON a.key=86 and a.key = b.key""".stripMargin + val Answer =(86, "val_86") ::Nil + + mkTest( + () => (), + () => (), + leftSemiJoinQuery, + Answer, + implicitly[ClassTag[MetastoreRelation]] + ) + + } } From fbe4887c10939907d8e9d55e07e4f70d38d43d3d Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Mon, 1 Dec 2014 20:57:00 -0800 Subject: [PATCH 3/5] change style --- .../joins/BroadcastLeftSemiJoinHash.scala | 2 - .../org/apache/spark/sql/JoinSuite.scala | 5 +- .../spark/sql/hive/StatisticsSuite.scala | 88 +++++++------------ 3 files changed, 37 insertions(+), 58 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala index 9d88c8d71a14f..2ab064fd0151e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastLeftSemiJoinHash.scala @@ -39,7 +39,6 @@ case class BroadcastLeftSemiJoinHash( override def output = left.output override def execute() = { - val buildIter= buildPlan.execute().map(_.copy()).collect().toIterator val hashSet = new java.util.HashSet[Row]() var currentRow: Row = null @@ -59,7 +58,6 @@ case class BroadcastLeftSemiJoinHash( val broadcastedRelation = sparkContext.broadcast(hashSet) streamedPlan.execute().mapPartitions { streamIter => - val joinKeys = streamSideKeyGenerator() streamIter.filter(current => { !joinKeys(current).anyNull && broadcastedRelation.value.contains(joinKeys.currentValue) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 89f397150d92b..2efe7dd35573d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -383,7 +383,8 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { """.stripMargin), (null, 10) :: Nil) } - test("broadcasted left semi join operator selection") { + + test("broadcasted left semi join operator selection") { clearCache() sql("CACHE TABLE testData") val tmp = autoBroadcastJoinThreshold @@ -403,7 +404,7 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { case (query, joinClass) => assertJoin(query, joinClass) } - sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-$tmp""") + setConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD, tmp.toString) sql("UNCACHE TABLE testData") } diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index b337dd1fdd850..49560982665de 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -195,69 +195,49 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { } test("auto converts to broadcast left semi join, by size estimate of a relation") { - def mkTest( - before: () => Unit, - after: () => Unit, - query: String, - expectedAnswer: Seq[Any], - ct: ClassTag[_]) = { - before() - - var rdd = sql(query) - - // Assert src has a size smaller than the threshold. - val sizes = rdd.queryExecution.analyzed.collect { - case r if ct.runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes - } - assert(sizes.size === 2 && sizes(1) <= autoBroadcastJoinThreshold - && sizes(0) <= autoBroadcastJoinThreshold, - s"query should contain two relations, each of which has size smaller than autoConvertSize") + val leftSemiJoinQuery = + """SELECT * FROM src a + |left semi JOIN src b ON a.key=86 and a.key = b.key""".stripMargin + val answer = (86, "val_86") :: Nil - // Using `sparkPlan` because for relevant patterns in HashJoin to be - // matched, other strategies need to be applied. - var bhj = rdd.queryExecution.sparkPlan.collect { - case j: BroadcastLeftSemiJoinHash => j - } - assert(bhj.size === 1, - s"actual query plans do not contain broadcast join: ${rdd.queryExecution}") + var rdd = sql(leftSemiJoinQuery) - checkAnswer(rdd, expectedAnswer) // check correctness of output + // Assert src has a size smaller than the threshold. + val sizes = rdd.queryExecution.analyzed.collect { + case r if implicitly[ClassTag[MetastoreRelation]].runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes + } + assert(sizes.size === 2 && sizes(1) <= autoBroadcastJoinThreshold + && sizes(0) <= autoBroadcastJoinThreshold, + s"query should contain two relations, each of which has size smaller than autoConvertSize") + + // Using `sparkPlan` because for relevant patterns in HashJoin to be + // matched, other strategies need to be applied. + var bhj = rdd.queryExecution.sparkPlan.collect { + case j: BroadcastLeftSemiJoinHash => j + } + assert(bhj.size === 1, + s"actual query plans do not contain broadcast join: ${rdd.queryExecution}") - TestHive.settings.synchronized { - val tmp = autoBroadcastJoinThreshold + checkAnswer(rdd, answer) // check correctness of output - sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""") - rdd = sql(query) - bhj = rdd.queryExecution.sparkPlan.collect { - case j: BroadcastLeftSemiJoinHash => j - } - assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off") + TestHive.settings.synchronized { + val tmp = autoBroadcastJoinThreshold - val shj = rdd.queryExecution.sparkPlan.collect { - case j: LeftSemiJoinHash => j - } - assert(shj.size === 1, - "LeftSemiJoinHash should be planned when BroadcastHashJoin is turned off") + sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""") + rdd = sql(leftSemiJoinQuery) + bhj = rdd.queryExecution.sparkPlan.collect { + case j: BroadcastLeftSemiJoinHash => j + } + assert(bhj.isEmpty, "BroadcastHashJoin still planned even though it is switched off") - sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""") + val shj = rdd.queryExecution.sparkPlan.collect { + case j: LeftSemiJoinHash => j } + assert(shj.size === 1, + "LeftSemiJoinHash should be planned when BroadcastHashJoin is turned off") - after() + sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""") } - /** Tests for MetastoreRelation */ - val leftSemiJoinQuery = - """SELECT * FROM src a - |left semi JOIN src b ON a.key=86 and a.key = b.key""".stripMargin - val Answer =(86, "val_86") ::Nil - - mkTest( - () => (), - () => (), - leftSemiJoinQuery, - Answer, - implicitly[ClassTag[MetastoreRelation]] - ) - } } From f103983f7ccde33ef116128c4686e1acb2097df0 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Wed, 3 Dec 2014 02:30:24 -0800 Subject: [PATCH 4/5] change style --- .../src/test/scala/org/apache/spark/sql/JoinSuite.scala | 7 ++++--- .../scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 8 +++++--- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala index 2efe7dd35573d..1a4232dab86e7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/JoinSuite.scala @@ -389,14 +389,15 @@ class JoinSuite extends QueryTest with BeforeAndAfterEach { sql("CACHE TABLE testData") val tmp = autoBroadcastJoinThreshold - sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=1000000000""") + sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=1000000000") Seq( - ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[BroadcastLeftSemiJoinHash]) + ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", + classOf[BroadcastLeftSemiJoinHash]) ).foreach { case (query, joinClass) => assertJoin(query, joinClass) } - sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""") + sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1") Seq( ("SELECT * FROM testData LEFT SEMI JOIN testData2 ON key = a", classOf[LeftSemiJoinHash]) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 49560982665de..9efd6c0b5080d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -204,7 +204,9 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { // Assert src has a size smaller than the threshold. val sizes = rdd.queryExecution.analyzed.collect { - case r if implicitly[ClassTag[MetastoreRelation]].runtimeClass.isAssignableFrom(r.getClass) => r.statistics.sizeInBytes + case r if implicitly[ClassTag[MetastoreRelation]].runtimeClass + .isAssignableFrom(r.getClass) => + r.statistics.sizeInBytes } assert(sizes.size === 2 && sizes(1) <= autoBroadcastJoinThreshold && sizes(0) <= autoBroadcastJoinThreshold, @@ -223,7 +225,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { TestHive.settings.synchronized { val tmp = autoBroadcastJoinThreshold - sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1""") + sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=-1") rdd = sql(leftSemiJoinQuery) bhj = rdd.queryExecution.sparkPlan.collect { case j: BroadcastLeftSemiJoinHash => j @@ -236,7 +238,7 @@ class StatisticsSuite extends QueryTest with BeforeAndAfterAll { assert(shj.size === 1, "LeftSemiJoinHash should be planned when BroadcastHashJoin is turned off") - sql( s"""SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp""") + sql(s"SET ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD}=$tmp") } } From a4a43c99b49156ef90fa3b9493b008823dbc01d3 Mon Sep 17 00:00:00 2001 From: wangxiaojing Date: Tue, 23 Dec 2014 18:50:23 -0800 Subject: [PATCH 5/5] rebase --- .../test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala index 9efd6c0b5080d..4b6a9308b9811 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala @@ -22,7 +22,6 @@ import org.scalatest.BeforeAndAfterAll import scala.reflect.ClassTag import org.apache.spark.sql.{SQLConf, QueryTest} -import org.apache.spark.sql.catalyst.plans.logical.NativeCommand import org.apache.spark.sql.execution.joins._ import org.apache.spark.sql.hive.test.TestHive import org.apache.spark.sql.hive.test.TestHive._