[SPARK-1021] Defer the data-driven computation of partition bounds in so... #1689
Can one of the admins verify this patch?

Jenkins, this is ok to test.

QA tests have started for PR 1689. This patch merges cleanly.

QA results for PR 1689:
Can we perhaps make this thread safe?
Do we not want to deserialize valRB if it is not null? Are you worried rangeBounds might be called while the deserialization is happening?
I was also assuming readObject might be called from multiple threads. Can that happen?
that's not possible
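The thread-safety concern above can be illustrated in plain Scala, independent of the patch itself. This is only a sketch under the assumption that the deferred bounds are held behind a `lazy val`: the compiler synchronizes `lazy val` initialization, so even racing first readers trigger the (stand-in) sampling computation exactly once. `LazyBoundsDemo`, `evaluations`, and the bound values are illustrative names, not code from this PR.

```scala
object LazyBoundsDemo {
  var evaluations = 0 // counts how many times the initializer actually ran

  // Stand-in for range bounds that would normally be computed by a sampling job.
  // A lazy val's initializer runs at most once, under the enclosing object's monitor.
  lazy val rangeBounds: Array[Int] = {
    evaluations += 1
    Array(10, 20, 30)
  }

  def main(args: Array[String]): Unit = {
    // Eight threads race to force the lazy val; only one initialization occurs.
    val threads = (1 to 8).map(_ => new Thread(() => { val _ = rangeBounds }))
    threads.foreach(_.start())
    threads.foreach(_.join())
    println(s"initializer ran $evaluations time(s)")
  }
}
```

This does not by itself answer the `readObject` question, since deserialization constructs a fresh object whose lazy fields start uninitialized.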
Latest push updates the RangePartitioner sampling job to be async, and updates the async action functions so that they properly enclose the sampling job induced by calling 'partitions'.
Excellent! I'll try to find some time to review this soon.
Jenkins, test this please.
@erikerlandson thanks for looking at this. A few questions:

```scala
sc.parallelize(1 to 1000).map(x => (x, x)).sortByKey().join(sc.parallelize(1 to 10).map(x => (x, x)))
```
Hi @rxin,

My impression is that this whack-a-mole with non-laziness stems from a combination of (a) data-dependent partitioners, and (b) methods that refer to input partitioners as part of the construction of new RDDs. It might be possible to thread some design changes around so that references to partitioning are consistently encapsulated in a Future. Functions such as … However, it seems (imo) outside the scope of this particular Jira/PR. Maybe we could start another umbrella Jira to track possible solutions along these lines.

Another orthogonal thought: you can short-circuit all this by providing a partitioner instead of forcing it to be computed from data. That's not as sexy, or as widely applicable, as some deeper fix to the problem, but users can do it now as a workaround when it's feasible.
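As a hypothetical illustration of that workaround (this is not Spark's `RangePartitioner`; the class and names are invented for the sketch): when the caller supplies split points up front, partition placement becomes a pure function of the key, so no sampling job over the data is ever needed.

```scala
import java.util.Arrays

// Illustrative only: a range partitioner with caller-supplied bounds.
// Keys <= bounds(0) go to partition 0, keys in (bounds(i-1), bounds(i)]
// go to partition i, and keys above the last bound go to the last partition.
class FixedRangePartitioner(bounds: Array[Int]) {
  def numPartitions: Int = bounds.length + 1

  def getPartition(key: Int): Int = {
    val i = Arrays.binarySearch(bounds, key)
    // Exact hit lands on that bound's partition; otherwise binarySearch
    // returns -(insertionPoint) - 1, and the insertion point is the partition.
    if (i >= 0) i else -i - 1
  }
}
```

The trade-off is exactly the one noted above: the user must already know a reasonable set of bounds, which is only sometimes feasible.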
Or, maybe just look into playing the same game with the cogrouped RDDs that I did with sortByKey. Don't get into invoking
Yea, I don't think we need to fully solve 3 here. My main concern with this set of changes is 2, since a single badly behaved RDD can potentially block the (unfortunately single-threaded) scheduler forever. Let me think about this a little bit and get back to you. If you have an idea about how to fix that, feel free to suggest it.
So far the best idea I have for (2) is to set some kind of timeout on the evaluation. The bound computation uses subsampling that will (when all goes well) cap the computation at constant time(*). If the timeout triggers, some sub-optimal fallback partitioning might be used, or the entire evaluation could just fail.

(*) More accurately, a constant number of samples; the time required could depend on various things.
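The timeout idea above could be sketched like this, under the assumption that the bound computation can be run in a `Future`. All names (`boundsWithTimeout`, `fallback`) are illustrative, not from the PR.

```scala
import scala.concurrent.{Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object TimeoutDemo {
  // Run the sampling computation asynchronously; if it does not complete
  // within `limit`, fall back to caller-supplied (sub-optimal) bounds.
  def boundsWithTimeout(sample: => Array[Int],
                        fallback: Array[Int],
                        limit: FiniteDuration): Array[Int] = {
    val f = Future(sample)
    try Await.result(f, limit)
    catch { case _: TimeoutException => fallback }
  }
}
```

Note that the timed-out sampling job keeps running in the background here; a real implementation would also want to cancel it.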
Actually, I looked at it again. I don't think it would block the scheduler, because we compute partitions outside the scheduler thread. This approach looks good to me!

@erikerlandson I'm going to merge this first. Maybe we can do the cleanup later.
BTW, one thing that would be great to add is a test that makes sure we don't block the main DAG scheduler thread. The reason I think we don't block is that we call rdd.partitions.length in submitJob:

```scala
/**
 * Submit a job to the job scheduler and get a JobWaiter object back. The JobWaiter object
 * can be used to block until the job finishes executing or can be used to cancel the job.
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit,
    properties: Properties = null): JobWaiter[U] =
{
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
```
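A regression test along the lines suggested above might take roughly this shape. Everything here is a stand-in rather than Spark's real machinery: a single-threaded executor plays the DAG scheduler's event loop, and a fake `expensivePartitions()` plays the deferred sampling cost, which `submitJob` pays on the caller thread the way the real submitJob does via `rdd.partitions.length`.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

object SchedulerProbeDemo {
  // Single-threaded stand-in for the scheduler's event loop.
  val scheduler = Executors.newSingleThreadExecutor()

  // Stand-in for a deferred, data-driven partitions computation.
  def expensivePartitions(): Int = { Thread.sleep(200); 4 }

  def submitJob(): Unit = {
    // Materialize partitions on the caller thread, so the scheduler
    // thread never pays this cost.
    val maxPartitions = expensivePartitions()
    require(maxPartitions > 0)
    scheduler.submit(new Runnable { def run(): Unit = () })
  }

  // The actual assertion: a probe task submitted after the job must run
  // promptly, proving the scheduler thread was never blocked on sampling.
  def schedulerResponsive(withinMs: Long): Boolean = {
    val latch = new CountDownLatch(1)
    scheduler.submit(new Runnable { def run(): Unit = latch.countDown() })
    latch.await(withinMs, TimeUnit.MILLISECONDS)
  }
}
```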
Have either of you thought about how to coordinate this with Josh's work on SPARK-3626? #2482
Since this PR was merged, the correlationoptimizer14 test has been hanging. We might want to consider rolling back. You can reproduce the problem as follows:

I reverted this commit. @erikerlandson mind taking a look at this problem?
@marmbrus, FWIW, the … Not sure why, but running …