Skip to content

Commit 4886052

Browse files
committed
even better
1 parent c0fb1c6 commit 4886052

File tree

2 files changed

+14
-1
lines changed

2 files changed

+14
-1
lines changed

python/pyspark/rdd.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1398,7 +1398,8 @@ def registerAsTable(self, name):
13981398
self._jschema_rdd.registerAsTable(name)
13991399

14001400
def toPython(self):
    """Convert this SchemaRDD into a plain Python :class:`RDD`.

    Rows are pickled on the JVM side by the Scala ``javaToPython``
    helper on the underlying ``_jschema_rdd``; the resulting RDD of
    byte arrays is wrapped in a regular :class:`RDD` using this
    context's default serializer.

    :return: an :class:`RDD` over the pickled rows of this SchemaRDD
    """
    # Pickling now happens inside the JVM (JavaSchemaRDD.javaToPython)
    # rather than via the old SparkContext._javaToPython helper.
    jrdd = self._jschema_rdd.javaToPython()
    return RDD(jrdd, self._sc, self._sc.serializer)
14031404

14041405
def _test():

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.spark.sql.api.java
1919

20+
import net.razorvine.pickle.{Pickler, Unpickler}
21+
2022
import org.apache.spark.api.java.{JavaRDDLike, JavaRDD}
2123
import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike}
2224
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -45,4 +47,14 @@ class JavaSchemaRDD(
4547
override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd)
4648

4749
val rdd = baseSchemaRDD.map(new Row(_))
50+
51+
/**
 * Serialize every [[Row]] of this RDD to an array of pickled bytes so the
 * data can be consumed from PySpark.
 *
 * A fresh `Pickler` is instantiated once per partition rather than once per
 * row (cheap) and rather than once per RDD (the pickler is stateful, so it
 * must not be shared across partitions/threads).
 *
 * @return a `JavaRDD` whose elements are the pickle-serialized rows
 */
def javaToPython: JavaRDD[Array[Byte]] = {
  this.rdd.mapPartitions { iter =>
    // Fix: the original named this `unpickle` although it is a Pickler.
    val pickle = new Pickler
    iter.map { row =>
      // Copy all column values into an untyped array for pickling.
      val fields: Array[Any] = Array.tabulate[Any](row.length)(row.get)
      pickle.dumps(fields)
    }
  }
}
4860
}

0 commit comments

Comments
 (0)