
Commit 39577a2

Stavros Kontopoulos authored and shane knapp committed
[SPARK-24902][K8S] Add PV integration tests
## What changes were proposed in this pull request?

- Adds persistent volume integration tests.
- Adds a custom tag to the new test so it can be excluded when run against a cloud backend.
- Assumes the default fs type for the host; AFAIK that is ext4.

## How was this patch tested?

Manually ran the tests against minikube as usual:

```
[INFO] --- scalatest-maven-plugin:1.0:test (integration-test) spark-kubernetes-integration-tests_2.12 ---
Discovery starting.
Discovery completed in 192 milliseconds.
Run starting. Expected test count is: 16
KubernetesSuite:
- Run SparkPi with no resources
- Run SparkPi with a very long application name.
- Use SparkLauncher.NO_RESOURCE
- Run SparkPi with a master URL without a scheme.
- Run SparkPi with an argument.
- Run SparkPi with custom labels, annotations, and environment variables.
- Run extraJVMOptions check on driver
- Run SparkRemoteFileTest using a remote data file
- Run SparkPi with env and mount secrets.
- Run PySpark on simple pi.py example
- Run PySpark with Python2 to test a pyfiles example
- Run PySpark with Python3 to test a pyfiles example
- Run PySpark with memory customization
- Run in client mode.
- Start pod creation from template
- Test PVs with local storage
```

Closes #23514 from skonto/pvctests.

Authored-by: Stavros Kontopoulos <[email protected]>
Signed-off-by: shane knapp <[email protected]>
1 parent 49b0411 commit 39577a2
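The cloud-backend exclusion mentioned in the commit message relies on ScalaTest's tag mechanism. As a hedged sketch of how that works (the suite and assertion here are invented for illustration; only the `Tag("minikube")` name comes from this commit):

```scala
import org.scalatest.{FunSuite, Tag}

// A tag is just a named marker; runners can include or exclude tests by it.
object MinikubeTag extends Tag("minikube")

class TaggedExampleSuite extends FunSuite {
  // Only meaningful against a minikube node, so it carries the tag and can
  // be skipped when the suite runs against a cloud backend.
  test("Test PVs with local storage", MinikubeTag) {
    assert("test PVs".split(" ").length == 2)
  }
}
```

With scalatest-maven-plugin, a `tagsToExclude` setting (or `-l minikube` with ScalaTest's own Runner) filters such tests out; the exact wiring in Spark's integration-test build is not shown in this diff.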

File tree: 6 files changed (+260, −28 lines)


examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala

Lines changed: 10 additions & 2 deletions
```diff
@@ -22,6 +22,9 @@ import java.io.File
 
 import scala.io.Source._
 
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
 import org.apache.spark.sql.SparkSession
 
 /**
@@ -107,6 +110,13 @@ object DFSReadWriteTest {
 
     println("Writing local file to DFS")
     val dfsFilename = s"$dfsDirPath/dfs_read_write_test"
+
+    // delete file if exists
+    val fs = FileSystem.get(spark.sessionState.newHadoopConf())
+    if (fs.exists(new Path(dfsFilename))) {
+      fs.delete(new Path(dfsFilename), true)
+    }
+
     val fileRDD = spark.sparkContext.parallelize(fileContents)
     fileRDD.saveAsTextFile(dfsFilename)
 
@@ -123,15 +133,13 @@ object DFSReadWriteTest {
       .sum
 
     spark.stop()
-
     if (localWordCount == dfsWordCount) {
       println(s"Success! Local Word Count $localWordCount and " +
         s"DFS Word Count $dfsWordCount agree.")
     } else {
       println(s"Failure! Local Word Count $localWordCount " +
         s"and DFS Word Count $dfsWordCount disagree.")
     }
-
   }
 }
 // scalastyle:on println
```
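The guard added above exists because `saveAsTextFile` fails when the output directory already exists, so re-running the example against a persistent volume would otherwise break. A self-contained sketch of the same delete-then-write pattern (the output path here is hypothetical):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SparkSession

object IdempotentWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("Idempotent write sketch").getOrCreate()
    val out = new Path("/opt/spark/pv-tests/dfs_read_write_test") // hypothetical path

    // saveAsTextFile throws if the target directory exists, so clear any
    // leftovers from a previous run before writing.
    val fs = FileSystem.get(spark.sessionState.newHadoopConf())
    if (fs.exists(out)) {
      fs.delete(out, true)
    }

    spark.sparkContext.parallelize(Seq("test", "PVs")).saveAsTextFile(out.toString)
    spark.stop()
  }
}
```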

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala

Lines changed: 31 additions & 4 deletions
```diff
@@ -40,7 +40,7 @@ import org.apache.spark.internal.config._
 
 class KubernetesSuite extends SparkFunSuite
   with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
-  with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite
+  with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite
   with Logging with Eventually with Matchers {
 
   import KubernetesSuite._
@@ -178,6 +178,29 @@ class KubernetesSuite extends SparkFunSuite
       isJVM)
   }
 
+  protected def runDFSReadWriteAndVerifyCompletion(
+      wordCount: Int,
+      appResource: String = containerLocalSparkDistroExamplesJar,
+      driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+      executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
+      appArgs: Array[String] = Array.empty[String],
+      appLocator: String = appLocator,
+      isJVM: Boolean = true,
+      interval: Option[PatienceConfiguration.Interval] = None): Unit = {
+    runSparkApplicationAndVerifyCompletion(
+      appResource,
+      SPARK_DFS_READ_WRITE_TEST,
+      Seq(s"Success! Local Word Count $wordCount and " +
+        s"DFS Word Count $wordCount agree."),
+      appArgs,
+      driverPodChecker,
+      executorPodChecker,
+      appLocator,
+      isJVM,
+      None,
+      interval)
+  }
+
   protected def runSparkRemoteCheckAndVerifyCompletion(
       appResource: String = containerLocalSparkDistroExamplesJar,
       driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
@@ -241,7 +264,8 @@ class KubernetesSuite extends SparkFunSuite
       executorPodChecker: Pod => Unit,
       appLocator: String,
       isJVM: Boolean,
-      pyFiles: Option[String] = None): Unit = {
+      pyFiles: Option[String] = None,
+      interval: Option[PatienceConfiguration.Interval] = None): Unit = {
     val appArguments = SparkAppArguments(
       mainAppResource = appResource,
       mainClass = mainClass,
@@ -281,10 +305,12 @@ class KubernetesSuite extends SparkFunSuite
         }
       }
     })
-    Eventually.eventually(TIMEOUT, INTERVAL) { execPods.values.nonEmpty should be (true) }
+
+    val patienceInterval = interval.getOrElse(INTERVAL)
+    Eventually.eventually(TIMEOUT, patienceInterval) { execPods.values.nonEmpty should be (true) }
     execWatcher.close()
     execPods.values.foreach(executorPodChecker(_))
-    Eventually.eventually(TIMEOUT, INTERVAL) {
+    Eventually.eventually(TIMEOUT, patienceInterval) {
       expectedLogOnCompletion.foreach { e =>
         assert(kubernetesTestComponents.kubernetesClient
           .pods()
@@ -383,6 +409,7 @@ class KubernetesSuite extends SparkFunSuite
 private[spark] object KubernetesSuite {
   val k8sTestTag = Tag("k8s")
   val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
+  val SPARK_DFS_READ_WRITE_TEST = "org.apache.spark.examples.DFSReadWriteTest"
   val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
   val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"
   val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
```
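The new `interval` parameter feeds ScalaTest's `Eventually` polling: `eventually` re-runs its block at the given interval until the assertions pass or the timeout expires. A minimal standalone sketch of those semantics (the constants mirror the suite's values; the polled condition is invented):

```scala
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Milliseconds, Minutes, Span}

object PatienceSketch {
  val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
  val INTERVAL = PatienceConfiguration.Interval(Span(10, Milliseconds))

  def main(args: Array[String]): Unit = {
    // Callers may override the polling interval, as PVTestsSuite does;
    // otherwise fall back to the suite default.
    val interval: Option[PatienceConfiguration.Interval] = None
    val patienceInterval = interval.getOrElse(INTERVAL)

    val ready = System.currentTimeMillis() + 100
    // Retries the block every patienceInterval until it stops throwing,
    // or gives up once TIMEOUT elapses.
    Eventually.eventually(TIMEOUT, patienceInterval) {
      assert(System.currentTimeMillis() >= ready)
    }
  }
}
```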

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala

Lines changed: 2 additions & 1 deletion
```diff
@@ -21,6 +21,7 @@ import java.util.UUID
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
 import org.scalatest.concurrent.Eventually
@@ -124,7 +125,7 @@ private[spark] object SparkAppLauncher extends Logging {
       appConf.toStringArray :+ appArguments.mainAppResource
 
     if (appArguments.appArgs.nonEmpty) {
-      commandLine += appArguments.appArgs.mkString(" ")
+      commandLine ++= appArguments.appArgs.to[ArrayBuffer]
    }
    logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
    ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
```
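The one-line change above fixes a real bug in how application arguments reached the launcher: `+=` appended all arguments joined into a single token, so multi-argument apps (like the DFS read-write test, which takes a file and a directory) received one mangled argument. `++=` appends each element separately. A quick illustration with a plain `ArrayBuffer`:

```scala
import scala.collection.mutable.ArrayBuffer

object AppArgsSketch {
  def main(args: Array[String]): Unit = {
    val appArgs = Array("/opt/spark/pv-tests/in.txt", "/opt/spark/pv-tests")

    val fused = ArrayBuffer("spark-submit")
    fused += appArgs.mkString(" ")  // one element: both args glued together

    val separate = ArrayBuffer("spark-submit")
    separate ++= appArgs            // one element per argument

    println(fused.length)    // 2: the two app args arrive as a single token
    println(separate.length) // 3: each app arg is passed through intact
  }
}
```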
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala

Lines changed: 189 additions & 0 deletions (new file)
```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.spark.deploy.k8s.integrationtest

import java.io.{File, PrintWriter}

import scala.collection.JavaConverters._

import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.api.model.storage.StorageClassBuilder
import org.scalatest.Tag
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Milliseconds, Span}

import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._

private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
  import PVTestsSuite._

  private def setupLocalStorage(): Unit = {
    val scBuilder = new StorageClassBuilder()
      .withKind("StorageClass")
      .withApiVersion("storage.k8s.io/v1")
      .withNewMetadata()
        .withName(STORAGE_NAME)
      .endMetadata()
      .withProvisioner("kubernetes.io/no-provisioner")
      .withVolumeBindingMode("WaitForFirstConsumer")

    val pvBuilder = new PersistentVolumeBuilder()
      .withKind("PersistentVolume")
      .withApiVersion("v1")
      .withNewMetadata()
        .withName("test-local-pv")
      .endMetadata()
      .withNewSpec()
        .withCapacity(Map("storage" -> new QuantityBuilder().withAmount("1Gi").build()).asJava)
        .withAccessModes("ReadWriteOnce")
        .withPersistentVolumeReclaimPolicy("Retain")
        .withStorageClassName("test-local-storage")
        .withLocal(new LocalVolumeSourceBuilder().withPath(VM_PATH).build())
        .withNewNodeAffinity()
          .withNewRequired()
            .withNodeSelectorTerms(new NodeSelectorTermBuilder()
              .withMatchExpressions(new NodeSelectorRequirementBuilder()
                .withKey("kubernetes.io/hostname")
                .withOperator("In")
                .withValues("minikube").build()).build())
          .endRequired()
        .endNodeAffinity()
      .endSpec()

    val pvcBuilder = new PersistentVolumeClaimBuilder()
      .withKind("PersistentVolumeClaim")
      .withApiVersion("v1")
      .withNewMetadata()
        .withName(PVC_NAME)
      .endMetadata()
      .withNewSpec()
        .withAccessModes("ReadWriteOnce")
        .withStorageClassName("test-local-storage")
        .withResources(new ResourceRequirementsBuilder()
          .withRequests(Map("storage" -> new QuantityBuilder()
            .withAmount("1Gi").build()).asJava).build())
      .endSpec()

    kubernetesTestComponents
      .kubernetesClient
      .storage()
      .storageClasses()
      .create(scBuilder.build())

    kubernetesTestComponents
      .kubernetesClient
      .persistentVolumes()
      .create(pvBuilder.build())

    kubernetesTestComponents
      .kubernetesClient
      .persistentVolumeClaims()
      .create(pvcBuilder.build())
  }

  private def deleteLocalStorage(): Unit = {
    kubernetesTestComponents
      .kubernetesClient
      .persistentVolumeClaims()
      .withName(PVC_NAME)
      .delete()

    kubernetesTestComponents
      .kubernetesClient
      .persistentVolumes()
      .withName(PV_NAME)
      .delete()

    kubernetesTestComponents
      .kubernetesClient
      .storage()
      .storageClasses()
      .withName(STORAGE_NAME)
      .delete()
  }

  private def checkPVs(pod: Pod, file: String) = {
    Eventually.eventually(TIMEOUT, INTERVAL) {
      implicit val podName: String = pod.getMetadata.getName
      implicit val components: KubernetesTestComponents = kubernetesTestComponents
      val contents = Utils.executeCommand("cat", s"$CONTAINER_MOUNT_PATH/$file")
      assert(contents.toString.trim.equals(FILE_CONTENTS))
    }
  }

  private def createTempFile(): String = {
    val filename = try {
      val f = File.createTempFile("tmp", ".txt", new File(HOST_PATH))
      f.deleteOnExit()
      new PrintWriter(f) {
        try {
          write(FILE_CONTENTS)
        } finally {
          close()
        }
      }
      f.getName
    } catch {
      case e: Exception => e.printStackTrace(); throw e
    }
    filename
  }

  test("Test PVs with local storage", k8sTestTag, MinikubeTag) {
    sparkAppConf
      .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path",
        CONTAINER_MOUNT_PATH)
      .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName",
        PVC_NAME)
      .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path",
        CONTAINER_MOUNT_PATH)
      .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName",
        PVC_NAME)
    val file = createTempFile()
    try {
      setupLocalStorage()
      runDFSReadWriteAndVerifyCompletion(
        FILE_CONTENTS.split(" ").length,
        driverPodChecker = (driverPod: Pod) => {
          doBasicDriverPodCheck(driverPod)
          checkPVs(driverPod, file)
        },
        executorPodChecker = (executorPod: Pod) => {
          doBasicExecutorPodCheck(executorPod)
          checkPVs(executorPod, file)
        },
        appArgs = Array(s"$CONTAINER_MOUNT_PATH/$file", s"$CONTAINER_MOUNT_PATH"),
        interval = Some(PV_TESTS_INTERVAL)
      )
    } finally {
      // make sure this always runs
      deleteLocalStorage()
    }
  }
}

private[spark] object PVTestsSuite {
  val MinikubeTag = Tag("minikube")
  val STORAGE_NAME = "test-local-storage"
  val PV_NAME = "test-local-pv"
  val PVC_NAME = "test-local-pvc"
  val CONTAINER_MOUNT_PATH = "/opt/spark/pv-tests"
  val HOST_PATH = sys.env.getOrElse("PVC_TESTS_HOST_PATH", "/tmp")
  val VM_PATH = sys.env.getOrElse("PVC_TESTS_VM_PATH", "/tmp")
  val FILE_CONTENTS = "test PVs"
  val PV_TESTS_INTERVAL = PatienceConfiguration.Interval(Span(10, Milliseconds))
}
```
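The test above mounts the same claim into the driver and every executor through the `spark.kubernetes.<role>.volumes.persistentVolumeClaim.<name>.*` properties. A sketch of assembling those keys on a plain `SparkConf` (the values are the test's constants; `data` is the arbitrary volume name the test picked):

```scala
import org.apache.spark.SparkConf

object PvcVolumeConfSketch {
  def main(args: Array[String]): Unit = {
    val mountPath = "/opt/spark/pv-tests"
    val claimName = "test-local-pvc"

    val conf = new SparkConf()
    // The same claim/mount pair is configured for both pod roles.
    Seq("driver", "executor").foreach { role =>
      conf.set(s"spark.kubernetes.$role.volumes.persistentVolumeClaim.data.mount.path", mountPath)
      conf.set(s"spark.kubernetes.$role.volumes.persistentVolumeClaim.data.options.claimName", claimName)
    }
    conf.getAll.sortBy(_._1).foreach { case (k, v) => println(s"$k=$v") }
  }
}
```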

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala

Lines changed: 6 additions & 21 deletions
```diff
@@ -83,33 +83,18 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
   private def checkSecrets(pod: Pod): Unit = {
     Eventually.eventually(TIMEOUT, INTERVAL) {
       implicit val podName: String = pod.getMetadata.getName
-      val env = executeCommand("env")
+      implicit val components: KubernetesTestComponents = kubernetesTestComponents
+      val env = Utils.executeCommand("env")
       assert(env.toString.contains(ENV_SECRET_VALUE_1))
       assert(env.toString.contains(ENV_SECRET_VALUE_2))
-      val fileUsernameContents = executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
-      val filePasswordContents = executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
+      val fileUsernameContents = Utils
+        .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
+      val filePasswordContents = Utils
+        .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
       assert(fileUsernameContents.toString.trim.equals(ENV_SECRET_VALUE_1))
       assert(filePasswordContents.toString.trim.equals(ENV_SECRET_VALUE_2))
     }
   }
-
-  private def executeCommand(cmd: String*)(implicit podName: String): String = {
-    val out = new ByteArrayOutputStream()
-    val watch = kubernetesTestComponents
-      .kubernetesClient
-      .pods()
-      .withName(podName)
-      .readingInput(System.in)
-      .writingOutput(out)
-      .writingError(System.err)
-      .withTTY()
-      .exec(cmd.toArray: _*)
-    // wait to get some result back
-    Thread.sleep(1000)
-    watch.close()
-    out.flush()
-    out.toString()
-  }
 }
 
 private[spark] object SecretsTestsSuite {
```

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala

Lines changed: 22 additions & 0 deletions
```diff
@@ -19,6 +19,8 @@ package org.apache.spark.deploy.k8s.integrationtest
 import java.io.Closeable
 import java.net.URI
 
+import org.apache.commons.io.output.ByteArrayOutputStream
+
 import org.apache.spark.internal.Logging
 
 object Utils extends Logging {
@@ -27,4 +29,24 @@ object Utils extends Logging {
     val resource = createResource
     try f.apply(resource) finally resource.close()
   }
+
+  def executeCommand(cmd: String*)(
+      implicit podName: String,
+      kubernetesTestComponents: KubernetesTestComponents): String = {
+    val out = new ByteArrayOutputStream()
+    val watch = kubernetesTestComponents
+      .kubernetesClient
+      .pods()
+      .withName(podName)
+      .readingInput(System.in)
+      .writingOutput(out)
+      .writingError(System.err)
+      .withTTY()
+      .exec(cmd.toArray: _*)
+    // wait to get some result back
+    Thread.sleep(1000)
+    watch.close()
+    out.flush()
+    out.toString()
+  }
 }
```
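Moving `executeCommand` into `Utils` adds a second implicit parameter, so call sites stay one-liners as long as both values are in implicit scope — which is why `checkSecrets` and `checkPVs` each declare an `implicit val components`. A self-contained sketch of the resolution mechanics, with a hypothetical `TestComponents` standing in for `KubernetesTestComponents`:

```scala
object ImplicitParamsSketch {
  // Hypothetical stand-in for KubernetesTestComponents.
  final case class TestComponents(namespace: String)

  // Implicit parameters are resolved by type, so the names at the
  // declaration site and the call site don't have to match.
  def executeCommand(cmd: String*)(
      implicit podName: String,
      components: TestComponents): String =
    s"[${components.namespace}/$podName] exec: ${cmd.mkString(" ")}"

  def main(args: Array[String]): Unit = {
    implicit val podName: String = "spark-test-app-driver"
    implicit val components: TestComponents = TestComponents("default")

    // Both implicits are filled in automatically at each call site.
    println(executeCommand("env"))
    println(executeCommand("cat", "/opt/spark/pv-tests/input.txt"))
  }
}
```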
