
Commit ba91bf5

[SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames
- The old implicit would convert RDDs directly to DataFrames, and that added too many methods.
- toDataFrame -> toDF
- Dsl -> functions
- implicits moved into SQLContext.implicits
- addColumn -> withColumn
- renameColumn -> withColumnRenamed

Python changes:
- toDataFrame -> toDF
- Dsl -> functions package
- addColumn -> withColumn
- renameColumn -> withColumnRenamed
- add toDF functions to RDD on SQLContext init
- add flatMap to DataFrame

Author: Reynold Xin <[email protected]>
Author: Davies Liu <[email protected]>

Closes #4556 from rxin/SPARK-5752 and squashes the following commits:

5ef9910 [Reynold Xin] More fix
61d3fca [Reynold Xin] Merge branch 'df5' of github.com:davies/spark into SPARK-5752
ff5832c [Reynold Xin] Fix python
749c675 [Reynold Xin] count(*) fixes.
5806df0 [Reynold Xin] Fix build break again.
d941f3d [Reynold Xin] Fixed explode compilation break.
fe1267a [Davies Liu] flatMap
c4afb8e [Reynold Xin] style
d9de47f [Davies Liu] add comment
b783994 [Davies Liu] add comment for toDF
e2154e5 [Davies Liu] schema() -> schema
3a1004f [Davies Liu] Dsl -> functions, toDF()
fb256af [Reynold Xin] toDataFrame -> toDF; Dsl -> functions; implicits moved into SQLContext.implicits; addColumn -> withColumn; renameColumn -> withColumnRenamed
0dd74eb [Reynold Xin] [SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames
97dd47c [Davies Liu] fix mistake
6168f74 [Davies Liu] fix test
1fc0199 [Davies Liu] fix test
a075cd5 [Davies Liu] clean up, toPandas
663d314 [Davies Liu] add test for agg('*')
9e214d5 [Reynold Xin] count(*) fixes.
1ed7136 [Reynold Xin] Fix build break again.
921b2e3 [Reynold Xin] Fixed explode compilation break.
14698d4 [Davies Liu] flatMap
ba3e12d [Reynold Xin] style
d08c92d [Davies Liu] add comment
5c8b524 [Davies Liu] add comment for toDF
a4e5e66 [Davies Liu] schema() -> schema
d377fc9 [Davies Liu] Dsl -> functions, toDF()
6b3086c [Reynold Xin] toDataFrame -> toDF; Dsl -> functions; implicits moved into SQLContext.implicits; addColumn -> withColumn; renameColumn -> withColumnRenamed
807e8b1 [Reynold Xin] [SPARK-5752][SQL] Don't implicitly convert RDDs directly to DataFrames

(cherry picked from commit e98dfe6)
Signed-off-by: Reynold Xin <[email protected]>
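For orientation, the Scala diffs below all follow the same pattern. Here is a minimal, self-contained sketch of the renamed API (not part of the commit; the Record case class, column names, and data are invented for illustration), assuming a Spark 1.3-era SQLContext:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.functions._   // was: import org.apache.spark.sql.Dsl._

// Case classes used for schema inference must be defined outside the method that uses them.
case class Record(key: Int, value: String)

object ToDFSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ToDFSketch"))
    val sqlContext = new SQLContext(sc)
    // RDD-to-DataFrame conversion is no longer implicit; the implicits now live on the SQLContext.
    import sqlContext.implicits._

    val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
    val df = rdd.toDF                                   // was: toDataFrame / implicit conversion

    val renamed = df
      .withColumn("keyPlusOne", df("key") + 1)          // was: addColumn
      .withColumnRenamed("value", "text")               // was: renameColumn

    renamed.where(col("key") === 1).select("key", "text").collect().foreach(println)

    sc.stop()
  }
}

Per the commit message, the Python API mirrors the same renames (toDF, withColumn, withColumnRenamed, and a functions module).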
1 parent db57479 commit ba91bf5

File tree: 70 files changed, +596 −456 lines changed


examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala

Lines changed: 2 additions & 2 deletions

@@ -90,7 +90,7 @@ object CrossValidatorExample {
     crossval.setNumFolds(2) // Use 3+ in practice

     // Run cross-validation, and choose the best set of parameters.
-    val cvModel = crossval.fit(training)
+    val cvModel = crossval.fit(training.toDF)

     // Prepare test documents, which are unlabeled.
     val test = sc.parallelize(Seq(
@@ -100,7 +100,7 @@ object CrossValidatorExample {
       Document(7L, "apache hadoop")))

     // Make predictions on test documents. cvModel uses the best model found (lrModel).
-    cvModel.transform(test)
+    cvModel.transform(test.toDF)
       .select("id", "text", "probability", "prediction")
       .collect()
       .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>

examples/src/main/scala/org/apache/spark/examples/ml/DeveloperApiExample.scala

Lines changed: 2 additions & 2 deletions

@@ -58,7 +58,7 @@ object DeveloperApiExample {
     lr.setMaxIter(10)

     // Learn a LogisticRegression model. This uses the parameters stored in lr.
-    val model = lr.fit(training)
+    val model = lr.fit(training.toDF)

     // Prepare test data.
     val test = sc.parallelize(Seq(
@@ -67,7 +67,7 @@ object DeveloperApiExample {
       LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5))))

     // Make predictions on test data.
-    val sumPredictions: Double = model.transform(test)
+    val sumPredictions: Double = model.transform(test.toDF)
       .select("features", "label", "prediction")
       .collect()
       .map { case Row(features: Vector, label: Double, prediction: Double) =>

examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala

Lines changed: 3 additions & 3 deletions

@@ -137,9 +137,9 @@ object MovieLensALS {
       .setRegParam(params.regParam)
       .setNumBlocks(params.numBlocks)

-    val model = als.fit(training)
+    val model = als.fit(training.toDF)

-    val predictions = model.transform(test).cache()
+    val predictions = model.transform(test.toDF).cache()

     // Evaluate the model.
     // TODO: Create an evaluator to compute RMSE.
@@ -158,7 +158,7 @@ object MovieLensALS {

     // Inspect false positives.
     predictions.registerTempTable("prediction")
-    sc.textFile(params.movies).map(Movie.parseMovie).registerTempTable("movie")
+    sc.textFile(params.movies).map(Movie.parseMovie).toDF.registerTempTable("movie")
     sqlContext.sql(
       """
         |SELECT userId, prediction.movieId, title, rating, prediction

examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala

Lines changed: 3 additions & 3 deletions

@@ -58,7 +58,7 @@ object SimpleParamsExample {
       .setRegParam(0.01)

     // Learn a LogisticRegression model. This uses the parameters stored in lr.
-    val model1 = lr.fit(training)
+    val model1 = lr.fit(training.toDF)
     // Since model1 is a Model (i.e., a Transformer produced by an Estimator),
     // we can view the parameters it used during fit().
     // This prints the parameter (name: value) pairs, where names are unique IDs for this
@@ -77,7 +77,7 @@ object SimpleParamsExample {

     // Now learn a new model using the paramMapCombined parameters.
     // paramMapCombined overrides all parameters set earlier via lr.set* methods.
-    val model2 = lr.fit(training, paramMapCombined)
+    val model2 = lr.fit(training.toDF, paramMapCombined)
     println("Model 2 was fit using parameters: " + model2.fittingParamMap)

     // Prepare test data.
@@ -90,7 +90,7 @@ object SimpleParamsExample {
     // LogisticRegression.transform will only use the 'features' column.
     // Note that model2.transform() outputs a 'myProbability' column instead of the usual
     // 'probability' column since we renamed the lr.probabilityCol parameter previously.
-    model2.transform(test)
+    model2.transform(test.toDF)
       .select("features", "label", "myProbability", "prediction")
       .collect()
       .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>

examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala

Lines changed: 2 additions & 2 deletions

@@ -69,7 +69,7 @@ object SimpleTextClassificationPipeline {
       .setStages(Array(tokenizer, hashingTF, lr))

     // Fit the pipeline to training documents.
-    val model = pipeline.fit(training)
+    val model = pipeline.fit(training.toDF)

     // Prepare test documents, which are unlabeled.
     val test = sc.parallelize(Seq(
@@ -79,7 +79,7 @@ object SimpleTextClassificationPipeline {
       Document(7L, "apache hadoop")))

     // Make predictions on test documents.
-    model.transform(test)
+    model.transform(test.toDF)
       .select("id", "text", "probability", "prediction")
       .collect()
       .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>

examples/src/main/scala/org/apache/spark/examples/mllib/DatasetExample.scala

Lines changed: 4 additions & 4 deletions

@@ -81,18 +81,18 @@ object DatasetExample {
     println(s"Loaded ${origData.count()} instances from file: ${params.input}")

     // Convert input data to DataFrame explicitly.
-    val df: DataFrame = origData.toDataFrame
+    val df: DataFrame = origData.toDF
     println(s"Inferred schema:\n${df.schema.prettyJson}")
     println(s"Converted to DataFrame with ${df.count()} records")

-    // Select columns, using implicit conversion to DataFrames.
-    val labelsDf: DataFrame = origData.select("label")
+    // Select columns
+    val labelsDf: DataFrame = df.select("label")
     val labels: RDD[Double] = labelsDf.map { case Row(v: Double) => v }
     val numLabels = labels.count()
     val meanLabel = labels.fold(0.0)(_ + _) / numLabels
     println(s"Selected label column with average value $meanLabel")

-    val featuresDf: DataFrame = origData.select("features")
+    val featuresDf: DataFrame = df.select("features")
     val features: RDD[Vector] = featuresDf.map { case Row(v: Vector) => v }
     val featureSummary = features.aggregate(new MultivariateOnlineSummarizer())(
       (summary, feat) => summary.add(feat),

examples/src/main/scala/org/apache/spark/examples/sql/RDDRelation.scala

Lines changed: 5 additions & 5 deletions

@@ -19,7 +19,7 @@ package org.apache.spark.examples.sql

 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.sql.SQLContext
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._

 // One method for defining the schema of an RDD is to make a case class with the desired column
 // names and types.
@@ -34,10 +34,10 @@ object RDDRelation {
     // Importing the SQL context gives access to all the SQL functions and implicit conversions.
     import sqlContext.implicits._

-    val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
+    val df = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i"))).toDF
     // Any RDD containing case classes can be registered as a table. The schema of the table is
     // automatically inferred using scala reflection.
-    rdd.registerTempTable("records")
+    df.registerTempTable("records")

     // Once tables have been registered, you can run SQL queries over them.
     println("Result of SELECT *:")
@@ -55,10 +55,10 @@ object RDDRelation {
     rddFromSql.map(row => s"Key: ${row(0)}, Value: ${row(1)}").collect().foreach(println)

     // Queries can also be written using a LINQ-like Scala DSL.
-    rdd.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)
+    df.where($"key" === 1).orderBy($"value".asc).select($"key").collect().foreach(println)

     // Write out an RDD as a parquet file.
-    rdd.saveAsParquetFile("pair.parquet")
+    df.saveAsParquetFile("pair.parquet")

     // Read in parquet file. Parquet files are self-describing so the schmema is preserved.
     val parquetFile = sqlContext.parquetFile("pair.parquet")

examples/src/main/scala/org/apache/spark/examples/sql/hive/HiveFromSpark.scala

Lines changed: 1 addition & 1 deletion

@@ -68,7 +68,7 @@ object HiveFromSpark {

     // You can also register RDDs as temporary tables within a HiveContext.
     val rdd = sc.parallelize((1 to 100).map(i => Record(i, s"val_$i")))
-    rdd.registerTempTable("records")
+    rdd.toDF.registerTempTable("records")

     // Queries can then join RDD data with data stored in Hive.
     println("Result of SELECT *:")

mllib/src/main/scala/org/apache/spark/ml/Transformer.scala

Lines changed: 3 additions & 3 deletions

@@ -23,7 +23,7 @@ import org.apache.spark.Logging
 import org.apache.spark.annotation.AlphaComponent
 import org.apache.spark.ml.param._
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._

 /**
@@ -100,7 +100,7 @@ private[ml] abstract class UnaryTransformer[IN, OUT, T <: UnaryTransformer[IN, O
   override def transform(dataset: DataFrame, paramMap: ParamMap): DataFrame = {
     transformSchema(dataset.schema, paramMap, logging = true)
     val map = this.paramMap ++ paramMap
-    dataset.select($"*", callUDF(
-      this.createTransformFunc(map), outputDataType, dataset(map(inputCol))).as(map(outputCol)))
+    dataset.withColumn(map(outputCol),
+      callUDF(this.createTransformFunc(map), outputDataType, dataset(map(inputCol))))
   }
 }

mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala

Lines changed: 7 additions & 9 deletions

@@ -21,7 +21,7 @@ import org.apache.spark.annotation.{DeveloperApi, AlphaComponent}
 import org.apache.spark.ml.impl.estimator.{PredictionModel, Predictor, PredictorParams}
 import org.apache.spark.ml.param.{Params, ParamMap, HasRawPredictionCol}
 import org.apache.spark.mllib.linalg.{Vector, VectorUDT}
-import org.apache.spark.sql.Dsl._
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.types.{DataType, DoubleType, StructType}

@@ -182,24 +182,22 @@ private[ml] object ClassificationModel {
     if (map(model.rawPredictionCol) != "") {
       // output raw prediction
       val features2raw: FeaturesType => Vector = model.predictRaw
-      tmpData = tmpData.select($"*",
-        callUDF(features2raw, new VectorUDT,
-          col(map(model.featuresCol))).as(map(model.rawPredictionCol)))
+      tmpData = tmpData.withColumn(map(model.rawPredictionCol),
+        callUDF(features2raw, new VectorUDT, col(map(model.featuresCol))))
       numColsOutput += 1
       if (map(model.predictionCol) != "") {
         val raw2pred: Vector => Double = (rawPred) => {
           rawPred.toArray.zipWithIndex.maxBy(_._1)._2
         }
-        tmpData = tmpData.select($"*", callUDF(raw2pred, DoubleType,
-          col(map(model.rawPredictionCol))).as(map(model.predictionCol)))
+        tmpData = tmpData.withColumn(map(model.predictionCol),
+          callUDF(raw2pred, DoubleType, col(map(model.rawPredictionCol))))
         numColsOutput += 1
       }
     } else if (map(model.predictionCol) != "") {
       // output prediction
       val features2pred: FeaturesType => Double = model.predict
-      tmpData = tmpData.select($"*",
-        callUDF(features2pred, DoubleType,
-          col(map(model.featuresCol))).as(map(model.predictionCol)))
+      tmpData = tmpData.withColumn(map(model.predictionCol),
+        callUDF(features2pred, DoubleType, col(map(model.featuresCol))))
       numColsOutput += 1
     }
     (numColsOutput, tmpData)
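
A side note on the mllib changes above: appending a derived column now goes through withColumn rather than re-selecting every existing column. A minimal sketch of the pattern (not from the commit; the helper and the column names "x" and "doubled" are invented for illustration):

import org.apache.spark.sql.DataFrame

// Appends a derived column to a DataFrame that is assumed to contain a numeric column "x".
def withDoubled(df: DataFrame): DataFrame = {
  // Old pattern, as removed in the diffs above: dataset.select($"*", expr.as("newCol"))
  // New pattern after this commit: withColumn adds exactly one named column.
  df.withColumn("doubled", df("x") * 2)
}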
