Skip to content

Commit d5c0361

Browse files
Davies Liudavies
authored andcommitted
[SPARK-10542] [PYSPARK] fix serialize namedtuple
Author: Davies Liu <[email protected]> Closes #8707 from davies/fix_namedtuple.
1 parent 5db51f9 commit d5c0361

File tree

3 files changed

+20
-1
lines changed

3 files changed

+20
-1
lines changed

python/pyspark/cloudpickle.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,11 @@ def save_global(self, obj, name=None, pack=struct.pack):
350350
if new_override:
351351
d['__new__'] = obj.__new__
352352

353+
# workaround for namedtuple (hijacked by PySpark)
354+
if getattr(obj, '_is_namedtuple_', False):
355+
self.save_reduce(_load_namedtuple, (obj.__name__, obj._fields))
356+
return
357+
353358
self.save(_load_class)
354359
self.save_reduce(typ, (obj.__name__, obj.__bases__, {"__doc__": obj.__doc__}), obj=obj)
355360
d.pop('__doc__', None)
@@ -382,7 +387,7 @@ def save_instancemethod(self, obj):
382387
self.save_reduce(types.MethodType, (obj.__func__, obj.__self__), obj=obj)
383388
else:
384389
self.save_reduce(types.MethodType, (obj.__func__, obj.__self__, obj.__self__.__class__),
385-
obj=obj)
390+
obj=obj)
386391
dispatch[types.MethodType] = save_instancemethod
387392

388393
def save_inst(self, obj):
@@ -744,6 +749,14 @@ def _load_class(cls, d):
744749
return cls
745750

746751

752+
def _load_namedtuple(name, fields):
753+
"""
754+
Loads a class generated by namedtuple
755+
"""
756+
from collections import namedtuple
757+
return namedtuple(name, fields)
758+
759+
747760
"""Constructors for 3rd party libraries
748761
Note: These can never be renamed due to client compatibility issues"""
749762

python/pyspark/serializers.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -359,6 +359,7 @@ def _hack_namedtuple(cls):
359359
def __reduce__(self):
360360
return (_restore, (name, fields, tuple(self)))
361361
cls.__reduce__ = __reduce__
362+
cls._is_namedtuple_ = True
362363
return cls
363364

364365

python/pyspark/tests.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,11 @@ def test_namedtuple(self):
218218
p2 = loads(dumps(p1, 2))
219219
self.assertEqual(p1, p2)
220220

221+
from pyspark.cloudpickle import dumps
222+
P2 = loads(dumps(P))
223+
p3 = P2(1, 3)
224+
self.assertEqual(p1, p3)
225+
221226
def test_itemgetter(self):
222227
from operator import itemgetter
223228
ser = CloudPickleSerializer()

0 commit comments

Comments
 (0)