diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/AbstractPLSA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/AbstractPLSA.scala new file mode 100644 index 0000000000000..9928de331ba8a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/AbstractPLSA.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering.topicmodeling + +import java.util.Random + +import org.apache.spark.SparkContext +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.mllib.clustering.topicmodeling.regulaizers.{TopicsRegularizer, MatrixInPlaceModification} +import org.apache.spark.mllib.feature.Document +import org.apache.spark.rdd.RDD + +import breeze.linalg._ + +private[topicmodeling] trait AbstractPLSA[DocumentParameterType <: DocumentParameters, + GlobalParameterType <: GlobalParameters, + GlobalCounterType <: GlobalCounters] + extends TopicModel[DocumentParameterType, GlobalParameterType] with MatrixInPlaceModification { + protected val numberOfTopics: Int + protected val random: Random + protected val topicRegularizer: TopicsRegularizer + protected val sc: SparkContext + + protected def generalizedPerplexity(topicsBC: Broadcast[Array[Array[Float]]], + parameters: RDD[DocumentParameterType], + collectionLength: Int, + wordGivenModel: DocumentParameterType => (Int, Int) => Float) = { + math.exp(-(parameters.aggregate(0f)((thatOne, otherOne) => + thatOne + singleDocumentLikelihood(otherOne, topicsBC, wordGivenModel(otherOne)), + (thatOne, otherOne) => thatOne + otherOne) + topicRegularizer(topicsBC.value)) / + collectionLength) + } + + protected def getAlphabetSize(documents: RDD[Document]) = documents.first().alphabetSize + + protected def getCollectionLength(documents: RDD[Document]) = + documents.map(doc => sum(doc.tokens)).reduce(_ + _) + + protected def singleDocumentLikelihood(parameter: DocumentParameters, + topicsBC: Broadcast[Array[Array[Float]]], + wordGivenModel: ((Int, Int) => Float)) = { + sum(parameter.document.tokens.mapActivePairs(wordGivenModel)) + + parameter.priorThetaLogProbability + } + + protected def probabilityOfWordGivenTopic(word: Int, + parameter: DocumentParameters, + topicsBC: Broadcast[Array[Array[Float]]]) = { + var underLog = 0f + for (topic <- 0 until numberOfTopics) { + underLog += parameter.theta(topic) * topicsBC.value(topic)(word) + } + underLog + } + + protected def getInitialTopics(alphabetSize: Int) = { + val topics = Array.fill[Float](numberOfTopics, alphabetSize)(random.nextFloat) + normalize(topics) + sc.broadcast(topics) + } + + protected def getTopics(parameters: RDD[DocumentParameterType], + alphabetSize: Int, + oldTopics: Broadcast[Array[Array[Float]]], + globalCounters: GlobalCounterType, + foldingIn : Boolean) = { + if (foldingIn) oldTopics + else { + val newTopicCnt: Array[Array[Float]] = globalCounters.wordsFromTopics + + topicRegularizer.regularize(newTopicCnt, oldTopics.value) + normalize(newTopicCnt) + + sc.broadcast(newTopicCnt) + } + } + + private def normalize(matrix: Array[Array[Float]]) = { + matrix.foreach(array => { + val sum = array.sum + shift(array, (arr, i) => arr(i) /= sum) + }) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/DocumentParameters.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/DocumentParameters.scala new file mode 100644 index 0000000000000..171ea8f6f4276 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/DocumentParameters.scala @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering.topicmodeling + +import breeze.linalg.SparseVector +import org.apache.spark.mllib.clustering.topicmodeling.regulaizers.DocumentOverTopicDistributionRegularizer +import org.apache.spark.mllib.feature.Document + +/** + * the class contains document parameter in PLSA model + * @param document + * @param theta the distribution over topics + * @param regularizer + */ +class DocumentParameters(val document: Document, + val theta: Array[Float], + private val regularizer: DocumentOverTopicDistributionRegularizer) + extends Serializable { + private def getZ(topics: Array[Array[Float]]) = { + val numberOfTopics = topics.size + + document.tokens.mapActivePairs { case (word, n) => + (0 until numberOfTopics).foldLeft(0f)((sum, topic) => + sum + topics(topic)(word) * theta(topic)) + } + } + + private[topicmodeling] def wordsFromTopics(topics: Array[Array[Float]]): + Array[SparseVector[Float]] = { + val Z = getZ(topics) + + wordsToTopicCnt(topics, Z) + } + + private[topicmodeling] def wordsToTopicCnt(topics: Array[Array[Float]], + Z: SparseVector[Float]): Array[SparseVector[Float]] = { + val array = Array.ofDim[SparseVector[Float]](theta.size) + forWithIndex(theta)((topicWeight, topicNum) => + array(topicNum) = document.tokens.mapActivePairs { case (word, + num) => num * topics(topicNum)(word) * topicWeight / Z(word) + } + ) + array + } + + private def forWithIndex(array: Array[Float])(operation: (Float, Int) => Unit) { + var i = 0 + val size = array.size + while (i < size) { + operation(array(i), i) + i += 1 + } + } + + private[topicmodeling] def assignNewTheta(topics: Array[Array[Float]], + Z: SparseVector[Float]) { + val newTheta: Array[Float] = { + val array = Array.ofDim[Float](theta.size) + forWithIndex(theta)((weight, topicNum) => array(topicNum) = weight * document.tokens + .activeIterator.foldLeft(0f) { case (sum, (word, wordNum)) => + sum + wordNum * topics(topicNum)(word) / Z(word) + }) + array + } + regularizer.regularize(newTheta, theta) + + val newThetaSum = newTheta.sum + + forWithIndex(newTheta)((wordsNum, topicNum) => theta(topicNum) = wordsNum / newThetaSum) + + } + + private[topicmodeling] def getNewTheta(topics: Array[Array[Float]]) = { + val Z = getZ(topics) + assignNewTheta(topics, Z) + + this + } + + private[topicmodeling] def priorThetaLogProbability = regularizer(theta) + +} + + +private[topicmodeling] object DocumentParameters extends SparseVectorFasterSum { + + def apply(document: Document, + numberOfTopics: Int, + regularizer: DocumentOverTopicDistributionRegularizer) = { + val theta = getTheta(numberOfTopics) + new DocumentParameters(document, theta, regularizer) + } + + private def getTheta(numberOfTopics: Int) = { + Array.fill[Float](numberOfTopics)(1f / numberOfTopics) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/GlobalCounters.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/GlobalCounters.scala new file mode 100644 index 0000000000000..3bf9478f13a0e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/GlobalCounters.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering.topicmodeling + +/** + * contains global counters in PLSA model -- holds n_{tw} (Vorontov's notation) counters and + * alphabet size + * + * @param wordsFromTopics + * @param alphabetSize + */ +private[topicmodeling] class GlobalCounters(val wordsFromTopics: Array[Array[Float]], + val alphabetSize: Int) extends Serializable { + + /** + * merges two GlobalParameters into a single one + * @param that other GlobalParameters + * @return GlobalParameters + */ + private[topicmodeling] def + (that: GlobalCounters) = { + wordsFromTopics.zip(that.wordsFromTopics).foreach { case (thisOne, otherOne) => + (0 until alphabetSize).foreach(i => thisOne(i) += otherOne(i)) + } + + new GlobalCounters(wordsFromTopics, alphabetSize) + } + + /** + * calculates and add local parameters to global parameters + * @param that DocumentParameters. + * @param topics words by topics distribution + * @param alphabetSize number of unique words + * @return GlobalParameters + */ + private[topicmodeling] def add(that: DocumentParameters, + topics: Array[Array[Float]], + alphabetSize: Int) = { + + val wordsFromTopic = that.wordsFromTopics(topics) + + wordsFromTopic.zip(wordsFromTopics).foreach { case (topic, words) => + topic.activeIterator.foreach{ case (word, num) => + words(word) += num + } + } + this + } +} + +private[topicmodeling] object GlobalCounters { + def apply(topicNum: Int, alphabetSize: Int) = + new GlobalCounters(Array.ofDim[Float](topicNum, alphabetSize), alphabetSize) +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/GlobalParameters.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/GlobalParameters.scala new file mode 100644 index 0000000000000..749905f8ea277 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/GlobalParameters.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.mllib.clustering.topicmodeling + +/** + * Holds global parameters of PLSA model -- \Phi matrix (topics over words distribution) and + * alphabet size + * + * @param phi -- distribution of topics over words + * @param alphabetSize + */ +class GlobalParameters(val phi : Array[Array[Float]], val alphabetSize : Int) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/PLSA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/PLSA.scala new file mode 100644 index 0000000000000..df2143fb2fe46 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/PLSA.scala @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering.topicmodeling + +import java.util.Random + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.mllib.clustering.topicmodeling.regulaizers.{DocumentOverTopicDistributionRegularizer, TopicsRegularizer, UniformDocumentOverTopicRegularizer, UniformTopicRegularizer} +import org.apache.spark.mllib.feature.Document +import org.apache.spark.rdd.RDD +import org.apache.spark.{Logging, SparkContext} + + +/** + * + * distributed topic modeling via PLSA (Hofmann (1999), Vorontsov, Potapenko (2014) ) + * @param sc spark context + * @param numberOfTopics number of topics + * @param numberOfIterations number of iterations + * @param random java.util.Random need for initialisation + * @param documentOverTopicDistributionRegularizer + * @param topicRegularizer + * @param computePpx boolean. If true, model computes perplexity and prints it puts in the log at + * INFO level. it takes some time and memory + */ +class PLSA(@transient protected val sc: SparkContext, + protected val numberOfTopics: Int, + private val numberOfIterations: Int, + protected val random: Random, + private val documentOverTopicDistributionRegularizer: + DocumentOverTopicDistributionRegularizer = new UniformDocumentOverTopicRegularizer, + @transient protected val topicRegularizer: TopicsRegularizer = + new UniformTopicRegularizer, + private val computePpx: Boolean = true) + extends AbstractPLSA[DocumentParameters, GlobalParameters, GlobalCounters] + with Logging + with Serializable { + + + /** + * + * @param documents -- document collection + * @return a pair of rdd of document parameters global parameters + */ + override def infer(documents: RDD[Document]): (RDD[DocumentParameters], GlobalParameters) = { + val alphabetSize = getAlphabetSize(documents) + EM(documents, getInitialTopics(alphabetSize), alphabetSize, foldingIn = false) + } + + /** + * + * @param documents docs to be folded in + * @param globalParams global parameters that were produced by infer method (stores topics) + * @return + */ + override def foldIn(documents: RDD[Document], + globalParams: GlobalParameters): RDD[DocumentParameters] = { + EM(documents, sc.broadcast(globalParams.phi), globalParams.alphabetSize, foldingIn = true)._1 + } + + private def EM(documents: RDD[Document], + topicBC: Broadcast[Array[Array[Float]]], + alphabetSize : Int, + foldingIn : Boolean): (RDD[DocumentParameters], GlobalParameters) = { + val alphabetSize = getAlphabetSize(documents) + + val collectionLength = getCollectionLength(documents) + + val parameters = documents.map(doc => DocumentParameters(doc, + numberOfTopics, + documentOverTopicDistributionRegularizer)) + + val (result, topics) = newIteration(parameters, + topicBC, + alphabetSize, + collectionLength, + 0, + foldingIn) + + (result, new GlobalParameters(topics.value,alphabetSize)) + } + + private def newIteration(parameters: RDD[DocumentParameters], + topicsBC: Broadcast[Array[Array[Float]]], + alphabetSize: Int, + collectionLength: Int, + numberOfIteration: Int, + foldingIn : Boolean): (RDD[DocumentParameters], Broadcast[Array[Array[Float]]]) = { + if (computePpx) { + logInfo("Iteration number " + numberOfIteration) + logInfo("Perplexity=" + perplexity(topicsBC, parameters, collectionLength)) + } + if (numberOfIteration == numberOfIterations) { + (parameters, topicsBC) + } else { + val newParameters = parameters.map(param => param.getNewTheta(topicsBC.value)).cache() + val globalCounters = getGlobalCounters(parameters, topicsBC, alphabetSize) + val newTopics = getTopics(newParameters, + alphabetSize, + topicsBC, + globalCounters, + foldingIn) + + parameters.unpersist() + topicsBC.unpersist() + + newIteration(newParameters, + newTopics, + alphabetSize, + collectionLength, + numberOfIteration + 1, + foldingIn) + } + } + + private def getGlobalCounters(parameters: RDD[DocumentParameters], + topics: Broadcast[Array[Array[Float]]], + alphabetSize: Int) = { + parameters.aggregate[GlobalCounters](GlobalCounters(numberOfTopics, + alphabetSize))((thatOne, otherOne) => thatOne.add(otherOne, topics.value, alphabetSize), + (thatOne, otherOne) => thatOne + otherOne) + } + + private def perplexity(topicsBC: Broadcast[Array[Array[Float]]], + parameters: RDD[DocumentParameters], collectionLength: Int) = { + generalizedPerplexity(topicsBC, + parameters, + collectionLength, + par => (word,num) => num * math.log(probabilityOfWordGivenTopic(word, par, topicsBC)).toFloat) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/RobustDocumentParameters.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/RobustDocumentParameters.scala new file mode 100644 index 0000000000000..f8663593188f1 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/RobustDocumentParameters.scala @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering.topicmodeling + +import breeze.linalg.{SparseVector, sum} +import org.apache.spark.mllib.clustering.topicmodeling.regulaizers.DocumentOverTopicDistributionRegularizer +import org.apache.spark.mllib.feature.Document + +/** + * the class contains document parameter in Robust PLSA model + * + * @param document + * @param theta the distribution over topics + * @param noise noisiness of words + * @param regularizer + */ +class RobustDocumentParameters(document: Document, + theta: Array[Float], + val noise: SparseVector[Float], + regularizer: DocumentOverTopicDistributionRegularizer) + extends DocumentParameters(document, theta, regularizer) { + + private def getZ(topics: Array[Array[Float]], + background: Array[Float], + eps: Float, + gamma: Float) = { + val numberOfTopics = topics.size + + val Z = document.tokens.mapActivePairs { case (word, n) => + val sum = (0 until numberOfTopics).foldLeft(0f)((sum, topic) => + sum + topics(topic)(word) * theta(topic)) + (eps * noise(word) + gamma * background(word) + sum) / (1 + eps + gamma) + } + Z + } + + private[topicmodeling] def wordsFromTopicsAndWordsFromBackground( + topics: Array[Array[Float]], + background: Array[Float], + eps: Float, + gamma: Float): (Array[SparseVector[Float]], SparseVector[Float]) = { + val Z = getZ(topics, background, eps, gamma) + + (wordsToTopicCnt(topics, Z), wordToBackgroundCnt(background, eps, gamma, Z)) + } + + + private def wordToBackgroundCnt(background: Array[Float], + eps: Float, + gamma: Float, + Z: SparseVector[Float]): SparseVector[Float] = { + document.tokens.mapActivePairs { case (word, num) => + num * background(word) * gamma / Z(word) + } + } + + + private def getNoise(eps: Float, Z: SparseVector[Float]) = { + val newWordsFromNoise = document.tokens.mapActivePairs { case (word,num) => + eps * noise(word) * num / Z(word) + } + + val noiseWordsSum = sum(newWordsFromNoise) + + if (noiseWordsSum > 0) { + newWordsFromNoise.mapActiveValues(_ / noiseWordsSum) + } else { + newWordsFromNoise.mapActiveValues(i => 0f) + } + } + + /** + * calculates a new distribution of this document by topic, corresponding to the new topics + */ + private[topicmodeling] def getNewTheta(topics: Array[Array[Float]], + background: Array[Float], + eps: Float, + gamma: Float) = { + val Z = getZ(topics, background, eps, gamma) + + assignNewTheta(topics, Z) + + val newNoise: SparseVector[Float] = getNoise(eps, Z) + new RobustDocumentParameters(document, theta, newNoise, regularizer) + } + + +} + +/** + * companion object of DocumentParameters. Create new DocumentParameters and contain some methods + */ +private[topicmodeling] object RobustDocumentParameters extends SparseVectorFasterSum { + /** + * create new DocumentParameters + * @param document + * @param numberOfTopics + * @param gamma weight of background + * @param eps weight of noise + * @return new DocumentParameters + */ + def apply(document: Document, + numberOfTopics: Int, + gamma: Float, + eps: Float, + regularizer: DocumentOverTopicDistributionRegularizer) = { + val wordsNum = sum(document.tokens) + val noise = document.tokens.mapActiveValues(word => 1f / wordsNum) + + val documentParameters: DocumentParameters = DocumentParameters(document, numberOfTopics, + regularizer) + new RobustDocumentParameters(document, documentParameters.theta, noise, regularizer) + } + +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/RobustGlobalCounters.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/RobustGlobalCounters.scala new file mode 100644 index 0000000000000..4736dde7db9b7 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/RobustGlobalCounters.scala @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering.topicmodeling + +/** + * contains number of words, generated by topic, and words from background. + * @param wordsFromTopic + * @param backgroundWords background topics counter + * @param alphabetSize + */ +private[topicmodeling] class RobustGlobalCounters(wordsFromTopic: Array[Array[Float]], + val backgroundWords: Array[Float], + alphabetSize: Int) extends GlobalCounters(wordsFromTopic, alphabetSize) { + + /** + * merges two GlobalParameters into a single one + * @return GlobalParameters + */ + def + (that: RobustGlobalCounters) = { + super. + (that) + + for (i <- 0 until alphabetSize) { + backgroundWords(i) += that.backgroundWords(i) + } + + new RobustGlobalCounters(wordsFromTopic, backgroundWords, alphabetSize) + } + + /** + * adds a local parameter to global parameters + * @param that DocumentParameters. + * @param topics broadcasted words by topics distribution + * @param background words by background distribution + * @param eps weight of noise + * @param gamma weight of background + * @param alphabetSize number of unique words + * @return GlobalParameters + */ + def add(that: RobustDocumentParameters, + topics: Array[Array[Float]], + background: Array[Float], + eps: Float, + gamma: Float, + alphabetSize: Int) = { + + val (wordsFromTopicInDoc, wordsFromBackground) = + that.wordsFromTopicsAndWordsFromBackground(topics, background, eps: Float, gamma) + + wordsFromTopicInDoc.zip(wordsFromTopic).foreach { case (topic, words) => + topic.activeIterator.foreach { case (word, num) => + words(word) += num + } + } + + wordsFromBackground.activeIterator.foreach { case (key, value) => + backgroundWords(key) += value + } + + this + } +} + +/** + * companion object of class GlobalParameters + */ +private[topicmodeling] object RobustGlobalCounters { + /** + * construct new GlobalParameters + * @param topicNum number of topics + * @param alphabetSize number of unique words + * @return new GlobalParameters + */ + def apply(topicNum: Int, alphabetSize: Int) = { + val topicWords = Array.ofDim[Float](topicNum, alphabetSize) + val backgroundWords = new Array[Float](alphabetSize) + new RobustGlobalCounters(topicWords, backgroundWords, alphabetSize) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/RobustGlobalParameters.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/RobustGlobalParameters.scala new file mode 100644 index 0000000000000..00acadecf9dd3 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/RobustGlobalParameters.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.mllib.clustering.topicmodeling + +/** + * Holds global parameters of PLSA model -- \Phi matrix (topics over words distribution) and + * distribution of words in background + * + * @param phi -- distribution of topics over words + * @param alphabetSize + * @param background + */ +class RobustGlobalParameters(phi : Array[Array[Float]], + alphabetSize: Int, + val background : Array[Float] ) + extends GlobalParameters(phi, alphabetSize) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/RobustPLSA.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/RobustPLSA.scala new file mode 100644 index 0000000000000..5bf9f3e6da155 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/RobustPLSA.scala @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering.topicmodeling + +import java.util.Random + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.mllib.clustering.topicmodeling.regulaizers.{DocumentOverTopicDistributionRegularizer, TopicsRegularizer, UniformDocumentOverTopicRegularizer, UniformTopicRegularizer} +import org.apache.spark.mllib.feature.Document +import org.apache.spark.rdd.RDD +import org.apache.spark.{Logging, SparkContext} + + +/** + * distributed topic modeling via RobustPLSA (Hofmann (1999), Vorontsov, Potapenko (2014) ) + * + * @param sc spark context + * @param numberOfTopics number of topics + * @param numberOfIterations number of iterations + * @param random java.util.Random need for initialisation + * @param documentOverTopicDistributionRegularizer + * @param topicRegularizer + * @param computePpx boolean. If true, model computes perplexity and prints it puts in the log at + * INFO level. it takes some time and memory + * @param gamma weight of background + * @param eps weight of noise + */ +class RobustPLSA(@transient protected val sc: SparkContext, + protected val numberOfTopics: Int, + protected val numberOfIterations: Int, + protected val random: Random, + private val documentOverTopicDistributionRegularizer: + DocumentOverTopicDistributionRegularizer = + new UniformDocumentOverTopicRegularizer, + @transient protected val topicRegularizer: TopicsRegularizer = + new UniformTopicRegularizer, + private val computePpx: Boolean = true, + private val gamma: Float = 0.3f, + private val eps: Float = 0.01f) + extends AbstractPLSA[RobustDocumentParameters, RobustGlobalParameters, RobustGlobalCounters] + with Logging + with Serializable { + + + /** + * + * @param documents -- document collection + * @return a pair of rdd of document parameters global parameters + */ + override def infer(documents: RDD[Document]) + : (RDD[RobustDocumentParameters], RobustGlobalParameters) = { + val alphabetSize = getAlphabetSize(documents) + EM(documents, getInitialTopics(alphabetSize), alphabetSize, foldingIn = false) + } + + + /** + * + * @param documents docs to be folded in + * @param globalParams global parameters that were produced by infer method (stores topics) + * @return + */ + override def foldIn(documents: RDD[Document], + globalParams: RobustGlobalParameters): RDD[RobustDocumentParameters] = { + EM(documents, sc.broadcast(globalParams.phi), globalParams.alphabetSize, foldingIn = true)._1 + } + + private def EM(documents: RDD[Document], + topicBC: Broadcast[Array[Array[Float]]], + alphabetSize : Int, + foldingIn : Boolean) :(RDD[RobustDocumentParameters], RobustGlobalParameters) = { + val collectionLength = getCollectionLength(documents) + + val parameters = documents.map(doc => RobustDocumentParameters(doc, + numberOfTopics, + gamma, + eps, + documentOverTopicDistributionRegularizer)) + + val initBackground = sc.broadcast(Array.fill(alphabetSize)(1f / alphabetSize)) + + val (result, topics, newBackground) = newIteration(parameters, + topicBC, + initBackground, + alphabetSize, + collectionLength, + 0, + foldingIn) + + (result, new RobustGlobalParameters(topics.value, alphabetSize, newBackground.value)) + } + + + private def newIteration(parameters: RDD[RobustDocumentParameters], + topicsBC: Broadcast[Array[Array[Float]]], + backgroundBC: Broadcast[Array[Float]], + alphabetSize: Int, + collectionLength: Int, + numberOfIteration: Int, + foldingIn : Boolean): + (RDD[RobustDocumentParameters], + Broadcast[Array[Array[Float]]], + Broadcast[Array[Float]]) = { + + if (computePpx) { + logInfo("Iteration number " + numberOfIteration) + logInfo("Perplexity=" + perplexity(topicsBC, parameters, backgroundBC, collectionLength)) + } + if (numberOfIteration == numberOfIterations) { + (parameters, topicsBC, backgroundBC) + } else { + val newParameters = parameters.map(parameter => + parameter.getNewTheta(topicsBC.value, backgroundBC.value, eps, gamma)).cache() + val globalCounters = getGlobalCounters(parameters, topicsBC, backgroundBC, alphabetSize) + val newTopics = getTopics(newParameters, + alphabetSize, + topicsBC, + globalCounters, + foldingIn) + + val newBackground = sc.broadcast(getNewBackground(globalCounters)) + + parameters.unpersist() + topicsBC.unpersist() + backgroundBC.unpersist() + + newIteration(newParameters, + newTopics, + newBackground, + alphabetSize, + collectionLength, + numberOfIteration + 1, + foldingIn) + } + } + + private def getGlobalCounters(parameters: RDD[RobustDocumentParameters], + topics: Broadcast[Array[Array[Float]]], + background: Broadcast[Array[Float]], + alphabetSize: Int) = { + parameters.aggregate[RobustGlobalCounters](RobustGlobalCounters(numberOfTopics, + alphabetSize))( + (thatOne, otherOne) => + thatOne.add(otherOne, topics.value, background.value, eps, gamma,alphabetSize), + (thatOne, otherOne) => + thatOne + otherOne) + } + + private def getNewBackground(globalCounters: RobustGlobalCounters) = { + val sum = globalCounters.backgroundWords.sum + if (sum > 0 && gamma != 0) { + globalCounters.backgroundWords.map(i => i / sum) + } else { + Array.fill[Float](globalCounters.alphabetSize)(0f) + } + } + + + private def perplexity(topicsBC: Broadcast[Array[Array[Float]]], + parameters: RDD[RobustDocumentParameters], + backgroundBC: Broadcast[Array[Float]], + collectionLength: Int) = + generalizedPerplexity(topicsBC, + parameters, + collectionLength, + par => (word,num) => + num * math.log((probabilityOfWordGivenTopic(word, par, topicsBC) + + gamma * backgroundBC.value(word) + + eps * par.noise(word)) / (1 + eps + gamma)).toFloat) + +} + diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/SparseVectorFasterSum.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/SparseVectorFasterSum.scala new file mode 100644 index 0000000000000..c5f804be45eb8 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/SparseVectorFasterSum.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering.topicmodeling + +import breeze.linalg.SparseVector + + +/** + * implements faster sparse int vector sum() + */ +private[topicmodeling] trait SparseVectorFasterSum { + protected def sum[T](vector: SparseVector[Int]) = { + var sum = 0 + for (element <- vector) sum += element + sum + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/TopicModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/TopicModel.scala new file mode 100644 index 0000000000000..555a39f402955 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/TopicModel.scala @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering.topicmodeling + +import org.apache.spark.mllib.feature.Document +import org.apache.spark.rdd.RDD + +/** + * topic modeling interface + */ +trait TopicModel[DocumentParameterType <: DocumentParameters, + GlobalParameterType <: GlobalParameters] { + /** + * + * @param documents document collection + * @return a pair of rdd of document parameters global parameters + */ + def infer(documents: RDD[Document]): (RDD[DocumentParameterType], GlobalParameterType) + + /** + * + * This method performs folding in. + * + * It finds distribution of the given documents over topics provided. + * + * @param documents docs to be folded in + * @param globalParams global parameters that were produced by infer method (stores topics) + * @return + */ + def foldIn(documents : RDD[Document], + globalParams : GlobalParameterType) : RDD[DocumentParameterType] +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/DocumentOverTopicDistributionRegularizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/DocumentOverTopicDistributionRegularizer.scala new file mode 100644 index 0000000000000..31a559c1bef86 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/DocumentOverTopicDistributionRegularizer.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.mllib.clustering.topicmodeling.regulaizers + +/** + * Defines a prior distribution (possibly, improper) on \Theta matrix row (a document over topics) + */ +trait DocumentOverTopicDistributionRegularizer extends Serializable with MatrixInPlaceModification { + /** + * + * @param theta a distribution of a document over topics + * @return the log of prior probability. Is used only for correct perplexity calculation + */ + private[mllib] def apply(theta: Array[Float]): Float + + /** + * This implementation performs positive cut for every value of theta + * @param thetaCnt -- the number of words in a document generated by every topic (n_{dt} + * counter in Vorontsov's notation) + * @param oldTheta -- a distribution of the document over topics obtained in the previous + * iterations + */ + private[mllib] def regularize(thetaCnt: Array[Float], oldTheta: Array[Float]): Unit = + shift(thetaCnt, (x,i) => x(i) = math.max(0, x(i))) +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/MatrixInPlaceModification.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/MatrixInPlaceModification.scala new file mode 100644 index 0000000000000..b1b7521022f2e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/MatrixInPlaceModification.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +package org.apache.spark.mllib.clustering.topicmodeling.regulaizers + +private[topicmodeling] trait MatrixInPlaceModification { + protected def shift(matrix: Array[Array[Float]], + op: (Array[Array[Float]], Int, Int) => Unit): Unit = + for (i <- 0 until matrix.size; j <- 0 until matrix.head.size) op(matrix, i, j) + + protected def shift(matrix: Array[Float], op: (Array[Float], Int) => Unit): Unit = + for (i <- 0 until matrix.size) op(matrix, i) +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/SymmetricDirichletDocumentOverTopicDistributionRegularizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/SymmetricDirichletDocumentOverTopicDistributionRegularizer.scala new file mode 100644 index 0000000000000..7c197e7be6dbb --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/SymmetricDirichletDocumentOverTopicDistributionRegularizer.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +package org.apache.spark.mllib.clustering.topicmodeling.regulaizers + +import org.apache.spark.mllib.stat.impl.DirichletDistribution + +/** + * Defines symmetric Dirichlet prior + * @param alpha - paarmeter of Dirichlet distribution + */ +class SymmetricDirichletDocumentOverTopicDistributionRegularizer(private[mllib] val alpha: Float) + extends DocumentOverTopicDistributionRegularizer + with MatrixInPlaceModification { + + private val dirichletDistribution = new DirichletDistribution(alpha) + + private[mllib] override def apply(theta: Array[Float]): Float = + dirichletDistribution.logPDF(theta) + + private[mllib] override def regularize(theta: Array[Float], oldTheta: Array[Float]) = { + shift(theta, (theta, i) => theta(i) += alpha - 1) + super.regularize(theta, oldTheta) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/SymmetricDirichletTopicRegularizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/SymmetricDirichletTopicRegularizer.scala new file mode 100644 index 0000000000000..4bff66cdf3066 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/SymmetricDirichletTopicRegularizer.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +package org.apache.spark.mllib.clustering.topicmodeling.regulaizers + +import org.apache.spark.mllib.stat.impl.DirichletDistribution + + +/** Defines Dirichlet symmetric prior on every topic + * + * @param alpha - parmeter of Dirichlet distribution + */ +class SymmetricDirichletTopicRegularizer(private[mllib] val alpha: Float) extends TopicsRegularizer + with MatrixInPlaceModification { + private val dirichletDistribution = new DirichletDistribution(alpha) + + private[mllib] override def apply(topics: Array[Array[Float]]): Float = + topics.foldLeft(0f)((sum,x) => sum + dirichletDistribution.logPDF(x)) + + private[mllib] override def regularize(topics: Array[Array[Float]], + oldTopics: Array[Array[Float]]): Unit = { + shift(topics, (matrix, i, j) => matrix(i)(j) += (alpha - 1)) + super.regularize(topics, oldTopics: Array[Array[Float]]) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/TopicsRegularizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/TopicsRegularizer.scala new file mode 100644 index 0000000000000..85acc2f53aedf --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/TopicsRegularizer.scala @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.spark.mllib.clustering.topicmodeling.regulaizers + +/** + * Defines a prior distribution (possibly, improper) on \Phi matrix (words over topics) + */ +trait TopicsRegularizer extends MatrixInPlaceModification { + /** + * + * @param topics \Phi matrix + * @return log prior probability of \Phi. Is used for perplexity calculation only + */ + private[mllib] def apply(topics: Array[Array[Float]]): Float + + /** + * This implementation performs a positive cut on every element + * @param topicsCnt number of times the word was generated by a topic (n_{wt} in Vorontsov's + * notation) + * @param oldTopics + */ + private[mllib] def regularize(topicsCnt: Array[Array[Float]], + oldTopics: Array[Array[Float]]): Unit = + shift(topicsCnt, (x, i, j) => x(i)(j) = math.max(0, x(i)(j))) +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/UniformDocumentOverTopicRegularizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/UniformDocumentOverTopicRegularizer.scala new file mode 100644 index 0000000000000..01594852b12a1 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/UniformDocumentOverTopicRegularizer.scala @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering.topicmodeling.regulaizers + +/** + * usage of this prior is equivalent to use of no prior + */ +class UniformDocumentOverTopicRegularizer extends DocumentOverTopicDistributionRegularizer { + private[mllib] override def apply(theta: Array[Float]): Float = 0 + + private[mllib] override def regularize(theta: Array[Float], oldTheta: Array[Float]): Unit = {} +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/UniformTopicRegularizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/UniformTopicRegularizer.scala new file mode 100644 index 0000000000000..136c8f782dc5a --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/topicmodeling/regulaizers/UniformTopicRegularizer.scala @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering.topicmodeling.regulaizers + +/** + * usage of this prior is equivalent to use of no prior + */ +class UniformTopicRegularizer extends TopicsRegularizer { + private[mllib] override def apply(topics: Array[Array[Float]]): Float = 0 + + private[mllib] override def regularize(topics: Array[Array[Float]], + oldTopics: Array[Array[Float]]): Unit = {} +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/TokenEnumerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/TokenEnumerator.scala new file mode 100644 index 0000000000000..3e9b5a218f66e --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/TokenEnumerator.scala @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.feature + +import breeze.linalg.SparseVector +import breeze.util.Index +import org.apache.spark.SparkContext.rddToPairRDDFunctions +import org.apache.spark.rdd.RDD + + + +/** + * This class represents a document in a vector space. Every word in the document is replaced + * with its serial number. + * + * @param tokens Non-zero components correspond to tokens + * Non-zero value equals to the number of time the words is included in the document. + */ +class Document(val tokens: SparseVector[Int]) extends Serializable { + /** + * + * @return number of different tokens in the collection + */ + def alphabetSize = tokens.length +} + +/** + * This class encapsulates string-to-int mapping and can be used for document creation + * @param alphabet string-to-int mapping + */ +class TokenEnumeration private[mllib](private val alphabet : Index[String]) extends Serializable { + + /** + * + * @param rawDocument a sequence of tokens to be transformed + * @return a Document that contains all the tokens + * from rawDocument that are included in the alphabet + */ + def transform(rawDocument: Seq[String]) : Document = { + val wordsMap = rawDocument.map(alphabet.apply) + .filter(_ != -1) + .foldLeft(Map[Int, Int]().withDefaultValue(0))((map, word) => map + (word -> (1 + map(word)))) + + val words = wordsMap.keys.toArray.sorted + + val tokens = new SparseVector[Int](words, words.map(word => wordsMap(word)), alphabet.size) + new Document(tokens) + } +} + +/** + * This object enumerates tokens. It assigns an integer to every token. + * E.g. defines a bijection between set of words and 0 ... (numberOfDifferentTokens - 1) + */ +class TokenEnumerator extends Serializable { + private var rareTokenThreshold : Int = 2 + + /** + * @param rareTokenThreshold tokens that are encountered in the collection less than + * rareTokenThreshold times are omitted. + * Default value: 2 + */ + def setRareTokenThreshold(rareTokenThreshold : Int) = { + this.rareTokenThreshold = rareTokenThreshold + this + } + + /** + * + * @param rawDocuments RDD of tokenized documents (every document is a sequence of tokens + * (Strings) ) + * @return a TokenEnumeration + */ + def apply(rawDocuments: RDD[Seq[String]]) : TokenEnumeration = { + val alphabet = Index(rawDocuments.flatMap(x => x) + .map((_, 1)) + .reduceByKey(_ + _) + .filter(_._2 > rareTokenThreshold) + .collect + .map(_._1)) + + new TokenEnumeration(alphabet) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/DirichletDistribution.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/DirichletDistribution.scala new file mode 100644 index 0000000000000..7ed28cef099fd --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/impl/DirichletDistribution.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + + +package org.apache.spark.mllib.stat.impl + +import org.apache.commons.math3.special.Gamma + +private[mllib] class DirichletDistribution(private[mllib] val alpha: Float) extends Serializable { + private def logBeta(x: Array[Float]) = { + val n = x.size + n * Gamma.logGamma(alpha) - Gamma.logGamma(n * alpha) + } + + /** + * We need this just because in case of alpha < 1 dirichlet log likelihood is infinite for x st + * x_i = 0 + */ + private val SMALL_VALUE: Double = 0.00001 + + /** + * + * @param x + * @return probability density function at x: Dir(x | alpha) + */ + def logPDF(x: Array[Float]) = + (-logBeta(x) + (alpha - 1) * x.map(xx => math.log(xx + SMALL_VALUE)).sum).toFloat +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/topicmodeling/AbstractTopicModelSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/topicmodeling/AbstractTopicModelSuite.scala new file mode 100644 index 0000000000000..d8fd40f95e4cc --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/topicmodeling/AbstractTopicModelSuite.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering.topicmodeling + +import org.apache.spark.mllib.feature.TokenEnumerator +import org.apache.spark.mllib.util.LocalClusterSparkContext +import org.scalatest.FunSuite + +trait AbstractTopicModelSuite[DocumentParameterType <: DocumentParameters, + GlobalParameterType <: GlobalParameters] extends FunSuite with LocalClusterSparkContext { + + val EPS = 1e-5 + + def testPLSA(plsa: TopicModel[DocumentParameterType,GlobalParameterType]) { + // the data in text form + val rawDocuments = sc.parallelize(Seq("a b a", "x y y z", "a b z x ").map(_.split(" ").toSeq)) + + val tokenIndexer = new TokenEnumerator().setRareTokenThreshold(0) + + // use token indexer to generate tokenIndex + val tokenIndex = tokenIndexer(rawDocuments) + + //broadcast token index + val tokenIndexBC = sc.broadcast(tokenIndex) + + val docs = rawDocuments.map(tokenIndexBC.value.transform) + + // train plsa + val (docParameters, global) = plsa.infer(docs) + + // this matrix is an array of topic distributions over words + val phi = global.phi + + // thus, every its row should sum up to one + for (topic <- phi) assert(doesSumEqualToOne(topic), "phi matrix is not normalized") + + assert(phi.forall(_.forall(_ >= 0f)), "phi matrix is non-non-negative") + + // a distribution of a document over topics (theta) should also sum up to one + for (documentParameter <- docParameters.collect) + assert(doesSumEqualToOne(documentParameter.theta), "theta is not normalized") + + assert(docParameters.collect.forall(_.theta.forall(_ >= 0f)), "theta is not non-negative") + + // let's suppose there are some more documents + val foldInRawDocs = sc.parallelize(Seq("a b b", "x y x x z", "a b b b z c x ") + .map(_.split(" ").toSeq)) + + // numerate them with the same token index + val foldInDocs = foldInRawDocs.map(tokenIndexBC.value.transform) + + // now fold in these documents + val foldedInDocParameters = plsa.foldIn(foldInDocs, global) + + // the same requirements of non-negativeness and normalization apply + assert(foldedInDocParameters.collect.forall(_.theta.forall(_ >= 0f)), + "theta for folded in docs is not non-negative") + + for (documentParameter <- docParameters.collect) + assert(doesSumEqualToOne(documentParameter.theta), + "theta for folded in docs is not normalized") + } + + private def doesSumEqualToOne(arr: Array[Float]) = math.abs(arr.sum - 1) < EPS + +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/topicmodeling/PLSASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/topicmodeling/PLSASuite.scala new file mode 100644 index 0000000000000..84715c220a661 --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/topicmodeling/PLSASuite.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering.topicmodeling + +import java.util.Random + +import org.apache.spark.mllib.clustering.topicmodeling.regulaizers.{SymmetricDirichletDocumentOverTopicDistributionRegularizer, SymmetricDirichletTopicRegularizer} + +class PLSASuite extends AbstractTopicModelSuite[DocumentParameters, GlobalParameters] { + test("feasibility") { + val numberOfTopics = 2 + val numberOfIterations = 10 + + val plsa = new PLSA(sc, + numberOfTopics, + numberOfIterations, + new Random(13), + new SymmetricDirichletDocumentOverTopicDistributionRegularizer(0.2f), + new SymmetricDirichletTopicRegularizer(0.2f)) + + testPLSA(plsa) + } + +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/topicmodeling/RobustPLSASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/topicmodeling/RobustPLSASuite.scala new file mode 100644 index 0000000000000..9f6122a23661f --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/topicmodeling/RobustPLSASuite.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering.topicmodeling + +import java.util.Random + +import org.apache.spark.mllib.clustering.topicmodeling.regulaizers.{SymmetricDirichletDocumentOverTopicDistributionRegularizer, SymmetricDirichletTopicRegularizer} + +class RobustPLSASuite extends AbstractTopicModelSuite[RobustDocumentParameters, + RobustGlobalParameters] { + test("feasibility") { + val numberOfTopics = 2 + val numberOfIterations = 10 + + val plsa = new RobustPLSA(sc, + numberOfTopics, + numberOfIterations, + new Random(13), + new SymmetricDirichletDocumentOverTopicDistributionRegularizer(0.2f), + new SymmetricDirichletTopicRegularizer(0.2f)) + + testPLSA(plsa) + } + +} \ No newline at end of file