From c867fdfdf623c2e9905a376d35987dbe2914e329 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Wed, 10 Sep 2014 01:51:44 -0700 Subject: [PATCH 01/16] add Word2Vec to pyspark --- .../mllib/api/python/PythonMLLibAPI.scala | 78 ++++++++++++++++ python/pyspark/mllib/Word2Vec.py | 88 +++++++++++++++++++ python/pyspark/mllib/_common.py | 38 ++++++++ 3 files changed, 204 insertions(+) create mode 100644 python/pyspark/mllib/Word2Vec.py diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 4343124f102a..e06040782b6b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -36,6 +36,8 @@ import org.apache.spark.mllib.tree.impurity._ import org.apache.spark.mllib.tree.model.DecisionTreeModel import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics} import org.apache.spark.mllib.stat.correlation.CorrelationNames +import org.apache.spark.mllib.feature.Word2Vec +import org.apache.spark.mllib.feature.Word2VecModel import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils @@ -288,6 +290,37 @@ class PythonMLLibAPI extends Serializable { ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha) } + /** + * Java stub for Python mllib Word2Vec fit(). + * @param dataBytesJRDD Input + */ + def trainWord2Vec( + dataBytesJRDD: JavaRDD[Array[Byte]] + ): Word2VecModel = { + val data = dataBytesJRDD.rdd.map(SerDe.deserializeSeqString) + data.collect() + val word2vec = new Word2Vec() + val model = word2vec.fit(data) + model + } + + /** + * Java stub for Python mllib Word2VecModel + */ + def Word2VecSynonynms( + model: Word2VecModel, + word: String, + num: Int + ) = { + val result = model.findSynonyms(word, num) + val vec = Vectors.dense(result.map(_._2)) + val words = result.map(_._1).toArray + val ret = new java.util.LinkedList[java.lang.Object]() + ret.add(SerDe.serializeSeqString(words)) + ret.add(SerDe.serializeDoubleVector(vec)) + ret + } + /** * Java stub for Python mllib DecisionTree.train(). * This stub returns a handle to the Java object instead of the content of the Java object. 
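Review note: the Word2VecSynonynms stub above returns a two-element java.util.LinkedList holding the serialized word list and the serialized similarity vector (also note the stray data.collect(), a debugging leftover that PATCH 02 below removes). A sketch of how the Python caller is expected to unpack the result, using the _common.py helpers added later in this same patch:

    result = pythonAPI.Word2VecSynonynms(java_model, word, num)
    words = _deserialize_string_seq(result[0])           # bytearray -> list of str
    similarity = _deserialize_double_vector(result[1])   # bytearray -> numpy array
    pairs = zip(words, similarity)                       # [(word, similarity), ...]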
@@ -659,6 +692,51 @@ private[spark] object SerDe extends Serializable { bytes } + private[python] def serializeSeqString(ss:Seq[String]): Array[Byte] = { + val seqLength = ss.length + val lengthArray = new Array[Int](seqLength) + var totalLength = 0 + for(s <- ss) { + totalLength += s.length + } + val bytes = new Array[Byte](8 + 4 * seqLength + totalLength) + val bb = ByteBuffer.wrap(bytes) + bb.order(ByteOrder.nativeOrder()) + bb.putInt(seqLength) + bb.putInt(totalLength) + for( i <- 0 until seqLength) { + bb.putInt(ss(i).length) + } + for(s <- ss) { + bb.put(s.getBytes()) + } + bytes + } + + private[python] def deserializeSeqString(bytes:Array[Byte]):Seq[String] = { + require(bytes.length >=0, "Byte array too short") + val seqLengthBytes = ByteBuffer.wrap(bytes, 0, 8) + seqLengthBytes.order(ByteOrder.nativeOrder()) + val ib = seqLengthBytes.asIntBuffer() + val seqLength = ib.get() + val totalLength = ib.get() + val lengthBytes = ByteBuffer.wrap(bytes, 8, 4 * seqLength) + lengthBytes.order(ByteOrder.nativeOrder()) + val stringBytes = ByteBuffer.wrap(bytes, 8 + 4 * seqLength, totalLength) + stringBytes.order(ByteOrder.nativeOrder()) + val ss = new Array[String](seqLength) + val lengthBuffer = lengthBytes.asIntBuffer() + var index = 0 + while(lengthBuffer.hasRemaining()){ + val curLen = lengthBuffer.get() + val content = new Array[Byte](curLen) + stringBytes.get(content, 0, curLen) + ss(index) = new String(content) + index += 1 + } + ss.toSeq + } + private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = { val fb = serializeDoubleVector(p.features) val bytes = new Array[Byte](1 + 8 + fb.length) diff --git a/python/pyspark/mllib/Word2Vec.py b/python/pyspark/mllib/Word2Vec.py new file mode 100644 index 000000000000..02553b3ec88c --- /dev/null +++ b/python/pyspark/mllib/Word2Vec.py @@ -0,0 +1,88 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" +Python package for Word2Vec in MLlib. 
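Review note: the wire format that serializeSeqString and deserializeSeqString agree on is [int32 count][int32 total payload][int32 length per string][raw bytes], all in native byte order. A minimal pure-Python sketch of the same framing (assuming little-endian as the native order; encode_string_seq is a hypothetical name):

    import struct

    def encode_string_seq(strings):
        payload = [s.encode("utf-8") for s in strings]
        lengths = [len(b) for b in payload]
        return (struct.pack("<ii", len(payload), sum(lengths))
                + struct.pack("<%di" % len(lengths), *lengths)
                + "".join(payload))

One caveat: the Scala side sizes buffers with s.length (UTF-16 code units) but writes s.getBytes() (default-charset bytes), so the two layouts only agree for ASCII input; PATCH 06 later replaces this custom SerDe with pickle.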
+""" + +from pyspark.mllib._common import \ + _get_unmangled_double_vector_rdd, _get_unmangled_rdd, \ + _serialize_double, _deserialize_double_matrix, _deserialize_double_vector, \ + _deserialize_string_seq, \ + _get_unmangled_string_seq_rdd + +__all__ = ['Word2Vec', 'Word2VecModel'] + +class Word2VecModel(object): + + def __init__(self, sc, java_model): + """ + :param sc: Spark context + :param java_model: Handle to Java model object + """ + self._sc = sc + self._java_model = java_model + + def __del__(self): + self._sc._gateway.detach(self._java_model) + + #def transform(self, word): + + #def findSynonyms(self, vector, num): + + def findSynonyms(self, word, num): + pythonAPI = self._sc._jvm.PythonMLLibAPI() + result = pythonAPI.Word2VecSynonynms(self._java_model, word, num) + similarity = _deserialize_double_vector(result[1]) + words = _deserialize_string_seq(result[0]) + ret = [] + for w,s in zip(words, similarity): + ret.append((w,s)) + return ret + +class Word2Vec(object): + """ + data:RDD[Array[String]] + """ + def __init__(self): + self.vectorSize = 100 + self.startingAlpha = 0.025 + self.numPartitions = 1 + self.numIterations = 1 + + def setVectorSize(self, vectorSize): + self.vectorSize = vectorSize + return self + + def setLearningRate(self, learningRate): + self.startingAlpha = learningRate + return self + + def setNumPartitions(self, numPartitions): + self.numPartitions = numPartitions + return self + + def setNumIterations(self, numIterations): + self.numIterations = numIterations + return self + + def fit(self, data): + sc = data.context + dataBytes = _get_unmangled_string_seq_rdd(data) + model = sc._jvm.PythonMLLibAPI().trainWord2Vec(dataBytes._jrdd) + return Word2VecModel(sc, model) + diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index bb60d3d0c846..b6b69d4522b7 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -143,6 +143,29 @@ def _serialize_double_vector(v): raise TypeError("_serialize_double_vector called on a %s; " "wanted ndarray or SparseVector" % type(v)) +def _serialize_string_seq(ss): + """Serialize a sequence of string""" + seqLength = len(ss) + totalLength = 0 + lengthArray = ndarray(shape=[seqLength], dtype=int32) + i = 0 + for s in ss: + length = len(s) + totalLength = totalLength + length + lengthArray[i] = length + i = i + 1 + ba = bytearray(4 + 4 + 4 * seqLength + totalLength) + header_bytes = ndarray(shape=[2], buffer=ba, offset=0, dtype=int32) + header_bytes[0] = seqLength + header_bytes[1] = totalLength + _copyto(lengthArray, buffer=ba, offset=8, shape=[seqLength],dtype=int32) + i = 0 + offset = 4 + 4 + 4 * seqLength + for s in ss: + ba[offset:offset + lengthArray[i]] = bytes(s) + offset = offset + lengthArray[i] + i = i + 1 + return ba def _serialize_dense_vector(v): """Serialize a dense vector given as a NumPy array.""" @@ -203,6 +226,19 @@ def _deserialize_double(ba, offset=0): return _unpack("d", ba[offset:])[0] +def _deserialize_string_seq(ba, offset=0): + nb = len(ba) - offset + headers = ndarray(shape=[2], buffer=ba, offset=offset, dtype=int32) + seqLength = headers[0] + totalLength = headers[1] + lengthArray = ndarray(shape=[seqLength], buffer=ba, offset=offset + 8, dtype=int32) + offset = offset + 8 + 4 * seqLength + ret = [] + for i in range(0, seqLength): + ret.append(str(ba[offset: offset + lengthArray[i]])) + offset = offset + lengthArray[i] + return ret + def _deserialize_double_vector(ba, offset=0): """Deserialize a double vector from a mutually understood format. 
@@ -363,6 +399,8 @@ def _get_unmangled_rdd(data, serializer, cache=True): dataBytes.cache() return dataBytes +def _get_unmangled_string_seq_rdd(data, cache=True): + return _get_unmangled_rdd(data, _serialize_string_seq, cache) def _get_unmangled_double_vector_rdd(data, cache=True): """ From 0ad3ac1efed6258607a79c0d45345d70a17dee47 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Wed, 10 Sep 2014 03:02:56 -0700 Subject: [PATCH 02/16] minor fix --- .../scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index e06040782b6b..e156c67890db 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -298,7 +298,6 @@ class PythonMLLibAPI extends Serializable { dataBytesJRDD: JavaRDD[Array[Byte]] ): Word2VecModel = { val data = dataBytesJRDD.rdd.map(SerDe.deserializeSeqString) - data.collect() val word2vec = new Word2Vec() val model = word2vec.fit(data) model From 48d5e721a58924f33ebef31b9e67454f45480d5c Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 11 Sep 2014 02:50:30 -0700 Subject: [PATCH 03/16] Functionality improvement --- .../mllib/api/python/PythonMLLibAPI.scala | 63 ++++++++++++++++--- .../api/python/PythonMLLibAPISuite.scala | 7 +++ python/pyspark/mllib/Word2Vec.py | 60 +++++++++++++----- python/pyspark/mllib/_common.py | 46 +++++++++----- 4 files changed, 139 insertions(+), 37 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index e156c67890db..7a75c6f62b9c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -291,8 +291,12 @@ class PythonMLLibAPI extends Serializable { } /** - * Java stub for Python mllib Word2Vec fit(). - * @param dataBytesJRDD Input + * Java stub for Python mllib Word2Vec fit(). This stub returns a + * handle to the Java object instead of the content of the Java object. + * Extra care needs to be taken in the Python code to ensure it gets freed on + * exit; see the Py4J documentation. 
+ * @param dataBytesJRDD Input JavaRDD + * @return A handle to java Word2VecModel instance at python side */ def trainWord2Vec( dataBytesJRDD: JavaRDD[Array[Byte]] @@ -304,19 +308,60 @@ class PythonMLLibAPI extends Serializable { } /** - * Java stub for Python mllib Word2VecModel + * Java stub for Python mllib Word2VecModel transform + * @param model Word2VecModel instance + * @param word a word + * @return serialized vector representation of word */ - def Word2VecSynonynms( + def Word2VecModelTransform( + model: Word2VecModel, + word: String + ): Array[Byte] = { + SerDe.serializeDoubleVector(model.transform(word)) + } + + /** + * Java stub for Python mllib Word2VecModel findSynonyms + * @param model Word2VecModel instance + * @param word a word + * @param num number of synonyms to find + * @return a java LinkedList containing serialized version of + * synonyms and similarities + */ + def Word2VecModelSynonyms( model: Word2VecModel, word: String, num: Int - ) = { + ): java.util.List[java.lang.Object] = { val result = model.findSynonyms(word, num) - val vec = Vectors.dense(result.map(_._2)) - val words = result.map(_._1).toArray + val similarity = Vectors.dense(result.map(_._2)) + val words = result.map(_._1) + val ret = new java.util.LinkedList[java.lang.Object]() + ret.add(SerDe.serializeSeqString(words)) + ret.add(SerDe.serializeDoubleVector(similarity)) + ret + } + + /** + * Java stub for Python mllib Word2VecModel findSynonyms + * @param model Word2VecModel instance + * @param vecBytes serialization of vector representation of words + * @param num number of synonyms to find + * @return a java LinkedList containing serialized version of + * synonyms and similarities + */ + def Word2VecModelSynonyms( + model: Word2VecModel, + vecBytes: Array[Byte], + num: Int + ): java.util.List[java.lang.Object] = { + val vec = SerDe.deserializeDoubleVector(vecBytes) + val result = model.findSynonyms(vec, num) + val similarity = Vectors.dense(result.map(_._2)) + val words = result.map(_._1) val ret = new java.util.LinkedList[java.lang.Object]() ret.add(SerDe.serializeSeqString(words)) - ret.add(SerDe.serializeDoubleVector(vec)) + ret.add(SerDe.serializeDoubleVector(similarity)) ret } @@ -713,7 +758,7 @@ private[spark] object SerDe extends Serializable { } private[python] def deserializeSeqString(bytes:Array[Byte]):Seq[String] = { - require(bytes.length >=0, "Byte array too short") + require(bytes.length >= 8, "Byte array too short") val seqLengthBytes = ByteBuffer.wrap(bytes, 0, 8) seqLengthBytes.order(ByteOrder.nativeOrder()) val ib = seqLengthBytes.asIntBuffer() diff --git a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala index 092d67bbc523..6e0764ea7212 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala @@ -79,4 +79,11 @@ class PythonMLLibAPISuite extends FunSuite { val empty2D = SerDe.to2dArray(emptyMatrix) assert(empty2D === Array[Array[Double]]()) } + + test("string seq serialization") { + val original = Array[String]("abc", "def", "ghi") + val bytes = SerDe.serializeSeqString(original) + val ss = SerDe.deserializeSeqString(bytes) + assert(ss === original) + } } diff --git a/python/pyspark/mllib/Word2Vec.py b/python/pyspark/mllib/Word2Vec.py index 02553b3ec88c..bc7c29518efc 100644 --- a/python/pyspark/mllib/Word2Vec.py +++ 
b/python/pyspark/mllib/Word2Vec.py @@ -20,15 +20,17 @@ """ from pyspark.mllib._common import \ - _get_unmangled_double_vector_rdd, _get_unmangled_rdd, \ - _serialize_double, _deserialize_double_matrix, _deserialize_double_vector, \ + _serialize_double_vector, \ + _deserialize_double_vector, \ _deserialize_string_seq, \ _get_unmangled_string_seq_rdd __all__ = ['Word2Vec', 'Word2VecModel'] class Word2VecModel(object): - + """ + class for Word2Vec model + """ def __init__(self, sc, java_model): """ :param sc: Spark context @@ -40,23 +42,38 @@ def __init__(self, sc, java_model): def __del__(self): self._sc._gateway.detach(self._java_model) - #def transform(self, word): + def transform(self, word): + pythonAPI = self._sc._jvm.PythonMLLibAPI() + result = pythonAPI.Word2VecModelTransform(self._java_model, word) + return _deserialize_double_vector(result) - #def findSynonyms(self, vector, num): - - def findSynonyms(self, word, num): + def findSynonyms(self, x, num): pythonAPI = self._sc._jvm.PythonMLLibAPI() - result = pythonAPI.Word2VecSynonynms(self._java_model, word, num) - similarity = _deserialize_double_vector(result[1]) + if type(x) == str: + result = pythonAPI.Word2VecModelSynonyms(self._java_model, x, num) + else: + xSer = _serialize_double_vector(x) + result = pythonAPI.Word2VecModelSynonyms(self._java_model, xSer, num) words = _deserialize_string_seq(result[0]) - ret = [] - for w,s in zip(words, similarity): - ret.append((w,s)) - return ret + similarity = _deserialize_double_vector(result[1]) + return zip(words, similarity) class Word2Vec(object): """ - data:RDD[Array[String]] + Word2Vec creates vector representation of words in a text corpus. + The algorithm first constructs a vocabulary from the corpus + and then learns vector representation of words in the vocabulary. + The vector representation can be used as features in + natural language processing and machine learning algorithms. + + We used skip-gram model in our implementation and hierarchical softmax + method to train the model. The variable names in the implementation + matches the original C implementation. + For original C implementation, see https://code.google.com/p/word2vec/ + For research papers, see + Efficient Estimation of Word Representations in Vector Space + and + Distributed Representations of Words and Phrases and their Compositionality. 
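Review note: for reference, the objective behind the two papers cited in this docstring (standard notation from the papers, not part of the patch). Skip-gram maximizes the average log-probability of context words,

    \frac{1}{T} \sum_{t=1}^{T} \sum_{-c \le j \le c,\ j \ne 0} \log p(w_{t+j} \mid w_t)

and hierarchical softmax factors each conditional into sigmoids along the Huffman path to the output word,

    p(w \mid w_I) = \prod_{j=1}^{L(w)-1} \sigma\big( [\![ n(w,j{+}1) = \mathrm{ch}(n(w,j)) ]\!] \cdot {v'}_{n(w,j)}^{\top} v_{w_I} \big)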
""" def __init__(self): self.vectorSize = 100 @@ -81,8 +98,23 @@ def setNumIterations(self, numIterations): return self def fit(self, data): + """ + :param data: Input RDD + """ sc = data.context dataBytes = _get_unmangled_string_seq_rdd(data) model = sc._jvm.PythonMLLibAPI().trainWord2Vec(dataBytes._jrdd) return Word2VecModel(sc, model) +def _test(): + import doctest + from pyspark import SparkContext + globs = globals().copy() + globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + globs['sc'].stop() + if failure_count: + exit(-1) + +if __name__ == "__main__": + _test() diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index b6b69d4522b7..3c7782a3a2d8 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -144,7 +144,7 @@ def _serialize_double_vector(v): "wanted ndarray or SparseVector" % type(v)) def _serialize_string_seq(ss): - """Serialize a sequence of string""" + """Serialize a sequence of string.""" seqLength = len(ss) totalLength = 0 lengthArray = ndarray(shape=[seqLength], dtype=int32) @@ -200,6 +200,31 @@ def _serialize_sparse_vector(v): return ba +def _deserialize_string_seq(ba, offset=0): + """Deserialize a string sequence from a mutually understood format. + >>> import sys + >>> _derserialize_string_seq(_serialize_string_seq(['abc'])) == ['abc'] + True + """ + if type(ba) != bytearray: + raise TypeError("__deserialize_string_seq called on a %s; " + "wanted bytearray" % type(ba)) + nb = len(ba) - offset + if nb < 8: + raise TypeError("__deserialize_string_seq called on a %d-byte array, " + "which is too short" % nb) + headers = ndarray(shape=[2], buffer=ba, offset=offset, dtype=int32) + seqLength = headers[0] + totalLength = headers[1] + lengthArray = ndarray(shape=[seqLength], buffer=ba, offset=offset + 8, dtype=int32) + offset = offset + 8 + 4 * seqLength + ret = [] + for i in range(0, seqLength): + curLen = lengthArray[i] + ret.append(str(ba[offset: offset + curLen])) + offset = offset + curLen + return ret + def _deserialize_double(ba, offset=0): """Deserialize a double from a mutually understood format. @@ -226,19 +251,6 @@ def _deserialize_double(ba, offset=0): return _unpack("d", ba[offset:])[0] -def _deserialize_string_seq(ba, offset=0): - nb = len(ba) - offset - headers = ndarray(shape=[2], buffer=ba, offset=offset, dtype=int32) - seqLength = headers[0] - totalLength = headers[1] - lengthArray = ndarray(shape=[seqLength], buffer=ba, offset=offset + 8, dtype=int32) - offset = offset + 8 + 4 * seqLength - ret = [] - for i in range(0, seqLength): - ret.append(str(ba[offset: offset + lengthArray[i]])) - offset = offset + lengthArray[i] - return ret - def _deserialize_double_vector(ba, offset=0): """Deserialize a double vector from a mutually understood format. @@ -400,6 +412,12 @@ def _get_unmangled_rdd(data, serializer, cache=True): return dataBytes def _get_unmangled_string_seq_rdd(data, cache=True): + """ + Map a pickled Python RDD of Python string sequence to a Java RDD of + Array[Byte] + :param cache: If True, the serialized RDD is cached. (default = True) + WARNING: Users should unpersist() this later! 
+ """ return _get_unmangled_rdd(data, _serialize_string_seq, cache) def _get_unmangled_double_vector_rdd(data, cache=True): From 68e7276896eeeb546f6f212f5a2f8ae5470cf0b5 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 11 Sep 2014 04:04:36 -0700 Subject: [PATCH 04/16] minor style fixes --- python/pyspark/mllib/Word2Vec.py | 3 +++ python/pyspark/mllib/_common.py | 7 ++++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/python/pyspark/mllib/Word2Vec.py b/python/pyspark/mllib/Word2Vec.py index bc7c29518efc..d9a0f809caae 100644 --- a/python/pyspark/mllib/Word2Vec.py +++ b/python/pyspark/mllib/Word2Vec.py @@ -27,6 +27,7 @@ __all__ = ['Word2Vec', 'Word2VecModel'] + class Word2VecModel(object): """ class for Word2Vec model @@ -58,6 +59,7 @@ def findSynonyms(self, x, num): similarity = _deserialize_double_vector(result[1]) return zip(words, similarity) + class Word2Vec(object): """ Word2Vec creates vector representation of words in a text corpus. @@ -106,6 +108,7 @@ def fit(self, data): model = sc._jvm.PythonMLLibAPI().trainWord2Vec(dataBytes._jrdd) return Word2VecModel(sc, model) + def _test(): import doctest from pyspark import SparkContext diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index 3c7782a3a2d8..9d85aa5d8500 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -143,6 +143,7 @@ def _serialize_double_vector(v): raise TypeError("_serialize_double_vector called on a %s; " "wanted ndarray or SparseVector" % type(v)) + def _serialize_string_seq(ss): """Serialize a sequence of string.""" seqLength = len(ss) @@ -158,7 +159,7 @@ def _serialize_string_seq(ss): header_bytes = ndarray(shape=[2], buffer=ba, offset=0, dtype=int32) header_bytes[0] = seqLength header_bytes[1] = totalLength - _copyto(lengthArray, buffer=ba, offset=8, shape=[seqLength],dtype=int32) + _copyto(lengthArray, buffer=ba, offset=8, shape=[seqLength], dtype=int32) i = 0 offset = 4 + 4 + 4 * seqLength for s in ss: @@ -167,6 +168,7 @@ def _serialize_string_seq(ss): i = i + 1 return ba + def _serialize_dense_vector(v): """Serialize a dense vector given as a NumPy array.""" if v.ndim != 1: @@ -225,6 +227,7 @@ def _deserialize_string_seq(ba, offset=0): offset = offset + curLen return ret + def _deserialize_double(ba, offset=0): """Deserialize a double from a mutually understood format. @@ -411,6 +414,7 @@ def _get_unmangled_rdd(data, serializer, cache=True): dataBytes.cache() return dataBytes + def _get_unmangled_string_seq_rdd(data, cache=True): """ Map a pickled Python RDD of Python string sequence to a Java RDD of @@ -420,6 +424,7 @@ def _get_unmangled_string_seq_rdd(data, cache=True): """ return _get_unmangled_rdd(data, _serialize_string_seq, cache) + def _get_unmangled_double_vector_rdd(data, cache=True): """ Map a pickled Python RDD of Python dense or sparse vectors to a Java RDD of From ca1e5ffe60e51d4e6435a22d086689a00be38c1a Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 11 Sep 2014 10:30:49 -0700 Subject: [PATCH 05/16] fix test --- python/pyspark/mllib/_common.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py index 9d85aa5d8500..d57448de55f4 100644 --- a/python/pyspark/mllib/_common.py +++ b/python/pyspark/mllib/_common.py @@ -205,7 +205,7 @@ def _serialize_sparse_vector(v): def _deserialize_string_seq(ba, offset=0): """Deserialize a string sequence from a mutually understood format. 
>>> import sys - >>> _derserialize_string_seq(_serialize_string_seq(['abc'])) == ['abc'] + >>> _deserialize_string_seq(_serialize_string_seq(['abc'])) == ['abc'] True """ if type(ba) != bytearray: From 78bbb533be9f9a11cb81fe4278e0833ade7fe833 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Thu, 25 Sep 2014 12:14:48 -0700 Subject: [PATCH 06/16] use pickle for seq string SerDe --- .../mllib/api/python/PythonMLLibAPI.scala | 22 +++++++-------- .../api/python/PythonMLLibAPISuite.scala | 7 ----- python/pyspark/mllib/Word2Vec.py | 28 +++++++++---------- 3 files changed, 24 insertions(+), 33 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index e9411d217c6b..92e57b8759bf 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -46,7 +46,6 @@ import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils - /** * :: DeveloperApi :: * The Java stubs necessary for the Python mllib bindings. @@ -290,13 +289,13 @@ class PythonMLLibAPI extends Serializable { * handle to the Java object instead of the content of the Java object. * Extra care needs to be taken in the Python code to ensure it gets freed on * exit; see the Py4J documentation. - * @param dataBytesJRDD Input JavaRDD + * @param dataJRDD Input JavaRDD * @return A handle to java Word2VecModel instance at python side */ def trainWord2Vec( - dataBytesJRDD: JavaRDD[Array[Byte]] + dataJRDD: JavaRDD[java.util.ArrayList[String]] ): Word2VecModel = { - val data = dataBytesJRDD.rdd.map(SerDe.deserializeSeqString) + val data = dataJRDD.rdd.map(_.toArray(new Array[String](0)).toSeq).cache() val word2vec = new Word2Vec() val model = word2vec.fit(data) model @@ -311,8 +310,8 @@ class PythonMLLibAPI extends Serializable { def Word2VecModelTransform( model: Word2VecModel, word: String - ): Array[Byte] = { - SerDe.serializeDoubleVector(model.transform(word)) + ): Vector = { + model.transform(word) } /** @@ -332,8 +331,8 @@ class PythonMLLibAPI extends Serializable { val similarity = Vectors.dense(result.map(_._2)) val words = result.map(_._1) val ret = new java.util.LinkedList[java.lang.Object]() - ret.add(SerDe.serializeSeqString(words)) - ret.add(SerDe.serializeDoubleVector(similarity)) + ret.add(words) + ret.add(similarity) ret } @@ -347,16 +346,15 @@ class PythonMLLibAPI extends Serializable { */ def Word2VecModelSynonyms( model: Word2VecModel, - vecBytes: Array[Byte], + vec: Vector, num: Int ): java.util.List[java.lang.Object] = { - val vec = SerDe.deserializeDoubleVector(vecBytes) val result = model.findSynonyms(vec, num) val similarity = Vectors.dense(result.map(_._2)) val words = result.map(_._1) val ret = new java.util.LinkedList[java.lang.Object]() - ret.add(SerDe.serializeSeqString(words)) - ret.add(SerDe.serializeDoubleVector(similarity)) + ret.add(words) + ret.add(similarity) ret } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala index 39293ec9b015..db8ed62fa46c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/api/python/PythonMLLibAPISuite.scala @@ -91,11 +91,4 @@ class PythonMLLibAPISuite extends FunSuite { assert(bytes.length / 10 < 
25) // 25 bytes per rating } - - test("string seq serialization") { - val original = Array[String]("abc", "def", "ghi") - val bytes = SerDe.serializeSeqString(original) - val ss = SerDe.deserializeSeqString(bytes) - assert(ss === original) - } } diff --git a/python/pyspark/mllib/Word2Vec.py b/python/pyspark/mllib/Word2Vec.py index d9a0f809caae..2871987222c7 100644 --- a/python/pyspark/mllib/Word2Vec.py +++ b/python/pyspark/mllib/Word2Vec.py @@ -19,11 +19,11 @@ Python package for Word2Vec in MLlib. """ -from pyspark.mllib._common import \ - _serialize_double_vector, \ - _deserialize_double_vector, \ - _deserialize_string_seq, \ - _get_unmangled_string_seq_rdd +from functools import wraps + +from pyspark import PickleSerializer + +from pyspark.mllib.linalg import _convert_to_vector __all__ = ['Word2Vec', 'Word2VecModel'] @@ -46,18 +46,19 @@ def __del__(self): def transform(self, word): pythonAPI = self._sc._jvm.PythonMLLibAPI() result = pythonAPI.Word2VecModelTransform(self._java_model, word) - return _deserialize_double_vector(result) + return PickleSerializer().loads(str(self._sc._jvm.SerDe.dumps(result))) def findSynonyms(self, x, num): + SerDe = self._sc._jvm.SerDe + ser = PickleSerializer() pythonAPI = self._sc._jvm.PythonMLLibAPI() if type(x) == str: - result = pythonAPI.Word2VecModelSynonyms(self._java_model, x, num) + jlist = pythonAPI.Word2VecModelSynonyms(self._java_model, x, num) else: - xSer = _serialize_double_vector(x) - result = pythonAPI.Word2VecModelSynonyms(self._java_model, xSer, num) - words = _deserialize_string_seq(result[0]) - similarity = _deserialize_double_vector(result[1]) - return zip(words, similarity) + bytes = bytearray(ser.dumps(_convert_to_vector(x))) + vec = self._sc._jvm.SerDe.loads(bytes) + jlist = pythonAPI.Word2VecModelSynonyms(self._java_model, vec, num) + return PickleSerializer().loads(str(self._sc._jvm.SerDe.dumps(jlist))) class Word2Vec(object): @@ -104,8 +105,7 @@ def fit(self, data): :param data: Input RDD """ sc = data.context - dataBytes = _get_unmangled_string_seq_rdd(data) - model = sc._jvm.PythonMLLibAPI().trainWord2Vec(dataBytes._jrdd) + model = sc._jvm.PythonMLLibAPI().trainWord2Vec(data._to_java_object_rdd()) return Word2VecModel(sc, model) From 89490bf8b61daf8687bfb272a6e2fa3965a7c1b1 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Fri, 26 Sep 2014 18:11:08 -0700 Subject: [PATCH 07/16] add tests and Word2VecModelWrapper --- .../mllib/api/python/PythonMLLibAPI.scala | 84 +++++-------------- python/pyspark/mllib/Word2Vec.py | 35 ++++---- 2 files changed, 41 insertions(+), 78 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 92e57b8759bf..a24d25449cbe 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -29,6 +29,8 @@ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.mllib.classification._ import org.apache.spark.mllib.clustering._ +import org.apache.spark.mllib.feature.Word2Vec +import org.apache.spark.mllib.feature.Word2VecModel import org.apache.spark.mllib.optimization._ import org.apache.spark.mllib.linalg._ import org.apache.spark.mllib.random.{RandomRDDs => RG} @@ -40,8 +42,6 @@ import org.apache.spark.mllib.tree.impurity._ import org.apache.spark.mllib.tree.model.DecisionTreeModel import 
org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics} import org.apache.spark.mllib.stat.correlation.CorrelationNames -import org.apache.spark.mllib.feature.Word2Vec -import org.apache.spark.mllib.feature.Word2VecModel import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD import org.apache.spark.util.Utils @@ -290,72 +290,34 @@ class PythonMLLibAPI extends Serializable { * Extra care needs to be taken in the Python code to ensure it gets freed on * exit; see the Py4J documentation. * @param dataJRDD Input JavaRDD - * @return A handle to java Word2VecModel instance at python side + * @return A handle to java Word2VecModelWrapper instance at python side */ - def trainWord2Vec( - dataJRDD: JavaRDD[java.util.ArrayList[String]] - ): Word2VecModel = { - val data = dataJRDD.rdd.map(_.toArray(new Array[String](0)).toSeq).cache() + def trainWord2Vec(dataJRDD: JavaRDD[java.util.ArrayList[String]]): Word2VecModelWrapper = { + val data = dataJRDD.rdd.cache() val word2vec = new Word2Vec() val model = word2vec.fit(data) - model + new Word2VecModelWrapper(model) } - /** - * Java stub for Python mllib Word2VecModel transform - * @param model Word2VecModel instance - * @param word a word - * @return serialized vector representation of word - */ - def Word2VecModelTransform( - model: Word2VecModel, - word: String - ): Vector = { - model.transform(word) - } + private[python] class Word2VecModelWrapper(model: Word2VecModel) { + def transform(word: String): Vector = { + model.transform(word) + } - /** - * Java stub for Python mllib Word2VecModel findSynonyms - * @param model Word2VecModel instance - * @param word a word - * @param num number of synonyms to find - * @return a java LinkedList containing serialized version of - * synonyms and similarities - */ - def Word2VecModelSynonyms( - model: Word2VecModel, - word: String, - num: Int - ): java.util.List[java.lang.Object] = { - val result = model.findSynonyms(word, num) - val similarity = Vectors.dense(result.map(_._2)) - val words = result.map(_._1) - val ret = new java.util.LinkedList[java.lang.Object]() - ret.add(words) - ret.add(similarity) - ret - } + def findSynonyms(word: String, num: Int): java.util.List[java.lang.Object] = { + val vec = transform(word) + findSynonyms(vec, num) + } - /** - * Java stub for Python mllib Word2VecModel findSynonyms - * @param model Word2VecModel instance - * @param vecBytes serialization of vector representation of words - * @param num number of synonyms to find - * @return a java LinkedList containing serialized version of - * synonyms and similarities - */ - def Word2VecModelSynonyms( - model: Word2VecModel, - vec: Vector, - num: Int - ): java.util.List[java.lang.Object] = { - val result = model.findSynonyms(vec, num) - val similarity = Vectors.dense(result.map(_._2)) - val words = result.map(_._1) - val ret = new java.util.LinkedList[java.lang.Object]() - ret.add(words) - ret.add(similarity) - ret + def findSynonyms(vector: Vector, num: Int): java.util.List[java.lang.Object] = { + val result = model.findSynonyms(vector, num) + val similarity = Vectors.dense(result.map(_._2)) + val words = result.map(_._1) + val ret = new java.util.LinkedList[java.lang.Object]() + ret.add(words) + ret.add(similarity) + ret + } } /** diff --git a/python/pyspark/mllib/Word2Vec.py b/python/pyspark/mllib/Word2Vec.py index 2871987222c7..ff7e2a6197c0 100644 --- a/python/pyspark/mllib/Word2Vec.py +++ b/python/pyspark/mllib/Word2Vec.py @@ -19,8 +19,6 @@ Python package for Word2Vec in MLlib. 
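Review note: with Word2VecModelWrapper, the Python side can now invoke model methods directly on the Py4J handle instead of routing every call through static PythonMLLibAPI stubs. Sketch (jmodel is the handle returned by trainWord2Vec):

    jvec = jmodel.transform("spark")            # Java Vector, to be pickled back
    jlist = jmodel.findSynonyms("spark", 5)     # LinkedList: [words, similarity vector]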
""" -from functools import wraps - from pyspark import PickleSerializer from pyspark.mllib.linalg import _convert_to_vector @@ -44,21 +42,13 @@ def __del__(self): self._sc._gateway.detach(self._java_model) def transform(self, word): - pythonAPI = self._sc._jvm.PythonMLLibAPI() - result = pythonAPI.Word2VecModelTransform(self._java_model, word) + result = self._java_model.transform(word) return PickleSerializer().loads(str(self._sc._jvm.SerDe.dumps(result))) def findSynonyms(self, x, num): - SerDe = self._sc._jvm.SerDe - ser = PickleSerializer() - pythonAPI = self._sc._jvm.PythonMLLibAPI() - if type(x) == str: - jlist = pythonAPI.Word2VecModelSynonyms(self._java_model, x, num) - else: - bytes = bytearray(ser.dumps(_convert_to_vector(x))) - vec = self._sc._jvm.SerDe.loads(bytes) - jlist = pythonAPI.Word2VecModelSynonyms(self._java_model, vec, num) - return PickleSerializer().loads(str(self._sc._jvm.SerDe.dumps(jlist))) + jlist = self._java_model.findSynonyms(x, num) + words, similarity = PickleSerializer().loads(str(self._sc._jvm.SerDe.dumps(jlist))) + return zip(words, similarity) class Word2Vec(object): @@ -77,12 +67,22 @@ class Word2Vec(object): Efficient Estimation of Word Representations in Vector Space and Distributed Representations of Words and Phrases and their Compositionality. + >>> sentence = "a b " * 100 + "a c " * 10 + >>> localDoc = [sentence, sentence] + >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" ")) + >>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc) + >>> syms = model.findSynonyms("a", 2) + >>> str(syms[0][0]) + 'b' + >>> str(syms[1][0]) + 'c' """ def __init__(self): self.vectorSize = 100 self.startingAlpha = 0.025 self.numPartitions = 1 self.numIterations = 1 + self.seed = 42L def setVectorSize(self, vectorSize): self.vectorSize = vectorSize @@ -100,10 +100,11 @@ def setNumIterations(self, numIterations): self.numIterations = numIterations return self + def setSeed(self, seed): + self.seed = seed + return self + def fit(self, data): - """ - :param data: Input RDD - """ sc = data.context model = sc._jvm.PythonMLLibAPI().trainWord2Vec(data._to_java_object_rdd()) return Word2VecModel(sc, model) From b9a73831c15b88955d69ee5ea359117d1441b298 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Fri, 26 Sep 2014 18:15:55 -0700 Subject: [PATCH 08/16] cache words RDD in fit --- .../scala/org/apache/spark/mllib/feature/Word2Vec.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index fc1444705364..ab9ca0db9594 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -248,7 +248,7 @@ class Word2Vec extends Serializable with Logging { */ def fit[S <: Iterable[String]](dataset: RDD[S]): Word2VecModel = { - val words = dataset.flatMap(x => x) + val words = dataset.flatMap(x => x).cache() learnVocab(words) @@ -281,7 +281,9 @@ class Word2Vec extends Serializable with Logging { } } - val newSentences = sentences.repartition(numPartitions).cache() + val newSentences = sentences.repartition(numPartitions) + words.unpersist() + newSentences.cache() val initRandom = new XORShiftRandom(seed) val syn0Global = Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 0.5f) / vectorSize) From b7447eb1bdba4244ac5457489bbccaa118890f74 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Sat, 27 Sep 2014 
01:02:26 -0700 Subject: [PATCH 09/16] modify according to feedback --- .../mllib/api/python/PythonMLLibAPI.scala | 14 +++++++- .../apache/spark/mllib/feature/Word2Vec.scala | 6 ++-- python/pyspark/mllib/Word2Vec.py | 33 +++++++++++++++++-- 3 files changed, 45 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index a24d25449cbe..f9f6a96d8525 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -292,9 +292,21 @@ class PythonMLLibAPI extends Serializable { * @param dataJRDD Input JavaRDD * @return A handle to java Word2VecModelWrapper instance at python side */ - def trainWord2Vec(dataJRDD: JavaRDD[java.util.ArrayList[String]]): Word2VecModelWrapper = { + def trainWord2Vec( + dataJRDD: JavaRDD[java.util.ArrayList[String]], + vectorSize: Int, + startingAlpha: Double, + numPartitions: Int, + numIterations: Int, + seed: Long + ): Word2VecModelWrapper = { val data = dataJRDD.rdd.cache() val word2vec = new Word2Vec() + .setVectorSize(vectorSize) + .setLearningRate(startingAlpha) + .setNumPartitions(numPartitions) + .setNumIterations(numIterations) + .setSeed(seed) val model = word2vec.fit(data) new Word2VecModelWrapper(model) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index ab9ca0db9594..fc1444705364 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -248,7 +248,7 @@ class Word2Vec extends Serializable with Logging { */ def fit[S <: Iterable[String]](dataset: RDD[S]): Word2VecModel = { - val words = dataset.flatMap(x => x).cache() + val words = dataset.flatMap(x => x) learnVocab(words) @@ -281,9 +281,7 @@ class Word2Vec extends Serializable with Logging { } } - val newSentences = sentences.repartition(numPartitions) - words.unpersist() - newSentences.cache() + val newSentences = sentences.repartition(numPartitions).cache() val initRandom = new XORShiftRandom(seed) val syn0Global = Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 0.5f) / vectorSize) diff --git a/python/pyspark/mllib/Word2Vec.py b/python/pyspark/mllib/Word2Vec.py index ff7e2a6197c0..62eaef314fe8 100644 --- a/python/pyspark/mllib/Word2Vec.py +++ b/python/pyspark/mllib/Word2Vec.py @@ -18,8 +18,11 @@ """ Python package for Word2Vec in MLlib. 
""" +from numpy import random -from pyspark import PickleSerializer +from sys import maxint + +from pyspark.serializers import PickleSerializer, AutoBatchedSerializer from pyspark.mllib.linalg import _convert_to_vector @@ -42,10 +45,18 @@ def __del__(self): self._sc._gateway.detach(self._java_model) def transform(self, word): + """ + local use only + TODO: make transform usable in RDD operations from python side + """ result = self._java_model.transform(word) return PickleSerializer().loads(str(self._sc._jvm.SerDe.dumps(result))) def findSynonyms(self, x, num): + """ + local use only + TODO: make findSynonyms usable in RDD operations from python side + """ jlist = self._java_model.findSynonyms(x, num) words, similarity = PickleSerializer().loads(str(self._sc._jvm.SerDe.dumps(jlist))) return zip(words, similarity) @@ -67,6 +78,7 @@ class Word2Vec(object): Efficient Estimation of Word Representations in Vector Space and Distributed Representations of Words and Phrases and their Compositionality. + >>> sentence = "a b " * 100 + "a c " * 10 >>> localDoc = [sentence, sentence] >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" ")) @@ -76,13 +88,18 @@ class Word2Vec(object): 'b' >>> str(syms[1][0]) 'c' + >>> len(syms) + 2 + >>> vec = model.transform("a") + >>> len(vec) + 10 """ def __init__(self): self.vectorSize = 100 self.startingAlpha = 0.025 self.numPartitions = 1 self.numIterations = 1 - self.seed = 42L + self.seed = random.randint(0, high=maxint) def setVectorSize(self, vectorSize): self.vectorSize = vectorSize @@ -106,7 +123,17 @@ def setSeed(self, seed): def fit(self, data): sc = data.context - model = sc._jvm.PythonMLLibAPI().trainWord2Vec(data._to_java_object_rdd()) + ser = PickleSerializer() + vectorSize = self.vectorSize + startingAlpha = self.startingAlpha + numPartitions = self.numPartitions + numIterations = self.numIterations + seed = self.seed + + # cached = data._reserialize(AutoBatchedSerializer(ser)).cache() + model = sc._jvm.PythonMLLibAPI().trainWord2Vec( + data._to_java_object_rdd(), vectorSize, + startingAlpha, numPartitions, numIterations, seed) return Word2VecModel(sc, model) From cdef9f482fd235533764432ebd4a2d5006e0290b Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Fri, 3 Oct 2014 14:40:37 -0700 Subject: [PATCH 10/16] add missing comments --- .../mllib/api/python/PythonMLLibAPI.scala | 22 +++++---- .../apache/spark/mllib/feature/Word2Vec.scala | 10 ++-- python/pyspark/mllib/Word2Vec.py | 48 +++++++++++++++---- 3 files changed, 58 insertions(+), 22 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index f9f6a96d8525..033ce27a2efd 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -289,24 +289,28 @@ class PythonMLLibAPI extends Serializable { * handle to the Java object instead of the content of the Java object. * Extra care needs to be taken in the Python code to ensure it gets freed on * exit; see the Py4J documentation. 
- * @param dataJRDD Input JavaRDD + * @param dataJRDD input JavaRDD + * @param vectorSize size of vector + * @param learningRate initial learning rate + * @param numPartitions number of partitions + * @param numIterations number of iterations + * @param seed initial seed for random generator * @return A handle to java Word2VecModelWrapper instance at python side */ def trainWord2Vec( dataJRDD: JavaRDD[java.util.ArrayList[String]], vectorSize: Int, - startingAlpha: Double, + learningRate: Double, numPartitions: Int, numIterations: Int, - seed: Long - ): Word2VecModelWrapper = { + seed: Long): Word2VecModelWrapper = { val data = dataJRDD.rdd.cache() val word2vec = new Word2Vec() - .setVectorSize(vectorSize) - .setLearningRate(startingAlpha) - .setNumPartitions(numPartitions) - .setNumIterations(numIterations) - .setSeed(seed) + .setVectorSize(vectorSize) + .setLearningRate(learningRate) + .setNumPartitions(numPartitions) + .setNumIterations(numIterations) + .setSeed(seed) val model = word2vec.fit(data) new Word2VecModelWrapper(model) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index fc1444705364..5f88e7190ee7 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -67,7 +67,7 @@ private case class VocabWord( class Word2Vec extends Serializable with Logging { private var vectorSize = 100 - private var startingAlpha = 0.025 + private var learningRate = 0.025 private var numPartitions = 1 private var numIterations = 1 private var seed = Utils.random.nextLong() @@ -84,7 +84,7 @@ class Word2Vec extends Serializable with Logging { * Sets initial learning rate (default: 0.025). */ def setLearningRate(learningRate: Double): this.type = { - this.startingAlpha = learningRate + this.learningRate = learningRate this } @@ -286,7 +286,7 @@ class Word2Vec extends Serializable with Logging { val syn0Global = Array.fill[Float](vocabSize * vectorSize)((initRandom.nextFloat() - 0.5f) / vectorSize) val syn1Global = new Array[Float](vocabSize * vectorSize) - var alpha = startingAlpha + var alpha = learningRate for (k <- 1 to numIterations) { val partial = newSentences.mapPartitionsWithIndex { case (idx, iter) => val random = new XORShiftRandom(seed ^ ((idx + 1) << 16) ^ ((-k - 1) << 8)) @@ -300,8 +300,8 @@ class Word2Vec extends Serializable with Logging { lwc = wordCount // TODO: discount by iteration? alpha = - startingAlpha * (1 - numPartitions * wordCount.toDouble / (trainWordsCount + 1)) - if (alpha < startingAlpha * 0.0001) alpha = startingAlpha * 0.0001 + learningRate * (1 - numPartitions * wordCount.toDouble / (trainWordsCount + 1)) + if (alpha < learningRate * 0.0001) alpha = learningRate * 0.0001 logInfo("wordCount = " + wordCount + ", alpha = " + alpha) } wc += sentence.size diff --git a/python/pyspark/mllib/Word2Vec.py b/python/pyspark/mllib/Word2Vec.py index 62eaef314fe8..bd555e9c6163 100644 --- a/python/pyspark/mllib/Word2Vec.py +++ b/python/pyspark/mllib/Word2Vec.py @@ -18,10 +18,10 @@ """ Python package for Word2Vec in MLlib. 
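Review note: the learning-rate schedule in the training loop above, as a standalone sketch (the 0.0001 floor keeps alpha from decaying to zero late in training):

    def next_alpha(learning_rate, num_partitions, word_count, train_words_count):
        a = learning_rate * (1 - num_partitions * float(word_count) / (train_words_count + 1))
        return max(a, learning_rate * 0.0001)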
""" -from numpy import random - from sys import maxint +from numpy import random + from pyspark.serializers import PickleSerializer, AutoBatchedSerializer from pyspark.mllib.linalg import _convert_to_vector @@ -46,7 +46,10 @@ def __del__(self): def transform(self, word): """ - local use only + :param word: a word + :return: vector representation of word + + Note: local use only TODO: make transform usable in RDD operations from python side """ result = self._java_model.transform(word) @@ -54,7 +57,11 @@ def transform(self, word): def findSynonyms(self, x, num): """ - local use only + :param x: a word or a vector representation of word + :param num: number of synonyms to find + :return: array of (word, cosineSimilarity) + + Note: local use only TODO: make findSynonyms usable in RDD operations from python side """ jlist = self._java_model.findSynonyms(x, num) @@ -95,37 +102,62 @@ class Word2Vec(object): 10 """ def __init__(self): + """ + Construct Word2Vec instance + """ self.vectorSize = 100 - self.startingAlpha = 0.025 + self.learningRate = 0.025 self.numPartitions = 1 self.numIterations = 1 self.seed = random.randint(0, high=maxint) def setVectorSize(self, vectorSize): + """ + Sets vector size (default: 100). + """ self.vectorSize = vectorSize return self def setLearningRate(self, learningRate): - self.startingAlpha = learningRate + """ + Sets initial learning rate (default: 0.025). + """ + self.learningRate = learningRate return self def setNumPartitions(self, numPartitions): + """ + Sets number of partitions (default: 1). Use a small number for accuracy. + """ self.numPartitions = numPartitions return self def setNumIterations(self, numIterations): + """ + Sets number of iterations (default: 1), which should be smaller than or equal to number of + partitions. + """ self.numIterations = numIterations return self def setSeed(self, seed): + """ + Sets random seed (default: a random long integer). + """ self.seed = seed return self def fit(self, data): + """ + Computes the vector representation of each word in vocabulary. + + :param data: training data. 
+ :return: python Word2VecModel instance + """ sc = data.context ser = PickleSerializer() vectorSize = self.vectorSize - startingAlpha = self.startingAlpha + learningRate = self.learningRate numPartitions = self.numPartitions numIterations = self.numIterations seed = self.seed @@ -133,7 +165,7 @@ def fit(self, data): # cached = data._reserialize(AutoBatchedSerializer(ser)).cache() model = sc._jvm.PythonMLLibAPI().trainWord2Vec( data._to_java_object_rdd(), vectorSize, - startingAlpha, numPartitions, numIterations, seed) + learningRate, numPartitions, numIterations, seed) return Word2VecModel(sc, model) From 1bdcd2e4e90b07eaf0f92c03cad34dab4c519821 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Fri, 3 Oct 2014 15:02:38 -0700 Subject: [PATCH 11/16] minor fixes --- .../spark/mllib/api/python/PythonMLLibAPI.scala | 12 ++++++------ .../org/apache/spark/mllib/feature/Word2Vec.scala | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 033ce27a2efd..6cf235210256 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -298,12 +298,12 @@ class PythonMLLibAPI extends Serializable { * @return A handle to java Word2VecModelWrapper instance at python side */ def trainWord2Vec( - dataJRDD: JavaRDD[java.util.ArrayList[String]], - vectorSize: Int, - learningRate: Double, - numPartitions: Int, - numIterations: Int, - seed: Long): Word2VecModelWrapper = { + dataJRDD: JavaRDD[java.util.ArrayList[String]], + vectorSize: Int, + learningRate: Double, + numPartitions: Int, + numIterations: Int, + seed: Long): Word2VecModelWrapper = { val data = dataJRDD.rdd.cache() val word2vec = new Word2Vec() .setVectorSize(vectorSize) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala index 5f88e7190ee7..d321994c2a65 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala @@ -437,7 +437,7 @@ class Word2VecModel private[mllib] ( * Find synonyms of a word * @param word a word * @param num number of synonyms to find - * @return array of (word, similarity) + * @return array of (word, cosineSimilarity) */ def findSynonyms(word: String, num: Int): Array[(String, Double)] = { val vector = transform(word) From 3d8007ba4cf97136a4cfbfa49e2f0fe85ceff6a4 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Fri, 3 Oct 2014 15:43:14 -0700 Subject: [PATCH 12/16] fix findSynonyms for vector --- python/pyspark/mllib/Word2Vec.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/python/pyspark/mllib/Word2Vec.py b/python/pyspark/mllib/Word2Vec.py index bd555e9c6163..1ecb9a5c15cf 100644 --- a/python/pyspark/mllib/Word2Vec.py +++ b/python/pyspark/mllib/Word2Vec.py @@ -64,8 +64,14 @@ def findSynonyms(self, x, num): Note: local use only TODO: make findSynonyms usable in RDD operations from python side """ - jlist = self._java_model.findSynonyms(x, num) - words, similarity = PickleSerializer().loads(str(self._sc._jvm.SerDe.dumps(jlist))) + ser = PickleSerializer() + if type(x) == str: + jlist = self._java_model.findSynonyms(x, num) + else: + bytes = bytearray(ser.dumps(_convert_to_vector(x))) + vec = self._sc._jvm.SerDe.loads(bytes) + jlist = 
self._java_model.findSynonyms(vec, num) + words, similarity = ser.loads(str(self._sc._jvm.SerDe.dumps(jlist))) return zip(words, similarity) @@ -100,6 +106,13 @@ class Word2Vec(object): >>> vec = model.transform("a") >>> len(vec) 10 + >>> syms = model.findSynonyms(vec, 2) + >>> str(syms[0][0]) + 'b' + >>> str(syms[1][0]) + 'c' + >>> len(syms) + 2 """ def __init__(self): """ From a73fa19786bca754ecf8567bc83bdce1f90569ee Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Fri, 3 Oct 2014 16:28:56 -0700 Subject: [PATCH 13/16] clean up --- python/pyspark/mllib/Word2Vec.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/python/pyspark/mllib/Word2Vec.py b/python/pyspark/mllib/Word2Vec.py index 1ecb9a5c15cf..cfdec0326c4e 100644 --- a/python/pyspark/mllib/Word2Vec.py +++ b/python/pyspark/mllib/Word2Vec.py @@ -18,10 +18,6 @@ """ Python package for Word2Vec in MLlib. """ -from sys import maxint - -from numpy import random - from pyspark.serializers import PickleSerializer, AutoBatchedSerializer from pyspark.mllib.linalg import _convert_to_vector @@ -122,7 +118,7 @@ def __init__(self): self.learningRate = 0.025 self.numPartitions = 1 self.numIterations = 1 - self.seed = random.randint(0, high=maxint) + self.seed = 42L def setVectorSize(self, vectorSize): """ From daf88a6d6d901185b4699ee6f7325865f3174e07 Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Tue, 7 Oct 2014 12:55:24 -0700 Subject: [PATCH 14/16] modification according to feedback --- .../mllib/api/python/PythonMLLibAPI.scala | 14 +- python/docs/pyspark.mllib.rst | 8 + python/pyspark/mllib/Word2Vec.py | 192 ------------------ python/pyspark/mllib/feature.py | 0 python/run-tests | 1 + 5 files changed, 17 insertions(+), 198 deletions(-) delete mode 100644 python/pyspark/mllib/Word2Vec.py create mode 100644 python/pyspark/mllib/feature.py diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 6cf235210256..7617ff70e7c0 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -44,6 +44,7 @@ import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics} import org.apache.spark.mllib.stat.correlation.CorrelationNames import org.apache.spark.mllib.util.MLUtils import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel import org.apache.spark.util.Utils /** @@ -304,14 +305,15 @@ class PythonMLLibAPI extends Serializable { numPartitions: Int, numIterations: Int, seed: Long): Word2VecModelWrapper = { - val data = dataJRDD.rdd.cache() + val data = dataJRDD.rdd.persist(StorageLevel.MEMORY_AND_DISK_SER) val word2vec = new Word2Vec() - .setVectorSize(vectorSize) - .setLearningRate(learningRate) - .setNumPartitions(numPartitions) - .setNumIterations(numIterations) - .setSeed(seed) + .setVectorSize(vectorSize) + .setLearningRate(learningRate) + .setNumPartitions(numPartitions) + .setNumIterations(numIterations) + .setSeed(seed) val model = word2vec.fit(data) + data.unpersist() new Word2VecModelWrapper(model) } diff --git a/python/docs/pyspark.mllib.rst b/python/docs/pyspark.mllib.rst index e95d19e97f15..4548b8739ed9 100644 --- a/python/docs/pyspark.mllib.rst +++ b/python/docs/pyspark.mllib.rst @@ -20,6 +20,14 @@ pyspark.mllib.clustering module :undoc-members: :show-inheritance: +pyspark.mllib.feature module +------------------------------- + +.. 
automodule:: pyspark.mllib.feature + :members: + :undoc-members: + :show-inheritance: + pyspark.mllib.linalg module --------------------------- diff --git a/python/pyspark/mllib/Word2Vec.py b/python/pyspark/mllib/Word2Vec.py deleted file mode 100644 index cfdec0326c4e..000000000000 --- a/python/pyspark/mllib/Word2Vec.py +++ /dev/null @@ -1,192 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -""" -Python package for Word2Vec in MLlib. -""" -from pyspark.serializers import PickleSerializer, AutoBatchedSerializer - -from pyspark.mllib.linalg import _convert_to_vector - -__all__ = ['Word2Vec', 'Word2VecModel'] - - -class Word2VecModel(object): - """ - class for Word2Vec model - """ - def __init__(self, sc, java_model): - """ - :param sc: Spark context - :param java_model: Handle to Java model object - """ - self._sc = sc - self._java_model = java_model - - def __del__(self): - self._sc._gateway.detach(self._java_model) - - def transform(self, word): - """ - :param word: a word - :return: vector representation of word - - Note: local use only - TODO: make transform usable in RDD operations from python side - """ - result = self._java_model.transform(word) - return PickleSerializer().loads(str(self._sc._jvm.SerDe.dumps(result))) - - def findSynonyms(self, x, num): - """ - :param x: a word or a vector representation of word - :param num: number of synonyms to find - :return: array of (word, cosineSimilarity) - - Note: local use only - TODO: make findSynonyms usable in RDD operations from python side - """ - ser = PickleSerializer() - if type(x) == str: - jlist = self._java_model.findSynonyms(x, num) - else: - bytes = bytearray(ser.dumps(_convert_to_vector(x))) - vec = self._sc._jvm.SerDe.loads(bytes) - jlist = self._java_model.findSynonyms(vec, num) - words, similarity = ser.loads(str(self._sc._jvm.SerDe.dumps(jlist))) - return zip(words, similarity) - - -class Word2Vec(object): - """ - Word2Vec creates vector representation of words in a text corpus. - The algorithm first constructs a vocabulary from the corpus - and then learns vector representation of words in the vocabulary. - The vector representation can be used as features in - natural language processing and machine learning algorithms. - - We used skip-gram model in our implementation and hierarchical softmax - method to train the model. The variable names in the implementation - matches the original C implementation. - For original C implementation, see https://code.google.com/p/word2vec/ - For research papers, see - Efficient Estimation of Word Representations in Vector Space - and - Distributed Representations of Words and Phrases and their Compositionality. 
-
-    >>> sentence = "a b " * 100 + "a c " * 10
-    >>> localDoc = [sentence, sentence]
-    >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" "))
-    >>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc)
-    >>> syms = model.findSynonyms("a", 2)
-    >>> str(syms[0][0])
-    'b'
-    >>> str(syms[1][0])
-    'c'
-    >>> len(syms)
-    2
-    >>> vec = model.transform("a")
-    >>> len(vec)
-    10
-    >>> syms = model.findSynonyms(vec, 2)
-    >>> str(syms[0][0])
-    'b'
-    >>> str(syms[1][0])
-    'c'
-    >>> len(syms)
-    2
-    """
-    def __init__(self):
-        """
-        Construct Word2Vec instance
-        """
-        self.vectorSize = 100
-        self.learningRate = 0.025
-        self.numPartitions = 1
-        self.numIterations = 1
-        self.seed = 42L
-
-    def setVectorSize(self, vectorSize):
-        """
-        Sets vector size (default: 100).
-        """
-        self.vectorSize = vectorSize
-        return self
-
-    def setLearningRate(self, learningRate):
-        """
-        Sets initial learning rate (default: 0.025).
-        """
-        self.learningRate = learningRate
-        return self
-
-    def setNumPartitions(self, numPartitions):
-        """
-        Sets number of partitions (default: 1). Use a small number for accuracy.
-        """
-        self.numPartitions = numPartitions
-        return self
-
-    def setNumIterations(self, numIterations):
-        """
-        Sets number of iterations (default: 1), which should be smaller than or equal to number of
-        partitions.
-        """
-        self.numIterations = numIterations
-        return self
-
-    def setSeed(self, seed):
-        """
-        Sets random seed (default: a random long integer).
-        """
-        self.seed = seed
-        return self
-
-    def fit(self, data):
-        """
-        Computes the vector representation of each word in vocabulary.
-
-        :param data: training data.
-        :return: python Word2VecModel instance
-        """
-        sc = data.context
-        ser = PickleSerializer()
-        vectorSize = self.vectorSize
-        learningRate = self.learningRate
-        numPartitions = self.numPartitions
-        numIterations = self.numIterations
-        seed = self.seed
-
-        # cached = data._reserialize(AutoBatchedSerializer(ser)).cache()
-        model = sc._jvm.PythonMLLibAPI().trainWord2Vec(
-            data._to_java_object_rdd(), vectorSize,
-            learningRate, numPartitions, numIterations, seed)
-        return Word2VecModel(sc, model)
-
-
-def _test():
-    import doctest
-    from pyspark import SparkContext
-    globs = globals().copy()
-    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
-    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
-    globs['sc'].stop()
-    if failure_count:
-        exit(-1)
-
-if __name__ == "__main__":
-    _test()
diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py
new file mode 100644
index 000000000000..e69de29bb2d1
diff --git a/python/run-tests b/python/run-tests
index a7ec270c7da2..7f699574f236 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -83,6 +83,7 @@ run_test "pyspark/mllib/stat.py"
 run_test "pyspark/mllib/tests.py"
 run_test "pyspark/mllib/tree.py"
 run_test "pyspark/mllib/util.py"
+run_test "pyspark/mllib/feature.py"

 # Try to test with PyPy
 if [ $(which pypy) ]; then

From b13a0b9d47cb3a6604ece9773bcbbd2877db6299 Mon Sep 17 00:00:00 2001
From: Liquan Pei
Date: Tue, 7 Oct 2014 15:00:53 -0700
Subject: [PATCH 15/16] resolve merge conflicts and minor fixes

---
 python/pyspark/mllib/feature.py | 193 ++++++++++++++++++++++++++++++++
 python/run-tests                |   1 +
 2 files changed, 194 insertions(+)

diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py
index e69de29bb2d1..6dd65701d84d 100644
--- a/python/pyspark/mllib/feature.py
+++ b/python/pyspark/mllib/feature.py
@@ -0,0 +1,193 @@
+#
+# Licensed to the
Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Python package for feature transformations in MLlib.
+"""
+from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+
+from pyspark.mllib.linalg import _convert_to_vector
+
+__all__ = ['Word2Vec', 'Word2VecModel']
+
+
+class Word2VecModel(object):
+    """
+    Class for the Word2Vec model.
+    """
+    def __init__(self, sc, java_model):
+        """
+        :param sc: Spark context
+        :param java_model: Handle to Java model object
+        """
+        self._sc = sc
+        self._java_model = java_model
+
+    def __del__(self):
+        self._sc._gateway.detach(self._java_model)
+
+    def transform(self, word):
+        """
+        :param word: a word
+        :return: vector representation of word
+        Transforms a word to its vector representation
+        
+        Note: local use only
+        """
+        # TODO: make transform usable in RDD operations from python side
+        result = self._java_model.transform(word)
+        return PickleSerializer().loads(str(self._sc._jvm.SerDe.dumps(result)))
+
+    def findSynonyms(self, x, num):
+        """
+        :param x: a word or a vector representation of word
+        :param num: number of synonyms to find
+        :return: array of (word, cosineSimilarity)
+        Find synonyms of a word
+        
+        Note: local use only
+        """
+        # TODO: make findSynonyms usable in RDD operations from python side
+        ser = PickleSerializer()
+        if isinstance(x, str):
+            jlist = self._java_model.findSynonyms(x, num)
+        else:
+            bytes = bytearray(ser.dumps(_convert_to_vector(x)))
+            vec = self._sc._jvm.SerDe.loads(bytes)
+            jlist = self._java_model.findSynonyms(vec, num)
+        words, similarity = ser.loads(str(self._sc._jvm.SerDe.dumps(jlist)))
+        return zip(words, similarity)
+
+
+class Word2Vec(object):
+    """
+    Word2Vec creates vector representations of words in a text corpus.
+    The algorithm first constructs a vocabulary from the corpus
+    and then learns vector representations of words in the vocabulary.
+    The vector representations can be used as features in
+    natural language processing and machine learning algorithms.
+
+    We used the skip-gram model in our implementation and the hierarchical
+    softmax method to train the model. The variable names in the implementation
+    match the original C implementation.
+    For the original C implementation, see https://code.google.com/p/word2vec/
+    For research papers, see
+    Efficient Estimation of Word Representations in Vector Space
+    and
+    Distributed Representations of Words and Phrases and their Compositionality.
+
+    >>> sentence = "a b " * 100 + "a c " * 10
+    >>> localDoc = [sentence, sentence]
+    >>> doc = sc.parallelize(localDoc).map(lambda line: line.split(" "))
+    >>> model = Word2Vec().setVectorSize(10).setSeed(42L).fit(doc)
+    >>> syms = model.findSynonyms("a", 2)
+    >>> str(syms[0][0])
+    'b'
+    >>> str(syms[1][0])
+    'c'
+    >>> len(syms)
+    2
+    >>> vec = model.transform("a")
+    >>> len(vec)
+    10
+    >>> syms = model.findSynonyms(vec, 2)
+    >>> str(syms[0][0])
+    'b'
+    >>> str(syms[1][0])
+    'c'
+    >>> len(syms)
+    2
+    """
+    def __init__(self):
+        """
+        Construct a Word2Vec instance.
+        """
+        self.vectorSize = 100
+        self.learningRate = 0.025
+        self.numPartitions = 1
+        self.numIterations = 1
+        self.seed = 42L
+
+    def setVectorSize(self, vectorSize):
+        """
+        Sets vector size (default: 100).
+        """
+        self.vectorSize = vectorSize
+        return self
+
+    def setLearningRate(self, learningRate):
+        """
+        Sets initial learning rate (default: 0.025).
+        """
+        self.learningRate = learningRate
+        return self
+
+    def setNumPartitions(self, numPartitions):
+        """
+        Sets number of partitions (default: 1). Use a small number for accuracy;
+        more partitions speed up training at some cost in accuracy.
+        """
+        self.numPartitions = numPartitions
+        return self
+
+    def setNumIterations(self, numIterations):
+        """
+        Sets number of iterations (default: 1), which should be smaller than or
+        equal to the number of partitions.
+        """
+        self.numIterations = numIterations
+        return self
+
+    def setSeed(self, seed):
+        """
+        Sets random seed.
+        """
+        self.seed = seed
+        return self
+
+    def fit(self, data):
+        """
+        Computes the vector representation of each word in the vocabulary.
+
+        :param data: training data. RDD of a subtype of Iterable[String]
+                     (each element is one tokenized sentence)
+        :return: a Python Word2VecModel instance
+        """
+        sc = data.context
+        ser = PickleSerializer()
+        vectorSize = self.vectorSize
+        learningRate = self.learningRate
+        numPartitions = self.numPartitions
+        numIterations = self.numIterations
+        seed = self.seed
+
+        model = sc._jvm.PythonMLLibAPI().trainWord2Vec(
+            data._to_java_object_rdd(), vectorSize,
+            learningRate, numPartitions, numIterations, seed)
+        return Word2VecModel(sc, model)
+
+
+def _test():
+    import doctest
+    from pyspark import SparkContext
+    globs = globals().copy()
+    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+    globs['sc'].stop()
+    if failure_count:
+        exit(-1)
+
+if __name__ == "__main__":
+    _test()
diff --git a/python/run-tests b/python/run-tests
index c713861eb77b..63395f72788f 100755
--- a/python/run-tests
+++ b/python/run-tests
@@ -69,6 +69,7 @@ function run_mllib_tests() {
     echo "Run mllib tests ..."
run_test "pyspark/mllib/classification.py" run_test "pyspark/mllib/clustering.py" + run_test "pyspark/mllib/feature.py" run_test "pyspark/mllib/linalg.py" run_test "pyspark/mllib/random.py" run_test "pyspark/mllib/recommendation.py" From 476ea34c9f576d425a05604f77cc3cab43fd5bae Mon Sep 17 00:00:00 2001 From: Liquan Pei Date: Tue, 7 Oct 2014 15:07:23 -0700 Subject: [PATCH 16/16] style fixes --- python/pyspark/mllib/feature.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py index 6dd65701d84d..a44a27fd3b6a 100644 --- a/python/pyspark/mllib/feature.py +++ b/python/pyspark/mllib/feature.py @@ -45,7 +45,7 @@ def transform(self, word): :param word: a word :return: vector representation of word Transforms a word to its vector representation - + Note: local use only """ # TODO: make transform usable in RDD operations from python side @@ -58,7 +58,7 @@ def findSynonyms(self, x, num): :param num: number of synonyms to find :return: array of (word, cosineSimilarity) Find synonyms of a word - + Note: local use only """ # TODO: make findSynonyms usable in RDD operations from python side
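
As a quick end-to-end illustration of the API this series adds, here is a
minimal usage sketch (not part of any patch above; the corpus, app name, and
variable names are made up for the example, and it assumes the Python 2 /
PySpark setup these patches target with a local master):

    from pyspark import SparkContext
    from pyspark.mllib.feature import Word2Vec

    sc = SparkContext("local[2]", "Word2VecExample")

    # Each RDD element is one sentence, already tokenized into words.
    corpus = sc.parallelize(["the quick brown fox jumps over the lazy dog"] * 100) \
        .map(lambda line: line.split(" "))

    model = Word2Vec() \
        .setVectorSize(10) \
        .setNumPartitions(1) \
        .setSeed(42L) \
        .fit(corpus)

    vec = model.transform("fox")        # 10-dimensional vector for "fox"
    # findSynonyms accepts either a word or a vector; per the TODOs above,
    # both it and transform are driver-side only for now.
    for word, sim in model.findSynonyms("fox", 3):
        print word, sim                 # nearest words by cosine similarity

    sc.stop()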