
Conversation

@mccheah (Contributor) commented Aug 25, 2015

This takes the Python implementation of group-by-key and brings Scala up to parity, making Scala's group-by-key robust to a single group that is too large to fit in memory. It does so by using an ExternalList data structure to combine the groups, where ExternalList can spill to disk if it grows too large.
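
Roughly, the data structure works like the following sketch. This is an illustrative, simplified stand-in, not the actual ExternalList: the class name and fixed element-count threshold are made up here, plain Java serialization to a temp file stands in for Spark's serializer and block manager, and memory accounting, concurrency, and stream cleanup are omitted.

import java.io._
import scala.collection.mutable.ArrayBuffer

// Appends go to an in-memory buffer; once the buffer crosses a threshold its contents
// are written out to a temp file and the buffer is cleared. Iteration replays the
// spilled records from disk first, then the in-memory tail.
class SpillableList[T <: java.io.Serializable](spillThreshold: Int = 10000) {
  private val inMemory = new ArrayBuffer[T]
  private var spillFile: Option[File] = None
  private var spillOut: Option[ObjectOutputStream] = None
  private var spilledCount = 0

  def append(elem: T): Unit = {
    inMemory += elem
    if (inMemory.size >= spillThreshold) spill()
  }

  private def spill(): Unit = {
    val out = spillOut.getOrElse {
      val f = File.createTempFile("spillable-list-", ".bin")
      f.deleteOnExit()
      spillFile = Some(f)
      val o = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(f)))
      spillOut = Some(o)
      o
    }
    inMemory.foreach(e => out.writeObject(e))
    out.reset() // drop the stream's back-references so they do not pile up in memory
    spilledCount += inMemory.size
    inMemory.clear()
  }

  def iterator: Iterator[T] = {
    spillOut.foreach(_.flush())
    val fromDisk: Iterator[T] = spillFile match {
      case Some(f) =>
        val in = new ObjectInputStream(new BufferedInputStream(new FileInputStream(f)))
        Iterator.fill(spilledCount)(in.readObject().asInstanceOf[T])
      case None => Iterator.empty
    }
    fromDisk ++ inMemory.iterator
  }
}

The real class also has to survive being serialized and to clean up its files, which is where the caveats listed further down come from.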

First, the performance testing in cases where a single group would be too big to fit in memory:

I tried a few versions of this. In my first implementation, I wrote the ExternalList class, and simply replaced the CompactBuffer usage in PairRDDFunctions.groupByKey with external lists. I then ran the ExternalListSuite that did the group by key operation with some parameters modified. With 7200000 unique integers, bucketed into 5 buckets, the trials yielded:

89.333 s
89.992 s
104.026 s

I then switched to the current implementation, which matches exactly what Python does with an ExternalSorter; in essence, this is a sort-based group by. It wasn't clear to me why a sort-based group by would be better... until I saw the numbers for the same RDD and buckets:

49632 ms
53615 ms
54340 ms

Therefore I went with the current implementation. It's still not immediately clear to me, however, why the Python implementation (and this implementation, for that matter) gets this specific speedup from using ExternalSorter. It would be great to get some feedback on why that is the case.
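
To make the mechanics of the sort-based approach concrete: once records are sorted by key, every group is a contiguous run, so the grouping itself is a single streaming pass over the sorted output. The sketch below illustrates that pass with a plain, pre-sorted Scala iterator standing in for Spark's (private) ExternalSorter output; it eagerly buffers each group's values for simplicity, whereas the real change backs each group with the spillable structure instead.

import scala.collection.mutable.ArrayBuffer

// Groups a key-sorted iterator in one streaming pass: each call to next() consumes the
// contiguous run of records that share the current key.
def groupSorted[K, V](sorted: Iterator[(K, V)]): Iterator[(K, Seq[V])] =
  new Iterator[(K, Seq[V])] {
    private val buffered = sorted.buffered
    def hasNext: Boolean = buffered.hasNext
    def next(): (K, Seq[V]) = {
      val key = buffered.head._1
      val values = ArrayBuffer.empty[V]
      while (buffered.hasNext && buffered.head._1 == key) {
        values += buffered.next()._2
      }
      (key, values.toList)
    }
  }

// e.g. groupSorted(Seq(("a", 1), ("a", 2), ("b", 3)).iterator)
//      yields ("a", List(1, 2)) and ("b", List(3)).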

Some caveats / things to note that I am concerned about:

  1. Serialization is a bit hacky in ExternalList: it took some work to make ExternalList both serializable and still able to spill. For example, I use a companion object to hold references to "constants" that I don't want wiped out by serialization. Because of the way Java serialization works, the constructor code in a Scala class is not run again on deserialization, which leaves such fields null. For vals that should only be initialized when the object is constructed, I either have to make them vars and re-initialize them during deserialization, or mark them lazy so they are re-initialized on first use after deserialization. (A short sketch of this pattern follows this list.)
  2. ExternalList's spilling behavior is tied to shuffle parameters, which is a bit unintuitive, but this is what Python does as well. The alternative would be new Spark configurations, or group-by-key parameters with reasonable defaults; either way is pretty ugly. This should theoretically not cause a significant performance regression in existing groupByKey workflows, since those workflows would have spilled the big groups to disk after writing the whole group to the ExternalSorter anyway, right? I haven't been able to benchmark the cases where regular groupByKey wouldn't normally spill but would spill now, however.
  3. I'm open to opinions on the cleanup logic. I register external lists with weak-reference cleaners so that when the lists are GCed, the underlying files are cleaned up shortly afterward. If there's a better way to do this, I'm all ears.
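
A stripped-down illustration of the serialization pattern from item 1 above; the class and field names are made up, and the real ExternalList holds serializer and file state rather than a temp directory.

import java.io.File
import java.nio.file.Files

// After Java deserialization, the constructor body of a Scala class is not re-run, so a
// @transient field comes back as null instead of being rebuilt. Marking it "@transient
// lazy val" defers (re)initialization to first access, which also runs on the
// deserialized copy; truly constant values can live in the companion object, which is
// never serialized with the instance.
class SpillableThing(val id: String) extends Serializable {
  // Not restored from the stream; rebuilt lazily on first access after deserialization.
  @transient private lazy val scratchDir: File =
    Files.createTempDirectory("spillable-" + id).toFile

  def scratchPath: String = scratchDir.getAbsolutePath
}

object SpillableThing {
  // Kept in the companion object, so it is never part of the serialized instance.
  val WriteBatchSize: Int = 10000
}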

@mccheah (Contributor, author) commented Aug 25, 2015

(Not as high priority as Spark 1.5.0 things.)

@SparkQA commented Aug 25, 2015

Test build #41571 has finished for PR 8438 at commit 8f5d5e3.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • protected abstract class DiskIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])

@mccheah (Contributor, author) commented Sep 17, 2015

@JoshRosen, can someone review? I'll fix the merge conflicts, though.

@SparkQA commented Sep 17, 2015

Test build #42612 has finished for PR 8438 at commit 2252019.

  • This patch fails MiMa tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
    • protected abstract class DiskIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])

@mccheah (Contributor, author) commented Sep 17, 2015

I'm also not sure what MiMa tests are. Can someone give me a bit of context so I can fix the build?

@SparkQA commented Sep 17, 2015

Test build #42618 has finished for PR 8438 at commit a6d5896.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • protected abstract class DiskIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])

@squito (Contributor) commented Sep 18, 2015

Hi @mccheah, on the MiMa tests -- those check for binary compatibility across Spark versions. If you click through to the link for the test build, you'll see this in the test output:

[info] spark-core: found 12 potential binary incompatibilities (filtered 560)
[error]  * object org.apache.spark.CleanCheckpoint does not have a correspondent in new version
[error]    filter with: ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanCheckpoint$")
[error]  * class org.apache.spark.CleanupTaskWeakReference does not have a correspondent in new version
[error]    filter with: ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanupTaskWeakReference")
[error]  * class org.apache.spark.CleanBroadcast does not have a correspondent in new version
[error]    filter with: ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanBroadcast")
[error]  * object org.apache.spark.CleanShuffle does not have a correspondent in new version
[error]    filter with: ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanShuffle$")
[error]  * class org.apache.spark.CleanRDD does not have a correspondent in new version
[error]    filter with: ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanRDD")
[error]  * class org.apache.spark.CleanCheckpoint does not have a correspondent in new version
[error]    filter with: ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanCheckpoint")
[error]  * object org.apache.spark.CleanBroadcast does not have a correspondent in new version
[error]    filter with: ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanBroadcast$")
[error]  * object org.apache.spark.CleanAccum does not have a correspondent in new version
[error]    filter with: ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanAccum$")
[error]  * object org.apache.spark.CleanRDD does not have a correspondent in new version
[error]    filter with: ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanRDD$")
[error]  * class org.apache.spark.CleanAccum does not have a correspondent in new version
[error]    filter with: ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanAccum")
[error]  * class org.apache.spark.CleanShuffle does not have a correspondent in new version
[error]    filter with: ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanShuffle")
[error]  * interface org.apache.spark.CleanupTask does not have a correspondent in new version
[error]    filter with: ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanupTask")

You can see it's complaining about classes like org.apache.spark.CleanAccum that you moved around. MiMa often has false positives, so the tool lets you override these checks; it tells you how to do that with each "filter with" statement. Just stick those here: https://github.com/apache/spark/blob/master/project/MimaExcludes.scala#L73

I think this is a case of false positives, since those classes are private (I would want to take a closer look before being certain about that). In any case, you can add those filters for now to at least let the tests get further.
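
Concretely, acting on the "filter with" lines means adding one entry per reported problem to the exclusion list in project/MimaExcludes.scala, roughly as below; the surrounding Seq mirrors that file's structure but is paraphrased rather than copied.

import com.typesafe.tools.mima.core._

// Appended to the Seq of excludes for the version being checked in project/MimaExcludes.scala.
Seq(
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanupTask"),
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanupTaskWeakReference"),
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanAccum"),
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanAccum$"),
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanRDD"),
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanRDD$"),
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanShuffle"),
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanShuffle$"),
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanBroadcast"),
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanBroadcast$"),
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanCheckpoint"),
  ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.CleanCheckpoint$")
)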

@JoshRosen (Contributor)

Hey @mccheah,

This is a really cool patch. Although we're trying to encourage users to migrate workloads to the DataFrames API, there are still many workloads for which this is a useful improvement. This patch has some high inherent complexity and risk, though, since the details of managing file cleanup are non-trivial and it needs to touch a number of stable code paths which haven't been modified in a long time.

I'd like to review this patch but I'm a bit too busy with other 1.6 development and design tasks to devote adequate review time right now. I may have some time to review this in a couple of weeks, though. If you'd like to try to make this easier to review, I have a couple of suggestions (perhaps off-base, since I haven't considered them in detail yet):

  1. Split the ContextCleaner changes into their own JIRA + PR.
  2. Guard the external groupBy behind a feature flag so that users have an escape hatch in case bugs are uncovered in the new external groupBy implementation (a rough sketch of this is below).
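
A minimal sketch of what suggestion 2 could look like; the configuration key, default, and helper name are hypothetical and not taken from this patch.

import org.apache.spark.SparkConf

// Hypothetical flag: disabled by default so existing groupByKey behavior is unchanged,
// and the new spill-capable path is only taken when a user opts in.
def useExternalGroupBy(conf: SparkConf): Boolean =
  conf.getBoolean("spark.shuffle.externalGroupBy.enabled", false) // hypothetical key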

@mccheah (Contributor, author) commented Sep 21, 2015

The feature flag definitely makes sense - I'll include that.

I think the feature is a little risky to use without cleanup at all, so I'd feel more comfortable merging them both at once.

@JoshRosen (Contributor)

@mccheah, to clarify: I was suggesting merging the changes to ContextCleaner ahead of this change; I agree that the cleanup changes are a blocker to merging this.

@mccheah force-pushed the feature/external-group-by-wip branch from a6d5896 to d01cd4d on October 5, 2015 16:04
@mccheah (Contributor, author) commented Oct 5, 2015

@JoshRosen sorry I lagged a bit on this.

I refactored the commit structure to clean things up, but I think the ContextCleaner interface refactor should be its own JIRA and PR, as you suggested. The catch is that dependent pull requests on the same branch aren't really a thing on GitHub (unlike something like Gerrit, for instance).

@mccheah (Contributor, author) commented Oct 5, 2015

Ok, we need to merge #8981 before moving forward with this one.

@mccheah (Contributor, author) commented Oct 5, 2015

Oh also spilling is now behind a feature flag.

@SparkQA commented Oct 5, 2015

Test build #43239 has finished for PR 8438 at commit d01cd4d.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • protected abstract class DiskIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])

@mccheah force-pushed the feature/external-group-by-wip branch from d01cd4d to d87b554 on October 7, 2015 18:40
@SparkQA commented Oct 7, 2015

Test build #43333 has finished for PR 8438 at commit d87b554.

  • This patch fails MiMa tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • protected abstract class DiskIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long])

…tCleaner extends

Preparing the way for SPARK-10250, which will introduce other objects that will be cleaned up by detecting when they are garbage collected.
@mccheah force-pushed the feature/external-group-by-wip branch from d87b554 to 860811d on October 8, 2015 15:53
@SparkQA commented Oct 8, 2015

Test build #43400 timed out for PR 8438 at commit 860811d after a configured wait of 250m.

@mccheah (Contributor, author) commented Oct 8, 2015

Jenkins, retest this please

@mccheah (Contributor, author) commented Oct 8, 2015

I get the feeling that the spilling tests I wrote are really really slow, so I may need to adjust them.

@SparkQA commented Oct 9, 2015

Test build #43422 timed out for PR 8438 at commit 860811d after a configured wait of 250m.

@andrewor14 (Contributor)

@mccheah, thanks for submitting the patch. Unfortunately, we haven't had the bandwidth to review a feature as large as this one, and it affects only rdd.groupByKey, which we encourage people to avoid anyway. With the new DataFrame and Dataset APIs I think there is less reason to merge this into Spark's code base.

Given that this patch has been idle for more than two months, I would recommend that we close this issue.

@mccheah closed this Dec 15, 2015
@mccheah (Contributor, author) commented Dec 15, 2015

Sorry, yeah, I haven't had the bandwidth to look at this further. I agree that groupByKey should be avoided anyway, so this is probably not worth pursuing.

@robert3005 deleted the feature/external-group-by-wip branch on September 24, 2016 04:09