From 8fc043925487ee388841d30be7804c29f1b95901 Mon Sep 17 00:00:00 2001
From: Mark Hamstra
Date: Fri, 2 May 2014 13:24:48 -0700
Subject: [PATCH 1/5] Make functions run by the Akka scheduler use Executor's
 UncaughtExceptionHandler

---
 .../spark/deploy/client/AppClient.scala       |  3 +-
 .../apache/spark/deploy/worker/Worker.scala   |  3 +-
 .../org/apache/spark/executor/Executor.scala  |  2 +-
 .../spark/scheduler/TaskSchedulerImpl.scala   |  2 ++
 .../apache/spark/storage/BlockManager.scala   |  1 +
 .../spark/util/UncaughtExceptionHandler.scala | 42 +++++++++++++++++++
 6 files changed, 50 insertions(+), 3 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 896913d796604..174ccfadc6052 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.AkkaUtils
+import org.apache.spark.util.{UncaughtExceptionHandler, AkkaUtils}
 
 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -88,6 +88,7 @@ private[spark] class AppClient(
       var retries = 0
       registrationRetryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
+          Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
           retries += 1
           if (registered) {
             registrationRetryTimer.foreach(_.cancel())
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 85d25dc7dbfa4..ee7a28d631b57 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{UncaughtExceptionHandler, AkkaUtils, Utils}
 
 /**
  * @param masterUrls Each url should look like spark://host:port.
@@ -166,6 +166,7 @@ private[spark] class Worker(
     var retries = 0
     registrationRetryTimer = Some {
       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
+        Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
         retries += 1
         if (registered) {
           registrationRetryTimer.foreach(_.cancel())
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index baee7a216a7c3..f07bdf990f8dc 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{UncaughtExceptionHandler, AkkaUtils, Utils}
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 5a68f38bc5844..e0c2e0de260c6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -31,6 +31,7 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.util.UncaughtExceptionHandler
 
 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -139,6 +140,7 @@ private[spark] class TaskSchedulerImpl(
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
             SPECULATION_INTERVAL milliseconds) {
+        Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
         checkSpeculatableTasks()
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 6d7d4f922e1fa..171a37fa1f87e 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -155,6 +155,7 @@ private[spark] class BlockManager(
     BlockManagerWorker.startBlockManagerWorker(this)
     if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
       heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
+        Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
         heartBeat()
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala
new file mode 100644
index 0000000000000..2eddfc6217008
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+import org.apache.spark.executor.ExecutorExitCode
+import org.apache.spark.Logging
+
+object UncaughtExceptionHandler extends Thread.UncaughtExceptionHandler with Logging {
+  override def uncaughtException(thread: Thread, exception: Throwable) {
+    try {
+      logError("Uncaught exception in thread " + thread, exception)
+
+      // We may have been called from a shutdown hook. If so, we must not call System.exit().
+      // (If we do, we will deadlock.)
+      if (!Utils.inShutdown()) {
+        if (exception.isInstanceOf[OutOfMemoryError]) {
+          System.exit(ExecutorExitCode.OOM)
+        } else {
+          System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+        }
+      }
+    } catch {
+      case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
+      case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+    }
+  }
+}

From 3573ecddf9de7d8ca55c4c96e5293285e7bde1d4 Mon Sep 17 00:00:00 2001
From: Mark Hamstra
Date: Tue, 6 May 2014 16:51:49 -0700
Subject: [PATCH 2/5] Use wrapped try/catch in Utils.tryOrExit

---
 .../spark/deploy/client/AppClient.scala       | 19 +++++++++--------
 .../apache/spark/deploy/worker/Worker.scala   | 21 ++++++++++---------
 .../spark/scheduler/TaskSchedulerImpl.scala   |  5 ++---
 .../apache/spark/storage/BlockManager.scala   |  3 +--
 .../spark/util/UncaughtExceptionHandler.scala |  2 +-
 .../scala/org/apache/spark/util/Utils.scala   | 12 +++++++++++
 6 files changed, 37 insertions(+), 25 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index 174ccfadc6052..d38e9e79204c2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -30,7 +30,7 @@ import org.apache.spark.{Logging, SparkConf, SparkException}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
-import org.apache.spark.util.{UncaughtExceptionHandler, AkkaUtils}
+import org.apache.spark.util.{Utils, AkkaUtils}
 
 /**
  * Interface allowing applications to speak with a Spark deploy cluster. Takes a master URL,
@@ -88,14 +88,15 @@ private[spark] class AppClient(
       var retries = 0
       registrationRetryTimer = Some {
         context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-          Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
-          retries += 1
-          if (registered) {
-            registrationRetryTimer.foreach(_.cancel())
-          } else if (retries >= REGISTRATION_RETRIES) {
-            markDead("All masters are unresponsive! Giving up.")
-          } else {
-            tryRegisterAllMasters()
+          Utils.tryOrExit {
+            retries += 1
+            if (registered) {
+              registrationRetryTimer.foreach(_.cancel())
+            } else if (retries >= REGISTRATION_RETRIES) {
+              markDead("All masters are unresponsive! Giving up.")
+            } else {
+              tryRegisterAllMasters()
+            }
           }
         }
       }
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index ee7a28d631b57..134624c35a57e 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -34,7 +34,7 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.{DriverState, Master}
 import org.apache.spark.deploy.worker.ui.WorkerWebUI
 import org.apache.spark.metrics.MetricsSystem
-import org.apache.spark.util.{UncaughtExceptionHandler, AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * @param masterUrls Each url should look like spark://host:port.
@@ -166,15 +166,16 @@ private[spark] class Worker(
     var retries = 0
     registrationRetryTimer = Some {
       context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
-        Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
-        retries += 1
-        if (registered) {
-          registrationRetryTimer.foreach(_.cancel())
-        } else if (retries >= REGISTRATION_RETRIES) {
-          logError("All masters are unresponsive! Giving up.")
-          System.exit(1)
-        } else {
-          tryRegisterAllMasters()
+        Utils.tryOrExit {
+          retries += 1
+          if (registered) {
+            registrationRetryTimer.foreach(_.cancel())
+          } else if (retries >= REGISTRATION_RETRIES) {
+            logError("All masters are unresponsive! Giving up.")
+            System.exit(1)
+          } else {
+            tryRegisterAllMasters()
+          }
         }
       }
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index e0c2e0de260c6..bbb4fbf60b3e6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -31,7 +31,7 @@ import scala.util.Random
 import org.apache.spark._
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
-import org.apache.spark.util.UncaughtExceptionHandler
+import org.apache.spark.util.Utils
 
 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -140,8 +140,7 @@ private[spark] class TaskSchedulerImpl(
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
             SPECULATION_INTERVAL milliseconds) {
-        Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
-        checkSpeculatableTasks()
+        Utils.tryOrExit { checkSpeculatableTasks() }
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 171a37fa1f87e..6534095811907 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -155,8 +155,7 @@ private[spark] class BlockManager(
     BlockManagerWorker.startBlockManagerWorker(this)
     if (!BlockManager.getDisableHeartBeatsForTesting(conf)) {
       heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
-        Thread.currentThread.setUncaughtExceptionHandler(UncaughtExceptionHandler)
-        heartBeat()
+        Utils.tryOrExit { heartBeat() }
       }
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala
index 2eddfc6217008..c566c0148fd31 100644
--- a/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala
+++ b/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala
@@ -20,7 +20,7 @@ package org.apache.spark.util
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.Logging
 
-object UncaughtExceptionHandler extends Thread.UncaughtExceptionHandler with Logging {
+private[spark] object UncaughtExceptionHandler extends Thread.UncaughtExceptionHandler with Logging {
   override def uncaughtException(thread: Thread, exception: Throwable) {
     try {
       logError("Uncaught exception in thread " + thread, exception)
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8f7594ada2ba1..8e50f4eb6aecc 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -778,6 +778,18 @@ private[spark] object Utils extends Logging {
     output.toString
   }
 
+  /**
+   * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
+   * default UncaughtExceptionHandler
+   */
+  def tryOrExit(block: => Unit) {
+    try {
+      block
+    } catch {
+      case t: Throwable => UncaughtExceptionHandler.uncaughtException(Thread.currentThread, t)
+    }
+  }
+
   /**
    * A regular expression to match classes of the "core" Spark API that we want to skip when
    * finding the call site of a method.
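
A note on the shape of PATCH 2: PATCH 1 set the handler by calling Thread.currentThread.setUncaughtExceptionHandler inside each scheduled closure, which re-registers a handler on whichever Akka dispatcher thread happens to run the task and still relies on the exception propagating all the way out of that thread. Utils.tryOrExit instead catches whatever the block throws and hands it straight to the shared handler. Below is a minimal, self-contained Scala sketch of the same pattern, for illustration only: Handler and the JDK ScheduledExecutorService stand in for Spark's UncaughtExceptionHandler and Akka's scheduler, and the exit code is made up.

import java.util.concurrent.{Executors, TimeUnit}

object TryOrExitSketch {
  // Stand-in for org.apache.spark.util.UncaughtExceptionHandler above.
  object Handler extends Thread.UncaughtExceptionHandler {
    override def uncaughtException(thread: Thread, exception: Throwable): Unit = {
      System.err.println(s"Uncaught exception in thread $thread: $exception")
      sys.exit(50) // illustrative; the real handler picks an ExecutorExitCode
    }
  }

  // Same shape as Utils.tryOrExit: run the block and route any Throwable to
  // the shared handler instead of letting the pool thread swallow it.
  def tryOrExit(block: => Unit): Unit = {
    try {
      block
    } catch {
      case t: Throwable => Handler.uncaughtException(Thread.currentThread, t)
    }
  }

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    // Analogous to wrapping heartBeat() or checkSpeculatableTasks():
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = tryOrExit { throw new RuntimeException("boom") }
    }, 0, 1, TimeUnit.SECONDS)
  }
}

The key point is that the wrapper runs on the task's own thread, so Thread.currentThread inside the catch still names the thread that failed.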
From d30eb9465ba29dce1927277949feafa8e2b5e821 Mon Sep 17 00:00:00 2001
From: Mark Hamstra
Date: Tue, 6 May 2014 17:01:58 -0700
Subject: [PATCH 3/5] scalastyle

---
 .../scala/org/apache/spark/util/UncaughtExceptionHandler.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala
index c566c0148fd31..30491dfc48d7b 100644
--- a/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala
+++ b/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala
@@ -20,7 +20,8 @@ package org.apache.spark.util
 import org.apache.spark.executor.ExecutorExitCode
 import org.apache.spark.Logging
 
-private[spark] object UncaughtExceptionHandler extends Thread.UncaughtExceptionHandler with Logging {
+private[spark] object UncaughtExceptionHandler
+  extends Thread.UncaughtExceptionHandler with Logging {
   override def uncaughtException(thread: Thread, exception: Throwable) {
     try {
       logError("Uncaught exception in thread " + thread, exception)

From 1a6a35e9d29f301a36aea4a80396acbdd6960c35 Mon Sep 17 00:00:00 2001
From: Mark Hamstra
Date: Tue, 6 May 2014 17:04:30 -0700
Subject: [PATCH 4/5] another style fix

---
 .../scala/org/apache/spark/util/UncaughtExceptionHandler.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala
index 30491dfc48d7b..987f912f90d01 100644
--- a/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala
+++ b/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala
@@ -22,6 +22,7 @@ import org.apache.spark.Logging
 
 private[spark] object UncaughtExceptionHandler
   extends Thread.UncaughtExceptionHandler with Logging {
+
   override def uncaughtException(thread: Thread, exception: Throwable) {
     try {
       logError("Uncaught exception in thread " + thread, exception)

From 071d193e6954087a6f5f9a375cdc599f1a3f09bd Mon Sep 17 00:00:00 2001
From: Mark Hamstra
Date: Mon, 12 May 2014 13:58:25 -0700
Subject: [PATCH 5/5] refactored post-SPARK-1772

---
 .../org/apache/spark/executor/Executor.scala  |  2 +-
 .../spark/util/UncaughtExceptionHandler.scala | 44 -------------------
 .../scala/org/apache/spark/util/Utils.scala   |  3 ++-
 3 files changed, 3 insertions(+), 46 deletions(-)
 delete mode 100644 core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f07bdf990f8dc..baee7a216a7c3 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.{UncaughtExceptionHandler, AkkaUtils, Utils}
+import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.
diff --git a/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala b/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala
deleted file mode 100644
index 987f912f90d01..0000000000000
--- a/core/src/main/scala/org/apache/spark/util/UncaughtExceptionHandler.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.util
-
-import org.apache.spark.executor.ExecutorExitCode
-import org.apache.spark.Logging
-
-private[spark] object UncaughtExceptionHandler
-  extends Thread.UncaughtExceptionHandler with Logging {
-
-  override def uncaughtException(thread: Thread, exception: Throwable) {
-    try {
-      logError("Uncaught exception in thread " + thread, exception)
-
-      // We may have been called from a shutdown hook. If so, we must not call System.exit().
-      // (If we do, we will deadlock.)
-      if (!Utils.inShutdown()) {
-        if (exception.isInstanceOf[OutOfMemoryError]) {
-          System.exit(ExecutorExitCode.OOM)
-        } else {
-          System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
-        }
-      }
-    } catch {
-      case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-      case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
-    }
-  }
-}
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8e50f4eb6aecc..4384daed272b7 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -40,6 +40,7 @@ import tachyon.client.{TachyonFile,TachyonFS}
 
 import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkException}
 import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}
 
 /**
@@ -786,7 +787,7 @@ private[spark] object Utils extends Logging {
     try {
       block
     } catch {
-      case t: Throwable => UncaughtExceptionHandler.uncaughtException(Thread.currentThread, t)
+      case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
     }
   }
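
Where the series ends up: PATCH 5 deletes the interim org.apache.spark.util.UncaughtExceptionHandler because SPARK-1772 had meanwhile introduced ExecutorUncaughtExceptionHandler in org.apache.spark.executor, and Utils.tryOrExit now calls that object's one-argument uncaughtException(Throwable) overload. A rough sketch of that final wiring follows, with hedged details: the exit-code values are illustrative, and the one-argument overload's body is inferred from the call site in the Utils.scala hunk above rather than copied from the real class.

// Stand-in mirroring org.apache.spark.executor.ExecutorUncaughtExceptionHandler.
object ExecutorUncaughtExceptionHandlerSketch extends Thread.UncaughtExceptionHandler {
  private val UNCAUGHT_EXCEPTION = 50       // illustrative, see ExecutorExitCode
  private val UNCAUGHT_EXCEPTION_TWICE = 51 // illustrative
  private val OOM = 52                      // illustrative

  override def uncaughtException(thread: Thread, exception: Throwable): Unit = {
    try {
      System.err.println(s"Uncaught exception in thread $thread: $exception")
      // (The real handler also checks Utils.inShutdown() before exiting; omitted here.)
      exception match {
        case _: OutOfMemoryError => sys.exit(OOM)
        case _                   => sys.exit(UNCAUGHT_EXCEPTION)
      }
    } catch {
      // If logging or exiting itself throws, fall back to halting the JVM outright.
      case _: OutOfMemoryError => Runtime.getRuntime.halt(OOM)
      case _: Throwable        => Runtime.getRuntime.halt(UNCAUGHT_EXCEPTION_TWICE)
    }
  }

  // The one-argument overload that the final Utils.tryOrExit calls; assumed to
  // simply supply the current thread.
  def uncaughtException(exception: Throwable): Unit =
    uncaughtException(Thread.currentThread(), exception)
}

Call sites keep the PATCH 2 shape (e.g. Utils.tryOrExit { heartBeat() }), so retargeting the handler only required touching Utils.scala and the imports.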