```diff
@@ -97,8 +97,7 @@ public void install() throws SecurityInstallException {
             // If UGI use keytab for login, do not load HDFS delegation token.
             for (Token<? extends TokenIdentifier> token : usrTok) {
                 if (!token.getKind().equals(hdfsDelegationTokenKind)) {
-                    final Text id = new Text(token.getIdentifier());
-                    credentialsToBeAdded.addToken(id, token);
+                    credentialsToBeAdded.addToken(token.getService(), token);
                 }
             }
```
lirui-apache (Contributor):

Could you explain why we don't need the deep copy as the alias any more (or why we needed it before)?

I'd prefer to avoid such subtle changes, especially since HDFS-9276 also introduced a deep-copy constructor: https://issues.apache.org/jira/browse/HDFS-9276?focusedCommentId=15391195&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-15391195

ChengbingLiu:

@lirui-apache Let me explain this a little bit.

1. HDFS-9276 made a defensive copy because a Token instance is passed to the constructor of PrivateToken. From the PrivateToken class's point of view, a defensive copy is certainly needed.
2. For this change, token and the objects referencing it (usrTok and credentialsFromTokenStorageFile) are all local variables, and credentialsFromTokenStorageFile comes from reading a file. A deep copy is therefore not as necessary here as it is in HDFS-9276 (see the sketch after this list).
3. token.getIdentifier() returns a byte[], while token.getService() returns a Text. This is probably why it looks as if a deep copy was removed in this change. In fact, the code prior to this change didn't make a deep copy of the token either.

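To illustrate points 1 and 2 above, here is a minimal sketch under those assumptions (class and method names are hypothetical, and the copy constructor is the one mentioned in the HDFS-9276 discussion): a deep copy matters when the Token reference escapes into a long-lived object, while tokens read from a locally built Credentials and re-added under a new alias have no outside writer to guard against.

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

class DefensiveCopySketch {

    // Case 1 (HDFS-9276 style): the token escapes into a long-lived holder, so a defensive
    // copy shields the holder from later mutations made through the caller's reference.
    static <T extends TokenIdentifier> Token<T> defensiveCopy(Token<T> token) {
        // copy constructor discussed in HDFS-9276; treated here as a deep copy
        return new Token<>(token);
    }

    // Case 2 (this change): tokens come from a locally read Credentials object and are
    // re-added under a new alias right away; nothing outside the method mutates them afterwards.
    static void addNonHdfsTokens(Credentials source, Credentials target, Text hdfsDelegationTokenKind) {
        for (Token<? extends TokenIdentifier> token : source.getAllTokens()) {
            if (!token.getKind().equals(hdfsDelegationTokenKind)) {
                target.addToken(token.getService(), token);
            }
        }
    }
}
```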
lirui-apache (Contributor):

Hey @ChengbingLiu, thanks for the explanations. I'm not very familiar with this topic, so I have some follow-up questions.

1. Per HDFS-9276, the deep copy was added to avoid scenarios like "Whenever the content of token.service is changed, publicService will reflect the change. I don't think this is what we want." So I wonder whether we need to guard against similar situations: e.g. when token.service is changed, do we want the change to be reflected in the Credentials::tokenMap?
2. usrTok is indeed local in HadoopModule. However, a similar change is made in Utils::setTokensFor, where usrTok is retrieved from the current UGI and is therefore not local to the method. Will that be an issue for us?
3. It seems both Text(byte[]) and Text(Text) copy the byte array. So prior to this change, we did make a deep copy, no?

ChengbingLiu:

> E.g. when token.service is changed, do we want the change to be reflected in the Credentials::tokenMap?

No, we don't want that, but that should not happen with this change, as explained below.

> However, a similar change is made in Utils::setTokensFor, where usrTok is retrieved from the current UGI and is therefore not local to the method. Will that be an issue for us?

I don't think so. In Utils::setTokensFor, all tokens retrieved from the UGI are used to build the local Credentials variable, which is then serialized into the ByteBuffer securityTokens; the serialized bytes will not reflect later changes in the UGI (see the sketch below).
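A minimal sketch of that serialization step, assuming only the Hadoop Credentials and DataOutputBuffer APIs already used in setTokensFor (the wrapper class and method here are hypothetical, not Flink code):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.security.Credentials;

class TokenSerializationSketch {

    // Write the locally built Credentials into a byte buffer; the bytes form an independent
    // snapshot, so mutating the UGI's Token objects afterwards cannot change the result.
    static ByteBuffer serialize(Credentials credentials) throws IOException {
        try (DataOutputBuffer dob = new DataOutputBuffer()) {
            credentials.writeTokenStorageToStream(dob);
            return ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
        }
    }
}
```

Once serialize returns, nothing that mutates the UGI's tokens can alter the bytes wrapped in the returned buffer.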

> It seems both Text(byte[]) and Text(Text) copy the byte array. So prior to this change, we did make a deep copy, no?

Sorry for not making myself clear. I think the purpose of the previous code new Text(token.getIdentifier()) was type conversion rather than a deep copy of the token; otherwise it would have looked like addToken(new Text(...), new Token(token)).
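To make that distinction concrete, a hedged sketch (the wrapper class and method are hypothetical; the last call shows what a genuine deep copy would have looked like, assuming the Token copy constructor from the HDFS-9276 discussion):

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

class AliasVsDeepCopySketch {

    static <T extends TokenIdentifier> void illustrate(Credentials credentials, Token<T> token) {
        // Old code: Text(byte[]) copies the identifier bytes, but the Token itself is stored
        // as-is, so this is a type conversion for the alias, not a deep copy of the token.
        credentials.addToken(new Text(token.getIdentifier()), token);

        // New code: the token's service Text becomes the alias; the Token is still stored as-is.
        credentials.addToken(token.getService(), token);

        // A genuine deep copy of the token would have looked like this (never in the old code).
        credentials.addToken(new Text(token.getService()), new Token<>(token));
    }
}
```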

lirui-apache (Contributor):

I see. Since credentials is serialized right away, I'm fine with this change.

zuston (Member Author), May 12, 2021:

Thanks to my mentor @ChengbingLiu for the explanations, and thanks to @lirui-apache for the review.

```diff
@@ -115,13 +115,19 @@ public void testCreateTaskExecutorCredentials() throws Exception {
         File credentialFile = temporaryFolder.newFile("container_tokens");
         final Text amRmTokenKind = AMRMTokenIdentifier.KIND_NAME;
         final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN");
-        final Text service = new Text("test-service");
+        final Text amRmTokenService = new Text("rm-ip:8030");
+        final Text hdfsDelegationTokenService = new Text("ha-hdfs:hadoop-namespace");
         Credentials amCredentials = new Credentials();
         amCredentials.addToken(
-                amRmTokenKind, new Token<>(new byte[4], new byte[4], amRmTokenKind, service));
+                amRmTokenService,
+                new Token<>(new byte[4], new byte[4], amRmTokenKind, amRmTokenService));
         amCredentials.addToken(
-                hdfsDelegationTokenKind,
-                new Token<>(new byte[4], new byte[4], hdfsDelegationTokenKind, service));
+                hdfsDelegationTokenService,
+                new Token<>(
+                        new byte[4],
+                        new byte[4],
+                        hdfsDelegationTokenKind,
+                        hdfsDelegationTokenService));
         amCredentials.writeTokenStorageFile(
                 new org.apache.hadoop.fs.Path(credentialFile.getAbsolutePath()), yarnConf);
```

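One plausible reason the test now uses distinct per-token services (an assumption, not stated in the PR): with the service Text used as the Credentials alias, two tokens sharing one service would overwrite each other. A hedged sketch with a hypothetical class name:

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;

class AliasCollisionSketch {

    static void illustrate() {
        Text sharedService = new Text("test-service");
        Token<TokenIdentifier> amToken =
                new Token<>(new byte[4], new byte[4], AMRMTokenIdentifier.KIND_NAME, sharedService);
        Token<TokenIdentifier> hdfsToken =
                new Token<>(new byte[4], new byte[4], new Text("HDFS_DELEGATION_TOKEN"), sharedService);

        Credentials creds = new Credentials();
        creds.addToken(sharedService, amToken);   // stored under alias "test-service"
        creds.addToken(sharedService, hdfsToken); // same alias, so this overwrites the first entry
        assert creds.numberOfTokens() == 1;       // only one token survives the collision
    }
}
```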
flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java (9 changes: 3 additions, 6 deletions)
```diff
@@ -34,7 +34,6 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
```
```diff
@@ -209,9 +208,8 @@ public static void setTokensFor(
 
         Collection<Token<? extends TokenIdentifier>> usrTok = currUsr.getTokens();
         for (Token<? extends TokenIdentifier> token : usrTok) {
-            final Text id = new Text(token.getIdentifier());
-            LOG.info("Adding user token " + id + " with " + token);
-            credentials.addToken(id, token);
+            LOG.info("Adding user token " + token.getService() + " with " + token);
+            credentials.addToken(token.getService(), token);
         }
         try (DataOutputBuffer dob = new DataOutputBuffer()) {
             credentials.writeTokenStorageToStream(dob);
```
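For the read side, a hedged sketch of how such a serialized buffer is typically turned back into Credentials (the class and method here are hypothetical; readTokenStorageStream and DataInputBuffer are the assumed Hadoop APIs):

```java
import java.io.IOException;
import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.security.Credentials;

class TokenDeserializationSketch {

    // Rebuild Credentials from the bytes produced by writeTokenStorageToStream; the tokens
    // come back keyed by whatever aliases were used when they were added.
    static Credentials deserialize(byte[] serialized) throws IOException {
        DataInputBuffer dib = new DataInputBuffer();
        dib.reset(serialized, serialized.length);
        Credentials credentials = new Credentials();
        credentials.readTokenStorageStream(dib);
        return credentials;
    }
}
```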
```diff
@@ -560,8 +558,7 @@ static ContainerLaunchContext createTaskExecutorContext(
         Collection<Token<? extends TokenIdentifier>> userTokens = cred.getAllTokens();
         for (Token<? extends TokenIdentifier> token : userTokens) {
             if (!token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) {
-                final Text id = new Text(token.getIdentifier());
-                taskManagerCred.addToken(id, token);
+                taskManagerCred.addToken(token.getService(), token);
             }
         }
```