@@ -24,6 +24,7 @@
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
@@ -794,11 +795,27 @@ void shutdownLongCompactions(){
  }

  public void clearLongCompactionsQueue() {
    longCompactions.getQueue().clear();
    removeFromFilesCompacting(longCompactions);
  }

  public void clearShortCompactionsQueue() {
    shortCompactions.getQueue().clear();
    removeFromFilesCompacting(shortCompactions);
  }

  private void removeFromFilesCompacting(ThreadPoolExecutor compactor) {
    Iterator<Runnable> iter = compactor.getQueue().iterator();
    while (iter.hasNext()) {
      Runnable runnable = iter.next();
      if (!(runnable instanceof CompactionRunner)) {
        continue;
      }
      CompactionRunner runner = (CompactionRunner) runnable;
      if (runner.compaction != null && runner.compaction.hasSelection()) {

Contributor: No data race here? Not sure, just asking.

Contributor Author: Thanks for taking a look at this PR. I don't think there is a data race here. It's safe. Thanks.

        Collection<HStoreFile> files = runner.compaction.getRequest().getFiles();
        runner.store.removeFromCompactingFiles(files);
        iter.remove();
      }
    }
  }

  public boolean isCompactionsEnabled() {
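
On the data-race question in the review thread above: the cleanup added here only mutates shared state through HStore#removeFromCompactingFiles (added in the next file), which takes the same filesCompacting monitor as the existing addToCompactingFiles. A minimal sketch of that locking pattern, with simplified names used purely for illustration (this is not the actual HStore class):

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Illustration of the lock discipline assumed in the "no data race" reply:
// adding files (when a compaction is selected) and removing them (when the
// queue is cleared) both hold the filesCompacting monitor, so neither can
// observe the list mid-update.
class CompactingFilesBookkeeping<F> {
  private final List<F> filesCompacting = new ArrayList<>();

  void addToCompactingFiles(Collection<F> filesToAdd) {
    synchronized (filesCompacting) {
      filesCompacting.addAll(filesToAdd);
    }
  }

  void removeFromCompactingFiles(Collection<F> filesToRemove) {
    synchronized (filesCompacting) {
      filesCompacting.removeAll(filesToRemove);
    }
  }
}
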
@@ -54,6 +54,7 @@
import java.util.stream.Collectors;
import java.util.stream.LongStream;

import com.google.errorprone.annotations.RestrictedApi;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -1953,6 +1954,23 @@ private void addToCompactingFiles(Collection<HStoreFile> filesToAdd) {
    Collections.sort(filesCompacting, storeEngine.getStoreFileManager().getStoreFileComparator());
  }

  /**
   * Remove the given files from the set of compacting files. This usually happens when we clear
   * the compaction queues.
   */
  void removeFromCompactingFiles(Collection<HStoreFile> filesToRemove) {
    synchronized (filesCompacting) {
      filesCompacting.removeAll(filesToRemove);
    }
  }

  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*|.*/TestCompaction.java")
  List<HStoreFile> getFilesCompacting() {
    synchronized (filesCompacting) {
      return Lists.newArrayList(filesCompacting);
    }
  }

  private void removeUnneededFiles() throws IOException {
    if (!conf.getBoolean("hbase.store.delete.expired.storefile", true)) {
      return;
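
For context on the @RestrictedApi guard used on getFilesCompacting above: with Error Prone's RestrictedApiChecker enabled, calling an annotated method from a file whose path does not match allowedOnPath fails the build, which is how the new accessor stays test-only. A minimal sketch, assuming the com.google.errorprone:error_prone_annotations dependency and the checker are on the build path (the class and path pattern below are illustrative, not from the patch):

import com.google.errorprone.annotations.RestrictedApi;
import java.util.Collections;
import java.util.List;

class RestrictedAccessorExample {
  // Compiles everywhere, but call sites outside src/test (per allowedOnPath)
  // are rejected by the RestrictedApiChecker at build time.
  @RestrictedApi(explanation = "Should only be called in tests", link = "",
      allowedOnPath = ".*/src/test/.*")
  List<String> internalStateForTests() {
    return Collections.emptyList();
  }
}
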
@@ -553,6 +553,49 @@ public void testMultipleCustomCompactionRequests() throws Exception {
    thread.interruptIfNecessary();
  }

  /**
   * HBASE-25880: remove the files in the CompactionContext from filesCompacting when the
   * compaction queues are cleared.
   */
  @Test
  public void testRemoveCompactingFilesWhenClearCompactionQueue() throws Exception {
    // set up a compact/split thread on a mock server
    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
    CompactSplit thread = new CompactSplit(mockServer);
    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);

    // create some store files and set up a compaction request for each store
    for (HStore store : r.getStores()) {
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      createStoreFile(r, store.getColumnFamilyName());
      thread.requestCompaction(r, store, "test remove compacting files", PRIORITY_USER,
        new LazyTracker(), null);
    }

    // I am a little confused here: why does longCompactions take one CompactionRunner?
    // (See the pool-routing sketch after this test.)
    assertEquals(2, thread.getLongCompactions().getActiveCount() +
      thread.getShortCompactions().getActiveCount());

    // the queued (not yet running) compactions therefore start from the third store
    List<HStoreFile> compactingFiles = r.getStores().get(2).getFilesCompacting();
    Collection<HStoreFile> storeFiles = r.getStores().get(2).getStorefiles();
    for (HStoreFile file : storeFiles) {
      assertTrue(compactingFiles.contains(file));
    }

    thread.clearShortCompactionsQueue();

    compactingFiles = r.getStores().get(2).getFilesCompacting();
    for (HStoreFile file : storeFiles) {
      assertFalse(compactingFiles.contains(file));
    }

    thread.interruptIfNecessary();
  }
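
On the "why does longCompactions take one CompactionRunner" comment in the test above: CompactSplit decides between its long and short pools based on the total size of the selected files relative to the throttle point (configured via hbase.regionserver.thread.compaction.throttle), so one of the immediately running requests can end up on the long-compaction pool. A rough, illustrative sketch of that routing rule with simplified names (the real logic lives in CompactSplit's request handling, not in this helper):

import java.util.concurrent.ThreadPoolExecutor;

class PoolRoutingSketch {
  // Requests whose selected files exceed the throttle point go to the
  // long-compaction pool; everything else goes to the short pool.
  static ThreadPoolExecutor choosePool(long requestSizeBytes, long throttlePointBytes,
      ThreadPoolExecutor longCompactions, ThreadPoolExecutor shortCompactions) {
    return requestSizeBytes > throttlePointBytes ? longCompactions : shortCompactions;
  }
}
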

  class StoreMockMaker extends StatefulStoreMockMaker {
    public ArrayList<HStoreFile> compacting = new ArrayList<>();
    public ArrayList<HStoreFile> notCompacting = new ArrayList<>();
@@ -874,6 +917,21 @@ public void afterExecution(Store store) {
    }
  }

  /**
   * Simple {@link CompactionLifeCycleTracker} that sleeps for 10s before actually doing the
   * compaction, to keep the compaction queue nonempty.
   */
  public static class LazyTracker implements CompactionLifeCycleTracker {
    @Override
    public void beforeExecution(Store store) {
      try {
        Thread.sleep(10000);
      } catch (InterruptedException e) {
        throw new RuntimeException(e);
      }
    }
  }
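
As a side note on the design of LazyTracker: the fixed 10-second sleep keeps the queue populated long enough for the assertions, but a latch-based tracker would make the test independent of timing. A purely illustrative alternative (not part of the patch), assuming the same CompactionLifeCycleTracker and Store interfaces already imported by this test:

  import java.util.concurrent.CountDownLatch;

  public static class BlockingTracker implements CompactionLifeCycleTracker {
    private final CountDownLatch gate = new CountDownLatch(1);

    @Override
    public void beforeExecution(Store store) {
      try {
        // park the compaction runner until the test releases the gate,
        // so the queued requests stay queued while assertions run
        gate.await();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new RuntimeException(e);
      }
    }

    public void release() {
      gate.countDown();
    }
  }
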

  /**
   * Simple {@link CompactionLifeCycleTracker} on which you can wait until the requested compaction
   * finishes.