diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
index c57ceba4a997..b5c076645c25 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/LDA.scala
@@ -355,7 +355,7 @@ private[clustering] trait LDAParams extends Params with HasFeaturesCol with HasM
  * :: Experimental ::
  * Model fitted by [[LDA]].
  *
- * @param vocabSize Vocabulary size (number of terms or terms in the vocabulary)
+ * @param vocabSize Vocabulary size (number of terms or words in the vocabulary)
  * @param sqlContext Used to construct local DataFrames for returning query results
  */
 @Since("1.6.0")
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index 05aa2dfe74b7..e00bd1752804 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -22,7 +22,8 @@ from pyspark.mllib.common import inherit_doc
 
 __all__ = ['BisectingKMeans', 'BisectingKMeansModel',
-           'KMeans', 'KMeansModel']
+           'KMeans', 'KMeansModel',
+           'LDA', 'LDAModel', 'LocalLDAModel', 'DistributedLDAModel']
 
 
 class KMeansModel(JavaModel, JavaMLWritable, JavaMLReadable):
@@ -310,6 +311,439 @@ def _create_model(self, java_model):
         return BisectingKMeansModel(java_model)
 
 
+class LDAModel(JavaModel):
+    """
+    Latent Dirichlet Allocation (LDA) model.
+    This abstraction permits for different underlying representations,
+    including local and distributed data structures.
+
+    .. versionadded:: 2.0.0
+    """
+
+    @since("2.0.0")
+    def isDistributed(self):
+        """
+        Indicates whether this instance is of type DistributedLDAModel
+        """
+        return self._call_java("isDistributed")
+
+    @since("2.0.0")
+    def vocabSize(self):
+        """Vocabulary size (number of terms or words in the vocabulary)"""
+        return self._call_java("vocabSize")
+
+    @since("2.0.0")
+    def topicsMatrix(self):
+        """
+        Inferred topics, where each topic is represented by a distribution over terms.
+        This is a matrix of size vocabSize x k, where each column is a topic.
+        No guarantees are given about the ordering of the topics.
+
+        WARNING: If this model is actually a :py:attr:`DistributedLDAModel` instance produced by
+        the Expectation-Maximization ("em") `optimizer`, then this method could involve
+        collecting a large amount of data to the driver (on the order of vocabSize x k).
+        """
+        return self._call_java("topicsMatrix")
+
+    @since("2.0.0")
+    def logLikelihood(self, dataset):
+        """
+        Calculates a lower bound on the log likelihood of the entire corpus.
+        See Equation (16) in the Online LDA paper (Hoffman et al., 2010).
+
+        WARNING: If this model is an instance of :py:attr:`DistributedLDAModel` (produced when
+        :py:attr:`optimizer` is set to "em"), this involves collecting a large
+        :py:attr:`topicsMatrix` to the driver. This implementation may be changed in the future.
+        """
+        return self._call_java("logLikelihood", dataset)
+
+    @since("2.0.0")
+    def logPerplexity(self, dataset):
+        """
+        Calculate an upper bound on perplexity. (Lower is better.)
+        See Equation (16) in the Online LDA paper (Hoffman et al., 2010).
+
+        WARNING: If this model is an instance of :py:attr:`DistributedLDAModel` (produced when
+        :py:attr:`optimizer` is set to "em"), this involves collecting a large
+        :py:attr:`topicsMatrix` to the driver. This implementation may be changed in the future.
+        """
+        return self._call_java("logPerplexity", dataset)
+
+    @since("2.0.0")
+    def describeTopics(self, maxTermsPerTopic=10):
+        """
+        Return the topics described by their top-weighted terms.
+        """
+        return self._call_java("describeTopics", maxTermsPerTopic)
+
+    @since("2.0.0")
+    def estimatedDocConcentration(self):
+        """
+        Value for :py:attr:`LDA.docConcentration` estimated from data.
+        If Online LDA was used and :py:attr:`LDA.optimizeDocConcentration` was set to false,
+        then this returns the fixed (given) value for the :py:attr:`LDA.docConcentration` parameter.
+        """
+        return self._call_java("estimatedDocConcentration")
+
+    @since("2.0.0")
+    def trainingLogLikelihood(self):
+        """
+        Log likelihood of the observed tokens in the training set,
+        given the current parameter estimates:
+        log P(docs | topics, topic distributions for docs, Dirichlet hyperparameters)
+
+        Notes:
+          - This excludes the prior; for that, use :py:attr:`logPrior`.
+          - Even with :py:attr:`logPrior`, this is NOT the same as the data log likelihood given
+            the hyperparameters.
+          - This is computed from the topic distributions computed during training. If you call
+            :py:attr:`logLikelihood` on the same training dataset, the topic distributions
+            will be computed again, possibly giving different results.
+        """
+        return self._call_java("trainingLogLikelihood")
+
+
+class DistributedLDAModel(LDAModel):
+    """
+    Distributed model fitted by :py:attr:`LDA`.
+    This type of model is currently only produced by Expectation-Maximization (EM).
+    This model stores the inferred topics, the full training dataset, and the topic distribution
+    for each training document.
+
+    .. versionadded:: 2.0.0
+    """
+
+    @since("2.0.0")
+    def toLocal(self):
+        """
+        Convert this distributed model to a local representation.
+        """
+        return LocalLDAModel(self._call_java("toLocal"))
+
+    @since("2.0.0")
+    def logPrior(self):
+        """
+        Log probability of the current parameter estimate:
+        log P(topics, topic distributions for docs | alpha, eta)
+        """
+        return self._call_java("logPrior")
+
+    @since("2.0.0")
+    def getCheckpointFiles(self):
+        """
+        If using checkpointing and :py:attr:`LDA.keepLastCheckpoint` is set to true, then there
+        may be saved checkpoint files. This method is provided so that users can manage those
+        files.
+
+        Note that removing the checkpoints can cause failures if a partition is lost and is needed
+        by certain :py:attr:`DistributedLDAModel` methods. Reference counting will clean up the
+        checkpoints when this model and derivative data go out of scope.
+        """
+        return self._call_java("getCheckpointFiles")
+
+
+class LocalLDAModel(LDAModel):
+    """
+    Local (non-distributed) model fitted by :py:attr:`LDA`.
+    This model stores the inferred topics only; it does not store info about the training dataset.
+
+    .. versionadded:: 2.0.0
+    """
+    pass
+
+
+class LDA(JavaEstimator, HasFeaturesCol, HasMaxIter, HasSeed, HasCheckpointInterval):
+    """
+    Latent Dirichlet Allocation (LDA), a topic model designed for text documents.
+
+    Terminology:
+
+     - "term" = "word": an element of the vocabulary
+     - "token": instance of a term appearing in a document
+     - "topic": multinomial distribution over terms representing some concept
+     - "document": one piece of text, corresponding to one row in the input data
+
+    References:
+
+     - Original LDA paper (journal version):
+       Blei, Ng, and Jordan. "Latent Dirichlet Allocation." JMLR, 2003.
+
+    Input data (featuresCol):
+    LDA is given a collection of documents as input data, via the featuresCol parameter.
+    Each document is specified as a :py:attr:`Vector` of length vocabSize, where each entry is the
+    count for the corresponding term (word) in the document. Feature transformers such as
+    :py:attr:`Tokenizer` and :py:attr:`CountVectorizer`
+    can be useful for converting text to word count vectors.
+
+    >>> from pyspark.mllib.linalg import Vectors, SparseVector
+    >>> from pyspark.ml.clustering import LDA
+    >>> df = sqlContext.createDataFrame([[1, Vectors.dense([0.0, 1.0])],
+    ...     [2, SparseVector(2, {0: 1.0})],], ["id", "features"])
+    >>> lda = LDA(k=2, seed=1, optimizer="em")
+    >>> model = lda.fit(df)
+    >>> model.isDistributed()
+    True
+    >>> localModel = model.toLocal()
+    >>> localModel.isDistributed()
+    False
+    >>> model.vocabSize()
+    2
+    >>> model.describeTopics().show()
+    +-----+-----------+--------------------+
+    |topic|termIndices|         termWeights|
+    +-----+-----------+--------------------+
+    |    0|     [1, 0]|[0.50401530077160...|
+    |    1|     [0, 1]|[0.50401530077160...|
+    +-----+-----------+--------------------+
+    ...
+    >>> model.topicsMatrix()
+    DenseMatrix(2, 2, [0.496, 0.504, 0.504, 0.496], 0)
+    >>> model.estimatedDocConcentration()
+    DenseVector([26.0, 26.0])
+
+    .. versionadded:: 2.0.0
+    """
+
+    k = Param(Params._dummy(), "k", "number of topics (clusters) to infer")
+    optimizer = Param(Params._dummy(), "optimizer",
+                      "Optimizer or inference algorithm used to estimate the LDA model. "
+                      "Supported: online, em")
+    learningOffset = Param(Params._dummy(), "learningOffset",
+                           "A (positive) learning parameter that downweights early iterations."
+                           " Larger values make early iterations count less")
+    learningDecay = Param(Params._dummy(), "learningDecay", "Learning rate, set as an "
+                          "exponential decay rate. This should be between (0.5, 1.0] to "
+                          "guarantee asymptotic convergence.")
+    subsamplingRate = Param(Params._dummy(), "subsamplingRate",
+                            "Fraction of the corpus to be sampled and used in each iteration "
+                            "of mini-batch gradient descent, in range (0, 1].")
+    optimizeDocConcentration = Param(Params._dummy(), "optimizeDocConcentration",
+                                     "Indicates whether the docConcentration (Dirichlet parameter "
+                                     "for document-topic distribution) will be optimized during "
+                                     "training.")
+    docConcentration = Param(Params._dummy(), "docConcentration",
+                             "Concentration parameter (commonly named \"alpha\") for the "
+                             "prior placed on documents' distributions over topics (\"theta\").")
+    topicConcentration = Param(Params._dummy(), "topicConcentration",
+                               "Concentration parameter (commonly named \"beta\" or \"eta\") for "
+                               "the prior placed on topics' distributions over terms.")
+    topicDistributionCol = Param(Params._dummy(), "topicDistributionCol",
+                                 "Output column with estimates of the topic mixture distribution "
+                                 "for each document (often called \"theta\" in the literature). "
+                                 "Returns a vector of zeros for an empty document.")
+
+    @keyword_only
+    def __init__(self, featuresCol="features", k=10,
+                 optimizer="online", learningOffset=1024.0, learningDecay=0.51,
+                 subsamplingRate=0.05, optimizeDocConcentration=True,
+                 checkpointInterval=10, maxIter=20, docConcentration=None,
+                 topicConcentration=None, topicDistributionCol="topicDistribution", seed=None):
+        """
+        __init__(self, featuresCol="features", k=10, \
+                 optimizer="online", learningOffset=1024.0, learningDecay=0.51, \
+                 subsamplingRate=0.05, optimizeDocConcentration=True, \
+                 checkpointInterval=10, maxIter=20, docConcentration=None, \
+                 topicConcentration=None, topicDistributionCol="topicDistribution", seed=None)
+        """
+        super(LDA, self).__init__()
+        self._java_obj = self._new_java_obj("org.apache.spark.ml.clustering.LDA", self.uid)
+        self._setDefault(k=10, optimizer="online", learningOffset=1024.0, learningDecay=0.51,
+                         subsamplingRate=0.05, optimizeDocConcentration=True,
+                         checkpointInterval=10, maxIter=20)
+        kwargs = self.__init__._input_kwargs
+        self.setParams(**kwargs)
+
+    def _create_model(self, java_model):
+        if self.getOptimizer() == "em":
+            return DistributedLDAModel(java_model)
+        else:
+            return LocalLDAModel(java_model)
+
+    @keyword_only
+    @since("2.0.0")
+    def setParams(self, featuresCol="features", k=10,
+                  optimizer="online", learningOffset=1024.0, learningDecay=0.51,
+                  subsamplingRate=0.05, optimizeDocConcentration=True,
+                  checkpointInterval=10, maxIter=20, docConcentration=None,
+                  topicConcentration=None,
+                  topicDistributionCol="topicDistribution", seed=None):
+        """
+        setParams(self, featuresCol="features", k=10, \
+                  optimizer="online", learningOffset=1024.0, learningDecay=0.51, \
+                  subsamplingRate=0.05, optimizeDocConcentration=True, \
+                  checkpointInterval=10, maxIter=20, docConcentration=None, \
+                  topicConcentration=None, \
+                  topicDistributionCol="topicDistribution", seed=None)
+
+        Sets params for LDA.
+        """
+        kwargs = self.setParams._input_kwargs
+        return self._set(**kwargs)
+
+    @since("2.0.0")
+    def setK(self, value):
+        """
+        Sets the value of :py:attr:`k`.
+
+        >>> algo = LDA().setK(10)
+        >>> algo.getK()
+        10
+        """
+        self._paramMap[self.k] = value
+        return self
+
+    @since("2.0.0")
+    def getK(self):
+        """
+        Gets the value of :py:attr:`k` or its default value.
+        """
+        return self.getOrDefault(self.k)
+
+    @since("2.0.0")
+    def setOptimizer(self, value):
+        """
+        Sets the value of :py:attr:`optimizer`.
+        Currently only supports 'em' and 'online'.
+
+        >>> algo = LDA().setOptimizer("em")
+        >>> algo.getOptimizer()
+        'em'
+        """
+        self._paramMap[self.optimizer] = value
+        return self
+
+    @since("2.0.0")
+    def getOptimizer(self):
+        """
+        Gets the value of :py:attr:`optimizer` or its default value.
+        """
+        return self.getOrDefault(self.optimizer)
+
+    @since("2.0.0")
+    def setLearningOffset(self, value):
+        """
+        Sets the value of :py:attr:`learningOffset`.
+
+        >>> algo = LDA().setLearningOffset(100)
+        >>> algo.getLearningOffset()
+        100
+        """
+        self._paramMap[self.learningOffset] = value
+        return self
+
+    @since("2.0.0")
+    def getLearningOffset(self):
+        """
+        Gets the value of :py:attr:`learningOffset` or its default value.
+        """
+        return self.getOrDefault(self.learningOffset)
+
+    @since("2.0.0")
+    def setLearningDecay(self, value):
+        """
+        Sets the value of :py:attr:`learningDecay`.
+
+        >>> algo = LDA().setLearningDecay(0.1)
+        >>> algo.getLearningDecay()
+        0.1...
+        """
+        self._paramMap[self.learningDecay] = value
+        return self
+
+    @since("2.0.0")
+    def getLearningDecay(self):
+        """
+        Gets the value of :py:attr:`learningDecay` or its default value.
+        """
+        return self.getOrDefault(self.learningDecay)
+
+    @since("2.0.0")
+    def setSubsamplingRate(self, value):
+        """
+        Sets the value of :py:attr:`subsamplingRate`.
+
+        >>> algo = LDA().setSubsamplingRate(0.1)
+        >>> algo.getSubsamplingRate()
+        0.1...
+        """
+        self._paramMap[self.subsamplingRate] = value
+        return self
+
+    @since("2.0.0")
+    def getSubsamplingRate(self):
+        """
+        Gets the value of :py:attr:`subsamplingRate` or its default value.
+        """
+        return self.getOrDefault(self.subsamplingRate)
+
+    @since("2.0.0")
+    def setOptimizeDocConcentration(self, value):
+        """
+        Sets the value of :py:attr:`optimizeDocConcentration`.
+
+        >>> algo = LDA().setOptimizeDocConcentration(True)
+        >>> algo.getOptimizeDocConcentration()
+        True
+        """
+        self._paramMap[self.optimizeDocConcentration] = value
+        return self
+
+    @since("2.0.0")
+    def getOptimizeDocConcentration(self):
+        """
+        Gets the value of :py:attr:`optimizeDocConcentration` or its default value.
+        """
+        return self.getOrDefault(self.optimizeDocConcentration)
+
+    @since("2.0.0")
+    def setDocConcentration(self, value):
+        """
+        Sets the value of :py:attr:`docConcentration`.
+
+        >>> algo = LDA().setDocConcentration([0.1, 0.2])
+        >>> algo.getDocConcentration()
+        [0.1..., 0.2...]
+        """
+        self._paramMap[self.docConcentration] = value
+        return self
+
+    @since("2.0.0")
+    def getDocConcentration(self):
+        """
+        Gets the value of :py:attr:`docConcentration` or its default value.
+        """
+        return self.getOrDefault(self.docConcentration)
+
+    @since("2.0.0")
+    def setTopicConcentration(self, value):
+        """
+        Sets the value of :py:attr:`topicConcentration`.
+
+        >>> algo = LDA().setTopicConcentration(0.5)
+        >>> algo.getTopicConcentration()
+        0.5...
+        """
+        self._paramMap[self.topicConcentration] = value
+        return self
+
+    @since("2.0.0")
+    def getTopicConcentration(self):
+        """
+        Gets the value of :py:attr:`topicConcentration` or its default value.
+        """
+        return self.getOrDefault(self.topicConcentration)
+
+    @since("2.0.0")
+    def setTopicDistributionCol(self, value):
+        """
+        Sets the value of :py:attr:`topicDistributionCol`.
+
+        >>> algo = LDA().setTopicDistributionCol("topicDistributionCol")
+        >>> algo.getTopicDistributionCol()
+        'topicDistributionCol'
+        """
+        self._paramMap[self.topicDistributionCol] = value
+        return self
+
+    @since("2.0.0")
+    def getTopicDistributionCol(self):
+        """
+        Gets the value of :py:attr:`topicDistributionCol` or its default value.
+        """
+        return self.getOrDefault(self.topicDistributionCol)
+
+
 if __name__ == "__main__":
     import doctest
     import pyspark.ml.clustering
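As a companion to the doctest in the LDA class docstring above, here is a rough end-to-end usage sketch (not part of the patch) showing how raw text can be turned into the term-count vectors the new Python LDA estimator expects, using the existing pyspark.ml.feature transformers Tokenizer and CountVectorizer. The column names, toy corpus, and parameter choices below are illustrative assumptions, and sqlContext is assumed to be available as in the doctest.

# Hypothetical usage sketch; "text", "words", "features" and the corpus are made up.
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.ml.clustering import LDA

docs = sqlContext.createDataFrame([
    (0, "spark implements latent dirichlet allocation"),
    (1, "topic models describe documents as mixtures of topics"),
], ["id", "text"])

# Tokenize, then build term-count vectors in the "features" column that LDA reads.
words = Tokenizer(inputCol="text", outputCol="words").transform(docs)
cvModel = CountVectorizer(inputCol="words", outputCol="features").fit(words)
counts = cvModel.transform(words)

lda = LDA(k=2, maxIter=10, optimizer="online")
model = lda.fit(counts)
model.describeTopics(3).show()    # top-weighted terms per topic
model.transform(counts).show()    # adds the topicDistribution column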