From dfaee2105671d44f5c044cfb2f5c5fa92b7a9a3d Mon Sep 17 00:00:00 2001 From: Shuai Lin Date: Sat, 29 Apr 2017 16:26:26 +0800 Subject: [PATCH 1/6] Exit properly when the k8s cluster is not available. --- .../spark/deploy/kubernetes/submit/v1/Client.scala | 1 + .../kubernetes/submit/v1/LoggingPodStatusWatcher.scala | 9 ++++++--- 2 files changed, 7 insertions(+), 3 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala index e1cfac8feba37..7a3cb81972a77 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala @@ -152,6 +152,7 @@ private[spark] class Client( .pods() .withName(kubernetesAppId) .watch(loggingWatch)) { _ => + loggingWatch.start val resourceCleanShutdownHook = ShutdownHookManager.addShutdownHook(() => kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient)) val cleanupServiceManagerHook = ShutdownHookManager.addShutdownHook( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala index 7be334194d9d7..58cbc51187a91 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala @@ -44,15 +44,18 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL private val logRunnable: Runnable = new Runnable { override def run() = logShortStatus() } - if (interval > 0) { - scheduler.scheduleWithFixedDelay(logRunnable, 0, interval, TimeUnit.MILLISECONDS) - } private var pod: Option[Pod] = Option.empty private def phase: String = pod.map(_.getStatus().getPhase()).getOrElse("unknown") private def status: String = pod.map(_.getStatus().getContainerStatuses().toString()) .getOrElse("unknown") + def start(): Unit = { + if (interval > 0) { + scheduler.scheduleWithFixedDelay(logRunnable, 0, interval, TimeUnit.MILLISECONDS) + } + } + override def eventReceived(action: Action, pod: Pod): Unit = { this.pod = Option(pod) action match { From 528c1c50be4cfe985a5421cca1ea85111c86617c Mon Sep 17 00:00:00 2001 From: Shuai Lin Date: Sat, 29 Apr 2017 16:39:40 +0800 Subject: [PATCH 2/6] add jetty to k8s module dependency so we can use only rebuild the k8s module. --- resource-managers/kubernetes/core/pom.xml | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index 70c252009c9b4..a927ef26a4d29 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -108,6 +108,14 @@ com.google.guava guava + + org.eclipse.jetty + jetty-servlet + + + org.eclipse.jetty + jetty-http + @@ -133,4 +141,3 @@ - From 0631fed134474e4b79dbbc5e9d37c9898802864a Mon Sep 17 00:00:00 2001 From: Shuai Lin Date: Tue, 16 May 2017 07:44:01 +0800 Subject: [PATCH 3/6] CR --- .../org/apache/spark/deploy/kubernetes/submit/v1/Client.scala | 2 +- .../deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala | 3 ++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala index 7a3cb81972a77..dc66c870d4960 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/Client.scala @@ -152,7 +152,7 @@ private[spark] class Client( .pods() .withName(kubernetesAppId) .watch(loggingWatch)) { _ => - loggingWatch.start + loggingWatch.start() val resourceCleanShutdownHook = ShutdownHookManager.addShutdownHook(() => kubernetesResourceCleaner.deleteAllRegisteredResourcesFromKubernetes(kubernetesClient)) val cleanupServiceManagerHook = ShutdownHookManager.addShutdownHook( diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala index 58cbc51187a91..f655a8511805f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala @@ -24,6 +24,7 @@ import io.fabric8.kubernetes.client.Watcher.Action import scala.collection.JavaConverters._ import org.apache.spark.internal.Logging +import org.apache.spark.util.ThreadUtils /** * A monitor for the running Kubernetes pod of a Spark application. Status logging occurs on @@ -40,7 +41,7 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL extends Watcher[Pod] with Logging { // start timer for periodic logging - private val scheduler = Executors.newScheduledThreadPool(1) + private val scheduler = ThreadUtils.newDaemonSingleThreadExecutor("logging-pod-status-watcher") private val logRunnable: Runnable = new Runnable { override def run() = logShortStatus() } From 2f933a717d903ac71e2f23da3f18abf2c32ebc25 Mon Sep 17 00:00:00 2001 From: Shuai Lin Date: Wed, 17 May 2017 09:36:55 +0800 Subject: [PATCH 4/6] Fixed single thread scheduler. --- .../deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala index f655a8511805f..fe01a7c9aa5d9 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala @@ -41,7 +41,7 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL extends Watcher[Pod] with Logging { // start timer for periodic logging - private val scheduler = ThreadUtils.newDaemonSingleThreadExecutor("logging-pod-status-watcher") + private val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") private val logRunnable: Runnable = new Runnable { override def run() = logShortStatus() } @@ -53,7 +53,7 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL def start(): Unit = { if (interval > 0) { - scheduler.scheduleWithFixedDelay(logRunnable, 0, interval, TimeUnit.MILLISECONDS) + scheduler.scheduleAtFixedRate(logRunnable, 0, interval, TimeUnit.MILLISECONDS) } } From 1e0c739110aeaaba17aee84c9e1141d7bd4da5e7 Mon Sep 17 00:00:00 2001 From: Shuai Lin Date: Wed, 17 May 2017 09:45:07 +0800 Subject: [PATCH 5/6] Fixed scalastyle check. --- .../deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala index fe01a7c9aa5d9..537bcccaa1458 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/v1/LoggingPodStatusWatcher.scala @@ -41,7 +41,8 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL extends Watcher[Pod] with Logging { // start timer for periodic logging - private val scheduler = ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") + private val scheduler = + ThreadUtils.newDaemonSingleThreadScheduledExecutor("logging-pod-status-watcher") private val logRunnable: Runnable = new Runnable { override def run() = logShortStatus() } From 0d915b5e25ca5cfc404aed5d8bd3bf9a9c885547 Mon Sep 17 00:00:00 2001 From: Shuai Lin Date: Thu, 18 May 2017 07:14:46 +0800 Subject: [PATCH 6/6] CR --- resource-managers/kubernetes/core/pom.xml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/resource-managers/kubernetes/core/pom.xml b/resource-managers/kubernetes/core/pom.xml index a927ef26a4d29..aa429f73a5627 100644 --- a/resource-managers/kubernetes/core/pom.xml +++ b/resource-managers/kubernetes/core/pom.xml @@ -108,14 +108,6 @@ com.google.guava guava - - org.eclipse.jetty - jetty-servlet - - - org.eclipse.jetty - jetty-http -