@@ -45,7 +45,11 @@ class PrefixSpan private (
4545 private var minSupport : Double ,
4646 private var maxPatternLength : Int ) extends Logging with Serializable {
4747
48- private val maxProjectedDBSizeBeforeLocalProcessing : Long = 10000
48+ /**
49+ * The maximum number of items allowed in a projected database before local processing. If a
50+ * projected database exceeds this size, another iteration of distributed PrefixSpan is run.
51+ */
52+ private val maxLocalProjDBSize : Long = 10000
4953
5054 /**
5155 * Constructs a default instance with default parameters
@@ -63,8 +67,7 @@ class PrefixSpan private (
6367 * Sets the minimal support level (default: `0.1`).
6468 */
6569 def setMinSupport (minSupport : Double ): this .type = {
66- require(minSupport >= 0 && minSupport <= 1 ,
67- " The minimum support value must be in [0, 1]." )
70+ require(minSupport >= 0 && minSupport <= 1 , " The minimum support value must be in [0, 1]." )
6871 this .minSupport = minSupport
6972 this
7073 }
@@ -79,8 +82,7 @@ class PrefixSpan private (
7982 */
8083 def setMaxPatternLength (maxPatternLength : Int ): this .type = {
8184 // TODO: support unbounded pattern length when maxPatternLength = 0
82- require(maxPatternLength >= 1 ,
83- " The maximum pattern length value must be greater than 0." )
85+ require(maxPatternLength >= 1 , " The maximum pattern length value must be greater than 0." )
8486 this .maxPatternLength = maxPatternLength
8587 this
8688 }
@@ -119,13 +121,13 @@ class PrefixSpan private (
119121 }.filter(_._2.nonEmpty)
120122 }
121123 }
122- var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = splitPrefixSuffixPairs (prefixSuffixPairs)
124+ var (smallPrefixSuffixPairs, largePrefixSuffixPairs) = partitionByProjDBSize (prefixSuffixPairs)
123125
124126 while (largePrefixSuffixPairs.count() != 0 ) {
125127 val (nextPatternAndCounts, nextPrefixSuffixPairs) =
126128 getPatternCountsAndPrefixSuffixPairs(minCount, largePrefixSuffixPairs)
127129 largePrefixSuffixPairs.unpersist()
128- val (smallerPairsPart, largerPairsPart) = splitPrefixSuffixPairs (nextPrefixSuffixPairs)
130+ val (smallerPairsPart, largerPairsPart) = partitionByProjDBSize (nextPrefixSuffixPairs)
129131 largePrefixSuffixPairs = largerPairsPart
130132 largePrefixSuffixPairs.persist(StorageLevel .MEMORY_AND_DISK )
131133 smallPrefixSuffixPairs ++= smallerPairsPart
@@ -136,7 +138,6 @@ class PrefixSpan private (
136138 val projectedDatabase = smallPrefixSuffixPairs
137139 // TODO aggregateByKey
138140 .groupByKey()
139- .mapValues(_.toArray)
140141 val nextPatternAndCounts = getPatternsInLocal(minCount, projectedDatabase)
141142 allPatternAndCounts ++= nextPatternAndCounts
142143 }
@@ -145,23 +146,21 @@ class PrefixSpan private (
145146
146147
147148 /**
148- * Split prefix suffix pairs to two parts:
149- * Prefixes with projected databases smaller than maxSuffixesBeforeLocalProcessing and
150- * Prefixes with projected databases larger than maxSuffixesBeforeLocalProcessing
149+ * Partitions the prefix-suffix pairs by projected database size.
150+ *
151151 * @param prefixSuffixPairs prefix (length n) and suffix pairs,
152- * @return small size prefix suffix pairs and big size prefix suffix pairs
153- * (RDD[prefix, suffix], RDD[prefix, suffix ])
152+ * @return prefix- suffix pairs partitioned by whether their projected database size is <= or
153+ * greater than [[ maxLocalProjDBSize ]]
154154 */
155- private def splitPrefixSuffixPairs (
156- prefixSuffixPairs : RDD [(List [Int ], Array [Int ])]):
157- (RDD [(List [Int ], Array [Int ])], RDD [(List [Int ], Array [Int ])]) = {
155+ private def partitionByProjDBSize (prefixSuffixPairs : RDD [(List [Int ], Array [Int ])])
156+ : (RDD [(List [Int ], Array [Int ])], RDD [(List [Int ], Array [Int ])]) = {
158157 val prefixToSuffixSize = prefixSuffixPairs
159158 .aggregateByKey(0 )(
160159 seqOp = { case (count, suffix) => count + suffix.length },
161160 combOp = { _ + _ })
162161 val smallPrefixes = prefixToSuffixSize
163- .filter(_._2 <= maxProjectedDBSizeBeforeLocalProcessing )
164- .map(_._1)
162+ .filter(_._2 <= maxLocalProjDBSize )
163+ .keys
165164 .collect()
166165 .toSet
167166 val small = prefixSuffixPairs.filter { case (prefix, _) => smallPrefixes.contains(prefix) }
@@ -214,7 +213,7 @@ class PrefixSpan private (
214213 */
215214 private def getPatternsInLocal (
216215 minCount : Long ,
217- data : RDD [(List [Int ], Array [Array [Int ]])]): RDD [(List [Int ], Long )] = {
216+ data : RDD [(List [Int ], Iterable [Array [Int ]])]): RDD [(List [Int ], Long )] = {
218217 data.flatMap {
219218 case (prefix, projDB) =>
220219 LocalPrefixSpan .run(minCount, maxPatternLength, prefix.toList.reverse, projDB)
0 commit comments