From 8e9f479edd5cbacbd6622d61e614b36a2ce2b86b Mon Sep 17 00:00:00 2001
From: Anant
Date: Mon, 16 Jun 2014 23:42:27 -0700
Subject: [PATCH 1/7] SPARK-1990: added compatibility for python 2.6 for ssh_read command

https://issues.apache.org/jira/browse/SPARK-1990

There were some posts on the lists that spark-ec2 does not work with Python 2.6.
In addition, we should check the Python version at the top of the script and
exit if it's too old.

Author: Anant

Closes #941 from anantasty/SPARK-1990 and squashes the following commits:

4ca441d [Anant] Implemented check_output within the module to work with python 2.6
c6ed85c [Anant] added compatibility for python 2.6 for ssh_read command

Conflicts:
	ec2/spark_ec2.py
---
 ec2/spark_ec2.py | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index cfc61b20100c6..8f6a69cab1ba8 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -629,9 +629,23 @@ def ssh(host, opts, command):
             time.sleep(30)
             tries = tries + 1
 
 
+# Backported from Python 2.7 for compatibility with 2.6 (See SPARK-1990)
+def _check_output(*popenargs, **kwargs):
+    if 'stdout' in kwargs:
+        raise ValueError('stdout argument not allowed, it will be overridden.')
+    process = subprocess.Popen(stdout=PIPE, *popenargs, **kwargs)
+    output, unused_err = process.communicate()
+    retcode = process.poll()
+    if retcode:
+        cmd = kwargs.get("args")
+        if cmd is None:
+            cmd = popenargs[0]
+        raise subprocess.CalledProcessError(retcode, cmd, output=output)
+    return output
+
+
 def ssh_read(host, opts, command):
-    return subprocess.check_output(
+    return _check_output(
         ssh_command(opts) + ['%s@%s' % (opts.user, host), stringify_command(command)])
 
 

From 2a2eacef4b469beb7d499c29baff9d6f52d0e25e Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Tue, 17 Jun 2014 15:09:24 -0700
Subject: [PATCH 2/7] HOTFIX: bug caused by #941

This patch should have qualified the use of PIPE. This needs to be backported
into 0.9 and 1.0.

Author: Patrick Wendell

Closes #1108 from pwendell/hotfix and squashes the following commits:

711c58d [Patrick Wendell] HOTFIX: bug caused by #941

(cherry picked from commit b2ebf429e24566c29850c570f8d76943151ad78c)
Signed-off-by: Xiangrui Meng
---
 ec2/spark_ec2.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 8f6a69cab1ba8..6d06a5db53625 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -633,7 +633,7 @@ def ssh(host, opts, command):
 def _check_output(*popenargs, **kwargs):
     if 'stdout' in kwargs:
         raise ValueError('stdout argument not allowed, it will be overridden.')
-    process = subprocess.Popen(stdout=PIPE, *popenargs, **kwargs)
+    process = subprocess.Popen(stdout=subprocess.PIPE, *popenargs, **kwargs)
     output, unused_err = process.communicate()
     retcode = process.poll()
     if retcode:
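The contract being backported (and then hotfixed) in the two patches above is simple: run a command, capture its stdout, and raise if the exit status is non-zero. For readers following the Scala side of the codebase, a minimal sketch of the same contract using scala.sys.process; the object name and the echo command are illustrative and not part of the patch:

    import scala.sys.process._

    object CheckOutputSketch {
      // Run a command and return its stdout as a String.
      // Like subprocess.check_output, `!!` throws if the exit status is non-zero.
      def checkOutput(cmd: Seq[String]): String = cmd.!!

      def main(args: Array[String]): Unit = {
        // "echo" stands in for the real ssh invocation built by ssh_command(opts).
        println(checkOutput(Seq("echo", "spark-ec2 says hello")).trim)
      }
    }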
From ef8501d33eaef15a793475ba34af5ac83e9e9d7c Mon Sep 17 00:00:00 2001
From: Ori Kremer
Date: Sun, 22 Jun 2014 20:21:23 -0700
Subject: [PATCH 3/7] SPARK-2241: quote command line args in ec2 script

To preserve quoted command line args (in case options have spaces in them).

Author: Ori Kremer

Closes #1169 from orikremer/quote_cmd_line_args and squashes the following commits:

67e2aa1 [Ori Kremer] quote command line args

(cherry picked from commit 9fc373e3a9a8ba7bea9df0950775f48918f63a8a)
Signed-off-by: Patrick Wendell
---
 ec2/spark-ec2 | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ec2/spark-ec2 b/ec2/spark-ec2
index 454057aa0d279..31f9771223e51 100755
--- a/ec2/spark-ec2
+++ b/ec2/spark-ec2
@@ -19,4 +19,4 @@
 #
 
 cd "`dirname $0`"
-PYTHONPATH="./third_party/boto-2.4.1.zip/boto-2.4.1:$PYTHONPATH" python ./spark_ec2.py $@
+PYTHONPATH="./third_party/boto-2.4.1.zip/boto-2.4.1:$PYTHONPATH" python ./spark_ec2.py "$@"


From 9509819717bf2c23e2a5bac6303dd4770bc2dd71 Mon Sep 17 00:00:00 2001
From: "Wenchen Fan(Cloud)"
Date: Tue, 3 Jun 2014 13:18:20 -0700
Subject: [PATCH 4/7] [SPARK-1912] fix compress memory issue during reduce

When we need to read a compressed block, we will first create a compression
stream instance (LZF or Snappy) and use it to wrap that block. Let's say a
reducer task needs to read 1000 local shuffle blocks: it will first prepare to
read those 1000 blocks, which means creating 1000 compression stream instances
to wrap them. But the initialization of a compression instance allocates some
memory, and having many compression instances alive at the same time is a
problem. In reality the reducer reads the shuffle blocks one by one, so we can
do the compression instance initialization lazily.

Author: Wenchen Fan(Cloud)

Closes #860 from cloud-fan/fix-compress and squashes the following commits:

0924a6b [Wenchen Fan(Cloud)] rename 'doWork' into 'getIterator'
07f32c2 [Wenchen Fan(Cloud)] move the LazyProxyIterator to dataDeserialize
d80c426 [Wenchen Fan(Cloud)] remove empty lines in short class
2c8adb2 [Wenchen Fan(Cloud)] add inline comment
8ebff77 [Wenchen Fan(Cloud)] fix compress memory issue during reduce
---
 .../apache/spark/storage/BlockManager.scala   | 22 +++++++++++++++++--
 1 file changed, 20 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 41cca8dffcee7..261566077327d 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -842,8 +842,26 @@ private[spark] class BlockManager(
       bytes: ByteBuffer,
       serializer: Serializer = defaultSerializer): Iterator[Any] = {
     bytes.rewind()
-    val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
-    serializer.newInstance().deserializeStream(stream).asIterator
+
+    def getIterator = {
+      val stream = wrapForCompression(blockId, new ByteBufferInputStream(bytes, true))
+      serializer.newInstance().deserializeStream(stream).asIterator
+    }
+
+    if (blockId.isShuffle) {
+      // Reducer may need to read many local shuffle blocks and will wrap them into Iterators
+      // at the beginning. The wrapping will cost some memory (compression instance
+      // initialization, etc.). The reducer reads shuffle blocks one by one, so we can do the
+      // wrapping lazily to save memory.
+      class LazyProxyIterator(f: => Iterator[Any]) extends Iterator[Any] {
+        lazy val proxy = f
+        override def hasNext: Boolean = proxy.hasNext
+        override def next(): Any = proxy.next()
+      }
+      new LazyProxyIterator(getIterator)
+    } else {
+      getIterator
+    }
   }
 
   def stop() {
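The LazyProxyIterator introduced above is small enough to demonstrate in isolation. A self-contained sketch, with a println standing in for the cost of creating a compression stream, showing that the expensive setup happens only when a block is actually read; the class and method names here are illustrative, not Spark's:

    object LazyIteratorDemo {
      // Defers evaluation of `f` until the iterator is actually used,
      // mirroring the LazyProxyIterator added to BlockManager.dataDeserialize.
      class LazyProxyIterator[A](f: => Iterator[A]) extends Iterator[A] {
        private lazy val proxy = f
        override def hasNext: Boolean = proxy.hasNext
        override def next(): A = proxy.next()
      }

      def expensiveIterator(id: Int): Iterator[Int] = {
        println(s"allocating decompression buffers for block $id")  // stand-in for stream setup
        Iterator(id, id + 1, id + 2)
      }

      def main(args: Array[String]): Unit = {
        // 1000 blocks are "wrapped" up front, but nothing has been allocated yet...
        val wrapped = (1 to 1000).map(i => new LazyProxyIterator(expensiveIterator(i)))
        println("wrapped 1000 blocks, no allocation message printed so far")
        // ...the setup cost is paid only as each block is actually consumed.
        println(wrapped.head.toList)
      }
    }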
From 184093ac11b0e78d723d610f8af40b98649f47a3 Mon Sep 17 00:00:00 2001
From: Mark Hamstra
Date: Wed, 14 May 2014 10:07:25 -0700
Subject: [PATCH 5/7] [SPARK-1620] Handle uncaught exceptions in function run by Akka scheduler

If the intended behavior was that uncaught exceptions thrown in functions being
run by the Akka scheduler would end up being handled by the default uncaught
exception handler set in Executor, and if that behavior is, in fact, correct,
then this is a way to accomplish that. I'm not certain, though, that we
shouldn't be doing something different to handle uncaught exceptions from some
of these scheduled functions.

In any event, this PR covers all of the cases I comment on in
[SPARK-1620](https://issues.apache.org/jira/browse/SPARK-1620).

Author: Mark Hamstra

Closes #622 from markhamstra/SPARK-1620 and squashes the following commits:

071d193 [Mark Hamstra] refactored post-SPARK-1772
1a6a35e [Mark Hamstra] another style fix
d30eb94 [Mark Hamstra] scalastyle
3573ecd [Mark Hamstra] Use wrapped try/catch in Utils.tryOrExit
8fc0439 [Mark Hamstra] Make functions run by the Akka scheduler use Executor's UncaughtExceptionHandler

(cherry picked from commit 17f3075bc4aa8cbed165f7b367f70e84b1bc8db9)
Signed-off-by: Patrick Wendell

Conflicts:
	core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
	core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
	core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
	core/src/main/scala/org/apache/spark/util/Utils.scala

Conflicts:
	core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
	core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
	core/src/main/scala/org/apache/spark/util/Utils.scala
---
 .../spark/deploy/client/AppClient.scala       | 20 +++++++++-------
 .../apache/spark/deploy/worker/Worker.scala   | 18 +++++++-------
 .../apache/spark/storage/BlockManager.scala   |  2 +-
 .../scala/org/apache/spark/util/Utils.scala   | 24 +++++++++++++------
 4 files changed, 40 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 7e98c7f8ff99b..ed03f45a46c47 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -31,6 +31,8 @@ import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
 import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.Utils
+
 
 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -88,14 +90,16 @@ private[spark] class AppClient(
       var retries = 0
       registrationRetryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-          retries += 1
-          if (registered) {
-            registrationRetryTimer.foreach(_.cancel())
-          } else if (retries >= REGISTRATION_RETRIES) {
-            logError("All masters are unresponsive! Giving up.")
-            markDead()
-          } else {
-            tryRegisterAllMasters()
+          Utils.tryOrExit {
+            retries += 1
+            if (registered) {
+              registrationRetryTimer.foreach(_.cancel())
+            } else if (retries >= REGISTRATION_RETRIES) {
+              logError("All masters are unresponsive! Giving up.")
+              markDead()
+            } else {
+              tryRegisterAllMasters()
+            }
           }
         }
       }
Giving up.") - markDead() - } else { - tryRegisterAllMasters() + Utils.tryOrExit { + retries += 1 + if (registered) { + registrationRetryTimer.foreach(_.cancel()) + } else if (retries >= REGISTRATION_RETRIES) { + logError("All masters are unresponsive! Giving up.") + markDead() + } else { + tryRegisterAllMasters() + } } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index bc7dd7dc8a199..4b41ded80abda 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -163,14 +163,16 @@ private[spark] class Worker( var retries = 0 registrationRetryTimer = Some { context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) { - retries += 1 - if (registered) { - registrationRetryTimer.foreach(_.cancel()) - } else if (retries >= REGISTRATION_RETRIES) { - logError("All masters are unresponsive! Giving up.") - System.exit(1) - } else { - tryRegisterAllMasters() + Utils.tryOrExit { + retries += 1 + if (registered) { + registrationRetryTimer.foreach(_.cancel()) + } else if (retries >= REGISTRATION_RETRIES) { + logError("All masters are unresponsive! Giving up.") + System.exit(1) + } else { + tryRegisterAllMasters() + } } } } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index 261566077327d..b4ce5848ae8a9 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -133,7 +133,7 @@ private[spark] class BlockManager( BlockManagerWorker.startBlockManagerWorker(this) if (!BlockManager.getDisableHeartBeatsForTesting(conf)) { heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) { - heartBeat() + Utils.tryOrExit { heartBeat() } } } } diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index ec6c1ed857a05..47ebdebf31fdd 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -19,6 +19,7 @@ package org.apache.spark.util import java.io._ import java.net.{InetAddress, URL, URI, NetworkInterface, Inet4Address} +import java.nio.ByteBuffer import java.util.{Locale, Random, UUID} import java.util.concurrent.{ConcurrentHashMap, Executors, ThreadPoolExecutor} @@ -31,14 +32,11 @@ import scala.reflect.ClassTag import com.google.common.io.Files import com.google.common.util.concurrent.ThreadFactoryBuilder -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{Path, FileSystem, FileUtil} -import org.apache.hadoop.io._ - -import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} +import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} +import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException} import org.apache.spark.deploy.SparkHadoopUtil -import java.nio.ByteBuffer -import org.apache.spark.{SparkConf, SparkException, Logging} +import org.apache.spark.executor.ExecutorUncaughtExceptionHandler +import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance} /** @@ -621,6 +619,18 @@ private[spark] object Utils extends Logging { output.toString } + /** + * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the + * 
From 5690cf080dd66b860a2a7d2d98c7393b2ed3860d Mon Sep 17 00:00:00 2001
From: Aaron Davidson
Date: Mon, 12 May 2014 11:08:52 -0700
Subject: [PATCH 6/7] SPARK-1772 Stop catching Throwable, let Executors die

The main issue this patch fixes is [SPARK-1772](https://issues.apache.org/jira/browse/SPARK-1772),
in which Executors may not die when fatal exceptions (e.g., OOM) are thrown. This patch causes
Executors to delegate to the ExecutorUncaughtExceptionHandler when a fatal exception is thrown.

This patch also continues the fight in the neverending war against `case t: Throwable =>`, by only
catching Exceptions in many places, and adding a wrapper for Threads and Runnables to make sure
any uncaught exceptions are at least printed to the logs.

It also turns out that it is unlikely that the IndestructibleActorSystem actually works, given
testing ([here](https://gist.github.com/aarondav/ca1f0cdcd50727f89c0d)). The
uncaughtExceptionHandler is not called from the places that we expected it would be.
[SPARK-1620](https://issues.apache.org/jira/browse/SPARK-1620) deals with part of this issue, but
refactoring our Actor Systems to ensure that exceptions are dealt with properly is a much bigger
change, outside the scope of this PR.

Author: Aaron Davidson

Closes #715 from aarondav/throwable and squashes the following commits:

f9b9bfe [Aaron Davidson] Remove other redundant 'throw e'
e937a0a [Aaron Davidson] Address Prashant and Matei's comments
1867867 [Aaron Davidson] [RFC] SPARK-1772 Stop catching Throwable, let Executors die

(cherry picked from commit 3af1f386439cdddd42e545ad63d089f4dfdf9f8a)
Signed-off-by: Patrick Wendell

Conflicts:
	core/src/main/scala/org/apache/spark/ContextCleaner.scala
	core/src/main/scala/org/apache/spark/SparkContext.scala
	core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
	core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
	core/src/main/scala/org/apache/spark/deploy/Client.scala
	core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
	core/src/main/scala/org/apache/spark/deploy/master/Master.scala
	core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
	core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
	core/src/main/scala/org/apache/spark/executor/Executor.scala
	core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
	core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
	core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
	core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
	core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
	core/src/main/scala/org/apache/spark/util/Utils.scala

Conflicts:
	core/src/main/scala/org/apache/spark/SparkContext.scala
	core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
	core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
	core/src/main/scala/org/apache/spark/util/Utils.scala
---
 .../apache/spark/api/python/PythonRDD.scala   |  1 -
 .../api/python/PythonWorkerFactory.scala      |  4 +-
 .../apache/spark/deploy/SparkHadoopUtil.scala |  2 +-
 .../org/apache/spark/executor/Executor.scala  | 38 ++++---------
 .../ExecutorUncaughtExceptionHandler.scala    | 53 +++++++++++++++++++
 .../spark/storage/DiskBlockManager.scala      |  6 +--
 .../scala/org/apache/spark/util/Utils.scala   | 26 ++++++++-
 7 files changed, 94 insertions(+), 36 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index e03c6f9c184c9..a7790ce03d619 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -218,7 +218,6 @@ private[spark] object PythonRDD {
       }
     } catch {
       case eof: EOFException => {}
-      case e: Throwable => throw e
     }
     JavaRDD.fromRDD(sc.sc.parallelize(objs, parallelism))
   }

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index f291266fcf17c..f8c6312bad884 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -58,13 +58,11 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
       try {
         new Socket(daemonHost, daemonPort)
       } catch {
-        case exc: SocketException => {
+        case exc: SocketException =>
           logWarning("Python daemon unexpectedly quit, attempting to restart")
           stopDaemon()
           startDaemon()
           new Socket(daemonHost, daemonPort)
-        }
-        case e: Throwable => throw e
       }
     }
   }

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
index f31dd4eba87c0..3086206ce79ea 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -89,7 +89,7 @@ object SparkHadoopUtil {
           .newInstance()
           .asInstanceOf[SparkHadoopUtil]
       } catch {
-        case th: Throwable => throw new SparkException("Unable to load YARN support", th)
+        case e: Exception => throw new SparkException("Unable to load YARN support", e)
       }
     } else {
       new SparkHadoopUtil
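The `case t: Throwable` cleanups in the three files above share one rationale: catching Throwable also traps fatal JVM errors such as OutOfMemoryError, which should be allowed to propagate and kill the process. A small sketch of the distinction using scala.util.control.NonFatal; the method names and values are illustrative:

    import scala.util.control.NonFatal

    object CatchNarrowingDemo {
      // Broad catch: even an OutOfMemoryError would be swallowed here.
      def tooBroad(block: => Int): Int =
        try block catch { case t: Throwable => println(s"swallowed: $t"); -1 }

      // Narrow catch: ordinary failures are handled, fatal errors propagate.
      def narrow(block: => Int): Int =
        try block catch { case NonFatal(e) => println(s"handled: $e"); -1 }

      def main(args: Array[String]): Unit = {
        println(narrow { "not a number".toInt })           // handled: NumberFormatException
        println(tooBroad { throw new OutOfMemoryError() }) // swallowed, the behavior these patches remove
      }
    }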
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 07411bfa1c172..356c04fed8a2a 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -78,28 +78,7 @@ private[spark] class Executor(
     // Setup an uncaught exception handler for non-local mode.
     // Make any thread terminations due to uncaught exceptions kill the entire
     // executor process to avoid surprising stalls.
-    Thread.setDefaultUncaughtExceptionHandler(
-      new Thread.UncaughtExceptionHandler {
-        override def uncaughtException(thread: Thread, exception: Throwable) {
-          try {
-            logError("Uncaught exception in thread " + thread, exception)
-
-            // We may have been called from a shutdown hook. If so, we must not call System.exit().
-            // (If we do, we will deadlock.)
-            if (!Utils.inShutdown()) {
-              if (exception.isInstanceOf[OutOfMemoryError]) {
-                System.exit(ExecutorExitCode.OOM)
-              } else {
-                System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
-              }
-            }
-          } catch {
-            case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-            case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
-          }
-        }
-      }
-    )
+    Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
   }
 
   val executorSource = new ExecutorSource(this, executorId)
@@ -257,6 +236,11 @@ private[spark] class Executor(
         }
 
         case t: Throwable => {
+          // Attempt to exit cleanly by informing the driver of our failure.
+          // If anything goes wrong (or this was a fatal exception), we will delegate to
+          // the default uncaught exception handler, which will terminate the Executor.
+          logError("Exception in task ID " + taskId, t)
+
           val serviceTime = (System.currentTimeMillis() - taskStart).toInt
           val metrics = attemptedTask.flatMap(t => t.metrics)
           for (m <- metrics) {
@@ -266,11 +250,11 @@ private[spark] class Executor(
           val reason = ExceptionFailure(t.getClass.getName, t.toString, t.getStackTrace, metrics)
           execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))
 
-          // TODO: Should we exit the whole executor here? On the one hand, the failed task may
-          // have left some weird state around depending on when the exception was thrown, but on
-          // the other hand, maybe we could detect that when future tasks fail and exit then.
-          logError("Exception in task ID " + taskId, t)
-          //System.exit(1)
+          // Don't forcibly exit unless the exception was inherently fatal, to avoid
+          // stopping other tasks unnecessarily.
+          if (Utils.isFatalError(t)) {
+            ExecutorUncaughtExceptionHandler.uncaughtException(t)
+          }
         }
       } finally {
         // TODO: Unregister shuffle memory only for ResultTask
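With the change above, Executor installs a single shared handler object instead of an inline anonymous class; the new file follows next. The pattern on its own, one process-wide fail-fast Thread.UncaughtExceptionHandler, looks roughly like this sketch; the exit code and messages are illustrative and not Spark's ExecutorExitCode values:

    object FailFastHandler extends Thread.UncaughtExceptionHandler {
      override def uncaughtException(thread: Thread, exception: Throwable): Unit = {
        // Log, then terminate the whole process rather than limp along in a bad state.
        System.err.println(s"Uncaught exception in thread ${thread.getName}: $exception")
        System.exit(50)  // illustrative exit code
      }
    }

    object FailFastDemo {
      def main(args: Array[String]): Unit = {
        Thread.setDefaultUncaughtExceptionHandler(FailFastHandler)
        val t = new Thread(new Runnable {
          def run(): Unit = throw new IllegalStateException("boom")  // reaches FailFastHandler
        })
        t.start()
        t.join()
      }
    }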
diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
new file mode 100644
index 0000000000000..b0e984c03964c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/executor/ExecutorUncaughtExceptionHandler.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.executor
+
+import org.apache.spark.Logging
+import org.apache.spark.util.Utils
+
+/**
+ * The default uncaught exception handler for Executors terminates the whole process, to avoid
+ * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
+ * to fail fast when things go wrong.
+ */
+private[spark] object ExecutorUncaughtExceptionHandler
+  extends Thread.UncaughtExceptionHandler with Logging {
+
+  override def uncaughtException(thread: Thread, exception: Throwable) {
+    try {
+      logError("Uncaught exception in thread " + thread, exception)
+
+      // We may have been called from a shutdown hook. If so, we must not call System.exit().
+      // (If we do, we will deadlock.)
+      if (!Utils.inShutdown()) {
+        if (exception.isInstanceOf[OutOfMemoryError]) {
+          System.exit(ExecutorExitCode.OOM)
+        } else {
+          System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+        }
+      }
+    } catch {
+      case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+      case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+    }
+  }
+
+  def uncaughtException(exception: Throwable) {
+    uncaughtException(Thread.currentThread(), exception)
+  }
+}

diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index f3e1c38744d78..e15de53b37117 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -134,14 +134,14 @@ private[spark] class DiskBlockManager(shuffleManager: ShuffleBlockManager, rootD
   private def addShutdownHook() {
     localDirs.foreach(localDir => Utils.registerShutdownDeleteDir(localDir))
     Runtime.getRuntime.addShutdownHook(new Thread("delete Spark local dirs") {
-      override def run() {
+      override def run(): Unit = Utils.logUncaughtExceptions {
         logDebug("Shutdown hook called")
         localDirs.foreach { localDir =>
           try {
             if (!Utils.hasRootAsShutdownDeleteDir(localDir)) Utils.deleteRecursively(localDir)
           } catch {
-            case t: Throwable =>
-              logError("Exception while deleting local spark dir: " + localDir, t)
+            case e: Exception =>
+              logError("Exception while deleting local spark dir: " + localDir, e)
           }
         }

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 47ebdebf31fdd..b5ecfc1fb52a5 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -28,6 +28,7 @@ import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 import scala.reflect.ClassTag
+import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -38,7 +39,6 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
-
 /**
  * Various utility methods used by Spark.
  */
@@ -843,4 +843,28 @@ private[spark] object Utils extends Logging {
     System.currentTimeMillis - start
   }
 
+  /**
+   * Executes the given block, printing and re-throwing any uncaught exceptions.
+   * This is particularly useful for wrapping code that runs in a thread, to ensure
+   * that exceptions are printed, and to avoid having to catch Throwable.
+   */
+  def logUncaughtExceptions[T](f: => T): T = {
+    try {
+      f
+    } catch {
+      case t: Throwable =>
+        logError(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
+        throw t
+    }
+  }
+
+  /** Returns true if the given exception was fatal. See docs for scala.util.control.NonFatal. */
+  def isFatalError(e: Throwable): Boolean = {
+    e match {
+      case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable =>
+        false
+      case _ =>
+        true
+    }
+  }
 }
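Utils.logUncaughtExceptions and Utils.isFatalError are the two small primitives added above: the first guarantees that code handed to another thread at least logs its failure before rethrowing, and the second decides whether a caught throwable is worth killing the process over. A usage sketch that mirrors both helpers; the shutdown hook and log lines are illustrative:

    import scala.util.control.{ControlThrowable, NonFatal}

    object LogUncaughtDemo {
      // Mirrors Utils.logUncaughtExceptions: never let a failure vanish silently.
      def logUncaughtExceptions[T](f: => T): T =
        try f catch {
          case t: Throwable =>
            System.err.println(s"Uncaught exception in thread ${Thread.currentThread().getName}: $t")
            throw t
        }

      // Mirrors Utils.isFatalError: NonFatal plus a few throwables Spark chooses to tolerate.
      def isFatalError(e: Throwable): Boolean = e match {
        case NonFatal(_) | _: InterruptedException | _: NotImplementedError | _: ControlThrowable => false
        case _ => true
      }

      def main(args: Array[String]): Unit = {
        Runtime.getRuntime.addShutdownHook(new Thread(new Runnable {
          def run(): Unit = logUncaughtExceptions { println("cleaning up temp dirs") }
        }))
        println(isFatalError(new OutOfMemoryError))       // true  -> worth killing the process
        println(isFatalError(new IllegalStateException))  // false -> handle and move on
      }
    }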
From 2ebad24cc0bb9c9ac87bc29c53b35fcf71487791 Mon Sep 17 00:00:00 2001
From: Mark Hamstra
Date: Tue, 8 Jul 2014 14:49:41 -0700
Subject: [PATCH 7/7] Fixed mismerge

---
 core/src/main/scala/org/apache/spark/util/Utils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b5ecfc1fb52a5..863e76aa8fd05 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -34,7 +34,7 @@ import com.google.common.io.Files
 import com.google.common.util.concurrent.ThreadFactoryBuilder
 
 import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
-import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
+import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}