Skip to content

Commit 6787716

Browse files
Author: Feynman Liang (committed)
Commit message: Working on temporal sequences
1 parent f1114b9 commit 6787716

File tree

3 files changed

+102
-23
lines changed

3 files changed

+102
-23
lines changed

mllib/src/main/scala/org/apache/spark/mllib/fpm/LocalPrefixSpan.scala

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,15 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
4141
maxPatternLength: Int,
4242
prefixes: List[Int],
4343
database: Iterable[Array[Int]]): Iterator[(List[Int], Long)] = {
44-
if (prefixes.length == maxPatternLength || database.isEmpty) return Iterator.empty
44+
if (prefixes.count(_ == DELIMITER) == maxPatternLength || database.isEmpty) {
45+
return Iterator.empty
46+
}
4547
val frequentItemAndCounts = getFreqItemAndCounts(minCount, database)
4648
val filteredDatabase = database.map { suffix =>
4749
suffix.filter(item => item == DELIMITER || frequentItemAndCounts.contains(item))
4850
}
4951
frequentItemAndCounts.iterator.flatMap { case (item, count) =>
50-
val newPrefixes = item :: prefixes
52+
val newPrefixes = DELIMITER :: item :: prefixes
5153
val newProjected = project(filteredDatabase, item)
5254
Iterator.single((newPrefixes, count)) ++
5355
run(minCount, maxPatternLength, newPrefixes, newProjected)
@@ -65,7 +67,7 @@ private[fpm] object LocalPrefixSpan extends Logging with Serializable {
6567
if (index == -1) {
6668
Array()
6769
} else {
68-
// drop until we get to the next delimiter (or end of sequence)
70+
// in case index is inside an itemset, drop until we get to the next delimiter (or end of seq)
6971
sequence.drop(index).dropWhile(_ != DELIMITER).drop(1)
7072
}
7173
}

mllib/src/main/scala/org/apache/spark/mllib/fpm/PrefixSpan.scala

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -91,9 +91,8 @@ class PrefixSpan private (
9191

9292
/**
9393
* Find the complete set of sequential patterns in the input sequences.
94-
* @param sequences a dataset of sequences. Items in a sequence are represented by non-negative
95-
* integers and delimited by [[DELIMITER]]. Non-temporal sequences
96-
* are supported by placing more than one item between delimiters.
94+
* @param sequences ordered sequences of itemsets. Items are represented by non-negative integers.
95+
* Each itemset has one or more items and is delimited by [[DELIMITER]].
9796
* @return a set of sequential pattern pairs,
9897
* the key of pair is pattern (a list of elements),
9998
* the value of pair is the pattern's count.
@@ -124,22 +123,23 @@ class PrefixSpan private (
124123
freqItems.flatMap { item =>
125124
val candidateSuffix = LocalPrefixSpan.getSuffix(item, filteredSeq)
126125
candidateSuffix match {
127-
case suffix if !suffix.isEmpty => Some((List(item), suffix))
126+
case suffix if !suffix.isEmpty => Some((List(DELIMITER, item), suffix))
128127
case _ => None
129128
}
130129
}
131130
}
132131
}
133132
// Accumulator for the computed results to be returned, initialized to the frequent items (i.e.
134133
// frequent length-one prefixes)
135-
var resultsAccumulator = freqItemCounts.map(x => (List(x._1), x._2))
134+
var resultsAccumulator = freqItemCounts.map(x => (List(DELIMITER, x._1), x._2))
136135

137136
// Remaining work to be locally and distributively processed respectively
138137
var (pairsForLocal, pairsForDistributed) = partitionByProjDBSize(itemSuffixPairs)
139138

140139
// Continue processing until no pairs for distributed processing remain (i.e. all prefixes have
141-
// projected database sizes <= `maxLocalProjDBSize`)
142-
while (pairsForDistributed.count() != 0) {
140+
// projected database sizes <= `maxLocalProjDBSize`) or `maxPatternLength` is reached
141+
var patternLength = 1
142+
while (pairsForDistributed.count() != 0 || patternLength < maxPatternLength) {
143143
val (nextPatternAndCounts, nextPrefixSuffixPairs) =
144144
extendPrefixes(minCount, pairsForDistributed)
145145
pairsForDistributed.unpersist()
@@ -148,14 +148,15 @@ class PrefixSpan private (
148148
pairsForDistributed.persist(StorageLevel.MEMORY_AND_DISK)
149149
pairsForLocal ++= smallerPairsPart
150150
resultsAccumulator ++= nextPatternAndCounts.collect()
151+
patternLength += 1 // pattern length grows one per iteration
151152
}
152153

153154
// Process the small projected databases locally
154155
val remainingResults = getPatternsInLocal(
155156
minCount, sc.parallelize(pairsForLocal, 1).groupByKey())
156157

157158
(sc.parallelize(resultsAccumulator, 1) ++ remainingResults)
158-
.map { case (pattern, count) => (pattern.toArray, count) }
159+
.map { case (pattern, count) => (pattern.reverse.toArray, count) }
159160
}
160161

161162

@@ -209,10 +210,9 @@ class PrefixSpan private (
209210
.collect()
210211
.toMap
211212

212-
213213
// Frequent patterns with length N+1 and their corresponding counts
214214
val extendedPrefixAndCounts = prefixItemPairAndCounts
215-
.map { case ((prefix, item), count) => (item :: prefix, count) }
215+
.map { case ((prefix, item), count) => (DELIMITER :: item :: prefix, count) }
216216

217217
// Remaining work, all prefixes will have length N+1
218218
val extendedPrefixAndSuffix = prefixSuffixPairs
@@ -222,7 +222,7 @@ class PrefixSpan private (
222222
val filteredSuffix = suffix.filter(frequentNextItems.contains(_))
223223
frequentNextItems.flatMap { item =>
224224
LocalPrefixSpan.getSuffix(item, filteredSuffix) match {
225-
case suffix if !suffix.isEmpty => Some(item :: prefix, suffix)
225+
case suffix if !suffix.isEmpty => Some(DELIMITER :: item :: prefix, suffix)
226226
case _ => None
227227
}
228228
}
@@ -242,9 +242,9 @@ class PrefixSpan private (
242242
data: RDD[(List[Int], Iterable[Array[Int]])]): RDD[(List[Int], Long)] = {
243243
data.flatMap {
244244
case (prefix, projDB) =>
245-
LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList.reverse, projDB)
245+
LocalPrefixSpan.run(minCount, maxPatternLength, prefix.toList, projDB)
246246
.map { case (pattern: List[Int], count: Long) =>
247-
(pattern.reverse, count)
247+
(pattern, count)
248248
}
249249
}
250250
}

mllib/src/test/scala/org/apache/spark/mllib/fpm/PrefixSpanSuite.scala

Lines changed: 84 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
6868
(Array(4), 4L),
6969
(Array(4, 5), 2L),
7070
(Array(5), 3L)
71-
)
71+
).map { case (seq, count) => (insertDelimiter(seq), count) }
7272
compareResults(expectedValue1, result1.collect())
7373

7474
prefixspan.setMinSupport(0.5).setMaxPatternLength(50)
@@ -79,7 +79,7 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
7979
(Array(3, 4), 3L),
8080
(Array(4), 4L),
8181
(Array(5), 3L)
82-
)
82+
).map { case (seq, count) => (insertDelimiter(seq), count) }
8383
compareResults(expectedValue2, result2.collect())
8484

8585
prefixspan.setMinSupport(0.33).setMaxPatternLength(2)
@@ -99,15 +99,92 @@ class PrefixSpanSuite extends SparkFunSuite with MLlibTestSparkContext {
9999
(Array(4), 4L),
100100
(Array(4, 5), 2L),
101101
(Array(5), 3L)
102-
)
102+
).map { case (seq, count) => (insertDelimiter(seq), count) }
103103
compareResults(expectedValue3, result3.collect())
104104
}
105105

106+
test("PrefixSpan non-temporal sequences") {
107+
val sequences = Array(
108+
"a,abc,ac,d,cf",
109+
"ad,c,bc,ae",
110+
"ef,ab,df,c,b",
111+
"e,g,af,c,b,c")
112+
val coder = Array('a', 'b', 'c', 'd', 'e', 'f', 'g').zip(Array(1, 2, 3, 4, 5, 6, 7)).toMap
113+
val intSequences = sequences.map(_.split(",").flatMap(-1 +: _.toArray.map(coder)).drop(1))
114+
val data = sc.parallelize(intSequences, 2).cache()
115+
val prefixspan = new PrefixSpan()
116+
.setMinSupport(0.5)
117+
.setMaxPatternLength(5)
118+
119+
val results = prefixspan.run(data)
120+
val expectedValue4 = Array(
121+
"a:4",
122+
"b:4",
123+
"c:4",
124+
"d:3",
125+
"e:3",
126+
"f:3",
127+
"a,a:2",
128+
"a,b:4",
129+
"a,bc:2",
130+
"a,bc,a:2",
131+
"a,b,a:2",
132+
"a,b,c:2",
133+
"ab:2",
134+
"ab,c:2",
135+
"ab,d:2",
136+
"ab,d,c:2",
137+
"ab,f:2",
138+
"a,c:4",
139+
"a,c,a:2",
140+
"a,c,b:3",
141+
"a,c,c:3",
142+
"a,d:2",
143+
"a,d,c:2",
144+
"a,f:2",
145+
"b,a:2",
146+
"b,c:3",
147+
"bc:2",
148+
"bc,a:2",
149+
"b,d:2",
150+
"b,d,c:2",
151+
"b,f:2",
152+
"c,a:2",
153+
"c,b:3",
154+
"c,c:3",
155+
"d,b:2",
156+
"d,c:3",
157+
"d,c,b:2",
158+
"e,a:2",
159+
"e,a,b:2",
160+
"e,a,c:2",
161+
"e,a,c,b:2",
162+
"e,b:2",
163+
"e,b,c:2",
164+
"e,c:2",
165+
"e,c,b:2",
166+
"e,f:2",
167+
"e,f,b:2",
168+
"e,f,c:2",
169+
"e,f,c,b:2",
170+
"f,b:2",
171+
"f,b,c:2",
172+
"f,c:2",
173+
"f,c,b:2")
174+
val intExpectedValue = expectedValue4
175+
.map(_.split(":"))
176+
.map { x => (x(0).split(",").flatMap(-1 +: _.toArray.map(coder)), x(1).toLong) }
177+
compareResults(intExpectedValue, results.collect())
178+
}
179+
106180
private def compareResults(
107-
expectedValue: Array[(Array[Int], Long)],
108-
actualValue: Array[(Array[Int], Long)]): Unit = {
109-
assert(expectedValue.map(x => (x._1.toSeq, x._2)).toSet ===
110-
actualValue.map(x => (x._1.toSeq, x._2)).toSet)
181+
expectedValue: Array[(Array[Int], Long)],
182+
actualValue: Array[(Array[Int], Long)]): Unit = {
183+
val expectedSet = expectedValue.map(x => (x._1.toSeq, x._2)).toSet
184+
val actualSet = actualValue.map(x => (x._1.toSeq, x._2)).toSet
185+
println(s"missing expected:\n${expectedSet.diff(actualSet)}")
186+
println(s"extra actual:\n${actualSet.diff(expectedSet)}")
187+
assert(expectedSet === actualSet)
111188
}
112189

113190
private def insertDelimiter(sequence: Array[Int]): Array[Int] = {

0 commit comments

Comments (0)