Skip to content

Commit 767cc94

Browse files
chenghao-intel authored and marmbrus committed
[SPARK-7158] [SQL] Fix bug of cached data cannot be used in collect() after cache()
When df.cache() method called, the `withCachedData` of `QueryExecution` has been created, which mean it will not look up the cached tables when action method called afterward. Author: Cheng Hao <[email protected]> Closes apache#5714 from chenghao-intel/SPARK-7158 and squashes the following commits: 58ea8aa [Cheng Hao] style issue 2bf740f [Cheng Hao] create new QueryExecution instance for CacheManager a5647d9 [Cheng Hao] hide the queryExecution of DataFrame fbfd3c5 [Cheng Hao] make the DataFrame.queryExecution mutable for cache/persist/unpersist
1 parent 337c16d commit 767cc94

File tree

2 files changed

+27
-1
lines changed

2 files changed

+27
-1
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,7 +103,7 @@ private[sql] class CacheManager(sqlContext: SQLContext) extends Logging {
103103
sqlContext.conf.useCompression,
104104
sqlContext.conf.columnBatchSize,
105105
storageLevel,
106-
query.queryExecution.executedPlan,
106+
sqlContext.executePlan(query.logicalPlan).executedPlan,
107107
tableName))
108108
}
109109
}

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,32 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
123123
)
124124
}
125125

126+
test("SPARK-7158 collect and take return different results") {
  import java.util.UUID
  import org.apache.spark.sql.types._

  val df = Seq(Tuple1(1), Tuple1(2), Tuple1(3)).toDF("index")
  // We expect the id to be materialized only once: the UDF generates a fresh
  // UUID per row on each physical execution, so identical ids across collects
  // prove the rows came from the cache rather than being recomputed.
  def id: () => String = () => { UUID.randomUUID().toString() }

  val dfWithId = df.withColumn("id", callUDF(id, StringType))
  // Make a new DataFrame (actually the same reference to the old one)
  val cached = dfWithId.cache()
  // Trigger the cache
  val d0 = dfWithId.collect()
  val d1 = cached.collect()
  val d2 = cached.collect()

  // Since the ID is only materialized once, then all of the records
  // should come from the cache, not by re-computing. Otherwise, the ID
  // will be different
  assert(d0.map(_(0)) === d2.map(_(0)))
  assert(d0.map(_(1)) === d2.map(_(1)))

  assert(d1.map(_(0)) === d2.map(_(0)))
  assert(d1.map(_(1)) === d2.map(_(1)))
}
151+
126152
test("grouping on nested fields") {
127153
sqlContext.read.json(sqlContext.sparkContext.parallelize(
128154
"""{"nested": {"attribute": 1}, "value": 2}""" :: Nil))

0 commit comments

Comments
 (0)