[SPARK-18928][branch-2.0] Check TaskContext.isInterrupted() in FileScanRDD, JDBCRDD & UnsafeSorter #16357
Conversation
…DD & UnsafeSorter

In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in apache#16189).

This patch aims to fix this problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators, but in some cases this would have an adverse performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into existing iterator subclasses.

Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks, one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here.

Author: Josh Rosen <[email protected]>

Closes apache#16340 from JoshRosen/sql-task-interruption.
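In other words, the same check that `InterruptibleIterator` performs on every `hasNext()` call is inlined directly into the affected iterator implementations. A minimal sketch of that pattern, with illustrative class and field names rather than the actual patched classes:

```scala
import org.apache.spark.{TaskContext, TaskKilledException}

// Illustrative only: an iterator subclass that inlines the interruption check
// instead of being wrapped in an InterruptibleIterator.
class ExampleRowIterator[T](underlying: Iterator[T]) extends Iterator[T] {
  private val context = TaskContext.get()

  override def hasNext: Boolean = {
    // If the task has been marked as killed, fail fast instead of continuing to read.
    if (context != null && context.isInterrupted()) {
      throw new TaskKilledException
    }
    underlying.hasNext
  }

  override def next(): T = underlying.next()
}
```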
LGTM
// `getNumRecords()` instead of `hasNext()` to know when to stop.
if (taskContext != null && taskContext.isInterrupted()) {
  throw new TaskKilledException();
}
Won't this be in the internal tight loop for reading data?
If so, dereferencing a volatile for each tuple processed is worrying.
We already have this in tight loops in the form of InterruptibleIterator wrapping all over the place.
In an admittedly non-scientific benchmark, I tried running
import org.apache.spark._

sc.parallelize(1 to (1000 * 1000 * 1000), 1).mapPartitions { iter =>
  val tc = TaskContext.get()
  iter.map { x =>
    tc.isInterrupted()
    x + 1
  }
}.count()
a few times with and without the `tc.isInterrupted()` check, and there wasn't a measurable time difference in my environment. While I imagine the volatile read could incur higher costs in certain circumstances, I think the overhead of all the virtual function calls, iterator interfaces, etc. will mask any gains from optimizing this read away.
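For anyone re-running this comparison, a rough sketch of how the two variants could be timed in `spark-shell`; the `time` helper below is a hypothetical convenience (not part of Spark), and this is not a rigorous benchmark:

```scala
import org.apache.spark._

// Hypothetical timing helper; assumes a running spark-shell with `sc` in scope.
def time[A](label: String)(body: => A): A = {
  val start = System.nanoTime()
  val result = body
  println(s"$label: ${(System.nanoTime() - start) / 1e9} s")
  result
}

time("with isInterrupted() check") {
  sc.parallelize(1 to (1000 * 1000 * 1000), 1).mapPartitions { iter =>
    val tc = TaskContext.get()
    iter.map { x => tc.isInterrupted(); x + 1 }
  }.count()
}

time("without check") {
  sc.parallelize(1 to (1000 * 1000 * 1000), 1).mapPartitions { iter =>
    iter.map(_ + 1)
  }.count()
}
```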
As you mentioned, this is an unscientific microbenchmark :-)
Also, the isInterrupted() call was probably optimized away anyway?
Since we don't need guarantees on how soon interruption is honoured, would batching its application (if possible) be better in sorting?
Yes, we do have InterruptibleIterator - unfortunately, we don't have a way to optimize that (afaik).
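One way to batch the check along the lines suggested above would be to read the flag only once every N records. A hypothetical sketch (the class name and the interval of 1024 are arbitrary; this is not what the patch does):

```scala
import org.apache.spark.{TaskContext, TaskKilledException}

// Hypothetical batched variant: read the volatile interrupted flag only once
// every `checkInterval` records instead of on every hasNext() call.
class BatchedInterruptCheckIterator[T](
    context: TaskContext,
    delegate: Iterator[T],
    checkInterval: Int = 1024) extends Iterator[T] {

  private var recordsSinceCheck = 0

  override def hasNext: Boolean = {
    if (recordsSinceCheck >= checkInterval) {
      recordsSinceCheck = 0
      if (context != null && context.isInterrupted()) {
        throw new TaskKilledException
      }
    }
    delegate.hasNext
  }

  override def next(): T = {
    recordsSinceCheck += 1
    delegate.next()
  }
}
```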
// to avoid performance overhead.
if (context.isInterrupted()) {
  throw new TaskKilledException
}
Why not move this to next() instead of hasNext()? The latter is not mandatory to be called - as seen here: #16252.
This particular iterator already won't work unless hasNext() is called, since otherwise nobody will call nextIterator().
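For context, this is the general shape being described: hasNext() is what advances to the next underlying file or stream, so it is also the natural place for the cancellation check. A simplified illustration (the names are illustrative, not the actual FileScanRDD code):

```scala
import org.apache.spark.{TaskContext, TaskKilledException}

// Illustrative only: an iterator whose hasNext() both checks for cancellation
// and is responsible for opening the next underlying source. Callers that skip
// hasNext() and call next() directly would never trigger nextIterator().
abstract class MultiSourceIterator[T](context: TaskContext) extends Iterator[T] {
  protected var currentIterator: Iterator[T] = Iterator.empty

  /** Opens the next underlying source; returns false when all are exhausted. */
  protected def nextIterator(): Boolean

  override def hasNext: Boolean = {
    if (context != null && context.isInterrupted()) {
      throw new TaskKilledException
    }
    currentIterator.hasNext || (nextIterator() && hasNext)
  }

  override def next(): T = currentIterator.next()
}
```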
Yes, it looks like this iterator is already broken, and we are adding to that now.
Test build #70417 has finished for PR 16357 at commit
Merging to branch 2.0.
…anRDD, JDBCRDD & UnsafeSorter

This is a branch-2.0 backport of #16340; the original description follows:

## What changes were proposed in this pull request?

In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in #16189).

This patch aims to fix this problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators, but in some cases this would have an adverse performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into existing iterator subclasses.

## How was this patch tested?

Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks, one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here.

Author: Josh Rosen <[email protected]>

Closes #16357 from JoshRosen/sql-task-interruption-branch-2.0.
@yhuai as I already mentioned a couple of days back, please allow more time for review - not everyone is in the same time zone or has the same working hours. Thanks.
@mridulm ok. I merged this because it is a backport (the original patch has already been merged to 2.1 and master) and I believe Josh has already addressed your concerns. If you want us to hold the merge, it would be good to explicitly mention it next time. Thanks!
@yhuai I had not seen the original PR - otherwise I would have commented there.
This is a branch-2.0 backport of #16340; the original description follows:

What changes were proposed in this pull request?

In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in #16189).

This patch aims to fix this problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators, but in some cases this would have an adverse performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into existing iterator subclasses.

How was this patch tested?

Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks, one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here.
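For readers who want to verify this kind of fix themselves, a rough sketch of a manual test in `spark-shell`; this uses job groups and a placeholder dataset path, and it is not the author's exact reproduction:

```scala
// Background thread that runs a deliberately long job under a named job group.
// The parquet path is a placeholder; point it at any sufficiently large dataset.
val runner = new Thread {
  override def run(): Unit = {
    sc.setJobGroup("cancel-test", "long file scan")
    try {
      spark.read.parquet("/path/to/some/large/table").count()
      println("Job finished without being cancelled")
    } catch {
      case e: Exception => println(s"Job ended early: ${e.getMessage}")
    }
  }
}
runner.start()

Thread.sleep(5000)                 // give the scan time to start
sc.cancelJobGroup("cancel-test")   // with the fix, the tasks should stop promptly
// Check the Spark UI / executor logs: without the fix, the scan tasks keep
// running as zombies until they finish on their own.
```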