ArtRand commented Sep 19, 2017

What changes were proposed in this pull request?

tl;dr: Add a class, MesosHadoopDelegationTokenManager, that updates delegation tokens on a schedule on behalf of Spark Drivers. Renewed credentials are broadcast to the executors.

The problem

We recently added Kerberos support to Mesos-based Spark jobs as well as Secrets support to the Mesos Dispatcher (SPARK-16742 and SPARK-20812, respectively). However, the delegation tokens have a defined expiration. This poses a problem for long-running Spark jobs (e.g. Spark Streaming applications). YARN has a solution for this where a thread is scheduled to renew the tokens when they reach 75% of their way to expiration. It then writes the tokens to HDFS for the executors to find (using a monotonically increasing suffix).

This solution

We replace the current method in CoarseGrainedSchedulerBackend, which used to discard the token renewal time, with a protected method fetchHadoopDelegationTokens. Now the individual cluster backends are responsible for overriding this method to fetch and manage token renewal. The delegation tokens themselves are still part of the CoarseGrainedSchedulerBackend as before.
In the case of Mesos, renewed Credentials are broadcast to the executors. This keeps all transfer of Credentials within Spark (as opposed to Spark-to-HDFS), does not require writing Credentials to disk, and does not require any GC of old files.
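For orientation, a rough sketch of the two pieces described above, assembled from snippets quoted later in this review (names such as hadoopDelegationTokenManager and driverEndpoint follow those snippets; this is illustrative, not a verbatim copy of the patch):

    // Mesos backend override: hand serialized tokens to CoarseGrainedSchedulerBackend.
    override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = {
      if (UserGroupInformation.isSecurityEnabled) {
        Some(hadoopDelegationTokenManager.getTokens())
      } else {
        None
      }
    }

    // On renewal, the driver forwards the serialized credentials to every executor.
    private def broadcastDelegationTokens(tokens: Array[Byte]): Unit = {
      logInfo("Sending new tokens to all executors")
      driverEndpoint.send(UpdateDelegationTokens(tokens))
    }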

How was this patch tested?

Manually against a Kerberized HDFS cluster.

Thank you for the reviews.

ArtRand changed the title from "[Spark-21842] Support Kerberos ticket renewal and creation in Mesos" to "[Spark-21842][Mesos] Support Kerberos ticket renewal and creation in Mesos" on Sep 19, 2017
val creds = SparkHadoopUtil.get.deserialize(tokens)
// decode tokens and add them to the credentials
UserGroupInformation.getCurrentUser.addCredentials(SparkHadoopUtil.get.deserialize(tokens))
}
Contributor

use the val above: UserGroupInformation.getCurrentUser.addCredentials(creds)

}
val tempCreds = ugi.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
var nextRenewalTime = Long.MaxValue
Contributor

Same as spark.yarn.credentials.renewalTime; shouldn't we have a common value somewhere?

Contributor

When the driver is restarted, in the case of YARN the old renewalTime is restored:


Does the code here cover this?

Author

Right now when the MesosCredentialRenewer is initialized, it renews the current tokens and sets the renewal time to whatever the expiration time of those tokens is. On a driver restart, the same thing would happen. We could add spark.yarn.credentials.renewalTime as an override, but if the driver restarts, say 2 days later, spark.yarn.credentials.renewalTime is no longer relevant and it'll just immediately renew anyway.

Relevant code:
https://github.com/mesosphere/spark/blob/spark-21842-450-kerberos-ticket-renewal/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala#L210
^^ Where the initial renewal time is set
https://github.com/mesosphere/spark/blob/spark-21842-450-kerberos-ticket-renewal/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCredentialRenewer.scala#L66
^^ where we initialize the renewal time if the renewal time has passed

Contributor

Ok, so we always renew when we start by fetching the tokens, got it.

executor.stop()
}
}.start()
case UpdateDelegationTokens(tokens) =>
Contributor
@skonto Sep 19, 2017

Let's add a comment that this is received only in the Mesos case, since CoarseGrainedExecutorBackend is used by both YARN and standalone.


private[spark] object CoarseGrainedExecutorBackend extends Logging {

private def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
Contributor
@skonto Sep 19, 2017

I think we should move this to SparkHadoopUtil and re-use methods such as:

def addCurrentUserCredentials(creds: Credentials): Unit = {
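A minimal sketch of what that shared helper could look like, assuming it lives in SparkHadoopUtil and reuses the existing deserialize / addCurrentUserCredentials methods (the setConfiguration call and the log line are illustrative additions):

    def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf): Unit = {
      // Make sure UGI has picked up the (Kerberos-enabled) Hadoop configuration.
      UserGroupInformation.setConfiguration(newConfiguration(sparkConf))
      val creds = deserialize(tokens)
      logInfo("Updating delegation tokens for current user.")
      addCurrentUserCredentials(creds)
    }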

for ((x, executorData) <- executorDataMap) {
executorData.executorEndpoint.send(UpdateDelegationTokens(tokens))
}

Contributor

remove space

val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
val creds = SparkHadoopUtil.get.deserialize(bytes)
val intervals = creds.getAllTokens.asScala.flatMap { t =>
Try {
Contributor

t -> token
This method does not return an interval; it just returns the new expiration time.
Compare with:

new MesosCredentialRenewer(
conf,
hadoopDelegationTokenManager.get,
MesosCredentialRenewer.getTokenRenewalInterval(hadoopDelegationCreds.get, conf),
Contributor
@skonto Sep 19, 2017

hadoopDelegationCreds.get call. Should we check against None? Creds are loaded in CoarseGrainedSchedulerBackend, but if they are missing, should we fail here?

Contributor

skonto commented Sep 19, 2017

When an executor is started, it asks the CoarseGrainedSchedulerBackend for the Spark config, which contains the Hadoop credentials.
As we have discussed, this is not safe: using RPC, an arbitrary executor could register with the scheduler and get tokens. Does this code handle that? Do we authenticate executors?
Another topic: what about encryption at the RPC level? The latter is not supported on Mesos (spark.io.encryption.enabled).

Contributor
@susanxhuynh left a comment

Thanks, Art! Would you mind adding a note about broadcasting the tokens to executors in the PR description? Also, see comments.

} catch {
case e: Exception =>
// Log the error and try to write new tokens back in an hour
logWarning("Couldn't broadcast tokens, trying agin in 20 seconds", e)
Contributor

(sp) "again"

broadcastDelegationTokens(creds)
} catch {
case e: Exception =>
// Log the error and try to write new tokens back in an hour
Contributor

Comment says "an hour" but code has 20 seconds.

Author

good catch, I changed the code to match the YARN equivalent.


case UpdateDelegationTokens(tokens) =>
logDebug("Asking each executor to update HDFS delegation tokens")
for ((x, executorData) <- executorDataMap) {
Contributor

(_, executorData) would be more Scala-like.

Contributor

Alternatively executorDataMap.values.foreach(...)

val credentialRenewerThread = new Thread {
setName("MesosCredentialRenewer")
override def run(): Unit = {
val dummy: Option[Array[Byte]] = None
Contributor

What is this for?

Author

whoops!

def scheduleRenewal(runnable: Runnable): Unit = {
val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
// val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
val remainingTime = 5000
Contributor

Why 5000?

Author

well that's embarrassing, just a debugging tool that I forgot to remove.

Contributor
@susanxhuynh left a comment

LGTM

logInfo(s"Attempting to login with ${conf.get("spark.yarn.principal", null)}")
// Get new delegation tokens by logging in with a new UGI
// inspired by AMCredentialRenewer.scala:L174
val ugi = if (mode == "keytab") {

@kalvinnchau

I don't see where it refreshes the delegation tokens until the max lifetime, then re-logs in with the keytab to get new delegation tokens that will last until the next max lifetime.

Does this skip over the potential issues with expiring delegation tokens (after the max lifetime, 7 days by default) by just re-logging in with the keytab every time the delegation tokens need to be refreshed, and then grabbing a new set of delegation tokens?

Author

Hello @kalvinnchau. You are correct: all this does is keep track of when the tokens will expire and renew them at that time. Part of my motivation for doing this is to avoid writing any files to disk (like new TGTs, if that's what you're suggesting). We can simply mount the keytab via the Mesos secrets primitive, then renew the tokens every so often. In order to be consistent I tried to keep this solution as close to YARN as possible.

Contributor

The correct way would be for the credential management code to differentiate between token creation and token renewal; that way it would renew tokens at the renewal interval and create new ones after the max lifetime.

But it seems the original implementation took a shortcut and just creates new ones instead of renewing existing ones; changing that would require changes in the credential provider interfaces, so this is enough for now.
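To make the renew-vs-create distinction concrete, a hedged sketch (not the PR's code; principal and keytabPath are illustrative parameters): Hadoop's Token#renew extends an existing token only up to its max lifetime, after which a fresh login is needed to obtain new tokens.

    import scala.collection.JavaConverters._
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.security.{Credentials, UserGroupInformation}

    def renewOrRecreate(
        creds: Credentials,
        hadoopConf: Configuration,
        maxLifetimeReached: Boolean,
        principal: String,
        keytabPath: String): Unit = {
      if (!maxLifetimeReached) {
        // Renewal: extend each existing token; renew() returns the new expiration time.
        creds.getAllTokens.asScala.foreach(_.renew(hadoopConf))
      } else {
        // Creation: past the max lifetime a token cannot be renewed, so log in again
        // with the keytab and obtain a brand-new set of delegation tokens.
        val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath)
        // ... obtain fresh tokens under `ugi` and broadcast them to the executors ...
      }
    }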

new MesosCredentialRenewer(
conf,
hadoopDelegationTokenManager.get,
MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf),

@kalvinnchau

This sets the first renewal time to be the expiration time of the token.

It should be similar to the way the next renewal time in the MesosCredentialRenewer class is calculated, so that it renews the first token after 75% of the expiration time has passed:

val currTime = System.currentTimeMillis()
val renewTime = MesosCredentialRenewer.getTokenRenewalTime(hadoopDelegationCreds.get, conf)
val rt = 0.75 * (renewTime - currTime)

val credentialRenewer =
  new MesosCredentialRenewer(
    conf,
    hadoopDelegationTokenManager.get,
    (currTime + rt).toLong,
    driverEndpoint)
credentialRenewer.scheduleTokenRenewal()

Author

ArtRand commented Sep 26, 2017

Hey @kalvinnchau good catch on the first renewal time. I believe I addressed it. Have a look. Thanks again.

@kalvinnchau

@ArtRand thanks! I've been testing a local version of doing that, I'll pull that change in and test it as well.

@kalvinnchau

@ArtRand
I'm running into an issue where it seems like the delegation tokens are being sent to the executors (as in I see the logs stating that the tokens are being broadcast), but the old delegation tokens are still in use.

When the job first started up it created token 777222; then ~18 hours in, the refresh occurred:
[INFO ] 2017-09-26 17:59:20.639 [Credential Refresh Thread-0] DFSClient - Created HDFS_DELEGATION_TOKEN token 772826 for <principal> on ha-hdfs:<namenode>

[INFO ] 2017-09-26 17:59:20.747 [Credential Refresh Thread-0] MesosCredentialRenewer - Sending new tokens to all executors

Then at ~24 hour mark I get the exception:
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): token (HDFS_DELEGATION_TOKEN token 772222 for <principal>) is expired

Are you aware of anything that needs to be done to tell the thread to use the newer tokens? Or do the older tokens need to be removed from the UGI?

Author

ArtRand commented Sep 28, 2017

Hey @kalvinnchau thanks for having the patience to try this. This is a curious error though.

If you look at the addAll method called by UserGroupInformation.addCredentials() it should overwrite the current credentials.

I tried to reproduce your error, but being less patient, I changed my HDFS setup to request that the tokens be updated every minute instead of every day by adding the following to hdfs-site.xml:

    <property>
        <name>dfs.namenode.delegation.token.max-lifetime</name>
        <value>60000</value>
    </property>

I added some logging to the executor backend to check if they were indeed being updated.

case UpdateDelegationTokens(tokens) =>
      logInfo("Got request to update tokens")
      val oldCreds = UserGroupInformation.getCurrentUser.getCredentials
      for (t <- oldCreds.getAllTokens.asScala) {
        logInfo(s"Old Creds ${DelegationTokenIdentifier.stringifyToken(t)}")
      }
      val creds = SparkHadoopUtil.get.deserialize(tokens)
      for (t <- creds.getAllTokens.asScala) {
        val s = DelegationTokenIdentifier.stringifyToken(t)
        logInfo(s"Got new tokens $s")
      }
      SparkHadoopUtil.get.addDelegationTokens(tokens, env.conf)
      val newCreds = UserGroupInformation.getCurrentUser.getCredentials
      for (t <- newCreds.getAllTokens.asScala) {
        logInfo(s"New creds ${DelegationTokenIdentifier.stringifyToken(t)}")
      }

and indeed, when I check the logs, the token number has been updated.

17/09/28 03:32:58 INFO CoarseGrainedExecutorBackend: Got request to update tokens
17/09/28 03:32:58 INFO CoarseGrainedExecutorBackend: Old Creds HDFS_DELEGATION_TOKEN token 29 for hdfs on ha-hdfs:hdfs
17/09/28 03:32:59 INFO CoarseGrainedExecutorBackend: Got new tokens HDFS_DELEGATION_TOKEN token 31 for hdfs on ha-hdfs:hdfs
17/09/28 03:32:59 INFO CoarseGrainedExecutorBackend: New creds HDFS_DELEGATION_TOKEN token 31 for hdfs on ha-hdfs:hdfs

then some time later (in fact there was another update in the middle):

17/09/28 03:35:14 INFO CoarseGrainedExecutorBackend: Got request to update tokens
17/09/28 03:35:14 INFO CoarseGrainedExecutorBackend: Old Creds HDFS_DELEGATION_TOKEN token 34 for hdfs on ha-hdfs:hdfs
17/09/28 03:35:14 INFO CoarseGrainedExecutorBackend: Got new tokens HDFS_DELEGATION_TOKEN token 35 for hdfs on ha-hdfs:hdfs
17/09/28 03:35:14 INFO CoarseGrainedExecutorBackend: New creds HDFS_DELEGATION_TOKEN token 35 for hdfs on ha-hdfs:hdfs

I will run a 24h experiment to verify, but hopefully there is a way to validate that the update is working without waiting that long just to debug!

@vanzin Could you eyeball this? Am I missing something obvious?

Contributor

vanzin commented Sep 28, 2017

What you wrote sounds correct. However, I've seen errors like the ones above in the past, but haven't been able to fully debug them due to lack of logs. Part of it is because the user running the app didn't provide me the full logs; but also Spark currently doesn't have the logs you added in your debug code, not even at debug level, so we have only partial information about exactly what's going on from existing logs. (And these errors tend to happen only after x days, so it's kind of a pain to reproduce.)

That all being said, though, as I mentioned, what you wrote sounds correct AFAIK.

Contributor

vanzin commented Sep 28, 2017

Another thing to keep in mind is that different Hadoop versions have different bugs in this area, so if you use a version of the Hadoop client library that suffers from some issue, or are talking to an HDFS version that has some bug, that can cause problems.

Without researching more I only remember an issue that affects HA mode (HDFS-9276), but there might be others.

Author

ArtRand commented Sep 29, 2017

Hello @vanzin

Thanks for taking a look at this. Good to know that there can be downstream errors depending on the situation.

Would very much appreciate a proper review on this work when you have some time, very keen on getting this into the next release.

@kalvinnchau

@ArtRand curious, what version of Hadoop are you building Spark against, and what version is the cluster that you're running?

Author

ArtRand commented Oct 5, 2017

@kalvinnchau I compiled Spark with Hadoop 2.6, I'm running on a DC/OS cluster with Mesos 1.4.0

@SparkQA

SparkQA commented Nov 6, 2017

Test build #83493 has finished for PR 19272 at commit 4558cea.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Author

ArtRand commented Nov 6, 2017

Hello @vanzin thanks for continuing to help with this. Please take another look at this refactor.

In this change, there is one place to interact with hadoopDelegationTokens from CoarseGrainedSchedulerBackend: initializeHadoopDelegationTokens. This method contains the logic for initializing the tokens and setting a token renewer. It's also now resource-manager specific. This seems cleaner than having a HadoopDelegationTokenManager in CoarseGrainedSchedulerBackend, because any "token management" will always want to wrap HadoopDelegationTokenManager so you can keep all the necessary information in one place. Of course, happy to discuss further.

Contributor
@vanzin left a comment

I still don't like the API. There are just too many touch points between the different classes in your patch, and non-trivial initialization order requirements. That makes the code brittle and hard to modify later.

s"${delegationTokenProviders.keys.mkString(", ")}.")

/** Construct a [[HadoopDelegationTokenManager]] for the default Hadoop filesystem */
/**
Contributor

This is not really changing anything, so I'd just revert changes to this file. Or, if you really want to, just keep the new @params you're adding below.

// Hadoop delegation tokens to be sent to the executors.
val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds()
// Hadoop delegation tokens to be sent to the executors, can be updated as necessary.
protected var hadoopDelegationTokens: Option[Array[Byte]] = initializeHadoopDelegationTokens()
Contributor

Why is this protected? There's no reason I can see for subclasses to need access to this field.


// check that the credentials are defined, even though it's likely that auth would have failed
// already if you've made it this far, then start the token renewer
if (hadoopDelegationTokens.isDefined) {
Contributor

You shouldn't do this here, otherwise you need to keep that field protected in the parent class and that adds unnecessary coupling. Instead, do this in initializeHadoopDelegationTokens.

Author
@ArtRand Nov 8, 2017

I agree that I shouldn't need to use the conditional hadoopDelegationTokens.isDefined, however there will need to be some check (UserGroupInformation.isSecurityEnabled or similar) to pass the driverEndpoint to the renewer/manager here. When the initial tokens are generated driverEndpoint is still None because start() hasn't been called yet. So I could schedule the renewal, but I'll still have to at least update the driverEndpoint here.

I could initialize the driverEndpoint in initializeHadoopDelegationTokens for Mesos and change around the logic in start() (for the MesosCoarseGrainedSchedulerBackend) but then you're just switching one conditional for another...

Author

I may have spoken too soon; there might be a way...

Contributor

You could call initializeHadoopDelegationTokens in start after everything that's needed is initialized. It would also better follow the scheduler's lifecycle.

Author
@ArtRand Nov 8, 2017

Check out the patch now. hadoopDelegationTokens now calls initializeHadoopDelegationTokens (renamed fetchHadoopDelegationTokens) by name:

  private val hadoopDelegationTokens: () => Option[Array[Byte]] = fetchHadoopDelegationTokens

where

  override def fetchHadoopDelegationTokens(): Option[Array[Byte]] = {
    if (UserGroupInformation.isSecurityEnabled) {
      Some(hadoopDelegationTokenManager.getTokens())
    } else {
      None
    }
  }

This has the effect of only generating the first set of delegation tokens once the first RetrieveSparkAppConfig message is received. At this point, everything has been initialized because the renewer (renamed MesosHadoopDelegationTokenManager) is evaluated lazily with the correct driverEndpoint.

  private lazy val hadoopDelegationTokenManager: MesosHadoopDelegationTokenManager =
    new MesosHadoopDelegationTokenManager(conf, sc.hadoopConfiguration, driverEndpoint)

It's maybe a bit confusing to just avoid an extra conditional. WDYT?

* serialized) are broadcast to all running executors. On the executor side, when new Tokens are
* received they overwrite the current credentials.
*/
class MesosCredentialRenewer(
Contributor

private[spark]


private val (secretFile, mode) = getSecretFile(conf)

var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
Contributor

private?

(secretFile, mode)
}

def scheduleTokenRenewal(driverEndpoint: RpcEndpointRef): Unit = {
Contributor

Why isn't this done in the constructor? There's a single call to this method, and the renewal interval could very easily be turned into a constructor arg.

Some(new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
private lazy val hadoopCredentialRenewer: MesosCredentialRenewer =
new MesosCredentialRenewer(
conf, new HadoopDelegationTokenManager(sc.conf, sc.hadoopConfiguration))
Contributor

Why pass in a HadoopDelegationTokenManager if it's not used by this class? The renewer can create one itself.


override def initializeHadoopDelegationTokens(): Option[Array[Byte]] = {
if (UserGroupInformation.isSecurityEnabled) {
Some(hadoopCredentialRenewer.tokens)
Contributor
@vanzin Nov 6, 2017

So, seems to me that your "renewer" is doing more than just renewing tokens; it's also being used to generate the initial set. So aside from my comments about initializing the renewer here, you should also probably make this API a little cleaner. Right now there's too much coupling.

The renewer should do renewals only, otherwise it should be called something different.

@SparkQA

SparkQA commented Nov 8, 2017

Test build #83607 has finished for PR 19272 at commit 5f254e5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor
@vanzin left a comment

Ok, the API looks a lot better. Still a few things to take care of.

}.start()

case UpdateDelegationTokens(tokenBytes) =>
SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf)
Contributor

Can you add a logInfo saying the tokens are being updated? This has always been helpful when debugging issues with this feature on YARN.

}

case UpdateDelegationTokens(newDelegationTokens) =>
// Update the driver's delegation tokens in case new executors are added later.
Contributor

Stale comment?

sparkProperties,
SparkEnv.get.securityManager.getIOEncryptionKey(),
hadoopDelegationCreds)
hadoopDelegationTokens.apply())
Contributor

Can't you just call fetchHadoopDelegationTokens() directly?

* received they overwrite the current credentials.
*/
private[spark] class MesosHadoopDelegationTokenManager(
conf: SparkConf, hadoopConfig: Configuration,
Contributor

One arg per line.

}

if (principal == null) {
logInfo(s"Using mode: $mode to retrieve Hadoop delegation tokens")
Contributor

You should probably assert that mode is "tgt" in this case.

} catch {
case e: Exception =>
throw new IllegalStateException("Failed to initialize Hadoop delegation tokens\n" +
s"\tPricipal: $principal\n\tmode: $mode\n\tsecret file $secretFile\n\tException: $e")
Contributor

Use e as the cause of the exception you're throwing.

private def getSecretFile(conf: SparkConf): (String, String) = {
val keytab = conf.get(config.KEYTAB).orNull
val tgt = conf.getenv("KRB5CCNAME")
require(keytab != null || tgt != null, "A keytab or TGT required.")
Contributor

Is that really the case? KRB5CCNAME is not a required env variable. It has a default value, and the UGI class will use the credentials from the default location if they're available (and reloading the cache periodically).

So I think you don't really need this, but just to track whether there's a principal and keytab. And you don't need to call getUGIFromTicketCache later on since I'm pretty sure UGI takes care of that for you.


val tempCreds = ugi.getCredentials
val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
var nextRenewalTime = Long.MaxValue
Contributor

val nextRenewalTime = ugi.doAs(new PrivilegedExceptionAction[Long] { ... }
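Spelled out, that suggestion might look roughly like this (a sketch; tokenManager stands in for the class's HadoopDelegationTokenManager, whose obtainDelegationTokens is assumed to populate the credentials and return the earliest next-renewal time):

    val nextRenewalTime = ugi.doAs(new PrivilegedExceptionAction[Long] {
      override def run(): Long = {
        val tempCreds = ugi.getCredentials
        // Obtain fresh tokens into tempCreds and return the earliest renewal time.
        tokenManager.obtainDelegationTokens(hadoopConf, tempCreds)
      }
    })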

}

private def broadcastDelegationTokens(tokens: Array[Byte]) = {
logDebug("Sending new tokens to all executors")
Contributor

I'd make this logInfo (similar message in YARN code has helped me a lot).


private def broadcastDelegationTokens(tokens: Array[Byte]) = {
logDebug("Sending new tokens to all executors")
if (driverEndpoint == null) {
Contributor

Make this a require in the constructor?

@SparkQA

SparkQA commented Nov 10, 2017

Test build #83660 has finished for PR 19272 at commit 8df7e37.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Nov 10, 2017

Test build #83661 has finished for PR 19272 at commit 45b46ed.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Author

ArtRand commented Nov 13, 2017

Hello @vanzin thanks for the continued help with this, anything else needed?

@maverick2202

We want to use this feature, and it would be great if this can be merged. Any idea which Spark release will have it?

Author

ArtRand commented Nov 13, 2017

Hello @maverick2202, hopefully 2.3 (and maybe backported?) but that's up to the Committers.

UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, secretFile.get)
} else {
// if the ticket cache is not explicitly defined, use the default
if (secretFile.isEmpty) {
Contributor

The point I was trying to make is that you do not need any special handling for TGT. The UGI class already does everything you need, you just need to get the current user. It will keep the TGT updated with any changes that happen on disk. You don't need to handle KRB5CCNAME anywhere, because UGI should be doing that for you. If it's not, you need to explain why you need this special handling, because the expected behavior is for this to work without you needing to do anything.

So you can simplify this class by only handling the principal / keytab case, and just using UserGroupInformation.getCurrentUser in the other case. You don't need to keep track of the "mode" or anything else, just whether you're using a principal / keytab pair.
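In code, the simplification being suggested is roughly the following (a sketch only; principal and keytabPath would come from the Spark config, and note the follow-up below where getCurrentUser turned out to return stale credentials in practice):

    val ugi = if (principal != null) {
      // Keytab case: an explicit login is needed so fresh delegation tokens can be created.
      UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabPath)
    } else {
      // Ticket-cache case: UGI already tracks (and reloads) the default ticket cache,
      // so no special KRB5CCNAME handling should be required.
      UserGroupInformation.getCurrentUser
    }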

Author

How do the executors get the TGT though? KRB5CCNAME would need to be set in the executor containers as well, right? If it is, I suppose you don't need to broadcast the delegation tokens at all because runAsSparkUser takes care of it for you?

I guess now that #19437 has been merged we can disseminate the TGT through Mesos.

Contributor

How do the executors get the TGT though?

They don't. That's why you're creating delegation tokens and sending them to the executor.

Author

Ah gotcha. I misunderstood your previous comment to mean that you wouldn’t need to renew the tokens when using the ticket cache. I’ll simplify the logic. Thanks.

Author
@ArtRand Nov 14, 2017

So getCurrentUser actually doesn't work. I believe for the reason mentioned here. The same expired credentials are returned (causing the renewer to loop).

I understand that optionally using a TGT instead of a keytab is different from the YARN reference implementation, and it's unusual to use it in this case since it expires anyway. Do you think it would be better to avoid the ticket renewal logic altogether when using a TGT, or to keep the older UserGroupInformation.getUGIFromTicketCache-based method?

Contributor

Do you think it would be better to avoid the ticket renewal logic all together when using a TGT

If getCurrentUser does not work, then yes, that's probably the best way forward. Requiring the user to set KRB5CCNAME is not really a good way to go about this. Also because in that case the user still has to make sure the TGT is updated through other means.

Author

Agreed, thanks.

Author

ArtRand commented Nov 14, 2017

@vanzin PTAL.
I removed the awkward mode parameter from the token manager. Now we only start the renewer thread when using a keytab/principal. The condition is logged appropriately.

@SparkQA

SparkQA commented Nov 15, 2017

Test build #83863 has finished for PR 19272 at commit 18d77ff.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

* Add or overwrite current user's credentials with serialized delegation tokens,
* also confirms correct hadoop configuration is set.
*/
def addDelegationTokens(tokens: Array[Byte], sparkConf: SparkConf) {
Contributor

Always forget this class is public. Add private[spark].

* @param fraction fraction of the time until expiration
* @return Date when the fraction of the time until expiration has passed
*/
def getDateOfNextUpdate(expirationDate: Long, fraction: Double): Long = {
Contributor

Add private[spark].
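
For reference, a body consistent with the doc comment above would be something like this (a sketch, not necessarily the exact implementation):

    private[spark] def getDateOfNextUpdate(expirationDate: Long, fraction: Double): Long = {
      val currentTime = System.currentTimeMillis()
      // Schedule the next update once `fraction` of the remaining lifetime has elapsed.
      currentTime + (fraction * (expirationDate - currentTime)).toLong
    }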

}

def getTokens(): Array[Byte] = {
tokens
Contributor

tokens is never updated, so fetchHadoopDelegationTokens() will always return the initial set even after it's expired.

Author
@ArtRand Nov 15, 2017

Thanks for catching this, tokens are now updated for late-joining executors. https://github.com/apache/spark/pull/19272/files#diff-765ac3c4db227cd2c5d796f00794016fR145

private def getNewDelegationTokens(): Array[Byte] = {
logInfo(s"Attempting to login to KDC with principal ${principal}")
// Get new delegation tokens by logging in with a new UGI
// inspired by AMCredentialRenewer.scala:L174.
Contributor

nit: mentioning line numbers will make this stale very quickly.

@SparkQA

SparkQA commented Nov 15, 2017

Test build #83912 has finished for PR 19272 at commit 049e4b5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

vanzin commented Nov 15, 2017

Merging to master.

asfgit closed this in 1e82335 Nov 15, 2017
Author

ArtRand commented Nov 16, 2017

@vanzin Thanks for the reviews and mentorship!

susanxhuynh pushed a commit to d2iq-archive/spark that referenced this pull request Nov 16, 2017
ArtRand pushed a commit to d2iq-archive/spark that referenced this pull request Nov 18, 2017 (#17)
susanxhuynh pushed a commit to d2iq-archive/spark that referenced this pull request Jan 14, 2018

(Each commit carries the PR description above. Closes apache#19272 from ArtRand/spark-21842-450-kerberos-ticket-renewal.)