
Commit 2abcf77

HDFS-15202 Boost short circuit cache (rebase PR-1884) (apache#2016)
1 parent 4525292 commit 2abcf77

9 files changed: +214 -76 lines changed


hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientContext.java

Lines changed: 17 additions & 3 deletions
@@ -77,7 +77,7 @@ public class ClientContext {
   /**
    * Caches short-circuit file descriptors, mmap regions.
    */
-  private final ShortCircuitCache shortCircuitCache;
+  private final ShortCircuitCache[] shortCircuitCache;

   /**
    * Caches TCP and UNIX domain sockets for reuse.
@@ -132,13 +132,23 @@ public class ClientContext {
    */
  private DeadNodeDetector deadNodeDetector = null;

+  /**
+   * ShortCircuitCache array size.
+   */
+  private final int clientShortCircuitNum;
+
   private ClientContext(String name, DfsClientConf conf,
       Configuration config) {
     final ShortCircuitConf scConf = conf.getShortCircuitConf();

     this.name = name;
     this.confString = scConf.confAsString();
-    this.shortCircuitCache = ShortCircuitCache.fromConf(scConf);
+    this.clientShortCircuitNum = conf.getClientShortCircuitNum();
+    this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
+    for (int i = 0; i < this.clientShortCircuitNum; i++) {
+      this.shortCircuitCache[i] = ShortCircuitCache.fromConf(scConf);
+    }
     this.peerCache = new PeerCache(scConf.getSocketCacheCapacity(),
         scConf.getSocketCacheExpiry());
     this.keyProviderCache = new KeyProviderCache(
@@ -228,7 +238,11 @@ public String getConfString() {
   }

   public ShortCircuitCache getShortCircuitCache() {
-    return shortCircuitCache;
+    return shortCircuitCache[0];
+  }
+
+  public ShortCircuitCache getShortCircuitCache(long idx) {
+    return shortCircuitCache[(int) (idx % clientShortCircuitNum)];
   }

   public PeerCache getPeerCache() {
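
With this change ClientContext holds an array of ShortCircuitCache instances and picks one by taking the caller's block ID modulo clientShortCircuitNum, so a given block always resolves to the same cache while different blocks spread lock contention across independent caches. A minimal stand-alone sketch of that selection rule (the StripedSelector name, the 4-cache count, and the sample block IDs are illustrative, and non-negative block IDs are assumed):

// Illustrative sketch of the arithmetic behind
// ClientContext#getShortCircuitCache(long idx); all names here are made up.
public class StripedSelector {
  private final int cacheCount;   // stands in for clientShortCircuitNum

  StripedSelector(int cacheCount) {
    this.cacheCount = cacheCount;
  }

  // Same mapping as the new accessor: block ID modulo the cache count.
  int slotFor(long blockId) {
    return (int) (blockId % cacheCount);
  }

  public static void main(String[] args) {
    StripedSelector s = new StripedSelector(4);   // e.g. dfs.client.short.circuit.num = 4
    System.out.println(s.slotFor(1073741825L));   // 1073741825 % 4 = 1
    System.out.println(s.slotFor(1073741825L));   // same block -> same cache slot
    System.out.println(s.slotFor(1073741826L));   // neighbouring block -> slot 2
  }
}

BlockReaderFactory (below) relies on this determinism: the shared-memory slot it allocates and the replica it registers for a block are always looked up in the same cache instance, because every call site passes the same block ID.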

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java

Lines changed: 2 additions & 0 deletions
@@ -144,6 +144,8 @@ public interface HdfsClientConfigKeys {
       "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
   int DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT =
       60000;
+  String DFS_CLIENT_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num";
+  int DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT = 1;
   String DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY =
       "dfs.client.slow.io.warning.threshold.ms";
   long DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 30000;

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/BlockReaderFactory.java

Lines changed: 6 additions & 3 deletions
@@ -476,7 +476,8 @@ private BlockReader getBlockReaderLocal() throws IOException {
           "giving up on BlockReaderLocal.", this, pathInfo);
       return null;
     }
-    ShortCircuitCache cache = clientContext.getShortCircuitCache();
+    ShortCircuitCache cache =
+        clientContext.getShortCircuitCache(block.getBlockId());
     ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(),
         block.getBlockPoolId());
     ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
@@ -527,7 +528,8 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
       if (curPeer.fromCache) remainingCacheTries--;
       DomainPeer peer = (DomainPeer)curPeer.peer;
       Slot slot = null;
-      ShortCircuitCache cache = clientContext.getShortCircuitCache();
+      ShortCircuitCache cache =
+          clientContext.getShortCircuitCache(block.getBlockId());
       try {
         MutableBoolean usedPeer = new MutableBoolean(false);
         slot = cache.allocShmSlot(datanode, peer, usedPeer,
@@ -582,7 +584,8 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
    */
   private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
       Slot slot) throws IOException {
-    ShortCircuitCache cache = clientContext.getShortCircuitCache();
+    ShortCircuitCache cache =
+        clientContext.getShortCircuitCache(block.getBlockId());
     final DataOutputStream out =
         new DataOutputStream(new BufferedOutputStream(peer.getOutputStream(), SMALL_BUFFER_SIZE));
     SlotId slotId = slot == null ? null : slot.getSlotId();

hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/DfsClientConf.java

Lines changed: 19 additions & 2 deletions
@@ -142,6 +142,7 @@ public class DfsClientConf {
   private final long refreshReadBlockLocationsMS;

   private final ShortCircuitConf shortCircuitConf;
+  private final int clientShortCircuitNum;

   private final long hedgedReadThresholdMillis;
   private final int hedgedReadThreadpoolSize;
@@ -272,8 +273,6 @@ public DfsClientConf(Configuration conf) {
         HdfsClientConfigKeys.
             DFS_CLIENT_REFRESH_READ_BLOCK_LOCATIONS_MS_DEFAULT);

-    shortCircuitConf = new ShortCircuitConf(conf);
-
     hedgedReadThresholdMillis = conf.getLong(
         HedgedRead.THRESHOLD_MILLIS_KEY,
         HedgedRead.THRESHOLD_MILLIS_DEFAULT);
@@ -296,6 +295,17 @@ public DfsClientConf(Configuration conf) {
     leaseHardLimitPeriod =
         conf.getLong(HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_KEY,
             HdfsClientConfigKeys.DFS_LEASE_HARDLIMIT_DEFAULT) * 1000;
+
+    shortCircuitConf = new ShortCircuitConf(conf);
+    clientShortCircuitNum = conf.getInt(
+        HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM,
+        HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT);
+    Preconditions.checkArgument(clientShortCircuitNum >= 1,
+        HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
+        "can't be less then 1.");
+    Preconditions.checkArgument(clientShortCircuitNum <= 5,
+        HdfsClientConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_NUM +
+        "can't be more then 5.");
   }

   @SuppressWarnings("unchecked")
@@ -601,6 +611,13 @@ public long getSlowIoWarningThresholdMs() {
     return slowIoWarningThresholdMs;
   }

+  /*
+   * @return the clientShortCircuitNum
+   */
+  public int getClientShortCircuitNum() {
+    return clientShortCircuitNum;
+  }
+
   /**
    * @return the hedgedReadThresholdMillis
    */
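
DfsClientConf now reads dfs.client.short.circuit.num with Configuration#getInt and rejects anything outside 1..5 via Guava Preconditions, so a misconfigured client fails fast at construction time instead of allocating an unexpected number of caches. A rough sketch of that read-and-validate flow, written without the Guava dependency and with the key spelled out literally (the class name and the value 4 are made up for the example):

import org.apache.hadoop.conf.Configuration;

// Sketch only: mirrors the read plus range check performed in DfsClientConf.
public class ShortCircuitNumCheck {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setInt("dfs.client.short.circuit.num", 4);             // hypothetical client setting
    int num = conf.getInt("dfs.client.short.circuit.num", 1);   // default is 1
    if (num < 1 || num > 5) {                                   // same bounds the patch enforces
      throw new IllegalArgumentException(
          "dfs.client.short.circuit.num must be in the range 1 - 5, got " + num);
    }
    System.out.println("client will create " + num + " ShortCircuitCache instances");
  }
}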

hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml

Lines changed: 10 additions & 0 deletions
@@ -4188,6 +4188,16 @@
   </description>
 </property>

+<property>
+  <name>dfs.client.short.circuit.num</name>
+  <value>1</value>
+  <description>
+    Number of short-circuit caches. This setting should
+    be in the range 1 - 5. Lower values will result in lower CPU consumption; higher
+    values may speed up massive parallel reading files.
+  </description>
+</property>
+
 <property>
   <name>dfs.client.read.striped.threadpool.size</name>
   <value>18</value>
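
For context, a hedged client-side sketch of turning the new knob on; the cache count of 3, the file path, and the choice to enable short-circuit reads here are assumptions for illustration (a working short-circuit setup also needs a DataNode domain socket path, which is omitted):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Sketch: a reader that asks for 3 short-circuit caches. Only the property
// name comes from this commit; everything else is illustrative.
public class MultiCacheReader {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean("dfs.client.read.shortcircuit", true);   // prerequisite for short-circuit reads
    conf.setInt("dfs.client.short.circuit.num", 3);          // new knob introduced by HDFS-15202
    try (FileSystem fs = FileSystem.get(conf);
         FSDataInputStream in = fs.open(new Path("/data/example.bin"))) {
      byte[] buf = new byte[4096];
      int n = in.read(buf);
      System.out.println("read " + n + " bytes");
    }
  }
}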

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java

Lines changed: 3 additions & 3 deletions
@@ -358,7 +358,7 @@ public void testZeroCopyMmapCache() throws Exception {
     fsIn.close();
     fsIn = fs.open(TEST_PATH);
     final ShortCircuitCache cache = ClientContext.get(
-        CONTEXT, conf).getShortCircuitCache();
+        CONTEXT, conf).getShortCircuitCache(0);
     cache.accept(new CountingVisitor(0, 5, 5, 0));
     results[0] = fsIn.read(null, BLOCK_SIZE,
         EnumSet.of(ReadOption.SKIP_CHECKSUMS));
@@ -654,12 +654,12 @@ public void testZeroCopyReadOfCachedData() throws Exception {
         BLOCK_SIZE), byteBufferToArray(result2));
     fsIn2.releaseBuffer(result2);
     fsIn2.close();
-
+
     // check that the replica is anchored
     final ExtendedBlock firstBlock =
         DFSTestUtil.getFirstBlock(fs, TEST_PATH);
     final ShortCircuitCache cache = ClientContext.get(
-        CONTEXT, conf).getShortCircuitCache();
+        CONTEXT, conf).getShortCircuitCache(0);
     waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1);
     // Uncache the replica
     fs.removeCacheDirective(directiveId);

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderFactory.java

Lines changed: 7 additions & 7 deletions
@@ -389,7 +389,7 @@ private void testShortCircuitCacheUnbufferWithDisableInterval(

     try (FSDataInputStream in = dfs.open(testFile)) {
       Assert.assertEquals(0,
-          dfs.getClient().getClientContext().getShortCircuitCache()
+          dfs.getClient().getClientContext().getShortCircuitCache(0)
               .getReplicaInfoMapSize());

       final byte[] buf = new byte[testFileLen];
@@ -398,12 +398,12 @@ private void testShortCircuitCacheUnbufferWithDisableInterval(

       // Set cache size to 0 so the replica marked evictable by unbuffer
       // will be purged immediately.
-      dfs.getClient().getClientContext().getShortCircuitCache()
+      dfs.getClient().getClientContext().getShortCircuitCache(0)
           .setMaxTotalSize(0);
       LOG.info("Unbuffering");
       in.unbuffer();
       Assert.assertEquals(0,
-          dfs.getClient().getClientContext().getShortCircuitCache()
+          dfs.getClient().getClientContext().getShortCircuitCache(0)
               .getReplicaInfoMapSize());

       DFSTestUtil.appendFile(dfs, testFile, "append more data");
@@ -432,7 +432,7 @@ private void validateReadResult(final DistributedFileSystem dfs,
       final int expectedScrRepMapSize) {
     Assert.assertThat(expected, CoreMatchers.is(actual));
     Assert.assertEquals(expectedScrRepMapSize,
-        dfs.getClient().getClientContext().getShortCircuitCache()
+        dfs.getClient().getClientContext().getShortCircuitCache(0)
            .getReplicaInfoMapSize());
   }

@@ -467,7 +467,7 @@ public void testShortCircuitReadFromServerWithoutShm() throws Exception {
         calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
     Assert.assertTrue(Arrays.equals(contents, expected));
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache();
+        fs.getClient().getClientContext().getShortCircuitCache(0);
     final DatanodeInfo datanode = new DatanodeInfoBuilder()
         .setNodeID(cluster.getDataNodes().get(0).getDatanodeId())
         .build();
@@ -516,7 +516,7 @@ public void testShortCircuitReadFromClientWithoutShm() throws Exception {
         calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
     Assert.assertTrue(Arrays.equals(contents, expected));
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache();
+        fs.getClient().getClientContext().getShortCircuitCache(0);
     Assert.assertEquals(null, cache.getDfsClientShmManager());
     cluster.shutdown();
     sockDir.close();
@@ -548,7 +548,7 @@ public void testShortCircuitCacheShutdown() throws Exception {
         calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
     Assert.assertTrue(Arrays.equals(contents, expected));
     final ShortCircuitCache cache =
-        fs.getClient().getClientContext().getShortCircuitCache();
+        fs.getClient().getClientContext().getShortCircuitCache(0);
     cache.close();
     Assert.assertTrue(cache.getDfsClientShmManager().
         getDomainSocketWatcher().isClosed());
