@@ -186,7 +186,6 @@ abstract class LDAModel private[clustering] extends Saveable {
* This model stores only the inferred topics.
* It may be used for computing topics for new documents, but it may give less accurate answers
* than the [[DistributedLDAModel]].
*
* @param topics Inferred topics (vocabSize x k matrix).
*/
@Experimental
@@ -221,9 +220,6 @@ class LocalLDAModel private[clustering] (
// TODO
// override def logLikelihood(documents: RDD[(Long, Vector)]): Double = ???

// TODO:
// override def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = ???

/**
* Calculate the log variational bound on perplexity. See Equation (16) in the original Online
* LDA paper.
@@ -269,15 +265,15 @@ class LocalLDAModel private[clustering] (
// by topic (columns of lambda)
val Elogbeta = LDAUtils.dirichletExpectation(lambda.t).t

var score = documents.filter(_._2.numActives > 0).map { case (id: Long, termCounts: Vector) =>
var score = documents.filter(_._2.numNonzeros > 0).map { case (id: Long, termCounts: Vector) =>
var docScore = 0.0D
val (gammad: BDV[Double], _) = OnlineLDAOptimizer.variationalTopicInference(
termCounts, exp(Elogbeta), brzAlpha, gammaShape, k)
val Elogthetad: BDV[Double] = LDAUtils.dirichletExpectation(gammad)

// E[log p(doc | theta, beta)]
termCounts.foreachActive { case (idx, count) =>
docScore += LDAUtils.logSumExp(Elogthetad + Elogbeta(idx, ::).t)
docScore += count * LDAUtils.logSumExp(Elogthetad + Elogbeta(idx, ::).t)
}
// E[log p(theta | alpha) - log q(theta | gamma)]; assumes alpha is a vector
docScore += sum((brzAlpha - gammad) :* Elogthetad)
@@ -297,6 +293,40 @@ class LocalLDAModel private[clustering] (
score
}
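For orientation, the per-document term that the loop above accumulates can be sketched as follows. This is reconstructed from the code's own comments, not quoted verbatim from Equation (16), so treat it as an approximation of the bound's form. With n_{dw} the count of term w in document d:

\mathcal{L}_d \;\approx\; \sum_{w} n_{dw} \,\log \sum_{k} \exp\!\big(\mathbb{E}[\log \theta_{dk}] + \mathbb{E}[\log \beta_{kw}]\big) \;+\; \mathbb{E}[\log p(\theta_d \mid \alpha)] - \mathbb{E}[\log q(\theta_d \mid \gamma_d)]

The first sum is the E[log p(doc | theta, beta)] term computed in foreachActive; the topic-level Dirichlet terms for beta are not part of this per-document loop.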

/**
* Predicts the topic mixture distribution for each document (often called "theta" in the
* literature). Returns a vector of zeros for an empty document.
*
* This uses a variational approximation following Hoffman et al. (2010), where the approximate
* distribution is called "gamma". Strictly speaking, this method returns this approximation
* "gamma" (normalized to sum to 1) for each document.
* @param documents documents to predict topic mixture distributions for
* @return An RDD of (document ID, topic mixture distribution for document)
*/
// TODO: declare in LDAModel and override once implemented in DistributedLDAModel
def topicDistributions(documents: RDD[(Long, Vector)]): RDD[(Long, Vector)] = {
// Double transpose because dirichletExpectation normalizes by row and we need to normalize
// by topic (columns of lambda)
val expElogbeta = exp(LDAUtils.dirichletExpectation(topicsMatrix.toBreeze.toDenseMatrix.t).t)
val docConcentrationBrz = this.docConcentration.toBreeze
val gammaShape = this.gammaShape
val k = this.k

documents.map { case (id: Long, termCounts: Vector) =>
if (termCounts.numNonzeros == 0) {
(id, Vectors.zeros(k))
} else {
val (gamma, _) = OnlineLDAOptimizer.variationalTopicInference(
termCounts,
expElogbeta,
docConcentrationBrz,
gammaShape,
k)
(id, Vectors.dense(normalize(gamma, 1.0).toArray))
}
}
}

}
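For context, a minimal, hypothetical usage sketch of the new topicDistributions method; the corpus RDD and the helper name below are illustrative and not part of this patch:

import org.apache.spark.mllib.clustering.LocalLDAModel
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

// corpus: RDD of (document ID, term-count vector), prepared elsewhere
def printTopicMixtures(model: LocalLDAModel, corpus: RDD[(Long, Vector)]): Unit = {
  val theta: RDD[(Long, Vector)] = model.topicDistributions(corpus)
  // take(5) just keeps the sample output small
  theta.take(5).foreach { case (docId, mixture) =>
    // each mixture sums to 1; empty documents come back as all zeros
    println(s"doc $docId -> ${mixture.toArray.mkString(", ")}")
  }
}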


@@ -394,7 +394,7 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
val gammaShape = this.gammaShape

val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
val nonEmptyDocs = docs.filter(_._2.numActives > 0)
val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)

val stat = BDM.zeros[Double](k, vocabSize)
var gammaPart = List[BDV[Double]]()
@@ -461,7 +461,8 @@ private[clustering] object OnlineLDAOptimizer {
private[clustering] object OnlineLDAOptimizer {
/**
* Uses variational inference to infer the topic distribution `gammad` given the term counts
* for a document. `termCounts` must be non-empty, otherwise Breeze will throw a BLAS error.
* for a document. `termCounts` must contain at least one non-zero entry, otherwise Breeze will
* throw a BLAS error.
*
* An optimization (Lee, Seung: Algorithms for non-negative matrix factorization, NIPS 2001)
* avoids explicit computation of variational parameter `phi`.
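A side note on the numActives -> numNonzeros changes in this patch: a SparseVector may explicitly store zero values, so numActives (number of stored entries) can be positive while numNonzeros (number of entries whose value is actually non-zero) is 0. A small illustrative check, assuming the standard mllib Vectors factory; not part of the patch:

import org.apache.spark.mllib.linalg.Vectors

// sparse vector whose two stored entries are both zero
val emptyDoc = Vectors.sparse(4, Array(1, 3), Array(0.0, 0.0))
assert(emptyDoc.numActives == 2)   // stored entries
assert(emptyDoc.numNonzeros == 0)  // truly non-zero entries; such a document must be filtered out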
@@ -242,6 +242,7 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
val alpha = 0.01
val eta = 0.01
val gammaShape = 100
// obtained from LDA model trained in gensim, see below
val topics = new DenseMatrix(numRows = vocabSize, numCols = k, values = Array(
1.86738052, 1.94056535, 1.89981687, 0.0833265, 0.07405918, 0.07940597,
0.15081551, 0.08637973, 0.12428538, 1.9474897, 1.94615165, 1.95204124))
@@ -281,6 +282,68 @@ class LDASuite extends SparkFunSuite with MLlibTestSparkContext {
assert(ldaModel.logPerplexity(docs) ~== -3.690D relTol 1E-3D)
}

test("LocalLDAModel predict") {
val k = 2
val vocabSize = 6
val alpha = 0.01
val eta = 0.01
val gammaShape = 100
// obtained from LDA model trained in gensim, see below
val topics = new DenseMatrix(numRows = vocabSize, numCols = k, values = Array(
1.86738052, 1.94056535, 1.89981687, 0.0833265, 0.07405918, 0.07940597,
0.15081551, 0.08637973, 0.12428538, 1.9474897, 1.94615165, 1.95204124))

def toydata: Array[(Long, Vector)] = Array(
Vectors.sparse(6, Array(0, 1), Array(1, 1)),
Vectors.sparse(6, Array(1, 2), Array(1, 1)),
Vectors.sparse(6, Array(0, 2), Array(1, 1)),
Vectors.sparse(6, Array(3, 4), Array(1, 1)),
Vectors.sparse(6, Array(3, 5), Array(1, 1)),
Vectors.sparse(6, Array(4, 5), Array(1, 1))
).zipWithIndex.map { case (wordCounts, docId) => (docId.toLong, wordCounts) }
val docs = sc.parallelize(toydata)

Contributor (review comment): extra line
Contributor Author: OK
val ldaModel: LocalLDAModel = new LocalLDAModel(
topics, Vectors.dense(Array.fill(k)(alpha)), eta, gammaShape)

/* Verify results using gensim:
import numpy as np
from gensim import models
corpus = [
[(0, 1.0), (1, 1.0)],
[(1, 1.0), (2, 1.0)],
[(0, 1.0), (2, 1.0)],
[(3, 1.0), (4, 1.0)],
[(3, 1.0), (5, 1.0)],
[(4, 1.0), (5, 1.0)]]
np.random.seed(2345)
lda = models.ldamodel.LdaModel(
corpus=corpus, alpha=0.01, eta=0.01, num_topics=2, update_every=0, passes=100,
decay=0.51, offset=1024)
print(list(lda.get_document_topics(corpus)))
> [[(0, 0.99504950495049516)], [(0, 0.99504950495049516)],
> [(0, 0.99504950495049516)], [(1, 0.99504950495049516)],
> [(1, 0.99504950495049516)], [(1, 0.99504950495049516)]]
*/

val expectedPredictions = List(
(0, 0.99504), (0, 0.99504),
(0, 0.99504), (1, 0.99504),
(1, 0.99504), (1, 0.99504))

val actualPredictions = ldaModel.topicDistributions(docs).map { case (id, topics) =>
// convert results to expectedPredictions format, which only has highest probability topic
val topicsBz = topics.toBreeze.toDenseVector
(id, (argmax(topicsBz), max(topicsBz)))
}.sortByKey()
.values
.collect()

// wrap in assert so a mismatch in the most likely topic actually fails the test
assert(expectedPredictions.zip(actualPredictions).forall { case (expected, actual) =>
expected._1 === actual._1 && (expected._2 ~== actual._2 relTol 1E-3D)
})
}

test("OnlineLDAOptimizer with asymmetric prior") {
def toydata: Array[(Long, Vector)] = Array(
Vectors.sparse(6, Array(0, 1), Array(1, 1)),