diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java new file mode 100644 index 0000000000000..146773857934c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import java.lang.ref.WeakReference; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsSource; + +import static java.util.Objects.requireNonNull; + +/** + * A weak referenced metrics source which avoids hanging on to large objects + * if somehow they don't get fully closed/cleaned up. + * The JVM may clean up all objects which are only weakly referenced whenever + * it does a GC, even if there is no memory pressure. + * To avoid these refs being removed, always keep a strong reference around + * somewhere. + */ +@InterfaceAudience.Private +public class WeakRefMetricsSource implements MetricsSource { + + /** + * Name to know when unregistering. + */ + private final String name; + + /** + * Underlying metrics source. + */ + private final WeakReference sourceWeakReference; + + /** + * Constructor. + * @param name Name to know when unregistering. + * @param source metrics source + */ + public WeakRefMetricsSource(final String name, final MetricsSource source) { + this.name = name; + this.sourceWeakReference = new WeakReference<>(requireNonNull(source)); + } + + /** + * If the weak reference is non null, update the metrics. + * @param collector to contain the resulting metrics snapshot + * @param all if true, return all metrics even if unchanged. + */ + @Override + public void getMetrics(final MetricsCollector collector, final boolean all) { + MetricsSource metricsSource = sourceWeakReference.get(); + if (metricsSource != null) { + metricsSource.getMetrics(collector, all); + } + } + + /** + * Name to know when unregistering. + * @return the name passed in during construction. + */ + public String getName() { + return name; + } + + /** + * Get the source, will be null if the reference has been GC'd + * @return the source reference + */ + public MetricsSource getSource() { + return sourceWeakReference.get(); + } + + @Override + public String toString() { + return "WeakRefMetricsSource{" + + "name='" + name + '\'' + + ", sourceWeakReference is " + + (sourceWeakReference.get() == null ? "unset" : "set") + + '}'; + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 3e6f2322d3b00..420a92788c82e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -138,7 +138,6 @@ import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import org.apache.hadoop.fs.statistics.IOStatistics; -import org.apache.hadoop.fs.statistics.IOStatisticsLogging; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; @@ -459,6 +458,13 @@ public void initialize(URI name, Configuration originalConf) AuditSpan span = null; try { LOG.debug("Initializing S3AFileSystem for {}", bucket); + if (LOG.isTraceEnabled()) { + // log a full trace for deep diagnostics of where an object is created, + // for tracking down memory leak issues. + LOG.trace("Filesystem for {} created; fs.s3a.impl.disable.cache = {}", + name, originalConf.getBoolean("fs.s3a.impl.disable.cache", false), + new RuntimeException(super.toString())); + } // clone the configuration into one with propagated bucket options Configuration conf = propagateBucketOptions(originalConf, bucket); // HADOOP-17894. remove references to s3a stores in JCEKS credentials. @@ -3999,22 +4005,18 @@ public void close() throws IOException { } isClosed = true; LOG.debug("Filesystem {} is closed", uri); - if (getConf() != null) { - String iostatisticsLoggingLevel = - getConf().getTrimmed(IOSTATISTICS_LOGGING_LEVEL, - IOSTATISTICS_LOGGING_LEVEL_DEFAULT); - logIOStatisticsAtLevel(LOG, iostatisticsLoggingLevel, getIOStatistics()); - } try { super.close(); } finally { stopAllServices(); - } - // Log IOStatistics at debug. - if (LOG.isDebugEnabled()) { - // robust extract and convert to string - LOG.debug("Statistics for {}: {}", uri, - IOStatisticsLogging.ioStatisticsToPrettyString(getIOStatistics())); + // log IO statistics, including of any file deletion during + // superclass close + if (getConf() != null) { + String iostatisticsLoggingLevel = + getConf().getTrimmed(IOSTATISTICS_LOGGING_LEVEL, + IOSTATISTICS_LOGGING_LEVEL_DEFAULT); + logIOStatisticsAtLevel(LOG, iostatisticsLoggingLevel, getIOStatistics()); + } } } diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 46568ec2a8d3d..9d33efa9d01b0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.impl.WeakRefMetricsSource; import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics; import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics; @@ -160,7 +161,10 @@ public class S3AInstrumentation implements Closeable, MetricsSource, private final DurationTrackerFactory durationTrackerFactory; - private String metricsSourceName; + /** + * Weak reference so there's no back reference to the instrumentation. + */ + private WeakRefMetricsSource metricsSourceReference; private final MetricsRegistry registry = new MetricsRegistry("s3aFileSystem").setContext(CONTEXT); @@ -233,19 +237,33 @@ public S3AInstrumentation(URI name) { new MetricDurationTrackerFactory()); } + /** + * Get the current metrics system; demand creating. + * @return a metric system, creating if need be. + */ @VisibleForTesting - public MetricsSystem getMetricsSystem() { + static MetricsSystem getMetricsSystem() { synchronized (METRICS_SYSTEM_LOCK) { if (metricsSystem == null) { metricsSystem = new MetricsSystemImpl(); metricsSystem.init(METRICS_SYSTEM_NAME); + LOG.debug("Metrics system inited {}", metricsSystem); } } return metricsSystem; } /** - * Register this instance as a metrics source. + * Does the instrumentation have a metrics system? + * @return true if the metrics system is present. + */ + @VisibleForTesting + static boolean hasMetricSystem() { + return metricsSystem != null; + } + + /** + * Register this instance as a metrics source via a weak reference. * @param name s3a:// URI for the associated FileSystem instance */ private void registerAsMetricsSource(URI name) { @@ -257,8 +275,9 @@ private void registerAsMetricsSource(URI name) { number = ++metricsSourceNameCounter; } String msName = METRICS_SOURCE_BASENAME + number; - metricsSourceName = msName + "-" + name.getHost(); - metricsSystem.register(metricsSourceName, "", this); + String metricsSourceName = msName + "-" + name.getHost(); + metricsSourceReference = new WeakRefMetricsSource(metricsSourceName, this); + metricsSystem.register(metricsSourceName, "", metricsSourceReference); } /** @@ -680,19 +699,42 @@ public void getMetrics(MetricsCollector collector, boolean all) { registry.snapshot(collector.addRecord(registry.info().name()), true); } + /** + * if registered with the metrics, return the + * name of the source. + * @return the name of the metrics, or null if this instance is not bonded. + */ + public String getMetricSourceName() { + return metricsSourceReference != null + ? metricsSourceReference.getName() + : null; + } + public void close() { - synchronized (METRICS_SYSTEM_LOCK) { - // it is critical to close each quantile, as they start a scheduled - // task in a shared thread pool. - throttleRateQuantile.stop(); - metricsSystem.unregisterSource(metricsSourceName); - metricsSourceActiveCounter--; - int activeSources = metricsSourceActiveCounter; - if (activeSources == 0) { - LOG.debug("Shutting down metrics publisher"); - metricsSystem.publishMetricsNow(); - metricsSystem.shutdown(); - metricsSystem = null; + if (metricsSourceReference != null) { + // get the name + String name = metricsSourceReference.getName(); + LOG.debug("Unregistering metrics for {}", name); + // then set to null so a second close() is a noop here. + metricsSourceReference = null; + synchronized (METRICS_SYSTEM_LOCK) { + // it is critical to close each quantile, as they start a scheduled + // task in a shared thread pool. + if (metricsSystem == null) { + LOG.debug("there is no metric system to unregister {} from", name); + return; + } + throttleRateQuantile.stop(); + + metricsSystem.unregisterSource(name); + metricsSourceActiveCounter--; + int activeSources = metricsSourceActiveCounter; + if (activeSources == 0) { + LOG.debug("Shutting down metrics publisher"); + metricsSystem.publishMetricsNow(); + metricsSystem.shutdown(); + metricsSystem = null; + } } } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java index 79772ec9dadfe..327b0fab288f7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java @@ -103,4 +103,16 @@ public void testClosedOpen() throws Exception { () -> getFileSystem().open(path("to-open"))); } + @Test + public void testClosedInstrumentation() throws Exception { + // no metrics + Assertions.assertThat(S3AInstrumentation.hasMetricSystem()) + .describedAs("S3AInstrumentation.hasMetricSystem()") + .isFalse(); + + Assertions.assertThat(getFileSystem().getIOStatistics()) + .describedAs("iostatistics of %s", getFileSystem()) + .isNotNull(); + } + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java new file mode 100644 index 0000000000000..d8b9247008cc8 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.net.URI; + +import org.assertj.core.api.Assertions; +import org.junit.Test; + +import org.apache.hadoop.fs.impl.WeakRefMetricsSource; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.test.AbstractHadoopTestBase; + +import static org.apache.hadoop.fs.s3a.S3AInstrumentation.getMetricsSystem; +import static org.apache.hadoop.fs.s3a.Statistic.DIRECTORIES_CREATED; +import static org.assertj.core.api.Assertions.assertThat; + +/** + * Test the {@link S3AInstrumentation} lifecycle, in particular how + * it binds to hadoop metrics through a {@link WeakRefMetricsSource} + * and that it will deregister itself in {@link S3AInstrumentation#close()}. + */ +public class TestInstrumentationLifecycle extends AbstractHadoopTestBase { + + @Test + public void testDoubleClose() throws Throwable { + S3AInstrumentation instrumentation = new S3AInstrumentation(new URI("s3a://example/")); + + // the metric system is created in the constructor + assertThat(S3AInstrumentation.hasMetricSystem()) + .describedAs("S3AInstrumentation.hasMetricSystem()") + .isTrue(); + // ask for a metric + String metricName = DIRECTORIES_CREATED.getSymbol(); + assertThat(instrumentation.lookupMetric(metricName)) + .describedAs("lookupMetric(%s) while open", metricName) + .isNotNull(); + + MetricsSystem activeMetrics = getMetricsSystem(); + final String metricSourceName = instrumentation.getMetricSourceName(); + final MetricsSource source = activeMetrics.getSource(metricSourceName); + // verify the source is registered through a weak ref, and that the + // reference maps to the instance. + Assertions.assertThat(source) + .describedAs("metric source %s", metricSourceName) + .isNotNull() + .isInstanceOf(WeakRefMetricsSource.class) + .extracting(m -> ((WeakRefMetricsSource) m).getSource()) + .isSameAs(instrumentation); + + // this will close the metrics system + instrumentation.close(); + + // iostats is still valid + assertThat(instrumentation.getIOStatistics()) + .describedAs("iostats of %s", instrumentation) + .isNotNull(); + + // no metrics + assertThat(S3AInstrumentation.hasMetricSystem()) + .describedAs("S3AInstrumentation.hasMetricSystem()") + .isFalse(); + + // metric lookup still works, so any invocation of an s3a + // method which still updates a metric also works + assertThat(instrumentation.lookupMetric(metricName)) + .describedAs("lookupMetric(%s) when closed", metricName) + .isNotNull(); + + // which we can implicitly verify by asking for it and + // verifying that we get given a different one back + // from the demand-created instance + MetricsSystem metrics2 = getMetricsSystem(); + assertThat(metrics2) + .describedAs("metric system 2") + .isNotSameAs(activeMetrics); + + // this is going to be a no-op + instrumentation.close(); + + // which we can verify because the metrics system doesn't + // get closed this time + assertThat(getMetricsSystem()) + .describedAs("metric system 3") + .isSameAs(metrics2); + } +} diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index 0ec8d52042807..306a79a20a204 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -53,6 +53,8 @@ log4j.logger.org.apache.hadoop.ipc.Server=WARN # for debugging low level S3a operations, uncomment these lines # Log all S3A classes log4j.logger.org.apache.hadoop.fs.s3a=DEBUG +# when logging at trace, the stack of the initialize() call is logged +#log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=TRACE #log4j.logger.org.apache.hadoop.fs.s3a.S3AUtils=INFO #log4j.logger.org.apache.hadoop.fs.s3a.Listing=INFO log4j.logger.org.apache.hadoop.fs.s3a.SDKV2Upgrade=WARN