@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records, ProtoUtils}

-import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.{SecurityManager, SparkConf, Logging}


class ExecutorRunnable(
@@ -46,7 +46,8 @@ class ExecutorRunnable(
slaveId: String,
hostname: String,
executorMemory: Int,
-executorCores: Int)
+executorCores: Int,
+securityMgr: SecurityManager)
extends Runnable with ExecutorRunnableUtil with Logging {

var rpc: YarnRPC = YarnRPC.create(conf)
@@ -86,6 +87,8 @@ class ExecutorRunnable(
logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)

+ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
+
// Send the start request to the ContainerManager
val startReq = Records.newRecord(classOf[StartContainerRequest])
.asInstanceOf[StartContainerRequest]
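This first hunk pair gives the ExecutorRunnable a SecurityManager and applies the application ACLs to the executor's ContainerLaunchContext before the start request goes to the ContainerManager, so YARN enforces the same view/modify restrictions on executor containers (for example, their log pages in the NodeManager UI) as on the AM container. A minimal standalone sketch of the underlying YARN call, assuming ACLs are enabled in the job's SparkConf; the object sits in org.apache.spark.deploy.yarn because SecurityManager is private[spark], and the user name is illustrative:

```scala
package org.apache.spark.deploy.yarn

import scala.collection.JavaConversions.mapAsJavaMap

import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerLaunchContext}
import org.apache.hadoop.yarn.util.Records

import org.apache.spark.{SecurityManager, SparkConf}

object ContainerAclSketch extends App {
  // Illustrative configuration: ACL enforcement on, one extra viewer.
  val sparkConf = new SparkConf()
    .set("spark.acls.enable", "true")
    .set("spark.ui.view.acls", "someUser") // hypothetical user name

  val securityMgr = new SecurityManager(sparkConf)

  val ctx = Records.newRecord(classOf[ContainerLaunchContext])
  // Values are comma-separated user lists: VIEW_APP gates UI and log access,
  // MODIFY_APP gates operations such as killing the application.
  // mapAsJavaMap bridges the Scala map to the java.util.Map YARN expects.
  ctx.setApplicationACLs(mapAsJavaMap(Map(
    ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls,
    ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls)))
}
```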
@@ -23,7 +23,7 @@ import java.util.concurrent.atomic.AtomicInteger
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}

-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.scheduler.SplitInfo

import org.apache.hadoop.conf.Configuration
@@ -41,8 +41,9 @@ private[yarn] class YarnAllocationHandler(
resourceManager: AMRMProtocol,
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
-preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
-extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
+preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
+securityMgr: SecurityManager)
+extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {

private val lastResponseId = new AtomicInteger()
private val releaseList: CopyOnWriteArrayList[ContainerId] = new CopyOnWriteArrayList()
@@ -27,7 +27,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{ConverterUtils, Records}

-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.scheduler.SplitInfo
import org.apache.spark.util.Utils

@@ -45,15 +45,16 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
sparkConf: SparkConf,
preferredNodeLocations: Map[String, Set[SplitInfo]],
uiAddress: String,
-uiHistoryAddress: String) = {
+uiHistoryAddress: String,
+securityMgr: SecurityManager) = {
this.rpc = YarnRPC.create(conf)
this.uiHistoryAddress = uiHistoryAddress

resourceManager = registerWithResourceManager(conf)
registerApplicationMaster(uiAddress)

new YarnAllocationHandler(conf, sparkConf, resourceManager, getAttemptId(), args,
-preferredNodeLocations)
+preferredNodeLocations, securityMgr)
}

override def getAttemptId() = {
@@ -116,7 +116,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
val securityMgr = new SecurityManager(sparkConf)

if (isDriver) {
-runDriver()
+runDriver(securityMgr)
} else {
runExecutorLauncher(securityMgr)
}
@@ -157,7 +157,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
sparkContextRef.compareAndSet(sc, null)
}

-private def registerAM(uiAddress: String) = {
+private def registerAM(uiAddress: String, securityMgr: SecurityManager) = {
val sc = sparkContextRef.get()

val appId = client.getAttemptId().getApplicationId().toString()
@@ -170,13 +170,14 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
if (sc != null) sc.getConf else sparkConf,
if (sc != null) sc.preferredNodeLocationData else Map(),
uiAddress,
-historyAddress)
+historyAddress,
+securityMgr)

allocator.allocateResources()
reporterThread = launchReporterThread()
}

-private def runDriver(): Unit = {
+private def runDriver(securityMgr: SecurityManager): Unit = {
addAmIpFilter()
val userThread = startUserClass()

@@ -188,7 +189,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
if (sc == null) {
finish(FinalApplicationStatus.FAILED, "Timed out waiting for SparkContext.")
} else {
-registerAM(sc.ui.appUIHostPort)
+registerAM(sc.ui.appUIHostPort, securityMgr)
try {
userThread.join()
} finally {
@@ -203,7 +204,7 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,
conf = sparkConf, securityManager = securityMgr)._1
actor = waitForSparkDriver()
addAmIpFilter()
-registerAM(sparkConf.get("spark.driver.appUIAddress", ""))
+registerAM(sparkConf.get("spark.driver.appUIAddress", ""), securityMgr)

// In client mode the actor will stop the reporter thread.
reporterThread.join()
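Taken together, the ApplicationMaster hunks thread the SecurityManager created once in run() through both startup paths: runDriver and runExecutorLauncher hand it to registerAM, which passes it to client.register, and the returned allocator forwards it to every ExecutorRunnable it launches. Executor containers therefore end up with ACLs derived from the same configuration that the client uses for the AM container below.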
@@ -430,10 +430,8 @@ trait ClientBase extends Logging {

// send the acl settings into YARN to control who has access via YARN interfaces
val securityManager = new SecurityManager(sparkConf)
-val acls = Map[ApplicationAccessType, String] (
-ApplicationAccessType.VIEW_APP -> securityManager.getViewAcls,
-ApplicationAccessType.MODIFY_APP -> securityManager.getModifyAcls)
-amContainer.setApplicationACLs(acls)
+amContainer.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityManager))

amContainer
}
}
@@ -28,7 +28,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.yarn.api.records._
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse

-import org.apache.spark.{Logging, SparkConf, SparkEnv}
+import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.scheduler.{SplitInfo, TaskSchedulerImpl}
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend

@@ -55,7 +55,8 @@ private[yarn] abstract class YarnAllocator(
conf: Configuration,
sparkConf: SparkConf,
args: ApplicationMasterArguments,
-preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
+preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
+securityMgr: SecurityManager)
extends Logging {

// These three are locked on allocatedHostToContainersMap. Complementary data structures
@@ -280,7 +281,8 @@ private[yarn] abstract class YarnAllocator(
executorId,
executorHostname,
executorMemory,
-executorCores)
+executorCores,
+securityMgr)
new Thread(executorRunnable).start()
}
}
@@ -444,4 +446,4 @@ private[yarn] abstract class YarnAllocator(

}

-}
+}
@@ -22,7 +22,7 @@ import scala.collection.{Map, Set}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.records._

-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler.SplitInfo

/**
@@ -45,7 +45,8 @@ trait YarnRMClient {
sparkConf: SparkConf,
preferredNodeLocations: Map[String, Set[SplitInfo]],
uiAddress: String,
-uiHistoryAddress: String): YarnAllocator
+uiHistoryAddress: String,
+securityMgr: SecurityManager): YarnAllocator

/**
* Shuts down the AM. Guaranteed to only be called once.
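This trait-level signature change is what obliges both register implementations (the one above and its counterpart at the bottom of this diff) to accept and forward the SecurityManager, keeping the two YARN backends in sync.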
@@ -32,10 +32,11 @@ import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.util.StringInterner
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.ApplicationConstants
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType
import org.apache.hadoop.yarn.util.RackResolver
import org.apache.hadoop.conf.Configuration

-import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.{SecurityManager, SparkConf, SparkContext}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.util.Utils

@@ -211,4 +212,12 @@ object YarnSparkHadoopUtil {
}
}

+private[spark] def getApplicationAclsForYarn(securityMgr: SecurityManager):
+Map[ApplicationAccessType, String] = {
+Map[ApplicationAccessType, String] (
+ApplicationAccessType.VIEW_APP -> securityMgr.getViewAcls,
+ApplicationAccessType.MODIFY_APP -> securityMgr.getModifyAcls
+)
+}
+
}
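The new helper is now the single place where Spark's ACL strings are translated into YARN's ApplicationAccessType map; ClientBase and both ExecutorRunnables call it instead of building the map by hand. A usage sketch, assuming ACLs are enabled; it sits in the same package because the helper and SecurityManager are package-private, and the user names are illustrative:

```scala
package org.apache.spark.deploy.yarn

import org.apache.hadoop.yarn.api.records.ApplicationAccessType

import org.apache.spark.{SecurityManager, SparkConf}

object AclsUsageSketch extends App {
  val conf = new SparkConf()
    .set("spark.acls.enable", "true")
    .set("spark.ui.view.acls", "user1,user2") // illustrative users
    .set("spark.modify.acls", "user3")

  val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(new SecurityManager(conf))

  // Each value is a comma-separated user list; as the tests below verify,
  // the submitting user is always included alongside any configured users.
  println(acls(ApplicationAccessType.VIEW_APP))   // e.g. "user1,user2,<submitting user>"
  println(acls(ApplicationAccessType.MODIFY_APP)) // e.g. "user3,<submitting user>"
}
```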
@@ -23,7 +23,10 @@ import com.google.common.io.{ByteStreams, Files}
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.scalatest.{FunSuite, Matchers}

-import org.apache.spark.{Logging, SparkConf}
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType
+
+import org.apache.spark.{Logging, SecurityManager, SparkConf}


class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {

@@ -74,4 +77,75 @@ class YarnSparkHadoopUtilSuite extends FunSuite with Matchers with Logging {
yarnConf.get(key) should not be default.get(key)
}

+
+test("test getApplicationAclsForYarn acls on") {
+
+// spark acls on, just pick up default user
+val sparkConf = new SparkConf()
+sparkConf.set("spark.acls.enable", "true")
+
+val securityMgr = new SecurityManager(sparkConf)
+val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
+
+val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
+val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
+
+viewAcls match {
+case Some(vacls) => {
+val aclSet = vacls.split(',').map(_.trim).toSet
+assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+}
+case None => {
+fail()
+}
+}
+modifyAcls match {
+case Some(macls) => {
+val aclSet = macls.split(',').map(_.trim).toSet
+assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+}
+case None => {
+fail()
+}
+}
+}
+
+test("test getApplicationAclsForYarn acls on and specify users") {
+
+// default spark acls are on and specify acls
+val sparkConf = new SparkConf()
+sparkConf.set("spark.acls.enable", "true")
+sparkConf.set("spark.ui.view.acls", "user1,user2")
+sparkConf.set("spark.modify.acls", "user3,user4")
+
+val securityMgr = new SecurityManager(sparkConf)
+val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr)
+
+val viewAcls = acls.get(ApplicationAccessType.VIEW_APP)
+val modifyAcls = acls.get(ApplicationAccessType.MODIFY_APP)
+
+viewAcls match {
+case Some(vacls) => {
+val aclSet = vacls.split(',').map(_.trim).toSet
+assert(aclSet.contains("user1"))
+assert(aclSet.contains("user2"))
+assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+}
+case None => {
+fail()
+}
+}
+modifyAcls match {
+case Some(macls) => {
+val aclSet = macls.split(',').map(_.trim).toSet
+assert(aclSet.contains("user3"))
+assert(aclSet.contains("user4"))
+assert(aclSet.contains(System.getProperty("user.name", "invalid")))
+}
+case None => {
+fail()
+}
+}
+
+}
}
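Both tests follow the same pattern: split the comma-separated ACL string, build a set, and assert membership, with an explicit fail() when the key is absent. Since the suite already mixes in Matchers, a hypothetical compact variant of the first test could read:

```scala
test("view acl contains the submitting user (compact variant, hypothetical)") {
  val sparkConf = new SparkConf().set("spark.acls.enable", "true")
  val acls = YarnSparkHadoopUtil.getApplicationAclsForYarn(new SecurityManager(sparkConf))
  // acls(...) throws NoSuchElementException if the key is missing, failing the test.
  val viewSet = acls(ApplicationAccessType.VIEW_APP).split(',').map(_.trim).toSet
  viewSet should contain (System.getProperty("user.name", "invalid"))
}
```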
@@ -35,7 +35,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.ipc.YarnRPC
import org.apache.hadoop.yarn.util.{Apps, ConverterUtils, Records}

-import org.apache.spark.{SparkConf, Logging}
+import org.apache.spark.{SecurityManager, SparkConf, Logging}


class ExecutorRunnable(
@@ -46,7 +46,8 @@ class ExecutorRunnable(
slaveId: String,
hostname: String,
executorMemory: Int,
-executorCores: Int)
+executorCores: Int,
+securityMgr: SecurityManager)
extends Runnable with ExecutorRunnableUtil with Logging {

var rpc: YarnRPC = YarnRPC.create(conf)
@@ -85,6 +86,8 @@ class ExecutorRunnable(
logInfo("Setting up executor with commands: " + commands)
ctx.setCommands(commands)

+ctx.setApplicationACLs(YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr))
+
// Send the start request to the ContainerManager
nmClient.startContainer(container, ctx)
}
@@ -20,7 +20,7 @@ package org.apache.spark.deploy.yarn
import scala.collection.JavaConversions._
import scala.collection.mutable.{ArrayBuffer, HashMap}

-import org.apache.spark.SparkConf
+import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.scheduler.SplitInfo

import org.apache.hadoop.conf.Configuration
@@ -39,8 +39,9 @@ private[yarn] class YarnAllocationHandler(
amClient: AMRMClient[ContainerRequest],
appAttemptId: ApplicationAttemptId,
args: ApplicationMasterArguments,
-preferredNodes: collection.Map[String, collection.Set[SplitInfo]])
-extends YarnAllocator(conf, sparkConf, args, preferredNodes) {
+preferredNodes: collection.Map[String, collection.Set[SplitInfo]],
+securityMgr: SecurityManager)
+extends YarnAllocator(conf, sparkConf, args, preferredNodes, securityMgr) {

override protected def releaseContainer(container: Container) = {
amClient.releaseAssignedContainer(container.getId())
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.hadoop.yarn.webapp.util.WebAppUtils

-import org.apache.spark.{Logging, SparkConf}
+import org.apache.spark.{Logging, SecurityManager, SparkConf}
import org.apache.spark.scheduler.SplitInfo
import org.apache.spark.util.Utils

@@ -46,7 +46,8 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
sparkConf: SparkConf,
preferredNodeLocations: Map[String, Set[SplitInfo]],
uiAddress: String,
-uiHistoryAddress: String) = {
+uiHistoryAddress: String,
+securityMgr: SecurityManager) = {
amClient = AMRMClient.createAMRMClient()
amClient.init(conf)
amClient.start()
@@ -55,7 +56,7 @@ private class YarnRMClientImpl(args: ApplicationMasterArguments) extends YarnRMC
logInfo("Registering the ApplicationMaster")
amClient.registerApplicationMaster(Utils.localHostName(), 0, uiAddress)
new YarnAllocationHandler(conf, sparkConf, amClient, getAttemptId(), args,
-preferredNodeLocations)
+preferredNodeLocations, securityMgr)
}

override def shutdown(status: FinalApplicationStatus, diagnostics: String = "") =
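These last three files are evidently the stable-API counterparts of the ones at the top of the diff (note nmClient.startContainer here in place of the other ExecutorRunnable's StartContainerRequest); they receive the identical SecurityManager plumbing, ending at the same ctx.setApplicationACLs call.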