From ba32adb851b126c05737726db72960e420b79801 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sat, 7 Mar 2020 10:03:56 +0300 Subject: [PATCH 01/42] Update BlockReaderFactory.java --- .../apache/hadoop/hdfs/client/impl/BlockReaderFactory.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java index 2109e6e1b7ecf..bc576ca1c15cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java @@ -475,7 +475,7 @@ private BlockReader getBlockReaderLocal() throws IOException { "giving up on BlockReaderLocal.", this, pathInfo); return null; } - ShortCircuitCache cache = clientContext.getShortCircuitCache(); + ShortCircuitCache cache = clientContext.getShortCircuitCache(block.getBlockId()); ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); @@ -526,7 +526,7 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { if (curPeer.fromCache) remainingCacheTries--; DomainPeer peer = (DomainPeer)curPeer.peer; Slot slot = null; - ShortCircuitCache cache = clientContext.getShortCircuitCache(); + ShortCircuitCache cache = clientContext.getShortCircuitCache(block.getBlockId()); try { MutableBoolean usedPeer = new MutableBoolean(false); slot = cache.allocShmSlot(datanode, peer, usedPeer, @@ -581,7 +581,7 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { */ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, Slot slot) throws IOException { - ShortCircuitCache cache = clientContext.getShortCircuitCache(); + ShortCircuitCache cache = clientContext.getShortCircuitCache(slot.getBlockId().getBlockId()); final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE)); SlotId slotId = slot == null ? 
null : slot.getSlotId(); From 9c00b7d1ea571db119b9d848004705c10d372557 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sat, 7 Mar 2020 10:31:17 +0300 Subject: [PATCH 02/42] Update HdfsClientConfigKeys.java --- .../org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index d1ca63d8aba68..c317008f2b837 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -141,6 +141,8 @@ public interface HdfsClientConfigKeys { "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms"; int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = 60000; + String DFS_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num"; + int DFS_SHORT_CIRCUIT_NUM_DEFAULT = 1; String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY = "dfs.client.slow.io.warning.threshold.ms"; long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000; From d067bfdbd519e0f7e25c80c45ca17c784db3c3c1 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sat, 7 Mar 2020 10:34:04 +0300 Subject: [PATCH 03/42] Update DfsClientConf.java --- .../apache/hadoop/hdfs/client/impl/DfsClientConf.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 04bdfe47960f1..ccebce357c3f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -142,7 +142,8 @@ public class DfsClientConf { replicaAccessorBuilderClasses; private final int stripedReadThreadpoolSize; - + private final int clientShortCircuitNum; + private final boolean dataTransferTcpNoDelay; public DfsClientConf(Configuration conf) { @@ -269,6 +270,14 @@ public DfsClientConf(Configuration conf) { HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY + " must be greater than 0."); replicaAccessorBuilderClasses = loadReplicaAccessorBuilderClasses(conf); + + clientShortCircuitNum = conf.getInt( + HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM, + HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM_DEFAULT); + Preconditions.checkArgument(clientShortCircuitNum >= 1, + HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM + "can't be less then 1."); + Preconditions.checkArgument(clientShortCircuitNum <= 3, + HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM + "can't be more then 3."); } @SuppressWarnings("unchecked") From 44748f6bb87d35b4d47606a864544d45e1ff1671 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sat, 7 Mar 2020 10:48:15 +0300 Subject: [PATCH 04/42] Update TestEnhancedByteBufferAccess.java --- .../org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java index 90b4f11a66a31..1f0800b425951 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java @@ -358,7 +358,7 @@ public void testZeroCopyMmapCache() throws Exception { fsIn.close(); fsIn = fs.open(TEST_PATH); final ShortCircuitCache cache = ClientContext.get( - CONTEXT, conf).getShortCircuitCache(); + CONTEXT, conf).getShortCircuitCache(0); cache.accept(new CountingVisitor(0, 5, 5, 0)); results[0] = fsIn.read(null, BLOCK_SIZE, EnumSet.of(ReadOption.SKIP_CHECKSUMS)); @@ -659,7 +659,7 @@ public void testZeroCopyReadOfCachedData() throws Exception { final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH); final ShortCircuitCache cache = ClientContext.get( - CONTEXT, conf).getShortCircuitCache(); + CONTEXT, conf).getShortCircuitCache(0); waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1); // Uncache the replica fs.removeCacheDirective(directiveId); From 04745b6aa0a88795e58a2402eef0281ea17059de Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sat, 7 Mar 2020 10:51:59 +0300 Subject: [PATCH 05/42] Update TestBlockReaderFactory.java --- .../hdfs/client/impl/TestBlockReaderFactory.java | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java index 6b04b14f49a77..84424494462e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java @@ -389,7 +389,7 @@ private void testShortCircuitCacheUnbufferWithDisableInterval( try (FSDataInputStream in = dfs.open(testFile)) { Assert.assertEquals(0, - dfs.getClient().getClientContext().getShortCircuitCache() + dfs.getClient().getClientContext().getShortCircuitCache(0) .getReplicaInfoMapSize()); final byte[] buf = new byte[testFileLen]; @@ -398,12 +398,12 @@ private void testShortCircuitCacheUnbufferWithDisableInterval( // Set cache size to 0 so the replica marked evictable by unbuffer // will be purged immediately. 
- dfs.getClient().getClientContext().getShortCircuitCache() + dfs.getClient().getClientContext().getShortCircuitCache(0) .setMaxTotalSize(0); LOG.info("Unbuffering"); in.unbuffer(); Assert.assertEquals(0, - dfs.getClient().getClientContext().getShortCircuitCache() + dfs.getClient().getClientContext().getShortCircuitCache(0) .getReplicaInfoMapSize()); DFSTestUtil.appendFile(dfs, testFile, "append more data"); @@ -432,7 +432,7 @@ private void validateReadResult(final DistributedFileSystem dfs, final int expectedScrRepMapSize) { Assert.assertThat(expected, CoreMatchers.is(actual)); Assert.assertEquals(expectedScrRepMapSize, - dfs.getClient().getClientContext().getShortCircuitCache() + dfs.getClient().getClientContext().getShortCircuitCache(0) .getReplicaInfoMapSize()); } @@ -467,7 +467,7 @@ public void testShortCircuitReadFromServerWithoutShm() throws Exception { calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); Assert.assertTrue(Arrays.equals(contents, expected)); final ShortCircuitCache cache = - fs.getClient().getClientContext().getShortCircuitCache(); + fs.getClient().getClientContext().getShortCircuitCache(0); final DatanodeInfo datanode = new DatanodeInfoBuilder() .setNodeID(cluster.getDataNodes().get(0).getDatanodeId()) .build(); @@ -516,7 +516,7 @@ public void testShortCircuitReadFromClientWithoutShm() throws Exception { calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); Assert.assertTrue(Arrays.equals(contents, expected)); final ShortCircuitCache cache = - fs.getClient().getClientContext().getShortCircuitCache(); + fs.getClient().getClientContext().getShortCircuitCache(0); Assert.assertEquals(null, cache.getDfsClientShmManager()); cluster.shutdown(); sockDir.close(); @@ -548,7 +548,7 @@ public void testShortCircuitCacheShutdown() throws Exception { calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); Assert.assertTrue(Arrays.equals(contents, expected)); final ShortCircuitCache cache = - fs.getClient().getClientContext().getShortCircuitCache(); + fs.getClient().getClientContext().getShortCircuitCache(0); cache.close(); Assert.assertTrue(cache.getDfsClientShmManager(). 
getDomainSocketWatcher().isClosed()); From b30fa2c27ed9f70aab409da973f6e44786322df8 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sat, 7 Mar 2020 10:53:46 +0300 Subject: [PATCH 06/42] Update TestShortCircuitCache.java --- .../hadoop/hdfs/shortcircuit/TestShortCircuitCache.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index b2da6a2fca386..06b4128e9692a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -431,7 +431,7 @@ public void testAllocShm() throws Exception { cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); final ShortCircuitCache cache = - fs.getClient().getClientContext().getShortCircuitCache(); + fs.getClient().getClientContext().getShortCircuitCache(0); cache.getDfsClientShmManager().visit(new Visitor() { @Override public void visit(HashMap info) @@ -501,7 +501,7 @@ public void testShmBasedStaleness() throws Exception { cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); final ShortCircuitCache cache = - fs.getClient().getClientContext().getShortCircuitCache(); + fs.getClient().getClientContext().getShortCircuitCache(0); String TEST_FILE = "/test_file"; final int TEST_FILE_LEN = 8193; final int SEED = 0xFADED; @@ -565,7 +565,7 @@ public void testUnlinkingReplicasInFileDescriptorCache() throws Exception { cluster.waitActive(); DistributedFileSystem fs = cluster.getFileSystem(); final ShortCircuitCache cache = - fs.getClient().getClientContext().getShortCircuitCache(); + fs.getClient().getClientContext().getShortCircuitCache(0); cache.getDfsClientShmManager().visit(new Visitor() { @Override public void visit(HashMap info) From 622901343c5bf7addfe3c4e1717b0dd768bc0890 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sat, 7 Mar 2020 10:55:32 +0300 Subject: [PATCH 07/42] Update HdfsClientConfigKeys.java --- .../org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java index c317008f2b837..60e999730e80d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java @@ -141,7 +141,7 @@ public interface HdfsClientConfigKeys { "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms"; int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = 60000; - String DFS_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num"; + String DFS_SHORT_CIRCUIT_NUM = "dfs.short.circuit.num"; int DFS_SHORT_CIRCUIT_NUM_DEFAULT = 1; String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY = "dfs.client.slow.io.warning.threshold.ms"; From fcb57634f39dabbb02603a4cd67b1f998a4d8e7a Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sat, 7 Mar 
2020 15:11:08 +0300 Subject: [PATCH 08/42] Update ClientContext.java --- .../java/org/apache/hadoop/hdfs/ClientContext.java | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index cbd941b6b9d90..3f64a2dd34e1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -77,7 +77,7 @@ public class ClientContext { /** * Caches short-circuit file descriptors, mmap regions. */ - private final ShortCircuitCache shortCircuitCache; + private final ShortCircuitCache[] shortCircuitCache; /** * Caches TCP and UNIX domain sockets for reuse. @@ -131,6 +131,11 @@ public class ClientContext { * the DFSInputStreams in the same client. */ private DeadNodeDetector deadNodeDetector = null; + + /** + * ShorCircuitCache array size. + */ + private final int clientShortCircuitNum; private ClientContext(String name, DfsClientConf conf, Configuration config) { @@ -138,7 +143,12 @@ private ClientContext(String name, DfsClientConf conf, this.name = name; this.confString = scConf.confAsString(); - this.shortCircuitCache = ShortCircuitCache.fromConf(scConf); + this.clientShortCircuitNum = conf.clientShortCircuitNum; + this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum]; + for (int i = 0; i < this.clientShortCircuitNum; i++) { + this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf); + } + this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(), scConf.getSocketCacheExpiry()); this.keyProviderCache = new KeyProviderCache( From ccfff2af991b5aeca1c7508b668a582c8e6a5154 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sat, 7 Mar 2020 15:14:47 +0300 Subject: [PATCH 09/42] Update ClientContext.java --- .../src/main/java/org/apache/hadoop/hdfs/ClientContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index 3f64a2dd34e1e..a95ed7f48df99 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -133,7 +133,7 @@ public class ClientContext { private DeadNodeDetector deadNodeDetector = null; /** - * ShorCircuitCache array size. + * ShortCircuitCache array size. 
*/ private final int clientShortCircuitNum; From ed4c95fb7b10c322ad8843b2dee6d11479b8a743 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sun, 8 Mar 2020 10:43:31 +0300 Subject: [PATCH 10/42] Update ClientContext.java --- .../src/main/java/org/apache/hadoop/hdfs/ClientContext.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index a95ed7f48df99..edf27b6f9d0d2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -237,8 +237,8 @@ public String getConfString() { return confString; } - public ShortCircuitCache getShortCircuitCache() { - return shortCircuitCache; + public ShortCircuitCache getShortCircuitCache(long idx) { + return shortCircuitCache[(int) (idx % clientShortCircuitNum)]; } public PeerCache getPeerCache() { From ed696e113dd66bfc922f9bd1b79413953113a5c3 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sun, 8 Mar 2020 10:47:11 +0300 Subject: [PATCH 11/42] Update BlockReaderFactory.java --- .../hadoop/hdfs/client/impl/BlockReaderFactory.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java index bc576ca1c15cd..893bc2fa39201 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java @@ -475,7 +475,8 @@ private BlockReader getBlockReaderLocal() throws IOException { "giving up on BlockReaderLocal.", this, pathInfo); return null; } - ShortCircuitCache cache = clientContext.getShortCircuitCache(block.getBlockId()); + ShortCircuitCache cache = + clientContext.getShortCircuitCache(block.getBlockId()); ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this); @@ -526,7 +527,8 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { if (curPeer.fromCache) remainingCacheTries--; DomainPeer peer = (DomainPeer)curPeer.peer; Slot slot = null; - ShortCircuitCache cache = clientContext.getShortCircuitCache(block.getBlockId()); + ShortCircuitCache cache = + clientContext.getShortCircuitCache(block.getBlockId()); try { MutableBoolean usedPeer = new MutableBoolean(false); slot = cache.allocShmSlot(datanode, peer, usedPeer, @@ -581,7 +583,8 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { */ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, Slot slot) throws IOException { - ShortCircuitCache cache = clientContext.getShortCircuitCache(slot.getBlockId().getBlockId()); + ShortCircuitCache cache = + clientContext.getShortCircuitCache(slot.getBlockId().getBlockId()); final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE)); SlotId slotId = slot == null ? 
null : slot.getSlotId(); From 81ffc9a90f3336757023cc865342717b9ad63696 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sun, 8 Mar 2020 10:49:40 +0300 Subject: [PATCH 12/42] Update DfsClientConf.java --- .../hadoop/hdfs/client/impl/DfsClientConf.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 96574fa6b3b44..e41ec324c31b0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -292,13 +292,15 @@ public DfsClientConf(Configuration conf) { conf.getLong(HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_KEY, HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000; - clientShortCircuitNum = conf.getInt( - HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM, - HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM_DEFAULT); + clientShortCircuitNum = conf.getInt( + HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM, + HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM_DEFAULT); Preconditions.checkArgument(clientShortCircuitNum >= 1, - HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM + "can't be less then 1."); + HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM + + "can't be less then 1."); Preconditions.checkArgument(clientShortCircuitNum <= 3, - HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM + "can't be more then 3."); + HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM + + "can't be more then 3."); } @SuppressWarnings("unchecked") From 99ea034cadb4e9410a534c560acc8248f133f10c Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sun, 8 Mar 2020 11:07:34 +0300 Subject: [PATCH 13/42] Update ClientContext.java --- .../src/main/java/org/apache/hadoop/hdfs/ClientContext.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index edf27b6f9d0d2..54bdd90e7cec4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -131,7 +131,7 @@ public class ClientContext { * the DFSInputStreams in the same client. */ private DeadNodeDetector deadNodeDetector = null; - + /** * ShortCircuitCache array size. 
*/ @@ -148,7 +148,7 @@ private ClientContext(String name, DfsClientConf conf, for (int i = 0; i < this.clientShortCircuitNum; i++) { this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf); } - + this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(), scConf.getSocketCacheExpiry()); this.keyProviderCache = new KeyProviderCache( From b8d8d50821be0a59c5909e6bc2be36a485609a94 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sun, 8 Mar 2020 11:08:39 +0300 Subject: [PATCH 14/42] Update DfsClientConf.java --- .../java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index e41ec324c31b0..80f2c5ad044fa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -148,7 +148,7 @@ public class DfsClientConf { private final int stripedReadThreadpoolSize; private final int clientShortCircuitNum; - + private final boolean dataTransferTcpNoDelay; private final boolean deadNodeDetectionEnabled; From fc8fbcbc93fc46a8950b542b1442431ba462e5b5 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sun, 8 Mar 2020 12:18:03 +0300 Subject: [PATCH 15/42] Update ClientContext.java --- .../src/main/java/org/apache/hadoop/hdfs/ClientContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index 54bdd90e7cec4..74fd6c3f72e25 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -143,7 +143,7 @@ private ClientContext(String name, DfsClientConf conf, this.name = name; this.confString = scConf.confAsString(); - this.clientShortCircuitNum = conf.clientShortCircuitNum; + this.clientShortCircuitNum = conf.getClientShortCircuitNum(); this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum]; for (int i = 0; i < this.clientShortCircuitNum; i++) { this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf); From 2f24c67cfbf71cea05fc8e87b3b1315740141ed1 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sun, 8 Mar 2020 12:19:10 +0300 Subject: [PATCH 16/42] Update DfsClientConf.java --- .../org/apache/hadoop/hdfs/client/impl/DfsClientConf.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 80f2c5ad044fa..c31d73647eed7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -606,6 +606,11 @@ public long getSlowIoWarningThresholdMs() { return 
slowIoWarningThresholdMs; } + /** + * @return the clientShortCircuitNum + */ + public int getClientShortCircuitNum() { return clientShortCircuitNum; } + /** * @return the hedgedReadThresholdMillis */ From 4ec972bd50fffe0e968de8771f7b6df9e9eb79cb Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sun, 8 Mar 2020 14:06:40 +0300 Subject: [PATCH 17/42] Update DfsClientConf.java --- .../org/apache/hadoop/hdfs/client/impl/DfsClientConf.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index c31d73647eed7..fb768aecaa229 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -609,7 +609,9 @@ public long getSlowIoWarningThresholdMs() { /** * @return the clientShortCircuitNum */ - public int getClientShortCircuitNum() { return clientShortCircuitNum; } + public int getClientShortCircuitNum() { + return clientShortCircuitNum; + } /** * @return the hedgedReadThresholdMillis From 122ede27e9a11c8daed0ae55aae0451564805d50 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sun, 8 Mar 2020 14:11:53 +0300 Subject: [PATCH 18/42] Update BlockReaderFactory.java --- .../org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java index 893bc2fa39201..22e2899d0c412 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java @@ -584,7 +584,7 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, Slot slot) throws IOException { ShortCircuitCache cache = - clientContext.getShortCircuitCache(slot.getBlockId().getBlockId()); + clientContext.getShortCircuitCache(block.getBlockId()); final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE)); SlotId slotId = slot == null ? 
null : slot.getSlotId(); From e2fefa8a62051c82ea48973fa08dc767bb78a739 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sun, 8 Mar 2020 15:38:07 +0300 Subject: [PATCH 19/42] Update DfsClientConf.java --- .../java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index fb768aecaa229..91347e6cf4d52 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -138,9 +138,8 @@ public class DfsClientConf { /** wait time window before refreshing blocklocation for inputstream. */ private final long refreshReadBlockLocationsMS; - + private final ShortCircuitConf shortCircuitConf; - private final long hedgedReadThresholdMillis; private final int hedgedReadThreadpoolSize; private final List> From 7153ae0c3957a8e3cce2412b62b72009d3fa9a50 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sun, 8 Mar 2020 17:23:56 +0300 Subject: [PATCH 20/42] Update TestBlockReaderLocal.java --- .../apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java index 95fb67a1a4e19..5a68abce6ed42 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java @@ -179,7 +179,7 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test, File metaFile = cluster.getBlockMetadataFile(0, block); ShortCircuitCache shortCircuitCache = - ClientContext.getFromConf(conf).getShortCircuitCache(); + ClientContext.getFromConf(conf).getShortCircuitCache(0); cluster.shutdown(); cluster = null; test.setup(dataFile, checksum); From 1ed0c137aa9f8f3189031b0328b7e4b281f6c4ff Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sun, 8 Mar 2020 17:24:50 +0300 Subject: [PATCH 21/42] Update TestShortCircuitCache.java --- .../apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index 06b4128e9692a..adb1b86972dc3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -877,7 +877,7 @@ public void testRequestFileDescriptorsWhenULimit() throws Exception { return peerCache; }); - Mockito.when(clientContext.getShortCircuitCache()).thenAnswer( + Mockito.when(clientContext.getShortCircuitCache(0)).thenAnswer( (Answer) shortCircuitCacheCall -> { 
ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class); Mockito.when(cache.allocShmSlot( From 2174a294fec7c765d3f310338de1bcddfc3f0c7b Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sun, 8 Mar 2020 21:22:34 +0300 Subject: [PATCH 22/42] Update TestEnhancedByteBufferAccess.java --- .../java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java index 1f0800b425951..bb7df9e3e7a1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java @@ -655,7 +655,7 @@ public void testZeroCopyReadOfCachedData() throws Exception { fsIn2.releaseBuffer(result2); fsIn2.close(); - // check that the replica is anchored + // check that the replica is anchored final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH); final ShortCircuitCache cache = ClientContext.get( From 6295b167f659bca313fe35343a4ba8bbf9b3f4ff Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Mon, 9 Mar 2020 06:54:00 +0300 Subject: [PATCH 23/42] Update DfsClientConf.java --- .../java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java | 1 + 1 file changed, 1 insertion(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 91347e6cf4d52..5117517e844ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -146,6 +146,7 @@ public class DfsClientConf { replicaAccessorBuilderClasses; private final int stripedReadThreadpoolSize; + private final int clientShortCircuitNum; private final boolean dataTransferTcpNoDelay; From 2876127385321857c72cb8d41e5873a2ef16d237 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Mon, 9 Mar 2020 08:21:03 +0300 Subject: [PATCH 24/42] Update hdfs-default.xml --- .../hadoop-hdfs/src/main/resources/hdfs-default.xml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index ede3dc09c0bf0..52636f62eb99d 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4137,6 +4137,14 @@ + + dfs.short.circuit.num + 1 + + Number of short-circuit caches. 
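The new `dfs.short.circuit.num` property is purely client-side. A minimal sketch of opting into several caches from application code; the key and the bounds come from the patches above, while the class name `ShortCircuitNumExample` and the value 3 are illustrative only:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class ShortCircuitNumExample {
  public static void main(String[] args) throws Exception {
    // Ask the client for three ShortCircuitCache instances instead of one.
    // The value must pass the Preconditions checks added to DfsClientConf
    // (1..3 at this point in the series, widened to 1..5 by a later patch).
    Configuration conf = new HdfsConfiguration();
    conf.setInt(HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM, 3);
    try (FileSystem fs = FileSystem.get(conf)) {
      // Short-circuit reads through this FileSystem are now spread over
      // three caches, each block mapped by its block ID
      // (see ClientContext#getShortCircuitCache in the patches above).
    }
  }
}
```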
+ + + dfs.client.read.striped.threadpool.size 18 From 31692a2a74f8231241361334805157b6c3c1dc5e Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Mon, 9 Mar 2020 08:46:15 +0300 Subject: [PATCH 25/42] Update ClientContext.java --- .../src/main/java/org/apache/hadoop/hdfs/ClientContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java index 74fd6c3f72e25..26e71ab48adac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java @@ -131,7 +131,7 @@ public class ClientContext { * the DFSInputStreams in the same client. */ private DeadNodeDetector deadNodeDetector = null; - + /** * ShortCircuitCache array size. */ From 3c79f1bc31ee62efb8e197003c87062d184e8ccf Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Mon, 9 Mar 2020 08:48:01 +0300 Subject: [PATCH 26/42] Update BlockReaderFactory.java --- .../apache/hadoop/hdfs/client/impl/BlockReaderFactory.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java index 22e2899d0c412..d3160a17c0f92 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java @@ -475,7 +475,7 @@ private BlockReader getBlockReaderLocal() throws IOException { "giving up on BlockReaderLocal.", this, pathInfo); return null; } - ShortCircuitCache cache = + ShortCircuitCache cache = clientContext.getShortCircuitCache(block.getBlockId()); ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); @@ -527,7 +527,7 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { if (curPeer.fromCache) remainingCacheTries--; DomainPeer peer = (DomainPeer)curPeer.peer; Slot slot = null; - ShortCircuitCache cache = + ShortCircuitCache cache = clientContext.getShortCircuitCache(block.getBlockId()); try { MutableBoolean usedPeer = new MutableBoolean(false); @@ -583,7 +583,7 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() { */ private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer, Slot slot) throws IOException { - ShortCircuitCache cache = + ShortCircuitCache cache = clientContext.getShortCircuitCache(block.getBlockId()); final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE)); From 1308ea79e8be238cc1c363c18ac5e3a526d03ce5 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Mon, 9 Mar 2020 08:50:05 +0300 Subject: [PATCH 27/42] Update DfsClientConf.java --- .../apache/hadoop/hdfs/client/impl/DfsClientConf.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 5117517e844ce..1aef7fc2d0563 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -138,7 +138,7 @@ public class DfsClientConf { /** wait time window before refreshing blocklocation for inputstream. */ private final long refreshReadBlockLocationsMS; - + private final ShortCircuitConf shortCircuitConf; private final long hedgedReadThresholdMillis; private final int hedgedReadThreadpoolSize; @@ -296,11 +296,11 @@ public DfsClientConf(Configuration conf) { HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM, HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM_DEFAULT); Preconditions.checkArgument(clientShortCircuitNum >= 1, - HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM + + HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM + "can't be less then 1."); Preconditions.checkArgument(clientShortCircuitNum <= 3, HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM + - "can't be more then 3."); + "can't be more then 3."); } @SuppressWarnings("unchecked") @@ -609,8 +609,8 @@ public long getSlowIoWarningThresholdMs() { /** * @return the clientShortCircuitNum */ - public int getClientShortCircuitNum() { - return clientShortCircuitNum; + public int getClientShortCircuitNum() { + return clientShortCircuitNum; } /** From 96b34d6704325891caf4b82d695881c760b888dd Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Mon, 9 Mar 2020 08:51:16 +0300 Subject: [PATCH 28/42] Update TestEnhancedByteBufferAccess.java --- .../java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java index bb7df9e3e7a1c..699c946d05ff6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java @@ -654,7 +654,7 @@ public void testZeroCopyReadOfCachedData() throws Exception { BLOCK_SIZE), byteBufferToArray(result2)); fsIn2.releaseBuffer(result2); fsIn2.close(); - + // check that the replica is anchored final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH); From 058b5c7b7d3dd6457af3c39c7e789d8e97151863 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Mon, 9 Mar 2020 12:12:02 +0300 Subject: [PATCH 29/42] Update TestEnhancedByteBufferAccess.java --- .../java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java index 699c946d05ff6..5a03236678cfd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java @@ -655,7 +655,7 @@ public void testZeroCopyReadOfCachedData() throws Exception { fsIn2.releaseBuffer(result2); 
fsIn2.close(); - // check that the replica is anchored + // check that the replica is anchored final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH); final ShortCircuitCache cache = ClientContext.get( From 030ae584f6a9f76dba20aeb9c4bbe2aa6ad6e399 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Mon, 9 Mar 2020 15:47:50 +0300 Subject: [PATCH 30/42] Update TestShortCircuitCache.java --- .../apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index adb1b86972dc3..5782c03d05b5a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -877,7 +877,7 @@ public void testRequestFileDescriptorsWhenULimit() throws Exception { return peerCache; }); - Mockito.when(clientContext.getShortCircuitCache(0)).thenAnswer( + Mockito.when(clientContext.getShortCircuitCache(blk.getBlock().getBlockId())).thenAnswer( (Answer) shortCircuitCacheCall -> { ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class); Mockito.when(cache.allocShmSlot( From abd0919ae7bb06344383563d2ccf3a03dbd983fa Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Mon, 9 Mar 2020 15:47:59 +0300 Subject: [PATCH 31/42] Update TestEnhancedByteBufferAccess.java --- .../java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java index 5a03236678cfd..19bc71111e868 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java @@ -654,7 +654,7 @@ public void testZeroCopyReadOfCachedData() throws Exception { BLOCK_SIZE), byteBufferToArray(result2)); fsIn2.releaseBuffer(result2); fsIn2.close(); - + // check that the replica is anchored final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH); From 42b9b7ca303045d9dfdb05d3cff65ee9305194aa Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Mon, 9 Mar 2020 21:31:13 +0300 Subject: [PATCH 32/42] Update DfsClientConf.java --- .../org/apache/hadoop/hdfs/client/impl/DfsClientConf.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java index 1aef7fc2d0563..fd5982f8da855 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java @@ -298,9 +298,9 @@ public DfsClientConf(Configuration conf) { Preconditions.checkArgument(clientShortCircuitNum >= 
1, HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM + "can't be less then 1."); - Preconditions.checkArgument(clientShortCircuitNum <= 3, + Preconditions.checkArgument(clientShortCircuitNum <= 5, HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM + - "can't be more then 3."); + "can't be more then 5."); } @SuppressWarnings("unchecked") From ed8bdf6658fce0e7560ef50eb22baf35305dad65 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sat, 28 Mar 2020 07:54:00 +0300 Subject: [PATCH 33/42] Update hdfs-default.xml --- .../hadoop-hdfs/src/main/resources/hdfs-default.xml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 52636f62eb99d..756766b9bb294 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -4141,7 +4141,9 @@ dfs.short.circuit.num 1 - Number of short-circuit caches. + Number of short-circuit caches. This setting should + be in the range 1 - 5. Lower values will result in lower CPU consumption; higher + values may speed up massive parallel reading files. From 5199f67a9addf9c3dac2c51d609b9528ddb20594 Mon Sep 17 00:00:00 2001 From: pustota2009 <61382543+pustota2009@users.noreply.github.com> Date: Sat, 4 Apr 2020 13:58:09 +0300 Subject: [PATCH 34/42] Update TestShortCircuitCache.java --- .../apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index 5782c03d05b5a..c1905fcb79811 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -877,7 +877,8 @@ public void testRequestFileDescriptorsWhenULimit() throws Exception { return peerCache; }); - Mockito.when(clientContext.getShortCircuitCache(blk.getBlock().getBlockId())).thenAnswer( + Mockito.when(clientContext.getShortCircuitCache( + blk.getBlock().getBlockId())).thenAnswer( (Answer) shortCircuitCacheCall -> { ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class); Mockito.when(cache.allocShmSlot( From ad27cd324c61497e7b8c25d3e13f612d872e9638 Mon Sep 17 00:00:00 2001 From: pustota2009 Date: Sun, 5 Apr 2020 17:30:14 +0300 Subject: [PATCH 35/42] Unit tests for few ShortCircuitCaches added --- .../client/impl/TestBlockReaderLocal.java | 231 ++++++++++++------ .../shortcircuit/TestShortCircuitCache.java | 4 +- 2 files changed, 153 insertions(+), 82 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java index 5a68abce6ed42..e27f320be40f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java @@ -116,7 +116,7 @@ private static void readFully(BlockReaderLocal reader, } private static class BlockReaderLocalTest { - final 
static int TEST_LENGTH = 12345; + final static int TEST_LENGTH = 1234567; final static int BYTES_PER_CHECKSUM = 512; public void setConfiguration(HdfsConfiguration conf) { @@ -130,10 +130,13 @@ public void doTest(BlockReaderLocal reader, byte original[]) throws IOException { // default: no-op } - } + public void doTest(BlockReaderLocal reader, byte original[], int shift) + throws IOException { + // default: no-op + } } public void runBlockReaderLocalTest(BlockReaderLocalTest test, - boolean checksum, long readahead) throws IOException { + boolean checksum, long readahead, int shortCircuitCachesNum) throws IOException { Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); MiniDFSCluster cluster = null; HdfsConfiguration conf = new HdfsConfiguration(); @@ -143,10 +146,12 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test, BlockReaderLocalTest.BYTES_PER_CHECKSUM); conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C"); conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead); + conf.setInt(HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM, shortCircuitCachesNum); test.setConfiguration(conf); FileInputStream dataIn = null, metaIn = null; final Path TEST_PATH = new Path("/a"); final long RANDOM_SEED = 4567L; + final int blockSize = 10 * 1024; BlockReaderLocal blockReaderLocal = null; FSDataInputStream fsIn = null; byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; @@ -158,8 +163,8 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test, cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); fs = cluster.getFileSystem(); - DFSTestUtil.createFile(fs, TEST_PATH, - BlockReaderLocalTest.TEST_LENGTH, (short)1, RANDOM_SEED); + DFSTestUtil.createFile(fs, TEST_PATH, 1024, + BlockReaderLocalTest.TEST_LENGTH, blockSize, (short)1, RANDOM_SEED); try { DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); } catch (InterruptedException e) { @@ -174,47 +179,50 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test, BlockReaderLocalTest.TEST_LENGTH); fsIn.close(); fsIn = null; - ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, TEST_PATH); - File dataFile = cluster.getBlockFile(0, block); - File metaFile = cluster.getBlockMetadataFile(0, block); - - ShortCircuitCache shortCircuitCache = - ClientContext.getFromConf(conf).getShortCircuitCache(0); + for (int i = 0; i < shortCircuitCachesNum; i++) { + ExtendedBlock block = DFSTestUtil.getAllBlocks(fs, TEST_PATH).get(i).getBlock(); + File dataFile = cluster.getBlockFile(0, block); + File metaFile = cluster.getBlockMetadataFile(0, block); + + ShortCircuitCache shortCircuitCache = + ClientContext.getFromConf(conf).getShortCircuitCache(block.getBlockId()); + test.setup(dataFile, checksum); + FileInputStream streams[] = { + new FileInputStream(dataFile), + new FileInputStream(metaFile) + }; + dataIn = streams[0]; + metaIn = streams[1]; + ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), + block.getBlockPoolId()); + raf = new RandomAccessFile( + new File(sockDir.getDir().getAbsolutePath(), + UUID.randomUUID().toString()), "rw"); + raf.setLength(8192); + FileInputStream shmStream = new FileInputStream(raf.getFD()); + shm = new ShortCircuitShm(ShmId.createRandom(), shmStream); + ShortCircuitReplica replica = + new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache, + Time.now(), shm.allocAndRegisterSlot( + ExtendedBlockId.fromExtendedBlock(block))); + blockReaderLocal = new BlockReaderLocal.Builder( + new DfsClientConf.ShortCircuitConf(conf)). 
+            setFilename(TEST_PATH.getName()).
+            setBlock(block).
+            setShortCircuitReplica(replica).
+            setCachingStrategy(new CachingStrategy(false, readahead)).
+            setVerifyChecksum(checksum).
+            build();
+        dataIn = null;
+        metaIn = null;
+        test.doTest(blockReaderLocal, original, i * blockSize);
+        // BlockReaderLocal should not alter the file position.
+        Assert.assertEquals(0, streams[0].getChannel().position());
+        Assert.assertEquals(0, streams[1].getChannel().position());
+      }
       cluster.shutdown();
       cluster = null;
-      test.setup(dataFile, checksum);
-      FileInputStream streams[] = {
-          new FileInputStream(dataFile),
-          new FileInputStream(metaFile)
-      };
-      dataIn = streams[0];
-      metaIn = streams[1];
-      ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
-          block.getBlockPoolId());
-      raf = new RandomAccessFile(
-          new File(sockDir.getDir().getAbsolutePath(),
-              UUID.randomUUID().toString()), "rw");
-      raf.setLength(8192);
-      FileInputStream shmStream = new FileInputStream(raf.getFD());
-      shm = new ShortCircuitShm(ShmId.createRandom(), shmStream);
-      ShortCircuitReplica replica =
-          new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache,
-              Time.now(), shm.allocAndRegisterSlot(
-                  ExtendedBlockId.fromExtendedBlock(block)));
-      blockReaderLocal = new BlockReaderLocal.Builder(
-          new DfsClientConf.ShortCircuitConf(conf)).
-          setFilename(TEST_PATH.getName()).
-          setBlock(block).
-          setShortCircuitReplica(replica).
-          setCachingStrategy(new CachingStrategy(false, readahead)).
-          setVerifyChecksum(checksum).
-          build();
-      dataIn = null;
-      metaIn = null;
-      test.doTest(blockReaderLocal, original);
-      // BlockReaderLocal should not alter the file position.
-      Assert.assertEquals(0, streams[0].getChannel().position());
-      Assert.assertEquals(0, streams[1].getChannel().position());
+
     } finally {
       if (fsIn != null) fsIn.close();
       if (fs != null) fs.close();
@@ -227,14 +235,15 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
     }
   }
 
+
   private static class TestBlockReaderLocalImmediateClose
       extends BlockReaderLocalTest {
   }
 
   @Test
   public void testBlockReaderLocalImmediateClose() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0);
-    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0);
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0, 1);
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0, 1);
   }
 
   private static class TestBlockReaderSimpleReads
@@ -260,30 +269,30 @@ public void doTest(BlockReaderLocal reader, byte original[])
 
   @Test
   public void testBlockReaderSimpleReads() throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
-        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 1);
   }
 
   @Test
   public void testBlockReaderSimpleReadsShortReadahead() throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true,
-        BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1);
+        BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1, 1);
   }
 
   @Test
   public void testBlockReaderSimpleReadsNoChecksum() throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false,
-        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 1);
   }
 
   @Test
   public void testBlockReaderSimpleReadsNoReadahead() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, 0);
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), true, 0, 1);
   }
 
   @Test
   public void testBlockReaderSimpleReadsNoChecksumNoReadahead()
       throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0);
+    runBlockReaderLocalTest(new TestBlockReaderSimpleReads(), false, 0, 1);
   }
 
   private static class TestBlockReaderLocalArrayReads2
@@ -310,26 +319,26 @@ public void doTest(BlockReaderLocal reader, byte original[])
 
   @Test
   public void testBlockReaderLocalArrayReads2() throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
-        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 1);
   }
 
   @Test
   public void testBlockReaderLocalArrayReads2NoChecksum() throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(),
-        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 1);
   }
 
   @Test
   public void testBlockReaderLocalArrayReads2NoReadahead() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), true, 0);
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), true, 0, 1);
   }
 
   @Test
   public void testBlockReaderLocalArrayReads2NoChecksumNoReadahead()
       throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0);
+    runBlockReaderLocalTest(new TestBlockReaderLocalArrayReads2(), false, 0, 1);
   }
 
   private static class TestBlockReaderLocalByteBufferReads
@@ -354,7 +363,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
   public void testBlockReaderLocalByteBufferReads()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
-        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 1);
   }
 
   @Test
@@ -362,21 +371,21 @@ public void testBlockReaderLocalByteBufferReadsNoChecksum()
       throws IOException {
     runBlockReaderLocalTest(
         new TestBlockReaderLocalByteBufferReads(),
-        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 1);
   }
 
   @Test
   public void testBlockReaderLocalByteBufferReadsNoReadahead()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
-        true, 0);
+        true, 0, 1);
   }
 
   @Test
   public void testBlockReaderLocalByteBufferReadsNoChecksumNoReadahead()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
-        false, 0);
+        false, 0, 1);
   }
 
   /**
@@ -425,7 +434,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
   public void testBlockReaderLocalByteBufferFastLaneReads()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
-        true, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+        true, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM, 1);
   }
 
   @Test
@@ -433,21 +442,21 @@ public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksum()
       throws IOException {
     runBlockReaderLocalTest(
         new TestBlockReaderLocalByteBufferFastLaneReads(),
-        false, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+        false, 2 * BlockReaderLocalTest.BYTES_PER_CHECKSUM, 1);
   }
 
   @Test
   public void testBlockReaderLocalByteBufferFastLaneReadsNoReadahead()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
-        true, 0);
+        true, 0, 1);
   }
 
   @Test
   public void testBlockReaderLocalByteBufferFastLaneReadsNoChecksumNoReadahead()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferFastLaneReads(),
-        false, 0);
+        false, 0, 1);
   }
 
   private static class TestBlockReaderLocalReadCorruptStart
@@ -486,7 +495,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
   public void testBlockReaderLocalReadCorruptStart()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalReadCorruptStart(), true,
-        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 1);
   }
 
   private static class TestBlockReaderLocalReadCorrupt
@@ -537,26 +546,26 @@ public void doTest(BlockReaderLocal reader, byte original[])
   public void testBlockReaderLocalReadCorrupt()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true,
-        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 1);
   }
 
   @Test
   public void testBlockReaderLocalReadCorruptNoChecksum() throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false,
-        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 1);
   }
 
   @Test
   public void testBlockReaderLocalReadCorruptNoReadahead() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, 0);
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), true, 0, 1);
  }
 
   @Test
   public void testBlockReaderLocalReadCorruptNoChecksumNoReadahead()
       throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, 0);
+    runBlockReaderLocalTest(new TestBlockReaderLocalReadCorrupt(), false, 0, 1);
   }
 
   private static class TestBlockReaderLocalWithMlockChanges
@@ -589,28 +598,28 @@ public void doTest(BlockReaderLocal reader, byte original[])
   public void testBlockReaderLocalWithMlockChanges()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
-        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 1);
   }
 
   @Test
   public void testBlockReaderLocalWithMlockChangesNoChecksum()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
-        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 1);
   }
 
   @Test
   public void testBlockReaderLocalWithMlockChangesNoReadahead()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
-        true, 0);
+        true, 0, 1);
   }
 
   @Test
   public void testBlockReaderLocalWithMlockChangesNoChecksumNoReadahead()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalWithMlockChanges(),
-        false, 0);
+        false, 0, 1);
   }
 
   private static class TestBlockReaderLocalOnFileWithoutChecksum
@@ -662,56 +671,56 @@ public void doTest(BlockReaderLocal reader, byte original[])
   public void testBlockReaderLocalOnFileWithoutChecksum()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
-        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 1);
   }
 
   @Test
   public void testBlockReaderLocalOnFileWithoutChecksumNoChecksum()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
-        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 1);
   }
 
   @Test
   public void testBlockReaderLocalOnFileWithoutChecksumNoReadahead()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
-        true, 0);
+        true, 0, 1);
   }
 
   @Test
   public void testBlockReaderLocalOnFileWithoutChecksumNoChecksumNoReadahead()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalOnFileWithoutChecksum(),
-        false, 0);
+        false, 0, 1);
   }
 
   @Test
   public void testBlockReaderLocalReadZeroBytes()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
-        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 1);
   }
 
   @Test
   public void testBlockReaderLocalReadZeroBytesNoChecksum()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
-        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 1);
   }
 
   @Test
   public void testBlockReaderLocalReadZeroBytesNoReadahead()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
-        true, 0);
+        true, 0, 1);
   }
 
   @Test
   public void testBlockReaderLocalReadZeroBytesNoChecksumNoReadahead()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalReadZeroBytes(),
-        false, 0);
+        false, 0, 1);
   }
 
@@ -845,4 +854,66 @@ public void testStatisticsForErasureCodingRead() throws IOException {
       }
     }
   }
+
+  private static class TestBlockReaderFiveShortCircutCachesReads
+      extends BlockReaderLocalTest {
+    @Override
+    public void doTest(BlockReaderLocal reader, byte original[], int shift)
+        throws IOException {
+      byte buf[] = new byte[TEST_LENGTH];
+      reader.readFully(buf, 0, 512);
+      assertArrayRegionsEqual(original, shift, buf, 0, 512);
+      reader.readFully(buf, 512, 512);
+      assertArrayRegionsEqual(original, 512 + shift, buf, 512, 512);
+      reader.readFully(buf, 1024, 513);
+      assertArrayRegionsEqual(original, 1024 + shift, buf, 1024, 513);
+      reader.readFully(buf, 1537, 514);
+      assertArrayRegionsEqual(original, 1537 + shift, buf, 1537, 514);
+      // Readahead is always at least the size of one chunk in this test.
+      Assert.assertTrue(reader.getMaxReadaheadLength() >=
+          BlockReaderLocalTest.BYTES_PER_CHECKSUM);
+    }
+  }
+
+  @Test
+  public void testBlockReaderFiveShortCircutCachesReads() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), true,
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 5);
+  }
+
+  @Test
+  public void testBlockReaderFiveShortCircutCachesReadsShortReadahead() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), true,
+        BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1, 5);
+  }
+
+  @Test
+  public void testBlockReaderFiveShortCircutCachesReadsNoChecksum() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), false,
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 5);
+  }
+
+  @Test
+  public void testBlockReaderFiveShortCircutCachesReadsNoReadahead() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), true, 0, 5);
+  }
+
+  @Test
+  public void testBlockReaderFiveShortCircutCachesReadsNoChecksumNoReadahead()
+      throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), false, 0, 5);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBlockReaderShortCircutCachesOutOfRangeBelow() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), true,
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 0);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void testBlockReaderShortCircutCachesOutOfRangeAbove() throws IOException {
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), true,
+        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 555);
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index c1905fcb79811..692524f85010a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -880,8 +880,8 @@ public void testRequestFileDescriptorsWhenULimit() throws Exception {
     Mockito.when(clientContext.getShortCircuitCache(
         blk.getBlock().getBlockId())).thenAnswer(
         (Answer<ShortCircuitCache>) shortCircuitCacheCall -> {
-        ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
-        Mockito.when(cache.allocShmSlot(
+          ShortCircuitCache cache = Mockito.mock(ShortCircuitCache.class);
+          Mockito.when(cache.allocShmSlot(
           Mockito.any(DatanodeInfo.class),
           Mockito.any(DomainPeer.class),
           Mockito.any(MutableBoolean.class),

From 7092504d24cd80add1aec9adb800d023cd3bcc47 Mon Sep 17 00:00:00 2001
From: pustota2009 <61382543+pustota2009@users.noreply.github.com>
Date: Mon, 6 Apr 2020 03:16:57 +0300
Subject: [PATCH 36/42] fixed code style

---
 .../client/impl/TestBlockReaderLocal.java   | 73 +++++++++++--------
 .../shortcircuit/TestShortCircuitCache.java |  6 +-
 2 files changed, 46 insertions(+), 33 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
index e27f320be40f7..b9d88af46c59a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
@@ -130,13 +130,14 @@ public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
       // default: no-op
     }
-    public void doTest(BlockReaderLocal reader, byte original[], int shift)
+    public void doTest(BlockReaderLocal reader, int shift, byte original[])
         throws IOException {
       // default: no-op
     }
   }
 
   public void runBlockReaderLocalTest(BlockReaderLocalTest test,
-      boolean checksum, long readahead, int shortCircuitCachesNum) throws IOException {
+      boolean checksum, long readahead, int shortCircuitCachesNum)
+      throws IOException {
     Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
     MiniDFSCluster cluster = null;
     HdfsConfiguration conf = new HdfsConfiguration();
@@ -146,7 +147,8 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
         BlockReaderLocalTest.BYTES_PER_CHECKSUM);
     conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");
     conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_CACHE_READAHEAD, readahead);
-    conf.setInt(HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM, shortCircuitCachesNum);
+    conf.setInt(HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM,
+        shortCircuitCachesNum);
     test.setConfiguration(conf);
     FileInputStream dataIn = null, metaIn = null;
     final Path TEST_PATH = new Path("/a");
@@ -180,16 +182,18 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
       fsIn.close();
       fsIn = null;
       for (int i = 0; i < shortCircuitCachesNum; i++) {
-        ExtendedBlock block = DFSTestUtil.getAllBlocks(fs, TEST_PATH).get(i).getBlock();
+        ExtendedBlock block = DFSTestUtil.getAllBlocks(
+            fs, TEST_PATH).get(i).getBlock();
         File dataFile = cluster.getBlockFile(0, block);
         File metaFile = cluster.getBlockMetadataFile(0, block);
 
         ShortCircuitCache shortCircuitCache =
-            ClientContext.getFromConf(conf).getShortCircuitCache(block.getBlockId());
+            ClientContext.getFromConf(conf).getShortCircuitCache(
+                block.getBlockId());
         test.setup(dataFile, checksum);
-        FileInputStream streams[] = {
-            new FileInputStream(dataFile),
-            new FileInputStream(metaFile)
+        FileInputStream[] streams = {
+          new FileInputStream(dataFile),
+          new FileInputStream(metaFile)
         };
         dataIn = streams[0];
         metaIn = streams[1];
@@ -215,7 +219,7 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
             build();
         dataIn = null;
         metaIn = null;
-        test.doTest(blockReaderLocal, original, i * blockSize);
+        test.doTest(blockReaderLocal, i * blockSize, original );
         // BlockReaderLocal should not alter the file position.
         Assert.assertEquals(0, streams[0].getChannel().position());
         Assert.assertEquals(0, streams[1].getChannel().position());
@@ -242,8 +246,10 @@ private static class TestBlockReaderLocalImmediateClose
 
   @Test
   public void testBlockReaderLocalImmediateClose() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), true, 0, 1);
-    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(), false, 0, 1);
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(),
+        true, 0, 1);
+    runBlockReaderLocalTest(new TestBlockReaderLocalImmediateClose(),
+        false, 0, 1);
   }
 
   private static class TestBlockReaderSimpleReads
@@ -251,7 +257,7 @@ private static class TestBlockReaderSimpleReads
     @Override
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
+      byte[] buf = new byte[TEST_LENGTH];
       reader.readFully(buf, 0, 512);
       assertArrayRegionsEqual(original, 0, buf, 0, 512);
       reader.readFully(buf, 512, 512);
@@ -300,7 +306,7 @@ private static class TestBlockReaderLocalArrayReads2
     @Override
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
+      byte[] buf = new byte[TEST_LENGTH];
       reader.readFully(buf, 0, 10);
       assertArrayRegionsEqual(original, 0, buf, 0, 10);
       reader.readFully(buf, 10, 100);
@@ -378,7 +384,7 @@ public void testBlockReaderLocalByteBufferReadsNoChecksum()
   public void testBlockReaderLocalByteBufferReadsNoReadahead()
       throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderLocalByteBufferReads(),
-        true, 0, 1);
+            true, 0, 1);
   }
 
   @Test
@@ -477,7 +483,7 @@ public void setup(File blockFile, boolean usingChecksums)
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
+      byte[] buf = new byte[TEST_LENGTH];
       if (usingChecksums) {
         try {
           reader.readFully(buf, 0, 10);
@@ -517,7 +523,7 @@ public void setup(File blockFile, boolean usingChecksums)
     public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
+      byte[] buf = new byte[TEST_LENGTH];
       try {
         reader.readFully(buf, 0, 10);
         assertArrayRegionsEqual(original, 0, buf, 0, 10);
@@ -858,9 +864,9 @@ public void testStatisticsForErasureCodingRead() throws IOException {
   private static class TestBlockReaderFiveShortCircutCachesReads
       extends BlockReaderLocalTest {
     @Override
-    public void doTest(BlockReaderLocal reader, byte original[], int shift)
+    public void doTest(BlockReaderLocal reader, int shift, byte original[])
        throws IOException {
-      byte buf[] = new byte[TEST_LENGTH];
+      byte[] buf = new byte[TEST_LENGTH];
       reader.readFully(buf, 0, 512);
       assertArrayRegionsEqual(original, shift, buf, 0, 512);
       reader.readFully(buf, 512, 512);
@@ -877,43 +883,50 @@ public void doTest(BlockReaderLocal reader, byte original[], int shift)
 
   @Test
   public void testBlockReaderFiveShortCircutCachesReads() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), true,
-        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 5);
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
+        5);
   }
 
   @Test
   public void testBlockReaderFiveShortCircutCachesReadsShortReadahead() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), true,
-        BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1, 5);
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+        true,BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1,
+        5);
   }
 
   @Test
   public void testBlockReaderFiveShortCircutCachesReadsNoChecksum() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), false,
-        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 5);
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+        false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
+        5);
   }
 
   @Test
   public void testBlockReaderFiveShortCircutCachesReadsNoReadahead() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), true, 0, 5);
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+        true, 0, 5);
   }
 
   @Test
   public void testBlockReaderFiveShortCircutCachesReadsNoChecksumNoReadahead()
       throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), false, 0, 5);
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+        false, 0, 5);
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testBlockReaderShortCircutCachesOutOfRangeBelow() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), true,
-        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 0);
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
+        0);
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void testBlockReaderShortCircutCachesOutOfRangeAbove() throws IOException {
-    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(), true,
-        HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT, 555);
+    runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
+        true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
+        555);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
index 692524f85010a..53cac2adee350 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java
@@ -887,10 +887,10 @@ public void testRequestFileDescriptorsWhenULimit() throws Exception {
           Mockito.any(MutableBoolean.class),
           Mockito.any(ExtendedBlockId.class),
           Mockito.anyString()))
-          .thenAnswer((Answer<Slot>) call -> null);
+              .thenAnswer((Answer<Slot>) call -> null);
 
-        return cache;
-      }
+          return cache;
+        }
     );
 
     DatanodeInfo[] nodes = blk.getLocations();

From a6a9d3cce5e22c1dce4027d5c56ed381f0dece2c Mon Sep 17 00:00:00 2001
From: pustota2009 <61382543+pustota2009@users.noreply.github.com>
Date: Mon, 6 Apr 2020 08:11:37 +0300
Subject: [PATCH 37/42] Unit tests added

---
 .../client/impl/TestBlockReaderLocal.java | 23 +++++++++++--------
 1 file changed, 14 insertions(+), 9 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
index b9d88af46c59a..d0fd3aa702f16 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
@@ -130,7 +130,7 @@ public void doTest(BlockReaderLocal reader, byte original[])
         throws IOException {
       // default: no-op
     }
-    public void doTest(BlockReaderLocal reader, int shift, byte original[])
+    public void doTest(BlockReaderLocal reader, byte[] original, int shift)
         throws IOException {
       // default: no-op
     }
   }
@@ -219,7 +219,7 @@ public void runBlockReaderLocalTest(BlockReaderLocalTest test,
             build();
         dataIn = null;
         metaIn = null;
-        test.doTest(blockReaderLocal, i * blockSize, original );
+        test.doTest(blockReaderLocal, original, i * blockSize);
         // BlockReaderLocal should not alter the file position.
         Assert.assertEquals(0, streams[0].getChannel().position());
         Assert.assertEquals(0, streams[1].getChannel().position());
@@ -864,7 +864,7 @@ public void testStatisticsForErasureCodingRead() throws IOException {
   private static class TestBlockReaderFiveShortCircutCachesReads
       extends BlockReaderLocalTest {
     @Override
-    public void doTest(BlockReaderLocal reader, int shift, byte original[])
+    public void doTest(BlockReaderLocal reader, byte[] original, int shift)
         throws IOException {
       byte[] buf = new byte[TEST_LENGTH];
       reader.readFully(buf, 0, 512);
@@ -889,21 +889,24 @@ public void testBlockReaderFiveShortCircutCachesReads() throws IOException {
   }
 
   @Test
-  public void testBlockReaderFiveShortCircutCachesReadsShortReadahead() throws IOException {
+  public void testBlockReaderFiveShortCircutCachesReadsShortReadahead()
+      throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
-        true,BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1,
+        true, BlockReaderLocalTest.BYTES_PER_CHECKSUM - 1,
         5);
   }
 
   @Test
-  public void testBlockReaderFiveShortCircutCachesReadsNoChecksum() throws IOException {
+  public void testBlockReaderFiveShortCircutCachesReadsNoChecksum()
+      throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
         false, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
         5);
   }
 
   @Test
-  public void testBlockReaderFiveShortCircutCachesReadsNoReadahead() throws IOException {
+  public void testBlockReaderFiveShortCircutCachesReadsNoReadahead()
+      throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
         true, 0, 5);
   }
@@ -916,14 +919,16 @@ public void testBlockReaderFiveShortCircutCachesReadsNoChecksumNoReadahead()
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testBlockReaderShortCircutCachesOutOfRangeBelow() throws IOException {
+  public void testBlockReaderShortCircutCachesOutOfRangeBelow()
+      throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
         true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
         0);
   }
 
   @Test(expected = IllegalArgumentException.class)
-  public void testBlockReaderShortCircutCachesOutOfRangeAbove() throws IOException {
+  public void testBlockReaderShortCircutCachesOutOfRangeAbove()
+      throws IOException {
     runBlockReaderLocalTest(new TestBlockReaderFiveShortCircutCachesReads(),
         true, HdfsClientConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT,
         555);

From 8bdea3433a3179b06d27e9c1d2d8259de59b75f4 Mon Sep 17 00:00:00 2001
From: pustota2009 <61382543+pustota2009@users.noreply.github.com>
Date: Tue, 12 May 2020 08:52:07 +0300
Subject: [PATCH 38/42] Update HdfsClientConfigKeys.java

rename dfs.short.circuit.num to dfs.client.short.circuit.num
---
 .../org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index f169ab1d0a701..922680c12eb86 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -141,7 +141,7 @@ public interface HdfsClientConfigKeys {
       "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
   int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
       60000;
-  String DFS_SHORT_CIRCUIT_NUM = "dfs.short.circuit.num";
+  String DFS_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num";
   int DFS_SHORT_CIRCUIT_NUM_DEFAULT = 1;
   String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
       "dfs.client.slow.io.warning.threshold.ms";
   long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;

From 44520e13fd46f05438a7dea9cbe9fa46c9792786 Mon Sep 17 00:00:00 2001
From: pustota2009 <61382543+pustota2009@users.noreply.github.com>
Date: Tue, 12 May 2020 08:53:21 +0300
Subject: [PATCH 39/42] Update hdfs-default.xml

---
 .../hadoop-hdfs/src/main/resources/hdfs-default.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 756766b9bb294..f0d698d4847b5 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4138,7 +4138,7 @@
 <property>
-  <name>dfs.short.circuit.num</name>
+  <name>dfs.client.short.circuit.num</name>
   <value>1</value>
   <description>
     Number of short-circuit caches. This setting should

From bbfbf1e9a12fe42e1382af822555ec0ee6d5fb98 Mon Sep 17 00:00:00 2001
From: pustota2009 <61382543+pustota2009@users.noreply.github.com>
Date: Tue, 12 May 2020 08:55:24 +0300
Subject: [PATCH 40/42] Update ClientContext.java

return back getShortCircuitCache()
---
 .../src/main/java/org/apache/hadoop/hdfs/ClientContext.java | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index 26e71ab48adac..7a03240e80d9c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -237,6 +237,10 @@ public String getConfString() {
     return confString;
   }
 
+  public ShortCircuitCache getShortCircuitCache() {
+    return shortCircuitCache[0];
+  }
+
   public ShortCircuitCache getShortCircuitCache(long idx) {
     return shortCircuitCache[(int) (idx % clientShortCircuitNum)];
   }

From ff3f977fbaf9662b20090ba2ec092ef28951f32b Mon Sep 17 00:00:00 2001
From: pustota2009 <61382543+pustota2009@users.noreply.github.com>
Date: Tue, 12 May 2020 09:51:51 +0300
Subject: [PATCH 41/42] Update ClientContext.java

added comment
---
 .../src/main/java/org/apache/hadoop/hdfs/ClientContext.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index 7a03240e80d9c..47488cbeaf5f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -237,6 +237,7 @@ public String getConfString() {
     return confString;
   }
 
+  // Keeping this deprecated method to avoid runtime issues
   public ShortCircuitCache getShortCircuitCache() {
     return shortCircuitCache[0];
   }

From 9aa369fa807c48c2bc8d38c9cffe0dfd42ede73a Mon Sep 17 00:00:00 2001
From: pustota2009 <61382543+pustota2009@users.noreply.github.com>
Date: Tue, 12 May 2020 13:49:23 +0300
Subject: [PATCH 42/42] Update ClientContext.java

refact
---
 .../src/main/java/org/apache/hadoop/hdfs/ClientContext.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
index 47488cbeaf5f0..0d54534afdc8f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
@@ -239,7 +239,7 @@ public String getConfString() {
 
   // Keeping this deprecated method to avoid runtime issues
   public ShortCircuitCache getShortCircuitCache() {
-    return shortCircuitCache[0];
+    return getShortCircuitCache(0);
   }
 
   public ShortCircuitCache getShortCircuitCache(long idx) {
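
Editor's note: the mechanism the series above converges on is small enough to show in isolation. The client keeps an array of dfs.client.short.circuit.num caches and picks one per block with blockId % clientShortCircuitNum (see ClientContext.getShortCircuitCache(long idx) in the hunks above), so concurrent short-circuit readers mostly contend on different cache locks. The sketch below is not Hadoop code and does not use the HDFS classes; DummyCache, the 1..5 bound, and the sample block IDs are illustrative assumptions standing in for ShortCircuitCache, the DfsClientConf precondition, and real block IDs. Only the modulo selection rule is taken directly from the patches.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public final class ShortCircuitCacheSelectionSketch {

  // Stand-in for ShortCircuitCache; each instance keeps its own replica map.
  static final class DummyCache {
    private final int id;
    private final ConcurrentMap<Long, String> replicas = new ConcurrentHashMap<>();

    DummyCache(int id) {
      this.id = id;
    }

    String fetchOrCreate(long blockId) {
      // One cache shard tracks only the blocks routed to it.
      return replicas.computeIfAbsent(blockId, b -> "replica-" + b + "@cache-" + id);
    }
  }

  private final DummyCache[] caches;

  ShortCircuitCacheSelectionSketch(int clientShortCircuitNum) {
    // The real bound is whatever DfsClientConf enforces; 1..5 is assumed here,
    // matching the range the added tests exercise (5 succeeds, 0 and 555 fail).
    if (clientShortCircuitNum < 1 || clientShortCircuitNum > 5) {
      throw new IllegalArgumentException(
          "dfs.client.short.circuit.num must be between 1 and 5");
    }
    caches = new DummyCache[clientShortCircuitNum];
    for (int i = 0; i < clientShortCircuitNum; i++) {
      caches[i] = new DummyCache(i);
    }
  }

  // Same selection rule as the patched ClientContext: index by blockId modulo
  // the number of configured caches.
  DummyCache getShortCircuitCache(long blockId) {
    return caches[(int) (blockId % caches.length)];
  }

  public static void main(String[] args) {
    ShortCircuitCacheSelectionSketch ctx = new ShortCircuitCacheSelectionSketch(3);
    long[] sampleBlockIds = {1073741825L, 1073741826L, 1073741827L, 1073741828L};
    for (long blockId : sampleBlockIds) {
      System.out.println("block " + blockId + " -> "
          + ctx.getShortCircuitCache(blockId).fetchOrCreate(blockId));
    }
  }
}

On the configuration side, a client opts in the same way the updated tests do, either with conf.setInt(HdfsClientConfigKeys.DFS_SHORT_CIRCUIT_NUM, n) or by setting dfs.client.short.circuit.num in the site configuration; out-of-range values are rejected with IllegalArgumentException, which is exactly what the two OutOfRange tests assert.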