From 048570f7e5f421288b7c297e4d2e3873626a6adc Mon Sep 17 00:00:00 2001 From: "Michael (Stu) Stewart" Date: Sun, 11 Mar 2018 13:38:29 -0700 Subject: [PATCH 1/5] [SPARK-23645][PYTHON] Allow python udfs to be called with keyword arguments --- python/pyspark/sql/udf.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 24dd06c26089c..4ea78fd97e4d5 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -20,6 +20,7 @@ import sys import inspect import functools +import itertools import sys from pyspark import SparkContext, since @@ -165,7 +166,20 @@ def _create_judf(self): self._name, wrapped_func, jdt, self.evalType, self.deterministic) return judf - def __call__(self, *cols): + def __call__(self, *cols, **kwcols): + # Handle keyword arguments + required = _get_argspec(self.func).args + if len(cols) < len(required): + # Extract remaining required arguments (from kwcols) in proper order + # Ensure no duplicate or unused arguments were passed + cols = tuple(itertools.chain.from_iterable( + [cols, (kwcols.pop(c) for c in required[len(cols):])])) + kwargs_remaining = list(kwcols.keys()) + if kwargs_remaining: + raise TypeError(self._name + "() " + + "got unexpected (or duplicated) keyword arguments: " + + str(kwargs_remaining)) + judf = self._judf sc = SparkContext._active_spark_context return Column(judf.apply(_to_seq(sc, cols, _to_java_column))) @@ -187,8 +201,8 @@ def _wrapped(self): a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__') @functools.wraps(self.func, assigned=assignments) - def wrapper(*args): - return self(*args) + def wrapper(*args, **kwargs): + return self(*args, **kwargs) wrapper.__name__ = self._name wrapper.__module__ = (self.func.__module__ if hasattr(self.func, '__module__') From 9ea2595f0cecb0cd05e0e6b99baf538679332e8b Mon Sep 17 00:00:00 2001 From: "Michael (Stu) Stewart" Date: Sun, 18 Mar 2018 11:04:21 -0700 Subject: [PATCH 2/5] Incomplete / Show issue with partial fn in pandas_udf --- python/pyspark/sql/functions.py | 6 +++++ python/pyspark/sql/udf.py | 47 +++++++++++++++++++++++---------- python/pyspark/worker.py | 2 +- 3 files changed, 40 insertions(+), 15 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index dff590983b4d9..0cb499f9f5680 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2155,6 +2155,9 @@ def udf(f=None, returnType=StringType()): in boolean expressions and it ends up with being executed all internally. If the functions can fail on special rows, the workaround is to incorporate the condition into the functions. + .. note:: The user-defined functions may take keyword arguments e.g. (a=7) in python3, but in + python2 they can not. + :param f: python function if used as a standalone function :param returnType: the return type of the user-defined function. The value can be either a :class:`pyspark.sql.types.DataType` object or a DDL-formatted type string. @@ -2338,6 +2341,9 @@ def pandas_udf(f=None, returnType=None, functionType=None): .. note:: The user-defined functions do not support conditional expressions or short circuiting in boolean expressions and it ends up with being executed all internally. If the functions can fail on special rows, the workaround is to incorporate the condition into the functions. + + .. note:: The user-defined functions may take keyword arguments e.g. (a=7) in python3, but in + python2 they can not. 
""" # decorator @pandas_udf(returnType, functionType) is_decorator = f is None or isinstance(f, (str, DataType)) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 4ea78fd97e4d5..5fbaeb0ef084c 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -52,7 +52,8 @@ def _create_udf(f, returnType, evalType): argspec = _get_argspec(f) if evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF and len(argspec.args) == 0 and \ - argspec.varargs is None: + argspec.varargs is None and not \ + (sys.version_info[0] > 2 and len(argspec.kwonlyargs) > 0): raise ValueError( "Invalid function: 0-arg pandas_udfs are not supported. " "Instead, create a 1-arg pandas_udf and ignore the arg in your function." @@ -167,19 +168,37 @@ def _create_judf(self): return judf def __call__(self, *cols, **kwcols): - # Handle keyword arguments - required = _get_argspec(self.func).args - if len(cols) < len(required): - # Extract remaining required arguments (from kwcols) in proper order - # Ensure no duplicate or unused arguments were passed - cols = tuple(itertools.chain.from_iterable( - [cols, (kwcols.pop(c) for c in required[len(cols):])])) - kwargs_remaining = list(kwcols.keys()) - if kwargs_remaining: - raise TypeError(self._name + "() " - + "got unexpected (or duplicated) keyword arguments: " - + str(kwargs_remaining)) - + # Handle keyword arguments for python3 + if sys.version_info[0] > 2: + spec = _get_argspec(self.func) + required = spec.args + spec.kwonlyargs + defaults = spec.kwonlydefaults or {} + if len(cols) < len(required): + print('qqqqq', '\nrequired', required, '\ndefaults', defaults, '\ncols', cols, '\nkwcols', kwcols) + + def _normalize_args(cols_, kwcols_): + """ + Extract remaining required arguments (from kwcols) in proper order. + Ensure no duplicate or unused arguments were passed. 
+ """ + updated_cols = tuple(itertools.chain.from_iterable( + [cols_, (kwcols_.pop(c) for c in required[len(cols_):] if c not in defaults)])) + kwargs_remaining = list(set(kwcols_.keys()) - set(defaults.keys())) + print('REMAIN', kwargs_remaining) + if kwargs_remaining: + raise TypeError(self._name + "() " + + "got unexpected (or duplicated) keyword arguments: " + + str(kwargs_remaining)) + return updated_cols + + def _merge(d1, d2): + d = d1.copy() + d.update(d2) + print('merged', d) + return d + + cols = _normalize_args(cols, _merge(kwcols, kwcols)) + print('FINALLLL cols', cols) judf = self._judf sc = SparkContext._active_spark_context return Column(judf.apply(_to_seq(sc, cols, _to_java_column))) diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index a1a4336b1e8de..fd05b227177cd 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -79,7 +79,7 @@ def wrap_scalar_pandas_udf(f, return_type): arrow_return_type = to_arrow_type(return_type) def verify_result_length(*a): - result = f(*a) + result = f(*a) # <-- this does not have any notion that f may be a functools.partial that already has some args accounted for, so partial fns blow up if not hasattr(result, "__len__"): raise TypeError("Return type of the user-defined functon should be " "Pandas.Series, but is {}".format(type(result))) From acd1cbe53dc7d1bf83b1022a7e36652cd9530b58 Mon Sep 17 00:00:00 2001 From: "Michael (Stu) Stewart" Date: Sun, 18 Mar 2018 11:13:53 -0700 Subject: [PATCH 3/5] Add note RE no keyword args in python UDFs --- docs/sql-programming-guide.md | 6 +++++ python/pyspark/sql/functions.py | 6 ++--- python/pyspark/sql/udf.py | 41 ++++----------------------------- python/pyspark/worker.py | 2 +- 4 files changed, 13 insertions(+), 42 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 421e2eaf62bfb..5f7cf7d438527 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1799,6 +1799,12 @@ different than a Pandas timestamp. It is recommended to use Pandas time series f working with timestamps in `pandas_udf`s to get the best performance, see [here](https://pandas.pydata.org/pandas-docs/stable/timeseries.html) for details. +### Keyword Arguments and Related Structures + +Currently, for `pandas_udf` it is not possible to pass keyword arguments to a function. The wrapped +function must also not be a `functools.partial` function object. Functions with a zero-length argument +list are unsupported, but can be approximated via a single-argument udf which ignores the passed arg. + # Migration Guide ## Upgrading From Spark SQL 2.3 to 2.4 diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 0cb499f9f5680..da6ce652c2758 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2155,8 +2155,7 @@ def udf(f=None, returnType=StringType()): in boolean expressions and it ends up with being executed all internally. If the functions can fail on special rows, the workaround is to incorporate the condition into the functions. - .. note:: The user-defined functions may take keyword arguments e.g. (a=7) in python3, but in - python2 they can not. + .. note:: The user-defined functions may not take keyword arguments. :param f: python function if used as a standalone function :param returnType: the return type of the user-defined function. 
The value can be either a @@ -2342,8 +2341,7 @@ def pandas_udf(f=None, returnType=None, functionType=None): in boolean expressions and it ends up with being executed all internally. If the functions can fail on special rows, the workaround is to incorporate the condition into the functions. - .. note:: The user-defined functions may take keyword arguments e.g. (a=7) in python3, but in - python2 they can not. + .. note:: The user-defined functions may not take keyword arguments. """ # decorator @pandas_udf(returnType, functionType) is_decorator = f is None or isinstance(f, (str, DataType)) diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py index 5fbaeb0ef084c..24dd06c26089c 100644 --- a/python/pyspark/sql/udf.py +++ b/python/pyspark/sql/udf.py @@ -20,7 +20,6 @@ import sys import inspect import functools -import itertools import sys from pyspark import SparkContext, since @@ -52,8 +51,7 @@ def _create_udf(f, returnType, evalType): argspec = _get_argspec(f) if evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF and len(argspec.args) == 0 and \ - argspec.varargs is None and not \ - (sys.version_info[0] > 2 and len(argspec.kwonlyargs) > 0): + argspec.varargs is None: raise ValueError( "Invalid function: 0-arg pandas_udfs are not supported. " "Instead, create a 1-arg pandas_udf and ignore the arg in your function." @@ -167,38 +165,7 @@ def _create_judf(self): self._name, wrapped_func, jdt, self.evalType, self.deterministic) return judf - def __call__(self, *cols, **kwcols): - # Handle keyword arguments for python3 - if sys.version_info[0] > 2: - spec = _get_argspec(self.func) - required = spec.args + spec.kwonlyargs - defaults = spec.kwonlydefaults or {} - if len(cols) < len(required): - print('qqqqq', '\nrequired', required, '\ndefaults', defaults, '\ncols', cols, '\nkwcols', kwcols) - - def _normalize_args(cols_, kwcols_): - """ - Extract remaining required arguments (from kwcols) in proper order. - Ensure no duplicate or unused arguments were passed. 
- """ - updated_cols = tuple(itertools.chain.from_iterable( - [cols_, (kwcols_.pop(c) for c in required[len(cols_):] if c not in defaults)])) - kwargs_remaining = list(set(kwcols_.keys()) - set(defaults.keys())) - print('REMAIN', kwargs_remaining) - if kwargs_remaining: - raise TypeError(self._name + "() " - + "got unexpected (or duplicated) keyword arguments: " - + str(kwargs_remaining)) - return updated_cols - - def _merge(d1, d2): - d = d1.copy() - d.update(d2) - print('merged', d) - return d - - cols = _normalize_args(cols, _merge(kwcols, kwcols)) - print('FINALLLL cols', cols) + def __call__(self, *cols): judf = self._judf sc = SparkContext._active_spark_context return Column(judf.apply(_to_seq(sc, cols, _to_java_column))) @@ -220,8 +187,8 @@ def _wrapped(self): a for a in functools.WRAPPER_ASSIGNMENTS if a != '__name__' and a != '__module__') @functools.wraps(self.func, assigned=assignments) - def wrapper(*args, **kwargs): - return self(*args, **kwargs) + def wrapper(*args): + return self(*args) wrapper.__name__ = self._name wrapper.__module__ = (self.func.__module__ if hasattr(self.func, '__module__') diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py index fd05b227177cd..a1a4336b1e8de 100644 --- a/python/pyspark/worker.py +++ b/python/pyspark/worker.py @@ -79,7 +79,7 @@ def wrap_scalar_pandas_udf(f, return_type): arrow_return_type = to_arrow_type(return_type) def verify_result_length(*a): - result = f(*a) # <-- this does not have any notion that f may be a functools.partial that already has some args accounted for, so partial fns blow up + result = f(*a) if not hasattr(result, "__len__"): raise TypeError("Return type of the user-defined functon should be " "Pandas.Series, but is {}".format(type(result))) From bc49c3cc5ae2e23da5cc7b6d7e1a779e9d012c8c Mon Sep 17 00:00:00 2001 From: "Michael (Stu) Stewart" Date: Sat, 24 Mar 2018 10:30:15 -0700 Subject: [PATCH 4/5] Address comments --- docs/sql-programming-guide.md | 6 ------ python/pyspark/sql/functions.py | 4 ++-- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index 5f7cf7d438527..421e2eaf62bfb 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -1799,12 +1799,6 @@ different than a Pandas timestamp. It is recommended to use Pandas time series f working with timestamps in `pandas_udf`s to get the best performance, see [here](https://pandas.pydata.org/pandas-docs/stable/timeseries.html) for details. -### Keyword Arguments and Related Structures - -Currently, for `pandas_udf` it is not possible to pass keyword arguments to a function. The wrapped -function must also not be a `functools.partial` function object. Functions with a zero-length argument -list are unsupported, but can be approximated via a single-argument udf which ignores the passed arg. - # Migration Guide ## Upgrading From Spark SQL 2.3 to 2.4 diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index da6ce652c2758..7261eb4974e9d 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2155,7 +2155,7 @@ def udf(f=None, returnType=StringType()): in boolean expressions and it ends up with being executed all internally. If the functions can fail on special rows, the workaround is to incorporate the condition into the functions. - .. note:: The user-defined functions may not take keyword arguments. + .. note:: The user-defined functions do not take keyword arguments. 
:param f: python function if used as a standalone function :param returnType: the return type of the user-defined function. The value can be either a @@ -2341,7 +2341,7 @@ def pandas_udf(f=None, returnType=None, functionType=None): in boolean expressions and it ends up with being executed all internally. If the functions can fail on special rows, the workaround is to incorporate the condition into the functions. - .. note:: The user-defined functions may not take keyword arguments. + .. note:: The user-defined functions do not take keyword arguments. """ # decorator @pandas_udf(returnType, functionType) is_decorator = f is None or isinstance(f, (str, DataType)) From a3da39ca62f69fd4e3a4c417ed28613edd15924f Mon Sep 17 00:00:00 2001 From: "Michael (Stu) Stewart" Date: Sun, 25 Mar 2018 11:32:09 -0700 Subject: [PATCH 5/5] Update wording --- python/pyspark/sql/functions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 7261eb4974e9d..0ac030f03414d 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -2155,7 +2155,7 @@ def udf(f=None, returnType=StringType()): in boolean expressions and it ends up with being executed all internally. If the functions can fail on special rows, the workaround is to incorporate the condition into the functions. - .. note:: The user-defined functions do not take keyword arguments. + .. note:: The user-defined functions do not take keyword arguments on the calling side. :param f: python function if used as a standalone function :param returnType: the return type of the user-defined function. The value can be either a @@ -2341,7 +2341,7 @@ def pandas_udf(f=None, returnType=None, functionType=None): in boolean expressions and it ends up with being executed all internally. If the functions can fail on special rows, the workaround is to incorporate the condition into the functions. - .. note:: The user-defined functions do not take keyword arguments. + .. note:: The user-defined functions do not take keyword arguments on the calling side. """ # decorator @pandas_udf(returnType, functionType) is_decorator = f is None or isinstance(f, (str, DataType))
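
For reference, below is a minimal usage sketch (not part of the patch series) of the behaviour the final docstring note describes: UDFs accept only positional column arguments on the calling side. The SparkSession setup, column names, and the `add` function are illustrative assumptions, not code from the patches.

    # Hypothetical example, assuming a local SparkSession and a simple two-column DataFrame.
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import udf
    from pyspark.sql.types import LongType

    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame([(1, 2), (3, 4)], ["a", "b"])

    @udf(returnType=LongType())
    def add(x, y):
        return x + y

    # Supported: positional column arguments on the calling side.
    df.select(add(df.a, df.b)).show()

    # Not supported: keyword arguments on the calling side raise a TypeError,
    # because UserDefinedFunction.__call__ only accepts positional columns.
    # df.select(add(x=df.a, y=df.b))   # TypeError

The same restriction applies to `pandas_udf`. The earlier patches in this series experimented with mapping keyword arguments onto positions via the wrapped function's argspec, but the final change reverts that and instead documents the positional-only calling convention.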