From a0295e11aca5727290a8d8833ed7a714f8ea21e0 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Thu, 14 Aug 2014 13:23:02 -0700
Subject: [PATCH 1/4] add an option to use str in textFile()

str is much more efficient than unicode
---
 python/pyspark/context.py     |  8 ++++----
 python/pyspark/serializers.py | 23 +++++++++++++++--------
 2 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 4001ecab5ea00..b065d4d2dcd6c 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -314,7 +314,7 @@ def pickleFile(self, name, minPartitions=None):
         return RDD(self._jsc.objectFile(name, minPartitions), self,
                    BatchedSerializer(PickleSerializer()))
 
-    def textFile(self, name, minPartitions=None):
+    def textFile(self, name, minPartitions=None, use_unicode=True):
         """
         Read a text file from HDFS, a local file system (available on all
         nodes), or any Hadoop-supported file system URI, and return it as an
@@ -329,9 +329,9 @@ def textFile(self, name, minPartitions=None):
         """
         minPartitions = minPartitions or min(self.defaultParallelism, 2)
         return RDD(self._jsc.textFile(name, minPartitions), self,
-                   UTF8Deserializer())
+                   UTF8Deserializer(use_unicode))
 
-    def wholeTextFiles(self, path, minPartitions=None):
+    def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
         """
         Read a directory of text files from HDFS, a local file system
         (available on all nodes), or any Hadoop-supported file system
@@ -369,7 +369,7 @@ def wholeTextFiles(self, path, minPartitions=None):
         """
         minPartitions = minPartitions or self.defaultMinPartitions
         return RDD(self._jsc.wholeTextFiles(path, minPartitions), self,
-                   PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))
+                   PairDeserializer(UTF8Deserializer(use_unicode), UTF8Deserializer(use_unicode)))
 
     def _dictToJavaMap(self, d):
         jm = self._jvm.java.util.HashMap()
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index df90cafb245bf..de611bbbfa922 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -409,18 +409,25 @@ class UTF8Deserializer(Serializer):
     Deserializes streams written by String.getBytes.
     """
 
+    def __init__(self, use_unicode=False):
+        self.use_unicode = use_unicode
+
     def loads(self, stream):
         length = read_int(stream)
-        return stream.read(length).decode('utf8')
+        return stream.read(length)
 
     def load_stream(self, stream):
-        while True:
-            try:
-                yield self.loads(stream)
-            except struct.error:
-                return
-            except EOFError:
-                return
+        try:
+            if self.use_unicode:
+                while True:
+                    yield self.loads(stream).decode("utf-8")
+            else:
+                while True:
+                    yield self.loads(stream)
+        except struct.error:
+            return
+        except EOFError:
+            return
 
 
 def read_long(stream):

From 85246e57c996fe26b11caa110e2390c879ee11d5 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 19 Aug 2014 15:22:47 -0700
Subject: [PATCH 2/4] add docs for use_unicode

---
 python/pyspark/context.py     | 8 ++++++++
 python/pyspark/serializers.py | 9 +++------
 2 files changed, 11 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index b065d4d2dcd6c..89dd08368434f 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -320,6 +320,10 @@ def textFile(self, name, minPartitions=None, use_unicode=True):
         nodes), or any Hadoop-supported file system URI, and return it as an
         RDD of Strings.
 
+        If use_unicode is False, the strings will be kept as `str` (encoding
+        as `utf-8`), which is faster and smaller than unicode. (Added in
+        Spark 1.1)
+
         >>> path = os.path.join(tempdir, "sample-text.txt")
         >>> with open(path, "w") as testFile:
         ...    testFile.write("Hello world!")
@@ -339,6 +343,10 @@ def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
         key-value pair, where the key is the path of each file, the
         value is the content of each file.
 
+        If use_unicode is False, the strings will be kept as `str` (encoding
+        as `utf-8`), which is faster and smaller than unicode. (Added in
+        Spark 1.1)
+
         For example, if you have the following files::
 
             hdfs://a-hdfs-path/part-00000
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index de611bbbfa922..4d20d31d5d176 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -412,18 +412,15 @@ class UTF8Deserializer(Serializer):
     def __init__(self, use_unicode=False):
         self.use_unicode = use_unicode
 
-    def loads(self, stream):
-        length = read_int(stream)
-        return stream.read(length)
-
     def load_stream(self, stream):
         try:
+            _read_int = read_int  # faster than global lookup
             if self.use_unicode:
                 while True:
-                    yield self.loads(stream).decode("utf-8")
+                    yield stream.read(_read_int(stream)).decode("utf-8")
             else:
                 while True:
-                    yield self.loads(stream)
+                    yield stream.read(_read_int(stream))
         except struct.error:
             return
         except EOFError:

From a286f2fab80400f7274ad42854299dd3c328eaef Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Tue, 19 Aug 2014 18:01:28 -0700
Subject: [PATCH 3/4] rollback loads()

---
 python/pyspark/serializers.py | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 4d20d31d5d176..e99fb926b5212 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -412,15 +412,15 @@ class UTF8Deserializer(Serializer):
     def __init__(self, use_unicode=False):
         self.use_unicode = use_unicode
 
+    def loads(self, stream):
+        length = read_int(stream)
+        s = stream.read(length)
+        return s.decode("utf-8") if self.use_unicode else s
+
     def load_stream(self, stream):
         try:
-            _read_int = read_int  # faster than global lookup
-            if self.use_unicode:
-                while True:
-                    yield stream.read(_read_int(stream)).decode("utf-8")
-            else:
-                while True:
-                    yield stream.read(_read_int(stream))
+            while True:
+                yield self.loads(stream)
         except struct.error:
             return
         except EOFError:

From 8352d57b511091b55e641952544d94fa35a00647 Mon Sep 17 00:00:00 2001
From: Davies Liu
Date: Sat, 6 Sep 2014 17:05:57 -0700
Subject: [PATCH 4/4] update version number

---
 python/pyspark/context.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 89dd08368434f..833a85469e377 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -322,7 +322,7 @@ def textFile(self, name, minPartitions=None, use_unicode=True):
         If use_unicode is False, the strings will be kept as `str` (encoding
         as `utf-8`), which is faster and smaller than unicode. (Added in
-        Spark 1.1)
+        Spark 1.2)
 
         >>> path = os.path.join(tempdir, "sample-text.txt")
         >>> with open(path, "w") as testFile:
         ...    testFile.write("Hello world!")
@@ -345,7 +345,7 @@ def wholeTextFiles(self, path, minPartitions=None, use_unicode=True):
         If use_unicode is False, the strings will be kept as `str` (encoding
         as `utf-8`), which is faster and smaller than unicode. (Added in
-        Spark 1.1)
+        Spark 1.2)
 
         For example, if you have the following files::
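
Usage note (not part of the patches): a minimal sketch of how the new flag would be called from PySpark, assuming an existing SparkContext `sc`; the input paths are hypothetical.

    # Sketch only: `sc` is an assumed SparkContext; the paths are hypothetical.
    path = "hdfs://a-hdfs-path/part-00000"

    # Default behaviour: each line is decoded into a unicode object.
    lines = sc.textFile(path).collect()

    # With use_unicode=False the lines stay as utf-8 encoded str objects,
    # skipping the decode step and using less memory.
    raw_lines = sc.textFile(path, use_unicode=False).collect()

    # wholeTextFiles() takes the same flag; file paths and contents stay as str.
    files = sc.wholeTextFiles("hdfs://a-hdfs-path/", use_unicode=False).collect()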