Skip to content

Commit 06df34d

Browse files
Devaraj K authored and
Marcelo Vanzin committed
[SPARK-11034][LAUNCHER][MESOS] Launcher: add support for monitoring Mesos apps
## What changes were proposed in this pull request? This patch adds Launcher support for monitoring Mesos apps in client mode. SPARK-11033 can cover Mesos cluster mode, since the Standalone/Cluster and Mesos/Cluster modes use the same client-side code. ## How was this patch tested? Verified manually by running a launcher application: it was able to launch, stop, and kill Mesos applications, and the other launcher APIs could also be invoked. Author: Devaraj K <[email protected]> Closes #19385 from devaraj-kavali/SPARK-11034.
1 parent 1bb8b76 commit 06df34d

File tree

1 file changed

+26
-2
lines changed

1 file changed

+26
-2
lines changed

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import org.apache.spark.{SecurityManager, SparkContext, SparkException, TaskStat
3232
import org.apache.spark.deploy.mesos.config._
3333
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
3434
import org.apache.spark.internal.config
35+
import org.apache.spark.launcher.{LauncherBackend, SparkAppHandle}
3536
import org.apache.spark.network.netty.SparkTransportConf
3637
import org.apache.spark.network.shuffle.mesos.MesosExternalShuffleClient
3738
import org.apache.spark.rpc.RpcEndpointAddress
@@ -89,6 +90,13 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
8990
// Synchronization protected by stateLock
9091
private[this] var stopCalled: Boolean = false
9192

93+
// Launcher-side handle for this scheduler backend. Its stop-request hook stops the
// scheduler backend first and then reports the KILLED state to the launcher.
// NOTE(review): onStopRequest is presumably invoked by the launcher library when the
// application handle asks the app to stop -- confirm against LauncherBackend.
private val launcherBackend = new LauncherBackend() {
94+
override protected def onStopRequest(): Unit = {
95+
stopSchedulerBackend()
96+
setState(SparkAppHandle.State.KILLED)
97+
}
98+
}
99+
92100
// If shuffle service is enabled, the Spark driver will register with the shuffle service.
93101
// This is for cleaning up shuffle files reliably.
94102
private val shuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
@@ -182,6 +190,9 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
182190
override def start() {
183191
super.start()
184192

193+
if (sc.deployMode == "client") {
194+
launcherBackend.connect()
195+
}
185196
val startedBefore = IdHelper.startedBefore.getAndSet(true)
186197

187198
val suffix = if (startedBefore) {
@@ -202,6 +213,7 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
202213
sc.conf.getOption("spark.mesos.driver.frameworkId").map(_ + suffix)
203214
)
204215

216+
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
205217
startScheduler(driver)
206218
}
207219

@@ -295,15 +307,21 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
295307
this.mesosExternalShuffleClient.foreach(_.init(appId))
296308
this.schedulerDriver = driver
297309
markRegistered()
310+
launcherBackend.setAppId(appId)
311+
launcherBackend.setState(SparkAppHandle.State.RUNNING)
298312
}
299313

300314
// Returns true once totalCoreCount has reached maxCoresOption (0 when unset)
// scaled by minRegisteredRatio.
override def sufficientResourcesRegistered(): Boolean = {
301315
totalCoreCount.get >= maxCoresOption.getOrElse(0) * minRegisteredRatio
302316
}
303317

304-
override def disconnected(d: org.apache.mesos.SchedulerDriver) {}
318+
// Reports the SUBMITTED state to the launcher; the driver argument is not used.
// NOTE(review): presumably called by Mesos when the scheduler driver loses its
// connection to the master -- confirm against the Mesos Scheduler API.
override def disconnected(d: org.apache.mesos.SchedulerDriver) {
319+
launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
320+
}
305321

306-
override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {}
322+
// Reports the RUNNING state to the launcher; neither argument is used.
// NOTE(review): presumably called by Mesos after the scheduler driver re-registers
// with a master -- confirm against the Mesos Scheduler API.
override def reregistered(d: org.apache.mesos.SchedulerDriver, masterInfo: MasterInfo) {
323+
launcherBackend.setState(SparkAppHandle.State.RUNNING)
324+
}
307325

308326
/**
309327
* Method called by Mesos to offer resources on slaves. We respond by launching an executor,
@@ -611,6 +629,12 @@ private[spark] class MesosCoarseGrainedSchedulerBackend(
611629
}
612630

613631
// Stops this backend: delegates the shutdown work to stopSchedulerBackend(), then
// reports the FINISHED state to the launcher and closes the launcher backend.
override def stop() {
632+
stopSchedulerBackend()
633+
launcherBackend.setState(SparkAppHandle.State.FINISHED)
634+
launcherBackend.close()
635+
}
636+
637+
private def stopSchedulerBackend() {
614638
// Make sure we're not launching tasks during shutdown
615639
stateLock.synchronized {
616640
if (stopCalled) {

0 commit comments

Comments
 (0)