diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 4fcc3af1d5e2..07d764966f9e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -1035,6 +1035,10 @@ private[spark] class SparkSubmit extends Logging {
             exitCode = e.exitCode
           case _ =>
         }
+        // Store the diagnostics externally if enabled, but still rethrow so the application fails.
+        if (sparkConf.getBoolean("spark.kubernetes.driver.annotateExitException", false)) {
+          annotateExitException(args, sparkConf, cause)
+        }
         throw cause
     } finally {
       if (args.master.startsWith("k8s") && !isShell(args.primaryResource) &&
@@ -1058,6 +1062,24 @@ private[spark] class SparkSubmit extends Logging {
 
   /** Throw a SparkException with the given error message. */
   private def error(msg: String): Unit = throw new SparkException(msg)
+
+  /**
+   * Store the exit exception using the SparkDiagnosticsSetter.
+   */
+  private def annotateExitException(
+      args: SparkSubmitArguments,
+      sparkConf: SparkConf,
+      throwable: Throwable): Unit = {
+    // Swallow exceptions when storing diagnostics; this shouldn't fail the application.
+    try {
+      if (!isShell(args.primaryResource) && !isSqlShell(args.mainClass)
+        && !isThriftServer(args.mainClass) && !isConnectServer(args.mainClass)) {
+        SparkSubmitUtils.getSparkDiagnosticsSetters(args.master)
+          .foreach(_.setDiagnostics(throwable, sparkConf))
+      }
+    } catch {
+      case e: Throwable => logDebug(s"Failed to set diagnostics: $e")
+    }
+  }
 }
@@ -1233,6 +1255,23 @@ private[spark] object SparkSubmitUtils {
       case _ => throw new SparkException(s"Spark config without '=': $pair")
     }
   }
+
+  private[deploy] def getSparkDiagnosticsSetters(
+      master: String): Option[SparkDiagnosticsSetter] = {
+    val loader = Utils.getContextOrSparkClassLoader
+    val serviceLoaders =
+      ServiceLoader.load(classOf[SparkDiagnosticsSetter], loader)
+        .asScala
+        .filter(_.supports(master))
+
+    serviceLoaders.size match {
+      case x if x > 1 =>
+        throw new SparkException(s"Multiple ($x) external SparkDiagnosticsSetters registered.")
+      case 1 =>
+        serviceLoaders.headOption
+      case _ => None
+    }
+  }
 }
 
 /**
@@ -1255,3 +1294,20 @@ private[spark] trait SparkSubmitOperation {
 
   def supports(master: String): Boolean
 }
+
+/**
+ * Provides a hook to set the application failure details in an external system.
+ */
+private[spark] trait SparkDiagnosticsSetter {
+
+  /**
+   * Set the failure details.
+   */
+  def setDiagnostics(throwable: Throwable, conf: SparkConf): Unit
+
+  /**
+   * Whether this implementation of the SparkDiagnosticsSetter supports setting the exit
+   * exception for applications submitted to the given cluster manager.
+   */
+  def supports(clusterManagerUrl: String): Boolean
+}
diff --git a/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkDiagnosticsSetter b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkDiagnosticsSetter
new file mode 100644
index 000000000000..ccd0fc0b64b8
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/resources/META-INF/services/org.apache.spark.deploy.SparkDiagnosticsSetter
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.spark.deploy.k8s.SparkKubernetesDiagnosticsSetter
\ No newline at end of file
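The ServiceLoader indirection above is the whole extension surface: an implementation of SparkDiagnosticsSetter plus a META-INF/services entry, exactly as the Kubernetes module registers below. A minimal sketch of what another implementation could look like; the package, class name, and behavior are illustrative only and not part of this patch (note the trait is private[spark], so implementations live inside Spark modules):

package org.apache.spark.deploy.example

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkDiagnosticsSetter

// Hypothetical setter for standalone masters; it would be discovered by
// SparkSubmitUtils.getSparkDiagnosticsSetters once listed in a
// META-INF/services/org.apache.spark.deploy.SparkDiagnosticsSetter file.
class ExampleDiagnosticsSetter extends SparkDiagnosticsSetter {

  // Failures here are swallowed by SparkSubmit.annotateExitException, so this
  // hook must not be relied on to always succeed.
  override def setDiagnostics(throwable: Throwable, conf: SparkConf): Unit = {
    System.err.println(s"Application failed: ${throwable.getMessage}")
  }

  // Claim only master URLs this implementation understands, so that at most
  // one registered setter matches a given --master value.
  override def supports(clusterManagerUrl: String): Boolean =
    clusterManagerUrl.startsWith("spark://")
}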
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
index 8be78240e34f..3ec4bbfd6763 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Config.scala
@@ -784,6 +784,14 @@ private[spark] object Config extends Logging {
     .checkValue(value => value > 0, "Gracefully shutdown period must be a positive time value")
     .createWithDefaultString("20s")
 
+  val KUBERNETES_ANNOTATE_EXIT_EXCEPTION =
+    ConfigBuilder("spark.kubernetes.driver.annotateExitException")
+      .doc("If set to true, Spark will store the exit exception of failed applications in" +
+        s" the Kubernetes API server using the $EXIT_EXCEPTION_ANNOTATION annotation.")
+      .version("4.1.0")
+      .booleanConf
+      .createWithDefault(false)
+
   val KUBERNETES_DRIVER_LABEL_PREFIX = "spark.kubernetes.driver.label."
   val KUBERNETES_DRIVER_ANNOTATION_PREFIX = "spark.kubernetes.driver.annotation."
   val KUBERNETES_DRIVER_SERVICE_LABEL_PREFIX = "spark.kubernetes.driver.service.label."
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
index cd2a90898efe..218ddbac70dd 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/Constants.scala
@@ -107,6 +107,7 @@ object Constants {
   val DEFAULT_EXECUTOR_CONTAINER_NAME = "spark-kubernetes-executor"
   val NON_JVM_MEMORY_OVERHEAD_FACTOR = 0.4d
   val CONNECT_GRPC_BINDING_PORT = "spark.connect.grpc.binding.port"
+  val EXIT_EXCEPTION_ANNOTATION = "spark.exit-exception"
 
   // Hadoop Configuration
   val HADOOP_CONF_VOLUME = "hadoop-properties"
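With the config and annotation constant in place, opting in is a single switch. A hedged sketch of a driver-side SparkConf; the master URL and namespace are placeholders, and only the annotateExitException key comes from this patch:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setMaster("k8s://https://example-apiserver:6443")
  .set("spark.kubernetes.namespace", "spark-apps")
  // Opt in to storing the driver's exit exception as a pod annotation.
  .set("spark.kubernetes.driver.annotateExitException", "true")

Equivalently, pass --conf spark.kubernetes.driver.annotateExitException=true to spark-submit.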
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
new file mode 100644
index 000000000000..9a1e79594e48
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetter.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import io.fabric8.kubernetes.api.model.{Pod, PodBuilder}
+import io.fabric8.kubernetes.client.KubernetesClient
+import org.apache.hadoop.util.StringUtils
+
+import org.apache.spark.SparkConf
+import org.apache.spark.deploy.SparkDiagnosticsSetter
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants.EXIT_EXCEPTION_ANNOTATION
+import org.apache.spark.deploy.k8s.SparkKubernetesClientFactory.ClientType
+import org.apache.spark.internal.Logging
+import org.apache.spark.util.{SparkStringUtils, Utils}
+
+/**
+ * We use this trait and its implementation to allow for mocking the static
+ * client creation in tests.
+ */
+private[spark] trait KubernetesClientProvider {
+  def create(conf: SparkConf): KubernetesClient
+}
+
+private[spark] class DefaultKubernetesClientProvider extends KubernetesClientProvider {
+  override def create(conf: SparkConf): KubernetesClient = {
+    SparkKubernetesClientFactory.createKubernetesClient(
+      conf.get(KUBERNETES_DRIVER_MASTER_URL),
+      Option(conf.get(KUBERNETES_NAMESPACE)),
+      KUBERNETES_AUTH_DRIVER_MOUNTED_CONF_PREFIX,
+      ClientType.Driver,
+      conf,
+      None)
+  }
+}
+
+private[spark] class SparkKubernetesDiagnosticsSetter(clientProvider: KubernetesClientProvider)
+  extends SparkDiagnosticsSetter with Logging {
+
+  private val KUBERNETES_EXIT_EXCEPTION_MESSAGE_LIMIT_BYTES = 64 * 1024 // 64 KiB
+
+  def this() = {
+    this(new DefaultKubernetesClientProvider)
+  }
+
+  override def setDiagnostics(throwable: Throwable, conf: SparkConf): Unit = {
+    val diagnostics = SparkStringUtils.abbreviate(StringUtils.stringifyException(throwable),
+      KUBERNETES_EXIT_EXCEPTION_MESSAGE_LIMIT_BYTES)
+    Utils.tryWithResource(clientProvider.create(conf)) { client =>
+      conf.get(KUBERNETES_DRIVER_POD_NAME).foreach { podName =>
+        client.pods()
+          .inNamespace(conf.get(KUBERNETES_NAMESPACE))
+          .withName(podName)
+          .edit((p: Pod) => new PodBuilder(p)
+            .editOrNewMetadata()
+            .addToAnnotations(EXIT_EXCEPTION_ANNOTATION, diagnostics)
+            .endMetadata()
+            .build())
+      }
+    }
+  }
+
+  override def supports(clusterManagerUrl: String): Boolean = {
+    clusterManagerUrl.startsWith("k8s://")
+  }
+}
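For illustration, this is roughly what SparkSubmit ends up invoking through the ServiceLoader hook once the flag is enabled. A hedged sketch; the namespace, pod name, and exception are placeholders:

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.deploy.k8s.SparkKubernetesDiagnosticsSetter

val conf = new SparkConf()
  .set("spark.kubernetes.namespace", "spark-apps")
  .set("spark.kubernetes.driver.pod.name", "my-app-driver")

val setter = new SparkKubernetesDiagnosticsSetter()
if (setter.supports("k8s://https://example-apiserver:6443")) {
  // Patches the driver pod so metadata.annotations["spark.exit-exception"]
  // carries the (truncated) stack trace.
  setter.setDiagnostics(new SparkException("job failed"), conf)
}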
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala
new file mode 100644
index 000000000000..65c150b2a035
--- /dev/null
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/SparkKubernetesDiagnosticsSetterSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s
+
+import java.util.function.UnaryOperator
+
+import io.fabric8.kubernetes.api.model.Pod
+import io.fabric8.kubernetes.client.KubernetesClient
+import io.fabric8.kubernetes.client.dsl.PodResource
+import org.apache.hadoop.util.StringUtils
+import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito._
+import org.scalatest.BeforeAndAfterEach
+import org.scalatestplus.mockito.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.deploy.k8s.Config._
+import org.apache.spark.deploy.k8s.Constants.EXIT_EXCEPTION_ANNOTATION
+import org.apache.spark.deploy.k8s.Fabric8Aliases.PODS
+
+class SparkKubernetesDiagnosticsSetterSuite extends SparkFunSuite
+  with MockitoSugar with BeforeAndAfterEach {
+
+  @Mock
+  private var client: KubernetesClient = _
+  @Mock
+  private var clientProvider: KubernetesClientProvider = _
+  @Mock
+  private var podOperations: PODS = _
+  @Mock
+  private var driverPodOperations: PodResource = _
+
+  private val driverPodName: String = "driver-pod"
+  private val k8sClusterManagerUrl: String = "k8s://dummy"
+  private val namespace: String = "default"
+
+  private var setter: SparkKubernetesDiagnosticsSetter = _
+
+  override def beforeEach(): Unit = {
+    MockitoAnnotations.openMocks(this)
+    when(client.pods()).thenReturn(podOperations)
+    when(podOperations.inNamespace(namespace)).thenReturn(podOperations)
+    when(podOperations.withName(driverPodName)).thenReturn(driverPodOperations)
+    when(clientProvider.create(any(classOf[SparkConf]))).thenReturn(client)
+    setter = new SparkKubernetesDiagnosticsSetter(clientProvider)
+  }
+
+  test("supports() should return true only for k8s:// URLs") {
+    assert(setter.supports(k8sClusterManagerUrl))
+    assert(!setter.supports("yarn"))
+    assert(!setter.supports("spark://localhost"))
+  }
+
+  test("setDiagnostics should patch pod with diagnostics annotation") {
+    val diagnostics = new Throwable("Fake diagnostics stack trace")
+    val conf = new SparkConf()
+      .set(KUBERNETES_DRIVER_MASTER_URL, k8sClusterManagerUrl)
+      .set(KUBERNETES_NAMESPACE, namespace)
+      .set(KUBERNETES_DRIVER_POD_NAME, driverPodName)
+
+    setter.setDiagnostics(diagnostics, conf)
+
+    val captor: ArgumentCaptor[UnaryOperator[Pod]] =
+      ArgumentCaptor.forClass(classOf[UnaryOperator[Pod]])
+    verify(driverPodOperations).edit(captor.capture())
+
+    val fn = captor.getValue
+    val initialPod = SparkPod.initialPod().pod
+    val editedPod = fn.apply(initialPod)
+
+    assert(editedPod.getMetadata.getAnnotations.get(EXIT_EXCEPTION_ANNOTATION)
+      == StringUtils.stringifyException(diagnostics))
+  }
+}
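Once the annotation is written, any external tooling can read it back from the driver pod. A hedged read-back sketch using the fabric8 client (the namespace and pod name are placeholders; this helper is not part of the patch):

import io.fabric8.kubernetes.client.KubernetesClientBuilder

// Fetch the stored diagnostics from the driver pod's annotations.
val client = new KubernetesClientBuilder().build()
try {
  val pod = client.pods()
    .inNamespace("spark-apps")
    .withName("my-app-driver")
    .get()
  Option(pod.getMetadata.getAnnotations)
    .flatMap(a => Option(a.get("spark.exit-exception")))
    .foreach(println)
} finally {
  client.close()
}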