From 0bce240ae141927979fedc51a112366b76fa6590 Mon Sep 17 00:00:00 2001 From: foxish Date: Fri, 24 Feb 2017 03:42:09 -0800 Subject: [PATCH 1/3] pod-watch progress around watch events --- .../kubernetes/LoggingPodStatusWatcher.scala | 44 +++++++++++++------ 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala index b7a29fedcbd2d..71d54eb666a2f 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala @@ -50,27 +50,30 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL } private var pod: Option[Pod] = Option.empty - private var prevPhase: String = null private def phase: String = pod.map(_.getStatus().getPhase()).getOrElse("unknown") + private def status: String = pod.map(_.getStatus().getContainerStatuses().toString()) + .getOrElse("unknown") override def eventReceived(action: Action, pod: Pod): Unit = { this.pod = Option(pod) - - logShortStatus() - if (prevPhase != phase) { - logLongStatus() - } - prevPhase = phase - - if (phase == "Succeeded" || phase == "Failed") { - podCompletedFuture.countDown() - scheduler.shutdown() + action match { + case Action.DELETED => + closeWatch() + + case Action.ERROR => + closeWatch() + + case _ => + logLongStatus() + if (hasCompleted()) { + closeWatch() + } } } override def onClose(e: KubernetesClientException): Unit = { - scheduler.shutdown() logDebug(s"Stopped watching application $appId with last-observed phase $phase") + closeWatch() } private def logShortStatus() = { @@ -78,7 +81,19 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL } private def logLongStatus() = { - logInfo("Phase changed, new state: " + pod.map(formatPodState(_)).getOrElse("unknown")) + logInfo("State changed, new state: " + pod.map(formatPodState(_)).getOrElse("unknown")) + } + + private def hasCompleted(): Boolean = { + if (phase == "Succeeded" || phase == "Failed") { + return true + } + false + } + + private def closeWatch(): Unit = { + podCompletedFuture.countDown() + scheduler.shutdown() } private def formatPodState(pod: Pod): String = { @@ -103,7 +118,8 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL .asScala .map(_.getImage) .mkString(", ")), - ("phase", pod.getStatus.getPhase()) + ("phase", pod.getStatus.getPhase()), + ("status", pod.getStatus.getContainerStatuses().toString) ) // Use more loggable format if value is null or empty From c12e185d1236533c9ff657deffe2e2a56baa849b Mon Sep 17 00:00:00 2001 From: foxish Date: Fri, 24 Feb 2017 09:56:42 -0800 Subject: [PATCH 2/3] Simplify return --- .../spark/deploy/kubernetes/LoggingPodStatusWatcher.scala | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala index 71d54eb666a2f..8cd4c5da3a928 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala @@ -85,10 +85,7 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL } private def hasCompleted(): Boolean = { - if (phase == "Succeeded" || phase == "Failed") { - return true - } - false + return (phase == "Succeeded" || phase == "Failed") } private def closeWatch(): Unit = { From 0592cf3592eb9b7642ff763d967d09f223e1ada5 Mon Sep 17 00:00:00 2001 From: foxish Date: Fri, 24 Feb 2017 10:52:34 -0800 Subject: [PATCH 3/3] comments --- .../spark/deploy/kubernetes/LoggingPodStatusWatcher.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala index 8cd4c5da3a928..17c3db8331ac4 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/LoggingPodStatusWatcher.scala @@ -72,7 +72,7 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL } override def onClose(e: KubernetesClientException): Unit = { - logDebug(s"Stopped watching application $appId with last-observed phase $phase") + logDebug(s"Stopping watching application $appId with last-observed phase $phase") closeWatch() } @@ -85,7 +85,7 @@ private[kubernetes] class LoggingPodStatusWatcher(podCompletedFuture: CountDownL } private def hasCompleted(): Boolean = { - return (phase == "Succeeded" || phase == "Failed") + phase == "Succeeded" || phase == "Failed" } private def closeWatch(): Unit = {