diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 1ffeb129880f..3cb1b37d194b 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -112,7 +112,8 @@ class SparkEnv (
def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
synchronized {
val key = (pythonExec, envVars)
- pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
+ pythonWorkers.getOrElseUpdate(key,
+ new PythonWorkerFactory(pythonExec, envVars, conf)).create()
}
}
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 6a5e6f7c5afb..129c12120fea 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -18,18 +18,26 @@
package org.apache.spark.api.python
import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStreamWriter}
+import java.io.{File, FileInputStream, FileOutputStream, IOException}
import java.net.{InetAddress, ServerSocket, Socket, SocketException}
import java.nio.charset.StandardCharsets
+import java.nio.file.{Paths, Files}
import java.util.Arrays
+import java.util.concurrent.atomic.AtomicInteger
+import java.util.zip.{ZipEntry, ZipInputStream}
import scala.collection.mutable
import scala.collection.JavaConverters._
+import org.apache.commons.io.IOUtils
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.util.{RedirectThread, Utils}
-private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])
+
+private[spark] class PythonWorkerFactory(
+    pythonExec: String,
+    envVars: Map[String, String],
+    conf: SparkConf)
extends Logging {
import PythonWorkerFactory._
@@ -46,6 +54,32 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
val daemonWorkers = new mutable.WeakHashMap[Socket, Int]()
val idleWorkers = new mutable.Queue[Socket]()
var lastActivity = 0L
+ val sparkFiles = conf.getOption("spark.files")
+ val virtualEnvEnabled = conf.getBoolean("spark.pyspark.virtualenv.enabled", false)
+ val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
+ val virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "virtualenv")
+ val virtualEnvSystemSitePackages = conf.getBoolean(
+ "spark.pyspark.virtualenv.system_site_packages", false)
+ val virtualWheelhouse = conf.get("spark.pyspark.virtualenv.wheelhouse", "wheelhouse.zip")
+ // virtualRequirements is empty string by default
+ val virtualRequirements = conf.get("spark.pyspark.virtualenv.requirements", "")
+ val virtualIndexUrl = conf.get("spark.pyspark.virtualenv.index_url", null)
+ val virtualTrustedHost = conf.get("spark.pyspark.virtualenv.trusted_host", null)
+ val virtualInstallPackage = conf.get("spark.pyspark.virtualenv.install_package", null)
+ val upgradePip = conf.getBoolean("spark.pyspark.virtualenv.upgrade_pip", false)
+ val virtualUseIndex = conf.getBoolean("spark.pyspark.virtualenv.use_index", true)
+ var virtualEnvName: String = _
+ var virtualPythonExec: String = _
+
+  // Search for "wheelhouse.zip" to trigger unzipping and installation of the wheelhouse,
+  // and for "requirements.txt" if provided.
+  for (filename <- sparkFiles.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten) {
+    logDebug("Looking inside " + filename)
+    val file = new File(filename)
+    val prefixes = Iterator.iterate(file)(_.getParentFile).takeWhile(_ != null).toList.reverse
+    logDebug("=> prefixes: " + prefixes)
+ }
+
new MonitorThread().start()
var simpleWorkers = new mutable.WeakHashMap[Socket, Process]()
@@ -55,6 +89,10 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
envVars.getOrElse("PYTHONPATH", ""),
sys.env.getOrElse("PYTHONPATH", ""))
+ if (virtualEnvEnabled) {
+ setupVirtualEnv()
+ }
+
def create(): Socket = {
if (useDaemon) {
synchronized {
@@ -68,6 +106,193 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}
}
+
+ def unzipWheelhouse(zipFile: String, outputFolder: String): Unit = {
+ val buffer = new Array[Byte](1024)
+ try {
+ // output directory
+      val folder = new File(outputFolder)
+      if (!folder.exists()) {
+        folder.mkdir()
+      }
+
+      // zip file content
+      val zis: ZipInputStream = new ZipInputStream(new FileInputStream(zipFile))
+      // get the zipped file list entry
+      var ze: ZipEntry = zis.getNextEntry()
+
+      while (ze != null) {
+        if (!ze.isDirectory()) {
+          val fileName = ze.getName()
+          val newFile = new File(outputFolder + File.separator + fileName)
+          logDebug("Unzipping file " + newFile.getAbsoluteFile())
+
+          // create folders
+          new File(newFile.getParent()).mkdirs()
+          val fos = new FileOutputStream(newFile)
+          var len: Int = zis.read(buffer)
+
+ while (len > 0) {
+ fos.write(buffer, 0, len)
+ len = zis.read(buffer)
+ }
+ fos.close()
+ }
+ ze = zis.getNextEntry()
+ }
+ zis.closeEntry()
+ zis.close()
+ } catch {
+      case e: IOException => logError("Exception caught while unzipping " + zipFile, e)
+ }
+ }
+
+ /**
+ * Create virtualenv using native virtualenv or conda
+ *
+ * Native Virtualenv:
+ * - Install virtualenv:
+ * virtualenv -p pythonExec [--system-site-packages] virtualenvName
+ * - if wheelhouse specified:
+ * - unzip wheelhouse
+ * - upgrade pip if set by conf (default: no)
+ * - install using pip:
+ *
+ * pip install -r requirement_file.txt \
+ * --find-links=wheelhouse \
+ * [--no-index] \
+ * [--index-url http://pypi.mirror/simple] [--trusted-host pypi.mirror] \
+ * [package.whl]
+ *
+ * else, if no wheelhouse is set:
+ *
+ * pip install -r requirement_file.txt \
+ * [--no-index] \
+ * [--index-url http://pypi.mirror/simple] [--trusted-host pypi.mirror] \
+ * [package.whl]
+ *
+ * Conda
+ * - Execute command: conda create --name virtualenvName --file requirement_file.txt -y
+ *
+ */
+ def setupVirtualEnv(): Unit = {
+ logDebug("Start to setup virtualenv...")
+    virtualEnvName = "virtualenv_" + conf.getAppId + "_" + WORKER_ID.getAndIncrement()
+    // Use the absolute path in local mode; otherwise just use the file name, as the file would
+    // be fetched from the FileServer.
+ val pyspark_requirements =
+ if (Utils.isLocalMaster(conf)) {
+ virtualRequirements
+ } else {
+ virtualRequirements.split("/").last
+ }
+
+ logDebug("wheelhouse: " + virtualWheelhouse)
+ if (virtualWheelhouse != null &&
+ !virtualWheelhouse.isEmpty &&
+ Files.exists(Paths.get(virtualWheelhouse))) {
+      logDebug("Unzipping wheelhouse archive " + virtualWheelhouse)
+ unzipWheelhouse(virtualWheelhouse, "wheelhouse")
+ }
+
+ val createEnvCommand =
+ if (virtualEnvType == "native") {
+ if (virtualEnvSystemSitePackages) {
+ Arrays.asList(virtualEnvPath, "-p", pythonExec, "--system-site-packages", virtualEnvName)
+        } else {
+          Arrays.asList(virtualEnvPath, "-p", pythonExec, virtualEnvName)
+        }
+      } else {
+        // Conda creates the environment and installs the packages in a single step
+        val basePipArgs = mutable.ListBuffer[String]()
+        basePipArgs += (virtualEnvPath,
+          "create",
+          "--prefix",
+          System.getProperty("user.dir") + "/" + virtualEnvName)
+        if (pyspark_requirements != null && !pyspark_requirements.isEmpty) {
+          basePipArgs += ("--file", pyspark_requirements)
+        }
+        basePipArgs += "-y"
+ basePipArgs.toList.asJava
+ }
+ execCommand(createEnvCommand)
+ virtualPythonExec = virtualEnvName + "/bin/python"
+
+ // virtualenv will be created in the working directory of Executor.
+ if (virtualEnvType == "native") {
+      val virtualenvPipExec = virtualEnvName + "/bin/pip"
+      val pipUpgradeArgs = mutable.ListBuffer[String]()
+      if (upgradePip) {
+ pipUpgradeArgs += (virtualenvPipExec, "install", "--upgrade", "pip")
+ }
+      val basePipArgs = mutable.ListBuffer[String]()
+ basePipArgs += (virtualenvPipExec, "install")
+ if (pyspark_requirements != null && !pyspark_requirements.isEmpty) {
+ basePipArgs += ("-r", pyspark_requirements)
+ }
+ if (virtualWheelhouse != null &&
+ !virtualWheelhouse.isEmpty &&
+ Files.exists(Paths.get(virtualWheelhouse))) {
+        basePipArgs += "--find-links=wheelhouse"
+        pipUpgradeArgs += "--find-links=wheelhouse"
+ }
+ if (virtualIndexUrl != null && !virtualIndexUrl.isEmpty) {
+ basePipArgs += ("--index-url", virtualIndexUrl)
+ pipUpgradeArgs += ("--index-url", virtualIndexUrl)
+      } else if (!virtualUseIndex) {
+        basePipArgs += "--no-index"
+        pipUpgradeArgs += "--no-index"
+ }
+ if (virtualTrustedHost != null && !virtualTrustedHost.isEmpty) {
+ basePipArgs += ("--trusted-host", virtualTrustedHost)
+ pipUpgradeArgs += ("--trusted-host", virtualTrustedHost)
+ }
+      if (upgradePip) {
+ // upgrade pip in the virtualenv
+ execCommand(pipUpgradeArgs.toList.asJava)
+ }
+ if (virtualInstallPackage != null && !virtualInstallPackage.isEmpty) {
+        basePipArgs += virtualInstallPackage
+ }
+ execCommand(basePipArgs.toList.asJava)
+ }
+ // do not execute a second command line in "conda" mode
+ }
+
+ def execCommand(commands: java.util.List[String]): Unit = {
+ logDebug("Running command: " + commands.asScala.mkString(" "))
+
+ val pb = new ProcessBuilder(commands)
+    // ProcessBuilder already inherits the system environment; apply the caller-supplied
+    // variables last so they take precedence over inherited values.
+    pb.environment().putAll(System.getenv())
+    pb.environment().putAll(envVars.asJava)
+ pb.environment().put("HOME", System.getProperty("user.home"))
+
+ val proc = pb.start()
+
+ val exitCode = proc.waitFor()
+ if (exitCode != 0) {
+      val errString = try {
+        Option(proc.getErrorStream()).map(IOUtils.toString(_, StandardCharsets.UTF_8))
+      } catch {
+        case _: IOException => None
+      }
+
+      val outString = try {
+        Option(proc.getInputStream()).map(IOUtils.toString(_, StandardCharsets.UTF_8))
+      } catch {
+        case _: IOException => None
+      }
+
+      throw new RuntimeException("Failed to run command: " + commands.asScala.mkString(" ") +
+        "\nOutput: " + outString.getOrElse("") +
+        "\nStderr: " + errString.getOrElse(""))
+ }
+ }
+
/**
* Connect to a worker launched through pyspark/daemon.py, which forks python processes itself
* to avoid the high cost of forking from Java. This currently only works on UNIX-based systems.
@@ -111,7 +336,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))
// Create and start the worker
- val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.worker"))
+ val realPythonExec = if (virtualEnvEnabled) virtualPythonExec else pythonExec
+ logDebug(s"Starting worker with pythonExec: ${realPythonExec}")
+ val pb = new ProcessBuilder(Arrays.asList(realPythonExec, "-m", "pyspark.worker"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars.asJava)
workerEnv.put("PYTHONPATH", pythonPath)
@@ -154,7 +381,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
try {
// Create and start the daemon
- val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.daemon"))
+ val realPythonExec = if (virtualEnvEnabled) virtualPythonExec else pythonExec
+ logDebug(s"Starting daemon with pythonExec: ${realPythonExec}")
+ val pb = new ProcessBuilder(Arrays.asList(realPythonExec, "-m", "pyspark.daemon"))
val workerEnv = pb.environment()
workerEnv.putAll(envVars.asJava)
workerEnv.put("PYTHONPATH", pythonPath)
@@ -307,6 +536,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
}
private object PythonWorkerFactory {
+  val WORKER_ID = new AtomicInteger()
val PROCESS_WAIT_TIMEOUT_MS = 10000
val IDLE_WORKER_TIMEOUT_MS = 60000 // kill idle workers after 1 minute
}
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index f1761e7c1ec9..f9c4a1544fe9 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -505,8 +505,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| dependency conflicts.
| --repositories Comma-separated list of additional remote repositories to
| search for the maven coordinates given with --packages.
- | --py-files PY_FILES Comma-separated list of .zip, .egg, or .py files to place
- | on the PYTHONPATH for Python apps.
+ | --py-files PY_FILES Comma-separated list of .zip, .egg, .whl or .py files to
+ | place on the PYTHONPATH for Python apps.
| --files FILES Comma-separated list of files to be placed in the working
| directory of each executor.
|
diff --git a/docs/cluster-overview.md b/docs/cluster-overview.md
index 814e4406cf43..6bf5ae6aab65 100644
--- a/docs/cluster-overview.md
+++ b/docs/cluster-overview.md
@@ -87,9 +87,18 @@ The following table summarizes terms you'll see used to refer to cluster concept
| Application jar |
- A jar containing the user's Spark application. In some cases users will want to create
- an "uber jar" containing their application along with its dependencies. The user's jar
- should never include Hadoop or Spark libraries, however, these will be added at runtime.
+ A jar containing the user's Spark application (for Java and Scala driver). In some cases
+ users will want to create an "uber jar" containing their application along with its
+ dependencies. The user's jar should never include Hadoop or Spark libraries, however, these
+ will be added at runtime.
+ |
+
+
+ | Application Wheelhouse |
+
+ An archive containing precompiled wheels of the user's PySpark application and dependencies
+ (for Python driver). The user's wheelhouse should not include jars, only Python Wheel files
+ for one or more architectures.
|
diff --git a/docs/configuration.md b/docs/configuration.md
index 82ce232b336d..04c51252d214 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -392,7 +392,7 @@ Apart from these, the following properties are also available, and may be useful
spark.submit.pyFiles |
|
- Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
+ Comma-separated list of .zip, .egg, .whl or .py files to place on the PYTHONPATH for Python apps.
|
diff --git a/docs/programming-guide.md b/docs/programming-guide.md
index 74d5ee1ca6b3..032f7fd01245 100644
--- a/docs/programming-guide.md
+++ b/docs/programming-guide.md
@@ -24,7 +24,7 @@ along with if you launch Spark's interactive shell -- either `bin/spark-shell` f
-Spark {{site.SPARK_VERSION}} is built and distributed to work with Scala {{site.SCALA_BINARY_VERSION}}
+Spark {{site.SPARK_VERSION}} is built and distributed to work with Scala {{site.SCALA_BINARY_VERSION}}
by default. (Spark can be built to work with other versions of Scala, too.) To write
applications in Scala, you will need to use a compatible Scala version (e.g. {{site.SCALA_BINARY_VERSION}}.X).
@@ -211,7 +211,7 @@ For a complete list of options, run `spark-shell --help`. Behind the scenes,
In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the
variable called `sc`. Making your own SparkContext will not work. You can set which master the
-context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files
+context connects to using the `--master` argument, and you can add Python .zip, .egg, .whl or .py files
to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies
(e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates
to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. SonaType)
@@ -240,13 +240,13 @@ use IPython, set the `PYSPARK_DRIVER_PYTHON` variable to `ipython` when running
$ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
{% endhighlight %}
-To use the Jupyter notebook (previously known as the IPython notebook),
+To use the Jupyter notebook (previously known as the IPython notebook),
{% highlight bash %}
$ PYSPARK_DRIVER_PYTHON=jupyter ./bin/pyspark
{% endhighlight %}
-You can customize the `ipython` or `jupyter` commands by setting `PYSPARK_DRIVER_PYTHON_OPTS`.
+You can customize the `ipython` or `jupyter` commands by setting `PYSPARK_DRIVER_PYTHON_OPTS`.
After the Jupyter Notebook server is launched, you can create a new "Python 2" notebook from
the "Files" tab. Inside the notebook, you can input the command `%pylab inline` as part of
@@ -812,7 +812,7 @@ The variables within the closure sent to each executor are now copies and thus,
In local mode, in some circumstances the `foreach` function will actually execute within the same JVM as the driver and will reference the same original **counter**, and may actually update it.
-To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
+To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.
@@ -1231,8 +1231,8 @@ storage levels is:
-**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library,
-so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`,
+**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library,
+so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`,
`MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, and `DISK_ONLY_2`.*
Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it.
@@ -1374,7 +1374,7 @@ res2: Long = 10
While this code used the built-in support for accumulators of type Long, programmers can also
create their own types by subclassing [AccumulatorV2](api/scala/index.html#org.apache.spark.AccumulatorV2).
-The AccumulatorV2 abstract class has several methods which need to override:
+The AccumulatorV2 abstract class has several methods which need to be overridden:
`reset` for resetting the accumulator to zero, `add` for adding another value into the accumulator, and `merge` for merging another same-type accumulator into this one. Other methods that need to be overridden are described in the Scala API documentation. For example, supposing we had a `MyVector` class
representing mathematical vectors, we could write:
diff --git a/docs/submitting-applications.md b/docs/submitting-applications.md
index 6fe304999587..57d3c8c2dafe 100644
--- a/docs/submitting-applications.md
+++ b/docs/submitting-applications.md
@@ -20,7 +20,8 @@ script as shown here while passing your jar.
For Python, you can use the `--py-files` argument of `spark-submit` to add `.py`, `.zip` or `.egg`
files to be distributed with your application. If you depend on multiple Python files we recommend
-packaging them into a `.zip` or `.egg`.
+packaging them into a `.zip` or `.egg`. For a more complete packaging workflow for Python, see the
+section *Wheel Support for PySpark* below.
# Launching Applications with spark-submit
@@ -192,8 +193,445 @@ with `--packages`. All transitive dependencies will be handled when using this c
repositories (or resolvers in SBT) can be added in a comma-delimited fashion with the flag `--repositories`.
These commands can be used with `pyspark`, `spark-shell`, and `spark-submit` to include Spark Packages.
-For Python, the equivalent `--py-files` option can be used to distribute `.egg`, `.zip` and `.py` libraries
-to executors.
+# Wheel Support for PySpark
+
+For Python, the `--py-files` option can be used to distribute `.egg`, `.zip`, `.whl` and `.py`
+libraries to executors. This modifies the `PYTHONPATH` environment variable to inject these
+dependencies into the environment of the executed script.
+
+This solution does not scale well for complex projects with multiple dependencies.
+
+There are however other solutions to deploy projects with dependencies:
+- Describe dependencies inside a `requirements.txt` and have everything installed into an isolated
+ virtual environment
+- Distribute a Python Source Distribution Package or a Wheel with a complete Python module
+
+Dependency distribution is also configurable:
+- Let each node of the Spark Cluster automatically fetch and install dependencies from Pypi or any
+ configured Pypi mirror
+- Distribute a single *wheelhouse* archive with all dependencies precompiled as Wheel files
+
+**What is a Wheel?**
+
+ A Wheel is a Python packaging format that contains a fully prepared module for a given system or
+ environment. A project can publish several wheels, one per target system. For example, modules
+ such as Numpy require the compilation of some C files, so their wheels ship the already compiled
+ object files. That is why a wheel can be system dependent (e.g. 32 bits/64 bits,
+ Linux/Windows/Mac OS, ...). The Wheel format also provides a unified metadata description, so
+ tools such as `pip install` will automatically select the precompiled wheel package that best
+ fits the current system.
+
+ Said differently, if the wheel is already prepared, no compilation occurs during installation.
+
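+ For illustration, the target environment is encoded in the wheel file name itself; a few
+ hypothetical examples:
+
+     numpy-1.11.1-cp27-cp27mu-manylinux1_x86_64.whl   (CPython 2.7, 64-bit Linux)
+     numpy-1.11.1-cp35-cp35m-win_amd64.whl            (CPython 3.5, 64-bit Windows)
+     six-1.10.0-py2.py3-none-any.whl                  (pure Python, any platform)
+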
+**How to deploy Wheels or Wheelhouse**
+
+The deployment methods described in this section are recommended in the following cases:
+
+- your PySpark script has grown in complexity and dependencies. For example, it now depends on
+  numpy for numerical computation, plus a bunch of extra packages from Pypi you are used to
+  working with
+- your project depends on packages that are *not* on Pypi, for example Python libraries internal
+  to your company
+- you do not want to deal with the IT department each time you need a new Python package on each
+  Spark node, for example when you need an upgraded version of a module
+- you have dependency version conflicts: you need a certain version of a given package, while
+  another team wants another version of the same package
+
+All deployment methods described below involve the creation of a temporary virtual environment,
+using `virtualenv`, on each node of the Spark cluster. This is done automatically during the
+`spark-submit` step.
+
+Please also be aware that Wheelhouse support with a virtualenv is only available for YARN and
+standalone clusters. Mesos clusters do not support Wheelhouse yet. You can however send wheels
+with `--py-files` to inject them into the `PYTHONPATH`.
+
+The following methods only differ in the way Wheel files are sent to or retrieved on your Spark
+cluster nodes:
+
+- use a *Big Fat Wheelhouse* when the nodes of your Spark cluster do not have access to the
+  Internet and so cannot reach the `pypi.python.org` website, and you do not have any Pypi mirror
+  internal to your organization.
+- if your Spark cluster has access to the official Pypi or a mirror, you can configure
+  `spark-submit` so `pip` will automatically find and download the dependency wheels.
+- you can use `--py-files` to send all wheels manually.
+
+**Pypi Mirror**
+
+Mirroring might be useful for companies with a strict Internet access policy or restrictive proxy
+settings.
+
+There are several solutions for mirroring Pypi within your organization:
+- http://doc.devpi.net/latest/: a free mirroring solution
+- Artifactory provides
+ [Pypi mirroring](https://www.jfrog.com/confluence/display/RTF/PyPI+Repositories) in its
+ non-commercial license
+
+**Project packaging Workflow**
+
+The following workflow describes how to deploy a complex Python project with several dependencies
+to your Spark cluster, for example a PySpark job that depends on a library that is only available
+inside your organization, or that uses a specific version of a Pypi package such as Numpy.
+
+The workflow is:
+
+- create a virtualenv in a directory, for instance `env`, if you are not already working in one:
+
+{% highlight bash %}
+virtualenv env
+{% endhighlight %}
+
+- Turn your script into a standard Python package. You need to write a standard `setup.py` to
+ make your package installable through `pip install -e . -r requirements.txt` (do not use `python
+ setup.py develop` or `python setup.py install` with `pip`, [it is not
+ recommended](https://pip.pypa.io/en/stable/reference/pip_install/#hash-checking-mode) and does not
+ handle editable dependencies correctly).
+
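+  A minimal layout for such a package might look like this (file and directory names are
+  illustrative; `mypackage` and `run()` are the names reused in the launcher script example
+  further below):
+
+      myproject/
+          setup.py
+          requirements.txt
+          mypackage/
+              __init__.py      # exposes a run() entry point
+              job.py           # the actual PySpark logic
+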
+- Write a `requirements.txt`. It is recommended to freeze the version of each dependency in this
+  file. This way your job is guaranteed to stay deployable, even if a buggy version of a
+  dependency is released on Pypi.
+
+ Example of `requirements.txt` with frozen package version:
+
+ astroid==1.4.6
+ autopep8==1.2.4
+ click==6.6
+ colorama==0.3.7
+ enum34==1.1.6
+ findspark==1.0.0
+ first==2.0.1
+ hypothesis==3.4.0
+ lazy-object-proxy==1.2.2
+ linecache2==1.0.0
+ pbr==1.10.0
+ pep8==1.7.0
+ pip-tools==1.6.5
+ py==1.4.31
+ pyflakes==1.2.3
+ pylint==1.5.6
+ pytest==2.9.2
+ six==1.10.0
+ spark-testing-base==0.0.7.post2
+ traceback2==1.4.0
+ unittest2==1.1.0
+ wheel==0.29.0
+ wrapt==1.10.8
+
+  Ensure you write a clean `setup.py` that refers to this `requirements.txt` file:
+
+      from setuptools import setup
+      from pip.req import parse_requirements
+
+      # parse_requirements() returns a generator of pip.req.InstallRequirement objects
+      install_reqs = parse_requirements('requirements.txt', session=False)
+
+      # reqs is a list of requirement strings
+      # e.g. ['django==1.5.1', 'mezzanine==1.4.6']
+      reqs = [str(ir.req) for ir in install_reqs]
+
+      setup(
+          ...
+          install_requires=reqs
+      )
+
+- If you want to deploy a *Big Fat Wheelhouse* archive, i.e. a zip file containing all Wheels:
+
+ Create the wheelhouse for your current project:
+{% highlight bash %}
+pip install wheel
+pip wheel . --wheel-dir wheelhouse -r requirements.txt
+{% endhighlight %}
+
+  Note: please use pip >= 8.1.3 to generate all wheels, even for "editable" dependencies. To
+  reference another internal project, you can use the following syntax in your `requirements.txt`:
+
+ -e git+ssh://local.server/a/project/name@7f4a7623aa219743e9b96b228b4cd86fe9bc5595#egg=package_name
+
+ It is highly recommended to specify the SHA-1, a tag or a branch of the internal dependency, and
+ update it at each release. Do not forget to specify the `egg` package name.
+
+ Documentation: [Requirements Files](https://pip.pypa.io/en/latest/user_guide/#requirements-files)
+
+  Usually, dependencies will not be recompiled if this has been done previously: `pip` caches all
+  wheel files by default in the user home cache, and if wheels can be found on Pypi they are
+  automatically retrieved.
+
+ At the end you have all the `.whl` required *for your current system* in the `wheelhouse`
+ directory.
+
+  Zip the `wheelhouse` directory into a `wheelhouse.zip` archive:
+
+{% highlight bash %}
+rm -fv wheelhouse.zip
+zip -vrj wheelhouse.zip wheelhouse
+{% endhighlight %}
+
+ You now have a `wheelhouse.zip` archive with all the dependencies of your project *and also your
+ project module*. For example, if you have defined a module `mymodule` in `setup.py`, it will be
+ automatically installed with `spark-submit`.
+
+- If you want to let the Spark cluster access Pypi or a Pypi mirror, just build the source
+ distribution package:
+
+ python setup.py sdist
+
+ Or the wheel with:
+
+ python setup.py bdist_wheel
+
+  You now have a `.tar.gz` or a `.whl` file containing your current project.
+
+  Note: most of the time, your Spark job will not have low-level (compiled) dependencies of its
+  own, so building a source distribution package is enough.
+
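+  After running these commands, the built artifacts are placed in the `dist/` directory, for
+  example (illustrative names):
+
+      dist/mypackage-0.1.0.tar.gz
+      dist/mypackage-0.1.0-py2.py3-none-any.whl
+
+  This is the file you will pass to `spark.pyspark.virtualenv.install_package` below.
+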
+To execute your application, you need a *launcher script*, i.e. a script which is executed
+directly and calls your built package. There is no equivalent to the `--class` argument of
+`spark-submit` for Python jobs. Please note that this file might only contain a few lines. For
+example:
+
+{% highlight python %}
+#!/usr/bin/env python
+
+from mypackage import run
+run()
+{% endhighlight %}
+
+Note that all the logic lives in the `mypackage` module, which has been declared in your
+`setup.py`.
+
+**Deployment Modes support**
+
+In **Standalone** mode, only the `client` deployment mode is supported. You cannot use `cluster`
+deployment. This means the driver is executed on the machine that runs `spark-submit`, so you
+**need** to execute `spark-submit` from within your development virtualenv.
+
+Workers will install a dedicated virtualenv if `spark.pyspark.virtualenv.enabled` is set to
+`true`.
+
+In **YARN**, if you use the `client` deployment mode, you also need to execute `spark-submit` from
+your virtualenv. If you use the `cluster` deployment mode, the virtualenv installation on the
+driver is performed like on all workers.
+
+There is no virtualenv support for **Mesos** clusters. Note that you can still send wheel files
+with `--py-files` and they will be added to the `PYTHONPATH`. This is not recommended since you
+will not benefit from the advantages of installing with `pip`:
+
+- you cannot have `pip` automatically retrieve missing Python dependencies from `pypi.python.org`
+- you cannot prepare and send several versions of the same package to support different
+  architectures (e.g. some workers run one Python version and others another, some machines are
+  32 bits and others 64 bits, or some are under Mac OS X and others under Linux, ...)
+
+To get these advantages, use the Wheelhouse deployment described below.
+
+**Submitting your package to the Spark Cluster:**
+
+Please remember that in "standalone" Spark instance, only the `client` deployment mode is supported.
+
+Deploy a script with many dependencies to your standalone cluster:
+
+{% highlight bash %}
+source /path/to/your/project/env/bin/activate # ensure you are in virtualenv
+bin/spark-submit \
+  --master spark://localhost \
+  --deploy-mode client \
+  --jars java-dependencies.jar \
+  --files /path/to/your/project/requirements.txt \
+  --conf "spark.pyspark.virtualenv.enabled=true" \
+  --conf "spark.pyspark.virtualenv.requirements=requirements.txt" \
+  --conf "spark.pyspark.virtualenv.index_url=https://pypi.python.org/simple" \
+  /path/to/launch/simple_script_with_some_dependencies.py
+{% endhighlight %}
+
+Execution:
+- a virtualenv is created on each worker
+- the dependencies described in `requirements.txt` are installed on each worker
+- dependencies are downloaded from the Pypi repository
+- the driver is executed on the client, so this command line should be executed from *within* a
+ virtualenv.
+
+
+Deploy a simple runner script along with a source distribution package of the complete job project:
+
+{% highlight bash %}
+source /path/to/your/project/env/bin/activate # ensure you are in virtualenv
+bin/spark-submit \
+  --master spark://localhost \
+  --deploy-mode client \
+  --jars some-java-dependencies.jar \
+  --files /path/to/mypackage_sdist.tar.gz \
+  --conf "spark.pyspark.virtualenv.enabled=true" \
+  --conf "spark.pyspark.virtualenv.install_package=mypackage_sdist.tar.gz" \
+  --conf "spark.pyspark.virtualenv.index_url=https://pypi.python.org/simple" \
+  /path/to/launch/runner_script.py
+{% endhighlight %}
+
+Execution:
+- a virtualenv is created on each worker
+- the package `mypackage_sdist.tar.gz` is installed with pip, so if the `setup.py` refers to
+  `requirements.txt` properly, all the dependencies are installed on each worker
+- dependencies are downloaded from the Pypi repository
+- the driver is executed on the client, so this command line should be executed from *within* a
+ virtualenv.
+- the runner script simply calls an entry point within `mypackage`.
+
+
+Deploy a wheelhouse package to your YARN cluster with:
+
+{% highlight bash %}
+bin/spark-submit \
+  --master yarn \
+  --deploy-mode cluster \
+  --jars java-dependencies.jar \
+  --files /path/to/your/project/requirements.txt,/path/to/your/project/wheelhouse.zip \
+  --conf "spark.pyspark.virtualenv.enabled=true" \
+  --conf "spark.pyspark.virtualenv.requirements=requirements.txt" \
+  --conf "spark.pyspark.virtualenv.install_package=a_package.whl" \
+  --conf "spark.pyspark.virtualenv.index_url=https://pypi.python.org/simple" \
+  /path/to/launch/launcher_script.py
+{% endhighlight %}
+
+Execution:
+- a virtualenv is created on each worker
+- the dependencies described in `requirements.txt` are installed on each worker
+- dependencies are looked up in the wheelhouse archive; if not found there, they are downloaded
+  from the Pypi repository (to avoid this, remove the `spark.pyspark.virtualenv.index_url` option)
+- the driver is executed on the cluster, so this command line does *not* have to be executed from
+ within a virtualenv.
+
+
+To deploy against an internal Pypi mirror (an HTTPS mirror without valid certificates), force a
+pip upgrade (it is good practice to always use the latest version of pip), and inject some wheels
+manually into the `PYTHONPATH`:
+
+{% highlight bash %}
+bin/spark-submit \
+  --master yarn \
+  --deploy-mode cluster \
+  --jars java-dependencies.jar \
+  --files /path/to/your/project/requirements.txt \
+  --py-files /path/to/your/project/binary/myproject.whl,/path/to/internal/dependency/other_project.whl \
+  --conf "spark.pyspark.virtualenv.enabled=true" \
+  --conf "spark.pyspark.virtualenv.requirements=requirements.txt" \
+  --conf "spark.pyspark.virtualenv.upgrade_pip=true" \
+  --conf "spark.pyspark.virtualenv.index_url=https://pypi.mycompany.com/" \
+  --conf "spark.pyspark.virtualenv.trusted_host=pypi.mycompany.com" \
+  /path/to/launch/script.py
+{% endhighlight %}
+
+Execution:
+- a virtualenv is created on each worker
+- the pip tool is updated to the latest version
+- the dependencies described in `requirements.txt` are installed on each worker
+- dependencies that are not already available as wheels are downloaded from the Pypi mirror
+- the two wheels passed with `--py-files` are added to the `PYTHONPATH`. You can use this to avoid
+  describing them in the `requirements.txt` and send them directly. This might be useful for
+  development; for production you might prefer to have these dependency projects available on an
+  internal repository and referenced by URL.
+- the driver is executed on the cluster, so this command line does *not* have to be executed from
+ within a virtualenv.
+
+
+Here is a description of the configuration options for Wheel and Wheelhouse support in PySpark:
+
+- `--jars java-dependencies.jar`: you still need to provide the Java jars your application
+  requires (for instance packaged into a big fat jar) with this argument, for example if you use
+  Spark Streaming.
+- `spark.pyspark.virtualenv.enabled`: enable the creation of the virtual environment at each
+  deployment and trigger the installation of wheels. This virtual environment creation has a time
+  and disk space cost. Please note that, when deploying a Big Fat Wheelhouse (with
+  `spark.pyspark.virtualenv.use_index=false`), *no network* connection to pypi.python.org or any
+  mirror will be made.
+- `--files /path/to/your/project/requirements.txt,/path/to/your/project/wheelhouse.zip`: this will
+ simply copy these two files to the root of the job working directory on each node. Enabling
+ 'virtualenv' will automatically use these files when they are found. Having at least
+ `requirements.txt` is mandatory.
+- `--conf spark.pyspark.virtualenv.type=conda`: you can specify the format of your requirements.txt.
+ This parameter is optional. The default value, `native`, will use the native `pip` tool to install
+  your package on each Spark node. You can also use `conda` for the Conda package manager.
+- `--conf spark.pyspark.virtualenv.requirements=other_requirement.txt`: specify the name of the
+ requirement file. This parameter is optional. The default value is `requirements.txt`. Do not
+ forget to copy this file to the cluster with `--files` argument.
+- `--conf spark.pyspark.virtualenv.bin.path=venv`: specify the command to create the virtual env.
+ This parameter is optional. The default value, `virtualenv`, should work on every kind of system,
+ but if you need to specify a different command line name (ex: `venv` for Python 3) or specify a
+ full path, set this value.
+- `--conf spark.pyspark.virtualenv.wheelhouse=mywheelhouse.zip`: name of the wheelhouse archive.
+ This parameter is optional. The default value is `wheelhouse.zip`. Do not forget to move this file
+ to the cluster with `--files` argument. If found, the file will be unzipped to the `wheelhouse`
+ directory. It is not mandatory to use this archive to transfer modules found on Pypi if you have
+ Internet connectivity or a mirror of Pypi reachable from each worker. Use it primarily for
+  transferring precompiled, internal module dependencies.
+- `--conf spark.pyspark.virtualenv.upgrade_pip=true`: upgrade `pip` automatically. It is good
+  practice to always use the latest `pip` version. Default: `false`.
+- `--conf spark.pyspark.virtualenv.index_url=http://internalserver/pypimirror`: change the Pypi
+  repository URL (default: `https://pypi.python.org/simple`; requires network connectivity)
+- `--conf spark.pyspark.virtualenv.trusted_host=internalserver`: Execute `pip` with the
+  `--trusted-host` argument, i.e. mark the given hostname as trusted even though it does not
+  provide a valid HTTPS certificate (or any HTTPS at all). Useful when using a Pypi mirror behind
+  HTTPS without a full certificate chain.
+- `--conf spark.pyspark.virtualenv.system_site_packages=true`: this makes the virtual environment
+  also reference the packages installed on the system. The default value, `false`, forces
+  developers to specify all dependencies and let `pip` install them from `requirements.txt`. Set the
+ value to `true` to use preinstalled packages on each node. A virtualenv will still be created so
+ installing new packages will not compromise the worker Python installation.
+- `--conf spark.pyspark.virtualenv.use_index=false`: if set to `false`, don't try to download
+ missing dependencies from Pypi or the index URL set by `spark.pyspark.virtualenv.index_url`, in
+ which case all dependencies should be packaged in the `wheelhouse.zip` archive. Default is set to
+ `true`. Please note that if `spark.pyspark.virtualenv.index_url` is manually set,
+ `spark.pyspark.virtualenv.use_index` will be forced to `true`.
+- `--py-files /path/to/a/project/aproject.whl,/path/to/internal/dependency/other_project.whl`:
+  this copies the wheels to the cluster nodes and installs them with `pip`. Using this argument
+  implies two things:
+  - all wheels will be installed; you cannot have one wheel for Linux 32 bits and another one for
+    Linux 64 bits. In this situation, zip them into a single archive and use `--files
+    wheelhouse.zip` instead
+  - you need to create the wheels of other internal dependencies (i.e. those that are not on Pypi)
+    manually, or select them after having run `pip wheel`
+- `/path/to/launch/script.py`: path to the runner script. As mentioned earlier, it is recommended to
+ keep this file as short as possible, and only call a `run()`-like method from a package defined in
+ your `setup.py`.
+
+**Advantages**
+
+- Installation is fast and does not require compilation
+- No Internet connectivity is needed when using a Big Fat Wheelhouse: no need to mess with your
+  corporate proxy or to maintain a local mirror of Pypi.
+- Package versions are isolated, so two Spark jobs can depend on two different versions of a given
+  library without any conflict
+- Wheels are automatically cached (for pip versions > 7.0); in the worst case, compilation only
+  takes time the first time a package is built. Please note that compilation is quite rare since
+  most of the time the package on Pypi already provides precompiled wheels for the major Python
+  versions and systems (e.g. look at all the wheels provided by
+  [Numpy](https://pypi.python.org/pypi/numpy)).
+
+**Disadvantages**
+
+- Creating a virtualenv at each execution takes time; not much, but still a few seconds
+- It consumes more disk space than a simple script without any dependencies
+- This is slightly more complex to set up than sending a simple Python script
+- Supporting heterogeneous Spark nodes (e.g. Linux 32 bits/64 bits, ...) is possible, but you need
+  to ensure **all** the wheels are in the wheelhouse so that pip is able to install every needed
+  package on each node of your Spark cluster. The complexity of this task, which might not be
+  trivial, is put in the hands of the script developer rather than the IT department
+
+
+**Configuring a Pypi proxy**
+
+To tell `spark-submit` to use a Pypi mirror internal to your company, you can use the
+`--conf "spark.pyspark.virtualenv.index_url=http://pypi.mycompany.com/"` argument.
+
+You can also update the `~/.pip/pip.conf` file of each node of your Spark cluster to point by
+default to your mirror:
+
+{% highlight ini %}
+[global]
+; Low timeout
+timeout = 20
+index-url = https://<user>:<pass>@pypi.mycompany.org/
+{% endhighlight %}
+
+Note: pip does not use system certificates; if you need to set one up manually, add this line to
+the `[global]` section of `pip.conf`:
+
+{% highlight ini %}
+cert = /path/to/your/internal/certificates.pem
+{% endhighlight %}
# More Information
diff --git a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
index ea56214d2390..c0a787f7b759 100644
--- a/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
+++ b/launcher/src/main/java/org/apache/spark/launcher/SparkLauncher.java
@@ -351,7 +351,7 @@ public SparkLauncher addFile(String file) {
}
/**
- * Adds a python file / zip / egg to be submitted with the application.
+ * Adds a python file / zip / whl / egg to be submitted with the application.
*
* @param file Path to the file.
* @return This launcher.
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a3dd1950a522..49823a83d068 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -22,7 +22,6 @@
import signal
import sys
import threading
-from threading import RLock
from tempfile import NamedTemporaryFile
from pyspark import accumulators
@@ -39,6 +38,7 @@
from pyspark.status import StatusTracker
from pyspark.profiler import ProfilerCollector, BasicProfiler
+
if sys.version > '3':
xrange = range
@@ -66,10 +66,10 @@ class SparkContext(object):
_jvm = None
_next_accum_id = 0
_active_spark_context = None
- _lock = RLock()
- _python_includes = None # zip and egg files that need to be added to PYTHONPATH
+ _lock = threading.RLock()
+ _python_includes = None # zip, egg, whl and jar files that need to be added to PYTHONPATH
- PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')
+ PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar', '.whl')
def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
environment=None, batchSize=0, serializer=PickleSerializer(), conf=None,
@@ -82,9 +82,9 @@ def __init__(self, master=None, appName=None, sparkHome=None, pyFiles=None,
(e.g. mesos://host:port, spark://host:port, local[4]).
:param appName: A name for your job, to display on the cluster web UI.
:param sparkHome: Location where Spark is installed on cluster nodes.
- :param pyFiles: Collection of .zip or .py files to send to the cluster
- and add to PYTHONPATH. These can be paths on the local file
- system or HDFS, HTTP, HTTPS, or FTP URLs.
+ :param pyFiles: Collection of .zip, .egg, .whl or .py files to send
+ to the cluster and add to PYTHONPATH.
+ These can be paths on the local file system or HDFS, HTTP, HTTPS, or FTP URLs.
:param environment: A dictionary of environment variables to set on
worker nodes.
:param batchSize: The number of Python objects represented as a single
@@ -177,6 +177,17 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
self._jsc.sc().register(self._javaAccumulator)
self.pythonExec = os.environ.get("PYSPARK_PYTHON", 'python')
+ if self._conf.get("spark.pyspark.virtualenv.enabled") == "true":
+ requirements = self._conf.get("spark.pyspark.virtualenv.requirements",
+ "requirements.txt")
+ virtualEnvBinPath = self._conf.get("spark.pyspark.virtualenv.bin.path", "virtualenv")
+ if not requirements:
+                raise Exception("spark.pyspark.virtualenv.enabled is set to true but no value "
+                                "is set for spark.pyspark.virtualenv.requirements")
+            if not virtualEnvBinPath:
+                raise Exception("spark.pyspark.virtualenv.enabled is set to true but no value "
+                                "is set for spark.pyspark.virtualenv.bin.path")
+
self.pythonVer = "%d.%d" % sys.version_info[:2]
# Broadcast's __reduce__ method stores Broadcast instances here.
@@ -189,19 +200,34 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
root_dir = SparkFiles.getRootDirectory()
sys.path.insert(1, root_dir)
+ self._python_wheels = set()
+
# Deploy any code dependencies specified in the constructor
+ # Wheel files will be installed by pip later.
self._python_includes = list()
for path in (pyFiles or []):
self.addPyFile(path)
# Deploy code dependencies set by spark-submit; these will already have been added
# with SparkContext.addFile, so we just need to add them to the PYTHONPATH
+ # Wheel files will be installed by pip later.
for path in self._conf.get("spark.submit.pyFiles", "").split(","):
- if path != "":
- (dirname, filename) = os.path.split(path)
- if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
- self._python_includes.append(filename)
- sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
+ if path:
+ (_dirname, filename) = os.path.split(path)
+ extname = os.path.splitext(path)[1].lower()
+ if extname in self.PACKAGE_EXTENSIONS:
+ if extname == ".whl":
+ self._python_wheels.add(path)
+ else:
+ self._python_includes.append(filename)
+ sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
+
+ # Install all wheel files at once.
+ if self._python_wheels:
+ if 'VIRTUAL_ENV' not in os.environ or not os.environ['VIRTUAL_ENV']:
+                raise Exception("Wheel installation requires running inside a virtualenv. "
+                                "Did you forget to activate a virtualenv when using "
+                                "spark-submit in 'client' deployment mode?")
# Create a temporary directory inside spark.local.dir:
local_dir = self._jvm.org.apache.spark.util.Utils.getLocalDir(self._jsc.sc().conf())
@@ -796,17 +822,27 @@ def addFile(self, path, recursive=False):
def addPyFile(self, path):
"""
- Add a .py or .zip dependency for all tasks to be executed on this
+ Add a .py, .zip or .egg dependency for all tasks to be executed on this
SparkContext in the future. The C{path} passed can be either a local
file, a file in HDFS (or other Hadoop-supported filesystems), or an
HTTP, HTTPS or FTP URI.
+        Note that .whl files are not handled by this method; wheels are installed with pip
+        separately.
"""
+ if not path:
+ return
self.addFile(path)
- (dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
- if filename[-4:].lower() in self.PACKAGE_EXTENSIONS:
+
+ (_dirname, filename) = os.path.split(path) # dirname may be directory or HDFS/S3 prefix
+ extname = os.path.splitext(path)[1].lower()
+ if extname == '.whl':
+ return
+
+ if extname in self.PACKAGE_EXTENSIONS:
self._python_includes.append(filename)
- # for tests in local mode
- sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
+ if extname != '.whl':
+ # for tests in local mode
+ # Prepend the python package (except for *.whl) to sys.path
+ sys.path.insert(1, os.path.join(SparkFiles.getRootDirectory(), filename))
if sys.version > '3':
import importlib
importlib.invalidate_caches()
@@ -963,7 +999,7 @@ def _test():
globs['sc'] = SparkContext('local[4]', 'PythonTest')
globs['tempdir'] = tempfile.mkdtemp()
atexit.register(lambda: shutil.rmtree(globs['tempdir']))
- (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
+ (failure_count, _test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
if failure_count:
exit(-1)
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 8418abf99c8d..e1d495e31a45 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -597,22 +597,6 @@ def stop(self):
self._sc.stop()
SparkSession._instantiatedContext = None
- @since(2.0)
- def __enter__(self):
- """
- Enable 'with SparkSession.builder.(...).getOrCreate() as session: app' syntax.
- """
- return self
-
- @since(2.0)
- def __exit__(self, exc_type, exc_val, exc_tb):
- """
- Enable 'with SparkSession.builder.(...).getOrCreate() as session: app' syntax.
-
- Specifically stop the SparkSession on exit of the with block.
- """
- self.stop()
-
def _test():
import os