From c4abda4545279dab353e70a4c3c393c95aeafcde Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 4 Jan 2023 13:17:15 +0800 Subject: [PATCH 01/16] init --- python/pyspark/sql/functions.py | 6 +-- python/pyspark/sql/udf.py | 72 ++++++++++++++++++++++++++++++++- 2 files changed, 74 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 60276b2a0b167..ca9537c8be8c8 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -44,7 +44,7 @@ from pyspark.sql.types import ArrayType, DataType, StringType, StructType, _from_numpy_type # Keep UserDefinedFunction import for backwards compatible import; moved in SPARK-22409 -from pyspark.sql.udf import UserDefinedFunction, _create_udf # noqa: F401 +from pyspark.sql.udf import UserDefinedFunction, _create_py_udf # noqa: F401 # Keep pandas_udf and PandasUDFType import for backwards compatible import; moved in SPARK-28264 from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType # noqa: F401 @@ -10093,10 +10093,10 @@ def udf( # for decorator use it as a returnType return_type = f or returnType return functools.partial( - _create_udf, returnType=return_type, evalType=PythonEvalType.SQL_BATCHED_UDF + _create_py_udf, returnType=return_type, evalType=PythonEvalType.SQL_BATCHED_UDF ) else: - return _create_udf(f=f, returnType=returnType, evalType=PythonEvalType.SQL_BATCHED_UDF) + return _create_py_udf(f=f, returnType=returnType, evalType=PythonEvalType.SQL_BATCHED_UDF) def _test() -> None: diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 7c7be392cd384..64e7dc6f8e6ab 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -17,6 +17,8 @@ """ User-defined function related classes and functions """ +from inspect import getfullargspec + import functools import inspect import sys @@ -30,12 +32,16 @@ from pyspark.rdd import _prepare_for_python_RDD, PythonEvalType from pyspark.sql.column import Column, _to_java_column, _to_seq from pyspark.sql.types import ( - StringType, + ArrayType, + BinaryType, DataType, + MapType, + StringType, StructType, _parse_datatype_string, ) from pyspark.sql.pandas.types import to_arrow_type +from pyspark.sql.pandas.utils import require_minimum_pandas_version, require_minimum_pyarrow_version if TYPE_CHECKING: from pyspark.sql._typing import DataTypeOrString, ColumnOrName, UserDefinedFunctionLike @@ -75,6 +81,70 @@ def _create_udf( return udf_obj._wrapped() +def _create_py_udf( + f: Callable[..., Any], + returnType: "DataTypeOrString", + evalType: int, +) -> "UserDefinedFunctionLike": + from pyspark.sql import SparkSession + + session = SparkSession._instantiatedSession + is_arrow_enabled = ( + session is not None + and session.conf.get("spark.sql.execution.pythonUDF.arrow.enabled") == "true" + ) + regular_udf = _create_udf(f, returnType, evalType) + return_type = regular_udf.returnType + try: + is_func_with_args = len(getfullargspec(f).args) > 0 + except TypeError: + is_func_with_args = False + is_output_atomic_type = ( + not isinstance(return_type, StructType) + and not isinstance(return_type, MapType) + and not isinstance(return_type, ArrayType) + ) + if is_arrow_enabled and is_output_atomic_type and is_func_with_args: + require_minimum_pandas_version() + require_minimum_pyarrow_version() + + import pandas as pd + from pyspark.sql.pandas.functions import _create_pandas_udf + + result_func = lambda pdf: pdf + if type(return_type) == StringType: + result_func = lambda r: str(r) if r is not None else r 
+ elif type(return_type) == BinaryType: + result_func = lambda r: bytes(r) if r is not None else r + + def vectorized_udf(*args: pd.Series) -> pd.Series: + if any(map(lambda arg: isinstance(arg, pd.DataFrame), args)): + raise NotImplementedError( + "Struct input type are not supported with Arrow optimization " + "enabled in Python UDFs. Disable " + "'spark.sql.execution.pythonUDF.arrow.enabled' to workaround." + ) + # Always cast via "result_func" because regular UDF supports more permissive casting + # compared to pandas UDFs. This is to don't break the user's codes + # from enabling this edge feature. + return pd.Series(result_func(f(*a)) for a in zip(*args)) + + # Regular UDFs can take callable instances too. + vectorized_udf.__name__ = f.__name__ if hasattr(f, "__name__") else f.__class__.__name__ + vectorized_udf.__module__ = ( + f.__module__ if hasattr(f, "__module__") else f.__class__.__module__ + ) + vectorized_udf.__doc__ = f.__doc__ + pudf = _create_pandas_udf(vectorized_udf, returnType, None) + # Keep the attributes as if this is a regular Python UDF. + pudf.func = f + pudf.returnType = return_type + pudf.evalType = regular_udf.evalType + return pudf + else: + return regular_udf + + class UserDefinedFunction: """ User defined function in Python From b6a262d5462c7786d3f2922eb9dc3ec1047402c7 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 4 Jan 2023 13:18:12 +0800 Subject: [PATCH 02/16] conf --- .../scala/org/apache/spark/sql/internal/SQLConf.scala | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index abde198617d3e..fa69ab0ce381e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -2761,6 +2761,15 @@ object SQLConf { // show full stacktrace in tests but hide in production by default. .createWithDefault(!Utils.isTesting) + val PYTHON_UDF_ARROW_ENABLED = + buildConf("spark.sql.execution.pythonUDF.arrow.enabled") + .doc("Enable Arrow optimization in regular Python UDFs. 
This optimization " + + "can only be enabled for atomic output types and input types except struct and map types " + + "when the given function takes at least one argument.") + .version("3.4.0") + .booleanConf + .createWithDefault(false) + val PANDAS_GROUPED_MAP_ASSIGN_COLUMNS_BY_NAME = buildConf("spark.sql.legacy.execution.pandas.groupedMap.assignColumnsByName") .internal() From 096561f33177687a3d7135a2b3eb3eb2b469b3f8 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 4 Jan 2023 13:21:48 +0800 Subject: [PATCH 03/16] test --- dev/sparktestsupport/modules.py | 1 + .../sql/tests/test_arrow_python_udf.py | 90 +++++++++++++++++++ python/pyspark/sql/tests/test_udf.py | 9 +- 3 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 python/pyspark/sql/tests/test_arrow_python_udf.py diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py index 5df495096b72b..a6b8338bf1626 100644 --- a/dev/sparktestsupport/modules.py +++ b/dev/sparktestsupport/modules.py @@ -467,6 +467,7 @@ def __hash__(self): "pyspark.sql.observation", # unittests "pyspark.sql.tests.test_arrow", + "pyspark.sql.tests.test_arrow_python_udf", "pyspark.sql.tests.test_catalog", "pyspark.sql.tests.test_column", "pyspark.sql.tests.test_conf", diff --git a/python/pyspark/sql/tests/test_arrow_python_udf.py b/python/pyspark/sql/tests/test_arrow_python_udf.py new file mode 100644 index 0000000000000..c2c97f5ac651b --- /dev/null +++ b/python/pyspark/sql/tests/test_arrow_python_udf.py @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import unittest + +from pyspark.sql.functions import udf +from pyspark.sql.tests.test_udf import BaseUDFTests +from pyspark.testing.sqlutils import ( + have_pandas, + have_pyarrow, + pandas_requirement_message, + pyarrow_requirement_message, + ReusedSQLTestCase, +) + + +@unittest.skipIf( + not have_pandas or not have_pyarrow, pandas_requirement_message or pyarrow_requirement_message +) +class PythonUDFArrowTests(BaseUDFTests, ReusedSQLTestCase): + @classmethod + def setUpClass(cls): + super(PythonUDFArrowTests, cls).setUpClass() + cls.spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "true") + + @unittest.skip("Unrelated test, and it fails when it runs duplicatedly.") + def test_broadcast_in_udf(self): + super(PythonUDFArrowTests, self).test_broadcast_in_udf() + + @unittest.skip("Unrelated test, and it fails when it runs duplicatedly.") + def test_register_java_function(self): + super(PythonUDFArrowTests, self).test_register_java_function() + + @unittest.skip("Unrelated test, and it fails when it runs duplicatedly.") + def test_register_java_udaf(self): + super(PythonUDFArrowTests, self).test_register_java_udaf() + + @unittest.skip("Struct input types are not supported with Arrow optimization") + def test_udf_input_serialization_valuecompare_disabled(self): + super(PythonUDFArrowTests, self).test_udf_input_serialization_valuecompare_disabled() + + def test_nested_input_error(self): + with self.assertRaisesRegexp( + Exception, "NotImplementedError: Struct input type are not supported" + ): + self.spark.range(1).selectExpr("struct(1, 2) as struct").select( + udf(lambda x: x)("struct") + ).collect() + + def test_complex_input_types(self): + # TODO: add map type tests back when PyArrow is upgraded. + row = ( + self.spark.range(1) + .selectExpr("array(1, 2, 3) as array", "map('a', 'b') as map") + .select( + udf(lambda x: str(x))("array"), + udf(lambda x: str(x))("map"), + ) + .first() + ) + + # The input is NumPy array when the optimization is on. 
+ self.assertEquals(row[0], "[1 2 3]") + self.assertEquals(row[1], "{'a': 'b'}") + + +if __name__ == "__main__": + from pyspark.sql.tests.test_arrow_python_udf import * # noqa: F401 + + try: + import xmlrunner + + testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py index 03bcbaf6ddff0..5a674c7d5383b 100644 --- a/python/pyspark/sql/tests/test_udf.py +++ b/python/pyspark/sql/tests/test_udf.py @@ -43,7 +43,7 @@ from pyspark.testing.utils import QuietTest -class UDFTests(ReusedSQLTestCase): +class BaseUDFTests(object): def test_udf_with_callable(self): d = [Row(number=i, squared=i**2) for i in range(10)] rdd = self.sc.parallelize(d) @@ -804,6 +804,13 @@ def test_udf_with_rand(self): ) +class UDFTests(BaseUDFTests, ReusedSQLTestCase): + @classmethod + def setUpClass(cls): + super(BaseUDFTests, cls).setUpClass() + cls.spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "false") + + class UDFInitializationTests(unittest.TestCase): def tearDown(self): if SparkSession._instantiatedSession is not None: From 52d68f9796bd266d54fec85a4b0293cee0b4474a Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 4 Jan 2023 14:14:10 +0800 Subject: [PATCH 04/16] useArrow parameter --- python/pyspark/sql/functions.py | 22 +++++++++++++++++++--- python/pyspark/sql/udf.py | 14 ++++++++++---- 2 files changed, 29 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index ca9537c8be8c8..73698afa4e346 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -9980,7 +9980,10 @@ def unwrap_udt(col: "ColumnOrName") -> Column: @overload def udf( - f: Callable[..., Any], returnType: "DataTypeOrString" = StringType() + f: Callable[..., Any], + returnType: "DataTypeOrString" = StringType(), + *, + useArrow: Optional[bool] = None, ) -> "UserDefinedFunctionLike": ... @@ -9988,6 +9991,8 @@ def udf( @overload def udf( f: Optional["DataTypeOrString"] = None, + *, + useArrow: Optional[bool] = None, ) -> Callable[[Callable[..., Any]], "UserDefinedFunctionLike"]: ... @@ -9996,6 +10001,7 @@ def udf( def udf( *, returnType: "DataTypeOrString" = StringType(), + useArrow: Optional[bool] = None, ) -> Callable[[Callable[..., Any]], "UserDefinedFunctionLike"]: ... @@ -10003,6 +10009,8 @@ def udf( def udf( f: Optional[Union[Callable[..., Any], "DataTypeOrString"]] = None, returnType: "DataTypeOrString" = StringType(), + *, + useArrow: Optional[bool] = None, ) -> Union["UserDefinedFunctionLike", Callable[[Callable[..., Any]], "UserDefinedFunctionLike"]]: """Creates a user defined function (UDF). @@ -10015,6 +10023,9 @@ def udf( returnType : :class:`pyspark.sql.types.DataType` or str the return type of the user-defined function. The value can be either a :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string. + useArrow : bool or None + whether to use Arrow to optimize the (de)serialization. When it is None, the + Spark config "spark.sql.execution.pythonUDF.arrow.enabled" takes effect. 
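To make the new parameter concrete, here is a minimal usage sketch (assuming an active SparkSession bound to `spark`, with pandas and PyArrow installed; the expected values mirror the array-input tests added later in this series):

    from pyspark.sql.functions import udf

    df = spark.range(1).selectExpr("array(1, 2, 3) as array")

    as_str_arrow = udf(lambda x: str(x), useArrow=True)    # force the Arrow path
    as_str_plain = udf(lambda x: str(x), useArrow=False)   # force the pickled path

    df.select(as_str_arrow("array")).first()[0]   # '[1 2 3]'   (input arrives as a NumPy array)
    df.select(as_str_plain("array")).first()[0]   # '[1, 2, 3]' (input arrives as a Python list)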
Examples -------- @@ -10093,10 +10104,15 @@ def udf( # for decorator use it as a returnType return_type = f or returnType return functools.partial( - _create_py_udf, returnType=return_type, evalType=PythonEvalType.SQL_BATCHED_UDF + _create_py_udf, + returnType=return_type, + evalType=PythonEvalType.SQL_BATCHED_UDF, + useArrow=useArrow, ) else: - return _create_py_udf(f=f, returnType=returnType, evalType=PythonEvalType.SQL_BATCHED_UDF) + return _create_py_udf( + f=f, returnType=returnType, evalType=PythonEvalType.SQL_BATCHED_UDF, useArrow=useArrow + ) def _test() -> None: diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 64e7dc6f8e6ab..5ecd160585400 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -85,14 +85,20 @@ def _create_py_udf( f: Callable[..., Any], returnType: "DataTypeOrString", evalType: int, + useArrow: Optional[bool] = None, ) -> "UserDefinedFunctionLike": from pyspark.sql import SparkSession session = SparkSession._instantiatedSession - is_arrow_enabled = ( - session is not None - and session.conf.get("spark.sql.execution.pythonUDF.arrow.enabled") == "true" - ) + if session is None: + is_arrow_enabled = False + else: + is_arrow_enabled = ( + session.conf.get("spark.sql.execution.pythonUDF.arrow.enabled") == "true" + if useArrow is None + else useArrow + ) + regular_udf = _create_udf(f, returnType, evalType) return_type = regular_udf.returnType try: From acd28f4ae02ee8591647f82898c48a1e3554d77e Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 4 Jan 2023 14:14:19 +0800 Subject: [PATCH 05/16] test --- .../sql/tests/test_arrow_python_udf.py | 41 +++++++++++++++++++ python/pyspark/sql/tests/test_udf.py | 41 +++++++++++++++++++ 2 files changed, 82 insertions(+) diff --git a/python/pyspark/sql/tests/test_arrow_python_udf.py b/python/pyspark/sql/tests/test_arrow_python_udf.py index c2c97f5ac651b..e457924ae88c3 100644 --- a/python/pyspark/sql/tests/test_arrow_python_udf.py +++ b/python/pyspark/sql/tests/test_arrow_python_udf.py @@ -77,6 +77,47 @@ def test_complex_input_types(self): self.assertEquals(row[0], "[1 2 3]") self.assertEquals(row[1], "{'a': 'b'}") + def test_useArrow(self): + # useArrow=True + row_true = ( + self.spark.range(1) + .selectExpr( + "array(1, 2, 3) as array", + ) + .select( + udf(lambda x: str(x), useArrow=True)("array"), + ) + .first() + ) + + # useArrow=None + row_none = ( + self.spark.range(1) + .selectExpr( + "array(1, 2, 3) as array", + ) + .select( + udf(lambda x: str(x), useArrow=None)("array"), + ) + .first() + ) + + # The input is a NumPy array when the Arrow optimization is on. 
+ self.assertEquals(row_true[0], row_none[0]) # "[1 2 3]" + + # useArrow=False + row_false = ( + self.spark.range(1) + .selectExpr( + "array(1, 2, 3) as array", + ) + .select( + udf(lambda x: str(x), useArrow=False)("array"), + ) + .first() + ) + self.assertEquals(row_false[0], "[1, 2, 3]") + if __name__ == "__main__": from pyspark.sql.tests.test_arrow_python_udf import * # noqa: F401 diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py index 5a674c7d5383b..31e94e3b7f9f7 100644 --- a/python/pyspark/sql/tests/test_udf.py +++ b/python/pyspark/sql/tests/test_udf.py @@ -811,6 +811,47 @@ def setUpClass(cls): cls.spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "false") +def test_useArrow(self): + # useArrow=True + row_true = ( + self.spark.range(1) + .selectExpr( + "array(1, 2, 3) as array", + ) + .select( + udf(lambda x: str(x), useArrow=True)("array"), + ) + .first() + ) + # The input is a NumPy array when the Arrow optimization is on. + self.assertEquals(row_true[0], "[1 2 3]") + + # useArrow=None + row_none = ( + self.spark.range(1) + .selectExpr( + "array(1, 2, 3) as array", + ) + .select( + udf(lambda x: str(x), useArrow=None)("array"), + ) + .first() + ) + + # useArrow=False + row_false = ( + self.spark.range(1) + .selectExpr( + "array(1, 2, 3) as array", + ) + .select( + udf(lambda x: str(x), useArrow=False)("array"), + ) + .first() + ) + self.assertEquals(row_false[0], row_none[0]) # "[1, 2, 3]" + + class UDFInitializationTests(unittest.TestCase): def tearDown(self): if SparkSession._instantiatedSession is not None: From ae71261c11146eb15832b87ccb554dfbc6fb4201 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 4 Jan 2023 20:29:24 +0800 Subject: [PATCH 06/16] doc --- python/pyspark/sql/udf.py | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 5ecd160585400..4df6f93af43e0 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -87,6 +87,29 @@ def _create_py_udf( evalType: int, useArrow: Optional[bool] = None, ) -> "UserDefinedFunctionLike": + # The following table shows the results when users misuse a UDF - when the user-specified return + # type(SQL Type) of the UDF and the actual instance(Python Value(Type)) that the UDF returns are + # different. Some of the behaviors are buggy and might be changed in the near future. 
+ # +-----------------------------+--------------+----------+------+------+----------------+-----------------------------+----------+----------------------+---------+-----------+----------------------------+----------+--------------+ # noqa + # |SQL Type \ Python Value(Type)|None(NoneType)|True(bool)|1(int)|a(str)|1970-01-01(date)|1970-01-01 00:00:00(datetime)|1.0(float)|array('i', [1])(array)|[1](list)|(1,)(tuple)|bytearray(b'ABC')(bytearray)|1(Decimal)|{'a': 1}(dict)| # noqa + # +-----------------------------+--------------+----------+------+------+----------------+-----------------------------+----------+----------------------+---------+-----------+----------------------------+----------+--------------+ # noqa + # | boolean| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa + # | tinyint| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa + # | smallint| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa + # | int| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa + # | bigint| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa + # | string| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa + # | date| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa + # | timestamp| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa + # | float| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa + # | double| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa + # | binary| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa + # | decimal(10,0)| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa + # +-----------------------------+--------------+----------+------+------+----------------+-----------------------------+----------+----------------------+---------+-----------+----------------------------+----------+--------------+ # noqa + # Note: The values of 'SQL Type' are DDL formatted strings, which can be used as `returnType`s. + # Note: The values inside the table are generated by `repr`. X' means it throws an exception + # during the conversion. 
+ from pyspark.sql import SparkSession session = SparkSession._instantiatedSession From bd76e27769128d41b8f32eec182bddee8c3fee86 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Wed, 4 Jan 2023 20:46:51 +0800 Subject: [PATCH 07/16] lint --- python/pyspark/sql/udf.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 4df6f93af43e0..2ee70fe03a352 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -140,11 +140,11 @@ def _create_py_udf( import pandas as pd from pyspark.sql.pandas.functions import _create_pandas_udf - result_func = lambda pdf: pdf + result_func = lambda pdf: pdf # noqa: E731 if type(return_type) == StringType: - result_func = lambda r: str(r) if r is not None else r + result_func = lambda r: str(r) if r is not None else r # noqa: E731 elif type(return_type) == BinaryType: - result_func = lambda r: bytes(r) if r is not None else r + result_func = lambda r: bytes(r) if r is not None else r # noqa: E731 def vectorized_udf(*args: pd.Series) -> pd.Series: if any(map(lambda arg: isinstance(arg, pd.DataFrame), args)): From 73fdd43ba7798a6ecf7dd57dca026547b92e9397 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Thu, 5 Jan 2023 09:21:17 +0800 Subject: [PATCH 08/16] lint --- python/pyspark/sql/udf.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 2ee70fe03a352..e5428c2b48f5f 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -164,7 +164,7 @@ def vectorized_udf(*args: pd.Series) -> pd.Series: f.__module__ if hasattr(f, "__module__") else f.__class__.__module__ ) vectorized_udf.__doc__ = f.__doc__ - pudf = _create_pandas_udf(vectorized_udf, returnType, None) + pudf = _create_pandas_udf(vectorized_udf, returnType, None) # type: ignore[attr-defined] # Keep the attributes as if this is a regular Python UDF. pudf.func = f pudf.returnType = return_type From fd1c05f80fcebe790c7a86516fb24bb96f8c6f50 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Thu, 5 Jan 2023 12:00:26 +0800 Subject: [PATCH 09/16] doc --- python/pyspark/sql/udf.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index e5428c2b48f5f..0d73f144989a2 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -140,6 +140,11 @@ def _create_py_udf( import pandas as pd from pyspark.sql.pandas.functions import _create_pandas_udf + # "result_func" ensures the result of a Python UDF to be consistent with/without Arrow + # optimization. + # Otherwise, an Arrow-optimized Python UDF raises "pyarrow.lib.ArrowTypeError: Expected a + # string or bytes dtype, got ..." whereas a non-Arrow-optimized Python UDF returns + # successfully. result_func = lambda pdf: pdf # noqa: E731 if type(return_type) == StringType: result_func = lambda r: str(r) if r is not None else r # noqa: E731 @@ -153,9 +158,6 @@ def vectorized_udf(*args: pd.Series) -> pd.Series: "enabled in Python UDFs. Disable " "'spark.sql.execution.pythonUDF.arrow.enabled' to workaround." ) - # Always cast via "result_func" because regular UDF supports more permissive casting - # compared to pandas UDFs. This is to don't break the user's codes - # from enabling this edge feature. return pd.Series(result_func(f(*a)) for a in zip(*args)) # Regular UDFs can take callable instances too. 
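The `pyarrow.lib.ArrowTypeError` cited in the new comment is easy to reproduce outside Spark; a rough sketch follows (the exact wording of the error can vary across PyArrow versions):

    import pandas as pd
    import pyarrow as pa

    s = pd.Series([1, 2, 3])

    try:
        # Building a string array straight from int64 values fails:
        # "Expected a string or bytes dtype, got int64"
        pa.Array.from_pandas(s, type=pa.string())
    except pa.ArrowTypeError as e:
        print(e)

    # Pre-casting with str(), as result_func does for StringType, succeeds.
    pa.Array.from_pandas(s.map(str), type=pa.string())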
From 7b396a5f7ba90ae2cdbef3630a9d616052b6e56c Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Thu, 5 Jan 2023 12:20:54 +0800 Subject: [PATCH 10/16] lint --- python/pyspark/sql/udf.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 0d73f144989a2..8f08c1f86bb1b 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -138,7 +138,7 @@ def _create_py_udf( require_minimum_pyarrow_version() import pandas as pd - from pyspark.sql.pandas.functions import _create_pandas_udf + from pyspark.sql.pandas.functions import _create_pandas_udf # type: ignore[attr-defined] # "result_func" ensures the result of a Python UDF to be consistent with/without Arrow # optimization. @@ -166,7 +166,7 @@ def vectorized_udf(*args: pd.Series) -> pd.Series: f.__module__ if hasattr(f, "__module__") else f.__class__.__module__ ) vectorized_udf.__doc__ = f.__doc__ - pudf = _create_pandas_udf(vectorized_udf, returnType, None) # type: ignore[attr-defined] + pudf = _create_pandas_udf(vectorized_udf, returnType, None) # Keep the attributes as if this is a regular Python UDF. pudf.func = f pudf.returnType = return_type From 1f2ff85d09656297062f64bc87f74ebc4c3dabb6 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Thu, 5 Jan 2023 12:46:53 +0800 Subject: [PATCH 11/16] doc --- python/pyspark/sql/udf.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 8f08c1f86bb1b..1c17e1b60a5b4 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -87,9 +87,12 @@ def _create_py_udf( evalType: int, useArrow: Optional[bool] = None, ) -> "UserDefinedFunctionLike": - # The following table shows the results when users misuse a UDF - when the user-specified return - # type(SQL Type) of the UDF and the actual instance(Python Value(Type)) that the UDF returns are - # different. Some of the behaviors are buggy and might be changed in the near future. + # The following table shows the results when the type coercion in Arrow is needed, that is, + # when the user-specified return type(SQL Type) of the UDF and the actual instance(Python + # Value(Type)) that the UDF returns are different. + # Arrow and Pickle have different type coercion rules, so a UDF might have a different result + # with/without Arrow optimization. That's the main reason the Arrow optimization for Python + # UDFs is disabled by default. 
# +-----------------------------+--------------+----------+------+------+----------------+-----------------------------+----------+----------------------+---------+-----------+----------------------------+----------+--------------+ # noqa # |SQL Type \ Python Value(Type)|None(NoneType)|True(bool)|1(int)|a(str)|1970-01-01(date)|1970-01-01 00:00:00(datetime)|1.0(float)|array('i', [1])(array)|[1](list)|(1,)(tuple)|bytearray(b'ABC')(bytearray)|1(Decimal)|{'a': 1}(dict)| # noqa # +-----------------------------+--------------+----------+------+------+----------------+-----------------------------+----------+----------------------+---------+-----------+----------------------------+----------+--------------+ # noqa From 478156919881f8b6af3b22c077ae5d35df270989 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Sun, 8 Jan 2023 18:55:00 +0800 Subject: [PATCH 12/16] doc --- python/pyspark/sql/udf.py | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 1c17e1b60a5b4..79ae456b1f75c 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -93,22 +93,23 @@ def _create_py_udf( # Arrow and Pickle have different type coercion rules, so a UDF might have a different result # with/without Arrow optimization. That's the main reason the Arrow optimization for Python # UDFs is disabled by default. - # +-----------------------------+--------------+----------+------+------+----------------+-----------------------------+----------+----------------------+---------+-----------+----------------------------+----------+--------------+ # noqa - # |SQL Type \ Python Value(Type)|None(NoneType)|True(bool)|1(int)|a(str)|1970-01-01(date)|1970-01-01 00:00:00(datetime)|1.0(float)|array('i', [1])(array)|[1](list)|(1,)(tuple)|bytearray(b'ABC')(bytearray)|1(Decimal)|{'a': 1}(dict)| # noqa - # +-----------------------------+--------------+----------+------+------+----------------+-----------------------------+----------+----------------------+---------+-----------+----------------------------+----------+--------------+ # noqa - # | boolean| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa - # | tinyint| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa - # | smallint| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa - # | int| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa - # | bigint| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa - # | string| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa - # | date| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa - # | timestamp| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa - # | float| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa - # | double| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa - # | binary| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa - # | decimal(10,0)| X| X| X| X| X| X| X| X| X| X| X| X| X| # noqa - # +-----------------------------+--------------+----------+------+------+----------------+-----------------------------+----------+----------------------+---------+-----------+----------------------------+----------+--------------+ # noqa + # +-----------------------------+--------------+----------+------+---------------+--------------------+-----------------------------+----------+----------------------+---------+--------------------+----------------------------+------------+--------------+ # noqa + # |SQL Type \ Python Value(Type)|None(NoneType)|True(bool)|1(int)| a(str)| 1970-01-01(date)|1970-01-01 00:00:00(datetime)|1.0(float)|array('i', [1])(array)|[1](list)| 
(1,)(tuple)|bytearray(b'ABC')(bytearray)| 1(Decimal)|{'a': 1}(dict)| # noqa + # +-----------------------------+--------------+----------+------+---------------+--------------------+-----------------------------+----------+----------------------+---------+--------------------+----------------------------+------------+--------------+ # noqa + # | boolean| None| True| None| None| None| None| None| None| None| None| None| None| None| # noqa + # | tinyint| None| None| 1| None| None| None| None| None| None| None| None| None| None| # noqa + # | smallint| None| None| 1| None| None| None| None| None| None| None| None| None| None| # noqa + # | int| None| None| 1| None| None| None| None| None| None| None| None| None| None| # noqa + # | bigint| None| None| 1| None| None| None| None| None| None| None| None| None| None| # noqa + # | string| None| 'true'| '1'| 'a'|'java.util.Gregor...| 'java.util.Gregor...| '1.0'| '[I@120d813a'| '[1]'|'[Ljava.lang.Obje...| '[B@48571878'| '1'| '{a=1}'| # noqa + # | date| None| X| X| X|datetime.date(197...| datetime.date(197...| X| X| X| X| X| X| X| # noqa + # | timestamp| None| X| X| X| X| datetime.datetime...| X| X| X| X| X| X| X| # noqa + # | float| None| None| None| None| None| None| 1.0| None| None| None| None| None| None| # noqa + # | double| None| None| None| None| None| None| 1.0| None| None| None| None| None| None| # noqa + # | binary| None| None| None|bytearray(b'a')| None| None| None| None| None| None| bytearray(b'ABC')| None| None| # noqa + # | decimal(10,0)| None| None| None| None| None| None| None| None| None| None| None|Decimal('1')| None| # noqa + # +-----------------------------+--------------+----------+------+---------------+--------------------+-----------------------------+----------+----------------------+---------+--------------------+----------------------------+------------+--------------+ # noqa + # Note: Python 3.9.15, Pandas 1.5.2 and PyArrow 10.0.1 are used. # Note: The values of 'SQL Type' are DDL formatted strings, which can be used as `returnType`s. # Note: The values inside the table are generated by `repr`. X' means it throws an exception # during the conversion. 
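Tables like the one above are produced by brute force rather than derivation; one way to regenerate a single cell is sketched below (a hypothetical helper, not part of the patch, assuming a session with the Arrow conf enabled and the versions noted in the table):

    from pyspark.sql.functions import udf

    def coercion_cell(spark, sql_type, value):
        # Declare a UDF with `sql_type` that ignores its input and returns
        # `value`; record repr() of the result, or "X" if evaluation fails.
        try:
            row = spark.range(1).select(udf(lambda _: value, sql_type)("id")).first()
            return repr(row[0])
        except Exception:
            return "X"

    coercion_cell(spark, "string", True)   # "'true'" per the table above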
From 6ec9173f19b0ae4010401728109a073f49b57264 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Sun, 8 Jan 2023 18:56:47 +0800 Subject: [PATCH 13/16] rename --- python/pyspark/sql/tests/test_arrow_python_udf.py | 12 +++++++++++- python/pyspark/sql/tests/test_udf.py | 2 +- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests/test_arrow_python_udf.py b/python/pyspark/sql/tests/test_arrow_python_udf.py index e457924ae88c3..2f9146859c2e4 100644 --- a/python/pyspark/sql/tests/test_arrow_python_udf.py +++ b/python/pyspark/sql/tests/test_arrow_python_udf.py @@ -77,7 +77,7 @@ def test_complex_input_types(self): self.assertEquals(row[0], "[1 2 3]") self.assertEquals(row[1], "{'a': 'b'}") - def test_useArrow(self): + def test_use_arrow(self): # useArrow=True row_true = ( self.spark.range(1) @@ -118,6 +118,16 @@ def test_useArrow(self): ) self.assertEquals(row_false[0], "[1, 2, 3]") + def test_type_coercion(self): + actual_res = True + row_arrow = self.spark.range(1).select(udf(lambda _: actual_res, "string")("id")).first() + row_non_arrow = ( + self.spark.range(1) + .select(udf(lambda _: actual_res, "string", useArrow=False)("id")) + .first() + ) + self.assertEquals(repr(row_arrow[0]), repr(row_non_arrow[0])) + if __name__ == "__main__": from pyspark.sql.tests.test_arrow_python_udf import * # noqa: F401 diff --git a/python/pyspark/sql/tests/test_udf.py b/python/pyspark/sql/tests/test_udf.py index 31e94e3b7f9f7..fb669d158f5f6 100644 --- a/python/pyspark/sql/tests/test_udf.py +++ b/python/pyspark/sql/tests/test_udf.py @@ -811,7 +811,7 @@ def setUpClass(cls): cls.spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "false") -def test_useArrow(self): +def test_use_arrow(self): # useArrow=True row_true = ( self.spark.range(1) From 592a7987cd2b487c9a7d49d1b94ca3ef5e734a78 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Mon, 9 Jan 2023 11:27:28 +0800 Subject: [PATCH 14/16] unused --- python/pyspark/sql/tests/test_arrow_python_udf.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/python/pyspark/sql/tests/test_arrow_python_udf.py b/python/pyspark/sql/tests/test_arrow_python_udf.py index 2f9146859c2e4..904c93e137225 100644 --- a/python/pyspark/sql/tests/test_arrow_python_udf.py +++ b/python/pyspark/sql/tests/test_arrow_python_udf.py @@ -118,16 +118,6 @@ def test_use_arrow(self): ) self.assertEquals(row_false[0], "[1, 2, 3]") - def test_type_coercion(self): - actual_res = True - row_arrow = self.spark.range(1).select(udf(lambda _: actual_res, "string")("id")).first() - row_non_arrow = ( - self.spark.range(1) - .select(udf(lambda _: actual_res, "string", useArrow=False)("id")) - .first() - ) - self.assertEquals(repr(row_arrow[0]), repr(row_non_arrow[0])) - if __name__ == "__main__": from pyspark.sql.tests.test_arrow_python_udf import * # noqa: F401 From 5d2edfb9bd728871827e848c28e64119010b88f6 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Mon, 9 Jan 2023 11:29:10 +0800 Subject: [PATCH 15/16] unused --- python/pyspark/sql/tests/test_arrow_python_udf.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python/pyspark/sql/tests/test_arrow_python_udf.py b/python/pyspark/sql/tests/test_arrow_python_udf.py index 904c93e137225..6b788d79848d6 100644 --- a/python/pyspark/sql/tests/test_arrow_python_udf.py +++ b/python/pyspark/sql/tests/test_arrow_python_udf.py @@ -62,7 +62,6 @@ def test_nested_input_error(self): ).collect() def test_complex_input_types(self): - # TODO: add map type tests back when PyArrow is upgraded. 
row = ( self.spark.range(1) .selectExpr("array(1, 2, 3) as array", "map('a', 'b') as map") From 2a5c54cedffdc5d4c3b514a705242833201ec287 Mon Sep 17 00:00:00 2001 From: Xinrong Meng Date: Mon, 9 Jan 2023 19:23:35 +0800 Subject: [PATCH 16/16] TRIGGER TESTS
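Taken together, the series enables the following end-to-end workflow (a sketch; `spark` is assumed to be an active Spark 3.4+ session with pandas and PyArrow available):

    from pyspark.sql.functions import udf

    spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "true")

    # Atomic output type plus at least one argument, so this UDF is
    # transparently Arrow-optimized while still presenting func, returnType,
    # and evalType as a regular Python UDF.
    @udf("string")
    def fmt(x):
        return str(x)

    df = spark.range(1).selectExpr("array(1, 2, 3) as array")
    df.select(fmt("array")).first()[0]   # '[1 2 3]' (NumPy array input under Arrow)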