Skip to content

Commit a23f6b1

Browse files
committed
[SPARK-23195][SQL] Keep the Hint of Cached Data
## What changes were proposed in this pull request?

The broadcast hint of a plan is lost when the plan is cached. This PR corrects that.

```Scala
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
broadcast(df2).cache()
df2.collect()
val df3 = df1.join(df2, Seq("key"), "inner")
```

## How was this patch tested?

Added a test.

Author: gatorsmile <[email protected]>

Closes #20368 from gatorsmile/cachedBroadcastHint.

(cherry picked from commit 44cc4da)
Signed-off-by: gatorsmile <[email protected]>
1 parent 851c303 commit a23f6b1

File tree

2 files changed

+18
-2
lines changed

2 files changed

+18
-2
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,7 @@ case class InMemoryRelation(
6363
tableName: Option[String])(
6464
@transient var _cachedColumnBuffers: RDD[CachedBatch] = null,
6565
val batchStats: LongAccumulator = child.sqlContext.sparkContext.longAccumulator,
66-
statsOfPlanToCache: Statistics = null)
66+
statsOfPlanToCache: Statistics)
6767
extends logical.LeafNode with MultiInstanceRelation {
6868

6969
override protected def innerChildren: Seq[SparkPlan] = Seq(child)
@@ -77,7 +77,7 @@ case class InMemoryRelation(
7777
// Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache
7878
statsOfPlanToCache
7979
} else {
80-
Statistics(sizeInBytes = batchStats.value.longValue)
80+
Statistics(sizeInBytes = batchStats.value.longValue, hints = statsOfPlanToCache.hints)
8181
}
8282
}
8383

sql/core/src/test/scala/org/apache/spark/sql/execution/joins/BroadcastJoinSuite.scala

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,22 @@ class BroadcastJoinSuite extends QueryTest with SQLTestUtils {
139139
}
140140
}
141141

142+
test("broadcast hint is retained in a cached plan") {
143+
Seq(true, false).foreach { materialized =>
144+
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
145+
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
146+
val df2 = spark.createDataFrame(Seq((1, "1"), (2, "2"))).toDF("key", "value")
147+
broadcast(df2).cache()
148+
if (materialized) df2.collect()
149+
val df3 = df1.join(df2, Seq("key"), "inner")
150+
val numBroadCastHashJoin = df3.queryExecution.executedPlan.collect {
151+
case b: BroadcastHashJoinExec => b
152+
}.size
153+
assert(numBroadCastHashJoin === 1)
154+
}
155+
}
156+
}
157+
142158
private def assertBroadcastJoin(df : Dataset[Row]) : Unit = {
143159
val df1 = spark.createDataFrame(Seq((1, "4"), (2, "2"))).toDF("key", "value")
144160
val joined = df1.join(df, Seq("key"), "inner")

0 commit comments

Comments
 (0)