Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions python/pyspark/streaming/dstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -524,8 +524,8 @@ def reduceByKeyAndWindow(self, func, invFunc, windowDuration, slideDuration=None
`invFunc` can be None, then it will reduce all the RDDs in window, could be slower
than having `invFunc`.

@param reduceFunc: associative reduce function
@param invReduceFunc: inverse function of `reduceFunc`
@param func: associative reduce function
@param invFunc: inverse function of `reduceFunc`
@param windowDuration: width of the window; must be a multiple of this DStream's
batching interval
@param slideDuration: sliding interval of the window (i.e., the interval after which
Expand Down Expand Up @@ -556,7 +556,7 @@ def invReduceFunc(t, a, b):
if kv[1] is not None else kv[0])

jreduceFunc = TransformFunction(self._sc, reduceFunc, reduced._jrdd_deserializer)
if invReduceFunc:
if invFunc:
jinvReduceFunc = TransformFunction(self._sc, invReduceFunc, reduced._jrdd_deserializer)
else:
jinvReduceFunc = None
Expand Down
11 changes: 11 additions & 0 deletions python/pyspark/streaming/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,17 @@ def test_reduce_by_invalid_window(self):
self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 0.1, 0.1))
self.assertRaises(ValueError, lambda: d1.reduceByKeyAndWindow(None, None, 1, 0.1))

def test_reduce_by_key_and_window_with_none_invFunc(self):
input = [range(1), range(2), range(3), range(4), range(5), range(6)]

def func(dstream):
return dstream.map(lambda x: (x, 1))\
.reduceByKeyAndWindow(operator.add, None, 5, 1)\
.filter(lambda kv: kv[1] > 0).count()

expected = [[2], [4], [6], [6], [6], [6]]
self._test_func(input, func, expected)


class StreamingContextTests(PySparkStreamingTestCase):

Expand Down