Skip to content
This repository was archived by the owner on Feb 9, 2021. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ private BlockReader getBlockReaderLocal() throws InvalidToken {
}
return null;
}
ShortCircuitCache cache = clientContext.getShortCircuitCache();
ShortCircuitCache cache = clientContext.getShortCircuitCache(block.getBlockId());
ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
InvalidToken exc = info.getInvalidTokenException();
Expand Down Expand Up @@ -444,7 +444,7 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
if (curPeer.fromCache) remainingCacheTries--;
DomainPeer peer = (DomainPeer)curPeer.peer;
Slot slot = null;
ShortCircuitCache cache = clientContext.getShortCircuitCache();
ShortCircuitCache cache = clientContext.getShortCircuitCache(block.getBlockId());
try {
MutableBoolean usedPeer = new MutableBoolean(false);
slot = cache.allocShmSlot(datanode, peer, usedPeer,
Expand Down Expand Up @@ -503,7 +503,7 @@ public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
*/
private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
Slot slot) throws IOException {
ShortCircuitCache cache = clientContext.getShortCircuitCache();
ShortCircuitCache cache = clientContext.getShortCircuitCache(slot.getBlockId().getBlockId());
final DataOutputStream out =
new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
SlotId slotId = slot == null ? null : slot.getSlotId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public class ClientContext {
/**
* Caches short-circuit file descriptors, mmap regions.
*/
private final ShortCircuitCache shortCircuitCache;
private final ShortCircuitCache[] shortCircuitCache;

/**
* Caches TCP and UNIX domain sockets for reuse.
Expand Down Expand Up @@ -89,18 +89,27 @@ public class ClientContext {
* didn't match its config values yet.
*/
private boolean printedConfWarning = false;

/**
* ShorCircuitCache array size.
*/
private final int clientShortCircuitNum;

private ClientContext(String name, Conf conf) {
this.name = name;
this.confString = confAsString(conf);
this.shortCircuitCache = new ShortCircuitCache(
this.clientShortCircuitNum = conf.clientShortCircuitNum;
this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
for (int i = 0; i < this.clientShortCircuitNum; i++) {
this.shortCircuitCache[i] = new ShortCircuitCache(
conf.shortCircuitStreamsCacheSize,
conf.shortCircuitStreamsCacheExpiryMs,
conf.shortCircuitMmapCacheSize,
conf.shortCircuitMmapCacheExpiryMs,
conf.shortCircuitMmapCacheRetryTimeout,
conf.shortCircuitCacheStaleThresholdMs,
conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
}
this.peerCache =
new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
Expand Down Expand Up @@ -181,10 +190,10 @@ public String getConfString() {
return confString;
}

public ShortCircuitCache getShortCircuitCache() {
return shortCircuitCache;
public ShortCircuitCache getShortCircuitCache(long idx) {
return shortCircuitCache[(int) (idx % clientShortCircuitNum)];
}

public PeerCache getPeerCache() {
return peerCache;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ public static class Conf {
final long shortCircuitMmapCacheExpiryMs;
final long shortCircuitMmapCacheRetryTimeout;
final long shortCircuitCacheStaleThresholdMs;
final int clientShortCircuitNum;

public Conf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout
Expand Down Expand Up @@ -463,6 +464,11 @@ public Conf(Configuration conf) {
dfsclientSlowIoWarningThresholdMs = conf.getLong(
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_KEY,
DFSConfigKeys.DFS_CLIENT_SLOW_IO_WARNING_THRESHOLD_DEFAULT);

clientShortCircuitNum = conf.getInt(DFS_CLIENT_SHORT_CIRCUIT_NUM,
DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT);
clientShortCircuitNum = clientShortCircuitNum > 3 ? 3 : this.clientShortCircuitNum;
clientShortCircuitNum = clientShortCircuitNum < 1 ? 1 : this.clientShortCircuitNum;
}

public boolean isUseLegacyBlockReaderLocal() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -464,6 +464,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final long DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT = 5 * 60 * 1000;
public static final String DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS = "dfs.client.short.circuit.replica.stale.threshold.ms";
public static final long DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT = 30 * 60 * 1000;
public static final String DFS_CLIENT_SHORT_CIRCUIT_NUM = "dfs.client.short.circuit.num";
public static final int DFS_CLIENT_SHORT_CIRCUIT_NUM_DEFAULT = 1;

// property for fsimage compression
public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress";
Expand Down