3 changes: 2 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -114,7 +114,8 @@ class SparkEnv (
def createPythonWorker(pythonExec: String, envVars: Map[String, String]): java.net.Socket = {
synchronized {
val key = (pythonExec, envVars)
pythonWorkers.getOrElseUpdate(key, new PythonWorkerFactory(pythonExec, envVars)).create()
pythonWorkers.getOrElseUpdate(key,
new PythonWorkerFactory(pythonExec, envVars, conf)).create()
}
}

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -21,6 +21,7 @@ import java.io.{DataInputStream, DataOutputStream, EOFException, InputStream, Ou
import java.net.{InetAddress, ServerSocket, Socket, SocketException}
import java.nio.charset.StandardCharsets
import java.util.Arrays
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._
import scala.collection.mutable
@@ -30,7 +31,11 @@ import org.apache.spark.internal.Logging
import org.apache.spark.security.SocketAuthHelper
import org.apache.spark.util.{RedirectThread, Utils}

private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String, String])

private[spark] class PythonWorkerFactory(
pythonExec: String,
envVars: Map[String, String],
conf: SparkConf)
extends Logging {

import PythonWorkerFactory._
@@ -76,6 +81,14 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
val daemonWorkers = new mutable.WeakHashMap[Socket, Int]()
val idleWorkers = new mutable.Queue[Socket]()
var lastActivity = 0L
val virtualEnvEnabled = conf.getBoolean("spark.pyspark.virtualenv.enabled", false)
val virtualenvPythonExec = if (virtualEnvEnabled) {
val virtualEnvFactory = new VirtualEnvFactory(pythonExec, conf, false)
virtualEnvFactory.setupVirtualEnv()
} else {
pythonExec
}

new MonitorThread().start()

var simpleWorkers = new mutable.WeakHashMap[Socket, Process]()
@@ -144,7 +157,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String
serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1)))

// Create and start the worker
val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", workerModule))
val pb = new ProcessBuilder(Arrays.asList(virtualenvPythonExec, "-m", workerModule))
val workerEnv = pb.environment()
workerEnv.putAll(envVars.asJava)
workerEnv.put("PYTHONPATH", pythonPath)
@@ -186,7 +199,7 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String

try {
// Create and start the daemon
val command = Arrays.asList(pythonExec, "-m", daemonModule)
val command = Arrays.asList(virtualenvPythonExec, "-m", daemonModule)
val pb = new ProcessBuilder(command)
val workerEnv = pb.environment()
workerEnv.putAll(envVars.asJava)
core/src/main/scala/org/apache/spark/api/python/VirtualEnvFactory.scala (new file)
@@ -0,0 +1,164 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.python

import java.io.File
import java.util.{Map => JMap}
import java.util.Arrays
import java.util.concurrent.atomic.AtomicInteger

import scala.collection.JavaConverters._

import com.google.common.io.Files

import org.apache.spark.SparkConf
import org.apache.spark.internal.Logging


class VirtualEnvFactory(pythonExec: String, conf: SparkConf, isDriver: Boolean)
extends Logging {

private val virtualEnvType = conf.get("spark.pyspark.virtualenv.type", "native")
private val virtualEnvBinPath = conf.get("spark.pyspark.virtualenv.bin.path", "")
private val initPythonPackages = conf.getOption("spark.pyspark.virtualenv.packages")
Contributor: So if the factory is made once, then how will these get updated?

Contributor (author): For an existing running executor, the only way to install additional packages is via sc.install_packages. spark.pyspark.virtualenv.packages is updated on the driver side first when sc.install_packages is called, and a newly allocated executor will then fetch this property and install all the additional packages.
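A minimal sketch of the driver-side bookkeeping described above (illustrative only: the helper name is hypothetical, the Python-side sc.install_packages is assumed to end up doing something equivalent, and the ":"-separated format matches how initPythonPackages is split later in this file):

// Sketch, not part of the PR: append a package to the property that newly
// allocated executors read when they set up their virtualenv.
def appendVirtualenvPackage(conf: SparkConf, pkg: String): Unit = {
  val key = "spark.pyspark.virtualenv.packages"
  // Packages are kept as a ":"-separated list (see initPythonPackages.get.split(":") below).
  val updated = conf.getOption(key).map(_ + ":" + pkg).getOrElse(pkg)
  conf.set(key, updated)
}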

private var virtualEnvName: String = _
private var virtualPythonExec: String = _
private val VIRTUALENV_ID = new AtomicInteger()
private var isLauncher: Boolean = false

// Used by the launcher when the user wants to use virtualenv in the PySpark shell. The launcher
// needs this class to create the virtualenv for the driver.
def this(pythonExec: String, properties: JMap[String, String], isDriver: java.lang.Boolean) {
Member: Do we need to use java.lang.Boolean instead of Boolean?

Contributor (author): This is because it will also be called from the Java side via reflection.

Member: I guess we can use boolean.class in java reflection.

Contributor (author): It is used by the launcher module, which doesn't depend on Scala.

this(pythonExec, new SparkConf().setAll(properties.asScala), isDriver)
this.isLauncher = true
}
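For context on the java.lang.Boolean question above, a hedged sketch of how a Java-only caller such as the launcher might construct this class reflectively (illustrative only; this is not the actual launcher code):

// Sketch, not part of the PR: reflective construction from non-Scala code, which is
// why the boxed java.lang.Boolean appears in the auxiliary constructor signature.
val cls = Class.forName("org.apache.spark.api.python.VirtualEnvFactory")
val ctor = cls.getConstructor(
  classOf[String], classOf[java.util.Map[_, _]], classOf[java.lang.Boolean])
val props = new java.util.HashMap[String, String]()
props.put("spark.pyspark.virtualenv.enabled", "true")
val factory = ctor.newInstance("python3", props, java.lang.Boolean.TRUE)
val pythonExec = cls.getMethod("setupVirtualEnv").invoke(factory).asInstanceOf[String]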

/*
 * Create virtualenv using native virtualenv or conda
 *
 */

Contributor: I wouldn't document the commands called in the function doc string.
def setupVirtualEnv(): String = {
Contributor: Might be better to pass args into this function so that it could be properly unit-tested. It seems that there are no unit tests for this class, so that seems to be a necessary addition.

/*
*
* Native Virtualenv:
* - Execute command: virtualenv -p <pythonExec> --no-site-packages <virtualenvName>
* - Execute command: python -m pip --cache-dir <cache-dir> install -r <requirement_file>
*
* Conda
* - Execute command: conda create --prefix <prefix> --file <requirement_file> -y
*
*/
logInfo("Start to setup virtualenv...")
logDebug("user.dir=" + System.getProperty("user.dir"))
logDebug("user.home=" + System.getProperty("user.home"))

require(virtualEnvType == "native" || virtualEnvType == "conda",
s"VirtualEnvType: $virtualEnvType is not supported." )
require(new File(virtualEnvBinPath).exists(),
s"VirtualEnvBinPath: $virtualEnvBinPath is not defined or doesn't exist.")
Contributor: In addition, how are we handling the case of an existing s"$virtualEnvBinPath/$virtualEnvName"?

// Two scenarios of creating the virtualenv:
// 1. Created in a YARN container. YARN will clean it up after the container exits.
// 2. Created outside a YARN container. Spark needs to create a temp directory and clean it up
//    after the app finishes:
//    - driver of the PySpark shell
//    - driver in yarn-client mode
Contributor: In the case of Kubernetes, this will be created in the base spark-py Docker image, which is shared between the driver and executors, and the containers will be cleaned up upon termination of the job via owner labels (for the executors) and the k8s API server (for the driver). As such (hopefully with client-mode support being completed soon), the logic below should hold as well. Is this work going to be cluster-manager agnostic, or is it only supposed to support YARN? I would like to see this be applicable to all first-class cluster-management systems. I can help with appending to this PR: k8s support and the appropriate integration tests.

if (isLauncher ||
(isDriver && conf.get("spark.submit.deployMode") == "client")) {
val virtualenvBasedir = Files.createTempDir()
virtualenvBasedir.deleteOnExit()

Reviewer: The temporary directory is not being deleted on exit.

virtualEnvName = virtualenvBasedir.getAbsolutePath
} else if (isDriver && conf.get("spark.submit.deployMode") == "cluster") {
virtualEnvName = "virtualenv_driver"
} else {
// use the working directory of Executor
virtualEnvName = "virtualenv_" + conf.getAppId + "_" + VIRTUALENV_ID.getAndIncrement()
}

// Use the absolute path of the requirement file in the following cases:
// 1. driver of the PySpark shell
// 2. driver in yarn-client mode
// Otherwise just use the file name, as the file will be downloaded to the executor's working directory.
Contributor: In the Kubernetes world, I might want to use a requirements.txt file that is stored locally in the base Docker image, regardless of client or cluster mode. Is that something that you think should be supported? Maybe a config variable spark.pyspark.virtualenv.kubernetes.localrequirements that points to a file stored as local:///var/files/requirements.txt, for example. Furthermore, when we introduce a Resource Staging Server that allows us to stage files locally, this setting will be interchangeable between something that is locally baked in vs. staged.

val pysparkRequirements =
if (isLauncher ||
(isDriver && conf.get("spark.submit.deployMode") == "client")) {
conf.getOption("spark.pyspark.virtualenv.requirements")
} else {
conf.getOption("spark.pyspark.virtualenv.requirements").map(_.split("/").last)
}

val createEnvCommand =
if (virtualEnvType == "native") {
List(virtualEnvBinPath,
"-p", pythonExec,
"--no-site-packages", virtualEnvName)
} else {
// Two cases of conda
// 1. requirement file is specified. (Batch mode)
// 2. requirement file is not specified. (Interactive mode).
// In this case `spark.pyspark.virtualenv.python_version` must be specified.

if (pysparkRequirements.isDefined) {
Contributor: Please rewrite as a .map(...).getOrElse(...), as if (x.isDefined) is not idiomatic Scala.

List(virtualEnvBinPath,
"create", "--prefix", virtualEnvName,
"--file", pysparkRequirements.get, "-y")
} else {
require(conf.contains("spark.pyspark.virtualenv.python_version"),
"spark.pyspark.virtualenv.python_version is not set when using conda " +
"in interactive mode")
val pythonVersion = conf.get("spark.pyspark.virtualenv.python_version")
List(virtualEnvBinPath,
"create", "--prefix", virtualEnvName,
"python=" + pythonVersion, "-y")
Contributor: So we only use the pythonVersion if there is no requirements file? Why?

Contributor (author, zjffdu, Jan 26, 2018): It is only for conda. Conda's requirement file contains python itself, while a native virtualenv requirement file doesn't. That's why the user doesn't need to specify python_version when a requirement file is used, but has to specify python_version when no requirement file is specified for conda; otherwise conda would create a virtualenv without python.

}
}
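A hedged sketch of the .map(...).getOrElse(...) shape the reviewer asks for above, applied to the conda branch (illustrative only, not part of the PR):

// Sketch, not part of the PR: the conda branch without if (pysparkRequirements.isDefined).
val condaCreateCommand = pysparkRequirements.map { req =>
  List(virtualEnvBinPath, "create", "--prefix", virtualEnvName, "--file", req, "-y")
}.getOrElse {
  require(conf.contains("spark.pyspark.virtualenv.python_version"),
    "spark.pyspark.virtualenv.python_version is not set when using conda in interactive mode")
  List(virtualEnvBinPath, "create", "--prefix", virtualEnvName,
    "python=" + conf.get("spark.pyspark.virtualenv.python_version"), "-y")
}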
execCommand(createEnvCommand)

virtualPythonExec = virtualEnvName + "/bin/python"
if (virtualEnvType == "native" && pysparkRequirements.isDefined) {
Contributor: .foreach, not .isDefined.

// requirement file for native is not mandatory, run this only when requirement file
// is specified.
execCommand(List(virtualPythonExec, "-m", "pip",
"--cache-dir", System.getProperty("user.home"),
Contributor: Why caching in the user home dir?

Contributor (author): In YARN mode the executor runs as the yarn user, and pip would otherwise store its cache in a directory that the yarn user doesn't have permission to write to. So here I point the cache dir at the yarn user's home directory.

Contributor: How does this logic carry across cluster managers? Has this been considered for Mesos use cases? In Kubernetes this should be fine, but we should also document this somewhere and cover it with an integration test.

Reviewer: This builds a cache for each user, so you'd end up with multiple caches for the same packages. How about a config to set a common cache (which would have to be writable by everybody, of course)? Also, I'm wondering about runaway disk usage: who cleans up when things get tight?

"install", "-r", pysparkRequirements.get))
}
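A hedged sketch of the configurable cache directory suggested in the discussion above (the property name spark.pyspark.virtualenv.pip.cacheDir is hypothetical and not part of the PR):

// Sketch, not part of the PR: let a shared, writable cache dir override the
// per-user default, falling back to the current behaviour (user.home).
val pipCacheDir = conf.get("spark.pyspark.virtualenv.pip.cacheDir",
  System.getProperty("user.home"))
// pipCacheDir would then be passed to pip via "--cache-dir" as in the code above.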
// install additional packages
if (initPythonPackages.isDefined) {
Contributor: .foreach, not isDefined.

execCommand(List(virtualPythonExec, "-m", "pip",
"install") ::: initPythonPackages.get.split(":").toList);
}
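A hedged sketch of the Option.foreach pattern asked for here and at the similar comment above (illustrative only, not part of the PR):

// Sketch, not part of the PR: install additional packages without isDefined/get.
initPythonPackages.foreach { packages =>
  execCommand(List(virtualPythonExec, "-m", "pip", "install") ::: packages.split(":").toList)
}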
logInfo(s"virtualenv is created at $virtualPythonExec")
virtualPythonExec
}

private def execCommand(commands: List[String]): Unit = {
logInfo("Running command:" + commands.mkString(" "))
val pb = new ProcessBuilder(commands.asJava)
// don't inheritIO when it is used in launcher, because launcher would capture the standard
// output to assemble the spark-submit command.
if(!isLauncher) {
pb.inheritIO();
}
// pip internally uses the environment variable `HOME`
pb.environment().put("HOME", System.getProperty("user.home"))
val proc = pb.start()
val exitCode = proc.waitFor()
if (exitCode != 0) {
throw new RuntimeException("Fail to run command: " + commands.mkString(" "))
}
}
}
core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.util.Try

import org.apache.spark.{SparkConf, SparkUserAppException}
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.api.python.{PythonUtils, VirtualEnvFactory}
import org.apache.spark.internal.config._
import org.apache.spark.util.{RedirectThread, Utils}

@@ -41,12 +41,17 @@ object PythonRunner {
val otherArgs = args.slice(2, args.length)
val sparkConf = new SparkConf()
val secret = Utils.createSecret(sparkConf)
val pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
var pythonExec = sparkConf.get(PYSPARK_DRIVER_PYTHON)
.orElse(sparkConf.get(PYSPARK_PYTHON))
.orElse(sys.env.get("PYSPARK_DRIVER_PYTHON"))
.orElse(sys.env.get("PYSPARK_PYTHON"))
.getOrElse("python")

if (sparkConf.getBoolean("spark.pyspark.virtualenv.enabled", false)) {
val virtualEnvFactory = new VirtualEnvFactory(pythonExec, sparkConf, true)
pythonExec = virtualEnvFactory.setupVirtualEnv()
Member: Correct me if I misunderstood, but couldn't we have some tests to check that setupVirtualEnv returns a proper string, at least?

Contributor: +1

}
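A hedged sketch of the kind of test the reviewers ask for above (illustrative only; the suite name is hypothetical, and the test assumes a virtualenv binary and python3 are available on the machine running it, since setupVirtualEnv shells out):

// Sketch, not part of the PR.
import java.io.File
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.api.python.VirtualEnvFactory

class VirtualEnvFactorySuite extends SparkFunSuite {
  test("setupVirtualEnv returns a python executable inside the created env") {
    val virtualenvBin = "/usr/bin/virtualenv" // assumption: present on the test machine
    assume(new File(virtualenvBin).exists())
    val conf = new SparkConf()
      .set("spark.pyspark.virtualenv.enabled", "true")
      .set("spark.pyspark.virtualenv.type", "native")
      .set("spark.pyspark.virtualenv.bin.path", virtualenvBin)
      .set("spark.submit.deployMode", "client")
    val pythonExec = new VirtualEnvFactory("python3", conf, true).setupVirtualEnv()
    assert(pythonExec.endsWith("/bin/python"))
  }
}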

// Format python file paths before adding them to the PYTHONPATH
val formattedPythonFile = formatPath(pythonFile)
val formattedPyFiles = resolvePyFiles(formatPaths(pyFiles))
13 changes: 13 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -439,6 +439,19 @@ private[spark] class SparkSubmit extends Logging {
sparkConf.set("spark.submit.pyFiles", localPyFiles)
}

// for pyspark virtualenv
if (args.isPython) {
if (clusterManager != YARN &&
args.sparkProperties.getOrElse("spark.pyspark.virtualenv.enabled", "false") == "true") {
printErrorAndExit("virtualenv is only supported in yarn mode")
Contributor: Why doesn't this work in standalone?

Contributor (author): I haven't tested it in standalone mode, so it isn't guaranteed to work there; supporting standalone is on my plan for later.

Contributor: Ah, please make a JIRA to track this then.

Contributor: +1 for Kubernetes.

}
if (args.sparkProperties.contains("spark.pyspark.virtualenv.requirements")) {
// distribute virtualenv requirement file as --files
args.files = mergeFileLists(args.files,
args.sparkProperties("spark.pyspark.virtualenv.requirements"))
}
}
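For reference, a hedged sketch pulling together the virtualenv-related properties this block and the factory above read (property keys are taken from the diff; the values are illustrative only):

// Sketch, not part of the PR: the configuration a virtualenv-enabled PySpark job
// would carry; in practice these come from spark-submit --conf flags.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.pyspark.virtualenv.enabled", "true")
  .set("spark.pyspark.virtualenv.type", "conda")                              // "native" or "conda"
  .set("spark.pyspark.virtualenv.bin.path", "/opt/conda/bin/conda")           // example path
  .set("spark.pyspark.virtualenv.requirements", "/path/to/requirements.txt")  // optional; shipped via --files
  .set("spark.pyspark.virtualenv.python_version", "3.6")                      // conda interactive mode only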

// In YARN mode for an R app, add the SparkR package archive and the R package
// archive containing all of the built R libraries to archives so that they can
// be distributed with the job
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -24,7 +24,7 @@ import javax.annotation.concurrent.GuardedBy
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.concurrent.Future

import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.rpc._
import org.apache.spark.scheduler._
@@ -98,7 +98,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
private val reviveThread =
ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")

class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
class DriverEndpoint(override val rpcEnv: RpcEnv)
Contributor: My question here is why this change is needed. Changing the scheduler backend is weird for this change.

Contributor (author, zjffdu, Jan 26, 2018): Without this change, the following scenario won't work:

  1. Launch a Spark app.
  2. Call sc.install_packages("numpy").
  3. Run sc.range(3).map(lambda x: np.__version__).collect().
  4. Restart an executor (by killing it; the scheduler will schedule another executor).
  5. Run sc.range(3).map(lambda x: np.__version__).collect() again. This time it would fail, because the newly scheduled executor cannot set up the virtualenv correctly, as it cannot get the updated spark.pyspark.virtualenv.packages.

That's why I made this change in core. Now the executor always gets the updated SparkConf instead of the SparkConf created when the Spark app started. There's some overhead, but I believe it is very trivial, and it could be improved later.

extends ThreadSafeRpcEndpoint with Logging {

// Executors that have been lost, but for which we don't yet know the real exit reason.
@@ -228,6 +228,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
context.reply(true)

case RetrieveSparkAppConfig =>
val sparkProperties = new ArrayBuffer[(String, String)]
for ((key, value) <- scheduler.sc.conf.getAll) {
if (key.startsWith("spark.")) {
sparkProperties += ((key, value))
}
}
val reply = SparkAppConfig(
sparkProperties,
SparkEnv.get.securityManager.getIOEncryptionKey(),
@@ -380,24 +386,16 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
protected def minRegisteredRatio: Double = _minRegisteredRatio

override def start() {
val properties = new ArrayBuffer[(String, String)]
for ((key, value) <- scheduler.sc.conf.getAll) {
if (key.startsWith("spark.")) {
properties += ((key, value))
}
}

// TODO (prashant) send conf instead of properties
driverEndpoint = createDriverEndpointRef(properties)
driverEndpoint = createDriverEndpointRef()
}

protected def createDriverEndpointRef(
properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
protected def createDriverEndpointRef(): RpcEndpointRef = {
rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint())
}

protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
new DriverEndpoint(rpcEnv, properties)
protected def createDriverEndpoint(): DriverEndpoint = {
new DriverEndpoint(rpcEnv)
}

def stopExecutors() {