
Commit ab6025d

compiling
1 parent 268b535 commit ab6025d

File tree

3 files changed: +39 -1 lines changed

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 10 additions & 0 deletions
@@ -25,6 +25,8 @@ import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collectio
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
+import net.razorvine.pickle.Unpickler
+
 import org.apache.spark._
 import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import org.apache.spark.broadcast.Broadcast
@@ -284,6 +286,14 @@ private[spark] object PythonRDD {
     file.close()
   }
 
+  def pythonToJava(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[_] = {
+    pyRDD.rdd.mapPartitions { iter =>
+      val unpickle = new Unpickler
+      iter.map { row =>
+        unpickle.loads(row)
+      }
+    }
+  }
 }
 
 private
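The new pythonToJava helper deserializes an RDD of pickled byte arrays into JVM objects, reusing one Pyrolite Unpickler per partition rather than allocating one per record. A minimal round-trip sketch of how it might be exercised (the local-mode context and sample data are hypothetical, and the demo sits inside the org.apache.spark.api.python package because PythonRDD is private[spark]):

package org.apache.spark.api.python

import net.razorvine.pickle.Pickler

import org.apache.spark.SparkContext
import org.apache.spark.api.java.JavaRDD

// Hypothetical demo: pickle a few JVM strings, then unpickle them
// back through PythonRDD.pythonToJava.
object PickleRoundTripDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "pickle-demo")

    // Pyrolite's Pickler writes the same wire format its Unpickler reads.
    // The Pickler is created inside the closure to keep it serializable.
    val pickled: JavaRDD[Array[Byte]] = JavaRDD.fromRDD(
      sc.parallelize(Seq("a", "b", "c")).map(s => new Pickler().dumps(s)))

    val restored = PythonRDD.pythonToJava(pickled)
    restored.rdd.collect().foreach(println)  // prints a, b, c

    sc.stop()
  }
}

Instantiating the Unpickler once per partition, as the diff does, avoids rebuilding its internal state for every record.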

project/SparkBuild.scala

Lines changed: 2 additions & 1 deletion
@@ -345,7 +345,8 @@ object SparkBuild extends Build {
       "com.twitter" %% "chill" % chillVersion excludeAll(excludeAsm),
       "com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm),
       "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
-      "com.clearspring.analytics" % "stream" % "2.5.1"
+      "com.clearspring.analytics" % "stream" % "2.5.1",
+      "net.razorvine" % "pyrolite_2.10" % "1.1"
     ),
     libraryDependencies ++= maybeAvro
   )
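For reference, the added coordinate is Pyrolite, a JVM implementation of Python's pickle protocol; it supplies the Unpickler used in PythonRDD above. A sketch of the equivalent line in a standalone sbt build (version and artifact suffix taken from the diff):

// build.sbt sketch: Pyrolite provides pickle/unpickle on the JVM.
libraryDependencies += "net.razorvine" % "pyrolite_2.10" % "1.1"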

sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSQLContext.scala

Lines changed: 27 additions & 0 deletions
@@ -82,6 +82,33 @@ class JavaSQLContext(sparkContext: JavaSparkContext) {
     new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
   }
 
+  /**
+   * Applies a schema to an RDD of Array[Any]
+   */
+  def applySchema(rdd: JavaRDD[Array[Any]]): JavaSchemaRDD = {
+    val fields = rdd.first.map(_.getClass)
+    val schema = fields.zipWithIndex.map { case (klass, index) =>
+      val dataType = klass match {
+        case c: Class[_] if c == classOf[java.lang.String] => StringType
+        case c: Class[_] if c == java.lang.Short.TYPE => ShortType
+        case c: Class[_] if c == java.lang.Integer.TYPE => IntegerType
+        case c: Class[_] if c == java.lang.Long.TYPE => LongType
+        case c: Class[_] if c == java.lang.Double.TYPE => DoubleType
+        case c: Class[_] if c == java.lang.Byte.TYPE => ByteType
+        case c: Class[_] if c == java.lang.Float.TYPE => FloatType
+        case c: Class[_] if c == java.lang.Boolean.TYPE => BooleanType
+      }
+
+      AttributeReference(index.toString, dataType, true)()
+    }
+
+    val rowRdd = rdd.rdd.mapPartitions { iter =>
+      iter.map { row =>
+        new GenericRow(row): ScalaRow
+      }
+    }
+    new JavaSchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(schema, rowRdd)))
+  }
 
   /**
   * Loads a parquet file, returning the result as a [[JavaSchemaRDD]].
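A sketch of how applySchema could be driven from Scala (context setup and data are hypothetical). One caveat against the match above: elements pulled out of an Array[Any] report boxed classes such as java.lang.Integer, while the numeric cases compare against primitive classes like java.lang.Integer.TYPE, so in this work-in-progress version only String columns match cleanly; the sketch therefore sticks to strings:

import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
import org.apache.spark.sql.api.java.JavaSQLContext

// Hypothetical demo: two string columns per row; columns are named by
// index ("0", "1") and typed from the runtime classes of the first row.
object ApplySchemaDemo {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext("local", "applySchema-demo")
    val javaSql = new JavaSQLContext(jsc)

    val rows: JavaRDD[Array[Any]] = JavaRDD.fromRDD(
      jsc.sc.parallelize(Seq(
        Array[Any]("alice", "engineering"),
        Array[Any]("bob", "sales"))))

    val schemaRdd = javaSql.applySchema(rows)
    println(schemaRdd.count())  // 2

    jsc.stop()
  }
}

Note that the schema is inferred from rdd.first alone, so every row is assumed to share the first row's arity and element types.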
