From 4a9eb935b8438a159c9f12239135eedd59b25fd3 Mon Sep 17 00:00:00 2001
From: Andrew Ray
Date: Wed, 13 Sep 2017 20:26:15 -0500
Subject: [PATCH 1/8] remove check and add tests

---
 python/pyspark/serializers.py |  6 +++---
 python/pyspark/tests.py       | 12 ++++++++++++
 2 files changed, 15 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index d5c2a7518b18..a464366bb922 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -333,6 +333,9 @@ class PairDeserializer(Serializer):
     Deserializes the JavaRDD zip() of two PythonRDDs.
     Due to pyspark batching we cannot simply use the result of the Java RDD zip,
     we additionally need to do the zip within each pair of batches.
+
+    It is the responsibility of the user of this class to ensure the batch sizes of the key and
+    value serializer are the same size. If they are not this will give incorrect results.
     """

     def __init__(self, key_ser, val_ser):
@@ -343,9 +346,6 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
-            if len(key_batch) != len(val_batch):
-                raise ValueError("Can not deserialize PairRDD with different number of items"
-                                 " in batches: (%d, %d)" % (len(key_batch), len(val_batch)))
             # for correctness with repeated cartesian/zip this must be returned as one batch
             yield zip(key_batch, val_batch)

diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 000dd1eb8e48..cb8dbe7f24cf 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -644,6 +644,18 @@ def test_cartesian_chaining(self):
             set([(x, (y, y)) for x in range(10) for y in range(10)])
         )

+    def test_zip_chaining(self):
+        # Tests for SPARK-21985
+        rdd = self.sc.parallelize(range(10), 2)
+        self.assertSetEqual(
+            set(rdd.zip(rdd).zip(rdd).collect()),
+            set([((x, x), x) for x in range(10)])
+        )
+        self.assertSetEqual(
+            set(rdd.zip(rdd.zip(rdd)).collect()),
+            set([((x, (x, x)) for x in range(10)])
+        )
+
     def test_deleting_input_files(self):
         # Regression test for SPARK-1025
         tempFile = tempfile.NamedTemporaryFile(delete=False)

From 0d64a6d11237383c2a6ea21275dc9daa5cc8d634 Mon Sep 17 00:00:00 2001
From: Andrew Ray
Date: Wed, 13 Sep 2017 20:46:18 -0500
Subject: [PATCH 2/8] woops

---
 python/pyspark/tests.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index cb8dbe7f24cf..1bcdedc1db2a 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -653,7 +653,7 @@ def test_zip_chaining(self):
         )
         self.assertSetEqual(
             set(rdd.zip(rdd.zip(rdd)).collect()),
-            set([((x, (x, x)) for x in range(10)])
+            set([(x, (x, x)) for x in range(10)])
         )

     def test_deleting_input_files(self):

From e99ed23ffa887311b8c77d57733ff005d6987bdb Mon Sep 17 00:00:00 2001
From: Andrew Ray
Date: Wed, 13 Sep 2017 23:43:58 -0500
Subject: [PATCH 3/8] convert batches to list in PairDeserializer so we can
 check the len are equal

---
 python/pyspark/serializers.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index a464366bb922..a72c5085d7a2 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -333,9 +333,6 @@ class PairDeserializer(Serializer):
     Deserializes the JavaRDD zip() of two PythonRDDs.
     Due to pyspark batching we cannot simply use the result of the Java RDD zip,
     we additionally need to do the zip within each pair of batches.
-
-    It is the responsibility of the user of this class to ensure the batch sizes of the key and
-    value serializer are the same size. If they are not this will give incorrect results.
     """

     def __init__(self, key_ser, val_ser):
@@ -346,6 +343,11 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
+            key_batch = list(key_batch)
+            val_batch = list(val_batch)
+            if len(key_batch) != len(val_batch):
+                raise ValueError("Can not deserialize PairRDD with different number of items"
+                                 " in batches: (%d, %d)" % (len(key_batch), len(val_batch)))
             # for correctness with repeated cartesian/zip this must be returned as one batch
             yield zip(key_batch, val_batch)
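The check restored in PATCH 3 matters because Python's zip() silently truncates to the shorter of its inputs, so mismatched key/value batches would drop data rather than fail. A minimal sketch of that failure mode (plain Python, no Spark required; the batch contents are made up for illustration):

    # Mismatched key/value batches: zip() drops the extra element silently,
    # producing a wrong result instead of an error.
    key_batch = [0, 1, 2]
    val_batch = [0, 1]                        # one element short

    print(list(zip(key_batch, val_batch)))    # [(0, 0), (1, 1)] -- (2, ?) is lost

    # The restored check turns this silent data loss into a loud failure:
    if len(key_batch) != len(val_batch):
        raise ValueError("Can not deserialize PairRDD with different number of items"
                         " in batches: (%d, %d)" % (len(key_batch), len(val_batch)))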
From 66477f8aebad08d8d3690d758ebfb85a084b5395 Mon Sep 17 00:00:00 2001
From: Andrew Ray
Date: Thu, 14 Sep 2017 09:50:45 -0500
Subject: [PATCH 4/8] update doc and test

---
 python/pyspark/serializers.py | 2 +-
 python/pyspark/tests.py       | 6 +++---
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index a72c5085d7a2..d51ded9fba5e 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -97,7 +97,7 @@ def load_stream(self, stream):

     def _load_stream_without_unbatching(self, stream):
         """
-        Return an iterator of deserialized batches (lists) of objects from the input stream.
+        Return an iterator of deserialized batches (iterable) of objects from the input stream.
         if the serializer does not operate on batches the default implementation returns an
         iterator of single element lists.
         """

diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index 1bcdedc1db2a..5a00dd523bee 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -646,14 +646,14 @@ def test_cartesian_chaining(self):

     def test_zip_chaining(self):
         # Tests for SPARK-21985
-        rdd = self.sc.parallelize(range(10), 2)
+        rdd = self.sc.parallelize('abc')
         self.assertSetEqual(
             set(rdd.zip(rdd).zip(rdd).collect()),
-            set([((x, x), x) for x in range(10)])
+            set([((x, x), x) for x in 'abc'])
         )
         self.assertSetEqual(
             set(rdd.zip(rdd.zip(rdd)).collect()),
-            set([(x, (x, x)) for x in range(10)])
+            set([(x, (x, x)) for x in 'abc'])
         )
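The docstring tweak in PATCH 4 captures the contract the rest of the series relies on: the batch stream is an iterator of iterables, not necessarily lists. A toy sketch of the default single-element batching the docstring describes (load_stream and load_stream_without_unbatching here are stand-ins, not the real Serializer methods):

    def load_stream(stream):                       # stand-in deserializer
        for obj in stream:
            yield obj

    def load_stream_without_unbatching(stream):    # default: one-element batches
        return map(lambda x: [x], load_stream(stream))

    batches = load_stream_without_unbatching([10, 20, 30])
    print([list(b) for b in batches])              # [[10], [20], [30]]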
""" diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 1bcdedc1db2a..5a00dd523bee 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -646,14 +646,14 @@ def test_cartesian_chaining(self): def test_zip_chaining(self): # Tests for SPARK-21985 - rdd = self.sc.parallelize(range(10), 2) + rdd = self.sc.parallelize('abc') self.assertSetEqual( set(rdd.zip(rdd).zip(rdd).collect()), - set([((x, x), x) for x in range(10)]) + set([((x, x), x) for x in 'abc']) ) self.assertSetEqual( set(rdd.zip(rdd.zip(rdd)).collect()), - set([(x, (x, x)) for x in range(10)]) + set([(x, (x, x)) for x in 'abc']) ) def test_deleting_input_files(self): From 54b7fd0de15b1674fd8a285708ef669c29fb1ed9 Mon Sep 17 00:00:00 2001 From: Andrew Ray Date: Thu, 14 Sep 2017 22:58:35 -0500 Subject: [PATCH 5/8] only convert to list if __len__ not available, set number of partitions in test --- python/pyspark/serializers.py | 4 ++-- python/pyspark/tests.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index d51ded9fba5e..5e3c2fcc9d81 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -343,8 +343,8 @@ def _load_stream_without_unbatching(self, stream): key_batch_stream = self.key_ser._load_stream_without_unbatching(stream) val_batch_stream = self.val_ser._load_stream_without_unbatching(stream) for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream): - key_batch = list(key_batch) - val_batch = list(val_batch) + key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch) + val_batch = val_batch if hasattr(val_batch, '__len__') else list(val_batch) if len(key_batch) != len(val_batch): raise ValueError("Can not deserialize PairRDD with different number of items" " in batches: (%d, %d)" % (len(key_batch), len(val_batch))) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 5a00dd523bee..9f6684b9de36 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -646,7 +646,7 @@ def test_cartesian_chaining(self): def test_zip_chaining(self): # Tests for SPARK-21985 - rdd = self.sc.parallelize('abc') + rdd = self.sc.parallelize('abc',2) self.assertSetEqual( set(rdd.zip(rdd).zip(rdd).collect()), set([((x, x), x) for x in 'abc']) From f6d42f48e7b22e0758ff92e438e620f52fd95322 Mon Sep 17 00:00:00 2001 From: Andrew Ray Date: Thu, 14 Sep 2017 23:06:43 -0500 Subject: [PATCH 6/8] style --- python/pyspark/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 9f6684b9de36..3c108ec92ccc 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -646,7 +646,7 @@ def test_cartesian_chaining(self): def test_zip_chaining(self): # Tests for SPARK-21985 - rdd = self.sc.parallelize('abc',2) + rdd = self.sc.parallelize('abc', 2) self.assertSetEqual( set(rdd.zip(rdd).zip(rdd).collect()), set([((x, x), x) for x in 'abc']) From ff1417e92fe3d2b0f9b829bb6dbebbcc6a60065a Mon Sep 17 00:00:00 2001 From: Andrew Ray Date: Fri, 15 Sep 2017 09:11:39 -0500 Subject: [PATCH 7/8] add comment per review --- python/pyspark/serializers.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 5e3c2fcc9d81..dcb74efaa808 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -343,6 +343,7 @@ def _load_stream_without_unbatching(self, stream): key_batch_stream = 
From 5282ee5a8624d313ec5995db4d772ec1170ea049 Mon Sep 17 00:00:00 2001
From: Andrew Ray
Date: Sun, 17 Sep 2017 09:56:41 -0500
Subject: [PATCH 8/8] nit comment

---
 python/pyspark/serializers.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index dcb74efaa808..660b19ad2a7c 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -343,7 +343,8 @@ def _load_stream_without_unbatching(self, stream):
         key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
         val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
         for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
-            # the batch is an iterable, we need to check lengths so we convert to list if needed.
+            # For double-zipped RDDs, the batches can be iterators from other PairDeserializer,
+            # instead of lists. We need to convert them to lists if needed.
             key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch)
             val_batch = val_batch if hasattr(val_batch, '__len__') else list(val_batch)
             if len(key_batch) != len(val_batch):
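The reworded comment in PATCH 8 describes how the nesting arises: PairDeserializer yields zip(key_batch, val_batch) as one batch, so for a double-zipped RDD the inner deserializer's zip object becomes the outer one's key (or value) batch. A standalone sketch of that shape, with made-up batch contents:

    # Inner PairDeserializer batch for rdd.zip(rdd): yielded as a zip object.
    inner_batch = zip([0, 1, 2], [0, 1, 2])

    # For rdd.zip(rdd).zip(rdd), that zip object arrives as the outer key batch.
    key_batch = inner_batch
    val_batch = [0, 1, 2]

    # Without the conversion, len(key_batch) would raise TypeError on Python 3.
    key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch)
    print(len(key_batch) == len(val_batch))   # True
    print(list(zip(key_batch, val_batch)))    # [((0, 0), 0), ((1, 1), 1), ((2, 2), 2)]

The last line mirrors the ((x, x), x) tuples that test_zip_chaining asserts on.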