@@ -332,7 +332,7 @@ def test_groupByKey_batch(self):
332332 """Basic operation test for DStream.groupByKey with batch deserializer."""
333333 test_input = [range (1 , 5 ), [1 , 1 , 1 , 2 , 2 , 3 ], ["a" , "a" , "b" , "" , "" , "" ]]
334334 def test_func (dstream ):
335- return dstream .map (lambda x : (x ,1 )).groupByKey ()
335+ return dstream .map (lambda x : (x , 1 )).groupByKey ()
336336 expected_output = [[(1 , [1 ]), (2 , [1 ]), (3 , [1 ]), (4 , [1 ])],
337337 [(1 , [1 , 1 , 1 ]), (2 , [1 , 1 ]), (3 , [1 ])],
338338 [("a" , [1 , 1 ]), ("b" , [1 ]), ("" , [1 , 1 , 1 ])]]
@@ -345,8 +345,9 @@ def test_func(dstream):
345345 def test_groupByKey_unbatch (self ):
346346 """Basic operation test for DStream.groupByKey with unbatch deserializer."""
347347 test_input = [range (1 , 4 ), [1 , 1 , "" ], ["a" , "a" , "b" ]]
348+
348349 def test_func (dstream ):
349- return dstream .map (lambda x : (x ,1 )).groupByKey ()
350+ return dstream .map (lambda x : (x , 1 )).groupByKey ()
350351 expected_output = [[(1 , [1 ]), (2 , [1 ]), (3 , [1 ])],
351352 [(1 , [1 , 1 ]), ("" , [1 ])],
352353 [("a" , [1 , 1 ]), ("b" , [1 ])]]
@@ -356,6 +357,36 @@ def test_func(dstream):
356357 self ._sort_result_based_on_key (result )
357358 self .assertEqual (expected_output , output )
358359
360+ def test_combineByKey_batch (self ):
361+ """Basic operation test for DStream.combineByKey with batch deserializer."""
362+ test_input = [range (1 , 5 ), [1 , 1 , 1 , 2 , 2 , 3 ], ["a" , "a" , "b" , "" , "" , "" ]]
363+
364+ def test_func (dstream ):
365+ def add (a , b ): return a + str (b )
366+ return dstream .map (lambda x : (x , 1 )).combineByKey (str , add , add )
367+ expected_output = [[(1 , "1" ), (2 , "1" ), (3 , "1" ), (4 , "1" )],
368+ [(1 , "111" ), (2 , "11" ), (3 , "1" )],
369+ [("a" , "11" ), ("b" , "1" ), ("" , "111" )]]
370+ output = self ._run_stream (test_input , test_func , expected_output )
371+ for result in (output , expected_output ):
372+ self ._sort_result_based_on_key (result )
373+ self .assertEqual (expected_output , output )
374+
375+ def test_combineByKey_unbatch (self ):
376+ """Basic operation test for DStream.combineByKey with unbatch deserializer."""
377+ test_input = [range (1 , 4 ), [1 , 1 , "" ], ["a" , "a" , "b" ]]
378+
379+ def test_func (dstream ):
380+ def add (a , b ): return a + str (b )
381+ return dstream .map (lambda x : (x , 1 )).combineByKey (str , add , add )
382+ expected_output = [[(1 , "1" ), (2 , "1" ), (3 , "1" )],
383+ [(1 , "11" ), ("" , "1" )],
384+ [("a" , "11" ), ("b" , "1" )]]
385+ output = self ._run_stream (test_input , test_func , expected_output )
386+ for result in (output , expected_output ):
387+ self ._sort_result_based_on_key (result )
388+ self .assertEqual (expected_output , output )
389+
359390 def _convert_iter_value_to_list (self , outputs ):
360391 """Return key value pair list. Value is converted to iterator to list."""
361392 result = list ()
0 commit comments