diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
index 6bc31c2a0e68..0c8d9646a2b4 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala
@@ -29,6 +29,7 @@ import org.apache.commons.codec.binary.Hex
 import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.annotation.{DeveloperApi, Since, Unstable}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.deploy.k8s.Config.KUBERNETES_FILE_UPLOAD_PATH
 import org.apache.spark.internal.Logging
@@ -38,7 +39,14 @@ import org.apache.spark.util.{Clock, SystemClock, Utils}
 import org.apache.spark.util.DependencyUtils.downloadFile
 import org.apache.spark.util.Utils.getHadoopFileSystem
 
-private[spark] object KubernetesUtils extends Logging {
+/**
+ * :: DeveloperApi ::
+ *
+ * A utility class used for K8s operations internally and for implementing ExternalClusterManagers.
+ */
+@Unstable
+@DeveloperApi
+object KubernetesUtils extends Logging {
 
   private val systemClock = new SystemClock()
   private lazy val RNG = new SecureRandom()
@@ -51,12 +59,14 @@ private[spark] object KubernetesUtils extends Logging {
    * @param prefix the given property name prefix
    * @return a Map storing the configuration property keys and values
    */
+  @Since("2.3.0")
   def parsePrefixedKeyValuePairs(
       sparkConf: SparkConf,
       prefix: String): Map[String, String] = {
     sparkConf.getAllWithPrefix(prefix).toMap
   }
 
+  @Since("3.0.0")
   def requireBothOrNeitherDefined(
       opt1: Option[_],
       opt2: Option[_],
@@ -66,6 +76,7 @@ private[spark] object KubernetesUtils extends Logging {
     requireSecondIfFirstIsDefined(opt2, opt1, errMessageWhenFirstIsMissing)
   }
 
+  @Since("3.0.0")
   def requireSecondIfFirstIsDefined(
       opt1: Option[_],
       opt2: Option[_],
@@ -75,11 +86,13 @@ private[spark] object KubernetesUtils extends Logging {
     }
   }
 
+  @Since("2.3.0")
   def requireNandDefined(opt1: Option[_], opt2: Option[_], errMessage: String): Unit = {
     opt1.foreach { _ => require(opt2.isEmpty, errMessage) }
     opt2.foreach { _ => require(opt1.isEmpty, errMessage) }
   }
 
+  @Since("3.2.0")
   def loadPodFromTemplate(
       kubernetesClient: KubernetesClient,
       templateFileName: String,
@@ -99,6 +112,7 @@ private[spark] object KubernetesUtils extends Logging {
     }
   }
 
+  @Since("3.0.0")
   def selectSparkContainer(pod: Pod, containerName: Option[String]): SparkPod = {
     def selectNamedContainer(
       containers: List[Container], name: String): Option[(Container, List[Container])] =
@@ -125,8 +139,10 @@ private[spark] object KubernetesUtils extends Logging {
     }.getOrElse(SparkPod(pod, new ContainerBuilder().build()))
   }
 
+  @Since("2.4.0")
   def parseMasterUrl(url: String): String = url.substring("k8s://".length)
 
+  @Since("3.0.0")
   def formatPairsBundle(pairs: Seq[(String, String)], indent: Int = 1) : String = {
     // Use more loggable format if value is null or empty
     val indentStr = "\t" * indent
@@ -141,6 +157,7 @@ private[spark] object KubernetesUtils extends Logging {
    * @param pod Pod
    * @return Human readable pod state
    */
+  @Since("3.0.0")
   def formatPodState(pod: Pod): String = {
     val details = Seq[(String, String)](
       // pod metadata
@@ -164,6 +181,7 @@ private[spark] object KubernetesUtils extends Logging {
     formatPairsBundle(details)
   }
 
+  @Since("3.0.0")
   def containersDescription(p: Pod, indent: Int = 1): String = {
     p.getStatus.getContainerStatuses.asScala.map { status =>
       Seq(
@@ -173,6 +191,7 @@ private[spark] object KubernetesUtils extends Logging {
     }.map(p => formatPairsBundle(p, indent)).mkString("\n\n")
   }
 
+  @Since("3.0.0")
   def containerStatusDescription(containerStatus: ContainerStatus)
     : Seq[(String, String)] = {
     val state = containerStatus.getState
@@ -200,6 +219,7 @@ private[spark] object KubernetesUtils extends Logging {
     }.getOrElse(Seq(("container state", "N/A")))
   }
 
+  @Since("3.0.0")
   def formatTime(time: String): String = {
     if (time != null) time else "N/A"
   }
@@ -212,6 +232,7 @@ private[spark] object KubernetesUtils extends Logging {
    * This avoids using a UUID for uniqueness (too long), and relying solely on the current time
    * (not unique enough).
    */
+  @Since("3.0.0")
   def uniqueID(clock: Clock = systemClock): String = {
     val random = new Array[Byte](3)
     synchronized {
@@ -228,6 +249,7 @@ private[spark] object KubernetesUtils extends Logging {
    * It assumes we can use the Kubernetes device plugin format: vendor-domain/resource.
    * It returns a set with a tuple of vendor-domain/resource and Quantity for each resource.
    */
+  @Since("3.0.0")
   def buildResourcesQuantities(
       componentName: String,
       sparkConf: SparkConf): Map[String, Quantity] = {
@@ -247,6 +269,7 @@ private[spark] object KubernetesUtils extends Logging {
   /**
    * Upload files and modify their uris
    */
+  @Since("3.0.0")
   def uploadAndTransformFileUris(fileUris: Iterable[String], conf: Option[SparkConf] = None)
     : Iterable[String] = {
     fileUris.map { uri =>
@@ -261,11 +284,13 @@ private[spark] object KubernetesUtils extends Logging {
     }
   }
 
+  @Since("3.0.0")
   def isLocalAndResolvable(resource: String): Boolean = {
     resource != SparkLauncher.NO_RESOURCE &&
       isLocalDependency(Utils.resolveURI(resource))
   }
 
+  @Since("3.1.1")
   def renameMainAppResource(
       resource: String,
       conf: Option[SparkConf] = None,
@@ -281,6 +306,7 @@ private[spark] object KubernetesUtils extends Logging {
     }
   }
 
+  @Since("3.0.0")
   def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
     conf match {
       case Some(sConf) =>
@@ -325,6 +351,7 @@ private[spark] object KubernetesUtils extends Logging {
     }
   }
 
+  @Since("3.0.0")
   def buildPodWithServiceAccount(serviceAccount: Option[String], pod: SparkPod): Option[Pod] = {
     serviceAccount.map { account =>
       new PodBuilder(pod.pod)
@@ -338,6 +365,7 @@ private[spark] object KubernetesUtils extends Logging {
 
   // Add a OwnerReference to the given resources making the pod an owner of them so when
   // the pod is deleted, the resources are garbage collected.
+  @Since("3.1.1")
   def addOwnerReference(pod: Pod, resources: Seq[HasMetadata]): Unit = {
     if (pod != null) {
       val reference = new OwnerReferenceBuilder()
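
With `KubernetesUtils` promoted from `private[spark]` to a public `@DeveloperApi`, an `ExternalClusterManager` implementation can call these helpers directly instead of copying them. A minimal sketch of such usage, restricted to methods visible in this diff; the config key, master URL, and `KubernetesUtilsExample` name below are made up for illustration:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s.KubernetesUtils

object KubernetesUtilsExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical configuration, for illustration only.
    val conf = new SparkConf(loadDefaults = false)
      .set("spark.master", "k8s://https://192.168.0.1:6443")
      .set("spark.kubernetes.executor.label.team", "data-eng")

    // Collect every property under a prefix; keys come back with the
    // prefix stripped, e.g. Map("team" -> "data-eng").
    val labels = KubernetesUtils.parsePrefixedKeyValuePairs(
      conf, "spark.kubernetes.executor.label.")

    // Drop the leading "k8s://" scheme to obtain the API server address.
    val apiServer = KubernetesUtils.parseMasterUrl(conf.get("spark.master"))

    // Short time-plus-random identifier, usable as a resource-name suffix.
    val suffix = KubernetesUtils.uniqueID()

    println(s"labels=$labels apiServer=$apiServer suffix=$suffix")
  }
}
```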