[SPARK-2315] Implement drop, dropRight and dropWhile for RDDs #1254
Conversation
Can one of the admins verify this patch?
Thanks - I can see why this might be useful, but the bar for adding new APIs to the RDD interface is pretty high now, and we need to be very careful about APIs that can have very bad performance behavior (dropping a large number of elements can be very slow, particularly if it crosses many partitions). For this reason, it might make more sense for this to be an example program, or a blog post that is easily indexable so people can find it.
BTW, this is just my personal opinion. Feel free to debate or find support :)
My reasoning is that most use cases (or at least the ones I had in mind) look like rdd.drop(n), where n is much smaller than rdd.count(), typically 1 or some other small number. FWIW, I implemented it via an implicit object, so it's not directly on the RDD class per se. Another way to look at it: these functions are no worse than rdd.take(), since they use similar logic. However, it's true that if n is a large fraction of the size of the RDD, then it will trigger computation of a large fraction of the partitions.
The thing is, we must scan the data twice to make sure this actually works (because we need to verify that the number of partitions we checked is sufficient). Usually a user's specific use case can be solved with a very simple workaround despite the lack of RDD.drop (e.g. for CSV files with a header that you want to drop, you can just drop it from the first partition using a drop inside a mapPartitions).
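The header-dropping workaround can be sketched with the RDD's partitions modeled as a plain Seq[Seq[String]] (a stand-in, no Spark dependency; on a real RDD the same per-partition logic would go inside mapPartitionsWithIndex, and all names here are illustrative):

```scala
// Sketch of the workaround described above: drop header rows from the first
// partition only, leaving every other partition untouched.
object HeaderDropDemo {
  def dropHeader(partitions: Seq[Seq[String]], headerLines: Int): Seq[Seq[String]] =
    partitions.zipWithIndex.map {
      case (part, 0) => part.drop(headerLines) // only partition 0 holds the header
      case (part, _) => part                   // other partitions pass through
    }

  def main(args: Array[String]): Unit = {
    val parts = Seq(Seq("# header", "a", "b"), Seq("c", "d"))
    println(dropHeader(parts, 1).flatten) // List(a, b, c, d)
  }
}
```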
It will scan one partition twice: the one containing the "boundary" between things dropped and not-dropped. Any partitions prior to that boundary are ignored by the resulting RDD (so they are scanned once), and any partitions after the boundary are not examined unless/until the result RDD is evaluated.
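The boundary computation described above can be sketched as follows, given only the per-partition element counts (a minimal stand-in, not the PR's actual implementation; names are illustrative):

```scala
// Find the "boundary" partition for drop(n): the only partition that must be
// scanned twice. Partitions before it are dropped wholesale; partitions after
// it are untouched until the result is evaluated.
object DropBoundaryDemo {
  // Returns (index of boundary partition, elements to skip inside it).
  def boundary(partitionSizes: Seq[Int], n: Int): (Int, Int) = {
    var remaining = n
    var i = 0
    while (i < partitionSizes.length && remaining >= partitionSizes(i)) {
      remaining -= partitionSizes(i) // this whole partition is dropped
      i += 1
    }
    (i, remaining)
  }

  def main(args: Array[String]): Unit = {
    // Partitions of sizes 3, 4, 5; dropping 5 elements skips partition 0
    // entirely and makes partition 1 the boundary, with 2 elements to skip.
    println(boundary(Seq(3, 4, 5), 5)) // (1,2)
  }
}
```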
Tangentially, one thing I noticed is that all the "XxxRDDFunctions" implicits are currently defined automatically in SparkContext, so I held to that pattern in this PR. However, another option would be to not define it automatically, and have users import DropRDDFunctions themselves if they want to use the drop methods. In fact, that seems like a good pattern generally for reducing unneeded imports; one might say the same of OrderedRDDFunctions, etc.: import XxxRDDFunctions if you need it.
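The opt-in enrichment pattern being suggested can be sketched in plain Scala, with a Seq standing in for an RDD (illustrative names only, not Spark's actual API):

```scala
// The extension methods live in an object the user must import explicitly,
// rather than being brought into scope automatically.
object DropRDDFunctions {
  implicit class DropOps[T](val xs: Seq[T]) extends AnyVal {
    def dropWhileLazy(p: T => Boolean): Seq[T] = xs.dropWhile(p)
  }
}

object OptInDemo {
  def main(args: Array[String]): Unit = {
    // Without this import, dropWhileLazy would not even compile.
    import DropRDDFunctions._
    println(Seq(1, 2, 3, 1).dropWhileLazy(_ < 3)) // List(3, 1)
  }
}
```

The import acts as a per-file switch: code that never imports DropRDDFunctions pays no cost and sees no extra methods.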
Note that in a typical case where one is invoking something like rdd.drop(1), or some other small number, only one partition gets evaluated by drop: the first one.
I also envision typical use cases as being either pre- or post-processing, i.e. not something that would often appear inside a tight loop.
Adding the drop function to a contrib library of functions (which requires a manual import), as Erik suggests, seems like a really good option. I could see such a contrib library also being useful for other esoteric but nevertheless important tasks, like dealing with binary data formats, etc.
Take an RDD as input and return a new RDD with elements dropped. These methods are now implemented as lazy RDD transforms.
I updated this PR so that drop(), dropRight() and dropWhile() are now lazy transforms. A description of what I did is here:
Should Jenkins run an automatic build on PR update?
Jenkins, test this please.
O Jenkins, Where Art Thou?
Jenkins appears to be AWOL.
Let me give it a try: Jenkins, this is ok to test. Jenkins, retest this please.
Starting to worry that I confused it by pushing the PR branch using '+'.
Should I consider creating a fresh PR, or is there some better way to get Jenkins to test?
I'm not sure what's happening. Maybe Jenkins is lazy today. We can retry tomorrow, and if it doesn't work, create a new PR.
I'm going to try closing this PR and rebooting with a fresh one.
drop, dropRight and dropWhile methods for RDDs that return a new RDD as the result.
// example: load in some text and skip header lines
val txt = sc.textFile("data_with_header.txt")
val data = txt.drop(3)
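For reference, the semantics the three RDD methods are meant to mirror can be tried on a plain Scala Seq, since Scala collections already define drop, dropRight and dropWhile (no Spark needed):

```scala
// The collection methods that the proposed RDD versions mirror,
// demonstrated on an in-memory Seq.
object DropSemanticsDemo {
  def main(args: Array[String]): Unit = {
    val lines = Seq("header1", "header2", "a", "b", "c")
    println(lines.drop(2))                         // List(a, b, c)
    println(lines.dropRight(1))                    // List(header1, header2, a, b)
    println(lines.dropWhile(_.startsWith("head"))) // List(a, b, c)
  }
}
```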