[WIP][SPARK-14408][CORE] Changed RDD.treeAggregate to use fold instead of reduce #12217
Conversation
Test build #55151 has finished for PR 12217 at commit
LGTM pending tests

Jenkins retest this please
Test build #55204 has finished for PR 12217 at commit

Maybe there is something going on here... investigating
    }
    partiallyAggregated.reduce(cleanCombOp)
    //partiallyAggregated.reduce(cleanCombOp)
    // This fails:
Does anyone see why fold would fail, whereas reduce succeeds?
Test build #55281 has finished for PR 12217 at commit
Apparently it was because zeroValue was being used in multiple places without making a copy. Is this worth committing? I feel like a better solution is to add docs + a unit test to RDD.reduce saying that the combine operation can modify and return the first element. If people agree, I'll do that instead.
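The diagnosis above — one mutable zeroValue object reused in several places instead of being copied — can be illustrated with a small plain-Python sketch. No Spark is involved; `Acc` and `fold_partitions` are made-up names that only mimic fold-per-partition semantics:

```python
import copy

class Acc:
    """A mutable accumulator, like an aggregator whose seqOp mutates `this`."""
    def __init__(self):
        self.total = 0
    def add(self, x):          # seqOp: mutates and returns its first argument
        self.total += x
        return self

def fold_partitions(partitions, zero, shared=False):
    # Fold each partition. `shared=True` mimics handing the very same
    # zeroValue object to every partition instead of a copy per partition.
    results = []
    for part in partitions:
        acc = zero if shared else copy.deepcopy(zero)
        for x in part:
            acc = acc.add(x)
        results.append(acc)
    return [a.total for a in results]

parts = [[1, 2], [3, 4]]
print(fold_partitions(parts, Acc()))               # [3, 7]  -- per-partition copies
print(fold_partitions(parts, Acc(), shared=True))  # [10, 10] -- one shared object, corrupted
```

With a shared zero, every "partition" accumulates into the same object, so all partial results collapse into one corrupted value — the failure mode being described.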
Test build #55280 has finished for PR 12217 at commit

Test build #55296 has finished for PR 12217 at commit
@jkbradley I see what you mean, but, thinking about it, can this work? The reduce function is applied directly to RDD elements, so modifying one of the arguments and returning it means you're mutating the elements of the RDD in memory, which may have undefined consequences. For fold, it's fine because the left argument is always the zero-value object. Right? Or am I not thinking about this correctly? It might happen to be fine to use reduce in cases where the RDD values are not used again.
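The distinction above can be shown with plain Python's `functools.reduce`, which has the same left-fold shape (this is only an analogy, not Spark code — the list stands in for cached RDD elements):

```python
import copy
import functools

def comb_op(a, b):
    # A combOp that mutates and returns its first argument.
    a[0] += b[0]
    return a

# reduce: the first argument passed to comb_op IS a dataset element,
# so the mutation rewrites the "cached" data in place.
cached = [[1], [2], [3]]
functools.reduce(comb_op, cached)
print(cached[0])                   # [6] -- the first element was rewritten

# fold-style: seeding with a copy of the zero value means only that
# copy is ever mutated; the dataset elements stay untouched.
cached2 = [[1], [2], [3]]
zero = [0]
functools.reduce(comb_op, cached2, copy.deepcopy(zero))
print(cached2)                     # [[1], [2], [3]] -- elements untouched
```

This is exactly why mutating-and-returning the first argument is safe for fold's zero value but hazardous for reduce over elements that may be reused.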
I agree, but am not quite sure how these things work because of the serialization across tasks. I'll ping others who might know more than I do.

Test build #2771 has finished for PR 12217 at commit
    }
    partiallyAggregated.reduce(cleanCombOp)
    val copiedZeroValue = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
    partiallyAggregated.fold(copiedZeroValue)(cleanCombOp)
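The cloning idea in the diff above — serialize the zeroValue and deserialize it back to get an independent deep copy before folding — can be sketched in plain Python, with `pickle` standing in for Spark's closure serializer (an assumption for illustration only):

```python
import pickle

def clone(value):
    # Round-trip through the serializer to produce a deep, independent copy,
    # in the spirit of Utils.clone(zeroValue, closureSerializer.newInstance()).
    return pickle.loads(pickle.dumps(value))

zero = {"count": 0, "sum": 0.0}
copied = clone(zero)
copied["count"] += 5               # mutate only the copy
print(zero["count"], copied["count"])  # 0 5 -- the original zero is untouched
```

Any later mutation of the copied value leaves the original zero pristine, which is the whole point of cloning it before handing it to fold.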
It's this line which makes AFTSurvivalRegression fail. Not sure why...
@jkbradley Is it because the code uses zeroValue, possibly modifying it, before you copy it? What about copying it before line 1085?
I tried making one copy for each use of zeroValue at the beginning of the method, but that didn't fix the AFT test failures.
Hm, OK, if you've got a copy for each of the 3 usages, that really can't be it. Unless the clone isn't implemented as a deep clone for the object in question somehow. Could it be due to a different order of applying the combOp in this case? That's the only other thing I can think of if this change alone is the issue.
The ordering of the combOp really shouldn't matter for AFT. I feel like it must be some esoteric closure issue.
I'll leave this open a bit in hopes the RDD experts can take a look....

Test build #2784 has finished for PR 12217 at commit
I remember I took a look at this (last time, while going through stale PRs) and I had no idea either ... @jkbradley I was just wondering if we should leave this closed rather than open?
@NathanHowell, do you maybe have any idea on this (sorry, probably the wrong person to cc, but no one else comes to mind ...)?
Hi @jkbradley and @srowen, could we retest this just to see the error messages? It looks like the last test results are not accessible (to me).
Nothing looks obviously broken; their combiner looks fine. Rerunning the tests would help.
Test build #3773 has finished for PR 12217 at commit
Let me give a shot at providing a minimal reproduction after rebasing it with master.
Uh.... wait. It actually passes the tests after updating this with the current master ... and I even fixed this before in e355460. I double checked that it fails the tests before and passes the tests after this commit. I think it was a bug about …
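The root cause described later (the fold's zeroValue being an aggregator with `totalCnt` of 0 that keeps returning `this` when merging) can be reconstructed as a hypothetical sketch. This is a guess at the shape of the bug in plain Python, not the actual AFTAggregator code:

```python
class Agg:
    """Hypothetical aggregator; `count`/`total` only mimic totalCnt-style state."""
    def __init__(self, count=0, total=0):
        self.count, self.total = count, total

    def merge_buggy(self, other):
        if self.count == 0:      # wrong guard: an empty *left* side drops `other`
            return self          # the zeroValue (count == 0) keeps returning itself
        self.count += other.count
        self.total += other.total
        return self

    def merge_fixed(self, other):
        if other.count != 0:     # only skip merging an empty *right* side
            self.count += other.count
            self.total += other.total
        return self

parts = [Agg(2, 10), Agg(3, 20)]

buggy = Agg()                    # fold seeds with the empty zeroValue
for p in parts:
    buggy = buggy.merge_buggy(p)
print(buggy.count, buggy.total)  # 0 0 -- every partial result was discarded

fixed = Agg()
for p in parts:
    fixed = fixed.merge_fixed(p)
print(fixed.count, fixed.total)  # 5 30
```

Under reduce the left argument is always a real, non-empty partial result, so the bad guard never fires; fold exposes it because the empty zeroValue becomes the left argument on the first merge.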
    *
    * @param depth suggested depth of the tree (default: 2)
    * @see [[org.apache.spark.rdd.RDD#aggregate]]
    * @see [[org.apache.spark.rdd.RDD#aggregate]] These two methods have identical semantics.
Just to help ... I believe the actual Javadoc errors look like this:
[error] /home/jenkins/workspace/NewSparkPullRequestBuilder/core/target/java/org/apache/spark/rdd/RDD.java:660: error: unexpected content
[error] * @see {@link org.apache.spark.rdd.RDD#aggregate} These two methods have identical semantics.
[error] ^
…reduce

## What changes were proposed in this pull request?

Previously, `RDD.treeAggregate` used `reduceByKey` and `reduce` in its implementation, neither of which technically allows the `seq`/`combOps` to modify and return their first arguments.

This PR uses `foldByKey` and `fold` instead and notes that `aggregate` and `treeAggregate` are semantically identical in the Scala doc.

Note that this had some test failures for unknown reasons. This was actually fixed in e355460. The root cause was that the `zeroValue` now becomes an `AFTAggregator` and it compares `totalCnt` (where the value is actually 0). It starts merging one by one and keeps returning `this`, where `totalCnt` is 0. So this does not look like a bug in the current change; it is now fixed in the commit, so this should pass the tests.

## How was this patch tested?

Test case added in `RDDSuite`.

Closes #12217

Author: Joseph K. Bradley <[email protected]>
Author: hyukjinkwon <[email protected]>

Closes #18198 from HyukjinKwon/SPARK-14408.
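The overall shape of the change — fold each partition with a copy of zeroValue, then combine the partial results level by level, again seeding each combine pass with a zero copy — can be sketched in plain Python. This is only a structural analogy to treeAggregate, not Spark code; `tree_aggregate` and its parameters are illustrative names:

```python
import copy

def tree_aggregate(partitions, zero, seq_op, comb_op):
    # Stage 1: fold each partition with its own copy of the zero value.
    partials = []
    for part in partitions:
        acc = copy.deepcopy(zero)
        for x in part:
            acc = seq_op(acc, x)
        partials.append(acc)
    # Stage 2: shrink the partial results tree-style, pairwise, each pass
    # folding into a fresh zero copy rather than reducing over partials.
    while len(partials) > 1:
        merged = []
        for i in range(0, len(partials), 2):
            acc = copy.deepcopy(zero)
            for p in partials[i:i + 2]:
                acc = comb_op(acc, p)
            merged.append(acc)
        partials = merged
    return partials[0] if partials else copy.deepcopy(zero)

total = tree_aggregate([[1, 2], [3, 4], [5]], 0,
                       lambda a, x: a + x, lambda a, b: a + b)
print(total)  # 15
```

Because every accumulator starts as a copy of the zero value, a seqOp/combOp that mutates and returns its first argument never touches the input elements — the property the fold-based implementation provides and the reduce-based one did not.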
How was this patch tested?
Existing unit tests