Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.hadoop.hdfs.server.namenode.ha.ClientHAProxyFactory;
import org.apache.hadoop.hdfs.server.namenode.ha.HAProxyFactory;
import org.apache.hadoop.ipc.AlignmentContext;
import org.apache.hadoop.ipc.Client;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -371,7 +372,7 @@ public static ClientProtocol createProxyWithAlignmentContext(
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
ClientNamenodeProtocolPB.class, version, address, ugi, conf,
NetUtils.getDefaultSocketFactory(conf),
org.apache.hadoop.ipc.Client.getTimeout(conf), defaultPolicy,
Client.getRpcTimeout(conf), defaultPolicy,
fallbackToSimpleAuth, alignmentContext).getProxy();

if (withRetries) { // create the proxy with retries
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2072,4 +2072,25 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_LEASE_HARDLIMIT_DEFAULT =
HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT;

public static final String IPC_RPC_TIMEOUT_FOR_ALIASHMAP_PROTOCOL =
"ipc.rpc-timeout.for.aliash-map.ms";
public static final long IPC_RPC_TIMEOUT_FOR_ALIASHMAP_PROTOCOL_DEFAULT = 30000;
public static final String IPC_RPC_TIMEOUT_FOR_JOURNAL_PROTOCOL =
"ipc.rpc-timeout.for.journal.ms";
public static final long IPC_RPC_TIMEOUT_FOR_JOURNAL_PROTOCOL_DEFAULT = 30000;
public static final String IPC_RPC_TIMEOUT_FOR_REFRESH_AUTHORIZATION_PROTOCOL =
"ipc.rpc-timeout.for.refresh-authorization.ms";
public static final long IPC_RPC_TIMEOUT_FOR_REFRESH_AUTHORIZATION_PROTOCOL_DEFAULT = 0;
public static final String IPC_RPC_TIMEOUT_FOR_REFRESH_USER_MAPPING_PROTOCOL =
"ipc.rpc-timeout.for.refresh-user-mappings.ms";
public static final long IPC_RPC_TIMEOUT_FOR_REFRESH_USER_MAPPING_PROTOCOL_DEFAULT = 0;
public static final String IPC_RPC_TIMEOUT_FOR_REFRESH_CALL_QUEUE_PROTOCOL =
"ipc.rpc-timeout.for.refresh-call-queue.ms";
public static final long IPC_RPC_TIMEOUT_FOR_REFRESH_CALL_QUEUE_PROTOCOL_DEFAULT = 0;
public static final String IPC_RPC_TIMEOUT_FOR_GET_USER_MAPPING_PROTOCOL =
"ipc.rpc-timeout.for.get-user-mappings.ms";
public static final long IPC_RPC_TIMEOUT_FOR_GET_USER_MAPPING_PROTOCOL_DEFAULT = 0;
public static final String IPC_RPC_TIMEOUT_FOR_NAMENODE_PROTOCOL =
"ipc.rpc-timeout.for.namenode.ms";
public static final long IPC_RPC_TIMEOUT_FOR_NAMENODE_PROTOCOL_DEFAULT = 0;
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,25 +217,34 @@ public static <T> ProxyAndInfo<T> createNonHAProxy(
private static InMemoryAliasMapProtocol createNNProxyWithInMemoryAliasMapProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
AlignmentContext alignmentContext) throws IOException {
int timeout = getRPCTimeout(conf,
DFSConfigKeys.IPC_RPC_TIMEOUT_FOR_ALIASHMAP_PROTOCOL,
DFSConfigKeys.IPC_RPC_TIMEOUT_FOR_ALIASHMAP_PROTOCOL_DEFAULT);
AliasMapProtocolPB proxy = createNameNodeProxy(
address, conf, ugi, AliasMapProtocolPB.class, 30000, alignmentContext);
address, conf, ugi, AliasMapProtocolPB.class, timeout, alignmentContext);
return new InMemoryAliasMapProtocolClientSideTranslatorPB(proxy);
}

private static JournalProtocol createNNProxyWithJournalProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
AlignmentContext alignmentContext) throws IOException {
int timeout = getRPCTimeout(conf,
DFSConfigKeys.IPC_RPC_TIMEOUT_FOR_JOURNAL_PROTOCOL,
DFSConfigKeys.IPC_RPC_TIMEOUT_FOR_JOURNAL_PROTOCOL_DEFAULT);
JournalProtocolPB proxy = createNameNodeProxy(address,
conf, ugi, JournalProtocolPB.class, 30000, alignmentContext);
conf, ugi, JournalProtocolPB.class, timeout, alignmentContext);
return new JournalProtocolTranslatorPB(proxy);
}

private static RefreshAuthorizationPolicyProtocol
createNNProxyWithRefreshAuthorizationPolicyProtocol(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi,
AlignmentContext alignmentContext) throws IOException {
int timeout = getRPCTimeout(conf,
DFSConfigKeys.IPC_RPC_TIMEOUT_FOR_REFRESH_AUTHORIZATION_PROTOCOL,
DFSConfigKeys.IPC_RPC_TIMEOUT_FOR_REFRESH_AUTHORIZATION_PROTOCOL_DEFAULT);
RefreshAuthorizationPolicyProtocolPB proxy = createNameNodeProxy(address,
conf, ugi, RefreshAuthorizationPolicyProtocolPB.class, 0,
conf, ugi, RefreshAuthorizationPolicyProtocolPB.class, timeout,
alignmentContext);
return new RefreshAuthorizationPolicyProtocolClientSideTranslatorPB(proxy);
}
Expand All @@ -244,34 +253,46 @@ private static JournalProtocol createNNProxyWithJournalProtocol(
createNNProxyWithRefreshUserMappingsProtocol(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi,
AlignmentContext alignmentContext) throws IOException {
int timeout = getRPCTimeout(conf,
DFSConfigKeys.IPC_RPC_TIMEOUT_FOR_REFRESH_USER_MAPPING_PROTOCOL,
DFSConfigKeys.IPC_RPC_TIMEOUT_FOR_REFRESH_USER_MAPPING_PROTOCOL_DEFAULT);
RefreshUserMappingsProtocolPB proxy = createNameNodeProxy(address, conf,
ugi, RefreshUserMappingsProtocolPB.class, 0, alignmentContext);
ugi, RefreshUserMappingsProtocolPB.class, timeout, alignmentContext);
return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy);
}

private static RefreshCallQueueProtocol
createNNProxyWithRefreshCallQueueProtocol(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi,
AlignmentContext alignmentContext) throws IOException {
int timeout = getRPCTimeout(conf,
DFSConfigKeys.IPC_RPC_TIMEOUT_FOR_REFRESH_CALL_QUEUE_PROTOCOL,
DFSConfigKeys.IPC_RPC_TIMEOUT_FOR_REFRESH_CALL_QUEUE_PROTOCOL_DEFAULT);
RefreshCallQueueProtocolPB proxy = createNameNodeProxy(address, conf, ugi,
RefreshCallQueueProtocolPB.class, 0, alignmentContext);
RefreshCallQueueProtocolPB.class, timeout, alignmentContext);
return new RefreshCallQueueProtocolClientSideTranslatorPB(proxy);
}

private static GetUserMappingsProtocol createNNProxyWithGetUserMappingsProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
AlignmentContext alignmentContext) throws IOException {
int timeout = getRPCTimeout(conf,
DFSConfigKeys.IPC_RPC_TIMEOUT_FOR_GET_USER_MAPPING_PROTOCOL,
DFSConfigKeys.IPC_RPC_TIMEOUT_FOR_GET_USER_MAPPING_PROTOCOL_DEFAULT);
GetUserMappingsProtocolPB proxy = createNameNodeProxy(address, conf, ugi,
GetUserMappingsProtocolPB.class, 0, alignmentContext);
GetUserMappingsProtocolPB.class, timeout, alignmentContext);
return new GetUserMappingsProtocolClientSideTranslatorPB(proxy);
}

private static NamenodeProtocol createNNProxyWithNamenodeProtocol(
InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries, AlignmentContext alignmentContext)
throws IOException {
NamenodeProtocolPB proxy = createNameNodeProxy(
address, conf, ugi, NamenodeProtocolPB.class, 0, alignmentContext);
int timeout = getRPCTimeout(conf,
DFSConfigKeys.IPC_RPC_TIMEOUT_FOR_NAMENODE_PROTOCOL,
DFSConfigKeys.IPC_RPC_TIMEOUT_FOR_NAMENODE_PROTOCOL_DEFAULT);
NamenodeProtocolPB proxy = createNameNodeProxy(address, conf, ugi,
NamenodeProtocolPB.class, timeout, alignmentContext);
if (withRetries) { // create the proxy with retries
RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
TimeUnit.MILLISECONDS);
Expand Down Expand Up @@ -312,4 +333,21 @@ private static <T> T createNameNodeProxy(InetSocketAddress address,
alignmentContext).getProxy();
}

/**
* Try to obtain the timeout for confKey from Conf.
* If the value is invalid, just print some warn log and return the default value.
* @param conf input Configuration.
* @param confKey input conf key.
* @param defaultValue input default conf value.
* @return a non negative number.
*/
private static int getRPCTimeout(Configuration conf, String confKey, long defaultValue) {
long tmpTimeout = conf.getLong(confKey, defaultValue);
if (tmpTimeout < 0) {
LOG.warn("Invalid value {} configured for {} should be greater than or equal to 0. " +
"Using default value of : {}ms instead.", tmpTimeout, conf, defaultValue);
tmpTimeout = defaultValue;
}
return (int) tmpTimeout;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6591,4 +6591,79 @@
Enables observer reads for clients. This should only be enabled when clients are using routers.
</description>
</property>
<property>
<name>ipc.rpc-timeout.for.aliash-map.ms</name>
<value>30000</value>
</property>
<property>
<description>
The amount of time the aliasMapProtocol client will wait to read from the namenode
before timing out. If the namenode does not report progress more
frequently than this time, the client will give up waiting.
</description>
</property>
<property>
<name>ipc.rpc-timeout.for.journal.ms</name>
<value>30000</value>
<description>
The amount of time the journalProtocol client will wait to read from the journalnode
before timing out. If the journalnode does not report progress more
frequently than this time, the client will give up waiting.
</description>
</property>
<property>
<name>ipc.rpc-timeout.for.refresh-authorization.ms</name>
<value>0</value>
<description>
The amount of time the refreshAuthorizationPolicyProtocol client will wait
to read from the namenode before timing out. If the namenode does not
report progress more frequently than this time, the client will give up waiting.
The default value of 0 indicates that timeout is disabled,
which can be set to the same as ipc.client.rpc-timeout.ms, such as 120s.
</description>
</property>
<property>
<name>ipc.rpc-timeout.for.refresh-user-mappings.ms</name>
<value>0</value>
<description>
The amount of time the refreshUserMappingsProtocol client will wait
to read from the namenode before timing out. If the namenode does not
report progress more frequently than this time, the client will give up waiting.
The default value of 0 indicates that timeout is disabled,
which can be set to the same as ipc.client.rpc-timeout.ms, such as 120s.
</description>
</property>
<property>
<name>ipc.rpc-timeout.for.refresh-call-queue.ms</name>
<value>0</value>
<description>
The amount of time the refreshCallQueueProtocol client will wait
to read from the namenode before timing out. If the namenode does not
report progress more frequently than this time, the client will give up waiting.
The default value of 0 indicates that timeout is disabled,
which can be set to the same as ipc.client.rpc-timeout.ms, such as 120s.
</description>
</property>
<property>
<name>ipc.rpc-timeout.for.get-user-mappings.ms</name>
<value>0</value>
<description>
The amount of time the getUserMappingsProtocol client will wait
to read from the namenode before timing out. If the namenode does not
report progress more frequently than this time, the client will give up waiting.
The default value of 0 indicates that timeout is disabled,
which can be set to the same as ipc.client.rpc-timeout.ms, such as 120s.
</description>
</property>
<property>
<name>ipc.rpc-timeout.for.namenode.ms</name>
<value>0</value>
<description>
The amount of time the namenodeProtocol client will wait
to read from the namenode before timing out. If the namenode does not
report progress more frequently than this time, the client will give up waiting.
The default value of 0 indicates that timeout is disabled,
which can be set to the same as ipc.client.rpc-timeout.ms, such as 120s.
</description>
</property>
</configuration>