This repository was archived by the owner on Jan 9, 2020. It is now read-only.

Commit 5c60e09

Small edits to docs, tests, and python specifics

1 parent 9822155

4 files changed: +37 -14 lines

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/Client.scala

Lines changed: 16 additions & 0 deletions
@@ -78,6 +78,13 @@ private[spark] class Client(
   private val driverJavaOptions = submissionSparkConf.get(
     org.apache.spark.internal.config.DRIVER_JAVA_OPTIONS)
 
+  /**
+   * Run command that initializes a DriverSpec that will be updated
+   * after each KubernetesSubmissionStep in the sequence that is passed in.
+   * The final driver spec will be used to build the driver container,
+   * the driver pod, and any other Kubernetes resources required by the
+   * submission.
+   */
   def run(): Unit = {
     // Set new metadata and a new spec so that submission steps can use PodBuilder#editMetadata()
     // and/or PodBuilder#editSpec() safely.
@@ -87,6 +94,8 @@ private[spark] class Client(
       driverContainer = new ContainerBuilder().build(),
       driverSparkConf = submissionSparkConf.clone(),
       otherKubernetesResources = Seq.empty[HasMetadata])
+    // The orchestrator determined which steps are necessary to resolve the
+    // varying client arguments that were passed in; apply them in order.
     for (nextStep <- submissionSteps) {
       currentDriverSpec = nextStep.prepareSubmission(currentDriverSpec)
     }
@@ -186,6 +195,13 @@ private[spark] object Client {
     }
   }
 
+  /**
+   * Entry point from SparkSubmit in spark-core.
+   *
+   * @param args array of alternating identifier and value strings that will
+   *             be parsed by ClientArguments, with each identifier preceding
+   *             the value it labels
+   */
   def main(args: Array[String]): Unit = {
     val parsedArguments = ClientArguments.fromCommandLineArgs(args)
     val sparkConf = new SparkConf()
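
A minimal, self-contained sketch of the step-chaining pattern the new run() doc comment describes: the driver spec is threaded through each submission step in turn, and the final spec is what gets built into the driver pod. The simplified spec fields and the two concrete steps below are illustrative stand-ins; only the prepareSubmission shape comes from the diff.

// Simplified stand-in for KubernetesDriverSpec (the real one carries a pod,
// a container, a SparkConf, and other Kubernetes resources).
case class DriverSpec(labels: Map[String, String], javaOptions: Seq[String])

trait SubmissionStep {
  // Each step receives the spec produced so far and returns an updated copy.
  def prepareSubmission(spec: DriverSpec): DriverSpec
}

// Hypothetical steps, standing in for the real KubernetesSubmissionStep impls.
class AddLabelStep(key: String, value: String) extends SubmissionStep {
  override def prepareSubmission(spec: DriverSpec): DriverSpec =
    spec.copy(labels = spec.labels + (key -> value))
}

class AddJavaOptionStep(option: String) extends SubmissionStep {
  override def prepareSubmission(spec: DriverSpec): DriverSpec =
    spec.copy(javaOptions = spec.javaOptions :+ option)
}

object StepChainSketch extends App {
  val submissionSteps: Seq[SubmissionStep] =
    Seq(new AddLabelStep("spark-app-name", "pi"), new AddJavaOptionStep("-Dspark.test=true"))

  // Equivalent to the `for (nextStep <- submissionSteps)` loop in Client.run():
  // fold the spec through every step and use the final result.
  val finalSpec = submissionSteps.foldLeft(DriverSpec(Map.empty, Seq.empty)) {
    (spec, step) => step.prepareSubmission(spec)
  }
  println(finalSpec) // DriverSpec(Map(spark-app-name -> pi),List(-Dspark.test=true))
}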

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/kubernetes/submit/submitsteps/PythonStep.scala

Lines changed: 1 addition & 1 deletion
@@ -28,7 +28,7 @@ private[spark] class PythonStep(
 
   override def prepareSubmission(driverSpec: KubernetesDriverSpec): KubernetesDriverSpec = {
     val resolvedOtherPyFilesString = if (otherPyFiles.isEmpty) {
-      "no-py-files"
+      "null"
     } else {
       otherPyFiles.mkString(",")
     }
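
The hunk above swaps the sentinel used when no extra --py-files are supplied from "no-py-files" to "null". As a standalone illustration of the resolution logic (the helper name is hypothetical; the behavior mirrors the hunk):

// With no extra --py-files, the sentinel "null" is passed through to the
// driver; otherwise the file paths are joined into a comma-separated list.
def resolveOtherPyFiles(otherPyFiles: Seq[String]): String =
  if (otherPyFiles.isEmpty) "null" else otherPyFiles.mkString(",")

// resolveOtherPyFiles(Nil)                       == "null"
// resolveOtherPyFiles(Seq("dep1.py", "dep2.py")) == "dep1.py,dep2.py"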

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/KubernetesSuite.scala

Lines changed: 19 additions & 12 deletions
@@ -72,7 +72,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     kubernetesTestComponents.deleteNamespace()
   }
 
-  test("Run PySpark Job on file from SUBMITTER") {
+  test("Run PySpark Job on file from SUBMITTER with --py-files") {
     assume(testBackend.name == MINIKUBE_TEST_BACKEND)
 
     launchStagingServer(SSLOptions(), None)
@@ -82,7 +82,10 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       .set(EXECUTOR_DOCKER_IMAGE,
         System.getProperty("spark.docker.test.executorImage", "spark-executor-py:latest"))
 
-    runPySparkPiAndVerifyCompletion(PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION, Seq.empty[String])
+    runPySparkPiAndVerifyCompletion(
+      PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION,
+      Seq(PYSPARK_SORT_CONTAINER_LOCAL_FILE_LOCATION)
+    )
   }
 
   test("Run PySpark Job on file from CONTAINER with spark.jar defined") {
@@ -154,7 +157,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     runSparkApplicationAndVerifyCompletion(
       JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE),
       GROUP_BY_MAIN_CLASS,
-      "The Result is",
+      Seq("The Result is"),
       Array.empty[String],
       Seq.empty[String])
   }
@@ -218,7 +221,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     runSparkApplicationAndVerifyCompletion(
       JavaMainAppResource(SUBMITTER_LOCAL_MAIN_APP_RESOURCE),
       FILE_EXISTENCE_MAIN_CLASS,
-      s"File found at /opt/spark/${testExistenceFile.getName} with correct contents.",
+      Seq(s"File found at /opt/spark/${testExistenceFile.getName} with correct contents."),
       Array(testExistenceFile.getName, TEST_EXISTENCE_FILE_CONTENTS),
       Seq.empty[String])
   }
@@ -250,7 +253,7 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     runSparkApplicationAndVerifyCompletion(
       JavaMainAppResource(appResource),
       SPARK_PI_MAIN_CLASS,
-      "Pi is roughly 3",
+      Seq("Pi is roughly 3"),
       Array.empty[String],
       Seq.empty[String])
   }
@@ -260,15 +263,15 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
     runSparkApplicationAndVerifyCompletion(
       PythonMainAppResource(appResource),
       PYSPARK_PI_MAIN_CLASS,
-      "Pi is roughly 3",
+      Seq("Submitting 5 missing tasks from ResultStage", "Pi is roughly 3"),
       Array("5"),
       otherPyFiles)
   }
 
   private def runSparkApplicationAndVerifyCompletion(
       appResource: MainAppResource,
       mainClass: String,
-      expectedLogOnCompletion: String,
+      expectedLogOnCompletion: Seq[String],
       appArgs: Array[String],
       otherPyFiles: Seq[String]): Unit = {
     val clientArguments = ClientArguments(
@@ -284,11 +287,13 @@ private[spark] class KubernetesSuite extends SparkFunSuite with BeforeAndAfter {
       .getItems
       .get(0)
     Eventually.eventually(TIMEOUT, INTERVAL) {
-      assert(kubernetesTestComponents.kubernetesClient
-        .pods()
-        .withName(driverPod.getMetadata.getName)
-        .getLog
-        .contains(expectedLogOnCompletion), "The application did not complete.")
+      expectedLogOnCompletion.foreach { e =>
+        assert(kubernetesTestComponents.kubernetesClient
+          .pods()
+          .withName(driverPod.getMetadata.getName)
+          .getLog
+          .contains(e), "The application did not complete.")
+      }
     }
   }
 
@@ -356,6 +361,8 @@ private[spark] object KubernetesSuite {
   val PYSPARK_PI_MAIN_CLASS = "org.apache.spark.deploy.PythonRunner"
   val PYSPARK_PI_CONTAINER_LOCAL_FILE_LOCATION =
     "local:///opt/spark/examples/src/main/python/pi.py"
+  val PYSPARK_SORT_CONTAINER_LOCAL_FILE_LOCATION =
+    "local:///opt/spark/examples/src/main/python/sort.py"
   val PYSPARK_PI_SUBMITTER_LOCAL_FILE_LOCATION = "src/test/python/pi.py"
   val FILE_EXISTENCE_MAIN_CLASS = "org.apache.spark.deploy.kubernetes" +
     ".integrationtest.jobs.FileExistenceTest"

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/kubernetes/integrationtest/backend/minikube/MinikubeTestBackend.scala

Lines changed: 1 addition & 1 deletion
@@ -27,7 +27,7 @@ private[spark] class MinikubeTestBackend extends IntegrationTestBackend {
 
   override def initialize(): Unit = {
     Minikube.startMinikube()
-    // new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
+    new SparkDockerImageBuilder(Minikube.getDockerEnv).buildSparkDockerImages()
     defaultClient = Minikube.getKubernetesClient
   }
