Skip to content

Commit 4879c75

Browse files
committed
CR feedback, fix issue with empty RDDs in aggregate
1 parent 70b4724 commit 4879c75

File tree

1 file changed

+5
-3
lines changed

1 file changed

+5
-3
lines changed

python/pyspark/rdd.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -584,14 +584,16 @@ def aggregate(self, zeroValue, seqOp, combOp):
584584
>>> combOp = (lambda x, y: (x[0] + y[0], x[1] + y[1]))
585585
>>> sc.parallelize([1, 2, 3, 4]).aggregate((0, 0), seqOp, combOp)
586586
(10, 4)
587+
>>> sc.parallelize([]).aggregate((0, 0), seqOp, combOp)
588+
(0, 0)
587589
"""
588590
def func(iterator):
589591
acc = zeroValue
590592
for obj in iterator:
591593
acc = seqOp(acc, obj)
592-
if acc is not None:
593-
yield acc
594-
return self.mapPartitions(func).reduce(combOp)
594+
yield acc
595+
596+
return self.mapPartitions(func).fold(zeroValue, combOp)
595597

596598
def sum(self):
597599
"""

0 commit comments

Comments
 (0)