5 changes: 3 additions & 2 deletions R/pkg/R/mllib_fpm.R
@@ -116,10 +116,11 @@ setMethod("spark.freqItemsets", signature(object = "FPGrowthModel"),
# Get association rules.

#' @return A \code{SparkDataFrame} with association rules.
#' The \code{SparkDataFrame} contains three columns:
#' The \code{SparkDataFrame} contains four columns:
#' \code{antecedent} (an array of the same type as the input column),
#' \code{consequent} (an array of the same type as the input column),
#' and \code{condfidence} (confidence).
#' \code{confidence} (confidence for the rule),
#' and \code{lift} (lift for the rule).
#' @rdname spark.fpGrowth
#' @aliases associationRules,FPGrowthModel-method
#' @note spark.associationRules(FPGrowthModel) since 2.2.0
3 changes: 2 additions & 1 deletion R/pkg/tests/fulltests/test_mllib_fpm.R
@@ -44,7 +44,8 @@ test_that("spark.fpGrowth", {
expected_association_rules <- data.frame(
antecedent = I(list(list("2"), list("3"))),
consequent = I(list(list("1"), list("1"))),
confidence = c(1, 1)
confidence = c(1, 1),
lift = c(1, 1)
)

expect_equivalent(expected_association_rules, collect(spark.associationRules(model)))
61 changes: 45 additions & 16 deletions mllib/src/main/scala/org/apache/spark/ml/fpm/FPGrowth.scala
@@ -20,6 +20,8 @@ package org.apache.spark.ml.fpm
import scala.reflect.ClassTag

import org.apache.hadoop.fs.Path
import org.json4s.{DefaultFormats, JObject}
import org.json4s.JsonDSL._

import org.apache.spark.annotation.{Experimental, Since}
import org.apache.spark.ml.{Estimator, Model}
@@ -34,6 +36,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.VersionUtils

/**
* Common params for FPGrowth and FPGrowthModel
@@ -175,7 +178,8 @@ class FPGrowth @Since("2.2.0") (
if (handlePersistence) {
items.persist(StorageLevel.MEMORY_AND_DISK)
}

val inputRowCount = items.count()
instr.logNumExamples(inputRowCount)
val parentModel = mllibFP.run(items)
val rows = parentModel.freqItemsets.map(f => Row(f.items, f.freq))
val schema = StructType(Seq(
@@ -187,7 +191,8 @@
items.unpersist()
}

copyValues(new FPGrowthModel(uid, frequentItems)).setParent(this)
copyValues(new FPGrowthModel(uid, frequentItems, parentModel.itemSupport, inputRowCount))
Member:
Ah, so the total count n won't just equal the sum of the count of consequents, for example, because the frequent item set was pruned of infrequent sets? Darn, yeah you need to deal with the case where you know n and where you don't.

Contributor Author:
Well, there is no way to know the total size of the training dataset from the frequent itemsets unfortunately... So yes, we need to deal with it unfortunately.

.setParent(this)
}

@Since("2.2.0")
@@ -217,7 +222,9 @@ object FPGrowth extends DefaultParamsReadable[FPGrowth] {
@Experimental
class FPGrowthModel private[ml] (
@Since("2.2.0") override val uid: String,
@Since("2.2.0") @transient val freqItemsets: DataFrame)
@Since("2.2.0") @transient val freqItemsets: DataFrame,
private val itemSupport: scala.collection.Map[Any, Double],
Member:
I suppose there's no way of adding the item generic type here; it's really in the schema of the DataFrame. Does Map[_, Double] also work? I don't think it needs a change, just a side question.

If you have the support for every item, do you need the overall count here as well? the item counts have already been divided through by numTrainingRecords here. Below only itemSupport is really passed somewhere else.

Contributor Author:
Yes, Map[_, Double] seems to work too. I can change it if you prefer.

The numTrainingRecords is used because it needs to be stored (and it then used when loading the model in order to recompute the itemSupport).

private val numTrainingRecords: Long)
extends Model[FPGrowthModel] with FPGrowthParams with MLWritable {

/** @group setParam */
Expand All @@ -241,17 +248,17 @@ class FPGrowthModel private[ml] (
@transient private var _cachedRules: DataFrame = _

/**
* Get association rules fitted using the minConfidence. Returns a dataframe
* with three fields, "antecedent", "consequent" and "confidence", where "antecedent" and
* "consequent" are Array[T] and "confidence" is Double.
* Get association rules fitted using the minConfidence. Returns a dataframe with four fields,
* "antecedent", "consequent", "confidence" and "lift", where "antecedent" and "consequent" are
* Array[T], whereas "confidence" and "lift" are Double.
*/
@Since("2.2.0")
@transient def associationRules: DataFrame = {
if ($(minConfidence) == _cachedMinConf) {
_cachedRules
} else {
_cachedRules = AssociationRules
.getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence))
.getAssociationRulesFromFP(freqItemsets, "items", "freq", $(minConfidence), itemSupport)
_cachedMinConf = $(minConfidence)
_cachedRules
}
@@ -301,7 +308,7 @@ class FPGrowthModel private[ml] (

@Since("2.2.0")
override def copy(extra: ParamMap): FPGrowthModel = {
val copied = new FPGrowthModel(uid, freqItemsets)
val copied = new FPGrowthModel(uid, freqItemsets, itemSupport, numTrainingRecords)
copyValues(copied, extra).setParent(this.parent)
}

@@ -323,7 +330,8 @@ object FPGrowthModel extends MLReadable[FPGrowthModel] {
class FPGrowthModelWriter(instance: FPGrowthModel) extends MLWriter {

override protected def saveImpl(path: String): Unit = {
DefaultParamsWriter.saveMetadata(instance, path, sc)
val extraMetadata: JObject = Map("numTrainingRecords" -> instance.numTrainingRecords)
DefaultParamsWriter.saveMetadata(instance, path, sc, extraMetadata = Some(extraMetadata))
val dataPath = new Path(path, "data").toString
instance.freqItemsets.write.parquet(dataPath)
}
@@ -335,10 +343,28 @@
private val className = classOf[FPGrowthModel].getName

override def load(path: String): FPGrowthModel = {
implicit val format = DefaultFormats
val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
val (major, minor) = VersionUtils.majorMinorVersion(metadata.sparkVersion)
val numTrainingRecords = if (major.toInt < 2 || (major.toInt == 2 && minor.toInt < 4)) {
// 2.3 and before don't store the count
0L
} else {
// 2.4+
(metadata.metadata \ "numTrainingRecords").extract[Long]
}
val dataPath = new Path(path, "data").toString
val frequentItems = sparkSession.read.parquet(dataPath)
val model = new FPGrowthModel(metadata.uid, frequentItems)
val itemSupport = if (numTrainingRecords == 0L) {
Map.empty[Any, Double]
} else {
frequentItems.rdd.flatMap {
case Row(items: Seq[_], count: Long) if items.length == 1 =>
Some(items.head -> count.toDouble / numTrainingRecords)
case _ => None
}.collectAsMap()
}
val model = new FPGrowthModel(metadata.uid, frequentItems, itemSupport, numTrainingRecords)
metadata.getAndSetParams(model)
model
}
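A plain-Scala sketch of the reconstruction the loader performs above: per-item support only needs the singleton frequent itemsets plus the persisted `numTrainingRecords` (the values below are made up):

```scala
// (items, freq) pairs as they would come out of the "data" parquet file
val freqItemsets = Seq(
  (Seq("a"), 8L),
  (Seq("b"), 6L),
  (Seq("a", "b"), 5L))
val numTrainingRecords = 10L

// Keep only single-item sets and divide their counts by the training size.
val itemSupport: Map[String, Double] = freqItemsets.collect {
  case (Seq(item), freq) => item -> freq.toDouble / numTrainingRecords
}.toMap
// Map(a -> 0.8, b -> 0.6); multi-item sets contribute nothing here.
```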
@@ -354,27 +380,30 @@ private[fpm] object AssociationRules {
* @param itemsCol column name for frequent itemsets
* @param freqCol column name for appearance count of the frequent itemsets
* @param minConfidence minimum confidence for generating the association rules
* @return a DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double])
* containing the association rules.
* @param itemSupport map containing an item and its support
* @return a DataFrame("antecedent"[Array], "consequent"[Array], "confidence"[Double],
* "lift" [Double]) containing the association rules.
*/
def getAssociationRulesFromFP[T: ClassTag](
dataset: Dataset[_],
itemsCol: String,
freqCol: String,
minConfidence: Double): DataFrame = {
minConfidence: Double,
itemSupport: scala.collection.Map[T, Double]): DataFrame = {

val freqItemSetRdd = dataset.select(itemsCol, freqCol).rdd
.map(row => new FreqItemset(row.getSeq[T](0).toArray, row.getLong(1)))
val rows = new MLlibAssociationRules()
.setMinConfidence(minConfidence)
.run(freqItemSetRdd)
.map(r => Row(r.antecedent, r.consequent, r.confidence))
.run(freqItemSetRdd, itemSupport)
.map(r => Row(r.antecedent, r.consequent, r.confidence, r.lift.orNull))

val dt = dataset.schema(itemsCol).dataType
val schema = StructType(Seq(
StructField("antecedent", dt, nullable = false),
StructField("consequent", dt, nullable = false),
StructField("confidence", DoubleType, nullable = false)))
StructField("confidence", DoubleType, nullable = false),
StructField("lift", DoubleType)))
val rules = dataset.sparkSession.createDataFrame(rows, schema)
rules
}
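Note that, unlike the other three fields, `lift` is declared without `nullable = false`: a model written by Spark 2.3 or earlier carries no item-support information, so `Rule.lift` is `None` and `orNull` leaves a null cell. A tiny sketch of that widening (values are hypothetical):

```scala
val liftOfFreshModel: Option[Double] = Some(1.25)  // support was available
val liftOfLegacyModel: Option[Double] = None       // loaded from a 2.3 model

// Option.orNull widens Option[Double] to Any: a boxed Double or null.
val cells: Seq[Any] = Seq(liftOfFreshModel.orNull, liftOfLegacyModel.orNull)
// Seq(1.25, null) -> the second rule shows NULL in the "lift" column
```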
mllib/src/main/scala/org/apache/spark/mllib/fpm/AssociationRules.scala
@@ -56,11 +56,24 @@ class AssociationRules private[fpm] (
/**
* Computes the association rules with confidence above `minConfidence`.
* @param freqItemsets frequent itemset model obtained from [[FPGrowth]]
* @return a `Set[Rule[Item]]` containing the association rules.
* @return a `RDD[Rule[Item]]` containing the association rules.
*
*/
@Since("1.5.0")
def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]]): RDD[Rule[Item]] = {
run(freqItemsets, Map.empty[Item, Double])
}

/**
* Computes the association rules with confidence above `minConfidence`.
* @param freqItemsets frequent itemset model obtained from [[FPGrowth]]
* @param itemSupport map containing an item and its support
* @return a `RDD[Rule[Item]]` containing the association rules. The rules will also be
*         able to compute the lift metric.
*/
@Since("2.4.0")
def run[Item: ClassTag](freqItemsets: RDD[FreqItemset[Item]],
itemSupport: scala.collection.Map[Item, Double]): RDD[Rule[Item]] = {
// For candidate rule X => Y, generate (X, (Y, freq(X union Y)))
val candidates = freqItemsets.flatMap { itemset =>
val items = itemset.items
@@ -76,8 +89,13 @@
// Join to get (X, ((Y, freq(X union Y)), freq(X))), generate rules, and filter by confidence
candidates.join(freqItemsets.map(x => (x.items.toSeq, x.freq)))
.map { case (antecendent, ((consequent, freqUnion), freqAntecedent)) =>
new Rule(antecendent.toArray, consequent.toArray, freqUnion, freqAntecedent)
}.filter(_.confidence >= minConfidence)
new Rule(antecendent.toArray,
consequent.toArray,
freqUnion,
freqAntecedent,
// the consequent always contains exactly one element
itemSupport.get(consequent.head))
}.filter(_.confidence >= minConfidence)
}
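A usage sketch of the new RDD-level overload (a `SparkContext` named `sc` is assumed; the itemsets and support values are invented):

```scala
import org.apache.spark.mllib.fpm.AssociationRules
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset

val freqItemsets = sc.parallelize(Seq(
  new FreqItemset(Array("a"), 8L),
  new FreqItemset(Array("a", "b"), 5L),
  new FreqItemset(Array("b"), 6L)))

// support = freq(item) / number of transactions (10 in this made-up example)
val itemSupport = Map("a" -> 0.8, "b" -> 0.6)

val rules = new AssociationRules()
  .setMinConfidence(0.6)
  .run(freqItemsets, itemSupport)  // new 2.4.0 overload: enables lift

rules.collect().foreach(println)
// e.g. {a} => {b}: (confidence: 0.625; lift: Some(1.0416...))
```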

/**
@@ -107,14 +125,21 @@ object AssociationRules {
@Since("1.5.0") val antecedent: Array[Item],
@Since("1.5.0") val consequent: Array[Item],
freqUnion: Double,
freqAntecedent: Double) extends Serializable {
freqAntecedent: Double,
Member:
Ideally these frequencies would have been Longs I think, but too late. Yes, stay consistent.

freqConsequent: Option[Double]) extends Serializable {

/**
* Returns the confidence of the rule.
*
*/
@Since("1.5.0")
def confidence: Double = freqUnion.toDouble / freqAntecedent
def confidence: Double = freqUnion / freqAntecedent

/**
* Returns the lift of the rule.
*/
@Since("2.4.0")
def lift: Option[Double] = freqConsequent.map(fCons => confidence / fCons)

require(antecedent.toSet.intersect(consequent.toSet).isEmpty, {
val sharedItems = antecedent.toSet.intersect(consequent.toSet)
@@ -142,7 +167,7 @@

override def toString: String = {
s"${antecedent.mkString("{", ",", "}")} => " +
s"${consequent.mkString("{", ",", "}")}: ${confidence}"
s"${consequent.mkString("{", ",", "}")}: (confidence: $confidence; lift: $lift)"
}
}
}
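For reference, lift compares a rule's confidence with the consequent's baseline support, so it reads as a correlation measure. A small worked example with made-up numbers:

```scala
// Hypothetical rule X => Y over 100 transactions:
// freq(X ∪ Y) = 30, freq(X) = 40, support(Y) = 0.5
val confidence = 30.0 / 40.0        // P(Y | X)        = 0.75
val lift       = confidence / 0.5   // P(Y | X) / P(Y) = 1.5
// lift > 1: X and Y co-occur more often than expected under independence;
// lift = 1: independent; lift < 1: X tends to exclude Y.
```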
25 changes: 16 additions & 9 deletions mllib/src/main/scala/org/apache/spark/mllib/fpm/FPGrowth.scala
@@ -48,17 +48,22 @@ import org.apache.spark.storage.StorageLevel
* @tparam Item item type
*/
@Since("1.3.0")
class FPGrowthModel[Item: ClassTag] @Since("1.3.0") (
@Since("1.3.0") val freqItemsets: RDD[FreqItemset[Item]])
class FPGrowthModel[Item: ClassTag] @Since("2.4.0") (
@Since("1.3.0") val freqItemsets: RDD[FreqItemset[Item]],
@Since("2.4.0") val itemSupport: Map[Item, Double])
extends Saveable with Serializable {

@Since("1.3.0")
def this(freqItemsets: RDD[FreqItemset[Item]]) = this(freqItemsets, Map.empty)

/**
* Generates association rules for the `Item`s in [[freqItemsets]].
* @param confidence minimal confidence of the rules produced
*/
@Since("1.5.0")
def generateAssociationRules(confidence: Double): RDD[AssociationRules.Rule[Item]] = {
val associationRules = new AssociationRules(confidence)
associationRules.run(freqItemsets)
associationRules.run(freqItemsets, itemSupport)
}
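The auxiliary constructor keeps the old `new FPGrowthModel(freqItemsets)` signature working: it falls back to an empty support map, and rules generated from such a model simply report `lift = None`. A sketch of both paths (the itemset RDD and support values are illustrative):

```scala
import org.apache.spark.mllib.fpm.FPGrowthModel
import org.apache.spark.mllib.fpm.FPGrowth.FreqItemset
import org.apache.spark.rdd.RDD

def showLiftAvailability(freqItemsets: RDD[FreqItemset[String]]): Unit = {
  // Old constructor: no item support, every generated rule has lift = None.
  val legacy = new FPGrowthModel(freqItemsets)
  // New constructor: per-item support enables lift.
  val withSupport = new FPGrowthModel(freqItemsets, Map("a" -> 0.8, "b" -> 0.6))

  println(legacy.generateAssociationRules(0.6).first().lift)       // None
  println(withSupport.generateAssociationRules(0.6).first().lift)  // Some(...)
}
```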

/**
@@ -213,9 +218,12 @@ class FPGrowth private[spark] (
val minCount = math.ceil(minSupport * count).toLong
val numParts = if (numPartitions > 0) numPartitions else data.partitions.length
val partitioner = new HashPartitioner(numParts)
val freqItems = genFreqItems(data, minCount, partitioner)
val freqItemsets = genFreqItemsets(data, minCount, freqItems, partitioner)
new FPGrowthModel(freqItemsets)
val freqItemsCount = genFreqItems(data, minCount, partitioner)
val freqItemsets = genFreqItemsets(data, minCount, freqItemsCount.map(_._1), partitioner)
val itemSupport = freqItemsCount.map {
case (item, cnt) => item -> cnt.toDouble / count
}.toMap
new FPGrowthModel(freqItemsets, itemSupport)
}

/**
Expand All @@ -231,12 +239,12 @@ class FPGrowth private[spark] (
* Generates frequent items by filtering the input data using minimal support level.
* @param minCount minimum count for frequent itemsets
* @param partitioner partitioner used to distribute items
* @return array of frequent pattern ordered by their frequencies
* @return array of frequent patterns and their frequencies, ordered by descending frequency
*/
private def genFreqItems[Item: ClassTag](
data: RDD[Array[Item]],
minCount: Long,
partitioner: Partitioner): Array[Item] = {
partitioner: Partitioner): Array[(Item, Long)] = {
data.flatMap { t =>
val uniq = t.toSet
if (t.length != uniq.size) {
@@ -248,7 +256,6 @@
.filter(_._2 >= minCount)
.collect()
.sortBy(-_._2)
.map(_._1)
}

/**
mllib/src/test/scala/org/apache/spark/ml/fpm/FPGrowthSuite.scala
@@ -39,9 +39,9 @@ class FPGrowthSuite extends SparkFunSuite with MLlibTestSparkContext with Defaul
val model = new FPGrowth().setMinSupport(0.5).fit(data)
val generatedRules = model.setMinConfidence(0.5).associationRules
val expectedRules = spark.createDataFrame(Seq(
(Array("2"), Array("1"), 1.0),
(Array("1"), Array("2"), 0.75)
)).toDF("antecedent", "consequent", "confidence")
(Array("2"), Array("1"), 1.0, 1.0),
(Array("1"), Array("2"), 0.75, 1.0)
)).toDF("antecedent", "consequent", "confidence", "lift")
.withColumn("antecedent", col("antecedent").cast(ArrayType(dt)))
.withColumn("consequent", col("consequent").cast(ArrayType(dt)))
assert(expectedRules.sort("antecedent").rdd.collect().sameElements(
4 changes: 4 additions & 0 deletions project/MimaExcludes.scala
@@ -36,6 +36,10 @@ object MimaExcludes {

// Exclude rules for 2.4.x
lazy val v24excludes = v23excludes ++ Seq(
// [SPARK-10697][ML] Add lift to Association rules
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.fpm.FPGrowthModel.this"),
Member:
These are for the private[ml] constructors right? OK to suppress, yes

Contributor Author:
yes, they are the private ones.

ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.fpm.AssociationRules#Rule.this"),

// [SPARK-25044] Address translation of LMF closure primitive args to Object in Scala 2.12
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.expressions.UserDefinedFunction$"),
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.apply"),
3 changes: 2 additions & 1 deletion python/pyspark/ml/fpm.py
@@ -145,10 +145,11 @@ def freqItemsets(self):
@since("2.2.0")
def associationRules(self):
"""
DataFrame with three columns:
DataFrame with four columns:
* `antecedent` - Array of the same type as the input column.
* `consequent` - Array of the same type as the input column.
* `confidence` - Confidence for the rule (`DoubleType`).
* `lift` - Lift for the rule (`DoubleType`).
"""
return self._call_java("associationRules")

4 changes: 2 additions & 2 deletions python/pyspark/ml/tests.py
@@ -2158,8 +2158,8 @@ def test_association_rules(self):
fpm = fp.fit(self.data)

expected_association_rules = self.spark.createDataFrame(
[([3], [1], 1.0), ([2], [1], 1.0)],
["antecedent", "consequent", "confidence"]
[([3], [1], 1.0, 1.0), ([2], [1], 1.0, 1.0)],
["antecedent", "consequent", "confidence", "lift"]
)
actual_association_rules = fpm.associationRules
