99 changes: 64 additions & 35 deletions mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -144,45 +144,13 @@ class PrefixSpan private (
logInfo(s"minimum count for a frequent pattern: $minCount")

// Find frequent items.
-    val freqItemAndCounts = data.flatMap { itemsets =>
-      val uniqItems = mutable.Set.empty[Item]
-      itemsets.foreach { _.foreach { item =>
-        uniqItems += item
-      }}
-      uniqItems.toIterator.map((_, 1L))
-    }.reduceByKey(_ + _)
-      .filter { case (_, count) =>
-        count >= minCount
-      }.collect()
-    val freqItems = freqItemAndCounts.sortBy(-_._2).map(_._1)
+    val freqItems = findFrequentItems(data, minCount)
logInfo(s"number of frequent items: ${freqItems.length}")

// Keep only frequent items from input sequences and convert them to internal storage.
val itemToInt = freqItems.zipWithIndex.toMap
-    val dataInternalRepr = data.flatMap { itemsets =>
-      val allItems = mutable.ArrayBuilder.make[Int]
-      var containsFreqItems = false
-      allItems += 0
-      itemsets.foreach { itemsets =>
-        val items = mutable.ArrayBuilder.make[Int]
-        itemsets.foreach { item =>
-          if (itemToInt.contains(item)) {
-            items += itemToInt(item) + 1 // using 1-indexing in internal format
-          }
-        }
-        val result = items.result()
-        if (result.nonEmpty) {
-          containsFreqItems = true
-          allItems ++= result.sorted
-        }
-        allItems += 0
-      }
-      if (containsFreqItems) {
-        Iterator.single(allItems.result())
-      } else {
-        Iterator.empty
-      }
-    }.persist(StorageLevel.MEMORY_AND_DISK)
+    val dataInternalRepr = toDatabaseInternalRepr(data, itemToInt)
+      .persist(StorageLevel.MEMORY_AND_DISK)

    val results = genFreqPatterns(dataInternalRepr, minCount, maxPatternLength, maxLocalProjDBSize)

@@ -231,6 +199,67 @@ class PrefixSpan private (
@Since("1.5.0")
object PrefixSpan extends Logging {

+  /**
+   * This method finds all frequent items in an input dataset.
+   *
+   * @param data Sequences of itemsets.
+   * @param minCount The minimal number of sequences an item must appear in to be frequent.
+   *
+   * @return An array containing only the frequent items.
+   */
+  private[fpm] def findFrequentItems[Item: ClassTag](
+      data: RDD[Array[Array[Item]]],
+      minCount: Long): Array[Item] = {
+
+    data.flatMap { itemsets =>
+      val uniqItems = mutable.Set.empty[Item]
+      itemsets.foreach(set => uniqItems ++= set)
+      uniqItems.toIterator.map((_, 1L))
+    }.reduceByKey(_ + _).filter { case (_, count) =>
+      count >= minCount
+    }.sortBy(-_._2).map(_._1).collect()
+  }
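
For orientation, a minimal usage sketch of findFrequentItems (illustrative data, not part of the patch; assumes an existing SparkContext sc and fpm-package access, since the method is private[fpm]). Items are deduplicated per sequence, so an item occurring twice in one sequence still counts once:

    import org.apache.spark.rdd.RDD

    // Two sequences of itemsets over String items.
    val db: RDD[Array[Array[String]]] = sc.parallelize(Seq(
      Array(Array("a", "b"), Array("a")),   // unique items here: {a, b}
      Array(Array("b"), Array("c"))))       // unique items here: {b, c}

    // Per-sequence counts: a -> 1, b -> 2, c -> 1; with minCount = 2 only "b"
    // is frequent, and the result is sorted by descending count.
    val freq: Array[String] = PrefixSpan.findFrequentItems(db, minCount = 2)  // Array("b")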

+  /**
+   * This method removes infrequent items from the input dataset and translates each
+   * remaining item to its corresponding Int identifier.
+   *
+   * @param data Sequences of itemsets.
+   * @param itemToInt A map translating frequent items to their Int identifiers.
+   *                  The map should contain only frequent items.
+   *
+   * @return The internal representation of the input dataset, with properly placed
+   *         zero delimiters.
+   */
+  private[fpm] def toDatabaseInternalRepr[Item: ClassTag](
+      data: RDD[Array[Array[Item]]],
+      itemToInt: Map[Item, Int]): RDD[Array[Int]] = {
+
+    data.flatMap { itemsets =>
+      val allItems = mutable.ArrayBuilder.make[Int]
+      var containsFreqItems = false
+      allItems += 0
+      itemsets.foreach { itemset =>
+        val items = mutable.ArrayBuilder.make[Int]
+        itemset.foreach { item =>
+          if (itemToInt.contains(item)) {
+            items += itemToInt(item) + 1 // using 1-indexing in internal format
+          }
+        }
+        val result = items.result()
+        if (result.nonEmpty) {
+          containsFreqItems = true
+          allItems ++= result.sorted
+          allItems += 0
+        }
+      }
+      if (containsFreqItems) {
[Inline review thread on this line]

Member: Is this the same as checking allItems.size > 1 now, rather than maintaining a flag?

Syrux (Contributor Author, Apr 10, 2017): Yes, but allItems is an ArrayBuilder, so there is no size method. I could change it to "if (allItems.result().size > 1)", but I think the performance might be worse than a flag. If you still want me to change it, I will make a new commit.

Member: I see. What about waiting to prepend the initial 0 until the end, only if not empty?

Syrux: I am not sure about the performance of a prepend on ArrayBuilder. I will check first. Back in a few minutes.

Syrux: Apparently, prepending is impossible on an ArrayBuilder; the method doesn't exist (http://www.scala-lang.org/api/2.12.0/scala/collection/mutable/ArrayBuilder.html). I think the flag is our best bet for performance. Changing it to an ArrayBuffer would be far worse, since it would force boxing of the Ints it contains.

Member: OK, no problem, leave it. Just riffing while we're editing the code.

[End of review thread]
+        Iterator.single(allItems.result())
+      } else {
+        Iterator.empty
+      }
+    }
+  }
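
A self-contained sketch of the flag pattern debated in the thread above (illustrative names, not part of the patch): mutable.ArrayBuilder[Int] stores unboxed Ints but exposes neither a size method nor a prepend, so a boolean flag is the cheapest way to detect whether any frequent item was kept:

    import scala.collection.mutable

    // Encode one sequence of itemsets, keeping only items accepted by `keep`.
    // Returns None when nothing was kept, mirroring the Iterator.empty branch above.
    def encodeSequence(itemsets: Array[Array[Int]], keep: Int => Boolean): Option[Array[Int]] = {
      val out = mutable.ArrayBuilder.make[Int]
      var kept = false                 // the flag: ArrayBuilder has no size to test
      out += 0                         // leading delimiter
      itemsets.foreach { itemset =>
        val filtered = itemset.filter(keep).sorted
        if (filtered.nonEmpty) {
          kept = true
          out ++= filtered
          out += 0                     // delimiter after each non-empty itemset
        }
      }
      if (kept) Some(out.result()) else None
    }

For example, encodeSequence(Array(Array(5, 4), Array(1)), Set(4, 5)) yields Some(Array(0, 4, 5, 0)), while encodeSequence(Array(Array(1)), Set(4, 5)) yields None.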

  /**
   * Find the complete set of frequent sequential patterns in the input sequences.
   * @param data ordered sequences of itemsets. We represent a sequence internally as Array[Int],
mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -360,6 +360,49 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
    compareResults(expected, model.freqSequences.collect())
  }

test("PrefixSpan pre-processing's cleaning test") {

// One item per itemSet
val itemToInt1 = (4 to 5).zipWithIndex.toMap
val sequences1 = Seq(
Array(Array(4), Array(1), Array(2), Array(5), Array(2), Array(4), Array(5)),
Array(Array(6), Array(7), Array(8)))
val rdd1 = sc.parallelize(sequences1, 2).cache()

val cleanedSequence1 = PrefixSpan.toDatabaseInternalRepr(rdd1, itemToInt1).collect()

val expected1 = Array(Array(0, 4, 0, 5, 0, 4, 0, 5, 0))
.map(_.map(x => if (x == 0) 0 else itemToInt1(x) + 1))

compareInternalSequences(expected1, cleanedSequence1)

// Multi-item sequence
val itemToInt2 = (4 to 6).zipWithIndex.toMap
val sequences2 = Seq(
Array(Array(4, 5), Array(1, 6, 2), Array(2), Array(5), Array(2), Array(4), Array(5, 6, 7)),
Array(Array(8, 9), Array(1, 2)))
val rdd2 = sc.parallelize(sequences2, 2).cache()

val cleanedSequence2 = PrefixSpan.toDatabaseInternalRepr(rdd2, itemToInt2).collect()

val expected2 = Array(Array(0, 4, 5, 0, 6, 0, 5, 0, 4, 0, 5, 6, 0))
.map(_.map(x => if (x == 0) 0 else itemToInt2(x) + 1))

compareInternalSequences(expected2, cleanedSequence2)

// Emptied sequence
val itemToInt3 = (10 to 10).zipWithIndex.toMap
val sequences3 = Seq(
Array(Array(4, 5), Array(1, 6, 2), Array(2), Array(5), Array(2), Array(4), Array(5, 6, 7)),
Array(Array(8, 9), Array(1, 2)))
val rdd3 = sc.parallelize(sequences3, 2).cache()

val cleanedSequence3 = PrefixSpan.toDatabaseInternalRepr(rdd3, itemToInt3).collect()
val expected3 = Array[Array[Int]]()

compareInternalSequences(expected3, cleanedSequence3)
}

test("model save/load") {
val sequences = Seq(
Array(Array(1, 2), Array(3)),
@@ -409,4 +452,12 @@
    val actualSet = actualValue.map(x => (x._1.toSeq, x._2)).toSet
    assert(expectedSet === actualSet)
  }

+  private def compareInternalSequences(
+      expectedValue: Array[Array[Int]],
+      actualValue: Array[Array[Int]]): Unit = {
+    val expectedSet = expectedValue.map(x => x.toSeq).toSet
+    val actualSet = actualValue.map(x => x.toSeq).toSet
+    assert(expectedSet === actualSet)
+  }
}
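
For context, the public entry point exercised by these tests follows the usual mllib pattern; a minimal end-to-end sketch, assuming an existing SparkContext sc (this mirrors the standard PrefixSpan usage rather than anything added by this patch):

    import org.apache.spark.mllib.fpm.PrefixSpan

    val sequences = sc.parallelize(Seq(
      Array(Array(1, 2), Array(3)),
      Array(Array(1), Array(3, 2), Array(1, 2))), 2).cache()

    val prefixSpan = new PrefixSpan()
      .setMinSupport(0.5)
      .setMaxPatternLength(5)
    val model = prefixSpan.run(sequences)

    // Each frequent sequence is an Array[Array[Int]] plus its frequency.
    model.freqSequences.collect().foreach { fs =>
      println(fs.sequence.map(_.mkString("(", ",", ")")).mkString("<", "", ">") + ", " + fs.freq)
    }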