Skip to content

Commit b686991

Browse files
steveloughrandeepakdamri
authored andcommitted
HADOOP-17450. Add Public IOStatistics API. (apache#2577)
This is the API and implementation classes of HADOOP-16830, which allows callers to query IO object instances (filesystems, streams, remote iterators, ...) and other classes for statistics on their I/O Usage: operation count and min/max/mean durations. New Packages org.apache.hadoop.fs.statistics. Public API, including: IOStatisticsSource IOStatistics IOStatisticsSnapshot (seralizable to java objects and json) +helper classes for logging and integration BufferedIOStatisticsInputStream implements IOStatisticsSource and StreamCapabilities BufferedIOStatisticsOutputStream implements IOStatisticsSource, Syncable and StreamCapabilities org.apache.hadoop.fs.statistics.impl Implementation classes for internal use. org.apache.hadoop.util.functional functional programming support for RemoteIterators and other operations which raise IOEs; all wrapper classes implement and propagate IOStatisticsSource Contributed by Steve Loughran. Change-Id: If56e8db2981613ff689c39239135e44feb25f78e
1 parent cd13fc3 commit b686991

File tree

64 files changed

+9744
-18
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+9744
-18
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,13 @@
2828
import org.apache.hadoop.fs.CanSetDropBehind;
2929
import org.apache.hadoop.fs.StreamCapabilities;
3030
import org.apache.hadoop.fs.Syncable;
31+
import org.apache.hadoop.fs.statistics.IOStatistics;
32+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
3133

3234
import com.google.common.base.Preconditions;
3335

36+
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
37+
3438
/**
3539
* CryptoOutputStream encrypts data. It is not thread-safe. AES CTR mode is
3640
* required in order to ensure that the plain text and cipher text have a 1:1
@@ -48,7 +52,7 @@
4852
@InterfaceAudience.Private
4953
@InterfaceStability.Evolving
5054
public class CryptoOutputStream extends FilterOutputStream implements
51-
Syncable, CanSetDropBehind, StreamCapabilities {
55+
Syncable, CanSetDropBehind, StreamCapabilities, IOStatisticsSource {
5256
private final byte[] oneByteBuf = new byte[1];
5357
private final CryptoCodec codec;
5458
private final Encryptor encryptor;
@@ -313,4 +317,9 @@ public boolean hasCapability(String capability) {
313317
}
314318
return false;
315319
}
320+
321+
@Override
322+
public IOStatistics getIOStatistics() {
323+
return retrieveIOStatistics(out);
324+
}
316325
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/BufferedFSInputStream.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@
2424

2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.classification.InterfaceStability;
27+
import org.apache.hadoop.fs.statistics.IOStatistics;
28+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
29+
30+
import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
2731

2832

2933
/**
@@ -33,7 +37,8 @@
3337
@InterfaceAudience.Private
3438
@InterfaceStability.Unstable
3539
public class BufferedFSInputStream extends BufferedInputStream
36-
implements Seekable, PositionedReadable, HasFileDescriptor {
40+
implements Seekable, PositionedReadable, HasFileDescriptor,
41+
IOStatisticsSource, StreamCapabilities {
3742
/**
3843
* Creates a <code>BufferedFSInputStream</code>
3944
* with the specified buffer size,
@@ -126,4 +131,26 @@ public FileDescriptor getFileDescriptor() throws IOException {
126131
return null;
127132
}
128133
}
134+
135+
/**
136+
* If the inner stream supports {@link StreamCapabilities},
137+
* forward the probe to it.
138+
* Otherwise: return false.
139+
*
140+
* @param capability string to query the stream support for.
141+
* @return true if a capability is known to be supported.
142+
*/
143+
@Override
144+
public boolean hasCapability(final String capability) {
145+
if (in instanceof StreamCapabilities) {
146+
return ((StreamCapabilities) in).hasCapability(capability);
147+
} else {
148+
return false;
149+
}
150+
}
151+
152+
@Override
153+
public IOStatistics getIOStatistics() {
154+
return retrieveIOStatistics(in);
155+
}
129156
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,9 @@
3232
import org.apache.hadoop.conf.Configuration;
3333
import org.apache.hadoop.fs.permission.AclEntry;
3434
import org.apache.hadoop.fs.permission.FsPermission;
35+
import org.apache.hadoop.fs.statistics.IOStatistics;
36+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
37+
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
3538
import org.apache.hadoop.util.DataChecksum;
3639
import org.apache.hadoop.util.Progressable;
3740

@@ -127,7 +130,8 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) {
127130
* For open()'s FSInputStream
128131
* It verifies that data matches checksums.
129132
*******************************************************/
130-
private static class ChecksumFSInputChecker extends FSInputChecker {
133+
private static class ChecksumFSInputChecker extends FSInputChecker implements
134+
IOStatisticsSource {
131135
private ChecksumFileSystem fs;
132136
private FSDataInputStream datas;
133137
private FSDataInputStream sums;
@@ -263,6 +267,17 @@ protected int readChunk(long pos, byte[] buf, int offset, int len,
263267
}
264268
return nread;
265269
}
270+
271+
/**
272+
* Get the IO Statistics of the nested stream, falling back to
273+
* null if the stream does not implement the interface
274+
* {@link IOStatisticsSource}.
275+
* @return an IOStatistics instance or null
276+
*/
277+
@Override
278+
public IOStatistics getIOStatistics() {
279+
return IOStatisticsSupport.retrieveIOStatistics(datas);
280+
}
266281
}
267282

268283
private static class FSDataBoundedInputStream extends FSDataInputStream {
@@ -382,7 +397,8 @@ public static long getChecksumLength(long size, int bytesPerSum) {
382397

383398
/** This class provides an output stream for a checksummed file.
384399
* It generates checksums for data. */
385-
private static class ChecksumFSOutputSummer extends FSOutputSummer {
400+
private static class ChecksumFSOutputSummer extends FSOutputSummer
401+
implements IOStatisticsSource, StreamCapabilities {
386402
private FSDataOutputStream datas;
387403
private FSDataOutputStream sums;
388404
private static final float CHKSUM_AS_FRACTION = 0.01f;
@@ -436,6 +452,28 @@ protected void checkClosed() throws IOException {
436452
throw new ClosedChannelException();
437453
}
438454
}
455+
456+
/**
457+
* Get the IO Statistics of the nested stream, falling back to
458+
* null if the stream does not implement the interface
459+
* {@link IOStatisticsSource}.
460+
* @return an IOStatistics instance or null
461+
*/
462+
@Override
463+
public IOStatistics getIOStatistics() {
464+
return IOStatisticsSupport.retrieveIOStatistics(datas);
465+
}
466+
467+
/**
468+
* Probe the inner stream for a capability.
469+
*
470+
* @param capability string to query the stream support for.
471+
* @return true if a capability is known to be supported.
472+
*/
473+
@Override
474+
public boolean hasCapability(final String capability) {
475+
return datas.hasCapability(capability);
476+
}
439477
}
440478

441479
@Override

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,17 @@
2424

2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.classification.InterfaceStability;
27+
import org.apache.hadoop.fs.statistics.IOStatistics;
28+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
29+
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
2730

2831
/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}.
2932
*/
3033
@InterfaceAudience.Public
3134
@InterfaceStability.Stable
3235
public class FSDataOutputStream extends DataOutputStream
33-
implements Syncable, CanSetDropBehind, StreamCapabilities {
36+
implements Syncable, CanSetDropBehind, StreamCapabilities,
37+
IOStatisticsSource {
3438
private final OutputStream wrappedStream;
3539

3640
private static class PositionCache extends FilterOutputStream {
@@ -155,4 +159,15 @@ public void setDropBehind(Boolean dropBehind) throws IOException {
155159
"not support setting the drop-behind caching setting.");
156160
}
157161
}
162+
163+
/**
164+
* Get the IO Statistics of the nested stream, falling back to
165+
* empty statistics if the stream does not implement the interface
166+
* {@link IOStatisticsSource}.
167+
* @return an IOStatistics instance.
168+
*/
169+
@Override
170+
public IOStatistics getIOStatistics() {
171+
return IOStatisticsSupport.retrieveIOStatistics(wrappedStream);
172+
}
158173
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSInputStream.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
import com.google.common.base.Preconditions;
2525
import org.apache.hadoop.classification.InterfaceAudience;
2626
import org.apache.hadoop.classification.InterfaceStability;
27+
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
28+
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
29+
2730
import org.slf4j.Logger;
2831
import org.slf4j.LoggerFactory;
2932

@@ -134,4 +137,23 @@ public void readFully(long position, byte[] buffer)
134137
throws IOException {
135138
readFully(position, buffer, 0, buffer.length);
136139
}
140+
141+
/**
142+
* toString method returns the superclass toString, but if the subclass
143+
* implements {@link IOStatisticsSource} then those statistics are
144+
* extracted and included in the output.
145+
* That is: statistics of subclasses are automatically reported.
146+
* @return a string value.
147+
*/
148+
@Override
149+
public String toString() {
150+
final StringBuilder sb = new StringBuilder(super.toString());
151+
sb.append('{');
152+
if (this instanceof IOStatisticsSource) {
153+
sb.append(IOStatisticsLogging.ioStatisticsSourceToString(
154+
(IOStatisticsSource) this));
155+
}
156+
sb.append('}');
157+
return sb.toString();
158+
}
137159
}

0 commit comments

Comments
 (0)