From 172fc20898896058b7288360eb5292ed9df9d79c Mon Sep 17 00:00:00 2001 From: pgandhi Date: Fri, 21 Jul 2017 16:00:22 -0500 Subject: [PATCH 1/8] [SPARK-21503]: Fixed the issue Added the case ExecutorLostFailure which was previously not there, thus, the default case would be executed in which case, task would be marked as completed. --- .../main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index aabf6e0c63c0..c7f7e9889d41 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -19,7 +19,7 @@ package org.apache.spark.ui.exec import scala.collection.mutable.{LinkedHashMap, ListBuffer} -import org.apache.spark.{ExceptionFailure, Resubmitted, SparkConf, SparkContext} +import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, Resubmitted, SparkConf, SparkContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.scheduler._ import org.apache.spark.storage.{StorageStatus, StorageStatusListener} @@ -140,6 +140,8 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar return case _: ExceptionFailure => taskSummary.tasksFailed += 1 + case _: ExecutorLostFailure => + taskSummary.tasksFailed += 1 case _ => taskSummary.tasksComplete += 1 } From 81422e0f634c0f06eb2ea29fba4281176a1ab528 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Tue, 25 Jul 2017 09:54:41 -0500 Subject: [PATCH 2/8] [SPARK-21503][UI]: Adding changes as per comments --- .../org/apache/spark/ui/exec/ExecutorsTab.scala | 12 ++++++------ .../scala/org/apache/spark/ui/StagePageSuite.scala | 13 +++++++++++++ 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index c7f7e9889d41..0d1b567e2443 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -19,7 +19,7 @@ package org.apache.spark.ui.exec import scala.collection.mutable.{LinkedHashMap, ListBuffer} -import org.apache.spark.{ExceptionFailure, ExecutorLostFailure, Resubmitted, SparkConf, SparkContext} +import org.apache.spark.{Resubmitted, SparkConf, SparkContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.scheduler._ import org.apache.spark.storage.{StorageStatus, StorageStatusListener} @@ -138,12 +138,12 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar // could have failed half-way through. The correct fix would be to keep track of the // metrics added by each attempt, but this is much more complicated. return - case _: ExceptionFailure => - taskSummary.tasksFailed += 1 - case _: ExecutorLostFailure => - taskSummary.tasksFailed += 1 case _ => - taskSummary.tasksComplete += 1 + } + if (info.successful) { + taskSummary.tasksComplete += 1 + } else { + taskSummary.tasksFailed += 1 } if (taskSummary.tasksActive >= 1) { taskSummary.tasksActive -= 1 diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index 499d47b13d70..b8267b267dfb 100644 --- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -84,6 +84,19 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { jobListener.onTaskEnd( SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, taskMetrics)) } + (3 to 4).foreach { + taskId => + val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false) + jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo)) + jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo)) + taskInfo.markFinished(TaskState.FAILED, System.currentTimeMillis()) + val taskMetrics = TaskMetrics.empty + taskMetrics.incPeakExecutionMemory(peakExecutionMemory) + jobListener.onTaskEnd( + SparkListenerTaskEnd(0, 0, "result", + new ExecutorLostFailure("0", false, Option[String] {"Killed by external signal"}), + taskInfo, taskMetrics)) + } jobListener.onStageCompleted(SparkListenerStageCompleted(stageInfo)) page.render(request) } From f454c8933e07967548095e068063bd313ae4845c Mon Sep 17 00:00:00 2001 From: pgandhi Date: Wed, 26 Jul 2017 16:41:16 -0500 Subject: [PATCH 3/8] [SPARK-21541]: Spark Logs show incorrect job status for a job that does not create SparkContext Added a flag to check whether user has initialized Spark Context. If it is true, then we let Application Master unregister with Resource Manager else we do not. --- .../spark/deploy/yarn/ApplicationMaster.scala | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index fc925022b271..8bb532ec0b23 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -89,6 +89,9 @@ private[spark] class ApplicationMaster( @volatile private var reporterThread: Thread = _ @volatile private var allocator: YarnAllocator = _ + + // A flag to check whether user has initialized spark context + @volatile private var registered = false private val userClassLoader = { val classpath = Client.getUserClasspath(sparkConf) @@ -319,7 +322,7 @@ private[spark] class ApplicationMaster( */ final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = { synchronized { - if (!unregistered) { + if (registered && !unregistered) { logInfo(s"Unregistering ApplicationMaster with $status" + Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse("")) unregistered = true @@ -332,10 +335,15 @@ private[spark] class ApplicationMaster( synchronized { if (!finished) { val inShutdown = ShutdownHookManager.inShutdown() - logInfo(s"Final app status: $status, exitCode: $code" + + if (registered) { + exitCode = code + finalStatus = status + } else { + finalStatus = FinalApplicationStatus.FAILED + exitCode = ApplicationMaster.EXIT_SC_NOT_INITED + } + logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" + Option(msg).map(msg => s", (reason: $msg)").getOrElse("")) - exitCode = code - finalStatus = status finalMsg = msg finished = true if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) { @@ -439,12 +447,11 @@ private[spark] class ApplicationMaster( sc.getConf.get("spark.driver.port"), isClusterMode = true) registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr) + registered = true } else { // Sanity check; should never happen in normal operation, since sc should only be null // if the user app did not create a SparkContext. - if (!finished) { - throw new IllegalStateException("SparkContext is null but app is still running!") - } + throw new IllegalStateException("User did not initialize spark context!") } userClassThread.join() } catch { From 6b7d5c6e2565c7c4dd97f31fe404c59e73c7474c Mon Sep 17 00:00:00 2001 From: pgandhi Date: Wed, 26 Jul 2017 16:58:27 -0500 Subject: [PATCH 4/8] Revert "[SPARK-21541]: Spark Logs show incorrect job status for a job that does not create SparkContext" This reverts commit f454c8933e07967548095e068063bd313ae4845c. "Merged another issue to this one by mistake" --- .../spark/deploy/yarn/ApplicationMaster.scala | 21 +++++++------------ 1 file changed, 7 insertions(+), 14 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 8bb532ec0b23..fc925022b271 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -89,9 +89,6 @@ private[spark] class ApplicationMaster( @volatile private var reporterThread: Thread = _ @volatile private var allocator: YarnAllocator = _ - - // A flag to check whether user has initialized spark context - @volatile private var registered = false private val userClassLoader = { val classpath = Client.getUserClasspath(sparkConf) @@ -322,7 +319,7 @@ private[spark] class ApplicationMaster( */ final def unregister(status: FinalApplicationStatus, diagnostics: String = null): Unit = { synchronized { - if (registered && !unregistered) { + if (!unregistered) { logInfo(s"Unregistering ApplicationMaster with $status" + Option(diagnostics).map(msg => s" (diag message: $msg)").getOrElse("")) unregistered = true @@ -335,15 +332,10 @@ private[spark] class ApplicationMaster( synchronized { if (!finished) { val inShutdown = ShutdownHookManager.inShutdown() - if (registered) { - exitCode = code - finalStatus = status - } else { - finalStatus = FinalApplicationStatus.FAILED - exitCode = ApplicationMaster.EXIT_SC_NOT_INITED - } - logInfo(s"Final app status: $finalStatus, exitCode: $exitCode" + + logInfo(s"Final app status: $status, exitCode: $code" + Option(msg).map(msg => s", (reason: $msg)").getOrElse("")) + exitCode = code + finalStatus = status finalMsg = msg finished = true if (!inShutdown && Thread.currentThread() != reporterThread && reporterThread != null) { @@ -447,11 +439,12 @@ private[spark] class ApplicationMaster( sc.getConf.get("spark.driver.port"), isClusterMode = true) registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr) - registered = true } else { // Sanity check; should never happen in normal operation, since sc should only be null // if the user app did not create a SparkContext. - throw new IllegalStateException("User did not initialize spark context!") + if (!finished) { + throw new IllegalStateException("SparkContext is null but app is still running!") + } } userClassThread.join() } catch { From e46126fe0f3d8d6f92f7f51c30d8c2154bddc126 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Fri, 28 Jul 2017 11:08:08 -0500 Subject: [PATCH 5/8] [SPARK-21503]- Making Changes as per comments [SPARK-21503]- Making Changes as per comments: Removed match case statement and replaced it with an if clause. --- .../org/apache/spark/ui/exec/ExecutorsTab.scala | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala index 0d1b567e2443..770da2226fe0 100644 --- a/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala +++ b/core/src/main/scala/org/apache/spark/ui/exec/ExecutorsTab.scala @@ -131,14 +131,12 @@ class ExecutorsListener(storageStatusListener: StorageStatusListener, conf: Spar if (info != null) { val eid = info.executorId val taskSummary = executorToTaskSummary.getOrElseUpdate(eid, ExecutorTaskSummary(eid)) - taskEnd.reason match { - case Resubmitted => - // Note: For resubmitted tasks, we continue to use the metrics that belong to the - // first attempt of this task. This may not be 100% accurate because the first attempt - // could have failed half-way through. The correct fix would be to keep track of the - // metrics added by each attempt, but this is much more complicated. - return - case _ => + // Note: For resubmitted tasks, we continue to use the metrics that belong to the + // first attempt of this task. This may not be 100% accurate because the first attempt + // could have failed half-way through. The correct fix would be to keep track of the + // metrics added by each attempt, but this is much more complicated. + if (taskEnd.reason == Resubmitted) { + return } if (info.successful) { taskSummary.tasksComplete += 1 From 7f03341093c843086920e8218463b5d2ba6e37d2 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Tue, 1 Aug 2017 10:52:13 -0500 Subject: [PATCH 6/8] [SPARK-21503]: Reverting Unit Test Code [SPARK-21503]: Reverting Unit Test Code - Not needed. --- .../scala/org/apache/spark/ui/StagePageSuite.scala | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala index b8267b267dfb..499d47b13d70 100644 --- a/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/StagePageSuite.scala @@ -84,19 +84,6 @@ class StagePageSuite extends SparkFunSuite with LocalSparkContext { jobListener.onTaskEnd( SparkListenerTaskEnd(0, 0, "result", Success, taskInfo, taskMetrics)) } - (3 to 4).foreach { - taskId => - val taskInfo = new TaskInfo(taskId, taskId, 0, 0, "0", "localhost", TaskLocality.ANY, false) - jobListener.onStageSubmitted(SparkListenerStageSubmitted(stageInfo)) - jobListener.onTaskStart(SparkListenerTaskStart(0, 0, taskInfo)) - taskInfo.markFinished(TaskState.FAILED, System.currentTimeMillis()) - val taskMetrics = TaskMetrics.empty - taskMetrics.incPeakExecutionMemory(peakExecutionMemory) - jobListener.onTaskEnd( - SparkListenerTaskEnd(0, 0, "result", - new ExecutorLostFailure("0", false, Option[String] {"Killed by external signal"}), - taskInfo, taskMetrics)) - } jobListener.onStageCompleted(SparkListenerStageCompleted(stageInfo)) page.render(request) } From eaf63e6bd4dddc726cf57fda080b9b5d6341e2f8 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Thu, 24 Aug 2017 17:03:29 -0500 Subject: [PATCH 7/8] [SPARK-21798]: No config to replace deprecated SPARK_CLASSPATH config for launching daemons like History Server Adding new env variable SPARK_DAEMON_CLASSPATH to set classpath for launching daemons. Tested and verified for History Server and Standalone Mode. --- .../java/org/apache/spark/launcher/AbstractCommandBuilder.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index c32974a57fcc..ee594ed83281 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -136,7 +136,8 @@ List buildClassPath(String appClassPath) throws IOException { Set cp = new LinkedHashSet<>(); addToClassPath(cp, appClassPath); - + addToClassPath(cp, getenv("SPARK_DAEMON_CLASSPATH")); + addToClassPath(cp, getConfDir()); boolean prependClasses = !isEmpty(getenv("SPARK_PREPEND_CLASSES")); From e421a03acbd410a835cf3117fe6592523dc649b5 Mon Sep 17 00:00:00 2001 From: pgandhi Date: Fri, 25 Aug 2017 11:13:47 -0500 Subject: [PATCH 8/8] [SPARK-21798]: No config to replace deprecated SPARK_CLASSPATH config for launching daemons like History Server Reverted the previous code change and added the environment variable SPARK_DAEMON_CLASSPATH only for launching daemon processes. --- conf/spark-env.sh.template | 1 + docs/monitoring.md | 4 ++++ docs/running-on-mesos.md | 2 ++ docs/spark-standalone.md | 4 ++++ .../org/apache/spark/launcher/AbstractCommandBuilder.java | 3 +-- .../org/apache/spark/launcher/SparkClassCommandBuilder.java | 5 +++++ 6 files changed, 17 insertions(+), 2 deletions(-) diff --git a/conf/spark-env.sh.template b/conf/spark-env.sh.template index 1663019ee575..f8c895f5303b 100755 --- a/conf/spark-env.sh.template +++ b/conf/spark-env.sh.template @@ -52,6 +52,7 @@ # - SPARK_HISTORY_OPTS, to set config properties only for the history server (e.g. "-Dx=y") # - SPARK_SHUFFLE_OPTS, to set config properties only for the external shuffle service (e.g. "-Dx=y") # - SPARK_DAEMON_JAVA_OPTS, to set config properties for all daemons (e.g. "-Dx=y") +# - SPARK_DAEMON_CLASSPATH, to set the classpath for all daemons # - SPARK_PUBLIC_DNS, to set the public dns name of the master or workers # Generic options for the daemons used in the standalone deploy mode diff --git a/docs/monitoring.md b/docs/monitoring.md index 3e577c5f3677..d22cd945eaf6 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -61,6 +61,10 @@ The history server can be configured as follows: SPARK_DAEMON_JAVA_OPTS JVM options for the history server (default: none). + + SPARK_DAEMON_CLASSPATH + Classpath for the history server (default: none). + SPARK_PUBLIC_DNS diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md index 0e5a20c578db..c12b8580af06 100644 --- a/docs/running-on-mesos.md +++ b/docs/running-on-mesos.md @@ -160,6 +160,8 @@ If you like to run the `MesosClusterDispatcher` with Marathon, you need to run t The `MesosClusterDispatcher` also supports writing recovery state into Zookeeper. This will allow the `MesosClusterDispatcher` to be able to recover all submitted and running containers on relaunch. In order to enable this recovery mode, you can set SPARK_DAEMON_JAVA_OPTS in spark-env by configuring `spark.deploy.recoveryMode` and related spark.deploy.zookeeper.* configurations. For more information about these configurations please refer to the configurations [doc](configurations.html#deploy). +You can also specify any additional jars required by the `MesosClusterDispatcher` in the classpath by setting the environment variable SPARK_DAEMON_CLASSPATH in spark-env. + From the client, you can submit a job to Mesos cluster by running `spark-submit` and specifying the master URL to the URL of the `MesosClusterDispatcher` (e.g: mesos://dispatcher:7077). You can view driver statuses on the Spark cluster Web UI. diff --git a/docs/spark-standalone.md b/docs/spark-standalone.md index 642575b46dd4..1095386c31ab 100644 --- a/docs/spark-standalone.md +++ b/docs/spark-standalone.md @@ -149,6 +149,10 @@ You can optionally configure the cluster further by setting environment variable SPARK_DAEMON_JAVA_OPTS JVM options for the Spark master and worker daemons themselves in the form "-Dx=y" (default: none). + + SPARK_DAEMON_CLASSPATH + Classpath for the Spark master and worker daemons themselves (default: none). + SPARK_PUBLIC_DNS The public DNS name of the Spark master and workers (default: none). diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index ee594ed83281..c32974a57fcc 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -136,8 +136,7 @@ List buildClassPath(String appClassPath) throws IOException { Set cp = new LinkedHashSet<>(); addToClassPath(cp, appClassPath); - addToClassPath(cp, getenv("SPARK_DAEMON_CLASSPATH")); - + addToClassPath(cp, getConfDir()); boolean prependClasses = !isEmpty(getenv("SPARK_PREPEND_CLASSES")); diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java index 137ef74843da..32724acdc362 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkClassCommandBuilder.java @@ -53,16 +53,19 @@ public List buildCommand(Map env) case "org.apache.spark.deploy.master.Master": javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); javaOptsKeys.add("SPARK_MASTER_OPTS"); + extraClassPath = getenv("SPARK_DAEMON_CLASSPATH"); memKey = "SPARK_DAEMON_MEMORY"; break; case "org.apache.spark.deploy.worker.Worker": javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); javaOptsKeys.add("SPARK_WORKER_OPTS"); + extraClassPath = getenv("SPARK_DAEMON_CLASSPATH"); memKey = "SPARK_DAEMON_MEMORY"; break; case "org.apache.spark.deploy.history.HistoryServer": javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); javaOptsKeys.add("SPARK_HISTORY_OPTS"); + extraClassPath = getenv("SPARK_DAEMON_CLASSPATH"); memKey = "SPARK_DAEMON_MEMORY"; break; case "org.apache.spark.executor.CoarseGrainedExecutorBackend": @@ -77,11 +80,13 @@ public List buildCommand(Map env) break; case "org.apache.spark.deploy.mesos.MesosClusterDispatcher": javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); + extraClassPath = getenv("SPARK_DAEMON_CLASSPATH"); break; case "org.apache.spark.deploy.ExternalShuffleService": case "org.apache.spark.deploy.mesos.MesosExternalShuffleService": javaOptsKeys.add("SPARK_DAEMON_JAVA_OPTS"); javaOptsKeys.add("SPARK_SHUFFLE_OPTS"); + extraClassPath = getenv("SPARK_DAEMON_CLASSPATH"); memKey = "SPARK_DAEMON_MEMORY"; break; default: