Skip to content

Commit 37d71f7

Browse files
committed
balance the partitions
1 parent 902f036 commit 37d71f7

File tree

1 file changed

+4
-2
lines changed

1 file changed

+4
-2
lines changed

python/pyspark/shuffle.py

Lines changed: 4 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -189,7 +189,7 @@ class ExternalMerger(Merger):
189189
MAX_TOTAL_PARTITIONS = 4096
190190

191191
def __init__(self, aggregator, memory_limit=512, serializer=None,
192-
localdirs=None, scale=1, partitions=64, batch=10000):
192+
localdirs=None, scale=1, partitions=59, batch=1000):
193193
Merger.__init__(self, aggregator)
194194
self.memory_limit = memory_limit
195195
# default serializer is only used for tests
@@ -208,6 +208,8 @@ def __init__(self, aggregator, memory_limit=512, serializer=None,
208208
self.pdata = []
209209
# number of chunks dumped into disks
210210
self.spills = 0
211+
# randomize the hash of key, id(o) is the address of o (aligned by 8)
212+
self._seed = id(self) + 7
211213

212214
def _get_dirs(self):
213215
""" Get all the directories """
@@ -246,7 +248,7 @@ def mergeValues(self, iterator):
246248

247249
def _partition(self, key):
248250
""" Return the partition for key """
249-
return (hash(key) / self.scale) % self.partitions
251+
return hash((key, self._seed)) % self.partitions
250252

251253
def _partitioned_mergeValues(self, iterator, limit=0):
252254
""" Partition the items by key, then combine them """

0 commit comments

Comments
 (0)