Skip to content

Commit 4fbf332

Browse files
ericl mengxr
authored and committed
[SPARK-9698] [ML] Add RInteraction transformer for supporting R-style feature interactions
This is a pre-req for supporting the ":" operator in the RFormula feature transformer. Design doc from umbrella task: https://docs.google.com/document/d/10NZNSEurN2EdWM31uFYsgayIPfCFHiuIu3pCWrUmP_c/edit mengxr Author: Eric Liang <[email protected]> Closes #7987 from ericl/interaction.
1 parent f1c9115 commit 4fbf332

File tree

2 files changed

+443
-0
lines changed

2 files changed

+443
-0
lines changed
Lines changed: 278 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.ml.feature
19+
20+
import scala.collection.mutable.ArrayBuilder
21+
22+
import org.apache.spark.SparkException
23+
import org.apache.spark.annotation.Experimental
24+
import org.apache.spark.ml.attribute._
25+
import org.apache.spark.ml.param._
26+
import org.apache.spark.ml.param.shared._
27+
import org.apache.spark.ml.util.Identifiable
28+
import org.apache.spark.ml.Transformer
29+
import org.apache.spark.mllib.linalg.{Vector, VectorUDT, Vectors}
30+
import org.apache.spark.sql.{DataFrame, Row}
31+
import org.apache.spark.sql.functions._
32+
import org.apache.spark.sql.types._
33+
34+
/**
 * :: Experimental ::
 * Implements the feature interaction transform. This transformer takes in Double and Vector type
 * columns (other numeric and boolean columns are cast to Double first) and outputs a flattened
 * vector of their feature interactions. To handle interaction, we first one-hot encode any
 * nominal features. Then, a vector of the feature cross-products is produced.
 *
 * For example, given the input feature values `Double(2)` and `Vector(3, 4)`, the output would be
 * `Vector(6, 8)` if all input features were numeric. If the first feature was instead nominal
 * with four categories, the output would then be `Vector(0, 0, 0, 0, 3, 4, 0, 0)`.
 */
@Experimental
class Interaction(override val uid: String) extends Transformer
  with HasInputCols with HasOutputCol {

  def this() = this(Identifiable.randomUID("interaction"))

  /** @group setParam */
  def setInputCols(values: Array[String]): this.type = set(inputCols, values)

  /** @group setParam */
  def setOutputCol(value: String): this.type = set(outputCol, value)

  // optimistic schema; does not contain any ML attributes
  override def transformSchema(schema: StructType): StructType = {
    validateParams()
    StructType(schema.fields :+ StructField($(outputCol), new VectorUDT, false))
  }

  /**
   * Appends the interaction vector of the input columns to `dataset` as the output column,
   * together with ML attribute metadata describing every output feature.
   *
   * @param dataset The input data; must contain every column named in `inputCols`.
   * @throws SparkException if an input column has an unsupported type or lacks the ML attributes
   *                        needed to size its encoding.
   */
  override def transform(dataset: DataFrame): DataFrame = {
    validateParams()
    val inputFeatures = $(inputCols).map(c => dataset.schema(c))
    val featureEncoders = getFeatureEncoders(inputFeatures)
    val featureAttrs = getFeatureAttrs(inputFeatures)

    val interactFunc = udf { row: Row =>
      // Start from the multiplicative identity: a single output at index 0 with value 1.0.
      var indices = ArrayBuilder.make[Int]
      var values = ArrayBuilder.make[Double]
      var size = 1
      indices += 0
      values += 1.0
      // Fold the columns in from last to first. Each step crosses the encoded outputs of the
      // current column with everything accumulated so far, so the last column's index varies
      // fastest in the flattened result and the first column's index is the most significant.
      var featureIndex = row.length - 1
      while (featureIndex >= 0) {
        val prevIndices = indices.result()
        val prevValues = values.result()
        val prevSize = size
        val currentEncoder = featureEncoders(featureIndex)
        indices = ArrayBuilder.make[Int]
        values = ArrayBuilder.make[Double]
        size *= currentEncoder.outputSize
        currentEncoder.foreachNonzeroOutput(row(featureIndex), (i, a) => {
          var j = 0
          while (j < prevIndices.length) {
            indices += prevIndices(j) + i * prevSize
            values += prevValues(j) * a
            j += 1
          }
        })
        featureIndex -= 1
      }
      Vectors.sparse(size, indices.result(), values.result()).compressed
    }

    val featureCols = inputFeatures.map { f =>
      f.dataType match {
        case DoubleType => dataset(f.name)
        case _: VectorUDT => dataset(f.name)
        case _: NumericType | BooleanType => dataset(f.name).cast(DoubleType)
        case _ => unsupportedType(f)
      }
    }
    dataset.select(
      col("*"),
      interactFunc(struct(featureCols: _*)).as($(outputCol), featureAttrs.toMetadata()))
  }

  /**
   * Fails with a descriptive error for an input column whose data type cannot be interacted,
   * instead of letting a non-exhaustive pattern match surface as a `MatchError`.
   */
  private def unsupportedType(field: StructField): Nothing = {
    throw new SparkException(
      s"Column ${field.name} of type ${field.dataType} is not supported for interaction.")
  }

  /**
   * Creates a feature encoder for each input column, which supports efficient iteration over
   * one-hot encoded feature values. See also the class-level comment of [[FeatureEncoder]].
   *
   * @param features The input feature columns to create encoders for.
   */
  private def getFeatureEncoders(features: Seq[StructField]): Array[FeatureEncoder] = {
    def getNumFeatures(attr: Attribute): Int = {
      attr match {
        case nominal: NominalAttribute =>
          math.max(1, nominal.getNumValues.getOrElse(
            throw new SparkException("Nominal features must have attr numValues defined.")))
        case _ =>
          1  // numeric feature
      }
    }
    features.map { f =>
      val numFeatures = f.dataType match {
        case _: NumericType | BooleanType =>
          Array(getNumFeatures(Attribute.fromStructField(f)))
        case _: VectorUDT =>
          val attrs = AttributeGroup.fromStructField(f).attributes.getOrElse(
            throw new SparkException("Vector attributes must be defined for interaction."))
          attrs.map(getNumFeatures).toArray
        case _ =>
          unsupportedType(f)
      }
      new FeatureEncoder(numFeatures)
    }.toArray
  }

  /**
   * Generates ML attributes for the output vector of all feature interactions. We make a best
   * effort to generate reasonable names for output features, based on the concatenation of the
   * interacting feature names and values delimited with `_`. When no feature name is specified,
   * we fall back to using the feature index (e.g. `foo:bar_2_0` may indicate an interaction
   * between the numeric `foo` feature and a nominal third feature from column `bar`).
   *
   * @param features The input feature columns to the Interaction transformer.
   */
  private def getFeatureAttrs(features: Seq[StructField]): AttributeGroup = {
    var featureAttrs: Seq[Attribute] = Nil
    // Walk the columns from last to first so that the attribute order matches the index layout
    // produced by the interaction UDF in `transform`.
    features.reverse.foreach { f =>
      val encodedAttrs = f.dataType match {
        case _: NumericType | BooleanType =>
          val attr = Attribute.fromStructField(f)
          encodedFeatureAttrs(Seq(attr), None)
        case _: VectorUDT =>
          val group = AttributeGroup.fromStructField(f)
          val attrs = group.attributes.getOrElse(
            throw new SparkException("Vector attributes must be defined for interaction."))
          encodedFeatureAttrs(attrs, Some(group.name))
        case _ =>
          unsupportedType(f)
      }
      if (featureAttrs.isEmpty) {
        featureAttrs = encodedAttrs
      } else {
        featureAttrs = encodedAttrs.flatMap { head =>
          featureAttrs.map { tail =>
            // Names are always defined here: encodedFeatureAttrs names every attribute it
            // returns, and the crossed attributes built below are named as well.
            NumericAttribute.defaultAttr.withName(head.name.get + ":" + tail.name.get)
          }
        }
      }
    }
    new AttributeGroup($(outputCol), featureAttrs.toArray)
  }

  /**
   * Generates the output ML attributes for a single input feature. Each output feature name has
   * up to three parts: the group name, feature name, and category name (for nominal features),
   * each separated by an underscore.
   *
   * @param inputAttrs The attributes of the input feature.
   * @param groupName Optional name of the input feature group (for Vector type features).
   */
  private def encodedFeatureAttrs(
      inputAttrs: Seq[Attribute],
      groupName: Option[String]): Seq[Attribute] = {

    def format(
        index: Int,
        attrName: Option[String],
        categoryName: Option[String]): String = {
      val parts = Seq(groupName, Some(attrName.getOrElse(index.toString)), categoryName)
      parts.flatten.mkString("_")
    }

    inputAttrs.zipWithIndex.flatMap {
      case (nominal: NominalAttribute, i) =>
        // One binary attribute per category, named after the category value when the values
        // are known and after the category index otherwise.
        val categoryNames: Seq[String] = nominal.values match {
          case Some(names) =>
            names.toSeq
          case None =>
            val numValues = nominal.getNumValues.getOrElse(
              throw new SparkException("Nominal features must have attr numValues defined."))
            Seq.tabulate(numValues)(_.toString)
        }
        categoryNames.map { category =>
          BinaryAttribute.defaultAttr.withName(
            format(i, nominal.name, Some(category))): Attribute
        }
      case (a: Attribute, i) =>
        Seq[Attribute](NumericAttribute.defaultAttr.withName(format(i, a.name, None)))
    }
  }

  override def copy(extra: ParamMap): Interaction = defaultCopy(extra)

  override def validateParams(): Unit = {
    require(get(inputCols).isDefined, "Input cols must be defined first.")
    require(get(outputCol).isDefined, "Output col must be defined first.")
    require($(inputCols).length > 0, "Input cols must have non-zero length.")
    require($(inputCols).distinct.length == $(inputCols).length, "Input cols must be distinct.")
  }
}
214+
215+
/**
 * This class performs on-the-fly one-hot encoding of features as you iterate over them. To
 * indicate which input features should be one-hot encoded, an array of the feature counts
 * must be passed in ahead of time.
 *
 * Instances are referenced from the interaction UDF closure built in `Interaction.transform`,
 * so the class is `Serializable`.
 *
 * @param numFeatures Array of feature counts for each input feature. For nominal features this
 *                    count is equal to the number of categories. For numeric features the count
 *                    should be set to 1.
 */
private[ml] class FeatureEncoder(numFeatures: Array[Int]) extends Serializable {
  assert(numFeatures.forall(_ > 0), "Features counts must all be positive.")

  /** The size of the output vector. */
  val outputSize: Int = numFeatures.sum

  /** Precomputed offsets for the location of each output feature. */
  private val outputOffsets = {
    val arr = new Array[Int](numFeatures.length)
    var i = 1
    while (i < arr.length) {
      arr(i) = arr(i - 1) + numFeatures(i - 1)
      i += 1
    }
    arr
  }

  /** Ascending positions of the input features that are one-hot encoded (count > 1). */
  private val nominalIndices: Array[Int] =
    numFeatures.indices.filter(numFeatures(_) > 1).toArray

  /**
   * Given an input row of features, invokes the specific function for every non-zero output.
   * Outputs are reported in ascending output-index order.
   *
   * @param value The row value to encode, either a Double or Vector.
   * @param f The callback to invoke on each non-zero (index, value) output pair.
   * @throws SparkException if `value` is null or is neither a Double nor a Vector.
   */
  def foreachNonzeroOutput(value: Any, f: (Int, Double) => Unit): Unit = value match {
    case d: Double =>
      assert(numFeatures.length == 1, "DoubleType columns should only contain one feature.")
      val numOutputCols = numFeatures.head
      if (numOutputCols > 1) {
        assert(
          d >= 0.0 && d == d.toInt && d < numOutputCols,
          s"Values from column must be indices, but got $d.")
        f(d.toInt, 1.0)
      } else {
        f(0, d)
      }
    case vec: Vector =>
      assert(numFeatures.length == vec.size,
        s"Vector column size was ${vec.size}, expected ${numFeatures.length}")
      // A sparse vector does not report its implicit zeros through foreachActive, but for a
      // nominal feature the value 0 is category 0 and must still yield a one-hot output.
      // `nextNominal` points at the first nominal feature not yet emitted, so skipped ones can
      // be filled in while keeping output indices ascending.
      // NOTE(review): relies on foreachActive visiting entries in ascending index order, which
      // holds for dense vectors and for sparse vectors with sorted indices -- confirm.
      var nextNominal = 0
      def emitImplicitZeros(until: Int): Unit = {
        while (nextNominal < nominalIndices.length && nominalIndices(nextNominal) < until) {
          f(outputOffsets(nominalIndices(nextNominal)), 1.0)
          nextNominal += 1
        }
      }
      vec.foreachActive { (i, v) =>
        emitImplicitZeros(i)
        val numOutputCols = numFeatures(i)
        if (numOutputCols > 1) {
          assert(
            v >= 0.0 && v == v.toInt && v < numOutputCols,
            s"Values from column must be indices, but got $v.")
          f(outputOffsets(i) + v.toInt, 1.0)
          nextNominal += 1
        } else {
          f(outputOffsets(i), v)
        }
      }
      emitImplicitZeros(vec.size)
    case null =>
      throw new SparkException("Values to interact cannot be null.")
    case o =>
      throw new SparkException(s"$o of type ${o.getClass.getName} is not supported.")
  }
}

0 commit comments

Comments
 (0)