Skip to content

Commit 3477629

Browse files
author
niuyulin
committed
HBASE-25603 Add switch for compaction after bulkload
1 parent 51a3d45 commit 3477629

File tree

2 files changed

+46
-32
lines changed

2 files changed

+46
-32
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 20 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
244244
public static final String WAL_HSYNC_CONF_KEY = "hbase.wal.hsync";
245245
public static final boolean DEFAULT_WAL_HSYNC = false;
246246

247+
/** Parameter name for compaction after bulkload */
248+
public static final String COMPACTION_AFTER_BULKLOAD_ENABLE =
249+
"hbase.compaction.after.bulkload.enable";
250+
247251
/**
248252
* This is for for using HRegion as a local storage, where we may put the recovered edits in a
249253
* special place. Once this is set, we will only replay the recovered edits under this directory
@@ -7025,19 +7029,23 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
70257029
}
70267030

70277031
isSuccessful = true;
7028-
//request compaction
7029-
familyWithFinalPath.keySet().forEach(family -> {
7030-
HStore store = getStore(family);
7031-
try {
7032-
if (this.rsServices != null && store.needsCompaction()) {
7033-
this.rsServices.getCompactionRequestor().requestCompaction(this, store,
7034-
"bulkload hfiles request compaction", Store.PRIORITY_USER + 1,
7035-
CompactionLifeCycleTracker.DUMMY, null);
7032+
if (conf.getBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true)) {
7033+
// request compaction
7034+
familyWithFinalPath.keySet().forEach(family -> {
7035+
HStore store = getStore(family);
7036+
try {
7037+
if (this.rsServices != null && store.needsCompaction()) {
7038+
this.rsServices.getCompactionRequestor().requestCompaction(this, store,
7039+
"bulkload hfiles request compaction", Store.PRIORITY_USER + 1,
7040+
CompactionLifeCycleTracker.DUMMY, null);
7041+
LOG.debug("bulkload hfiles request compaction region : {}, family : {}",
7042+
this.getRegionInfo(), family);
7043+
}
7044+
} catch (IOException e) {
7045+
LOG.error("bulkload hfiles request compaction error ", e);
70367046
}
7037-
} catch (IOException e) {
7038-
LOG.error("bulkload hfiles request compaction error ", e);
7039-
}
7040-
});
7047+
});
7048+
}
70417049
} finally {
70427050
if (wal != null && !storeFiles.isEmpty()) {
70437051
// Write a bulk load event for hfiles that are loaded

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java

Lines changed: 26 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.hadoop.hbase.regionserver;
1919

20+
import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE;
2021
import static org.mockito.ArgumentMatchers.any;
2122
import static org.mockito.ArgumentMatchers.anyInt;
2223
import static org.mockito.ArgumentMatchers.eq;
@@ -84,27 +85,32 @@ public void shouldRequestCompactionAfterBulkLoad() throws IOException {
8485
for (int i = 0; i < 5; i++) {
8586
familyPaths.addAll(withFamilyPathsFor(family1, family2, family3));
8687
}
87-
when(regionServerServices.getConfiguration()).thenReturn(conf);
88-
when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester);
89-
when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
90-
.thenAnswer(new Answer() {
91-
@Override
92-
public Object answer(InvocationOnMock invocation) {
93-
WALKeyImpl walKey = invocation.getArgument(1);
94-
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
95-
if (mvcc != null) {
96-
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
97-
walKey.setWriteEntry(we);
88+
try {
89+
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
90+
when(regionServerServices.getConfiguration()).thenReturn(conf);
91+
when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester);
92+
when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
93+
.thenAnswer(new Answer() {
94+
@Override
95+
public Object answer(InvocationOnMock invocation) {
96+
WALKeyImpl walKey = invocation.getArgument(1);
97+
MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
98+
if (mvcc != null) {
99+
MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
100+
walKey.setWriteEntry(we);
101+
}
102+
return 01L;
98103
}
99-
return 01L;
100-
}
101-
});
104+
});
102105

103-
Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(),
104-
any(), any());
105-
testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null);
106-
// invoke three times for 3 families
107-
verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class),
108-
isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null));
106+
Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(),
107+
any(), any());
108+
testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null);
109+
// invoke three times for 3 families
110+
verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class),
111+
isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null));
112+
} finally {
113+
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
114+
}
109115
}
110116
}

0 commit comments

Comments
 (0)