Skip to content

Commit e800c8b

Browse files
Restore original RegisteredExecutor message, and send new tokens via NewTokens message.
1 parent 0e9507e commit e800c8b

File tree

3 files changed

+11
-12
lines changed

3 files changed

+11
-12
lines changed

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -72,17 +72,17 @@ private[spark] class CoarseGrainedExecutorBackend(
7272
}
7373

7474
override def receive: PartialFunction[Any, Unit] = {
75-
case RegisteredExecutor(tokens) =>
75+
case RegisteredExecutor =>
7676
logInfo("Successfully registered with driver")
7777
val (hostname, _) = Utils.parseHostPort(hostPort)
7878
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
79-
tokens.foreach { x =>
80-
val inStream = new DataInputStream(new ByteArrayInputStream(x.value.array()))
81-
val creds = new Credentials()
82-
creds.readFields(inStream)
83-
inStream.close()
84-
UserGroupInformation.getCurrentUser.addCredentials(creds)
85-
}
79+
80+
case NewTokens(tokens) =>
81+
val inStream = new DataInputStream(new ByteArrayInputStream(tokens.value.array()))
82+
val creds = new Credentials()
83+
creds.readFields(inStream)
84+
inStream.close()
85+
UserGroupInformation.getCurrentUser.addCredentials(creds)
8686

8787
case RegisterExecutorFailed(message) =>
8888
logError("Slave registration failed: " + message)

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,7 @@ private[spark] object CoarseGrainedClusterMessages {
3535
case class KillTask(taskId: Long, executor: String, interruptThread: Boolean)
3636
extends CoarseGrainedClusterMessage
3737

38-
case class RegisteredExecutor(tokens: Option[SerializableBuffer])
39-
extends CoarseGrainedClusterMessage
38+
case object RegisteredExecutor extends CoarseGrainedClusterMessage
4039

4140
case class NewTokens(tokens: SerializableBuffer) extends CoarseGrainedClusterMessage
4241

core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,8 +124,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
124124
context.reply(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
125125
} else {
126126
logInfo("Registered executor: " + executorRef + " with ID " + executorId)
127-
context.reply(RegisteredExecutor(latestTokens))
128-
127+
context.reply(RegisteredExecutor)
128+
latestTokens.foreach(x => context.reply(NewTokens(x)))
129129
addressToExecutorId(executorRef.address) = executorId
130130
totalCoreCount.addAndGet(cores)
131131
totalRegisteredExecutors.addAndGet(1)

0 commit comments

Comments
 (0)