
Commit 0b10662

BenFradet authored and mengxr committed
[SPARK-8575] [SQL] Deprecate callUDF in favor of udf
Follow-up to [SPARK-8356](https://issues.apache.org/jira/browse/SPARK-8356) and apache#6902:

- Removes the unit test for the now-deprecated `callUdf`.
- The unit test in SQLQuerySuite now uses `udf` instead of `callUDF`.
- Replaced `callUDF` with `udf` where possible in mllib.

Author: BenFradet <[email protected]>

Closes apache#6993 from BenFradet/SPARK-8575 and squashes the following commits:

26f5a7a [BenFradet] 2 spaces instead of 1
1ddb452 [BenFradet] renamed initUDF in order to be consistent in OneVsRest
48ca15e [BenFradet] used vector type tag for udf call in VectorIndexer
0ebd0da [BenFradet] replace the now deprecated callUDF by udf in VectorIndexer
8013409 [BenFradet] replaced the now deprecated callUDF by udf in Predictor
94345b5 [BenFradet] unifomized udf calls in ProbabilisticClassifier
1305492 [BenFradet] uniformized udf calls in Classifier
a672228 [BenFradet] uniformized udf calls in OneVsRest
49e4904 [BenFradet] Revert "removal of the unit test for the now deprecated callUdf"
bbdeaf3 [BenFradet] fixed syntax for init udf in OneVsRest
fe2a10b [BenFradet] callUDF => udf in ProbabilisticClassifier
0ea30b3 [BenFradet] callUDF => udf in Classifier where possible
197ec82 [BenFradet] callUDF => udf in OneVsRest
84d6780 [BenFradet] modified unit test in SQLQuerySuite to use udf instead of callUDF
477709f [BenFradet] removal of the unit test for the now deprecated callUdf
1 parent dfde31d commit 0b10662

File tree: 6 files changed (+46, -35 lines)
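All six files follow the same pattern: replace `callUDF(f, returnType, cols...)` with `udf(f)` applied to columns. A minimal before/after sketch (not taken from this commit; it assumes a spark-shell style `sqlContext`, and the `square` function and column names are illustrative):

```scala
import org.apache.spark.sql.functions.{callUDF, col, udf}
import org.apache.spark.sql.types.DoubleType
import sqlContext.implicits._

val df = Seq(1.0, 2.0, 3.0).toDF("x")
val square = (x: Double) => x * x

// Deprecated style: the SQL return type is spelled out by hand and the
// columns are passed in the same call.
val before = df.withColumn("sq", callUDF(square, DoubleType, col("x")))

// Preferred style: udf infers the SQL types from Scala TypeTags and yields
// a reusable function that is applied to columns like any other.
val squareUDF = udf(square)
val after = df.withColumn("sq", squareUDF(col("x")))
```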

mllib/src/main/scala/org/apache/spark/ml/Predictor.scala

Lines changed: 5 additions & 4 deletions
@@ -122,9 +122,7 @@ abstract class Predictor[
    */
   protected def extractLabeledPoints(dataset: DataFrame): RDD[LabeledPoint] = {
     dataset.select($(labelCol), $(featuresCol))
-      .map { case Row(label: Double, features: Vector) =>
-        LabeledPoint(label, features)
-      }
+      .map { case Row(label: Double, features: Vector) => LabeledPoint(label, features) }
   }
 }
@@ -171,7 +169,10 @@ abstract class PredictionModel[FeaturesType, M <: PredictionModel[FeaturesType,
   override def transform(dataset: DataFrame): DataFrame = {
     transformSchema(dataset.schema, logging = true)
     if ($(predictionCol).nonEmpty) {
-      dataset.withColumn($(predictionCol), callUDF(predict _, DoubleType, col($(featuresCol))))
+      val predictUDF = udf { (features: Any) =>
+        predict(features.asInstanceOf[FeaturesType])
+      }
+      dataset.withColumn($(predictionCol), predictUDF(col($(featuresCol))))
     } else {
       this.logWarning(s"$uid: Predictor.transform() was called as NOOP" +
         " since no output columns were set.")

mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala

Lines changed: 9 additions & 4 deletions
@@ -102,15 +102,20 @@ abstract class ClassificationModel[FeaturesType, M <: ClassificationModel[Featur
     var outputData = dataset
     var numColsOutput = 0
     if (getRawPredictionCol != "") {
-      outputData = outputData.withColumn(getRawPredictionCol,
-        callUDF(predictRaw _, new VectorUDT, col(getFeaturesCol)))
+      val predictRawUDF = udf { (features: Any) =>
+        predictRaw(features.asInstanceOf[FeaturesType])
+      }
+      outputData = outputData.withColumn(getRawPredictionCol, predictRawUDF(col(getFeaturesCol)))
       numColsOutput += 1
     }
     if (getPredictionCol != "") {
       val predUDF = if (getRawPredictionCol != "") {
-        callUDF(raw2prediction _, DoubleType, col(getRawPredictionCol))
+        udf(raw2prediction _).apply(col(getRawPredictionCol))
       } else {
-        callUDF(predict _, DoubleType, col(getFeaturesCol))
+        val predictUDF = udf { (features: Any) =>
+          predict(features.asInstanceOf[FeaturesType])
+        }
+        predictUDF(col(getFeaturesCol))
       }
       outputData = outputData.withColumn(getPredictionCol, predUDF)
       numColsOutput += 1
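Where the method already has concrete types, as with `raw2prediction: Vector => Double`, the patch wraps the eta-expanded method reference directly: `udf(raw2prediction _)` can infer both the input and return SQL types, so neither `DoubleType` nor an `Any` cast is needed, and `.apply(col(...))` simply invokes the resulting function. A hedged sketch with stand-in names:

```scala
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.functions.{col, udf}

// Stand-in for raw2prediction: index of the largest raw margin.
def raw2prediction(raw: Vector): Double =
  raw.toArray.zipWithIndex.maxBy(_._1)._2.toDouble

// Concrete signature => both SQL types are inferred from TypeTags.
val predCol = udf(raw2prediction _).apply(col("rawPrediction"))
```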

mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala

Lines changed: 12 additions & 15 deletions
@@ -88,9 +88,9 @@ final class OneVsRestModel private[ml] (
 
     // add an accumulator column to store predictions of all the models
     val accColName = "mbc$acc" + UUID.randomUUID().toString
-    val init: () => Map[Int, Double] = () => {Map()}
+    val initUDF = udf { () => Map[Int, Double]() }
     val mapType = MapType(IntegerType, DoubleType, valueContainsNull = false)
-    val newDataset = dataset.withColumn(accColName, callUDF(init, mapType))
+    val newDataset = dataset.withColumn(accColName, initUDF())
 
     // persist if underlying dataset is not persistent.
     val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
@@ -106,13 +106,12 @@ final class OneVsRestModel private[ml] (
 
       // add temporary column to store intermediate scores and update
       val tmpColName = "mbc$tmp" + UUID.randomUUID().toString
-      val update: (Map[Int, Double], Vector) => Map[Int, Double] =
-        (predictions: Map[Int, Double], prediction: Vector) => {
-          predictions + ((index, prediction(1)))
-        }
-      val updateUdf = callUDF(update, mapType, col(accColName), col(rawPredictionCol))
+      val updateUDF = udf { (predictions: Map[Int, Double], prediction: Vector) =>
+        predictions + ((index, prediction(1)))
+      }
       val transformedDataset = model.transform(df).select(columns : _*)
-      val updatedDataset = transformedDataset.withColumn(tmpColName, updateUdf)
+      val updatedDataset = transformedDataset
+        .withColumn(tmpColName, updateUDF(col(accColName), col(rawPredictionCol)))
       val newColumns = origCols ++ List(col(tmpColName))
 
       // switch out the intermediate column with the accumulator column
@@ -124,13 +123,13 @@ final class OneVsRestModel private[ml] (
     }
 
     // output the index of the classifier with highest confidence as prediction
-    val label: Map[Int, Double] => Double = (predictions: Map[Int, Double]) => {
+    val labelUDF = udf { (predictions: Map[Int, Double]) =>
       predictions.maxBy(_._2)._1.toDouble
     }
 
     // output label and label metadata as prediction
-    val labelUdf = callUDF(label, DoubleType, col(accColName))
-    aggregatedDataset.withColumn($(predictionCol), labelUdf.as($(predictionCol), labelMetadata))
+    aggregatedDataset
+      .withColumn($(predictionCol), labelUDF(col(accColName)).as($(predictionCol), labelMetadata))
       .drop(accColName)
   }
@@ -185,17 +184,15 @@ final class OneVsRest(override val uid: String)
 
     // create k columns, one for each binary classifier.
     val models = Range(0, numClasses).par.map { index =>
-
-      val label: Double => Double = (label: Double) => {
+      val labelUDF = udf { (label: Double) =>
         if (label.toInt == index) 1.0 else 0.0
       }
 
       // generate new label metadata for the binary problem.
       // TODO: use when ... otherwise after SPARK-7321 is merged
-      val labelUDF = callUDF(label, DoubleType, col($(labelCol)))
       val newLabelMeta = BinaryAttribute.defaultAttr.withName("label").toMetadata()
       val labelColName = "mc2b$" + index
-      val labelUDFWithNewMeta = labelUDF.as(labelColName, newLabelMeta)
+      val labelUDFWithNewMeta = labelUDF(col($(labelCol))).as(labelColName, newLabelMeta)
       val trainingDataset = multiclassLabeled.withColumn(labelColName, labelUDFWithNewMeta)
       val classifier = getClassifier
       classifier.fit(trainingDataset, classifier.labelCol -> labelColName)
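This file also shows the zero-argument form: the accumulator column is now initialized by `initUDF()`, whose `Map[Int, Double]` schema is inferred from the closure's TypeTags rather than passed as the explicit `mapType` argument that `callUDF` required. A hedged sketch of that form:

```scala
import org.apache.spark.sql.functions.udf

// Zero-argument udf: produces a fresh empty map per row; the MapType
// schema comes from the closure's TypeTags, not an explicit DataType.
val initUDF = udf { () => Map[Int, Double]() }

// Illustrative usage, assuming some DataFrame `dataset` in scope:
// val newDataset = dataset.withColumn("mbc$acc", initUDF())
```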

mllib/src/main/scala/org/apache/spark/ml/classification/ProbabilisticClassifier.scala

Lines changed: 15 additions & 7 deletions
@@ -98,26 +98,34 @@ private[spark] abstract class ProbabilisticClassificationModel[
     var outputData = dataset
     var numColsOutput = 0
     if ($(rawPredictionCol).nonEmpty) {
-      outputData = outputData.withColumn(getRawPredictionCol,
-        callUDF(predictRaw _, new VectorUDT, col(getFeaturesCol)))
+      val predictRawUDF = udf { (features: Any) =>
+        predictRaw(features.asInstanceOf[FeaturesType])
+      }
+      outputData = outputData.withColumn(getRawPredictionCol, predictRawUDF(col(getFeaturesCol)))
       numColsOutput += 1
     }
     if ($(probabilityCol).nonEmpty) {
       val probUDF = if ($(rawPredictionCol).nonEmpty) {
-        callUDF(raw2probability _, new VectorUDT, col($(rawPredictionCol)))
+        udf(raw2probability _).apply(col($(rawPredictionCol)))
       } else {
-        callUDF(predictProbability _, new VectorUDT, col($(featuresCol)))
+        val probabilityUDF = udf { (features: Any) =>
+          predictProbability(features.asInstanceOf[FeaturesType])
+        }
+        probabilityUDF(col($(featuresCol)))
       }
       outputData = outputData.withColumn($(probabilityCol), probUDF)
       numColsOutput += 1
     }
     if ($(predictionCol).nonEmpty) {
       val predUDF = if ($(rawPredictionCol).nonEmpty) {
-        callUDF(raw2prediction _, DoubleType, col($(rawPredictionCol)))
+        udf(raw2prediction _).apply(col($(rawPredictionCol)))
       } else if ($(probabilityCol).nonEmpty) {
-        callUDF(probability2prediction _, DoubleType, col($(probabilityCol)))
+        udf(probability2prediction _).apply(col($(probabilityCol)))
       } else {
-        callUDF(predict _, DoubleType, col($(featuresCol)))
+        val predictUDF = udf { (features: Any) =>
+          predict(features.asInstanceOf[FeaturesType])
+        }
+        predictUDF(col($(featuresCol)))
       }
       outputData = outputData.withColumn($(predictionCol), predUDF)
       numColsOutput += 1
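The prediction column keeps its three-way fallback: reuse the raw-prediction column if present, else the probability column, else compute from features. Only the last branch needs the `Any` cast, since `raw2prediction` and `probability2prediction` are concrete `Vector => Double` methods. A hedged sketch of the cascade's shape (stand-in names, not the Spark ML internals):

```scala
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, udf}

// Stand-ins for the model's conversion methods.
def raw2prediction(raw: Vector): Double = raw.toArray.indexOf(raw.toArray.max).toDouble
def probability2prediction(prob: Vector): Double = raw2prediction(prob)
def predict(features: Vector): Double = 0.0

// Prefer whichever upstream column has already been computed.
def predCol(hasRaw: Boolean, hasProb: Boolean): Column =
  if (hasRaw) udf(raw2prediction _).apply(col("rawPrediction"))
  else if (hasProb) udf(probability2prediction _).apply(col("probability"))
  else udf { (features: Any) => predict(features.asInstanceOf[Vector]) }.apply(col("features"))
```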

mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala

Lines changed: 3 additions & 2 deletions
@@ -30,7 +30,7 @@ import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util.{Identifiable, SchemaUtils}
 import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector, VectorUDT}
 import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.functions.callUDF
+import org.apache.spark.sql.functions.udf
 import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.util.collection.OpenHashSet
@@ -339,7 +339,8 @@ class VectorIndexerModel private[ml] (
   override def transform(dataset: DataFrame): DataFrame = {
     transformSchema(dataset.schema, logging = true)
     val newField = prepOutputField(dataset.schema)
-    val newCol = callUDF(transformFunc, new VectorUDT, dataset($(inputCol)))
+    val transformUDF = udf { (vector: Vector) => transformFunc(vector) }
+    val newCol = transformUDF(dataset($(inputCol)))
     dataset.withColumn($(outputCol), newCol.as($(outputCol), newField.metadata))
   }
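Here the closure gives `udf` an explicit `Vector` parameter (the "used vector type tag for udf call in VectorIndexer" commit above), so the `VectorUDT` return type no longer has to be constructed by hand. A hedged sketch, with `transformFunc` stubbed as identity:

```scala
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.sql.functions.udf

// Stand-in for VectorIndexerModel.transformFunc.
val transformFunc: Vector => Vector = identity

// The explicit Vector parameter supplies the TypeTag (resolved to
// VectorUDT) that callUDF's `new VectorUDT` argument used to provide.
val transformUDF = udf { (vector: Vector) => transformFunc(vector) }
```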

sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala

Lines changed: 2 additions & 3 deletions
@@ -137,13 +137,12 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll with SQLTestUtils {
 
   test("SPARK-7158 collect and take return different results") {
     import java.util.UUID
-    import org.apache.spark.sql.types._
 
     val df = Seq(Tuple1(1), Tuple1(2), Tuple1(3)).toDF("index")
     // we except the id is materialized once
-    def id: () => String = () => { UUID.randomUUID().toString() }
+    val idUdf = udf(() => UUID.randomUUID().toString)
 
-    val dfWithId = df.withColumn("id", callUDF(id, StringType))
+    val dfWithId = df.withColumn("id", idUdf())
     // Make a new DataFrame (actually the same reference to the old one)
     val cached = dfWithId.cache()
     // Trigger the cache
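The rewritten test uses the same zero-argument pattern: `udf(() => UUID.randomUUID().toString)` infers `StringType` from the closure, which is why the `import org.apache.spark.sql.types._` line becomes dead and is dropped. A hedged sketch of the test's intent, assuming a DataFrame `df` as in the suite:

```scala
import java.util.UUID
import org.apache.spark.sql.functions.udf

val idUdf = udf(() => UUID.randomUUID().toString)

// Once the cached plan materializes, the nondeterministic ids are fixed,
// so collect() and take() should afterwards agree on them.
val dfWithId = df.withColumn("id", idUdf()).cache()
dfWithId.count() // trigger the cache
```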
