Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;


/**
Expand All @@ -33,7 +37,8 @@
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class BufferedFSInputStream extends BufferedInputStream
implements Seekable, PositionedReadable, HasFileDescriptor {
implements Seekable, PositionedReadable, HasFileDescriptor,
IOStatisticsSource {
/**
* Creates a <code>BufferedFSInputStream</code>
* with the specified buffer size,
Expand Down Expand Up @@ -126,4 +131,9 @@ public FileDescriptor getFileDescriptor() throws IOException {
return null;
}
}

@Override
public IOStatistics getIOStatistics() {
return retrieveIOStatistics(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
Expand Down Expand Up @@ -134,7 +137,8 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) {
* For open()'s FSInputStream
* It verifies that data matches checksums.
*******************************************************/
private static class ChecksumFSInputChecker extends FSInputChecker {
private static class ChecksumFSInputChecker extends FSInputChecker implements
IOStatisticsSource {
private ChecksumFileSystem fs;
private FSDataInputStream datas;
private FSDataInputStream sums;
Expand Down Expand Up @@ -270,6 +274,17 @@ protected int readChunk(long pos, byte[] buf, int offset, int len,
}
return nread;
}

/**
* Get the IO Statistics of the nested stream, falling back to
* null if the stream does not implement the interface
* {@link IOStatisticsSource}.
* @return an IOStatistics instance or null
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(datas);
}
}

private static class FSDataBoundedInputStream extends FSDataInputStream {
Expand Down Expand Up @@ -395,7 +410,8 @@ public static long getChecksumLength(long size, int bytesPerSum) {

/** This class provides an output stream for a checksummed file.
* It generates checksums for data. */
private static class ChecksumFSOutputSummer extends FSOutputSummer {
private static class ChecksumFSOutputSummer extends FSOutputSummer
implements IOStatisticsSource {
private FSDataOutputStream datas;
private FSDataOutputStream sums;
private static final float CHKSUM_AS_FRACTION = 0.01f;
Expand Down Expand Up @@ -449,6 +465,17 @@ protected void checkClosed() throws IOException {
throw new ClosedChannelException();
}
}

/**
* Get the IO Statistics of the nested stream, falling back to
* null if the stream does not implement the interface
* {@link IOStatisticsSource}.
* @return an IOStatistics instance or null
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(datas);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.util.IdentityHashStore;

Expand All @@ -40,7 +43,7 @@ public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
ByteBufferPositionedReadable {
ByteBufferPositionedReadable, IOStatisticsSource {
/**
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
* objects
Expand Down Expand Up @@ -267,4 +270,15 @@ public void readFully(long position, ByteBuffer buf) throws IOException {
"unsupported by " + in.getClass().getCanonicalName());
}
}

/**
* Get the IO Statistics of the nested stream, falling back to
* null if the stream does not implement the interface
* {@link IOStatisticsSource}.
* @return an IOStatistics instance or null
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(in);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsSupport;

/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataOutputStream extends DataOutputStream
implements Syncable, CanSetDropBehind, StreamCapabilities {
implements Syncable, CanSetDropBehind, StreamCapabilities,
IOStatisticsSource {
private final OutputStream wrappedStream;

private static class PositionCache extends FilterOutputStream {
Expand Down Expand Up @@ -155,4 +159,15 @@ public void setDropBehind(Boolean dropBehind) throws IOException {
"not support setting the drop-behind caching setting.");
}
}

/**
* Get the IO Statistics of the nested stream, falling back to
* empty statistics if the stream does not implement the interface
* {@link IOStatisticsSource}.
* @return an IOStatistics instance.
*/
@Override
public IOStatistics getIOStatistics() {
return IOStatisticsSupport.retrieveIOStatistics(wrappedStream);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -134,4 +137,23 @@ public void readFully(long position, byte[] buffer)
throws IOException {
readFully(position, buffer, 0, buffer.length);
}

/**
* toString method returns the superclass toString, but if the subclass
* implements {@link IOStatisticsSource} then those statistics are
* extracted and included in the output.
* That is: statistics of subclasses are automatically reported.
* @return a string value.
*/
@Override
public String toString() {
final StringBuilder sb = new StringBuilder(super.toString());
sb.append('{');
if (this instanceof IOStatisticsSource) {
sb.append(IOStatisticsLogging.ioStatisticsSourceToString(
(IOStatisticsSource) this));
}
sb.append('}');
return sb.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,25 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.impl.BufferedIOStatisticsOutputStream;
import org.apache.hadoop.fs.statistics.impl.CounterIOStatistics;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;

import static org.apache.hadoop.fs.impl.PathCapabilitiesSupport.validatePathCapabilityArgs;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_EXCEPTIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SKIP_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_SKIP_OPERATIONS;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_BYTES;
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_WRITE_EXCEPTIONS;
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.counterIOStatistics;

/****************************************************************
* Implement the FileSystem API for the raw local filesystem.
Expand Down Expand Up @@ -105,10 +117,21 @@ public void initialize(URI uri, Configuration conf) throws IOException {
/*******************************************************
* For open()'s FSInputStream.
*******************************************************/
class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor {
class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor,
IOStatisticsSource {
private FileInputStream fis;
private long position;

/**
* Minimal set of counters.
*/
private final CounterIOStatistics ioStatistics = counterIOStatistics(
STREAM_READ_BYTES,
STREAM_READ_EXCEPTIONS,
STREAM_READ_SEEK_OPERATIONS,
STREAM_READ_SKIP_OPERATIONS,
STREAM_READ_SKIP_BYTES);

public LocalFSFileInputStream(Path f) throws IOException {
fis = new FileInputStream(pathToFile(f));
}
Expand Down Expand Up @@ -150,9 +173,11 @@ public int read() throws IOException {
if (value >= 0) {
this.position++;
statistics.incrementBytesRead(1);
ioStatistics.increment(STREAM_READ_BYTES, 1);
}
return value;
} catch (IOException e) { // unexpected exception
ioStatistics.increment(STREAM_READ_EXCEPTIONS, 1);
throw new FSError(e); // assume native fs error
}
}
Expand All @@ -166,9 +191,11 @@ public int read(byte[] b, int off, int len) throws IOException {
if (value > 0) {
this.position += value;
statistics.incrementBytesRead(value);
ioStatistics.increment(STREAM_READ_BYTES, value);
}
return value;
} catch (IOException e) { // unexpected exception
ioStatistics.increment(STREAM_READ_EXCEPTIONS, 1);
throw new FSError(e); // assume native fs error
}
}
Expand All @@ -187,18 +214,22 @@ public int read(long position, byte[] b, int off, int len)
int value = fis.getChannel().read(bb, position);
if (value > 0) {
statistics.incrementBytesRead(value);
ioStatistics.increment(STREAM_READ_BYTES, value);
}
return value;
} catch (IOException e) {
ioStatistics.increment(STREAM_READ_EXCEPTIONS, 1);
throw new FSError(e);
}
}

@Override
public long skip(long n) throws IOException {
ioStatistics.increment(STREAM_READ_SKIP_OPERATIONS, 1);
long value = fis.skip(n);
if (value > 0) {
this.position += value;
ioStatistics.increment(STREAM_READ_SKIP_BYTES, value);
}
return value;
}
Expand All @@ -207,6 +238,11 @@ public long skip(long n) throws IOException {
public FileDescriptor getFileDescriptor() throws IOException {
return fis.getFD();
}

@Override
public IOStatistics getIOStatistics() {
return ioStatistics;
}
}

@Override
Expand All @@ -231,9 +267,17 @@ public FSDataInputStream open(PathHandle fd, int bufferSize)
/*********************************************************
* For create()'s FSOutputStream.
*********************************************************/
class LocalFSFileOutputStream extends OutputStream {
class LocalFSFileOutputStream extends OutputStream implements
IOStatisticsSource {
private FileOutputStream fos;


/**
* Minimal set of counters.
*/
private final CounterIOStatistics ioStatistics = counterIOStatistics(
STREAM_WRITE_BYTES,
STREAM_WRITE_EXCEPTIONS);

private LocalFSFileOutputStream(Path f, boolean append,
FsPermission permission) throws IOException {
File file = pathToFile(f);
Expand Down Expand Up @@ -273,7 +317,9 @@ private LocalFSFileOutputStream(Path f, boolean append,
public void write(byte[] b, int off, int len) throws IOException {
try {
fos.write(b, off, len);
ioStatistics.increment(STREAM_WRITE_BYTES, len);
} catch (IOException e) { // unexpected exception
ioStatistics.increment(STREAM_WRITE_EXCEPTIONS, 1);
throw new FSError(e); // assume native fs error
}
}
Expand All @@ -282,10 +328,17 @@ public void write(byte[] b, int off, int len) throws IOException {
public void write(int b) throws IOException {
try {
fos.write(b);
ioStatistics.increment(STREAM_WRITE_BYTES, 1);
} catch (IOException e) { // unexpected exception
ioStatistics.increment(STREAM_WRITE_EXCEPTIONS, 1);
throw new FSError(e); // assume native fs error
}
}

@Override
public IOStatistics getIOStatistics() {
return ioStatistics;
}
}

@Override
Expand Down Expand Up @@ -318,7 +371,7 @@ private FSDataOutputStream create(Path f, boolean overwrite,
if (parent != null && !mkdirs(parent)) {
throw new IOException("Mkdirs failed to create " + parent.toString());
}
return new FSDataOutputStream(new BufferedOutputStream(
return new FSDataOutputStream(new BufferedIOStatisticsOutputStream(
createOutputStreamWithMode(f, false, permission), bufferSize),
statistics);
}
Expand All @@ -340,7 +393,7 @@ public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
if (exists(f) && !flags.contains(CreateFlag.OVERWRITE)) {
throw new FileAlreadyExistsException("File already exists: " + f);
}
return new FSDataOutputStream(new BufferedOutputStream(
return new FSDataOutputStream(new BufferedIOStatisticsOutputStream(
createOutputStreamWithMode(f, false, permission), bufferSize),
statistics);
}
Expand Down
Loading