Skip to content

Commit aabd7fa

Browse files
committed
fix pickle itemgetter with cloudpickle
1 parent a7d145e commit aabd7fa

File tree

2 files changed

+9
-2
lines changed

2 files changed

+9
-2
lines changed

python/pyspark/cloudpickle.py

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -560,8 +560,9 @@ class ItemGetterType(ctypes.Structure):
560560
]
561561

562562

563-
itemgetter_obj = ctypes.cast(ctypes.c_void_p(id(obj)), ctypes.POINTER(ItemGetterType)).contents
564-
return self.save_reduce(operator.itemgetter, (itemgetter_obj.item,))
563+
obj = ctypes.cast(ctypes.c_void_p(id(obj)), ctypes.POINTER(ItemGetterType)).contents
564+
return self.save_reduce(operator.itemgetter,
565+
obj.item if obj.nitems > 1 else (obj.item,))
565566

566567
if PyObject_HEAD:
567568
dispatch[operator.itemgetter] = save_itemgetter

python/pyspark/tests.py

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -278,6 +278,12 @@ def combOp(x, y):
278278
self.assertEqual(set([2]), sets[3])
279279
self.assertEqual(set([1, 3]), sets[5])
280280

281+
def test_itemgetter(self):
282+
rdd = self.sc.parallelize([range(10)])
283+
from operator import itemgetter
284+
self.assertEqual([1], rdd.map(itemgetter(1)).collect())
285+
self.assertEqual([(2, 3)], rdd.map(itemgetter(2, 3)).collect())
286+
281287

282288
class TestIO(PySparkTestCase):
283289

0 commit comments

Comments
 (0)