name property as
* an array of Strings, trimmed of the leading and trailing whitespace.
* If no such property is specified then an empty array is returned.
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/OpensslCipher.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/OpensslCipher.java
index b166cfc8611b3..c8a10404b0f84 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/OpensslCipher.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/OpensslCipher.java
@@ -177,6 +177,20 @@ private static Transform tokenizeTransformation(String transformation)
}
return new Transform(parts[0], parts[1], parts[2]);
}
+
+ public static boolean isSupported(CipherSuite suite) {
+ Transform transform;
+ int algMode;
+ int padding;
+ try {
+ transform = tokenizeTransformation(suite.getName());
+ algMode = AlgMode.get(transform.alg, transform.mode);
+ padding = Padding.get(transform.padding);
+ } catch (NoSuchAlgorithmException|NoSuchPaddingException e) {
+ return false;
+ }
+ return isSupportedSuite(algMode, padding);
+ }
/**
* Initialize this cipher with a key and IV.
@@ -298,5 +312,7 @@ private native int doFinal(long context, ByteBuffer output, int offset,
private native void clean(long ctx, long engineNum);
+ private native static boolean isSupportedSuite(int alg, int padding);
+
public native static String getLibraryName();
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/OpensslSm4CtrCryptoCodec.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/OpensslSm4CtrCryptoCodec.java
index f6b2f6a802556..9df1bbe89efa4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/OpensslSm4CtrCryptoCodec.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/OpensslSm4CtrCryptoCodec.java
@@ -41,6 +41,10 @@ public OpensslSm4CtrCryptoCodec() {
if (loadingFailureReason != null) {
throw new RuntimeException(loadingFailureReason);
}
+
+ if (!OpensslCipher.isSupported(CipherSuite.SM4_CTR_NOPADDING)) {
+ throw new RuntimeException("The OpenSSL native library is built without SM4 CTR support");
+ }
}
@Override
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
index f0c912224f90f..10f7b428ad142 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.crypto.key.kms;
import org.apache.commons.codec.binary.Base64;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
@@ -561,17 +562,19 @@ private + * Callers MUST have no expectation that parent directories will exist after the + * operation completes; if an object store needs to explicitly look for and create + * directory markers, that step will be omitted. + *
+ * Be aware that on some stores (AWS S3) each object listed in a bulk delete counts + * against the write IOPS limit; large page sizes are counterproductive here, as + * are attempts at parallel submissions across multiple threads. + * @see HADOOP-16823. + * Large DeleteObject requests are their own Thundering Herd + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public interface BulkDelete extends IOStatisticsSource, Closeable { + + /** + * The maximum number of objects/files to delete in a single request. + * @return a number greater than zero. + */ + int pageSize(); + + /** + * Base path of a bulk delete operation. + * All paths submitted in {@link #bulkDelete(Collection)} must be under this path. + * @return base path of a bulk delete operation. + */ + Path basePath(); + + /** + * Delete a list of files/objects. + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs;
+
+import java.util.Collection;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+
+/**
+ * Utility class for bulk delete operations.
+ */
+public final class BulkDeleteUtils {
+
+ private BulkDeleteUtils() {
+ }
+
+ /**
+ * Preconditions for bulk delete paths.
+ * @param paths paths to delete.
+ * @param pageSize maximum number of paths to delete in a single operation.
+ * @param basePath base path for the delete operation.
+ */
+ public static void validateBulkDeletePaths(Collection
+ * Key implications from this path capability being true:
+ *
+ * Two ranges overlap when the start offset
* of second is less than the end offset of first.
* End offset is calculated as start offset + length.
- * @param input list if input ranges.
- * @return true/false based on logic explained above.
+ * @param input input list
+ * @param fileLength file length if known
+ * @return a new sorted list.
+ * @throws IllegalArgumentException if there are overlapping ranges or
+ * a range element is invalid (other than with negative offset)
+ * @throws EOFException if the last range extends beyond the end of the file supplied
+ * or a range offset is negative
*/
- public static List extends FileRange> validateNonOverlappingAndReturnSortedRanges(
- List extends FileRange> input) {
+ public static List extends FileRange> validateAndSortRanges(
+ final List extends FileRange> input,
+ final Optional
+ * This method is used externally and must be retained with
+ * the signature unchanged.
+ * @param input input ranges.
+ * @return a new list of the ranges, sorted by offset.
+ */
+ @InterfaceStability.Stable
public static FileRange[] sortRanges(List extends FileRange> input) {
- FileRange[] sortedRanges = input.toArray(new FileRange[0]);
- Arrays.sort(sortedRanges, Comparator.comparingLong(FileRange::getOffset));
- return sortedRanges;
+ return sortRangeList(input).toArray(new FileRange[0]);
}
/**
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WithErasureCoding.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WithErasureCoding.java
new file mode 100644
index 0000000000000..5f8a7fbad6ea3
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/WithErasureCoding.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs;
+
+import java.io.IOException;
+
+/**
+ * Filesystems that support EC can implement this interface.
+ */
+public interface WithErasureCoding {
+
+ /**
+ * Get the EC Policy name of the given file's fileStatus.
+ * If the file is not erasure coded, this shall return null.
+ * Callers will make sure to check if fileStatus isInstance of
+ * an FS that implements this interface.
+ * If the call fails due to some error, this shall return null.
+ * @param fileStatus object of the file whose ecPolicy needs to be obtained.
+ * @return the ec Policy name
+ */
+ String getErasureCodingPolicyName(FileStatus fileStatus);
+
+ /**
+ * Set the given ecPolicy on the path.
+ * The path and ecPolicyName should be valid (not null/empty, the
+ * implementing FS shall support the supplied ecPolicy).
+ * implementations can throw IOException if these conditions are not met.
+ * @param path on which the EC policy needs to be set.
+ * @param ecPolicyName the EC policy.
+ * @throws IOException if there is an error during the set op.
+ */
+ void setErasureCodingPolicy(Path path, String ecPolicyName) throws
+ IOException;
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java
index c9555a1e5414e..b0fae1305e3b8 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/CombinedFileRange.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.fs.impl;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileRange;
import java.util.ArrayList;
@@ -27,13 +28,32 @@
* A file range that represents a set of underlying file ranges.
* This is used when we combine the user's FileRange objects
* together into a single read for efficiency.
+ *
+ * This class is not part of the public API; it MAY BE used as a parameter
+ * to vector IO operations in FileSystem implementation code (and is)
*/
+@InterfaceAudience.Private
public class CombinedFileRange extends FileRangeImpl {
- private List
+ * Thread safety: there is no synchronization on a mutable {@code FlagSet}.
+ * Once declared immutable, flags cannot be changed, so they
+ * becomes implicitly thread-safe.
+ */
+public final class FlagSet
+ * This is immutable.
+ * @return the flags.
+ */
+ public EnumSet
+ * The immutability flag is not considered, nor is the
+ * {@link #namesToValues} map, though as that is generated from
+ * the enumeration and prefix, it is implicitly equal if the prefix
+ * and enumClass fields are equal.
+ * @param o other object
+ * @return true iff the equality condition is met.
+ */
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ FlagSet> flagSet = (FlagSet>) o;
+ return Objects.equals(enumClass, flagSet.enumClass)
+ && Objects.equals(prefix, flagSet.prefix)
+ && Objects.equals(flags, flagSet.flags);
+ }
+
+ /**
+ * Hash code is based on the flags.
+ * @return a hash code.
+ */
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(flags);
+ }
+
+ /**
+ * Create a copy of the FlagSet.
+ * @return a new mutable instance with a separate copy of the flags
+ */
+ public FlagSet
+ * This header may be shared across multiple threads at the same time.
+ * so some methods are marked as synchronized, specifically those reading
+ * or writing the attribute map.
+ *
+ * For the same reason, maps and lists passed down during construction are
+ * copied into thread safe structures.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@@ -81,6 +90,14 @@ public final class HttpReferrerAuditHeader {
private static final LogExactlyOnce WARN_OF_URL_CREATION =
new LogExactlyOnce(LOG);
+ /**
+ * Log for warning of an exception raised when building
+ * the referrer header, including building the evaluated
+ * attributes.
+ */
+ private static final LogExactlyOnce ERROR_BUILDING_REFERRER_HEADER =
+ new LogExactlyOnce(LOG);
+
/** Context ID. */
private final String contextId;
@@ -122,7 +139,11 @@ public final class HttpReferrerAuditHeader {
/**
* Instantiate.
- *
+ *
+ * All maps/enums passed down are copied into thread safe equivalents.
+ * as their origin is unknown and cannot be guaranteed to
+ * not be shared.
+ *
* Context and operationId are expected to be well formed
* numeric/hex strings, at least adequate to be
* used as individual path elements in a URL.
@@ -130,15 +151,15 @@ public final class HttpReferrerAuditHeader {
private HttpReferrerAuditHeader(
final Builder builder) {
this.contextId = requireNonNull(builder.contextId);
- this.evaluated = builder.evaluated;
- this.filter = builder.filter;
+ this.evaluated = new ConcurrentHashMap<>(builder.evaluated);
+ this.filter = ImmutableSet.copyOf(builder.filter);
this.operationName = requireNonNull(builder.operationName);
this.path1 = builder.path1;
this.path2 = builder.path2;
this.spanId = requireNonNull(builder.spanId);
// copy the parameters from the builder and extend
- attributes = builder.attributes;
+ attributes = new ConcurrentHashMap<>(builder.attributes);
addAttribute(PARAM_OP, operationName);
addAttribute(PARAM_PATH, path1);
@@ -166,17 +187,18 @@ private HttpReferrerAuditHeader(
* per entry, and "" returned.
* @return a referrer string or ""
*/
- public String buildHttpReferrer() {
+ public synchronized String buildHttpReferrer() {
String header;
try {
+ Map
+ * The intent is to avoid the need for complex reflection operations
+ * including wrapping of parameter classes, direct instantiation of
+ * new classes etc.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public final class WrappedIO {
+
+ private WrappedIO() {
+ }
+
+ /**
+ * Get the maximum number of objects/files to delete in a single request.
+ * @param fs filesystem
+ * @param path path to delete under.
+ * @return a number greater than or equal to zero.
+ * @throws UnsupportedOperationException bulk delete under that path is not supported.
+ * @throws IllegalArgumentException path not valid.
+ * @throws UncheckedIOException if an IOE was raised.
+ */
+ public static int bulkDelete_pageSize(FileSystem fs, Path path) {
+
+ return uncheckIOExceptions(() -> {
+ try (BulkDelete bulk = fs.createBulkDelete(path)) {
+ return bulk.pageSize();
+ }
+ });
+ }
+
+ /**
+ * Delete a list of files/objects.
+ *
+ * Returns false if the source is null or does not contain any statistics.
+ * @param source implementation of {@link IOStatisticsSource} or {@link IOStatistics}
+ * @return true if the the source object was aggregated.
+ */
+ public static boolean iostatisticsContext_aggregate(Object source) {
+ IOStatistics stats = retrieveIOStatistics(source);
+ if (stats != null) {
+ getCurrentIOStatisticsContext().getAggregator().aggregate(stats);
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ /**
+ * Convert IOStatistics to a string form, with all the metrics sorted
+ * and empty value stripped.
+ * @param statistics A statistics instance; may be null
+ * @return string value or the empty string if null
+ */
+ public static String iostatistics_toPrettyString(@Nullable Object statistics) {
+ return statistics == null
+ ? ""
+ : ioStatisticsToPrettyString((IOStatistics) statistics);
+ }
+
+ /**
+ * Apply a function to an object which may be an IOStatisticsSnapshot.
+ * @param
+ * If the WrappedIO class is found, use it.
+ *
+ * If not, falls back to the classic {@code fs.open(Path)} call.
+ * @param fs filesystem
+ * @param status file status
+ * @param readPolicies read policy to use
+ * @return the input stream
+ * @throws IOException any IO failure.
+ */
+ public static FSDataInputStream openFile(
+ FileSystem fs,
+ FileStatus status,
+ String readPolicies) throws IOException {
+ return openFileOnInstance(instance(), fs, status, readPolicies);
+ }
+
+ /**
+ * Open a file.
+ *
+ * If the WrappedIO class is found, uses
+ * {@link #fileSystem_openFile(FileSystem, Path, String, FileStatus, Long, Map)} with
+ * {@link #PARQUET_READ_POLICIES} as the list of read policies and passing down
+ * the file status.
+ *
+ * If not, falls back to the classic {@code fs.open(Path)} call.
+ * @param instance dynamic wrapped IO instance.
+ * @param fs filesystem
+ * @param status file status
+ * @param readPolicies read policy to use
+ * @return the input stream
+ * @throws IOException any IO failure.
+ */
+ @VisibleForTesting
+ static FSDataInputStream openFileOnInstance(
+ DynamicWrappedIO instance,
+ FileSystem fs,
+ FileStatus status,
+ String readPolicies) throws IOException {
+ FSDataInputStream stream;
+ if (instance.fileSystem_openFile_available()) {
+ // use openfile for a higher performance read
+ // and the ability to set a read policy.
+ // This optimizes for cloud storage by saving on IO
+ // in open and choosing the range for GET requests.
+ // For other stores, it ultimately invokes the classic open(Path)
+ // call so is no more expensive than before.
+ LOG.debug("Opening file {} through fileSystem_openFile", status);
+ stream = instance.fileSystem_openFile(fs,
+ status.getPath(),
+ readPolicies,
+ status,
+ null,
+ null);
+ } else {
+ LOG.debug("Opening file {} through open()", status);
+ stream = fs.open(status.getPath());
+ }
+ return stream;
+ }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/impl/DynamicWrappedStatistics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/impl/DynamicWrappedStatistics.java
new file mode 100644
index 0000000000000..a4a25b036bc92
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/wrappedio/impl/DynamicWrappedStatistics.java
@@ -0,0 +1,678 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.io.wrappedio.impl;
+
+import java.io.Serializable;
+import java.io.UncheckedIOException;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.statistics.IOStatistics;
+import org.apache.hadoop.fs.statistics.IOStatisticsSource;
+import org.apache.hadoop.util.dynamic.DynMethods;
+
+import static org.apache.hadoop.util.dynamic.BindingUtils.available;
+import static org.apache.hadoop.util.dynamic.BindingUtils.checkAvailable;
+import static org.apache.hadoop.util.dynamic.BindingUtils.loadClass;
+import static org.apache.hadoop.util.dynamic.BindingUtils.loadStaticMethod;
+
+/**
+ * The wrapped IOStatistics methods in {@code WrappedStatistics},
+ * dynamically loaded.
+ * This is suitable for copy-and-paste into other libraries which have some
+ * version of the Parquet DynMethods classes already present.
+ */
+public final class DynamicWrappedStatistics {
+
+ /**
+ * Classname of the wrapped statistics class: {@value}.
+ */
+ public static final String WRAPPED_STATISTICS_CLASSNAME =
+ "org.apache.hadoop.io.wrappedio.WrappedStatistics";
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IS_IOSTATISTICS_SOURCE = "isIOStatisticsSource";
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IS_IOSTATISTICS = "isIOStatistics";
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IS_IOSTATISTICS_SNAPSHOT = "isIOStatisticsSnapshot";
+
+ /**
+ * IOStatisticsContext method: {@value}.
+ */
+ public static final String IOSTATISTICS_CONTEXT_AGGREGATE = "iostatisticsContext_aggregate";
+
+ /**
+ * IOStatisticsContext method: {@value}.
+ */
+ public static final String IOSTATISTICS_CONTEXT_ENABLED = "iostatisticsContext_enabled";
+
+ /**
+ * IOStatisticsContext method: {@value}.
+ */
+ public static final String IOSTATISTICS_CONTEXT_GET_CURRENT = "iostatisticsContext_getCurrent";
+
+ /**
+ * IOStatisticsContext method: {@value}.
+ */
+ public static final String IOSTATISTICS_CONTEXT_SET_THREAD_CONTEXT =
+ "iostatisticsContext_setThreadIOStatisticsContext";
+
+ /**
+ * IOStatisticsContext method: {@value}.
+ */
+ public static final String IOSTATISTICS_CONTEXT_RESET = "iostatisticsContext_reset";
+
+ /**
+ * IOStatisticsContext method: {@value}.
+ */
+ public static final String IOSTATISTICS_CONTEXT_SNAPSHOT = "iostatisticsContext_snapshot";
+
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IOSTATISTICS_SNAPSHOT_AGGREGATE = "iostatisticsSnapshot_aggregate";
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IOSTATISTICS_SNAPSHOT_CREATE = "iostatisticsSnapshot_create";
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IOSTATISTICS_SNAPSHOT_FROM_JSON_STRING =
+ "iostatisticsSnapshot_fromJsonString";
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IOSTATISTICS_SNAPSHOT_LOAD = "iostatisticsSnapshot_load";
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IOSTATISTICS_SNAPSHOT_RETRIEVE = "iostatisticsSnapshot_retrieve";
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IOSTATISTICS_SNAPSHOT_SAVE = "iostatisticsSnapshot_save";
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IOSTATISTICS_SNAPSHOT_TO_JSON_STRING =
+ "iostatisticsSnapshot_toJsonString";
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IOSTATISTICS_TO_PRETTY_STRING =
+ "iostatistics_toPrettyString";
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IOSTATISTICS_COUNTERS = "iostatistics_counters";
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IOSTATISTICS_GAUGES = "iostatistics_gauges";
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IOSTATISTICS_MINIMUMS = "iostatistics_minimums";
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IOSTATISTICS_MAXIMUMS = "iostatistics_maximums";
+
+ /**
+ * Method name: {@value}.
+ */
+ public static final String IOSTATISTICS_MEANS = "iostatistics_means";
+
+ /**
+ * Was wrapped IO loaded?
+ * In the hadoop codebase, this is true.
+ * But in other libraries it may not always be true...this
+ * field is used to assist copy-and-paste adoption.
+ */
+ private final boolean loaded;
+
+ /*
+ IOStatisticsContext methods.
+ */
+ private final DynMethods.UnboundMethod iostatisticsContextAggregateMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsContextEnabledMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsContextGetCurrentMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsContextResetMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsContextSetThreadContextMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsContextSnapshotMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsSnapshotAggregateMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsSnapshotCreateMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsSnapshotCreateWithSourceMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsSnapshotLoadMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsSnapshotFromJsonStringMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsSnapshotRetrieveMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsSnapshotSaveMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsToPrettyStringMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsSnapshotToJsonStringMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsCountersMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsGaugesMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsMinimumsMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsMaximumsMethod;
+
+ private final DynMethods.UnboundMethod iostatisticsMeansMethod;
+
+ private final DynMethods.UnboundMethod isIOStatisticsSourceMethod;
+
+ private final DynMethods.UnboundMethod isIOStatisticsMethod;
+
+ private final DynMethods.UnboundMethod isIOStatisticsSnapshotMethod;
+
+
+ public DynamicWrappedStatistics() {
+ this(WRAPPED_STATISTICS_CLASSNAME);
+ }
+
+ public DynamicWrappedStatistics(String classname) {
+
+ // wrap the real class.
+ Class> wrappedClass = loadClass(classname);
+
+ loaded = wrappedClass != null;
+
+ // instanceof checks
+ isIOStatisticsSourceMethod = loadStaticMethod(wrappedClass,
+ Boolean.class, IS_IOSTATISTICS_SOURCE, Object.class);
+ isIOStatisticsMethod = loadStaticMethod(wrappedClass,
+ Boolean.class, IS_IOSTATISTICS, Object.class);
+ isIOStatisticsSnapshotMethod = loadStaticMethod(wrappedClass,
+ Boolean.class, IS_IOSTATISTICS_SNAPSHOT, Serializable.class);
+
+ // IOStatisticsContext operations
+ iostatisticsContextAggregateMethod = loadStaticMethod(wrappedClass,
+ Boolean.class, IOSTATISTICS_CONTEXT_AGGREGATE, Object.class);
+ iostatisticsContextEnabledMethod = loadStaticMethod(wrappedClass,
+ Boolean.class, IOSTATISTICS_CONTEXT_ENABLED);
+ iostatisticsContextGetCurrentMethod = loadStaticMethod(wrappedClass,
+ Object.class, IOSTATISTICS_CONTEXT_GET_CURRENT);
+ iostatisticsContextResetMethod = loadStaticMethod(wrappedClass,
+ Void.class, IOSTATISTICS_CONTEXT_RESET);
+ iostatisticsContextSetThreadContextMethod = loadStaticMethod(wrappedClass,
+ Void.class, IOSTATISTICS_CONTEXT_SET_THREAD_CONTEXT, Object.class);
+ iostatisticsContextSnapshotMethod = loadStaticMethod(wrappedClass,
+ Serializable.class, IOSTATISTICS_CONTEXT_SNAPSHOT);
+
+ // IOStatistics Snapshot operations
+
+ iostatisticsSnapshotAggregateMethod =
+ loadStaticMethod(wrappedClass,
+ Boolean.class,
+ IOSTATISTICS_SNAPSHOT_AGGREGATE,
+ Serializable.class,
+ Object.class);
+
+ iostatisticsSnapshotCreateMethod =
+ loadStaticMethod(wrappedClass,
+ Serializable.class,
+ IOSTATISTICS_SNAPSHOT_CREATE);
+
+ iostatisticsSnapshotCreateWithSourceMethod =
+ loadStaticMethod(wrappedClass,
+ Serializable.class,
+ IOSTATISTICS_SNAPSHOT_CREATE,
+ Object.class);
+
+ iostatisticsSnapshotFromJsonStringMethod =
+ loadStaticMethod(wrappedClass,
+ Serializable.class,
+ IOSTATISTICS_SNAPSHOT_FROM_JSON_STRING,
+ String.class);
+
+ iostatisticsSnapshotToJsonStringMethod =
+ loadStaticMethod(wrappedClass,
+ String.class,
+ IOSTATISTICS_SNAPSHOT_TO_JSON_STRING,
+ Serializable.class);
+
+ iostatisticsSnapshotRetrieveMethod =
+ loadStaticMethod(wrappedClass,
+ Serializable.class,
+ IOSTATISTICS_SNAPSHOT_RETRIEVE,
+ Object.class);
+
+ iostatisticsSnapshotLoadMethod =
+ loadStaticMethod(wrappedClass,
+ Serializable.class,
+ IOSTATISTICS_SNAPSHOT_LOAD,
+ FileSystem.class,
+ Path.class);
+
+ iostatisticsSnapshotSaveMethod =
+ loadStaticMethod(wrappedClass,
+ Void.class,
+ IOSTATISTICS_SNAPSHOT_SAVE,
+ Serializable.class,
+ FileSystem.class,
+ Path.class,
+ boolean.class); // note: not Boolean.class
+
+ // getting contents of snapshots
+ iostatisticsCountersMethod =
+ loadStaticMethod(wrappedClass,
+ Map.class,
+ IOSTATISTICS_COUNTERS,
+ Serializable.class);
+ iostatisticsGaugesMethod =
+ loadStaticMethod(wrappedClass,
+ Map.class,
+ IOSTATISTICS_GAUGES,
+ Serializable.class);
+ iostatisticsMinimumsMethod =
+ loadStaticMethod(wrappedClass,
+ Map.class,
+ IOSTATISTICS_MINIMUMS,
+ Serializable.class);
+ iostatisticsMaximumsMethod =
+ loadStaticMethod(wrappedClass,
+ Map.class,
+ IOSTATISTICS_MAXIMUMS,
+ Serializable.class);
+ iostatisticsMeansMethod =
+ loadStaticMethod(wrappedClass,
+ Map.class,
+ IOSTATISTICS_MEANS,
+ Serializable.class);
+
+ // stringification
+
+ iostatisticsToPrettyStringMethod =
+ loadStaticMethod(wrappedClass,
+ String.class,
+ IOSTATISTICS_TO_PRETTY_STRING,
+ Object.class);
+
+ }
+
+ /**
+ * Is the wrapped statistics class loaded?
+ * @return true if the wrappedIO class was found and loaded.
+ */
+ public boolean loaded() {
+ return loaded;
+ }
+
+ /**
+ * Are the core IOStatistics methods and classes available.
+ * @return true if the relevant methods are loaded.
+ */
+ public boolean ioStatisticsAvailable() {
+ return available(iostatisticsSnapshotCreateMethod);
+ }
+
+ /**
+ * Are the IOStatisticsContext methods and classes available?
+ * @return true if the relevant methods are loaded.
+ */
+ public boolean ioStatisticsContextAvailable() {
+ return available(iostatisticsContextEnabledMethod);
+ }
+
+ /**
+ * Require a IOStatistics to be available.
+ * @throws UnsupportedOperationException if the method was not found.
+ */
+ private void checkIoStatisticsAvailable() {
+ checkAvailable(iostatisticsSnapshotCreateMethod);
+ }
+
+ /**
+ * Require IOStatisticsContext methods to be available.
+ * @throws UnsupportedOperationException if the classes/methods were not found
+ */
+ private void checkIoStatisticsContextAvailable() {
+ checkAvailable(iostatisticsContextEnabledMethod);
+ }
+
+ /**
+ * Probe for an object being an instance of {@code IOStatisticsSource}.
+ * @param object object to probe
+ * @return true if the object is the right type, false if the classes
+ * were not found or the object is null/of a different type
+ */
+ public boolean isIOStatisticsSource(Object object) {
+ return ioStatisticsAvailable()
+ && (boolean) isIOStatisticsSourceMethod.invoke(null, object);
+ }
+
+ /**
+ * Probe for an object being an instance of {@code IOStatisticsSource}.
+ * @param object object to probe
+ * @return true if the object is the right type, false if the classes
+ * were not found or the object is null/of a different type
+ */
+ public boolean isIOStatistics(Object object) {
+ return ioStatisticsAvailable()
+ && (boolean) isIOStatisticsMethod.invoke(null, object);
+ }
+
+ /**
+ * Probe for an object being an instance of {@code IOStatisticsSnapshot}.
+ * @param object object to probe
+ * @return true if the object is the right type, false if the classes
+ * were not found or the object is null/of a different type
+ */
+ public boolean isIOStatisticsSnapshot(Serializable object) {
+ return ioStatisticsAvailable()
+ && (boolean) isIOStatisticsSnapshotMethod.invoke(null, object);
+ }
+
+ /**
+ * Probe to check if the thread-level IO statistics enabled.
+ * If the relevant classes and methods were not found, returns false
+ * @return true if the IOStatisticsContext API was found
+ * and is enabled.
+ */
+ public boolean iostatisticsContext_enabled() {
+ return ioStatisticsAvailable()
+ && (boolean) iostatisticsContextEnabledMethod.invoke(null);
+ }
+
+ /**
+ * Get the context's {@code IOStatisticsContext} which
+ * implements {@code IOStatisticsSource}.
+ * This is either a thread-local value or a global empty context.
+ * @return instance of {@code IOStatisticsContext}.
+ * @throws UnsupportedOperationException if the IOStatisticsContext API was not found
+ */
+ public Object iostatisticsContext_getCurrent()
+ throws UnsupportedOperationException {
+ checkIoStatisticsContextAvailable();
+ return iostatisticsContextGetCurrentMethod.invoke(null);
+ }
+
+ /**
+ * Set the IOStatisticsContext for the current thread.
+ * @param statisticsContext IOStatistics context instance for the
+ * current thread. If null, the context is reset.
+ * @throws UnsupportedOperationException if the IOStatisticsContext API was not found
+ */
+ public void iostatisticsContext_setThreadIOStatisticsContext(
+ @Nullable Object statisticsContext) throws UnsupportedOperationException {
+ checkIoStatisticsContextAvailable();
+ iostatisticsContextSetThreadContextMethod.invoke(null, statisticsContext);
+ }
+
+ /**
+ * Reset the context's IOStatistics.
+ * {@code IOStatisticsContext#reset()}
+ * @throws UnsupportedOperationException if the IOStatisticsContext API was not found
+ */
+ public void iostatisticsContext_reset()
+ throws UnsupportedOperationException {
+ checkIoStatisticsContextAvailable();
+ iostatisticsContextResetMethod.invoke(null);
+ }
+
+ /**
+ * Take a snapshot of the context IOStatistics.
+ * {@code IOStatisticsContext#snapshot()}
+ * @return an instance of {@code IOStatisticsSnapshot}.
+ * @throws UnsupportedOperationException if the IOStatisticsContext API was not found
+ */
+ public Serializable iostatisticsContext_snapshot()
+ throws UnsupportedOperationException {
+ checkIoStatisticsContextAvailable();
+ return iostatisticsContextSnapshotMethod.invoke(null);
+ }
+ /**
+ * Aggregate into the IOStatistics context the statistics passed in via
+ * IOStatistics/source parameter.
+ *
+ * Returns false if the source is null or does not contain any statistics.
+ * @param source implementation of {@link IOStatisticsSource} or {@link IOStatistics}
+ * @return true if the the source object was aggregated.
+ */
+ public boolean iostatisticsContext_aggregate(Object source) {
+ checkIoStatisticsContextAvailable();
+ return iostatisticsContextAggregateMethod.invoke(null, source);
+ }
+
+ /**
+ * Aggregate an existing {@code IOStatisticsSnapshot} with
+ * the supplied statistics.
+ * @param snapshot snapshot to update
+ * @param statistics IOStatistics to add
+ * @return true if the snapshot was updated.
+ * @throws IllegalArgumentException if the {@code statistics} argument is not
+ * null but not an instance of IOStatistics, or if {@code snapshot} is invalid.
+ * @throws UnsupportedOperationException if the IOStatistics classes were not found
+ */
+ public boolean iostatisticsSnapshot_aggregate(
+ Serializable snapshot, @Nullable Object statistics)
+ throws UnsupportedOperationException {
+ checkIoStatisticsAvailable();
+ return iostatisticsSnapshotAggregateMethod.invoke(null, snapshot, statistics);
+ }
+
+ /**
+ * Create a new {@code IOStatisticsSnapshot} instance.
+ * @return an empty IOStatisticsSnapshot.
+ * @throws UnsupportedOperationException if the IOStatistics classes were not found
+ */
+ public Serializable iostatisticsSnapshot_create()
+ throws UnsupportedOperationException {
+ checkIoStatisticsAvailable();
+ return iostatisticsSnapshotCreateMethod.invoke(null);
+ }
+
+ /**
+ * Create a new {@code IOStatisticsSnapshot} instance.
+ * @param source optional source statistics
+ * @return an IOStatisticsSnapshot.
+ * @throws ClassCastException if the {@code source} is not valid.
+ * @throws UnsupportedOperationException if the IOStatistics classes were not found
+ */
+ public Serializable iostatisticsSnapshot_create(
+ @Nullable Object source)
+ throws UnsupportedOperationException, ClassCastException {
+ checkIoStatisticsAvailable();
+ return iostatisticsSnapshotCreateWithSourceMethod.invoke(null, source);
+ }
+
+ /**
+ * Save IOStatisticsSnapshot to a JSON string.
+ * @param snapshot statistics; may be null or of an incompatible type
+ * @return JSON string value or null if source is not an IOStatisticsSnapshot
+ * @throws UncheckedIOException Any IO/jackson exception.
+ * @throws UnsupportedOperationException if the IOStatistics classes were not found
+ */
+ public String iostatisticsSnapshot_toJsonString(@Nullable Serializable snapshot)
+ throws UncheckedIOException, UnsupportedOperationException {
+ checkIoStatisticsAvailable();
+ return iostatisticsSnapshotToJsonStringMethod.invoke(null, snapshot);
+ }
+
+ /**
+ * Load IOStatisticsSnapshot from a JSON string.
+ * @param json JSON string value.
+ * @return deserialized snapshot.
+ * @throws UncheckedIOException Any IO/jackson exception.
+ * @throws UnsupportedOperationException if the IOStatistics classes were not found
+ */
+ public Serializable iostatisticsSnapshot_fromJsonString(
+ final String json) throws UncheckedIOException, UnsupportedOperationException {
+ checkIoStatisticsAvailable();
+ return iostatisticsSnapshotFromJsonStringMethod.invoke(null, json);
+ }
+
+ /**
+ * Load IOStatisticsSnapshot from a Hadoop filesystem.
+ * @param fs filesystem
+ * @param path path
+ * @return the loaded snapshot
+ * @throws UncheckedIOException Any IO exception.
+ * @throws UnsupportedOperationException if the IOStatistics classes were not found
+ */
+ public Serializable iostatisticsSnapshot_load(
+ FileSystem fs,
+ Path path) throws UncheckedIOException, UnsupportedOperationException {
+ checkIoStatisticsAvailable();
+ return iostatisticsSnapshotLoadMethod.invoke(null, fs, path);
+ }
+
+ /**
+ * Extract the IOStatistics from an object in a serializable form.
+ * @param source source object, may be null/not a statistics source/instance
+ * @return {@code IOStatisticsSnapshot} or null if the object is null/doesn't have statistics
+ * @throws UnsupportedOperationException if the IOStatistics classes were not found
+ */
+ public Serializable iostatisticsSnapshot_retrieve(@Nullable Object source)
+ throws UnsupportedOperationException {
+ checkIoStatisticsAvailable();
+ return iostatisticsSnapshotRetrieveMethod.invoke(null, source);
+ }
+
+ /**
+ * Save IOStatisticsSnapshot to a Hadoop filesystem as a JSON file.
+ * @param snapshot statistics
+ * @param fs filesystem
+ * @param path path
+ * @param overwrite should any existing file be overwritten?
+ * @throws UncheckedIOException Any IO exception.
+ * @throws UnsupportedOperationException if the IOStatistics classes were not found
+ */
+ public void iostatisticsSnapshot_save(
+ @Nullable Serializable snapshot,
+ FileSystem fs,
+ Path path,
+ boolean overwrite) throws UncheckedIOException, UnsupportedOperationException {
+
+ checkIoStatisticsAvailable();
+ iostatisticsSnapshotSaveMethod.invoke(null, snapshot, fs, path, overwrite);
+ }
+
+ /**
+ * Get the counters of an IOStatisticsSnapshot.
+ * @param source source of statistics.
+ * @return the map of counters.
+ */
+ public Map
+ * Classes in this package tagged as {@code @InterfaceAudience#Public} export
+ * methods to be loaded by reflection by other applications/libraries.
+ * Tests against these SHOULD use reflection themselves so as to guarantee
+ * stability of reflection-based access.
+ *
+ * Classes tagged as private/limited private are for support and testing.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+package org.apache.hadoop.io.wrappedio;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index f33f5dc4a3fe4..8025f53c2e1c1 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -2035,11 +2035,7 @@ public class Connection {
* Address to which the socket is connected to.
*/
private final InetAddress addr;
- /**
- * Client Host address from where the socket connection is being established to the Server.
- */
- private final String hostName;
-
+
IpcConnectionContextProto connectionContext;
String protocolName;
SaslServer saslServer;
@@ -2082,12 +2078,9 @@ public Connection(SocketChannel channel, long lastContact,
this.isOnAuxiliaryPort = isOnAuxiliaryPort;
if (addr == null) {
this.hostAddress = "*Unknown*";
- this.hostName = this.hostAddress;
} else {
// host IP address
this.hostAddress = addr.getHostAddress();
- // host name for the IP address
- this.hostName = addr.getHostName();
}
this.remotePort = socket.getPort();
this.responseQueue = new LinkedList
+ * If not set, the current thread's ClassLoader is used.
+ *
+ * @param value a ClassLoader
+ * @return this Builder for method chaining
+ */
+ public Builder loader(ClassLoader value) {
+ this.loader = value;
+ return this;
+ }
+
+ public Builder impl(String className, Class>... types) {
+ // don't do any work if an implementation has been found
+ if (ctor != null) {
+ return this;
+ }
+
+ try {
+ Class> targetClass = Class.forName(className, true, loader);
+ impl(targetClass, types);
+ } catch (NoClassDefFoundError | ClassNotFoundException e) {
+ // cannot load this implementation
+ problems.put(className, e);
+ }
+
+ return this;
+ }
+
+ public
+ * Allows callers to invoke the wrapped method with all Exceptions wrapped by
+ * RuntimeException, or with a single Exception catch block.
+ */
+ public static class UnboundMethod {
+
+ private final Method method;
+
+ private final String name;
+
+ private final int argLength;
+
+ UnboundMethod(Method method, String name) {
+ this.method = method;
+ this.name = name;
+ this.argLength =
+ (method == null || method.isVarArgs()) ? -1 : method.getParameterTypes().length;
+ }
+
+ @SuppressWarnings("unchecked")
+ public
+ * If not set, the current thread's ClassLoader is used.
+ * @param classLoader a ClassLoader
+ * @return this Builder for method chaining
+ */
+ public Builder loader(ClassLoader classLoader) {
+ this.loader = classLoader;
+ return this;
+ }
+
+ /**
+ * If no implementation has been found, adds a NOOP method.
+ *
+ * Note: calls to impl will not match after this method is called!
+ * @return this Builder for method chaining
+ */
+ public Builder orNoop() {
+ if (method == null) {
+ this.method = UnboundMethod.NOOP;
+ }
+ return this;
+ }
+
+ /**
+ * Checks for an implementation, first finding the given class by name.
+ * @param className name of a class
+ * @param methodName name of a method (different from constructor)
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ */
+ public Builder impl(String className, String methodName, Class>... argClasses) {
+ // don't do any work if an implementation has been found
+ if (method != null) {
+ return this;
+ }
+
+ try {
+ Class> targetClass = Class.forName(className, true, loader);
+ impl(targetClass, methodName, argClasses);
+ } catch (ClassNotFoundException e) {
+ // class not found on supplied classloader.
+ LOG.debug("failed to load class {}", className, e);
+ }
+ return this;
+ }
+
+ /**
+ * Checks for an implementation, first finding the given class by name.
+ *
+ * The name passed to the constructor is the method name used.
+ * @param className name of a class
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ */
+ public Builder impl(String className, Class>... argClasses) {
+ impl(className, name, argClasses);
+ return this;
+ }
+
+ /**
+ * Checks for a method implementation.
+ * @param targetClass the class to check for an implementation
+ * @param methodName name of a method (different from constructor)
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ */
+ public Builder impl(Class> targetClass, String methodName, Class>... argClasses) {
+ // don't do any work if an implementation has been found
+ if (method != null) {
+ return this;
+ }
+
+ try {
+ this.method = new UnboundMethod(targetClass.getMethod(methodName, argClasses), name);
+ } catch (NoSuchMethodException e) {
+ // not the right implementation
+ LOG.debug("failed to load method {} from class {}", methodName, targetClass, e);
+ }
+ return this;
+ }
+
+ /**
+ * Checks for a method implementation.
+ *
+ * The name passed to the constructor is the method name used.
+ * @param targetClass the class to check for an implementation
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ */
+ public Builder impl(Class> targetClass, Class>... argClasses) {
+ impl(targetClass, name, argClasses);
+ return this;
+ }
+
+ public Builder ctorImpl(Class> targetClass, Class>... argClasses) {
+ // don't do any work if an implementation has been found
+ if (method != null) {
+ return this;
+ }
+
+ try {
+ this.method = new DynConstructors.Builder().impl(targetClass, argClasses).buildChecked();
+ } catch (NoSuchMethodException e) {
+ // not the right implementation
+ LOG.debug("failed to load constructor arity {} from class {}", argClasses.length,
+ targetClass, e);
+ }
+ return this;
+ }
+
+ public Builder ctorImpl(String className, Class>... argClasses) {
+ // don't do any work if an implementation has been found
+ if (method != null) {
+ return this;
+ }
+
+ try {
+ this.method = new DynConstructors.Builder().impl(className, argClasses).buildChecked();
+ } catch (NoSuchMethodException e) {
+ // not the right implementation
+ LOG.debug("failed to load constructor arity {} from class {}", argClasses.length, className,
+ e);
+ }
+ return this;
+ }
+
+ /**
+ * Checks for an implementation, first finding the given class by name.
+ * @param className name of a class
+ * @param methodName name of a method (different from constructor)
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ */
+ public Builder hiddenImpl(String className, String methodName, Class>... argClasses) {
+ // don't do any work if an implementation has been found
+ if (method != null) {
+ return this;
+ }
+
+ try {
+ Class> targetClass = Class.forName(className, true, loader);
+ hiddenImpl(targetClass, methodName, argClasses);
+ } catch (ClassNotFoundException e) {
+ // class not found on supplied classloader.
+ LOG.debug("failed to load class {}", className, e);
+ }
+ return this;
+ }
+
+ /**
+ * Checks for an implementation, first finding the given class by name.
+ *
+ * The name passed to the constructor is the method name used.
+ * @param className name of a class
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ */
+ public Builder hiddenImpl(String className, Class>... argClasses) {
+ hiddenImpl(className, name, argClasses);
+ return this;
+ }
+
+ /**
+ * Checks for a method implementation.
+ * @param targetClass the class to check for an implementation
+ * @param methodName name of a method (different from constructor)
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ */
+ public Builder hiddenImpl(Class> targetClass, String methodName, Class>... argClasses) {
+ // don't do any work if an implementation has been found
+ if (method != null) {
+ return this;
+ }
+
+ try {
+ Method hidden = targetClass.getDeclaredMethod(methodName, argClasses);
+ AccessController.doPrivileged(new MakeAccessible(hidden));
+ this.method = new UnboundMethod(hidden, name);
+ } catch (SecurityException | NoSuchMethodException e) {
+ // unusable or not the right implementation
+ LOG.debug("failed to load method {} from class {}", methodName, targetClass, e);
+ }
+ return this;
+ }
+
+ /**
+ * Checks for a method implementation.
+ *
+ * The name passed to the constructor is the method name used.
+ * @param targetClass the class to check for an implementation
+ * @param argClasses argument classes for the method
+ * @return this Builder for method chaining
+ */
+ public Builder hiddenImpl(Class> targetClass, Class>... argClasses) {
+ hiddenImpl(targetClass, name, argClasses);
+ return this;
+ }
+
+ /**
+ * Returns the first valid implementation as a UnboundMethod or throws a
+ * NoSuchMethodException if there is none.
+ * @return a {@link UnboundMethod} with a valid implementation
+ * @throws NoSuchMethodException if no implementation was found
+ */
+ public UnboundMethod buildChecked() throws NoSuchMethodException {
+ if (method != null) {
+ return method;
+ } else {
+ throw new NoSuchMethodException("Cannot find method: " + name);
+ }
+ }
+
+ /**
+ * Returns the first valid implementation as a UnboundMethod or throws a
+ * RuntimeError if there is none.
+ * @return a {@link UnboundMethod} with a valid implementation
+ * @throws RuntimeException if no implementation was found
+ */
+ public UnboundMethod build() {
+ if (method != null) {
+ return method;
+ } else {
+ throw new RuntimeException("Cannot find method: " + name);
+ }
+ }
+
+ /**
+ * Returns the first valid implementation as a BoundMethod or throws a
+ * NoSuchMethodException if there is none.
+ * @param receiver an Object to receive the method invocation
+ * @return a {@link BoundMethod} with a valid implementation and receiver
+ * @throws IllegalStateException if the method is static
+ * @throws IllegalArgumentException if the receiver's class is incompatible
+ * @throws NoSuchMethodException if no implementation was found
+ */
+ public BoundMethod buildChecked(Object receiver) throws NoSuchMethodException {
+ return buildChecked().bind(receiver);
+ }
+
+ /**
+ * Returns the first valid implementation as a BoundMethod or throws a
+ * RuntimeError if there is none.
+ * @param receiver an Object to receive the method invocation
+ * @return a {@link BoundMethod} with a valid implementation and receiver
+ * @throws IllegalStateException if the method is static
+ * @throws IllegalArgumentException if the receiver's class is incompatible
+ * @throws RuntimeException if no implementation was found
+ */
+ public BoundMethod build(Object receiver) {
+ return build().bind(receiver);
+ }
+
+ /**
+ * Returns the first valid implementation as a StaticMethod or throws a
+ * NoSuchMethodException if there is none.
+ * @return a {@link StaticMethod} with a valid implementation
+ * @throws IllegalStateException if the method is not static
+ * @throws NoSuchMethodException if no implementation was found
+ */
+ public StaticMethod buildStaticChecked() throws NoSuchMethodException {
+ return buildChecked().asStatic();
+ }
+
+ /**
+ * Returns the first valid implementation as a StaticMethod or throws a
+ * RuntimeException if there is none.
+ * @return a {@link StaticMethod} with a valid implementation
+ * @throws IllegalStateException if the method is not static
+ * @throws RuntimeException if no implementation was found
+ */
+ public StaticMethod buildStatic() {
+ return build().asStatic();
+ }
+ }
+
+ private static final class MakeAccessible implements PrivilegedAction
+ * Important: any {@code CancellationException} raised by the future
+ * is rethrown unchanged. This has been the implicit behavior since
+ * this code was first written, and is now explicitly documented.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
public final class FutureIO {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(FutureIO.class);
+
private FutureIO() {
}
@@ -64,17 +79,28 @@ private FutureIO() {
* Any exception generated in the future is
* extracted and rethrown.
*
+ * This method blocks until all futures in the collection have completed.
+ * If any future throws an exception during its execution, this method
+ * extracts and rethrows that exception.
+ *
+ * This method blocks until all futures in the collection have completed or
+ * the timeout expires, whichever happens first. If any future throws an
+ * exception during its execution, this method extracts and rethrows that exception.
+ * @param collection collection of futures to be evaluated
+ * @param duration timeout duration
+ * @param
+ * This method blocks until all futures in the collection have completed or
+ * the timeout expires, whichever happens first.
+ * All exceptions thrown by the futures are ignored. as is any TimeoutException.
+ * @param collection collection of futures to be evaluated
+ * @param interruptIfRunning should the cancel interrupt any active futures?
+ * @param duration total timeout duration
+ * @param
+ * This {@code constructor} is only invoked on demand
+ * when the reference is first needed,
+ * after which the same value is returned.
+ * This value MUST NOT be null.
+ *
+ * Implements {@link CallableRaisingIOE} and {@code java.util.function.Supplier}.
+ * An instance of this can therefore be used in a functional IO chain.
+ * As such, it can act as a delayed and caching invocator of a function:
+ * the supplier passed in is only ever invoked once, and only when requested.
+ * @param
+ * Invoke {@link #eval()} and convert IOEs to
+ * UncheckedIOException.
+ *
+ * This is the {@code Supplier.get()} implementation, which allows
+ * this class to passed into anything taking a supplier.
+ * @return the value
+ * @throws UncheckedIOException if the constructor raised an IOException.
+ */
+ @Override
+ public final T get() throws UncheckedIOException {
+ return uncheckIOExceptions(this::eval);
+ }
+
+ /**
+ * Is the reference set?
+ * @return true if the reference has been set.
+ */
+ public final boolean isSet() {
+ return reference.get() != null;
+ }
+
+ @Override
+ public String toString() {
+ return "LazyAtomicReference{" +
+ "reference=" + reference + '}';
+ }
+
+
+ /**
+ * Create from a supplier.
+ * This is not a constructor to avoid ambiguity when a lambda-expression is
+ * passed in.
+ * @param supplier supplier implementation.
+ * @return a lazy reference.
+ * @param
+ *
+ * Implication #2 means there is no performance penalty from use of FileSystem operations which
+ * return lists or iterators of {@code LocatedFileStatus}.
+ */
+ public static final String VIRTUAL_BLOCK_LOCATIONS = "fs.capability.virtual.block.locations";
+
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
index cca6c28da11a3..fc36b5bd6d657 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataInputStream.java
@@ -262,6 +262,14 @@ public int read(long position, ByteBuffer buf) throws IOException {
"by " + in.getClass().getCanonicalName());
}
+ /**
+ * Delegate to the underlying stream.
+ * @param position position within file
+ * @param buf the ByteBuffer to receive the results of the read operation.
+ * @throws IOException on a failure from the nested stream.
+ * @throws UnsupportedOperationException if the inner stream does not
+ * support this operation.
+ */
@Override
public void readFully(long position, ByteBuffer buf) throws IOException {
if (in instanceof ByteBufferPositionedReadable) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
index 0213772ab6a5c..38ec611451750 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FileSystem.java
@@ -56,6 +56,7 @@
import org.apache.hadoop.fs.Options.HandleOpt;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.fs.impl.AbstractFSBuilderImpl;
+import org.apache.hadoop.fs.impl.DefaultBulkDeleteOperation;
import org.apache.hadoop.fs.impl.FutureDataInputStreamBuilderImpl;
import org.apache.hadoop.fs.impl.OpenFileParameters;
import org.apache.hadoop.fs.permission.AclEntry;
@@ -169,7 +170,8 @@
@InterfaceAudience.Public
@InterfaceStability.Stable
public abstract class FileSystem extends Configured
- implements Closeable, DelegationTokenIssuer, PathCapabilities {
+ implements Closeable, DelegationTokenIssuer,
+ PathCapabilities, BulkDeleteSource {
public static final String FS_DEFAULT_NAME_KEY =
CommonConfigurationKeys.FS_DEFAULT_NAME_KEY;
public static final String DEFAULT_FS =
@@ -3485,12 +3487,16 @@ public Collection
+ * (position, buffer, buffer-offset, length): Void
+ * position:= the position within the file to read data.
+ * buffer := a buffer to read fully `length` bytes into.
+ * buffer-offset := the offset within the buffer to write data
+ * length := the number of bytes to read.
+ *
+ * The passed in function MUST block until the required length of
+ * data is read, or an exception is thrown.
+ * @param range range to read
* @param buffer buffer to fill.
* @param operation operation to use for reading data.
* @throws IOException any IOE.
*/
- public static void readInDirectBuffer(int length,
- ByteBuffer buffer,
- Function4RaisingIOE
+ *
+ * @param fs filesystem
+ * @param base path to delete under.
+ * @param paths list of paths which must be absolute and under the base path.
+ * @return a list of all the paths which couldn't be deleted for a reason other
+ * than "not found" and any associated error message.
+ * @throws UnsupportedOperationException bulk delete under that path is not supported.
+ * @throws UncheckedIOException if an IOE was raised.
+ * @throws IllegalArgumentException if a path argument is invalid.
+ */
+ public static List
+ *
+ * @param fs filesystem
+ * @param base path to delete under.
+ * @param paths list of paths which must be absolute and under the base path.
+ * @return a list of all the paths which couldn't be deleted for a reason other than
+ * "not found" and any associated error message.
+ * @throws UnsupportedOperationException bulk delete under that path is not supported.
+ * @throws IllegalArgumentException if a path argument is invalid.
+ * @throws IOException IO problems including networking, authentication and more.
+ */
+ public ListString, trimming leading and
+ * trailing whitespace on each value after splitting by comma and new line separator.
+ *
+ * @param str a comma separated String with values, may be null
+ * @return a Map of String keys and values, empty
+ * Collection if null String input.
+ */
+ public static MapString, trimming
* leading and trailing whitespace on each value.
@@ -497,6 +541,22 @@ public static String[] getTrimmedStrings(String str){
return str.trim().split("\\s*[,\n]\\s*");
}
+ /**
+ * Splits "=" separated value String, trimming
+ * leading and trailing whitespace on each value.
+ *
+ * @param str an "=" separated String with values,
+ * may be null
+ * @return an array of String values, empty array if null String
+ * input
+ */
+ public static String[] getTrimmedStringsSplitByEquals(String str){
+ if (null == str || str.trim().isEmpty()) {
+ return emptyStringArray;
+ }
+ return str.trim().split("\\s*=\\s*");
+ }
+
final public static String[] emptyStringArray = {};
final public static char COMMA = ',';
final public static String COMMA_STR = ",";
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/dynamic/BindingUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/dynamic/BindingUtils.java
new file mode 100644
index 0000000000000..47a2deed41dcb
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/dynamic/BindingUtils.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.util.dynamic;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import static org.apache.hadoop.util.Preconditions.checkState;
+
+/**
+ * Utility methods to assist binding to Hadoop APIs through reflection.
+ * Source: {@code org.apache.parquet.hadoop.util.wrapped.io.BindingUtils}.
+ */
+@InterfaceAudience.LimitedPrivate("testing")
+@InterfaceStability.Unstable
+public final class BindingUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BindingUtils.class);
+
+ private BindingUtils() {}
+
+ /**
+ * Load a class by name.
+ * @param className classname
+ * @return the class or null if it could not be loaded.
+ */
+ public static Class> loadClass(String className) {
+ try {
+ return Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ LOG.debug("No class {}", className, e);
+ return null;
+ }
+ }
+
+ /**
+ * Load a class by name.
+ * @param className classname
+ * @return the class.
+ * @throws RuntimeException if the class was not found.
+ */
+ public static Class> loadClassSafely(String className) {
+ try {
+ return Class.forName(className);
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * Load a class by name.
+ * @param cl classloader to use.
+ * @param className classname
+ * @return the class or null if it could not be loaded.
+ */
+ public static Class> loadClass(ClassLoader cl, String className) {
+ try {
+ return cl.loadClass(className);
+ } catch (ClassNotFoundException e) {
+ LOG.debug("No class {}", className, e);
+ return null;
+ }
+ }
+
+
+ /**
+ * Get an invocation from the source class, which will be unavailable() if
+ * the class is null or the method isn't found.
+ *
+ * @param
+ * Default implementation fails with EOFException
* while reading the ranges. Some implementation like s3, checksum fs fail fast
* as they already have the file length calculated.
+ * The contract option {@link ContractOptions#VECTOR_IO_EARLY_EOF_CHECK} is used
+ * to determine which check to perform.
*/
@Test
public void testEOFRanges() throws Exception {
- FileSystem fs = getFileSystem();
- List
+ * This is a contract test; the base class is bonded to the local fs;
+ * it is possible for other stores to implement themselves.
+ * All classes/constants are referenced here because they are part of the reflected
+ * API. If anything changes, application code breaks.
+ */
+public class TestWrappedIO extends AbstractFSContractTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestWrappedIO.class);
+
+ /**
+ * Dynamic wrapped IO.
+ */
+ private DynamicWrappedIO io;
+
+ /**
+ * Dynamically Wrapped IO statistics.
+ */
+ private DynamicWrappedStatistics statistics;
+
+ @Before
+ public void setup() throws Exception {
+ super.setup();
+
+ io = new DynamicWrappedIO();
+ statistics = new DynamicWrappedStatistics();
+ statistics.iostatisticsContext_reset();
+ }
+
+ @Override
+ public void teardown() throws Exception {
+ super.teardown();
+ logIOStatisticsContext();
+ }
+
+ @Override
+ protected AbstractFSContract createContract(final Configuration conf) {
+ return new LocalFSContract(conf);
+ }
+
+ /**
+ * Verify the {@link #clazz(String)} method raises an assertion
+ * if the class isn't found.
+ */
+ @Test
+ public void testClassResolution() throws Throwable {
+ intercept(AssertionError.class, () -> clazz("no.such.class"));
+ }
+
+ @Test
+ public void testAllMethodsFound() throws Throwable {
+ io.requireAllMethodsAvailable();
+ }
+
+ /**
+ * Test the openFile operation.
+ * Lots of calls are made to read the same file to save on setup/teardown
+ * overhead and to allow for some statistics collection.
+ */
+ @Test
+ public void testOpenFileOperations() throws Throwable {
+ Path path = path("testOpenFileOperations");
+ final int len = 100;
+ final byte[] data = dataset(len, 'a', 26);
+ final FileSystem fs = getFileSystem();
+ // create the file and any statistics from it.
+ final Serializable iostats = statistics.iostatisticsSnapshot_create(
+ file(fs, path, true, data));
+ final FileStatus st = fs.getFileStatus(path);
+ final boolean ioStatisticsContextCapability;
+
+ describe("reading file " + path);
+ try (FSDataInputStream in = DynamicWrappedIO.openFile(fs,
+ fs.getFileStatus(path),
+ DynamicWrappedIO.PARQUET_READ_POLICIES)) {
+ Assertions.assertThat(in.read())
+ .describedAs("first byte")
+ .isEqualTo('a');
+ ioStatisticsContextCapability = supportsIOStatisticsContext(in);
+ if (ioStatisticsContextCapability) {
+ LOG.info("Stream has IOStatisticsContext support: {}", in);
+ } else {
+ LOG.info("Stream has no IOStatisticsContext support: {}", in);
+ }
+ Assertions.assertThat(ioStatisticsContextCapability)
+ .describedAs("Retrieved stream capability %s from %s",
+ IOSTATISTICS_CONTEXT, in)
+ .isEqualTo(WrappedIO.streamCapabilities_hasCapability(in, IOSTATISTICS_CONTEXT));
+ Assertions.assertThat(ioStatisticsContextCapability)
+ .describedAs("Actual stream capability %s from %s",
+ IOSTATISTICS_CONTEXT, in)
+ .isEqualTo(in.hasCapability(IOSTATISTICS_CONTEXT));
+ retrieveAndAggregate(iostats, in);
+ }
+
+ // open with a status
+ try (FSDataInputStream s = openFile(path, null, st, null, null)) {
+ s.seek(1);
+ s.read();
+
+ // and do a small amount of statistics collection
+ retrieveAndAggregate(iostats, s);
+ }
+
+ // open with a length and random IO passed in the map
+ try (FSDataInputStream s = openFile(path, null, null,
+ (long) len,
+ map(pair(FS_OPTION_OPENFILE_READ_POLICY, "random")))) {
+ s.seek(len - 10);
+ s.read();
+ retrieveAndAggregate(iostats, s);
+ }
+
+ // now open a file with a length option greater than the file length
+
+ // this string is used in exception logging to report where in the
+ // sequence an IOE was raised.
+ String validationPoint = "openfile call";
+
+ // open with a length and random IO passed in via the map
+ try (FSDataInputStream s = openFile(path, null, null,
+ null,
+ map(pair(FS_OPTION_OPENFILE_LENGTH, len * 2),
+ pair(FS_OPTION_OPENFILE_READ_POLICY, "random")))) {
+
+ // fails if the file length was determined and fixed in open,
+ // and the stream doesn't permit seek() beyond the file length.
+ validationPoint = "seek()";
+ s.seek(len + 10);
+
+ validationPoint = "readFully()";
+
+ // readFully must fail.
+ s.readFully(len + 10, new byte[10], 0, 10);
+ Assertions.fail("Expected an EOFException but readFully from %s", s);
+ } catch (EOFException expected) {
+ // expected
+ LOG.info("EOF successfully raised, validation point: {}", validationPoint);
+ LOG.debug("stack", expected);
+ }
+
+ // if we get this far, do a bulk delete
+ Assertions.assertThat(io.pathCapabilities_hasPathCapability(fs, path, BULK_DELETE))
+ .describedAs("Path capability %s", BULK_DELETE)
+ .isTrue();
+
+ // first assert page size was picked up
+ Assertions.assertThat(io.bulkDelete_pageSize(fs, path))
+ .describedAs("bulkDelete_pageSize for %s", path)
+ .isGreaterThanOrEqualTo(1);
+
+ // then do the delete.
+ // pass in the parent path for the bulk delete to avoid HADOOP-19196
+ Assertions
+ .assertThat(io.bulkDelete_delete(fs, path.getParent(), Lists.newArrayList(path)))
+ .describedAs("outcome of bulk delete")
+ .isEmpty();
+ }
+
+ @Test
+ public void testOpenFileNotFound() throws Throwable {
+ Path path = path("testOpenFileNotFound");
+
+ intercept(FileNotFoundException.class, () ->
+ io.fileSystem_openFile(getFileSystem(), path, null, null, null, null));
+ }
+
+ /**
+ * Test ByteBufferPositionedReadable.
+ * This is implemented by HDFS but not much else; this test skips if the stream
+ * doesn't support it.
+ */
+ @Test
+ public void testByteBufferPositionedReadable() throws Throwable {
+ Path path = path("testByteBufferPositionedReadable");
+ final int len = 100;
+ final byte[] data = dataset(len, 'a', 26);
+ final FileSystem fs = getFileSystem();
+ file(fs, path, true, data);
+
+ describe("reading file " + path);
+ try (FSDataInputStream in = openFile(path, "random", null, (long) len, null)) {
+ // skip rest of test if API is not found.
+ if (io.byteBufferPositionedReadable_readFullyAvailable(in)) {
+
+ LOG.info("ByteBufferPositionedReadable is available in {}", in);
+ ByteBuffer buffer = allocate(len);
+ io.byteBufferPositionedReadable_readFully(in, 0, buffer);
+ Assertions.assertThat(buffer.array())
+ .describedAs("Full buffer read of %s", in)
+ .isEqualTo(data);
+
+
+ // read from offset (verifies the offset is passed in)
+ final int offset = 10;
+ final int range = len - offset;
+ buffer = allocate(range);
+ io.byteBufferPositionedReadable_readFully(in, offset, buffer);
+ byte[] byteArray = new byte[range];
+ in.readFully(offset, byteArray);
+ Assertions.assertThat(buffer.array())
+ .describedAs("Offset buffer read of %s", in)
+ .isEqualTo(byteArray);
+
+ // now try to read past the EOF
+ // first verify the stream rejects this call directly
+ intercept(EOFException.class, () ->
+ in.readFully(len + 1, allocate(len)));
+
+ // then do the same through the wrapped API
+ intercept(EOFException.class, () ->
+ io.byteBufferPositionedReadable_readFully(in, len + 1, allocate(len)));
+ } else {
+ LOG.info("ByteBufferPositionedReadable is not available in {}", in);
+
+ // expect failures here
+ intercept(UnsupportedOperationException.class, () ->
+ io.byteBufferPositionedReadable_readFully(in, 0, allocate(len)));
+ }
+ }
+ }
+
+ @Test
+ public void testFilesystemIOStatistics() throws Throwable {
+
+ final FileSystem fs = getFileSystem();
+ final Serializable iostats = statistics.iostatisticsSnapshot_retrieve(fs);
+ if (iostats != null) {
+ final String status = statistics.iostatisticsSnapshot_toJsonString(iostats);
+ final Serializable roundTripped = statistics.iostatisticsSnapshot_fromJsonString(
+ status);
+
+ final Path path = methodPath();
+ statistics.iostatisticsSnapshot_save(roundTripped, fs, path, true);
+ final Serializable loaded = statistics.iostatisticsSnapshot_load(fs, path);
+
+ Assertions.assertThat(loaded)
+ .describedAs("loaded statistics from %s", path)
+ .isNotNull()
+ .satisfies(statistics::isIOStatisticsSnapshot);
+ LOG.info("loaded statistics {}",
+ statistics.iostatistics_toPrettyString(loaded));
+ }
+
+ }
+
+ /**
+ * Retrieve any IOStatistics from a class, and aggregate it to the
+ * existing IOStatistics.
+ * @param iostats statistics to update
+ * @param object statistics source
+ */
+ private void retrieveAndAggregate(final Serializable iostats, final Object object) {
+ statistics.iostatisticsSnapshot_aggregate(iostats,
+ statistics.iostatisticsSnapshot_retrieve(object));
+ }
+
+ /**
+ * Log IOStatisticsContext if enabled.
+ */
+ private void logIOStatisticsContext() {
+ // context IOStats
+ if (statistics.iostatisticsContext_enabled()) {
+ final Serializable iostats = statistics.iostatisticsContext_snapshot();
+ LOG.info("Context: {}",
+ toPrettyString(iostats));
+ } else {
+ LOG.info("IOStatisticsContext disabled");
+ }
+ }
+
+ private String toPrettyString(final Object iostats) {
+ return statistics.iostatistics_toPrettyString(iostats);
+ }
+
+ /**
+ * Does the object update the thread-local IOStatisticsContext?
+ * @param o object to cast to StreamCapabilities and probe for the capability.
+ * @return true if the methods were found, the interface implemented and the probe successful.
+ */
+ private boolean supportsIOStatisticsContext(final Object o) {
+ return io.streamCapabilities_hasCapability(o, IOSTATISTICS_CONTEXT);
+ }
+
+ /**
+ * Open a file through dynamic invocation of {@link FileSystem#openFile(Path)}.
+ * @param path path
+ * @param policy read policy
+ * @param status optional file status
+ * @param length file length or null
+ * @param options nullable map of other options
+ * @return stream of the opened file
+ */
+ private FSDataInputStream openFile(
+ final Path path,
+ final String policy,
+ final FileStatus status,
+ final Long length,
+ final Map
+ * This mixes direct use of the API to generate statistics data for
+ * the reflection accessors to retrieve and manipulate.
+ */
+public class TestWrappedStatistics extends AbstractHadoopTestBase {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestWrappedIO.class);
+
+ /**
+ * Stub Serializable.
+ */
+ private static final Serializable SERIALIZABLE = new Serializable() {};
+
+ /**
+ * Dynamically Wrapped IO statistics.
+ */
+ private final DynamicWrappedStatistics statistics = new DynamicWrappedStatistics();
+
+ /**
+ * Local FS.
+ */
+ private LocalFileSystem local;
+
+ /**
+ * Path to temporary file.
+ */
+ private Path jsonPath;
+
+ @Before
+ public void setUp() throws Exception {
+ String testDataDir = new FileSystemTestHelper().getTestRootDir();
+ File tempDir = new File(testDataDir);
+ local = FileSystem.getLocal(new Configuration());
+ // Temporary file.
+ File jsonFile = new File(tempDir, "snapshot.json");
+ jsonPath = new Path(jsonFile.toURI());
+ }
+
+ /**
+ * The class must load, with all method groups available.
+ */
+ @Test
+ public void testLoaded() throws Throwable {
+ Assertions.assertThat(statistics.ioStatisticsAvailable())
+ .describedAs("IOStatistics class must be available")
+ .isTrue();
+ Assertions.assertThat(statistics.ioStatisticsContextAvailable())
+ .describedAs("IOStatisticsContext must be available")
+ .isTrue();
+ }
+
+ @Test
+ public void testCreateEmptySnapshot() throws Throwable {
+ Assertions.assertThat(statistics.iostatisticsSnapshot_create())
+ .describedAs("iostatisticsSnapshot_create()")
+ .isInstanceOf(IOStatisticsSnapshot.class)
+ .satisfies(statistics::isIOStatisticsSnapshot)
+ .satisfies(statistics::isIOStatistics);
+ }
+
+ @Test
+ public void testCreateNullSource() throws Throwable {
+ Assertions.assertThat(statistics.iostatisticsSnapshot_create(null))
+ .describedAs("iostatisticsSnapshot_create(null)")
+ .isInstanceOf(IOStatisticsSnapshot.class);
+ }
+
+ @Test
+ public void testCreateOther() throws Throwable {
+ Assertions.assertThat(statistics.iostatisticsSnapshot_create(null))
+ .describedAs("iostatisticsSnapshot_create(null)")
+ .isInstanceOf(IOStatisticsSnapshot.class);
+ }
+
+ @Test
+ public void testCreateNonIOStatsSource() throws Throwable {
+ intercept(ClassCastException.class, () ->
+ statistics.iostatisticsSnapshot_create("hello"));
+ }
+
+ @Test
+ public void testRetrieveNullSource() throws Throwable {
+ Assertions.assertThat(statistics.iostatisticsSnapshot_retrieve(null))
+ .describedAs("iostatisticsSnapshot_retrieve(null)")
+ .isNull();
+ }
+
+ @Test
+ public void testRetrieveNonIOStatsSource() throws Throwable {
+ Assertions.assertThat(statistics.iostatisticsSnapshot_retrieve(this))
+ .describedAs("iostatisticsSnapshot_retrieve(this)")
+ .isNull();
+ }
+
+ /**
+ * Assert handling of json serialization for null value.
+ */
+ @Test
+ public void testNullInstanceToJson() throws Throwable {
+ intercept(IllegalArgumentException.class, () -> toJsonString(null));
+ }
+
+ /**
+ * Assert handling of json serialization for wrong value.
+ */
+ @Test
+ public void testWrongSerializableTypeToJson() throws Throwable {
+ intercept(IllegalArgumentException.class, () -> toJsonString(SERIALIZABLE));
+ }
+
+ /**
+ * Try to aggregate into the wrong type.
+ */
+ @Test
+ public void testAggregateWrongSerializable() throws Throwable {
+ intercept(IllegalArgumentException.class, () ->
+ statistics.iostatisticsSnapshot_aggregate(SERIALIZABLE,
+ statistics.iostatisticsContext_getCurrent()));
+ }
+
+ /**
+ * Try to save the wrong type.
+ */
+ @Test
+ public void testSaveWrongSerializable() throws Throwable {
+ intercept(IllegalArgumentException.class, () ->
+ statistics.iostatisticsSnapshot_save(SERIALIZABLE, local, jsonPath, true));
+ }
+
+ /**
+ * Test all the IOStatisticsContext operations, including
+ * JSON round trip of the statistics.
+ */
+ @Test
+ public void testIOStatisticsContextMethods() {
+
+ Assertions.assertThat(statistics.ioStatisticsContextAvailable())
+ .describedAs("ioStatisticsContextAvailable() of %s", statistics)
+ .isTrue();
+ Assertions.assertThat(statistics.iostatisticsContext_enabled())
+ .describedAs("iostatisticsContext_enabled() of %s", statistics)
+ .isTrue();
+
+ // get the current context, validate it
+ final Object current = statistics.iostatisticsContext_getCurrent();
+ Assertions.assertThat(current)
+ .describedAs("IOStatisticsContext")
+ .isInstanceOf(IOStatisticsContext.class)
+ .satisfies(statistics::isIOStatisticsSource);
+
+ // take a snapshot
+ final Serializable snapshot = statistics.iostatisticsContext_snapshot();
+ Assertions.assertThat(snapshot)
+ .satisfies(statistics::isIOStatisticsSnapshot);
+
+ // use the retrieve API to create a snapshot from the IOStatisticsSource interface
+ final Serializable retrieved = statistics.iostatisticsSnapshot_retrieve(current);
+ assertJsonEqual(retrieved, snapshot);
+
+ // to/from JSON
+ final String json = toJsonString(snapshot);
+ LOG.info("Serialized to json {}", json);
+ final Serializable snap2 = statistics.iostatisticsSnapshot_fromJsonString(json);
+ assertJsonEqual(snap2, snapshot);
+
+ // get the values
+ statistics.iostatistics_counters(snapshot);
+ statistics.iostatistics_gauges(snapshot);
+ statistics.iostatistics_minimums(snapshot);
+ statistics.iostatistics_maximums(snapshot);
+ statistics.iostatistics_means(snapshot);
+
+ // set to null
+ statistics.iostatisticsContext_setThreadIOStatisticsContext(null);
+
+ Assertions.assertThat(statistics.iostatisticsContext_getCurrent())
+ .describedAs("current IOStatisticsContext after resetting")
+ .isNotSameAs(current);
+
+ // then set to the "current" value
+ statistics.iostatisticsContext_setThreadIOStatisticsContext(current);
+
+ Assertions.assertThat(statistics.iostatisticsContext_getCurrent())
+ .describedAs("current IOStatisticsContext after resetting")
+ .isSameAs(current);
+
+ // and reset
+ statistics.iostatisticsContext_reset();
+
+ // now aggregate the retrieved stats into it.
+ Assertions.assertThat(statistics.iostatisticsContext_aggregate(retrieved))
+ .describedAs("iostatisticsContext_aggregate of %s", retrieved)
+ .isTrue();
+ }
+
+
+ /**
+ * Perform some real IOStatisticsContext operations.
+ */
+ @Test
+ public void testIOStatisticsContextInteraction() {
+ statistics.iostatisticsContext_reset();
+
+ // create a snapshot with a counter
+ final IOStatisticsSnapshot snapshot =
+ (IOStatisticsSnapshot) statistics.iostatisticsSnapshot_create();
+ snapshot.setCounter("c1", 10);
+
+ // aggregate twice
+ statistics.iostatisticsContext_aggregate(snapshot);
+ statistics.iostatisticsContext_aggregate(snapshot);
+
+ // take a snapshot
+ final IOStatisticsSnapshot snap2 =
+ (IOStatisticsSnapshot) statistics.iostatisticsContext_snapshot();
+
+ // assert the valuue
+ assertThatStatisticCounter(snap2, "c1")
+ .isEqualTo(20);
+ }
+
+ /**
+ * Expect that two IOStatisticsInstances serialized to exactly the same JSON.
+ * @param actual actual value.
+ * @param expected expected value
+ */
+ private void assertJsonEqual(Serializable actual, Serializable expected) {
+ Assertions.assertThat(toJsonString(actual))
+ .describedAs("JSON format string of %s", actual)
+ .isEqualTo(toJsonString(expected));
+ }
+
+ /**
+ * Convert a snapshot to a JSON string.
+ * @param snapshot IOStatisticsSnapshot
+ * @return a JSON serialization.
+ */
+ private String toJsonString(final Serializable snapshot) {
+ return statistics.iostatisticsSnapshot_toJsonString(snapshot);
+ }
+
+ /**
+ * Create an empty snapshot, save it then load back.
+ */
+ @Test
+ public void testLocalSaveOfEmptySnapshot() throws Throwable {
+ final Serializable snapshot = statistics.iostatisticsSnapshot_create();
+ statistics.iostatisticsSnapshot_save(snapshot, local, jsonPath, true);
+ final Serializable loaded = statistics.iostatisticsSnapshot_load(local, jsonPath);
+ LOG.info("loaded statistics {}",
+ statistics.iostatistics_toPrettyString(loaded));
+
+ // now try to save over the same path with overwrite false
+ intercept(UncheckedIOException.class, () ->
+ statistics.iostatisticsSnapshot_save(snapshot, local, jsonPath, false));
+
+ // after delete the load fails
+ local.delete(jsonPath, false);
+ intercept(UncheckedIOException.class, () ->
+ statistics.iostatisticsSnapshot_load(local, jsonPath));
+ }
+
+ /**
+ * Build up a complex statistic and assert extraction on it.
+ */
+ @Test
+ public void testStatisticExtraction() throws Throwable {
+
+ final IOStatisticsStore store = IOStatisticsBinding.iostatisticsStore()
+ .withCounters("c1", "c2")
+ .withGauges("g1")
+ .withDurationTracking("d1", "d2")
+ .build();
+
+ store.incrementCounter("c1");
+ store.setGauge("g1", 10);
+ trackDurationOfInvocation(store, "d1", () ->
+ sleep(20));
+ store.trackDuration("d1").close();
+
+ intercept(IOException.class, () ->
+ trackDurationOfInvocation(store, "d2", () -> {
+ sleep(10);
+ throw new IOException("generated");
+ }));
+
+ final Serializable snapshot = statistics.iostatisticsSnapshot_create(store);
+
+
+ // complex round trip
+ statistics.iostatisticsSnapshot_save(snapshot, local, jsonPath, true);
+ final Serializable loaded = statistics.iostatisticsSnapshot_load(local, jsonPath);
+ LOG.info("loaded statistics {}",
+ statistics.iostatistics_toPrettyString(loaded));
+ assertJsonEqual(loaded, snapshot);
+
+
+ // get the values
+ Assertions.assertThat(statistics.iostatistics_counters(loaded))
+ .containsOnlyKeys("c1", "c2",
+ "d1", "d1.failures",
+ "d2", "d2.failures")
+ .containsEntry("c1", 1L)
+ .containsEntry("d1", 2L)
+ .containsEntry("d2", 1L);
+ Assertions.assertThat(statistics.iostatistics_gauges(loaded))
+ .containsOnlyKeys("g1")
+ .containsEntry("g1", 10L);
+
+ final Map
+ * Example: expect deleting a nonexistent file to raise a
+ * {@code FileNotFoundException} with the {@code toString()} value
+ * containing the text {@code "missing"}.
+ *
+ * Example: expect deleting a nonexistent file to raise a
+ * {@code FileNotFoundException} with the {@code toString()} value
+ * containing the text {@code "missing"}.
+ *
+ * Derived from {@code org.apache.parquet.util} test suites.
+ */
+public class Concatenator {
+
+ public static class SomeCheckedException extends Exception {
+ }
+
+ private String sep = "";
+
+ public Concatenator() {
+ }
+
+ public Concatenator(String sep) {
+ this.sep = sep;
+ }
+
+ private Concatenator(char sep) {
+ this.sep = String.valueOf(sep);
+ }
+
+ public Concatenator(Exception e) throws Exception {
+ throw e;
+ }
+
+ public static Concatenator newConcatenator(String sep) {
+ return new Concatenator(sep);
+ }
+
+ private void setSeparator(String value) {
+ this.sep = value;
+ }
+
+ public String concat(String left, String right) {
+ return left + sep + right;
+ }
+
+ public String concat(String left, String middle, String right) {
+ return left + sep + middle + sep + right;
+ }
+
+ public String concat(Exception e) throws Exception {
+ throw e;
+ }
+
+ public String concat(String... strings) {
+ if (strings.length >= 1) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(strings[0]);
+ for (int i = 1; i < strings.length; i += 1) {
+ sb.append(sep);
+ sb.append(strings[i]);
+ }
+ return sb.toString();
+ }
+ return null;
+ }
+
+ public static String cat(String... strings) {
+ return new Concatenator().concat(strings);
+ }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/dynamic/TestDynConstructors.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/dynamic/TestDynConstructors.java
new file mode 100644
index 0000000000000..4d7a2db641703
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/dynamic/TestDynConstructors.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hadoop.util.dynamic;
+
+import java.util.concurrent.Callable;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.test.AbstractHadoopTestBase;
+
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Derived from {@code org.apache.parquet.util} test suites.
+ */
+public class TestDynConstructors extends AbstractHadoopTestBase {
+
+ @Test
+ public void testNoImplCall() throws Exception {
+ final DynConstructors.Builder builder = new DynConstructors.Builder();
+
+ intercept(NoSuchMethodException.class,
+ (Callable
+ *
+ * @param input input ranges
+ * @throws Exception failure
+ */
+ private void runAndValidateVectoredRead(List
+ * FileNotFoundException ioe = interceptAndValidateMessageContains(
+ * FileNotFoundException.class,
+ * "missing",
+ * "path should not be found",
+ * () -> {
+ * filesystem.delete(new Path("/missing"), false);
+ * });
+ *
+ *
+ * @param clazz class of exception; the raised exception must be this class
+ * or a subclass.
+ * @param contains strings which must be in the {@code toString()} value
+ * of the exception (order does not matter)
+ * @param eval expression to eval
+ * @param
+ * FileNotFoundException ioe = interceptAndValidateMessageContains(
+ * FileNotFoundException.class,
+ * "missing",
+ * "path should not be found",
+ * () -> {
+ * filesystem.delete(new Path("/missing"), false);
+ * });
+ *
+ *
+ * @param clazz class of exception; the raised exception must be this class
+ * or a subclass.
+ * @param contains strings which must be in the {@code toString()} value
+ * of the exception (order does not matter)
+ * @param message any message to include in exception/log messages
+ * @param eval expression to eval
+ * @param