
Commit e0179be

add zip pyspark archives in build or spark-submit

Parent: 31e8e06

File tree: 4 files changed, 84 additions and 15 deletions


core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 34 additions & 6 deletions
@@ -39,7 +39,7 @@ import org.apache.ivy.plugins.resolver.{ChainResolver, IBiblioResolver}
 
 import org.apache.spark.SPARK_VERSION
 import org.apache.spark.deploy.rest._
-import org.apache.spark.util.{ChildFirstURLClassLoader, MutableURLClassLoader, Utils}
+import org.apache.spark.util.{Utils, ChildFirstURLClassLoader, MutableURLClassLoader}
 
 /**
  * Whether to submit, kill, or request the status of an application.
@@ -328,12 +328,40 @@ object SparkSubmit {
       }
     }
 
-    // In yarn mode for a python app, if PYSPARK_ARCHIVES_PATH is in the user environment
-    // add pyspark archives to files that can be distributed with the job
-    if (args.isPython && clusterManager == YARN){
-      sys.env.get("PYSPARK_ARCHIVES_PATH").map { archives =>
-        args.files = mergeFileLists(args.files, Utils.resolveURIs(archives))
+    // In yarn mode for a python app, add pyspark archives to files
+    // that can be distributed with the job
+    if (args.isPython && clusterManager == YARN) {
+      var pyArchives: String = null
+      if (sys.env.contains("PYSPARK_ARCHIVES_PATH")) {
+        pyArchives = sys.env.get("PYSPARK_ARCHIVES_PATH").get
+      } else {
+        if (!sys.env.contains("SPARK_HOME")) {
+          printErrorAndExit("SPARK_HOME does not exist for python application in yarn mode.")
+        }
+        val pythonPath = new ArrayBuffer[String]
+        for (sparkHome <- sys.env.get("SPARK_HOME")) {
+          val pyLibPath = Seq(sparkHome, "python", "lib").mkString(File.separator)
+          val pyArchivesFile = new File(pyLibPath, "pyspark.zip")
+          if (!pyArchivesFile.exists()) {
+            val pySrc = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
+            Utils.zipRecursive(pySrc, pyArchivesFile)
+          }
+          pythonPath += pyArchivesFile.getAbsolutePath
+          pythonPath += Seq(pyLibPath, "py4j-0.8.2.1-src.zip").mkString(File.separator)
+        }
+        pyArchives = pythonPath.mkString(",")
       }
+
+      pyArchives = pyArchives.split(",").map( localPath=> {
+        val localURI = Utils.resolveURI(localPath)
+        if (localURI.getScheme != "local") {
+          args.files = mergeFileLists(args.files, localURI.toString)
+          (new Path(localPath)).getName
+        } else {
+          localURI.getPath.toString
+        }
+      }).mkString(File.pathSeparator)
+      sysProps("spark.submit.pyArchives") = pyArchives
     }
 
     // If we're running a R app, set the main class to our specific R runner
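The final map in this hunk does the path rewriting: every archive whose URI scheme is not "local" is appended to args.files (so YARN ships it to each container's working directory) and is then referenced by bare file name, while "local:" URIs keep their absolute path. Below is a minimal, self-contained sketch of that rewriting, using plain java.net.URI instead of Utils.resolveURI and two hypothetical input paths, and omitting the args.files bookkeeping:

import java.io.File
import java.net.URI

// Sketch only: mirrors the scheme check above with plain java.net.URI.
// The two input paths are hypothetical examples, not taken from the commit.
object PyArchivesRewriteSketch {
  def main(args: Array[String]): Unit = {
    val inputs = Seq(
      "/opt/spark/python/lib/pyspark.zip",                   // no scheme: shipped with the job
      "local:/opt/spark/python/lib/py4j-0.8.2.1-src.zip")    // "local": already present on every node
    val pyArchives = inputs.map { p =>
      val uri = new URI(p)
      if (uri.getScheme != "local") new File(p).getName      // distributed file, keep only its name
      else uri.getPath                                        // local file, keep its absolute path
    }.mkString(File.pathSeparator)
    println(pyArchives)  // pyspark.zip:/opt/spark/python/lib/py4j-0.8.2.1-src.zip (on Unix)
  }
}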

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 35 additions & 0 deletions
@@ -18,6 +18,7 @@
 package org.apache.spark.util
 
 import java.io._
+import java.util.zip._
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
@@ -1000,6 +1001,40 @@ private[spark] object Utils extends Logging {
     !fileInCanonicalDir.getCanonicalFile().equals(fileInCanonicalDir.getAbsoluteFile())
   }
 
+  /**
+   * recursively add files to the zip file
+   */
+  def addFilesToZip(parent: String, source: File, output: ZipOutputStream): Unit = {
+    if (source.isDirectory()) {
+      output.putNextEntry(new ZipEntry(parent + source.getName()))
+      for (file <- source.listFiles()) {
+        addFilesToZip(parent + source.getName + File.separator, file, output)
+      }
+    } else {
+      val in = new FileInputStream(source)
+      output.putNextEntry(new ZipEntry(parent + source.getName()))
+      val buf = new Array[Byte](8192)
+      var n = 0
+      while (n != -1) {
+        n = in.read(buf)
+        if (n != -1) {
+          output.write(buf, 0, n)
+        }
+      }
+      in.close()
+    }
+  }
+
+  /**
+   * zip source file to dest ZipFile
+   */
+  def zipRecursive(source: File, destZipFile: File) = {
+    val destOutput = new ZipOutputStream(new FileOutputStream(destZipFile))
+    addFilesToZip("", source, destOutput)
+    destOutput.flush()
+    destOutput.close()
+  }
+
   /**
    * Determines if a directory contains any files newer than cutoff seconds.
    *
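Because addFilesToZip starts with an empty parent, zipRecursive roots every entry at the source directory's own name, so zipping SPARK_HOME/python/pyspark produces entries such as pyspark/context.py. Utils is private[spark], so the helper can't be called from outside Spark; the hedged sketch below only inspects an archive it (or the sbt build below) has already produced, assuming it is run from a Spark checkout where python/lib/pyspark.zip exists:

import java.util.zip.ZipFile
import scala.collection.JavaConverters._

// Sketch only: lists the first few entries of python/lib/pyspark.zip.
// Assumes the working directory is SPARK_HOME and the archive already exists.
object ListPysparkZipEntries {
  def main(args: Array[String]): Unit = {
    val zip = new ZipFile("python/lib/pyspark.zip")
    try zip.entries().asScala.take(10).foreach(e => println(e.getName))  // e.g. pyspark/context.py
    finally zip.close()
  }
}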

project/SparkBuild.scala

Lines changed: 10 additions & 2 deletions
@@ -361,12 +361,20 @@ object PySparkAssembly
     // to be included in the assembly. We can't just add "python/" to the assembly's resource dir
     // list since that will copy unneeded / unwanted files.
     resourceGenerators in Compile <+= resourceManaged in Compile map { outDir: File =>
+      val src = new File(BuildCommons.sparkHome, "python/pyspark")
+
+      val zipFile = new File(BuildCommons.sparkHome , "python/lib/pyspark.zip")
+      IO.delete(zipFile)
+      def entries(f: File):List[File] =
+        f :: (if (f.isDirectory) IO.listFiles(f).toList.flatMap(entries(_)) else Nil)
+      IO.zip(entries(src).map(
+        d => (d, d.getAbsolutePath.substring(src.getParent.length +1))),
+        zipFile)
+
       val dst = new File(outDir, "pyspark")
       if (!dst.isDirectory()) {
         require(dst.mkdirs())
       }
-
-      val src = new File(BuildCommons.sparkHome, "python/pyspark")
       copy(src, dst)
     }
   )
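The sbt side builds the same python/lib/pyspark.zip at assembly time: entries(src) walks python/pyspark recursively, and each file is paired with an entry name obtained by stripping src's parent directory, so the archive layout matches what Utils.zipRecursive produces. A standalone sketch of that name computation, with a hypothetical checkout path standing in for sparkHome:

import java.io.File

// Sketch only: shows how substring(src.getParent.length + 1) turns absolute paths into
// archive entry names. The /home/user/spark path is a hypothetical SPARK_HOME.
object ZipEntryNameSketch {
  def main(args: Array[String]): Unit = {
    val src  = new File("/home/user/spark/python/pyspark")
    val file = new File(src, "rdd.py")
    val entryName = file.getAbsolutePath.substring(src.getParent.length + 1)
    println(entryName)  // pyspark/rdd.py (with the platform's separator)
  }
}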

yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 5 additions & 7 deletions
@@ -326,14 +326,12 @@ private[spark] class Client(
     distCacheMgr.setDistFilesEnv(env)
     distCacheMgr.setDistArchivesEnv(env)
 
-    // If PYSPARK_ARCHIVES_PATH is in the user environment, set PYTHONPATH to be passed
+    // if spark.submit.pyArchives is in sparkConf, set PYTHONPATH to be passed
     // on to the ApplicationMaster and the executors.
-    sys.env.get("PYSPARK_ARCHIVES_PATH").map { archives =>
-      // archives will be distributed to each machine's working directory, so strip the
-      // path prefix
-      val pythonPath = archives.split(",").map(p => (new Path(p)).getName).mkString(":")
-      env("PYTHONPATH") = pythonPath
-      sparkConf.setExecutorEnv("PYTHONPATH", pythonPath)
+    if (sparkConf.contains("spark.submit.pyArchives")) {
+      val archives = sparkConf.get("spark.submit.pyArchives")
+      env("PYTHONPATH") = archives
+      sparkConf.setExecutorEnv("PYTHONPATH", archives)
     }
 
     // Pick up any environment variables for the AM provided through spark.yarn.appMasterEnv.*
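With this change the YARN client no longer reads PYSPARK_ARCHIVES_PATH itself; it forwards whatever SparkSubmit stored under spark.submit.pyArchives verbatim into PYTHONPATH for the ApplicationMaster and the executors. A minimal sketch of that handoff, assuming spark-core on the classpath and using a hypothetical value for the conf key:

import org.apache.spark.SparkConf
import scala.collection.mutable

// Sketch only: reproduces the conf-to-environment handoff outside of Client.scala.
// "pyspark.zip:py4j-0.8.2.1-src.zip" is a hypothetical value for spark.submit.pyArchives.
object PyArchivesHandoffSketch {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf(false)
      .set("spark.submit.pyArchives", "pyspark.zip:py4j-0.8.2.1-src.zip")
    val env = mutable.Map[String, String]()             // stands in for the container launch env
    if (sparkConf.contains("spark.submit.pyArchives")) {
      val archives = sparkConf.get("spark.submit.pyArchives")
      env("PYTHONPATH") = archives
      sparkConf.setExecutorEnv("PYTHONPATH", archives)
    }
    println(env("PYTHONPATH"))                          // pyspark.zip:py4j-0.8.2.1-src.zip
  }
}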
