|
17 | 17 |
|
18 | 18 | package org.apache.spark.mllib.fpm |
19 | 19 |
|
| 20 | +import scala.collection.mutable |
| 21 | + |
20 | 22 | import org.apache.spark.Logging |
21 | | -import org.apache.spark.annotation.Experimental |
22 | 23 |
|
23 | 24 | /** |
24 | | - * |
25 | | - * :: Experimental :: |
26 | | - * |
27 | 25 | * Calculate all patterns of a projected database locally. |
28 | 26 | */ |
29 | | -@Experimental |
30 | 27 | private[fpm] object LocalPrefixSpan extends Logging with Serializable { |
31 | 28 |
|
32 | 29 | /** |
33 | 30 | * Calculate all patterns of a projected database. |
34 | 31 | * @param minCount minimum count |
35 | 32 | * @param maxPatternLength maximum pattern length |
36 | | - * @param prefix prefix |
37 | | - * @param projectedDatabase the projected dabase |
| 33 | + * @param prefixes prefixes in reversed order |
| 34 | + * @param database the projected database |
38 | 35 | * @return a set of sequential pattern pairs, |
39 | | - * the key of pair is sequential pattern (a list of items), |
| 36 | + * the key of pair is sequential pattern (a list of items in reversed order), |
40 | 37 | * the value of pair is the pattern's count. |
41 | 38 | */ |
42 | 39 | def run( |
43 | 40 | minCount: Long, |
44 | 41 | maxPatternLength: Int, |
45 | | - prefix: Array[Int], |
46 | | - projectedDatabase: Array[Array[Int]]): Array[(Array[Int], Long)] = { |
47 | | - val frequentPrefixAndCounts = getFreqItemAndCounts(minCount, projectedDatabase) |
48 | | - val frequentPatternAndCounts = frequentPrefixAndCounts |
49 | | - .map(x => (prefix ++ Array(x._1), x._2)) |
50 | | - val prefixProjectedDatabases = getPatternAndProjectedDatabase( |
51 | | - prefix, frequentPrefixAndCounts.map(_._1), projectedDatabase) |
52 | | - |
53 | | - val continueProcess = prefixProjectedDatabases.nonEmpty && prefix.length + 1 < maxPatternLength |
54 | | - if (continueProcess) { |
55 | | - val nextPatterns = prefixProjectedDatabases |
56 | | - .map(x => run(minCount, maxPatternLength, x._1, x._2)) |
57 | | - .reduce(_ ++ _) |
58 | | - frequentPatternAndCounts ++ nextPatterns |
59 | | - } else { |
60 | | - frequentPatternAndCounts |
| 42 | + prefixes: List[Int], |
| 43 | + database: Array[Array[Int]]): Iterator[(List[Int], Long)] = { |
| 44 | + if (prefixes.length == maxPatternLength || database.isEmpty) return Iterator.empty |
| 45 | + val frequentItemAndCounts = getFreqItemAndCounts(minCount, database) |
| 46 | + val filteredDatabase = database.map(x => x.filter(frequentItemAndCounts.contains)) |
| 47 | + frequentItemAndCounts.iterator.flatMap { case (item, count) => |
| 48 | + val newPrefixes = item :: prefixes |
| 49 | + val newProjected = project(filteredDatabase, item) |
| 50 | + Iterator.single((newPrefixes, count)) ++ |
| 51 | + run(minCount, maxPatternLength, newPrefixes, newProjected) |
61 | 52 | } |
62 | 53 | } |
63 | 54 |
|
64 | 55 | /** |
65 | | - * calculate suffix sequence following a prefix in a sequence |
66 | | - * @param prefix prefix |
67 | | - * @param sequence sequence |
| 56 | + * Calculate suffix sequence immediately after the first occurrence of an item. |
| 57 | + * @param item item to get suffix after |
| 58 | + * @param sequence sequence to extract suffix from |
68 | 59 | * @return suffix sequence |
69 | 60 | */ |
70 | | - def getSuffix(prefix: Int, sequence: Array[Int]): Array[Int] = { |
71 | | - val index = sequence.indexOf(prefix) |
| 61 | + def getSuffix(item: Int, sequence: Array[Int]): Array[Int] = { |
| 62 | + val index = sequence.indexOf(item) |
72 | 63 | if (index == -1) { |
73 | 64 | Array() |
74 | 65 | } else { |
75 | 66 | sequence.drop(index + 1) |
76 | 67 | } |
77 | 68 | } |
78 | 69 |
|
| 70 | + def project(database: Array[Array[Int]], prefix: Int): Array[Array[Int]] = { |
| 71 | + database |
| 72 | + .map(getSuffix(prefix, _)) |
| 73 | + .filter(_.nonEmpty) |
| 74 | + } |
| 75 | + |
79 | 76 | /** |
80 | 77 | * Generates frequent items by filtering the input data using the minimum count threshold. |
81 | | - * @param minCount the absolute minimum count |
82 | | - * @param sequences sequences data |
83 | | - * @return array of item and count pair |
| 78 | + * @param minCount the minimum count for an item to be frequent |
| 79 | + * @param database database of sequences |
| 80 | + * @return a map from each frequent item to its count |
84 | 81 | */ |
85 | 82 | private def getFreqItemAndCounts( |
86 | 83 | minCount: Long, |
87 | | - sequences: Array[Array[Int]]): Array[(Int, Long)] = { |
88 | | - sequences.flatMap(_.distinct) |
89 | | - .groupBy(x => x) |
90 | | - .mapValues(_.length.toLong) |
91 | | - .filter(_._2 >= minCount) |
92 | | - .toArray |
93 | | - } |
94 | | - |
95 | | - /** |
96 | | - * Get the frequent prefixes' projected database. |
97 | | - * @param prePrefix the frequent prefixes' prefix |
98 | | - * @param frequentPrefixes frequent prefixes |
99 | | - * @param sequences sequences data |
100 | | - * @return prefixes and projected database |
101 | | - */ |
102 | | - private def getPatternAndProjectedDatabase( |
103 | | - prePrefix: Array[Int], |
104 | | - frequentPrefixes: Array[Int], |
105 | | - sequences: Array[Array[Int]]): Array[(Array[Int], Array[Array[Int]])] = { |
106 | | - val filteredProjectedDatabase = sequences |
107 | | - .map(x => x.filter(frequentPrefixes.contains(_))) |
108 | | - frequentPrefixes.map { x => |
109 | | - val sub = filteredProjectedDatabase.map(y => getSuffix(x, y)).filter(_.nonEmpty) |
110 | | - (prePrefix ++ Array(x), sub) |
111 | | - }.filter(x => x._2.nonEmpty) |
| 84 | + database: Array[Array[Int]]): mutable.Map[Int, Long] = { |
| 85 | + // TODO: use PrimitiveKeyOpenHashMap |
| 86 | + val counts = mutable.Map[Int, Long]().withDefaultValue(0L) |
| 87 | + database.foreach { sequence => |
| 88 | + sequence.distinct.foreach { item => |
| 89 | + counts(item) += 1L |
| 90 | + } |
| 91 | + } |
| 92 | + counts.filter(_._2 >= minCount) |
112 | 93 | } |
113 | 94 | } |
0 commit comments