Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
*/
package org.apache.spark.deploy.k8s

import org.apache.spark.SparkConf
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{ContainerStateRunning, ContainerStateTerminated, ContainerStateWaiting, ContainerStatus, Pod, Time}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.util.Utils

private[spark] object KubernetesUtils {
Expand Down Expand Up @@ -60,4 +64,81 @@ private[spark] object KubernetesUtils {
}

def parseMasterUrl(url: String): String = url.substring("k8s://".length)

def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : String = {
// Use more loggable format if value is null or empty
val indentStr = "\t" * indent
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we prefer space-based indentation? Curious as to whether others have an opinion about this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just preserved the original codes choice here, I would happily change to spaces if preferred

pairs.map {
case (k, v) => s"\n$indentStr $k: ${Option(v).filter(_.nonEmpty).getOrElse("N/A")}"
}.mkString("")
}

/**
* Given a pod, output a human readable representation of its state
*
* @param pod Pod
* @return Human readable pod state
*/
def formatPodState(pod: Pod): String = {
val details = Seq[(String, String)](
// pod metadata
("pod name", pod.getMetadata.getName),
("namespace", pod.getMetadata.getNamespace),
("labels", pod.getMetadata.getLabels.asScala.mkString(", ")),
("pod uid", pod.getMetadata.getUid),
("creation time", formatTime(pod.getMetadata.getCreationTimestamp)),

// spec details
("service account name", pod.getSpec.getServiceAccountName),
("volumes", pod.getSpec.getVolumes.asScala.map(_.getName).mkString(", ")),
("node name", pod.getSpec.getNodeName),

// status
("start time", formatTime(pod.getStatus.getStartTime)),
("phase", pod.getStatus.getPhase),
("container status", containersDescription(pod, 2))
)

formatPairsBundle(details)
}

def containersDescription(p: Pod, indent: Int = 1): String = {
p.getStatus.getContainerStatuses.asScala.map { status =>
Seq(
("container name", status.getName),
("container image", status.getImage)) ++
containerStatusDescription(status)
}.map(p => formatPairsBundle(p, indent)).mkString("\n\n")
}

def containerStatusDescription(containerStatus: ContainerStatus)
: Seq[(String, String)] = {
val state = containerStatus.getState
Option(state.getRunning)
.orElse(Option(state.getTerminated))
.orElse(Option(state.getWaiting))
.map {
case running: ContainerStateRunning =>
Seq(
("container state", "running"),
("container started at", formatTime(running.getStartedAt)))
case waiting: ContainerStateWaiting =>
Seq(
("container state", "waiting"),
("pending reason", waiting.getReason))
case terminated: ContainerStateTerminated =>
Seq(
("container state", "terminated"),
("container started at", formatTime(terminated.getStartedAt)),
("container finished at", formatTime(terminated.getFinishedAt)),
("exit code", terminated.getExitCode.toString),
("termination reason", terminated.getReason))
case unknown =>
throw new SparkException(s"Unexpected container status type ${unknown.getClass}.")
}.getOrElse(Seq(("container state", "N/A")))
}

def formatTime(time: Time): String = {
if (time != null) time.getTime else "N/A"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import io.fabric8.kubernetes.client.{KubernetesClientException, Watcher}
import io.fabric8.kubernetes.client.Watcher.Action

import org.apache.spark.SparkException
import org.apache.spark.deploy.k8s.KubernetesUtils._
import org.apache.spark.internal.Logging
import org.apache.spark.util.ThreadUtils

Expand Down Expand Up @@ -99,82 +100,10 @@ private[k8s] class LoggingPodStatusWatcherImpl(
scheduler.shutdown()
}

private def formatPodState(pod: Pod): String = {
val details = Seq[(String, String)](
// pod metadata
("pod name", pod.getMetadata.getName),
("namespace", pod.getMetadata.getNamespace),
("labels", pod.getMetadata.getLabels.asScala.mkString(", ")),
("pod uid", pod.getMetadata.getUid),
("creation time", formatTime(pod.getMetadata.getCreationTimestamp)),

// spec details
("service account name", pod.getSpec.getServiceAccountName),
("volumes", pod.getSpec.getVolumes.asScala.map(_.getName).mkString(", ")),
("node name", pod.getSpec.getNodeName),

// status
("start time", formatTime(pod.getStatus.getStartTime)),
("container images",
pod.getStatus.getContainerStatuses
.asScala
.map(_.getImage)
.mkString(", ")),
("phase", pod.getStatus.getPhase),
("status", pod.getStatus.getContainerStatuses.toString)
)

formatPairsBundle(details)
}

private def formatPairsBundle(pairs: Seq[(String, String)]) = {
// Use more loggable format if value is null or empty
pairs.map {
case (k, v) => s"\n\t $k: ${Option(v).filter(_.nonEmpty).getOrElse("N/A")}"
}.mkString("")
}

override def awaitCompletion(): Unit = {
podCompletedFuture.await()
logInfo(pod.map { p =>
s"Container final statuses:\n\n${containersDescription(p)}"
}.getOrElse("No containers were found in the driver pod."))
}

private def containersDescription(p: Pod): String = {
p.getStatus.getContainerStatuses.asScala.map { status =>
Seq(
("Container name", status.getName),
("Container image", status.getImage)) ++
containerStatusDescription(status)
}.map(formatPairsBundle).mkString("\n\n")
}

private def containerStatusDescription(
containerStatus: ContainerStatus): Seq[(String, String)] = {
val state = containerStatus.getState
Option(state.getRunning)
.orElse(Option(state.getTerminated))
.orElse(Option(state.getWaiting))
.map {
case running: ContainerStateRunning =>
Seq(
("Container state", "Running"),
("Container started at", formatTime(running.getStartedAt)))
case waiting: ContainerStateWaiting =>
Seq(
("Container state", "Waiting"),
("Pending reason", waiting.getReason))
case terminated: ContainerStateTerminated =>
Seq(
("Container state", "Terminated"),
("Exit code", terminated.getExitCode.toString))
case unknown =>
throw new SparkException(s"Unexpected container status type ${unknown.getClass}.")
}.getOrElse(Seq(("Container state", "N/A")))
}

private def formatTime(time: Time): String = {
if (time != null) time.getTime else "N/A"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.Config._
import org.apache.spark.deploy.k8s.KubernetesUtils._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.ExecutorExited
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -151,13 +152,15 @@ private[spark] class ExecutorPodsLifecycleManager(

private def exitReasonMessage(podState: FinalPodState, execId: Long, exitCode: Int) = {
val pod = podState.pod
val reason = Option(pod.getStatus.getReason)
val message = Option(pod.getStatus.getMessage)
s"""
|The executor with id $execId exited with exit code $exitCode.
|The API gave the following brief reason: ${pod.getStatus.getReason}
|The API gave the following message: ${pod.getStatus.getMessage}
|The API gave the following brief reason: ${reason.getOrElse("N/A")}
|The API gave the following message: ${message.getOrElse("N/A")}
|The API gave the following container statuses:
|
|${pod.getStatus.getContainerStatuses.asScala.map(_.toString).mkString("\n===\n")}
|${containersDescription(pod)}
""".stripMargin
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import scala.collection.mutable

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.deploy.k8s.KubernetesUtils._
import org.apache.spark.scheduler.ExecutorExited
import org.apache.spark.scheduler.cluster.k8s.ExecutorLifecycleTestUtils._

Expand Down Expand Up @@ -104,13 +105,15 @@ class ExecutorPodsLifecycleManagerSuite extends SparkFunSuite with BeforeAndAfte
}

private def exitReasonMessage(failedExecutorId: Int, failedPod: Pod): String = {
val reason = Option(failedPod.getStatus.getReason)
val message = Option(failedPod.getStatus.getMessage)
s"""
|The executor with id $failedExecutorId exited with exit code 1.
|The API gave the following brief reason: ${failedPod.getStatus.getReason}
|The API gave the following message: ${failedPod.getStatus.getMessage}
|The API gave the following brief reason: ${reason.getOrElse("N/A")}
|The API gave the following message: ${message.getOrElse("N/A")}
|The API gave the following container statuses:
|
|${failedPod.getStatus.getContainerStatuses.asScala.map(_.toString).mkString("\n===\n")}
|${containersDescription(failedPod)}
""".stripMargin
}

Expand Down