@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.mllib.api.python

import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.clustering.PowerIterationClusteringModel

/**
* A wrapper of PowerIterationClusteringModel that provides a helper method for Python
*/
private[python] class PowerIterationClusteringModelWrapper(model: PowerIterationClusteringModel)
extends PowerIterationClusteringModel(model.k, model.assignments) {

def getAssignments: RDD[Array[Any]] = {
model.assignments.map(x => Array(x.id, x.cluster))
}
}
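The conversion to Array(id, cluster) is what lets the assignments be handed back to Python as plain pairs; on the PySpark side the helper is reached through JavaModelWrapper.call. A minimal sketch of that bridge, mirroring the assignments() method added to clustering.py later in this diff (the class name here is illustrative only):

# Sketch only: how a JavaModelWrapper-based model reaches the Scala helper above.
from collections import namedtuple

from pyspark.mllib.common import JavaModelWrapper

Assignment = namedtuple("Assignment", ["id", "cluster"])


class PICModelSketch(JavaModelWrapper):
    def assignments(self):
        # "getAssignments" dispatches to getAssignments on the Scala wrapper;
        # each element arrives as an [id, cluster] pair and is repacked here.
        return self.call("getAssignments").map(lambda x: Assignment(*x))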
@@ -406,6 +406,33 @@ private[python] class PythonMLLibAPI extends Serializable {
model.predictSoft(data).map(Vectors.dense)
}

/**
* Java stub for Python mllib PowerIterationClustering.run(). This stub returns a
* handle to the Java object instead of the content of the Java object. Extra care
* needs to be taken in the Python code to ensure it gets freed on exit; see the
* Py4J documentation.
* @param data an RDD of (i, j, s,,ij,,) tuples representing the affinity matrix.
* @param k number of clusters.
* @param maxIterations maximum number of iterations of the power iteration loop.
* @param initMode the initialization mode. This can be either "random" to use
* a random vector as vertex properties, or "degree" to use
* normalized sum similarities. Default: random.
*/
def trainPowerIterationClusteringModel(
data: JavaRDD[Vector],
Contributor
It is worth documenting the elements in each vector.

k: Int,
maxIterations: Int,
initMode: String): PowerIterationClusteringModel = {

val pic = new PowerIterationClustering()
.setK(k)
.setMaxIterations(maxIterations)
.setInitializationMode(initMode)

val model = pic.run(data.rdd.map(v => (v(0).toLong, v(1).toLong, v(2))))
new PowerIterationClusteringModelWrapper(model)
}
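Each element of data packs one affinity-matrix entry into a length-3 vector: index 0 holds the source vertex id i, index 1 the destination vertex id j, and index 2 the nonnegative similarity s,,ij,, (this is the information the review comment above asks to document). A minimal PySpark sketch of how such vectors are built before they reach this stub, assuming a local SparkContext; names are illustrative:

from pyspark import SparkContext
from pyspark.mllib.linalg import _convert_to_vector

sc = SparkContext("local[2]", "PICVectorSketch")

# (i, j, s_ij) tuples of the affinity matrix; s_ij must be nonnegative.
affinities = [(0, 1, 1.0), (0, 2, 1.0), (1, 2, 1.0)]

# Each tuple becomes DenseVector([i, j, s_ij]); the stub above reads it back
# as (v(0).toLong, v(1).toLong, v(2)).
vectors = sc.parallelize(affinities).map(_convert_to_vector)
print(vectors.first())  # DenseVector([0.0, 1.0, 1.0])

sc.stop()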

/**
* Java stub for Python mllib ALS.train(). This stub returns a handle
* to the Java object instead of the content of the Java object. Extra care
98 changes: 95 additions & 3 deletions python/pyspark/mllib/clustering.py
@@ -25,15 +25,18 @@

from numpy import array, random, tile

from collections import namedtuple

from pyspark import SparkContext
from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
from pyspark.mllib.stat.distribution import MultivariateGaussian
from pyspark.mllib.util import Saveable, Loader, inherit_doc
from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable
from pyspark.streaming import DStream

__all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture',
'PowerIterationClusteringModel', 'PowerIterationClustering',
'StreamingKMeans', 'StreamingKMeansModel']


@@ -272,6 +275,94 @@ def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initia
return GaussianMixtureModel(weight, mvg_obj)


class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader):

"""
.. note:: Experimental

Model produced by [[PowerIterationClustering]].

>>> data = [(0, 1, 1.0), (0, 2, 1.0), (1, 3, 1.0), (2, 3, 1.0),
... (0, 3, 1.0), (1, 2, 1.0), (0, 4, 0.1)]
>>> rdd = sc.parallelize(data, 2)
>>> model = PowerIterationClustering.train(rdd, 2, 100)
>>> model.k
2
>>> sorted(model.assignments().collect())
[Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ...
>>> import os, tempfile
>>> path = tempfile.mkdtemp()
>>> model.save(sc, path)
>>> sameModel = PowerIterationClusteringModel.load(sc, path)
>>> sameModel.k
2
>>> sorted(sameModel.assignments().collect())
[Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ...
>>> from shutil import rmtree
>>> try:
... rmtree(path)
... except OSError:
... pass
"""

@property
def k(self):
"""
Returns the number of clusters.
"""
return self.call("k")

def assignments(self):
"""
Returns the cluster assignments of this model.
"""
return self.call("getAssignments").map(
lambda x: (PowerIterationClustering.Assignment(*x)))

@classmethod
def load(cls, sc, path):
model = cls._load_java(sc, path)
wrapper = sc._jvm.PowerIterationClusteringModelWrapper(model)
return PowerIterationClusteringModel(wrapper)


class PowerIterationClustering(object):
"""
.. note:: Experimental

Power Iteration Clustering (PIC), a scalable graph clustering algorithm
developed by [[http://www.icml2010.org/papers/387.pdf Lin and Cohen]].
From the abstract: PIC finds a very low-dimensional embedding of a
dataset using truncated power iteration on a normalized pair-wise
similarity matrix of the data.
"""

@classmethod
def train(cls, rdd, k, maxIterations=100, initMode="random"):
"""
:param rdd: an RDD of (i, j, s,,ij,,) tuples representing the
affinity matrix, which is the matrix A in the PIC paper.
The similarity s,,ij,, must be nonnegative.
This is a symmetric matrix and hence s,,ij,, = s,,ji,,.
For any (i, j) with nonzero similarity, there should be
either (i, j, s,,ij,,) or (j, i, s,,ji,,) in the input.
Tuples with i = j are ignored, because we assume
s,,ij,, = 0.0.
:param k: Number of clusters.
:param maxIterations: Maximum number of iterations of the
PIC algorithm.
:param initMode: Initialization mode.
"""
model = callMLlibFunc("trainPowerIterationClusteringModel",
rdd.map(_convert_to_vector), int(k), int(maxIterations), initMode)
return PowerIterationClusteringModel(model)

class Assignment(namedtuple("Assignment", ["id", "cluster"])):
"""
Represents an (id, cluster) tuple.
"""


class StreamingKMeansModel(KMeansModel):
"""
.. note:: Experimental
@@ -466,7 +557,8 @@ def predictOnValues(self, dstream):

def _test():
import doctest
globs = globals().copy()
import pyspark.mllib.clustering
globs = pyspark.mllib.clustering.__dict__.copy()
Contributor
Are the changes required?

Contributor Author
Yes, the original code will raise "Py4JError: Trying to call a package" at https://github.com/apache/spark/pull/6992/files#diff-21c55b407050d37f67a2919470e047ebR324.
I took a glance over the related code and found that https://github.com/apache/spark/blob/master/python/pyspark/mllib/recommendation.py#L209 uses __dict__.copy(), so I tried the same approach here and it works well.
Then I investigated the difference between the two ways.
Running the following code:

globs1 = globals().copy()
globs2 = pyspark.mllib.clustering.__dict__.copy()

print [k for k in globs1 if k not in globs2]
print [k for k in globs2 if k not in globs1]

the output is:

['__loader__']
[]

This means that globs1 (which fails the test) has one extra element, __loader__, compared with globs2 (which passes). I have no idea what causes this; any comments would be appreciated.

Contributor
I don't remember why this is required, but thanks for the explanation and let's keep it this way.

globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()