@@ -36,6 +36,7 @@ private[spark] object PythonEvalType {
val NON_UDF = 0
val SQL_BATCHED_UDF = 1
val SQL_PANDAS_UDF = 2
val SQL_PANDAS_GROUPED_UDF = 3
}

/**
1 change: 1 addition & 0 deletions python/pyspark/serializers.py
@@ -86,6 +86,7 @@ class PythonEvalType(object):
NON_UDF = 0
SQL_BATCHED_UDF = 1
SQL_PANDAS_UDF = 2
SQL_PANDAS_GROUPED_UDF = 3


class Serializer(object):
173 changes: 102 additions & 71 deletions python/pyspark/sql/functions.py
@@ -28,7 +28,7 @@
from pyspark import since, SparkContext
from pyspark.rdd import _prepare_for_python_RDD, ignore_unicode_prefix
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
from pyspark.sql.types import StringType, DataType, _parse_datatype_string
from pyspark.sql.types import StringType, StructType, DataType, _parse_datatype_string
from pyspark.sql.column import Column, _to_java_column, _to_seq
from pyspark.sql.dataframe import DataFrame

@@ -2038,13 +2038,22 @@ def _wrap_function(sc, func, returnType):
sc.pythonVer, broadcast_vars, sc._javaAccumulator)


class PythonUdfType(object):
@gatorsmile (Member), Oct 16, 2017:
Could you also add the descriptions about these three UDF types?

  • NORMAL_UDF: row-based UDFs
  • PANDAS_UDF: scalar vectorized UDFs
  • PANDAS_GROUPED_UDF: grouped vectorized UDFs

@ueshin (Member Author):

Sure, I'll add the descriptions.

# row-based UDFs
NORMAL_UDF = 0
# scalar vectorized UDFs
PANDAS_UDF = 1
# grouped vectorized UDFs
PANDAS_GROUPED_UDF = 2
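
For reference, a minimal sketch (not part of this diff, assuming this PR's branch since pandas_grouped_udf does not exist elsewhere) of how these three constants line up with the user-facing decorators introduced later in this file:

from pyspark.sql.functions import udf, pandas_udf, pandas_grouped_udf, PythonUdfType
from pyspark.sql.types import LongType, StructType, StructField

plain = udf(lambda x: x + 1, LongType())             # PythonUdfType.NORMAL_UDF
scalar = pandas_udf(lambda s: s + 1, LongType())     # PythonUdfType.PANDAS_UDF
grouped = pandas_grouped_udf(                        # PythonUdfType.PANDAS_GROUPED_UDF
    lambda pdf: pdf, StructType([StructField("v", LongType())]))

# No SparkSession is needed here; the type tag is attached at definition time.
assert plain.pythonUdfType == PythonUdfType.NORMAL_UDF
assert scalar.pythonUdfType == PythonUdfType.PANDAS_UDF
assert grouped.pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF

Tagging the wrapped function with pythonUdfType is what lets GroupedData.apply validate its argument further down in this PR.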


class UserDefinedFunction(object):
"""
User defined function in Python

.. versionadded:: 1.3
"""
def __init__(self, func, returnType, name=None, vectorized=False):
def __init__(self, func, returnType, name=None, pythonUdfType=PythonUdfType.NORMAL_UDF):
if not callable(func):
raise TypeError(
"Not a function or callable (__call__ is not defined): "
@@ -2058,7 +2067,7 @@ def __init__(self, func, returnType, name=None, vectorized=False):
self._name = name or (
func.__name__ if hasattr(func, '__name__')
else func.__class__.__name__)
self.vectorized = vectorized
self.pythonUdfType = pythonUdfType

@property
def returnType(self):
@@ -2090,7 +2099,7 @@ def _create_judf(self):
wrapped_func = _wrap_function(sc, self.func, self.returnType)
jdt = spark._jsparkSession.parseDataType(self.returnType.json())
judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
self._name, wrapped_func, jdt, self.vectorized)
self._name, wrapped_func, jdt, self.pythonUdfType)
return judf

def __call__(self, *cols):
@@ -2121,33 +2130,40 @@ def wrapper(*args):

wrapper.func = self.func
wrapper.returnType = self.returnType
wrapper.vectorized = self.vectorized
wrapper.pythonUdfType = self.pythonUdfType

return wrapper


def _create_udf(f, returnType, vectorized):
def _create_udf(f, returnType, pythonUdfType):

def _udf(f, returnType=StringType(), vectorized=vectorized):
if vectorized:
def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
if pythonUdfType == PythonUdfType.PANDAS_UDF:
import inspect
argspec = inspect.getargspec(f)
if len(argspec.args) == 0 and argspec.varargs is None:
raise ValueError(
"0-arg pandas_udfs are not supported. "
"Instead, create a 1-arg pandas_udf and ignore the arg in your function."
@viirya (Member), Oct 16, 2017:
Maybe also update this error message, like "0-arg pandas_udfs/pandas_grouped_udfs are not supported. ...

@ueshin (Member Author):
Thanks! I'll update the message.

@ueshin (Member Author):
Hmm, for pandas_grouped_udfs, should the number of args be exactly 1?

Member:
I think so. If it doesn't get too complicated, maybe we can also check it for pandas_grouped_udf.

@ueshin (Member Author):
Thanks, let me try.

)
udf_obj = UserDefinedFunction(f, returnType, vectorized=vectorized)
elif pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF:
import inspect
argspec = inspect.getargspec(f)
if len(argspec.args) != 1 and argspec.varargs is None:
raise ValueError("Only 1-arg pandas_grouped_udfs are supported.")

udf_obj = UserDefinedFunction(f, returnType, pythonUdfType=pythonUdfType)
return udf_obj._wrapped()

# decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
# decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf and @pandas_grouped_udf
if f is None or isinstance(f, (str, DataType)):
# If DataType has been passed as a positional argument
# for decorator use it as a returnType
return_type = f or returnType
return functools.partial(_udf, returnType=return_type, vectorized=vectorized)
return functools.partial(
_udf, returnType=return_type, pythonUdfType=pythonUdfType)
else:
return _udf(f=f, returnType=returnType, vectorized=vectorized)
return _udf(f=f, returnType=returnType, pythonUdfType=pythonUdfType)
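
A quick sketch of how the new arity checks behave at definition time (illustrative only, assuming this PR's branch; no SparkSession is needed because the checks run before anything touches the JVM):

from pyspark.sql.functions import pandas_udf, pandas_grouped_udf
from pyspark.sql.types import LongType, StructType, StructField

try:
    pandas_udf(lambda: 1, LongType())            # 0-arg scalar udf is rejected
except ValueError as e:
    print(e)   # "0-arg pandas_udfs are not supported. ..."

ok = pandas_udf(lambda v: v + 1, LongType())     # 1-arg scalar udf is accepted

try:
    pandas_grouped_udf(lambda a, b: a,           # grouped udf must take exactly one arg
                       StructType([StructField("v", LongType())]))
except ValueError as e:
    print(e)   # "Only 1-arg pandas_grouped_udfs are supported."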


@since(1.3)
@@ -2181,7 +2197,7 @@ def udf(f=None, returnType=StringType()):
| 8| JOHN DOE| 22|
+----------+--------------+------------+
"""
return _create_udf(f, returnType=returnType, vectorized=False)
return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.NORMAL_UDF)


@since(2.3)
@@ -2192,67 +2208,82 @@ def pandas_udf(f=None, returnType=StringType()):
:param f: user-defined function. A python function if used as a standalone function
:param returnType: a :class:`pyspark.sql.types.DataType` object

The user-defined function can define one of the following transformations:

1. One or more `pandas.Series` -> A `pandas.Series`

This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
:meth:`pyspark.sql.DataFrame.select`.
The returnType should be a primitive data type, e.g., `DoubleType()`.
The length of the returned `pandas.Series` must be the same as that of the input `pandas.Series`.

>>> from pyspark.sql.types import IntegerType, StringType
>>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
>>> @pandas_udf(returnType=StringType())
... def to_upper(s):
... return s.str.upper()
...
>>> @pandas_udf(returnType="integer")
... def add_one(x):
... return x + 1
...
>>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
>>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
... .show() # doctest: +SKIP
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
| 8| JOHN DOE| 22|
+----------+--------------+------------+

2. A `pandas.DataFrame` -> A `pandas.DataFrame`

This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
The returnType should be a :class:`StructType` describing the schema of the returned
`pandas.DataFrame`.

>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
>>> @pandas_udf(returnType=df.schema)
... def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP
+---+-------------------+
| id| v|
+---+-------------------+
| 1|-0.7071067811865475|
| 1| 0.7071067811865475|
| 2|-0.8320502943378437|
| 2|-0.2773500981126146|
| 2| 1.1094003924504583|
+---+-------------------+

.. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
because it defines a `DataFrame` transformation rather than a `Column`
transformation.

.. seealso:: :meth:`pyspark.sql.GroupedData.apply`
The user-defined function can define the following transformation:

One or more `pandas.Series` -> A `pandas.Series`

This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
:meth:`pyspark.sql.DataFrame.select`.
The returnType should be a primitive data type, e.g., `DoubleType()`.
Member:

What happens if we do not pass a primitive data type? Do we have a test case for this?

@ueshin (Member Author):
It will fail at runtime. I'll add tests.
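
A hypothetical test along those lines, mirroring the style of the vectorized-udf test quoted in the thread below (the test name and the exact failure mode are assumptions; per the reply above, the failure only surfaces once the query runs):

def test_vectorized_udf_unsupported_return_type(self):
    from pyspark.sql.functions import pandas_udf, col
    from pyspark.sql.types import StructType, StructField, LongType
    df = self.spark.range(10)
    # A non-primitive (struct) return type on a scalar pandas_udf is expected to
    # fail, but only when the query is actually executed.
    struct_udf = pandas_udf(lambda x: x, StructType([StructField('v', LongType())]))
    with QuietTest(self.sc):
        with self.assertRaises(Exception):
            df.select(struct_udf(col('id'))).collect()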

The length of the returned `pandas.Series` must be the same as that of the input `pandas.Series`.
Member:
Is this just a fact, or an input requirement?

@ueshin (Member Author):
It's an output requirement.

Member:
Can users break this requirement? If so, what happens?

@ueshin (Member Author), Oct 16, 2017:
Yes, they can and it will fail.

def test_vectorized_udf_invalid_length(self):
from pyspark.sql.functions import pandas_udf, col
import pandas as pd
df = self.spark.range(10)
raise_exception = pandas_udf(lambda _: pd.Series(1), LongType())
with QuietTest(self.sc):
with self.assertRaisesRegexp(
Exception,
'Result vector from pandas_udf was not the required length'):
df.select(raise_exception(col('id'))).collect()

Member:
I see. Thanks!


>>> from pyspark.sql.types import IntegerType, StringType
>>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
>>> @pandas_udf(returnType=StringType())
... def to_upper(s):
... return s.str.upper()
...
>>> @pandas_udf(returnType="integer")
... def add_one(x):
... return x + 1
...
>>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
>>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
... .show() # doctest: +SKIP
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
| 8| JOHN DOE| 22|
+----------+--------------+------------+

.. note:: The user-defined function must be deterministic.
"""
return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_UDF)


@since(2.3)
def pandas_grouped_udf(f=None, returnType=StructType()):
Contributor:

Per discussion here:
#18732 (comment)

Should we consider converting pandas_udf to pandas_grouped_udf implicitly in groupby apply, and not introducing pandas_grouped_udf as a user-facing API?

groupby apply implies the udf is a grouped udf, so there should not be ambiguity here.

@ueshin (Member Author):

I submitted another pr #19517 based on this as a comparison.
I guess it covers what you are thinking.

Contributor:

Thanks @ueshin, yes that's what I am thinking.

@icexelloss (Contributor), Oct 18, 2017:

Here is a summary of the current proposal from some offline discussion:

I. Use only pandas_udf

The main issue with this approach, as a few people have pointed out, is that it is hard to know what the udf does without looking at the implementation.
For instance, for a udf:

@pandas_udf(DoubleType())
def foo(v):
      ...

it's hard to tell whether this function is a reduction that returns a scalar double, or a transform function that returns a pd.Series of doubles.

This is less than ideal because:

  • The user of the udf cannot tell which functions this udf can be used with, i.e., can this be used with groupby().apply(), withColumn, or groupby().agg()?
  • Catalyst cannot do validation at the planning phase, i.e., it cannot throw an exception if the user passes a transformation function rather than an aggregation function to groupby().agg().

II. Use different decorators, i.e., pandas_udf (or pandas_scalar_udf), pandas_grouped_udf, pandas_udaf

The idea of this approach is to use pandas_grouped_udf for all group udfs, and pandas_scalar_udf for scalar pandas udfs that get used with "withColumn". This helps distinguish scalar udfs from group udfs. However, this approach doesn't help to distinguish among group udfs, for instance the group transform and group aggregation examples above.

III. Use the pandas_udf decorator and a function type enum for "one-step" vectorized udfs, and pandas_udaf for multi-step aggregation functions

This approach uses a function type enum to describe what the udf does. Here are the proposed function types:

  • transform
    A pd.Series(s) -> pd.Series transformation that is independent of the grouping. This is the existing scalar pandas udf.
  • group_transform
    A pd.Series(s) -> pd.Series transformation that is dependent on the grouping, e.g.
@pandas_udf(DoubleType(), GROUP_TRANSFORM)
def foo(v):
      return (v - v.mean()) / v.std()
  • group_aggregate
    A pd.Series(s) -> scalar function, e.g.
@pandas_udf(DoubleType(), GROUP_AGGREGATE)
def foo(v):
      return v.mean()
  • group_map (maybe a better name)
    This defines a pd.DataFrame -> pd.DataFrame transformation. This is the current groupby().apply() udf.

These types also work with window functions, because window functions are either (1) group_transform (rank) or (2) group_aggregate (first, last).

I am in favor of (III). What do you guys think?

@gatorsmile (Member):

Post it in another PR, #19517? This discussion thread will be collapsed when Takuya makes a code change.

@ueshin (Member Author):

I guess we should consider merging #19517 first because it improves the behavior by introducing PythonUdfType instead of the hack of detecting the udf type from the return type at the worker, without any user-facing API changes from #18732.
The proposal and discussion should be in this PR but outside of any review thread to avoid being collapsed.

@cloud-fan (Contributor):

Proposal III looks great! One minor question: what's the difference between transform and group_transform? It seems we don't need to care about the distinction in either usage or implementation.

@icexelloss (Contributor):

Sorry for the late reply.

@gatorsmile Sounds good. I will copy the discussion into this PR as @ueshin suggested.

@ueshin +1 to merging #19517. I think it's a good change and will make later changes easier.

@cloud-fan transform defines a transformation that doesn't rely on grouping semantics; for instance, this is a wrong udf definition:

@pandas_udf(DoubleType(), TRANSFORM)
def foo(v):
     return (v - v.mean()) / v.std()

because the transformation is relying on some kind of "grouping semantics"; otherwise v.mean() and v.std() have no meaning for an arbitrary grouping.

"""
Creates a grouped vectorized user defined function (UDF).

:param f: user-defined function. A python function if used as a standalone function
:param returnType: a :class:`pyspark.sql.types.StructType` object

The grouped user-defined function can define the following transformation:

A `pandas.DataFrame` -> A `pandas.DataFrame`

This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
The returnType should be a :class:`StructType` describing the schema of the returned
`pandas.DataFrame`.

>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
>>> @pandas_grouped_udf(returnType=df.schema)
... def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP
+---+-------------------+
| id| v|
+---+-------------------+
| 1|-0.7071067811865475|
| 1| 0.7071067811865475|
| 2|-0.8320502943378437|
| 2|-0.2773500981126146|
| 2| 1.1094003924504583|
+---+-------------------+

.. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
because it defines a `DataFrame` transformation rather than a `Column`
transformation.

.. seealso:: :meth:`pyspark.sql.GroupedData.apply`

.. note:: The user-defined function must be deterministic.
"""
return _create_udf(f, returnType=returnType, vectorized=True)
return _create_udf(f, returnType=returnType, pythonUdfType=PythonUdfType.PANDAS_GROUPED_UDF)


blacklist = ['map', 'since', 'ignore_unicode_prefix']
22 changes: 12 additions & 10 deletions python/pyspark/sql/group.py
@@ -19,6 +19,7 @@
from pyspark.rdd import ignore_unicode_prefix
from pyspark.sql.column import Column, _to_seq, _to_java_column, _create_column_from_literal
from pyspark.sql.dataframe import DataFrame
from pyspark.sql.functions import PythonUdfType
from pyspark.sql.types import *

__all__ = ["GroupedData"]
@@ -206,18 +207,18 @@ def apply(self, udf):
to the user-function and the returned `pandas.DataFrame`s are combined as a
:class:`DataFrame`.
The returned `pandas.DataFrame` can be of arbitrary length and its schema must match the
returnType of the pandas udf.
returnType of the pandas grouped udf.

This function does not support partial aggregation, and requires shuffling all the data in
the :class:`DataFrame`.

:param udf: A function object returned by :meth:`pyspark.sql.functions.pandas_udf`
:param udf: A function object returned by :meth:`pyspark.sql.functions.pandas_grouped_udf`

>>> from pyspark.sql.functions import pandas_udf
>>> from pyspark.sql.functions import pandas_grouped_udf
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
>>> @pandas_udf(returnType=df.schema)
>>> @pandas_grouped_udf(returnType=df.schema)
... def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
@@ -232,16 +233,17 @@ def apply(self, udf):
| 2| 1.1094003924504583|
+---+-------------------+
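
To make the "arbitrary length" point above concrete, an additional example along these lines (a sketch, not part of this diff) would also be valid, since each group may return a different number of rows than it received:

>>> @pandas_grouped_udf(returnType=df.schema)
... def top_one(pdf):
...     # keep only the row with the largest v in each group
...     return pdf.sort_values('v', ascending=False).head(1)
>>> df.groupby('id').apply(top_one).show()  # doctest: +SKIP
+---+----+
| id|   v|
+---+----+
|  1| 2.0|
|  2|10.0|
+---+----+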

.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`
.. seealso:: :meth:`pyspark.sql.functions.pandas_grouped_udf`

"""
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import pandas_grouped_udf

# Columns are special because hasattr always return True
if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized:
raise ValueError("The argument to apply must be a pandas_udf")
if isinstance(udf, Column) or not hasattr(udf, 'func') \
or udf.pythonUdfType != PythonUdfType.PANDAS_GROUPED_UDF:
raise ValueError("The argument to apply must be a pandas_grouped_udf")
if not isinstance(udf.returnType, StructType):
raise ValueError("The returnType of the pandas_udf must be a StructType")
raise ValueError("The returnType of the pandas_grouped_udf must be a StructType")

df = self._df
func = udf.func
@@ -268,7 +270,7 @@ def wrapped(*cols):
return [(result[result.columns[i]], arrow_type)
for i, arrow_type in enumerate(arrow_return_types)]

wrapped_udf_obj = pandas_udf(wrapped, returnType)
wrapped_udf_obj = pandas_grouped_udf(wrapped, returnType)
udf_column = wrapped_udf_obj(*[df[col] for col in df.columns])
jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
return DataFrame(jdf, self.sql_ctx)
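
From the caller's side, the validation above now behaves roughly as follows (a sketch assuming this PR's branch and a `df` as in the docstring example):

>>> from pyspark.sql.functions import pandas_udf, pandas_grouped_udf
>>> from pyspark.sql.types import DoubleType
>>> scalar = pandas_udf(lambda v: v + 1, DoubleType())
>>> df.groupby('id').apply(scalar)  # doctest: +SKIP
Traceback (most recent call last):
    ...
ValueError: The argument to apply must be a pandas_grouped_udf
>>> not_struct = pandas_grouped_udf(lambda pdf: pdf, DoubleType())
>>> df.groupby('id').apply(not_struct)  # doctest: +SKIP
Traceback (most recent call last):
    ...
ValueError: The returnType of the pandas_grouped_udf must be a StructType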