@@ -90,7 +90,7 @@ object CrossValidatorExample {
crossval.setNumFolds(2) // Use 3+ in practice

// Run cross-validation, and choose the best set of parameters.
-val cvModel = crossval.fit(training)
+val cvModel = crossval.fit(training.toDF)

// Prepare test documents, which are unlabeled.
val test = sc.parallelize(Seq(
@@ -100,7 +100,7 @@ object CrossValidatorExample {
Document(7L, "apache hadoop")))

// Make predictions on test documents. cvModel uses the best model found (lrModel).
-cvModel.transform(test)
+cvModel.transform(test.toDF)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
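All of the example updates in this patch follow the same pattern: the ML pipeline API now takes DataFrames, so each RDD of case-class rows is converted with .toDF before fit() and transform(). A minimal sketch of that conversion, with toy Document data and assuming the Spark 1.3-era SQLContext implicits (not code from this patch):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// Case class defined at top level so schema inference by reflection can see it.
case class Document(id: Long, text: String)

object ToDFSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ToDFSketch").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    // The implicits add .toDF to RDDs of Products (case classes and tuples).
    import sqlContext.implicits._

    val training = sc.parallelize(Seq(Document(0L, "spark is fast"), Document(1L, "apache hadoop")))
    val df = training.toDF   // schema (id: bigint, text: string) inferred by reflection
    df.printSchema()
    sc.stop()
  }
}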
@@ -58,7 +58,7 @@ object DeveloperApiExample {
lr.setMaxIter(10)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
-val model = lr.fit(training)
+val model = lr.fit(training.toDF)

// Prepare test data.
val test = sc.parallelize(Seq(
@@ -67,7 +67,7 @@ object DeveloperApiExample {
LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))))

// Make predictions on test data.
-val sumPredictions: Double = model.transform(test)
+val sumPredictions: Double = model.transform(test.toDF)
.select("features", "label", "prediction")
.collect()
.map { case Row(features: Vector, label: Double, prediction: Double) =>
@@ -137,9 +137,9 @@ object MovieLensALS {
.setRegParam(params.regParam)
.setNumBlocks(params.numBlocks)

-val model = als.fit(training)
+val model = als.fit(training.toDF)

-val predictions = model.transform(test).cache()
+val predictions = model.transform(test.toDF).cache()

// Evaluate the model.
// TODO: Create an evaluator to compute RMSE.
@@ -158,7 +158,7 @@ object MovieLensALS {

// Inspect false positives.
predictions.registerTempTable("prediction")
-sc.textFile(params.movies).map(Movie.parseMovie).registerTempTable("movie")
+sc.textFile(params.movies).map(Movie.parseMovie).toDF.registerTempTable("movie")
sqlContext.sql(
"""
|SELECT userId, prediction.movieId, title, rating, prediction
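The false-positive query above works because both sides of the join are DataFrames registered as temp tables: predictions already is one, and the movies RDD now gets .toDF first. A rough sketch of that shape, with toy schemas and invented rows, assuming sc, sqlContext and import sqlContext.implicits._ as in the surrounding code:

case class Movie(movieId: Int, title: String)
case class Prediction(userId: Int, movieId: Int, rating: Double, prediction: Double)

val predictions = sc.parallelize(Seq(Prediction(1, 10, 1.0, 4.5))).toDF
val movies = sc.parallelize(Seq(Movie(10, "Toy Story"))).toDF

predictions.registerTempTable("prediction")
movies.registerTempTable("movie")

// Temp tables backed by different DataFrames can be joined with plain SQL.
sqlContext.sql(
  """
    |SELECT userId, prediction.movieId, title, rating, prediction
    |FROM prediction JOIN movie ON prediction.movieId = movie.movieId
  """.stripMargin).show()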
@@ -58,7 +58,7 @@ object SimpleParamsExample {
.setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
-val model1 = lr.fit(training)
+val model1 = lr.fit(training.toDF)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
@@ -77,7 +77,7 @@ object SimpleParamsExample {

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
-val model2 = lr.fit(training, paramMapCombined)
+val model2 = lr.fit(training.toDF, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.fittingParamMap)

// Prepare test data.
@@ -90,7 +90,7 @@ object SimpleParamsExample {
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
-model2.transform(test)
+model2.transform(test.toDF)
.select("features", "label", "myProbability", "prediction")
.collect()
.foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
@@ -69,7 +69,7 @@ object SimpleTextClassificationPipeline {
.setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
-val model = pipeline.fit(training)
+val model = pipeline.fit(training.toDF)

// Prepare test documents, which are unlabeled.
val test = sc.parallelize(Seq(
@@ -79,7 +79,7 @@ object SimpleTextClassificationPipeline {
Document(7L, "apache hadoop")))

// Make predictions on test documents.
-model.transform(test)
+model.transform(test.toDF)
.select("id", "text", "probability", "prediction")
.collect()
.foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
@@ -81,18 +81,18 @@ object DatasetExample {
println(s"Loaded ${origData.count()} instances from file: ${params.input}")

// Convert input data to DataFrame explicitly.
-val df: DataFrame = origData.toDataFrame
+val df: DataFrame = origData.toDF
println(s"Inferred schema:\n${df.schema.prettyJson}")
println(s"Converted to DataFrame with ${df.count()} records")

-// Select columns, using implicit conversion to DataFrames.
-val labelsDf: DataFrame = origData.select("label")
+// Select columns
+val labelsDf: DataFrame = df.select("label")
val labels: RDD[Double] = labelsDf.map { case Row(v: Double) => v }
val numLabels = labels.count()
val meanLabel = labels.fold(0.0)(_ + _) / numLabels
println(s"Selected label column with average value $meanLabel")

-val featuresDf: DataFrame = origData.select("features")
+val featuresDf: DataFrame = df.select("features")
val features: RDD[Vector] = featuresDf.map { case Row(v: Vector) => v }
val featureSummary = features.aggregate(new MultivariateOnlineSummarizer())(
(summary, feat) => summary.add(feat),
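Going the other way, from a DataFrame column back to a typed RDD, is done with a Row pattern match as in the lines above. A minimal sketch, assuming a DataFrame df with a Double-typed label column as in this example:

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// DataFrame.map goes through Row, so the match recovers the underlying Double.
val labels: RDD[Double] = df.select("label").map { case Row(v: Double) => v }
val meanLabel = labels.fold(0.0)(_ + _) / labels.count()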
@@ -19,7 +19,7 @@ package org.apache.spark.examples.sql

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._

// One method for defining the schema of an RDD is to make a case class with the desired column
// names and types.
@@ -34,10 +34,10 @@ object RDDRelation {
// Importing the SQL context gives access to all the SQL functions and implicit conversions.
import sqlContext.implicits._

-val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
+val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF
// Any RDD containing case classes can be registered as a table. The schema of the table is
// automatically inferred using scala reflection.
rdd.registerTempTable("records")
df.registerTempTable("records")

// Once tables have been registered, you can run SQL queries over them.
println("Result of SELECT *:")
@@ -55,10 +55,10 @@ object RDDRelation {
rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println)

// Queries can also be written using a LINQ-like Scala DSL.
rdd.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)
df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)

// Write out an RDD as a parquet file.
-rdd.saveAsParquetFile("pair.parquet")
+df.saveAsParquetFile("pair.parquet")

// Read in parquet file. Parquet files are self-describing so the schema is preserved.
val parquetFile = sqlContext.parquetFile("pair.parquet")
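With df now a DataFrame rather than a plain RDD, the temp-table registration, the $-based DSL query, and the Parquet write in this example are all DataFrame operations. A compact sketch of the same flow, assuming sc, sqlContext and the example's Record(key, value) case class (toy data, not code from this patch):

import sqlContext.implicits._

val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF
df.registerTempTable("records")

// SQL and the Scala DSL run against the same DataFrame.
sqlContext.sql("SELECT key, value FROM records WHERE key < 10").collect().foreach(println)
df.where($"key" === 1).select($"value").collect().foreach(println)

// DataFrames read and write Parquet directly; the schema travels with the file.
df.saveAsParquetFile("pair.parquet")
val parquetFile = sqlContext.parquetFile("pair.parquet")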
@@ -68,7 +68,7 @@ object HiveFromSpark {

// You can also register RDDs as temporary tables within a HiveContext.
val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
rdd.registerTempTable("records")
rdd.toDF.registerTempTable("records")

// Queries can then join RDD data with data stored in Hive.
println("Result of SELECT *:")
mllib/src/main/scala/org/apache/spark/ml/Transformer.scala (6 changes: 3 additions & 3 deletions)
@@ -23,7 +23,7 @@ import org.apache.spark.Logging
import org.apache.spark.annotation.AlphaComponent
import org.apache.spark.ml.param._
import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

/**
@@ -97,7 +97,7 @@ private[ml] abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, O
override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
transformSchema(dataset.schema, paramMap, logging = true)
val map = this.paramMap ++ paramMap
dataset.select($"*", callUDF(
this.createTransformFunc(map), outputDataType, dataset(map(inputCol))).as(map(outputCol)))
dataset.withColumn(map(outputCol),
callUDF(this.createTransformFunc(map), outputDataType, dataset(map(inputCol))))
Inline review comment (Contributor): minor: use udf

}
}
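The inline review note above ("minor: use udf") refers to the typed udf helper that the LogisticRegression changes below already adopt; the callUDF(function, dataType, column) form is kept in UnaryTransformer and ClassificationModel, presumably because their element types are generic and udf needs concrete types for schema inference. As a rough illustration only, with a toy string column rather than anything from this patch, the two styles compare like this:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions.{callUDF, col, udf}
import org.apache.spark.sql.types.DoubleType

object UdfStyles {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("UdfStyles").setMaster("local"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val dataset = sc.parallelize(Seq(("a", "spark"), ("b", "hadoop"))).toDF("id", "text")

    // Old pattern: select every column plus an aliased callUDF expression.
    val strLen: String => Double = (s: String) => s.length.toDouble
    val oldStyle = dataset.select($"*", callUDF(strLen, DoubleType, col("text")).as("len"))

    // Pattern this patch moves to: withColumn, here with a typed udf.
    val lenUDF = udf { (s: String) => s.length.toDouble }
    val newStyle = dataset.withColumn("len", lenUDF(col("text")))

    oldStyle.show()
    newStyle.show()
    sc.stop()
  }
}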
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{DeveloperApi, AlphaComponent}
import org.apache.spark.ml.impl.estimator.{PredictionModel, Predictor, PredictorParams}
import org.apache.spark.ml.param.{Params, ParamMap, HasRawPredictionCol}
import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.{DataType, DoubleType, StructType}

@@ -180,24 +180,22 @@ private[ml] object ClassificationModel {
if (map(model.rawPredictionCol) != "") {
// output raw prediction
val features2raw: FeaturesType => Vector = model.predictRaw
tmpData = tmpData.select($"*",
callUDF(features2raw, new VectorUDT,
col(map(model.featuresCol))).as(map(model.rawPredictionCol)))
tmpData = tmpData.withColumn(map(model.rawPredictionCol),
callUDF(features2raw, new VectorUDT, col(map(model.featuresCol))))
numColsOutput += 1
if (map(model.predictionCol) != "") {
val raw2pred: Vector => Double = (rawPred) => {
rawPred.toArray.zipWithIndex.maxBy(_._1)._2
}
tmpData = tmpData.select($"*", callUDF(raw2pred, DoubleType,
col(map(model.rawPredictionCol))).as(map(model.predictionCol)))
tmpData = tmpData.withColumn(map(model.predictionCol),
callUDF(raw2pred, DoubleType, col(map(model.rawPredictionCol))))
numColsOutput += 1
}
} else if (map(model.predictionCol) != "") {
// output prediction
val features2pred: FeaturesType => Double = model.predict
tmpData = tmpData.select($"*",
callUDF(features2pred, DoubleType,
col(map(model.featuresCol))).as(map(model.predictionCol)))
tmpData = tmpData.withColumn(map(model.predictionCol),
callUDF(features2pred, DoubleType, col(map(model.featuresCol))))
numColsOutput += 1
}
(numColsOutput, tmpData)
@@ -22,7 +22,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
import org.apache.spark.mllib.linalg.{VectorUDT, BLAS, Vector, Vectors}
import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.DoubleType
import org.apache.spark.storage.StorageLevel

@@ -124,44 +124,39 @@ class LogisticRegressionModel private[ml] (
var numColsOutput = 0
if (map(rawPredictionCol) != "") {
val features2raw: Vector => Vector = (features) => predictRaw(features)
tmpData = tmpData.select($"*",
callUDF(features2raw, new VectorUDT, col(map(featuresCol))).as(map(rawPredictionCol)))
tmpData = tmpData.withColumn(map(rawPredictionCol),
callUDF(features2raw, new VectorUDT, col(map(featuresCol))))
numColsOutput += 1
}
if (map(probabilityCol) != "") {
if (map(rawPredictionCol) != "") {
-val raw2prob: Vector => Vector = { (rawPreds: Vector) =>
+val raw2prob = udf { (rawPreds: Vector) =>
val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1)))
-Vectors.dense(1.0 - prob1, prob1)
+Vectors.dense(1.0 - prob1, prob1): Vector
}
-tmpData = tmpData.select($"*",
-callUDF(raw2prob, new VectorUDT, col(map(rawPredictionCol))).as(map(probabilityCol)))
+tmpData = tmpData.withColumn(map(probabilityCol), raw2prob(col(map(rawPredictionCol))))
} else {
-val features2prob: Vector => Vector = (features: Vector) => predictProbabilities(features)
-tmpData = tmpData.select($"*",
-callUDF(features2prob, new VectorUDT, col(map(featuresCol))).as(map(probabilityCol)))
+val features2prob = udf { (features: Vector) => predictProbabilities(features) : Vector }
+tmpData = tmpData.withColumn(map(probabilityCol), features2prob(col(map(featuresCol))))
}
numColsOutput += 1
}
if (map(predictionCol) != "") {
val t = map(threshold)
if (map(probabilityCol) != "") {
-val predict: Vector => Double = { probs: Vector =>
+val predict = udf { probs: Vector =>
if (probs(1) > t) 1.0 else 0.0
}
-tmpData = tmpData.select($"*",
-callUDF(predict, DoubleType, col(map(probabilityCol))).as(map(predictionCol)))
+tmpData = tmpData.withColumn(map(predictionCol), predict(col(map(probabilityCol))))
} else if (map(rawPredictionCol) != "") {
-val predict: Vector => Double = { rawPreds: Vector =>
+val predict = udf { rawPreds: Vector =>
val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1)))
if (prob1 > t) 1.0 else 0.0
}
-tmpData = tmpData.select($"*",
-callUDF(predict, DoubleType, col(map(rawPredictionCol))).as(map(predictionCol)))
+tmpData = tmpData.withColumn(map(predictionCol), predict(col(map(rawPredictionCol))))
} else {
-val predict: Vector => Double = (features: Vector) => this.predict(features)
-tmpData = tmpData.select($"*",
-callUDF(predict, DoubleType, col(map(featuresCol))).as(map(predictionCol)))
+val predict = udf { features: Vector => this.predict(features) }
+tmpData = tmpData.withColumn(map(predictionCol), predict(col(map(featuresCol))))
}
numColsOutput += 1
}
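One detail in the udf conversions above is worth calling out: every Vector-returning closure is ascribed to Vector, for example Vectors.dense(1.0 - prob1, prob1): Vector. udf infers the SQL type from the closure's Scala return type by reflection, and it is the Vector trait that carries the VectorUDT user-defined type, so the ascription keeps the inferred type from narrowing to DenseVector. That reading of the reflection rules is an assumption, but it matches how the patch writes each of these udfs:

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.sql.functions.udf

val raw2prob = udf { (rawPreds: Vector) =>
  val prob1 = 1.0 / (1.0 + math.exp(-rawPreds(1)))
  // Ascribe to the UDT-annotated Vector trait, not the concrete DenseVector.
  Vectors.dense(1.0 - prob1, prob1): Vector
}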
@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{AlphaComponent, DeveloperApi}
import org.apache.spark.ml.param.{HasProbabilityCol, ParamMap, Params}
import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, StructType}


@@ -120,8 +120,8 @@ private[spark] abstract class ProbabilisticClassificationModel[
val features2probs: FeaturesType => Vector = (features) => {
tmpModel.predictProbabilities(features)
}
outputData.select($"*",
callUDF(features2probs, new VectorUDT, col(map(featuresCol))).as(map(probabilityCol)))
outputData.withColumn(map(probabilityCol),
callUDF(features2probs, new VectorUDT, col(map(featuresCol))))
} else {
if (numColsOutput == 0) {
this.logWarning(s"$uid: ProbabilisticClassificationModel.transform() was called as NOOP" +
@@ -23,7 +23,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.mllib.feature
import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
import org.apache.spark.sql._
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}

/**
@@ -82,7 +82,7 @@ class StandardScalerModel private[ml] (
transformSchema(dataset.schema, paramMap, logging = true)
val map = this.paramMap ++ paramMap
val scale = udf((v: Vector) => { scaler.transform(v) } : Vector)
dataset.select($"*", scale(col(map(inputCol))).as(map(outputCol)))
dataset.withColumn(map(outputCol), scale(col(map(inputCol))))
}

private[ml] override def transformSchema(schema: StructType, paramMap: ParamMap): StructType = {
@@ -24,7 +24,7 @@ import org.apache.spark.mllib.linalg.{VectorUDT, Vector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row}
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DataType, DoubleType, StructType}


@@ -209,7 +209,7 @@ private[spark] abstract class PredictionModel[FeaturesType, M <: PredictionModel
val pred: FeaturesType => Double = (features) => {
tmpModel.predict(features)
}
dataset.select($"*", callUDF(pred, DoubleType, col(map(featuresCol))).as(map(predictionCol)))
dataset.withColumn(map(predictionCol), callUDF(pred, DoubleType, col(map(featuresCol))))
} else {
this.logWarning(s"$uid: Predictor.transform() was called as NOOP" +
" since no output columns were set.")
@@ -36,7 +36,7 @@ import org.apache.spark.ml.param._
import org.apache.spark.mllib.optimization.NNLS
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, FloatType, IntegerType, StructField, StructType}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.Utils
@@ -124,8 +124,8 @@ class ALSModel private[ml] (
override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
import dataset.sqlContext.implicits._
val map = this.paramMap ++ paramMap
-val users = userFactors.toDataFrame("id", "features")
-val items = itemFactors.toDataFrame("id", "features")
+val users = userFactors.toDF("id", "features")
+val items = itemFactors.toDF("id", "features")

// Register a UDF for DataFrame, and then
// create a new column named map(predictionCol) by running the predict UDF.
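toDF("id", "features") converts and names the columns in one step, so the factor RDDs come out with the column names the prediction UDF join expects. A tiny sketch with toy factor values, assuming sc and import sqlContext.implicits._ as in the surrounding method:

// The factors are RDDs of (id, factor array) pairs; the explicit names replace
// the default tuple column names _1 and _2.
val userFactors = sc.parallelize(Seq((1, Array(0.1f, 0.2f)), (2, Array(0.3f, 0.4f))))
val users = userFactors.toDF("id", "features")
users.printSchema()   // two columns: id and features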
@@ -102,7 +102,7 @@ object NaiveBayesModel extends Loader[NaiveBayesModel] {
sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))

// Create Parquet data.
-val dataRDD: DataFrame = sc.parallelize(Seq(data), 1)
+val dataRDD: DataFrame = sc.parallelize(Seq(data), 1).toDF
dataRDD.saveAsParquetFile(dataPath(path))
}

@@ -62,7 +62,7 @@ private[classification] object GLMClassificationModel {

// Create Parquet data.
val data = Data(weights, intercept, threshold)
-sc.parallelize(Seq(data), 1).saveAsParquetFile(Loader.dataPath(path))
+sc.parallelize(Seq(data), 1).toDF.saveAsParquetFile(Loader.dataPath(path))
}

/**
@@ -187,8 +187,8 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
val metadata = compact(render(
("class" -> thisClassName) ~ ("version" -> thisFormatVersion) ~ ("rank" -> model.rank)))
sc.parallelize(Seq(metadata), 1).saveAsTextFile(metadataPath(path))
-model.userFeatures.toDataFrame("id", "features").saveAsParquetFile(userPath(path))
-model.productFeatures.toDataFrame("id", "features").saveAsParquetFile(productPath(path))
+model.userFeatures.toDF("id", "features").saveAsParquetFile(userPath(path))
+model.productFeatures.toDF("id", "features").saveAsParquetFile(productPath(path))
}

def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
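The mllib persistence changes all use the same idiom: wrap the model data in a case class, turn a small RDD of it into a DataFrame with .toDF, and write Parquet so the schema (including the Vector UDT) is stored alongside the data. A hedged sketch of the save path for the GLM case, where the field names follow the diff above but the exact types and surrounding values (sc, sqlContext, weights, intercept, threshold, path) are assumed from context:

import org.apache.spark.mllib.linalg.Vector
import sqlContext.implicits._

case class Data(weights: Vector, intercept: Double, threshold: Option[Double])

val data = Data(weights, intercept, threshold)
// One partition is enough for a single row; Parquet keeps the schema for load().
sc.parallelize(Seq(data), 1).toDF.saveAsParquetFile(Loader.dataPath(path))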