
Conversation

@vanzin (Contributor) commented Oct 15, 2015

Because the registration RPC was not really an RPC but a bunch of
disconnected messages, it was possible for other messages to be
sent before the reply to the registration arrived, and that would
confuse the Worker. Especially in local-cluster mode, the Worker was
susceptible to receiving an executor request before it received the
message from the Master saying registration had succeeded.

On top of that, the change also fixes a ClassCastException when
registration fails, an issue that also affects the executor registration
protocol. Because the `ask` is issued with a specific return type,
if an error message of a different type was returned instead, the
code would just die with an exception. This is fixed by having a common
base trait for these reply messages.

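The shape of the fix's message types, sketched below, matches the classes SparkQA lists in the test output: both the success and failure replies extend a common trait, and the worker's `ask` is typed against that trait, so either reply arrives as a normal value. This is a minimal sketch; the `DeployMessage` stand-in and the `RegisteredWorker` fields are simplified, not the exact Spark definitions.

```scala
import org.apache.spark.rpc.RpcEndpointRef

// Simplified stand-in for Spark's deploy-message marker trait.
sealed trait DeployMessage extends Serializable

// Common base trait for registration replies. Because the worker's ask is
// typed against this trait, a failure reply no longer fails the typed
// future with a ClassCastException.
sealed trait RegisterWorkerResponse

case class RegisteredWorker(master: RpcEndpointRef, masterWebUiUrl: String)
  extends DeployMessage with RegisterWorkerResponse

case class RegisterWorkerFailed(message: String)
  extends DeployMessage with RegisterWorkerResponse
```
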
@SparkQA commented Oct 15, 2015

Test build #43800 has finished for PR 9138 at commit cb3f972.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse
    • case class RegisteredExecutor() extends CoarseGrainedClusterMessage
    • case class HyperLogLogPlusPlus(

@vanzin (Contributor, Author) commented Oct 15, 2015

PySpark failure, sigh. Is there a bug tracking these flaky tests? retest this please

@SparkQA commented Oct 15, 2015

Test build #43808 has finished for PR 9138 at commit cb3f972.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse
    • case class RegisteredExecutor() extends CoarseGrainedClusterMessage

@vanzin (Contributor, Author) commented Oct 16, 2015

/cc @andrewor14 @zsxwing I think you're the people most familiar with this code.

Contributor

This is kind of scary. We should always send a response; otherwise we might get random future timeout exceptions.

Contributor Author

Perhaps, but not really related to the problem at hand.

Contributor

What I'm saying is that, in general, in receiveAndReply you should always reply, because that's what the caller expects. Otherwise it's really confusing when we get future timeouts, because they're hard to debug.

Contributor Author

I agree with you; I'm just saying that's a bug in itself, and I'd rather not fix it as part of this change, because it might affect other parts of the code where workers retry connections to different masters.

Contributor

Wait, actually that's a problem. Right now we have a send and an ask. The send won't actually be received by anyone, because we only handle this message in receiveAndReply.

Contributor Author

I might have missed a send. But the lack of a reply here is not necessarily a bug, and is definitely not related to this one.

Contributor Author

Actually, never mind. I see what you mean: I'm introducing the need to reply by switching to ask. Let me take a look...
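
To make the concern concrete: once registration goes through ask, every branch of the master's receiveAndReply has to call context.reply, or the caller's future silently times out. A hedged sketch of the handler's shape inside the Master endpoint; the standby and duplicate-ID branches and helpers like `isStandby`, `idToWorker`, and `registerWorker` are illustrative, not the exact Master logic:

```scala
import org.apache.spark.rpc.RpcCallContext

// Schematic: inside the Master RpcEndpoint. Every branch replies, so the
// worker's ask never hangs waiting for an answer that will not come.
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  case msg: RegisterWorker =>
    if (isStandby) {
      // Illustrative branch: even a master that cannot accept the worker
      // still answers, with the failure reply type.
      context.reply(RegisterWorkerFailed("Master is in standby"))
    } else if (idToWorker.contains(msg.id)) {
      context.reply(RegisterWorkerFailed("Duplicate worker ID: " + msg.id))
    } else {
      registerWorker(msg)
      context.reply(RegisteredWorker(self, masterWebUiUrl))
    }
}
```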

@andrewor14 (Contributor)

@vanzin just so I understand, it's these two lines that are racing, right?

workerRef.send(RegisteredWorker(self, masterWebUiUrl))

worker.endpoint.send(LaunchExecutor(masterUrl,

@vanzin (Contributor, Author) commented Oct 17, 2015

@andrewor14 correct, that's what causes the race.
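
Those two sends are issued independently (one from the registration handler, one from the scheduling path), so nothing orders them in the worker's inbox. After this change the Worker registers with an ask and handles the reply first, so LaunchExecutor can no longer overtake the registration result. A schematic of the worker side; `markRegistered` and `onRegistrationFailure` are hypothetical stand-ins for the Worker's real plumbing:

```scala
import scala.concurrent.Future
import scala.util.{Failure, Success}
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.ThreadUtils

// Schematic worker side: registration is now a request/response pair.
def registerWithMaster(master: RpcEndpointRef, registerMsg: Any): Unit = {
  val reply: Future[RegisterWorkerResponse] =
    master.ask[RegisterWorkerResponse](registerMsg)
  reply.onComplete {
    case Success(RegisteredWorker(masterRef, webUiUrl)) =>
      markRegistered(masterRef, webUiUrl)  // hypothetical: record the master, start heartbeats
    case Success(RegisterWorkerFailed(reason)) =>
      onRegistrationFailure("Registration failed: " + reason)  // hypothetical
    case Failure(e) =>
      onRegistrationFailure("Could not reach master: " + e.getMessage)  // hypothetical
  }(ThreadUtils.sameThread)
}
```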

@SparkQA commented Oct 17, 2015

Test build #43876 has finished for PR 9138 at commit ff2d3e2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse
    • case class RegisteredExecutor() extends CoarseGrainedClusterMessage

Also go back to using case objects since they seem to work; I'll make
the change to case classes where needed in the corresponding PR (not
yet sent).
@SparkQA commented Oct 17, 2015

Test build #43884 has finished for PR 9138 at commit 2d35024.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class RegisterWorkerFailed(message: String) extends DeployMessage with RegisterWorkerResponse

@andrewor14 (Contributor)

LGTM, I'm merging this into master. Thanks @vanzin.

asfgit closed this in 7ab0ce6 on Oct 19, 2015
vanzin deleted the SPARK-11131 branch on October 24, 2015
@zsxwing (Member) commented Dec 10, 2015

@vanzin I just found an issue with this change. Now, when the Master receives RegisterWorker, it doesn't use the workerRef to send the reply, so there is no connection from the Master to the RPC server in the Worker. If the Worker is killed, the Master only observes that some client was lost, but the lost address is just a client address in the Worker and won't match the Worker's address, so the Master cannot remove the dead Worker right away. The Worker will still be removed after 60 seconds, though, because of the missing heartbeats.

See the log here:
https://www.mail-archive.com/[email protected]/msg12332.html

@andrewor14 (Contributor)

To echo @vanzin on SPARK-12267, the cause of SPARK-12267 is not this PR but #9210.
