34 commits
176cd15
Initial commit of groupby apply
icexelloss Sep 28, 2017
f109afb
Clean up tests
icexelloss Sep 28, 2017
07bccca
Add support for dtypes as returnType
icexelloss Sep 28, 2017
e7a9b27
Fix pep8 sytle check
icexelloss Sep 28, 2017
83b647e
Address comments. Updated doc string for pandas_udf.
icexelloss Sep 29, 2017
8d98b3e
Replace iter.grouped with BatchIterator
icexelloss Sep 29, 2017
96ce587
[Minor] Fix pep8
icexelloss Sep 29, 2017
213dd1a
Clean up code. Refine doc for pandas_udf() and apply(). Address comme…
icexelloss Oct 3, 2017
d37a9e6
Remove dynamic returnType support
icexelloss Oct 3, 2017
1ea2b71
Fix pep8 style check
icexelloss Oct 3, 2017
4943ceb
Fix ExtractPythonUDFs
icexelloss Oct 4, 2017
21fed0d
Address new PR comments
icexelloss Oct 4, 2017
40d7e8a
Add a test for complex groupby.
ueshin Oct 4, 2017
427a847
Fix complex groupby.
ueshin Oct 4, 2017
0929d4d
Add support for empty groupby.
ueshin Oct 4, 2017
d9a3e8d
Skip grouping if groupingAttributes is empty.
ueshin Oct 4, 2017
ce0d54c
Address some new comments
icexelloss Oct 4, 2017
657942b
Fix minor typo
icexelloss Oct 5, 2017
fa88c88
Add doc for FlatMapGroupsInPandasExec
icexelloss Oct 5, 2017
e4efb32
Fix doctest in group.py
icexelloss Oct 5, 2017
f572385
Fix doctest for group.py
icexelloss Oct 5, 2017
5162ed1
Add comments and standardize exception handling in wrap_pandas_udf
icexelloss Oct 5, 2017
d628f4e
Minor: Fix pep8
icexelloss Oct 5, 2017
20fb1fe
Fix test
icexelloss Oct 6, 2017
284ba00
Minor edit to groupby apply doc
icexelloss Oct 6, 2017
876b118
Improve documentation. FlatMapGroupsInPandas logical node to pythonLo…
icexelloss Oct 9, 2017
b0410a2
Fix use-defined -> user-defined
icexelloss Oct 10, 2017
87edfcc
changed wrapping to be in one place
BryanCutler Oct 6, 2017
4413ed4
changed to pickle spark type instead of arrow types for wrapped func
BryanCutler Oct 9, 2017
a064b21
move import
BryanCutler Oct 9, 2017
a036f70
Address CR comments
icexelloss Oct 10, 2017
b88a4d8
Minor: Fix typo in doc
icexelloss Oct 10, 2017
9c2b10e
Clean up imports in ExtractPythonUDFs.scala
icexelloss Oct 10, 2017
dc1d406
Address comments about docs
icexelloss Oct 10, 2017
6 changes: 3 additions & 3 deletions python/pyspark/sql/dataframe.py
@@ -1227,7 +1227,7 @@ def groupBy(self, *cols):
"""
jgd = self._jdf.groupBy(self._jcols(*cols))
from pyspark.sql.group import GroupedData
return GroupedData(jgd, self.sql_ctx)
return GroupedData(jgd, self)

@since(1.4)
def rollup(self, *cols):
@@ -1248,7 +1248,7 @@ def rollup(self, *cols):
"""
jgd = self._jdf.rollup(self._jcols(*cols))
from pyspark.sql.group import GroupedData
return GroupedData(jgd, self.sql_ctx)
return GroupedData(jgd, self)

@since(1.4)
def cube(self, *cols):
@@ -1271,7 +1271,7 @@ def cube(self, *cols):
"""
jgd = self._jdf.cube(self._jcols(*cols))
from pyspark.sql.group import GroupedData
return GroupedData(jgd, self.sql_ctx)
return GroupedData(jgd, self)

@since(1.3)
def agg(self, *exprs):
98 changes: 72 additions & 26 deletions python/pyspark/sql/functions.py
@@ -2058,7 +2058,7 @@ def __init__(self, func, returnType, name=None, vectorized=False):
self._name = name or (
func.__name__ if hasattr(func, '__name__')
else func.__class__.__name__)
self._vectorized = vectorized
self.vectorized = vectorized
Member:
Oh sorry, I should have been more clear. This should stay self._vectorized since it is a private variable of the class; it's only wrapped.vectorized (which you already changed below) that isn't being used as private and so shouldn't have an underscore.

Contributor Author:
I kind of dislike the inconsistency between UserDefinedFunction and its wrapped function. I think they are the same thing, except that the wrapped function has a doc string. For ease of mind, I think we should make them either both private or both public.

Contributor Author:
Are we OK with vectorized being a public field? I am fine with either public or private, but I do think the fields of the function returned by UserDefinedFunction._wrapped() should have the same field names as UserDefinedFunction, to avoid confusion.
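As an illustrative aside (not part of the diff), a minimal sketch of the attribute mirroring being discussed, using a simplified stand-in for UserDefinedFunction; the field names follow the diff below, everything else is an assumption:

    import functools

    # Simplified stand-in, only to illustrate the field mirroring discussed above.
    class UserDefinedFunction(object):
        def __init__(self, func, returnType, vectorized=False):
            self.func = func
            self.returnType = returnType
            self.vectorized = vectorized  # public, matching the wrapper below

        def _wrapped(self):
            @functools.wraps(self.func)
            def wrapper(*args):
                return self.func(*args)  # simplified; the real wrapper builds a Column

            # Mirror the fields so the wrapped callable exposes the same names
            wrapper.func = self.func
            wrapper.returnType = self.returnType
            wrapper.vectorized = self.vectorized
            return wrapper

    slen = UserDefinedFunction(lambda s: len(s), "int", vectorized=False)._wrapped()
    assert slen.vectorized is False and slen.returnType == "int"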


@property
def returnType(self):
@@ -2090,7 +2090,7 @@ def _create_judf(self):
wrapped_func = _wrap_function(sc, self.func, self.returnType)
jdt = spark._jsparkSession.parseDataType(self.returnType.json())
judf = sc._jvm.org.apache.spark.sql.execution.python.UserDefinedPythonFunction(
self._name, wrapped_func, jdt, self._vectorized)
self._name, wrapped_func, jdt, self.vectorized)
return judf

def __call__(self, *cols):
@@ -2118,8 +2118,10 @@ def wrapper(*args):
wrapper.__name__ = self._name
wrapper.__module__ = (self.func.__module__ if hasattr(self.func, '__module__')
else self.func.__class__.__module__)

wrapper.func = self.func
wrapper.returnType = self.returnType
wrapper.vectorized = self.vectorized

return wrapper

@@ -2129,8 +2131,12 @@ def _create_udf(f, returnType, vectorized):
def _udf(f, returnType=StringType(), vectorized=vectorized):
if vectorized:
import inspect
if len(inspect.getargspec(f).args) == 0:
raise NotImplementedError("0-parameter pandas_udfs are not currently supported")
argspec = inspect.getargspec(f)
if len(argspec.args) == 0 and argspec.varargs is None:
Member:
I hadn't really thought about it, but does this mean varargs are supported? I suppose it could, but maybe it's best to include a test for it.

Contributor Author:
I think varargs are fine. I will add the test.

Contributor Author:
Added.
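As an illustrative aside (not part of this diff), a minimal sketch of why a varargs function passes the zero-argument check in this hunk; the usage at the end is hypothetical and assumes a SparkSession `spark` and a DataFrame `df` with numeric columns "a" and "b":

    import inspect

    # A variadic function: argspec.args is empty but argspec.varargs is set,
    # so the zero-argument check in the diff does not reject it.
    def total(*series):
        result = series[0]
        for s in series[1:]:
            result = result + s
        return result

    spec = inspect.getfullargspec(total)  # the diff itself uses the older getargspec
    assert len(spec.args) == 0 and spec.varargs is not None

    # Hypothetical usage:
    # varargs_udf = pandas_udf(total, "double")
    # df.select(varargs_udf(df["a"], df["b"])).show()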

raise ValueError(
"0-arg pandas_udfs are not supported. "
"Instead, create a 1-arg pandas_udf and ignore the arg in your function."
)
udf_obj = UserDefinedFunction(f, returnType, vectorized=vectorized)
return udf_obj._wrapped()

@@ -2146,7 +2152,7 @@ def _udf(f, returnType=StringType(), vectorized=vectorized):

@since(1.3)
def udf(f=None, returnType=StringType()):
"""Creates a :class:`Column` expression representing a user defined function (UDF).
"""Creates a user defined function (UDF).

.. note:: The user-defined functions must be deterministic. Due to optimization,
duplicate invocations may be eliminated or the function may even be invoked more times than
@@ -2181,30 +2187,70 @@ def udf(f=None, returnType=StringType()):
@since(2.3)
def pandas_udf(f=None, returnType=StringType()):
"""
Creates a :class:`Column` expression representing a user defined function (UDF) that accepts
`Pandas.Series` as input arguments and outputs a `Pandas.Series` of the same length.
Creates a vectorized user defined function (UDF).

:param f: python function if used as a standalone function
:param f: user-defined function. A python function if used as a standalone function
:param returnType: a :class:`pyspark.sql.types.DataType` object

>>> from pyspark.sql.types import IntegerType, StringType
>>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
>>> @pandas_udf(returnType=StringType())
... def to_upper(s):
... return s.str.upper()
...
>>> @pandas_udf(returnType="integer")
... def add_one(x):
... return x + 1
...
>>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
>>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
... .show() # doctest: +SKIP
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
| 8| JOHN DOE| 22|
+----------+--------------+------------+
The user-defined function can define one of the following transformations:

1. One or more `pandas.Series` -> A `pandas.Series`

This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
:meth:`pyspark.sql.DataFrame.select`.
The returnType should be a primitive data type, e.g., `DoubleType()`.
The length of the returned `pandas.Series` must be of the same as the input `pandas.Series`.

>>> from pyspark.sql.types import IntegerType, StringType
>>> slen = pandas_udf(lambda s: s.str.len(), IntegerType())
>>> @pandas_udf(returnType=StringType())
... def to_upper(s):
... return s.str.upper()
...
>>> @pandas_udf(returnType="integer")
... def add_one(x):
... return x + 1
...
>>> df = spark.createDataFrame([(1, "John Doe", 21)], ("id", "name", "age"))
>>> df.select(slen("name").alias("slen(name)"), to_upper("name"), add_one("age")) \\
... .show() # doctest: +SKIP
+----------+--------------+------------+
|slen(name)|to_upper(name)|add_one(age)|
+----------+--------------+------------+
| 8| JOHN DOE| 22|
+----------+--------------+------------+

2. A `pandas.DataFrame` -> A `pandas.DataFrame`
Member:
This looks like it produces a warning here:

spark/python/pyspark/sql/functions.py:docstring of pyspark.sql.functions.pandas_udf:46: WARNING: Enumerated list ends without a blank line; unexpected unindent.

Contributor Author:
Fixed indentation.
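As an aside, a minimal sketch of the fix Sphinx is asking for; the docstring below is hypothetical, not the actual diff. Continuation text under a numbered item in a reST docstring must stay indented under that item, with a blank line before any dedent:

    def example():
        """Hypothetical docstring illustrating reST enumerated-list indentation.

        1. One or more `pandas.Series` -> A `pandas.Series`

           Continuation text for item 1 is indented under the item, so Sphinx
           does not warn about an unexpected unindent.

        2. A `pandas.DataFrame` -> A `pandas.DataFrame`

           Continuation text for item 2, again indented under the item.
        """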


This udf is only used with :meth:`pyspark.sql.GroupedData.apply`.
The returnType should be a :class:`StructType` describing the schema of the returned
`pandas.DataFrame`.

>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
>>> @pandas_udf(returnType=df.schema)
... def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP
+---+-------------------+
| id| v|
+---+-------------------+
| 1|-0.7071067811865475|
| 1| 0.7071067811865475|
| 2|-0.8320502943378437|
| 2|-0.2773500981126146|
| 2| 1.1094003924504583|
+---+-------------------+

.. note:: This type of udf cannot be used with functions such as `withColumn` or `select`
because it defines a `DataFrame` transformation rather than a `Column`
transformation.

.. seealso:: :meth:`pyspark.sql.GroupedData.apply`

.. note:: The user-defined function must be deterministic.
"""
return _create_udf(f, returnType=returnType, vectorized=True)

88 changes: 84 additions & 4 deletions python/pyspark/sql/group.py
@@ -54,9 +54,10 @@ class GroupedData(object):
.. versionadded:: 1.3
"""

def __init__(self, jgd, sql_ctx):
def __init__(self, jgd, df):
self._jgd = jgd
self.sql_ctx = sql_ctx
self._df = df
self.sql_ctx = df.sql_ctx

@ignore_unicode_prefix
@since(1.3)
@@ -170,7 +171,7 @@ def sum(self, *cols):
@since(1.6)
def pivot(self, pivot_col, values=None):
"""
Pivots a column of the current [[DataFrame]] and perform the specified aggregation.
Pivots a column of the current :class:`DataFrame` and perform the specified aggregation.
There are two versions of pivot function: one that requires the caller to specify the list
of distinct values to pivot on, and one that does not. The latter is more concise but less
efficient, because Spark needs to first compute the list of distinct values internally.
@@ -192,7 +193,85 @@ def pivot(self, pivot_col, values=None):
jgd = self._jgd.pivot(pivot_col)
else:
jgd = self._jgd.pivot(pivot_col, values)
return GroupedData(jgd, self.sql_ctx)
return GroupedData(jgd, self._df)

@since(2.3)
def apply(self, udf):
Contributor Author:
@rxin, just to recap our discussion regarding naming. You asked:

What's the difference between this one and the transform function you also proposed? I'm trying to see if all the naming makes sense when considered together.

The answer is: transform takes a function pd.Series -> pd.Series and applies it to each column (or a subset of columns); the input and output Series are of the same length. apply takes a function pd.DataFrame -> pd.DataFrame and applies it to each group, similar to flatMapGroups.

Does this make sense to you?
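As an illustrative aside (the proposed transform is not part of this PR), the two function shapes in plain pandas terms; the column names are assumptions:

    import pandas as pd

    # transform-style: pd.Series -> pd.Series, output has the same length as the input
    def plus_one(s):
        return s + 1

    # apply-style: pd.DataFrame -> pd.DataFrame, applied per group, arbitrary output length
    def normalize(pdf):
        v = pdf.v
        return pdf.assign(v=(v - v.mean()) / v.std())

    pdf = pd.DataFrame({"id": [1, 1, 2, 2], "v": [1.0, 2.0, 3.0, 5.0]})
    print(plus_one(pdf.v))                                        # column-wise
    print(pdf.groupby("id", group_keys=False).apply(normalize))   # group-wise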

"""
Maps each group of the current :class:`DataFrame` using a pandas udf and returns the result
as a `DataFrame`.
Member:
"pandas udf" to "pandas_udf"

DataFrame to :class:`DataFrame`

Contributor Author:
Fixed.

Contributor Author:
I think "pandas udf" as a term is fine; pandas_udf is the function name.


The user-defined function should take a `pandas.DataFrame` and return another
`pandas.DataFrame`. For each group, all columns are passed together as a `pandas.DataFrame`
to the user-function and the returned `pandas.DataFrame`s are combined as a
:class:`DataFrame`.
The returned `pandas.DataFrame` can be of arbitrary length and its schema must match the
returnType of the pandas udf.

This function does not support partial aggregation, and requires shuffling all the data in
the :class:`DataFrame`.

:param udf: A function object returned by :meth:`pyspark.sql.functions.pandas_udf`

>>> from pyspark.sql.functions import pandas_udf
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
>>> @pandas_udf(returnType=df.schema)
... def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
>>> df.groupby('id').apply(normalize).show() # doctest: +SKIP
Contributor Author (icexelloss, Oct 5, 2017):
It seems this is still not skipped by the doctest. What's the best way to run doctests locally? I tried

./run-tests --modules=pyspark-sql --parallelism=4

but it's giving me a different failure.

Contributor Author (icexelloss, Oct 5, 2017):
I have been using

SPARK_TESTING=1 bin/pyspark pyspark.sql.tests GroupbyApplyTests

but this doesn't seem to run the doctests.

Member:
I think the problem is that pandas_udf is unimportable in this doctest. To my knowledge, # doctest: +SKIP is per line.

Member:
Probably importing pandas_udf should solve the problem, I guess.

Member:
Also, it looks like this file does not define spark as a global used in the doctests. I think we should add something like:

      sc = spark.sparkContext
      globs['sc'] = sc
+     globs['spark'] = spark
      globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \

Contributor Author:
Ahh, thanks! Will give it a try. Still, is there an easier way to run the pyspark tests locally (the way Jenkins runs them)?

Member:
Not sure... I think what you know is what I usually do.

Member:
@icexelloss, this works for me to run doctests locally:

SPARK_TESTING=1 bin/pyspark pyspark.sql.group

Member:
Ah, cool. I misunderstood. That's the answer to the question.

Contributor Author:
Ahh, neat. Thanks @HyukjinKwon @BryanCutler. The doctest passes now.

+---+-------------------+
| id| v|
+---+-------------------+
| 1|-0.7071067811865475|
| 1| 0.7071067811865475|
| 2|-0.8320502943378437|
| 2|-0.2773500981126146|
| 2| 1.1094003924504583|
+---+-------------------+

.. seealso:: :meth:`pyspark.sql.functions.pandas_udf`

"""
from pyspark.sql.functions import pandas_udf

# Columns are special because hasattr always returns True
if isinstance(udf, Column) or not hasattr(udf, 'func') or not udf.vectorized:
raise ValueError("The argument to apply must be a pandas_udf")
if not isinstance(udf.returnType, StructType):
raise ValueError("The returnType of the pandas_udf must be a StructType")

df = self._df
func = udf.func
returnType = udf.returnType
Member:
Is it necessary to make all these copies? I could understand maybe copying func and columns because they are used in the wrapped function, but I'm not sure df and returnType need to be copied.

Contributor Author:
I actually like it because I think it's more readable this way.


# The python executors expect the function to use pd.Series as input and output,
# so we create a wrapper function that turns them into a pd.DataFrame before passing
# it down to the user function, then turns the result pd.DataFrame back into pd.Series
columns = df.columns

def wrapped(*cols):
from pyspark.sql.types import to_arrow_type
import pandas as pd
result = func(pd.concat(cols, axis=1, keys=columns))
if not isinstance(result, pd.DataFrame):
raise TypeError("Return type of the user-defined function should be "
"Pandas.DataFrame, but is {}".format(type(result)))
if not len(result.columns) == len(returnType):
raise RuntimeError(
"Number of columns of the returned Pandas.DataFrame "
"doesn't match specified schema. "
"Expected: {} Actual: {}".format(len(returnType), len(result.columns)))
arrow_return_types = (to_arrow_type(field.dataType) for field in returnType)
return [(result[result.columns[i]], arrow_type)
for i, arrow_type in enumerate(arrow_return_types)]

wrapped_udf_obj = pandas_udf(wrapped, returnType)
udf_column = wrapped_udf_obj(*[df[col] for col in df.columns])
jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
return DataFrame(jdf, self.sql_ctx)
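As an illustrative aside (not part of the diff), a minimal pandas-only sketch of the reassembly the wrapped function above performs; the column names and values are assumptions:

    import pandas as pd

    columns = ["id", "v"]

    # Executors hand the wrapper one pd.Series per column of the group ...
    id_col = pd.Series([1, 1, 2], name="id")
    v_col = pd.Series([1.0, 2.0, 3.0], name="v")

    # ... and pd.concat(axis=1, keys=columns) stitches them back into one
    # pd.DataFrame, which is what the user-defined function receives.
    group_pdf = pd.concat([id_col, v_col], axis=1, keys=columns)

    # After the user function returns a pd.DataFrame, the wrapper splits it back
    # into per-column pd.Series (each paired with its arrow type in the real code).
    result = group_pdf.assign(v=group_pdf.v * 2)
    per_column_series = [result[c] for c in result.columns]
    print(group_pdf)
    print(per_column_series)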


def _test():
@@ -206,6 +285,7 @@ def _test():
.getOrCreate()
sc = spark.sparkContext
globs['sc'] = sc
globs['spark'] = spark
globs['df'] = sc.parallelize([(2, 'Alice'), (5, 'Bob')]) \
.toDF(StructType([StructField('age', IntegerType()),
StructField('name', StringType())]))