diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e86e7923f8622..84295f64e02fe 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -218,8 +218,8 @@ class SparkContext(
       } catch {
         // TODO: Enumerate the exact reasons why it can fail
         // But irrespective of it, it means we cannot proceed !
-        case th: Throwable => {
-          throw new SparkException("YARN mode not available ?", th)
+        case e: Exception => {
+          throw new SparkException("YARN mode not available ?", e)
         }
       }
       val backend = new CoarseGrainedSchedulerBackend(scheduler, this.env.actorSystem)
@@ -233,8 +233,8 @@ class SparkContext(
           cons.newInstance(this).asInstanceOf[ClusterScheduler]

         } catch {
-          case th: Throwable => {
-            throw new SparkException("YARN mode not available ?", th)
+          case e: Exception => {
+            throw new SparkException("YARN mode not available ?", e)
           }
         }

@@ -243,8 +243,8 @@ class SparkContext(
         val cons = clazz.getConstructor(classOf[ClusterScheduler], classOf[SparkContext])
         cons.newInstance(scheduler, this).asInstanceOf[CoarseGrainedSchedulerBackend]
       } catch {
-        case th: Throwable => {
-          throw new SparkException("YARN mode not available ?", th)
+        case e: Exception => {
+          throw new SparkException("YARN mode not available ?", e)
         }
       }
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 12b4d94a567ce..2b9bf37ee7ff7 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -265,7 +265,6 @@ private[spark] object PythonRDD {
       }
     } catch {
       case eof: EOFException => {}
-      case e => throw e
     }
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 67d45723badd8..4251e9d375f94 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -58,13 +58,11 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       try {
         new Socket(daemonHost, daemonPort)
       } catch {
-        case exc: SocketException => {
+        case exc: SocketException =>
           logWarning("Python daemon unexpectedly quit, attempting to restart")
           stopDaemon()
           startDaemon()
           new Socket(daemonHost, daemonPort)
-        }
-        case e => throw e
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index fc1537f7963c4..530c079edd67a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -73,7 +73,7 @@ object SparkHadoopUtil {
       try {
         Class.forName("org.apache.spark.deploy.yarn.YarnSparkHadoopUtil").newInstance.asInstanceOf[SparkHadoopUtil]
       } catch {
-        case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+        case e: Exception => throw new SparkException("Unable to load YARN support", e)
       }
     } else {
       new SparkHadoopUtil
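The SparkContext, Python, and SparkHadoopUtil hunks above all narrow `catch { case th: Throwable }` (or a re-throwing catch-all) to `case e: Exception`, so that fatal JVM errors are no longer swallowed and rewrapped. A minimal standalone sketch of the resulting pattern; the object and message below are illustrative only, not Spark's code:

```scala
// Sketch: reflectively load an optional backend class, catching only Exception.
// ClassNotFoundException, InstantiationException, etc. still become a descriptive
// failure, while Errors (OutOfMemoryError, LinkageError, ...) propagate untouched.
object ReflectiveLoadExample {
  def load[T](className: String): T =
    try {
      Class.forName(className).newInstance().asInstanceOf[T]
    } catch {
      case e: Exception =>
        throw new RuntimeException("Optional support not available: " + className, e)
    }
}
```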
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
index 77422f61ec901..536eda5f72cea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/Client.scala
@@ -33,6 +33,7 @@ import org.apache.spark.Logging
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
+import org.apache.spark.util.Utils

 /**
@@ -61,6 +62,7 @@ private[spark] class Client(
     var masterAddress: Address = null
     var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times
     var alreadyDead = false  // To avoid calling listener.dead() multiple times
+    var registrationRetryTimer: Option[Cancellable] = None

     override def preStart() {
       try {
@@ -85,19 +87,21 @@ private[spark] class Client(
       tryRegisterAllMasters()
       var retries = 0
-      lazy val retryTimer: Cancellable =
+      registrationRetryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-          retries += 1
-          if (registered) {
-            retryTimer.cancel()
-          } else if (retries >= REGISTRATION_RETRIES) {
-            logError("All masters are unresponsive! Giving up.")
-            markDead()
-          } else {
-            tryRegisterAllMasters()
+          Utils.tryOrExit {
+            retries += 1
+            if (registered) {
+              registrationRetryTimer.foreach(_.cancel())
+            } else if (retries >= REGISTRATION_RETRIES) {
+              logError("All masters are unresponsive! Giving up.")
+              markDead()
+            } else {
+              tryRegisterAllMasters()
+            }
           }
         }
-      retryTimer // start timer
+      }
     }

     def changeMaster(url: String) {
@@ -174,6 +178,10 @@ private[spark] class Client(
         alreadyDead = true
       }
     }
+
+    override def postStop() {
+      registrationRetryTimer.foreach(_.cancel())
+    }
   }

   def start() {
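The Client change above (and the analogous Worker change further down) stores the retry timer in an `Option[Cancellable]` field, wraps the scheduled block so exceptions on the scheduler thread are not silently dropped, and cancels the timer in `postStop()`. A minimal self-contained Akka sketch of that shape; the actor, message, interval, and retry limit are illustrative, not Spark's values:

```scala
import scala.concurrent.duration._

import akka.actor.{Actor, Cancellable}

// Only the structure (Option[Cancellable] field, guarded scheduled block,
// cancellation in postStop) mirrors the patch; everything else is a placeholder.
class RegistrationActor extends Actor {
  import context.dispatcher

  private var registrationRetryTimer: Option[Cancellable] = None
  private var registered = false
  private var retries = 0

  override def preStart(): Unit = {
    registrationRetryTimer = Some {
      context.system.scheduler.schedule(10.seconds, 10.seconds) {
        try {                                    // stand-in for Utils.tryOrExit
          retries += 1
          if (registered) registrationRetryTimer.foreach(_.cancel())
          else if (retries >= 3) context.stop(self)
          else register()
        } catch {
          case t: Throwable =>
            t.printStackTrace()                  // a real handler would kill the process
        }
      }
    }
  }

  override def postStop(): Unit = registrationRetryTimer.foreach(_.cancel())

  private def register(): Unit = { /* ask every known master to register us */ }

  override def receive: Receive = {
    case "registered" => registered = true
  }
}
```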
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 2b537c91b4c84..bab065b45ed19 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -85,6 +85,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act

   var leaderElectionAgent: ActorRef = _

+  private var recoveryCompletionTask: Cancellable = _
+
   // As a temporary workaround before better ways of configuring memory, we allow users to set
   // a flag that will perform round-robin scheduling across the nodes (spreading out each app
   // among all the nodes) instead of trying to consolidate each app onto a small # of nodes.
@@ -128,6 +130,10 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
   }

   override def postStop() {
+    // Prevent the CompleteRecovery message from reaching a restarted master
+    if (recoveryCompletionTask != null) {
+      recoveryCompletionTask.cancel()
+    }
     webUi.stop()
     masterMetricsSystem.stop()
     applicationMetricsSystem.stop()
@@ -147,10 +153,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act

       if (state == RecoveryState.RECOVERING) {
         beginRecovery(storedApps, storedWorkers)
-        context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis) { completeRecovery() }
+        recoveryCompletionTask = context.system.scheduler.scheduleOnce(WORKER_TIMEOUT millis, self,
+          CompleteRecovery)
       }
     }

+    case CompleteRecovery => completeRecovery()
+
     case RevokedLeadership => {
       logError("Leadership has been revoked -- master shutting down.")
       System.exit(0)
@@ -350,7 +359,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
    * Schedule the currently available resources among waiting apps. This method will be called
    * every time a new app joins or resource availability changes.
    */
-  def schedule() {
+  private def schedule() {
     if (state != RecoveryState.ALIVE) { return }
     // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
     // in the queue, then the second app, etc.
@@ -358,7 +367,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     // Try to spread out each app among all the nodes, until it has all its cores
     for (app <- waitingApps if app.coresLeft > 0) {
       val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
-          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
+        .filter(canUse(app, _)).sortBy(_.coresFree).reverse
       val numUsable = usableWorkers.length
       val assigned = new Array[Int](numUsable) // Number of cores to give on each node
       var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
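The Master change above swaps a closure-based `scheduleOnce` for a message sent to `self`, so the recovery-completion work runs on the actor's own thread when the message arrives, and the pending task can be cancelled from `postStop()`. A rough standalone sketch of the pattern; the actor name, delay, and recovery body are placeholders, and `CompleteRecovery` is redeclared locally only so the snippet compiles on its own:

```scala
import scala.concurrent.duration._

import akka.actor.{Actor, Cancellable}

case object CompleteRecovery

class RecoveringActor extends Actor {
  import context.dispatcher

  private var recoveryCompletionTask: Cancellable = _

  override def preStart(): Unit = {
    // Schedule a message rather than a closure: the work is done inside receive,
    // on the actor's mailbox thread, with normal actor supervision semantics.
    recoveryCompletionTask =
      context.system.scheduler.scheduleOnce(60.seconds, self, CompleteRecovery)
  }

  override def postStop(): Unit = {
    // Keep the message from reaching a restarted incarnation of this actor.
    if (recoveryCompletionTask != null) recoveryCompletionTask.cancel()
  }

  override def receive: Receive = {
    case CompleteRecovery => completeRecovery()
  }

  private def completeRecovery(): Unit = { /* finish the state transition */ }
}
```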
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 216d9d44ac619..6b4a284038689 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -85,6 +85,8 @@ private[spark] class Worker(
   val metricsSystem = MetricsSystem.createMetricsSystem("worker")
   val workerSource = new WorkerSource(this)

+  var registrationRetryTimer: Option[Cancellable] = None
+
   def coresFree: Int = cores - coresUsed
   def memoryFree: Int = memory - memoryUsed
@@ -144,21 +146,22 @@ private[spark] class Worker(
   def registerWithMaster() {
     tryRegisterAllMasters()
-
     var retries = 0
-    lazy val retryTimer: Cancellable =
+    registrationRetryTimer = Some {
       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-        retries += 1
-        if (registered) {
-          retryTimer.cancel()
-        } else if (retries >= REGISTRATION_RETRIES) {
-          logError("All masters are unresponsive! Giving up.")
-          System.exit(1)
-        } else {
-          tryRegisterAllMasters()
+        Utils.tryOrExit {
+          retries += 1
+          if (registered) {
+            registrationRetryTimer.foreach(_.cancel())
+          } else if (retries >= REGISTRATION_RETRIES) {
+            logError("All masters are unresponsive! Giving up.")
+            System.exit(1)
+          } else {
+            tryRegisterAllMasters()
+          }
         }
       }
-    retryTimer // start timer
+    }
   }

   override def receive = {
@@ -260,6 +263,7 @@ private[spark] class Worker(
   }

   override def postStop() {
+    registrationRetryTimer.foreach(_.cancel())
     executors.values.foreach(_.kill())
     webUi.stop()
     metricsSystem.stop()
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 893e05837040c..dbe8985915236 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -79,28 +79,7 @@ private[spark] class Executor(
     // Setup an uncaught exception handler for non-local mode.
     // Make any thread terminations due to uncaught exceptions kill the entire
     // executor process to avoid surprising stalls.
-    Thread.setDefaultUncaughtExceptionHandler(
-      new Thread.UncaughtExceptionHandler {
-        override def uncaughtException(thread: Thread, exception: Throwable) {
-          try {
-            logError("Uncaught exception in thread " + thread, exception)
-
-            // We may have been called from a shutdown hook. If so, we must not call System.exit().
-            // (If we do, we will deadlock.)
-            if (!Utils.inShutdown()) {
-              if (exception.isInstanceOf[OutOfMemoryError]) {
-                System.exit(ExecutorExitCode.OOM)
-              } else {
-                System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
-              }
-            }
-          } catch {
-            case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-            case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
-          }
-        }
-      }
-    )
+    Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
   }

   val executorSource = new ExecutorSource(this, executorId)
@@ -258,6 +237,11 @@ private[spark] class Executor(
         }

         case t: Throwable => {
+          // Attempt to exit cleanly by informing the driver of our failure.
+          // If anything goes wrong (or this was a fatal exception), we will delegate to
+          // the default uncaught exception handler, which will terminate the Executor.
+          logError("Exception in task ID " + taskId, t)
+
           val serviceTime = (System.currentTimeMillis() - taskStart).toInt
           val metrics = attemptedTask.flatMap(t => t.metrics)
           for (m <- metrics) {
@@ -267,11 +251,11 @@ private[spark] class Executor(
           val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

-          // TODO: Should we exit the whole executor here? On the one hand, the failed task may
-          // have left some weird state around depending on when the exception was thrown, but on
-          // the other hand, maybe we could detect that when future tasks fail and exit then.
-          logError("Exception in task ID " + taskId, t)
-          //System.exit(1)
+          // Don't forcibly exit unless the exception was inherently fatal, to avoid
+          // stopping other tasks unnecessarily.
+          if (Utils.isFatalError(t)) {
+            ExecutorUncaughtExceptionHandler.uncaughtException(t)
+          }
         }
       } finally {
         runningTasks.remove(taskId)
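The Executor hunks above change the per-task catch block to always report the failure to the driver and then escalate only inherently fatal errors to the process-killing handler. A simplified, self-contained sketch of that decision, using plain `NonFatal` (a slight simplification of `Utils.isFatalError`); the `reportFailure` callback and the `halt` call are stand-ins for the `ExecutorBackend` status update and `ExecutorUncaughtExceptionHandler`, and the exit code is arbitrary:

```scala
import scala.util.control.NonFatal

object TaskFailureHandling {
  def runTask(taskId: Long, reportFailure: Throwable => Unit)(body: => Unit): Unit = {
    try {
      body
    } catch {
      case t: Throwable =>
        // Always tell the driver why this task died, even for fatal errors.
        reportFailure(t)
        // Only inherently fatal errors (OutOfMemoryError, LinkageError, ...) take the
        // whole executor down; ordinary task exceptions leave other tasks running.
        if (!NonFatal(t)) {
          Runtime.getRuntime.halt(1)
        }
    }
  }
}
```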
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
new file mode 100644
index 0000000000000..b0e984c03964c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * The default uncaught exception handler for Executors terminates the whole process, to avoid
+ * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
+ * to fail fast when things go wrong.
+ */
+private[spark] object ExecutorUncaughtExceptionHandler
+  extends Thread.UncaughtExceptionHandler with Logging {
+
+  override def uncaughtException(thread: Thread, exception: Throwable) {
+    try {
+      logError("Uncaught exception in thread " + thread, exception)
+
+      // We may have been called from a shutdown hook. If so, we must not call System.exit().
+      // (If we do, we will deadlock.)
+      if (!Utils.inShutdown()) {
+        if (exception.isInstanceOf[OutOfMemoryError]) {
+          System.exit(ExecutorExitCode.OOM)
+        } else {
+          System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+        }
+      }
+    } catch {
+      case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+      case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+    }
+  }
+
+  def uncaughtException(exception: Throwable) {
+    uncaughtException(Thread.currentThread(), exception)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index a34c95b6f07b6..dd78931a79e24 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -128,7 +128,7 @@ private[spark] class BlockManager(
     BlockManagerWorker.startBlockManagerWorker(this)
     if (!BlockManager.getDisableHeartBeatsForTesting) {
       heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
-        heartBeat()
+        Utils.tryOrExit { heartBeat() }
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index fcd2e97982955..cadb3a436bb03 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -125,14 +125,14 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   private def addShutdownHook() {
     localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
         logDebug("Shutdown hook called")
         localDirs.foreach { localDir =>
           try {
             if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
           } catch {
-            case t: Throwable =>
-              logError("Exception while deleting local spark dir: " + localDir, t)
+            case e: Exception =>
+              logError("Exception while deleting local spark dir: " + localDir, e)
           }
         }
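`Utils.logUncaughtExceptions`, used in the DiskBlockManager shutdown hook above, logs and re-throws whatever escapes the wrapped block, since exceptions thrown from plain threads and shutdown hooks otherwise vanish without a trace. A tiny standalone version of the idiom, with stderr standing in for Spark's logger and a hypothetical cleanup hook as the caller:

```scala
object LoggedShutdownHookExample {
  def logUncaughtExceptions[T](f: => T): T =
    try {
      f
    } catch {
      case t: Throwable =>
        System.err.println(
          "Uncaught exception in thread " + Thread.currentThread().getName + ": " + t)
        throw t
    }

  def main(args: Array[String]): Unit = {
    Runtime.getRuntime.addShutdownHook(new Thread("cleanup") {
      // Without the wrapper, an exception thrown here would be silently dropped.
      override def run(): Unit = logUncaughtExceptions {
        // delete temporary directories, close resources, ...
      }
    })
  }
}
```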
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index fe932d8ede2f3..a24d5ca0b8caf 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -19,6 +19,7 @@ package org.apache.spark.util

 import java.io._
 import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address}
+import java.nio.ByteBuffer
 import java.util.{Locale, Random, UUID}
 import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor}

@@ -26,17 +27,17 @@ import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.JavaConversions._
 import scala.io.Source
+import scala.util.control.{ControlThrowable, NonFatal}

 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}

-import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
-import org.apache.spark.deploy.SparkHadoopUtil
-import java.nio.ByteBuffer
 import org.apache.spark.{SparkException, Logging}
-
+import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
+import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}

 /**
  * Various utility methods used by Spark.
@@ -634,6 +635,18 @@ private[spark] object Utils extends Logging {
     output.toString
   }

+  /**
+   * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
+   * default UncaughtExceptionHandler
+   */
+  def tryOrExit(block: => Unit) {
+    try {
+      block
+    } catch {
+      case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
+    }
+  }
+
   /**
    * A regular expression to match classes of the "core" Spark API that we want to skip when
    * finding the call site of a method.
@@ -823,4 +836,29 @@ private[spark] object Utils extends Logging {
     return System.getProperties().clone()
       .asInstanceOf[java.util.Properties].toMap[String, String]
   }
+
+  /**
+   * Executes the given block, printing and re-throwing any uncaught exceptions.
+   * This is particularly useful for wrapping code that runs in a thread, to ensure
+   * that exceptions are printed, and to avoid having to catch Throwable.
+   */
+  def logUncaughtExceptions[T](f: => T): T = {
+    try {
+      f
+    } catch {
+      case t: Throwable =>
+        logError("Uncaught exception in thread " + Thread.currentThread().getName, t)
+        throw t
+    }
+  }
+
+  /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */
+  def isFatalError(e: Throwable): Boolean = {
+    e match {
+      case NonFatal(_) | _: InterruptedException | _: ControlThrowable =>
+        false
+      case _ =>
+        true
+    }
+  }
 }
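For reference, the `isFatalError` helper added above leans on `scala.util.control.NonFatal`; a quick demonstration of how that classification plays out for a few representative throwables (the helper body is copied here only to keep the example self-contained, and the inputs are chosen purely for illustration):

```scala
import scala.util.control.{ControlThrowable, NonFatal}

object FatalErrorCheck extends App {
  def isFatalError(e: Throwable): Boolean = e match {
    case NonFatal(_) | _: InterruptedException | _: ControlThrowable => false
    case _ => true
  }

  println(isFatalError(new RuntimeException("task failed")))  // false: ordinary failure
  println(isFatalError(new InterruptedException()))           // false: treated as cancellation
  println(isFatalError(new OutOfMemoryError()))               // true: the executor should die
}
```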