Conversation

@mstewart141 (Contributor) commented Mar 24, 2018

## What changes were proposed in this pull request?

Add documentation about the limitations of `pandas_udf` with keyword arguments and related concepts, like `functools.partial` fn objects.

NOTE: intermediate commits on this PR show some of the steps that can be taken to fix some (but not all) of these pain points.

### Survey of problems we face today:

(Initialize) Note: Python 3.6 and Spark 2.4-SNAPSHOT.
```
from pyspark.sql import SparkSession
import inspect, functools
from pyspark.sql.functions import pandas_udf, PandasUDFType, col, lit, udf

spark = SparkSession.builder.getOrCreate()
print(spark.version)

df = spark.range(1, 6).withColumn('b', col('id') * 2)

def ok(a, b): return a + b
```

Using a keyword argument at the call site `b=...` (and yes, *full* stack trace below, haha):
```
---> 14 df.withColumn('ok', pandas_udf(f=ok, returnType='bigint')('id', b='id')).show() # no kwargs

TypeError: wrapper() got an unexpected keyword argument 'b'
```
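
For illustration (an addition here, not part of the PR itself): since the keyword is rejected only at the call site, one workaround is to bind it inside a plain wrapper function before creating the UDF. A minimal sketch, assuming the `ok` and `df` defined above; the constant `7` is arbitrary:
```
# Bind the keyword argument at definition time, so that the pandas_udf only
# ever receives positional column arguments at the call site.
def ok_with_b_bound(a):
    return ok(a, b=7)

df.withColumn('ok', pandas_udf(f=ok_with_b_bound, returnType='bigint')('id')).show()
```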

Using `partial` with a keyword argument, where the kwarg is the first argument of the fn:
*(Aside: kind of interesting that lines 15 and 16 work great and then 17 explodes)*

```
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-9-e9f31b8799c1> in <module>()
     15 df.withColumn('ok', pandas_udf(f=functools.partial(ok, 7), returnType='bigint')('id')).show()
     16 df.withColumn('ok', pandas_udf(f=functools.partial(ok, b=7), returnType='bigint')('id')).show()
---> 17 df.withColumn('ok', pandas_udf(f=functools.partial(ok, a=7), returnType='bigint')('id')).show()

/Users/stu/ZZ/spark/python/pyspark/sql/functions.py in pandas_udf(f, returnType, functionType)
   2378         return functools.partial(_create_udf, returnType=return_type, evalType=eval_type)
   2379     else:
-> 2380         return _create_udf(f=f, returnType=return_type, evalType=eval_type)
   2381
   2382

/Users/stu/ZZ/spark/python/pyspark/sql/udf.py in _create_udf(f, returnType, evalType)
     54                 argspec.varargs is None:
     55             raise ValueError(
---> 56                 "Invalid function: 0-arg pandas_udfs are not supported. "
     57                 "Instead, create a 1-arg pandas_udf and ignore the arg in your function."
     58             )

ValueError: Invalid function: 0-arg pandas_udfs are not supported. Instead, create a 1-arg pandas_udf and ignore the arg in your function.
```
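
Context added here, not in the original PR: the 0-arg error falls out of how `inspect.getfullargspec` sees these partials on Python 3. Binding `a` by keyword forces every remaining parameter to become keyword-only, so the positional `args` list that `_create_udf` checks comes back empty. A sketch:
```
import functools
import inspect

def ok(a, b): return a + b

print(inspect.getfullargspec(functools.partial(ok, 7)).args)    # ['b'] -- line 15 works
print(inspect.getfullargspec(functools.partial(ok, b=7)).args)  # ['a'] -- line 16 works
print(inspect.getfullargspec(functools.partial(ok, a=7)).args)  # []   -- line 17: the "0-arg" error
```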

@mstewart141 (Contributor, Author)

@HyukjinKwon: the old PR, #20798, was a disaster from a git-cleanliness perspective, so I've updated here.

@HyukjinKwon (Member)

ok to test

@SparkQA commented Mar 25, 2018

Test build #88566 has finished for PR 20900 at commit bc49c3c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member) commented Mar 25, 2018

@mstewart141, just to be clear, the error:

```
ValueError: Function has keyword-only parameters or annotations, use getfullargspec() API which can support them
```

comes from the deprecated `getargspec`, rather than the `getfullargspec` that you already fixed in #20728. The current error in Python 2 looks like this:

```
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/.../spark/python/pyspark/sql/functions.py", line 2380, in pandas_udf
    return _create_udf(f=f, returnType=return_type, evalType=eval_type)
  File "/.../spark/python/pyspark/sql/udf.py", line 51, in _create_udf
    argspec = _get_argspec(f)
  File "/.../spark/python/pyspark/util.py", line 60, in _get_argspec
    argspec = inspect.getargspec(f)
  File "/usr/local/Cellar/python/2.7.14_3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/inspect.py", line 818, in getargspec
    raise TypeError('{!r} is not a Python function'.format(func))
TypeError: <functools.partial object at 0x1117dccb0> is not a Python function
```

with the reproducer below:

```
from functools import partial
from pyspark.sql.functions import pandas_udf

def test_func(a, b):
    return a + b

pandas_udf(partial(test_func, b='id'), "string")
```

I think this should work, just like a normal udf does:

```
from functools import partial
from pyspark.sql.functions import udf

def test_func(a, b):
    return a + b

normal_udf = udf(partial(test_func, b='id'), "string")
df = spark.createDataFrame([["a"]])
df.select(normal_udf("_1")).show()
```
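
A side illustration added for clarity, not from the thread: the Python 2/3 asymmetry comes down to introspection. On Python 3, `inspect.getfullargspec` can look inside a `functools.partial`, while the legacy `inspect.getargspec` used on the Python 2 path rejects anything that is not a plain function:
```
import functools
import inspect

def test_func(a, b):
    return a + b

p = functools.partial(test_func, b='id')

# Python 3: fine; reports the remaining positional parameter.
print(inspect.getfullargspec(p).args)  # ['a']

# Python 2's inspect.getargspec(p) instead raises:
#   TypeError: <functools.partial object ...> is not a Python function
```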

So, I think we should add support for callable objects / partial functions in Pandas UDFs in Python 2 too. Would you be interested in filing JIRA(s) and proceeding? If you are busy, I am willing to do it as well.

cc @ueshin, @BryanCutler and @icexelloss FYI.

A review comment on the documentation text added by this PR:

```
in boolean expressions and it ends up with being executed all internally. If the functions
can fail on special rows, the workaround is to incorporate the condition into the functions.
.. note:: The user-defined functions do not take keyword arguments.
```

Let's add "on the calling side" (I mean, not on the function declaration side) at the end, for clarification.

@HyukjinKwon (Member)

From a very quick look at the case "Try to be sneaky and don't use keywords with partial:", it seems to be due to a type mismatch. This seems to work fine (in Python 3):

```
>>> spark.range(1).withColumn('ok', pandas_udf(f=partial(test_func, 2), returnType='bigint')('id')).show()
+---+---+
| id| ok|
+---+---+
|  0|  2|
+---+---+
```

I think we can take this example out of the description.

@HyukjinKwon (Member)

LGTM except #20900 (comment)

@mstewart141 (Contributor, Author) commented Mar 25, 2018

Many (though not all; I don't think callables are impacted) of the limitations of `pandas_udf` relative to `udf` in this domain are due to the fact that `pandas_udf` doesn't allow keyword arguments at the call site (nor do standard udfs). This obviously impacts plain old function-based pandas_udfs, but also partial fns, where one would typically need to specify the partially-applied argument by name.

In the incremental commits of this PR, as of 9ea2595:

You can see the kinds of things I was investigating to try to fix that case. Indeed, my original PR was (ambitiously) titled something about enabling kwargs for all pandas_udfs. This is actually very easy to do for functions on Python 3 (and maybe for plain functions in py2 also, though I suspect that is rather tricky, as `getargspec` is pretty unhelpful when it comes to some of the kwarg metadata one would need). However, it is rather harder for the partial-function case: one quickly gets into stack traces from places like `python/pyspark/worker.py`, where the current strategy does not realize that a column from the arguments list may already be "accounted for", and one runs into duplicate arguments passed for, e.g., `a` as a result; a concrete illustration follows below.
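
An illustration of my own, not code from the PR: if the worker keeps passing every column positionally while the partial has already bound one of them by keyword, plain Python raises exactly this kind of error:
```
import functools

def ok(a, b): return a + b

bound = functools.partial(ok, a=7)

# The worker effectively does bound(<column value>), i.e. ok(<value>, a=7):
try:
    bound(10)
except TypeError as e:
    print(e)  # ok() got multiple values for argument 'a'
```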

My summary is that the change to allow keywords for functions is simple (at least in py3; indeed, my incremental commit referenced above does this), but for partial fns maybe not so much. I'm pretty confident I'm most of the way to accomplishing the former, but not the latter.

However, I have no substantial knowledge of the pyspark codebase, so you will likely have better luck there, should you go down that route :)

TL;DR: I could work on a PR to allow keyword arguments for Python 3 functions (not partials, and not py2), but that is likely too narrow a goal given the broader context.

One general question: how do we tend to think about the py2/py3 split for API quirks/features? Must everything that is added for py3 also be functional in py2?

@felixcheung (Member) commented Mar 25, 2018

> One general question: how do we tend to think about the py2/py3 split for API quirks/features? Must everything that is added for py3 also be functional in py2?

Ideally. Is there something you have in mind that would not work in py2 (aside from the kwargs in your example)?

@SparkQA commented Mar 25, 2018

Test build #88573 has finished for PR 20900 at commit a3da39c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member)

I think we should generally make everything work in both Python 2 and Python 3, but I want to know if there are special cases that I am missing, if there are any.

asfgit pushed a commit that referenced this pull request Mar 26, 2018
…ord args

Author: Michael (Stu) Stewart <[email protected]>

Closes #20900 from mstewart141/udfkw2.

(cherry picked from commit 087fb31)

Signed-off-by: hyukjinkwon <[email protected]>

@HyukjinKwon (Member)

Merged to master and branch-2.3 anyway.

@asfgit closed this in 087fb31 on Mar 26, 2018
@icexelloss (Contributor)

Thank you @mstewart141 for looking into this.

@HyukjinKwon should we open a JIRA for supporting kwargs and partial functions in Python UDFs? If I understand correctly, this is relevant to both regular Python UDFs and pandas UDFs, is that right?

@mstewart141 (Contributor, Author)

Partials (and callable objects) are supported in `udf` but not `pandas_udf`; keyword args are not supported by either. A sketch of the asymmetry follows below.
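
A minimal sketch added for illustration, not from the thread; it assumes a running `spark` session, and `AddOne` is a made-up example class:
```
from pyspark.sql.functions import udf

class AddOne(object):
    def __call__(self, x):
        return x + 1

df = spark.range(3)

# Callable objects work with the regular udf:
plus_one = udf(AddOne(), 'bigint')
df.select(plus_one('id')).show()

# As discussed above, at the time of this PR, passing the same object (or a
# functools.partial) to pandas_udf fails during argument inspection.
```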

@HyukjinKwon (Member)

@icexelloss, yup, ^ is correct. IIRC, we have some tests for normal udfs with callable objects and partial functions separately, but it seems the problem is in Pandas UDF. I think the fix itself will be relatively minimal (just my wild guess).

Would you be interested in doing this?

@HyukjinKwon (Member) commented Mar 27, 2018

To be clear, I think both of the functions below:

```
class F(object):
    def __call__(self, *args):
        ...

func = F()
```

```
def naive_func(a, b):
    ...

func = partial(naive_func, 1)
```

should work with Pandas UDF, but they seem not to, given my test in #20900 (comment).

@HyukjinKwon (Member)

The issue itself here (SPARK-23645) describes kwargs support in both UDF and Pandas UDF on the calling side. It seems not to be working, but the fix looks like it would be quite invasive and big, so I suggested fixing the documentation for now. Maybe we should revisit this in the future; let's monitor the mailing list and JIRAs.

#20900 (comment), together with #20900 (comment), is a separate issue about partial functions and callable objects in Pandas UDF, which I found during review.

@icexelloss (Contributor)

@HyukjinKwon thanks for the explanation. I will create a JIRA for partial functions and callable objects in Pandas UDF. I am happy to take a look at it.

@mstewart141 (Contributor, Author) commented Mar 27, 2018

@icexelloss: as a daily user of `pandas_udf`, the inability to use keyword arguments, and the difficulties around default arguments (due in part to the magic that converts string arguments to pd.Series, which doesn't apply to default args), are rather more annoying to me than the lack of support for partials and callables, which are more peripheral issues.

(Take this as just one data point; certainly, others may have differing opinions.)
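
An illustrative sketch of the default-argument pain point, added here rather than taken from the thread; it assumes a running `spark` session, and `scale` is a made-up example:
```
from pyspark.sql.functions import pandas_udf

def scale(v, factor=2):
    # Through pandas_udf, `v` arrives as a pd.Series because a column is named
    # at the call site; `factor` keeps its plain-Python default, and since
    # kwargs are rejected at the call site there is no per-call way to
    # override it short of defining a new function or partial.
    return v * factor

df = spark.range(5)
df.withColumn('scaled', pandas_udf(scale, 'bigint')('id')).show()  # factor is stuck at 2
```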

peter-toth pushed a commit to peter-toth/spark that referenced this pull request Oct 6, 2018
…ord args

Author: Michael (Stu) Stewart <[email protected]>

Closes apache#20900 from mstewart141/udfkw2.

(cherry picked from commit 087fb31)

Signed-off-by: hyukjinkwon <[email protected]>