Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,5 +20,6 @@ import io.fabric8.kubernetes.api.model.HasMetadata

private[spark] case class KubernetesDriverSpec(
pod: SparkPod,
driverPreKubernetesResources: Seq[HasMetadata],
driverKubernetesResources: Seq[HasMetadata],
systemProperties: Map[String, String])
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,15 @@ trait KubernetesFeatureConfigStep {

/**
* Return any additional Kubernetes resources that should be added to support this feature. Only
* applicable when creating the driver in cluster mode.
* applicable when creating the driver in cluster mode. Resources would be setup/refresh before
* Pod creation.
*/
def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = Seq.empty

/**
* Return any additional Kubernetes resources that should be added to support this feature. Only
* applicable when creating the driver in cluster mode. Resources would be setup/refresh after
* Pod creation.
*/
def getAdditionalKubernetesResources(): Seq[HasMetadata] = Seq.empty
}
Original file line number Diff line number Diff line change
Expand Up @@ -133,15 +133,41 @@ private[spark] class Client(
.build()
val driverPodName = resolvedDriverPod.getMetadata.getName

// setup resources before pod creation
val preKubernetesResources = resolvedDriverSpec.driverPreKubernetesResources
try {
kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
logError("Please check \"kubectl auth can-i create [resource]\" first." +
" It should be yes. And please also check your feature step implementation.")
kubernetesClient.resourceList(preKubernetesResources: _*).delete()
Copy link
Member

@dongjoon-hyun dongjoon-hyun Dec 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is the first error causing Job failure, could you add a user-friendly error message with logError?

Copy link
Member Author

@Yikun Yikun Dec 17, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, how about:

logError("Please check \"kubectl auth can-i create [resource]\" first." +
  " It should be yes. And please also check your feature step implementation.")

throw e
}

var watch: Watch = null
var createdDriverPod: Pod = null
try {
createdDriverPod = kubernetesClient.pods().create(resolvedDriverPod)
} catch {
case NonFatal(e) =>
kubernetesClient.resourceList(preKubernetesResources: _*).delete()
logError("Please check \"kubectl auth can-i create pod\" first. It should be yes.")
throw e
}

// Refresh all pre-resources' owner references
try {
addOwnerReference(createdDriverPod, preKubernetesResources)
kubernetesClient.resourceList(preKubernetesResources: _*).createOrReplace()
} catch {
case NonFatal(e) =>
kubernetesClient.pods().delete(createdDriverPod)
kubernetesClient.resourceList(preKubernetesResources: _*).delete()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when the previous line deletes some of the resources of preKubernetesResources? For the non-existing resource, delete() API is okay?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens when the previous line deletes some of the resources of preKubernetesResources? For the non-existing resource, delete() API is okay?

Yes, it's okay, false return in here instead of exception rasing. We could find answer in kubernetes-client delete implementation and resource list delete test.

I also write a simple test to delete non-existing crd in real env, it passed as expected.

  test("Deleting nonExisting crds") {
    val crd = new CustomResourceDefinitionBuilder()
      .withNewMetadata()
      .withName("nonExisting1")
      .endMetadata()
      .build()
    val crd2 = new CustomResourceDefinitionBuilder()
      .withNewMetadata()
      .withName("nonExisting")
      .endMetadata()
      .build()
    val crds = Seq(crd, crd2)
    val client = new DefaultKubernetesClient("https://127.0.0.1:52878")
    assert(client.inNamespace("default").resourceList(crd).delete() === false)
    assert(client.inNamespace("default").resourceList(crds: _*).delete() == false)
  }

throw e
}

// setup resources after pod creation, and refresh all resources' owner references
try {
val otherKubernetesResources = resolvedDriverSpec.driverKubernetesResources ++ Seq(configMap)
addOwnerReference(createdDriverPod, otherKubernetesResources)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,18 @@ private[spark] class KubernetesDriverBuilder {

val spec = KubernetesDriverSpec(
initialPod,
driverPreKubernetesResources = Seq.empty,
driverKubernetesResources = Seq.empty,
conf.sparkConf.getAll.toMap)

features.foldLeft(spec) { case (spec, feature) =>
val configuredPod = feature.configurePod(spec.pod)
val addedSystemProperties = feature.getAdditionalPodSystemProperties()
val addedPreResources = feature.getAdditionalPreKubernetesResources()
val addedResources = feature.getAdditionalKubernetesResources()
KubernetesDriverSpec(
configuredPod,
spec.driverPreKubernetesResources ++ addedPreResources,
spec.driverKubernetesResources ++ addedResources,
spec.systemProperties ++ addedSystemProperties)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ abstract class PodBuilderSuite extends SparkFunSuite {

protected def buildPod(sparkConf: SparkConf, client: KubernetesClient): SparkPod

private val baseConf = new SparkConf(false)
protected val baseConf = new SparkConf(false)
.set(Config.CONTAINER_IMAGE, "spark-executor:latest")

test("use empty initial pod if template is not specified") {
Expand Down Expand Up @@ -80,7 +80,7 @@ abstract class PodBuilderSuite extends SparkFunSuite {
assert(exception.getMessage.contains("Could not load pod from template file."))
}

private def mockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = {
protected def mockKubernetesClient(pod: Pod = podWithSupportedFeatures()): KubernetesClient = {
val kubernetesClient = mock(classOf[KubernetesClient])
val pods =
mock(classOf[MixedOperation[Pod, PodList, PodResource[Pod]]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ class DriverCommandFeatureStepSuite extends SparkFunSuite {
}
val pod = step.configurePod(SparkPod.initialPod())
val props = step.getAdditionalPodSystemProperties()
KubernetesDriverSpec(pod, Nil, props)
KubernetesDriverSpec(pod, Nil, Nil, props)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import java.nio.file.Files
import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.api.model.apiextensions.v1.{CustomResourceDefinition, CustomResourceDefinitionBuilder}
import io.fabric8.kubernetes.client.{KubernetesClient, Watch}
import io.fabric8.kubernetes.client.dsl.PodResource
import org.mockito.{ArgumentCaptor, Mock, MockitoAnnotations}
Expand All @@ -31,7 +32,7 @@ import org.scalatest.BeforeAndAfter
import org.scalatestplus.mockito.MockitoSugar._

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.{Config, _}
import org.apache.spark.deploy.k8s.Constants._
import org.apache.spark.deploy.k8s.Fabric8Aliases._
import org.apache.spark.util.Utils
Expand Down Expand Up @@ -62,8 +63,17 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
private val ADDITIONAL_RESOURCES = Seq(
new SecretBuilder().withNewMetadata().withName("secret").endMetadata().build())

private val PRE_RESOURCES = Seq(
new CustomResourceDefinitionBuilder().withNewMetadata().withName("preCRD").endMetadata().build()
)
private val BUILT_KUBERNETES_SPEC = KubernetesDriverSpec(
SparkPod(BUILT_DRIVER_POD, BUILT_DRIVER_CONTAINER),
Nil,
ADDITIONAL_RESOURCES,
RESOLVED_JAVA_OPTIONS)
private val BUILT_KUBERNETES_SPEC_WITH_PRERES = KubernetesDriverSpec(
SparkPod(BUILT_DRIVER_POD, BUILT_DRIVER_CONTAINER),
PRE_RESOURCES,
ADDITIONAL_RESOURCES,
RESOLVED_JAVA_OPTIONS)

Expand Down Expand Up @@ -118,6 +128,20 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
.build()
}

private val PRE_ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES = PRE_RESOURCES.map { crd =>
new CustomResourceDefinitionBuilder(crd)
.editMetadata()
.addNewOwnerReference()
.withName(POD_NAME)
.withApiVersion(DRIVER_POD_API_VERSION)
.withKind(DRIVER_POD_KIND)
.withController(true)
.withUid(DRIVER_POD_UID)
.endOwnerReference()
.endMetadata()
.build()
}

@Mock
private var kubernetesClient: KubernetesClient = _

Expand Down Expand Up @@ -192,6 +216,52 @@ class ClientSuite extends SparkFunSuite with BeforeAndAfter {
assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf2key=conf2value"))
}

test("SPARK-37331: The client should create Kubernetes resources with pre resources") {
val sparkConf = new SparkConf(false)
.set(Config.CONTAINER_IMAGE, "spark-executor:latest")
.set(Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS.key,
"org.apache.spark.deploy.k8s.TestStepTwo," +
"org.apache.spark.deploy.k8s.TestStep")
val preResKconf: KubernetesDriverConf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf,
resourceNamePrefix = Some(KUBERNETES_RESOURCE_PREFIX)
)

when(driverBuilder.buildFromFeatures(preResKconf, kubernetesClient))
.thenReturn(BUILT_KUBERNETES_SPEC_WITH_PRERES)
val submissionClient = new Client(
preResKconf,
driverBuilder,
kubernetesClient,
loggingPodStatusWatcher)
submissionClient.run()
val otherCreatedResources = createdResourcesArgumentCaptor.getAllValues

// 2 for pre-resource creation/update, 1 for resource creation, 1 for config map
assert(otherCreatedResources.size === 4)
val preRes = otherCreatedResources.toArray
.filter(_.isInstanceOf[CustomResourceDefinition]).toSeq

// Make sure pre-resource creation/owner reference as expected
assert(preRes.size === 2)
assert(preRes.last === PRE_ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES.head)

// Make sure original resource and config map process are not affected
val secrets = otherCreatedResources.toArray.filter(_.isInstanceOf[Secret]).toSeq
assert(secrets === ADDITIONAL_RESOURCES_WITH_OWNER_REFERENCES)
val configMaps = otherCreatedResources.toArray
.filter(_.isInstanceOf[ConfigMap]).map(_.asInstanceOf[ConfigMap])
assert(secrets.nonEmpty)
assert(configMaps.nonEmpty)
val configMap = configMaps.head
assert(configMap.getMetadata.getName ===
KubernetesClientUtils.configMapNameDriver)
assert(configMap.getImmutable())
assert(configMap.getData.containsKey(SPARK_CONF_FILE_NAME))
assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf1key=conf1value"))
assert(configMap.getData.get(SPARK_CONF_FILE_NAME).contains("conf2key=conf2value"))
}

test("All files from SPARK_CONF_DIR, " +
"except templates, spark config, binary files and are within size limit, " +
"should be populated to pod's configMap.") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
*/
package org.apache.spark.deploy.k8s.submit

import io.fabric8.kubernetes.api.model.HasMetadata
import io.fabric8.kubernetes.api.model.apiextensions.v1.CustomResourceDefinitionBuilder
import io.fabric8.kubernetes.client.KubernetesClient

import org.apache.spark.SparkConf
import org.apache.spark.deploy.k8s._
import org.apache.spark.deploy.k8s.features.KubernetesFeatureConfigStep
import org.apache.spark.internal.config.ConfigEntry

class KubernetesDriverBuilderSuite extends PodBuilderSuite {
Expand All @@ -36,4 +39,46 @@ class KubernetesDriverBuilderSuite extends PodBuilderSuite {
val conf = KubernetesTestConf.createDriverConf(sparkConf = sparkConf)
new KubernetesDriverBuilder().buildFromFeatures(conf, client).pod
}

private val ADDITION_PRE_RESOURCES = Seq(
new CustomResourceDefinitionBuilder().withNewMetadata().withName("preCRD").endMetadata().build()
)

test("SPARK-37331: check driver pre kubernetes resource, empty by default") {
val sparkConf = new SparkConf(false)
.set(Config.CONTAINER_IMAGE, "spark-driver:latest")
val client = mockKubernetesClient()
val conf = KubernetesTestConf.createDriverConf(sparkConf)
val spec = new KubernetesDriverBuilder().buildFromFeatures(conf, client)
assert(spec.driverPreKubernetesResources.size === 0)
}

test("SPARK-37331: check driver pre kubernetes resource as expected") {
val sparkConf = new SparkConf(false)
.set(Config.CONTAINER_IMAGE, "spark-driver:latest")
.set(Config.KUBERNETES_DRIVER_POD_FEATURE_STEPS.key,
"org.apache.spark.deploy.k8s.submit.TestStep")
val client = mockKubernetesClient()
val conf = KubernetesTestConf.createDriverConf(
sparkConf = sparkConf
)
val spec = new KubernetesDriverBuilder().buildFromFeatures(conf, client)
assert(spec.driverPreKubernetesResources.size === 1)
assert(spec.driverPreKubernetesResources === ADDITION_PRE_RESOURCES)
}
}

class TestStep extends KubernetesFeatureConfigStep {

override def configurePod(pod: SparkPod): SparkPod = {
pod
}

override def getAdditionalPreKubernetesResources(): Seq[HasMetadata] = Seq(
new CustomResourceDefinitionBuilder()
.withNewMetadata()
.withName("preCRD")
.endMetadata()
.build()
)
}