Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsInputStreamStatisticsImpl;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStream;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamContext;
import org.apache.hadoop.fs.azurebfs.services.AbfsOutputStreamStatisticsImpl;
Expand Down Expand Up @@ -509,6 +510,7 @@ private AbfsInputStreamContext populateAbfsInputStreamContext() {
.withReadBufferSize(abfsConfiguration.getReadBufferSize())
.withReadAheadQueueDepth(abfsConfiguration.getReadAheadQueueDepth())
.withTolerateOobAppends(abfsConfiguration.getTolerateOobAppends())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
// of valid bytes in buffer)
private boolean closed = false;

/** Stream statistics. */
private final AbfsInputStreamStatistics streamStatistics;

public AbfsInputStream(
final AbfsClient client,
final Statistics statistics,
Expand All @@ -86,6 +89,7 @@ public AbfsInputStream(
this.readAheadEnabled = true;
this.cachedSasToken = new CachedSASToken(
abfsInputStreamContext.getSasTokenRenewPeriodForStreamsInSeconds());
this.streamStatistics = abfsInputStreamContext.getStreamStatistics();
}

public String getPath() {
Expand All @@ -105,10 +109,21 @@ public int read() throws IOException {

@Override
public synchronized int read(final byte[] b, final int off, final int len) throws IOException {
// check if buffer is null before logging the length
if (b != null) {
LOG.debug("read requested b.length = {} offset = {} len = {}", b.length,
off, len);
} else {
LOG.debug("read requested b = null offset = {} len = {}", off, len);
}

int currentOff = off;
int currentLen = len;
int lastReadBytes;
int totalReadBytes = 0;
if (streamStatistics != null) {
streamStatistics.readOperationStarted(off, len);
}
incrementReadOps();
do {
lastReadBytes = readOneBlock(b, currentOff, currentLen);
Expand All @@ -130,6 +145,8 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
}

Preconditions.checkNotNull(b);
LOG.debug("read one block requested b.length = {} off {} len {}", b.length,
off, len);

if (len == 0) {
return 0;
Expand All @@ -155,6 +172,7 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
bCursor = 0;
limit = 0;
if (buffer == null) {
LOG.debug("created new buffer size {}", bufferSize);
buffer = new byte[bufferSize];
}

Expand Down Expand Up @@ -183,6 +201,11 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO
if (statistics != null) {
statistics.incrementBytesRead(bytesToRead);
}
if (streamStatistics != null) {
// Bytes read from the local buffer.
streamStatistics.bytesReadFromBuffer(bytesToRead);
streamStatistics.bytesRead(bytesToRead);
}
return bytesToRead;
}

Expand All @@ -200,8 +223,11 @@ private int readInternal(final long position, final byte[] b, final int offset,
int numReadAheads = this.readAheadQueueDepth;
long nextSize;
long nextOffset = position;
LOG.debug("read ahead enabled issuing readheads num = {}", numReadAheads);
while (numReadAheads > 0 && nextOffset < contentLength) {
nextSize = Math.min((long) bufferSize, contentLength - nextOffset);
LOG.debug("issuing read ahead requestedOffset = {} requested size {}",
nextOffset, nextSize);
ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize);
nextOffset = nextOffset + nextSize;
numReadAheads--;
Expand All @@ -211,13 +237,15 @@ private int readInternal(final long position, final byte[] b, final int offset,
receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b);
if (receivedBytes > 0) {
incrementReadOps();
LOG.debug("Received data from read ahead, not doing remote read");
return receivedBytes;
}

// got nothing from read-ahead, do our own read now
receivedBytes = readRemote(position, b, offset, length);
return receivedBytes;
} else {
LOG.debug("read ahead disabled, reading remote");
return readRemote(position, b, offset, length);
}
}
Expand Down Expand Up @@ -247,6 +275,11 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
LOG.trace("Trigger client.read for path={} position={} offset={} length={}", path, position, offset, length);
op = client.read(path, position, b, offset, length, tolerateOobAppends ? "*" : eTag, cachedSasToken.get());
cachedSasToken.update(op.getSasToken());
if (streamStatistics != null) {
streamStatistics.remoteReadOperation();
}
LOG.debug("issuing HTTP GET request params position = {} b.length = {} "
+ "offset = {} length = {}", position, b.length, offset, length);
perfInfo.registerResult(op.getResult()).registerSuccess(true);
incrementReadOps();
} catch (AzureBlobFileSystemException ex) {
Expand All @@ -262,6 +295,7 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti
if (bytesRead > Integer.MAX_VALUE) {
throw new IOException("Unexpected Content-Length");
}
LOG.debug("HTTP request read bytes = {}", bytesRead);
return (int) bytesRead;
}

Expand All @@ -282,6 +316,7 @@ private void incrementReadOps() {
*/
@Override
public synchronized void seek(long n) throws IOException {
LOG.debug("requested seek to position {}", n);
if (closed) {
throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED);
}
Expand All @@ -292,13 +327,21 @@ public synchronized void seek(long n) throws IOException {
throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF);
}

if (streamStatistics != null) {
streamStatistics.seek(n, fCursor);
}

if (n>=fCursor-limit && n<=fCursor) { // within buffer
bCursor = (int) (n-(fCursor-limit));
if (streamStatistics != null) {
streamStatistics.seekInBuffer();
}
return;
}

// next read will read from here
fCursor = n;
LOG.debug("set fCursor to {}", fCursor);

//invalidate buffer
limit = 0;
Expand Down Expand Up @@ -390,6 +433,7 @@ public boolean seekToNewSource(long l) throws IOException {
public synchronized void close() throws IOException {
closed = true;
buffer = null; // de-reference the buffer so it can be GC'ed sooner
LOG.debug("Closing {}", this);
}

/**
Expand Down Expand Up @@ -443,4 +487,28 @@ protected void setCachedSasToken(final CachedSASToken cachedSasToken) {
this.cachedSasToken = cachedSasToken;
}

/**
* Getter for AbfsInputStreamStatistics.
*
* @return an instance of AbfsInputStreamStatistics.
*/
@VisibleForTesting
public AbfsInputStreamStatistics getStreamStatistics() {
return streamStatistics;
}

/**
* Get the statistics of the stream.
* @return a string value.
*/
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
if (streamStatistics != null) {
sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
sb.append(streamStatistics.toString());
sb.append("}");
}
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext {

private boolean tolerateOobAppends;

private AbfsInputStreamStatistics streamStatistics;

public AbfsInputStreamContext(final long sasTokenRenewPeriodForStreamsInSeconds) {
super(sasTokenRenewPeriodForStreamsInSeconds);
}
Expand All @@ -52,6 +54,12 @@ public AbfsInputStreamContext withTolerateOobAppends(
return this;
}

public AbfsInputStreamContext withStreamStatistics(
final AbfsInputStreamStatistics streamStatistics) {
this.streamStatistics = streamStatistics;
return this;
}

public AbfsInputStreamContext build() {
// Validation of parameters to be done here.
return this;
Expand All @@ -68,4 +76,8 @@ public int getReadAheadQueueDepth() {
public boolean isTolerateOobAppends() {
return tolerateOobAppends;
}

public AbfsInputStreamStatistics getStreamStatistics() {
return streamStatistics;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.services;

import org.apache.hadoop.classification.InterfaceStability;

/**
* Interface for statistics for the AbfsInputStream.
*/
@InterfaceStability.Unstable
public interface AbfsInputStreamStatistics {
/**
* Seek backwards, incrementing the seek and backward seek counters.
*
* @param negativeOffset how far was the seek?
* This is expected to be negative.
*/
void seekBackwards(long negativeOffset);

/**
* Record a forward seek, adding a seek operation, a forward
* seek operation, and any bytes skipped.
*
* @param skipped number of bytes skipped by reading from the stream.
* If the seek was implemented by a close + reopen, set this to zero.
*/
void seekForwards(long skipped);

/**
* Record a forward or backward seek, adding a seek operation, a forward or
* a backward seek operation, and number of bytes skipped.
*
* @param seekTo seek to the position.
* @param currentPos current position.
*/
void seek(long seekTo, long currentPos);

/**
* Increment the bytes read counter by the number of bytes;
* no-op if the argument is negative.
*
* @param bytes number of bytes read.
*/
void bytesRead(long bytes);

/**
* Record the total bytes read from buffer.
*
* @param bytes number of bytes that are read from buffer.
*/
void bytesReadFromBuffer(long bytes);

/**
* Records the total number of seeks done in the buffer.
*/
void seekInBuffer();

/**
* A {@code read(byte[] buf, int off, int len)} operation has started.
*
* @param pos starting position of the read.
* @param len length of bytes to read.
*/
void readOperationStarted(long pos, long len);

/**
* Records a successful remote read operation.
*/
void remoteReadOperation();

/**
* Makes the string of all the AbfsInputStream statistics.
* @return the string with all the statistics.
*/
@Override
String toString();
}
Loading