Skip to content

Commit ca23c3b

Browse files
Davies LiuJoshRosen
authored andcommitted
[SPARK-8202] [PYSPARK] fix infinite loop during external sort in PySpark
The batch size during external sort will grow up to max 10000, then shrink down to zero, causing infinite loop. Given the assumption that the items usually have similar size, so we don't need to adjust the batch size after first spill. cc JoshRosen rxin angelini Author: Davies Liu <[email protected]> Closes #6714 from davies/batch_size and squashes the following commits: b170dfb [Davies Liu] update test b9be832 [Davies Liu] Merge branch 'batch_size' of github.com:davies/spark into batch_size 6ade745 [Davies Liu] update test 5c21777 [Davies Liu] Update shuffle.py e746aec [Davies Liu] fix batch size during sort
1 parent 9dabc12 commit ca23c3b

File tree

2 files changed

+5
-5
lines changed

2 files changed

+5
-5
lines changed

python/pyspark/shuffle.py

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -486,7 +486,7 @@ def sorted(self, iterator, key=None, reverse=False):
486486
goes above the limit.
487487
"""
488488
global MemoryBytesSpilled, DiskBytesSpilled
489-
batch, limit = 100, self.memory_limit
489+
batch, limit = 100, self._next_limit()
490490
chunks, current_chunk = [], []
491491
iterator = iter(iterator)
492492
while True:
@@ -512,9 +512,6 @@ def load(f):
512512
f.close()
513513
chunks.append(load(open(path, 'rb')))
514514
current_chunk = []
515-
gc.collect()
516-
batch //= 2
517-
limit = self._next_limit()
518515
MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
519516
DiskBytesSpilled += os.path.getsize(path)
520517
os.unlink(path) # data will be deleted after close

python/pyspark/tests.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,9 +179,12 @@ def test_in_memory_sort(self):
179179
list(sorter.sorted(l, key=lambda x: -x, reverse=True)))
180180

181181
def test_external_sort(self):
182+
class CustomizedSorter(ExternalSorter):
183+
def _next_limit(self):
184+
return self.memory_limit
182185
l = list(range(1024))
183186
random.shuffle(l)
184-
sorter = ExternalSorter(1)
187+
sorter = CustomizedSorter(1)
185188
self.assertEqual(sorted(l), list(sorter.sorted(l)))
186189
self.assertGreater(shuffle.DiskBytesSpilled, 0)
187190
last = shuffle.DiskBytesSpilled

0 commit comments

Comments
 (0)