
Commit 8bb8aa0

Merge remote-tracking branch 'upstream/master' into decisiontree-bugfix
2 parents: 978cfcf + f68105d

File tree: 19 files changed, +318 -55 lines
core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.mapred.InputSplit
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.java.JavaSparkContext._
+import org.apache.spark.api.java.function.{Function2 => JFunction2}
+import org.apache.spark.rdd.HadoopRDD
+
+@DeveloperApi
+class JavaHadoopRDD[K, V](rdd: HadoopRDD[K, V])
+    (implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
+  extends JavaPairRDD[K, V](rdd) {
+
+  /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[R](
+      f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
+      preservesPartitioning: Boolean = false): JavaRDD[R] = {
+    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
+      preservesPartitioning)(fakeClassTag))(fakeClassTag)
+  }
+}
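
For context, a minimal usage sketch of the API this file adds (not part of the commit; the SparkContext setup, the input path, and all names below are illustrative assumptions): a text file is read through the old mapred API and each record is tagged with the path of the FileSplit it came from.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}

import org.apache.spark.api.java.{JavaHadoopRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function2 => JFunction2}

object InputSplitDemo {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext("local[2]", "input-split-demo")

    // hadoopFile now returns a JavaHadoopRDD at runtime (static type JavaPairRDD), so downcast.
    val lines = jsc.hadoopFile("/tmp/demo-input", classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text]).asInstanceOf[JavaHadoopRDD[LongWritable, Text]]

    // Tag every record with the path of the FileSplit it was read from.
    val tagged: JavaRDD[String] = lines.mapPartitionsWithInputSplit(
      new JFunction2[InputSplit, java.util.Iterator[(LongWritable, Text)], java.util.Iterator[String]] {
        override def call(split: InputSplit, it: java.util.Iterator[(LongWritable, Text)]) = {
          val path = split.asInstanceOf[FileSplit].getPath.toString
          val out = new java.util.ArrayList[String]()  // buffers one partition; acceptable for a demo
          while (it.hasNext) out.add(path + "\t" + it.next()._2.toString)
          out.iterator()
        }
      },
      false)  // preservesPartitioning

    tagged.collect().foreach(println)
    jsc.stop()
  }
}
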
core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.mapreduce.InputSplit
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.java.JavaSparkContext._
+import org.apache.spark.api.java.function.{Function2 => JFunction2}
+import org.apache.spark.rdd.NewHadoopRDD
+
+@DeveloperApi
+class JavaNewHadoopRDD[K, V](rdd: NewHadoopRDD[K, V])
+    (implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
+  extends JavaPairRDD[K, V](rdd) {
+
+  /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[R](
+      f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
+      preservesPartitioning: Boolean = false): JavaRDD[R] = {
+    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
+      preservesPartitioning)(fakeClassTag))(fakeClassTag)
+  }
+}

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 13 additions & 8 deletions
@@ -34,7 +34,7 @@ import org.apache.spark._
 import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}

 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
@@ -294,7 +294,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions))
+    val rdd = sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }

   /**
@@ -314,7 +315,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
+    val rdd = sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }

   /** Get an RDD for a Hadoop file with an arbitrary InputFormat.
@@ -333,7 +335,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions))
+    val rdd = sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }

   /** Get an RDD for a Hadoop file with an arbitrary InputFormat
@@ -351,8 +354,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path,
-      inputFormatClass, keyClass, valueClass))
+    val rdd = sc.hadoopFile(path, inputFormatClass, keyClass, valueClass)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }

   /**
@@ -372,7 +375,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
       conf: Configuration): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(kClass)
     implicit val ctagV: ClassTag[V] = ClassTag(vClass)
-    new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
+    val rdd = sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf)
+    new JavaNewHadoopRDD(rdd.asInstanceOf[NewHadoopRDD[K, V]])
   }

   /**
@@ -391,7 +395,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
       vClass: Class[V]): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(kClass)
     implicit val ctagV: ClassTag[V] = ClassTag(vClass)
-    new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
+    val rdd = sc.newAPIHadoopRDD(conf, fClass, kClass, vClass)
+    new JavaNewHadoopRDD(rdd.asInstanceOf[NewHadoopRDD[K, V]])
   }

   /** Build the union of two or more RDDs. */
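
The static return type of these factory methods is unchanged (JavaPairRDD), so a caller who wants the new split-aware method downcasts to the runtime type. A hypothetical sketch for the new-API side, assuming an existing JavaSparkContext named jsc and an illustrative input path:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFormat}

import org.apache.spark.api.java.JavaNewHadoopRDD

val pairs = jsc.newAPIHadoopFile("/tmp/demo-input", classOf[NewTextInputFormat],
  classOf[LongWritable], classOf[Text], new Configuration())
// The runtime class is JavaNewHadoopRDD, which exposes mapPartitionsWithInputSplit.
val withSplits = pairs.asInstanceOf[JavaNewHadoopRDD[LongWritable, Text]]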

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 7 additions & 4 deletions
@@ -184,7 +184,7 @@ object SparkSubmit {
       OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),

       // Yarn cluster only
-      OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"),
+      OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
       OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
       OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
       OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
@@ -268,14 +268,17 @@
       }
     }

+    // Properties given with --conf are superseded by other options, but take precedence over
+    // properties in the defaults file.
+    for ((k, v) <- args.sparkProperties) {
+      sysProps.getOrElseUpdate(k, v)
+    }
+
     // Read from default spark properties, if any
     for ((k, v) <- args.getDefaultSparkProperties) {
       sysProps.getOrElseUpdate(k, v)
     }

-    // Spark properties included on command line take precedence
-    sysProps ++= args.sparkProperties
-
     (childArgs, childClasspath, sysProps, childMainClass)
   }
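
The reordered merge encodes a three-level precedence: values already placed in sysProps by explicit options win over --conf properties, which in turn win over the defaults file, because getOrElseUpdate only fills keys that are still missing. A minimal standalone sketch of that rule (all keys and values are illustrative):

import scala.collection.mutable

val sysProps = mutable.HashMap("spark.executor.memory" -> "4g")                  // from an explicit option
val confProps = Map("spark.executor.memory" -> "2g", "spark.ui.port" -> "4050")  // from --conf
val defaults  = Map("spark.ui.port" -> "4040", "spark.master" -> "local[*]")     // from the defaults file

for ((k, v) <- confProps) sysProps.getOrElseUpdate(k, v)  // --conf fills only missing keys
for ((k, v) <- defaults)  sysProps.getOrElseUpdate(k, v)  // defaults fill what is still missing

// Result: spark.executor.memory=4g (option), spark.ui.port=4050 (--conf), spark.master=local[*] (defaults)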

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 15 additions & 11 deletions
@@ -58,7 +58,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

   parseOpts(args.toList)
-  loadDefaults()
+  mergeSparkProperties()
   checkRequiredArguments()

   /** Return default present in the currently defined defaults file. */
@@ -79,9 +79,11 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     defaultProperties
   }

-  /** Fill in any undefined values based on the current properties file or built-in defaults. */
-  private def loadDefaults(): Unit = {
-
+  /**
+   * Fill in any undefined values based on the default properties file or options passed in through
+   * the '--conf' flag.
+   */
+  private def mergeSparkProperties(): Unit = {
     // Use common defaults file, if not specified by user
     if (propertiesFile == null) {
       sys.env.get("SPARK_HOME").foreach { sparkHome =>
@@ -94,18 +96,20 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       }
     }

-    val defaultProperties = getDefaultSparkProperties
+    val properties = getDefaultSparkProperties
+    properties.putAll(sparkProperties)
+
     // Use properties file as fallback for values which have a direct analog to
     // arguments in this script.
-    master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
+    master = Option(master).getOrElse(properties.get("spark.master").orNull)
     executorMemory = Option(executorMemory)
-      .getOrElse(defaultProperties.get("spark.executor.memory").orNull)
+      .getOrElse(properties.get("spark.executor.memory").orNull)
     executorCores = Option(executorCores)
-      .getOrElse(defaultProperties.get("spark.executor.cores").orNull)
+      .getOrElse(properties.get("spark.executor.cores").orNull)
     totalExecutorCores = Option(totalExecutorCores)
-      .getOrElse(defaultProperties.get("spark.cores.max").orNull)
-    name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
-    jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
+      .getOrElse(properties.get("spark.cores.max").orNull)
+    name = Option(name).getOrElse(properties.get("spark.app.name").orNull)
+    jars = Option(jars).getOrElse(properties.get("spark.jars").orNull)

     // This supports env vars in older versions of Spark
     master = Option(master).getOrElse(System.getenv("MASTER"))
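
The Option(...).getOrElse(...) pattern above only fills fields that were not set on the command line, so an explicit flag still beats both --conf and the defaults file. A small illustration with made-up values:

val properties = Map(
  "spark.master"   -> "spark://defaults-host:7077",
  "spark.app.name" -> "from-conf")

var master: String = "local[4]"  // set via --master
var name: String = null          // not given on the command line

master = Option(master).getOrElse(properties.get("spark.master").orNull)   // stays "local[4]"
name   = Option(name).getOrElse(properties.get("spark.app.name").orNull)   // becomes "from-conf"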

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 32 additions & 0 deletions
@@ -20,7 +20,9 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date
 import java.io.EOFException
+
 import scala.collection.immutable.Map
+import scala.reflect.ClassTag

 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.mapred.FileSplit
@@ -39,6 +41,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.util.NextIterator

 /**
@@ -232,6 +235,14 @@ class HadoopRDD[K, V](
     new InterruptibleIterator[(K, V)](context, iter)
   }

+  /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[U: ClassTag](
+      f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
+      preservesPartitioning: Boolean = false): RDD[U] = {
+    new HadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
+  }
+
   override def getPreferredLocations(split: Partition): Seq[String] = {
     // TODO: Filtering out "localhost" in case of file:// URLs
     val hadoopSplit = split.asInstanceOf[HadoopPartition]
@@ -272,4 +283,25 @@ private[spark] object HadoopRDD {
     conf.setInt("mapred.task.partition", splitId)
     conf.set("mapred.job.id", jobID.toString)
   }
+
+  /**
+   * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
+   * the given function rather than the index of the partition.
+   */
+  private[spark] class HadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
+      prev: RDD[T],
+      f: (InputSplit, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean = false)
+    extends RDD[U](prev) {
+
+    override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+
+    override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+    override def compute(split: Partition, context: TaskContext) = {
+      val partition = split.asInstanceOf[HadoopPartition]
+      val inputSplit = partition.inputSplit.value
+      f(inputSplit, firstParent[T].iterator(split, context))
+    }
+  }
 }
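
For context, a hypothetical Scala-side use of the new hook (not part of the commit; sc is assumed to be an existing SparkContext and the path is illustrative): counting records per input file by reading each partition's FileSplit.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, TextInputFormat}

import org.apache.spark.SparkContext._   // pair-RDD implicits (reduceByKey)
import org.apache.spark.rdd.HadoopRDD

// sc.hadoopFile builds a HadoopRDD at runtime; downcast to reach the new method.
val hadoopRDD = sc.hadoopFile("/tmp/logs", classOf[TextInputFormat],
  classOf[LongWritable], classOf[Text]).asInstanceOf[HadoopRDD[LongWritable, Text]]

// Count records per input file by inspecting each partition's FileSplit.
val recordsPerFile = hadoopRDD.mapPartitionsWithInputSplit { (split, iter) =>
  val file = split.asInstanceOf[FileSplit].getPath.toString
  Iterator((file, iter.size))
}.reduceByKey(_ + _)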

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 34 additions & 0 deletions
@@ -20,6 +20,8 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date

+import scala.reflect.ClassTag
+
 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapreduce._
@@ -32,6 +34,7 @@ import org.apache.spark.Partition
 import org.apache.spark.SerializableWritable
 import org.apache.spark.{SparkContext, TaskContext}
 import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.rdd.NewHadoopRDD.NewHadoopMapPartitionsWithSplitRDD

 private[spark] class NewHadoopPartition(
     rddId: Int,
@@ -157,6 +160,14 @@ class NewHadoopRDD[K, V](
     new InterruptibleIterator(context, iter)
   }

+  /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[U: ClassTag](
+      f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
+      preservesPartitioning: Boolean = false): RDD[U] = {
+    new NewHadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
+  }
+
   override def getPreferredLocations(split: Partition): Seq[String] = {
     val theSplit = split.asInstanceOf[NewHadoopPartition]
     theSplit.serializableHadoopSplit.value.getLocations.filter(_ != "localhost")
@@ -165,6 +176,29 @@
   def getConf: Configuration = confBroadcast.value.value
 }

+private[spark] object NewHadoopRDD {
+  /**
+   * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
+   * the given function rather than the index of the partition.
+   */
+  private[spark] class NewHadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
+      prev: RDD[T],
+      f: (InputSplit, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean = false)
+    extends RDD[U](prev) {
+
+    override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+
+    override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+    override def compute(split: Partition, context: TaskContext) = {
+      val partition = split.asInstanceOf[NewHadoopPartition]
+      val inputSplit = partition.serializableHadoopSplit.value
+      f(inputSplit, firstParent[T].iterator(split, context))
+    }
+  }
+}
+
 private[spark] class WholeTextFileRDD(
     sc : SparkContext,
     inputFormatClass: Class[_ <: WholeTextFileInputFormat],
