
Commit cf977e4

[SPARK-16367][PYSPARK] Add wheelhouse support
- Merge of #13599 ("virtualenv in pyspark", bug SPARK-13587) and #5408 ("wheel package support for PySpark", bug SPARK-6764)
- Documentation updated
- Only Standalone and YARN supported; Mesos not supported
- Only tested with virtualenv/pip; Conda not tested
- Client deployment + pip install with index: OK (1 min 30 s execution)
- Client deployment + wheelhouse without index: KO (cffi refuses the built wheel)

Signed-off-by: Gaetan Semet <[email protected]>
1 parent 7e3063e commit cf977e4
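
For context, the properties this commit introduces are read in PythonWorkerFactory and can be passed through spark-submit. The following is only an illustrative sketch: the application and file names are placeholders, and it assumes the wheelhouse archive and requirements file are distributed with --files so that executors see them as wheelhouse.zip and requirements.txt in their working directory.

```bash
# Illustrative client-mode submission (names are placeholders, not part of this commit):
spark-submit \
  --master yarn \
  --conf spark.pyspark.virtualenv.enabled=true \
  --conf spark.pyspark.virtualenv.type=native \
  --conf spark.pyspark.virtualenv.bin.path=virtualenv \
  --conf spark.pyspark.virtualenv.wheelhouse=wheelhouse.zip \
  --conf spark.pyspark.virtualenv.requirements=requirements.txt \
  --conf spark.pyspark.virtualenv.use_index=false \
  --files wheelhouse.zip,requirements.txt \
  my_app.py
```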

File tree

8 files changed: +687, -62 lines changed


core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 176 additions & 22 deletions
@@ -18,14 +18,18 @@
 package org.apache.spark.api.python
 
 import java.io.{DataInputStream, DataOutputStream, InputStream, OutputStreamWriter}
+import java.io.{File, FileInputStream, FileOutputStream, IOException}
 import java.net.{InetAddress, ServerSocket, Socket, SocketException}
 import java.nio.charset.StandardCharsets
+import java.nio.file.{Paths, Files}
 import java.util.Arrays
 import java.util.concurrent.atomic.AtomicInteger
+import java.util.zip.{ZipEntry, ZipInputStream}
 
 import scala.collection.mutable
 import scala.collection.JavaConverters._
 
+import org.apache.commons.io.IOUtils
 import org.apache.spark._
 import org.apache.spark.internal.Logging
 import org.apache.spark.util.{RedirectThread, Utils}
@@ -50,12 +54,32 @@ private[spark] class PythonWorkerFactory(pythonExec: String,
   val daemonWorkers = new mutable.WeakHashMap[Socket, Int]()
   val idleWorkers = new mutable.Queue[Socket]()
   var lastActivity = 0L
+  val sparkFiles = conf.getOption("spark.files")
   val virtualEnvEnabled = conf.getBoolean("spark.pyspark.virtualenv.enabled", false)
   val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
-  val virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "")
+  val virtualEnvPath = conf.get("spark.pyspark.virtualenv.bin.path", "virtualenv")
+  val virtualEnvSystemSitePackages = conf.getBoolean(
+    "spark.pyspark.virtualenv.system_site_packages", false)
+  val virtualWheelhouse = conf.get("spark.pyspark.virtualenv.wheelhouse", "wheelhouse.zip")
+  // virtualRequirements is an empty string by default
+  val virtualRequirements = conf.get("spark.pyspark.virtualenv.requirements", "")
+  val virtualIndexUrl = conf.get("spark.pyspark.virtualenv.index_url", null)
+  val virtualTrustedHost = conf.get("spark.pyspark.virtualenv.trusted_host", null)
+  val virtualInstallPackage = conf.get("spark.pyspark.virtualenv.install_package", null)
+  val upgradePip = conf.getBoolean("spark.pyspark.virtualenv.upgrade_pip", false)
+  val virtualUseIndex = conf.getBoolean("spark.pyspark.virtualenv.use_index", true)
   var virtualEnvName: String = _
   var virtualPythonExec: String = _
 
+  // search for "wheelhouse.zip" to trigger unzipping and installation of the wheelhouse;
+  // also search for "requirements.txt" if provided
+  for (filename <- sparkFiles.map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten) {
+    logDebug("Looking inside " + filename)
+    val file = new File(filename)
+    val prefixes = Iterator.iterate(file)(_.getParentFile).takeWhile(_ != null).toList.reverse
+    logDebug("=> prefixes " + prefixes)
+  }
+
   new MonitorThread().start()
 
   var simpleWorkers = new mutable.WeakHashMap[Socket, Process]()
@@ -65,7 +89,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String,
     envVars.getOrElse("PYTHONPATH", ""),
     sys.env.getOrElse("PYTHONPATH", ""))
 
-  if (conf.getBoolean("spark.pyspark.virtualenv.enabled", false)) {
+  if (virtualEnvEnabled) {
     setupVirtualEnv()
   }
 
@@ -82,15 +106,73 @@ private[spark] class PythonWorkerFactory(pythonExec: String,
     }
   }
 
+
+  def unzipWheelhouse(zipFile: String, outputFolder: String): Unit = {
+    val buffer = new Array[Byte](1024)
+    try {
+      // output directory
+      val folder = new File(outputFolder);
+      if (!folder.exists()) {
+        folder.mkdir();
+      }
+
+      // zip file content
+      val zis: ZipInputStream = new ZipInputStream(new FileInputStream(zipFile));
+      // get the zipped file list entry
+      var ze: ZipEntry = zis.getNextEntry();
+
+      while (ze != null) {
+        if (!ze.isDirectory()) {
+          val fileName = ze.getName();
+          val newFile = new File(outputFolder + File.separator + fileName);
+          logDebug("Unzipping file " + newFile.getAbsoluteFile());
+
+          // create folders
+          new File(newFile.getParent()).mkdirs();
+          val fos = new FileOutputStream(newFile);
+          var len: Int = zis.read(buffer);
+
+          while (len > 0) {
+            fos.write(buffer, 0, len)
+            len = zis.read(buffer)
+          }
+          fos.close()
+        }
+        ze = zis.getNextEntry()
+      }
+      zis.closeEntry()
+      zis.close()
+    } catch {
+      case e: IOException => logError("exception caught: " + e.getMessage)
+    }
+  }
+
   /**
    * Create virtualenv using native virtualenv or conda
    *
    * Native Virtualenv:
-   * - Execute command: virtualenv -p pythonExec --no-site-packages virtualenvName
-   * - Execute command: python -m pip --cache-dir cache-dir install -r requirement_file
+   * - Install virtualenv:
+   *     virtualenv -p pythonExec [--system-site-packages] virtualenvName
+   * - if wheelhouse specified:
+   *   - unzip wheelhouse
+   *   - upgrade pip if set by conf (default: no)
+   *   - install using pip:
+   *
+   *       pip install -r requirement_file.txt \
+   *           --find-links=wheelhouse \
+   *           [--no-index] \
+   *           [--index-url http://pypi.mirror/simple] [--trusted-host pypi.mirror] \
+   *           [package.whl]
+   *
+   * else, if no wheelhouse is set:
+   *
+   *       pip install -r requirement_file.txt \
+   *           [--no-index] \
+   *           [--index-url http://pypi.mirror/simple] [--trusted-host pypi.mirror] \
+   *           [package.whl]
    *
    * Conda
-   * - Execute command: conda create --name virtualenvName --file requirement_file -y
+   * - Execute command: conda create --name virtualenvName --file requirement_file.txt -y
    *
    */
   def setupVirtualEnv(): Unit = {
@@ -100,41 +182,114 @@ private[spark] class PythonWorkerFactory(pythonExec: String,
     // fetched from FileServer
     val pyspark_requirements =
       if (Utils.isLocalMaster(conf)) {
-        conf.get("spark.pyspark.virtualenv.requirements")
+        virtualRequirements
       } else {
-        conf.get("spark.pyspark.virtualenv.requirements").split("/").last
+        virtualRequirements.split("/").last
       }
 
+    logDebug("wheelhouse: " + virtualWheelhouse)
+    if (virtualWheelhouse != null &&
+        !virtualWheelhouse.isEmpty &&
+        Files.exists(Paths.get(virtualWheelhouse))) {
+      logDebug("Unzipping wheelhouse archive " + virtualWheelhouse)
+      unzipWheelhouse(virtualWheelhouse, "wheelhouse")
+    }
+
     val createEnvCommand =
       if (virtualEnvType == "native") {
-        Arrays.asList(virtualEnvPath,
-          "-p", pythonExec,
-          "--no-site-packages", virtualEnvName)
+        if (virtualEnvSystemSitePackages) {
+          Arrays.asList(virtualEnvPath, "-p", pythonExec, "--system-site-packages", virtualEnvName)
+        }
+        else {
+          Arrays.asList(virtualEnvPath, "-p", pythonExec, virtualEnvName)
+        }
       } else {
-        Arrays.asList(virtualEnvPath,
-          "create", "--prefix", System.getProperty("user.dir") + "/" + virtualEnvName,
-          "--file", pyspark_requirements, "-y")
+        // Conda creates everything and installs the packages
+        var basePipArgs = mutable.ListBuffer[String]()
+        basePipArgs += (virtualEnvPath,
+          "create",
+          "--prefix",
+          System.getProperty("user.dir") + "/" + virtualEnvName)
+        if (pyspark_requirements != null && !pyspark_requirements.isEmpty) {
+          basePipArgs += ("--file", pyspark_requirements)
+        }
+        basePipArgs += ("-y")
+        basePipArgs.toList.asJava
       }
     execCommand(createEnvCommand)
-    // virtualenv will be created in the working directory of Executor.
     virtualPythonExec = virtualEnvName + "/bin/python"
+
+    // virtualenv will be created in the working directory of Executor.
     if (virtualEnvType == "native") {
-      execCommand(Arrays.asList(virtualPythonExec, "-m", "pip",
-        "--cache-dir", System.getProperty("user.home"),
-        "install", "-r", pyspark_requirements))
+      var virtualenvPipExec = virtualEnvName + "/bin/pip"
+      var pipUpgradeArgs = mutable.ListBuffer[String]()
+      if (upgradePip) {
+        pipUpgradeArgs += (virtualenvPipExec, "install", "--upgrade", "pip")
+      }
+      var basePipArgs = mutable.ListBuffer[String]()
+      basePipArgs += (virtualenvPipExec, "install")
+      if (pyspark_requirements != null && !pyspark_requirements.isEmpty) {
+        basePipArgs += ("-r", pyspark_requirements)
+      }
+      if (virtualWheelhouse != null &&
+          !virtualWheelhouse.isEmpty &&
+          Files.exists(Paths.get(virtualWheelhouse))) {
+        basePipArgs += ("--find-links=wheelhouse")
+        pipUpgradeArgs += ("--find-links=wheelhouse")
+      }
+      if (virtualIndexUrl != null && !virtualIndexUrl.isEmpty) {
+        basePipArgs += ("--index-url", virtualIndexUrl)
+        pipUpgradeArgs += ("--index-url", virtualIndexUrl)
+      } else if (!virtualUseIndex) {
+        basePipArgs += ("--no-index")
+        pipUpgradeArgs += ("--no-index")
+      }
+      if (virtualTrustedHost != null && !virtualTrustedHost.isEmpty) {
+        basePipArgs += ("--trusted-host", virtualTrustedHost)
+        pipUpgradeArgs += ("--trusted-host", virtualTrustedHost)
+      }
+      if (upgradePip) {
+        // upgrade pip in the virtualenv
+        execCommand(pipUpgradeArgs.toList.asJava)
+      }
+      if (virtualInstallPackage != null && !virtualInstallPackage.isEmpty) {
+        basePipArgs += (virtualInstallPackage)
+      }
+      execCommand(basePipArgs.toList.asJava)
     }
+    // do not execute a second command line in "conda" mode
   }
 
   def execCommand(commands: java.util.List[String]): Unit = {
-    logDebug("Running command:" + commands.asScala.mkString(" "))
-    val pb = new ProcessBuilder(commands).inheritIO()
+    logDebug("Running command: " + commands.asScala.mkString(" "))
+
+    val pb = new ProcessBuilder(commands)
     pb.environment().putAll(envVars.asJava)
     pb.environment().putAll(System.getenv())
     pb.environment().put("HOME", System.getProperty("user.home"))
+
     val proc = pb.start()
+
     val exitCode = proc.waitFor()
     if (exitCode != 0) {
-      throw new RuntimeException("Fail to run command: " + commands.asScala.mkString(" "))
+      val errString = try {
+        val err = Option(proc.getErrorStream())
+        err.map(IOUtils.toString)
+      } catch {
+        case io: IOException => None
+      }
+
+      val outString = try {
+        val out = Option(proc.getInputStream())
+        out.map(IOUtils.toString)
+      } catch {
+        case io: IOException => None
+      }
+
+      throw new RuntimeException("Fail to run command: " + commands.asScala.mkString(" ") +
+        "\nOutput: " + outString +
+        "\nStderr: " + errString
+      )
     }
   }
 
@@ -183,8 +338,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String,
       // Create and start the worker
       val realPythonExec = if (virtualEnvEnabled) virtualPythonExec else pythonExec
       logDebug(s"Starting worker with pythonExec: ${realPythonExec}")
-      val pb = new ProcessBuilder(Arrays.asList(realPythonExec,
-        "-m", "pyspark.worker"))
+      val pb = new ProcessBuilder(Arrays.asList(realPythonExec, "-m", "pyspark.worker"))
       val workerEnv = pb.environment()
       workerEnv.putAll(envVars.asJava)
       workerEnv.put("PYTHONPATH", pythonPath)

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 2 additions & 2 deletions
@@ -505,8 +505,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         |                              dependency conflicts.
         |  --repositories              Comma-separated list of additional remote repositories to
         |                              search for the maven coordinates given with --packages.
-        |  --py-files PY_FILES         Comma-separated list of .zip, .egg, or .py files to place
-        |                              on the PYTHONPATH for Python apps.
+        |  --py-files PY_FILES         Comma-separated list of .zip, .egg, .whl or .py files to
+        |                              place on the PYTHONPATH for Python apps.
         |  --files FILES               Comma-separated list of files to be placed in the working
         |                              directory of each executor.
         |
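
With this help-text change, a wheel can be listed in --py-files alongside the existing archive types, for example (file names are illustrative):

```bash
# .whl files are now accepted by --py-files, next to .zip, .egg and .py:
spark-submit --py-files deps.zip,my_lib-1.0-py2.py3-none-any.whl my_app.py
```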

docs/cluster-overview.md

Lines changed: 12 additions & 3 deletions
@@ -87,9 +87,18 @@ The following table summarizes terms you'll see used to refer to cluster concept
   <tr>
     <td>Application jar</td>
     <td>
-      A jar containing the user's Spark application. In some cases users will want to create
-      an "uber jar" containing their application along with its dependencies. The user's jar
-      should never include Hadoop or Spark libraries, however, these will be added at runtime.
+      A jar containing the user's Spark application (for Java and Scala drivers). In some cases
+      users will want to create an "uber jar" containing their application along with its
+      dependencies. The user's jar should never include Hadoop or Spark libraries; however, these
+      will be added at runtime.
+    </td>
+  </tr>
+  <tr>
+    <td>Application Wheelhouse</td>
+    <td>
+      An archive containing precompiled wheels of the user's PySpark application and dependencies
+      (for Python drivers). The user's wheelhouse should not include jars, only Python Wheel files
+      for one or more architectures.
     </td>
   </tr>
   <tr>
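
One way to assemble such an archive is sketched below. The worker unzips it into a local wheelhouse/ directory and points pip's --find-links there, so the wheels should sit at the top level of the archive (the commands and file names are illustrative, not mandated by this commit):

```bash
# Build wheels for the application's dependencies, then zip them flat:
pip wheel -r requirements.txt -w wheelhouse/
(cd wheelhouse && zip -r ../wheelhouse.zip .)
```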

docs/configuration.md

Lines changed: 1 addition & 1 deletion
@@ -392,7 +392,7 @@ Apart from these, the following properties are also available, and may be useful
   <td><code>spark.submit.pyFiles</code></td>
   <td></td>
   <td>
-    Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
+    Comma-separated list of .zip, .egg, .whl or .py files to place on the PYTHONPATH for Python apps.
   </td>
 </tr>
 <tr>

docs/programming-guide.md

Lines changed: 8 additions & 8 deletions
@@ -24,7 +24,7 @@ along with if you launch Spark's interactive shell -- either `bin/spark-shell` f
 
 <div data-lang="scala" markdown="1">
 
-Spark {{site.SPARK_VERSION}} is built and distributed to work with Scala {{site.SCALA_BINARY_VERSION}} 
+Spark {{site.SPARK_VERSION}} is built and distributed to work with Scala {{site.SCALA_BINARY_VERSION}}
 by default. (Spark can be built to work with other versions of Scala, too.) To write
 applications in Scala, you will need to use a compatible Scala version (e.g. {{site.SCALA_BINARY_VERSION}}.X).
 
@@ -211,7 +211,7 @@ For a complete list of options, run `spark-shell --help`. Behind the scenes,
 
 In the PySpark shell, a special interpreter-aware SparkContext is already created for you, in the
 variable called `sc`. Making your own SparkContext will not work. You can set which master the
-context connects to using the `--master` argument, and you can add Python .zip, .egg or .py files
+context connects to using the `--master` argument, and you can add Python .zip, .egg, .whl or .py files
 to the runtime path by passing a comma-separated list to `--py-files`. You can also add dependencies
 (e.g. Spark Packages) to your shell session by supplying a comma-separated list of maven coordinates
 to the `--packages` argument. Any additional repositories where dependencies might exist (e.g. SonaType)
@@ -240,13 +240,13 @@ use IPython, set the `PYSPARK_DRIVER_PYTHON` variable to `ipython` when running
 $ PYSPARK_DRIVER_PYTHON=ipython ./bin/pyspark
 {% endhighlight %}
 
-To use the Jupyter notebook (previously known as the IPython notebook), 
+To use the Jupyter notebook (previously known as the IPython notebook),
 
 {% highlight bash %}
 $ PYSPARK_DRIVER_PYTHON=jupyter ./bin/pyspark
 {% endhighlight %}
 
-You can customize the `ipython` or `jupyter` commands by setting `PYSPARK_DRIVER_PYTHON_OPTS`. 
+You can customize the `ipython` or `jupyter` commands by setting `PYSPARK_DRIVER_PYTHON_OPTS`.
 
 After the Jupyter Notebook server is launched, you can create a new "Python 2" notebook from
 the "Files" tab. Inside the notebook, you can input the command `%pylab inline` as part of
@@ -812,7 +812,7 @@ The variables within the closure sent to each executor are now copies and thus,
 
 In local mode, in some circumstances the `foreach` function will actually execute within the same JVM as the driver and will reference the same original **counter**, and may actually update it.
 
-To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail. 
+To ensure well-defined behavior in these sorts of scenarios one should use an [`Accumulator`](#accumulators). Accumulators in Spark are used specifically to provide a mechanism for safely updating a variable when execution is split up across worker nodes in a cluster. The Accumulators section of this guide discusses these in more detail.
 
 In general, closures - constructs like loops or locally defined methods, should not be used to mutate some global state. Spark does not define or guarantee the behavior of mutations to objects referenced from outside of closures. Some code that does this may work in local mode, but that's just by accident and such code will not behave as expected in distributed mode. Use an Accumulator instead if some global aggregation is needed.
 
@@ -1231,8 +1231,8 @@ storage levels is:
 </tr>
 </table>
 
-**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library, 
-so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`, 
+**Note:** *In Python, stored objects will always be serialized with the [Pickle](https://docs.python.org/2/library/pickle.html) library,
+so it does not matter whether you choose a serialized level. The available storage levels in Python include `MEMORY_ONLY`, `MEMORY_ONLY_2`,
 `MEMORY_AND_DISK`, `MEMORY_AND_DISK_2`, `DISK_ONLY`, and `DISK_ONLY_2`.*
 
 Spark also automatically persists some intermediate data in shuffle operations (e.g. `reduceByKey`), even without users calling `persist`. This is done to avoid recomputing the entire input if a node fails during the shuffle. We still recommend users call `persist` on the resulting RDD if they plan to reuse it.
@@ -1374,7 +1374,7 @@ res2: Long = 10
 
 While this code used the built-in support for accumulators of type Long, programmers can also
 create their own types by subclassing [AccumulatorV2](api/scala/index.html#org.apache.spark.AccumulatorV2).
-The AccumulatorV2 abstract class has several methods which need to override: 
+The AccumulatorV2 abstract class has several methods which need to override:
 `reset` for resetting the accumulator to zero, and `add` for add anothor value into the accumulator, `merge` for merging another same-type accumulator into this one. Other methods need to override can refer to scala API document. For example, supposing we had a `MyVector` class
 representing mathematical vectors, we could write:
 