From 6edbd1feaf85640b68c10a39c19d1d97d0732d28 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 17 Jul 2014 01:03:32 -0700 Subject: [PATCH 01/18] Hash based disk spilling aggregation --- python/pyspark/rdd.py | 137 +++++++++++++++++++++++++++++++--- python/pyspark/serializers.py | 27 +++++++ 2 files changed, 154 insertions(+), 10 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 0c35c666805d..e08aea6d7d5d 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -32,10 +32,12 @@ import heapq from random import Random from math import sqrt, log +import platform +import resource from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ - PickleSerializer, pack_long + PickleSerializer, BatchedSerializer, AutoSerializer, pack_long from pyspark.join import python_join, python_left_outer_join, \ python_right_outer_join, python_cogroup from pyspark.statcounter import StatCounter @@ -168,6 +170,123 @@ def _replaceRoot(self, value): self._sink(1) +class Merger(object): + """ + External merger will dump the aggregated data into disks when memory usage is above + the limit, then merge them together. + + >>> combiner = lambda x, y:x+y + >>> merger = Merger(combiner, 10) + >>> N = 10000 + >>> merger.merge(zip(xrange(N), xrange(N)) * 10) + >>> merger.spills + 100 + >>> sum(1 for k,v in merger.iteritems()) + 10000 + """ + + PARTITIONS = 64 + BATCH = 1000 + + def __init__(self, combiner, memory_limit=256, path="/tmp/pyspark", serializer=None): + self.combiner = combiner + self.path = os.path.join(path, str(os.getpid())) + self.memory_limit = memory_limit + self.serializer = serializer or BatchedSerializer(AutoSerializer(), 1024) + self.item_limit = None + self.data = {} + self.pdata = [] + self.spills = 0 + + def used_memory(self): + rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + if platform.system() == 'Linux': + rss >>= 10 + elif platform.system() == 'Darwin': + rss >>= 20 + return rss + + def merge(self, iterator): + iterator = iter(iterator) + d = self.data + comb = self.combiner + c = 0 + for k, v in iterator: + if k in d: + d[k] = comb(d[k], v) + else: + d[k] = v + + if self.item_limit is not None: + continue + + c += 1 + if c % self.BATCH == 0 and self.used_memory() > self.memory_limit: + self.item_limit = c + self._first_spill() + self._partitioned_merge(iterator) + return + + def _partitioned_merge(self, iterator): + comb = self.combiner + c = 0 + for k, v in iterator: + d = self.pdata[hash(k) % self.PARTITIONS] + if k in d: + d[k] = comb(d[k], v) + else: + d[k] = v + c += 1 + if c >= self.item_limit: + self._spill() + c = 0 + + def _first_spill(self): + path = os.path.join(self.path, str(self.spills)) + if not os.path.exists(path): + os.makedirs(path) + streams = [open(os.path.join(path, str(i)), 'w') + for i in range(self.PARTITIONS)] + for k, v in self.data.iteritems(): + h = hash(k) % self.PARTITIONS + self.serializer.dump_stream([(k, v)], streams[h]) + for s in streams: + s.close() + self.data.clear() + self.pdata = [{} for i in range(self.PARTITIONS)] + self.spills += 1 + + def _spill(self): + path = os.path.join(self.path, str(self.spills)) + if not os.path.exists(path): + os.makedirs(path) + for i in range(self.PARTITIONS): + p = os.path.join(path, str(i)) + with open(p, 'w') as f: + self.serializer.dump_stream(self.pdata[i].iteritems(), f) + self.pdata[i].clear() + self.spills += 1 + + def iteritems(self): + if not self.pdata and not self.spills: + return 
self.data.iteritems() + return self._external_items() + + def _external_items(self): + for i in range(self.PARTITIONS): + self.data = self.pdata[i] + for j in range(self.spills): + p = os.path.join(self.path, str(j), str(i)) + self.merge(self.serializer.load_stream(open(p))) + os.remove(p) + for k,v in self.data.iteritems(): + yield k,v + self.data.clear() + for i in range(self.spills): + os.rmdir(os.path.join(self.path, str(i))) + os.rmdir(self.path) + + class RDD(object): """ @@ -1247,15 +1366,12 @@ def combineLocally(iterator): return combiners.iteritems() locally_combined = self.mapPartitions(combineLocally) shuffled = locally_combined.partitionBy(numPartitions) - + + executorMemory = self.ctx._jsc.sc().executorMemory() def _mergeCombiners(iterator): - combiners = {} - for (k, v) in iterator: - if k not in combiners: - combiners[k] = v - else: - combiners[k] = mergeCombiners(combiners[k], v) - return combiners.iteritems() + merger = Merger(mergeCombiners, executorMemory * 0.7) + merger.merge(iterator) + return merger.iteritems() return shuffled.mapPartitions(_mergeCombiners) def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None): @@ -1314,7 +1430,8 @@ def mergeValue(xs, x): return xs def mergeCombiners(a, b): - return a + b + a.extend(b) + return a return self.combineByKey(createCombiner, mergeValue, mergeCombiners, numPartitions).mapValues(lambda x: ResultIterable(x)) diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index b253807974a2..2144ea127b16 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -297,6 +297,33 @@ class MarshalSerializer(FramedSerializer): loads = marshal.loads +class AutoSerializer(FramedSerializer): + """ + Choose marshal or cPickle as serialization protocol autumatically + """ + def __init__(self): + FramedSerializer.__init__(self) + self._type = None + + def dumps(self, obj): + try: + if self._type is not None: + raise TypeError("fallback") + return 'M' + marshal.dumps(obj) + except Exception: + self._type = 'P' + return 'P' + cPickle.dumps(obj, -1) + + def loads(self, stream): + _type = stream[0] + if _type == 'M': + return marshal.loads(stream[1:]) + elif _type == 'P': + return cPickle.loads(stream[1:]) + else: + raise ValueError("invalid sevialization type: %s" % _type) + + class UTF8Deserializer(Serializer): """ Deserializes streams written by String.getBytes. 
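The AutoSerializer added in the serializers.py hunk above tags every frame with a one-byte marker, 'M' for marshal or 'P' for cPickle, and once any object fails to marshal it falls back to cPickle for the rest of the stream. A minimal standalone sketch of the same fallback logic (Python 2, as in the patch; the names auto_dumps and auto_loads are illustrative, not part of the patch):

    import marshal
    import cPickle

    _use_pickle = [False]  # flips to True after the first marshal failure

    def auto_dumps(obj):
        # Prefer marshal for simple built-in types; fall back to cPickle
        # permanently once an object (e.g. a user-defined class instance)
        # fails to marshal.
        if not _use_pickle[0]:
            try:
                return 'M' + marshal.dumps(obj)
            except Exception:
                _use_pickle[0] = True
        return 'P' + cPickle.dumps(obj, -1)

    def auto_loads(data):
        # Dispatch on the marker byte written by auto_dumps.
        if data[0] == 'M':
            return marshal.loads(data[1:])
        elif data[0] == 'P':
            return cPickle.loads(data[1:])
        else:
            raise ValueError("invalid serialization type: %s" % data[0])

Round-tripping tuples of primitives stays on the fast marshal path, while the first unmarshalable object (such as an instance of a user-defined class) switches the stream to cPickle, mirroring the behaviour of AutoSerializer.dumps/loads above.
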
From e9a40f6325a36cabde3e3b692c73faeb1593ad18 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 18 Jul 2014 14:59:48 -0700 Subject: [PATCH 02/18] recursive merger --- python/pyspark/rdd.py | 125 +-------------------- python/pyspark/serializers.py | 14 +-- python/pyspark/shuffle.py | 202 ++++++++++++++++++++++++++++++++++ python/pyspark/tests.py | 29 +++++ 4 files changed, 242 insertions(+), 128 deletions(-) create mode 100644 python/pyspark/shuffle.py diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index e08aea6d7d5d..ebad1a53c222 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -32,18 +32,17 @@ import heapq from random import Random from math import sqrt, log -import platform -import resource from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \ BatchedSerializer, CloudPickleSerializer, PairDeserializer, \ - PickleSerializer, BatchedSerializer, AutoSerializer, pack_long + PickleSerializer, pack_long from pyspark.join import python_join, python_left_outer_join, \ python_right_outer_join, python_cogroup from pyspark.statcounter import StatCounter from pyspark.rddsampler import RDDSampler from pyspark.storagelevel import StorageLevel from pyspark.resultiterable import ResultIterable +from pyspark.shuffle import Merger from py4j.java_collections import ListConverter, MapConverter @@ -170,123 +169,6 @@ def _replaceRoot(self, value): self._sink(1) -class Merger(object): - """ - External merger will dump the aggregated data into disks when memory usage is above - the limit, then merge them together. - - >>> combiner = lambda x, y:x+y - >>> merger = Merger(combiner, 10) - >>> N = 10000 - >>> merger.merge(zip(xrange(N), xrange(N)) * 10) - >>> merger.spills - 100 - >>> sum(1 for k,v in merger.iteritems()) - 10000 - """ - - PARTITIONS = 64 - BATCH = 1000 - - def __init__(self, combiner, memory_limit=256, path="/tmp/pyspark", serializer=None): - self.combiner = combiner - self.path = os.path.join(path, str(os.getpid())) - self.memory_limit = memory_limit - self.serializer = serializer or BatchedSerializer(AutoSerializer(), 1024) - self.item_limit = None - self.data = {} - self.pdata = [] - self.spills = 0 - - def used_memory(self): - rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss - if platform.system() == 'Linux': - rss >>= 10 - elif platform.system() == 'Darwin': - rss >>= 20 - return rss - - def merge(self, iterator): - iterator = iter(iterator) - d = self.data - comb = self.combiner - c = 0 - for k, v in iterator: - if k in d: - d[k] = comb(d[k], v) - else: - d[k] = v - - if self.item_limit is not None: - continue - - c += 1 - if c % self.BATCH == 0 and self.used_memory() > self.memory_limit: - self.item_limit = c - self._first_spill() - self._partitioned_merge(iterator) - return - - def _partitioned_merge(self, iterator): - comb = self.combiner - c = 0 - for k, v in iterator: - d = self.pdata[hash(k) % self.PARTITIONS] - if k in d: - d[k] = comb(d[k], v) - else: - d[k] = v - c += 1 - if c >= self.item_limit: - self._spill() - c = 0 - - def _first_spill(self): - path = os.path.join(self.path, str(self.spills)) - if not os.path.exists(path): - os.makedirs(path) - streams = [open(os.path.join(path, str(i)), 'w') - for i in range(self.PARTITIONS)] - for k, v in self.data.iteritems(): - h = hash(k) % self.PARTITIONS - self.serializer.dump_stream([(k, v)], streams[h]) - for s in streams: - s.close() - self.data.clear() - self.pdata = [{} for i in range(self.PARTITIONS)] - self.spills += 1 - - def _spill(self): - path = os.path.join(self.path, 
str(self.spills)) - if not os.path.exists(path): - os.makedirs(path) - for i in range(self.PARTITIONS): - p = os.path.join(path, str(i)) - with open(p, 'w') as f: - self.serializer.dump_stream(self.pdata[i].iteritems(), f) - self.pdata[i].clear() - self.spills += 1 - - def iteritems(self): - if not self.pdata and not self.spills: - return self.data.iteritems() - return self._external_items() - - def _external_items(self): - for i in range(self.PARTITIONS): - self.data = self.pdata[i] - for j in range(self.spills): - p = os.path.join(self.path, str(j), str(i)) - self.merge(self.serializer.load_stream(open(p))) - os.remove(p) - for k,v in self.data.iteritems(): - yield k,v - self.data.clear() - for i in range(self.spills): - os.rmdir(os.path.join(self.path, str(i))) - os.rmdir(self.path) - - class RDD(object): """ @@ -1369,7 +1251,8 @@ def combineLocally(iterator): executorMemory = self.ctx._jsc.sc().executorMemory() def _mergeCombiners(iterator): - merger = Merger(mergeCombiners, executorMemory * 0.7) + # TODO: workdir and serializer + merger = Merger(mergeCombiners, executorMemory) merger.merge(iterator) return merger.iteritems() return shuffled.mapPartitions(_mergeCombiners) diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index 2144ea127b16..2d5d023fcd94 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -194,7 +194,7 @@ def load_stream(self, stream): return chain.from_iterable(self._load_stream_without_unbatching(stream)) def _load_stream_without_unbatching(self, stream): - return self.serializer.load_stream(stream) + return self.serializer.load_stream(stream) def __eq__(self, other): return isinstance(other, BatchedSerializer) and \ @@ -306,20 +306,20 @@ def __init__(self): self._type = None def dumps(self, obj): + if self._type is not None: + return 'P' + cPickle.dumps(obj, -1) try: - if self._type is not None: - raise TypeError("fallback") return 'M' + marshal.dumps(obj) except Exception: self._type = 'P' return 'P' + cPickle.dumps(obj, -1) - def loads(self, stream): - _type = stream[0] + def loads(self, obj): + _type = obj[0] if _type == 'M': - return marshal.loads(stream[1:]) + return marshal.loads(obj[1:]) elif _type == 'P': - return cPickle.loads(stream[1:]) + return cPickle.loads(obj[1:]) else: raise ValueError("invalid sevialization type: %s" % _type) diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py new file mode 100644 index 000000000000..a4458919b64c --- /dev/null +++ b/python/pyspark/shuffle.py @@ -0,0 +1,202 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import os +import sys +import platform +import shutil +import warnings + +from pyspark.serializers import BatchedSerializer, AutoSerializer + +try: + import psutil + def get_used_memory(): + self = psutil.Process(os.getpid()) + return self.memory_info().rss >> 20 + +except ImportError: + def get_used_memory(): + if platform.system() == 'Linux': + for line in open('/proc/self/status'): + if line.startswith('VmRSS:'): + return int(line.split()[1]) >> 10 + else: + warnings.warn("please install psutil to get accurate memory usage") + if platform.system() == "Darwin": + import resource + return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20 + # TODO: support windows + return 0 + + +class Merger(object): + + """ + External merger will dump the aggregated data into disks when memory usage + is above the limit, then merge them together. + + >>> combiner = lambda x, y:x+y + >>> merger = Merger(combiner, 10) + >>> N = 10000 + >>> merger.merge(zip(xrange(N), xrange(N)) * 10) + >>> assert merger.spills > 0 + >>> sum(v for k,v in merger.iteritems()) + 499950000 + >>> + """ + + PARTITIONS = 64 + BATCH = 10000 + + def __init__(self, combiner, memory_limit=512, path="/tmp/pysparki/merge", + serializer=None, batch_size=1024, scale=1): + self.combiner = combiner + self.path = os.path.join(path, str(os.getpid())) + self.memory_limit = memory_limit + self.serializer = serializer or BatchedSerializer(AutoSerializer(), batch_size) + self.data = {} + self.pdata = [] + self.spills = 0 + self.scale = scale + + @property + def used_memory(self): + return get_used_memory() + + @property + def next_limit(self): + return max(self.memory_limit, self.used_memory * 1.05) + + def merge(self, iterator, check=True): + """ merge (K,V) pair by combiner """ + iterator = iter(iterator) + # speedup attribute lookup + d, comb, batch = self.data, self.combiner, self.BATCH + c = 0 + for k, v in iterator: + d[k] = comb(d[k], v) if k in d else v + if not check: + continue + + c += 1 + if c % self.BATCH == 0 and self.used_memory > self.memory_limit: + self._first_spill() + self._partitioned_merge(iterator, self.next_limit) + break + + def _hash(self, key): + return (hash(key) / self.scale) % self.PARTITIONS + + def _partitioned_merge(self, iterator, limit): + comb, pdata, hfun = self.combiner, self.pdata, self._hash + c = 0 + for k, v in iterator: + d = pdata[hfun(k)] + d[k] = comb(d[k], v) if k in d else v + if not limit: + continue + c += 1 + if c % self.BATCH == 0 and self.used_memory > limit: + self._spill() + limit = self.next_limit + + def _first_spill(self): + path = os.path.join(self.path, str(self.spills)) + if not os.path.exists(path): + os.makedirs(path) + streams = [open(os.path.join(path, str(i)), 'w') + for i in range(self.PARTITIONS)] + for k, v in self.data.iteritems(): + h = self._hash(k) + self.serializer.dump_stream([(k, v)], streams[h]) + for s in streams: + s.close() + self.data.clear() + self.pdata = [{} for i in range(self.PARTITIONS)] + self.spills += 1 + + def _spill(self): + path = os.path.join(self.path, str(self.spills)) + if not os.path.exists(path): + os.makedirs(path) + for i in range(self.PARTITIONS): + p = os.path.join(path, str(i)) + with open(p, 'w') as f: + self.serializer.dump_stream(self.pdata[i].iteritems(), f) + self.pdata[i].clear() + self.spills += 1 + + def iteritems(self): + """ iterator of all merged (K,V) pairs """ + if not self.pdata and not self.spills: + return self.data.iteritems() + return self._external_items() + + def _external_items(self): + assert not self.data + 
if any(self.pdata): + self._spill() + hard_limit = self.next_limit + + for i in range(self.PARTITIONS): + self.data = {} + for j in range(self.spills): + p = os.path.join(self.path, str(j), str(i)) + self.merge(self.serializer.load_stream(open(p)), check=False) + + if j > 0 and self.used_memory > hard_limit and j < self.spills - 1: + self.data.clear() # will read from disk again + for v in self._recursive_merged_items(i): + yield v + return + + for v in self.data.iteritems(): + yield v + self.data.clear() + + shutil.rmtree(self.path, True) + + def _recursive_merged_items(self, start): + assert not self.data + assert self.spills > 0 + if any(self.pdata): + self._spill() + + for i in range(start, self.PARTITIONS): + m = Merger(self.combiner, self.memory_limit, + os.path.join(self.path, 'merge', str(i)), + self.serializer, scale=self.scale * self.PARTITIONS) + m.pdata = [{} for x in range(self.PARTITIONS)] + limit = self.next_limit + + for j in range(self.spills): + p = os.path.join(self.path, str(j), str(i)) + m._partitioned_merge(self.serializer.load_stream(open(p)), 0) + if m.used_memory > limit: + m._spill() + limit = self.next_limit + + for v in m._external_items(): + yield v + + shutil.rmtree(self.path, True) + + +if __name__ == '__main__': + import doctest + doctest.testmod() diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index c15bb457759e..a5e157dd3f0e 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -34,6 +34,7 @@ from pyspark.context import SparkContext from pyspark.files import SparkFiles from pyspark.serializers import read_int +from pyspark.shuffle import Merger _have_scipy = False try: @@ -47,6 +48,34 @@ SPARK_HOME = os.environ["SPARK_HOME"] +class TestMerger(unittest.TestCase): + + def setUp(self): + self.N = 1<<18 + self.l = [i for i in xrange(self.N)] + self.data = zip(self.l, self.l) + Merger.PARTITIONS = 8 + Merger.BATCH = 1<<14 + + def test_small_dataset(self): + m = Merger(lambda x,y: x+y, 1000) + m.merge(self.data) + self.assertEqual(m.spills, 0) + self.assertEqual(sum(v for k,v in m.iteritems()), sum(xrange(self.N))) + + def test_medium_dataset(self): + m = Merger(lambda x,y: x+y, 10) + m.merge(self.data * 3) + self.assertTrue(m.spills >= 1) + self.assertEqual(sum(v for k,v in m.iteritems()), sum(xrange(self.N)) * 3) + + def test_huge_dataset(self): + m = Merger(lambda x,y: x + y, 10) + m.merge(map(lambda (k,v): (k, [str(v)]), self.data) * 10) + self.assertTrue(m.spills >= 1) + self.assertEqual(sum(len(v) for k,v in m._recursive_merged_items(0)), self.N * 10) + + class PySparkTestCase(unittest.TestCase): def setUp(self): From 286aafff37e70304ca5fd7cb014bea8fe4b842a7 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 21 Jul 2014 11:43:30 -0700 Subject: [PATCH 03/18] let spilled aggregation in Python configurable add spark.python.worker.memory for memory used by Python worker. Default is 512M. --- docs/configuration.md | 9 +++++++ python/pyspark/rdd.py | 24 +++++++++++++++--- python/pyspark/shuffle.py | 52 ++++++++++++++++++++++++++++++--------- python/pyspark/tests.py | 17 ++++++++----- 4 files changed, 80 insertions(+), 22 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index a70007c16544..d36c12ee5763 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -195,6 +195,15 @@ Apart from these, the following properties are also available, and may be useful Spark's dependencies and user dependencies. It is currently an experimental feature. 
+ + spark.python.worker.memory + 512m + + Amount of memory to use per python worker process during aggregation, in the same + format as JVM memory strings (e.g. 512m, 2g). If the memory + used during aggregation go above this amount, it will spill the data into disks. + + #### Shuffle Behavior diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index ebad1a53c222..59ef474045dc 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -42,7 +42,7 @@ from pyspark.rddsampler import RDDSampler from pyspark.storagelevel import StorageLevel from pyspark.resultiterable import ResultIterable -from pyspark.shuffle import Merger +from pyspark.shuffle import MapMerger, ExternalHashMapMerger from py4j.java_collections import ListConverter, MapConverter @@ -169,6 +169,18 @@ def _replaceRoot(self, value): self._sink(1) +def _parse_memory(s): + """ + >>> _parse_memory("256m") + 256 + >>> _parse_memory("2g") + 2048 + """ + units = {'g': 1024, 'm': 1, 't': 1<<20, 'k':1.0/1024} + if s[-1] not in units: + raise ValueError("invalid format: " + s) + return int(float(s[:-1]) * units[s[-1].lower()]) + class RDD(object): """ @@ -1249,10 +1261,14 @@ def combineLocally(iterator): locally_combined = self.mapPartitions(combineLocally) shuffled = locally_combined.partitionBy(numPartitions) - executorMemory = self.ctx._jsc.sc().executorMemory() + serializer = self.ctx.serializer + spill = ((self.ctx._conf.get("spark.shuffle.spill") or 'True').lower() + in ('true', '1', 'yes')) + memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory") or "512m") def _mergeCombiners(iterator): - # TODO: workdir and serializer - merger = Merger(mergeCombiners, executorMemory) + # TODO: workdir + merger = ExternalHashMapMerger(mergeCombiners, memory, serializer=serializer)\ + if spill else MapMerger(mergeCombiners) merger.merge(iterator) return merger.iteritems() return shuffled.mapPartitions(_mergeCombiners) diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index a4458919b64c..a956edce0ea3 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -45,13 +45,41 @@ def get_used_memory(): class Merger(object): + """ + merge shuffled data together by combinator + """ + def merge(self, iterator): + raise NotImplementedError + + def iteritems(self): + raise NotImplementedError + + +class MapMerger(Merger): + """ + In memory merger based on map + """ + def __init__(self, combiner): + self.combiner = combiner + self.data = {} + + def merge(self, iterator): + d, comb = self.data, self.combiner + for k,v in iter(iterator): + d[k] = comb(d[k], v) if k in d else v + + def iteritems(self): + return self.data.iteritems() + + +class ExternalHashMapMerger(Merger): """ External merger will dump the aggregated data into disks when memory usage is above the limit, then merge them together. 
>>> combiner = lambda x, y:x+y - >>> merger = Merger(combiner, 10) + >>> merger = ExternalHashMapMerger(combiner, 10) >>> N = 10000 >>> merger.merge(zip(xrange(N), xrange(N)) * 10) >>> assert merger.spills > 0 @@ -63,16 +91,16 @@ class Merger(object): PARTITIONS = 64 BATCH = 10000 - def __init__(self, combiner, memory_limit=512, path="/tmp/pysparki/merge", - serializer=None, batch_size=1024, scale=1): + def __init__(self, combiner, memory_limit=512, path="/tmp/pyspark/merge", + serializer=None, scale=1): self.combiner = combiner - self.path = os.path.join(path, str(os.getpid())) self.memory_limit = memory_limit - self.serializer = serializer or BatchedSerializer(AutoSerializer(), batch_size) + self.path = os.path.join(path, str(os.getpid())) + self.serializer = serializer or BatchedSerializer(AutoSerializer(), 1024) + self.scale = scale self.data = {} self.pdata = [] self.spills = 0 - self.scale = scale @property def used_memory(self): @@ -94,7 +122,7 @@ def merge(self, iterator, check=True): continue c += 1 - if c % self.BATCH == 0 and self.used_memory > self.memory_limit: + if c % batch == 0 and self.used_memory > self.memory_limit: self._first_spill() self._partitioned_merge(iterator, self.next_limit) break @@ -158,7 +186,7 @@ def _external_items(self): for j in range(self.spills): p = os.path.join(self.path, str(j), str(i)) self.merge(self.serializer.load_stream(open(p)), check=False) - + if j > 0 and self.used_memory > hard_limit and j < self.spills - 1: self.data.clear() # will read from disk again for v in self._recursive_merged_items(i): @@ -178,12 +206,12 @@ def _recursive_merged_items(self, start): self._spill() for i in range(start, self.PARTITIONS): - m = Merger(self.combiner, self.memory_limit, + m = ExternalHashMapMerger(self.combiner, self.memory_limit, os.path.join(self.path, 'merge', str(i)), self.serializer, scale=self.scale * self.PARTITIONS) - m.pdata = [{} for x in range(self.PARTITIONS)] + m.pdata = [{} for _ in range(self.PARTITIONS)] limit = self.next_limit - + for j in range(self.spills): p = os.path.join(self.path, str(j), str(i)) m._partitioned_merge(self.serializer.load_stream(open(p)), 0) @@ -193,7 +221,7 @@ def _recursive_merged_items(self, start): for v in m._external_items(): yield v - + shutil.rmtree(self.path, True) diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index a5e157dd3f0e..c1a0997256c3 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -34,7 +34,7 @@ from pyspark.context import SparkContext from pyspark.files import SparkFiles from pyspark.serializers import read_int -from pyspark.shuffle import Merger +from pyspark.shuffle import MapMerger, ExternalHashMapMerger _have_scipy = False try: @@ -54,23 +54,28 @@ def setUp(self): self.N = 1<<18 self.l = [i for i in xrange(self.N)] self.data = zip(self.l, self.l) - Merger.PARTITIONS = 8 - Merger.BATCH = 1<<14 + ExternalHashMapMerger.PARTITIONS = 8 + ExternalHashMapMerger.BATCH = 1<<14 + + def test_in_memory(self): + m = MapMerger(lambda x,y: x+y) + m.merge(self.data) + self.assertEqual(sum(v for k,v in m.iteritems()), sum(xrange(self.N))) def test_small_dataset(self): - m = Merger(lambda x,y: x+y, 1000) + m = ExternalHashMapMerger(lambda x,y: x+y, 1000) m.merge(self.data) self.assertEqual(m.spills, 0) self.assertEqual(sum(v for k,v in m.iteritems()), sum(xrange(self.N))) def test_medium_dataset(self): - m = Merger(lambda x,y: x+y, 10) + m = ExternalHashMapMerger(lambda x,y: x+y, 10) m.merge(self.data * 3) self.assertTrue(m.spills >= 1) self.assertEqual(sum(v for k,v in 
m.iteritems()), sum(xrange(self.N)) * 3) def test_huge_dataset(self): - m = Merger(lambda x,y: x + y, 10) + m = ExternalHashMapMerger(lambda x,y: x + y, 10) m.merge(map(lambda (k,v): (k, [str(v)]), self.data) * 10) self.assertTrue(m.spills >= 1) self.assertEqual(sum(len(v) for k,v in m._recursive_merged_items(0)), self.N * 10) From 57ee7ef475946142802b7b53cf0049023a0fb529 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 21 Jul 2014 12:10:25 -0700 Subject: [PATCH 04/18] update docs --- python/pyspark/rdd.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 59ef474045dc..3b52fbb91d52 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -171,6 +171,8 @@ def _replaceRoot(self, value): def _parse_memory(s): """ + It returns a number in MB + >>> _parse_memory("256m") 256 >>> _parse_memory("2g") From 24cec6a57ea5a1c79930fa69c6ec63e751b07724 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 21 Jul 2014 13:03:04 -0700 Subject: [PATCH 05/18] get local directory by SPARK_LOCAL_DIR support multiple local directories --- .../apache/spark/api/python/PythonRDD.scala | 4 +- python/pyspark/rdd.py | 3 +- python/pyspark/shuffle.py | 77 ++++++++++++------- python/pyspark/tests.py | 1 + 4 files changed, 55 insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 462e09466bfa..aa80592abb82 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -57,7 +57,9 @@ private[spark] class PythonRDD[T: ClassTag]( override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { val startTime = System.currentTimeMillis val env = SparkEnv.get - val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap) + val localdir = env.conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")) + val worker: Socket = env.createPythonWorker(pythonExec, + envVars.toMap + ("SPARK_LOCAL_DIR" -> localdir)) // Start a thread to feed the process input from our parent's iterator val writerThread = new WriterThread(env, worker, split, context) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 3b52fbb91d52..70143dafd612 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1268,8 +1268,7 @@ def combineLocally(iterator): in ('true', '1', 'yes')) memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory") or "512m") def _mergeCombiners(iterator): - # TODO: workdir - merger = ExternalHashMapMerger(mergeCombiners, memory, serializer=serializer)\ + merger = ExternalHashMapMerger(mergeCombiners, memory, serializer)\ if spill else MapMerger(mergeCombiners) merger.merge(iterator) return merger.iteritems() diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index a956edce0ea3..66def2890bf0 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -91,17 +91,35 @@ class ExternalHashMapMerger(Merger): PARTITIONS = 64 BATCH = 10000 - def __init__(self, combiner, memory_limit=512, path="/tmp/pyspark/merge", - serializer=None, scale=1): + def __init__(self, combiner, memory_limit=512, serializer=None, + localdirs=None, scale=1): self.combiner = combiner self.memory_limit = memory_limit - self.path = os.path.join(path, str(os.getpid())) self.serializer = serializer or BatchedSerializer(AutoSerializer(), 1024) + self.localdirs = localdirs or self._get_dirs() 
self.scale = scale self.data = {} self.pdata = [] self.spills = 0 + def _get_dirs(self): + path = os.environ.get("SPARK_LOCAL_DIR", "/tmp/spark") + dirs = path.split(",") + localdirs = [] + for d in dirs: + d = os.path.join(d, "merge", str(os.getpid())) + try: + os.makedirs(d) + localdirs.append(d) + except IOError: + pass + if not localdirs: + raise IOError("no writable directories: " + path) + return localdirs + + def _get_spill_dir(self, n): + return os.path.join(self.localdirs[n % len(self.localdirs)], str(n)) + @property def used_memory(self): return get_used_memory() @@ -144,7 +162,7 @@ def _partitioned_merge(self, iterator, limit): limit = self.next_limit def _first_spill(self): - path = os.path.join(self.path, str(self.spills)) + path = self._get_spill_dir(self.spills) if not os.path.exists(path): os.makedirs(path) streams = [open(os.path.join(path, str(i)), 'w') @@ -159,7 +177,7 @@ def _first_spill(self): self.spills += 1 def _spill(self): - path = os.path.join(self.path, str(self.spills)) + path = self._get_spill_dir(self.spills) if not os.path.exists(path): os.makedirs(path) for i in range(self.PARTITIONS): @@ -181,23 +199,29 @@ def _external_items(self): self._spill() hard_limit = self.next_limit - for i in range(self.PARTITIONS): - self.data = {} - for j in range(self.spills): - p = os.path.join(self.path, str(j), str(i)) - self.merge(self.serializer.load_stream(open(p)), check=False) - - if j > 0 and self.used_memory > hard_limit and j < self.spills - 1: - self.data.clear() # will read from disk again - for v in self._recursive_merged_items(i): - yield v - return - - for v in self.data.iteritems(): - yield v - self.data.clear() - - shutil.rmtree(self.path, True) + try: + for i in range(self.PARTITIONS): + self.data = {} + for j in range(self.spills): + path = self._get_spill_dir(j) + p = os.path.join(path, str(i)) + self.merge(self.serializer.load_stream(open(p)), check=False) + + if j > 0 and self.used_memory > hard_limit and j < self.spills - 1: + self.data.clear() # will read from disk again + for v in self._recursive_merged_items(i): + yield v + return + + for v in self.data.iteritems(): + yield v + self.data.clear() + finally: + self._cleanup() + + def _cleanup(self): + for d in self.localdirs: + shutil.rmtree(d, True) def _recursive_merged_items(self, start): assert not self.data @@ -206,14 +230,15 @@ def _recursive_merged_items(self, start): self._spill() for i in range(start, self.PARTITIONS): + subdirs = [os.path.join(d, 'merge', str(i)) for d in self.localdirs] m = ExternalHashMapMerger(self.combiner, self.memory_limit, - os.path.join(self.path, 'merge', str(i)), - self.serializer, scale=self.scale * self.PARTITIONS) + self.serializer, subdirs, self.scale * self.PARTITIONS) m.pdata = [{} for _ in range(self.PARTITIONS)] limit = self.next_limit for j in range(self.spills): - p = os.path.join(self.path, str(j), str(i)) + path = self._get_spill_dir(j) + p = os.path.join(path, str(i)) m._partitioned_merge(self.serializer.load_stream(open(p)), 0) if m.used_memory > limit: m._spill() @@ -222,8 +247,6 @@ def _recursive_merged_items(self, start): for v in m._external_items(): yield v - shutil.rmtree(self.path, True) - if __name__ == '__main__': import doctest diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index c1a0997256c3..dad9ea8dd75e 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -79,6 +79,7 @@ def test_huge_dataset(self): m.merge(map(lambda (k,v): (k, [str(v)]), self.data) * 10) self.assertTrue(m.spills >= 1) 
self.assertEqual(sum(len(v) for k,v in m._recursive_merged_items(0)), self.N * 10) + m._cleanup() class PySparkTestCase(unittest.TestCase): From e78a0a04d8ee3cb8a8cde4b033191af6177bc238 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 21 Jul 2014 14:48:16 -0700 Subject: [PATCH 06/18] fix style --- python/pyspark/shuffle.py | 25 +++++++++++++++---------- 1 file changed, 15 insertions(+), 10 deletions(-) diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index 66def2890bf0..705ceba34ee7 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -25,11 +25,13 @@ try: import psutil + def get_used_memory(): self = psutil.Process(os.getpid()) return self.memory_info().rss >> 20 except ImportError: + def get_used_memory(): if platform.system() == 'Linux': for line in open('/proc/self/status'): @@ -48,6 +50,7 @@ class Merger(object): """ merge shuffled data together by combinator """ + def merge(self, iterator): raise NotImplementedError @@ -59,13 +62,14 @@ class MapMerger(Merger): """ In memory merger based on map """ + def __init__(self, combiner): self.combiner = combiner self.data = {} def merge(self, iterator): d, comb = self.data, self.combiner - for k,v in iter(iterator): + for k, v in iter(iterator): d[k] = comb(d[k], v) if k in d else v def iteritems(self): @@ -75,7 +79,7 @@ def iteritems(self): class ExternalHashMapMerger(Merger): """ - External merger will dump the aggregated data into disks when memory usage + External merger will dump the aggregated data into disks when memory usage is above the limit, then merge them together. >>> combiner = lambda x, y:x+y @@ -85,7 +89,6 @@ class ExternalHashMapMerger(Merger): >>> assert merger.spills > 0 >>> sum(v for k,v in merger.iteritems()) 499950000 - >>> """ PARTITIONS = 64 @@ -95,7 +98,8 @@ def __init__(self, combiner, memory_limit=512, serializer=None, localdirs=None, scale=1): self.combiner = combiner self.memory_limit = memory_limit - self.serializer = serializer or BatchedSerializer(AutoSerializer(), 1024) + self.serializer = serializer or\ + BatchedSerializer(AutoSerializer(), 1024) self.localdirs = localdirs or self._get_dirs() self.scale = scale self.data = {} @@ -182,7 +186,7 @@ def _spill(self): os.makedirs(path) for i in range(self.PARTITIONS): p = os.path.join(path, str(i)) - with open(p, 'w') as f: + with open(p, "w") as f: self.serializer.dump_stream(self.pdata[i].iteritems(), f) self.pdata[i].clear() self.spills += 1 @@ -205,9 +209,9 @@ def _external_items(self): for j in range(self.spills): path = self._get_spill_dir(j) p = os.path.join(path, str(i)) - self.merge(self.serializer.load_stream(open(p)), check=False) + self.merge(self.serializer.load_stream(open(p)), False) - if j > 0 and self.used_memory > hard_limit and j < self.spills - 1: + if self.used_memory > hard_limit and j < self.spills - 1: self.data.clear() # will read from disk again for v in self._recursive_merged_items(i): yield v @@ -230,9 +234,10 @@ def _recursive_merged_items(self, start): self._spill() for i in range(start, self.PARTITIONS): - subdirs = [os.path.join(d, 'merge', str(i)) for d in self.localdirs] + subdirs = [os.path.join(d, "merge", str(i)) + for d in self.localdirs] m = ExternalHashMapMerger(self.combiner, self.memory_limit, - self.serializer, subdirs, self.scale * self.PARTITIONS) + self.serializer, subdirs, self.scale * self.PARTITIONS) m.pdata = [{} for _ in range(self.PARTITIONS)] limit = self.next_limit @@ -248,6 +253,6 @@ def _recursive_merged_items(self, start): yield v -if __name__ == '__main__': +if 
__name__ == "__main__": import doctest doctest.testmod() From 36525838b2578a616fcb6cc7f47552ce3515b134 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 22 Jul 2014 00:49:11 -0700 Subject: [PATCH 07/18] address comments fix code style and add docs and comments use ExternalMerger for map-side aggregation check memory usage during partitionBy() --- .../apache/spark/api/python/PythonRDD.scala | 3 +- .../spark/storage/DiskBlockManager.scala | 2 +- docs/configuration.md | 2 +- python/pyspark/rdd.py | 48 ++-- python/pyspark/shuffle.py | 207 +++++++++++++----- python/pyspark/tests.py | 56 +++-- 6 files changed, 226 insertions(+), 92 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index aa80592abb82..d6b0988641a9 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -57,7 +57,8 @@ private[spark] class PythonRDD[T: ClassTag]( override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { val startTime = System.currentTimeMillis val env = SparkEnv.get - val localdir = env.conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")) + val localdir = env.blockManager.diskBlockManager.localDirs.map( + f => f.getPath()).mkString(",") val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap + ("SPARK_LOCAL_DIR" -> localdir)) diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala index 673fc19c060a..2e7ed7538e6e 100644 --- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala @@ -43,7 +43,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD /* Create one local directory for each path mentioned in spark.local.dir; then, inside this * directory, create multiple subdirectories that we will hash files into, in order to avoid * having really large inodes at the top level. */ - private val localDirs: Array[File] = createLocalDirs() + val localDirs: Array[File] = createLocalDirs() if (localDirs.isEmpty) { logError("Failed to create any local dir.") System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR) diff --git a/docs/configuration.md b/docs/configuration.md index d36c12ee5763..110a5246a24e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -201,7 +201,7 @@ Apart from these, the following properties are also available, and may be useful Amount of memory to use per python worker process during aggregation, in the same format as JVM memory strings (e.g. 512m, 2g). If the memory - used during aggregation go above this amount, it will spill the data into disks. + used during aggregation goes above this amount, it will spill the data into disks. 
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 70143dafd612..2f6280deb8da 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -42,7 +42,8 @@ from pyspark.rddsampler import RDDSampler from pyspark.storagelevel import StorageLevel from pyspark.resultiterable import ResultIterable -from pyspark.shuffle import MapMerger, ExternalHashMapMerger +from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \ + get_used_memory from py4j.java_collections import ListConverter, MapConverter @@ -171,18 +172,20 @@ def _replaceRoot(self, value): def _parse_memory(s): """ - It returns a number in MB + Parse a memory string in the format supported by Java (e.g. 1g, 200m) and + return the value in MB >>> _parse_memory("256m") 256 >>> _parse_memory("2g") 2048 """ - units = {'g': 1024, 'm': 1, 't': 1<<20, 'k':1.0/1024} + units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024} if s[-1] not in units: raise ValueError("invalid format: " + s) return int(float(s[:-1]) * units[s[-1].lower()]) + class RDD(object): """ @@ -1198,15 +1201,25 @@ def partitionBy(self, numPartitions, partitionFunc=None): # to Java. Each object is a (splitNumber, [objects]) pair. outputSerializer = self.ctx._unbatched_serializer + limit = _parse_memory(self.ctx._conf.get("spark.python.worker.memory") + or "512m") def add_shuffle_key(split, iterator): buckets = defaultdict(list) - + c, batch = 0, 1000 for (k, v) in iterator: buckets[partitionFunc(k) % numPartitions].append((k, v)) + c += 1 + if c % batch == 0 and get_used_memory() > limit: + for split in buckets.keys(): + yield pack_long(split) + yield outputSerializer.dumps(buckets[split]) + del buckets[split] + for (split, items) in buckets.iteritems(): yield pack_long(split) yield outputSerializer.dumps(items) + keyed = PipelinedRDD(self, add_shuffle_key) keyed._bypass_serializer = True with _JavaStackTrace(self.context) as st: @@ -1251,27 +1264,26 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners, if numPartitions is None: numPartitions = self._defaultReducePartitions() + serializer = self.ctx.serializer + spill = (self.ctx._conf.get("spark.shuffle.spill") or 'True').lower() == 'true' + memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory") or "512m") + agg = Aggregator(createCombiner, mergeValue, mergeCombiners) + def combineLocally(iterator): - combiners = {} - for x in iterator: - (k, v) = x - if k not in combiners: - combiners[k] = createCombiner(v) - else: - combiners[k] = mergeValue(combiners[k], v) - return combiners.iteritems() + merger = ExternalMerger(agg, memory, serializer) \ + if spill else InMemoryMerger(agg) + merger.combine(iterator) + return merger.iteritems() + locally_combined = self.mapPartitions(combineLocally) shuffled = locally_combined.partitionBy(numPartitions) - serializer = self.ctx.serializer - spill = ((self.ctx._conf.get("spark.shuffle.spill") or 'True').lower() - in ('true', '1', 'yes')) - memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory") or "512m") def _mergeCombiners(iterator): - merger = ExternalHashMapMerger(mergeCombiners, memory, serializer)\ - if spill else MapMerger(mergeCombiners) + merger = ExternalMerger(agg, memory, serializer) \ + if spill else InMemoryMerger(agg) merger.merge(iterator) return merger.iteritems() + return shuffled.mapPartitions(_mergeCombiners) def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None): diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index 705ceba34ee7..c4c34e9510a8 100644 
--- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -21,7 +21,7 @@ import shutil import warnings -from pyspark.serializers import BatchedSerializer, AutoSerializer +from pyspark.serializers import BatchedSerializer, PickleSerializer try: import psutil @@ -46,97 +46,166 @@ def get_used_memory(): return 0 +class Aggregator(object): + + def __init__(self, creator, combiner, mergeCombiner=None): + self.creator = creator + self.combiner = combiner + self.mergeCombiner = mergeCombiner or combiner + + class Merger(object): + """ merge shuffled data together by combinator """ + def __init__(self, aggregator): + self.agg = aggregator + + def combine(self, iterator): + """ combine the items by creator and combiner """ + raise NotImplementedError + def merge(self, iterator): + """ merge the combined items by mergeCombiner """ raise NotImplementedError def iteritems(self): + """ return the merged items ad iterator """ raise NotImplementedError -class MapMerger(Merger): +class InMemoryMerger(Merger): """ In memory merger based on map """ - def __init__(self, combiner): - self.combiner = combiner + def __init__(self, aggregator): + Merger.__init__(self, aggregator) self.data = {} + def combine(self, iterator): + """ combine the items by creator and combiner """ + # speed up attributes lookup + d, creator, comb = self.data, self.agg.creator, self.agg.combiner + for k, v in iterator: + d[k] = comb(d[k], v) if k in d else creator(v) + def merge(self, iterator): - d, comb = self.data, self.combiner - for k, v in iter(iterator): + """ merge the combined items by mergeCombiner """ + # speed up attributes lookup + d, comb = self.data, self.agg.mergeCombiner + for k, v in iterator: d[k] = comb(d[k], v) if k in d else v def iteritems(self): + """ return the merged items ad iterator """ return self.data.iteritems() -class ExternalHashMapMerger(Merger): +class ExternalMerger(Merger): """ External merger will dump the aggregated data into disks when memory usage is above the limit, then merge them together. 
- >>> combiner = lambda x, y:x+y - >>> merger = ExternalHashMapMerger(combiner, 10) + >>> agg = Aggregator(lambda x: x, lambda x, y: x + y) + >>> merger = ExternalMerger(agg, 10) >>> N = 10000 + >>> merger.combine(zip(xrange(N), xrange(N)) * 10) + >>> assert merger.spills > 0 + >>> sum(v for k,v in merger.iteritems()) + 499950000 + + >>> merger = ExternalMerger(agg, 10) >>> merger.merge(zip(xrange(N), xrange(N)) * 10) >>> assert merger.spills > 0 >>> sum(v for k,v in merger.iteritems()) 499950000 """ - PARTITIONS = 64 - BATCH = 10000 + PARTITIONS = 64 # number of partitions when spill data into disks + BATCH = 10000 # check the memory after # of items merged - def __init__(self, combiner, memory_limit=512, serializer=None, + def __init__(self, aggregator, memory_limit=512, serializer=None, localdirs=None, scale=1): - self.combiner = combiner + Merger.__init__(self, aggregator) self.memory_limit = memory_limit - self.serializer = serializer or\ - BatchedSerializer(AutoSerializer(), 1024) + # default serializer is only used for tests + self.serializer = serializer or \ + BatchedSerializer(PickleSerializer(), 1024) self.localdirs = localdirs or self._get_dirs() + # scale is used to scale down the hash of key for recursive hash map, self.scale = scale + # unpartitioned merged data self.data = {} + # partitioned merged data self.pdata = [] + # number of chunks dumped into disks self.spills = 0 def _get_dirs(self): + """ get all the directories """ path = os.environ.get("SPARK_LOCAL_DIR", "/tmp/spark") dirs = path.split(",") - localdirs = [] - for d in dirs: - d = os.path.join(d, "merge", str(os.getpid())) - try: - os.makedirs(d) - localdirs.append(d) - except IOError: - pass - if not localdirs: - raise IOError("no writable directories: " + path) - return localdirs + return [os.path.join(d, "python", str(os.getpid())) + for d in dirs] def _get_spill_dir(self, n): + """ choose one directory for spill by number n """ return os.path.join(self.localdirs[n % len(self.localdirs)], str(n)) - @property - def used_memory(self): - return get_used_memory() - - @property def next_limit(self): - return max(self.memory_limit, self.used_memory * 1.05) + """ + return the next memory limit. If the memory is not released + after spilling, it will dump the data only when the used memory + starts to increase. 
+ """ + return max(self.memory_limit, get_used_memory() * 1.05) + + def combine(self, iterator): + """ combine the items by creator and combiner """ + iterator = iter(iterator) + # speedup attribute lookup + d, creator, comb = self.data, self.agg.creator, self.agg.combiner + c, batch = 0, self.BATCH + + for k, v in iterator: + d[k] = comb(d[k], v) if k in d else creator(v) + + c += 1 + if c % batch == 0 and get_used_memory() > self.memory_limit: + self._first_spill() + self._partitioned_combine(iterator, self.next_limit()) + break + + def _partition(self, key): + """ return the partition for key """ + return (hash(key) / self.scale) % self.PARTITIONS + + def _partitioned_combine(self, iterator, limit=0): + """ partition the items by key, then combine them """ + # speedup attribute lookup + creator, comb, pdata = self.agg.creator, self.agg.combiner, self.pdata + c, hfun, batch = 0, self._partition, self.BATCH + + for k, v in iterator: + d = pdata[hfun(k)] + d[k] = comb(d[k], v) if k in d else creator(v) + if not limit: + continue + + c += 1 + if c % batch == 0 and get_used_memory() > limit: + self._spill() + limit = self.next_limit() def merge(self, iterator, check=True): - """ merge (K,V) pair by combiner """ + """ merge (K,V) pair by mergeCombiner """ iterator = iter(iterator) # speedup attribute lookup - d, comb, batch = self.data, self.combiner, self.BATCH + d, comb, batch = self.data, self.agg.mergeCombiner, self.BATCH c = 0 for k, v in iterator: d[k] = comb(d[k], v) if k in d else v @@ -144,35 +213,45 @@ def merge(self, iterator, check=True): continue c += 1 - if c % batch == 0 and self.used_memory > self.memory_limit: + if c % batch == 0 and get_used_memory() > self.memory_limit: self._first_spill() - self._partitioned_merge(iterator, self.next_limit) + self._partitioned_merge(iterator, self.next_limit()) break - def _hash(self, key): - return (hash(key) / self.scale) % self.PARTITIONS - - def _partitioned_merge(self, iterator, limit): - comb, pdata, hfun = self.combiner, self.pdata, self._hash + def _partitioned_merge(self, iterator, limit=0): + """ partition the items by key, then merge them """ + comb, pdata, hfun = self.agg.mergeCombiner, self.pdata, self._partition c = 0 for k, v in iterator: d = pdata[hfun(k)] d[k] = comb(d[k], v) if k in d else v if not limit: continue + c += 1 - if c % self.BATCH == 0 and self.used_memory > limit: + if c % self.BATCH == 0 and get_used_memory() > limit: self._spill() - limit = self.next_limit + limit = self.next_limit() def _first_spill(self): + """ + dump all the data into disks partition by partition. + + The data has not been partitioned, it will iterator the dataset once, + write them into different files, has no additional memory. It only + called when the memory goes above limit at the first time. + """ path = self._get_spill_dir(self.spills) if not os.path.exists(path): os.makedirs(path) + # open all the files for writing streams = [open(os.path.join(path, str(i)), 'w') for i in range(self.PARTITIONS)] + for k, v in self.data.iteritems(): - h = self._hash(k) + h = self._partition(k) + # put one item in batch, make it compatitable with load_stream + # it will increase the memory if dump them in batch self.serializer.dump_stream([(k, v)], streams[h]) for s in streams: s.close() @@ -181,27 +260,35 @@ def _first_spill(self): self.spills += 1 def _spill(self): + """ + dump already partitioned data into disks. + + It will dump the data in batch for better performance. 
+ """ path = self._get_spill_dir(self.spills) if not os.path.exists(path): os.makedirs(path) + for i in range(self.PARTITIONS): p = os.path.join(path, str(i)) with open(p, "w") as f: + # dump items in batch self.serializer.dump_stream(self.pdata[i].iteritems(), f) self.pdata[i].clear() self.spills += 1 def iteritems(self): - """ iterator of all merged (K,V) pairs """ + """ return all merged items as iterator """ if not self.pdata and not self.spills: return self.data.iteritems() return self._external_items() def _external_items(self): + """ return all partitioned items as iterator """ assert not self.data if any(self.pdata): self._spill() - hard_limit = self.next_limit + hard_limit = self.next_limit() try: for i in range(self.PARTITIONS): @@ -209,9 +296,10 @@ def _external_items(self): for j in range(self.spills): path = self._get_spill_dir(j) p = os.path.join(path, str(i)) + # do not check memory during merging self.merge(self.serializer.load_stream(open(p)), False) - if self.used_memory > hard_limit and j < self.spills - 1: + if get_used_memory() > hard_limit and j < self.spills - 1: self.data.clear() # will read from disk again for v in self._recursive_merged_items(i): yield v @@ -224,30 +312,39 @@ def _external_items(self): self._cleanup() def _cleanup(self): + """ clean up all the files in disks """ for d in self.localdirs: shutil.rmtree(d, True) def _recursive_merged_items(self, start): + """ + merge the partitioned items and return the as iterator + + If one partition can not be fit in memory, then them will be + partitioned and merged recursively. + """ + # make sure all the data are dumps into disks. assert not self.data - assert self.spills > 0 if any(self.pdata): self._spill() + assert self.spills > 0 for i in range(start, self.PARTITIONS): - subdirs = [os.path.join(d, "merge", str(i)) + subdirs = [os.path.join(d, "parts", str(i)) for d in self.localdirs] - m = ExternalHashMapMerger(self.combiner, self.memory_limit, - self.serializer, subdirs, self.scale * self.PARTITIONS) + m = ExternalMerger(self.agg, self.memory_limit, self.serializer, + subdirs, self.scale * self.PARTITIONS) m.pdata = [{} for _ in range(self.PARTITIONS)] - limit = self.next_limit + limit = self.next_limit() for j in range(self.spills): path = self._get_spill_dir(j) p = os.path.join(path, str(i)) - m._partitioned_merge(self.serializer.load_stream(open(p)), 0) - if m.used_memory > limit: + m._partitioned_merge(self.serializer.load_stream(open(p))) + + if get_used_memory() > limit: m._spill() - limit = self.next_limit + limit = self.next_limit() for v in m._external_items(): yield v diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index dad9ea8dd75e..234c2325d817 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -34,7 +34,7 @@ from pyspark.context import SparkContext from pyspark.files import SparkFiles from pyspark.serializers import read_int -from pyspark.shuffle import MapMerger, ExternalHashMapMerger +from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger _have_scipy = False try: @@ -51,34 +51,58 @@ class TestMerger(unittest.TestCase): def setUp(self): - self.N = 1<<18 + self.N = 1 << 16 self.l = [i for i in xrange(self.N)] self.data = zip(self.l, self.l) - ExternalHashMapMerger.PARTITIONS = 8 - ExternalHashMapMerger.BATCH = 1<<14 + self.agg = Aggregator(lambda x: [x], + lambda x, y: x.append(y) or x, + lambda x, y: x.extend(y) or x) + ExternalMerger.PARTITIONS = 8 + ExternalMerger.BATCH = 1 << 14 def test_in_memory(self): - m = MapMerger(lambda x,y: x+y) - 
m.merge(self.data) - self.assertEqual(sum(v for k,v in m.iteritems()), sum(xrange(self.N))) + m = InMemoryMerger(self.agg) + m.combine(self.data) + self.assertEqual(sum(sum(v) for k, v in m.iteritems()), + sum(xrange(self.N))) + + m = InMemoryMerger(self.agg) + m.merge(map(lambda (x, y): (x, [y]), self.data)) + self.assertEqual(sum(sum(v) for k, v in m.iteritems()), + sum(xrange(self.N))) def test_small_dataset(self): - m = ExternalHashMapMerger(lambda x,y: x+y, 1000) - m.merge(self.data) + m = ExternalMerger(self.agg, 1000) + m.combine(self.data) + self.assertEqual(m.spills, 0) + self.assertEqual(sum(sum(v) for k, v in m.iteritems()), + sum(xrange(self.N))) + + m = ExternalMerger(self.agg, 1000) + m.merge(map(lambda (x, y): (x, [y]), self.data)) self.assertEqual(m.spills, 0) - self.assertEqual(sum(v for k,v in m.iteritems()), sum(xrange(self.N))) + self.assertEqual(sum(sum(v) for k, v in m.iteritems()), + sum(xrange(self.N))) def test_medium_dataset(self): - m = ExternalHashMapMerger(lambda x,y: x+y, 10) - m.merge(self.data * 3) + m = ExternalMerger(self.agg, 10) + m.combine(self.data) + self.assertTrue(m.spills >= 1) + self.assertEqual(sum(sum(v) for k, v in m.iteritems()), + sum(xrange(self.N))) + + m = ExternalMerger(self.agg, 10) + m.merge(map(lambda (x, y): (x, [y]), self.data * 3)) self.assertTrue(m.spills >= 1) - self.assertEqual(sum(v for k,v in m.iteritems()), sum(xrange(self.N)) * 3) + self.assertEqual(sum(sum(v) for k, v in m.iteritems()), + sum(xrange(self.N)) * 3) def test_huge_dataset(self): - m = ExternalHashMapMerger(lambda x,y: x + y, 10) - m.merge(map(lambda (k,v): (k, [str(v)]), self.data) * 10) + m = ExternalMerger(self.agg, 10) + m.merge(map(lambda (k, v): (k, [str(v)]), self.data * 10)) self.assertTrue(m.spills >= 1) - self.assertEqual(sum(len(v) for k,v in m._recursive_merged_items(0)), self.N * 10) + self.assertEqual(sum(len(v) for k, v in m._recursive_merged_items(0)), + self.N * 10) m._cleanup() From 1a97ce40ff3781e556e27bebb2a8fe205848e2b1 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 22 Jul 2014 14:36:35 -0700 Subject: [PATCH 08/18] limit used memory and size of objects in partitionBy() --- python/pyspark/rdd.py | 28 ++++++++++++++++++++++------ python/pyspark/shuffle.py | 2 +- 2 files changed, 23 insertions(+), 7 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 37693141ab0d..366252c47166 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1227,23 +1227,39 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash): # Transferring O(n) objects to Java is too expensive. Instead, we'll # form the hash buckets in Python, transferring O(numPartitions) objects - # to Java. Each object is a (splitNumber, [objects]) pair. + # to Java. Each object is a (splitNumber, [objects]) pair. + # In order to void too huge objects, the objects are grouped into chunks. 
outputSerializer = self.ctx._unbatched_serializer - limit = _parse_memory(self.ctx._conf.get("spark.python.worker.memory") - or "512m") + limit = (_parse_memory(self.ctx._conf.get("spark.python.worker.memory") + or "512m") / 2) def add_shuffle_key(split, iterator): buckets = defaultdict(list) - c, batch = 0, 1000 + c, batch = 0, min(10 * numPartitions, 1000) + for (k, v) in iterator: buckets[partitionFunc(k) % numPartitions].append((k, v)) c += 1 - if c % batch == 0 and get_used_memory() > limit: + + # check used memory and avg size of chunk of objects + if (c % 1000 == 0 and get_used_memory() > limit + or c > batch): + n, size = len(buckets), 0 for split in buckets.keys(): yield pack_long(split) - yield outputSerializer.dumps(buckets[split]) + d = outputSerializer.dumps(buckets[split]) del buckets[split] + yield d + size += len(d) + + avg = (size / n) >> 20 + # let 1M < avg < 10M + if avg < 1: + batch *= 1.5 + elif avg > 10: + batch = max(batch / 1.5, 1) + c = 0 for (split, items) in buckets.iteritems(): yield pack_long(split) diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index c4c34e9510a8..2cf32892fe3b 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -149,7 +149,7 @@ def _get_dirs(self): """ get all the directories """ path = os.environ.get("SPARK_LOCAL_DIR", "/tmp/spark") dirs = path.split(",") - return [os.path.join(d, "python", str(os.getpid())) + return [os.path.join(d, "python", str(os.getpid()), str(id(self))) for d in dirs] def _get_spill_dir(self, n): From fdd0a4967b1876678616a112a72d3d3806b7c16c Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 22 Jul 2014 15:03:29 -0700 Subject: [PATCH 09/18] add long doc string for ExternalMerger --- python/pyspark/shuffle.py | 31 +++++++++++++++++++++++++++---- 1 file changed, 27 insertions(+), 4 deletions(-) diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index 2cf32892fe3b..05f599fb99a7 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -57,7 +57,7 @@ def __init__(self, creator, combiner, mergeCombiner=None): class Merger(object): """ - merge shuffled data together by combinator + merge shuffled data together by aggregator """ def __init__(self, aggregator): @@ -77,8 +77,9 @@ def iteritems(self): class InMemoryMerger(Merger): + """ - In memory merger based on map + In memory merger based on in-memory dict. """ def __init__(self, aggregator): @@ -107,8 +108,30 @@ def iteritems(self): class ExternalMerger(Merger): """ - External merger will dump the aggregated data into disks when memory usage - is above the limit, then merge them together. + External merger will dump the aggregated data into disks when + memory usage goes above the limit, then merge them together. + + This class works as follows: + + - It repeatedly combine the items and save them in one dict in + memory. + + - When the used memory goes above memory limit, it will split + the combined data into partitions by hash code, dump them + into disk, one file per partition. + + - Then it goes through the rest of the iterator, combine items + into different dict by hash. Until the used memory goes over + memory limit, it dump all the dicts into disks, one file per + dict. Repeat this again until combine all the items. + + - Before return any items, it will load each partition and + combine them seperately. Yield them before loading next + partition. 
+ + - During loading a partition, if the memory goes over limit, + it will partition the loaded data and dump them into disks + and load them partition by partition again. >>> agg = Aggregator(lambda x: x, lambda x, y: x + y) >>> merger = ExternalMerger(agg, 10) From 6178844b5747ca6fe6feb2b0b8f8c1c847dedd3d Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Tue, 22 Jul 2014 18:45:14 -0700 Subject: [PATCH 10/18] refactor and improve docs --- python/epydoc.conf | 2 +- python/pyspark/rdd.py | 4 +- python/pyspark/shuffle.py | 130 ++++++++++++++++++++++++-------------- python/pyspark/tests.py | 16 ++--- 4 files changed, 94 insertions(+), 58 deletions(-) diff --git a/python/epydoc.conf b/python/epydoc.conf index b73860bad826..51c0faf35993 100644 --- a/python/epydoc.conf +++ b/python/epydoc.conf @@ -35,4 +35,4 @@ private: no exclude: pyspark.cloudpickle pyspark.worker pyspark.join pyspark.java_gateway pyspark.examples pyspark.shell pyspark.tests pyspark.rddsampler pyspark.daemon pyspark.mllib._common - pyspark.mllib.tests + pyspark.mllib.tests pyspark.shuffle diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 366252c47166..666d7d7fab14 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1317,7 +1317,7 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners, def combineLocally(iterator): merger = ExternalMerger(agg, memory, serializer) \ if spill else InMemoryMerger(agg) - merger.combine(iterator) + merger.mergeValues(iterator) return merger.iteritems() locally_combined = self.mapPartitions(combineLocally) @@ -1326,7 +1326,7 @@ def combineLocally(iterator): def _mergeCombiners(iterator): merger = ExternalMerger(agg, memory, serializer) \ if spill else InMemoryMerger(agg) - merger.merge(iterator) + merger.mergeCombiners(iterator) return merger.iteritems() return shuffled.mapPartitions(_mergeCombiners) diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index 05f599fb99a7..5dbcb780aaba 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -27,31 +27,54 @@ import psutil def get_used_memory(): + """ return the used memory in MB """ self = psutil.Process(os.getpid()) return self.memory_info().rss >> 20 except ImportError: def get_used_memory(): + """ return the used memory in MB """ if platform.system() == 'Linux': for line in open('/proc/self/status'): if line.startswith('VmRSS:'): return int(line.split()[1]) >> 10 else: - warnings.warn("please install psutil to get accurate memory usage") + warnings.warn("please install psutil to have better " + "support with spilling") if platform.system() == "Darwin": import resource - return resource.getrusage(resource.RUSAGE_SELF).ru_maxrss >> 20 + rss = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss + return rss >> 20 # TODO: support windows return 0 class Aggregator(object): - def __init__(self, creator, combiner, mergeCombiner=None): - self.creator = creator - self.combiner = combiner - self.mergeCombiner = mergeCombiner or combiner + """ + Aggregator has tree functions to merge values into combiner. 
+ + createCombiner: (value) -> combiner + mergeValue: (combine, value) -> combiner + mergeCombiners: (combiner, combiner) -> combiner + """ + + def __init__(self, createCombiner, mergeValue, mergeCombiners): + self.createCombiner = createCombiner + self.mergeValue = mergeValue + self.mergeCombiners = mergeCombiners + + +class SimpleAggregator(Aggregator): + + """ + SimpleAggregator is useful for the cases that combiners have + same type with values + """ + + def __init__(self, combiner): + Aggregator.__init__(self, lambda x: x, combiner, combiner) class Merger(object): @@ -63,11 +86,11 @@ class Merger(object): def __init__(self, aggregator): self.agg = aggregator - def combine(self, iterator): + def mergeValues(self, iterator): """ combine the items by creator and combiner """ raise NotImplementedError - def merge(self, iterator): + def mergeCombiners(self, iterator): """ merge the combined items by mergeCombiner """ raise NotImplementedError @@ -86,17 +109,18 @@ def __init__(self, aggregator): Merger.__init__(self, aggregator) self.data = {} - def combine(self, iterator): + def mergeValues(self, iterator): """ combine the items by creator and combiner """ # speed up attributes lookup - d, creator, comb = self.data, self.agg.creator, self.agg.combiner + d, creator = self.data, self.agg.createCombiner + comb = self.agg.mergeValue for k, v in iterator: d[k] = comb(d[k], v) if k in d else creator(v) - def merge(self, iterator): + def mergeCombiners(self, iterator): """ merge the combined items by mergeCombiner """ # speed up attributes lookup - d, comb = self.data, self.agg.mergeCombiner + d, comb = self.data, self.agg.mergeCombiners for k, v in iterator: d[k] = comb(d[k], v) if k in d else v @@ -133,32 +157,43 @@ class ExternalMerger(Merger): it will partition the loaded data and dump them into disks and load them partition by partition again. - >>> agg = Aggregator(lambda x: x, lambda x, y: x + y) + `data` and `pdata` are used to hold the merged items in memory. + At first, all the data are merged into `data`. Once the used + memory goes over limit, the items in `data` are dumped indo + disks, `data` will be cleared, all rest of items will be merged + into `pdata` and then dumped into disks. Before returning, all + the items in `pdata` will be dumped into disks. + + Finally, if any items were spilled into disks, each partition + will be merged into `data` and be yielded, then cleared. 
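As a concrete illustration of the layout this produces (a sketch only; the path components follow _get_dirs() and _get_spill_dir(), with placeholders for the pid and object id), two spills with four partitions leave one file per partition under each spill directory:

    <SPARK_LOCAL_DIR>/python/<pid>/<id(self)>/0/0 ... 0/3    # spill 0
    <SPARK_LOCAL_DIR>/python/<pid>/<id(self)>/1/0 ... 1/3    # spill 1

_external_items() then merges 0/0 and 1/0 back into `data`, yields the combined items, clears the dict, and repeats for partitions 1 through 3.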
+ + >>> agg = SimpleAggregator(lambda x, y: x + y) >>> merger = ExternalMerger(agg, 10) >>> N = 10000 - >>> merger.combine(zip(xrange(N), xrange(N)) * 10) + >>> merger.mergeValues(zip(xrange(N), xrange(N)) * 10) >>> assert merger.spills > 0 >>> sum(v for k,v in merger.iteritems()) 499950000 >>> merger = ExternalMerger(agg, 10) - >>> merger.merge(zip(xrange(N), xrange(N)) * 10) + >>> merger.mergeCombiners(zip(xrange(N), xrange(N)) * 10) >>> assert merger.spills > 0 >>> sum(v for k,v in merger.iteritems()) 499950000 """ - PARTITIONS = 64 # number of partitions when spill data into disks - BATCH = 10000 # check the memory after # of items merged - def __init__(self, aggregator, memory_limit=512, serializer=None, - localdirs=None, scale=1): + localdirs=None, scale=1, partitions=64, batch=10000): Merger.__init__(self, aggregator) self.memory_limit = memory_limit # default serializer is only used for tests self.serializer = serializer or \ BatchedSerializer(PickleSerializer(), 1024) self.localdirs = localdirs or self._get_dirs() + # number of partitions when spill data into disks + self.partitions = partitions + # check the memory after # of items merged + self.batch = batch # scale is used to scale down the hash of key for recursive hash map, self.scale = scale # unpartitioned merged data @@ -187,12 +222,12 @@ def next_limit(self): """ return max(self.memory_limit, get_used_memory() * 1.05) - def combine(self, iterator): + def mergeValues(self, iterator): """ combine the items by creator and combiner """ iterator = iter(iterator) # speedup attribute lookup - d, creator, comb = self.data, self.agg.creator, self.agg.combiner - c, batch = 0, self.BATCH + creator, comb = self.agg.createCombiner, self.agg.mergeValue + d, c, batch = self.data, 0, self.batch for k, v in iterator: d[k] = comb(d[k], v) if k in d else creator(v) @@ -200,18 +235,18 @@ def combine(self, iterator): c += 1 if c % batch == 0 and get_used_memory() > self.memory_limit: self._first_spill() - self._partitioned_combine(iterator, self.next_limit()) + self._partitioned_mergeValues(iterator, self.next_limit()) break def _partition(self, key): """ return the partition for key """ - return (hash(key) / self.scale) % self.PARTITIONS + return (hash(key) / self.scale) % self.partitions - def _partitioned_combine(self, iterator, limit=0): + def _partitioned_mergeValues(self, iterator, limit=0): """ partition the items by key, then combine them """ # speedup attribute lookup - creator, comb, pdata = self.agg.creator, self.agg.combiner, self.pdata - c, hfun, batch = 0, self._partition, self.BATCH + creator, comb = self.agg.createCombiner, self.agg.mergeValue + c, pdata, hfun, batch = 0, self.pdata, self._partition, self.batch for k, v in iterator: d = pdata[hfun(k)] @@ -224,11 +259,11 @@ def _partitioned_combine(self, iterator, limit=0): self._spill() limit = self.next_limit() - def merge(self, iterator, check=True): + def mergeCombiners(self, iterator, check=True): """ merge (K,V) pair by mergeCombiner """ iterator = iter(iterator) # speedup attribute lookup - d, comb, batch = self.data, self.agg.mergeCombiner, self.BATCH + d, comb, batch = self.data, self.agg.mergeCombiners, self.batch c = 0 for k, v in iterator: d[k] = comb(d[k], v) if k in d else v @@ -238,13 +273,13 @@ def merge(self, iterator, check=True): c += 1 if c % batch == 0 and get_used_memory() > self.memory_limit: self._first_spill() - self._partitioned_merge(iterator, self.next_limit()) + self._partitioned_mergeCombiners(iterator, self.next_limit()) break - def 
_partitioned_merge(self, iterator, limit=0): + def _partitioned_mergeCombiners(self, iterator, limit=0): """ partition the items by key, then merge them """ - comb, pdata, hfun = self.agg.mergeCombiner, self.pdata, self._partition - c = 0 + comb, pdata = self.agg.mergeCombiners, self.pdata + c, hfun = 0, self._partition for k, v in iterator: d = pdata[hfun(k)] d[k] = comb(d[k], v) if k in d else v @@ -252,7 +287,7 @@ def _partitioned_merge(self, iterator, limit=0): continue c += 1 - if c % self.BATCH == 0 and get_used_memory() > limit: + if c % self.batch == 0 and get_used_memory() > limit: self._spill() limit = self.next_limit() @@ -260,16 +295,17 @@ def _first_spill(self): """ dump all the data into disks partition by partition. - The data has not been partitioned, it will iterator the dataset once, - write them into different files, has no additional memory. It only - called when the memory goes above limit at the first time. + The data has not been partitioned, it will iterator the + dataset once, write them into different files, has no + additional memory. It only called when the memory goes + above limit at the first time. """ path = self._get_spill_dir(self.spills) if not os.path.exists(path): os.makedirs(path) # open all the files for writing streams = [open(os.path.join(path, str(i)), 'w') - for i in range(self.PARTITIONS)] + for i in range(self.partitions)] for k, v in self.data.iteritems(): h = self._partition(k) @@ -279,7 +315,7 @@ def _first_spill(self): for s in streams: s.close() self.data.clear() - self.pdata = [{} for i in range(self.PARTITIONS)] + self.pdata = [{} for i in range(self.partitions)] self.spills += 1 def _spill(self): @@ -292,7 +328,7 @@ def _spill(self): if not os.path.exists(path): os.makedirs(path) - for i in range(self.PARTITIONS): + for i in range(self.partitions): p = os.path.join(path, str(i)) with open(p, "w") as f: # dump items in batch @@ -314,13 +350,14 @@ def _external_items(self): hard_limit = self.next_limit() try: - for i in range(self.PARTITIONS): + for i in range(self.partitions): self.data = {} for j in range(self.spills): path = self._get_spill_dir(j) p = os.path.join(path, str(i)) # do not check memory during merging - self.merge(self.serializer.load_stream(open(p)), False) + self.mergeCombiners(self.serializer.load_stream(open(p)), + False) if get_used_memory() > hard_limit and j < self.spills - 1: self.data.clear() # will read from disk again @@ -352,18 +389,19 @@ def _recursive_merged_items(self, start): self._spill() assert self.spills > 0 - for i in range(start, self.PARTITIONS): + for i in range(start, self.partitions): subdirs = [os.path.join(d, "parts", str(i)) for d in self.localdirs] m = ExternalMerger(self.agg, self.memory_limit, self.serializer, - subdirs, self.scale * self.PARTITIONS) - m.pdata = [{} for _ in range(self.PARTITIONS)] + subdirs, self.scale * self.partitions) + m.pdata = [{} for _ in range(self.partitions)] limit = self.next_limit() for j in range(self.spills): path = self._get_spill_dir(j) p = os.path.join(path, str(i)) - m._partitioned_merge(self.serializer.load_stream(open(p))) + m._partitioned_mergeCombiners( + self.serializer.load_stream(open(p))) if get_used_memory() > limit: m._spill() diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py index 64f542622255..a92abbf371f1 100644 --- a/python/pyspark/tests.py +++ b/python/pyspark/tests.py @@ -57,49 +57,47 @@ def setUp(self): self.agg = Aggregator(lambda x: [x], lambda x, y: x.append(y) or x, lambda x, y: x.extend(y) or x) - ExternalMerger.PARTITIONS 
= 8 - ExternalMerger.BATCH = 1 << 14 def test_in_memory(self): m = InMemoryMerger(self.agg) - m.combine(self.data) + m.mergeValues(self.data) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), sum(xrange(self.N))) m = InMemoryMerger(self.agg) - m.merge(map(lambda (x, y): (x, [y]), self.data)) + m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data)) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), sum(xrange(self.N))) def test_small_dataset(self): m = ExternalMerger(self.agg, 1000) - m.combine(self.data) + m.mergeValues(self.data) self.assertEqual(m.spills, 0) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), sum(xrange(self.N))) m = ExternalMerger(self.agg, 1000) - m.merge(map(lambda (x, y): (x, [y]), self.data)) + m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data)) self.assertEqual(m.spills, 0) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), sum(xrange(self.N))) def test_medium_dataset(self): m = ExternalMerger(self.agg, 10) - m.combine(self.data) + m.mergeValues(self.data) self.assertTrue(m.spills >= 1) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), sum(xrange(self.N))) m = ExternalMerger(self.agg, 10) - m.merge(map(lambda (x, y): (x, [y]), self.data * 3)) + m.mergeCombiners(map(lambda (x, y): (x, [y]), self.data * 3)) self.assertTrue(m.spills >= 1) self.assertEqual(sum(sum(v) for k, v in m.iteritems()), sum(xrange(self.N)) * 3) def test_huge_dataset(self): m = ExternalMerger(self.agg, 10) - m.merge(map(lambda (k, v): (k, [str(v)]), self.data * 10)) + m.mergeCombiners(map(lambda (k, v): (k, [str(v)]), self.data * 10)) self.assertTrue(m.spills >= 1) self.assertEqual(sum(len(v) for k, v in m._recursive_merged_items(0)), self.N * 10) From 400be01848d84f23c60d6599b54b90a9952773c8 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 23 Jul 2014 10:07:22 -0700 Subject: [PATCH 11/18] address all the comments --- python/pyspark/rdd.py | 27 +++++++++------- python/pyspark/shuffle.py | 66 ++++++++++++++++++++++++--------------- 2 files changed, 57 insertions(+), 36 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 666d7d7fab14..1031c6d8086a 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1225,14 +1225,17 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash): if numPartitions is None: numPartitions = self._defaultReducePartitions() - # Transferring O(n) objects to Java is too expensive. Instead, we'll - # form the hash buckets in Python, transferring O(numPartitions) objects - # to Java. Each object is a (splitNumber, [objects]) pair. - # In order to void too huge objects, the objects are grouped into chunks. + # Transferring O(n) objects to Java is too expensive. + # Instead, we'll form the hash buckets in Python, + # transferring O(numPartitions) objects to Java. + # Each object is a (splitNumber, [objects]) pair. + # In order to void too huge objects, the objects are + # grouped into chunks. 
outputSerializer = self.ctx._unbatched_serializer - limit = (_parse_memory(self.ctx._conf.get("spark.python.worker.memory") - or "512m") / 2) + limit = (_parse_memory(self.ctx._conf.get( + "spark.python.worker.memory", "512m") / 2) + def add_shuffle_key(split, iterator): buckets = defaultdict(list) @@ -1274,8 +1277,8 @@ def add_shuffle_key(split, iterator): id(partitionFunc)) jrdd = pairRDD.partitionBy(partitioner).values() rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer)) - # This is required so that id(partitionFunc) remains unique, even if - # partitionFunc is a lambda: + # This is required so that id(partitionFunc) remains unique, + # even if partitionFunc is a lambda: rdd._partitionFunc = partitionFunc return rdd @@ -1310,8 +1313,10 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners, numPartitions = self._defaultReducePartitions() serializer = self.ctx.serializer - spill = (self.ctx._conf.get("spark.shuffle.spill") or 'True').lower() == 'true' - memory = _parse_memory(self.ctx._conf.get("spark.python.worker.memory") or "512m") + spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() + == 'true') + memory = (_parse_memory(self.ctx._conf.get( + "spark.python.worker.memory","512m") agg = Aggregator(createCombiner, mergeValue, mergeCombiners) def combineLocally(iterator): @@ -1322,7 +1327,7 @@ def combineLocally(iterator): locally_combined = self.mapPartitions(combineLocally) shuffled = locally_combined.partitionBy(numPartitions) - + def _mergeCombiners(iterator): merger = ExternalMerger(agg, memory, serializer) \ if spill else InMemoryMerger(agg) diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index 5dbcb780aaba..3307b245a483 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -27,20 +27,20 @@ import psutil def get_used_memory(): - """ return the used memory in MB """ + """ Return the used memory in MB """ self = psutil.Process(os.getpid()) return self.memory_info().rss >> 20 except ImportError: def get_used_memory(): - """ return the used memory in MB """ + """ Return the used memory in MB """ if platform.system() == 'Linux': for line in open('/proc/self/status'): if line.startswith('VmRSS:'): return int(line.split()[1]) >> 10 else: - warnings.warn("please install psutil to have better " + warnings.warn("Please install psutil to have better " "support with spilling") if platform.system() == "Darwin": import resource @@ -80,22 +80,22 @@ def __init__(self, combiner): class Merger(object): """ - merge shuffled data together by aggregator + Merge shuffled data together by aggregator """ def __init__(self, aggregator): self.agg = aggregator def mergeValues(self, iterator): - """ combine the items by creator and combiner """ + """ Combine the items by creator and combiner """ raise NotImplementedError def mergeCombiners(self, iterator): - """ merge the combined items by mergeCombiner """ + """ Merge the combined items by mergeCombiner """ raise NotImplementedError def iteritems(self): - """ return the merged items ad iterator """ + """ Return the merged items ad iterator """ raise NotImplementedError @@ -110,7 +110,7 @@ def __init__(self, aggregator): self.data = {} def mergeValues(self, iterator): - """ combine the items by creator and combiner """ + """ Combine the items by creator and combiner """ # speed up attributes lookup d, creator = self.data, self.agg.createCombiner comb = self.agg.mergeValue @@ -118,14 +118,14 @@ def mergeValues(self, iterator): d[k] = comb(d[k], v) if k in d else creator(v) def 
mergeCombiners(self, iterator): - """ merge the combined items by mergeCombiner """ + """ Merge the combined items by mergeCombiner """ # speed up attributes lookup d, comb = self.data, self.agg.mergeCombiners for k, v in iterator: d[k] = comb(d[k], v) if k in d else v def iteritems(self): - """ return the merged items ad iterator """ + """ Return the merged items ad iterator """ return self.data.iteritems() @@ -182,6 +182,8 @@ class ExternalMerger(Merger): 499950000 """ + TOTAL_PARTITIONS = 4096 + def __init__(self, aggregator, memory_limit=512, serializer=None, localdirs=None, scale=1, partitions=64, batch=10000): Merger.__init__(self, aggregator) @@ -198,32 +200,32 @@ def __init__(self, aggregator, memory_limit=512, serializer=None, self.scale = scale # unpartitioned merged data self.data = {} - # partitioned merged data + # partitioned merged data, list of dicts self.pdata = [] # number of chunks dumped into disks self.spills = 0 def _get_dirs(self): - """ get all the directories """ - path = os.environ.get("SPARK_LOCAL_DIR", "/tmp/spark") + """ Get all the directories """ + path = os.environ.get("SPARK_LOCAL_DIR", "/tmp") dirs = path.split(",") return [os.path.join(d, "python", str(os.getpid()), str(id(self))) for d in dirs] def _get_spill_dir(self, n): - """ choose one directory for spill by number n """ + """ Choose one directory for spill by number n """ return os.path.join(self.localdirs[n % len(self.localdirs)], str(n)) def next_limit(self): """ - return the next memory limit. If the memory is not released + Return the next memory limit. If the memory is not released after spilling, it will dump the data only when the used memory starts to increase. """ return max(self.memory_limit, get_used_memory() * 1.05) def mergeValues(self, iterator): - """ combine the items by creator and combiner """ + """ Combine the items by creator and combiner """ iterator = iter(iterator) # speedup attribute lookup creator, comb = self.agg.createCombiner, self.agg.mergeValue @@ -239,11 +241,11 @@ def mergeValues(self, iterator): break def _partition(self, key): - """ return the partition for key """ + """ Return the partition for key """ return (hash(key) / self.scale) % self.partitions def _partitioned_mergeValues(self, iterator, limit=0): - """ partition the items by key, then combine them """ + """ Partition the items by key, then combine them """ # speedup attribute lookup creator, comb = self.agg.createCombiner, self.agg.mergeValue c, pdata, hfun, batch = 0, self.pdata, self._partition, self.batch @@ -260,7 +262,7 @@ def _partitioned_mergeValues(self, iterator, limit=0): limit = self.next_limit() def mergeCombiners(self, iterator, check=True): - """ merge (K,V) pair by mergeCombiner """ + """ Merge (K,V) pair by mergeCombiner """ iterator = iter(iterator) # speedup attribute lookup d, comb, batch = self.data, self.agg.mergeCombiners, self.batch @@ -277,7 +279,7 @@ def mergeCombiners(self, iterator, check=True): break def _partitioned_mergeCombiners(self, iterator, limit=0): - """ partition the items by key, then merge them """ + """ Partition the items by key, then merge them """ comb, pdata = self.agg.mergeCombiners, self.pdata c, hfun = 0, self._partition for k, v in iterator: @@ -293,7 +295,7 @@ def _partitioned_mergeCombiners(self, iterator, limit=0): def _first_spill(self): """ - dump all the data into disks partition by partition. + Dump all the data into disks partition by partition. 
The data has not been partitioned, it will iterator the dataset once, write them into different files, has no @@ -337,13 +339,13 @@ def _spill(self): self.spills += 1 def iteritems(self): - """ return all merged items as iterator """ + """ Return all merged items as iterator """ if not self.pdata and not self.spills: return self.data.iteritems() return self._external_items() def _external_items(self): - """ return all partitioned items as iterator """ + """ Return all partitioned items as iterator """ assert not self.data if any(self.pdata): self._spill() @@ -359,7 +361,10 @@ def _external_items(self): self.mergeCombiners(self.serializer.load_stream(open(p)), False) - if get_used_memory() > hard_limit and j < self.spills - 1: + # limit the total partitions + if (self.scale * self.partitions < self.TOTAL_PARTITIONS) + and j < self.spills - 1 + and get_used_memory() > hard_limit): self.data.clear() # will read from disk again for v in self._recursive_merged_items(i): yield v @@ -368,11 +373,17 @@ def _external_items(self): for v in self.data.iteritems(): yield v self.data.clear() + + # remove the merged partition + for j in range(self.spills): + path = self._get_spill_dir(j) + os.remove(os.path.join(path, str(i))) + finally: self._cleanup() def _cleanup(self): - """ clean up all the files in disks """ + """ Clean up all the files in disks """ for d in self.localdirs: shutil.rmtree(d, True) @@ -410,6 +421,11 @@ def _recursive_merged_items(self, start): for v in m._external_items(): yield v + # remove the merged partition + for j in range(self.spills): + path = self._get_spill_dir(j) + os.remove(os.path.join(path, str(i))) + if __name__ == "__main__": import doctest From e74b78563b40c64297f5871fcf7a6970882dbd6b Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 23 Jul 2014 11:09:46 -0700 Subject: [PATCH 12/18] fix code style and change next_limit to memory_limit --- python/pyspark/rdd.py | 6 +++--- python/pyspark/shuffle.py | 29 +++++++++++++++-------------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 1031c6d8086a..6d7026db2640 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1229,7 +1229,7 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash): # Instead, we'll form the hash buckets in Python, # transferring O(numPartitions) objects to Java. # Each object is a (splitNumber, [objects]) pair. - # In order to void too huge objects, the objects are + # In order to avoid too huge objects, the objects are # grouped into chunks. 
outputSerializer = self.ctx._unbatched_serializer @@ -1316,11 +1316,11 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners, spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == 'true') memory = (_parse_memory(self.ctx._conf.get( - "spark.python.worker.memory","512m") + "spark.python.worker.memory", "512m") agg = Aggregator(createCombiner, mergeValue, mergeCombiners) def combineLocally(iterator): - merger = ExternalMerger(agg, memory, serializer) \ + merger = ExternalMerger(agg, memory * 0.9, serializer) \ if spill else InMemoryMerger(agg) merger.mergeValues(iterator) return merger.iteritems() diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index 3307b245a483..86869cfdec7b 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -216,13 +216,14 @@ def _get_spill_dir(self, n): """ Choose one directory for spill by number n """ return os.path.join(self.localdirs[n % len(self.localdirs)], str(n)) - def next_limit(self): - """ - Return the next memory limit. If the memory is not released - after spilling, it will dump the data only when the used memory - starts to increase. - """ - return max(self.memory_limit, get_used_memory() * 1.05) + def _next_limit(self): + #""" + #Return the next memory limit. If the memory is not released + #after spilling, it will dump the data only when the used memory + #starts to increase. + #""" + #return max(self.memory_limit, get_used_memory() * 1.05) + return self.memory_limit def mergeValues(self, iterator): """ Combine the items by creator and combiner """ @@ -237,7 +238,7 @@ def mergeValues(self, iterator): c += 1 if c % batch == 0 and get_used_memory() > self.memory_limit: self._first_spill() - self._partitioned_mergeValues(iterator, self.next_limit()) + self._partitioned_mergeValues(iterator, self._next_limit()) break def _partition(self, key): @@ -259,7 +260,7 @@ def _partitioned_mergeValues(self, iterator, limit=0): c += 1 if c % batch == 0 and get_used_memory() > limit: self._spill() - limit = self.next_limit() + limit = self._next_limit() def mergeCombiners(self, iterator, check=True): """ Merge (K,V) pair by mergeCombiner """ @@ -275,7 +276,7 @@ def mergeCombiners(self, iterator, check=True): c += 1 if c % batch == 0 and get_used_memory() > self.memory_limit: self._first_spill() - self._partitioned_mergeCombiners(iterator, self.next_limit()) + self._partitioned_mergeCombiners(iterator, self._next_limit()) break def _partitioned_mergeCombiners(self, iterator, limit=0): @@ -291,7 +292,7 @@ def _partitioned_mergeCombiners(self, iterator, limit=0): c += 1 if c % self.batch == 0 and get_used_memory() > limit: self._spill() - limit = self.next_limit() + limit = self._next_limit() def _first_spill(self): """ @@ -349,7 +350,7 @@ def _external_items(self): assert not self.data if any(self.pdata): self._spill() - hard_limit = self.next_limit() + hard_limit = self._next_limit() try: for i in range(self.partitions): @@ -406,7 +407,7 @@ def _recursive_merged_items(self, start): m = ExternalMerger(self.agg, self.memory_limit, self.serializer, subdirs, self.scale * self.partitions) m.pdata = [{} for _ in range(self.partitions)] - limit = self.next_limit() + limit = self._next_limit() for j in range(self.spills): path = self._get_spill_dir(j) @@ -416,7 +417,7 @@ def _recursive_merged_items(self, start): if get_used_memory() > limit: m._spill() - limit = self.next_limit() + limit = self._next_limit() for v in m._external_items(): yield v From f6bd5d6bb5e5e6ac536f4e1d4ad7a159a4221ff4 Mon Sep 17 
00:00:00 2001 From: Davies Liu Date: Wed, 23 Jul 2014 11:33:37 -0700 Subject: [PATCH 13/18] rollback next_limit() again, the performance difference is huge: Given next_limit = memory_limit $ time python pyspark/shuffle.py real 0m20.674s user 0m6.119s sys 0m13.993s Given next_limit = max(memory_limit, used_memory * 1.05) time python pyspark/shuffle.py real 0m0.583s user 0m0.488s sys 0m0.093s --- python/pyspark/rdd.py | 6 +++--- python/pyspark/shuffle.py | 15 +++++++-------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 6d7026db2640..b7b0e9057798 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1234,7 +1234,7 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash): outputSerializer = self.ctx._unbatched_serializer limit = (_parse_memory(self.ctx._conf.get( - "spark.python.worker.memory", "512m") / 2) + "spark.python.worker.memory", "512m")) / 2) def add_shuffle_key(split, iterator): @@ -1315,8 +1315,8 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners, serializer = self.ctx.serializer spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower() == 'true') - memory = (_parse_memory(self.ctx._conf.get( - "spark.python.worker.memory", "512m") + memory = _parse_memory(self.ctx._conf.get( + "spark.python.worker.memory", "512m")) agg = Aggregator(createCombiner, mergeValue, mergeCombiners) def combineLocally(iterator): diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index 86869cfdec7b..946da0125d80 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -217,13 +217,12 @@ def _get_spill_dir(self, n): return os.path.join(self.localdirs[n % len(self.localdirs)], str(n)) def _next_limit(self): - #""" - #Return the next memory limit. If the memory is not released - #after spilling, it will dump the data only when the used memory - #starts to increase. - #""" - #return max(self.memory_limit, get_used_memory() * 1.05) - return self.memory_limit + """ + Return the next memory limit. If the memory is not released + after spilling, it will dump the data only when the used memory + starts to increase. 
+ """ + return max(self.memory_limit, get_used_memory() * 1.05) def mergeValues(self, iterator): """ Combine the items by creator and combiner """ @@ -363,7 +362,7 @@ def _external_items(self): False) # limit the total partitions - if (self.scale * self.partitions < self.TOTAL_PARTITIONS) + if (self.scale * self.partitions < self.TOTAL_PARTITIONS and j < self.spills - 1 and get_used_memory() > hard_limit): self.data.clear() # will read from disk again From 67e6ebaf54c68a7c91e537def53f8f43422d6d8e Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 23 Jul 2014 11:43:37 -0700 Subject: [PATCH 14/18] comment for MAX_TOTAL_PARTITIONS --- python/pyspark/shuffle.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index 946da0125d80..54bc7db41270 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -182,7 +182,8 @@ class ExternalMerger(Merger): 499950000 """ - TOTAL_PARTITIONS = 4096 + # the max total partitions created recursively + MAX_TOTAL_PARTITIONS = 4096 def __init__(self, aggregator, memory_limit=512, serializer=None, localdirs=None, scale=1, partitions=64, batch=10000): @@ -196,7 +197,7 @@ def __init__(self, aggregator, memory_limit=512, serializer=None, self.partitions = partitions # check the memory after # of items merged self.batch = batch - # scale is used to scale down the hash of key for recursive hash map, + # scale is used to scale down the hash of key for recursive hash map self.scale = scale # unpartitioned merged data self.data = {} @@ -362,7 +363,7 @@ def _external_items(self): False) # limit the total partitions - if (self.scale * self.partitions < self.TOTAL_PARTITIONS + if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS and j < self.spills - 1 and get_used_memory() > hard_limit): self.data.clear() # will read from disk again From dcf03a9e95f089e9f07995594b03579b9b62c1b3 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Wed, 23 Jul 2014 21:46:25 -0700 Subject: [PATCH 15/18] fix memory_info() of psutil --- python/pyspark/shuffle.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index 54bc7db41270..06b1544cca84 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -28,9 +28,12 @@ def get_used_memory(): """ Return the used memory in MB """ - self = psutil.Process(os.getpid()) - return self.memory_info().rss >> 20 - + process = psutil.Process(os.getpid()) + if hasattr(process, "memory_info"): + info = process.memory_info() + else: + info = process.get_memory_info() + return info.rss >> 20 except ImportError: def get_used_memory(): From 902f036811615736aa38f2529cef19dcdf686291 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 24 Jul 2014 00:34:25 -0700 Subject: [PATCH 16/18] add shuffle.py into run-tests --- python/run-tests | 1 + 1 file changed, 1 insertion(+) diff --git a/python/run-tests b/python/run-tests index 9282aa47e837..29f755fc0dcd 100755 --- a/python/run-tests +++ b/python/run-tests @@ -61,6 +61,7 @@ run_test "pyspark/broadcast.py" run_test "pyspark/accumulators.py" run_test "pyspark/serializers.py" unset PYSPARK_DOC_TEST +run_test "pyspark/shuffle.py" run_test "pyspark/tests.py" run_test "pyspark/mllib/_common.py" run_test "pyspark/mllib/classification.py" From 37d71f765e804c7c1db389d5afa31f269fd4dae3 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 24 Jul 2014 15:22:55 -0700 Subject: [PATCH 17/18] balance the partitions --- python/pyspark/shuffle.py | 6 
++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index 06b1544cca84..8c2996f30961 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -189,7 +189,7 @@ class ExternalMerger(Merger): MAX_TOTAL_PARTITIONS = 4096 def __init__(self, aggregator, memory_limit=512, serializer=None, - localdirs=None, scale=1, partitions=64, batch=10000): + localdirs=None, scale=1, partitions=59, batch=1000): Merger.__init__(self, aggregator) self.memory_limit = memory_limit # default serializer is only used for tests @@ -208,6 +208,8 @@ def __init__(self, aggregator, memory_limit=512, serializer=None, self.pdata = [] # number of chunks dumped into disks self.spills = 0 + # randomize the hash of key, id(o) is the address of o (aligned by 8) + self._seed = id(self) + 7 def _get_dirs(self): """ Get all the directories """ @@ -246,7 +248,7 @@ def mergeValues(self, iterator): def _partition(self, key): """ Return the partition for key """ - return (hash(key) / self.scale) % self.partitions + return hash((key, self._seed)) % self.partitions def _partitioned_mergeValues(self, iterator, limit=0): """ Partition the items by key, then combine them """ From cad91bfd0d6143d9d2d30e84b174dd850711de0d Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 24 Jul 2014 18:37:43 -0700 Subject: [PATCH 18/18] call gc.collect() after data.clear() to release memory as much as possible. --- python/pyspark/shuffle.py | 71 ++++++++++++++++++++------------------- 1 file changed, 36 insertions(+), 35 deletions(-) diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py index 8c2996f30961..e3923d1c36c5 100644 --- a/python/pyspark/shuffle.py +++ b/python/pyspark/shuffle.py @@ -20,6 +20,7 @@ import platform import shutil import warnings +import gc from pyspark.serializers import BatchedSerializer, PickleSerializer @@ -242,7 +243,7 @@ def mergeValues(self, iterator): c += 1 if c % batch == 0 and get_used_memory() > self.memory_limit: - self._first_spill() + self._spill() self._partitioned_mergeValues(iterator, self._next_limit()) break @@ -280,7 +281,7 @@ def mergeCombiners(self, iterator, check=True): c += 1 if c % batch == 0 and get_used_memory() > self.memory_limit: - self._first_spill() + self._spill() self._partitioned_mergeCombiners(iterator, self._next_limit()) break @@ -299,33 +300,6 @@ def _partitioned_mergeCombiners(self, iterator, limit=0): self._spill() limit = self._next_limit() - def _first_spill(self): - """ - Dump all the data into disks partition by partition. - - The data has not been partitioned, it will iterator the - dataset once, write them into different files, has no - additional memory. It only called when the memory goes - above limit at the first time. - """ - path = self._get_spill_dir(self.spills) - if not os.path.exists(path): - os.makedirs(path) - # open all the files for writing - streams = [open(os.path.join(path, str(i)), 'w') - for i in range(self.partitions)] - - for k, v in self.data.iteritems(): - h = self._partition(k) - # put one item in batch, make it compatitable with load_stream - # it will increase the memory if dump them in batch - self.serializer.dump_stream([(k, v)], streams[h]) - for s in streams: - s.close() - self.data.clear() - self.pdata = [{} for i in range(self.partitions)] - self.spills += 1 - def _spill(self): """ dump already partitioned data into disks. 
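For context, a rough sketch of the pattern behind the gc.collect() calls added in this patch (illustrative only; the numbers depend on the platform allocator). Clearing a big dict drops the references, but anything kept alive only by reference cycles is freed by the collector, and get_used_memory() reports process RSS, which may stay high even after that:

    import gc
    from pyspark.shuffle import get_used_memory

    data = dict((i, [i] * 8) for i in xrange(1 << 18))   # a large dict of small lists
    print get_used_memory()      # RSS in MB while the dict is alive
    data.clear()
    gc.collect()                 # reclaim whatever only cycles kept alive
    print get_used_memory()      # may not drop back, which is why
                                 # _next_limit() re-bases on current usage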
@@ -336,13 +310,38 @@ def _spill(self): if not os.path.exists(path): os.makedirs(path) - for i in range(self.partitions): - p = os.path.join(path, str(i)) - with open(p, "w") as f: - # dump items in batch - self.serializer.dump_stream(self.pdata[i].iteritems(), f) - self.pdata[i].clear() + if not self.pdata: + # The data has not been partitioned, it will iterator the + # dataset once, write them into different files, has no + # additional memory. It only called when the memory goes + # above limit at the first time. + + # open all the files for writing + streams = [open(os.path.join(path, str(i)), 'w') + for i in range(self.partitions)] + + for k, v in self.data.iteritems(): + h = self._partition(k) + # put one item in batch, make it compatitable with load_stream + # it will increase the memory if dump them in batch + self.serializer.dump_stream([(k, v)], streams[h]) + + for s in streams: + s.close() + + self.data.clear() + self.pdata = [{} for i in range(self.partitions)] + + else: + for i in range(self.partitions): + p = os.path.join(path, str(i)) + with open(p, "w") as f: + # dump items in batch + self.serializer.dump_stream(self.pdata[i].iteritems(), f) + self.pdata[i].clear() + self.spills += 1 + gc.collect() # release the memory as much as possible def iteritems(self): """ Return all merged items as iterator """ @@ -372,6 +371,7 @@ def _external_items(self): and j < self.spills - 1 and get_used_memory() > hard_limit): self.data.clear() # will read from disk again + gc.collect() # release the memory as much as possible for v in self._recursive_merged_items(i): yield v return @@ -379,6 +379,7 @@ def _external_items(self): for v in self.data.iteritems(): yield v self.data.clear() + gc.collect() # remove the merged partition for j in range(self.spills):
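Taken together, a minimal usage sketch of the merger as it stands at the end of this series (the Aggregator mirrors the one used in the tests; the 10 MB limit is only there to make spilling likely, and whether it actually spills depends on the RSS reported by get_used_memory()):

    from pyspark.shuffle import Aggregator, ExternalMerger

    agg = Aggregator(lambda v: [v],                    # createCombiner
                     lambda c, v: c.append(v) or c,    # mergeValue
                     lambda a, b: a.extend(b) or a)    # mergeCombiners

    m = ExternalMerger(agg, 10)                        # memory_limit in MB
    m.mergeValues((i % 100, i) for i in xrange(1 << 17))
    grouped = dict(m.iteritems())                      # 100 keys -> lists
    assert sum(len(v) for v in grouped.itervalues()) == 1 << 17

Any files spilled under SPARK_LOCAL_DIR (or /tmp) are removed by _cleanup() once the iterator is exhausted.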