From 13ea1491e54b5d7165837813941a6aa47fc312b3 Mon Sep 17 00:00:00 2001 From: Akshat Bordia Date: Sun, 19 Apr 2020 20:49:57 +0530 Subject: [PATCH 01/12] Implementation of spark.submit.waitForCompletion flag which will control if the spark-submit process will wait to exit until the application finishes/fails/errors out. This flag is set to false by default where the application will exit after submission. --- .../org/apache/spark/deploy/Client.scala | 68 ++++++++++++------- docs/spark-standalone.md | 10 +++ 2 files changed, 55 insertions(+), 23 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 7022b986ea025..a47186c3cd075 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -61,6 +61,9 @@ private class ClientEndpoint( private val lostMasters = new HashSet[RpcAddress] private var activeMasterEndpoint: RpcEndpointRef = null + private val waitAppCompletion = conf.get("spark.submit.waitAppCompletion", "false") == "true" + private val REPORT_DRIVER_STATUS_INTERVAL = 1000 + private def getProperty(key: String, conf: SparkConf): Option[String] = { sys.props.get(key).orElse(conf.getOption(key)) @@ -124,38 +127,57 @@ private class ClientEndpoint( } } - /* Find out driver status then exit the JVM */ + /** + * Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors + * the application until it finishes, fails or is killed. + */ def pollAndReportStatus(driverId: String): Unit = { // Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread // is fine. logInfo("... waiting before polling master for driver state") Thread.sleep(5000) logInfo("... polling master for driver state") - val statusResponse = - activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId)) - if (statusResponse.found) { - logInfo(s"State of $driverId is ${statusResponse.state.get}") - // Worker node, if present - (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { - case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => - logInfo(s"Driver running on $hostPort ($id)") - case _ => - } - // Exception, if present - statusResponse.exception match { - case Some(e) => - logError(s"Exception from cluster was: $e") - e.printStackTrace() - System.exit(-1) - case _ => - System.exit(0) + while (true) { + val statusResponse = + activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId)) + if (statusResponse.found) { + logInfo(s"State of $driverId is ${statusResponse.state.get}") + // Worker node, if present + (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { + case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => + logInfo(s"Driver running on $hostPort ($id)") + case _ => + } + // Exception, if present + statusResponse.exception match { + case Some(e) => + logError(s"Exception from cluster was: $e") + e.printStackTrace() + System.exit(-1) + case _ => + if (!waitAppCompletion) { + logInfo(s"No exception found and waitAppCompletion is false, " + + s"exiting spark-submit JVM.") + System.exit(0) + } else if (statusResponse.state.get == DriverState.FINISHED || + statusResponse.state.get == DriverState.FAILED || + statusResponse.state.get == DriverState.ERROR || + statusResponse.state.get == DriverState.KILLED) { + logInfo(s"waitAppCompletion is true, state is ${statusResponse.state.get}, " + + s"exiting spark-submit JVM.") + System.exit(0) + } else { + logTrace(s"waitAppCompletion is true, state is ${statusResponse.state.get}," + + s"continue monitoring driver status.") + } + } + } else { + logError(s"ERROR: Cluster master did not recognize $driverId") + System.exit(-1) } - } else { - logError(s"ERROR: Cluster master did not recognize $driverId") - System.exit(-1) + Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL) } } - override def receive: PartialFunction[Any, Unit] = { case SubmitDriverResponse(master, success, driverId, message) => diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 2c2ed53b478c3..cebec82e0ba06 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -240,6 +240,16 @@ SPARK_MASTER_OPTS supports the following system properties: 1.6.3 + + spark.submit.waitAppCompletion + true + + In Standalone cluster mode, controls whether the client waits to exit until the application completes. + If set to true, the client process will stay alive reporting the application's status. + Otherwise, the client process will exit after submission. + + 3.1.0 + spark.worker.timeout 60 From 68d76d04d1f7c7980f8d86dc2d0fc419fd0bf24e Mon Sep 17 00:00:00 2001 From: Akshat Bordia Date: Sun, 19 Apr 2020 22:18:32 +0530 Subject: [PATCH 02/12] Fixing to adher to JavaDoc and Scalastyle guidelines --- core/src/main/scala/org/apache/spark/deploy/Client.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index a47186c3cd075..08566aa3afed9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -128,9 +128,9 @@ private class ClientEndpoint( } /** - * Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors - * the application until it finishes, fails or is killed. - */ + * Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors + * the application until it finishes, fails or is killed. + */ def pollAndReportStatus(driverId: String): Unit = { // Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread // is fine. From 34c7d26d7d69734bd334518bc36147763ee77f14 Mon Sep 17 00:00:00 2001 From: Akshat Bordia Date: Mon, 20 Apr 2020 22:18:06 +0530 Subject: [PATCH 03/12] Using conf.getBoolean for the config --- core/src/main/scala/org/apache/spark/deploy/Client.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 08566aa3afed9..7e3ba7e84e6ea 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -61,7 +61,7 @@ private class ClientEndpoint( private val lostMasters = new HashSet[RpcAddress] private var activeMasterEndpoint: RpcEndpointRef = null - private val waitAppCompletion = conf.get("spark.submit.waitAppCompletion", "false") == "true" + private val waitAppCompletion = conf.getBoolean("spark.submit.waitAppCompletion", false) private val REPORT_DRIVER_STATUS_INTERVAL = 1000 From a93ce76d8205da6eb01ab41644c57504174d405d Mon Sep 17 00:00:00 2001 From: Akshat Bordia Date: Fri, 1 May 2020 15:40:23 +0530 Subject: [PATCH 04/12] Using Match Statement and updating config property location in documentation --- .../org/apache/spark/deploy/Client.scala | 24 ++++++++-------- docs/spark-standalone.md | 28 ++++++++++++------- 2 files changed, 31 insertions(+), 21 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 7e3ba7e84e6ea..45735e3068c47 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -61,7 +61,8 @@ private class ClientEndpoint( private val lostMasters = new HashSet[RpcAddress] private var activeMasterEndpoint: RpcEndpointRef = null - private val waitAppCompletion = conf.getBoolean("spark.submit.waitAppCompletion", false) + private val waitAppCompletion = conf.getBoolean("spark.standalone.submit.waitAppCompletion", + false) private val REPORT_DRIVER_STATUS_INTERVAL = 1000 @@ -156,19 +157,20 @@ private class ClientEndpoint( System.exit(-1) case _ => if (!waitAppCompletion) { - logInfo(s"No exception found and waitAppCompletion is false, " + - s"exiting spark-submit JVM.") - System.exit(0) - } else if (statusResponse.state.get == DriverState.FINISHED || - statusResponse.state.get == DriverState.FAILED || - statusResponse.state.get == DriverState.ERROR || - statusResponse.state.get == DriverState.KILLED) { - logInfo(s"waitAppCompletion is true, state is ${statusResponse.state.get}, " + + logInfo(s"spark-submit not configured to wait for completion, " + s"exiting spark-submit JVM.") System.exit(0) } else { - logTrace(s"waitAppCompletion is true, state is ${statusResponse.state.get}," + - s"continue monitoring driver status.") + statusResponse.state.get match { + case DriverState.FINISHED | DriverState.FAILED | + DriverState.ERROR | DriverState.KILLED => + logInfo(s"State of $driverId is ${statusResponse.state.get}, " + + s"exiting spark-submit JVM.") + System.exit(0) + case _ => + logTrace(s"State of $driverId is ${statusResponse.state.get}," + + s"continue monitoring driver status.") + } } } } else { diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index cebec82e0ba06..a0fb95d7091c3 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -240,16 +240,6 @@ SPARK_MASTER_OPTS supports the following system properties: 1.6.3 - - spark.submit.waitAppCompletion - true - - In Standalone cluster mode, controls whether the client waits to exit until the application completes. - If set to true, the client process will stay alive reporting the application's status. - Otherwise, the client process will exit after submission. - - 3.1.0 - spark.worker.timeout 60 @@ -384,6 +374,24 @@ To run an interactive Spark shell against the cluster, run the following command You can also pass an option `--total-executor-cores ` to control the number of cores that spark-shell uses on the cluster. +#Spark Properties + +Spark applications supports the following configuration properties specific to standalone Mode: + + + + + + + + +
Property NameDefault ValueMeaningSince Version
spark.standalone.submit.waitAppCompletionfalse + In Standalone cluster mode, controls whether the client waits to exit until the application completes. + If set to true, the client process will stay alive reporting the application's status. + Otherwise, the client process will exit after submission. + 3.1.0
+ + # Launching Spark Applications The [`spark-submit` script](submitting-applications.html) provides the most straightforward way to From d5eded1a277864db36635d1e029a562024b3dd20 Mon Sep 17 00:00:00 2001 From: Akshat Bordia Date: Sat, 2 May 2020 23:12:58 +0530 Subject: [PATCH 05/12] Addressing documentation related review comments --- docs/spark-standalone.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index a0fb95d7091c3..46980066271f4 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -374,17 +374,18 @@ To run an interactive Spark shell against the cluster, run the following command You can also pass an option `--total-executor-cores ` to control the number of cores that spark-shell uses on the cluster. -#Spark Properties +# Client Properties + +Spark applications supports the following configuration properties specific to standalone mode: -Spark applications supports the following configuration properties specific to standalone Mode: From 8eef373d5e7ffe1c4fe82eef836f16df642df934 Mon Sep 17 00:00:00 2001 From: Akshat Bordia Date: Sun, 3 May 2020 00:17:15 +0530 Subject: [PATCH 06/12] Fixing indentation --- core/src/main/scala/org/apache/spark/deploy/Client.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 45735e3068c47..a1093d5aaf2e3 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -165,10 +165,10 @@ private class ClientEndpoint( case DriverState.FINISHED | DriverState.FAILED | DriverState.ERROR | DriverState.KILLED => logInfo(s"State of $driverId is ${statusResponse.state.get}, " + - s"exiting spark-submit JVM.") + s"exiting spark-submit JVM.") System.exit(0) case _ => - logTrace(s"State of $driverId is ${statusResponse.state.get}," + + logTrace(s"State of $driverId is ${statusResponse.state.get}, " + s"continue monitoring driver status.") } } From 09181060a7b648d29c0ce4f52bd55d038eb813f3 Mon Sep 17 00:00:00 2001 From: Akshat Bordia Date: Sun, 10 May 2020 17:57:38 +0530 Subject: [PATCH 07/12] Implementing the flag using asyn message --- .../org/apache/spark/deploy/Client.scala | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index a1093d5aaf2e3..5a4210e7f8b00 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -64,6 +64,7 @@ private class ClientEndpoint( private val waitAppCompletion = conf.getBoolean("spark.standalone.submit.waitAppCompletion", false) private val REPORT_DRIVER_STATUS_INTERVAL = 1000 + private var submittedDriverID = "" private def getProperty(key: String, conf: SparkConf): Option[String] = { @@ -138,7 +139,6 @@ private class ClientEndpoint( logInfo("... waiting before polling master for driver state") Thread.sleep(5000) logInfo("... polling master for driver state") - while (true) { val statusResponse = activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId)) if (statusResponse.found) { @@ -161,31 +161,21 @@ private class ClientEndpoint( s"exiting spark-submit JVM.") System.exit(0) } else { - statusResponse.state.get match { - case DriverState.FINISHED | DriverState.FAILED | - DriverState.ERROR | DriverState.KILLED => - logInfo(s"State of $driverId is ${statusResponse.state.get}, " + - s"exiting spark-submit JVM.") - System.exit(0) - case _ => - logTrace(s"State of $driverId is ${statusResponse.state.get}, " + - s"continue monitoring driver status.") - } + asyncSendToMasterAndForwardReply[DriverStatusResponse](RequestDriverStatus(driverId)) } } } else { logError(s"ERROR: Cluster master did not recognize $driverId") System.exit(-1) } - Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL) } - } override def receive: PartialFunction[Any, Unit] = { case SubmitDriverResponse(master, success, driverId, message) => logInfo(message) if (success) { activeMasterEndpoint = master + submittedDriverID = driverId.get pollAndReportStatus(driverId.get) } else if (!Utils.responseFromBackup(message)) { System.exit(-1) @@ -200,6 +190,25 @@ private class ClientEndpoint( } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } + + case DriverStatusResponse(found, state, _, _, _) => + if (found) { + state.get match { + case DriverState.FINISHED | DriverState.FAILED | + DriverState.ERROR | DriverState.KILLED => + logInfo(s"State of $submittedDriverID is ${state.get}, " + + s"exiting spark-submit JVM.") + System.exit(0) + case _ => + Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL) + logInfo(s"State of $submittedDriverID is ${state.get}, " + + s"continue monitoring driver status.") + asyncSendToMasterAndForwardReply[DriverStatusResponse]( + RequestDriverStatus(submittedDriverID)) + } + } else { + System.exit(-1) + } } override def onDisconnected(remoteAddress: RpcAddress): Unit = { From e22549560719666f1ba9023da45dc3f08b65a802 Mon Sep 17 00:00:00 2001 From: Akshat Bordia Date: Sun, 10 May 2020 17:57:38 +0530 Subject: [PATCH 08/12] Implementing the flag using async DriverStatusResponse message --- .../org/apache/spark/deploy/Client.scala | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index a1093d5aaf2e3..5a4210e7f8b00 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -64,6 +64,7 @@ private class ClientEndpoint( private val waitAppCompletion = conf.getBoolean("spark.standalone.submit.waitAppCompletion", false) private val REPORT_DRIVER_STATUS_INTERVAL = 1000 + private var submittedDriverID = "" private def getProperty(key: String, conf: SparkConf): Option[String] = { @@ -138,7 +139,6 @@ private class ClientEndpoint( logInfo("... waiting before polling master for driver state") Thread.sleep(5000) logInfo("... polling master for driver state") - while (true) { val statusResponse = activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId)) if (statusResponse.found) { @@ -161,31 +161,21 @@ private class ClientEndpoint( s"exiting spark-submit JVM.") System.exit(0) } else { - statusResponse.state.get match { - case DriverState.FINISHED | DriverState.FAILED | - DriverState.ERROR | DriverState.KILLED => - logInfo(s"State of $driverId is ${statusResponse.state.get}, " + - s"exiting spark-submit JVM.") - System.exit(0) - case _ => - logTrace(s"State of $driverId is ${statusResponse.state.get}, " + - s"continue monitoring driver status.") - } + asyncSendToMasterAndForwardReply[DriverStatusResponse](RequestDriverStatus(driverId)) } } } else { logError(s"ERROR: Cluster master did not recognize $driverId") System.exit(-1) } - Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL) } - } override def receive: PartialFunction[Any, Unit] = { case SubmitDriverResponse(master, success, driverId, message) => logInfo(message) if (success) { activeMasterEndpoint = master + submittedDriverID = driverId.get pollAndReportStatus(driverId.get) } else if (!Utils.responseFromBackup(message)) { System.exit(-1) @@ -200,6 +190,25 @@ private class ClientEndpoint( } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } + + case DriverStatusResponse(found, state, _, _, _) => + if (found) { + state.get match { + case DriverState.FINISHED | DriverState.FAILED | + DriverState.ERROR | DriverState.KILLED => + logInfo(s"State of $submittedDriverID is ${state.get}, " + + s"exiting spark-submit JVM.") + System.exit(0) + case _ => + Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL) + logInfo(s"State of $submittedDriverID is ${state.get}, " + + s"continue monitoring driver status.") + asyncSendToMasterAndForwardReply[DriverStatusResponse]( + RequestDriverStatus(submittedDriverID)) + } + } else { + System.exit(-1) + } } override def onDisconnected(remoteAddress: RpcAddress): Unit = { From 9050a088fee5b856bb30455673146c8105f53f12 Mon Sep 17 00:00:00 2001 From: Akshat Bordia Date: Sat, 16 May 2020 18:07:09 +0530 Subject: [PATCH 09/12] Using Thread Scheduler to Monitor Driver Status --- .../org/apache/spark/deploy/Client.scala | 63 +++++++++++-------- docs/spark-standalone.md | 2 +- 2 files changed, 37 insertions(+), 28 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 5a4210e7f8b00..54bd6338deb9b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -17,6 +17,8 @@ package org.apache.spark.deploy +import java.util.concurrent.TimeUnit + import scala.collection.mutable.HashSet import scala.concurrent.ExecutionContext import scala.reflect.ClassTag @@ -63,7 +65,7 @@ private class ClientEndpoint( private var activeMasterEndpoint: RpcEndpointRef = null private val waitAppCompletion = conf.getBoolean("spark.standalone.submit.waitAppCompletion", false) - private val REPORT_DRIVER_STATUS_INTERVAL = 1000 + private val REPORT_DRIVER_STATUS_INTERVAL = 10000 private var submittedDriverID = "" @@ -110,6 +112,10 @@ private class ClientEndpoint( asyncSendToMasterAndForwardReply[SubmitDriverResponse]( RequestSubmitDriver(driverDescription)) + forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError { + MonitorDriverStatus() + }, 0, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS) + case "kill" => val driverId = driverArgs.driverId asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId)) @@ -128,6 +134,11 @@ private class ClientEndpoint( }(forwardMessageExecutionContext) } } + private def MonitorDriverStatus(): Unit = { + if (submittedDriverID != "") { + asyncSendToMasterAndForwardReply[DriverStatusResponse](RequestDriverStatus(submittedDriverID)) + } + } /** * Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors @@ -139,29 +150,30 @@ private class ClientEndpoint( logInfo("... waiting before polling master for driver state") Thread.sleep(5000) logInfo("... polling master for driver state") - val statusResponse = + val statusResponse = activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId)) - if (statusResponse.found) { - logInfo(s"State of $driverId is ${statusResponse.state.get}") - // Worker node, if present - (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { - case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => - logInfo(s"Driver running on $hostPort ($id)") - case _ => - } - // Exception, if present - statusResponse.exception match { - case Some(e) => - logError(s"Exception from cluster was: $e") - e.printStackTrace() - System.exit(-1) - case _ => - if (!waitAppCompletion) { - logInfo(s"spark-submit not configured to wait for completion, " + - s"exiting spark-submit JVM.") - System.exit(0) + if (statusResponse.found) { + logInfo(s"State of $driverId is ${statusResponse.state.get}") + // Worker node, if present + (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { + case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => + logInfo(s"Driver running on $hostPort ($id)") + case _ => + } + // Exception, if present + statusResponse.exception match { + case Some(e) => + logError(s"Exception from cluster was: $e") + e.printStackTrace() + System.exit(-1) + case _ => + if (!waitAppCompletion) { + logInfo(s"spark-submit not configured to wait for completion, " + + s"exiting spark-submit JVM.") + System.exit(0) } else { - asyncSendToMasterAndForwardReply[DriverStatusResponse](RequestDriverStatus(driverId)) + logInfo(s"spark-submit is configured to wait for completion, " + + s"continue monitoring driver status.") } } } else { @@ -196,15 +208,12 @@ private class ClientEndpoint( state.get match { case DriverState.FINISHED | DriverState.FAILED | DriverState.ERROR | DriverState.KILLED => - logInfo(s"State of $submittedDriverID is ${state.get}, " + + logInfo(s"State of driver $submittedDriverID is ${state.get}, " + s"exiting spark-submit JVM.") System.exit(0) case _ => - Thread.sleep(REPORT_DRIVER_STATUS_INTERVAL) - logInfo(s"State of $submittedDriverID is ${state.get}, " + + logDebug(s"State of driver $submittedDriverID is ${state.get}, " + s"continue monitoring driver status.") - asyncSendToMasterAndForwardReply[DriverStatusResponse]( - RequestDriverStatus(submittedDriverID)) } } else { System.exit(-1) diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 46980066271f4..639e8cd977de7 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -385,7 +385,7 @@ Spark applications supports the following configuration properties specific to s From 45c9817fe3857b93703eef76c540671e2bc0762e Mon Sep 17 00:00:00 2001 From: Akshat Bordia Date: Fri, 22 May 2020 16:50:54 +0530 Subject: [PATCH 10/12] Refactoring to avoid multiple calls of pollAndReportStatus method --- .../org/apache/spark/deploy/Client.scala | 96 +++++++++---------- 1 file changed, 47 insertions(+), 49 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 54bd6338deb9b..a52cfcb26424a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -29,6 +29,7 @@ import org.apache.log4j.Logger import org.apache.spark.{SecurityManager, SparkConf} import org.apache.spark.deploy.DeployMessages._ import org.apache.spark.deploy.master.{DriverState, Master} +import org.apache.spark.deploy.master.DriverState.DriverState import org.apache.spark.internal.{config, Logging} import org.apache.spark.internal.config.Network.RPC_ASK_TIMEOUT import org.apache.spark.resource.ResourceUtils @@ -67,6 +68,7 @@ private class ClientEndpoint( false) private val REPORT_DRIVER_STATUS_INTERVAL = 10000 private var submittedDriverID = "" + private var driverStatusReported = false private def getProperty(key: String, conf: SparkConf): Option[String] = { @@ -112,14 +114,15 @@ private class ClientEndpoint( asyncSendToMasterAndForwardReply[SubmitDriverResponse]( RequestSubmitDriver(driverDescription)) - forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError { - MonitorDriverStatus() - }, 0, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS) - case "kill" => val driverId = driverArgs.driverId + submittedDriverID = driverId asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId)) } + logInfo("... waiting before polling master for driver state") + forwardMessageThread.scheduleAtFixedRate(() => Utils.tryLogNonFatalError { + monitorDriverStatus() + }, 5000, REPORT_DRIVER_STATUS_INTERVAL, TimeUnit.MILLISECONDS) } /** @@ -134,50 +137,61 @@ private class ClientEndpoint( }(forwardMessageExecutionContext) } } - private def MonitorDriverStatus(): Unit = { + + private def monitorDriverStatus(): Unit = { if (submittedDriverID != "") { asyncSendToMasterAndForwardReply[DriverStatusResponse](RequestDriverStatus(submittedDriverID)) } } /** - * Find out driver status then exit the JVM. If the waitAppCompletion is set to true, monitors - * the application until it finishes, fails or is killed. + * Processes and reports the driver status then exit the JVM if the + * waitAppCompletion is set to false, else reports the driver status + * if debug logs are enabled. */ - def pollAndReportStatus(driverId: String): Unit = { - // Since ClientEndpoint is the only RpcEndpoint in the process, blocking the event loop thread - // is fine. - logInfo("... waiting before polling master for driver state") - Thread.sleep(5000) - logInfo("... polling master for driver state") - val statusResponse = - activeMasterEndpoint.askSync[DriverStatusResponse](RequestDriverStatus(driverId)) - if (statusResponse.found) { - logInfo(s"State of $driverId is ${statusResponse.state.get}") - // Worker node, if present - (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match { - case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => - logInfo(s"Driver running on $hostPort ($id)") - case _ => + def reportDriverStatus(found: Boolean, state: Option[DriverState], + workerId: Option[String], + workerHostPort: Option[String], + exception: Option[Exception]): Unit = { + if (found) { + // Using driverStatusReported to avoid writing following + // logs again when waitAppCompletion is set to true + if (!driverStatusReported) { + driverStatusReported = true + logInfo(s"State of $submittedDriverID is ${state.get}") + // Worker node, if present + (workerId, workerHostPort, state) match { + case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) => + logInfo(s"Driver running on $hostPort ($id)") + case _ => + } } // Exception, if present - statusResponse.exception match { + exception match { case Some(e) => logError(s"Exception from cluster was: $e") e.printStackTrace() System.exit(-1) case _ => - if (!waitAppCompletion) { - logInfo(s"spark-submit not configured to wait for completion, " + - s"exiting spark-submit JVM.") - System.exit(0) - } else { - logInfo(s"spark-submit is configured to wait for completion, " + - s"continue monitoring driver status.") + state.get match { + case DriverState.FINISHED | DriverState.FAILED | + DriverState.ERROR | DriverState.KILLED => + logInfo(s"State of driver $submittedDriverID is ${state.get}, " + + s"exiting spark-submit JVM.") + System.exit(0) + case _ => + if (!waitAppCompletion) { + logInfo(s"spark-submit not configured to wait for completion, " + + s"exiting spark-submit JVM.") + System.exit(0) + } else { + logDebug(s"State of driver $submittedDriverID is ${state.get}, " + + s"continue monitoring driver status.") + } } } } else { - logError(s"ERROR: Cluster master did not recognize $driverId") + logError(s"ERROR: Cluster master did not recognize $submittedDriverID") System.exit(-1) } } @@ -188,36 +202,20 @@ private class ClientEndpoint( if (success) { activeMasterEndpoint = master submittedDriverID = driverId.get - pollAndReportStatus(driverId.get) } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } - case KillDriverResponse(master, driverId, success, message) => logInfo(message) if (success) { activeMasterEndpoint = master - pollAndReportStatus(driverId) } else if (!Utils.responseFromBackup(message)) { System.exit(-1) } - case DriverStatusResponse(found, state, _, _, _) => - if (found) { - state.get match { - case DriverState.FINISHED | DriverState.FAILED | - DriverState.ERROR | DriverState.KILLED => - logInfo(s"State of driver $submittedDriverID is ${state.get}, " + - s"exiting spark-submit JVM.") - System.exit(0) - case _ => - logDebug(s"State of driver $submittedDriverID is ${state.get}, " + - s"continue monitoring driver status.") - } - } else { - System.exit(-1) - } + case DriverStatusResponse(found, state, workerId, workerHostPort, exception) => + reportDriverStatus(found, state, workerId, workerHostPort, exception) } override def onDisconnected(remoteAddress: RpcAddress): Unit = { From 743d93de74dd500bc15d44b462d3ef208df09557 Mon Sep 17 00:00:00 2001 From: Akshat Bordia Date: Tue, 2 Jun 2020 13:52:09 +0530 Subject: [PATCH 11/12] Fixing indentation --- .../main/scala/org/apache/spark/deploy/Client.scala | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index a52cfcb26424a..3d53d1b69a11d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -149,10 +149,13 @@ private class ClientEndpoint( * waitAppCompletion is set to false, else reports the driver status * if debug logs are enabled. */ - def reportDriverStatus(found: Boolean, state: Option[DriverState], - workerId: Option[String], - workerHostPort: Option[String], - exception: Option[Exception]): Unit = { + + def reportDriverStatus( + found: Boolean, + state: Option[DriverState], + workerId: Option[String], + workerHostPort: Option[String], + exception: Option[Exception]): Unit = { if (found) { // Using driverStatusReported to avoid writing following // logs again when waitAppCompletion is set to true From fe142a82e9ccaccf19234408d8e398187a3351ab Mon Sep 17 00:00:00 2001 From: Akshat Bordia Date: Tue, 2 Jun 2020 13:59:58 +0530 Subject: [PATCH 12/12] Fixing Indentation for method parameters --- .../main/scala/org/apache/spark/deploy/Client.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/Client.scala b/core/src/main/scala/org/apache/spark/deploy/Client.scala index 3d53d1b69a11d..6beea5646f63b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/Client.scala +++ b/core/src/main/scala/org/apache/spark/deploy/Client.scala @@ -151,11 +151,11 @@ private class ClientEndpoint( */ def reportDriverStatus( - found: Boolean, - state: Option[DriverState], - workerId: Option[String], - workerHostPort: Option[String], - exception: Option[Exception]): Unit = { + found: Boolean, + state: Option[DriverState], + workerId: Option[String], + workerHostPort: Option[String], + exception: Option[Exception]): Unit = { if (found) { // Using driverStatusReported to avoid writing following // logs again when waitAppCompletion is set to true
Property NameDefault ValueMeaningSince Version
spark.standalone.submit.waitAppCompletion false - In Standalone cluster mode, controls whether the client waits to exit until the application completes. - If set to true, the client process will stay alive reporting the application's status. + In standalone cluster mode, controls whether the client waits to exit until the application completes. + If set to true, the client process will stay alive polling the application's status. Otherwise, the client process will exit after submission. 3.1.0false In standalone cluster mode, controls whether the client waits to exit until the application completes. - If set to true, the client process will stay alive polling the application's status. + If set to true, the client process will stay alive polling the driver's status. Otherwise, the client process will exit after submission. 3.1.0