Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.spark.deploy.k8s.integrationtest

import java.util.Locale

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model.{Pod, SecretBuilder}
Expand Down Expand Up @@ -57,11 +59,17 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
createTestSecret()
sparkAppConf
.set(s"spark.kubernetes.driver.secrets.$ENV_SECRET_NAME", SECRET_MOUNT_PATH)
.set(s"spark.kubernetes.driver.secretKeyRef.USERNAME", s"$ENV_SECRET_NAME:username")
.set(s"spark.kubernetes.driver.secretKeyRef.PASSWORD", s"$ENV_SECRET_NAME:password")
.set(
s"spark.kubernetes.driver.secretKeyRef.${ENV_SECRET_KEY_1_CAP}",
s"$ENV_SECRET_NAME:${ENV_SECRET_KEY_1}")
.set(
s"spark.kubernetes.driver.secretKeyRef.${ENV_SECRET_KEY_2_CAP}",
s"$ENV_SECRET_NAME:${ENV_SECRET_KEY_2}")
.set(s"spark.kubernetes.executor.secrets.$ENV_SECRET_NAME", SECRET_MOUNT_PATH)
.set(s"spark.kubernetes.executor.secretKeyRef.USERNAME", s"$ENV_SECRET_NAME:username")
.set(s"spark.kubernetes.executor.secretKeyRef.PASSWORD", s"$ENV_SECRET_NAME:password")
.set(s"spark.kubernetes.executor.secretKeyRef.${ENV_SECRET_KEY_1_CAP}",
s"${ENV_SECRET_NAME}:$ENV_SECRET_KEY_1")
.set(s"spark.kubernetes.executor.secretKeyRef.${ENV_SECRET_KEY_2_CAP}",
s"${ENV_SECRET_NAME}:$ENV_SECRET_KEY_2")
try {
runSparkPiAndVerifyCompletion(
driverPodChecker = (driverPod: Pod) => {
Expand All @@ -81,19 +89,30 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
}

private def checkSecrets(pod: Pod): Unit = {
Eventually.eventually(TIMEOUT, INTERVAL) {
implicit val podName: String = pod.getMetadata.getName
implicit val components: KubernetesTestComponents = kubernetesTestComponents
logDebug(s"Checking secrets for ${pod}")
// Wait for the pod to become ready & have secrets provisioned
implicit val podName: String = pod.getMetadata.getName
implicit val components: KubernetesTestComponents = kubernetesTestComponents
val env = Eventually.eventually(TIMEOUT, INTERVAL) {
logDebug(s"Checking env of ${pod.getMetadata().getName()} ....")
val env = Utils.executeCommand("env")
assert(env.toString.contains(ENV_SECRET_VALUE_1))
assert(env.toString.contains(ENV_SECRET_VALUE_2))
val fileUsernameContents = Utils
.executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
val filePasswordContents = Utils
.executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
assert(fileUsernameContents.toString.trim.equals(ENV_SECRET_VALUE_1))
assert(filePasswordContents.toString.trim.equals(ENV_SECRET_VALUE_2))
assert(!env.isEmpty)
env
}
env.toString should include (s"${ENV_SECRET_KEY_1_CAP}=$ENV_SECRET_VALUE_1")
env.toString should include (s"${ENV_SECRET_KEY_2_CAP}=$ENV_SECRET_VALUE_2")

// Make sure our secret files are mounted correctly
val files = Utils.executeCommand("ls", s"$SECRET_MOUNT_PATH")
files should include (ENV_SECRET_KEY_1)
files should include (ENV_SECRET_KEY_2)
// Validate the contents
val fileUsernameContents = Utils
.executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
fileUsernameContents.toString.trim should equal(ENV_SECRET_VALUE_1)
val filePasswordContents = Utils
.executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
filePasswordContents.toString.trim should equal(ENV_SECRET_VALUE_2)
}
}

Expand All @@ -102,6 +121,8 @@ private[spark] object SecretsTestsSuite {
val SECRET_MOUNT_PATH = "/etc/secret"
val ENV_SECRET_KEY_1 = "username"
val ENV_SECRET_KEY_2 = "password"
val ENV_SECRET_KEY_1_CAP = ENV_SECRET_KEY_1.toUpperCase(Locale.ROOT)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this part of the change related, necessary?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, although I figured cleaning up hard coded strings while I was there would be good.

val ENV_SECRET_KEY_2_CAP = ENV_SECRET_KEY_2.toUpperCase(Locale.ROOT)
val ENV_SECRET_VALUE_1 = "secretusername"
val ENV_SECRET_VALUE_2 = "secretpassword"
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package org.apache.spark.deploy.k8s.integrationtest

import java.io.{Closeable, File, PrintWriter}
import java.nio.file.{Files, Path}
import java.util.concurrent.CountDownLatch

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.client.dsl.ExecListener
import okhttp3.Response
import org.apache.commons.io.output.ByteArrayOutputStream

import org.apache.spark.{SPARK_VERSION, SparkException}
Expand All @@ -45,20 +48,49 @@ object Utils extends Logging {
implicit podName: String,
kubernetesTestComponents: KubernetesTestComponents): String = {
val out = new ByteArrayOutputStream()
val watch = kubernetesTestComponents
val pod = kubernetesTestComponents
.kubernetesClient
.pods()
.withName(podName)
// Avoid timing issues by looking for open/close
class ReadyListener extends ExecListener {
val openLatch: CountDownLatch = new CountDownLatch(1)
val closeLatch: CountDownLatch = new CountDownLatch(1)

override def onOpen(response: Response) {
openLatch.countDown()
}

override def onClose(a: Int, b: String) {
closeLatch.countDown()
}

override def onFailure(e: Throwable, r: Response) {
}

def waitForInputStreamToConnect(): Unit = {
openLatch.await()
}

def waitForClose(): Unit = {
closeLatch.await()
}
}
val listener = new ReadyListener()
val watch = pod
.readingInput(System.in)
.writingOutput(out)
.writingError(System.err)
.withTTY()
.usingListener(listener)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cool I was looking for something like that in the past :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rocking :)

.exec(cmd.toArray: _*)
// wait to get some result back
Thread.sleep(1000)
// under load sometimes the stdout isn't connected by the time we try to read from it.
listener.waitForInputStreamToConnect()
listener.waitForClose()
watch.close()
out.flush()
out.toString()
val result = out.toString()
result
}

def createTempFile(contents: String, hostPath: String): String = {
Expand Down