Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec;
import org.apache.hadoop.crypto.OpensslSm4CtrCryptoCodec;

import java.util.concurrent.TimeUnit;

/**
* This class contains constants for configuration keys used
* in the common code.
Expand Down Expand Up @@ -1077,6 +1079,13 @@ public class CommonConfigurationKeysPublic {
"ipc.server.metrics.update.runner.interval";
public static final int IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT = 5000;

/** Configuration key: whether stale-RPC handling is enabled on the IPC server. */
public static final String IPC_SERVER_OBSERVER_STALE_RPC_ENABLE =
"ipc.server.observer.stale.rpc.enable";
/** Default for {@link #IPC_SERVER_OBSERVER_STALE_RPC_ENABLE}: disabled. */
public static final boolean IPC_SERVER_OBSERVER_STALE_RPC_ENABLE_DEFAULT = false;
/** Configuration key: the interval after which a waiting RPC call is treated as stale. */
public static final String IPC_SERVER_OBSERVER_STALE_RPC_INTERVAL =
"ipc.server.observer.stale.rpc.interval";
/**
 * Default for {@link #IPC_SERVER_OBSERVER_STALE_RPC_INTERVAL}: 15 seconds,
 * expressed in nanoseconds.
 */
public static final long IPC_SERVER_OBSERVER_STALE_RPC_DEFAULT = TimeUnit.SECONDS.toNanos(15);

/**
* @see
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -494,6 +494,8 @@ protected ResponseBuffer initialValue() {
private int socketSendBufferSize;
private final int maxDataLength;
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
// Whether stale-RPC handling is enabled; read from the configuration in the constructor.
private final boolean rpcStableEnable;
// Age threshold, in nanoseconds, beyond which a call is marked stale; -1 when disabled.
private final long rpcStableInterval;

volatile private boolean running = true; // true while server runs
private CallQueueManager<Call> callQueue;
Expand Down Expand Up @@ -976,6 +978,7 @@ public static class Call implements Schedulable,
// Serialized RouterFederatedStateProto message to
// store last seen states for multiple namespaces.
private ByteString federatedNamespaceState;
// True once the call has been marked stale via setStale(true); initialised to false.
private boolean isStale;

Call() {
this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
Expand Down Expand Up @@ -1009,6 +1012,15 @@ public Call(int id, int retryCount, Void ignore1, Void ignore2,
this.callerContext = callerContext;
this.clientStateId = Long.MIN_VALUE;
this.isCallCoordinated = false;
this.isStale = false;
}

/**
 * Reports whether this call has been flagged as stale.
 *
 * @return the current value of the stale flag
 */
public boolean isStale() {
  return this.isStale;
}

/**
 * Updates the stale flag of this call.
 *
 * @param stale the new value of the stale flag
 */
public void setStale(boolean stale) {
  this.isStale = stale;
}

/**
Expand Down Expand Up @@ -1243,6 +1255,9 @@ public Void run() throws Exception {
ResponseParams responseParams = new ResponseParams();

try {
// A call flagged as stale is not executed; it is failed with
// ObserverRetryOnActiveException instead.
if (isStale()) {
  throw new ObserverRetryOnActiveException("The rpc call in observer is stale.");
}
value = call(
rpcKind, connection.protocolName, rpcRequest, getTimestampNanos());
} catch (Throwable e) {
Expand Down Expand Up @@ -3177,10 +3192,15 @@ public void run() {
* In case of Observer, it handles only reads, which are
* commutative.
*/
// Re-queue the call and continue
requeueCall(call);
call = null;
continue;
if (rpcStableEnable && startTimeNanos - call.timestampNanos > rpcStableInterval) {
call.setStale(true);
rpcMetrics.incrStableCalls();
} else {
// Re-queue the call and continue
requeueCall(call);
call = null;
continue;
}
}
LOG.debug("{}: {} for RpcKind {}.", Thread.currentThread().getName(), call, call.rpcKind);
CurCall.set(call);
Expand Down Expand Up @@ -3341,6 +3361,17 @@ protected Server(String bindAddress, int port,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);

// Stale-RPC handling: read the on/off switch first.
this.rpcStableEnable = conf.getBoolean(
    CommonConfigurationKeys.IPC_SERVER_OBSERVER_STALE_RPC_ENABLE,
    CommonConfigurationKeys.IPC_SERVER_OBSERVER_STALE_RPC_ENABLE_DEFAULT);
// The interval (in nanoseconds) is only looked up when the feature is on;
// otherwise -1 is stored.
this.rpcStableInterval = this.rpcStableEnable
    ? conf.getTimeDuration(
        CommonConfigurationKeys.IPC_SERVER_OBSERVER_STALE_RPC_INTERVAL,
        CommonConfigurationKeys.IPC_SERVER_OBSERVER_STALE_RPC_DEFAULT,
        TimeUnit.NANOSECONDS)
    : -1;

// Setup appropriate callqueue
final String prefix = getQueueClassPrefix();
this.callQueue = new CallQueueManager<>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ public static RpcMetrics create(Server server, Configuration conf) {
MutableCounterLong rpcRequeueCalls;
@Metric("Number of successful RPC calls")
MutableCounterLong rpcCallSuccesses;
@Metric("Number of stale calls")
MutableCounterLong rpcStaleCalls;

@Metric("Number of open connections") public int numOpenConnections() {
return server.getNumOpenConnections();
Expand Down Expand Up @@ -363,6 +365,13 @@ public void incrRpcCallSuccesses() {
rpcCallSuccesses.incr();
}

/**
 * Increments the stale calls counter.
 */
public void incrStaleCalls() {
  rpcStaleCalls.incr();
}

/**
 * Increments the stale calls counter.
 * Misspelled alias of {@link #incrStaleCalls()} ("stable" instead of
 * "stale"), kept so that existing callers continue to compile.
 */
public void incrStableCalls() {
  incrStaleCalls();
}

/**
* Returns a MutableRate Counter.
* @return Mutable Rate
Expand Down Expand Up @@ -412,6 +421,15 @@ public long getRpcRequeueCalls() {
return rpcRequeueCalls.value();
}

/**
 * Reads the current value of the stale calls counter.
 * @return the number of stale calls counted so far
 */
@VisibleForTesting
public long getRpcStaleCalls() {
  final long staleCalls = rpcStaleCalls.value();
  return staleCalls;
}

/**
 * Returns the deferred RPC processing time rate.
 * @return the {@code deferredRpcProcessingTime} metric
 */
public MutableRate getDeferredRpcProcessingTime() {
  return this.deferredRpcProcessingTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3834,6 +3834,23 @@ The switch to turn S3A auditing on or off.
</description>
</property>

<property>
  <name>ipc.server.observer.stale.rpc.enable</name>
  <value>false</value>
  <description>
    Whether to enable stale RPC handling on the Observer NameNode. If enabled,
    a call that has been waiting on the Observer for longer than
    ipc.server.observer.stale.rpc.interval because the Observer's state id is
    still lagging behind is no longer re-queued; it fails with
    ObserverRetryOnActiveException instead.
  </description>
</property>

<property>
  <name>ipc.server.observer.stale.rpc.interval</name>
  <value>15s</value>
  <description>
    How long an RPC call may wait on the Observer NameNode before it is treated
    as stale. Only used when ipc.server.observer.stale.rpc.enable is true.
    The value accepts a time unit suffix (ns, us, ms, s, m, h, d); a value
    without a suffix is interpreted as nanoseconds.
  </description>
</property>

<!-- YARN registry -->

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.namenode.ha;

import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STALE_RPC_ENABLE;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STALE_RPC_INTERVAL;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_OBSERVER_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
Expand Down Expand Up @@ -106,6 +108,9 @@ public static void startUpCluster() throws Exception {
// Observer and immediately try to read from it.
conf.setTimeDuration(
OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS);
conf.setBoolean(IPC_SERVER_OBSERVER_STALE_RPC_ENABLE, true);
conf.setTimeDuration(IPC_SERVER_OBSERVER_STALE_RPC_INTERVAL,
TimeUnit.SECONDS.toNanos(10), TimeUnit.NANOSECONDS);
qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 1, true);
dfsCluster = qjmhaCluster.getDfsCluster();
}
Expand Down Expand Up @@ -170,6 +175,38 @@ public void testObserverRequeue() throws Exception {
}
}

/**
 * Verifies that a read which stays blocked on the Observer longer than the
 * configured stale interval is counted as a stale call and is then served by
 * the Active NameNode, and that reads go to the Observer again (without
 * further stale calls) once the Observer has tailed the edits.
 */
@Test
public void testObserverStaleRpc() throws Exception {
  FSNamesystem observerFsNS = dfsCluster.getNamesystem(2);
  RpcMetrics obRpcMetrics = ((NameNodeRpcServer) dfsCluster
      .getNameNodeRpc(2)).getClientRpcServer().getRpcMetrics();
  // The cluster is created in the static startUpCluster(), so this counter may
  // already be non-zero when the test starts. Assert against a baseline
  // instead of an absolute value.
  final long staleCallsBefore = obRpcMetrics.getRpcStaleCalls();
  try {
    // Stop EditlogTailer of Observer NameNode.
    observerFsNS.getEditLogTailer().stop();

    Path tmpTestPath = new Path("/TestObserverStableRpc");
    dfs.create(tmpTestPath, (short) 1).close();
    assertSentTo(0);
    // This operation will be blocked in the Observer NameNode until the
    // stale rpc interval is reached; the Observer then throws
    // ObserverRetryOnActiveException and the client retries on the Active NN.
    FileStatus fileStatus = dfs.getFileStatus(tmpTestPath);
    assertSentTo(0);
    assertNotNull(fileStatus);
    assertEquals(staleCallsBefore + 1, obRpcMetrics.getRpcStaleCalls());

    // After the Observer catches up, reads are served by it again and the
    // stale call counter must not move.
    observerFsNS.getEditLogTailer().doTailEdits();
    fileStatus = dfs.getFileStatus(tmpTestPath);
    assertSentTo(2);
    assertNotNull(fileStatus);
    assertEquals(staleCallsBefore + 1, obRpcMetrics.getRpcStaleCalls());
  } finally {
    // Install and start a fresh tailer to replace the one stopped above.
    EditLogTailer editLogTailer = new EditLogTailer(observerFsNS, conf);
    observerFsNS.setEditLogTailerForTests(editLogTailer);
    editLogTailer.start();
  }
}

@Test
public void testNoActiveToObserver() throws Exception {
try {
Expand Down