[SPARK-10827] [CORE] AppClient should not use askWithReply in receiveAndReply
#9317
…AndReply-SPARK-10827
|
@zsxwing would you mind checking this out? I noticed there were no existing unit tests for AppClient, so I added them. I couldn't think of a way to test that the calls are non-blocking, any ideas? |
Is there a way to tell if the client fails to connect to the master? It does log an error, but if I wanted to check to see if the AppClient endpoint was registered after calling start(), there doesn't seem to be a way.
So the private volatile var `registered` lets us check whether we succeeded at registering with a master. Is that what you were looking for?
Well, I was trying to think of making a test for AppClient in the case of an unreachable Master. The rpc env logs an exception right away, but the only way to tell from outside the AppClient is to set a listener with a connected callback, then poll to see if it ever gets hit. Maybe this isn't really an issue in practice though.
> the only way to tell from outside the AppClient is to set a listener with a connected callback, then poll to see if it ever gets hit. Maybe this isn't really an issue in practice though.

I think that's fine.
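The listener-and-poll approach discussed above could look roughly like the following. This is only a sketch under stated assumptions: `ConnectionListener`, `RecordingListener`, and `PollUntil.eventually` are hypothetical stand-ins written for illustration, not the AppClient listener interface or the tests added in this PR.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical stand-in for a listener with a "connected" callback.
trait ConnectionListener {
  def connected(appId: String): Unit
}

// Records whether the callback was ever invoked, safely across threads.
class RecordingListener extends ConnectionListener {
  private val connectedFlag = new AtomicBoolean(false)
  override def connected(appId: String): Unit = connectedFlag.set(true)
  def wasConnected: Boolean = connectedFlag.get()
}

object PollUntil {
  /** Re-evaluates `condition` every `intervalMs` until it holds or `timeoutMs` elapses. */
  def eventually(timeoutMs: Long, intervalMs: Long)(condition: => Boolean): Boolean = {
    val deadline = System.nanoTime() + timeoutMs * 1000000L
    var ok = condition
    while (!ok && System.nanoTime() < deadline) {
      Thread.sleep(intervalMs)
      ok = condition
    }
    ok
  }
}
```

With an unreachable master the callback never fires, so the poll simply times out and returns `false`. In a ScalaTest suite, the `Eventually` trait offers a similar polling construct.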
|
Test build #44495 has finished for PR 9317 at commit
|
|
retest this please |
|
Test build #44639 has finished for PR 9317 at commit
|
|
@holdenk if you wouldn't mind taking a look at this, I'd appreciate it! |
|
Sure thing :) |
this code (the new runnable sending reply) seems to be duplicated a few times, maybe factor it out into a helper function?
At first I was trying to put these 2 calls into something like Utils.tryWithSafeFinally with blocks as arguments but the call ended up looking a little confusing as to what was happening.
maybe just a regular function like this would be better, although a little less flexible
private def receiveAndReplyAsync[T](masterRef: RpcEndpointRef, context: RpcCallContext,
    msg: T): Unit = {
  // execute ask and reply in thread pool
  ..
Thanks for the feedback @holdenk ! |
|
retest this please |
|
Test build #44705 has finished for PR 9317 at commit
|
…AndReply-SPARK-10827
|
retest this please |
|
bad luck today I guess :( |
|
retest this please |
|
Test build #44722 has finished for PR 9317 at commit
|
…AndReply-SPARK-10827
|
retest this please |
|
Test build #44824 has finished for PR 9317 at commit
|
|
retest this please |
|
Test build #44939 has finished for PR 9317 at commit
|
You can use org.apache.spark.util.ThreadUtils.newDaemonCachedThreadPool instead.
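For readers unfamiliar with what a daemon cached thread pool is, here is a rough equivalent built only on `java.util.concurrent`. It is a sketch, not Spark's `ThreadUtils` implementation; `DaemonPools.newDaemonCachedPool` and its thread-naming scheme are assumptions made for illustration.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger

object DaemonPools {
  /** Cached pool whose worker threads are daemons named `<prefix>-<n>`. */
  def newDaemonCachedPool(prefix: String): ExecutorService = {
    val counter = new AtomicInteger(0)
    val factory = new ThreadFactory {
      override def newThread(r: Runnable): Thread = {
        val t = new Thread(r, prefix + "-" + counter.getAndIncrement())
        // Daemon threads do not keep the JVM alive on shutdown.
        t.setDaemon(true)
        t
      }
    }
    Executors.newCachedThreadPool(factory)
  }
}
```

Daemon threads matter here because a pool of non-daemon threads that is never shut down can prevent the JVM from exiting.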
|
@BryanCutler Could you also make the following variables |
nit: indentation. The correct indentation is:
private def receiveAndReplyAsync[T](
    masterRef: RpcEndpointRef, context: RpcCallContext, msg: T): Unit = {
|
Thanks for the feedback @zsxwing !
I made this change, but would you mind clarifying a little where these are shared? From what I can tell, I think the |
E.g., requestTotalExecutors is called from SparkDeploySchedulerBackend and it's not in the thread that setting |
nit: must be notified.
|
Test build #45044 has finished for PR 9317 at commit
|
Got it, thanks! |
|
Test build #45048 has finished for PR 9317 at commit
|
|
LGTM. CC @rxin to take a final look |
it's better to use an atomic reference to be more explicit here. i'm going to merge this and update it.
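As an illustration of the suggestion above, state shared between threads can be held in `AtomicBoolean` and `AtomicReference` instead of `@volatile` fields. The sketch below is an assumption-laden example: `ClientState` and its members are hypothetical and do not reproduce the fields changed in the follow-up commit.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical holder for state that is written by one thread and read by others.
class ClientState {
  private val registered = new AtomicBoolean(false)
  private val appId = new AtomicReference[String](null)

  /** Records the first registration only; returns false if an id was already set. */
  def markRegistered(id: String): Boolean = {
    // Publish the id before flipping the flag, so a reader that sees
    // `registered == true` also sees the id.
    val won = appId.compareAndSet(null, id)
    if (won) registered.set(true)
    won
  }

  def isRegistered: Boolean = registered.get()
  def currentAppId: Option[String] = Option(appId.get())
}
```

Compared with a bare volatile field, the atomic wrappers make the cross-thread intent visible at the declaration and provide compare-and-set for check-then-act updates.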
…veAndReply`

Changed AppClient to be non-blocking in `receiveAndReply` by using a separate thread to wait for response and reply to the context. The threads are managed by a thread pool. Also added unit tests for the AppClient interface.

Author: Bryan Cutler
Closes #9317 from BryanCutler/appClient-receiveAndReply-SPARK-10827.
(cherry picked from commit a398905)
Signed-off-by: Reynold Xin
This is a followup for #9317 to replace volatile fields with AtomicBoolean and AtomicReference.

Author: Reynold Xin
Closes #9611 from rxin/SPARK-10827.
(cherry picked from commit e1bcf6a)
Signed-off-by: Reynold Xin
Changed AppClient to be non-blocking in `receiveAndReply` by using a separate thread to wait for the response and reply to the context. The threads are managed by a thread pool. Also added unit tests for the AppClient interface.
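The pattern described here, handing the blocking ask to a pool thread and replying to the caller's context from there, might be sketched as follows. Everything in this block is an assumption for illustration: `MasterRef`, `CallContext`, and `AsyncReplier` are hypothetical stand-ins for the RPC types, and the body is not the code merged in this PR.

```scala
import java.util.concurrent.ExecutorService
import scala.util.control.NonFatal

// Hypothetical stand-ins for the master endpoint reference and the call context.
trait MasterRef {
  def askBlocking(msg: String): Boolean
}

trait CallContext {
  def reply(response: Any): Unit
  def sendFailure(e: Throwable): Unit
}

class AsyncReplier(pool: ExecutorService) {
  /** Returns immediately; the blocking ask and the reply both run on a pool thread. */
  def receiveAndReplyAsync(master: MasterRef, context: CallContext, msg: String): Unit = {
    pool.execute(new Runnable {
      override def run(): Unit = {
        try {
          context.reply(master.askBlocking(msg))
        } catch {
          // Forward non-fatal errors so the original caller is not left waiting.
          case NonFatal(e) => context.sendFailure(e)
        }
      }
    })
  }
}
```

The message-handling thread only enqueues the task, so a slow or unreachable master no longer stalls the processing of other incoming messages.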