From 977035a5a140ace03c0ec32438ee49933ca95d09 Mon Sep 17 00:00:00 2001 From: lgh Date: Fri, 22 Dec 2023 18:03:26 +0800 Subject: [PATCH 1/3] [SBN READ] Observer should throw ObserverRetryOnActiveException if stateid is always delayed with Active Namenode for a period of time --- .../fs/CommonConfigurationKeysPublic.java | 9 +++++ .../java/org/apache/hadoop/ipc/Server.java | 38 +++++++++++++++++-- .../src/main/resources/core-default.xml | 17 +++++++++ .../server/namenode/ha/TestObserverNode.java | 36 ++++++++++++++++++ 4 files changed, 96 insertions(+), 4 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index 24a3167b3db2d..b87e54c194dd3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -25,6 +25,8 @@ import org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec; import org.apache.hadoop.crypto.OpensslSm4CtrCryptoCodec; +import java.util.concurrent.TimeUnit; + /** * This class contains constants for configuration keys used * in the common code. @@ -1077,6 +1079,13 @@ public class CommonConfigurationKeysPublic { "ipc.server.metrics.update.runner.interval"; public static final int IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT = 5000; + public static final String IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE = + "ipc.server.observer.stable.rpc.enable"; + public static final boolean IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT = false; + public static final String IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL = + "ipc.server.observer.stable.rpc.interval"; + public static final long IPC_SERVER_OBSERVER_STABLE_RPC_DEFAULT = TimeUnit.SECONDS.toNanos(15); + /** * @see * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 53497e9707807..65d024c2ce5ca 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -494,6 +494,8 @@ protected ResponseBuffer initialValue() { private int socketSendBufferSize; private final int maxDataLength; private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm + private final boolean rpcStableEnable; + private final long rpcStableInterval; volatile private boolean running = true; // true while server runs private CallQueueManager callQueue; @@ -976,6 +978,7 @@ public static class Call implements Schedulable, // Serialized RouterFederatedStateProto message to // store last seen states for multiple namespaces. private ByteString federatedNamespaceState; + private boolean isStable; Call() { this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, @@ -1009,6 +1012,15 @@ public Call(int id, int retryCount, Void ignore1, Void ignore2, this.callerContext = callerContext; this.clientStateId = Long.MIN_VALUE; this.isCallCoordinated = false; + this.isStable = false; + } + + public boolean isStable() { + return isStable; + } + + public void setStable(boolean stable) { + isStable = stable; } /** @@ -1243,6 +1255,9 @@ public Void run() throws Exception { ResponseParams responseParams = new ResponseParams(); try { + if (isStable()) { + throw new ObserverRetryOnActiveException("The rpc call in observer is stable."); + } value = call( rpcKind, connection.protocolName, rpcRequest, getTimestampNanos()); } catch (Throwable e) { @@ -3177,10 +3192,14 @@ public void run() { * In case of Observer, it handles only reads, which are * commutative. */ - // Re-queue the call and continue - requeueCall(call); - call = null; - continue; + if (rpcStableEnable && startTimeNanos - call.timestampNanos > rpcStableInterval) { + call.setStable(true); + } else { + // Re-queue the call and continue + requeueCall(call); + call = null; + continue; + } } LOG.debug("{}: {} for RpcKind {}.", Thread.currentThread().getName(), call, call.rpcKind); CurCall.set(call); @@ -3341,6 +3360,17 @@ protected Server(String bindAddress, int port, CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT); + this.rpcStableEnable = + conf.getBoolean(CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE, + CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT); + if (this.rpcStableEnable) { + this.rpcStableInterval = + conf.getTimeDuration(CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL, + CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_DEFAULT, TimeUnit.NANOSECONDS); + } else { + this.rpcStableInterval = -1; + } + // Setup appropriate callqueue final String prefix = getQueueClassPrefix(); this.callQueue = new CallQueueManager<>( diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 5a5171056d048..1172c38347719 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -3834,6 +3834,23 @@ The switch to turn S3A auditing on or off. + + ipc.server.observer.stable.rpc.enable + true + + Whether to enable observer stable rpc. If enable when Observer NN's stateid is always + delayed it will thrown ObserverRetryOnActiveException after ipc.server.observer.stable.rpc.interval + is reached. + + + + + ipc.server.observer.stable.rpc.interval + 15 + + Times observer rpc is stable in seconds. + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index a293cb4d17c47..36e92f385dd82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_OBSERVER_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY; import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState; @@ -59,6 +63,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; @@ -106,6 +111,9 @@ public static void startUpCluster() throws Exception { // Observer and immediately try to read from it. conf.setTimeDuration( OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS); + conf.setBoolean(IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE, true); + conf.setTimeDuration(IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL, + TimeUnit.SECONDS.toNanos(10), TimeUnit.NANOSECONDS); qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 1, true); dfsCluster = qjmhaCluster.getDfsCluster(); } @@ -170,6 +178,34 @@ public void testObserverRequeue() throws Exception { } } + @Test + public void testObserverStableRpc() throws Exception { + FSNamesystem observerFsNS = dfsCluster.getNamesystem(2); + try { + // Stop EditlogTailer of Observer NameNode. + observerFsNS.getEditLogTailer().stop(); + + Path tmpTestPath = new Path("/TestObserverStableRpc"); + dfs.create(tmpTestPath, (short)1).close(); + assertSentTo(0); + // This operation will be blocked in ObserverNameNode + // until observer rpc stable time is reached + // and then throw ObserverRetryOnActiveException and client will retry to Active NN + FileStatus fileStatus = dfs.getFileStatus(tmpTestPath); + assertSentTo(0); + assertNotNull(fileStatus); + + observerFsNS.getEditLogTailer().doTailEdits(); + fileStatus = dfs.getFileStatus(tmpTestPath); + assertSentTo(2); + assertNotNull(fileStatus); + } finally { + EditLogTailer editLogTailer = new EditLogTailer(observerFsNS, conf); + observerFsNS.setEditLogTailerForTests(editLogTailer); + editLogTailer.start(); + } + } + @Test public void testNoActiveToObserver() throws Exception { try { From 02d21e1316a674fa225cc8fd1a63ea2fba9f77aa Mon Sep 17 00:00:00 2001 From: lgh Date: Tue, 26 Dec 2023 14:25:35 +0800 Subject: [PATCH 2/3] add metrics --- .../java/org/apache/hadoop/ipc/Server.java | 1 + .../apache/hadoop/ipc/metrics/RpcMetrics.java | 18 ++++++++++++++++++ .../server/namenode/ha/TestObserverNode.java | 5 ++++- 3 files changed, 23 insertions(+), 1 deletion(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 65d024c2ce5ca..26d3ef294a727 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -3194,6 +3194,7 @@ public void run() { */ if (rpcStableEnable && startTimeNanos - call.timestampNanos > rpcStableInterval) { call.setStable(true); + rpcMetrics.incrStableCalls(); } else { // Re-queue the call and continue requeueCall(call); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index b9be973204d21..fb8e0c5425624 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -147,6 +147,8 @@ public static RpcMetrics create(Server server, Configuration conf) { MutableCounterLong rpcRequeueCalls; @Metric("Number of successful RPC calls") MutableCounterLong rpcCallSuccesses; + @Metric("Number of stable calls") + MutableCounterLong rpcStableCalls; @Metric("Number of open connections") public int numOpenConnections() { return server.getNumOpenConnections(); @@ -363,6 +365,13 @@ public void incrRpcCallSuccesses() { rpcCallSuccesses.incr(); } + /** + * Increments the stable calls counter. + */ + public void incrStableCalls() { + rpcStableCalls.incr(); + } + /** * Returns a MutableRate Counter. * @return Mutable Rate @@ -412,6 +421,15 @@ public long getRpcRequeueCalls() { return rpcRequeueCalls.value(); } + /** + * Returns the number of stable calls. + * @return long + */ + @VisibleForTesting + public long getRpcStableCalls() { + return rpcStableCalls.value(); + } + public MutableRate getDeferredRpcProcessingTime() { return deferredRpcProcessingTime; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index 36e92f385dd82..0e5eda020cdf4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -63,7 +63,6 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; @@ -181,6 +180,8 @@ public void testObserverRequeue() throws Exception { @Test public void testObserverStableRpc() throws Exception { FSNamesystem observerFsNS = dfsCluster.getNamesystem(2); + RpcMetrics obRpcMetrics = ((NameNodeRpcServer)dfsCluster + .getNameNodeRpc(2)).getClientRpcServer().getRpcMetrics(); try { // Stop EditlogTailer of Observer NameNode. observerFsNS.getEditLogTailer().stop(); @@ -194,11 +195,13 @@ public void testObserverStableRpc() throws Exception { FileStatus fileStatus = dfs.getFileStatus(tmpTestPath); assertSentTo(0); assertNotNull(fileStatus); + assertEquals(1, obRpcMetrics.getRpcStableCalls()); observerFsNS.getEditLogTailer().doTailEdits(); fileStatus = dfs.getFileStatus(tmpTestPath); assertSentTo(2); assertNotNull(fileStatus); + assertEquals(1, obRpcMetrics.getRpcStableCalls()); } finally { EditLogTailer editLogTailer = new EditLogTailer(observerFsNS, conf); observerFsNS.setEditLogTailerForTests(editLogTailer); From a2e4edfd92c992a32c93fe8990ba8051ecb02415 Mon Sep 17 00:00:00 2001 From: lgh Date: Wed, 3 Jan 2024 11:34:00 +0800 Subject: [PATCH 3/3] rename wrong var stable to stale --- .../fs/CommonConfigurationKeysPublic.java | 12 +++++----- .../java/org/apache/hadoop/ipc/Server.java | 24 +++++++++---------- .../apache/hadoop/ipc/metrics/RpcMetrics.java | 14 +++++------ .../server/namenode/ha/TestObserverNode.java | 16 ++++++------- 4 files changed, 32 insertions(+), 34 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java index b87e54c194dd3..55815c3a74f38 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java @@ -1079,12 +1079,12 @@ public class CommonConfigurationKeysPublic { "ipc.server.metrics.update.runner.interval"; public static final int IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT = 5000; - public static final String IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE = - "ipc.server.observer.stable.rpc.enable"; - public static final boolean IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT = false; - public static final String IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL = - "ipc.server.observer.stable.rpc.interval"; - public static final long IPC_SERVER_OBSERVER_STABLE_RPC_DEFAULT = TimeUnit.SECONDS.toNanos(15); + public static final String IPC_SERVER_OBSERVER_STALE_RPC_ENABLE = + "ipc.server.observer.stale.rpc.enable"; + public static final boolean IPC_SERVER_OBSERVER_STALE_RPC_ENABLE_DEFAULT = false; + public static final String IPC_SERVER_OBSERVER_STALE_RPC_INTERVAL = + "ipc.server.observer.stale.rpc.interval"; + public static final long IPC_SERVER_OBSERVER_STALE_RPC_DEFAULT = TimeUnit.SECONDS.toNanos(15); /** * @see diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 26d3ef294a727..afed4b00fced3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -978,7 +978,7 @@ public static class Call implements Schedulable, // Serialized RouterFederatedStateProto message to // store last seen states for multiple namespaces. private ByteString federatedNamespaceState; - private boolean isStable; + private boolean isStale; Call() { this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, @@ -1012,15 +1012,15 @@ public Call(int id, int retryCount, Void ignore1, Void ignore2, this.callerContext = callerContext; this.clientStateId = Long.MIN_VALUE; this.isCallCoordinated = false; - this.isStable = false; + this.isStale = false; } - public boolean isStable() { - return isStable; + public boolean isStale() { + return isStale; } - public void setStable(boolean stable) { - isStable = stable; + public void setStale(boolean stale) { + isStale = stale; } /** @@ -1255,7 +1255,7 @@ public Void run() throws Exception { ResponseParams responseParams = new ResponseParams(); try { - if (isStable()) { + if (isStale()) { throw new ObserverRetryOnActiveException("The rpc call in observer is stable."); } value = call( @@ -3193,7 +3193,7 @@ public void run() { * commutative. */ if (rpcStableEnable && startTimeNanos - call.timestampNanos > rpcStableInterval) { - call.setStable(true); + call.setStale(true); rpcMetrics.incrStableCalls(); } else { // Re-queue the call and continue @@ -3362,12 +3362,12 @@ protected Server(String bindAddress, int port, CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT); this.rpcStableEnable = - conf.getBoolean(CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE, - CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT); + conf.getBoolean(CommonConfigurationKeys.IPC_SERVER_OBSERVER_STALE_RPC_ENABLE, + CommonConfigurationKeys.IPC_SERVER_OBSERVER_STALE_RPC_ENABLE_DEFAULT); if (this.rpcStableEnable) { this.rpcStableInterval = - conf.getTimeDuration(CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL, - CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_DEFAULT, TimeUnit.NANOSECONDS); + conf.getTimeDuration(CommonConfigurationKeys.IPC_SERVER_OBSERVER_STALE_RPC_INTERVAL, + CommonConfigurationKeys.IPC_SERVER_OBSERVER_STALE_RPC_DEFAULT, TimeUnit.NANOSECONDS); } else { this.rpcStableInterval = -1; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index fb8e0c5425624..1f77c3c65f062 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -147,8 +147,8 @@ public static RpcMetrics create(Server server, Configuration conf) { MutableCounterLong rpcRequeueCalls; @Metric("Number of successful RPC calls") MutableCounterLong rpcCallSuccesses; - @Metric("Number of stable calls") - MutableCounterLong rpcStableCalls; + @Metric("Number of stale calls") + MutableCounterLong rpcStaleCalls; @Metric("Number of open connections") public int numOpenConnections() { return server.getNumOpenConnections(); @@ -366,10 +366,10 @@ public void incrRpcCallSuccesses() { } /** - * Increments the stable calls counter. + * Increments the stale calls counter. */ public void incrStableCalls() { - rpcStableCalls.incr(); + rpcStaleCalls.incr(); } /** @@ -422,12 +422,12 @@ public long getRpcRequeueCalls() { } /** - * Returns the number of stable calls. + * Returns the number of stale calls. * @return long */ @VisibleForTesting - public long getRpcStableCalls() { - return rpcStableCalls.value(); + public long getRpcStaleCalls() { + return rpcStaleCalls.value(); } public MutableRate getDeferredRpcProcessingTime() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index 0e5eda020cdf4..3888233217b8a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -17,10 +17,8 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_DEFAULT; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT; -import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STALE_RPC_ENABLE; +import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STALE_RPC_INTERVAL; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_OBSERVER_ENABLED_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY; import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState; @@ -110,8 +108,8 @@ public static void startUpCluster() throws Exception { // Observer and immediately try to read from it. conf.setTimeDuration( OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS); - conf.setBoolean(IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE, true); - conf.setTimeDuration(IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL, + conf.setBoolean(IPC_SERVER_OBSERVER_STALE_RPC_ENABLE, true); + conf.setTimeDuration(IPC_SERVER_OBSERVER_STALE_RPC_INTERVAL, TimeUnit.SECONDS.toNanos(10), TimeUnit.NANOSECONDS); qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 1, true); dfsCluster = qjmhaCluster.getDfsCluster(); @@ -178,7 +176,7 @@ public void testObserverRequeue() throws Exception { } @Test - public void testObserverStableRpc() throws Exception { + public void testObserverStaleRpc() throws Exception { FSNamesystem observerFsNS = dfsCluster.getNamesystem(2); RpcMetrics obRpcMetrics = ((NameNodeRpcServer)dfsCluster .getNameNodeRpc(2)).getClientRpcServer().getRpcMetrics(); @@ -195,13 +193,13 @@ public void testObserverStableRpc() throws Exception { FileStatus fileStatus = dfs.getFileStatus(tmpTestPath); assertSentTo(0); assertNotNull(fileStatus); - assertEquals(1, obRpcMetrics.getRpcStableCalls()); + assertEquals(1, obRpcMetrics.getRpcStaleCalls()); observerFsNS.getEditLogTailer().doTailEdits(); fileStatus = dfs.getFileStatus(tmpTestPath); assertSentTo(2); assertNotNull(fileStatus); - assertEquals(1, obRpcMetrics.getRpcStableCalls()); + assertEquals(1, obRpcMetrics.getRpcStaleCalls()); } finally { EditLogTailer editLogTailer = new EditLogTailer(observerFsNS, conf); observerFsNS.setEditLogTailerForTests(editLogTailer);