diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index 441b18b3302f..3d2103b79307 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -24,6 +24,7 @@
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.StringWriter;
+import java.util.Collection;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Optional;
@@ -794,11 +795,27 @@ void shutdownLongCompactions(){
   }
 
   public void clearLongCompactionsQueue() {
-    longCompactions.getQueue().clear();
+    removeFromFilesCompacting(longCompactions);
   }
 
   public void clearShortCompactionsQueue() {
-    shortCompactions.getQueue().clear();
+    removeFromFilesCompacting(shortCompactions);
+  }
+
+  private void removeFromFilesCompacting(ThreadPoolExecutor compactor) {
+    Iterator<Runnable> iter = compactor.getQueue().iterator();
+    while (iter.hasNext()) {
+      Runnable runnable = iter.next();
+      if (!(runnable instanceof CompactionRunner)) {
+        continue;
+      }
+      CompactionRunner runner = (CompactionRunner) runnable;
+      if (runner.compaction != null && runner.compaction.hasSelection()) {
+        Collection<HStoreFile> files = runner.compaction.getRequest().getFiles();
+        runner.store.removeFromCompactingFiles(files);
+        iter.remove();
+      }
+    }
   }
 
   public boolean isCompactionsEnabled() {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 45b17bb32ad1..f837a2e987a4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -54,6 +54,7 @@
 import java.util.stream.Collectors;
 import java.util.stream.LongStream;
 
+import com.google.errorprone.annotations.RestrictedApi;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -1953,6 +1954,23 @@ private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
     Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
   }
 
+  /**
+   * Remove the files from compacting files. This usually happens when we clear compaction queues.
+   */
+  void removeFromCompactingFiles(Collection<HStoreFile> filesToRemove) {
+    synchronized (filesCompacting) {
+      filesCompacting.removeAll(filesToRemove);
+    }
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*|.*/TestCompaction.java")
+  List<HStoreFile> getFilesCompacting() {
+    synchronized (filesCompacting) {
+      return Lists.newArrayList(filesCompacting);
+    }
+  }
+
   private void removeUnneededFiles() throws IOException {
     if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) {
       return;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
index 2174d514b4b4..fa1cfa108636 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
@@ -553,6 +553,49 @@ public void testMultipleCustomCompactionRequests() throws Exception {
     thread.interruptIfNecessary();
   }
 
+  /**
+   * HBASE-25880: remove files in CompactionContext from filesCompacting
+   * when clearing compaction queues.
+   */
+  @Test
+  public void testRemoveCompactingFilesWhenClearCompactionQueue() throws Exception {
+    // setup a compact/split thread on a mock server
+    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
+    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
+    CompactSplit thread = new CompactSplit(mockServer);
+    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
+
+    // create some store files and setup requests for each store on which we want to do a
+    // compaction
+    for (HStore store : r.getStores()) {
+      createStoreFile(r, store.getColumnFamilyName());
+      createStoreFile(r, store.getColumnFamilyName());
+      createStoreFile(r, store.getColumnFamilyName());
+      thread.requestCompaction(r, store, "test remove compacting files", PRIORITY_USER,
+        new LazyTracker(), null);
+    }
+
+    // TODO(review): confirm why longCompactions also picks up one CompactionRunner here.
+    assertEquals(2, thread.getLongCompactions().getActiveCount()
+        + thread.getShortCompactions().getActiveCount());
+
+    // compaction queues start from the third store.
+    List<HStoreFile> compactingFiles = r.getStores().get(2).getFilesCompacting();
+    Collection<HStoreFile> storeFiles = r.getStores().get(2).getStorefiles();
+    for (HStoreFile file : storeFiles) {
+      assertTrue(compactingFiles.contains(file));
+    }
+
+    thread.clearShortCompactionsQueue();
+
+    compactingFiles = r.getStores().get(2).getFilesCompacting();
+    for (HStoreFile file : storeFiles) {
+      assertFalse(compactingFiles.contains(file));
+    }
+
+    thread.interruptIfNecessary();
+  }
+
   class StoreMockMaker extends StatefulStoreMockMaker {
     public ArrayList<HStoreFile> compacting = new ArrayList<>();
     public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
@@ -874,6 +917,22 @@ public void afterExecution(Store store) {
     }
   }
 
+  /**
+   * Simple {@link CompactionLifeCycleTracker} that sleeps 10s before actually doing the compaction
+   * to keep the compaction queue nonempty.
+   */
+  public static class LazyTracker implements CompactionLifeCycleTracker {
+    @Override
+    public void beforeExecution(Store store) {
+      try {
+        Thread.sleep(10000);
+      } catch (InterruptedException e) {
+        // restore the interrupt status before rethrowing so callers can still observe it
+        Thread.currentThread().interrupt();
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
   /**
    * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
    * finishes.