Commit 5423a03

Merge branch 'master' of https://github.com/apache/spark

2 parents: eb663ca + db0c038
36 files changed: +2224, -176 lines
Lines changed: 129 additions & 0 deletions (new file: adds the PySpark Converter trait and PythonHadoopUtil in package org.apache.spark.api.python)

@@ -0,0 +1,129 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.api.python

import org.apache.spark.rdd.RDD
import org.apache.spark.Logging
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io._
import scala.util.{Failure, Success, Try}
import org.apache.spark.annotation.Experimental


/**
 * :: Experimental ::
 * A trait for use with reading custom classes in PySpark. Implement this trait and add custom
 * transformation code by overriding the convert method.
 */
@Experimental
trait Converter[T, U] extends Serializable {
  def convert(obj: T): U
}

private[python] object Converter extends Logging {

  def getInstance(converterClass: Option[String]): Converter[Any, Any] = {
    converterClass.map { cc =>
      Try {
        val c = Class.forName(cc).newInstance().asInstanceOf[Converter[Any, Any]]
        logInfo(s"Loaded converter: $cc")
        c
      } match {
        case Success(c) => c
        case Failure(err) =>
          logError(s"Failed to load converter: $cc")
          throw err
      }
    }.getOrElse { new DefaultConverter }
  }
}

/**
 * A converter that handles conversion of common [[org.apache.hadoop.io.Writable]] objects.
 * Other objects are passed through without conversion.
 */
private[python] class DefaultConverter extends Converter[Any, Any] {

  /**
   * Converts a [[org.apache.hadoop.io.Writable]] to the underlying primitive, String or
   * object representation
   */
  private def convertWritable(writable: Writable): Any = {
    import collection.JavaConversions._
    writable match {
      case iw: IntWritable => iw.get()
      case dw: DoubleWritable => dw.get()
      case lw: LongWritable => lw.get()
      case fw: FloatWritable => fw.get()
      case t: Text => t.toString
      case bw: BooleanWritable => bw.get()
      case byw: BytesWritable => byw.getBytes
      case n: NullWritable => null
      case aw: ArrayWritable => aw.get().map(convertWritable(_))
      case mw: MapWritable => mapAsJavaMap(mw.map { case (k, v) =>
        (convertWritable(k), convertWritable(v))
      }.toMap)
      case other => other
    }
  }

  def convert(obj: Any): Any = {
    obj match {
      case writable: Writable =>
        convertWritable(writable)
      case _ =>
        obj
    }
  }
}

/** Utilities for working with Python objects <-> Hadoop-related objects */
private[python] object PythonHadoopUtil {

  /**
   * Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]]
   */
  def mapToConf(map: java.util.Map[String, String]): Configuration = {
    import collection.JavaConversions._
    val conf = new Configuration()
    map.foreach{ case (k, v) => conf.set(k, v) }
    conf
  }

  /**
   * Merges two configurations, returns a copy of left with keys from right overwriting
   * any matching keys in left
   */
  def mergeConfs(left: Configuration, right: Configuration): Configuration = {
    import collection.JavaConversions._
    val copy = new Configuration(left)
    right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue))
    copy
  }

  /**
   * Converts an RDD of key-value pairs, where key and/or value could be instances of
   * [[org.apache.hadoop.io.Writable]], into an RDD[(K, V)]
   */
  def convertRDD[K, V](rdd: RDD[(K, V)],
      keyConverter: Converter[Any, Any],
      valueConverter: Converter[Any, Any]): RDD[(Any, Any)] = {
    rdd.map { case (k, v) => (keyConverter.convert(k), valueConverter.convert(v)) }
  }

}
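The Converter trait introduced above is the extension point for reading custom classes in PySpark: implement convert, then pass the implementation's fully qualified class name through the keyConverterClass / valueConverterClass parameters of the new loaders in PythonRDD (next file). Below is a minimal sketch of such a converter; the com.example package, the class name and the "host,bytes" record layout are hypothetical, not part of this commit.

package com.example

import org.apache.spark.api.python.Converter
import org.apache.hadoop.io.Text

// Hypothetical converter: suppose the values of a SequenceFile are Text records of the
// form "host,bytes" and PySpark should only see the host name. Converter.getInstance
// instantiates this class reflectively, so it must have a no-arg constructor.
class HostFromTextConverter extends Converter[Any, Any] {
  override def convert(obj: Any): Any = obj match {
    case t: Text => t.toString.split(",")(0)  // keep only the host field
    case other   => other                     // anything else passes through unchanged
  }
}

Converter.getInstance(Some("com.example.HostFromTextConverter")) then loads the class by name (falling back to DefaultConverter when no name is given), and PythonHadoopUtil.convertRDD applies it to every value before the records are pickled for the Python side.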

core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala

Lines changed: 178 additions & 1 deletion
@@ -28,6 +28,9 @@ import scala.util.Try
 
 import net.razorvine.pickle.{Pickler, Unpickler}
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.{InputFormat, JobConf}
+import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
 import org.apache.spark._
 import org.apache.spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
 import org.apache.spark.broadcast.Broadcast
@@ -266,7 +269,7 @@ private object SpecialLengths {
   val TIMING_DATA = -3
 }
 
-private[spark] object PythonRDD {
+private[spark] object PythonRDD extends Logging {
   val UTF8 = Charset.forName("UTF-8")
 
   /**
@@ -346,6 +349,180 @@ private[spark] object PythonRDD {
     }
   }
 
+  /**
+   * Create an RDD from a path using [[org.apache.hadoop.mapred.SequenceFileInputFormat]],
+   * key and value class.
+   * A key and/or value converter class can optionally be passed in
+   * (see [[org.apache.spark.api.python.Converter]])
+   */
+  def sequenceFile[K, V](
+      sc: JavaSparkContext,
+      path: String,
+      keyClassMaybeNull: String,
+      valueClassMaybeNull: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      minSplits: Int) = {
+    val keyClass = Option(keyClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
+    val valueClass = Option(valueClassMaybeNull).getOrElse("org.apache.hadoop.io.Text")
+    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+
+    val rdd = sc.sc.sequenceFile[K, V](path, kc, vc, minSplits)
+    val keyConverter = Converter.getInstance(Option(keyConverterClass))
+    val valueConverter = Converter.getInstance(Option(valueConverterClass))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+  }
+
+  /**
+   * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
+   * key and value class.
+   * A key and/or value converter class can optionally be passed in
+   * (see [[org.apache.spark.api.python.Converter]])
+   */
+  def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: String,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val baseConf = sc.hadoopConfiguration()
+    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+    val rdd =
+      newAPIHadoopRDDFromClassNames[K, V, F](sc,
+        Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
+    val keyConverter = Converter.getInstance(Option(keyConverterClass))
+    val valueConverter = Converter.getInstance(Option(valueConverterClass))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+  }
+
+  /**
+   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map that is
+   * passed in from Python, using an arbitrary [[org.apache.hadoop.mapreduce.InputFormat]],
+   * key and value class.
+   * A key and/or value converter class can optionally be passed in
+   * (see [[org.apache.spark.api.python.Converter]])
+   */
+  def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
+      sc: JavaSparkContext,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val rdd =
+      newAPIHadoopRDDFromClassNames[K, V, F](sc,
+        None, inputFormatClass, keyClass, valueClass, conf)
+    val keyConverter = Converter.getInstance(Option(keyConverterClass))
+    val valueConverter = Converter.getInstance(Option(valueConverterClass))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+  }
+
+  private def newAPIHadoopRDDFromClassNames[K, V, F <: NewInputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: Option[String] = None,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      conf: Configuration) = {
+    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+    implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
+    val rdd = if (path.isDefined) {
+      sc.sc.newAPIHadoopFile[K, V, F](path.get, fc, kc, vc, conf)
+    } else {
+      sc.sc.newAPIHadoopRDD[K, V, F](conf, fc, kc, vc)
+    }
+    rdd
+  }
+
+  /**
+   * Create an RDD from a file path, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
+   * key and value class.
+   * A key and/or value converter class can optionally be passed in
+   * (see [[org.apache.spark.api.python.Converter]])
+   */
+  def hadoopFile[K, V, F <: InputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: String,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val baseConf = sc.hadoopConfiguration()
+    val mergedConf = PythonHadoopUtil.mergeConfs(baseConf, conf)
+    val rdd =
+      hadoopRDDFromClassNames[K, V, F](sc,
+        Some(path), inputFormatClass, keyClass, valueClass, mergedConf)
+    val keyConverter = Converter.getInstance(Option(keyConverterClass))
+    val valueConverter = Converter.getInstance(Option(valueConverterClass))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+  }
+
+  /**
+   * Create an RDD from a [[org.apache.hadoop.conf.Configuration]] converted from a map
+   * that is passed in from Python, using an arbitrary [[org.apache.hadoop.mapred.InputFormat]],
+   * key and value class
+   * A key and/or value converter class can optionally be passed in
+   * (see [[org.apache.spark.api.python.Converter]])
+   */
+  def hadoopRDD[K, V, F <: InputFormat[K, V]](
+      sc: JavaSparkContext,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      keyConverterClass: String,
+      valueConverterClass: String,
+      confAsMap: java.util.HashMap[String, String]) = {
+    val conf = PythonHadoopUtil.mapToConf(confAsMap)
+    val rdd =
+      hadoopRDDFromClassNames[K, V, F](sc,
+        None, inputFormatClass, keyClass, valueClass, conf)
+    val keyConverter = Converter.getInstance(Option(keyConverterClass))
+    val valueConverter = Converter.getInstance(Option(valueConverterClass))
+    val converted = PythonHadoopUtil.convertRDD[K, V](rdd, keyConverter, valueConverter)
+    JavaRDD.fromRDD(SerDeUtil.rddToPython(converted))
+  }
+
+  private def hadoopRDDFromClassNames[K, V, F <: InputFormat[K, V]](
+      sc: JavaSparkContext,
+      path: Option[String] = None,
+      inputFormatClass: String,
+      keyClass: String,
+      valueClass: String,
+      conf: Configuration) = {
+    implicit val kcm = ClassTag(Class.forName(keyClass)).asInstanceOf[ClassTag[K]]
+    implicit val vcm = ClassTag(Class.forName(valueClass)).asInstanceOf[ClassTag[V]]
+    implicit val fcm = ClassTag(Class.forName(inputFormatClass)).asInstanceOf[ClassTag[F]]
+    val kc = kcm.runtimeClass.asInstanceOf[Class[K]]
+    val vc = vcm.runtimeClass.asInstanceOf[Class[V]]
+    val fc = fcm.runtimeClass.asInstanceOf[Class[F]]
+    val rdd = if (path.isDefined) {
+      sc.sc.hadoopFile(path.get, fc, kc, vc)
+    } else {
+      sc.sc.hadoopRDD(new JobConf(conf), fc, kc, vc)
+    }
+    rdd
+  }
+
   def writeUTF(str: String, dataOut: DataOutputStream) {
     val bytes = str.getBytes(UTF8)
     dataOut.writeInt(bytes.length)
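The entry points above other than sequenceFile turn the java.util.HashMap passed from Python into a Configuration with PythonHadoopUtil.mapToConf; the file-path variants additionally merge it over the context's base Hadoop configuration with mergeConfs, so Python-supplied keys win. All of them then resolve the requested InputFormat and key/value classes by name, read the RDD, run the key and value Converters over every pair with convertRDD, and pickle the result with SerDeUtil.rddToPython. The sketch below shows roughly how sequenceFile could be exercised from the JVM side, for example in a test; it is illustrative only. The path and converter class are hypothetical, the object sits in org.apache.spark.api.python only because PythonRDD is private[spark], and null key/value class names fall back to org.apache.hadoop.io.Text as in the code above.

package org.apache.spark.api.python

import org.apache.hadoop.io.Text
import org.apache.spark.api.java.JavaSparkContext

// Illustrative harness, not part of this commit.
object SequenceFileSketch {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext("local", "sequenceFile-sketch")

    // Nulls fall back to Text for the key/value classes and to DefaultConverter for the
    // key converter; the value converter class name is loaded reflectively by
    // Converter.getInstance, just as it would be for a request coming from PySpark.
    val pickled = PythonRDD.sequenceFile[Text, Text](
      jsc,
      "/tmp/example.seq",                    // hypothetical path
      null,                                  // keyClassMaybeNull
      null,                                  // valueClassMaybeNull
      null,                                  // keyConverterClass
      "com.example.HostFromTextConverter",   // hypothetical value converter (see earlier sketch)
      2)                                     // minSplits

    // The returned JavaRDD holds pickled records for the Python worker to deserialize.
    println(s"pickled elements: ${pickled.count()}")
    jsc.stop()
  }
}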
