
Commit 119f45d

[SPARK-5097][SQL] DataFrame
This pull request redesigns the existing Spark SQL DSL, which already provides data-frame-like functionality.

TODOs: with the exception of Python support, the remaining tasks can be done in separate, follow-up PRs.
- [ ] Audit of the API
- [ ] Documentation
- [ ] More test cases to cover the new API
- [x] Python support
- [ ] Type alias SchemaRDD

Author: Reynold Xin <[email protected]>
Author: Davies Liu <[email protected]>

Closes apache#4173 from rxin/df1 and squashes the following commits:

0a1a73b [Reynold Xin] Merge branch 'df1' of github.com:rxin/spark into df1
23b4427 [Reynold Xin] Mima.
828f70d [Reynold Xin] Merge pull request #7 from davies/df
257b9e6 [Davies Liu] add repartition
6bf2b73 [Davies Liu] fix collect with UDT and tests
e971078 [Reynold Xin] Missing quotes.
b9306b4 [Reynold Xin] Remove removeColumn/updateColumn for now.
a728bf2 [Reynold Xin] Example rename.
e8aa3d3 [Reynold Xin] groupby -> groupBy.
9662c9e [Davies Liu] improve DataFrame Python API
4ae51ea [Davies Liu] python API for dataframe
1e5e454 [Reynold Xin] Fixed a bug with symbol conversion.
2ca74db [Reynold Xin] Couple minor fixes.
ea98ea1 [Reynold Xin] Documentation & literal expressions.
2b22684 [Reynold Xin] Got rid of IntelliJ problems.
02bbfbc [Reynold Xin] Tightening imports.
ffbce66 [Reynold Xin] Fixed compilation error.
59b6d8b [Reynold Xin] Style violation.
b85edfb [Reynold Xin] ALS.
8c37f0a [Reynold Xin] Made MLlib and examples compile
6d53134 [Reynold Xin] Hive module.
d35efd5 [Reynold Xin] Fixed compilation error.
ce4a5d2 [Reynold Xin] Fixed test cases in SQL except ParquetIOSuite.
66d5ef1 [Reynold Xin] SQLContext minor patch.
c9bcdc0 [Reynold Xin] Checkpoint: SQL module compiles!
1 parent b1b35ca commit 119f45d
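
The example diffs below show the shape of the redesigned API: the results of sql(), jsonFile(), and applySchema() are now typed as DataFrame rather than SchemaRDD, and DSL column references are plain strings instead of Scala symbols. A rough, self-contained Scala sketch of that usage (the app name, master, and input path are illustrative, not part of this commit's diff):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, Row, SQLContext}

object DataFrameSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("DataFrameSketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // jsonFile() now returns a DataFrame instead of a SchemaRDD.
    val people: DataFrame = sqlContext.jsonFile("examples/src/main/resources/people.json")
    people.registerTempTable("people")

    // sql() also returns a DataFrame; column references in the DSL are plain strings.
    val teenagers: DataFrame =
      sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
    teenagers.select("name").collect().foreach { case Row(name: String) => println(name) }

    sc.stop()
  }
}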

82 files changed (+3444 additions, -1572 deletions)


examples/src/main/java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java

Lines changed: 5 additions & 5 deletions
@@ -33,7 +33,7 @@
 import org.apache.spark.ml.tuning.CrossValidator;
 import org.apache.spark.ml.tuning.CrossValidatorModel;
 import org.apache.spark.ml.tuning.ParamGridBuilder;
-import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.Row;

@@ -71,7 +71,7 @@ public static void main(String[] args) {
 new LabeledDocument(9L, "a e c l", 0.0),
 new LabeledDocument(10L, "spark compile", 1.0),
 new LabeledDocument(11L, "hadoop software", 0.0));
-SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
+DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);

 // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
 Tokenizer tokenizer = new Tokenizer()

@@ -112,11 +112,11 @@ public static void main(String[] args) {
 new Document(5L, "l m n"),
 new Document(6L, "mapreduce spark"),
 new Document(7L, "apache hadoop"));
-SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
+DataFrame test = jsql.applySchema(jsc.parallelize(localTest), Document.class);

 // Make predictions on test documents. cvModel uses the best model found (lrModel).
-cvModel.transform(test).registerAsTable("prediction");
-SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
+cvModel.transform(test).registerTempTable("prediction");
+DataFrame predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
 for (Row r: predictions.collect()) {
 System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
 + ", prediction=" + r.get(3));

examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleParamsExample.java

Lines changed: 6 additions & 6 deletions
@@ -28,7 +28,7 @@
 import org.apache.spark.ml.classification.LogisticRegression;
 import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
-import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.Row;

@@ -48,13 +48,13 @@ public static void main(String[] args) {

 // Prepare training data.
 // We use LabeledPoint, which is a JavaBean. Spark SQL can convert RDDs of JavaBeans
-// into SchemaRDDs, where it uses the bean metadata to infer the schema.
+// into DataFrames, where it uses the bean metadata to infer the schema.
 List<LabeledPoint> localTraining = Lists.newArrayList(
 new LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
 new LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),
 new LabeledPoint(0.0, Vectors.dense(2.0, 1.3, 1.0)),
 new LabeledPoint(1.0, Vectors.dense(0.0, 1.2, -0.5)));
-SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);
+DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledPoint.class);

 // Create a LogisticRegression instance. This instance is an Estimator.
 LogisticRegression lr = new LogisticRegression();

@@ -94,14 +94,14 @@ public static void main(String[] args) {
 new LabeledPoint(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
 new LabeledPoint(0.0, Vectors.dense(3.0, 2.0, -0.1)),
 new LabeledPoint(1.0, Vectors.dense(0.0, 2.2, -1.5)));
-SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);
+DataFrame test = jsql.applySchema(jsc.parallelize(localTest), LabeledPoint.class);

 // Make predictions on test documents using the Transformer.transform() method.
 // LogisticRegression.transform will only use the 'features' column.
 // Note that model2.transform() outputs a 'probability' column instead of the usual 'score'
 // column since we renamed the lr.scoreCol parameter previously.
-model2.transform(test).registerAsTable("results");
-SchemaRDD results =
+model2.transform(test).registerTempTable("results");
+DataFrame results =
 jsql.sql("SELECT features, label, probability, prediction FROM results");
 for (Row r: results.collect()) {
 System.out.println("(" + r.get(0) + ", " + r.get(1) + ") -> prob=" + r.get(2)

examples/src/main/java/org/apache/spark/examples/ml/JavaSimpleTextClassificationPipeline.java

Lines changed: 5 additions & 5 deletions
@@ -29,7 +29,7 @@
 import org.apache.spark.ml.classification.LogisticRegression;
 import org.apache.spark.ml.feature.HashingTF;
 import org.apache.spark.ml.feature.Tokenizer;
-import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.SQLContext;
 import org.apache.spark.sql.Row;

@@ -54,7 +54,7 @@ public static void main(String[] args) {
 new LabeledDocument(1L, "b d", 0.0),
 new LabeledDocument(2L, "spark f g h", 1.0),
 new LabeledDocument(3L, "hadoop mapreduce", 0.0));
-SchemaRDD training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);
+DataFrame training = jsql.applySchema(jsc.parallelize(localTraining), LabeledDocument.class);

 // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
 Tokenizer tokenizer = new Tokenizer()

@@ -79,11 +79,11 @@ public static void main(String[] args) {
 new Document(5L, "l m n"),
 new Document(6L, "mapreduce spark"),
 new Document(7L, "apache hadoop"));
-SchemaRDD test = jsql.applySchema(jsc.parallelize(localTest), Document.class);
+DataFrame test = jsql.applySchema(jsc.parallelize(localTest), Document.class);

 // Make predictions on test documents.
-model.transform(test).registerAsTable("prediction");
-SchemaRDD predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
+model.transform(test).registerTempTable("prediction");
+DataFrame predictions = jsql.sql("SELECT id, text, score, prediction FROM prediction");
 for (Row r: predictions.collect()) {
 System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> score=" + r.get(2)
 + ", prediction=" + r.get(3));

examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQL.java

Lines changed: 18 additions & 18 deletions
@@ -26,9 +26,9 @@
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;

-import org.apache.spark.sql.SQLContext;
-import org.apache.spark.sql.SchemaRDD;
+import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.SQLContext;

 public class JavaSparkSQL {
 public static class Person implements Serializable {

@@ -74,13 +74,13 @@ public Person call(String line) {
 });

 // Apply a schema to an RDD of Java Beans and register it as a table.
-SchemaRDD schemaPeople = sqlCtx.applySchema(people, Person.class);
+DataFrame schemaPeople = sqlCtx.applySchema(people, Person.class);
 schemaPeople.registerTempTable("people");

 // SQL can be run over RDDs that have been registered as tables.
-SchemaRDD teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+DataFrame teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

-// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
+// The results of SQL queries are DataFrames and support all the normal RDD operations.
 // The columns of a row in the result can be accessed by ordinal.
 List<String> teenagerNames = teenagers.toJavaRDD().map(new Function<Row, String>() {
 @Override

@@ -93,17 +93,17 @@ public String call(Row row) {
 }

 System.out.println("=== Data source: Parquet File ===");
-// JavaSchemaRDDs can be saved as parquet files, maintaining the schema information.
+// DataFrames can be saved as parquet files, maintaining the schema information.
 schemaPeople.saveAsParquetFile("people.parquet");

 // Read in the parquet file created above.
 // Parquet files are self-describing so the schema is preserved.
-// The result of loading a parquet file is also a JavaSchemaRDD.
-SchemaRDD parquetFile = sqlCtx.parquetFile("people.parquet");
+// The result of loading a parquet file is also a DataFrame.
+DataFrame parquetFile = sqlCtx.parquetFile("people.parquet");

 //Parquet files can also be registered as tables and then used in SQL statements.
 parquetFile.registerTempTable("parquetFile");
-SchemaRDD teenagers2 =
+DataFrame teenagers2 =
 sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
 teenagerNames = teenagers2.toJavaRDD().map(new Function<Row, String>() {
 @Override

@@ -119,8 +119,8 @@ public String call(Row row) {
 // A JSON dataset is pointed by path.
 // The path can be either a single text file or a directory storing text files.
 String path = "examples/src/main/resources/people.json";
-// Create a JavaSchemaRDD from the file(s) pointed by path
-SchemaRDD peopleFromJsonFile = sqlCtx.jsonFile(path);
+// Create a DataFrame from the file(s) pointed by path
+DataFrame peopleFromJsonFile = sqlCtx.jsonFile(path);

 // Because the schema of a JSON dataset is automatically inferred, to write queries,
 // it is better to take a look at what is the schema.

@@ -130,13 +130,13 @@ public String call(Row row) {
 // |-- age: IntegerType
 // |-- name: StringType

-// Register this JavaSchemaRDD as a table.
+// Register this DataFrame as a table.
 peopleFromJsonFile.registerTempTable("people");

 // SQL statements can be run by using the sql methods provided by sqlCtx.
-SchemaRDD teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
+DataFrame teenagers3 = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");

-// The results of SQL queries are JavaSchemaRDDs and support all the normal RDD operations.
+// The results of SQL queries are DataFrame and support all the normal RDD operations.
 // The columns of a row in the result can be accessed by ordinal.
 teenagerNames = teenagers3.toJavaRDD().map(new Function<Row, String>() {
 @Override

@@ -146,14 +146,14 @@ public String call(Row row) {
 System.out.println(name);
 }

-// Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
+// Alternatively, a DataFrame can be created for a JSON dataset represented by
 // a RDD[String] storing one JSON object per string.
 List<String> jsonData = Arrays.asList(
 "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
 JavaRDD<String> anotherPeopleRDD = ctx.parallelize(jsonData);
-SchemaRDD peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());
+DataFrame peopleFromJsonRDD = sqlCtx.jsonRDD(anotherPeopleRDD.rdd());

-// Take a look at the schema of this new JavaSchemaRDD.
+// Take a look at the schema of this new DataFrame.
 peopleFromJsonRDD.printSchema();
 // The schema of anotherPeople is ...
 // root

@@ -164,7 +164,7 @@ public String call(Row row) {

 peopleFromJsonRDD.registerTempTable("people2");

-SchemaRDD peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
+DataFrame peopleWithCity = sqlCtx.sql("SELECT name, address.city FROM people2");
 List<String> nameAndCity = peopleWithCity.toJavaRDD().map(new Function<Row, String>() {
 @Override
 public String call(Row row) {
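
JavaSparkSQL.java also exercises the Parquet and JSON data sources, where the only change is the result type. A minimal Scala sketch of the Parquet round trip described in the comments above; the app name and paths are illustrative, not part of the commit:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}

object ParquetRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ParquetRoundTripSketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    val schemaPeople: DataFrame = sqlContext.jsonFile("examples/src/main/resources/people.json")
    // DataFrames can be saved as Parquet files; the schema is stored with the data.
    schemaPeople.saveAsParquetFile("people.parquet")

    // Parquet files are self-describing, so loading one back yields a DataFrame with the same schema.
    val parquetFile: DataFrame = sqlContext.parquetFile("people.parquet")
    parquetFile.registerTempTable("parquetFile")
    sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")
      .collect()
      .foreach(println)

    sc.stop()
  }
}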

examples/src/main/python/mllib/dataset_example.py

Lines changed: 1 addition & 1 deletion
@@ -16,7 +16,7 @@
 #

 """
-An example of how to use SchemaRDD as a dataset for ML. Run with::
+An example of how to use DataFrame as a dataset for ML. Run with::
     bin/spark-submit examples/src/main/python/mllib/dataset_example.py
 """

examples/src/main/python/sql.py

Lines changed: 8 additions & 8 deletions
@@ -30,26 +30,26 @@
 some_rdd = sc.parallelize([Row(name="John", age=19),
                            Row(name="Smith", age=23),
                            Row(name="Sarah", age=18)])
-# Infer schema from the first row, create a SchemaRDD and print the schema
-some_schemardd = sqlContext.inferSchema(some_rdd)
-some_schemardd.printSchema()
+# Infer schema from the first row, create a DataFrame and print the schema
+some_df = sqlContext.inferSchema(some_rdd)
+some_df.printSchema()

 # Another RDD is created from a list of tuples
 another_rdd = sc.parallelize([("John", 19), ("Smith", 23), ("Sarah", 18)])
 # Schema with two fields - person_name and person_age
 schema = StructType([StructField("person_name", StringType(), False),
                      StructField("person_age", IntegerType(), False)])
-# Create a SchemaRDD by applying the schema to the RDD and print the schema
-another_schemardd = sqlContext.applySchema(another_rdd, schema)
-another_schemardd.printSchema()
+# Create a DataFrame by applying the schema to the RDD and print the schema
+another_df = sqlContext.applySchema(another_rdd, schema)
+another_df.printSchema()
 # root
 # |-- age: integer (nullable = true)
 # |-- name: string (nullable = true)

 # A JSON dataset is pointed to by path.
 # The path can be either a single text file or a directory storing text files.
 path = os.path.join(os.environ['SPARK_HOME'], "examples/src/main/resources/people.json")
-# Create a SchemaRDD from the file(s) pointed to by path
+# Create a DataFrame from the file(s) pointed to by path
 people = sqlContext.jsonFile(path)
 # root
 # |-- person_name: string (nullable = false)

@@ -61,7 +61,7 @@
 # |-- age: IntegerType
 # |-- name: StringType

-# Register this SchemaRDD as a table.
+# Register this DataFrame as a table.
 people.registerAsTable("people")

 # SQL statements can be run by using the sql methods provided by sqlContext
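
The Python change above keeps the same two construction paths and only renames the result: schema inference from Rows via inferSchema, and an explicit StructType via applySchema. A hedged Scala analogue of the explicit-schema path, reusing the field names from the Python sample; the import path for the type classes reflects the 1.3-era layout and is an assumption here:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object ApplySchemaSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("ApplySchemaSketch").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // An RDD of Rows plus an explicit schema, mirroring the tuples in the Python example.
    val anotherRdd = sc.parallelize(Seq(("John", 19), ("Smith", 23), ("Sarah", 18)))
      .map { case (name, age) => Row(name, age) }
    val schema = StructType(Seq(
      StructField("person_name", StringType, nullable = false),
      StructField("person_age", IntegerType, nullable = false)))

    // applySchema() now returns a DataFrame rather than a SchemaRDD.
    val anotherDf = sqlContext.applySchema(anotherRdd, schema)
    anotherDf.printSchema()

    sc.stop()
  }
}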

examples/src/main/scala/org/apache/spark/examples/ml/CrossValidatorExample.scala

Lines changed: 1 addition & 2 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.examples.ml

 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.SparkContext._
 import org.apache.spark.ml.Pipeline
 import org.apache.spark.ml.classification.LogisticRegression
 import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

@@ -101,7 +100,7 @@ object CrossValidatorExample {

 // Make predictions on test documents. cvModel uses the best model found (lrModel).
 cvModel.transform(test)
-  .select('id, 'text, 'score, 'prediction)
+  .select("id", "text", "score", "prediction")
   .collect()
   .foreach { case Row(id: Long, text: String, score: Double, prediction: Double) =>
     println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction)

examples/src/main/scala/org/apache/spark/examples/ml/MovieLensALS.scala

Lines changed: 1 addition & 1 deletion
@@ -143,7 +143,7 @@ object MovieLensALS {

 // Evaluate the model.
 // TODO: Create an evaluator to compute RMSE.
-val mse = predictions.select('rating, 'prediction)
+val mse = predictions.select("rating", "prediction").rdd
   .flatMap { case Row(rating: Float, prediction: Float) =>
     val err = rating.toDouble - prediction
     val err2 = err * err
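
The .rdd added here reflects that the result of select is now a DataFrame rather than an RDD, so the Row-pattern-matching flatMap runs on the underlying RDD[Row]. A small hedged sketch of that pattern; the meanSquaredError helper and the non-matching-row fallback are illustrative, not part of the example:

import org.apache.spark.sql.{DataFrame, Row}

// Compute a mean squared error from a predictions DataFrame with Float-typed
// "rating" and "prediction" columns, as in the MovieLensALS snippet above.
def meanSquaredError(predictions: DataFrame): Double = {
  val squaredErrors = predictions.select("rating", "prediction").rdd.flatMap {
    case Row(rating: Float, prediction: Float) =>
      val err = rating.toDouble - prediction
      Some(err * err)
    case _ => None // skip rows that do not match (e.g. nulls)
  }
  squaredErrors.mean()
}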

examples/src/main/scala/org/apache/spark/examples/ml/SimpleParamsExample.scala

Lines changed: 2 additions & 3 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.examples.ml

 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.SparkContext._
 import org.apache.spark.ml.classification.LogisticRegression
 import org.apache.spark.ml.param.ParamMap
 import org.apache.spark.mllib.linalg.{Vector, Vectors}

@@ -42,7 +41,7 @@ object SimpleParamsExample {

 // Prepare training data.
 // We use LabeledPoint, which is a case class. Spark SQL can convert RDDs of Java Beans
-// into SchemaRDDs, where it uses the bean metadata to infer the schema.
+// into DataFrames, where it uses the bean metadata to infer the schema.
 val training = sparkContext.parallelize(Seq(
   LabeledPoint(1.0, Vectors.dense(0.0, 1.1, 0.1)),
   LabeledPoint(0.0, Vectors.dense(2.0, 1.0, -1.0)),

@@ -92,7 +91,7 @@ object SimpleParamsExample {
 // Note that model2.transform() outputs a 'probability' column instead of the usual 'score'
 // column since we renamed the lr.scoreCol parameter previously.
 model2.transform(test)
-  .select('features, 'label, 'probability, 'prediction)
+  .select("features", "label", "probability", "prediction")
   .collect()
   .foreach { case Row(features: Vector, label: Double, prob: Double, prediction: Double) =>
     println("(" + features + ", " + label + ") -> prob=" + prob + ", prediction=" + prediction)

examples/src/main/scala/org/apache/spark/examples/ml/SimpleTextClassificationPipeline.scala

Lines changed: 1 addition & 2 deletions
@@ -20,7 +20,6 @@ package org.apache.spark.examples.ml
 import scala.beans.BeanInfo

 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.SparkContext._
 import org.apache.spark.ml.Pipeline
 import org.apache.spark.ml.classification.LogisticRegression
 import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

@@ -80,7 +79,7 @@ object SimpleTextClassificationPipeline {

 // Make predictions on test documents.
 model.transform(test)
-  .select('id, 'text, 'score, 'prediction)
+  .select("id", "text", "score", "prediction")
   .collect()
   .foreach { case Row(id: Long, text: String, score: Double, prediction: Double) =>
     println("(" + id + ", " + text + ") --> score=" + score + ", prediction=" + prediction)
