Skip to content

Commit 28b9dd3

Browse files
committed
yarn - pass acls along with executor launch
1 parent 00362da commit 28b9dd3

File tree

12 files changed

+125
-34
lines changed

12 files changed

+125
-34
lines changed

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
3535
import org.apache.hadoop.yarn.ipc.YarnRPC
3636
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
3737

38-
import org.apache.spark.{SparkConf, Logging}
38+
import org.apache.spark.{SecurityManager, SparkConf, Logging}
3939

4040

4141
class ExecutorRunnable(
@@ -46,7 +46,8 @@ class ExecutorRunnable(
4646
slaveId: String,
4747
hostname: String,
4848
executorMemory: Int,
49-
executorCores: Int)
49+
executorCores: Int,
50+
securityMgr: SecurityManager)
5051
extends Runnable with ExecutorRunnableUtil with Logging {
5152

5253
var rpc: YarnRPC = YarnRPC.create(conf)
@@ -86,6 +87,8 @@ class ExecutorRunnable(
8687
logInfo("Setting up executor with commands: " + commands)
8788
ctx.setCommands(commands)
8889

90+
ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
91+
8992
// Send the start request to the ContainerManager
9093
val startReq = Records.newRecord(classOf[StartContainerRequest])
9194
.asInstanceOf[StartContainerRequest]

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
2323
import scala.collection.JavaConversions._
2424
import scala.collection.mutable.{ArrayBuffer, HashMap}
2525

26-
import org.apache.spark.SparkConf
26+
import org.apache.spark.{SecurityManager, SparkConf}
2727
import org.apache.spark.scheduler.SplitInfo
2828

2929
import org.apache.hadoop.conf.Configuration
@@ -41,8 +41,9 @@ private[yarn] class YarnAllocationHandler(
4141
resourceManager: AMRMProtocol,
4242
appAttemptId: ApplicationAttemptId,
4343
args: ApplicationMasterArguments,
44-
preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
45-
extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
44+
preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
45+
securityMgr: SecurityManager)
46+
extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {
4647

4748
private val lastResponseId = new AtomicInteger()
4849
private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
2727
import org.apache.hadoop.yarn.ipc.YarnRPC
2828
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
2929

30-
import org.apache.spark.{Logging, SparkConf}
30+
import org.apache.spark.{Logging, SecurityManager, SparkConf}
3131
import org.apache.spark.scheduler.SplitInfo
3232
import org.apache.spark.util.Utils
3333

@@ -45,15 +45,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
4545
sparkConf: SparkConf,
4646
preferredNodeLocations: Map[String, Set[SplitInfo]],
4747
uiAddress: String,
48-
uiHistoryAddress: String) = {
48+
uiHistoryAddress: String,
49+
securityMgr: SecurityManager) = {
4950
this.rpc = YarnRPC.create(conf)
5051
this.uiHistoryAddress = uiHistoryAddress
5152

5253
resourceManager = registerWithResourceManager(conf)
5354
registerApplicationMaster(uiAddress)
5455

5556
new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
56-
preferredNodeLocations)
57+
preferredNodeLocations, securityMgr)
5758
}
5859

5960
override def getAttemptId() = {

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
116116
val securityMgr = new SecurityManager(sparkConf)
117117

118118
if (isDriver) {
119-
runDriver()
119+
runDriver(securityMgr)
120120
} else {
121121
runExecutorLauncher(securityMgr)
122122
}
@@ -157,7 +157,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
157157
sparkContextRef.compareAndSet(sc, null)
158158
}
159159

160-
private def registerAM(uiAddress: String) = {
160+
private def registerAM(uiAddress: String, securityMgr: SecurityManager) = {
161161
val sc = sparkContextRef.get()
162162

163163
val appId = client.getAttemptId().getApplicationId().toString()
@@ -170,13 +170,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
170170
if (sc != null) sc.getConf else sparkConf,
171171
if (sc != null) sc.preferredNodeLocationData else Map(),
172172
uiAddress,
173-
historyAddress)
173+
historyAddress,
174+
securityMgr)
174175

175176
allocator.allocateResources()
176177
reporterThread = launchReporterThread()
177178
}
178179

179-
private def runDriver(): Unit = {
180+
private def runDriver(securityMgr: SecurityManager): Unit = {
180181
addAmIpFilter()
181182
val userThread = startUserClass()
182183

@@ -188,7 +189,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
188189
if (sc == null) {
189190
finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
190191
} else {
191-
registerAM(sc.ui.appUIHostPort)
192+
registerAM(sc.ui.appUIHostPort, securityMgr)
192193
try {
193194
userThread.join()
194195
} finally {
@@ -203,7 +204,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
203204
conf = sparkConf, securityManager = securityMgr)._1
204205
actor = waitForSparkDriver()
205206
addAmIpFilter()
206-
registerAM(sparkConf.get("spark.driver.appUIAddress", ""))
207+
registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
207208

208209
// In client mode the actor will stop the reporter thread.
209210
reporterThread.join()

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -430,10 +430,8 @@ trait ClientBase extends Logging {
430430

431431
// send the acl settings into YARN to control who has access via YARN interfaces
432432
val securityManager = new SecurityManager(sparkConf)
433-
val acls = Map[ApplicationAccessType, String] (
434-
ApplicationAccessType.VIEW_APP -> securityManager.getViewAcls,
435-
ApplicationAccessType.MODIFY_APP -> securityManager.getModifyAcls)
436-
amContainer.setApplicationACLs(acls)
433+
amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
434+
437435
amContainer
438436
}
439437
}

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration
2828
import org.apache.hadoop.yarn.api.records._
2929
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
3030

31-
import org.apache.spark.{Logging, SparkConf, SparkEnv}
31+
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
3232
import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
3333
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
3434

@@ -55,7 +55,8 @@ private[yarn] abstract class YarnAllocator(
5555
conf: Configuration,
5656
sparkConf: SparkConf,
5757
args: ApplicationMasterArguments,
58-
preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
58+
preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
59+
securityMgr: SecurityManager)
5960
extends Logging {
6061

6162
// These three are locked on allocatedHostToContainersMap. Complementary data structures
@@ -280,7 +281,8 @@ private[yarn] abstract class YarnAllocator(
280281
executorId,
281282
executorHostname,
282283
executorMemory,
283-
executorCores)
284+
executorCores,
285+
securityMgr)
284286
new Thread(executorRunnable).start()
285287
}
286288
}
@@ -444,4 +446,4 @@ private[yarn] abstract class YarnAllocator(
444446

445447
}
446448

447-
}
449+
}

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ import scala.collection.{Map, Set}
2222
import org.apache.hadoop.yarn.conf.YarnConfiguration
2323
import org.apache.hadoop.yarn.api.records._
2424

25-
import org.apache.spark.{SparkConf, SparkContext}
25+
import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
2626
import org.apache.spark.scheduler.SplitInfo
2727

2828
/**
@@ -45,7 +45,8 @@ trait YarnRMClient {
4545
sparkConf: SparkConf,
4646
preferredNodeLocations: Map[String, Set[SplitInfo]],
4747
uiAddress: String,
48-
uiHistoryAddress: String): YarnAllocator
48+
uiHistoryAddress: String,
49+
securityMgr: SecurityManager): YarnAllocator
4950

5051
/**
5152
* Shuts down the AM. Guaranteed to only be called once.

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,11 @@ import org.apache.hadoop.security.UserGroupInformation
3232
import org.apache.hadoop.util.StringInterner
3333
import org.apache.hadoop.yarn.conf.YarnConfiguration
3434
import org.apache.hadoop.yarn.api.ApplicationConstants
35+
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
3536
import org.apache.hadoop.yarn.util.RackResolver
3637
import org.apache.hadoop.conf.Configuration
3738

38-
import org.apache.spark.{SparkConf, SparkContext}
39+
import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
3940
import org.apache.spark.deploy.SparkHadoopUtil
4041
import org.apache.spark.util.Utils
4142

@@ -211,4 +212,12 @@ object YarnSparkHadoopUtil {
211212
}
212213
}
213214

215+
private[spark] def getApplicationAclsForYarn(securityMgr: SecurityManager):
216+
Map[ApplicationAccessType, String] = {
217+
Map[ApplicationAccessType, String] (
218+
ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls,
219+
ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls
220+
)
221+
}
222+
214223
}

yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala

Lines changed: 71 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@ import com.google.common.io.{ByteStreams, Files}
2323
import org.apache.hadoop.yarn.conf.YarnConfiguration
2424
import org.scalatest.{FunSuite, Matchers}
2525

26-
import org.apache.spark.{Logging, SparkConf}
26+
import org.apache.hadoop.yarn.api.records.ApplicationAccessType
27+
28+
import org.apache.spark.{Logging, SecurityManager, SparkConf}
29+
2730

2831
class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
2932

@@ -74,4 +77,71 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
7477
yarnConf.get(key) should not be default.get(key)
7578
}
7679

80+
81+
test("test getApplicationAclsForYarn default off") {
82+
83+
// default spark acls are off but view acls still default to current user
84+
val sparkConf = new SparkConf()
85+
val securityMgr = new SecurityManager(sparkConf)
86+
val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
87+
88+
assert(acls.getOrElse(ApplicationAccessType.VIEW_APP, "invalid") ===
89+
System.getProperty("user.name", ""))
90+
assert(acls.getOrElse(ApplicationAccessType.MODIFY_APP, "invalid") ===
91+
System.getProperty("user.name", "") )
92+
}
93+
94+
test("test getApplicationAclsForYarn acls on") {
95+
96+
// spark acls on, just pick up default user
97+
val sparkConf = new SparkConf()
98+
sparkConf.set("spark.acls.enable", "true")
99+
100+
val securityMgr = new SecurityManager(sparkConf)
101+
val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
102+
103+
assert(acls.getOrElse(ApplicationAccessType.VIEW_APP, "invalid") ===
104+
System.getProperty("user.name", ""))
105+
assert(acls.getOrElse(ApplicationAccessType.MODIFY_APP, "invalid") ===
106+
System.getProperty("user.name", "") )
107+
}
108+
109+
test("test getApplicationAclsForYarn acls on and specify users") {
110+
111+
// default spark acls are on and specify acls
112+
val sparkConf = new SparkConf()
113+
sparkConf.set("spark.acls.enable", "true")
114+
sparkConf.set("spark.ui.view.acls", "user1,user2")
115+
sparkConf.set("spark.modify.acls", "user3,user4")
116+
117+
val securityMgr = new SecurityManager(sparkConf)
118+
val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
119+
120+
val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
121+
val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
122+
123+
viewAcls match {
124+
case Some(vacls) => {
125+
val aclSet = vacls.split(',').map(_.trim).toSet
126+
assert(aclSet.contains("user1"))
127+
assert(aclSet.contains("user2"))
128+
assert(aclSet.contains(System.getProperty("user.name", "")))
129+
}
130+
case None => {
131+
fail()
132+
}
133+
}
134+
modifyAcls match {
135+
case Some(macls) => {
136+
val aclSet = macls.split(',').map(_.trim).toSet
137+
assert(aclSet.contains("user3"))
138+
assert(aclSet.contains("user4"))
139+
assert(aclSet.contains(System.getProperty("user.name", "")))
140+
}
141+
case None => {
142+
fail()
143+
}
144+
}
145+
146+
}
77147
}

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
3535
import org.apache.hadoop.yarn.ipc.YarnRPC
3636
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
3737

38-
import org.apache.spark.{SparkConf, Logging}
38+
import org.apache.spark.{SecurityManager, SparkConf, Logging}
3939

4040

4141
class ExecutorRunnable(
@@ -46,7 +46,8 @@ class ExecutorRunnable(
4646
slaveId: String,
4747
hostname: String,
4848
executorMemory: Int,
49-
executorCores: Int)
49+
executorCores: Int,
50+
securityMgr: SecurityManager)
5051
extends Runnable with ExecutorRunnableUtil with Logging {
5152

5253
var rpc: YarnRPC = YarnRPC.create(conf)
@@ -85,6 +86,8 @@ class ExecutorRunnable(
8586
logInfo("Setting up executor with commands: " + commands)
8687
ctx.setCommands(commands)
8788

89+
ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
90+
8891
// Send the start request to the ContainerManager
8992
nmClient.startContainer(container, ctx)
9093
}

0 commit comments

Comments
 (0)