Skip to content

Commit 6730f72

Browse files
committed
invFunc=None does not require checkpointing
reduceByKeyAndWindow(func, None, window_size, slide_size) is equivalent to reduceByKey(func).window(window_size, slide_size).reduceByKey(func) and should not require checkpointing.
1 parent ff442bb commit 6730f72

File tree

1 file changed

+21
-20
lines changed

1 file changed

+21
-20
lines changed

python/pyspark/streaming/dstream.py

Lines changed: 21 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -542,31 +542,32 @@ def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None
542542

543543
reduced = self.reduceByKey(func, numPartitions)
544544

545-
def reduceFunc(t, a, b):
546-
b = b.reduceByKey(func, numPartitions)
547-
r = a.union(b).reduceByKey(func, numPartitions) if a else b
548-
if filterFunc:
549-
r = r.filter(filterFunc)
550-
return r
551-
552-
def invReduceFunc(t, a, b):
553-
b = b.reduceByKey(func, numPartitions)
554-
joined = a.leftOuterJoin(b, numPartitions)
555-
return joined.mapValues(lambda kv: invFunc(kv[0], kv[1])
556-
if kv[1] is not None else kv[0])
557-
558-
jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
559545
if invFunc:
546+
def reduceFunc(t, a, b):
547+
b = b.reduceByKey(func, numPartitions)
548+
r = a.union(b).reduceByKey(func, numPartitions) if a else b
549+
if filterFunc:
550+
r = r.filter(filterFunc)
551+
return r
552+
553+
def invReduceFunc(t, a, b):
554+
b = b.reduceByKey(func, numPartitions)
555+
joined = a.leftOuterJoin(b, numPartitions)
556+
return joined.mapValues(lambda kv: invFunc(kv[0], kv[1])
557+
if kv[1] is not None else kv[0])
558+
559+
jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
560560
jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer)
561-
else:
562-
jinvReduceFunc = None
563-
if slideDuration is None:
564-
slideDuration = self._slideDuration
565-
dstream = self._sc._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(),
561+
if slideDuration is None:
562+
slideDuration = self._slideDuration
563+
dstream = self._sc._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(),
566564
jreduceFunc, jinvReduceFunc,
567565
self._ssc._jduration(windowDuration),
568566
self._ssc._jduration(slideDuration))
569-
return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
567+
return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
568+
else:
569+
return reduced.window(windowDuration, slideDuration).reduceByKey(func, numPartitions)
570+
570571

571572
def updateStateByKey(self, updateFunc, numPartitions=None):
572573
"""

0 commit comments

Comments
 (0)