Skip to content

Commit 4080c4b

Browse files
committed
[SPARK-28937][SPARK-28936][KUBERNETES] Reduce test flakyness
### What changes were proposed in this pull request? Switch from using a Thread sleep for waiting for commands to finish to just waiting for the command to finish with a watcher & improve the error messages in the SecretsTestsSuite. ### Why are the changes needed? Currently some of the Spark Kubernetes tests have race conditions with command execution, and the frequent use of eventually makes debugging test failures difficult. ### Does this PR introduce any user-facing change? No ### How was this patch tested? Existing tests pass after removal of thread.sleep Closes #25765 from holdenk/SPARK-28937SPARK-28936-improve-kubernetes-integration-tests. Authored-by: Holden Karau <[email protected]> Signed-off-by: Holden Karau <[email protected]>
1 parent 42050c3 commit 4080c4b

File tree

2 files changed

+72
-19
lines changed

2 files changed

+72
-19
lines changed

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
*/
1717
package org.apache.spark.deploy.k8s.integrationtest
1818

19+
import java.util.Locale
20+
1921
import scala.collection.JavaConverters._
2022

2123
import io.fabric8.kubernetes.api.model.{Pod, SecretBuilder}
@@ -57,11 +59,17 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
5759
createTestSecret()
5860
sparkAppConf
5961
.set(s"spark.kubernetes.driver.secrets.$ENV_SECRET_NAME", SECRET_MOUNT_PATH)
60-
.set(s"spark.kubernetes.driver.secretKeyRef.USERNAME", s"$ENV_SECRET_NAME:username")
61-
.set(s"spark.kubernetes.driver.secretKeyRef.PASSWORD", s"$ENV_SECRET_NAME:password")
62+
.set(
63+
s"spark.kubernetes.driver.secretKeyRef.${ENV_SECRET_KEY_1_CAP}",
64+
s"$ENV_SECRET_NAME:${ENV_SECRET_KEY_1}")
65+
.set(
66+
s"spark.kubernetes.driver.secretKeyRef.${ENV_SECRET_KEY_2_CAP}",
67+
s"$ENV_SECRET_NAME:${ENV_SECRET_KEY_2}")
6268
.set(s"spark.kubernetes.executor.secrets.$ENV_SECRET_NAME", SECRET_MOUNT_PATH)
63-
.set(s"spark.kubernetes.executor.secretKeyRef.USERNAME", s"$ENV_SECRET_NAME:username")
64-
.set(s"spark.kubernetes.executor.secretKeyRef.PASSWORD", s"$ENV_SECRET_NAME:password")
69+
.set(s"spark.kubernetes.executor.secretKeyRef.${ENV_SECRET_KEY_1_CAP}",
70+
s"${ENV_SECRET_NAME}:$ENV_SECRET_KEY_1")
71+
.set(s"spark.kubernetes.executor.secretKeyRef.${ENV_SECRET_KEY_2_CAP}",
72+
s"${ENV_SECRET_NAME}:$ENV_SECRET_KEY_2")
6573
try {
6674
runSparkPiAndVerifyCompletion(
6775
driverPodChecker = (driverPod: Pod) => {
@@ -81,19 +89,30 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
8189
}
8290

8391
private def checkSecrets(pod: Pod): Unit = {
84-
Eventually.eventually(TIMEOUT, INTERVAL) {
85-
implicit val podName: String = pod.getMetadata.getName
86-
implicit val components: KubernetesTestComponents = kubernetesTestComponents
92+
logDebug(s"Checking secrets for ${pod}")
93+
// Wait for the pod to become ready & have secrets provisioned
94+
implicit val podName: String = pod.getMetadata.getName
95+
implicit val components: KubernetesTestComponents = kubernetesTestComponents
96+
val env = Eventually.eventually(TIMEOUT, INTERVAL) {
97+
logDebug(s"Checking env of ${pod.getMetadata().getName()} ....")
8798
val env = Utils.executeCommand("env")
88-
assert(env.toString.contains(ENV_SECRET_VALUE_1))
89-
assert(env.toString.contains(ENV_SECRET_VALUE_2))
90-
val fileUsernameContents = Utils
91-
.executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
92-
val filePasswordContents = Utils
93-
.executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
94-
assert(fileUsernameContents.toString.trim.equals(ENV_SECRET_VALUE_1))
95-
assert(filePasswordContents.toString.trim.equals(ENV_SECRET_VALUE_2))
99+
assert(!env.isEmpty)
100+
env
96101
}
102+
env.toString should include (s"${ENV_SECRET_KEY_1_CAP}=$ENV_SECRET_VALUE_1")
103+
env.toString should include (s"${ENV_SECRET_KEY_2_CAP}=$ENV_SECRET_VALUE_2")
104+
105+
// Make sure our secret files are mounted correctly
106+
val files = Utils.executeCommand("ls", s"$SECRET_MOUNT_PATH")
107+
files should include (ENV_SECRET_KEY_1)
108+
files should include (ENV_SECRET_KEY_2)
109+
// Validate the contents
110+
val fileUsernameContents = Utils
111+
.executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
112+
fileUsernameContents.toString.trim should equal(ENV_SECRET_VALUE_1)
113+
val filePasswordContents = Utils
114+
.executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
115+
filePasswordContents.toString.trim should equal(ENV_SECRET_VALUE_2)
97116
}
98117
}
99118

@@ -102,6 +121,8 @@ private[spark] object SecretsTestsSuite {
102121
val SECRET_MOUNT_PATH = "/etc/secret"
103122
val ENV_SECRET_KEY_1 = "username"
104123
val ENV_SECRET_KEY_2 = "password"
124+
val ENV_SECRET_KEY_1_CAP = ENV_SECRET_KEY_1.toUpperCase(Locale.ROOT)
125+
val ENV_SECRET_KEY_2_CAP = ENV_SECRET_KEY_2.toUpperCase(Locale.ROOT)
105126
val ENV_SECRET_VALUE_1 = "secretusername"
106127
val ENV_SECRET_VALUE_2 = "secretpassword"
107128
}

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala

Lines changed: 36 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,12 @@ package org.apache.spark.deploy.k8s.integrationtest
1818

1919
import java.io.{Closeable, File, PrintWriter}
2020
import java.nio.file.{Files, Path}
21+
import java.util.concurrent.CountDownLatch
2122

2223
import scala.collection.JavaConverters._
2324

25+
import io.fabric8.kubernetes.client.dsl.ExecListener
26+
import okhttp3.Response
2427
import org.apache.commons.io.output.ByteArrayOutputStream
2528

2629
import org.apache.spark.{SPARK_VERSION, SparkException}
@@ -45,20 +48,49 @@ object Utils extends Logging {
4548
implicit podName: String,
4649
kubernetesTestComponents: KubernetesTestComponents): String = {
4750
val out = new ByteArrayOutputStream()
48-
val watch = kubernetesTestComponents
51+
val pod = kubernetesTestComponents
4952
.kubernetesClient
5053
.pods()
5154
.withName(podName)
55+
// Avoid timing issues by looking for open/close
56+
class ReadyListener extends ExecListener {
57+
val openLatch: CountDownLatch = new CountDownLatch(1)
58+
val closeLatch: CountDownLatch = new CountDownLatch(1)
59+
60+
override def onOpen(response: Response) {
61+
openLatch.countDown()
62+
}
63+
64+
override def onClose(a: Int, b: String) {
65+
closeLatch.countDown()
66+
}
67+
68+
override def onFailure(e: Throwable, r: Response) {
69+
}
70+
71+
def waitForInputStreamToConnect(): Unit = {
72+
openLatch.await()
73+
}
74+
75+
def waitForClose(): Unit = {
76+
closeLatch.await()
77+
}
78+
}
79+
val listener = new ReadyListener()
80+
val watch = pod
5281
.readingInput(System.in)
5382
.writingOutput(out)
5483
.writingError(System.err)
5584
.withTTY()
85+
.usingListener(listener)
5686
.exec(cmd.toArray: _*)
57-
// wait to get some result back
58-
Thread.sleep(1000)
87+
// under load sometimes the stdout isn't connected by the time we try to read from it.
88+
listener.waitForInputStreamToConnect()
89+
listener.waitForClose()
5990
watch.close()
6091
out.flush()
61-
out.toString()
92+
val result = out.toString()
93+
result
6294
}
6395

6496
def createTempFile(contents: String, hostPath: String): String = {

0 commit comments

Comments
 (0)