Closed
5 changes: 1 addition & 4 deletions python/pyspark/shuffle.py
@@ -486,7 +486,7 @@ def sorted(self, iterator, key=None, reverse=False):
goes above the limit.
"""
global MemoryBytesSpilled, DiskBytesSpilled
-batch, limit = 100, self.memory_limit
+batch, limit = 100, self._next_limit()
chunks, current_chunk = [], []
iterator = iter(iterator)
while True:
@@ -512,9 +512,6 @@ def load(f):
f.close()
chunks.append(load(open(path, 'rb')))
current_chunk = []
-gc.collect()
-batch //= 2
-limit = self._next_limit()
MemoryBytesSpilled += max(used_memory - get_used_memory(), 0) << 20
Contributor

Why is the call to gc.collect being removed?

Contributor Author

We no longer reset limit via _next_limit() (which calls get_used_memory()) at this point. The gc.collect() call was only there to get a more accurate reading of how much memory was in use, so it is not needed anymore.
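
For reference, the MemoryBytesSpilled update above takes the difference between two get_used_memory() readings, clamps it at zero, and shifts it left by 20 bits. A minimal sketch of that arithmetic, assuming the readings are integers in megabytes (spilled_bytes is a hypothetical helper, not part of pyspark.shuffle):

```python
def spilled_bytes(used_before_mb, used_after_mb):
    """Bytes freed by a spill, given two memory readings in megabytes.

    Hypothetical helper for illustration; it mirrors the expression
    max(used_memory - get_used_memory(), 0) << 20 from the diff.
    """
    # Clamp at zero so a reading that went up never yields a negative
    # metric, then convert MB to bytes (1 MB == 1 << 20 bytes).
    return max(used_before_mb - used_after_mb, 0) << 20


freed = spilled_bytes(12, 9)      # 3 MB freed, expressed in bytes
not_freed = spilled_bytes(9, 12)  # usage went up, so nothing is counted
```

The clamp means the metric can only grow, even when the second reading is higher than the first.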

DiskBytesSpilled += os.path.getsize(path)
os.unlink(path) # data will be deleted after close
5 changes: 4 additions & 1 deletion python/pyspark/tests.py
@@ -179,9 +179,12 @@ def test_in_memory_sort(self):
list(sorter.sorted(l, key=lambda x: -x, reverse=True)))

def test_external_sort(self):
+class CustomizedSorter(ExternalSorter):
+def _next_limit(self):
Contributor

I think that we should add a comment here to explain why we're mocking out this part of the code; it doesn't seem self-evident to me and I'm worried that it's going to confuse future readers of this code.

Also, do you think it's worth adding a separate test case for this path and keeping the old test? That might duplicate some of the code that asserts on the metrics, but we could probably factor it out into a shared method.

Contributor

It seems like the intent here is to mock get_used_memory(). I'm not very familiar with Python mocking frameworks, but if this were Java, I imagine get_used_memory() would be a method of some context / environment object that we could mock.
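
As a rough illustration of that idea in Python, the sketch below routes the memory probe through an environment object and swaps it out with unittest.mock.patch.object. Everything here is hypothetical: env and TinySorter are toy stand-ins, not PySpark code, and in PySpark get_used_memory() is a module-level function rather than an attribute of such an object.

```python
import heapq
from types import SimpleNamespace
from unittest import mock

# Hypothetical environment object holding the memory probe (in MB).
env = SimpleNamespace(get_used_memory=lambda: 0)


class TinySorter:
    """Toy stand-in for an external sorter; not PySpark's ExternalSorter."""

    def __init__(self, memory_limit_mb):
        self.memory_limit = memory_limit_mb
        self.spills = 0

    def sorted(self, iterator, batch=100):
        chunks, current = [], []
        for i, item in enumerate(iterator, 1):
            current.append(item)
            # Probe memory once per batch and "spill" when over the limit.
            if i % batch == 0 and env.get_used_memory() > self.memory_limit:
                chunks.append(sorted(current))  # a real sorter would write to disk
                current = []
                self.spills += 1
        chunks.append(sorted(current))
        # Merge the sorted chunks back into one sorted list.
        return list(heapq.merge(*chunks))


data = list(range(1024))[::-1]

# Pretend the process always uses 2 MB, which is above the 1 MB limit,
# so every batch triggers a spill without allocating real memory.
with mock.patch.object(env, "get_used_memory", return_value=2):
    sorter = TinySorter(1)
    result = sorter.sorted(data)
```

Outside the with block the original probe is restored, so other tests are unaffected by the patch.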

Contributor Author

Yes. Without the mock, it takes a long time to reach the memory limit, which slows down the tests.
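
A minimal sketch of why overriding _next_limit() makes the limit easy to hit. The thread only says that _next_limit() calls get_used_memory(); the formula below is an assumption, and BaseSorter, FixedLimitSorter, _used_memory, and the 1.05 factor are hypothetical stand-ins rather than PySpark's code.

```python
class BaseSorter:
    """Toy base class; assumed shape, not PySpark's ExternalSorter."""

    def __init__(self, memory_limit_mb):
        self.memory_limit = memory_limit_mb

    def _used_memory(self):
        # Hypothetical probe: pretend the process already uses 500 MB.
        return 500

    def _next_limit(self):
        # Assumption: the limit is never set below current usage, so a
        # tiny configured limit is raised to just above what is in use.
        return max(self.memory_limit, self._used_memory() * 1.05)


class FixedLimitSorter(BaseSorter):
    def _next_limit(self):
        # Test-only override: ignore current usage and keep the configured
        # limit, so a small limit is exceeded almost immediately.
        return self.memory_limit


adaptive_limit = BaseSorter(1)._next_limit()
fixed_limit = FixedLimitSorter(1)._next_limit()
```

Under that assumption, the adaptive limit sits above whatever the test process already uses, so the test would have to allocate more memory before any spill happens, while the fixed limit of 1 MB is crossed right away.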

+return self.memory_limit
l = list(range(1024))
random.shuffle(l)
-sorter = ExternalSorter(1)
+sorter = CustomizedSorter(1)
self.assertEqual(sorted(l), list(sorter.sorted(l)))
self.assertGreater(shuffle.DiskBytesSpilled, 0)
last = shuffle.DiskBytesSpilled