Skip to content

Commit 5ae02f9

Browse files
justinuang authored and robert3005 committed
Make conda channels dynamic on executors (apache-spark-on-k8s#160)
1 parent 2bd3c33 commit 5ae02f9

File tree

4 files changed

+21
-16
lines changed

4 files changed

+21
-16
lines changed

core/src/main/scala/org/apache/spark/api/conda/CondaEnvironmentManager.scala

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,16 @@ import org.apache.spark.internal.config.CONDA_CHANNEL_URLS
3434
import org.apache.spark.internal.config.CONDA_VERBOSITY
3535
import org.apache.spark.util.Utils
3636

37-
final class CondaEnvironmentManager(condaBinaryPath: String, condaChannelUrls: Seq[String],
38-
verbosity: Int = 0)
39-
extends Logging {
37+
final class CondaEnvironmentManager(condaBinaryPath: String, verbosity: Int = 0) extends Logging {
4038

41-
require(condaChannelUrls.nonEmpty, "Can't have an empty list of conda channel URLs")
4239
require(verbosity >= 0 && verbosity <= 3, "Verbosity must be between 0 and 3 inclusively")
4340

4441
def create(
45-
baseDir: String,
46-
bootstrapPackages: Seq[String]): CondaEnvironment = {
47-
require(bootstrapPackages.nonEmpty, "Expected at least one bootstrap package.")
42+
baseDir: String,
43+
condaPackages: Seq[String],
44+
condaChannelUrls: Seq[String]): CondaEnvironment = {
45+
require(condaPackages.nonEmpty, "Expected at least one conda package.")
46+
require(condaChannelUrls.nonEmpty, "Can't have an empty list of conda channel URLs")
4847
val name = "conda-env"
4948

5049
// must link in /tmp to reduce path length in case baseDir is very long...
@@ -62,11 +61,11 @@ final class CondaEnvironmentManager(condaBinaryPath: String, condaChannelUrls: S
6261
List("create", "-n", name, "-y", "--override-channels", "--no-default-packages")
6362
::: verbosityFlags
6463
::: condaChannelUrls.flatMap(Iterator("--channel", _)).toList
65-
::: "--" :: bootstrapPackages.toList,
64+
::: "--" :: condaPackages.toList,
6665
description = "create conda env"
6766
)
6867

69-
new CondaEnvironment(this, linkedBaseDir, name, bootstrapPackages, condaChannelUrls)
68+
new CondaEnvironment(this, linkedBaseDir, name, condaPackages, condaChannelUrls)
7069
}
7170

7271
/**
@@ -142,10 +141,7 @@ object CondaEnvironmentManager {
142141
def fromConf(sparkConf: SparkConf): CondaEnvironmentManager = {
143142
val condaBinaryPath = sparkConf.get(CONDA_BINARY_PATH).getOrElse(
144143
sys.error(s"Expected config ${CONDA_BINARY_PATH.key} to be set"))
145-
val condaChannelUrls = sparkConf.get(CONDA_CHANNEL_URLS)
146-
require(condaChannelUrls.nonEmpty,
147-
s"Must define at least one conda channel in config ${CONDA_CHANNEL_URLS.key}")
148144
val verbosity = sparkConf.get(CONDA_VERBOSITY)
149-
new CondaEnvironmentManager(condaBinaryPath, condaChannelUrls, verbosity)
145+
new CondaEnvironmentManager(condaBinaryPath, verbosity)
150146
}
151147
}

core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ private[spark] class PythonWorkerFactory(requestedPythonExec: Option[String],
6969
val dirId = hash % localDirs.length
7070
Utils.createTempDir(localDirs(dirId).getAbsolutePath, "conda").getAbsolutePath
7171
}
72-
condaEnvManager.create(envDir, condaPackages)
72+
condaEnvManager.create(envDir, condaPackages, instructions.channels)
7373
}
7474
}
7575

core/src/main/scala/org/apache/spark/deploy/CondaRunner.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,9 +34,11 @@ abstract class CondaRunner extends Logging {
3434

3535
if (CondaEnvironmentManager.isConfigured(sparkConf)) {
3636
val condaBootstrapDeps = sparkConf.get(CONDA_BOOTSTRAP_PACKAGES)
37+
val condaChannelUrls = sparkConf.get(CONDA_CHANNEL_URLS)
3738
val condaBaseDir = Utils.createTempDir(Utils.getLocalDir(sparkConf), "conda").getAbsolutePath
3839
val condaEnvironmentManager = CondaEnvironmentManager.fromConf(sparkConf)
39-
val environment = condaEnvironmentManager.create(condaBaseDir, condaBootstrapDeps)
40+
val environment = condaEnvironmentManager
41+
.create(condaBaseDir, condaBootstrapDeps, condaChannelUrls)
4042

4143
// Save this as a global in order for SparkContext to be able to access it later, in case we
4244
// are shelling out, but providing a bridge back into this JVM.

resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,14 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
9191
|
9292
| status = open(sys.argv[1],'w')
9393
|
94-
| numpy_multiply = lambda x: numpy.multiply(x, mod1.func() * mod2.func())
94+
| # Addict exists only in external-conda-forge, not anaconda
95+
| sc.addCondaChannel("https://conda.anaconda.org/conda-forge")
96+
| sc.addCondaPackages('addict=1.0.0')
97+
|
98+
| def numpy_multiply(x):
99+
| # Ensure package from non-base channel is installed
100+
| import addict
101+
| numpy.multiply(x, mod1.func() * mod2.func())
95102
|
96103
| rdd = sc.parallelize(range(10)).map(numpy_multiply)
97104
| cnt = rdd.count()

0 commit comments

Comments (0)