From 2a665324c52afedf7db22816fff1217e223698c0 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 17 Nov 2022 16:40:27 +0000 Subject: [PATCH 1/5] HADOOP-18526. Leak of S3AInstrumentation instances via hadoop Metrics references This has triggered an OOM in a process which was churning through s3a fs instances; the increased memory footprint of IOStatistics amplified what must have been a long standing issue. * Makes sure instrumentation is closed when fs is closed * Uses a weak reference from metrics to instrumentation, so even if the fs wasn't closed (see HADOOP-18478), this ref would not be holding things back. Change-Id: I3324d4bcfd9fb1770691bfca2d28ab0183faa27d --- .../hadoop/fs/impl/WeakRefMetricsSource.java | 85 +++++++++++++++++++ .../apache/hadoop/fs/s3a/S3AFileSystem.java | 2 + .../hadoop/fs/s3a/S3AInstrumentation.java | 12 ++- 3 files changed, 96 insertions(+), 3 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java new file mode 100644 index 0000000000000..b6b0895a90860 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.impl; + +import java.lang.ref.WeakReference; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsSource; + +import static java.util.Objects.requireNonNull; + +/** + * A weak referenced metrics source which avoids hanging on to large objects + * if somehow they don't get fully closed/cleaned up. + */ +@InterfaceAudience.Private +public class WeakRefMetricsSource implements MetricsSource { + + /** + * Name to know when unregistering. + */ + private final String name; + + /** + * Underlying metrics source. + */ + private final WeakReference sourceWeakReference; + + /** + * Constructor. + * @param name Name to know when unregistering. + * @param source metrics source + */ + public WeakRefMetricsSource(final String name, final MetricsSource source) { + this.name = name; + this.sourceWeakReference = new WeakReference<>(requireNonNull(source)); + } + + /** + * If the weak reference is non null, update the metrics. + * @param collector to contain the resulting metrics snapshot + * @param all if true, return all metrics even if unchanged. + */ + @Override + public void getMetrics(final MetricsCollector collector, final boolean all) { + MetricsSource metricsSource = sourceWeakReference.get(); + if (metricsSource != null) { + metricsSource.getMetrics(collector, all); + } + } + + /** + * Name to know when unregistering. + * @return the name passed in during construction. + */ + public String getName() { + return name; + } + + @Override + public String toString() { + return "WeakRefMetricsSource{" + + "name='" + name + '\'' + + ", sourceWeakReference is " + + (sourceWeakReference.get() == null ? "unset" : "set") + + '}'; + } +} diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 3e6f2322d3b00..52bca10df8687 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -4016,6 +4016,8 @@ public void close() throws IOException { LOG.debug("Statistics for {}: {}", uri, IOStatisticsLogging.ioStatisticsToPrettyString(getIOStatistics())); } + // and shut down the statistics, which includes deregistering the metrics + cleanupWithLogger(LOG, instrumentation); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 46568ec2a8d3d..415ac0d9ff36f 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.impl.WeakRefMetricsSource; import org.apache.hadoop.fs.s3a.statistics.BlockOutputStreamStatistics; import org.apache.hadoop.fs.s3a.statistics.ChangeTrackerStatistics; import org.apache.hadoop.fs.s3a.statistics.CommitterStatistics; @@ -160,7 +161,10 @@ public class S3AInstrumentation implements Closeable, MetricsSource, private final DurationTrackerFactory durationTrackerFactory; - private String metricsSourceName; + /** + * Weak reference so there's no back reference to the instrumentation. + */ + private WeakRefMetricsSource metricsSourceReference; private final MetricsRegistry registry = new MetricsRegistry("s3aFileSystem").setContext(CONTEXT); @@ -257,7 +261,8 @@ private void registerAsMetricsSource(URI name) { number = ++metricsSourceNameCounter; } String msName = METRICS_SOURCE_BASENAME + number; - metricsSourceName = msName + "-" + name.getHost(); + String metricsSourceName = msName + "-" + name.getHost(); + metricsSourceReference = new WeakRefMetricsSource(metricsSourceName, this); metricsSystem.register(metricsSourceName, "", this); } @@ -681,11 +686,12 @@ public void getMetrics(MetricsCollector collector, boolean all) { } public void close() { + LOG.debug("Unregistering metrics"); synchronized (METRICS_SYSTEM_LOCK) { // it is critical to close each quantile, as they start a scheduled // task in a shared thread pool. throttleRateQuantile.stop(); - metricsSystem.unregisterSource(metricsSourceName); + metricsSystem.unregisterSource(metricsSourceReference.getName()); metricsSourceActiveCounter--; int activeSources = metricsSourceActiveCounter; if (activeSources == 0) { From cf0b5e1c17f0494ec2be071590e029d8a041e870 Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 23 Nov 2022 14:54:57 +0000 Subject: [PATCH 2/5] HADOOP-18526. Leak of S3AInstrumentation instances via hadoop Metrics references Add tests that instrumentation.close() unregisters *once*. reverted the change to close() it in s3afs.close(), because that was already done in stopAllServices(). if something is leaking S3AInstrumentation instances, it is because it is creating fs instances outside the cache, and not closing them. once dereferenced and a GC triggered, the fs instance and most internal classes are GC'd, but not the instrumentation because of the back ref. This weakref link will do that, but will result in the metrics structure being filled up with lots of weakrefs which don't get cleaned up. It's a lot less expensive than strong refs, but still not ideal. Fixing that would be tricky as the base metrics class or a subclass would need to do cleanups (when?). Change-Id: I07fe792301e23ed1d3676fbb8cc033c5a96e55a2 --- .../hadoop/fs/impl/WeakRefMetricsSource.java | 4 + .../apache/hadoop/fs/s3a/S3AFileSystem.java | 23 ++--- .../hadoop/fs/s3a/S3AInstrumentation.java | 53 ++++++++--- .../hadoop/fs/s3a/ITestS3AClosedFS.java | 12 +++ .../fs/s3a/TestInstrumentationLifecycle.java | 87 +++++++++++++++++++ 5 files changed, 150 insertions(+), 29 deletions(-) create mode 100644 hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java index b6b0895a90860..6948f9daaf7ea 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java @@ -29,6 +29,10 @@ /** * A weak referenced metrics source which avoids hanging on to large objects * if somehow they don't get fully closed/cleaned up. + * The JVM may clean up all objects which are only weakly referenced whenever + * it does a GC, even if there is no memory pressure. + * To avoid these refs being removed, always keep a strong reference around + * somewhere. */ @InterfaceAudience.Private public class WeakRefMetricsSource implements MetricsSource { diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 52bca10df8687..12b9d1e3c2877 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -138,7 +138,6 @@ import org.apache.hadoop.fs.statistics.DurationTracker; import org.apache.hadoop.fs.statistics.DurationTrackerFactory; import org.apache.hadoop.fs.statistics.IOStatistics; -import org.apache.hadoop.fs.statistics.IOStatisticsLogging; import org.apache.hadoop.fs.statistics.IOStatisticsSource; import org.apache.hadoop.fs.statistics.IOStatisticsContext; import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore; @@ -3999,25 +3998,19 @@ public void close() throws IOException { } isClosed = true; LOG.debug("Filesystem {} is closed", uri); - if (getConf() != null) { - String iostatisticsLoggingLevel = - getConf().getTrimmed(IOSTATISTICS_LOGGING_LEVEL, - IOSTATISTICS_LOGGING_LEVEL_DEFAULT); - logIOStatisticsAtLevel(LOG, iostatisticsLoggingLevel, getIOStatistics()); - } try { super.close(); } finally { stopAllServices(); + // log IO statistics, including of any file deletion during + // superclass close + if (getConf() != null) { + String iostatisticsLoggingLevel = + getConf().getTrimmed(IOSTATISTICS_LOGGING_LEVEL, + IOSTATISTICS_LOGGING_LEVEL_DEFAULT); + logIOStatisticsAtLevel(LOG, iostatisticsLoggingLevel, getIOStatistics()); + } } - // Log IOStatistics at debug. - if (LOG.isDebugEnabled()) { - // robust extract and convert to string - LOG.debug("Statistics for {}: {}", uri, - IOStatisticsLogging.ioStatisticsToPrettyString(getIOStatistics())); - } - // and shut down the statistics, which includes deregistering the metrics - cleanupWithLogger(LOG, instrumentation); } /** diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 415ac0d9ff36f..60a776a058e46 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -237,17 +237,31 @@ public S3AInstrumentation(URI name) { new MetricDurationTrackerFactory()); } + /** + * Get the current metrics system; demand creating. + * @return a metric system, creating if need be. + */ @VisibleForTesting - public MetricsSystem getMetricsSystem() { + static MetricsSystem getMetricsSystem() { synchronized (METRICS_SYSTEM_LOCK) { if (metricsSystem == null) { metricsSystem = new MetricsSystemImpl(); metricsSystem.init(METRICS_SYSTEM_NAME); + LOG.debug("Metrics system inited {}", metricsSystem); } } return metricsSystem; } + /** + * Does the instrumentation have a metrics system? + * @return true if the metrics system is present. + */ + @VisibleForTesting + static boolean hasMetricSystem() { + return metricsSystem != null; + } + /** * Register this instance as a metrics source. * @param name s3a:// URI for the associated FileSystem instance @@ -686,19 +700,30 @@ public void getMetrics(MetricsCollector collector, boolean all) { } public void close() { - LOG.debug("Unregistering metrics"); - synchronized (METRICS_SYSTEM_LOCK) { - // it is critical to close each quantile, as they start a scheduled - // task in a shared thread pool. - throttleRateQuantile.stop(); - metricsSystem.unregisterSource(metricsSourceReference.getName()); - metricsSourceActiveCounter--; - int activeSources = metricsSourceActiveCounter; - if (activeSources == 0) { - LOG.debug("Shutting down metrics publisher"); - metricsSystem.publishMetricsNow(); - metricsSystem.shutdown(); - metricsSystem = null; + if (metricsSourceReference != null) { + // get the name + String name = metricsSourceReference.getName(); + LOG.debug("Unregistering metrics for {}", name); + // then set to null so a second close() is a noop here. + metricsSourceReference = null; + synchronized (METRICS_SYSTEM_LOCK) { + // it is critical to close each quantile, as they start a scheduled + // task in a shared thread pool. + if (metricsSystem == null) { + LOG.debug("there is no metric system to unregister {} from", name); + return; + } + throttleRateQuantile.stop(); + + metricsSystem.unregisterSource(name); + metricsSourceActiveCounter--; + int activeSources = metricsSourceActiveCounter; + if (activeSources == 0) { + LOG.debug("Shutting down metrics publisher"); + metricsSystem.publishMetricsNow(); + metricsSystem.shutdown(); + metricsSystem = null; + } } } } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java index 79772ec9dadfe..327b0fab288f7 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AClosedFS.java @@ -103,4 +103,16 @@ public void testClosedOpen() throws Exception { () -> getFileSystem().open(path("to-open"))); } + @Test + public void testClosedInstrumentation() throws Exception { + // no metrics + Assertions.assertThat(S3AInstrumentation.hasMetricSystem()) + .describedAs("S3AInstrumentation.hasMetricSystem()") + .isFalse(); + + Assertions.assertThat(getFileSystem().getIOStatistics()) + .describedAs("iostatistics of %s", getFileSystem()) + .isNotNull(); + } + } diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java new file mode 100644 index 0000000000000..2dce6b629cb22 --- /dev/null +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.s3a; + +import java.net.URI; + +import org.junit.Test; + +import org.apache.hadoop.fs.statistics.StoreStatisticNames; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.test.AbstractHadoopTestBase; + +import static org.apache.hadoop.fs.s3a.S3AInstrumentation.getMetricsSystem; +import static org.apache.hadoop.fs.s3a.Statistic.DIRECTORIES_CREATED; +import static org.assertj.core.api.Assertions.assertThat; + +public class TestInstrumentationLifecycle extends AbstractHadoopTestBase { + + @Test + public void testDoubleClose() throws Throwable { + S3AInstrumentation instrumentation = new S3AInstrumentation(new URI("s3a://example/")); + + // the metric system is created in the constructor + assertThat(S3AInstrumentation.hasMetricSystem()) + .describedAs("S3AInstrumentation.hasMetricSystem()") + .isTrue(); + // ask for a metric + String metricName = DIRECTORIES_CREATED.getSymbol(); + assertThat(instrumentation.lookupMetric(metricName)) + .describedAs("lookupMetric(%s) while open", metricName) + .isNotNull(); + + MetricsSystem activeMetrics = getMetricsSystem(); + + // this will close the metrics system + instrumentation.close(); + + // iostats is still valid + assertThat(instrumentation.getIOStatistics()) + .describedAs("iostats of %s", instrumentation) + .isNotNull(); + + // no metrics + assertThat(S3AInstrumentation.hasMetricSystem()) + .describedAs("S3AInstrumentation.hasMetricSystem()") + .isFalse(); + + // metric lookup still works, so any invocation of an s3a + // method which still updates a metric also works + assertThat(instrumentation.lookupMetric(metricName)) + .describedAs("lookupMetric(%s) when closed", metricName) + .isNotNull(); + + // which we can implicitly verify by asking for it and + // verifying that we get given a different one back + // from the demand-created instance + MetricsSystem metrics2 = getMetricsSystem(); + assertThat(metrics2) + .describedAs("metric system 2") + .isNotSameAs(activeMetrics); + + // this is going to be a no-op + instrumentation.close(); + + // which we can verify because the metrics system doesn't + // get closed this time + assertThat(getMetricsSystem()) + .describedAs("metric system 3") + .isSameAs(metrics2); + } +} From f8cdd3d3e5e17b5a49a6f57336446df065f9a82f Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Wed, 30 Nov 2022 17:41:54 +0000 Subject: [PATCH 3/5] HADOOP-18526. checkstyle and TRACE of stack in s3afs.initialize() off by default...test/log4j.properties has it commented out Change-Id: Ie83b74009ccae8869b2c5386206b913e0f76af2f --- .../main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java | 6 ++++++ .../apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java | 1 - hadoop-tools/hadoop-aws/src/test/resources/log4j.properties | 2 ++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 12b9d1e3c2877..54d0328c7768e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -458,6 +458,12 @@ public void initialize(URI name, Configuration originalConf) AuditSpan span = null; try { LOG.debug("Initializing S3AFileSystem for {}", bucket); + if (LOG.isTraceEnabled()) { + // log a full trace for deep diagnostics of where an object is created, + // for tracking down memory leak issues. + LOG.trace("Filesystem for {} creation stack", + name, new RuntimeException(super.toString())); + } // clone the configuration into one with propagated bucket options Configuration conf = propagateBucketOptions(originalConf, bucket); // HADOOP-17894. remove references to s3a stores in JCEKS credentials. diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java index 2dce6b629cb22..61c0d06be0958 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java @@ -22,7 +22,6 @@ import org.junit.Test; -import org.apache.hadoop.fs.statistics.StoreStatisticNames; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.test.AbstractHadoopTestBase; diff --git a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties index 0ec8d52042807..306a79a20a204 100644 --- a/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties +++ b/hadoop-tools/hadoop-aws/src/test/resources/log4j.properties @@ -53,6 +53,8 @@ log4j.logger.org.apache.hadoop.ipc.Server=WARN # for debugging low level S3a operations, uncomment these lines # Log all S3A classes log4j.logger.org.apache.hadoop.fs.s3a=DEBUG +# when logging at trace, the stack of the initialize() call is logged +#log4j.logger.org.apache.hadoop.fs.s3a.S3AFileSystem=TRACE #log4j.logger.org.apache.hadoop.fs.s3a.S3AUtils=INFO #log4j.logger.org.apache.hadoop.fs.s3a.Listing=INFO log4j.logger.org.apache.hadoop.fs.s3a.SDKV2Upgrade=WARN From 48974549308b1eb63cb832e3a04beef3f593018d Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Tue, 6 Dec 2022 17:09:31 +0000 Subject: [PATCH 4/5] HADOOP-18526. s3a instrumentation * use the weak ref when registering * log state of cache.disabled in trace-level constructor The reason for the logging is that running out of memory due to instrumentation refs is probably a sign of s3a fs instances being created and not closed. HADOOP-18476 should partially address this through FileContext; the trace is there to identify other possible causes. Change-Id: If78d16e8bfc63eea93ec328543ab110f82c93d90 --- .../main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java | 5 +++-- .../java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java | 4 ++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java index 54d0328c7768e..420a92788c82e 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java @@ -461,8 +461,9 @@ public void initialize(URI name, Configuration originalConf) if (LOG.isTraceEnabled()) { // log a full trace for deep diagnostics of where an object is created, // for tracking down memory leak issues. - LOG.trace("Filesystem for {} creation stack", - name, new RuntimeException(super.toString())); + LOG.trace("Filesystem for {} created; fs.s3a.impl.disable.cache = {}", + name, originalConf.getBoolean("fs.s3a.impl.disable.cache", false), + new RuntimeException(super.toString())); } // clone the configuration into one with propagated bucket options Configuration conf = propagateBucketOptions(originalConf, bucket); diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 60a776a058e46..69e726deb80bc 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -263,7 +263,7 @@ static boolean hasMetricSystem() { } /** - * Register this instance as a metrics source. + * Register this instance as a metrics source via a weak reference. * @param name s3a:// URI for the associated FileSystem instance */ private void registerAsMetricsSource(URI name) { @@ -277,7 +277,7 @@ private void registerAsMetricsSource(URI name) { String msName = METRICS_SOURCE_BASENAME + number; String metricsSourceName = msName + "-" + name.getHost(); metricsSourceReference = new WeakRefMetricsSource(metricsSourceName, this); - metricsSystem.register(metricsSourceName, "", this); + metricsSystem.register(metricsSourceName, "", metricsSourceReference); } /** From 627910bb26cc21528c88509683fa639737aba26f Mon Sep 17 00:00:00 2001 From: Steve Loughran Date: Thu, 8 Dec 2022 14:07:30 +0000 Subject: [PATCH 5/5] HADOOP-18526. review feedback and test improvement. Extended S3AInstrumentation and WeakRefMetricsSource to allow for the test to verify that the link from metrics to instrumentation works and is through the weak reference Change-Id: I4fc156dfa60d7c7a868d8b765777c0f445a83257 --- .../hadoop/fs/impl/WeakRefMetricsSource.java | 8 ++++++++ .../hadoop/fs/s3a/S3AInstrumentation.java | 11 +++++++++++ .../fs/s3a/TestInstrumentationLifecycle.java | 18 ++++++++++++++++++ 3 files changed, 37 insertions(+) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java index 6948f9daaf7ea..146773857934c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/WeakRefMetricsSource.java @@ -78,6 +78,14 @@ public String getName() { return name; } + /** + * Get the source, will be null if the reference has been GC'd + * @return the source reference + */ + public MetricsSource getSource() { + return sourceWeakReference.get(); + } + @Override public String toString() { return "WeakRefMetricsSource{" + diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java index 69e726deb80bc..9d33efa9d01b0 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java @@ -699,6 +699,17 @@ public void getMetrics(MetricsCollector collector, boolean all) { registry.snapshot(collector.addRecord(registry.info().name()), true); } + /** + * if registered with the metrics, return the + * name of the source. + * @return the name of the metrics, or null if this instance is not bonded. + */ + public String getMetricSourceName() { + return metricsSourceReference != null + ? metricsSourceReference.getName() + : null; + } + public void close() { if (metricsSourceReference != null) { // get the name diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java index 61c0d06be0958..d8b9247008cc8 100644 --- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java +++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestInstrumentationLifecycle.java @@ -20,8 +20,11 @@ import java.net.URI; +import org.assertj.core.api.Assertions; import org.junit.Test; +import org.apache.hadoop.fs.impl.WeakRefMetricsSource; +import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.test.AbstractHadoopTestBase; @@ -29,6 +32,11 @@ import static org.apache.hadoop.fs.s3a.Statistic.DIRECTORIES_CREATED; import static org.assertj.core.api.Assertions.assertThat; +/** + * Test the {@link S3AInstrumentation} lifecycle, in particular how + * it binds to hadoop metrics through a {@link WeakRefMetricsSource} + * and that it will deregister itself in {@link S3AInstrumentation#close()}. + */ public class TestInstrumentationLifecycle extends AbstractHadoopTestBase { @Test @@ -46,6 +54,16 @@ public void testDoubleClose() throws Throwable { .isNotNull(); MetricsSystem activeMetrics = getMetricsSystem(); + final String metricSourceName = instrumentation.getMetricSourceName(); + final MetricsSource source = activeMetrics.getSource(metricSourceName); + // verify the source is registered through a weak ref, and that the + // reference maps to the instance. + Assertions.assertThat(source) + .describedAs("metric source %s", metricSourceName) + .isNotNull() + .isInstanceOf(WeakRefMetricsSource.class) + .extracting(m -> ((WeakRefMetricsSource) m).getSource()) + .isSameAs(instrumentation); // this will close the metrics system instrumentation.close();