2020import operator
2121
2222from pyspark .serializers import NoOpSerializer ,\
23- BatchedSerializer , CloudPickleSerializer , pack_long
23+ BatchedSerializer , CloudPickleSerializer , pack_long ,\
24+ CompressedSerializer
2425from pyspark .rdd import _JavaStackTrace
2526from pyspark .storagelevel import StorageLevel
2627from pyspark .resultiterable import ResultIterable
@@ -458,7 +459,8 @@ def _jdstream(self):
458459 serializer = self .ctx .serializer
459460
460461 command = (self .func , self ._prev_jrdd_deserializer , serializer )
461- pickled_command = CloudPickleSerializer ().dumps (command )
462+ ser = CompressedSerializer (CloudPickleSerializer ())
463+ pickled_command = ser .dumps (command )
462464 broadcast_vars = ListConverter ().convert (
463465 [x ._jbroadcast for x in self .ctx ._pickled_broadcast_vars ],
464466 self .ctx ._gateway ._gateway_client )
@@ -467,12 +469,13 @@ def _jdstream(self):
467469 env = MapConverter ().convert (self .ctx .environment ,
468470 self .ctx ._gateway ._gateway_client )
469471 includes = ListConverter ().convert (self .ctx ._python_includes ,
470- self .ctx ._gateway ._gateway_client )
472+ self .ctx ._gateway ._gateway_client )
471473 python_dstream = self .ctx ._jvm .PythonDStream (self ._prev_jdstream .dstream (),
472- bytearray (pickled_command ),
473- env , includes , self .preservesPartitioning ,
474- self .ctx .pythonExec , broadcast_vars , self .ctx ._javaAccumulator ,
475- class_tag )
474+ bytearray (pickled_command ),
475+ env , includes , self .preservesPartitioning ,
476+ self .ctx .pythonExec ,
477+ broadcast_vars , self .ctx ._javaAccumulator ,
478+ class_tag )
476479 self ._jdstream_val = python_dstream .asJavaDStream ()
477480 return self ._jdstream_val
478481
0 commit comments