@HyukjinKwon (Member) commented Feb 7, 2018

## What changes were proposed in this pull request?

This PR explicitly specifies the supported types in Pandas UDFs.
The main change is to add deduplicated, explicit type checking on `returnType` up front, together with documentation; along the way, it happened to fix multiple things.

  1. Currently, we don't support `BinaryType` in Pandas UDFs; for example:

     ```python
     from pyspark.sql.functions import pandas_udf
     pudf = pandas_udf(lambda x: x, "binary")
     df = spark.createDataFrame([[bytearray(1)]])
     df.select(pudf("_1")).show()
     ```
     ```
     ...
     TypeError: Unsupported type in conversion to Arrow: BinaryType
     ```

     We can document this behaviour in the guide.

  2. Also, the grouped aggregate Pandas UDF currently fails fast on `ArrayType`, but it seems we can support this case:

     ```python
     from pyspark.sql.functions import pandas_udf, PandasUDFType
     foo = pandas_udf(lambda v: v.mean(), 'array<double>', PandasUDFType.GROUPED_AGG)
     df = spark.range(100).selectExpr("id", "array(id) as value")
     df.groupBy("id").agg(foo("value")).show()
     ```
     ```
     ...
     NotImplementedError: ArrayType, StructType and MapType are not supported with PandasUDFType.GROUPED_AGG
     ```
  3. Since we can check the return type ahead of time, we can fail fast before actual execution:

     ```python
     # we can fail fast at this stage because we know the schema ahead
     pandas_udf(lambda x: x, BinaryType())
     ```
## How was this patch tested?

Manually tested, and unit tests for `BinaryType` and `ArrayType(...)` were added.
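For illustration, a minimal sketch of what such a fail-fast test could look like (the test name is made up and the message regex is an assumption; this is not necessarily the exact test added in this PR):

```python
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import BinaryType

# With the up-front returnType check, an unsupported type is rejected
# when the UDF is defined, before any data is processed.
def test_unsupported_binary_return_type(self):  # hypothetical test name
    with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
        pandas_udf(lambda x: x, BinaryType())
```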

@HyukjinKwon (Member Author) commented Feb 7, 2018

@ueshin, @BryanCutler and @icexelloss, mind taking a look when you are available, please?

```diff
 import pandas as pd
 result = f(*series)
-return pd.Series(result)
+return pd.Series([result])
```
@HyukjinKwon (Member Author):

This change seems to be required:

```python
>>> import numpy as np
>>> import pandas as pd
>>> pd.Series(np.array([1, 2, 3]))
0    1
1    2
2    3
dtype: int64
>>> pd.Series([np.array([1, 2, 3])])
0    [1, 2, 3]
dtype: object
>>> pd.Series(1)
0    1
dtype: int64
>>> pd.Series([1])
0    1
dtype: int64
```


SparkQA commented Feb 7, 2018

Test build #87160 has finished for PR 20531 at commit ec708d5.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 7, 2018

Test build #87163 has finished for PR 20531 at commit 7cbebaf.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

### Supported SQL Types

```diff
-Currently, all Spark SQL data types are supported by Arrow-based conversion except `MapType`,
+Currently, all Spark SQL data types are supported by Arrow-based conversion except `BinaryType`, `MapType`,
```
Contributor:

I thought binary type is supported... I am curious, what's the reason that it doesn't work now?

@HyukjinKwon (Member Author):

I was under the impression that we don't support this. It seems Arrow doesn't behave consistently with what Spark does. I think it's actually related to #20507.

I am careful about saying this, but I believe the root cause is how `str` is handled in Python 2. Technically it's bytes, but it's named string. As you might already know, due to this confusion, `unicode` became `str` and `str` became `bytes` in Python 3. Spark generally handles this as `StringType`, whereas Arrow seems to treat it as binary.

I think we shouldn't support this for now, until we get consistent behaviour.
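For reference, a tiny sketch of the naming flip described above (runnable under Python 3; the Python 2 behaviour is noted in comments since it needs a Python 2 interpreter):

```python
# Python 2: str was the byte-string type and unicode was the text type,
# e.g. isinstance('abc', str) and isinstance(u'abc', unicode) both held.
# Python 3: str is the text type and bytes is the byte type.
assert isinstance("abc", str)
assert isinstance(b"abc", bytes)
assert not isinstance(b"abc", str)
```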

Contributor:

I see. Thanks for the explanation.

Member:

I agree; we need to look into these details more before we can support this type.

Member:

Should `BinaryType` be added to the unsupported types with `arrow.enabled` in SQLConf.scala?

```diff
 def foo(x):
     return x
-self.assertEqual(foo.returnType, schema)
+self.assertEqual(foo.returnType, schema[0].dataType)
```
Contributor:

Should we just use `self.assertEqual(foo.returnType, DoubleType())`?

```diff
-from pyspark.sql.functions import pandas_udf, PandasUDFType
-df = self.data
+from pyspark.sql.functions import pandas_udf, PandasUDFType, array, col
+df = self.data.withColumn("arr", array(col("id")))
```
@icexelloss (Contributor) commented Feb 7, 2018:

minor: It seems a bit arbitrary to mix the array type into this test. Array probably belongs in a new test (if it doesn't exist yet): `test_array`, `test_complex_types`, or something like `test_all_types`.

```diff
 with QuietTest(self.sc):
     with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
-        @pandas_udf(ArrayType(DoubleType()), PandasUDFType.GROUPED_AGG)
+        @pandas_udf(ArrayType(ArrayType(TimestampType())), PandasUDFType.GROUPED_AGG)
```
Contributor:

Why is `ArrayType(TimestampType())` a special case that is not supported? (I hadn't fully tested this when implementing this feature; is only an array of primitives supported?)

@HyukjinKwon (Member Author):

It seems to be because we don't handle the timezone issue when the timestamp is nested. There are a few TODOs about this in several places, for example:

```python
# TODO: handle nested timestamps, such as ArrayType(TimestampType())?
```
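For illustration, with the up-front check a nested timestamp now fails at definition time (a sketch; the exact error message is an assumption):

```python
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import ArrayType, TimestampType

# to_arrow_type() cannot convert ArrayType(TimestampType()), so the
# declaration below raises NotImplementedError before any execution.
pandas_udf(lambda v: v.mean(), ArrayType(TimestampType()),
           PandasUDFType.GROUPED_AGG)
```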

```python
try:
    to_arrow_schema(self._returnType_placeholder)
except TypeError:
    raise NotImplementedError(
        "Invalid returnType with a grouped map Pandas UDF: "
```
Contributor:

nit: a grouped map Pandas UDF -> grouped map Pandas UDFs?


```python
result1 = df.groupby('id').agg(sum_udf(df.v), mean_udf(df.v)).sort('id')
mean_arr_udf = pandas_udf(
    self.pandas_agg_mean_udf.func,
```
Contributor:

For arrays, can we add tests for:

  • Type coercion, e.g., the specified type is `array<double>` and the returned array is `[0, 1, 2]`?
  • Exceptions, e.g., the function returns an array of mixed types like `[0, "hello"]`?

Contributor:

Btw, we can make this a follow-up, and I can do it too.

@HyukjinKwon (Member Author) commented Feb 7, 2018:

If you meant type coercion (did I understand correctly?), I already tested it locally. It doesn't seem to work properly. A similar thing was discussed in #20163 (comment) (thanks @ueshin).

Will reread the comments when I am more awake tomorrow ...

Contributor:

I think that with Pandas UDFs, certain type coercion is supported; e.g., when the user specifies "double" as the type and the function returns a `pd.Series` of int, it will automatically be cast to a `pd.Series` of double. This behavior is different from regular Python UDFs, which return null in this case. Most of the type coercion is done by pyarrow. (Btw, I think type coercion in Pandas UDFs is a huge improvement over Python UDFs, because that's one of the biggest frustrations our PySpark users have...)
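For example, a minimal sketch of the coercion being described (assuming an active SparkSession named `spark` with PyArrow installed):

```python
from pyspark.sql.functions import pandas_udf

# The function returns a pandas Series of int64, but the declared return
# type is double; pyarrow casts the ints to doubles rather than producing
# nulls, unlike a regular (row-at-a-time) Python UDF.
plus_one = pandas_udf(lambda s: s + 1, "double")

spark.range(3).select(plus_one("id").alias("v")).show()
# v comes back as 1.0, 2.0, 3.0
```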

Contributor:

Btw, if type coercion is not working with array type, I think it's still fine to allow using array type and fix type coercion separately.

@HyukjinKwon (Member Author) commented Feb 10, 2018:

Hm .. let's handle the type coercion stuff separately in another issue. I think we need another iteration for it. I'm actually not yet sure whether we should officially document and support type coercion, given our past discussion.

@icexelloss (Contributor) commented:

@HyukjinKwon Looks good to me at a high level. Left some comments.

@HyukjinKwon (Member Author) commented:

Yup, let me try to address them tomorrow. Thanks for your review.

"Invalid returnType with a grouped map Pandas UDF: "
"%s is not supported" % str(self._returnType_placeholder))
else:
raise TypeError("Invalid returnType for a grouped map Pandas "
Contributor:

nit: a grouped map Pandas UDF -> grouped map Pandas UDFs?

```python
try:
    to_arrow_type(self._returnType_placeholder)
except TypeError:
    raise NotImplementedError(
        "Invalid returnType with a scalar Pandas UDF: %s is "
```
Contributor:

ditto

```python
try:
    to_arrow_type(self._returnType_placeholder)
except TypeError:
    raise NotImplementedError(
        "Invalid returnType with a grouped aggregate Pandas UDF: "
```
Contributor:

ditto


SparkQA commented Feb 7, 2018

Test build #87164 has finished for PR 20531 at commit 68662ec.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

```python
raise NotImplementedError(
    "ArrayType, StructType and MapType are not supported with "
    "PandasUDFType.GROUPED_AGG")
if self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
```
@ueshin (Member) commented Feb 8, 2018:

nit: I'd prefer to keep the checks ordered by the definitions in `PythonEvalType`, if you don't have a special reason. E.g.,

```python
if self.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
    ...
elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
    ...
elif self.evalType == PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF:
    ...
```

```diff
 with self.assertRaisesRegexp(NotImplementedError, 'not supported'):
-    @pandas_udf(ArrayType(DoubleType()), PandasUDFType.GROUPED_AGG)
+    @pandas_udf(ArrayType(ArrayType(TimestampType())), PandasUDFType.GROUPED_AGG)
     def mean_and_std_udf(v):
```
Member:

nit: should rename this?

@HyukjinKwon (Member Author) commented:

@ueshin and @icexelloss, thanks for your review. I tried my best to address the comments.


SparkQA commented Feb 8, 2018

Test build #87214 has finished for PR 20531 at commit 36617e4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@BryanCutler (Member) left a comment:

LGTM; I just mentioned that we might want to include `BinaryType` as unsupported in the SQLConf doc. Thanks for doing some cleanup too!

```diff
 DataFrame to the driver program and should be done on a small subset of the data. Not all Spark
 data types are currently supported and an error can be raised if a column has an unsupported type,
-see [Supported Types](#supported-sql-arrow-types). If an error occurs during `createDataFrame()`,
+see [Supported SQL Types](#supported-sql-arrow-types). If an error occurs during `createDataFrame()`,
```
Member:

Nice catch!


@icexelloss (Contributor) commented Feb 9, 2018

@HyukjinKwon LGTM! My only comment left is #20531 (comment), but we can have a separate PR for testing type coercion with array type.


SparkQA commented Feb 10, 2018

Test build #87280 has finished for PR 20531 at commit 07f2d78.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member Author) commented:

retest this please


SparkQA commented Feb 10, 2018

Test build #87281 has finished for PR 20531 at commit 07f2d78.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member Author) commented:

@ueshin, does this look fine to you too?

@ueshin (Member) commented Feb 12, 2018

@HyukjinKwon Yes, LGTM.

@HyukjinKwon (Member Author) commented:

retest this please


SparkQA commented Feb 12, 2018

Test build #87325 has finished for PR 20531 at commit 07f2d78.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member Author) commented:

retest this please


SparkQA commented Feb 12, 2018

Test build #87328 has finished for PR 20531 at commit 07f2d78.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Feb 12, 2018

Test build #87329 has finished for PR 20531 at commit 07f2d78.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon (Member Author) commented:

Merged to master.

@asfgit closed this in c338c8c Feb 12, 2018
@HyukjinKwon (Member Author) commented:

Thank you for reviewing this, @icexelloss, @ueshin and @BryanCutler.

HyukjinKwon added a commit to HyukjinKwon/spark that referenced this pull request Feb 12, 2018

Author: hyukjinkwon <[email protected]>

Closes apache#20531 from HyukjinKwon/pudf-cleanup.

(cherry picked from commit c338c8c)
Signed-off-by: hyukjinkwon <[email protected]>
```python
    arrow_type = pa.timestamp('us', tz='UTC')
elif type(dt) == ArrayType:
    if type(dt.elementType) == TimestampType:
        raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
```
Member:

What is the behavior before this PR?

@HyukjinKwon (Member Author):

I think it's the timestamp localisation issue. See #20531 (comment).

asfgit pushed a commit that referenced this pull request Feb 13, 2018
…in Pandas UDFs

This PR backports #20531.

Author: hyukjinkwon <[email protected]>

Closes #20588 from HyukjinKwon/PR_TOOL_PICK_PR_20531_BRANCH-2.3.
@HyukjinKwon deleted the pudf-cleanup branch October 16, 2018 12:44