Skip to content

Commit a9abcaa

Browse files
Davies Liujkbradley
authored andcommitted
[SPARK-5973] [PySpark] fix zip with two RDDs with AutoBatchedSerializer
Author: Davies Liu <[email protected]> Closes #4745 from davies/fix_zip and squashes the following commits: 2124b2c [Davies Liu] Update tests.py b5c828f [Davies Liu] increase the number of records c1e40fd [Davies Liu] fix zip with two RDDs with AutoBatchedSerializer (cherry picked from commit da505e5) Signed-off-by: Joseph K. Bradley <[email protected]>
1 parent 3ad00ee commit a9abcaa

File tree

2 files changed

+7
-1
lines changed

2 files changed

+7
-1
lines changed

python/pyspark/rdd.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1815,7 +1815,7 @@ def batch_as(rdd, batchSize):
18151815

18161816
my_batch = get_batch_size(self._jrdd_deserializer)
18171817
other_batch = get_batch_size(other._jrdd_deserializer)
1818-
if my_batch != other_batch:
1818+
if my_batch != other_batch or not my_batch:
18191819
# use the smallest batchSize for both of them
18201820
batchSize = min(my_batch, other_batch)
18211821
if batchSize <= 0:

python/pyspark/tests.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -543,6 +543,12 @@ def test_zip_with_different_serializers(self):
543543
# regression test for bug in _reserializer()
544544
self.assertEqual(cnt, t.zip(rdd).count())
545545

546+
def test_zip_with_different_object_sizes(self):
547+
# regress test for SPARK-5973
548+
a = self.sc.parallelize(range(10000)).map(lambda i: '*' * i)
549+
b = self.sc.parallelize(range(10000, 20000)).map(lambda i: '*' * i)
550+
self.assertEqual(10000, a.zip(b).count())
551+
546552
def test_zip_with_different_number_of_items(self):
547553
a = self.sc.parallelize(range(5), 2)
548554
# different number of partitions

0 commit comments

Comments
 (0)