-
Notifications
You must be signed in to change notification settings - Fork 9.1k
YARN-11350. [Federation] Router Support DelegationToken With ZK. #5131
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
💔 -1 overall
This message was automatically generated. |
|
💔 -1 overall
This message was automatically generated. |
|
🎊 +1 overall
This message was automatically generated. |
|
@goiri Can you help review this pr? Thank you very much! |
.../java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
Outdated
Show resolved
Hide resolved
.../java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java
Show resolved
Hide resolved
| ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate()); | ||
| } catch (Exception ex) { | ||
| if (!quiet) { | ||
| LOG.error("No node in path [" + nodePath + "]"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use {}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix it.
| * @return | ||
| * @throws IOException | ||
| */ | ||
| private RouterMasterKey getRouterMasterKeyFromZK(String nodePath, boolean quiet) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see much value in this quiet option.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you, I will remove the quiet option.
|
|
||
| @Test | ||
| public void testGetMasterKeyByDelegationKey() throws YarnException, IOException { | ||
| public void testGetMasterKeyByDelegationKey() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is correct but do we need to add all this churn?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The exception thrown by the method signature, I will recover. In the process of writing JunitTest code, I used curatorFramework.getData().forPath method to get data from ZK. This method throws an Exception, so I modified the exception throwing code of the method signature.
/**
* Commit the currently building operation using the given path
*
* @param path the path
* @return operation result if any
* @throws Exception errors
*/
public T forPath(String path) throws Exception;
| @Test(expected = NotImplementedException.class) | ||
| public void testRemoveStoredMasterKey() throws YarnException, IOException { | ||
| super.testRemoveStoredMasterKey(); | ||
| public void testUpdateStoredToken() throws Exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we keep the Test annotation in all these methods?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your suggestion, I will modify the code.
…-server-common/src/main/java/org/apache/hadoop/yarn/server/federation/store/impl/ZookeeperFederationStateStore.java Co-authored-by: Inigo Goiri <[email protected]>
|
Thanks @slfan1989 , Great work here! |
|
🎊 +1 overall
This message was automatically generated. |
|
💔 -1 overall
This message was automatically generated. |
|
🎊 +1 overall
This message was automatically generated. |
|
@goiri Can you help review this PR again? Thank you very much! |
|
🎊 +1 overall
This message was automatically generated. |
@Hexiaoqiao Thank you very much for your suggestion, I agree with your idea. After completing this pr, I will continue to carefully read the codes of the HDFS RBF / KMS modules about delegation token, and then extract this part of the code to the common module. If other modules need it, we can be used directly. |
|
🎊 +1 overall
This message was automatically generated. |
|
@goiri Can you help to review this PR again? Thank you very much! |
| */ | ||
| @Override | ||
| public RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) | ||
| public synchronized RouterRMTokenResponse storeNewToken(RouterRMTokenRequest request) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What are we protecting with the synchronized?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for your help in reviewing the code. In ZookeeperFederationStateStore, we have 7 methods that use the synchronized keyword
- Group1 methods are mainly used to create or remove MasterKey
- storeNewMasterKey
- removeStoredMasterKey
- Group2 methods are mainly used to create or update and remove Token
- storeNewToken
- updateStoredToken
- removeStoredToken
- Group3 methods are used to increment DelegationTokenSeqNum and MasterKeyId
- incrementDelegationTokenSeqNum
- incrementCurrentKeyId
This part is mainly to protect the ZNode of ZK. We hope that the same Router has only one thread to write or update or delete the same ZNode at the same time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use a finer grain lock. There are many things synchronized that can be done in parallel. Maybe a lock per group too. Could we even do a read write lock?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your suggestion, I will refactor this part of the code using lock.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use a finer grain lock. There are many things synchronized that can be done in parallel. Maybe a lock per group too. Could we even do a read write lock?
Thank you very much for your suggestion, your understanding is accurate, if we use synchronized keyword at method level, there is too much involved.
When completing the PR (#5169), JIRA: YARN-11349. [Federation] Router Support Delegation Token With SQL.
I carefully read the AbstractDelegationTokenSecretManager code, I think we should be able to remove the synchronized keyword in ZookeeperFederationStateStore.
I am worried that concurrent access may cause different threads to generate the same primary key data, so that errors will occur when storing, so in the methods of adding, deleting, and updating MasterKey and Token I used the synchronized method.
But I found out that this does not happen for generating the same primary key.
Group1 methods are mainly used to create or remove MasterKey
The key information in RouterMasterKey is DelegationKey, and the primary key of DelegationKey is keyId.
We found that DelegationKey will be constructed in AbstractDelegationTokenSecretManager#updateCurrentKey.
- AbstractDelegationTokenSecretManager#updateCurrentKey
This method is protected by synchronized when generating the primary key keyId and storing the masterKey, so the uniqueness of the primary key can be guaranteed
private void updateCurrentKey() throws IOException {
LOG.info("Updating the current master key for generating delegation tokens");
int newCurrentId;
// 1. newCurrentId is keyId, which has been protected
synchronized (this) {
newCurrentId = incrementCurrentKeyId();
}
DelegationKey newKey = new DelegationKey(newCurrentId, System
.currentTimeMillis()
+ keyUpdateInterval + tokenMaxLifetime, generateSecret());
logUpdateMasterKey(newKey);
// 2. storeDelegationKey calls storeNewMasterKey, this part has been protected
synchronized (this) {
currentKey = newKey;
storeDelegationKey(currentKey);
}
}
protected void storeDelegationKey(DelegationKey key) throws IOException {
allKeys.put(key.getKeyId(), key);
storeNewMasterKey(key);
}
We can find that the method of removeMasterKey is also protected.
- AbstractDelegationTokenSecretManager#removeExpiredKeys
private synchronized void removeExpiredKeys() {
long now = Time.now();
for (Iterator<Map.Entry<Integer, DelegationKey>> it = allKeys.entrySet()
.iterator(); it.hasNext();) {
Map.Entry<Integer, DelegationKey> e = it.next();
if (e.getValue().getExpiryDate() < now) {
it.remove();
// ensure the tokens generated by this current key can be recovered
// with this current key after this current key is rolled
if(!e.getValue().equals(currentKey))
removeStoredMasterKey(e.getValue());
}
}
}
Group2 methods are mainly used to create or update and remove Token
For DelegationToken, we can also find that the add, update, and remove methods are all protected.
- AbstractDelegationTokenSecretManager#storeToken
protected synchronized byte[] createPassword(TokenIdent identifier) {
.....
DelegationTokenInformation tokenInfo = new DelegationTokenInformation(now
+ tokenRenewInterval, password, getTrackingIdIfEnabled(identifier));
try {
// storeToken
METRICS.trackStoreToken(() -> storeToken(identifier, tokenInfo));
} catch (IOException ioe) {
LOG.error("Could not store token " + formatTokenId(identifier) + "!!",
ioe);
}
return password;
}
- AbstractDelegationTokenSecretManager#renewToken
public synchronized long renewToken(Token<TokenIdent> token,
String renewer) throws InvalidToken, IOException {
......
long renewTime = Math.min(id.getMaxDate(), now + tokenRenewInterval);
String trackingId = getTrackingIdIfEnabled(id);
DelegationTokenInformation info = new DelegationTokenInformation(renewTime,
password, trackingId);
if (getTokenInfo(id) == null) {
throw new InvalidToken("Renewal request for unknown token "
+ formatTokenId(id));
}
// updateToken
METRICS.trackUpdateToken(() -> updateToken(id, info));
return renewTime;
}
- AbstractDelegationTokenSecretManager#cancelToken
public synchronized TokenIdent cancelToken(Token<TokenIdent> token,
String canceller) throws IOException {
......
DelegationTokenInformation info = currentTokens.remove(id);
if (info == null) {
throw new InvalidToken("Token not found " + formatTokenId(id));
}
// cancelToken
METRICS.trackRemoveToken(() -> {
removeTokenForOwnerStats(id);
removeStoredToken(id);
});
return id;
}
Group3 methods are used to increment
DelegationTokenSeqNumandMasterKeyId
These two methods are also protected in the abstract class.
- AbstractDelegationTokenSecretManager#incrementDelegationTokenSeqNum
protected synchronized int incrementDelegationTokenSeqNum() {
return ++delegationTokenSequenceNumber;
}
- AbstractDelegationTokenSecretManager#incrementCurrentKeyId
protected synchronized int incrementCurrentKeyId() {
return ++currentId;
}
|
|
||
| @Test | ||
| public void testRemoveStoredMasterKey() throws YarnException, IOException { | ||
| public void testRemoveStoredMasterKey() throws IOException, YarnException { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you so much for reviewing the code, I will fix it.
|
💔 -1 overall
This message was automatically generated. |
|
💔 -1 overall
This message was automatically generated. |
|
💔 -1 overall
This message was automatically generated. |
|
💔 -1 overall
This message was automatically generated. |
| ByteBuffer.wrap(key.getEncodedKey()), key.getExpiryDate()); | ||
| FederationStateStore stateStore = this.getStateStore(); | ||
|
|
||
| assertTrue(stateStore instanceof ZookeeperFederationStateStore); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use the super method and have a function to check these things we do in every method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for helping to review the code, I will fix it.
| RouterStoreToken storeToken = RouterStoreToken.newInstance(identifier, renewDate); | ||
| RouterRMTokenRequest request = RouterRMTokenRequest.newInstance(storeToken); | ||
| RouterRMTokenResponse routerRMTokenResponse = stateStore.storeNewToken(request); | ||
| Assert.assertNotNull(routerRMTokenResponse); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As you statically imported assertTrue and others, import this too.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will fix it.
|
@goiri Can you help review pr(#5182)? Thank you very much! I fixed Java Doc issue for In the process of compiling YARN-11350, I encountered some java doc compilation errors. We can see the compilation report as follows: |
|
💔 -1 overall
This message was automatically generated. |
|
💔 -1 overall
This message was automatically generated. |
|
💔 -1 overall
This message was automatically generated. |
| import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest; | ||
| import org.apache.hadoop.yarn.server.federation.store.records.*; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Avoid
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you very much for helping to review the code, I will fix it.
...st/java/org/apache/hadoop/yarn/server/federation/store/impl/TestSQLFederationStateStore.java
Show resolved
Hide resolved
| } | ||
|
|
||
| @Test(expected = NotImplementedException.class) | ||
| @Test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we can just inherit and no need to have them here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree with you, I will remove this part of the code.
| nodeCreatePath); | ||
|
|
||
| // Write master key data to zk. | ||
| try(ByteArrayOutputStream os = new ByteArrayOutputStream(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Space after try.
|
💔 -1 overall
This message was automatically generated. |
|
💔 -1 overall
This message was automatically generated. |
|
💔 -1 overall
This message was automatically generated. |
|
💔 -1 overall
This message was automatically generated. |
|
@goiri Can you help to merge this pr into the trunk branch? Thank you very much! We have fixed all java doc issues for |
|
💔 -1 overall
This message was automatically generated. |
|
@goiri Thank you very much for helping to review the code! |
JIRA: YARN-11350. [Federation] Router Support DelegationToken With ZK.
YARN Router needs to support DelegationToken. In the process of implementing this feature, refer to RMDelegationTokenSecretManager, HDFS RBF#ZKDelegationTokenSecretManager.
There are a few things to keep in mind to implement this feature:
1.How to store and query
masterkey?2.How to store and query
DelegationToken?3.How to store and query
SequenceNum?For
question 1andquestion 2, we can directly refer to RMDelegationTokenSecretManager to serialize the data and store it in ZK.For
question 3, we need to consider multiple routers sharing sequenceNum, and we need to ensure that the self-increment of sequenceNum can be shared among multiple routers. At this point we will use curator's SharedCount to implement this function.For the introduction of
SharedCount, we can refer to the following articleshttps://curator.apache.org/curator-recipes/shared-counter.html.
Storage display of
delegationTokenin zk.ls /federationstore

ls /federationstore/router_rm_dt_secret_manager_root

The data of

masterKeywill be stored inrouter_rm_dt_master_keys_rootThe ZNode generation rule of
masterKeyisdelegation_key_ + keyIdls /federationstore/router_rm_dt_secret_manager_root/router_rm_dt_master_keys_root
The data of

delegationTokenwill be stored inrouter_rm_delegation_tokens_rootls /federationstore/router_rm_dt_secret_manager_root/router_rm_delegation_tokens_root
The ZNode generation rule of
delegationTokenisrm_delegation_token_ + keyIdThe data of

sequenceNumwill be stored inrouter_rm_dt_sequential_number