--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -166,16 +166,34 @@ class CacheManager extends Logging {
     val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
     while (it.hasNext) {
       val cd = it.next()
-      if (condition(cd.plan)) {
-        if (clearCache) {
-          cd.cachedRepresentation.cacheBuilder.clearCache()
-        }
+      // If `clearCache` is false (which means the recache request comes from a non-cascading
+      // cache invalidation) and the cache buffer has already been loaded, we do not need to
+      // re-compile a physical plan, because the old plan will not be used anymore by the
+      // CacheManager, although it still lives in compiled `Dataset`s and could still work.
+      // Otherwise, either `clearCache` is true, in which case we have to clear the cache
+      // buffer and re-compile the physical plan; or it is a non-cascading cache invalidation
+      // and the cache buffer is still empty, in which case we can get a more efficient new
+      // plan by removing the dependency on the previously removed cache entries.
+      // Note that the `CachedRDDBuilder`.`isCachedColumnBuffersLoaded` call is a non-locking
+      // status test and may not return the most accurate cache buffer state. So the
+      // worst-case scenario can be:
+      // 1) The buffer has been loaded, but `isCachedColumnBuffersLoaded` returns false; then
+      //    we will clear the buffer and build a new plan. It is inefficient but doesn't
+      //    affect correctness.
+      // 2) The buffer has been cleared, but `isCachedColumnBuffersLoaded` returns true; then
+      //    we will keep it as it is, which means the physical plan has already been
+      //    re-compiled in the other thread.
+      val buildNewPlan =
+        clearCache || !cd.cachedRepresentation.cacheBuilder.isCachedColumnBuffersLoaded
+      if (condition(cd.plan) && buildNewPlan) {
+        cd.cachedRepresentation.cacheBuilder.clearCache()
         // Remove the cache entry before we create a new one, so that we can have a different
         // physical plan.
         it.remove()
         val plan = spark.sessionState.executePlan(cd.plan).executedPlan
         val newCache = InMemoryRelation(
-          cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
+          cacheBuilder = cd.cachedRepresentation
+            .cacheBuilder.copy(cachedPlan = plan)(_cachedColumnBuffers = null),
           logicalPlan = cd.plan)
         needToRecache += cd.copy(cachedRepresentation = newCache)
       }
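To make the decision logic in the new comment concrete, here is a minimal, self-contained Scala sketch (a hypothetical `RecacheDecisionSketch` object, not Spark code; only the boolean condition mirrors the diff):

```scala
object RecacheDecisionSketch {
  // Mirrors `clearCache || !isCachedColumnBuffersLoaded` from the diff above:
  // re-compile unless this is a non-cascading invalidation AND the buffer is
  // already loaded (in which case the old plan is never handed out again).
  def buildNewPlan(clearCache: Boolean, buffersLoaded: Boolean): Boolean =
    clearCache || !buffersLoaded

  def main(args: Array[String]): Unit = {
    // Cascading invalidation: always clear the buffer and re-compile.
    assert(buildNewPlan(clearCache = true, buffersLoaded = true))
    // Non-cascading invalidation, buffer already loaded (the df1 case in the
    // new test below): keep the existing cache entry and plan.
    assert(!buildNewPlan(clearCache = false, buffersLoaded = true))
    // Non-cascading invalidation, buffer still empty (the df2 case): rebuild
    // so the new plan drops its dependency on the removed entry.
    assert(buildNewPlan(clearCache = false, buffersLoaded = false))
  }
}
```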
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -74,14 +74,8 @@ case class CachedRDDBuilder(
     }
   }
 
-  def withCachedPlan(cachedPlan: SparkPlan): CachedRDDBuilder = {
-    new CachedRDDBuilder(
-      useCompression,
-      batchSize,
-      storageLevel,
-      cachedPlan = cachedPlan,
-      tableName
-    )(_cachedColumnBuffers)
+  def isCachedColumnBuffersLoaded: Boolean = {
+    _cachedColumnBuffers != null
   }
 
   private def buildBuffers(): RDD[CachedBatch] = {
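The new `isCachedColumnBuffersLoaded` is deliberately a non-locking read of `_cachedColumnBuffers`, which keeps the recache path cheap at the cost of occasionally stale answers, exactly as the CacheManager comment describes. A hedged sketch of that pattern in isolation (a hypothetical `LazyBufferHolder`, not Spark's actual `CachedRDDBuilder`):

```scala
// Writes to the buffer reference happen only under the lock; the status test
// reads without locking, so callers must tolerate a momentarily stale answer.
final class LazyBufferHolder[T >: Null <: AnyRef] {
  private var buffer: T = null

  // Build (or reuse) the buffer under the lock, like `cachedColumnBuffers`.
  def getOrBuild(build: () => T): T = synchronized {
    if (buffer == null) buffer = build()
    buffer
  }

  // Drop the buffer under the lock, like `clearCache()`.
  def clear(): Unit = synchronized { buffer = null }

  // Non-locking read, like `isCachedColumnBuffersLoaded`: a concurrent load or
  // clear may make this briefly inaccurate, which only costs efficiency (a
  // needless rebuild) or skips a rebuild another thread already performed.
  def isLoaded: Boolean = buffer != null
}
```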
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala
@@ -190,9 +190,9 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext with TimeLimits
 
     df1.unpersist(blocking = true)
 
-    // df1 un-cached; df2's cache plan re-compiled
+    // df1 un-cached; df2's cache plan stays the same
     assert(df1.storageLevel == StorageLevel.NONE)
-    assertCacheDependency(df1.groupBy('a).agg(sum('b)), 0)
+    assertCacheDependency(df1.groupBy('a).agg(sum('b)))
 
     val df4 = df1.groupBy('a).agg(sum('b)).agg(sum("sum(b)"))
     assertCached(df4)
@@ -206,4 +206,44 @@
     // first time use, load cache
     checkDataset(df5, Row(10))
   }
+
+  test("SPARK-26708 Cache data and cached plan should stay consistent") {
+    val df = spark.range(0, 5).toDF("a")
+    val df1 = df.withColumn("b", 'a + 1)
+    val df2 = df.filter('a > 1)
+
+    df.cache()
+    // Add df1 to the CacheManager; the buffer is currently empty.
+    df1.cache()
+    // After calling collect(), df1's buffer has been loaded.
+    df1.collect()
+    // Add df2 to the CacheManager; the buffer is currently empty.
+    df2.cache()
+
+    // Verify that df1 is an InMemoryRelation plan with a dependency on another cached plan.
+    assertCacheDependency(df1)
+    val df1InnerPlan = df1.queryExecution.withCachedData
+      .asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
+    // Verify that df2 is an InMemoryRelation plan with a dependency on another cached plan.
+    assertCacheDependency(df2)
+
+    df.unpersist(blocking = true)
+
+    // Verify that df1's cache has stayed the same, since df1's cache already had data
+    // before df.unpersist().
+    val df1Limit = df1.limit(2)
+    val df1LimitInnerPlan = df1Limit.queryExecution.withCachedData.collectFirst {
+      case i: InMemoryRelation => i.cacheBuilder.cachedPlan
+    }
+    assert(df1LimitInnerPlan.isDefined && df1LimitInnerPlan.get == df1InnerPlan)
+
+    // Verify that df2's cache has been re-cached with a new physical plan that no longer
+    // depends on df, since df2's cache had not been loaded before df.unpersist().
+    val df2Limit = df2.limit(2)
+    val df2LimitInnerPlan = df2Limit.queryExecution.withCachedData.collectFirst {
+      case i: InMemoryRelation => i.cacheBuilder.cachedPlan
+    }
+    assert(df2LimitInnerPlan.isDefined &&
+      df2LimitInnerPlan.get.find(_.isInstanceOf[InMemoryTableScanExec]).isEmpty)
+  }
 }
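Both the existing and the new tests call an `assertCacheDependency` helper that is defined elsewhere in `DatasetCacheSuite` and does not appear in this diff. A hedged sketch of what such a helper plausibly checks, inferred from its call sites (the exact body is an assumption), meant to live inside the suite class:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.columnar.{InMemoryRelation, InMemoryTableScanExec}

// Hypothetical reconstruction: assert that df's plan-with-cached-data is an
// InMemoryRelation whose cached physical plan scans exactly
// `numOfCachesDependedUpon` other cache entries.
private def assertCacheDependency(df: DataFrame, numOfCachesDependedUpon: Int = 1): Unit = {
  val plan = df.queryExecution.withCachedData
  assert(plan.isInstanceOf[InMemoryRelation])
  val cachedPlan = plan.asInstanceOf[InMemoryRelation].cacheBuilder.cachedPlan
  // Each InMemoryTableScanExec nested in the cached physical plan marks a
  // dependency on another cache entry; `assertCacheDependency(df, 0)` thus
  // asserts the plan was re-compiled without any such dependency.
  val numScans = cachedPlan.collect { case i: InMemoryTableScanExec => i }.size
  assert(numScans == numOfCachesDependedUpon)
}
```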