From dfdebb3130465b2bec4c85cfe5f98eeda2616ff6 Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Tue, 1 Dec 2015 15:03:03 +0800
Subject: [PATCH 1/6] [SPARK-12070][PYSPARK] PySpark implementation of Slicing operator incorrect

---
 python/pyspark/sql/column.py | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index 81fd4e782628a..f052fdb18edd8 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -259,10 +259,30 @@ def substr(self, startPos, length):
 
         >>> df.select(df.name.substr(1, 3).alias("col")).collect()
         [Row(col=u'Ali'), Row(col=u'Bob')]
+        >>> df.select(df.name[1:3].alias("col")).collect()
+        [Row(col=u'Ali'), Row(col=u'Bob')]
+        >>> df.select(df.name[2:].alias("col")).collect()
+        [Row(col=u'lice'), Row(col=u'ob')]
         """
         if type(startPos) != type(length):
             raise TypeError("Can not mix the type")
         if isinstance(startPos, (int, long)):
+            javaMaxInt = 2147483647
+            if startPos > javaMaxInt:
+               raise ValueError("startPos is larger than the java max int value "
+                   "which is not supported by pyspark, startPos=" + str(startPos))
+
+            if length == sys.maxint:
+              # length == sys.maxint when using syntax str[1:]
+              # cut it down to java max int because java api of substr only support int type
+              # of length
+              warnings.warn("PySpark's substr only support int type of length, "
+                  "please make sure the length you specify is less than 2147483647")
+              length = javaMaxInt
+            elif length > javaMaxInt:
+              raise ValueError("length is larger than the java max int value "
+                  "which is not supported by pyspark, length=" + str(length))
+
             jc = self._jc.substr(startPos, length)
         elif isinstance(startPos, Column):
             jc = self._jc.substr(startPos._jc, length._jc)

From 5201882f481741bb73d5b277212b5078a7c632fe Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Tue, 1 Dec 2015 15:38:16 +0800
Subject: [PATCH 2/6] fix code style

---
 python/pyspark/sql/column.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index f052fdb18edd8..78abc98fb8998 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -273,14 +273,14 @@ def substr(self, startPos, length):
                    "which is not supported by pyspark, startPos=" + str(startPos))
 
             if length == sys.maxint:
-              # length == sys.maxint when using syntax str[1:]
-              # cut it down to java max int because java api of substr only support int type
-              # of length
-              warnings.warn("PySpark's substr only support int type of length, "
-                  "please make sure the length you specify is less than 2147483647")
-              length = javaMaxInt
+                # length == sys.maxint when using syntax str[1:]
+                # cut it down to java max int because java api of substr only support int type
+                # of length
+                warnings.warn("PySpark's substr only support int type of length, "
+                    "please make sure the length you specify is less than 2147483647")
+                length = javaMaxInt
             elif length > javaMaxInt:
-              raise ValueError("length is larger than the java max int value "
+                raise ValueError("length is larger than the java max int value "
                   "which is not supported by pyspark, length=" + str(length))
 
             jc = self._jc.substr(startPos, length)

From a42528ea09110c7e2597b9e05af0ab77cec8888c Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Tue, 1 Dec 2015 16:15:16 +0800
Subject: [PATCH 3/6] fix code style

---
 python/pyspark/sql/column.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index 78abc98fb8998..c3cb22bb43347 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -269,19 +269,19 @@ def substr(self, startPos, length):
         if isinstance(startPos, (int, long)):
             javaMaxInt = 2147483647
             if startPos > javaMaxInt:
-               raise ValueError("startPos is larger than the java max int value "
-                   "which is not supported by pyspark, startPos=" + str(startPos))
+                raise ValueError("startPos is larger than the java max int value "
+                                 "which is not supported by pyspark, startPos=" + str(startPos))
 
             if length == sys.maxint:
                 # length == sys.maxint when using syntax str[1:]
                 # cut it down to java max int because java api of substr only support int type
                 # of length
                 warnings.warn("PySpark's substr only support int type of length, "
-                    "please make sure the length you specify is less than 2147483647")
+                              "please make sure the length you specify is less than 2147483647")
                 length = javaMaxInt
             elif length > javaMaxInt:
                 raise ValueError("length is larger than the java max int value "
-                  "which is not supported by pyspark, length=" + str(length))
+                                 "which is not supported by pyspark, length=" + str(length))
 
             jc = self._jc.substr(startPos, length)
         elif isinstance(startPos, Column):

From 8b7fc5a5a8b2cf1f31a2be131a5cd4d55a02b016 Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Tue, 1 Dec 2015 20:28:32 +0800
Subject: [PATCH 4/6] fix the test failure

---
 python/pyspark/sql/column.py | 43 ++++++++++++++++++++++++------------
 1 file changed, 29 insertions(+), 14 deletions(-)

diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index c3cb22bb43347..b1f9ebb904c8b 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -183,13 +183,33 @@ def __init__(self, jc):
 
     # container operators
     __contains__ = _bin_op("contains")
-    __getitem__ = _bin_op("apply")
 
     # bitwise operators
     bitwiseOR = _bin_op("bitwiseOR")
     bitwiseAND = _bin_op("bitwiseAND")
     bitwiseXOR = _bin_op("bitwiseXOR")
 
+    def __getitem__(self, key):
+        if isinstance(key, slice):
+            if key.step is not None and key.step != 1:
+                raise ValueError("PySpark doesn't support slice with step")
+            if key.start is not None and key.stop is not None:
+                # str[i:j]
+                return self.substr(key.start, key.stop - key.start)
+            elif key.start is not None:
+                # str[i:]
+                return self.substr(key.start)
+            elif key.stop is not None:
+                # str[:j]
+                return self.substr(1, key.stop - 1)
+            else:
+                # str[:]
+                return self
+        else:
+            jc = key._jc if isinstance(key, Column) else key
+            njc = getattr(self._jc, "apply")(jc)
+            return Column(njc)
+
     @since(1.3)
     def getItem(self, key):
         """
@@ -250,7 +270,7 @@ def __iter__(self):
 
     @ignore_unicode_prefix
     @since(1.3)
-    def substr(self, startPos, length):
+    def substr(self, startPos, length = None):
         """
         Return a :class:`Column` which is a substring of the column.
 
@@ -259,30 +279,27 @@ def substr(self, startPos, length):
 
         >>> df.select(df.name.substr(1, 3).alias("col")).collect()
         [Row(col=u'Ali'), Row(col=u'Bob')]
-        >>> df.select(df.name[1:3].alias("col")).collect()
+        >>> df.select(df.name[1:4].alias("col")).collect()
         [Row(col=u'Ali'), Row(col=u'Bob')]
         >>> df.select(df.name[2:].alias("col")).collect()
         [Row(col=u'lice'), Row(col=u'ob')]
+        >>> df.select(df.name[:3].alias("col")).collect()
+        [Row(col=u'Al'), Row(col=u'Bo')]
+        >>> df.select(df.name[:].alias("col")).collect()
+        [Row(col=u'Alice'), Row(col=u'Bob')]
         """
-        if type(startPos) != type(length):
+        if length is not None and type(startPos) != type(length):
             raise TypeError("Can not mix the type")
         if isinstance(startPos, (int, long)):
             javaMaxInt = 2147483647
             if startPos > javaMaxInt:
                 raise ValueError("startPos is larger than the java max int value "
                                  "which is not supported by pyspark, startPos=" + str(startPos))
-
-            if length == sys.maxint:
-                # length == sys.maxint when using syntax str[1:]
-                # cut it down to java max int because java api of substr only support int type
-                # of length
-                warnings.warn("PySpark's substr only support int type of length, "
-                              "please make sure the length you specify is less than 2147483647")
+            if length is None:
                 length = javaMaxInt
             elif length > javaMaxInt:
                 raise ValueError("length is larger than the java max int value "
                                  "which is not supported by pyspark, length=" + str(length))
-
             jc = self._jc.substr(startPos, length)
         elif isinstance(startPos, Column):
             jc = self._jc.substr(startPos._jc, length._jc)
@@ -290,8 +307,6 @@ def substr(self, startPos, length=None):
             raise TypeError("Unexpected type: %s" % type(startPos))
         return Column(jc)
 
-    __getslice__ = substr
-
     @ignore_unicode_prefix
     @since(1.3)
     def inSet(self, *cols):

From 9c20341a3410041050332e46f5ea48b5a3487ca0 Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Tue, 1 Dec 2015 20:40:05 +0800
Subject: [PATCH 5/6] minor change on format

---
 python/pyspark/sql/column.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index b1f9ebb904c8b..0ac46105d7170 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -270,7 +270,7 @@ def __iter__(self):
 
     @ignore_unicode_prefix
     @since(1.3)
-    def substr(self, startPos, length = None):
+    def substr(self, startPos, length=None):
         """
         Return a :class:`Column` which is a substring of the column.
 

From fa89b5a6325c8007e013fb9b0f5644e9b56e4be6 Mon Sep 17 00:00:00 2001
From: Jeff Zhang
Date: Wed, 2 Dec 2015 09:36:28 +0800
Subject: [PATCH 6/6] minor change

---
 python/pyspark/sql/column.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index 0ac46105d7170..4639da690662a 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -291,7 +291,7 @@ def substr(self, startPos, length=None):
         if length is not None and type(startPos) != type(length):
             raise TypeError("Can not mix the type")
         if isinstance(startPos, (int, long)):
-            javaMaxInt = 2147483647
+            javaMaxInt = SparkContext._active_spark_context._jvm.java.lang.Integer.MAX_VALUE
             if startPos > javaMaxInt:
                 raise ValueError("startPos is larger than the java max int value "
                                  "which is not supported by pyspark, startPos=" + str(startPos))
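
A minimal usage sketch of the Column slicing behavior this series implements, assuming a PySpark 1.x build with these patches applied; the sc, sqlContext, and df names are fixture assumptions mirroring the doctest setup, not part of the patches:

    # Demonstrates the __getitem__ slice handling added in PATCH 4/6
    # (assumed environment: PySpark 1.x with this series applied).
    from pyspark import SparkContext
    from pyspark.sql import SQLContext, Row

    sc = SparkContext(appName="slicing-demo")
    sqlContext = SQLContext(sc)
    df = sqlContext.createDataFrame([Row(name=u'Alice'), Row(name=u'Bob')])

    # df.name[i:j] forwards to substr(i, j - i): a 1-based start position and a length.
    print(df.select(df.name[1:4].alias("col")).collect())  # [Row(col=u'Ali'), Row(col=u'Bob')]
    print(df.select(df.name[2:].alias("col")).collect())   # [Row(col=u'lice'), Row(col=u'ob')]
    print(df.select(df.name[:3].alias("col")).collect())   # [Row(col=u'Al'), Row(col=u'Bo')]
    print(df.select(df.name[:].alias("col")).collect())    # [Row(col=u'Alice'), Row(col=u'Bob')]

    # A slice step has no JVM substr equivalent, so __getitem__ rejects it eagerly.
    try:
        df.select(df.name[1:4:2].alias("col"))
    except ValueError as e:
        print(e)  # PySpark doesn't support slice with step

    sc.stop()

Note the deliberate asymmetry with native Python slicing: the start index is 1-based because it is passed straight through to the JVM substr, which is why PATCH 4/6 changes the doctest from df.name[1:3] to df.name[1:4] to take the first three characters.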