Commit 51b53a7

[SPARK-3260] yarn - pass acls along with executor launch
Pass along the acl settings when we launch a container so that they can be
applied to viewing the logs on a running NodeManager.

Author: Thomas Graves <[email protected]>

Closes apache#2185 from tgravescs/SPARK-3260 and squashes the following commits:

6f94b5a [Thomas Graves] make unit test more robust
28b9dd3 [Thomas Graves] yarn - pass acls along with executor launch
1 parent 6a37ed8 commit 51b53a7
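In short: the SecurityManager that the ApplicationMaster already creates is now
threaded through YarnRMClient and YarnAllocator into ExecutorRunnable, so
executor containers are launched with the same view/modify ACLs that ClientBase
sets on the AM container. A minimal sketch of the pattern, not the literal
commit code, assuming the implicit Scala-to-Java collection conversions the
touched files already import:

import scala.collection.JavaConversions._

import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerLaunchContext}
import org.apache.hadoop.yarn.util.Records

import org.apache.spark.{SecurityManager, SparkConf}

// Derive the view/modify ACL strings from the Spark configuration.
val sparkConf = new SparkConf().set("spark.acls.enable", "true")
val securityMgr = new SecurityManager(sparkConf)
val acls = Map[ApplicationAccessType, String](
  ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls,
  ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls)

// Attach the ACLs to the container launch context so the NodeManager can
// enforce them, e.g. when someone views the executor logs in its web UI.
val ctx = Records.newRecord(classOf[ContainerLaunchContext])
ctx.setApplicationACLs(acls) // implicitly converted to java.util.Map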

12 files changed: +129 −34 lines changed

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 5 additions & 2 deletions

@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}
 
-import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.{SecurityManager, SparkConf, Logging}
 
 
 class ExecutorRunnable(
@@ -46,7 +46,8 @@ class ExecutorRunnable(
     slaveId: String,
     hostname: String,
     executorMemory: Int,
-    executorCores: Int)
+    executorCores: Int,
+    securityMgr: SecurityManager)
   extends Runnable with ExecutorRunnableUtil with Logging {
 
   var rpc: YarnRPC = YarnRPC.create(conf)
@@ -86,6 +87,8 @@ class ExecutorRunnable(
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
 
+    ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
+
     // Send the start request to the ContainerManager
     val startReq = Records.newRecord(classOf[StartContainerRequest])
       .asInstanceOf[StartContainerRequest]

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala

Lines changed: 4 additions & 3 deletions

@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 
-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.scheduler.SplitInfo
 
 import org.apache.hadoop.conf.Configuration
@@ -41,8 +41,9 @@ private[yarn] class YarnAllocationHandler(
     resourceManager: AMRMProtocol,
     appAttemptId: ApplicationAttemptId,
     args: ApplicationMasterArguments,
-    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
-  extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
+    preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
+    securityMgr: SecurityManager)
+  extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {
 
   private val lastResponseId = new AtomicInteger()
   private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()

yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClientImpl.scala

Lines changed: 4 additions & 3 deletions

@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{ConverterUtils, Records}
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.scheduler.SplitInfo
 import org.apache.spark.util.Utils
 
@@ -45,15 +45,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
       sparkConf: SparkConf,
       preferredNodeLocations: Map[String, Set[SplitInfo]],
       uiAddress: String,
-      uiHistoryAddress: String) = {
+      uiHistoryAddress: String,
+      securityMgr: SecurityManager) = {
     this.rpc = YarnRPC.create(conf)
     this.uiHistoryAddress = uiHistoryAddress
 
     resourceManager = registerWithResourceManager(conf)
     registerApplicationMaster(uiAddress)
 
     new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
-      preferredNodeLocations)
+      preferredNodeLocations, securityMgr)
   }
 
   override def getAttemptId() = {

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala

Lines changed: 7 additions & 6 deletions

@@ -116,7 +116,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     val securityMgr = new SecurityManager(sparkConf)
 
     if (isDriver) {
-      runDriver()
+      runDriver(securityMgr)
     } else {
       runExecutorLauncher(securityMgr)
     }
@@ -157,7 +157,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     sparkContextRef.compareAndSet(sc, null)
   }
 
-  private def registerAM(uiAddress: String) = {
+  private def registerAM(uiAddress: String, securityMgr: SecurityManager) = {
     val sc = sparkContextRef.get()
 
     val appId = client.getAttemptId().getApplicationId().toString()
@@ -170,13 +170,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       if (sc != null) sc.getConf else sparkConf,
       if (sc != null) sc.preferredNodeLocationData else Map(),
       uiAddress,
-      historyAddress)
+      historyAddress,
+      securityMgr)
 
     allocator.allocateResources()
     reporterThread = launchReporterThread()
   }
 
-  private def runDriver(): Unit = {
+  private def runDriver(securityMgr: SecurityManager): Unit = {
     addAmIpFilter()
     val userThread = startUserClass()
 
@@ -188,7 +189,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
     if (sc == null) {
       finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
     } else {
-      registerAM(sc.ui.appUIHostPort)
+      registerAM(sc.ui.appUIHostPort, securityMgr)
       try {
         userThread.join()
       } finally {
@@ -203,7 +204,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
       conf = sparkConf, securityManager = securityMgr)._1
     actor = waitForSparkDriver()
     addAmIpFilter()
-    registerAM(sparkConf.get("spark.driver.appUIAddress", ""))
+    registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)
 
     // In client mode the actor will stop the reporter thread.
     reporterThread.join()

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ClientBase.scala

Lines changed: 2 additions & 4 deletions

@@ -430,10 +430,8 @@ trait ClientBase extends Logging {
 
     // send the acl settings into YARN to control who has access via YARN interfaces
    val securityManager = new SecurityManager(sparkConf)
-    val acls = Map[ApplicationAccessType, String] (
-      ApplicationAccessType.VIEW_APP -> securityManager.getViewAcls,
-      ApplicationAccessType.MODIFY_APP -> securityManager.getModifyAcls)
-    amContainer.setApplicationACLs(acls)
+    amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))
+
     amContainer
   }
 }
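This hunk is the deduplication half of the change: the ACL map that ClientBase
previously built inline for the AM container now comes from the shared
YarnSparkHadoopUtil.getApplicationAclsForYarn helper (added below), so the AM
and executor containers derive their ACLs in one place.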

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala

Lines changed: 6 additions & 4 deletions

@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse
 
-import org.apache.spark.{Logging, SparkConf, SparkEnv}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
 import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
 import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
 
@@ -55,7 +55,8 @@ private[yarn] abstract class YarnAllocator(
     conf: Configuration,
     sparkConf: SparkConf,
     args: ApplicationMasterArguments,
-    preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
+    preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
+    securityMgr: SecurityManager)
   extends Logging {
 
   // These three are locked on allocatedHostToContainersMap. Complementary data structures
@@ -280,7 +281,8 @@
         executorId,
         executorHostname,
         executorMemory,
-        executorCores)
+        executorCores,
+        securityMgr)
       new Thread(executorRunnable).start()
     }
   }
@@ -444,4 +446,4 @@
 
 }
 
-}
+}

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnRMClient.scala

Lines changed: 3 additions & 2 deletions

@@ -22,7 +22,7 @@ import scala.collection.{Map, Set}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.records._
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.scheduler.SplitInfo
 
 /**
@@ -45,7 +45,8 @@ trait YarnRMClient {
       sparkConf: SparkConf,
       preferredNodeLocations: Map[String, Set[SplitInfo]],
       uiAddress: String,
-      uiHistoryAddress: String): YarnAllocator
+      uiHistoryAddress: String,
+      securityMgr: SecurityManager): YarnAllocator
 
   /**
    * Shuts down the AM. Guaranteed to only be called once.

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 10 additions & 1 deletion

@@ -32,10 +32,11 @@ import org.apache.hadoop.security.UserGroupInformation
 import org.apache.hadoop.util.StringInterner
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType
 import org.apache.hadoop.yarn.util.RackResolver
 import org.apache.hadoop.conf.Configuration
 
-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.util.Utils
 
@@ -211,4 +212,12 @@ object YarnSparkHadoopUtil {
     }
   }
 
+  private[spark] def getApplicationAclsForYarn(securityMgr: SecurityManager):
+      Map[ApplicationAccessType, String] = {
+    Map[ApplicationAccessType, String] (
+      ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls,
+      ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls
+    )
+  }
+
 }
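For illustration, a rough usage sketch of the new helper. Note that it is
declared private[spark], so this only works from Spark's own packages and
tests; the user names are placeholders taken from the test suite below:

import org.apache.hadoop.yarn.api.records.ApplicationAccessType

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil

val conf = new SparkConf()
  .set("spark.acls.enable", "true")
  .set("spark.ui.view.acls", "user1,user2")
  .set("spark.modify.acls", "user3,user4")

// Returns a Map[ApplicationAccessType, String]; per the tests below, each
// value contains the configured users plus the current user.
val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(new SecurityManager(conf))
println(acls(ApplicationAccessType.VIEW_APP))   // e.g. "<current user>,user1,user2"
println(acls(ApplicationAccessType.MODIFY_APP)) // e.g. "<current user>,user3,user4"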

yarn/common/src/test/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtilSuite.scala

Lines changed: 75 additions & 1 deletion

@@ -23,7 +23,10 @@ import com.google.common.io.{ByteStreams, Files}
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.scalatest.{FunSuite, Matchers}
 
-import org.apache.spark.{Logging, SparkConf}
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
+
 
 class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
 
@@ -74,4 +77,75 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
     yarnConf.get(key) should not be default.get(key)
   }
 
+
+  test("test getApplicationAclsForYarn acls on") {
+
+    // spark acls on, just pick up default user
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.acls.enable", "true")
+
+    val securityMgr = new SecurityManager(sparkConf)
+    val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
+
+    val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
+    val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
+
+    viewAcls match {
+      case Some(vacls) => {
+        val aclSet = vacls.split(',').map(_.trim).toSet
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      }
+      case None => {
+        fail()
+      }
+    }
+    modifyAcls match {
+      case Some(macls) => {
+        val aclSet = macls.split(',').map(_.trim).toSet
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      }
+      case None => {
+        fail()
+      }
+    }
+  }
+
+  test("test getApplicationAclsForYarn acls on and specify users") {
+
+    // default spark acls are on and specify acls
+    val sparkConf = new SparkConf()
+    sparkConf.set("spark.acls.enable", "true")
+    sparkConf.set("spark.ui.view.acls", "user1,user2")
+    sparkConf.set("spark.modify.acls", "user3,user4")
+
+    val securityMgr = new SecurityManager(sparkConf)
+    val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
+
+    val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
+    val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
+
+    viewAcls match {
+      case Some(vacls) => {
+        val aclSet = vacls.split(',').map(_.trim).toSet
+        assert(aclSet.contains("user1"))
+        assert(aclSet.contains("user2"))
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      }
+      case None => {
+        fail()
+      }
+    }
+    modifyAcls match {
+      case Some(macls) => {
+        val aclSet = macls.split(',').map(_.trim).toSet
+        assert(aclSet.contains("user3"))
+        assert(aclSet.contains("user4"))
+        assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+      }
+      case None => {
+        fail()
+      }
+    }
+
+  }
 }
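For reference, the behavior under test is driven entirely by standard Spark
settings; in spark-defaults.conf form, the configuration the second test
exercises would look like this (user names are the test's placeholders):

spark.acls.enable     true
spark.ui.view.acls    user1,user2
spark.modify.acls     user3,user4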

yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala

Lines changed: 5 additions & 2 deletions

@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.ipc.YarnRPC
 import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}
 
-import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.{SecurityManager, SparkConf, Logging}
 
 
 class ExecutorRunnable(
@@ -46,7 +46,8 @@ class ExecutorRunnable(
     slaveId: String,
     hostname: String,
     executorMemory: Int,
-    executorCores: Int)
+    executorCores: Int,
+    securityMgr: SecurityManager)
   extends Runnable with ExecutorRunnableUtil with Logging {
 
   var rpc: YarnRPC = YarnRPC.create(conf)
@@ -85,6 +86,8 @@
     logInfo("Setting up executor with commands: " + commands)
     ctx.setCommands(commands)
 
+    ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
+
     // Send the start request to the ContainerManager
     nmClient.startContainer(container, ctx)
   }
