diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 72123f2232532..b16cc962a27e9 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -114,7 +114,8 @@ class SparkEnv ( def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = { synchronized { val key = (pythonExec, envVars) - pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create() + pythonWorkers.getOrElseUpdate(key, + new PythonWorkerFactory(pythonExec, envVars, conf)).create() } } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 6afa37aa36fd3..d5e8c356491a4 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -21,6 +21,7 @@ import java.io.{DataInputStream, DataOutputStream, EOFException, InputStream, Ou import java.net.{InetAddress, ServerSocket, Socket, SocketException} import java.nio.charset.StandardCharsets import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import scala.collection.mutable @@ -30,7 +31,11 @@ import org.apache.spark.internal.Logging import org.apache.spark.security.SocketAuthHelper import org.apache.spark.util.{RedirectThread, Utils} -private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String]) + +private[spark] class PythonWorkerFactory( + pythonExec: String, + envVars: Map[String, String], + conf: SparkConf) extends Logging { import PythonWorkerFactory._ @@ -76,6 +81,14 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String val daemonWorkers = new mutable.WeakHashMap[Socket, Int]() val idleWorkers = new mutable.Queue[Socket]() var lastActivity = 0L + val virtualEnvEnabled = conf.getBoolean("spark.pyspark.virtualenv.enabled", false) + val virtualenvPythonExec = if (virtualEnvEnabled) { + val virtualEnvFactory = new VirtualEnvFactory(pythonExec, conf, false) + virtualEnvFactory.setupVirtualEnv() + } else { + pythonExec + } + new MonitorThread().start() var simpleWorkers = new mutable.WeakHashMap[Socket, Process]() @@ -144,7 +157,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1))) // Create and start the worker - val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", workerModule)) + val pb = new ProcessBuilder(Arrays.asList(virtualenvPythonExec, "-m", workerModule)) val workerEnv = pb.environment() workerEnv.putAll(envVars.asJava) workerEnv.put("PYTHONPATH", pythonPath) @@ -186,7 +199,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String try { // Create and start the daemon - val command = Arrays.asList(pythonExec, "-m", daemonModule) + val command = Arrays.asList(virtualenvPythonExec, "-m", daemonModule) val pb = new ProcessBuilder(command) val workerEnv = pb.environment() workerEnv.putAll(envVars.asJava) diff --git a/core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala b/core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala new file mode 100644 index 0000000000000..53cdfa2c319a6 --- /dev/null +++ 
b/core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.api.python + +import java.io.File +import java.util.{Map => JMap} +import java.util.Arrays +import java.util.concurrent.atomic.AtomicInteger + +import scala.collection.JavaConverters._ + +import com.google.common.io.Files + +import org.apache.spark.SparkConf +import org.apache.spark.internal.Logging + + +class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean) + extends Logging { + + private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native") + private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "") + private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages") + private var virtualEnvName: String = _ + private var virtualPythonExec: String = _ + private val VIRTUALENV_ID = new AtomicInteger() + private var isLauncher: Boolean = false + + // used by launcher when user want to use virtualenv in pyspark shell. Launcher need this class + // to create virtualenv for driver. + def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) { + this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver) + this.isLauncher = true + } + + /* + * Create virtualenv using native virtualenv or conda + * + */ + def setupVirtualEnv(): String = { + /* + * + * Native Virtualenv: + * - Execute command: virtualenv -p --no-site-packages + * - Execute command: python -m pip --cache-dir install -r + * + * Conda + * - Execute command: conda create --prefix --file -y + * + */ + logInfo("Start to setup virtualenv...") + logDebug("user.dir=" + System.getProperty("user.dir")) + logDebug("user.home=" + System.getProperty("user.home")) + + require(virtualEnvType == "native" || virtualEnvType == "conda", + s"VirtualEnvType: $virtualEnvType is not supported." ) + require(new File(virtualEnvBinPath).exists(), + s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.") + // Two scenarios of creating virtualenv: + // 1. created in yarn container. Yarn will clean it up after container is exited + // 2. created outside yarn container. Spark need to create temp directory and clean it after app + // finish. 
+ // - driver of PySpark shell + // - driver of yarn-client mode + if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + val virtualenvBasedir = Files.createTempDir() + virtualenvBasedir.deleteOnExit() + virtualEnvName = virtualenvBasedir.getAbsolutePath + } else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") { + virtualEnvName = "virtualenv_driver" + } else { + // use the working directory of Executor + virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement() + } + + // Use the absolute path of requirement file in the following cases + // 1. driver of pyspark shell + // 2. driver of yarn-client mode + // otherwise just use filename as it would be downloaded to the working directory of Executor + val pysparkRequirements = + if (isLauncher || + (isDriver && conf.get("spark.submit.deployMode") == "client")) { + conf.getOption("spark.pyspark.virtualenv.requirements") + } else { + conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last) + } + + val createEnvCommand = + if (virtualEnvType == "native") { + List(virtualEnvBinPath, + "-p", pythonExec, + "--no-site-packages", virtualEnvName) + } else { + // Two cases of conda + // 1. requirement file is specified. (Batch mode) + // 2. requirement file is not specified. (Interactive mode). + // In this case `spark.pyspark.virtualenv.python_version` must be specified. + + if (pysparkRequirements.isDefined) { + List(virtualEnvBinPath, + "create", "--prefix", virtualEnvName, + "--file", pysparkRequirements.get, "-y") + } else { + require(conf.contains("spark.pyspark.virtualenv.python_version"), + "spark.pyspark.virtualenv.python_version is not set when using conda " + + "in interactive mode") + val pythonVersion = conf.get("spark.pyspark.virtualenv.python_version") + List(virtualEnvBinPath, + "create", "--prefix", virtualEnvName, + "python=" + pythonVersion, "-y") + } + } + execCommand(createEnvCommand) + + virtualPythonExec = virtualEnvName + "/bin/python" + if (virtualEnvType == "native" && pysparkRequirements.isDefined) { + // requirement file for native is not mandatory, run this only when requirement file + // is specified. + execCommand(List(virtualPythonExec, "-m", "pip", + "--cache-dir", System.getProperty("user.home"), + "install", "-r", pysparkRequirements.get)) + } + // install additional packages + if (initPythonPackages.isDefined) { + execCommand(List(virtualPythonExec, "-m", "pip", + "install") ::: initPythonPackages.get.split(":").toList); + } + logInfo(s"virtualenv is created at to $virtualPythonExec") + virtualPythonExec + } + + private def execCommand(commands: List[String]): Unit = { + logInfo("Running command:" + commands.mkString(" ")) + val pb = new ProcessBuilder(commands.asJava) + // don't inheritIO when it is used in launcher, because launcher would capture the standard + // output to assemble the spark-submit command. 
+ if(!isLauncher) { + pb.inheritIO(); + } + // pip internally use environment variable `HOME` + pb.environment().put("HOME", System.getProperty("user.home")) + val proc = pb.start() + val exitCode = proc.waitFor() + if (exitCode != 0) { + throw new RuntimeException("Fail to run command: " + commands.mkString(" ")) + } + } +} diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index ccb30e205ca40..f07e30684281b 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer import scala.util.Try import org.apache.spark.{SparkConf, SparkUserAppException} -import org.apache.spark.api.python.PythonUtils +import org.apache.spark.api.python.{PythonUtils, VirtualEnvFactory} import org.apache.spark.internal.config._ import org.apache.spark.util.{RedirectThread, Utils} @@ -41,12 +41,17 @@ object PythonRunner { val otherArgs = args.slice(2, args.length) val sparkConf = new SparkConf() val secret = Utils.createSecret(sparkConf) - val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON) + var pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON) .orElse(sparkConf.get(PYSPARK_PYTHON)) .orElse(sys.env.get("PYSPARK_DRIVER_PYTHON")) .orElse(sys.env.get("PYSPARK_PYTHON")) .getOrElse("python") + if (sparkConf.getBoolean("spark.pyspark.virtualenv.enabled", false)) { + val virtualEnvFactory = new VirtualEnvFactory(pythonExec, sparkConf, true) + pythonExec = virtualEnvFactory.setupVirtualEnv() + } + // Format python file paths before adding them to the PYTHONPATH val formattedPythonFile = formatPath(pythonFile) val formattedPyFiles = resolvePyFiles(formatPaths(pyFiles)) diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index e83d82f847c61..f6227ea39c9a9 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -439,6 +439,19 @@ private[spark] class SparkSubmit extends Logging { sparkConf.set("spark.submit.pyFiles", localPyFiles) } + // for pyspark virtualenv + if (args.isPython) { + if (clusterManager != YARN && + args.sparkProperties.getOrElse("spark.pyspark.virtualenv.enabled", "false") == "true") { + printErrorAndExit("virtualenv is only supported in yarn mode") + } + if (args.sparkProperties.contains("spark.pyspark.virtualenv.requirements")) { + // distribute virtualenv requirement file as --files + args.files = mergeFileLists(args.files, + args.sparkProperties("spark.pyspark.virtualenv.requirements")) + } + } + // In YARN mode for an R app, add the SparkR package archive and the R package // archive containing all of the built R libraries to archives so that they can // be distributed with the job diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index d8794e8e551aa..5153b6a707035 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -24,7 +24,7 @@ import javax.annotation.concurrent.GuardedBy import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} import scala.concurrent.Future -import 
org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState} +import org.apache.spark._ import org.apache.spark.internal.Logging import org.apache.spark.rpc._ import org.apache.spark.scheduler._ @@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp private val reviveThread = ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread") - class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) + class DriverEndpoint(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { // Executors that have been lost, but for which we don't yet know the real exit reason. @@ -228,6 +228,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp context.reply(true) case RetrieveSparkAppConfig => + val sparkProperties = new ArrayBuffer[(String, String)] + for ((key, value) <- scheduler.sc.conf.getAll) { + if (key.startsWith("spark.")) { + sparkProperties += ((key, value)) + } + } val reply = SparkAppConfig( sparkProperties, SparkEnv.get.securityManager.getIOEncryptionKey(), @@ -380,24 +386,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp protected def minRegisteredRatio: Double = _minRegisteredRatio override def start() { - val properties = new ArrayBuffer[(String, String)] - for ((key, value) <- scheduler.sc.conf.getAll) { - if (key.startsWith("spark.")) { - properties += ((key, value)) - } - } - // TODO (prashant) send conf instead of properties - driverEndpoint = createDriverEndpointRef(properties) + driverEndpoint = createDriverEndpointRef() } - protected def createDriverEndpointRef( - properties: ArrayBuffer[(String, String)]): RpcEndpointRef = { - rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties)) + protected def createDriverEndpointRef(): RpcEndpointRef = { + rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint()) } - protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { - new DriverEndpoint(rpcEnv, properties) + protected def createDriverEndpoint(): DriverEndpoint = { + new DriverEndpoint(rpcEnv) } def stopExecutors() { diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md index 77aa083c4a584..a5d8c59a71825 100644 --- a/docs/submitting-applications.md +++ b/docs/submitting-applications.md @@ -218,6 +218,117 @@ These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries to executors.
+# VirtualEnv for PySpark
+For a simple PySpark application, `--py-files` is enough to ship its dependencies. A large PySpark application, however,
+usually has many dependencies, some of which have transitive dependencies of their own and some of which must be compiled
+before they can be installed. In such cases `--py-files` is not very convenient. Fortunately, the Python world has virtualenv/conda
+for creating isolated Python environments, and PySpark supports virtualenv as well (currently only in YARN mode).
+Users can use this feature in two scenarios:
+* Batch mode (submit the Spark app via spark-submit)
+* Interactive mode (PySpark shell or a third-party Spark notebook)
+
+## Prerequisites
+- Each node has virtualenv/conda and python-devel installed
+- Each node can access the internet (for downloading packages)
+
+## Batch Mode
+
+In batch mode, users need to specify the additional python packages before launching the Spark app. There are two ways to do so:
+* Provide a requirement file that lists all the packages for the virtualenv.
+* Specify packages via the Spark configuration `spark.pyspark.virtualenv.packages`.
+
+Here are several examples:
+
+{% highlight bash %}
+### Setup virtualenv using native virtualenv in yarn-client mode
+bin/spark-submit \
+  --master yarn \
+  --deploy-mode client \
+  --conf "spark.pyspark.virtualenv.enabled=true" \
+  --conf "spark.pyspark.virtualenv.type=native" \
+  --conf "spark.pyspark.virtualenv.requirements=<requirement-file>" \
+  --conf "spark.pyspark.virtualenv.bin.path=<virtualenv-bin-path>" \
+  <your-python-file>
+
+### Setup virtualenv using conda in yarn-client mode
+bin/spark-submit \
+  --master yarn \
+  --deploy-mode client \
+  --conf "spark.pyspark.virtualenv.enabled=true" \
+  --conf "spark.pyspark.virtualenv.type=conda" \
+  --conf "spark.pyspark.virtualenv.requirements=<requirement-file>" \
+  --conf "spark.pyspark.virtualenv.bin.path=<conda-bin-path>" \
+  <your-python-file>
+
+### Setup virtualenv using conda in yarn-client mode and specify packages via `spark.pyspark.virtualenv.packages`
+bin/spark-submit \
+  --master yarn \
+  --deploy-mode client \
+  --conf "spark.pyspark.virtualenv.enabled=true" \
+  --conf "spark.pyspark.virtualenv.type=conda" \
+  --conf "spark.pyspark.virtualenv.packages=numpy,pandas" \
+  --conf "spark.pyspark.virtualenv.bin.path=<conda-bin-path>" \
+  <your-python-file>
+{% endhighlight %}
+
+### How to create a requirement file?
+Before running a distributed PySpark job, you usually run it in a local environment first. It is encouraged to create your own virtualenv for the project so that you know exactly which packages it needs. Once you are confident with your work and want to move it to the cluster, run one of the following commands to generate the requirement file for virtualenv or conda:
+- pip freeze > requirements.txt
+- conda list --export > requirements.txt
+
+## Interactive Mode
+In interactive mode, users can install python packages at runtime instead of specifying them in a requirement file when submitting the Spark app.
+Here are several ways to install packages:
+
+{% highlight python %}
+sc.install_packages("numpy")              # install the latest numpy
+sc.install_packages("numpy==1.11.0")      # install a specific version of numpy
+sc.install_packages(["numpy", "pandas"])  # install multiple python packages
+{% endhighlight %}
+
+## How to specify the python version?
+Because native virtualenv and conda work differently, specifying the python version differs slightly between them.
+In batch mode, the python version for conda is specified in the requirement file, while for native virtualenv python itself is not included in the requirement file,
+so you specify it by setting `spark.pyspark.python` to the desired python executable. In interactive mode, no requirement file is needed; with conda,
+set `spark.pyspark.virtualenv.python_version` to choose the python version, and with native virtualenv still use `spark.pyspark.python`.
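+
+A quick way to confirm which interpreter the virtualenv actually provides (a minimal sketch; it assumes a PySpark shell that was already started with virtualenv enabled) is to compare the python version seen by the driver with the one seen by the executors:
+
+{% highlight python %}
+import sys
+
+# interpreter used by the driver's virtualenv
+print(sys.version)
+
+# interpreter used inside each executor's virtualenv
+print(sc.parallelize(range(2), 2).map(lambda _: sys.version).distinct().collect())
+{% endhighlight %}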
+
+## PySpark VirtualEnv Configurations
+
+<table class="table">
+<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
+<tr>
+  <td><code>spark.pyspark.virtualenv.enabled</code></td>
+  <td>false</td>
+  <td>Whether to enable virtualenv.</td>
+</tr>
+<tr>
+  <td><code>spark.pyspark.virtualenv.type</code></td>
+  <td>native</td>
+  <td>There are two ways to support virtualenv in PySpark: native virtualenv and conda. The default is native virtualenv.</td>
+</tr>
+<tr>
+  <td><code>spark.pyspark.virtualenv.requirements</code></td>
+  <td>(none)</td>
+  <td>Requirement file where the required packages are specified. Note that the requirement file formats of virtualenv and conda are not compatible with each other.</td>
+</tr>
+<tr>
+  <td><code>spark.pyspark.virtualenv.bin.path</code></td>
+  <td>(none)</td>
+  <td>Path of the virtualenv or conda executable on the cluster. virtualenv/conda must be installed at the same location on every node.</td>
+</tr>
+<tr>
+  <td><code>spark.pyspark.virtualenv.packages</code></td>
+  <td>(none)</td>
+  <td>Additional python packages to install when the virtualenv is created at runtime. Packages are separated by commas, and versions can be specified, e.g. "numpy==1.13,pandas".</td>
+</tr>
+</table>
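+
+Putting the pieces together, an interactive session might look like the following sketch. It assumes the shell was started in YARN mode with `spark.pyspark.virtualenv.enabled=true`, a valid `spark.pyspark.virtualenv.bin.path`, and virtualenv or conda available on every node:
+
+{% highlight python %}
+# install numpy into the driver's virtualenv and into the virtualenv of every current executor
+sc.install_packages("numpy")
+
+import numpy as np
+
+# numpy is now importable inside the executors' virtualenvs as well
+print(sc.parallelize(range(4), 2).map(lambda _: np.__version__).distinct().collect())
+{% endhighlight %}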
+
+## Penalty of virtualenv
+Each executor needs some time to set up the virtualenv (i.e. to install the packages), and the first time may be very slow. For example, the first installation of numpy on a node can take almost 3 minutes, because numpy has to be downloaded and compiled into a wheel; subsequent installations take only a few seconds, because numpy is installed from the cached wheel file.
+
+## Note
+Virtualenv support in PySpark is an experimental feature added in Spark 2.4.0 and may evolve in future versions.
+
 # More Information Once you have deployed your application, the [cluster mode overview](cluster-overview.html) describes diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java index cc65f78b45c30..2842dd580a55a 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java @@ -19,6 +19,7 @@ import java.io.File; import java.io.IOException; +import java.lang.reflect.Method; import java.util.*; import static org.apache.spark.launcher.CommandBuilderUtils.*; @@ -323,20 +324,34 @@ private List buildPySparkShellCommand(Map env) throws IO // 4. environment variable PYSPARK_PYTHON // 5. python List pyargs = new ArrayList<>(); - pyargs.add(firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), + String pythonExec = firstNonEmpty(conf.get(SparkLauncher.PYSPARK_DRIVER_PYTHON), conf.get(SparkLauncher.PYSPARK_PYTHON), System.getenv("PYSPARK_DRIVER_PYTHON"), System.getenv("PYSPARK_PYTHON"), - "python")); - String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); - if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) { - // pass conf spark.pyspark.python to python by environment variable. - env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON)); + "python"); + if (conf.getOrDefault("spark.pyspark.virtualenv.enabled", "false").equals("true")) { + try { + // setup virtualenv in launcher when virtualenv is enabled in pyspark shell + Class virtualEnvClazz = Class.forName("org.apache.spark.api.python.VirtualEnvFactory"); + Object virtualEnv = virtualEnvClazz.getConstructor(String.class, Map.class, Boolean.class) + .newInstance(pythonExec, conf, true); + Method virtualEnvMethod = virtualEnvClazz.getMethod("setupVirtualEnv"); + pythonExec = (String) virtualEnvMethod.invoke(virtualEnv); + pyargs.add(pythonExec); + } catch (Exception e) { + throw new IOException(e); + } + } else { + pyargs.add(pythonExec); + if (conf.containsKey(SparkLauncher.PYSPARK_PYTHON)) { + // pass conf spark.pyspark.python to python by environment variable.
+ env.put("PYSPARK_PYTHON", conf.get(SparkLauncher.PYSPARK_PYTHON)); + } } + String pyOpts = System.getenv("PYSPARK_DRIVER_PYTHON_OPTS"); if (!isEmpty(pyOpts)) { pyargs.addAll(parseOptionString(pyOpts)); } - return pyargs; } diff --git a/python/pyspark/context.py b/python/pyspark/context.py index ede3b6af0a8cf..abe5460c2d65e 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -28,7 +28,7 @@ from py4j.protocol import Py4JError -from pyspark import accumulators +from pyspark import accumulators, since from pyspark.accumulators import Accumulator from pyspark.broadcast import Broadcast, BroadcastPickleRegistry from pyspark.conf import SparkConf @@ -44,6 +44,7 @@ if sys.version > '3': xrange = range + basestring = unicode = str __all__ = ['SparkContext'] @@ -189,6 +190,14 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize, self._jsc.sc().register(self._javaAccumulator) self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python') + if self._conf.get("spark.pyspark.virtualenv.enabled") == "true": + self.pythonExec = self._conf.get("spark.pyspark.python", self.pythonExec) + virtualEnvBinPath = self._conf.get("spark.pyspark.virtualenv.bin.path") + + if virtualEnvBinPath is None: + raise Exception("spark.pyspark.virtualenv.enabled is set as true but no value for " + "spark.pyspark.virtualenv.bin.path") + self.pythonVer = "%d.%d" % sys.version_info[:2] # Broadcast's __reduce__ method stores Broadcast instances here. @@ -1035,6 +1044,46 @@ def getConf(self): conf.setAll(self._conf.getAll()) return conf + @since(2.4) + def install_packages(self, packages): + """ + Install python packages on all executors and driver through pip. pip will be installed + by default no matter using native virtualenv or conda. So it is guaranteed that pip is + available if virtualenv is enabled. + + .. note:: Experimental + .. versionadded:: 2.4 + + :param packages: string for single package or a list of string for multiple packages + """ + import functools + if self._conf.get("spark.pyspark.virtualenv.enabled") != "true": + raise RuntimeError("install_packages can only use called when " + "spark.pyspark.virtualenv.enabled is set as true") + if isinstance(packages, basestring): + packages = [packages] + # seems statusTracker.getExecutorInfos() will return driver + exeuctors, so -1 here. + num_executors = len(self._jsc.sc().statusTracker().getExecutorInfos()) - 1 + dummyRDD = self.parallelize(range(num_executors), num_executors) + + def _run_pip(packages, iterator): + import pip + return pip.main(["install"] + packages) + + # install package on driver first. if installation succeeded, continue the installation + # on executors, otherwise return directly. 
+ if _run_pip(packages, None) != 0: + return + + virtualenvPackages = self._conf.get("spark.pyspark.virtualenv.packages") + if virtualenvPackages: + self._conf.set("spark.pyspark.virtualenv.packages", virtualenvPackages + ":" + + ":".join(packages)) + else: + self._conf.set("spark.pyspark.virtualenv.packages", ":".join(packages)) + + dummyRDD.foreachPartition(functools.partial(_run_pip, packages)) + def _test(): import atexit diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala index fa6dc2c479bbf..1626d78598d03 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesClusterSchedulerBackend.scala @@ -122,12 +122,13 @@ private[spark] class KubernetesClusterSchedulerBackend( // Don't do anything else - let event handling from the Kubernetes API do the Spark changes } - override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { - new KubernetesDriverEndpoint(rpcEnv, properties) + override def createDriverEndpoint(): DriverEndpoint = { + new KubernetesDriverEndpoint(rpcEnv) } - private class KubernetesDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) - extends DriverEndpoint(rpcEnv, sparkProperties) { + private class KubernetesDriverEndpoint( + rpcEnv: RpcEnv) + extends DriverEndpoint(rpcEnv) { override def onDisconnected(rpcAddress: RpcAddress): Unit = { // Don't do anything besides disabling the executor - allow the Kubernetes API events to diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 63bea3e7a5003..5997682d269e0 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -177,8 +177,8 @@ private[spark] abstract class YarnSchedulerBackend( } } - override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = { - new YarnDriverEndpoint(rpcEnv, properties) + override def createDriverEndpoint(): DriverEndpoint = { + new YarnDriverEndpoint(rpcEnv) } /** @@ -196,8 +196,8 @@ private[spark] abstract class YarnSchedulerBackend( * This endpoint communicates with the executors and queries the AM for an executor's exit * status when the executor is disconnected. */ - private class YarnDriverEndpoint(rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)]) - extends DriverEndpoint(rpcEnv, sparkProperties) { + private class YarnDriverEndpoint(rpcEnv: RpcEnv) + extends DriverEndpoint(rpcEnv) { /** * When onDisconnected is received at the driver endpoint, the superclass DriverEndpoint