diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java index 56249960c7ce4..d5f0e6007c2de 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/AbstractDelegationTokenSecretManager.java @@ -23,11 +23,11 @@ import java.io.IOException; import java.security.MessageDigest; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import javax.crypto.SecretKey; @@ -58,10 +58,9 @@ @InterfaceAudience.Public @InterfaceStability.Evolving -public abstract -class AbstractDelegationTokenSecretManager - extends SecretManager { +public abstract class AbstractDelegationTokenSecretManager + extends SecretManager { private static final Logger LOG = LoggerFactory .getLogger(AbstractDelegationTokenSecretManager.class); @@ -80,7 +79,7 @@ private String formatTokenId(TokenIdent id) { * to DelegationTokenInformation. Protected by this object lock. */ protected final Map currentTokens - = new HashMap(); + = new ConcurrentHashMap<>(); /** * Sequence number to create DelegationTokenIdentifier. @@ -89,17 +88,17 @@ private String formatTokenId(TokenIdent id) { protected int delegationTokenSequenceNumber = 0; /** - * Access to allKeys is protected by this object lock + * Access to allKeys is protected by this object lock. */ protected final Map allKeys - = new HashMap(); + = new ConcurrentHashMap<>(); /** * Access to currentId is protected by this object lock. */ protected int currentId = 0; /** - * Access to currentKey is protected by this object lock + * Access to currentKey is protected by this object lock. */ private DelegationKey currentKey; @@ -122,7 +121,7 @@ private String formatTokenId(TokenIdent id) { protected Object noInterruptsLock = new Object(); /** - * Create a secret manager + * Create a secret manager. * @param delegationKeyUpdateInterval the number of milliseconds for rolling * new secret keys. * @param delegationTokenMaxLifetime the maximum lifetime of the delegation @@ -183,8 +182,9 @@ public long getCurrentTokensSize() { * @throws IOException raised on errors performing I/O. 
*/ public synchronized void addKey(DelegationKey key) throws IOException { - if (running) // a safety check + if (running) { // a safety check throw new IOException("Can't add delegation key to a running SecretManager."); + } if (key.getKeyId() > getCurrentKeyId()) { setCurrentKeyId(key.getKeyId()); } @@ -453,8 +453,9 @@ private synchronized void removeExpiredKeys() { it.remove(); // ensure the tokens generated by this current key can be recovered // with this current key after this current key is rolled - if(!e.getValue().equals(currentKey)) + if(!e.getValue().equals(currentKey)) { removeStoredMasterKey(e.getValue()); + } } } } @@ -729,8 +730,9 @@ protected void logExpireTokens( } public void stopThreads() { - if (LOG.isDebugEnabled()) + if (LOG.isDebugEnabled()) { LOG.debug("Stopping expired delegation token remover thread"); + } running = false; if (tokenRemoverThread != null) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java index c66e77ee47b50..93892ff54c397 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/ZKDelegationTokenSecretManager.java @@ -23,12 +23,11 @@ import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.Collections; import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Stream; import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; import org.apache.curator.framework.CuratorFramework; @@ -37,10 +36,9 @@ import org.apache.curator.framework.api.ACLProvider; import org.apache.curator.framework.imps.DefaultACLProvider; import org.apache.curator.framework.recipes.cache.ChildData; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.curator.framework.recipes.shared.SharedCount; import org.apache.curator.framework.recipes.shared.VersionedValue; import org.apache.curator.retry.RetryNTimes; @@ -54,6 +52,7 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; import org.apache.hadoop.util.curator.ZKCuratorManager; +import static org.apache.hadoop.util.Time.now; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException.NoNodeException; @@ -78,7 +77,7 @@ public abstract class ZKDelegationTokenSecretManager extends AbstractDelegationTokenSecretManager { - private static final String ZK_CONF_PREFIX = "zk-dt-secret-manager."; + public static final String ZK_CONF_PREFIX = 
"zk-dt-secret-manager."; public static final String ZK_DTSM_ZK_NUM_RETRIES = ZK_CONF_PREFIX + "zkNumRetries"; public static final String ZK_DTSM_ZK_SESSION_TIMEOUT = ZK_CONF_PREFIX @@ -101,6 +100,9 @@ public abstract class ZKDelegationTokenSecretManager { + try { + processKeyAddOrUpdate(node.getData()); + } catch (IOException e) { + LOG.error("Error while processing Curator keyCacheListener " + + "NODE_CREATED / NODE_CHANGED event"); + throw new UncheckedIOException(e); } - } - }, listenerThreadPool); - loadFromZKCache(false); - } + }) + .forDeletes(childData -> processKeyRemoved(childData.getPath())) + .build(); + keyCache.listenable().addListener(keyCacheListener); + keyCache.start(); + loadFromZKCache(false); } catch (Exception e) { - throw new IOException("Could not start PathChildrenCache for keys", e); + throw new IOException("Could not start Curator keyCacheListener for keys", + e); } - try { - tokenCache = new PathChildrenCache(zkClient, ZK_DTSM_TOKENS_ROOT, true); - if (tokenCache != null) { - tokenCache.start(StartMode.BUILD_INITIAL_CACHE); - tokenCache.getListenable().addListener(new PathChildrenCacheListener() { - - @Override - public void childEvent(CuratorFramework client, - PathChildrenCacheEvent event) throws Exception { - switch (event.getType()) { - case CHILD_ADDED: - processTokenAddOrUpdate(event.getData()); - break; - case CHILD_UPDATED: - processTokenAddOrUpdate(event.getData()); - break; - case CHILD_REMOVED: - processTokenRemoved(event.getData()); - break; - default: - break; - } - } - }, listenerThreadPool); + if (isTokenWatcherEnabled) { + LOG.info("TokenCache is enabled"); + try { + tokenCache = CuratorCache.bridgeBuilder(zkClient, ZK_DTSM_TOKENS_ROOT) + .build(); + CuratorCacheListener tokenCacheListener = CuratorCacheListener.builder() + .forCreatesAndChanges((oldNode, node) -> { + try { + processTokenAddOrUpdate(node.getData()); + } catch (IOException e) { + LOG.error("Error while processing Curator tokenCacheListener " + + "NODE_CREATED / NODE_CHANGED event"); + throw new UncheckedIOException(e); + } + }) + .forDeletes(childData -> { + try { + processTokenRemoved(childData); + } catch (IOException e) { + LOG.error("Error while processing Curator tokenCacheListener " + + "NODE_DELETED event"); + throw new UncheckedIOException(e); + } + }) + .build(); + tokenCache.listenable().addListener(tokenCacheListener); + tokenCache.start(); loadFromZKCache(true); + } catch (Exception e) { + throw new IOException( + "Could not start Curator tokenCacheListener for tokens", e); } - } catch (Exception e) { - throw new IOException("Could not start PathChildrenCache for tokens", e); } super.startThreads(); } /** - * Load the PathChildrenCache into the in-memory map. Possible caches to be + * Load the CuratorCache into the in-memory map. Possible caches to be * loaded are keyCache and tokenCache. * * @param isTokenCache true if loading tokenCache, false if loading keyCache. @@ -365,30 +363,31 @@ public void childEvent(CuratorFramework client, private void loadFromZKCache(final boolean isTokenCache) { final String cacheName = isTokenCache ? 
"token" : "key"; LOG.info("Starting to load {} cache.", cacheName); - final List children; + final Stream children; if (isTokenCache) { - children = tokenCache.getCurrentData(); + children = tokenCache.stream(); } else { - children = keyCache.getCurrentData(); + children = keyCache.stream(); } - int count = 0; - for (ChildData child : children) { + final AtomicInteger count = new AtomicInteger(0); + children.forEach(childData -> { try { if (isTokenCache) { - processTokenAddOrUpdate(child); + processTokenAddOrUpdate(childData.getData()); } else { - processKeyAddOrUpdate(child.getData()); + processKeyAddOrUpdate(childData.getData()); } } catch (Exception e) { LOG.info("Ignoring node {} because it failed to load.", - child.getPath()); + childData.getPath()); LOG.debug("Failure exception:", e); - ++count; + count.getAndIncrement(); } - } - if (count > 0) { - LOG.warn("Ignored {} nodes while loading {} cache.", count, cacheName); + }); + if (count.get() > 0) { + LOG.warn("Ignored {} nodes while loading {} cache.", count.get(), + cacheName); } LOG.info("Loaded {} cache.", cacheName); } @@ -398,9 +397,7 @@ private void processKeyAddOrUpdate(byte[] data) throws IOException { DataInputStream din = new DataInputStream(bin); DelegationKey key = new DelegationKey(); key.readFields(din); - synchronized (this) { - allKeys.put(key.getKeyId(), key); - } + allKeys.put(key.getKeyId(), key); } private void processKeyRemoved(String path) { @@ -410,15 +407,13 @@ private void processKeyRemoved(String path) { int j = tokSeg.indexOf('_'); if (j > 0) { int keyId = Integer.parseInt(tokSeg.substring(j + 1)); - synchronized (this) { - allKeys.remove(keyId); - } + allKeys.remove(keyId); } } } - private void processTokenAddOrUpdate(ChildData data) throws IOException { - ByteArrayInputStream bin = new ByteArrayInputStream(data.getData()); + protected TokenIdent processTokenAddOrUpdate(byte[] data) throws IOException { + ByteArrayInputStream bin = new ByteArrayInputStream(data); DataInputStream din = new DataInputStream(bin); TokenIdent ident = createIdentifier(); ident.readFields(din); @@ -429,12 +424,10 @@ private void processTokenAddOrUpdate(ChildData data) throws IOException { if (numRead > -1) { DelegationTokenInformation tokenInfo = new DelegationTokenInformation(renewDate, password); - synchronized (this) { - currentTokens.put(ident, tokenInfo); - // The cancel task might be waiting - notifyAll(); - } + currentTokens.put(ident, tokenInfo); + return ident; } + return null; } private void processTokenRemoved(ChildData data) throws IOException { @@ -442,11 +435,7 @@ private void processTokenRemoved(ChildData data) throws IOException { DataInputStream din = new DataInputStream(bin); TokenIdent ident = createIdentifier(); ident.readFields(din); - synchronized (this) { - currentTokens.remove(ident); - // The cancel task might be waiting - notifyAll(); - } + currentTokens.remove(ident); } @Override @@ -487,20 +476,6 @@ public void stopThreads() { } catch (Exception e) { LOG.error("Could not stop Curator Framework", e); } - if (listenerThreadPool != null) { - listenerThreadPool.shutdown(); - try { - // wait for existing tasks to terminate - if (!listenerThreadPool.awaitTermination(shutdownTimeout, - TimeUnit.MILLISECONDS)) { - LOG.error("Forcing Listener threadPool to shutdown !!"); - listenerThreadPool.shutdownNow(); - } - } catch (InterruptedException ie) { - listenerThreadPool.shutdownNow(); - Thread.currentThread().interrupt(); - } - } } private void createPersistentNode(String nodePath) throws Exception { @@ -647,7 
+622,7 @@ protected DelegationTokenInformation getTokenInfo(TokenIdent ident) { * * @param ident Identifier of the token */ - private synchronized void syncLocalCacheWithZk(TokenIdent ident) { + protected void syncLocalCacheWithZk(TokenIdent ident) { try { DelegationTokenInformation tokenInfo = getTokenInfoFromZK(ident); if (tokenInfo != null && !currentTokens.containsKey(ident)) { @@ -661,16 +636,21 @@ private synchronized void syncLocalCacheWithZk(TokenIdent ident) { } } - private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident) + protected DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident) throws IOException { return getTokenInfoFromZK(ident, false); } - private DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident, + protected DelegationTokenInformation getTokenInfoFromZK(TokenIdent ident, boolean quiet) throws IOException { String nodePath = getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber()); + return getTokenInfoFromZK(nodePath, quiet); + } + + protected DelegationTokenInformation getTokenInfoFromZK(String nodePath, + boolean quiet) throws IOException { try { byte[] data = zkClient.getData().forPath(nodePath); if ((data == null) || (data.length == 0)) { @@ -805,15 +785,30 @@ protected void updateToken(TokenIdent ident, @Override protected void removeStoredToken(TokenIdent ident) throws IOException { + removeStoredToken(ident, false); + } + + protected void removeStoredToken(TokenIdent ident, + boolean checkAgainstZkBeforeDeletion) throws IOException { String nodeRemovePath = getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX + ident.getSequenceNumber()); - if (LOG.isDebugEnabled()) { - LOG.debug("Removing ZKDTSMDelegationToken_" - + ident.getSequenceNumber()); - } try { - if (zkClient.checkExists().forPath(nodeRemovePath) != null) { + DelegationTokenInformation dtInfo = getTokenInfoFromZK(ident, true); + if (dtInfo != null) { + // For the case there is no sync or watch miss, it is possible that the + // local storage has expired tokens which have been renewed by peer + // so double check again to avoid accidental delete + if (checkAgainstZkBeforeDeletion + && dtInfo.getRenewDate() > now()) { + LOG.info("Node already renewed by peer " + nodeRemovePath + + " so this token should not be deleted"); + return; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Removing ZKDTSMDelegationToken_" + + ident.getSequenceNumber()); + } while(zkClient.checkExists().forPath(nodeRemovePath) != null){ try { zkClient.delete().guaranteed().forPath(nodeRemovePath); @@ -836,7 +831,7 @@ protected void removeStoredToken(TokenIdent ident) } @Override - public synchronized TokenIdent cancelToken(Token token, + public TokenIdent cancelToken(Token token, String canceller) throws IOException { ByteArrayInputStream buf = new ByteArrayInputStream(token.getIdentifier()); DataInputStream in = new DataInputStream(buf); @@ -847,7 +842,7 @@ public synchronized TokenIdent cancelToken(Token token, return super.cancelToken(token, canceller); } - private void addOrUpdateToken(TokenIdent ident, + protected void addOrUpdateToken(TokenIdent ident, DelegationTokenInformation info, boolean isUpdate) throws Exception { String nodeCreatePath = getNodePath(ZK_DTSM_TOKENS_ROOT, DELEGATION_TOKEN_PREFIX @@ -874,6 +869,10 @@ private void addOrUpdateToken(TokenIdent ident, } } + public boolean isTokenWatcherEnabled() { + return isTokenWatcherEnabled; + } + /** * Simple implementation of an {@link ACLProvider} that simply returns an ACL * that gives all 
permissions only to a single principal. @@ -905,11 +904,6 @@ static String getNodePath(String root, String nodeName) { return (root + "/" + nodeName); } - @VisibleForTesting - public ExecutorService getListenerThreadPool() { - return listenerThreadPool; - } - @VisibleForTesting DelegationTokenInformation getTokenInfoFromMemory(TokenIdent ident) { return currentTokens.get(ident); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java index 50d3dcfee757b..694b4e9a87faa 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/token/delegation/TestZKDelegationTokenSecretManager.java @@ -21,8 +21,6 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutorService; import java.util.function.Supplier; import org.apache.curator.RetryPolicy; @@ -59,15 +57,15 @@ public class TestZKDelegationTokenSecretManager { private static final Logger LOG = LoggerFactory.getLogger(TestZKDelegationTokenSecretManager.class); - private static final int TEST_RETRIES = 2; + protected static final int TEST_RETRIES = 2; - private static final int RETRY_COUNT = 5; + protected static final int RETRY_COUNT = 5; - private static final int RETRY_WAIT = 1000; + protected static final int RETRY_WAIT = 1000; - private static final long DAY_IN_SECS = 86400; + protected static final long DAY_IN_SECS = 86400; - private TestingServer zkServer; + protected TestingServer zkServer; @Rule public Timeout globalTimeout = new Timeout(300000); @@ -86,17 +84,17 @@ public void tearDown() throws Exception { } protected Configuration getSecretConf(String connectString) { - Configuration conf = new Configuration(); - conf.setBoolean(DelegationTokenManager.ENABLE_ZK_KEY, true); - conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString); - conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath"); - conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none"); - conf.setLong(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, 100); - conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS); - conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS); - conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS); - conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS); - return conf; + Configuration conf = new Configuration(); + conf.setBoolean(DelegationTokenManager.ENABLE_ZK_KEY, true); + conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_CONNECTION_STRING, connectString); + conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZNODE_WORKING_PATH, "testPath"); + conf.set(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_AUTH_TYPE, "none"); + conf.setLong(ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, 100); + conf.setLong(DelegationTokenManager.UPDATE_INTERVAL, DAY_IN_SECS); + conf.setLong(DelegationTokenManager.MAX_LIFETIME, DAY_IN_SECS); + conf.setLong(DelegationTokenManager.RENEW_INTERVAL, DAY_IN_SECS); + conf.setLong(DelegationTokenManager.REMOVAL_SCAN_INTERVAL, DAY_IN_SECS); + return conf; } @SuppressWarnings("unchecked") @@ -317,19 +315,13 @@ 
public void testCancelTokenSingleManager() throws Exception { @SuppressWarnings("rawtypes") protected void verifyDestroy(DelegationTokenManager tm, Configuration conf) throws Exception { - AbstractDelegationTokenSecretManager sm = - tm.getDelegationTokenSecretManager(); - ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager) sm; - ExecutorService es = zksm.getListenerThreadPool(); tm.destroy(); - Assert.assertTrue(es.isShutdown()); // wait for the pool to terminate long timeout = conf.getLong( ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT, ZKDelegationTokenSecretManager.ZK_DTSM_ZK_SHUTDOWN_TIMEOUT_DEFAULT); Thread.sleep(timeout * 3); - Assert.assertTrue(es.isTerminated()); } @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -353,20 +345,9 @@ public void testStopThreads() throws Exception { tm1.init(); Token token = - (Token) + (Token) tm1.createToken(UserGroupInformation.getCurrentUser(), "foo"); Assert.assertNotNull(token); - - AbstractDelegationTokenSecretManager sm = tm1.getDelegationTokenSecretManager(); - ZKDelegationTokenSecretManager zksm = (ZKDelegationTokenSecretManager)sm; - ExecutorService es = zksm.getListenerThreadPool(); - es.submit(new Callable() { - public Void call() throws Exception { - Thread.sleep(shutdownTimeoutMillis * 2); // force this to be shutdownNow - return null; - } - }); - tm1.destroy(); } @@ -378,7 +359,7 @@ public void testACLs() throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); String userPass = "myuser:mypass"; final ACL digestACL = new ACL(ZooDefs.Perms.ALL, new Id("digest", - DigestAuthenticationProvider.generateDigest(userPass))); + DigestAuthenticationProvider.generateDigest(userPass))); ACLProvider digestAclProvider = new ACLProvider() { @Override public List getAclForPath(String path) { return getDefaultAcl(); } @@ -392,12 +373,12 @@ public List getDefaultAcl() { }; CuratorFramework curatorFramework = - CuratorFrameworkFactory.builder() - .connectString(connectString) - .retryPolicy(retryPolicy) - .aclProvider(digestAclProvider) - .authorization("digest", userPass.getBytes("UTF-8")) - .build(); + CuratorFrameworkFactory.builder() + .connectString(connectString) + .retryPolicy(retryPolicy) + .aclProvider(digestAclProvider) + .authorization("digest", userPass.getBytes("UTF-8")) + .build(); curatorFramework.start(); ZKDelegationTokenSecretManager.setCurator(curatorFramework); tm1 = new DelegationTokenManager(conf, new Text("bla")); @@ -425,7 +406,7 @@ private void verifyACL(CuratorFramework curatorFramework, // cancelled but.. that would mean having to make an RPC call for every // verification request. // Thus, the eventual consistency tradef-off should be acceptable here... 
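For orientation, here is a minimal, self-contained sketch (not part of this patch) of the Curator 5 pattern that the new keyCache/tokenCache wiring earlier in this patch follows: build a CuratorCacheBridge, compose a CuratorCacheListener from per-event lambdas, attach it, then start the cache. The connect string, path, and decode() helper are placeholders; checked IOExceptions are wrapped in UncheckedIOException because the listener lambdas cannot throw them, mirroring the approach taken above.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheBridge;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorCacheListenerSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder connect string and path; not values from this patch.
    CuratorFramework client = CuratorFrameworkFactory.newClient(
        "localhost:2181", new ExponentialBackoffRetry(1000, 3));
    client.start();

    // Same shape as the keyCache/tokenCache wiring: bridge builder,
    // listener built from per-event lambdas, then start().
    CuratorCacheBridge cache =
        CuratorCache.bridgeBuilder(client, "/sketch/tokens").build();
    CuratorCacheListener listener = CuratorCacheListener.builder()
        .forCreatesAndChanges((oldNode, node) -> {
          try {
            decode(node.getData());   // stand-in for processTokenAddOrUpdate()
          } catch (IOException e) {
            // Listener lambdas cannot throw checked exceptions, hence the
            // UncheckedIOException wrapping used in the patch.
            throw new UncheckedIOException(e);
          }
        })
        .forDeletes(childData ->
            System.out.println("removed " + childData.getPath()))
        .build();
    cache.listenable().addListener(listener);
    cache.start();

    Thread.sleep(5000);   // let a few events arrive in this demo
    cache.close();
    client.close();
  }

  private static void decode(byte[] data) throws IOException {
    if (data == null) {
      throw new IOException("empty node");
    }
    System.out.println("stored " + new String(data, StandardCharsets.UTF_8));
  }
}
```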
- private void verifyTokenFail(DelegationTokenManager tm, + protected void verifyTokenFail(DelegationTokenManager tm, Token token) throws IOException, InterruptedException { verifyTokenFailWithRetry(tm, token, RETRY_COUNT); diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java index a01b7151b6989..3457fa28634a6 100644 --- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/client/impl/zk/CuratorService.java @@ -18,6 +18,9 @@ package org.apache.hadoop.registry.client.impl.zk; +import org.apache.curator.framework.recipes.cache.CuratorCache; +import org.apache.curator.framework.recipes.cache.CuratorCacheBridge; +import org.apache.curator.framework.recipes.cache.CuratorCacheListener; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.curator.ensemble.EnsembleProvider; @@ -28,9 +31,6 @@ import org.apache.curator.framework.api.CreateBuilder; import org.apache.curator.framework.api.DeleteBuilder; import org.apache.curator.framework.api.GetChildrenBuilder; -import org.apache.curator.framework.recipes.cache.TreeCache; -import org.apache.curator.framework.recipes.cache.TreeCacheEvent; -import org.apache.curator.framework.recipes.cache.TreeCacheListener; import org.apache.curator.retry.BoundedExponentialBackoffRetry; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -56,6 +56,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; +import java.io.UncheckedIOException; import java.util.List; /** @@ -109,9 +110,9 @@ public class CuratorService extends CompositeService private EnsembleProvider ensembleProvider; /** - * Registry tree cache. + * Registry Curator cache. */ - private TreeCache treeCache; + private CuratorCacheBridge curatorCacheBridge; /** * Construct the service. @@ -189,8 +190,8 @@ protected void serviceStart() throws Exception { protected void serviceStop() throws Exception { IOUtils.closeStream(curator); - if (treeCache != null) { - treeCache.close(); + if (curatorCacheBridge != null) { + curatorCacheBridge.close(); } super.serviceStop(); } @@ -824,73 +825,54 @@ protected String dumpRegistryRobustly(boolean verbose) { * * @param listener the listener. * @return a handle allowing for the management of the listener. - * @throws Exception if registration fails due to error. 
*/ - public ListenerHandle registerPathListener(final PathListener listener) - throws Exception { - - final TreeCacheListener pathChildrenCacheListener = - new TreeCacheListener() { - - public void childEvent(CuratorFramework curatorFramework, - TreeCacheEvent event) - throws Exception { - String path = null; - if (event != null && event.getData() != null) { - path = event.getData().getPath(); - } - assert event != null; - switch (event.getType()) { - case NODE_ADDED: - LOG.info("Informing listener of added node {}", path); - listener.nodeAdded(path); - - break; - - case NODE_REMOVED: - LOG.info("Informing listener of removed node {}", path); - listener.nodeRemoved(path); - - break; - - case NODE_UPDATED: - LOG.info("Informing listener of updated node {}", path); - listener.nodeAdded(path); - - break; - - default: - // do nothing - break; - - } + public ListenerHandle registerPathListener(final PathListener listener) { + + CuratorCacheListener cacheListener = CuratorCacheListener.builder() + .forCreatesAndChanges((oldNode, node) -> { + final String path = node.getPath(); + LOG.info("Informing listener of added/updated node {}", path); + try { + listener.nodeAdded(path); + } catch (IOException e) { + LOG.error("Error while processing Curator listener " + + "NODE_CREATED / NODE_CHANGED event"); + throw new UncheckedIOException(e); } - }; - treeCache.getListenable().addListener(pathChildrenCacheListener); - - return new ListenerHandle() { - @Override - public void remove() { - treeCache.getListenable().removeListener(pathChildrenCacheListener); - } - }; + }) + .forDeletes(childData -> { + final String path = childData.getPath(); + LOG.info("Informing listener of removed node {}", path); + try { + listener.nodeRemoved(path); + } catch (IOException e) { + LOG.error("Error while processing Curator listener " + + "NODE_DELETED event"); + throw new UncheckedIOException(e); + } + }) + .build(); + curatorCacheBridge.listenable().addListener(cacheListener); + return () -> curatorCacheBridge.listenable().removeListener(cacheListener); } // TODO: should caches be stopped and then restarted if need be? /** - * Create the tree cache that monitors the registry for node addition, update, - * and deletion. - * - * @throws Exception if any issue arises during monitoring. + * Instantiate the Curator cache that monitors the registry for node + * addition, update and deletion. 
*/ - public void monitorRegistryEntries() - throws Exception { + public void instantiateCacheForRegistry() { String registryPath = getConfig().get(RegistryConstants.KEY_REGISTRY_ZK_ROOT, RegistryConstants.DEFAULT_ZK_REGISTRY_ROOT); - treeCache = new TreeCache(curator, registryPath); - treeCache.start(); + curatorCacheBridge = CuratorCache.bridgeBuilder(curator, registryPath) + .build(); + } + + public void startCache() { + curatorCacheBridge.start(); } + } diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java index 1ff5f26b47207..8d0a38cfd47f9 100644 --- a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/RegistryDNSServer.java @@ -106,7 +106,7 @@ protected void serviceStart() throws Exception { private void manageRegistryDNS() { try { - registryOperations.monitorRegistryEntries(); + registryOperations.instantiateCacheForRegistry(); registryOperations.registerPathListener(new PathListener() { private String registryRoot = getConfig(). get(RegistryConstants.KEY_REGISTRY_ZK_ROOT, @@ -157,6 +157,7 @@ public void nodeRemoved(String path) throws IOException { } }); + registryOperations.startCache(); // create listener for record deletions diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java index 4a111187ac46a..dcb05159500ae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/security/token/ZKDelegationTokenSecretManagerImpl.java @@ -19,13 +19,24 @@ package org.apache.hadoop.hdfs.server.federation.router.security.token; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier; import org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager; +import org.apache.hadoop.util.Time; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; /** * Zookeeper based router delegation token store implementation. 
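The RegistryDNSServer change above registers its PathListener between instantiateCacheForRegistry() and startCache(). A rough sketch of why that ordering matters follows, under the assumption (consistent with Curator 5 behaviour) that a started CuratorCache replays pre-existing znodes to already-attached listeners as create events. It uses the plain CuratorCache.build() API rather than the bridge builder, and all names and paths are illustrative.

```java
import java.util.concurrent.CountDownLatch;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.CuratorCache;
import org.apache.curator.framework.recipes.cache.CuratorCacheListener;
import org.apache.curator.retry.RetryOneTime;

public class RegisterBeforeStartSketch {
  public static void main(String[] args) throws Exception {
    CuratorFramework client = CuratorFrameworkFactory.newClient(
        "localhost:2181", new RetryOneTime(1000));   // placeholder ensemble
    client.start();

    // Plain CuratorCache here for brevity; the patch uses bridgeBuilder().
    try (CuratorCache cache = CuratorCache.build(client, "/registry")) {
      CountDownLatch initialized = new CountDownLatch(1);
      CuratorCacheListener listener = CuratorCacheListener.builder()
          .forCreatesAndChanges((oldNode, node) ->
              System.out.println("node added/updated: " + node.getPath()))
          .forDeletes(node ->
              System.out.println("node removed: " + node.getPath()))
          .forInitialized(initialized::countDown)  // fires after initial load
          .build();

      // The listener is attached BEFORE start(), so znodes that already exist
      // are delivered to it as create events during the initial load.
      cache.listenable().addListener(listener);
      cache.start();
      initialized.await();
    }
    client.close();
  }
}
```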
@@ -33,24 +44,181 @@ public class ZKDelegationTokenSecretManagerImpl extends ZKDelegationTokenSecretManager { + public static final String ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL = + ZK_CONF_PREFIX + "router.token.sync.interval"; + public static final int ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT = 5; + private static final Logger LOG = LoggerFactory.getLogger(ZKDelegationTokenSecretManagerImpl.class); - private Configuration conf = null; + private Configuration conf; + + private final ScheduledExecutorService scheduler = + Executors.newSingleThreadScheduledExecutor(); + + // Local cache of delegation tokens, used for deprecating tokens from + // currentTokenMap + private final Set localTokenCache = + new HashSet<>(); + // Native zk client for getting all tokens + private ZooKeeper zookeeper; + private final String TOKEN_PATH = "/" + zkClient.getNamespace() + + ZK_DTSM_TOKENS_ROOT; + // The flag used to issue an extra check before deletion + // Since cancel token and token remover thread use the same + // API here and one router could have a token that is renewed + // by another router, thus token remover should always check ZK + // to confirm whether it has been renewed or not + private ThreadLocal checkAgainstZkBeforeDeletion = + new ThreadLocal() { + @Override + protected Boolean initialValue() { + return true; + } + }; public ZKDelegationTokenSecretManagerImpl(Configuration conf) { super(conf); this.conf = conf; try { - super.startThreads(); + startThreads(); } catch (IOException e) { LOG.error("Error starting threads for zkDelegationTokens", e); } LOG.info("Zookeeper delegation token secret manager instantiated"); } + @Override + public void startThreads() throws IOException { + super.startThreads(); + // start token cache related work when watcher is disabled + if (!isTokenWatcherEnabled()) { + LOG.info("Watcher for tokens is disabled in this secret manager"); + try { + // By default set this variable + checkAgainstZkBeforeDeletion.set(true); + // Ensure the token root path exists + if (zkClient.checkExists().forPath(ZK_DTSM_TOKENS_ROOT) == null) { + zkClient.create().creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath(ZK_DTSM_TOKENS_ROOT); + } + // Set up zookeeper client + try { + zookeeper = zkClient.getZookeeperClient().getZooKeeper(); + } catch (Exception e) { + LOG.info("Cannot get zookeeper client ", e); + } finally { + if (zookeeper == null) { + throw new IOException("Zookeeper client is null"); + } + } + + LOG.info("Start loading token cache"); + long start = Time.now(); + rebuildTokenCache(true); + LOG.info("Loaded token cache in {} milliseconds", Time.now() - start); + + int syncInterval = conf.getInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, + ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL_DEFAULT); + scheduler.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + try { + rebuildTokenCache(false); + } catch (Exception e) { + // ignore + } + } + }, syncInterval, syncInterval, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error("Error rebuilding local cache for zkDelegationTokens ", e); + } + } + } + + @Override + public void stopThreads() { + super.stopThreads(); + scheduler.shutdown(); + } + @Override public DelegationTokenIdentifier createIdentifier() { return new DelegationTokenIdentifier(); } + + /** + * This function will rebuild local token cache from zk storage. + * It is first called when the secret manager is initialized and + * then regularly at a configured interval. 
+ * + * @param initial whether this is called during initialization + * @throws IOException + */ + private void rebuildTokenCache(boolean initial) throws IOException { + localTokenCache.clear(); + // Use bare zookeeper client to get all children since curator will + // wrap the same API with a sorting process. This is time consuming given + // millions of tokens + List zkTokens; + try { + zkTokens = zookeeper.getChildren(TOKEN_PATH, false); + } catch (KeeperException | InterruptedException e) { + throw new IOException("Tokens cannot be fetched from path " + + TOKEN_PATH, e); + } + byte[] data; + for (String tokenPath : zkTokens) { + try { + data = zkClient.getData().forPath( + ZK_DTSM_TOKENS_ROOT + "/" + tokenPath); + } catch (KeeperException.NoNodeException e) { + LOG.debug("No node in path [" + tokenPath + "]"); + continue; + } catch (Exception ex) { + throw new IOException(ex); + } + // Store data to currentTokenMap + AbstractDelegationTokenIdentifier ident = processTokenAddOrUpdate(data); + // Store data to localTokenCache for sync + localTokenCache.add(ident); + } + if (!initial) { + // Sync zkTokens with local cache, specifically + // 1) add/update tokens to local cache from zk, which is done through + // processTokenAddOrUpdate above + // 2) remove tokens in local cache but not in zk anymore + for (AbstractDelegationTokenIdentifier ident : currentTokens.keySet()) { + if (!localTokenCache.contains(ident)) { + currentTokens.remove(ident); + } + } + } + } + + @Override + public AbstractDelegationTokenIdentifier cancelToken( + Token token, String canceller) + throws IOException { + checkAgainstZkBeforeDeletion.set(false); + AbstractDelegationTokenIdentifier ident = super.cancelToken(token, + canceller); + checkAgainstZkBeforeDeletion.set(true); + return ident; + } + + @Override + protected void removeStoredToken(AbstractDelegationTokenIdentifier ident) + throws IOException { + super.removeStoredToken(ident, checkAgainstZkBeforeDeletion.get()); + } + + @Override + protected void addOrUpdateToken(AbstractDelegationTokenIdentifier ident, + DelegationTokenInformation info, boolean isUpdate) throws Exception { + // Store the data in local memory first + currentTokens.put(ident, info); + super.addOrUpdateToken(ident, info, isUpdate); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java new file mode 100644 index 0000000000000..3c7f8e88a91d1 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/security/token/TestZKDelegationTokenSecretManagerImpl.java @@ -0,0 +1,234 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.security.token; + +import static org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl.ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL; +import static org.apache.hadoop.security.token.delegation.ZKDelegationTokenSecretManager.ZK_DTSM_TOKEN_WATCHER_ENABLED; +import static org.apache.hadoop.security.token.delegation.web.DelegationTokenManager.REMOVAL_SCAN_INTERVAL; +import static org.apache.hadoop.security.token.delegation.web.DelegationTokenManager.RENEW_INTERVAL; +import static org.junit.Assert.fail; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.router.security.token.ZKDelegationTokenSecretManagerImpl; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.delegation.TestZKDelegationTokenSecretManager; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier; +import org.apache.hadoop.security.token.delegation.web.DelegationTokenManager; +import org.apache.hadoop.util.Time; +import org.junit.Assert; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TestZKDelegationTokenSecretManagerImpl + extends TestZKDelegationTokenSecretManager { + private static final Logger LOG = + LoggerFactory.getLogger(TestZKDelegationTokenSecretManagerImpl.class); + + @SuppressWarnings("unchecked") + @Test + public void testMultiNodeOperationWithoutWatch() throws Exception { + String connectString = zkServer.getConnectString(); + Configuration conf = getSecretConf(connectString); + // disable watch + conf.setBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, false); + conf.setInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, 3); + + for (int i = 0; i < TEST_RETRIES; i++) { + ZKDelegationTokenSecretManagerImpl dtsm1 = + new ZKDelegationTokenSecretManagerImpl(conf); + ZKDelegationTokenSecretManagerImpl dtsm2 = + new ZKDelegationTokenSecretManagerImpl(conf); + DelegationTokenManager tm1, tm2; + tm1 = new DelegationTokenManager(conf, new Text("bla")); + tm1.setExternalDelegationTokenSecretManager(dtsm1); + tm2 = new DelegationTokenManager(conf, new Text("bla")); + tm2.setExternalDelegationTokenSecretManager(dtsm2); + + // common token operation without watchers should still be working + Token token = + (Token) tm1.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token); + tm2.verifyToken(token); + tm2.renewToken(token, "foo"); + tm1.verifyToken(token); + tm1.cancelToken(token, "foo"); + try { + verifyTokenFail(tm2, token); + fail("Expected InvalidToken"); + } catch (SecretManager.InvalidToken it) { + // Ignore + } + + token = (Token) tm2.createToken( + UserGroupInformation.getCurrentUser(), "bar"); + Assert.assertNotNull(token); + tm1.verifyToken(token); + tm1.renewToken(token, "bar"); + tm2.verifyToken(token); + tm2.cancelToken(token, "bar"); + try { + verifyTokenFail(tm1, token); + fail("Expected InvalidToken"); + } catch (SecretManager.InvalidToken it) { + // Ignore + } + + dtsm1.stopThreads(); + dtsm2.stopThreads(); + verifyDestroy(tm1, conf); + verifyDestroy(tm2, conf); + } + } + + @Test + public void testMultiNodeTokenRemovalShortSyncWithoutWatch() + throws Exception { + String connectString = 
zkServer.getConnectString(); + Configuration conf = getSecretConf(connectString); + // disable watch + conf.setBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, false); + // make sync quick + conf.setInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, 3); + // set the renew window and removal interval to be a + // short time to trigger the background cleanup + conf.setInt(RENEW_INTERVAL, 10); + conf.setInt(REMOVAL_SCAN_INTERVAL, 10); + + for (int i = 0; i < TEST_RETRIES; i++) { + ZKDelegationTokenSecretManagerImpl dtsm1 = + new ZKDelegationTokenSecretManagerImpl(conf); + ZKDelegationTokenSecretManagerImpl dtsm2 = + new ZKDelegationTokenSecretManagerImpl(conf); + DelegationTokenManager tm1, tm2; + tm1 = new DelegationTokenManager(conf, new Text("bla")); + tm1.setExternalDelegationTokenSecretManager(dtsm1); + tm2 = new DelegationTokenManager(conf, new Text("bla")); + tm2.setExternalDelegationTokenSecretManager(dtsm2); + + // time: X + // token expiry time: + // tm1: X + 10 + // tm2: X + 10 + Token token = + (Token) tm1.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token); + tm2.verifyToken(token); + + // time: X + 9 + // token expiry time: + // tm1: X + 10 + // tm2: X + 19 + Thread.sleep(9 * 1000); + tm2.renewToken(token, "foo"); + tm1.verifyToken(token); + + // time: X + 13 + // token expiry time: (sync happened) + // tm1: X + 19 + // tm2: X + 19 + Thread.sleep(4 * 1000); + tm1.verifyToken(token); + tm2.verifyToken(token); + + dtsm1.stopThreads(); + dtsm2.stopThreads(); + verifyDestroy(tm1, conf); + verifyDestroy(tm2, conf); + } + } + + // This is very unlikely to happen in real case, but worth putting + // the case out + @Test + public void testMultiNodeTokenRemovalLongSyncWithoutWatch() + throws Exception { + String connectString = zkServer.getConnectString(); + Configuration conf = getSecretConf(connectString); + // disable watch + conf.setBoolean(ZK_DTSM_TOKEN_WATCHER_ENABLED, false); + // make sync quick + conf.setInt(ZK_DTSM_ROUTER_TOKEN_SYNC_INTERVAL, 20); + // set the renew window and removal interval to be a + // short time to trigger the background cleanup + conf.setInt(RENEW_INTERVAL, 10); + conf.setInt(REMOVAL_SCAN_INTERVAL, 10); + + for (int i = 0; i < TEST_RETRIES; i++) { + ZKDelegationTokenSecretManagerImpl dtsm1 = + new ZKDelegationTokenSecretManagerImpl(conf); + ZKDelegationTokenSecretManagerImpl dtsm2 = + new ZKDelegationTokenSecretManagerImpl(conf); + ZKDelegationTokenSecretManagerImpl dtsm3 = + new ZKDelegationTokenSecretManagerImpl(conf); + DelegationTokenManager tm1, tm2, tm3; + tm1 = new DelegationTokenManager(conf, new Text("bla")); + tm1.setExternalDelegationTokenSecretManager(dtsm1); + tm2 = new DelegationTokenManager(conf, new Text("bla")); + tm2.setExternalDelegationTokenSecretManager(dtsm2); + tm3 = new DelegationTokenManager(conf, new Text("bla")); + tm3.setExternalDelegationTokenSecretManager(dtsm3); + + // time: X + // token expiry time: + // tm1: X + 10 + // tm2: X + 10 + // tm3: No token due to no sync + Token token = + (Token) tm1.createToken( + UserGroupInformation.getCurrentUser(), "foo"); + Assert.assertNotNull(token); + tm2.verifyToken(token); + + // time: X + 9 + // token expiry time: + // tm1: X + 10 + // tm2: X + 19 + // tm3: No token due to no sync + Thread.sleep(9 * 1000); + long renewalTime = tm2.renewToken(token, "foo"); + LOG.info("Renew for token {} at current time {} renewal time {}", + token.getIdentifier(), Time.formatTime(Time.now()), + Time.formatTime(renewalTime)); + tm1.verifyToken(token); + + // time: X + 13 + // token 
expiry time: (sync didn't happen) + // tm1: X + 10 + // tm2: X + 19 + // tm3: X + 19 due to fetch from zk + Thread.sleep(4 * 1000); + tm2.verifyToken(token); + tm3.verifyToken(token); + + dtsm1.stopThreads(); + dtsm2.stopThreads(); + dtsm3.stopThreads(); + verifyDestroy(tm1, conf); + verifyDestroy(tm2, conf); + verifyDestroy(tm3, conf); + } + } + +}
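The ZKDelegationTokenSecretManagerImpl changes above poll ZooKeeper on a fixed schedule and reconcile the local token map against the children of the tokens znode. A condensed, hypothetical sketch of that reconcile-on-a-timer pattern follows; unlike the patch, it reads node data with the bare ZooKeeper client as well (the patch only lists children that way and fetches data through Curator), and the class, path, and field names are invented for illustration.

```java
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

/** Periodically reconciles a local map with the children of a ZK path. */
public class ZkCacheReconciler {
  private final ZooKeeper zk;            // assumed already connected
  private final String parentPath;       // e.g. "/some/tokens" (placeholder)
  private final ConcurrentHashMap<String, byte[]> localCache =
      new ConcurrentHashMap<>();
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  public ZkCacheReconciler(ZooKeeper zk, String parentPath) {
    this.zk = zk;
    this.parentPath = parentPath;
  }

  public void start(long intervalSeconds) {
    scheduler.scheduleAtFixedRate(() -> {
      try {
        reconcile();
      } catch (Exception e) {
        // Skip this round; the next run retries, mirroring the best-effort
        // scheduling of rebuildTokenCache(false) in the patch.
      }
    }, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
  }

  private void reconcile() throws IOException, InterruptedException {
    Set<String> seen = new HashSet<>();
    List<String> children;
    try {
      children = zk.getChildren(parentPath, false);   // no watch, plain listing
    } catch (KeeperException e) {
      throw new IOException("Cannot list " + parentPath, e);
    }
    for (String child : children) {
      try {
        byte[] data = zk.getData(parentPath + "/" + child, false, null);
        localCache.put(child, data);                  // add or refresh
        seen.add(child);
      } catch (KeeperException.NoNodeException e) {
        // Deleted between getChildren() and getData(); ignore.
      } catch (KeeperException e) {
        throw new IOException(e);
      }
    }
    // Drop entries that no longer exist in ZooKeeper.
    localCache.keySet().retainAll(seen);
  }

  public void stop() {
    scheduler.shutdown();
  }
}
```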