From 146d23e91a70e403bb538024ce68c8f15d111192 Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Tue, 13 Sep 2022 01:13:11 -0700 Subject: [PATCH 1/3] HDFS-16767: RBF: Support observer node in Router-Based Federation. --- .../metrics/FederationRPCMBean.java | 4 + .../metrics/FederationRPCMetrics.java | 24 +- .../FederationRPCPerformanceMonitor.java | 6 +- .../server/federation/metrics/RBFMetrics.java | 2 +- .../resolver/ActiveNamenodeResolver.java | 30 +- .../resolver/MembershipNamenodeResolver.java | 103 +++-- .../federation/router/ConnectionPool.java | 2 +- .../federation/router/RBFConfigKeys.java | 8 + .../router/RouterClientProtocol.java | 5 +- .../federation/router/RouterRpcClient.java | 200 ++++++++- .../federation/router/RouterRpcMonitor.java | 4 +- .../federation/router/RouterRpcServer.java | 23 +- .../src/main/resources/hdfs-rbf-default.xml | 16 + .../federation/FederationTestUtils.java | 4 +- .../federation/MiniRouterDFSCluster.java | 22 + .../hdfs/server/federation/MockResolver.java | 50 ++- ...RouterRefreshFairnessPolicyController.java | 3 +- .../resolver/TestNamenodeResolver.java | 14 +- .../router/TestObserverWithRouter.java | 425 ++++++++++++++++++ .../router/TestRouterNamenodeHeartbeat.java | 6 +- .../router/TestRouterNamenodeMonitoring.java | 2 +- .../router/TestRouterNamenodeWebScheme.java | 2 +- .../router/TestRouterRPCClientRetries.java | 2 +- .../apache/hadoop/hdfs/MiniDFSCluster.java | 2 + 24 files changed, 862 insertions(+), 97 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java index 979e7504a872b..65c6c34eb2ff6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMBean.java @@ -30,6 +30,10 @@ public interface FederationRPCMBean { long getProxyOps(); + long getActiveProxyOps(); + + long getObserverProxyOps(); + double getProxyAvg(); long getProcessingOps(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java index 823bc7b8af21c..5d5f9fb8aa12a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCMetrics.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.annotation.Metric; @@ -49,7 +50,10 @@ public class FederationRPCMetrics implements FederationRPCMBean { private MutableRate proxy; @Metric("Number of operations the Router proxied to a Namenode") private MutableCounterLong 
proxyOp; - + @Metric("Number of operations the Router proxied to an Active Namenode") + private MutableCounterLong activeProxyOp; + @Metric("Number of operations the Router proxied to an Observer Namenode") + private MutableCounterLong observerProxyOp; @Metric("Number of operations to hit a standby NN") private MutableCounterLong proxyOpFailureStandby; @Metric("Number of operations to fail to reach NN") @@ -256,9 +260,15 @@ public String getAsyncCallerPool() { * Add the time to proxy an operation from the moment the Router sends it to * the Namenode until it replied. * @param time Proxy time of an operation in nanoseconds. + * @param state NameNode state. May be null. */ - public void addProxyTime(long time) { + public void addProxyTime(long time, FederationNamenodeServiceState state) { proxy.add(time); + if (FederationNamenodeServiceState.ACTIVE == state) { + activeProxyOp.incr(); + } else if (FederationNamenodeServiceState.OBSERVER == state) { + observerProxyOp.incr(); + } proxyOp.incr(); } @@ -272,6 +282,16 @@ public long getProxyOps() { return proxyOp.value(); } + @Override + public long getActiveProxyOps() { + return activeProxyOp.value(); + } + + @Override + public long getObserverProxyOps() { + return observerProxyOp.value(); + } + /** * Add the time to process a request in the Router from the time we receive * the call until we send it to the Namenode. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java index 159d08e26a161..b57fa070546e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/FederationRPCPerformanceMonitor.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.router.FederationUtil; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcMonitor; import org.apache.hadoop.hdfs.server.federation.router.RouterRpcServer; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; @@ -147,12 +148,13 @@ public long proxyOp() { } @Override - public void proxyOpComplete(boolean success, String nsId) { + public void proxyOpComplete(boolean success, String nsId, + FederationNamenodeServiceState state) { if (success) { long proxyTime = getProxyTime(); if (proxyTime >= 0) { if (metrics != null) { - metrics.addProxyTime(proxyTime); + metrics.addProxyTime(proxyTime, state); } if (nameserviceRPCMetricsMap != null && nameserviceRPCMetricsMap.containsKey(nsId)) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java index a9d761f45d289..e1068394f6f42 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/metrics/RBFMetrics.java @@ -886,7 +886,7 @@ private List getActiveNamenodeRegistrations() // Fetch the most recent namenode registration
String nsId = nsInfo.getNameserviceId(); List nns = - namenodeResolver.getNamenodesForNameserviceId(nsId); + namenodeResolver.getNamenodesForNameserviceId(nsId, false); if (nns != null) { FederationNamenodeContext nn = nns.get(0); if (nn instanceof MembershipState) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java index f06df70b517cf..81335863b5f58 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java @@ -43,6 +43,17 @@ @InterfaceStability.Evolving public interface ActiveNamenodeResolver { + /** + * Report a failed, unavailable NN address for a nameservice or blockPool. + * + * @param ns Nameservice identifier. + * @param failedAddress The address of the namenode that failed to respond to the command. + * + * @throws IOException If the state store cannot be accessed. + */ + void updateUnavailableNamenode( + String ns, InetSocketAddress failedAddress) throws IOException; + /** * Report a successful, active NN address for a nameservice or blockPool. * @@ -56,20 +67,30 @@ void updateActiveNamenode( /** * Returns a prioritized list of the most recent cached registration entries - * for a single nameservice ID. - * Returns an empty list if none are found. Returns entries in preference of: + * for a single nameservice ID. Returns an empty list if none are found. + * When observerRead is false, returns entries in preference of: + * the most recently updated ACTIVE, then OBSERVER, then STANDBY, then UNAVAILABLE registrations. + * + * When observerRead is true, returns entries in preference of: + * the most recently updated OBSERVER, then ACTIVE, then STANDBY, then UNAVAILABLE registrations. + * * * @param nameserviceId Nameservice identifier. + * @param observerRead Observer read case; observer NNs will be ranked first. * @return Prioritized list of namenode contexts. * @throws IOException If the state store cannot be accessed. */ - List - getNamenodesForNameserviceId(String nameserviceId) throws IOException; + List getNamenodesForNameserviceId( + String nameserviceId, boolean observerRead) throws IOException; /** * Returns a prioritized list of the most recent cached registration entries * @@ -77,6 +98,7 @@ void updateActiveNamenode( * Returns an empty list if none are found.
Returns entries in preference of: * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java index 9f0f78067aedd..ff43580c6538a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.ACTIVE; import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.EXPIRED; +import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.OBSERVER; import static org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState.UNAVAILABLE; import java.io.IOException; @@ -73,8 +74,13 @@ public class MembershipNamenodeResolver /** Parent router ID. */ private String routerId; - /** Cached lookup of NN for nameservice. Invalidated on cache refresh. */ + /** Cached lookup of NN for nameservice with active state ranked first. + * Invalidated on cache refresh. */ private Map> cacheNS; + /** Cached lookup of NN for nameservice with observer state ranked first. + * Invalidated on cache refresh. */ + private Map> + observerFirstCacheNS; /** Cached lookup of NN for block pool. Invalidated on cache refresh. */ private Map> cacheBP; @@ -84,6 +90,7 @@ public MembershipNamenodeResolver( this.stateStore = store; this.cacheNS = new ConcurrentHashMap<>(); + this.observerFirstCacheNS = new ConcurrentHashMap<>(); this.cacheBP = new ConcurrentHashMap<>(); if (this.stateStore != null) { @@ -133,14 +140,25 @@ public boolean loadCache(boolean force) { // Force refresh of active NN cache cacheBP.clear(); cacheNS.clear(); + observerFirstCacheNS.clear(); return true; } + @Override public void updateUnavailableNamenode(String nsId, + InetSocketAddress address) throws IOException { + updateNameNodeState(nsId, address, UNAVAILABLE); + } + @Override public void updateActiveNamenode( final String nsId, final InetSocketAddress address) throws IOException { + updateNameNodeState(nsId, address, ACTIVE); + } - // Called when we have an RPC miss and successful hit on an alternate NN. + + private void updateNameNodeState(final String nsId, + final InetSocketAddress address, FederationNamenodeServiceState state) + throws IOException { // Temporarily update our cache, it will be overwritten on the next update. try { MembershipState partial = MembershipState.newInstance(); @@ -160,10 +178,11 @@ public void updateActiveNamenode( MembershipState record = records.get(0); UpdateNamenodeRegistrationRequest updateRequest = UpdateNamenodeRegistrationRequest.newInstance( - record.getNameserviceId(), record.getNamenodeId(), ACTIVE); + record.getNameserviceId(), record.getNamenodeId(), state); membership.updateNamenodeRegistration(updateRequest); cacheNS.remove(nsId); + observerFirstCacheNS.remove(nsId); // Invalidating the full cacheBp since getting the blockpool id from // namespace id is quite costly. 
cacheBP.clear(); @@ -175,9 +194,11 @@ public void updateActiveNamenode( @Override public List getNamenodesForNameserviceId( - final String nsId) throws IOException { + final String nsId, boolean observerRead) throws IOException { + Map> cache + = observerRead ? observerFirstCacheNS : cacheNS; - List ret = cacheNS.get(nsId); + List ret = cache.get(nsId); if (ret != null) { return ret; } @@ -189,7 +210,8 @@ public List getNamenodesForNameserviceId( partial.setNameserviceId(nsId); GetNamenodeRegistrationsRequest request = GetNamenodeRegistrationsRequest.newInstance(partial); - result = getRecentRegistrationForQuery(request, true, false); + result = getRecentRegistrationForQuery(request, true, + false, observerRead); } catch (StateStoreUnavailableException e) { LOG.error("Cannot get active NN for {}, State Store unavailable", nsId); return null; @@ -218,7 +240,7 @@ public List getNamenodesForNameserviceId( // Cache the response ret = Collections.unmodifiableList(result); - cacheNS.put(nsId, result); + cache.put(nsId, result); return ret; } @@ -235,7 +257,7 @@ public List getNamenodesForBlockPoolId( GetNamenodeRegistrationsRequest.newInstance(partial); final List result = - getRecentRegistrationForQuery(request, true, false); + getRecentRegistrationForQuery(request, true, false, false); if (result == null || result.isEmpty()) { LOG.error("Cannot locate eligible NNs for {}", bpId); } else { @@ -346,22 +368,34 @@ public Set getDisabledNamespaces() throws IOException { } /** - * Picks the most relevant record registration that matches the query. Return - * registrations matching the query in this preference: 1) Most recently - * updated ACTIVE registration 2) Most recently updated STANDBY registration - * (if showStandby) 3) Most recently updated UNAVAILABLE registration (if - * showUnavailable). EXPIRED registrations are ignored. + * Picks the most relevant record registration that matches the query. + * If not observer read, + * return registrations matching the query in this preference: + * 1) Most recently updated ACTIVE registration + * 2) Most recently updated Observer registration + * 3) Most recently updated STANDBY registration (if showStandby) + * 4) Most recently updated UNAVAILABLE registration (if showUnavailable). + * + * If observer read, + * return registrations matching the query in this preference: + * 1) Most recently updated Observer registration + * 2) Most recently updated ACTIVE registration + * 3) Most recently updated STANDBY registration (if showStandby) + * 4) Most recently updated UNAVAILABLE registration (if showUnavailable). + * + * EXPIRED registrations are ignored. * * @param request The select query for NN registrations. * @param addUnavailable include UNAVAILABLE registrations. * @param addExpired include EXPIRED registrations. + * @param observerRead Observer read case, observer NN will be ranked first * @return List of memberships or null if no registrations that * both match the query AND the selected states. * @throws IOException */ private List getRecentRegistrationForQuery( GetNamenodeRegistrationsRequest request, boolean addUnavailable, - boolean addExpired) throws IOException { + boolean addExpired, boolean observerRead) throws IOException { // Retrieve a list of all registrations that match this query. 
// This may include all NN records for a namespace/blockpool, including @@ -371,24 +405,37 @@ private List getRecentRegistrationForQuery( membershipStore.getNamenodeRegistrations(request); List memberships = response.getNamenodeMemberships(); - if (!addExpired || !addUnavailable) { - Iterator iterator = memberships.iterator(); - while (iterator.hasNext()) { - MembershipState membership = iterator.next(); - if (membership.getState() == EXPIRED && !addExpired) { - iterator.remove(); - } else if (membership.getState() == UNAVAILABLE && !addUnavailable) { - iterator.remove(); - } + List observerMemberships = new ArrayList<>(); + Iterator iterator = memberships.iterator(); + while (iterator.hasNext()) { + MembershipState membership = iterator.next(); + if (membership.getState() == EXPIRED && !addExpired) { + iterator.remove(); + } else if (membership.getState() == UNAVAILABLE && !addUnavailable) { + iterator.remove(); + } else if (membership.getState() == OBSERVER && observerRead) { + iterator.remove(); + observerMemberships.add(membership); } } - List priorityList = new ArrayList<>(); - priorityList.addAll(memberships); - Collections.sort(priorityList, new NamenodePriorityComparator()); + if(!observerRead) { + Collections.sort(memberships, new NamenodePriorityComparator()); + LOG.debug("Selected most recent NN {} for query", memberships); + return memberships; + } else { + List ret = new ArrayList<>( + memberships.size() + observerMemberships.size()); + Collections.sort(memberships, new NamenodePriorityComparator()); + if(observerMemberships.size() > 1) { + Collections.shuffle(observerMemberships); + } + ret.addAll(observerMemberships); + ret.addAll(memberships); - LOG.debug("Selected most recent NN {} for query", priorityList); - return priorityList; + LOG.debug("Selected most recent NN {} for query", ret); + return ret; + } } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index 9a9abff0677ba..ef3580b35d6f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -218,7 +218,7 @@ public AtomicInteger getClientIndex() { } /** - * Get the alignment context for this pool + * Get the alignment context for this pool. 
* @return Alignment context */ public PoolAlignmentContext getPoolAlignmentContext() { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 24a85c2d558b3..74d12d006ddba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -191,6 +191,14 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { FEDERATION_STORE_PREFIX + "enable"; public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = true; + public static final String DFS_ROUTER_OBSERVER_READ_ENABLE = + FEDERATION_ROUTER_PREFIX + "observer.read.enable"; + public static final boolean DFS_ROUTER_OBSERVER_READ_ENABLE_DEFAULT = false; + + public static final String DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD = + FEDERATION_ROUTER_PREFIX + "observer.auto-msync-period"; + public static final long DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD_DEFAULT = 0; + public static final String DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE = FEDERATION_ROUTER_PREFIX + "observer.federated.state.propagation.maxsize"; public static final int DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT = 5; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java index 9d3973d450bdd..a5f83c95b7baf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java @@ -1918,7 +1918,10 @@ public BatchedEntries listOpenFiles(long prevId, @Override public void msync() throws IOException { - rpcServer.checkOperation(NameNode.OperationCategory.READ, false); + rpcServer.checkOperation(NameNode.OperationCategory.READ, true); + Set nss = namenodeResolver.getNamespaces(); + RemoteMethod method = new RemoteMethod("msync"); + rpcClient.invokeConcurrent(nss, method); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 918a46f80ca05..36a1b18bd8376 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -38,6 +38,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -63,6 +64,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import 
org.apache.hadoop.hdfs.protocol.SnapshotException; import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController; @@ -70,18 +72,22 @@ import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation; +import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision; import org.apache.hadoop.ipc.CallerContext; +import org.apache.hadoop.ipc.ObserverRetryOnActiveException; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RetriableException; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.net.ConnectTimeoutException; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; import org.eclipse.jetty.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -128,6 +134,14 @@ public class RouterRpcClient { private final RouterRpcMonitor rpcMonitor; /** Field separator of CallerContext. */ private final String contextFieldSeparator; + /** Observer read enabled. Default for all nameservices. */ + private boolean observerReadEnabled; + /** Nameservice specific override for enabling or disabling observer read. */ + private Map nsObserverReadEnabled = new ConcurrentHashMap<>(); + /** Auto msync period. */ + private long autoMsyncPeriodMs; + /** Last msync times. */ + private Map lastMsyncTimes; /** Pattern to parse a stack trace line. */ private static final Pattern STACK_TRACE_PATTERN = @@ -139,6 +153,16 @@ public class RouterRpcClient { private Map acceptedPermitsPerNs = new ConcurrentHashMap<>(); private final boolean enableProxyUser; + + private static final Method MSYNC_METHOD; + + static { + try { + MSYNC_METHOD = ClientProtocol.class.getDeclaredMethod("msync"); + } catch (NoSuchMethodException e) { + throw new RuntimeException("Failed to create msync method instance.", e); + } + } /** * Create a router RPC client to manage remote procedure calls to NNs. 
@@ -200,6 +224,22 @@ public RouterRpcClient(Configuration conf, Router router, failoverSleepBaseMillis, failoverSleepMaxMillis); String[] ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS); this.enableProxyUser = ipProxyUsers != null && ipProxyUsers.length > 0; + this.observerReadEnabled = conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_ENABLE, + RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_ENABLE_DEFAULT); + Map observerReadOverrides = conf + .getPropsWithPrefix(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_ENABLE + "."); + observerReadOverrides + .forEach((nsId, readEnabled) -> + nsObserverReadEnabled.put(nsId, Boolean.valueOf(readEnabled))); + if (this.observerReadEnabled) { + LOG.info("Observer read is enabled for router."); + this.autoMsyncPeriodMs = conf.getTimeDuration( + RBFConfigKeys.DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD, + RBFConfigKeys.DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD_DEFAULT, + TimeUnit.MILLISECONDS); + this.lastMsyncTimes = new HashMap<>(); + } } /** @@ -452,6 +492,7 @@ private RetryDecision shouldRetry(final IOException ioe, final int retryCount, * @param namenodes A prioritized list of namenodes within the same * nameservice. * @param method Remote ClientProtocol method to invoke. + * @param skipObserver Skip observer namenodes. * @param params Variable list of parameters matching the method. * @return The result of invoking the method. * @throws ConnectException If it cannot connect to any Namenode. @@ -462,7 +503,8 @@ private RetryDecision shouldRetry(final IOException ioe, final int retryCount, public Object invokeMethod( final UserGroupInformation ugi, final List namenodes, - final Class protocol, final Method method, final Object... params) + final Class protocol, final Method method, boolean skipObserver, + final Object... params) throws ConnectException, StandbyException, IOException { if (namenodes == null || namenodes.isEmpty()) { @@ -478,8 +520,13 @@ public Object invokeMethod( rpcMonitor.proxyOp(); } boolean failover = false; + boolean tryActive = false; Map ioes = new LinkedHashMap<>(); for (FederationNamenodeContext namenode : namenodes) { + if ((tryActive || skipObserver) + && namenode.getState() == FederationNamenodeServiceState.OBSERVER) { + continue; + } ConnectionContext connection = null; String nsId = namenode.getNameserviceId(); String rpcAddress = namenode.getRpcAddress(); @@ -489,13 +536,14 @@ public Object invokeMethod( final Object proxy = client.getProxy(); ret = invoke(nsId, 0, method, proxy, params); - if (failover) { + if (failover && + FederationNamenodeServiceState.OBSERVER != namenode.getState()) { // Success on alternate server, update InetSocketAddress address = client.getAddress(); namenodeResolver.updateActiveNamenode(nsId, address); } if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(true, nsId); + this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState()); } if (this.router.getRouterClientMetrics() != null) { this.router.getRouterClientMetrics().incInvokedMethod(method); @@ -503,7 +551,11 @@ public Object invokeMethod( return ret; } catch (IOException ioe) { ioes.put(namenode, ioe); - if (ioe instanceof StandbyException) { + if (ioe instanceof ObserverRetryOnActiveException) { + LOG.info("Encountered ObserverRetryOnActiveException from {}." 
+ + " Retry active namenode directly."); + tryActive = true; + } else if (ioe instanceof StandbyException) { // Fail over indicated by retry policy and/or NN if (this.rpcMonitor != null) { this.rpcMonitor.proxyOpFailureStandby(nsId); @@ -513,10 +565,15 @@ public Object invokeMethod( if (this.rpcMonitor != null) { this.rpcMonitor.proxyOpFailureCommunicate(nsId); } - failover = true; + if (FederationNamenodeServiceState.OBSERVER == namenode.getState()) { + namenodeResolver.updateUnavailableNamenode(nsId, + NetUtils.createSocketAddr(namenode.getRpcAddress())); + } else { + failover = true; + } } else if (ioe instanceof RemoteException) { if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(true, nsId); + this.rpcMonitor.proxyOpComplete(true, nsId, namenode.getState()); } RemoteException re = (RemoteException) ioe; ioe = re.unwrapRemoteException(); @@ -546,7 +603,7 @@ public Object invokeMethod( // Communication retries are handled by the retry policy if (this.rpcMonitor != null) { this.rpcMonitor.proxyOpFailureCommunicate(nsId); - this.rpcMonitor.proxyOpComplete(false, nsId); + this.rpcMonitor.proxyOpComplete(false, nsId, namenode.getState()); } throw ioe; } @@ -557,7 +614,7 @@ public Object invokeMethod( } } if (this.rpcMonitor != null) { - this.rpcMonitor.proxyOpComplete(false, null); + this.rpcMonitor.proxyOpComplete(false, null, null); } // All namenodes were unavailable or in standby @@ -713,7 +770,7 @@ public static boolean isUnavailableException(IOException ioe) { */ private boolean isClusterUnAvailable(String nsId) throws IOException { List nnState = this.namenodeResolver - .getNamenodesForNameserviceId(nsId); + .getNamenodesForNameserviceId(nsId, false); if (nnState != null) { for (FederationNamenodeContext nnContext : nnState) { @@ -844,13 +901,15 @@ public Object invokeSingle(final String nsId, RemoteMethod method) RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); acquirePermit(nsId, ugi, method, controller); try { - List nns = - getNamenodesForNameservice(nsId); + boolean isObserverRead = nsObserverReadEnabled.getOrDefault(nsId, observerReadEnabled) + && isReadCall(method.getMethod()); + List nns = msync(nsId, ugi, + isObserverRead); RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/"); Class proto = method.getProtocol(); Method m = method.getMethod(); Object[] params = method.getParams(loc); - return invokeMethod(ugi, nns, proto, m, params); + return invokeMethod(ugi, nns, proto, m, !isObserverRead, params); } finally { releasePermit(nsId, ugi, method, controller); } @@ -927,7 +986,7 @@ public T invokeSingle(final RemoteLocationContext location, * @throws IOException if the success condition is not met and one of the RPC * calls generated a remote exception. 
*/ - public Object invokeSequential( + public T invokeSequential( final List locations, final RemoteMethod remoteMethod) throws IOException { return invokeSequential(locations, remoteMethod, null, null); @@ -1012,12 +1071,15 @@ public RemoteResult invokeSequential( for (final RemoteLocationContext loc : locations) { String ns = loc.getNameserviceId(); acquirePermit(ns, ugi, remoteMethod, controller); + boolean isObserverRead = nsObserverReadEnabled.getOrDefault(ns, observerReadEnabled) + && isReadCall(m); List namenodes = - getNamenodesForNameservice(ns); + msync(ns, ugi, isObserverRead); try { Class proto = remoteMethod.getProtocol(); Object[] params = remoteMethod.getParams(loc); - Object result = invokeMethod(ugi, namenodes, proto, m, params); + Object result = invokeMethod( + ugi, namenodes, proto, m, !isObserverRead, params); // Check if the result is what we expected if (isExpectedClass(expectedResultClass, result) && isExpectedValue(expectedResultValue, result)) { @@ -1373,12 +1435,15 @@ public Map invokeConcurrent( String ns = location.getNameserviceId(); RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); acquirePermit(ns, ugi, method, controller); + boolean isObserverRead = nsObserverReadEnabled.getOrDefault(ns, observerReadEnabled) + && isReadCall(m); final List namenodes = - getNamenodesForNameservice(ns); + msync(ns, ugi, isObserverRead); try { Class proto = method.getProtocol(); Object[] paramList = method.getParams(location); - R result = (R) invokeMethod(ugi, namenodes, proto, m, paramList); + R result = (R) invokeMethod( + ugi, namenodes, proto, m, !isObserverRead, paramList); RemoteResult remoteResult = new RemoteResult<>(location, result); return Collections.singletonList(remoteResult); } catch (IOException ioe) { @@ -1396,8 +1461,10 @@ public Map invokeConcurrent( final CallerContext originContext = CallerContext.getCurrent(); for (final T location : locations) { String nsId = location.getNameserviceId(); + boolean isObserverRead = nsObserverReadEnabled.getOrDefault(nsId, observerReadEnabled) + && isReadCall(m); final List namenodes = - getNamenodesForNameservice(nsId); + msync(nsId, ugi, isObserverRead); final Class proto = method.getProtocol(); final Object[] paramList = method.getParams(location); if (standby) { @@ -1414,7 +1481,8 @@ public Map invokeConcurrent( callables.add( () -> { transferThreadLocalContext(originCall, originContext); - return invokeMethod(ugi, nnList, proto, m, paramList); + return invokeMethod( + ugi, nnList, proto, m, !isObserverRead, paramList); }); } } else { @@ -1423,7 +1491,8 @@ public Map invokeConcurrent( callables.add( () -> { transferThreadLocalContext(originCall, originContext); - return invokeMethod(ugi, namenodes, proto, m, paramList); + return invokeMethod( + ugi, namenodes, proto, m, !isObserverRead, paramList); }); } } @@ -1514,17 +1583,21 @@ private void transferThreadLocalContext( /** * Get a prioritized list of NNs that share the same nameservice ID (in the - * same namespace). NNs that are reported as ACTIVE will be first in the list. + * same namespace). + * In observer read case, OBSERVER NNs will be first in the list. + * Otherwise, ACTIVE NNs will be first in the list. * * @param nsId The nameservice ID for the namespace. + * @param observerRead Read on observer namenode. * @return A prioritized list of NNs to use for communication. * @throws IOException If a NN cannot be located for the nameservice ID. 
*/ private List getNamenodesForNameservice( - final String nsId) throws IOException { + final String nsId, boolean observerRead) throws IOException { final List namenodes = - namenodeResolver.getNamenodesForNameserviceId(nsId); + namenodeResolver.getNamenodesForNameserviceId(nsId, + observerRead); if (namenodes == null || namenodes.isEmpty()) { throw new IOException("Cannot locate a registered namenode for " + nsId + @@ -1670,4 +1743,85 @@ private String getCurrentFairnessPolicyControllerClassName() { } return null; } + + private List msync(String ns, + UserGroupInformation ugi, boolean isObserverRead) throws IOException { + final List namenodes = + getNamenodesForNameservice(ns, isObserverRead); + if (autoMsyncPeriodMs < 0) { + LOG.debug("Skipping msync because " + + RBFConfigKeys.DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD + + " is less than 0"); + return namenodes; // no need for msync + } + + if (RouterStateIdContext.getClientStateIdFromCurrentCall(ns) > Long.MIN_VALUE) { + LOG.debug("Skipping msync because client used FederatedGSIContext."); + return namenodes; + } + + if (isObserverRead) { + long callStartTime = callTime(); + + LongHolder latestMsyncTime = lastMsyncTimes.get(ns); + + if (latestMsyncTime == null) { + // initialize + synchronized (lastMsyncTimes) { + latestMsyncTime = lastMsyncTimes.get(ns); + if(latestMsyncTime == null) { + latestMsyncTime = new LongHolder(0L); + lastMsyncTimes.put(ns, latestMsyncTime); + } + } + } + + if (callStartTime - latestMsyncTime.getValue() > autoMsyncPeriodMs) { + synchronized (latestMsyncTime) { + if (callStartTime - latestMsyncTime.getValue() > autoMsyncPeriodMs) { + long requestTime = Time.monotonicNow(); + invokeMethod(ugi, namenodes, ClientProtocol.class, MSYNC_METHOD, + true, new Object[0]); + latestMsyncTime.setValue(requestTime); + } + } + } + } + return namenodes; + } + + private static long callTime() { + Call call = Server.getCurCall().get(); + if(call != null) { + return call.getTimestampNanos() / 1000000L; + } + return Time.monotonicNow(); + } + + /** + * Check if a method is read-only. + * @return whether the 'method' is a read-only operation. + */ + private static boolean isReadCall(Method method) { + if (!method.isAnnotationPresent(ReadOnly.class)) { + return false; + } + return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly(); + } + + private final static class LongHolder { + private long value; + + LongHolder(long value) { + this.value = value; + } + + public void setValue(long value) { + this.value = value; + } + + public long getValue() { + return value; + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java index 039b40ae2e585..256f03f12ff38 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcMonitor.java @@ -19,6 +19,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.metrics.FederationRPCMetrics; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; import org.apache.hadoop.hdfs.server.federation.store.StateStoreService; /** @@ -61,8 +62,9 @@ void init( /** * Mark a proxy operation as completed. 
* @param success If the operation was successful. + * @param state proxy namenode state. */ - void proxyOpComplete(boolean success, String nsId); + void proxyOpComplete(boolean success, String nsId, FederationNamenodeServiceState state); /** * Failed to proxy an operation to a Namenode because it was in standby. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 86fda12307cec..b373ae61a92db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -311,7 +311,7 @@ public RouterRpcServer(Configuration conf, Router router, GetUserMappingsProtocolProtos.GetUserMappingsProtocolService. newReflectiveBlockingService(getUserMappingXlator); - InetSocketAddress confRpcAddress = conf.getSocketAddr( + InetSocketAddress confRpcAddress = this.conf.getSocketAddr( RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_DEFAULT, @@ -337,18 +337,17 @@ public RouterRpcServer(Configuration conf, Router router, .build(); // Add all the RPC protocols that the Router implements - DFSUtil.addPBProtocol( - conf, NamenodeProtocolPB.class, nnPbService, this.rpcServer); - DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, + DFSUtil.addPBProtocol(this.conf, NamenodeProtocolPB.class, nnPbService, this.rpcServer); + DFSUtil.addPBProtocol(this.conf, RefreshUserMappingsProtocolPB.class, refreshUserMappingService, this.rpcServer); - DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, + DFSUtil.addPBProtocol(this.conf, GetUserMappingsProtocolPB.class, getUserMappingService, this.rpcServer); // Set service-level authorization security policy - this.serviceAuthEnabled = conf.getBoolean( + this.serviceAuthEnabled = this.conf.getBoolean( HADOOP_SECURITY_AUTHORIZATION, false); if (this.serviceAuthEnabled) { - rpcServer.refreshServiceAcl(conf, new RouterPolicyProvider()); + rpcServer.refreshServiceAcl(this.conf, new RouterPolicyProvider()); } // We don't want the server to log the full stack trace for some exceptions @@ -372,14 +371,14 @@ public RouterRpcServer(Configuration conf, Router router, this.rpcAddress = new InetSocketAddress( confRpcAddress.getHostName(), listenAddress.getPort()); - if (conf.getBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, + if (this.conf.getBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE_DEFAULT)) { // Create metrics monitor Class rpcMonitorClass = this.conf.getClass( RBFConfigKeys.DFS_ROUTER_METRICS_CLASS, RBFConfigKeys.DFS_ROUTER_METRICS_CLASS_DEFAULT, RouterRpcMonitor.class); - this.rpcMonitor = ReflectionUtils.newInstance(rpcMonitorClass, conf); + this.rpcMonitor = ReflectionUtils.newInstance(rpcMonitorClass, this.conf); } else { this.rpcMonitor = null; } @@ -391,10 +390,10 @@ public RouterRpcServer(Configuration conf, Router router, // Initialize modules this.quotaCall = new Quota(this.router, this); this.nnProto = new RouterNamenodeProtocol(this); - this.clientProto = new RouterClientProtocol(conf, this); + this.clientProto = new RouterClientProtocol(this.conf, this); this.routerProto = new RouterUserProtocol(this); - long dnCacheExpire = conf.getTimeDuration( + 
long dnCacheExpire = this.conf.getTimeDuration( DN_REPORT_CACHE_EXPIRE, DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS); this.dnCache = CacheBuilder.newBuilder() @@ -1331,7 +1330,7 @@ public void modifyAclEntries(String src, List aclSpec) clientProto.modifyAclEntries(src, aclSpec); } - @Override // ClienProtocol + @Override // ClientProtocol public void removeAclEntries(String src, List aclSpec) throws IOException { clientProto.removeAclEntries(src, aclSpec); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 7c0cb8b437024..9ca46c6bd66c8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -835,6 +835,22 @@ + + dfs.federation.router.observer.read.enable + false + + Enable observer read in router. This value is used across all nameservices + except when overridden by dfs.federation.router.observer.read.enable.EXAMPLENAMESERVICE + for a particular nameservice. + + + + + dfs.federation.router.observer.auto-msync-period + 0 + Period in milliseconds after which the Router msyncs the active namenode before forwarding a read to an observer namenode, for clients that do not supply federated namespace state. A value of 0 triggers an msync before every such read; a negative value disables the automatic msync. + + dfs.federation.router.observer.federated.state.propagation.maxsize 5 diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java index 79c28986c33f1..b0a897d9f4bb2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java @@ -175,7 +175,7 @@ public static void waitNamenodeRegistered( GenericTestUtils.waitFor(() -> { try { List namenodes = - resolver.getNamenodesForNameserviceId(nsId); + resolver.getNamenodesForNameserviceId(nsId, false); if (namenodes != null) { for (FederationNamenodeContext namenode : namenodes) { // Check if this is the Namenode we are checking @@ -207,7 +207,7 @@ public static void waitNamenodeRegistered( GenericTestUtils.waitFor(() -> { try { List nns = - resolver.getNamenodesForNameserviceId(nsId); + resolver.getNamenodesForNameserviceId(nsId, false); for (FederationNamenodeContext nn : nns) { if (nn.getState().equals(state)) { return true; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java index 53247262cefb1..4fcdf6595e4ad 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java @@ -806,6 +806,7 @@ public void startCluster(Configuration overrideConf) { .numDataNodes(numDNs) .nnTopology(topology) .dataNodeConfOverlays(dnConfs) + .checkExitOnShutdown(false) .storageTypes(storageTypes) .racks(racks) .build(); @@ -1038,6 +1039,27 @@ public void switchToStandby(String nsId, String nnId) { } } + /** + * Switch a namenode in a nameservice to be the observer. + * @param nsId Nameservice identifier. + * @param nnId Namenode identifier.
+ */ + public void switchToObserver(String nsId, String nnId) { + try { + int total = cluster.getNumNameNodes(); + NameNodeInfo[] nns = cluster.getNameNodeInfos(); + for (int i = 0; i < total; i++) { + NameNodeInfo nn = nns[i]; + if (nn.getNameserviceId().equals(nsId) && + nn.getNamenodeId().equals(nnId)) { + cluster.transitionToObserver(i); + } + } + } catch (Throwable e) { + LOG.error("Cannot transition to observer", e); + } + } + /** * Stop the federated HDFS cluster. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java index f8f6ccef36374..4aaa8e7569e88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MockResolver.java @@ -23,6 +23,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; @@ -119,12 +120,24 @@ public void setDisableRegistration(boolean isDisable) { disableRegistration = isDisable; } + @Override public void updateUnavailableNamenode(String ns, + InetSocketAddress failedAddress) throws IOException { + updateNameNodeState(ns, failedAddress, + FederationNamenodeServiceState.UNAVAILABLE); + } + @Override public void updateActiveNamenode( String nsId, InetSocketAddress successfulAddress) { + updateNameNodeState(nsId, successfulAddress, + FederationNamenodeServiceState.ACTIVE); + } - String address = successfulAddress.getHostName() + ":" + - successfulAddress.getPort(); + private void updateNameNodeState(String nsId, + InetSocketAddress iAddr, + FederationNamenodeServiceState state) { + String sAddress = iAddr.getHostName() + ":" + + iAddr.getPort(); String key = nsId; if (key != null) { // Update the active entry List namenodes = (List) this.resolver.get(key); for (FederationNamenodeContext namenode : namenodes) { - if (namenode.getRpcAddress().equals(address)) { + if (namenode.getRpcAddress().equals(sAddress)) { MockNamenodeContext nn = (MockNamenodeContext) namenode; - nn.setState(FederationNamenodeServiceState.ACTIVE); + nn.setState(state); break; } } @@ -147,14 +160,39 @@ public void updateActiveNamenode( @Override public synchronized List - getNamenodesForNameserviceId(String nameserviceId) { + getNamenodesForNameserviceId(String nameserviceId, boolean observerRead) { // Return a copy of the list because it is updated periodically List namenodes = this.resolver.get(nameserviceId); if (namenodes == null) { namenodes = new ArrayList<>(); } - return Collections.unmodifiableList(new ArrayList<>(namenodes)); + + List ret = new ArrayList<>(); + + if (observerRead) { + Iterator iterator = namenodes + .iterator(); + List observerNN = new ArrayList<>(); + List nonObserverNN = new ArrayList<>(); + while (iterator.hasNext()) { + FederationNamenodeContext membership = iterator.next(); + if (membership.getState() == FederationNamenodeServiceState.OBSERVER) { + observerNN.add(membership); + } else { + nonObserverNN.add(membership); + } + } + Collections.shuffle(observerNN); + Collections.sort(nonObserverNN, new NamenodePriorityComparator()); + ret.addAll(observerNN); + ret.addAll(nonObserverNN); + } else { + ret.addAll(namenodes); + Collections.sort(ret, new
NamenodePriorityComparator()); + } + + return Collections.unmodifiableList(ret); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java index 065209060220e..27eb485734405 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java @@ -161,7 +161,8 @@ public void testRefreshStaticChangeHandlers() throws Exception { Thread.sleep(sleepTime); return null; }).when(client) - .invokeMethod(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any()); + .invokeMethod(Mockito.any(), Mockito.any(), Mockito.any(), + Mockito.any(), Mockito.anyBoolean(), Mockito.any()); // No calls yet assertEquals("{}", diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java index ed10a3a87317d..b602a27c95f60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/resolver/TestNamenodeResolver.java @@ -129,7 +129,7 @@ private void verifyFirstRegistration(String nsId, String nnId, int resultsCount, FederationNamenodeServiceState state) throws IOException { List namenodes = - namenodeResolver.getNamenodesForNameserviceId(nsId); + namenodeResolver.getNamenodesForNameserviceId(nsId, false); if (resultsCount == 0) { assertNull(namenodes); } else { @@ -291,8 +291,8 @@ public void testCacheUpdateOnNamenodeStateUpdate() throws IOException { HAServiceState.STANDBY))); stateStore.refreshCaches(true); // Check whether the namenpde state is reported correct as standby. - FederationNamenodeContext namenode = - namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0]).get(0); + FederationNamenodeContext namenode = namenodeResolver + .getNamenodesForNameserviceId(NAMESERVICES[0], false).get(0); assertEquals(FederationNamenodeServiceState.STANDBY, namenode.getState()); String rpcAddr = namenode.getRpcAddress(); InetSocketAddress inetAddr = getInetSocketAddress(rpcAddr); @@ -301,8 +301,8 @@ public void testCacheUpdateOnNamenodeStateUpdate() throws IOException { // RouterRpcClient calls updateActiveNamenode to update the state to active, // Check whether correct updated state is returned post update. 
namenodeResolver.updateActiveNamenode(NAMESERVICES[0], inetAddr); - FederationNamenodeContext namenode1 = - namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0]).get(0); + FederationNamenodeContext namenode1 = namenodeResolver + .getNamenodesForNameserviceId(NAMESERVICES[0], false).get(0); assertEquals("The namenode state should be ACTIVE post update.", FederationNamenodeServiceState.ACTIVE, namenode1.getState()); } @@ -318,8 +318,8 @@ public void testCacheUpdateOnNamenodeStateUpdateWithIp() InetSocketAddress inetAddr = getInetSocketAddress(rpcAddress); namenodeResolver.updateActiveNamenode(NAMESERVICES[0], inetAddr); - FederationNamenodeContext namenode = - namenodeResolver.getNamenodesForNameserviceId(NAMESERVICES[0]).get(0); + FederationNamenodeContext namenode = namenodeResolver + .getNamenodesForNameserviceId(NAMESERVICES[0], false).get(0); assertEquals("The namenode state should be ACTIVE post update.", FederationNamenodeServiceState.ACTIVE, namenode.getState()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java new file mode 100644 index 0000000000000..3ca0e0f81d1ce --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -0,0 +1,425 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.server.federation.router; + +import static org.apache.hadoop.hdfs.server.federation.FederationTestUtils.NAMENODES; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID; +import static org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys.DFS_ROUTER_MONITOR_NAMENODE; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster; +import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext; +import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder; +import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeContext; +import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamenodeServiceState; +import org.apache.hadoop.hdfs.server.federation.resolver.MembershipNamenodeResolver; +import org.apache.hadoop.hdfs.server.namenode.NameNode; +import org.junit.After; +import org.junit.Test; + +public class TestObserverWithRouter { + + private MiniRouterDFSCluster cluster; + + public void startUpCluster(int numberOfObserver) throws Exception { + startUpCluster(numberOfObserver, null); + } + + public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception { + int numberOfNamenode = 2 + numberOfObserver; + Configuration conf = new Configuration(false); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_ENABLE, true); + conf.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD, 0); + conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); + conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms"); + if (confOverrides != null) { + conf.addResource(confOverrides); + } + cluster = new MiniRouterDFSCluster(true, 2, numberOfNamenode); + cluster.addNamenodeOverrides(conf); + // Start NNs and DNs and wait until ready + cluster.startCluster(); + + // Making one Namenode active per nameservice + if (cluster.isHighAvailability()) { + for (String ns : cluster.getNameservices()) { + cluster.switchToActive(ns, NAMENODES[0]); + cluster.switchToStandby(ns, NAMENODES[1]); + for (int i = 2; i < numberOfNamenode; i++) { + cluster.switchToObserver(ns, NAMENODES[i]); + } + } + } + + Configuration routerConf = new RouterConfigBuilder() + .metrics() + .rpc() + .build(); + + cluster.addRouterOverrides(conf); + cluster.addRouterOverrides(routerConf); + + // Start routers with only an RPC service + cluster.startRouters(); + + // Register and verify all NNs with all routers + cluster.registerNamenodes(); + cluster.waitNamenodeRegistration(); + // Setup the mount table + cluster.installMockLocations(); + + cluster.waitActiveNamespaces(); + } + + @After + public void teardown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + @Test + public void testObserverRead() throws Exception { + startUpCluster(1); + RouterContext routerContext = cluster.getRandomRouter(); + List namenodes = routerContext + .getRouter().getNamenodeResolver() + 
.getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); + assertTrue("First namenode should be observer", namenodes.get(0).getState() + .equals(FederationNamenodeServiceState.OBSERVER)); + FileSystem fileSystem = routerContext.getFileSystem(); + Path path = new Path("/testFile"); + // Send Create call to active + fileSystem.create(path).close(); + + // Send read request to observer + fileSystem.open(path).close(); + + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // Create and complete calls should be sent to active + assertEquals("Two calls should be sent to active", 2, rpcCountForActive); + + long rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + // getBlockLocations should be sent to observer + assertEquals("One call should be sent to observer", 1, rpcCountForObserver); + fileSystem.close(); + } + + @Test + public void testObserverReadWithoutFederatedStatePropagation() throws Exception { + Configuration confOverrides = new Configuration(false); + confOverrides.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, 0); + startUpCluster(1, confOverrides); + RouterContext routerContext = cluster.getRandomRouter(); + List namenodes = routerContext + .getRouter().getNamenodeResolver() + .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); + assertTrue("First namenode should be observer", namenodes.get(0).getState() + .equals(FederationNamenodeServiceState.OBSERVER)); + FileSystem fileSystem = routerContext.getFileSystem(); + Path path = new Path("/testFile"); + // Send Create call to active + fileSystem.create(path).close(); + + // Send read request to observer. The router will msync to the active namenode. 
+ fileSystem.open(path).close(); + + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // Create, complete and msync calls should be sent to active + assertEquals("Three calls should be sent to active", 3, rpcCountForActive); + + long rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + // getBlockLocations should be sent to observer + assertEquals("One call should be sent to observer", 1, rpcCountForObserver); + fileSystem.close(); + } + + @Test + public void testDisablingObserverReadUsingNameserviceOverride() throws Exception { + // Disable observer reads using per-nameservice override + Configuration confOverrides = new Configuration(false); + confOverrides.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_ENABLE + ".ns0", false); + startUpCluster(1, confOverrides); + + RouterContext routerContext = cluster.getRandomRouter(); + FileSystem fileSystem = routerContext.getFileSystem(); + Path path = new Path("/testFile"); + fileSystem.create(path).close(); + fileSystem.open(path).close(); + fileSystem.close(); + + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // Create, complete and read calls should be sent to active + assertEquals("Three calls should be sent to active", 3, rpcCountForActive); + + long rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + assertEquals("Zero calls should be sent to observer", 0, rpcCountForObserver); + } + + @Test + public void testReadWhenObserverIsDown() throws Exception { + startUpCluster(1); + RouterContext routerContext = cluster.getRandomRouter(); + FileSystem fileSystem = routerContext.getFileSystem(); + Path path = new Path("/testFile1"); + // Send Create call to active + fileSystem.create(path).close(); + + // Stop observer NN + int nnIndex = stopObserver(1); + + assertNotEquals("No observer found", 3, nnIndex); + + // Send read request + fileSystem.open(path).close(); + + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // Create, complete and getBlockLocations calls should be sent to active + assertEquals("Three calls should be sent to active", 3, + rpcCountForActive); + + long rpcCountForObserver = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getObserverProxyOps(); + assertEquals("No call should be sent to observer", 0, + rpcCountForObserver); + fileSystem.close(); + } + + @Test + public void testMultipleObserver() throws Exception { + startUpCluster(2); + RouterContext routerContext = cluster.getRandomRouter(); + FileSystem fileSystem = routerContext.getFileSystem(); + Path path = new Path("/testFile1"); + // Send Create call to active + fileSystem.create(path).close(); + + // Stop one observer NN + stopObserver(1); + + // Send read request + fileSystem.open(path).close(); + + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + + long expectedActiveRpc = 2; + long expectedObserverRpc = 1; + + // Create and complete calls should be sent to active + assertEquals("Two calls should be sent to active", + expectedActiveRpc, rpcCountForActive); + + long rpcCountForObserver = routerContext.getRouter() + .getRpcServer().getRPCMetrics().getObserverProxyOps(); + // getBlockLocations call should be sent to observer + assertEquals("Read should succeed with another observer", + expectedObserverRpc,
rpcCountForObserver); + + // Stop one observer NN + stopObserver(1); + + // Send read request + fileSystem.open(path).close(); + + rpcCountForActive = routerContext.getRouter() + .getRpcServer().getRPCMetrics().getActiveProxyOps(); + + // getBlockLocations call should be sent to active + expectedActiveRpc += 1; + assertEquals("One call should be sent to active", expectedActiveRpc, + rpcCountForActive); + expectedObserverRpc += 0; + rpcCountForObserver = routerContext.getRouter() + .getRpcServer().getRPCMetrics().getObserverProxyOps(); + assertEquals("No call should be sent to observer", + expectedObserverRpc, rpcCountForObserver); + fileSystem.close(); + } + + private int stopObserver(int num) { + int nnIndex; + for (nnIndex = 0; nnIndex < cluster.getNamenodes().size(); nnIndex++) { + NameNode nameNode = cluster.getCluster().getNameNode(nnIndex); + if (nameNode != null && nameNode.isObserverState()) { + cluster.getCluster().shutdownNameNode(nnIndex); + num--; + if (num == 0) { + break; + } + } + } + return nnIndex; + } + + // Test the router with multiple observers to know which observer NN received + // requests + @Test + public void testMultipleObserverRouter() throws Exception { + StateStoreDFSCluster innerCluster = null; + RouterContext routerContext; + MembershipNamenodeResolver resolver; + + String ns0; + String ns1; + // Create 4 NNs: one active, one standby and two observers + innerCluster = new StateStoreDFSCluster(true, 4, 4, TimeUnit.SECONDS.toMillis(5), + TimeUnit.SECONDS.toMillis(5)); + Configuration routerConf = + new RouterConfigBuilder().stateStore().admin().rpc() + .enableLocalHeartbeat(true).heartbeat().build(); + + StringBuilder sb = new StringBuilder(); + ns0 = innerCluster.getNameservices().get(0); + MiniRouterDFSCluster.NamenodeContext context = + innerCluster.getNamenodes(ns0).get(1); + routerConf.set(DFS_NAMESERVICE_ID, ns0); + routerConf.set(DFS_HA_NAMENODE_ID_KEY, context.getNamenodeId()); + + // Specify namenodes (ns1.nn0,ns1.nn1) to monitor + sb = new StringBuilder(); + ns1 = innerCluster.getNameservices().get(1); + for (MiniRouterDFSCluster.NamenodeContext ctx : innerCluster.getNamenodes(ns1)) { + String suffix = ctx.getConfSuffix(); + if (sb.length() != 0) { + sb.append(","); + } + sb.append(suffix); + } + routerConf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString()); + routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_ENABLE, true); + routerConf.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD, 0); + routerConf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); + routerConf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms"); + + innerCluster.addNamenodeOverrides(routerConf); + innerCluster.addRouterOverrides(routerConf); + innerCluster.startCluster(); + + if (innerCluster.isHighAvailability()) { + for (String ns : innerCluster.getNameservices()) { + innerCluster.switchToActive(ns, NAMENODES[0]); + innerCluster.switchToStandby(ns, NAMENODES[1]); + for (int i = 2; i < 4; i++) { + innerCluster.switchToObserver(ns, NAMENODES[i]); + } + } + } + innerCluster.startRouters(); + innerCluster.waitClusterUp(); + + routerContext = innerCluster.getRandomRouter(); + resolver = (MembershipNamenodeResolver) routerContext.getRouter() + .getNamenodeResolver(); + + resolver.loadCache(true); + List namespaceInfo0 = + resolver.getNamenodesForNameserviceId(ns0, true); + List namespaceInfo1 = + resolver.getNamenodesForNameserviceId(ns1, true); + assertEquals(namespaceInfo0.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); +
assertEquals(namespaceInfo0.get(1).getState(), + FederationNamenodeServiceState.OBSERVER); + assertNotEquals(namespaceInfo0.get(0).getNamenodeId(), + namespaceInfo0.get(1).getNamenodeId()); + assertEquals(namespaceInfo1.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); + } + + @Test + public void testUnavaliableObserverNN() throws Exception { + startUpCluster(2); + RouterContext routerContext = cluster.getRandomRouter(); + FileSystem fileSystem = routerContext.getFileSystem(); + + stopObserver(2); + + fileSystem.listStatus(new Path("/")); + + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + + // msync and getBlockLocation calls should be sent to active when observer + // is stopped. + assertEquals("Two call should send to active", + 2, rpcCountForActive); + + fileSystem.close(); + + boolean hasUnavailable = false; + for(String ns : cluster.getNameservices()) { + List nns = routerContext.getRouter() + .getNamenodeResolver().getNamenodesForNameserviceId(ns, false); + for(FederationNamenodeContext nn : nns) { + if(FederationNamenodeServiceState.UNAVAILABLE == nn.getState()) { + hasUnavailable = true; + } + } + } + // After communicating with unavailable observer namenode, + // we will update state to unavailable. + assertTrue("There Must has unavailable NN", hasUnavailable); + } + + @Test + public void testRouterMsync() throws Exception { + startUpCluster(1); + RouterContext routerContext = cluster.getRandomRouter(); + + FileSystem fileSystem = routerContext.getFileSystem(); + Path path = new Path("/testFile"); + + // Send Create call to active + fileSystem.create(path).close(); + long rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // Create and complete calls should be sent to active + assertEquals("Two calls should be sent to active", 2, + rpcCountForActive); + + // Send msync + fileSystem.msync(); + rpcCountForActive = routerContext.getRouter().getRpcServer() + .getRPCMetrics().getActiveProxyOps(); + // 2 msync calls should be sent. One to each active namenode in the two namespaces. 
+ assertEquals("Four calls should be sent to active", 4, + rpcCountForActive); + fileSystem.close(); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeHeartbeat.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeHeartbeat.java index 94f2baeaed136..04b4b58bcb6e3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeHeartbeat.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeHeartbeat.java @@ -167,7 +167,7 @@ public void testHearbeat() throws InterruptedException, IOException { // Verify the locator has matching NN entries for each NS for (String ns : cluster.getNameservices()) { List nns = - namenodeResolver.getNamenodesForNameserviceId(ns); + namenodeResolver.getNamenodesForNameserviceId(ns, false); // Active FederationNamenodeContext active = nns.get(0); @@ -191,7 +191,7 @@ public void testHearbeat() throws InterruptedException, IOException { // Verify the locator has recorded the failover for the failover NS List failoverNSs = - namenodeResolver.getNamenodesForNameserviceId(failoverNS); + namenodeResolver.getNamenodesForNameserviceId(failoverNS, false); // Active FederationNamenodeContext active = failoverNSs.get(0); assertEquals(NAMENODES[1], active.getNamenodeId()); @@ -202,7 +202,7 @@ public void testHearbeat() throws InterruptedException, IOException { // Verify the locator has the same records for the other ns List normalNss = - namenodeResolver.getNamenodesForNameserviceId(normalNs); + namenodeResolver.getNamenodesForNameserviceId(normalNs, false); // Active active = normalNss.get(0); assertEquals(NAMENODES[0], active.getNamenodeId()); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java index 4fae86b01d399..bae2dea3ceabf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeMonitoring.java @@ -204,7 +204,7 @@ public void testNamenodeMonitoring() throws Exception { final List namespaceInfo = new ArrayList<>(); for (String nsId : nns.keySet()) { List nnReports = - resolver.getNamenodesForNameserviceId(nsId); + resolver.getNamenodesForNameserviceId(nsId, false); namespaceInfo.addAll(nnReports); } for (FederationNamenodeContext nnInfo : namespaceInfo) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeWebScheme.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeWebScheme.java index ab507aaf9ecd4..f23b02092a299 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeWebScheme.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterNamenodeWebScheme.java @@ -194,7 +194,7 @@ private void testWebScheme(HttpConfig.Policy httpPolicy, final List 
namespaceInfo = new ArrayList<>(); for (String nsId : nns.keySet()) { List nnReports = - resolver.getNamenodesForNameserviceId(nsId); + resolver.getNamenodesForNameserviceId(nsId, false); namespaceInfo.addAll(nnReports); } for (FederationNamenodeContext nnInfo : namespaceInfo) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java index b2bfb2f5121bb..1054e5ac8cf97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCClientRetries.java @@ -166,7 +166,7 @@ public void testRetryWhenOneNameServiceDown() throws Exception { private void registerInvalidNameReport() throws IOException { String ns0 = cluster.getNameservices().get(0); List origin = resolver - .getNamenodesForNameserviceId(ns0); + .getNamenodesForNameserviceId(ns0, false); FederationNamenodeContext nnInfo = origin.get(0); NamenodeStatusReport report = new NamenodeStatusReport(ns0, nnInfo.getNamenodeId(), nnInfo.getRpcAddress(), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index c4e99b1833a06..dd8bb2043828b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -2309,6 +2309,8 @@ public synchronized void restartNameNode(int nnIndex, boolean waitActive, nn.getHttpServer() .setAttribute(ImageServlet.RECENT_IMAGE_CHECK_ENABLED, false); info.nameNode = nn; + info.nameserviceId = info.conf.get(DFS_NAMESERVICE_ID); + info.nnId = info.conf.get(DFS_HA_NAMENODE_ID_KEY); info.setStartOpt(startOpt); if (waitActive) { if (numDataNodes > 0) { From 61b07588c2a1fb6057e2d84e366fb6ec53de6345 Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Tue, 13 Sep 2022 01:13:26 -0700 Subject: [PATCH 2/3] Disables observer reads for clients without FederatedNamespaceState. Removes auto-msync. --- .../resolver/ActiveNamenodeResolver.java | 4 +- .../resolver/MembershipNamenodeResolver.java | 6 +- .../federation/router/RBFConfigKeys.java | 4 - .../federation/router/RouterRpcClient.java | 161 ++++-------------- .../src/main/resources/hdfs-rbf-default.xml | 6 - .../router/TestObserverWithRouter.java | 42 ++--- 6 files changed, 60 insertions(+), 163 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java index 81335863b5f58..cae1f478604d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/ActiveNamenodeResolver.java @@ -85,12 +85,12 @@ void updateActiveNamenode( * * * @param nameserviceId Nameservice identifier. 
- * @param observerRead Observer read case, observer NN will be ranked first + * @param listObserversFirst Observer read case, observer NN will be ranked first * @return Prioritized list of namenode contexts. * @throws IOException If the state store cannot be accessed. */ List getNamenodesForNameserviceId( - String nameserviceId, boolean observerRead) throws IOException; + String nameserviceId, boolean listObserversFirst) throws IOException; /** * Returns a prioritized list of the most recent cached registration entries diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java index ff43580c6538a..a16d8ee069337 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -194,9 +194,9 @@ private void updateNameNodeState(final String nsId, @Override public List getNamenodesForNameserviceId( - final String nsId, boolean observerRead) throws IOException { + final String nsId, boolean listObserversFirst) throws IOException { Map> cache - = observerRead ? observerFirstCacheNS : cacheNS; + = listObserversFirst ? observerFirstCacheNS : cacheNS; List ret = cache.get(nsId); if (ret != null) { @@ -211,7 +211,7 @@ public List getNamenodesForNameserviceId( GetNamenodeRegistrationsRequest request = GetNamenodeRegistrationsRequest.newInstance(partial); result = getRecentRegistrationForQuery(request, true, - false, observerRead); + false, listObserversFirst); } catch (StateStoreUnavailableException e) { LOG.error("Cannot get active NN for {}, State Store unavailable", nsId); return null; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 74d12d006ddba..7df7ca01932b2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -195,10 +195,6 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { FEDERATION_ROUTER_PREFIX + "observer.read.enable"; public static final boolean DFS_ROUTER_OBSERVER_READ_ENABLE_DEFAULT = false; - public static final String DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD = - FEDERATION_ROUTER_PREFIX + "observer.auto-msync-period"; - public static final long DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD_DEFAULT = 0; - public static final String DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE = FEDERATION_ROUTER_PREFIX + "observer.federated.state.propagation.maxsize"; public static final int DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT = 5; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 36a1b18bd8376..9e45013e5612d 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -38,7 +38,6 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -64,7 +63,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.NameNodeProxiesClient.ProxyAndInfo; import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; -import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.SnapshotException; import org.apache.hadoop.hdfs.server.federation.fairness.RouterRpcFairnessPolicyController; @@ -87,7 +85,6 @@ import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.StringUtils; -import org.apache.hadoop.util.Time; import org.eclipse.jetty.util.ajax.JSON; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -135,13 +132,9 @@ public class RouterRpcClient { /** Field separator of CallerContext. */ private final String contextFieldSeparator; /** Observer read enabled. Default for all nameservices. */ - private boolean observerReadEnabled; + private final boolean observerReadEnabled; /** Nameservice specific override for enabling or disabling observer read. */ private Map nsObserverReadEnabled = new ConcurrentHashMap<>(); - /** Auto msync period. */ - private long autoMsyncPeriodMs; - /** Last msync times. */ - private Map lastMsyncTimes; /** Pattern to parse a stack trace line. */ private static final Pattern STACK_TRACE_PATTERN = @@ -153,16 +146,6 @@ public class RouterRpcClient { private Map acceptedPermitsPerNs = new ConcurrentHashMap<>(); private final boolean enableProxyUser; - - private static final Method MSYNC_METHOD; - - static { - try { - MSYNC_METHOD = ClientProtocol.class.getDeclaredMethod("msync"); - } catch (NoSuchMethodException e) { - throw new RuntimeException("Failed to create msync method instance.", e); - } - } /** * Create a router RPC client to manage remote procedure calls to NNs. @@ -234,11 +217,6 @@ public RouterRpcClient(Configuration conf, Router router, nsObserverReadEnabled.put(nsId, Boolean.valueOf(readEnabled))); if (this.observerReadEnabled) { LOG.info("Observer read is enabled for router."); - this.autoMsyncPeriodMs = conf.getTimeDuration( - RBFConfigKeys.DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD, - RBFConfigKeys.DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD_DEFAULT, - TimeUnit.MILLISECONDS); - this.lastMsyncTimes = new HashMap<>(); } } @@ -697,16 +675,12 @@ private void addClientInfoToCallerContext() { * @param params Variable parameters * @return Response from the remote server * @throws IOException - * @throws InterruptedException */ private Object invoke(String nsId, int retryCount, final Method method, final Object obj, final Object... 
params) throws IOException { try { return method.invoke(obj, params); - } catch (IllegalAccessException e) { - LOG.error("Unexpected exception while proxying API", e); - return null; - } catch (IllegalArgumentException e) { + } catch (IllegalAccessException | IllegalArgumentException e) { LOG.error("Unexpected exception while proxying API", e); return null; } catch (InvocationTargetException e) { @@ -901,10 +875,8 @@ public Object invokeSingle(final String nsId, RemoteMethod method) RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); acquirePermit(nsId, ugi, method, controller); try { - boolean isObserverRead = nsObserverReadEnabled.getOrDefault(nsId, observerReadEnabled) - && isReadCall(method.getMethod()); - List nns = msync(nsId, ugi, - isObserverRead); + boolean isObserverRead = isObserverReadEligible(nsId, method.getMethod()); + List nns = getOrderedNamenodes(nsId, isObserverRead); RemoteLocationContext loc = new RemoteLocation(nsId, "/", "/"); Class proto = method.getProtocol(); Method m = method.getMethod(); @@ -1071,10 +1043,9 @@ public RemoteResult invokeSequential( for (final RemoteLocationContext loc : locations) { String ns = loc.getNameserviceId(); acquirePermit(ns, ugi, remoteMethod, controller); - boolean isObserverRead = nsObserverReadEnabled.getOrDefault(ns, observerReadEnabled) - && isReadCall(m); + boolean isObserverRead = isObserverReadEligible(ns, m); List namenodes = - msync(ns, ugi, isObserverRead); + getOrderedNamenodes(ns, isObserverRead); try { Class proto = remoteMethod.getProtocol(); Object[] params = remoteMethod.getParams(loc); @@ -1435,10 +1406,9 @@ public Map invokeConcurrent( String ns = location.getNameserviceId(); RouterRpcFairnessPolicyController controller = getRouterRpcFairnessPolicyController(); acquirePermit(ns, ugi, method, controller); - boolean isObserverRead = nsObserverReadEnabled.getOrDefault(ns, observerReadEnabled) - && isReadCall(m); + boolean isObserverRead = isObserverReadEligible(ns, m); final List namenodes = - msync(ns, ugi, isObserverRead); + getOrderedNamenodes(ns, isObserverRead); try { Class proto = method.getProtocol(); Object[] paramList = method.getParams(location); @@ -1461,10 +1431,9 @@ public Map invokeConcurrent( final CallerContext originContext = CallerContext.getCurrent(); for (final T location : locations) { String nsId = location.getNameserviceId(); - boolean isObserverRead = nsObserverReadEnabled.getOrDefault(nsId, observerReadEnabled) - && isReadCall(m); + boolean isObserverRead = isObserverReadEligible(nsId, m); final List namenodes = - msync(nsId, ugi, isObserverRead); + getOrderedNamenodes(nsId, isObserverRead); final Class proto = method.getProtocol(); final Object[] paramList = method.getParams(location); if (standby) { @@ -1581,31 +1550,6 @@ private void transferThreadLocalContext( CallerContext.setCurrent(originContext); } - /** - * Get a prioritized list of NNs that share the same nameservice ID (in the - * same namespace). - * In observer read case, OBSERVER NNs will be first in the list. - * Otherwise, ACTIVE NNs will be first in the list. - * - * @param nsId The nameservice ID for the namespace. - * @param observerRead Read on observer namenode. - * @return A prioritized list of NNs to use for communication. - * @throws IOException If a NN cannot be located for the nameservice ID. 
- */ - private List getNamenodesForNameservice( - final String nsId, boolean observerRead) throws IOException { - - final List namenodes = - namenodeResolver.getNamenodesForNameserviceId(nsId, - observerRead); - - if (namenodes == null || namenodes.isEmpty()) { - throw new IOException("Cannot locate a registered namenode for " + nsId + - " from " + router.getRouterId()); - } - return namenodes; - } - /** * Get a prioritized list of NNs that share the same block pool ID (in the * same namespace). NNs that are reported as ACTIVE will be first in the list. @@ -1744,58 +1688,37 @@ private String getCurrentFairnessPolicyControllerClassName() { return null; } - private List msync(String ns, - UserGroupInformation ugi, boolean isObserverRead) throws IOException { - final List namenodes = - getNamenodesForNameservice(ns, isObserverRead); - if (autoMsyncPeriodMs < 0) { - LOG.debug("Skipping msync because " - + RBFConfigKeys.DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD - + " is less than 0"); - return namenodes; // no need for msync - } + /** + * Get a prioritized list of NNs that share the same nameservice ID (in the + * same namespace). + * In observer read case, OBSERVER NNs will be first in the list. + * Otherwise, ACTIVE NNs will be first in the list. + * + * @param nsId The nameservice ID for the namespace. + * @param isObserverRead Read on observer namenode. + * @return A prioritized list of NNs to use for communication. + * @throws IOException If a NN cannot be located for the nameservice ID. + */ + private List getOrderedNamenodes(String nsId, + boolean isObserverRead) throws IOException { + final List namenodes; - if (RouterStateIdContext.getClientStateIdFromCurrentCall(ns) > Long.MIN_VALUE) { - LOG.debug("Skipping msync because client used FederatedGSIContext."); - return namenodes; + if (RouterStateIdContext.getClientStateIdFromCurrentCall(nsId) > Long.MIN_VALUE) { + namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, isObserverRead); + } else { + namenodes = namenodeResolver.getNamenodesForNameserviceId(nsId, false); } - if (isObserverRead) { - long callStartTime = callTime(); - - LongHolder latestMsyncTime = lastMsyncTimes.get(ns); - - if (latestMsyncTime == null) { - // initialize - synchronized (lastMsyncTimes) { - latestMsyncTime = lastMsyncTimes.get(ns); - if(latestMsyncTime == null) { - latestMsyncTime = new LongHolder(0L); - lastMsyncTimes.put(ns, latestMsyncTime); - } - } - } - - if (callStartTime - latestMsyncTime.getValue() > autoMsyncPeriodMs) { - synchronized (latestMsyncTime) { - if (callStartTime - latestMsyncTime.getValue() > autoMsyncPeriodMs) { - long requestTime = Time.monotonicNow(); - invokeMethod(ugi, namenodes, ClientProtocol.class, MSYNC_METHOD, - true, new Object[0]); - latestMsyncTime.setValue(requestTime); - } - } - } + if (namenodes == null || namenodes.isEmpty()) { + throw new IOException("Cannot locate a registered namenode for " + nsId + + " from " + router.getRouterId()); } return namenodes; } - private static long callTime() { - Call call = Server.getCurCall().get(); - if(call != null) { - return call.getTimestampNanos() / 1000000L; - } - return Time.monotonicNow(); + private boolean isObserverReadEligible(String nsId, Method method) { + return nsObserverReadEnabled.getOrDefault(nsId, observerReadEnabled) + && isReadCall(method); } /** @@ -1808,20 +1731,4 @@ private static boolean isReadCall(Method method) { } return !method.getAnnotationsByType(ReadOnly.class)[0].activeOnly(); } - - private final static class LongHolder { - private long value; - - 
LongHolder(long value) { - this.value = value; - } - - public void setValue(long value) { - this.value = value; - } - - public long getValue() { - return value; - } - } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index 9ca46c6bd66c8..c6320054f1300 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -845,12 +845,6 @@ - - dfs.federation.router.observer.auto-msync-period - 0 - Observer auto msync period - - dfs.federation.router.observer.federated.state.propagation.maxsize 5 diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index 3ca0e0f81d1ce..e82735bfd34d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -56,7 +56,6 @@ public void startUpCluster(int numberOfObserver, Configuration confOverrides) th int numberOfNamenode = 2 + numberOfObserver; Configuration conf = new Configuration(false); conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_ENABLE, true); - conf.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD, 0); conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms"); if (confOverrides != null) { @@ -113,8 +112,8 @@ public void testObserverRead() throws Exception { List namenodes = routerContext .getRouter().getNamenodeResolver() .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); - assertTrue("First namenode should be observer", namenodes.get(0).getState() - .equals(FederationNamenodeServiceState.OBSERVER)); + assertEquals("First namenode should be observer", namenodes.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); FileSystem fileSystem = routerContext.getFileSystem(); Path path = new Path("/testFile"); // Send Create call to active @@ -144,8 +143,8 @@ public void testObserverReadWithoutFederatedStatePropagation() throws Exception List namenodes = routerContext .getRouter().getNamenodeResolver() .getNamenodesForNameserviceId(cluster.getNameservices().get(0), true); - assertTrue("First namenode should be observer", namenodes.get(0).getState() - .equals(FederationNamenodeServiceState.OBSERVER)); + assertEquals("First namenode should be observer", namenodes.get(0).getState(), + FederationNamenodeServiceState.OBSERVER); FileSystem fileSystem = routerContext.getFileSystem(); Path path = new Path("/testFile"); // Send Create call to active @@ -156,13 +155,12 @@ public void testObserverReadWithoutFederatedStatePropagation() throws Exception long rpcCountForActive = routerContext.getRouter().getRpcServer() .getRPCMetrics().getActiveProxyOps(); - // Create, complete and msync calls should be sent to active + // Create, complete and getBlockLocations calls should be sent to active assertEquals("Three calls should be sent to active", 3, rpcCountForActive); long rpcCountForObserver = routerContext.getRouter().getRpcServer() .getRPCMetrics().getObserverProxyOps(); - // getBlockLocations should be sent to 
observer - assertEquals("One call should be sent to observer", 1, rpcCountForObserver); + assertEquals("No call should be sent to observer", 0, rpcCountForObserver); fileSystem.close(); } @@ -291,7 +289,7 @@ private int stopObserver(int num) { // requests @Test public void testMultipleObserverRouter() throws Exception { - StateStoreDFSCluster innerCluster = null; + StateStoreDFSCluster innerCluster; RouterContext routerContext; MembershipNamenodeResolver resolver; @@ -312,7 +310,6 @@ public void testMultipleObserverRouter() throws Exception { routerConf.set(DFS_HA_NAMENODE_ID_KEY, context.getNamenodeId()); // Specify namenodes (ns1.nn0,ns1.nn1) to monitor - sb = new StringBuilder(); ns1 = innerCluster.getNameservices().get(1); for (MiniRouterDFSCluster.NamenodeContext ctx : innerCluster.getNamenodes(ns1)) { String suffix = ctx.getConfSuffix(); @@ -323,7 +320,6 @@ public void testMultipleObserverRouter() throws Exception { } routerConf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString()); routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_ENABLE, true); - routerConf.setInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_AUTO_MSYNC_PERIOD, 0); routerConf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); routerConf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms"); @@ -363,24 +359,28 @@ public void testMultipleObserverRouter() throws Exception { } @Test - public void testUnavaliableObserverNN() throws Exception { + public void testUnavailableObserverNN() throws Exception { startUpCluster(2); RouterContext routerContext = cluster.getRandomRouter(); FileSystem fileSystem = routerContext.getFileSystem(); stopObserver(2); - fileSystem.listStatus(new Path("/")); + Path path = new Path("/testFile"); + // Send Create call to active + fileSystem.create(path).close(); + + // Send read request. + fileSystem.open(path).close(); long rpcCountForActive = routerContext.getRouter().getRpcServer() .getRPCMetrics().getActiveProxyOps(); - // msync and getBlockLocation calls should be sent to active when observer - // is stopped. - assertEquals("Two call should send to active", - 2, rpcCountForActive); + // Create, complete and getBlockLocations + // calls should be sent to active. + assertEquals("Three calls should be send to active", + 3, rpcCountForActive); - fileSystem.close(); boolean hasUnavailable = false; for(String ns : cluster.getNameservices()) { @@ -392,9 +392,9 @@ public void testUnavaliableObserverNN() throws Exception { } } } - // After communicating with unavailable observer namenode, - // we will update state to unavailable. - assertTrue("There Must has unavailable NN", hasUnavailable); + // After attempting to communicate with unavailable observer namenode, + // its state is updated to unavailable. + assertTrue("There must be unavailable namenodes", hasUnavailable); } @Test From b0e67a3b9f324720039eb405d6b3d31a1dc05eb0 Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Tue, 13 Sep 2022 15:23:32 -0700 Subject: [PATCH 3/3] Addressing review comments. 
--- .../resolver/MembershipNamenodeResolver.java | 40 ++++++-------- .../federation/router/RBFConfigKeys.java | 8 +-- .../federation/router/RouterRpcClient.java | 53 +++++++++---------- .../federation/router/RouterRpcServer.java | 21 ++++---- .../src/main/resources/hdfs-rbf-default.xml | 17 ++++-- ...RouterRefreshFairnessPolicyController.java | 4 +- .../router/TestObserverWithRouter.java | 6 +-- 7 files changed, 76 insertions(+), 73 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java index a16d8ee069337..d65ebdb628ef7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/resolver/MembershipNamenodeResolver.java @@ -33,6 +33,7 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.server.federation.store.DisabledNameserviceStore; import org.apache.hadoop.hdfs.server.federation.store.MembershipStore; @@ -74,13 +75,11 @@ public class MembershipNamenodeResolver /** Parent router ID. */ private String routerId; - /** Cached lookup of NN for nameservice with active state ranked first. + /** Cached lookup of namenodes for nameservice. The keys are a pair of the nameservice + * name and a boolean indicating if observer namenodes should be listed first. + * If true, observer namenodes are listed first. If false, active namenodes are listed first. * Invalidated on cache refresh. */ - private Map> cacheNS; - /** Cached lookup of NN for nameservice with observer state ranked first. - * Invalidated on cache refresh. */ - private Map> - observerFirstCacheNS; + private Map, List> cacheNS; /** Cached lookup of NN for block pool. Invalidated on cache refresh. */ private Map> cacheBP; @@ -90,7 +89,6 @@ public MembershipNamenodeResolver( this.stateStore = store; this.cacheNS = new ConcurrentHashMap<>(); - this.observerFirstCacheNS = new ConcurrentHashMap<>(); this.cacheBP = new ConcurrentHashMap<>(); if (this.stateStore != null) { @@ -140,7 +138,6 @@ public boolean loadCache(boolean force) { // Force refresh of active NN cache cacheBP.clear(); cacheNS.clear(); - observerFirstCacheNS.clear(); return true; } @@ -181,8 +178,8 @@ private void updateNameNodeState(final String nsId, record.getNameserviceId(), record.getNamenodeId(), state); membership.updateNamenodeRegistration(updateRequest); - cacheNS.remove(nsId); - observerFirstCacheNS.remove(nsId); + cacheNS.remove(Pair.of(nsId, Boolean.TRUE)); + cacheNS.remove(Pair.of(nsId, Boolean.FALSE)); // Invalidating the full cacheBp since getting the blockpool id from // namespace id is quite costly. cacheBP.clear(); @@ -195,10 +192,8 @@ private void updateNameNodeState(final String nsId, @Override public List getNamenodesForNameserviceId( final String nsId, boolean listObserversFirst) throws IOException { - Map> cache - = listObserversFirst ? 
observerFirstCacheNS : cacheNS; - List ret = cache.get(nsId); + List ret = cacheNS.get(Pair.of(nsId, listObserversFirst)); if (ret != null) { return ret; } @@ -240,7 +235,7 @@ public List getNamenodesForNameserviceId( // Cache the response ret = Collections.unmodifiableList(result); - cache.put(nsId, result); + cacheNS.put(Pair.of(nsId, listObserversFirst), result); return ret; } @@ -378,7 +373,7 @@ public Set getDisabledNamespaces() throws IOException { * * If observer read, * return registrations matching the query in this preference: - * 1) Most recently updated Observer registration + * 1) Observer registrations, shuffled to disperse queries. * 2) Most recently updated ACTIVE registration * 3) Most recently updated STANDBY registration (if showStandby) * 4) Most recently updated UNAVAILABLE registration (if showUnavailable). @@ -419,23 +414,20 @@ private List getRecentRegistrationForQuery( } } - if(!observerRead) { - Collections.sort(memberships, new NamenodePriorityComparator()); - LOG.debug("Selected most recent NN {} for query", memberships); - return memberships; - } else { + memberships.sort(new NamenodePriorityComparator()); + if(observerRead) { List ret = new ArrayList<>( memberships.size() + observerMemberships.size()); - Collections.sort(memberships, new NamenodePriorityComparator()); if(observerMemberships.size() > 1) { Collections.shuffle(observerMemberships); } ret.addAll(observerMemberships); ret.addAll(memberships); - - LOG.debug("Selected most recent NN {} for query", ret); - return ret; + memberships = ret; } + + LOG.debug("Selected most recent NN {} for query", memberships); + return memberships; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index 7df7ca01932b2..c598076f636e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -191,9 +191,11 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { FEDERATION_STORE_PREFIX + "enable"; public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = true; - public static final String DFS_ROUTER_OBSERVER_READ_ENABLE = - FEDERATION_ROUTER_PREFIX + "observer.read.enable"; - public static final boolean DFS_ROUTER_OBSERVER_READ_ENABLE_DEFAULT = false; + public static final String DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY = + FEDERATION_ROUTER_PREFIX + "observer.read.default"; + public static final boolean DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE = false; + public static final String DFS_ROUTER_OBSERVER_READ_OVERRIDES = + FEDERATION_ROUTER_PREFIX + "observer.read.overrides"; public static final String DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE = FEDERATION_ROUTER_PREFIX + "observer.federated.state.propagation.maxsize"; diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 9e45013e5612d..62ae4b0b95de7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -38,6 +38,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedList; import java.util.List; @@ -132,9 +133,9 @@ public class RouterRpcClient { /** Field separator of CallerContext. */ private final String contextFieldSeparator; /** Observer read enabled. Default for all nameservices. */ - private final boolean observerReadEnabled; - /** Nameservice specific override for enabling or disabling observer read. */ - private Map nsObserverReadEnabled = new ConcurrentHashMap<>(); + private final boolean observerReadEnabledDefault; + /** Nameservice specific overrides of the default setting for enabling observer reads. */ + private HashSet observerReadEnabledOverrides = new HashSet<>(); /** Pattern to parse a stack trace line. */ private static final Pattern STACK_TRACE_PATTERN = @@ -207,15 +208,14 @@ public RouterRpcClient(Configuration conf, Router router, failoverSleepBaseMillis, failoverSleepMaxMillis); String[] ipProxyUsers = conf.getStrings(DFS_NAMENODE_IP_PROXY_USERS); this.enableProxyUser = ipProxyUsers != null && ipProxyUsers.length > 0; - this.observerReadEnabled = conf.getBoolean( - RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_ENABLE, - RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_ENABLE_DEFAULT); - Map observerReadOverrides = conf - .getPropsWithPrefix(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_ENABLE + "."); - observerReadOverrides - .forEach((nsId, readEnabled) -> - nsObserverReadEnabled.put(nsId, Boolean.valueOf(readEnabled))); - if (this.observerReadEnabled) { + this.observerReadEnabledDefault = conf.getBoolean( + RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, + RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_VALUE); + String[] observerReadOverrides = conf.getStrings(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES); + if (observerReadOverrides != null) { + observerReadEnabledOverrides.addAll(Arrays.asList(observerReadOverrides)); + } + if (this.observerReadEnabledDefault) { LOG.info("Observer read is enabled for router."); } } @@ -469,8 +469,8 @@ private RetryDecision shouldRetry(final IOException ioe, final int retryCount, * @param ugi User group information. * @param namenodes A prioritized list of namenodes within the same * nameservice. + * @param useObserver Whether to use observer namenodes. * @param method Remote ClientProtocol method to invoke. - * @param skipObserver Skip observer namenodes. * @param params Variable list of parameters matching the method. * @return The result of invoking the method. * @throws ConnectException If it cannot connect to any Namenode. @@ -481,8 +481,8 @@ private RetryDecision shouldRetry(final IOException ioe, final int retryCount, public Object invokeMethod( final UserGroupInformation ugi, final List namenodes, - final Class protocol, final Method method, boolean skipObserver, - final Object... params) + boolean useObserver, + final Class protocol, final Method method, final Object... 
params) throws ConnectException, StandbyException, IOException { if (namenodes == null || namenodes.isEmpty()) { @@ -498,11 +498,10 @@ public Object invokeMethod( rpcMonitor.proxyOp(); } boolean failover = false; - boolean tryActive = false; + boolean shouldUseObserver = useObserver; Map ioes = new LinkedHashMap<>(); for (FederationNamenodeContext namenode : namenodes) { - if ((tryActive || skipObserver) - && namenode.getState() == FederationNamenodeServiceState.OBSERVER) { + if (!shouldUseObserver && (namenode.getState() == FederationNamenodeServiceState.OBSERVER)) { continue; } ConnectionContext connection = null; @@ -531,8 +530,8 @@ public Object invokeMethod( ioes.put(namenode, ioe); if (ioe instanceof ObserverRetryOnActiveException) { LOG.info("Encountered ObserverRetryOnActiveException from {}." - + " Retry active namenode directly."); - tryActive = true; + + " Retry active namenode directly.", namenode); + shouldUseObserver = false; } else if (ioe instanceof StandbyException) { // Fail over indicated by retry policy and/or NN if (this.rpcMonitor != null) { @@ -881,7 +880,7 @@ public Object invokeSingle(final String nsId, RemoteMethod method) Class proto = method.getProtocol(); Method m = method.getMethod(); Object[] params = method.getParams(loc); - return invokeMethod(ugi, nns, proto, m, !isObserverRead, params); + return invokeMethod(ugi, nns, isObserverRead, proto, m, params); } finally { releasePermit(nsId, ugi, method, controller); } @@ -1050,7 +1049,7 @@ public RemoteResult invokeSequential( Class proto = remoteMethod.getProtocol(); Object[] params = remoteMethod.getParams(loc); Object result = invokeMethod( - ugi, namenodes, proto, m, !isObserverRead, params); + ugi, namenodes, isObserverRead, proto, m, params); // Check if the result is what we expected if (isExpectedClass(expectedResultClass, result) && isExpectedValue(expectedResultValue, result)) { @@ -1413,7 +1412,7 @@ public Map invokeConcurrent( Class proto = method.getProtocol(); Object[] paramList = method.getParams(location); R result = (R) invokeMethod( - ugi, namenodes, proto, m, !isObserverRead, paramList); + ugi, namenodes, isObserverRead, proto, m, paramList); RemoteResult remoteResult = new RemoteResult<>(location, result); return Collections.singletonList(remoteResult); } catch (IOException ioe) { @@ -1451,7 +1450,7 @@ public Map invokeConcurrent( () -> { transferThreadLocalContext(originCall, originContext); return invokeMethod( - ugi, nnList, proto, m, !isObserverRead, paramList); + ugi, nnList, isObserverRead, proto, m, paramList); }); } } else { @@ -1461,7 +1460,7 @@ public Map invokeConcurrent( () -> { transferThreadLocalContext(originCall, originContext); return invokeMethod( - ugi, namenodes, proto, m, !isObserverRead, paramList); + ugi, namenodes, isObserverRead, proto, m, paramList); }); } } @@ -1717,8 +1716,8 @@ private List getOrderedNamenodes(String nsI } private boolean isObserverReadEligible(String nsId, Method method) { - return nsObserverReadEnabled.getOrDefault(nsId, observerReadEnabled) - && isReadCall(method); + boolean isReadEnabledForNamespace = observerReadEnabledDefault != observerReadEnabledOverrides.contains(nsId); + return isReadEnabledForNamespace && isReadCall(method); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index b373ae61a92db..c4173163436ce 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -311,7 +311,7 @@ public RouterRpcServer(Configuration conf, Router router, GetUserMappingsProtocolProtos.GetUserMappingsProtocolService. newReflectiveBlockingService(getUserMappingXlator); - InetSocketAddress confRpcAddress = this.conf.getSocketAddr( + InetSocketAddress confRpcAddress = conf.getSocketAddr( RBFConfigKeys.DFS_ROUTER_RPC_BIND_HOST_KEY, RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_KEY, RBFConfigKeys.DFS_ROUTER_RPC_ADDRESS_DEFAULT, @@ -337,17 +337,18 @@ public RouterRpcServer(Configuration conf, Router router, .build(); // Add all the RPC protocols that the Router implements - DFSUtil.addPBProtocol(this.conf, NamenodeProtocolPB.class, nnPbService, this.rpcServer); - DFSUtil.addPBProtocol(this.conf, RefreshUserMappingsProtocolPB.class, + DFSUtil.addPBProtocol( + conf, NamenodeProtocolPB.class, nnPbService, this.rpcServer); + DFSUtil.addPBProtocol(conf, RefreshUserMappingsProtocolPB.class, refreshUserMappingService, this.rpcServer); - DFSUtil.addPBProtocol(this.conf, GetUserMappingsProtocolPB.class, + DFSUtil.addPBProtocol(conf, GetUserMappingsProtocolPB.class, getUserMappingService, this.rpcServer); // Set service-level authorization security policy - this.serviceAuthEnabled = this.conf.getBoolean( + this.serviceAuthEnabled = conf.getBoolean( HADOOP_SECURITY_AUTHORIZATION, false); if (this.serviceAuthEnabled) { - rpcServer.refreshServiceAcl(this.conf, new RouterPolicyProvider()); + rpcServer.refreshServiceAcl(conf, new RouterPolicyProvider()); } // We don't want the server to log the full stack trace for some exceptions @@ -371,14 +372,14 @@ public RouterRpcServer(Configuration conf, Router router, this.rpcAddress = new InetSocketAddress( confRpcAddress.getHostName(), listenAddress.getPort()); - if (this.conf.getBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, + if (conf.getBoolean(RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE, RBFConfigKeys.DFS_ROUTER_METRICS_ENABLE_DEFAULT)) { // Create metrics monitor Class rpcMonitorClass = this.conf.getClass( RBFConfigKeys.DFS_ROUTER_METRICS_CLASS, RBFConfigKeys.DFS_ROUTER_METRICS_CLASS_DEFAULT, RouterRpcMonitor.class); - this.rpcMonitor = ReflectionUtils.newInstance(rpcMonitorClass, this.conf); + this.rpcMonitor = ReflectionUtils.newInstance(rpcMonitorClass, conf); } else { this.rpcMonitor = null; } @@ -390,10 +391,10 @@ public RouterRpcServer(Configuration conf, Router router, // Initialize modules this.quotaCall = new Quota(this.router, this); this.nnProto = new RouterNamenodeProtocol(this); - this.clientProto = new RouterClientProtocol(this.conf, this); + this.clientProto = new RouterClientProtocol(conf, this); this.routerProto = new RouterUserProtocol(this); - long dnCacheExpire = this.conf.getTimeDuration( + long dnCacheExpire = conf.getTimeDuration( DN_REPORT_CACHE_EXPIRE, DN_REPORT_CACHE_EXPIRE_MS_DEFAULT, TimeUnit.MILLISECONDS); this.dnCache = CacheBuilder.newBuilder() diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index c6320054f1300..52a1e3a3bd1e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -836,12 +836,21 @@ - dfs.federation.router.observer.read.enable 
+ dfs.federation.router.observer.read.default false - Enable observer read in router. This value is used across all nameservices - except when overridden by dfs.federation.router.observer.read.enable.EXAMPLENAMESERVICE - for a particular nameservice. + Whether observer reads are enabled. This is a default for all nameservices. + The default can be inverted for individual nameservices by adding them to + dfs.federation.router.observer.read.overrides. + + + + dfs.federation.router.observer.read.overrides + + + Comma-separated list of nameservices for which to invert the default configuration, + dfs.federation.router.observer.read.default, that determines whether observer reads are enabled. diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java index 27eb485734405..0741f1aed441a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/fairness/TestRouterRefreshFairnessPolicyController.java @@ -161,8 +161,8 @@ public void testRefreshStaticChangeHandlers() throws Exception { Thread.sleep(sleepTime); return null; }).when(client) - .invokeMethod(Mockito.any(), Mockito.any(), Mockito.any(), - Mockito.any(), Mockito.anyBoolean(), Mockito.any()); + .invokeMethod(Mockito.any(), Mockito.any(), Mockito.anyBoolean(), + Mockito.any(), Mockito.any(), Mockito.any()); // No calls yet assertEquals("{}", diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java index e82735bfd34d9..fbd731c073f4b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestObserverWithRouter.java @@ -55,7 +55,7 @@ public void startUpCluster(int numberOfObserver) throws Exception { public void startUpCluster(int numberOfObserver, Configuration confOverrides) throws Exception { int numberOfNamenode = 2 + numberOfObserver; Configuration conf = new Configuration(false); - conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_ENABLE, true); + conf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true); conf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); conf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms"); if (confOverrides != null) { @@ -168,7 +168,7 @@ public void testObserverReadWithoutFederatedStatePropagation() throws Exception public void testDisablingObserverReadUsingNameserviceOverride() throws Exception { // Disable observer reads using per-nameservice override Configuration confOverrides = new Configuration(false); - confOverrides.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_ENABLE + ".ns0", false); + confOverrides.set(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_OVERRIDES, "ns0"); startUpCluster(1, confOverrides); RouterContext routerContext = cluster.getRandomRouter(); @@ -319,7 +319,7 @@ public void testMultipleObserverRouter() throws Exception { sb.append(suffix);
} routerConf.set(DFS_ROUTER_MONITOR_NAMENODE, sb.toString()); - routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_ENABLE, true); + routerConf.setBoolean(RBFConfigKeys.DFS_ROUTER_OBSERVER_READ_DEFAULT_KEY, true); routerConf.setBoolean(DFSConfigKeys.DFS_HA_TAILEDITS_INPROGRESS_KEY, true); routerConf.set(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, "0ms");