From 7af4945fbfb309f7a7784cba2b1fc4cb4945fba0 Mon Sep 17 00:00:00 2001
From: Syrux
Date: Sat, 8 Apr 2017 12:17:04 +0200
Subject: [PATCH 1/6] [SPARK-20265][MLlib] Improve PrefixSpan pre-processing efficiency

---
 .../src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 327cb974ef96..776e55cd37d9 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -174,8 +174,8 @@ class PrefixSpan private (
         if (result.nonEmpty) {
           containsFreqItems = true
           allItems ++= result.sorted
+          allItems += 0
         }
-        allItems += 0
       }
       if (containsFreqItems) {
         Iterator.single(allItems.result())

From 8e5db6af95545121d379dbca83bedc23cbd5e6c0 Mon Sep 17 00:00:00 2001
From: Syrux
Date: Sat, 8 Apr 2017 21:30:31 +0200
Subject: [PATCH 2/6] Separate the cleaning code into standalone methods + tests for cleaning correctness

---
 .../apache/spark/mllib/fpm/PrefixSpan.scala   | 100 ++++++++++++------
 .../spark/mllib/fpm/PrefixSpanSuite.scala     |  57 ++++++++++
 2 files changed, 122 insertions(+), 35 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 776e55cd37d9..3a3882fe30d5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -144,45 +144,13 @@ class PrefixSpan private (
     logInfo(s"minimum count for a frequent pattern: $minCount")
 
     // Find frequent items.
-    val freqItemAndCounts = data.flatMap { itemsets =>
-      val uniqItems = mutable.Set.empty[Item]
-      itemsets.foreach { _.foreach { item =>
-        uniqItems += item
-      }}
-      uniqItems.toIterator.map((_, 1L))
-    }.reduceByKey(_ + _)
-      .filter { case (_, count) =>
-        count >= minCount
-      }.collect()
-    val freqItems = freqItemAndCounts.sortBy(-_._2).map(_._1)
+    val freqItems = findFrequentItems(data, minCount)
     logInfo(s"number of frequent items: ${freqItems.length}")
 
     // Keep only frequent items from input sequences and convert them to internal storage.
     val itemToInt = freqItems.zipWithIndex.toMap
-    val dataInternalRepr = data.flatMap { itemsets =>
-      val allItems = mutable.ArrayBuilder.make[Int]
-      var containsFreqItems = false
-      allItems += 0
-      itemsets.foreach { itemsets =>
-        val items = mutable.ArrayBuilder.make[Int]
-        itemsets.foreach { item =>
-          if (itemToInt.contains(item)) {
-            items += itemToInt(item) + 1 // using 1-indexing in internal format
-          }
-        }
-        val result = items.result()
-        if (result.nonEmpty) {
-          containsFreqItems = true
-          allItems ++= result.sorted
-          allItems += 0
-        }
-      }
-      if (containsFreqItems) {
-        Iterator.single(allItems.result())
-      } else {
-        Iterator.empty
-      }
-    }.persist(StorageLevel.MEMORY_AND_DISK)
+    val dataInternalRepr = toDatabaseInternalRepr(data, itemToInt)
+      .persist(StorageLevel.MEMORY_AND_DISK)
 
     val results = genFreqPatterns(dataInternalRepr, minCount, maxPatternLength, maxLocalProjDBSize)
 
@@ -231,6 +199,68 @@ class PrefixSpan private (
 @Since("1.5.0")
 object PrefixSpan extends Logging {
 
+  /**
+   * This method finds all frequent items in an input dataset.
+   *
+   * @param data Sequences of itemsets.
+   * @param minCount The minimal number of sequences an item must appear in to be frequent.
+   *
+   * @return An array containing only the frequent items.
+ */ + private[fpm] def findFrequentItems[Item: ClassTag](data : RDD[Array[Array[Item]]], + minCount : Long): Array[Item] = { + + data.flatMap { itemsets => + val uniqItems = mutable.Set.empty[Item] + itemsets.foreach { _.foreach { item => + uniqItems += item + }} + uniqItems.toIterator.map((_, 1L)) + }.reduceByKey(_ + _).filter { case (_, count) => + count >= minCount + }.sortBy(-_._2).map(_._1).collect() + } + + /** + * This methods cleans the input dataset from un-frequent items, and translate it's item + * to their corresponding Int identifier. + * + * @param data Sequences of itemsets. + * @param itemToInt A map allowing translation of frequent Items to their Int Identifier. + * The map should only contain frequent item. + * + * @return The internal repr of the inputted dataset. With properly placed zero delimiter. + */ + private[fpm] def toDatabaseInternalRepr[Item: ClassTag](data : RDD[Array[Array[Item]]], + itemToInt : Map[Item, Int]): + RDD[Array[Int]] = { + + data.flatMap { itemsets => + val allItems = mutable.ArrayBuilder.make[Int] + var containsFreqItems = false + allItems += 0 + itemsets.foreach { itemsets => + val items = mutable.ArrayBuilder.make[Int] + itemsets.foreach { item => + if (itemToInt.contains(item)) { + items += itemToInt(item) + 1 // using 1-indexing in internal format + } + } + val result = items.result() + if (result.nonEmpty) { + containsFreqItems = true + allItems ++= result.sorted + allItems += 0 + } + } + if (containsFreqItems) { + Iterator.single(allItems.result()) + } else { + Iterator.empty + } + } + } + /** * Find the complete set of frequent sequential patterns in the input sequences. * @param data ordered sequences of itemsets. We represent a sequence internally as Array[Int], diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala index 4c2376376dd2..3fa703b8e362 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala @@ -360,6 +360,55 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { compareResults(expected, model.freqSequences.collect()) } + test("PrefixSpan pre-processing's cleaning test") { + + // One item per itemSet + val itemToInt1 = (4 to 5).zipWithIndex.toMap + val sequences1 = Seq( + Array(Array(4), Array(1), Array(2), Array(5), Array(2), Array(4), Array(5)), + Array(Array(6), Array(7), Array(8))) + val rdd1 = sc.parallelize(sequences1, 2).cache() + + val cleanedSequence1 = PrefixSpan.toDatabaseInternalRepr(rdd1, itemToInt1).collect() + + val expected1 = Array(Array(0, 4, 0, 5, 0, 4, 0, 5, 0)) + .map(x => x.map(y => { + if (y == 0) 0 + else itemToInt1(y) + 1 + })) + + compareInternalSequences(expected1, cleanedSequence1) + + // Multi-item sequence + val itemToInt2 = (4 to 6).zipWithIndex.toMap + val sequences2 = Seq( + Array(Array(4, 5), Array(1, 6, 2), Array(2), Array(5), Array(2), Array(4), Array(5, 6, 7)), + Array(Array(8, 9), Array(1, 2))) + val rdd2 = sc.parallelize(sequences2, 2).cache() + + val cleanedSequence2 = PrefixSpan.toDatabaseInternalRepr(rdd2, itemToInt2).collect() + + val expected2 = Array(Array(0, 4, 5, 0, 6, 0, 5, 0, 4, 0, 5, 6, 0)) + .map(x => x.map(y => { + if (y == 0) 0 + else itemToInt2(y) + 1 + })) + + compareInternalSequences(expected2, cleanedSequence2) + + // Emptied sequence + val itemToInt3 = (10 to 10).zipWithIndex.toMap + val sequences3 = Seq( + Array(Array(4, 5), Array(1, 6, 2), 
+        Array(2), Array(5), Array(2), Array(4), Array(5, 6, 7)),
+      Array(Array(8, 9), Array(1, 2)))
+    val rdd3 = sc.parallelize(sequences3, 2).cache()
+
+    val cleanedSequence3 = PrefixSpan.toDatabaseInternalRepr(rdd3, itemToInt3).collect()
+    val expected3: Array[Array[Int]] = Array()
+
+    compareInternalSequences(expected3, cleanedSequence3)
+  }
+
   test("model save/load") {
     val sequences = Seq(
       Array(Array(1, 2), Array(3)),
@@ -409,4 +458,12 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
     val actualSet = actualValue.map(x => (x._1.toSeq, x._2)).toSet
     assert(expectedSet === actualSet)
   }
+
+  private def compareInternalSequences(
+      expectedValue: Array[Array[Int]],
+      actualValue: Array[Array[Int]]): Unit = {
+    val expectedSet = expectedValue.map(x => x.toSeq).toSet
+    val actualSet = actualValue.map(x => x.toSeq).toSet
+    assert(expectedSet === actualSet)
+  }
 }

From 47bd983eebe44bc3f4b1f0c26a59ae3db507aeb0 Mon Sep 17 00:00:00 2001
From: Syrux
Date: Mon, 10 Apr 2017 16:11:32 +0200
Subject: [PATCH 3/6] Small corrections, done as requested

---
 .../org/apache/spark/mllib/fpm/PrefixSpan.scala  | 13 ++++++-------
 .../apache/spark/mllib/fpm/PrefixSpanSuite.scala | 12 +++---------
 2 files changed, 9 insertions(+), 16 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 3a3882fe30d5..ac94d7efd0d3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -207,14 +207,12 @@ object PrefixSpan extends Logging {
    *
    * @return An array containing only the frequent items.
    */
-  private[fpm] def findFrequentItems[Item: ClassTag](data : RDD[Array[Array[Item]]],
-                                                     minCount : Long): Array[Item] = {
+  private[fpm] def findFrequentItems[Item: ClassTag](data: RDD[Array[Array[Item]]],
+                                                     minCount: Long): Array[Item] = {
 
     data.flatMap { itemsets =>
       val uniqItems = mutable.Set.empty[Item]
-      itemsets.foreach { _.foreach { item =>
-        uniqItems += item
-      }}
+      itemsets.foreach ( _.foreach (item => uniqItems += item))
       uniqItems.toIterator.map((_, 1L))
     }.reduceByKey(_ + _).filter { case (_, count) =>
       count >= minCount
@@ -231,8 +229,9 @@ object PrefixSpan extends Logging {
    *
    * @return The internal representation of the input dataset, with properly placed zero delimiters.
    */
-  private[fpm] def toDatabaseInternalRepr[Item: ClassTag](data : RDD[Array[Array[Item]]],
-                                                          itemToInt : Map[Item, Int]):
+  private[fpm] def toDatabaseInternalRepr[Item: ClassTag](
+      data: RDD[Array[Array[Item]]],
+      itemToInt: Map[Item, Int]):
   RDD[Array[Int]] = {
 
     data.flatMap { itemsets =>
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
index 3fa703b8e362..c2e08d078fc1 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala
@@ -372,10 +372,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
     val cleanedSequence1 = PrefixSpan.toDatabaseInternalRepr(rdd1, itemToInt1).collect()
 
     val expected1 = Array(Array(0, 4, 0, 5, 0, 4, 0, 5, 0))
-      .map(x => x.map(y => {
-        if (y == 0) 0
-        else itemToInt1(y) + 1
-      }))
+      .map(_.map(x => if (x == 0) 0 else itemToInt1(x) + 1))
 
     compareInternalSequences(expected1, cleanedSequence1)
 
@@ -389,10 +386,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
     val cleanedSequence2 = PrefixSpan.toDatabaseInternalRepr(rdd2, itemToInt2).collect()
 
     val expected2 = Array(Array(0, 4, 5, 0, 6, 0, 5, 0, 4, 0, 5, 6, 0))
-      .map(x => x.map(y => {
-        if (y == 0) 0
-        else itemToInt2(y) + 1
-      }))
+      .map(_.map(x => if (x == 0) 0 else itemToInt2(x) + 1))
 
     compareInternalSequences(expected2, cleanedSequence2)
 
@@ -404,7 +398,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
     val rdd3 = sc.parallelize(sequences3, 2).cache()
 
     val cleanedSequence3 = PrefixSpan.toDatabaseInternalRepr(rdd3, itemToInt3).collect()
-    val expected3: Array[Array[Int]] = Array()
+    val expected3 = Array[Array[Int]]()
 
     compareInternalSequences(expected3, cleanedSequence3)
   }

From 25ece478326d2e220dfedaf1590689508cfcd9ac Mon Sep 17 00:00:00 2001
From: Syrux
Date: Mon, 10 Apr 2017 16:15:44 +0200
Subject: [PATCH 4/6] I had forgotten a four-space indent, fixed it

---
 .../main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index ac94d7efd0d3..87f410a0a58a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -207,8 +207,10 @@ object PrefixSpan extends Logging {
    *
    * @return An array containing only the frequent items.
    */
-  private[fpm] def findFrequentItems[Item: ClassTag](data: RDD[Array[Array[Item]]],
-                                                     minCount: Long): Array[Item] = {
+  private[fpm] def findFrequentItems[Item: ClassTag](
+      data: RDD[Array[Array[Item]]],
+      minCount: Long):
+  Array[Item] = {
 
     data.flatMap { itemsets =>
       val uniqItems = mutable.Set.empty[Item]

From 627bfe0b04180a9c7248df6a3af519f15261faa6 Mon Sep 17 00:00:00 2001
From: Syrux
Date: Mon, 10 Apr 2017 16:37:03 +0200
Subject: [PATCH 5/6] Changing to itemsets.foreach(set => uniqItems ++= set)

---
 .../src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 87f410a0a58a..4797ab1a8b5c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -214,7 +214,7 @@ object PrefixSpan extends Logging {
 
     data.flatMap { itemsets =>
       val uniqItems = mutable.Set.empty[Item]
-      itemsets.foreach ( _.foreach (item => uniqItems += item))
+      itemsets.foreach(set => uniqItems ++= set)
       uniqItems.toIterator.map((_, 1L))
     }.reduceByKey(_ + _).filter { case (_, count) =>
       count >= minCount

From d799d460e215c017b4385e8ecbbca8b92128096a Mon Sep 17 00:00:00 2001
From: Syrux
Date: Tue, 11 Apr 2017 10:23:15 +0200
Subject: [PATCH 6/6] Small changes in code style

---
 .../scala/org/apache/spark/mllib/fpm/PrefixSpan.scala | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 4797ab1a8b5c..3f8d65a378e2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -209,15 +209,14 @@ object PrefixSpan extends Logging {
    */
   private[fpm] def findFrequentItems[Item: ClassTag](
       data: RDD[Array[Array[Item]]],
-      minCount: Long):
-  Array[Item] = {
+      minCount: Long): Array[Item] = {
 
     data.flatMap { itemsets =>
       val uniqItems = mutable.Set.empty[Item]
       itemsets.foreach(set => uniqItems ++= set)
       uniqItems.toIterator.map((_, 1L))
     }.reduceByKey(_ + _).filter { case (_, count) =>
-      count >= minCount
+        count >= minCount
     }.sortBy(-_._2).map(_._1).collect()
   }
 
@@ -233,8 +232,7 @@ object PrefixSpan extends Logging {
    */
   private[fpm] def toDatabaseInternalRepr[Item: ClassTag](
       data: RDD[Array[Array[Item]]],
-      itemToInt: Map[Item, Int]):
-  RDD[Array[Int]] = {
+      itemToInt: Map[Item, Int]): RDD[Array[Int]] = {
 
     data.flatMap { itemsets =>
       val allItems = mutable.ArrayBuilder.make[Int]
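
Note: the following self-contained sketch mirrors the two helpers this series factors out, written against plain Scala collections instead of RDDs so it can run without a Spark cluster. The object and method names (PreprocessingDemo, frequentItems, clean) are illustrative only and are not part of the patch; only the zero-delimited, 1-indexed encoding is taken from the code above.

import scala.collection.mutable

object PreprocessingDemo {

  // Mirrors findFrequentItems: count, for each item, the number of sequences
  // containing it, keep the items meeting minCount, most frequent first.
  def frequentItems[Item](db: Seq[Array[Array[Item]]], minCount: Long): Seq[Item] = {
    db.flatMap(_.flatten.distinct)
      .groupBy(identity)
      .map { case (item, occurrences) => (item, occurrences.size.toLong) }
      .filter { case (_, count) => count >= minCount }
      .toSeq.sortBy(-_._2).map(_._1)
  }

  // Mirrors toDatabaseInternalRepr for a single sequence: drop infrequent items,
  // translate survivors to their 1-indexed Int ids, and delimit itemsets with 0.
  // Returns None when no frequent item survives (the sequence is filtered out).
  def clean[Item](sequence: Array[Array[Item]], itemToInt: Map[Item, Int]): Option[Array[Int]] = {
    val allItems = mutable.ArrayBuilder.make[Int]
    var containsFreqItems = false
    allItems += 0
    sequence.foreach { itemset =>
      val items = itemset.collect { case item if itemToInt.contains(item) => itemToInt(item) + 1 }
      if (items.nonEmpty) {
        containsFreqItems = true
        allItems ++= items.sorted
        allItems += 0
      }
    }
    if (containsFreqItems) Some(allItems.result()) else None
  }

  def main(args: Array[String]): Unit = {
    val db = Seq(
      Array(Array(4), Array(1), Array(2), Array(5), Array(2), Array(4), Array(5)),
      Array(Array(6), Array(4, 5), Array(8)))
    // Only items 4 and 5 occur in at least two sequences.
    println(s"items in >= 2 sequences: ${frequentItems(db, minCount = 2L).mkString(", ")}")

    val itemToInt = Map(4 -> 0, 5 -> 1) // fixed ids for a deterministic demo
    // Items 1 and 2 are dropped, 4 -> 1 and 5 -> 2 (1-indexed); prints: 0 1 0 2 0 1 0 2 0
    println(clean(db.head, itemToInt).map(_.mkString(" ")).getOrElse("<filtered out>"))
  }
}

This matches the behavior checked by the new test suite above: infrequent items vanish, each surviving itemset is sorted and terminated by a 0, the whole sequence starts with a 0, and a sequence that loses all its items is dropped entirely rather than emitted empty.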