From 1fe8a3914e5879e28d2c4895d51a555a98de8fdb Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Wed, 4 Mar 2020 20:30:25 +0300 Subject: [PATCH 1/4] Update BlockReaderFactory.java --- .../java/org/apache/hadoop/hdfs/BlockReaderFactory.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java index d27bd6ef0d..c4fcfc5748 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java @@ -391,7 +391,7 @@ private BlockReader getBlockReaderLocal() throws InvalidToken { } return null; } - ShortCircuitCache cache = clientContext.getShortCircuitCache(); + ShortCircuitCache cache = clientContext.getShortCircuitCache(block.getBlockId()); ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); InvalidToken exc = info.getInvalidTokenException(); @@ -444,7 +444,7 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { if (curPeer.fromCache) remainingCacheTries--; DomainPeer peer = (DomainPeer)curPeer.peer; Slot slot = null; - ShortCircuitCache cache = clientContext.getShortCircuitCache(); + ShortCircuitCache cache = clientContext.getShortCircuitCache(block.getBlockId()); try { MutableBoolean usedPeer = new MutableBoolean(false); slot = cache.allocShmSlot(datanode, peer, usedPeer, @@ -503,7 +503,7 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { */ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, Slot slot) throws IOException { - ShortCircuitCache cache = clientContext.getShortCircuitCache(); + ShortCircuitCache cache = clientContext.getShortCircuitCache(slot.getBlockId().getBlockId()); final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream())); SlotId slotId = slot == null ? null : slot.getSlotId(); From 26f3baefde52efd8d1de3a229185ff7a73b554d2 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Wed, 4 Mar 2020 20:41:39 +0300 Subject: [PATCH 2/4] Update ClientContext.java --- .../org/apache/hadoop/hdfs/ClientContext.java | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index f55aff5fd3..27a3235da9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -58,7 +58,7 @@ public class ClientContext { /** * Caches short-circuit file descriptors, mmap regions. */ - private final ShortCircuitCache shortCircuitCache; + private final ShortCircuitCache[] shortCircuitCache; /** * Caches TCP and UNIX domain sockets for reuse. @@ -89,11 +89,19 @@ public class ClientContext { * didn't match its config values yet. */ private boolean printedConfWarning = false; + + /** + * ShorCircuitCache array size. + */ + private final int clientShortCircuitNum; private ClientContext(String name, Conf conf) { this.name = name; this.confString = confAsString(conf); - this.shortCircuitCache = new ShortCircuitCache( + this.clientShortCircuitNum = conf.clientShortCircuitNum; + this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum]; + for (int i = 0; i < this.clientShortCircuitNum; i++) { + this.shortCircuitCache[i] = new ShortCircuitCache( conf.shortCircuitStreamsCacheSize, conf.shortCircuitStreamsCacheExpiryMs, conf.shortCircuitMmapCacheSize, @@ -101,6 +109,7 @@ private ClientContext(String name, Conf conf) { conf.shortCircuitMmapCacheRetryTimeout, conf.shortCircuitCacheStaleThresholdMs, conf.shortCircuitSharedMemoryWatcherInterruptCheckMs); + } this.peerCache = new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry); this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal; @@ -181,10 +190,10 @@ public String getConfString() { return confString; } - public ShortCircuitCache getShortCircuitCache() { - return shortCircuitCache; + public ShortCircuitCache getShortCircuitCache(long idx) { + return shortCircuitCache[(int) (idx % clientShortCircuitNum)]; } - + public PeerCache getPeerCache() { return peerCache; } From f5508d0ee59e0c1cb06d9fb69c5aca4b467581c0 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Wed, 4 Mar 2020 20:42:44 +0300 Subject: [PATCH 3/4] Update DFSClient.java --- .../src/main/java/org/apache/hadoop/hdfs/DFSClient.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index c49d210bf8..20f03e1542 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -323,6 +323,7 @@ public static class Conf { final long shortCircuitMmapCacheExpiryMs; final long shortCircuitMmapCacheRetryTimeout; final long shortCircuitCacheStaleThresholdMs; + final int clientShortCircuitNum; public Conf(Configuration conf) { // The hdfsTimeout is currently the same as the ipc timeout @@ -463,6 +464,11 @@ public Conf(Configuration conf) { dfsclientSlowIoWarningThresholdMs = conf.getLong( DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY, DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT); + + clientShortCircuitNum = conf.getInt(DFS_CLIENT_SHORT_CIRCUIT_NUM, + DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT); + clientShortCircuitNum = clientShortCircuitNum > 3 ? 3 : this.clientShortCircuitNum; + clientShortCircuitNum = clientShortCircuitNum < 1 ? 1 : this.clientShortCircuitNum; } public boolean isUseLegacyBlockReaderLocal() { From be48456e12a75c66e7cfd9dea58bbbf61b539c54 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Wed, 4 Mar 2020 20:43:16 +0300 Subject: [PATCH 4/4] Update DFSConfigKeys.java --- .../src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 71a530ba18..d372ce210f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -464,6 +464,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final long DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000; public static final String DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS = "dfs.client.short.circuit.replica.stale.threshold.ms"; public static final long DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT = 30 * 60 * 1000; + public static final String DFS_CLIENT_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num"; + public static final int DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT = 1; // property for fsimage compression public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";