From 74ffefb453dd14f49d88c7a7b8a406b82f325c56 Mon Sep 17 00:00:00 2001 From: witgo Date: Mon, 5 May 2014 00:05:44 +0800 Subject: [PATCH] SPARK-1699: Python relative independence from the core, becomes subprojects --- assembly/pom.xml | 20 ++- core/pom.xml | 47 ------- .../scala/org/apache/spark/SparkEnv.scala | 22 +--- make-distribution.sh | 40 ++++-- .../MatrixFactorizationModel.scala | 17 --- pom.xml | 8 +- python-api/pom.xml | 116 ++++++++++++++++++ .../org/apache/spark/PythonSparkEnv.scala | 80 ++++++++++++ .../spark/api/python/PythonPartitioner.scala | 0 .../apache/spark/api/python/PythonRDD.scala | 4 +- .../api/python/PythonWorkerFactory.scala | 6 +- .../mllib/api/python/PythonMLLibAPI.scala | 17 +++ .../spark/sql/api/python/PythonSQLAPI.scala | 57 +++++++++ .../spark/api/python/PythonRDDSuite.scala | 0 python/pyspark/mllib/recommendation.py | 3 +- python/pyspark/sql.py | 3 +- .../org/apache/spark/sql/SchemaRDD.scala | 22 ---- 17 files changed, 333 insertions(+), 129 deletions(-) create mode 100644 python-api/pom.xml create mode 100644 python-api/src/main/scala/org/apache/spark/PythonSparkEnv.scala rename {core => python-api}/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala (100%) rename {core => python-api}/src/main/scala/org/apache/spark/api/python/PythonRDD.scala (99%) rename {core => python-api}/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala (99%) rename {mllib => python-api}/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala (95%) create mode 100644 python-api/src/main/scala/org/apache/spark/sql/api/python/PythonSQLAPI.scala rename {core => python-api}/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala (100%) diff --git a/assembly/pom.xml b/assembly/pom.xml index bdb38806492a6..057324e889076 100644 --- a/assembly/pom.xml +++ b/assembly/pom.xml @@ -84,11 +84,6 @@ spark-sql_${scala.binary.version} ${project.version} - - net.sf.py4j - py4j - 0.8.1 - @@ -173,6 +168,21 @@ + + python + + + net.sf.py4j + py4j + 0.8.1 + + + org.apache.spark + python-api_${scala.binary.version} + ${project.version} + + + spark-ganglia-lgpl diff --git a/core/pom.xml b/core/pom.xml index 822b5b1dd7cc2..ff8f3e7631463 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -244,11 +244,6 @@ junit-interface test - - org.spark-project - pyrolite - 2.0.1 - target/scala-${scala.binary.version}/classes @@ -294,48 +289,6 @@ - - - org.codehaus.mojo - exec-maven-plugin - 1.2.1 - - - generate-resources - - exec - - - - - unzip - ../python - - -o - lib/py4j*.zip - -d - build - - - - - - - src/main/resources - - - ../python - - pyspark/*.py - - - - ../python/build - - py4j/*.py - - - diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index bea435ec34ce9..454a6e744e4db 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -26,7 +26,6 @@ import akka.actor._ import com.google.common.collect.MapMaker import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.api.python.PythonWorkerFactory import org.apache.spark.broadcast.BroadcastManager import org.apache.spark.metrics.MetricsSystem import org.apache.spark.network.ConnectionManager @@ -67,15 +66,14 @@ class SparkEnv ( // A mapping of thread ID to amount of memory used for shuffle in bytes // All accesses should be manually synchronized val shuffleMemoryMap = mutable.HashMap[Long, Long]() - - private val pythonWorkers = mutable.HashMap[(String, Map[String, 
String]), PythonWorkerFactory]() + val closeables = mutable.ListBuffer[java.io.Closeable]() // A general, soft-reference map for metadata needed during HadoopRDD split computation // (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats). private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]() private[spark] def stop() { - pythonWorkers.foreach { case(key, worker) => worker.stop() } + closeables.toList.foreach(_.close()) httpFileServer.stop() mapOutputTracker.stop() shuffleFetcher.stop() @@ -89,22 +87,6 @@ class SparkEnv ( // UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it. // actorSystem.awaitTermination() } - - private[spark] - def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = { - synchronized { - val key = (pythonExec, envVars) - pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create() - } - } - - private[spark] - def destroyPythonWorker(pythonExec: String, envVars: Map[String, String]) { - synchronized { - val key = (pythonExec, envVars) - pythonWorkers(key).stop() - } - } } object SparkEnv extends Logging { diff --git a/make-distribution.sh b/make-distribution.sh index c05dcd89d90a7..968e279c3c5d5 100755 --- a/make-distribution.sh +++ b/make-distribution.sh @@ -55,6 +55,7 @@ SPARK_HADOOP_VERSION=1.0.4 SPARK_YARN=false SPARK_HIVE=false SPARK_TACHYON=false +SPARK_PYTHON=true MAKE_TGZ=false NAME=none @@ -105,6 +106,12 @@ else echo "YARN disabled" fi +if [ "$SPARK_PYTHON" == "true" ]; then + echo "Python enabled" +else + echo "Python disabled" +fi + if [ "$SPARK_TACHYON" == "true" ]; then echo "Tachyon Enabled" else @@ -122,22 +129,31 @@ else MAYBE_HIVE="" fi +if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." ]]; then + MAYBE_HADOOP023="-Phadoop-0.23" +else + MAYBE_HADOOP023="" +fi + if [ "$SPARK_YARN" == "true" ]; then - if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." ]]; then - mvn clean package -DskipTests -Pyarn-alpha -Dhadoop.version=$SPARK_HADOOP_VERSION \ - -Dyarn.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE -Phadoop-0.23 + if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." || "$SPARK_HADOOP_VERSION" =~ "2.0." ]]; then + MAYBE_YARN="-Pyarn-alpha -Dyarn.version=$SPARK_HADOOP_VERSION" else - mvn clean package -DskipTests -Pyarn -Dhadoop.version=$SPARK_HADOOP_VERSION \ - -Dyarn.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE + MAYBE_YARN="-Pyarn -Dyarn.version=$SPARK_HADOOP_VERSION" fi else - if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." 
]]; then - mvn clean package -Phadoop-0.23 -DskipTests -Dhadoop.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE - else - mvn clean package -DskipTests -Dhadoop.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE - fi + MAYBE_YARN="" +fi + +if [ "$SPARK_PYTHON" == "true" ]; then + MAYBE_PYTHON="-Ppython" +else + MAYBE_PYTHON="" fi +mvn clean package -Dhadoop.version=$SPARK_HADOOP_VERSION \ +-DskipTests $MAYBE_HIVE $MAYBE_HADOOP023 $MAYBE_YARN $MAYBE_PYTHON + # Make directories rm -rf "$DISTDIR" mkdir -p "$DISTDIR/lib" @@ -152,9 +168,11 @@ mkdir "$DISTDIR"/conf cp "$FWDIR"/conf/*.template "$DISTDIR"/conf cp "$FWDIR"/conf/slaves "$DISTDIR"/conf cp -r "$FWDIR/bin" "$DISTDIR" -cp -r "$FWDIR/python" "$DISTDIR" cp -r "$FWDIR/sbin" "$DISTDIR" +if [ "$SPARK_PYTHON" == "true" ]; then + cp -r "$FWDIR/python" "$DISTDIR" +fi # Download and copy in tachyon, if requested if [ "$SPARK_TACHYON" == "true" ]; then diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala index 471546cd82c7d..4551c8054078d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala @@ -19,11 +19,8 @@ package org.apache.spark.mllib.recommendation import org.jblas._ -import org.apache.spark.annotation.DeveloperApi -import org.apache.spark.api.java.JavaRDD import org.apache.spark.rdd.RDD import org.apache.spark.SparkContext._ -import org.apache.spark.mllib.api.python.PythonMLLibAPI /** @@ -68,20 +65,6 @@ class MatrixFactorizationModel( } } - /** - * :: DeveloperApi :: - * Predict the rating of many users for many products. - * This is a Java stub for python predictAll() - * - * @param usersProductsJRDD A JavaRDD with serialized tuples (user, product) - * @return JavaRDD of serialized Rating objects. - */ - def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = { - val pythonAPI = new PythonMLLibAPI() - val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes)) - predict(usersProducts).map(rate => pythonAPI.serializeRating(rate)) - } - // TODO: Figure out what other good bulk prediction methods would look like. // Probably want a way to get the top users for a product or vice-versa. 
} diff --git a/pom.xml b/pom.xml index ebd359a9de17f..98235db013b54 100644 --- a/pom.xml +++ b/pom.xml @@ -94,7 +94,6 @@ streaming sql/catalyst sql/core - sql/hive repl assembly external/twitter @@ -103,6 +102,8 @@ external/zeromq external/mqtt examples + sql/hive + python-api @@ -739,6 +740,10 @@ ${project.build.directory}/SparkTestSuite.txt -Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m + + ${session.executionRootDirectory} + 1 + @@ -952,6 +957,5 @@ - diff --git a/python-api/pom.xml b/python-api/pom.xml new file mode 100644 index 0000000000000..420aa62effd18 --- /dev/null +++ b/python-api/pom.xml @@ -0,0 +1,116 @@ + + + + + 4.0.0 + + org.apache.spark + spark-parent + 1.0.0-SNAPSHOT + ../pom.xml + + + org.apache.spark + python-api_2.10 + jar + Spark Project Python API + http://spark.apache.org/ + + + + org.apache.spark + spark-core_${scala.binary.version} + ${project.version} + + + org.apache.spark + spark-mllib_${scala.binary.version} + ${project.version} + provided + + + org.apache.spark + spark-sql_${scala.binary.version} + ${project.version} + + + org.spark-project + pyrolite + 2.0.1 + + + org.scalatest + scalatest_${scala.binary.version} + test + + + org.scalacheck + scalacheck_${scala.binary.version} + test + + + + + target/scala-${scala.binary.version}/classes + target/scala-${scala.binary.version}/test-classes + + + + org.codehaus.mojo + exec-maven-plugin + 1.2.1 + + + generate-resources + + exec + + + + + unzip + ../python + + -o + lib/py4j*.zip + -d + build + + + + + + + src/main/resources + + + ../python + + pyspark/*.py + + + + ../python/build + + py4j/*.py + + + + + diff --git a/python-api/src/main/scala/org/apache/spark/PythonSparkEnv.scala b/python-api/src/main/scala/org/apache/spark/PythonSparkEnv.scala new file mode 100644 index 0000000000000..40bc23c6df7fb --- /dev/null +++ b/python-api/src/main/scala/org/apache/spark/PythonSparkEnv.scala @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import scala.collection.JavaConversions._ +import scala.collection.mutable +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.api.python.PythonWorkerFactory + +/** + * :: DeveloperApi :: + * Holds all the runtime environment objects for a running Spark instance (either master or worker), + * including the serializer, Akka actor system, block manager, map output tracker, etc. Currently + * Spark code finds the SparkEnv through a thread-local variable, so each thread that accesses these + * objects needs to have the right SparkEnv set. You can get the current environment with + * SparkEnv.get (e.g. after creating a SparkContext) and set it with SparkEnv.set. + * + * NOTE: This is not intended for external use. 
This is exposed for Shark and may be made private + * in a future release. + */ +@DeveloperApi +class PythonSparkEnv(val sparkEnv: SparkEnv) { + private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]() + + sparkEnv.closeables += new java.io.Closeable { + override def close() { + pythonWorkers.foreach { + case (key, worker) => worker.stop() + } + } + } + + private[spark] + def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = { + synchronized { + val key = (pythonExec, envVars) + pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create() + } + } + + private[spark] + def destroyPythonWorker(pythonExec: String, envVars: Map[String, String]) { + synchronized { + val key = (pythonExec, envVars) + pythonWorkers(key).stop() + } + } + +} + +object PythonSparkEnv extends Logging { + private val env = new ThreadLocal[PythonSparkEnv] + + def get: PythonSparkEnv = { + if (env.get == null) { + env.set(new PythonSparkEnv(SparkEnv.get)) + } + env.get + } + + def set(e: PythonSparkEnv) { + env.set(e) + } + +} \ No newline at end of file diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala b/python-api/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala similarity index 100% rename from core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala rename to python-api/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/python-api/src/main/scala/org/apache/spark/api/python/PythonRDD.scala similarity index 99% rename from core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala rename to python-api/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 61407007087c6..4d0da61b72367 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/python-api/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -53,7 +53,7 @@ private[spark] class PythonRDD[T: ClassTag]( override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = { val startTime = System.currentTimeMillis - val env = SparkEnv.get + val env = PythonSparkEnv.get val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap) // Ensure worker socket is closed on task completion. Closing sockets is idempotent. 
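
The core-side change in SparkEnv.scala above replaces the hard-coded pythonWorkers map with a generic closeables buffer that SparkEnv.stop() drains, so the Python subproject can hook its own cleanup into the environment's lifecycle without core depending on PythonWorkerFactory. A minimal sketch of that registration pattern under the new API, assuming an already-running SparkEnv; the workerPool value is an illustrative stand-in for a subproject-owned resource:

import java.io.Closeable
import org.apache.spark.SparkEnv

// Sketch only: tie a subproject-owned resource to SparkEnv's lifecycle by
// appending a Closeable to the new `closeables` buffer. SparkEnv.stop()
// closes every registered entry, which is how PythonSparkEnv above shuts
// down its PythonWorkerFactory instances.
object CloseableRegistrationSketch {
  def register(env: SparkEnv, workerPool: Closeable): Unit = {
    env.closeables += new Closeable {
      override def close(): Unit = workerPool.close() // invoked from SparkEnv.stop()
    }
  }
}
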
@@ -71,7 +71,7 @@ private[spark] class PythonRDD[T: ClassTag]( new Thread("stdin writer for " + pythonExec) { override def run() { try { - SparkEnv.set(env) + PythonSparkEnv.set(env) val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize) val dataOut = new DataOutputStream(stream) // Partition index diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/python-api/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala similarity index 99% rename from core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala rename to python-api/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 02799ce0091b0..8f8b103e42c6a 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/python-api/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -25,7 +25,7 @@ import scala.collection.JavaConversions._ import org.apache.spark._ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String]) - extends Logging { + extends Logging { // Because forking processes from Java is expensive, we prefer to launch a single Python daemon // (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently @@ -86,6 +86,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String // Redirect the worker's stderr to ours new Thread("stderr reader for " + pythonExec) { setDaemon(true) + override def run() { scala.util.control.Exception.ignoring(classOf[IOException]) { // FIXME: We copy the stream on the level of bytes to avoid encoding problems. @@ -103,6 +104,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String // Redirect worker's stdout to our stderr new Thread("stdout reader for " + pythonExec) { setDaemon(true) + override def run() { scala.util.control.Exception.ignoring(classOf[IOException]) { // FIXME: We copy the stream on the level of bytes to avoid encoding problems. @@ -159,6 +161,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String // Redirect the stderr to ours new Thread("stderr reader for " + pythonExec) { setDaemon(true) + override def run() { scala.util.control.Exception.ignoring(classOf[IOException]) { // FIXME: We copy the stream on the level of bytes to avoid encoding problems. @@ -179,6 +182,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String // Redirect further stdout output to our stderr new Thread("stdout reader for " + pythonExec) { setDaemon(true) + override def run() { scala.util.control.Exception.ignoring(classOf[IOException]) { // FIXME: We copy the stream on the level of bytes to avoid encoding problems. 
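
With the worker-factory logic out of core, PythonRDD now reaches its environment through the thread-local PythonSparkEnv.get, which lazily wraps the current SparkEnv and registers its cleanup Closeable on first use. A hedged sketch of the worker lifecycle as the task code drives it; the executable path and PYTHONPATH value are placeholders, and the object lives in org.apache.spark because createPythonWorker and destroyPythonWorker are private[spark]:

package org.apache.spark

// Sketch of the Python worker lifecycle after this change; not the actual
// PythonRDD code, just the call sequence it performs.
object PythonWorkerLifecycleSketch {
  def run(): Unit = {
    val env = PythonSparkEnv.get                             // lazily wraps SparkEnv.get for this thread
    val envVars = Map("PYTHONPATH" -> "/path/to/python/lib") // placeholder environment
    val worker = env.createPythonWorker("python", envVars)   // java.net.Socket to a daemon-forked worker
    try {
      // ... write the serialized partition to worker.getOutputStream and read results back ...
    } finally {
      worker.close() // PythonRDD does this via a task-completion callback
      // On unrecoverable errors PythonRDD tears the daemon down instead:
      // env.destroyPythonWorker("python", envVars)
    }
  }
}

One side effect worth flagging: because the wrapper is created per thread, tasks running on different executor threads appear to get separate PythonWorkerFactory maps, whereas SparkEnv previously held a single shared map, so Python worker reuse across tasks may be reduced.
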
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/python-api/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala similarity index 95% rename from mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala rename to python-api/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala index 7c65b0d4750fa..4ca5265a9e2fc 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala +++ b/python-api/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala @@ -405,4 +405,21 @@ class PythonMLLibAPI extends Serializable { val ratings = ratingsBytesJRDD.rdd.map(unpackRating) ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha) } + + /** + * :: DeveloperApi :: + * Predict the rating of many users for many products. + * This is a Java stub for python predictAll() + * + * @param model A ALS Model + * @param usersProductsJRDD A JavaRDD with serialized tuples (user, product) + * @return JavaRDD of serialized Rating objects. + */ + def alsModelPredictAll( + model: MatrixFactorizationModel,usersProductsJRDD: JavaRDD[Array[Byte]]): + JavaRDD[Array[Byte]] = { + val usersProducts = usersProductsJRDD.rdd.map(xBytes => unpackTuple(xBytes)) + model.predict(usersProducts).map(rate => serializeRating(rate)) + } + } diff --git a/python-api/src/main/scala/org/apache/spark/sql/api/python/PythonSQLAPI.scala b/python-api/src/main/scala/org/apache/spark/sql/api/python/PythonSQLAPI.scala new file mode 100644 index 0000000000000..796518a55ed76 --- /dev/null +++ b/python-api/src/main/scala/org/apache/spark/sql/api/python/PythonSQLAPI.scala @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.api.python + +import net.razorvine.pickle.Pickler +import org.apache.spark.annotation.DeveloperApi +import org.apache.spark.sql.SchemaRDD +import org.apache.spark.api.java.JavaRDD +import java.util.{Map => JMap} + + +/** + * :: DeveloperApi :: + * The Java stubs necessary for the Python mllib bindings. + * + * See python/pyspark/mllib/_common.py for the mutually agreed upon data format. + */ +@DeveloperApi +class PythonSQLAPI extends Serializable { + + def javaToPython(rdd: SchemaRDD): JavaRDD[Array[Byte]] = { + val fieldNames: Seq[String] = rdd.queryExecution.analyzed.output.map(_.name) + rdd.mapPartitions { + iter => + val pickle = new Pickler + iter.map { + row => + val map: JMap[String, Any] = new java.util.HashMap + // TODO: We place the map in an ArrayList so that the object is pickled to a List[Dict]. + // Ideally we should be able to pickle an object directly into a Python collection so we + // don't have to create an ArrayList every time. 
+ val arr: java.util.ArrayList[Any] = new java.util.ArrayList + row.zip(fieldNames).foreach { + case (obj, name) => + map.put(name, obj) + } + arr.add(map) + pickle.dumps(arr) + } + } + } +} diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/python-api/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala similarity index 100% rename from core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala rename to python-api/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py index f4a83f0209e27..80f3358543baf 100644 --- a/python/pyspark/mllib/recommendation.py +++ b/python/pyspark/mllib/recommendation.py @@ -52,7 +52,8 @@ def predict(self, user, product): def predictAll(self, usersProducts): usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple) - return RDD(self._java_model.predict(usersProductsJRDD._jrdd), + mllibapi=usersProducts.context._jvm.PythonMLLibAPI() + return RDD(mllibapi.alsModelPredictAll(self._java_model, usersProductsJRDD._jrdd), self._context, RatingDeserializer()) class ALS(object): diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py index 1a62031db5c41..50c76433faa75 100644 --- a/python/pyspark/sql.py +++ b/python/pyspark/sql.py @@ -322,7 +322,8 @@ def _toPython(self): # We have to import the Row class explicitly, so that the reference Pickler has is # pyspark.sql.Row instead of __main__.Row from pyspark.sql import Row - jrdd = self._jschema_rdd.javaToPython() + sqlapi= usersProducts.context._jvm.PythonSQLAPI() + jrdd = sqlapi.javaToPython(self._jschema_rdd) # TODO: This is inefficient, we should construct the Python Row object # in Java land in the javaToPython function. May require a custom # pickle serializer in Pyrolite diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala index d7782d6b32819..b13f134785bf7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala @@ -17,8 +17,6 @@ package org.apache.spark.sql -import net.razorvine.pickle.Pickler - import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext} import org.apache.spark.annotation.{AlphaComponent, Experimental} import org.apache.spark.rdd.RDD @@ -27,8 +25,6 @@ import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.logical._ import org.apache.spark.sql.catalyst.plans.{Inner, JoinType} import org.apache.spark.sql.catalyst.types.BooleanType -import org.apache.spark.api.java.JavaRDD -import java.util.{Map => JMap} /** * :: AlphaComponent :: @@ -296,22 +292,4 @@ class SchemaRDD( */ def toSchemaRDD = this - private[sql] def javaToPython: JavaRDD[Array[Byte]] = { - val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name) - this.mapPartitions { iter => - val pickle = new Pickler - iter.map { row => - val map: JMap[String, Any] = new java.util.HashMap - // TODO: We place the map in an ArrayList so that the object is pickled to a List[Dict]. - // Ideally we should be able to pickle an object directly into a Python collection so we - // don't have to create an ArrayList every time. - val arr: java.util.ArrayList[Any] = new java.util.ArrayList - row.zip(fieldNames).foreach { case (obj, name) => - map.put(name, obj) - } - arr.add(map) - pickle.dumps(arr) - } - } - } }
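
Taken together, the Python-facing stubs removed from MatrixFactorizationModel and SchemaRDD are now reachable only through the new helper classes in python-api, which pyspark instantiates over the py4j gateway. A hedged JVM-side sketch of the two relocated call paths; the model, usersProductsJRDD, and schemaRDD values are assumed to be supplied by the caller, and on the Python side the gateway handle in sql.py's _toPython would come from the SchemaRDD's own context (the usersProducts reference there looks copied from recommendation.py):

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.api.python.PythonMLLibAPI
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.sql.SchemaRDD
import org.apache.spark.sql.api.python.PythonSQLAPI

// Sketch of the two entry points that replace the deleted stubs; pyspark
// normally reaches these through py4j rather than from Scala code.
object PythonApiEntryPointsSketch {
  def predictAll(model: MatrixFactorizationModel,
                 usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] =
    new PythonMLLibAPI().alsModelPredictAll(model, usersProductsJRDD) // replaces MatrixFactorizationModel.predict(JavaRDD)

  def toPython(schemaRDD: SchemaRDD): JavaRDD[Array[Byte]] =
    new PythonSQLAPI().javaToPython(schemaRDD) // replaces SchemaRDD.javaToPython; pickles each Row into a one-element list of dict
}
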