Skip to content

Commit 75cac1f

Browse files
committed
[SPARK-37497][K8S] Promote ExecutorPods[PollingSnapshot|WatchSnapshot]Source to DeveloperApi
### What changes were proposed in this pull request? This PR aims to promote `ExecutorPodsWatchSnapshotSource` and `ExecutorPodsPollingSnapshotSource` as **stable** `DeveloperApi` in order to maintain it officially in a backward compatible way at Apache Spark 3.3.0. ### Why are the changes needed? - Since SPARK-24248 at Apache Spark 2.4.0, `ExecutorPodsWatchSnapshotSource` and `ExecutorPodsPollingSnapshotSource` have been used to monitor executor pods without any interface changes for over 3 years. - Apache Spark 3.1.1 makes `Kubernetes` module GA and provides an extensible external cluster manager framework. New `ExternalClusterManager` for K8s environment need to depend on this to monitor pods. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual review. Closes #34751 from dongjoon-hyun/SPARK-37497. Authored-by: Dongjoon Hyun <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]> (cherry picked from commit 2b04496) Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent b8b5f94 commit 75cac1f

File tree

2 files changed

+25
-2
lines changed

2 files changed

+25
-2
lines changed

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsPollingSnapshotSource.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,21 @@ import io.fabric8.kubernetes.client.KubernetesClient
2222
import scala.collection.JavaConverters._
2323

2424
import org.apache.spark.SparkConf
25+
import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
2526
import org.apache.spark.deploy.k8s.Config._
2627
import org.apache.spark.deploy.k8s.Constants._
2728
import org.apache.spark.internal.Logging
2829
import org.apache.spark.util.{ThreadUtils, Utils}
2930

30-
private[spark] class ExecutorPodsPollingSnapshotSource(
31+
/**
32+
* :: DeveloperApi ::
33+
*
34+
* A class used for polling K8s executor pods by ExternalClusterManagers.
35+
* @since 3.1.3
36+
*/
37+
@Stable
38+
@DeveloperApi
39+
class ExecutorPodsPollingSnapshotSource(
3140
conf: SparkConf,
3241
kubernetesClient: KubernetesClient,
3342
snapshotsStore: ExecutorPodsSnapshotsStore,
@@ -37,13 +46,15 @@ private[spark] class ExecutorPodsPollingSnapshotSource(
3746

3847
private var pollingFuture: Future[_] = _
3948

49+
@Since("3.1.3")
4050
def start(applicationId: String): Unit = {
4151
require(pollingFuture == null, "Cannot start polling more than once.")
4252
logDebug(s"Starting to check for executor pod state every $pollingInterval ms.")
4353
pollingFuture = pollingExecutor.scheduleWithFixedDelay(
4454
new PollRunnable(applicationId), pollingInterval, pollingInterval, TimeUnit.MILLISECONDS)
4555
}
4656

57+
@Since("3.1.3")
4758
def stop(): Unit = {
4859
if (pollingFuture != null) {
4960
pollingFuture.cancel(true)

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsWatchSnapshotSource.scala

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,27 @@ import io.fabric8.kubernetes.api.model.Pod
2222
import io.fabric8.kubernetes.client.{KubernetesClient, KubernetesClientException, Watcher}
2323
import io.fabric8.kubernetes.client.Watcher.Action
2424

25+
import org.apache.spark.annotation.{DeveloperApi, Since, Stable}
2526
import org.apache.spark.deploy.k8s.Constants._
2627
import org.apache.spark.internal.Logging
2728
import org.apache.spark.util.Utils
2829

29-
private[spark] class ExecutorPodsWatchSnapshotSource(
30+
/**
31+
* :: DeveloperApi ::
32+
*
33+
* A class used for watching K8s executor pods by ExternalClusterManagers.
34+
*
35+
* @since 3.1.3
36+
*/
37+
@Stable
38+
@DeveloperApi
39+
class ExecutorPodsWatchSnapshotSource(
3040
snapshotsStore: ExecutorPodsSnapshotsStore,
3141
kubernetesClient: KubernetesClient) extends Logging {
3242

3343
private var watchConnection: Closeable = _
3444

45+
@Since("3.1.3")
3546
def start(applicationId: String): Unit = {
3647
require(watchConnection == null, "Cannot start the watcher twice.")
3748
logDebug(s"Starting watch for pods with labels $SPARK_APP_ID_LABEL=$applicationId," +
@@ -42,6 +53,7 @@ private[spark] class ExecutorPodsWatchSnapshotSource(
4253
.watch(new ExecutorPodsWatcher())
4354
}
4455

56+
@Since("3.1.3")
4557
def stop(): Unit = {
4658
if (watchConnection != null) {
4759
Utils.tryLogNonFatalError {

0 commit comments

Comments
 (0)