SPARK-1579: Clean up PythonRDD and avoid swallowing IOExceptions #640
Conversation
This patch includes several cleanups to PythonRDD, focused around
fixing SPARK-1579 cleanly. Listed in order of importance:
- The Python daemon waits for Spark to close the socket before exiting,
in order to avoid causing spurious IOExceptions in Spark's
PythonRDD::WriterThread.
- Removes the Python Monitor Thread, which polled for task cancellations
in order to kill the Python worker. Instead, we do this in the
onCompleteCallback, since this is guaranteed to be called during
cancellation.
- Adds a "completed" variable to TaskContext to avoid the issue noted in
SPARK-1019, where onCompleteCallbacks may be execution-order dependent.
Along with this, I removed the "context.interrupted = true" flag in
the onCompleteCallback.
- Extracts PythonRDD::WriterThread to its own class.
Since this patch provides an alternative solution to SPARK-1019, I did
test it with
```sc.textFile("latlon.tsv").take(5)```
many times without error.
Additionally, in order to test the unswallowed exceptions, I performed
```sc.textFile("s3n://<big file>").count()```
and cut my internet connection during execution. Prior to this patch, we got the
"stdin writer exited early" message, which was unhelpful. Now, the SocketExceptions
are propagated through Spark to the user and we get proper (but unsuccessful) task retries.
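For reference, here is a minimal sketch of the TaskContext change described above (illustrative names only, not the exact diff): a volatile `completed` flag that is set before the onCompleteCallbacks run, so cleanup code can check it regardless of the order in which callbacks fire.

```scala
// Minimal sketch, not the actual org.apache.spark.TaskContext.
object TaskContextSketch {
  final class MiniTaskContext {
    @volatile var interrupted: Boolean = false
    @volatile var completed: Boolean = false

    private val callbacks = scala.collection.mutable.ArrayBuffer.empty[() => Unit]

    def addOnCompleteCallback(f: () => Unit): Unit = { callbacks += f; () }

    // Set `completed` before running the callbacks so that other threads (and the
    // callbacks themselves) can observe completion regardless of callback order.
    def executeOnCompleteCallbacks(): Unit = {
      completed = true
      callbacks.foreach(_())
    }
  }
}
```

In this patch, the callback registered by PythonRDD is what destroys the Python worker on completion or cancellation, which is why the separate Monitor Thread is no longer needed.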
@ahirreddy This logic replaces the monitor thread. Please take a look.
Merged build triggered.
Merged build started.
@pwendell I removed this line and the comment regarding SPARK-1019 because we now check for task completion in the worker thread before throwing an exception. Please let me know if this fix is insufficient (I'm not really sure why we had to interrupt before?).
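Roughly, the check looks like the following sketch (illustrative names, not the actual PythonRDD source): the writer thread swallows a write failure only when the task has already completed, and otherwise lets the exception propagate.

```scala
object WriterThreadSketch {
  final class MiniTaskContext { @volatile var completed: Boolean = false }

  // Sketch only: `write` stands in for streaming the partition to the Python worker.
  def writePartition(context: MiniTaskContext)(write: => Unit): Unit = {
    try write
    catch {
      // The task already finished (e.g. take() stopped early, or it was cancelled),
      // so a broken pipe from the Python worker is expected: swallow it.
      case _: java.io.IOException if context.completed => ()
      // A genuine failure while the task is still running: rethrow it so the user
      // sees the real SocketException instead of "stdin writer exited early".
      case e: Throwable => throw e
    }
  }
}
```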
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14655/
Looks like the Python tests may have timed out; do they work locally?
Oops, this is actually due to a bug in my reimplementation of Python cancellation. onCompleteCallback doesn't get called if the iterator is stuck reading from Python. I can just revert that part of the patch to continue to use a separate thread, or I can add an "onInterruptCallback" to TaskContext. I will look more into it tomorrow morning.
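For anyone curious, the "onInterruptCallback" idea would look roughly like this purely hypothetical sketch (no such hook exists in TaskContext at this point; names are made up):

```scala
object InterruptCallbackSketch {
  final class MiniTaskContext {
    @volatile var interrupted: Boolean = false
    private var onInterrupt: List[() => Unit] = Nil

    def addOnInterruptCallback(f: () => Unit): Unit = { onInterrupt = f :: onInterrupt }

    // Invoked by the executor when the task is cancelled, even if the task's own
    // thread is blocked reading from the Python worker; a callback registered here
    // could destroy the worker process and unblock the reader.
    def markInterrupted(): Unit = {
      interrupted = true
      onInterrupt.foreach(_())
    }
  }
}
```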
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14668/
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14685/
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14698/
Merged build triggered.
Merged build started.
Merged build triggered.
Merged build started.
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14702/
Merged build triggered.
Merged build started.
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14699/
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14700/
Merged build finished.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14703/
python/pyspark/daemon.py
As per our offline discussion, this needs to handle the case where the socket shuts down gracefully:
```
# recv() returning "" means Spark has closed its end of the socket cleanly
if sock.recv(4096) == "":
    return
```
Merged build triggered.
Merged build started.
Merged build finished. All automated tests passed.
All automated tests passed.
nit: Not used, and you probably didn't mean to move the other import
Mm, it was decided (on the dev list) that we'd put Scala language features above all other imports. I guess that hasn't made its way to the code style guide yet...
Removed the Try.
Oh, so the new order is scala > java > library > spark?
(talked offline, but just in case anyone else was curious, the order is scala.language.* > java > scala > library > spark. The scala language features are made special because they're intended to change the way the compiler behaves, although it's more of a future-proofing thing right now.)
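An illustrative example of that ordering (the specific imports are just placeholders, not taken from any particular file):

```scala
import scala.language.existentials        // 1. scala.language.* features first

import java.io.IOException                // 2. java

import scala.collection.mutable.HashMap   // 3. the rest of the scala library

import com.google.common.io.Files         // 4. third-party libraries

import org.apache.spark.TaskContext       // 5. spark itself
```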
Okay I did a quick pass and this looks good. I'm going to pull this in. Thanks @aarondav!
This patch includes several cleanups to PythonRDD, focused around fixing [SPARK-1579](https://issues.apache.org/jira/browse/SPARK-1579) cleanly. Listed in order of approximate importance:
- The Python daemon waits for Spark to close the socket before exiting, in order to avoid causing spurious IOExceptions in Spark's `PythonRDD::WriterThread`.
- Removes the Python Monitor Thread, which polled for task cancellations in order to kill the Python worker. Instead, we do this in the onCompleteCallback, since this is guaranteed to be called during cancellation.
- Adds a "completed" variable to TaskContext to avoid the issue noted in [SPARK-1019](https://issues.apache.org/jira/browse/SPARK-1019), where onCompleteCallbacks may be execution-order dependent. Along with this, I removed the "context.interrupted = true" flag in the onCompleteCallback.
- Extracts PythonRDD::WriterThread to its own class.

Since this patch provides an alternative solution to [SPARK-1019](https://issues.apache.org/jira/browse/SPARK-1019), I did test it with
```
sc.textFile("latlon.tsv").take(5)
```
many times without error. Additionally, in order to test the unswallowed exceptions, I performed
```
sc.textFile("s3n://<big file>").count()
```
and cut my internet during execution. Prior to this patch, we got the "stdin writer exited early" message, which was unhelpful. Now, we get the SocketExceptions propagated through Spark to the user and get proper (though unsuccessful) task retries.

Author: Aaron Davidson <[email protected]>

Closes #640 from aarondav/pyspark-io and squashes the following commits:

b391ff8 [Aaron Davidson] Detect "clean socket shutdowns" and stop waiting on the socket
c0c49da [Aaron Davidson] SPARK-1579: Clean up PythonRDD and avoid swallowing IOExceptions

(cherry picked from commit 3308722)
Signed-off-by: Patrick Wendell <[email protected]>
It makes little sense to start a TaskContext that is interrupted. Indeed, I searched for all use cases of it and didn't find a single instance in which `interrupted` is true on construction. This was inspired by reviewing #640, which adds an additional `@volatile var completed` that is similar. These are not the most urgent changes, but I wanted to push them out before I forget.

Author: Andrew Or <[email protected]>

Closes #675 from andrewor14/task-context and squashes the following commits:

9575e02 [Andrew Or] Add space
69455d1 [Andrew Or] Merge branch 'master' of github.com:apache/spark into task-context
c471490 [Andrew Or] Oops, removed one flag too many. Adding it back.
85311f8 [Andrew Or] Move interrupted flag from TaskContext constructor

(cherry picked from commit c3f8b78)
Signed-off-by: Aaron Davidson <[email protected]>
…n repartition case (apache#640)

## What changes were proposed in this pull request?

Disable using radix sort in ShuffleExchangeExec when we do repartition. In apache#20393, we fixed the indeterministic result in the shuffle repartition case by performing a local sort before repartitioning. But for the newly added sort operation, we use radix sort which is wrong because binary data can't be compared by only the prefix. This makes the sort unstable and fails to solve the indeterminate shuffle output problem.

### Why are the changes needed?

Fix the correctness bug caused by repartition after a shuffle.

### Does this PR introduce any user-facing change?

Yes, user will get the right result in the case of repartition stage rerun.

## How was this patch tested?

Test with `local-cluster[5, 2, 5120]`, use the integrated test below, it can return a right answer 100000000.

```
import scala.sys.process._
import org.apache.spark.TaskContext

val res = spark.range(0, 10000 * 10000, 1).map{ x => (x % 1000, x)}
// kill an executor in the stage that performs repartition(239)
val df = res.repartition(113).map{ x => (x._1 + 1, x._2)}.repartition(239).map { x =>
  if (TaskContext.get.attemptNumber == 0 && TaskContext.get.partitionId < 1 &&
      TaskContext.get.stageAttemptNumber == 0) {
    throw new Exception("pkill -f -n java".!!)
  }
  x
}
val r2 = df.distinct.count()
```

Closes apache#25491 from xuanyuanking/SPARK-28699-fix.

Authored-by: Yuanjian Li <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
Co-authored-by: Li Yuanjian <[email protected]>
…l rows in ColumnarToRowExec (apache#640)

### What changes were proposed in this pull request?

This patch cleans up ColumnVector resources after processing all rows in ColumnarToRowExec. It only covers the codegen implementation of ColumnarToRowExec; the non-codegen path should be relatively rare in practice, and since no good approach has been proposed for it yet, it is left to a follow-up.

### Why are the changes needed?

Currently we only assign null to the ColumnarBatch object, which does not release the resources held by the vectors in the batch. For OnHeapColumnVector, the Java arrays may be collected automatically by the JVM, but for OffHeapColumnVector, the allocated off-heap memory will be leaked. For custom ColumnVector implementations such as Arrow-based ones, it can also cause memory-safety issues when the underlying buffers are reused across batches, because the arrays from the previous batch are still held when ColumnarToRowExec begins to fill values for the next batch.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48767 from viirya/close_if_not_writable.

Authored-by: Liang-Chi Hsieh <[email protected]>
(cherry picked from commit 800faf0)
Signed-off-by: Kent Yao <[email protected]>
Co-authored-by: Liang-Chi Hsieh <[email protected]>
…ssing all rows in ColumnarToRowExec (apache#640)" (apache#649) This reverts commit 1afacc7.