
Commit 26c56b7

Merge branch 'master' into rpc-rewrite
Conflicts:
	core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
	core/src/main/scala/org/apache/spark/SparkContext.scala
	core/src/main/scala/org/apache/spark/util/Utils.scala
	yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
2 parents 9cc825a + 9b40c17 commit 26c56b7

File tree

200 files changed: +4606 -2067 lines

core/src/main/resources/org/apache/spark/ui/static/additional-metrics.js

Lines changed: 4 additions & 4 deletions
@@ -30,7 +30,7 @@ $(function() {
     stripeSummaryTable();
 
-    $("input:checkbox").click(function() {
+    $('input[type="checkbox"]').click(function() {
         var column = "table ." + $(this).attr("name");
         $(column).toggle();
         stripeSummaryTable();
@@ -39,15 +39,15 @@ $(function() {
     $("#select-all-metrics").click(function() {
         if (this.checked) {
            // Toggle all un-checked options.
-            $('input:checkbox:not(:checked)').trigger('click');
+            $('input[type="checkbox"]:not(:checked)').trigger('click');
        } else {
            // Toggle all checked options.
-            $('input:checkbox:checked').trigger('click');
+            $('input[type="checkbox"]:checked').trigger('click');
        }
    });

    // Trigger a click on the checkbox if a user clicks the label next to it.
    $("span.additional-metric-title").click(function() {
-        $(this).parent().find('input:checkbox').trigger('click');
+        $(this).parent().find('input[type="checkbox"]').trigger('click');
    });
});

core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala

Lines changed: 18 additions & 20 deletions
@@ -17,10 +17,12 @@
 
 package org.apache.spark
 
+import java.util.concurrent.{Executors, TimeUnit}
+
 import scala.collection.mutable
 
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{SystemClock, Clock}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -129,6 +131,10 @@ private[spark] class ExecutorAllocationManager(
   // Listener for Spark events that impact the allocation policy
   private val listener = new ExecutorAllocationListener
 
+  // Executor that handles the scheduling task.
+  private val executor = Executors.newSingleThreadScheduledExecutor(
+    Utils.namedThreadFactory("spark-dynamic-executor-allocation"))
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -173,32 +179,24 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Register for scheduler callbacks to decide when to add and remove executors.
+   * Register for scheduler callbacks to decide when to add and remove executors, and start
+   * the scheduling task.
    */
   def start(): Unit = {
     listenerBus.addListener(listener)
-    startPolling()
+
+    val scheduleTask = new Runnable() {
+      override def run(): Unit = Utils.logUncaughtExceptions(schedule())
+    }
+    executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
   }
 
   /**
-   * Start the main polling thread that keeps track of when to add and remove executors.
+   * Stop the allocation manager.
    */
-  private def startPolling(): Unit = {
-    val t = new Thread {
-      override def run(): Unit = {
-        while (true) {
-          try {
-            schedule()
-          } catch {
-            case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
-          }
-          Thread.sleep(intervalMillis)
-        }
-      }
-    }
-    t.setName("spark-dynamic-executor-allocation")
-    t.setDaemon(true)
-    t.start()
+  def stop(): Unit = {
+    executor.shutdown()
+    executor.awaitTermination(10, TimeUnit.SECONDS)
   }
 
   /**
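The change above swaps a hand-rolled while (true) / Thread.sleep polling thread for a single-thread ScheduledExecutorService. A minimal, self-contained sketch of that pattern follows; PollingSketch and namedDaemonFactory are illustrative stand-ins (the real code uses Utils.namedThreadFactory and Utils.logUncaughtExceptions, which are not shown in this diff):

import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

object PollingSketch {
  // Stand-in for a named-thread-factory helper: daemon threads with a readable name.
  private def namedDaemonFactory(name: String): ThreadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true)
      t
    }
  }

  private val executor = Executors.newSingleThreadScheduledExecutor(
    namedDaemonFactory("spark-dynamic-executor-allocation"))

  // Schedule `body` at a fixed rate; catch per-run failures so one bad run
  // does not cancel the periodic schedule.
  def start(intervalMillis: Long)(body: => Unit): Unit = {
    val task = new Runnable {
      override def run(): Unit =
        try body catch { case e: Throwable => e.printStackTrace() }
    }
    executor.scheduleAtFixedRate(task, 0, intervalMillis, TimeUnit.MILLISECONDS)
  }

  def stop(): Unit = {
    executor.shutdown()
    executor.awaitTermination(10, TimeUnit.SECONDS)
  }
}

Compared with the removed loop, this gives a named daemon thread, fixed-rate scheduling, and an explicit shutdown path (shutdown plus awaitTermination), which is what the new stop() method in the diff relies on.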

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 25 additions & 7 deletions
@@ -37,18 +37,26 @@ private[spark] case class Heartbeat(
     taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
     blockManagerId: BlockManagerId)
 
+/**
+ * An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
+ * created.
+ */
+private[spark] case object TaskSchedulerIsSet
+
 private[spark] case object ExpireDeadHosts
 
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext)
   extends ThreadSafeRpcEndpoint with Logging {
 
   override val rpcEnv: RpcEnv = sc.env.rpcEnv
 
+  private[spark] var scheduler: TaskScheduler = null
+
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new mutable.HashMap[String, Long]
 
@@ -82,15 +90,25 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
   override def receive: PartialFunction[Any, Unit] = {
     case ExpireDeadHosts =>
       expireDeadHosts()
+    case TaskSchedulerIsSet =>
+      scheduler = sc.taskScheduler
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
-      val unknownExecutor = !scheduler.executorHeartbeatReceived(
-        executorId, taskMetrics, blockManagerId)
-      val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
-      executorLastSeen(executorId) = System.currentTimeMillis()
-      context.reply(response)
+    case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
+      if (scheduler != null) {
+        val unknownExecutor = !scheduler.executorHeartbeatReceived(
+          executorId, taskMetrics, blockManagerId)
+        val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+        executorLastSeen(executorId) = System.currentTimeMillis()
+        context.reply(response)
+      } else {
+        // Because Executor will sleep several seconds before sending the first "Heartbeat", this
+        // case rarely happens. However, if it really happens, log it and ask the executor to
+        // register itself again.
+        logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
+        context.reply(HeartbeatResponse(reregisterBlockManager = true))
+      }
   }
 
   private def expireDeadHosts(): Unit = {
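The constructor no longer receives the TaskScheduler; the scheduler arrives later via the TaskSchedulerIsSet message, and heartbeats that beat it there are answered with reregisterBlockManager = true. A rough sketch of that deferred-dependency pattern, using plain stand-in types (SchedulerLike, HeartbeatEndpointSketch) rather than Spark's RPC classes:

// Plain stand-ins; the real code uses RpcEndpoint, TaskScheduler and RpcCallContext.
trait SchedulerLike {
  def heartbeatReceived(executorId: String): Boolean
}

class HeartbeatEndpointSketch {
  // Not known at construction time; wired up later by a "scheduler is set" message.
  @volatile private var scheduler: SchedulerLike = null

  // Analogue of receiving TaskSchedulerIsSet.
  def onSchedulerSet(s: SchedulerLike): Unit = { scheduler = s }

  // Analogue of receiveAndReply(Heartbeat); the Boolean plays the role of
  // HeartbeatResponse.reregisterBlockManager.
  def onHeartbeat(executorId: String): Boolean = {
    if (scheduler != null) {
      !scheduler.heartbeatReceived(executorId)
    } else {
      // Too early: ask the executor to register itself again later.
      true
    }
  }
}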

core/src/main/scala/org/apache/spark/MapOutputTracker.scala

Lines changed: 7 additions & 4 deletions
@@ -356,11 +356,14 @@ private[spark] object MapOutputTracker extends Logging {
   def serializeMapStatuses(statuses: Array[MapStatus]): Array[Byte] = {
     val out = new ByteArrayOutputStream
     val objOut = new ObjectOutputStream(new GZIPOutputStream(out))
-    // Since statuses can be modified in parallel, sync on it
-    statuses.synchronized {
-      objOut.writeObject(statuses)
+    Utils.tryWithSafeFinally {
+      // Since statuses can be modified in parallel, sync on it
+      statuses.synchronized {
+        objOut.writeObject(statuses)
+      }
+    } {
+      objOut.close()
     }
-    objOut.close()
     out.toByteArray
   }
 
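The serializer now closes the ObjectOutputStream through Utils.tryWithSafeFinally so the stream is released even if the write fails. The following is an illustrative approximation of what such a helper typically does, not the actual Utils implementation: run the finally block in every case and, when both blocks throw, keep the original exception and attach the second one as suppressed.

object TryWithSafeFinallySketch {
  // Run `block`, then always run `finallyBlock`; if both throw, rethrow the
  // exception from `block` with the one from `finallyBlock` attached as suppressed.
  def tryWithSafeFinally[T](block: => T)(finallyBlock: => Unit): T = {
    var originalThrowable: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        originalThrowable = t
        throw t
    } finally {
      try {
        finallyBlock
      } catch {
        case t: Throwable if originalThrowable != null =>
          // The primary exception wins; the close failure is only suppressed.
          originalThrowable.addSuppressed(t)
        // If only the finally block throws, that exception propagates normally.
      }
    }
  }

  // Usage analogous to the diff: close the stream whether or not the write succeeds.
  // tryWithSafeFinally { objOut.writeObject(statuses) } { objOut.close() }
}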

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 43 additions & 36 deletions
@@ -23,7 +23,7 @@ import java.io._
 import java.lang.reflect.Constructor
 import java.net.URI
 import java.util.{Arrays, Properties, UUID}
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.UUID.randomUUID
 
 import scala.collection.{Map, Set}
@@ -94,10 +94,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   val startTime = System.currentTimeMillis()
 
-  @volatile private var stopped: Boolean = false
+  private val stopped: AtomicBoolean = new AtomicBoolean(false)
 
   private def assertNotStopped(): Unit = {
-    if (stopped) {
+    if (stopped.get()) {
       throw new IllegalStateException("Cannot call methods on a stopped SparkContext")
     }
   }
@@ -226,9 +226,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   val appName = conf.get("spark.app.name")
 
   private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
-  private[spark] val eventLogDir: Option[String] = {
+  private[spark] val eventLogDir: Option[URI] = {
     if (isEventLogEnabled) {
-      Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
+      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
+        .stripSuffix("/")
+      Some(Utils.resolveURI(unresolvedDir))
     } else {
       None
     }
@@ -355,11 +357,16 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   val sparkUser = Utils.getCurrentUserName()
   executorEnvs("SPARK_USER") = sparkUser
 
+  // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
+  // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
+  private val heartbeatReceiver = env.rpcEnv.setupEndpoint(
+    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
+
   // Create and start the scheduler
   private[spark] var (schedulerBackend, taskScheduler) =
     SparkContext.createTaskScheduler(this, master)
-  private val heartbeatReceiver = env.rpcEnv.setupEndpoint(
-    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this, taskScheduler))
+
+  heartbeatReceiver.send(TaskSchedulerIsSet)
 
   @volatile private[spark] var dagScheduler: DAGScheduler = _
   try {
@@ -433,6 +440,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   // Thread Local variable that can be used by users to pass information down the stack
   private val localProperties = new InheritableThreadLocal[Properties] {
     override protected def childValue(parent: Properties): Properties = new Properties(parent)
+    override protected def initialValue(): Properties = new Properties()
   }
 
   /**
@@ -476,9 +484,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Spark fair scheduler pool.
    */
  def setLocalProperty(key: String, value: String) {
-    if (localProperties.get() == null) {
-      localProperties.set(new Properties())
-    }
    if (value == null) {
      localProperties.get.remove(key)
    } else {
@@ -1140,7 +1145,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   * Return whether dynamically adjusting the amount of resources allocated to
   * this application is supported. This is currently only available for YARN.
   */
-  private[spark] def supportDynamicAllocation = 
+  private[spark] def supportDynamicAllocation =
    master.contains("yarn") || dynamicAllocationTesting
 
  /**
@@ -1394,32 +1399,34 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     addedJars.clear()
   }
 
-  /** Shut down the SparkContext. */
+  // Shut down the SparkContext.
   def stop() {
-    SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
-      if (!stopped) {
-        stopped = true
-        postApplicationEnd()
-        ui.foreach(_.stop())
-        env.metricsSystem.report()
-        metadataCleaner.cancel()
-        cleaner.foreach(_.stop())
-        dagScheduler.stop()
-        dagScheduler = null
-        listenerBus.stop()
-        eventLogger.foreach(_.stop())
-        env.rpcEnv.stop(heartbeatReceiver)
-        progressBar.foreach(_.stop())
-        taskScheduler = null
-        // TODO: Cache.stop()?
-        env.stop()
-        SparkEnv.set(null)
-        logInfo("Successfully stopped SparkContext")
-        SparkContext.clearActiveContext()
-      } else {
-        logInfo("SparkContext already stopped")
-      }
+    // Use the stopping variable to ensure no contention for the stop scenario.
+    // Still track the stopped variable for use elsewhere in the code.
+
+    if (!stopped.compareAndSet(false, true)) {
+      logInfo("SparkContext already stopped.")
+      return
     }
+
+    postApplicationEnd()
+    ui.foreach(_.stop())
+    env.metricsSystem.report()
+    metadataCleaner.cancel()
+    cleaner.foreach(_.stop())
+    executorAllocationManager.foreach(_.stop())
+    dagScheduler.stop()
+    dagScheduler = null
+    listenerBus.stop()
+    eventLogger.foreach(_.stop())
+    env.rpcEnv.stop(heartbeatReceiver)
+    progressBar.foreach(_.stop())
+    taskScheduler = null
+    // TODO: Cache.stop()?
+    env.stop()
+    SparkEnv.set(null)
+    SparkContext.clearActiveContext()
+    logInfo("Successfully stopped SparkContext")
   }
 
 
@@ -1481,7 +1488,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    if (stopped) {
+    if (stopped.get()) {
       throw new IllegalStateException("SparkContext has been shutdown")
     }
     val callSite = getCallSite
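stop() now relies on AtomicBoolean.compareAndSet instead of a constructor lock plus a @volatile flag: exactly one caller wins the false-to-true transition and performs the shutdown, every other caller returns immediately. A small sketch of the idiom (StoppableService is a made-up illustrative class, not Spark code):

import java.util.concurrent.atomic.AtomicBoolean

class StoppableService {
  private val stopped = new AtomicBoolean(false)

  def assertNotStopped(): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("Cannot call methods on a stopped service")
    }
  }

  def stop(): Unit = {
    // compareAndSet returns true for exactly one thread; all others see that
    // the flag is already true and skip the shutdown work.
    if (!stopped.compareAndSet(false, true)) {
      return
    }
    // ... release resources exactly once here ...
  }
}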

core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala

Lines changed: 2 additions & 50 deletions
@@ -26,7 +26,6 @@ import org.apache.hadoop.mapred._
 import org.apache.hadoop.fs.FileSystem
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.executor.CommitDeniedException
 import org.apache.spark.mapred.SparkHadoopMapRedUtil
 import org.apache.spark.rdd.HadoopRDD
 
@@ -104,55 +103,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf)
   }
 
   def commit() {
-    val taCtxt = getTaskContext()
-    val cmtr = getOutputCommitter()
-
-    // Called after we have decided to commit
-    def performCommit(): Unit = {
-      try {
-        cmtr.commitTask(taCtxt)
-        logInfo (s"$taID: Committed")
-      } catch {
-        case e: IOException =>
-          logError("Error committing the output of task: " + taID.value, e)
-          cmtr.abortTask(taCtxt)
-          throw e
-      }
-    }
-
-    // First, check whether the task's output has already been committed by some other attempt
-    if (cmtr.needsTaskCommit(taCtxt)) {
-      // The task output needs to be committed, but we don't know whether some other task attempt
-      // might be racing to commit the same output partition. Therefore, coordinate with the driver
-      // in order to determine whether this attempt can commit (see SPARK-4879).
-      val shouldCoordinateWithDriver: Boolean = {
-        val sparkConf = SparkEnv.get.conf
-        // We only need to coordinate with the driver if there are multiple concurrent task
-        // attempts, which should only occur if speculation is enabled
-        val speculationEnabled = sparkConf.getBoolean("spark.speculation", false)
-        // This (undocumented) setting is an escape-hatch in case the commit code introduces bugs
-        sparkConf.getBoolean("spark.hadoop.outputCommitCoordination.enabled", speculationEnabled)
-      }
-      if (shouldCoordinateWithDriver) {
-        val outputCommitCoordinator = SparkEnv.get.outputCommitCoordinator
-        val canCommit = outputCommitCoordinator.canCommit(jobID, splitID, attemptID)
-        if (canCommit) {
-          performCommit()
-        } else {
-          val msg = s"$taID: Not committed because the driver did not authorize commit"
-          logInfo(msg)
-          // We need to abort the task so that the driver can reschedule new attempts, if necessary
-          cmtr.abortTask(taCtxt)
-          throw new CommitDeniedException(msg, jobID, splitID, attemptID)
-        }
-      } else {
-        // Speculation is disabled or a user has chosen to manually bypass the commit coordination
-        performCommit()
-      }
-    } else {
-      // Some other attempt committed the output, so we do nothing and signal success
-      logInfo(s"No need to commit output of task because needsTaskCommit=false: ${taID.value}")
-    }
+    SparkHadoopMapRedUtil.commitTask(
+      getOutputCommitter(), getTaskContext(), jobID, splitID, attemptID)
   }
 
   def commitJob() {
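The inlined commit logic deleted above is consolidated behind SparkHadoopMapRedUtil.commitTask. Based only on the removed code, the decision flow that the consolidated helper is expected to cover looks roughly like the sketch below; the parameter names are stand-ins for the Hadoop committer and OutputCommitCoordinator calls, and the real helper takes the committer, task context, and job/split/attempt IDs shown in the diff:

object CommitFlowSketch {
  def commitTask(
      needsCommit: Boolean,            // cmtr.needsTaskCommit(taCtxt)
      coordinateWithDriver: Boolean,   // speculation (or the override flag) enabled
      driverAuthorizes: => Boolean,    // outputCommitCoordinator.canCommit(...)
      performCommit: () => Unit,
      abortTask: () => Unit): Unit = {
    if (!needsCommit) {
      // Some other attempt already committed this partition; nothing to do.
      return
    }
    if (!coordinateWithDriver || driverAuthorizes) {
      // Either no racing attempts are possible, or the driver picked this attempt.
      performCommit()
    } else {
      // Another attempt won the race; abort so the driver can reschedule if needed.
      abortTask()
      throw new RuntimeException("commit denied by driver") // CommitDeniedException in Spark
    }
  }
}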
