Skip to content

Commit 54b7fd0

Browse files
committed
Only convert to a list if __len__ is not available; set the number of partitions in the test
1 parent 66477f8 commit 54b7fd0

File tree

2 files changed

+3
-3
lines changed

2 files changed

+3
-3
lines changed

python/pyspark/serializers.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -343,8 +343,8 @@ def _load_stream_without_unbatching(self, stream):
343343
key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
344344
val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
345345
for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
346-
key_batch = list(key_batch)
347-
val_batch = list(val_batch)
346+
key_batch = key_batch if hasattr(key_batch, '__len__') else list(key_batch)
347+
val_batch = val_batch if hasattr(val_batch, '__len__') else list(val_batch)
348348
if len(key_batch) != len(val_batch):
349349
raise ValueError("Can not deserialize PairRDD with different number of items"
350350
" in batches: (%d, %d)" % (len(key_batch), len(val_batch)))

python/pyspark/tests.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -646,7 +646,7 @@ def test_cartesian_chaining(self):
646646

647647
def test_zip_chaining(self):
648648
# Tests for SPARK-21985
649-
rdd = self.sc.parallelize('abc')
649+
rdd = self.sc.parallelize('abc',2)
650650
self.assertSetEqual(
651651
set(rdd.zip(rdd).zip(rdd).collect()),
652652
set([((x, x), x) for x in 'abc'])

0 commit comments

Comments (0)