@@ -20,6 +20,7 @@ package org.apache.spark.sql.execution
 import java.util.concurrent.locks.ReentrantReadWriteLock
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 
@@ -120,7 +121,7 @@ class CacheManager extends Logging {
   def uncacheQuery(
       query: Dataset[_],
       cascade: Boolean,
-      blocking: Boolean = true): Unit = writeLock {
+      blocking: Boolean = true): Unit = {
     uncacheQuery(query.sparkSession, query.logicalPlan, cascade, blocking)
   }
 
@@ -136,21 +137,27 @@ class CacheManager extends Logging {
       spark: SparkSession,
       plan: LogicalPlan,
       cascade: Boolean,
-      blocking: Boolean): Unit = writeLock {
+      blocking: Boolean): Unit = {
     val shouldRemove: LogicalPlan => Boolean =
       if (cascade) {
         _.find(_.sameResult(plan)).isDefined
       } else {
         _.sameResult(plan)
       }
-    val it = cachedData.iterator()
-    while (it.hasNext) {
-      val cd = it.next()
-      if (shouldRemove(cd.plan)) {
-        cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
-        it.remove()
+    val plansToUncache = mutable.Buffer[CachedData]()
+    writeLock {
+      val it = cachedData.iterator()
+      while (it.hasNext) {
+        val cd = it.next()
+        if (shouldRemove(cd.plan)) {
+          plansToUncache += cd
+          it.remove()
+        }
       }
     }
+    plansToUncache.foreach { cd =>
+      cd.cachedRepresentation.cacheBuilder.clearCache(blocking)
+    }
     // Re-compile dependent cached queries after removing the cached query.
     if (!cascade) {
       recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined, clearCache = false)
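
The hunk above narrows the scope of `writeLock` in `uncacheQuery`: matching entries are collected and unlinked from `cachedData` while holding the lock, and the potentially slow, blocking `clearCache(blocking)` calls run only after the lock is released. A minimal, self-contained sketch of the same pattern (the `Registry` class, `removeAll`, and the `cleanup` callback are illustrative stand-ins, not Spark's API):

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.collection.mutable

// Illustrative sketch only: Registry stands in for CacheManager, and the
// cleanup callback for cacheBuilder.clearCache(blocking).
class Registry[A] {
  private val lock = new ReentrantReadWriteLock
  private val entries = new java.util.LinkedList[A]()

  private def writeLock[T](f: => T): T = {
    val l = lock.writeLock()
    l.lock()
    try f finally l.unlock()
  }

  def add(e: A): Unit = writeLock { entries.add(e) }

  // Collect and unlink matching entries while holding the write lock, then
  // run the slow, possibly blocking cleanup with the lock already released,
  // so concurrent readers and writers are not stalled behind it.
  def removeAll(shouldRemove: A => Boolean)(cleanup: A => Unit): Unit = {
    val removed = mutable.Buffer[A]()
    writeLock {
      val it = entries.iterator()
      while (it.hasNext) {
        val e = it.next()
        if (shouldRemove(e)) {
          removed += e
          it.remove()
        }
      }
    }
    removed.foreach(cleanup) // outside the lock
  }
}
```

The trade-off is that an entry can be observed as already gone from the registry while its storage is still being freed, which is acceptable here because the entry is unreachable by the time cleanup starts.
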
@@ -160,34 +167,44 @@ class CacheManager extends Logging {
   /**
    * Tries to re-cache all the cache entries that refer to the given plan.
    */
-  def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = writeLock {
+  def recacheByPlan(spark: SparkSession, plan: LogicalPlan): Unit = {
     recacheByCondition(spark, _.find(_.sameResult(plan)).isDefined)
   }
 
   private def recacheByCondition(
       spark: SparkSession,
       condition: LogicalPlan => Boolean,
       clearCache: Boolean = true): Unit = {
-    val it = cachedData.iterator()
     val needToRecache = scala.collection.mutable.ArrayBuffer.empty[CachedData]
-    while (it.hasNext) {
-      val cd = it.next()
-      if (condition(cd.plan)) {
-        if (clearCache) {
-          cd.cachedRepresentation.cacheBuilder.clearCache()
+    writeLock {
+      val it = cachedData.iterator()
+      while (it.hasNext) {
+        val cd = it.next()
+        if (condition(cd.plan)) {
+          needToRecache += cd
+          // Remove the cache entry before we create a new one, so that we can have a different
+          // physical plan.
+          it.remove()
+        }
+      }
+    }
+    needToRecache.map { cd =>
+      if (clearCache) {
+        cd.cachedRepresentation.cacheBuilder.clearCache()
+      }
+      val plan = spark.sessionState.executePlan(cd.plan).executedPlan
+      val newCache = InMemoryRelation(
+        cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
+        logicalPlan = cd.plan)
+      val recomputedPlan = cd.copy(cachedRepresentation = newCache)
+      writeLock {
+        if (lookupCachedData(recomputedPlan.plan).nonEmpty) {
+          logWarning("While recaching, data was already added to cache.")
+        } else {
+          cachedData.add(recomputedPlan)
         }
-        // Remove the cache entry before we create a new one, so that we can have a different
-        // physical plan.
-        it.remove()
-        val plan = spark.sessionState.executePlan(cd.plan).executedPlan
-        val newCache = InMemoryRelation(
-          cacheBuilder = cd.cachedRepresentation.cacheBuilder.withCachedPlan(plan),
-          logicalPlan = cd.plan)
-        needToRecache += cd.copy(cachedRepresentation = newCache)
       }
     }
-
-    needToRecache.foreach(cachedData.add)
   }
 
   /** Optionally returns cached data for the given [[Dataset]] */
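
`recacheByCondition` applies the same lock-narrowing idea with one extra step: each drained entry is re-planned and rebuilt outside the lock, and the lock is re-acquired per entry to double-check that a concurrent caller has not already cached an equivalent plan before re-inserting (hence the `logWarning`). A hedged sketch of that drain/rebuild/re-check flow, again with hypothetical names (`RecachingRegistry`, `rebuild`, and `sameEntry` standing in for `plan.sameResult`):

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

import scala.collection.mutable

// Illustrative model of the recache flow above; none of these names are
// Spark's API. `sameEntry` plays the role of plan.sameResult.
class RecachingRegistry[A](sameEntry: (A, A) => Boolean) {
  private val lock = new ReentrantReadWriteLock
  private val entries = mutable.Buffer[A]()

  private def writeLock[T](f: => T): T = {
    val l = lock.writeLock()
    l.lock()
    try f finally l.unlock()
  }

  def recache(condition: A => Boolean)(rebuild: A => A): Unit = {
    // 1. Drain the matching entries while holding the lock, so no caller
    //    can observe a half-removed state.
    val drained = writeLock {
      val hit = entries.filter(condition)
      entries --= hit
      hit
    }
    // 2. Rebuild each entry with the lock released; in CacheManager this is
    //    the expensive part (re-planning and building a new InMemoryRelation).
    drained.foreach { old =>
      val rebuilt = rebuild(old)
      // 3. Re-acquire the lock and double-check before inserting: another
      //    thread may have cached an equivalent entry while we were unlocked.
      writeLock {
        if (entries.exists(sameEntry(_, rebuilt))) {
          println("While recaching, data was already added to cache.")
        } else {
          entries += rebuilt
        }
      }
    }
  }
}
```

The per-entry re-check is what prevents duplicate cache entries when two threads recache the same plan concurrently.
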
@@ -225,7 +242,7 @@ class CacheManager extends Logging {
    * Tries to re-cache all the cache entries that contain `resourcePath` in one or more
    * `HadoopFsRelation` node(s) as part of its logical plan.
    */
-  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = writeLock {
+  def recacheByPath(spark: SparkSession, resourcePath: String): Unit = {
     val (fs, qualifiedPath) = {
       val path = new Path(resourcePath)
       val fs = path.getFileSystem(spark.sessionState.newHadoopConf())