From ace73116f1cc1d5e29edf18dafc80a61206c083d Mon Sep 17 00:00:00 2001 From: Sneha Vijayarajan Date: Tue, 17 Mar 2020 10:02:28 +0530 Subject: [PATCH 01/11] Report read-ahead error back --- .../fs/azurebfs/services/AbfsInputStream.java | 15 +- .../fs/azurebfs/services/ReadBuffer.java | 29 +- .../azurebfs/services/ReadBufferManager.java | 83 +++- .../azurebfs/services/ReadBufferWorker.java | 2 + .../services/TestAbfsInputStream.java | 438 ++++++++++++++++++ 5 files changed, 556 insertions(+), 11 deletions(-) create mode 100644 hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 1f343424fbff8..38dfd649e6b1a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -22,9 +22,14 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.HttpURLConnection; +import java.util.UUID; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; @@ -35,6 +40,8 @@ * The AbfsInputStream for AbfsClient. */ public class AbfsInputStream extends FSInputStream { + private static Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class); + private final AbfsClient client; private final Statistics statistics; private final String path; @@ -182,18 +189,19 @@ private int readInternal(final long position, final byte[] b, final int offset, int receivedBytes; // queue read-aheads + UUID queueReadAheadRequestId = UUID.randomUUID(); int numReadAheads = this.readAheadQueueDepth; long nextSize; long nextOffset = position; while (numReadAheads > 0 && nextOffset < contentLength) { nextSize = Math.min((long) bufferSize, contentLength - nextOffset); - ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize); + ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize, queueReadAheadRequestId); nextOffset = nextOffset + nextSize; numReadAheads--; } // try reading from buffers first - receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b); + receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b, queueReadAheadRequestId); if (receivedBytes > 0) { return receivedBytes; } @@ -206,6 +214,7 @@ private int readInternal(final long position, final byte[] b, final int offset, } } + @VisibleForTesting int readRemote(long position, byte[] b, int offset, int length) throws IOException { if (position < 0) { throw new IllegalArgumentException("attempting to read from negative offset"); @@ -228,6 +237,7 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti final AbfsRestOperation op; AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) { + LOG.trace(String.format("Trigger client.read for path=%s position=%s offset=%s length=%s", path, position, offset, length)); op = client.read(path, position, b, offset, length, 
tolerateOobAppends ? "*" : eTag); perfInfo.registerResult(op.getResult()).registerSuccess(true); } catch (AzureBlobFileSystemException ex) { @@ -237,6 +247,7 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti throw new FileNotFoundException(ere.getMessage()); } } + throw new IOException(ex); } long bytesRead = op.getResult().getBytesReceived(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java index 00e4f008ad0a8..e6750b0aef945 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java @@ -17,11 +17,14 @@ */ package org.apache.hadoop.fs.azurebfs.services; - +import java.io.IOException; import java.util.concurrent.CountDownLatch; +import java.util.UUID; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import static org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED; + class ReadBuffer { private AbfsInputStream stream; @@ -40,6 +43,11 @@ class ReadBuffer { private boolean isLastByteConsumed = false; private boolean isAnyByteConsumed = false; + private IOException errException = null; + // The unique queue request ID which identifies all the read buffers triggered from + // a single AbfsInputStream read request. + private UUID queueReadAheadRequestId = null; + public AbfsInputStream getStream() { return stream; } @@ -88,12 +96,31 @@ public void setBufferindex(int bufferindex) { this.bufferindex = bufferindex; } + public IOException getErrException() { + return errException; + } + + public void setErrException(final java.io.IOException errException) { + this.errException = errException; + } + + public UUID getQueueReadAheadRequestId() { + return queueReadAheadRequestId; + } + + public void setQueueReadAheadRequestId(final UUID queueReadAheadRequestId) { + this.queueReadAheadRequestId = queueReadAheadRequestId; + } + public ReadBufferStatus getStatus() { return status; } public void setStatus(ReadBufferStatus status) { this.status = status; + if (status == READ_FAILED) { + bufferindex = -1; + } } public CountDownLatch getLatch() { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 5b71cf05225a8..373de1005dad1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -21,11 +21,15 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Collection; import java.util.LinkedList; import java.util.Queue; import java.util.Stack; import java.util.concurrent.CountDownLatch; +import java.util.UUID; + +import com.google.common.annotations.VisibleForTesting; /** * The Read Buffer Manager for Rest AbfsClient. 
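The mechanism the first patch introduces is easiest to see end to end: a read-ahead worker that hits an exception now stores it on the ReadBuffer instead of silently dropping the buffer, and getBlock() rethrows that exception, but only to the read request whose UUID queued the failing read-ahead. A simplified sketch of the check, condensed from the getBlockFromCompletedQueue() hunk further below (all names come from this patch):

    if (buf.getStatus() == ReadBufferStatus.READ_FAILED) {
      // Rethrow only to the caller whose request queued this read-ahead;
      // any other caller gets 0 bytes and falls back to a direct remote read.
      if (buf.getQueueReadAheadRequestId().equals(queueReadAheadRequestId)) {
        throw buf.getErrException();
      }
      return 0;
    }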
@@ -90,8 +94,10 @@ private ReadBufferManager() { * @param stream The {@link AbfsInputStream} for which to do the read-ahead * @param requestedOffset The offset in the file which should be read * @param requestedLength The length to read + * @param queueReadAheadRequestId unique queue request ID */ - void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength) { + void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength + , final UUID queueReadAheadRequestId) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", stream.getPath(), requestedOffset, requestedLength); @@ -101,6 +107,7 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi if (isAlreadyQueued(stream, requestedOffset)) { return; // already queued, do not queue again } + if (freeList.isEmpty() && !tryEvict()) { return; // no buffers available, cannot queue anything } @@ -112,6 +119,7 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi buffer.setRequestedLength(requestedLength); buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); buffer.setLatch(new CountDownLatch(1)); + buffer.setQueueReadAheadRequestId(queueReadAheadRequestId); Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already @@ -141,7 +149,8 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0. * @return the number of bytes read */ - int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) { + int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer, + final UUID queueReadAheadRequestId) throws IOException { // not synchronized, so have to be careful with locking if (LOGGER.isTraceEnabled()) { LOGGER.trace("getBlock for file {} position {} thread {}", @@ -152,7 +161,7 @@ int getBlock(final AbfsInputStream stream, final long position, final int length int bytesRead = 0; synchronized (this) { - bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); + bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer, queueReadAheadRequestId); } if (bytesRead > 0) { if (LOGGER.isTraceEnabled()) { @@ -253,7 +262,12 @@ private synchronized boolean tryEvict() { } private boolean evict(final ReadBuffer buf) { - freeList.push(buf.getBufferindex()); + // As failed ReadBuffers (bufferindex = -1) are saved in completedReadList, + // avoid adding them to the freeList.
+ if (buf.getBufferindex() != -1) { + freeList.push(buf.getBufferindex()); + } + completedReadList.remove(buf); if (LOGGER.isTraceEnabled()) { LOGGER.trace("Evicting buffer idx {}; was used for file {} offset {} length {}", @@ -289,6 +303,19 @@ private ReadBuffer getFromList(final Collection list, final AbfsInpu return null; } + private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) { + for (ReadBuffer buffer : completedReadList) { + if ((buffer.getStream() == stream) && (requestedOffset >= buffer.getOffset())) { + if ((requestedOffset < buffer.getOffset() + buffer.getLength()) + || requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { + return buffer; + } + } + } + + return null; + } + private void clearFromReadAheadQueue(final AbfsInputStream stream, final long requestedOffset) { ReadBuffer buffer = getFromList(readAheadQueue, stream, requestedOffset); if (buffer != null) { @@ -299,11 +326,31 @@ private void clearFromReadAheadQueue(final AbfsInputStream stream, final long re } private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length, - final byte[] buffer) { - ReadBuffer buf = getFromList(completedReadList, stream, position); - if (buf == null || position >= buf.getOffset() + buf.getLength()) { + final byte[] buffer, final UUID queueReadAheadRequestId) + throws IOException { + ReadBuffer buf = getBufferFromCompletedQueue(stream, position); + + if (buf == null) { + return 0; + } + + if (buf.getStatus() == ReadBufferStatus.READ_FAILED) { + // is this failure from the requests queued by current queueReadAheadRequestId ? + // if yes, return failure exception + // else return 0 so that the main thread can make an attempt to read requested offset. + if (buf.getQueueReadAheadRequestId().equals(queueReadAheadRequestId)) { + // is read ahead issued from current queue request ID + throw buf.getErrException(); + } else { + return 0; + } + } + + if ((buf.getStatus() != ReadBufferStatus.AVAILABLE) + || (position >= buf.getOffset() + buf.getLength())) { return 0; } + int cursor = (int) (position - buf.getOffset()); int availableLengthInBuffer = buf.getLength() - cursor; int lengthToCopy = Math.min(length, availableLengthInBuffer); @@ -364,6 +411,7 @@ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final i LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}", buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead); } + synchronized (this) { inProgressList.remove(buffer); if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { @@ -373,9 +421,13 @@ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final i completedReadList.add(buffer); } else { freeList.push(buffer.getBufferindex()); - // buffer should go out of scope after the end of the calling method in ReadBufferWorker, and eligible for GC + // buffer will be deleted as per the eviction policy. 
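+ // The pooled slot was returned to freeList just above; setStatus(result) below marks a failed buffer with bufferindex -1 (see ReadBuffer.setStatus), so a later evict() cannot push the same index a second time.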
} + + buffer.setStatus(result); + completedReadList.add(buffer); } + //outside the synchronized, since anyone receiving a wake-up from the latch must see safe-published results buffer.getLatch().countDown(); // wake up waiting threads (if any) } @@ -392,4 +444,19 @@ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final i private long currentTimeMillis() { return System.nanoTime() / 1000 / 1000; } + + @VisibleForTesting + int getThreshold_age_milliseconds() { + return THRESHOLD_AGE_MILLISECONDS; + } + + @VisibleForTesting + int getCompletedReadListSize() { + return completedReadList.size(); + } + + @VisibleForTesting + void callTryEvict() { + tryEvict(); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java index af69de0f089e9..aa7c25ca0c83c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java @@ -18,6 +18,7 @@ package org.apache.hadoop.fs.azurebfs.services; +import java.io.IOException; import java.util.concurrent.CountDownLatch; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; @@ -64,6 +65,7 @@ public void run() { int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength()); bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager } catch (Exception ex) { + buffer.setErrException(new IOException(ex)); bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java new file mode 100644 index 0000000000000..05b091595eb4d --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -0,0 +1,438 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.IOException; +import java.util.UUID; + +import org.junit.Assert; +import org.junit.Test; + +import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; + +import static java.util.UUID.randomUUID; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.apache.hadoop.test.LambdaTestUtils.intercept; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH; + +/** + * Unit test AbfsInputStream. + */ +public class TestAbfsInputStream extends + AbstractAbfsIntegrationTest { + + private static final int KILOBYTE = 1024; + + private AbfsRestOperation getMockRestOp() { + AbfsRestOperation op = mock(AbfsRestOperation.class); + AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class); + when(httpOp.getBytesReceived()).thenReturn(1024L); + when(op.getResult()).thenReturn(httpOp); + return op; + } + + private AbfsClient getMockAbfsClient() { + // Mock failure for client.read() + AbfsClient client = mock(AbfsClient.class); + AbfsPerfTracker tracker = new AbfsPerfTracker( + "test" + , this.getAccountName(), + this.getConfiguration()); + when(client.getAbfsPerfTracker()).thenReturn(tracker); + + return client; + } + + private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fileName) { + // Create AbfsInputStream with the client instance + AbfsInputStream inputStream = new AbfsInputStream( + mockAbfsClient + , null + , FORWARD_SLASH + fileName + , 3 * KILOBYTE + , 1 * KILOBYTE // Setting read ahead buffer size of 1 KB + , this.getConfiguration().getReadAheadQueueDepth() + , this.getConfiguration().getTolerateOobAppends() + , "eTag"); + + return inputStream; + } + + private void queueReadAheads(AbfsInputStream inputStream, UUID queueRequestId) { + // Mimic AbfsInputStream readAhead queue requests + ReadBufferManager.getBufferManager() + .queueReadAhead(inputStream, 0, 1 * KILOBYTE, queueRequestId); + ReadBufferManager.getBufferManager() + .queueReadAhead(inputStream, 1 * KILOBYTE, 1 * KILOBYTE, queueRequestId); + ReadBufferManager.getBufferManager() + .queueReadAhead(inputStream, 2 * KILOBYTE, 1 * KILOBYTE, queueRequestId); + } + + private void verifyReadCallCount(AbfsClient client, int count) throws + AzureBlobFileSystemException, InterruptedException { + // ReadAhead threads are triggered asynchronously. + // Wait a second before verifying the number of total calls. + Thread.sleep(1000); + verify(client, times(count)).read(any(String.class), any(Long.class), + any(byte[].class), any(Integer.class), any(Integer.class), + any(String.class)); + } + + private void checkEvictedStatus(AbfsInputStream inputStream, int position, boolean expectedToThrowException) + throws Exception { + // Sleep for the eviction threshold time + Thread.sleep(ReadBufferManager.getBufferManager().getThreshold_age_milliseconds() + 1000); + + // Eviction is done only when AbfsInputStream tries to queue new items. + // 1 tryEvict will remove 1 eligible item. 
To ensure that the current test buffer + // will get evicted (considering there could be other tests running in parallel), + // call tryEvict for the number of items that are there in completedReadList. + int numOfCompletedReadListItems = ReadBufferManager.getBufferManager().getCompletedReadListSize(); + while(numOfCompletedReadListItems > 0) { + ReadBufferManager.getBufferManager().callTryEvict(); + numOfCompletedReadListItems--; + } + + if (expectedToThrowException) { + intercept(IOException.class, + () -> inputStream.read(position, new byte[1 * KILOBYTE], 0, 1 * KILOBYTE)); + } else { + inputStream.read(position, new byte[1 * KILOBYTE], 0, 1 * KILOBYTE); + } + } + + public TestAbfsInputStream() throws Exception { + super(); + } + + /** + * This test expects AbfsInputStream to throw the exception that readAhead + * thread received on read. The readAhead thread must be initiated from the + * active read request itself. + * Also checks that the ReadBuffers are evicted as per the ReadBufferManager + * threshold criteria. + * @throws Exception + */ + @Test + public void testFailedReadAhead() throws Exception { + AbfsClient client = getMockAbfsClient(); + AbfsRestOperation successOp = getMockRestOp(); + + // Stub : + // Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() + // Actual read request fails with the failure in readahead thread + doThrow(new TimeoutException("Internal Server error for RAH-Thread-X")) + .doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y")) + .doThrow(new TimeoutException("Internal Server error RAH-Thread-Z")) + .doReturn(successOp) // Any extra calls to read, pass it. + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class)); + + AbfsInputStream inputStream = getAbfsInputStream(client, "testFailedReadAhead.txt"); + + // Scenario: ReadAhead triggered from current active read call failed + // Before the change to return exception from readahead buffer, + // AbfsInputStream would have triggered an extra readRemote on noticing + // data absent in readahead buffers + // In this test, a read should trigger 3 client.read() calls as file is 3 KB + // and readahead buffer size set in AbfsInputStream is 1 KB + // There should only be a total of 3 client.read() in this test. + intercept(IOException.class, + () -> inputStream.read(new byte[1 * KILOBYTE])); + + // Only the 3 readAhead threads should have triggered client.read + verifyReadCallCount(client, 3); + + // Stub returns success for the 4th read request, if ReadBuffers still + // persisted, ReadAheadManager getBlock would have returned exception. + checkEvictedStatus(inputStream, 0,false); + } + + /** + * The test expects AbfsInputStream to initiate a remote read request for + * the request offset and length when previous read ahead on the offset had failed. + * Also checks that the ReadBuffers are evicted as per the ReadBufferManager + * threshold criteria. + * @throws Exception + */ + @Test + public void testOlderReadAheadFailure() throws Exception { + AbfsClient client = getMockAbfsClient(); + AbfsRestOperation successOp = getMockRestOp(); + + // Stub : + // First Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() + // A second read request will see that readahead had failed for data in + // the requested offset range and also that it is an older readahead request. + // A system issue could have been resolved by now, so attempt a new read only for the requested range.
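+ // Mockito consecutive stubbing: the chained answers below are consumed in order, one per client.read() invocation, so the three read-aheads fail and any later call falls through to the doReturn(successOp) stubs.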
+ doThrow(new TimeoutException("Internal Server error for RAH-X")) + .doThrow(new TimeoutException("Internal Server error for RAH-Y")) + .doThrow(new TimeoutException("Internal Server error for RAH-Z")) + .doReturn(successOp) // pass the read for second read request + .doReturn(successOp) // pass success for post eviction test + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class)); + + AbfsInputStream inputStream = getAbfsInputStream(client, "testOlderReadAheadFailure.txt"); + + // First read request that fails as the readahead triggered from this request failed. + intercept(IOException.class, + () -> inputStream.read(new byte[1 * KILOBYTE])); + + // Only the 3 readAhead threads should have triggered client.read + verifyReadCallCount(client, 3); + + // Second read request should retry the read (and not issue any new readaheads) + inputStream.read(1 * KILOBYTE, new byte[1 * KILOBYTE], 0, 1 * KILOBYTE); + + // Once created, mock will remember all interactions. So total number of read + // calls will be one more from earlier (there is a reset mock which will reset the + // count, but the mock stub is erased as well which needs AbfsInputStream to be recreated, + // which beats the purpose) + verifyReadCallCount(client, 4); + + // Stub returns success for the 5th read request, if ReadBuffers still + // persisted request would have failed for position 0. + checkEvictedStatus(inputStream, 0,false); + } + + /** + * The test expects AbfsInputStream to utilize any data read ahead for + * requested offset and length. + * @throws Exception + */ + @Test + public void testSuccessfulReadAhead() throws Exception { + // Mock failure for client.read() + AbfsClient client = getMockAbfsClient(); + + // Success operation mock + AbfsRestOperation op = getMockRestOp(); + + // Stub : + // Pass all readAheads and fail the post eviction request to + // prove ReadAhead buffer is used + // for post eviction check, fail all read aheads + doReturn(op) + .doReturn(op) + .doReturn(op) + .doThrow(new TimeoutException("Internal Server error for RAH-X")) + .doThrow(new TimeoutException("Internal Server error for RAH-Y")) + .doThrow(new TimeoutException("Internal Server error for RAH-Z")) + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class)); + + AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt"); + + // First read request that triggers readAheads. + inputStream.read(new byte[1 * KILOBYTE]); + + // Only the 3 readAhead threads should have triggered client.read + verifyReadCallCount(client, 3); + + // Another read request whose requested data is already read ahead. + inputStream.read(1 * KILOBYTE, new byte[1 * KILOBYTE], 0, 1 * KILOBYTE); + + // Once created, mock will remember all interactions. + // As the above read should not have triggered any server calls, total + // number of read calls made at this point will be same as last. + verifyReadCallCount(client, 3); + + // Stub will throw exception for client.read() for 4th and later calls + // if not using the read-ahead buffer exception will be thrown on read + checkEvictedStatus(inputStream, 0, true); + } + + /** + * This test expects ReadAheadManager to throw exception if the read ahead + * thread had failed for the same queueRequestId. + * Also checks that the ReadBuffers are evicted as per the ReadBufferManager + * threshold criteria.
+ * @throws Exception + */ + @Test + public void testReadAheadManagerForFailedReadAhead() throws Exception { + AbfsClient client = getMockAbfsClient(); + AbfsRestOperation successOp = getMockRestOp(); + + // Stub : + // Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() + // Actual read request fails with the failure in readahead thread + doThrow(new TimeoutException("Internal Server error for RAH-Thread-X")) + .doThrow(new TimeoutException("Internal Server error for RAH-Thread-Y")) + .doThrow(new TimeoutException("Internal Server error RAH-Thread-Z")) + .doReturn(successOp) // Any extra calls to read, pass it. + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class)); + + AbfsInputStream inputStream = getAbfsInputStream(client, "testReadAheadManagerForFailedReadAhead.txt"); + + UUID queueRequestId = randomUUID(); + queueReadAheads(inputStream, queueRequestId); + + // AbfsInputStream Read would have waited for the read-ahead for the requested offset + // as we are testing from ReadAheadManager directly, sleep for a sec to + // get the read ahead threads to complete + Thread.sleep(1000); + + // if readAhead failed for a specific offset, getBlock should + // throw exception from the ReadBuffer when the queueRequestId matches. + intercept(IOException.class, + () -> ReadBufferManager.getBufferManager().getBlock( + inputStream + , 0 + , 1 * KILOBYTE + , new byte[1 * KILOBYTE] + , queueRequestId)); + + // Only the 3 readAhead threads should have triggered client.read + verifyReadCallCount(client, 3); + + // Stub returns success for the 4th read request, if ReadBuffers still + // persisted, ReadAheadManager getBlock would have returned exception. + checkEvictedStatus(inputStream, 0, false); + } + + /** + * The test expects ReadAheadManager to return 0 receivedBytes when previous + * read ahead on the offset had failed, and not rethrow the exception received at that time. + * Also checks that the ReadBuffers are evicted as per the ReadBufferManager + * threshold criteria. + * @throws Exception + */ + @Test + public void testReadAheadManagerForOlderReadAheadFailure() throws Exception { + AbfsClient client = getMockAbfsClient(); + AbfsRestOperation successOp = getMockRestOp(); + + // Stub : + // First Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() + // A second read request will see that readahead had failed for data in + // the requested offset range but also that it is an older readahead request. + // A system issue could have been resolved by now, so attempt a new read only for the requested range.
+ doThrow(new TimeoutException("Internal Server error for RAH-X")) + .doThrow(new TimeoutException("Internal Server error for RAH-X")) + .doThrow(new TimeoutException("Internal Server error for RAH-X")) + .doReturn(successOp) // pass the read for second read request + .doReturn(successOp) // pass success for post eviction test + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class)); + + AbfsInputStream inputStream = getAbfsInputStream(client, "testReadAheadManagerForOlderReadAheadFailure.txt"); + + UUID queueRequestId = randomUUID(); + queueReadAheads(inputStream, queueRequestId); + + // AbfsInputStream Read would have waited for the read-ahead for the requested offset + // as we are testing from ReadAheadManager directly, sleep for a sec to + // get the read ahead threads to complete + Thread.sleep(1000); + + // Only the 3 readAhead threads should have triggered client.read + verifyReadCallCount(client, 3); + + // getBlock from a new read request should return 0 if there is a failure + // previously in reading ahead from that offset. + int bytesRead = ReadBufferManager.getBufferManager().getBlock( + inputStream + ,1 * KILOBYTE + , 1 * KILOBYTE + , new byte[1 * KILOBYTE] + , randomUUID()); + Assert.assertTrue("bytesRead should be zero when previously read " + + "ahead buffer had failed", bytesRead == 0); + + // Stub returns success for the 5th read request, if ReadBuffers still + // persisted request would have failed for position 0. + checkEvictedStatus(inputStream, 0, false); + } + + /** + * The test expects ReadAheadManager to return data from previously read + * ahead data of same offset. + * @throws Exception + */ + @Test + public void testReadAheadManagerForSuccessfulReadAhead() throws Exception { + // Mock failure for client.read() + AbfsClient client = getMockAbfsClient(); + + // Success operation mock + AbfsRestOperation op = getMockRestOp(); + + // Stub : + // Pass all readAheads and fail the post eviction request to + // prove ReadAhead buffer is used + doReturn(op) + .doReturn(op) + .doReturn(op) + .doThrow(new TimeoutException("Internal Server error for RAH-X")) // for post eviction request + .doThrow(new TimeoutException("Internal Server error for RAH-Y")) + .doThrow(new TimeoutException("Internal Server error for RAH-Z")) + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class)); + + AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt"); + + UUID queueRequestId = randomUUID(); + queueReadAheads(inputStream, queueRequestId); + + // AbfsInputStream Read would have waited for the read-ahead for the requested offset + // as we are testing from ReadAheadManager directly, sleep for a sec to + // get the read ahead threads to complete + Thread.sleep(1000); + + // Only the 3 readAhead threads should have triggered client.read + verifyReadCallCount(client, 3); + + // getBlock for a new read should return the buffer read-ahead + int bytesRead = ReadBufferManager.getBufferManager().getBlock( + inputStream + , 1 * KILOBYTE + , 1 * KILOBYTE + , new byte[1 * KILOBYTE] + , randomUUID()); + + Assert.assertTrue("bytesRead should be non-zero from the " + + "buffer that was read-ahead", bytesRead > 0); + + // Once created, mock will remember all interactions. + // As the above read should not have triggered any server calls, total + // number of read calls made at this point will be same as last. 
+ verifyReadCallCount(client, 3); + + // Stub will throw exception for client.read() for 4th and later calls + // if not using the read-ahead buffer exception will be thrown on read + checkEvictedStatus(inputStream, 0, true); + } +} \ No newline at end of file From 5870238c3f1b017875fc05b7b709231129c411f1 Mon Sep 17 00:00:00 2001 From: Sneha Vijayarajan Date: Thu, 19 Mar 2020 16:08:28 +0530 Subject: [PATCH 02/11] Incorporating review comments from Vinay --- .../fs/azurebfs/services/AbfsInputStream.java | 8 +--- .../fs/azurebfs/services/ReadBuffer.java | 12 ------ .../azurebfs/services/ReadBufferManager.java | 33 ++++++++------- .../services/TestAbfsInputStream.java | 41 ++++++++----------- 4 files changed, 38 insertions(+), 56 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index b42e2019c9b92..d66da5bda89ee 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -22,9 +22,7 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.HttpURLConnection; -import java.util.UUID; -import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.slf4j.Logger; @@ -194,19 +192,18 @@ private int readInternal(final long position, final byte[] b, final int offset, int receivedBytes; // queue read-aheads - UUID queueReadAheadRequestId = UUID.randomUUID(); int numReadAheads = this.readAheadQueueDepth; long nextSize; long nextOffset = position; while (numReadAheads > 0 && nextOffset < contentLength) { nextSize = Math.min((long) bufferSize, contentLength - nextOffset); - ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize, queueReadAheadRequestId); + ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize); nextOffset = nextOffset + nextSize; numReadAheads--; } // try reading from buffers first - receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b, queueReadAheadRequestId); + receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b); if (receivedBytes > 0) { return receivedBytes; } @@ -219,7 +216,6 @@ private int readInternal(final long position, final byte[] b, final int offset, } } - @VisibleForTesting int readRemote(long position, byte[] b, int offset, int length) throws IOException { if (position < 0) { throw new IllegalArgumentException("attempting to read from negative offset"); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java index e6750b0aef945..f19f99d14f9a7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; import java.util.concurrent.CountDownLatch; -import java.util.UUID; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; @@ -44,9 +43,6 @@ class ReadBuffer { private boolean isAnyByteConsumed = false; private IOException errException 
= null; - // The unique queue request ID which identifies all the read buffers triggered from - // a single AbfsInputStream read request. - private UUID queueReadAheadRequestId = null; public AbfsInputStream getStream() { return stream; @@ -104,14 +100,6 @@ public void setErrException(final java.io.IOException errException) { this.errException = errException; } - public UUID getQueueReadAheadRequestId() { - return queueReadAheadRequestId; - } - - public void setQueueReadAheadRequestId(final UUID queueReadAheadRequestId) { - this.queueReadAheadRequestId = queueReadAheadRequestId; - } - public ReadBufferStatus getStatus() { return status; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 373de1005dad1..20bf3b5f3c863 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -27,7 +27,6 @@ import java.util.Queue; import java.util.Stack; import java.util.concurrent.CountDownLatch; -import java.util.UUID; import com.google.common.annotations.VisibleForTesting; @@ -94,10 +93,8 @@ private ReadBufferManager() { * @param stream The {@link AbfsInputStream} for which to do the read-ahead * @param requestedOffset The offset in the file which should be read * @param requestedLength The length to read - * @param queueReadAheadRequestId unique queue request ID */ - void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength - , final UUID queueReadAheadRequestId) { + void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, final int requestedLength) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("Start Queueing readAhead for {} offset {} length {}", stream.getPath(), requestedOffset, requestedLength); @@ -119,7 +116,6 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi buffer.setRequestedLength(requestedLength); buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); buffer.setLatch(new CountDownLatch(1)); - buffer.setQueueReadAheadRequestId(queueReadAheadRequestId); Integer bufferIndex = freeList.pop(); // will return a value, since we have checked size > 0 already @@ -149,8 +145,8 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi * @param buffer the buffer to read data into. Note that the buffer will be written into from offset 0.
* @return the number of bytes read */ - int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer, - final UUID queueReadAheadRequestId) throws IOException { + int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) + throws java.io.IOException { // not synchronized, so have to be careful with locking if (LOGGER.isTraceEnabled()) { LOGGER.trace("getBlock for file {} position {} thread {}", @@ -161,7 +157,7 @@ int getBlock(final AbfsInputStream stream, final long position, final int length int bytesRead = 0; synchronized (this) { - bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer, queueReadAheadRequestId); + bytesRead = getBlockFromCompletedQueue(stream, position, length, buffer); } if (bytesRead > 0) { if (LOGGER.isTraceEnabled()) { @@ -303,6 +299,12 @@ private ReadBuffer getFromList(final Collection list, final AbfsInpu return null; } + /** + * Returns buffers that failed or passed from completed queue + * @param stream + * @param requestedOffset + * @return + */ private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) { for (ReadBuffer buffer : completedReadList) { if ((buffer.getStream() == stream) && (requestedOffset >= buffer.getOffset())) { @@ -326,8 +328,7 @@ private void clearFromReadAheadQueue(final AbfsInputStream stream, final long re } private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long position, final int length, - final byte[] buffer, final UUID queueReadAheadRequestId) - throws IOException { + final byte[] buffer) throws IOException { ReadBuffer buf = getBufferFromCompletedQueue(stream, position); if (buf == null) { @@ -335,10 +336,12 @@ private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long } if (buf.getStatus() == ReadBufferStatus.READ_FAILED) { - // is this failure from the requests queued by current queueReadAheadRequestId ? - // if yes, return failure exception - // else return 0 so that the main thread can make an attempt to read requested offset. - if (buf.getQueueReadAheadRequestId().equals(queueReadAheadRequestId)) { + // Eviction of a read buffer is triggered only when a queue request comes in + // and each eviction attempt tries to find one eligible buffer. + // Hence there are chances that an old read-ahead buffer with exception is still + // available. 
To prevent new read requests from failing due to such old buffers, + // return exception only from buffers that failed within the last THRESHOLD_AGE_MILLISECONDS + if ((currentTimeMillis() - (buf.getTimeStamp()) < THRESHOLD_AGE_MILLISECONDS)) { // is read ahead issued from current queue request ID throw buf.getErrException(); } else { return 0; @@ -416,7 +419,6 @@ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final i inProgressList.remove(buffer); if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { buffer.setStatus(ReadBufferStatus.AVAILABLE); - buffer.setTimeStamp(currentTimeMillis()); buffer.setLength(bytesActuallyRead); completedReadList.add(buffer); } else { @@ -425,6 +427,7 @@ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final i } buffer.setStatus(result); + buffer.setTimeStamp(currentTimeMillis()); completedReadList.add(buffer); } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 05b091595eb4d..bd9bd38290f88 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -19,7 +19,6 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; -import java.util.UUID; import org.junit.Assert; import org.junit.Test; @@ -28,7 +27,6 @@ import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; -import static java.util.UUID.randomUUID; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; @@ -82,14 +80,14 @@ private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fil return inputStream; } - private void queueReadAheads(AbfsInputStream inputStream, UUID queueRequestId) { + private void queueReadAheads(AbfsInputStream inputStream) { // Mimic AbfsInputStream readAhead queue requests ReadBufferManager.getBufferManager() - .queueReadAhead(inputStream, 0, 1 * KILOBYTE, queueRequestId); + .queueReadAhead(inputStream, 0, 1 * KILOBYTE); ReadBufferManager.getBufferManager() - .queueReadAhead(inputStream, 1 * KILOBYTE, 1 * KILOBYTE, queueRequestId); + .queueReadAhead(inputStream, 1 * KILOBYTE, 1 * KILOBYTE); ReadBufferManager.getBufferManager() - .queueReadAhead(inputStream, 2 * KILOBYTE, 1 * KILOBYTE, queueRequestId); + .queueReadAhead(inputStream, 2 * KILOBYTE, 1 * KILOBYTE); } private void verifyReadCallCount(AbfsClient client, int count) throws @@ -208,6 +206,9 @@ public void testOlderReadAheadFailure() throws Exception { // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); + // Sleep for 30 sec so that the read ahead buffers qualify for being old.
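+ // (The wait below reuses the test getter added in the first patch; it exposes the same THRESHOLD_AGE_MILLISECONDS window that getBlockFromCompletedQueue now checks before rethrowing a stored failure.)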
+ Thread.sleep(ReadBufferManager.getBufferManager().getThreshold_age_milliseconds()); + // Second read request should retry the read (and not issue any new readaheads) inputStream.read(1 * KILOBYTE, new byte[1 * KILOBYTE], 0, 1 * KILOBYTE); @@ -295,8 +296,7 @@ public void testReadAheadManagerForFailedReadAhead() throws Exception { AbfsInputStream inputStream = getAbfsInputStream(client, "testReadAheadManagerForFailedReadAhead.txt"); - UUID queueRequestId = randomUUID(); - queueReadAheads(inputStream, queueRequestId); + queueReadAheads(inputStream); // AbfsInputStream Read would have waited for the read-ahead for the requested offset // as we are testing from ReadAheadManager directly, sleep for a sec to @@ -304,14 +304,13 @@ public void testReadAheadManagerForFailedReadAhead() throws Exception { Thread.sleep(1000); // if readAhead failed for a specific offset, getBlock should - // throw exception from the ReadBuffer when the queueRequestId matches. + // throw exception from the ReadBuffer that failed within the last 30 sec intercept(IOException.class, () -> ReadBufferManager.getBufferManager().getBlock( inputStream , 0 , 1 * KILOBYTE - , new byte[1 * KILOBYTE] - , queueRequestId)); + , new byte[1 * KILOBYTE])); // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); @@ -349,25 +348,23 @@ public void testReadAheadManagerForOlderReadAheadFailure() throws Exception { AbfsInputStream inputStream = getAbfsInputStream(client, "testReadAheadManagerForOlderReadAheadFailure.txt"); - UUID queueRequestId = randomUUID(); - queueReadAheads(inputStream, queueRequestId); + queueReadAheads(inputStream); // AbfsInputStream Read would have waited for the read-ahead for the requested offset - // as we are testing from ReadAheadManager directly, sleep for a sec to - // get the read ahead threads to complete - Thread.sleep(1000); + // as we are testing from ReadAheadManager directly, sleep for 30 secs so that + // read buffer qualifies to be an old buffer + Thread.sleep(ReadBufferManager.getBufferManager().getThreshold_age_milliseconds()); // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); // getBlock from a new read request should return 0 if there is a failure - // previously in reading ahead from that offset. + // more than 30 sec earlier in the read ahead buffer for the respective offset.
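// (A zero return from getBlock means the real caller, AbfsInputStream.readInternal, would fall back to a direct readRemote of the requested range instead of failing the read.)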
int bytesRead = ReadBufferManager.getBufferManager().getBlock( inputStream ,1 * KILOBYTE , 1 * KILOBYTE - , new byte[1 * KILOBYTE] - , randomUUID()); + , new byte[1 * KILOBYTE]); Assert.assertTrue("bytesRead should be zero when previously read " + "ahead buffer had failed", bytesRead == 0); @@ -404,8 +401,7 @@ public void testReadAheadManagerForSuccessfulReadAhead() throws Exception { AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt"); - UUID queueRequestId = randomUUID(); - queueReadAheads(inputStream, queueRequestId); + queueReadAheads(inputStream); // AbfsInputStream Read would have waited for the read-ahead for the requested offset // as we are testing from ReadAheadManager directly, sleep for a sec to @@ -420,8 +416,7 @@ public void testReadAheadManagerForSuccessfulReadAhead() throws Exception { inputStream , 1 * KILOBYTE , 1 * KILOBYTE - , new byte[1 * KILOBYTE] - , randomUUID()); + , new byte[1 * KILOBYTE]); Assert.assertTrue("bytesRead should be non-zero from the " + "buffer that was read-ahead", bytesRead > 0); From 1cb20dbff9e353d7cd8d4bf400721a205ab3e234 Mon Sep 17 00:00:00 2001 From: Sneha Vijayarajan Date: Thu, 19 Mar 2020 16:21:50 +0530 Subject: [PATCH 03/11] Review comments from Inigo --- .../org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java | 2 +- .../apache/hadoop/fs/azurebfs/services/ReadBufferManager.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index d66da5bda89ee..610256f45bd28 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -238,7 +238,7 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti final AbfsRestOperation op; AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) { - LOG.trace(String.format("Trigger client.read for path=%s position=%s offset=%s length=%s", path, position, offset, length)); + LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length); op = client.read(path, position, b, offset, length, tolerateOobAppends ? 
"*" : eTag); perfInfo.registerResult(op.getResult()).registerSuccess(true); } catch (AzureBlobFileSystemException ex) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 20bf3b5f3c863..c7a789c2ef935 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -104,7 +104,6 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi if (isAlreadyQueued(stream, requestedOffset)) { return; // already queued, do not queue again } - if (freeList.isEmpty() && !tryEvict()) { return; // no buffers available, cannot queue anything } From dbf025459034539381857b83b6d6b41eae8cebf2 Mon Sep 17 00:00:00 2001 From: Sneha Vijayarajan Date: Mon, 23 Mar 2020 11:49:41 +0530 Subject: [PATCH 04/11] Fix for read-ahead length --- .../fs/azurebfs/services/ReadBufferManager.java | 17 +++++++---------- .../fs/azurebfs/services/ReadBufferWorker.java | 10 +++++++++- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index c7a789c2ef935..91e060c12c63a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -306,13 +306,15 @@ private ReadBuffer getFromList(final Collection list, final AbfsInpu */ private ReadBuffer getBufferFromCompletedQueue(final AbfsInputStream stream, final long requestedOffset) { for (ReadBuffer buffer : completedReadList) { - if ((buffer.getStream() == stream) && (requestedOffset >= buffer.getOffset())) { - if ((requestedOffset < buffer.getOffset() + buffer.getLength()) - || requestedOffset < buffer.getOffset() + buffer.getRequestedLength()) { + // Buffer is returned if the requestedOffset is at or above buffer's + // offset but less than buffer's length or the actual requestedLength + if ((buffer.getStream() == stream) + && (requestedOffset >= buffer.getOffset()) + && ((requestedOffset < buffer.getOffset() + buffer.getLength()) + || (requestedOffset < buffer.getOffset() + buffer.getRequestedLength()))) { return buffer; } } - } return null; } @@ -335,13 +337,9 @@ private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long } if (buf.getStatus() == ReadBufferStatus.READ_FAILED) { - // Eviction of a read buffer is triggered only when a queue request comes in - // and each eviction attempt tries to find one eligible buffer. - // Hence there are chances that an old read-ahead buffer with exception is still - // available. 
To prevent new read requests from failing due to such old buffers, + // To prevent new read requests from failing due to old read-ahead attempts, // return exception only from buffers that failed within the last THRESHOLD_AGE_MILLISECONDS if ((currentTimeMillis() - (buf.getTimeStamp()) < THRESHOLD_AGE_MILLISECONDS)) { - // is read ahead issued from current queue request ID throw buf.getErrException(); } else { return 0; @@ -413,7 +411,6 @@ void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final i LOGGER.trace("ReadBufferWorker completed file {} for offset {} bytes {}", buffer.getStream().getPath(), buffer.getOffset(), bytesActuallyRead); } - synchronized (this) { inProgressList.remove(buffer); if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java index aa7c25ca0c83c..4f128c61b9bfb 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java @@ -62,7 +62,15 @@ public void run() { if (buffer != null) { try { // do the actual read, from the file. - int bytesRead = buffer.getStream().readRemote(buffer.getOffset(), buffer.getBuffer(), 0, buffer.getRequestedLength()); + int bytesRead = buffer.getStream().readRemote( + buffer.getOffset() + , buffer.getBuffer() + , 0 + // If AbfsInputStream was created with bigger buffer size than + // read-ahead buffer size, make sure a valid length is passed + // for remote read + , Math.min(buffer.getRequestedLength(), buffer.getBuffer().length)); + bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager } catch (Exception ex) { buffer.setErrException(new IOException(ex)); From 4ba43efe00c9d00d9543493ddfb502e02a58dd69 Mon Sep 17 00:00:00 2001 From: Sneha Vijayarajan Date: Fri, 27 Mar 2020 11:12:36 +0530 Subject: [PATCH 05/11] Fixing some test comments --- .../hadoop/fs/azurebfs/services/TestAbfsInputStream.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index bd9bd38290f88..1b3501fdda436 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -187,7 +187,7 @@ public void testOlderReadAheadFailure() throws Exception { // First Read request leads to 3 readahead calls: Fail all 3 readahead-client.read() // A second read request will see that readahead had failed for data in // the requested offset range and also that it is an older readahead request. - // A system issue could have been resolved by now, so attempt a new read only for the requested range. + // So attempt a new read only for the requested range.
doThrow(new TimeoutException("Internal Server error for RAH-X")) .doThrow(new TimeoutException("Internal Server error for RAH-Y")) .doThrow(new TimeoutException("Internal Server error for RAH-Z")) @@ -206,7 +206,7 @@ public void testOlderReadAheadFailure() throws Exception { // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); - // Sleep for 30 sec so that the read ahead buffers qualify for being old. + // Sleep for 30 sec so that the read ahead buffer qualifies for being old. Thread.sleep(ReadBufferManager.getBufferManager().getThreshold_age_milliseconds()); // Second read request should retry the read (and not issue any new readaheads) @@ -273,7 +273,7 @@ public void testSuccessfulReadAhead() throws Exception { /** * This test expects ReadAheadManager to throw exception if the read ahead - * thread had failed for the same queueRequestId. + * thread had failed within the last 30 sec. * Also checks that the ReadBuffers are evicted as per the ReadBufferManager * threshold criteria. * @throws Exception From 772c2757fc62e92f4b81841ee84baf885331f46b Mon Sep 17 00:00:00 2001 From: Sneha Vijayarajan Date: Mon, 30 Mar 2020 16:48:34 +0530 Subject: [PATCH 06/11] Fixing full namespace use of IOException --- .../java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java | 2 +- .../apache/hadoop/fs/azurebfs/services/ReadBufferManager.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java index f19f99d14f9a7..71a01b2a1f2e0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java @@ -96,7 +96,7 @@ public IOException getErrException() { return errException; } - public void setErrException(final java.io.IOException errException) { + public void setErrException(final IOException errException) { this.errException = errException; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 91e060c12c63a..7439558cffb0f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -145,7 +145,7 @@ void queueReadAhead(final AbfsInputStream stream, final long requestedOffset, fi * @return the number of bytes read */ int getBlock(final AbfsInputStream stream, final long position, final int length, final byte[] buffer) - throws java.io.IOException { + throws IOException { // not synchronized, so have to be careful with locking if (LOGGER.isTraceEnabled()) { LOGGER.trace("getBlock for file {} position {} thread {}", From 035fa7aac34fac4098cfe21d74324ae269cfd547 Mon Sep 17 00:00:00 2001 From: Sneha Vijayarajan Date: Mon, 6 Apr 2020 15:09:40 +0530 Subject: [PATCH 07/11] Fixing NoWhiteSpaceBefore checkstyle errors --- .../azurebfs/services/ReadBufferManager.java | 2 +- .../azurebfs/services/ReadBufferWorker.java | 8 +-- .../services/TestAbfsInputStream.java | 56 +++++++++---------- 3 files changed, 33 insertions(+), 33 deletions(-) diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 7439558cffb0f..9dae880ae5e72 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -445,7 +445,7 @@ private long currentTimeMillis() { } @VisibleForTesting - int getThreshold_age_milliseconds() { + int getThresholdAgeMilliseconds() { return THRESHOLD_AGE_MILLISECONDS; } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java index 4f128c61b9bfb..41acd7e06f132 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java @@ -63,13 +63,13 @@ public void run() { try { // do the actual read, from the file. int bytesRead = buffer.getStream().readRemote( - buffer.getOffset() - , buffer.getBuffer() - , 0 + buffer.getOffset(), + buffer.getBuffer(), + 0, // If AbfsInputStream was created with bigger buffer size than // read-ahead buffer size, make sure a valid length is passed // for remote read - , Math.min(buffer.getRequestedLength(), buffer.getBuffer().length)); + Math.min(buffer.getRequestedLength(), buffer.getBuffer().length)); bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead); // post result back to ReadBufferManager } catch (Exception ex) { diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 1b3501fdda436..54101110a1baa 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -57,8 +57,8 @@ private AbfsClient getMockAbfsClient() { // Mock failure for client.read() AbfsClient client = mock(AbfsClient.class); AbfsPerfTracker tracker = new AbfsPerfTracker( - "test" - , this.getAccountName(), + "test", + this.getAccountName(), this.getConfiguration()); when(client.getAbfsPerfTracker()).thenReturn(tracker); @@ -68,14 +68,14 @@ private AbfsClient getMockAbfsClient() { private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fileName) { // Create AbfsInputStream with the client instance AbfsInputStream inputStream = new AbfsInputStream( - mockAbfsClient - , null - , FORWARD_SLASH + fileName - , 3 * KILOBYTE - , 1 * KILOBYTE // Setting read ahead buffer size of 1 KB - , this.getConfiguration().getReadAheadQueueDepth() - , this.getConfiguration().getTolerateOobAppends() - , "eTag"); + mockAbfsClient, + null, + FORWARD_SLASH + fileName, + 3 * KILOBYTE, + 1 * KILOBYTE, // Setting read ahead buffer size of 1 KB + this.getConfiguration().getReadAheadQueueDepth(), + this.getConfiguration().getTolerateOobAppends(), + "eTag"); return inputStream; } @@ -103,14 +103,14 @@ private void verifyReadCallCount(AbfsClient client, int count) throws private void checkEvictedStatus(AbfsInputStream inputStream, int position, boolean 
expectedToThrowException) throws Exception { // Sleep for the eviction threshold time - Thread.sleep(ReadBufferManager.getBufferManager().getThreshold_age_milliseconds() + 1000); + Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds() + 1000); // Eviction is done only when AbfsInputStream tries to queue new items. // 1 tryEvict will remove 1 eligible item. To ensure that the current test buffer // will get evicted (considering there could be other tests running in parallel), // call tryEvict for the number of items that are there in completedReadList. int numOfCompletedReadListItems = ReadBufferManager.getBufferManager().getCompletedReadListSize(); - while(numOfCompletedReadListItems > 0) { + while (numOfCompletedReadListItems > 0) { ReadBufferManager.getBufferManager().callTryEvict(); numOfCompletedReadListItems--; } @@ -168,7 +168,7 @@ public void testFailedReadAhead() throws Exception { // Stub returns success for the 4th read request, if ReadBuffers still // persisted, ReadAheadManager getBlock would have returned exception. - checkEvictedStatus(inputStream, 0,false); + checkEvictedStatus(inputStream, 0, false); } /** @@ -207,7 +207,7 @@ public void testOlderReadAheadFailure() throws Exception { verifyReadCallCount(client, 3); // Sleep for 30 sec so that the read ahead buffer qualifies for being old. - Thread.sleep(ReadBufferManager.getBufferManager().getThreshold_age_milliseconds()); + Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds()); // Second read request should retry the read (and not issue any new readaheads) inputStream.read(1 * KILOBYTE, new byte[1 * KILOBYTE], 0, 1 * KILOBYTE); @@ -220,7 +220,7 @@ // Stub returns success for the 5th read request, if ReadBuffers still // persisted request would have failed for position 0. - checkEvictedStatus(inputStream, 0,false); + checkEvictedStatus(inputStream, 0, false); } /** @@ -307,10 +307,10 @@ public void testReadAheadManagerForFailedReadAhead() throws Exception { // throw exception from the ReadBuffer that failed within last 30 sec intercept(IOException.class, () -> ReadBufferManager.getBufferManager().getBlock( - inputStream - , 0 - , 1 * KILOBYTE - , new byte[1 * KILOBYTE])); + inputStream, + 0, + 1 * KILOBYTE, + new byte[1 * KILOBYTE])); // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); @@ -353,7 +353,7 @@ public void testReadAheadManagerForOlderReadAheadFailure() throws Exception { // AbfsInputStream Read would have waited for the read-ahead for the requested offset // as we are testing from ReadAheadManager directly, sleep for 30 secs so that // read buffer qualifies to be an old buffer - Thread.sleep(ReadBufferManager.getBufferManager().getThreshold_age_milliseconds()); + Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds()); // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); // getBlock from a new read request should return 0 if there is a failure // 30 sec before in read ahead buffer for respective offset.
int bytesRead = ReadBufferManager.getBufferManager().getBlock( - inputStream - ,1 * KILOBYTE - , 1 * KILOBYTE - , new byte[1 * KILOBYTE]); + inputStream, + 1 * KILOBYTE, + 1 * KILOBYTE, + new byte[1 * KILOBYTE]); Assert.assertTrue("bytesRead should be zero when previously read " + "ahead buffer had failed", bytesRead == 0); @@ -413,10 +413,10 @@ public void testReadAheadManagerForSuccessfulReadAhead() throws Exception { // getBlock for a new read should return the buffer read-ahead int bytesRead = ReadBufferManager.getBufferManager().getBlock( - inputStream - , 1 * KILOBYTE - , 1 * KILOBYTE - , new byte[1 * KILOBYTE]); + inputStream, + 1 * KILOBYTE, + 1 * KILOBYTE, + new byte[1 * KILOBYTE]); Assert.assertTrue("bytesRead should be non-zero from the " + "buffer that was read-ahead", bytesRead > 0); From 52fba323269e6272849b89824dc372398ead4894 Mon Sep 17 00:00:00 2001 From: Sneha Vijayarajan Date: Tue, 7 Apr 2020 09:29:02 +0530 Subject: [PATCH 08/11] Checkstyle LOG field --- .../org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 4b28f487ac222..f00476daaec8c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -43,7 +43,7 @@ */ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, StreamCapabilities { - private static Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class); + private static final Logger LOG = LoggerFactory.getLogger(AbfsInputStream.class); private final AbfsClient client; private final Statistics statistics; From 58ac7019dd59b3f56c8ff7c438b45ac6f43421b3 Mon Sep 17 00:00:00 2001 From: Sneha Vijayarajan Date: Wed, 22 Apr 2020 16:04:59 +0530 Subject: [PATCH 09/11] Review comments --- .../fs/azurebfs/services/ReadBuffer.java | 1 + .../azurebfs/services/ReadBufferManager.java | 18 ++++-- .../services/TestAbfsInputStream.java | 59 ++++++++++--------- 3 files changed, 45 insertions(+), 33 deletions(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java index 71a01b2a1f2e0..5d55726222de7 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.fs.azurebfs.services; + import java.io.IOException; import java.util.concurrent.CountDownLatch; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 9dae880ae5e72..c551b37392a8f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -39,8 +39,9 @@ final class ReadBufferManager { private static final int NUM_BUFFERS = 16; private static final int BLOCK_SIZE = 4 * 1024 * 1024; private static final 
int NUM_THREADS = 8; - private static final int THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold + private static final int DEFAULT_THRESHOLD_AGE_MILLISECONDS = 3000; // have to see if 3 seconds is a good threshold + private static int thresholdAgeMilliseconds = DEFAULT_THRESHOLD_AGE_MILLISECONDS; private Thread[] threads = new Thread[NUM_THREADS]; private byte[][] buffers; // array of byte[] buffers, to hold the data that is read private Stack freeList = new Stack<>(); // indices in buffers[] array that are available @@ -248,7 +249,7 @@ private synchronized boolean tryEvict() { earliestBirthday = buf.getTimeStamp(); } } - if ((currentTimeMillis() - earliestBirthday > THRESHOLD_AGE_MILLISECONDS) && (nodeToEvict != null)) { + if ((currentTimeMillis() - earliestBirthday > thresholdAgeMilliseconds) && (nodeToEvict != null)) { return evict(nodeToEvict); } @@ -299,7 +300,7 @@ private ReadBuffer getFromList(final Collection list, final AbfsInpu } /** - * Returns buffers that failed or passed from completed queue + * Returns buffers that failed or passed from completed queue. * @param stream * @param requestedOffset * @return @@ -338,8 +339,8 @@ private int getBlockFromCompletedQueue(final AbfsInputStream stream, final long if (buf.getStatus() == ReadBufferStatus.READ_FAILED) { // To prevent new read requests from failing due to old read-ahead attempts, - // return exception only from buffers that failed within last THRESHOLD_AGE_MILLISECONDS - if ((currentTimeMillis() - (buf.getTimeStamp()) < THRESHOLD_AGE_MILLISECONDS)) { + // return exception only from buffers that failed within last thresholdAgeMilliseconds + if ((currentTimeMillis() - (buf.getTimeStamp()) < thresholdAgeMilliseconds)) { throw buf.getErrException(); } else { return 0; @@ -446,7 +447,12 @@ private long currentTimeMillis() { @VisibleForTesting int getThresholdAgeMilliseconds() { - return THRESHOLD_AGE_MILLISECONDS; + return thresholdAgeMilliseconds; + } + + @VisibleForTesting + void setThresholdAgeMilliseconds(int thresholdAgeMs) { + thresholdAgeMilliseconds = thresholdAgeMs; } @VisibleForTesting diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 54101110a1baa..8c5f978835193 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -43,7 +43,9 @@ public class TestAbfsInputStream extends AbstractAbfsIntegrationTest { - private static final int KILOBYTE = 1024; + private static final int _1KB = 1 * 1024; + private static final int _2KB = 2 * 1024; + private static final int _3KB = 3 * 1024; private AbfsRestOperation getMockRestOp() { AbfsRestOperation op = mock(AbfsRestOperation.class); @@ -71,8 +73,8 @@ private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fil mockAbfsClient, null, FORWARD_SLASH + fileName, - 3 * KILOBYTE, - 1 * KILOBYTE, // Setting read ahead buffer size of 1 KB + _3KB, + _1KB, // Setting read ahead buffer size of 1 KB this.getConfiguration().getReadAheadQueueDepth(), this.getConfiguration().getTolerateOobAppends(), "eTag"); @@ -83,11 +85,11 @@ private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fil private void queueReadAheads(AbfsInputStream inputStream) { // Mimic AbfsInputStream readAhead
queue requests ReadBufferManager.getBufferManager() - .queueReadAhead(inputStream, 0, 1 * KILOBYTE); + .queueReadAhead(inputStream, 0, _1KB); ReadBufferManager.getBufferManager() - .queueReadAhead(inputStream, 1 * KILOBYTE, 1 * KILOBYTE); + .queueReadAhead(inputStream, _1KB, _1KB); ReadBufferManager.getBufferManager() - .queueReadAhead(inputStream, 2 * KILOBYTE, 1 * KILOBYTE); + .queueReadAhead(inputStream, _2KB, _2KB); } private void verifyReadCallCount(AbfsClient client, int count) throws @@ -117,14 +119,16 @@ private void checkEvictedStatus(AbfsInputStream inputStream, int position, boole if (expectedToThrowException) { intercept(IOException.class, - () -> inputStream.read(position, new byte[1 * KILOBYTE], 0, 1 * KILOBYTE)); + () -> inputStream.read(position, new byte[_1KB], 0, _1KB)); } else { - inputStream.read(position, new byte[1 * KILOBYTE], 0, 1 * KILOBYTE); + inputStream.read(position, new byte[_1KB], 0, _1KB); } } public TestAbfsInputStream() throws Exception { super(); + // Reduce thresholdAgeMilliseconds to 3 sec for the tests + ReadBufferManager.getBufferManager().setThresholdAgeMilliseconds(3000); } /** @@ -161,7 +165,7 @@ public void testFailedReadAhead() throws Exception { // and readahead buffer size set in AbfsInputStream is 1 KB // There should only be a total of 3 client.read() in this test. intercept(IOException.class, - () -> inputStream.read(new byte[1 * KILOBYTE])); + () -> inputStream.read(new byte[_1KB])); // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); @@ -201,16 +205,16 @@ public void testOlderReadAheadFailure() throws Exception { // First read request that fails as the readahead triggered from this request failed. intercept(IOException.class, - () -> inputStream.read(new byte[1 * KILOBYTE])); + () -> inputStream.read(new byte[_1KB])); // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); - // Sleep for 30 sec so that the read ahead buffer qualifies for being old. + // Sleep for thresholdAgeMs so that the read ahead buffer qualifies for being old. Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds()); // Second read request should retry the read (and not issue any new readaheads) - inputStream.read(1 * KILOBYTE, new byte[1 * KILOBYTE], 0, 1 * KILOBYTE); + inputStream.read(_1KB, new byte[_1KB], 0, _1KB); // Once created, mock will remember all interactions. So total number of read // calls will be one more from earlier (there is a reset mock which will reset the @@ -253,13 +257,13 @@ public void testSuccessfulReadAhead() throws Exception { AbfsInputStream inputStream = getAbfsInputStream(client, "testSuccessfulReadAhead.txt"); // First read request that triggers readAheads. - inputStream.read(new byte[1 * KILOBYTE]); + inputStream.read(new byte[_1KB]); // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); // Another read request whose requested data is already read ahead. - inputStream.read(1 * KILOBYTE, new byte[1 * KILOBYTE], 0, 1 * KILOBYTE); + inputStream.read(_1KB, new byte[_1KB], 0, _1KB); // Once created, mock will remember all interactions. // As the above read should not have triggered any server calls, total @@ -273,7 +277,7 @@ public void testSuccessfulReadAhead() throws Exception { /** * This test expects ReadAheadManager to throw exception if the read ahead - * thread had failed within the last 30 sec. + * thread had failed within the last thresholdAgeMilliseconds. 
* Also checks that the ReadBuffers are evicted as per the ReadBufferManager * threshold criteria. * @throws Exception @@ -304,13 +308,13 @@ public void testReadAheadManagerForFailedReadAhead() throws Exception { Thread.sleep(1000); // if readAhead failed for specific offset, getBlock should - // throw exception from the ReadBuffer that failed within last 30 sec + // throw exception from the ReadBuffer that failed within last thresholdAgeMilliseconds intercept(IOException.class, () -> ReadBufferManager.getBufferManager().getBlock( inputStream, 0, - 1 * KILOBYTE, - new byte[1 * KILOBYTE])); + _1KB, + new byte[_1KB])); // Only the 3 readAhead threads should have triggered client.read verifyReadCallCount(client, 3); @@ -351,7 +355,7 @@ public void testReadAheadManagerForOlderReadAheadFailure() throws Exception { queueReadAheads(inputStream); // AbfsInputStream Read would have waited for the read-ahead for the requested offset - // as we are testing from ReadAheadManager directly, sleep for 30 secs so that + // as we are testing from ReadAheadManager directly, sleep for thresholdAgeMilliseconds so that // read buffer qualifies to be an old buffer Thread.sleep(ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds()); @@ -362,11 +366,11 @@ public void testReadAheadManagerForOlderReadAheadFailure() throws Exception { // 30 sec before in read ahead buffer for respective offset. int bytesRead = ReadBufferManager.getBufferManager().getBlock( inputStream, - 1 * KILOBYTE, - 1 * KILOBYTE, - new byte[1 * KILOBYTE]); + _1KB, + _1KB, + new byte[_1KB]); - Assert.assertTrue("bytesRead should be zero when previously read " - + "ahead buffer had failed", bytesRead == 0); + Assert.assertEquals("bytesRead should be zero when previously read " + + "ahead buffer had failed", 0, bytesRead); // Stub returns success for the 5th read request, if ReadBuffers still // persisted request would have failed for position 0.
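The hunks above all exercise one rule in the patched getBlockFromCompletedQueue. As a reading aid, a minimal sketch of that rule; buf, thresholdAgeMilliseconds and currentTimeMillis() mirror the names in ReadBufferManager, while the standalone helper method itself is illustrative and not part of this series:

```java
// Illustrative sketch only -- condenses the failed-buffer handling that the
// patched ReadBufferManager.getBlockFromCompletedQueue applies.
private int handleFailedReadAhead(ReadBuffer buf) throws IOException {
  long failureAgeMs = currentTimeMillis() - buf.getTimeStamp();
  if (failureAgeMs < thresholdAgeMilliseconds) {
    // Recent failure: the caller's own read-ahead just failed for this
    // offset range, so surface the stored error to the caller.
    throw buf.getErrException();
  }
  // Stale failure: report "no cached bytes" so the caller falls back to a
  // fresh remote read for just the requested range.
  return 0;
}
```

This is why the tests first assert an intercepted IOException and then, after sleeping past the threshold, assert that bytesRead is zero.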
@@ -414,9 +418,9 @@ public void testReadAheadManagerForSuccessfulReadAhead() throws Exception { // getBlock for a new read should return the buffer read-ahead int bytesRead = ReadBufferManager.getBufferManager().getBlock( inputStream, - 1 * KILOBYTE, - 1 * KILOBYTE, - new byte[1 * KILOBYTE]); + _1KB, + _1KB, + new byte[_1KB]); Assert.assertTrue("bytesRead should be non-zero from the " + "buffer that was read-ahead", bytesRead > 0); @@ -430,4 +434,5 @@ public void testReadAheadManagerForSuccessfulReadAhead() throws Exception { // if not using the read-ahead buffer exception will be thrown on read checkEvictedStatus(inputStream, 0, true); } + } \ No newline at end of file From 08c072358e31d31c674dc1aff00916bc6a784ea2 Mon Sep 17 00:00:00 2001 From: Sneha Vijayarajan Date: Tue, 19 May 2020 18:18:06 +0530 Subject: [PATCH 10/11] Test updates needed post SAS change --- .../fs/azurebfs/services/AbfsInputStream.java | 7 ++++ .../services/TestAbfsInputStream.java | 8 ++++- .../fs/azurebfs/utils/TestCachedSASToken.java | 34 +++++++++++++++++++ 3 files changed, 48 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 85eef334c4827..50380c9bb9f40 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -24,6 +24,7 @@ import java.net.HttpURLConnection; import com.google.common.base.Preconditions; +import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -436,4 +437,10 @@ public boolean hasCapability(String capability) { byte[] getBuffer() { return buffer; } + + @VisibleForTesting + protected void setCachedSasToken(final CachedSASToken cachedSasToken) { + this.cachedSasToken = cachedSasToken; + } + } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index a1f23f4e26755..c9dacd6eb0642 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -26,6 +26,7 @@ import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.TimeoutException; +import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doReturn; @@ -34,6 +35,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; + import static org.apache.hadoop.test.LambdaTestUtils.intercept; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FORWARD_SLASH; @@ -53,6 +55,7 @@ private AbfsRestOperation getMockRestOp() { AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class); when(httpOp.getBytesReceived()).thenReturn(1024L); when(op.getResult()).thenReturn(httpOp); + when(op.getSasToken()).thenReturn(TestCachedSASToken.getTestCachedSASTokenInstance().get()); return op; } @@ -76,9 +79,12 @@ 
private AbfsInputStream getAbfsInputStream(AbfsClient mockAbfsClient, String fil null, FORWARD_SLASH + fileName, THREE_KB, - inputStreamContext.withReadBufferSize(ONE_KB), + inputStreamContext.withReadBufferSize(ONE_KB).withReadAheadQueueDepth(10), "eTag"); + inputStream.setCachedSasToken( + TestCachedSASToken.getTestCachedSASTokenInstance()); + return inputStream; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java index 1016d4bbbb4c4..cbba80877206f 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/TestCachedSASToken.java @@ -22,12 +22,14 @@ import java.time.OffsetDateTime; import java.time.ZoneOffset; import java.time.format.DateTimeFormatter; +import java.util.UUID; import org.junit.Assert; import org.junit.Test; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_SAS_TOKEN_RENEW_PERIOD_FOR_STREAMS_IN_SECONDS; import static java.time.temporal.ChronoUnit.SECONDS; +import static java.time.temporal.ChronoUnit.DAYS; /** * Test CachedSASToken. @@ -159,4 +161,36 @@ public void testUpdateAndGetWithInvalidToken() throws IOException { cachedToken = cachedSasToken.get(); Assert.assertNull(cachedToken); } + + public static CachedSASToken getTestCachedSASTokenInstance() { + String expiryPostADay = OffsetDateTime.now(ZoneOffset.UTC) + .plus(1, DAYS) + .format(DateTimeFormatter.ISO_DATE_TIME); + String version = "2020-20-20"; + + StringBuilder sb = new StringBuilder(); + sb.append("skoid="); + sb.append(UUID.randomUUID().toString()); + sb.append("&sktid="); + sb.append(UUID.randomUUID().toString()); + sb.append("&skt="); + sb.append(OffsetDateTime.now(ZoneOffset.UTC) + .minus(1, DAYS) + .format(DateTimeFormatter.ISO_DATE_TIME)); + sb.append("&ske="); + sb.append(expiryPostADay); + sb.append("&sks=b"); + sb.append("&skv="); + sb.append(version); + sb.append("&sp=rw"); + sb.append("&sr=b"); + sb.append("&se="); + sb.append(expiryPostADay); + sb.append("&sv=2"); + sb.append(version); + + CachedSASToken cachedSASToken = new CachedSASToken(); + cachedSASToken.update(sb.toString()); + return cachedSASToken; + } } \ No newline at end of file From 669f3c18d26b0c29c1c50dcc8e11a039d2fe4a72 Mon Sep 17 00:00:00 2001 From: Sneha Vijayarajan Date: Tue, 19 May 2020 21:20:36 +0530 Subject: [PATCH 11/11] Fix findbug issue --- .../apache/hadoop/fs/azurebfs/services/ReadBufferManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index c551b37392a8f..73c23b0155133 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -451,7 +451,7 @@ int getThresholdAgeMilliseconds() { } @VisibleForTesting - void setThresholdAgeMilliseconds(int thresholdAgeMs) { + static void setThresholdAgeMilliseconds(int thresholdAgeMs) { thresholdAgeMilliseconds = thresholdAgeMs; }
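Taken together, the last three patches leave the test hooks used above. A minimal usage sketch, assuming a test class in the same org.apache.hadoop.fs.azurebfs.services package (package-private and protected hooks are reachable there, as in TestAbfsInputStream); the class name and threshold value are illustrative, not part of the patches:

```java
package org.apache.hadoop.fs.azurebfs.services;

import org.apache.hadoop.fs.azurebfs.utils.TestCachedSASToken;

// Illustrative wiring of the @VisibleForTesting hooks introduced in
// PATCH 09-11; not itself part of the patch series.
public class ExampleReadAheadTestSetup {

  static void prepare(AbfsInputStream inputStream) throws Exception {
    // PATCH 09 made the staleness threshold settable; PATCH 11 made the
    // setter static to match the static field it writes.
    ReadBufferManager.setThresholdAgeMilliseconds(3000);

    // PATCH 10: streams under test carry a pre-populated cached SAS token
    // so the SAS-enabled read path does not try to obtain a real one.
    inputStream.setCachedSasToken(
        TestCachedSASToken.getTestCachedSASTokenInstance());

    // Sleeping past the threshold is how the tests age queued read-aheads:
    // a failed buffer older than this yields 0 from getBlock instead of
    // rethrowing its error, forcing a fresh remote read.
    Thread.sleep(
        ReadBufferManager.getBufferManager().getThresholdAgeMilliseconds());
  }
}
```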