
Conversation

@squito
Contributor

@squito squito commented Feb 7, 2015

https://issues.apache.org/jira/browse/SPARK-1061

If you partition an RDD, save it to HDFS, and then reload it in a separate SparkContext, you lose the information that the RDD was partitioned. That prevents you from getting the savings of a narrow dependency that would otherwise be possible. This is especially painful if you have a big dataset on HDFS and you periodically get small updates that need to be joined against it.

assumePartitionedBy lets you simply assign a partitioner to an RDD, so you can get your narrow dependencies back. It's up to the application to know what the partitioner should be, but Spark will at least verify that the assignment is valid.
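
A rough usage sketch (the method name comes from this PR; the paths, key type, and partitioner size are illustrative, and in practice the reload also needs a non-splittable input format so each file maps back to one partition):

```scala
import org.apache.spark.{HashPartitioner, SparkContext}
import org.apache.spark.SparkContext._

def saveJob(sc: SparkContext): Unit = {
  val big = sc.parallelize(0L until 1000000L).map(k => (k, s"value-$k"))
  // Partition once and persist; the partitioner is forgotten on reload.
  big.partitionBy(new HashPartitioner(128)).saveAsSequenceFile("hdfs:///data/big")
}

def updateJob(sc: SparkContext): Unit = {
  val partitioner = new HashPartitioner(128)
  // Reload and re-assert the same partitioner. The assignment is verified,
  // not inferred, so the application has to track which partitioner was used.
  val big = sc.sequenceFile[Long, String]("hdfs:///data/big")
    .assumePartitionedBy(partitioner)
  val updates = sc.parallelize(Seq((42L, "new-value"))).partitionBy(partitioner)
  // Both sides now share the partitioner, so the join is a narrow dependency
  // and the big reloaded dataset is not shuffled.
  big.join(updates).count()
}
```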

@SparkQA

SparkQA commented Feb 7, 2015

Test build #26991 has finished for PR 4449 at commit e041155.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@squito changed the title from assumePartitioned to [SPARK-1061] assumePartitioned on Feb 7, 2015
@SparkQA

SparkQA commented Feb 7, 2015

Test build #26992 has finished for PR 4449 at commit 943984f.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 7, 2015

Test build #27014 has finished for PR 4449 at commit 0e98abe.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 8, 2015

Test build #27016 has finished for PR 4449 at commit b828f01.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 8, 2015

Test build #27021 has finished for PR 4449 at commit ed154ce.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@JoshRosen
Contributor

Jenkins, retest this please.

@JoshRosen
Contributor

(This failure was due to me changing a Jenkins setting; I've reverted the change.)

@SparkQA

SparkQA commented Feb 8, 2015

Test build #27022 has finished for PR 4449 at commit ed154ce.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 8, 2015

Test build #27023 has finished for PR 4449 at commit ea016db.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Feb 8, 2015

Test build #27026 has finished for PR 4449 at commit f6c13a1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@pwendell
Contributor

pwendell commented Feb 8, 2015

This seems like a slightly awkward API to expose, since to use it you basically need to write a customized InputFormat. If someone is writing customized InputFormats, why can't they just write a custom RDD as well? Is the idea that someone would write an input format that returns only a single split for each file?

@squito
Contributor Author

squito commented Feb 9, 2015

@pwendell it's a good question; I was wondering the same thing as I was writing those unit tests, and I was going to comment on the JIRA about it. It is definitely annoying to have to write a custom input format -- but I only need to do that to turn off splits. Every once in a while this comes up on the user list too -- should we just add another version of sc.hadoopFile, sc.textFile, and sc.sequenceFile that turns off splits? Unfortunately I don't think it makes sense to pass an assumedPartitioner directly as an argument to those functions, since you really need to put a map step in the middle to extract the key.
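
(For reference, "turning off splits" just means overriding isSplitable in the input format. A minimal sketch against the old mapred API; the class name is illustrative:)

```scala
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.mapred.SequenceFileInputFormat

// Every file is read as a single split, so one HDFS file maps to one partition.
class NonSplittableSequenceFileInputFormat[K, V] extends SequenceFileInputFormat[K, V] {
  override protected def isSplitable(fs: FileSystem, filename: Path): Boolean = false
}
```

You would then pass this to sc.hadoopFile, add the map step to extract the key, and assume the partitioner on the result.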

Really this gets to a more general question: when do we add these "convenience" methods to RDD? Given that this requires application logic to track the partitioner to use, I doubt it will ever be used by other code within Spark itself. But I would still make the case for its inclusion, since (a) it leads to a big optimization that is not obvious to most users, and promoting it to a function within Spark itself makes users more likely to be aware of it; (b) it's a little tricky to get right -- I think the verify step is really important to make sure this doesn't lead to completely wrong results in the user app; and (c) I think it's a common use case. Not so common that it would make it into Spark tutorials, or even into the daily use of an experienced Spark user -- but I imagine it has a place in every "batch" use of Spark, where there is some big dataset that lives on HDFS between SparkContexts.

OTOH, we could just put this in some general location with spark-examples, and leave it out of spark itself. I guess we only need to make the change to HadoopRDD to sort the partitions.

@squito
Contributor Author

squito commented Mar 16, 2015

ping

If I haven't made a convincing argument that this is a useful addition to the core API, then I'll change the PR to only add the sorting of HadoopRDD's partitions, since that is the only change to what is already there, and I can move the rest to an external package.

@IgorBerman

@squito Imran, any progress on this issue? We have the same problem with narrowing dependencies (exactly the case you describe: a big dataset that lives on disk, with small additions joined against it in a different SparkContext each time).
Do you have some examples? Maybe a blog post ;)?

@squito
Contributor Author

squito commented Jun 2, 2015

@rapen sorry, no updates ... I think this is more or less ready, but it seems there isn't much interest in getting this into core, unfortunately. It would be nice to put this elsewhere as a standalone package for Spark. I don't have time to do that at the moment -- feel free to take a stab at it if you like.

You can also just use what's here in your project. The only problem is that you need to make a copy of HadoopRDD so you can apply the modifications here (in particular, to get a consistent ordering of the partitions).
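
(Roughly, that ordering change amounts to sorting the input splits before numbering the partitions. A hypothetical sketch, not the exact diff in this PR:)

```scala
import org.apache.hadoop.mapred.{FileSplit, InputSplit}

// Sort splits by file path (and offset) so partition indexes come out the same
// every time the dataset is reloaded.
def sortSplits(splits: Array[InputSplit]): Array[InputSplit] =
  splits.sortBy {
    case fs: FileSplit => (fs.getPath.toString, fs.getStart)
    case other         => (other.toString, 0L)
  }
```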

@IgorBerman

@squito thanks! Your work is very helpful. I'm now testing a solution based on your code: I subclassed NewHadoopRDD with the same changes you made in HadoopRDD, created a new method that builds this custom RDD (more or less copy-pasted from the new-Hadoop-API methods in SparkContext), and defined a non-splittable InputFormat (subclassed from the Avro formats). That way I don't need to change HadoopRDD and recompile Spark with it; it's just a sort of extension (maybe this is what could become part of a standalone library). I'm not a pro at Scala programming, so I'm not sure about showing this code to anyone :)
Anyway, all the shuffles disappeared! 👍
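
(A minimal sketch of that kind of non-splittable Avro input format against the new Hadoop API -- the class name is illustrative and it assumes avro-mapred is on the classpath:)

```scala
import org.apache.avro.mapreduce.AvroKeyInputFormat
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.JobContext

// Disable splitting so each Avro file becomes exactly one partition,
// preserving the layout the data was originally written with.
class NonSplittableAvroKeyInputFormat[T] extends AvroKeyInputFormat[T] {
  override protected def isSplitable(context: JobContext, filename: Path): Boolean = false
}
```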

@danielhaviv

Hi,
Is there a chance someone could share some code that could shed some light on how to use this feature?

Thank you.
Daniel

@IgorBerman

@danielhaviv , see tests in PR

@koertkuipers
Contributor

I would like to have something like this in core.


@JoshRosen
Contributor

What's the final verdict on this? Can we do the standalone package approach for now, then close this out?

@rxin
Contributor

rxin commented Dec 31, 2015

I'm going to close this pull request. If this is still relevant and you are interested in pushing it forward, please open a new pull request. Thanks!

@asfgit closed this in 7b4452b on Dec 31, 2015