diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
index 0e65e60d33815..f2ae537f33d8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/pom.xml
@@ -190,6 +190,7 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
org.apache.maven.plugins
maven-javadoc-plugin
+ 8
org.apache.hadoop.hdfs.protocol.proto
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index 8150b6fb0e20e..8147807535894 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs;
+import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.classification.VisibleForTesting;
@@ -68,7 +69,8 @@ public void sleepBeforeHedgedGet() {}
public void delayWhenRenewLeaseTimeout() {}
- public void onCreateBlockReader(LocatedBlock block, int chunkIndex, long offset, long length) {}
+ public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
+ long offset, long length) throws IOException {}
public void failCreateBlockReader() throws InvalidBlockTokenException {}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
index d6131f8ddeb54..81542792a13bd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java
@@ -42,6 +42,7 @@
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
+import org.apache.hadoop.util.Time;
import java.io.EOFException;
import java.io.IOException;
@@ -54,7 +55,9 @@
import java.util.Set;
import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
+import java.util.stream.Collectors;
import static org.apache.hadoop.hdfs.util.IOUtilsClient.updateReadStatistics;
@@ -76,6 +79,8 @@ public class DFSStripedInputStream extends DFSInputStream {
protected ByteBuffer parityBuf;
private final ErasureCodingPolicy ecPolicy;
private RawErasureDecoder decoder;
+ private final ConcurrentHashMap<DatanodeInfo, RetryTimeoutPair> sleepingNodes =
+ new ConcurrentHashMap<>();
/**
* Indicate the start/end offset of the current buffered stripe in the
@@ -232,6 +237,101 @@ private long getOffsetInBlockGroup(long pos) {
return pos - currentLocatedBlock.getStartOffset();
}
+ private static class RetryTimeoutPair {
+ private int attempts = 0;
+ private long sleepTimestamp = 0;
+ }
+
+ /**
+ * Gets the nodes that are currently sleeping before their next retry.
+ * @return The set of sleeping nodes
+ */
+ protected Collection<DatanodeInfo> checkSleepingNodes() {
+ return sleepingNodes.entrySet().stream().filter(entry ->
+ entry.getValue().sleepTimestamp > Time.monotonicNow())
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toSet());
+ }
+
+ private long getSleepingTimestamp(int attempts) {
+ // Introducing a random factor to the wait time before another retry.
+ // The wait time is dependent on # of failures and a random factor.
+ // At the first time of getting a BlockMissingException, the wait time
+ // is a random number between 0..3000 ms. If the first retry
+ // still fails, we will wait 3000 ms grace period before the 2nd retry.
+ // Also at the second retry, the waiting window is expanded to 6000 ms
+ // alleviating the request rate from the server. Similarly the 3rd retry
+ // will wait 6000ms grace period before retry and the waiting window is
+ // expanded to 9000ms.
+ // grace period for the last round of attempt
+ double waitTime = dfsClient.getConf().getTimeWindow() * attempts +
+ // expanding time window for each failure
+ dfsClient.getConf().getTimeWindow() * attempts *
+ ThreadLocalRandom.current().nextDouble();
+ return Time.monotonicNow() + (long)waitTime;
+ }
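For readers of this patch, a minimal standalone sketch (not part of the change; class name hypothetical) of the sleep window produced by the formula in getSleepingTimestamp above, assuming the default 3000 ms base window from dfs.client.retry.window.base:

```java
public class BackoffWindowSketch {
  public static void main(String[] args) {
    long timeWindow = 3000; // assumed default of dfs.client.retry.window.base
    for (int attempts = 1; attempts <= 3; attempts++) {
      long min = timeWindow * attempts;      // fixed grace period
      long max = 2L * timeWindow * attempts; // grace period plus full random window
      System.out.println("attempt " + attempts + ": sleep between "
          + min + " and " + max + " ms from now");
    }
  }
}
```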
+
+ /**
+ * Checks whether the {@code block} is on a sleeping node.
+ * @param block the block to check
+ * @return True if the {@code block} is on a sleeping node, otherwise false.
+ */
+ protected boolean isBlockOnSleepingNode(LocatedBlock block) {
+ DNAddrPair dnInfo = getBestNodeDNAddrPair(block, null);
+ return dnInfo != null && checkSleepingNodes().contains(dnInfo.info);
+ }
+
+ /**
+ * Sleeps the thread until the next retry can be attempted.
+ */
+ protected void sleepUntilRetry() {
+ long sleepTimestamp = 0;
+ int countSleepingNodes = 0;
+ for (RetryTimeoutPair rt : sleepingNodes.values()) {
+ if (rt.sleepTimestamp > Time.monotonicNow()) {
+ countSleepingNodes++;
+ if (sleepTimestamp == 0) {
+ sleepTimestamp = rt.sleepTimestamp;
+ } else {
+ sleepTimestamp = Math.min(sleepTimestamp, rt.sleepTimestamp);
+ }
+ }
+ }
+ long duration = sleepTimestamp - Time.monotonicNow();
+ try {
+ DFSClient.LOG.warn("Failed to acquire {}/{} blocks, will wait " +
+ "for {} msec and try again", countSleepingNodes,
+ dataBlkNum + parityBlkNum, duration);
+ Thread.sleep(duration);
+ } catch (InterruptedException e) {
+ DFSClient.LOG.warn("Error read retry backoff interrupted");
+ }
+ }
+
+ /**
+ * Update the {@code dnInfo} node with a new sleepTimestamp if it has any
+ * retries left, otherwise put it into the deadNodes list.
+ * @param dnInfo The node to update the sleeping information for.
+ * @return True if the node {@code dnInfo} will be retried, otherwise false.
+ */
+ protected boolean updateSleepingNodes(DatanodeInfo dnInfo) {
+ // Ignore nodes that are already in the deadNodes list.
+ if (!getLocalDeadNodes().containsKey(dnInfo)) {
+ RetryTimeoutPair rt = sleepingNodes.computeIfAbsent(dnInfo,
+ k -> new RetryTimeoutPair());
+ rt.attempts++;
+ if (rt.attempts > dfsClient.getConf().getMaxBlockAcquireFailures()) {
+ // If we are out of retries, then put the node into the deadNodes list
+ sleepingNodes.remove(dnInfo);
+ addToLocalDeadNodes(dnInfo);
+ } else {
+ rt.sleepTimestamp = getSleepingTimestamp(rt.attempts);
+ return true;
+ }
+ }
+ return false;
+ }
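A simplified, self-contained sketch of the retry-budget bookkeeping in updateSleepingNodes (hypothetical names; plain strings stand in for DatanodeInfo and a constant for getMaxBlockAcquireFailures()):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class RetryBudgetSketch {
  public static void main(String[] args) {
    ConcurrentHashMap<String, Integer> attempts = new ConcurrentHashMap<>();
    Set<String> deadNodes = new HashSet<>();
    int maxBlockAcquireFailures = 3; // stand-in for the client config value
    String node = "dn-1";
    for (int failure = 0; failure < 5; failure++) {
      if (deadNodes.contains(node)) {
        continue; // mirrors the deadNodes guard: never retry a dead node
      }
      int tries = attempts.merge(node, 1, Integer::sum);
      if (tries > maxBlockAcquireFailures) {
        attempts.remove(node);
        deadNodes.add(node); // budget exhausted: give up on this node
      }
      // otherwise a new sleep timestamp would be computed and the node retried later
    }
    System.out.println("dead after repeated failures: " + deadNodes.contains(node));
  }
}
```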
+
boolean createBlockReader(LocatedBlock block, long offsetInBlock,
LocatedBlock[] targetBlocks, BlockReaderInfo[] readerInfos,
int chunkIndex, long readTo) throws IOException {
@@ -248,18 +348,18 @@ boolean createBlockReader(LocatedBlock block, long offsetInBlock,
targetBlocks[chunkIndex] = block;
// internal block has one location, just rule out the deadNodes
- dnInfo = getBestNodeDNAddrPair(block, null);
+ dnInfo = getBestNodeDNAddrPair(block, checkSleepingNodes());
if (dnInfo == null) {
break;
}
if (readTo < 0 || readTo > block.getBlockSize()) {
readTo = block.getBlockSize();
}
+ DFSClientFaultInjector.get().onCreateBlockReader(block, chunkIndex, offsetInBlock,
+ readTo - offsetInBlock);
reader = getBlockReader(block, offsetInBlock,
readTo - offsetInBlock,
dnInfo.addr, dnInfo.storageType, dnInfo.info);
- DFSClientFaultInjector.get().onCreateBlockReader(block, chunkIndex, offsetInBlock,
- readTo - offsetInBlock);
} catch (IOException e) {
if (e instanceof InvalidEncryptionKeyException &&
retry.shouldRefetchEncryptionKey()) {
@@ -273,12 +373,11 @@ boolean createBlockReader(LocatedBlock block, long offsetInBlock,
fetchBlockAt(block.getStartOffset());
retry.refetchToken();
} else {
- //TODO: handles connection issues
DFSClient.LOG.warn("Failed to connect to " + dnInfo.addr + " for " +
"block" + block.getBlock(), e);
// re-fetch the block in case the block has been moved
fetchBlockAt(block.getStartOffset());
- addToLocalDeadNodes(dnInfo.info);
+ updateSleepingNodes(dnInfo.info);
}
}
if (reader != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PositionStripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PositionStripeReader.java
index 12328ebb0f01b..0995107e6beba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PositionStripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/PositionStripeReader.java
@@ -56,14 +56,14 @@ void prepareDecodeInputs() {
@Override
boolean prepareParityChunk(int index) {
Preconditions.checkState(index >= dataBlkNum &&
- alignedStripe.chunks[index] == null);
+ alignedStripe.isNull(index));
int bufLen = (int) alignedStripe.getSpanInBlock();
decodeInputs[index] = new ECChunk(codingBuffer.duplicate(), index * bufLen,
bufLen);
- alignedStripe.chunks[index] =
- new StripingChunk(decodeInputs[index].getBuffer());
+ alignedStripe.setChunk(index,
+ new StripingChunk(decodeInputs[index].getBuffer()));
return true;
}
@@ -86,9 +86,9 @@ void initDecodeInputs(AlignedStripe alignedStripe) {
}
for (int i = 0; i < dataBlkNum; i++) {
- if (alignedStripe.chunks[i] == null) {
- alignedStripe.chunks[i] =
- new StripingChunk(decodeInputs[i].getBuffer());
+ if (alignedStripe.isNull(i)) {
+ alignedStripe.setChunk(i,
+ new StripingChunk(decodeInputs[i].getBuffer()));
}
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java
index af53f0abf726a..cb080c4e22054 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ReadStatistics.java
@@ -19,6 +19,8 @@
import org.apache.hadoop.hdfs.protocol.BlockType;
+import java.util.concurrent.TimeUnit;
+
/**
* A utility class that maintains statistics for reading.
*/
@@ -29,7 +31,7 @@ public class ReadStatistics {
private long totalZeroCopyBytesRead;
private BlockType blockType = BlockType.CONTIGUOUS;
- private long totalEcDecodingTimeMillis;
+ private long totalEcDecodingTimeNanos;
public ReadStatistics() {
clear();
@@ -92,7 +94,14 @@ public synchronized BlockType getBlockType() {
* Return the total time in milliseconds used for erasure coding decoding.
*/
public synchronized long getTotalEcDecodingTimeMillis() {
- return totalEcDecodingTimeMillis;
+ return TimeUnit.NANOSECONDS.toMillis(totalEcDecodingTimeNanos);
+ }
+
+ /**
+ * Return the total time in nanoseconds used for erasure coding decoding.
+ */
+ public synchronized long getTotalEcDecodingTimeNanos() {
+ return totalEcDecodingTimeNanos;
}
public synchronized void addRemoteBytes(long amt) {
@@ -117,8 +126,8 @@ public synchronized void addZeroCopyBytes(long amt) {
this.totalZeroCopyBytesRead += amt;
}
- public synchronized void addErasureCodingDecodingTime(long millis) {
- this.totalEcDecodingTimeMillis += millis;
+ public synchronized void addErasureCodingDecodingTime(long nanos) {
+ this.totalEcDecodingTimeNanos += nanos;
}
synchronized void setBlockType(BlockType blockType) {
@@ -130,6 +139,6 @@ public synchronized void clear() {
this.totalLocalBytesRead = 0;
this.totalShortCircuitBytesRead = 0;
this.totalZeroCopyBytesRead = 0;
- this.totalEcDecodingTimeMillis = 0;
+ this.totalEcDecodingTimeNanos = 0;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StatefulStripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StatefulStripeReader.java
index 730307b4434cb..955c67c5de537 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StatefulStripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StatefulStripeReader.java
@@ -63,9 +63,9 @@ void prepareDecodeInputs() {
cur.position(pos);
cur.limit(pos + bufLen);
decodeInputs[i] = new ECChunk(cur.slice(), 0, bufLen);
- if (alignedStripe.chunks[i] == null) {
- alignedStripe.chunks[i] =
- new StripingChunk(decodeInputs[i].getBuffer());
+ if (alignedStripe.isNull(i)) {
+ alignedStripe.setChunk(i,
+ new StripingChunk(decodeInputs[i].getBuffer()));
}
}
}
@@ -73,15 +73,15 @@ void prepareDecodeInputs() {
@Override
boolean prepareParityChunk(int index) {
Preconditions.checkState(index >= dataBlkNum
- && alignedStripe.chunks[index] == null);
+ && alignedStripe.isNull(index));
final int parityIndex = index - dataBlkNum;
ByteBuffer buf = dfsStripedInputStream.getParityBuffer().duplicate();
buf.position(cellSize * parityIndex);
buf.limit(cellSize * parityIndex + (int) alignedStripe.range.spanInBlock);
decodeInputs[index] =
new ECChunk(buf.slice(), 0, (int) alignedStripe.range.spanInBlock);
- alignedStripe.chunks[index] =
- new StripingChunk(decodeInputs[index].getBuffer());
+ alignedStripe.setChunk(index,
+ new StripingChunk(decodeInputs[index].getBuffer()));
return true;
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
index bc39bace79588..ef4d972036e0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java
@@ -41,9 +41,12 @@
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
+import static org.apache.hadoop.hdfs.util.StripedBlockUtil.ChunkByteBuffer;
+
/**
* The reader for reading a complete {@link StripedBlockUtil.AlignedStripe}.
* Note that an {@link StripedBlockUtil.AlignedStripe} may cross multiple
@@ -113,6 +116,7 @@ void skip() {
protected final LocatedBlock[] targetBlocks;
protected final CorruptedBlocks corruptedBlocks;
protected final BlockReaderInfo[] readerInfos;
+ private boolean decodeInputsPrepared = false;
protected final ErasureCodingPolicy ecPolicy;
protected final short dataBlkNum;
protected final short parityBlkNum;
@@ -173,57 +177,27 @@ void updateState4SuccessRead(StripingChunkReadResult result) {
}
private void checkMissingBlocks() throws IOException {
- if (alignedStripe.missingChunksNum > parityBlkNum) {
+ if (alignedStripe.getMissingChunksNum() > parityBlkNum) {
clearFutures();
- throw new IOException(alignedStripe.missingChunksNum
+ throw new IOException(alignedStripe.getMissingChunksNum()
+ " missing blocks, the stripe is: " + alignedStripe
+ "; locatedBlocks is: " + dfsStripedInputStream.getLocatedBlocks());
}
}
- /**
- * We need decoding. Thus go through all the data chunks and make sure we
- * submit read requests for all of them.
- */
- private void readDataForDecoding() throws IOException {
- prepareDecodeInputs();
- for (int i = 0; i < dataBlkNum; i++) {
- Preconditions.checkNotNull(alignedStripe.chunks[i]);
- if (alignedStripe.chunks[i].state == StripingChunk.REQUESTED) {
- if (!readChunk(targetBlocks[i], i)) {
- alignedStripe.missingChunksNum++;
- }
- }
- }
- checkMissingBlocks();
- }
-
- void readParityChunks(int num) throws IOException {
- for (int i = dataBlkNum, j = 0; i < dataBlkNum + parityBlkNum && j < num;
- i++) {
- if (alignedStripe.chunks[i] == null) {
- if (prepareParityChunk(i) && readChunk(targetBlocks[i], i)) {
- j++;
- } else {
- alignedStripe.missingChunksNum++;
- }
- }
- }
- checkMissingBlocks();
- }
-
- private ByteBufferStrategy[] getReadStrategies(StripingChunk chunk) {
- if (chunk.useByteBuffer()) {
+ private ByteBufferStrategy[] getReadStrategies(int chunkIndex) {
+ if (alignedStripe.useByteBuffer(chunkIndex)) {
ByteBufferStrategy strategy = new ByteBufferStrategy(
- chunk.getByteBuffer(), dfsStripedInputStream.getReadStatistics(),
+ alignedStripe.getByteBuffer(chunkIndex),
+ dfsStripedInputStream.getReadStatistics(),
dfsStripedInputStream.getDFSClient());
return new ByteBufferStrategy[]{strategy};
}
-
+ ChunkByteBuffer buf = alignedStripe.getChunkBuffer(chunkIndex);
ByteBufferStrategy[] strategies =
- new ByteBufferStrategy[chunk.getChunkBuffer().getSlices().size()];
+ new ByteBufferStrategy[buf.getSlices().size()];
for (int i = 0; i < strategies.length; i++) {
- ByteBuffer buffer = chunk.getChunkBuffer().getSlice(i);
+ ByteBuffer buffer = buf.getSlice(i);
strategies[i] = new ByteBufferStrategy(buffer,
dfsStripedInputStream.getReadStatistics(),
dfsStripedInputStream.getDFSClient());
@@ -298,113 +272,176 @@ private Callable<BlockReadStats> readCells(final BlockReader reader,
};
}
- boolean readChunk(final LocatedBlock block, int chunkIndex)
+ void readChunk(final LocatedBlock block, int chunkIndex)
throws IOException {
- final StripingChunk chunk = alignedStripe.chunks[chunkIndex];
+ Preconditions.checkState(alignedStripe.isRequested(chunkIndex));
if (block == null) {
- chunk.state = StripingChunk.MISSING;
- return false;
+ alignedStripe.setMissing(chunkIndex);
+ return;
}
if (readerInfos[chunkIndex] == null) {
if (!dfsStripedInputStream.createBlockReader(block,
- alignedStripe.getOffsetInBlock(), targetBlocks,
- readerInfos, chunkIndex, readTo)) {
- chunk.state = StripingChunk.MISSING;
- return false;
+ alignedStripe.getOffsetInBlock(), targetBlocks,
+ readerInfos, chunkIndex, readTo)) {
+ if (dfsStripedInputStream.isBlockOnSleepingNode(block)) {
+ alignedStripe.setSleeping(chunkIndex);
+ } else {
+ alignedStripe.setMissing(chunkIndex);
+ }
+ return;
}
} else if (readerInfos[chunkIndex].shouldSkip) {
- chunk.state = StripingChunk.MISSING;
- return false;
+ alignedStripe.setMissing(chunkIndex);
+ return;
}
- chunk.state = StripingChunk.PENDING;
+ alignedStripe.setPending(chunkIndex);
Callable<BlockReadStats> readCallable =
readCells(readerInfos[chunkIndex].reader,
readerInfos[chunkIndex].datanode,
readerInfos[chunkIndex].blockReaderOffset,
- alignedStripe.getOffsetInBlock(), getReadStrategies(chunk),
+ alignedStripe.getOffsetInBlock(), getReadStrategies(chunkIndex),
block.getBlock());
Future<BlockReadStats> request = service.submit(readCallable);
futures.put(request, chunkIndex);
- return true;
+ }
+
+ private synchronized void prepareDecodeInputsInternal() {
+ if (!decodeInputsPrepared) {
+ prepareDecodeInputs();
+ decodeInputsPrepared = true;
+ }
}
/**
- * read the whole stripe. do decoding if necessary
+ * Decide which chunks to transition from READY to REQUESTED and submit reads for them.
*/
- void readStripe() throws IOException {
- try {
- for (int i = 0; i < dataBlkNum; i++) {
- if (alignedStripe.chunks[i] != null &&
- alignedStripe.chunks[i].state != StripingChunk.ALLZERO) {
- if (!readChunk(targetBlocks[i], i)) {
- alignedStripe.missingChunksNum++;
- }
+ private void requestChunks() throws IOException {
+ if (alignedStripe.getReadyChunksNum() > 0 &&
+ alignedStripe.getFetchedChunksNum() < dataBlkNum &&
+ alignedStripe.getMissingChunksNum() <= parityBlkNum) {
+ int numToRequest = dataBlkNum - alignedStripe.getFetchedChunksNum();
+ numToRequest -= alignedStripe.getPendingChunksNum();
+ for (int i = 0, requested = 0; i < dataBlkNum + parityBlkNum &&
+ requested < numToRequest; i++) {
+ if (alignedStripe.isReady(i)) {
+ alignedStripe.setRequested(i);
+ readChunk(targetBlocks[i], i);
+ requested++;
}
}
- // There are missing block locations at this stage. Thus we need to read
- // the full stripe and one more parity block.
- if (alignedStripe.missingChunksNum > 0) {
- checkMissingBlocks();
- readDataForDecoding();
- // read parity chunks
- readParityChunks(alignedStripe.missingChunksNum);
- }
- } catch (IOException e) {
- dfsStripedInputStream.close();
- throw e;
}
- // TODO: for a full stripe we can start reading (dataBlkNum + 1) chunks
-
- // Input buffers for potential decode operation, which remains null until
- // first read failure
- while (!futures.isEmpty()) {
- try {
- long beginReadMS = Time.monotonicNow();
- StripingChunkReadResult r = StripedBlockUtil
- .getNextCompletedStripedRead(service, futures, 0);
- long readTimeMS = Time.monotonicNow() - beginReadMS;
-
- dfsStripedInputStream.updateReadStats(r.getReadStats(), readTimeMS);
- DFSClient.LOG.debug("Read task returned: {}, for stripe {}",
- r, alignedStripe);
- StripingChunk returnedChunk = alignedStripe.chunks[r.index];
- Preconditions.checkNotNull(returnedChunk);
- Preconditions.checkState(returnedChunk.state == StripingChunk.PENDING);
-
- if (r.state == StripingChunkReadResult.SUCCESSFUL) {
- returnedChunk.state = StripingChunk.FETCHED;
- alignedStripe.fetchedChunksNum++;
- updateState4SuccessRead(r);
- if (alignedStripe.fetchedChunksNum == dataBlkNum) {
- clearFutures();
- break;
- }
- } else {
- returnedChunk.state = StripingChunk.MISSING;
- // close the corresponding reader
- dfsStripedInputStream.closeReader(readerInfos[r.index]);
+ }
- final int missing = alignedStripe.missingChunksNum;
- alignedStripe.missingChunksNum++;
- checkMissingBlocks();
+ /**
+ * Prepare parity chunks needed to compensate for missing or sleeping chunks.
+ * @return True if new parity chunks were prepared and need to be requested,
+ * otherwise false.
+ */
+ private boolean handleParityChunks() {
+ boolean newParityBlocks = false;
+ int parityChunksNeeded = alignedStripe.getMissingChunksNum() +
+ alignedStripe.getSleepingChunksNum();
+ if (parityChunksNeeded > 0) {
+ prepareDecodeInputsInternal();
+ for (int i = dataBlkNum; i < dataBlkNum + parityBlkNum &&
+ parityChunksNeeded > 0; i++) {
+ if (alignedStripe.isNull(i)) {
+ prepareParityChunk(i);
+ newParityBlocks = true;
+ }
+ if (!alignedStripe.isMissing(i) &&
+ !alignedStripe.isSleeping(i)) {
+ parityChunksNeeded--;
+ }
+ }
+ }
+ return newParityBlocks;
+ }
- readDataForDecoding();
- readParityChunks(alignedStripe.missingChunksNum - missing);
+ private void checkSleepingChunks() {
+ for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+ if (!alignedStripe.isNull(i)) {
+ if (targetBlocks[i] != null &&
+ dfsStripedInputStream.isBlockOnSleepingNode(targetBlocks[i])) {
+ alignedStripe.setSleeping(i);
+ } else if (alignedStripe.isSleeping(i)) {
+ alignedStripe.setReady(i);
}
- } catch (InterruptedException ie) {
- String err = "Read request interrupted";
- DFSClient.LOG.error(err, ie);
- dfsStripedInputStream.close();
- clearFutures();
- // Don't decode if read interrupted
- throw new InterruptedIOException(err);
}
}
+ }
- if (alignedStripe.missingChunksNum > 0) {
+ /**
+ * Read the whole stripe. Do decoding if necessary.
+ */
+ void readStripe() throws IOException {
+ // Loop until we have fetched the number of chunks needed for reading or
+ // decoding, or until there are more missing chunks than we can tolerate.
+ while (alignedStripe.getFetchedChunksNum() < dataBlkNum &&
+ alignedStripe.getMissingChunksNum() <= parityBlkNum) {
+ // Check for any chunks that are on sleeping nodes
+ checkSleepingChunks();
+ // Request chunks and start reading them
+ requestChunks();
+ // Some requested chunks may have failed or be sleeping, so prepare parity chunks
+ if (handleParityChunks()) {
+ // If we need parity blocks then we should request them.
+ continue;
+ }
+ while (!futures.isEmpty()) {
+ try {
+ long beginReadMS = Time.monotonicNow();
+ StripingChunkReadResult r = StripedBlockUtil
+ .getNextCompletedStripedRead(service, futures, 0);
+ long readTimeMS = Time.monotonicNow() - beginReadMS;
+
+ dfsStripedInputStream.updateReadStats(r.getReadStats(), readTimeMS);
+ DFSClient.LOG.debug("Read task returned: {}, for stripe {}",
+ r, alignedStripe);
+ Preconditions.checkState(!alignedStripe.isNull(r.index));
+ Preconditions.checkState(alignedStripe.isPending(r.index));
+
+ if (r.state == StripingChunkReadResult.SUCCESSFUL) {
+ alignedStripe.setFetched(r.index);
+ updateState4SuccessRead(r);
+ } else {
+ if (r.getException() instanceof ExecutionException &&
+ !(r.getException().getCause() instanceof ChecksumException)) {
+ if (dfsStripedInputStream.updateSleepingNodes(
+ readerInfos[r.index].datanode)) {
+ alignedStripe.setSleeping(r.index);
+ } else {
+ alignedStripe.setMissing(r.index);
+ }
+ } else {
+ alignedStripe.setMissing(r.index);
+ // close the corresponding reader
+ dfsStripedInputStream.closeReader(readerInfos[r.index]);
+ }
+ }
+ } catch (InterruptedException ie) {
+ String err = "Read request interrupted";
+ DFSClient.LOG.error(err, ie);
+ dfsStripedInputStream.close();
+ clearFutures();
+ // Don't decode if read interrupted
+ throw new InterruptedIOException(err);
+ }
+ }
+ // If there are more sleeping chunks than parity chunks we need to wait
+ // until the next node can be retried.
+ if (alignedStripe.getSleepingChunksNum() > parityBlkNum) {
+ dfsStripedInputStream.sleepUntilRetry();
+ }
+ }
+ checkMissingBlocks();
+ // We are done fetching chunks, so we can clear the futures.
+ clearFutures();
+ // If not all data chunks were fetched directly, we need to decode
+ if (alignedStripe.fetchedDataChunkNum() != dataBlkNum) {
decode();
}
}
@@ -418,15 +455,15 @@ void readStripe() throws IOException {
*/
void finalizeDecodeInputs() {
- for (int i = 0; i < alignedStripe.chunks.length; i++) {
- final StripingChunk chunk = alignedStripe.chunks[i];
- if (chunk != null && chunk.state == StripingChunk.FETCHED) {
- if (chunk.useChunkBuffer()) {
- chunk.getChunkBuffer().copyTo(decodeInputs[i].getBuffer());
+ for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+ if (alignedStripe.isFetched(i)) {
+ if (alignedStripe.useChunkBuffer(i)) {
+ alignedStripe.getChunkBuffer(i).copyTo(
+ decodeInputs[i].getBuffer());
} else {
- chunk.getByteBuffer().flip();
+ alignedStripe.getByteBuffer(i).flip();
}
- } else if (chunk != null && chunk.state == StripingChunk.ALLZERO) {
+ } else if (alignedStripe.isAllZero(i)) {
decodeInputs[i].setAllZero(true);
}
}
@@ -446,7 +483,7 @@ void decodeAndFillBuffer(boolean fillBuffer) throws IOException {
decodeInputs[decodeIndices[i]] = null;
}
- long start = Time.monotonicNow();
+ long start = Time.monotonicNowNanos();
// Step 2: decode into prepared output buffers
decoder.decode(decodeInputs, decodeIndices, outputs);
@@ -454,13 +491,14 @@ void decodeAndFillBuffer(boolean fillBuffer) throws IOException {
if (fillBuffer) {
for (int i = 0; i < decodeIndices.length; i++) {
int missingBlkIdx = decodeIndices[i];
- StripingChunk chunk = alignedStripe.chunks[missingBlkIdx];
- if (chunk.state == StripingChunk.MISSING && chunk.useChunkBuffer()) {
- chunk.getChunkBuffer().copyFrom(outputs[i].getBuffer());
+ if (alignedStripe.isErasedIndex(missingBlkIdx) &&
+ alignedStripe.useChunkBuffer(missingBlkIdx)) {
+ alignedStripe.getChunkBuffer(missingBlkIdx).copyFrom(
+ outputs[i].getBuffer());
}
}
}
- long end = Time.monotonicNow();
+ long end = Time.monotonicNowNanos();
// Decoding time includes CPU time on erasure coding and memory copying of
// decoded data.
dfsStripedInputStream.readStatistics.addErasureCodingDecodingTime(
@@ -473,9 +511,8 @@ void decodeAndFillBuffer(boolean fillBuffer) throws IOException {
int[] prepareErasedIndices() {
int[] decodeIndices = new int[parityBlkNum];
int pos = 0;
- for (int i = 0; i < alignedStripe.chunks.length; i++) {
- if (alignedStripe.chunks[i] != null &&
- alignedStripe.chunks[i].state == StripingChunk.MISSING){
+ for (int i = 0; i < dataBlkNum + parityBlkNum; i++) {
+ if (alignedStripe.isErasedIndex(i)) {
decodeIndices[pos++] = i;
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
index 25ee8d30c9290..259b9c09a591c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java
@@ -307,11 +307,11 @@ public static StripingChunkReadResult getNextCompletedStripedRead(
} catch (ExecutionException e) {
LOG.debug("Exception during striped read task", e);
return new StripingChunkReadResult(futures.remove(future),
- StripingChunkReadResult.FAILED);
+ StripingChunkReadResult.FAILED, e);
} catch (CancellationException e) {
LOG.debug("Exception during striped read task", e);
return new StripingChunkReadResult(futures.remove(future),
- StripingChunkReadResult.CANCELLED);
+ StripingChunkReadResult.CANCELLED, e);
}
}
@@ -371,11 +371,11 @@ public static AlignedStripe[] divideOneStripe(ErasureCodingPolicy ecPolicy,
long overlapEnd = Math.min(cellEnd, stripeEnd);
int overLapLen = (int) (overlapEnd - overlapStart + 1);
if (overLapLen > 0) {
- Preconditions.checkState(s.chunks[cell.idxInStripe] == null);
+ Preconditions.checkState(s.isNull(cell.idxInStripe));
final int pos = (int) (bufOffset + overlapStart - cellStart);
buf.position(pos);
buf.limit(pos + overLapLen);
- s.chunks[cell.idxInStripe] = new StripingChunk(buf.slice());
+ s.setChunk(cell.idxInStripe, new StripingChunk(buf.slice()));
}
}
bufOffset += cell.size;
@@ -541,8 +541,8 @@ private static AlignedStripe[] mergeRangesForInternalBlocks(
long prev = -1;
for (long point : stripePoints) {
if (prev >= 0) {
- stripes.add(new AlignedStripe(prev, point - prev,
- dataBlkNum + parityBlkNum));
+ stripes.add(new AlignedStripe(prev, point - prev, dataBlkNum,
+ parityBlkNum));
}
prev = point;
}
@@ -573,7 +573,6 @@ private static void calcualteChunkPositionsInBuf(int cellSize,
for (StripingCell cell : cells) {
long cellStart = cell.idxInInternalBlk * cellSize + cell.offset;
long cellEnd = cellStart + cell.size - 1;
- StripingChunk chunk;
for (AlignedStripe s : stripes) {
long stripeEnd = s.getOffsetInBlock() + s.getSpanInBlock() - 1;
long overlapStart = Math.max(cellStart, s.getOffsetInBlock());
@@ -582,12 +581,10 @@ private static void calcualteChunkPositionsInBuf(int cellSize,
if (overLapLen <= 0) {
continue;
}
- chunk = s.chunks[cell.idxInStripe];
- if (chunk == null) {
- chunk = new StripingChunk();
- s.chunks[cell.idxInStripe] = chunk;
+ if (s.isNull(cell.idxInStripe)) {
+ s.setChunk(cell.idxInStripe, new StripingChunk());
}
- chunk.getChunkBuffer().addSlice(buf,
+ s.getChunkBuffer(cell.idxInStripe).addSlice(buf,
(int) (done + overlapStart - cellStart), overLapLen);
}
done += cell.size;
@@ -605,8 +602,8 @@ private static void prepareAllZeroChunks(LocatedStripedBlock blockGroup,
long internalBlkLen = getInternalBlockLength(blockGroup.getBlockSize(),
cellSize, dataBlkNum, i);
if (internalBlkLen <= s.getOffsetInBlock()) {
- Preconditions.checkState(s.chunks[i] == null);
- s.chunks[i] = new StripingChunk(StripingChunk.ALLZERO);
+ Preconditions.checkState(s.isNull(i));
+ s.setChunk(i, new StripingChunk(StripingChunk.ALLZERO));
}
}
}
@@ -714,16 +711,202 @@ public String toString() {
public static class AlignedStripe {
public VerticalRange range;
/** status of each chunk in the stripe. */
- public final StripingChunk[] chunks;
- public int fetchedChunksNum = 0;
- public int missingChunksNum = 0;
+ private final StripingChunk[] chunks;
+ private final int dataBlkNum;
+ private final int parityBlkNum;
+ private Map<Integer, Integer> dataStateCounts = new HashMap<>();
+ private Map<Integer, Integer> parityStateCounts = new HashMap<>();
- public AlignedStripe(long offsetInBlock, long length, int width) {
+ /**
+ * Get the state of the chunk, or null if the chunk is null.
+ * @param chunkIndex The chunk to get the state of
+ * @return The state of the chunk at {@code chunkIndex}, or null if the
+ * chunk at {@code chunkIndex} is null
+ */
+ private Integer getChunkState(int chunkIndex) {
+ if (chunks[chunkIndex] == null) {
+ return null;
+ }
+ return chunks[chunkIndex].state;
+ }
+
+ private void setChunkState(int chunkIndex, int chunkState) {
+ updateStateCount(chunkIndex, getChunkState(chunkIndex), -1);
+ updateStateCount(chunkIndex, chunkState, 1);
+ chunks[chunkIndex].state = chunkState;
+ }
+
+ private int getChunkStateCount(Integer chunkState) {
+ return getParityChunkStateCount(chunkState) +
+ getDataChunkStateCount(chunkState);
+ }
+
+ private int getDataChunkStateCount(Integer chunkState) {
+ return dataStateCounts.getOrDefault(chunkState, 0);
+ }
+
+ private int getParityChunkStateCount(Integer chunkState) {
+ return parityStateCounts.getOrDefault(chunkState, 0);
+ }
+
+ public void setChunk(int chunkIndex, StripingChunk chunk) {
+ updateStateCount(chunkIndex, getChunkState(chunkIndex), -1);
+ updateStateCount(chunkIndex, chunk.state, 1);
+ chunks[chunkIndex] = chunk;
+ }
+
+
+ private void updateStateCount(int chunkIndex, Integer chunkState,
+ int delta) {
+ if (chunkIndex < dataBlkNum) {
+ dataStateCounts.merge(chunkState, delta, Integer::sum);
+ } else {
+ parityStateCounts.merge(chunkState, delta, Integer::sum);
+ }
+ }
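As a side note for reviewers, the per-state counters above deliberately use null as the key for "chunk not created yet", which HashMap supports. A tiny standalone sketch of that bookkeeping (hypothetical class and constant; the real state constants live in StripingChunk):

```java
import java.util.HashMap;
import java.util.Map;

public class ChunkStateCountSketch {
  public static void main(String[] args) {
    final Integer REQUESTED = 0x02; // illustrative value only
    Map<Integer, Integer> counts = new HashMap<>();
    counts.put(null, 3);                      // three chunks start with no state
    counts.merge(null, -1, Integer::sum);     // one chunk leaves the null state...
    counts.merge(REQUESTED, 1, Integer::sum); // ...and is now counted as REQUESTED
    System.out.println(counts.getOrDefault(null, 0));      // prints 2
    System.out.println(counts.getOrDefault(REQUESTED, 0)); // prints 1
  }
}
```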
+
+ public boolean useChunkBuffer(int chunkIndex) {
+ return chunks[chunkIndex].useChunkBuffer();
+ }
+
+ public ChunkByteBuffer getChunkBuffer(int chunkIndex) {
+ return chunks[chunkIndex].getChunkBuffer();
+ }
+
+ public boolean useByteBuffer(int chunkIndex) {
+ return chunks[chunkIndex].useByteBuffer();
+ }
+
+ public ByteBuffer getByteBuffer(int chunkIndex) {
+ return chunks[chunkIndex].getByteBuffer();
+ }
+
+ public int getMissingChunksNum() {
+ return getChunkStateCount(StripingChunk.MISSING);
+ }
+
+ /**
+ * Get the number of fetched chunks that have been read into the buffer
+ * and are ready to decode.
+ * @return The number of {@link StripingChunk#FETCHED} and
+ * {@link StripingChunk#ALLZERO} chunks. If we are not attempting to
+ * decode, then we include the null data chunks because they only need to
+ * be fetched if we need to decode.
+ */
+ public int getFetchedChunksNum() {
+ int fetchedChunksNum = fetchedDataChunkNum();
+ fetchedChunksNum += getParityChunkStateCount(StripingChunk.FETCHED);
+ return fetchedChunksNum;
+ }
+
+ public int fetchedDataChunkNum() {
+ int fetchedChunksNum = 0;
+ if (getDataChunkStateCount(StripingChunk.MISSING) == 0 &&
+ getDataChunkStateCount(StripingChunk.SLEEPING) == 0) {
+ // If there are no missing or sleeping data chunks, then we do not
+ // need to fetch the null data chunks for decoding, so they can be
+ // counted as fetched
+ fetchedChunksNum += getDataChunkStateCount(null);
+ }
+ fetchedChunksNum += getDataChunkStateCount(StripingChunk.FETCHED);
+ fetchedChunksNum += getDataChunkStateCount(StripingChunk.ALLZERO);
+ return fetchedChunksNum;
+ }
+
+ public int getSleepingChunksNum() {
+ return getChunkStateCount(StripingChunk.SLEEPING);
+ }
+
+ public int getReadyChunksNum() {
+ return getChunkStateCount(StripingChunk.READY);
+ }
+
+ public int getPendingChunksNum() {
+ return getChunkStateCount(StripingChunk.PENDING);
+ }
+
+ public boolean isNull(int chunkIndex) {
+ return getChunkState(chunkIndex) == null;
+ }
+
+ public boolean isRequested(int chunkIndex) {
+ return chunks[chunkIndex] != null &&
+ chunks[chunkIndex].state == StripingChunk.REQUESTED;
+ }
+
+ public void setRequested(int chunkIndex) {
+ setChunkState(chunkIndex, StripingChunk.REQUESTED);
+ }
+
+ public boolean isFetched(int chunkIndex) {
+ return chunks[chunkIndex] != null &&
+ chunks[chunkIndex].state == StripingChunk.FETCHED;
+ }
+
+ public void setFetched(int chunkIndex) {
+ setChunkState(chunkIndex, StripingChunk.FETCHED);
+ }
+
+ public boolean isSleeping(int chunkIndex) {
+ return chunks[chunkIndex] != null &&
+ chunks[chunkIndex].state == StripingChunk.SLEEPING;
+ }
+
+ public void setSleeping(int chunkIndex) {
+ setChunkState(chunkIndex, StripingChunk.SLEEPING);
+ }
+
+ public boolean isReady(int chunkIndex) {
+ return chunks[chunkIndex] != null &&
+ chunks[chunkIndex].state == StripingChunk.READY;
+ }
+
+ public void setReady(int chunkIndex) {
+ setChunkState(chunkIndex, StripingChunk.READY);
+ }
+
+ public boolean isPending(int chunkIndex) {
+ return chunks[chunkIndex] != null &&
+ chunks[chunkIndex].state == StripingChunk.PENDING;
+ }
+
+ public void setPending(int chunkIndex) {
+ setChunkState(chunkIndex, StripingChunk.PENDING);
+ }
+
+ public boolean isMissing(int chunkIndex) {
+ return chunks[chunkIndex] != null &&
+ chunks[chunkIndex].state == StripingChunk.MISSING;
+ }
+
+ public void setMissing(int chunkIndex) {
+ setChunkState(chunkIndex, StripingChunk.MISSING);
+ }
+
+ public boolean isAllZero(int chunkIndex) {
+ return chunks[chunkIndex] != null &&
+ chunks[chunkIndex].state == StripingChunk.ALLZERO;
+ }
+
+ public boolean isErasedIndex(int chunkIndex) {
+ // Any PENDING, MISSING, SLEEPING, and REQUESTED chunks are considered
+ // erased for decoding purposes
+ return !isNull(chunkIndex) &&
+ !isFetched(chunkIndex) &&
+ !isAllZero(chunkIndex);
+ }
+
+ public AlignedStripe(long offsetInBlock, long length, int dataBlkNum,
+ int parityBlkNum) {
Preconditions.checkArgument(offsetInBlock >= 0 && length >= 0,
"OffsetInBlock(%s) and length(%s) must be non-negative",
offsetInBlock, length);
this.range = new VerticalRange(offsetInBlock, length);
- this.chunks = new StripingChunk[width];
+ this.dataBlkNum = dataBlkNum;
+ this.parityBlkNum = parityBlkNum;
+ this.chunks = new StripingChunk[dataBlkNum + parityBlkNum];
+ dataStateCounts.put(null, dataBlkNum);
+ parityStateCounts.put(null, parityBlkNum);
}
public boolean include(long pos) {
@@ -741,8 +924,8 @@ public long getSpanInBlock() {
@Override
public String toString() {
return "AlignedStripe(Offset=" + range.offsetInBlock + ", length=" +
- range.spanInBlock + ", fetchedChunksNum=" + fetchedChunksNum +
- ", missingChunksNum=" + missingChunksNum + ")";
+ range.spanInBlock + ", fetchedChunksNum=" + getFetchedChunksNum() +
+ ", missingChunksNum=" + getMissingChunksNum() + ")";
}
}
@@ -825,17 +1008,31 @@ public static class StripingChunk {
* all-zero bytes in codec calculations.
*/
public static final int ALLZERO = 0X0f;
+ /** Chunk fetch was attempted and it is now sleeping until retry. **/
+ public static final int SLEEPING = 0xf1;
+ /** Chunk is ready to start reading. **/
+ public static final int READY = 0xf2;
/**
- * If a chunk is completely in requested range, the state transition is:
- * REQUESTED (when AlignedStripe created) -> PENDING ->
- * {FETCHED | MISSING}
+ * If a chunk is completely in requested range, the state transition is:
+ * - READY (when AlignedStripe created) -> REQUESTED
+ * - REQUESTED -> {SLEEPING | PENDING}
+ * - SLEEPING -> {REQUESTED | READY}
+ * - PENDING -> {FETCHED | MISSING}
* If a chunk is completely outside requested range (including parity
* chunks), state transition is:
- * null (AlignedStripe created) ->REQUESTED (upon failure) ->
- * PENDING ...
+ * null (AlignedStripe created) -> READY (upon failure) -> REQUESTED
+ * -> {SLEEPING | PENDING} ...
*/
- public int state = REQUESTED;
+ private int state = READY;
+
+ public int getState() {
+ return state;
+ }
+
+ public void setState(int state) {
+ this.state = state;
+ }
private final ChunkByteBuffer chunkBuffer;
private final ByteBuffer byteBuffer;
@@ -937,26 +1134,38 @@ public static class StripingChunkReadResult {
public final int index;
public final int state;
+ private final Exception exception;
private final BlockReadStats readStats;
+ public Exception getException() {
+ return exception;
+ }
+
public StripingChunkReadResult(int state) {
Preconditions.checkArgument(state == TIMEOUT,
"Only timeout result should return negative index.");
this.index = -1;
this.state = state;
this.readStats = null;
+ this.exception = null;
}
- public StripingChunkReadResult(int index, int state) {
- this(index, state, null);
+ public StripingChunkReadResult(int index, int state, BlockReadStats stats) {
+ this(index, state, stats, null);
}
- public StripingChunkReadResult(int index, int state, BlockReadStats stats) {
+ public StripingChunkReadResult(int index, int state, Exception ex) {
+ this(index, state, null, ex);
+ }
+
+ public StripingChunkReadResult(int index, int state, BlockReadStats stats,
+ Exception exception) {
Preconditions.checkArgument(state != TIMEOUT,
"Timeout result should return negative index.");
this.index = index;
this.state = state;
this.readStats = stats;
+ this.exception = exception;
}
public BlockReadStats getReadStats() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index 2e1563444a245..36c59530af713 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -4569,6 +4569,7 @@
3
Maximum failures allowed when trying to get block information from a specific datanode.
+ This is used for both EC and replication.
@@ -4686,6 +4687,7 @@
Base time window in ms for DFSClient retries. For each retry attempt,
this value is extended linearly (e.g. 3000 ms for first attempt and
first retry, 6000 ms for second retry, 9000 ms for third retry, etc.).
+ This is used for both EC and replication.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 6578ad0fbc8b6..d12ffa769ecc1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -90,29 +90,35 @@ static void verifyPread(FileSystem fs, Path srcPath, int fileLength,
byte[] expected, byte[] buf, ErasureCodingPolicy ecPolicy)
throws IOException {
try (FSDataInputStream in = fs.open(srcPath)) {
- int[] startOffsets = {0, 1, ecPolicy.getCellSize() - 102,
- ecPolicy.getCellSize(), ecPolicy.getCellSize() + 102,
- ecPolicy.getCellSize() * (ecPolicy.getNumDataUnits() - 1),
- ecPolicy.getCellSize() * (ecPolicy.getNumDataUnits() - 1) + 102,
- ecPolicy.getCellSize() * ecPolicy.getNumDataUnits(),
- fileLength - 102, fileLength - 1};
- for (int startOffset : startOffsets) {
- startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
- int remaining = fileLength - startOffset;
- int offset = startOffset;
- final byte[] result = new byte[remaining];
- while (remaining > 0) {
- int target = Math.min(remaining, buf.length);
- in.readFully(offset, buf, 0, target);
- System.arraycopy(buf, 0, result, offset - startOffset, target);
- remaining -= target;
- offset += target;
- }
- for (int i = 0; i < fileLength - startOffset; i++) {
- assertEquals("Byte at " + (startOffset + i) + " is different, "
- + "the startOffset is " + startOffset, expected[startOffset + i],
- result[i]);
- }
+ verifyPread(in, fileLength, expected, buf, ecPolicy);
+ }
+ }
+
+ static void verifyPread(FSDataInputStream in, int fileLength,
+ byte[] expected, byte[] buf, ErasureCodingPolicy ecPolicy)
+ throws IOException {
+ int[] startOffsets = {0, 1, ecPolicy.getCellSize() - 102,
+ ecPolicy.getCellSize(), ecPolicy.getCellSize() + 102,
+ ecPolicy.getCellSize() * (ecPolicy.getNumDataUnits() - 1),
+ ecPolicy.getCellSize() * (ecPolicy.getNumDataUnits() - 1) + 102,
+ ecPolicy.getCellSize() * ecPolicy.getNumDataUnits(),
+ fileLength - 102, fileLength - 1};
+ for (int startOffset : startOffsets) {
+ startOffset = Math.max(0, Math.min(startOffset, fileLength - 1));
+ int remaining = fileLength - startOffset;
+ int offset = startOffset;
+ final byte[] result = new byte[remaining];
+ while (remaining > 0) {
+ int target = Math.min(remaining, buf.length);
+ in.readFully(offset, buf, 0, target);
+ System.arraycopy(buf, 0, result, offset - startOffset, target);
+ remaining -= target;
+ offset += target;
+ }
+ for (int i = 0; i < fileLength - startOffset; i++) {
+ assertEquals("Byte at " + (startOffset + i) + " is different, "
+ + "the startOffset is " + startOffset, expected[startOffset + i],
+ result[i]);
}
}
}
@@ -120,16 +126,21 @@ static void verifyPread(FileSystem fs, Path srcPath, int fileLength,
static void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
byte[] expected, byte[] buf) throws IOException {
try (FSDataInputStream in = fs.open(srcPath)) {
- final byte[] result = new byte[fileLength];
- int readLen = 0;
- int ret;
- while ((ret = in.read(buf, 0, buf.length)) >= 0) {
- System.arraycopy(buf, 0, result, readLen, ret);
- readLen += ret;
- }
- assertEquals("The length of file should be the same to write size", fileLength, readLen);
- Assert.assertArrayEquals(expected, result);
+ verifyStatefulRead(in, fileLength, expected, buf);
+ }
+ }
+
+ static void verifyStatefulRead(FSDataInputStream in, int fileLength,
+ byte[] expected, byte[] buf) throws IOException {
+ final byte[] result = new byte[fileLength];
+ int readLen = 0;
+ int ret;
+ while ((ret = in.read(buf, 0, buf.length)) >= 0) {
+ System.arraycopy(buf, 0, result, readLen, ret);
+ readLen += ret;
}
+ assertEquals("The length of file should be the same to write size", fileLength, readLen);
+ Assert.assertArrayEquals(expected, result);
}
static void verifyStatefulRead(FileSystem fs, Path srcPath, int fileLength,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
index e1f7524546360..bb8586139d3d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteReadStripedFile.java
@@ -17,14 +17,20 @@
*/
package org.apache.hadoop.hdfs;
+import org.apache.hadoop.fs.HdfsBlockLocation;
+import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfoWithStorage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.event.Level;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -32,6 +38,7 @@
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -43,6 +50,12 @@
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY;
public class TestWriteReadStripedFile {
public static final Logger LOG =
@@ -75,7 +88,10 @@ public class TestWriteReadStripedFile {
@Before
public void setup() throws IOException {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numDNs)
+ .storagesPerDatanode(1)
+ .build();
fs = cluster.getFileSystem();
fs.enableErasureCodingPolicy(ecPolicy.getName());
fs.mkdirs(new Path("/ec"));
@@ -279,6 +295,146 @@ public void testWriteReadUsingWebHdfs() throws Exception {
// webhdfs doesn't support bytebuffer read
}
+ @Test
+ public void testReadBackoffRetry() throws Exception {
+ int fileLength = blockSize * dataBlocks + cellSize + 123;
+
+ fs.getConf().setLong(HdfsClientConfigKeys.Retry.WINDOW_BASE_KEY, 3000);
+ fs.getConf().setLong(DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY, 3);
+ fs.initDFSClient(fs.getUri(), fs.getConf());
+
+ final byte[] expected = StripedFileTestUtil.generateBytes(fileLength);
+ Path srcPath = new Path("/ec/testReadBackoff");
+ DFSTestUtil.writeFile(fs, srcPath, new String(expected));
+
+ Set<DatanodeInfoWithStorage> badDns = new TreeSet<>();
+ DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+ @Override
+ public void onCreateBlockReader(LocatedBlock block, int chunkIndex,
+ long offset, long length) throws IOException {
+ if (badDns.contains(block.getLocations()[0])) {
+ throw new IOException("FAILED TO CONNECT FOR TEST");
+ }
+ }
+ });
+ DatanodeInfoWithStorage[] chunkToDn =
+ new DatanodeInfoWithStorage[dataBlocks + parityBlocks];
+ HdfsBlockLocation[] locs =
+ (HdfsBlockLocation[])fs.getFileBlockLocations(srcPath, 0, fileLength);
+ LocatedBlock block = locs[0].getLocatedBlock();
+ for (int i = 0; i < block.getLocations().length; i++) {
+ chunkToDn[i] = block.getLocations()[i];
+ }
+
+ StripedFileTestUtil.verifyLength(fs, srcPath, fileLength);
+ byte[] largeBuf = new byte[fileLength + 100];
+ try (FSDataInputStream in = fs.open(srcPath)) {
+ StripedFileTestUtil
+ .verifyPread(in, fileLength, expected, largeBuf, ecPolicy);
+ long decodingTimeNanos =
+ ((HdfsDataInputStream) in).getReadStatistics().getTotalEcDecodingTimeNanos();
+ // Should read without any decoding
+ Assert.assertEquals(0, decodingTimeNanos);
+ }
+
+ badDns.add(chunkToDn[0]);
+ try (FSDataInputStream in = fs.open(srcPath)) {
+ StripedFileTestUtil
+ .verifyPread(in, fileLength, expected, largeBuf, ecPolicy);
+ long decodingTimeNanos =
+ ((HdfsDataInputStream) in).getReadStatistics().getTotalEcDecodingTimeNanos();
+ // Should have required decoding
+ Assert.assertTrue("Decoding should have happened", decodingTimeNanos > 0);
+ }
+ try (FSDataInputStream in = fs.open(srcPath)) {
+ StripedFileTestUtil
+ .verifyStatefulRead(in, fileLength, expected, largeBuf);
+ long decodingTimeNanos =
+ ((HdfsDataInputStream) in).getReadStatistics().getTotalEcDecodingTimeNanos();
+ // Should have required decoding
+ Assert.assertTrue("Decoding should have happened", decodingTimeNanos > 0);
+ }
+
+ badDns.add(chunkToDn[1]);
+ try (FSDataInputStream in = fs.open(srcPath)) {
+ StripedFileTestUtil
+ .verifyPread(in, fileLength, expected, largeBuf, ecPolicy);
+ long decodingTimeNanos =
+ ((HdfsDataInputStream) in).getReadStatistics().getTotalEcDecodingTimeNanos();
+ // Should have required decoding
+ Assert.assertTrue("Decoding should have happened", decodingTimeNanos > 0);
+ }
+ try (FSDataInputStream in = fs.open(srcPath)) {
+ StripedFileTestUtil
+ .verifyStatefulRead(in, fileLength, expected, largeBuf);
+ long decodingTimeNanos =
+ ((HdfsDataInputStream) in).getReadStatistics().getTotalEcDecodingTimeNanos();
+ // Should have required decoding
+ Assert.assertTrue("Decoding should have happened", decodingTimeNanos > 0);
+ }
+
+ badDns.add(chunkToDn[2]);
+ try (FSDataInputStream in = fs.open(srcPath)) {
+ long start = Time.monotonicNow();
+ Assert.assertThrows(IOException.class, () -> {
+ StripedFileTestUtil
+ .verifyPread(in, fileLength, expected, largeBuf, ecPolicy);
+ });
+ long timeMs = Time.monotonicNow() - start;
+ Assert.assertTrue("Read should have been slower than 10 seconds but was "
+ + timeMs + " ms", timeMs > 10000);
+ }
+ try (FSDataInputStream in = fs.open(srcPath)) {
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ // Remove the DataNode from the bad set after 10 seconds so the read can succeed.
+ service.submit(() -> {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException ex) {
+ LOG.error("Interrupted while waiting to mark the DqlBusyChecker as " +
+ "not busy", ex);
+ }
+ badDns.remove(chunkToDn[0]);
+ });
+ long start = Time.monotonicNow();
+ StripedFileTestUtil
+ .verifyPread(in, fileLength, expected, largeBuf, ecPolicy);
+ long timeMs = Time.monotonicNow() - start;
+ Assert.assertTrue("Read should have been slower than 10 seconds but was "
+ + timeMs + " ms", timeMs > 10000);
+ long decodingTimeNanos =
+ ((HdfsDataInputStream) in).getReadStatistics().getTotalEcDecodingTimeNanos();
+ // Should have required decoding
+ Assert.assertTrue("Decoding should have happened", decodingTimeNanos > 0);
+ }
+
+ badDns.add(chunkToDn[0]);
+ try (FSDataInputStream in = fs.open(srcPath)) {
+ ExecutorService service = Executors.newSingleThreadExecutor();
+ // Remove the DataNode from the bad set after 10 seconds so the read can succeed.
+ service.submit(() -> {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException ex) {
+ LOG.error("Interrupted while waiting to mark the DqlBusyChecker as " +
+ "not busy", ex);
+ }
+ badDns.remove(chunkToDn[0]);
+ });
+ long start = Time.monotonicNow();
+ StripedFileTestUtil
+ .verifyStatefulRead(in, fileLength, expected, largeBuf);
+ // Should read with decoding
+ long timeMs = Time.monotonicNow() - start;
+ Assert.assertTrue("Read should have been slower than 10 seconds but was "
+ + timeMs + " ms", timeMs > 10000);
+ long decodingTimeNanos =
+ ((HdfsDataInputStream) in).getReadStatistics().getTotalEcDecodingTimeNanos();
+ // Should have required decoding
+ Assert.assertTrue("Decoding should have happened", decodingTimeNanos > 0);
+ }
+ }
+
@Test
public void testConcat() throws Exception {
final byte[] data =
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
index 534243ddf318a..4180b0812ab65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/impl/TestBlockReaderLocal.java
@@ -858,7 +858,7 @@ public void testStatisticsForErasureCodingRead() throws IOException {
Assert.assertEquals(BlockType.STRIPED, stats.getBlockType());
Assert.assertEquals(length, stats.getTotalLocalBytesRead());
Assert.assertEquals(length, stats.getTotalBytesRead());
- Assert.assertTrue(stats.getTotalEcDecodingTimeMillis() > 0);
+ Assert.assertTrue(stats.getTotalEcDecodingTimeNanos() > 0);
}
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
index d1922dff8216e..12dd52ba9f816 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestStripedBlockUtil.java
@@ -267,13 +267,13 @@ public void testDivideByteRangeIntoStripes() {
for (AlignedStripe stripe : stripes) {
for (int i = 0; i < dataBlocks; i++) {
- StripingChunk chunk = stripe.chunks[i];
- if (chunk == null || chunk.state != StripingChunk.REQUESTED) {
+ if (stripe.isNull(i) || !stripe.isReady(i)) {
continue;
}
int done = 0;
int len;
- for (ByteBuffer slice : chunk.getChunkBuffer().getSlices()) {
+ for (ByteBuffer slice :
+ stripe.getChunkBuffer(i).getSlices()) {
len = slice.remaining();
slice.put(internalBlkBufs[i],
(int) stripe.getOffsetInBlock() + done, len);