From 70a832c991d9663a0dc5d0ee75dd90291e1a5592 Mon Sep 17 00:00:00 2001 From: huiruan Date: Wed, 12 May 2021 22:30:36 +0800 Subject: [PATCH 1/6] remove files in CompactionContext from filesCompacting when clear compaction queues --- .../hbase/regionserver/CompactSplit.java | 11 ++++ .../hadoop/hbase/regionserver/HStore.java | 16 +++++ .../hbase/regionserver/TestCompaction.java | 58 +++++++++++++++++++ 3 files changed, 85 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index 441b18b3302f..226d70177e43 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -24,6 +24,7 @@ import java.io.IOException; import java.io.PrintWriter; import java.io.StringWriter; +import java.util.Collection; import java.util.Comparator; import java.util.Iterator; import java.util.Optional; @@ -794,13 +795,23 @@ void shutdownLongCompactions(){ } public void clearLongCompactionsQueue() { + removeFromFilesCompacting(longCompactions); longCompactions.getQueue().clear(); } public void clearShortCompactionsQueue() { + removeFromFilesCompacting(shortCompactions); shortCompactions.getQueue().clear(); } + private void removeFromFilesCompacting(ThreadPoolExecutor compactor) { + for (Runnable runnable : compactor.getQueue()) { + CompactionRunner runner = (CompactionRunner) runnable; + Collection files = runner.compaction.getRequest().getFiles(); + runner.store.removeFromCompactingFiles(files); + } + } + public boolean isCompactionsEnabled() { return compactionsEnabled; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index 45b17bb32ad1..aa28da46bd08 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1953,6 +1953,22 @@ private void addToCompactingFiles(Collection filesToAdd) { Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator()); } + /** + * Remove the files from compacting files. This usually happens when we clear compaction queues. + */ + public void removeFromCompactingFiles(Collection filesToRemove) { + synchronized (filesCompacting) { + filesCompacting.removeAll(filesToRemove); + Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator()); + } + } + + public List getFilesCompacting() { + synchronized (filesCompacting) { + return Lists.newArrayList(filesCompacting); + } + } + private void removeUnneededFiles() throws IOException { if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) { return; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java index 2174d514b4b4..fa1cfa108636 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java @@ -553,6 +553,49 @@ public void testMultipleCustomCompactionRequests() throws Exception { thread.interruptIfNecessary(); } + /** + * HBASE-25880: remove files in CompactionContext from filesCompacting + * when clear compaction queues + */ + @Test + public void testRemoveCompactingFilesWhenClearCompactionQueue() throws Exception { + // setup a compact/split thread on a mock server + HRegionServer mockServer = Mockito.mock(HRegionServer.class); + Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf()); + CompactSplit thread = new CompactSplit(mockServer); + Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread); + + // create some store files and setup requests for each store on which we want to do a + // compaction + for (HStore store : r.getStores()) { + createStoreFile(r, store.getColumnFamilyName()); + createStoreFile(r, store.getColumnFamilyName()); + createStoreFile(r, store.getColumnFamilyName()); + thread.requestCompaction(r, store, "test remove compacting files", PRIORITY_USER, + new LazyTracker(), null); + } + + // I am a little confused here. Why longCompactions take one CompactionRunner? + assertEquals(2, thread.getLongCompactions().getActiveCount() + + thread.getShortCompactions().getActiveCount()); + + // compaction queues start from the third store. + List compactingFiles = r.getStores().get(2).getFilesCompacting(); + Collection storeFiles = r.getStores().get(2).getStorefiles(); + for (HStoreFile file : storeFiles) { + assertTrue(compactingFiles.contains(file)); + } + + thread.clearShortCompactionsQueue(); + + compactingFiles = r.getStores().get(2).getFilesCompacting(); + for (HStoreFile file : storeFiles) { + assertFalse(compactingFiles.contains(file)); + } + + thread.interruptIfNecessary(); + } + class StoreMockMaker extends StatefulStoreMockMaker { public ArrayList compacting = new ArrayList<>(); public ArrayList notCompacting = new ArrayList<>(); @@ -874,6 +917,21 @@ public void afterExecution(Store store) { } } + /** + * Simple {@link CompactionLifeCycleTracker} that sleep 10s before actually doCompaction to keep + * compaction queue nonempty + */ + public static class LazyTracker implements CompactionLifeCycleTracker { + @Override + public void beforeExecution(Store store) { + try { + Thread.sleep(10000); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + } + /** * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction * finishes. From 191f6430ddca53dc1a1a7475ae65e2e1e605b514 Mon Sep 17 00:00:00 2001 From: huiruan Date: Thu, 13 May 2021 13:58:09 +0800 Subject: [PATCH 2/6] change some public methods to package-private --- .../java/org/apache/hadoop/hbase/regionserver/HStore.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index aa28da46bd08..da861ae99123 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -54,6 +54,7 @@ import java.util.stream.Collectors; import java.util.stream.LongStream; +import com.google.errorprone.annotations.RestrictedApi; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -1956,14 +1957,16 @@ private void addToCompactingFiles(Collection filesToAdd) { /** * Remove the files from compacting files. This usually happens when we clear compaction queues. */ - public void removeFromCompactingFiles(Collection filesToRemove) { + void removeFromCompactingFiles(Collection filesToRemove) { synchronized (filesCompacting) { filesCompacting.removeAll(filesToRemove); Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator()); } } - public List getFilesCompacting() { + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*|.*/TestCompaction.java") + List getFilesCompacting() { synchronized (filesCompacting) { return Lists.newArrayList(filesCompacting); } From a928f67d814dbb64815370be54e21ac82f8935ef Mon Sep 17 00:00:00 2001 From: huiruan Date: Thu, 13 May 2021 13:59:45 +0800 Subject: [PATCH 3/6] add type check for CompactionRunner --- .../org/apache/hadoop/hbase/regionserver/CompactSplit.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index 226d70177e43..b4d591c9c75f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -806,6 +806,9 @@ public void clearShortCompactionsQueue() { private void removeFromFilesCompacting(ThreadPoolExecutor compactor) { for (Runnable runnable : compactor.getQueue()) { + if (!(runnable instanceof CompactionRunner)) { + continue; + } CompactionRunner runner = (CompactionRunner) runnable; Collection files = runner.compaction.getRequest().getFiles(); runner.store.removeFromCompactingFiles(files); From d5334bdccc97bbe98e18602fdba7f752b0f407f1 Mon Sep 17 00:00:00 2001 From: huiruan Date: Thu, 13 May 2021 14:02:24 +0800 Subject: [PATCH 4/6] fix a null pointer bug --- .../apache/hadoop/hbase/regionserver/CompactSplit.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index b4d591c9c75f..98e4b7416a4b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -810,8 +810,12 @@ private void removeFromFilesCompacting(ThreadPoolExecutor compactor) { continue; } CompactionRunner runner = (CompactionRunner) runnable; - Collection files = runner.compaction.getRequest().getFiles(); - runner.store.removeFromCompactingFiles(files); + // for system compaction, files selection will be delayed until the compaction task + // actually runs, so compaction context is null for system compaction + if (runner.compaction != null) { + Collection files = runner.compaction.getRequest().getFiles(); + runner.store.removeFromCompactingFiles(files); + } } } From d278f87c5ce8fc36e5ca94e1d3cbdf2b545eeec5 Mon Sep 17 00:00:00 2001 From: huiruan Date: Thu, 13 May 2021 20:02:55 +0800 Subject: [PATCH 5/6] remove files and compact request one by one --- .../hadoop/hbase/regionserver/CompactSplit.java | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index 98e4b7416a4b..3d2103b79307 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -796,25 +796,24 @@ void shutdownLongCompactions(){ public void clearLongCompactionsQueue() { removeFromFilesCompacting(longCompactions); - longCompactions.getQueue().clear(); } public void clearShortCompactionsQueue() { removeFromFilesCompacting(shortCompactions); - shortCompactions.getQueue().clear(); } private void removeFromFilesCompacting(ThreadPoolExecutor compactor) { - for (Runnable runnable : compactor.getQueue()) { + Iterator iter = compactor.getQueue().iterator(); + while (iter.hasNext()) { + Runnable runnable = iter.next(); if (!(runnable instanceof CompactionRunner)) { continue; } CompactionRunner runner = (CompactionRunner) runnable; - // for system compaction, files selection will be delayed until the compaction task - // actually runs, so compaction context is null for system compaction - if (runner.compaction != null) { + if (runner.compaction != null && runner.compaction.hasSelection()) { Collection files = runner.compaction.getRequest().getFiles(); runner.store.removeFromCompactingFiles(files); + iter.remove(); } } } From 1358039f26b75987f0bbe538c1feb1679121745d Mon Sep 17 00:00:00 2001 From: huiruan Date: Fri, 14 May 2021 22:09:19 +0800 Subject: [PATCH 6/6] remove unnecessary sort for filesCompacting --- .../main/java/org/apache/hadoop/hbase/regionserver/HStore.java | 1 - 1 file changed, 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java index da861ae99123..f837a2e987a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -1960,7 +1960,6 @@ private void addToCompactingFiles(Collection filesToAdd) { void removeFromCompactingFiles(Collection filesToRemove) { synchronized (filesCompacting) { filesCompacting.removeAll(filesToRemove); - Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator()); } }