[SPARK-14131][SQL]Add a workaround for HADOOP-10622 to fix DataFrameReaderWriterSuite #11940
Hadoop's Shell.runCommand has a potential deadlock before 2.5.0 ([HADOOP-10622](https://issues.apache.org/jira/browse/HADOOP-10622)). Interrupting a thread that is running Shell.runCommand can trigger it. This PR adds protection so that the microBatchThread is not interrupted while it may be inside Shell.runCommand. Two places currently call Shell.runCommand:
- offsetLog.add
- FileStreamSource.getOffset

Both create a file through the HDFS API and call Shell.runCommand to set the file permission.
dev/run-tests
| exec python -u ./dev/run-tests.py "$@" | ||
| set -e | ||
| for i in `seq 1 300`; do |
Run this multiple times to check whether it still hangs in DataFrameReaderWriterSuite.
Will remove them later
This is from #11922 which can reproduce this issue
Test build #54064 has finished for PR 11940 at commit
| new HDFSMetadataLog[CompositeOffset](sqlContext, checkpointFile("offsets")) | ||
| /** A monitor to protect "uninterruptible" and "interrupted" */ | ||
| private val uninterruptibleLock = new Object |
(new Object()) ... but you're not actually using this?
Good catch. I wanted to use this separate lock but forgot
Test build #54063 has finished for PR 11940 at commit
Looks like this patch did fix the issue.
This reverts commit 7680aa0.
Test build #54065 has finished for PR 11940 at commit
This is because of the known flaky ContinuousQueryListenerSuite.
Test build #54081 has finished for PR 11940 at commit
| uninterruptibleLock.synchronized { | ||
| uninterruptible = true | ||
| // Clear the interrupted status if it's set. | ||
| if (Thread.interrupted()) { |
interrupted = Thread.interrupted()? Or are you trying to not unset the interrupted status if it was true?
interrupted = Thread.interrupted() is simpler, but we should not set interrupted to false when Thread.interrupted() returns false.
LGTM; looking at the code after my comment I think the shorter code is safe, but that's a super minor thing anyway. |
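The difference between the two forms discussed in this thread can be seen in a small, self-contained sketch. The object `InterruptFlagDemo`, its method names, and the `pending` parameter are hypothetical and only illustrate the semantics of `Thread.interrupted()`; this is not the code from the patch.

```scala
object InterruptFlagDemo {
  /** Conditional form: only ever raises the flag, never lowers it. */
  def conditional(pending: Boolean): Boolean = {
    var interrupted = pending
    // Thread.interrupted() returns the current thread's interrupt status and clears it.
    if (Thread.interrupted()) {
      interrupted = true
    }
    interrupted
  }

  /** Shorter form: overwrites the flag with whatever Thread.interrupted() returns. */
  def assign(pending: Boolean): Boolean = {
    var interrupted = pending
    interrupted = Thread.interrupted()
    interrupted
  }
}
```

The two forms differ only when the flag is already true on entry and the thread's interrupt status is not set: the conditional form keeps the flag, the assignment resets it. If the flag can never be true at that point, the shorter form behaves the same.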
@vanzin Oh, no...
| val newData = uniqueSources.flatMap(s => s.getOffset.map(o => s -> o)) | ||
| val newData = runUninterruptiblyInMicroBatchThread { | ||
| uniqueSources.flatMap(s => s.getOffset.map(o => s -> o)) | ||
| } |
It is hard for future developers to understand why these calls should be uninterruptible. I think this needs more documentation.
This logic is a little complex. Could you add some unit tests to make sure this is correct?
Is that because there are multiple calls to |
| * Run `f` uninterruptibly in "microBatchThread". "microBatchThread" won't be interrupted before | ||
| * returning from `f`. | ||
| */ | ||
| private def runUninterruptiblyInMicroBatchThread[T](f: => T): T = { |
This name confused me a lot. Why isn't this just runUninterruptibly? ...InMicroBatchThread sounds like it will be called from some other thread, and the function will be offloaded to run in the microBatchThread.
Yes just for safety. Since L157 calls |
I see. Can I suggest that you change the name of the variable to |
Renamed.
I'm going to merge this one after tests pass to unblock other PRs. Will address further comments in another PR.
Test build #54197 has finished for PR 11940 at commit
Test build #54200 has finished for PR 11940 at commit
Test build #54198 has finished for PR 11940 at commit
Test build #54203 has finished for PR 11940 at commit
Merging to master.
## What changes were proposed in this pull request?

Extract the workaround for HADOOP-10622 introduced by apache#11940 into UninterruptibleThread so that we can test and reuse it.

## How was this patch tested?

Unit tests

Author: Shixiong Zhu <[email protected]>

Closes apache#11971 from zsxwing/uninterrupt.
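A minimal sketch of how a thread class with this behavior could look, assuming the design described in this conversation: a lock guarding an "uninterruptible" flag and a pending-interrupt flag. The class name `DeferredInterruptThread` and its field names are hypothetical; this is not the code merged in apache#11971.

```scala
/** Hypothetical sketch: a thread that defers interrupts while a critical block runs. */
class DeferredInterruptThread(name: String) extends Thread(name) {
  // Monitor protecting the two flags below.
  private val lock = new Object
  private var uninterruptible = false
  private var pendingInterrupt = false

  /** Runs `f` on this thread; interrupts arriving meanwhile are delivered after `f` returns. */
  def runUninterruptibly[T](f: => T): T = {
    require(Thread.currentThread() eq this, "must be called from this thread")
    val nested = lock.synchronized {
      val already = uninterruptible
      if (!already) {
        // Clear the interrupted status if it is set, and remember it for later.
        if (Thread.interrupted()) pendingInterrupt = true
        uninterruptible = true
      }
      already
    }
    if (nested) {
      // An outer call already holds the uninterruptible state and will restore it.
      f
    } else {
      try f finally lock.synchronized {
        uninterruptible = false
        if (pendingInterrupt) {
          pendingInterrupt = false
          super.interrupt() // deliver the deferred interrupt
        }
      }
    }
  }

  /** Records the interrupt instead of delivering it while a critical block is running. */
  override def interrupt(): Unit = lock.synchronized {
    if (uninterruptible) pendingInterrupt = true else super.interrupt()
  }
}
```

With a class like this, only the sections that may reach Shell.runCommand (such as the offsetLog.add and getOffset calls named in the description) would be wrapped in `runUninterruptibly`, so the rest of the thread's work stays interruptible. Checking the calling thread up front addresses the naming concern raised in review: the block always runs on the thread itself and is never offloaded from another thread.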
What changes were proposed in this pull request?
There is a potential dead-lock in Hadoop Shell.runCommand before 2.5.0 (HADOOP-10622). If we interrupt some thread running Shell.runCommand, we may hit this issue.
This PR adds protection so that the microBatchThread is not interrupted while it may be inside Shell.runCommand. Two places currently call Shell.runCommand:
- offsetLog.add
- FileStreamSource.getOffset

Both create a file through the HDFS API and call Shell.runCommand to set the file permission.
How was this patch tested?
Existing unit tests.