
Commit a888fed

Ji Yan authored and srowen committed
[SPARK-19740][MESOS] Add support in Spark to pass arbitrary parameters into docker when running on mesos with docker containerizer
## What changes were proposed in this pull request?

Allow passing in arbitrary parameters into docker when launching spark executors on mesos with docker containerizer tnachen

## How was this patch tested?

Manually built and tested with passed in parameter

Author: Ji Yan <[email protected]>

Closes #17109 from yanji84/ji/allow_set_docker_user.
1 parent e090f3c commit a888fed

3 files changed: +96, -3 lines

docs/running-on-mesos.md

Lines changed: 10 additions & 0 deletions
@@ -356,6 +356,16 @@ See the [configuration page](configuration.html) for information on Spark config
     By default Mesos agents will not pull images they already have cached.
   </td>
 </tr>
+<tr>
+  <td><code>spark.mesos.executor.docker.parameters</code></td>
+  <td>(none)</td>
+  <td>
+    Set the list of custom parameters which will be passed into the <code>docker run</code> command when launching the Spark executor on Mesos using the docker containerizer. The format of this property is a comma-separated list of
+    key/value pairs. Example:
+
+    <pre>key1=val1,key2=val2,key3=val3</pre>
+  </td>
+</tr>
 <tr>
   <td><code>spark.mesos.executor.docker.volumes</code></td>
   <td>(none)</td>
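For illustration only (not part of this commit), here is a minimal sketch of how the new property could be set alongside the existing docker image setting. The image name and parameter values are hypothetical; Mesos generally forwards each key/value pair to `docker run` as a `--key=value` flag.

```scala
import org.apache.spark.SparkConf

// Hypothetical configuration sketch: the image name and parameter values
// below are examples, not values taken from this commit.
val conf = new SparkConf()
  .set("spark.mesos.executor.docker.image", "example/spark-executor:latest")
  // Forwarded to `docker run`, e.g. as --user=nobody and --memory-swap=2g.
  .set("spark.mesos.executor.docker.parameters", "user=nobody,memory-swap=2g")
```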

resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala

Lines changed: 33 additions & 3 deletions
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Volume}
+import org.apache.mesos.Protos.{ContainerInfo, Image, NetworkInfo, Parameter, Volume}
 import org.apache.mesos.Protos.ContainerInfo.{DockerInfo, MesosInfo}
 
 import org.apache.spark.{SparkConf, SparkException}
@@ -99,6 +99,28 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
       .toList
   }
 
+  /**
+   * Parse a list of docker parameters, each of which
+   * takes the form key=value
+   */
+  private def parseParamsSpec(params: String): List[Parameter] = {
+    // split with limit of 2 to avoid parsing error when '='
+    // exists in the parameter value
+    params.split(",").map(_.split("=", 2)).flatMap { spec: Array[String] =>
+      val param: Parameter.Builder = Parameter.newBuilder()
+      spec match {
+        case Array(key, value) =>
+          Some(param.setKey(key).setValue(value))
+        case spec =>
+          logWarning(s"Unable to parse arbitary parameters: $params. "
+            + "Expected form: \"key=value(, ...)\"")
+          None
+      }
+    }
+    .map { _.build() }
+    .toList
+  }
+
   def containerInfo(conf: SparkConf): ContainerInfo = {
     val containerType = if (conf.contains("spark.mesos.executor.docker.image") &&
       conf.get("spark.mesos.containerizer", "docker") == "docker") {
@@ -120,8 +142,14 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
       .map(parsePortMappingsSpec)
       .getOrElse(List.empty)
 
+    val params = conf
+      .getOption("spark.mesos.executor.docker.parameters")
+      .map(parseParamsSpec)
+      .getOrElse(List.empty)
+
     if (containerType == ContainerInfo.Type.DOCKER) {
-      containerInfo.setDocker(dockerInfo(image, forcePullImage, portMaps))
+      containerInfo
+        .setDocker(dockerInfo(image, forcePullImage, portMaps, params))
     } else {
       containerInfo.setMesos(mesosInfo(image, forcePullImage))
     }
@@ -144,11 +172,13 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging {
   private def dockerInfo(
       image: String,
       forcePullImage: Boolean,
-      portMaps: List[ContainerInfo.DockerInfo.PortMapping]): DockerInfo = {
+      portMaps: List[ContainerInfo.DockerInfo.PortMapping],
+      params: List[Parameter]): DockerInfo = {
     val dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
       .setImage(image)
       .setForcePullImage(forcePullImage)
     portMaps.foreach(dockerBuilder.addPortMappings(_))
+    params.foreach(dockerBuilder.addParameters(_))
 
     dockerBuilder.build
   }
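To show where the parsed pairs end up, here is a hedged sketch (not part of the commit) that builds a `DockerInfo` by hand using the same Mesos protobuf builders as `dockerInfo` above. The object name, image name, and parameters are hypothetical.

```scala
import org.apache.mesos.Protos.{ContainerInfo, Parameter}

object DockerInfoSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical values; Mesos' docker containerizer typically turns each
    // Parameter into a `--key=value` flag on the generated `docker run` command.
    val dockerBuilder = ContainerInfo.DockerInfo.newBuilder()
      .setImage("example/spark-executor:latest")
      .setForcePullImage(false)

    Seq("user" -> "nobody", "memory-swap" -> "2g").foreach { case (k, v) =>
      dockerBuilder.addParameters(Parameter.newBuilder().setKey(k).setValue(v))
    }

    println(dockerBuilder.build())
  }
}
```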
resource-managers/mesos/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtilSuite.scala

Lines changed: 53 additions & 0 deletions

@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler.cluster.mesos
+
+import org.scalatest._
+import org.scalatest.mock.MockitoSugar
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+
+class MesosSchedulerBackendUtilSuite extends SparkFunSuite {
+
+  test("ContainerInfo fails to parse invalid docker parameters") {
+    val conf = new SparkConf()
+    conf.set("spark.mesos.executor.docker.parameters", "a,b")
+    conf.set("spark.mesos.executor.docker.image", "test")
+
+    val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf)
+    val params = containerInfo.getDocker.getParametersList
+
+    assert(params.size() == 0)
+  }
+
+  test("ContainerInfo parses docker parameters") {
+    val conf = new SparkConf()
+    conf.set("spark.mesos.executor.docker.parameters", "a=1,b=2,c=3")
+    conf.set("spark.mesos.executor.docker.image", "test")
+
+    val containerInfo = MesosSchedulerBackendUtil.containerInfo(conf)
+    val params = containerInfo.getDocker.getParametersList
+    assert(params.size() == 3)
+    assert(params.get(0).getKey == "a")
+    assert(params.get(0).getValue == "1")
+    assert(params.get(1).getKey == "b")
+    assert(params.get(1).getValue == "2")
+    assert(params.get(2).getKey == "c")
+    assert(params.get(2).getValue == "3")
+  }
+}
