47 commits
77914dd
WIP: Add kerberos principal and keytab to YARN client.
harishreedharan Jan 30, 2015
ccba5bc
WIP: More changes wrt kerberos
harishreedharan Feb 2, 2015
2b0d745
[SPARK-5342][YARN] Allow long running Spark apps to run on secure YAR…
harishreedharan Feb 19, 2015
f8fe694
Handle None if keytab-login is not scheduled.
harishreedharan Feb 19, 2015
bcfc374
Fix Hadoop-1 build by adding no-op methods in SparkHadoopUtil, with i…
harishreedharan Feb 20, 2015
d282d7a
Fix ClientSuite to set YARN mode, so that the correct class is used i…
harishreedharan Feb 20, 2015
41efde0
Merge branch 'master' into kerberos-longrunning
harishreedharan Feb 22, 2015
fb27f46
Make sure principal and keytab are set before CoarseGrainedSchedulerB…
harishreedharan Feb 24, 2015
8c6928a
Fix issue caused by direct creation of Actor object.
harishreedharan Feb 24, 2015
d79b2b9
Make sure correct credentials are passed to FileSystem#addDelegationT…
harishreedharan Feb 24, 2015
0985b4e
Write tokens to HDFS and read them back when required, rather than se…
harishreedharan Feb 27, 2015
b4cb917
Send keytab to AM via DistributedCache rather than directly via HDFS
harishreedharan Feb 28, 2015
5c11c3e
Move tests to YarnSparkHadoopUtil to fix compile issues.
harishreedharan Feb 28, 2015
f6954da
Got rid of Akka communication to renew, instead the executors check a…
harishreedharan Mar 5, 2015
f0f54cb
Be more defensive when updating the credentials file.
harishreedharan Mar 5, 2015
af6d5f0
Cleaning up files where changes weren't required.
harishreedharan Mar 5, 2015
2debcea
Change the file structure for credentials files. I will push a follow…
harishreedharan Mar 6, 2015
f4fd711
Fix SparkConf usage.
harishreedharan Mar 6, 2015
9ef5f1b
Added explanation of how the credentials refresh works, some other mi…
harishreedharan Mar 7, 2015
55522e3
Fix failure caused by Preconditions ambiguity.
harishreedharan Mar 7, 2015
0de27ee
Merge branch 'master' into kerberos-longrunning
harishreedharan Mar 23, 2015
42813b4
Remove utils.sh, which was re-added due to merge with master.
harishreedharan Mar 23, 2015
fa233bd
Adding logging, fixing minor formatting and ordering issues.
harishreedharan Mar 24, 2015
62c45ce
Relogin from keytab periodically.
harishreedharan Mar 24, 2015
61b2b27
Account for AM restarts by making sure lastSuffix is read from the fi…
harishreedharan Mar 25, 2015
2f9975c
Ensure new tokens are written out immediately on AM restart. Also, pi…
harishreedharan Mar 26, 2015
f74303c
Move the new logic into specialized classes. Add cleanup for old cred…
harishreedharan Mar 27, 2015
bcd11f9
Refactor AM and Executor token update code into separate classes, als…
harishreedharan Apr 8, 2015
7f1bc58
Minor fixes, cleanup.
harishreedharan Apr 9, 2015
0e9507e
Merge branch 'master' into kerberos-longrunning
harishreedharan Apr 9, 2015
e800c8b
Restore original RegisteredExecutor message, and send new tokens via …
harishreedharan Apr 10, 2015
8a4f268
Added docs in the security guide. Changed some code to ensure that th…
harishreedharan Apr 16, 2015
7b19643
Merge branch 'master' into kerberos-longrunning
harishreedharan Apr 16, 2015
bc083e3
Overload RegisteredExecutor to send tokens. Minor doc updates.
harishreedharan Apr 21, 2015
ebb36f5
Merge branch 'master' into kerberos-longrunning
harishreedharan Apr 21, 2015
42eead4
Remove RPC part. Refactor and move methods around, use renewal interv…
harishreedharan Apr 28, 2015
f041dd3
Merge branch 'master' into kerberos-longrunning
harishreedharan Apr 28, 2015
072659e
Fix build failure caused by thread factory getting moved to ThreadUtils.
harishreedharan Apr 28, 2015
6963bbc
Schedule renewal in AM before starting user class. Else, a restarted …
harishreedharan Apr 28, 2015
09fe224
Use token.renew to get token's renewal interval rather than using hdf…
harishreedharan Apr 29, 2015
611923a
Make sure the namenodes are listed correctly for creating tokens.
harishreedharan Apr 29, 2015
36eb8a9
Change the renewal interval config param. Fix a bunch of comments.
harishreedharan Apr 30, 2015
e851f70
Move the ExecutorDelegationTokenRenewer to yarn module. Use reflectio…
harishreedharan May 1, 2015
7bff6e9
Make sure all required classes are present in the jar. Fix import order.
harishreedharan May 1, 2015
b5e7a72
Remove reflection, use a method in SparkHadoopUtil to update the toke…
harishreedharan May 1, 2015
4d04301
Minor formatting fixes.
harishreedharan May 1, 2015
3c86bba
Import fixes. Import postfixOps explicitly.
harishreedharan May 1, 2015
81 changes: 79 additions & 2 deletions core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
@@ -17,12 +17,16 @@

package org.apache.spark.deploy

import java.io.{ByteArrayInputStream, DataInputStream}
import java.lang.reflect.Method
import java.security.PrivilegedExceptionAction
import java.util.{Arrays, Comparator}

import com.google.common.primitives.Longs
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path, PathFilter}
import org.apache.hadoop.fs.FileSystem.Statistics
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapreduce.JobContext
import org.apache.hadoop.security.{Credentials, UserGroupInformation}
@@ -32,14 +36,17 @@ import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.Utils

import scala.collection.JavaConversions._
import scala.concurrent.duration._
import scala.language.postfixOps

/**
* :: DeveloperApi ::
* Contains util methods to interact with Hadoop from Spark.
*/
@DeveloperApi
class SparkHadoopUtil extends Logging {
val conf: Configuration = newConfiguration(new SparkConf())
private val sparkConf = new SparkConf()
val conf: Configuration = newConfiguration(sparkConf)
UserGroupInformation.setConfiguration(conf)

/**
@@ -201,6 +208,61 @@ class SparkHadoopUtil extends Logging {
if (baseStatus.isDir) recurse(basePath) else Array(baseStatus)
}

/**
* Lists all files in the given directory whose names start with the given prefix and do not
* end with the given suffix. The returned {{FileStatus}} instances are sorted by the
* modification times of the respective files.
*/
def listFilesSorted(
remoteFs: FileSystem,
dir: Path,
prefix: String,
exclusionSuffix: String): Array[FileStatus] = {
val fileStatuses = remoteFs.listStatus(dir,
new PathFilter {
override def accept(path: Path): Boolean = {
val name = path.getName
name.startsWith(prefix) && !name.endsWith(exclusionSuffix)
}
})
Arrays.sort(fileStatuses, new Comparator[FileStatus] {
override def compare(o1: FileStatus, o2: FileStatus): Int = {
Longs.compare(o1.getModificationTime, o2.getModificationTime)
}
})
fileStatuses
}
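
listFilesSorted is intended for the code that polls HDFS for fresh credential files. A minimal sketch, not part of this patch, of how a caller might locate the newest credentials file; the directory argument and the "credentials" file prefix are illustrative assumptions (the real values are derived from spark.yarn.credentials.file):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.deploy.SparkHadoopUtil

object LatestCredentialsFileExample {
  def latest(credentialsDir: String): Option[Path] = {
    val util = SparkHadoopUtil.get
    val fs = FileSystem.get(util.conf)
    // Skip half-written files that still carry the temporary extension.
    val sorted = util.listFilesSorted(
      fs, new Path(credentialsDir), "credentials",
      SparkHadoopUtil.SPARK_YARN_CREDS_TEMP_EXTENSION)
    // listFilesSorted orders by modification time, so the last entry is the freshest file.
    sorted.lastOption.map(_.getPath)
  }
}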

/**
* How much time (in millis) remains from now until (fraction * renewal interval) has elapsed
* for the most recently issued HDFS delegation token?
* Returns zero (or a negative value) if that fraction of the validity period has already expired.
*/
def getTimeFromNowToRenewal(
sparkConf: SparkConf,
fraction: Double,
credentials: Credentials): Long = {
val now = System.currentTimeMillis()

val renewalInterval =
sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis)

credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND)
.map { t =>
val identifier = new DelegationTokenIdentifier()
identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier)))
(identifier.getIssueDate + fraction * renewalInterval).toLong - now
}.foldLeft(0L)(math.max)
}
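
A hedged worked example of the scheduling arithmetic (the 0.75 fraction and the token ages are assumed, not taken from this patch):

import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

// With the default spark.yarn.token.renewal.interval of 24 hours and fraction = 0.75,
// a token issued 2 hours ago yields issueDate + 18h - now = 16 hours until the next renewal;
// one issued 20 hours ago yields a negative value, which foldLeft(0L)(math.max) clamps to 0,
// i.e. renew immediately.
val delayMs = SparkHadoopUtil.get.getTimeFromNowToRenewal(
  new SparkConf(), 0.75, UserGroupInformation.getCurrentUser.getCredentials)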


private[spark] def getSuffixForCredentialsPath(credentialsPath: Path): Int = {
val fileName = credentialsPath.getName
fileName.substring(
fileName.lastIndexOf(SparkHadoopUtil.SPARK_YARN_CREDS_COUNTER_DELIM) + 1).toInt
}
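
The counter suffix parsed here is easiest to see with an assumed file name (the delimiter and temporary-extension constants are defined further down in the companion object); per the commit messages, a restarted AM reads the highest existing suffix so it can continue numbering from there:

object CredsSuffixExample extends App {
  // Illustrative name only; the real one is built from spark.yarn.credentials.file
  // plus the "-" counter delimiter, with a ".tmp" extension while being written.
  val fileName = "credentials-12"
  val suffix = fileName.substring(fileName.lastIndexOf("-") + 1).toInt
  println(suffix) // prints 12
}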


private val HADOOP_CONF_PATTERN = "(\\$\\{hadoopconf-[^\\}\\$\\s]+\\})".r.unanchored

/**
@@ -231,6 +293,17 @@
}
}
}

/**
* Start a thread to periodically update the current user's credentials with new delegation
* tokens so that writes to HDFS do not fail.
*/
private[spark] def startExecutorDelegationTokenRenewer(conf: SparkConf) {}

/**
* Stop the thread that does the delegation token updates.
*/
private[spark] def stopExecutorDelegationTokenRenewer() {}
}

object SparkHadoopUtil {
@@ -251,6 +324,10 @@ object SparkHadoopUtil {
}
}

val SPARK_YARN_CREDS_TEMP_EXTENSION = ".tmp"

val SPARK_YARN_CREDS_COUNTER_DELIM = "-"

def get: SparkHadoopUtil = {
hadoop
}
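startExecutorDelegationTokenRenewer and stopExecutorDelegationTokenRenewer are deliberately left as no-ops in core (in line with the commit that fixed the Hadoop-1 build) and are meant to be overridden by the YARN-specific SparkHadoopUtil subclass. A rough, self-contained sketch of the shape such an override could take; the class name, polling interval, and renewal body are assumptions, not the actual yarn-module implementation:

package org.apache.spark.deploy.yarn

import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil

class SketchYarnSparkHadoopUtil extends SparkHadoopUtil {

  private var renewer: Option[ScheduledExecutorService] = None

  override private[spark] def startExecutorDelegationTokenRenewer(conf: SparkConf): Unit = {
    val executor = Executors.newSingleThreadScheduledExecutor()
    executor.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = {
        // Re-read the newest credentials file from HDFS (e.g. via listFilesSorted) and add
        // the decoded tokens to UserGroupInformation.getCurrentUser so HDFS writes keep working.
      }
    }, 0, 1, TimeUnit.HOURS)
    renewer = Some(executor)
  }

  override private[spark] def stopExecutorDelegationTokenRenewer(): Unit = {
    renewer.foreach(_.shutdownNow())
  }
}
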
4 changes: 4 additions & 0 deletions core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -405,6 +405,10 @@ object SparkSubmit {
OptionAssigner(args.archives, YARN, CLUSTER, clOption = "--archives"),
OptionAssigner(args.jars, YARN, CLUSTER, clOption = "--addJars"),

// Yarn client or cluster
OptionAssigner(args.principal, YARN, ALL_DEPLOY_MODES, clOption = "--principal"),
OptionAssigner(args.keytab, YARN, ALL_DEPLOY_MODES, clOption = "--keytab"),

// Other options
OptionAssigner(args.executorCores, STANDALONE, ALL_DEPLOY_MODES,
sysProp = "spark.executor.cores"),
core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -63,6 +63,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
var action: SparkSubmitAction = null
val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
var proxyUser: String = null
var principal: String = null
var keytab: String = null

// Standalone cluster mode only
var supervise: Boolean = false
@@ -392,6 +394,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
case PROXY_USER =>
proxyUser = value

case PRINCIPAL =>
principal = value

case KEYTAB =>
keytab = value

case HELP =>
printUsageAndExit(0)

@@ -503,6 +511,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
| --num-executors NUM Number of executors to launch (Default: 2).
| --archives ARCHIVES Comma separated list of archives to be extracted into the
| working directory of each executor.
| --principal PRINCIPAL Principal to be used to log in to KDC, while running on
| secure HDFS.
| --keytab KEYTAB The full path to the file that contains the keytab for the
| principal specified above. This keytab will be copied to
| the node running the Application Master via the Secure
| Distributed Cache, for renewing the login tickets and the
| delegation tokens periodically.
""".stripMargin
)
SparkSubmit.exitFn()
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -20,6 +20,8 @@ package org.apache.spark.executor
import java.net.URL
import java.nio.ByteBuffer

import org.apache.hadoop.conf.Configuration
Review comment (Contributor) on the import above: super nit: out of order


import scala.collection.mutable
import scala.util.{Failure, Success}

@@ -168,6 +170,12 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
driverConf.set(key, value)
}
}
if (driverConf.contains("spark.yarn.credentials.file")) {
logInfo("Will periodically update credentials from: " +
driverConf.get("spark.yarn.credentials.file"))
SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf)
}

val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, isLocal = false)

@@ -183,6 +191,7 @@
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
}
}

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -68,6 +68,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp

class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
extends ThreadSafeRpcEndpoint with Logging {

override protected def log = CoarseGrainedSchedulerBackend.this.log

private val addressToExecutorId = new HashMap[RpcAddress, String]
@@ -112,6 +113,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
// Ignoring the task kill since the executor is not registered.
logWarning(s"Attempted to kill task $taskId for unknown executor $executorId.")
}

}

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -122,7 +124,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
} else {
logInfo("Registered executor: " + executorRef + " with ID " + executorId)
context.reply(RegisteredExecutor)

addressToExecutorId(executorRef.address) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
@@ -243,6 +244,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
properties += ((key, value))
}
}

// TODO (prashant) send conf instead of properties
driverEndpoint = rpcEnv.setupEndpoint(
CoarseGrainedSchedulerBackend.ENDPOINT_NAME, new DriverEndpoint(rpcEnv, properties))
2 changes: 2 additions & 0 deletions docs/security.md
@@ -32,6 +32,8 @@ SSL must be configured on each node and configured for each component involved i
### YARN mode
The key-store can be prepared on the client side and then distributed and used by the executors as the part of the application. It is possible because the user is able to deploy files before the application is started in YARN by using `spark.yarn.dist.files` or `spark.yarn.dist.archives` configuration settings. The responsibility for encryption of transferring these files is on YARN side and has nothing to do with Spark.

For long-running apps like Spark Streaming apps to be able to write to HDFS, it is possible to pass a principal and keytab to `spark-submit` via the `--principal` and `--keytab` parameters respectively. The keytab passed in will be copied over to the machine running the Application Master via the Hadoop Distributed Cache (securely, provided YARN is configured with SSL and HDFS encryption is enabled). The Kerberos login will be periodically renewed using this principal and keytab, and the delegation tokens required for HDFS will be regenerated periodically so the application can continue writing to HDFS.
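
As an illustration (the principal and keytab values below are assumed, not part of this patch), a long-running streaming application would simply add `--principal user@EXAMPLE.COM --keytab /etc/security/keytabs/user.keytab` to its usual `spark-submit` arguments.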

### Standalone mode
The user needs to provide key-stores and configuration options for master and workers. They have to be set by attaching appropriate Java system properties in `SPARK_MASTER_OPTS` and in `SPARK_WORKER_OPTS` environment variables, or just in `SPARK_DAEMON_JAVA_OPTS`. In this mode, the user may allow the executors to use the SSL settings inherited from the worker which spawned that executor. It can be accomplished by setting `spark.ssl.useNodeLocalConf` to `true`. If that parameter is set, the settings provided by user on the client side, are not used by the executors.

launcher/src/main/java/org/apache/spark/launcher/SparkSubmitOptionParser.java
@@ -69,8 +69,10 @@ class SparkSubmitOptionParser {
// YARN-only options.
protected final String ARCHIVES = "--archives";
protected final String EXECUTOR_CORES = "--executor-cores";
protected final String QUEUE = "--queue";
protected final String KEYTAB = "--keytab";
protected final String NUM_EXECUTORS = "--num-executors";
protected final String PRINCIPAL = "--principal";
protected final String QUEUE = "--queue";

/**
* This is the canonical list of spark-submit options. Each entry in the array contains the
@@ -96,11 +98,13 @@
{ EXECUTOR_MEMORY },
{ FILES },
{ JARS },
{ KEYTAB },
{ KILL_SUBMISSION },
{ MASTER },
{ NAME },
{ NUM_EXECUTORS },
{ PACKAGES },
{ PRINCIPAL },
{ PROPERTIES_FILE },
{ PROXY_USER },
{ PY_FILES },