From 1b74df0030ca55a1266e8d5b42f5d2f71c665739 Mon Sep 17 00:00:00 2001 From: Hanley Yang Date: Thu, 16 Sep 2021 17:03:23 +0800 Subject: [PATCH 1/2] HDFS-11045. Make TestDirectoryScanner#testThrottling() not based on timing. --- .../server/datanode/DirectoryScanner.java | 24 +- .../server/datanode/TestDirectoryScanner.java | 374 +++++++++--------- 2 files changed, 204 insertions(+), 194 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index eef1732ac4426..a5a3d0bde3e29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -733,22 +733,26 @@ public void throttle() throws InterruptedException { if (throttleLimitMsPerSec > 0L) { final long runningTime = throttleTimer.now(TimeUnit.MILLISECONDS); if (runningTime >= throttleLimitMsPerSec) { - final long sleepTime; - if (runningTime >= 1000L) { - LOG.warn("Unable to throttle within the second. Blocking for 1s."); - sleepTime = 1000L; - } else { - // Sleep for the expected time plus any time processing ran over - final long overTime = runningTime - throttleLimitMsPerSec; - sleepTime = (1000L - throttleLimitMsPerSec) + overTime; - } - Thread.sleep(sleepTime); + Thread.sleep(calculateSleepTime(runningTime)); throttleTimer.reset().start(); } accumulateTimeWaiting(); } } + @VisibleForTesting + long calculateSleepTime(long runningTime) { + if (throttleLimitMsPerSec <= 0) return 0; + if (runningTime >= 1000L) { + LOG.warn("Unable to throttle within the second. Blocking for 1s."); + return 1000L; + } else { + // Sleep for the expected time plus any time processing ran over + final long overTime = runningTime - throttleLimitMsPerSec; + return (1000L - throttleLimitMsPerSec) + overTime; + } + } + /** * Helper method to measure time running. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index 7f79778842780..5855a0cc7ec63 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -26,6 +26,8 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; import java.io.ByteArrayOutputStream; import java.io.File; @@ -74,12 +76,14 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.AutoCloseableLock; +import org.apache.hadoop.util.Lists; import org.apache.hadoop.util.Time; import org.apache.log4j.SimpleLayout; import org.apache.log4j.WriterAppender; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; +import org.mockito.stubbing.Answer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.event.Level; @@ -665,219 +669,221 @@ public void runTest(int parallelism) throws Exception { } } + @Test + public void testCalculateSleepTime() { + // disabled + runCalculateSleepTimeTest(0, 100, 0); + runCalculateSleepTimeTest(-2, 100, 0); + runCalculateSleepTimeTest(1001, 10000, 0); + // below the limit + runCalculateSleepTimeTest(100, 99, 899); + // equals to the limit + runCalculateSleepTimeTest(100, 100, 900); + // above the limit + runCalculateSleepTimeTest(100, 101, 901); + // above 1s + runCalculateSleepTimeTest(100, 1001, 1000); + } + + public void runCalculateSleepTimeTest(int throttleLimitMsPerSec, int runintTime, long expected) { + FsDatasetSpi fds = mock(FsDatasetSpi.class); + FsVolumeSpi volumeSpi = mock(FsVolumeImpl.class); + Configuration conf = new Configuration(getConfiguration()); + conf.setInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + throttleLimitMsPerSec); + scanner = new DirectoryScanner(fds, conf); + ReportCompiler compiler = scanner.new ReportCompiler(volumeSpi); + assertTrue(compiler.calculateSleepTime(runintTime) == expected); + } + /** * Test that the timeslice throttle limits the report compiler thread's - * execution time correctly. We test by scanning a large block pool and - * comparing the time spent waiting to the time spent running. - * - * The block pool has to be large, or the ratio will be off. The throttle - * allows the report compiler thread to finish its current cycle when blocking - * it, so the ratio will always be a little lower than expected. The smaller - * the block pool, the further off the ratio will be. + * execution time correctly. We test by mocking a FsVolumeSpi throttling + * every {@link DFSConfigKeys#DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY} + * and comparing the time spent waiting to the time spent running. * * @throws Exception thrown on unexpected failure */ @Test public void testThrottling() throws Exception { + FsDatasetSpi fds = mock(FsDatasetSpi.class); Configuration conf = new Configuration(getConfiguration()); - - // We need lots of blocks so the report compiler threads have enough to - // keep them busy while we watch them. - int blocks = 20000; - int maxRetries = 3; - - cluster = new MiniDFSCluster.Builder(conf).build(); - - try { - cluster.waitActive(); - bpid = cluster.getNamesystem().getBlockPoolId(); - fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0)); - client = cluster.getFileSystem().getClient(); - conf.setInt( - DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, - 100); - - final int maxBlocksPerFile = - (int) DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_DEFAULT; - int numBlocksToCreate = blocks; - while (numBlocksToCreate > 0) { - final int toCreate = Math.min(maxBlocksPerFile, numBlocksToCreate); - createFile(GenericTestUtils.getMethodName() + numBlocksToCreate, - BLOCK_LENGTH * toCreate, false); - numBlocksToCreate -= toCreate; + bpid = "bp01"; + + FsVolumeSpi volumeSpi = mock(FsVolumeImpl.class); + FsVolumeReference reference = mock(FsVolumeReference.class); + when(reference.getVolume()).thenReturn(volumeSpi); + when(volumeSpi.obtainReference()).thenReturn(reference); + when(fds.getFsVolumeReferences()).thenAnswer((Answer) + invocationOnMock -> + new FsVolumeReferences(Lists.newArrayList(volumeSpi, volumeSpi))); + when(volumeSpi.getBlockPoolList()).thenReturn(new String[]{bpid}); + doAnswer(invocationOnMock -> { + ReportCompiler compiler = invocationOnMock.getArgument(2, ReportCompiler.class); + int throttleLimitMs = conf.getInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + 0); + for (int i = 0; i < 3; i++) { + compiler.throttle(); + // Make sure to throttle next time. + Thread.sleep(Math.max(1, throttleLimitMs)); } + return null; + }).when(volumeSpi).compileReport(any(), any(), any()); + + float ratio; + int blocks = 0; + + conf.setInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + 100); + scanner = new DirectoryScanner(fds, conf); + ratio = runThrottleTest(blocks); + + // Waiting should be about 9x running. + LOG.info("RATIO: " + ratio); + assertTrue("Throttle is too restrictive", ratio <= 10f); + assertTrue("Throttle is too permissive" + ratio, ratio >= 7f); + + // Test with a different limit + conf.setInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + 200); + scanner = new DirectoryScanner(fds, conf); + ratio = runThrottleTest(blocks); + + // Waiting should be about 4x running. + LOG.info("RATIO: " + ratio); + assertTrue("Throttle is too restrictive", ratio <= 4.5f); + assertTrue("Throttle is too permissive", ratio >= 2.75f); + + // Test with more than 1 thread + conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 3); + conf.setInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + 100); + scanner = new DirectoryScanner(fds, conf); + ratio = runThrottleTest(blocks); + + // Waiting should be about 9x running. + LOG.info("RATIO: " + ratio); + assertTrue("Throttle is too restrictive", ratio <= 10f); + assertTrue("Throttle is too permissive", ratio >= 7f); + + // Test with no limit + scanner = new DirectoryScanner(fds, getConfiguration()); + scanner.setRetainDiffs(true); + scan(blocks, 0, 0, 0, 0, 0); + scanner.shutdown(); + assertFalse(scanner.getRunStatus()); - float ratio = 0.0f; - int retries = maxRetries; + assertTrue("Throttle appears to be engaged", + scanner.timeWaitingMs.get() < 10L); + assertTrue("Report complier threads logged no execution time", + scanner.timeRunningMs.get() > 0L); - while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) { - scanner = new DirectoryScanner(fds, conf); - ratio = runThrottleTest(blocks); - retries -= 1; - } + // Test with a 1ms limit. This also tests whether the scanner can be + // shutdown cleanly in mid stride. + conf.setInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + 1); + ratio = 0.0f; + ScheduledExecutorService interruptor = + Executors.newScheduledThreadPool(1); - // Waiting should be about 9x running. - LOG.info("RATIO: " + ratio); - assertTrue("Throttle is too restrictive", ratio <= 10f); - assertTrue("Throttle is too permissive" + ratio, ratio >= 7f); - - // Test with a different limit - conf.setInt( - DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, - 200); - ratio = 0.0f; - retries = maxRetries; - - while ((retries > 0) && ((ratio < 2.75f) || (ratio > 4.5f))) { - scanner = new DirectoryScanner(fds, conf); - ratio = runThrottleTest(blocks); - retries -= 1; - } + try { + scanner = new DirectoryScanner(fds, conf); + scanner.setRetainDiffs(true); - // Waiting should be about 4x running. - LOG.info("RATIO: " + ratio); - assertTrue("Throttle is too restrictive", ratio <= 4.5f); - assertTrue("Throttle is too permissive", ratio >= 2.75f); - - // Test with more than 1 thread - conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 3); - conf.setInt( - DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, - 100); - ratio = 0.0f; - retries = maxRetries; - - while ((retries > 0) && ((ratio < 7f) || (ratio > 10f))) { - scanner = new DirectoryScanner(fds, conf); - ratio = runThrottleTest(blocks); - retries -= 1; - } + final AtomicLong nowMs = new AtomicLong(); - // Waiting should be about 9x running. - LOG.info("RATIO: " + ratio); - assertTrue("Throttle is too restrictive", ratio <= 10f); - assertTrue("Throttle is too permissive", ratio >= 7f); + // Stop the scanner after 2 seconds because otherwise it will take an + // eternity to complete it's run + interruptor.schedule(new Runnable() { + @Override + public void run() { + nowMs.set(Time.monotonicNow()); + scanner.shutdown(); + } + }, 2L, TimeUnit.SECONDS); - // Test with no limit - scanner = new DirectoryScanner(fds, getConfiguration()); - scanner.setRetainDiffs(true); - scan(blocks, 0, 0, 0, 0, 0); - scanner.shutdown(); + scanner.reconcile(); assertFalse(scanner.getRunStatus()); - assertTrue("Throttle appears to be engaged", - scanner.timeWaitingMs.get() < 10L); - assertTrue("Report complier threads logged no execution time", - scanner.timeRunningMs.get() > 0L); + long finalMs = nowMs.get(); - // Test with a 1ms limit. This also tests whether the scanner can be - // shutdown cleanly in mid stride. - conf.setInt( - DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, - 1); - ratio = 0.0f; - retries = maxRetries; - ScheduledExecutorService interruptor = - Executors.newScheduledThreadPool(maxRetries); - - try { - while ((retries > 0) && (ratio < 10)) { - scanner = new DirectoryScanner(fds, conf); - scanner.setRetainDiffs(true); - - final AtomicLong nowMs = new AtomicLong(); - - // Stop the scanner after 2 seconds because otherwise it will take an - // eternity to complete it's run - interruptor.schedule(new Runnable() { - @Override - public void run() { - nowMs.set(Time.monotonicNow()); - scanner.shutdown(); - } - }, 2L, TimeUnit.SECONDS); - - scanner.reconcile(); - assertFalse(scanner.getRunStatus()); - - long finalMs = nowMs.get(); - - // If the scan didn't complete before the shutdown was run, check - // that the shutdown was timely - if (finalMs > 0) { - LOG.info("Scanner took " + (Time.monotonicNow() - finalMs) + // If the scan didn't complete before the shutdown was run, check + // that the shutdown was timely + if (finalMs > 0) { + LOG.info("Scanner took " + (Time.monotonicNow() - finalMs) + "ms to shutdown"); - assertTrue("Scanner took too long to shutdown", + assertTrue("Scanner took too long to shutdown", Time.monotonicNow() - finalMs < 1000L); - } - - ratio = - (float) scanner.timeWaitingMs.get() / scanner.timeRunningMs.get(); - retries -= 1; - } - } finally { - interruptor.shutdown(); } - // We just want to test that it waits a lot, but it also runs some - LOG.info("RATIO: " + ratio); - assertTrue("Throttle is too permissive", ratio > 8); - assertTrue("Report complier threads logged no execution time", - scanner.timeRunningMs.get() > 0L); - - // Test with a 0 limit, i.e. disabled - conf.setInt( - DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, - 0); - scanner = new DirectoryScanner(fds, conf); - scanner.setRetainDiffs(true); - scan(blocks, 0, 0, 0, 0, 0); - scanner.shutdown(); - assertFalse(scanner.getRunStatus()); - - assertTrue("Throttle appears to be engaged", - scanner.timeWaitingMs.get() < 10L); - assertTrue("Report complier threads logged no execution time", - scanner.timeRunningMs.get() > 0L); + ratio = + (float) scanner.timeWaitingMs.get() / scanner.timeRunningMs.get(); + } finally { + interruptor.shutdown(); + } - // Test with a 1000 limit, i.e. disabled - conf.setInt( - DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, - 1000); - scanner = new DirectoryScanner(fds, conf); - scanner.setRetainDiffs(true); - scan(blocks, 0, 0, 0, 0, 0); - scanner.shutdown(); - assertFalse(scanner.getRunStatus()); + // We just want to test that it waits a lot, but it also runs some + LOG.info("RATIO: " + ratio); + assertTrue("Throttle is too permissive", ratio > 8); + assertTrue("Report complier threads logged no execution time", + scanner.timeRunningMs.get() > 0L); + + // Test with a 0 limit, i.e. disabled + conf.setInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + 0); + scanner = new DirectoryScanner(fds, conf); + scanner.setRetainDiffs(true); + scan(blocks, 0, 0, 0, 0, 0); + scanner.shutdown(); + assertFalse(scanner.getRunStatus()); - assertTrue("Throttle appears to be engaged", - scanner.timeWaitingMs.get() < 10L); - assertTrue("Report complier threads logged no execution time", - scanner.timeRunningMs.get() > 0L); + assertTrue("Throttle appears to be engaged", + scanner.timeWaitingMs.get() < 10L); + assertTrue("Report complier threads logged no execution time", + scanner.timeRunningMs.get() > 0L); - // Test that throttle works from regular start - conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1); - conf.setInt( - DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, - 10); - conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1); - scanner = new DirectoryScanner(fds, conf); - scanner.setRetainDiffs(true); - scanner.start(); + // Test with a 1000 limit, i.e. disabled + conf.setInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + 1000); + scanner = new DirectoryScanner(fds, conf); + scanner.setRetainDiffs(true); + scan(blocks, 0, 0, 0, 0, 0); + scanner.shutdown(); + assertFalse(scanner.getRunStatus()); - int count = 50; + assertTrue("Throttle appears to be engaged", + scanner.timeWaitingMs.get() < 10L); + assertTrue("Report complier threads logged no execution time", + scanner.timeRunningMs.get() > 0L); + + // Test that throttle works from regular start + conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, 1); + conf.setInt( + DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THROTTLE_LIMIT_MS_PER_SEC_KEY, + 10); + conf.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, 1); + scanner = new DirectoryScanner(fds, conf); + scanner.setRetainDiffs(true); + scanner.start(); - while ((count > 0) && (scanner.timeWaitingMs.get() < 500L)) { - Thread.sleep(100L); - count -= 1; - } + int count = 50; - scanner.shutdown(); - assertFalse(scanner.getRunStatus()); - assertTrue("Throttle does not appear to be engaged", count > 0); - } finally { - cluster.shutdown(); + while ((count > 0) && (scanner.timeWaitingMs.get() < 500L)) { + Thread.sleep(100L); + count -= 1; } + + scanner.shutdown(); + assertFalse(scanner.getRunStatus()); + assertTrue("Throttle does not appear to be engaged", count > 0); } private float runThrottleTest(int blocks) From 9c5c5bc2e63dc2b57d98df55675f7d7de0c3ac64 Mon Sep 17 00:00:00 2001 From: Hanley Yang Date: Thu, 16 Sep 2021 20:08:55 +0800 Subject: [PATCH 2/2] Fix typo. --- .../hadoop/hdfs/server/datanode/TestDirectoryScanner.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index 5855a0cc7ec63..a40b78b5cd992 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -685,7 +685,7 @@ public void testCalculateSleepTime() { runCalculateSleepTimeTest(100, 1001, 1000); } - public void runCalculateSleepTimeTest(int throttleLimitMsPerSec, int runintTime, long expected) { + public void runCalculateSleepTimeTest(int throttleLimitMsPerSec, int runningTime, long expected) { FsDatasetSpi fds = mock(FsDatasetSpi.class); FsVolumeSpi volumeSpi = mock(FsVolumeImpl.class); Configuration conf = new Configuration(getConfiguration()); @@ -694,7 +694,7 @@ public void runCalculateSleepTimeTest(int throttleLimitMsPerSec, int runintTime, throttleLimitMsPerSec); scanner = new DirectoryScanner(fds, conf); ReportCompiler compiler = scanner.new ReportCompiler(volumeSpi); - assertTrue(compiler.calculateSleepTime(runintTime) == expected); + assertTrue(compiler.calculateSleepTime(runningTime) == expected); } /**