-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-25461][PySpark][SQL] Add document for mismatch between return type of Pandas.Series and return type of pandas udf #22610
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #96849 has finished for PR 22610 at commit
|
|
cc @HyukjinKwon Can you take a look at this when you have time? Thanks. |
|
Test build #96853 has finished for PR 22610 at commit
|
|
Yea, I will do this week. Sorry I missed the cc in the JIRA. |
|
The idea sounds good to me from a cursory look for now. |
python/pyspark/worker.py
Outdated
| both_are_timestamp = pa.types.is_timestamp(arrow_type_of_result) and \ | ||
| pa.types.is_timestamp(arrow_return_type) | ||
| if not both_are_timestamp and arrow_return_type != arrow_type_of_result: | ||
| print("WARN: Arrow type %s of return Pandas.Series of the user-defined function's " |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this appear when being run in an executor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, but as the other print usage in worker.py, I think this can be seen in the worker log?
This is also useful when testing in pyspark shell.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, it might be useful to see the warning if doing some local tests etc. My only concern is that users might be confused why they see a warning locally, but doesn't appear in logs.. Man, it would be nice to have some proper python logging for this!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hmm, I'm neutral on whether we should display this warning message, before we have an option to check the unsafe conversion by PyArrow. @HyukjinKwon if you are also supportive, I will remove this and leave this PR as documentation only.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes .. I support to just fix the doc first here only and make a PR separately later if needed.
|
Thanks for looking into this @viirya ! I agree that there are lots of cases where casting to another type is intentional and works fine, so this isn't a bug. The only other idea I have is to provide an option to raise an error if the type needs to be cast. That might be possible with pyarrow right now, but I'm not sure how useful it would be. |
|
Thanks @BryanCutler! Yes, this should not be a bug but is used as a warning to users that there might be some type conversion they are not noticed at first glance on the Pandas UDFs. For now the conversion is silently done behind the scene and as the case in the JIRA shows it might not be easily noticed that Pandas.Series from UDFs isn't matched with defined UDFs' return types. |
Actually sounds good to me. I think the problem is we are not quite clear about what we want when the type is mismatched in UDFs (see also #20163 for a reminder). IIRC, we rather roughly agreed upon documenting exact type match (and only allowing exact type match (?)). @viirya and @BryanCutler, how about we document that return types should be matched (we can leave a chart or map referring One additional improvement might be .. we describe that type casting behaviour is .. say .. not guaranteed but I am not sure how we can nicely document this. Probably only mentioning the type mapping is fine. |
python/pyspark/worker.py
Outdated
| print("WARN: Can't infer arrow type of Pandas.Series's dtype: %s, which might not " | ||
| "match the arrow type %s of defined return type %s" % (result.dtype, | ||
| arrow_return_type, | ||
| return_type), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would fix the indentation here tho :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok. thanks. :-)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry I may misunderstand, do you mean L113 and L114 should be aligned with L112? But after that, lint-python will complain.
|
@HyukjinKwon Thanks! I agree that having document of this is definitely useful. I will try to add it and let's see if it is ok for you. I think it is good to mention that users are responsible for ensuring return type of Pandas UDF matches defined return type. The mapping is good reference to show in the document too. |
|
So pyarrow just added an option when converting from Pandas to raise an error for unsafe casts. I'd have to try it out to see if it would prevent this case though apache/arrow#2504. It's a common option when working with Pandas, so users might be familiar with it and might be more useful to expose this as a Spark conf rather than checking the types. Btw, I'm working on fixing the float to boolean conversion here apache/arrow#2698 |
|
Thanks, @BryanCutler. WDYT about documenting the type map thing? |
|
Thanks @BryanCutler! Looks like an useful option. It is pretty new one, is it said we need to upgrade to latest PyArrow in order to use it? Since it is an option at |
I think that would help in the cases of dates/times because those can get a little confusing. For primitives, I think it's pretty straightforward, so I don't know how much that would help. Maybe it we just highlight some potential pitfalls? The problem here was that when a null value was introduced, Pandas automatically converted the data to float to insert a NaN value, then the Arrow conversion from float to bool is broken. When the data just had ints, the conversion seems ok, so it ended up giving inconsistent confusing results. Not sure what might have helped here, it's just a nasty bug :) |
Yeah, it's part of pyarrow.Array now, but will only be in the 0.11.0 release so we would have to do it after the next upgrade. |
Then I think we can wait for next upgrade to use this feature of pyarrow.Array and raise exception on unsafe cast. |
|
I think it is more reasonable to use the option when converting from Pandas to raise an error for unsafe casts. It should be better than to display warning message. Not sure how long before next upgrade, do you think we should add some words into document to explain this pitfalls especially? Or we just leave it until next upgrade? @HyukjinKwon @BryanCutler |
|
Btw, I checked our |
|
One clear thing looks adding some documentation .. |
|
So I've added a bit document for this. @HyukjinKwon @BryanCutler please check it when you have time. |
|
Test build #96935 has finished for PR 22610 at commit
|
python/pyspark/sql/functions.py
Outdated
| .. note:: The data type of returned `pandas.Series` from the user-defined functions should be | ||
| matched with defined returnType. When there is mismatch between them, it is not guaranteed | ||
| that the conversion by SparkSQL during serialization is correct at all and users might get |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe I am concerning too much .. but how about just say ..
... defined returnType (see :meth:`types.to_arrow_type` and :meth:`types.from_arrow_type`).
When there is mismatch between them, the conversion is not guaranteed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
instead of saying "conversion is not guaranteed" which sounds like results might be arbitrary, could we say "..mismatch between them, an attempt will be made to cast the data and results should be checked for accuracy."?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
an attempt will be made to cast the data and results should be checked for accuracy."
it sounds like the casting is intentional. I think the casting logic is not that clear as far as I can tell, comparing SQL casting logic. Can we leave this not guaranteed for now and document the casting logic here instead? Does Arrow have some kind of documentation for type conversion BTW?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, as actually we don't intentionally cast the returned data.
How about:
When there is mismatch between them, Spark might do conversion on returned data.
The conversion is not guaranteed to be correct and results should be checked for accuracy by users.
HyukjinKwon
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good except the comments above.
|
Test build #97044 has finished for PR 22610 at commit
|
| matched with defined returnType (see :meth:`types.to_arrow_type` and | ||
| :meth:`types.from_arrow_type`). When there is mismatch between them, Spark might do | ||
| conversion on returned data. The conversion is not guaranteed to be correct and results | ||
| should be checked for accuracy by users. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am merging this since this describes the current status but let's make it clear and try to get rid of this note within 3.0.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I agreed. If there is next upgrade of PyArrow available, we may be able to provide the option to raise error when an unsafe cast.
|
Merged to master. |
…hon data and SQL types in normal UDFs ### What changes were proposed in this pull request? We are facing some problems about type conversions between Python data and SQL types in UDFs (Pandas UDFs as well). It's even difficult to identify the problems (see #20163 and #22610). This PR targets to internally document the type conversion table. Some of them looks buggy and we should fix them. ```python import sys import array import datetime from decimal import Decimal from pyspark.sql import Row from pyspark.sql.types import * from pyspark.sql.functions import udf if sys.version >= '3': long = int data = [ None, True, 1, long(1), "a", u"a", datetime.date(1970, 1, 1), datetime.datetime(1970, 1, 1, 0, 0), 1.0, array.array("i", [1]), [1], (1,), bytearray([65, 66, 67]), Decimal(1), {"a": 1}, Row(kwargs=1), Row("namedtuple")(1), ] types = [ BooleanType(), ByteType(), ShortType(), IntegerType(), LongType(), StringType(), DateType(), TimestampType(), FloatType(), DoubleType(), ArrayType(IntegerType()), BinaryType(), DecimalType(10, 0), MapType(StringType(), IntegerType()), StructType([StructField("_1", IntegerType())]), ] df = spark.range(1) results = [] count = 0 total = len(types) * len(data) spark.sparkContext.setLogLevel("FATAL") for t in types: result = [] for v in data: try: row = df.select(udf(lambda: v, t)()).first() ret_str = repr(row[0]) except Exception: ret_str = "X" result.append(ret_str) progress = "SQL Type: [%s]\n Python Value: [%s(%s)]\n Result Python Value: [%s]" % ( t.simpleString(), str(v), type(v).__name__, ret_str) count += 1 print("%s/%s:\n %s" % (count, total, progress)) results.append([t.simpleString()] + list(map(str, result))) schema = ["SQL Type \\ Python Value(Type)"] + list(map(lambda v: "%s(%s)" % (str(v), type(v).__name__), data)) strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 20, False) print("\n".join(map(lambda line: " # %s # noqa" % line, strings.strip().split("\n")))) ``` This table was generated under Python 2 but the code above is Python 3 compatible as well. ## How was this patch tested? Manually tested and lint check. Closes #22655 from HyukjinKwon/SPARK-25666. Authored-by: hyukjinkwon <[email protected]> Signed-off-by: hyukjinkwon <[email protected]>
…das data and SQL types in Pandas UDFs ## What changes were proposed in this pull request? We are facing some problems about type conversions between Pandas data and SQL types in Pandas UDFs. It's even difficult to identify the problems (see #20163 and #22610). This PR targets to internally document the type conversion table. Some of them looks buggy and we should fix them. Table can be generated via the codes below: ```python from pyspark.sql.types import * from pyspark.sql.functions import pandas_udf columns = [ ('none', 'object(NoneType)'), ('bool', 'bool'), ('int8', 'int8'), ('int16', 'int16'), ('int32', 'int32'), ('int64', 'int64'), ('uint8', 'uint8'), ('uint16', 'uint16'), ('uint32', 'uint32'), ('uint64', 'uint64'), ('float64', 'float16'), ('float64', 'float32'), ('float64', 'float64'), ('date', 'datetime64[ns]'), ('tz_aware_dates', 'datetime64[ns, US/Eastern]'), ('string', 'object(string)'), ('decimal', 'object(Decimal)'), ('array', 'object(array[int32])'), ('float128', 'float128'), ('complex64', 'complex64'), ('complex128', 'complex128'), ('category', 'category'), ('tdeltas', 'timedelta64[ns]'), ] def create_dataframe(): import pandas as pd import numpy as np import decimal pdf = pd.DataFrame({ 'none': [None, None], 'bool': [True, False], 'int8': np.arange(1, 3).astype('int8'), 'int16': np.arange(1, 3).astype('int16'), 'int32': np.arange(1, 3).astype('int32'), 'int64': np.arange(1, 3).astype('int64'), 'uint8': np.arange(1, 3).astype('uint8'), 'uint16': np.arange(1, 3).astype('uint16'), 'uint32': np.arange(1, 3).astype('uint32'), 'uint64': np.arange(1, 3).astype('uint64'), 'float16': np.arange(1, 3).astype('float16'), 'float32': np.arange(1, 3).astype('float32'), 'float64': np.arange(1, 3).astype('float64'), 'float128': np.arange(1, 3).astype('float128'), 'complex64': np.arange(1, 3).astype('complex64'), 'complex128': np.arange(1, 3).astype('complex128'), 'string': list('ab'), 'array': pd.Series([np.array([1, 2, 3], dtype=np.int32), np.array([1, 2, 3], dtype=np.int32)]), 'decimal': pd.Series([decimal.Decimal('1'), decimal.Decimal('2')]), 'date': pd.date_range('19700101', periods=2).values, 'category': pd.Series(list("AB")).astype('category')}) pdf['tdeltas'] = [pdf.date.diff()[1], pdf.date.diff()[0]] pdf['tz_aware_dates'] = pd.date_range('19700101', periods=2, tz='US/Eastern') return pdf types = [ BooleanType(), ByteType(), ShortType(), IntegerType(), LongType(), FloatType(), DoubleType(), DateType(), TimestampType(), StringType(), DecimalType(10, 0), ArrayType(IntegerType()), MapType(StringType(), IntegerType()), StructType([StructField("_1", IntegerType())]), BinaryType(), ] df = spark.range(2).repartition(1) results = [] count = 0 total = len(types) * len(columns) values = [] spark.sparkContext.setLogLevel("FATAL") for t in types: result = [] for column, pandas_t in columns: v = create_dataframe()[column][0] values.append(v) try: row = df.select(pandas_udf(lambda _: create_dataframe()[column], t)(df.id)).first() ret_str = repr(row[0]) except Exception: ret_str = "X" result.append(ret_str) progress = "SQL Type: [%s]\n Pandas Value(Type): %s(%s)]\n Result Python Value: [%s]" % ( t.simpleString(), v, pandas_t, ret_str) count += 1 print("%s/%s:\n %s" % (count, total, progress)) results.append([t.simpleString()] + list(map(str, result))) schema = ["SQL Type \\ Pandas Value(Type)"] + list(map(lambda values_column: "%s(%s)" % (values_column[0], values_column[1][1]), zip(values, columns))) strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 20, False) print("\n".join(map(lambda line: " # %s # noqa" % line, strings.strip().split("\n")))) ``` This code is compatible with both Python 2 and 3 but the table was generated under Python 2. ## How was this patch tested? Manually tested and lint check. Closes #22795 from HyukjinKwon/SPARK-25798. Authored-by: hyukjinkwon <[email protected]> Signed-off-by: Bryan Cutler <[email protected]>
…type of Pandas.Series and return type of pandas udf ## What changes were proposed in this pull request? For Pandas UDFs, we get arrow type from defined Catalyst return data type of UDFs. We use this arrow type to do serialization of data. If the defined return data type doesn't match with actual return type of Pandas.Series returned by Pandas UDFs, it has a risk to return incorrect data from Python side. Currently we don't have reliable approach to check if the data conversion is safe or not. We leave some document to notify this to users for now. When there is next upgrade of PyArrow available we can use to check it, we should add the option to check it. ## How was this patch tested? Only document change. Closes apache#22610 from viirya/SPARK-25461. Authored-by: Liang-Chi Hsieh <[email protected]> Signed-off-by: hyukjinkwon <[email protected]>
…hon data and SQL types in normal UDFs ### What changes were proposed in this pull request? We are facing some problems about type conversions between Python data and SQL types in UDFs (Pandas UDFs as well). It's even difficult to identify the problems (see apache#20163 and apache#22610). This PR targets to internally document the type conversion table. Some of them looks buggy and we should fix them. ```python import sys import array import datetime from decimal import Decimal from pyspark.sql import Row from pyspark.sql.types import * from pyspark.sql.functions import udf if sys.version >= '3': long = int data = [ None, True, 1, long(1), "a", u"a", datetime.date(1970, 1, 1), datetime.datetime(1970, 1, 1, 0, 0), 1.0, array.array("i", [1]), [1], (1,), bytearray([65, 66, 67]), Decimal(1), {"a": 1}, Row(kwargs=1), Row("namedtuple")(1), ] types = [ BooleanType(), ByteType(), ShortType(), IntegerType(), LongType(), StringType(), DateType(), TimestampType(), FloatType(), DoubleType(), ArrayType(IntegerType()), BinaryType(), DecimalType(10, 0), MapType(StringType(), IntegerType()), StructType([StructField("_1", IntegerType())]), ] df = spark.range(1) results = [] count = 0 total = len(types) * len(data) spark.sparkContext.setLogLevel("FATAL") for t in types: result = [] for v in data: try: row = df.select(udf(lambda: v, t)()).first() ret_str = repr(row[0]) except Exception: ret_str = "X" result.append(ret_str) progress = "SQL Type: [%s]\n Python Value: [%s(%s)]\n Result Python Value: [%s]" % ( t.simpleString(), str(v), type(v).__name__, ret_str) count += 1 print("%s/%s:\n %s" % (count, total, progress)) results.append([t.simpleString()] + list(map(str, result))) schema = ["SQL Type \\ Python Value(Type)"] + list(map(lambda v: "%s(%s)" % (str(v), type(v).__name__), data)) strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 20, False) print("\n".join(map(lambda line: " # %s # noqa" % line, strings.strip().split("\n")))) ``` This table was generated under Python 2 but the code above is Python 3 compatible as well. ## How was this patch tested? Manually tested and lint check. Closes apache#22655 from HyukjinKwon/SPARK-25666. Authored-by: hyukjinkwon <[email protected]> Signed-off-by: hyukjinkwon <[email protected]>
…das data and SQL types in Pandas UDFs ## What changes were proposed in this pull request? We are facing some problems about type conversions between Pandas data and SQL types in Pandas UDFs. It's even difficult to identify the problems (see apache#20163 and apache#22610). This PR targets to internally document the type conversion table. Some of them looks buggy and we should fix them. Table can be generated via the codes below: ```python from pyspark.sql.types import * from pyspark.sql.functions import pandas_udf columns = [ ('none', 'object(NoneType)'), ('bool', 'bool'), ('int8', 'int8'), ('int16', 'int16'), ('int32', 'int32'), ('int64', 'int64'), ('uint8', 'uint8'), ('uint16', 'uint16'), ('uint32', 'uint32'), ('uint64', 'uint64'), ('float64', 'float16'), ('float64', 'float32'), ('float64', 'float64'), ('date', 'datetime64[ns]'), ('tz_aware_dates', 'datetime64[ns, US/Eastern]'), ('string', 'object(string)'), ('decimal', 'object(Decimal)'), ('array', 'object(array[int32])'), ('float128', 'float128'), ('complex64', 'complex64'), ('complex128', 'complex128'), ('category', 'category'), ('tdeltas', 'timedelta64[ns]'), ] def create_dataframe(): import pandas as pd import numpy as np import decimal pdf = pd.DataFrame({ 'none': [None, None], 'bool': [True, False], 'int8': np.arange(1, 3).astype('int8'), 'int16': np.arange(1, 3).astype('int16'), 'int32': np.arange(1, 3).astype('int32'), 'int64': np.arange(1, 3).astype('int64'), 'uint8': np.arange(1, 3).astype('uint8'), 'uint16': np.arange(1, 3).astype('uint16'), 'uint32': np.arange(1, 3).astype('uint32'), 'uint64': np.arange(1, 3).astype('uint64'), 'float16': np.arange(1, 3).astype('float16'), 'float32': np.arange(1, 3).astype('float32'), 'float64': np.arange(1, 3).astype('float64'), 'float128': np.arange(1, 3).astype('float128'), 'complex64': np.arange(1, 3).astype('complex64'), 'complex128': np.arange(1, 3).astype('complex128'), 'string': list('ab'), 'array': pd.Series([np.array([1, 2, 3], dtype=np.int32), np.array([1, 2, 3], dtype=np.int32)]), 'decimal': pd.Series([decimal.Decimal('1'), decimal.Decimal('2')]), 'date': pd.date_range('19700101', periods=2).values, 'category': pd.Series(list("AB")).astype('category')}) pdf['tdeltas'] = [pdf.date.diff()[1], pdf.date.diff()[0]] pdf['tz_aware_dates'] = pd.date_range('19700101', periods=2, tz='US/Eastern') return pdf types = [ BooleanType(), ByteType(), ShortType(), IntegerType(), LongType(), FloatType(), DoubleType(), DateType(), TimestampType(), StringType(), DecimalType(10, 0), ArrayType(IntegerType()), MapType(StringType(), IntegerType()), StructType([StructField("_1", IntegerType())]), BinaryType(), ] df = spark.range(2).repartition(1) results = [] count = 0 total = len(types) * len(columns) values = [] spark.sparkContext.setLogLevel("FATAL") for t in types: result = [] for column, pandas_t in columns: v = create_dataframe()[column][0] values.append(v) try: row = df.select(pandas_udf(lambda _: create_dataframe()[column], t)(df.id)).first() ret_str = repr(row[0]) except Exception: ret_str = "X" result.append(ret_str) progress = "SQL Type: [%s]\n Pandas Value(Type): %s(%s)]\n Result Python Value: [%s]" % ( t.simpleString(), v, pandas_t, ret_str) count += 1 print("%s/%s:\n %s" % (count, total, progress)) results.append([t.simpleString()] + list(map(str, result))) schema = ["SQL Type \\ Pandas Value(Type)"] + list(map(lambda values_column: "%s(%s)" % (values_column[0], values_column[1][1]), zip(values, columns))) strings = spark.createDataFrame(results, schema=schema)._jdf.showString(20, 20, False) print("\n".join(map(lambda line: " # %s # noqa" % line, strings.strip().split("\n")))) ``` This code is compatible with both Python 2 and 3 but the table was generated under Python 2. ## How was this patch tested? Manually tested and lint check. Closes apache#22795 from HyukjinKwon/SPARK-25798. Authored-by: hyukjinkwon <[email protected]> Signed-off-by: Bryan Cutler <[email protected]>
What changes were proposed in this pull request?
For Pandas UDFs, we get arrow type from defined Catalyst return data type of UDFs. We use this arrow type to do serialization of data. If the defined return data type doesn't match with actual return type of Pandas.Series returned by Pandas UDFs, it has a risk to return incorrect data from Python side.
Currently we don't have reliable approach to check if the data conversion is safe or not. We leave some document to notify this to users for now. When there is next upgrade of PyArrow available we can use to check it, we should add the option to check it.
How was this patch tested?
Only document change.