diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java index 230b86370c627..9d2399207ec86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java @@ -97,8 +97,7 @@ public void install() throws SecurityInstallException { // If UGI use keytab for login, do not load HDFS delegation token. for (Token token : usrTok) { if (!token.getKind().equals(hdfsDelegationTokenKind)) { - final Text id = new Text(token.getIdentifier()); - credentialsToBeAdded.addToken(id, token); + credentialsToBeAdded.addToken(token.getService(), token); } } diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java index 0ee73b0d1c6ba..6dcab13185e08 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -115,13 +115,19 @@ public void testCreateTaskExecutorCredentials() throws Exception { File credentialFile = temporaryFolder.newFile("container_tokens"); final Text amRmTokenKind = AMRMTokenIdentifier.KIND_NAME; final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN"); - final Text service = new Text("test-service"); + final Text amRmTokenService = new Text("rm-ip:8030"); + final Text hdfsDelegationTokenService = new Text("ha-hdfs:hadoop-namespace"); Credentials amCredentials = new Credentials(); amCredentials.addToken( - amRmTokenKind, new Token<>(new byte[4], new byte[4], amRmTokenKind, service)); + amRmTokenService, + new Token<>(new byte[4], new byte[4], amRmTokenKind, amRmTokenService)); amCredentials.addToken( - hdfsDelegationTokenKind, - new Token<>(new byte[4], new byte[4], hdfsDelegationTokenKind, service)); + hdfsDelegationTokenService, + new Token<>( + new byte[4], + new byte[4], + hdfsDelegationTokenKind, + hdfsDelegationTokenService)); amCredentials.writeTokenStorageFile( new org.apache.hadoop.fs.Path(credentialFile.getAbsolutePath()), yarnConf); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 361b8e58692c8..72e0e4e87094b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -34,7 +34,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -209,9 +208,8 @@ public static void setTokensFor( Collection> usrTok = currUsr.getTokens(); for (Token token : usrTok) { - final Text id = new Text(token.getIdentifier()); - LOG.info("Adding user token " + id + " with " + token); - credentials.addToken(id, token); + LOG.info("Adding user token " + token.getService() + " with " + token); + credentials.addToken(token.getService(), token); } try (DataOutputBuffer dob = new DataOutputBuffer()) { credentials.writeTokenStorageToStream(dob); @@ -560,8 +558,7 @@ static ContainerLaunchContext createTaskExecutorContext( Collection> userTokens = cred.getAllTokens(); for (Token token : userTokens) { if (!token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { - final Text id = new Text(token.getIdentifier()); - taskManagerCred.addToken(id, token); + taskManagerCred.addToken(token.getService(), token); } }