Commit 077ecb2

Recover earlier changes lost in previous merge for context.py
1 parent 5af4770 commit 077ecb2


python/pyspark/context.py

Lines changed: 8 additions & 97 deletions
@@ -28,7 +28,8 @@
 from pyspark.conf import SparkConf
 from pyspark.files import SparkFiles
 from pyspark.java_gateway import launch_gateway
-from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, PairDeserializer
+from pyspark.serializers import PickleSerializer, BatchedSerializer, UTF8Deserializer, \
+    PairDeserializer
 from pyspark.storagelevel import StorageLevel
 from pyspark import rdd
 from pyspark.rdd import RDD
@@ -296,98 +297,6 @@ def wholeTextFiles(self, path):
         return RDD(self._jsc.wholeTextFiles(path), self,
                    PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
 
-    def sequenceFile(self, name, key_class="org.apache.hadoop.io.Text", value_class="org.apache.hadoop.io.Text",
-                     key_wrapper="", value_wrapper="", minSplits=None):
-        """
-        Read a Hadoop SequenceFile with arbitrary key and value Writable class from HDFS,
-        a local file system (available on all nodes), or any Hadoop-supported file system URI.
-        The mechanism is as follows:
-        1. A Java RDD is created from the SequenceFile or other InputFormat, and the key and value Writable classes
-        2. Serialization is attempted via Pyrolite pickling
-        3. If this fails, the fallback is to call 'toString' on each key and value
-        4. C{PickleSerializer} is used to deserialize pickled objects on the Python side
-        >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfint/").collect())
-        [(1, u'aa'), (1, u'aa'), (2, u'aa'), (2, u'bb'), (2, u'bb'), (3, u'cc')]
-        >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfdouble/").collect())
-        [(1.0, u'aa'), (1.0, u'aa'), (2.0, u'aa'), (2.0, u'bb'), (2.0, u'bb'), (3.0, u'cc')]
-        >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sftext/").collect())
-        [(u'1', u'aa'), (u'1', u'aa'), (u'2', u'aa'), (u'2', u'bb'), (u'2', u'bb'), (u'3', u'cc')]
-        >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfbool/").collect())
-        [(1, False), (1, True), (2, False), (2, False), (2, True), (3, True)]
-        >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfnull/").collect())
-        [(1, None), (1, None), (2, None), (2, None), (2, None), (3, None)]
-        >>> sorted(sc.sequenceFile(tempdir + "/sftestdata/sfmap/").collect())
-        [(1, {2.0: u'aa'}), (1, {3.0: u'bb'}), (2, {1.0: u'aa'}), (2, {1.0: u'cc'}), (2, {3.0: u'bb'}), (3, {2.0: u'dd'})]
-        >>> r = sorted(sc.sequenceFile(tempdir + "/sftestdata/sfclass").collect())[0]
-        >>> r == (u'1', {u'__class__': u'org.apache.spark.api.python.TestWritable', u'double': 54.0, u'int': 123, u'str': u'test1'})
-        True
-        """
-        minSplits = minSplits or min(self.defaultParallelism, 2)
-        jrdd = self._jvm.PythonRDD.sequenceFile(self._jsc, name, key_class, value_class, key_wrapper, value_wrapper,
-                                                minSplits)
-        return RDD(jrdd, self, PickleSerializer())
-
-    def newAPIHadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                         value_class="org.apache.hadoop.io.Text", key_wrapper="toString",
-                         value_wrapper="toString", conf={}):
-        """
-        Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS,
-        a local file system (available on all nodes), or any Hadoop-supported file system URI.
-        The mechanism is the same as for sc.sequenceFile.
-
-        A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
-        """
-        jconf = self._jvm.java.util.HashMap()
-        for k, v in conf.iteritems():
-            jconf[k] = v
-        jrdd = self._jvm.PythonRDD.newAPIHadoopFile(self._jsc, name, inputformat_class, key_class, value_class,
-                                                    key_wrapper, value_wrapper, jconf)
-        return RDD(jrdd, self, PickleSerializer())
-
-    def newAPIHadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                        value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
-        """
-        Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
-        which is passed in as a Python dict. This will be converted into a Configuration in Java.
-        The mechanism is the same as for sc.sequenceFile.
-        """
-        jconf = self._jvm.java.util.HashMap()
-        for k, v in conf.iteritems():
-            jconf[k] = v
-        jrdd = self._jvm.PythonRDD.newAPIHadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
-                                                   value_wrapper, jconf)
-        return RDD(jrdd, self, PickleSerializer())
-
-    def hadoopFile(self, name, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                   value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
-        """
-        Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS,
-        a local file system (available on all nodes), or any Hadoop-supported file system URI.
-        The mechanism is the same as for sc.sequenceFile.
-
-        A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java
-        """
-        jconf = self._jvm.java.util.HashMap()
-        for k, v in conf.iteritems():
-            jconf[k] = v
-        jrdd = self._jvm.PythonRDD.hadoopFile(self._jsc, name, inputformat_class, key_class, value_class, key_wrapper,
-                                              value_wrapper, jconf)
-        return RDD(jrdd, self, PickleSerializer())
-
-    def hadoopRDD(self, inputformat_class, key_class="org.apache.hadoop.io.Text",
-                  value_class="org.apache.hadoop.io.Text", key_wrapper="", value_wrapper="", conf={}):
-        """
-        Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration,
-        which is passed in as a Python dict. This will be converted into a Configuration in Java.
-        The mechanism is the same as for sc.sequenceFile.
-        """
-        jconf = self._jvm.java.util.HashMap()
-        for k, v in conf.iteritems():
-            jconf[k] = v
-        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputformat_class, key_class, value_class, key_wrapper,
-                                             value_wrapper, jconf)
-        return RDD(jrdd, self, PickleSerializer())
-
     def _checkpointFile(self, name, input_deserializer):
         jrdd = self._jsc.checkpointFile(name)
         return RDD(jrdd, self, input_deserializer)
@@ -514,8 +423,11 @@ def _getJavaStorageLevel(self, storageLevel):
             raise Exception("storageLevel must be of type pyspark.StorageLevel")
 
         newStorageLevel = self._jvm.org.apache.spark.storage.StorageLevel
-        return newStorageLevel(storageLevel.useDisk, storageLevel.useMemory,
-                               storageLevel.deserialized, storageLevel.replication)
+        return newStorageLevel(storageLevel.useDisk,
+                               storageLevel.useMemory,
+                               storageLevel.useOffHeap,
+                               storageLevel.deserialized,
+                               storageLevel.replication)
 
     def setJobGroup(self, groupId, description):
         """
@@ -555,9 +467,8 @@ def _test():
     globs = globals().copy()
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     globs['tempdir'] = tempfile.mkdtemp()
-    globs['sc']._jvm.WriteInputFormatTestDataGenerator.generateData(globs['tempdir'], globs['sc']._jsc)
     atexit.register(lambda: shutil.rmtree(globs['tempdir']))
-    (failure_count, test_count) = doctest.testmod(globs=globs)
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
         exit(-1)
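The _test() hunk above switches the doctest run to doctest.ELLIPSIS, which lets expected output in the module's doctests elide variable portions with "...". A small self-contained illustration of what that flag permits follows; the toy module and function name are invented for the example and are not code from context.py.

    import doctest

    def squares():
        """
        With ELLIPSIS enabled, '...' in the expected output matches any text:

        >>> [x * x for x in range(10)]
        [0, 1, 4, ..., 81]
        """

    if __name__ == "__main__":
        # Mirrors the testmod call in the hunk above, applied to this toy module.
        doctest.testmod(optionflags=doctest.ELLIPSIS)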
