Closed
40 commits
77fb70b
HADOOP-16830. IOStatistics API.
steveloughran Apr 27, 2020
6dc8955
HADOOP-16830. Turning off AWS SDK metric collection.
steveloughran Apr 27, 2020
50fd592
extra assert in >AbstractSTestS3AHugeFiles.test_010_CreateHugeFile; c…
steveloughran Apr 28, 2020
8466dcc
HADOOP-16830: statistics API
steveloughran Apr 28, 2020
f03a985
HADOOP-16830. Statistics APIs
steveloughran Apr 29, 2020
c95adff
HADOOP-16830. IO Statistics
steveloughran May 6, 2020
ec1d45b
HADOOP-16830. IO Statistics
steveloughran May 7, 2020
88d6e79
HADOOP-16830. IO Statistics -add local fs stats
steveloughran May 18, 2020
ab4d2e0
HADOOP-16830. missed a file
steveloughran May 19, 2020
62aaeb2
HADOOP-16830. counters are resettable
steveloughran May 21, 2020
6d9828e
HADOOP-16830. move s3a input stream stats to CounterIOStatistics
steveloughran May 22, 2020
1c3f9c2
Draft an IOStatisticEntry for more generic stats
steveloughran Jun 4, 2020
2569d1d
HADOOP-16830 move to type plus tuple stats entries
steveloughran Jun 8, 2020
4369445
HADOOP-16830 complete IOStatisticsEntry. This is getting overcomplicated
steveloughran Jun 9, 2020
fba2f98
HADOOP-16830 move to new stats api
steveloughran Jun 12, 2020
6abd4bb
HADOOP-16830 third iteration of a statistics API
steveloughran Jun 12, 2020
9eea895
HADOOP-16830 third iteration of a statistics API
steveloughran Jun 12, 2020
8d5f4ab
- builder API for creating CounterIOStatistics
steveloughran Jun 16, 2020
26affff
HADOOP-16830 counters, gauges, min and max
steveloughran Jun 18, 2020
e306ccc
empty statistics, how -ve values should be aggregated
steveloughran Jun 25, 2020
3ab8b5a
HADOOP-16830; IOStatistics stream capability
steveloughran Jun 26, 2020
5db61a6
HADOOP-16830 fix a javadoc warning. This isn't new with this PR
steveloughran Jun 29, 2020
3224e8d
HADOOP-16830. IOStatisticsSnapshot to be JSON serializable
steveloughran Jul 1, 2020
2b672a2
HADOOP-16830: statistics now round trip as JSON
steveloughran Jul 2, 2020
910fdad
HADOOP-16830. IOStatistics through S3A committers
steveloughran Jul 3, 2020
82e13f0
HADOOP-16830. S3AInputStreamStatistics
steveloughran Jul 6, 2020
327be95
HADOOP-16830. multipart uploader integration
steveloughran Jul 13, 2020
ca4a240
HADOOP-16830 IOStatistics for duration tracking, broader use
steveloughran Jul 20, 2020
34f21fd
HADOOP-16830 Duration Tracking
steveloughran Aug 6, 2020
7e199e4
HADOOP-16830 S3A OutputStream unbuffer/close stats
steveloughran Aug 6, 2020
53c88a3
HADOOP-16830 checkstyle, tests and toString
steveloughran Aug 7, 2020
b29f671
HADOOP-16830 RemoteIterator work and listings
steveloughran Aug 25, 2020
7ba251d
HADOOP-17226. Failure of ITestAssumeRole.testRestrictedCommitActions
steveloughran Aug 25, 2020
7c3b47b
HADOOP-16830 More standard operation names in hadoop-common
steveloughran Aug 25, 2020
be97274
HADOOP-16830 DurationTrackerFactory
steveloughran Aug 26, 2020
952a34f
HADOOP-16830 DurationTrackerFactory adoption
steveloughran Aug 26, 2020
0806a23
HADOOP-16830 speech dictation typo
steveloughran Aug 26, 2020
e9742f5
HADOOP-16830 RemoteIterators evolution
steveloughran Aug 28, 2020
84c30a7
HADOOP-16830 IOStatistics
steveloughran Sep 7, 2020
a6935c0
HADOOP-16830. RemoteIterators
steveloughran Sep 9, 2020
Original file line number Diff line number Diff line change
@@ -46,9 +46,13 @@
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.StreamCapabilitiesPolicy;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

/**
* CryptoInputStream decrypts data. It is not thread-safe. AES CTR mode is
* required in order to ensure that the plain text and cipher text have a 1:1
@@ -66,7 +70,7 @@ public class CryptoInputStream extends FilterInputStream implements
Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor,
CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess,
ReadableByteChannel, CanUnbuffer, StreamCapabilities,
ByteBufferPositionedReadable {
ByteBufferPositionedReadable, IOStatisticsSource {
private final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Decryptor decryptor;
@@ -867,8 +871,16 @@ public boolean hasCapability(String capability) {
+ " does not expose its stream capabilities.");
}
return ((StreamCapabilities) in).hasCapability(capability);
case StreamCapabilities.IOSTATISTICS:
return (in instanceof StreamCapabilities)
&& ((StreamCapabilities) in).hasCapability(capability);
default:
return false;
}
}

@Override
public IOStatistics getIOStatistics() {
return retrieveIOStatistics(in);
}
}
Original file line number Diff line number Diff line change
@@ -28,9 +28,13 @@
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import com.google.common.base.Preconditions;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;

/**
* CryptoOutputStream encrypts data. It is not thread-safe. AES CTR mode is
* required in order to ensure that the plain text and cipher text have a 1:1
@@ -48,7 +52,7 @@
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class CryptoOutputStream extends FilterOutputStream implements
Syncable, CanSetDropBehind, StreamCapabilities {
Syncable, CanSetDropBehind, StreamCapabilities, IOStatisticsSource {
private final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Encryptor encryptor;
@@ -313,4 +317,9 @@ public boolean hasCapability(String capability) {
}
return false;
}

@Override
public IOStatistics getIOStatistics() {
return retrieveIOStatistics(out);
}
}
Original file line number Diff line number Diff line change
@@ -24,6 +24,10 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;


/**
@@ -33,7 +37,8 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class BufferedFSInputStream extends BufferedInputStream
implements Seekable, PositionedReadable, HasFileDescriptor {
implements Seekable, PositionedReadable, HasFileDescriptor,
IOStatisticsSource, StreamCapabilities {
/**
* Creates a <code>BufferedFSInputStream</code>
* with the specified buffer size,
@@ -126,4 +131,26 @@ public FileDescriptor getFileDescriptor() throws IOException {
return null;
}
}

/**
* If the inner stream supports {@link StreamCapabilities},
* forward the probe to it.
* Otherwise: return false.
*
* @param capability string to query the stream support for.
* @return true if a capability is known to be supported.
*/
@Override
public boolean hasCapability(final String capability) {
if (in instanceof StreamCapabilities) {
return ((StreamCapabilities) in).hasCapability(capability);
} else {
return false;
}
}

@Override
public IOStatistics getIOStatistics() {
return retrieveIOStatistics(in);
}
}
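
The wrapper classes above all follow one pattern: the wrapper holds no statistics of its own and forwards the probe to the stream it wraps. The following is a minimal, self-contained sketch of that pattern. `Stats`, `StatsSource`, `retrieve`, `CountingStream` and `Wrapper` are hypothetical stand-ins invented for illustration; they are not the Hadoop `IOStatistics`, `IOStatisticsSource` or `retrieveIOStatistics`, and the null-on-miss behaviour is assumed from the javadoc in this diff.

```java
import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.InputStream;
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-ins only; these are NOT the Hadoop classes.
class PassthroughSketch {

  /** Stand-in for a statistics view: counter name to current value. */
  interface Stats {
    Map<String, Long> counters();
  }

  /** Stand-in for a statistics source. */
  interface StatsSource {
    Stats getStats();
  }

  /** Probe: return the object's statistics if it is a source, else null. */
  static Stats retrieve(Object o) {
    return (o instanceof StatsSource) ? ((StatsSource) o).getStats() : null;
  }

  /** Innermost stream, the only one which actually counts anything. */
  static class CountingStream extends ByteArrayInputStream
      implements StatsSource {
    private long reads;

    CountingStream(byte[] data) {
      super(data);
    }

    @Override
    public synchronized int read() {
      reads++;
      return super.read();
    }

    @Override
    public Stats getStats() {
      // Evaluated on every call, so the view tracks the live counter.
      return () -> Collections.singletonMap("reads", reads);
    }
  }

  /** Wrapper: forwards the probe to whatever stream it wraps. */
  static class Wrapper extends FilterInputStream implements StatsSource {
    Wrapper(InputStream in) {
      super(in);
    }

    @Override
    public Stats getStats() {
      return retrieve(in);
    }
  }

  public static void main(String[] args) throws Exception {
    // Two layers of wrapping; the probe still reaches the inner stream.
    Wrapper w = new Wrapper(new Wrapper(new CountingStream(new byte[] {1, 2, 3})));
    w.read();
    w.read();
    System.out.println(retrieve(w).counters());
    // A wrapper around a plain stream has nothing to forward.
    System.out.println(retrieve(new Wrapper(new ByteArrayInputStream(new byte[0]))));
  }
}
```

Because each layer only delegates, callers of a sketch like this must be prepared for a null result whenever the innermost stream is not a source.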
Original file line number Diff line number Diff line change
@@ -38,6 +38,9 @@
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
@@ -134,7 +137,8 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) {
* For open()'s FSInputStream
* It verifies that data matches checksums.
*******************************************************/
private static class ChecksumFSInputChecker extends FSInputChecker {
private static class ChecksumFSInputChecker extends FSInputChecker implements
IOStatisticsSource {
private ChecksumFileSystem fs;
private FSDataInputStream datas;
private FSDataInputStream sums;
@@ -270,6 +274,17 @@ protected int readChunk(long pos, byte[] buf, int offset, int len,
}
return nread;
}

/**
* Get the IO Statistics of the nested stream, falling back to
* null if the stream does not implement the interface
* {@link IOStatisticsSource}.
* @return an IOStatistics instance or null
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(datas);
}
}

private static class FSDataBoundedInputStream extends FSDataInputStream {
@@ -395,7 +410,8 @@ public static long getChecksumLength(long size, int bytesPerSum) {

/** This class provides an output stream for a checksummed file.
* It generates checksums for data. */
private static class ChecksumFSOutputSummer extends FSOutputSummer {
private static class ChecksumFSOutputSummer extends FSOutputSummer
implements IOStatisticsSource, StreamCapabilities {
private FSDataOutputStream datas;
private FSDataOutputStream sums;
private static final float CHKSUM_AS_FRACTION = 0.01f;
@@ -449,6 +465,28 @@ protected void checkClosed() throws IOException {
throw new ClosedChannelException();
}
}

/**
* Get the IO Statistics of the nested stream, falling back to
* null if the stream does not implement the interface
* {@link IOStatisticsSource}.
* @return an IOStatistics instance or null
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(datas);
}

/**
* Probe the inner stream for a capability.
*
* @param capability string to query the stream support for.
* @return true if a capability is known to be supported.
*/
@Override
public boolean hasCapability(final String capability) {
return datas.hasCapability(capability);
}
}

@Override
Original file line number Diff line number Diff line change
@@ -29,6 +29,9 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.util.IdentityHashStore;

@@ -40,7 +43,7 @@ public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
ByteBufferPositionedReadable {
ByteBufferPositionedReadable, IOStatisticsSource {
/**
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
* objects
@@ -267,4 +270,15 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
"unsupported by " + in.getClass().getCanonicalName());
}
}

/**
* Get the IO Statistics of the nested stream, falling back to
* null if the stream does not implement the interface
* {@link IOStatisticsSource}.
* @return an IOStatistics instance or null
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(in);
}
}
Original file line number Diff line number Diff line change
@@ -24,13 +24,17 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;

/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataOutputStream extends DataOutputStream
implements Syncable, CanSetDropBehind, StreamCapabilities {
implements Syncable, CanSetDropBehind, StreamCapabilities,
IOStatisticsSource {
private final OutputStream wrappedStream;

private static class PositionCache extends FilterOutputStream {
@@ -155,4 +159,15 @@ public void setDropBehind(Boolean dropBehind) throws IOException {
"not support setting the drop-behind caching setting.");
}
}

/**
* Get the IO Statistics of the nested stream, falling back to
* empty statistics if the stream does not implement the interface
* {@link IOStatisticsSource}.
* @return an IOStatistics instance.
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(wrappedStream);
}
}
Original file line number Diff line number Diff line change
@@ -24,6 +24,9 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -134,4 +137,23 @@ public void readFully(long position, byte[] buffer)
throws IOException {
readFully(position, buffer, 0, buffer.length);
}

/**
* toString method returns the superclass toString, but if the subclass
* implements {@link IOStatisticsSource} then those statistics are
* extracted and included in the output.
* That is: statistics of subclasses are automatically reported.
* @return a string value.
*/
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
sb.append('{');
if (this instanceof IOStatisticsSource) {
sb.append(IOStatisticsLogging.ioStatisticsSourceToString(
(IOStatisticsSource) this));
}
sb.append('}');
return sb.toString();
}
Comment on lines +140 to +158
Contributor

It looks like this IOStatistics API primarily surfaces information through toString() output in log statements. Is this the best way to integrate with Hadoop, or is there value in adding an explicit (possibly type-safe) interface for objects to expose their IOStatistics information?

Also, does it make sense here to move the calls to sb#append inside the if statement? That way, instances that do not implement IOStatisticsSource will not have their string representation changed.
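
A sketch of what that suggestion might look like, with the braces appended only when statistics are present. `StatsSource`, `BaseStream` and the two subclasses are hypothetical stand-ins rather than the Hadoop classes, and a fixed name replaces `super.toString()` so the output is deterministic.

```java
class ToStringSketch {

  /** Hypothetical stand-in for IOStatisticsSource; not the Hadoop interface. */
  interface StatsSource {
    String statsToString();
  }

  /** Base class which only decorates its string form for statistics sources. */
  static class BaseStream {
    private final String name;

    BaseStream(String name) {
      this.name = name;
    }

    @Override
    public String toString() {
      final StringBuilder sb = new StringBuilder(name);
      // Every append sits inside the check, so the string form of a
      // subclass which is not a source is left unchanged.
      if (this instanceof StatsSource) {
        sb.append('{')
            .append(((StatsSource) this).statsToString())
            .append('}');
      }
      return sb.toString();
    }
  }

  /** Subclass without statistics. */
  static class PlainStream extends BaseStream {
    PlainStream() {
      super("plain");
    }
  }

  /** Subclass with statistics. */
  static class InstrumentedStream extends BaseStream implements StatsSource {
    InstrumentedStream() {
      super("instrumented");
    }

    @Override
    public String statsToString() {
      return "bytes_read=42";
    }
  }

  public static void main(String[] args) {
    System.out.println(new PlainStream());        // prints "plain"
    System.out.println(new InstrumentedStream()); // prints "instrumented{bytes_read=42}"
  }
}
```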

Contributor

Ah, never mind. I see the IOStatisticsSnapshot class is serializable, which allows interprocess communication of this data format and gives callers an API to use.

Contributor Author

The logging lets us see what IO downstream apps are doing even without them picking up the API, the way S3A and ABFS streams do. The FSDataInputStream wrapper class etc. pass toString() through to help here. The IOStreams API does exactly the same thing.

Result

  1. Code against this API and get all the details: snapshots, Java and JSON serialization, aggregation, etc.
  2. Just log streams, filesystems, remote iterators and anything else at debug level, and you get to see what is going on without needing binaries which only link against Hadoop releases with the API.

Option #2 is the interim one, which you can do for any app today. We just have to make sure the Hadoop wrapper/aggregate classes all have toString() calls which forward or include stats from inner classes, and do so efficiently.
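
The two options can be contrasted in a small sketch. Everything here is a hypothetical stand-in, not the Hadoop API: `StatsSource`, `snapshot` and `debugLine` are invented names, and the point-in-time copy only approximates what a snapshot class might do.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;

class TwoOptionsSketch {

  /** Hypothetical stand-in for a statistics source; not the Hadoop interface. */
  interface StatsSource {
    Map<String, Long> counters();
  }

  /** Inner stream which tracks a counter and reports it in toString(). */
  static class InnerStream implements StatsSource {
    long bytesRead;

    @Override
    public Map<String, Long> counters() {
      return Collections.singletonMap("bytes_read", bytesRead);
    }

    @Override
    public String toString() {
      return "InnerStream" + counters();
    }
  }

  /** Wrapper with no statistics of its own; toString() includes the inner stream. */
  static class WrapperStream {
    private final Object inner;

    WrapperStream(Object inner) {
      this.inner = inner;
    }

    @Override
    public String toString() {
      return "WrapperStream{" + inner + "}";
    }
  }

  /** Option 1: code against the API and take a point-in-time copy. */
  static Map<String, Long> snapshot(Object stream) {
    if (stream instanceof StatsSource) {
      return new TreeMap<>(((StatsSource) stream).counters());
    }
    return Collections.emptyMap();
  }

  /** Option 2: no API dependency; the caller relies only on toString(). */
  static String debugLine(Object stream) {
    return "closing " + stream;
  }

  public static void main(String[] args) {
    InnerStream inner = new InnerStream();
    inner.bytesRead = 10;
    Map<String, Long> snap = snapshot(inner);
    inner.bytesRead = 20;
    System.out.println(snap);                                // prints "{bytes_read=10}"
    System.out.println(debugLine(new WrapperStream(inner))); // prints "closing WrapperStream{InnerStream{bytes_read=20}}"
  }
}
```

In this sketch the second path compiles against nothing but `java.lang.Object`, which is what makes it usable as an interim measure; the cost is that the data arrives as text rather than as typed values.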

}
Original file line number Diff line number Diff line change
@@ -26,14 +26,20 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

/**
* MultipartUploader is an interface for copying files multipart and across
* multiple nodes.
* <p></p>
* The interface extends {@link IOStatisticsSource} so that there is no
* need to cast an instance to see if it is a source of statistics.
* However, implementations MAY return null for their actual statistics.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public interface MultipartUploader extends Closeable {
public interface MultipartUploader extends Closeable,
IOStatisticsSource {


/**