
Commit dfde31d

yanboliang authored and mengxr committed

[SPARK-5962] [MLLIB] Python support for Power Iteration Clustering

Python support for Power Iteration Clustering
https://issues.apache.org/jira/browse/SPARK-5962

Author: Yanbo Liang <[email protected]>

Closes apache#6992 from yanboliang/pyspark-pic and squashes the following commits:

6b03d82 [Yanbo Liang] address comments
4be4423 [Yanbo Liang] Python support for Power Iteration Clustering

1 parent 25f574e commit dfde31d

File tree: 3 files changed, +154 -3 lines changed
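
For orientation before the per-file diffs: a minimal usage sketch of the Python API this commit introduces, mirroring the doctest in the patch. It assumes a running SparkContext named sc; the data values are illustrative.

    from pyspark.mllib.clustering import PowerIterationClustering

    # Affinity matrix as (i, j, s_ij) tuples; similarities must be nonnegative.
    similarities = sc.parallelize([
        (0, 1, 1.0), (0, 2, 1.0), (1, 3, 1.0), (2, 3, 1.0),
        (0, 3, 1.0), (1, 2, 1.0), (0, 4, 0.1)])

    model = PowerIterationClustering.train(similarities, k=2, maxIterations=100)
    for a in model.assignments().collect():
        print(a.id, a.cluster)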
mllib/src/main/scala/org/apache/spark/mllib/api/python/PowerIterationClusteringModelWrapper.scala

Lines changed: 32 additions & 0 deletions

@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.api.python
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.clustering.PowerIterationClusteringModel
+
+/**
+ * A Wrapper of PowerIterationClusteringModel to provide helper method for Python
+ */
+private[python] class PowerIterationClusteringModelWrapper(model: PowerIterationClusteringModel)
+  extends PowerIterationClusteringModel(model.k, model.assignments) {
+
+  def getAssignments: RDD[Array[Any]] = {
+    model.assignments.map(x => Array(x.id, x.cluster))
+  }
+}
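
A note on the wrapper above: RDDs of Scala case classes such as Assignment do not cross the Py4J boundary cleanly, so getAssignments flattens each Assignment into a plain two-element array that the Python side can rebuild. A pure-Python illustration of that reconstruction step, using stand-in rows rather than the real serializer:

    from collections import namedtuple

    Assignment = namedtuple("Assignment", ["id", "cluster"])

    # Stand-in for the [id, cluster] rows getAssignments hands back.
    rows = [[0, 1], [1, 0], [2, 1]]
    assignments = [Assignment(*row) for row in rows]
    assert assignments[0].cluster == 1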

mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala

Lines changed: 27 additions & 0 deletions

@@ -406,6 +406,33 @@ private[python] class PythonMLLibAPI extends Serializable {
     model.predictSoft(data).map(Vectors.dense)
   }
 
+  /**
+   * Java stub for Python mllib PowerIterationClustering.run(). This stub returns a
+   * handle to the Java object instead of the content of the Java object. Extra care
+   * needs to be taken in the Python code to ensure it gets freed on exit; see the
+   * Py4J documentation.
+   * @param data an RDD of (i, j, s,,ij,,) tuples representing the affinity matrix.
+   * @param k number of clusters.
+   * @param maxIterations maximum number of iterations of the power iteration loop.
+   * @param initMode the initialization mode. This can be either "random" to use
+   *                 a random vector as vertex properties, or "degree" to use
+   *                 normalized sum similarities. Default: random.
+   */
+  def trainPowerIterationClusteringModel(
+      data: JavaRDD[Vector],
+      k: Int,
+      maxIterations: Int,
+      initMode: String): PowerIterationClusteringModel = {
+
+    val pic = new PowerIterationClustering()
+      .setK(k)
+      .setMaxIterations(maxIterations)
+      .setInitializationMode(initMode)
+
+    val model = pic.run(data.rdd.map(v => (v(0).toLong, v(1).toLong, v(2))))
+    new PowerIterationClusteringModelWrapper(model)
+  }
+
   /**
    * Java stub for Python mllib ALS.train(). This stub returns a handle
    * to the Java object instead of the content of the Java object. Extra care
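
The stub's wire format is worth spelling out: the Python caller ships each (i, j, s,,ij,,) tuple as a length-3 dense vector (a type the MLlib Python/JVM serializer already handles), and the Scala side unpacks it as (v(0).toLong, v(1).toLong, v(2)). A plain-Python sketch of that contract, with a simple list standing in for the real vector type:

    # Pack: each edge becomes a 3-element vector of floats.
    edge = (0, 1, 0.9)
    v = [float(x) for x in edge]

    # Unpack, mirroring (v(0).toLong, v(1).toLong, v(2)) on the Scala side.
    i, j, s_ij = int(v[0]), int(v[1]), v[2]
    assert (i, j, s_ij) == (0, 1, 0.9)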

python/pyspark/mllib/clustering.py

Lines changed: 95 additions & 3 deletions

@@ -25,15 +25,18 @@
 
 from numpy import array, random, tile
 
+from collections import namedtuple
+
 from pyspark import SparkContext
 from pyspark.rdd import RDD, ignore_unicode_prefix
-from pyspark.mllib.common import callMLlibFunc, callJavaFunc, _py2java, _java2py
+from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, callJavaFunc, _py2java, _java2py
 from pyspark.mllib.linalg import SparseVector, _convert_to_vector, DenseVector
 from pyspark.mllib.stat.distribution import MultivariateGaussian
-from pyspark.mllib.util import Saveable, Loader, inherit_doc
+from pyspark.mllib.util import Saveable, Loader, inherit_doc, JavaLoader, JavaSaveable
 from pyspark.streaming import DStream
 
 __all__ = ['KMeansModel', 'KMeans', 'GaussianMixtureModel', 'GaussianMixture',
+           'PowerIterationClusteringModel', 'PowerIterationClustering',
            'StreamingKMeans', 'StreamingKMeansModel']
 
 
@@ -272,6 +275,94 @@ def train(cls, rdd, k, convergenceTol=1e-3, maxIterations=100, seed=None, initia
         return GaussianMixtureModel(weight, mvg_obj)
 
 
+class PowerIterationClusteringModel(JavaModelWrapper, JavaSaveable, JavaLoader):
+
+    """
+    .. note:: Experimental
+
+    Model produced by [[PowerIterationClustering]].
+
+    >>> data = [(0, 1, 1.0), (0, 2, 1.0), (1, 3, 1.0), (2, 3, 1.0),
+    ...     (0, 3, 1.0), (1, 2, 1.0), (0, 4, 0.1)]
+    >>> rdd = sc.parallelize(data, 2)
+    >>> model = PowerIterationClustering.train(rdd, 2, 100)
+    >>> model.k
+    2
+    >>> sorted(model.assignments().collect())
+    [Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ...
+    >>> import os, tempfile
+    >>> path = tempfile.mkdtemp()
+    >>> model.save(sc, path)
+    >>> sameModel = PowerIterationClusteringModel.load(sc, path)
+    >>> sameModel.k
+    2
+    >>> sorted(sameModel.assignments().collect())
+    [Assignment(id=0, cluster=1), Assignment(id=1, cluster=0), ...
+    >>> from shutil import rmtree
+    >>> try:
+    ...     rmtree(path)
+    ... except OSError:
+    ...     pass
+    """
+
+    @property
+    def k(self):
+        """
+        Returns the number of clusters.
+        """
+        return self.call("k")
+
+    def assignments(self):
+        """
+        Returns the cluster assignments of this model.
+        """
+        return self.call("getAssignments").map(
+            lambda x: (PowerIterationClustering.Assignment(*x)))
+
+    @classmethod
+    def load(cls, sc, path):
+        model = cls._load_java(sc, path)
+        wrapper = sc._jvm.PowerIterationClusteringModelWrapper(model)
+        return PowerIterationClusteringModel(wrapper)
+
+
+class PowerIterationClustering(object):
+    """
+    .. note:: Experimental
+
+    Power Iteration Clustering (PIC), a scalable graph clustering algorithm
+    developed by [[http://www.icml2010.org/papers/387.pdf Lin and Cohen]].
+    From the abstract: PIC finds a very low-dimensional embedding of a
+    dataset using truncated power iteration on a normalized pair-wise
+    similarity matrix of the data.
+    """
+
+    @classmethod
+    def train(cls, rdd, k, maxIterations=100, initMode="random"):
+        """
+        :param rdd: an RDD of (i, j, s,,ij,,) tuples representing the
+            affinity matrix, which is the matrix A in the PIC paper.
+            The similarity s,,ij,, must be nonnegative.
+            This is a symmetric matrix and hence s,,ij,, = s,,ji,,.
+            For any (i, j) with nonzero similarity, there should be
+            either (i, j, s,,ij,,) or (j, i, s,,ji,,) in the input.
+            Tuples with i = j are ignored, because we assume
+            s,,ij,, = 0.0.
+        :param k: Number of clusters.
+        :param maxIterations: Maximum number of iterations of the
+            PIC algorithm.
+        :param initMode: Initialization mode.
+        """
+        model = callMLlibFunc("trainPowerIterationClusteringModel",
+                              rdd.map(_convert_to_vector), int(k), int(maxIterations), initMode)
+        return PowerIterationClusteringModel(model)
+
+    class Assignment(namedtuple("Assignment", ["id", "cluster"])):
+        """
+        Represents an (id, cluster) tuple.
+        """
+
+
 class StreamingKMeansModel(KMeansModel):
     """
     .. note:: Experimental
@@ -466,7 +557,8 @@ def predictOnValues(self, dstream):
 
 def _test():
     import doctest
-    globs = globals().copy()
+    import pyspark.mllib.clustering
+    globs = pyspark.mllib.clustering.__dict__.copy()
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()

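Finally, the train() contract documented above (nonnegative s,,ij,,, symmetric affinities listed once per pair, i = j entries ignored) implies a typical preprocessing step: deriving the affinity matrix from raw points. A hedged sketch using a Gaussian (RBF) similarity; the rbf helper, sigma, and point data are illustrative and not part of this patch:

    from math import exp

    points = [(0.0, 0.0), (0.1, 0.0), (5.0, 5.0), (5.1, 5.0)]

    def rbf(p, q, sigma=1.0):
        # Gaussian similarity: always nonnegative, 1.0 for identical points.
        d2 = (p[0] - q[0]) ** 2 + (p[1] - q[1]) ** 2
        return exp(-d2 / (2.0 * sigma ** 2))

    # Emit each undirected pair once (j > i), skipping i == j as train() requires.
    edges = [(i, j, rbf(points[i], points[j]))
             for i in range(len(points)) for j in range(i + 1, len(points))]
    # rdd = sc.parallelize(edges)
    # model = PowerIterationClustering.train(rdd, k=2)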