@@ -96,6 +96,11 @@ def load_stream(self, stream):
9696 raise NotImplementedError
9797
9898 def _load_stream_without_unbatching (self , stream ):
99+ """
100+ Return an iterator of deserialized batches (lists) of objects from the input stream.
101+ If the serializer does not operate on batches, the default implementation returns an
102+ iterator of single-element lists.
103+ """
99104 return map (lambda x : [x ], self .load_stream (stream ))
100105
101106 # Note: our notion of "equality" is that output generated by
@@ -282,6 +287,8 @@ class CartesianDeserializer(Serializer):
282287
283288 """
284289 Deserializes the JavaRDD cartesian() of two PythonRDDs.
290+ Due to PySpark batching, we cannot simply use the result of the Java RDD cartesian;
291+ we additionally need to compute the cartesian product within each pair of batches.
285292 """
286293
287294 def __init__ (self , key_ser , val_ser ):
@@ -292,6 +299,7 @@ def _load_stream_without_unbatching(self, stream):
292299 key_batch_stream = self .key_ser ._load_stream_without_unbatching (stream )
293300 val_batch_stream = self .val_ser ._load_stream_without_unbatching (stream )
294301 for (key_batch , val_batch ) in zip (key_batch_stream , val_batch_stream ):
302+ # for correctness with repeated cartesian/zip this must be returned as one batch
295303 yield product (key_batch , val_batch )
296304
297305 def load_stream (self , stream ):
@@ -306,6 +314,8 @@ class PairDeserializer(Serializer):
306314
307315 """
308316 Deserializes the JavaRDD zip() of two PythonRDDs.
317+ Due to PySpark batching, we cannot simply use the result of the Java RDD zip;
318+ we additionally need to zip the elements within each pair of batches.
309319 """
310320
311321 def __init__ (self , key_ser , val_ser ):
@@ -319,6 +329,7 @@ def _load_stream_without_unbatching(self, stream):
319329 if len (key_batch ) != len (val_batch ):
320330 raise ValueError ("Can not deserialize PairRDD with different number of items"
321331 " in batches: (%d, %d)" % (len (key_batch ), len (val_batch )))
332+ # for correctness with repeated cartesian/zip this must be returned as one batch
322333 yield zip (key_batch , val_batch )
323334
324335 def load_stream (self , stream ):
0 commit comments