From c16fb5ff62a943d2c17524f6e8a328acfc8dfd82 Mon Sep 17 00:00:00 2001
From: Devaraj K
Date: Tue, 17 May 2016 14:02:13 +0530
Subject: [PATCH 1/2] [SPARK-15359] [Mesos] Mesos dispatcher should handle DRIVER_ABORTED status from mesosDriver.run()

---
 .../cluster/mesos/MesosSchedulerUtils.scala | 19 +++++++++++++++----
 1 file changed, 15 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 7355ba317d9a0..acfb4e389f0d1 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -120,14 +120,25 @@ private[mesos] trait MesosSchedulerUtils extends Logging {
             val ret = mesosDriver.run()
             logInfo("driver.run() returned with code " + ret)
             if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
-              error = Some(new SparkException("Error starting driver, DRIVER_ABORTED"))
-              markErr()
+              val ex = new SparkException("Error starting driver, DRIVER_ABORTED")
+              // if the driver gets aborted after the successful registration
+              if (registerLatch.getCount == 0) {
+                throw ex
+              } else {
+                error = Some(ex)
+                markErr()
+              }
             }
           } catch {
             case e: Exception =>
               logError("driver.run() failed", e)
-              error = Some(e)
-              markErr()
+              // if any exception occurs after the successful registration
+              if (registerLatch.getCount == 0) {
+                throw e
+              } else {
+                error = Some(e)
+                markErr()
+              }
           }
         }
       }.start()

From b9893a3c259e64cd98967b4157115d232cab343e Mon Sep 17 00:00:00 2001
From: Devaraj K
Date: Tue, 18 Apr 2017 11:12:53 -0700
Subject: [PATCH 2/2] Changed from newDriver.run() to newDriver.start()

---
 .../cluster/mesos/MesosSchedulerUtils.scala | 45 +++++----------------
 1 file changed, 5 insertions(+), 40 deletions(-)

diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
index 1b04885173fe6..fedad790a3dba 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala
@@ -112,46 +112,11 @@ trait MesosSchedulerUtils extends Logging {
    */
   def startScheduler(newDriver: SchedulerDriver): Unit = {
     synchronized {
-      @volatile
-      var error: Option[Exception] = None
-
-      // We create a new thread that will block inside `mesosDriver.run`
-      // until the scheduler exists
-      new Thread(Utils.getFormattedClassName(this) + "-mesos-driver") {
-        setDaemon(true)
-        override def run() {
-          try {
-            val ret = newDriver.run()
-            logInfo("driver.run() returned with code " + ret)
-            if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
-              val ex = new SparkException("Error starting driver, DRIVER_ABORTED")
-              // if the driver gets aborted after the successful registration
-              if (registerLatch.getCount == 0) {
-                throw ex
-              } else {
-                error = Some(ex)
-                markErr()
-              }
-            }
-          } catch {
-            case e: Exception =>
-              logError("driver.run() failed", e)
-              // if any exception occurs after the successful registration
-              if (registerLatch.getCount == 0) {
-                throw e
-              } else {
-                error = Some(e)
-                markErr()
-              }
-          }
-        }
-      }.start()
-
-      registerLatch.await()
-
-      // propagate any error to the calling thread. This ensures that SparkContext creation fails
-      // without leaving a broken context that won't be able to schedule any tasks
-      error.foreach(throw _)
+      val ret = newDriver.start()
+      logInfo("driver.start() returned with code " + ret)
+      if (ret != null && ret.equals(Status.DRIVER_ABORTED)) {
+        throw new SparkException("Error starting driver, DRIVER_ABORTED")
+      }
     }
   }
 
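
The sketch below is illustrative only and is not part of either commit. It assumes the stock org.apache.mesos.SchedulerDriver API, in which run() blocks until the driver stops or aborts (effectively start() followed by join()), while start() launches the driver and returns a Protos.Status immediately. That difference is what lets the second commit drop the background thread, the registerLatch bookkeeping, and the shared error variable, and instead surface DRIVER_ABORTED synchronously on the calling thread. The object and method names below are hypothetical.

// Illustrative sketch, not part of the patch: the start()-based flow the
// second commit switches to. Names here are hypothetical.
import org.apache.mesos.{Protos, SchedulerDriver}

import org.apache.spark.SparkException

object StartSchedulerSketch {
  def startScheduler(newDriver: SchedulerDriver): Unit = synchronized {
    // start() only launches the driver and reports its status; unlike run(),
    // it does not block until the driver terminates, so an aborted driver can
    // be detected and thrown right here, on the thread that is creating the
    // SparkContext or dispatcher.
    val ret: Protos.Status = newDriver.start()
    if (ret == Protos.Status.DRIVER_ABORTED) {
      throw new SparkException("Error starting driver, DRIVER_ABORTED")
    }
  }
}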