
Commit a5647d9: hide the queryExecution of DataFrame
1 parent fbfd3c5

File tree: 2 files changed, 11 additions(+), 4 deletions(-)

sql/core/src/main/scala/org/apache/spark/sql/DataFrame.scala

Lines changed: 5 additions & 3 deletions
@@ -115,7 +115,7 @@ private[sql] object DataFrame {
 @Experimental
 class DataFrame private[sql](
     @transient val sqlContext: SQLContext,
-    @DeveloperApi @transient var queryExecution: SQLContext#QueryExecution)
+    @DeveloperApi @transient private var _queryExecution: SQLContext#QueryExecution)
   extends RDDApi[Row] with Serializable {

   /**
@@ -134,6 +134,8 @@ class DataFrame private[sql](
     })
   }

+  @DeveloperApi def queryExecution: SQLContext#QueryExecution = _queryExecution
+
   @transient protected[sql] val logicalPlan: LogicalPlan = queryExecution.logical match {
     // For various commands (like DDL) and queries with side effects, we force query optimization to
     // happen right away to let these side effects take place eagerly.
@@ -1317,7 +1319,7 @@ class DataFrame private[sql](
    */
   override def persist(newLevel: StorageLevel): this.type = {
     sqlContext.cacheManager.cacheQuery(this, None, newLevel)
-    this.queryExecution = new sqlContext.QueryExecution(this.queryExecution.logical)
+    this._queryExecution = new sqlContext.QueryExecution(this.queryExecution.logical)
     this
   }

@@ -1327,7 +1329,7 @@ class DataFrame private[sql](
    */
   override def unpersist(blocking: Boolean): this.type = {
     sqlContext.cacheManager.tryUncacheQuery(this, blocking)
-    this.queryExecution = new sqlContext.QueryExecution(this.queryExecution.logical)
+    this._queryExecution = new sqlContext.QueryExecution(this.queryExecution.logical)
     this
   }
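The change above replaces the public, mutable queryExecution constructor parameter with a private _queryExecution field exposed through a @DeveloperApi read-only accessor, so persist/unpersist can still swap in a fresh QueryExecution internally while callers can no longer re-assign it. Below is a minimal sketch of that pattern using a hypothetical Holder class; the names Holder, state, and refresh are illustrative only, not Spark APIs.

// A minimal sketch (hypothetical class, not Spark code) of the pattern applied above:
// keep the mutable field private, expose only a read-only accessor, and let the
// class re-assign the field internally, as persist/unpersist do with _queryExecution.
class Holder(private var _state: String) {
  // callers get a read-only view
  def state: String = _state

  def refresh(): this.type = {
    _state = "refreshed: " + _state  // internal re-assignment is still allowed
    this
  }
}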

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 6 additions & 1 deletion
@@ -122,15 +122,20 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
     import org.apache.spark.sql.types._

     val df = Seq(Tuple1(1), Tuple1(2), Tuple1(3)).toDF("index")
+    // we expect the id to be materialized only once
     def id: () => String = () => { UUID.randomUUID().toString() }

-    // Expect the ID to have materialized at this point
     val dfWithId = df.withColumn("id", callUDF(id, StringType))
+    // Make a new DataFrame (actually the same reference to the old one)
     val cached = dfWithId.cache()
+    // Trigger the cache
     val d0 = dfWithId.collect()
     val d1 = cached.collect()
     val d2 = cached.collect()

+    // Since the ID is materialized only once, all of the records should
+    // come from the cache rather than being re-computed; otherwise, the
+    // IDs would differ.
     assert(d0.map(_(0)) === d2.map(_(0)))
     assert(d0.map(_(1)) === d2.map(_(1)))
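The test attaches a non-deterministic UUID column, caches the DataFrame, and collects it repeatedly; if the later collects are served from the cache, the UUIDs are identical, whereas re-computation would produce new ones. Below is a hedged, self-contained sketch of the same idea using functions.udf rather than the callUDF overload in the test; the demo method and its argument are illustrative and not part of this change.

import java.util.UUID

import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.udf

// Assumes an SQLContext is already available, e.g. from a test fixture.
def demo(sqlContext: SQLContext): Unit = {
  import sqlContext.implicits._

  // Non-deterministic column: a fresh UUID every time the expression is evaluated.
  val randomId = udf(() => UUID.randomUUID().toString)

  val withId = Seq(Tuple1(1), Tuple1(2), Tuple1(3)).toDF("index").withColumn("id", randomId())
  val cached = withId.cache()

  val first  = cached.collect()
  val second = cached.collect()

  // If the second collect is served from the cache, the random ids are unchanged.
  assert(first.map(_.getString(1)).toSeq == second.map(_.getString(1)).toSeq)
}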
