From 58238d5f9a7a97a3f511b270f331b4a0c59f3352 Mon Sep 17 00:00:00 2001 From: zhangjunfan Date: Wed, 12 May 2021 15:05:47 +0800 Subject: [PATCH] [FLINK-22534] Set delegation token's service name as credential alias --- .../runtime/security/modules/HadoopModule.java | 3 +-- .../test/java/org/apache/flink/yarn/UtilsTest.java | 14 ++++++++++---- .../src/main/java/org/apache/flink/yarn/Utils.java | 9 +++------ 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java index 230b86370c627..9d2399207ec86 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/security/modules/HadoopModule.java @@ -97,8 +97,7 @@ public void install() throws SecurityInstallException { // If UGI use keytab for login, do not load HDFS delegation token. for (Token token : usrTok) { if (!token.getKind().equals(hdfsDelegationTokenKind)) { - final Text id = new Text(token.getIdentifier()); - credentialsToBeAdded.addToken(id, token); + credentialsToBeAdded.addToken(token.getService(), token); } } diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java index 0ee73b0d1c6ba..6dcab13185e08 100644 --- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java +++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/UtilsTest.java @@ -115,13 +115,19 @@ public void testCreateTaskExecutorCredentials() throws Exception { File credentialFile = temporaryFolder.newFile("container_tokens"); final Text amRmTokenKind = AMRMTokenIdentifier.KIND_NAME; final Text hdfsDelegationTokenKind = new Text("HDFS_DELEGATION_TOKEN"); - final Text service = new Text("test-service"); + final Text amRmTokenService = new Text("rm-ip:8030"); + final Text hdfsDelegationTokenService = new Text("ha-hdfs:hadoop-namespace"); Credentials amCredentials = new Credentials(); amCredentials.addToken( - amRmTokenKind, new Token<>(new byte[4], new byte[4], amRmTokenKind, service)); + amRmTokenService, + new Token<>(new byte[4], new byte[4], amRmTokenKind, amRmTokenService)); amCredentials.addToken( - hdfsDelegationTokenKind, - new Token<>(new byte[4], new byte[4], hdfsDelegationTokenKind, service)); + hdfsDelegationTokenService, + new Token<>( + new byte[4], + new byte[4], + hdfsDelegationTokenKind, + hdfsDelegationTokenService)); amCredentials.writeTokenStorageFile( new org.apache.hadoop.fs.Path(credentialFile.getAbsolutePath()), yarnConf); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java index 361b8e58692c8..72e0e4e87094b 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java @@ -34,7 +34,6 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; @@ -209,9 +208,8 @@ public static void setTokensFor( Collection> usrTok = currUsr.getTokens(); for (Token token : usrTok) { - final Text id = new Text(token.getIdentifier()); - LOG.info("Adding user token " + id + " with " + token); - credentials.addToken(id, token); + LOG.info("Adding user token " + token.getService() + " with " + token); + credentials.addToken(token.getService(), token); } try (DataOutputBuffer dob = new DataOutputBuffer()) { credentials.writeTokenStorageToStream(dob); @@ -560,8 +558,7 @@ static ContainerLaunchContext createTaskExecutorContext( Collection> userTokens = cred.getAllTokens(); for (Token token : userTokens) { if (!token.getKind().equals(AMRMTokenIdentifier.KIND_NAME)) { - final Text id = new Text(token.getIdentifier()); - taskManagerCred.addToken(id, token); + taskManagerCred.addToken(token.getService(), token); } }