Skip to content

Commit 3d4dfa1

Browse files
Luc Bourliertnachen
authored andcommitted
Adds support to kill submissions
1 parent febfaba commit 3d4dfa1

File tree

1 file changed

+15
-9
lines changed

1 file changed

+15
-9
lines changed

core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -122,17 +122,23 @@ private[spark] class MesosClusterScheduler(conf: SparkConf)
122122
def killDriver(submissionId: String): KillResponse = {
123123
stateLock.synchronized {
124124
if (launchedDrivers.contains(submissionId)) {
125-
// Kill the JOB!!!!
126-
return KillResponse(submissionId, true, Option("Killing running driver"))
125+
// Check if submission is running
126+
val task = launchedDrivers(submissionId)
127+
driver.killTask(task.taskId)
128+
Some(KillResponse(submissionId, true, Option("Killing running driver")))
129+
} else {
130+
None
127131
}
132+
}.orElse {
133+
// Check if submission is queued
134+
if (queue.remove(new DriverSubmission(submissionId, null, null))) {
135+
Some(KillResponse(submissionId, true, Option("Removed driver while it's still pending")))
136+
} else {
137+
None
138+
}
139+
}.getOrElse{
140+
KillResponse(submissionId, false, Option("Cannot find driver"))
128141
}
129-
130-
// Check if submission is queued
131-
if (queue.remove(new DriverSubmission(submissionId, null, null))) {
132-
return KillResponse(submissionId, true, Option("Removed driver while it's still pending"))
133-
}
134-
135-
KillResponse(submissionId, false, Option("Cannot find driver"))
136142
}
137143

138144
def start() {

0 commit comments

Comments
 (0)