[SPARK-8882][Streaming]Add a new Receiver scheduling mechanism #7276
Conversation
|
Test build #36742 has finished for PR 7276 at commit
|
|
Test build #36797 has finished for PR 7276 at commit
|
|
Test build #36798 timed out for PR 7276 at commit |
|
retest this please |
|
Test build #36868 timed out for PR 7276 at commit |
|
Test build #36919 has finished for PR 7276 at commit
|
@JoshRosen Pinging you to review this part of the PR that touches SparkContext.
One high-level question, which perhaps has been addressed elsewhere: why can't we use the existing submitJob method that returns a SimpleFutureAction?
BTW, I used the name submitAsyncJob because, if I name it submitJob, type inference no longer works well and many existing call sites of the previous submitJob fail to compile.
|
Test build #37090 timed out for PR 7276 at commit |
Doesn't it return a list of executors?
|
My high-level comment / question: why not use the existing |
This TODO needs to be removed.
The problem of |
I think this messaging should be done differently, to align with the style of the other messages. It does not make sense for the receiver to fetch all the locations where it is allowed to start. Rather, the message should be ShouldStartReceiver, which returns true or false, and the ReceiverSupervisor starts the receiver accordingly.
Or, if we want to maintain the naming scheme (which uses Register instead of Start), it may be better to name the new one ValidateLocation(streamId, host).
Actually, we can merge the change you did in #6294, where ReceiverSupervisor.onReceiverStart() (that is, the RegisterReceiver message) is called before the receiver is started, and the receiver is only started if the RegisterReceiver message returns true. The ReceiverTracker can use this mechanism to prevent the receiver from starting on unwanted executors: when the tracker receives RegisterReceiver, it returns !stopping && scheduleReceiver(receiverId).contains(host).
This would solve the race condition and check the location without introducing additional messaging, wouldn't it?
Sounds great. One potential issue: if we reject a receiver using !scheduleReceiver(receiverId).contains(host) when receiving RegisterReceiver, we may keep restarting the receiver if the scheduled executors stay busy, for example because they are running long-running jobs.
My current implementation only rejects a receiver for a location mismatch when restarting it.
Good point. How about this?
The scheduler returns the following.
- if there are >= 3 zero-weight options, return all of them
- else return the 3 best options.
Before submitting the job, run the scheduler and use those options to launch the job. When the RegisterReceiver is called, you again run the scheduler to get updated options and verify whether the host where the task was actually launched is still in the best options. That should reduce the likelihood of the above condition happening by a lot.
How does that sound?
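For illustration only, the policy proposed above could be sketched roughly as follows. This is not the code in this PR: the names candidateHosts and shouldAccept are hypothetical, and the sketch assumes a host's "weight" is simply the number of receivers already placed on it.

```scala
object ReceiverScheduleSketch {
  // Hypothetical helper. `weights` maps host -> assumed weight
  // (here: number of receivers already placed on that host).
  def candidateHosts(weights: Map[String, Int], minCandidates: Int = 3): Seq[String] = {
    val idle = weights.collect { case (host, w) if w == 0 => host }.toSeq.sorted
    if (idle.size >= minCandidates) {
      // Enough zero-weight options: return all of them.
      idle
    } else {
      // Otherwise return the lowest-weight hosts, ties broken by host name.
      weights.toSeq.sortBy { case (host, w) => (w, host) }.take(minCandidates).map(_._1)
    }
  }

  // Hypothetical check run when RegisterReceiver arrives: re-run the scheduler
  // with up-to-date weights and accept only if the host is still a candidate.
  def shouldAccept(host: String, currentWeights: Map[String, Int], stopping: Boolean): Boolean =
    !stopping && candidateHosts(currentWeights).contains(host)
}
```

With weights Map("host1" -> 0, "host2" -> 2, "host3" -> 1, "host4" -> 0), only two hosts are idle, so the sketch falls back to the three lowest-weight hosts (host1, host4, host3) and a registration coming from host2 would be rejected.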
Sounds good. I will update it.
|
2 major high-level comments that need significant code changes before we proceed further.
|
|
Test build #38144 has finished for PR 7276 at commit
|
Is there any drawback to implementing it the current way? Is it that with 100 receivers there will be 100 threads stuck?
"Is it that with 100 receivers, there will be 100 threads stuck."
Right. That's why we need #7385.
It might be more consistent with scheduleReceivers to pass the whole Receiver object, rather than receiverId and preferredLocations.
We use receiverId and preferredLocations because we don't currently store the Receiver object. The Receiver object is only available when launching it.
|
Test build #38202 has finished for PR 7276 at commit
|
Would it be possible to express these tests in the nicer way that the ReceiverTracker tests are expressed?
testScheduler(numReceivers = 5, preferredLocation = false, allocation = "0|1|2|3|0")
Sorry, I forgot to reply to this one. I don't want to assert an exact allocation because the scheduling policy should be free to assign receivers to any executor as long as the final result is even.
E.g., if we have 2 executors and 2 receivers, the scheduling result could be (receiver 0 -> executor 0, receiver 1 -> executor 1), or (receiver 0 -> executor 1, receiver 1 -> executor 0).
It looks hard to express that as something like allocation = "0|1|2|3|0".
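As a rough illustration of asserting evenness instead of an exact allocation, a check along these lines would accept either of the two results above. The object and method names here are hypothetical and are not taken from this PR's test suite; "even" is assumed to mean that per-executor receiver counts differ by at most one.

```scala
object EvenScheduleCheck {
  // Hypothetical helper: `allocation` maps receiverId -> executor.
  // Returns true when the per-executor receiver counts differ by at most one.
  def isEven(allocation: Map[Int, String], executors: Seq[String]): Boolean = {
    require(executors.nonEmpty, "need at least one executor")
    val counts = executors.map(e => allocation.values.count(_ == e))
    counts.max - counts.min <= 1
  }
}
```

Both Map(0 -> "executor 0", 1 -> "executor 1") and Map(0 -> "executor 1", 1 -> "executor 0") pass this check, while placing both receivers on "executor 0" does not.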
Got it, makes sense.
|
I think this PR is close to LGTM. I will wait for #7385 to get merged, then this PR can be updated accordingly. |
|
@zsxwing I chatted with @pwendell for his opinion on this, and it's fine to merge this as is, without #7385. Once that is merged, we can always improve the performance later. Not that it's going to really hurt performance in the current state: it will just be a bunch of sleeping, blocked threads, with no constant thread switching. What do you think? |
|
@tdas Agree. We can do it later. |
|
Alright, then I am merging this! |
The design doc: https://docs.google.com/document/d/1ZsoRvHjpISPrDmSjsGzuSu8UjwgbtmoCTzmhgTurHJw/edit?usp=sharing