
Conversation

@JoshRosen
Contributor

What changes were proposed in this pull request?

In order to respond to task cancellation, Spark tasks must periodically check `TaskContext.isInterrupted()`, but this check is missing on a few critical read paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to continue running and become zombies (as also described in #16189).

This patch fixes the problem by adding `TaskContext.isInterrupted()` checks to these paths. Note that I could have used `InterruptibleIterator` to simply wrap a bunch of iterators, but in some cases this would impose a performance penalty or might not be effective due to certain special uses of Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic into the existing iterator subclasses.
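A minimal sketch of that inlined pattern (the class name is hypothetical; `TaskContext`, `TaskContext.get()`, and `TaskKilledException` are real Spark APIs, the rest is illustrative, not the exact diff):

import org.apache.spark.{TaskContext, TaskKilledException}

// Illustrative iterator subclass with the cancellation check inlined into
// hasNext(), rather than added by wrapping in an InterruptibleIterator.
class CancellationAwareIterator[T](underlying: Iterator[T]) extends Iterator[T] {
  // TaskContext.get() is thread-local and returns null on the driver.
  private val taskContext: TaskContext = TaskContext.get()

  override def hasNext: Boolean = {
    // Fail fast once the task has been marked as killed, instead of letting
    // the scan run to completion as a zombie.
    if (taskContext != null && taskContext.isInterrupted()) {
      throw new TaskKilledException
    }
    underlying.hasNext
  }

  override def next(): T = underlying.next()
}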

How was this patch tested?

Tested manually in `spark-shell` with two different reproductions of non-cancellable tasks: one involving scans of huge files and another involving sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by the changes added here.
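The exact reproductions aren't included in the PR; a hypothetical `spark-shell` session along these lines exercises the file-scan case (the path and job-group name are made up):

// Hypothetical repro: run a long scan under a job group in another thread,
// then cancel the group. Without the fix the task can keep scanning as a
// zombie; with it, the task hits the isInterrupted() check and dies promptly.
val scanThread = new Thread(new Runnable {
  override def run(): Unit = {
    // Job groups are thread-local, so set the group in the job's own thread.
    sc.setJobGroup("cancel-test", "zombie task repro", interruptOnCancel = true)
    spark.read.text("/data/very-large-file.txt").count()  // made-up path
  }
})
scanThread.start()
Thread.sleep(5000)                 // let the scan get going
sc.cancelJobGroup("cancel-test")   // tasks should now observe the kill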


- CompletionIterator[InternalRow, Iterator[InternalRow]](rowsIterator, close())
+ CompletionIterator[InternalRow, Iterator[InternalRow]](
+   new InterruptibleIterator(context, rowsIterator), close())
Contributor Author

I suppose I could also have added the check into `resultSetToSparkInternalRows`, but that function is exposed for use outside of Spark internals. Also, I think that `JDBCRDD` is going to be slow enough that the performance impact here shouldn't be noticeable.
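For reference, the wrapper added in this hunk is roughly this thin (a paraphrase of Spark core's `InterruptibleIterator`, not the verbatim source); the per-element cost is one volatile read plus a virtual call, which is noise next to a JDBC fetch:

import org.apache.spark.{TaskContext, TaskKilledException}

// Paraphrased sketch of InterruptibleIterator's behavior.
class InterruptibleIterator[+T](context: TaskContext, delegate: Iterator[T])
    extends Iterator[T] {
  override def hasNext: Boolean = {
    // Checks a volatile kill flag before delegating.
    if (context.isInterrupted()) throw new TaskKilledException
    delegate.hasNext
  }
  override def next(): T = delegate.next()
}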

 private byte[] arr = new byte[1024 * 1024];
 private Object baseObject = arr;
 private final long baseOffset = Platform.BYTE_ARRAY_OFFSET;
+private final TaskContext taskContext = TaskContext.get();
Contributor Author

I didn't want to change the constructor signature, hence this `TaskContext.get()` field-initializer pattern.

// Kill the task in case it has been marked as killed (inlined from
// InterruptibleIterator to avoid wrapping overhead). This check is added here in
// `loadNext()` instead of in `hasNext()` because it's technically possible for the
// caller to be relying on `getNumRecords()` instead of `hasNext()` to know when to stop.
if (taskContext != null && taskContext.isInterrupted()) {
  throw new TaskKilledException();
}
Contributor Author

TaskContext can be null when this code is used on the driver, outside of the context of a specific task.

@JoshRosen
Contributor Author

For a bit of context on the UnsafeSorter changes, note that there are currently five implementations of `UnsafeSorterIterator`, but three of those are just chaining / merging wrappers around the two implementations modified here.
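Schematically (hypothetical names, not the real class hierarchy), the wrappers inherit the behavior for free because their calls bottom out in one of the two patched base iterators:

import org.apache.spark.{TaskContext, TaskKilledException}

abstract class SorterIter { def loadNext(): Unit }

// Stands in for one of the two patched base implementations.
class PatchedBaseIter(taskContext: TaskContext) extends SorterIter {
  override def loadNext(): Unit = {
    if (taskContext != null && taskContext.isInterrupted()) {
      throw new TaskKilledException
    }
    // ... advance to the next record ...
  }
}

// Stands in for a chaining/merging wrapper: it has no check of its own, but
// every loadNext() call reaches a patched base iterator, so a killed task
// still fails fast.
class MergingWrapper(inputs: Seq[SorterIter]) extends SorterIter {
  override def loadNext(): Unit = inputs.head.loadNext()
}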

@hvanhovell
Contributor

LGTM - pending Jenkins.

@SparkQA

SparkQA commented Dec 20, 2016

Test build #70378 has finished for PR 16340 at commit 236efe5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@hvanhovell
Contributor

hvanhovell commented Dec 20, 2016

Merging to master/2.1. Thanks!

asfgit pushed a commit that referenced this pull request Dec 20, 2016
…DD & UnsafeSorter

Author: Josh Rosen <[email protected]>

Closes #16340 from JoshRosen/sql-task-interruption.

(cherry picked from commit 5857b9a)
Signed-off-by: Herman van Hovell <[email protected]>
@asfgit closed this in 5857b9a Dec 20, 2016
@JoshRosen deleted the sql-task-interruption branch December 20, 2016 19:47
JoshRosen added a commit to JoshRosen/spark that referenced this pull request Dec 20, 2016
…DD & UnsafeSorter

Author: Josh Rosen <[email protected]>

Closes apache#16340 from JoshRosen/sql-task-interruption.
asfgit pushed a commit that referenced this pull request Dec 21, 2016
…anRDD, JDBCRDD & UnsafeSorter

This is a branch-2.0 backport of #16340.

Author: Josh Rosen <[email protected]>

Closes #16357 from JoshRosen/sql-task-interruption-branch-2.0.
uzadude pushed a commit to uzadude/spark that referenced this pull request Jan 27, 2017
…DD & UnsafeSorter

Author: Josh Rosen <[email protected]>

Closes apache#16340 from JoshRosen/sql-task-interruption.