
Commit a9f055b

support python deps
1 parent b5ecc41 commit a9f055b

File tree

7 files changed: +128 −47 lines changed

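In short, the commit makes Kubernetes cluster-mode submissions upload local Python dependencies (the primary .py resource and --py-files) to spark.kubernetes.file.upload.path, the same way local jars and files were already handled. A minimal, hypothetical launcher-side sketch of the workflow this enables; the master URL, image name, bucket, and file paths below are placeholders, not values from the commit:

import org.apache.spark.launcher.SparkLauncher

// Local Python app and deps get uploaded to the configured upload path
// before the driver pod starts; all names here are illustrative only.
new SparkLauncher()
  .setMaster("k8s://https://<api-server>:6443")
  .setDeployMode("cluster")
  .setAppResource("/tmp/app.py")                          // local primary resource
  .addPyFile("/tmp/deps.zip")                             // local Python dependency
  .setConf("spark.kubernetes.container.image", "<pyspark-image>")
  .setConf("spark.kubernetes.file.upload.path", "s3a://<bucket>")
  .startApplication()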

core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions
@@ -1521,8 +1521,8 @@ class SparkContext(config: SparkConf) extends Logging {
     val schemeCorrectedURI = uri.getScheme match {
       case null => new File(path).getCanonicalFile.toURI
       case "local" =>
-        logWarning("File with 'local' scheme is not supported to add to file server, since " +
-          "it is already available on every node.")
+        logWarning(s"File with 'local' scheme $path is not supported to add to file server, " +
+          s"since it is already available on every node.")
         return
       case _ => uri
     }

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 0 deletions
@@ -391,6 +391,7 @@ private[spark] class SparkSubmit extends Logging {
         downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
       }.orNull
       args.files = renameResourcesToLocalFS(args.files, localFiles)
+      args.pyFiles = renameResourcesToLocalFS(args.pyFiles, localPyFiles)
     }
   }

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 10 additions & 3 deletions
@@ -261,12 +261,19 @@ private[spark] object KubernetesUtils extends Logging {
     isLocalDependency(Utils.resolveURI(resource))
   }
 
-  def renameMainAppResource(resource: String, conf: SparkConf): String = {
+  def renameMainAppResource(
+      resource: String,
+      conf: Option[SparkConf] = None,
+      shouldUploadLocal: Boolean): String = {
     if (isLocalAndResolvable(resource)) {
-      SparkLauncher.NO_RESOURCE
+      if (shouldUploadLocal) {
+        uploadFileUri(resource, conf)
+      } else {
+        SparkLauncher.NO_RESOURCE
+      }
     } else {
       resource
-    }
+    }
   }
 
   def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
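For orientation, a short sketch of the reworked helper's contract as used by the DriverCommandFeatureStep changes further down; the paths and the SparkConf value are placeholders, not part of the commit:

// A local, resolvable primary resource is either uploaded or blanked out.
KubernetesUtils.renameMainAppResource(
  resource = "/opt/app/main.py", conf = Some(new SparkConf()), shouldUploadLocal = true)
//   -> uploads main.py via uploadFileUri and returns its remote URI
KubernetesUtils.renameMainAppResource(
  resource = "/opt/app/main.jar", shouldUploadLocal = false)
//   -> returns SparkLauncher.NO_RESOURCE (the jar is already on spark.jars)
// Anything that is not local-and-resolvable is returned unchanged.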

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala

Lines changed: 1 addition & 1 deletion
@@ -157,7 +157,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
       MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
     // try upload local, resolvable files to a hadoop compatible file system
-    Seq(JARS, FILES).foreach { key =>
+    Seq(JARS, FILES, SUBMIT_PYTHON_FILES).foreach { key =>
       val value = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
       val resolved = KubernetesUtils.uploadAndTransformFileUris(value, Some(conf.sparkConf))
       if (resolved.nonEmpty) {

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala

Lines changed: 10 additions & 3 deletions
@@ -62,7 +62,11 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
   }
 
   private def configureForJava(pod: SparkPod, res: String): SparkPod = {
-    val driverContainer = baseDriverContainer(pod, res).build()
+    // re-write primary resource, app jar is also added to spark.jars by default in SparkSubmit
+    // no uploading takes place here
+    val newResName = KubernetesUtils
+      .renameMainAppResource(resource = res, shouldUploadLocal = false)
+    val driverContainer = baseDriverContainer(pod, newResName).build()
     SparkPod(pod.pod, driverContainer)
   }
 
@@ -73,7 +77,10 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
       .withValue(conf.get(PYSPARK_MAJOR_PYTHON_VERSION))
       .build())
 
-    val pythonContainer = baseDriverContainer(pod, res)
+    // re-write primary resource to be the remote one and upload the related file
+    val newResName = KubernetesUtils
+      .renameMainAppResource(res, Option(conf.sparkConf), true)
+    val pythonContainer = baseDriverContainer(pod, newResName)
       .addAllToEnv(pythonEnvs.asJava)
       .build()
 
@@ -88,7 +95,7 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
   private def baseDriverContainer(pod: SparkPod, resource: String): ContainerBuilder = {
     // re-write primary resource, app jar is also added to spark.jars by default in SparkSubmit
     val resolvedResource = if (conf.mainAppResource.isInstanceOf[JavaMainAppResource]) {
-      KubernetesUtils.renameMainAppResource(resource, conf.sparkConf)
+      KubernetesUtils.renameMainAppResource(resource, Option(conf.sparkConf), false)
     } else {
       resource
     }

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala

Lines changed: 82 additions & 29 deletions
@@ -120,16 +120,18 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       .endSpec()
       .build()
 
-    kubernetesTestComponents
+    // try until the service from a previous test is deleted
+    Eventually.eventually(TIMEOUT, INTERVAL) (kubernetesTestComponents
       .kubernetesClient
       .services()
-      .create(minioService)
+      .create(minioService))
 
-    kubernetesTestComponents
+    // try until the stateful set of a previous test is deleted
+    Eventually.eventually(TIMEOUT, INTERVAL) (kubernetesTestComponents
       .kubernetesClient
       .apps()
       .statefulSets()
-      .create(minioStatefulSet)
+      .create(minioStatefulSet))
   }
 
   private def deleteMinioStorage(): Unit = {
@@ -138,43 +140,56 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       .apps()
       .statefulSets()
       .withName(cName)
+      .withGracePeriod(0)
       .delete()
 
     kubernetesTestComponents
       .kubernetesClient
       .services()
       .withName(svcName)
+      .withGracePeriod(0)
       .delete()
   }
 
-  test("Launcher client dependencies", k8sTestTag, MinikubeTag) {
+  test("Launcher java client dependencies", k8sTestTag, MinikubeTag) {
     val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
-    try {
-      setupMinioStorage()
-      val minioUrlStr = getServiceUrl(svcName)
-      val minioUrl = new URL(minioUrlStr)
-      val minioHost = minioUrl.getHost
-      val minioPort = minioUrl.getPort
-      val examplesJar = Utils.getExamplesJarAbsolutePath(sparkHomeDir)
-      sparkAppConf
-        .set("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
-        .set("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
-        .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
-        .set("spark.hadoop.fs.s3a.endpoint", s"$minioHost:$minioPort")
-        .set("spark.kubernetes.file.upload.path", s"s3a://$BUCKET")
-        .set("spark.files", s"$HOST_PATH/$fileName")
-        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
-        .set("spark.jars.packages", "com.amazonaws:aws-java-sdk:" +
-          "1.7.4,org.apache.hadoop:hadoop-aws:2.7.6")
-        .set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
-      createS3Bucket(ACCESS_KEY, SECRET_KEY, minioUrlStr)
+    tryDepsTest({
+      val examplesJar = Utils.getTestFileAbsolutePath(Utils.getExamplesJarName(), sparkHomeDir)
+      sparkAppConf.set("spark.files", s"$HOST_PATH/$fileName")
       runSparkRemoteCheckAndVerifyCompletion(appResource = examplesJar,
         appArgs = Array(fileName),
-        timeout = Option(DEPS_TIMEOUT))
-    } finally {
-      // make sure this always runs
-      deleteMinioStorage()
-    }
+        timeout = Option(DEPS_TIMEOUT))})
+  }
+
+  test("Launcher python client dependencies using a python file", k8sTestTag, MinikubeTag) {
+    val depsFile = Utils.getTestFileAbsolutePath("py_container_checks.py", sparkHomeDir)
+    testPythonDeps(depsFile)
+  }
+
+  test("Launcher python client dependencies using a zip file", k8sTestTag, MinikubeTag) {
+    val inDepsFile = Utils.getTestFileAbsolutePath("py_container_checks.py", sparkHomeDir)
+    val outDepsFile = s"${inDepsFile.substring(0, inDepsFile.lastIndexOf("."))}.zip"
+    Utils.createZipFile(inDepsFile, outDepsFile)
+    testPythonDeps(outDepsFile)
+  }
+
+  private def testPythonDeps(depsFile: String): Unit = {
+    tryDepsTest({
+      val pySparkFiles = Utils.getTestFileAbsolutePath("pyfiles.py", sparkHomeDir)
+      setPythonSparkConfProperties(sparkAppConf)
+      runSparkApplicationAndVerifyCompletion(
+        appResource = pySparkFiles,
+        mainClass = "",
+        expectedLogOnCompletion = Seq(
+          "Python runtime version check is: True",
+          "Python environment version check is: True",
+          "Python runtime version check for executor is: True"),
+        appArgs = Array("python3"),
+        driverPodChecker = doBasicDriverPyPodCheck,
+        executorPodChecker = doBasicExecutorPyPodCheck,
+        appLocator = appLocator,
+        isJVM = false,
+        pyFiles = Option(depsFile)) })
   }
 
   private def extractS3Key(data: String, key: String): String = {
@@ -218,6 +233,44 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       url
     }
   }
+
+  private def getServiceHostAndPort(minioUrlStr : String) : (String, Int) = {
+    val minioUrl = new URL(minioUrlStr)
+    (minioUrl.getHost, minioUrl.getPort)
+  }
+
+  private def setCommonSparkConfPropertiesForS3Access(
+      conf: SparkAppConf,
+      minioUrlStr: String): Unit = {
+    val (minioHost, minioPort) = getServiceHostAndPort(minioUrlStr)
+    conf.set("spark.hadoop.fs.s3a.access.key", ACCESS_KEY)
+      .set("spark.hadoop.fs.s3a.secret.key", SECRET_KEY)
+      .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
+      .set("spark.hadoop.fs.s3a.endpoint", s"$minioHost:$minioPort")
+      .set("spark.kubernetes.file.upload.path", s"s3a://$BUCKET")
+      .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
+      .set("spark.jars.packages", "com.amazonaws:aws-java-sdk:" +
+        "1.7.4,org.apache.hadoop:hadoop-aws:2.7.6")
+      .set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
+  }
+
+  private def setPythonSparkConfProperties(conf: SparkAppConf): Unit = {
+    sparkAppConf.set("spark.kubernetes.container.image", pyImage)
+      .set("spark.kubernetes.pyspark.pythonVersion", "3")
+  }
+
+  private def tryDepsTest(runTest: => Unit): Unit = {
+    try {
+      setupMinioStorage()
+      val minioUrlStr = getServiceUrl(svcName)
+      createS3Bucket(ACCESS_KEY, SECRET_KEY, minioUrlStr)
+      setCommonSparkConfPropertiesForS3Access(sparkAppConf, minioUrlStr)
+      runTest
+    } finally {
+      // make sure this always runs
      deleteMinioStorage()
+    }
+  }
 }
 
 private[spark] object DepsTestsSuite {

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala

Lines changed: 22 additions & 9 deletions
@@ -16,14 +16,16 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest
 
-import java.io.{Closeable, File, PrintWriter}
+import java.io.{Closeable, File, FileInputStream, FileOutputStream, PrintWriter}
 import java.nio.file.{Files, Path}
 import java.util.concurrent.CountDownLatch
+import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.client.dsl.ExecListener
 import okhttp3.Response
+import org.apache.commons.compress.utils.IOUtils
 import org.apache.commons.io.output.ByteArrayOutputStream
 
 import org.apache.spark.{SPARK_VERSION, SparkException}
@@ -111,24 +113,35 @@ object Utils extends Logging {
     filename
   }
 
-  def getExamplesJarAbsolutePath(sparkHomeDir: Path): String = {
-    val jarName = getExamplesJarName()
-    val jarPathsFound = Files
+  def getTestFileAbsolutePath(fileName: String, sparkHomeDir: Path): String = {
+    val filePathsFound = Files
       .walk(sparkHomeDir)
       .filter(Files.isRegularFile(_))
-      .filter((f: Path) => {f.toFile.getName == jarName})
+      .filter((f: Path) => {f.toFile.getName == fileName})
     // we should not have more than one here under current test build dir
     // we only need one though
-    val jarPath = jarPathsFound
+    val filePath = filePathsFound
       .iterator()
       .asScala
       .map(_.toAbsolutePath.toString)
       .toArray
       .headOption
-    jarPath match {
-      case Some(jar) => jar
-      case _ => throw new SparkException(s"No valid $jarName file was found " +
+    filePath match {
+      case Some(file) => file
+      case _ => throw new SparkException(s"No valid $fileName file was found " +
         s"under spark home test dir ${sparkHomeDir.toAbsolutePath}!")
     }
   }
+
+  def createZipFile(inFile: String, outFile: String): Unit = {
+    val fileToZip = new File(inFile)
+    val fis = new FileInputStream(fileToZip)
+    val fos = new FileOutputStream(outFile)
+    val zipOut = new ZipOutputStream(fos)
+    val zipEntry = new ZipEntry(fileToZip.getName)
+    zipOut.putNextEntry(zipEntry)
+    IOUtils.copy(fis, zipOut)
+    IOUtils.closeQuietly(fis)
+    IOUtils.closeQuietly(zipOut)
+  }
 }
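As a usage note, the new zip helper is exercised by the "zip file" test in DepsTestsSuite above; a minimal sketch of the call pattern, with sparkHomeDir assumed to point at the test build's Spark home:

// Hypothetical caller, mirroring DepsTestsSuite: locate the test .py file under
// the Spark home test dir, zip it next to itself, and pass the zip as --py-files.
val inDepsFile = Utils.getTestFileAbsolutePath("py_container_checks.py", sparkHomeDir)
val outDepsFile = s"${inDepsFile.substring(0, inDepsFile.lastIndexOf("."))}.zip"
Utils.createZipFile(inDepsFile, outDepsFile)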
