diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 24a3167b3db2d..55815c3a74f38 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -25,6 +25,8 @@ import org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec; import org.apache.hadoop.crypto.OpensslSm4CtrCryptoCodec; +import java.util.concurrent.TimeUnit; + /** * This class contains constants for configuration keys used * in the common code. @@ -1077,6 +1079,13 @@ public class CommonConfigurationKeysPublic { "ipc.server.metrics.update.runner.interval"; public static final int IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT = 5000; + public static final String IPC_SERVER_OBSERVER_STALE_RPC_ENABLE = + "ipc.server.observer.stale.rpc.enable"; + public static final boolean IPC_SERVER_OBSERVER_STALE_RPC_ENABLE_DEFAULT = false; + public static final String IPC_SERVER_OBSERVER_STALE_RPC_INTERVAL = + "ipc.server.observer.stale.rpc.interval"; + public static final long IPC_SERVER_OBSERVER_STALE_RPC_DEFAULT = TimeUnit.SECONDS.toNanos(15); + /** * @see * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 53497e9707807..afed4b00fced3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -494,6 +494,8 @@ protected ResponseBuffer initialValue() { private int socketSendBufferSize; private final int maxDataLength; private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm + private final boolean rpcStableEnable; + private final long rpcStableInterval; volatile private boolean running = true; // true while server runs private CallQueueManager callQueue; @@ -976,6 +978,7 @@ public static class Call implements Schedulable, // Serialized RouterFederatedStateProto message to // store last seen states for multiple namespaces. private ByteString federatedNamespaceState; + private boolean isStale; Call() { this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, @@ -1009,6 +1012,15 @@ public Call(int id, int retryCount, Void ignore1, Void ignore2, this.callerContext = callerContext; this.clientStateId = Long.MIN_VALUE; this.isCallCoordinated = false; + this.isStale = false; + } + + public boolean isStale() { + return isStale; + } + + public void setStale(boolean stale) { + isStale = stale; } /** @@ -1243,6 +1255,9 @@ public Void run() throws Exception { ResponseParams responseParams = new ResponseParams(); try { + if (isStale()) { + throw new ObserverRetryOnActiveException("The rpc call in observer is stable."); + } value = call( rpcKind, connection.protocolName, rpcRequest, getTimestampNanos()); } catch (Throwable e) { @@ -3177,10 +3192,15 @@ public void run() { * In case of Observer, it handles only reads, which are * commutative. */ - // Re-queue the call and continue - requeueCall(call); - call = null; - continue; + if (rpcStableEnable && startTimeNanos - call.timestampNanos > rpcStableInterval) { + call.setStale(true); + rpcMetrics.incrStableCalls(); + } else { + // Re-queue the call and continue + requeueCall(call); + call = null; + continue; + } } LOG.debug("{}: {} for RpcKind {}.", Thread.currentThread().getName(), call, call.rpcKind); CurCall.set(call); @@ -3341,6 +3361,17 @@ protected Server(String bindAddress, int port, CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT); + this.rpcStableEnable = + conf.getBoolean(CommonConfigurationKeys.IPC_SERVER_OBSERVER_STALE_RPC_ENABLE, + CommonConfigurationKeys.IPC_SERVER_OBSERVER_STALE_RPC_ENABLE_DEFAULT); + if (this.rpcStableEnable) { + this.rpcStableInterval = + conf.getTimeDuration(CommonConfigurationKeys.IPC_SERVER_OBSERVER_STALE_RPC_INTERVAL, + CommonConfigurationKeys.IPC_SERVER_OBSERVER_STALE_RPC_DEFAULT, TimeUnit.NANOSECONDS); + } else { + this.rpcStableInterval = -1; + } + // Setup appropriate callqueue final String prefix = getQueueClassPrefix(); this.callQueue = new CallQueueManager<>( diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index b9be973204d21..1f77c3c65f062 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -147,6 +147,8 @@ public static RpcMetrics create(Server server, Configuration conf) { MutableCounterLong rpcRequeueCalls; @Metric("Number of successful RPC calls") MutableCounterLong rpcCallSuccesses; + @Metric("Number of stale calls") + MutableCounterLong rpcStaleCalls; @Metric("Number of open connections") public int numOpenConnections() { return server.getNumOpenConnections(); @@ -363,6 +365,13 @@ public void incrRpcCallSuccesses() { rpcCallSuccesses.incr(); } + /** + * Increments the stale calls counter. + */ + public void incrStableCalls() { + rpcStaleCalls.incr(); + } + /** * Returns a MutableRate Counter. * @return Mutable Rate @@ -412,6 +421,15 @@ public long getRpcRequeueCalls() { return rpcRequeueCalls.value(); } + /** + * Returns the number of stale calls. + * @return long + */ + @VisibleForTesting + public long getRpcStaleCalls() { + return rpcStaleCalls.value(); + } + public MutableRate getDeferredRpcProcessingTime() { return deferredRpcProcessingTime; } diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 5a5171056d048..1172c38347719 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -3834,6 +3834,23 @@ The switch to turn S3A auditing on or off. + + ipc.server.observer.stable.rpc.enable + true + + Whether to enable observer stable rpc. If enable when Observer NN's stateid is always + delayed it will thrown ObserverRetryOnActiveException after ipc.server.observer.stable.rpc.interval + is reached. + + + + + ipc.server.observer.stable.rpc.interval + 15 + + Times observer rpc is stable in seconds. + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index a293cb4d17c47..3888233217b8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STALE_RPC_ENABLE; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STALE_RPC_INTERVAL; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_OBSERVER_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY; import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState; @@ -106,6 +108,9 @@ public static void startUpCluster() throws Exception { // Observer and immediately try to read from it. conf.setTimeDuration( OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS); + conf.setBoolean(IPC_SERVER_OBSERVER_STALE_RPC_ENABLE, true); + conf.setTimeDuration(IPC_SERVER_OBSERVER_STALE_RPC_INTERVAL, + TimeUnit.SECONDS.toNanos(10), TimeUnit.NANOSECONDS); qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 1, true); dfsCluster = qjmhaCluster.getDfsCluster(); } @@ -170,6 +175,38 @@ public void testObserverRequeue() throws Exception { } } + @Test + public void testObserverStaleRpc() throws Exception { + FSNamesystem observerFsNS = dfsCluster.getNamesystem(2); + RpcMetrics obRpcMetrics = ((NameNodeRpcServer)dfsCluster + .getNameNodeRpc(2)).getClientRpcServer().getRpcMetrics(); + try { + // Stop EditlogTailer of Observer NameNode. + observerFsNS.getEditLogTailer().stop(); + + Path tmpTestPath = new Path("/TestObserverStableRpc"); + dfs.create(tmpTestPath, (short)1).close(); + assertSentTo(0); + // This operation will be blocked in ObserverNameNode + // until observer rpc stable time is reached + // and then throw ObserverRetryOnActiveException and client will retry to Active NN + FileStatus fileStatus = dfs.getFileStatus(tmpTestPath); + assertSentTo(0); + assertNotNull(fileStatus); + assertEquals(1, obRpcMetrics.getRpcStaleCalls()); + + observerFsNS.getEditLogTailer().doTailEdits(); + fileStatus = dfs.getFileStatus(tmpTestPath); + assertSentTo(2); + assertNotNull(fileStatus); + assertEquals(1, obRpcMetrics.getRpcStaleCalls()); + } finally { + EditLogTailer editLogTailer = new EditLogTailer(observerFsNS, conf); + observerFsNS.setEditLogTailerForTests(editLogTailer); + editLogTailer.start(); + } + } + @Test public void testNoActiveToObserver() throws Exception { try {