1 change: 1 addition & 0 deletions hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
@@ -190,6 +190,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<source>8</source>
<excludePackageNames>org.apache.hadoop.hdfs.protocol.proto</excludePackageNames>
</configuration>
</plugin>
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;

import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.classification.VisibleForTesting;
@@ -68,7 +69,8 @@ public void sleepBeforeHedgedGet() {}

public void delayWhenRenewLeaseTimeout() {}

public void onCreateBlockReader(LocatedBlock block, int chunkIndex, long offset, long length) {}
public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
long offset, long length) throws IOException {}

public void failCreateBlockReader() throws InvalidBlockTokenException {}
}
@@ -42,6 +42,7 @@

import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
import org.apache.hadoop.util.Time;

import java.io.EOFException;
import java.io.IOException;
@@ -54,7 +55,9 @@
import java.util.Set;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;

import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;

Expand All @@ -76,6 +79,8 @@ public class DFSStripedInputStream extends DFSInputStream {
protected ByteBuffer parityBuf;
private final ErasureCodingPolicy ecPolicy;
private RawErasureDecoder decoder;
private final ConcurrentHashMap<DatanodeInfo, RetryTimeoutPair> sleepingNodes =
new ConcurrentHashMap<>();

/**
* Indicate the start/end offset of the current buffered stripe in the
@@ -232,6 +237,101 @@ private long getOffsetInBlockGroup(long pos) {
return pos - currentLocatedBlock.getStartOffset();
}

private static class RetryTimeoutPair {
private int attempts = 0;
private long sleepTimestamp = 0;
}

/**
* Gets the set of nodes that are still waiting before their next retry.
* @return the nodes that are currently sleeping
*/
protected Collection<DatanodeInfo> checkSleepingNodes() {
return sleepingNodes.entrySet().stream().filter(entry ->
entry.getValue().sleepTimestamp > Time.monotonicNow())
.map(Map.Entry::getKey)
.collect(Collectors.toSet());
}

private long getSleepingTimestamp(int attempts) {
// Introduce a random factor into the wait time before another retry.
// The wait time depends on the number of failures and a random factor.
// The first time a BlockMissingException is caught, the wait time is a
// random number between 0..3000 ms. If the first retry still fails, we
// wait a 3000 ms grace period before the 2nd retry, and the waiting
// window is expanded to 6000 ms to reduce the request rate on the server.
// Similarly, the 3rd retry waits a 6000 ms grace period and the waiting
// window is expanded to 9000 ms.
// grace period for the last round of attempts
double waitTime = dfsClient.getConf().getTimeWindow() * attempts +
// expanding time window for each failure
dfsClient.getConf().getTimeWindow() * attempts *
ThreadLocalRandom.current().nextDouble();
return Time.monotonicNow() + (long)waitTime;
}
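For reference, the backoff computed here grows linearly with the attempt count, with a random expansion on top. A worked example in plain Java (a sketch, not part of the patch; it assumes getTimeWindow() returns its usual 3000 ms default, typically dfs.client.retry.window.base, and the class name is made up):

import java.util.concurrent.ThreadLocalRandom;

// Illustration only: how the backoff window grows with the attempt count.
public class BackoffWindowDemo {
  public static void main(String[] args) {
    final long timeWindow = 3000; // assumed value of getTimeWindow()
    for (int attempts = 1; attempts <= 3; attempts++) {
      // Fixed grace period plus a random expansion, mirroring getSleepingTimestamp().
      double waitTime = timeWindow * attempts
          + timeWindow * attempts * ThreadLocalRandom.current().nextDouble();
      System.out.printf("attempt %d: wait in [%d, %d) ms, sampled %.0f ms%n",
          attempts, timeWindow * attempts, 2 * timeWindow * attempts, waitTime);
    }
  }
}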

/**
* Checks whether the given {@code block} is on a sleeping node.
* @param block the block to check
* @return true if the {@code block} is on a sleeping node, otherwise false.
*/
protected boolean isBlockOnSleepingNode(LocatedBlock block) {
DNAddrPair dnInfo = getBestNodeDNAddrPair(block, null);
return dnInfo != null && checkSleepingNodes().contains(dnInfo.info);
}

/**
* Sleeps the thread until the next retry can be attempted.
*/
protected void sleepUntilRetry() {
long sleepTimestamp = 0;
int countSleepingNodes = 0;
for (RetryTimeoutPair rt : sleepingNodes.values()) {
if (rt.sleepTimestamp > Time.monotonicNow()) {
countSleepingNodes++;
if (sleepTimestamp == 0) {
sleepTimestamp = rt.sleepTimestamp;
} else {
sleepTimestamp = Math.min(sleepTimestamp, rt.sleepTimestamp);
}
}
}
long duration = sleepTimestamp - Time.monotonicNow();
try {
DFSClient.LOG.warn("Failed to acquire {}/{} blocks, will wait " +
"for {} msec and try again", countSleepingNodes,
dataBlkNum + parityBlkNum, duration);
Thread.sleep(duration);
} catch (InterruptedException e) {
DFSClient.LOG.warn("Error read retry backoff interrupted");
}
}

/**
* Update the {@code dnInfo} node with a new sleepTimestamp if it has any
* retries left, otherwise put it into the deadNodes list.
* @param dnInfo the node whose sleeping information should be updated
* @return true if the node {@code dnInfo} will be retried, otherwise false.
*/
protected boolean updateSleepingNodes(DatanodeInfo dnInfo) {
// Ignore nodes that are already in the deadNodes list.
if (!getLocalDeadNodes().containsKey(dnInfo)) {
RetryTimeoutPair rt = sleepingNodes.computeIfAbsent(dnInfo,
k -> new RetryTimeoutPair());
rt.attempts++;
if (rt.attempts > dfsClient.getConf().getMaxBlockAcquireFailures()) {
// If we are out of retries, then put the node into the deadNodes list
sleepingNodes.remove(dnInfo);
addToLocalDeadNodes(dnInfo);
} else {
rt.sleepTimestamp = getSleepingTimestamp(rt.attempts);
return true;
}
}
return false;
}
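Taken together, the new helpers give each datanode a small retry budget: a failed block-reader setup marks the node as sleeping with an expanding backoff window (updateSleepingNodes), node selection skips nodes that are still inside their window (checkSleepingNodes is passed to getBestNodeDNAddrPair further down), and a node that exhausts getMaxBlockAcquireFailures() attempts is moved to the local dead-node list. A simplified, self-contained model of that bookkeeping, with plain strings standing in for DatanodeInfo and hard-coded constants standing in for the client configuration:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

// Simplified sketch of the sleeping-node bookkeeping; not Hadoop code.
public class SleepingNodesModel {
  static final int MAX_FAILURES = 3;       // stands in for getMaxBlockAcquireFailures()
  static final long TIME_WINDOW_MS = 3000; // stands in for getTimeWindow()

  static class Retry { int attempts; long wakeUpAt; }

  final Map<String, Retry> sleeping = new HashMap<>();
  final Set<String> dead = new HashSet<>();

  // Analogue of updateSleepingNodes(): returns true if the node will be retried.
  boolean recordFailure(String node, long now) {
    if (dead.contains(node)) {
      return false;
    }
    Retry r = sleeping.computeIfAbsent(node, k -> new Retry());
    r.attempts++;
    if (r.attempts > MAX_FAILURES) {
      sleeping.remove(node);
      dead.add(node); // out of retries: treat the node as dead
      return false;
    }
    double wait = TIME_WINDOW_MS * r.attempts
        + TIME_WINDOW_MS * r.attempts * ThreadLocalRandom.current().nextDouble();
    r.wakeUpAt = now + (long) wait;
    return true;
  }

  // Analogue of checkSleepingNodes(): nodes still inside their backoff window.
  Set<String> stillSleeping(long now) {
    Set<String> result = new HashSet<>();
    for (Map.Entry<String, Retry> e : sleeping.entrySet()) {
      if (e.getValue().wakeUpAt > now) {
        result.add(e.getKey());
      }
    }
    return result;
  }
}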

boolean createBlockReader(LocatedBlock block, long offsetInBlock,
LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
int chunkIndex, long readTo) throws IOException {
@@ -248,18 +348,18 @@ boolean createBlockReader(LocatedBlock block, long offsetInBlock,
targetBlocks[chunkIndex] = block;

// internal block has one location, just rule out the deadNodes
dnInfo = getBestNodeDNAddrPair(block, null);
dnInfo = getBestNodeDNAddrPair(block, checkSleepingNodes());
if (dnInfo == null) {
break;
}
if (readTo < 0 || readTo > block.getBlockSize()) {
readTo = block.getBlockSize();
}
DFSClientFaultInjector.get().onCreateBlockReader(block, chunkIndex, offsetInBlock,
readTo - offsetInBlock);
reader = getBlockReader(block, offsetInBlock,
readTo - offsetInBlock,
dnInfo.addr, dnInfo.storageType, dnInfo.info);
DFSClientFaultInjector.get().onCreateBlockReader(block, chunkIndex, offsetInBlock,
readTo - offsetInBlock);
} catch (IOException e) {
if (e instanceof InvalidEncryptionKeyException &&
retry.shouldRefetchEncryptionKey()) {
@@ -273,12 +373,11 @@ boolean createBlockReader(LocatedBlock block, long offsetInBlock,
fetchBlockAt(block.getStartOffset());
retry.refetchToken();
} else {
//TODO: handles connection issues
DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
"block" + block.getBlock(), e);
// re-fetch the block in case the block has been moved
fetchBlockAt(block.getStartOffset());
addToLocalDeadNodes(dnInfo.info);
updateSleepingNodes(dnInfo.info);
}
}
if (reader != null) {
@@ -56,14 +56,14 @@ void prepareDecodeInputs() {
@Override
boolean prepareParityChunk(int index) {
Preconditions.checkState(index >= dataBlkNum &&
alignedStripe.chunks[index] == null);
alignedStripe.isNull(index));

int bufLen = (int) alignedStripe.getSpanInBlock();
decodeInputs[index] = new ECChunk(codingBuffer.duplicate(), index * bufLen,
bufLen);

alignedStripe.chunks[index] =
new StripingChunk(decodeInputs[index].getBuffer());
alignedStripe.setChunk(index,
new StripingChunk(decodeInputs[index].getBuffer()));

return true;
}
@@ -86,9 +86,9 @@ void initDecodeInputs(AlignedStripe alignedStripe) {
}

for (int i = 0; i < dataBlkNum; i++) {
if (alignedStripe.chunks[i] == null) {
alignedStripe.chunks[i] =
new StripingChunk(decodeInputs[i].getBuffer());
if (alignedStripe.isNull(i)) {
alignedStripe.setChunk(i,
new StripingChunk(decodeInputs[i].getBuffer()));
}
}
}
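The stripe readers in this patch stop indexing alignedStripe.chunks directly and go through accessors instead. The accessors themselves are not in the hunks shown here; assuming they are thin wrappers over the existing chunks array on StripedBlockUtil.AlignedStripe, they would look roughly like this:

// Assumed shape of the AlignedStripe accessors used above (not shown in this diff).
public boolean isNull(int index) {
  return chunks[index] == null;
}

public void setChunk(int index, StripingChunk chunk) {
  chunks[index] = chunk;
}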
@@ -19,6 +19,8 @@

import org.apache.hadoop.hdfs.protocol.BlockType;

import java.util.concurrent.TimeUnit;

/**
* A utility class that maintains statistics for reading.
*/
@@ -29,7 +31,7 @@ public class ReadStatistics {
private long totalZeroCopyBytesRead;

private BlockType blockType = BlockType.CONTIGUOUS;
private long totalEcDecodingTimeMillis;
private long totalEcDecodingTimeNanos;

public ReadStatistics() {
clear();
@@ -92,7 +94,14 @@ public synchronized BlockType getBlockType() {
* Return the total time in milliseconds used for erasure coding decoding.
*/
public synchronized long getTotalEcDecodingTimeMillis() {
return totalEcDecodingTimeMillis;
return TimeUnit.NANOSECONDS.toMillis(totalEcDecodingTimeNanos);
}

/**
* Return the total time in nanoseconds used for erasure coding decoding.
*/
public synchronized long getTotalEcDecodingTimeNanos() {
return totalEcDecodingTimeNanos;
}

public synchronized void addRemoteBytes(long amt) {
@@ -117,8 +126,8 @@ public synchronized void addZeroCopyBytes(long amt) {
this.totalZeroCopyBytesRead += amt;
}

public synchronized void addErasureCodingDecodingTime(long millis) {
this.totalEcDecodingTimeMillis += millis;
public synchronized void addErasureCodingDecodingTime(long nanos) {
this.totalEcDecodingTimeNanos += nanos;
}

synchronized void setBlockType(BlockType blockType) {
@@ -130,6 +139,6 @@ public synchronized void clear() {
this.totalLocalBytesRead = 0;
this.totalShortCircuitBytesRead = 0;
this.totalZeroCopyBytesRead = 0;
this.totalEcDecodingTimeMillis = 0;
this.totalEcDecodingTimeNanos = 0;
}
}
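With the counter now kept in nanoseconds and only converted to milliseconds in the getter, callers should time decoding with a nanosecond clock. A minimal usage sketch (an illustrative fragment, not taken from the patch):

ReadStatistics stats = new ReadStatistics();
long start = System.nanoTime();
// ... perform the erasure-coding decode step here ...
stats.addErasureCodingDecodingTime(System.nanoTime() - start);
long ns = stats.getTotalEcDecodingTimeNanos();  // full-resolution value
long ms = stats.getTotalEcDecodingTimeMillis(); // converted via TimeUnit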
@@ -63,25 +63,25 @@ void prepareDecodeInputs() {
cur.position(pos);
cur.limit(pos + bufLen);
decodeInputs[i] = new ECChunk(cur.slice(), 0, bufLen);
if (alignedStripe.chunks[i] == null) {
alignedStripe.chunks[i] =
new StripingChunk(decodeInputs[i].getBuffer());
if (alignedStripe.isNull(i)) {
alignedStripe.setChunk(i,
new StripingChunk(decodeInputs[i].getBuffer()));
}
}
}

@Override
boolean prepareParityChunk(int index) {
Preconditions.checkState(index >= dataBlkNum
&& alignedStripe.chunks[index] == null);
&& alignedStripe.isNull(index));
final int parityIndex = index - dataBlkNum;
ByteBuffer buf = dfsStripedInputStream.getParityBuffer().duplicate();
buf.position(cellSize * parityIndex);
buf.limit(cellSize * parityIndex + (int) alignedStripe.range.spanInBlock);
decodeInputs[index] =
new ECChunk(buf.slice(), 0, (int) alignedStripe.range.spanInBlock);
alignedStripe.chunks[index] =
new StripingChunk(decodeInputs[index].getBuffer());
alignedStripe.setChunk(index,
new StripingChunk(decodeInputs[index].getBuffer()));
return true;
}
