SPARK-1374: PySpark API for SparkSQL #363
Changes from all commits
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -20,7 +20,7 @@ a schema that describes the data types of each column in the row. A SchemaRDD i | |
| in a traditional relational database. A SchemaRDD can be created from an existing RDD, parquet | ||
| file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). | ||
|
|
||
| **All of the examples on this page use sample data included in the Spark distribution and can be run in the spark-shell.** | ||
| **All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`.** | ||
|
|
||
| </div> | ||
|
|
||
|
|
@@ -33,6 +33,19 @@ a schema that describes the data types of each column in the row. A JavaSchemaR | |
| in a traditional relational database. A JavaSchemaRDD can be created from an existing RDD, parquet | ||
| file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). | ||
| </div> | ||
|
|
||
| <div data-lang="python" markdown="1"> | ||
|
|
||
| Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using | ||
| Spark. At the core of this component is a new type of RDD, | ||
| [SchemaRDD](api/pyspark/pyspark.sql.SchemaRDD-class.html). SchemaRDDs are composed of | |
| [Row](api/pyspark/pyspark.sql.Row-class.html) objects along with | ||
| a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table | ||
| in a traditional relational database. A SchemaRDD can be created from an existing RDD, parquet | ||
| file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). | ||
|
|
||
| **All of the examples on this page use sample data included in the Spark distribution and can be run in the `pyspark` shell.** | ||
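|  ||
| For example, here is a minimal sketch (assuming the `pyspark` shell, where a SparkContext is already available as `sc`) that turns an RDD of dictionaries into a SchemaRDD and queries it; the sections below walk through each step in detail. | |
|  ||
| {% highlight python %} | |
| from pyspark.sql import SQLContext | |
| sqlCtx = SQLContext(sc) | |
| rows = sc.parallelize([{"name": "Alice", "age": 1}]) | |
| schemaRdd = sqlCtx.inferSchema(rows)  # infer column names and types from the first row | |
| schemaRdd.registerAsTable("rows")     # register so SQL can refer to it by name | |
| names = sqlCtx.sql("SELECT name FROM rows WHERE age = 1").collect() | |
| {% endhighlight %} | |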
| </div> | ||
| </div> | ||
|
|
||
| *************************************************************************************************** | ||
|
|
@@ -44,7 +57,7 @@ file, or by running HiveQL against data stored in [Apache Hive](http://hive.apac | |
|
|
||
| The entry point into all relational functionality in Spark is the | ||
| [SQLContext](api/sql/core/index.html#org.apache.spark.sql.SQLContext) class, or one of its | ||
| decendents. To create a basic SQLContext, all you need is a SparkContext. | ||
| descendants. To create a basic SQLContext, all you need is a SparkContext. | ||
|
|
||
| {% highlight scala %} | ||
| val sc: SparkContext // An existing SparkContext. | ||
|
|
@@ -60,7 +73,7 @@ import sqlContext._ | |
|
|
||
| The entry point into all relational functionality in Spark is the | ||
| [JavaSQLContext](api/sql/core/index.html#org.apache.spark.sql.api.java.JavaSQLContext) class, or one | ||
| of its decendents. To create a basic JavaSQLContext, all you need is a JavaSparkContext. | ||
| of its descendants. To create a basic JavaSQLContext, all you need is a JavaSparkContext. | ||
|
|
||
| {% highlight java %} | ||
| JavaSparkContext ctx = ...; // An existing JavaSparkContext. | ||
|
|
@@ -69,6 +82,19 @@ JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx); | |
|
|
||
| </div> | ||
|
|
||
| <div data-lang="python" markdown="1"> | ||
|
|
||
| The entry point into all relational functionality in Spark is the | ||
| [SQLContext](api/pyspark/pyspark.sql.SQLContext-class.html) class, or one | ||
| of its descendants. To create a basic SQLContext, all you need is a SparkContext. | |
|
|
||
| {% highlight python %} | ||
| from pyspark.sql import SQLContext | ||
| sqlCtx = SQLContext(sc) | ||
| {% endhighlight %} | ||
|
|
||
| </div> | ||
|
|
||
| </div> | ||
|
|
||
| ## Running SQL on RDDs | ||
|
|
@@ -81,7 +107,7 @@ One type of table that is supported by Spark SQL is an RDD of Scala case classes | |
| defines the schema of the table. The names of the arguments to the case class are read using | ||
| reflection and become the names of the columns. Case classes can also be nested or contain complex | ||
| types such as Sequences or Arrays. This RDD can be implicitly converted to a SchemaRDD and then be | ||
| registered as a table. Tables can used in subsequent SQL statements. | ||
| registered as a table. Tables can be used in subsequent SQL statements. | ||
|
|
||
| {% highlight scala %} | ||
| val sqlContext = new org.apache.spark.sql.SQLContext(sc) | ||
|
|
@@ -176,6 +202,34 @@ List<String> teenagerNames = teenagers.map(new Function<Row, String>() { | |
|
|
||
| </div> | ||
|
|
||
| <div data-lang="python" markdown="1"> | ||
|
|
||
| One type of table that is supported by Spark SQL is an RDD of dictionaries. The keys of the | ||
| dictionary define the column names of the table, and the types are inferred by looking at the first | |
| row. Any RDD of dictionaries can be converted to a SchemaRDD and then registered as a table. Tables | |
| can be used in subsequent SQL statements. | ||
|
|
||
| {% highlight python %} | ||
| # Load a text file and convert each line to a dictionary. | ||
| lines = sc.textFile("examples/src/main/resources/people.txt") | ||
| parts = lines.map(lambda l: l.split(",")) | ||
| people = parts.map(lambda p: {"name": p[0], "age": int(p[1])}) | ||
|
|
||
| # Infer the schema, and register the SchemaRDD as a table. | ||
| # In future versions of PySpark we would like to add support for registering RDDs with other | ||
| # datatypes as tables | ||
| peopleTable = sqlCtx.inferSchema(people) | ||
| peopleTable.registerAsTable("people") | ||
|
|
||
| # SQL can be run over SchemaRDDs that have been registered as a table. | ||
| teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") | ||
|
|
||
| # The results of SQL queries are RDDs and support all the normal RDD operations. | ||
| teenNames = teenagers.map(lambda p: "Name: " + p.name) | ||
| {% endhighlight %} | ||
|
|
||
| </div> | ||
|
|
||
| </div> | ||
|
|
||
| **Note that Spark SQL currently uses a very basic SQL parser.** | ||
|
|
@@ -231,6 +285,27 @@ parquetFile.registerAsTable("parquetFile"); | |
| JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); | ||
|
|
||
|
|
||
| {% endhighlight %} | ||
|
|
||
| </div> | ||
|
|
||
| <div data-lang="python" markdown="1"> | ||
|
|
||
| {% highlight python %} | ||
|
|
||
| peopleTable # The SchemaRDD from the previous example. | ||
|
|
||
| # SchemaRDDs can be saved as parquet files, maintaining the schema information. | ||
| peopleTable.saveAsParquetFile("people.parquet") | ||
|
|
||
| # Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. | ||
| # The result of loading a parquet file is also a SchemaRDD. | ||
| parquetFile = sqlCtx.parquetFile("people.parquet") | ||
|
|
||
| # Parquet files can also be registered as tables and then used in SQL statements. | ||
| parquetFile.registerAsTable("parquetFile"); | ||
| teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") | ||
|
|
||
| {% endhighlight %} | ||
|
|
||
| </div> | ||
|
|
@@ -318,4 +393,24 @@ Row[] results = hiveCtx.hql("FROM src SELECT key, value").collect(); | |
|
|
||
| </div> | ||
|
|
||
| <div data-lang="python" markdown="1"> | ||
|
|
||
| When working with Hive one must construct a `HiveContext`, which inherits from `SQLContext`, and | ||
| adds support for finding tables in the MetaStore and writing queries using HiveQL. In addition to | |
| the `sql` method, a `HiveContext` also provides an `hql` method, which allows queries to be | |
| expressed in HiveQL. | ||
|
Contributor: I'm not sure if this is said earlier in the doc, but you should say how to build Spark for Hive support.
Author: It has its own section earlier in the doc, starting on line 338. |
||
|
|
||
| {% highlight python %} | ||
|
|
||
| from pyspark.sql import HiveContext | ||
| hiveCtx = HiveContext(sc) | ||
|
|
||
| hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") | ||
| hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") | ||
|
|
||
| # Queries can be expressed in HiveQL. | ||
| results = hiveCtx.hql("FROM src SELECT key, value").collect() | ||
|
|
||
| {% endhighlight %} | ||
|
|
||
| </div> | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -64,5 +64,9 @@ def run(self): | |
| java_import(gateway.jvm, "org.apache.spark.api.java.*") | ||
| java_import(gateway.jvm, "org.apache.spark.api.python.*") | ||
| java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") | ||
| java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") | ||
| java_import(gateway.jvm, "org.apache.spark.sql.hive.HiveContext") | ||
| java_import(gateway.jvm, "org.apache.spark.sql.hive.LocalHiveContext") | ||
| java_import(gateway.jvm, "org.apache.spark.sql.hive.TestHiveContext") | ||
|
Contributor: Will this work if users haven't built with Hive? Maybe we want to make the Hive support optional. Not sure what the best way to do so is.
Author: This will still work, but it will throw a non-fatal exception when the user tries to use a HiveContext without Hive built. I'll catch that and present a better error message indicating that the user needs to build Spark with Hive with SPARK_HIVE=true.
Author: I added a better message that tells the user they need to compile Spark with Hive to use the HiveContext. |
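For illustration, a rough sketch of how such a check could look on the Python side (hypothetical helper name; it assumes PySpark's internal `sc._jvm` / `sc._jsc` handles and Py4J's `Py4JError`):

```python
from py4j.protocol import Py4JError

def _create_hive_context(sc):
    """Hypothetical helper: instantiate the JVM-side HiveContext,
    translating a missing-Hive build into a readable error."""
    try:
        # HiveContext is reachable through the gateway only if Spark was built with Hive.
        return sc._jvm.HiveContext(sc._jsc.sc())
    except Py4JError:
        raise Exception("You must build Spark with Hive "
                        "(SPARK_HIVE=true sbt/sbt assembly) to use a HiveContext")
```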
||
| java_import(gateway.jvm, "scala.Tuple2") | ||
| return gateway | ||
Maybe add something saying that in future versions of PySpark, we'd like to support RDDs with other data types in registerAsTable too.