Commit 705245d

Fix some bugs after rebasing the changes on the master

1 parent 003cf80, commit 705245d

17 files changed: +114 -83 lines

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala
16 additions & 4 deletions

@@ -65,11 +65,14 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
 
   private var timeoutCheckingTask: ScheduledFuture[_] = null
 
-  private val messageScheduler = Executors.newSingleThreadScheduledExecutor(
-    Utils.namedThreadFactory("heart-beat-receiver-thread"))
+  private val timeoutCheckingThread = Executors.newSingleThreadScheduledExecutor(
+    Utils.namedThreadFactory("heartbeat-timeout-checking-thread"))
+
+  private val killExecutorThread = Executors.newSingleThreadExecutor(
+    Utils.namedThreadFactory("kill-executor-thread"))
 
   override def onStart(): Unit = {
-    timeoutCheckingTask = messageScheduler.scheduleAtFixedRate(new Runnable {
+    timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
       override def run(): Unit = {
         self.send(ExpireDeadHosts)
       }
@@ -100,7 +103,10 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
         scheduler.executorLost(executorId, SlaveLost("Executor heartbeat " +
           s"timed out after ${now - lastSeenMs} ms"))
         if (sc.supportDynamicAllocation) {
-          sc.killExecutor(executorId)
+          // Asynchronously kill the executor to avoid blocking the current thread
+          killExecutorThread.submit(new Runnable {
+            override def run(): Unit = sc.killExecutor(executorId)
+          })
         }
         executorLastSeen.remove(executorId)
       }
@@ -111,5 +117,11 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
     if (timeoutCheckingTask != null) {
       timeoutCheckingTask.cancel(true)
     }
+    timeoutCheckingThread.shutdownNow()
+    killExecutorThread.shutdownNow()
   }
 }
+
+object HeartbeatReceiver {
+  val ENDPOINT_NAME = "HeartbeatReceiver"
+}
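Note on the hunk above: `sc.killExecutor` can block on a round trip to the cluster manager, so calling it inline would stall the heartbeat message loop; the fix hands the call to a dedicated single-thread executor. Below is a self-contained sketch of that pattern. `namedThreadFactory` and `killExecutor` here are simplified stand-ins for Spark's internals, not the real implementations.

import java.util.concurrent.{Executors, ThreadFactory}

object AsyncKillSketch {
  // Stand-in for Utils.namedThreadFactory: daemon threads with a stable name.
  private def namedThreadFactory(name: String): ThreadFactory = new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true)
      t
    }
  }

  private val killExecutorThread =
    Executors.newSingleThreadExecutor(namedThreadFactory("kill-executor-thread"))

  // Hypothetical stand-in for sc.killExecutor: may block for a long time.
  private def killExecutor(executorId: String): Unit = Thread.sleep(1000)

  def expireDeadHost(executorId: String): Unit = {
    // Submit the blocking call so the caller (the heartbeat loop) returns immediately.
    killExecutorThread.submit(new Runnable {
      override def run(): Unit = killExecutor(executorId)
    })
  }
}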

core/src/main/scala/org/apache/spark/SparkContext.scala
6 additions & 4 deletions

@@ -46,13 +46,13 @@ import org.apache.mesos.MesosNativeLibrary
 import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
-import org.apache.spark.executor.TriggerThreadDump
+import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump}
 import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
   FixedLengthBinaryInputFormat}
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
-import org.apache.spark.rpc.{RpcAddress, RpcEnv}
+import org.apache.spark.rpc.RpcAddress
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
   SparkDeploySchedulerBackend, SimrSchedulerBackend}
@@ -359,7 +359,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   private[spark] var (schedulerBackend, taskScheduler) =
     SparkContext.createTaskScheduler(this, master)
   private val heartbeatReceiver = env.rpcEnv.setupThreadSafeEndpoint(
-    "HeartbeatReceiver", new HeartbeatReceiver(this, taskScheduler))
+    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this, taskScheduler))
 
   @volatile private[spark] var dagScheduler: DAGScheduler = _
   try {
@@ -448,7 +448,9 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       } else {
        val (host, port) = env.blockManager.master.getRpcHostPortForExecutor(executorId).get
        val endpointRef = env.rpcEnv.setupEndpointRef(
-          SparkEnv.executorActorSystemName, RpcAddress(host, port), "ExecutorEndpoint")
+          SparkEnv.executorActorSystemName,
+          RpcAddress(host, port),
+          ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME)
        Some(endpointRef.askWithReply[Array[ThreadStackTrace]](TriggerThreadDump))
      }
    } catch {

core/src/main/scala/org/apache/spark/SparkEnv.scala
2 additions & 1 deletion

@@ -342,7 +342,8 @@ object SparkEnv extends Logging {
 
     val blockManagerMaster = new BlockManagerMaster(registerOrLookupEndpoint(
      BlockManagerMaster.DRIVER_ENDPOINT_NAME,
-      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus), true), conf, isDriver)
+      new BlockManagerMasterEndpoint(rpcEnv, isLocal, conf, listenerBus), threadSafe = true),
+      conf, isDriver)
 
     // NB: blockManager is not valid until initialize() is called later.
     val blockManager = new BlockManager(executorId, rpcEnv, blockManagerMaster,
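The wrapped call is functionally identical; the change just names the boolean argument so the call site documents itself. A tiny illustration of the idiom, with a hypothetical signature (not Spark's real API):

object NamedArgSketch extends App {
  // Hypothetical signature, for illustration only.
  def register(name: String, threadSafe: Boolean): Unit =
    println(s"registered $name (threadSafe = $threadSafe)")

  register("BlockManagerMaster", true)               // opaque: what does `true` mean here?
  register("BlockManagerMaster", threadSafe = true)  // self-documenting call site
}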

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
3 additions & 3 deletions

@@ -50,7 +50,7 @@ private[spark] class CoarseGrainedExecutorBackend(
   override def onStart() {
     import scala.concurrent.ExecutionContext.Implicits.global
     logInfo("Connecting to driver: " + driverUrl)
-    rpcEnv.asyncSetupEndpointRefByUrl(driverUrl).flatMap { ref =>
+    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
       driver = Some(ref)
       ref.sendWithReply[RegisteredExecutor.type](
         RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
@@ -147,7 +147,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
       port,
       executorConf,
       new SecurityManager(executorConf))
-    val driver = fetcher.setupEndpointRefByUrl(driverUrl)
+    val driver = fetcher.setupEndpointRefByURI(driverUrl)
     val props = driver.askWithReply[Seq[(String, String)]](RetrieveSparkProps) ++
       Seq[(String, String)](("spark.app.id", appId))
     fetcher.shutdown()
@@ -171,7 +171,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
 
       // Start the CoarseGrainedExecutorBackend endpoint.
       val sparkHostPort = hostname + ":" + boundPort
-      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
+      env.rpcEnv.setupThreadSafeEndpoint("Executor", new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, sparkHostPort, cores, userClassPath, env))
      workerUrl.foreach { url =>
        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))

core/src/main/scala/org/apache/spark/executor/Executor.scala
4 additions & 3 deletions

@@ -86,9 +86,9 @@ private[spark] class Executor(
     env.blockManager.initialize(conf.getAppId)
   }
 
-  // Create an actor for receiving RPCs from the driver
+  // Create an RpcEndpoint for receiving RPCs from the driver
   private val executorEndpoint = env.rpcEnv.setupEndpoint(
-    "ExecutorEndpoint", new ExecutorEndpoint(env.rpcEnv, executorId))
+    ExecutorEndpoint.EXECUTOR_ENDPOINT_NAME, new ExecutorEndpoint(env.rpcEnv, executorId))
 
   // Whether to load classes in user jars before those in Spark jars
   private val userClassPathFirst: Boolean = {
@@ -389,7 +389,8 @@ private[spark] class Executor(
     }
   }
 
-  private val heartbeatReceiverRef = RpcUtils.makeDriverRef("HeartbeatReceiver", conf, env.rpcEnv)
+  private val heartbeatReceiverRef =
+    RpcUtils.makeDriverRef(HeartbeatReceiver.ENDPOINT_NAME, conf, env.rpcEnv)
 
   /** Reports heartbeat and metrics for active tasks to the driver. */
   private def reportHeartBeat(): Unit = {

core/src/main/scala/org/apache/spark/executor/ExecutorEndpoint.scala
5 additions & 1 deletion

@@ -26,7 +26,7 @@ import org.apache.spark.util.Utils
 private[spark] case object TriggerThreadDump
 
 /**
- * Actor that runs inside of executors to enable driver -> executor RPC.
+ * [[RpcEndpoint]] that runs inside of executors to enable driver -> executor RPC.
  */
 private[spark]
 class ExecutorEndpoint(override val rpcEnv: RpcEnv, executorId: String) extends RpcEndpoint {
@@ -37,3 +37,7 @@ class ExecutorEndpoint(override val rpcEnv: RpcEnv, executorId: String) extends
   }
 
 }
+
+object ExecutorEndpoint {
+  val EXECUTOR_ENDPOINT_NAME = "ExecutorEndpoint"
+}
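This commit repeatedly replaces endpoint-name string literals ("HeartbeatReceiver", "ExecutorEndpoint", ACTOR_NAME) with constants on the endpoint's companion object, so the registration site and every lookup site share one definition. A minimal sketch of the idea, using a toy registry in place of Spark's RpcEnv:

import scala.collection.mutable

// Toy registry standing in for RpcEnv; only the naming pattern matters here.
object Registry {
  private val endpoints = mutable.Map.empty[String, AnyRef]
  def setupEndpoint(name: String, endpoint: AnyRef): Unit = endpoints(name) = endpoint
  def lookup(name: String): Option[AnyRef] = endpoints.get(name)
}

class ExecutorEndpointSketch

object ExecutorEndpointSketch {
  // Single source of truth: registration and lookup both reference this constant.
  val EXECUTOR_ENDPOINT_NAME = "ExecutorEndpoint"
}

object NamingDemo extends App {
  Registry.setupEndpoint(ExecutorEndpointSketch.EXECUTOR_ENDPOINT_NAME, new ExecutorEndpointSketch)
  // A typo in a duplicated string literal fails only at runtime; a constant fails at compile time.
  assert(Registry.lookup(ExecutorEndpointSketch.EXECUTOR_ENDPOINT_NAME).isDefined)
}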

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
17 additions & 17 deletions

@@ -66,7 +66,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Executors we have requested the cluster manager to kill that have not died yet
   private val executorsPendingToRemove = new HashSet[String]
 
-  class DriverActor(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
+  class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends RpcEndpoint with Logging {
     override protected def log = CoarseGrainedSchedulerBackend.this.log
 
@@ -104,7 +104,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       case KillTask(taskId, executorId, interruptThread) =>
         executorDataMap.get(executorId) match {
           case Some(executorInfo) =>
-            executorInfo.executorActor.send(KillTask(taskId, executorId, interruptThread))
+            executorInfo.executorEndpoint.send(KillTask(taskId, executorId, interruptThread))
           case None =>
             // Ignoring the task kill since the executor is not registered.
             logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
@@ -146,7 +146,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       case StopExecutors =>
         logInfo("Asking each executor to shut down")
         for ((_, executorData) <- executorDataMap) {
-          executorData.executorActor.send(StopExecutor)
+          executorData.executorEndpoint.send(StopExecutor)
         }
         context.reply(true)
 
@@ -200,7 +200,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
         else {
           val executorData = executorDataMap(task.executorId)
           executorData.freeCores -= scheduler.CPUS_PER_TASK
-          executorData.executorActor.send(LaunchTask(new SerializableBuffer(serializedTask)))
+          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
         }
       }
     }
@@ -230,7 +230,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     }
   }
 
-  var driverActor: RpcEndpointRef = null
+  var driverEndpoint: RpcEndpointRef = null
   val taskIdsOnSlave = new HashMap[String, HashSet[String]]
 
   override def start() {
@@ -241,15 +241,15 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       }
     }
     // TODO (prashant) send conf instead of properties
-    driverActor = rpcEnv.setupThreadSafeEndpoint(
-      CoarseGrainedSchedulerBackend.ACTOR_NAME, new DriverActor(rpcEnv, properties))
+    driverEndpoint = rpcEnv.setupThreadSafeEndpoint(
+      CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))
   }
 
   def stopExecutors() {
     try {
-      if (driverActor != null) {
+      if (driverEndpoint != null) {
         logInfo("Shutting down all executors")
-        driverActor.askWithReply[Boolean](StopExecutors)
+        driverEndpoint.askWithReply[Boolean](StopExecutors)
       }
     } catch {
       case e: Exception =>
@@ -260,21 +260,21 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   override def stop() {
     stopExecutors()
     try {
-      if (driverActor != null) {
-        driverActor.askWithReply[Boolean](StopDriver)
+      if (driverEndpoint != null) {
+        driverEndpoint.askWithReply[Boolean](StopDriver)
       }
     } catch {
       case e: Exception =>
-        throw new SparkException("Error stopping standalone scheduler's driver actor", e)
+        throw new SparkException("Error stopping standalone scheduler's driver endpoint", e)
     }
   }
 
   override def reviveOffers() {
-    driverActor.send(ReviveOffers)
+    driverEndpoint.send(ReviveOffers)
   }
 
   override def killTask(taskId: Long, executorId: String, interruptThread: Boolean) {
-    driverActor.send(KillTask(taskId, executorId, interruptThread))
+    driverEndpoint.send(KillTask(taskId, executorId, interruptThread))
  }
 
  override def defaultParallelism(): Int = {
@@ -284,10 +284,10 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   // Called by subclasses when notified of a lost worker
   def removeExecutor(executorId: String, reason: String) {
     try {
-      driverActor.askWithReply[Boolean](RemoveExecutor(executorId, reason))
+      driverEndpoint.askWithReply[Boolean](RemoveExecutor(executorId, reason))
     } catch {
       case e: Exception =>
-        throw new SparkException("Error notifying standalone scheduler's driver actor", e)
+        throw new SparkException("Error notifying standalone scheduler's driver endpoint", e)
     }
   }
 
@@ -393,5 +393,5 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
-  val ACTOR_NAME = "CoarseGrainedScheduler"
+  val ENDPOINT_NAME = "CoarseGrainedScheduler"
 }
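The DriverActor -> DriverEndpoint rename is terminology cleanup for the actor-free rpc layer; behavior is unchanged. The diff exercises two interaction styles on the endpoint reference: send for fire-and-forget messages (ReviveOffers, KillTask) and askWithReply for blocking request-reply (StopExecutors, RemoveExecutor). A sketch of those two shapes, with signatures inferred from this diff rather than taken from Spark's actual rpc package:

// Interface shape as it appears in this diff; inferred, not authoritative.
trait EndpointRefSketch {
  def send(message: Any): Unit          // fire-and-forget
  def askWithReply[T](message: Any): T  // blocking request-reply
}

case object ReviveOffers
case object StopExecutors

class LocalEndpointRef extends EndpointRefSketch {
  override def send(message: Any): Unit = println(s"sent $message")
  // Toy reply: a real implementation would route to the endpoint and wait.
  override def askWithReply[T](message: Any): T = true.asInstanceOf[T]
}

object DriverEndpointDemo extends App {
  val driverEndpoint: EndpointRefSketch = new LocalEndpointRef
  driverEndpoint.send(ReviveOffers)                                 // no reply expected
  val stopped = driverEndpoint.askWithReply[Boolean](StopExecutors) // waits for a Boolean
  println(s"executors stopped: $stopped")
}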

core/src/main/scala/org/apache/spark/scheduler/cluster/ExecutorData.scala
2 additions & 2 deletions

@@ -22,14 +22,14 @@ import org.apache.spark.rpc.{RpcEndpointRef, RpcAddress}
 /**
  * Grouping of data for an executor used by CoarseGrainedSchedulerBackend.
  *
- * @param executorActor The ActorRef representing this executor
+ * @param executorEndpoint The ActorRef representing this executor
  * @param executorAddress The network address of this executor
  * @param executorHost The hostname that this executor is running on
  * @param freeCores The current number of cores available for work on the executor
  * @param totalCores The total number of cores available to the executor
  */
 private[cluster] class ExecutorData(
-    val executorActor: RpcEndpointRef,
+    val executorEndpoint: RpcEndpointRef,
     val executorAddress: RpcAddress,
     override val executorHost: String,
     var freeCores: Int,

core/src/main/scala/org/apache/spark/scheduler/cluster/SimrSchedulerBackend.scala
1 addition & 1 deletion

@@ -41,7 +41,7 @@ private[spark] class SimrSchedulerBackend(
 
     val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
       RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
-      CoarseGrainedSchedulerBackend.ACTOR_NAME)
+      CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
 
     val conf = SparkHadoopUtil.get.newConfiguration(sc.conf)
     val fs = FileSystem.get(conf)

core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
1 addition & 1 deletion

@@ -51,7 +51,7 @@ private[spark] class SparkDeploySchedulerBackend(
     // The endpoint for executors to talk to us
     val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
       RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
-      CoarseGrainedSchedulerBackend.ACTOR_NAME)
+      CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
     val args = Seq(
       "--driver-url", driverUrl,
       "--executor-id", "{{EXECUTOR_ID}}",
