# [SPARK-16542][SQL][PYSPARK] Fix bugs about types that result an array of null when creating DataFrame using python #18444
Changes from all commits.
## core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
```diff
@@ -55,13 +55,12 @@ private[spark] object SerDeUtil extends Logging {
   //    {'d', sizeof(double), d_getitem, d_setitem},
   //    {'\0', 0, 0, 0} /* Sentinel */
   //  };
-  // TODO: support Py_UNICODE with 2 bytes
   val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
-    Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
+    Map('B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
       'L' -> 11, 'l' -> 13, 'f' -> 15, 'd' -> 17, 'u' -> 21
     )
   } else {
-    Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
+    Map('B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8,
       'L' -> 10, 'l' -> 12, 'f' -> 14, 'd' -> 16, 'u' -> 20
     )
   }
```
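These integer values select among the machine-format codes that Python's `array` module uses when pickling, which encode element size plus byte order. As a quick illustration (not from the PR) of the platform-dependent inputs behind that table, the element size of each typecode and the native byte order can be inspected from Python:

```python
import sys
from array import array

# Each typecode's element size is C-implementation dependent; together with
# the native byte order it determines which machine format a pickled array
# payload is in.
for typecode in 'bBhHiIlLfd':
    print(typecode, array(typecode).itemsize, sys.byteorder)
```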
```diff
@@ -72,7 +71,20 @@ private[spark] object SerDeUtil extends Logging {
       val typecode = args(0).asInstanceOf[String].charAt(0)
       // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate correctly
       val data = args(1).asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1)
-      construct(typecode, machineCodes(typecode), data)
+      if (typecode == 'c') {
+        // It seems like the pickle of pypy uses the similar protocol to Python 2.6, which uses
+        // a string for array data instead of list as Python 2.7, and handles an array of
+        // typecode 'c' as 1-byte character.
+        val result = new Array[Char](data.length)
+        var i = 0
+        while (i < data.length) {
+          result(i) = data(i).toChar
+          i += 1
+        }
+        result
+      } else {
+        construct(typecode, machineCodes(typecode), data)
+      }
     } else if (args.length == 2 && args(0) == "l") {
       // On Python 2, an array of typecode 'l' should be handled as long rather than int.
       val values = args(1).asInstanceOf[JArrayList[_]]
```

**Contributor (Author):** Can anyone explain why […]
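A rough Python analogue of the new `'c'` branch above, assuming (as the code comment does) that the PyPy pickler, like CPython 2.6, ships the payload of a typecode-`'c'` array as a plain byte string; the helper name is hypothetical:

```python
def reconstruct_c_array(typecode, data):
    # Hypothetical mirror of the Scala branch: `data` arrives as Latin-1
    # bytes, and typecode 'c' means one 1-byte character per element.
    if typecode == 'c':
        return [chr(b) for b in bytearray(data)]
    # Anything else would go through the machineCodes path instead.
    raise NotImplementedError("non-'c' typecodes use machineCodes")

print(reconstruct_c_array('c', b'abc'))  # ['a', 'b', 'c']
```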
## python/pyspark/sql/tests.py
```diff
@@ -30,8 +30,10 @@
 import functools
 import time
 import datetime
+import array
+import ctypes
 import py4j

 try:
     import xmlrunner
 except ImportError:
```
```diff
@@ -58,6 +60,8 @@
 from pyspark.sql import SparkSession, SQLContext, HiveContext, Column, Row
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type, _make_type_verifier
+from pyspark.sql.types import _array_signed_int_typecode_ctype_mappings, _array_type_mappings
+from pyspark.sql.types import _array_unsigned_int_typecode_ctype_mappings
 from pyspark.tests import QuietTest, ReusedPySparkTestCase, SparkSubmitTests
 from pyspark.sql.functions import UserDefinedFunction, sha2, lit
 from pyspark.sql.window import Window
```
```diff
@@ -2312,6 +2316,97 @@ def test_BinaryType_serialization(self):
         df = self.spark.createDataFrame(data, schema=schema)
         df.collect()

+    # test for SPARK-16542
+    def test_array_types(self):
```
**Member:** Minor suggestions:

```diff
@@ -2327,23 +2327,29 @@ class SQLTests(ReusedPySparkTestCase):
         # See: https://docs.python.org/2/library/array.html
         def assertCollectSuccess(typecode, value):
-            a = array.array(typecode, [value])
-            row = Row(myarray=a)
+            row = Row(myarray=array.array(typecode, [value]))
             df = self.spark.createDataFrame([row])
-            self.assertEqual(df.collect()[0]["myarray"][0], value)
+            self.assertEqual(df.first()["myarray"][0], value)

-        supported_types = []
-
-        # test string types
+        # supported string types
+        #
+        # blabla... "u" will be removed in python 4 blabla...
+        # and "c" not supported in python 3 blabla ...
+        supported_string_types = []
         if sys.version_info[0] < 4:
-            supported_types += ['u']
+            supported_string_types += ['u']
+            # test unicode
             assertCollectSuccess('u', u"a")
         if sys.version_info[0] < 3:
-            supported_types += ['c']
+            supported_string_types += ['c']
+            # test string
             assertCollectSuccess('c', "a")

+        # supported float and double
+        #
+        # tests float max min blabla
+        supported_fractional_types = ['f', 'd']
         # test float and double, assuming IEEE 754 floating-point format
-        supported_types += ['f', 'd']
         assertCollectSuccess('f', ctypes.c_float(1e+38).value)
         assertCollectSuccess('f', ctypes.c_float(1e-38).value)
         assertCollectSuccess('f', ctypes.c_float(1.123456).value)
@@ -2351,33 +2357,48 @@ class SQLTests(ReusedPySparkTestCase):
         assertCollectSuccess('d', sys.float_info.min)
         assertCollectSuccess('d', sys.float_info.epsilon)

+        # supported int types
+        #
+        # blabla .. only supported int types.. blabla..
+        supported_int_types = list(
+            set(_array_int_typecode_ctype_mappings.keys())
+            .intersection(set(_array_type_mappings.keys())))
         # test int types
-        supported_int = list(set(_array_int_typecode_ctype_mappings.keys()).
-                             intersection(set(_array_type_mappings.keys())))
-        supported_types += supported_int
-        for i in supported_int:
-            ctype = _array_int_typecode_ctype_mappings[i]
-            if i.isupper():  # unsigned
-                assertCollectSuccess(i, 2 ** (ctypes.sizeof(ctype) * 8) - 1)
-            else:  # signed
+        for t in supported_int_types:
+            ctype = _array_int_typecode_ctype_mappings[t]
+            if t.isupper():
+                # test unsigned int types
+                assertCollectSuccess(t, 2 ** (ctypes.sizeof(ctype) * 8) - 1)
+            else:
+                # test signed int types
                 max_val = 2 ** (ctypes.sizeof(ctype) * 8 - 1)
-                assertCollectSuccess(i, max_val - 1)
-                assertCollectSuccess(i, -max_val)
-
-        # make sure that the test case cover all supported types
+                assertCollectSuccess(t, max_val - 1)
+                assertCollectSuccess(t, -max_val)
+
+        # all supported types
+        #
+        # Make sure all the supported types are blabla ...
+        supported_types = (supported_string_types +
+                           supported_fractional_types +
+                           supported_int_types)
+        # test these are all supported types
         self.assertEqual(set(supported_types), set(_array_type_mappings.keys()))

-        # test unsupported types
+        # all unsupported types
+        #
+        # ... ... types are not supported in python 2/3 blabla.
         if sys.version_info[0] < 3:
-            all_type_codes = set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd'])
+            all_types = set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd'])
         else:
-            all_type_codes = set(array.typecodes)
-        unsupported_types = all_type_codes - set(supported_types)
+            all_types = set(array.typecodes)
+        unsupported_types = all_types - set(supported_types)
+        # test unsupported types
         for t in unsupported_types:
             with self.assertRaises(TypeError):
                 a = array.array(t)
                 self.spark.createDataFrame([Row(myarray=a)]).collect()
```
```diff
+        # This test need to make sure that the Scala type selected is at least
+        # as large as the python's types. This is necessary because python's
+        # array types depend on C implementation on the machine. Therefore there
+        # is no machine independent correspondence between python's array types
+        # and Scala types.
+        # See: https://docs.python.org/2/library/array.html
```

**Contributor (Author):** @ueshin The answer to your question is explained here, a couple lines of comments I just added.
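The platform dependence called out in that comment is easy to observe directly; for instance, typecode `'l'` maps to C `long`, whose size differs across common platforms (a small illustration, not part of the diff):

```python
from array import array

# C long is typically 8 bytes on 64-bit Linux/macOS but 4 bytes on Windows,
# so no fixed typecode -> Scala type table can work on every machine.
print(array('l').itemsize)
```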
```diff
+        def assertCollectSuccess(typecode, value):
+            row = Row(myarray=array.array(typecode, [value]))
+            df = self.spark.createDataFrame([row])
+            self.assertEqual(df.first()["myarray"][0], value)
+
+        # supported string types
+        #
+        # String types in python's array are "u" for Py_UNICODE and "c" for char.
+        # "u" will be removed in python 4, and "c" is not supported in python 3.
+        supported_string_types = []
+        if sys.version_info[0] < 4:
+            supported_string_types += ['u']
+            # test unicode
+            assertCollectSuccess('u', u'a')
+        if sys.version_info[0] < 3:
+            supported_string_types += ['c']
+            # test string
+            assertCollectSuccess('c', 'a')
+
+        # supported float and double
+        #
+        # Test max, min, and precision for float and double, assuming IEEE 754
+        # floating-point format.
+        supported_fractional_types = ['f', 'd']
+        assertCollectSuccess('f', ctypes.c_float(1e+38).value)
+        assertCollectSuccess('f', ctypes.c_float(1e-38).value)
+        assertCollectSuccess('f', ctypes.c_float(1.123456).value)
+        assertCollectSuccess('d', sys.float_info.max)
+        assertCollectSuccess('d', sys.float_info.min)
+        assertCollectSuccess('d', sys.float_info.epsilon)
+
+        # supported signed int types
+        #
+        # The size of C types changes with implementation, we need to make sure
+        # that there is no overflow error on the platform running this test.
+        supported_signed_int_types = list(
+            set(_array_signed_int_typecode_ctype_mappings.keys())
+            .intersection(set(_array_type_mappings.keys())))
+        for t in supported_signed_int_types:
+            ctype = _array_signed_int_typecode_ctype_mappings[t]
+            max_val = 2 ** (ctypes.sizeof(ctype) * 8 - 1)
+            assertCollectSuccess(t, max_val - 1)
+            assertCollectSuccess(t, -max_val)
+
+        # supported unsigned int types
+        #
+        # JVM does not have unsigned types. We need to be very careful to make
+        # sure that there is no overflow error.
+        supported_unsigned_int_types = list(
+            set(_array_unsigned_int_typecode_ctype_mappings.keys())
+            .intersection(set(_array_type_mappings.keys())))
+        for t in supported_unsigned_int_types:
+            ctype = _array_unsigned_int_typecode_ctype_mappings[t]
+            assertCollectSuccess(t, 2 ** (ctypes.sizeof(ctype) * 8) - 1)
+
+        # all supported types
+        #
+        # Make sure the types tested above:
+        # 1. are all supported types
+        # 2. cover all supported types
+        supported_types = (supported_string_types +
+                           supported_fractional_types +
+                           supported_signed_int_types +
+                           supported_unsigned_int_types)
+        self.assertEqual(set(supported_types), set(_array_type_mappings.keys()))
+
+        # all unsupported types
+        #
+        # Keys in _array_type_mappings is a complete list of all supported types,
+        # and types not in _array_type_mappings are considered unsupported.
+        # `array.typecodes` are not supported in python 2.
+        if sys.version_info[0] < 3:
+            all_types = set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd'])
+        else:
+            all_types = set(array.typecodes)
+        unsupported_types = all_types - set(supported_types)
+        # test unsupported types
+        for t in unsupported_types:
+            with self.assertRaises(TypeError):
+                a = array.array(t)
+                self.spark.createDataFrame([Row(myarray=a)]).collect()
+
     def test_bucketed_write(self):
         data = [
             (1, "foo", 3.0), (2, "foo", 5.0),
```
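The signed-integer loop in the test reduces to probing each typecode with the extreme values its C type can represent; the bound arithmetic, pulled out as a standalone sketch:

```python
import ctypes

# An N-bit signed C type holds values in [-2**(N-1), 2**(N-1) - 1]; these are
# exactly the max/min values assertCollectSuccess is fed for each typecode.
for typecode, ctype in [('b', ctypes.c_byte), ('h', ctypes.c_short),
                        ('i', ctypes.c_int), ('l', ctypes.c_long)]:
    bits = ctypes.sizeof(ctype) * 8
    print(typecode, 2 ** (bits - 1) - 1, -(2 ** (bits - 1)))
```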
## python/pyspark/sql/types.py
```diff
@@ -24,6 +24,7 @@
 import re
 import base64
 from array import array
+import ctypes

 if sys.version >= "3":
     long = int
```
```diff
@@ -915,6 +916,93 @@ def _parse_datatype_json_value(json_value):
     long: LongType,
 })

+# Mapping Python array types to Spark SQL DataType
+# We should be careful here. The size of these types in python depends on C
+# implementation. We need to make sure that this conversion does not lose any
+# precision. Also, JVM only support signed types, when converting unsigned types,
+# keep in mind that it required 1 more bit when stored as singed types.
+#
+# Reference for C integer size, see:
+# ISO/IEC 9899:201x specification, chapter 5.2.4.2.1 Sizes of integer types <limits.h>.
+# Reference for python array typecode, see:
+# https://docs.python.org/2/library/array.html
+# https://docs.python.org/3.6/library/array.html
+# Reference for JVM's supported integral types:
+# http://docs.oracle.com/javase/specs/jvms/se8/html/jvms-2.html#jvms-2.3.1
+
+_array_signed_int_typecode_ctype_mappings = {
+    'b': ctypes.c_byte,
+    'h': ctypes.c_short,
+    'i': ctypes.c_int,
+    'l': ctypes.c_long,
+}
+
+_array_unsigned_int_typecode_ctype_mappings = {
+    'B': ctypes.c_ubyte,
+    'H': ctypes.c_ushort,
+    'I': ctypes.c_uint,
+    'L': ctypes.c_ulong
+}
+
+
+def _int_size_to_type(size):
+    """
+    Return the Catalyst datatype from the size of integers.
+    """
+    if size <= 8:
+        return ByteType
+    if size <= 16:
+        return ShortType
+    if size <= 32:
+        return IntegerType
+    if size <= 64:
+        return LongType
+
+# The list of all supported array typecodes is stored here
+_array_type_mappings = {
+    # Warning: Actual properties for float and double in C is not specified in C.
+    # On almost every system supported by both python and JVM, they are IEEE 754
+    # single-precision binary floating-point format and IEEE 754 double-precision
+    # binary floating-point format. And we do assume the same thing here for now.
+    'f': FloatType,
+    'd': DoubleType
+}
+
+# compute array typecode mappings for signed integer types
+for _typecode in _array_signed_int_typecode_ctype_mappings.keys():
+    size = ctypes.sizeof(_array_signed_int_typecode_ctype_mappings[_typecode]) * 8
+    dt = _int_size_to_type(size)
+    if dt is not None:
+        _array_type_mappings[_typecode] = dt
+
+# compute array typecode mappings for unsigned integer types
+for _typecode in _array_unsigned_int_typecode_ctype_mappings.keys():
+    # JVM does not have unsigned types, so use signed types that is at least 1
+    # bit larger to store
+    size = ctypes.sizeof(_array_unsigned_int_typecode_ctype_mappings[_typecode]) * 8 + 1
+    dt = _int_size_to_type(size)
+    if dt is not None:
+        _array_type_mappings[_typecode] = dt
+
+# Type code 'u' in Python's array is deprecated since version 3.3, and will be
+# removed in version 4.0. See: https://docs.python.org/3/library/array.html
+if sys.version_info[0] < 4:
+    _array_type_mappings['u'] = StringType
+
+# Type code 'c' are only available at python 2
+if sys.version_info[0] < 3:
+    _array_type_mappings['c'] = StringType
+
+# SPARK-21465:
+# In python2, array of 'L' happened to be mistakenly partially supported. To
+# avoid breaking user's code, we should keep this partial support. Below is a
+# dirty hacking to keep this partial support and make the unit test passes
+import platform
+if sys.version_info[0] < 3 and platform.python_implementation() != 'PyPy':
+    if 'L' not in _array_type_mappings.keys():
+        _array_type_mappings['L'] = LongType
+        _array_unsigned_int_typecode_ctype_mappings['L'] = ctypes.c_uint
+
+
 def _infer_type(obj):
     """Infer the DataType from obj
```
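The "1 more bit" rule for unsigned types can be checked concretely. Assuming the usual C sizes, the widths below are what `_int_size_to_type` receives, which is why `'B'` lands on ShortType and `'I'` on LongType, while a 64-bit `'L'` finds no signed type wide enough:

```python
import ctypes

# Width in bits needed to store each unsigned C type in a signed JVM type.
for typecode, ctype in [('B', ctypes.c_ubyte), ('H', ctypes.c_ushort),
                        ('I', ctypes.c_uint), ('L', ctypes.c_ulong)]:
    print(typecode, ctypes.sizeof(ctype) * 8 + 1)
# Typical 64-bit Unix output: B 9, H 17, I 33, L 65 -> 'L' gets no mapping.
```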
```diff
@@ -938,12 +1026,17 @@ def _infer_type(obj):
                 return MapType(_infer_type(key), _infer_type(value), True)
         else:
             return MapType(NullType(), NullType(), True)
-    elif isinstance(obj, (list, array)):
+    elif isinstance(obj, list):
         for v in obj:
             if v is not None:
                 return ArrayType(_infer_type(obj[0]), True)
         else:
             return ArrayType(NullType(), True)
+    elif isinstance(obj, array):
+        if obj.typecode in _array_type_mappings:
```
**Member:** Could we do explicitly […]?

**Member:** Oh, I take it back. This is a possibly hot path.

**Contributor (Author):** I don't think […]:

```python
from random import random, shuffle
from time import time

N = 10000000
d = {}
for i in range(N):
    d[i] = random()
tests = list(range(2 * N))
shuffle(tests)

time1 = time()
dummy = 0
for i in tests:
    if i in d:
        dummy += 1
time2 = time()
dummy = 0
for i in tests:
    if i in d.keys():
        dummy += 1
time3 = time()

print(time2 - time1)
print(time3 - time2)
```

gives on my MacBook Pro […]. About 10% performance difference.

**Member:** Yea. It should be O(1) vs O(n) IIRC. Thanks for testing it out.

**Member:** And also `keys()` makes a copy of a key list in python 2, whereas the `in` operator against a dict directly calls […]
```diff
+            return ArrayType(_array_type_mappings[obj.typecode](), False)
+        else:
+            raise TypeError("not supported type: array(%s)" % obj.typecode)
```
**Member:** How about falling back to type inference in this case?

**Contributor (Author):** Is there any reason to do so? I don't think an array with an unsupported typecode will be correctly serialized or deserialized (if it will, why not add it to the supported list?). In this case, it would be better to raise a TypeError and let the user pick another type.

**Member:** Okay. To double check, it covers all the cases we supported before in Spark? If it can be for sure, I am fine with as is. I was trying to leave a sign-off for this reason - fixing a case that was never reachable before ('c' type in a specific Python version) and a bug (assigning the correct type for […])

**Contributor (Author):** If we fall back to […]

**Contributor (Author):** Yes, this is worth a double check. Let me do some tests on old Spark.

**Contributor (Author):** How about doing the following? […]

**Member:** Sounds good to me.

**Contributor (Author):** OK. Let me do this. If it is possible, I would like to finish this PR before I travel.

**Contributor (Author):** […]

**Member:** Sounds good to me, too. Let's discuss it in the next PR.
```diff
     else:
         try:
             return _infer_schema(obj)
```
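Taking the three files together, the user-visible effect is that arrays of supported typecodes now round-trip through `createDataFrame`/`collect` instead of coming back as arrays of null. A minimal end-to-end check (a sketch, assuming a local SparkSession):

```python
from array import array
from pyspark.sql import Row, SparkSession

spark = SparkSession.builder.master("local[1]").getOrCreate()

# Per the PR title, some typecodes previously collected as an array of null.
df = spark.createDataFrame([Row(myarray=array('i', [1, 2, 3]))])
print(df.first()["myarray"])  # [1, 2, 3]
```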
**Member:** Last question to double check. So, 'c' did not ever work in a specific Python version but this PR fixes it?

**Contributor (Author):** `c` was supported by spark 2.2.0: […] This support, I think, was because array was inferred the same way as list. But after we make changes on the type inference of array, we have to change this accordingly to bring it back to work.

**Member:** Ah, yea. I meant to say `'c' -> 1` was not ever reachable for sure.

**Contributor (Author):** Yes