
Commit 584449a

Merge remote-tracking branch 'upstream/master' into decisiontree-python-new
2 parents: e06e423 + 49b3612

33 files changed: +827 additions, −114 deletions

core/src/main/scala/org/apache/spark/Logging.scala

Lines changed: 12 additions & 11 deletions
@@ -45,10 +45,7 @@ trait Logging {
       initializeIfNecessary()
       var className = this.getClass.getName
       // Ignore trailing $'s in the class names for Scala objects
-      if (className.endsWith("$")) {
-        className = className.substring(0, className.length - 1)
-      }
-      log_ = LoggerFactory.getLogger(className)
+      log_ = LoggerFactory.getLogger(className.stripSuffix("$"))
     }
     log_
   }
@@ -110,23 +107,27 @@ trait Logging {
   }

   private def initializeLogging() {
-    // If Log4j is being used, but is not initialized, load a default properties file
-    val binder = StaticLoggerBinder.getSingleton
-    val usingLog4j = binder.getLoggerFactoryClassStr.endsWith("Log4jLoggerFactory")
-    val log4jInitialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
-    if (!log4jInitialized && usingLog4j) {
+    // Don't use a logger in here, as this is itself occurring during initialization of a logger
+    // If Log4j 1.2 is being used, but is not initialized, load a default properties file
+    val binderClass = StaticLoggerBinder.getSingleton.getLoggerFactoryClassStr
+    // This distinguishes the log4j 1.2 binding, currently
+    // org.slf4j.impl.Log4jLoggerFactory, from the log4j 2.0 binding, currently
+    // org.apache.logging.slf4j.Log4jLoggerFactory
+    val usingLog4j12 = "org.slf4j.impl.Log4jLoggerFactory".equals(binderClass)
+    val log4j12Initialized = LogManager.getRootLogger.getAllAppenders.hasMoreElements
+    if (!log4j12Initialized && usingLog4j12) {
       val defaultLogProps = "org/apache/spark/log4j-defaults.properties"
       Option(Utils.getSparkClassLoader.getResource(defaultLogProps)) match {
         case Some(url) =>
           PropertyConfigurator.configure(url)
-          log.info(s"Using Spark's default log4j profile: $defaultLogProps")
+          System.err.println(s"Using Spark's default log4j profile: $defaultLogProps")
         case None =>
           System.err.println(s"Spark was unable to load $defaultLogProps")
       }
     }
     Logging.initialized = true

-    // Force a call into slf4j to initialize it. Avoids this happening from mutliple threads
+    // Force a call into slf4j to initialize it. Avoids this happening from multiple threads
     // and triggering this: http://mailman.qos.ch/pipermail/slf4j-dev/2010-April/002956.html
     log
   }
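A minimal standalone sketch of the two behaviors this diff relies on, runnable in a plain Scala REPL; the class and binder names below are the ones quoted in the diff:

// stripSuffix("$") drops one trailing "$" (as Scala object class names have) and leaves
// other names unchanged, so it replaces the endsWith/substring branch removed above.
assert("org.apache.spark.SparkContext$".stripSuffix("$") == "org.apache.spark.SparkContext")
assert("org.apache.spark.SparkContext".stripSuffix("$") == "org.apache.spark.SparkContext")

// The exact-match binding check accepts only the log4j 1.2 binder and rejects the log4j 2.x one,
// which the old endsWith("Log4jLoggerFactory") test could not distinguish.
val log4j12Binder = "org.slf4j.impl.Log4jLoggerFactory"
assert("org.slf4j.impl.Log4jLoggerFactory".equals(log4j12Binder))
assert(!"org.slf4j.impl.Log4jLoggerFactory".equals("org.apache.logging.slf4j.Log4jLoggerFactory"))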
core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala

Lines changed: 43 additions & 0 deletions

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.mapred.InputSplit
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.java.JavaSparkContext._
+import org.apache.spark.api.java.function.{Function2 => JFunction2}
+import org.apache.spark.rdd.HadoopRDD
+
+@DeveloperApi
+class JavaHadoopRDD[K, V](rdd: HadoopRDD[K, V])
+    (implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
+  extends JavaPairRDD[K, V](rdd) {
+
+  /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[R](
+      f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
+      preservesPartitioning: Boolean = false): JavaRDD[R] = {
+    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
+      preservesPartitioning)(fakeClassTag))(fakeClassTag)
+  }
+}
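A hedged usage sketch (not part of the commit) showing how a caller could reach the new method through the Java API; it is written in Scala against the signatures above, and the master URL, input path, object name, and tagging logic are illustrative assumptions:

import java.util.{ArrayList => JArrayList, Iterator => JIterator}

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}

import org.apache.spark.api.java.{JavaHadoopRDD, JavaRDD, JavaSparkContext}
import org.apache.spark.api.java.function.{Function2 => JFunction2}

object JavaInputSplitSketch {
  def main(args: Array[String]): Unit = {
    val jsc = new JavaSparkContext("local", "input-split-sketch")  // hypothetical master and app name

    // hadoopFile now returns a JavaHadoopRDD instance, but its static type is still
    // JavaPairRDD, so callers cast to reach mapPartitionsWithInputSplit.
    val pairs = jsc.hadoopFile("/tmp/input", classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text])
    val hadoopRDD = pairs.asInstanceOf[JavaHadoopRDD[LongWritable, Text]]

    // Tag each record with the name of the file its split came from.
    val tagged: JavaRDD[String] = hadoopRDD.mapPartitionsWithInputSplit(
      new JFunction2[InputSplit, JIterator[(LongWritable, Text)], JIterator[String]] {
        override def call(split: InputSplit, it: JIterator[(LongWritable, Text)]): JIterator[String] = {
          val file = split.asInstanceOf[FileSplit].getPath.getName
          val out = new JArrayList[String]()
          while (it.hasNext) { out.add(file + "\t" + it.next()._2.toString) }
          out.iterator()
        }
      },
      false)

    tagged.collect()
    jsc.stop()
  }
}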
core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala

Lines changed: 43 additions & 0 deletions

@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.mapreduce.InputSplit
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.java.JavaSparkContext._
+import org.apache.spark.api.java.function.{Function2 => JFunction2}
+import org.apache.spark.rdd.NewHadoopRDD
+
+@DeveloperApi
+class JavaNewHadoopRDD[K, V](rdd: NewHadoopRDD[K, V])
+    (implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
+  extends JavaPairRDD[K, V](rdd) {
+
+  /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[R](
+      f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
+      preservesPartitioning: Boolean = false): JavaRDD[R] = {
+    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
+      preservesPartitioning)(fakeClassTag))(fakeClassTag)
+  }
+}

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 13 additions & 8 deletions
@@ -34,7 +34,7 @@ import org.apache.spark._
 import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}

 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
@@ -294,7 +294,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions))
+    val rdd = sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }

   /**
@@ -314,7 +315,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
+    val rdd = sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }

   /** Get an RDD for a Hadoop file with an arbitrary InputFormat.
@@ -333,7 +335,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions))
+    val rdd = sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }

   /** Get an RDD for a Hadoop file with an arbitrary InputFormat
@@ -351,8 +354,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path,
-      inputFormatClass, keyClass, valueClass))
+    val rdd = sc.hadoopFile(path, inputFormatClass, keyClass, valueClass)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }

   /**
@@ -372,7 +375,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
       conf: Configuration): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(kClass)
     implicit val ctagV: ClassTag[V] = ClassTag(vClass)
-    new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
+    val rdd = sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf)
+    new JavaNewHadoopRDD(rdd.asInstanceOf[NewHadoopRDD[K, V]])
   }

   /**
@@ -391,7 +395,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
       vClass: Class[V]): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(kClass)
     implicit val ctagV: ClassTag[V] = ClassTag(vClass)
-    new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
+    val rdd = sc.newAPIHadoopRDD(conf, fClass, kClass, vClass)
+    new JavaNewHadoopRDD(rdd.asInstanceOf[NewHadoopRDD[K, V]])
   }

   /** Build the union of two or more RDDs. */

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 7 additions & 4 deletions
@@ -184,7 +184,7 @@ object SparkSubmit {
     OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),

     // Yarn cluster only
-    OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"),
+    OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
     OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
     OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
     OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
@@ -268,14 +268,17 @@ object SparkSubmit {
       }
     }

+    // Properties given with --conf are superceded by other options, but take precedence over
+    // properties in the defaults file.
+    for ((k, v) <- args.sparkProperties) {
+      sysProps.getOrElseUpdate(k, v)
+    }
+
     // Read from default spark properties, if any
     for ((k, v) <- args.getDefaultSparkProperties) {
       sysProps.getOrElseUpdate(k, v)
     }

-    // Spark properties included on command line take precedence
-    sysProps ++= args.sparkProperties
-
     (childArgs, childClasspath, sysProps, childMainClass)
   }

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 15 additions & 11 deletions
@@ -58,7 +58,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

   parseOpts(args.toList)
-  loadDefaults()
+  mergeSparkProperties()
   checkRequiredArguments()

   /** Return default present in the currently defined defaults file. */
@@ -79,9 +79,11 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     defaultProperties
   }

-  /** Fill in any undefined values based on the current properties file or built-in defaults. */
-  private def loadDefaults(): Unit = {
-
+  /**
+   * Fill in any undefined values based on the default properties file or options passed in through
+   * the '--conf' flag.
+   */
+  private def mergeSparkProperties(): Unit = {
     // Use common defaults file, if not specified by user
     if (propertiesFile == null) {
       sys.env.get("SPARK_HOME").foreach { sparkHome =>
@@ -94,18 +96,20 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       }
     }

-    val defaultProperties = getDefaultSparkProperties
+    val properties = getDefaultSparkProperties
+    properties.putAll(sparkProperties)
+
     // Use properties file as fallback for values which have a direct analog to
     // arguments in this script.
-    master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
+    master = Option(master).getOrElse(properties.get("spark.master").orNull)
     executorMemory = Option(executorMemory)
-      .getOrElse(defaultProperties.get("spark.executor.memory").orNull)
+      .getOrElse(properties.get("spark.executor.memory").orNull)
     executorCores = Option(executorCores)
-      .getOrElse(defaultProperties.get("spark.executor.cores").orNull)
+      .getOrElse(properties.get("spark.executor.cores").orNull)
     totalExecutorCores = Option(totalExecutorCores)
-      .getOrElse(defaultProperties.get("spark.cores.max").orNull)
-    name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
-    jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
+      .getOrElse(properties.get("spark.cores.max").orNull)
+    name = Option(name).getOrElse(properties.get("spark.app.name").orNull)
+    jars = Option(jars).getOrElse(properties.get("spark.jars").orNull)

     // This supports env vars in older versions of Spark
     master = Option(master).getOrElse(System.getenv("MASTER"))

core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 32 additions & 0 deletions
@@ -20,7 +20,9 @@ package org.apache.spark.rdd
 import java.text.SimpleDateFormat
 import java.util.Date
 import java.io.EOFException
+
 import scala.collection.immutable.Map
+import scala.reflect.ClassTag

 import org.apache.hadoop.conf.{Configurable, Configuration}
 import org.apache.hadoop.mapred.FileSplit
@@ -39,6 +41,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.{DataReadMethod, InputMetrics}
+import org.apache.spark.rdd.HadoopRDD.HadoopMapPartitionsWithSplitRDD
 import org.apache.spark.util.NextIterator

 /**
@@ -232,6 +235,14 @@ class HadoopRDD[K, V](
     new InterruptibleIterator[(K, V)](context, iter)
   }

+  /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[U: ClassTag](
+      f: (InputSplit, Iterator[(K, V)]) => Iterator[U],
+      preservesPartitioning: Boolean = false): RDD[U] = {
+    new HadoopMapPartitionsWithSplitRDD(this, f, preservesPartitioning)
+  }
+
   override def getPreferredLocations(split: Partition): Seq[String] = {
     // TODO: Filtering out "localhost" in case of file:// URLs
     val hadoopSplit = split.asInstanceOf[HadoopPartition]
@@ -272,4 +283,25 @@ private[spark] object HadoopRDD {
     conf.setInt("mapred.task.partition", splitId)
     conf.set("mapred.job.id", jobID.toString)
   }
+
+  /**
+   * Analogous to [[org.apache.spark.rdd.MapPartitionsRDD]], but passes in an InputSplit to
+   * the given function rather than the index of the partition.
+   */
+  private[spark] class HadoopMapPartitionsWithSplitRDD[U: ClassTag, T: ClassTag](
+      prev: RDD[T],
+      f: (InputSplit, Iterator[T]) => Iterator[U],
+      preservesPartitioning: Boolean = false)
+    extends RDD[U](prev) {
+
+    override val partitioner = if (preservesPartitioning) firstParent[T].partitioner else None
+
+    override def getPartitions: Array[Partition] = firstParent[T].partitions
+
+    override def compute(split: Partition, context: TaskContext) = {
+      val partition = split.asInstanceOf[HadoopPartition]
+      val inputSplit = partition.inputSplit.value
+      f(inputSplit, firstParent[T].iterator(split, context))
+    }
+  }
 }
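A hedged sketch (not part of the commit) of calling the Scala-side API added above; the cast is needed because SparkContext.hadoopFile is declared to return RDD[(K, V)] even though the runtime class is HadoopRDD, and the master URL, input path, object name, and tagging logic are illustrative assumptions:

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}

import org.apache.spark.SparkContext
import org.apache.spark.rdd.HadoopRDD

object ScalaInputSplitSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "input-split-sketch")  // hypothetical master and app name

    // hadoopFile returns an RDD[(K, V)] whose concrete class is HadoopRDD, so cast to
    // reach the new mapPartitionsWithInputSplit method.
    val hadoopRDD = sc.hadoopFile("/tmp/input", classOf[TextInputFormat],
      classOf[LongWritable], classOf[Text]).asInstanceOf[HadoopRDD[LongWritable, Text]]

    // Prefix each record with the name of the file backing its split.
    val tagged = hadoopRDD.mapPartitionsWithInputSplit { (split: InputSplit, iter) =>
      val file = split.asInstanceOf[FileSplit].getPath.getName
      iter.map { case (_, line) => file + "\t" + line.toString }
    }

    tagged.collect().foreach(println)
    sc.stop()
  }
}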
