Skip to content

Commit ba02414

Browse files
committed
Added varargs cogroup to pyspark
1 parent c4a8a51 commit ba02414

File tree

1 file changed

+7
-10
lines changed

1 file changed

+7
-10
lines changed

python/pyspark/join.py

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -79,15 +79,12 @@ def dispatch(seq):
7979
return _do_python_join(rdd, other, numPartitions, dispatch)
8080

8181

82-
def python_cogroup(rdd, other, numPartitions):
83-
vs = rdd.map(lambda (k, v): (k, (1, v)))
84-
ws = other.map(lambda (k, v): (k, (2, v)))
82+
def python_cogroup(rdds, numPartitions):
83+
vrdds = [rdd.map(lambda (k, v): (k, (i, v))) for i, rdd in enumerate(rdds)]
84+
union_vrdds = reduce(lambda acc, other: acc.union(other), vrdds)
8585
def dispatch(seq):
86-
vbuf, wbuf = [], []
86+
bufs = [[] for rdd in vrdds]
8787
for (n, v) in seq:
88-
if n == 1:
89-
vbuf.append(v)
90-
elif n == 2:
91-
wbuf.append(v)
92-
return (ResultIterable(vbuf), ResultIterable(wbuf))
93-
return vs.union(ws).groupByKey(numPartitions).mapValues(dispatch)
88+
bufs[n].append(v)
89+
return tuple(map(ResultIterable, bufs))
90+
return union_vrdds.groupByKey(numPartitions).mapValues(dispatch)

0 commit comments

Comments
 (0)