Commit 30a9036

Make self return null after stopping RpcEndpointRef; fix docs and error messages

1 parent 705245d

8 files changed (+33 additions, -25 deletions)

core/src/main/scala/org/apache/spark/HeartbeatReceiver.scala

Lines changed: 2 additions & 2 deletions
@@ -73,8 +73,8 @@ private[spark] class HeartbeatReceiver(sc: SparkContext, scheduler: TaskSchedule
 
   override def onStart(): Unit = {
     timeoutCheckingTask = timeoutCheckingThread.scheduleAtFixedRate(new Runnable {
-      override def run(): Unit = {
-        self.send(ExpireDeadHosts)
+      override def run(): Unit = Utils.tryLogNonFatalError {
+        Option(self).foreach(_.send(ExpireDeadHosts))
       }
     }, 0, checkTimeoutIntervalMs, TimeUnit.MILLISECONDS)
   }
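
Why wrap the scheduled body at all: under the java.util.concurrent.ScheduledExecutorService contract, a periodic task that throws an uncaught exception is suppressed from all subsequent executions, so a single transient failure would silently stop the timeout checks. A minimal standalone sketch of that behavior (illustrative names, not code from this patch):

import java.util.concurrent.{Executors, TimeUnit}

object SuppressedTaskDemo extends App {
  val scheduler = Executors.newSingleThreadScheduledExecutor()
  var runs = 0
  scheduler.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = {
      runs += 1
      println(s"run $runs")
      // An uncaught exception cancels every future execution of this task.
      if (runs == 2) throw new RuntimeException("boom")
    }
  }, 0, 100, TimeUnit.MILLISECONDS)
  Thread.sleep(1000) // prints only "run 1" and "run 2"; the schedule is dead
  scheduler.shutdown()
}

Wrapping the body in Utils.tryLogNonFatalError keeps the schedule alive and logs the failure instead.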

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 5 additions & 3 deletions
@@ -55,8 +55,10 @@ private[spark] class CoarseGrainedExecutorBackend(
       ref.sendWithReply[RegisteredExecutor.type](
         RegisterExecutor(executorId, self, hostPort, cores, extractLogUrls))
     } onComplete {
-      case Success(msg) => self.send(msg)
-      case Failure(e) => logError(s"Cannot register to driver: $driverUrl", e)
+      case Success(msg) => Utils.tryLogNonFatalError {
+        Option(self).foreach(_.send(msg)) // msg must be RegisteredExecutor
+      }
+      case Failure(e) => logError(s"Cannot register with driver: $driverUrl", e)
     }
   }
 
@@ -108,7 +110,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       logError(s"Driver $remoteAddress disassociated! Shutting down.")
       System.exit(1)
     } else {
-      logWarning(s"Received irrelevant DisassociatedEvent $remoteAddress")
+      logWarning(s"An unknown ($remoteAddress) driver disconnected.")
     }
   }

core/src/main/scala/org/apache/spark/rpc/RpcEnv.scala

Lines changed: 4 additions & 6 deletions
@@ -40,10 +40,7 @@ private[spark] abstract class RpcEnv(conf: SparkConf) {
 
   /**
    * Return RpcEndpointRef of the registered [[RpcEndpoint]]. Will be used to implement
-   * [[RpcEndpoint.self]].
-   *
-   * Note: This method won't return null. `IllegalArgumentException` will be thrown if calling this
-   * on a non-existent endpoint.
+   * [[RpcEndpoint.self]]. Return `null` if the corresponding [[RpcEndpointRef]] does not exist.
    */
   private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
 
@@ -195,7 +192,7 @@ private[spark] trait RpcEndpoint {
 
   /**
    * The [[RpcEndpointRef]] of this [[RpcEndpoint]]. `self` will become valid when `onStart` is
-   * called.
+   * called. And `self` will become `null` when `onStop` is called.
    *
    * Note: Because before `onStart`, [[RpcEndpoint]] has not yet been registered and there is not
    * valid [[RpcEndpointRef]] for it. So don't call `self` before `onStart` is called.
 
@@ -407,7 +404,8 @@ private[spark] object RpcAddress {
 }
 
 /**
- * A callback that [[RpcEndpoint]] can use it to send back a message or failure.
+ * A callback that [[RpcEndpoint]] can use it to send back a message or failure. It's thread-safe
+ * and can be called in any thread.
  */
 private[spark] trait RpcCallContext {

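The practical upshot of the new contract: once onStop has run, self is null, so callers guard the reference with Option(...) instead of relying on an exception from the RpcEnv. A self-contained sketch of the idiom (the Ref trait and message strings are invented for illustration):

object NullSafeSelfDemo extends App {
  trait Ref { def send(msg: Any): Unit }

  // Stands in for RpcEndpoint.self, which may become null after onStop.
  var self: Ref = new Ref {
    override def send(msg: Any): Unit = println(s"sent: $msg")
  }

  Option(self).foreach(_.send("ReviveOffers")) // endpoint live: message sent

  self = null                                  // endpoint stopped
  Option(self).foreach(_.send("ReviveOffers")) // Option(null) is None: no-op, no NPE
}
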
core/src/main/scala/org/apache/spark/rpc/akka/AkkaRpcEnv.scala

Lines changed: 1 addition & 5 deletions
@@ -82,11 +82,7 @@ private[spark] class AkkaRpcEnv private[akka] (
   /**
    * Retrieve the [[RpcEndpointRef]] of `endpoint`.
    */
-  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = {
-    val endpointRef = endpointToRef.get(endpoint)
-    require(endpointRef != null, s"Cannot find RpcEndpointRef of ${endpoint} in ${this}")
-    endpointRef
-  }
+  override def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef = endpointToRef.get(endpoint)
 
   override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
     setupThreadSafeEndpoint(name, endpoint)
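
The one-liner meets the new "return null if absent" contract without the require check because, assuming endpointToRef is a java.util.concurrent.ConcurrentHashMap (a hedged assumption, not shown in this diff), get simply returns null for a missing key. A small sketch of that assumption:

import java.util.concurrent.ConcurrentHashMap

object MapGetDemo extends App {
  val endpointToRef = new ConcurrentHashMap[String, String]()
  endpointToRef.put("registered-endpoint", "ref-1")

  println(endpointToRef.get("registered-endpoint"))   // ref-1
  println(endpointToRef.get("unregistered-endpoint")) // null, per the new contract
}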

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 4 additions & 2 deletions
@@ -79,7 +79,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Periodically revive offers to allow delay scheduling to work
     val reviveInterval = conf.getLong("spark.scheduler.revive.interval", 1000)
     reviveThread.scheduleAtFixedRate(new Runnable {
-      override def run(): Unit = self.send(ReviveOffers)
+      override def run(): Unit = Utils.tryLogNonFatalError {
+        Option(self).foreach(_.send(ReviveOffers))
+      }
     }, 0, reviveInterval, TimeUnit.MILLISECONDS)
   }
 
@@ -167,7 +169,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   override def onDisconnected(remoteAddress: RpcAddress): Unit = {
     addressToExecutorId.get(remoteAddress).foreach(removeExecutor(_,
-      "remote Akka client disassociated"))
+      "remote Rpc client disassociated"))
   }
 
   // Make fake resource offers on just one executor

core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala

Lines changed: 3 additions & 1 deletion
@@ -87,7 +87,9 @@ private[spark] class LocalEndpoint(
     if (tasks.isEmpty && scheduler.activeTaskSets.nonEmpty) {
       // Try to reviveOffer after 1 second, because scheduler may wait for locality timeout
       reviveThread.schedule(new Runnable {
-        override def run(): Unit = self.send(ReviveOffers)
+        override def run(): Unit = Utils.tryLogNonFatalError {
+          Option(self).foreach(_.send(ReviveOffers))
+        }
       }, 1000, TimeUnit.MILLISECONDS)
     }
   }

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 10 additions & 0 deletions
@@ -1214,6 +1214,16 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /** Executes the given block. Log non-fatal errors if any, and only throw fatal errors */
+  def tryLogNonFatalError(block: => Unit) {
+    try {
+      block
+    } catch {
+      case NonFatal(t) =>
+        logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
+    }
+  }
+
   /** Default filtering function for finding call sites using `getCallSite`. */
   private def coreExclusionFunction(className: String): Boolean = {
     // A regular expression to match classes of the "core" Spark API that we want to skip when
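
For reference, scala.util.control.NonFatal deliberately does not match VirtualMachineError (e.g. OutOfMemoryError), InterruptedException, LinkageError, ThreadDeath, or ControlThrowable, so those still propagate out of the helper. A standalone sketch of the same shape, with println standing in for Spark's logError:

import scala.util.control.NonFatal

object TryLogDemo extends App {
  def tryLogNonFatalError(block: => Unit): Unit = {
    try {
      block
    } catch {
      case NonFatal(t) =>
        println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
    }
  }

  tryLogNonFatalError { throw new RuntimeException("logged and swallowed") }
  println("caller keeps running")
}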

core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala

Lines changed: 4 additions & 6 deletions
@@ -311,7 +311,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
   }
 
   test("self: call in onStop") {
-    @volatile var e: Throwable = null
+    @volatile var selfOption: Option[RpcEndpointRef] = null
 
     val endpointRef = env.setupEndpoint("self-onStop", new RpcEndpoint {
       override val rpcEnv = env
 
@@ -321,20 +321,18 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll {
       }
 
       override def onStop(): Unit = {
-        self
+        selfOption = Option(self)
       }
 
       override def onError(cause: Throwable): Unit = {
-        e = cause
       }
     })
 
     env.stop(endpointRef)
 
     eventually(timeout(5 seconds), interval(10 millis)) {
-      // Calling `self` in `onStop` is invalid
-      assert(e != null)
-      assert(e.getMessage.contains("Cannot find RpcEndpointRef"))
+      // Calling `self` in `onStop` will return null, so selfOption will be None
+      assert(selfOption == None)
     }
   }
