Skip to content

Commit 00fe756

Browse files
author
Feynman Liang
committed
Reset base files for rebase
1 parent f486dcd commit 00fe756

File tree

3 files changed

+85
-268
lines changed

3 files changed

+85
-268
lines changed

mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala

Lines changed: 23 additions & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -41,166 +41,54 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
4141
maxPatternLength: Int,
4242
prefixes: List[Int],
4343
database: Iterable[Array[Int]]): Iterator[(List[Int], Long)] = {
44-
if (prefixes.count(_ != -1) == maxPatternLength || database.isEmpty) return Iterator.empty
45-
val frequentPrefixAndCounts = getFreqPrefixAndCounts(minCount, prefixes, database)
46-
frequentPrefixAndCounts.iterator.flatMap { case (prefix, count) =>
47-
val newProjected = project(database, prefix)
48-
Iterator.single((prefix, count)) ++
49-
run(minCount, maxPatternLength, prefix, newProjected)
44+
if (prefixes.length == maxPatternLength || database.isEmpty) return Iterator.empty
45+
val frequentItemAndCounts = getFreqItemAndCounts(minCount, database)
46+
val filteredDatabase = database.map(x => x.filter(frequentItemAndCounts.contains))
47+
frequentItemAndCounts.iterator.flatMap { case (item, count) =>
48+
val newPrefixes = item :: prefixes
49+
val newProjected = project(filteredDatabase, item)
50+
Iterator.single((newPrefixes, count)) ++
51+
run(minCount, maxPatternLength, newPrefixes, newProjected)
5052
}
5153
}
5254

5355
/**
54-
* Calculate suffix sequence immediately after the first occurrence of a prefix.
55-
* @param prefix prefix to get suffix after
56+
* Calculate suffix sequence immediately after the first occurrence of an item.
57+
* @param item item to get suffix after
5658
* @param sequence sequence to extract suffix from
5759
* @return suffix sequence
5860
*/
59-
def getSuffix(prefix: List[Int], sequence: Array[Int]): (Boolean, Array[Int]) = {
60-
val element = getLastElement(prefix)
61-
if (sequence.apply(0) != -3) {
62-
if (element.length == 1) {
63-
getSingleItemElementSuffix(element, sequence)
64-
} else {
65-
getMultiItemsElementSuffix(element, sequence)
66-
}
67-
} else {
68-
if (element.length == 1) {
69-
val firstElemPos = sequence.indexOf(-1)
70-
if (firstElemPos == -1) {
71-
(false, Array())
72-
} else {
73-
getSingleItemElementSuffix(element, sequence.drop(firstElemPos + 1))
74-
}
75-
} else {
76-
val newSequence = element.take(element.length - 1) ++ sequence.drop(1)
77-
getMultiItemsElementSuffix(element, newSequence)
78-
}
79-
}
80-
}
81-
82-
private def getLastElement(prefix: List[Int]): Array[Int] = {
83-
val pos = prefix.indexOf(-1)
84-
if (pos == -1) {
85-
prefix.reverse.toArray
86-
} else {
87-
prefix.take(pos).reverse.toArray
88-
}
89-
}
90-
91-
private def getSingleItemElementSuffix(
92-
element: Array[Int],
93-
sequence: Array[Int]): (Boolean, Array[Int]) = {
94-
val index = sequence.indexOf(element.apply(0))
61+
def getSuffix(item: Int, sequence: Array[Int]): Array[Int] = {
62+
val index = sequence.indexOf(item)
9563
if (index == -1) {
96-
(false, Array())
97-
} else if (index == sequence.length - 1) {
98-
(true, Array())
99-
} else if (sequence.apply(index + 1) == -1) {
100-
(true, sequence.drop(index + 2))
101-
} else {
102-
(true, -3 +: sequence.drop(index + 1))
103-
}
104-
}
105-
106-
private def getMultiItemsElementSuffix(
107-
element: Array[Int],
108-
sequence: Array[Int]): (Boolean, Array[Int]) = {
109-
var seqPos = 0
110-
var found = false
111-
while (seqPos < sequence.length && !found) {
112-
var elemPos = 0
113-
while (!found && elemPos < element.length &&
114-
seqPos < sequence.length && sequence.apply(seqPos) != -1 ) {
115-
if (element.apply(elemPos) == sequence.apply(seqPos)) {
116-
elemPos += 1
117-
seqPos += 1
118-
} else {
119-
seqPos += 1
120-
}
121-
found = elemPos == element.length
122-
}
123-
if (!found) seqPos += 1
124-
}
125-
if (found) {
126-
if (sequence.apply(seqPos) == -1) {
127-
(true, sequence.drop(seqPos + 1))
128-
} else {
129-
(true, -3 +: sequence.drop(seqPos))
130-
}
64+
Array()
13165
} else {
132-
(false, Array())
66+
sequence.drop(index + 1)
13367
}
13468
}
13569

136-
def project(database: Iterable[Array[Int]], prefix: List[Int]): Iterable[Array[Int]] = {
70+
def project(database: Iterable[Array[Int]], prefix: Int): Iterable[Array[Int]] = {
13771
database
138-
.map(getSuffix(prefix, _)._2)
72+
.map(getSuffix(prefix, _))
13973
.filter(_.nonEmpty)
14074
}
14175

14276
/**
143-
* Generates frequent prefix by filtering the input data using minimal count level.
77+
* Generates frequent items by filtering the input data using minimal count level.
14478
* @param minCount the minimum count for an item to be frequent
145-
* @param prefix the minimum count for an item to be frequent
14679
* @param database database of sequences
14780
* @return freq item to count map
14881
*/
149-
private def getFreqPrefixAndCounts(
82+
private def getFreqItemAndCounts(
15083
minCount: Long,
151-
prefix: List[Int],
152-
database: Iterable[Array[Int]]): mutable.Map[List[Int], Long] = {
84+
database: Iterable[Array[Int]]): mutable.Map[Int, Long] = {
15385
// TODO: use PrimitiveKeyOpenHashMap
154-
155-
// get frequent items
156-
val freqItems = database
157-
.flatMap(_.distinct.filter(x => x != -1 && x != -3))
158-
.groupBy(x => x)
159-
.mapValues(_.size)
160-
.filter(_._2 >= minCount)
161-
.map(_._1)
162-
if (freqItems.isEmpty) return mutable.Map[List[Int], Long]()
163-
164-
// get prefixes and counts
165-
val singleItemCounts = mutable.Map[Int, Long]().withDefaultValue(0L)
166-
val multiItemsCounts = mutable.Map[Int, Long]().withDefaultValue(0L)
167-
val prefixLastElement = getLastElement(prefix)
86+
val counts = mutable.Map[Int, Long]().withDefaultValue(0L)
16887
database.foreach { sequence =>
169-
if (sequence.apply(0) != -3) {
170-
freqItems.foreach { item =>
171-
if (getSingleItemElementSuffix(Array(item), sequence)._1) {
172-
singleItemCounts(item) += 1
173-
}
174-
if (prefixLastElement.nonEmpty &&
175-
getMultiItemsElementSuffix(prefixLastElement :+ item, sequence)._1) {
176-
multiItemsCounts(item) += 1
177-
}
178-
}
179-
} else {
180-
val firstElemPos = sequence.indexOf(-1)
181-
if (firstElemPos != -1) {
182-
val newSequence = sequence.drop(firstElemPos + 1)
183-
freqItems.foreach { item =>
184-
if (getSingleItemElementSuffix(Array(item), newSequence)._1) {
185-
singleItemCounts(item) += 1
186-
}
187-
}
188-
}
189-
val newSequence = prefixLastElement ++ sequence.drop(1)
190-
freqItems.foreach { item =>
191-
if (prefixLastElement.nonEmpty &&
192-
getMultiItemsElementSuffix(prefixLastElement :+ item, newSequence)._1) {
193-
multiItemsCounts(item) += 1
194-
}
195-
}
88+
sequence.distinct.foreach { item =>
89+
counts(item) += 1L
19690
}
19791
}
198-
199-
if (prefix.nonEmpty) {
200-
singleItemCounts.filter(_._2 >= minCount).map(x => (x._1 :: (-1 :: prefix), x._2)) ++
201-
multiItemsCounts.filter(_._2 >= minCount).map(x => (x._1 :: prefix, x._2))
202-
} else {
203-
singleItemCounts.filter(_._2 >= minCount).map(x => (List(x._1), x._2))
204-
}
92+
counts.filter(_._2 >= minCount)
20593
}
20694
}

mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.mllib.fpm
1919

20+
import scala.collection.mutable.ArrayBuffer
21+
2022
import org.apache.spark.Logging
2123
import org.apache.spark.annotation.Experimental
2224
import org.apache.spark.rdd.RDD
@@ -48,7 +50,7 @@ class PrefixSpan private (
4850
* projected database exceeds this size, another iteration of distributed PrefixSpan is run.
4951
*/
5052
// TODO: make configurable with a better default value, 10000 may be too small
51-
private val maxLocalProjDBSize: Long = 32000000L
53+
private val maxLocalProjDBSize: Long = 10000
5254

5355
/**
5456
* Constructs a default instance with default parameters
@@ -106,20 +108,20 @@ class PrefixSpan private (
106108

107109
// (Frequent items -> number of occurrences, all items here satisfy the `minSupport` threshold
108110
val freqItemCounts = sequences
109-
.flatMap(seq => seq.distinct.filter(_ != -1).map(item => (item, 1L)))
111+
.flatMap(seq => seq.distinct.map(item => (item, 1L)))
110112
.reduceByKey(_ + _)
111113
.filter(_._2 >= minCount)
112114
.collect()
113115

114116
// Pairs of (length 1 prefix, suffix consisting of frequent items)
115-
val prefixSuffixPairs = {
117+
val itemSuffixPairs = {
116118
val freqItems = freqItemCounts.map(_._1).toSet
117119
sequences.flatMap { seq =>
118-
val filteredSeq = seq.filter(item => freqItems.contains(item) || item == -1)
120+
val filteredSeq = seq.filter(freqItems.contains(_))
119121
freqItems.flatMap { item =>
120-
val candidateSuffix = LocalPrefixSpan.getSuffix(List(item), filteredSeq)._2
122+
val candidateSuffix = LocalPrefixSpan.getSuffix(item, filteredSeq)
121123
candidateSuffix match {
122-
case suffix if suffix.nonEmpty => Some((List(item), suffix))
124+
case suffix if !suffix.isEmpty => Some((List(item), suffix))
123125
case _ => None
124126
}
125127
}
@@ -131,13 +133,11 @@ class PrefixSpan private (
131133
var resultsAccumulator = freqItemCounts.map(x => (List(x._1), x._2))
132134

133135
// Remaining work to be locally and distributively processed respectfully
134-
var (pairsForLocal, pairsForDistributed) = partitionByProjDBSize(prefixSuffixPairs)
136+
var (pairsForLocal, pairsForDistributed) = partitionByProjDBSize(itemSuffixPairs)
135137

136138
// Continue processing until no pairs for distributed processing remain (i.e. all prefixes have
137139
// projected database sizes <= `maxLocalProjDBSize`)
138-
var patternLength = 1
139-
while (pairsForDistributed.count() != 0 && patternLength < maxPatternLength) {
140-
patternLength += 1
140+
while (pairsForDistributed.count() != 0) {
141141
val (nextPatternAndCounts, nextPrefixSuffixPairs) =
142142
extendPrefixes(minCount, pairsForDistributed)
143143
pairsForDistributed.unpersist()
@@ -153,7 +153,7 @@ class PrefixSpan private (
153153
minCount, sc.parallelize(pairsForLocal, 1).groupByKey())
154154

155155
(sc.parallelize(resultsAccumulator, 1) ++ remainingResults)
156-
.map { case (pattern, count) => (pattern.reverse.toArray, count) }
156+
.map { case (pattern, count) => (pattern.toArray, count) }
157157
}
158158

159159

@@ -195,41 +195,36 @@ class PrefixSpan private (
195195
// (length N prefix, item from suffix) pairs and their corresponding number of occurrences
196196
// Every (prefix :+ suffix) is guaranteed to have support exceeding `minSupport`
197197
val prefixItemPairAndCounts = prefixSuffixPairs
198-
.flatMap { case (prefix, suffix) =>
199-
suffix.distinct.filter(item => item != -1 && item != -3).map(y => ((prefix, y), 1L)) }
198+
.flatMap { case (prefix, suffix) => suffix.distinct.map(y => ((prefix, y), 1L)) }
200199
.reduceByKey(_ + _)
201200
.filter(_._2 >= minCount)
202201

203-
// Map from prefix to set of possible next prefix from suffix
204-
val prefixToNextPrefixes = prefixItemPairAndCounts
202+
// Map from prefix to set of possible next items from suffix
203+
val prefixToNextItems = prefixItemPairAndCounts
205204
.keys
206205
.groupByKey()
207-
.map { case (prefix, items) =>
208-
(prefix, items.flatMap(item => Array(item :: (-1 :: prefix), item :: prefix)).toSet) }
206+
.mapValues(_.toSet)
209207
.collect()
210208
.toMap
211209

210+
211+
// Frequent patterns with length N+1 and their corresponding counts
212+
val extendedPrefixAndCounts = prefixItemPairAndCounts
213+
.map { case ((prefix, item), count) => (item :: prefix, count) }
214+
212215
// Remaining work, all prefixes will have length N+1
213-
val extendedPrefixAndSuffixWithFlags = prefixSuffixPairs
216+
val extendedPrefixAndSuffix = prefixSuffixPairs
217+
.filter(x => prefixToNextItems.contains(x._1))
214218
.flatMap { case (prefix, suffix) =>
215-
if (prefixToNextPrefixes.contains(prefix)) {
216-
val frequentNextPrefixes = prefixToNextPrefixes(prefix)
217-
frequentNextPrefixes.map { nextPrefix =>
218-
val suffixWithFlag = LocalPrefixSpan.getSuffix(nextPrefix, suffix)
219-
(nextPrefix, if (suffixWithFlag._1) 1L else 0L, suffixWithFlag._2)
219+
val frequentNextItems = prefixToNextItems(prefix)
220+
val filteredSuffix = suffix.filter(frequentNextItems.contains(_))
221+
frequentNextItems.flatMap { item =>
222+
LocalPrefixSpan.getSuffix(item, filteredSuffix) match {
223+
case suffix if !suffix.isEmpty => Some(item :: prefix, suffix)
224+
case _ => None
220225
}
221-
} else {
222-
None
223226
}
224-
}.persist(StorageLevel.MEMORY_AND_DISK)
225-
val extendedPrefixAndCounts = extendedPrefixAndSuffixWithFlags
226-
.map(x => (x._1, x._2))
227-
.reduceByKey(_ + _)
228-
.filter(_._2 >= minCount)
229-
val extendedPrefixAndSuffix = extendedPrefixAndSuffixWithFlags
230-
.map(x => (x._1, x._3))
231-
.filter(_._2.nonEmpty)
232-
extendedPrefixAndSuffixWithFlags.unpersist()
227+
}
233228

234229
(extendedPrefixAndCounts, extendedPrefixAndSuffix)
235230
}
@@ -245,9 +240,9 @@ class PrefixSpan private (
245240
data: RDD[(List[Int], Iterable[Array[Int]])]): RDD[(List[Int], Long)] = {
246241
data.flatMap {
247242
case (prefix, projDB) =>
248-
LocalPrefixSpan.run(minCount, maxPatternLength, prefix, projDB)
243+
LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB)
249244
.map { case (pattern: List[Int], count: Long) =>
250-
(pattern, count)
245+
(pattern.reverse, count)
251246
}
252247
}
253248
}

0 commit comments

Comments
 (0)