[SPARK-19736][SQL] refreshByPath should clear all cached plans with the specified path #17064
Conversation
Test build #73464 has finished for PR 17064 at commit

Thanks, LGTM.

@kiszk Thanks.

cc @cloud-fan
}
sparkSession.sharedState.cacheManager.cacheQuery(Dataset.ofRows(sparkSession, data.plan))
case _ => // Do Nothing
cachedData.filter {
Why doesn't the previous one work?
This kind of collection can't be modified during iteration. Some elements are not iterated over if we delete/add elements.
But we are still modifying it during iteration, after the filter. Can you be more specific about what the problem is?
Can we use a Java collection so that we can remove elements while iterating?
After filter, we iterate over a different collection than cachedData, so it is not a problem to add/delete elements in cachedData.
The problem can be shown clearly with an example code snippet:
val t = new scala.collection.mutable.ArrayBuffer[Int]
t += 1
t += 2
t.foreach {
  case i if i > 0 =>
    println(s"i = $i")
    val index = t.indexWhere(_ == i)
    if (index >= 0) {
      t.remove(index)
    }
    println(s"t: $t")
    t += (i + 2)
    println(s"t: $t")
}
Output:
i = 1 // The first iteration, we get the first element "1"
t: ArrayBuffer(2) // "1" has been removed from the array
t: ArrayBuffer(2, 3) // New element "3" has been inserted
i = 3 // In next iteration, element "2" is wrongly skipped
t: ArrayBuffer(2) // "3" has been removed from the array
t: ArrayBuffer(2, 5)
The element "2" is never iterated over.
@cloud-fan I noticed you opened #17097, so should I close this?

No, you shouldn't. That's a refactor PR and it accidentally fixed the same bug.
test("refreshByPath should refresh all cached plans with the specified path") {
  def f(path: String, spark: SparkSession, dataCount: Int): DataFrame = {
    spark.catalog.refreshByPath(path)
We can put `spark.range(dataCount).write.mode("overwrite").parquet(path)` at the beginning of this method and name it `testRefreshByPath` instead of `f`.
    val df1 = df.filter("id > 11")
    df1.cache
    assert(df1.count == dataCount - 12)
    df1
I don't get it: so we call refreshByPath before caching the query? Shouldn't we test the opposite order?
The function is called twice, so it is actually meant to refresh the cache on the first call. Since I will change the test to what you suggested in #17064 (comment), we can get rid of this confusion.
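For illustration, a rough sketch of the shape the earlier suggestion implies (the helper was later dropped in favor of the explicit test below); `df` and the exact placement of `refreshByPath` are assumptions, since the full method body is not shown in this excerpt.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical restructured helper; names and counts are illustrative only.
def testRefreshByPath(path: String, spark: SparkSession, dataCount: Int): DataFrame = {
  spark.range(dataCount).write.mode("overwrite").parquet(path)  // moved to the top, as suggested
  spark.catalog.refreshByPath(path)
  val df = spark.read.parquet(path)          // assumed definition of `df`
  val df1 = df.filter("id > 11")
  df1.cache()
  assert(df1.count == dataCount - 12)
  df1
}
```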
  }

  withTempDir { dir =>
    val path = dir.getPath()
We usually call `dir.getCanonicalPath`.
    assert(f(path, spark, 100).count == 88)

    spark.range(1000).write.mode("overwrite").parquet(path)
    assert(f(path, spark, 1000).count == 988)
We can make this test more explicit:
spark.range(10).write.mode("overwrite").parquet(path)
spark.read.parquet(path).cache()
spark.read.parquet(path).filter($"id" > 4).cache()
assert(spark.read.parquet(path).filter($"id" > 4).count() == 5)
spark.range(20).write.mode("overwrite").parquet(path)
spark.catalog.refreshByPath(path)
assert(spark.read.parquet(path).filter($"id" > 4).count() == 15)
Ok. Looks simpler and more explicit.
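Putting the suggestions together, the final test could plausibly look like the sketch below (an assumption assembled from this thread, not the merged code verbatim); it assumes a suite that provides `spark`, `withTempDir`, and the `$`-string column implicits (e.g. via `import spark.implicits._`).

```scala
test("refreshByPath should refresh all cached plans with the specified path") {
  withTempDir { dir =>
    val path = dir.getCanonicalPath
    spark.range(10).write.mode("overwrite").parquet(path)
    // Cache two different plans that read from the same path.
    spark.read.parquet(path).cache()
    spark.read.parquet(path).filter($"id" > 4).cache()
    assert(spark.read.parquet(path).filter($"id" > 4).count() == 5)

    // Overwrite the data and refresh by path: both cached plans must be refreshed.
    spark.range(20).write.mode("overwrite").parquet(path)
    spark.catalog.refreshByPath(path)
    assert(spark.read.parquet(path).filter($"id" > 4).count() == 15)
  }
}
```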
@cloud-fan Thanks. I will address your comments soon.

Test build #73641 has finished for PR 17064 at commit

Thanks, merging to master!

@cloud-fan Thank you!
…L] Backport Three Cache-related PRs to Spark 2.1

### What changes were proposed in this pull request?

Backport a few cache-related PRs:

---

[[SPARK-19093][SQL] Cached tables are not used in SubqueryExpression](#16493)

Consider the plans inside subquery expressions while looking up the cache manager to make use of cached data. Currently CacheManager.useCachedData does not consider the subquery expressions in the plan.

---

[[SPARK-19736][SQL] refreshByPath should clear all cached plans with the specified path](#17064)

Catalog.refreshByPath can refresh the cache entry and the associated metadata for all dataframes (if any) that contain the given data source path. However, CacheManager.invalidateCachedPath doesn't clear all cached plans with the specified path. It causes some strange behaviors reported in SPARK-15678.

---

[[SPARK-19765][SPARK-18549][SQL] UNCACHE TABLE should un-cache all cached plans that refer to this table](#17097)

When un-caching a table, we should not only remove the cache entry for this table, but also un-cache any other cached plans that refer to this table. The following commands trigger the table uncache: `DropTableCommand`, `TruncateTableCommand`, `AlterTableRenameCommand`, `UncacheTableCommand`, `RefreshTable` and `InsertIntoHiveTable`.

This PR also includes some refactors:
- use java.util.LinkedList to store the cache entries, so that it's safer to remove elements while iterating
- rename invalidateCache to recacheByPlan, which is more obvious about what it does

### How was this patch tested?

N/A

Author: Xiao Li <[email protected]>

Closes #17319 from gatorsmile/backport-17097.
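The backport description above mentions switching the cache entries to java.util.LinkedList so that removal during iteration is safe. As a minimal sketch (not the actual CacheManager code), a Java iterator's remove() can delete the current element mid-traversal, which mutating a Scala ArrayBuffer inside a foreach does not support.

```scala
import java.util.LinkedList

val entries = new LinkedList[String]()
entries.add("cached plan A")
entries.add("cached plan B")
entries.add("cached plan C")

val it = entries.iterator()
while (it.hasNext) {
  val e = it.next()
  if (e.endsWith("B")) {
    it.remove()  // structural removal through the iterator itself is safe
  }
}
// entries now contains ["cached plan A", "cached plan C"]
```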
What changes were proposed in this pull request?
Catalog.refreshByPath can refresh the cache entry and the associated metadata for all dataframes (if any) that contain the given data source path. However, CacheManager.invalidateCachedPath doesn't clear all cached plans with the specified path. It causes some strange behaviors reported in SPARK-15678.

How was this patch tested?
Jenkins tests.
Please review http://spark.apache.org/contributing.html before opening a pull request.
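For reference, a minimal sketch of the user-facing behavior described above, assuming `spark` is an active SparkSession and `path` is a parquet directory that cached DataFrames were read from:

```scala
// `spark` is an active SparkSession; `path` points at a parquet directory.
val cached = spark.read.parquet(path).filter("id > 4")
cached.cache()

// After the underlying files change, refreshByPath should invalidate (and
// re-cache) every cached plan that reads from `path`, not just one entry.
spark.catalog.refreshByPath(path)
```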