
Conversation

@vanzin
Contributor

@vanzin vanzin commented Jan 11, 2019

This change adds a new mode for credential renewal that does not require
a keytab; it uses the local ticket cache instead, so it works as long as the
user keeps the cache valid.

This can be useful for, e.g., people running long spark-shell sessions where
their Kerberos login is kept up to date.

The main change to enable this behavior is in HadoopDelegationTokenManager,
with a small change in the HDFS token provider. The other changes are to avoid
creating duplicate tokens when submitting the application to YARN; they allow
the tokens from the scheduler to be sent to the YARN AM, reducing the round trips
to HDFS.

For that, the scheduler initialization code was changed a little bit so that
the tokens are available when the YARN client is initialized. That basically
takes care of a long-standing TODO that was in the code to clean up configuration
propagation to the driver's RPC endpoint (in CoarseGrainedSchedulerBackend).

Tested with an app designed to stress this functionality, with both keytab and
cache-based logins. Also ran some basic Kerberos tests on k8s.

Marcelo Vanzin added 2 commits January 11, 2019 13:29
… cache.

@SparkQA

SparkQA commented Jan 11, 2019

Test build #101105 has finished for PR 23525 at commit 7fa03ec.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class DriverEndpoint extends ThreadSafeRpcEndpoint with Logging

@SparkQA

SparkQA commented Jan 11, 2019

Test build #101106 has finished for PR 23525 at commit f7e6734.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 12, 2019

Test build #101107 has finished for PR 23525 at commit df85d68.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

ugi
} else {
logInfo(s"Attempting to load user's ticket cache.")
val ccache = sparkConf.getenv("KRB5CCNAME")
Contributor

I was wondering whether adding an optional configuration parameter with the path of the KRB5CC file could also be useful? Possibly even more useful when using this in cluster mode?

Contributor Author

I'm not sure how you'd use this in cluster mode; but the biggest issue is that the Hadoop libraries only use the env variable (which is also recognized by all kerberos tools). So we can't really add a Spark-specific option.
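A minimal sketch of the lookup convention being referred to here: libkrb5 and the Hadoop libraries honor only the KRB5CCNAME environment variable, falling back to a per-uid default cache path. The helper name and the fallback below are illustrative, not Spark or Hadoop code:

```scala
// Hypothetical helper mirroring the convention: honor KRB5CCNAME if set
// (the only knob the Hadoop libraries and kerberos CLI tools recognize),
// else fall back to libkrb5's typical default path, /tmp/krb5cc_<uid>.
def ticketCachePath(env: Map[String, String], uid: Long): String =
  env.getOrElse("KRB5CCNAME", s"/tmp/krb5cc_$uid")
```

This is also why a Spark-specific option would be of limited use: any path Spark picked up would still have to be exported through KRB5CCNAME for the Hadoop libraries to see it.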

} else {
logInfo(s"Attempting to load user's ticket cache.")
val ccache = sparkConf.getenv("KRB5CCNAME")
val user = Option(sparkConf.getenv("KRB5PRINCIPAL")).getOrElse(
Contributor

@LucaCanali LucaCanali Jan 15, 2019

Would it make sense to also check/use the value of spark.yarn.principal (or an ad-hoc config parameter if "reusing" this one is not OK) if provided by the user?

Contributor Author

spark.kerberos.principal is tightly coupled to keytabs. But the problem here is twofold: first, you're already logged in, so you can't define the principal at this point. Second, that env variable is the one used by Hadoop libraries.

val tgtRenewalTask = new Runnable() {
new Runnable() {
override def run(): Unit = {
ugi.checkTGTAndReloginFromKeytab()
Contributor

When testing this I get a warning message "WARN UserGroupInformation: Not attempting to re-login since the last re-login was attempted less than 600 seconds before.." every minute (I am using the default value of spark.yarn.kerberos.relogin.period).

Contributor Author

From looking at the code it seems like a problem with your config. As far as I understand, you'd get this if your TGT was about to expire but hadoop.kerberos.min.seconds.before.relogin had not passed yet. That defaults to 60s (not 600), so it seems you've changed that value in your config.

Contributor

Thanks @vanzin for the detailed explanations.
After some additional investigation I found that if I compile Spark with Hadoop 3.1 the behavior is OK. I can still reproduce the issue I mentioned with the standard 2.7 version in my environment. It appears that hadoop.kerberos.min.seconds.before.relogin is not available in Hadoop 2.7 and was introduced in 2.8?

Contributor Author

I was testing with Hadoop 2 and did not see the error you saw. But let me take another look at the code.

Contributor Author

Yeah, it was added in HADOOP-7930 (2.8 has it). Before that the timeout was hardcoded to 600s.

But it still should not print that if the TGT is still valid. So it seems to me your TGT lifetime configuration makes the code hit this path. The UGI class will try to renew if 80% of the lifetime has elapsed. So to avoid it your TGT lifetime should be at the very least 12.5 min, perhaps even higher.
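The 12.5-minute figure follows directly from that heuristic: UGI tries to refresh at roughly 80% of the TGT lifetime, and pre-HADOOP-7930 releases refuse relogin attempts closer than a hardcoded 600s apart. A simplified sketch (names are illustrative, not the actual Hadoop internals):

```scala
// UGI-style refresh point: ~80% of the way through the TGT lifetime.
def nextRefreshTime(tgtStartMs: Long, tgtEndMs: Long): Long =
  tgtStartMs + ((tgtEndMs - tgtStartMs) * 0.80).toLong

// With the pre-HADOOP-7930 hardcoded 600s minimum between relogin attempts,
// the refresh point must itself be at least 600s out:
//   lifetime * 0.8 >= 600s  =>  lifetime >= 750s = 12.5 minutes.
// (Integer math to avoid floating-point rounding.)
val minQuietTgtLifetimeMs: Long = 600L * 1000 * 10 / 8 // 750000 ms
```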

Contributor

I should clarify that the warning messages I reported are for the case where I use the TGT and --conf spark.kerberos.renewal.credentials=ccache rather than a keytab; apologies for any confusion this may have generated.
Looking now at the code for UserGroupInformation.reloginFromTicketCache, I can see that it calls hasSufficientTimeElapsed, which is responsible for generating the warning message in question when renewal is attempted more often than a certain frequency. As you pointed out, with hadoop.kerberos.min.seconds.before.relogin set to its default value of 60 we are OK, as it matches the default for spark.kerberos.relogin.period (but this requires HADOOP-7930, i.e. Hadoop version >= 2.8).
On a related topic, I can see that checkTGTAndReloginFromKeytab has a "silent" way of checking whether the rate of renewal requests is higher than the threshold, so no warnings are generated in that case.
Does this make sense, and is it reproducible in your Hadoop 2.7 environment too?
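The warning path being described can be sketched as a simple rate-limit check. This is a simplified stand-in for Hadoop's UserGroupInformation.hasSufficientTimeElapsed, not the real implementation:

```scala
// Returns true when enough time has passed since the last relogin attempt;
// when it returns false, Hadoop logs the "Not attempting to re-login since
// the last re-login was attempted less than N seconds before" warning.
def sufficientTimeElapsed(nowMs: Long, lastReloginMs: Long,
                          minSecondsBeforeRelogin: Long): Boolean =
  nowMs - lastReloginMs >= minSecondsBeforeRelogin * 1000

// With spark.kerberos.relogin.period = 1m but a 600s minimum (Hadoop < 2.8),
// every minutely attempt after the first is rejected, hence the warning spam.
```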

Contributor Author

for the case where I use the TGT and --conf spark.kerberos.renewal.credentials=ccache

Ah, that makes more sense. I'm testing on CDH, which is based on 2.6 but has a lot of patches including HADOOP-7930, so I don't see that message.

Anyway, since all Hadoop libs I looked at seem to take care of keeping the TGT up-to-date, I'll remove this code, and we can re-introduce it later if needed.

Spark driver. In the case of YARN, this means using HDFS as a staging area for the keytab, so it's
strongly recommended that both YARN and HDFS be secured with encryption, at least.

### Using a ticket cache
Contributor

Very nice improvement in this PR. I guess it is worth documenting it also in docs/running-on-yarn.md.

Contributor Author

This document is linked from the YARN doc. No need to duplicate documentation.

.createWithDefaultString("1m")

private[spark] val KERBEROS_RENEWAL_CREDENTIALS =
ConfigBuilder("spark.kerberos.renewal.credentials")
Contributor

Wondering why an additional config is needed, rather than just falling back to the cache when no keytab is provided and the cluster is secure.

Contributor Author

@vanzin vanzin Jan 22, 2019

That would be a change in behavior; e.g. in YARN it would increase the number of delegation tokens an app creates when it's run without a keytab.

Second, if your TGT is not renewable (kinit -l foo, instead of kinit -l foo -r bar), you'll get noisy errors in the output.

Also, a config opens the path for other ways of providing tokens (e.g. renewing based on a delegation token cache, which some k8s guys were interested in).
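The two values the new config selects between can be sketched as a small dispatch. The value strings come from the PR; the helper itself and its parameters are illustrative, not the actual Spark code:

```scala
// "keytab": renew using the configured principal/keytab pair.
// "ccache": renew from the local ticket cache, i.e. whoever is currently
//           logged in. Unknown values are rejected up front.
def renewalUser(credentials: String,
                principal: Option[String],
                currentUser: String): Option[String] =
  credentials match {
    case "keytab" => principal
    case "ccache" => Some(currentUser)
    case other =>
      throw new IllegalArgumentException(s"Invalid credentials source: $other")
  }
```

Making the source explicit is what leaves room for additional values later (such as the delegation-token cache mentioned above) without guessing from the presence or absence of a keytab.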

Contributor

Now I see, it's fine.

A little bit off-topic here, but I've considered applying this to the Kafka area: if no global JVM security config is provided, then use this config, or something along those lines.

@SparkQA

SparkQA commented Jan 23, 2019

Test build #101557 has finished for PR 23525 at commit 331bb6b.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@gaborgsomogyi gaborgsomogyi left a comment

Only minor things. Overall looks good.

def stop(): Unit = {
if (renewalExecutor != null) {
renewalExecutor.shutdown()
renewalExecutor.shutdownNow()
Contributor

Why is the forced shutdown needed?

Contributor Author

shutdown doesn't stop the executor immediately: any already scheduled tasks will still run. So the executor might stay up for about a day if there's a renewal task scheduled.
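The difference is easy to demonstrate with a plain JDK ScheduledExecutorService, independent of Spark: a delayed task keeps the executor alive after shutdown(), while shutdownNow() drains the queue (standalone example, not the PR's code):

```scala
import java.util.concurrent.{Executors, TimeUnit}

val renewalExecutor = Executors.newSingleThreadScheduledExecutor()
// Stand-in for a token renewal task scheduled far in the future.
renewalExecutor.schedule(new Runnable {
  override def run(): Unit = ()
}, 1, TimeUnit.DAYS)

renewalExecutor.shutdown()
// By default, shutdown() keeps already-scheduled delayed tasks, so the
// executor would not terminate until the task fires (~a day from now).
// shutdownNow() cancels and returns the pending work so threads can exit.
val pending = renewalExecutor.shutdownNow()
```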

Contributor

OK, thanks for the info.

// user as renewer.
sparkConf.get(PRINCIPAL).flatMap { renewer =>
val user = sparkConf.get(KERBEROS_RENEWAL_CREDENTIALS) match {
case "keytab" => sparkConf.get(PRINCIPAL)
Contributor

Can't one get the user all the time with UserGroupInformation.getCurrentUser().getUserName()?

Contributor Author

I guess.


protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
new DriverEndpoint(rpcEnv, properties)
protected def createDriverEndpoint(): DriverEndpoint = {
Contributor

Nit: Curly braces can be dropped


override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
new KubernetesDriverEndpoint(sc.env.rpcEnv, properties)
override def createDriverEndpoint(): DriverEndpoint = {
Contributor

Nit: Curly braces can be dropped

Contributor Author

Doesn't fit in the same line, so braces remain.


override def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
new YarnDriverEndpoint(rpcEnv, properties)
override def createDriverEndpoint(): DriverEndpoint = {
Contributor

Nit: Curly braces can be dropped

@SparkQA

SparkQA commented Jan 24, 2019

Test build #101604 has finished for PR 23525 at commit 57aed47.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@gaborgsomogyi gaborgsomogyi left a comment

LGTM.

@vanzin
Contributor Author

vanzin commented Jan 25, 2019

Will merge Monday if no more feedback. (@squito @tgravescs in case you want to look at it.)

@vanzin
Contributor Author

vanzin commented Jan 28, 2019

Merging to master.

@asfgit asfgit closed this in 2a67dbf Jan 28, 2019
@vanzin vanzin deleted the SPARK-26595 branch January 29, 2019 21:26
jackylee-ch pushed a commit to jackylee-ch/spark that referenced this pull request Feb 18, 2019
… cache.


Closes apache#23525 from vanzin/SPARK-26595.

Authored-by: Marcelo Vanzin <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>