diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f963a4606068..d934328bbef5 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -104,6 +104,7 @@ private[deploy] class Worker( private var workerWebUiUrl: String = "" private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString private var registered = false + private var hasRegisteredBefore = false private var connected = false private val workerId = generateWorkerId() private val sparkHome = @@ -351,21 +352,23 @@ private[deploy] class Worker( logInfo("Successfully registered with master " + masterRef.address.toSparkURL) registered = true changeMaster(masterRef, masterWebUiUrl) - forwordMessageScheduler.scheduleAtFixedRate(new Runnable { - override def run(): Unit = Utils.tryLogNonFatalError { - self.send(SendHeartbeat) - } - }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS) - if (CLEANUP_ENABLED) { - logInfo( - s"Worker cleanup enabled; old application directories will be deleted in: $workDir") + if (!hasRegisteredBefore) { forwordMessageScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { - self.send(WorkDirCleanup) + self.send(SendHeartbeat) } - }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS) + }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS) + if (CLEANUP_ENABLED) { + logInfo( + s"Worker cleanup enabled; old application directories will be deleted in: $workDir") + forwordMessageScheduler.scheduleAtFixedRate(new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + self.send(WorkDirCleanup) + } + }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS) + } + hasRegisteredBefore = true } - val execs = executors.values.map { e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state) }