@@ -29,6 +29,7 @@ import org.apache.commons.codec.binary.Hex
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.annotation.{DeveloperApi, Since, Unstable}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
import org.apache.spark.internal.Logging
@@ -38,7 +39,14 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
import org.apache.spark.util.DependencyUtils.downloadFile
import org.apache.spark.util.Utils.getHadoopFileSystem

private[spark] object KubernetesUtils extends Logging {
/**
* :: DeveloperApi ::
*
* A utility class used for K8s operations internally and for implementing ExternalClusterManagers.
*/
@Unstable
@DeveloperApi
object KubernetesUtils extends Logging {

private val systemClock = new SystemClock()
private lazy val RNG = new SecureRandom()
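
The new doc comment above is the point of the change: once the object is public `@DeveloperApi`/`@Unstable`, third-party `ExternalClusterManager` implementations can call these helpers directly. A minimal sketch of such a caller, assuming the package path from the surrounding imports; the `MyK8sBackendConfig` object and the label prefix are illustrative, not part of this PR:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.KubernetesUtils

object MyK8sBackendConfig {
  // Collect every "spark.kubernetes.driver.label.<key>=<value>" entry into a plain Map.
  def driverPodLabels(conf: SparkConf): Map[String, String] =
    KubernetesUtils.parsePrefixedKeyValuePairs(conf, "spark.kubernetes.driver.label.")

  // Short time-plus-random suffix for naming pods and config maps created by the backend.
  def resourceNameSuffix(): String = KubernetesUtils.uniqueID()
}
```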
@@ -51,12 +59,14 @@ private[spark] object KubernetesUtils extends Logging {
* @param prefix the given property name prefix
* @return a Map storing the configuration property keys and values
*/
@Since("2.3.0")
def parsePrefixedKeyValuePairs(
sparkConf: SparkConf,
prefix: String): Map[String, String] = {
sparkConf.getAllWithPrefix(prefix).toMap
}

@Since("3.0.0")
def requireBothOrNeitherDefined(
opt1: Option[_],
opt2: Option[_],
@@ -66,6 +76,7 @@ private[spark] object KubernetesUtils extends Logging {
requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
}

@Since("3.0.0")
def requireSecondIfFirstIsDefined(
opt1: Option[_],
opt2: Option[_],
@@ -75,11 +86,13 @@ private[spark] object KubernetesUtils extends Logging {
}
}

@Since("2.3.0")
def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
opt2.foreach { _ => require(opt1.isEmpty, errMessage) }
}

@Since("3.2.0")
def loadPodFromTemplate(
@dongjoon-hyun (Member, Author) commented on Apr 30, 2021:

As I mentioned in the PR title, the signature was changed recently (2fa792a), @attilapiros.
I know that because I changed it. :)

@dongjoon-hyun (Member, Author) commented:

Please see the following PR description.

> In this PR, Since annotations denote the last function signature changes because these are going to become public at Apache Spark 3.2.0.

kubernetesClient: KubernetesClient,
templateFileName: String,
@@ -99,6 +112,7 @@ private[spark] object KubernetesUtils extends Logging {
}
}

@Since("3.0.0")
def selectSparkContainer(pod: Pod, containerName: Option[String]): SparkPod = {
def selectNamedContainer(
containers: List[Container], name: String): Option[(Container, List[Container])] =
@@ -125,8 +139,10 @@ private[spark] object KubernetesUtils extends Logging {
}.getOrElse(SparkPod(pod, new ContainerBuilder().build()))
}

@Since("2.4.0")
def parseMasterUrl(url: String): String = url.substring("k8s://".length)

@Since("3.0.0")
def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : String = {
// Use more loggable format if value is null or empty
val indentStr = "\t" * indent
@@ -141,6 +157,7 @@ private[spark] object KubernetesUtils extends Logging {
* @param pod Pod
* @return Human readable pod state
*/
@Since("3.0.0")
def formatPodState(pod: Pod): String = {
val details = Seq[(String, String)](
// pod metadata
@@ -164,6 +181,7 @@ private[spark] object KubernetesUtils extends Logging {
formatPairsBundle(details)
}

@Since("3.0.0")
def containersDescription(p: Pod, indent: Int = 1): String = {
p.getStatus.getContainerStatuses.asScala.map { status =>
Seq(
@@ -173,6 +191,7 @@ private[spark] object KubernetesUtils extends Logging {
}.map(p => formatPairsBundle(p, indent)).mkString("\n\n")
}

@Since("3.0.0")
def containerStatusDescription(containerStatus: ContainerStatus)
: Seq[(String, String)] = {
val state = containerStatus.getState
@@ -200,6 +219,7 @@ private[spark] object KubernetesUtils extends Logging {
}.getOrElse(Seq(("container state", "N/A")))
}

@Since("3.0.0")
def formatTime(time: String): String = {
if (time != null) time else "N/A"
}
@@ -212,6 +232,7 @@ private[spark] object KubernetesUtils extends Logging {
* This avoids using a UUID for uniqueness (too long), and relying solely on the current time
* (not unique enough).
*/
@Since("3.0.0")
def uniqueID(clock: Clock = systemClock): String = {
val random = new Array[Byte](3)
synchronized {
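
The doc comment above summarizes the trade-off behind `uniqueID` (a full UUID is too long, the clock alone is not unique enough). The body is truncated in this hunk, so here is a self-contained sketch of the same technique, not the exact Spark implementation:

```scala
import java.security.SecureRandom

import org.apache.commons.codec.binary.Hex

object UniqueIdSketch {
  private val rng = new SecureRandom()

  // Three random bytes guard against collisions within the same millisecond, while the
  // hex-encoded timestamp keeps the ID short and roughly time-ordered.
  def uniqueID(nowMillis: Long = System.currentTimeMillis()): String = {
    val random = new Array[Byte](3)
    rng.synchronized { rng.nextBytes(random) }
    Hex.encodeHexString(random) + java.lang.Long.toHexString(nowMillis)
  }
}
```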
@@ -228,6 +249,7 @@ private[spark] object KubernetesUtils extends Logging {
* It assumes we can use the Kubernetes device plugin format: vendor-domain/resource.
* It returns a set with a tuple of vendor-domain/resource and Quantity for each resource.
*/
@Since("3.0.0")
def buildResourcesQuantities(
componentName: String,
sparkConf: SparkConf): Map[String, Quantity] = {
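
To make the device-plugin format mentioned above concrete, an illustrative call could look like the following; the GPU resource name, the vendor domain, and the "spark.executor" component prefix are assumptions based on Spark's standard resource properties, not taken from this diff:

```scala
import io.fabric8.kubernetes.api.model.Quantity

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.KubernetesUtils

val conf = new SparkConf()
  .set("spark.executor.resource.gpu.vendor", "nvidia.com")
  .set("spark.executor.resource.gpu.amount", "2")

// Expected shape of the result: Map("nvidia.com/gpu" -> new Quantity("2")), ready to be
// attached to the executor container's resource limits.
val gpuQuantities: Map[String, Quantity] =
  KubernetesUtils.buildResourcesQuantities("spark.executor", conf)
```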
@@ -247,6 +269,7 @@ private[spark] object KubernetesUtils extends Logging {
/**
* Upload files and modify their uris
*/
@Since("3.0.0")
def uploadAndTransformFileUris(fileUris: Iterable[String], conf: Option[SparkConf] = None)
: Iterable[String] = {
fileUris.map { uri =>
Expand All @@ -261,11 +284,13 @@ private[spark] object KubernetesUtils extends Logging {
}
}

@Since("3.0.0")
def isLocalAndResolvable(resource: String): Boolean = {
resource != SparkLauncher.NO_RESOURCE &&
isLocalDependency(Utils.resolveURI(resource))
}

@Since("3.1.1")
def renameMainAppResource(
resource: String,
conf: Option[SparkConf] = None,
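
As a rough usage sketch of the upload helpers in this hunk (`isLocalAndResolvable` and `uploadAndTransformFileUris`), assuming the standard `spark.kubernetes.file.upload.path` property; the bucket name and file paths are made up:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.KubernetesUtils

// Hypothetical upload location; in practice this must be a Hadoop-compatible path that
// both the submission client and the cluster can reach.
val conf = new SparkConf()
  .set("spark.kubernetes.file.upload.path", "s3a://my-bucket/spark-uploads")

val deps = Seq("file:///tmp/app.jar", "local:///opt/spark/examples/jars/examples.jar")

// Callers typically upload only dependencies that are local to the submission client;
// local:// URIs already baked into the image are filtered out here.
val toUpload = deps.filter(KubernetesUtils.isLocalAndResolvable)
val rewritten = KubernetesUtils.uploadAndTransformFileUris(toUpload, Some(conf))
```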
@@ -281,6 +306,7 @@ private[spark] object KubernetesUtils extends Logging {
}
}

@Since("3.0.0")
def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
conf match {
case Some(sConf) =>
@@ -325,6 +351,7 @@ private[spark] object KubernetesUtils extends Logging {
}
}

@Since("3.0.0")
def buildPodWithServiceAccount(serviceAccount: Option[String], pod: SparkPod): Option[Pod] = {
@dongjoon-hyun (Member, Author) commented:

It's not released yet, @attilapiros! :)

serviceAccount.map { account =>
new PodBuilder(pod.pod)
@@ -338,6 +365,7 @@ private[spark] object KubernetesUtils extends Logging {

// Add a OwnerReference to the given resources making the pod an owner of them so when
// the pod is deleted, the resources are garbage collected.
@Since("3.1.1")
def addOwnerReference(pod: Pod, resources: Seq[HasMetadata]): Unit = {
if (pod != null) {
val reference = new OwnerReferenceBuilder()
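
The comment above `addOwnerReference` explains the intent (the body is truncated here): making the pod the owner of its auxiliary resources lets Kubernetes garbage-collect them when the pod is deleted. A hedged usage sketch, with both arguments assumed to be created elsewhere:

```scala
import io.fabric8.kubernetes.api.model.{ConfigMap, Pod}

import org.apache.spark.deploy.k8s.KubernetesUtils

// After the driver pod exists on the API server, make it the owner of auxiliary resources
// (config maps, services, ...) so that deleting the pod cleans them up as well.
def ownAuxiliaryResources(createdDriverPod: Pod, propertiesConfigMap: ConfigMap): Unit = {
  KubernetesUtils.addOwnerReference(createdDriverPod, Seq(propertiesConfigMap))
}
```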