Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
8dfc9e2
HADOOP-17271. S3A statistics to support IOStatistics
steveloughran Sep 21, 2020
c906359
HADOOP-16830. Reinstate the deleted FunctionsRaisingIOE class.
steveloughran Sep 22, 2020
e606309
HADOOP-16830. Checkstyles and findbugs
steveloughran Sep 23, 2020
11ed66b
HADOOP-16830. findbugs is finally silent
steveloughran Sep 24, 2020
8309a6e
HADOOP-16830: trying to make S3AInstrumentation leaner
steveloughran Sep 28, 2020
e78d276
HADOOP-16830. DurationTracker to handle failures
steveloughran Sep 29, 2020
00a8f89
HADOOP-16830 S3A to use DurationTracker failure tracking
steveloughran Sep 29, 2020
e0ca619
HADOOP-16830. Closeable iterator is self closing.
steveloughran Sep 30, 2020
748d485
replace ProvidedFileStatusIterator with RemoteIterators classes
steveloughran Oct 1, 2020
35c0875
HADOOP-16830. Duration Tracking testing
steveloughran Oct 1, 2020
3a6c6ce
HADOOP-16830. Finishing up API
steveloughran Oct 6, 2020
250db7a
HADOOP-17271. S3A IOStatistics
steveloughran Oct 7, 2020
09a9dbd
HADOOP-16830. Checkstyles &c
steveloughran Oct 7, 2020
16a7000
HADOOP-16830. Add new type casting remote iterable
steveloughran Oct 8, 2020
750f569
HADOOP-17271. listStatusIterable() to become instrumented
steveloughran Oct 8, 2020
ff8ba38
HADOOP-16830. IOStatistics feature creep
steveloughran Oct 12, 2020
1fef445
HADOOP-16830. Improving integration with S3A code
steveloughran Oct 14, 2020
db54f65
HADOOP-17271. S3AFS to aggregate stats from streams & lists
steveloughran Oct 14, 2020
730c5b1
HADOOP-17271. S3A instrumentation switching to using atomics for update.
steveloughran Oct 16, 2020
92aaac1
HADOOP-16830. Switch to newly discovered UncheckedIOException
steveloughran Oct 29, 2020
fe08d9f
HADOOP-17271. S3A code switches to UncheckedIOException to track
steveloughran Oct 29, 2020
76e2eab
HADOOP-16830 javadocs on S3AInputStreamStatistics
steveloughran Dec 4, 2020
ded607e
HADOOP-17271. repackage S3A Statistics classes
steveloughran Dec 7, 2020
57a7627
HADOOP-16830 API/Implementation changes for statistics
steveloughran Dec 15, 2020
71e8280
HADOOP-17271. S3A Statistics Enhancement/tuning
steveloughran Dec 15, 2020
231819c
HADOOP-16830 + HADOOP-17271: S3A instrumentation
steveloughran Dec 16, 2020
0ced163
HADOOP-16830. HADOOP-17271. Checkstyle, findbugs and mock failures.
steveloughran Dec 17, 2020
5855790
HADOOP-16830. S3AInstrumentation: inline a variable.
steveloughran Dec 24, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.StreamCapabilitiesPolicy;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

/**
* CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
* required in order to ensure that the plain text and cipher text have a 1:1
Expand All @@ -66,7 +70,7 @@ public class CryptoInputStream extends FilterInputStream implements
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
ReadableByteChannel, CanUnbuffer, StreamCapabilities,
ByteBufferPositionedReadable {
ByteBufferPositionedReadable, IOStatisticsSource {
private final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Decryptor decryptor;
Expand Down Expand Up @@ -867,8 +871,16 @@ public boolean hasCapability(String capability) {
+ " does not expose its stream capabilities.");
}
return ((StreamCapabilities) in).hasCapability(capability);
case StreamCapabilities.IOSTATISTICS:
return (in instanceof StreamCapabilities)
&& ((StreamCapabilities) in).hasCapability(capability);
default:
return false;
}
}

@Override
public IOStatistics getIOStatistics() {
return retrieveIOStatistics(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

/**
* CryptoOutputStream encrypts data. It is not thread-safe. AES CTR mode is
* required in order to ensure that the plain text and cipher text have a 1:1
Expand All @@ -48,7 +52,7 @@
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class CryptoOutputStream extends FilterOutputStream implements
Syncable, CanSetDropBehind, StreamCapabilities {
Syncable, CanSetDropBehind, StreamCapabilities, IOStatisticsSource {
private final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Encryptor encryptor;
Expand Down Expand Up @@ -313,4 +317,9 @@ public boolean hasCapability(String capability) {
}
return false;
}

@Override
public IOStatistics getIOStatistics() {
return retrieveIOStatistics(out);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;


/**
Expand All @@ -33,7 +37,8 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class BufferedFSInputStream extends BufferedInputStream
implements Seekable, PositionedReadable, HasFileDescriptor {
implements Seekable, PositionedReadable, HasFileDescriptor,
IOStatisticsSource, StreamCapabilities {
/**
* Creates a <code>BufferedFSInputStream</code>
* with the specified buffer size,
Expand Down Expand Up @@ -126,4 +131,26 @@ public FileDescriptor getFileDescriptor() throws IOException {
return null;
}
}

/**
* If the inner stream supports {@link StreamCapabilities},
* forward the probe to it.
* Otherwise: return false.
*
* @param capability string to query the stream support for.
* @return true if a capability is known to be supported.
*/
@Override
public boolean hasCapability(final String capability) {
if (in instanceof StreamCapabilities) {
return ((StreamCapabilities) in).hasCapability(capability);
} else {
return false;
}
}

@Override
public IOStatistics getIOStatistics() {
return retrieveIOStatistics(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
Expand Down Expand Up @@ -134,7 +137,8 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) {
* For open()'s FSInputStream
* It verifies that data matches checksums.
*******************************************************/
private static class ChecksumFSInputChecker extends FSInputChecker {
private static class ChecksumFSInputChecker extends FSInputChecker implements
IOStatisticsSource {
private ChecksumFileSystem fs;
private FSDataInputStream datas;
private FSDataInputStream sums;
Expand Down Expand Up @@ -270,6 +274,17 @@ protected int readChunk(long pos, byte[] buf, int offset, int len,
}
return nread;
}

/**
* Get the IO Statistics of the nested stream, falling back to
* null if the stream does not implement the interface
* {@link IOStatisticsSource}.
* @return an IOStatistics instance or null
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(datas);
}
}

private static class FSDataBoundedInputStream extends FSDataInputStream {
Expand Down Expand Up @@ -395,7 +410,8 @@ public static long getChecksumLength(long size, int bytesPerSum) {

/** This class provides an output stream for a checksummed file.
* It generates checksums for data. */
private static class ChecksumFSOutputSummer extends FSOutputSummer {
private static class ChecksumFSOutputSummer extends FSOutputSummer
implements IOStatisticsSource, StreamCapabilities {
private FSDataOutputStream datas;
private FSDataOutputStream sums;
private static final float CHKSUM_AS_FRACTION = 0.01f;
Expand Down Expand Up @@ -449,6 +465,28 @@ protected void checkClosed() throws IOException {
throw new ClosedChannelException();
}
}

/**
* Get the IO Statistics of the nested stream, falling back to
* null if the stream does not implement the interface
* {@link IOStatisticsSource}.
* @return an IOStatistics instance or null
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(datas);
}

/**
* Probe the inner stream for a capability.
*
* @param capability string to query the stream support for.
* @return true if a capability is known to be supported.
*/
@Override
public boolean hasCapability(final String capability) {
return datas.hasCapability(capability);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.util.IdentityHashStore;

Expand All @@ -40,7 +43,7 @@ public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
ByteBufferPositionedReadable {
ByteBufferPositionedReadable, IOStatisticsSource {
/**
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
* objects
Expand Down Expand Up @@ -267,4 +270,15 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
"unsupported by " + in.getClass().getCanonicalName());
}
}

/**
* Get the IO Statistics of the nested stream, falling back to
* null if the stream does not implement the interface
* {@link IOStatisticsSource}.
* @return an IOStatistics instance or null
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;

/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataOutputStream extends DataOutputStream
implements Syncable, CanSetDropBehind, StreamCapabilities {
implements Syncable, CanSetDropBehind, StreamCapabilities,
IOStatisticsSource {
private final OutputStream wrappedStream;

private static class PositionCache extends FilterOutputStream {
Expand Down Expand Up @@ -155,4 +159,15 @@ public void setDropBehind(Boolean dropBehind) throws IOException {
"not support setting the drop-behind caching setting.");
}
}

/**
* Get the IO Statistics of the nested stream, falling back to
* empty statistics if the stream does not implement the interface
* {@link IOStatisticsSource}.
* @return an IOStatistics instance.
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(wrappedStream);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -134,4 +137,23 @@ public void readFully(long position, byte[] buffer)
throws IOException {
readFully(position, buffer, 0, buffer.length);
}

/**
* toString method returns the superclass toString, but if the subclass
* implements {@link IOStatisticsSource} then those statistics are
* extracted and included in the output.
* That is: statistics of subclasses are automatically reported.
* @return a string value.
*/
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
sb.append('{');
if (this instanceof IOStatisticsSource) {
sb.append(IOStatisticsLogging.ioStatisticsSourceToString(
(IOStatisticsSource) this));
}
sb.append('}');
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,20 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

/**
* MultipartUploader is an interface for copying files multipart and across
* multiple nodes.
* <p></p>
* The interface extends {@link IOStatisticsSource} so that there is no
* need to cast an instance to see if is a source of statistics.
* However, implementations MAY return null for their actual statistics.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface MultipartUploader extends Closeable {
public interface MultipartUploader extends Closeable,
IOStatisticsSource {


/**
Expand Down
Loading