diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java index 441b18b3302f..7dfdcca7ee5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java @@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequestImpl; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester; import org.apache.hadoop.hbase.regionserver.throttle.CompactionThroughputControllerFactory; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.Superusers; @@ -62,7 +62,7 @@ * Compact region on request and then run split if appropriate */ @InterfaceAudience.Private -public class CompactSplit implements CompactionRequester, PropagatingConfigurationObserver { +public class CompactSplit implements CompactionSplitRequester, PropagatingConfigurationObserver { private static final Logger LOG = LoggerFactory.getLogger(CompactSplit.class); // Configuration key for the large compaction threads. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 22e3901e2240..0cc50972b4cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -6007,7 +6007,7 @@ private long loadRecoveredHFilesIfAny(Collection stores) throws IOExcept } } if (this.rsServices != null && store.needsCompaction()) { - this.rsServices.getCompactionRequestor() + this.rsServices.getCompactionSplitRequester() .requestCompaction(this, store, "load recovered hfiles request compaction", Store.PRIORITY_USER + 1, CompactionLifeCycleTracker.DUMMY, null); } @@ -6934,18 +6934,19 @@ public Map> bulkLoadHFiles(Collection> f // guaranteed to be one beyond the file made when we flushed (or if nothing to flush, it is // a sequence id that we can be sure is beyond the last hfile written). if (assignSeqId) { - FlushResult fs = flushcache(true, false, FlushLifeCycleTracker.DUMMY); - if (fs.isFlushSucceeded()) { - seqId = ((FlushResultImpl)fs).flushSequenceId; - } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { - seqId = ((FlushResultImpl)fs).flushSequenceId; - } else if (fs.getResult() == FlushResult.Result.CANNOT_FLUSH) { + FlushResult flushResult = flushcache(true, false, + FlushLifeCycleTracker.DUMMY); + if (flushResult.isFlushSucceeded()) { + seqId = ((FlushResultImpl)flushResult).flushSequenceId; + } else if (flushResult.getResult() == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) { + seqId = ((FlushResultImpl)flushResult).flushSequenceId; + } else if (flushResult.getResult() == FlushResult.Result.CANNOT_FLUSH) { // CANNOT_FLUSH may mean that a flush is already on-going // we need to wait for that flush to complete waitForFlushes(); } else { throw new IOException("Could not bulk load with an assigned sequential ID because the "+ - "flush didn't run. Reason for not flushing: " + ((FlushResultImpl)fs).failureReason); + "flush didn't run. Reason for not flushing: " + ((FlushResultImpl)flushResult).failureReason); } } @@ -7038,22 +7039,27 @@ public Map> bulkLoadHFiles(Collection> f } isSuccessful = true; - if (conf.getBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false)) { - // request compaction - familyWithFinalPath.keySet().forEach(family -> { - HStore store = getStore(family); - try { - if (this.rsServices != null && store.needsCompaction()) { - this.rsServices.getCompactionRequestor().requestCompaction(this, store, - "bulkload hfiles request compaction", Store.PRIORITY_USER + 1, - CompactionLifeCycleTracker.DUMMY, null); - LOG.debug("bulkload hfiles request compaction region : {}, family : {}", - this.getRegionInfo(), family); + // check split + boolean shouldSplit = checkSplit().isPresent(); + if (shouldSplit) { + this.rsServices.getCompactionSplitRequester().requestSplit(this); + } else { + // request compact + try { + if (this.rsServices != null && conf.getBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false)) { + for (byte[] familyKey : familyWithFinalPath.keySet()) { + HStore store = getStore(familyKey); + if (store != null && store.needsCompaction()) { + this.rsServices.getCompactionSplitRequester() + .requestSystemCompaction(this, "bulk load hfiles"); + LOG.debug("Request compact for region {} after bulk load", getRegionInfo()); + break; + } } - } catch (IOException e) { - LOG.error("bulkload hfiles request compaction error ", e); } - }); + } catch (IOException e) { + LOG.error("bulk load hfiles request compaction error ", e); + } } } finally { if (wal != null && !storeFiles.isEmpty()) { @@ -8348,7 +8354,7 @@ public void requestCompaction(String why, int priority, boolean major, if (major) { stores.values().forEach(HStore::triggerMajorCompaction); } - rsServices.getCompactionRequestor().requestCompaction(this, why, priority, tracker, + rsServices.getCompactionSplitRequester().requestCompaction(this, why, priority, tracker, RpcServer.getRequestUser().orElse(null)); } @@ -8363,7 +8369,7 @@ public void requestCompaction(byte[] family, String why, int priority, boolean m if (major) { store.triggerMajorCompaction(); } - rsServices.getCompactionRequestor().requestCompaction(this, store, why, priority, tracker, + rsServices.getCompactionSplitRequester().requestCompaction(this, store, why, priority, tracker, RpcServer.getRequestUser().orElse(null)); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index a7a3b7dd6db5..a832739d7a01 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -141,7 +141,7 @@ import org.apache.hadoop.hbase.regionserver.compactions.CompactionConfiguration; import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester; import org.apache.hadoop.hbase.regionserver.handler.CloseMetaHandler; import org.apache.hadoop.hbase.regionserver.handler.CloseRegionHandler; import org.apache.hadoop.hbase.regionserver.handler.RSProcedureHandler; @@ -3054,7 +3054,7 @@ public FlushRequester getFlushRequester() { } @Override - public CompactionRequester getCompactionRequestor() { + public CompactionSplitRequester getCompactionSplitRequester() { return this.compactSplitThread; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java index fce8df172643..1084f363e5b5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionServerServices.java @@ -37,7 +37,7 @@ import org.apache.hadoop.hbase.quotas.RegionServerRpcQuotaManager; import org.apache.hadoop.hbase.quotas.RegionServerSpaceQuotaManager; import org.apache.hadoop.hbase.quotas.RegionSizeStore; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; @@ -73,10 +73,10 @@ public interface RegionServerServices extends Server, MutableOnlineRegions, Favo FlushRequester getFlushRequester(); /** - * @return Implementation of {@link CompactionRequester} or null. Usually it will not be null + * @return Implementation of {@link CompactionSplitRequester} or null. Usually it will not be null * unless during intialization. */ - CompactionRequester getCompactionRequestor(); + CompactionSplitRequester getCompactionSplitRequester(); /** * @return the RegionServerAccounting for this Region Server diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionSplitRequester.java similarity index 81% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionSplitRequester.java index e5f536007e87..cfeb09af3593 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionSplitRequester.java @@ -21,16 +21,17 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; import edu.umd.cs.findbugs.annotations.Nullable; /** - * Request a compaction. + * Request compaction and split. */ @InterfaceAudience.Private -public interface CompactionRequester { +public interface CompactionSplitRequester { /** * Request compaction on all the stores of the given region. @@ -44,6 +45,16 @@ void requestCompaction(HRegion region, String why, int priority, void requestCompaction(HRegion region, HStore store, String why, int priority, CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException; + /** + * Request compaction on the given region. + */ + void requestSystemCompaction(HRegion region, String why) throws IOException; + + /** + * Request split on the given region. + */ + boolean requestSplit(final Region r) throws IOException; + /** * on/off compaction */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java index 82f7a85df39c..890e4ef361ab 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/MockRegionServerServices.java @@ -53,7 +53,7 @@ import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; @@ -157,7 +157,7 @@ public FlushRequester getFlushRequester() { } @Override - public CompactionRequester getCompactionRequestor() { + public CompactionSplitRequester getCompactionSplitRequester() { return null; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java index 69a7a79644e2..90f7d37bc2a4 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java @@ -67,7 +67,7 @@ import org.apache.hadoop.hbase.regionserver.ReplicationSourceService; import org.apache.hadoop.hbase.regionserver.SecureBulkLoadManager; import org.apache.hadoop.hbase.regionserver.ServerNonceManager; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester; import org.apache.hadoop.hbase.regionserver.throttle.ThroughputController; import org.apache.hadoop.hbase.security.access.AccessChecker; import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; @@ -328,7 +328,7 @@ public FlushRequester getFlushRequester() { return null; } @Override - public CompactionRequester getCompactionRequestor() { + public CompactionSplitRequester getCompactionSplitRequester() { return null; } @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java index c736513e97ae..9899fa38f231 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java @@ -19,19 +19,16 @@ import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyInt; -import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import static org.mockito.hamcrest.MockitoHamcrest.argThat; - import java.io.IOException; import java.util.ArrayList; import java.util.List; - import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.TableName; @@ -39,8 +36,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.RegionInfoBuilder; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionSplitRequester; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.wal.WALEdit; @@ -55,7 +51,7 @@ @Category(SmallTests.class) public class TestCompactionAfterBulkLoad extends TestBulkloadBase { private final RegionServerServices regionServerServices = mock(RegionServerServices.class); - private final CompactionRequester compactionRequester = mock(CompactSplit.class); + private final CompactionSplitRequester compactionSplitRequester = mock(CompactSplit.class); @ClassRule public static final HBaseClassTestRule CLASS_RULE = @@ -88,7 +84,7 @@ public void shouldRequestCompactionAfterBulkLoad() throws IOException { try { conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true); when(regionServerServices.getConfiguration()).thenReturn(conf); - when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester); + when(regionServerServices.getCompactionSplitRequester()).thenReturn(compactionSplitRequester); when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD)))) .thenAnswer(new Answer() { @Override @@ -103,12 +99,10 @@ public Object answer(InvocationOnMock invocation) { } }); - Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(), - any(), any()); - testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null); - // invoke three times for 3 families - verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class), - isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null)); + Mockito.doNothing().when(compactionSplitRequester).requestSystemCompaction(any(), any()); + testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, true, null); + // invoke three times for 1 families + verify(compactionSplitRequester, times(1)).requestSystemCompaction(isA(HRegion.class), anyString()); } finally { conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false); }