From e187aeb67a13493c6f5a9e540779a677d3502b04 Mon Sep 17 00:00:00 2001 From: vinodkc Date: Sat, 29 Jul 2017 20:40:24 +0530 Subject: [PATCH 1/2] Fixed unpersisting related DFs --- .../spark/sql/execution/CacheManager.scala | 9 ++++++--- .../org/apache/spark/sql/DatasetCacheSuite.scala | 16 ++++++++++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index 0ea806d6cb50b..dc5eb8de098de 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -118,11 +118,14 @@ class CacheManager extends Logging { */ def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock { val it = cachedData.iterator() + val cachedList = cachedData.asScala while (it.hasNext) { val cd = it.next() - if (cd.plan.find(_.sameResult(plan)).isDefined) { - cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking) - it.remove() + cachedList.find(cData => plan.sameResult(cData.plan)) match { + case Some(_) => + cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking) + it.remove() + case _ => } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala index e0561ee2797a5..4667d798d4f7b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala @@ -80,6 +80,22 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext { assert(ds2.storageLevel == StorageLevel.NONE, "The Dataset ds2 should not be cached.") } + test("SPARK-21478: persist parent and child Dataset and unpersist parent Dataset") { + val ds1 = Seq(1).toDF() + ds1.persist() + ds1.count() + assert(ds1.storageLevel.useMemory) + + val ds2 = ds1.select($"value" * 2) + ds2.persist() + ds2.count() + assert(ds2.storageLevel.useMemory) + + ds1.unpersist() + assert(ds1.storageLevel == StorageLevel.NONE, "The Dataset ds1 should not be cached.") + assert(ds2.storageLevel.useMemory, "The Dataset ds2 should be cached.") + } + test("persist and then groupBy columns asKey, map") { val ds = Seq(("a", 10), ("a", 20), ("b", 1), ("b", 2), ("c", 1)).toDS() val grouped = ds.groupByKey(_._1) From 857b3dd5804355331d3bef4eb8136e6604232758 Mon Sep 17 00:00:00 2001 From: vinodkc Date: Sat, 29 Jul 2017 23:31:01 +0530 Subject: [PATCH 2/2] Updated test cases and condition for unpersist --- .../org/apache/spark/sql/execution/CacheManager.scala | 11 ++++------- .../org/apache/spark/sql/DatasetCacheSuite.scala | 8 +++++++- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala index dc5eb8de098de..700c7986dea99 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala @@ -114,18 +114,15 @@ class CacheManager extends Logging { } /** - * Un-cache all the cache entries that refer to the given plan. + * Un-cache the cache entry that refers to the given plan. */ def uncacheQuery(spark: SparkSession, plan: LogicalPlan, blocking: Boolean): Unit = writeLock { val it = cachedData.iterator() - val cachedList = cachedData.asScala while (it.hasNext) { val cd = it.next() - cachedList.find(cData => plan.sameResult(cData.plan)) match { - case Some(_) => - cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking) - it.remove() - case _ => + if (plan.sameResult(cd.plan)) { + cd.cachedRepresentation.cachedColumnBuffers.unpersist(blocking) + it.remove() } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala index 4667d798d4f7b..40f07e37fe0d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetCacheSuite.scala @@ -43,6 +43,10 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext { // joined Dataset should not be persisted val joined = ds1.joinWith(ds2, $"a.value" === $"b.value") assert(joined.storageLevel == StorageLevel.NONE) + // cleanup + ds1.unpersist() + assert(ds1.storageLevel == StorageLevel.NONE, "The Dataset ds1 should not be cached.") + } test("persist and unpersist") { @@ -58,7 +62,7 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext { 2, 3, 4) // Drop the cache. cached.unpersist() - assert(cached.storageLevel == StorageLevel.NONE, "The Dataset should not be cached.") + assert(cached.storageLevel == StorageLevel.NONE, "The Dataset cached should not be cached.") } test("persist and then rebind right encoder when join 2 datasets") { @@ -94,6 +98,8 @@ class DatasetCacheSuite extends QueryTest with SharedSQLContext { ds1.unpersist() assert(ds1.storageLevel == StorageLevel.NONE, "The Dataset ds1 should not be cached.") assert(ds2.storageLevel.useMemory, "The Dataset ds2 should be cached.") + ds2.unpersist() + assert(ds2.storageLevel == StorageLevel.NONE, "The Dataset ds2 should not be cached.") } test("persist and then groupBy columns asKey, map") {