Skip to content

Commit b3872e0

Browse files
Vladimir Vladimirov authored and JoshRosen committed
SPARK-5633 pyspark saveAsTextFile support for compression codec
See https://issues.apache.org/jira/browse/SPARK-5633 for details Author: Vladimir Vladimirov <[email protected]> Closes apache#4403 from smartkiwi/master and squashes the following commits: 94c014e [Vladimir Vladimirov] SPARK-5633 pyspark saveAsTextFile support for compression codec
1 parent 65181b7 commit b3872e0

File tree

1 file changed

+20
-2
lines changed

1 file changed

+20
-2
lines changed

python/pyspark/rdd.py

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1366,10 +1366,14 @@ def saveAsPickleFile(self, path, batchSize=10):
13661366
ser = BatchedSerializer(PickleSerializer(), batchSize)
13671367
self._reserialize(ser)._jrdd.saveAsObjectFile(path)
13681368

1369-
def saveAsTextFile(self, path):
1369+
def saveAsTextFile(self, path, compressionCodecClass=None):
13701370
"""
13711371
Save this RDD as a text file, using string representations of elements.
13721372
1373+
@param path: path to text file
1374+
@param compressionCodecClass: (None by default) string i.e.
1375+
"org.apache.hadoop.io.compress.GzipCodec"
1376+
13731377
>>> tempFile = NamedTemporaryFile(delete=True)
13741378
>>> tempFile.close()
13751379
>>> sc.parallelize(range(10)).saveAsTextFile(tempFile.name)
@@ -1385,6 +1389,16 @@ def saveAsTextFile(self, path):
13851389
>>> sc.parallelize(['', 'foo', '', 'bar', '']).saveAsTextFile(tempFile2.name)
13861390
>>> ''.join(sorted(input(glob(tempFile2.name + "/part-0000*"))))
13871391
'\\n\\n\\nbar\\nfoo\\n'
1392+
1393+
Using compressionCodecClass
1394+
1395+
>>> tempFile3 = NamedTemporaryFile(delete=True)
1396+
>>> tempFile3.close()
1397+
>>> codec = "org.apache.hadoop.io.compress.GzipCodec"
1398+
>>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec)
1399+
>>> from fileinput import input, hook_compressed
1400+
>>> ''.join(sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed)))
1401+
'bar\\nfoo\\n'
13881402
"""
13891403
def func(split, iterator):
13901404
for x in iterator:
@@ -1395,7 +1409,11 @@ def func(split, iterator):
13951409
yield x
13961410
keyed = self.mapPartitionsWithIndex(func)
13971411
keyed._bypass_serializer = True
1398-
keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
1412+
if compressionCodecClass:
1413+
compressionCodec = self.ctx._jvm.java.lang.Class.forName(compressionCodecClass)
1414+
keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec)
1415+
else:
1416+
keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path)
13991417

14001418
# Pair functions
14011419

0 commit comments

Comments (0)