Configurations are specified by resources. A resource contains a set of
* name/value pairs as XML data. Each resource is named by either a
@@ -141,12 +141,12 @@
* Once a resource declares a value final, no subsequently-loaded
* resource can alter that value.
* For example, one might define a final parameter with:
- *
+ *
+ * </property>
*
* Administrators typically define parameters as final in
* core-site.xml for values that user applications may not alter.
@@ -164,7 +164,7 @@
*
*
* <property>
* <name>dfs.hosts.include</name>
* <value>/etc/hadoop/conf/hosts.include</value>
* <final>true</final>
- * </property>
For example, if a configuration resource contains the following property
* definitions:
- *
+ *
+ *
*
*
* <property>
* <name>basedir</name>
* <value>/user/${user.name}</value>
@@ -179,7 +179,7 @@
* <name>otherdir</name>
* <value>${env.BASE_DIR}/other</value>
* </property>
- *
When conf.get("tempdir") is called, then ${basedir} * will be resolved to another property in this Configuration, while @@ -203,7 +203,7 @@ * can define there own custom tags in hadoop.tags.custom property. * *
For example, we can tag an existing property as:
- *
+ *
+ *
*
* <property>
* <name>dfs.replication</name>
* <value>3</value>
@@ -215,7 +215,7 @@
* <value>3</value>
* <tag>HDFS,SECURITY</tag>
* </property>
- *
 * Properties marked with tags can be retrieved with conf
 * .getAllPropertiesByTag("HDFS") or conf.getAllPropertiesByTags
 * (Arrays.asList("YARN","SECURITY")).
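A minimal sketch of the tag lookup described above (the property counts, tag
names and class name are illustrative only):

    import java.util.Arrays;
    import java.util.Properties;
    import org.apache.hadoop.conf.Configuration;

    public class TagLookupExample {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Properties whose resource entry carries <tag>HDFS,SECURITY</tag>
        // are grouped by tag when the resource is loaded.
        Properties hdfsProps = conf.getAllPropertiesByTag("HDFS");
        Properties mixed =
            conf.getAllPropertiesByTags(Arrays.asList("YARN", "SECURITY"));
        System.out.println("HDFS-tagged properties: " + hdfsProps.size());
        System.out.println("YARN/SECURITY-tagged properties: " + mixed.size());
      }
    }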
@@ -581,9 +581,9 @@ public static void addDeprecations(DeprecationDelta[] deltas) {
 * If you have multiple deprecation entries to add, it is more efficient to
 * use #addDeprecations(DeprecationDelta[] deltas) instead.
 *
- * @param key
- * @param newKeys
- * @param customMessage
+ * @param key to be deprecated
+ * @param newKeys list of keys that take up the values of deprecated key
+ * @param customMessage deprecation message
 * @deprecated use {@link #addDeprecation(String key, String newKey, String customMessage)} instead
 */
@@ -605,9 +605,9 @@ public static void addDeprecation(String key, String[] newKeys,
 * If you have multiple deprecation entries to add, it is more efficient to
 * use #addDeprecations(DeprecationDelta[] deltas) instead.
 *
- * @param key
- * @param newKey
- * @param customMessage
+ * @param key to be deprecated
+ * @param newKey key that takes up the values of the deprecated key
+ * @param customMessage deprecation message
 */
 public static void addDeprecation(String key, String newKey,
     String customMessage) {
@@ -1404,6 +1404,7 @@ void logDeprecationOnce(String name, String source) {
 /**
 * Unset a previously set property.
+ * @param name the property name
 */
 public synchronized void unset(String name) {
   String[] names = null;
@@ -1693,6 +1694,7 @@ public void setBooleanIfUnset(String name, boolean value) {
 * is equivalent to set(<name>, value.toString()).
* @param name property name
* @param value new value
+ * @param * *
* *
+ *
* @param config the configuration
* @param propertyName property name
@@ -3849,7 +3854,7 @@ public void write(DataOutput out) throws IOException {
/**
* get keys matching the regex
* @param regex
- * @return Map
* Encryption/Decryption is not always on the entire file. For example,
* in Hadoop, a node may only decrypt a portion of a file (i.e. a split).
* In these situations, the counter is derived from the file position.
- *
* The IV can be calculated by combining the initial IV and the counter with
* a lossless operation (concatenation, addition, or XOR).
- * @see http://en.wikipedia.org/wiki/Block_cipher_mode_of_operation#Counter_.28CTR.29
+ * See http://en.wikipedia.org/wiki/Block_cipher_mode_of_operation#Counter_
+ * .28CTR.29
*
* @param initIV initial IV
* @param counter counter for input stream position
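A sketch of one such lossless combination (unsigned big-endian addition of the
counter into a copy of the initial IV); the codec in use may combine them
differently, so this is illustration only:

    // Sketch only: add the counter into the low-order bytes of a copy of the
    // 16-byte initial IV, carrying overflow leftwards (big-endian addition).
    static byte[] calculateIV(byte[] initIV, long counter) {
      byte[] iv = new byte[initIV.length];
      int carry = 0;
      for (int i = initIV.length - 1; i >= 0; i--) {
        int sum = (initIV[i] & 0xff) + (int) (counter & 0xff) + carry;
        iv[i] = (byte) sum;
        carry = sum >>> 8;
        counter >>>= 8;
      }
      return iv;
    }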
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
index a2273bf83343b..5b706da3fef78 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoInputStream.java
@@ -33,6 +33,7 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ByteBufferPositionedReadable;
import org.apache.hadoop.fs.ByteBufferReadable;
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.CanSetReadahead;
@@ -53,10 +54,10 @@
* required in order to ensure that the plain text and cipher text have a 1:1
* mapping. The decryption is buffer based. The key points of the decryption
* are (1) calculating the counter and (2) padding through stream position:
- *
* counter = base + pos/(algorithm blocksize);
* padding = pos%(algorithm blocksize);
- *
* The underlying stream offset is maintained as state.
*/
@InterfaceAudience.Private
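As a worked example of the counter/padding formulas above (assuming the
16-byte AES block size; the stream position is arbitrary):

    long pos = 1000;              // plain-text stream position
    int blockSize = 16;           // AES block size in bytes
    long base = 0;                // counter embedded in the initial IV, here 0
    long counter = base + pos / blockSize;   // 62
    int padding = (int) (pos % blockSize);   // 8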
@@ -328,20 +329,40 @@ public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
checkStream();
try {
- final int n = ((PositionedReadable) in).read(position, buffer, offset,
+ final int n = ((PositionedReadable) in).read(position, buffer, offset,
length);
if (n > 0) {
// This operation does not change the current offset of the file
decrypt(position, buffer, offset, n);
}
-
+
return n;
} catch (ClassCastException e) {
throw new UnsupportedOperationException("This stream does not support " +
"positioned read.");
}
}
-
+
+ /**
+ * Positioned readFully using {@link ByteBuffer}s. This method is thread-safe.
+ */
+ // @Override
+ public void readFully(long position, final ByteBuffer buf)
+ throws IOException {
+ checkStream();
+ if (!(in instanceof ByteBufferPositionedReadable)) {
+ throw new UnsupportedOperationException(in.getClass().getCanonicalName()
+ + " does not support positioned reads with byte buffers.");
+ }
+ int bufPos = buf.position();
+ ((ByteBufferPositionedReadable) in).readFully(position, buf);
+ final int n = buf.position() - bufPos;
+ if (n > 0) {
+ // This operation does not change the current offset of the file
+ decrypt(position, buf, n, bufPos);
+ }
+ }
+
/**
* Decrypt length bytes in buffer starting at offset. Output is also put
* into buffer starting at offset. It is thread-safe.
@@ -375,7 +396,7 @@ private void decrypt(long position, byte[] buffer, int offset, int length)
returnDecryptor(decryptor);
}
}
-
+
/** Positioned read fully. It is thread-safe */
@Override
public void readFully(long position, byte[] buffer, int offset, int length)
@@ -407,7 +428,7 @@ public void seek(long pos) throws IOException {
checkStream();
try {
/*
- * If data of target pos in the underlying stream has already been read
+ * If data of target pos in the underlying stream has already been read
* and decrypted in outBuffer, we just need to re-position outBuffer.
*/
if (pos <= streamOffset && pos >= (streamOffset - outBuffer.remaining())) {
@@ -523,7 +544,7 @@ public int read(ByteBuffer buf) throws IOException {
* Output is also buf and same start position.
* buf.position() and buf.limit() should be unchanged after decryption.
*/
- private void decrypt(ByteBuffer buf, int n, int start)
+ private void decrypt(ByteBuffer buf, int n, int start)
throws IOException {
final int pos = buf.position();
final int limit = buf.limit();
@@ -545,7 +566,52 @@ private void decrypt(ByteBuffer buf, int n, int start)
}
buf.position(pos);
}
-
+
+ private void decrypt(long filePosition, ByteBuffer buf, int length, int start)
+ throws IOException {
+ ByteBuffer localInBuffer = null;
+ ByteBuffer localOutBuffer = null;
+
+ // Duplicate the buffer so we don't have to worry about resetting the
+ // original position and limit at the end of the method
+ buf = buf.duplicate();
+
+ int decryptedBytes = 0;
+ Decryptor localDecryptor = null;
+ try {
+ localInBuffer = getBuffer();
+ localOutBuffer = getBuffer();
+ localDecryptor = getDecryptor();
+ byte[] localIV = initIV.clone();
+ updateDecryptor(localDecryptor, filePosition, localIV);
+ byte localPadding = getPadding(filePosition);
+ // Set proper filePosition for inputdata.
+ localInBuffer.position(localPadding);
+
+ while (decryptedBytes < length) {
+ buf.position(start + decryptedBytes);
+ buf.limit(start + decryptedBytes +
+ Math.min(length - decryptedBytes, localInBuffer.remaining()));
+ localInBuffer.put(buf);
+ // Do decryption
+ try {
+ decrypt(localDecryptor, localInBuffer, localOutBuffer, localPadding);
+ buf.position(start + decryptedBytes);
+ buf.limit(start + length);
+ decryptedBytes += localOutBuffer.remaining();
+ buf.put(localOutBuffer);
+ } finally {
+ localPadding = afterDecryption(localDecryptor, localInBuffer,
+ filePosition + length, localIV);
+ }
+ }
+ } finally {
+ returnBuffer(localInBuffer);
+ returnBuffer(localOutBuffer);
+ returnDecryptor(localDecryptor);
+ }
+ }
+
@Override
public int available() throws IOException {
checkStream();
@@ -605,7 +671,7 @@ public ByteBuffer read(ByteBufferPool bufferPool, int maxLength,
}
return buffer;
} catch (ClassCastException e) {
- throw new UnsupportedOperationException("This stream does not support " +
+ throw new UnsupportedOperationException("This stream does not support " +
"enhanced byte buffer access.");
}
}
@@ -740,6 +806,7 @@ public boolean hasCapability(String capability) {
case StreamCapabilities.READAHEAD:
case StreamCapabilities.DROPBEHIND:
case StreamCapabilities.UNBUFFER:
+ case StreamCapabilities.READBYTEBUFFER:
return true;
default:
return false;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
index 2f347c5816b2b..829f205e22eb2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/CryptoOutputStream.java
@@ -28,18 +28,22 @@
import org.apache.hadoop.fs.CanSetDropBehind;
import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.hadoop.fs.Syncable;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import com.google.common.base.Preconditions;
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.retrieveIOStatistics;
+
/**
* CryptoOutputStream encrypts data. It is not thread-safe. AES CTR mode is
* required in order to ensure that the plain text and cipher text have a 1:1
* mapping. The encryption is buffer based. The key points of the encryption are
* (1) calculating counter and (2) padding through stream position.
- *
* counter = base + pos/(algorithm blocksize);
* padding = pos%(algorithm blocksize);
- *
* The underlying stream offset is maintained as state.
*
* Note that while some of this class' methods are synchronized, this is just to
@@ -48,7 +52,7 @@
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class CryptoOutputStream extends FilterOutputStream implements
- Syncable, CanSetDropBehind, StreamCapabilities {
+ Syncable, CanSetDropBehind, StreamCapabilities, IOStatisticsSource {
private final byte[] oneByteBuf = new byte[1];
private final CryptoCodec codec;
private final Encryptor encryptor;
@@ -313,4 +317,9 @@ public boolean hasCapability(String capability) {
}
return false;
}
+
+ @Override
+ public IOStatistics getIOStatistics() {
+ return retrieveIOStatistics(out);
+ }
}
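A sketch of how a caller might pull statistics back out of a stream that, like
CryptoOutputStream above, implements IOStatisticsSource (the helper name is
illustrative; retrieveIOStatistics returns null when no statistics are
available):

    import org.apache.hadoop.fs.statistics.IOStatistics;
    import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
    import org.apache.hadoop.fs.statistics.IOStatisticsSupport;

    static void printStreamStats(Object stream) {
      IOStatistics stats = IOStatisticsSupport.retrieveIOStatistics(stream);
      if (stats != null) {
        System.out.println(IOStatisticsLogging.ioStatisticsToString(stats));
      }
    }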
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java
index 9958415ebd237..7556f18d6dee2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Decryptor.java
@@ -38,7 +38,7 @@ public interface Decryptor {
/**
* Indicate whether the decryption context is reset.
- *
* Certain modes, like CTR, require a different IV depending on the
* position in the stream. Generally, the decryptor maintains any necessary
* context for calculating the IV and counter so that no reinit is necessary
@@ -49,22 +49,22 @@ public interface Decryptor {
/**
* This presents a direct interface decrypting with direct ByteBuffers.
- *
* This function does not always decrypt the entire buffer and may potentially
* need to be called multiple times to process an entire buffer. The object
* may hold the decryption context internally.
- *
* Some implementations may require sufficient space in the destination
* buffer to decrypt the entire input buffer.
- *
* Upon return, inBuffer.position() will be advanced by the number of bytes
* read and outBuffer.position() by bytes written. Implementations should
* not modify inBuffer.limit() and outBuffer.limit().
- *
* @param inBuffer a direct {@link ByteBuffer} to read from. inBuffer may
- * not be null and inBuffer.remaining() must be > 0
+ * not be null and inBuffer.remaining() must be {@literal >} 0
* @param outBuffer a direct {@link ByteBuffer} to write to. outBuffer may
- * not be null and outBuffer.remaining() must be > 0
+ * not be null and outBuffer.remaining() must be {@literal >} 0
* @throws IOException if decryption fails
*/
public void decrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
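A sketch of the multi-call pattern the javadoc above describes (it assumes the
caller has sized outBuffer so the whole of inBuffer can be decrypted;
otherwise the loop would need to drain outBuffer between calls):

    static void decryptAll(Decryptor decryptor, ByteBuffer inBuffer,
        ByteBuffer outBuffer) throws IOException {
      while (inBuffer.remaining() > 0) {
        // Each call advances inBuffer.position() and outBuffer.position()
        // by however much the implementation chose to process.
        decryptor.decrypt(inBuffer, outBuffer);
      }
    }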
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java
index 6dc3cfbe38f07..faeb176bf9de3 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/Encryptor.java
@@ -37,7 +37,7 @@ public interface Encryptor {
/**
* Indicate whether the encryption context is reset.
- *
* Certain modes, like CTR, require a different IV depending on the
* position in the stream. Generally, the encryptor maintains any necessary
* context for calculating the IV and counter so that no reinit is necessary
@@ -48,22 +48,22 @@ public interface Encryptor {
/**
* This presents a direct interface encrypting with direct ByteBuffers.
- *
* This function does not always encrypt the entire buffer and may potentially
* need to be called multiple times to process an entire buffer. The object
* may hold the encryption context internally.
- *
* Some implementations may require sufficient space in the destination
* buffer to encrypt the entire input buffer.
- *
* Upon return, inBuffer.position() will be advanced by the number of bytes
* read and outBuffer.position() by bytes written. Implementations should
* not modify inBuffer.limit() and outBuffer.limit().
- *
* @param inBuffer a direct {@link ByteBuffer} to read from. inBuffer may
- * not be null and inBuffer.remaining() must be > 0
+ * not be null and inBuffer.remaining() must be {@literal >} 0
* @param outBuffer a direct {@link ByteBuffer} to write to. outBuffer may
- * not be null and outBuffer.remaining() must be > 0
+ * not be null and outBuffer.remaining() must be {@literal >} 0
* @throws IOException if encryption fails
*/
public void encrypt(ByteBuffer inBuffer, ByteBuffer outBuffer)
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/OpensslCipher.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/OpensslCipher.java
index 133a9f9110216..0a2ba52e555e5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/OpensslCipher.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/OpensslCipher.java
@@ -107,12 +107,12 @@ private OpensslCipher(long context, int alg, int padding) {
}
/**
- * Return an
*
* All
*
* Upon return, the input buffer's position will be equal to its limit;
* its limit will not have changed. The output buffer's position will have
* advanced by n, when n is the value returned by this method; the output
* buffer's limit will not have changed.
- *
*
* If
*
* The result is stored in the output buffer. Upon return, the output buffer's
* position will have advanced by n, where n is the value returned by this
* method; the output buffer's limit will not have changed.
- *
*
* If
*
* Upon finishing, this method resets this cipher object to the state it was
* in when previously initialized. That is, the object is available to encrypt
* or decrypt more data.
- *
*
* If any exception is thrown, this cipher object need to be reset before it
* can be used again.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/JavaKeyStoreProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/JavaKeyStoreProvider.java
index 5beda0d2d2eb8..7951af56bc8f9 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/JavaKeyStoreProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/JavaKeyStoreProvider.java
@@ -62,23 +62,24 @@
/**
* KeyProvider based on Java's KeyStore file format. The file may be stored in
* any Hadoop FileSystem using the following name mangling:
- * jks://hdfs@nn1.example.com/my/keys.jks -> hdfs://nn1.example.com/my/keys.jks
- * jks://file/home/owen/keys.jks -> file:///home/owen/keys.jks
- *
* If the
* If the
* NOTE: Make sure the password in the password file does not have an
* ENTER at the end, else it won't be valid for the Java KeyStore.
- *
* If neither the environment variable nor the property is set, the password used
* is 'none'.
- *
* It is expected for encrypted InputFormats and OutputFormats to copy the keys
* from the original provider into the job's Credentials object, which is
* accessed via the UserProvider. Therefore, this provider won't be used by
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProvider.java
index 9985efa90b991..a8c283ab649cc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProvider.java
@@ -50,7 +50,7 @@
* abstraction to separate key storage from users of encryption. It
* is intended to support getting or storing keys in a variety of ways,
* including third party bindings.
- *
*
* This implementation generates the key material and calls the
* {@link #createKey(String, byte[], Options)} method.
*
@@ -594,7 +594,7 @@ public void close() throws IOException {
/**
* Roll a new version of the given key generating the material for it.
- *
* This implementation generates the key material and calls the
* {@link #rollNewVersion(String, byte[])} method.
*
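A sketch of the generate-material overloads described above (the provider
lookup and key name are illustrative, and it assumes a provider is configured
under hadoop.security.key.provider.path):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.crypto.key.KeyProvider;
    import org.apache.hadoop.crypto.key.KeyProviderFactory;

    static void createAndRoll() throws Exception {
      Configuration conf = new Configuration();
      KeyProvider provider = KeyProviderFactory.getProviders(conf).get(0);
      // createKey(name, options) generates the material internally and then
      // delegates to createKey(name, material, options).
      KeyProvider.KeyVersion v1 =
          provider.createKey("example.key", new KeyProvider.Options(conf));
      // rollNewVersion(name) likewise generates material for the new version.
      KeyProvider.KeyVersion v2 = provider.rollNewVersion("example.key");
    }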
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java
index 3ee3bd756e253..00d7a7dfce0f7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderCryptoExtension.java
@@ -149,7 +149,7 @@ public KeyVersion getEncryptedKeyVersion() {
* Derive the initialization vector (IV) for the encryption key from the IV
* of the encrypted key. This derived IV is used with the encryption key to
* decrypt the encrypted key.
- *
* The alternative to this is using the same IV for both the encryption key
* and the encrypted key. Even a simple symmetric transformation like this
* improves security by avoiding IV re-use. IVs will also be fairly unique
@@ -195,7 +195,7 @@ public void warmUpEncryptedKeys(String... keyNames)
* The generated key material is of the same
* length as the
* NOTE: The generated key is not stored by the
* NOTE: The generated key is not stored by the
* If the given
* If the given
*
* If using an Intel chipset with RDRAND, the high-performance hardware
* random number generator will be used and it's much faster than
* {@link java.security.SecureRandom}. If RDRAND is unavailable, default
* OpenSSL secure random generator will be used. It's still faster
* and can generate strong random bytes.
- *
+ * See https://wiki.openssl.org/index.php/Random_Numbers
+ * See http://en.wikipedia.org/wiki/RdRand
*/
@InterfaceAudience.Private
public class OpensslSecureRandom extends Random {
@@ -97,7 +97,7 @@ public void setSeed(long seed) {
* random bits (right justified, with leading zeros).
*
* @param numBits number of random bits to be generated, where
- * 0 <=
- * In some FileSystem implementations such as HDFS metadata
+ *
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to modify
@@ -1134,7 +1153,7 @@ public void setXAttr(Path path, String name, byte[] value)
* Set an xattr of a file or directory.
* The name must be prefixed with the namespace followed by ".". For example,
* "user.attr".
- *
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to modify
@@ -1153,7 +1172,7 @@ public void setXAttr(Path path, String name, byte[] value,
* Get an xattr for a file or directory.
* The name must be prefixed with the namespace followed by ".". For example,
* "user.attr".
- *
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attribute
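A sketch of the xattr calls documented above, using the FileSystem-level
equivalents (the path and attribute name are illustrative; note the required
"user." namespace prefix):

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    static void xattrExample() throws Exception {
      FileSystem fs = FileSystem.get(new Configuration());
      Path p = new Path("/tmp/example.txt");
      fs.setXAttr(p, "user.attr", "some value".getBytes(StandardCharsets.UTF_8));
      byte[] one = fs.getXAttr(p, "user.attr");
      Map<String, byte[]> all = fs.getXAttrs(p);
    }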
@@ -1170,11 +1189,13 @@ public byte[] getXAttr(Path path, String name) throws IOException {
* Get all of the xattrs for a file or directory.
* Only those xattrs for which the logged-in user has permissions to view
* are returned.
- *
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
- * @return Map
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
* @param names XAttr names.
- * @return Map
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
- * @return Map
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to remove extended attribute
@@ -1336,6 +1359,31 @@ public boolean equals(Object other) {
return myUri.equals(((AbstractFileSystem) other).myUri);
}
+ /**
+ * Open a file with the given set of options.
+ * The base implementation performs a blocking
+ * call to {@link #open(Path, int)} in this call;
+ * the actual outcome is in the returned {@code CompletableFuture}.
+ * This avoids having to create some thread pool, while still
+ * setting up the expectation that the {@code get()} call
+ * is needed to evaluate the result.
+ * @param path path to the file
+ * @param parameters open file parameters from the builder.
+ * @return a future which will evaluate to the opened file.
+ * @throws IOException failure to resolve the link.
+ * @throws IllegalArgumentException unknown mandatory key
+ */
+ public CompletableFuture
+ * After a successful call, {@code buf.position()} will be advanced by the
+ * number of bytes read and {@code buf.limit()} will be unchanged.
+ *
+ * In the case of an exception, the state of the buffer (the contents of the
+ * buffer, the {@code buf.position()}, the {@code buf.limit()}, etc.) is
+ * undefined, and callers should be prepared to recover from this
+ * eventuality.
+ *
+ * Callers should use {@link StreamCapabilities#hasCapability(String)} with
+ * {@link StreamCapabilities#PREADBYTEBUFFER} to check if the underlying
+ * stream supports this interface, otherwise they might get a
+ * {@link UnsupportedOperationException}.
+ *
+ * Implementations should treat 0-length requests as legitimate, and must not
+ * signal an error upon their receipt.
+ *
+ * This does not change the current offset of a file, and is thread-safe.
+ *
+ * @param position position within file
+ * @param buf the ByteBuffer to receive the results of the read operation.
+ * @return the number of bytes read, possibly zero, or -1 if reached
+ * end-of-stream
+ * @throws IOException if there is some error performing the read
+ */
+ int read(long position, ByteBuffer buf) throws IOException;
+
+ /**
+ * Reads {@code buf.remaining()} bytes into buf from a given position in
+ * the file or until the end of the data was reached before the read
+ * operation completed. Callers should use {@code buf.limit(...)} to
+ * control the size of the desired read and {@code buf.position(...)} to
+ * control the offset into the buffer the data should be written to.
+ *
+ * This operation provides similar semantics to
+ * {@link #read(long, ByteBuffer)}, the difference is that this method is
+ * guaranteed to read data until the {@link ByteBuffer} is full, or until
+ * the end of the data stream is reached.
+ *
+ * @param position position within file
+ * @param buf the ByteBuffer to receive the results of the read operation.
+ * @throws IOException if there is some error performing the read
+ * @throws EOFException the end of the data was reached before
+ * the read operation completed
+ * @see #read(long, ByteBuffer)
+ */
+ void readFully(long position, ByteBuffer buf) throws IOException;
+}
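A sketch of the capability-guarded usage the javadoc above recommends (the
offset and buffer size are illustrative, and it assumes FSDataInputStream
passes this interface through to the wrapped stream):

    import java.nio.ByteBuffer;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.StreamCapabilities;

    static void positionedByteBufferRead(FSDataInputStream in) throws Exception {
      if (in.hasCapability(StreamCapabilities.PREADBYTEBUFFER)) {
        ByteBuffer buf = ByteBuffer.allocate(4096);
        // Reads up to buf.remaining() bytes starting at file offset 1024,
        // without moving the stream's current position.
        int n = in.read(1024L, buf);
      }
    }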
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java
index 20f7224c22cd9..926b554f42ce7 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferReadable.java
@@ -32,18 +32,18 @@ public interface ByteBufferReadable {
/**
* Reads up to buf.remaining() bytes into buf. Callers should use
* buf.limit(..) to control the size of the desired read.
- *
* After a successful call, buf.position() will be advanced by the number
* of bytes read and buf.limit() should be unchanged.
- *
* In the case of an exception, the values of buf.position() and buf.limit()
* are undefined, and callers should be prepared to recover from this
* eventuality.
- *
* Many implementations will throw {@link UnsupportedOperationException}, so
* callers that are not confident in support for this method from the
* underlying filesystem should be prepared to handle that exception.
- *
* Implementations should treat 0-length requests as legitimate, and must not
* signal an error upon their receipt.
*
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
index 88c30e2f99135..882f1d951dfcd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFileSystem.java
@@ -32,6 +32,9 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
@@ -41,7 +44,7 @@
* Abstract Checksumed FileSystem.
* It provides a basic implementation of a Checksumed FileSystem,
* which creates a checksum file for each raw file.
- * It generates & verifies checksums at the client side.
+ * It generates & verifies checksums at the client side.
*
*****************************************************************/
@InterfaceAudience.Public
@@ -127,7 +130,8 @@ private int getSumBufferSize(int bytesPerSum, int bufferSize) {
* For open()'s FSInputStream
* It verifies that data matches checksums.
*******************************************************/
- private static class ChecksumFSInputChecker extends FSInputChecker {
+ private static class ChecksumFSInputChecker extends FSInputChecker implements
+ IOStatisticsSource {
private ChecksumFileSystem fs;
private FSDataInputStream datas;
private FSDataInputStream sums;
@@ -263,6 +267,17 @@ protected int readChunk(long pos, byte[] buf, int offset, int len,
}
return nread;
}
+
+ /**
+ * Get the IO Statistics of the nested stream, falling back to
+ * null if the stream does not implement the interface
+ * {@link IOStatisticsSource}.
+ * @return an IOStatistics instance or null
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return IOStatisticsSupport.retrieveIOStatistics(datas);
+ }
}
private static class FSDataBoundedInputStream extends FSDataInputStream {
@@ -382,7 +397,8 @@ public static long getChecksumLength(long size, int bytesPerSum) {
/** This class provides an output stream for a checksummed file.
* It generates checksums for data. */
- private static class ChecksumFSOutputSummer extends FSOutputSummer {
+ private static class ChecksumFSOutputSummer extends FSOutputSummer
+ implements IOStatisticsSource, StreamCapabilities {
private FSDataOutputStream datas;
private FSDataOutputStream sums;
private static final float CHKSUM_AS_FRACTION = 0.01f;
@@ -436,6 +452,28 @@ protected void checkClosed() throws IOException {
throw new ClosedChannelException();
}
}
+
+ /**
+ * Get the IO Statistics of the nested stream, falling back to
+ * null if the stream does not implement the interface
+ * {@link IOStatisticsSource}.
+ * @return an IOStatistics instance or null
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return IOStatisticsSupport.retrieveIOStatistics(datas);
+ }
+
+ /**
+ * Probe the inner stream for a capability.
+ *
+ * @param capability string to query the stream support for.
+ * @return true if a capability is known to be supported.
+ */
+ @Override
+ public boolean hasCapability(final String capability) {
+ return datas.hasCapability(capability);
+ }
}
@Override
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java
index aed9db3362415..bc1122c56a2bd 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ChecksumFs.java
@@ -42,7 +42,7 @@
* Abstract Checksumed Fs.
* It provides a basic implementation of a Checksumed Fs,
* which creates a checksum file for each raw file.
- * It generates & verifies checksums at the client side.
+ * It generates & verifies checksums at the client side.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving /*Evolving for a release,to be changed to Stable */
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
index 75749499ff72d..9d9475727d863 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java
@@ -310,7 +310,7 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
"dr.who";
/**
- * User->groups static mapping to override the groups lookup
+ * User{@literal ->}groups static mapping to override the groups lookup
*/
public static final String HADOOP_USER_GROUP_STATIC_OVERRIDES =
"hadoop.user.group.static.mapping.overrides";
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
index c3e088b66d86c..58b5f704bb831 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CreateFlag.java
@@ -29,7 +29,7 @@
* CreateFlag specifies the file create semantic. Users can combine flags like:
*
* Use the CreateFlag as follows:
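A sketch of combining the flags and passing them to a create call (the path
and options are illustrative):

    import java.util.EnumSet;
    import org.apache.hadoop.fs.CreateFlag;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Options;
    import org.apache.hadoop.fs.Path;

    static void createWithFlags(FileContext fc, Path path) throws Exception {
      EnumSet<CreateFlag> flags =
          EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE);
      FSDataOutputStream out =
          fc.create(path, flags, Options.CreateOpts.bufferSize(4096));
      out.close();
    }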
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java
index c6c5cbb15b06c..3a139781e0372 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/DelegateToFileSystem.java
@@ -24,11 +24,13 @@
import java.util.Arrays;
import java.util.EnumSet;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Progressable;
@@ -262,6 +264,21 @@ public List
* The default implementation of this method calls {@link #getFileStatus(Path)}
* and checks the returned permissions against the requested permissions.
* Note that the getFileStatus call will be subject to authorization checks.
@@ -1509,9 +1516,9 @@ public FsStatus next(final AbstractFileSystem fs, final Path p)
*
*
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to modify
@@ -2550,7 +2565,7 @@ public void setXAttr(Path path, String name, byte[] value)
* Set an xattr of a file or directory.
* The name must be prefixed with the namespace followed by ".". For example,
* "user.attr".
- *
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to modify
@@ -2576,7 +2591,7 @@ public Void next(final AbstractFileSystem fs, final Path p)
* Get an xattr for a file or directory.
* The name must be prefixed with the namespace followed by ".". For example,
* "user.attr".
- *
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attribute
@@ -2599,11 +2614,12 @@ public byte[] next(final AbstractFileSystem fs, final Path p)
* Get all of the xattrs for a file or directory.
* Only those xattrs for which the logged-in user has permissions to view
* are returned.
- *
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
- * @return Map
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
* @param names XAttr names.
- * @return Map
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to remove extended attribute
@@ -2668,11 +2685,12 @@ public Void next(final AbstractFileSystem fs, final Path p)
* Get all of the xattr names for a file or directory.
* Only those xattr names which the logged-in user has permissions to view
* are returned.
- *
* Refer to the HDFS extended attributes user documentation for details.
*
* @param path Path to get extended attributes
- * @return List
*
*
* @return
*
* @return
*
* @return
* Defaults to BLOCK_TRANSFER_MODE.
*
* @param conf
@@ -195,7 +195,7 @@ int getTransferMode(Configuration conf) {
* Set the FTPClient's data connection mode based on configuration. Valid
* values are ACTIVE_LOCAL_DATA_CONNECTION_MODE,
* PASSIVE_LOCAL_DATA_CONNECTION_MODE and PASSIVE_REMOTE_DATA_CONNECTION_MODE.
- *
* Defaults to ACTIVE_LOCAL_DATA_CONNECTION_MODE.
*
* @param client
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java
index e59efa5b2bc56..b522102e540a4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FtpConfigKeys.java
@@ -21,6 +21,7 @@
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.ChecksumFileSystem;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.util.DataChecksum;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java
new file mode 100644
index 0000000000000..8e5d65eba2e94
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/AbstractFSBuilderImpl.java
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.impl;
+
+import javax.annotation.Nonnull;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.Set;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSBuilder;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathHandle;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Builder for filesystem/filecontext operations of various kinds,
+ * with option support.
+ *
+ *
+ * This is for reporting and testing.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class DurationStatisticSummary implements Serializable {
+
+ private static final long serialVersionUID = 6776381340896518486L;
+
+ /** Statistic key. */
+ private final String key;
+
+ /** Are these success or failure statistics. */
+ private final boolean success;
+
+ /** Count of operation invocations. */
+ private final long count;
+
+ /** Max duration; -1 if unknown. */
+ private final long max;
+
+ /** Min duration; -1 if unknown. */
+ private final long min;
+
+ /** Mean duration -may be null. */
+ private final MeanStatistic mean;
+
+ /**
+ * Constructor.
+ * @param key Statistic key.
+ * @param success Are these success or failure statistics.
+ * @param count Count of operation invocations.
+ * @param max Max duration; -1 if unknown.
+ * @param min Min duration; -1 if unknown.
+ * @param mean Mean duration -may be null. (will be cloned)
+ */
+ public DurationStatisticSummary(final String key,
+ final boolean success,
+ final long count,
+ final long max,
+ final long min,
+ @Nullable final MeanStatistic mean) {
+ this.key = key;
+ this.success = success;
+ this.count = count;
+ this.max = max;
+ this.min = min;
+ this.mean = mean == null ? null : mean.clone();
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public boolean isSuccess() {
+ return success;
+ }
+
+ public long getCount() {
+ return count;
+ }
+
+ public long getMax() {
+ return max;
+ }
+
+ public long getMin() {
+ return min;
+ }
+
+ public MeanStatistic getMean() {
+ return mean;
+ }
+
+ @Override
+ public String toString() {
+ return "DurationStatisticSummary{" +
+ "key='" + key + '\'' +
+ ", success=" + success +
+ ", counter=" + count +
+ ", max=" + max +
+ ", mean=" + mean +
+ '}';
+ }
+
+ /**
+ * Fetch the duration timing summary of success or failure operations
+ * from an IO Statistics source.
+ * If the duration key is unknown, the summary will be incomplete.
+ * @param source source of data
+ * @param key duration statistic key
+ * @param success fetch success statistics, or if false, failure stats.
+ * @return a summary of the statistics.
+ */
+ public static DurationStatisticSummary fetchDurationSummary(
+ IOStatistics source,
+ String key,
+ boolean success) {
+ String fullkey = success ? key : key + SUFFIX_FAILURES;
+ return new DurationStatisticSummary(key, success,
+ source.counters().getOrDefault(fullkey, 0L),
+ source.maximums().getOrDefault(fullkey + SUFFIX_MAX, -1L),
+ source.minimums().getOrDefault(fullkey + SUFFIX_MIN, -1L),
+ source.meanStatistics()
+ .get(fullkey + SUFFIX_MEAN));
+ }
+
+ /**
+ * Fetch the duration timing summary from an IOStatistics source.
+ * If the duration key is unknown, the summary will be incomplete.
+ * @param source source of data
+ * @param key duration statistic key
+ * @return a summary of the statistics.
+ */
+ public static DurationStatisticSummary fetchSuccessSummary(
+ IOStatistics source,
+ String key) {
+ return fetchDurationSummary(source, key, true);
+ }
+}
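A sketch of pulling a duration summary out of an IOStatistics instance with
the helpers above (the statistic key is illustrative; real keys depend on the
stream or store):

    static void reportDuration(IOStatistics stats) {
      DurationStatisticSummary summary =
          DurationStatisticSummary.fetchSuccessSummary(stats, "op_example");
      System.out.println("count=" + summary.getCount()
          + " max=" + summary.getMax()
          + " mean=" + summary.getMean());   // mean may be null if unknown
    }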
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTracker.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTracker.java
new file mode 100644
index 0000000000000..5a15c7ad66c4f
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTracker.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+import java.time.Duration;
+
+/**
+ * Interface to be implemented by objects which can track duration.
+ * It extends AutoCloseable to fit into a try-with-resources statement,
+ * but then strips out the {@code throws Exception} aspect of the signature
+ * so it doesn't force code to add extra handling for any failures.
+ *
+ * If a duration is declared as "failed()" then the failure counters
+ * will be updated.
+ */
+public interface DurationTracker extends AutoCloseable {
+
+ /**
+ * The operation failed. Failure statistics will be updated.
+ */
+ void failed();
+
+ /**
+ * Finish tracking: update the statistics with the timings.
+ */
+ void close();
+
+ /**
+ * Get the duration of an operation as a java Duration
+ * instance. If the duration tracker hasn't completed,
+ * or its duration tracking doesn't actually measure duration,
+ * returns Duration.ZERO.
+ * @return a duration, value of ZERO until close().
+ */
+ default Duration asDuration() {
+ return Duration.ZERO;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java
new file mode 100644
index 0000000000000..b1d87c9100f95
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/DurationTrackerFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+/**
+ * Interface for a source of duration tracking.
+ *
+ * This is intended for uses where it can be passed into classes
+ * which update operation durations, without tying those
+ * classes to internal implementation details.
+ */
+public interface DurationTrackerFactory {
+
+ /**
+ * Initiate a duration tracking operation by creating/returning
+ * an object whose {@code close()} call will
+ * update the statistics.
+ *
+ * The statistics counter with the key name will be incremented
+ * by the given count.
+ *
+ * The expected use is within a try-with-resources clause.
+ * @param key statistic key prefix
+ * @param count #of times to increment the matching counter in this
+ * operation.
+ * @return an object to close after an operation completes.
+ */
+ DurationTracker trackDuration(String key, long count);
+
+ /**
+ * Initiate a duration tracking operation by creating/returning
+ * an object whose {@code close()} call will
+ * update the statistics.
+ * The expected use is within a try-with-resources clause.
+ * @param key statistic key
+ * @return an object to close after an operation completes.
+ */
+ default DurationTracker trackDuration(String key) {
+ return trackDuration(key, 1);
+ }
+}
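A sketch of the try-with-resources pattern the javadoc above expects (the
factory, key and workload are illustrative):

    static void timedOperation(DurationTrackerFactory factory) {
      // trackDuration() increments the counter and starts timing; the implicit
      // close() at the end of the block records the duration. Calling failed()
      // before close() would route the timing to the failure statistics.
      try (DurationTracker tracker = factory.trackDuration("op_example")) {
        doTheWork();
      }
    }

    private static void doTheWork() {
      // placeholder for the operation being timed
    }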
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatistics.java
new file mode 100644
index 0000000000000..75d9965128101
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatistics.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+import java.util.Map;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * IO Statistics.
+ *
+ * These are low-cost per-instance statistics provided by any Hadoop
+ * I/O class instance.
+ *
+ * Consult the filesystem specification document for the requirements
+ * of an implementation of this interface.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface IOStatistics {
+
+ /**
+ * Map of counters.
+ * @return the current map of counters.
+ */
+ Map
+ * Exceptions are caught and downgraded to debug logging.
+ * @param source source of statistics.
+ * @return a string for logging.
+ */
+ public static String ioStatisticsSourceToString(@Nullable Object source) {
+ try {
+ return ioStatisticsToString(retrieveIOStatistics(source));
+ } catch (RuntimeException e) {
+ LOG.debug("Ignoring", e);
+ return "";
+ }
+ }
+
+ /**
+ * Convert IOStatistics to a string form.
+ * @param statistics A statistics instance.
+ * @return string value or the empty string if null
+ */
+ public static String ioStatisticsToString(
+ @Nullable final IOStatistics statistics) {
+ if (statistics != null) {
+ StringBuilder sb = new StringBuilder();
+ mapToString(sb, "counters", statistics.counters(), " ");
+ mapToString(sb, "gauges", statistics.gauges(), " ");
+ mapToString(sb, "minimums", statistics.minimums(), " ");
+ mapToString(sb, "maximums", statistics.maximums(), " ");
+ mapToString(sb, "means", statistics.meanStatistics(), " ");
+
+ return sb.toString();
+ } else {
+ return "";
+ }
+ }
+
+ /**
+ * Convert IOStatistics to a string form, with all the metrics sorted
+ * and empty value stripped.
+ * This is more expensive than the simple conversion, so should only
+ * be used for logging/output where it's known/highly likely that the
+ * caller wants to see the values. Not for debug logging.
+ * @param statistics A statistics instance.
+ * @return string value or the empty string if null
+ */
+ public static String ioStatisticsToPrettyString(
+ @Nullable final IOStatistics statistics) {
+ if (statistics != null) {
+ StringBuilder sb = new StringBuilder();
+ mapToSortedString(sb, "counters", statistics.counters(),
+ p -> p == 0);
+ mapToSortedString(sb, "\ngauges", statistics.gauges(),
+ p -> p == 0);
+ mapToSortedString(sb, "\nminimums", statistics.minimums(),
+ p -> p < 0);
+ mapToSortedString(sb, "\nmaximums", statistics.maximums(),
+ p -> p < 0);
+ mapToSortedString(sb, "\nmeans", statistics.meanStatistics(),
+ MeanStatistic::isEmpty);
+
+ return sb.toString();
+ } else {
+ return "";
+ }
+ }
+
+ /**
+ * Given a map, add its entryset to the string.
+ * The entries are only sorted if the source entryset
+ * iterator is sorted, such as from a TreeMap.
+ * @param sb string buffer to append to
+ * @param type type (for output)
+ * @param map map to evaluate
+ * @param separator separator
+ * @param
+ * Whenever this object's toString() method is called, it evaluates the
+ * statistics.
+ *
+ * This is designed to be affordable to use in log statements.
+ * @param source source of statistics -may be null.
+ * @return an object whose toString() operation returns the current values.
+ */
+ public static Object demandStringifyIOStatisticsSource(
+ @Nullable IOStatisticsSource source) {
+ return new SourceToString(source);
+ }
+
+ /**
+ * On demand stringifier of an IOStatistics instance.
+ *
+ * Whenever this object's toString() method is called, it evaluates the
+ * statistics.
+ *
+ * This is for use in log statements where the cost of creation
+ * of this entry is low; it is affordable to use in log statements.
+ * @param statistics statistics to stringify -may be null.
+ * @return an object whose toString() operation returns the current values.
+ */
+ public static Object demandStringifyIOStatistics(
+ @Nullable IOStatistics statistics) {
+ return new StatisticsToString(statistics);
+ }
+
+ /**
+ * Extract any statistics from the source and log at debug, if
+ * the log is set to log at debug.
+ * No-op if logging is not at debug or the source is null/of
+ * the wrong type/doesn't provide statistics.
+ * @param log log to log to
+ * @param message message for log -this must contain "{}" for the
+ * statistics report to actually get logged.
+ * @param source source object
+ */
+ public static void logIOStatisticsAtDebug(
+ Logger log,
+ String message,
+ Object source) {
+ if (log.isDebugEnabled()) {
+ // robust extract and convert to string
+ String stats = ioStatisticsSourceToString(source);
+ if (!stats.isEmpty()) {
+ log.debug(message, stats);
+ }
+ }
+ }
+
+ /**
+ * Extract any statistics from the source and log to
+ * this class's log at debug, if
+ * the log is set to log at debug.
+ * No-op if logging is not at debug or the source is null/of
+ * the wrong type/doesn't provide statistics.
+ * @param message message for log -this must contain "{}" for the
+ * statistics report to actually get logged.
+ * @param source source object
+ */
+ public static void logIOStatisticsAtDebug(
+ String message,
+ Object source) {
+ logIOStatisticsAtDebug(LOG, message, source);
+ }
+
+ /**
+ * On demand stringifier.
+ *
+ * Whenever this object's toString() method is called, it
+ * retrieves the latest statistics instance and re-evaluates it.
+ */
+ private static final class SourceToString {
+
+ private final IOStatisticsSource source;
+
+ private SourceToString(@Nullable IOStatisticsSource source) {
+ this.source = source;
+ }
+
+ @Override
+ public String toString() {
+ return source != null
+ ? ioStatisticsSourceToString(source)
+ : IOStatisticsBinding.NULL_SOURCE;
+ }
+ }
+
+ /**
+ * Stringifier of statistics: low cost to instantiate and every
+ * toString/logging will re-evaluate the statistics.
+ */
+ private static final class StatisticsToString {
+
+ private final IOStatistics statistics;
+
+ /**
+ * Constructor.
+ * @param statistics statistics
+ */
+ private StatisticsToString(@Nullable IOStatistics statistics) {
+ this.statistics = statistics;
+ }
+
+ /**
+ * Evaluate and stringify the statistics.
+ * @return a string value.
+ */
+ @Override
+ public String toString() {
+ return statistics != null
+ ? ioStatisticsToString(statistics)
+ : IOStatisticsBinding.NULL_SOURCE;
+ }
+ }
+}
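A sketch of the low-cost logging pattern these helpers enable: the statistics
are only rendered if the logger actually formats the message (the logger
setup and class name are illustrative):

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.apache.hadoop.fs.statistics.IOStatisticsLogging;
    import org.apache.hadoop.fs.statistics.IOStatisticsSource;

    class StatsLoggingExample {
      private static final Logger LOG =
          LoggerFactory.getLogger(StatsLoggingExample.class);

      static void logStats(IOStatisticsSource stream) {
        LOG.debug("stream statistics {}",
            IOStatisticsLogging.demandStringifyIOStatisticsSource(stream));
      }
    }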
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSnapshot.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSnapshot.java
new file mode 100644
index 0000000000000..5b8b2e284cc11
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSnapshot.java
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding;
+import org.apache.hadoop.util.JsonSerialization;
+
+import static org.apache.hadoop.thirdparty.com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToString;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.aggregateMaps;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.snapshotMap;
+
+/**
+ * Snapshot of statistics from a different source.
+ *
+ * It is serializable so that frameworks which can use java serialization
+ * to propagate data (Spark, Flink...) can send the statistics
+ * back. For this reason, TreeMaps are explicitly used as field types,
+ * even though IDEs can recommend use of Map instead.
+ * For security reasons, untrusted java object streams should never be
+ * deserialized. If for some reason this is required, use
+ * {@link #requiredSerializationClasses()} to get the list of classes
+ * used when deserializing instances of this object.
+ *
+ *
+ * It is annotated for correct serialization with jackson2.
+ *
+ * These statistics MUST be instance specific, not thread local.
+ *
+ * It is not a requirement that the same instance is returned every time
+ * by an {@link IOStatisticsSource}.
+ *
+ * If the object implementing this is Closeable, this method
+ * may return null if invoked on a closed object, even if
+ * it returns a valid instance when called earlier.
+ * @return an IOStatistics instance or null
+ */
+ default IOStatistics getIOStatistics() {
+ return null;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSupport.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSupport.java
new file mode 100644
index 0000000000000..75977047c0f2a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/IOStatisticsSupport.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.impl.StubDurationTracker;
+import org.apache.hadoop.fs.statistics.impl.StubDurationTrackerFactory;
+
+/**
+ * Support for working with IOStatistics.
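+ * <p>
+ * Illustrative use, not part of this patch: given a stream which may
+ * implement {@link IOStatisticsSource}, the statistics can be extracted
+ * and snapshotted. The {@code in} variable is assumed to be an
+ * {@code FSDataInputStream} opened elsewhere.
+ * <pre>
+ *   IOStatistics stats = IOStatisticsSupport.retrieveIOStatistics(in);
+ *   if (stats != null) {
+ *     // serializable copy of the current values
+ *     IOStatisticsSnapshot snapshot =
+ *         IOStatisticsSupport.snapshotIOStatistics(stats);
+ *     System.out.println(snapshot);
+ *   }
+ * </pre>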
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class IOStatisticsSupport {
+
+ private IOStatisticsSupport() {
+ }
+
+ /**
+ * Take a snapshot of the current statistics state.
+ *
+ * This is not an atomic operation.
+ *
+ * The instance can be serialized, and its
+ * {@code toString()} method lists all the values.
+ * @param statistics statistics
+ * @return a snapshot of the current values.
+ */
+ public static IOStatisticsSnapshot
+ snapshotIOStatistics(IOStatistics statistics) {
+
+ return new IOStatisticsSnapshot(statistics);
+ }
+
+ /**
+ * Create a snapshot statistics instance ready to aggregate data.
+ *
+ * The instance can be serialized, and its
+ * {@code toString()} method lists all the values.
+ * @return an empty snapshot
+ */
+ public static IOStatisticsSnapshot
+ snapshotIOStatistics() {
+
+ return new IOStatisticsSnapshot();
+ }
+
+ /**
+ * Get the IOStatistics of the source, casting it
+ * if it is of the relevant type, otherwise,
+ * if it implements {@link IOStatisticsSource}
+ * extracting the value.
+ *
+ * Returns null if the source isn't of the right type
+ * or the return value of
+ * {@link IOStatisticsSource#getIOStatistics()} was null.
+ * @param source object to extract statistics from; may be of any type.
+ * @return an IOStatistics instance or null
+ */
+
+ public static IOStatistics retrieveIOStatistics(
+ final Object source) {
+ if (source instanceof IOStatistics) {
+ return (IOStatistics) source;
+ } else if (source instanceof IOStatisticsSource) {
+ return ((IOStatisticsSource) source).getIOStatistics();
+ } else {
+ // null source or interface not implemented
+ return null;
+ }
+ }
+
+ /**
+ * Return a stub duration tracker factory whose returned trackers
+ * are always no-ops.
+ *
+ * As singletons are returned, this is very low-cost to use.
+ * @return a duration tracker factory.
+ */
+ public static DurationTrackerFactory stubDurationTrackerFactory() {
+ return StubDurationTrackerFactory.STUB_DURATION_TRACKER_FACTORY;
+ }
+
+ /**
+ * Get a stub duration tracker.
+ * @return a stub tracker.
+ */
+ public static DurationTracker stubDurationTracker() {
+ return StubDurationTracker.STUB_DURATION_TRACKER;
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/MeanStatistic.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/MeanStatistic.java
new file mode 100644
index 0000000000000..d9ff0c25c6a21
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/MeanStatistic.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A mean statistic represented as the sum and the sample count;
+ * the mean is calculated on demand.
+ *
+ * It can be used to accrue values so as to dynamically update
+ * the mean. If so, know that there is no synchronization
+ * on the methods.
+ *
+ * If a statistic has 0 samples then it is considered to be empty.
+ *
+ * All 'empty' statistics are equivalent, independent of the sum value.
+ *
+ * For non-empty statistics, sum and sample values must match
+ * for equality.
+ *
+ * It is serializable and annotated for correct serializations with jackson2.
+ *
+ * Thread safety. The operations to add/copy sample data are thread safe.
+ *
+ * So is the {@link #mean()} method. This ensures that when
+ * used to aggregate statistics, the aggregate value and sample
+ * count are set and evaluated consistently.
+ *
+ * Other methods are marked as synchronized because Findbugs objects to
+ * operations which update the sum and sample count being synchronized
+ * while methods such as equals() are not.
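+ * <p>
+ * Illustrative sketch; apart from {@link #mean()}, the method names used
+ * here are assumptions rather than part of this excerpt:
+ * <pre>
+ *   MeanStatistic durationMs = new MeanStatistic();
+ *   durationMs.addSample(120);   // assumed sample-accrual method
+ *   durationMs.addSample(80);
+ *   double average = durationMs.mean();   // 100.0
+ * </pre>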
+ *
+ * When adding new common statistic name constants, please make them unique.
+ * By convention:
+ *
+ * When adding new common statistic name constants, please make them unique.
+ * By convention, they are implicitly unique:
+ * OpensslCipher object that implements the specified
+ * Return an OpensslCipher object that implements the specified
* transformation.
*
* @param transformation the name of the transformation, e.g.,
* AES/CTR/NoPadding.
- * @return OpensslCipher an OpensslCipher object
+ * @return OpensslCipher an OpensslCipher object
* @throws NoSuchAlgorithmException if transformation is null,
* empty, in an invalid format, or if Openssl doesn't implement the
* specified algorithm.
@@ -181,18 +181,18 @@ public void init(int mode, byte[] key, byte[] iv) {
/**
* Continues a multiple-part encryption or decryption operation. The data
* is encrypted or decrypted, depending on how this cipher was initialized.
- *
+ * input.remaining() bytes starting at
* input.position() are processed. The result is stored in
* the output buffer.
- * output.remaining() bytes are insufficient to hold the
* result, a ShortBufferException is thrown.
@@ -218,21 +218,21 @@ public int update(ByteBuffer input, ByteBuffer output)
/**
* Finishes a multiple-part operation. The data is encrypted or decrypted,
* depending on how this cipher was initialized.
- * output.remaining() bytes are insufficient to hold the result,
* a ShortBufferException is thrown.
- * HADOOP_KEYSTORE_PASSWORD environment variable is set,
* its value is used as the password for the keystore.
- * HADOOP_KEYSTORE_PASSWORD environment variable is not set,
* the password for the keystore is read from file specified in the
* {@link #KEYSTORE_PASSWORD_FILE_KEY} configuration property. The password file
* is looked up in Hadoop's configuration directory via the classpath.
- * KeyProvider implementations must be thread safe.
*/
@InterfaceAudience.Public
@@ -550,7 +550,7 @@ protected byte[] generateKey(int size, String algorithm)
/**
* Create a new key generating the material for it.
* The given key must not already exist.
- * KeyVersion material of the latest key version
* of the key and is encrypted using the same cipher.
- * KeyProvider
*
* @param encryptionKeyName
@@ -498,7 +498,7 @@ public void warmUpEncryptedKeys(String... keyNames)
* and initialization vector. The generated key material is of the same
* length as the KeyVersion material and is encrypted using the
* same cipher.
- * KeyProvider
*
* @param encryptionKeyName The latest KeyVersion of this key's material will
@@ -576,7 +576,6 @@ public void drain(String keyName) {
* NOTE: The generated key is not stored by the KeyProvider
*
* @param ekvs List containing the EncryptedKeyVersion's
- * @return The re-encrypted EncryptedKeyVersion's, in the same order.
* @throws IOException If any EncryptedKeyVersion could not be re-encrypted
* @throws GeneralSecurityException If any EncryptedKeyVersion could not be
* re-encrypted because of a cryptographic issue.
@@ -589,7 +588,7 @@ public void reencryptEncryptedKeys(ListKeyProviderCryptoExtension using a given
* {@link KeyProvider}.
- *
+ * KeyProvider implements the
* {@link CryptoExtension} interface the KeyProvider itself
* will provide the extension functionality.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java
index 92853ab11752f..05d99ed0810fc 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/KeyProviderDelegationTokenExtension.java
@@ -124,7 +124,7 @@ public Token> getDelegationToken(final String renewer) throws IOException {
/**
* Creates a KeyProviderDelegationTokenExtension using a given
* {@link KeyProvider}.
- * KeyProvider implements the
* {@link DelegationTokenExtension} interface the KeyProvider
* itself will provide the extension functionality, otherwise a default
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
index f0eaef10c17d2..71ed4557b357b 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
@@ -263,12 +263,12 @@ public static class Factory extends KeyProviderFactory {
/**
* This provider expects URIs in the following form :
- * kms://SyncGenerationPolicy specified by the user.
* @param keyName String key name
* @param num Minimum number of values to return.
- * @return ListnumBits <= 32.
+ * 0 {@literal <=} numBits {@literal <=} 32.
*
* @return int an int containing the user-specified number
* of random bits (right justified, with leading zeros).
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
index 9b0bab11afb9c..ccfabe52ecc06 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/AbstractFileSystem.java
@@ -25,12 +25,14 @@
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.StringTokenizer;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -41,13 +43,17 @@
import org.apache.hadoop.fs.Options.ChecksumOpt;
import org.apache.hadoop.fs.Options.CreateOpts;
import org.apache.hadoop.fs.Options.Rename;
+import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
+import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.LambdaUtils;
import org.apache.hadoop.util.Progressable;
import com.google.common.annotations.VisibleForTesting;
@@ -338,7 +344,7 @@ private URI getUri(URI uri, String supportedScheme,
* The default port of this file system.
*
* @return default port of this file system's Uri scheme
- * A uri with a port of -1 => default port;
+ * A uri with a port of -1 => default port;
*/
public abstract int getUriDefaultPort();
@@ -398,8 +404,11 @@ public void checkPath(Path path) {
thatPort = this.getUriDefaultPort();
}
if (thisPort != thatPort) {
- throw new InvalidPathException("Wrong FS: " + path + ", expected: "
- + this.getUri());
+ throw new InvalidPathException("Wrong FS: " + path
+ + " and port=" + thatPort
+ + ", expected: "
+ + this.getUri()
+ + " with port=" + thisPort);
}
}
@@ -450,8 +459,16 @@ public Path getInitialWorkingDirectory() {
* @return current user's home directory.
*/
public Path getHomeDirectory() {
- return new Path("/user/"+System.getProperty("user.name")).makeQualified(
- getUri(), null);
+ String username;
+ try {
+ username = UserGroupInformation.getCurrentUser().getShortUserName();
+ } catch(IOException ex) {
+ LOG.warn("Unable to get user name. Fall back to system property " +
+ "user.name", ex);
+ username = System.getProperty("user.name");
+ }
+ return new Path("/user/" + username)
+ .makeQualified(getUri(), null);
}
/**
@@ -480,9 +497,11 @@ public FsServerDefaults getServerDefaults(final Path f) throws IOException {
* through any internal symlinks or mount point
* @param p path to be resolved
* @return fully qualified path
- * @throws FileNotFoundException, AccessControlException, IOException
- * UnresolvedLinkException if symbolic link on path cannot be resolved
- * internally
+ * @throws FileNotFoundException
+ * @throws AccessControlException
+ * @throws IOException
+ * @throws UnresolvedLinkException if symbolic link on path cannot be
+ * resolved internally
*/
public Path resolvePath(final Path p) throws FileNotFoundException,
UnresolvedLinkException, AccessControlException, IOException {
@@ -848,8 +867,7 @@ public abstract FileStatus getFileStatus(final Path f)
/**
* Synchronize client metadata state.
- * BufferedFSInputStream
* with the specified buffer size,
@@ -44,7 +49,7 @@ public class BufferedFSInputStream extends BufferedInputStream
*
* @param in the underlying input stream.
* @param size the buffer size.
- * @exception IllegalArgumentException if size <= 0.
+ * @exception IllegalArgumentException if size {@literal <=} 0.
*/
public BufferedFSInputStream(FSInputStream in, int size) {
super(in, size);
@@ -126,4 +131,26 @@ public FileDescriptor getFileDescriptor() throws IOException {
return null;
}
}
+
+ /**
+ * If the inner stream supports {@link StreamCapabilities},
+ * forward the probe to it.
+ * Otherwise: return false.
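+ * <p>
+ * Caller-side sketch (the {@code stream} variable is illustrative only):
+ * <pre>
+ *   boolean canUnbuffer = stream.hasCapability(StreamCapabilities.UNBUFFER);
+ * </pre>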
+ *
+ * @param capability string to query the stream support for.
+ * @return true if a capability is known to be supported.
+ */
+ @Override
+ public boolean hasCapability(final String capability) {
+ if (in instanceof StreamCapabilities) {
+ return ((StreamCapabilities) in).hasCapability(capability);
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public IOStatistics getIOStatistics() {
+ return retrieveIOStatistics(in);
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
new file mode 100644
index 0000000000000..f8282d88c46c3
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ByteBufferPositionedReadable.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * Implementers of this interface provide a positioned read API that writes to a
+ * {@link ByteBuffer} rather than a {@code byte[]}.
+ *
+ * @see PositionedReadable
+ * @see ByteBufferReadable
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface ByteBufferPositionedReadable {
+ /**
+ * Reads up to {@code buf.remaining()} bytes into buf from a given position
+ * in the file and returns the number of bytes read. Callers should use
+ * {@code buf.limit(...)} to control the size of the desired read and
+ * {@code buf.position(...)} to control the offset into the buffer the data
+ * should be written to.
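+ * <p>
+ * Illustrative sketch, assuming {@code in} is a stream whose inner
+ * implementation supports this interface:
+ * <pre>
+ *   ByteBuffer buffer = ByteBuffer.allocate(1024);
+ *   int bytesRead = in.read(100L, buffer); // read up to 1KB from offset 100
+ * </pre>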
+ *
*
* EnumSet.of(CreateFlag.CREATE, CreateFlag.APPEND)
- *
+ *
* @param <S> Return type on the {@link #build()} call.
+ * @param <B> type of builder itself.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface FSBuilder<S, B extends FSBuilder<S, B>> {
+
+ /**
+ * Set optional Builder parameter.
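+ * <p>
+ * For example (sketch only; the key and value are illustrative, taken from
+ * the standard openFile option names rather than defined by this interface):
+ * <pre>
+ *   builder.opt("fs.option.openfile.read.policy", "random");
+ * </pre>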
+ */
+ B opt(@Nonnull String key, @Nonnull String value);
+
+ /**
+ * Set optional boolean parameter for the Builder.
+ *
+ * @see #opt(String, String)
+ */
+ B opt(@Nonnull String key, boolean value);
+
+ /**
+ * Set optional int parameter for the Builder.
+ *
+ * @see #opt(String, String)
+ */
+ B opt(@Nonnull String key, int value);
+
+ /**
+ * Set optional float parameter for the Builder.
+ *
+ * @see #opt(String, String)
+ */
+ B opt(@Nonnull String key, float value);
+
+ /**
+ * Set optional long parameter for the Builder.
+ *
+ * @see #opt(String, String)
+ */
+ B opt(@Nonnull String key, long value);
+
+ /**
+ * Set optional double parameter for the Builder.
+ *
+ * @see #opt(String, String)
+ */
+ B opt(@Nonnull String key, double value);
+
+ /**
+ * Set an array of string values as optional parameter for the Builder.
+ *
+ * @see #opt(String, String)
+ */
+ B opt(@Nonnull String key, @Nonnull String... values);
+
+ /**
+ * Set mandatory option to the Builder.
+ *
+ * If the option is not supported or unavailable,
+ * the client should expect {@link #build()} throws IllegalArgumentException.
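+ * <p>
+ * Sketch of a mandatory option; the key used here is an example only:
+ * <pre>
+ *   builder.must("fs.option.openfile.buffer.size", 65536);
+ * </pre>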
+ */
+ B must(@Nonnull String key, @Nonnull String value);
+
+ /**
+ * Set mandatory boolean option.
+ *
+ * @see #must(String, String)
+ */
+ B must(@Nonnull String key, boolean value);
+
+ /**
+ * Set mandatory int option.
+ *
+ * @see #must(String, String)
+ */
+ B must(@Nonnull String key, int value);
+
+ /**
+ * Set mandatory float option.
+ *
+ * @see #must(String, String)
+ */
+ B must(@Nonnull String key, float value);
+
+ /**
+ * Set mandatory long option.
+ *
+ * @see #must(String, String)
+ */
+ B must(@Nonnull String key, long value);
+
+ /**
+ * Set mandatory double option.
+ *
+ * @see #must(String, String)
+ */
+ B must(@Nonnull String key, double value);
+
+ /**
+ * Set a string array as mandatory option.
+ *
+ * @see #must(String, String)
+ */
+ B must(@Nonnull String key, @Nonnull String... values);
+
+ /**
+ * Instantiate the object which was being built.
+ *
+ * @throws IllegalArgumentException if the parameters are not valid.
+ * @throws UnsupportedOperationException if the filesystem does not support
+ * the specific operation.
+ * @throws IOException on filesystem IO errors.
+ */
+ S build() throws IllegalArgumentException,
+ UnsupportedOperationException, IOException;
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index 08d71f16c0783..e15d744935fd0 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.fs;
import java.io.DataInputStream;
+import java.io.EOFException;
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
@@ -38,7 +39,8 @@
public class FSDataInputStream extends DataInputStream
implements Seekable, PositionedReadable,
ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead,
- HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities {
+ HasEnhancedByteBufferAccess, CanUnbuffer, StreamCapabilities,
+ ByteBufferPositionedReadable {
/**
* Map ByteBuffers that we have handed out to readers to ByteBufferPool
* objects
@@ -50,8 +52,8 @@ public class FSDataInputStream extends DataInputStream
public FSDataInputStream(InputStream in) {
super(in);
if( !(in instanceof Seekable) || !(in instanceof PositionedReadable) ) {
- throw new IllegalArgumentException(
- "In is not an instance of Seekable or PositionedReadable");
+ throw new IllegalArgumentException(in.getClass().getCanonicalName() +
+ " is not an instance of Seekable or PositionedReadable");
}
}
@@ -147,7 +149,8 @@ public int read(ByteBuffer buf) throws IOException {
return ((ByteBufferReadable)in).read(buf);
}
- throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
+ throw new UnsupportedOperationException("Byte-buffer read unsupported " +
+ "by " + in.getClass().getCanonicalName());
}
@Override
@@ -167,9 +170,8 @@ public void setReadahead(Long readahead)
try {
((CanSetReadahead)in).setReadahead(readahead);
} catch (ClassCastException e) {
- throw new UnsupportedOperationException(
- "this stream does not support setting the readahead " +
- "caching strategy.");
+ throw new UnsupportedOperationException(in.getClass().getCanonicalName() +
+ " does not support setting the readahead caching strategy.");
}
}
@@ -246,4 +248,23 @@ public boolean hasCapability(String capability) {
public String toString() {
return super.toString() + ": " + in;
}
-}
+
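+ /**
+ * Positioned read into a {@link ByteBuffer}, delegated to the wrapped
+ * stream if it implements {@link ByteBufferPositionedReadable}.
+ * @param position position within the file
+ * @param buf the ByteBuffer to receive the results of the read operation
+ * @return the number of bytes read
+ * @throws IOException on any IO failure
+ * @throws UnsupportedOperationException if the inner stream does not
+ * support this operation
+ */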
+ @Override
+ public int read(long position, ByteBuffer buf) throws IOException {
+ if (in instanceof ByteBufferPositionedReadable) {
+ return ((ByteBufferPositionedReadable) in).read(position, buf);
+ }
+ throw new UnsupportedOperationException("Byte-buffer pread unsupported " +
+ "by " + in.getClass().getCanonicalName());
+ }
+
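+ /**
+ * Positioned readFully into a {@link ByteBuffer}, delegated to the
+ * wrapped stream if it implements {@link ByteBufferPositionedReadable}.
+ * @param position position within the file
+ * @param buf the ByteBuffer to receive the results of the read operation
+ * @throws IOException on any IO failure
+ * @throws UnsupportedOperationException if the inner stream does not
+ * support this operation
+ */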
+ @Override
+ public void readFully(long position, ByteBuffer buf) throws IOException {
+ if (in instanceof ByteBufferPositionedReadable) {
+ ((ByteBufferPositionedReadable) in).readFully(position, buf);
+ } else {
+ throw new UnsupportedOperationException("Byte-buffer pread " +
+ "unsupported by " + in.getClass().getCanonicalName());
+ }
+ }
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
index 5970373a9f31a..7ec4067b9800d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java
@@ -24,13 +24,17 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.fs.statistics.IOStatisticsSupport;
/** Utility that wraps a {@link OutputStream} in a {@link DataOutputStream}.
*/
@InterfaceAudience.Public
@InterfaceStability.Stable
public class FSDataOutputStream extends DataOutputStream
- implements Syncable, CanSetDropBehind, StreamCapabilities {
+ implements Syncable, CanSetDropBehind, StreamCapabilities,
+ IOStatisticsSource {
private final OutputStream wrappedStream;
private static class PositionCache extends FilterOutputStream {
@@ -155,4 +159,15 @@ public void setDropBehind(Boolean dropBehind) throws IOException {
"not support setting the drop-behind caching setting.");
}
}
+
+ /**
+ * Get the IO Statistics of the nested stream, falling back to
+ * null if the stream does not implement the interface
+ * {@link IOStatisticsSource}.
+ * @return an IOStatistics instance or null.
+ */
+ @Override
+ public IOStatistics getIOStatistics() {
+ return IOStatisticsSupport.retrieveIOStatistics(wrappedStream);
+ }
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
index d43129388bf2e..62a3182dfba20 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStreamBuilder.java
@@ -17,22 +17,18 @@
*/
package org.apache.hadoop.fs;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Options.ChecksumOpt;
+import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.util.Progressable;
import javax.annotation.Nonnull;
import java.io.IOException;
-import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.Set;
+import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
@@ -87,9 +83,9 @@
@InterfaceAudience.Public
@InterfaceStability.Evolving
public abstract class FSDataOutputStreamBuilder
-    <S extends FSDataOutputStream, B extends FSDataOutputStreamBuilder<S, B>> {
+    <S extends FSDataOutputStream, B extends FSDataOutputStreamBuilder<S, B>>
+    extends AbstractFSBuilderImpl<S, B> {
private final FileSystem fs;
- private final Path path;
private FsPermission permission = null;
private int bufferSize;
private short replication;
@@ -100,34 +96,23 @@ public abstract class FSDataOutputStreamBuilder
private Progressable progress = null;
private ChecksumOpt checksumOpt = null;
- /**
- * Contains optional and mandatory parameters.
- *
- * It does not load default configurations from default files.
- */
- private final Configuration options = new Configuration(false);
-
- /** Keep track of the keys for mandatory options. */
- private final Set
- *
dst already exists and
- * options has {@link Options.Rename#OVERWRITE}
+ * options has {@link Options.Rename#OVERWRITE}
* option false.
* @throws FileNotFoundException If src does not exist
* @throws ParentNotDirectoryException If parent of dst is not a
@@ -1262,7 +1269,7 @@ public void msync() throws IOException, UnsupportedOperationException {
* checks to perform. If the requested permissions are granted, then the
* method returns normally. If access is denied, then the method throws an
* {@link AccessControlException}.
- *
+ *
* Given a path referring to a symlink of form:
*
- * <---X--->
+ * {@literal <---}X{@literal --->}
* fs://host/A/B/link
- * <-----Y----->
+ * {@literal <-----}Y{@literal ----->}
*
* In this path X is the scheme and authority that identify the file system,
* and Y is the path leading up to the final path component "link". If Y is
@@ -1548,7 +1555,7 @@ public FsStatus next(final AbstractFileSystem fs, final Path p)
*
*
* @throws AccessControlException If access is denied
- * @throws FileAlreadyExistsException If file linkcode> already exists
+ * @throws FileAlreadyExistsException If file link already exists
* @throws FileNotFoundException If target does not exist
* @throws ParentNotDirectoryException If parent of link is not a
* directory.
@@ -2050,7 +2057,6 @@ public LocatedFileStatus next() throws IOException {
*
*
- *
* if f == null :
* result = null
- * elif f.getLen() <= start:
+ * elif f.getLen() {@literal <=} start:
* result = []
* else result = [ locations(FS, b) for b in blocks(FS, p, s, s+l)]
*
@@ -2000,7 +2014,6 @@ public FileStatus[] listStatus(Path[] files, PathFilter filter)
*
*
- *
har
*/
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java
index 982a0efef86eb..8ceba7bddd8a2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/HasEnhancedByteBufferAccess.java
@@ -52,18 +52,19 @@ public interface HasEnhancedByteBufferAccess {
* @return
* We will always return an empty buffer if maxLength was 0,
* whether or not we are at EOF.
- * If maxLength > 0, we will return null if the stream has
- * reached EOF.
+ * If maxLength > 0, we will return null if the stream
+ * has reached EOF.
* Otherwise, we will return a ByteBuffer containing at least one
* byte. You must free this ByteBuffer when you are done with it
* by calling releaseBuffer on it. The buffer will continue to be
* readable until it is released in this manner. However, the
* input stream's close method may warn about unclosed buffers.
- * @throws
- * IOException: if there was an error reading.
- * UnsupportedOperationException: if factory was null, and we
- * needed an external byte buffer. UnsupportedOperationException
- * will never be thrown unless the factory argument is null.
+ * @throws IOException if there was an error reading.
+ * @throws UnsupportedOperationException if factory was null
+ * and an external byte buffer was needed; this is never thrown
+ * unless the factory argument is null.
+ *
*/
public ByteBuffer read(ByteBufferPool factory, int maxLength,
EnumSetfile
*/
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
index 5e932864c8805..9b457272fcb50 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java
@@ -17,9 +17,13 @@
*/
package org.apache.hadoop.fs;
+import java.util.Collections;
import java.util.Optional;
+import java.util.Set;
import java.util.function.Function;
import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -290,7 +294,7 @@ public static ChecksumOpt createDisabled() {
* @param defaultOpt Default checksum option
* @param userOpt User-specified checksum option. Ignored if null.
* @param userBytesPerChecksum User-specified bytesPerChecksum
- * Ignored if < 0.
+ * Ignored if {@literal <} 0.
*/
public static ChecksumOpt processChecksumOpt(ChecksumOpt defaultOpt,
ChecksumOpt userOpt, int userBytesPerChecksum) {
@@ -518,4 +522,119 @@ public enum ChecksumCombineMode {
MD5MD5CRC, // MD5 of block checksums, which are MD5 over chunk CRCs
COMPOSITE_CRC // Block/chunk-independent composite CRC
}
+
+ /**
+ * The standard {@code openFile()} options.
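+ * <p>
+ * Illustrative sketch of a caller using these option names through an
+ * {@code openFile()} builder; the filesystem, path and policy value are
+ * assumptions, not definitions made here:
+ * <pre>
+ *   FSDataInputStream in = fs.openFile(path)
+ *       .opt(FS_OPTION_OPENFILE_READ_POLICY, "random")
+ *       .build()
+ *       .get();
+ * </pre>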
+ */
+ @InterfaceAudience.Public
+ @InterfaceStability.Evolving
+ public static final class OpenFileOptions {
+
+ private OpenFileOptions() {
+ }
+
+ /**
+ * Prefix for all standard filesystem options: {@value}.
+ */
+ private static final String FILESYSTEM_OPTION = "fs.option.";
+
+ /**
+ * Prefix for all openFile options: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE =
+ FILESYSTEM_OPTION + "openfile.";
+
+ /**
+ * OpenFile option for file length: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE_LENGTH =
+ FS_OPTION_OPENFILE + "length";
+
+ /**
+ * OpenFile option for split start: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE_SPLIT_START =
+ FS_OPTION_OPENFILE + "split.start";
+
+ /**
+ * OpenFile option for split end: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE_SPLIT_END =
+ FS_OPTION_OPENFILE + "split.end";
+
+ /**
+ * OpenFile option for buffer size: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE_BUFFER_SIZE =
+ FS_OPTION_OPENFILE + "buffer.size";
+
+ /**
+ * OpenFile option for read policies: {@value}.
+ */
+ public static final String FS_OPTION_OPENFILE_READ_POLICY =
+ FS_OPTION_OPENFILE + "read.policy";
+
+ /**
+ * Set of standard options which openFile implementations
+ * MUST recognize, even if they ignore the actual values.
+ */
+ public static final Set
+ *
*/
+ @InterfaceStability.Evolving
public interface CommonStatisticNames {
// The following names are for file system operation invocations
String OP_APPEND = "op_append";
@@ -49,6 +54,7 @@ public interface CommonStatisticNames {
String OP_DELETE = "op_delete";
String OP_EXISTS = "op_exists";
String OP_GET_CONTENT_SUMMARY = "op_get_content_summary";
+ String OP_GET_DELEGATION_TOKEN = "op_get_delegation_token";
String OP_GET_FILE_CHECKSUM = "op_get_file_checksum";
String OP_GET_FILE_STATUS = "op_get_file_status";
String OP_GET_STATUS = "op_get_status";
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
index 3549cdc4fa392..cb129057ce74e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java
@@ -59,6 +59,22 @@ public interface StreamCapabilities {
*/
String UNBUFFER = "in:unbuffer";
+ /**
+ * Stream read(ByteBuffer) capability implemented by
+ * {@link ByteBufferReadable#read(java.nio.ByteBuffer)}.
+ */
+ String READBYTEBUFFER = "in:readbytebuffer";
+ /**
+ * Stream read(long, ByteBuffer) capability implemented by
+ * {@link ByteBufferPositionedReadable#read(long, java.nio.ByteBuffer)}.
+ */
+ String PREADBYTEBUFFER = "in:preadbytebuffer";
+
+ /**
+ * IOStatisticsSource API.
+ */
+ String IOSTATISTICS = "iostatistics";
+
/**
* Capabilities that a stream can support and be queried for.
*/
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
index 644cf4e50eae1..676c207e00dc4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/ftp/FTPFileSystem.java
@@ -76,7 +76,7 @@ public class FTPFileSystem extends FileSystem {
/**
* Return the protocol scheme for the FileSystem.
- *
+ * ftp
*/
@@ -162,7 +162,7 @@ private FTPClient connect() throws IOException {
/**
* Set FTP's transfer mode based on configuration. Valid values are
* STREAM_TRANSFER_MODE, BLOCK_TRANSFER_MODE and COMPRESSED_TRANSFER_MODE.
- *
+ * .opt("foofs:option.a", true)
+ * .opt("foofs:option.b", "value")
+ * .opt("fs.s3a.open.option.etag", "9fe4c37c25b")
+ * .must("foofs:cache", true)
+ * .must("barfs:cache-size", 256 * 1024 * 1024)
+ * .build();
+ *
+ *
+ * Configuration keys declared in an {@code opt()} may be ignored by
+ * a builder which does not recognise them.
+ *
+ * Configuration keys declared in a {@code must()} function set must
+ * be understood by the implementation or a
+ * {@link IllegalArgumentException} will be thrown.
+ *
+ * @param Return type on the {@link #build()} call.
+ * @param type of builder itself.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public abstract class
+    AbstractFSBuilderImpl<S, B extends FSBuilder<S, B>>
+    implements FSBuilder<S, B> {
+
+ public static final String UNKNOWN_MANDATORY_KEY = "Unknown mandatory key";
+
+ @VisibleForTesting
+ static final String E_BOTH_A_PATH_AND_A_PATH_HANDLE
+ = "Both a path and a pathHandle has been provided to the constructor";
+
+ private final Optional
+ * fs.example.s3a.option => s3a:option
+ * fs.example.fs.io.policy => s3a.io.policy
+ * fs.example.something => something
+ *
+ * @param builder builder to modify
+ * @param conf configuration to read
+ * @param optionalPrefix prefix for optional settings
+ * @param mandatoryPrefix prefix for mandatory settings
+ * @param
+ * fs.example.s3a.option => s3a:option
+ * fs.example.fs.io.policy => s3a.io.policy
+ * fs.example.something => something
+ *
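+ * <p>
+ * Illustrative call (the prefix and variable names are examples only):
+ * <pre>
+ *   propagateOptions(builder, conf, "fs.example.option.", false);
+ * </pre>
+ *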
+ * @param builder builder to modify
+ * @param conf configuration to read
+ * @param prefix prefix to scan/strip
+ * @param mandatory are the options to be mandatory or optional?
+ */
+ public static void propagateOptions(
+ final FSBuilder, ?> builder,
+ final Configuration conf,
+ final String prefix,
+ final boolean mandatory) {
+
+ final String p = prefix.endsWith(".") ? prefix : (prefix + ".");
+ final Map
* run
- * |-> {@link #processOptions(LinkedList)}
- * \-> {@link #processRawArguments(LinkedList)}
- * |-> {@link #expandArguments(LinkedList)}
- * | \-> {@link #expandArgument(String)}*
- * \-> {@link #processArguments(LinkedList)}
- * |-> {@link #processArgument(PathData)}*
- * | |-> {@link #processPathArgument(PathData)}
- * | \-> {@link #processPaths(PathData, PathData...)}
- * | \-> {@link #processPath(PathData)}*
- * \-> {@link #processNonexistentPath(PathData)}
+ * |{@literal ->} {@link #processOptions(LinkedList)}
+ * \{@literal ->} {@link #processRawArguments(LinkedList)}
+ * |{@literal ->} {@link #expandArguments(LinkedList)}
+ * | \{@literal ->} {@link #expandArgument(String)}*
+ * \{@literal ->} {@link #processArguments(LinkedList)}
+ * |{@literal ->} {@link #processArgument(PathData)}*
+ * | |{@literal ->} {@link #processPathArgument(PathData)}
+ * | \{@literal ->} {@link #processPaths(PathData, PathData...)}
+ * | \{@literal ->} {@link #processPath(PathData)}*
+ * \{@literal ->} {@link #processNonexistentPath(PathData)}
*
* Most commands will chose to implement just
* {@link #processOptions(LinkedList)} and {@link #processPath(PathData)}
@@ -292,8 +292,8 @@ protected void processArgument(PathData item) throws IOException {
/**
* This is the last chance to modify an argument before going into the
* (possibly) recursive {@link #processPaths(PathData, PathData...)}
- * -> {@link #processPath(PathData)} loop. Ex. ls and du use this to
- * expand out directories.
+ * {@literal ->} {@link #processPath(PathData)} loop. Ex. ls and du use
+ * this to expand out directories.
* @param item a {@link PathData} representing a path which exists
* @throws IOException if anything goes wrong...
*/
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandFormat.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandFormat.java
index bf30b22e1fbe5..4dd20d108428e 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandFormat.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/CommandFormat.java
@@ -162,7 +162,7 @@ public String getOptValue(String option) {
/** Returns all the options that are set
*
- * @return Set
+ *
+ *
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class StoreStatisticNames {
+
+ /** {@value}. */
+ public static final String OP_APPEND = "op_append";
+
+ /** {@value}. */
+ public static final String OP_COPY_FROM_LOCAL_FILE =
+ "op_copy_from_local_file";
+
+ /** {@value}. */
+ public static final String OP_CREATE = "op_create";
+
+ /** {@value}. */
+ public static final String OP_CREATE_NON_RECURSIVE =
+ "op_create_non_recursive";
+
+ /** {@value}. */
+ public static final String OP_DELETE = "op_delete";
+
+ /** {@value}. */
+ public static final String OP_EXISTS = "op_exists";
+
+ /** {@value}. */
+ public static final String OP_GET_CONTENT_SUMMARY =
+ "op_get_content_summary";
+
+ /** {@value}. */
+ public static final String OP_GET_DELEGATION_TOKEN =
+ "op_get_delegation_token";
+
+ /** {@value}. */
+ public static final String OP_GET_FILE_CHECKSUM =
+ "op_get_file_checksum";
+
+ /** {@value}. */
+ public static final String OP_GET_FILE_STATUS = "op_get_file_status";
+
+ /** {@value}. */
+ public static final String OP_GET_STATUS = "op_get_status";
+
+ /** {@value}. */
+ public static final String OP_GLOB_STATUS = "op_glob_status";
+
+ /** {@value}. */
+ public static final String OP_IS_FILE = "op_is_file";
+
+ /** {@value}. */
+ public static final String OP_IS_DIRECTORY = "op_is_directory";
+
+ /** {@value}. */
+ public static final String OP_LIST_FILES = "op_list_files";
+
+ /** {@value}. */
+ public static final String OP_LIST_LOCATED_STATUS =
+ "op_list_located_status";
+
+ /** {@value}. */
+ public static final String OP_LIST_STATUS = "op_list_status";
+
+ /** {@value}. */
+ public static final String OP_MKDIRS = "op_mkdirs";
+
+ /** {@value}. */
+ public static final String OP_MODIFY_ACL_ENTRIES = "op_modify_acl_entries";
+
+ /** {@value}. */
+ public static final String OP_OPEN = "op_open";
+
+ /** Call to openFile() {@value}. */
+ public static final String OP_OPENFILE = "op_openfile";
+
+ /** {@value}. */
+ public static final String OP_REMOVE_ACL = "op_remove_acl";
+
+ /** {@value}. */
+ public static final String OP_REMOVE_ACL_ENTRIES = "op_remove_acl_entries";
+
+ /** {@value}. */
+ public static final String OP_REMOVE_DEFAULT_ACL = "op_remove_default_acl";
+
+ /** {@value}. */
+ public static final String OP_RENAME = "op_rename";
+
+ /** {@value}. */
+ public static final String OP_SET_ACL = "op_set_acl";
+
+ /** {@value}. */
+ public static final String OP_SET_OWNER = "op_set_owner";
+
+ /** {@value}. */
+ public static final String OP_SET_PERMISSION = "op_set_permission";
+
+ /** {@value}. */
+ public static final String OP_SET_TIMES = "op_set_times";
+
+ /** {@value}. */
+ public static final String OP_TRUNCATE = "op_truncate";
+
+ /** {@value}. */
+ public static final String DELEGATION_TOKENS_ISSUED
+ = "delegation_tokens_issued";
+
+ /** Requests throttled and retried: {@value}. */
+ public static final String STORE_IO_THROTTLED
+ = "store_io_throttled";
+
+ /** Requests made of a store: {@value}. */
+ public static final String STORE_IO_REQUEST
+ = "store_io_request";
+
+ /**
+ * IO retried: {@value}.
+ */
+ public static final String STORE_IO_RETRY
+ = "store_io_retry";
+
+ /**
+ * A store's equivalent of a paged LIST request was initiated: {@value}.
+ */
+ public static final String OBJECT_LIST_REQUEST
+ = "object_list_request";
+
+ /**
+ * Number of continued object listings made.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_CONTINUE_LIST_REQUEST =
+ "object_continue_list_request";
+
+ /**
+ * A bulk DELETE request was made: {@value}.
+ * A separate statistic from {@link #OBJECT_DELETE_REQUEST}
+ * so that metrics on duration of the operations can
+ * be distinguished.
+ */
+ public static final String OBJECT_BULK_DELETE_REQUEST
+ = "object_bulk_delete_request";
+
+ /**
+ * A store's equivalent of a DELETE request was made: {@value}.
+ * This may be an HTTP DELETE verb, or it may be some custom
+ * operation which takes a list of objects to delete.
+ */
+ public static final String OBJECT_DELETE_REQUEST
+ = "object_delete_request";
+
+ /**
+ * The count of objects deleted in delete requests.
+ */
+ public static final String OBJECT_DELETE_OBJECTS
+ = "object_delete_objects";
+
+ /**
+ * Object multipart upload initiated.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_MULTIPART_UPLOAD_INITIATED =
+ "object_multipart_initiated";
+
+ /**
+ * Object multipart upload aborted.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_MULTIPART_UPLOAD_ABORTED =
+ "object_multipart_aborted";
+
+ /**
+ * Object put/multipart upload count.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_PUT_REQUEST =
+ "object_put_request";
+
+ /**
+ * Object put/multipart upload completed count.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_PUT_REQUEST_COMPLETED =
+ "object_put_request_completed";
+
+ /**
+ * Current number of active put requests.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_PUT_REQUEST_ACTIVE =
+ "object_put_request_active";
+
+ /**
+ * number of bytes uploaded.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_PUT_BYTES =
+ "object_put_bytes";
+
+ /**
+ * number of bytes queued for upload/being actively uploaded.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_PUT_BYTES_PENDING =
+ "object_put_bytes_pending";
+
+ /**
+ * Count of S3 Select (or similar) requests issued.
+ * Value :{@value}.
+ */
+ public static final String OBJECT_SELECT_REQUESTS =
+ "object_select_requests";
+
+ /**
+ * Suffix to use for a minimum value when
+ * the same key is shared across min/mean/max
+ * statistics.
+ *
+ * Value {@value}.
+ */
+ public static final String SUFFIX_MIN = ".min";
+
+ /**
+ * Suffix to use for a maximum value when
+ * the same key is shared across min/mean/max
+ * statistics.
+ *
+ * Value {@value}.
+ */
+ public static final String SUFFIX_MAX = ".max";
+
+ /**
+ * Suffix to use for a mean statistic value when
+ * the same key is shared across min/mean/max
+ * statistics.
+ *
+ * Value {@value}.
+ */
+ public static final String SUFFIX_MEAN = ".mean";
+
+ /**
+ * String to add to counters and other stats to track failures.
+ * This comes before the .min/.mean/.max suffixes.
+ *
+ * Value {@value}.
+ */
+ public static final String SUFFIX_FAILURES = ".failures";
+
+ /**
+ * The name of the statistic collected for executor acquisition if
+ * a duration tracker factory is passed in to the constructor.
+ * {@value}.
+ */
+ public static final String ACTION_EXECUTOR_ACQUIRED =
+ "action_executor_acquired";
+
+ /**
+ * A file was opened: {@value}.
+ */
+ public static final String ACTION_FILE_OPENED
+ = "action_file_opened";
+
+ /**
+ * An HTTP HEAD request was made: {@value}.
+ */
+ public static final String ACTION_HTTP_HEAD_REQUEST
+ = "action_http_head_request";
+
+ /**
+ * An HTTP GET request was made: {@value}.
+ */
+ public static final String ACTION_HTTP_GET_REQUEST
+ = "action_http_get_request";
+
+ /**
+ * An HTTP HEAD request was made: {@value}.
+ */
+ public static final String OBJECT_METADATA_REQUESTS
+ = "object_metadata_request";
+
+ public static final String OBJECT_COPY_REQUESTS
+ = "object_copy_requests";
+
+ public static final String STORE_IO_THROTTLE_RATE
+ = "store_io_throttle_rate";
+
+ public static final String DELEGATION_TOKEN_ISSUED
+ = "delegation_token_issued";
+
+ public static final String MULTIPART_UPLOAD_INSTANTIATED
+ = "multipart_instantiated";
+
+ public static final String MULTIPART_UPLOAD_PART_PUT
+ = "multipart_upload_part_put";
+
+ public static final String MULTIPART_UPLOAD_PART_PUT_BYTES
+ = "multipart_upload_part_put_bytes";
+
+ public static final String MULTIPART_UPLOAD_ABORTED
+ = "multipart_upload_aborted";
+
+ public static final String MULTIPART_UPLOAD_ABORT_UNDER_PATH_INVOKED
+ = "multipart_upload_abort_under_path_invoked";
+
+ public static final String MULTIPART_UPLOAD_COMPLETED
+ = "multipart_upload_completed";
+
+ public static final String MULTIPART_UPLOAD_STARTED
+ = "multipart_upload_started";
+
+ private StoreStatisticNames() {
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
new file mode 100644
index 0000000000000..6ced15dad221a
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * These are common statistic names.
+ *
+ *
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public final class StreamStatisticNames {
+
+ /**
+ * Count of times the TCP stream was aborted.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_ABORTED = "stream_aborted";
+
+ /**
+ * Bytes read from an input stream in read() calls.
+ * Does not include bytes read and then discarded in seek/close etc.
+ * These are the bytes returned to the caller.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_BYTES
+ = "stream_read_bytes";
+
+ /**
+ * Count of bytes discarded by aborting an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_BYTES_DISCARDED_ABORT
+ = "stream_read_bytes_discarded_in_abort";
+
+ /**
+ * Count of bytes read and discarded when closing an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_BYTES_DISCARDED_CLOSE
+ = "stream_read_bytes_discarded_in_close";
+
+ /**
+ * Count of times the TCP stream was closed.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_CLOSED = "stream_read_closed";
+
+ /**
+ * Total count of times an attempt to close an input stream was made.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_CLOSE_OPERATIONS
+ = "stream_read_close_operations";
+
+ /**
+ * Total count of times an input stream was opened.
+ * For object stores, this is the number of times a GET request was initiated.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_OPENED = "stream_read_opened";
+
+ /**
+ * Count of exceptions raised during input stream reads.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_EXCEPTIONS =
+ "stream_read_exceptions";
+
+ /**
+ * Count of readFully() operations in an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_FULLY_OPERATIONS
+ = "stream_read_fully_operations";
+
+ /**
+ * Count of read() operations in an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_OPERATIONS =
+ "stream_read_operations";
+
+ /**
+ * Count of incomplete read() operations in an input stream,
+ * that is, when the bytes returned were less than that requested.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_OPERATIONS_INCOMPLETE
+ = "stream_read_operations_incomplete";
+
+ /**
+ * Count/duration of aborting a remote stream during stream IO.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_REMOTE_STREAM_ABORTED
+ = "stream_read_remote_stream_aborted";
+
+ /**
+ * Count/duration of closing a remote stream,
+ * possibly including draining the stream to recycle
+ * the HTTP connection.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_REMOTE_STREAM_DRAINED
+ = "stream_read_remote_stream_drain";
+
+ /**
+ * Count of version mismatches encountered while reading an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_VERSION_MISMATCHES
+ = "stream_read_version_mismatches";
+
+ /**
+ * Count of executed seek operations which went backwards in a stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SEEK_BACKWARD_OPERATIONS =
+ "stream_read_seek_backward_operations";
+
+ /**
+ * Count of bytes moved backwards during seek operations
+ * in an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SEEK_BYTES_BACKWARDS
+ = "stream_read_bytes_backwards_on_seek";
+
+ /**
+ * Count of bytes read and discarded during seek() in an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SEEK_BYTES_DISCARDED =
+ "stream_read_seek_bytes_discarded";
+
+ /**
+ * Count of bytes skipped during forward seek operations.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SEEK_BYTES_SKIPPED
+ = "stream_read_seek_bytes_skipped";
+
+ /**
+ * Count of executed seek operations which went forward in
+ * an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SEEK_FORWARD_OPERATIONS
+ = "stream_read_seek_forward_operations";
+
+ /**
+ * Count of times the seek policy was dynamically changed
+ * in an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SEEK_POLICY_CHANGED =
+ "stream_read_seek_policy_changed";
+
+ /**
+ * Count of seek operations in an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SEEK_OPERATIONS =
+ "stream_read_seek_operations";
+
+ /**
+ * Count of {@code InputStream.skip()} calls.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SKIP_OPERATIONS =
+ "stream_read_skip_operations";
+
+ /**
+ * Count bytes skipped in {@code InputStream.skip()} calls.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_SKIP_BYTES =
+ "stream_read_skip_bytes";
+
+ /**
+ * Total count of bytes read from an input stream.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_TOTAL_BYTES
+ = "stream_read_total_bytes";
+
+ /**
+ * Count of calls of {@code CanUnbuffer.unbuffer()}.
+ * Value: {@value}.
+ */
+ public static final String STREAM_READ_UNBUFFERED
+ = "stream_read_unbuffered";
+
+ /**
+ * Count of stream write failures reported.
+ * Value: {@value}.
+ */
+ public static final String STREAM_WRITE_EXCEPTIONS =
+ "stream_write_exceptions";
+
+ /**
+ * Count of failures when finalizing a multipart upload:
+ * {@value}.
+ */
+ public static final String STREAM_WRITE_EXCEPTIONS_COMPLETING_UPLOADS =
+ "stream_write_exceptions_completing_upload";
+
+ /**
+ * Count of block/partition uploads complete.
+ * Value: {@value}.
+ */
+ public static final String STREAM_WRITE_BLOCK_UPLOADS
+ = "stream_write_block_uploads";
+
+ /**
+ * Count of block uploads aborted.
+ * Value: {@value}.
+ */
+ public static final String STREAM_WRITE_BLOCK_UPLOADS_ABORTED
+ = "stream_write_block_uploads_aborted";
+
+ /**
+ * Count of block/partition uploads active.
+ * Value: {@value}.
+ */
+ public static final String STREAM_WRITE_BLOCK_UPLOADS_ACTIVE
+ = "stream_write_block_uploads_active";
+
+ /**
+ * Gauge of data queued to be written.
+ * Value: {@value}.
+ */
+ public static final String STREAM_WRITE_BLOCK_UPLOADS_BYTES_PENDING =
+ "stream_write_block_uploads_data_pending";
+
+ /**
+ * Count of block uploads committed.
+ * Value: {@value}.
+ */
+ public static final String STREAM_WRITE_BLOCK_UPLOADS_COMMITTED
+ = "stream_write_block_uploads_committed";
+
+ /**
+ * Gauge of block/partition uploads queued to be written.
+ * Value: {@value}.
+ */
+ public static final String STREAM_WRITE_BLOCK_UPLOADS_PENDING
+ = "stream_write_block_uploads_pending";
+
+ /**
+ * "Count of bytes written to output stream including all not yet uploaded.
+ * {@value}.
+ */
+ public static final String STREAM_WRITE_BYTES
+ = "stream_write_bytes";
+
+ /**
+ * Total time taken for uploads to complete.
+ * {@value}.
+ */
+ public static final String STREAM_WRITE_TOTAL_TIME
+ = "stream_write_total_time";
+
+ /**
+ * Total queue duration of all block uploads.
+ * {@value}.
+ */
+ public static final String STREAM_WRITE_QUEUE_DURATION
+ = "stream_write_queue_duration";
+
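+ /**
+ * Count of total data uploaded.
+ * Value: {@value}.
+ */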
+ public static final String STREAM_WRITE_TOTAL_DATA
+ = "stream_write_total_data";
+
+ private StreamStatisticNames() {
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/AbstractIOStatisticsImpl.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/AbstractIOStatisticsImpl.java
new file mode 100644
index 0000000000000..c701a509d8951
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/AbstractIOStatisticsImpl.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import org.apache.hadoop.fs.statistics.IOStatistics;
+
+/**
+ * Base implementation, in case common methods/fields need to be added
+ * in the future.
+ */
+public abstract class AbstractIOStatisticsImpl implements IOStatistics {
+
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/DynamicIOStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/DynamicIOStatistics.java
new file mode 100644
index 0000000000000..50c2625c3513d
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/impl/DynamicIOStatistics.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.statistics.impl;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.function.Function;
+
+import org.apache.hadoop.fs.statistics.MeanStatistic;
+
+/**
+ * These statistics are dynamically evaluated by the supplied
+ * {@code String -> type} functions.
+ *
+ * This allows statistic sources to supply a list of callbacks used to
+ * generate the statistics on demand, similar to some of the Coda Hale metrics.
+ *
+ * The evaluation actually takes place during the iteration's {@code next()}
+ * call.
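+ *
+ * A minimal usage sketch follows; the {@code addCounterFunction}
+ * registration method and the {@code bytesRead} counter are assumed
+ * names used for illustration only, not part of this patch hunk:
+ * <pre>{@code
+ *   // hypothetical: register a counter whose value is computed lazily
+ *   DynamicIOStatistics stats = new DynamicIOStatistics();
+ *   stats.addCounterFunction("stream_read_bytes",
+ *       key -> bytesRead.get());
+ *   // the function is only invoked when the statistics are iterated
+ * }</pre>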
+ */
+final class DynamicIOStatistics
+ extends AbstractIOStatisticsImpl {
+
+ /**
+ * Counter evaluators.
+ */
+ private final EvaluatingStatisticsMap