From a243ca8a045f0007dc7eb2aa8807c876c76b1e01 Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Wed, 7 Sep 2022 10:54:41 -0700 Subject: [PATCH 1/4] HDFS-13522: Add federated nameservices states to client protocol and propagate it between routers and clients. --- .../org/apache/hadoop/ipc/RpcConstants.java | 4 +- .../apache/hadoop/hdfs/ClientGSIContext.java | 28 ++++- .../hadoop/hdfs/NameNodeProxiesClient.java | 3 + .../apache/hadoop/hdfs/NamespaceStateId.java | 42 +++++++ .../federation/router/ConnectionManager.java | 33 ++++- .../federation/router/ConnectionPool.java | 19 ++- .../router/FederatedNamespaceIds.java | 113 ++++++++++++++++++ .../federation/router/RouterRpcClient.java | 9 +- .../federation/router/RouterRpcServer.java | 10 +- .../router/RouterStateIdContext.java | 94 +++++++++++++++ .../federation/FederationTestUtils.java | 3 +- .../router/TestConnectionManager.java | 36 +++--- .../router/TestRouterFederatedState.java | 24 ++-- .../hdfs/server/namenode/ha/HATestUtil.java | 6 +- 14 files changed, 370 insertions(+), 54 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NamespaceStateId.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederatedNamespaceIds.java create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java index d38474af26bf0..1202f9db94c60 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java @@ -37,7 +37,9 @@ private RpcConstants() { public static final int INVALID_RETRY_COUNT = -1; - + // Special state ID value to indicate client request header has routerFederatedState set. + public static final long REQUEST_HEADER_NAMESPACE_STATEIDS_SET = -2L; + /** * The Rpc-connection header is as follows * +----------------------------------+ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java index 4de969642d574..8cf67ff6cc0b8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java @@ -25,7 +25,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import java.io.IOException; -import java.util.concurrent.atomic.LongAccumulator; +import org.apache.hadoop.thirdparty.protobuf.ByteString; /** * Global State Id context for the client. @@ -37,8 +37,17 @@ @InterfaceStability.Evolving public class ClientGSIContext implements AlignmentContext { - private final LongAccumulator lastSeenStateId = - new LongAccumulator(Math::max, Long.MIN_VALUE); + private final NamespaceStateId lastSeenStateId; + private ByteString routerFederatedState; + + public ClientGSIContext() { + this(new NamespaceStateId()); + } + + public ClientGSIContext(NamespaceStateId lastSeenStateId) { + this.lastSeenStateId = lastSeenStateId; + routerFederatedState = null; + } @Override public long getLastSeenStateId() { @@ -66,7 +75,11 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) { */ @Override public void receiveResponseState(RpcResponseHeaderProto header) { - lastSeenStateId.accumulate(header.getStateId()); + if (header.hasRouterFederatedState()) { + routerFederatedState = header.getRouterFederatedState(); + } else { + lastSeenStateId.update(header.getStateId()); + } } /** @@ -74,7 +87,12 @@ public void receiveResponseState(RpcResponseHeaderProto header) { */ @Override public void updateRequestState(RpcRequestHeaderProto.Builder header) { - header.setStateId(lastSeenStateId.longValue()); + if (lastSeenStateId.get() != NamespaceStateId.DEFAULT) { + header.setStateId(lastSeenStateId.get()); + } + if (routerFederatedState != null) { + header.setRouterFederatedState(routerFederatedState); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java index aa9577330cfae..4acec82824238 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NameNodeProxiesClient.java @@ -349,6 +349,9 @@ public static ClientProtocol createProxyWithAlignmentContext( boolean withRetries, AtomicBoolean fallbackToSimpleAuth, AlignmentContext alignmentContext) throws IOException { + if (alignmentContext == null) { + alignmentContext = new ClientGSIContext(); + } RPC.setProtocolEngine(conf, ClientNamenodeProtocolPB.class, ProtobufRpcEngine2.class); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NamespaceStateId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NamespaceStateId.java new file mode 100644 index 0000000000000..f6b6ad9479aac --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NamespaceStateId.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs; + +import java.util.concurrent.atomic.LongAccumulator; + +/** + * Object to hold the last-seen state Id for a namespace. + */ +public class NamespaceStateId { + public static final Long DEFAULT = Long.MIN_VALUE; + private final LongAccumulator lastSeenStateId = + new LongAccumulator(Math::max, DEFAULT); + + public long get() { + return lastSeenStateId.get(); + } + + public void update(Long stateId) { + lastSeenStateId.accumulate(stateId); + } + + public long getThenReset() { + return lastSeenStateId.getThenReset(); + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index 5fe797bf5ce2c..550eac4c0df05 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -33,6 +33,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.hdfs.ClientGSIContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; @@ -73,6 +74,14 @@ public class ConnectionManager { /** Queue for creating new connections. */ private final BlockingQueue creatorQueue; + /** + * Store for NamespaceIds to use with observer namenodes. + */ + private final FederatedNamespaceIds federatedNamespaceIds; + /** + * Maps from connection pool ID to namespace. + */ + private final Map connectionPoolToNamespaceMap; /** Max size of queue for creating new connections. */ private final int creatorQueueMaxSize; @@ -85,15 +94,19 @@ public class ConnectionManager { /** If the connection manager is running. */ private boolean running = false; + public ConnectionManager(Configuration config) { + this(config, new FederatedNamespaceIds()); + } /** * Creates a proxy client connection pool manager. * * @param config Configuration for the connections. */ - public ConnectionManager(Configuration config) { + public ConnectionManager(Configuration config, FederatedNamespaceIds federatedNamespaceIds) { this.conf = config; - + this.federatedNamespaceIds = federatedNamespaceIds; + this.connectionPoolToNamespaceMap = new HashMap<>(); // Configure minimum, maximum and active connection pools this.maxSize = this.conf.getInt( RBFConfigKeys.DFS_ROUTER_NAMENODE_CONNECTION_POOL_SIZE, @@ -160,6 +173,10 @@ public void close() { pool.close(); } this.pools.clear(); + for (String nsID: connectionPoolToNamespaceMap.values()) { + federatedNamespaceIds.removeNamespaceId(nsID); + } + connectionPoolToNamespaceMap.clear(); } finally { writeLock.unlock(); } @@ -172,11 +189,12 @@ public void close() { * @param ugi User group information. * @param nnAddress Namenode address for the connection. * @param protocol Protocol for the connection. + * @param nsId Nameservice identity. * @return Proxy client to connect to nnId as UGI. * @throws IOException If the connection cannot be obtained. */ public ConnectionContext getConnection(UserGroupInformation ugi, - String nnAddress, Class protocol) throws IOException { + String nnAddress, Class protocol, String nsId) throws IOException { // Check if the manager is shutdown if (!this.running) { @@ -205,8 +223,10 @@ public ConnectionContext getConnection(UserGroupInformation ugi, if (pool == null) { pool = new ConnectionPool( this.conf, nnAddress, ugi, this.minSize, this.maxSize, - this.minActiveRatio, protocol); + this.minActiveRatio, protocol, + new ClientGSIContext(this.federatedNamespaceIds.getNamespaceId(nsId))); this.pools.put(connectionId, pool); + this.connectionPoolToNamespaceMap.put(connectionId, nsId); } } finally { writeLock.unlock(); @@ -430,6 +450,11 @@ public void run() { try { for (ConnectionPoolId poolId : toRemove) { pools.remove(poolId); + String nsID = connectionPoolToNamespaceMap.get(poolId); + connectionPoolToNamespaceMap.remove(poolId); + if (!connectionPoolToNamespaceMap.values().contains(nsID)) { + federatedNamespaceIds.removeNamespaceId(nsID); + } } } finally { writeLock.unlock(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index a2aa7c869e5f6..d61d6ef2518fb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -32,6 +32,7 @@ import javax.net.SocketFactory; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; @@ -108,6 +109,8 @@ public class ConnectionPool { /** Enable using multiple physical socket or not. **/ private final boolean enableMultiSocket; + /** StateID alignment context. */ + private final AlignmentContext alignmentContext; /** Map for the protocols and their protobuf implementations. */ private final static Map, ProtoImpl> PROTO_MAP = new HashMap<>(); @@ -138,7 +141,8 @@ private static class ProtoImpl { protected ConnectionPool(Configuration config, String address, UserGroupInformation user, int minPoolSize, int maxPoolSize, - float minActiveRatio, Class proto) throws IOException { + float minActiveRatio, Class proto, AlignmentContext alignmentContext) + throws IOException { this.conf = config; @@ -157,6 +161,8 @@ protected ConnectionPool(Configuration config, String address, RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_KEY, RBFConfigKeys.DFS_ROUTER_NAMENODE_ENABLE_MULTIPLE_SOCKET_DEFAULT); + this.alignmentContext = alignmentContext; + // Add minimum connections to the pool for (int i = 0; i < this.minSize; i++) { ConnectionContext newConnection = newConnection(); @@ -398,7 +404,7 @@ public String getJSON() { public ConnectionContext newConnection() throws IOException { return newConnection(this.conf, this.namenodeAddress, this.ugi, this.protocol, this.enableMultiSocket, - this.socketIndex.incrementAndGet()); + this.socketIndex.incrementAndGet(), alignmentContext); } /** @@ -413,13 +419,15 @@ public ConnectionContext newConnection() throws IOException { * @param ugi User context. * @param proto Interface of the protocol. * @param enableMultiSocket Enable multiple socket or not. + * @param alignmentContext client alignment context. * @return proto for the target ClientProtocol that contains the user's * security context. * @throws IOException If it cannot be created. */ protected static ConnectionContext newConnection(Configuration conf, String nnAddress, UserGroupInformation ugi, Class proto, - boolean enableMultiSocket, int socketIndex) throws IOException { + boolean enableMultiSocket, int socketIndex, + AlignmentContext alignmentContext) throws IOException { if (!PROTO_MAP.containsKey(proto)) { String msg = "Unsupported protocol for connection to NameNode: " + ((proto != null) ? proto.getName() : "null"); @@ -448,10 +456,11 @@ protected static ConnectionContext newConnection(Configuration conf, socket, classes.protoPb, ugi, RPC.getRpcTimeout(conf), defaultPolicy, conf, socketIndex); proxy = RPC.getProtocolProxy(classes.protoPb, version, connectionId, - conf, factory).getProxy(); + conf, factory, alignmentContext).getProxy(); } else { proxy = RPC.getProtocolProxy(classes.protoPb, version, socket, ugi, - conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null).getProxy(); + conf, factory, RPC.getRpcTimeout(conf), defaultPolicy, null, + alignmentContext).getProxy(); } T client = newProtoClient(proto, classes, proxy); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederatedNamespaceIds.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederatedNamespaceIds.java new file mode 100644 index 0000000000000..b45520459439c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederatedNamespaceIds.java @@ -0,0 +1,113 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router; + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.hdfs.NamespaceStateId; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto; +import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException; +import org.apache.hadoop.thirdparty.protobuf.ByteString; + + +/** + * Collection of last-seen namespace state Ids for a set of namespaces. + * A single NamespaceStateId is shared by all outgoing connections to a particular namespace. + * Router clients share and query the entire collection. + */ +public class FederatedNamespaceIds { + private final Map namespaceIdMap = new ConcurrentHashMap<>(); + private final ReentrantLock lock = new ReentrantLock(); + + public void updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) { + if (header.hasRouterFederatedState()) { + RouterFederatedStateProto federatedState; + try { + federatedState = RouterFederatedStateProto.parseFrom(header.getRouterFederatedState()); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + lock.lock(); + try { + federatedState.getNamespaceStateIdsMap().forEach((nsId, stateId) -> { + if (!namespaceIdMap.containsKey(nsId)) { + namespaceIdMap.putIfAbsent(nsId, new NamespaceStateId()); + } + namespaceIdMap.get(nsId).update(stateId); + }); + } finally { + lock.unlock(); + } + + } + } + + public void setResponseHeaderState(RpcHeaderProtos.RpcResponseHeaderProto.Builder headerBuilder) { + RouterFederatedStateProto.Builder federatedStateBuilder = + RouterFederatedStateProto.newBuilder(); + lock.lock(); + try { + namespaceIdMap.forEach((k, v) -> federatedStateBuilder.putNamespaceStateIds(k, v.get())); + } finally { + lock.unlock(); + } + headerBuilder.setRouterFederatedState(federatedStateBuilder.build().toByteString()); + } + + public NamespaceStateId getNamespaceId(String nsId) { + lock.lock(); + try { + namespaceIdMap.putIfAbsent(nsId, new NamespaceStateId()); + } finally { + lock.unlock(); + } + return namespaceIdMap.get(nsId); + } + + public void removeNamespaceId(String nsId) { + lock.lock(); + try { + namespaceIdMap.remove(nsId); + } finally { + lock.unlock(); + } + } + + /** + * Utility function to view state of routerFederatedState field in RPC headers. + */ + @VisibleForTesting + public static Map getRouterFederatedStateMap(ByteString byteString) { + if (byteString != null) { + RouterFederatedStateProto federatedState; + try { + federatedState = RouterFederatedStateProto.parseFrom(byteString); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + return federatedState.getNamespaceStateIdsMap(); + } else { + return Collections.emptyMap(); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index e90cc5fda41d1..1386fe352a94b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -146,7 +146,8 @@ public class RouterRpcClient { * @param monitor Optional performance monitor. */ public RouterRpcClient(Configuration conf, Router router, - ActiveNamenodeResolver resolver, RouterRpcMonitor monitor) { + ActiveNamenodeResolver resolver, RouterRpcMonitor monitor, + FederatedNamespaceIds federatedNamespaceIds) { this.router = router; this.namenodeResolver = resolver; @@ -155,7 +156,7 @@ public RouterRpcClient(Configuration conf, Router router, this.contextFieldSeparator = clientConf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY, HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT); - this.connectionManager = new ConnectionManager(clientConf); + this.connectionManager = new ConnectionManager(clientConf, federatedNamespaceIds); this.connectionManager.start(); this.routerRpcFairnessPolicyController = FederationUtil.newFairnessPolicyController(conf); @@ -369,7 +370,7 @@ private ConnectionContext getConnection(UserGroupInformation ugi, String nsId, ugi.getUserName(), routerUser); } connection = this.connectionManager.getConnection( - connUGI, rpcAddress, proto); + connUGI, rpcAddress, proto, nsId); LOG.debug("User {} NN {} is using connection {}", ugi.getUserName(), rpcAddress, connection); } catch (Exception ex) { @@ -1636,7 +1637,7 @@ public Long getAcceptedPermitForNs(String ns) { /** * Refreshes/changes the fairness policy controller implementation if possible - * and returns the controller class name + * and returns the controller class name. * @param conf Configuration * @return New controller class name if successfully refreshed, else old controller class name */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index fc761934bc582..016f402b06f93 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -252,18 +252,18 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, /** * Construct a router RPC server. * - * @param configuration HDFS Configuration. + * @param conf HDFS Configuration. * @param router A router using this RPC server. * @param nnResolver The NN resolver instance to determine active NNs in HA. * @param fileResolver File resolver to resolve file paths to subclusters. * @throws IOException If the RPC server could not be created. */ - public RouterRpcServer(Configuration configuration, Router router, + public RouterRpcServer(Configuration conf, Router router, ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver) throws IOException { super(RouterRpcServer.class.getName()); - this.conf = configuration; + this.conf = conf; this.router = router; this.namenodeResolver = nnResolver; this.subclusterResolver = fileResolver; @@ -321,6 +321,7 @@ public RouterRpcServer(Configuration configuration, Router router, // Create security manager this.securityManager = new RouterSecurityManager(this.conf); + FederatedNamespaceIds federatedNamespaceIds = new FederatedNamespaceIds(); this.rpcServer = new RPC.Builder(this.conf) .setProtocol(ClientNamenodeProtocolPB.class) @@ -331,6 +332,7 @@ public RouterRpcServer(Configuration configuration, Router router, .setnumReaders(readerCount) .setQueueSizePerHandler(handlerQueueSize) .setVerbose(false) + .setAlignmentContext(new RouterStateIdContext(federatedNamespaceIds)) .setSecretManager(this.securityManager.getSecretManager()) .build(); @@ -384,7 +386,7 @@ public RouterRpcServer(Configuration configuration, Router router, // Create the client this.rpcClient = new RouterRpcClient(this.conf, this.router, - this.namenodeResolver, this.rpcMonitor); + this.namenodeResolver, this.rpcMonitor, federatedNamespaceIds); // Initialize modules this.quotaCall = new Quota(this.router, this); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java new file mode 100644 index 0000000000000..2cf9b2721b709 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java @@ -0,0 +1,94 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router; + +import java.lang.reflect.Method; +import java.util.HashSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly; +import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.RetriableException; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; + +import static org.apache.hadoop.ipc.RpcConstants.REQUEST_HEADER_NAMESPACE_STATEIDS_SET; + +/** + * This is the router implementation responsible for passing + * client state id to next level. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +class RouterStateIdContext implements AlignmentContext { + + private final HashSet coordinatedMethods; + private final FederatedNamespaceIds federatedNamespaceIds; + + RouterStateIdContext(FederatedNamespaceIds federatedNamespaceIds) { + this.federatedNamespaceIds = federatedNamespaceIds; + this.coordinatedMethods = new HashSet<>(); + // For now, only ClientProtocol methods can be coordinated, so only checking + // against ClientProtocol. + for (Method method : ClientProtocol.class.getDeclaredMethods()) { + if (method.isAnnotationPresent(ReadOnly.class) + && method.getAnnotationsByType(ReadOnly.class)[0].isCoordinated()) { + coordinatedMethods.add(method.getName()); + } + } + } + + @Override + public void updateResponseState(RpcResponseHeaderProto.Builder header) { + federatedNamespaceIds.setResponseHeaderState(header); + } + + @Override + public void receiveResponseState(RpcResponseHeaderProto header) { + // Do nothing. + } + + @Override + public void updateRequestState(RpcRequestHeaderProto.Builder header) { + // Do nothing. + } + + @Override + public long receiveRequestState(RpcRequestHeaderProto header, + long clientWaitTime) throws RetriableException { + federatedNamespaceIds.updateStateUsingRequestHeader(header); + if (header.hasRouterFederatedState()) { + return REQUEST_HEADER_NAMESPACE_STATEIDS_SET; + } + return header.getStateId(); + } + + @Override + public long getLastSeenStateId() { + return 0; + } + + @Override + public boolean isCoordinatedCall(String protocolName, String methodName) { + return protocolName.equals(ClientProtocol.class.getCanonicalName()) + && coordinatedMethods.contains(methodName); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java index 107a1ba9551a3..79c28986c33f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java @@ -384,7 +384,8 @@ public static void simulateThrowExceptionRouterRpcServer( invocation.getMock()); throw new IOException("Simulate connectionManager throw IOException"); }).when(spyConnectionManager).getConnection( - any(UserGroupInformation.class), any(String.class), any(Class.class)); + any(UserGroupInformation.class), any(String.class), any(Class.class), + any(String.class)); Whitebox.setInternalState(rpcClient, "connectionManager", spyConnectionManager); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java index 04c2540c2aaf9..067d43dabd5fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java @@ -81,15 +81,15 @@ public void shutdown() { public void testCleanup() throws Exception { Map poolMap = connManager.getPools(); - ConnectionPool pool1 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class); + ConnectionPool pool1 = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER1, + 0, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool1, 9, 4); poolMap.put( new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), pool1); - ConnectionPool pool2 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER2, 0, 10, 0.5f, ClientProtocol.class); + ConnectionPool pool2 = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER2, + 0, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool2, 10, 10); poolMap.put( new ConnectionPoolId(TEST_USER2, TEST_NN_ADDRESS, ClientProtocol.class), @@ -111,8 +111,8 @@ public void testCleanup() throws Exception { checkPoolConnections(TEST_USER2, 10, 10); // Make sure the number of connections doesn't go below minSize - ConnectionPool pool3 = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER3, 2, 10, 0.5f, ClientProtocol.class); + ConnectionPool pool3 = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER3, + 2, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool3, 8, 0); poolMap.put( new ConnectionPoolId(TEST_USER3, TEST_NN_ADDRESS, ClientProtocol.class), @@ -140,7 +140,7 @@ public void testGetConnectionWithConcurrency() throws Exception { ConnectionPool pool = new ConnectionPool( copyConf, TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f, - ClientProtocol.class); + ClientProtocol.class, null); poolMap.put( new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), pool); @@ -174,8 +174,8 @@ public void testGetConnectionWithConcurrency() throws Exception { public void testConnectionCreatorWithException() throws Exception { // Create a bad connection pool pointing to unresolvable namenode address. ConnectionPool badPool = new ConnectionPool( - conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, - ClientProtocol.class); + conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, + ClientProtocol.class, null); BlockingQueue queue = new ArrayBlockingQueue<>(1); queue.add(badPool); ConnectionManager.ConnectionCreator connectionCreator = @@ -201,7 +201,7 @@ public void testGetConnectionWithException() throws Exception { // Create a bad connection pool pointing to unresolvable namenode address. ConnectionPool badPool = new ConnectionPool( conf, UNRESOLVED_TEST_NN_ADDRESS, TEST_USER1, 1, 10, 0.5f, - ClientProtocol.class); + ClientProtocol.class, null); } @Test @@ -210,8 +210,8 @@ public void testGetConnection() throws Exception { final int totalConns = 10; int activeConns = 5; - ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, ClientProtocol.class); + ConnectionPool pool = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER1, + 0, 10, 0.5f, ClientProtocol.class, null); addConnectionsToPool(pool, totalConns, activeConns); poolMap.put( new ConnectionPoolId(TEST_USER1, TEST_NN_ADDRESS, ClientProtocol.class), @@ -235,8 +235,8 @@ public void testGetConnection() throws Exception { @Test public void testValidClientIndex() throws Exception { - ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 2, 2, 0.5f, ClientProtocol.class); + ConnectionPool pool = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER1, + 2, 2, 0.5f, ClientProtocol.class, null); for(int i = -3; i <= 3; i++) { pool.getClientIndex().set(i); ConnectionContext conn = pool.getConnection(); @@ -251,8 +251,8 @@ public void getGetConnectionNamenodeProtocol() throws Exception { final int totalConns = 10; int activeConns = 5; - ConnectionPool pool = new ConnectionPool( - conf, TEST_NN_ADDRESS, TEST_USER1, 0, 10, 0.5f, NamenodeProtocol.class); + ConnectionPool pool = new ConnectionPool(conf, TEST_NN_ADDRESS, TEST_USER1, + 0, 10, 0.5f, NamenodeProtocol.class, null); addConnectionsToPool(pool, totalConns, activeConns); poolMap.put( new ConnectionPoolId( @@ -325,7 +325,7 @@ private void testConnectionCleanup(float ratio, int totalConns, // Create one new connection pool tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, - NamenodeProtocol.class); + NamenodeProtocol.class, "ns0"); Map poolMap = tmpConnManager.getPools(); ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1, @@ -356,6 +356,6 @@ public void testUnsupportedProtoExceptionMsg() throws Exception { "Unsupported protocol for connection to NameNode: " + TestConnectionManager.class.getName(), () -> ConnectionPool.newConnection(conf, TEST_NN_ADDRESS, TEST_USER1, - TestConnectionManager.class, false, 0)); + TestConnectionManager.class, false, 0, null)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java index 60ab4b2c0bc48..c95e9d9e4ca8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java @@ -38,16 +38,22 @@ public class TestRouterFederatedState { @Test public void testRpcRouterFederatedState() throws InvalidProtocolBufferException { byte[] uuid = ClientId.getClientId(); - Map expectedStateIds = new HashMap() {{ - put("namespace1", 11L ); - put("namespace2", 22L); - }}; + Map expectedStateIds = new HashMap() { + { + put("namespace1", 11L ); + put("namespace2", 22L); + } + }; AlignmentContext alignmentContext = new AlignmentContextWithRouterState(expectedStateIds); RpcHeaderProtos.RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( - RPC.RpcKind.RPC_PROTOCOL_BUFFER, RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET, 0, - RpcConstants.INVALID_RETRY_COUNT, uuid, alignmentContext); + RPC.RpcKind.RPC_PROTOCOL_BUFFER, + RpcHeaderProtos.RpcRequestHeaderProto.OperationProto.RPC_FINAL_PACKET, + 0, + RpcConstants.INVALID_RETRY_COUNT, + uuid, + alignmentContext); Map stateIdsFromHeader = RouterFederatedStateProto.parseFrom( @@ -59,9 +65,9 @@ public void testRpcRouterFederatedState() throws InvalidProtocolBufferException private static class AlignmentContextWithRouterState implements AlignmentContext { - Map routerFederatedState; + private Map routerFederatedState; - public AlignmentContextWithRouterState(Map namespaceStates) { + AlignmentContextWithRouterState(Map namespaceStates) { this.routerFederatedState = namespaceStates; } @@ -82,7 +88,7 @@ public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder h public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) {} @Override - public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold) throws IOException { + public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold) { return 0; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index 307fe04618ba6..a898fbe165295 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -35,8 +35,8 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.LongAccumulator; +import org.apache.hadoop.hdfs.NamespaceStateId; import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; import org.slf4j.Logger; @@ -367,9 +367,9 @@ public static long setACStateId(DistributedFileSystem dfs, ClientGSIContext ac = (ClientGSIContext)(provider.getAlignmentContext()); Field f = ac.getClass().getDeclaredField("lastSeenStateId"); f.setAccessible(true); - LongAccumulator lastSeenStateId = (LongAccumulator)f.get(ac); + NamespaceStateId lastSeenStateId = (NamespaceStateId)f.get(ac); long currentStateId = lastSeenStateId.getThenReset(); - lastSeenStateId.accumulate(stateId); + lastSeenStateId.update(stateId); return currentStateId; } From 30497ad460cf879e940859548341b95d18d42b03 Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Wed, 7 Sep 2022 10:54:57 -0700 Subject: [PATCH 2/4] Avoiding denial of service between clients. --- .../java/org/apache/hadoop/ipc/Server.java | 14 +++ .../apache/hadoop/hdfs/ClientGSIContext.java | 4 +- .../federation/router/ConnectionManager.java | 7 +- .../federation/router/ConnectionPool.java | 12 ++- .../router/FederatedNamespaceIds.java | 61 +++---------- .../router/PoolAlignmentContext.java | 89 +++++++++++++++++++ .../federation/router/RBFConfigKeys.java | 4 + .../federation/router/RouterRpcClient.java | 15 +++- .../federation/router/RouterRpcServer.java | 8 +- .../router/RouterStateIdContext.java | 31 ++++--- .../src/main/resources/hdfs-rbf-default.xml | 11 +++ .../federation/FederationTestUtils.java | 2 +- .../router/TestConnectionManager.java | 2 +- .../router/TestRouterFederatedState.java | 3 +- 14 files changed, 189 insertions(+), 74 deletions(-) create mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index ec624cc3b7318..f5753efe7e512 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -937,6 +937,9 @@ public static class Call implements Schedulable, // the priority level assigned by scheduler, 0 by default private long clientStateId; private boolean isCallCoordinated; + // Serialized RouterFederatedStateProto message to + // store last seen states for multiple namespaces. + private ByteString federatedNamespaceState; Call() { this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, @@ -994,6 +997,14 @@ public ProcessingDetails getProcessingDetails() { return processingDetails; } + public void setFederatedNamespaceState(ByteString federatedNamespaceState) { + this.federatedNamespaceState = federatedNamespaceState; + } + + public ByteString getFederatedNamespaceState() { + return this.federatedNamespaceState; + } + @Override public String toString() { return "Call#" + callId + " Retry#" + retryCount; @@ -2868,6 +2879,9 @@ private void processRpcRequest(RpcRequestHeaderProto header, stateId = alignmentContext.receiveRequestState( header, getMaxIdleTime()); call.setClientStateId(stateId); + if (header.hasRouterFederatedState()) { + call.setFederatedNamespaceState(header.getRouterFederatedState()); + } } } catch (IOException ioe) { throw new RpcServerException("Processing RPC request caught ", ioe); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java index 8cf67ff6cc0b8..cd531e2cfe653 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java @@ -74,7 +74,7 @@ public void updateResponseState(RpcResponseHeaderProto.Builder header) { * in responses. */ @Override - public void receiveResponseState(RpcResponseHeaderProto header) { + public synchronized void receiveResponseState(RpcResponseHeaderProto header) { if (header.hasRouterFederatedState()) { routerFederatedState = header.getRouterFederatedState(); } else { @@ -86,7 +86,7 @@ public void receiveResponseState(RpcResponseHeaderProto header) { * Client side implementation for providing state alignment info in requests. */ @Override - public void updateRequestState(RpcRequestHeaderProto.Builder header) { + public synchronized void updateRequestState(RpcRequestHeaderProto.Builder header) { if (lastSeenStateId.get() != NamespaceStateId.DEFAULT) { header.setStateId(lastSeenStateId.get()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index 550eac4c0df05..dfd23a943cc2c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -33,7 +33,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.classification.VisibleForTesting; -import org.apache.hadoop.hdfs.ClientGSIContext; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; @@ -194,7 +193,8 @@ public void close() { * @throws IOException If the connection cannot be obtained. */ public ConnectionContext getConnection(UserGroupInformation ugi, - String nnAddress, Class protocol, String nsId) throws IOException { + String nnAddress, Class protocol, String nsId, + Long clientStateId) throws IOException { // Check if the manager is shutdown if (!this.running) { @@ -224,10 +224,11 @@ public ConnectionContext getConnection(UserGroupInformation ugi, pool = new ConnectionPool( this.conf, nnAddress, ugi, this.minSize, this.maxSize, this.minActiveRatio, protocol, - new ClientGSIContext(this.federatedNamespaceIds.getNamespaceId(nsId))); + new PoolAlignmentContext(this.federatedNamespaceIds.getNamespaceId(nsId))); this.pools.put(connectionId, pool); this.connectionPoolToNamespaceMap.put(connectionId, nsId); } + pool.getPoolAlignmentContext().advanceClientStateId(clientStateId); } finally { writeLock.unlock(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index d61d6ef2518fb..24b01a67d90f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -110,7 +110,7 @@ public class ConnectionPool { /** Enable using multiple physical socket or not. **/ private final boolean enableMultiSocket; /** StateID alignment context. */ - private final AlignmentContext alignmentContext; + private final PoolAlignmentContext alignmentContext; /** Map for the protocols and their protobuf implementations. */ private final static Map, ProtoImpl> PROTO_MAP = new HashMap<>(); @@ -141,7 +141,7 @@ private static class ProtoImpl { protected ConnectionPool(Configuration config, String address, UserGroupInformation user, int minPoolSize, int maxPoolSize, - float minActiveRatio, Class proto, AlignmentContext alignmentContext) + float minActiveRatio, Class proto, PoolAlignmentContext alignmentContext) throws IOException { this.conf = config; @@ -217,6 +217,14 @@ public AtomicInteger getClientIndex() { return this.clientIndex; } + /** + * Get the alignment context for this pool + * @return Alignment context + */ + public PoolAlignmentContext getPoolAlignmentContext() { + return this.alignmentContext; + } + /** * Return the next connection round-robin. * diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederatedNamespaceIds.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederatedNamespaceIds.java index b45520459439c..1cf0316986214 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederatedNamespaceIds.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederatedNamespaceIds.java @@ -21,8 +21,6 @@ import java.util.Collections; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantLock; -import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.hdfs.NamespaceStateId; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto; @@ -36,67 +34,30 @@ * Router clients share and query the entire collection. */ public class FederatedNamespaceIds { - private final Map namespaceIdMap = new ConcurrentHashMap<>(); - private final ReentrantLock lock = new ReentrantLock(); - - public void updateStateUsingRequestHeader(RpcHeaderProtos.RpcRequestHeaderProto header) { - if (header.hasRouterFederatedState()) { - RouterFederatedStateProto federatedState; - try { - federatedState = RouterFederatedStateProto.parseFrom(header.getRouterFederatedState()); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - lock.lock(); - try { - federatedState.getNamespaceStateIdsMap().forEach((nsId, stateId) -> { - if (!namespaceIdMap.containsKey(nsId)) { - namespaceIdMap.putIfAbsent(nsId, new NamespaceStateId()); - } - namespaceIdMap.get(nsId).update(stateId); - }); - } finally { - lock.unlock(); - } - - } - } + private final ConcurrentHashMap namespaceIdMap = + new ConcurrentHashMap<>(); public void setResponseHeaderState(RpcHeaderProtos.RpcResponseHeaderProto.Builder headerBuilder) { + if (namespaceIdMap.isEmpty()) { + return; + } RouterFederatedStateProto.Builder federatedStateBuilder = RouterFederatedStateProto.newBuilder(); - lock.lock(); - try { - namespaceIdMap.forEach((k, v) -> federatedStateBuilder.putNamespaceStateIds(k, v.get())); - } finally { - lock.unlock(); - } + namespaceIdMap.forEach((k, v) -> federatedStateBuilder.putNamespaceStateIds(k, v.get())); headerBuilder.setRouterFederatedState(federatedStateBuilder.build().toByteString()); } public NamespaceStateId getNamespaceId(String nsId) { - lock.lock(); - try { - namespaceIdMap.putIfAbsent(nsId, new NamespaceStateId()); - } finally { - lock.unlock(); - } - return namespaceIdMap.get(nsId); + return namespaceIdMap.computeIfAbsent(nsId, key -> new NamespaceStateId()); } public void removeNamespaceId(String nsId) { - lock.lock(); - try { - namespaceIdMap.remove(nsId); - } finally { - lock.unlock(); - } + namespaceIdMap.remove(nsId); } /** - * Utility function to view state of routerFederatedState field in RPC headers. + * Utility function to parse routerFederatedState field in RPC headers. */ - @VisibleForTesting public static Map getRouterFederatedStateMap(ByteString byteString) { if (byteString != null) { RouterFederatedStateProto federatedState; @@ -110,4 +71,8 @@ public static Map getRouterFederatedStateMap(ByteString byteString return Collections.emptyMap(); } } + + public int size() { + return namespaceIdMap.size(); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java new file mode 100644 index 0000000000000..6cd5270f440c6 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.federation.router; + +import java.io.IOException; +import org.apache.hadoop.hdfs.NamespaceStateId; +import org.apache.hadoop.ipc.AlignmentContext; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; + + +public class PoolAlignmentContext implements AlignmentContext { + private NamespaceStateId sharedGlobalStateId; + private NamespaceStateId poolLocalStateId; + + PoolAlignmentContext(NamespaceStateId namespaceStateId) { + sharedGlobalStateId = namespaceStateId; + poolLocalStateId = new NamespaceStateId(); + } + + /** + * Client side implementation only receives state alignment info. + * It does not provide state alignment info therefore this does nothing. + */ + @Override + public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder header) { + // Do nothing. + } + + /** + * Router update globally shared namespaceStateId value using response from + * namenodes. + */ + @Override + public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) { + sharedGlobalStateId.update(header.getStateId()); + } + + /** + * Client side implementation for routers to provide state info in requests to + * namenodes. + */ + @Override + public void updateRequestState(RpcHeaderProtos.RpcRequestHeaderProto.Builder header) { + long maxStateId = Long.max(poolLocalStateId.get(), sharedGlobalStateId.get()); + header.setStateId(maxStateId); + } + + /** + * Client side implementation only provides state alignment info in requests. + * Client does not receive RPC requests therefore this does nothing. + */ + @Override + public long receiveRequestState(RpcHeaderProtos.RpcRequestHeaderProto header, long threshold) + throws IOException { + // Do nothing. + return 0; + } + + @Override + public long getLastSeenStateId() { + return sharedGlobalStateId.get(); + } + + @Override + public boolean isCoordinatedCall(String protocolName, String method) { + throw new UnsupportedOperationException( + "Client should not be checking uncoordinated call"); + } + + public void advanceClientStateId(Long clientStateId) { + poolLocalStateId.update(clientStateId); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java index d727ab09f38e1..24a85c2d558b3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RBFConfigKeys.java @@ -191,6 +191,10 @@ public class RBFConfigKeys extends CommonConfigurationKeysPublic { FEDERATION_STORE_PREFIX + "enable"; public static final boolean DFS_ROUTER_STORE_ENABLE_DEFAULT = true; + public static final String DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE = + FEDERATION_ROUTER_PREFIX + "observer.federated.state.propagation.maxsize"; + public static final int DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT = 5; + public static final String FEDERATION_STORE_SERIALIZER_CLASS = FEDERATION_STORE_PREFIX + "serializer"; public static final Class diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 1386fe352a94b..6e2602deb1940 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -80,6 +80,7 @@ import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.thirdparty.protobuf.ByteString; import org.apache.hadoop.util.StringUtils; import org.eclipse.jetty.util.ajax.JSON; import org.slf4j.Logger; @@ -369,8 +370,20 @@ private ConnectionContext getConnection(UserGroupInformation ugi, String nsId, connUGI = UserGroupInformation.createProxyUser( ugi.getUserName(), routerUser); } + + Long clientStateID = Long.MIN_VALUE; + Call call = Server.getCurCall().get(); + if (call != null) { + ByteString callFederatedNamespaceState = call.getFederatedNamespaceState(); + if (callFederatedNamespaceState != null) { + Map clientFederatedStateIds = + FederatedNamespaceIds.getRouterFederatedStateMap(callFederatedNamespaceState); + clientStateID = clientFederatedStateIds.getOrDefault(nsId, Long.MIN_VALUE); + } + } + connection = this.connectionManager.getConnection( - connUGI, rpcAddress, proto, nsId); + connUGI, rpcAddress, proto, nsId, clientStateID); LOG.debug("User {} NN {} is using connection {}", ugi.getUserName(), rpcAddress, connection); } catch (Exception ex) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 016f402b06f93..821e873eef767 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -255,18 +255,18 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, * @param conf HDFS Configuration. * @param router A router using this RPC server. * @param nnResolver The NN resolver instance to determine active NNs in HA. - * @param fileResolver File resolver to resolve file paths to subclusters. + * @param fResolver File resolver to resolve file paths to subclusters. * @throws IOException If the RPC server could not be created. */ public RouterRpcServer(Configuration conf, Router router, - ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver) + ActiveNamenodeResolver nnResolver, FileSubclusterResolver fResolver) throws IOException { super(RouterRpcServer.class.getName()); this.conf = conf; this.router = router; this.namenodeResolver = nnResolver; - this.subclusterResolver = fileResolver; + this.subclusterResolver = fResolver; // RPC server settings int handlerCount = this.conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, @@ -332,7 +332,7 @@ public RouterRpcServer(Configuration conf, Router router, .setnumReaders(readerCount) .setQueueSizePerHandler(handlerQueueSize) .setVerbose(false) - .setAlignmentContext(new RouterStateIdContext(federatedNamespaceIds)) + .setAlignmentContext(new RouterStateIdContext(conf, federatedNamespaceIds)) .setSecretManager(this.securityManager.getSecretManager()) .build(); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java index 2cf9b2721b709..b535f9d56f05c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java @@ -23,6 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly; import org.apache.hadoop.ipc.AlignmentContext; @@ -30,11 +31,10 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; -import static org.apache.hadoop.ipc.RpcConstants.REQUEST_HEADER_NAMESPACE_STATEIDS_SET; /** - * This is the router implementation responsible for passing - * client state id to next level. + * This is the router implementation to hold the state Ids for all + * namespaces. This object is only updated by responses from NameNodes. */ @InterfaceAudience.Private @InterfaceStability.Evolving @@ -42,8 +42,12 @@ class RouterStateIdContext implements AlignmentContext { private final HashSet coordinatedMethods; private final FederatedNamespaceIds federatedNamespaceIds; + /** + * Size limit for the map of state Ids to send to clients. + */ + private final int maxSizeOfFederatedStateToPropagate; - RouterStateIdContext(FederatedNamespaceIds federatedNamespaceIds) { + RouterStateIdContext(Configuration conf, FederatedNamespaceIds federatedNamespaceIds) { this.federatedNamespaceIds = federatedNamespaceIds; this.coordinatedMethods = new HashSet<>(); // For now, only ClientProtocol methods can be coordinated, so only checking @@ -54,11 +58,17 @@ class RouterStateIdContext implements AlignmentContext { coordinatedMethods.add(method.getName()); } } + + maxSizeOfFederatedStateToPropagate = + conf.getInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, + RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT); } @Override public void updateResponseState(RpcResponseHeaderProto.Builder header) { - federatedNamespaceIds.setResponseHeaderState(header); + if (federatedNamespaceIds.size() <= maxSizeOfFederatedStateToPropagate) { + federatedNamespaceIds.setResponseHeaderState(header); + } } @Override @@ -71,14 +81,15 @@ public void updateRequestState(RpcRequestHeaderProto.Builder header) { // Do nothing. } + /** + * Routers do not update their state using information from clients + * to avoid clients interfering with one another. + */ @Override public long receiveRequestState(RpcRequestHeaderProto header, long clientWaitTime) throws RetriableException { - federatedNamespaceIds.updateStateUsingRequestHeader(header); - if (header.hasRouterFederatedState()) { - return REQUEST_HEADER_NAMESPACE_STATEIDS_SET; - } - return header.getStateId(); + // Do nothing. + return 0; } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml index cc5dbd2e05b57..7c0cb8b437024 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/resources/hdfs-rbf-default.xml @@ -834,4 +834,15 @@ (delete the source path directly) and skip (skip both trash and deletion). + + + dfs.federation.router.observer.federated.state.propagation.maxsize + 5 + + The maximum size of the federated state to send in the RPC header. Sending the federated + state removes the need to msync on every read call, but at the expense of having a larger + header. The cost tradeoff between the larger header and always msync'ing depends on the number + of namespaces in use and the latency of the msync requests. + + diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java index 79c28986c33f1..2cc41fadc2ff6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java @@ -385,7 +385,7 @@ public static void simulateThrowExceptionRouterRpcServer( throw new IOException("Simulate connectionManager throw IOException"); }).when(spyConnectionManager).getConnection( any(UserGroupInformation.class), any(String.class), any(Class.class), - any(String.class)); + any(String.class), any(Long.class)); Whitebox.setInternalState(rpcClient, "connectionManager", spyConnectionManager); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java index 067d43dabd5fa..8886829098099 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java @@ -325,7 +325,7 @@ private void testConnectionCleanup(float ratio, int totalConns, // Create one new connection pool tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, - NamenodeProtocol.class, "ns0"); + NamenodeProtocol.class, "ns0", 0L); Map poolMap = tmpConnManager.getPools(); ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1, diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java index c95e9d9e4ca8c..2bc8cfc21b230 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterFederatedState.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hdfs.server.federation.router; -import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.ipc.AlignmentContext; @@ -40,7 +39,7 @@ public void testRpcRouterFederatedState() throws InvalidProtocolBufferException byte[] uuid = ClientId.getClientId(); Map expectedStateIds = new HashMap() { { - put("namespace1", 11L ); + put("namespace1", 11L); put("namespace2", 22L); } }; From 8b59b8077787cd383edf3b25012ba9765556daff Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Thu, 8 Sep 2022 10:10:58 -0700 Subject: [PATCH 3/4] Addressing review comments. Mostly refactoring. --- .../org/apache/hadoop/ipc/RpcConstants.java | 2 - .../apache/hadoop/hdfs/ClientGSIContext.java | 11 +-- .../apache/hadoop/hdfs/NamespaceStateId.java | 42 ---------- .../federation/router/ConnectionManager.java | 21 +++-- .../router/FederatedNamespaceIds.java | 78 ------------------- .../router/PoolAlignmentContext.java | 32 +++++--- .../federation/router/RouterRpcClient.java | 19 +---- .../federation/router/RouterRpcServer.java | 12 +-- .../router/RouterStateIdContext.java | 75 ++++++++++++++++-- .../federation/FederationTestUtils.java | 2 +- .../router/TestConnectionManager.java | 2 +- .../hdfs/server/namenode/ha/HATestUtil.java | 6 +- 12 files changed, 122 insertions(+), 180 deletions(-) delete mode 100644 hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NamespaceStateId.java delete mode 100644 hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederatedNamespaceIds.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java index 1202f9db94c60..e0ab000703398 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java @@ -37,8 +37,6 @@ private RpcConstants() { public static final int INVALID_RETRY_COUNT = -1; - // Special state ID value to indicate client request header has routerFederatedState set. - public static final long REQUEST_HEADER_NAMESPACE_STATEIDS_SET = -2L; /** * The Rpc-connection header is as follows diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java index cd531e2cfe653..7bd63dd59bad0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs; +import java.util.concurrent.atomic.LongAccumulator; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ipc.AlignmentContext; @@ -37,14 +38,14 @@ @InterfaceStability.Evolving public class ClientGSIContext implements AlignmentContext { - private final NamespaceStateId lastSeenStateId; + private final LongAccumulator lastSeenStateId; private ByteString routerFederatedState; public ClientGSIContext() { - this(new NamespaceStateId()); + this(new LongAccumulator(Math::max, Long.MIN_VALUE)); } - public ClientGSIContext(NamespaceStateId lastSeenStateId) { + public ClientGSIContext(LongAccumulator lastSeenStateId) { this.lastSeenStateId = lastSeenStateId; routerFederatedState = null; } @@ -78,7 +79,7 @@ public synchronized void receiveResponseState(RpcResponseHeaderProto header) { if (header.hasRouterFederatedState()) { routerFederatedState = header.getRouterFederatedState(); } else { - lastSeenStateId.update(header.getStateId()); + lastSeenStateId.accumulate(header.getStateId()); } } @@ -87,7 +88,7 @@ public synchronized void receiveResponseState(RpcResponseHeaderProto header) { */ @Override public synchronized void updateRequestState(RpcRequestHeaderProto.Builder header) { - if (lastSeenStateId.get() != NamespaceStateId.DEFAULT) { + if (lastSeenStateId.get() != Long.MIN_VALUE) { header.setStateId(lastSeenStateId.get()); } if (routerFederatedState != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NamespaceStateId.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NamespaceStateId.java deleted file mode 100644 index f6b6ad9479aac..0000000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/NamespaceStateId.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs; - -import java.util.concurrent.atomic.LongAccumulator; - -/** - * Object to hold the last-seen state Id for a namespace. - */ -public class NamespaceStateId { - public static final Long DEFAULT = Long.MIN_VALUE; - private final LongAccumulator lastSeenStateId = - new LongAccumulator(Math::max, DEFAULT); - - public long get() { - return lastSeenStateId.get(); - } - - public void update(Long stateId) { - lastSeenStateId.accumulate(stateId); - } - - public long getThenReset() { - return lastSeenStateId.getThenReset(); - } -} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index dfd23a943cc2c..b4882d2bc0047 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -74,9 +74,9 @@ public class ConnectionManager { /** Queue for creating new connections. */ private final BlockingQueue creatorQueue; /** - * Store for NamespaceIds to use with observer namenodes. + * Global federated namespace context for router. */ - private final FederatedNamespaceIds federatedNamespaceIds; + private final RouterStateIdContext routerStateIdContext; /** * Maps from connection pool ID to namespace. */ @@ -94,7 +94,7 @@ public class ConnectionManager { private boolean running = false; public ConnectionManager(Configuration config) { - this(config, new FederatedNamespaceIds()); + this(config, new RouterStateIdContext(config)); } /** @@ -102,9 +102,9 @@ public ConnectionManager(Configuration config) { * * @param config Configuration for the connections. */ - public ConnectionManager(Configuration config, FederatedNamespaceIds federatedNamespaceIds) { + public ConnectionManager(Configuration config, RouterStateIdContext routerStateIdContext) { this.conf = config; - this.federatedNamespaceIds = federatedNamespaceIds; + this.routerStateIdContext = routerStateIdContext; this.connectionPoolToNamespaceMap = new HashMap<>(); // Configure minimum, maximum and active connection pools this.maxSize = this.conf.getInt( @@ -173,7 +173,7 @@ public void close() { } this.pools.clear(); for (String nsID: connectionPoolToNamespaceMap.values()) { - federatedNamespaceIds.removeNamespaceId(nsID); + routerStateIdContext.removeNamespaceStateId(nsID); } connectionPoolToNamespaceMap.clear(); } finally { @@ -193,9 +193,7 @@ public void close() { * @throws IOException If the connection cannot be obtained. */ public ConnectionContext getConnection(UserGroupInformation ugi, - String nnAddress, Class protocol, String nsId, - Long clientStateId) throws IOException { - + String nnAddress, Class protocol, String nsId) throws IOException { // Check if the manager is shutdown if (!this.running) { LOG.error( @@ -224,10 +222,11 @@ public ConnectionContext getConnection(UserGroupInformation ugi, pool = new ConnectionPool( this.conf, nnAddress, ugi, this.minSize, this.maxSize, this.minActiveRatio, protocol, - new PoolAlignmentContext(this.federatedNamespaceIds.getNamespaceId(nsId))); + new PoolAlignmentContext(this.routerStateIdContext, nsId)); this.pools.put(connectionId, pool); this.connectionPoolToNamespaceMap.put(connectionId, nsId); } + long clientStateId = RouterStateIdContext.getClientStateIdFromCurrentCall(nsId); pool.getPoolAlignmentContext().advanceClientStateId(clientStateId); } finally { writeLock.unlock(); @@ -454,7 +453,7 @@ public void run() { String nsID = connectionPoolToNamespaceMap.get(poolId); connectionPoolToNamespaceMap.remove(poolId); if (!connectionPoolToNamespaceMap.values().contains(nsID)) { - federatedNamespaceIds.removeNamespaceId(nsID); + routerStateIdContext.removeNamespaceStateId(nsID); } } } finally { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederatedNamespaceIds.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederatedNamespaceIds.java deleted file mode 100644 index 1cf0316986214..0000000000000 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/FederatedNamespaceIds.java +++ /dev/null @@ -1,78 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.server.federation.router; - -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.hadoop.hdfs.NamespaceStateId; -import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; -import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos.RouterFederatedStateProto; -import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException; -import org.apache.hadoop.thirdparty.protobuf.ByteString; - - -/** - * Collection of last-seen namespace state Ids for a set of namespaces. - * A single NamespaceStateId is shared by all outgoing connections to a particular namespace. - * Router clients share and query the entire collection. - */ -public class FederatedNamespaceIds { - private final ConcurrentHashMap namespaceIdMap = - new ConcurrentHashMap<>(); - - public void setResponseHeaderState(RpcHeaderProtos.RpcResponseHeaderProto.Builder headerBuilder) { - if (namespaceIdMap.isEmpty()) { - return; - } - RouterFederatedStateProto.Builder federatedStateBuilder = - RouterFederatedStateProto.newBuilder(); - namespaceIdMap.forEach((k, v) -> federatedStateBuilder.putNamespaceStateIds(k, v.get())); - headerBuilder.setRouterFederatedState(federatedStateBuilder.build().toByteString()); - } - - public NamespaceStateId getNamespaceId(String nsId) { - return namespaceIdMap.computeIfAbsent(nsId, key -> new NamespaceStateId()); - } - - public void removeNamespaceId(String nsId) { - namespaceIdMap.remove(nsId); - } - - /** - * Utility function to parse routerFederatedState field in RPC headers. - */ - public static Map getRouterFederatedStateMap(ByteString byteString) { - if (byteString != null) { - RouterFederatedStateProto federatedState; - try { - federatedState = RouterFederatedStateProto.parseFrom(byteString); - } catch (InvalidProtocolBufferException e) { - throw new RuntimeException(e); - } - return federatedState.getNamespaceStateIdsMap(); - } else { - return Collections.emptyMap(); - } - } - - public int size() { - return namespaceIdMap.size(); - } -} diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java index 6cd5270f440c6..571f41c4d542c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/PoolAlignmentContext.java @@ -19,18 +19,32 @@ package org.apache.hadoop.hdfs.server.federation.router; import java.io.IOException; -import org.apache.hadoop.hdfs.NamespaceStateId; +import java.util.concurrent.atomic.LongAccumulator; import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos; +/** + * An alignment context shared by all connections in a {@link ConnectionPool}. + * There is a distinct connection pool for each [namespace,UGI] pairing. + *

+ * {@link #sharedGlobalStateId} is a reference to a + * shared {@link LongAccumulator} object in the {@link RouterStateIdContext}. + * {@link #poolLocalStateId} is specific to each PoolAlignmentContext. + *

+ * The shared {@link #sharedGlobalStateId} is updated only using + * responses from NameNodes, so clients cannot poison it. + * {@link #poolLocalStateId} is used to propagate client observed + * state into NameNode requests. A misbehaving client can poison this but the effect is only + * visible to other clients with the same UGI and accessing the same namespace. + */ public class PoolAlignmentContext implements AlignmentContext { - private NamespaceStateId sharedGlobalStateId; - private NamespaceStateId poolLocalStateId; + private LongAccumulator sharedGlobalStateId; + private LongAccumulator poolLocalStateId; - PoolAlignmentContext(NamespaceStateId namespaceStateId) { - sharedGlobalStateId = namespaceStateId; - poolLocalStateId = new NamespaceStateId(); + PoolAlignmentContext(RouterStateIdContext routerStateIdContext, String namespaceId) { + sharedGlobalStateId = routerStateIdContext.getNamespaceStateId(namespaceId); + poolLocalStateId = new LongAccumulator(Math::max, Long.MIN_VALUE); } /** @@ -43,12 +57,12 @@ public void updateResponseState(RpcHeaderProtos.RpcResponseHeaderProto.Builder h } /** - * Router update globally shared namespaceStateId value using response from + * Router updates a globally shared value using response from * namenodes. */ @Override public void receiveResponseState(RpcHeaderProtos.RpcResponseHeaderProto header) { - sharedGlobalStateId.update(header.getStateId()); + sharedGlobalStateId.accumulate(header.getStateId()); } /** @@ -84,6 +98,6 @@ public boolean isCoordinatedCall(String protocolName, String method) { } public void advanceClientStateId(Long clientStateId) { - poolLocalStateId.update(clientStateId); + poolLocalStateId.accumulate(clientStateId); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java index 6e2602deb1940..9ee96b438a7a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java @@ -80,7 +80,6 @@ import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.thirdparty.protobuf.ByteString; import org.apache.hadoop.util.StringUtils; import org.eclipse.jetty.util.ajax.JSON; import org.slf4j.Logger; @@ -148,7 +147,7 @@ public class RouterRpcClient { */ public RouterRpcClient(Configuration conf, Router router, ActiveNamenodeResolver resolver, RouterRpcMonitor monitor, - FederatedNamespaceIds federatedNamespaceIds) { + RouterStateIdContext routerStateIdContext) { this.router = router; this.namenodeResolver = resolver; @@ -157,7 +156,7 @@ public RouterRpcClient(Configuration conf, Router router, this.contextFieldSeparator = clientConf.get(HADOOP_CALLER_CONTEXT_SEPARATOR_KEY, HADOOP_CALLER_CONTEXT_SEPARATOR_DEFAULT); - this.connectionManager = new ConnectionManager(clientConf, federatedNamespaceIds); + this.connectionManager = new ConnectionManager(clientConf, routerStateIdContext); this.connectionManager.start(); this.routerRpcFairnessPolicyController = FederationUtil.newFairnessPolicyController(conf); @@ -370,20 +369,8 @@ private ConnectionContext getConnection(UserGroupInformation ugi, String nsId, connUGI = UserGroupInformation.createProxyUser( ugi.getUserName(), routerUser); } - - Long clientStateID = Long.MIN_VALUE; - Call call = Server.getCurCall().get(); - if (call != null) { - ByteString callFederatedNamespaceState = call.getFederatedNamespaceState(); - if (callFederatedNamespaceState != null) { - Map clientFederatedStateIds = - FederatedNamespaceIds.getRouterFederatedStateMap(callFederatedNamespaceState); - clientStateID = clientFederatedStateIds.getOrDefault(nsId, Long.MIN_VALUE); - } - } - connection = this.connectionManager.getConnection( - connUGI, rpcAddress, proto, nsId, clientStateID); + connUGI, rpcAddress, proto, nsId); LOG.debug("User {} NN {} is using connection {}", ugi.getUserName(), rpcAddress, connection); } catch (Exception ex) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java index 821e873eef767..86fda12307cec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java @@ -255,18 +255,18 @@ public class RouterRpcServer extends AbstractService implements ClientProtocol, * @param conf HDFS Configuration. * @param router A router using this RPC server. * @param nnResolver The NN resolver instance to determine active NNs in HA. - * @param fResolver File resolver to resolve file paths to subclusters. + * @param fileResolver File resolver to resolve file paths to subclusters. * @throws IOException If the RPC server could not be created. */ public RouterRpcServer(Configuration conf, Router router, - ActiveNamenodeResolver nnResolver, FileSubclusterResolver fResolver) + ActiveNamenodeResolver nnResolver, FileSubclusterResolver fileResolver) throws IOException { super(RouterRpcServer.class.getName()); this.conf = conf; this.router = router; this.namenodeResolver = nnResolver; - this.subclusterResolver = fResolver; + this.subclusterResolver = fileResolver; // RPC server settings int handlerCount = this.conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY, @@ -321,7 +321,7 @@ public RouterRpcServer(Configuration conf, Router router, // Create security manager this.securityManager = new RouterSecurityManager(this.conf); - FederatedNamespaceIds federatedNamespaceIds = new FederatedNamespaceIds(); + RouterStateIdContext routerStateIdContext = new RouterStateIdContext(conf); this.rpcServer = new RPC.Builder(this.conf) .setProtocol(ClientNamenodeProtocolPB.class) @@ -332,7 +332,7 @@ public RouterRpcServer(Configuration conf, Router router, .setnumReaders(readerCount) .setQueueSizePerHandler(handlerQueueSize) .setVerbose(false) - .setAlignmentContext(new RouterStateIdContext(conf, federatedNamespaceIds)) + .setAlignmentContext(routerStateIdContext) .setSecretManager(this.securityManager.getSecretManager()) .build(); @@ -386,7 +386,7 @@ public RouterRpcServer(Configuration conf, Router router, // Create the client this.rpcClient = new RouterRpcClient(this.conf, this.router, - this.namenodeResolver, this.rpcMonitor, federatedNamespaceIds); + this.namenodeResolver, this.rpcMonitor, routerStateIdContext); // Initialize modules this.quotaCall = new Quota(this.router, this); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java index b535f9d56f05c..9d2b75b0b552b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStateIdContext.java @@ -19,17 +19,25 @@ package org.apache.hadoop.hdfs.server.federation.router; import java.lang.reflect.Method; +import java.util.Collections; import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.LongAccumulator; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.federation.protocol.proto.HdfsServerFederationProtos; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly; import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.RetriableException; +import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; +import org.apache.hadoop.thirdparty.protobuf.ByteString; +import org.apache.hadoop.thirdparty.protobuf.InvalidProtocolBufferException; /** @@ -41,14 +49,16 @@ class RouterStateIdContext implements AlignmentContext { private final HashSet coordinatedMethods; - private final FederatedNamespaceIds federatedNamespaceIds; /** - * Size limit for the map of state Ids to send to clients. + * Collection of last-seen namespace state Ids for a set of namespaces. + * Each value is globally shared by all outgoing connections to a particular namespace, + * so updates should only be performed using reliable responses from NameNodes. */ + private final ConcurrentHashMap namespaceIdMap; + // Size limit for the map of state Ids to send to clients. private final int maxSizeOfFederatedStateToPropagate; - RouterStateIdContext(Configuration conf, FederatedNamespaceIds federatedNamespaceIds) { - this.federatedNamespaceIds = federatedNamespaceIds; + RouterStateIdContext(Configuration conf) { this.coordinatedMethods = new HashSet<>(); // For now, only ClientProtocol methods can be coordinated, so only checking // against ClientProtocol. @@ -59,15 +69,68 @@ class RouterStateIdContext implements AlignmentContext { } } + namespaceIdMap = new ConcurrentHashMap<>(); + maxSizeOfFederatedStateToPropagate = conf.getInt(RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE, RBFConfigKeys.DFS_ROUTER_OBSERVER_FEDERATED_STATE_PROPAGATION_MAXSIZE_DEFAULT); } + /** + * Adds the {@link #namespaceIdMap} to the response header that will be sent to a client. + */ + public void setResponseHeaderState(RpcResponseHeaderProto.Builder headerBuilder) { + if (namespaceIdMap.isEmpty()) { + return; + } + HdfsServerFederationProtos.RouterFederatedStateProto.Builder federatedStateBuilder = + HdfsServerFederationProtos.RouterFederatedStateProto.newBuilder(); + namespaceIdMap.forEach((k, v) -> federatedStateBuilder.putNamespaceStateIds(k, v.get())); + headerBuilder.setRouterFederatedState(federatedStateBuilder.build().toByteString()); + } + + public LongAccumulator getNamespaceStateId(String nsId) { + return namespaceIdMap.computeIfAbsent(nsId, key -> new LongAccumulator(Math::max, Long.MIN_VALUE)); + } + + public void removeNamespaceStateId(String nsId) { + namespaceIdMap.remove(nsId); + } + + /** + * Utility function to parse routerFederatedState field in RPC headers. + */ + public static Map getRouterFederatedStateMap(ByteString byteString) { + if (byteString != null) { + HdfsServerFederationProtos.RouterFederatedStateProto federatedState; + try { + federatedState = HdfsServerFederationProtos.RouterFederatedStateProto.parseFrom(byteString); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + return federatedState.getNamespaceStateIdsMap(); + } else { + return Collections.emptyMap(); + } + } + + public static long getClientStateIdFromCurrentCall(String nsId) { + Long clientStateID = Long.MIN_VALUE; + Server.Call call = Server.getCurCall().get(); + if (call != null) { + ByteString callFederatedNamespaceState = call.getFederatedNamespaceState(); + if (callFederatedNamespaceState != null) { + Map clientFederatedStateIds = getRouterFederatedStateMap(callFederatedNamespaceState); + clientStateID = clientFederatedStateIds.getOrDefault(nsId, Long.MIN_VALUE); + } + } + return clientStateID; + } + @Override public void updateResponseState(RpcResponseHeaderProto.Builder header) { - if (federatedNamespaceIds.size() <= maxSizeOfFederatedStateToPropagate) { - federatedNamespaceIds.setResponseHeaderState(header); + if (namespaceIdMap.size() <= maxSizeOfFederatedStateToPropagate) { + setResponseHeaderState(header); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java index 2cc41fadc2ff6..79c28986c33f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/FederationTestUtils.java @@ -385,7 +385,7 @@ public static void simulateThrowExceptionRouterRpcServer( throw new IOException("Simulate connectionManager throw IOException"); }).when(spyConnectionManager).getConnection( any(UserGroupInformation.class), any(String.class), any(Class.class), - any(String.class), any(Long.class)); + any(String.class)); Whitebox.setInternalState(rpcClient, "connectionManager", spyConnectionManager); diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java index 8886829098099..067d43dabd5fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestConnectionManager.java @@ -325,7 +325,7 @@ private void testConnectionCleanup(float ratio, int totalConns, // Create one new connection pool tmpConnManager.getConnection(TEST_USER1, TEST_NN_ADDRESS, - NamenodeProtocol.class, "ns0", 0L); + NamenodeProtocol.class, "ns0"); Map poolMap = tmpConnManager.getPools(); ConnectionPoolId connectionPoolId = new ConnectionPoolId(TEST_USER1, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java index a898fbe165295..307fe04618ba6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/HATestUtil.java @@ -35,8 +35,8 @@ import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.LongAccumulator; -import org.apache.hadoop.hdfs.NamespaceStateId; import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; import org.slf4j.Logger; @@ -367,9 +367,9 @@ public static long setACStateId(DistributedFileSystem dfs, ClientGSIContext ac = (ClientGSIContext)(provider.getAlignmentContext()); Field f = ac.getClass().getDeclaredField("lastSeenStateId"); f.setAccessible(true); - NamespaceStateId lastSeenStateId = (NamespaceStateId)f.get(ac); + LongAccumulator lastSeenStateId = (LongAccumulator)f.get(ac); long currentStateId = lastSeenStateId.getThenReset(); - lastSeenStateId.update(stateId); + lastSeenStateId.accumulate(stateId); return currentStateId; } From faa6493401e587517562de9c7fab9f3d717503db Mon Sep 17 00:00:00 2001 From: Simbarashe Dzinamarira Date: Fri, 9 Sep 2022 13:23:05 -0700 Subject: [PATCH 4/4] Formatting fixes. --- .../src/main/java/org/apache/hadoop/ipc/RpcConstants.java | 2 +- .../src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java | 2 +- .../hadoop/hdfs/server/federation/router/ConnectionManager.java | 2 +- .../hadoop/hdfs/server/federation/router/ConnectionPool.java | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java index e0ab000703398..d38474af26bf0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java @@ -37,7 +37,7 @@ private RpcConstants() { public static final int INVALID_RETRY_COUNT = -1; - + /** * The Rpc-connection header is as follows * +----------------------------------+ diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java index 7bd63dd59bad0..bcbb4b96c2aeb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hdfs; -import java.util.concurrent.atomic.LongAccumulator; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ipc.AlignmentContext; @@ -26,6 +25,7 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import java.io.IOException; +import java.util.concurrent.atomic.LongAccumulator; import org.apache.hadoop.thirdparty.protobuf.ByteString; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java index b4882d2bc0047..c6db9837c7cae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionManager.java @@ -78,7 +78,7 @@ public class ConnectionManager { */ private final RouterStateIdContext routerStateIdContext; /** - * Maps from connection pool ID to namespace. + * Map from connection pool ID to namespace. */ private final Map connectionPoolToNamespaceMap; /** Max size of queue for creating new connections. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java index 24b01a67d90f0..9a9abff0677ba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ConnectionPool.java @@ -427,7 +427,7 @@ public ConnectionContext newConnection() throws IOException { * @param ugi User context. * @param proto Interface of the protocol. * @param enableMultiSocket Enable multiple socket or not. - * @param alignmentContext client alignment context. + * @param alignmentContext Client alignment context. * @return proto for the target ClientProtocol that contains the user's * security context. * @throws IOException If it cannot be created.