From 91fd7e66d0c363e68bc9ebe2bf3e03c26ef348d2 Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Tue, 7 Jul 2015 15:30:10 +0800 Subject: [PATCH 01/25] Add new algorithm PrefixSpan and test file. --- .../apache/spark/mllib/fpm/Prefixspan.scala | 183 ++++++++++++++++++ .../spark/mllib/fpm/PrefixspanSuite.scala | 47 +++++ 2 files changed, 230 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixspanSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala new file mode 100644 index 0000000000000..c110a37fce169 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.fpm + +import org.apache.spark.rdd.RDD + +/** + * + * A parallel PrefixSpan algorithm to mine sequential pattern. + * The PrefixSpan algorithm is described in + * [[http://web.engr.illinois.edu/~hanj/pdf/span01.pdf]]. 
+ * + * @param sequences original sequences data + * @param minSupport the minimal support level of the sequential pattern, any pattern appears + * more than minSupport times will be output + * @param maxPatternLength the maximal length of the sequential pattern, any pattern appears + * less than maxPatternLength will be output + * + * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining + * (Wikipedia)]] + */ +class Prefixspan( + val sequences: RDD[Array[Int]], + val minSupport: Int = 2, + val maxPatternLength: Int = 50) extends java.io.Serializable { + + /** + * Calculate sequential patterns: + * a) find and collect length-one patterns + * b) for each length-one patterns and each sequence, + * emit (pattern (prefix), suffix sequence) as key-value pairs + * c) group by key and then map value iterator to array + * d) local PrefixSpan on each prefix + * @return sequential patterns + */ + def run(): RDD[(Seq[Int], Int)] = { + val (patternsOneLength, prefixAndCandidates) = findPatternsLengthOne() + val repartitionedRdd = repartitionSequences(prefixAndCandidates) + val nextPatterns = getPatternsInLocal(repartitionedRdd) + val allPatterns = patternsOneLength.map(x => (Seq(x._1), x._2)) ++ nextPatterns + allPatterns + } + + /** + * Find the patterns that it's length is one + * @return length-one patterns and projection table + */ + private def findPatternsLengthOne(): (RDD[(Int, Int)], RDD[(Seq[Int], Array[Int])]) = { + val patternsOneLength = sequences + .map(_.distinct) + .flatMap(p => p) + .map((_, 1)) + .reduceByKey(_ + _) + + val removedElements: Array[Int] = patternsOneLength + .filter(_._2 < minSupport) + .map(_._1) + .collect() + + val savedElements = patternsOneLength.filter(_._2 >= minSupport) + + val savedElementsArray = savedElements + .map(_._1) + .collect() + + val filteredSequences = + if (removedElements.isEmpty) { + sequences + } else { + sequences.map { p => + p.filter { x => !removedElements.contains(x) } + } + } + + val prefixAndCandidates = filteredSequences.flatMap { x => + savedElementsArray.map { y => + val sub = getSuffix(y, x) + (Seq(y), sub) + } + } + + (savedElements, prefixAndCandidates) + } + + /** + * Re-partition the RDD data, to get better balance and performance. + * @param data patterns and projected sequences data before re-partition + * @return patterns and projected sequences data after re-partition + */ + private def repartitionSequences( + data: RDD[(Seq[Int], Array[Int])]): RDD[(Seq[Int], Array[Array[Int]])] = { + val dataRemovedEmptyLine = data.filter(x => x._2.nonEmpty) + val dataMerged = dataRemovedEmptyLine + .groupByKey() + .map(x => (x._1, x._2.toArray)) + dataMerged + } + + /** + * calculate the patterns in local. + * @param data patterns and projected sequences data data + * @return patterns + */ + private def getPatternsInLocal( + data: RDD[(Seq[Int], Array[Array[Int]])]): RDD[(Seq[Int], Int)] = { + val result = data.flatMap { x => + getPatternsWithPrefix(x._1, x._2) + } + result + } + + /** + * calculate the patterns with one prefix in local. 
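+ * Editor's sketch (not part of the original patch) of how this first-cut API is
+ * meant to be called; `sc` below is an assumed SparkContext:
+ * {{{
+ *   val sequences = sc.parallelize(Seq(
+ *     Array(1, 3, 4, 5),
+ *     Array(2, 3, 1)), 2).cache()
+ *   val patterns = new Prefixspan(sequences, minSupport = 2, maxPatternLength = 50).run()
+ *   patterns.collect()  // Array[(Seq[Int], Int)] of (pattern, count) pairs
+ * }}}
+ *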
+   * @param prefix the current pattern prefix
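+   *               (editor's illustration, not in the original patch: projecting the
+   *               sequence Array(3, 1, 3, 4, 5) onto the prefix Seq(3) yields the
+   *               suffix Array(1, 3, 4, 5), i.e. everything after the first
+   *               occurrence of 3)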
+ */ +package org.apache.spark.mllib.fpm + +import org.apache.spark.SparkFunSuite +import org.apache.spark.mllib.util.MLlibTestSparkContext + +class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { + + test("Prefixspan sequences mining using Integer type") { + val sequences = Array( + Array(3, 1, 3, 4, 5), + Array(2, 3, 1), + Array(3, 4, 4, 3), + Array(1, 3, 4, 5), + Array(2, 4, 1), + Array(6, 5, 3)) + + val rdd = sc.parallelize(sequences, 2).cache() + + val prefixspan1 = new Prefixspan(rdd, 2, 50) + val result1 = prefixspan1.run() + assert(result1.count() == 19) + + val prefixspan2 = new Prefixspan(rdd, 3, 50) + val result2 = prefixspan2.run() + assert(result2.count() == 5) + + val prefixspan3 = new Prefixspan(rdd, 2, 2) + val result3 = prefixspan3.run() + assert(result3.count() == 14) + } +} From 575995f69dadad825d97f2248599eb62c1743fe7 Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Wed, 8 Jul 2015 17:07:37 +0800 Subject: [PATCH 02/25] Modified the code according to the review comments. --- .../apache/spark/mllib/fpm/PrefixSpan.scala | 209 ++++++++++++++++++ .../spark/mllib/fpm/PrefixSpanSuite.scala | 69 ++++++ 2 files changed, 278 insertions(+) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala create mode 100644 mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala new file mode 100644 index 0000000000000..70218e2742da0 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -0,0 +1,209 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.fpm + +import org.apache.spark.annotation.Experimental +import org.apache.spark.rdd.RDD + +/** + * + * :: Experimental :: + * + * A parallel PrefixSpan algorithm to mine sequential pattern. + * The PrefixSpan algorithm is described in + * [[http://doi.org/10.1109/ICDE.2001.914830]]. + * + * @param minSupport the minimal support level of the sequential pattern, any pattern appears + * more than (minSupport * size-of-the-dataset) times will be output + * @param maxPatternLength the maximal length of the sequential pattern, any pattern appears + * less than maxPatternLength will be output + * + * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining + * (Wikipedia)]] + */ +@Experimental +class PrefixSpan( + private var minSupport: Double, + private var maxPatternLength: Int) extends java.io.Serializable { + + private var absMinSupport: Int = 0 + + /** + * Constructs a default instance with default parameters + * {minSupport: `0.1`, maxPatternLength: 10}. 
+ */ + def this() = this(0.1, 10) + + /** + * Sets the minimal support level (default: `0.1`). + */ + def setMinSupport(minSupport: Double): this.type = { + this.minSupport = minSupport + this + } + + /** + * Sets maximal pattern length. + */ + def setMaxPatternLength(maxPatternLength: Int): this.type = { + this.maxPatternLength = maxPatternLength + this + } + + /** + * Calculate sequential patterns: + * a) find and collect length-one patterns + * b) for each length-one patterns and each sequence, + * emit (pattern (prefix), suffix sequence) as key-value pairs + * c) group by key and then map value iterator to array + * d) local PrefixSpan on each prefix + * @return sequential patterns + */ + def run(sequences: RDD[Array[Int]]): RDD[(Seq[Int], Int)] = { + absMinSupport = getAbsoluteMinSupport(sequences) + val (lengthOnePatternsAndCounts, prefixAndCandidates) = + findLengthOnePatterns(sequences) + val repartitionedRdd = makePrefixProjectedDatabases(prefixAndCandidates) + val nextPatterns = getPatternsInLocal(repartitionedRdd) + val allPatterns = lengthOnePatternsAndCounts.map(x => (Seq(x._1), x._2)) ++ nextPatterns + allPatterns + } + + private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Int = { + val result = if (minSupport <= 0) { + 0 + }else { + val count = sequences.count() + val support = if (minSupport <= 1) minSupport else 1 + (support * count).toInt + } + result + } + + /** + * Find the patterns that it's length is one + * @param sequences original sequences data + * @return length-one patterns and projection table + */ + private def findLengthOnePatterns( + sequences: RDD[Array[Int]]): (RDD[(Int, Int)], RDD[(Seq[Int], Array[Int])]) = { + val LengthOnePatternAndCounts = sequences + .flatMap(_.distinct.map((_, 1))) + .reduceByKey(_ + _) + val infrequentLengthOnePatterns: Array[Int] = LengthOnePatternAndCounts + .filter(_._2 < absMinSupport) + .map(_._1) + .collect() + val frequentLengthOnePatterns = LengthOnePatternAndCounts + .filter(_._2 >= absMinSupport) + val frequentLengthOnePatternsArray = frequentLengthOnePatterns + .map(_._1) + .collect() + val filteredSequences = + if (infrequentLengthOnePatterns.isEmpty) { + sequences + } else { + sequences.map { p => + p.filter { x => !infrequentLengthOnePatterns.contains(x) } + } + } + val prefixAndCandidates = filteredSequences.flatMap { x => + frequentLengthOnePatternsArray.map { y => + val sub = getSuffix(y, x) + (Seq(y), sub) + } + }.filter(x => x._2.nonEmpty) + (frequentLengthOnePatterns, prefixAndCandidates) + } + + /** + * Re-partition the RDD data, to get better balance and performance. + * @param data patterns and projected sequences data before re-partition + * @return patterns and projected sequences data after re-partition + */ + private def makePrefixProjectedDatabases( + data: RDD[(Seq[Int], Array[Int])]): RDD[(Seq[Int], Array[Array[Int]])] = { + val dataMerged = data + .groupByKey() + .mapValues(_.toArray) + dataMerged + } + + /** + * calculate the patterns in local. + * @param data patterns and projected sequences data data + * @return patterns + */ + private def getPatternsInLocal( + data: RDD[(Seq[Int], Array[Array[Int]])]): RDD[(Seq[Int], Int)] = { + val result = data.flatMap { x => + getPatternsWithPrefix(x._1, x._2) + } + result + } + + /** + * calculate the patterns with one prefix in local. 
+ * @param prefix prefix + * @param projectedDatabase patterns and projected sequences data + * @return patterns + */ + private def getPatternsWithPrefix( + prefix: Seq[Int], + projectedDatabase: Array[Array[Int]]): Array[(Seq[Int], Int)] = { + val prefixAndCounts = projectedDatabase + .flatMap(_.distinct) + .groupBy(x => x) + .mapValues(_.length) + val frequentPrefixExtensions = prefixAndCounts.filter(x => x._2 >= absMinSupport) + val frequentPrefixesAndCounts = frequentPrefixExtensions + .map(x => (prefix ++ Seq(x._1), x._2)) + .toArray + val cleanedSearchSpace = projectedDatabase + .map(x => x.filter(y => frequentPrefixExtensions.contains(y))) + val prefixProjectedDatabases = frequentPrefixExtensions.map { x => + val sub = cleanedSearchSpace.map(y => getSuffix(x._1, y)).filter(_.nonEmpty) + (prefix ++ Seq(x._1), sub) + }.filter(x => x._2.nonEmpty) + .toArray + val continueProcess = prefixProjectedDatabases.nonEmpty && prefix.length + 1 < maxPatternLength + if (continueProcess) { + val nextPatterns = prefixProjectedDatabases + .map(x => getPatternsWithPrefix(x._1, x._2)) + .reduce(_ ++ _) + frequentPrefixesAndCounts ++ nextPatterns + } else { + frequentPrefixesAndCounts + } + } + + /** + * calculate suffix sequence following a prefix in a sequence + * @param prefix prefix + * @param sequence original sequence + * @return suffix sequence + */ + private def getSuffix(prefix: Int, sequence: Array[Int]): Array[Int] = { + val index = sequence.indexOf(prefix) + if (index == -1) { + Array() + } else { + sequence.drop(index + 1) + } + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala new file mode 100644 index 0000000000000..7796f1298891d --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
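+   *               (editor's note, not in the original patch: each recursive call
+   *               extends this prefix by one frequent item, and the recursion stops
+   *               once prefix.length + 1 reaches maxPatternLength)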
+ */ +package org.apache.spark.mllib.fpm + +import org.apache.spark.SparkFunSuite +import org.apache.spark.mllib.util.MLlibTestSparkContext +import org.apache.spark.rdd.RDD + +class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { + + test("Prefixspan sequences mining using Integer type") { + val sequences = Array( + Array(3, 1, 3, 4, 5), + Array(2, 3, 1), + Array(3, 4, 4, 3), + Array(1, 3, 4, 5), + Array(2, 4, 1), + Array(6, 5, 3)) + + val rdd = sc.parallelize(sequences, 2).cache() + + def formatResultString(data: RDD[(Seq[Int], Int)]): String = { + data.map(x => x._1.mkString(",") + ": " + x._2) + .collect() + .sortWith(_<_) + .mkString("; ") + } + + val prefixspan = new PrefixSpan() + .setMinSupport(0.34) + .setMaxPatternLength(50) + val result1 = prefixspan.run(rdd) + val len1 = result1.count().toInt + val actualValue1 = formatResultString(result1) + val expectedValue1 = + "1,3,4,5: 2; 1,3,4: 2; 1,3,5: 2; 1,3: 2; 1,4,5: 2;" + + " 1,4: 2; 1,5: 2; 1: 4; 2,1: 2; 2: 2; 3,1: 2; 3,3: 2;" + + " 3,4,5: 2; 3,4: 3; 3,5: 2; 3: 5; 4,5: 2; 4: 4; 5: 3" + assert(expectedValue1 == actualValue1) + + prefixspan.setMinSupport(0.5).setMaxPatternLength(50) + val result2 = prefixspan.run(rdd) + val expectedValue2 = "1: 4; 3,4: 3; 3: 5; 4: 4; 5: 3" + val actualValue2 = formatResultString(result2) + assert(expectedValue2 == actualValue2) + + prefixspan.setMinSupport(0.34).setMaxPatternLength(2) + val result3 = prefixspan.run(rdd) + val actualValue3 = formatResultString(result3) + val expectedValue3 = + "1,3: 2; 1,4: 2; 1,5: 2; 1: 4; 2,1: 2; 2: 2; 3,1: 2;" + + " 3,3: 2; 3,4: 3; 3,5: 2; 3: 5; 4,5: 2; 4: 4; 5: 3" + assert(expectedValue3 == actualValue3) + } +} From 951fd424ff189f9bf5619a84f3f19e942f592396 Mon Sep 17 00:00:00 2001 From: zhang jiajin Date: Wed, 8 Jul 2015 18:22:16 +0800 Subject: [PATCH 03/25] Delete Prefixspan.scala Use PrefixSpan.scala instead of Prefixspan.scala. Delete Prefixspan.scala --- .../apache/spark/mllib/fpm/Prefixspan.scala | 183 ------------------ 1 file changed, 183 deletions(-) delete mode 100644 mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala deleted file mode 100644 index c110a37fce169..0000000000000 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/Prefixspan.scala +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.mllib.fpm - -import org.apache.spark.rdd.RDD - -/** - * - * A parallel PrefixSpan algorithm to mine sequential pattern. - * The PrefixSpan algorithm is described in - * [[http://web.engr.illinois.edu/~hanj/pdf/span01.pdf]]. 
- * - * @param sequences original sequences data - * @param minSupport the minimal support level of the sequential pattern, any pattern appears - * more than minSupport times will be output - * @param maxPatternLength the maximal length of the sequential pattern, any pattern appears - * less than maxPatternLength will be output - * - * @see [[https://en.wikipedia.org/wiki/Sequential_Pattern_Mining Sequential Pattern Mining - * (Wikipedia)]] - */ -class Prefixspan( - val sequences: RDD[Array[Int]], - val minSupport: Int = 2, - val maxPatternLength: Int = 50) extends java.io.Serializable { - - /** - * Calculate sequential patterns: - * a) find and collect length-one patterns - * b) for each length-one patterns and each sequence, - * emit (pattern (prefix), suffix sequence) as key-value pairs - * c) group by key and then map value iterator to array - * d) local PrefixSpan on each prefix - * @return sequential patterns - */ - def run(): RDD[(Seq[Int], Int)] = { - val (patternsOneLength, prefixAndCandidates) = findPatternsLengthOne() - val repartitionedRdd = repartitionSequences(prefixAndCandidates) - val nextPatterns = getPatternsInLocal(repartitionedRdd) - val allPatterns = patternsOneLength.map(x => (Seq(x._1), x._2)) ++ nextPatterns - allPatterns - } - - /** - * Find the patterns that it's length is one - * @return length-one patterns and projection table - */ - private def findPatternsLengthOne(): (RDD[(Int, Int)], RDD[(Seq[Int], Array[Int])]) = { - val patternsOneLength = sequences - .map(_.distinct) - .flatMap(p => p) - .map((_, 1)) - .reduceByKey(_ + _) - - val removedElements: Array[Int] = patternsOneLength - .filter(_._2 < minSupport) - .map(_._1) - .collect() - - val savedElements = patternsOneLength.filter(_._2 >= minSupport) - - val savedElementsArray = savedElements - .map(_._1) - .collect() - - val filteredSequences = - if (removedElements.isEmpty) { - sequences - } else { - sequences.map { p => - p.filter { x => !removedElements.contains(x) } - } - } - - val prefixAndCandidates = filteredSequences.flatMap { x => - savedElementsArray.map { y => - val sub = getSuffix(y, x) - (Seq(y), sub) - } - } - - (savedElements, prefixAndCandidates) - } - - /** - * Re-partition the RDD data, to get better balance and performance. - * @param data patterns and projected sequences data before re-partition - * @return patterns and projected sequences data after re-partition - */ - private def repartitionSequences( - data: RDD[(Seq[Int], Array[Int])]): RDD[(Seq[Int], Array[Array[Int]])] = { - val dataRemovedEmptyLine = data.filter(x => x._2.nonEmpty) - val dataMerged = dataRemovedEmptyLine - .groupByKey() - .map(x => (x._1, x._2.toArray)) - dataMerged - } - - /** - * calculate the patterns in local. - * @param data patterns and projected sequences data data - * @return patterns - */ - private def getPatternsInLocal( - data: RDD[(Seq[Int], Array[Array[Int]])]): RDD[(Seq[Int], Int)] = { - val result = data.flatMap { x => - getPatternsWithPrefix(x._1, x._2) - } - result - } - - /** - * calculate the patterns with one prefix in local. 
-   * @param prefix the current pattern prefix
- */ -package org.apache.spark.mllib.fpm - -import org.apache.spark.SparkFunSuite -import org.apache.spark.mllib.util.MLlibTestSparkContext - -class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { - - test("Prefixspan sequences mining using Integer type") { - val sequences = Array( - Array(3, 1, 3, 4, 5), - Array(2, 3, 1), - Array(3, 4, 4, 3), - Array(1, 3, 4, 5), - Array(2, 4, 1), - Array(6, 5, 3)) - - val rdd = sc.parallelize(sequences, 2).cache() - - val prefixspan1 = new Prefixspan(rdd, 2, 50) - val result1 = prefixspan1.run() - assert(result1.count() == 19) - - val prefixspan2 = new Prefixspan(rdd, 3, 50) - val result2 = prefixspan2.run() - assert(result2.count() == 5) - - val prefixspan3 = new Prefixspan(rdd, 2, 2) - val result3 = prefixspan3.run() - assert(result3.count() == 14) - } -} From 89bc368f76c40ad0090a928cec49cd9d28ce666e Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Wed, 8 Jul 2015 18:50:38 +0800 Subject: [PATCH 05/25] Fixed a Scala style error. --- .../main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 70218e2742da0..4a78a25f39a40 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -84,10 +84,10 @@ class PrefixSpan( allPatterns } - private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Int = { + private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Int = { val result = if (minSupport <= 0) { 0 - }else { + } else { val count = sequences.count() val support = if (minSupport <= 1) minSupport else 1 (support * count).toInt From 1dd33ad82499b9ad1b446b96f2f88519ffbe9a1b Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Thu, 9 Jul 2015 22:40:29 +0800 Subject: [PATCH 06/25] Modified the code according to the review comments. --- .../apache/spark/mllib/fpm/PrefixSpan.scala | 211 +++++++++++------- .../spark/mllib/fpm/PrefixSpanSuite.scala | 98 ++++++-- 2 files changed, 201 insertions(+), 108 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 4a78a25f39a40..05f8c4186aaf6 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -17,8 +17,10 @@ package org.apache.spark.mllib.fpm +import org.apache.spark.Logging import org.apache.spark.annotation.Experimental import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel /** * @@ -37,15 +39,13 @@ import org.apache.spark.rdd.RDD * (Wikipedia)]] */ @Experimental -class PrefixSpan( +class PrefixSpan private ( private var minSupport: Double, - private var maxPatternLength: Int) extends java.io.Serializable { - - private var absMinSupport: Int = 0 + private var maxPatternLength: Int) extends Logging with Serializable { /** * Constructs a default instance with default parameters - * {minSupport: `0.1`, maxPatternLength: 10}. + * {minSupport: `0.1`, maxPatternLength: `10`}. */ def this() = this(0.1, 10) @@ -53,149 +53,192 @@ class PrefixSpan( * Sets the minimal support level (default: `0.1`). */ def setMinSupport(minSupport: Double): this.type = { + require(minSupport >= 0 && minSupport <= 1) this.minSupport = minSupport this } /** - * Sets maximal pattern length. 
+ * Sets maximal pattern length (default: `10`). */ def setMaxPatternLength(maxPatternLength: Int): this.type = { + require(maxPatternLength >= 1) this.maxPatternLength = maxPatternLength this } /** - * Calculate sequential patterns: - * a) find and collect length-one patterns - * b) for each length-one patterns and each sequence, - * emit (pattern (prefix), suffix sequence) as key-value pairs - * c) group by key and then map value iterator to array - * d) local PrefixSpan on each prefix - * @return sequential patterns + * Find the complete set of sequential patterns in the input sequences. + * @param sequences input data set, contains a set of sequences, + * a sequence is an ordered list of elements. + * @return a set of sequential pattern pairs, + * the key of pair is pattern (a list of elements), + * the value of pair is the pattern's support value. */ - def run(sequences: RDD[Array[Int]]): RDD[(Seq[Int], Int)] = { - absMinSupport = getAbsoluteMinSupport(sequences) + def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = { + if (sequences.getStorageLevel == StorageLevel.NONE) { + logWarning("Input data is not cached.") + } + val minCount = getAbsoluteMinSupport(sequences) val (lengthOnePatternsAndCounts, prefixAndCandidates) = - findLengthOnePatterns(sequences) + findLengthOnePatterns(minCount, sequences) val repartitionedRdd = makePrefixProjectedDatabases(prefixAndCandidates) - val nextPatterns = getPatternsInLocal(repartitionedRdd) - val allPatterns = lengthOnePatternsAndCounts.map(x => (Seq(x._1), x._2)) ++ nextPatterns + val nextPatterns = getPatternsInLocal(minCount, repartitionedRdd) + val allPatterns = lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)) ++ nextPatterns allPatterns } - private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Int = { - val result = if (minSupport <= 0) { - 0 - } else { - val count = sequences.count() - val support = if (minSupport <= 1) minSupport else 1 - (support * count).toInt - } - result + /** + * Get the absolute minimum support value (sequences count * minSupport). + * @param sequences input data set, contains a set of sequences, + * @return absolute minimum support value, + */ + private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Long = { + if (minSupport == 0) 0L else (sequences.count() * minSupport).toLong } /** - * Find the patterns that it's length is one + * Generates frequent items by filtering the input data using minimal support level. 
+ * @param minCount the absolute minimum support * @param sequences original sequences data - * @return length-one patterns and projection table + * @return array of frequent pattern ordered by their frequencies */ - private def findLengthOnePatterns( - sequences: RDD[Array[Int]]): (RDD[(Int, Int)], RDD[(Seq[Int], Array[Int])]) = { - val LengthOnePatternAndCounts = sequences - .flatMap(_.distinct.map((_, 1))) + private def getFreqItemAndCounts( + minCount: Long, + sequences: RDD[Array[Int]]): RDD[(Int, Long)] = { + sequences.flatMap(_.distinct.map((_, 1L))) .reduceByKey(_ + _) - val infrequentLengthOnePatterns: Array[Int] = LengthOnePatternAndCounts - .filter(_._2 < absMinSupport) - .map(_._1) - .collect() - val frequentLengthOnePatterns = LengthOnePatternAndCounts - .filter(_._2 >= absMinSupport) - val frequentLengthOnePatternsArray = frequentLengthOnePatterns - .map(_._1) - .collect() - val filteredSequences = - if (infrequentLengthOnePatterns.isEmpty) { - sequences - } else { - sequences.map { p => - p.filter { x => !infrequentLengthOnePatterns.contains(x) } - } - } - val prefixAndCandidates = filteredSequences.flatMap { x => - frequentLengthOnePatternsArray.map { y => + .filter(_._2 >= minCount) + } + + /** + * Generates frequent items by filtering the input data using minimal support level. + * @param minCount the absolute minimum support + * @param sequences sequences data + * @return array of frequent pattern ordered by their frequencies + */ + private def getFreqItemAndCounts( + minCount: Long, + sequences: Array[Array[Int]]): Array[(Int, Long)] = { + sequences.flatMap(_.distinct) + .groupBy(x => x) + .mapValues(_.length.toLong) + .filter(_._2 >= minCount) + .toArray + } + + /** + * Get the frequent prefixes' projected database. + * @param frequentPrefixes frequent prefixes + * @param sequences sequences data + * @return prefixes and projected database + */ + private def getPatternAndProjectedDatabase( + frequentPrefixes: Array[Int], + sequences: RDD[Array[Int]]): RDD[(Array[Int], Array[Int])] = { + val filteredSequences = sequences.map { p => + p.filter (frequentPrefixes.contains(_) ) + } + filteredSequences.flatMap { x => + frequentPrefixes.map { y => val sub = getSuffix(y, x) - (Seq(y), sub) + (Array(y), sub) } }.filter(x => x._2.nonEmpty) - (frequentLengthOnePatterns, prefixAndCandidates) } /** - * Re-partition the RDD data, to get better balance and performance. + * Get the frequent prefixes' projected database. 
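+   *                 (editor's worked example, not in the original patch: the suite's
+   *                 6 sequences with minSupport = 0.34 give
+   *                 minCount = ceil(6 * 0.34) = 3; a later commit in this series
+   *                 lowers the suite's threshold to 0.33, where ceil(1.98) = 2)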
+ * @param prePrefix the frequent prefixes' prefix + * @param frequentPrefixes frequent prefixes + * @param sequences sequences data + * @return prefixes and projected database + */ + private def getPatternAndProjectedDatabase( + prePrefix: Array[Int], + frequentPrefixes: Array[Int], + sequences: Array[Array[Int]]): Array[(Array[Int], Array[Array[Int]])] = { + val filteredProjectedDatabase = sequences + .map(x => x.filter(frequentPrefixes.contains(_))) + frequentPrefixes.map { x => + val sub = filteredProjectedDatabase.map(y => getSuffix(x, y)).filter(_.nonEmpty) + (prePrefix ++ Array(x), sub) + }.filter(x => x._2.nonEmpty) + } + + /** + * Find the patterns that it's length is one + * @param minCount the absolute minimum support + * @param sequences original sequences data + * @return length-one patterns and projection table + */ + private def findLengthOnePatterns( + minCount: Long, + sequences: RDD[Array[Int]]): (RDD[(Int, Long)], RDD[(Array[Int], Array[Int])]) = { + val frequentLengthOnePatternAndCounts = getFreqItemAndCounts(minCount, sequences) + val prefixAndProjectedDatabase = getPatternAndProjectedDatabase( + frequentLengthOnePatternAndCounts.keys.collect(), sequences) + (frequentLengthOnePatternAndCounts, prefixAndProjectedDatabase) + } + + /** + * Constructs prefix-projected databases from (prefix, suffix) pairs. * @param data patterns and projected sequences data before re-partition * @return patterns and projected sequences data after re-partition */ private def makePrefixProjectedDatabases( - data: RDD[(Seq[Int], Array[Int])]): RDD[(Seq[Int], Array[Array[Int]])] = { - val dataMerged = data + data: RDD[(Array[Int], Array[Int])]): RDD[(Array[Int], Array[Array[Int]])] = { + data.map(x => (x._1.toSeq, x._2)) .groupByKey() - .mapValues(_.toArray) - dataMerged + .map(x => (x._1.toArray, x._2.toArray)) } /** * calculate the patterns in local. + * @param minCount the absolute minimum support * @param data patterns and projected sequences data data * @return patterns */ private def getPatternsInLocal( - data: RDD[(Seq[Int], Array[Array[Int]])]): RDD[(Seq[Int], Int)] = { - val result = data.flatMap { x => - getPatternsWithPrefix(x._1, x._2) + minCount: Long, + data: RDD[(Array[Int], Array[Array[Int]])]): RDD[(Array[Int], Long)] = { + data.flatMap { x => + getPatternsWithPrefix(minCount, x._1, x._2) } - result } /** * calculate the patterns with one prefix in local. 
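+   *                  (editor's illustration, not in the original patch: with prePrefix
+   *                  Array(1) and frequentPrefixes Array(3, 4), the sequence
+   *                  Array(3, 2, 4) is filtered to Array(3, 4) and contributes the
+   *                  suffix Array(4) under the extended prefix Array(1, 3); the empty
+   *                  suffix under Array(1, 4) is dropped)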
+ * @param minCount the absolute minimum support * @param prefix prefix * @param projectedDatabase patterns and projected sequences data * @return patterns */ private def getPatternsWithPrefix( - prefix: Seq[Int], - projectedDatabase: Array[Array[Int]]): Array[(Seq[Int], Int)] = { - val prefixAndCounts = projectedDatabase - .flatMap(_.distinct) - .groupBy(x => x) - .mapValues(_.length) - val frequentPrefixExtensions = prefixAndCounts.filter(x => x._2 >= absMinSupport) - val frequentPrefixesAndCounts = frequentPrefixExtensions - .map(x => (prefix ++ Seq(x._1), x._2)) - .toArray - val cleanedSearchSpace = projectedDatabase - .map(x => x.filter(y => frequentPrefixExtensions.contains(y))) - val prefixProjectedDatabases = frequentPrefixExtensions.map { x => - val sub = cleanedSearchSpace.map(y => getSuffix(x._1, y)).filter(_.nonEmpty) - (prefix ++ Seq(x._1), sub) - }.filter(x => x._2.nonEmpty) - .toArray + minCount: Long, + prefix: Array[Int], + projectedDatabase: Array[Array[Int]]): Array[(Array[Int], Long)] = { + val frequentPrefixAndCounts = getFreqItemAndCounts(minCount, projectedDatabase) + val frequentPatternAndCounts = frequentPrefixAndCounts + .map(x => (prefix ++ Array(x._1), x._2)) + val prefixProjectedDatabases = getPatternAndProjectedDatabase( + prefix, frequentPrefixAndCounts.map(_._1), projectedDatabase) + val continueProcess = prefixProjectedDatabases.nonEmpty && prefix.length + 1 < maxPatternLength if (continueProcess) { val nextPatterns = prefixProjectedDatabases - .map(x => getPatternsWithPrefix(x._1, x._2)) + .map(x => getPatternsWithPrefix(minCount, x._1, x._2)) .reduce(_ ++ _) - frequentPrefixesAndCounts ++ nextPatterns + frequentPatternAndCounts ++ nextPatterns } else { - frequentPrefixesAndCounts + frequentPatternAndCounts } } /** * calculate suffix sequence following a prefix in a sequence * @param prefix prefix - * @param sequence original sequence + * @param sequence sequence * @return suffix sequence */ private def getSuffix(prefix: Int, sequence: Array[Int]): Array[Int] = { diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala index 7796f1298891d..133fa3b41a754 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala @@ -22,48 +22,98 @@ import org.apache.spark.rdd.RDD class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { - test("Prefixspan sequences mining using Integer type") { + test("PrefixSpan using Integer type") { + + /* + library("arulesSequences") + prefixSpanSeqs = read_baskets("prefixSpanSeqs", info = c("sequenceID","eventID","SIZE")) + freqItemSeq = cspade( + prefixSpanSeqs, + parameter = list(support = 2 / length(unique(transactionInfo(prefixSpanSeqs)$sequenceID)), maxlen = 2 )) + resSeq = as(freqItemSeq, "data.frame") + resSeq + */ + val sequences = Array( - Array(3, 1, 3, 4, 5), - Array(2, 3, 1), - Array(3, 4, 4, 3), Array(1, 3, 4, 5), + Array(2, 3, 1), Array(2, 4, 1), + Array(3, 1, 3, 4, 5), + Array(3, 4, 4, 3), Array(6, 5, 3)) val rdd = sc.parallelize(sequences, 2).cache() - def formatResultString(data: RDD[(Seq[Int], Int)]): String = { - data.map(x => x._1.mkString(",") + ": " + x._2) - .collect() - .sortWith(_<_) - .mkString("; ") + def compareResult( + expectedValue: Array[(Array[Int], Long)], + actualValue: Array[(Array[Int], Long)]): Boolean = { + val sortedExpectedValue = expectedValue.sortWith{ (x, y) => + x._1.mkString(",") + 
":" + x._2 < y._1.mkString(",") + ":" + y._2 + } + val sortedActualValue = actualValue.sortWith{ (x, y) => + x._1.mkString(",") + ":" + x._2 < y._1.mkString(",") + ":" + y._2 + } + sortedExpectedValue.zip(sortedActualValue) + .map(x => x._1._1.mkString(",") == x._2._1.mkString(",") && x._1._2 == x._2._2) + .reduce(_&&_) } val prefixspan = new PrefixSpan() .setMinSupport(0.34) .setMaxPatternLength(50) val result1 = prefixspan.run(rdd) - val len1 = result1.count().toInt - val actualValue1 = formatResultString(result1) - val expectedValue1 = - "1,3,4,5: 2; 1,3,4: 2; 1,3,5: 2; 1,3: 2; 1,4,5: 2;" + - " 1,4: 2; 1,5: 2; 1: 4; 2,1: 2; 2: 2; 3,1: 2; 3,3: 2;" + - " 3,4,5: 2; 3,4: 3; 3,5: 2; 3: 5; 4,5: 2; 4: 4; 5: 3" - assert(expectedValue1 == actualValue1) + val expectedValue1 = Array( + (Array(1), 4L), + (Array(1,3),2L), + (Array(1,3,4), 2L), + (Array(1,3,4,5), 2L), + (Array(1,3,5), 2L), + (Array(1,4), 2L), + (Array(1,4,5), 2L), + (Array(1,5), 2L), + (Array(2), 2L), + (Array(2,1), 2L), + (Array(3), 5L), + (Array(3,1), 2L), + (Array(3,3), 2L), + (Array(3,4), 3L), + (Array(3,4,5), 2L), + (Array(3,5), 2L), + (Array(4), 4L), + (Array(4,5), 2L), + (Array(5), 3L) + ) + assert(compareResult(expectedValue1, result1.collect())) prefixspan.setMinSupport(0.5).setMaxPatternLength(50) val result2 = prefixspan.run(rdd) - val expectedValue2 = "1: 4; 3,4: 3; 3: 5; 4: 4; 5: 3" - val actualValue2 = formatResultString(result2) - assert(expectedValue2 == actualValue2) + val expectedValue2 = Array( + (Array(1), 4L), + (Array(3), 5L), + (Array(3,4), 3L), + (Array(4), 4L), + (Array(5), 3L) + ) + assert(compareResult(expectedValue2, result2.collect())) prefixspan.setMinSupport(0.34).setMaxPatternLength(2) val result3 = prefixspan.run(rdd) - val actualValue3 = formatResultString(result3) - val expectedValue3 = - "1,3: 2; 1,4: 2; 1,5: 2; 1: 4; 2,1: 2; 2: 2; 3,1: 2;" + - " 3,3: 2; 3,4: 3; 3,5: 2; 3: 5; 4,5: 2; 4: 4; 5: 3" - assert(expectedValue3 == actualValue3) + val expectedValue3 = Array( + (Array(1), 4L), + (Array(1,3), 2L), + (Array(1,4), 2L), + (Array(1,5), 2L), + (Array(2,1), 2L), + (Array(2), 2L), + (Array(3), 5L), + (Array(3,1), 2L), + (Array(3,3), 2L), + (Array(3,4), 3L), + (Array(3,5), 2L), + (Array(4), 4L), + (Array(4,5), 2L), + (Array(5), 3L) + ) + assert(compareResult(expectedValue3, result3.collect())) } } From 4c60fb36148206abd67fe51cea667ee3d63e490e Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Thu, 9 Jul 2015 23:01:45 +0800 Subject: [PATCH 07/25] Fix some Scala style errors. 
--- .../spark/mllib/fpm/PrefixSpanSuite.scala | 49 ++++++++++--------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala index 133fa3b41a754..0cbbf3741d02a 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala @@ -29,7 +29,8 @@ class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { prefixSpanSeqs = read_baskets("prefixSpanSeqs", info = c("sequenceID","eventID","SIZE")) freqItemSeq = cspade( prefixSpanSeqs, - parameter = list(support = 2 / length(unique(transactionInfo(prefixSpanSeqs)$sequenceID)), maxlen = 2 )) + parameter = list(support = + 2 / length(unique(transactionInfo(prefixSpanSeqs)$sequenceID)), maxlen = 2 )) resSeq = as(freqItemSeq, "data.frame") resSeq */ @@ -64,23 +65,23 @@ class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { val result1 = prefixspan.run(rdd) val expectedValue1 = Array( (Array(1), 4L), - (Array(1,3),2L), - (Array(1,3,4), 2L), - (Array(1,3,4,5), 2L), - (Array(1,3,5), 2L), - (Array(1,4), 2L), - (Array(1,4,5), 2L), - (Array(1,5), 2L), + (Array(1, 3), 2L), + (Array(1, 3, 4), 2L), + (Array(1, 3, 4, 5), 2L), + (Array(1, 3, 5), 2L), + (Array(1, 4), 2L), + (Array(1, 4, 5), 2L), + (Array(1, 5), 2L), (Array(2), 2L), - (Array(2,1), 2L), + (Array(2, 1), 2L), (Array(3), 5L), - (Array(3,1), 2L), - (Array(3,3), 2L), - (Array(3,4), 3L), - (Array(3,4,5), 2L), - (Array(3,5), 2L), + (Array(3, 1), 2L), + (Array(3, 3), 2L), + (Array(3, 4), 3L), + (Array(3, 4, 5), 2L), + (Array(3, 5), 2L), (Array(4), 4L), - (Array(4,5), 2L), + (Array(4, 5), 2L), (Array(5), 3L) ) assert(compareResult(expectedValue1, result1.collect())) @@ -90,7 +91,7 @@ class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { val expectedValue2 = Array( (Array(1), 4L), (Array(3), 5L), - (Array(3,4), 3L), + (Array(3, 4), 3L), (Array(4), 4L), (Array(5), 3L) ) @@ -100,18 +101,18 @@ class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { val result3 = prefixspan.run(rdd) val expectedValue3 = Array( (Array(1), 4L), - (Array(1,3), 2L), + (Array(1, 3), 2L), (Array(1,4), 2L), - (Array(1,5), 2L), - (Array(2,1), 2L), + (Array(1, 5), 2L), + (Array(2, 1), 2L), (Array(2), 2L), (Array(3), 5L), - (Array(3,1), 2L), - (Array(3,3), 2L), - (Array(3,4), 3L), - (Array(3,5), 2L), + (Array(3, 1), 2L), + (Array(3, 3), 2L), + (Array(3, 4), 3L), + (Array(3, 5), 2L), (Array(4), 4L), - (Array(4,5), 2L), + (Array(4, 5), 2L), (Array(5), 3L) ) assert(compareResult(expectedValue3, result3.collect())) From ba5df346543e9aee119bd781b257860b65bbe7df Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Thu, 9 Jul 2015 23:10:25 +0800 Subject: [PATCH 08/25] Fix a Scala style error. 
--- .../test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala index 0cbbf3741d02a..e4bc77849bd2c 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala @@ -102,7 +102,7 @@ class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { val expectedValue3 = Array( (Array(1), 4L), (Array(1, 3), 2L), - (Array(1,4), 2L), + (Array(1, 4), 2L), (Array(1, 5), 2L), (Array(2, 1), 2L), (Array(2), 2L), From 574e56ccfb271d0ed86c3eba95d1a11a8688495d Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Fri, 10 Jul 2015 19:49:06 +0800 Subject: [PATCH 09/25] Add new object LocalPrefixSpan, and do some optimization. --- .../spark/mllib/fpm/LocalPrefixSpan.scala | 129 ++++++++++++++++++ .../apache/spark/mllib/fpm/PrefixSpan.scala | 127 ++++------------- .../spark/mllib/fpm/PrefixSpanSuite.scala | 4 +- 3 files changed, 158 insertions(+), 102 deletions(-) create mode 100644 mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala new file mode 100644 index 0000000000000..dc555001b7778 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.fpm + +import org.apache.spark.Logging +import org.apache.spark.annotation.Experimental + +/** + * + * :: Experimental :: + * + * Calculate all patterns of a projected database in local. + */ +@Experimental +private[fpm] object LocalPrefixSpan extends Logging with Serializable { + + /** + * Calculate all patterns of a projected database in local. + * @param minCount minimum count + * @param maxPatternLength maximum pattern length + * @param prefix prefix + * @param projectedDatabase the projected dabase + * @return a set of sequential pattern pairs, + * the key of pair is pattern (a list of elements), + * the value of pair is the pattern's count. 
+ */ + def run( + minCount: Long, + maxPatternLength: Int, + prefix: Array[Int], + projectedDatabase: Array[Array[Int]]): Array[(Array[Int], Long)] = { + getPatternsWithPrefix(minCount, maxPatternLength, prefix, projectedDatabase) + } + + /** + * calculate suffix sequence following a prefix in a sequence + * @param prefix prefix + * @param sequence sequence + * @return suffix sequence + */ + def getSuffix(prefix: Int, sequence: Array[Int]): Array[Int] = { + val index = sequence.indexOf(prefix) + if (index == -1) { + Array() + } else { + sequence.drop(index + 1) + } + } + + /** + * Generates frequent items by filtering the input data using minimal count level. + * @param minCount the absolute minimum count + * @param sequences sequences data + * @return array of item and count pair + */ + private def getFreqItemAndCounts( + minCount: Long, + sequences: Array[Array[Int]]): Array[(Int, Long)] = { + sequences.flatMap(_.distinct) + .groupBy(x => x) + .mapValues(_.length.toLong) + .filter(_._2 >= minCount) + .toArray + } + + /** + * Get the frequent prefixes' projected database. + * @param prePrefix the frequent prefixes' prefix + * @param frequentPrefixes frequent prefixes + * @param sequences sequences data + * @return prefixes and projected database + */ + private def getPatternAndProjectedDatabase( + prePrefix: Array[Int], + frequentPrefixes: Array[Int], + sequences: Array[Array[Int]]): Array[(Array[Int], Array[Array[Int]])] = { + val filteredProjectedDatabase = sequences + .map(x => x.filter(frequentPrefixes.contains(_))) + frequentPrefixes.map { x => + val sub = filteredProjectedDatabase.map(y => getSuffix(x, y)).filter(_.nonEmpty) + (prePrefix ++ Array(x), sub) + }.filter(x => x._2.nonEmpty) + } + + /** + * Calculate all patterns of a projected database in local. + * @param minCount the minimum count + * @param maxPatternLength maximum pattern length + * @param prefix prefix + * @param projectedDatabase projected database + * @return patterns + */ + private def getPatternsWithPrefix( + minCount: Long, + maxPatternLength: Int, + prefix: Array[Int], + projectedDatabase: Array[Array[Int]]): Array[(Array[Int], Long)] = { + val frequentPrefixAndCounts = getFreqItemAndCounts(minCount, projectedDatabase) + val frequentPatternAndCounts = frequentPrefixAndCounts + .map(x => (prefix ++ Array(x._1), x._2)) + val prefixProjectedDatabases = getPatternAndProjectedDatabase( + prefix, frequentPrefixAndCounts.map(_._1), projectedDatabase) + + val continueProcess = prefixProjectedDatabases.nonEmpty && prefix.length + 1 < maxPatternLength + if (continueProcess) { + val nextPatterns = prefixProjectedDatabases + .map(x => getPatternsWithPrefix(minCount, maxPatternLength, x._1, x._2)) + .reduce(_ ++ _) + frequentPatternAndCounts ++ nextPatterns + } else { + frequentPatternAndCounts + } + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 05f8c4186aaf6..2239aa529695c 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -53,7 +53,8 @@ class PrefixSpan private ( * Sets the minimal support level (default: `0.1`). 
*/ def setMinSupport(minSupport: Double): this.type = { - require(minSupport >= 0 && minSupport <= 1) + require(minSupport >= 0 && minSupport <= 1, + "The minimum support value must be between 0 and 1, including 0 and 1.") this.minSupport = minSupport this } @@ -62,7 +63,8 @@ class PrefixSpan private ( * Sets maximal pattern length (default: `10`). */ def setMaxPatternLength(maxPatternLength: Int): this.type = { - require(maxPatternLength >= 1) + require(maxPatternLength >= 1, + "The maximum pattern length value must be greater than 0.") this.maxPatternLength = maxPatternLength this } @@ -73,35 +75,38 @@ class PrefixSpan private ( * a sequence is an ordered list of elements. * @return a set of sequential pattern pairs, * the key of pair is pattern (a list of elements), - * the value of pair is the pattern's support value. + * the value of pair is the pattern's count. */ def run(sequences: RDD[Array[Int]]): RDD[(Array[Int], Long)] = { if (sequences.getStorageLevel == StorageLevel.NONE) { logWarning("Input data is not cached.") } - val minCount = getAbsoluteMinSupport(sequences) + val minCount = getMinCount(sequences) val (lengthOnePatternsAndCounts, prefixAndCandidates) = findLengthOnePatterns(minCount, sequences) - val repartitionedRdd = makePrefixProjectedDatabases(prefixAndCandidates) - val nextPatterns = getPatternsInLocal(minCount, repartitionedRdd) - val allPatterns = lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)) ++ nextPatterns + val projectedDatabase = makePrefixProjectedDatabases(prefixAndCandidates) + val nextPatterns = getPatternsInLocal(minCount, projectedDatabase) + val lengthOnePatternsAndCountsRdd = + sequences.sparkContext.parallelize( + lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2))) + val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns allPatterns } /** - * Get the absolute minimum support value (sequences count * minSupport). + * Get the minimum count (sequences count * minSupport). * @param sequences input data set, contains a set of sequences, - * @return absolute minimum support value, + * @return minimum count, */ - private def getAbsoluteMinSupport(sequences: RDD[Array[Int]]): Long = { - if (minSupport == 0) 0L else (sequences.count() * minSupport).toLong + private def getMinCount(sequences: RDD[Array[Int]]): Long = { + if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong } /** - * Generates frequent items by filtering the input data using minimal support level. - * @param minCount the absolute minimum support + * Generates frequent items by filtering the input data using minimal count level. + * @param minCount the absolute minimum count * @param sequences original sequences data - * @return array of frequent pattern ordered by their frequencies + * @return array of item and count pair */ private def getFreqItemAndCounts( minCount: Long, @@ -111,22 +116,6 @@ class PrefixSpan private ( .filter(_._2 >= minCount) } - /** - * Generates frequent items by filtering the input data using minimal support level. - * @param minCount the absolute minimum support - * @param sequences sequences data - * @return array of frequent pattern ordered by their frequencies - */ - private def getFreqItemAndCounts( - minCount: Long, - sequences: Array[Array[Int]]): Array[(Int, Long)] = { - sequences.flatMap(_.distinct) - .groupBy(x => x) - .mapValues(_.length.toLong) - .filter(_._2 >= minCount) - .toArray - } - /** * Get the frequent prefixes' projected database. 
* @param frequentPrefixes frequent prefixes @@ -141,44 +130,25 @@ class PrefixSpan private ( } filteredSequences.flatMap { x => frequentPrefixes.map { y => - val sub = getSuffix(y, x) + val sub = LocalPrefixSpan.getSuffix(y, x) (Array(y), sub) - } - }.filter(x => x._2.nonEmpty) - } - - /** - * Get the frequent prefixes' projected database. - * @param prePrefix the frequent prefixes' prefix - * @param frequentPrefixes frequent prefixes - * @param sequences sequences data - * @return prefixes and projected database - */ - private def getPatternAndProjectedDatabase( - prePrefix: Array[Int], - frequentPrefixes: Array[Int], - sequences: Array[Array[Int]]): Array[(Array[Int], Array[Array[Int]])] = { - val filteredProjectedDatabase = sequences - .map(x => x.filter(frequentPrefixes.contains(_))) - frequentPrefixes.map { x => - val sub = filteredProjectedDatabase.map(y => getSuffix(x, y)).filter(_.nonEmpty) - (prePrefix ++ Array(x), sub) - }.filter(x => x._2.nonEmpty) + }.filter(_._2.nonEmpty) + } } /** * Find the patterns that it's length is one - * @param minCount the absolute minimum support + * @param minCount the minimum count * @param sequences original sequences data * @return length-one patterns and projection table */ private def findLengthOnePatterns( minCount: Long, - sequences: RDD[Array[Int]]): (RDD[(Int, Long)], RDD[(Array[Int], Array[Int])]) = { + sequences: RDD[Array[Int]]): (Array[(Int, Long)], RDD[(Array[Int], Array[Int])]) = { val frequentLengthOnePatternAndCounts = getFreqItemAndCounts(minCount, sequences) val prefixAndProjectedDatabase = getPatternAndProjectedDatabase( frequentLengthOnePatternAndCounts.keys.collect(), sequences) - (frequentLengthOnePatternAndCounts, prefixAndProjectedDatabase) + (frequentLengthOnePatternAndCounts.collect(), prefixAndProjectedDatabase) } /** @@ -195,7 +165,7 @@ class PrefixSpan private ( /** * calculate the patterns in local. - * @param minCount the absolute minimum support + * @param minCount the absolute minimum count * @param data patterns and projected sequences data data * @return patterns */ @@ -203,50 +173,7 @@ class PrefixSpan private ( minCount: Long, data: RDD[(Array[Int], Array[Array[Int]])]): RDD[(Array[Int], Long)] = { data.flatMap { x => - getPatternsWithPrefix(minCount, x._1, x._2) - } - } - - /** - * calculate the patterns with one prefix in local. 
- * @param minCount the absolute minimum support - * @param prefix prefix - * @param projectedDatabase patterns and projected sequences data - * @return patterns - */ - private def getPatternsWithPrefix( - minCount: Long, - prefix: Array[Int], - projectedDatabase: Array[Array[Int]]): Array[(Array[Int], Long)] = { - val frequentPrefixAndCounts = getFreqItemAndCounts(minCount, projectedDatabase) - val frequentPatternAndCounts = frequentPrefixAndCounts - .map(x => (prefix ++ Array(x._1), x._2)) - val prefixProjectedDatabases = getPatternAndProjectedDatabase( - prefix, frequentPrefixAndCounts.map(_._1), projectedDatabase) - - val continueProcess = prefixProjectedDatabases.nonEmpty && prefix.length + 1 < maxPatternLength - if (continueProcess) { - val nextPatterns = prefixProjectedDatabases - .map(x => getPatternsWithPrefix(minCount, x._1, x._2)) - .reduce(_ ++ _) - frequentPatternAndCounts ++ nextPatterns - } else { - frequentPatternAndCounts - } - } - - /** - * calculate suffix sequence following a prefix in a sequence - * @param prefix prefix - * @param sequence sequence - * @return suffix sequence - */ - private def getSuffix(prefix: Int, sequence: Array[Int]): Array[Int] = { - val index = sequence.indexOf(prefix) - if (index == -1) { - Array() - } else { - sequence.drop(index + 1) + LocalPrefixSpan.run(minCount, maxPatternLength, x._1, x._2) } } } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala index e4bc77849bd2c..413436d3db85f 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala @@ -60,7 +60,7 @@ class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { } val prefixspan = new PrefixSpan() - .setMinSupport(0.34) + .setMinSupport(0.33) .setMaxPatternLength(50) val result1 = prefixspan.run(rdd) val expectedValue1 = Array( @@ -97,7 +97,7 @@ class PrefixspanSuite extends SparkFunSuite with MLlibTestSparkContext { ) assert(compareResult(expectedValue2, result2.collect())) - prefixspan.setMinSupport(0.34).setMaxPatternLength(2) + prefixspan.setMinSupport(0.33).setMaxPatternLength(2) val result3 = prefixspan.run(rdd) val expectedValue3 = Array( (Array(1), 4L), From ca9c4c8fa84202d8d533c51c277138461ba096a7 Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Sat, 11 Jul 2015 10:40:24 +0800 Subject: [PATCH 10/25] Modified the code according to the review comments. --- .../spark/mllib/fpm/LocalPrefixSpan.scala | 50 +++++++------------ .../apache/spark/mllib/fpm/PrefixSpan.scala | 42 ++++------------ 2 files changed, 27 insertions(+), 65 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala index dc555001b7778..39c48b084e550 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala @@ -30,13 +30,13 @@ import org.apache.spark.annotation.Experimental private[fpm] object LocalPrefixSpan extends Logging with Serializable { /** - * Calculate all patterns of a projected database in local. + * Calculate all patterns of a projected database. 
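+ * (A projected database is the collection of suffixes that remain after the
+ * prefix has been matched in each of the original sequences.)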
* @param minCount minimum count * @param maxPatternLength maximum pattern length * @param prefix prefix * @param projectedDatabase the projected dabase * @return a set of sequential pattern pairs, - * the key of pair is pattern (a list of elements), + * the key of pair is sequential pattern (a list of items), * the value of pair is the pattern's count. */ def run( @@ -44,7 +44,21 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable { maxPatternLength: Int, prefix: Array[Int], projectedDatabase: Array[Array[Int]]): Array[(Array[Int], Long)] = { - getPatternsWithPrefix(minCount, maxPatternLength, prefix, projectedDatabase) + val frequentPrefixAndCounts = getFreqItemAndCounts(minCount, projectedDatabase) + val frequentPatternAndCounts = frequentPrefixAndCounts + .map(x => (prefix ++ Array(x._1), x._2)) + val prefixProjectedDatabases = getPatternAndProjectedDatabase( + prefix, frequentPrefixAndCounts.map(_._1), projectedDatabase) + + val continueProcess = prefixProjectedDatabases.nonEmpty && prefix.length + 1 < maxPatternLength + if (continueProcess) { + val nextPatterns = prefixProjectedDatabases + .map(x => run(minCount, maxPatternLength, x._1, x._2)) + .reduce(_ ++ _) + frequentPatternAndCounts ++ nextPatterns + } else { + frequentPatternAndCounts + } } /** @@ -96,34 +110,4 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable { (prePrefix ++ Array(x), sub) }.filter(x => x._2.nonEmpty) } - - /** - * Calculate all patterns of a projected database in local. - * @param minCount the minimum count - * @param maxPatternLength maximum pattern length - * @param prefix prefix - * @param projectedDatabase projected database - * @return patterns - */ - private def getPatternsWithPrefix( - minCount: Long, - maxPatternLength: Int, - prefix: Array[Int], - projectedDatabase: Array[Array[Int]]): Array[(Array[Int], Long)] = { - val frequentPrefixAndCounts = getFreqItemAndCounts(minCount, projectedDatabase) - val frequentPatternAndCounts = frequentPrefixAndCounts - .map(x => (prefix ++ Array(x._1), x._2)) - val prefixProjectedDatabases = getPatternAndProjectedDatabase( - prefix, frequentPrefixAndCounts.map(_._1), projectedDatabase) - - val continueProcess = prefixProjectedDatabases.nonEmpty && prefix.length + 1 < maxPatternLength - if (continueProcess) { - val nextPatterns = prefixProjectedDatabases - .map(x => getPatternsWithPrefix(minCount, maxPatternLength, x._1, x._2)) - .reduce(_ ++ _) - frequentPatternAndCounts ++ nextPatterns - } else { - frequentPatternAndCounts - } - } } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 2239aa529695c..9d8c60ef0fc45 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -82,10 +82,15 @@ class PrefixSpan private ( logWarning("Input data is not cached.") } val minCount = getMinCount(sequences) - val (lengthOnePatternsAndCounts, prefixAndCandidates) = - findLengthOnePatterns(minCount, sequences) - val projectedDatabase = makePrefixProjectedDatabases(prefixAndCandidates) - val nextPatterns = getPatternsInLocal(minCount, projectedDatabase) + val lengthOnePatternsAndCounts = + getFreqItemAndCounts(minCount, sequences).collect() + val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase( + lengthOnePatternsAndCounts.map(_._1), sequences) + val groupedProjectedDatabase = prefixAndProjectedDatabase + .map(x => (x._1.toSeq, x._2)) + 
.groupByKey() + .map(x => (x._1.toArray, x._2.toArray)) + val nextPatterns = getPatternsInLocal(minCount, groupedProjectedDatabase) val lengthOnePatternsAndCountsRdd = sequences.sparkContext.parallelize( lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2))) @@ -122,7 +127,7 @@ class PrefixSpan private ( * @param sequences sequences data * @return prefixes and projected database */ - private def getPatternAndProjectedDatabase( + private def getPrefixAndProjectedDatabase( frequentPrefixes: Array[Int], sequences: RDD[Array[Int]]): RDD[(Array[Int], Array[Int])] = { val filteredSequences = sequences.map { p => @@ -136,33 +141,6 @@ class PrefixSpan private ( } } - /** - * Find the patterns that it's length is one - * @param minCount the minimum count - * @param sequences original sequences data - * @return length-one patterns and projection table - */ - private def findLengthOnePatterns( - minCount: Long, - sequences: RDD[Array[Int]]): (Array[(Int, Long)], RDD[(Array[Int], Array[Int])]) = { - val frequentLengthOnePatternAndCounts = getFreqItemAndCounts(minCount, sequences) - val prefixAndProjectedDatabase = getPatternAndProjectedDatabase( - frequentLengthOnePatternAndCounts.keys.collect(), sequences) - (frequentLengthOnePatternAndCounts.collect(), prefixAndProjectedDatabase) - } - - /** - * Constructs prefix-projected databases from (prefix, suffix) pairs. - * @param data patterns and projected sequences data before re-partition - * @return patterns and projected sequences data after re-partition - */ - private def makePrefixProjectedDatabases( - data: RDD[(Array[Int], Array[Int])]): RDD[(Array[Int], Array[Array[Int]])] = { - data.map(x => (x._1.toSeq, x._2)) - .groupByKey() - .map(x => (x._1.toArray, x._2.toArray)) - } - /** * calculate the patterns in local. * @param minCount the absolute minimum count From 22b0ef463beb0e0fe9cc696989245da79722a3a6 Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Tue, 14 Jul 2015 10:21:04 +0800 Subject: [PATCH 11/25] Add feature: Collect enough frequent prefixes before projection in PrefixSpan. --- .../apache/spark/mllib/fpm/PrefixSpan.scala | 75 ++++++++++++++++--- 1 file changed, 65 insertions(+), 10 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 9d8c60ef0fc45..82d864b44fa6e 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -43,6 +43,8 @@ class PrefixSpan private ( private var minSupport: Double, private var maxPatternLength: Int) extends Logging with Serializable { + private val minPatternsBeforeShuffle: Int = 20 + /** * Constructs a default instance with default parameters * {minSupport: `0.1`, maxPatternLength: `10`}. 
@@ -86,16 +88,69 @@ class PrefixSpan private ( getFreqItemAndCounts(minCount, sequences).collect() val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase( lengthOnePatternsAndCounts.map(_._1), sequences) - val groupedProjectedDatabase = prefixAndProjectedDatabase - .map(x => (x._1.toSeq, x._2)) - .groupByKey() - .map(x => (x._1.toArray, x._2.toArray)) - val nextPatterns = getPatternsInLocal(minCount, groupedProjectedDatabase) - val lengthOnePatternsAndCountsRdd = - sequences.sparkContext.parallelize( - lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2))) - val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns - allPatterns + + var patternsCount = lengthOnePatternsAndCounts.length + var allPatternAndCounts = sequences.sparkContext.parallelize( + lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2))) + var currentProjectedDatabase = prefixAndProjectedDatabase + while (patternsCount <= minPatternsBeforeShuffle && + currentProjectedDatabase.count() != 0) { + val (nextPatternAndCounts, nextProjectedDatabase) = + getPatternCountsAndProjectedDatabase(minCount, currentProjectedDatabase) + patternsCount = nextPatternAndCounts.count().toInt + currentProjectedDatabase = nextProjectedDatabase + allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts + } + if (patternsCount > 0) { + val groupedProjectedDatabase = currentProjectedDatabase + .map(x => (x._1.toSeq, x._2)) + .groupByKey() + .map(x => (x._1.toArray, x._2.toArray)) + val nextPatternAndCounts = getPatternsInLocal(minCount, groupedProjectedDatabase) + allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts + } + allPatternAndCounts + } + + /** + * Get the pattern and counts, and projected database + * @param minCount minimum count + * @param prefixAndProjectedDatabase prefix and projected database, + * @return pattern and counts, and projected database + * (Array[pattern, count], RDD[prefix, projected database ]) + */ + private def getPatternCountsAndProjectedDatabase( + minCount: Long, + prefixAndProjectedDatabase: RDD[(Array[Int], Array[Int])]): + (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = { + val prefixAndFreqentItemAndCounts = prefixAndProjectedDatabase.flatMap{ x => + x._2.distinct.map(y => ((x._1.toSeq, y), 1L)) + }.reduceByKey(_+_) + .filter(_._2 >= minCount) + val patternAndCounts = prefixAndFreqentItemAndCounts + .map(x => (x._1._1.toArray ++ Array(x._1._2), x._2)) + val prefixlength = prefixAndProjectedDatabase.take(1)(0)._1.length + if (prefixlength + 1 >= maxPatternLength) { + (patternAndCounts, prefixAndProjectedDatabase.filter(x => false)) + } else { + val frequentItemsMap = prefixAndFreqentItemAndCounts + .keys.map(x => (x._1, x._2)) + .groupByKey() + .mapValues(_.toSet) + .collect + .toMap + val nextPrefixAndProjectedDatabase = prefixAndProjectedDatabase + .filter(x => frequentItemsMap.contains(x._1)) + .flatMap { x => + val frequentItemSet = frequentItemsMap(x._1) + val filteredSequence = x._2.filter(frequentItemSet.contains(_)) + val subProjectedDabase = frequentItemSet.map{ y => + (y, LocalPrefixSpan.getSuffix(y, filteredSequence)) + }.filter(_._2.nonEmpty) + subProjectedDabase.map(y => (x._1 ++ Array(y._1), y._2)) + } + (patternAndCounts, nextPrefixAndProjectedDatabase) + } } /** From 078d4101f56c68c6f191de57f9e542a80f2c89b5 Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Tue, 14 Jul 2015 10:46:05 +0800 Subject: [PATCH 12/25] fix a scala style error. 
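The style checker flags infix operators written without surrounding
whitespace, so `reduceByKey(_+_)` becomes `reduceByKey(_ + _)`; the two
forms parse identically. For illustration, a minimal local sketch of the
same fold using plain collections in place of the RDD (names are made up):

    // equivalent to .reduce((x, y) => x + y)
    val counts = Seq(("a", 1L), ("a", 1L), ("b", 1L))
      .groupBy(_._1)
      .mapValues(_.map(_._2).reduce(_ + _))
    // counts == Map("a" -> 2L, "b" -> 1L)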
--- .../src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 82d864b44fa6e..33e381e6d4d66 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -125,7 +125,7 @@ class PrefixSpan private ( (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = { val prefixAndFreqentItemAndCounts = prefixAndProjectedDatabase.flatMap{ x => x._2.distinct.map(y => ((x._1.toSeq, y), 1L)) - }.reduceByKey(_+_) + }.reduceByKey(_ + _) .filter(_._2 >= minCount) val patternAndCounts = prefixAndFreqentItemAndCounts .map(x => (x._1._1.toArray ++ Array(x._1._2), x._2)) From 4dd1c8a2393b91dc1841c3b01dad7163371dd434 Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Wed, 15 Jul 2015 10:57:41 +0800 Subject: [PATCH 13/25] initialize file before rebase. --- .../apache/spark/mllib/fpm/PrefixSpan.scala | 75 +++---------------- 1 file changed, 10 insertions(+), 65 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 33e381e6d4d66..9d8c60ef0fc45 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -43,8 +43,6 @@ class PrefixSpan private ( private var minSupport: Double, private var maxPatternLength: Int) extends Logging with Serializable { - private val minPatternsBeforeShuffle: Int = 20 - /** * Constructs a default instance with default parameters * {minSupport: `0.1`, maxPatternLength: `10`}. 
@@ -88,69 +86,16 @@ class PrefixSpan private (
getFreqItemAndCounts(minCount, sequences).collect()
val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase(
lengthOnePatternsAndCounts.map(_._1), sequences)
-
- var patternsCount = lengthOnePatternsAndCounts.length
- var allPatternAndCounts = sequences.sparkContext.parallelize(
- lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
- var currentProjectedDatabase = prefixAndProjectedDatabase
- while (patternsCount <= minPatternsBeforeShuffle &&
- currentProjectedDatabase.count() != 0) {
- val (nextPatternAndCounts, nextProjectedDatabase) =
- getPatternCountsAndProjectedDatabase(minCount, currentProjectedDatabase)
- patternsCount = nextPatternAndCounts.count().toInt
- currentProjectedDatabase = nextProjectedDatabase
- allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
- }
- if (patternsCount > 0) {
- val groupedProjectedDatabase = currentProjectedDatabase
- .map(x => (x._1.toSeq, x._2))
- .groupByKey()
- .map(x => (x._1.toArray, x._2.toArray))
- val nextPatternAndCounts = getPatternsInLocal(minCount, groupedProjectedDatabase)
- allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts
- }
- allPatternAndCounts
- }
-
- /**
- * Get the pattern and counts, and projected database
- * @param minCount minimum count
- * @param prefixAndProjectedDatabase prefix and projected database,
- * @return pattern and counts, and projected database
- * (Array[pattern, count], RDD[prefix, projected database ])
- */
- private def getPatternCountsAndProjectedDatabase(
- minCount: Long,
- prefixAndProjectedDatabase: RDD[(Array[Int], Array[Int])]):
- (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = {
- val prefixAndFreqentItemAndCounts = prefixAndProjectedDatabase.flatMap{ x =>
- x._2.distinct.map(y => ((x._1.toSeq, y), 1L))
- }.reduceByKey(_ + _)
- .filter(_._2 >= minCount)
- val patternAndCounts = prefixAndFreqentItemAndCounts
- .map(x => (x._1._1.toArray ++ Array(x._1._2), x._2))
- val prefixlength = prefixAndProjectedDatabase.take(1)(0)._1.length
- if (prefixlength + 1 >= maxPatternLength) {
- (patternAndCounts, prefixAndProjectedDatabase.filter(x => false))
- } else {
- val frequentItemsMap = prefixAndFreqentItemAndCounts
- .keys.map(x => (x._1, x._2))
- .groupByKey()
- .mapValues(_.toSet)
- .collect
- .toMap
- val nextPrefixAndProjectedDatabase = prefixAndProjectedDatabase
- .filter(x => frequentItemsMap.contains(x._1))
- .flatMap { x =>
- val frequentItemSet = frequentItemsMap(x._1)
- val filteredSequence = x._2.filter(frequentItemSet.contains(_))
- val subProjectedDabase = frequentItemSet.map{ y =>
- (y, LocalPrefixSpan.getSuffix(y, filteredSequence))
- }.filter(_._2.nonEmpty)
- subProjectedDabase.map(y => (x._1 ++ Array(y._1), y._2))
- }
- (patternAndCounts, nextPrefixAndProjectedDatabase)
- }
+ val groupedProjectedDatabase = prefixAndProjectedDatabase
+ .map(x => (x._1.toSeq, x._2))
+ .groupByKey()
+ .map(x => (x._1.toArray, x._2.toArray))
+ val nextPatterns = getPatternsInLocal(minCount, groupedProjectedDatabase)
+ val lengthOnePatternsAndCountsRdd =
+ sequences.sparkContext.parallelize(
+ lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)))
+ val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns
+ allPatterns
}

/**
From 6560c6916edeff900e54c6b5ee5b7c44cac87724 Mon Sep 17 00:00:00 2001
From: zhangjiajin
Date: Wed, 15 Jul 2015 11:44:42 +0800
Subject: [PATCH 14/25] Add feature: Collect enough frequent prefixes before projection in PrefixSpan
---
.../apache/spark/mllib/fpm/PrefixSpan.scala | 75 
++++++++++++++++--- 1 file changed, 65 insertions(+), 10 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 9d8c60ef0fc45..33e381e6d4d66 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -43,6 +43,8 @@ class PrefixSpan private ( private var minSupport: Double, private var maxPatternLength: Int) extends Logging with Serializable { + private val minPatternsBeforeShuffle: Int = 20 + /** * Constructs a default instance with default parameters * {minSupport: `0.1`, maxPatternLength: `10`}. @@ -86,16 +88,69 @@ class PrefixSpan private ( getFreqItemAndCounts(minCount, sequences).collect() val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase( lengthOnePatternsAndCounts.map(_._1), sequences) - val groupedProjectedDatabase = prefixAndProjectedDatabase - .map(x => (x._1.toSeq, x._2)) - .groupByKey() - .map(x => (x._1.toArray, x._2.toArray)) - val nextPatterns = getPatternsInLocal(minCount, groupedProjectedDatabase) - val lengthOnePatternsAndCountsRdd = - sequences.sparkContext.parallelize( - lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2))) - val allPatterns = lengthOnePatternsAndCountsRdd ++ nextPatterns - allPatterns + + var patternsCount = lengthOnePatternsAndCounts.length + var allPatternAndCounts = sequences.sparkContext.parallelize( + lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2))) + var currentProjectedDatabase = prefixAndProjectedDatabase + while (patternsCount <= minPatternsBeforeShuffle && + currentProjectedDatabase.count() != 0) { + val (nextPatternAndCounts, nextProjectedDatabase) = + getPatternCountsAndProjectedDatabase(minCount, currentProjectedDatabase) + patternsCount = nextPatternAndCounts.count().toInt + currentProjectedDatabase = nextProjectedDatabase + allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts + } + if (patternsCount > 0) { + val groupedProjectedDatabase = currentProjectedDatabase + .map(x => (x._1.toSeq, x._2)) + .groupByKey() + .map(x => (x._1.toArray, x._2.toArray)) + val nextPatternAndCounts = getPatternsInLocal(minCount, groupedProjectedDatabase) + allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts + } + allPatternAndCounts + } + + /** + * Get the pattern and counts, and projected database + * @param minCount minimum count + * @param prefixAndProjectedDatabase prefix and projected database, + * @return pattern and counts, and projected database + * (Array[pattern, count], RDD[prefix, projected database ]) + */ + private def getPatternCountsAndProjectedDatabase( + minCount: Long, + prefixAndProjectedDatabase: RDD[(Array[Int], Array[Int])]): + (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = { + val prefixAndFreqentItemAndCounts = prefixAndProjectedDatabase.flatMap{ x => + x._2.distinct.map(y => ((x._1.toSeq, y), 1L)) + }.reduceByKey(_ + _) + .filter(_._2 >= minCount) + val patternAndCounts = prefixAndFreqentItemAndCounts + .map(x => (x._1._1.toArray ++ Array(x._1._2), x._2)) + val prefixlength = prefixAndProjectedDatabase.take(1)(0)._1.length + if (prefixlength + 1 >= maxPatternLength) { + (patternAndCounts, prefixAndProjectedDatabase.filter(x => false)) + } else { + val frequentItemsMap = prefixAndFreqentItemAndCounts + .keys.map(x => (x._1, x._2)) + .groupByKey() + .mapValues(_.toSet) + .collect + .toMap + val nextPrefixAndProjectedDatabase = prefixAndProjectedDatabase + .filter(x => 
frequentItemsMap.contains(x._1)) + .flatMap { x => + val frequentItemSet = frequentItemsMap(x._1) + val filteredSequence = x._2.filter(frequentItemSet.contains(_)) + val subProjectedDabase = frequentItemSet.map{ y => + (y, LocalPrefixSpan.getSuffix(y, filteredSequence)) + }.filter(_._2.nonEmpty) + subProjectedDabase.map(y => (x._1 ++ Array(y._1), y._2)) + } + (patternAndCounts, nextPrefixAndProjectedDatabase) + } } /** From baa2885681f19897cc5158f4fc9338543a55487a Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Wed, 15 Jul 2015 16:48:59 +0800 Subject: [PATCH 15/25] Modified the code according to the review comments. --- .../apache/spark/mllib/fpm/PrefixSpan.scala | 77 +++++++++---------- 1 file changed, 37 insertions(+), 40 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 33e381e6d4d66..e056f2146c3f1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -84,72 +84,69 @@ class PrefixSpan private ( logWarning("Input data is not cached.") } val minCount = getMinCount(sequences) - val lengthOnePatternsAndCounts = - getFreqItemAndCounts(minCount, sequences).collect() - val prefixAndProjectedDatabase = getPrefixAndProjectedDatabase( - lengthOnePatternsAndCounts.map(_._1), sequences) - - var patternsCount = lengthOnePatternsAndCounts.length - var allPatternAndCounts = sequences.sparkContext.parallelize( - lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2))) - var currentProjectedDatabase = prefixAndProjectedDatabase - while (patternsCount <= minPatternsBeforeShuffle && - currentProjectedDatabase.count() != 0) { - val (nextPatternAndCounts, nextProjectedDatabase) = - getPatternCountsAndProjectedDatabase(minCount, currentProjectedDatabase) + val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences) + val prefixSuffixPairs = getPrefixSuffixPairs( + lengthOnePatternsAndCounts.map(_._1).collect(), sequences) + var patternsCount: Long = lengthOnePatternsAndCounts.count() + var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)) + var currentPrefixSuffixPairs = prefixSuffixPairs + while (patternsCount <= minPatternsBeforeShuffle && currentPrefixSuffixPairs.count() != 0) { + val (nextPatternAndCounts, nextPrefixSuffixPairs) = + getPatternCountsAndPrefixSuffixPairs(minCount, currentPrefixSuffixPairs) patternsCount = nextPatternAndCounts.count().toInt - currentProjectedDatabase = nextProjectedDatabase + currentPrefixSuffixPairs = nextPrefixSuffixPairs allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts } if (patternsCount > 0) { - val groupedProjectedDatabase = currentProjectedDatabase + val projectedDatabase = currentPrefixSuffixPairs .map(x => (x._1.toSeq, x._2)) .groupByKey() .map(x => (x._1.toArray, x._2.toArray)) - val nextPatternAndCounts = getPatternsInLocal(minCount, groupedProjectedDatabase) + val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase) allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts } allPatternAndCounts } /** - * Get the pattern and counts, and projected database + * Get the pattern and counts, and prefix suffix pairs * @param minCount minimum count - * @param prefixAndProjectedDatabase prefix and projected database, - * @return pattern and counts, and projected database - * (Array[pattern, count], RDD[prefix, projected database ]) + * @param prefixSuffixPairs prefix and 
suffix pairs, + * @return pattern and counts, and prefix suffix pairs + * (Array[pattern, count], RDD[prefix, suffix ]) */ - private def getPatternCountsAndProjectedDatabase( + private def getPatternCountsAndPrefixSuffixPairs( minCount: Long, - prefixAndProjectedDatabase: RDD[(Array[Int], Array[Int])]): + prefixSuffixPairs: RDD[(Array[Int], Array[Int])]): (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = { - val prefixAndFreqentItemAndCounts = prefixAndProjectedDatabase.flatMap{ x => - x._2.distinct.map(y => ((x._1.toSeq, y), 1L)) + val prefixAndFreqentItemAndCounts = prefixSuffixPairs + .flatMap { case (prefix, suffix) => + suffix.distinct.map(y => ((prefix.toSeq, y), 1L)) }.reduceByKey(_ + _) .filter(_._2 >= minCount) val patternAndCounts = prefixAndFreqentItemAndCounts - .map(x => (x._1._1.toArray ++ Array(x._1._2), x._2)) - val prefixlength = prefixAndProjectedDatabase.take(1)(0)._1.length + .map{ case ((prefix, item), count) => (prefix.toArray :+ item, count) } + val prefixlength = prefixSuffixPairs.first()._1.length if (prefixlength + 1 >= maxPatternLength) { - (patternAndCounts, prefixAndProjectedDatabase.filter(x => false)) + (patternAndCounts, prefixSuffixPairs.filter(x => false)) } else { val frequentItemsMap = prefixAndFreqentItemAndCounts - .keys.map(x => (x._1, x._2)) + .keys .groupByKey() .mapValues(_.toSet) .collect .toMap - val nextPrefixAndProjectedDatabase = prefixAndProjectedDatabase + val nextPrefixSuffixPairs = prefixSuffixPairs .filter(x => frequentItemsMap.contains(x._1)) - .flatMap { x => - val frequentItemSet = frequentItemsMap(x._1) - val filteredSequence = x._2.filter(frequentItemSet.contains(_)) - val subProjectedDabase = frequentItemSet.map{ y => - (y, LocalPrefixSpan.getSuffix(y, filteredSequence)) + .flatMap { case (prefix, suffix) => + val frequentItemSet = frequentItemsMap(prefix) + val filteredSuffix = suffix.filter(frequentItemSet.contains(_)) + val nextSuffixes = frequentItemSet.map{ item => + (item, LocalPrefixSpan.getSuffix(item, filteredSuffix)) }.filter(_._2.nonEmpty) - subProjectedDabase.map(y => (x._1 ++ Array(y._1), y._2)) + nextSuffixes.map { case (item, suffix) => (prefix :+ item, suffix) } } - (patternAndCounts, nextPrefixAndProjectedDatabase) + (patternAndCounts, nextPrefixSuffixPairs) } } @@ -177,12 +174,12 @@ class PrefixSpan private ( } /** - * Get the frequent prefixes' projected database. + * Get the frequent prefixes and suffix pairs. * @param frequentPrefixes frequent prefixes * @param sequences sequences data - * @return prefixes and projected database + * @return prefixes and suffix pairs. */ - private def getPrefixAndProjectedDatabase( + private def getPrefixSuffixPairs( frequentPrefixes: Array[Int], sequences: RDD[Array[Int]]): RDD[(Array[Int], Array[Int])] = { val filteredSequences = sequences.map { p => @@ -199,7 +196,7 @@ class PrefixSpan private ( /** * calculate the patterns in local. * @param minCount the absolute minimum count - * @param data patterns and projected sequences data data + * @param data prefixes and projected sequences data data * @return patterns */ private def getPatternsInLocal( From 095aa3a390446205a4d22227b7ed1fbce46f2c93 Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Thu, 16 Jul 2015 11:26:26 +0800 Subject: [PATCH 16/25] Modified the code according to the review comments. 
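All of the projection steps below rely on LocalPrefixSpan.getSuffix, which
returns the part of a sequence after the first occurrence of an item, or an
empty array when the item is absent. A self-contained restatement of that
contract for illustration (not the shared helper itself):

    def suffixAfter(item: Int, sequence: Array[Int]): Array[Int] = {
      val i = sequence.indexOf(item)
      if (i == -1) Array.empty[Int] else sequence.drop(i + 1)
    }
    suffixAfter(3, Array(1, 3, 2, 3, 4))  // Array(2, 3, 4)
    suffixAfter(5, Array(1, 3, 2))        // Array()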
--- .../apache/spark/mllib/fpm/PrefixSpan.scala | 87 ++++++++++--------- 1 file changed, 44 insertions(+), 43 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index e056f2146c3f1..aed7e30033b8a 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -17,6 +17,8 @@ package org.apache.spark.mllib.fpm +import scala.collection.mutable.ArrayBuffer + import org.apache.spark.Logging import org.apache.spark.annotation.Experimental import org.apache.spark.rdd.RDD @@ -43,7 +45,7 @@ class PrefixSpan private ( private var minSupport: Double, private var maxPatternLength: Int) extends Logging with Serializable { - private val minPatternsBeforeShuffle: Int = 20 + private val minPatternsBeforeLocalProcessing: Int = 20 /** * Constructs a default instance with default parameters @@ -88,16 +90,20 @@ class PrefixSpan private ( val prefixSuffixPairs = getPrefixSuffixPairs( lengthOnePatternsAndCounts.map(_._1).collect(), sequences) var patternsCount: Long = lengthOnePatternsAndCounts.count() - var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (Array(x._1), x._2)) + var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2)) var currentPrefixSuffixPairs = prefixSuffixPairs - while (patternsCount <= minPatternsBeforeShuffle && currentPrefixSuffixPairs.count() != 0) { + var patternLength: Int = 1 + while (patternLength < maxPatternLength && + patternsCount <= minPatternsBeforeLocalProcessing && + currentPrefixSuffixPairs.count() != 0) { val (nextPatternAndCounts, nextPrefixSuffixPairs) = getPatternCountsAndPrefixSuffixPairs(minCount, currentPrefixSuffixPairs) - patternsCount = nextPatternAndCounts.count().toInt + patternsCount = nextPatternAndCounts.count() currentPrefixSuffixPairs = nextPrefixSuffixPairs allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts + patternLength = patternLength + 1 } - if (patternsCount > 0) { + if (patternLength < maxPatternLength && patternsCount > 0) { val projectedDatabase = currentPrefixSuffixPairs .map(x => (x._1.toSeq, x._2)) .groupByKey() @@ -105,49 +111,44 @@ class PrefixSpan private ( val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase) allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts } - allPatternAndCounts + allPatternAndCounts.map { case (pattern, count) => (pattern.toArray, count) } } /** * Get the pattern and counts, and prefix suffix pairs * @param minCount minimum count - * @param prefixSuffixPairs prefix and suffix pairs, - * @return pattern and counts, and prefix suffix pairs - * (Array[pattern, count], RDD[prefix, suffix ]) + * @param prefixSuffixPairs prefix (length n) and suffix pairs, + * @return pattern (length n+1) and counts, and prefix (length n+1) and suffix pairs + * (RDD[pattern, count], RDD[prefix, suffix ]) */ private def getPatternCountsAndPrefixSuffixPairs( minCount: Long, - prefixSuffixPairs: RDD[(Array[Int], Array[Int])]): - (RDD[(Array[Int], Long)], RDD[(Array[Int], Array[Int])]) = { - val prefixAndFreqentItemAndCounts = prefixSuffixPairs - .flatMap { case (prefix, suffix) => - suffix.distinct.map(y => ((prefix.toSeq, y), 1L)) - }.reduceByKey(_ + _) + prefixSuffixPairs: RDD[(ArrayBuffer[Int], Array[Int])]): + (RDD[(ArrayBuffer[Int], Long)], RDD[(ArrayBuffer[Int], Array[Int])]) = { + val prefixAndFrequentItemAndCounts = prefixSuffixPairs + .flatMap { case 
(prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L)) } + .reduceByKey(_ + _) .filter(_._2 >= minCount) - val patternAndCounts = prefixAndFreqentItemAndCounts - .map{ case ((prefix, item), count) => (prefix.toArray :+ item, count) } - val prefixlength = prefixSuffixPairs.first()._1.length - if (prefixlength + 1 >= maxPatternLength) { - (patternAndCounts, prefixSuffixPairs.filter(x => false)) - } else { - val frequentItemsMap = prefixAndFreqentItemAndCounts - .keys - .groupByKey() - .mapValues(_.toSet) - .collect - .toMap - val nextPrefixSuffixPairs = prefixSuffixPairs - .filter(x => frequentItemsMap.contains(x._1)) - .flatMap { case (prefix, suffix) => - val frequentItemSet = frequentItemsMap(prefix) - val filteredSuffix = suffix.filter(frequentItemSet.contains(_)) - val nextSuffixes = frequentItemSet.map{ item => - (item, LocalPrefixSpan.getSuffix(item, filteredSuffix)) - }.filter(_._2.nonEmpty) - nextSuffixes.map { case (item, suffix) => (prefix :+ item, suffix) } + val patternAndCounts = prefixAndFrequentItemAndCounts + .map { case ((prefix, item), count) => (prefix :+ item, count) } + val prefixToFrequentNextItemsMap = prefixAndFrequentItemAndCounts + .keys + .groupByKey() + .mapValues(_.toSet) + .collect() + .toMap + val nextPrefixSuffixPairs = prefixSuffixPairs + .filter(x => prefixToFrequentNextItemsMap.contains(x._1)) + .flatMap { case (prefix, suffix) => + val frequentNextItems = prefixToFrequentNextItemsMap(prefix) + val filteredSuffix = suffix.filter(frequentNextItems.contains(_)) + frequentNextItems.flatMap { item => + val suffix = LocalPrefixSpan.getSuffix(item, filteredSuffix) + if (suffix.isEmpty) None + else Some(prefix :+ item, suffix) } - (patternAndCounts, nextPrefixSuffixPairs) } + (patternAndCounts, nextPrefixSuffixPairs) } /** @@ -181,14 +182,14 @@ class PrefixSpan private ( */ private def getPrefixSuffixPairs( frequentPrefixes: Array[Int], - sequences: RDD[Array[Int]]): RDD[(Array[Int], Array[Int])] = { + sequences: RDD[Array[Int]]): RDD[(ArrayBuffer[Int], Array[Int])] = { val filteredSequences = sequences.map { p => p.filter (frequentPrefixes.contains(_) ) } filteredSequences.flatMap { x => frequentPrefixes.map { y => val sub = LocalPrefixSpan.getSuffix(y, x) - (Array(y), sub) + (ArrayBuffer(y), sub) }.filter(_._2.nonEmpty) } } @@ -201,9 +202,9 @@ class PrefixSpan private ( */ private def getPatternsInLocal( minCount: Long, - data: RDD[(Array[Int], Array[Array[Int]])]): RDD[(Array[Int], Long)] = { - data.flatMap { x => - LocalPrefixSpan.run(minCount, maxPatternLength, x._1, x._2) - } + data: RDD[(Array[Int], Array[Array[Int]])]): RDD[(ArrayBuffer[Int], Long)] = { + data + .flatMap { x => LocalPrefixSpan.run(minCount, maxPatternLength, x._1, x._2) } + .map { case (pattern, count) => (pattern.to[ArrayBuffer], count) } } } From d2250b7871035c8096d377805ca9f9a9cf90fdd3 Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Sat, 18 Jul 2015 18:03:37 +0800 Subject: [PATCH 17/25] remove minPatternsBeforeLocalProcessing, add maxSuffixesBeforeLocalProcessing. 
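The driver now decides per prefix whether to run another distributed
iteration or hand the work to LocalPrefixSpan, based on the total number of
items across that prefix's suffixes instead of the pattern count of the
previous round. A minimal sketch of the partitioning rule, with made-up
sizes and the threshold introduced in this patch:

    val maxSuffixes = 10000L
    val suffixSizes = Map(List(1) -> 45210L, List(3) -> 8524L)  // hypothetical
    val (small, large) = suffixSizes.partition(_._2 <= maxSuffixes)
    // small prefixes (here List(3)) are mined locally;
    // large ones (here List(1)) go through another distributed pass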
--- .../apache/spark/mllib/fpm/PrefixSpan.scala | 55 ++++++++++++++++--- 1 file changed, 46 insertions(+), 9 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 139b2f6952fb8..e6fd05ac87b20 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -45,7 +45,7 @@ class PrefixSpan private ( private var minSupport: Double, private var maxPatternLength: Int) extends Logging with Serializable { - private val minPatternsBeforeLocalProcessing: Int = 20 + private val maxSuffixesBeforeLocalProcessing: Long = 10000 /** * Constructs a default instance with default parameters @@ -91,20 +91,25 @@ class PrefixSpan private ( lengthOnePatternsAndCounts.map(_._1).collect(), sequences) var patternsCount: Long = lengthOnePatternsAndCounts.count() var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2)) - var currentPrefixSuffixPairs = prefixSuffixPairs + var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = + splitPrefixSuffixPairs(prefixSuffixPairs) + largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK) var patternLength: Int = 1 while (patternLength < maxPatternLength && - patternsCount <= minPatternsBeforeLocalProcessing && - currentPrefixSuffixPairs.count() != 0) { + largePrefixSuffixPairs.count() != 0) { val (nextPatternAndCounts, nextPrefixSuffixPairs) = - getPatternCountsAndPrefixSuffixPairs(minCount, currentPrefixSuffixPairs) + getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs) patternsCount = nextPatternAndCounts.count() - currentPrefixSuffixPairs = nextPrefixSuffixPairs + largePrefixSuffixPairs.unpersist() + val splitedPrefixSuffixPairs = splitPrefixSuffixPairs(nextPrefixSuffixPairs) + largePrefixSuffixPairs = splitedPrefixSuffixPairs._2 + largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK) + smallPrefixSuffixPairs = smallPrefixSuffixPairs ++ splitedPrefixSuffixPairs._1 allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts patternLength = patternLength + 1 } - if (patternLength < maxPatternLength && patternsCount > 0) { - val projectedDatabase = currentPrefixSuffixPairs + if (smallPrefixSuffixPairs.count() > 0) { + val projectedDatabase = smallPrefixSuffixPairs .map(x => (x._1.toSeq, x._2)) .groupByKey() .map(x => (x._1.toArray, x._2.toArray)) @@ -114,6 +119,38 @@ class PrefixSpan private ( allPatternAndCounts.map { case (pattern, count) => (pattern.toArray, count) } } + + /** + * Split prefix suffix pairs to two parts: + * suffixes' size less than maxSuffixesBeforeLocalProcessing and + * suffixes' size more than maxSuffixesBeforeLocalProcessing + * @param prefixSuffixPairs prefix (length n) and suffix pairs, + * @return small size prefix suffix pairs and big size prefix suffix pairs + * (RDD[prefix, suffix], RDD[prefix, suffix ]) + */ + private def splitPrefixSuffixPairs( + prefixSuffixPairs: RDD[(ArrayBuffer[Int], Array[Int])]): + (RDD[(ArrayBuffer[Int], Array[Int])], RDD[(ArrayBuffer[Int], Array[Int])]) = { + val suffixSizeMap = prefixSuffixPairs + .map(x => (x._1, x._2.length)) + .reduceByKey(_ + _) + .map(x => (x._2 <= maxSuffixesBeforeLocalProcessing, Set(x._1))) + .reduceByKey(_ ++ _) + .collect + .toMap + val small = if (suffixSizeMap.contains(true)) { + prefixSuffixPairs.filter(x => suffixSizeMap(true).contains(x._1)) + } else { + prefixSuffixPairs.filter(x => false) + } + val large = if 
(suffixSizeMap.contains(false)) { + prefixSuffixPairs.filter(x => suffixSizeMap(false).contains(x._1)) + } else { + prefixSuffixPairs.filter(x => false) + } + (small, large) + } + /** * Get the pattern and counts, and prefix suffix pairs * @param minCount minimum count @@ -205,7 +242,7 @@ class PrefixSpan private ( data: RDD[(Array[Int], Array[Array[Int]])]): RDD[(ArrayBuffer[Int], Long)] = { data.flatMap { case (prefix, projDB) => - LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList, projDB) + LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB) .map { case (pattern: List[Int], count: Long) => (pattern.toArray.reverse.to[ArrayBuffer], count) } From 64271b3f802ce1f5870209b73ff7dd0e73442076 Mon Sep 17 00:00:00 2001 From: zhangjiajin Date: Mon, 27 Jul 2015 12:28:42 +0800 Subject: [PATCH 18/25] Modified codes according to comments. --- .../apache/spark/mllib/fpm/PrefixSpan.scala | 27 ++++++++----------- 1 file changed, 11 insertions(+), 16 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index e6fd05ac87b20..cbb514d467b4b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -45,7 +45,7 @@ class PrefixSpan private ( private var minSupport: Double, private var maxPatternLength: Int) extends Logging with Serializable { - private val maxSuffixesBeforeLocalProcessing: Long = 10000 + private val maxProjectedDBSizeBeforeLocalProcessing: Long = 10000 /** * Constructs a default instance with default parameters @@ -89,24 +89,19 @@ class PrefixSpan private ( val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences) val prefixSuffixPairs = getPrefixSuffixPairs( lengthOnePatternsAndCounts.map(_._1).collect(), sequences) - var patternsCount: Long = lengthOnePatternsAndCounts.count() + prefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK) var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2)) var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = splitPrefixSuffixPairs(prefixSuffixPairs) - largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK) - var patternLength: Int = 1 - while (patternLength < maxPatternLength && - largePrefixSuffixPairs.count() != 0) { + while (largePrefixSuffixPairs.count() != 0) { val (nextPatternAndCounts, nextPrefixSuffixPairs) = getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs) - patternsCount = nextPatternAndCounts.count() largePrefixSuffixPairs.unpersist() - val splitedPrefixSuffixPairs = splitPrefixSuffixPairs(nextPrefixSuffixPairs) - largePrefixSuffixPairs = splitedPrefixSuffixPairs._2 + val (smallerPairsPart, largerPairsPart) = splitPrefixSuffixPairs(nextPrefixSuffixPairs) + largePrefixSuffixPairs = largerPairsPart largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK) - smallPrefixSuffixPairs = smallPrefixSuffixPairs ++ splitedPrefixSuffixPairs._1 - allPatternAndCounts = allPatternAndCounts ++ nextPatternAndCounts - patternLength = patternLength + 1 + smallPrefixSuffixPairs ++= smallerPairsPart + allPatternAndCounts ++= nextPatternAndCounts } if (smallPrefixSuffixPairs.count() > 0) { val projectedDatabase = smallPrefixSuffixPairs @@ -114,7 +109,7 @@ class PrefixSpan private ( .groupByKey() .map(x => (x._1.toArray, x._2.toArray)) val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase) - allPatternAndCounts = allPatternAndCounts ++ 
nextPatternAndCounts + allPatternAndCounts ++= nextPatternAndCounts } allPatternAndCounts.map { case (pattern, count) => (pattern.toArray, count) } } @@ -122,8 +117,8 @@ class PrefixSpan private ( /** * Split prefix suffix pairs to two parts: - * suffixes' size less than maxSuffixesBeforeLocalProcessing and - * suffixes' size more than maxSuffixesBeforeLocalProcessing + * Prefixes with projected databases smaller than maxSuffixesBeforeLocalProcessing and + * Prefixes with projected databases larger than maxSuffixesBeforeLocalProcessing * @param prefixSuffixPairs prefix (length n) and suffix pairs, * @return small size prefix suffix pairs and big size prefix suffix pairs * (RDD[prefix, suffix], RDD[prefix, suffix ]) @@ -134,7 +129,7 @@ class PrefixSpan private ( val suffixSizeMap = prefixSuffixPairs .map(x => (x._1, x._2.length)) .reduceByKey(_ + _) - .map(x => (x._2 <= maxSuffixesBeforeLocalProcessing, Set(x._1))) + .map(x => (x._2 <= maxProjectedDBSizeBeforeLocalProcessing, Set(x._1))) .reduceByKey(_ ++ _) .collect .toMap From 6e149fa3bd88a2347e635f03ab9ae5913e03beee Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Tue, 28 Jul 2015 14:36:36 -0700 Subject: [PATCH 19/25] Fix splitPrefixSuffixPairs --- .../apache/spark/mllib/fpm/PrefixSpan.scala | 30 ++++++++----------- .../spark/mllib/fpm/PrefixSpanSuite.scala | 21 ++++++------- 2 files changed, 23 insertions(+), 28 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index cbb514d467b4b..b70ff9815adc8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -58,7 +58,7 @@ class PrefixSpan private ( */ def setMinSupport(minSupport: Double): this.type = { require(minSupport >= 0 && minSupport <= 1, - "The minimum support value must be between 0 and 1, including 0 and 1.") + "The minimum support value must be in [0, 1].") this.minSupport = minSupport this } @@ -126,23 +126,17 @@ class PrefixSpan private ( private def splitPrefixSuffixPairs( prefixSuffixPairs: RDD[(ArrayBuffer[Int], Array[Int])]): (RDD[(ArrayBuffer[Int], Array[Int])], RDD[(ArrayBuffer[Int], Array[Int])]) = { - val suffixSizeMap = prefixSuffixPairs - .map(x => (x._1, x._2.length)) - .reduceByKey(_ + _) - .map(x => (x._2 <= maxProjectedDBSizeBeforeLocalProcessing, Set(x._1))) - .reduceByKey(_ ++ _) - .collect - .toMap - val small = if (suffixSizeMap.contains(true)) { - prefixSuffixPairs.filter(x => suffixSizeMap(true).contains(x._1)) - } else { - prefixSuffixPairs.filter(x => false) - } - val large = if (suffixSizeMap.contains(false)) { - prefixSuffixPairs.filter(x => suffixSizeMap(false).contains(x._1)) - } else { - prefixSuffixPairs.filter(x => false) - } + val prefixToSuffixSize = prefixSuffixPairs + .aggregateByKey(0)( + seqOp = { case (count, suffix) => count + suffix.length }, + combOp = { _ + _ }) + val smallPrefixes = prefixToSuffixSize + .filter(_._2 <= maxProjectedDBSizeBeforeLocalProcessing) + .map(_._1) + .collect() + .toSet + val small = prefixSuffixPairs.filter { case (prefix, _) => smallPrefixes.contains(prefix) } + val large = prefixSuffixPairs.filter { case (prefix, _) => !smallPrefixes.contains(prefix) } (small, large) } diff --git a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala index 9f107c89f6d80..6dd2dc926acc5 100644 --- 
a/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala @@ -44,13 +44,6 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { val rdd = sc.parallelize(sequences, 2).cache() - def compareResult( - expectedValue: Array[(Array[Int], Long)], - actualValue: Array[(Array[Int], Long)]): Boolean = { - expectedValue.map(x => (x._1.toSeq, x._2)).toSet == - actualValue.map(x => (x._1.toSeq, x._2)).toSet - } - val prefixspan = new PrefixSpan() .setMinSupport(0.33) .setMaxPatternLength(50) @@ -76,7 +69,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { (Array(4, 5), 2L), (Array(5), 3L) ) - assert(compareResult(expectedValue1, result1.collect())) + assert(compareResults(expectedValue1, result1.collect())) prefixspan.setMinSupport(0.5).setMaxPatternLength(50) val result2 = prefixspan.run(rdd) @@ -87,7 +80,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { (Array(4), 4L), (Array(5), 3L) ) - assert(compareResult(expectedValue2, result2.collect())) + assert(compareResults(expectedValue2, result2.collect())) prefixspan.setMinSupport(0.33).setMaxPatternLength(2) val result3 = prefixspan.run(rdd) @@ -107,6 +100,14 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext { (Array(4, 5), 2L), (Array(5), 3L) ) - assert(compareResult(expectedValue3, result3.collect())) + assert(compareResults(expectedValue3, result3.collect())) + } + + private def compareResults( + expectedValue: Array[(Array[Int], Long)], + actualValue: Array[(Array[Int], Long)]): Boolean = { + expectedValue.map(x => (x._1.toSeq, x._2)).toSet == + actualValue.map(x => (x._1.toSeq, x._2)).toSet } + } From 01c9ae9aa3f09aa4ec058db024f6a2cb482570bb Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Tue, 28 Jul 2015 14:39:42 -0700 Subject: [PATCH 20/25] Add getters --- .../org/apache/spark/mllib/fpm/PrefixSpan.scala | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index b70ff9815adc8..5e6322f2a05a1 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -53,6 +53,12 @@ class PrefixSpan private ( */ def this() = this(0.1, 10) + /** + * Get the minimal support (i.e. the frequency of occurrence before a pattern is considered + * frequent). + */ + def getMinSupport(): Double = this.minSupport + /** * Sets the minimal support level (default: `0.1`). */ @@ -63,10 +69,16 @@ class PrefixSpan private ( this } + /** + * Gets the maximal pattern length (i.e. the length of the longest sequential pattern to consider. + */ + def getMaxPatternLength(): Double = this.maxPatternLength + /** * Sets maximal pattern length (default: `10`). 
*/ def setMaxPatternLength(maxPatternLength: Int): this.type = { + // TODO: support unbounded pattern length when maxPatternLength = 0 require(maxPatternLength >= 1, "The maximum pattern length value must be greater than 0.") this.maxPatternLength = maxPatternLength From cb2a4fc71d4874f3bb3cf0d8b0331e5b41f7cf45 Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Tue, 28 Jul 2015 14:50:31 -0700 Subject: [PATCH 21/25] Inline code for readability --- .../apache/spark/mllib/fpm/PrefixSpan.scala | 64 ++++++++----------- 1 file changed, 25 insertions(+), 39 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 5e6322f2a05a1..5c563262e184d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -97,14 +97,31 @@ class PrefixSpan private ( if (sequences.getStorageLevel == StorageLevel.NONE) { logWarning("Input data is not cached.") } - val minCount = getMinCount(sequences) - val lengthOnePatternsAndCounts = getFreqItemAndCounts(minCount, sequences) - val prefixSuffixPairs = getPrefixSuffixPairs( - lengthOnePatternsAndCounts.map(_._1).collect(), sequences) + + // Convert min support to a min number of transactions for this dataset + val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong + + val itemCounts = sequences + .flatMap(_.distinct.map((_, 1L))) + .reduceByKey(_ + _) + .filter(_._2 >= minCount) + + val prefixSuffixPairs = { + val frequentItems = itemCounts.map(_._1).collect() + val candidates = sequences.map { p => + p.filter (frequentItems.contains(_) ) + } + candidates.flatMap { x => + frequentItems.map { y => + val sub = LocalPrefixSpan.getSuffix(y, x) + (ArrayBuffer(y), sub) + }.filter(_._2.nonEmpty) + } + } prefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK) - var allPatternAndCounts = lengthOnePatternsAndCounts.map(x => (ArrayBuffer(x._1), x._2)) - var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = - splitPrefixSuffixPairs(prefixSuffixPairs) + + var allPatternAndCounts = itemCounts.map(x => (ArrayBuffer(x._1), x._2)) + var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = splitPrefixSuffixPairs(prefixSuffixPairs) while (largePrefixSuffixPairs.count() != 0) { val (nextPatternAndCounts, nextPrefixSuffixPairs) = getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs) @@ -115,6 +132,7 @@ class PrefixSpan private ( smallPrefixSuffixPairs ++= smallerPairsPart allPatternAndCounts ++= nextPatternAndCounts } + if (smallPrefixSuffixPairs.count() > 0) { val projectedDatabase = smallPrefixSuffixPairs .map(x => (x._1.toSeq, x._2)) @@ -189,29 +207,6 @@ class PrefixSpan private ( (patternAndCounts, nextPrefixSuffixPairs) } - /** - * Get the minimum count (sequences count * minSupport). - * @param sequences input data set, contains a set of sequences, - * @return minimum count, - */ - private def getMinCount(sequences: RDD[Array[Int]]): Long = { - if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong - } - - /** - * Generates frequent items by filtering the input data using minimal count level. 
- * @param minCount the absolute minimum count - * @param sequences original sequences data - * @return array of item and count pair - */ - private def getFreqItemAndCounts( - minCount: Long, - sequences: RDD[Array[Int]]): RDD[(Int, Long)] = { - sequences.flatMap(_.distinct.map((_, 1L))) - .reduceByKey(_ + _) - .filter(_._2 >= minCount) - } - /** * Get the frequent prefixes and suffix pairs. * @param frequentPrefixes frequent prefixes @@ -221,15 +216,6 @@ class PrefixSpan private ( private def getPrefixSuffixPairs( frequentPrefixes: Array[Int], sequences: RDD[Array[Int]]): RDD[(ArrayBuffer[Int], Array[Int])] = { - val filteredSequences = sequences.map { p => - p.filter (frequentPrefixes.contains(_) ) - } - filteredSequences.flatMap { x => - frequentPrefixes.map { y => - val sub = LocalPrefixSpan.getSuffix(y, x) - (ArrayBuffer(y), sub) - }.filter(_._2.nonEmpty) - } } /** From da0091b3d4d9e9d7f058b645272e70e3256c1ac7 Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Tue, 28 Jul 2015 15:21:06 -0700 Subject: [PATCH 22/25] Use lists for prefixes to reuse data --- .../apache/spark/mllib/fpm/PrefixSpan.scala | 48 +++++++------------ 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala index 5c563262e184d..79f8b651f83b3 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala @@ -102,9 +102,10 @@ class PrefixSpan private ( val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong val itemCounts = sequences - .flatMap(_.distinct.map((_, 1L))) + .flatMap(seq => seq.distinct.map(item => (item, 1L))) .reduceByKey(_ + _) .filter(_._2 >= minCount) + var allPatternAndCounts = itemCounts.map(x => (List(x._1), x._2)) val prefixSuffixPairs = { val frequentItems = itemCounts.map(_._1).collect() @@ -114,14 +115,12 @@ class PrefixSpan private ( candidates.flatMap { x => frequentItems.map { y => val sub = LocalPrefixSpan.getSuffix(y, x) - (ArrayBuffer(y), sub) + (List(y), sub) }.filter(_._2.nonEmpty) } } - prefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK) - - var allPatternAndCounts = itemCounts.map(x => (ArrayBuffer(x._1), x._2)) var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = splitPrefixSuffixPairs(prefixSuffixPairs) + while (largePrefixSuffixPairs.count() != 0) { val (nextPatternAndCounts, nextPrefixSuffixPairs) = getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs) @@ -135,9 +134,9 @@ class PrefixSpan private ( if (smallPrefixSuffixPairs.count() > 0) { val projectedDatabase = smallPrefixSuffixPairs - .map(x => (x._1.toSeq, x._2)) + // TODO aggregateByKey .groupByKey() - .map(x => (x._1.toArray, x._2.toArray)) + .mapValues(_.toArray) val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase) allPatternAndCounts ++= nextPatternAndCounts } @@ -154,8 +153,8 @@ class PrefixSpan private ( * (RDD[prefix, suffix], RDD[prefix, suffix ]) */ private def splitPrefixSuffixPairs( - prefixSuffixPairs: RDD[(ArrayBuffer[Int], Array[Int])]): - (RDD[(ArrayBuffer[Int], Array[Int])], RDD[(ArrayBuffer[Int], Array[Int])]) = { + prefixSuffixPairs: RDD[(List[Int], Array[Int])]): + (RDD[(List[Int], Array[Int])], RDD[(List[Int], Array[Int])]) = { val prefixToSuffixSize = prefixSuffixPairs .aggregateByKey(0)( seqOp = { case (count, suffix) => count + suffix.length }, @@ -179,14 +178,14 @@ class PrefixSpan private ( */ 
private def getPatternCountsAndPrefixSuffixPairs( minCount: Long, - prefixSuffixPairs: RDD[(ArrayBuffer[Int], Array[Int])]): - (RDD[(ArrayBuffer[Int], Long)], RDD[(ArrayBuffer[Int], Array[Int])]) = { + prefixSuffixPairs: RDD[(List[Int], Array[Int])]): + (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = { val prefixAndFrequentItemAndCounts = prefixSuffixPairs .flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L)) } .reduceByKey(_ + _) .filter(_._2 >= minCount) val patternAndCounts = prefixAndFrequentItemAndCounts - .map { case ((prefix, item), count) => (prefix :+ item, count) } + .map { case ((prefix, item), count) => (item :: prefix, count) } val prefixToFrequentNextItemsMap = prefixAndFrequentItemAndCounts .keys .groupByKey() @@ -201,23 +200,12 @@ class PrefixSpan private ( frequentNextItems.flatMap { item => val suffix = LocalPrefixSpan.getSuffix(item, filteredSuffix) if (suffix.isEmpty) None - else Some(prefix :+ item, suffix) + else Some(item :: prefix, suffix) } } (patternAndCounts, nextPrefixSuffixPairs) } - /** - * Get the frequent prefixes and suffix pairs. - * @param frequentPrefixes frequent prefixes - * @param sequences sequences data - * @return prefixes and suffix pairs. - */ - private def getPrefixSuffixPairs( - frequentPrefixes: Array[Int], - sequences: RDD[Array[Int]]): RDD[(ArrayBuffer[Int], Array[Int])] = { - } - /** * calculate the patterns in local. * @param minCount the absolute minimum count @@ -226,13 +214,13 @@ class PrefixSpan private ( */ private def getPatternsInLocal( minCount: Long, - data: RDD[(Array[Int], Array[Array[Int]])]): RDD[(ArrayBuffer[Int], Long)] = { + data: RDD[(List[Int], Array[Array[Int]])]): RDD[(List[Int], Long)] = { data.flatMap { - case (prefix, projDB) => - LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB) - .map { case (pattern: List[Int], count: Long) => - (pattern.toArray.reverse.to[ArrayBuffer], count) - } + case (prefix, projDB) => + LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB) + .map { case (pattern: List[Int], count: Long) => + (pattern.reverse, count) + } } } } From 1235cfcc9367b546bcf564972a33b769f62da520 Mon Sep 17 00:00:00 2001 From: Feynman Liang Date: Tue, 28 Jul 2015 15:30:29 -0700 Subject: [PATCH 23/25] Use Iterable[Array[_]] over Array[Array[_]] for database --- .../spark/mllib/fpm/LocalPrefixSpan.scala | 6 +-- .../apache/spark/mllib/fpm/PrefixSpan.scala | 37 +++++++++---------- 2 files changed, 21 insertions(+), 22 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala index 7ead6327486cc..0ea792081086d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala @@ -40,7 +40,7 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable { minCount: Long, maxPatternLength: Int, prefixes: List[Int], - database: Array[Array[Int]]): Iterator[(List[Int], Long)] = { + database: Iterable[Array[Int]]): Iterator[(List[Int], Long)] = { if (prefixes.length == maxPatternLength || database.isEmpty) return Iterator.empty val frequentItemAndCounts = getFreqItemAndCounts(minCount, database) val filteredDatabase = database.map(x => x.filter(frequentItemAndCounts.contains)) @@ -67,7 +67,7 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable { } } - def project(database: Array[Array[Int]], prefix: Int): 
From 1235cfcc9367b546bcf564972a33b769f62da520 Mon Sep 17 00:00:00 2001
From: Feynman Liang
Date: Tue, 28 Jul 2015 15:30:29 -0700
Subject: [PATCH 23/25] Use Iterable[Array[_]] over Array[Array[_]] for database

---
 .../spark/mllib/fpm/LocalPrefixSpan.scala     |  6 +--
 .../apache/spark/mllib/fpm/PrefixSpan.scala   | 37 +++++++++----------
 2 files changed, 21 insertions(+), 22 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
index 7ead6327486cc..0ea792081086d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala
@@ -40,7 +40,7 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
       minCount: Long,
       maxPatternLength: Int,
       prefixes: List[Int],
-      database: Array[Array[Int]]): Iterator[(List[Int], Long)] = {
+      database: Iterable[Array[Int]]): Iterator[(List[Int], Long)] = {
     if (prefixes.length == maxPatternLength || database.isEmpty) return Iterator.empty
     val frequentItemAndCounts = getFreqItemAndCounts(minCount, database)
     val filteredDatabase = database.map(x => x.filter(frequentItemAndCounts.contains))
@@ -67,7 +67,7 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
     }
   }

-  def project(database: Array[Array[Int]], prefix: Int): Array[Array[Int]] = {
+  def project(database: Iterable[Array[Int]], prefix: Int): Iterable[Array[Int]] = {
     database
       .map(getSuffix(prefix, _))
       .filter(_.nonEmpty)
@@ -81,7 +81,7 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
    */
   private def getFreqItemAndCounts(
       minCount: Long,
-      database: Array[Array[Int]]): mutable.Map[Int, Long] = {
+      database: Iterable[Array[Int]]): mutable.Map[Int, Long] = {
     // TODO: use PrimitiveKeyOpenHashMap
     val counts = mutable.Map[Int, Long]().withDefaultValue(0L)
     database.foreach { sequence =>
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 79f8b651f83b3..bbdc75532ae6f 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -45,7 +45,11 @@ class PrefixSpan private (
     private var minSupport: Double,
     private var maxPatternLength: Int) extends Logging with Serializable {

-  private val maxProjectedDBSizeBeforeLocalProcessing: Long = 10000
+  /**
+   * The maximum number of items allowed in a projected database before local processing. If a
+   * projected database exceeds this size, another iteration of distributed PrefixSpan is run.
+   */
+  private val maxLocalProjDBSize: Long = 10000

   /**
    * Constructs a default instance with default parameters
@@ -63,8 +67,7 @@ class PrefixSpan private (
    * Sets the minimal support level (default: `0.1`).
    */
   def setMinSupport(minSupport: Double): this.type = {
-    require(minSupport >= 0 && minSupport <= 1,
-      "The minimum support value must be in [0, 1].")
+    require(minSupport >= 0 && minSupport <= 1, "The minimum support value must be in [0, 1].")
     this.minSupport = minSupport
     this
   }
@@ -79,8 +82,7 @@ class PrefixSpan private (
    */
   def setMaxPatternLength(maxPatternLength: Int): this.type = {
     // TODO: support unbounded pattern length when maxPatternLength = 0
-    require(maxPatternLength >= 1,
-      "The maximum pattern length value must be greater than 0.")
+    require(maxPatternLength >= 1, "The maximum pattern length value must be greater than 0.")
     this.maxPatternLength = maxPatternLength
     this
   }
@@ -119,13 +121,13 @@ class PrefixSpan private (
       }.filter(_._2.nonEmpty)
       }
     }
-    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = splitPrefixSuffixPairs(prefixSuffixPairs)
+    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = partitionByProjDBSize(prefixSuffixPairs)

     while (largePrefixSuffixPairs.count() != 0) {
       val (nextPatternAndCounts, nextPrefixSuffixPairs) =
         getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
       largePrefixSuffixPairs.unpersist()
-      val (smallerPairsPart, largerPairsPart) = splitPrefixSuffixPairs(nextPrefixSuffixPairs)
+      val (smallerPairsPart, largerPairsPart) = partitionByProjDBSize(nextPrefixSuffixPairs)
       largePrefixSuffixPairs = largerPairsPart
       largePrefixSuffixPairs.persist(StorageLevel.MEMORY_AND_DISK)
       smallPrefixSuffixPairs ++= smallerPairsPart
@@ -136,7 +138,6 @@ class PrefixSpan private (
       val projectedDatabase = smallPrefixSuffixPairs
         // TODO aggregateByKey
         .groupByKey()
-        .mapValues(_.toArray)
       val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
       allPatternAndCounts ++= nextPatternAndCounts
     }
@@ -145,23 +146,21 @@

   /**
-   * Split prefix suffix pairs to two parts:
-   * Prefixes with projected databases smaller than maxSuffixesBeforeLocalProcessing and
-   * Prefixes with projected databases larger than maxSuffixesBeforeLocalProcessing
+   * Partitions the prefix-suffix pairs by projected database size.
+   *
    * @param prefixSuffixPairs prefix (length n) and suffix pairs,
-   * @return small size prefix suffix pairs and big size prefix suffix pairs
-   *         (RDD[prefix, suffix], RDD[prefix, suffix ])
+   * @return prefix-suffix pairs partitioned by whether their projected database size is <= or
+   *         greater than [[maxLocalProjDBSize]]
    */
-  private def splitPrefixSuffixPairs(
-      prefixSuffixPairs: RDD[(List[Int], Array[Int])]):
-    (RDD[(List[Int], Array[Int])], RDD[(List[Int], Array[Int])]) = {
+  private def partitionByProjDBSize(prefixSuffixPairs: RDD[(List[Int], Array[Int])])
+    : (RDD[(List[Int], Array[Int])], RDD[(List[Int], Array[Int])]) = {
     val prefixToSuffixSize = prefixSuffixPairs
       .aggregateByKey(0)(
         seqOp = { case (count, suffix) => count + suffix.length },
         combOp = { _ + _ })
     val smallPrefixes = prefixToSuffixSize
-      .filter(_._2 <= maxProjectedDBSizeBeforeLocalProcessing)
-      .map(_._1)
+      .filter(_._2 <= maxLocalProjDBSize)
+      .keys
       .collect()
       .toSet
     val small = prefixSuffixPairs.filter { case (prefix, _) => smallPrefixes.contains(prefix) }
@@ -214,7 +213,7 @@ class PrefixSpan private (
    */
   private def getPatternsInLocal(
       minCount: Long,
-      data: RDD[(List[Int], Array[Array[Int]])]): RDD[(List[Int], Long)] = {
+      data: RDD[(List[Int], Iterable[Array[Int]])]): RDD[(List[Int], Long)] = {
     data.flatMap {
       case (prefix, projDB) =>
         LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB)
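The readability patch below documents the control flow of run(): partition the prefix-suffix pairs by projected database size, keep running distributed PrefixSpan on the large side, and accumulate the small side for local processing. A toy, Spark-free sketch of that loop shape (the threshold, data, and shrinking step are all made up for illustration):

    object SplitLoopSketch {
      def main(args: Array[String]): Unit = {
        val threshold = 10
        // (prefix, projected database size) stand-ins for prefix-suffix pairs
        var forDistributed = Seq(("a", 25), ("b", 4), ("c", 12))
        var forLocal = Seq.empty[(String, Int)]
        while (forDistributed.nonEmpty) {
          // One "distributed" step; here we just shrink each projected database
          val next = forDistributed.map { case (p, size) => (p, size / 2) }
          val (small, large) = next.partition(_._2 <= threshold)
          forLocal ++= small
          forDistributed = large
        }
        println(forLocal)  // every prefix eventually lands on the local side
      }
    }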
From c2caa5cb19e5c9e54dda288c9c1e7befb21feb64 Mon Sep 17 00:00:00 2001
From: Feynman Liang
Date: Tue, 28 Jul 2015 15:54:30 -0700
Subject: [PATCH 24/25] Readability improvements and comments

---
 .../apache/spark/mllib/fpm/PrefixSpan.scala   | 64 ++++++++++---------
 1 file changed, 34 insertions(+), 30 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index bbdc75532ae6f..8a15a867910a2 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -103,45 +103,49 @@ class PrefixSpan private (
     // Convert min support to a min number of transactions for this dataset
     val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong

-    val itemCounts = sequences
+    // Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
+    val freqItemCounts = sequences
       .flatMap(seq => seq.distinct.map(item => (item, 1L)))
       .reduceByKey(_ + _)
       .filter(_._2 >= minCount)
-    var allPatternAndCounts = itemCounts.map(x => (List(x._1), x._2))

-    val prefixSuffixPairs = {
-      val frequentItems = itemCounts.map(_._1).collect()
-      val candidates = sequences.map { p =>
-        p.filter (frequentItems.contains(_) )
-      }
-      candidates.flatMap { x =>
-        frequentItems.map { y =>
-          val sub = LocalPrefixSpan.getSuffix(y, x)
-          (List(y), sub)
-        }.filter(_._2.nonEmpty)
+    // Pairs of (length 1 prefix, suffix consisting of frequent items)
+    val itemSuffixPairs = {
+      val freqItems = freqItemCounts.keys.collect().toSet
+      sequences.flatMap { seq =>
+        freqItems.flatMap { item =>
+          val candidateSuffix = LocalPrefixSpan.getSuffix(item, seq.filter(freqItems.contains(_)))
+          candidateSuffix match {
+            case suffix if !suffix.isEmpty => Some((List(item), suffix))
+            case _ => None
+          }
+        }
       }
     }
-    var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = partitionByProjDBSize(prefixSuffixPairs)
-    while (largePrefixSuffixPairs.count() != 0) {
+    // Accumulator for the computed results to be returned
+    var resultsAccumulator = freqItemCounts.map(x => (List(x._1), x._2))
+
+    // Remaining work to be locally and distributively processed, respectively
+    var (pairsForLocal, pairsForDistributed) = partitionByProjDBSize(itemSuffixPairs)
+
+    // Continue processing until no pairs for distributed processing remain (i.e. all prefixes have
+    // projected database sizes <= `maxLocalProjDBSize`)
+    while (pairsForDistributed.count() != 0) {
       val (nextPatternAndCounts, nextPrefixSuffixPairs) =
         getPatternCountsAndPrefixSuffixPairs(minCount, pairsForDistributed)
       pairsForDistributed.unpersist()
       val (smallerPairsPart, largerPairsPart) = partitionByProjDBSize(nextPrefixSuffixPairs)
       pairsForDistributed = largerPairsPart
       pairsForDistributed.persist(StorageLevel.MEMORY_AND_DISK)
       pairsForLocal ++= smallerPairsPart
       resultsAccumulator ++= nextPatternAndCounts
     }

-    if (smallPrefixSuffixPairs.count() > 0) {
-      val projectedDatabase = smallPrefixSuffixPairs
-        // TODO aggregateByKey
-        .groupByKey()
-      val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
-      allPatternAndCounts ++= nextPatternAndCounts
-    }
-    allPatternAndCounts.map { case (pattern, count) => (pattern.toArray, count) }
+    // Process the small projected databases locally
+    resultsAccumulator ++= getPatternsInLocal(minCount, pairsForLocal.groupByKey())
+
+    resultsAccumulator.map { case (pattern, count) => (pattern.toArray, count) }
   }

@@ -177,8 +181,8 @@ class PrefixSpan private (
    */
   private def getPatternCountsAndPrefixSuffixPairs(
       minCount: Long,
-      prefixSuffixPairs: RDD[(List[Int], Array[Int])]):
-    (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = {
+      prefixSuffixPairs: RDD[(List[Int], Array[Int])])
+    : (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = {
     val prefixAndFrequentItemAndCounts = prefixSuffixPairs
       .flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L)) }
       .reduceByKey(_ + _)
       .filter(_._2 >= minCount)
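One detail the final patch below fixes is loop-invariant work: seq.filter(freqItems.contains(_)) does not depend on the inner loop's item, so it is hoisted out and each sequence is filtered once instead of once per frequent item. The same transformation over plain collections (values are illustrative):

    object HoistFilterSketch {
      def main(args: Array[String]): Unit = {
        val freqItems = Set(1, 2, 3)
        val seq = Array(5, 1, 4, 2, 3)

        // Before: the filter re-runs for every candidate item
        val before = freqItems.map(item => (item, seq.filter(freqItems.contains(_))))

        // After: filter once, reuse the result for every item
        val filteredSeq = seq.filter(freqItems.contains(_))
        val after = freqItems.map(item => (item, filteredSeq))

        // Same pairs either way (arrays converted to lists for structural comparison)
        assert(before.map(p => (p._1, p._2.toList)) == after.map(p => (p._1, p._2.toList)))
        println(filteredSeq.mkString(", "))  // prints: 1, 2, 3
      }
    }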
From 87fa021afaa184dfbf7eafcae0beb494697c40e2 Mon Sep 17 00:00:00 2001
From: Feynman Liang
Date: Tue, 28 Jul 2015 16:19:48 -0700
Subject: [PATCH 25/25] Improve extend prefix readability

---
 .../apache/spark/mllib/fpm/PrefixSpan.scala   | 64 +++++++++++--------
 1 file changed, 39 insertions(+), 25 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
index 8a15a867910a2..5b8da9665366b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala
@@ -103,7 +103,7 @@ class PrefixSpan private (
     // Convert min support to a min number of transactions for this dataset
     val minCount = if (minSupport == 0) 0L else math.ceil(sequences.count() * minSupport).toLong

-    // Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
+    // Frequent items -> number of occurrences; all items here satisfy the `minSupport` threshold
     val freqItemCounts = sequences
@@ -113,8 +113,9 @@ class PrefixSpan private (
     // Pairs of (length 1 prefix, suffix consisting of frequent items)
     val itemSuffixPairs = {
       val freqItems = freqItemCounts.keys.collect().toSet
       sequences.flatMap { seq =>
+        val filteredSeq = seq.filter(freqItems.contains(_))
         freqItems.flatMap { item =>
-          val candidateSuffix = LocalPrefixSpan.getSuffix(item, seq.filter(freqItems.contains(_)))
+          val candidateSuffix = LocalPrefixSpan.getSuffix(item, filteredSeq)
           candidateSuffix match {
             case suffix if !suffix.isEmpty => Some((List(item), suffix))
             case _ => None
@@ -123,7 +124,8 @@ class PrefixSpan private (
       }
     }

-    // Accumulator for the computed results to be returned
+    // Accumulator for the computed results to be returned, initialized to the frequent items (i.e.
+    // frequent length-one prefixes)
     var resultsAccumulator = freqItemCounts.map(x => (List(x._1), x._2))

     // Remaining work to be locally and distributively processed, respectively
     var (pairsForLocal, pairsForDistributed) = partitionByProjDBSize(itemSuffixPairs)
@@ -133,7 +135,7 @@ class PrefixSpan private (
     // Continue processing until no pairs for distributed processing remain (i.e. all prefixes have
     // projected database sizes <= `maxLocalProjDBSize`)
     while (pairsForDistributed.count() != 0) {
       val (nextPatternAndCounts, nextPrefixSuffixPairs) =
-        getPatternCountsAndPrefixSuffixPairs(minCount, pairsForDistributed)
+        extendPrefixes(minCount, pairsForDistributed)
       pairsForDistributed.unpersist()
       val (smallerPairsPart, largerPairsPart) = partitionByProjDBSize(nextPrefixSuffixPairs)
       pairsForDistributed = largerPairsPart
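These hunks lean on LocalPrefixSpan.getSuffix to project a filtered sequence onto an item. Its implementation is not shown in this series; assuming it keeps everything after the first occurrence of the item and returns an empty array otherwise, a standalone equivalent would look like:

    object GetSuffixSketch {
      // Assumed behaviour: drop through the first occurrence of `item`;
      // an item that never occurs yields an empty suffix.
      def getSuffix(item: Int, sequence: Array[Int]): Array[Int] = {
        val index = sequence.indexOf(item)
        if (index == -1) Array.empty[Int] else sequence.drop(index + 1)
      }

      def main(args: Array[String]): Unit = {
        println(getSuffix(2, Array(1, 2, 3, 2, 4)).mkString(", "))  // prints: 3, 2, 4
        println(getSuffix(9, Array(1, 2, 3)).length)                // prints: 0
      }
    }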
@@ -151,7 +153,6 @@ class PrefixSpan private (
   /**
    * Partitions the prefix-suffix pairs by projected database size.
-   *
    * @param prefixSuffixPairs prefix (length n) and suffix pairs,
    * @return prefix-suffix pairs partitioned by whether their projected database size is <= or
    *         greater than [[maxLocalProjDBSize]]
@@ -173,44 +174,57 @@ class PrefixSpan private (
   }

   /**
-   * Get the pattern and counts, and prefix suffix pairs
+   * Extends all prefixes by one item from their suffix and computes the resulting frequent prefixes
+   * and remaining work.
    * @param minCount minimum count
-   * @param prefixSuffixPairs prefix (length n) and suffix pairs,
-   * @return pattern (length n+1) and counts, and prefix (length n+1) and suffix pairs
-   *         (RDD[pattern, count], RDD[prefix, suffix ])
+   * @param prefixSuffixPairs prefix (length N) and suffix pairs,
+   * @return (frequent length N+1 extended prefix, count) pairs and (frequent length N+1 extended
+   *         prefix, corresponding suffix) pairs.
    */
-  private def getPatternCountsAndPrefixSuffixPairs(
+  private def extendPrefixes(
       minCount: Long,
       prefixSuffixPairs: RDD[(List[Int], Array[Int])])
     : (RDD[(List[Int], Long)], RDD[(List[Int], Array[Int])]) = {
-    val prefixAndFrequentItemAndCounts = prefixSuffixPairs
+
+    // (length N prefix, item from suffix) pairs and their corresponding number of occurrences
+    // Every extended prefix (item :: prefix) is guaranteed to satisfy the `minSupport` threshold
+    val prefixItemPairAndCounts = prefixSuffixPairs
       .flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L)) }
       .reduceByKey(_ + _)
       .filter(_._2 >= minCount)
-    val patternAndCounts = prefixAndFrequentItemAndCounts
-      .map { case ((prefix, item), count) => (item :: prefix, count) }
-    val prefixToFrequentNextItemsMap = prefixAndFrequentItemAndCounts
+
+    // Map from prefix to set of possible next items from suffix
+    val prefixToNextItems = prefixItemPairAndCounts
       .keys
       .groupByKey()
       .mapValues(_.toSet)
       .collect()
       .toMap
-    val nextPrefixSuffixPairs = prefixSuffixPairs
-      .filter(x => prefixToFrequentNextItemsMap.contains(x._1))
+
+    // Frequent patterns with length N+1 and their corresponding counts
+    val extendedPrefixAndCounts = prefixItemPairAndCounts
+      .map { case ((prefix, item), count) => (item :: prefix, count) }
+
+    // Remaining work, all prefixes will have length N+1
+    val extendedPrefixAndSuffix = prefixSuffixPairs
+      .filter(x => prefixToNextItems.contains(x._1))
       .flatMap { case (prefix, suffix) =>
-        val frequentNextItems = prefixToFrequentNextItemsMap(prefix)
-        val filteredSuffix = suffix.filter(frequentNextItems.contains(_))
-        frequentNextItems.flatMap { item =>
-          val suffix = LocalPrefixSpan.getSuffix(item, filteredSuffix)
-          if (suffix.isEmpty) None
-          else Some(item :: prefix, suffix)
+        val frequentNextItems = prefixToNextItems(prefix)
+        val filteredSuffix = suffix.filter(frequentNextItems.contains(_))
+        frequentNextItems.flatMap { item =>
+          LocalPrefixSpan.getSuffix(item, filteredSuffix) match {
+            case suffix if !suffix.isEmpty => Some(item :: prefix, suffix)
+            case _ => None
+          }
+        }
       }
-    }
-    (patternAndCounts, nextPrefixSuffixPairs)
+
+    (extendedPrefixAndCounts, extendedPrefixAndSuffix)
   }
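extendPrefixes, as rewritten above, finds frequent one-item extensions by counting distinct (prefix, item) pairs and keeping those at or above minCount. A local sketch of that counting step over plain collections (the sample data and minCount are made up; reduceByKey becomes groupBy plus a size):

    object CountExtensionsSketch {
      def main(args: Array[String]): Unit = {
        val minCount = 2L
        // (prefix, projected suffix) pairs; prefixes are reversed lists
        val prefixSuffixPairs = Seq(
          (List(1), Array(2, 3)),
          (List(1), Array(2)),
          (List(1), Array(3)))

        // Count each item at most once per suffix, mirroring suffix.distinct above
        val counts = prefixSuffixPairs
          .flatMap { case (prefix, suffix) => suffix.distinct.map(item => (prefix, item)) }
          .groupBy(identity)
          .map { case ((prefix, item), occs) => ((prefix, item), occs.size.toLong) }
          .filter(_._2 >= minCount)

        // Frequent length-2 prefixes, built by prepending as in the patch
        val extended = counts.map { case ((prefix, item), count) => (item :: prefix, count) }
        extended.foreach(println)  // prints (List(2, 1),2) and (List(3, 1),2)
      }
    }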

   /**
-   * calculate the patterns in local.
+   * Calculate the patterns in local.
    * @param minCount the absolute minimum count
    * @param data prefixes and projected sequences data
    * @return patterns
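After the full series, a caller constructs the estimator with its default constructor and runs it over an RDD of integer sequences. The run signature below (RDD[Array[Int]] in, RDD[(Array[Int], Long)] out) is inferred from the diffs above rather than shown directly, so treat this as a usage sketch:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.mllib.fpm.PrefixSpan

    object PrefixSpanUsageSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[2]").setAppName("PrefixSpanUsageSketch")
        val sc = new SparkContext(conf)
        val sequences = sc.parallelize(Seq(
          Array(1, 3, 4, 5),
          Array(2, 3, 1),
          Array(2, 4, 1)), numSlices = 2)
        val patterns = new PrefixSpan()
          .setMinSupport(0.5)
          .setMaxPatternLength(5)
          .run(sequences)
        patterns.collect().foreach { case (pattern, count) =>
          println(s"[${pattern.mkString(", ")}], $count")
        }
        sc.stop()
      }
    }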