Commit 44f2857

Remove msgpack dependency and switch serialization to Pyrolite, plus some clean up and refactoring
1 parent c0ebfb6 commit 44f2857

4 files changed: +109 -117 lines changed


core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala

Lines changed: 41 additions & 0 deletions
@@ -17,13 +17,20 @@
 
 package org.apache.spark.api.python
 
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
 import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.io._
+
 
 /**
  * Utilities for working with Python objects -> Hadoop-related objects
  */
 private[python] object PythonHadoopUtil {
 
+  /**
+   * Convert a Map of properties to a [[org.apache.hadoop.conf.Configuration]]
+   */
   def mapToConf(map: java.util.Map[String, String]) = {
     import collection.JavaConversions._
     val conf = new Configuration()
@@ -42,4 +49,38 @@ private[python] object PythonHadoopUtil {
     copy
   }
 
+  /**
+   * Converts an RDD of key-value pairs, where key and/or value could be instances of
+   * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
+   */
+  def convertRDD[K, V](rdd: RDD[(K, V)]) = {
+    rdd.map{
+      case (k: Writable, v: Writable) => (convert(k).asInstanceOf[K], convert(v).asInstanceOf[V])
+      case (k: Writable, v) => (convert(k).asInstanceOf[K], v.asInstanceOf[V])
+      case (k, v: Writable) => (k.asInstanceOf[K], convert(v).asInstanceOf[V])
+      case (k, v) => (k.asInstanceOf[K], v.asInstanceOf[V])
+    }
+  }
+
+  /**
+   * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
+   * object representation
+   */
+  private def convert(writable: Writable): Any = {
+    import collection.JavaConversions._
+    writable match {
+      case iw: IntWritable => SparkContext.intWritableConverter().convert(iw)
+      case dw: DoubleWritable => SparkContext.doubleWritableConverter().convert(dw)
+      case lw: LongWritable => SparkContext.longWritableConverter().convert(lw)
+      case fw: FloatWritable => SparkContext.floatWritableConverter().convert(fw)
+      case t: Text => SparkContext.stringWritableConverter().convert(t)
+      case bw: BooleanWritable => SparkContext.booleanWritableConverter().convert(bw)
+      case byw: BytesWritable => SparkContext.bytesWritableConverter().convert(byw)
+      case n: NullWritable => null
+      case aw: ArrayWritable => aw.get().map(convert(_))
+      case mw: MapWritable => mapAsJavaMap(mw.map{ case (k, v) => (convert(k), convert(v)) }.toMap)
+      case other => other
+    }
+  }
+
 }
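Note: the convert helper above unwraps Hadoop Writable containers into plain JVM values before they reach the Python-side serializer. The following stand-alone sketch illustrates the same unwrapping idea outside Spark; it is not the commit's code, and the name `unwrap` and the sample values are illustrative only (requires hadoop-common on the classpath).

import org.apache.hadoop.io.{IntWritable, NullWritable, Text, Writable}

object WritableUnwrapSketch {
  // Illustrative stand-in for PythonHadoopUtil.convert: map a few common
  // Writable wrappers to their underlying JVM values and pass anything
  // unrecognised through unchanged.
  def unwrap(w: Writable): Any = w match {
    case i: IntWritable  => i.get()      // underlying int
    case t: Text         => t.toString   // String
    case _: NullWritable => null         // no value
    case other           => other        // unknown Writable: leave as-is
  }

  def main(args: Array[String]): Unit = {
    println(unwrap(new IntWritable(42)))   // 42
    println(unwrap(new Text("spark")))     // spark
  }
}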

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 19 additions & 18 deletions
@@ -211,7 +211,7 @@ private object SpecialLengths {
   val TIMING_DATA = -3
 }
 
-private[spark] object PythonRDD {
+private[spark] object PythonRDD extends Logging {
   val UTF8 = Charset.forName("UTF-8")
 
   def readRDDFromFile(sc: JavaSparkContext, filename: String, parallelism: Int):
@@ -273,20 +273,21 @@ private[spark] object PythonRDD {
   }
 
   /** Create and RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]] */
-  def sequenceFile[K, V](sc: JavaSparkContext,
-      path: String,
-      keyClass: String,
-      valueClass: String,
-      keyWrapper: String,
-      valueWrapper: String,
-      minSplits: Int) = {
+  def sequenceFile[K, V](
+      sc: JavaSparkContext,
+      path: String,
+      keyClass: String,
+      valueClass: String,
+      keyWrapper: String,
+      valueWrapper: String,
+      minSplits: Int) = {
     implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
     implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
     val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
     val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
     val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
-    val converted = SerDeUtil.convertRDD[K, V](rdd)
-    JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
   }
 
   /**
@@ -308,8 +309,8 @@ private[spark] object PythonRDD {
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClazz, keyClazz, valueClazz, mergedConf)
-    val converted = SerDeUtil.convertRDD[K, V](rdd)
-    JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
   }
 
   /**
@@ -329,8 +330,8 @@ private[spark] object PythonRDD {
     val rdd =
       newAPIHadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClazz, keyClazz, valueClazz, conf)
-    val converted = SerDeUtil.convertRDD[K, V](rdd)
-    JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
  }
 
   private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
@@ -373,8 +374,8 @@ private[spark] object PythonRDD {
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         Some(path), inputFormatClazz, keyClazz, valueClazz, mergedConf)
-    val converted = SerDeUtil.convertRDD[K, V](rdd)
-    JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
  }
 
   /**
@@ -394,8 +395,8 @@ private[spark] object PythonRDD {
     val rdd =
       hadoopRDDFromClassNames[K, V, F](sc,
         None, inputFormatClazz, keyClazz, valueClazz, conf)
-    val converted = SerDeUtil.convertRDD[K, V](rdd)
-    JavaRDD.fromRDD(SerDeUtil.serMsgPack[K, V](converted))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
  }
 
   private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
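Note: all of the read paths above now converge on the same two steps: unwrap Writables with PythonHadoopUtil.convertRDD, then pickle with SerDeUtil.rddToPython. A minimal stand-alone sketch of that "convert, then pickle" shape, using Pyrolite directly on a local collection rather than the private[python] Spark helpers; the object and method names here are illustrative, not part of the commit.

import net.razorvine.pickle.Pickler

object ReadPathSketch {
  // Stand-in for PythonHadoopUtil.convertRDD: here the pairs are already plain
  // JVM values, so conversion is the identity.
  def convertPairs(pairs: Seq[(Any, Any)]): Seq[(Any, Any)] = pairs

  // Stand-in for SerDeUtil.rddToPython: pickle each pair as a two-element array,
  // the same Array(k, v) shape the diff uses.
  def toPython(pairs: Seq[(Any, Any)]): Seq[Array[Byte]] = {
    val pickle = new Pickler
    pairs.map { case (k, v) => pickle.dumps(Array[Any](k, v)) }
  }

  def main(args: Array[String]): Unit = {
    val pickled = toPython(convertPairs(Seq("a" -> 1, "b" -> 2)))
    println(pickled.map(_.length).mkString(", "))  // pickled record sizes in bytes
  }
}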

core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala

Lines changed: 48 additions & 97 deletions
@@ -17,13 +17,12 @@
 
 package org.apache.spark.api.python
 
-import org.msgpack.ScalaMessagePack
 import scala.util.Try
 import org.apache.spark.rdd.RDD
-import org.apache.spark.{SparkContext, Logging}
-import org.apache.hadoop.io._
+import org.apache.spark.Logging
 import scala.util.Success
 import scala.util.Failure
+import net.razorvine.pickle.Pickler
 
 /**
  * Utilities for serialization / deserialization between Python and Java, using MsgPack.
@@ -33,106 +32,58 @@ import scala.util.Failure
 private[python] object SerDeUtil extends Logging {
 
   /**
-   * Checks whether a Scala object needs to be registered with MsgPack. String, primitives
-   * and the standard collections don't need to be registered as MsgPack takes care of serializing
-   * them and registering them throws scary looking errors (but still works).
+   * Convert an RDD of key-value pairs to an RDD of serialized Python objects, that is usable
+   * by PySpark. By default, if serialization fails, toString is called and the string
+   * representation is serialized
    */
-  def needsToBeRegistered[T](t: T) = {
-    t match {
-      case d: Double => false
-      case f: Float => false
-      case i: Int => false
-      case l: Long => false
-      case b: Byte => false
-      case c: Char => false
-      case bool: Boolean => false
-      case s: String => false
-      case m: Map[_, _] => false
-      case a: Seq[_] => false
-      case o: Option[_] => false
-      case _ => true
-    }
-  }
-
-  /** Attempts to register a class with MsgPack */
-  def register[T](t: T, msgpack: ScalaMessagePack) {
-    if (!needsToBeRegistered(t)) {
-      return
-    }
-    val clazz = t.getClass
-    Try {
-      msgpack.register(clazz)
-      log.info(s"Registered key/value class with MsgPack: $clazz")
-    } match {
-      case Failure(err) =>
-        log.warn(s"""Failed to register class ($clazz) with MsgPack.
-          Falling back to default MsgPack serialization, or 'toString' as last resort.
-          Error: ${err.getMessage}""")
-      case Success(result) =>
-    }
-  }
-
-  /** Serializes an RDD[(K, V)] -> RDD[Array[Byte]] using MsgPack */
-  def serMsgPack[K, V](rdd: RDD[(K, V)]) = {
-    import org.msgpack.ScalaMessagePack._
-    rdd.mapPartitions{ pairs =>
-      val mp = new ScalaMessagePack
-      var triedReg = false
-      pairs.map{ pair =>
-        Try {
-          if (!triedReg) {
-            register(pair._1, mp)
-            register(pair._2, mp)
-            triedReg = true
+  def rddToPython[K, V](rdd: RDD[(K, V)]): RDD[Array[Byte]] = {
+    rdd.mapPartitions{ iter =>
+      val pickle = new Pickler
+      var keyFailed = false
+      var valueFailed = false
+      var firstRecord = true
+      iter.map{ case (k, v) =>
+        if (firstRecord) {
+          Try {
+            pickle.dumps(Array(k, v))
+          } match {
+            case Success(b) =>
+            case Failure(err) =>
+              val kt = Try {
+                pickle.dumps(k)
+              }
+              val vt = Try {
+                pickle.dumps(v)
+              }
+              (kt, vt) match {
+                case (Failure(kf), Failure(vf)) =>
+                  log.warn(s"""Failed to pickle Java object as key: ${k.getClass.getSimpleName};
+                    Error: ${kf.getMessage}""")
+                  log.warn(s"""Failed to pickle Java object as value: ${v.getClass.getSimpleName};
+                    Error: ${vf.getMessage}""")
+                  keyFailed = true
+                  valueFailed = true
+                case (Failure(kf), _) =>
+                  log.warn(s"""Failed to pickle Java object as key: ${k.getClass.getSimpleName};
+                    Error: ${kf.getMessage}""")
+                  keyFailed = true
+                case (_, Failure(vf)) =>
+                  log.warn(s"""Failed to pickle Java object as value: ${v.getClass.getSimpleName};
+                    Error: ${vf.getMessage}""")
+                  valueFailed = true
+              }
           }
-          mp.write(pair)
-        } match {
-          case Failure(err) =>
-            log.debug("Failed to write", err)
-            Try {
-              write((pair._1.toString, pair._2.toString))
-            } match {
-              case Success(result) => result
-              case Failure(e) => throw e
-            }
-          case Success(result) => result
+          firstRecord = false
+        }
+        (keyFailed, valueFailed) match {
+          case (true, true) => pickle.dumps(Array(k.toString, v.toString))
+          case (true, false) => pickle.dumps(Array(k.toString, v))
+          case (false, true) => pickle.dumps(Array(k, v.toString))
+          case (false, false) => pickle.dumps(Array(k, v))
         }
       }
     }
   }
 
-  /**
-   * Converts an RDD of key-value pairs, where key and/or value could be instances of
-   * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
-   */
-  def convertRDD[K, V](rdd: RDD[(K, V)]) = {
-    rdd.map{
-      case (k: Writable, v: Writable) => (convert(k).asInstanceOf[K], convert(v).asInstanceOf[V])
-      case (k: Writable, v) => (convert(k).asInstanceOf[K], v.asInstanceOf[V])
-      case (k, v: Writable) => (k.asInstanceOf[K], convert(v).asInstanceOf[V])
-      case (k, v) => (k.asInstanceOf[K], v.asInstanceOf[V])
-    }
-  }
-
-  /** Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
-   * object representation
-   */
-  def convert(writable: Writable): Any = {
-    import collection.JavaConversions._
-    writable match {
-      case iw: IntWritable => SparkContext.intWritableConverter().convert(iw)
-      case dw: DoubleWritable => SparkContext.doubleWritableConverter().convert(dw)
-      case lw: LongWritable => SparkContext.longWritableConverter().convert(lw)
-      case fw: FloatWritable => SparkContext.floatWritableConverter().convert(fw)
-      case t: Text => SparkContext.stringWritableConverter().convert(t)
-      case bw: BooleanWritable => SparkContext.booleanWritableConverter().convert(bw)
-      case byw: BytesWritable => SparkContext.bytesWritableConverter().convert(byw)
-      case n: NullWritable => None
-      case aw: ArrayWritable => aw.get().map(convert(_))
-      case mw: MapWritable => mw.map{ case (k, v) => (convert(k), convert(v)) }.toMap
-      case other => other
-    }
-  }
-
 }
 
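Note: the per-partition logic above probes the first record once, trying to pickle the raw key and value; if either fails, it falls back to their toString representation for the rest of the partition. Below is a stand-alone sketch of that probe-and-fall-back decision for a single value. Whether a given class pickles directly depends on Pyrolite's handling of unknown types, so the sketch reports the outcome rather than assuming it; `OpaqueThing` is a made-up class, not from the commit.

import net.razorvine.pickle.Pickler
import scala.util.Try

object PickleFallbackSketch {
  class OpaqueThing { override def toString: String = "OpaqueThing(...)" }

  def main(args: Array[String]): Unit = {
    val pickle = new Pickler
    val value: AnyRef = new OpaqueThing

    // Probe once, as rddToPython does for the first record of a partition.
    val useToString = Try(pickle.dumps(value)).isFailure

    // Either pickle the value itself or its string representation.
    val bytes = if (useToString) pickle.dumps(value.toString) else pickle.dumps(value)
    println(s"fell back to toString: $useToString; pickled ${bytes.length} bytes")
  }
}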

project/SparkBuild.scala

Lines changed: 1 addition & 2 deletions
@@ -344,8 +344,7 @@ object SparkBuild extends Build {
         "com.twitter" % "chill-java" % chillVersion excludeAll(excludeAsm),
         "org.tachyonproject" % "tachyon" % "0.4.1-thrift" excludeAll(excludeHadoop, excludeCurator, excludeEclipseJetty, excludePowermock),
         "com.clearspring.analytics" % "stream" % "2.5.1" excludeAll(excludeFastutil),
-        "org.spark-project" % "pyrolite" % "2.0",
-        "org.msgpack" %% "msgpack-scala" % "0.6.8"
+        "org.spark-project" % "pyrolite" % "2.0"
       ),
     libraryDependencies ++= maybeAvro
   )
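Note: the net effect for the build is one serialization dependency instead of two. A hypothetical sbt fragment showing the dependency as it stands after this commit (coordinates taken from the diff; the fragment itself is illustrative, not part of the commit):

// Illustrative build.sbt fragment: only Pyrolite remains for Python pickling.
libraryDependencies += "org.spark-project" % "pyrolite" % "2.0"
// The "org.msgpack" %% "msgpack-scala" % "0.6.8" dependency is removed by this commit.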
