Conversation

@icexelloss
Contributor

@icexelloss icexelloss commented Jul 25, 2017

What changes were proposed in this pull request?

This PR adds an apply() function on df.groupby(). apply() takes a pandas udf that is a transformation on pandas.DataFrame -> pandas.DataFrame.

Static schema

schema = df.schema

@pandas_udf(schema)
def normalize(df):
    df = df.assign(v1 = (df.v1 - df.v1.mean()) / df.v1.std())
    return df

df.groupBy('id').apply(normalize)

Dynamic schema

This use case is removed from the PR and we will discuss it as a follow-up. See discussion #18732 (review)

Another example, using pd.DataFrame dtypes as the output schema of the udf:

sample_df = df.filter(df.id == 1).toPandas()

def foo(df):
    ret = ...  # Some transformation on the input pd.DataFrame
    return ret

foo_udf = pandas_udf(foo, foo(sample_df).dtypes)

df.groupBy('id').apply(foo_udf)

In interactive use cases, users usually have a sample pd.DataFrame at hand to test the function foo in their notebook. Being able to use foo(sample_df).dtypes frees the user from specifying the output schema of foo.

Design doc: https://github.com/icexelloss/spark/blob/pandas-udf-doc/docs/pyspark-pandas-udf.md

How was this patch tested?

  • Added GroupbyApplyTest

@icexelloss icexelloss changed the title groupby().apply() with pandas udf [SPARK-20396] groupby().apply() with pandas udf Jul 25, 2017
@icexelloss icexelloss changed the title [SPARK-20396] groupby().apply() with pandas udf [SPARK-20396][SQL][PySpark] groupby().apply() with pandas udf Jul 25, 2017
@icexelloss icexelloss force-pushed the groupby-apply-SPARK-20396 branch from f3bbb86 to 76a7ce6 Compare August 7, 2017 21:43
@icexelloss icexelloss force-pushed the groupby-apply-SPARK-20396 branch from ebc2c67 to 34e1dd4 Compare August 14, 2017 21:55
@holdenk
Contributor

holdenk commented Aug 19, 2017

cc @HyukjinKwon @BryanCutler

@felixcheung
Member

cool - this is a bit understated but potentially huge (to me anyway)

@icexelloss icexelloss force-pushed the groupby-apply-SPARK-20396 branch from 32ad7b2 to 3237cd0 Compare August 30, 2017 14:23
@HyukjinKwon
Member

Should we maybe consider SPIP?

@icexelloss
Contributor Author

Thanks all for the comments.

@HyukjinKwon

This is part of SPIP https://issues.apache.org/jira/browse/SPARK-21190
I have been engaging in the discussion of SPARK-21190 to make sure whatever I am doing here doesn't diverge from the general design of the vectorized UDF.

I don't expect we will start merging this until we have a solid design in SPARK-21190. It would be great if Spark committers could help move the discussion on SPARK-21190 forward.

@felixcheung
Member

there's actually a number of key people participating in the discussion in JIRA/SPIP, so I think we are good

I think perhaps SPARK-20396 should be a subtask instead for tracking the overall discussion/design.

@icexelloss icexelloss force-pushed the groupby-apply-SPARK-20396 branch from c8a20cc to 8630028 Compare September 28, 2017 18:38
@icexelloss icexelloss force-pushed the groupby-apply-SPARK-20396 branch from 8630028 to 07bccca Compare September 28, 2017 18:49
@icexelloss
Contributor Author

Hi,

Thanks to the vectorized udf change, this PR is much smaller than the original. I think this is a useful feature and would love to get some feedback on it.

Thoughts? cc @BryanCutler @HyukjinKwon @ueshin

@HyukjinKwon
Member

ok to test

@SparkQA

SparkQA commented Sep 28, 2017

Test build #82288 has finished for PR 18732 at commit 07bccca.

  • This patch fails Python style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@BryanCutler BryanCutler left a comment

This is looking really good @icexelloss! I'll have to look at this more in depth later as it touches a lot of code I'm not familiar with. Hopefully, someone better versed in this area can help guide you with what needs to be done to get this merged, like additional tests to add.

One question from previous discussion in the JIRA, is the length of the Pandas DataFrame from apply() determined by maxRecordsPerBatch? So if you wanted to work with an entire groupby() key, you would need to set this conf big enough?
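
(For context, a hedged aside: the conf in question is spark.sql.execution.arrow.maxRecordsPerBatch, which can be set from PySpark as below, assuming a SparkSession named spark; the value is illustrative.)

# illustrative value only; tune to the largest expected group size
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 10000)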

"""
import pandas as pd
if isinstance(returnType, pd.Series):
returnType = from_pandas_dtypes(returnType)
Member

Is this necessary?

Member

I think we don't need this. Using a consistent way to express the return type should be better.

Contributor Author

I agree having a consistent way to express return type is good.

The reason I added this is to enable this usage:

sample_df = df.filter(df.id == 1).toPandas()

def foo(df):
    ret = ...  # Some transformation on the input pd.DataFrame
    return ret

foo_udf = pandas_udf(foo, foo(sample_df).dtypes)

df.groupBy('id').apply(foo_udf)

The pattern is quite useful in interactive usage. Here the user no longer needs to specify the return schema of foo manually. And if the user changes the return columns of foo, they don't need to change the return type of the pandas_udf.

I am leaning towards keeping this but I am willing to be convinced.
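
For illustration, a rough sketch of what a dtypes-to-schema conversion could look like (a hypothetical body for from_pandas_dtypes; the actual helper in this PR may differ, and the mapping below covers only a few dtypes):

import numpy as np
from pyspark.sql.types import StructType, StructField, LongType, DoubleType, StringType

# Hypothetical dtype -> Spark type mapping; incomplete, illustration only.
_DTYPE_MAP = {
    np.dtype('int64'): LongType(),
    np.dtype('float64'): DoubleType(),
    np.dtype('object'): StringType(),
}

def from_pandas_dtypes(dtypes):
    # dtypes is a pd.Series mapping column name -> numpy dtype,
    # e.g. the result of foo(sample_df).dtypes
    return StructType([StructField(name, _DTYPE_MAP[dt]) for name, dt in dtypes.items()])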

[StructField('id', LongType()),
StructField('v', IntegerType()),
StructField('v1', DoubleType()),
StructField('v2', LongType())]))
Member

The return type is a little different from current pandas_udfs. Are the resulting column names determined here? Does it have to be a StructType to do groupby().apply()?

Contributor Author

Yes, the column names are specified in the returnType, and the returnType must be a StructType.

The rationale is that apply() is a mapping from pd.DataFrame -> pd.DataFrame, therefore the returnType must be a StructType.

This is the best way I can think of to specify the column names and returnType. It makes sense to me because there should be a one-to-one mapping between the return value of the function (a pd.DataFrame) and its schema (a StructType containing column names and dataTypes).

Also, because pd.DataFrame doesn't support nested types, there is no ambiguity about whether a StructType indicates a pd.DataFrame or a nested type.
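
To make that one-to-one mapping concrete, a minimal sketch (the schema and column names are illustrative):

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import StructType, StructField, LongType, DoubleType

schema = StructType([
    StructField('id', LongType()),
    StructField('v1', DoubleType()),
])

@pandas_udf(schema)
def normalize(pdf):
    # the returned pd.DataFrame's columns line up with the StructType fields
    return pdf.assign(v1=(pdf.v1 - pdf.v1.mean()) / pdf.v1.std())

df.groupBy('id').apply(normalize)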


val batchedIter: Iterator[Iterator[InternalRow]] =
  iter.grouped(conf.arrowMaxRecordsPerBatch).map(_.iterator)

Member

Is it necessary to group the iterator like this?

Member

The grouped iterator looks unnecessary. Actually you still write out the rows individually.

Member

I guess this is for making ArrowPythonRunner reusable between the current pandas udf and apply() by taking Iterator[Iterator[InternalRow]] instead of Iterator[InternalRow] as its input. The rows in each grouped iterator will become one RecordBatch.
I'm not sure whether it's good or not, though.

Contributor Author

I actually found this code doesn't work now. I will fix it.

@ueshin is right, this is to reuse ArrowEvalPython for both the current pandas udf and apply(). I basically want to lift the batching logic out of ArrowEvalPython so the caller can decide how they want rows to be batched into a RecordBatch.

In the current pandas udf case, it batches by conf.arrowMaxRecordsPerBatch, and in apply() it batches by one group per batch.
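
To illustrate the two batching strategies (a pure-Python sketch of the idea; the actual logic in this PR lives in Scala):

def batch_by_max_records(rows, max_records_per_batch):
    # current pandas udf: fixed-size batches, each becomes one RecordBatch
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == max_records_per_batch:
            yield batch
            batch = []
    if batch:
        yield batch

def batch_by_group(grouped_rows):
    # groupby().apply(): one group per RecordBatch, regardless of group size
    for _key, group in grouped_rows:
        yield group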

@HyukjinKwon
Member

HyukjinKwon commented Sep 29, 2017

I believe I should cc @cloud-fan and @viirya too. Will take a closer look too soon.

import inspect
- if len(inspect.getargspec(f).args) == 0:
+ argspec = inspect.getargspec(f)
+ if len(argspec.args) == 0 and argspec.varargs is None:
Member

BTW, let's address this comment while we are here.

Contributor Author

Fixed

@viirya
Member

viirya commented Sep 29, 2017

Based on the doc of pandas_udf, we've already defined a pandas udf as a user-defined function (UDF) that accepts pandas.Series as input arguments and outputs a pandas.Series of the same length. Is it a good idea to break this definition and let a pandas udf accept a pandas.DataFrame?

Conceptually I'd see them as two types of udfs. You can't exchange them in usage.

Maybe we can define another pandas_df_udf for this purpose?
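
For contrast, a minimal sketch of the existing Series -> Series pandas udf this definition refers to (names are illustrative):

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def plus_one(v):
    # v is a pandas.Series; the result must be a Series of the same length
    return v + 1

df.withColumn('v1', plus_one(df.v))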

jgd = self._jgd.pivot(pivot_col, values)
return GroupedData(jgd, self.sql_ctx)

def apply(self, udf_obj):
Member

@viirya viirya Sep 29, 2017

We may possibly pass in a non-vectorized udf. Add a check for it?

Contributor Author

Done

Member

@ueshin ueshin left a comment

@icexelloss I think this is a useful feature, too!
I left some comments for now, could you please check them as well?

grouping: Seq[Expression],
func: Expression,
override val output: Seq[Attribute],
override val child: SparkPlan
Member

We don't need override val for output and child.

Contributor Author

Done

func: Expression,
override val output: Seq[Attribute],
override val child: SparkPlan
) extends UnaryExecNode {
Member

nit: style

    ...)
  extends UnaryExecNode {

Contributor Author

Done

.map { case (attr, i) => attr.withName(s"_$i") })

val batchedIter: Iterator[Iterator[InternalRow]] =
  iter.grouped(conf.arrowMaxRecordsPerBatch).map(_.iterator)
Member

What if conf.arrowMaxRecordsPerBatch <= 0?

def apply(a: Attribute): AttributeSet = new AttributeSet(Set(new AttributeEquals(a)))

def apply(as: Attribute*): AttributeSet =
new AttributeSet(Set(as.map(new AttributeEquals(_)): _*))
Member

Do we need this? It seems this isn't used.

Contributor Author

Good catch! Removed.

import org.apache.spark.sql.catalyst.analysis.UnresolvedDeserializer
import org.apache.spark.sql.catalyst.encoders._
- import org.apache.spark.sql.catalyst.expressions._
+ import org.apache.spark.sql.catalyst.expressions.{AttributeSet, _}
Member

We can revert this.

Contributor Author

Done

case class FlatMapGroupsInPandas(
groupingExprs: Seq[Expression],
functionExpr: Expression,
override val output: Seq[Attribute],
Member

We don't need override val here.

Contributor Author

Done

- df: DataFrame,
- groupingExprs: Seq[Expression],
+ val df: DataFrame,
+ val groupingExprs: Seq[Expression],
Member

What are these val for?

Contributor Author

val df is used for accessing the jdf object from Python:

https://github.com/icexelloss/spark/blob/groupby-apply-SPARK-20396/python/pyspark/sql/group.py#L204

Alternatively, I can store a reference to the Python DataFrame in the Python GroupedData object. It doesn't seem to be much different, though.

Contributor Author

I removed val from groupingExprs


private[sql] def flatMapGroupsInPandas(
expr: PythonUDF
): DataFrame = {
Member

nit: We can make this one line.

Contributor Author

Done

val argOffsets = Array((0 until child.schema.length).toArray)

inputRDD.mapPartitionsInternal { iter =>
val grouped = GroupedIterator(iter, groupingAttributes, child.output)
Member

We should use grouping instead of groupingAttributes here?

Contributor Author

Other places pass groupingAttributes to GroupedIterator. What's the difference between the two?

Member

I was thinking that the implementation at that time didn't support grouping like:

df.groupby(col('id') % 2 == 0).apply(...)

but the change I proposed doesn't work either.
The current implementation doesn't seem to support the grouping above, though.

Member

I sent a pr to your repository to support these cases icexelloss#4.
Could you take a look at it?

Contributor Author

Yes thanks much! I will take a look now.


private[sql] def flatMapGroupsInPandas(
expr: PythonUDF
): DataFrame = {
Member

@viirya viirya Sep 29, 2017

The passed PythonUDF can possibly be a non-vectorized UDF too. Add a check for it?

Contributor Author

Done


df = DataFrame(self._jgd.df(), self.sql_ctx)
func = udf_obj.func
returnType = udf_obj.returnType
Member

Check if the return type is struct type?

Contributor Author

Done

jgd = self._jgd.pivot(pivot_col, values)
return GroupedData(jgd, self.sql_ctx)

def apply(self, udf_obj):
Member

@viirya viirya Sep 29, 2017

Maybe we can define another pandas_df_udf for this kind of pandas udf? We can also check for this kind of pandas udf, e.g. I think it should have just one parameter?

Contributor Author

I am not sure that is necessary. We can check that the function has just one parameter in apply() without introducing a new pandas_df_udf.

Member

I'm basically concerned that there is no distinct difference between the current pandas udf and the new one for apply. But it seems we can distinguish them by looking at the return type? If so, we may have no need for pandas_df_udf.

But we should update the doc of pandas_udf for this kind of (apply) pandas udf.

Contributor Author

I see. It's a totally valid concern. Yeah I think we can distinguish them by returnType.

I will update the doc.
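
A minimal sketch of the kind of returnType-based check discussed here (hypothetical helper; the actual check in the PR may differ):

from pyspark.sql.types import StructType

def is_grouped_udf(udf_obj):
    # DataFrame -> DataFrame udfs declare a StructType return type,
    # while Series -> Series udfs declare a scalar data type
    return isinstance(udf_obj.returnType, StructType)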

Contributor Author

Doc updated

@icexelloss
Contributor Author

Thanks all for the initial review! I will address some comments and upload a new version today.

/**
* Physical node for [[org.apache.spark.sql.catalyst.plans.logical.FlatMapGroupsInPandas]]
*
* Rows in each group are passed to the python worker as a Arrow record batch.
Member

a Arrow -> an Arrow

minor nits: capitalize Python and Java, and change to Pandas.DataFrame in these paragraphs

Contributor Author

Fixed "a Arrow -> an Arrow"

Fixed "Python and Java capitalization"

I am actually leaning toward keeping pandas.DataFrame. The preference for pandas is usually lower case:
https://pandas.pydata.org/pandas-docs/stable/

@BryanCutler
Member

I had some minor comments on the docs, otherwise LGTM!

@SparkQA

SparkQA commented Oct 10, 2017

Test build #82587 has finished for PR 18732 at commit 9c2b10e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Oct 10, 2017

Test build #82599 has finished for PR 18732 at commit dc1d406.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Member

Merged to master.

@asfgit asfgit closed this in bfc7e1f Oct 10, 2017
@HyukjinKwon
Member

Nice work 👍

@icexelloss
Contributor Author

icexelloss commented Oct 10, 2017

@HyukjinKwon Thanks!

Thanks to everyone for reviewing this tirelessly.

@cloud-fan
Contributor

A late question: shall we create another API for it instead of reusing pandas_udf? cc @ueshin

@HyukjinKwon
Member

I think @viirya raised this question too - #18732 (comment) and I think I also left a few worries about this here and there. To me, +0.

@icexelloss
Contributor Author

icexelloss commented Oct 13, 2017

@cloud-fan, it's a good question. I thought quite a bit about it and discussed it with @viirya - #18732 (review)

Just to recap, I think from an API perspective, having just one decorator pandas_udf makes it easier for users - they don't need to think about which decorator to use where. It does make the implementation a little more complicated, because some code has to interpret the context in which a pandas_udf is used, i.e., a pandas_udf in groupby().apply() is a pandas.DataFrame -> pandas.DataFrame, and in withColumn or select it's a pandas.Series -> pandas.Series.

Another thought is that even if we were to introduce something like pandas_df_udf, we might still run into issues in the future where, say, we want an aggregate pandas udf that defines a mapping pandas.Series -> scalar. So I don't think we can define a decorator for every input/output shape, because there can potentially be many.

@cloud-fan
Contributor

@ueshin is working on pandas UDAF, let's wait for his feedback.

@ueshin
Member

ueshin commented Oct 16, 2017

I'm +0 for now.
I'm just wondering whether we can support struct types in vectorized UDF when needed in the future.

As for adding a pandas UDAF, I think we need another decorator or something to specify whether it supports partial aggregation or not, and the related parameters if needed.

@gatorsmile
Member

gatorsmile commented Oct 16, 2017

How to name the UDF defined in this PR? GroupBy vectorized UDFs?

@rxin
Contributor

rxin commented Oct 16, 2017

Grouped UDFs, or Grouped Vectorized UDFs.

@ueshin
Member

ueshin commented Oct 16, 2017

I submitted a pr #19505 to introduce @pandas_grouped_udf instead of reusing @pandas_udf.

@icexelloss
Contributor Author

I am still not crazy about introducing a pandas_grouped_udf unless there is a strong reason to. @ueshin, do you think this is just an issue of returnType or is there some other reason?

@cloud-fan
Contributor

@icexelloss I think as an API, it's a little confusing that @pandas_udf can define both a Series* -> Series function and a DataFrame -> DataFrame function. Besides, to support StructType as the return type of a Series* -> Series function, I think we have to add an extra flag to @pandas_udf. For the coming DataFrame -> Scalar pandas UDAF, we also need extra flags to represent partial aggregate ability.

From my experience of Java/Scala API design, I think it's a bad idea to have a method with many parameters as flags. We'd better have more methods. For this case, @pandas_udf, @pandas_grouped_udf and @pandas_udaf look better to me.

@icexelloss
Contributor Author

@cloud-fan Thanks for your feedback.

I think it makes sense to define pandas_udaf as its own function because it is a multi-step udf and is very different from the existing pandas_udf.

I also agree we shouldn't add many parameters as flags. However, here are some things I am not sure about:

  • Use a different function name (i.e., pandas_udf and pandas_grouped_udf) for different input/output types (pd.Series, pd.DataFrame, or scalar):
    There could potentially be many combinations of input/output types that don't fit into pd.Series -> pd.Series and pd.DataFrame -> pd.DataFrame. For instance, a vectorized window function or aggregation would probably be a pd.Series -> scalar function, which is different from either pandas_udf or pandas_grouped_udf, and I am not sure we want to introduce another decorator for those cases.

  • Distinguish between a Struct column and a DataFrame:
    I think we can accomplish this without introducing a new function decorator or parameter, but by the context in which pandas_udf is used. For instance, withColumn adds a column, so the return type specifies the column type, whereas groupby().apply() maps a DataFrame, so the return type specifies the DataFrame schema (see the sketch below).
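
A short sketch of that context-dependent interpretation (schema and names are illustrative):

from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

@pandas_udf(DoubleType())
def scale(v):
    # Series -> Series: in withColumn/select the return type is a column type
    return v * 2.0

@pandas_udf(df.schema)
def normalize(pdf):
    # DataFrame -> DataFrame: in groupby().apply() the return type is the
    # output DataFrame's schema (a StructType)
    return pdf.assign(v=(pdf.v - pdf.v.mean()) / pdf.v.std())

df.withColumn('v2', scale(df.v))
df.groupBy('id').apply(normalize)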

@cloud-fan
Contributor

Use different function name for different input/output type

Yea it's a bad idea as there are many combinations, and I just wanna use different APIs for different scenarios, e.g., @pandas_udf for select/withColumn (Series* -> Series), @pandas_grouped_udf for groupBy (apply: DataFrame -> DataFrame, reduce: DataFrame -> Scalar and more) and @pandas_udaf for aggregate.

Different scenarios usually have different requirements, having different APIs can help us satisfy these requirements individually.

@cloud-fan
Contributor

Let's discuss more on the new PR. At least we should create different UDF types in the implementation, the user-facing API can remain @pandas_udf.

@icexelloss
Contributor Author

@cloud-fan Sounds good. Thanks!

asfgit pushed a commit that referenced this pull request Oct 20, 2017
## What changes were proposed in this pull request?

This is a follow-up of #18732.
This pr modifies `GroupedData.apply()` method to convert pandas udf to grouped udf implicitly.

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <[email protected]>

Closes #19517 from ueshin/issues/SPARK-20396/fup2.