diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
index 1a779716ec4c0..a73859891a860 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
@@ -22,6 +22,9 @@ import java.io.File
import scala.io.Source._
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.Path
+
import org.apache.spark.sql.SparkSession
/**
@@ -107,6 +110,13 @@ object DFSReadWriteTest {
println("Writing local file to DFS")
val dfsFilename = s"$dfsDirPath/dfs_read_write_test"
+
+ // Delete the output directory if it already exists so reruns of this test don't fail
+ val fs = FileSystem.get(spark.sessionState.newHadoopConf())
+ if (fs.exists(new Path(dfsFilename))) {
+ fs.delete(new Path(dfsFilename), true)
+ }
+
val fileRDD = spark.sparkContext.parallelize(fileContents)
fileRDD.saveAsTextFile(dfsFilename)
@@ -123,7 +133,6 @@ object DFSReadWriteTest {
.sum
spark.stop()
-
if (localWordCount == dfsWordCount) {
println(s"Success! Local Word Count $localWordCount and " +
s"DFS Word Count $dfsWordCount agree.")
@@ -131,7 +140,6 @@ object DFSReadWriteTest {
println(s"Failure! Local Word Count $localWordCount " +
s"and DFS Word Count $dfsWordCount disagree.")
}
-
}
}
// scalastyle:on println
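
Why the new guard matters: `saveAsTextFile` fails with a `FileAlreadyExistsException` when the target directory already exists, so rerunning DFSReadWriteTest against the same DFS path used to abort. A minimal standalone sketch of the same delete-then-write pattern (the output path and contents are placeholders):

```scala
import org.apache.hadoop.fs.{FileSystem, Path}

import org.apache.spark.sql.SparkSession

object IdempotentWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("IdempotentWriteSketch").getOrCreate()
    val outputPath = new Path("/tmp/dfs_read_write_test")

    // saveAsTextFile refuses to overwrite, so remove any previous output first.
    val fs = FileSystem.get(spark.sessionState.newHadoopConf())
    if (fs.exists(outputPath)) {
      fs.delete(outputPath, true) // recursive: the output is a directory of part files
    }

    spark.sparkContext.parallelize(Seq("one", "two", "three")).saveAsTextFile(outputPath.toString)
    spark.stop()
  }
}
```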
diff --git a/resource-managers/kubernetes/integration-tests/pom.xml b/resource-managers/kubernetes/integration-tests/pom.xml
index f16b536de5142..91ef0e23bf25d 100644
--- a/resource-managers/kubernetes/integration-tests/pom.xml
+++ b/resource-managers/kubernetes/integration-tests/pom.xml
@@ -29,7 +29,7 @@
<download-maven-plugin.version>1.3.0</download-maven-plugin.version>
<exec-maven-plugin.version>1.4.0</exec-maven-plugin.version>
- <kubernetes-client.version>4.1.0</kubernetes-client.version>
+ <kubernetes-client.version>4.1.2</kubernetes-client.version>
<scala-maven-plugin.version>3.2.2</scala-maven-plugin.version>
<scalatest-maven-plugin.version>1.0</scalatest-maven-plugin.version>
<sbt.project.name>kubernetes-integration-tests</sbt.project.name>
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
index f8f4b4177f3bd..b26e9819284b3 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala
@@ -40,7 +40,7 @@ import org.apache.spark.internal.config._
class KubernetesSuite extends SparkFunSuite
with BeforeAndAfterAll with BeforeAndAfter with BasicTestsSuite with SecretsTestsSuite
- with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite
+ with PythonTestsSuite with ClientModeTestsSuite with PodTemplateSuite with PVTestsSuite
with Logging with Eventually with Matchers {
import KubernetesSuite._
@@ -170,6 +170,29 @@ class KubernetesSuite extends SparkFunSuite
isJVM)
}
+ protected def runDFSReadWriteAndVerifyCompletion(
+ wordCount: Int,
+ appResource: String = containerLocalSparkDistroExamplesJar,
+ driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
+ executorPodChecker: Pod => Unit = doBasicExecutorPodCheck,
+ appArgs: Array[String] = Array.empty[String],
+ appLocator: String = appLocator,
+ isJVM: Boolean = true,
+ interval: Option[PatienceConfiguration.Interval] = None): Unit = {
+ runSparkApplicationAndVerifyCompletion(
+ appResource,
+ SPARK_DFS_READ_WRITE_TEST,
+ Seq(s"Success! Local Word Count $wordCount and " +
+ s"DFS Word Count $wordCount agree."),
+ appArgs,
+ driverPodChecker,
+ executorPodChecker,
+ appLocator,
+ isJVM,
+ None,
+ interval)
+ }
+
protected def runSparkRemoteCheckAndVerifyCompletion(
appResource: String = containerLocalSparkDistroExamplesJar,
driverPodChecker: Pod => Unit = doBasicDriverPodCheck,
@@ -233,7 +256,8 @@ class KubernetesSuite extends SparkFunSuite
executorPodChecker: Pod => Unit,
appLocator: String,
isJVM: Boolean,
- pyFiles: Option[String] = None): Unit = {
+ pyFiles: Option[String] = None,
+ interval: Option[PatienceConfiguration.Interval] = None): Unit = {
val appArguments = SparkAppArguments(
mainAppResource = appResource,
mainClass = mainClass,
@@ -273,10 +297,12 @@ class KubernetesSuite extends SparkFunSuite
}
}
})
- Eventually.eventually(TIMEOUT, INTERVAL) { execPods.values.nonEmpty should be (true) }
+
+ val patienceInterval = interval.getOrElse(INTERVAL)
+ Eventually.eventually(TIMEOUT, patienceInterval) { execPods.values.nonEmpty should be (true) }
execWatcher.close()
execPods.values.foreach(executorPodChecker(_))
- Eventually.eventually(TIMEOUT, INTERVAL) {
+ Eventually.eventually(TIMEOUT, patienceInterval) {
expectedLogOnCompletion.foreach { e =>
assert(kubernetesTestComponents.kubernetesClient
.pods()
@@ -375,6 +401,7 @@ class KubernetesSuite extends SparkFunSuite
private[spark] object KubernetesSuite {
val k8sTestTag = Tag("k8s")
val SPARK_PI_MAIN_CLASS: String = "org.apache.spark.examples.SparkPi"
+ val SPARK_DFS_READ_WRITE_TEST = "org.apache.spark.examples.DFSReadWriteTest"
val SPARK_REMOTE_MAIN_CLASS: String = "org.apache.spark.examples.SparkRemoteFileTest"
val SPARK_DRIVER_MAIN_CLASS: String = "org.apache.spark.examples.DriverSubmissionTest"
val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
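
The optional `interval` threaded through `runSparkApplicationAndVerifyCompletion` is a default-with-override pattern: `interval.getOrElse(INTERVAL)` keeps the suite-wide polling cadence for existing callers while letting PVTestsSuite poll on its own. The pattern in isolation (the timeout and default interval values here are illustrative, not the suite's actual constants):

```scala
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Milliseconds, Minutes, Seconds, Span}

object PatienceSketch extends Eventually {
  val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
  val INTERVAL = PatienceConfiguration.Interval(Span(1, Seconds))

  // Callers that need a different polling cadence pass Some(interval);
  // everyone else keeps the suite-wide default.
  def waitFor(cond: => Boolean,
      interval: Option[PatienceConfiguration.Interval] = None): Unit = {
    val patienceInterval = interval.getOrElse(INTERVAL)
    eventually(TIMEOUT, patienceInterval) { assert(cond) }
  }

  def main(args: Array[String]): Unit = {
    var polls = 0
    waitFor({ polls += 1; polls > 3 },
      Some(PatienceConfiguration.Interval(Span(10, Milliseconds))))
    println(s"condition met after $polls polls")
  }
}
```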
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index e539c8e78dab2..40059454dacfb 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -21,6 +21,7 @@ import java.util.UUID
import scala.collection.JavaConverters._
import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
import io.fabric8.kubernetes.client.DefaultKubernetesClient
import org.scalatest.concurrent.Eventually
@@ -124,7 +125,7 @@ private[spark] object SparkAppLauncher extends Logging {
appConf.toStringArray :+ appArguments.mainAppResource
if (appArguments.appArgs.nonEmpty) {
- commandLine += appArguments.appArgs.mkString(" ")
+ commandLine ++= appArguments.appArgs.to[ArrayBuffer]
}
logInfo(s"Launching a spark app with command line: ${commandLine.mkString(" ")}")
ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
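
The `SparkAppLauncher` change fixes a real argument-passing bug: `commandLine += appArgs.mkString(" ")` appended all application arguments as a single space-joined token, so an app expecting two arguments (as DFSReadWriteTest does) received one mangled string. `++=` appends each argument as its own token. The difference in isolation:

```scala
import scala.collection.mutable.ArrayBuffer

object AppArgsSketch {
  def main(args: Array[String]): Unit = {
    val appArgs = Array("/opt/spark/pv-tests/tmp.txt", "/opt/spark/pv-tests")

    val joined = ArrayBuffer("spark-submit", "<app-resource>")
    joined += appArgs.mkString(" ") // one token: the app's main() sees args.length == 1

    val separate = ArrayBuffer("spark-submit", "<app-resource>")
    separate ++= appArgs            // two tokens: the app sees args.length == 2

    println(joined.mkString(" | "))
    println(separate.mkString(" | "))
  }
}
```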
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
new file mode 100644
index 0000000000000..d7a237f999c02
--- /dev/null
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/PVTestsSuite.scala
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.deploy.k8s.integrationtest
+
+import java.io.{File, PrintWriter}
+
+import scala.collection.JavaConverters._
+
+import io.fabric8.kubernetes.api.model._
+import io.fabric8.kubernetes.api.model.storage.StorageClassBuilder
+import org.scalatest.Tag
+import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
+import org.scalatest.time.{Milliseconds, Span}
+
+import org.apache.spark.deploy.k8s.integrationtest.KubernetesSuite._
+
+private[spark] trait PVTestsSuite { k8sSuite: KubernetesSuite =>
+ import PVTestsSuite._
+
+ private def setupLocalStorage(): Unit = {
+ val scBuilder = new StorageClassBuilder()
+ .withKind("StorageClass")
+ .withApiVersion("storage.k8s.io/v1")
+ .withNewMetadata()
+ .withName(STORAGE_NAME)
+ .endMetadata()
+ .withProvisioner("kubernetes.io/no-provisioner")
+ .withVolumeBindingMode("WaitForFirstConsumer")
+
+ val pvBuilder = new PersistentVolumeBuilder()
+ .withKind("PersistentVolume")
+ .withApiVersion("v1")
+ .withNewMetadata()
+ .withName("test-local-pv")
+ .endMetadata()
+ .withNewSpec()
+ .withCapacity(Map("storage" -> new QuantityBuilder().withAmount("1Gi").build()).asJava)
+ .withAccessModes("ReadWriteOnce")
+ .withPersistentVolumeReclaimPolicy("Retain")
+ .withStorageClassName(STORAGE_NAME)
+ .withLocal(new LocalVolumeSourceBuilder().withPath(VM_PATH).build())
+ .withNewNodeAffinity()
+ .withNewRequired()
+ .withNodeSelectorTerms(new NodeSelectorTermBuilder()
+ .withMatchExpressions(new NodeSelectorRequirementBuilder()
+ .withKey("kubernetes.io/hostname")
+ .withOperator("In")
+ .withValues("minikube").build()).build())
+ .endRequired()
+ .endNodeAffinity()
+ .endSpec()
+
+ val pvcBuilder = new PersistentVolumeClaimBuilder()
+ .withKind("PersistentVolumeClaim")
+ .withApiVersion("v1")
+ .withNewMetadata()
+ .withName(PVC_NAME)
+ .endMetadata()
+ .withNewSpec()
+ .withAccessModes("ReadWriteOnce")
+ .withStorageClassName(STORAGE_NAME)
+ .withResources(new ResourceRequirementsBuilder()
+ .withRequests(Map("storage" -> new QuantityBuilder()
+ .withAmount("1Gi").build()).asJava).build())
+ .endSpec()
+
+ kubernetesTestComponents
+ .kubernetesClient
+ .storage()
+ .storageClasses()
+ .create(scBuilder.build())
+
+ kubernetesTestComponents
+ .kubernetesClient
+ .persistentVolumes()
+ .create(pvBuilder.build())
+
+ kubernetesTestComponents
+ .kubernetesClient
+ .persistentVolumeClaims()
+ .create(pvcBuilder.build())
+ }
+
+ private def deleteLocalStorage(): Unit = {
+ kubernetesTestComponents
+ .kubernetesClient
+ .persistentVolumeClaims()
+ .withName(PVC_NAME)
+ .delete()
+
+ kubernetesTestComponents
+ .kubernetesClient
+ .persistentVolumes()
+ .withName(PV_NAME)
+ .delete()
+
+ kubernetesTestComponents
+ .kubernetesClient
+ .storage()
+ .storageClasses()
+ .withName(STORAGE_NAME)
+ .delete()
+ }
+
+ private def checkPVs(pod: Pod, file: String): Unit = {
+ Eventually.eventually(TIMEOUT, INTERVAL) {
+ implicit val podName: String = pod.getMetadata.getName
+ implicit val components: KubernetesTestComponents = kubernetesTestComponents
+ val contents = Utils.executeCommand("cat", s"$CONTAINER_MOUNT_PATH/$file")
+ assert(contents.trim.equals(FILE_CONTENTS))
+ }
+ }
+
+ private def createTempFile(): String = {
+ val file = File.createTempFile("tmp", ".txt", new File(HOST_PATH))
+ file.deleteOnExit()
+ // Write the test contents, closing the writer even if the write fails.
+ val writer = new PrintWriter(file)
+ try {
+ writer.write(FILE_CONTENTS)
+ } finally {
+ writer.close()
+ }
+ file.getName
+ }
+
+ test("Test PVs with local storage", k8sTestTag, MinikubeTag) {
+ sparkAppConf
+ .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.mount.path",
+ CONTAINER_MOUNT_PATH)
+ .set(s"spark.kubernetes.driver.volumes.persistentVolumeClaim.data.options.claimName",
+ PVC_NAME)
+ .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.mount.path",
+ CONTAINER_MOUNT_PATH)
+ .set(s"spark.kubernetes.executor.volumes.persistentVolumeClaim.data.options.claimName",
+ PVC_NAME)
+ val file = createTempFile()
+ try {
+ setupLocalStorage()
+ runDFSReadWriteAndVerifyCompletion(
+ FILE_CONTENTS.split(" ").length,
+ driverPodChecker = (driverPod: Pod) => {
+ doBasicDriverPodCheck(driverPod)
+ checkPVs(driverPod, file)
+ },
+ executorPodChecker = (executorPod: Pod) => {
+ doBasicExecutorPodCheck(executorPod)
+ checkPVs(executorPod, file)
+ },
+ appArgs = Array(s"$CONTAINER_MOUNT_PATH/$file", CONTAINER_MOUNT_PATH),
+ interval = Some(PV_TESTS_INTERVAL)
+ )
+ } finally {
+ // make sure local storage is cleaned up even if the test fails
+ deleteLocalStorage()
+ }
+ }
+}
+
+private[spark] object PVTestsSuite {
+ val MinikubeTag = Tag("minikube")
+ val STORAGE_NAME = "test-local-storage"
+ val PV_NAME = "test-local-pv"
+ val PVC_NAME = "test-local-pvc"
+ val CONTAINER_MOUNT_PATH = "/opt/spark/pv-tests"
+ val HOST_PATH = sys.env.getOrElse("PVC_TESTS_HOST_PATH", "/tmp")
+ val VM_PATH = sys.env.getOrElse("PVC_TESTS_VM_PATH", "/tmp")
+ val FILE_CONTENTS = "test PVs"
+ val PV_TESTS_INTERVAL = PatienceConfiguration.Interval(Span(10, Milliseconds))
+}
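
The four properties the test sets follow Spark's K8s volume layout `spark.kubernetes.<driver|executor>.volumes.persistentVolumeClaim.<volume-name>.{mount.path,options.claimName}`. A small sketch that builds the same configuration programmatically (the helper and object names are invented for illustration):

```scala
import org.apache.spark.SparkConf

object PvcConfSketch {
  // Mounts one PVC into both driver and executor pods under the volume name "data",
  // mirroring the properties PVTestsSuite sets on sparkAppConf.
  def withPvc(conf: SparkConf, claimName: String, mountPath: String): SparkConf =
    Seq("driver", "executor").foldLeft(conf) { (c, role) =>
      c.set(s"spark.kubernetes.$role.volumes.persistentVolumeClaim.data.mount.path", mountPath)
        .set(s"spark.kubernetes.$role.volumes.persistentVolumeClaim.data.options.claimName",
          claimName)
    }

  def main(args: Array[String]): Unit = {
    val conf = withPvc(new SparkConf(), "test-local-pvc", "/opt/spark/pv-tests")
    conf.getAll.sorted.foreach { case (k, v) => println(s"$k=$v") }
  }
}
```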
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
index b18a6aebda497..cd61ea1040f35 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/SecretsTestsSuite.scala
@@ -83,33 +83,18 @@ private[spark] trait SecretsTestsSuite { k8sSuite: KubernetesSuite =>
private def checkSecrets(pod: Pod): Unit = {
Eventually.eventually(TIMEOUT, INTERVAL) {
implicit val podName: String = pod.getMetadata.getName
- val env = executeCommand("env")
+ implicit val components: KubernetesTestComponents = kubernetesTestComponents
+ val env = Utils.executeCommand("env")
assert(env.toString.contains(ENV_SECRET_VALUE_1))
assert(env.toString.contains(ENV_SECRET_VALUE_2))
- val fileUsernameContents = executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
- val filePasswordContents = executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
+ val fileUsernameContents = Utils
+ .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_1")
+ val filePasswordContents = Utils
+ .executeCommand("cat", s"$SECRET_MOUNT_PATH/$ENV_SECRET_KEY_2")
assert(fileUsernameContents.toString.trim.equals(ENV_SECRET_VALUE_1))
assert(filePasswordContents.toString.trim.equals(ENV_SECRET_VALUE_2))
}
}
-
- private def executeCommand(cmd: String*)(implicit podName: String): String = {
- val out = new ByteArrayOutputStream()
- val watch = kubernetesTestComponents
- .kubernetesClient
- .pods()
- .withName(podName)
- .readingInput(System.in)
- .writingOutput(out)
- .writingError(System.err)
- .withTTY()
- .exec(cmd.toArray: _*)
- // wait to get some result back
- Thread.sleep(1000)
- watch.close()
- out.flush()
- out.toString()
- }
}
private[spark] object SecretsTestsSuite {
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
index 663f8b6523ac8..d425f707180c8 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala
@@ -19,6 +19,8 @@ package org.apache.spark.deploy.k8s.integrationtest
import java.io.Closeable
import java.net.URI
+import org.apache.commons.io.output.ByteArrayOutputStream
+
import org.apache.spark.internal.Logging
object Utils extends Logging {
@@ -27,4 +29,24 @@ object Utils extends Logging {
val resource = createResource
try f.apply(resource) finally resource.close()
}
+
+ def executeCommand(cmd: String*)(
+ implicit podName: String,
+ kubernetesTestComponents: KubernetesTestComponents): String = {
+ val out = new ByteArrayOutputStream()
+ val watch = kubernetesTestComponents
+ .kubernetesClient
+ .pods()
+ .withName(podName)
+ .readingInput(System.in)
+ .writingOutput(out)
+ .writingError(System.err)
+ .withTTY()
+ .exec(cmd.toArray: _*)
+ // crude synchronization: give the exec'd command time to produce output before closing the watch
+ Thread.sleep(1000)
+ watch.close()
+ out.flush()
+ out.toString()
+ }
}
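
A hypothetical call site for the relocated `executeCommand`, showing why callers (see `checkSecrets` and `checkPVs` above) bind both implicits just before the call; assume this lives in the same org.apache.spark.deploy.k8s.integrationtest package:

```scala
package org.apache.spark.deploy.k8s.integrationtest

import io.fabric8.kubernetes.api.model.Pod

object ExecSketch {
  // Reads a file from inside a running pod; the path is a placeholder.
  def readMountedFile(pod: Pod, components: KubernetesTestComponents): String = {
    implicit val podName: String = pod.getMetadata.getName
    implicit val testComponents: KubernetesTestComponents = components
    Utils.executeCommand("cat", "/opt/spark/pv-tests/some-file.txt").trim
  }
}
```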