Commit d012cde

Update

1 parent c63f31f

6 files changed: +77 −38 lines

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 1 addition & 1 deletion
@@ -49,7 +49,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

   val pythonPath = PythonUtils.mergePythonPaths(
     PythonUtils.sparkPythonPath,
-    envVars.getOrElse("PYTHONPATH", sys.env.getOrElse("PYSPARK_ARCHIVES_PATH", "")),
+    envVars.getOrElse("PYTHONPATH", ""),
     sys.env.getOrElse("PYTHONPATH", ""))

   def create(): Socket = {
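
For context, PythonUtils.mergePythonPaths joins its non-empty arguments with the platform path separator, so after this change the worker's PYTHONPATH is built only from Spark's bundled python directory, the env passed in by the caller, and the submitter's shell, with no fallback to PYSPARK_ARCHIVES_PATH. A minimal sketch of that merge (the helper below is a stand-in, not the Spark implementation; the example values are made up):

import java.io.File

// Stand-in for PythonUtils.mergePythonPaths (assumption: drop empty entries,
// join the rest with the platform path separator).
def mergePythonPaths(paths: String*): String =
  paths.filter(_.nonEmpty).mkString(File.pathSeparator)

val envVars = Map("SOME_VAR" -> "x")                 // hypothetical worker env, no PYTHONPATH
val merged = mergePythonPaths(
  "/opt/spark/python",                               // example sparkPythonPath
  envVars.getOrElse("PYTHONPATH", ""),               // empty, contributes nothing
  sys.env.getOrElse("PYTHONPATH", ""))               // whatever the submitter's shell exports
// e.g. "/opt/spark/python" or "/opt/spark/python:/home/user/libs" on Linux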

core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala

Lines changed: 0 additions & 1 deletion
@@ -52,7 +52,6 @@ object PythonRunner {
     pathElements ++= formattedPyFiles
     pathElements += PythonUtils.sparkPythonPath
     pathElements += sys.env.getOrElse("PYTHONPATH", "")
-    pathElements += sys.env.getOrElse("PYSPARK_ARCHIVES_PATH", "")
     val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*)

     // Launch Python process

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 15 additions & 0 deletions
@@ -356,6 +356,21 @@ object SparkSubmit {
       }
     }

+    if (args.isPython && clusterManager == YARN) {
+      // Zip PySpark from ${SPARK_HOME}/python/pyspark to ${SPARK_HOME}/lib/pyspark.zip
+      // and ship to executors by Yarn.
+      for (sparkHome <- sys.env.get("SPARK_HOME")) {
+        val srcFile = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
+        val archives = new File(Seq(sparkHome, "lib", "pyspark.zip").mkString(File.separator))
+        if (archives.exists() || Utils.createZipArchives(archives, srcFile, "pyspark")) {
+          val py4jPath = Seq(sparkHome, "python", "lib", "py4j-0.8.2.1-src.zip")
+            .mkString(File.separator)
+          args.files = mergeFileLists(args.files, Utils.resolveURIs(archives.getAbsolutePath),
+            py4jPath)
+        }
+      }
+    }
+
     // Special flag to avoid deprecation warnings at the client
     sysProps("SPARK_SUBMIT") = "true"
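
At submit time, then, spark-submit zips $SPARK_HOME/python/pyspark into $SPARK_HOME/lib/pyspark.zip (unless the zip already exists) and appends it, together with the bundled py4j zip, to args.files, so YARN ships both as ordinary localized files. A rough sketch of the resulting file list, using hypothetical stand-ins for mergeFileLists and Utils.resolveURIs and an example SPARK_HOME:

// Hypothetical stand-ins, only to show the shape of the value that lands in args.files.
def mergeFileLists(lists: String*): String =
  lists.filter(l => l != null && l.nonEmpty).mkString(",")

def resolveURIs(paths: String): String =              // assume bare local paths become file: URIs
  paths.split(",").map(p => if (p.contains(":")) p else "file:" + p).mkString(",")

val sparkHome = "/opt/spark"                          // example value
val archives  = sparkHome + "/lib/pyspark.zip"
val py4jPath  = sparkHome + "/python/lib/py4j-0.8.2.1-src.zip"

// With no user-supplied --files this evaluates to:
// "file:/opt/spark/lib/pyspark.zip,/opt/spark/python/lib/py4j-0.8.2.1-src.zip"
val files = mergeFileLists(null, resolveURIs(archives), py4jPath)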

core/src/main/scala/org/apache/spark/util/Utils.scala

Lines changed: 51 additions & 0 deletions
@@ -21,6 +21,7 @@ import java.io._
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
+import java.util.zip.{ZipEntry, ZipOutputStream}
 import java.util.{Properties, Locale, Random, UUID}
 import java.util.concurrent.{ThreadFactory, ConcurrentHashMap, Executors, ThreadPoolExecutor}
 import javax.net.ssl.HttpsURLConnection
@@ -2106,6 +2107,56 @@ private[spark] object Utils extends Logging {
       .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
   }

+  /**
+   * Create zip archives.
+   */
+  def createZipArchives(archives: File, srcFile: File, rootPath: String): Boolean = {
+    var flag = false
+    try {
+      val fileOutStream = new FileOutputStream(archives)
+      val buffOutStream = new BufferedOutputStream(fileOutStream)
+      val zipOutStream = new ZipOutputStream(buffOutStream)
+      flag = doZip(zipOutStream, rootPath, srcFile)
+      zipOutStream.close()
+      buffOutStream.close()
+      fileOutStream.close()
+
+    } catch {
+      case e: FileNotFoundException => logError("File to zip not found")
+    }
+    flag
+  }
+
+  private def doZip(zipOutStream: ZipOutputStream, curPath: String, file: File): Boolean = {
+    var flag = false
+    if (file.isDirectory) {
+      val files = file.listFiles()
+      if (files != null && files.length > 0) {
+        zipOutStream.putNextEntry(new ZipEntry(curPath + File.separator))
+        val nextPath = if (curPath.length == 0) "" else curPath + File.separator
+        for (subFile <- files) {
+          flag = doZip(zipOutStream, nextPath + subFile.getName, subFile)
+        }
+      }
+    } else {
+      zipOutStream.putNextEntry(new ZipEntry(curPath))
+      val fileInStream = new FileInputStream(file)
+      val buffInStream = new BufferedInputStream(fileInStream)
+      val bufSize = 8192
+      val buf = new Array[Byte](bufSize)
+      var len: Int = buffInStream.read(buf, 0, bufSize)
+      while (len != -1) {
+        zipOutStream.write(buf, 0, len)
+        len = buffInStream.read(buf, 0, bufSize)
+      }
+      zipOutStream.flush()
+      flag = true
+      buffInStream.close()
+      fileInStream.close()
+    }
+    flag
+  }
+
 }

 /**
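
The new Utils.createZipArchives helper is what SparkSubmit calls to package the pyspark sources. A hedged usage sketch (paths are examples; the method is private[spark], so the call only compiles from Spark's own code):

import java.io.File
// import org.apache.spark.util.Utils                 // private[spark] helper

val sparkHome = sys.env.getOrElse("SPARK_HOME", "/opt/spark")   // example fallback
val src  = new File(Seq(sparkHome, "python", "pyspark").mkString(File.separator))
val dest = new File(Seq(sparkHome, "lib", "pyspark.zip").mkString(File.separator))

// Entries are rooted at "pyspark/...", so adding the zip to PYTHONPATH exposes the
// package as `pyspark`; returns true on success.
// val ok = Utils.createZipArchives(dest, src, "pyspark")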

yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala

Lines changed: 10 additions & 30 deletions
@@ -248,7 +248,6 @@ private[spark] class Client(
       List(
         (SPARK_JAR, sparkJar(sparkConf), CONF_SPARK_JAR),
         (APP_JAR, args.userJar, CONF_SPARK_USER_JAR),
-        (PYSPARK_ARCHIVES, pysparkArchives(sparkConf), CONF_PYSPARK_ARCHIVES),
         ("log4j.properties", oldLog4jConf.orNull, null)
       ).foreach { case (destName, _localPath, confKey) =>
         val localPath: String = if (_localPath != null) _localPath.trim() else ""
@@ -381,19 +380,26 @@ private[spark] class Client(
    * This sets up the launch environment, java options, and the command for launching the AM.
    */
   private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
-    : ContainerLaunchContext = {
+      : ContainerLaunchContext = {
     logInfo("Setting up container launch context for our AM")

     val appId = newAppResponse.getApplicationId
     val appStagingDir = getAppStagingDir(appId)
     val localResources = prepareLocalResources(appStagingDir)
     val launchEnv = setupLaunchEnv(appStagingDir)
+
     // From SPARK-1920 and SPARK-1520 we know PySpark on Yarn can not work when the assembly jar are
     // package by JDK 1.7+, so we ship PySpark archives to executors as assembly jar, and add this
     // path to PYTHONPATH.
-    for ((resPath, res) <- localResources if resPath.contains(PYSPARK_ARCHIVES)) {
-      launchEnv("PYSPARK_ARCHIVES_PATH") = resPath
+    var pysparkArchives = new ArrayBuffer[String]()
+    for ((resLink, res) <- localResources) {
+      if (resLink.contains("pyspark") || resLink.contains("py4j")) {
+        pysparkArchives.+=(resLink)
+      }
     }
+    launchEnv("PYTHONPATH") = pysparkArchives.toArray.mkString(File.pathSeparator)
+    sparkConf.setExecutorEnv("PYTHONPATH", pysparkArchives.toArray.mkString(File.pathSeparator))
+
     val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
     amContainer.setLocalResources(localResources)
     amContainer.setEnvironment(launchEnv)
@@ -692,7 +698,6 @@ object Client extends Logging {
   // Alias for the Spark assembly jar, the user jar and PySpark archives
   val SPARK_JAR: String = "__spark__.jar"
   val APP_JAR: String = "__app__.jar"
-  val PYSPARK_ARCHIVES: String = "__pyspark__.zip"

   // URI scheme that identifies local resources
   val LOCAL_SCHEME = "local"
@@ -704,9 +709,6 @@ object Client extends Logging {
   val CONF_SPARK_JAR = "spark.yarn.jar"
   val ENV_SPARK_JAR = "SPARK_JAR"

-  // Location of any user-defined PySpark archives
-  val CONF_PYSPARK_ARCHIVES = "spark.pyspark.archives"
-
   // Internal config to propagate the location of the user's jar to the driver/executors
   val CONF_SPARK_USER_JAR = "spark.yarn.user.jar"

@@ -745,28 +747,6 @@ object Client extends Logging {
     }
   }

-  /**
-   * Find the user-defined PySpark archives if configured, or return default.
-   * The default pyspark.zip is in the same path with assembly jar.
-   */
-  private def pysparkArchives(conf: SparkConf): String = {
-    if (conf.contains(CONF_PYSPARK_ARCHIVES)) {
-      conf.get(CONF_PYSPARK_ARCHIVES)
-    } else {
-      SparkContext.jarOfClass(this.getClass) match {
-        case Some(jarPath) =>
-          val path = new File(jarPath)
-          val archives = new File(path.getParent + File.separator + "pyspark.zip")
-          if (archives.exists()) {
-            archives.getAbsolutePath
-          } else {
-            ""
-          }
-        case None => ""
-      }
-    }
-  }
-
   /**
    * Return the path to the given application's staging directory.
    */
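
On the YARN client side, every localized resource whose link name contains "pyspark" or "py4j" is now collected onto PYTHONPATH for the AM and, via setExecutorEnv, for the executors. A small sketch of that filtering and joining, with made-up resource link names:

import java.io.File
import scala.collection.mutable.ArrayBuffer

// Hypothetical link names, as YARN would expose localized resources in a
// container's working directory.
val resourceLinks = Seq("__spark__.jar", "__app__.jar", "pyspark.zip", "py4j-0.8.2.1-src.zip")

val pysparkArchives = new ArrayBuffer[String]()
for (link <- resourceLinks) {
  if (link.contains("pyspark") || link.contains("py4j")) {
    pysparkArchives += link
  }
}

// On Linux: "pyspark.zip:py4j-0.8.2.1-src.zip"
val pythonPath = pysparkArchives.mkString(File.pathSeparator)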

yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 0 additions & 6 deletions
@@ -299,12 +299,6 @@ class ExecutorRunnable(
     }

     System.getenv().filterKeys(_.startsWith("SPARK")).foreach { case (k, v) => env(k) = v }
-
-    // Add PySpark archives path
-    sys.env.get("PYSPARK_ARCHIVES_PATH") match {
-      case Some(pythonArchivesPath) => env("PYSPARK_ARCHIVES_PATH") = pythonArchivesPath
-      case None =>
-    }
     env
   }
 }
