diff --git a/core/pom.xml b/core/pom.xml index a1bdd8ec68ae..d87e2bca030e 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -266,6 +266,11 @@ junit-interface test + + org.spark-project + pyrolite + 2.0 + target/scala-${scala.binary.version}/classes diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 32f1100406d7..f9d86fed34d0 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -25,6 +25,8 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio import scala.collection.JavaConversions._ import scala.reflect.ClassTag +import net.razorvine.pickle.{Pickler, Unpickler} + import org.apache.spark._ import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} import org.apache.spark.broadcast.Broadcast @@ -284,6 +286,36 @@ private[spark] object PythonRDD { file.close() } + /** + * Convert an RDD of serialized Python dictionaries to Scala Maps + * TODO: Support more Python types. + */ + def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = { + pyRDD.rdd.mapPartitions { iter => + val unpickle = new Unpickler + // TODO: Figure out why flatMap is necessary for pyspark + iter.flatMap { row => + unpickle.loads(row) match { + case objs: java.util.ArrayList[JMap[String, _] @unchecked] => objs.map(_.toMap) + // In case the partition doesn't have a collection + case obj: JMap[String @unchecked, _] => Seq(obj.toMap) + } + } + } + } + + /** + * Convert an RDD of Java objects to an RDD of serialized Python objects that is usable by + * PySpark. + */ + def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = { + jRDD.rdd.mapPartitions { iter => + val pickle = new Pickler + iter.map { row => + pickle.dumps(row) + } + } + } } private diff --git a/dev/run-tests b/dev/run-tests index 6ad674a2ba12..0725b681f1a1 100755 --- a/dev/run-tests +++ b/dev/run-tests @@ -34,6 +34,7 @@ else fi JAVA_VERSION=$($java_cmd -version 2>&1 | sed 's/java version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q') [ "$JAVA_VERSION" -ge 18 ] && echo "" || echo "[Warn] Java 8 tests will not run because JDK version is < 1.8." +export SPARK_HIVE=true echo "=========================================================================" echo "Running Apache RAT checks" diff --git a/docs/README.md b/docs/README.md index 0678fc5c8670..75b1811ba99a 100644 --- a/docs/README.md +++ b/docs/README.md @@ -42,7 +42,7 @@ To mark a block of code in your markdown to be syntax highlighted by jekyll duri You can build just the Spark scaladoc by running `sbt/sbt doc` from the SPARK_PROJECT_ROOT directory. -Similarly, you can build just the PySpark epydoc by running `epydoc --config epydoc.conf` from the SPARK_PROJECT_ROOT/pyspark directory. +Similarly, you can build just the PySpark epydoc by running `epydoc --config epydoc.conf` from the SPARK_PROJECT_ROOT/pyspark directory. Documentation is only generated for classes that are listed as public in `__init__.py`. When you run `jekyll` in the docs directory, it will also copy over the scaladoc for the various Spark subprojects into the docs directory (and then also into the _site directory). We use a jekyll plugin to run `sbt/sbt doc` before building the site so if you haven't run it (recently) it may take some time as it generates all of the scaladoc. The jekyll plugin also generates the PySpark docs using [epydoc](http://epydoc.sourceforge.net/).
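The `PythonRDD` helpers added above (`pythonToJavaMap`, `javaToPython`) exchange plain Python pickle bytes with the workers via pyrolite's `Pickler`/`Unpickler`. As a rough illustration only (not part of the patch; the `row` and `payload` names are invented), the byte payload involved is ordinary `pickle` output:

{% highlight python %}
import pickle

# A dictionary like the ones the SQL examples below build per input line.
row = {"name": "Alice", "age": 1}

# pickle.dumps produces the kind of byte payload that the JVM-side
# Unpickler.loads call in pythonToJavaMap reconstructs into a java.util.Map.
payload = pickle.dumps(row)

# Round-trip sanity check on the Python side.
assert pickle.loads(payload) == row
{% endhighlight %}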
diff --git a/docs/_plugins/copy_api_dirs.rb b/docs/_plugins/copy_api_dirs.rb index bbd56d2fd13b..05f0bd47a88a 100644 --- a/docs/_plugins/copy_api_dirs.rb +++ b/docs/_plugins/copy_api_dirs.rb @@ -32,8 +32,8 @@ curr_dir = pwd cd("..") - puts "Running sbt/sbt doc from " + pwd + "; this may take a few minutes..." - puts `sbt/sbt doc` + puts "Running 'sbt/sbt doc hive/doc' from " + pwd + "; this may take a few minutes..." + puts `sbt/sbt doc hive/doc` puts "Moving back into docs dir." cd("docs") diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md index a59393e1424d..6f616fb7c244 100644 --- a/docs/sql-programming-guide.md +++ b/docs/sql-programming-guide.md @@ -20,7 +20,7 @@ a schema that describes the data types of each column in the row. A SchemaRDD i in a traditional relational database. A SchemaRDD can be created from an existing RDD, parquet file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). -**All of the examples on this page use sample data included in the Spark distribution and can be run in the spark-shell.** +**All of the examples on this page use sample data included in the Spark distribution and can be run in the `spark-shell`.** @@ -33,6 +33,19 @@ a schema that describes the data types of each column in the row. A JavaSchemaR in a traditional relational database. A JavaSchemaRDD can be created from an existing RDD, parquet file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). + +
+ +Spark SQL allows relational queries expressed in SQL or HiveQL to be executed using +Spark. At the core of this component is a new type of RDD, +[SchemaRDD](api/pyspark/pyspark.sql.SchemaRDD-class.html). SchemaRDDs are composed of +[Row](api/pyspark/pyspark.sql.Row-class.html) objects along with +a schema that describes the data types of each column in the row. A SchemaRDD is similar to a table +in a traditional relational database. A SchemaRDD can be created from an existing RDD, parquet +file, or by running HiveQL against data stored in [Apache Hive](http://hive.apache.org/). + +**All of the examples on this page use sample data included in the Spark distribution and can be run in the `pyspark` shell.** +
@@ -44,7 +57,7 @@ file, or by running HiveQL against data stored in [Apache Hive](http://hive.apac The entry point into all relational functionality in Spark is the [SQLContext](api/sql/core/index.html#org.apache.spark.sql.SQLContext) class, or one of its -decendents. To create a basic SQLContext, all you need is a SparkContext. +descendants. To create a basic SQLContext, all you need is a SparkContext. {% highlight scala %} val sc: SparkContext // An existing SparkContext. @@ -60,7 +73,7 @@ import sqlContext._ The entry point into all relational functionality in Spark is the [JavaSQLContext](api/sql/core/index.html#org.apache.spark.sql.api.java.JavaSQLContext) class, or one -of its decendents. To create a basic JavaSQLContext, all you need is a JavaSparkContext. +of its descendants. To create a basic JavaSQLContext, all you need is a JavaSparkContext. {% highlight java %} JavaSparkContext ctx = ...; // An existing JavaSparkContext. @@ -69,6 +82,19 @@ JavaSQLContext sqlCtx = new org.apache.spark.sql.api.java.JavaSQLContext(ctx); + +
+ +The entry point into all relational functionality in Spark is the +[SQLContext](api/pyspark/pyspark.sql.SQLContext-class.html) class, or one +of its descendants. To create a basic SQLContext, all you need is a SparkContext. + +{% highlight python %} +from pyspark.sql import SQLContext +sqlCtx = SQLContext(sc) +{% endhighlight %} + +
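The examples here assume the `pyspark` shell, where `sc` already exists. As a rough sketch for standalone scripts (the `local` master and app name below are illustrative assumptions, not part of the patch), the context can be created explicitly first:

{% highlight python %}
from pyspark import SparkContext
from pyspark.sql import SQLContext

# Outside the pyspark shell a SparkContext must be constructed by hand.
sc = SparkContext("local", "SQL examples")  # master and app name are placeholders
sqlCtx = SQLContext(sc)
{% endhighlight %}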
+ ## Running SQL on RDDs @@ -81,7 +107,7 @@ One type of table that is supported by Spark SQL is an RDD of Scala case classes defines the schema of the table. The names of the arguments to the case class are read using reflection and become the names of the columns. Case classes can also be nested or contain complex types such as Sequences or Arrays. This RDD can be implicitly converted to a SchemaRDD and then be -registered as a table. Tables can used in subsequent SQL statements. +registered as a table. Tables can be used in subsequent SQL statements. {% highlight scala %} val sqlContext = new org.apache.spark.sql.SQLContext(sc) @@ -176,6 +202,34 @@ List teenagerNames = teenagers.map(new Function() { +
+ +One type of table that is supported by Spark SQL is an RDD of dictionaries. The keys of the +dictionary define the column names of the table, and the types are inferred by looking at the first +row. Any RDD of dictionaries can be converted to a SchemaRDD and then registered as a table. Tables +can be used in subsequent SQL statements. + +{% highlight python %} +# Load a text file and convert each line to a dictionary. +lines = sc.textFile("examples/src/main/resources/people.txt") +parts = lines.map(lambda l: l.split(",")) +people = parts.map(lambda p: {"name": p[0], "age": int(p[1])}) + +# Infer the schema, and register the SchemaRDD as a table. +# In future versions of PySpark we would like to add support for registering RDDs with other +# datatypes as tables. +peopleTable = sqlCtx.inferSchema(people) +peopleTable.registerAsTable("people") + +# SQL can be run over SchemaRDDs that have been registered as a table. +teenagers = sqlCtx.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19") + +# The results of SQL queries are RDDs and support all the normal RDD operations. +teenNames = teenagers.map(lambda p: "Name: " + p.name) +{% endhighlight %} + +
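As a small usage sketch following on from the example above (not part of the patch): `teenNames` is an ordinary RDD, so its contents can be brought back to the driver with a normal action:

{% highlight python %}
# collect() returns the mapped strings as a local Python list.
for name in teenNames.collect():
    print(name)
{% endhighlight %}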
+ **Note that Spark SQL currently uses a very basic SQL parser.** @@ -231,6 +285,27 @@ parquetFile.registerAsTable("parquetFile"); JavaSchemaRDD teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19"); +{% endhighlight %} + + + +
+ +{% highlight python %} + +peopleTable # The SchemaRDD from the previous example. + +# SchemaRDDs can be saved as parquet files, maintaining the schema information. +peopleTable.saveAsParquetFile("people.parquet") + +# Read in the parquet file created above. Parquet files are self-describing so the schema is preserved. +# The result of loading a parquet file is also a SchemaRDD. +parquetFile = sqlCtx.parquetFile("people.parquet") + +# Parquet files can also be registered as tables and then used in SQL statements. +parquetFile.registerAsTable("parquetFile") +teenagers = sqlCtx.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19") + {% endhighlight %}
@@ -318,4 +393,24 @@ Row[] results = hiveCtx.hql("FROM src SELECT key, value").collect(); +
+ +When working with Hive one must construct a `HiveContext`, which inherits from `SQLContext`, and +adds support for finding tables in the MetaStore and writing queries using HiveQL. In addition to +the `sql` method, a `HiveContext` also provides an `hql` method, which allows queries to be +expressed in HiveQL. + +{% highlight python %} + +from pyspark.sql import HiveContext +hiveCtx = HiveContext(sc) + +hiveCtx.hql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") +hiveCtx.hql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") + +# Queries can be expressed in HiveQL. +results = hiveCtx.hql("FROM src SELECT key, value").collect() + +{% endhighlight %} +
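A short usage sketch for the Hive example above (illustrative, not part of the patch): the collected `results` are rows whose columns can be read as attributes, just as `p.name` is used earlier on this page:

{% highlight python %}
# Each collected row exposes its columns (key, value) as attributes.
for row in results:
    print("%s: %s" % (row.key, row.value))
{% endhighlight %}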
diff --git a/pom.xml b/pom.xml index 5f66cbe76859..616563809e97 100644 --- a/pom.xml +++ b/pom.xml @@ -262,7 +262,7 @@ com.clearspring.analytics stream - 2.4.0 + 2.5.1