Commit e74b785

fix code style and change next_limit to memory_limit
1 parent 400be01 commit e74b785

2 files changed: +18 -17 lines

python/pyspark/rdd.py

Lines changed: 3 additions & 3 deletions
@@ -1229,7 +1229,7 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash):
         # Instead, we'll form the hash buckets in Python,
         # transferring O(numPartitions) objects to Java.
         # Each object is a (splitNumber, [objects]) pair.
-        # In order to void too huge objects, the objects are
+        # In order to avoid too huge objects, the objects are
         # grouped into chunks.
         outputSerializer = self.ctx._unbatched_serializer
 
@@ -1316,11 +1316,11 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
         spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower()
                  == 'true')
         memory = (_parse_memory(self.ctx._conf.get(
-            "spark.python.worker.memory","512m")
+            "spark.python.worker.memory", "512m")
         agg = Aggregator(createCombiner, mergeValue, mergeCombiners)
 
         def combineLocally(iterator):
-            merger = ExternalMerger(agg, memory, serializer) \
+            merger = ExternalMerger(agg, memory * 0.9, serializer) \
                 if spill else InMemoryMerger(agg)
             merger.mergeValues(iterator)
             return merger.iteritems()
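
The combineByKey change above parses the `spark.python.worker.memory` setting (e.g. "512m") into megabytes and gives ExternalMerger only 90% of that budget, leaving headroom for the rest of the worker process. Below is a minimal sketch of that arithmetic; `parse_memory` is a stand-in helper written for illustration, not PySpark's internal `_parse_memory`.

    # Illustration only: convert a memory string to MB and take the 90%
    # budget handed to the external merger, as in the diff above.
    def parse_memory(s):
        units = {'k': 1.0 / 1024, 'm': 1, 'g': 1024}   # suffix -> MB factor
        return int(float(s[:-1]) * units[s[-1].lower()])

    worker_memory = parse_memory("512m")   # 512 (MB configured per worker)
    merger_budget = worker_memory * 0.9    # 460.8 MB left for merging
    print(worker_memory, merger_budget)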

python/pyspark/shuffle.py

Lines changed: 15 additions & 14 deletions
@@ -216,13 +216,14 @@ def _get_spill_dir(self, n):
         """ Choose one directory for spill by number n """
         return os.path.join(self.localdirs[n % len(self.localdirs)], str(n))
 
-    def next_limit(self):
-        """
-        Return the next memory limit. If the memory is not released
-        after spilling, it will dump the data only when the used memory
-        starts to increase.
-        """
-        return max(self.memory_limit, get_used_memory() * 1.05)
+    def _next_limit(self):
+        #"""
+        #Return the next memory limit. If the memory is not released
+        #after spilling, it will dump the data only when the used memory
+        #starts to increase.
+        #"""
+        #return max(self.memory_limit, get_used_memory() * 1.05)
+        return self.memory_limit
 
     def mergeValues(self, iterator):
         """ Combine the items by creator and combiner """
@@ -237,7 +238,7 @@ def mergeValues(self, iterator):
             c += 1
             if c % batch == 0 and get_used_memory() > self.memory_limit:
                 self._first_spill()
-                self._partitioned_mergeValues(iterator, self.next_limit())
+                self._partitioned_mergeValues(iterator, self._next_limit())
                 break
 
     def _partition(self, key):
@@ -259,7 +260,7 @@ def _partitioned_mergeValues(self, iterator, limit=0):
             c += 1
             if c % batch == 0 and get_used_memory() > limit:
                 self._spill()
-                limit = self.next_limit()
+                limit = self._next_limit()
 
     def mergeCombiners(self, iterator, check=True):
         """ Merge (K,V) pair by mergeCombiner """
@@ -275,7 +276,7 @@ def mergeCombiners(self, iterator, check=True):
             c += 1
             if c % batch == 0 and get_used_memory() > self.memory_limit:
                 self._first_spill()
-                self._partitioned_mergeCombiners(iterator, self.next_limit())
+                self._partitioned_mergeCombiners(iterator, self._next_limit())
                 break
 
     def _partitioned_mergeCombiners(self, iterator, limit=0):
@@ -291,7 +292,7 @@ def _partitioned_mergeCombiners(self, iterator, limit=0):
             c += 1
             if c % self.batch == 0 and get_used_memory() > limit:
                 self._spill()
-                limit = self.next_limit()
+                limit = self._next_limit()
 
     def _first_spill(self):
         """
@@ -349,7 +350,7 @@ def _external_items(self):
         assert not self.data
         if any(self.pdata):
             self._spill()
-        hard_limit = self.next_limit()
+        hard_limit = self._next_limit()
 
         try:
             for i in range(self.partitions):
@@ -406,7 +407,7 @@ def _recursive_merged_items(self, start):
             m = ExternalMerger(self.agg, self.memory_limit, self.serializer,
                                subdirs, self.scale * self.partitions)
             m.pdata = [{} for _ in range(self.partitions)]
-            limit = self.next_limit()
+            limit = self._next_limit()
 
             for j in range(self.spills):
                 path = self._get_spill_dir(j)
@@ -416,7 +417,7 @@ def _recursive_merged_items(self, start):
 
                 if get_used_memory() > limit:
                     m._spill()
-                    limit = self.next_limit()
+                    limit = self._next_limit()
 
             for v in m._external_items():
                 yield v
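
With this commit, `_next_limit()` returns the fixed `memory_limit` instead of ratcheting the threshold up to `max(memory_limit, get_used_memory() * 1.05)`; the old logic is left commented out. The merge methods shown above all follow the same batched spill-check loop. Here is a self-contained sketch of that pattern, assuming hypothetical `used_memory` and `spill` callables; it is not the ExternalMerger code itself.

    # Sketch of the batched spill check used by the merge methods above.
    def merge(items, memory_limit, used_memory, spill, batch=1000):
        data = {}
        limit = memory_limit
        for c, (k, v) in enumerate(items, 1):
            data.setdefault(k, []).append(v)
            # Every `batch` items, compare current usage with the limit.
            if c % batch == 0 and used_memory() > limit:
                spill(data)        # dump in-memory buckets to disk
                data.clear()
                # After this commit the threshold stays at memory_limit;
                # previously it grew to max(memory_limit, used_memory() * 1.05).
                limit = memory_limit
        return data

    # Example call with a dummy memory probe that never triggers a spill.
    result = merge([("a", 1), ("b", 2), ("a", 3)], 512, lambda: 0, lambda d: None)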
