Commit d084cbc

dansanduleac authored and robert3005 committed
Conda improvements to logging safety and customization (apache-spark-on-k8s#185)
* Fix token leak when logging in CondaEnvironmentManager, and allow configuration of higher priority pkgs_dirs through the conf
* copy yarn integration test logdirs to CIRCLE_ARTIFACTS otherwise we have no idea why they failed
* oops fix envs_dirs, should be a list
* only copy yarn test artifacts if they exist
* argh syntax
* omg bash you killin me
1 parent ad4ca63 commit d084cbc
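
A note on the logging fix: conda channel URLs commonly embed an auth token (for example https://user:TOKEN@repo.example.com/conda, an invented URL), so printing the generated condarc verbatim would leak the token into driver logs. A minimal sketch of the sanitize-before-logging idea, using json4s as the diff below does; the node contents and URL here are illustrative:

    import org.json4s._
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods

    // Illustrative condarc node; the token-bearing channel URL is invented for the example.
    val condarcNode =
      ("pkgs_dirs" -> List("/scratch/base/pkgs")) ~
        ("channels" -> List("https://user:SECRET_TOKEN@repo.example.com/conda"))

    // Strip the channels field before anything is logged, mirroring generateCondarc below.
    val sanitized = condarcNode.removeField { case (name, _) => name == "channels" }

    // Prints only pkgs_dirs; the token never reaches the logs.
    println(JsonMethods.pretty(JsonMethods.render(sanitized)))

The CondaEnvironmentManager diff applies exactly this removeField step to its condarcNode before the logInfo call.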

File tree

- circle.yml
- core/pom.xml
- core/src/main/scala/org/apache/spark/api/conda/CondaEnvironment.scala
- core/src/main/scala/org/apache/spark/api/conda/CondaEnvironmentManager.scala
- core/src/main/scala/org/apache/spark/internal/config/package.scala

5 files changed: +85 -22 lines changed

circle.yml

Lines changed: 8 additions & 0 deletions
@@ -63,6 +63,14 @@ test:
   post:
     - find . -name unit-tests.log -exec rsync -R {} $CIRCLE_ARTIFACTS \;:
         parallel: true
+    - ? |
+        shopt -s nullglob
+        files=(resource-managers/yarn/target/./org.apache.spark.deploy.yarn.*/*-logDir-*)
+        if [[ ${#files[@]} != 0 ]]; then
+          rsync -Rrm "${files[@]}" $CIRCLE_ARTIFACTS
+        fi
+      :
+        parallel: true
 
 deployment:
   release:

core/pom.xml

Lines changed: 4 additions & 0 deletions
@@ -261,6 +261,10 @@
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.dataformat</groupId>
+      <artifactId>jackson-dataformat-yaml</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>
       <artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
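
This new dependency supplies the YAMLFactory used by CondaEnvironmentManager below to emit the condarc as YAML through the regular Jackson ObjectMapper API. A minimal sketch of what it enables (values invented; the artifact version is assumed to be managed by the parent pom):

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.dataformat.yaml.YAMLFactory

    // Swapping in a YAMLFactory makes the mapper read and write YAML instead of JSON.
    val mapper = new ObjectMapper(new YAMLFactory())
    val yaml = mapper.writeValueAsString(
      java.util.Collections.singletonMap("pkgs_dirs", java.util.Arrays.asList("/tmp/pkgs")))
    // yaml now holds a YAML document along the lines of "pkgs_dirs:\n- /tmp/pkgs".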

core/src/main/scala/org/apache/spark/api/conda/CondaEnvironment.scala

Lines changed: 2 additions & 1 deletion
@@ -66,7 +66,8 @@ final class CondaEnvironment(val manager: CondaEnvironmentManager,
       List("install", "-n", envName, "-y", "--override-channels")
         ::: channels.iterator.flatMap(Iterator("--channel", _)).toList
         ::: "--" :: packages.toList,
-      description = s"install dependencies in conda env $condaEnvDir"
+      description = s"install dependencies in conda env $condaEnvDir",
+      channels = channels.toList
     )
 
     this.packages ++= packages

core/src/main/scala/org/apache/spark/api/conda/CondaEnvironmentManager.scala

Lines changed: 64 additions & 21 deletions
@@ -25,19 +25,52 @@ import scala.sys.process.BasicIO
 import scala.sys.process.Process
 import scala.sys.process.ProcessBuilder
 import scala.sys.process.ProcessIO
+import scala.sys.process.ProcessLogger
+
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.dataformat.yaml.YAMLFactory
+import org.json4s.JsonAST.JValue
+import org.json4s.jackson.Json4sScalaModule
+import org.json4s.jackson.JsonMethods
 
 import org.apache.spark.SparkConf
 import org.apache.spark.SparkException
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.CONDA_BINARY_PATH
-import org.apache.spark.internal.config.CONDA_CHANNEL_URLS
+import org.apache.spark.internal.config.CONDA_GLOBAL_PACKAGE_DIRS
 import org.apache.spark.internal.config.CONDA_VERBOSITY
 import org.apache.spark.util.Utils
 
-final class CondaEnvironmentManager(condaBinaryPath: String, verbosity: Int = 0) extends Logging {
+final class CondaEnvironmentManager(condaBinaryPath: String,
+                                    verbosity: Int = 0,
+                                    packageDirs: Seq[String] = Nil) extends Logging {
 
   require(verbosity >= 0 && verbosity <= 3, "Verbosity must be between 0 and 3 inclusively")
 
+  lazy val defaultInfo: Map[String, JValue] = {
+    logInfo("Retrieving the conda installation's info")
+    val command = Process(List(condaBinaryPath, "info", "--json"), None)
+
+    val buffer = new StringBuffer
+    val io = BasicIO(withIn = false,
+      buffer,
+      Some(ProcessLogger(line => logDebug(s"<conda> $line"))))
+
+    val exitCode = command.run(io).exitValue()
+    if (exitCode != 0) {
+      throw new SparkException(s"Attempt to retrieve initial conda info exited with code: "
+        + f"$exitCode%nCommand was: $command%nOutput was:%n${buffer.toString}")
+    }
+
+    implicit val format = org.json4s.DefaultFormats
+    JsonMethods.parse(buffer.toString).extract[Map[String, JValue]]
+  }
+
+  lazy val defaultPkgsDirs: List[String] = {
+    implicit val format = org.json4s.DefaultFormats
+    defaultInfo("pkgs_dirs").extract[List[String]]
+  }
+
   def create(
       baseDir: String,
       condaPackages: Seq[String],
@@ -58,11 +91,11 @@ final class CondaEnvironmentManager(condaBinaryPath: String, verbosity: Int = 0)
     // Attempt to create environment
     runCondaProcess(
       linkedBaseDir,
-      List("create", "-n", name, "-y", "--override-channels", "--no-default-packages")
+      List("create", "-n", name, "-y", "--no-default-packages")
         ::: verbosityFlags
-        ::: condaChannelUrls.flatMap(Iterator("--channel", _)).toList
         ::: "--" :: condaPackages.toList,
-      description = "create conda env"
+      description = "create conda env",
+      channels = condaChannelUrls.toList
     )
 
     new CondaEnvironment(this, linkedBaseDir, name, condaPackages, condaChannelUrls)
@@ -77,28 +110,37 @@ final class CondaEnvironmentManager(condaBinaryPath: String, verbosity: Int = 0)
    *
    * This hack is necessary otherwise conda tries to use the homedir for pkgs cache.
    */
-  private[this] def generateCondarc(baseRoot: Path): Path = {
-    val condaPkgsPath = Paths.get(condaBinaryPath).getParent.getParent.resolve("pkgs")
+  private[this] def generateCondarc(baseRoot: Path, channelUrls: Seq[String]): Path = {
+
     val condarc = baseRoot.resolve("condarc")
-    val condarcContents =
-      s"""pkgs_dirs:
-         | - $baseRoot/pkgs
-         | - $condaPkgsPath
-         |envs_dirs:
-         | - $baseRoot/envs
-         |show_channel_urls: false
-         |channels: []
-         |default_channels: []
-       """.stripMargin
-    Files.write(condarc, List(condarcContents).asJava)
-    logInfo(f"Using condarc at $condarc:%n$condarcContents")
+
+    import org.json4s.JsonAST._
+    import org.json4s.JsonDSL._
+
+    // building it in json4s AST since it gives us more control over how it will be mapped
+    val condarcNode = JObject(
+      "pkgs_dirs" -> (packageDirs ++: s"$baseRoot/pkgs" +: defaultPkgsDirs),
+      "envs_dirs" -> List(s"$baseRoot/envs"),
+      "show_channel_urls" -> false,
+      "default_channels" -> JArray(Nil),
+      "channels" -> channelUrls
+    )
+    val mapper = new ObjectMapper(new YAMLFactory()).registerModule(Json4sScalaModule)
+
+    Files.write(condarc, mapper.writeValueAsBytes(condarcNode))
+
+    val sanitizedCondarc = condarcNode removeField { case (name, _) => name == "channels" }
+    logInfo(f"Using condarc at $condarc (channels have been edited out):%n"
+      + mapper.writeValueAsString(sanitizedCondarc))
+
     condarc
   }
 
   private[conda] def runCondaProcess(baseRoot: Path,
                                      args: List[String],
+                                     channels: List[String],
                                      description: String): Unit = {
-    val condarc = generateCondarc(baseRoot)
+    val condarc = generateCondarc(baseRoot, channels)
     val fakeHomeDir = baseRoot.resolve("home")
     // Attempt to create fake home dir
     Files.createDirectories(fakeHomeDir)
@@ -142,6 +184,7 @@ object CondaEnvironmentManager {
     val condaBinaryPath = sparkConf.get(CONDA_BINARY_PATH).getOrElse(
       sys.error(s"Expected config ${CONDA_BINARY_PATH.key} to be set"))
     val verbosity = sparkConf.get(CONDA_VERBOSITY)
-    new CondaEnvironmentManager(condaBinaryPath, verbosity)
+    val packageDirs = sparkConf.get(CONDA_GLOBAL_PACKAGE_DIRS)
+    new CondaEnvironmentManager(condaBinaryPath, verbosity, packageDirs)
   }
 }
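
The pkgs_dirs expression in generateCondarc reads right to left: user-configured packageDirs come first, then the environment's own $baseRoot/pkgs, then the pkgs_dirs reported by conda info --json, so earlier entries take priority when conda looks up cached packages. A small sketch of the ordering, with invented paths standing in for the real inputs:

    // Illustrative values: packageDirs would come from spark.conda.packageDirs,
    // defaultPkgsDirs from `conda info --json`, and baseRoot is the env's scratch dir.
    val packageDirs: Seq[String] = Seq("/shared/conda-pkgs")
    val defaultPkgsDirs: List[String] = List("/opt/conda/pkgs")
    val baseRoot = "/scratch/env-1234"

    val pkgsDirs = packageDirs ++: s"$baseRoot/pkgs" +: defaultPkgsDirs
    // => List(/shared/conda-pkgs, /scratch/env-1234/pkgs, /opt/conda/pkgs)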

core/src/main/scala/org/apache/spark/internal/config/package.scala

Lines changed: 7 additions & 0 deletions
@@ -193,6 +193,13 @@ package object config {
     .intConf
     .createWithDefault(0)
 
+  private[spark] val CONDA_GLOBAL_PACKAGE_DIRS = ConfigBuilder("spark.conda.packageDirs")
+    .doc("Custom pkgs_dirs that should be prepended to the default set of pkgs_dirs that comes " +
+      "with conda")
+    .stringConf
+    .toSequence
+    .createWithDefault(Nil)
+
   // To limit memory usage, we only track information for a fixed number of tasks
   private[spark] val UI_RETAINED_TASKS = ConfigBuilder("spark.ui.retainedTasks")
     .intConf