From 6730f72d2d9aa2c535abc9719e589369cc7b4cdb Mon Sep 17 00:00:00 2001
From: David Tolpin
Date: Sun, 22 Nov 2015 01:22:31 +0200
Subject: [PATCH 1/3] invFunc=None does not require checkpointing

reduceByKeyAndWindow(func, None, window_size, slide_size) is equivalent to
reduceByKey(func).window(window_size, slide_size).reduceByKey(func) and
should not require checkpointing.
---
 python/pyspark/streaming/dstream.py | 41 +++++++++++++++--------------
 1 file changed, 21 insertions(+), 20 deletions(-)

diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index acec850f02c2d..070db952af9b0 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -542,31 +542,32 @@ def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None
         reduced = self.reduceByKey(func, numPartitions)
 
-        def reduceFunc(t, a, b):
-            b = b.reduceByKey(func, numPartitions)
-            r = a.union(b).reduceByKey(func, numPartitions) if a else b
-            if filterFunc:
-                r = r.filter(filterFunc)
-            return r
-
-        def invReduceFunc(t, a, b):
-            b = b.reduceByKey(func, numPartitions)
-            joined = a.leftOuterJoin(b, numPartitions)
-            return joined.mapValues(lambda kv: invFunc(kv[0], kv[1])
-                                    if kv[1] is not None else kv[0])
-
-        jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
         if invFunc:
+            def reduceFunc(t, a, b):
+                b = b.reduceByKey(func, numPartitions)
+                r = a.union(b).reduceByKey(func, numPartitions) if a else b
+                if filterFunc:
+                    r = r.filter(filterFunc)
+                return r
+
+            def invReduceFunc(t, a, b):
+                b = b.reduceByKey(func, numPartitions)
+                joined = a.leftOuterJoin(b, numPartitions)
+                return joined.mapValues(lambda kv: invFunc(kv[0], kv[1])
+                                        if kv[1] is not None else kv[0])
+
+            jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
             jinvReduceFunc = TransformFunction(self._sc, invReduceFunc,
                                                reduced._jrdd_deserializer)
-        else:
-            jinvReduceFunc = None
-        if slideDuration is None:
-            slideDuration = self._slideDuration
-        dstream = self._sc._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(),
+            if slideDuration is None:
+                slideDuration = self._slideDuration
+            dstream = self._sc._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(),
                                                              jreduceFunc, jinvReduceFunc,
                                                              self._ssc._jduration(windowDuration),
                                                              self._ssc._jduration(slideDuration))
-        return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
+            return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
+        else:
+            return reduced.window(windowDuration, slideDuration).reduceByKey(func, numPartitions)
+
 
     def updateStateByKey(self, updateFunc, numPartitions=None):
         """

From 60cfd642757ad94ad7dba0ae979036a83ea7aaeb Mon Sep 17 00:00:00 2001
From: David Tolpin
Date: Wed, 16 Dec 2015 22:51:27 +0200
Subject: [PATCH 2/3] Fixed failing PEP8 checks

---
 python/pyspark/streaming/dstream.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 070db952af9b0..3b11ced8f34f1 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -561,14 +561,13 @@ def invReduceFunc(t, a, b):
             if slideDuration is None:
                 slideDuration = self._slideDuration
             dstream = self._sc._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(),
-                                                             jreduceFunc, jinvReduceFunc,
-                                                             self._ssc._jduration(windowDuration),
-                                                             self._ssc._jduration(slideDuration))
+                                                                 jreduceFunc, jinvReduceFunc,
+                                                                 self._ssc._jduration(windowDuration),
+                                                                 self._ssc._jduration(slideDuration))
             return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
         else:
             return reduced.window(windowDuration, slideDuration).reduceByKey(func, numPartitions)
 
-
     def updateStateByKey(self, updateFunc, numPartitions=None):
         """
         Return a new "state" DStream where the state for each key is updated by applying

From 8a92809816f579a5563b3f0bfffbe5184c29a576 Mon Sep 17 00:00:00 2001
From: David Tolpin
Date: Thu, 17 Dec 2015 00:11:10 +0200
Subject: [PATCH 3/3] Made lines less than 100 characters long

---
 python/pyspark/streaming/dstream.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py
index 3b11ced8f34f1..9969a08e44948 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -560,10 +560,11 @@ def invReduceFunc(t, a, b):
             jinvReduceFunc = TransformFunction(self._sc, invReduceFunc,
                                                reduced._jrdd_deserializer)
             if slideDuration is None:
                 slideDuration = self._slideDuration
-            dstream = self._sc._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(),
-                                                                 jreduceFunc, jinvReduceFunc,
-                                                                 self._ssc._jduration(windowDuration),
-                                                                 self._ssc._jduration(slideDuration))
+            dstream = self._sc._jvm.PythonReducedWindowedDStream(
+                reduced._jdstream.dstream(),
+                jreduceFunc, jinvReduceFunc,
+                self._ssc._jduration(windowDuration),
+                self._ssc._jduration(slideDuration))
             return DStream(dstream.asJavaDStream(), self._ssc, self._sc.serializer)
         else:
             return reduced.window(windowDuration, slideDuration).reduceByKey(func, numPartitions)
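
Usage note: with this series applied, calling reduceByKeyAndWindow with
invFunc=None should no longer require a checkpoint directory, because the
else branch recomputes each window with plain window() plus reduceByKey()
instead of going through PythonReducedWindowedDStream. Below is a minimal
sketch of that use, not part of the patches; the socket source on
localhost:9999 and the word-count pipeline are hypothetical stand-ins for
any key/value DStream.

    from operator import add

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext

    sc = SparkContext(appName="WindowedWordCount")
    ssc = StreamingContext(sc, 1)  # 1-second batches

    # Hypothetical input source; any DStream of (key, value) pairs
    # behaves the same way.
    pairs = (ssc.socketTextStream("localhost", 9999)
             .flatMap(lambda line: line.split())
             .map(lambda word: (word, 1)))

    # invFunc=None: no ssc.checkpoint(...) call should be needed here,
    # since the patched code path is windowing plus re-reduction.
    counts = pairs.reduceByKeyAndWindow(add, None, 30, 10)

    # Per the first commit message, this is equivalent to:
    #   pairs.reduceByKey(add).window(30, 10).reduceByKey(add)

    counts.pprint()
    ssc.start()
    ssc.awaitTermination()

Passing a real inverse function (e.g. operator.sub) as the second argument
keeps the incremental PythonReducedWindowedDStream path, which still
requires checkpointing.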