
Commit a9ec384

Merge branch 'master' of github.com:apache/spark into fix-drop-events
2 parents: b12fcd7 + f68105d

File tree

33 files changed: +460, -129 lines

core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala (new file)

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.mapred.InputSplit
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.java.JavaSparkContext._
+import org.apache.spark.api.java.function.{Function2 => JFunction2}
+import org.apache.spark.rdd.HadoopRDD
+
+@DeveloperApi
+class JavaHadoopRDD[K, V](rdd: HadoopRDD[K, V])
+    (implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
+  extends JavaPairRDD[K, V](rdd) {
+
+  /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[R](
+      f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
+      preservesPartitioning: Boolean = false): JavaRDD[R] = {
+    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
+      preservesPartitioning)(fakeClassTag))(fakeClassTag)
+  }
+}
core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala (new file)

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.java
+
+import scala.collection.JavaConversions._
+import scala.reflect.ClassTag
+
+import org.apache.hadoop.mapreduce.InputSplit
+
+import org.apache.spark.annotation.DeveloperApi
+import org.apache.spark.api.java.JavaSparkContext._
+import org.apache.spark.api.java.function.{Function2 => JFunction2}
+import org.apache.spark.rdd.NewHadoopRDD
+
+@DeveloperApi
+class JavaNewHadoopRDD[K, V](rdd: NewHadoopRDD[K, V])
+    (implicit override val kClassTag: ClassTag[K], implicit override val vClassTag: ClassTag[V])
+  extends JavaPairRDD[K, V](rdd) {
+
+  /** Maps over a partition, providing the InputSplit that was used as the base of the partition. */
+  @DeveloperApi
+  def mapPartitionsWithInputSplit[R](
+      f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]],
+      preservesPartitioning: Boolean = false): JavaRDD[R] = {
+    new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)),
+      preservesPartitioning)(fakeClassTag))(fakeClassTag)
+  }
+}
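Both new wrappers simply delegate to the mapPartitionsWithInputSplit method exposed by the underlying Scala HadoopRDD and NewHadoopRDD. As a rough illustration of what that method gives callers, the sketch below is not part of this commit: the input path is made up, and the cast mirrors the one JavaSparkContext now performs (see its diff below). It pairs every record of an old-API Hadoop RDD with the file it was read from.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.{FileSplit, InputSplit, TextInputFormat}
import org.apache.spark.SparkContext
import org.apache.spark.rdd.HadoopRDD

// Sketch only: tag each line with the path of the file it came from.
def linesWithSourceFile(sc: SparkContext) = {
  val rdd = sc.hadoopFile[LongWritable, Text, TextInputFormat]("hdfs:///tmp/input")
  rdd.asInstanceOf[HadoopRDD[LongWritable, Text]]
    .mapPartitionsWithInputSplit { (split: InputSplit, iter: Iterator[(LongWritable, Text)]) =>
      val file = split.asInstanceOf[FileSplit].getPath.toString
      iter.map { case (_, line) => (file, line.toString) }
    }
}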

core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala

Lines changed: 13 additions & 8 deletions
@@ -34,7 +34,7 @@ import org.apache.spark._
 import org.apache.spark.SparkContext.{DoubleAccumulatorParam, IntAccumulatorParam}
 import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.rdd.{EmptyRDD, RDD}
+import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, NewHadoopRDD, RDD}

 /**
  * A Java-friendly version of [[org.apache.spark.SparkContext]] that returns
@@ -294,7 +294,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions))
+    val rdd = sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minPartitions)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }

   /**
@@ -314,7 +315,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
+    val rdd = sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }

   /** Get an RDD for a Hadoop file with an arbitrary InputFormat.
@@ -333,7 +335,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions))
+    val rdd = sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minPartitions)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }

   /** Get an RDD for a Hadoop file with an arbitrary InputFormat
@@ -351,8 +354,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
     ): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(keyClass)
     implicit val ctagV: ClassTag[V] = ClassTag(valueClass)
-    new JavaPairRDD(sc.hadoopFile(path,
-      inputFormatClass, keyClass, valueClass))
+    val rdd = sc.hadoopFile(path, inputFormatClass, keyClass, valueClass)
+    new JavaHadoopRDD(rdd.asInstanceOf[HadoopRDD[K, V]])
   }

   /**
@@ -372,7 +375,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
       conf: Configuration): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(kClass)
     implicit val ctagV: ClassTag[V] = ClassTag(vClass)
-    new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
+    val rdd = sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf)
+    new JavaNewHadoopRDD(rdd.asInstanceOf[NewHadoopRDD[K, V]])
   }

   /**
@@ -391,7 +395,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
       vClass: Class[V]): JavaPairRDD[K, V] = {
     implicit val ctagK: ClassTag[K] = ClassTag(kClass)
     implicit val ctagV: ClassTag[V] = ClassTag(vClass)
-    new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
+    val rdd = sc.newAPIHadoopRDD(conf, fClass, kClass, vClass)
+    new JavaNewHadoopRDD(rdd.asInstanceOf[NewHadoopRDD[K, V]])
   }

   /** Build the union of two or more RDDs. */
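For Java-API users the practical effect of these changes is that the JavaPairRDD returned by the hadoopRDD/hadoopFile and newAPIHadoop* methods is now concretely a JavaHadoopRDD or JavaNewHadoopRDD, so it can be downcast to reach the new method. A minimal sketch (the path is hypothetical and the helper name is ours, not part of this commit):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat
import org.apache.spark.api.java.{JavaHadoopRDD, JavaSparkContext}

// The cast is safe after this change because hadoopFile now constructs a JavaHadoopRDD.
def asJavaHadoopRdd(jsc: JavaSparkContext): JavaHadoopRDD[LongWritable, Text] = {
  val pairs = jsc.hadoopFile("hdfs:///tmp/input",
    classOf[TextInputFormat], classOf[LongWritable], classOf[Text])
  pairs.asInstanceOf[JavaHadoopRDD[LongWritable, Text]]
}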

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 7 additions & 4 deletions
@@ -184,7 +184,7 @@ object SparkSubmit {
       OptionAssigner(args.archives, YARN, CLIENT, sysProp = "spark.yarn.dist.archives"),

       // Yarn cluster only
-      OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name", sysProp = "spark.app.name"),
+      OptionAssigner(args.name, YARN, CLUSTER, clOption = "--name"),
       OptionAssigner(args.driverMemory, YARN, CLUSTER, clOption = "--driver-memory"),
       OptionAssigner(args.queue, YARN, CLUSTER, clOption = "--queue"),
       OptionAssigner(args.numExecutors, YARN, CLUSTER, clOption = "--num-executors"),
@@ -268,14 +268,17 @@ object SparkSubmit {
       }
     }

+    // Properties given with --conf are superseded by other options, but take precedence over
+    // properties in the defaults file.
+    for ((k, v) <- args.sparkProperties) {
+      sysProps.getOrElseUpdate(k, v)
+    }
+
     // Read from default spark properties, if any
     for ((k, v) <- args.getDefaultSparkProperties) {
       sysProps.getOrElseUpdate(k, v)
     }

-    // Spark properties included on command line take precedence
-    sysProps ++= args.sparkProperties
-
     (childArgs, childClasspath, sysProps, childMainClass)
   }
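The resulting precedence is: values set by explicit spark-submit options win, then properties passed with --conf, then entries from the defaults file, because getOrElseUpdate only fills keys that are still missing. A self-contained sketch of that layering (all property values are made up):

import scala.collection.mutable

val sysProps  = mutable.HashMap("spark.app.name" -> "FromExplicitOption")  // set earlier by an OptionAssigner
val confFlags = Map("spark.app.name" -> "FromConfFlag", "spark.ui.port" -> "4050")
val defaults  = Map("spark.ui.port" -> "4040", "spark.master" -> "local[*]")

for ((k, v) <- confFlags) sysProps.getOrElseUpdate(k, v)  // --conf fills only missing keys
for ((k, v) <- defaults) sysProps.getOrElseUpdate(k, v)   // defaults file fills what is left
// Result: spark.app.name -> FromExplicitOption, spark.ui.port -> 4050, spark.master -> local[*]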

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 15 additions & 11 deletions
@@ -58,7 +58,7 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
   val sparkProperties: HashMap[String, String] = new HashMap[String, String]()

   parseOpts(args.toList)
-  loadDefaults()
+  mergeSparkProperties()
   checkRequiredArguments()

   /** Return default present in the currently defined defaults file. */
@@ -79,9 +79,11 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
     defaultProperties
   }

-  /** Fill in any undefined values based on the current properties file or built-in defaults. */
-  private def loadDefaults(): Unit = {
-
+  /**
+   * Fill in any undefined values based on the default properties file or options passed in through
+   * the '--conf' flag.
+   */
+  private def mergeSparkProperties(): Unit = {
     // Use common defaults file, if not specified by user
     if (propertiesFile == null) {
       sys.env.get("SPARK_HOME").foreach { sparkHome =>
@@ -94,18 +96,20 @@ private[spark] class SparkSubmitArguments(args: Seq[String]) {
       }
     }

-    val defaultProperties = getDefaultSparkProperties
+    val properties = getDefaultSparkProperties
+    properties.putAll(sparkProperties)
+
     // Use properties file as fallback for values which have a direct analog to
     // arguments in this script.
-    master = Option(master).getOrElse(defaultProperties.get("spark.master").orNull)
+    master = Option(master).getOrElse(properties.get("spark.master").orNull)
     executorMemory = Option(executorMemory)
-      .getOrElse(defaultProperties.get("spark.executor.memory").orNull)
+      .getOrElse(properties.get("spark.executor.memory").orNull)
     executorCores = Option(executorCores)
-      .getOrElse(defaultProperties.get("spark.executor.cores").orNull)
+      .getOrElse(properties.get("spark.executor.cores").orNull)
     totalExecutorCores = Option(totalExecutorCores)
-      .getOrElse(defaultProperties.get("spark.cores.max").orNull)
-    name = Option(name).getOrElse(defaultProperties.get("spark.app.name").orNull)
-    jars = Option(jars).getOrElse(defaultProperties.get("spark.jars").orNull)
+      .getOrElse(properties.get("spark.cores.max").orNull)
+    name = Option(name).getOrElse(properties.get("spark.app.name").orNull)
+    jars = Option(jars).getOrElse(properties.get("spark.jars").orNull)

     // This supports env vars in older versions of Spark
     master = Option(master).getOrElse(System.getenv("MASTER"))
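For the individual arguments, the fallback chain therefore becomes: explicit flag, then the merged properties (--conf layered over the defaults file), then the legacy environment variable. A sketch of that chain for spark.master (the helper name is hypothetical, not code from this commit):

// Hypothetical helper mirroring how `master` is resolved after this change.
def resolveMaster(flagValue: String, mergedProps: Map[String, String]): String =
  Option(flagValue)                           // --master on the command line
    .orElse(mergedProps.get("spark.master"))  // --conf or spark-defaults.conf
    .orElse(sys.env.get("MASTER"))            // legacy environment variable
    .orNull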

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 18 additions & 16 deletions
@@ -36,11 +36,11 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     conf.getInt("spark.history.updateInterval", 10)) * 1000

   private val logDir = conf.get("spark.history.fs.logDirectory", null)
-  if (logDir == null) {
-    throw new IllegalArgumentException("Logging directory must be specified.")
-  }
+  private val resolvedLogDir = Option(logDir)
+    .map { d => Utils.resolveURI(d) }
+    .getOrElse { throw new IllegalArgumentException("Logging directory must be specified.") }

-  private val fs = Utils.getHadoopFileSystem(logDir)
+  private val fs = Utils.getHadoopFileSystem(resolvedLogDir)

   // A timestamp of when the disk was last accessed to check for log updates
   private var lastLogCheckTimeMs = -1L
@@ -76,14 +76,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

   private def initialize() {
     // Validate the log directory.
-    val path = new Path(logDir)
+    val path = new Path(resolvedLogDir)
     if (!fs.exists(path)) {
       throw new IllegalArgumentException(
-        "Logging directory specified does not exist: %s".format(logDir))
+        "Logging directory specified does not exist: %s".format(resolvedLogDir))
     }
     if (!fs.getFileStatus(path).isDir) {
       throw new IllegalArgumentException(
-        "Logging directory specified is not a directory: %s".format(logDir))
+        "Logging directory specified is not a directory: %s".format(resolvedLogDir))
     }

     checkForLogs()
@@ -95,15 +95,16 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis

   override def getAppUI(appId: String): SparkUI = {
     try {
-      val appLogDir = fs.getFileStatus(new Path(logDir, appId))
-      loadAppInfo(appLogDir, true)._2
+      val appLogDir = fs.getFileStatus(new Path(resolvedLogDir.toString, appId))
+      val (_, ui) = loadAppInfo(appLogDir, renderUI = true)
+      ui
     } catch {
       case e: FileNotFoundException => null
     }
   }

   override def getConfig(): Map[String, String] =
-    Map(("Event Log Location" -> logDir))
+    Map("Event Log Location" -> resolvedLogDir.toString)

   /**
    * Builds the application list based on the current contents of the log directory.
@@ -114,14 +115,14 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
     lastLogCheckTimeMs = getMonotonicTimeMs()
     logDebug("Checking for logs. Time is now %d.".format(lastLogCheckTimeMs))
     try {
-      val logStatus = fs.listStatus(new Path(logDir))
+      val logStatus = fs.listStatus(new Path(resolvedLogDir))
       val logDirs = if (logStatus != null) logStatus.filter(_.isDir).toSeq else Seq[FileStatus]()
-      val logInfos = logDirs.filter {
-        dir => fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))
+      val logInfos = logDirs.filter { dir =>
+        fs.isFile(new Path(dir.getPath, EventLoggingListener.APPLICATION_COMPLETE))
       }

       val currentApps = Map[String, ApplicationHistoryInfo](
-        appList.map(app => (app.id -> app)):_*)
+        appList.map(app => app.id -> app):_*)

       // For any application that either (i) is not listed or (ii) has changed since the last time
       // the listing was created (defined by the log dir's modification time), load the app's info.
@@ -131,7 +132,8 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
         val curr = currentApps.getOrElse(dir.getPath().getName(), null)
         if (curr == null || curr.lastUpdated < getModificationTime(dir)) {
           try {
-            newApps += loadAppInfo(dir, false)._1
+            val (app, _) = loadAppInfo(dir, renderUI = false)
+            newApps += app
           } catch {
             case e: Exception => logError(s"Failed to load app info from directory $dir.")
           }
@@ -159,9 +161,9 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    * @return A 2-tuple `(app info, ui)`. `ui` will be null if `renderUI` is false.
    */
   private def loadAppInfo(logDir: FileStatus, renderUI: Boolean) = {
-    val elogInfo = EventLoggingListener.parseLoggingInfo(logDir.getPath(), fs)
     val path = logDir.getPath
     val appId = path.getName
+    val elogInfo = EventLoggingListener.parseLoggingInfo(path, fs)
     val replayBus = new ReplayListenerBus(elogInfo.logPaths, fs, elogInfo.compressionCodec)
     val appListener = new ApplicationEventListener
     replayBus.addListener(appListener)
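The null check on the log directory gives way to an Option(...).map(...).getOrElse(throw ...) chain that resolves the configured path and keeps the old failure mode when the setting is absent. A stand-alone sketch of the pattern (java.net.URI stands in for Utils.resolveURI, which is internal to Spark):

import java.net.URI

// Resolve the configured directory if present, otherwise fail exactly as before.
def resolveLogDir(configured: String): URI =
  Option(configured)
    .map(d => new URI(d))   // the real code calls Utils.resolveURI(d)
    .getOrElse {
      throw new IllegalArgumentException("Logging directory must be specified.")
    }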

core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala

Lines changed: 1 addition & 1 deletion
@@ -45,7 +45,7 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
       <div class="row-fluid">
         <div class="span12">
           <ul class="unstyled">
-            { providerConfig.map(e => <li><strong>{e._1}:</strong> {e._2}</li>) }
+            {providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
           </ul>
           {
             if (allApps.size > 0) {

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 3 additions & 3 deletions
@@ -25,9 +25,9 @@ import org.eclipse.jetty.servlet.{ServletContextHandler, ServletHolder}

 import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.ui.{WebUI, SparkUI, UIUtils}
+import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.{SignalLogger, Utils}
+import org.apache.spark.util.SignalLogger

 /**
  * A web server that renders SparkUIs of completed applications.
@@ -177,7 +177,7 @@ object HistoryServer extends Logging {
   def main(argStrings: Array[String]) {
     SignalLogger.register(log)
     initSecurity()
-    val args = new HistoryServerArguments(conf, argStrings)
+    new HistoryServerArguments(conf, argStrings)
     val securityManager = new SecurityManager(conf)

     val providerName = conf.getOption("spark.history.provider")

core/src/main/scala/org/apache/spark/deploy/history/HistoryServerArguments.scala

Lines changed: 1 addition & 4 deletions
@@ -18,7 +18,6 @@
 package org.apache.spark.deploy.history

 import org.apache.spark.SparkConf
-import org.apache.spark.util.Utils

 /**
  * Command-line parser for the master.
@@ -32,6 +31,7 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
     args match {
       case ("--dir" | "-d") :: value :: tail =>
         logDir = value
+        conf.set("spark.history.fs.logDirectory", value)
         parse(tail)

       case ("--help" | "-h") :: tail =>
@@ -42,9 +42,6 @@ private[spark] class HistoryServerArguments(conf: SparkConf, args: Array[String]
       case _ =>
         printUsageAndExit(1)
     }
-    if (logDir != null) {
-      conf.set("spark.history.fs.logDirectory", logDir)
-    }
   }

   private def printUsageAndExit(exitCode: Int) {
