Skip to content

Conversation

@xinrong-meng
Copy link
Member

@xinrong-meng xinrong-meng commented Apr 10, 2023

What changes were proposed in this pull request?

Implement Arrow-optimized Python UDFs in Spark Connect.

Please see #39384 for motivation and performance improvements of Arrow-optimized Python UDFs.

Why are the changes needed?

Parity with vanilla PySpark.

Does this PR introduce any user-facing change?

Yes. In Spark Connect Python Client, users can:

  1. Set useArrow parameter True to enable Arrow optimization for a specific Python UDF.
>>> df = spark.range(2)
>>> df.select(udf(lambda x : x + 1, useArrow=True)('id')).show()
+------------+                                                                  
|<lambda>(id)|
+------------+
|           1|
|           2|
+------------+

# ArrowEvalPython indicates Arrow optimization
>>> df.select(udf(lambda x : x + 1, useArrow=True)('id')).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#18 AS <lambda>(id)#16]
+- ArrowEvalPython [<lambda>(id#14L)#15], [pythonUDF0#18], 200
   +- *(1) Range (0, 2, step=1, splits=1)
  1. Enable spark.sql.execution.pythonUDF.arrow.enabled Spark Conf to make all Python UDFs Arrow-optimized.
>>> spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", True)
>>> df.select(udf(lambda x : x + 1)('id')).show()
+------------+                                                                  
|<lambda>(id)|
+------------+
|           1|
|           2|
+------------+

# ArrowEvalPython indicates Arrow optimization
>>> df.select(udf(lambda x : x + 1)('id')).explain()
== Physical Plan ==
*(2) Project [pythonUDF0#30 AS <lambda>(id)#28]
+- ArrowEvalPython [<lambda>(id#26L)#27], [pythonUDF0#30], 200
   +- *(1) Range (0, 2, step=1, splits=1)

How was this patch tested?

Parity unit tests.

SPARK-40307

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ignoring the type annotations of _create_arrow_py_udf because it is shared between vanilla PySpark and Spark Connect Python Client.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function is only an extraction of original code L142 - L179 for code reuse.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is duplicated code in _create_py_udf between Spark Connect Python Client and vanilla PySpark, except for fetching the active SparkSession.
However, for a clear code path separation and abstraction, I decided not to refactor it for now.

@xinrong-meng
Copy link
Member Author

CI failed because of

Run echo "APACHE_SPARK_REF=$(git rev-parse HEAD)" >> $GITHUB_ENV
fatal: detected dubious ownership in repository at '/__w/spark/spark'
To add an exception for this directory, call:

	git config --global --add safe.directory /__w/spark/spark
fatal: detected dubious ownership in repository at '/__w/spark/spark'
To add an exception for this directory, call:

	git config --global --add safe.directory /__w/spark/spark
Error: Process completed with exit code 128.

@xinrong-meng xinrong-meng force-pushed the connect_arrow_py_udf branch from 95cad25 to f6fc6e1 Compare April 17, 2023 20:56
@xinrong-meng
Copy link
Member Author

@HyukjinKwon @zhengruifeng Would you please take a look? Thank you!

@HyukjinKwon
Copy link
Member

cc @ueshin FYI

import pandas as pd
from pyspark.sql.pandas.functions import _create_pandas_udf

return_type = regular_udf.returnType
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it seems that the regular_udf is only used to pass the returnType and evalType ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And regular_udf.func based on the updated code.

@zhengruifeng zhengruifeng changed the title [SPARK-43082][Connect][PYTHON] Arrow-optimized Python UDFs in Spark Connect [SPARK-43082][CONNECT][PYTHON] Arrow-optimized Python UDFs in Spark Connect Apr 20, 2023
@HyukjinKwon
Copy link
Member

Merged to master.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants