
Commit 20051eb

AngersZhuuuu authored and tgravescs committed
[SPARK-36540][YARN] YARN-CLIENT mode should check Shutdown message when AMEndpoint disconnected
### What changes were proposed in this pull request?

We hit a case where the AM lost its connection to the driver:

```
21/08/18 02:14:15 ERROR TransportRequestHandler: Error sending result RpcResponse{requestId=5675952834716124039, body=NioManagedBuffer{buf=java.nio.HeapByteBuffer[pos=0 lim=47 cap=64]}} to xx.xx.xx.xx:41420; closing connection
java.nio.channels.ClosedChannelException
	at io.netty.channel.AbstractChannel$AbstractUnsafe.newClosedChannelException(AbstractChannel.java:957)
	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:865)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1367)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:717)
	at io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:764)
	at io.netty.channel.AbstractChannelHandlerContext$WriteTask.run(AbstractChannelHandlerContext.java:1104)
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164)
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:748)
```

Looking at the AM code for client mode, when the AMEndpoint is disconnected the application is finished with a SUCCEEDED final status:

```scala
override def onDisconnected(remoteAddress: RpcAddress): Unit = {
  // In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
  // This avoids potentially reporting incorrect exit codes if the driver fails
  if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
    logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
    finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
  }
}
```

Normally this is fine in client mode: when the application succeeds, the driver stops, the AM loses its connection, and exiting with SUCCESS is correct. But if a network problem causes the disconnect, still finishing with a SUCCEEDED final status is not correct. YarnClientSchedulerBackend's monitor thread then receives an application report with a SUCCEEDED final status and stops the SparkContext, so an application that actually failed is recorded as a normal stop:

```scala
private class MonitorThread extends Thread {
  private var allowInterrupt = true

  override def run() {
    try {
      val YarnAppReport(_, state, diags) =
        client.monitorApplication(appId.get, logApplicationReport = false)
      logError(s"YARN application has exited unexpectedly with state $state! " +
        "Check the YARN application logs for more details.")
      diags.foreach { err =>
        logError(s"Diagnostics message: $err")
      }
      allowInterrupt = false
      sc.stop()
    } catch {
      case e: InterruptedException => logInfo("Interrupting monitor thread")
    }
  }

  def stopMonitor(): Unit = {
    if (allowInterrupt) {
      this.interrupt()
    }
  }
}
```

IMO, we should send a `Shutdown` message to the yarn-client mode AM so that it can tell an intentional shutdown apart from an unexpected disconnect.

### Why are the changes needed?
Fix bug.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?

Closes #33780 from AngersZhuuuu/SPARK-36540.

Authored-by: Angerszhuuuu <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
1 parent dd986e8 commit 20051eb
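
To make the intended behaviour concrete before the diffs, here is a minimal, self-contained sketch of the decision the AM now makes on disconnect. It is illustrative only: `ShutdownHandshakeSketch`, `ToyAMEndpoint`, `Succeeded`, and `Failed` are stand-ins, not Spark internals; the real change is in the file diffs below.

```scala
// Illustrative sketch only -- stand-in types, not Spark internals.
object ShutdownHandshakeSketch {
  sealed trait FinalStatus
  case object Succeeded extends FinalStatus
  case object Failed extends FinalStatus

  // Mirrors the AM-side state added by this commit: a flag set when the driver
  // announces a clean stop via the new Shutdown message.
  final class ToyAMEndpoint(treatDisconnectAsFailed: Boolean) {
    @volatile private var shutdown = false

    def receiveShutdown(): Unit = { shutdown = true }

    // A disconnect is only reported as FAILED when no Shutdown was seen
    // AND spark.yarn.am.clientModeTreatDisconnectAsFailed is enabled.
    def onDisconnected(): FinalStatus =
      if (shutdown || !treatDisconnectAsFailed) Succeeded else Failed
  }

  def main(args: Array[String]): Unit = {
    val cleanStop = new ToyAMEndpoint(treatDisconnectAsFailed = true)
    cleanStop.receiveShutdown()                     // driver said goodbye first
    assert(cleanStop.onDisconnected() == Succeeded)

    val lostDriver = new ToyAMEndpoint(treatDisconnectAsFailed = true)
    assert(lostDriver.onDisconnected() == Failed)   // e.g. a network failure

    val legacy = new ToyAMEndpoint(treatDisconnectAsFailed = false)
    assert(legacy.onDisconnected() == Succeeded)    // default: old behaviour
    println("handshake sketch ok")
  }
}
```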

File tree: 5 files changed (+53, -4 lines changed)

docs/running-on-yarn.md

Lines changed: 13 additions & 0 deletions
@@ -441,6 +441,19 @@ To use a custom metrics.properties for the application master and executors, upd
   </td>
   <td>1.6.0</td>
 </tr>
+<tr>
+  <td><code>spark.yarn.am.clientModeTreatDisconnectAsFailed</code></td>
+  <td>false</td>
+  <td>
+  Treat yarn-client unclean disconnects as failures. In yarn-client mode, normally the application will always finish
+  with a final status of SUCCESS because in some cases, it is not possible to know if the Application was terminated
+  intentionally by the user or if there was a real error. This config changes that behavior such that if the Application
+  Master disconnects from the driver uncleanly (ie without the proper shutdown handshake) the application will
+  terminate with a final status of FAILED. This will allow the caller to decide if it was truly a failure. Note that if
+  this config is set and the user just terminate the client application badly it may show a status of FAILED when it wasn't really FAILED.
+  </td>
+  <td>3.3.0</td>
+</tr>
 <tr>
   <td><code>spark.yarn.am.clientModeExitOnError</code></td>
   <td>false</td>
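
Usage note (not part of this commit): the flag documented above is an ordinary Spark configuration, so it can be set like any other. A minimal sketch, assuming Spark 3.3.0+ (where the config exists per this diff) and a yarn-client deployment; the application name is illustrative:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

// Opt in to the stricter disconnect handling documented above.
// Leaving the flag at its default (false) keeps the existing behaviour.
object TreatDisconnectAsFailedExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("treat-disconnect-as-failed-example")                  // hypothetical name
      .set("spark.submit.deployMode", "client")
      .set("spark.yarn.am.clientModeTreatDisconnectAsFailed", "true")

    val spark = SparkSession.builder()
      .master("yarn")
      .config(conf)
      .getOrCreate()

    // ... run the job ...
    spark.stop()
  }
}
```

The same setting can be passed on the command line with `spark-submit --conf spark.yarn.am.clientModeTreatDisconnectAsFailed=true`.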

resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 13 additions & 2 deletions
@@ -784,6 +784,9 @@ private[spark] class ApplicationMaster(
    */
   private class AMEndpoint(override val rpcEnv: RpcEnv, driver: RpcEndpointRef)
     extends RpcEndpoint with Logging {
+    @volatile private var shutdown = false
+    private val clientModeTreatDisconnectAsFailed =
+      sparkConf.get(AM_CLIENT_MODE_TREAT_DISCONNECT_AS_FAILED)
 
     override def onStart(): Unit = {
       driver.send(RegisterClusterManager(self))
@@ -801,6 +804,8 @@ private[spark] class ApplicationMaster(
     override def receive: PartialFunction[Any, Unit] = {
       case UpdateDelegationTokens(tokens) =>
         SparkHadoopUtil.get.addDelegationTokens(tokens, sparkConf)
+
+      case Shutdown => shutdown = true
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -843,8 +848,13 @@
       // In cluster mode or unmanaged am case, do not rely on the disassociated event to exit
       // This avoids potentially reporting incorrect exit codes if the driver fails
       if (!(isClusterMode || sparkConf.get(YARN_UNMANAGED_AM))) {
-        logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
-        finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+        if (shutdown || !clientModeTreatDisconnectAsFailed) {
+          logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress")
+          finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS)
+        } else {
+          logError(s"Application Master lost connection with driver! Shutting down. $remoteAddress")
+          finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_DISCONNECTED)
+        }
       }
     }
   }
@@ -862,6 +872,7 @@ object ApplicationMaster extends Logging {
   private val EXIT_SECURITY = 14
   private val EXIT_EXCEPTION_USER_CLASS = 15
   private val EXIT_EARLY = 16
+  private val EXIT_DISCONNECTED = 17
 
   private var master: ApplicationMaster = _
 
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/config.scala

Lines changed: 15 additions & 0 deletions
@@ -52,6 +52,21 @@ package object config extends Logging {
     .timeConf(TimeUnit.MILLISECONDS)
     .createOptional
 
+  private[spark] val AM_CLIENT_MODE_TREAT_DISCONNECT_AS_FAILED =
+    ConfigBuilder("spark.yarn.am.clientModeTreatDisconnectAsFailed")
+      .doc("Treat yarn-client unclean disconnects as failures. In yarn-client mode, normally the " +
+        "application will always finish with a final status of SUCCESS because in some cases, " +
+        "it is not possible to know if the Application was terminated intentionally by the user " +
+        "or if there was a real error. This config changes that behavior such that " +
+        "if the Application Master disconnects from the driver uncleanly (ie without the proper" +
+        " shutdown handshake) the application will terminate with a final status of FAILED. " +
+        "This will allow the caller to decide if it was truly a failure. Note that " +
+        "if this config is set and the user just terminate the client application badly " +
+        "it may show a status of FAILED when it wasn't really FAILED.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val AM_CLIENT_MODE_EXIT_ON_ERROR =
     ConfigBuilder("spark.yarn.am.clientModeExitOnError")
       .doc("In yarn-client mode, when this is true, if driver got " +

resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala

Lines changed: 1 addition & 0 deletions
@@ -161,6 +161,7 @@ private[spark] class YarnClientSchedulerBackend(
    */
   override def stop(): Unit = {
     assert(client != null, "Attempted to stop this scheduler before starting it!")
+    yarnSchedulerEndpoint.handleClientModeDriverStop()
     if (monitorThread != null) {
       monitorThread.stopMonitor()
     }

resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala

Lines changed: 11 additions & 2 deletions
@@ -58,7 +58,7 @@ private[spark] abstract class YarnSchedulerBackend(
 
   protected var totalExpectedExecutors = 0
 
-  private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
+  protected val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
   protected var amEndpoint: Option[RpcEndpointRef] = None
 
   private val yarnSchedulerEndpointRef = rpcEnv.setupEndpoint(
@@ -291,7 +291,7 @@
   /**
    * An [[RpcEndpoint]] that communicates with the ApplicationMaster.
    */
-  private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
+  protected class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
     extends ThreadSafeRpcEndpoint with Logging {
 
     private[YarnSchedulerBackend] def handleExecutorDisconnectedFromDriver(
@@ -319,6 +319,15 @@
       removeExecutorMessage.foreach { message => driverEndpoint.send(message) }
     }
 
+    private[cluster] def handleClientModeDriverStop(): Unit = {
+      amEndpoint match {
+        case Some(am) =>
+          am.send(Shutdown)
+        case None =>
+          logWarning("Attempted to send shutdown message before the AM has registered!")
+      }
+    }
+
     override def receive: PartialFunction[Any, Unit] = {
       case RegisterClusterManager(am) =>
         logInfo(s"ApplicationMaster registered as $am")
