Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions python/pyspark/mllib/clustering.py
Original file line number Diff line number Diff line change
Expand Up @@ -618,8 +618,7 @@ def assignments(self):
"""
Returns the cluster assignments of this model.
"""
return self.call("getAssignments").map(
lambda x: (PowerIterationClustering.Assignment(*x)))
return self.call("getAssignments").map(lambda x: Assignment(*x))

@classmethod
@since('1.5.0')
Expand All @@ -633,6 +632,12 @@ def load(cls, sc, path):
return PowerIterationClusteringModel(wrapper)


#: Represents an (id, cluster) tuple.
#:
#: .. versionadded:: 1.5.0
Assignment = namedtuple("Assignment", ["id", "cluster"])


class PowerIterationClustering(object):
"""
Power Iteration Clustering (PIC), a scalable graph clustering algorithm
Expand Down Expand Up @@ -671,12 +676,8 @@ def train(cls, rdd, k, maxIterations=100, initMode="random"):
rdd.map(_convert_to_vector), int(k), int(maxIterations), initMode)
return PowerIterationClusteringModel(model)

class Assignment(namedtuple("Assignment", ["id", "cluster"])):
"""
Represents an (id, cluster) tuple.

.. versionadded:: 1.5.0
"""
# Backward-compatible alias.
Assignment = Assignment


class StreamingKMeansModel(KMeansModel):
Expand Down
32 changes: 18 additions & 14 deletions python/pyspark/mllib/fpm.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def freqItemsets(self):
"""
Returns the frequent itemsets of this model.
"""
return self.call("getFreqItemsets").map(lambda x: (FPGrowth.FreqItemset(x[0], x[1])))
return self.call("getFreqItemsets").map(lambda x: (FreqItemset(x[0], x[1])))

@classmethod
@since("2.0.0")
Expand All @@ -68,6 +68,12 @@ def load(cls, sc, path):
return FPGrowthModel(wrapper)


#: Represents an (items, freq) tuple.
#:
#: .. versionadded:: 1.4.0
FreqItemset = namedtuple("FreqItemset", ["items", "freq"])


class FPGrowth(object):
"""
A Parallel FP-growth algorithm to mine frequent itemsets.
Expand All @@ -94,12 +100,8 @@ def train(cls, data, minSupport=0.3, numPartitions=-1):
model = callMLlibFunc("trainFPGrowthModel", data, float(minSupport), int(numPartitions))
return FPGrowthModel(model)

class FreqItemset(namedtuple("FreqItemset", ["items", "freq"])):
"""
Represents an (items, freq) tuple.

.. versionadded:: 1.4.0
"""
# Backward-compatible alias.
FreqItemset = FreqItemset
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this line should be removed either directly, or following a deprecation cycle.



@inherit_doc
Expand All @@ -124,7 +126,13 @@ class PrefixSpanModel(JavaModelWrapper):
@since("1.6.0")
def freqSequences(self):
"""Gets frequent sequences"""
return self.call("getFreqSequences").map(lambda x: PrefixSpan.FreqSequence(x[0], x[1]))
return self.call("getFreqSequences").map(lambda x: FreqSequence(x[0], x[1]))


#: Represents a (sequence, freq) tuple.
#:
#: .. versionadded:: 1.6.0
FreqSequence = namedtuple("FreqSequence", ["sequence", "freq"])


class PrefixSpan(object):
Expand Down Expand Up @@ -167,12 +175,8 @@ def train(cls, data, minSupport=0.1, maxPatternLength=10, maxLocalProjDBSize=320
data, minSupport, maxPatternLength, maxLocalProjDBSize)
return PrefixSpanModel(model)

class FreqSequence(namedtuple("FreqSequence", ["sequence", "freq"])):
"""
Represents a (sequence, freq) tuple.

.. versionadded:: 1.6.0
"""
# Backward-compatible alias.
FreqSequence = FreqSequence


def _test():
Expand Down
86 changes: 0 additions & 86 deletions python/pyspark/serializers.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,6 @@
from itertools import chain, product
import marshal
import struct
import types
import collections
import zlib
import itertools

Expand Down Expand Up @@ -474,90 +472,6 @@ def dumps(self, obj):
return obj


# Hack namedtuple, make it picklable

__cls = {}


def _restore(name, fields, value):
""" Restore an object of namedtuple"""
k = (name, fields)
cls = __cls.get(k)
if cls is None:
cls = collections.namedtuple(name, fields)
__cls[k] = cls
return cls(*value)


def _hack_namedtuple(cls):
""" Make class generated by namedtuple picklable """
name = cls.__name__
fields = cls._fields

def __reduce__(self):
return (_restore, (name, fields, tuple(self)))
cls.__reduce__ = __reduce__
cls._is_namedtuple_ = True
return cls


def _hijack_namedtuple():
""" Hack namedtuple() to make it picklable """
# hijack only one time
if hasattr(collections.namedtuple, "__hijack"):
return

global _old_namedtuple # or it will put in closure
global _old_namedtuple_kwdefaults # or it will put in closure too

def _copy_func(f):
return types.FunctionType(f.__code__, f.__globals__, f.__name__,
f.__defaults__, f.__closure__)

def _kwdefaults(f):
# __kwdefaults__ contains the default values of keyword-only arguments which are
# introduced from Python 3. The possible cases for __kwdefaults__ in namedtuple
# are as below:
#
# - Does not exist in Python 2.
# - Returns None in <= Python 3.5.x.
# - Returns a dictionary containing the default values to the keys from Python 3.6.x
# (See https://bugs.python.org/issue25628).
kargs = getattr(f, "__kwdefaults__", None)
if kargs is None:
return {}
else:
return kargs

_old_namedtuple = _copy_func(collections.namedtuple)
_old_namedtuple_kwdefaults = _kwdefaults(collections.namedtuple)

def namedtuple(*args, **kwargs):
for k, v in _old_namedtuple_kwdefaults.items():
kwargs[k] = kwargs.get(k, v)
cls = _old_namedtuple(*args, **kwargs)
return _hack_namedtuple(cls)

# replace namedtuple with the new one
collections.namedtuple.__globals__["_old_namedtuple_kwdefaults"] = _old_namedtuple_kwdefaults
collections.namedtuple.__globals__["_old_namedtuple"] = _old_namedtuple
collections.namedtuple.__globals__["_hack_namedtuple"] = _hack_namedtuple
collections.namedtuple.__code__ = namedtuple.__code__
collections.namedtuple.__hijack = 1

# hack the cls already generated by namedtuple.
# Those created in other modules can be pickled as normal,
# so only hack those in __main__ module
for n, o in sys.modules["__main__"].__dict__.items():
if (type(o) is type and o.__base__ is tuple
and hasattr(o, "_fields")
and "__reduce__" not in o.__dict__):
_hack_namedtuple(o) # hack inplace


_hijack_namedtuple()


class PickleSerializer(FramedSerializer):

"""
Expand Down
8 changes: 0 additions & 8 deletions python/pyspark/sql/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -1170,14 +1170,6 @@ def test_infer_nested_schema(self):
df = self.spark.createDataFrame(nestedRdd2)
self.assertEqual(Row(f1=[[1, 2], [2, 3]], f2=[1, 2]), df.collect()[0])

from collections import namedtuple
CustomRow = namedtuple('CustomRow', 'field1 field2')
rdd = self.sc.parallelize([CustomRow(field1=1, field2="row1"),
CustomRow(field1=2, field2="row2"),
CustomRow(field1=3, field2="row3")])
df = self.spark.createDataFrame(rdd)
self.assertEqual(Row(field1=1, field2=u'row1'), df.first())

def test_create_dataframe_from_dict_respects_schema(self):
df = self.spark.createDataFrame([{'a': 1}], ["b"])
self.assertEqual(df.columns, ['b'])
Expand Down
14 changes: 1 addition & 13 deletions python/pyspark/tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -236,19 +236,6 @@ def test_external_sort_in_rdd(self):

class SerializationTestCase(unittest.TestCase):

def test_namedtuple(self):
from collections import namedtuple
from pickle import dumps, loads
P = namedtuple("P", "x y")
p1 = P(1, 3)
p2 = loads(dumps(p1, 2))
self.assertEqual(p1, p2)

from pyspark.cloudpickle import dumps
P2 = loads(dumps(P))
p3 = P2(1, 3)
self.assertEqual(p1, p3)

def test_itemgetter(self):
from operator import itemgetter
ser = CloudPickleSerializer()
Expand Down Expand Up @@ -902,6 +889,7 @@ def test_itemgetter(self):

def test_namedtuple_in_rdd(self):
from collections import namedtuple
global Person
Person = namedtuple("Person", "id firstName lastName")
jon = Person(1, "Jon", "Doe")
jane = Person(2, "Jane", "Doe")
Expand Down