From 89ff6d5a73558220b744ce93fd8d916918b2fb86 Mon Sep 17 00:00:00 2001
From: Cheng Pan
Date: Fri, 8 Aug 2025 17:47:42 +0800
Subject: [PATCH] [SPARK-53198][CORE] Support terminating driver JVM after
 SparkContext is stopped

---
 .../spark/deploy/SparkLivenessPlugin.scala    | 64 +++++++++++++++++++
 .../spark/internal/config/package.scala       | 21 ++++++
 .../org/apache/spark/util/SparkExitCode.scala |  3 +
 3 files changed, 88 insertions(+)
 create mode 100644 core/src/main/scala/org/apache/spark/deploy/SparkLivenessPlugin.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkLivenessPlugin.scala b/core/src/main/scala/org/apache/spark/deploy/SparkLivenessPlugin.scala
new file mode 100644
index 000000000000..2d0f43fe44f3
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkLivenessPlugin.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy
+
+import java.util.{Map => JMap}
+import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.SparkContext
+import org.apache.spark.api.plugin.{DriverPlugin, ExecutorPlugin, PluginContext, SparkPlugin}
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys._
+import org.apache.spark.internal.config._
+import org.apache.spark.util.{SparkExitCode, ThreadUtils}
+
+/**
+ * A built-in plugin that checks the liveness of essential Spark components, e.g., SparkContext.
+ */
+class SparkLivenessPlugin extends SparkPlugin {
+  override def driverPlugin(): DriverPlugin = new SparkLivenessDriverPlugin()
+
+  // No-op
+  override def executorPlugin(): ExecutorPlugin = null
+}
+
+class SparkLivenessDriverPlugin extends DriverPlugin with Logging {
+
+  private val timer: ScheduledExecutorService =
+    ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-liveness")
+
+  override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
+    val checkInterval = sc.conf.get(DRIVER_SPARK_CONTEXT_LIVENESS_CHECK_INTERVAL)
+    val terminateDelay = sc.conf.get(DRIVER_SPARK_CONTEXT_LIVENESS_TERMINATE_DELAY)
+    if (checkInterval == 0) {
+      logWarning("SparkContext liveness check is disabled.")
+    } else {
+      val task: Runnable = () => {
+        if (sc.isStopped) {
+          logWarning(log"SparkContext is stopped, will terminate Driver JVM " +
+            log"after ${MDC(TIME_UNITS, terminateDelay)} seconds.")
+          Thread.sleep(terminateDelay * 1000L)
+          System.exit(SparkExitCode.SPARK_CONTEXT_STOPPED)
+        }
+      }
+      timer.scheduleWithFixedDelay(task, checkInterval, checkInterval, TimeUnit.SECONDS)
+    }
+    Map.empty[String, String].asJava
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index c25a4fd45c58..daa3ed914376 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1209,6 +1209,27 @@ package object config {
     .checkValue(v => v >= 0, "The value should be a non-negative time value.")
     .createWithDefaultString("0min")
 
+  private[spark] val DRIVER_SPARK_CONTEXT_LIVENESS_CHECK_INTERVAL =
+    ConfigBuilder("spark.driver.liveness.sparkContext.checkInterval")
+      .doc("If set to a positive time value, the Spark driver periodically checks whether " +
+        "the SparkContext is alive. Once the SparkContext is detected to be stopped, " +
+        "the driver terminates with exit code 69 after a delay. " +
+        "To use, set `spark.plugins=org.apache.spark.deploy.SparkLivenessPlugin`.")
+      .version("4.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(v => v >= 0, "The value should be a non-negative time value.")
+      .createWithDefaultString("10s")
+
+  private[spark] val DRIVER_SPARK_CONTEXT_LIVENESS_TERMINATE_DELAY =
+    ConfigBuilder("spark.driver.liveness.sparkContext.terminateDelay")
+      .doc("The duration to wait before self-terminating after detecting that the " +
+        "SparkContext is stopped. This config takes effect only when " +
+        s"${DRIVER_SPARK_CONTEXT_LIVENESS_CHECK_INTERVAL.key} is set to a positive value.")
+      .version("4.1.0")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(v => v >= 0, "The value should be a non-negative time value.")
+      .createWithDefaultString("120s")
+
 private[spark] val DRIVER_BIND_ADDRESS = ConfigBuilder("spark.driver.bindAddress")
   .doc("Address where to bind network listen sockets on the driver.")
   .version("2.1.0")
diff --git a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
index 0ffc2afd9635..d5b06833e197 100644
--- a/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
+++ b/core/src/main/scala/org/apache/spark/util/SparkExitCode.scala
@@ -45,6 +45,9 @@ private[spark] object SparkExitCode {
      OutOfMemoryError. */
   val OOM = 52
 
+  /** Exit because the SparkContext is stopped. */
+  val SPARK_CONTEXT_STOPPED = 69
+
   /** Exit due to ClassNotFoundException or NoClassDefFoundError. */
   val CLASS_NOT_FOUND = 101
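
A minimal usage sketch for reviewers, showing how an application would opt in to the plugin. The plugin class and the two configuration keys are the ones added by this patch; the app name, master, and sample job are hypothetical:

    import org.apache.spark.{SparkConf, SparkContext}

    object LivenessDemo {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("liveness-demo") // hypothetical app name
          .setMaster("local[2]")       // hypothetical master for a local run
          .set("spark.plugins", "org.apache.spark.deploy.SparkLivenessPlugin")
          .set("spark.driver.liveness.sparkContext.checkInterval", "10s")  // 0 disables the check
          .set("spark.driver.liveness.sparkContext.terminateDelay", "120s")

        val sc = new SparkContext(conf)
        sc.parallelize(1 to 10).count()
        sc.stop()
        // If the JVM is still running after stop() -- e.g., a leftover non-daemon
        // thread keeps it alive -- the plugin's timer observes sc.isStopped within
        // ~checkInterval seconds, waits terminateDelay seconds, then calls
        // System.exit(69) (SparkExitCode.SPARK_CONTEXT_STOPPED).
      }
    }

With these settings, the driver JVM exits at most roughly checkInterval + terminateDelay seconds after SparkContext.stop(), since System.exit terminates the JVM regardless of any remaining non-daemon threads.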