[Spark-21842][Mesos] Support Kerberos ticket renewal and creation in Mesos #19272
Changes from all commits
**CoarseGrainedSchedulerBackend.scala**

```diff
@@ -24,11 +24,7 @@ import javax.annotation.concurrent.GuardedBy
 import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
 import scala.concurrent.Future
 
-import org.apache.hadoop.security.UserGroupInformation
-
 import org.apache.spark.{ExecutorAllocationClient, SparkEnv, SparkException, TaskState}
-import org.apache.spark.deploy.SparkHadoopUtil
-import org.apache.spark.deploy.security.HadoopDelegationTokenManager
 import org.apache.spark.internal.Logging
 import org.apache.spark.rpc._
 import org.apache.spark.scheduler._
@@ -99,12 +95,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
   // The num of current max ExecutorId used to re-register appMaster
   @volatile protected var currentExecutorIdCounter = 0
 
-  // hadoop token manager used by some sub-classes (e.g. Mesos)
-  def hadoopDelegationTokenManager: Option[HadoopDelegationTokenManager] = None
-
-  // Hadoop delegation tokens to be sent to the executors.
-  val hadoopDelegationCreds: Option[Array[Byte]] = getHadoopDelegationCreds()
-
   class DriverEndpoint(override val rpcEnv: RpcEnv, sparkProperties: Seq[(String, String)])
     extends ThreadSafeRpcEndpoint with Logging {
@@ -159,6 +149,11 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
         scheduler.getExecutorsAliveOnHost(host).foreach { exec =>
           killExecutors(exec.toSeq, replace = true, force = true)
         }
+
+      case UpdateDelegationTokens(newDelegationTokens) =>
+        executorDataMap.values.foreach { ed =>
+          ed.executorEndpoint.send(UpdateDelegationTokens(newDelegationTokens))
+        }
     }
 
     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
@@ -236,7 +231,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
         val reply = SparkAppConfig(
           sparkProperties,
           SparkEnv.get.securityManager.getIOEncryptionKey(),
-          hadoopDelegationCreds)
+          fetchHadoopDelegationTokens())
         context.reply(reply)
       }
@@ -686,18 +681,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: RpcEnv)
     true
   }
 
-  protected def getHadoopDelegationCreds(): Option[Array[Byte]] = {
-    if (UserGroupInformation.isSecurityEnabled && hadoopDelegationTokenManager.isDefined) {
-      hadoopDelegationTokenManager.map { manager =>
-        val creds = UserGroupInformation.getCurrentUser.getCredentials
-        val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
-        manager.obtainDelegationTokens(hadoopConf, creds)
-        SparkHadoopUtil.get.serialize(creds)
-      }
-    } else {
-      None
-    }
-  }
+  protected def fetchHadoopDelegationTokens(): Option[Array[Byte]] = { None }
 }
 
 private[spark] object CoarseGrainedSchedulerBackend {
```

**Author** (on the removed `getHadoopDelegationCreds`): This method was only called once, and would discard the renewal-time information, limiting its utility.
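The net effect is that `CoarseGrainedSchedulerBackend` no longer computes delegation tokens itself; it exposes `fetchHadoopDelegationTokens()` as an overridable hook that defaults to `None`, and the `SparkAppConfig` reply picks up whatever the hook returns. As a rough illustration only (this is not the PR's actual Mesos backend code; the class and constructor parameter here are hypothetical), a backend that owns a token manager could implement the hook like this:

```scala
import org.apache.spark.rpc.RpcEnv
import org.apache.spark.scheduler.TaskSchedulerImpl
import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend
import org.apache.spark.scheduler.cluster.mesos.MesosHadoopDelegationTokenManager

// Hypothetical subclass, sketched only to show the contract of the new hook.
private[spark] class TokenAwareSchedulerBackend(
    scheduler: TaskSchedulerImpl,
    rpcEnv: RpcEnv,
    // Assumed to be constructed by the backend once the driver endpoint exists.
    tokenManager: MesosHadoopDelegationTokenManager)
  extends CoarseGrainedSchedulerBackend(scheduler, rpcEnv) {

  // Executors that register late (or are added dynamically) then receive the
  // freshest serialized tokens via the SparkAppConfig reply in the diff above.
  override protected def fetchHadoopDelegationTokens(): Option[Array[Byte]] = {
    Some(tokenManager.getTokens())
  }
}
```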
**Mesos cluster manager** (`org.apache.spark.scheduler.cluster.mesos`)

```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.scheduler.cluster.mesos
 
-import org.apache.spark.{SparkContext, SparkException}
+import org.apache.spark.SparkContext
 import org.apache.spark.internal.config._
 import org.apache.spark.scheduler.{ExternalClusterManager, SchedulerBackend, TaskScheduler, TaskSchedulerImpl}
```

**Contributor** (on the changed import): Change not needed?
**MesosHadoopDelegationTokenManager.scala** (new file, 157 lines)

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.scheduler.cluster.mesos

import java.security.PrivilegedExceptionAction
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.security.UserGroupInformation

import org.apache.spark.SparkConf
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
import org.apache.spark.internal.{config, Logging}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.UpdateDelegationTokens
import org.apache.spark.util.ThreadUtils

/**
 * The MesosHadoopDelegationTokenManager fetches and updates Hadoop delegation tokens on behalf
 * of the MesosCoarseGrainedSchedulerBackend. It is modeled after the YARN AMCredentialRenewer,
 * and similarly will renew the Credentials when 75% of the renewal interval has passed.
 * The principal difference is that instead of writing the new credentials to HDFS and
 * incrementing the timestamp of the file, the new credentials (called Tokens when they are
 * serialized) are broadcast to all running executors. On the executor side, when new Tokens are
 * received they overwrite the current credentials.
 */
private[spark] class MesosHadoopDelegationTokenManager(
    conf: SparkConf,
    hadoopConfig: Configuration,
    driverEndpoint: RpcEndpointRef)
  extends Logging {

  require(driverEndpoint != null, "DriverEndpoint is not initialized")

  private val credentialRenewerThread: ScheduledExecutorService =
    ThreadUtils.newDaemonSingleThreadScheduledExecutor("Credential Renewal Thread")

  private val tokenManager: HadoopDelegationTokenManager =
    new HadoopDelegationTokenManager(conf, hadoopConfig)

  private val principal: String = conf.get(config.PRINCIPAL).orNull

  private var (tokens: Array[Byte], timeOfNextRenewal: Long) = {
    try {
      val creds = UserGroupInformation.getCurrentUser.getCredentials
      val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
      val rt = tokenManager.obtainDelegationTokens(hadoopConf, creds)
      logInfo(s"Initialized tokens: ${SparkHadoopUtil.get.dumpTokens(creds)}")
      (SparkHadoopUtil.get.serialize(creds), rt)
    } catch {
      case e: Exception =>
        logError(s"Failed to fetch Hadoop delegation tokens $e")
        throw e
    }
  }

  private val keytabFile: Option[String] = conf.get(config.KEYTAB)

  scheduleTokenRenewal()

  private def scheduleTokenRenewal(): Unit = {
    if (keytabFile.isDefined) {
      require(principal != null, "Principal is required for Keytab-based authentication")
      logInfo(s"Using keytab: ${keytabFile.get} and principal $principal")
    } else {
      logInfo("Using ticket cache for Kerberos authentication, no token renewal.")
      return
    }

    def scheduleRenewal(runnable: Runnable): Unit = {
      val remainingTime = timeOfNextRenewal - System.currentTimeMillis()
      if (remainingTime <= 0) {
        logInfo("Credentials have expired, creating new ones now.")
        runnable.run()
      } else {
        logInfo(s"Scheduling login from keytab in $remainingTime millis.")
        credentialRenewerThread.schedule(runnable, remainingTime, TimeUnit.MILLISECONDS)
      }
    }

    val credentialRenewerRunnable =
      new Runnable {
        override def run(): Unit = {
          try {
            getNewDelegationTokens()
            broadcastDelegationTokens(tokens)
          } catch {
            case e: Exception =>
              // Log the error and try to write new tokens back in an hour
              logWarning("Couldn't broadcast tokens, trying again in an hour", e)
              credentialRenewerThread.schedule(this, 1, TimeUnit.HOURS)
              return
          }
          scheduleRenewal(this)
        }
      }
    scheduleRenewal(credentialRenewerRunnable)
  }

  private def getNewDelegationTokens(): Unit = {
    logInfo(s"Attempting to login to KDC with principal ${principal}")
    // Get new delegation tokens by logging in with a new UGI inspired by AMCredentialRenewer.scala
    // Don't protect against keytabFile being empty because it's guarded above.
    val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabFile.get)
    logInfo("Successfully logged into KDC")
    val tempCreds = ugi.getCredentials
    val hadoopConf = SparkHadoopUtil.get.newConfiguration(conf)
    val nextRenewalTime = ugi.doAs(new PrivilegedExceptionAction[Long] {
      override def run(): Long = {
        tokenManager.obtainDelegationTokens(hadoopConf, tempCreds)
      }
    })

    val currTime = System.currentTimeMillis()
    timeOfNextRenewal = if (nextRenewalTime <= currTime) {
      logWarning(s"Next credential renewal time ($nextRenewalTime) is earlier than " +
        s"current time ($currTime), which is unexpected, please check your credential renewal " +
        "related configurations in the target services.")
      currTime
    } else {
      SparkHadoopUtil.getDateOfNextUpdate(nextRenewalTime, 0.75)
    }
    logInfo(s"Time of next renewal is in ${timeOfNextRenewal - System.currentTimeMillis()} ms")

    // Add the temp credentials back to the original ones.
    UserGroupInformation.getCurrentUser.addCredentials(tempCreds)
    // update tokens for late or dynamically added executors
    tokens = SparkHadoopUtil.get.serialize(tempCreds)
  }

  private def broadcastDelegationTokens(tokens: Array[Byte]) = {
    logInfo("Sending new tokens to all executors")
    driverEndpoint.send(UpdateDelegationTokens(tokens))
  }

  def getTokens(): Array[Byte] = {
    tokens
  }
}
```

**Author** (replying on `getTokens()`): Thanks for catching this.
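The executor-side half of the scaladoc's description ("when new Tokens are received they overwrite the current credentials") is not part of this file. A minimal sketch of what applying a received token blob involves, assuming a `deserialize` counterpart to the `SparkHadoopUtil.get.serialize` call used above:

```scala
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.deploy.SparkHadoopUtil

// Hypothetical executor-side handler for UpdateDelegationTokens(tokenBytes);
// the PR's executor changes are not shown in this diff.
def updateDelegationTokens(tokenBytes: Array[Byte]): Unit = {
  // deserialize is assumed here as the inverse of SparkHadoopUtil.get.serialize.
  val creds = SparkHadoopUtil.get.deserialize(tokenBytes)
  // Adding credentials merges by token alias, so newer tokens replace older
  // ones with the same alias -- the "overwrite" behavior the scaladoc describes.
  UserGroupInformation.getCurrentUser.addCredentials(creds)
}
```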
**Comment:** No longer needed because resource-manager backends (may) implement their own `initializeHadoopDelegationTokens`.
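For reference, the renewal schedule described in the scaladoc ("renew the Credentials when 75% of the renewal interval has passed") works out as follows, assuming `SparkHadoopUtil.getDateOfNextUpdate(nextRenewalTime, 0.75)` interpolates linearly between the current time and the service-reported renewal time:

```scala
// Illustrative arithmetic only; mirrors the assumed behavior of
// SparkHadoopUtil.getDateOfNextUpdate(nextRenewalTime, 0.75).
val now = System.currentTimeMillis()
val nextRenewalTime = now + 24 * 60 * 60 * 1000L // service allows renewal in 24 hours
val timeOfNextRenewal = now + ((nextRenewalTime - now) * 0.75).toLong
// The renewal runnable is scheduled (timeOfNextRenewal - now) ms from now,
// i.e. at the 18-hour mark, leaving a 6-hour margin before the tokens lapse.
```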