Skip to content

Commit 982fa77

Browse files
author
lgh
committed
[SBN READ] Observer should throw ObserverRetryOnActiveException if stateid is always delayed with Active Namenode for a period of time
1 parent b4fed58 commit 982fa77

File tree

4 files changed

+96
-4
lines changed

4 files changed

+96
-4
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.apache.hadoop.crypto.OpensslAesCtrCryptoCodec;
2626
import org.apache.hadoop.crypto.OpensslSm4CtrCryptoCodec;
2727

28+
import java.util.concurrent.TimeUnit;
29+
2830
/**
2931
* This class contains constants for configuration keys used
3032
* in the common code.
@@ -1077,6 +1079,13 @@ public class CommonConfigurationKeysPublic {
10771079
"ipc.server.metrics.update.runner.interval";
10781080
public static final int IPC_SERVER_METRICS_UPDATE_RUNNER_INTERVAL_DEFAULT = 5000;
10791081

1082+
public static final String IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE =
1083+
"ipc.server.observer.stable.rpc.enable";
1084+
public static final boolean IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT = false;
1085+
public static final String IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL =
1086+
"ipc.server.observer.stable.rpc.interval";
1087+
public static final long IPC_SERVER_OBSERVER_STABLE_RPC_DEFAULT = TimeUnit.SECONDS.toNanos(15);
1088+
10801089
/**
10811090
* @see
10821091
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -494,6 +494,8 @@ protected ResponseBuffer initialValue() {
494494
private int socketSendBufferSize;
495495
private final int maxDataLength;
496496
private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm
497+
private final boolean rpcStableEnable;
498+
private final long rpcStableInterval;
497499

498500
volatile private boolean running = true; // true while server runs
499501
private CallQueueManager<Call> callQueue;
@@ -976,6 +978,7 @@ public static class Call implements Schedulable,
976978
// Serialized RouterFederatedStateProto message to
977979
// store last seen states for multiple namespaces.
978980
private ByteString federatedNamespaceState;
981+
private boolean isStable;
979982

980983
Call() {
981984
this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
@@ -1009,6 +1012,15 @@ public Call(int id, int retryCount, Void ignore1, Void ignore2,
10091012
this.callerContext = callerContext;
10101013
this.clientStateId = Long.MIN_VALUE;
10111014
this.isCallCoordinated = false;
1015+
this.isStable = false;
1016+
}
1017+
1018+
public boolean isStable() {
1019+
return isStable;
1020+
}
1021+
1022+
public void setStable(boolean stable) {
1023+
isStable = stable;
10121024
}
10131025

10141026
/**
@@ -1243,6 +1255,9 @@ public Void run() throws Exception {
12431255
ResponseParams responseParams = new ResponseParams();
12441256

12451257
try {
1258+
if (isStable()) {
1259+
throw new ObserverRetryOnActiveException("The rpc call in observer is stable.");
1260+
}
12461261
value = call(
12471262
rpcKind, connection.protocolName, rpcRequest, getTimestampNanos());
12481263
} catch (Throwable e) {
@@ -3177,10 +3192,14 @@ public void run() {
31773192
* In case of Observer, it handles only reads, which are
31783193
* commutative.
31793194
*/
3180-
// Re-queue the call and continue
3181-
requeueCall(call);
3182-
call = null;
3183-
continue;
3195+
if (rpcStableEnable && startTimeNanos - call.timestampNanos > rpcStableInterval) {
3196+
call.setStable(true);
3197+
} else {
3198+
// Re-queue the call and continue
3199+
requeueCall(call);
3200+
call = null;
3201+
continue;
3202+
}
31843203
}
31853204
LOG.debug("{}: {} for RpcKind {}.", Thread.currentThread().getName(), call, call.rpcKind);
31863205
CurCall.set(call);
@@ -3341,6 +3360,17 @@ protected Server(String bindAddress, int port,
33413360
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
33423361
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
33433362

3363+
this.rpcStableEnable =
3364+
conf.getBoolean(CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE,
3365+
CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT);
3366+
if (this.rpcStableEnable) {
3367+
this.rpcStableInterval =
3368+
conf.getTimeDuration(CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL,
3369+
CommonConfigurationKeys.IPC_SERVER_OBSERVER_STABLE_RPC_DEFAULT, TimeUnit.NANOSECONDS);
3370+
} else {
3371+
this.rpcStableInterval = -1;
3372+
}
3373+
33443374
// Setup appropriate callqueue
33453375
final String prefix = getQueueClassPrefix();
33463376
this.callQueue = new CallQueueManager<>(

hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3834,6 +3834,23 @@ The switch to turn S3A auditing on or off.
38343834
</description>
38353835
</property>
38363836

3837+
<property>
3838+
<name>ipc.server.observer.stable.rpc.enable</name>
3839+
<value>true</value>
3840+
<description>
3841+
Whether to enable observer stable rpc. If enable when Observer NN's stateid is always
3842+
delayed it will thrown ObserverRetryOnActiveException after ipc.server.observer.stable.rpc.interval
3843+
is reached.
3844+
</description>
3845+
</property>
3846+
3847+
<property>
3848+
<name>ipc.server.observer.stable.rpc.interval</name>
3849+
<value>15</value>
3850+
<description>
3851+
Times observer rpc is stable in seconds.
3852+
</description>
3853+
</property>
38373854

38383855
<!-- YARN registry -->
38393856

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@
1717
*/
1818
package org.apache.hadoop.hdfs.server.namenode.ha;
1919

20+
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_DEFAULT;
21+
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE;
22+
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE_DEFAULT;
23+
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL;
2024
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_OBSERVER_ENABLED_KEY;
2125
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STATE_CONTEXT_ENABLED_KEY;
2226
import static org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter.getServiceState;
@@ -59,6 +63,7 @@
5963
import org.apache.hadoop.hdfs.protocol.Block;
6064
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
6165
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
66+
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
6267
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
6368
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
6469
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
@@ -106,6 +111,9 @@ public static void startUpCluster() throws Exception {
106111
// Observer and immediately try to read from it.
107112
conf.setTimeDuration(
108113
OBSERVER_PROBE_RETRY_PERIOD_KEY, 0, TimeUnit.MILLISECONDS);
114+
conf.setBoolean(IPC_SERVER_OBSERVER_STABLE_RPC_ENABLE, true);
115+
conf.setTimeDuration(IPC_SERVER_OBSERVER_STABLE_RPC_INTERVAL,
116+
TimeUnit.SECONDS.toNanos(10), TimeUnit.NANOSECONDS);
109117
qjmhaCluster = HATestUtil.setUpObserverCluster(conf, 1, 1, true);
110118
dfsCluster = qjmhaCluster.getDfsCluster();
111119
}
@@ -170,6 +178,34 @@ public void testObserverRequeue() throws Exception {
170178
}
171179
}
172180

181+
@Test
182+
public void testObserverStableRpc() throws Exception {
183+
FSNamesystem observerFsNS = dfsCluster.getNamesystem(2);
184+
try {
185+
// Stop EditlogTailer of Observer NameNode.
186+
observerFsNS.getEditLogTailer().stop();
187+
188+
Path tmpTestPath = new Path("/TestObserverStableRpc");
189+
dfs.create(tmpTestPath, (short)1).close();
190+
assertSentTo(0);
191+
// This operation will be blocked in ObserverNameNode
192+
// until observer rpc stable time is reached
193+
// and then throw ObserverRetryOnActiveException and client will retry to Active NN
194+
FileStatus fileStatus = dfs.getFileStatus(tmpTestPath);
195+
assertSentTo(0);
196+
assertNotNull(fileStatus);
197+
198+
observerFsNS.getEditLogTailer().doTailEdits();
199+
fileStatus = dfs.getFileStatus(tmpTestPath);
200+
assertSentTo(2);
201+
assertNotNull(fileStatus);
202+
} finally {
203+
EditLogTailer editLogTailer = new EditLogTailer(observerFsNS, conf);
204+
observerFsNS.setEditLogTailerForTests(editLogTailer);
205+
editLogTailer.start();
206+
}
207+
}
208+
173209
@Test
174210
public void testNoActiveToObserver() throws Exception {
175211
try {

0 commit comments

Comments
 (0)