[SPARK-12662][SQL] Fix DataFrame.randomSplit to avoid creating overlapping splits #10626
Conversation
Do we need to start a whole new process to test this? I think we can just run randomSplit in the normal DataFrameSuite?
We have not figured out a case that can trigger the problem in local mode.
sc.parallelize(1 to 10).mapPartitions(scala.util.Random.shuffle(_)).collect()
oh, right. We missed it. It is a good one.
That's neat! Converted it into a unit test in DataFrameStatSuite.
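A minimal sketch of the test idea (the sizes, column name, and assertions here are illustrative assumptions, not the exact test that landed in DataFrameStatSuite):

import sqlContext.implicits._ // assumes a SQLContext named sqlContext, as in the 1.6-era suites
// Shuffle each partition so the row order differs between evaluations,
// then check that randomSplit still partitions the data: the splits must
// be pairwise disjoint and together cover every input row.
val n = 600
val data = sqlContext.sparkContext.parallelize(1 to n, 2)
  .mapPartitions(scala.util.Random.shuffle(_))
  .toDF("id")
val splits = data.randomSplit(Array[Double](1, 2, 3), seed = 1)
val ids = splits.map(_.collect().map(_.getInt(0)).toSet)
assert(ids(0).intersect(ids(1)).isEmpty)
assert(ids(0).intersect(ids(2)).isEmpty)
assert(ids(1).intersect(ids(2)).isEmpty)
assert(ids.reduce(_ union _) == (1 to n).toSet)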
Test build #48893 has finished for PR 10626 at commit
since the tests are run so frequently, I don't think you need to try this many times ... doing it once should be enough.
This is just the size of the dataset. We do, however, test for 5 different seeds. Should I just test for 1?
yea 1 is fine.
also you can just run it twice and make sure the result is deterministic, i.e.
val a = df.randomSplit(...).toSeq.map(_.collect())
val b = df.randomSplit(...).toSeq.map(_.collect())
assert(a == b)
as long as these are scala collections, I think they will work.
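One caveat worth flagging on the snippet above: collect() returns Arrays, and Scala Arrays compare by reference, so the comparison only works after converting to Seqs (which is what the final version of the test does with .collect().toSeq). A quick illustration of that Scala behavior:

val x = Array(1, 2, 3)
val y = Array(1, 2, 3)
assert(x != y) // Arrays use reference equality, so equal contents still compare unequal
assert(x.toSeq == y.toSeq) // Seqs compare element-wise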
done
sure, but to be fair, this new test does test a new codepath (that of inserting a sampling operator after a shuffle)
isn't that the same code path?
once we implement sample pushdown in catalyst, it shouldn't be: http://research.microsoft.com/pubs/76565/sig99sam.pdf :)
what do you mean? the shuffle happens outside of catalyst, so the optimizer can't push it beneath it.
to be clear, i'm suggesting removing everything the previous test case already tests, and only keeping:
// Verify that the results are deterministic across multiple runs
val data = sparkContext.parallelize(1 to n, 2).mapPartitions(scala.util.Random.shuffle(_)).toDF("id")
val splits = data.randomSplit(Array[Double](1, 2, 3), seed = 1)
val firstRun = splits.toSeq.map(_.collect().toSeq)
val secondRun = data.randomSplit(Array[Double](1, 2, 3), seed = 1).toSeq.map(_.collect().toSeq)
assert(firstRun == secondRun)
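Note that determinism is only asserted for a fixed seed (seed = 1): the same seed and weights must yield identical splits across runs, which is the invariant the fix establishes.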
Force-pushed from d866339 to 252dbc3
Test build #48903 has finished for PR 10626 at commit
Force-pushed from 8e28f15 to ded1bfa
Force-pushed from ded1bfa to 1b30119
Test build #48908 has finished for PR 10626 at commit
Test build #48924 has finished for PR 10626 at commit
Test build #48901 has finished for PR 10626 at commit
Test build #48919 has finished for PR 10626 at commit
All comments addressed!
Thanks - I'm going to merge this.
[SPARK-12662][SQL] Fix DataFrame.randomSplit to avoid creating overlapping splits

https://issues.apache.org/jira/browse/SPARK-12662

cc yhuai

Author: Sameer Agarwal <[email protected]>

Closes #10626 from sameeragarwal/randomsplit.

(cherry picked from commit f194d99)
Signed-off-by: Reynold Xin <[email protected]>
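For context, the gist of the merged change (a sketch reconstructed from the commit, using 1.6-era internals such as Sort, SortOrder, and Sample; the exact merged code may differ in details): impose a deterministic per-partition ordering before carving the unit interval into disjoint sampling ranges.

// Sort within each partition (global = false) on all output columns so that
// repeated evaluations of the plan see rows in the same order.
val sorted = Sort(logicalPlan.output.map(SortOrder(_, Ascending)), global = false, logicalPlan)
// Turn the weights into cumulative bounds over [0, 1), then build one
// Sample operator per adjacent pair of bounds; the ranges are disjoint,
// so the resulting splits cannot overlap.
val sum = weights.sum
val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
normalizedCumWeights.sliding(2).map { bounds =>
  new DataFrame(sqlContext, Sample(bounds(0), bounds(1), withReplacement = false, seed, sorted))
}.toArray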
@sameeragarwal Could you take a look at the following test failure? It sounds like this is caused by this fix. If you are busy, I can work on it. Thanks!
I might not have picked up your latest code changes. Let me merge the code. Thanks!
@gatorsmile it seems like your PR is changing the behavior of SQL intersect that this test relies on. I can take a closer look at the PR, but if you think this test is using intersect in a way that is not supported in Spark SQL, we can change DataFrameStatSuite:L76 to …
It is best for us to use the local collection's intersect rather than relying on the DataFrame's.
@gatorsmile I pulled your changes and verified that the new intersect implementation no longer works when there are deterministic sampling operators in the plan, e.g.:

val plan = sparkContext.parallelize(1 to 600, 1).toDF("id").logicalPlan
val sample1Plan = new DataFrame(sqlContext, Sample(0.0, 0.1, false, 1, plan))
val sample2Plan = new DataFrame(sqlContext, Sample(0.1, 1, false, 1, plan))
assert(sample1Plan.intersect(sample2Plan).collect().isEmpty) // FAILS
assert(sample1Plan.collect().intersect(sample2Plan.collect()).isEmpty) // SUCCEEDS

If that's intentional, please let me know if you'd like me to fix the test, or whether you'd rather fold the fix into your PR. Thanks! Edit: fixed example.
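Note that the two Sample operators above draw from disjoint fractions, [0.0, 0.1) and [0.1, 1.0), of the same seeded stream over the same child plan, so their outputs should never intersect; that is exactly the invariant the test (and this PR) relies on.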
Thank you @sameeragarwal for investigating this! Sorry to bring this to you at midnight. To help anyone understand the problem, let me post the logical plan when we do not collect the data to the local node. The plan does not look right: expression IDs should be different. Let me see how to fix this issue. Thanks!
The fix is done. Thank you! It is not related to your PR. Sorry :( After the fix, the plan looks as expected.
@gatorsmile where is the fix? |