File: CompactSplit.java
@@ -42,7 +42,7 @@
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester;
import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.Superusers;
@@ -62,7 +62,7 @@
* Compact region on request and then run split if appropriate
*/
@InterfaceAudience.Private
-public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver {
+public class CompactSplit implements CompactionSplitRequester, PropagatingConfigurationObserver {
private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class);

// Configuration key for the large compaction threads.
File: HRegion.java
@@ -6007,7 +6007,7 @@ private long loadRecoveredHFilesIfAny(Collection<HStore> stores) throws IOExcept
}
}
if (this.rsServices != null && store.needsCompaction()) {
-      this.rsServices.getCompactionRequestor()
+      this.rsServices.getCompactionSplitRequester()
.requestCompaction(this, store, "load recovered hfiles request compaction",
Store.PRIORITY_USER + 1, CompactionLifeCycleTracker.DUMMY, null);
}
@@ -6934,18 +6934,19 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
// guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is
// a sequence id that we can be sure is beyond the last hfile written).
if (assignSeqId) {
-      FlushResult fs = flushcache(true, false, FlushLifeCycleTracker.DUMMY);
-      if (fs.isFlushSucceeded()) {
-        seqId = ((FlushResultImpl)fs).flushSequenceId;
-      } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
-        seqId = ((FlushResultImpl)fs).flushSequenceId;
-      } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH) {
+      FlushResult flushResult = flushcache(true, false,
+        FlushLifeCycleTracker.DUMMY);
+      if (flushResult.isFlushSucceeded()) {
+        seqId = ((FlushResultImpl)flushResult).flushSequenceId;
+      } else if (flushResult.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
+        seqId = ((FlushResultImpl)flushResult).flushSequenceId;
+      } else if (flushResult.getResult() == FlushResult.Result.CANNOT_FLUSH) {
// CANNOT_FLUSH may mean that a flush is already on-going
// we need to wait for that flush to complete
waitForFlushes();
} else {
throw new IOException("Could not bulk load with an assigned sequential ID because the "+
"flush didn't run. Reason for not flushing: " + ((FlushResultImpl)fs).failureReason);
"flush didn't run. Reason for not flushing: " + ((FlushResultImpl)flushResult).failureReason);
}
}

@@ -7038,22 +7039,27 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
}

isSuccessful = true;
-      if (conf.getBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false)) {
-        // request compaction
-        familyWithFinalPath.keySet().forEach(family -> {
-          HStore store = getStore(family);
-          try {
-            if (this.rsServices != null && store.needsCompaction()) {
-              this.rsServices.getCompactionRequestor().requestCompaction(this, store,
-                "bulkload hfiles request compaction", Store.PRIORITY_USER + 1,
-                CompactionLifeCycleTracker.DUMMY, null);
-              LOG.debug("bulkload hfiles request compaction region : {}, family : {}",
-                this.getRegionInfo(), family);
-            }
-          } catch (IOException e) {
-            LOG.error("bulkload hfiles request compaction error ", e);
-          }
-        });
-      }
+      // check split
+      boolean shouldSplit = checkSplit().isPresent();
+      if (shouldSplit) {
+        this.rsServices.getCompactionSplitRequester().requestSplit(this);
+      } else {
+        // request compact
+        try {
+          if (this.rsServices != null && conf.getBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false)) {
+            for (byte[] familyKey : familyWithFinalPath.keySet()) {
+              HStore store = getStore(familyKey);
+              if (store != null && store.needsCompaction()) {
+                this.rsServices.getCompactionSplitRequester()
+                  .requestSystemCompaction(this, "bulk load hfiles");
+                LOG.debug("Request compact for region {} after bulk load", getRegionInfo());
+                break;
+              }
+            }
+          }
+        } catch (IOException e) {
+          LOG.error("bulk load hfiles request compaction error ", e);
+        }
+      }
} finally {
if (wal != null && !storeFiles.isEmpty()) {
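The rewritten block gives a split precedence over compaction after a bulk load (a split rewrites the newly loaded files anyway), and replaces the old per-family user-priority requests with at most one system compaction for the whole region; the break exits after the first store that needs one. A minimal sketch of the resulting policy, assuming region, rsServices, and conf are wired as in HRegion:

if (region.checkSplit().isPresent()) {
  // Split takes precedence: the daughter regions rewrite the files anyway.
  rsServices.getCompactionSplitRequester().requestSplit(region);
} else if (conf.getBoolean(HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE, false)) {
  // Otherwise queue at most one background compaction for the whole region.
  rsServices.getCompactionSplitRequester().requestSystemCompaction(region, "bulk load hfiles");
}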
@@ -8348,7 +8354,7 @@ public void requestCompaction(String why, int priority, boolean major,
if (major) {
stores.values().forEach(HStore::triggerMajorCompaction);
}
-    rsServices.getCompactionRequestor().requestCompaction(this, why, priority, tracker,
+    rsServices.getCompactionSplitRequester().requestCompaction(this, why, priority, tracker,
RpcServer.getRequestUser().orElse(null));
}

@@ -8363,7 +8369,7 @@ public void requestCompaction(byte[] family, String why, int priority, boolean m
if (major) {
store.triggerMajorCompaction();
}
-    rsServices.getCompactionRequestor().requestCompaction(this, store, why, priority, tracker,
+    rsServices.getCompactionSplitRequester().requestCompaction(this, store, why, priority, tracker,
RpcServer.getRequestUser().orElse(null));
}

File: HRegionServer.java
@@ -141,7 +141,7 @@
import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester;
import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler;
import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler;
import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler;
@@ -3054,7 +3054,7 @@ public FlushRequester getFlushRequester() {
}

@Override
-  public CompactionRequester getCompactionRequestor() {
+  public CompactionSplitRequester getCompactionSplitRequester() {
return this.compactSplitThread;
}

File: RegionServerServices.java
@@ -37,7 +37,7 @@
import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager;
import org.apache.hadoop.hbase.quotas.RegionSizeStore;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
@@ -73,10 +73,10 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo
FlushRequester getFlushRequester();

/**
- * @return Implementation of {@link CompactionRequester} or null. Usually it will not be null
+ * @return Implementation of {@link CompactionSplitRequester} or null. Usually it will not be null
 *         unless during initialization.
*/
-  CompactionRequester getCompactionRequestor();
+  CompactionSplitRequester getCompactionSplitRequester();
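A minimal usage sketch (hypothetical caller, not part of this change): because the return value may still be null while the region server is initializing, callers guard before dereferencing. Here rsServices, region, and store are stand-ins for live instances.

CompactionSplitRequester requester = rsServices.getCompactionSplitRequester();
if (requester != null && store.needsCompaction()) {
  // During startup the requester may not be wired up yet, hence the null check.
  requester.requestCompaction(region, store, "housekeeping", Store.PRIORITY_USER,
      CompactionLifeCycleTracker.DUMMY, null);
}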

/**
* @return the RegionServerAccounting for this Region Server
File: CompactionRequester.java → CompactionSplitRequester.java
@@ -21,16 +21,17 @@

import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.security.User;
import org.apache.yetus.audience.InterfaceAudience;

import edu.umd.cs.findbugs.annotations.Nullable;

/**
- * Request a compaction.
+ * Request compaction and split.
*/
@InterfaceAudience.Private
-public interface CompactionRequester {
+public interface CompactionSplitRequester {

/**
* Request compaction on all the stores of the given region.
@@ -44,6 +44,16 @@ void requestCompaction(HRegion region, String why, int priority,
void requestCompaction(HRegion region, HStore store, String why, int priority,
CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException;

+  /**
+   * Request a system compaction on the given region.
+   */
+  void requestSystemCompaction(HRegion region, String why) throws IOException;
+
+  /**
+   * Request a split of the given region.
+   */
+  boolean requestSplit(final Region r) throws IOException;
+
/**
* on/off compaction
*/
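For contrast between the entry points the renamed interface now exposes, a short sketch (hypothetical caller; requester, region, and store are stand-ins for real instances):

// User-facing request: caller supplies a priority, life-cycle tracker and user.
requester.requestCompaction(region, store, "user request", Store.PRIORITY_USER,
    CompactionLifeCycleTracker.DUMMY, null);
// Internal housekeeping (e.g. after a bulk load): no tracker or user; the
// implementation decides how to queue it.
requester.requestSystemCompaction(region, "bulk load hfiles");
// Split, once the region outgrows its split policy.
if (region.checkSplit().isPresent()) {
  requester.requestSplit(region);
}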
File: (RegionServerServices test double)
@@ -53,7 +53,7 @@
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
@@ -157,7 +157,7 @@ public FlushRequester getFlushRequester() {
}

@Override
-  public CompactionRequester getCompactionRequestor() {
+  public CompactionSplitRequester getCompactionSplitRequester() {
return null;
}

File: (second RegionServerServices test double)
@@ -67,7 +67,7 @@
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager;
import org.apache.hadoop.hbase.regionserver.ServerNonceManager;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester;
import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher;
@@ -328,7 +328,7 @@ public FlushRequester getFlushRequester() {
return null;
}
@Override
-  public CompactionRequester getCompactionRequestor() {
+  public CompactionSplitRequester getCompactionSplitRequester() {
return null;
}
@Override
File: TestCompactionAfterBulkLoad.java
@@ -19,28 +19,24 @@

import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.hamcrest.MockitoHamcrest.argThat;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -55,7 +51,7 @@
@Category(SmallTests.class)
public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
-  private final CompactionRequester compactionRequester = mock(CompactSplit.class);
+  private final CompactionSplitRequester compactionSplitRequester = mock(CompactSplit.class);

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
@@ -88,7 +84,7 @@ public void shouldRequestCompactionAfterBulkLoad() throws IOException {
try {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
when(regionServerServices.getConfiguration()).thenReturn(conf);
-      when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester);
+      when(regionServerServices.getCompactionSplitRequester()).thenReturn(compactionSplitRequester);
when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
.thenAnswer(new Answer() {
@Override
@@ -103,12 +99,10 @@ public Object answer(InvocationOnMock invocation) {
}
});

-      Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(),
-        any(), any());
-      testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null);
-      // invoke three times for 3 families
-      verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class),
-        isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null));
+      Mockito.doNothing().when(compactionSplitRequester).requestSystemCompaction(any(), any());
+      testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, true, null);
+      // invoked only once for the whole region, not once per family
+      verify(compactionSplitRequester, times(1)).requestSystemCompaction(isA(HRegion.class), anyString());
} finally {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
}
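The updated test pins down only the compaction branch. A companion assertion for the split branch might look like this sketch (hypothetical, not part of this patch; it assumes a region whose checkSplit() reports a split point, plus a static import of Mockito's never()):

Mockito.doReturn(true).when(compactionSplitRequester).requestSplit(any());
testRegionWithFamilies(family1).bulkLoadHFiles(familyPaths, true, null);
// With a pending split, no system compaction should be queued.
verify(compactionSplitRequester, never()).requestSystemCompaction(any(), anyString());
verify(compactionSplitRequester, times(1)).requestSplit(isA(HRegion.class));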