From 15f2d3e82e33706c9965822eed027e2411a8b8ed Mon Sep 17 00:00:00 2001 From: liujianhui Date: Tue, 3 Jan 2017 20:41:17 +0800 Subject: [PATCH] [spark-19001] add a new hasRegisterBefore tag to avoid duplicated fixed runnable --- .../apache/spark/deploy/worker/Worker.scala | 25 +++++++++++-------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f963a4606068..d934328bbef5 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -104,6 +104,7 @@ private[deploy] class Worker( private var workerWebUiUrl: String = "" private val workerUri = RpcEndpointAddress(rpcEnv.address, endpointName).toString private var registered = false + private var hasRegisteredBefore = false private var connected = false private val workerId = generateWorkerId() private val sparkHome = @@ -351,21 +352,23 @@ private[deploy] class Worker( logInfo("Successfully registered with master " + masterRef.address.toSparkURL) registered = true changeMaster(masterRef, masterWebUiUrl) - forwordMessageScheduler.scheduleAtFixedRate(new Runnable { - override def run(): Unit = Utils.tryLogNonFatalError { - self.send(SendHeartbeat) - } - }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS) - if (CLEANUP_ENABLED) { - logInfo( - s"Worker cleanup enabled; old application directories will be deleted in: $workDir") + if (!hasRegisteredBefore) { forwordMessageScheduler.scheduleAtFixedRate(new Runnable { override def run(): Unit = Utils.tryLogNonFatalError { - self.send(WorkDirCleanup) + self.send(SendHeartbeat) } - }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS) + }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS) + if (CLEANUP_ENABLED) { + logInfo( + s"Worker cleanup enabled; old application directories will be deleted in: $workDir") + forwordMessageScheduler.scheduleAtFixedRate(new Runnable { + override def run(): Unit = Utils.tryLogNonFatalError { + self.send(WorkDirCleanup) + } + }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS) + } + hasRegisteredBefore = true } - val execs = executors.values.map { e => new ExecutorDescription(e.appId, e.execId, e.cores, e.state) }