diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java index 0eb23bd19ed91..2119b1b30c362 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java @@ -136,6 +136,7 @@ public abstract class AbfsClient implements Closeable { public static final Logger LOG = LoggerFactory.getLogger(AbfsClient.class); public static final String HUNDRED_CONTINUE_USER_AGENT = SINGLE_WHITE_SPACE + HUNDRED_CONTINUE + SEMICOLON; + public static final String ABFS_CLIENT_TIMER_THREAD_NAME = "abfs-timer-client"; private final URL baseUrl; private final SharedKeyCredentials sharedKeyCredentials; @@ -154,7 +155,7 @@ public abstract class AbfsClient implements Closeable { private AccessTokenProvider tokenProvider; private SASTokenProvider sasTokenProvider; private final AbfsCounters abfsCounters; - private final Timer timer; + private Timer timer; private final String abfsMetricUrl; private boolean isMetricCollectionEnabled = false; private final MetricFormat metricFormat; @@ -263,9 +264,9 @@ private AbfsClient(final URL baseUrl, throw new IOException("Exception while initializing metric credentials " + e); } } - this.timer = new Timer( - "abfs-timer-client", true); if (isMetricCollectionEnabled) { + this.timer = new Timer( + ABFS_CLIENT_TIMER_THREAD_NAME, true); timer.schedule(new TimerTaskImpl(), metricIdlePeriod, metricIdlePeriod); @@ -297,6 +298,10 @@ public AbfsClient(final URL baseUrl, final SharedKeyCredentials sharedKeyCredent @Override public void close() throws IOException { + if (isMetricCollectionEnabled && runningTimerTask != null) { + runningTimerTask.cancel(); + timer.cancel(); + } if (keepAliveCache != null) { keepAliveCache.close(); } @@ -1425,7 +1430,7 @@ private TracingContext getMetricTracingContext() { boolean timerOrchestrator(TimerFunctionality timerFunctionality, TimerTask timerTask) { switch (timerFunctionality) { case RESUME: - if (isMetricCollectionStopped.get()) { + if (isMetricCollectionEnabled && isMetricCollectionStopped.get()) { synchronized (this) { if (isMetricCollectionStopped.get()) { resumeTimer(); @@ -1604,6 +1609,11 @@ KeepAliveCache getKeepAliveCache() { return keepAliveCache; } + @VisibleForTesting + protected Timer getTimer() { + return timer; + } + protected String getUserAgent() { return userAgent; } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java new file mode 100644 index 0000000000000..e8ab4291b32c5 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsClient.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.net.URI; +import java.net.URL; +import java.util.Map; + +import org.assertj.core.api.Assertions; +import org.junit.Test; +import org.mockito.Mockito; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl; +import org.apache.hadoop.fs.azurebfs.oauth2.AccessTokenProvider; +import org.apache.hadoop.fs.azurebfs.utils.Base64; +import org.apache.hadoop.fs.azurebfs.utils.MetricFormat; + +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_ACCOUNT_KEY; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_ACCOUNT_NAME; +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_METRIC_FORMAT; +import static org.apache.hadoop.fs.azurebfs.services.AbfsClient.ABFS_CLIENT_TIMER_THREAD_NAME; + +/** + * Unit test cases for the AbfsClient class. + */ +public class TestAbfsClient { + private static final String ACCOUNT_NAME = "bogusAccountName.dfs.core.windows.net"; + private static final String ACCOUNT_KEY = "testKey"; + private static final long SLEEP_DURATION_MS = 500; + + /** + * Test the initialization of the AbfsClient timer when metric collection is disabled. + * In case of metric collection being disabled, the timer should not be initialized. + * Asserting that the timer is null and the abfs-timer-client thread is not running. + */ + @Test + public void testTimerInitializationWithoutMetricCollection() throws Exception { + final Configuration configuration = new Configuration(); + AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, ACCOUNT_NAME); + + AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd"))); + AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build(); + + // Get an instance of AbfsClient. + AbfsClient client = new AbfsDfsClient(new URL("https://azure.com"), + null, + abfsConfiguration, + (AccessTokenProvider) null, + null, + abfsClientContext); + + Assertions.assertThat(client.getTimer()) + .describedAs("Timer should not be initialized") + .isNull(); + + // Check if a thread with the name "abfs-timer-client" exists + Assertions.assertThat(isThreadRunning(ABFS_CLIENT_TIMER_THREAD_NAME)) + .describedAs("Expected thread 'abfs-timer-client' not found") + .isEqualTo(false); + client.close(); + } + + /** + * Test the initialization of the AbfsClient timer when metric collection is enabled. + * In case of metric collection being enabled, the timer should be initialized. + * Asserting that the timer is not null and the abfs-timer-client thread is running. + * Also, asserting that the thread is removed after closing the client. + */ + @Test + public void testTimerInitializationWithMetricCollection() throws Exception { + final Configuration configuration = new Configuration(); + configuration.set(FS_AZURE_METRIC_FORMAT, String.valueOf(MetricFormat.INTERNAL_BACKOFF_METRIC_FORMAT)); + configuration.set(FS_AZURE_METRIC_ACCOUNT_NAME, ACCOUNT_NAME); + configuration.set(FS_AZURE_METRIC_ACCOUNT_KEY, Base64.encode(ACCOUNT_KEY.getBytes())); + AbfsConfiguration abfsConfiguration = new AbfsConfiguration(configuration, ACCOUNT_NAME); + + AbfsCounters abfsCounters = Mockito.spy(new AbfsCountersImpl(new URI("abcd"))); + AbfsClientContext abfsClientContext = new AbfsClientContextBuilder().withAbfsCounters(abfsCounters).build(); + + // Get an instance of AbfsClient. + AbfsClient client = new AbfsDfsClient(new URL("https://azure.com"), + null, + abfsConfiguration, + (AccessTokenProvider) null, + null, + abfsClientContext); + + Assertions.assertThat(client.getTimer()) + .describedAs("Timer should be initialized") + .isNotNull(); + + // Check if a thread with the name "abfs-timer-client" exists + Assertions.assertThat(isThreadRunning(ABFS_CLIENT_TIMER_THREAD_NAME)) + .describedAs("Expected thread 'abfs-timer-client' not found") + .isEqualTo(true); + client.close(); + + // Check if the thread is removed after closing the client + Thread.sleep(SLEEP_DURATION_MS); + Assertions.assertThat(isThreadRunning(ABFS_CLIENT_TIMER_THREAD_NAME)) + .describedAs("Unexpected thread 'abfs-timer-client' found") + .isEqualTo(false); + } + + /** + * Check if a thread with the specified name is running. + * + * @param threadName Name of the thread to check + * @return true if the thread is running, false otherwise + */ + private boolean isThreadRunning(String threadName) { + // Get all threads and their stack traces + Map allThreads = Thread.getAllStackTraces(); + + // Check if any thread has the specified name + for (Thread thread : allThreads.keySet()) { + if (thread.getName().equals(threadName)) { + return true; + } + } + return false; + } +}