From 107e5c58fdbe143fe6eabcfdb5d91d7b1184bb35 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Mon, 29 Dec 2014 15:35:45 +0800 Subject: [PATCH 1/2] worker reconnect to master when master restart for exception --- .../src/main/scala/org/apache/spark/deploy/DeployMessage.scala | 2 ++ .../src/main/scala/org/apache/spark/deploy/master/Master.scala | 1 + .../src/main/scala/org/apache/spark/deploy/worker/Worker.scala | 3 +++ 3 files changed, 6 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala index 243d8edb72ed3..d2891d7188c38 100644 --- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala @@ -89,6 +89,8 @@ private[deploy] object DeployMessages { case class KillDriver(driverId: String) extends DeployMessage case class ApplicationFinished(id: String) + + case class MasterDisconnected(masterUrl: String) extends DeployMessage // Worker internal diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index e8a5cfc746fed..f55c2febce28d 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -368,6 +368,7 @@ private[spark] class Master( } else { logWarning(s"Got heartbeat from unregistered worker $workerId." + " This worker was never registered, so ignoring the heartbeat.") + sender ! MasterDisconnected(masterUrl) } } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index f0f3da5eec4df..974bb3afba6c4 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -464,6 +464,9 @@ private[spark] class Worker( case ApplicationFinished(id) => finishedApps += id maybeCleanupApplication(id) + + case MasterDisconnected(masterUrl) => + masterDisconnected() } private def masterDisconnected() { From e9c99e3969f6e058e46d65575d796d1289351318 Mon Sep 17 00:00:00 2001 From: "Zhang, Liye" Date: Mon, 29 Dec 2014 16:51:50 +0800 Subject: [PATCH 2/2] add log info --- .../main/scala/org/apache/spark/deploy/master/Master.scala | 5 ++++- .../main/scala/org/apache/spark/deploy/worker/Worker.scala | 7 ++++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala index f55c2febce28d..8b67b6a70731e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala @@ -366,8 +366,11 @@ private[spark] class Master( " Asking it to re-register.") sender ! ReconnectWorker(masterUrl) } else { + // Get unknown worker's heart beat, tell the worker disconnected. And worker need to + // register to this master first. logWarning(s"Got heartbeat from unregistered worker $workerId." + - " This worker was never registered, so ignoring the heartbeat.") + " This worker was never registered, tell the worker connection is disconnected." + + " Need to re-register if want to connect.") sender ! MasterDisconnected(masterUrl) } } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 974bb3afba6c4..ae6dd16b7df74 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -466,7 +466,12 @@ private[spark] class Worker( maybeCleanupApplication(id) case MasterDisconnected(masterUrl) => - masterDisconnected() + if (masterUrl != activeMasterUrl) { + logWarning(s"Get message from Invalid Master ($masterUrl)." + + s"Valid Master is : $activeMasterUrl, so ignore the message.") + } else { + masterDisconnected() + } } private def masterDisconnected() {