@@ -1233,7 +1233,7 @@ def _mergeCombiners(iterator):
12331233 combiners [k ] = mergeCombiners (combiners [k ], v )
12341234 return combiners .iteritems ()
12351235 return shuffled .mapPartitions (_mergeCombiners )
1236-
1236+
12371237 def aggregateByKey (self , zeroValue , seqFunc , combFunc , numPartitions = None ):
12381238 """
12391239 Aggregate the values of each key, using given combine functions and a neutral "zero value".
@@ -1245,7 +1245,7 @@ def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
12451245 """
12461246 def createZero ():
12471247 return copy .deepcopy (zeroValue )
1248-
1248+
12491249 return self .combineByKey (lambda v : seqFunc (createZero (), v ), seqFunc , combFunc , numPartitions )
12501250
12511251 def foldByKey (self , zeroValue , func , numPartitions = None ):
@@ -1324,11 +1324,11 @@ def mapValues(self, f):
13241324 return self .map (map_values_fn , preservesPartitioning = True )
13251325
13261326 # TODO: support varargs cogroup of several RDDs.
1327- def groupWith (self , other ):
1327+ def groupWith (self , other , * others ):
13281328 """
13291329 Alias for cogroup.
13301330 """
1331- return self . cogroup ( other )
1331+ return python_cogroup (( self , other ) + others , numPartitions = None )
13321332
13331333 # TODO: add variant with custom parittioner
13341334 def cogroup (self , other , numPartitions = None ):
@@ -1342,7 +1342,7 @@ def cogroup(self, other, numPartitions=None):
13421342 >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
13431343 [('a', ([1], [2])), ('b', ([4], []))]
13441344 """
1345- return python_cogroup (self , other , numPartitions )
1345+ return python_cogroup (( self , other ) , numPartitions )
13461346
13471347 def subtractByKey (self , other , numPartitions = None ):
13481348 """
0 commit comments