
Commit a1b8af1

Author: Marcelo Vanzin
Merge branch 'master' into SPARK-4924
2 parents: 897141f + 70f8814

144 files changed (+1549, -1051 lines)


assembly/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
     <version>1.3.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

bagel/pom.xml

Lines changed: 1 addition & 1 deletion
@@ -20,7 +20,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
     <version>1.3.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>

core/pom.xml

Lines changed: 7 additions & 1 deletion
@@ -20,7 +20,7 @@
   <modelVersion>4.0.0</modelVersion>
   <parent>
     <groupId>org.apache.spark</groupId>
-    <artifactId>spark-parent</artifactId>
+    <artifactId>spark-parent_2.10</artifactId>
     <version>1.3.0-SNAPSHOT</version>
     <relativePath>../pom.xml</relativePath>
   </parent>
@@ -324,6 +324,12 @@
       <artifactId>selenium-java</artifactId>
       <scope>test</scope>
     </dependency>
+    <!-- Added for selenium: -->
+    <dependency>
+      <groupId>xml-apis</groupId>
+      <artifactId>xml-apis</artifactId>
+      <scope>test</scope>
+    </dependency>
     <dependency>
       <groupId>org.mockito</groupId>
       <artifactId>mockito-all</artifactId>

core/src/main/scala/org/apache/spark/ContextCleaner.scala

Lines changed: 27 additions & 13 deletions
@@ -105,9 +105,19 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
     cleaningThread.start()
   }
 
-  /** Stop the cleaner. */
+  /**
+   * Stop the cleaning thread and wait until the thread has finished running its current task.
+   */
   def stop() {
     stopped = true
+    // Interrupt the cleaning thread, but wait until the current task has finished before
+    // doing so. This guards against the race condition where a cleaning thread may
+    // potentially clean similarly named variables created by a different SparkContext,
+    // resulting in otherwise inexplicable block-not-found exceptions (SPARK-6132).
+    synchronized {
+      cleaningThread.interrupt()
+    }
+    cleaningThread.join()
   }
 
   /** Register a RDD for cleanup when it is garbage collected. */
@@ -140,21 +150,25 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
           .map(_.asInstanceOf[CleanupTaskWeakReference])
-        reference.map(_.task).foreach { task =>
-          logDebug("Got cleaning task " + task)
-          referenceBuffer -= reference.get
-          task match {
-            case CleanRDD(rddId) =>
-              doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
-            case CleanShuffle(shuffleId) =>
-              doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
-            case CleanBroadcast(broadcastId) =>
-              doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
-            case CleanAccum(accId) =>
-              doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+        // Synchronize here to avoid being interrupted on stop()
+        synchronized {
+          reference.map(_.task).foreach { task =>
+            logDebug("Got cleaning task " + task)
+            referenceBuffer -= reference.get
+            task match {
+              case CleanRDD(rddId) =>
+                doCleanupRDD(rddId, blocking = blockOnCleanupTasks)
+              case CleanShuffle(shuffleId) =>
+                doCleanupShuffle(shuffleId, blocking = blockOnShuffleCleanupTasks)
+              case CleanBroadcast(broadcastId) =>
+                doCleanupBroadcast(broadcastId, blocking = blockOnCleanupTasks)
+              case CleanAccum(accId) =>
+                doCleanupAccum(accId, blocking = blockOnCleanupTasks)
+            }
           }
         }
       } catch {
+        case ie: InterruptedException if stopped => // ignore
        case e: Exception => logError("Error in cleaning thread", e)
      }
    }
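The ContextCleaner change is essentially an interrupt-guard pattern: stop() may interrupt the polling thread only between tasks, never while a cleanup task is running. A minimal, standalone sketch of the same pattern (PollingWorker, pollNextTask and runTask-style names are illustrative stand-ins, not Spark's actual classes):

// Sketch of the stop/interrupt discipline used above: interrupt only while holding
// the same lock the worker holds while it runs a task.
class PollingWorker {
  @volatile private var stopped = false

  private val thread = new Thread("polling-worker") {
    override def run(): Unit = {
      while (!stopped) {
        try {
          val task = pollNextTask() // may block with a timeout, may return None
          // Hold the lock while the task runs so stop() cannot interrupt mid-task.
          PollingWorker.this.synchronized {
            task.foreach(t => t())
          }
        } catch {
          case _: InterruptedException if stopped => // expected during shutdown
          case e: Exception => println(s"Error in polling thread: $e")
        }
      }
    }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()

  def stop(): Unit = {
    stopped = true
    // Taking the lock first means any in-flight task finishes before the interrupt lands.
    synchronized { thread.interrupt() }
    thread.join()
  }

  // Illustrative placeholder for the reference-queue poll.
  private def pollNextTask(): Option[() => Unit] = None
}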

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions
@@ -1392,10 +1392,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   /** Shut down the SparkContext. */
   def stop() {
     SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
-      postApplicationEnd()
-      ui.foreach(_.stop())
       if (!stopped) {
         stopped = true
+        postApplicationEnd()
+        ui.foreach(_.stop())
         env.metricsSystem.report()
         metadataCleaner.cancel()
         cleaner.foreach(_.stop())
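Moving postApplicationEnd() and ui.foreach(_.stop()) inside the existing if (!stopped) guard makes the shutdown side effects run at most once, so a repeated or racing stop() no longer re-posts the application-end event or touches an already-stopped UI. A reduced sketch of that idempotent-stop idiom (illustrative names, not the SparkContext API):

class StoppableService {
  private val lock = new Object
  private var stopped = false

  def stop(): Unit = lock.synchronized {
    if (!stopped) {
      stopped = true
      // All side effects live inside the guard, so calling stop() twice is harmless.
      postShutdownEvent()
      closeUi()
    }
  }

  private def postShutdownEvent(): Unit = println("application end posted")
  private def closeUi(): Unit = println("web UI stopped")
}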

core/src/main/scala/org/apache/spark/deploy/LocalSparkCluster.scala

Lines changed: 1 addition & 1 deletion
@@ -59,7 +59,7 @@ class LocalSparkCluster(
     /* Start the Workers */
     for (workerNum <- 1 to numWorkers) {
       val (workerSystem, _) = Worker.startSystemAndActor(localHostname, 0, 0, coresPerWorker,
-        memoryPerWorker, masters, null, Some(workerNum))
+        memoryPerWorker, masters, null, Some(workerNum), _conf)
       workerActorSystems += workerSystem
     }

core/src/main/scala/org/apache/spark/deploy/master/Master.scala

Lines changed: 20 additions & 16 deletions
@@ -96,7 +96,7 @@ private[spark] class Master(
   val webUi = new MasterWebUI(this, webUiPort)
 
   val masterPublicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
 
@@ -736,30 +736,34 @@ private[spark] class Master(
     val appName = app.desc.name
     val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
     try {
-      val eventLogFile = app.desc.eventLogDir
-        .map { dir => EventLoggingListener.getLogPath(dir, app.id, app.desc.eventLogCodec) }
+      val eventLogDir = app.desc.eventLogDir
         .getOrElse {
           // Event logging is not enabled for this application
           app.desc.appUiUrl = notFoundBasePath
           return false
         }
-
-      val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)
-
-      if (fs.exists(new Path(eventLogFile + EventLoggingListener.IN_PROGRESS))) {
+
+      val eventLogFilePrefix = EventLoggingListener.getLogPath(
+        eventLogDir, app.id, app.desc.eventLogCodec)
+      val fs = Utils.getHadoopFileSystem(eventLogDir, hadoopConf)
+      val inProgressExists = fs.exists(new Path(eventLogFilePrefix +
+        EventLoggingListener.IN_PROGRESS))
+
+      if (inProgressExists) {
         // Event logging is enabled for this application, but the application is still in progress
-        val title = s"Application history not found (${app.id})"
-        var msg = s"Application $appName is still in progress."
-        logWarning(msg)
-        msg = URLEncoder.encode(msg, "UTF-8")
-        app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
-        return false
+        logWarning(s"Application $appName is still in progress, it may be terminated abnormally.")
       }
-
+
+      val (eventLogFile, status) = if (inProgressExists) {
+        (eventLogFilePrefix + EventLoggingListener.IN_PROGRESS, " (in progress)")
+      } else {
+        (eventLogFilePrefix, " (completed)")
+      }
+
       val logInput = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
       val replayBus = new ReplayListenerBus()
       val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
-        appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
+        appName + status, HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
       try {
         replayBus.replay(logInput, eventLogFile)
       } finally {
@@ -774,7 +778,7 @@ private[spark] class Master(
       case fnf: FileNotFoundException =>
         // Event logging is enabled for this application, but no event logs are found
         val title = s"Application history not found (${app.id})"
-        var msg = s"No event logs found for application $appName in ${app.desc.eventLogDir}."
+        var msg = s"No event logs found for application $appName in ${app.desc.eventLogDir.get}."
        logWarning(msg)
        msg += " Did you specify the correct logging directory?"
        msg = URLEncoder.encode(msg, "UTF-8")
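The rebuilt logic now resolves the event-log path prefix first, then picks either the completed file or its in-progress variant and labels the history UI accordingly, instead of refusing to show anything for a still-running application. A simplified, standalone sketch of that selection step (plain java.nio.file instead of Hadoop's FileSystem, and a hard-coded ".inprogress" suffix, purely for illustration):

import java.nio.file.{Files, Paths}

// Given the log path prefix for an application, choose the file to replay
// and a status suffix for the UI title.
def resolveEventLog(eventLogFilePrefix: String): (String, String) = {
  val inProgress = eventLogFilePrefix + ".inprogress"
  if (Files.exists(Paths.get(inProgress))) {
    // The application has not finished (or ended abnormally); replay what exists so far.
    (inProgress, " (in progress)")
  } else {
    (eventLogFilePrefix, " (completed)")
  }
}

// e.g. val (eventLogFile, status) = resolveEventLog("/var/spark-events/app-20150301-0001")
//      val title = appName + status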

core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala

Lines changed: 3 additions & 1 deletion
@@ -44,6 +44,7 @@ private[spark] class ExecutorRunner(
     val workerId: String,
     val host: String,
     val webUiPort: Int,
+    val publicAddress: String,
     val sparkHome: File,
     val executorDir: File,
     val workerUrl: String,
@@ -140,7 +141,8 @@ private[spark] class ExecutorRunner(
     builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
 
     // Add webUI log urls
-    val baseUrl = s"http://$host:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
+    val baseUrl =
+      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
     builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
     builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
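With the extra publicAddress constructor parameter, executor log URLs are built from the worker's externally reachable address (SPARK_PUBLIC_DNS if set, otherwise the bind host) and the port the web UI actually bound to. A small sketch of that derivation (a hypothetical helper, not the ExecutorRunner API):

// Prefer an explicitly configured public DNS name over the worker's bind host.
def executorLogBaseUrl(
    publicDns: Option[String],
    bindHost: String,
    boundWebUiPort: Int,
    appId: String,
    execId: Int): String = {
  val address = publicDns.getOrElse(bindHost)
  s"http://$address:$boundWebUiPort/logPage/?appId=$appId&executorId=$execId&logType="
}

// e.g. executorLogBaseUrl(sys.env.get("SPARK_PUBLIC_DNS"), "10.0.0.5", 8081, "app-123", 0) + "stderr"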

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 5 additions & 4 deletions
@@ -121,7 +121,7 @@ private[spark] class Worker(
   val shuffleService = new StandaloneWorkerShuffleService(conf, securityMgr)
 
   val publicAddress = {
-    val envVar = System.getenv("SPARK_PUBLIC_DNS")
+    val envVar = conf.getenv("SPARK_PUBLIC_DNS")
     if (envVar != null) envVar else host
   }
   var webUi: WorkerWebUI = null
@@ -362,7 +362,8 @@ private[spark] class Worker(
           self,
           workerId,
           host,
-          webUiPort,
+          webUi.boundPort,
+          publicAddress,
           sparkHome,
           executorDir,
           akkaUrl,
@@ -538,10 +539,10 @@ private[spark] object Worker extends Logging {
       memory: Int,
       masterUrls: Array[String],
       workDir: String,
-      workerNumber: Option[Int] = None): (ActorSystem, Int) = {
+      workerNumber: Option[Int] = None,
+      conf: SparkConf = new SparkConf): (ActorSystem, Int) = {
 
     // The LocalSparkCluster runs multiple local sparkWorkerX actor systems
-    val conf = new SparkConf
     val systemName = "sparkWorker" + workerNumber.map(_.toString).getOrElse("")
     val actorName = "Worker"
     val securityMgr = new SecurityManager(conf)
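Turning the SparkConf into a defaulted parameter of startSystemAndActor lets embedded callers such as LocalSparkCluster thread their own configuration through (which is why the LocalSparkCluster hunk above now passes _conf), while the command-line path keeps getting a fresh one. A reduced sketch of the default-parameter pattern (Conf and WorkerLauncher are illustrative stand-ins, not the Worker API):

// Stand-in for SparkConf, just to keep the sketch self-contained.
final case class Conf(settings: Map[String, String] = Map.empty)

object WorkerLauncher {
  // Embedded callers pass the configuration they already hold;
  // plain start-up relies on the default and builds a fresh one.
  def startSystemAndActor(
      host: String,
      port: Int,
      workerNumber: Option[Int] = None,
      conf: Conf = Conf()): Unit = {
    println(s"starting worker ${workerNumber.getOrElse(1)} on $host:$port with ${conf.settings}")
  }
}

// An embedded test cluster reusing its own conf:
// WorkerLauncher.startSystemAndActor("localhost", 0, Some(1), Conf(Map("spark.testing" -> "true")))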

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MemoryUtils.scala

Lines changed: 1 addition & 1 deletion
@@ -21,7 +21,7 @@ import org.apache.spark.SparkContext
 
 private[spark] object MemoryUtils {
   // These defaults copied from YARN
-  val OVERHEAD_FRACTION = 1.07
+  val OVERHEAD_FRACTION = 1.10
   val OVERHEAD_MINIMUM = 384
 
   def calculateTotalMemory(sc: SparkContext) = {
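The bump from 1.07 to 1.10 only matters once the fractional overhead exceeds the fixed minimum. Assuming calculateTotalMemory applies the usual YARN-style rule of taking the larger of the fractional overhead and the fixed minimum (the method body is not shown in this hunk, so that combination is an assumption here), a worked example:

// Hedged sketch: assumes the YARN-style rule max(fraction * memory, memory + minimum).
val OVERHEAD_FRACTION = 1.10
val OVERHEAD_MINIMUM = 384

def totalMemoryMb(executorMemoryMb: Int): Int =
  math.max(OVERHEAD_FRACTION * executorMemoryMb, executorMemoryMb + OVERHEAD_MINIMUM.toDouble).toInt

// A 4096 MB executor would now reserve max(1.10 * 4096, 4096 + 384) = 4505 MB,
// up from max(1.07 * 4096, 4480) = 4480 MB with the old fraction.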
