Skip to content

Commit 1aff89e

Browse files
committed
HBASE-25709 Close region may stuck when region is compacting and skipped most cells read (#3117)
Signed-off-by: Andrew Purtell <[email protected]>
1 parent 0a7e49c commit 1aff89e

File tree

3 files changed

+144
-0
lines changed

3 files changed

+144
-0
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -756,6 +756,11 @@ public boolean next(List<Cell> outResult, ScannerContext scannerContext) throws
756756
default:
757757
throw new RuntimeException("UNEXPECTED");
758758
}
759+
760+
// when reaching the heartbeat cells, try to return from the loop.
761+
if (kvsScanned % cellsPerHeartbeatCheck == 0) {
762+
return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();
763+
}
759764
} while ((cell = this.heap.peek()) != null);
760765

761766
if (count > 0) {

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7029,6 +7029,74 @@ public void testCellTTLs() throws IOException {
70297029
assertNull(r.getValue(fam1, q1));
70307030
}
70317031

7032+
@Test
7033+
public void testTTLsUsingSmallHeartBeatCells() throws IOException {
7034+
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
7035+
EnvironmentEdgeManager.injectEdge(edge);
7036+
7037+
final byte[] row = Bytes.toBytes("testRow");
7038+
final byte[] q1 = Bytes.toBytes("q1");
7039+
final byte[] q2 = Bytes.toBytes("q2");
7040+
final byte[] q3 = Bytes.toBytes("q3");
7041+
final byte[] q4 = Bytes.toBytes("q4");
7042+
final byte[] q5 = Bytes.toBytes("q5");
7043+
final byte[] q6 = Bytes.toBytes("q6");
7044+
final byte[] q7 = Bytes.toBytes("q7");
7045+
final byte[] q8 = Bytes.toBytes("q8");
7046+
7047+
// 10 seconds
7048+
int ttlSecs = 10;
7049+
TableDescriptor tableDescriptor =
7050+
TableDescriptorBuilder.newBuilder(TableName.valueOf(name.getMethodName())).setColumnFamily(
7051+
ColumnFamilyDescriptorBuilder.newBuilder(fam1).setTimeToLive(ttlSecs).build()).build();
7052+
7053+
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
7054+
conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
7055+
// using small heart beat cells
7056+
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);
7057+
7058+
region = HBaseTestingUtility
7059+
.createRegionAndWAL(RegionInfoBuilder.newBuilder(tableDescriptor.getTableName()).build(),
7060+
TEST_UTIL.getDataTestDir(), conf, tableDescriptor);
7061+
assertNotNull(region);
7062+
long now = EnvironmentEdgeManager.currentTime();
7063+
// Add a cell that will expire in 5 seconds via cell TTL
7064+
region.put(new Put(row).addColumn(fam1, q1, now, HConstants.EMPTY_BYTE_ARRAY));
7065+
region.put(new Put(row).addColumn(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
7066+
region.put(new Put(row).addColumn(fam1, q3, now, HConstants.EMPTY_BYTE_ARRAY));
7067+
// Add a cell that will expire after 10 seconds via family setting
7068+
region
7069+
.put(new Put(row).addColumn(fam1, q4, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
7070+
region
7071+
.put(new Put(row).addColumn(fam1, q5, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
7072+
7073+
region.put(new Put(row).addColumn(fam1, q6, now, HConstants.EMPTY_BYTE_ARRAY));
7074+
region.put(new Put(row).addColumn(fam1, q7, now, HConstants.EMPTY_BYTE_ARRAY));
7075+
region
7076+
.put(new Put(row).addColumn(fam1, q8, now + ttlSecs * 1000 + 1, HConstants.EMPTY_BYTE_ARRAY));
7077+
7078+
// Flush so we are sure store scanning gets this right
7079+
region.flush(true);
7080+
7081+
// A query at time T+0 should return all cells
7082+
checkScan(8);
7083+
7084+
// Increment time to T+ttlSecs seconds
7085+
edge.incrementTime(ttlSecs * 1000);
7086+
checkScan(3);
7087+
}
7088+
7089+
private void checkScan(int expectCellSize) throws IOException{
7090+
Scan s = new Scan().withStartRow(row);
7091+
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
7092+
ScannerContext scannerContext = contextBuilder.build();
7093+
RegionScanner scanner = region.getScanner(s);
7094+
List<Cell> kvs = new ArrayList<>();
7095+
scanner.next(kvs, scannerContext);
7096+
assertEquals(expectCellSize, kvs.size());
7097+
scanner.close();
7098+
}
7099+
70327100
@Test
70337101
public void testIncrementTimestampsAreMonotonic() throws IOException {
70347102
region = initHRegion(tableName, method, CONF, fam1);

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHStore.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1279,6 +1279,77 @@ public long getSmallestReadPoint(HStore store) {
12791279
}
12801280
}
12811281

1282+
@Test
1283+
public void testPreventLoopRead() throws Exception {
1284+
init(this.name.getMethodName());
1285+
Configuration conf = HBaseConfiguration.create();
1286+
// use small heart beat cells
1287+
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 2);
1288+
IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
1289+
EnvironmentEdgeManager.injectEdge(edge);
1290+
byte[] r0 = Bytes.toBytes("row0");
1291+
byte[] value0 = Bytes.toBytes("value0");
1292+
byte[] value1 = Bytes.toBytes("value1");
1293+
MemStoreSizing memStoreSizing = new NonThreadSafeMemStoreSizing();
1294+
long ts = EnvironmentEdgeManager.currentTime();
1295+
long seqId = 100;
1296+
init(name.getMethodName(), conf, TableDescriptorBuilder.newBuilder(TableName.valueOf(table)),
1297+
ColumnFamilyDescriptorBuilder.newBuilder(family).setTimeToLive(10).build(),
1298+
new MyStoreHook() {
1299+
@Override public long getSmallestReadPoint(HStore store) {
1300+
return seqId + 3;
1301+
}
1302+
});
1303+
// The cells having the value0 will be expired
1304+
store.add(createCell(r0, qf1, ts, seqId, value0), memStoreSizing);
1305+
store.add(createCell(r0, qf2, ts, seqId, value0), memStoreSizing);
1306+
store.add(createCell(r0, qf3, ts, seqId, value0), memStoreSizing);
1307+
store.add(createCell(r0, qf4, ts + 10000 + 1, seqId, value1), memStoreSizing);
1308+
store.add(createCell(r0, qf5, ts, seqId, value0), memStoreSizing);
1309+
store.add(createCell(r0, qf6, ts + 10000 + 1, seqId, value1), memStoreSizing);
1310+
1311+
List<Cell> myList = new ArrayList<>();
1312+
Scan scan = new Scan().withStartRow(r0);
1313+
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(false);
1314+
// test normal scan, should return all the cells
1315+
ScannerContext scannerContext = contextBuilder.build();
1316+
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null,
1317+
seqId + 3)) {
1318+
scanner.next(myList, scannerContext);
1319+
assertEquals(6, myList.size());
1320+
}
1321+
1322+
// test skip two ttl cells and return with empty results, default prevent loop skip is on
1323+
edge.incrementTime(10 * 1000);
1324+
scannerContext = contextBuilder.build();
1325+
myList.clear();
1326+
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null,
1327+
seqId + 3)) {
1328+
// r0
1329+
scanner.next(myList, scannerContext);
1330+
assertEquals(0, myList.size());
1331+
}
1332+
1333+
// should scan all non-ttl expired cells by iterative next
1334+
int resultCells = 0;
1335+
try (InternalScanner scanner = (InternalScanner) store.getScanner(scan, null,
1336+
seqId + 3)) {
1337+
boolean hasMore = true;
1338+
while (hasMore) {
1339+
myList.clear();
1340+
hasMore = scanner.next(myList, scannerContext);
1341+
assertTrue(myList.size() < 6);
1342+
resultCells += myList.size();
1343+
}
1344+
for (Cell c : myList) {
1345+
byte[] actualValue = CellUtil.cloneValue(c);
1346+
assertTrue("expected:" + Bytes.toStringBinary(value1) + ", actual:" + Bytes
1347+
.toStringBinary(actualValue), Bytes.equals(actualValue, value1));
1348+
}
1349+
}
1350+
assertEquals(2, resultCells);
1351+
}
1352+
12821353
@Test
12831354
public void testCreateScannerAndSnapshotConcurrently() throws IOException, InterruptedException {
12841355
Configuration conf = HBaseConfiguration.create();

0 commit comments

Comments
 (0)