From a127486d59528eae452dcbcc2ccfb68fdd7769b7 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Fri, 8 Jul 2016 20:58:14 -0400 Subject: [PATCH 01/37] use array.typecode to infer type Python's array has more types than Python itself: for example, Python only has float while array supports 'f' (float) and 'd' (double). Switching to array.typecode helps Spark make a better inference. For example, for the code: from pyspark.sql.types import _infer_type from array import array a = array('f',[1,2,3,4,5,6]) _infer_type(a) We will get ArrayType(DoubleType,true) before the change, but ArrayType(FloatType,true) after the change --- python/pyspark/sql/types.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index a3679873e1d8..f048bf6d0b95 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -926,6 +926,23 @@ def _parse_datatype_json_value(json_value): datetime.time: TimestampType, } +# Mapping Python array types to Spark SQL DataType +_array_type_mappings = { + 'b': ByteType, + 'B': ShortType, + 'u': StringType, + 'h': ShortType, + 'H': IntegerType, + 'i': IntegerType, + 'I': LongType, + 'l': LongType, + 'L': LongType, + 'q': LongType, + 'Q': LongType, + 'f': FloatType, + 'd': DoubleType +} + if sys.version < "3": _type_mappings.update({ unicode: StringType, @@ -955,12 +972,14 @@ def _infer_type(obj): return MapType(_infer_type(key), _infer_type(value), True) else: return MapType(NullType(), NullType(), True) - elif isinstance(obj, (list, array)): + elif isinstance(obj, list): for v in obj: if v is not None: return ArrayType(_infer_type(obj[0]), True) else: return ArrayType(NullType(), True) + elif isinstance(obj, array): + return ArrayType(_array_type_mappings[obj.typecode](), True) else: try: return _infer_schema(obj) From 05979ca6eabf723cf3849ec2bf6f6e9de26cb138 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Thu, 14 Jul 2016 16:07:12 +0800 Subject: [PATCH 02/37] add case (c: Float, FloatType) to fromJava --- .../apache/spark/sql/execution/python/EvaluatePython.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala index cf68ed4ec36a..7a86695a8ca9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala @@ -118,6 +118,10 @@ object EvaluatePython { case (c: Double, DoubleType) => c + case (c: Float, FloatType) => c + + case (c: Float, DoubleType) => c.toDouble + case (c: java.math.BigDecimal, dt: DecimalType) => Decimal(c, dt.precision, dt.scale) case (c: Int, DateType) => c From cd2ec6bc707fb6e7255b3a6a6822c3667866c63c Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Sun, 16 Oct 2016 22:44:48 -0400 Subject: [PATCH 03/37] add test for array in dataframe --- python/pyspark/sql/tests.py | 55 +++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index a8ca386e1ce3..227f74155e69 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -30,6 +30,8 @@ import functools import time import datetime +import array +import math import py4j try: @@ -1630,6 +1632,59 @@ def test_cache(self): "does_not_exist", lambda: spark.catalog.uncacheTable("does_not_exist")) + # test for SPARK-16542 + def 
test_array_types(): + int_types = set([ 'b', 'B', 'h', 'H', 'i', 'I', 'l', 'L', 'q', 'Q' ]) + float_types = set([ 'f', 'd' ]) + unsupported_types = set(array.typecodes) - int_types - float_types + def collected(a): + row = Row(myarray=a) + rdd = self.sc.parallelize([ row ]) + df = self.spark.createDataFrame(rdd) + return df.collect()[0]["myarray"][0] + # test whether pyspark can correctly handle int types + for t in int_types: + is_unsigned = t.isupper() + # test positive numbers + a = array.array(t,[1]) + while True: + try: + self.assertEqual(collected(a),a[0]) + a[0] *= 2 + except OverflowError: + break + # test negative numbers + if not is_unsigned: + a = array.array(t,[-1]) + while True: + try: + self.assertEqual(collected(a),a[0]) + a[0] *= 2 + except OverflowError: + break + # test whether pyspark can correctly handle float types + for t in float_types: + # test upper bound and precision + a = array.array(t,[1.0]) + while not math.isinf(a[0]): + self.assertEqual(collected(a),a[0]) + a[0] *= 2 + a[0] += 1 + # test lower bound + a = array.array(t,[1.0]) + while a[0]!=0: + self.assertEqual(collected(a),a[0]) + a[0] /= 2 + # test whether pyspark can correctly handle unsupported types + for t in unsupported_types: + try: + c = collected(a) + self.assertTrue(False) # if no exception thrown, fail the test + except TypeError: + pass # catch the expected exception and do nothing + except: + self.assertTrue(False) # if incorrect exception thrown, fail the test + class HiveSparkSubmitTests(SparkSubmitTests): From 82223c02082793b899c7eeca70f7bbfcea516c28 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Sun, 16 Oct 2016 23:35:47 -0400 Subject: [PATCH 04/37] set unsigned types and Py_UNICODE as unsupported --- python/pyspark/sql/tests.py | 4 ++-- python/pyspark/sql/types.py | 12 ++++-------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index d2adb66d12cf..108f1755b798 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1738,8 +1738,8 @@ def test_BinaryType_serialization(self): df.collect() # test for SPARK-16542 - def test_array_types(): - int_types = set([ 'b', 'B', 'h', 'H', 'i', 'I', 'l', 'L', 'q', 'Q' ]) + def test_array_types(self): + int_types = set([ 'b', 'h', 'i', 'l' ]) float_types = set([ 'f', 'd' ]) unsupported_types = set(array.typecodes) - int_types - float_types def collected(a): diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index fa67a6ff257b..e8aeef1a51b7 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -932,16 +932,9 @@ def _parse_datatype_json_value(json_value): # Mapping Python array types to Spark SQL DataType _array_type_mappings = { 'b': ByteType, - 'B': ShortType, - 'u': StringType, 'h': ShortType, - 'H': IntegerType, 'i': IntegerType, - 'I': LongType, 'l': LongType, - 'L': LongType, - 'q': LongType, - 'Q': LongType, 'f': FloatType, 'd': DoubleType } @@ -982,7 +975,10 @@ def _infer_type(obj): else: return ArrayType(NullType(), True) elif isinstance(obj, array): - return ArrayType(_array_type_mappings[obj.typecode](), True) + if obj.typecode in _array_type_mappings: + return ArrayType(_array_type_mappings[obj.typecode](), True) + else: + raise TypeError("not supported type: array(%s)" % obj.typecode) else: try: return _infer_schema(obj) From 0a967e280b3250bf7217e61905ad28f010c4ed40 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Mon, 17 Oct 2016 13:46:35 -0400 Subject: [PATCH 05/37] fix code style --- 
python/pyspark/sql/tests.py | 32 +++++++++++++++++--------------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 108f1755b798..6aa293fc9313 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1739,56 +1739,58 @@ def test_BinaryType_serialization(self): # test for SPARK-16542 def test_array_types(self): - int_types = set([ 'b', 'h', 'i', 'l' ]) - float_types = set([ 'f', 'd' ]) + int_types = set(['b', 'h', 'i', 'l']) + float_types = set(['f', 'd']) unsupported_types = set(array.typecodes) - int_types - float_types + def collected(a): row = Row(myarray=a) - rdd = self.sc.parallelize([ row ]) + rdd = self.sc.parallelize([row]) df = self.spark.createDataFrame(rdd) return df.collect()[0]["myarray"][0] # test whether pyspark can correctly handle int types for t in int_types: is_unsigned = t.isupper() # test positive numbers - a = array.array(t,[1]) + a = array.array(t, [1]) while True: try: - self.assertEqual(collected(a),a[0]) + self.assertEqual(collected(a), a[0]) a[0] *= 2 except OverflowError: break # test negative numbers if not is_unsigned: - a = array.array(t,[-1]) + a = array.array(t, [-1]) while True: try: - self.assertEqual(collected(a),a[0]) + self.assertEqual(collected(a), a[0]) a[0] *= 2 except OverflowError: break # test whether pyspark can correctly handle float types for t in float_types: # test upper bound and precision - a = array.array(t,[1.0]) + a = array.array(t, [1.0]) while not math.isinf(a[0]): - self.assertEqual(collected(a),a[0]) + self.assertEqual(collected(a), a[0]) a[0] *= 2 a[0] += 1 # test lower bound - a = array.array(t,[1.0]) - while a[0]!=0: - self.assertEqual(collected(a),a[0]) + a = array.array(t, [1.0]) + while a[0] != 0: + self.assertEqual(collected(a), a[0]) a[0] /= 2 # test whether pyspark can correctly handle unsupported types for t in unsupported_types: try: c = collected(a) - self.assertTrue(False) # if no exception thrown, fail the test + self.assertTrue(False) # if no exception thrown, fail the test except TypeError: - pass # catch the expected exception and do nothing + pass # catch the expected exception and do nothing except: - self.assertTrue(False) # if incorrect exception thrown, fail the test + # if incorrect exception thrown, fail the test + self.assertTrue(False) class HiveSparkSubmitTests(SparkSubmitTests): From 2059435b45ed1f6337a4f935adcd029084cfec91 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Mon, 17 Oct 2016 20:11:05 -0400 Subject: [PATCH 06/37] fix the same problem for byte and short --- python/pyspark/sql/tests.py | 17 ++++++++--------- .../sql/execution/python/EvaluatePython.scala | 14 ++++++++++---- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 6aa293fc9313..6dbdb3adb295 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -1750,7 +1750,6 @@ def collected(a): return df.collect()[0]["myarray"][0] # test whether pyspark can correctly handle int types for t in int_types: - is_unsigned = t.isupper() # test positive numbers a = array.array(t, [1]) while True: @@ -1760,14 +1759,13 @@ def collected(a): except OverflowError: break # test negative numbers - if not is_unsigned: - a = array.array(t, [-1]) - while True: - try: - self.assertEqual(collected(a), a[0]) - a[0] *= 2 - except OverflowError: - break + a = array.array(t, [-1]) + while True: + try: + self.assertEqual(collected(a), a[0]) + a[0] *= 2 + except OverflowError: + 
break # test whether pyspark can correctly handle float types for t in float_types: # test upper bound and precision @@ -1784,6 +1782,7 @@ def collected(a): # test whether pyspark can correctly handle unsupported types for t in unsupported_types: try: + a = array.array(t) c = collected(a) self.assertTrue(False) # if no exception thrown, fail the test except TypeError: diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala index 25e8cf1b29dc..90726a2e0136 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala @@ -91,25 +91,31 @@ object EvaluatePython { case (c: Boolean, BooleanType) => c + case (c: Byte, ByteType) => c + case (c: Short, ByteType) => c.toByte case (c: Int, ByteType) => c.toByte case (c: Long, ByteType) => c.toByte + case (c: Byte, ShortType) => c.toShort + case (c: Short, ShortType) => c case (c: Int, ShortType) => c.toShort case (c: Long, ShortType) => c.toShort + case (c: Byte, IntegerType) => c.toInt + case (c: Short, IntegerType) => c.toInt case (c: Int, IntegerType) => c case (c: Long, IntegerType) => c.toInt + case (c: Byte, LongType) => c.toLong + case (c: Short, LongType) => c.toLong case (c: Int, LongType) => c.toLong case (c: Long, LongType) => c - case (c: Double, FloatType) => c.toFloat - - case (c: Double, DoubleType) => c - case (c: Float, FloatType) => c + case (c: Double, FloatType) => c.toFloat case (c: Float, DoubleType) => c.toDouble + case (c: Double, DoubleType) => c case (c: java.math.BigDecimal, dt: DecimalType) => Decimal(c, dt.precision, dt.scale) From b91dd55fddda53da4d3ae77b222c2fb3e8384d13 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Wed, 28 Jun 2017 09:12:50 -0400 Subject: [PATCH 07/37] update some comments --- python/pyspark/sql/tests.py | 13 +++++++++++-- python/pyspark/sql/types.py | 4 ++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 85ada678aa74..affd65820939 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2263,6 +2263,13 @@ def test_BinaryType_serialization(self): # test for SPARK-16542 def test_array_types(self): + # This test need to make sure that the Scala type selected is at least + # as large as the python's types. This is necessary because python's + # array types depend on C implementation on the machine. Therefore there + # is no machine independent correspondence between python's array types + # and Scala types. + # See: https://docs.python.org/2/library/array.html + int_types = set(['b', 'h', 'i', 'l']) float_types = set(['f', 'd']) unsupported_types = set(array.typecodes) - int_types - float_types @@ -2272,9 +2279,10 @@ def collected(a): rdd = self.sc.parallelize([row]) df = self.spark.createDataFrame(rdd) return df.collect()[0]["myarray"][0] + # test whether pyspark can correctly handle int types for t in int_types: - # test positive numbers + # Start from 1 and keep doubling the number until overflow. 
a = array.array(t, [1]) while True: try: @@ -2282,7 +2290,7 @@ def collected(a): a[0] *= 2 except OverflowError: break - # test negative numbers + # Start from -1 and keep doubling the number until overflow a = array.array(t, [-1]) while True: try: @@ -2290,6 +2298,7 @@ def collected(a): a[0] *= 2 except OverflowError: break + # test whether pyspark can correctly handle float types for t in float_types: # test upper bound and precision diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 43cad9899836..7be657a2133b 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -930,6 +930,10 @@ def _parse_datatype_json_value(json_value): } # Mapping Python array types to Spark SQL DataType +# We should be careful here. The size of these types in python depends on +# C implementation (See: https://docs.python.org/2/library/array.html). +# We need to make sure that this conversion does not lose any precision. And +# this should be considered in test cases. _array_type_mappings = { 'b': ByteType, 'h': ShortType, From bd8e11176fb11eb1b7333439fa19ae2f19eaab76 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Fri, 30 Jun 2017 22:14:30 -0400 Subject: [PATCH 08/37] add typecode 'u' 'c' --- python/pyspark/sql/tests.py | 27 +++++++++++++++++++++++---- python/pyspark/sql/types.py | 11 +++++++++++ 2 files changed, 34 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index affd65820939..b6372ff13879 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2270,17 +2270,23 @@ def test_array_types(self): # and Scala types. # See: https://docs.python.org/2/library/array.html - int_types = set(['b', 'h', 'i', 'l']) - float_types = set(['f', 'd']) - unsupported_types = set(array.typecodes) - int_types - float_types - def collected(a): row = Row(myarray=a) rdd = self.sc.parallelize([row]) df = self.spark.createDataFrame(rdd) return df.collect()[0]["myarray"][0] + # test whether pyspark can correctly handle string types + string_types = set() + if sys.version < "4": + string_types += set(['u']) + self.assertEqual(collected(array.array('u', ["a"])), "a") + if sys.version < "3": + string_types += set(['c']) + self.assertEqual(collected(array.array('c', ["a"])), "a") + # test whether pyspark can correctly handle int types + int_types = set(['b', 'h', 'i', 'l']) for t in int_types: # Start from 1 and keep doubling the number until overflow. 
a = array.array(t, [1]) @@ -2300,6 +2306,7 @@ def collected(a): break # test whether pyspark can correctly handle float types + float_types = set(['f', 'd']) for t in float_types: # test upper bound and precision a = array.array(t, [1.0]) @@ -2312,6 +2319,18 @@ def collected(a): while a[0] != 0: self.assertEqual(collected(a), a[0]) a[0] /= 2 + + # make sure that the test case cover all supported types + supported_types = int_types + float_types + string_types + self.assertEqual(supported_types, _array_type_mappings.keys) + + all_type_codes = set() + if sys.version < "3": + all_type_codes += set([ 'c','b','B','u','h','H','i','I','l','L','f','d' ]) + else: + all_type_codes += set(array.typecodes) + unsupported_types = all_type_codes - supported_types + # test whether pyspark can correctly handle unsupported types for t in unsupported_types: try: diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 7be657a2133b..231ab94a7d70 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -949,6 +949,17 @@ def _parse_datatype_json_value(json_value): long: LongType, }) +# Type code 'u' in Python's array is deprecated since version 3.3, and will be +# removed in version 4.0. See: https://docs.python.org/3/library/array.html +if sys.version < "4": + _array_type_mappings.update({ + 'u': StringType + }) + +if sys.version < "3": + _array_type_mappings.update({ + 'c': StringType + }) def _infer_type(obj): """Infer the DataType from obj From 930d16bc26128584bcd6e1194f1340f1bba86fc9 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Fri, 30 Jun 2017 22:29:15 -0400 Subject: [PATCH 09/37] fix code style --- python/pyspark/sql/tests.py | 2 +- python/pyspark/sql/types.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index b6372ff13879..275248ec65f9 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2326,7 +2326,7 @@ def collected(a): all_type_codes = set() if sys.version < "3": - all_type_codes += set([ 'c','b','B','u','h','H','i','I','l','L','f','d' ]) + all_type_codes += set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd']) else: all_type_codes += set(array.typecodes) unsupported_types = all_type_codes - supported_types diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 231ab94a7d70..bf1257dfe77b 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -961,6 +961,7 @@ def _parse_datatype_json_value(json_value): 'c': StringType }) + def _infer_type(obj): """Infer the DataType from obj """ From 1b1c419ff73117508c65a424839063902ce70d21 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Sat, 1 Jul 2017 00:04:08 -0400 Subject: [PATCH 10/37] fix a couple of errors --- python/pyspark/sql/tests.py | 24 +++++++++++++----------- python/pyspark/sql/types.py | 19 ++++++++++++++----- 2 files changed, 27 insertions(+), 16 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 275248ec65f9..e1db4d586b14 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -32,6 +32,7 @@ import datetime import array import math +import types import py4j try: @@ -2277,17 +2278,18 @@ def collected(a): return df.collect()[0]["myarray"][0] # test whether pyspark can correctly handle string types - string_types = set() + string_types = [] if sys.version < "4": - string_types += set(['u']) + string_types += ['u'] self.assertEqual(collected(array.array('u', ["a"])), "a") if sys.version < 
"3": - string_types += set(['c']) + string_types += ['c'] self.assertEqual(collected(array.array('c', ["a"])), "a") # test whether pyspark can correctly handle int types - int_types = set(['b', 'h', 'i', 'l']) - for t in int_types: + int_types = ['b', 'h', 'i', 'l'] + unsigned_types = ['B', 'H', 'I'] + for t in int_types + unsigned_types: # Start from 1 and keep doubling the number until overflow. a = array.array(t, [1]) while True: @@ -2296,6 +2298,7 @@ def collected(a): a[0] *= 2 except OverflowError: break + for t in int_types: # Start from -1 and keep doubling the number until overflow a = array.array(t, [-1]) while True: @@ -2306,7 +2309,7 @@ def collected(a): break # test whether pyspark can correctly handle float types - float_types = set(['f', 'd']) + float_types = ['f', 'd'] for t in float_types: # test upper bound and precision a = array.array(t, [1.0]) @@ -2321,14 +2324,13 @@ def collected(a): a[0] /= 2 # make sure that the test case cover all supported types - supported_types = int_types + float_types + string_types - self.assertEqual(supported_types, _array_type_mappings.keys) + supported_types = int_types + unsigned_types + float_types + string_types + self.assertEqual(supported_types, types._array_type_mappings.keys) - all_type_codes = set() if sys.version < "3": - all_type_codes += set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd']) + all_type_codes = set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd']) else: - all_type_codes += set(array.typecodes) + all_type_codes = set(array.typecodes) unsupported_types = all_type_codes - supported_types # test whether pyspark can correctly handle unsupported types diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index bf1257dfe77b..30d775c9353b 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -929,6 +929,12 @@ def _parse_datatype_json_value(json_value): datetime.time: TimestampType, } +if sys.version < "3": + _type_mappings.update({ + unicode: StringType, + long: LongType, + }) + # Mapping Python array types to Spark SQL DataType # We should be careful here. The size of these types in python depends on # C implementation (See: https://docs.python.org/2/library/array.html). @@ -936,18 +942,20 @@ def _parse_datatype_json_value(json_value): # this should be considered in test cases. _array_type_mappings = { 'b': ByteType, + 'B': ShortType, 'h': ShortType, + 'H': IntegerType, 'i': IntegerType, + 'I': LongType, 'l': LongType, + #'L': not supported + #'q': not supported + #'Q': not supported 'f': FloatType, 'd': DoubleType } -if sys.version < "3": - _type_mappings.update({ - unicode: StringType, - long: LongType, - }) + # Type code 'u' in Python's array is deprecated since version 3.3, and will be # removed in version 4.0. 
See: https://docs.python.org/3/library/array.html @@ -956,6 +964,7 @@ def _parse_datatype_json_value(json_value): 'u': StringType }) +# Type code 'c' are only available at python 2 if sys.version < "3": _array_type_mappings.update({ 'c': StringType From c4b09f54582f637b376cdd7cdda0616f4d1c0f20 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Sat, 1 Jul 2017 09:15:42 -0400 Subject: [PATCH 11/37] handle types according to size --- python/pyspark/sql/types.py | 69 +++++++++++++++++++++++++++++-------- 1 file changed, 54 insertions(+), 15 deletions(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 30d775c9353b..d52d84084abb 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -24,6 +24,7 @@ import re import base64 from array import array +import ctypes if sys.version >= "3": long = int @@ -936,26 +937,64 @@ def _parse_datatype_json_value(json_value): }) # Mapping Python array types to Spark SQL DataType -# We should be careful here. The size of these types in python depends on -# C implementation (See: https://docs.python.org/2/library/array.html). -# We need to make sure that this conversion does not lose any precision. And -# this should be considered in test cases. +# We should be careful here. The size of these types in python depends on C +# implementation. We need to make sure that this conversion does not lose any +# precision. +# +# Reference for C integer size, see: +# ISO/IEC 9899:201x specification, § 5.2.4.2.1 Sizes of integer types . +# Reference for python array typecode, see: +# https://docs.python.org/2/library/array.html +# https://docs.python.org/3.6/library/array.html + +_array_int_typecode_ctype_mappings = { + 'b': ctypes.c_byte, + 'B': ctypes.c_ubyte, + 'h': ctypes.c_short, + 'H': ctypes.c_ushort, + 'i': ctypes.c_int, + 'I': ctypes.c_uint, + 'l': ctypes.c_long, + 'L': ctypes.c_ulong, + 'q': ctypes.c_longlong, + 'Q': ctypes.c_ulonglong +} + +def _int_size_to_type(size): + """ + Return the Scala type from the size of integers. + """ + if size <= 8: + return ByteType + if size <= 16: + return ShortType + if size <= 32: + return IntegerType + if size <= 64: + return LongType + raise TypeError("Not supported type: integer size too large.") + _array_type_mappings = { - 'b': ByteType, - 'B': ShortType, - 'h': ShortType, - 'H': IntegerType, - 'i': IntegerType, - 'I': LongType, - 'l': LongType, - #'L': not supported - #'q': not supported - #'Q': not supported + # Warning: Actual properties for float and double in C is not unspecified. + # On most systems, they are IEEE 754 single-precision binary floating-point + # format and IEEE 754 double-precision binary floating-point. And we do + # do assume the same thing here. This means, in some rare case, the following + # conversion might fail. 'f': FloatType, 'd': DoubleType } - +# compute array typecode mappings for integer types +for _typecode in _array_int_typecode_ctype_mappings.keys(): + size = ctypes.sizeof(_array_int_typecode_ctype_mappings[_typecode]) + if _typecode.isupper(): # 1 extra bit is required to store unsigned types + size += 1 + try: + _array_type_mappings.update({ + _typecode: _int_size_to_type(size) + }) + except: + pass # Type code 'u' in Python's array is deprecated since version 3.3, and will be # removed in version 4.0. 
See: https://docs.python.org/3/library/array.html From 1a6b24319e73d321631651041eb0c321d23dfc91 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Sat, 1 Jul 2017 10:37:05 -0400 Subject: [PATCH 12/37] rewrite test --- python/pyspark/sql/tests.py | 101 +++++++++++++++--------------------- python/pyspark/sql/types.py | 4 +- 2 files changed, 44 insertions(+), 61 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index e1db4d586b14..3533910d1456 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -32,7 +32,9 @@ import datetime import array import math -import types +import ctypes +from pyspark.sql import Row +from pyspark.sql.types import _array_int_typecode_ctype_mappings, _array_type_mappings import py4j try: @@ -2271,61 +2273,47 @@ def test_array_types(self): # and Scala types. # See: https://docs.python.org/2/library/array.html - def collected(a): + def assertCollectSuccess(typecode, value): + a = array.array(typecode, [value]) row = Row(myarray=a) - rdd = self.sc.parallelize([row]) - df = self.spark.createDataFrame(rdd) - return df.collect()[0]["myarray"][0] + df = self.spark.createDataFrame([row]) + self.assertEqual(df.collect()[0]["myarray"][0], value) - # test whether pyspark can correctly handle string types + supported_types = [] + + # test string types string_types = [] if sys.version < "4": - string_types += ['u'] - self.assertEqual(collected(array.array('u', ["a"])), "a") + supported_types += ['u'] + assertCollectSuccess('u', "a") if sys.version < "3": - string_types += ['c'] - self.assertEqual(collected(array.array('c', ["a"])), "a") - - # test whether pyspark can correctly handle int types - int_types = ['b', 'h', 'i', 'l'] - unsigned_types = ['B', 'H', 'I'] - for t in int_types + unsigned_types: - # Start from 1 and keep doubling the number until overflow. - a = array.array(t, [1]) - while True: - try: - self.assertEqual(collected(a), a[0]) - a[0] *= 2 - except OverflowError: - break - for t in int_types: - # Start from -1 and keep doubling the number until overflow - a = array.array(t, [-1]) - while True: - try: - self.assertEqual(collected(a), a[0]) - a[0] *= 2 - except OverflowError: - break - - # test whether pyspark can correctly handle float types - float_types = ['f', 'd'] - for t in float_types: - # test upper bound and precision - a = array.array(t, [1.0]) - while not math.isinf(a[0]): - self.assertEqual(collected(a), a[0]) - a[0] *= 2 - a[0] += 1 - # test lower bound - a = array.array(t, [1.0]) - while a[0] != 0: - self.assertEqual(collected(a), a[0]) - a[0] /= 2 - - # make sure that the test case cover all supported types - supported_types = int_types + unsigned_types + float_types + string_types - self.assertEqual(supported_types, types._array_type_mappings.keys) + supported_types += ['c'] + assertCollectSuccess('c', "a") + + # test float and double, assuming IEEE 754 floating-point format + supported_types += ['f', 'd'] + assertCollectSuccess('f',ctypes.c_float(1e+38).value) + assertCollectSuccess('f',ctypes.c_float(1e-38).value) + assertCollectSuccess('f',ctypes.c_float(1.123456).value) + assertCollectSuccess('d',ctypes.c_double(1e+308).value) + assertCollectSuccess('d',ctypes.c_double(1e+308).value) + assertCollectSuccess('d',ctypes.c_double(1.123456789012345).value) + + # test int types + supported_int = list(set(_array_int_typecode_ctype_mappings.keys()). 
+ intersection(set(_array_type_mappings.keys()))) + supported_types += supported_int + for i in supported_int: + ctype = _array_int_typecode_ctype_mappings[i] + if i.isupper(): + assertCollectSuccess(i, 2 ** (ctypes.sizeof(ctype)) - 1) + else: + max_val = 2 ** (ctypes.sizeof(ctype) - 1) - 1 + assertCollectSuccess(i, max_val) + assertCollectSuccess(i, -max_val) + + # # make sure that the test case cover all supported types + self.assertEqual(set(supported_types), set(_array_type_mappings.keys)) if sys.version < "3": all_type_codes = set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd']) @@ -2335,15 +2323,10 @@ def collected(a): # test whether pyspark can correctly handle unsupported types for t in unsupported_types: - try: + with self.assertRaises(SomeException): a = array.array(t) - c = collected(a) - self.assertTrue(False) # if no exception thrown, fail the test - except TypeError: - pass # catch the expected exception and do nothing - except: - # if incorrect exception thrown, fail the test - self.assertTrue(False) + self.spark.createDataFrame([Row(myarray=a)]).collect() + def test_bucketed_write(self): data = [ diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index d52d84084abb..9f6cfbdd22fa 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -977,8 +977,8 @@ def _int_size_to_type(size): _array_type_mappings = { # Warning: Actual properties for float and double in C is not unspecified. # On most systems, they are IEEE 754 single-precision binary floating-point - # format and IEEE 754 double-precision binary floating-point. And we do - # do assume the same thing here. This means, in some rare case, the following + # format and IEEE 754 double-precision binary floating-point format. And we + # do assume the same thing here. This means in some rare case the following # conversion might fail. 
'f': FloatType, 'd': DoubleType From e9f02b024ce674f4a3fa092e79b4bfb61474fdbc Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Sat, 1 Jul 2017 11:07:23 -0400 Subject: [PATCH 13/37] pass python unit test --- python/pyspark/sql/tests.py | 11 +++++------ python/pyspark/sql/types.py | 18 ++++++++++++------ 2 files changed, 17 insertions(+), 12 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 3533910d1456..a3deb24638a8 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2282,7 +2282,6 @@ def assertCollectSuccess(typecode, value): supported_types = [] # test string types - string_types = [] if sys.version < "4": supported_types += ['u'] assertCollectSuccess('u', "a") @@ -2306,24 +2305,24 @@ def assertCollectSuccess(typecode, value): for i in supported_int: ctype = _array_int_typecode_ctype_mappings[i] if i.isupper(): - assertCollectSuccess(i, 2 ** (ctypes.sizeof(ctype)) - 1) + assertCollectSuccess(i, 2 ** (ctypes.sizeof(ctype) * 8) - 1) else: - max_val = 2 ** (ctypes.sizeof(ctype) - 1) - 1 + max_val = 2 ** (ctypes.sizeof(ctype) * 8 - 1) - 1 assertCollectSuccess(i, max_val) assertCollectSuccess(i, -max_val) # # make sure that the test case cover all supported types - self.assertEqual(set(supported_types), set(_array_type_mappings.keys)) + self.assertEqual(set(supported_types), set(_array_type_mappings.keys())) if sys.version < "3": all_type_codes = set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd']) else: all_type_codes = set(array.typecodes) - unsupported_types = all_type_codes - supported_types + unsupported_types = all_type_codes - set(supported_types) # test whether pyspark can correctly handle unsupported types for t in unsupported_types: - with self.assertRaises(SomeException): + with self.assertRaises(TypeError): a = array.array(t) self.spark.createDataFrame([Row(myarray=a)]).collect() diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 9f6cfbdd22fa..db27de91a5ca 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -955,11 +955,17 @@ def _parse_datatype_json_value(json_value): 'i': ctypes.c_int, 'I': ctypes.c_uint, 'l': ctypes.c_long, - 'L': ctypes.c_ulong, - 'q': ctypes.c_longlong, - 'Q': ctypes.c_ulonglong + 'L': ctypes.c_ulong } +# TODO: Uncomment this when 'q' and 'Q' are supported by net.razorvine.pickle +# Type code 'q' and 'Q' are not available at python 2 +# if sys.version > "2": +# _array_int_typecode_ctype_mappings.update({ +# 'q': ctypes.c_longlong, +# 'Q': ctypes.c_ulonglong +# }) + def _int_size_to_type(size): """ Return the Scala type from the size of integers. @@ -972,7 +978,7 @@ def _int_size_to_type(size): return IntegerType if size <= 64: return LongType - raise TypeError("Not supported type: integer size too large.") + raise TypeError("not supported type: integer size too large.") _array_type_mappings = { # Warning: Actual properties for float and double in C is not unspecified. 
@@ -986,14 +992,14 @@ def _int_size_to_type(size): # compute array typecode mappings for integer types for _typecode in _array_int_typecode_ctype_mappings.keys(): - size = ctypes.sizeof(_array_int_typecode_ctype_mappings[_typecode]) + size = ctypes.sizeof(_array_int_typecode_ctype_mappings[_typecode]) * 8 if _typecode.isupper(): # 1 extra bit is required to store unsigned types size += 1 try: _array_type_mappings.update({ _typecode: _int_size_to_type(size) }) - except: + except TypeError: pass # Type code 'u' in Python's array is deprecated since version 3.3, and will be From fe035a660958621b570132db3b8e4f1225e6bc47 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Sat, 1 Jul 2017 11:13:59 -0400 Subject: [PATCH 14/37] fix code style --- python/pyspark/sql/tests.py | 17 ++++++++--------- python/pyspark/sql/types.py | 3 ++- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index a3deb24638a8..31c0129e04f2 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -33,8 +33,7 @@ import array import math import ctypes -from pyspark.sql import Row -from pyspark.sql.types import _array_int_typecode_ctype_mappings, _array_type_mappings + import py4j try: @@ -63,6 +62,7 @@ from pyspark.sql import SparkSession, SQLContext, HiveContext, Column, Row from pyspark.sql.types import * from pyspark.sql.types import UserDefinedType, _infer_type +from pyspark.sql.types import _array_int_typecode_ctype_mappings, _array_type_mappings from pyspark.tests import QuietTest, ReusedPySparkTestCase, SparkSubmitTests from pyspark.sql.functions import UserDefinedFunction, sha2, lit from pyspark.sql.window import Window @@ -2291,12 +2291,12 @@ def assertCollectSuccess(typecode, value): # test float and double, assuming IEEE 754 floating-point format supported_types += ['f', 'd'] - assertCollectSuccess('f',ctypes.c_float(1e+38).value) - assertCollectSuccess('f',ctypes.c_float(1e-38).value) - assertCollectSuccess('f',ctypes.c_float(1.123456).value) - assertCollectSuccess('d',ctypes.c_double(1e+308).value) - assertCollectSuccess('d',ctypes.c_double(1e+308).value) - assertCollectSuccess('d',ctypes.c_double(1.123456789012345).value) + assertCollectSuccess('f', ctypes.c_float(1e+38).value) + assertCollectSuccess('f', ctypes.c_float(1e-38).value) + assertCollectSuccess('f', ctypes.c_float(1.123456).value) + assertCollectSuccess('d', ctypes.c_double(1e+308).value) + assertCollectSuccess('d', ctypes.c_double(1e+308).value) + assertCollectSuccess('d', ctypes.c_double(1.123456789012345).value) # test int types supported_int = list(set(_array_int_typecode_ctype_mappings.keys()). @@ -2326,7 +2326,6 @@ def assertCollectSuccess(typecode, value): a = array.array(t) self.spark.createDataFrame([Row(myarray=a)]).collect() - def test_bucketed_write(self): data = [ (1, "foo", 3.0), (2, "foo", 5.0), diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index db27de91a5ca..aa28bb4a2b20 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -966,6 +966,7 @@ def _parse_datatype_json_value(json_value): # 'Q': ctypes.c_ulonglong # }) + def _int_size_to_type(size): """ Return the Scala type from the size of integers. 
@@ -993,7 +994,7 @@ def _int_size_to_type(size): # compute array typecode mappings for integer types for _typecode in _array_int_typecode_ctype_mappings.keys(): size = ctypes.sizeof(_array_int_typecode_ctype_mappings[_typecode]) * 8 - if _typecode.isupper(): # 1 extra bit is required to store unsigned types + if _typecode.isupper(): # 1 extra bit is required to store unsigned types size += 1 try: _array_type_mappings.update({ From cca2d6aeaff2368f20a13b24e13e82a85cd9ce5e Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Sat, 1 Jul 2017 11:31:48 -0400 Subject: [PATCH 15/37] small comment improve --- python/pyspark/sql/tests.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 373cf258ae2b..ac2dcea61236 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2303,16 +2303,15 @@ def assertCollectSuccess(typecode, value): assertCollectSuccess(i, max_val) assertCollectSuccess(i, -max_val) - # # make sure that the test case cover all supported types + # make sure that the test case cover all supported types self.assertEqual(set(supported_types), set(_array_type_mappings.keys())) + # test unsupported types if sys.version < "3": all_type_codes = set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd']) else: all_type_codes = set(array.typecodes) unsupported_types = all_type_codes - set(supported_types) - - # test whether pyspark can correctly handle unsupported types for t in unsupported_types: with self.assertRaises(TypeError): a = array.array(t) From 37e28a4e34a1264118086ef9298c9fab69542a72 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Sun, 2 Jul 2017 08:23:08 -0400 Subject: [PATCH 16/37] Retrigger the build From b5af17a2fce12037b4144460c521ded70516a632 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Wed, 5 Jul 2017 09:08:07 -0400 Subject: [PATCH 17/37] fix non-ascii code --- python/pyspark/sql/types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index aa28bb4a2b20..60bf7874deec 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -942,7 +942,7 @@ def _parse_datatype_json_value(json_value): # precision. # # Reference for C integer size, see: -# ISO/IEC 9899:201x specification, § 5.2.4.2.1 Sizes of integer types . +# ISO/IEC 9899:201x specification, chapter 5.2.4.2.1 Sizes of integer types . 
# Reference for python array typecode, see: # https://docs.python.org/2/library/array.html # https://docs.python.org/3.6/library/array.html From dfca8eb554185556efa50b749570724130fd8b5a Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Wed, 5 Jul 2017 09:26:55 -0400 Subject: [PATCH 18/37] misc improvement --- python/pyspark/sql/tests.py | 10 +++++----- python/pyspark/sql/types.py | 11 ++++++++--- 2 files changed, 13 insertions(+), 8 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 10f277fc44a0..358dd5649d0d 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2291,10 +2291,10 @@ def assertCollectSuccess(typecode, value): supported_types = [] # test string types - if sys.version < "4": + if sys.version_info[0] < 4: supported_types += ['u'] assertCollectSuccess('u', "a") - if sys.version < "3": + if sys.version_info[0] < 3: supported_types += ['c'] assertCollectSuccess('c', "a") @@ -2313,9 +2313,9 @@ def assertCollectSuccess(typecode, value): supported_types += supported_int for i in supported_int: ctype = _array_int_typecode_ctype_mappings[i] - if i.isupper(): + if i.isupper(): # unsigned assertCollectSuccess(i, 2 ** (ctypes.sizeof(ctype) * 8) - 1) - else: + else: # signed max_val = 2 ** (ctypes.sizeof(ctype) * 8 - 1) - 1 assertCollectSuccess(i, max_val) assertCollectSuccess(i, -max_val) @@ -2324,7 +2324,7 @@ def assertCollectSuccess(typecode, value): self.assertEqual(set(supported_types), set(_array_type_mappings.keys())) # test unsupported types - if sys.version < "3": + if sys.version_info[0] < 3: all_type_codes = set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd']) else: all_type_codes = set(array.typecodes) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 531f8d1dea2b..09784f7a46d3 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -960,7 +960,7 @@ def _parse_datatype_json_value(json_value): # TODO: Uncomment this when 'q' and 'Q' are supported by net.razorvine.pickle # Type code 'q' and 'Q' are not available at python 2 -# if sys.version > "2": +# if sys.version_info[0] >= 3: # _array_int_typecode_ctype_mappings.update({ # 'q': ctypes.c_longlong, # 'Q': ctypes.c_ulonglong @@ -981,6 +981,7 @@ def _int_size_to_type(size): return LongType raise TypeError("not supported type: integer size too large.") +# The list of all supported array typecodes is stored here _array_type_mappings = { # Warning: Actual properties for float and double in C is not unspecified. # On most systems, they are IEEE 754 single-precision binary floating-point @@ -1001,17 +1002,21 @@ def _int_size_to_type(size): _typecode: _int_size_to_type(size) }) except TypeError: + # In case when the integer size is too large to be supported by Scala, + # the typecode will be marked as unsupported. + # Exception raised by _int_size_to_type will be catched and ignored here. + # And users will get an exception during the infer. pass # Type code 'u' in Python's array is deprecated since version 3.3, and will be # removed in version 4.0. 
See: https://docs.python.org/3/library/array.html -if sys.version < "4": +if sys.version_info[0] < 4: _array_type_mappings.update({ 'u': StringType }) # Type code 'c' are only available at python 2 -if sys.version < "3": +if sys.version_info[0] < 3: _array_type_mappings.update({ 'c': StringType }) From 4af97afb973779dadd90567c7a1b001e9eb6334f Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Wed, 5 Jul 2017 09:31:22 -0400 Subject: [PATCH 19/37] use sys.float_info for double --- python/pyspark/sql/tests.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 358dd5649d0d..07d304f52e79 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2303,9 +2303,9 @@ def assertCollectSuccess(typecode, value): assertCollectSuccess('f', ctypes.c_float(1e+38).value) assertCollectSuccess('f', ctypes.c_float(1e-38).value) assertCollectSuccess('f', ctypes.c_float(1.123456).value) - assertCollectSuccess('d', ctypes.c_double(1e+308).value) - assertCollectSuccess('d', ctypes.c_double(1e+308).value) - assertCollectSuccess('d', ctypes.c_double(1.123456789012345).value) + assertCollectSuccess('d', sys.float_info.max) + assertCollectSuccess('d', sys.float_info.min) + assertCollectSuccess('d', sys.float_info.epsilon) # test int types supported_int = list(set(_array_int_typecode_ctype_mappings.keys()). From aaf66f1c7cddbc07a16f221d29844032cf9742ca Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Wed, 5 Jul 2017 09:46:41 -0400 Subject: [PATCH 20/37] improve --- python/pyspark/sql/tests.py | 4 ++-- python/pyspark/sql/types.py | 9 ++++----- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index 07d304f52e79..d5ca7ff418cb 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2313,9 +2313,9 @@ def assertCollectSuccess(typecode, value): supported_types += supported_int for i in supported_int: ctype = _array_int_typecode_ctype_mappings[i] - if i.isupper(): # unsigned + if i.isupper(): # unsigned assertCollectSuccess(i, 2 ** (ctypes.sizeof(ctype) * 8) - 1) - else: # signed + else: # signed max_val = 2 ** (ctypes.sizeof(ctype) * 8 - 1) - 1 assertCollectSuccess(i, max_val) assertCollectSuccess(i, -max_val) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 09784f7a46d3..7819a95be536 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -983,11 +983,10 @@ def _int_size_to_type(size): # The list of all supported array typecodes is stored here _array_type_mappings = { - # Warning: Actual properties for float and double in C is not unspecified. - # On most systems, they are IEEE 754 single-precision binary floating-point - # format and IEEE 754 double-precision binary floating-point format. And we - # do assume the same thing here. This means in some rare case the following - # conversion might fail. + # Warning: Actual properties for float and double in C is not specified in C. + # On almost every system supported by both python and JVM, they are IEEE 754 + # single-precision binary floating-point format and IEEE 754 double-precision + # binary floating-point format. And we do assume the same thing here for now. 
'f': FloatType, 'd': DoubleType } From 385ab1f46a631118d05d34c70c241874241edc57 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Wed, 5 Jul 2017 09:51:47 -0400 Subject: [PATCH 21/37] removed unused imports --- python/pyspark/sql/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index d5ca7ff418cb..f9f7ceb1ea47 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -63,7 +63,7 @@ from pyspark.sql.types import * from pyspark.sql.types import UserDefinedType, _infer_type, _make_type_verifier from pyspark.sql.types import _array_int_typecode_ctype_mappings, _array_type_mappings -from pyspark.tests import QuietTest, ReusedPySparkTestCase, SparkSubmitTests +from pyspark.tests import ReusedPySparkTestCase, SparkSubmitTests from pyspark.sql.functions import UserDefinedFunction, sha2, lit from pyspark.sql.window import Window from pyspark.sql.utils import AnalysisException, ParseException, IllegalArgumentException From 8352dab99bbed5eb4f5da0bd9e08330a53f21fae Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Wed, 5 Jul 2017 14:53:00 -0400 Subject: [PATCH 22/37] fix unicode array --- python/pyspark/sql/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index f9f7ceb1ea47..a60bbe493901 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2293,7 +2293,7 @@ def assertCollectSuccess(typecode, value): # test string types if sys.version_info[0] < 4: supported_types += ['u'] - assertCollectSuccess('u', "a") + assertCollectSuccess('u', u"a") if sys.version_info[0] < 3: supported_types += ['c'] assertCollectSuccess('c', "a") From 9612f77f6c1789e91b4333e6f88f45cfed0b6052 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Wed, 5 Jul 2017 17:14:15 -0400 Subject: [PATCH 23/37] range of int --- python/pyspark/sql/tests.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py index a60bbe493901..5f72c5b91e28 100644 --- a/python/pyspark/sql/tests.py +++ b/python/pyspark/sql/tests.py @@ -2316,8 +2316,8 @@ def assertCollectSuccess(typecode, value): if i.isupper(): # unsigned assertCollectSuccess(i, 2 ** (ctypes.sizeof(ctype) * 8) - 1) else: # signed - max_val = 2 ** (ctypes.sizeof(ctype) * 8 - 1) - 1 - assertCollectSuccess(i, max_val) + max_val = 2 ** (ctypes.sizeof(ctype) * 8 - 1) + assertCollectSuccess(i, max_val - 1) assertCollectSuccess(i, -max_val) # make sure that the test case cover all supported types From f2774c639fdf653ec7d48127b529124dbbb9b60b Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Thu, 6 Jul 2017 10:13:49 -0400 Subject: [PATCH 24/37] fix containsNull in ArrayType --- python/pyspark/sql/types.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index 7819a95be536..7cce9dc5f699 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1051,7 +1051,7 @@ def _infer_type(obj): return ArrayType(NullType(), True) elif isinstance(obj, array): if obj.typecode in _array_type_mappings: - return ArrayType(_array_type_mappings[obj.typecode](), True) + return ArrayType(_array_type_mappings[obj.typecode](), False) else: raise TypeError("not supported type: array(%s)" % obj.typecode) else: From af9a411a5a793e831c81f85fbb96391fca852807 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Mon, 10 Jul 2017 09:08:06 -0400 Subject: [PATCH 25/37] fix map of 
'c' in ArrayConstructor --- .../main/scala/org/apache/spark/api/python/SerDeUtil.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index 42f67e8dbe86..352a6fd9fdb3 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -57,11 +57,11 @@ private[spark] object SerDeUtil extends Logging { // }; // TODO: support Py_UNICODE with 2 bytes val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) { - Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9, + Map('c' -> 19, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9, 'L' -> 11, 'l' -> 13, 'f' -> 15, 'd' -> 17, 'u' -> 21 ) } else { - Map('c' -> 1, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8, + Map('c' -> 18, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8, 'L' -> 10, 'l' -> 12, 'f' -> 14, 'd' -> 16, 'u' -> 20 ) } From 9ee439f74b88faa1e79cf55ac50b35f650fedca6 Mon Sep 17 00:00:00 2001 From: Xiang Gao Date: Tue, 11 Jul 2017 16:50:38 -0400 Subject: [PATCH 26/37] fix data alignment --- .../main/scala/org/apache/spark/api/python/SerDeUtil.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index 352a6fd9fdb3..71551f1d2f4f 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -72,7 +72,11 @@ private[spark] object SerDeUtil extends Logging { val typecode = args(0).asInstanceOf[String].charAt(0) // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate correctly val data = args(1).asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1) - construct(typecode, machineCodes(typecode), data) + val machine_code = machineCodes(typecode) + // fix data alignment + val unit_length = if (machine_code==18 || machine_code==19) 2 else 4 + val aligned_data = data ++ Array.fill[Byte](unit_length - data.length % unit_length)(0) + construct(typecode, machine_code, aligned_data) } else if (args.length == 2 && args(0) == "l") { // On Python 2, an array of typecode 'l' should be handled as long rather than int. val values = args(1).asInstanceOf[JArrayList[_]] From 1ca696a42426660cf2c0595908ca4b516ab902fe Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 13 Jul 2017 00:36:03 +0900 Subject: [PATCH 27/37] Modify ArrayConstructor to handle an array of typecode 'c' in pypy. 
--- .../apache/spark/api/python/SerDeUtil.scala | 20 ++++++++++++------- 1 file changed, 13 insertions(+), 7 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index 71551f1d2f4f..2662d2372431 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -57,11 +57,11 @@ private[spark] object SerDeUtil extends Logging { // }; // TODO: support Py_UNICODE with 2 bytes val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) { - Map('c' -> 19, 'B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9, + Map('B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9, 'L' -> 11, 'l' -> 13, 'f' -> 15, 'd' -> 17, 'u' -> 21 ) } else { - Map('c' -> 18, 'B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8, + Map('B' -> 0, 'b' -> 1, 'H' -> 2, 'h' -> 4, 'I' -> 6, 'i' -> 8, 'L' -> 10, 'l' -> 12, 'f' -> 14, 'd' -> 16, 'u' -> 20 ) } @@ -72,11 +72,17 @@ private[spark] object SerDeUtil extends Logging { val typecode = args(0).asInstanceOf[String].charAt(0) // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate correctly val data = args(1).asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1) - val machine_code = machineCodes(typecode) - // fix data alignment - val unit_length = if (machine_code==18 || machine_code==19) 2 else 4 - val aligned_data = data ++ Array.fill[Byte](unit_length - data.length % unit_length)(0) - construct(typecode, machine_code, aligned_data) + if (typecode == 'c') { + val result = new Array[Char](data.length) + var i = 0 + while (i < data.length) { + result(i) = data(i).toChar + i += 1 + } + result + } else { + construct(typecode, machineCodes(typecode), data) + } } else if (args.length == 2 && args(0) == "l") { // On Python 2, an array of typecode 'l' should be handled as long rather than int. val values = args(1).asInstanceOf[JArrayList[_]] From f15e06113963a3feef92d47b10bb16af9050b479 Mon Sep 17 00:00:00 2001 From: Takuya UESHIN Date: Thu, 13 Jul 2017 14:05:45 +0900 Subject: [PATCH 28/37] Add a comment. --- .../src/main/scala/org/apache/spark/api/python/SerDeUtil.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index 2662d2372431..8a76fa9ad54a 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -73,6 +73,9 @@ private[spark] object SerDeUtil extends Logging { // This must be ISO 8859-1 / Latin 1, not UTF-8, to interoperate correctly val data = args(1).asInstanceOf[String].getBytes(StandardCharsets.ISO_8859_1) if (typecode == 'c') { + // It seems like the pickle of pypy uses the similar protocol to Python 2.6, which uses + // a string for array data instead of list as Python 2.7, and handles an array of + // typecode 'c' as 1-byte character. 
         val result = new Array[Char](data.length)
         var i = 0
         while (i < data.length) {

From e0cced0bc7c03e05cca46366ec5361faa547e004 Mon Sep 17 00:00:00 2001
From: Xiang Gao
Date: Fri, 14 Jul 2017 13:30:04 -0400
Subject: [PATCH 29/37] some improvements according to review

---
 python/pyspark/sql/tests.py | 83 +++++++++++++++++++++++--------------
 python/pyspark/sql/types.py | 22 +++-------
 2 files changed, 58 insertions(+), 47 deletions(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 42a5271085a4..e922ad79e71d 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -2327,23 +2327,29 @@ def test_array_types(self):
         # See: https://docs.python.org/2/library/array.html
 
         def assertCollectSuccess(typecode, value):
-            a = array.array(typecode, [value])
-            row = Row(myarray=a)
+            row = Row(myarray=array.array(typecode, [value]))
             df = self.spark.createDataFrame([row])
-            self.assertEqual(df.collect()[0]["myarray"][0], value)
+            self.assertEqual(df.first()["myarray"][0], value)
 
-        supported_types = []
-
-        # test string types
+        # supported string types
+        #
+        # String types in python's array are "u" for Py_UNICODE and "c" for char.
+        # "u" will be removed in python 4, and "c" is not supported in python 3.
+        supported_string_types = []
         if sys.version_info[0] < 4:
-            supported_types += ['u']
-            assertCollectSuccess('u', u"a")
+            supported_string_types += ['u']
+            # test unicode
+            assertCollectSuccess('u', u'a')
         if sys.version_info[0] < 3:
-            supported_types += ['c']
-            assertCollectSuccess('c', "a")
-
-        # test float and double, assuming IEEE 754 floating-point format
-        supported_types += ['f', 'd']
+            supported_string_types += ['c']
+            # test string
+            assertCollectSuccess('c', 'a')
+
+        # supported float and double
+        #
+        # Test max, min, and precision for float and double, assuming IEEE 754
+        # floating-point format.
+        supported_fractional_types = ['f', 'd']
         assertCollectSuccess('f', ctypes.c_float(1e+38).value)
         assertCollectSuccess('f', ctypes.c_float(1e-38).value)
         assertCollectSuccess('f', ctypes.c_float(1.123456).value)
@@ -2351,28 +2357,45 @@ def assertCollectSuccess(typecode, value):
         assertCollectSuccess('d', sys.float_info.min)
         assertCollectSuccess('d', sys.float_info.epsilon)
 
-        # test int types
-        supported_int = list(set(_array_int_typecode_ctype_mappings.keys()).
-                             intersection(set(_array_type_mappings.keys())))
-        supported_types += supported_int
-        for i in supported_int:
-            ctype = _array_int_typecode_ctype_mappings[i]
-            if i.isupper():  # unsigned
-                assertCollectSuccess(i, 2 ** (ctypes.sizeof(ctype) * 8) - 1)
-            else:  # signed
+        # supported int types
+        #
+        # The Largest integral type supported in Scala is Long, a 64-bit signed
+        # integer. Only types with smaller or equal size are supported.
+        supported_int_types = list(
+            set(_array_int_typecode_ctype_mappings.keys())
+                .intersection(set(_array_type_mappings.keys())))
+        for t in supported_int_types:
+            ctype = _array_int_typecode_ctype_mappings[t]
+            if t.isupper():
+                # test unsigned int types
+                assertCollectSuccess(t, 2 ** (ctypes.sizeof(ctype) * 8) - 1)
+            else:
+                # test signed int types
                 max_val = 2 ** (ctypes.sizeof(ctype) * 8 - 1)
-                assertCollectSuccess(i, max_val - 1)
-                assertCollectSuccess(i, -max_val)
-
-        # make sure that the test case cover all supported types
+                assertCollectSuccess(t, max_val - 1)
+                assertCollectSuccess(t, -max_val)
+
+        # all supported types
+        #
+        # Make sure the types tested above:
+        # 1. are all supported types
+        # 2. cover all supported types
+        supported_types = (supported_string_types +
+                           supported_fractional_types +
+                           supported_int_types)
         self.assertEqual(set(supported_types), set(_array_type_mappings.keys()))
 
-        # test unsupported types
+        # all unsupported types
+        #
+        # The keys of _array_type_mappings form a complete list of all supported types,
+        # and types not in _array_type_mappings are considered unsupported.
+        # `array.typecodes` is not available in python 2.
        if sys.version_info[0] < 3:
-            all_type_codes = set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd'])
+            all_types = set(['c', 'b', 'B', 'u', 'h', 'H', 'i', 'I', 'l', 'L', 'f', 'd'])
         else:
-            all_type_codes = set(array.typecodes)
-        unsupported_types = all_type_codes - set(supported_types)
+            all_types = set(array.typecodes)
+        unsupported_types = all_types - set(supported_types)
+        # test unsupported types
         for t in unsupported_types:
             with self.assertRaises(TypeError):
                 a = array.array(t)
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 36b157a90178..b700ef7b01b7 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -959,7 +959,6 @@ def _int_size_to_type(size):
         return IntegerType
     if size <= 64:
         return LongType
-    raise TypeError("not supported type: integer size too large.")
 
 # The list of all supported array typecodes is stored here
 _array_type_mappings = {
@@ -976,29 +975,18 @@ def _int_size_to_type(size):
     size = ctypes.sizeof(_array_int_typecode_ctype_mappings[_typecode]) * 8
     if _typecode.isupper():  # 1 extra bit is required to store unsigned types
         size += 1
-    try:
-        _array_type_mappings.update({
-            _typecode: _int_size_to_type(size)
-        })
-    except TypeError:
-        # In case when the integer size is too large to be supported by Scala,
-        # the typecode will be marked as unsupported.
-        # Exception raised by _int_size_to_type will be catched and ignored here.
-        # And users will get an exception during the infer.
-        pass
+    dt = _int_size_to_type(size)
+    if dt is not None:
+        _array_type_mappings[_typecode] = dt
 
 # Type code 'u' in Python's array is deprecated since version 3.3, and will be
 # removed in version 4.0. See: https://docs.python.org/3/library/array.html
 if sys.version_info[0] < 4:
-    _array_type_mappings.update({
-        'u': StringType
-    })
+    _array_type_mappings['u'] = StringType
 
 # Type code 'c' are only available at python 2
 if sys.version_info[0] < 3:
-    _array_type_mappings.update({
-        'c': StringType
-    })
+    _array_type_mappings['c'] = StringType
 
 
 def _infer_type(obj):

From 788b0da5045397861ccac5c7933823facd4402ba Mon Sep 17 00:00:00 2001
From: Xiang Gao
Date: Fri, 14 Jul 2017 13:37:10 -0400
Subject: [PATCH 30/37] fix code style

---
 python/pyspark/sql/tests.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index e922ad79e71d..94f3c40b642b 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -2363,7 +2363,7 @@ def assertCollectSuccess(typecode, value):
         # integer. Only types with smaller or equal size are supported.
         supported_int_types = list(
             set(_array_int_typecode_ctype_mappings.keys())
-                .intersection(set(_array_type_mappings.keys())))
+            .intersection(set(_array_type_mappings.keys())))

From a87bc9e8f32c35f21ce5c6a27842b36554d0fbac Mon Sep 17 00:00:00 2001
From: Xiang Gao
Date: Fri, 14 Jul 2017 13:52:07 -0400
Subject: [PATCH 31/37] fix add JIRA for 'q' and 'Q'

---
 python/pyspark/sql/types.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index b700ef7b01b7..e8855baf92d7 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -938,7 +938,8 @@ def _parse_datatype_json_value(json_value):
     'L': ctypes.c_ulong
 }
 
-# TODO: Uncomment this when 'q' and 'Q' are supported by net.razorvine.pickle
+# TODO: [SPARK-21420]
+# Uncomment this when 'q' and 'Q' are supported by net.razorvine.pickle
 # Type code 'q' and 'Q' are not available at python 2
 # if sys.version_info[0] >= 3:
 #     _array_int_typecode_ctype_mappings.update({

From 6c49a4856ab26d9f207bd781f123fe01b03e13b2 Mon Sep 17 00:00:00 2001
From: Xiang Gao
Date: Fri, 14 Jul 2017 14:31:31 -0400
Subject: [PATCH 32/37] store signed/unsigned in different list

---
 python/pyspark/sql/tests.py | 42 ++++++++++++++++++++----------------
 python/pyspark/sql/types.py | 39 +++++++++++++++++++-------------
 2 files changed, 50 insertions(+), 31 deletions(-)

diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 94f3c40b642b..8610530796c4 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -62,7 +62,8 @@
 from pyspark.sql import SparkSession, SQLContext, HiveContext, Column, Row
 from pyspark.sql.types import *
 from pyspark.sql.types import UserDefinedType, _infer_type, _make_type_verifier
-from pyspark.sql.types import _array_int_typecode_ctype_mappings, _array_type_mappings
+from pyspark.sql.types import _array_signed_int_typecode_ctype_mappings, _array_type_mappings
+from pyspark.sql.types import _array_unsigned_int_typecode_ctype_mappings
 from pyspark.tests import QuietTest, ReusedPySparkTestCase, SparkSubmitTests
 from pyspark.sql.functions import UserDefinedFunction, sha2, lit
 from pyspark.sql.window import Window
@@ -2357,23 +2358,29 @@ def assertCollectSuccess(typecode, value):
         assertCollectSuccess('d', sys.float_info.min)
         assertCollectSuccess('d', sys.float_info.epsilon)
 
-        # supported int types
+        # supported signed int types
         #
-        # The Largest integral type supported in Scala is Long, a 64-bit signed
-        # integer. Only types with smaller or equal size are supported.
-        supported_int_types = list(
-            set(_array_int_typecode_ctype_mappings.keys())
+        # The size of C types changes with implementation, we need to make sure
+        # that there is no overflow error on the platform running this test.
+        supported_signed_int_types = list(
+            set(_array_signed_int_typecode_ctype_mappings.keys())
             .intersection(set(_array_type_mappings.keys())))
+        for t in supported_signed_int_types:
+            ctype = _array_signed_int_typecode_ctype_mappings[t]
+            max_val = 2 ** (ctypes.sizeof(ctype) * 8 - 1)
+            assertCollectSuccess(t, max_val - 1)
+            assertCollectSuccess(t, -max_val)
+
+        # supported unsigned int types
+        #
+        # JVM does not have unsigned types. We need to be very careful to make
+        # sure that there is no overflow error.
+        supported_unsigned_int_types = list(
+            set(_array_unsigned_int_typecode_ctype_mappings.keys())
+            .intersection(set(_array_type_mappings.keys())))
+        for t in supported_unsigned_int_types:
+            ctype = _array_unsigned_int_typecode_ctype_mappings[t]
+            assertCollectSuccess(t, 2 ** (ctypes.sizeof(ctype) * 8) - 1)
 
         # all supported types
         #
         # Make sure the types tested above:
         # 1. are all supported types
         # 2. cover all supported types
         supported_types = (supported_string_types +
                            supported_fractional_types +
-                           supported_int_types)
+                           supported_signed_int_types +
+                           supported_unsigned_int_types)
         self.assertEqual(set(supported_types), set(_array_type_mappings.keys()))
 
         # all unsupported types
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index e8855baf92d7..6064e7c545a8 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -919,22 +919,28 @@ def _parse_datatype_json_value(json_value):
 # Mapping Python array types to Spark SQL DataType
 # We should be careful here. The size of these types in python depends on C
 # implementation. We need to make sure that this conversion does not lose any
-# precision.
+# precision. Also, the JVM only supports signed types; when converting unsigned
+# types, keep in mind that they require 1 more bit when stored as signed types.
 #
 # Reference for C integer size, see:
 # ISO/IEC 9899:201x specification, chapter 5.2.4.2.1 Sizes of integer types.
 # Reference for python array typecode, see:
 # https://docs.python.org/2/library/array.html
 # https://docs.python.org/3.6/library/array.html
+# Reference for JVM's supported integral types:
+# http://docs.oracle.com/javase/specs/jvms/se8/html/jvms-2.html#jvms-2.3.1
 
-_array_int_typecode_ctype_mappings = {
+_array_signed_int_typecode_ctype_mappings = {
     'b': ctypes.c_byte,
-    'B': ctypes.c_ubyte,
     'h': ctypes.c_short,
-    'H': ctypes.c_ushort,
     'i': ctypes.c_int,
-    'I': ctypes.c_uint,
     'l': ctypes.c_long,
+}
+
+_array_unsigned_int_typecode_ctype_mappings = {
+    'B': ctypes.c_ubyte,
+    'H': ctypes.c_ushort,
+    'I': ctypes.c_uint,
     'L': ctypes.c_ulong
 }
@@ -942,10 +948,8 @@ def _parse_datatype_json_value(json_value):
 # Uncomment this when 'q' and 'Q' are supported by net.razorvine.pickle
 # Type code 'q' and 'Q' are not available at python 2
 # if sys.version_info[0] >= 3:
-#     _array_int_typecode_ctype_mappings.update({
-#         'q': ctypes.c_longlong,
-#         'Q': ctypes.c_ulonglong
-#     })
+#     _array_signed_int_typecode_ctype_mappings['q'] = ctypes.c_longlong
+#     _array_unsigned_int_typecode_ctype_mappings['Q'] = ctypes.c_ulonglong
 
 
 def _int_size_to_type(size):
@@ -971,11 +975,18 @@ def _int_size_to_type(size):
     'd': DoubleType
 }
 
-# compute array typecode mappings for integer types
-for _typecode in _array_int_typecode_ctype_mappings.keys():
-    size = ctypes.sizeof(_array_int_typecode_ctype_mappings[_typecode]) * 8
-    if _typecode.isupper():  # 1 extra bit is required to store unsigned types
-        size += 1
+# compute array typecode mappings for signed integer types
+for _typecode in _array_signed_int_typecode_ctype_mappings.keys():
+    size = ctypes.sizeof(_array_signed_int_typecode_ctype_mappings[_typecode]) * 8
+    dt = _int_size_to_type(size)
+    if dt is not None:
+        _array_type_mappings[_typecode] = dt
+
+# compute array typecode mappings for unsigned integer types
+for _typecode in _array_unsigned_int_typecode_ctype_mappings.keys():
+    # JVM does not have unsigned types, so use signed types that is at list 1
+    # bit larger to store
+    size = ctypes.sizeof(_array_unsigned_int_typecode_ctype_mappings[_typecode]) * 8 + 1
     dt = _int_size_to_type(size)
     if dt is not None:
         _array_type_mappings[_typecode] = dt

From ae1aef0c40b5e72be324bd862cdf7372c798573a Mon Sep 17 00:00:00 2001
From: Xiang Gao
Date: Tue, 18 Jul 2017 09:00:34 -0400
Subject: [PATCH 33/37] improvement according to review

---
 .../main/scala/org/apache/spark/api/python/SerDeUtil.scala | 1 -
 python/pyspark/sql/tests.py                                | 4 +---
 2 files changed, 1 insertion(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 8a76fa9ad54a..aaf8e7a1d746 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -55,7 +55,6 @@ private[spark] object SerDeUtil extends Logging {
     //  {'d', sizeof(double), d_getitem, d_setitem},
     //  {'\0', 0, 0, 0} /* Sentinel */
     // };
-    // TODO: support Py_UNICODE with 2 bytes
     val machineCodes: Map[Char, Int] = if (ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN)) {
       Map('B' -> 0, 'b' -> 1, 'H' -> 3, 'h' -> 5, 'I' -> 7, 'i' -> 9,
         'L' -> 11, 'l' -> 13, 'f' -> 15, 'd' -> 17, 'u' -> 21
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 8610530796c4..d8e2da9ffed2 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -31,11 +31,9 @@
 import time
 import datetime
 import array
-import math
 import ctypes
-
-
 import py4j
+
 try:
     import xmlrunner
 except ImportError:

From cafa5fdc22270847657a29b0470dfd0ca4ad4ad2 Mon Sep 17 00:00:00 2001
From: Xiang Gao
Date: Tue, 18 Jul 2017 09:03:36 -0400
Subject: [PATCH 34/37] remove the TODO for [SPARK-21420]

---
 python/pyspark/sql/types.py | 7 -------
 1 file changed, 7 deletions(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 6064e7c545a8..7c3d8958f383 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -944,13 +944,6 @@ def _parse_datatype_json_value(json_value):
     'L': ctypes.c_ulong
 }
 
-# TODO: [SPARK-21420]
-# Uncomment this when 'q' and 'Q' are supported by net.razorvine.pickle
-# Type code 'q' and 'Q' are not available at python 2
-# if sys.version_info[0] >= 3:
-#     _array_signed_int_typecode_ctype_mappings['q'] = ctypes.c_longlong
-#     _array_unsigned_int_typecode_ctype_mappings['Q'] = ctypes.c_ulonglong
-
 
 def _int_size_to_type(size):
     """

From 88091ea4bfd7ee6311302d3b68360df3f2065f51 Mon Sep 17 00:00:00 2001
From: Xiang Gao
Date: Tue, 18 Jul 2017 23:31:34 -0400
Subject: [PATCH 35/37] dirty hacking to recover 'L' support in python2

---
 python/pyspark/sql/types.py | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 7c3d8958f383..199d58c0efc8 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -993,6 +993,14 @@ def _int_size_to_type(size):
 if sys.version_info[0] < 3:
     _array_type_mappings['c'] = StringType
 
+# SPARK-21465:
+# In python2, array of 'L' happened to be mistakenly partially supported. To
+# avoid breaking user's code, we should keep this partial support. Below is a
+# dirty hack to keep this partial support and make the unit tests pass
+if sys.version_info[0] < 3 and 'L' not in _array_type_mappings.keys():
+    _array_type_mappings['L'] = LongType
+    _array_unsigned_int_typecode_ctype_mappings['L'] = ctypes.c_uint
+
 
 def _infer_type(obj):
     """Infer the DataType from obj

From cdc7257080b76e1a745fe5d2c4f027dba96f85a9 Mon Sep 17 00:00:00 2001
From: Xiang Gao
Date: Wed, 19 Jul 2017 08:51:02 -0400
Subject: [PATCH 36/37] modify dirty hacking to unsupport pypy for 'L'

---
 python/pyspark/sql/types.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 199d58c0efc8..843ad15df371 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -997,9 +997,11 @@ def _int_size_to_type(size):
 # In python2, array of 'L' happened to be mistakenly partially supported. To
 # avoid breaking user's code, we should keep this partial support. Below is a
 # dirty hack to keep this partial support and make the unit tests pass
-if sys.version_info[0] < 3 and 'L' not in _array_type_mappings.keys():
-    _array_type_mappings['L'] = LongType
-    _array_unsigned_int_typecode_ctype_mappings['L'] = ctypes.c_uint
+import platform
+if sys.version_info[0] < 3 and platform.python_implementation() != 'PyPy':
+    if 'L' not in _array_type_mappings.keys():
+        _array_type_mappings['L'] = LongType
+        _array_unsigned_int_typecode_ctype_mappings['L'] = ctypes.c_uint
 
 
 def _infer_type(obj):

From a3407459405c2a5b3c7539d5075853e65c80f9cd Mon Sep 17 00:00:00 2001
From: "Gao, Xiang"
Date: Wed, 19 Jul 2017 16:32:03 -0400
Subject: [PATCH 37/37] fix typos

---
 python/pyspark/sql/types.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 843ad15df371..c376805c3273 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -947,7 +947,7 @@ def _parse_datatype_json_value(json_value):
 
 def _int_size_to_type(size):
     """
-    Return the Scala type from the size of integers.
+    Return the Catalyst datatype from the size of integers.
     """
     if size <= 8:
         return ByteType
@@ -977,7 +977,7 @@ def _int_size_to_type(size):
 
 # compute array typecode mappings for unsigned integer types
 for _typecode in _array_unsigned_int_typecode_ctype_mappings.keys():
-    # JVM does not have unsigned types, so use signed types that is at list 1
+    # JVM does not have unsigned types, so use signed types that is at least 1
     # bit larger to store
     size = ctypes.sizeof(_array_unsigned_int_typecode_ctype_mappings[_typecode]) * 8 + 1
     dt = _int_size_to_type(size)
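
Usage note: the end-to-end behavior targeted by this series can be sketched with the
snippet below. This is an illustrative example only, not part of any patch above; it
assumes a PySpark build that already contains these changes and an active SparkSession
bound to the name `spark`, and the Catalyst type chosen for an integer typecode depends
on the platform's C type sizes as reported by ctypes.sizeof.

    import array
    from pyspark.sql import Row

    # An array('f', ...) column is now inferred as ArrayType(FloatType) instead of
    # being widened to double during schema inference.
    df = spark.createDataFrame([Row(myarray=array.array('f', [1.0, 2.0, 3.0]))])
    df.printSchema()
    # root
    #  |-- myarray: array (nullable = true)
    #  |    |-- element: float (containsNull = true)

    # Typecodes without a safe mapping (for example 'Q' on Python 3, pending
    # SPARK-21420) raise a TypeError such as "not supported type: array(Q)" at
    # schema inference time instead of silently producing wrong values.
    spark.createDataFrame([Row(myarray=array.array('Q', [1]))])  # raises TypeError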