@@ -292,11 +292,7 @@ def _load_stream_without_unbatching(self, stream):
292292 key_batch_stream = self .key_ser ._load_stream_without_unbatching (stream )
293293 val_batch_stream = self .val_ser ._load_stream_without_unbatching (stream )
294294 for (key_batch , val_batch ) in zip (key_batch_stream , val_batch_stream ):
295- # We must put these batches in lists exactly once and
296- # in order since they are pulling from the same stream
297- key_list = list (key_batch )
298- val_list = list (val_batch )
299- yield product (key_list , val_list )
295+ yield product (key_batch , val_batch )
300296
301297 def load_stream (self , stream ):
302298 return chain .from_iterable (self ._load_stream_without_unbatching (stream ))
@@ -320,14 +316,10 @@ def _load_stream_without_unbatching(self, stream):
320316 key_batch_stream = self .key_ser ._load_stream_without_unbatching (stream )
321317 val_batch_stream = self .val_ser ._load_stream_without_unbatching (stream )
322318 for (key_batch , val_batch ) in zip (key_batch_stream , val_batch_stream ):
323- # We must put these batches in lists exactly once and
324- # in order since they are pulling from the same stream
325- key_list = list (key_batch )
326- val_list = list (val_batch )
327- if len (key_list ) != len (val_list ):
319+ if len (key_batch ) != len (val_batch ):
328320 raise ValueError ("Can not deserialize PairRDD with different number of items"
329- " in batches: (%d, %d)" % (len (key_list ), len (val_list )))
330- yield zip (key_list , val_list )
321+ " in batches: (%d, %d)" % (len (key_batch ), len (val_batch )))
322+ yield zip (key_batch , val_batch )
331323
332324 def load_stream (self , stream ):
333325 return chain .from_iterable (self ._load_stream_without_unbatching (stream ))
0 commit comments