Skip to content

Commit e99ed23

Browse files
committed
convert batches to list in PairDeserializer so we can check the len are equal
1 parent 0d64a6d commit e99ed23

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

python/pyspark/serializers.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -333,9 +333,6 @@ class PairDeserializer(Serializer):
333333
Deserializes the JavaRDD zip() of two PythonRDDs.
334334
Due to pyspark batching we cannot simply use the result of the Java RDD zip,
335335
we additionally need to do the zip within each pair of batches.
336-
337-
It is the responsibility of the user of this class to ensure the batch sizes of the key and
338-
value serializer are the same size. If they are not this will give incorrect results.
339336
"""
340337

341338
def __init__(self, key_ser, val_ser):
@@ -346,6 +343,11 @@ def _load_stream_without_unbatching(self, stream):
346343
key_batch_stream = self.key_ser._load_stream_without_unbatching(stream)
347344
val_batch_stream = self.val_ser._load_stream_without_unbatching(stream)
348345
for (key_batch, val_batch) in zip(key_batch_stream, val_batch_stream):
346+
key_batch = list(key_batch)
347+
val_batch = list(val_batch)
348+
if len(key_batch) != len(val_batch):
349+
raise ValueError("Can not deserialize PairRDD with different number of items"
350+
" in batches: (%d, %d)" % (len(key_batch), len(val_batch)))
349351
# for correctness with repeated cartesian/zip this must be returned as one batch
350352
yield zip(key_batch, val_batch)
351353

0 commit comments

Comments
 (0)