@@ -57,7 +57,10 @@ private[spark] class PythonRDD[T: ClassTag](
override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
val startTime = System.currentTimeMillis
val env = SparkEnv.get
val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
val localdir = env.blockManager.diskBlockManager.localDirs.map(
f => f.getPath()).mkString(",")
val worker: Socket = env.createPythonWorker(pythonExec,
envVars.toMap + ("SPARK_LOCAL_DIR" -> localdir))

Contributor:
Instead of passing spark.local.dir, we should figure out which directories the DiskBlockManager created (you can get it from env.blockManager) and pass a comma-separated list of those. This way the data for this Spark application is all in one directory, and Java can make sure we clean it all up at the end. Otherwise the way you have things set up now, those directories are never cleared if the Python worker crashes.

Contributor Author:
DiskBlockManager.localDirs is private; make it public?


// Start a thread to feed the process input from our parent's iterator
val writerThread = new WriterThread(env, worker, split, context)
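
The reviewer's point above is that the JVM should hand the worker the directories the DiskBlockManager already manages, so cleanup stays in one place. Not part of the diff, but as illustration, a minimal sketch of how the Python side could pick a spill directory from the comma-separated SPARK_LOCAL_DIR value set here (the helper name and the /tmp fallback are assumptions, not the PR's actual code):

import os
import random

def _get_spill_dir(sub="python"):
    # SPARK_LOCAL_DIR is assumed to hold the comma-separated list of
    # executor-local directories passed in by createPythonWorker above.
    dirs = os.environ.get("SPARK_LOCAL_DIR", "/tmp").split(",")
    path = os.path.join(random.choice(dirs), sub)
    if not os.path.isdir(path):
        os.makedirs(path)
    return path
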
@@ -43,7 +43,7 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
/* Create one local directory for each path mentioned in spark.local.dir; then, inside this
* directory, create multiple subdirectories that we will hash files into, in order to avoid
* having really large inodes at the top level. */
private val localDirs: Array[File] = createLocalDirs()
val localDirs: Array[File] = createLocalDirs()
if (localDirs.isEmpty) {
logError("Failed to create any local dir.")
System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
9 changes: 9 additions & 0 deletions docs/configuration.md
@@ -195,6 +195,15 @@ Apart from these, the following properties are also available, and may be useful
Spark's dependencies and user dependencies. It is currently an experimental feature.
</td>
</tr>
<tr>
<td><code>spark.python.worker.memory</code></td>
<td>512m</td>
<td>
Amount of memory to use per Python worker process during aggregation, in the same
format as JVM memory strings (e.g. <code>512m</code>, <code>2g</code>). If the memory
used during aggregation goes above this amount, the data will be spilled to disk.
</td>
</tr>
</table>

#### Shuffle Behavior
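
Not part of the patch: a quick usage sketch of the setting documented above, assuming the standard PySpark SparkConf/SparkContext API (the app name and the values are made up):

from pyspark import SparkConf, SparkContext

conf = (SparkConf()
        .setAppName("agg-example")                # hypothetical app name
        .set("spark.python.worker.memory", "1g")  # spill once a worker's aggregation exceeds ~1 GB
        .set("spark.shuffle.spill", "true"))      # keep external spilling enabled
sc = SparkContext(conf=conf)
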
2 changes: 1 addition & 1 deletion python/epydoc.conf
@@ -35,4 +35,4 @@ private: no
exclude: pyspark.cloudpickle pyspark.worker pyspark.join
pyspark.java_gateway pyspark.examples pyspark.shell pyspark.tests
pyspark.rddsampler pyspark.daemon pyspark.mllib._common
pyspark.mllib.tests
pyspark.mllib.tests pyspark.shuffle
92 changes: 71 additions & 21 deletions python/pyspark/rdd.py
@@ -42,6 +42,8 @@
from pyspark.rddsampler import RDDSampler
from pyspark.storagelevel import StorageLevel
from pyspark.resultiterable import ResultIterable
from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, \
get_used_memory

from py4j.java_collections import ListConverter, MapConverter

@@ -197,6 +199,22 @@ def _replaceRoot(self, value):
self._sink(1)


def _parse_memory(s):
"""
Parse a memory string in the format supported by Java (e.g. 1g, 200m) and
return the value in MB

>>> _parse_memory("256m")
256
>>> _parse_memory("2g")
2048
"""
units = {'g': 1024, 'm': 1, 't': 1 << 20, 'k': 1.0 / 1024}
if s[-1].lower() not in units:
raise ValueError("invalid format: " + s)
return int(float(s[:-1]) * units[s[-1].lower()])


class RDD(object):

"""
@@ -1207,20 +1225,49 @@ def partitionBy(self, numPartitions, partitionFunc=portable_hash):
if numPartitions is None:
numPartitions = self._defaultReducePartitions()

# Transferring O(n) objects to Java is too expensive. Instead, we'll
# form the hash buckets in Python, transferring O(numPartitions) objects
# to Java. Each object is a (splitNumber, [objects]) pair.
# Transferring O(n) objects to Java is too expensive.
# Instead, we'll form the hash buckets in Python,
# transferring O(numPartitions) objects to Java.
# Each object is a (splitNumber, [objects]) pair.
# In order to avoid too huge objects, the objects are
# grouped into chunks.
outputSerializer = self.ctx._unbatched_serializer

limit = (_parse_memory(self.ctx._conf.get(
"spark.python.worker.memory", "512m")) / 2)

def add_shuffle_key(split, iterator):

buckets = defaultdict(list)
c, batch = 0, min(10 * numPartitions, 1000)

for (k, v) in iterator:
buckets[partitionFunc(k) % numPartitions].append((k, v))
c += 1

# check used memory and avg size of chunk of objects
if (c % 1000 == 0 and get_used_memory() > limit
or c > batch):
n, size = len(buckets), 0
for split in buckets.keys():
yield pack_long(split)
d = outputSerializer.dumps(buckets[split])
del buckets[split]
yield d
size += len(d)

avg = (size / n) >> 20
# let 1M < avg < 10M
if avg < 1:
batch *= 1.5
elif avg > 10:
batch = max(batch / 1.5, 1)
c = 0

for (split, items) in buckets.iteritems():
yield pack_long(split)
yield outputSerializer.dumps(items)

keyed = PipelinedRDD(self, add_shuffle_key)
keyed._bypass_serializer = True
with _JavaStackTrace(self.context) as st:
@@ -1230,8 +1277,8 @@ def add_shuffle_key(split, iterator):
id(partitionFunc))
jrdd = pairRDD.partitionBy(partitioner).values()
rdd = RDD(jrdd, self.ctx, BatchedSerializer(outputSerializer))
# This is required so that id(partitionFunc) remains unique, even if
# partitionFunc is a lambda:
# This is required so that id(partitionFunc) remains unique,
# even if partitionFunc is a lambda:
rdd._partitionFunc = partitionFunc
return rdd
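
An aside on the adaptive chunking above (illustrative only, not from the diff): add_shuffle_key flushes all buckets when used memory is over the limit (checked every 1000 items) or when the item count passes `batch`, then nudges `batch` so the average flushed chunk stays roughly between 1 MB and 10 MB. The same arithmetic in isolation, with hypothetical names:

def tune_batch(batch, flushed_bytes, num_buckets):
    # average serialized chunk size in whole megabytes
    avg_mb = (flushed_bytes // num_buckets) >> 20
    if avg_mb < 1:
        batch *= 1.5                 # chunks too small: flush less often
    elif avg_mb > 10:
        batch = max(batch / 1.5, 1)  # chunks too large: flush more often
    return batch

# e.g. 64 MB flushed across 16 buckets -> 4 MB average, batch unchanged
print(tune_batch(1000, 64 << 20, 16))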

@@ -1265,26 +1312,28 @@ def combineByKey(self, createCombiner, mergeValue, mergeCombiners,
if numPartitions is None:
numPartitions = self._defaultReducePartitions()

serializer = self.ctx.serializer
spill = (self.ctx._conf.get("spark.shuffle.spill", 'True').lower()
== 'true')
memory = _parse_memory(self.ctx._conf.get(
"spark.python.worker.memory", "512m"))
agg = Aggregator(createCombiner, mergeValue, mergeCombiners)

def combineLocally(iterator):
combiners = {}
for x in iterator:
(k, v) = x
if k not in combiners:
combiners[k] = createCombiner(v)
else:
combiners[k] = mergeValue(combiners[k], v)
return combiners.iteritems()
merger = ExternalMerger(agg, memory * 0.9, serializer) \
if spill else InMemoryMerger(agg)
merger.mergeValues(iterator)
return merger.iteritems()

locally_combined = self.mapPartitions(combineLocally)
shuffled = locally_combined.partitionBy(numPartitions)

def _mergeCombiners(iterator):
combiners = {}
for (k, v) in iterator:
if k not in combiners:
combiners[k] = v
else:
combiners[k] = mergeCombiners(combiners[k], v)
return combiners.iteritems()
merger = ExternalMerger(agg, memory, serializer) \
if spill else InMemoryMerger(agg)
merger.mergeCombiners(iterator)
return merger.iteritems()

return shuffled.mapPartitions(_mergeCombiners)

Contributor:
This only implements external merging in the reduce tasks, but we need it in the map tasks too. For that you'll need to modify the Merger interface to take createCombiner, mergeValue and mergeCombiners together. Please add those there and add tests for the richer interface.
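
To illustrate the richer interface being requested, here is a sketch only -- the actual pyspark.shuffle module imported above may differ, and ExternalMerger would add disk spilling on top of this shape. The aggregator carries all three functions, so a single merger type can combine raw values on the map side and pre-combined values on the reduce side.

class Aggregator(object):
    def __init__(self, createCombiner, mergeValue, mergeCombiners):
        self.createCombiner = createCombiner
        self.mergeValue = mergeValue
        self.mergeCombiners = mergeCombiners


class InMemoryMerger(object):
    def __init__(self, aggregator):
        self.agg = aggregator
        self.data = {}

    def mergeValues(self, iterator):
        # map side: fold raw (k, v) pairs into combiners
        d, agg = self.data, self.agg
        for k, v in iterator:
            d[k] = agg.mergeValue(d[k], v) if k in d else agg.createCombiner(v)

    def mergeCombiners(self, iterator):
        # reduce side: merge already-combined (k, c) pairs
        d, agg = self.data, self.agg
        for k, c in iterator:
            d[k] = agg.mergeCombiners(d[k], c) if k in d else c

    def iteritems(self):
        return self.data.iteritems()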


def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
@@ -1343,7 +1392,8 @@ def mergeValue(xs, x):
return xs

def mergeCombiners(a, b):
return a + b
a.extend(b)
return a

return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
numPartitions).mapValues(lambda x: ResultIterable(x))
29 changes: 28 additions & 1 deletion python/pyspark/serializers.py
@@ -193,7 +193,7 @@ def load_stream(self, stream):
return chain.from_iterable(self._load_stream_without_unbatching(stream))

def _load_stream_without_unbatching(self, stream):
return self.serializer.load_stream(stream)
return self.serializer.load_stream(stream)

def __eq__(self, other):
return (isinstance(other, BatchedSerializer) and
@@ -302,6 +302,33 @@ class MarshalSerializer(FramedSerializer):
loads = marshal.loads


class AutoSerializer(FramedSerializer):

Contributor:
Can we actually use this by default yet or will it fail for NumPy arrays? If it won't work by default, we should use PickleSerializer instead and wait to fix this.

Contributor Author:
It will fail in some cases, such as arrays, so it's not safe to make it the default. I will improve it later and try to make it the default. It's still useful for now, because it works in most cases.

"""
Choose marshal or cPickle as the serialization protocol automatically
"""
def __init__(self):
FramedSerializer.__init__(self)
self._type = None

def dumps(self, obj):
if self._type is not None:
return 'P' + cPickle.dumps(obj, -1)
try:
return 'M' + marshal.dumps(obj)
except Exception:
self._type = 'P'
return 'P' + cPickle.dumps(obj, -1)

Contributor:
If the objects are not marshal-able but are pickle-able, is there a big performance cost to throwing an exception on each write? Would be good to test this, because if not, we can make this serializer our default where we now use Pickle. Even if there is a cost maybe we can do something where if 10% of the objects written fail to marshal we switch to always using pickle.

Contributor Author:
I added a fast path for it, so there is no exception cost any more.


def loads(self, obj):
_type = obj[0]
if _type == 'M':
return marshal.loads(obj[1:])
elif _type == 'P':
return cPickle.loads(obj[1:])
else:
raise ValueError("invalid serialization type: %s" % _type)

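Not part of the diff -- a small usage sketch of the class above: marshal handles objects it can encode, and the first failure permanently switches the instance to cPickle (the example date is arbitrary; marshal cannot encode date objects, while pickle can):

from datetime import date

ser = AutoSerializer()

data = ser.dumps([1, 2, 3])         # marshal path, framed with 'M'
assert data[0] == 'M' and ser.loads(data) == [1, 2, 3]

data = ser.dumps(date(2014, 7, 1))  # marshal can't encode this: falls back to cPickle
assert data[0] == 'P' and ser.loads(data) == date(2014, 7, 1)

data = ser.dumps([4, 5, 6])         # the instance now stays on the pickle path
assert data[0] == 'P'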

class UTF8Deserializer(Serializer):
"""
Deserializes streams written by String.getBytes.