diff --git a/assembly/pom.xml b/assembly/pom.xml
index bdb38806492a6..057324e889076 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -84,11 +84,6 @@
      <artifactId>spark-sql_${scala.binary.version}</artifactId>
      <version>${project.version}</version>
    </dependency>
-    <dependency>
-      <groupId>net.sf.py4j</groupId>
-      <artifactId>py4j</artifactId>
-      <version>0.8.1</version>
-    </dependency>
@@ -173,6 +168,21 @@
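+    <!-- Bundle the Python API module and py4j into the assembly only when the new, optional
+         python profile is activated (-Ppython), as make-distribution.sh does when SPARK_PYTHON=true. -->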
+    <profile>
+      <id>python</id>
+      <dependencies>
+        <dependency>
+          <groupId>net.sf.py4j</groupId>
+          <artifactId>py4j</artifactId>
+          <version>0.8.1</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.spark</groupId>
+          <artifactId>python-api_${scala.binary.version}</artifactId>
+          <version>${project.version}</version>
+        </dependency>
+      </dependencies>
+    </profile>
+
    <profile>
      <id>spark-ganglia-lgpl</id>
diff --git a/core/pom.xml b/core/pom.xml
index 822b5b1dd7cc2..ff8f3e7631463 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -244,11 +244,6 @@
      <artifactId>junit-interface</artifactId>
      <scope>test</scope>
    </dependency>
-    <dependency>
-      <groupId>org.spark-project</groupId>
-      <artifactId>pyrolite</artifactId>
-      <version>2.0.1</version>
-    </dependency>
  </dependencies>
  <build>
    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
@@ -294,48 +289,6 @@
-      <plugin>
-        <groupId>org.codehaus.mojo</groupId>
-        <artifactId>exec-maven-plugin</artifactId>
-        <version>1.2.1</version>
-        <executions>
-          <execution>
-            <phase>generate-resources</phase>
-            <goals>
-              <goal>exec</goal>
-            </goals>
-          </execution>
-        </executions>
-        <configuration>
-          <executable>unzip</executable>
-          <workingDirectory>../python</workingDirectory>
-          <arguments>
-            <argument>-o</argument>
-            <argument>lib/py4j*.zip</argument>
-            <argument>-d</argument>
-            <argument>build</argument>
-          </arguments>
-        </configuration>
-      </plugin>
-
-    <resources>
-      <resource>
-        <directory>src/main/resources</directory>
-      </resource>
-      <resource>
-        <directory>../python</directory>
-        <includes>
-          <include>pyspark/*.py</include>
-        </includes>
-      </resource>
-      <resource>
-        <directory>../python/build</directory>
-        <includes>
-          <include>py4j/*.py</include>
-        </includes>
-      </resource>
-    </resources>
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index bea435ec34ce9..454a6e744e4db 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -26,7 +26,6 @@ import akka.actor._
import com.google.common.collect.MapMaker
import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.api.python.PythonWorkerFactory
import org.apache.spark.broadcast.BroadcastManager
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.network.ConnectionManager
@@ -67,15 +66,14 @@ class SparkEnv (
// A mapping of thread ID to amount of memory used for shuffle in bytes
// All accesses should be manually synchronized
val shuffleMemoryMap = mutable.HashMap[Long, Long]()
-
- private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
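+  // Resources registered by pluggable subsystems (such as the Python API module) that must be
+  // closed when this SparkEnv is stopped.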
+ val closeables = mutable.ListBuffer[java.io.Closeable]()
// A general, soft-reference map for metadata needed during HadoopRDD split computation
// (e.g., HadoopFileRDD uses this to cache JobConfs and InputFormats).
private[spark] val hadoopJobMetadata = new MapMaker().softValues().makeMap[String, Any]()
private[spark] def stop() {
- pythonWorkers.foreach { case(key, worker) => worker.stop() }
+ closeables.toList.foreach(_.close())
httpFileServer.stop()
mapOutputTracker.stop()
shuffleFetcher.stop()
@@ -89,22 +87,6 @@ class SparkEnv (
// UPDATE: In Akka 2.1.x, this hangs if there are remote actors, so we can't call it.
// actorSystem.awaitTermination()
}
-
- private[spark]
- def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
- synchronized {
- val key = (pythonExec, envVars)
- pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
- }
- }
-
- private[spark]
- def destroyPythonWorker(pythonExec: String, envVars: Map[String, String]) {
- synchronized {
- val key = (pythonExec, envVars)
- pythonWorkers(key).stop()
- }
- }
}
object SparkEnv extends Logging {
diff --git a/make-distribution.sh b/make-distribution.sh
index c05dcd89d90a7..968e279c3c5d5 100755
--- a/make-distribution.sh
+++ b/make-distribution.sh
@@ -55,6 +55,7 @@ SPARK_HADOOP_VERSION=1.0.4
SPARK_YARN=false
SPARK_HIVE=false
SPARK_TACHYON=false
+SPARK_PYTHON=true
MAKE_TGZ=false
NAME=none
@@ -105,6 +106,12 @@ else
echo "YARN disabled"
fi
+if [ "$SPARK_PYTHON" == "true" ]; then
+ echo "Python enabled"
+else
+ echo "Python disabled"
+fi
+
if [ "$SPARK_TACHYON" == "true" ]; then
echo "Tachyon Enabled"
else
@@ -122,22 +129,31 @@ else
MAYBE_HIVE=""
fi
+if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." ]]; then
+ MAYBE_HADOOP023="-Phadoop-0.23"
+else
+ MAYBE_HADOOP023=""
+fi
+
if [ "$SPARK_YARN" == "true" ]; then
- if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." ]]; then
- mvn clean package -DskipTests -Pyarn-alpha -Dhadoop.version=$SPARK_HADOOP_VERSION \
- -Dyarn.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE -Phadoop-0.23
+ if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." || "$SPARK_HADOOP_VERSION" =~ "2.0." ]]; then
+ MAYBE_YARN="-Pyarn-alpha -Dyarn.version=$SPARK_HADOOP_VERSION"
else
- mvn clean package -DskipTests -Pyarn -Dhadoop.version=$SPARK_HADOOP_VERSION \
- -Dyarn.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE
+ MAYBE_YARN="-Pyarn -Dyarn.version=$SPARK_HADOOP_VERSION"
fi
else
- if [[ "$SPARK_HADOOP_VERSION" =~ "0.23." ]]; then
- mvn clean package -Phadoop-0.23 -DskipTests -Dhadoop.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE
- else
- mvn clean package -DskipTests -Dhadoop.version=$SPARK_HADOOP_VERSION $MAYBE_HIVE
- fi
+ MAYBE_YARN=""
+fi
+
+if [ "$SPARK_PYTHON" == "true" ]; then
+ MAYBE_PYTHON="-Ppython"
+else
+ MAYBE_PYTHON=""
fi
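+
+# Run a single Maven build, combining the optional profiles and flags selected above.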
+mvn clean package -Dhadoop.version=$SPARK_HADOOP_VERSION \
+  -DskipTests $MAYBE_HIVE $MAYBE_HADOOP023 $MAYBE_YARN $MAYBE_PYTHON
+
# Make directories
rm -rf "$DISTDIR"
mkdir -p "$DISTDIR/lib"
@@ -152,9 +168,11 @@ mkdir "$DISTDIR"/conf
cp "$FWDIR"/conf/*.template "$DISTDIR"/conf
cp "$FWDIR"/conf/slaves "$DISTDIR"/conf
cp -r "$FWDIR/bin" "$DISTDIR"
-cp -r "$FWDIR/python" "$DISTDIR"
cp -r "$FWDIR/sbin" "$DISTDIR"
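+
+# Only include the python/ directory in the distribution when the Python API is built.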
+if [ "$SPARK_PYTHON" == "true" ]; then
+ cp -r "$FWDIR/python" "$DISTDIR"
+fi
# Download and copy in tachyon, if requested
if [ "$SPARK_TACHYON" == "true" ]; then
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 471546cd82c7d..4551c8054078d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -19,11 +19,8 @@ package org.apache.spark.mllib.recommendation
import org.jblas._
-import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.api.java.JavaRDD
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext._
-import org.apache.spark.mllib.api.python.PythonMLLibAPI
/**
@@ -68,20 +65,6 @@ class MatrixFactorizationModel(
}
}
- /**
- * :: DeveloperApi ::
- * Predict the rating of many users for many products.
- * This is a Java stub for python predictAll()
- *
- * @param usersProductsJRDD A JavaRDD with serialized tuples (user, product)
- * @return JavaRDD of serialized Rating objects.
- */
- def predict(usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
- val pythonAPI = new PythonMLLibAPI()
- val usersProducts = usersProductsJRDD.rdd.map(xBytes => pythonAPI.unpackTuple(xBytes))
- predict(usersProducts).map(rate => pythonAPI.serializeRating(rate))
- }
-
// TODO: Figure out what other good bulk prediction methods would look like.
// Probably want a way to get the top users for a product or vice-versa.
}
diff --git a/pom.xml b/pom.xml
index ebd359a9de17f..98235db013b54 100644
--- a/pom.xml
+++ b/pom.xml
@@ -94,7 +94,6 @@
    <module>streaming</module>
    <module>sql/catalyst</module>
    <module>sql/core</module>
-    <module>sql/hive</module>
    <module>repl</module>
    <module>assembly</module>
    <module>external/twitter</module>
@@ -103,6 +102,8 @@
    <module>external/zeromq</module>
    <module>external/mqtt</module>
    <module>examples</module>
+    <module>sql/hive</module>
+    <module>python-api</module>
@@ -739,6 +740,10 @@
            <filereports>${project.build.directory}/SparkTestSuite.txt</filereports>
            <argLine>-Xmx3g -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
+            <systemProperties>
+              <spark.test.home>${session.executionRootDirectory}</spark.test.home>
+              <spark.testing>1</spark.testing>
+            </systemProperties>
@@ -952,6 +957,5 @@
-
diff --git a/python-api/pom.xml b/python-api/pom.xml
new file mode 100644
index 0000000000000..420aa62effd18
--- /dev/null
+++ b/python-api/pom.xml
@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.spark</groupId>
+    <artifactId>spark-parent</artifactId>
+    <version>1.0.0-SNAPSHOT</version>
+    <relativePath>../pom.xml</relativePath>
+  </parent>
+
+  <groupId>org.apache.spark</groupId>
+  <artifactId>python-api_2.10</artifactId>
+  <packaging>jar</packaging>
+  <name>Spark Project Python API</name>
+  <url>http://spark.apache.org/</url>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-mllib_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.spark-project</groupId>
+      <artifactId>pyrolite</artifactId>
+      <version>2.0.1</version>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalacheck</groupId>
+      <artifactId>scalacheck_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
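+  <!-- Bundle the pyspark sources and the py4j classes (unzipped below) into this module's jar,
+       mirroring what core/pom.xml did before the Python API was split out. -->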
+  <build>
+    <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>
+    <testOutputDirectory>target/scala-${scala.binary.version}/test-classes</testOutputDirectory>
+    <plugins>
+      <plugin>
+        <groupId>org.codehaus.mojo</groupId>
+        <artifactId>exec-maven-plugin</artifactId>
+        <version>1.2.1</version>
+        <executions>
+          <execution>
+            <phase>generate-resources</phase>
+            <goals>
+              <goal>exec</goal>
+            </goals>
+          </execution>
+        </executions>
+        <configuration>
+          <executable>unzip</executable>
+          <workingDirectory>../python</workingDirectory>
+          <arguments>
+            <argument>-o</argument>
+            <argument>lib/py4j*.zip</argument>
+            <argument>-d</argument>
+            <argument>build</argument>
+          </arguments>
+        </configuration>
+      </plugin>
+    </plugins>
+    <resources>
+      <resource>
+        <directory>src/main/resources</directory>
+      </resource>
+      <resource>
+        <directory>../python</directory>
+        <includes>
+          <include>pyspark/*.py</include>
+        </includes>
+      </resource>
+      <resource>
+        <directory>../python/build</directory>
+        <includes>
+          <include>py4j/*.py</include>
+        </includes>
+      </resource>
+    </resources>
+  </build>
+</project>
diff --git a/python-api/src/main/scala/org/apache/spark/PythonSparkEnv.scala b/python-api/src/main/scala/org/apache/spark/PythonSparkEnv.scala
new file mode 100644
index 0000000000000..40bc23c6df7fb
--- /dev/null
+++ b/python-api/src/main/scala/org/apache/spark/PythonSparkEnv.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark
+
+import scala.collection.JavaConversions._
+import scala.collection.mutable
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.python.PythonWorkerFactory
+
+/**
+ * :: DeveloperApi ::
+ * Holds the Python-specific runtime state for a running Spark instance, namely the
+ * PythonWorkerFactory instances used to launch and reuse Python worker processes. It wraps
+ * the thread-local SparkEnv, so each thread that accesses it needs to have the right SparkEnv
+ * set. You can get the current environment with PythonSparkEnv.get and set it with
+ * PythonSparkEnv.set.
+ *
+ * NOTE: This is not intended for external use.
+ */
+@DeveloperApi
+class PythonSparkEnv(val sparkEnv: SparkEnv) {
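+  // One PythonWorkerFactory per (python executable, environment) pair, so worker and daemon
+  // processes can be shared and reused across tasks.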
+ private val pythonWorkers = mutable.HashMap[(String, Map[String, String]), PythonWorkerFactory]()
+
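+  // Ensure all Python workers are stopped when the underlying SparkEnv shuts down.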
+ sparkEnv.closeables += new java.io.Closeable {
+ override def close() {
+ pythonWorkers.foreach {
+ case (key, worker) => worker.stop()
+ }
+ }
+ }
+
+ private[spark]
+ def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
+ synchronized {
+ val key = (pythonExec, envVars)
+ pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
+ }
+ }
+
+ private[spark]
+ def destroyPythonWorker(pythonExec: String, envVars: Map[String, String]) {
+ synchronized {
+ val key = (pythonExec, envVars)
+ pythonWorkers(key).stop()
+ }
+ }
+
+}
+
+object PythonSparkEnv extends Logging {
+ private val env = new ThreadLocal[PythonSparkEnv]
+
+ def get: PythonSparkEnv = {
+ if (env.get == null) {
+ env.set(new PythonSparkEnv(SparkEnv.get))
+ }
+ env.get
+ }
+
+ def set(e: PythonSparkEnv) {
+ env.set(e)
+ }
+
+}
\ No newline at end of file
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala b/python-api/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
similarity index 100%
rename from core/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
rename to python-api/src/main/scala/org/apache/spark/api/python/PythonPartitioner.scala
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/python-api/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
similarity index 99%
rename from core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
rename to python-api/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 61407007087c6..4d0da61b72367 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/python-api/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -53,7 +53,7 @@ private[spark] class PythonRDD[T: ClassTag](
override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
val startTime = System.currentTimeMillis
- val env = SparkEnv.get
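+    // Python worker bookkeeping now lives in PythonSparkEnv (python-api module), which wraps
+    // the per-thread SparkEnv instead of living inside it.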
+ val env = PythonSparkEnv.get
val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
// Ensure worker socket is closed on task completion. Closing sockets is idempotent.
@@ -71,7 +71,7 @@ private[spark] class PythonRDD[T: ClassTag](
new Thread("stdin writer for " + pythonExec) {
override def run() {
try {
- SparkEnv.set(env)
+ PythonSparkEnv.set(env)
val stream = new BufferedOutputStream(worker.getOutputStream, bufferSize)
val dataOut = new DataOutputStream(stream)
// Partition index
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/python-api/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
similarity index 99%
rename from core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
rename to python-api/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 02799ce0091b0..8f8b103e42c6a 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/python-api/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -25,7 +25,7 @@ import scala.collection.JavaConversions._
import org.apache.spark._
private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
- extends Logging {
+ extends Logging {
// Because forking processes from Java is expensive, we prefer to launch a single Python daemon
// (pyspark/daemon.py) and tell it to fork new workers for our tasks. This daemon currently
@@ -86,6 +86,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
// Redirect the worker's stderr to ours
new Thread("stderr reader for " + pythonExec) {
setDaemon(true)
+
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
@@ -103,6 +104,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
// Redirect worker's stdout to our stderr
new Thread("stdout reader for " + pythonExec) {
setDaemon(true)
+
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
@@ -159,6 +161,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
// Redirect the stderr to ours
new Thread("stderr reader for " + pythonExec) {
setDaemon(true)
+
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
@@ -179,6 +182,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
// Redirect further stdout output to our stderr
new Thread("stdout reader for " + pythonExec) {
setDaemon(true)
+
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/python-api/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
similarity index 95%
rename from mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
rename to python-api/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 7c65b0d4750fa..4ca5265a9e2fc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/python-api/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -405,4 +405,21 @@ class PythonMLLibAPI extends Serializable {
val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
}
+
+ /**
+ * :: DeveloperApi ::
+ * Predict the rating of many users for many products.
+   * This is a Java stub for Python predictAll().
+   *
+   * @param model An ALS MatrixFactorizationModel
+   * @param usersProductsJRDD A JavaRDD with serialized (user, product) tuples
+   * @return JavaRDD of serialized Rating objects.
+   */
+  def alsModelPredictAll(
+      model: MatrixFactorizationModel,
+      usersProductsJRDD: JavaRDD[Array[Byte]]): JavaRDD[Array[Byte]] = {
+ val usersProducts = usersProductsJRDD.rdd.map(xBytes => unpackTuple(xBytes))
+ model.predict(usersProducts).map(rate => serializeRating(rate))
+ }
+
}
diff --git a/python-api/src/main/scala/org/apache/spark/sql/api/python/PythonSQLAPI.scala b/python-api/src/main/scala/org/apache/spark/sql/api/python/PythonSQLAPI.scala
new file mode 100644
index 0000000000000..796518a55ed76
--- /dev/null
+++ b/python-api/src/main/scala/org/apache/spark/sql/api/python/PythonSQLAPI.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.api.python
+
+import net.razorvine.pickle.Pickler
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.sql.SchemaRDD
+import org.apache.spark.api.java.JavaRDD
+import java.util.{Map => JMap}
+
+
+/**
+ * :: DeveloperApi ::
+ * The Java stubs necessary for the Python SQL bindings.
+ *
+ * See python/pyspark/sql.py for the mutually agreed upon data format.
+ */
+@DeveloperApi
+class PythonSQLAPI extends Serializable {
+
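+  // Serializes each Row of the SchemaRDD into a pickled single-element list holding a
+  // {columnName: value} dict; pyspark/sql.py turns these back into Row objects.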
+ def javaToPython(rdd: SchemaRDD): JavaRDD[Array[Byte]] = {
+ val fieldNames: Seq[String] = rdd.queryExecution.analyzed.output.map(_.name)
+    rdd.mapPartitions { iter =>
+      val pickle = new Pickler
+      iter.map { row =>
+        val map: JMap[String, Any] = new java.util.HashMap
+        // TODO: We place the map in an ArrayList so that the object is pickled to a List[Dict].
+        // Ideally we should be able to pickle an object directly into a Python collection so we
+        // don't have to create an ArrayList every time.
+        val arr: java.util.ArrayList[Any] = new java.util.ArrayList
+        row.zip(fieldNames).foreach { case (obj, name) =>
+          map.put(name, obj)
+        }
+        arr.add(map)
+        pickle.dumps(arr)
+      }
+    }
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala b/python-api/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
similarity index 100%
rename from core/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
rename to python-api/src/test/scala/org/apache/spark/api/python/PythonRDDSuite.scala
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index f4a83f0209e27..80f3358543baf 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -52,7 +52,8 @@ def predict(self, user, product):
def predictAll(self, usersProducts):
usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple)
- return RDD(self._java_model.predict(usersProductsJRDD._jrdd),
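+        # The Java prediction stub moved from MatrixFactorizationModel into PythonMLLibAPI
+        # (python-api module), so call it through the PythonMLLibAPI gateway object.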
+        mllibapi = usersProducts.context._jvm.PythonMLLibAPI()
+ return RDD(mllibapi.alsModelPredictAll(self._java_model, usersProductsJRDD._jrdd),
self._context, RatingDeserializer())
class ALS(object):
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 1a62031db5c41..50c76433faa75 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -322,7 +322,8 @@ def _toPython(self):
# We have to import the Row class explicitly, so that the reference Pickler has is
# pyspark.sql.Row instead of __main__.Row
from pyspark.sql import Row
- jrdd = self._jschema_rdd.javaToPython()
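+        # javaToPython() moved from the Scala SchemaRDD into PythonSQLAPI (python-api module),
+        # so it is now invoked through a JVM-side stub object.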
+        sqlapi = self._sc._jvm.PythonSQLAPI()
+ jrdd = sqlapi.javaToPython(self._jschema_rdd)
# TODO: This is inefficient, we should construct the Python Row object
# in Java land in the javaToPython function. May require a custom
# pickle serializer in Pyrolite
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index d7782d6b32819..b13f134785bf7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -17,8 +17,6 @@
package org.apache.spark.sql
-import net.razorvine.pickle.Pickler
-
import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
import org.apache.spark.annotation.{AlphaComponent, Experimental}
import org.apache.spark.rdd.RDD
@@ -27,8 +25,6 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
import org.apache.spark.sql.catalyst.types.BooleanType
-import org.apache.spark.api.java.JavaRDD
-import java.util.{Map => JMap}
/**
* :: AlphaComponent ::
@@ -296,22 +292,4 @@ class SchemaRDD(
*/
def toSchemaRDD = this
- private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
- val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
- this.mapPartitions { iter =>
- val pickle = new Pickler
- iter.map { row =>
- val map: JMap[String, Any] = new java.util.HashMap
- // TODO: We place the map in an ArrayList so that the object is pickled to a List[Dict].
- // Ideally we should be able to pickle an object directly into a Python collection so we
- // don't have to create an ArrayList every time.
- val arr: java.util.ArrayList[Any] = new java.util.ArrayList
- row.zip(fieldNames).foreach { case (obj, name) =>
- map.put(name, obj)
- }
- arr.add(map)
- pickle.dumps(arr)
- }
- }
- }
}