
Commit f8df227 (committed)

support python deps

1 parent 5a482e7 commit f8df227

File tree: 7 files changed, +128 -23 lines changed


core/src/main/scala/org/apache/spark/SparkContext.scala

Lines changed: 2 additions & 2 deletions

@@ -1522,8 +1522,8 @@ class SparkContext(config: SparkConf) extends Logging {
     val schemeCorrectedPath = uri.getScheme match {
       case null => new File(path).getCanonicalFile.toURI.toString
       case "local" =>
-        logWarning("File with 'local' scheme is not supported to add to file server, since " +
-          "it is already available on every node.")
+        logWarning(s"File with 'local' scheme $path is not supported to add to file server, " +
+          s"since it is already available on every node.")
         return
       case _ => path
     }
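
A minimal usage sketch of the changed branch, assuming a plain local-mode job (the path below is illustrative): adding a 'local'-scheme file is still skipped, but the warning now names the offending path.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("demo"))
// 'local' scheme means "already present on every node", so nothing is shipped;
// after this change the warning includes the path itself:
//   File with 'local' scheme local:///opt/deps/helper.py is not supported to add
//   to file server, since it is already available on every node.
sc.addFile("local:///opt/deps/helper.py")
sc.stop()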

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 1 addition & 0 deletions

@@ -390,6 +390,7 @@ private[spark] class SparkSubmit extends Logging {
         downloadFileList(_, targetDir, sparkConf, hadoopConf, secMgr)
       }.orNull
       args.files = renameResourcesToLocalFS(args.files, localFiles)
+      args.pyFiles = renameResourcesToLocalFS(args.pyFiles, localPyFiles)
     }
   }
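
The helper used here is private to SparkSubmit, so as a hedged, self-contained sketch of its intent: each original URI is matched to a downloaded copy by file name, and the local copy wins when one exists. The new line simply runs args.pyFiles through the same mapping that args.files already got.

import java.net.URI

// illustrative stand-in for SparkSubmit's private renameResourcesToLocalFS
def renameToLocalFS(resources: String, localResources: String): String = {
  if (resources == null || localResources == null) {
    resources
  } else {
    def name(uri: String): String = new URI(uri).getPath.split('/').last
    val local = localResources.split(",").map(_.trim).filter(_.nonEmpty)
    resources.split(",").map(_.trim).filter(_.nonEmpty)
      .map(r => local.find(l => name(l) == name(r)).getOrElse(r))
      .mkString(",")
  }
}

// renameToLocalFS("hdfs://nn/deps.zip", "file:/tmp/spark-123/deps.zip")
//   => "file:/tmp/spark-123/deps.zip"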

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/KubernetesUtils.scala

Lines changed: 10 additions & 3 deletions

@@ -259,12 +259,19 @@ private[spark] object KubernetesUtils extends Logging {
     isLocalDependency(Utils.resolveURI(resource))
   }
 
-  def renameMainAppResource(resource: String, conf: SparkConf): String = {
+  def renameMainAppResource(
+      resource: String,
+      conf: Option[SparkConf] = None,
+      isJava: Boolean = true): String = {
     if (isLocalAndResolvable(resource)) {
-      SparkLauncher.NO_RESOURCE
+      if (isJava) {
+        SparkLauncher.NO_RESOURCE
+      } else {
+        uploadFileUri(resource, conf)
+      }
     } else {
       resource
-   }
+    }
   }
 
   def uploadFileUri(uri: String, conf: Option[SparkConf] = None): String = {
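
A usage sketch for the new signature (paths and the upload target are illustrative; sparkConf is assumed to carry spark.kubernetes.file.upload.path):

// Java apps keep the old behaviour: a local, resolvable jar collapses to
// SparkLauncher.NO_RESOURCE, since SparkSubmit already puts it on spark.jars.
val javaRes = KubernetesUtils.renameMainAppResource("file:///opt/app/app.jar")

// Python apps upload the file and use the remote URI as the primary resource.
val pyRes = KubernetesUtils.renameMainAppResource(
  "file:///opt/app/main.py", Some(sparkConf), isJava = false)
// pyRes would then look like s3a://bucket/spark-upload-<uuid>/main.py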

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/BasicDriverFeatureStep.scala

Lines changed: 1 addition & 2 deletions

@@ -160,7 +160,7 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
       KUBERNETES_DRIVER_SUBMIT_CHECK.key -> "true",
       MEMORY_OVERHEAD_FACTOR.key -> overheadFactor.toString)
     // try upload local, resolvable files to a hadoop compatible file system
-    Seq(JARS, FILES).foreach { key =>
+    Seq(JARS, FILES, SUBMIT_PYTHON_FILES).foreach { key =>
       val value = conf.get(key).filter(uri => KubernetesUtils.isLocalAndResolvable(uri))
       val resolved = KubernetesUtils.uploadAndTransformFileUris(value, Some(conf.sparkConf))
       if (resolved.nonEmpty) {
@@ -170,4 +170,3 @@ private[spark] class BasicDriverFeatureStep(conf: KubernetesDriverConf)
     additionalProps.toMap
   }
 }
-
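
An illustrative before/after of what adding SUBMIT_PYTHON_FILES to this loop means, assuming spark.kubernetes.file.upload.path points at s3a://bucket (the uuid is a placeholder, not real output):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.submit.pyFiles", "/home/me/deps.zip")
  .set("spark.kubernetes.file.upload.path", "s3a://bucket")
// after this step runs, additionalProps would carry roughly:
//   "spark.submit.pyFiles" -> "s3a://bucket/spark-upload-<uuid>/deps.zip"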

resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/features/DriverCommandFeatureStep.scala

Lines changed: 8 additions & 9 deletions

@@ -62,7 +62,10 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
   }
 
   private def configureForJava(pod: SparkPod, res: String): SparkPod = {
-    val driverContainer = baseDriverContainer(pod, res).build()
+    // re-write primary resource, app jar is also added to spark.jars by default in SparkSubmit
+    // no uploading takes place here
+    val newResName = KubernetesUtils.renameMainAppResource(res)
+    val driverContainer = baseDriverContainer(pod, newResName).build()
     SparkPod(pod.pod, driverContainer)
   }
 
@@ -73,7 +76,9 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
       .withValue(conf.get(PYSPARK_MAJOR_PYTHON_VERSION))
       .build())
 
-    val pythonContainer = baseDriverContainer(pod, res)
+    // re-write primary resource to be the remote one and upload the related file
+    val newResName = KubernetesUtils.renameMainAppResource(res, Some(conf.sparkConf), false)
+    val pythonContainer = baseDriverContainer(pod, newResName)
       .addAllToEnv(pythonEnvs.asJava)
       .build()
 
@@ -86,17 +91,11 @@ private[spark] class DriverCommandFeatureStep(conf: KubernetesDriverConf)
   }
 
   private def baseDriverContainer(pod: SparkPod, resource: String): ContainerBuilder = {
-    // re-write primary resource, app jar is also added to spark.jars by default in SparkSubmit
-    val resolvedResource = if (conf.mainAppResource.isInstanceOf[JavaMainAppResource]) {
-      KubernetesUtils.renameMainAppResource(resource, conf.sparkConf)
-    } else {
-      resource
-    }
     new ContainerBuilder(pod.container)
       .addToArgs("driver")
       .addToArgs("--properties-file", SPARK_CONF_PATH)
      .addToArgs("--class", conf.mainClass)
-      .addToArgs(resolvedResource)
+      .addToArgs(resource)
       .addToArgs(conf.appArgs: _*)
   }
 }
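
With the rename hoisted into configureForJava/configureForPython, baseDriverContainer just emits whatever resource it is handed. A minimal sketch of the resulting driver args using fabric8's ContainerBuilder (all values illustrative):

import io.fabric8.kubernetes.api.model.ContainerBuilder

val driver = new ContainerBuilder()
  .addToArgs("driver")
  .addToArgs("--properties-file", "/opt/spark/conf/spark.properties")
  .addToArgs("--class", "org.apache.spark.deploy.PythonRunner")
  .addToArgs("s3a://bucket/spark-upload-1234/main.py") // already rewritten upstream
  .addToArgs("python")
  .build()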

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala

Lines changed: 66 additions & 6 deletions

@@ -125,16 +125,18 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       .endSpec()
       .build()
 
-    kubernetesTestComponents
+    // try until the service from a previous test is deleted
+    Eventually.eventually(TIMEOUT, INTERVAL) (kubernetesTestComponents
       .kubernetesClient
       .services()
-      .create(cephService)
+      .create(cephService))
 
-    kubernetesTestComponents
+    // try until the stateful set of a previous test is deleted
+    Eventually.eventually(TIMEOUT, INTERVAL) (kubernetesTestComponents
       .kubernetesClient
       .apps()
       .statefulSets()
-      .create(cephStatefulSet)
+      .create(cephStatefulSet))
   }
 
   private def deleteCephStorage(): Unit = {
@@ -143,24 +145,26 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
       .apps()
       .statefulSets()
       .withName(cName)
+      .withGracePeriod(0)
       .delete()
 
     kubernetesTestComponents
       .kubernetesClient
       .services()
       .withName(svcName)
+      .withGracePeriod(0)
       .delete()
   }
 
-  test("Launcher client dependencies", k8sTestTag, MinikubeTag) {
+  test("Launcher java client dependencies", k8sTestTag, MinikubeTag) {
     val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
     try {
       setupCephStorage()
       val cephUrlStr = getServiceUrl(svcName)
       val cephUrl = new URL(cephUrlStr)
       val cephHost = cephUrl.getHost
       val cephPort = cephUrl.getPort
-      val examplesJar = Utils.getExamplesJarAbsolutePath(sparkHomeDir)
+      val examplesJar = Utils.getTestFileAbsolutePath(Utils.getExamplesJarName(), sparkHomeDir)
       val (accessKey, secretKey) = getCephCredentials()
       sparkAppConf
         .set("spark.hadoop.fs.s3a.access.key", accessKey)
@@ -183,6 +187,62 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
     }
   }
 
+  test("Launcher python client dependencies using py", k8sTestTag, MinikubeTag) {
+    val depsFile = Utils.getTestFileAbsolutePath("py_container_checks.py", sparkHomeDir)
+    testPythonDeps(depsFile)
+  }
+
+  test("Launcher python client dependencies using a zip file", k8sTestTag, MinikubeTag) {
+    val inDepsFile = Utils.getTestFileAbsolutePath("py_container_checks.py", sparkHomeDir)
+    val outDepsFile = s"${inDepsFile.substring(0, inDepsFile.lastIndexOf("."))}.zip"
+    Utils.createZipFile(inDepsFile, outDepsFile)
+    testPythonDeps(outDepsFile)
+  }
+
+  private def testPythonDeps(depsFile: String): Unit = {
+    try {
+      setupCephStorage()
+      val cephUrlStr = getServiceUrl(svcName)
+      val cephUrl = new URL(cephUrlStr)
+      val cephHost = cephUrl.getHost
+      val cephPort = cephUrl.getPort
+      val examplesJar = Utils.getTestFileAbsolutePath(Utils.getExamplesJarName(), sparkHomeDir)
+
+      val (accessKey, secretKey) = getCephCredentials()
+      sparkAppConf
+        .set("spark.kubernetes.container.image", pyImage)
+        .set("spark.kubernetes.pyspark.pythonVersion", "3")
+        .set("spark.hadoop.fs.s3a.access.key", accessKey)
+        .set("spark.hadoop.fs.s3a.secret.key", secretKey)
+        .set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
+        .set("spark.hadoop.fs.s3a.endpoint", s"$cephHost:$cephPort")
+        .set("spark.kubernetes.file.upload.path", s"s3a://$bucket")
+        .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
+        .set("spark.jars.packages", "com.amazonaws:aws-java-sdk:" +
+          "1.7.4,org.apache.hadoop:hadoop-aws:2.7.6")
+        .set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
+      createS3Bucket(accessKey, secretKey, cephUrlStr)
+      val pySparkFiles = Utils.getTestFileAbsolutePath("pyfiles.py", sparkHomeDir)
+      val pyContainerChecks = depsFile
+      runSparkApplicationAndVerifyCompletion(
+        appResource = pySparkFiles,
+        mainClass = "",
+        expectedLogOnCompletion = Seq(
+          "Python runtime version check is: True",
+          "Python environment version check is: True",
+          "Python runtime version check for executor is: True"),
+        appArgs = Array("python"),
+        driverPodChecker = doBasicDriverPyPodCheck,
+        executorPodChecker = doBasicExecutorPyPodCheck,
+        appLocator = appLocator,
+        isJVM = false,
+        pyFiles = Some(pyContainerChecks))
+    } finally {
+      // make sure this always runs
+      deleteCephStorage()
+    }
+  }
+
   // There isn't a cleaner way to get the credentials
   // when ceph-nano runs on k8s
   private def getCephCredentials(): (String, String) = {
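
The setup/teardown retries above lean on ScalaTest's Eventually; a self-contained sketch of the pattern, with TIMEOUT/INTERVAL spans assumed to resemble what the surrounding suite defines:

import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Seconds, Span}

val TIMEOUT = PatienceConfiguration.Timeout(Span(2, Minutes))
val INTERVAL = PatienceConfiguration.Interval(Span(10, Seconds))

// eventually() re-runs the block on any exception (e.g. "already exists"
// while the previous test's object is still terminating), sleeping INTERVAL
// between attempts and giving up once TIMEOUT elapses.
val attempt = Eventually.eventually(TIMEOUT, INTERVAL) {
  val ready = System.currentTimeMillis() % 3 == 0 // stand-in readiness check
  require(ready, "resource not ready yet")
  "created"
}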

resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/Utils.scala

Lines changed: 40 additions & 1 deletion

@@ -16,14 +16,16 @@
  */
 package org.apache.spark.deploy.k8s.integrationtest
 
-import java.io.{Closeable, File, PrintWriter}
+import java.io.{Closeable, File, FileInputStream, FileOutputStream, PrintWriter}
 import java.nio.file.{Files, Path}
 import java.util.concurrent.CountDownLatch
+import java.util.zip.{ZipEntry, ZipOutputStream}
 
 import scala.collection.JavaConverters._
 
 import io.fabric8.kubernetes.client.dsl.ExecListener
 import okhttp3.Response
+import org.apache.commons.compress.utils.IOUtils
 import org.apache.commons.io.output.ByteArrayOutputStream
 
 import org.apache.spark.{SPARK_VERSION, SparkException}
@@ -131,4 +133,41 @@ object Utils extends Logging {
         s"under spark home test dir ${sparkHomeDir.toAbsolutePath}!")
     }
   }
+
+  def getTestFileAbsolutePath(fileName: String, sparkHomeDir: Path): String = {
+    val filePathsFound = Files
+      .walk(sparkHomeDir)
+      .filter(Files.isRegularFile(_))
+      .filter((f: Path) => f.toFile.getName == fileName)
+    // we should not have more than one match under the current test build dir,
+    // and we only need one
+    val filePath = filePathsFound
+      .iterator()
+      .asScala
+      .map(_.toAbsolutePath.toString)
+      .toArray
+      .headOption
+    filePath match {
+      case Some(file) => file
+      case _ => throw new SparkException(s"No valid $fileName file was found " +
+        s"under spark home test dir ${sparkHomeDir.toAbsolutePath}!")
+    }
+  }
+
+  def createZipFile(inFile: String, outFile: String): Unit = {
+    try {
+      val fileToZip = new File(inFile)
+      val fis = new FileInputStream(fileToZip)
+      val fos = new FileOutputStream(outFile)
+      val zipOut = new ZipOutputStream(fos)
+      val zipEntry = new ZipEntry(fileToZip.getName)
+      zipOut.putNextEntry(zipEntry)
+      IOUtils.copy(fis, zipOut)
+      IOUtils.closeQuietly(fis)
+      IOUtils.closeQuietly(zipOut)
+    } catch {
+      case e: Exception => log.error(s"Failed to zip file: $inFile"); throw e
+    }
+  }
 }
+
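
A short usage sketch for the two new helpers, mirroring how the zip test exercises them (paths are illustrative):

import java.nio.file.Paths

val sparkHome = Paths.get("/opt/spark")
val pyFile = Utils.getTestFileAbsolutePath("py_container_checks.py", sparkHome)
val zipFile = s"${pyFile.stripSuffix(".py")}.zip"
Utils.createZipFile(pyFile, zipFile)
// zipFile now holds a single entry named py_container_checks.py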
