From 782d37db356cf252796c1c15b70c2e4de15e2493 Mon Sep 17 00:00:00 2001 From: ForVic Date: Mon, 18 Aug 2025 14:28:49 -0700 Subject: [PATCH 1/8] Capture the stack trace from failed K8s jobs, and set it in the annotations (#617) --- .../org/apache/spark/deploy/SparkSubmit.scala | 49 +++++++++ ...apache.spark.deploy.SparkDiagnosticsSetter | 18 ++++ .../org/apache/spark/deploy/k8s/Config.scala | 8 ++ .../apache/spark/deploy/k8s/Constants.scala | 1 + .../SparkKubernetesDiagnosticsSetter.scala | 79 ++++++++++++++ ...parkKubernetesDiagnosticsSetterSuite.scala | 101 ++++++++++++++++++ 6 files changed, 256 insertions(+) create mode 100644 resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkDiagnosticsSetter create mode 100644 resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala create mode 100644 resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 4fcc3af1d5e2..0698fbccb5e1 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -67,6 +67,8 @@ private[spark] class SparkSubmit extends Logging { import DependencyUtils._ import SparkSubmit._ + private val KUBERNETES_DIAGNOSTICS_MESSAGE_LIMIT_BYTES = 64 * 1024 // 64 KiB + def doSubmit(args: Array[String]): Unit = { val appArgs = parseArguments(args) val sparkConf = appArgs.toSparkConf() @@ -1035,6 +1037,8 @@ private[spark] class SparkSubmit extends Logging { exitCode = e.exitCode case _ => } + // Store the diagnostics externally if enabled, but still throw to complete the application. + storeDiagnostics(args, sparkConf, cause) throw cause } finally { if (args.master.startsWith("k8s") && !isShell(args.primaryResource) && @@ -1058,6 +1062,26 @@ private[spark] class SparkSubmit extends Logging { /** Throw a SparkException with the given error message. */ private def error(msg: String): Unit = throw new SparkException(msg) + /** + * Store the diagnostics using the SparkDiagnosticsSetter. + */ + private def storeDiagnostics(args: SparkSubmitArguments, sparkConf: SparkConf, + t: Throwable): Unit = { + // Swallow exceptions when storing diagnostics, this shouldn't fail the application. + try { + if (args.master.startsWith("k8s") && !isShell(args.primaryResource) + && !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass)) { + val diagnostics = SparkStringUtils.abbreviate( + org.apache.hadoop.util.StringUtils.stringifyException(t), + KUBERNETES_DIAGNOSTICS_MESSAGE_LIMIT_BYTES) + SparkSubmitUtils. 
+ getSparkDiagnosticsSetters(args.master, sparkConf) + .map(_.setDiagnostics(diagnostics, sparkConf)) + } + } catch { + case e: Throwable => logWarning(s"Failed to set diagnostics: $e") + } + } } @@ -1233,6 +1257,24 @@ private[spark] object SparkSubmitUtils { case _ => throw new SparkException(s"Spark config without '=': $pair") } } + + private[deploy] def getSparkDiagnosticsSetters( + master: String, sparkConf: SparkConf): Option[SparkDiagnosticsSetter] = { + val loader = Utils.getContextOrSparkClassLoader + val serviceLoaders = + ServiceLoader.load(classOf[SparkDiagnosticsSetter], loader) + .asScala + .filter(_.supports(master, sparkConf)) + + serviceLoaders.size match { + case x if x > 1 => + throw new SparkException(s"Multiple($x) external SparkDiagnosticsSetter" + + s"registered for master url $master.") + case 1 => + Some(serviceLoaders.headOption.get) + case _ => None + } + } } /** @@ -1255,3 +1297,10 @@ private[spark] trait SparkSubmitOperation { def supports(master: String): Boolean } + +private[spark] trait SparkDiagnosticsSetter { + + def setDiagnostics(diagnostics: String, conf: SparkConf): Unit + + def supports(clusterManagerUrl: String, conf: SparkConf): Boolean +} diff --git a/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkDiagnosticsSetter b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkDiagnosticsSetter new file mode 100644 index 000000000000..ccd0fc0b64b8 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkDiagnosticsSetter @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +org.apache.spark.deploy.k8s.SparkKubernetesDiagnosticsSetter \ No newline at end of file diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 8be78240e34f..9676c0ef5f45 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -784,6 +784,14 @@ private[spark] object Config extends Logging { .checkValue(value => value > 0, "Gracefully shutdown period must be a positive time value") .createWithDefaultString("20s") + val KUBERNETES_STORE_DIAGNOSTICS = + ConfigBuilder("spark.kubernetes.storeDiagnostics") + .doc("If set to true, Spark will store diagnostics information for failed applications in" + + s" the Kubernetes API server using the ${DIAGNOSTICS_ANNOTATION} annotation.") + .version("4.1.0") + .booleanConf + .createWithDefault(false) + val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label." val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation." val KUBERNETES_DRIVER_SERVICE_LABEL_PREFIX = "spark.kubernetes.driver.service.label." diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala index cd2a90898efe..c336e02088f9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala @@ -107,6 +107,7 @@ object Constants { val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor" val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d val CONNECT_GRPC_BINDING_PORT = "spark.connect.grpc.binding.port" + val DIAGNOSTICS_ANNOTATION = "spark.kubernetes-diagnostics" // Hadoop Configuration val HADOOP_CONF_VOLUME = "hadoop-properties" diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala new file mode 100644 index 000000000000..a001e9914b71 --- /dev/null +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s + +import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} +import io.fabric8.kubernetes.client.KubernetesClient + +import org.apache.spark.SparkConf +import org.apache.spark.deploy.SparkDiagnosticsSetter +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants.DIAGNOSTICS_ANNOTATION +import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory.ClientType +import org.apache.spark.internal.Logging + +/** + * We use this trait and its implementation to allow for mocking the static + * client creation in tests. + */ +private[spark] trait KubernetesClientProvider { + def create(conf: SparkConf): KubernetesClient +} + +private[spark] class DefaultKubernetesClientProvider extends KubernetesClientProvider { + override def create(conf: SparkConf): KubernetesClient = { + SparkKubernetesClientFactory.createKubernetesClient( + conf.get(KUBERNETES_DRIVER_MASTER_URL), + Option(conf.get(KUBERNETES_NAMESPACE)), + KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX, + ClientType.Driver, + conf, + None) + } +} + +private[spark] class SparkKubernetesDiagnosticsSetter( + clientProvider: KubernetesClientProvider + ) extends SparkDiagnosticsSetter with Logging { + + def this() = { + this(new DefaultKubernetesClientProvider) + } + + override def setDiagnostics(diagnostics: String, conf: SparkConf): Unit = { + require(conf.get(KUBERNETES_DRIVER_POD_NAME).isDefined, + "Driver pod name must be set in order to set diagnostics on the driver pod.") + val client = clientProvider.create(conf) + conf.get(KUBERNETES_DRIVER_POD_NAME).foreach { podName => + client.pods() + .inNamespace(conf.get(KUBERNETES_NAMESPACE)) + .withName(podName) + .edit((p: Pod) => new PodBuilder(p) + .editMetadata() + .addToAnnotations(DIAGNOSTICS_ANNOTATION, diagnostics) + .endMetadata() + .build()); + } + } + + override def supports(clusterManagerUrl: String, conf: SparkConf): Boolean = { + if (conf.get(KUBERNETES_STORE_DIAGNOSTICS)) { + return clusterManagerUrl.startsWith("k8s://") + } + false + } +} diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala new file mode 100644 index 000000000000..abcedf891c42 --- /dev/null +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.deploy.k8s + +import java.util.function.UnaryOperator + +import io.fabric8.kubernetes.api.model.Pod +import io.fabric8.kubernetes.client.KubernetesClient +import io.fabric8.kubernetes.client.dsl.PodResource +import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} +import org.mockito.ArgumentMatchers.any +import org.mockito.Mockito._ +import org.scalatest.BeforeAndAfterEach +import org.scalatestplus.mockito.MockitoSugar + +import org.apache.spark.{SparkConf, SparkFunSuite} +import org.apache.spark.deploy.k8s.Config._ +import org.apache.spark.deploy.k8s.Constants.DIAGNOSTICS_ANNOTATION +import org.apache.spark.deploy.k8s.Fabric8Aliases.PODS + +class SparkKubernetesDiagnosticsSetterSuite extends SparkFunSuite + with MockitoSugar with BeforeAndAfterEach { + + @Mock + private var client: KubernetesClient = _ + @Mock + private var clientProvider: KubernetesClientProvider = _ + @Mock + private var podOperations: PODS = _ + @Mock + private var driverPodOperations: PodResource = _ + + private val driverPodName: String = "driver-pod" + private val k8sClusterManagerUrl: String = "k8s://dummy" + private val namespace: String = "default" + + private var setter: SparkKubernetesDiagnosticsSetter = _ + + override def beforeEach(): Unit = { + MockitoAnnotations.openMocks(this) + when(client.pods()).thenReturn(podOperations) + when(podOperations.inNamespace(namespace)).thenReturn(podOperations) + when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations) + when(clientProvider.create(any(classOf[SparkConf]))).thenReturn(client) + setter = new SparkKubernetesDiagnosticsSetter(clientProvider) + } + + test("supports() should return true only for k8s:// URLs when the feature is enabled") { + assert(setter.supports(k8sClusterManagerUrl, + new SparkConf().set(KUBERNETES_STORE_DIAGNOSTICS, true))) + assert(!setter.supports(k8sClusterManagerUrl, new SparkConf())) + assert(!setter.supports(k8sClusterManagerUrl, + new SparkConf().set(KUBERNETES_STORE_DIAGNOSTICS, false))) + assert(!setter.supports("yarn", new SparkConf().set(KUBERNETES_STORE_DIAGNOSTICS, true))) + assert(!setter.supports("spark://localhost", new SparkConf())) + } + + test("setDiagnostics should throw if driver pod name is missing") { + val conf = new SparkConf() + .set(KUBERNETES_DRIVER_MASTER_URL, k8sClusterManagerUrl) + .set(KUBERNETES_NAMESPACE, namespace) + + assertThrows[IllegalArgumentException] { + setter.setDiagnostics("diag", conf) + } + } + + test("setDiagnostics should patch pod with diagnostics annotation") { + val diagnostics = "Fake diagnostics stack trace" + val conf = new SparkConf() + .set(KUBERNETES_DRIVER_MASTER_URL, k8sClusterManagerUrl) + .set(KUBERNETES_NAMESPACE, namespace) + .set(KUBERNETES_DRIVER_POD_NAME, driverPodName) + + setter.setDiagnostics(diagnostics, conf) + + val captor: ArgumentCaptor[UnaryOperator[Pod]] = + ArgumentCaptor.forClass(classOf[UnaryOperator[Pod]]) + verify(driverPodOperations).edit(captor.capture()) + + val fn = captor.getValue + val initialPod = SparkPod.initialPod().pod + val editedPod = fn.apply(initialPod) + + assert(editedPod.getMetadata.getAnnotations.get(DIAGNOSTICS_ANNOTATION) == diagnostics) + } +} From b2b0c8b30a59202aaf861f55a00278926d2e9dad Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Thu, 16 Oct 2025 14:35:22 -0700 Subject: [PATCH 2/8] address review comments --- .../org/apache/spark/deploy/SparkSubmit.scala | 35 +++++++++++-------- .../SparkKubernetesDiagnosticsSetter.scala | 35 ++++++++++--------- 2 files changed, 40 
insertions(+), 30 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 0698fbccb5e1..17faae002a79 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -67,8 +67,6 @@ private[spark] class SparkSubmit extends Logging { import DependencyUtils._ import SparkSubmit._ - private val KUBERNETES_DIAGNOSTICS_MESSAGE_LIMIT_BYTES = 64 * 1024 // 64 KiB - def doSubmit(args: Array[String]): Unit = { val appArgs = parseArguments(args) val sparkConf = appArgs.toSparkConf() @@ -1065,18 +1063,16 @@ private[spark] class SparkSubmit extends Logging { /** * Store the diagnostics using the SparkDiagnosticsSetter. */ - private def storeDiagnostics(args: SparkSubmitArguments, sparkConf: SparkConf, - t: Throwable): Unit = { + private def storeDiagnostics( + args: SparkSubmitArguments, + sparkConf: SparkConf, + throwable: Throwable): Unit = { // Swallow exceptions when storing diagnostics, this shouldn't fail the application. try { - if (args.master.startsWith("k8s") && !isShell(args.primaryResource) - && !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass)) { - val diagnostics = SparkStringUtils.abbreviate( - org.apache.hadoop.util.StringUtils.stringifyException(t), - KUBERNETES_DIAGNOSTICS_MESSAGE_LIMIT_BYTES) - SparkSubmitUtils. - getSparkDiagnosticsSetters(args.master, sparkConf) - .map(_.setDiagnostics(diagnostics, sparkConf)) + if (!isShell(args.primaryResource) && !isSqlShell(args.mainClass) + && !isThriftServer(args.mainClass)) { + SparkSubmitUtils.getSparkDiagnosticsSetters(args.master, sparkConf) + .foreach(_.setDiagnostics(throwable, sparkConf)) } } catch { case e: Throwable => logWarning(s"Failed to set diagnostics: $e") @@ -1259,7 +1255,8 @@ private[spark] object SparkSubmitUtils { } private[deploy] def getSparkDiagnosticsSetters( - master: String, sparkConf: SparkConf): Option[SparkDiagnosticsSetter] = { + master: String, + sparkConf: SparkConf): Option[SparkDiagnosticsSetter] = { val loader = Utils.getContextOrSparkClassLoader val serviceLoaders = ServiceLoader.load(classOf[SparkDiagnosticsSetter], loader) @@ -1298,9 +1295,19 @@ private[spark] trait SparkSubmitOperation { def supports(master: String): Boolean } +/** + * Provides a hook to set the application failure details in some external system. + */ private[spark] trait SparkDiagnosticsSetter { - def setDiagnostics(diagnostics: String, conf: SparkConf): Unit + /** + * Set the failure details. + */ + def setDiagnostics(throwable: Throwable, conf: SparkConf): Unit + /** + * Whether this implementation of the SparkDiagnosticsSetter supports setting the stack + * trace for this application. 
+ */ def supports(clusterManagerUrl: String, conf: SparkConf): Boolean } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala index a001e9914b71..e5613fd1c237 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala @@ -18,13 +18,13 @@ package org.apache.spark.deploy.k8s import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} import io.fabric8.kubernetes.client.KubernetesClient - import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkDiagnosticsSetter import org.apache.spark.deploy.k8s.Config._ import org.apache.spark.deploy.k8s.Constants.DIAGNOSTICS_ANNOTATION import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory.ClientType import org.apache.spark.internal.Logging +import org.apache.spark.util.{SparkStringUtils, Utils} /** * We use this trait and its implementation to allow for mocking the static @@ -50,30 +50,33 @@ private[spark] class SparkKubernetesDiagnosticsSetter( clientProvider: KubernetesClientProvider ) extends SparkDiagnosticsSetter with Logging { + private val KUBERNETES_DIAGNOSTICS_MESSAGE_LIMIT_BYTES = 64 * 1024 // 64 KiB + def this() = { this(new DefaultKubernetesClientProvider) } - override def setDiagnostics(diagnostics: String, conf: SparkConf): Unit = { + override def setDiagnostics(throwable: Throwable, conf: SparkConf): Unit = { require(conf.get(KUBERNETES_DRIVER_POD_NAME).isDefined, "Driver pod name must be set in order to set diagnostics on the driver pod.") - val client = clientProvider.create(conf) - conf.get(KUBERNETES_DRIVER_POD_NAME).foreach { podName => - client.pods() - .inNamespace(conf.get(KUBERNETES_NAMESPACE)) - .withName(podName) - .edit((p: Pod) => new PodBuilder(p) - .editMetadata() - .addToAnnotations(DIAGNOSTICS_ANNOTATION, diagnostics) - .endMetadata() - .build()); + val diagnostics = SparkStringUtils.abbreviate( + org.apache.hadoop.util.StringUtils.stringifyException(throwable), + KUBERNETES_DIAGNOSTICS_MESSAGE_LIMIT_BYTES) + Utils.tryWithResource(clientProvider.create(conf)) { client => + conf.get(KUBERNETES_DRIVER_POD_NAME).foreach { podName => + client.pods() + .inNamespace(conf.get(KUBERNETES_NAMESPACE)) + .withName(podName) + .edit((p: Pod) => new PodBuilder(p) + .editOrNewMetadata() + .addToAnnotations(DIAGNOSTICS_ANNOTATION, diagnostics) + .endMetadata() + .build()); + } } } override def supports(clusterManagerUrl: String, conf: SparkConf): Boolean = { - if (conf.get(KUBERNETES_STORE_DIAGNOSTICS)) { - return clusterManagerUrl.startsWith("k8s://") - } - false + conf.get(KUBERNETES_STORE_DIAGNOSTICS) && clusterManagerUrl.startsWith("k8s://") } } From 0b7abd80087a107e8eb2f3b10d2a7140d7dd0e60 Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Thu, 16 Oct 2025 16:28:33 -0700 Subject: [PATCH 3/8] fix compile errors --- .../deploy/k8s/SparkKubernetesDiagnosticsSetter.scala | 5 +++-- .../k8s/SparkKubernetesDiagnosticsSetterSuite.scala | 8 +++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala 
index e5613fd1c237..006b17d4e24c 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala @@ -18,6 +18,8 @@ package org.apache.spark.deploy.k8s import io.fabric8.kubernetes.api.model.{Pod, PodBuilder} import io.fabric8.kubernetes.client.KubernetesClient +import org.apache.hadoop.util.StringUtils + import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkDiagnosticsSetter import org.apache.spark.deploy.k8s.Config._ @@ -59,8 +61,7 @@ private[spark] class SparkKubernetesDiagnosticsSetter( override def setDiagnostics(throwable: Throwable, conf: SparkConf): Unit = { require(conf.get(KUBERNETES_DRIVER_POD_NAME).isDefined, "Driver pod name must be set in order to set diagnostics on the driver pod.") - val diagnostics = SparkStringUtils.abbreviate( - org.apache.hadoop.util.StringUtils.stringifyException(throwable), + val diagnostics = SparkStringUtils.abbreviate(StringUtils.stringifyException(throwable), KUBERNETES_DIAGNOSTICS_MESSAGE_LIMIT_BYTES) Utils.tryWithResource(clientProvider.create(conf)) { client => conf.get(KUBERNETES_DRIVER_POD_NAME).foreach { podName => diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala index abcedf891c42..27176ef89e26 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala @@ -21,6 +21,7 @@ import java.util.function.UnaryOperator import io.fabric8.kubernetes.api.model.Pod import io.fabric8.kubernetes.client.KubernetesClient import io.fabric8.kubernetes.client.dsl.PodResource +import org.apache.hadoop.util.StringUtils import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations} import org.mockito.ArgumentMatchers.any import org.mockito.Mockito._ @@ -75,12 +76,12 @@ class SparkKubernetesDiagnosticsSetterSuite extends SparkFunSuite .set(KUBERNETES_NAMESPACE, namespace) assertThrows[IllegalArgumentException] { - setter.setDiagnostics("diag", conf) + setter.setDiagnostics(new Throwable("diag"), conf) } } test("setDiagnostics should patch pod with diagnostics annotation") { - val diagnostics = "Fake diagnostics stack trace" + val diagnostics = new Throwable("Fake diagnostics stack trace") val conf = new SparkConf() .set(KUBERNETES_DRIVER_MASTER_URL, k8sClusterManagerUrl) .set(KUBERNETES_NAMESPACE, namespace) @@ -96,6 +97,7 @@ class SparkKubernetesDiagnosticsSetterSuite extends SparkFunSuite val initialPod = SparkPod.initialPod().pod val editedPod = fn.apply(initialPod) - assert(editedPod.getMetadata.getAnnotations.get(DIAGNOSTICS_ANNOTATION) == diagnostics) + assert(editedPod.getMetadata.getAnnotations.get(DIAGNOSTICS_ANNOTATION) + == StringUtils.stringifyException(diagnostics)) } } From 1dcc99130cbd25177984bde99231be13e05068bc Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Fri, 17 Oct 2025 17:12:40 -0700 Subject: [PATCH 4/8] initial round of review comments --- .../src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 5 ++--- .../spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala | 5 ++--- 2 files changed, 4 insertions(+), 6 deletions(-) diff 
--git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 17faae002a79..89591da6e80a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -1070,7 +1070,7 @@ private[spark] class SparkSubmit extends Logging { // Swallow exceptions when storing diagnostics, this shouldn't fail the application. try { if (!isShell(args.primaryResource) && !isSqlShell(args.mainClass) - && !isThriftServer(args.mainClass)) { + && !isThriftServer(args.mainClass) && !isConnectServer(args.mainClass)) { SparkSubmitUtils.getSparkDiagnosticsSetters(args.master, sparkConf) .foreach(_.setDiagnostics(throwable, sparkConf)) } @@ -1265,8 +1265,7 @@ private[spark] object SparkSubmitUtils { serviceLoaders.size match { case x if x > 1 => - throw new SparkException(s"Multiple($x) external SparkDiagnosticsSetter" + - s"registered for master url $master.") + throw new SparkException(s"Multiple($x) external SparkDiagnosticsSetter registered.") case 1 => Some(serviceLoaders.headOption.get) case _ => None diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala index 006b17d4e24c..07b1301720fd 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala @@ -48,9 +48,8 @@ private[spark] class DefaultKubernetesClientProvider extends KubernetesClientPro } } -private[spark] class SparkKubernetesDiagnosticsSetter( - clientProvider: KubernetesClientProvider - ) extends SparkDiagnosticsSetter with Logging { +private[spark] class SparkKubernetesDiagnosticsSetter(clientProvider: KubernetesClientProvider) + extends SparkDiagnosticsSetter with Logging { private val KUBERNETES_DIAGNOSTICS_MESSAGE_LIMIT_BYTES = 64 * 1024 // 64 KiB From df2f6ce42e8a20944f3a3df0967d3accd6f8e3b2 Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Tue, 21 Oct 2025 14:39:35 -0700 Subject: [PATCH 5/8] Use a string conf, and some style --- .../main/scala/org/apache/spark/deploy/SparkSubmit.scala | 8 +++++--- .../main/scala/org/apache/spark/deploy/k8s/Config.scala | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 89591da6e80a..262c92c44194 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -1036,7 +1036,9 @@ private[spark] class SparkSubmit extends Logging { case _ => } // Store the diagnostics externally if enabled, but still throw to complete the application. - storeDiagnostics(args, sparkConf, cause) + if (sparkConf.getBoolean("spark.kubernetes.storeDiagnostics", false)) { + storeDiagnostics(args, sparkConf, cause) + } throw cause } finally { if (args.master.startsWith("k8s") && !isShell(args.primaryResource) && @@ -1070,12 +1072,12 @@ private[spark] class SparkSubmit extends Logging { // Swallow exceptions when storing diagnostics, this shouldn't fail the application. 
try { if (!isShell(args.primaryResource) && !isSqlShell(args.mainClass) - && !isThriftServer(args.mainClass) && !isConnectServer(args.mainClass)) { + && !isThriftServer(args.mainClass) && !isConnectServer(args.mainClass)) { SparkSubmitUtils.getSparkDiagnosticsSetters(args.master, sparkConf) .foreach(_.setDiagnostics(throwable, sparkConf)) } } catch { - case e: Throwable => logWarning(s"Failed to set diagnostics: $e") + case e: Throwable => logDebug(s"Failed to set diagnostics: $e") } } } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala index 9676c0ef5f45..64e3bad64223 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala @@ -787,7 +787,7 @@ private[spark] object Config extends Logging { val KUBERNETES_STORE_DIAGNOSTICS = ConfigBuilder("spark.kubernetes.storeDiagnostics") .doc("If set to true, Spark will store diagnostics information for failed applications in" + - s" the Kubernetes API server using the ${DIAGNOSTICS_ANNOTATION} annotation.") + s" the Kubernetes API server using the $DIAGNOSTICS_ANNOTATION annotation.") .version("4.1.0") .booleanConf .createWithDefault(false) From e1509ebea4ae363773c24b6717931aaaec2c9c7d Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Thu, 23 Oct 2025 18:03:41 -0700 Subject: [PATCH 6/8] Rename the config to exit exception --- .../org/apache/spark/deploy/SparkSubmit.scala | 16 ++++++++-------- .../org/apache/spark/deploy/k8s/Config.scala | 8 ++++---- .../org/apache/spark/deploy/k8s/Constants.scala | 2 +- .../k8s/SparkKubernetesDiagnosticsSetter.scala | 12 ++++++------ .../SparkKubernetesDiagnosticsSetterSuite.scala | 16 ++++++---------- 5 files changed, 25 insertions(+), 29 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 262c92c44194..daf2b4fb9e32 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -1036,8 +1036,8 @@ private[spark] class SparkSubmit extends Logging { case _ => } // Store the diagnostics externally if enabled, but still throw to complete the application. - if (sparkConf.getBoolean("spark.kubernetes.storeDiagnostics", false)) { - storeDiagnostics(args, sparkConf, cause) + if (sparkConf.getBoolean("spark.kubernetes.driver.annotateExitException", false)) { + annotateExitException(args, sparkConf, cause) } throw cause } finally { @@ -1063,9 +1063,9 @@ private[spark] class SparkSubmit extends Logging { private def error(msg: String): Unit = throw new SparkException(msg) /** - * Store the diagnostics using the SparkDiagnosticsSetter. + * Store the exit exception using the SparkDiagnosticsSetter. 
   */
-  private def storeDiagnostics(
+  private def annotateExitException(
       args: SparkSubmitArguments,
       sparkConf: SparkConf,
       throwable: Throwable): Unit = {
@@ -1263,7 +1263,7 @@ private[spark] object SparkSubmitUtils {
     val serviceLoaders =
       ServiceLoader.load(classOf[SparkDiagnosticsSetter], loader)
         .asScala
-        .filter(_.supports(master, sparkConf))
+        .filter(_.supports(master))
 
     serviceLoaders.size match {
       case x if x > 1 =>
@@ -1307,8 +1307,8 @@ private[spark] trait SparkDiagnosticsSetter {
   def setDiagnostics(throwable: Throwable, conf: SparkConf): Unit
 
   /**
-   * Whether this implementation of the SparkDiagnosticsSetter supports setting the stack
-   * trace for this application.
+   * Whether this implementation of the SparkDiagnosticsSetter supports setting the exit
+   * exception for this application.
    */
-  def supports(clusterManagerUrl: String, conf: SparkConf): Boolean
+  def supports(clusterManagerUrl: String): Boolean
 }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 64e3bad64223..3ec4bbfd6763 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -784,10 +784,10 @@ private[spark] object Config extends Logging {
       .checkValue(value => value > 0, "Gracefully shutdown period must be a positive time value")
       .createWithDefaultString("20s")
 
-  val KUBERNETES_STORE_DIAGNOSTICS =
-    ConfigBuilder("spark.kubernetes.storeDiagnostics")
-      .doc("If set to true, Spark will store diagnostics information for failed applications in" +
-        s" the Kubernetes API server using the $DIAGNOSTICS_ANNOTATION annotation.")
+  val KUBERNETES_ANNOTATE_EXIT_EXCEPTION =
+    ConfigBuilder("spark.kubernetes.driver.annotateExitException")
+      .doc("If set to true, Spark will store the exit exception for failed applications in" +
+        s" the Kubernetes API server using the $EXIT_EXCEPTION_ANNOTATION annotation.")
       .version("4.1.0")
       .booleanConf
       .createWithDefault(false)
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index c336e02088f9..218ddbac70dd 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -107,7 +107,7 @@ object Constants {
   val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
   val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d
   val CONNECT_GRPC_BINDING_PORT = "spark.connect.grpc.binding.port"
-  val DIAGNOSTICS_ANNOTATION = "spark.kubernetes-diagnostics"
+  val EXIT_EXCEPTION_ANNOTATION = "spark.exit-exception"
 
   // Hadoop Configuration
   val HADOOP_CONF_VOLUME = "hadoop-properties"
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
index 07b1301720fd..04a33337dbc0 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
@@ -23,7 +23,7 @@ import
org.apache.hadoop.util.StringUtils import org.apache.spark.SparkConf import org.apache.spark.deploy.SparkDiagnosticsSetter import org.apache.spark.deploy.k8s.Config._ -import org.apache.spark.deploy.k8s.Constants.DIAGNOSTICS_ANNOTATION +import org.apache.spark.deploy.k8s.Constants.EXIT_EXCEPTION_ANNOTATION import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory.ClientType import org.apache.spark.internal.Logging import org.apache.spark.util.{SparkStringUtils, Utils} @@ -51,7 +51,7 @@ private[spark] class DefaultKubernetesClientProvider extends KubernetesClientPro private[spark] class SparkKubernetesDiagnosticsSetter(clientProvider: KubernetesClientProvider) extends SparkDiagnosticsSetter with Logging { - private val KUBERNETES_DIAGNOSTICS_MESSAGE_LIMIT_BYTES = 64 * 1024 // 64 KiB + private val KUBERNETES_EXIT_EXCEPTION_MESSAGE_LIMIT_BYTES = 64 * 1024 // 64 KiB def this() = { this(new DefaultKubernetesClientProvider) @@ -61,7 +61,7 @@ private[spark] class SparkKubernetesDiagnosticsSetter(clientProvider: Kubernetes require(conf.get(KUBERNETES_DRIVER_POD_NAME).isDefined, "Driver pod name must be set in order to set diagnostics on the driver pod.") val diagnostics = SparkStringUtils.abbreviate(StringUtils.stringifyException(throwable), - KUBERNETES_DIAGNOSTICS_MESSAGE_LIMIT_BYTES) + KUBERNETES_EXIT_EXCEPTION_MESSAGE_LIMIT_BYTES) Utils.tryWithResource(clientProvider.create(conf)) { client => conf.get(KUBERNETES_DRIVER_POD_NAME).foreach { podName => client.pods() @@ -69,14 +69,14 @@ private[spark] class SparkKubernetesDiagnosticsSetter(clientProvider: Kubernetes .withName(podName) .edit((p: Pod) => new PodBuilder(p) .editOrNewMetadata() - .addToAnnotations(DIAGNOSTICS_ANNOTATION, diagnostics) + .addToAnnotations(EXIT_EXCEPTION_ANNOTATION, diagnostics) .endMetadata() .build()); } } } - override def supports(clusterManagerUrl: String, conf: SparkConf): Boolean = { - conf.get(KUBERNETES_STORE_DIAGNOSTICS) && clusterManagerUrl.startsWith("k8s://") + override def supports(clusterManagerUrl: String): Boolean = { + clusterManagerUrl.startsWith("k8s://") } } diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala index 27176ef89e26..aa8c68721109 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala @@ -30,7 +30,7 @@ import org.scalatestplus.mockito.MockitoSugar import org.apache.spark.{SparkConf, SparkFunSuite} import org.apache.spark.deploy.k8s.Config._ -import org.apache.spark.deploy.k8s.Constants.DIAGNOSTICS_ANNOTATION +import org.apache.spark.deploy.k8s.Constants.EXIT_EXCEPTION_ANNOTATION import org.apache.spark.deploy.k8s.Fabric8Aliases.PODS class SparkKubernetesDiagnosticsSetterSuite extends SparkFunSuite @@ -60,14 +60,10 @@ class SparkKubernetesDiagnosticsSetterSuite extends SparkFunSuite setter = new SparkKubernetesDiagnosticsSetter(clientProvider) } - test("supports() should return true only for k8s:// URLs when the feature is enabled") { - assert(setter.supports(k8sClusterManagerUrl, - new SparkConf().set(KUBERNETES_STORE_DIAGNOSTICS, true))) - assert(!setter.supports(k8sClusterManagerUrl, new SparkConf())) - assert(!setter.supports(k8sClusterManagerUrl, - new 
SparkConf().set(KUBERNETES_STORE_DIAGNOSTICS, false))) - assert(!setter.supports("yarn", new SparkConf().set(KUBERNETES_STORE_DIAGNOSTICS, true))) - assert(!setter.supports("spark://localhost", new SparkConf())) + test("supports() should return true only for k8s:// URLs") { + assert(setter.supports(k8sClusterManagerUrl)) + assert(!setter.supports("yarn")) + assert(!setter.supports("spark://localhost")) } test("setDiagnostics should throw if driver pod name is missing") { @@ -97,7 +93,7 @@ class SparkKubernetesDiagnosticsSetterSuite extends SparkFunSuite val initialPod = SparkPod.initialPod().pod val editedPod = fn.apply(initialPod) - assert(editedPod.getMetadata.getAnnotations.get(DIAGNOSTICS_ANNOTATION) + assert(editedPod.getMetadata.getAnnotations.get(EXIT_EXCEPTION_ANNOTATION) == StringUtils.stringifyException(diagnostics)) } } From 1a394f06f1f243e9f43880dcbbac23b1a67cbc34 Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Fri, 24 Oct 2025 14:16:37 -0700 Subject: [PATCH 7/8] Another round of review comments --- .../src/main/scala/org/apache/spark/deploy/SparkSubmit.scala | 5 ++--- .../spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala | 2 -- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index daf2b4fb9e32..07d764966f9e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -1073,7 +1073,7 @@ private[spark] class SparkSubmit extends Logging { try { if (!isShell(args.primaryResource) && !isSqlShell(args.mainClass) && !isThriftServer(args.mainClass) && !isConnectServer(args.mainClass)) { - SparkSubmitUtils.getSparkDiagnosticsSetters(args.master, sparkConf) + SparkSubmitUtils.getSparkDiagnosticsSetters(args.master) .foreach(_.setDiagnostics(throwable, sparkConf)) } } catch { @@ -1257,8 +1257,7 @@ private[spark] object SparkSubmitUtils { } private[deploy] def getSparkDiagnosticsSetters( - master: String, - sparkConf: SparkConf): Option[SparkDiagnosticsSetter] = { + master: String): Option[SparkDiagnosticsSetter] = { val loader = Utils.getContextOrSparkClassLoader val serviceLoaders = ServiceLoader.load(classOf[SparkDiagnosticsSetter], loader) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala index 04a33337dbc0..9a1e79594e48 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala @@ -58,8 +58,6 @@ private[spark] class SparkKubernetesDiagnosticsSetter(clientProvider: Kubernetes } override def setDiagnostics(throwable: Throwable, conf: SparkConf): Unit = { - require(conf.get(KUBERNETES_DRIVER_POD_NAME).isDefined, - "Driver pod name must be set in order to set diagnostics on the driver pod.") val diagnostics = SparkStringUtils.abbreviate(StringUtils.stringifyException(throwable), KUBERNETES_EXIT_EXCEPTION_MESSAGE_LIMIT_BYTES) Utils.tryWithResource(clientProvider.create(conf)) { client => From 8060376fb9da55192a4a9a9d25a97187684af8a8 Mon Sep 17 00:00:00 2001 From: Victor Sunderland Date: Fri, 24 Oct 2025 17:35:22 -0700 Subject: [PATCH 8/8] fix CI --- 
.../k8s/SparkKubernetesDiagnosticsSetterSuite.scala | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala index aa8c68721109..65c150b2a035 100644 --- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala +++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala @@ -66,16 +66,6 @@ class SparkKubernetesDiagnosticsSetterSuite extends SparkFunSuite assert(!setter.supports("spark://localhost")) } - test("setDiagnostics should throw if driver pod name is missing") { - val conf = new SparkConf() - .set(KUBERNETES_DRIVER_MASTER_URL, k8sClusterManagerUrl) - .set(KUBERNETES_NAMESPACE, namespace) - - assertThrows[IllegalArgumentException] { - setter.setDiagnostics(new Throwable("diag"), conf) - } - } - test("setDiagnostics should patch pod with diagnostics annotation") { val diagnostics = new Throwable("Fake diagnostics stack trace") val conf = new SparkConf()
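
The series leaves SparkDiagnosticsSetter as a ServiceLoader extension point: SparkSubmit discovers implementations registered under META-INF/services/org.apache.spark.deploy.SparkDiagnosticsSetter, throws if more than one claims the master URL, and otherwise hands the single match the exit exception. Below is a minimal sketch of a second, hypothetical implementation for illustration; the LoggingDiagnosticsSetter name, its package, and its log-only behavior are not part of these patches. It targets the final trait shape from patches 6 and 7 (setDiagnostics(Throwable, SparkConf), supports(String)), and because the trait is private[spark], an implementation must live under the org.apache.spark package.

    // Hypothetical sketch, not part of this patch series: a SparkDiagnosticsSetter
    // that only logs the exit exception. To be discovered it would also need its
    // class name listed in a META-INF/services/org.apache.spark.deploy.SparkDiagnosticsSetter
    // file, mirroring the Kubernetes setter's registration above.
    package org.apache.spark.deploy.diagnostics

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkDiagnosticsSetter
    import org.apache.spark.internal.Logging

    private[spark] class LoggingDiagnosticsSetter extends SparkDiagnosticsSetter with Logging {

      // Invoked by SparkSubmit just before the exit exception is rethrown.
      // SparkSubmit swallows failures raised here, so errors in this hook
      // cannot mask the application's own failure.
      override def setDiagnostics(throwable: Throwable, conf: SparkConf): Unit = {
        logError("Application failed with exit exception", throwable)
      }

      // Claim only master URLs this sink understands: if more than one registered
      // setter returns true for the same URL, getSparkDiagnosticsSetters throws a
      // SparkException, so keep the predicate narrow.
      override def supports(clusterManagerUrl: String): Boolean =
        clusterManagerUrl.startsWith("local") // illustrative predicate
    }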
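Once spark.kubernetes.driver.annotateExitException=true is set, a failed driver pod carries the stringified stack trace (truncated to 64 KiB) in its spark.exit-exception annotation. The sketch below shows how a supervising tool might read it back; it is illustrative and not part of the patches: the "default" namespace and "spark-driver" pod name are placeholders, and it uses the same fabric8 client library the setter itself depends on.

    // Hypothetical reader-side sketch: fetch the spark.exit-exception annotation
    // that SparkKubernetesDiagnosticsSetter wrote onto the failed driver pod.
    import io.fabric8.kubernetes.client.KubernetesClientBuilder

    object ReadExitException {
      def main(args: Array[String]): Unit = {
        val client = new KubernetesClientBuilder().build() // uses default kubeconfig
        try {
          val pod = client.pods()
            .inNamespace("default")   // placeholder namespace
            .withName("spark-driver") // placeholder driver pod name
            .get()
          // Annotation key matches Constants.EXIT_EXCEPTION_ANNOTATION above.
          val diagnostics = Option(pod)
            .flatMap(p => Option(p.getMetadata.getAnnotations))
            .flatMap(m => Option(m.get("spark.exit-exception")))
          diagnostics match {
            case Some(trace) => println(trace)
            case None => println("No exit-exception annotation found.")
          }
        } finally {
          client.close()
        }
      }
    }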