[SPARK-15353] [CORE] Making peer selection for block replication pluggable #13152
Conversation
def apply(execId: String,
host: String,
port: Int,
topologyInfo: Option[String] = None): BlockManagerId =
Maybe we need to correct the indentation as below:
def apply(execId: String,
    host: String,
    port: Int,
    topologyInfo: Option[String] = None): BlockManagerId =
Fixed style issues pointed out by @HyukjinKwon
A couple of high-level questions:
Also, the patch would be more minimal if only the …
The topology info is only queried when the executor starts and is assumed to stay the same throughout the life of the executor. Depending on the cluster manager being used, the exact way this information is provided may differ. Resolving it at the master makes the implementation simpler, as only the master needs access to the service/script/class used to resolve the topology. The communication overhead is minimal, since the executors already have to communicate with the master when they start.

The getRandomPeer() method was doing quite a bit more than just getting a random peer: it was also managing and mutating state that was being mutated in other places as well. I tried to keep the block placement strategy and the usage of its output separate, to make it simpler to provide a new block placement strategy. I also thought it would be best to decouple internal replication state management from the block replication strategy, while keeping the structure of the state the same.

The costlier operation here is the RPC fetch of all the peers from the master. The prioritization algorithm is only called once if there are no failures. If there are failures, the list of peers is requested from the master again before the prioritizer is run, so the bigger hit is again the RPC communication between the executor and the master. Random.shuffle in the default prioritizer uses a Fisher-Yates shuffle, so it is linear in time.
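To make the separation described above concrete, here is a minimal sketch of what a pluggable prioritizer could look like, with a random default matching the existing behavior. The trait and method names are assumptions for illustration, not necessarily the API in this patch.

```scala
import scala.util.Random
import org.apache.spark.storage.BlockManagerId

// Sketch only: a pluggable peer prioritizer, decoupled from replication state.
trait BlockReplicationPrioritizer {
  /** Returns the candidate peers in the order they should be tried for replication. */
  def prioritize(peers: Seq[BlockManagerId]): Seq[BlockManagerId]
}

// Default behavior: a uniformly random permutation, matching the current random peer choice.
// Random.shuffle is a Fisher-Yates shuffle, so this is linear in the number of peers.
class RandomPrioritizer extends BlockReplicationPrioritizer {
  override def prioritize(peers: Seq[BlockManagerId]): Seq[BlockManagerId] =
    Random.shuffle(peers)
}
```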
I see, that makes sense, though it is a little weird to ask the master for info that you then immediately use to register.
Still, I think it would be a smaller change to just move some of that logic out of getRandomPeer() and retain the rest. Then you would only need to implement getNextPeer(), and BlockManager wouldn't need to track the prioritized order internally.
The state being managed inside getRandomPeer() is also modified in a couple of other places, so removing some of it from getRandomPeer won't be a very clean change. Even if that is done, I agree that your approach would only mean calling getNextPeer. It would, however, mean adding more state to ensure the expected behavior when block replication fails on a peer. I am flexible about the implementation choices, so I can make the modifications if needed.

Just to clarify the motivation for this interface: I have another PR, SPARK-15354, that shows a couple of prioritizers I intend to add (including a simple one that replicates HDFS's block replication strategy). Note that in case of failures, the list of peers is requested from the master afresh and is optimized over again. With this interface, the ReplicationTargetSelector would have to be generated afresh, and an iteration of the optimization would run every time getNextPeer is called. Let me know what you think.
You wouldn't have to create a new selector after a failure. That case can be detected by checking, for example, whether the number of failed replications has increased. I think the main benefit here is that the interface would be more flexible as a developer-facing API.
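As a rough, entirely hypothetical sketch of the iterator-style alternative being discussed (the names ReplicationTargetSelector and getNextPeer follow the discussion above, but the shape of the code is an assumption):

```scala
import scala.util.Random
import org.apache.spark.storage.BlockManagerId

// Sketch only: hands out one peer at a time and refreshes/reshuffles the peer list lazily
// when the caller reports that another replication attempt has failed.
class ReplicationTargetSelector(fetchPeers: () => Seq[BlockManagerId]) {
  private var ordered: List[BlockManagerId] = Random.shuffle(fetchPeers()).toList
  private var lastSeenFailures = 0

  def getNextPeer(numFailures: Int): Option[BlockManagerId] = {
    if (numFailures > lastSeenFailures) {
      // A failure happened since the last call: ask the master for a fresh peer list.
      lastSeenFailures = numFailures
      ordered = Random.shuffle(fetchPeers()).toList
    }
    ordered match {
      case head :: tail =>
        ordered = tail
        Some(head)
      case Nil => None
    }
  }
}
```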
  blockManagerId = BlockManagerId(
-   executorId, blockTransferService.hostName, blockTransferService.port)
+   executorId, blockTransferService.hostName, blockTransferService.port, topologyInfo)
Would it work if topologyInfo was sent back from the master when registerBlockManager is called? It doesn't seem that anything uses blockManagerId until registration finishes. That way we wouldn't need this two-step registration.
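For illustration, a single-step registration along the lines of this suggestion might look roughly like the following; the class and method shape and the `resolveTopology` parameter are assumptions, not the actual master endpoint code.

```scala
import org.apache.spark.storage.BlockManagerId

// Sketch only: the executor registers with a plain id, and the master replies with an id
// enriched with topology info resolved on the master side, avoiding a two-step registration.
class MasterRegistrationSketch(resolveTopology: String => Option[String]) {
  def registerBlockManager(id: BlockManagerId): BlockManagerId = {
    val updatedId = BlockManagerId(
      id.executorId, id.host, id.port, resolveTopology(id.host))
    // ... record updatedId in the master's block manager info ...
    updatedId
  }
}
```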
I implemented this in the commits below.
Force-pushed from 3ee664e to 7b8685c.
nit: seems a little easier to read if it was !a && !b
Converted to !a && !b
?
Still seems the same
My bad, the latest commit fixes this. The while loop also had a similar condition, which I had fixed earlier.
…oosing the same peers for replication.
1. Adding rack attribute to hashCode and equals of BlockManagerId. 2. Removing boolean check for rack awareness. Asking master for rack info, and master uses topology mapper. 3. Adding a topology mapper trait and a default implementation that the block manager master endpoint uses to discern topology information.
…al topology instead of rack.
…o get a fully fleshed out id, with topology information, if available.
…while loop, as suggested by @ericl. 2. Adding SparkConf constructor arguments to TopologyMapper, so any required properties like class names or file/script names can be passed to a custom topology mapper.
…plicationPolicy API to take the number of peers needed, and adding a sampling algorithm linear in time and space, along with test cases.
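As an aside on the last commit above, one way to sample a given number of distinct peers in time and space linear in the number of candidates is a partial Fisher-Yates shuffle. This is only a sketch of that idea, not necessarily the algorithm or API used in the patch; the object and method names are made up for illustration.

```scala
import scala.util.Random

// Sketch only: picks `numPeersNeeded` distinct elements uniformly at random using a
// partial Fisher-Yates shuffle, in O(n) time and O(n) space for n candidates.
object PeerSampling {
  def samplePeers[T](peers: Seq[T], numPeersNeeded: Int, rng: Random = new Random): Seq[T] = {
    val buf = peers.toBuffer                 // copy so the caller's sequence is untouched
    val n = buf.length
    val m = math.min(numPeersNeeded, n)
    var i = 0
    while (i < m) {
      val j = i + rng.nextInt(n - i)         // random index into the not-yet-selected tail
      val tmp = buf(i); buf(i) = buf(j); buf(j) = tmp
      i += 1
    }
    buf.take(m).toSeq                        // the first m slots now hold the sample
  }
}
```

For example, `PeerSampling.samplePeers(peers, 2)` would return two distinct peers chosen uniformly from `peers`.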
Force-pushed from 907154c to 632d043.
Rebased to master to resolve merge conflicts.
LGTM - sorry that this has taken a while. I will merge once tests pass. Also cc @zsxwing for his attention.
Test build #3291 has finished for PR 13152 at commit
Merging in master. |
What changes were proposed in this pull request?
This PR makes block replication strategies pluggable. It provides two traits that can be implemented: one that maps a host to its topology and is used in the master, and a second that helps prioritize a list of peers for block replication and runs in the executors.
This patch contains default implementations of these traits that preserve current Spark behavior.
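To illustrate the kind of extension the topology trait is meant to enable, a custom mapper might look something like the sketch below. The abstract class shape (a SparkConf constructor argument and a host-to-topology lookup) follows the commit notes above, but the names, including the config key, are assumptions rather than the final API.

```scala
import scala.io.Source
import org.apache.spark.SparkConf

// Sketch only: the shape of a topology mapper a user could plug in on the master side.
abstract class TopologyMapperSketch(conf: SparkConf) {
  def getTopologyForHost(host: String): Option[String]
}

// Reads a "host rack" mapping, one pair per line, from a file named by a hypothetical config key.
class FileBasedTopologyMapper(conf: SparkConf) extends TopologyMapperSketch(conf) {
  private val mappingFile = conf.get("spark.replication.topologyFile", "")
  private val hostToRack: Map[String, String] =
    if (mappingFile.isEmpty) {
      Map.empty
    } else {
      Source.fromFile(mappingFile).getLines()
        .map(_.trim)
        .filter(_.nonEmpty)
        .map { line =>
          val Array(host, rack) = line.split("\\s+", 2)
          host -> rack
        }
        .toMap
    }

  override def getTopologyForHost(host: String): Option[String] = hostToRack.get(host)
}
```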
How was this patch tested?
This patch should not change Spark behavior in any way, and was tested with unit tests for storage.