From b4bc82d2072e0ddb2204a04404d7afa2b2263aa9 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Sun, 6 Apr 2014 15:00:47 -0700 Subject: [PATCH 01/18] compiling --- .../apache/spark/api/python/PythonRDD.scala | 10 +++++++ project/SparkBuild.scala | 3 ++- .../spark/sql/api/java/JavaSQLContext.scala | 27 +++++++++++++++++++ 3 files changed, 39 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 32f1100406d74..934bfae31432f 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -25,6 +25,8 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio import scala.collection.JavaConversions._ import scala.reflect.ClassTag +import net.razorvine.pickle.Unpickler + import org.apache.spark._ import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} import org.apache.spark.broadcast.Broadcast @@ -284,6 +286,14 @@ private[spark] object PythonRDD { file.close() } + def pythonToJava(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[_] = { + pyRDD.rdd.mapPartitions { iter => + val unpickle = new Unpickler + iter.map { row => + unpickle.loads(row) + } + } + } } private diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 843a874fbfdb0..53a574fe524dd 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -328,7 +328,8 @@ object SparkBuild extends Build { "com.twitter" %% "chill" % "0.3.1" excludeAll(excludeAsm), "com.twitter" % "chill-java" % "0.3.1" excludeAll(excludeAsm), "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock), - "com.clearspring.analytics" % "stream" % "2.5.1" + "com.clearspring.analytics" % "stream" % "2.5.1", + "net.razorvine" % "pyrolite_2.10" % "1.1" ), libraryDependencies ++= maybeAvro ) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 573345e42c43c..81816ef98e3a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -82,6 +82,33 @@ class JavaSQLContext(sparkContext: JavaSparkContext) { new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) } + /** + * Applies a schema to an RDD of Array[Any] + */ + def applySchema(rdd: JavaRDD[Array[Any]]): JavaSchemaRDD = { + val fields = rdd.first.map(_.getClass) + val schema = fields.zipWithIndex.map { case (klass, index) => + val dataType = klass match { + case c: Class[_] if c == classOf[java.lang.String] => StringType + case c: Class[_] if c == java.lang.Short.TYPE => ShortType + case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType + case c: Class[_] if c == java.lang.Long.TYPE => LongType + case c: Class[_] if c == java.lang.Double.TYPE => DoubleType + case c: Class[_] if c == java.lang.Byte.TYPE => ByteType + case c: Class[_] if c == java.lang.Float.TYPE => FloatType + case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType + } + + AttributeReference(index.toString, dataType, true)() + } + + val rowRdd = rdd.rdd.mapPartitions { iter => + iter.map { row => + new GenericRow(row): ScalaRow + } + } + new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) + } /** * Loads a parquet file, returning the 
result as a [[JavaSchemaRDD]]. From b6f4feb3c4917f463d2f54647dd2781a20fc63bc Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Sun, 6 Apr 2014 15:03:59 -0700 Subject: [PATCH 02/18] Java to python --- .../scala/org/apache/spark/api/python/PythonRDD.scala | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 934bfae31432f..0322401765494 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -25,7 +25,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio import scala.collection.JavaConversions._ import scala.reflect.ClassTag -import net.razorvine.pickle.Unpickler +import net.razorvine.pickle.{Pickler, Unpickler} import org.apache.spark._ import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD} @@ -294,6 +294,15 @@ private[spark] object PythonRDD { } } } + + def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = { + jRDD.rdd.mapPartitions { iter => + val unpickle = new Pickler + iter.map { row => + unpickle.dumps(row) + } + } + } } private From 5cb8dc05a03a74f37f6cdaff165b3f1d1a94c1db Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Sun, 6 Apr 2014 16:07:10 -0700 Subject: [PATCH 03/18] java to python, and python to java --- .../scala/org/apache/spark/api/python/PythonRDD.scala | 8 ++++++-- python/pyspark/context.py | 2 ++ 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 0322401765494..31f7cd319a0c0 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -289,8 +289,12 @@ private[spark] object PythonRDD { def pythonToJava(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[_] = { pyRDD.rdd.mapPartitions { iter => val unpickle = new Unpickler - iter.map { row => - unpickle.loads(row) + iter.flatMap { row => + unpickle.loads(row) match { + case objs: java.util.ArrayList[Any] => objs + // Incase the partition doesn't have a collection + case obj => Seq(obj) + } } } } diff --git a/python/pyspark/context.py b/python/pyspark/context.py index d8667e84fedff..7a5f9349649f8 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -174,6 +174,8 @@ def _ensure_initialized(cls, instance=None, gateway=None): SparkContext._gateway = gateway or launch_gateway() SparkContext._jvm = SparkContext._gateway.jvm SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile + SparkContext._pythonToJava = SparkContext._jvm.PythonRDD.pythonToJava + SparkContext._javaToPython = SparkContext._jvm.PythonRDD.javaToPython if instance: if SparkContext._active_spark_context and SparkContext._active_spark_context != instance: From d2c60af513afca5aec0292316c9c0516de66927f Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Sun, 6 Apr 2014 21:41:09 -0700 Subject: [PATCH 04/18] Added schema rdd class --- python/pyspark/java_gateway.py | 1 + python/pyspark/rdd.py | 8 ++++++++ 2 files changed, 9 insertions(+) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 6a16756e0576d..8b079f7215b4b 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -64,5 +64,6 @@ def run(self): java_import(gateway.jvm, "org.apache.spark.api.java.*") 
java_import(gateway.jvm, "org.apache.spark.api.python.*") java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") + java_import(gateway.jvm, "org.apache.spark.sql.api.java.JavaSQLContext") java_import(gateway.jvm, "scala.Tuple2") return gateway diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index fb27863e07f55..c793765f03064 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1387,6 +1387,14 @@ def _jrdd(self): def _is_pipelinable(self): return not (self.is_cached or self.is_checkpointed) +class SchemaRDD: + + def __init__(self, pyRDD): + self._pyRDD = pyRDD + self.ctx = pyRDD.ctx + self.sql_ctx = self.ctx._jvm.JavaSQLContext(self.ctx._jsc) + self._jrdd = self.ctx._pythonToJava(pyRDD._jrdd) + def _test(): import doctest From 949071bfd269f0ac608bfa470474c91cae97f91f Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 15:45:55 -0700 Subject: [PATCH 05/18] doesn't crash --- .../apache/spark/api/python/PythonRDD.scala | 3 +- .../spark/sql/api/java/JavaSQLContext.scala | 28 +++++++++++-------- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 31f7cd319a0c0..9eb16e1cec050 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -208,7 +208,7 @@ private object SpecialLengths { val TIMING_DATA = -3 } -private[spark] object PythonRDD { +object PythonRDD { val UTF8 = Charset.forName("UTF-8") def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int): @@ -289,6 +289,7 @@ private[spark] object PythonRDD { def pythonToJava(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[_] = { pyRDD.rdd.mapPartitions { iter => val unpickle = new Unpickler + // TODO: Figure out why flatMap is necessay for pyspark iter.flatMap { row => unpickle.loads(row) match { case objs: java.util.ArrayList[Any] => objs diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 81816ef98e3a4..92bf5bf20c175 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -85,26 +85,32 @@ class JavaSQLContext(sparkContext: JavaSparkContext) { /** * Applies a schema to an RDD of Array[Any] */ - def applySchema(rdd: JavaRDD[Array[Any]]): JavaSchemaRDD = { - val fields = rdd.first.map(_.getClass) + def applySchema(rdd: JavaRDD[_]): JavaSchemaRDD = { + val fields = rdd.first match { + case row: java.util.ArrayList[_] => row.toArray.map(_.getClass) + case row => throw new Exception(s"Rows must be Lists 1 ${row.getClass}") + } + val schema = fields.zipWithIndex.map { case (klass, index) => val dataType = klass match { case c: Class[_] if c == classOf[java.lang.String] => StringType - case c: Class[_] if c == java.lang.Short.TYPE => ShortType - case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType - case c: Class[_] if c == java.lang.Long.TYPE => LongType - case c: Class[_] if c == java.lang.Double.TYPE => DoubleType - case c: Class[_] if c == java.lang.Byte.TYPE => ByteType - case c: Class[_] if c == java.lang.Float.TYPE => FloatType - case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType + case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType + // case c: Class[_] if c == java.lang.Short.TYPE => 
ShortType + // case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType + // case c: Class[_] if c == java.lang.Long.TYPE => LongType + // case c: Class[_] if c == java.lang.Double.TYPE => DoubleType + // case c: Class[_] if c == java.lang.Byte.TYPE => ByteType + // case c: Class[_] if c == java.lang.Float.TYPE => FloatType + // case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType } AttributeReference(index.toString, dataType, true)() } val rowRdd = rdd.rdd.mapPartitions { iter => - iter.map { row => - new GenericRow(row): ScalaRow + iter.map { + case row: java.util.ArrayList[_] => new GenericRow(row.toArray.asInstanceOf[Array[Any]]): ScalaRow + case row => throw new Exception(s"Rows must be Lists 2 ${row.getClass}") } } new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) From 9cb15c858dbacfe6156c7289575d0d1baa5a986c Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 16:09:22 -0700 Subject: [PATCH 06/18] working --- python/pyspark/rdd.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index c793765f03064..83178df511b26 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1394,6 +1394,7 @@ def __init__(self, pyRDD): self.ctx = pyRDD.ctx self.sql_ctx = self.ctx._jvm.JavaSQLContext(self.ctx._jsc) self._jrdd = self.ctx._pythonToJava(pyRDD._jrdd) + self._srdd = self.sql_ctx.applySchema(self._jrdd) def _test(): From 730803e0843a3497d4bdf663a86363b33a8883c2 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 16:47:36 -0700 Subject: [PATCH 07/18] more working --- python/pyspark/context.py | 19 ++++++++++++++++++- python/pyspark/rdd.py | 16 ++++++++++------ 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 7a5f9349649f8..751ed7d674722 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -32,7 +32,7 @@ PairDeserializer from pyspark.storagelevel import StorageLevel from pyspark import rdd -from pyspark.rdd import RDD +from pyspark.rdd import RDD, SchemaRDD from py4j.java_collections import ListConverter @@ -462,6 +462,23 @@ def sparkUser(self): """ return self._jsc.sc().sparkUser() +class SQLContext: + + def __init__(self, sparkContext): + self._sc = sparkContext + self._jsc = self._sc._jsc + self._jvm = self._sc._jvm + self._jsql_ctx = self._jvm.JavaSQLContext(self._jsc) + + def sql(self, sqlQuery): + return SchemaRDD(self._jsql_ctx.sql(sqlQuery), self) + + def applySchema(self, rdd): + jrdd = self._sc._pythonToJava(rdd._jrdd) + srdd = self._jsql_ctx.applySchema(jrdd) + return SchemaRDD(srdd, self) + + def _test(): import atexit import doctest diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 83178df511b26..a6141a8853cc3 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1389,13 +1389,17 @@ def _is_pipelinable(self): class SchemaRDD: - def __init__(self, pyRDD): - self._pyRDD = pyRDD - self.ctx = pyRDD.ctx - self.sql_ctx = self.ctx._jvm.JavaSQLContext(self.ctx._jsc) - self._jrdd = self.ctx._pythonToJava(pyRDD._jrdd) - self._srdd = self.sql_ctx.applySchema(self._jrdd) + def __init__(self, jschema_rdd, sql_ctx): + self.sql_ctx = sql_ctx + self._sc = sql_ctx._sc + self._jschema_rdd = jschema_rdd + def registerAsTable(self, name): + self._jschema_rdd.registerAsTable(name) + + def toPython(self): + jrdd = self._sc._javaToPython(self._jschema_rdd) + return RDD(jrdd, self._sc, self._sc.serializer) def _test(): import doctest From 
837bd13bfa2e757ca6cdbe79af1ae00cba7749f0 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 17:16:57 -0700 Subject: [PATCH 08/18] even better --- python/pyspark/rdd.py | 3 ++- .../apache/spark/sql/api/java/JavaSchemaRDD.scala | 12 ++++++++++++ 2 files changed, 14 insertions(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index a6141a8853cc3..11e840623bc30 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1398,7 +1398,8 @@ def registerAsTable(self, name): self._jschema_rdd.registerAsTable(name) def toPython(self): - jrdd = self._sc._javaToPython(self._jschema_rdd) + jrdd = self._jschema_rdd.javaToPython() + #jrdd = self._sc._javaToPython(self._jschema_rdd) return RDD(jrdd, self._sc, self._sc.serializer) def _test(): diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala index d43d672938f51..f068519cc0e5e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.api.java +import net.razorvine.pickle.{Pickler, Unpickler} + import org.apache.spark.api.java.{JavaRDDLike, JavaRDD} import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -45,4 +47,14 @@ class JavaSchemaRDD( override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd) val rdd = baseSchemaRDD.map(new Row(_)) + + def javaToPython: JavaRDD[Array[Byte]] = { + this.rdd.mapPartitions { iter => + val unpickle = new Pickler + iter.map { row => + val fields: Array[Any] = (for (i <- 0 to row.length - 1) yield row.get(i)).toArray + unpickle.dumps(fields) + } + } + } } From 224add86bf0ca3af5c478d8189103463f2ed9918 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 18:26:48 -0700 Subject: [PATCH 09/18] yippie --- project/SparkBuild.scala | 1 + python/pyspark/context.py | 5 +++-- .../org/apache/spark/sql/api/java/JavaSQLContext.scala | 8 +++++--- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 53a574fe524dd..60d34ab2fb0f2 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -507,6 +507,7 @@ object SparkBuild extends Build { def extraAssemblySettings() = Seq( test in assembly := {}, + assemblyOption in assembly ~= { _.copy(cacheOutput = false) }, mergeStrategy in assembly := { case m if m.toLowerCase.endsWith("manifest.mf") => MergeStrategy.discard case m if m.toLowerCase.matches("meta-inf.*\\.sf$") => MergeStrategy.discard diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 751ed7d674722..22a98a7ec955e 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -473,9 +473,10 @@ def __init__(self, sparkContext): def sql(self, sqlQuery): return SchemaRDD(self._jsql_ctx.sql(sqlQuery), self) - def applySchema(self, rdd): + def applySchema(self, rdd, fieldNames): + fieldNames = ListConverter().convert(fieldNames, self._sc._gateway._gateway_client) jrdd = self._sc._pythonToJava(rdd._jrdd) - srdd = self._jsql_ctx.applySchema(jrdd) + srdd = self._jsql_ctx.applySchema(jrdd, fieldNames) return SchemaRDD(srdd, self) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index 92bf5bf20c175..bd9fe7fbb0096 
100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -85,13 +85,13 @@ class JavaSQLContext(sparkContext: JavaSparkContext) { /** * Applies a schema to an RDD of Array[Any] */ - def applySchema(rdd: JavaRDD[_]): JavaSchemaRDD = { + def applySchema(rdd: JavaRDD[_], fieldNames: java.util.ArrayList[Any]): JavaSchemaRDD = { val fields = rdd.first match { case row: java.util.ArrayList[_] => row.toArray.map(_.getClass) case row => throw new Exception(s"Rows must be Lists 1 ${row.getClass}") } - val schema = fields.zipWithIndex.map { case (klass, index) => + val schema = fields.zip(fieldNames.toArray).map { case (klass, fieldName) => val dataType = klass match { case c: Class[_] if c == classOf[java.lang.String] => StringType case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType @@ -104,7 +104,9 @@ class JavaSQLContext(sparkContext: JavaSparkContext) { // case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType } - AttributeReference(index.toString, dataType, true)() + println(fieldName.toString) + // TODO: No bueno, fieldName.toString used because I can't figure out the casting + AttributeReference(fieldName.toString, dataType, true)() } val rowRdd = rdd.rdd.mapPartitions { iter => From f16524d873d5b7e1f881d1d2bab66a88f9193bd7 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 21:25:14 -0700 Subject: [PATCH 10/18] Switched to using Scala SQLContext --- .../apache/spark/api/python/PythonRDD.scala | 14 +++++++ python/pyspark/context.py | 16 +++++--- python/pyspark/java_gateway.py | 2 +- .../org/apache/spark/sql/SQLContext.scala | 27 ++++++++++++++ .../org/apache/spark/sql/SchemaRDD.scala | 13 +++++++ .../spark/sql/api/java/JavaSQLContext.scala | 37 +------------------ .../spark/sql/api/java/JavaSchemaRDD.scala | 12 ------ 7 files changed, 66 insertions(+), 55 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 9eb16e1cec050..11ab81f1498ba 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -300,6 +300,20 @@ object PythonRDD { } } + def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = { + pyRDD.rdd.mapPartitions { iter => + val unpickle = new Unpickler + // TODO: Figure out why flatMap is necessay for pyspark + iter.flatMap { row => + unpickle.loads(row) match { + case objs: java.util.ArrayList[JMap[String, _]] => objs.map(_.toMap) + // Incase the partition doesn't have a collection + case obj: JMap[String, _] => Seq(obj.toMap) + } + } + } + } + def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = { jRDD.rdd.mapPartitions { iter => val unpickle = new Pickler diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 22a98a7ec955e..b8ac6db974573 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -175,6 +175,7 @@ def _ensure_initialized(cls, instance=None, gateway=None): SparkContext._jvm = SparkContext._gateway.jvm SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile SparkContext._pythonToJava = SparkContext._jvm.PythonRDD.pythonToJava + SparkContext._pythonToJavaMap = SparkContext._jvm.PythonRDD.pythonToJavaMap SparkContext._javaToPython = SparkContext._jvm.PythonRDD.javaToPython if instance: @@ -468,15 +469,18 @@ def __init__(self, sparkContext): self._sc = 
sparkContext self._jsc = self._sc._jsc self._jvm = self._sc._jvm - self._jsql_ctx = self._jvm.JavaSQLContext(self._jsc) + self._ssql_ctx = self._jvm.SQLContext(self._jsc.sc()) def sql(self, sqlQuery): - return SchemaRDD(self._jsql_ctx.sql(sqlQuery), self) + return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self) - def applySchema(self, rdd, fieldNames): - fieldNames = ListConverter().convert(fieldNames, self._sc._gateway._gateway_client) - jrdd = self._sc._pythonToJava(rdd._jrdd) - srdd = self._jsql_ctx.applySchema(jrdd, fieldNames) + def applySchema(self, rdd): + first = rdd.first() + if (rdd.__class__ is SchemaRDD): + raise Exception("Cannot apply schema to %s" % SchemaRDD.__name__) + + jrdd = self._sc._pythonToJavaMap(rdd._jrdd) + srdd = self._ssql_ctx.applySchema(jrdd.rdd()) return SchemaRDD(srdd, self) diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py index 8b079f7215b4b..d8dd2a65225e1 100644 --- a/python/pyspark/java_gateway.py +++ b/python/pyspark/java_gateway.py @@ -64,6 +64,6 @@ def run(self): java_import(gateway.jvm, "org.apache.spark.api.java.*") java_import(gateway.jvm, "org.apache.spark.api.python.*") java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*") - java_import(gateway.jvm, "org.apache.spark.sql.api.java.JavaSQLContext") + java_import(gateway.jvm, "org.apache.spark.sql.SQLContext") java_import(gateway.jvm, "scala.Tuple2") return gateway diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index 36059c6630aa4..ef23cf1739246 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -25,11 +25,13 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis._ import org.apache.spark.sql.catalyst.dsl import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.types._ import org.apache.spark.sql.catalyst.optimizer.Optimizer import org.apache.spark.sql.catalyst.plans.logical.{Subquery, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor import org.apache.spark.sql.columnar.InMemoryColumnarTableScan import org.apache.spark.sql.execution._ +import org.apache.spark.api.java.JavaRDD /** * ALPHA COMPONENT @@ -238,4 +240,29 @@ class SQLContext(@transient val sparkContext: SparkContext) */ def debugExec() = DebugQuery(executedPlan).execute().collect() } + + def applySchema(rdd: RDD[Map[String, _]]): SchemaRDD = { + val schema = rdd.first.map { case (fieldName, obj) => + val dataType = obj.getClass match { + case c: Class[_] if c == classOf[java.lang.String] => StringType + case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType + // case c: Class[_] if c == java.lang.Short.TYPE => ShortType + // case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType + // case c: Class[_] if c == java.lang.Long.TYPE => LongType + // case c: Class[_] if c == java.lang.Double.TYPE => DoubleType + // case c: Class[_] if c == java.lang.Byte.TYPE => ByteType + // case c: Class[_] if c == java.lang.Float.TYPE => FloatType + // case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType + } + AttributeReference(fieldName, dataType, true)() + }.toSeq + + val rowRdd = rdd.mapPartitions { iter => + iter.map { map => + new GenericRow(map.values.toArray.asInstanceOf[Array[Any]]): Row + } + } + new SchemaRDD(this, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) + } + } diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index fc95781448569..d65c2f447539d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import net.razorvine.pickle.{Pickler, Unpickler} + import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.analysis._ @@ -24,6 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.types.BooleanType +import org.apache.spark.api.java.JavaRDD /** * ALPHA COMPONENT @@ -307,4 +310,14 @@ class SchemaRDD( /** FOR INTERNAL USE ONLY */ def analyze = sqlContext.analyzer(logicalPlan) + + def javaToPython: JavaRDD[Array[Byte]] = { + this.mapPartitions { iter => + val unpickle = new Pickler + iter.map { row => + val fields: Array[Any] = (for (i <- 0 to row.length - 1) yield row(i)).toArray + unpickle.dumps(fields) + } + } + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala index bd9fe7fbb0096..4ca4505fbfc5a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.api.java import java.beans.{Introspector, PropertyDescriptor} +import java.util.{Map => JMap} import org.apache.spark.api.java.{JavaRDD, JavaSparkContext} import org.apache.spark.sql.SQLContext @@ -82,42 +83,6 @@ class JavaSQLContext(sparkContext: JavaSparkContext) { new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) } - /** - * Applies a schema to an RDD of Array[Any] - */ - def applySchema(rdd: JavaRDD[_], fieldNames: java.util.ArrayList[Any]): JavaSchemaRDD = { - val fields = rdd.first match { - case row: java.util.ArrayList[_] => row.toArray.map(_.getClass) - case row => throw new Exception(s"Rows must be Lists 1 ${row.getClass}") - } - - val schema = fields.zip(fieldNames.toArray).map { case (klass, fieldName) => - val dataType = klass match { - case c: Class[_] if c == classOf[java.lang.String] => StringType - case c: Class[_] if c == classOf[java.lang.Integer] => IntegerType - // case c: Class[_] if c == java.lang.Short.TYPE => ShortType - // case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType - // case c: Class[_] if c == java.lang.Long.TYPE => LongType - // case c: Class[_] if c == java.lang.Double.TYPE => DoubleType - // case c: Class[_] if c == java.lang.Byte.TYPE => ByteType - // case c: Class[_] if c == java.lang.Float.TYPE => FloatType - // case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType - } - - println(fieldName.toString) - // TODO: No bueno, fieldName.toString used because I can't figure out the casting - AttributeReference(fieldName.toString, dataType, true)() - } - - val rowRdd = rdd.rdd.mapPartitions { iter => - iter.map { - case row: java.util.ArrayList[_] => new GenericRow(row.toArray.asInstanceOf[Array[Any]]): ScalaRow - case row => throw new Exception(s"Rows must be Lists 2 ${row.getClass}") - } - } - new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd))) - } - /** * Loads a parquet 
file, returning the result as a [[JavaSchemaRDD]]. */ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala index f068519cc0e5e..d43d672938f51 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql.api.java -import net.razorvine.pickle.{Pickler, Unpickler} - import org.apache.spark.api.java.{JavaRDDLike, JavaRDD} import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike} import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan @@ -47,14 +45,4 @@ class JavaSchemaRDD( override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd) val rdd = baseSchemaRDD.map(new Row(_)) - - def javaToPython: JavaRDD[Array[Byte]] = { - this.rdd.mapPartitions { iter => - val unpickle = new Pickler - iter.map { row => - val fields: Array[Any] = (for (i <- 0 to row.length - 1) yield row.get(i)).toArray - unpickle.dumps(fields) - } - } - } } From d69594dca922f87aa4ac05c4ab0b59a47eb12e5b Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 22:11:23 -0700 Subject: [PATCH 11/18] returning dictionaries works --- .../scala/org/apache/spark/sql/SchemaRDD.scala | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index d65c2f447539d..886869c22d55d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -27,6 +27,7 @@ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.types.BooleanType import org.apache.spark.api.java.JavaRDD +import java.util.{Map => JMap} /** * ALPHA COMPONENT @@ -312,11 +313,18 @@ class SchemaRDD( def analyze = sqlContext.analyzer(logicalPlan) def javaToPython: JavaRDD[Array[Byte]] = { + //val fieldNames: Seq[String] = logicalPlan.references.map(_.name) this.mapPartitions { iter => - val unpickle = new Pickler + val pickle = new Pickler iter.map { row => - val fields: Array[Any] = (for (i <- 0 to row.length - 1) yield row(i)).toArray - unpickle.dumps(fields) + val fieldNames: Seq[String] = (1 to row.length).map(_.toString + "KEY") //TODO: Temporary + val map: JMap[String, Any] = new java.util.HashMap + val arr: java.util.ArrayList[Any] = new java.util.ArrayList + row.zip(fieldNames).foreach { case (obj, name) => + map.put(name, obj) + } + arr.add(map) + pickle.dumps(arr) } } } From 337ed16ea5d30fc9e51415607cde2f24219c5624 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 22:17:48 -0700 Subject: [PATCH 12/18] output dictionaries correctly --- sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index 886869c22d55d..5dec32ef418df 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -313,11 +313,10 @@ class SchemaRDD( def analyze = sqlContext.analyzer(logicalPlan) def javaToPython: JavaRDD[Array[Byte]] = { - //val fieldNames: Seq[String] = logicalPlan.references.map(_.name) 
+ val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name) this.mapPartitions { iter => val pickle = new Pickler iter.map { row => - val fieldNames: Seq[String] = (1 to row.length).map(_.toString + "KEY") //TODO: Temporary val map: JMap[String, Any] = new java.util.HashMap val arr: java.util.ArrayList[Any] = new java.util.ArrayList row.zip(fieldNames).foreach { case (obj, name) => From ed9e3b447f0114e8bbe02166fb61d7965a4eb641 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 22:25:26 -0700 Subject: [PATCH 13/18] return row objects --- python/pyspark/rdd.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 11e840623bc30..885e12a3e9975 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1387,6 +1387,11 @@ def _jrdd(self): def _is_pipelinable(self): return not (self.is_cached or self.is_checkpointed) +class Row: + + def __init__(self, d): + self.__dict__ = dict(self.__dict__.items() + d.items()) + class SchemaRDD: def __init__(self, jschema_rdd, sql_ctx): @@ -1400,7 +1405,7 @@ def registerAsTable(self, name): def toPython(self): jrdd = self._jschema_rdd.javaToPython() #jrdd = self._sc._javaToPython(self._jschema_rdd) - return RDD(jrdd, self._sc, self._sc.serializer) + return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d)) def _test(): import doctest From 2d44498d9932821437fc3c0794eafe357591d86d Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 22:42:19 -0700 Subject: [PATCH 14/18] awesome row objects --- python/pyspark/rdd.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 885e12a3e9975..e7e807d70dae9 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1387,10 +1387,12 @@ def _jrdd(self): def _is_pipelinable(self): return not (self.is_cached or self.is_checkpointed) -class Row: +class Row(dict): def __init__(self, d): - self.__dict__ = dict(self.__dict__.items() + d.items()) + d.update(self.__dict__) + self.__dict__ = d + dict.__init__(self, d) class SchemaRDD: From 1f6e3436291572bbcda267179b320daa939e7b8e Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 23:13:01 -0700 Subject: [PATCH 15/18] SchemaRDD now has all RDD operations --- python/pyspark/rdd.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index e7e807d70dae9..e0a70ae557bae 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1401,14 +1401,26 @@ def __init__(self, jschema_rdd, sql_ctx): self._sc = sql_ctx._sc self._jschema_rdd = jschema_rdd + self._jrdd = self.toPython()._jrdd + self.is_cached = False + self.is_checkpointed = False + self.ctx = self.sql_ctx._sc + self._jrdd_deserializer = self.ctx.serializer + # TODO: Figure out how to make this lazy + #self._id = self._jrdd.id() + def registerAsTable(self, name): self._jschema_rdd.registerAsTable(name) def toPython(self): jrdd = self._jschema_rdd.javaToPython() - #jrdd = self._sc._javaToPython(self._jschema_rdd) return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d)) +customRDDDict = dict(RDD.__dict__) +del customRDDDict["__init__"] + +SchemaRDD.__dict__.update(customRDDDict) + def _test(): import doctest from pyspark.context import SparkContext From ef91795554afd59c5fefa61721df62354094b92d Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 23:19:05 -0700 Subject: [PATCH 16/18] made jrdd explicitly lazy --- 
python/pyspark/rdd.py | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index e0a70ae557bae..e2a1f7735e19f 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1394,20 +1394,25 @@ def __init__(self, d): self.__dict__ = d dict.__init__(self, d) -class SchemaRDD: +class SchemaRDD(RDD): def __init__(self, jschema_rdd, sql_ctx): self.sql_ctx = sql_ctx self._sc = sql_ctx._sc self._jschema_rdd = jschema_rdd - self._jrdd = self.toPython()._jrdd self.is_cached = False self.is_checkpointed = False self.ctx = self.sql_ctx._sc self._jrdd_deserializer = self.ctx.serializer - # TODO: Figure out how to make this lazy - #self._id = self._jrdd.id() + + @property + def _jrdd(self): + return self.toPython()._jrdd + + @property + def _id(self): + return self._jrdd.id() def registerAsTable(self, name): self._jschema_rdd.registerAsTable(name) @@ -1416,11 +1421,6 @@ def toPython(self): jrdd = self._jschema_rdd.javaToPython() return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d)) -customRDDDict = dict(RDD.__dict__) -del customRDDDict["__init__"] - -SchemaRDD.__dict__.update(customRDDDict) - def _test(): import doctest from pyspark.context import SparkContext From ec5b6e63782f3d181546bcd00bfb05e039a52b1d Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 23:33:52 -0700 Subject: [PATCH 17/18] for now only allow dictionaries as input --- python/pyspark/context.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index b8ac6db974573..f30ebb9c8e7d9 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -475,9 +475,10 @@ def sql(self, sqlQuery): return SchemaRDD(self._ssql_ctx.sql(sqlQuery), self) def applySchema(self, rdd): - first = rdd.first() if (rdd.__class__ is SchemaRDD): raise Exception("Cannot apply schema to %s" % SchemaRDD.__name__) + elif type(rdd.first()) is not dict: + raise Exception("Only RDDs with dictionaries can be converted to %s" % SchemaRDD.__name__) jrdd = self._sc._pythonToJavaMap(rdd._jrdd) srdd = self._ssql_ctx.applySchema(jrdd.rdd()) From 6c690e590e214c6f4e4e7f28eaedb30874df5ec6 Mon Sep 17 00:00:00 2001 From: Ahir Reddy Date: Mon, 7 Apr 2014 23:36:53 -0700 Subject: [PATCH 18/18] added todo explaining cost of creating Row object in python --- python/pyspark/rdd.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index e2a1f7735e19f..e11300212c4b1 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -1419,6 +1419,9 @@ def registerAsTable(self, name): def toPython(self): jrdd = self._jschema_rdd.javaToPython() + # TODO: This is inefficient, we should construct the Python Row object + # in Java land in the javaToPython function. May require a custom + # pickle serializer in Pyrolite return RDD(jrdd, self._sc, self._sc.serializer).map(lambda d: Row(d)) def _test():
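
Taken together, the series wires a Python-facing SQL API into PySpark: a SQLContext wrapper in context.py, a SchemaRDD/Row pair in rdd.py, and Pyrolite-based converters in PythonRDD.scala and SchemaRDD.scala. The sketch below is only an illustration of how those pieces fit, assuming a Spark build with this series applied; the app name, table name, and field names are made up, and at this point in the series only string and integer values are mapped to Catalyst types, so the example sticks to those.

    from pyspark import SparkContext
    from pyspark.context import SQLContext

    sc = SparkContext("local", "PySparkSQLSketch")
    sqlCtx = SQLContext(sc)

    # applySchema currently accepts only an RDD of dictionaries (patch 17);
    # column names are taken from the first row's keys.
    people = sc.parallelize([{"name": "alice", "age": 1},
                             {"name": "bob",   "age": 2}])
    srdd = sqlCtx.applySchema(people)
    srdd.registerAsTable("people")

    # sql() returns another SchemaRDD; toPython() pickles each JVM row back
    # into a dict wrapped in Row, so both lookups below see the same value.
    adults = sqlCtx.sql("SELECT name, age FROM people WHERE age > 1")
    for row in adults.toPython().collect():
        print row["name"], row.age

Keying the returned dictionaries by column name is what lets Row (a dict whose __dict__ is the same mapping) offer both item and attribute access without any extra schema plumbing on the Python side.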
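
On the serialization side, the patches lean on Pyrolite for both directions: pythonToJava / pythonToJavaMap unpickle the byte arrays PySpark ships to the JVM, and SchemaRDD.javaToPython pickles query results back. The flatMap flagged with a TODO in the JVM-side helpers exists because PySpark's batched serializer pickles a list of rows per payload rather than one row at a time, so each unpickled payload arrives as a java.util.ArrayList. A minimal local sketch of that payload shape, using only the standard pickle module:

    import pickle

    # One payload from the batched serializer: a single pickled *list* holding
    # several rows.  Pyrolite's Unpickler yields a java.util.ArrayList for it,
    # which is why the Scala side flatMaps each payload back into single rows.
    batch = [{"name": "alice", "age": 1},
             {"name": "bob",   "age": 2}]
    payload = pickle.dumps(batch, 2)   # protocol 2, which Pyrolite can read

    # Round-tripping locally mirrors what javaToPython hands back to Python:
    # a pickled list of dicts, one dict per result row.
    for row in pickle.loads(payload):
        print row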