From 7c9cf1e573473785d958e866a1d9da34451ab64f Mon Sep 17 00:00:00 2001 From: Gabor Bota Date: Mon, 27 Jan 2020 16:10:45 +0100 Subject: [PATCH] HADOOP-16961. ABFS: Adding metrics to AbfsInputStream Change-Id: I034b771533b8314364a3762034439e323758ee09 --- .../fs/azurebfs/services/AbfsInputStream.java | 56 +++++++ .../services/AbfsInputStreamStatistics.java | 74 +++++++++ .../AbfsInputStreamStatisticsImpl.java | 145 ++++++++++++++++++ 3 files changed, 275 insertions(+) create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java create mode 100644 hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 0c06014a40832..5e4decd82c267 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -24,6 +24,8 @@ import java.net.HttpURLConnection; import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.CanUnbuffer; import org.apache.hadoop.fs.FSExceptionMessages; @@ -41,6 +43,12 @@ public class AbfsInputStream extends FSInputStream implements CanUnbuffer, StreamCapabilities { + private static final Logger LOG = LoggerFactory.getLogger( + AbfsInputStream.class); + + /** Stream statistics. 
*/ + private final AbfsInputStreamStatisticsImpl streamStatistics; + private final AbfsClient client; private final Statistics statistics; private final String path; @@ -78,6 +86,7 @@ public AbfsInputStream( this.tolerateOobAppends = tolerateOobAppends; this.eTag = eTag; this.readAheadEnabled = true; + this.streamStatistics = new AbfsInputStreamStatisticsImpl(); } public String getPath() { @@ -97,10 +106,19 @@ public int read() throws IOException { @Override public synchronized int read(final byte[] b, final int off, final int len) throws IOException { + // check if buffer is null before logging the length + if (b != null) { + LOG.debug("read requested b.length = {} offset = {} len = {}", b.length, + off, len); + } else { + LOG.debug("read requested b = null offset = {} len = {}", off, len); + } + int currentOff = off; int currentLen = len; int lastReadBytes; int totalReadBytes = 0; + streamStatistics.readOperationStarted(off, len); incrementReadOps(); do { lastReadBytes = readOneBlock(b, currentOff, currentLen); @@ -122,6 +140,8 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO } Preconditions.checkNotNull(b); + LOG.debug("read one block requested b.length = {} off {} len {}", b.length, + off, len); if (len == 0) { return 0; @@ -147,6 +167,7 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO bCursor = 0; limit = 0; if (buffer == null) { + LOG.debug("created new buffer size {}", bufferSize); buffer = new byte[bufferSize]; } @@ -175,6 +196,7 @@ private int readOneBlock(final byte[] b, final int off, final int len) throws IO if (statistics != null) { statistics.incrementBytesRead(bytesToRead); } + streamStatistics.bytesRead(bytesToRead); return bytesToRead; } @@ -192,8 +214,11 @@ private int readInternal(final long position, final byte[] b, final int offset, int numReadAheads = this.readAheadQueueDepth; long nextSize; long nextOffset = position; + LOG.debug("read ahead enabled issuing readheads num = 
{}", numReadAheads); while (numReadAheads > 0 && nextOffset < contentLength) { nextSize = Math.min((long) bufferSize, contentLength - nextOffset); + LOG.debug("issuing read ahead requestedOffset = {} requested size {}", + nextOffset, nextSize); ReadBufferManager.getBufferManager().queueReadAhead(this, nextOffset, (int) nextSize); nextOffset = nextOffset + nextSize; numReadAheads--; @@ -203,6 +228,8 @@ private int readInternal(final long position, final byte[] b, final int offset, receivedBytes = ReadBufferManager.getBufferManager().getBlock(this, position, length, b); if (receivedBytes > 0) { incrementReadOps(); + LOG.debug("Received data from read ahead, not doing remote read"); + streamStatistics.bytesReadFromBuffer(receivedBytes); return receivedBytes; } @@ -210,6 +237,7 @@ private int readInternal(final long position, final byte[] b, final int offset, receivedBytes = readRemote(position, b, offset, length); return receivedBytes; } else { + LOG.debug("read ahead disabled, reading remote"); return readRemote(position, b, offset, length); } } @@ -236,6 +264,10 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti final AbfsRestOperation op; AbfsPerfTracker tracker = client.getAbfsPerfTracker(); try (AbfsPerfInfo perfInfo = new AbfsPerfInfo(tracker, "readRemote", "read")) { + streamStatistics.remoteReadOperation(); + LOG.debug( + "issuing HTTP GET request params position = {} b.length = {} offset = {} length = {}", + position, b.length, offset, length); op = client.read(path, position, b, offset, length, tolerateOobAppends ? 
"*" : eTag); perfInfo.registerResult(op.getResult()).registerSuccess(true); incrementReadOps(); @@ -252,6 +284,7 @@ int readRemote(long position, byte[] b, int offset, int length) throws IOExcepti if (bytesRead > Integer.MAX_VALUE) { throw new IOException("Unexpected Content-Length"); } + LOG.debug("HTTP request read bytes = {}", bytesRead); return (int) bytesRead; } @@ -272,6 +305,7 @@ private void incrementReadOps() { */ @Override public synchronized void seek(long n) throws IOException { + LOG.debug("requested seek to position {}", n); if (closed) { throw new IOException(FSExceptionMessages.STREAM_IS_CLOSED); } @@ -282,13 +316,17 @@ public synchronized void seek(long n) throws IOException { throw new EOFException(FSExceptionMessages.CANNOT_SEEK_PAST_EOF); } + streamStatistics.seek(n, fCursor); + if (n>=fCursor-limit && n<=fCursor) { // within buffer bCursor = (int) (n-(fCursor-limit)); + streamStatistics.seekInBuffer(); return; } // next read will read from here fCursor = n; + LOG.debug("set fCursor to {}", fCursor); //invalidate buffer limit = 0; @@ -380,6 +418,7 @@ public boolean seekToNewSource(long l) throws IOException { public synchronized void close() throws IOException { closed = true; buffer = null; // de-reference the buffer so it can be GC'ed sooner + LOG.debug("Closing {}", this); } /** @@ -427,4 +466,21 @@ public boolean hasCapability(String capability) { byte[] getBuffer() { return buffer; } + + public AbfsInputStreamStatisticsImpl getStreamStatistics() { + return streamStatistics; + } + + /** + * Get the statistics of the stream. + * @return a string value. 
+   */
+  @Override
+  public String toString() {
+    final StringBuilder sb = new StringBuilder(super.toString());
+    sb.append("AbfsInputStream@(").append(this.hashCode()).append("){");
+    sb.append(streamStatistics.toString());
+    sb.append("}");
+    return sb.toString();
+  }
 }
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
new file mode 100644
index 0000000000000..bd7a23140d5b6
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatistics.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/** Interface for statistics for the AbfsInputStream. */
+@InterfaceStability.Unstable
+public interface AbfsInputStreamStatistics {
+  /**
+   * Seek backwards, incrementing the seek and backward seek counters.
+   * @param negativeOffset how far was the seek? Expected to be negative.
+   */
+  void seekBackwards(long negativeOffset);
+
+  /**
+   * Record a forward seek, adding a seek operation, a forward
+   * seek operation, and any bytes skipped.
+   * @param skipped number of bytes skipped by reading from the stream.
+   * If the seek was implemented by a close + reopen, set this to zero.
+   */
+  void seekForwards(long skipped);
+
+  /**
+   * Record a forward or backward seek, adding a seek operation, a forward or
+   * a backward seek operation, and number of bytes skipped.
+   * @param seekTo seek to the position
+   * @param currentPos current position
+   */
+  void seek(long seekTo, long currentPos);
+
+  /**
+   * Increment the bytes read counter by the number of bytes;
+   * no-op if the argument is negative.
+   * @param bytes number of bytes read
+   */
+  void bytesRead(long bytes);
+
+  /** Record bytes that were served from the read-ahead buffer. */
+  void bytesReadFromBuffer(long bytes);
+
+  /** Record a seek that was satisfied within the current buffer. */
+  void seekInBuffer();
+
+  /**
+   * A {@code read(byte[] buf, int off, int len)} operation has started.
+   * @param pos starting position of the read
+   * @param len length of bytes to read
+   */
+  void readOperationStarted(long pos, long len);
+
+  /** Record that a remote read operation was issued. */
+  void remoteReadOperation();
+
+  @Override
+  @InterfaceStability.Unstable
+  String toString();
+}
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
new file mode 100644
index 0000000000000..fcdf014e5a625
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamStatisticsImpl.java
@@ -0,0 +1,145 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.azurebfs.services;
+
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Stats for the AbfsInputStream.
+ * Counters are volatile, but {@code ++}/{@code +=} updates are not atomic.
+ */
+public class AbfsInputStreamStatisticsImpl
+    implements AbfsInputStreamStatistics {
+  public volatile long seekOperations;
+  public volatile long forwardSeekOperations;
+  public volatile long backwardSeekOperations;
+  public volatile long bytesRead;
+  public volatile long bytesSkippedOnSeek;
+  public volatile long bytesBackwardsOnSeek;
+  public volatile long seekInBuffer;
+  public volatile long readOperations;
+  public volatile long bytesReadFromBuffer;
+  public volatile long remoteReadOperations;
+
+  /**
+   * Seek backwards, incrementing the seek and backward seek counters.
+   * @param negativeOffset how far was the seek?
+   *                       This is expected to be negative.
+   */
+  @Override
+  public void seekBackwards(long negativeOffset) {
+    seekOperations++;
+    backwardSeekOperations++;
+    bytesBackwardsOnSeek -= negativeOffset;
+  }
+
+  /**
+   * Record a forward seek, adding a seek operation, a forward
+   * seek operation, and any bytes skipped.
+   * @param skipped number of bytes skipped by reading from the stream.
+   * If the seek was implemented by a close + reopen, set this to zero.
+   */
+  @Override
+  public void seekForwards(long skipped) {
+    seekOperations++;
+    forwardSeekOperations++;
+    if (skipped > 0) {
+      bytesSkippedOnSeek += skipped;
+    }
+  }
+
+  /**
+   * Record a seek, deriving its direction from the two positions: forward
+   * when {@code seekTo >= currentPos}, otherwise backward. A backward seek
+   * is passed to {@link #seekBackwards(long)} as a negative offset.
+   * @param seekTo seek to the position
+   * @param currentPos current position
+   */
+  @Override
+  public void seek(long seekTo, long currentPos) {
+    if (seekTo >= currentPos) {
+      this.seekForwards(seekTo - currentPos);
+    } else {
+      this.seekBackwards(seekTo - currentPos); // negative, per contract
+    }
+  }
+
+  /**
+   * Increment the bytes read counter by the number of bytes;
+   * no-op if the argument is negative.
+   * @param bytes number of bytes read
+   */
+  @Override
+  public void bytesRead(long bytes) {
+    if (bytes > 0) {
+      bytesRead += bytes;
+    }
+  }
+
+  @Override
+  public void bytesReadFromBuffer(long bytes) {
+    if (bytes > 0) {
+      bytesReadFromBuffer += bytes;
+    }
+  }
+
+  @Override
+  public void seekInBuffer() {
+    seekInBuffer++;
+  }
+
+  /**
+   * A {@code read(byte[] buf, int off, int len)} operation has started.
+   * @param pos starting position of the read
+   * @param len length of bytes to read
+   */
+  @Override
+  public void readOperationStarted(long pos, long len) {
+    readOperations++;
+  }
+
+  @Override
+  public void remoteReadOperation() {
+    remoteReadOperations++;
+  }
+
+  /**
+   * String operator describes all the current statistics.
+   * Important: there are no guarantees as to the stability
+   * of this value.
+   * @return the current values of the stream statistics.
+   */
+  @Override
+  @InterfaceStability.Unstable
+  public String toString() {
+    final StringBuilder sb = new StringBuilder("StreamStatistics{");
+    sb.append("SeekOperations=").append(seekOperations);
+    sb.append(", ForwardSeekOperations=").append(forwardSeekOperations);
+    sb.append(", BackwardSeekOperations=").append(backwardSeekOperations);
+    sb.append(", BytesSkippedOnSeek=").append(bytesSkippedOnSeek);
+    sb.append(", BytesBackwardsOnSeek=").append(bytesBackwardsOnSeek);
+    sb.append(", seekInBuffer=").append(seekInBuffer);
+    sb.append(", BytesRead=").append(bytesRead);
+    sb.append(", ReadOperations=").append(readOperations);
+    sb.append(", bytesReadFromBuffer=").append(bytesReadFromBuffer);
+    sb.append(", remoteReadOperations=").append(remoteReadOperations);
+    sb.append('}');
+    return sb.toString();
+  }
+}