SPARK[1784]: Adding a balancedPartitioner #876
Conversation
This change adds a new partitioner which allows users to specify the number of keys per partition.
https://issues.apache.org/jira/browse/SPARK-1686

Moved from the original JIRA (by @markhamstra): In deploy.master.Master, the completeRecovery method is the last thing to be called when a standalone Master is recovering from failure. It is responsible for resetting some state, relaunching drivers, and eventually resuming its scheduling duties. There are currently four places in Master.scala where completeRecovery is called. Three of them are from within the actor's receive method, and aren't problems. The last starts from within receive when the ElectedLeader message is received, but the actual completeRecovery() call is made from the Akka scheduler. That means that it will execute on a different scheduler thread, and Master itself will end up running (i.e., schedule()) from that Akka scheduler thread.

In this PR, I added a new master message TriggerSchedule to trigger the "local" call of schedule() in the scheduler thread.

Author: CodingCat <[email protected]>

Closes #639 from CodingCat/SPARK-1686 and squashes the following commits:

81bb4ca [CodingCat] rename variable
69e0a2a [CodingCat] style fix
36a2ac0 [CodingCat] address Aaron's comments
ec9b7bb [CodingCat] address the comments
02b37ca [CodingCat] keep schedule() calling in the main thread
This partitioner uses a round-robin allocation strategy for keys so that we end up with balanced partitions for an RDD.
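For context only, here is a minimal sketch of what a round-robin partitioner along these lines might look like (the class name and the counter-based implementation are illustrative, not necessarily the code in this patch):

```scala
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.Partitioner

// Illustrative only: assigns records to partitions in round-robin order,
// ignoring the key itself, so every partition receives roughly the same
// number of records. Note that the internal counter makes getPartition
// non-deterministic per key -- the contract issue raised in the review below.
class BalancedPartitioner(override val numPartitions: Int) extends Partitioner {
  require(numPartitions > 0, "numPartitions must be positive")

  private val counter = new AtomicLong(0L)

  override def getPartition(key: Any): Int =
    (counter.getAndIncrement() % numPartitions).toInt

  override def equals(other: Any): Boolean = other match {
    case bp: BalancedPartitioner => bp.numPartitions == numPartitions
    case _                       => false
  }

  override def hashCode: Int = numPartitions
}
```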
This reverts commit 6668015.
|
Can one of the admins verify this patch? |
|
The current contract of a Partitioner requires it to return the same partition for a given key every time, so this doesn't fit that contract. It turns out this sort of balanced partitioning is useful, however, and we have encoded it explicitly within RDD#coalesce(). The semantics there match Spark's assumptions about partitioners -- i.e., the resultant RDD has no Partitioner, so no assumption can be made about the colocation of keys in order to do efficient lookups/groupBys/reduceByKeys. Would this sort of manual repartitioning suit your use case? Otherwise it would require a rather significant overhaul of Spark's Partitioner semantics. |
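For reference, the shuffle-based coalesce path mentioned above can be used directly today; a minimal usage sketch (`skewedRdd` and the partition count are placeholders):

```scala
// Redistribute records roughly evenly across 100 partitions via a shuffle.
// The resulting RDD carries no Partitioner, so nothing downstream may assume
// anything about which partition a given key lives in.
val balanced = skewedRdd.coalesce(100, shuffle = true)

// repartition(n) is shorthand for the same shuffle-based coalesce.
val alsoBalanced = skewedRdd.repartition(100)
```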
|
You are right, there are routines which make this assumption, but this is becoming a pain point for users: they end up with lopsided partitions and, especially if their dataset is huge, the larger partitions become a bottleneck and extend the tail of processing time. This partitioner explicitly targets such scenarios. If we agree on the general idea of the partitioner itself, I can add checks to the functions that assume hash or range partitioning behavior so that the balanced partitioner is treated as the general case. The user ends up with exactly balanced partitions and sacrifices a bit on lookup-type routines. |
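If that route were taken, the guard might look roughly like this (purely a sketch; the helper name is hypothetical and not part of Spark):

```scala
import org.apache.spark.{HashPartitioner, Partitioner, RangePartitioner}

// Hypothetical check: only partitioners with a deterministic key -> partition
// mapping can be used to narrow a lookup or join to a single partition; a
// round-robin "balanced" partitioner would fall back to the general case
// (full scan or shuffle).
def isLocationAware(p: Partitioner): Boolean = p match {
  case _: HashPartitioner        => true
  case _: RangePartitioner[_, _] => true
  case _                         => false
}
```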
|
Yes, I had a similar question. This would calculate different partitions for the same key when called from different places and at different times, and I imagine that causes several methods to fail. For example, what about joining two RDDs that both use this partitioner (and have multiple partitions) -- anything that creates a shuffle dependency between two pair RDDs. Surely the different instances of the depended-upon RDD's partitioner will return different partitions for keys and get the answer wrong (see the sketch after this comment)? I'm thinking of any time the partitioner instance is copied around -- it copies state, but then its state, which is essential to its answers, varies. Maybe someone more knowledgeable than I am can confirm an easy way to test this, or that I really misunderstand and this never happens.
I had thought the problem was more often in pair RDDs where one key has a lot of values, and operations that group by key create imbalanced partitions? That's not the question here, right? That wouldn't be helped by this. |
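A rough sketch of the failure mode described above, reusing the hypothetical `BalancedPartitioner` from the earlier sketch (`sc` is a SparkContext; this is illustrative, not a test from the patch):

```scala
val p = new BalancedPartitioner(4)

val left  = sc.parallelize(Seq("a" -> 1, "b" -> 2)).partitionBy(p)
val right = sc.parallelize(Seq("a" -> 10, "b" -> 20)).partitionBy(p)

// Because left.partitioner == right.partitioner, the join is planned as a
// narrow (no extra shuffle) dependency and each output partition only pairs
// co-numbered input partitions. With a stateful getPartition, key "a" may sit
// in partition 0 on one side and partition 3 on the other, so matching rows
// never meet and the join silently loses results.
val joined = left.join(right)
```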
|
@syedhashmi If you just want to shuffle stuff around randomly (i.e., you lose affinity of keys to specific partitions), then isn't it sufficient to just call the shuffle-based coalesce mentioned above? |
|
Here is the specific code: |
|
@pwendell: You are right ... your patch addresses this scenario. Does it make sense to expose this functionality through a partitioner, as that is the intuitive way for most folks, or do you think that would be a duplication of logic? |
|
This functionality doesn't fit the definition of a Partitioner as used in Spark (which requires it to consistently return the same partition for each key), so it would be confusing to expose it as such. In particular, Partitioners are also used to decide whether you can optimize joins and lookups based on a key's partition. This would break that behavior. |
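For reference, the contract being cited is small; a sketch of why HashPartitioner satisfies it while a round-robin scheme cannot:

```scala
import org.apache.spark.{HashPartitioner, Partitioner}

// org.apache.spark.Partitioner exposes just two methods:
//   def numPartitions: Int
//   def getPartition(key: Any): Int
// and Spark assumes getPartition is a pure function of the key.

// HashPartitioner meets that assumption, so repeated calls always agree:
val hp: Partitioner = new HashPartitioner(8)
assert(hp.getPartition("a") == hp.getPartition("a"))

// A round-robin partitioner cannot make this guarantee, which is why joins
// and lookups that rely on partitioner equality would break.
```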
This change adds a balanced partitioner to the existing partitioners. The new partitioner uses a round-robin strategy to allocate keys to partitions so that we end up with balanced partitions for an RDD.