Conversation

@ueshin
Member

@ueshin ueshin commented Oct 16, 2017

What changes were proposed in this pull request?

This is a follow-up of #18732.
This PR introduces a @pandas_grouped_udf decorator for grouped vectorized UDFs instead of reusing the @pandas_udf decorator.

How was this patch tested?

Existing tests.

.. versionadded:: 1.3
"""
def __init__(self, func, returnType, name=None, vectorized=False):
def __init__(self, func, returnType, name=None, vectorized=False, grouped=False):
Contributor

@cloud-fan cloud-fan Oct 16, 2017

vectorized=False, grouped=True is an invalid combination. How about we introduce a pythonUdfType, where 0 means a normal udf, 1 means a pandas udf, and 2 means a pandas grouped udf? We can create something like object PythonEvalType to sync this encoding between Python and Java.

Member Author

Sounds good. I'll modify it.
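
For reference, a minimal sketch of the suggested encoding; the constant names follow the descriptions given later in this thread and are assumptions, not the merged code:

class PythonUdfType(object):
    # row-based UDFs
    NORMAL_UDF = 0
    # scalar vectorized UDFs
    PANDAS_UDF = 1
    # grouped vectorized UDFs
    PANDAS_GROUPED_UDF = 2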



@since(2.3)
def pandas_grouped_udf(f=None, returnType=StructType()):
Contributor

how about returnTypes without a default value? pandas_grouped_udf always returns a DataFrame, and we should just ask users to give the data type of each column.

Contributor

inside this method we can create a StructType with the returnTypes and pass it to _create_udf

Member Author

@ueshin ueshin Oct 16, 2017

The fields of the return type are used as the output of the plan. I guess the field names are also useful for users.

Contributor

ah I see, makes sense
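
To illustrate why named fields are useful, here is a hypothetical usage sketch of the proposed decorator (the schema, column names, and function body are made up for illustration, not code from this PR): the StructType field names become the output columns of the resulting plan.

from pyspark.sql.functions import pandas_grouped_udf
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

@pandas_grouped_udf(returnType=StructType([
    StructField('id', LongType()),
    StructField('v', DoubleType()),
]))
def normalize(pdf):
    # pdf is the pandas.DataFrame for one group; the columns of the
    # returned DataFrame line up with the declared StructType fields.
    return pdf.assign(v=(pdf.v - pdf.v.mean()) / pdf.v.std())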

@SparkQA

SparkQA commented Oct 16, 2017

Test build #82790 has finished for PR 19505 at commit 4d2bd95.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 16, 2017

Test build #82793 has finished for PR 19505 at commit f096870.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class PythonUdfType(object):

if len(argspec.args) == 0 and argspec.varargs is None:
    raise ValueError(
        "0-arg pandas_udfs are not supported. "
        "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
    )
Member

@viirya viirya Oct 16, 2017

Maybe also update this error message, like "0-arg pandas_udfs/pandas_grouped_udfs are not supported. ...

Member Author

Thanks! I'll update the message.

Member Author

Hmm, for pandas_grouped_udfs, should the number of args be exactly 1?

Member

I think so. If it doesn't become too complicated, maybe we can also check it for pandas_grouped_udf.

Member Author

Thanks, let me try.

if vectorized:
def _udf(f, returnType=StringType(), pythonUdfType=pythonUdfType):
    if pythonUdfType == PythonUdfType.PANDAS_UDF \
            or pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF:
Contributor

shall we add the check that PANDAS_GROUPED_UDF can only take one parameter?

Member Author

Yes, I'll add it.
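
A minimal sketch of such a check, assuming the inspect-based argspec handling quoted earlier in this review; the helper name and error message are hypothetical:

import inspect

def _check_grouped_udf_args(f, pythonUdfType):
    # Assumption: a grouped udf takes exactly one argument, the
    # pandas.DataFrame holding all rows of a single group.
    argspec = inspect.getargspec(f)
    if pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF \
            and (len(argspec.args) != 1 or argspec.varargs is not None):
        raise ValueError(
            "pandas_grouped_udfs must take a single argument "
            "(the pandas.DataFrame of one group).")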

@SparkQA

SparkQA commented Oct 16, 2017

Test build #82803 has finished for PR 19505 at commit 10512a6.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

udf_obj = UserDefinedFunction(f, returnType, pythonUdfType=pythonUdfType)
return udf_obj._wrapped()

# decorator @udf, @udf(), @udf(dataType()), or similar with @pandas_udf
Member

Nit: update this comment

Member Author

Thanks! I'll update it.

@SparkQA

SparkQA commented Oct 16, 2017

Test build #82805 has finished for PR 19505 at commit 789e642.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

sc.pythonVer, broadcast_vars, sc._javaAccumulator)


class PythonUdfType(object):
Member

@gatorsmile gatorsmile Oct 16, 2017

Could you also add the descriptions about these three UDF types?

  • NORMAL_UDF: row-based UDFs
  • PANDAS_UDF: scalar vectorized UDFs
  • PANDAS_GROUPED_UDF: grouped vectorized UDFs

Member Author

Sure, I'll add the descriptions.

This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
:meth:`pyspark.sql.DataFrame.select`.
The returnType should be a primitive data type, e.g., `DoubleType()`.
Member

What happens if we do not pass a primitive data type? Do we have a test case for this?

Member Author

It will fail at runtime. I'll add tests.
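
For illustration, a sketch of what such a test might look like, modeled on the length-check test quoted later in this thread; the test name, udf, and assertion are assumptions, not the test that was actually added:

def test_vectorized_udf_invalid_return_type(self):
    from pyspark.sql.functions import pandas_udf, col
    from pyspark.sql.types import StructType, StructField, LongType
    df = self.spark.range(10)
    # declaring a non-primitive return type for a scalar pandas_udf
    # should fail once the plan is executed
    struct_udf = pandas_udf(
        lambda x: x, StructType([StructField('v', LongType())]))
    with QuietTest(self.sc):
        with self.assertRaises(Exception):
            df.select(struct_udf(col('id'))).collect()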

This udf is used with :meth:`pyspark.sql.DataFrame.withColumn` and
:meth:`pyspark.sql.DataFrame.select`.
The returnType should be a primitive data type, e.g., `DoubleType()`.
The length of the returned `pandas.Series` must be the same as that of the input `pandas.Series`.
Member

Is this just a fact, or an input requirement?

Member Author

It's an output requirement.

Member

Can users break this requirement? If so, what happens?

Member Author

@ueshin ueshin Oct 16, 2017

Yes, they can and it will fail.

def test_vectorized_udf_invalid_length(self):
    from pyspark.sql.functions import pandas_udf, col
    import pandas as pd
    df = self.spark.range(10)
    raise_exception = pandas_udf(lambda _: pd.Series(1), LongType())
    with QuietTest(self.sc):
        with self.assertRaisesRegexp(
                Exception,
                'Result vector from pandas_udf was not the required length'):
            df.select(raise_exception(col('id'))).collect()

Member

I see. Thanks!



@since(2.3)
def pandas_grouped_udf(f=None, returnType=StructType()):
Contributor

Per discussion here:
#18732 (comment)

Should we consider converting pandas_udf to pandas_grouped_udf implicitly in groupby apply, and not introduce pandas_grouped_udf as a user-facing API?

groupby apply implies the udf is a grouped udf, so there should not be ambiguity here.

Member Author

I submitted another PR, #19517, based on this one as a comparison.
I guess it covers what you are thinking.

Contributor

Thanks @ueshin , yes that's what I am thinking.

Contributor

@icexelloss icexelloss Oct 18, 2017

Here is a summary of the current proposal from some offline discussion:

I. Use only pandas_udf

The main issue with this approach, as a few people pointed out, is that it is hard to know what the udf does without looking at the implementation.
For instance, for a udf:

@pandas_udf(DoubleType())
def foo(v):
    ...

It's hard to tell whether this function is a reduction that returns a scalar double, or a transform function that returns a pd.Series of double.

This is less than ideal because:

  • The user of the udf cannot tell which functions this udf can be used with, i.e., can this be used with groupby().apply(), withColumn, or groupby().agg()?
  • Catalyst cannot do validation at the planning phase, i.e., it cannot throw an exception if the user passes a transformation function rather than an aggregation function to groupby().agg()

II. Use different decorators, i.e., pandas_udf (or pandas_scalar_udf), pandas_grouped_udf, pandas_udaf

The idea of this approach is to use pandas_grouped_udf for all group udfs, and pandas_scalar_udf for scalar pandas udfs that get used with withColumn. This helps distinguish between scalar udfs and group udfs. However, this approach doesn't help to distinguish among group udfs, for instance, the group transform and group aggregation examples above.

III. Use the pandas_udf decorator with a function type enum for "one-step" vectorized udfs, and pandas_udaf for multi-step aggregation functions

This approach uses a function type enum to describe what the udf does. Here are the proposed function types:

  • transform
    A pd.Series(s) -> pd.Series transformation that is independent of the grouping. This is the existing scalar pandas udf.
  • group_transform
    A pd.Series(s) -> pd.Series transformation that is dependent on the grouping, e.g.
@pandas_udf(DoubleType(), GROUP_TRANSFORM)
def foo(v):
    return (v - v.mean()) / v.std()
  • group_aggregate:
    A pd.Series(s) -> scalar function, e.g.
@pandas_udf(DoubleType(), GROUP_AGGREGATE)
def foo(v):
    return v.mean()
  • group_map (maybe a better name):
    This defines a pd.DataFrame -> pd.DataFrame transformation. This is the current groupby().apply() udf.

These types also work with window functions, because window functions are either (1) group_transform (rank) or (2) group_aggregate (first, last).

I am in favor of (3). What do you guys think?

Member

Post it in another PR, #19517? This discussion thread will be collapsed when Takuya makes a code change.

Member Author

I guess we should consider merging #19517 first, because it improves the behavior by introducing PythonUdfType instead of the hack of detecting the udf type from the return type at the worker, without any user-facing API changes from #18732.
The proposal and discussion should be in this PR, but outside of any review thread, to avoid being collapsed.

Contributor

Proposal 3 looks great! One minor question: what's the difference between transform and group_transform? It seems we don't need to care about the distinction, either in usage or in implementation.

Contributor

Sorry for the late reply.

@gatorsmile Sounds good. I will copy the discussion into this PR as @ueshin suggested.

@ueshin +1 to merge #19517. I think it's a good change and will make later changes easier.

@cloud-fan transform defines a transformation that doesn't rely on grouping semantics; for instance, this is a wrong udf definition:

@pandas_udf(DoubleType(), TRANSFORM)
def foo(v):
    return (v - v.mean()) / v.std()

because the transformation is relying on some kind of "grouping semantics"; otherwise v.mean() and v.std() have no meaning for an arbitrary grouping.

@SparkQA

SparkQA commented Oct 16, 2017

Test build #82810 has finished for PR 19505 at commit 122a7bc.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 16, 2017

Test build #82811 has finished for PR 19505 at commit fdafb35.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 16, 2017

Test build #82813 has finished for PR 19505 at commit 7332969.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

  udf.references.subsetOf(child.outputSet)
}
if (validUdfs.nonEmpty) {
  if (validUdfs.find(_.pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF).isDefined) {
Member

nit: maybe

validUdfs.exists(_.pythonUdfType == PythonUdfType.PANDAS_GROUPED_UDF)

Member Author

Thanks! I'll update it.

@SparkQA

SparkQA commented Oct 17, 2017

Test build #82831 has finished for PR 19505 at commit 85f250d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

retest this please

@HyukjinKwon
Member

Change itself LGTM, if we are okay to go ahead with separating this.

@SparkQA

SparkQA commented Oct 17, 2017

Test build #82835 has finished for PR 19505 at commit 85f250d.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 17, 2017

Test build #82843 has finished for PR 19505 at commit 1ef25c3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

So, it looks like we are good to go?

@ueshin
Member Author

ueshin commented Oct 18, 2017

I'd mark this PR as [WIP] for now because we haven't reached consensus on the API changes. Thanks.

@ueshin ueshin changed the title [SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply() with pandas udf [WIP][SPARK-20396][SQL][PySpark][FOLLOW-UP] groupby().apply() with pandas udf Oct 18, 2017
@HyukjinKwon
Member

I meant to ask if others agree with the current change as I could not see the ongoing discussion at that time.

@icexelloss
Contributor

icexelloss commented Oct 19, 2017

Here is a summary of the current proposal from some offline discussion:

1. Use only pandas_udf

The main issue with this approach, as a few people pointed out, is that it is hard to know what the udf does without looking at the implementation.
For instance, for a udf:

@pandas_udf(DoubleType())
def foo(v):
    ...

It's hard to tell whether this function is a reduction that returns a scalar double, or a transform function that returns a pd.Series of double.

This is less than ideal because:

  • The user of the udf cannot tell which functions this udf can be used with, i.e., can this be used with groupby().apply(), withColumn, or groupby().agg()?
  • Catalyst cannot do validation at the planning phase, i.e., it cannot throw an exception if the user passes a transformation function rather than an aggregation function to groupby().agg()

2. Use different decorators, i.e., pandas_udf (or pandas_scalar_udf), pandas_grouped_udf, pandas_udaf

The idea of this approach is to use pandas_grouped_udf for all group udfs, and pandas_scalar_udf for scalar pandas udfs that get used with withColumn. This helps distinguish between scalar udfs and group udfs. However, this approach doesn't help to distinguish among group udfs, for instance, the group transform and group aggregation examples above.

3. Use the pandas_udf decorator with a function type enum for "one-step" vectorized udfs, and pandas_udaf for multi-step aggregation functions

This approach uses a function type enum to describe what the udf does. Here are the proposed function types:

scalar_transform

A pd.Series(s) -> pd.Series transformation that is independent of the grouping. This is the existing scalar pandas udf.

@pandas_udf(DoubleType(), SCALAR_TRANSFORM)
def plus_one(v):
    return v + 1

scalar_transform can be used with withColumn, select, etc.:

df = df.withColumn("v", plus_one(df.v))

group_transform

A pd.Series(s) -> pd.Series transformation that is dependent on the grouping, e.g.

@pandas_udf(DoubleType(), GROUP_TRANSFORM)
def rank(v):
    return v.rank()

group_transform can be used with:

window

w = Window.partitionBy('date')

df = df.withColumn('rank', rank(df.v).over(w))

groupby

or maybe something like this in the future (Not available with the current API):

df = df.withColumn('rank', df.groupby('id').v.transform(rank))

for reference, in pandas you would write something like this:

df = df.assign(rank=df.groupby('id').v.transform(lambda v: v.rank()))

Although it's also a Series -> Series transformation, a group_transform will be rejected by withColumn, select, etc.:

# This doesn't make sense and will throw an exception
df.withColumn('rank', rank(df.v))

group_aggregate:

A pd.Series(s) -> scalar function, e.g.

@pandas_udf(DoubleType(), GROUP_AGGREGATE)
def mean(v):
    return v.mean()

can be used with:

window

w = Window.partitionBy('date')

df = df.withColumn('mean', mean(df.v).over(w))

groupby

df = df.groupby('id').agg(mean(df.v))

group_map (maybe a better name):

This defines a pd.DataFrame -> pd.DataFrame transformation. This is the current groupby().apply() udf.

@pandas_udf(df.schema, GROUP_MAP)
def foo(pdf):
    pdf = pdf.assign(v1=pdf.v1 - pdf.v1.mean())
    pdf = pdf.assign(v2=pdf.v2 / pdf.v2.std())
    return pdf

Can be used with groupby apply:

df.groupby('date').apply(foo)

I am in favor of (3). What do you guys think?

@icexelloss
Contributor

icexelloss commented Oct 19, 2017

@cloud-fan asked:
"
what's the difference between transform and group_transform? Seems we don't need to care about it both in usage and implementation.
"

My answer is:
transform defines a transformation that doesn't rely on grouping semantics; for instance, this is a wrong udf definition:

@pandas_udf(DoubleType(), TRANSFORM)
def foo(v):
    return (v - v.mean()) / v.std()

because the transformation is relying on some kind of "grouping semantics"; otherwise v.mean() and v.std() have no meaning for an arbitrary grouping. Although Catalyst cannot detect this error, people reading the code can identify it more easily, because the declared type is transform (not group_transform) while the user-defined function relies on grouping semantics. The transform type also allows users to test such a function by passing in arbitrary groupings and verifying that the results are the same.
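
As a sketch of that testing idea (illustrative only, plain pandas, no Spark involved): a true transform must give the same result no matter how its input is split into groups.

import pandas as pd

def plus_one(v):
    # a true `transform`: no dependence on how the input is grouped
    return v + 1

s = pd.Series([1.0, 2.0, 3.0, 4.0])
whole = plus_one(s)
# apply the same function to an arbitrary split of the input
split = pd.concat([plus_one(s[:2]), plus_one(s[2:])])
# holds for plus_one, but would fail for (v - v.mean()) / v.std()
assert whole.equals(split)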

Also, Catalyst can throw an exception for the code example below:

@pandas_udf(DoubleType(), GROUP_TRANSFORM)
def foo(v):
    return (v - v.mean()) / v.std()

# Should throw an exception here: withColumn should only take a `transform`, not a `group_transform`
df = df.withColumn('foo', foo(df.v))

@viirya
Member

viirya commented Oct 20, 2017

Btw, I think the scope of this change is more than just a follow-up. Should we create another JIRA for it?

@viirya
Member

viirya commented Oct 20, 2017

@icexelloss The summary and proposal 3 look great. To prevent confusion, can you also put the usage of each function type in proposal 3? E.g., group_map is for groupby().apply(), transform is for withColumn, etc. Thanks.

@HyukjinKwon
Member

+1 for a separate JIRA to clarify the proposal, and +0 for 3 out of those three, too.

@viirya
Member

viirya commented Oct 20, 2017

The group_transform udfs look a bit weird to me. @icexelloss Can you explain the use case for them? When do we need these grouping semantics?

@icexelloss
Contributor

@viirya @cloud-fan I updated my original summary. I think it answers the group_transform question. I also added more examples to each type.

@HyukjinKwon @viirya I agree we can move this to a separate JIRA and merge the current PR of @ueshin. Maybe I can open another PR with just the proposal design doc? Not sure what the best way is.

@gatorsmile
Member

@ueshin Maybe close this PR?

@ueshin
Member Author

ueshin commented Oct 20, 2017

Sure, I'll close this.
@icexelloss Of course you can open a separate JIRA and another PR. Thanks!
