@@ -78,6 +78,13 @@ private[spark] class Client(
private val driverJavaOptions = submissionSparkConf.get(
org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)

+ /**
+  * Runs the driver submission: initializes a KubernetesDriverSpec that is
+  * updated by each KubernetesSubmissionStep in the sequence that is
+  * passed in.
+  * The final driver spec is used to build the driver container, the
+  * driver pod, and any additional Kubernetes resources.
+  */
def run(): Unit = {
// Set new metadata and a new spec so that submission steps can use PodBuilder#editMetadata()
// and/or PodBuilder#editSpec() safely.
@@ -87,6 +94,8 @@ private[spark] class Client(
driverContainer = new ContainerBuilder().build(),
driverSparkConf = submissionSparkConf.clone(),
otherKubernetesResources = Seq.empty[HasMetadata])
+ // The orchestrator determines which steps are necessary to resolve the
+ // varying client arguments that are passed in.
for (nextStep <- submissionSteps) {
currentDriverSpec = nextStep.prepareSubmission(currentDriverSpec)
}
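
A minimal sketch of the step pattern the new doc and comment describe, reusing the names from this diff with simplified signatures (not the exact interfaces in this change): each step transforms the spec produced so far, so the mutable loop above is equivalent to a left fold.

// Sketch only: simplified trait; the real step interface may carry more state.
trait KubernetesSubmissionStep {
  // Receive the driver spec built so far and return an amended copy.
  def prepareSubmission(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec
}

// The for loop over submissionSteps is then equivalent to:
val finalDriverSpec = submissionSteps.foldLeft(initialDriverSpec) {
  (spec, step) => step.prepareSubmission(spec)
}
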
@@ -186,6 +195,13 @@ private[spark] object Client {
}
}

+ /**
+  * Entry point from SparkSubmit in spark-core.
+  *
+  * @param args Array of strings with alternating identifiers and values;
+  *             parsed by ClientArguments, with each identifier preceding
+  *             the value it labels.
+  */
def main(args: Array[String]): Unit = {
val parsedArguments = ClientArguments.fromCommandLineArgs(args)
val sparkConf = new SparkConf()
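
The @param doc above describes alternating identifier/value pairs. A hedged sketch of that parsing shape follows; parsePairs and the example flag are hypothetical, and the identifiers actually recognized by ClientArguments.fromCommandLineArgs may differ.

// Hypothetical helper illustrating alternating identifier/value parsing.
def parsePairs(args: Array[String]): Map[String, String] = {
  require(args.length % 2 == 0, "arguments must come in identifier/value pairs")
  args.sliding(2, 2).map { case Array(identifier, value) =>
    identifier -> value
  }.toMap
}

// parsePairs(Array("--main-class", "org.example.Main"))
//   == Map("--main-class" -> "org.example.Main")
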
@@ -28,7 +28,7 @@ private[spark] class PythonStep(

override def prepareSubmission(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
val resolvedOtherPyFilesString = if (otherPyFiles.isEmpty) {
"no-py-files"
"null"
} else {
otherPyFiles.mkString(",")
}
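
The change above swaps the sentinel for "no extra py-files" from "no-py-files" to "null". Assuming the container side maps the literal sentinel back to an empty list, the consumer-side decoding would look roughly like the following; parseResolvedPyFiles is illustrative, not part of this change.

// Hypothetical decoder for the sentinel convention used by PythonStep.
def parseResolvedPyFiles(resolved: String): Seq[String] =
  if (resolved == "null") Seq.empty      // sentinel: nothing was submitted
  else resolved.split(",").toSeq         // comma-joined list of file paths

// parseResolvedPyFiles("null")      == Seq()
// parseResolvedPyFiles("a.py,b.py") == Seq("a.py", "b.py")
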
@@ -72,7 +72,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
kubernetesTestComponents.deleteNamespace()
}

test("Run PySpark Job on file from SUBMITTER") {
test("Run PySpark Job on file from SUBMITTER with --py-files") {
assume(testBackend.name == MINIKUBE_TEST_BACKEND)

launchStagingServer(SSLOptions(), None)
@@ -82,7 +82,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.set(EXECUTOR_DOCKER_IMAGE,
System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest"))

- runPySparkPiAndVerifyCompletion(PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION, Seq.empty[String])
+ runPySparkPiAndVerifyCompletion(
+   PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION,
+   Seq(PYSPARK_SORT_CONTAINER_LOCAL_FILE_LOCATION)
+ )
}

test("Run PySpark Job on file from CONTAINER with spark.jar defined") {
@@ -154,7 +157,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
runSparkApplicationAndVerifyCompletion(
JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE),
GROUP_BY_MAIN_CLASS,
"The Result is",
Seq("The Result is"),
Array.empty[String],
Seq.empty[String])
}
@@ -218,7 +221,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
runSparkApplicationAndVerifyCompletion(
JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE),
FILE_EXISTENCE_MAIN_CLASS,
s"File found at /opt/spark/${testExistenceFile.getName} with correct contents.",
Seq(s"File found at /opt/spark/${testExistenceFile.getName} with correct contents."),
Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS),
Seq.empty[String])
}
@@ -250,7 +253,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
runSparkApplicationAndVerifyCompletion(
JavaMainAppResource(appResource),
SPARK_PI_MAIN_CLASS,
"Pi is roughly 3",
Seq("Pi is roughly 3"),
Array.empty[String],
Seq.empty[String])
}
@@ -260,15 +263,15 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
runSparkApplicationAndVerifyCompletion(
PythonMainAppResource(appResource),
PYSPARK_PI_MAIN_CLASS,
"Pi is roughly 3",
Seq("Submitting 5 missing tasks from ResultStage", "Pi is roughly 3"),
Array("5"),
otherPyFiles)
}

private def runSparkApplicationAndVerifyCompletion(
appResource: MainAppResource,
mainClass: String,
- expectedLogOnCompletion: String,
+ expectedLogOnCompletion: Seq[String],
appArgs: Array[String],
otherPyFiles: Seq[String]): Unit = {
val clientArguments = ClientArguments(
@@ -284,11 +287,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
.getItems
.get(0)
Eventually.eventually(TIMEOUT, INTERVAL) {
- assert(kubernetesTestComponents.kubernetesClient
-   .pods()
-   .withName(driverPod.getMetadata.getName)
-   .getLog
-   .contains(expectedLogOnCompletion), "The application did not complete.")
+ expectedLogOnCompletion.foreach { e =>
+   assert(kubernetesTestComponents.kubernetesClient
+     .pods()
+     .withName(driverPod.getMetadata.getName)
+     .getLog
+     .contains(e), "The application did not complete.")
+ }
}
}
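
For reference, the TIMEOUT and INTERVAL used above are ScalaTest patience values defined elsewhere in this suite; a sketch with assumed durations (the suite's real spans may differ):

import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Seconds, Span}

// Assumed durations for illustration only.
val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
val INTERVAL = PatienceConfiguration.Interval(Span(2, Seconds))

// Eventually.eventually(TIMEOUT, INTERVAL) { ... } re-runs the enclosed
// assertions until they pass or the timeout elapses, checking every interval.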

@@ -356,6 +361,8 @@ private[spark] object KubernetesSuite {
val PYSPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner"
val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION =
"local:///opt/spark/examples/src/main/python/pi.py"
+ val PYSPARK_SORT_CONTAINER_LOCAL_FILE_LOCATION =
+   "local:///opt/spark/examples/src/main/python/sort.py"
val PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION = "src/test/python/pi.py"
val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
".integrationtest.jobs.FileExistenceTest"
@@ -27,7 +27,7 @@ private[spark] class MinikubeTestBackend extends IntegrationTestBackend {

override def initialize(): Unit = {
Minikube.startMinikube()
- // new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
+ new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
defaultClient = Minikube.getKubernetesClient
}
