Commit 2621609

Author: Marcelo Vanzin (committed)

Merge branch 'master' into SPARK-4194

Conflicts:
    core/src/main/scala/org/apache/spark/SparkContext.scala

2 parents: 6b73fcb + 14632b7, commit 2621609

13 files changed: +271 -140 lines

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 25 additions & 7 deletions

@@ -37,16 +37,24 @@ private[spark] case class Heartbeat(
     taskMetrics: Array[(Long, TaskMetrics)], // taskId -> TaskMetrics
     blockManagerId: BlockManagerId)
 
+/**
+ * An event that SparkContext uses to notify HeartbeatReceiver that SparkContext.taskScheduler is
+ * created.
+ */
+private[spark] case object TaskSchedulerIsSet
+
 private[spark] case object ExpireDeadHosts
 
 private[spark] case class HeartbeatResponse(reregisterBlockManager: Boolean)
 
 /**
  * Lives in the driver to receive heartbeats from executors..
  */
-private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskScheduler)
+private[spark] class HeartbeatReceiver(sc: SparkContext)
   extends Actor with ActorLogReceive with Logging {
 
+  private var scheduler: TaskScheduler = null
+
   // executor ID -> timestamp of when the last heartbeat from this executor was received
   private val executorLastSeen = new mutable.HashMap[String, Long]
 
@@ -71,12 +79,22 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
   }
 
   override def receiveWithLogging: PartialFunction[Any, Unit] = {
-    case Heartbeat(executorId, taskMetrics, blockManagerId) =>
-      val unknownExecutor = !scheduler.executorHeartbeatReceived(
-        executorId, taskMetrics, blockManagerId)
-      val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
-      executorLastSeen(executorId) = System.currentTimeMillis()
-      sender ! response
+    case TaskSchedulerIsSet =>
+      scheduler = sc.taskScheduler
+    case heartbeat @ Heartbeat(executorId, taskMetrics, blockManagerId) =>
+      if (scheduler != null) {
+        val unknownExecutor = !scheduler.executorHeartbeatReceived(
+          executorId, taskMetrics, blockManagerId)
+        val response = HeartbeatResponse(reregisterBlockManager = unknownExecutor)
+        executorLastSeen(executorId) = System.currentTimeMillis()
+        sender ! response
+      } else {
+        // Because Executor will sleep several seconds before sending the first "Heartbeat", this
+        // case rarely happens. However, if it really happens, log it and ask the executor to
+        // register itself again.
+        logWarning(s"Dropping $heartbeat because TaskScheduler is not ready yet")
+        sender ! HeartbeatResponse(reregisterBlockManager = true)
+      }
     case ExpireDeadHosts =>
       expireDeadHosts()
   }

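The change above turns the TaskScheduler into a field that is set later by the TaskSchedulerIsSet message instead of a constructor argument, so a heartbeat that arrives before the scheduler exists is answered with a re-register request rather than dereferencing null. A minimal sketch of the same late-binding pattern in plain Akka (hypothetical names, not Spark code):

import akka.actor.Actor

// Stand-ins for TaskSchedulerIsSet / Heartbeat / HeartbeatResponse.
case object DependencyIsSet
case class Work(payload: String)
case class WorkResponse(retry: Boolean)

class LateBoundWorker(lookup: () => String) extends Actor {
  // Filled in only when DependencyIsSet arrives, mirroring `scheduler` above.
  private var dependency: String = null

  def receive: Receive = {
    case DependencyIsSet =>
      dependency = lookup()
    case _: Work =>
      if (dependency != null) {
        sender ! WorkResponse(retry = false)
      } else {
        // Not ready yet: drop the message and ask the sender to try again,
        // just as HeartbeatReceiver asks the executor to re-register.
        sender ! WorkResponse(retry = true)
      }
  }
}
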
core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 48 additions & 46 deletions

@@ -23,7 +23,7 @@ import java.io._
 import java.lang.reflect.Constructor
 import java.net.URI
 import java.util.{Arrays, Properties, UUID}
-import java.util.concurrent.atomic.AtomicInteger
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
 import java.util.UUID.randomUUID
 
 import scala.collection.{Map, Set}
@@ -97,10 +97,10 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   val startTime = System.currentTimeMillis()
 
-  @volatile private var stopped: Boolean = false
+  private val stopped: AtomicBoolean = new AtomicBoolean(false)
 
   private def assertNotStopped(): Unit = {
-    if (stopped) {
+    if (stopped.get()) {
       throw new IllegalStateException("Cannot call methods on a stopped SparkContext")
     }
   }
@@ -203,7 +203,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   * ------------------------------------------------------------------------------------- */
 
   private var _conf: SparkConf = _
-  private var _eventLogDir: Option[String] = None
+  private var _eventLogDir: Option[URI] = None
   private var _eventLogCodec: Option[String] = None
   private var _env: SparkEnv = _
   private var _metadataCleaner: MetadataCleaner = _
@@ -244,7 +244,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def appName: String = _conf.get("spark.app.name")
 
   private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
-  private[spark] def eventLogDir: Option[String] = _eventLogDir
+  private[spark] def eventLogDir: Option[URI] = _eventLogDir
   private[spark] def eventLogCodec: Option[String] = _eventLogCodec
 
   // Generate the random name for a temp folder in Tachyon
@@ -370,7 +370,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     _eventLogDir =
       if (isEventLogEnabled) {
-        Some(conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR).stripSuffix("/"))
+        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
+          .stripSuffix("/")
+        Some(Utils.resolveURI(unresolvedDir))
       } else {
         None
       }
@@ -456,7 +458,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     _schedulerBackend = sched
     _taskScheduler = ts
     _heartbeatReceiver = env.actorSystem.actorOf(
-      Props(new HeartbeatReceiver(this, taskScheduler)), "HeartbeatReceiver")
+      Props(new HeartbeatReceiver(this)), "HeartbeatReceiver")
     _dagScheduler = new DAGScheduler(this)
 
     // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
@@ -1466,46 +1468,46 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     addedJars.clear()
   }
 
-  /** Shut down the SparkContext. */
+  // Shut down the SparkContext.
   def stop() {
-    SparkContext.SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
-      if (!stopped) {
-        stopped = true
-        postApplicationEnd()
-        _ui.foreach(_.stop())
-        if (env != null) {
-          env.metricsSystem.report()
-        }
-        if (metadataCleaner != null) {
-          metadataCleaner.cancel()
-        }
-        _cleaner.foreach(_.stop())
-        _executorAllocationManager.foreach(_.stop())
-        if (_dagScheduler != null) {
-          _dagScheduler.stop()
-          _dagScheduler = null
-        }
-        if (_listenerBusStarted) {
-          listenerBus.stop()
-          _listenerBusStarted = false
-        }
-        _eventLogger.foreach(_.stop())
-        if (env != null) {
-          env.actorSystem.stop(_heartbeatReceiver)
-        }
-        _progressBar.foreach(_.stop())
-        _taskScheduler = null
-        // TODO: Cache.stop()?
-        if (_env != null) {
-          _env.stop()
-          SparkEnv.set(null)
-        }
-        logInfo("Successfully stopped SparkContext")
-        SparkContext.clearActiveContext()
-      } else {
-        logInfo("SparkContext already stopped")
-      }
+    // Use the stopping variable to ensure no contention for the stop scenario.
+    // Still track the stopped variable for use elsewhere in the code.
+    if (!stopped.compareAndSet(false, true)) {
+      logInfo("SparkContext already stopped.")
+      return
+    }
+
+    postApplicationEnd()
+    _ui.foreach(_.stop())
+    if (env != null) {
+      env.metricsSystem.report()
+    }
+    if (metadataCleaner != null) {
+      metadataCleaner.cancel()
+    }
+    _cleaner.foreach(_.stop())
+    _executorAllocationManager.foreach(_.stop())
+    if (_dagScheduler != null) {
+      _dagScheduler.stop()
+      _dagScheduler = null
+    }
+    if (_listenerBusStarted) {
+      listenerBus.stop()
+      _listenerBusStarted = false
+    }
+    _eventLogger.foreach(_.stop())
+    if (env != null) {
+      env.actorSystem.stop(_heartbeatReceiver)
+    }
+    _progressBar.foreach(_.stop())
+    _taskScheduler = null
+    // TODO: Cache.stop()?
+    if (_env != null) {
+      _env.stop()
+      SparkEnv.set(null)
     }
+    SparkContext.clearActiveContext()
+    logInfo("Successfully stopped SparkContext")
   }
 
 
@@ -1567,7 +1569,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       partitions: Seq[Int],
       allowLocal: Boolean,
       resultHandler: (Int, U) => Unit) {
-    if (stopped) {
+    if (stopped.get()) {
       throw new IllegalStateException("SparkContext has been shutdown")
     }
     val callSite = getCallSite

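The stop() rewrite above is the SPARK-6492 fix: the SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized block is replaced by an AtomicBoolean, so the first caller to flip the flag runs the cleanup and every later or concurrent caller returns immediately instead of blocking on the constructor lock. A stripped-down sketch of that idiom (illustrative class, not Spark code):

import java.util.concurrent.atomic.AtomicBoolean

class StoppableService {
  private val stopped = new AtomicBoolean(false)

  def assertNotStopped(): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("Cannot call methods on a stopped service")
    }
  }

  def stop(): Unit = {
    // compareAndSet returns false for every caller after the first, so repeated
    // or concurrent stop() calls are cheap no-ops and no monitor is ever held.
    if (!stopped.compareAndSet(false, true)) {
      return
    }
    // ... release resources exactly once ...
  }
}
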
core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala

Lines changed: 4 additions & 2 deletions

@@ -17,13 +17,15 @@
 
 package org.apache.spark.deploy
 
+import java.net.URI
+
 private[spark] class ApplicationDescription(
     val name: String,
     val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
     var appUiUrl: String,
-    val eventLogDir: Option[String] = None,
+    val eventLogDir: Option[URI] = None,
     // short name of compression codec used when writing event logs, if any (e.g. lzf)
     val eventLogCodec: Option[String] = None)
   extends Serializable {
@@ -36,7 +38,7 @@ private[spark] class ApplicationDescription(
       memoryPerSlave: Int = memoryPerSlave,
       command: Command = command,
       appUiUrl: String = appUiUrl,
-      eventLogDir: Option[String] = eventLogDir,
+      eventLogDir: Option[URI] = eventLogDir,
       eventLogCodec: Option[String] = eventLogCodec): ApplicationDescription =
     new ApplicationDescription(
       name, maxCores, memoryPerSlave, command, appUiUrl, eventLogDir, eventLogCodec)

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 5 additions & 5 deletions

@@ -47,21 +47,21 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  */
 private[spark] class EventLoggingListener(
     appId: String,
-    logBaseDir: String,
+    logBaseDir: URI,
     sparkConf: SparkConf,
     hadoopConf: Configuration)
   extends SparkListener with Logging {
 
   import EventLoggingListener._
 
-  def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
+  def this(appId: String, logBaseDir: URI, sparkConf: SparkConf) =
     this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))
 
   private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
   private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
   private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
   private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
-  private val fileSystem = Utils.getHadoopFileSystem(new URI(logBaseDir), hadoopConf)
+  private val fileSystem = Utils.getHadoopFileSystem(logBaseDir, hadoopConf)
   private val compressionCodec =
     if (shouldCompress) {
       Some(CompressionCodec.createCodec(sparkConf))
@@ -259,13 +259,13 @@ private[spark] object EventLoggingListener extends Logging {
    * @return A path which consists of file-system-safe characters.
    */
   def getLogPath(
-      logBaseDir: String,
+      logBaseDir: URI,
       appId: String,
       compressionCodecName: Option[String] = None): String = {
     val sanitizedAppId = appId.replaceAll("[ :/]", "-").replaceAll("[.${}'\"]", "_").toLowerCase
     // e.g. app_123, app_123.lzf
     val logName = sanitizedAppId + compressionCodecName.map { "." + _ }.getOrElse("")
-    Utils.resolveURI(logBaseDir).toString.stripSuffix("/") + "/" + logName
+    logBaseDir.toString.stripSuffix("/") + "/" + logName
   }
 
   /**

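Switching logBaseDir (and SparkContext._eventLogDir) from String to URI means the directory is resolved once, up front, and getLogPath only has to append the sanitized application ID. The helper below is a rough illustration of that kind of resolution, not Spark's Utils.resolveURI: a bare local path gets an explicit file: scheme, while a value that already carries a scheme such as hdfs:// is kept as-is. It assumes the input parses as a URI (no characters that need escaping).

import java.io.File
import java.net.URI

object LogDirResolution {
  def resolveLogDir(raw: String): URI = {
    val uri = new URI(raw)
    // Keep an explicit scheme (hdfs://, s3://, ...); otherwise treat the value
    // as a local path and qualify it with the file: scheme.
    if (uri.getScheme != null) uri else new File(raw).getAbsoluteFile.toURI
  }
}
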
core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala

Lines changed: 1 addition & 1 deletion

@@ -50,7 +50,7 @@ class FsHistoryProviderSuite extends FunSuite with BeforeAndAfter with Matchers
       inProgress: Boolean,
       codec: Option[String] = None): File = {
     val ip = if (inProgress) EventLoggingListener.IN_PROGRESS else ""
-    val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, appId)
+    val logUri = EventLoggingListener.getLogPath(testDir.toURI, appId)
     val logPath = new URI(logUri).getPath + ip
     new File(logPath)
   }

core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala

Lines changed: 14 additions & 8 deletions

@@ -61,7 +61,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
   test("Verify log file exist") {
     // Verify logging directory exists
     val conf = getLoggingConf(testDirPath)
-    val eventLogger = new EventLoggingListener("test", testDirPath.toUri().toString(), conf)
+    val eventLogger = new EventLoggingListener("test", testDirPath.toUri(), conf)
     eventLogger.start()
 
     val logPath = new Path(eventLogger.logPath + EventLoggingListener.IN_PROGRESS)
@@ -95,7 +95,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
   }
 
   test("Log overwriting") {
-    val logUri = EventLoggingListener.getLogPath(testDir.getAbsolutePath, "test")
+    val logUri = EventLoggingListener.getLogPath(testDir.toURI, "test")
     val logPath = new URI(logUri).getPath
     // Create file before writing the event log
     new FileOutputStream(new File(logPath)).close()
@@ -107,16 +107,19 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
 
   test("Event log name") {
     // without compression
-    assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath("/base-dir", "app1"))
+    assert(s"file:/base-dir/app1" === EventLoggingListener.getLogPath(
+      Utils.resolveURI("/base-dir"), "app1"))
     // with compression
     assert(s"file:/base-dir/app1.lzf" ===
-      EventLoggingListener.getLogPath("/base-dir", "app1", Some("lzf")))
+      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"), "app1", Some("lzf")))
     // illegal characters in app ID
     assert(s"file:/base-dir/a-fine-mind_dollar_bills__1" ===
-      EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1"))
+      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
+        "a fine:mind$dollar{bills}.1"))
     // illegal characters in app ID with compression
     assert(s"file:/base-dir/a-fine-mind_dollar_bills__1.lz4" ===
-      EventLoggingListener.getLogPath("/base-dir", "a fine:mind$dollar{bills}.1", Some("lz4")))
+      EventLoggingListener.getLogPath(Utils.resolveURI("/base-dir"),
+        "a fine:mind$dollar{bills}.1", Some("lz4")))
   }
 
   /* ----------------- *
@@ -137,7 +140,7 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
     val conf = getLoggingConf(testDirPath, compressionCodec)
     extraConf.foreach { case (k, v) => conf.set(k, v) }
     val logName = compressionCodec.map("test-" + _).getOrElse("test")
-    val eventLogger = new EventLoggingListener(logName, testDirPath.toUri().toString(), conf)
+    val eventLogger = new EventLoggingListener(logName, testDirPath.toUri(), conf)
     val listenerBus = new LiveListenerBus
     val applicationStart = SparkListenerApplicationStart("Greatest App (N)ever", None,
       125L, "Mickey")
@@ -173,12 +176,15 @@ class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with Bef
    * This runs a simple Spark job and asserts that the expected events are logged when expected.
    */
   private def testApplicationEventLogging(compressionCodec: Option[String] = None) {
+    // Set defaultFS to something that would cause an exception, to make sure we don't run
+    // into SPARK-6688.
     val conf = getLoggingConf(testDirPath, compressionCodec)
+      .set("spark.hadoop.fs.defaultFS", "unsupported://example.com")
     val sc = new SparkContext("local-cluster[2,2,512]", "test", conf)
    assert(sc.eventLogger.isDefined)
     val eventLogger = sc.eventLogger.get
     val eventLogPath = eventLogger.logPath
-    val expectedLogDir = testDir.toURI().toString()
+    val expectedLogDir = testDir.toURI()
     assert(eventLogPath === EventLoggingListener.getLogPath(
       expectedLogDir, sc.applicationId, compressionCodec.map(CompressionCodec.getShortName)))
 

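The "Event log name" assertions above exercise the app ID sanitization inside getLogPath: one replaceAll maps spaces, colons, and slashes to '-', a second maps '.', '$', braces, and quotes to '_'. Worked through by hand (plain Scala, mirroring the regexes shown in the EventLoggingListener diff):

val appId = "a fine:mind$dollar{bills}.1"
val sanitized = appId
  .replaceAll("[ :/]", "-")      // "a-fine-mind$dollar{bills}.1"
  .replaceAll("[.${}'\"]", "_")  // "a-fine-mind_dollar_bills__1"
  .toLowerCase
// sanitized == "a-fine-mind_dollar_bills__1", which is why the expected path is
// file:/base-dir/a-fine-mind_dollar_bills__1 (plus ".lz4" when compression is on).
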
core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala

Lines changed: 2 additions & 1 deletion

@@ -18,6 +18,7 @@
 package org.apache.spark.scheduler
 
 import java.io.{File, PrintWriter}
+import java.net.URI
 
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -145,7 +146,7 @@ class ReplayListenerSuite extends FunSuite with BeforeAndAfter {
    * log the events.
    */
   private class EventMonster(conf: SparkConf)
-    extends EventLoggingListener("test", "testdir", conf) {
+    extends EventLoggingListener("test", new URI("testdir"), conf) {
 
     override def start() { }
 

project/MimaExcludes.scala

Lines changed: 4 additions & 0 deletions

@@ -60,6 +60,10 @@ object MimaExcludes {
       ) ++ Seq(
         // SPARK-6510 Add a Graph#minus method acting as Set#difference
         ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.graphx.VertexRDD.minus")
+      ) ++ Seq(
+        // SPARK-6492 Fix deadlock in SparkContext.stop()
+        ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.org$" +
+          "apache$spark$SparkContext$$SPARK_CONTEXT_CONSTRUCTOR_LOCK")
       )
 
     case v if v.startsWith("1.3") =>
