From aa69a72d71c55e93b487ac28910b9187c0c71088 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maciej=20Bry=C5=84ski?=
Date: Fri, 15 Sep 2017 20:01:40 +0200
Subject: [PATCH 1/6] Update types.py

---
 python/pyspark/sql/types.py | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 920cf009f599..3ccb3c0fe43f 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -483,7 +483,8 @@ def __init__(self, fields=None):
             self.names = [f.name for f in fields]
             assert all(isinstance(f, StructField) for f in fields),\
                 "fields should be a list of StructField"
-            self._needSerializeAnyField = any(f.needConversion() for f in self)
+            self._needConversion = [f.needConversion() for f in self]
+            self._needSerializeAnyField = any(self._needConversion)
 
     def add(self, field, data_type=None, nullable=True, metadata=None):
         """
@@ -528,7 +529,8 @@ def add(self, field, data_type=None, nullable=True, metadata=None):
             data_type_f = data_type
         self.fields.append(StructField(field, data_type_f, nullable, metadata))
         self.names.append(field)
-        self._needSerializeAnyField = any(f.needConversion() for f in self)
+        self._needConversion = [f.needConversion() for f in self]
+        self._needSerializeAnyField = any(self._needConversion)
         return self
 
     def __iter__(self):
@@ -619,7 +621,7 @@ def fromInternal(self, obj):
             # it's already converted by pickler
             return obj
         if self._needSerializeAnyField:
-            values = [f.fromInternal(v) for f, v in zip(self.fields, obj)]
+            values = [f.fromInternal(v) if n else v for f, v, n in zip(self.fields, obj, self._needConversion)]
         else:
             values = obj
         return _create_row(self.names, values)

From e4d7f76f22007e7e77982da507b97b314ebd4b41 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maciej=20Bry=C5=84ski?=
Date: Fri, 15 Sep 2017 20:16:57 +0200
Subject: [PATCH 2/6] PEP8 fix

---
 python/pyspark/sql/types.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 3ccb3c0fe43f..fa07af16d5b7 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -621,7 +621,8 @@ def fromInternal(self, obj):
             # it's already converted by pickler
             return obj
         if self._needSerializeAnyField:
-            values = [f.fromInternal(v) if n else v for f, v, n in zip(self.fields, obj, self._needConversion)]
+            values = [f.fromInternal(v) if n else v 
+                      for f, v, n in zip(self.fields, obj, self._needConversion)]
         else:
             values = obj
         return _create_row(self.names, values)

From 64afb16ead8126ff59a35288e0c43dc31e6db23c Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maciej=20Bry=C5=84ski?=
Date: Fri, 15 Sep 2017 20:28:13 +0200
Subject: [PATCH 3/6] PEP8 fix

---
 python/pyspark/sql/types.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index fa07af16d5b7..c030a35dedb3 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -621,7 +621,7 @@ def fromInternal(self, obj):
             # it's already converted by pickler
             return obj
         if self._needSerializeAnyField:
-            values = [f.fromInternal(v) if n else v 
+            values = [f.fromInternal(v) if n else v
                       for f, v, n in zip(self.fields, obj, self._needConversion)]
         else:
             values = obj
         return _create_row(self.names, values)

From b1800acce09c43bdea96932f3f50e862d53677a0 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maciej=20Bry=C5=84ski?=
Date: Sun, 17 Sep 2017 11:37:57 +0200
Subject: [PATCH 4/6] Add optimization to 'toInternal' function

---
 python/pyspark/sql/types.py | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index c030a35dedb3..5ea57d81e932 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -593,12 +593,15 @@ def toInternal(self, obj):
 
         if self._needSerializeAnyField:
             if isinstance(obj, dict):
-                return tuple(f.toInternal(obj.get(n)) for n, f in zip(self.names, self.fields))
+                return tuple(f.toInternal(obj.get(n)) if c else obj.get(n)
+                             for n, f, c in zip(self.names, self.fields, self._needConversion))
             elif isinstance(obj, (tuple, list)):
-                return tuple(f.toInternal(v) for f, v in zip(self.fields, obj))
+                return tuple(f.toInternal(v) if c else v 
+                             for f, v, c in zip(self.fields, obj, self._needConversion))
             elif hasattr(obj, "__dict__"):
                 d = obj.__dict__
-                return tuple(f.toInternal(d.get(n)) for n, f in zip(self.names, self.fields))
+                return tuple(f.toInternal(d.get(n)) if c else d.get(n)
+                             for n, f, c in zip(self.names, self.fields, self._needConversion))
             else:
                 raise ValueError("Unexpected tuple %r with StructType" % obj)
         else:
@@ -621,8 +624,8 @@ def fromInternal(self, obj):
             # it's already converted by pickler
             return obj
         if self._needSerializeAnyField:
-            values = [f.fromInternal(v) if n else v
-                      for f, v, n in zip(self.fields, obj, self._needConversion)]
+            values = [f.fromInternal(v) if c else v
+                      for f, v, c in zip(self.fields, obj, self._needConversion)]
         else:
             values = obj
         return _create_row(self.names, values)

From 8708a9db9ccbdf2d54ac4c68a562ecaa7cec88de Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maciej=20Bry=C5=84ski?=
Date: Sun, 17 Sep 2017 11:44:39 +0200
Subject: [PATCH 5/6] PEP8 fix

---
 python/pyspark/sql/types.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 5ea57d81e932..25b7856c3e65 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -596,7 +596,7 @@ def toInternal(self, obj):
                 return tuple(f.toInternal(obj.get(n)) if c else obj.get(n)
                              for n, f, c in zip(self.names, self.fields, self._needConversion))
             elif isinstance(obj, (tuple, list)):
-                return tuple(f.toInternal(v) if c else v 
+                return tuple(f.toInternal(v) if c else v
                              for f, v, c in zip(self.fields, obj, self._needConversion))
             elif hasattr(obj, "__dict__"):
                 d = obj.__dict__

From e9b779863dd84bda7a75af2f3fe3cbde892092a1 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Maciej=20Bry=C5=84ski?=
Date: Sun, 17 Sep 2017 12:02:41 +0200
Subject: [PATCH 6/6] Comments

---
 python/pyspark/sql/types.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 25b7856c3e65..aaf520fa8019 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -483,6 +483,7 @@ def __init__(self, fields=None):
             self.names = [f.name for f in fields]
             assert all(isinstance(f, StructField) for f in fields),\
                 "fields should be a list of StructField"
+            # Precalculated list of fields that need conversion with fromInternal/toInternal functions
             self._needConversion = [f.needConversion() for f in self]
             self._needSerializeAnyField = any(self._needConversion)
 
@@ -529,6 +530,7 @@ def add(self, field, data_type=None, nullable=True, metadata=None):
             data_type_f = data_type
         self.fields.append(StructField(field, data_type_f, nullable, metadata))
         self.names.append(field)
+        # Precalculated list of fields that need conversion with fromInternal/toInternal functions
        self._needConversion = [f.needConversion() for f in self]
         self._needSerializeAnyField = any(self._needConversion)
         return self
@@ -592,6 +594,7 @@ def toInternal(self, obj):
             return
 
         if self._needSerializeAnyField:
+            # Only calling toInternal function for fields that need conversion
             if isinstance(obj, dict):
                 return tuple(f.toInternal(obj.get(n)) if c else obj.get(n)
                              for n, f, c in zip(self.names, self.fields, self._needConversion))
@@ -624,6 +627,7 @@ def fromInternal(self, obj):
             # it's already converted by pickler
             return obj
         if self._needSerializeAnyField:
+            # Only calling fromInternal function for fields that need conversion
             values = [f.fromInternal(v) if c else v
                       for f, v, c in zip(self.fields, obj, self._needConversion)]
         else:
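
A quick end-to-end illustration of what this series buys (a sketch, assuming a
PySpark build with these patches applied; the schema, field names, and timing
harness below are made up for the example, not taken from the patches):

    import timeit
    from pyspark.sql.types import LongType, StructField, StructType, TimestampType

    # One field needs conversion (TimestampType); the three LongType fields do
    # not, so needConversion() is False for them while _needSerializeAnyField
    # is still True for the struct as a whole.
    schema = StructType([
        StructField("event_time", TimestampType()),
        StructField("a", LongType()),
        StructField("b", LongType()),
        StructField("c", LongType()),
    ])

    # Internal representation: TimestampType stores microseconds since the epoch.
    internal_row = (1505487700000000, 1, 2, 3)

    # Before this series, fromInternal called f.fromInternal(v) on every field
    # as soon as any field needed conversion; with the precomputed
    # _needConversion flags, the three LongType values pass through untouched.
    print(timeit.timeit(lambda: schema.fromInternal(internal_row), number=100000))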