From e746aeca630448b3bb9d425d8aefa496385f39ed Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 8 Jun 2015 23:26:29 -0700 Subject: [PATCH 1/4] fix batch size during sort --- python/pyspark/shuffle.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index 81c420ce16541..580e75a1628e8 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -512,9 +512,6 @@ def load(f): f.close() chunks.append(load(open(path, 'rb'))) current_chunk = [] - gc.collect() - batch //= 2 - limit = self._next_limit() MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20 DiskBytesSpilled += os.path.getsize(path) os.unlink(path) # data will be deleted after close From 5c21777fd35fa1e2d637f59b038744ebb95a4f2f Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 9 Jun 2015 09:43:14 -0700 Subject: [PATCH 2/4] Update shuffle.py --- python/pyspark/shuffle.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index 580e75a1628e8..67752c0d150b9 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -486,7 +486,7 @@ def sorted(self, iterator, key=None, reverse=False): goes above the limit. """ global MemoryBytesSpilled, DiskBytesSpilled - batch, limit = 100, self.memory_limit + batch, limit = 100, self._next_limit() chunks, current_chunk = [], [] iterator = iter(iterator) while True: From 6ade74518ad624d3fa49cfbbb03e0ab95e72c276 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 9 Jun 2015 17:48:02 -0700 Subject: [PATCH 3/4] update test --- python/pyspark/tests.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index f9fb37f7fc139..7257083e5cbc3 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -179,7 +179,7 @@ def test_in_memory_sort(self): list(sorter.sorted(l, key=lambda x: -x, reverse=True))) def test_external_sort(self): - l = list(range(1024)) + l = list(int(str(i) * 10) for i in range(1020)) random.shuffle(l) sorter = ExternalSorter(1) self.assertEqual(sorted(l), list(sorter.sorted(l))) From b170dfbca1d87a3738aea23ee0472f2d661728a2 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 9 Jun 2015 17:58:33 -0700 Subject: [PATCH 4/4] update test --- python/pyspark/tests.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 7257083e5cbc3..74e848c80f185 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -179,9 +179,12 @@ def test_in_memory_sort(self): list(sorter.sorted(l, key=lambda x: -x, reverse=True))) def test_external_sort(self): - l = list(int(str(i) * 10) for i in range(1020)) + class CustomizedSorter(ExternalSorter): + def _next_limit(self): + return self.memory_limit + l = list(range(1024)) random.shuffle(l) - sorter = ExternalSorter(1) + sorter = CustomizedSorter(1) self.assertEqual(sorted(l), list(sorter.sorted(l))) self.assertGreater(shuffle.DiskBytesSpilled, 0) last = shuffle.DiskBytesSpilled