Commit 001c446

dtolpin authored and tdas committed
[SPARK-11812][PYSPARK] invFunc=None works properly with python's reduceByKeyAndWindow
invFunc is optional and can be None. Instead of invFunc (the parameter), invReduceFunc (a local function) was checked for truthiness (that is, not None, in this context). A local function is never None, so the case of invFunc=None (a common one when inverse reduction is not defined) was treated incorrectly, resulting in loss of data.

In addition, the docstring used wrong parameter names; this is also fixed.

Author: David Tolpin <[email protected]>

Closes #9775 from dtolpin/master.

(cherry picked from commit 599a8c6)
Signed-off-by: Tathagata Das <[email protected]>
1 parent 9957925 commit 001c446
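
The root cause described in the commit message can be reproduced without Spark. The sketch below is a hypothetical stand-in, not the PySpark implementation: `choose_path`, `inv_reduce_func`, and the path labels are names invented for illustration. It only shows that a locally defined function object is always truthy, so testing it cannot detect `invFunc=None`.

```python
import operator


def choose_path(func, inv_func):
    """Hypothetical stand-in for the branch inside reduceByKeyAndWindow."""

    # A local wrapper is always defined, even when inv_func is None.
    def inv_reduce_func(a, b):
        return inv_func(a, b)

    # Check as written before the fix: tests the local function, always true.
    before_fix = "incremental" if inv_reduce_func else "recompute"
    # Check as written after the fix: tests the caller's parameter.
    after_fix = "incremental" if inv_func else "recompute"
    return before_fix, after_fix


print(choose_path(operator.add, None))          # ('incremental', 'recompute')
print(choose_path(operator.add, operator.sub))  # ('incremental', 'incremental')
```

With `inv_func=None`, the old check still selects the incremental path, which would later call `None` as a function or produce wrong results; the new check falls back to recomputing the window.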

2 files changed: +14 -3 lines changed

python/pyspark/streaming/dstream.py

Lines changed: 3 additions & 3 deletions
@@ -524,8 +524,8 @@ def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None
         `invFunc` can be None, then it will reduce all the RDDs in window, could be slower
         than having `invFunc`.
 
-        @param reduceFunc: associative reduce function
-        @param invReduceFunc: inverse function of `reduceFunc`
+        @param func: associative reduce function
+        @param invFunc: inverse function of `reduceFunc`
         @param windowDuration: width of the window; must be a multiple of this DStream's
                                batching interval
         @param slideDuration: sliding interval of the window (i.e., the interval after which
@@ -556,7 +556,7 @@ def invReduceFunc(t, a, b):
                                     if kv[1] is not None else kv[0])
 
         jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
-        if invReduceFunc:
+        if invFunc:
             jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer)
         else:
             jinvReduceFunc = None
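
The docstring notes that reducing without `invFunc` recomputes every window and may be slower. A minimal sketch of the two strategies follows, in plain Python over a single stream of numbers rather than keyed RDDs. The function names `windowed_full` and `windowed_incremental` are made up for this illustration and are not part of PySpark.

```python
import operator
from functools import reduce


def windowed_full(values, width, func):
    """Recompute each window from scratch (the invFunc=None strategy)."""
    out = []
    for i in range(len(values)):
        window = values[max(0, i - width + 1): i + 1]
        out.append(reduce(func, window))
    return out


def windowed_incremental(values, width, func, inv_func):
    """Update the previous result: add the entering value, remove the leaving one."""
    out = []
    total = None
    for i, value in enumerate(values):
        total = value if total is None else func(total, value)
        if i >= width:
            # The value that just slid out of the window is "subtracted".
            total = inv_func(total, values[i - width])
        out.append(total)
    return out


values = [1, 2, 3, 4, 5, 6]
full = windowed_full(values, 3, operator.add)
incremental = windowed_incremental(values, 3, operator.add, operator.sub)
print(full)         # [1, 3, 6, 9, 12, 15]
print(incremental)  # [1, 3, 6, 9, 12, 15]
```

Both strategies give the same sums; the incremental one does constant work per step but needs a valid inverse, which is why the `None` case must take the full-recompute branch.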

python/pyspark/streaming/tests.py

Lines changed: 11 additions & 0 deletions
@@ -448,6 +448,17 @@ def test_reduce_by_invalid_window(self):
         self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 0.1, 0.1))
         self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 1, 0.1))
 
+    def test_reduce_by_key_and_window_with_none_invFunc(self):
+        input = [range(1), range(2), range(3), range(4), range(5), range(6)]
+
+        def func(dstream):
+            return dstream.map(lambda x: (x, 1))\
+                .reduceByKeyAndWindow(operator.add, None, 5, 1)\
+                .filter(lambda kv: kv[1] > 0).count()
+
+        expected = [[2], [4], [6], [6], [6], [6]]
+        self._test_func(input, func, expected)
+
 
 class StreamingContextTests(PySparkStreamingTestCase):
 
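
The expected values in the new test can be checked by hand with a small simulation. This sketch rests on an assumption the diff does not show: that the test harness uses a 0.5-second batch interval, so a 5-second window spans 10 batches and a 1-second slide emits one result every 2 batches. The `simulate` helper is hypothetical and only mimics the map, windowed reduce, filter, and count steps in plain Python.

```python
import operator


def simulate(batches, window_batches, slide_batches, func, n_outputs):
    """Count keys with a positive reduced value in each sliding window."""
    results = []
    t = slide_batches  # number of batches seen when the first window fires
    while len(results) < n_outputs:
        start = max(0, t - window_batches)
        reduced = {}
        # Slicing past the end of the input simply yields fewer (empty) batches.
        for batch in batches[start:t]:
            for key in batch:
                reduced[key] = func(reduced[key], 1) if key in reduced else 1
        results.append([sum(1 for v in reduced.values() if v > 0)])
        t += slide_batches
    return results


batches = [list(range(n)) for n in range(1, 7)]
# Assumed: 0.5 s batches, so window=10 batches and slide=2 batches.
print(simulate(batches, 10, 2, operator.add, 6))
# [[2], [4], [6], [6], [6], [6]]
```

Under that assumption, every key keeps a positive count while it remains in the window, so the result stays at 6 once all batches have arrived. The data loss described in the commit message would show up in the test as counts that differ from these.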
