[SPARK-10714][SPARK-8632][SPARK-10685][SQL] Refactor Python UDF handling #8835
Conversation
Force-pushed from 62fdea2 to 8bfe6c4
cc @davies
Test build #42712 has finished for PR 8835 at commit
Could you try to re-use the code in Python UDF?
@davies I've updated BatchPythonEvaluation too.
I think we should keep the copy() here.
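The reviewer's point about keeping `copy()` most likely relates to Spark's row-object reuse: iterators over internal rows can hand back the same mutable object on every `next()` call, so buffering a row without a defensive copy leaves every buffered entry aliasing the latest row. The sketch below illustrates the failure mode with a hypothetical `MutableRow` class, not Spark's actual `InternalRow` API:

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Queue;

public class CopyDemo {
    // Stand-in for a reused mutable row (illustrative; not Spark's InternalRow).
    static class MutableRow {
        int[] values;
        MutableRow(int[] values) { this.values = values; }
        MutableRow copy() { return new MutableRow(values.clone()); }
    }

    public static void main(String[] args) {
        MutableRow reused = new MutableRow(new int[]{0});
        Queue<MutableRow> buffered = new ArrayDeque<>();

        for (int i = 1; i <= 3; i++) {
            reused.values[0] = i;         // the iterator overwrites the same object
            buffered.add(reused.copy());  // without copy(), every entry would read 3
        }

        for (MutableRow row : buffered) {
            System.out.println(Arrays.toString(row.values));
        }
    }
}
```

Queueing `reused` directly instead of `reused.copy()` would print `[3]` three times, which is why dropping the `copy()` is unsafe whenever rows outlive a single `next()` call.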
good idea
Test build #42719 has finished for PR 8835 at commit
Test build #42724 has finished for PR 8835 at commit
Could we mitigate this by using a LinkedBlockingDeque to have the producer-side block on inserts once the queue grows to a certain size?
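The mitigation the reviewer suggests is standard backpressure: construct the `java.util.concurrent.LinkedBlockingDeque` with a capacity bound so that `put()` blocks the producer once the queue is full, rather than letting it grow without limit. A minimal sketch (capacity and element type are illustrative, not Spark's actual code):

```java
import java.util.concurrent.LinkedBlockingDeque;

public class BoundedQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // Capacity of 2: the third put() blocks until a take() frees a slot.
        LinkedBlockingDeque<byte[]> queue = new LinkedBlockingDeque<>(2);

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) {
                    queue.put(new byte[64]);  // blocks while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();

        int consumed = 0;
        while (consumed < 5) {
            queue.take();  // consuming frees capacity, unblocking the producer
            consumed++;
        }
        producer.join();
        System.out.println("consumed " + consumed + " items");
    }
}
```

With an unbounded queue the producer would race ahead of the consumer and buffer everything in memory; the bounded variant caps peak memory at roughly `capacity * elementSize`.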
Per discussion offline, the only scenario in which the queue can grow really large is when the Python buffer size has been configured to be very large while the UDF result rows are very small. Given that, I think this comment should be expanded and clarified, but that can happen in a follow-up PR.
Based on some offline discussion / debate, we've decided to merge this patch into both master and branch-1.5. I'm going to merge this now.
[SPARK-10714][SPARK-8632][SPARK-10685][SQL] Refactor Python UDF handling

This patch refactors Python UDF handling:

1. Extract the per-partition Python UDF calling logic from PythonRDD into a PythonRunner. PythonRunner itself expects iterator as input/output, and thus has no dependency on RDD. This way, we can use PythonRunner directly in a mapPartitions call, or in the future in an environment without RDDs.
2. Use PythonRunner in Spark SQL's BatchPythonEvaluation.
3. Updated BatchPythonEvaluation to only use its input once, rather than twice. This should fix the Python UDF performance regression in Spark 1.5.

There are a number of small cleanups I wanted to do when I looked at the code, but I kept most of those out so the diff looks small.

This basically implements the approach in #8833, but with some code moving around so the correctness doesn't depend on the inner workings of Spark serialization and task execution.

Author: Reynold Xin <[email protected]>

Closes #8835 from rxin/python-iter-refactor.

(cherry picked from commit a96ba40)
Signed-off-by: Josh Rosen <[email protected]>
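The core idea of the refactor, a runner that maps an input iterator to an output iterator and therefore carries no dependency on RDD, can be sketched as follows. This is a hypothetical illustration of the shape only: the real PythonRunner manages Python worker processes and socket I/O, whereas here the per-element "evaluation" is a placeholder.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class RunnerSketch {
    // Iterator-in / iterator-out: callable from a mapPartitions closure,
    // or from any environment without RDDs.
    static Iterator<Integer> compute(Iterator<Integer> input) {
        List<Integer> out = new ArrayList<>();
        while (input.hasNext()) {
            out.add(input.next() * 2);  // stand-in for invoking the Python UDF
        }
        return out.iterator();
    }

    public static void main(String[] args) {
        Iterator<Integer> result = compute(Arrays.asList(1, 2, 3).iterator());
        while (result.hasNext()) {
            System.out.println(result.next());
        }
    }
}
```

Because the runner only sees iterators, the same code path serves both the RDD case (`rdd.mapPartitions(iter -> compute(iter))`) and Spark SQL's BatchPythonEvaluation, which is exactly the reuse point raised earlier in this conversation.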