Skip to content

Commit b5e7a72

Browse files
Remove reflection, use a method in SparkHadoopUtil to update the token renewer.
1 parent 7bff6e9 commit b5e7a72

File tree

3 files changed

+24
-18
lines changed

3 files changed

+24
-18
lines changed

core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,17 @@ class SparkHadoopUtil extends Logging {
292292
}
293293
}
294294
}
295+
296+
/**
 * Start a thread to periodically update the current user's credentials with new delegation
 * tokens so that writes to HDFS do not fail.
 *
 * No-op in this base implementation; overridden by the YARN-specific subclass
 * (YarnSparkHadoopUtil), which is the only deployment mode that uses delegation tokens.
 *
 * @param conf Spark configuration used to locate the credentials file.
 */
private[spark] def startExecutorDelegationTokenRenewer(conf: SparkConf): Unit = {}
301+
302+
/**
 * Stop the thread that does the delegation token updates.
 *
 * No-op in this base implementation; overridden by the YARN-specific subclass.
 */
private[spark] def stopExecutorDelegationTokenRenewer(): Unit = {}
295306
}
296307

297308
object SparkHadoopUtil {

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -170,24 +170,10 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
170170
driverConf.set(key, value)
171171
}
172172
}
173-
// Delegation Token Updater is not supported in Hadoop 1, so use reflection.
174-
// Can't use Option[ExecutorDelegationTokenUpdater] because it is built only in YARN
175-
// profile, so use Option[Any] since even the stop method call will be via reflection.
176-
var tokenUpdaterOption: Option[Any] = None
177-
var tokenUpdaterClass: Option[Class[_]] = None
178173
if (driverConf.contains("spark.yarn.credentials.file")) {
179174
logInfo("Will periodically update credentials from: " +
180175
driverConf.get("spark.yarn.credentials.file"))
181-
182-
tokenUpdaterClass =
183-
Some(Class.forName("org.apache.spark.deploy.yarn.ExecutorDelegationTokenUpdater"))
184-
185-
// Periodically update the credentials for this user to ensure HDFS tokens get updated.
186-
val constructor =
187-
tokenUpdaterClass.get.getDeclaredConstructor(classOf[SparkConf], classOf[Configuration])
188-
tokenUpdaterOption = Some(constructor.newInstance(driverConf, SparkHadoopUtil.get.conf))
189-
tokenUpdaterClass.get.getMethod("updateCredentialsIfRequired")
190-
.invoke(tokenUpdaterOption.get)
176+
SparkHadoopUtil.get.startExecutorDelegationTokenRenewer(driverConf)
191177
}
192178

193179
val env = SparkEnv.createExecutorEnv(
@@ -205,9 +191,7 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
205191
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
206192
}
207193
env.rpcEnv.awaitTermination()
208-
if (tokenUpdaterOption.isDefined) {
209-
tokenUpdaterClass.get.getMethod("stop").invoke(tokenUpdaterOption.get)
210-
}
194+
SparkHadoopUtil.get.stopExecutorDelegationTokenRenewer()
211195
}
212196
}
213197

yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,8 @@ import org.apache.spark.util.Utils
4444
*/
4545
class YarnSparkHadoopUtil extends SparkHadoopUtil {
4646

47+
// Updater for this executor's delegation tokens; populated by
// startExecutorDelegationTokenRenewer and used by stopExecutorDelegationTokenRenewer.
private var tokenRenewer: Option[ExecutorDelegationTokenUpdater] = None
48+
4749
/**
 * Copy all of `source`'s credentials (delegation tokens and secrets) into `dest`.
 *
 * @param source UGI whose credentials are read.
 * @param dest   UGI that receives the credentials.
 */
override def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation): Unit = {
  dest.addCredentials(source.getCredentials())
}
@@ -125,6 +127,15 @@ class YarnSparkHadoopUtil extends SparkHadoopUtil {
125127
}
126128
}
127129

130+
private[spark] override def startExecutorDelegationTokenRenewer(sparkConf: SparkConf): Unit = {
131+
tokenRenewer = Some(new ExecutorDelegationTokenUpdater(sparkConf, conf))
132+
tokenRenewer.get.updateCredentialsIfRequired()
133+
}
134+
135+
private[spark] override def stopExecutorDelegationTokenRenewer(): Unit ={
136+
tokenRenewer.foreach(_.stop())
137+
}
138+
128139
}
129140

130141
object YarnSparkHadoopUtil {

0 commit comments

Comments (0)