Skip to content

Commit 7620725

Browse files
authored
HBASE-27558 Scan quotas and limits should account for total block IO (#4967)
Signed-off-by: Duo Zhang <[email protected]>
1 parent 382681e commit 7620725

File tree

15 files changed

+553
-30
lines changed

15 files changed

+553
-30
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/io/HalfStoreFileReader.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.IOException;
2121
import java.nio.ByteBuffer;
2222
import java.util.Optional;
23+
import java.util.function.IntConsumer;
2324
import org.apache.hadoop.conf.Configuration;
2425
import org.apache.hadoop.hbase.Cell;
2526
import org.apache.hadoop.hbase.HConstants;
@@ -277,6 +278,11 @@ public void close() {
277278
public void shipped() throws IOException {
278279
this.delegate.shipped();
279280
}
281+
282+
@Override
283+
public void recordBlockSize(IntConsumer blockSizeConsumer) {
284+
this.delegate.recordBlockSize(blockSizeConsumer);
285+
}
280286
};
281287
}
282288

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileReaderImpl.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.nio.ByteBuffer;
2828
import java.util.ArrayList;
2929
import java.util.Optional;
30+
import java.util.function.IntConsumer;
3031
import org.apache.hadoop.conf.Configurable;
3132
import org.apache.hadoop.conf.Configuration;
3233
import org.apache.hadoop.fs.Path;
@@ -336,6 +337,9 @@ protected static class HFileScannerImpl implements HFileScanner {
336337
// RegionScannerImpl#handleException). Call the releaseIfNotCurBlock() to release the
337338
// unreferenced block please.
338339
protected HFileBlock curBlock;
340+
// Whether we returned a result for curBlock's size in recordBlockSize().
341+
// It is reset whenever curBlock is changed.
342+
private boolean providedCurrentBlockSize = false;
339343
// Previous blocks that were used in the course of the read
340344
protected final ArrayList<HFileBlock> prevBlocks = new ArrayList<>();
341345

@@ -355,6 +359,7 @@ void updateCurrBlockRef(HFileBlock block) {
355359
prevBlocks.add(this.curBlock);
356360
}
357361
this.curBlock = block;
362+
this.providedCurrentBlockSize = false;
358363
}
359364

360365
void reset() {
@@ -415,6 +420,14 @@ public void close() {
415420
this.returnBlocks(true);
416421
}
417422

423+
@Override
424+
public void recordBlockSize(IntConsumer blockSizeConsumer) {
425+
if (!providedCurrentBlockSize && curBlock != null) {
426+
providedCurrentBlockSize = true;
427+
blockSizeConsumer.accept(curBlock.getUncompressedSizeWithoutHeader());
428+
}
429+
}
430+
418431
// Returns the #bytes in HFile for the current cell. Used to skip these many bytes in current
419432
// HFile block's buffer so as to position to the next cell.
420433
private int getCurCellSerializedSize() {

hbase-server/src/main/java/org/apache/hadoop/hbase/io/hfile/HFileScanner.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.io.Closeable;
2121
import java.io.IOException;
2222
import java.nio.ByteBuffer;
23+
import java.util.function.IntConsumer;
2324
import org.apache.hadoop.hbase.Cell;
2425
import org.apache.hadoop.hbase.regionserver.Shipper;
2526
import org.apache.yetus.audience.InterfaceAudience;
@@ -140,4 +141,11 @@ public interface HFileScanner extends Shipper, Closeable {
140141
*/
141142
@Override
142143
void close();
144+
145+
/**
146+
* Record the size of the current block in bytes, passing it to the blockSizeConsumer.
147+
* Implementations should ensure that blockSizeConsumer is only called once per block.
148+
* @param blockSizeConsumer to be called with block size in bytes, once per block.
149+
*/
150+
void recordBlockSize(IntConsumer blockSizeConsumer);
143151
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Comparator;
2323
import java.util.List;
2424
import java.util.PriorityQueue;
25+
import java.util.function.IntConsumer;
2526
import org.apache.hadoop.hbase.Cell;
2627
import org.apache.hadoop.hbase.CellComparator;
2728
import org.apache.hadoop.hbase.regionserver.ScannerContext.NextState;
@@ -104,6 +105,11 @@ boolean isLatestCellFromMemstore() {
104105
return !this.current.isFileScanner();
105106
}
106107

108+
@Override
109+
public void recordBlockSize(IntConsumer blockSizeConsumer) {
110+
this.current.recordBlockSize(blockSizeConsumer);
111+
}
112+
107113
@Override
108114
public Cell next() throws IOException {
109115
if (this.current == null) {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.Closeable;
2121
import java.io.IOException;
22+
import java.util.function.IntConsumer;
2223
import org.apache.hadoop.fs.Path;
2324
import org.apache.hadoop.hbase.Cell;
2425
import org.apache.hadoop.hbase.KeyValue;
@@ -125,6 +126,13 @@ default long getScannerOrder() {
125126
/** Returns true if this is a file scanner. Otherwise a memory scanner is assumed. */
126127
boolean isFileScanner();
127128

129+
/**
130+
* Record the size of the current block in bytes, passing it to the blockSizeConsumer.
131+
* Implementations should ensure that blockSizeConsumer is only called once per block.
132+
* @param blockSizeConsumer to be called with block size in bytes, once per block.
133+
*/
134+
void recordBlockSize(IntConsumer blockSizeConsumer);
135+
128136
/**
129137
* @return the file path if this is a file scanner, otherwise null.
130138
* @see #isFileScanner()

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.hadoop.hbase.regionserver;
1919

2020
import java.io.IOException;
21+
import java.util.function.IntConsumer;
2122
import org.apache.commons.lang3.NotImplementedException;
2223
import org.apache.hadoop.fs.Path;
2324
import org.apache.hadoop.hbase.Cell;
@@ -63,6 +64,11 @@ public boolean isFileScanner() {
6364
return false;
6465
}
6566

67+
@Override
68+
public void recordBlockSize(IntConsumer blockSizeConsumer) {
69+
// do nothing
70+
}
71+
6672
@Override
6773
public Path getFilePath() {
6874
// Not a file by default.

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 38 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import java.util.concurrent.atomic.AtomicBoolean;
4343
import java.util.concurrent.atomic.AtomicLong;
4444
import java.util.concurrent.atomic.LongAdder;
45-
import org.apache.commons.lang3.mutable.MutableObject;
4645
import org.apache.hadoop.conf.Configuration;
4746
import org.apache.hadoop.fs.FileSystem;
4847
import org.apache.hadoop.fs.Path;
@@ -3282,8 +3281,7 @@ private void checkLimitOfRows(int numOfCompleteRows, int limitOfRows, boolean mo
32823281
// return whether we have more results in region.
32833282
private void scan(HBaseRpcController controller, ScanRequest request, RegionScannerHolder rsh,
32843283
long maxQuotaResultSize, int maxResults, int limitOfRows, List<Result> results,
3285-
ScanResponse.Builder builder, MutableObject<Object> lastBlock, RpcCall rpcCall)
3286-
throws IOException {
3284+
ScanResponse.Builder builder, RpcCall rpcCall) throws IOException {
32873285
HRegion region = rsh.r;
32883286
RegionScanner scanner = rsh.s;
32893287
long maxResultSize;
@@ -3343,7 +3341,19 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
33433341
ScannerContext.Builder contextBuilder = ScannerContext.newBuilder(true);
33443342
// maxResultSize - either we can reach this much size for all cells(being read) data or sum
33453343
// of heap size occupied by cells(being read). Cell data means its key and value parts.
3346-
contextBuilder.setSizeLimit(sizeScope, maxResultSize, maxResultSize);
3344+
// maxQuotaResultSize - max results just from server side configuration and quotas, without
3345+
// user's specified max. We use this for evaluating limits based on blocks (not cells).
3346+
// We may have accumulated some results in coprocessor preScannerNext call. We estimate
3347+
// block and cell size of those using call to addSize. Update our maximums for scanner
3348+
// context so we can account for them in the real scan.
3349+
long maxCellSize = maxResultSize;
3350+
long maxBlockSize = maxQuotaResultSize;
3351+
if (rpcCall != null) {
3352+
maxBlockSize -= rpcCall.getResponseBlockSize();
3353+
maxCellSize -= rpcCall.getResponseCellSize();
3354+
}
3355+
3356+
contextBuilder.setSizeLimit(sizeScope, maxCellSize, maxCellSize, maxBlockSize);
33473357
contextBuilder.setBatchLimit(scanner.getBatch());
33483358
contextBuilder.setTimeLimit(timeScope, timeLimit);
33493359
contextBuilder.setTrackMetrics(trackMetrics);
@@ -3398,7 +3408,6 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
33983408
}
33993409
boolean mayHaveMoreCellsInRow = scannerContext.mayHaveMoreCellsInRow();
34003410
Result r = Result.create(values, null, stale, mayHaveMoreCellsInRow);
3401-
lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
34023411
results.add(r);
34033412
numOfResults++;
34043413
if (!mayHaveMoreCellsInRow && limitOfRows > 0) {
@@ -3427,12 +3436,18 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
34273436
limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
34283437

34293438
if (limitReached || !moreRows) {
3439+
// With block size limit, we may exceed size limit without collecting any results.
3440+
// In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
3441+
// or cursor if results were collected, for example for cell size or heap size limits.
3442+
boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
34303443
// We only want to mark a ScanResponse as a heartbeat message in the event that
34313444
// there are more values to be read server side. If there aren't more values,
34323445
// marking it as a heartbeat is wasteful because the client will need to issue
34333446
// another ScanRequest only to realize that they already have all the values
3434-
if (moreRows && timeLimitReached) {
3435-
// Heartbeat messages occur when the time limit has been reached.
3447+
if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
3448+
// Heartbeat messages occur when the time limit has been reached, or size limit has
3449+
// been reached before collecting any results. This can happen for heavily filtered
3450+
// scans which scan over too many blocks.
34363451
builder.setHeartbeatMessage(true);
34373452
if (rsh.needCursor) {
34383453
Cell cursorCell = scannerContext.getLastPeekedCell();
@@ -3445,6 +3460,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
34453460
}
34463461
values.clear();
34473462
}
3463+
if (rpcCall != null) {
3464+
rpcCall.incrementResponseBlockSize(scannerContext.getBlockSizeProgress());
3465+
rpcCall.incrementResponseCellSize(scannerContext.getHeapSizeProgress());
3466+
}
34483467
builder.setMoreResultsInRegion(moreRows);
34493468
// Check to see if the client requested that we track metrics server side. If the
34503469
// client requested metrics, retrieve the metrics from the scanner context.
@@ -3606,7 +3625,6 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
36063625
} else {
36073626
limitOfRows = -1;
36083627
}
3609-
MutableObject<Object> lastBlock = new MutableObject<>();
36103628
boolean scannerClosed = false;
36113629
try {
36123630
List<Result> results = new ArrayList<>(Math.min(rows, 512));
@@ -3616,8 +3634,18 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
36163634
if (region.getCoprocessorHost() != null) {
36173635
Boolean bypass = region.getCoprocessorHost().preScannerNext(scanner, results, rows);
36183636
if (!results.isEmpty()) {
3637+
// If scanner CP added results to list, we want to account for cell and block size of
3638+
// that work. We estimate this using addSize, since CP does not get ScannerContext. If
3639+
// !done, the actual scan call below will use more accurate ScannerContext block and
3640+
// cell size tracking for the rest of the request. The two result sets will be added
3641+
// together in the RpcCall accounting.
3642+
// This here is just an estimate (see addSize for more details on estimation). We don't
3643+
// pass lastBlock to the scan call below because the real scan uses ScannerContext,
3644+
// which does not use lastBlock tracking. This may result in over counting by 1 block,
3645+
// but that is unlikely to matter since addSize is already a rough estimate.
3646+
Object lastBlock = null;
36193647
for (Result r : results) {
3620-
lastBlock.setValue(addSize(rpcCall, r, lastBlock.getValue()));
3648+
lastBlock = addSize(rpcCall, r, lastBlock);
36213649
}
36223650
}
36233651
if (bypass != null && bypass.booleanValue()) {
@@ -3626,7 +3654,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
36263654
}
36273655
if (!done) {
36283656
scan((HBaseRpcController) controller, request, rsh, maxQuotaResultSize, rows, limitOfRows,
3629-
results, builder, lastBlock, rpcCall);
3657+
results, builder, rpcCall);
36303658
} else {
36313659
builder.setMoreResultsInRegion(!results.isEmpty());
36323660
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,8 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
503503
results.clear();
504504

505505
// Read nothing as the rowkey was filtered, but still need to check time limit
506-
if (scannerContext.checkTimeLimit(limitScope)) {
506+
// We also check size limit because we might have read blocks in getting to this point.
507+
if (scannerContext.checkAnyLimitReached(limitScope)) {
507508
return true;
508509
}
509510
continue;
@@ -561,8 +562,9 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
561562
// This row was totally filtered out, if this is NOT the last row,
562563
// we should continue on. Otherwise, nothing else to do.
563564
if (!shouldStop) {
564-
// Read nothing as the cells was filtered, but still need to check time limit
565-
if (scannerContext.checkTimeLimit(limitScope)) {
565+
// Read nothing as the cells were filtered, but still need to check time limit.
566+
// We also check size limit because we might have read blocks in getting to this point.
567+
if (scannerContext.checkAnyLimitReached(limitScope)) {
566568
return true;
567569
}
568570
continue;
@@ -608,6 +610,13 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
608610
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
609611
}
610612
if (!shouldStop) {
613+
// We check size limit because we might have read blocks in the nextRow call above, or
614+
// in the populateResults call. Only scans with hasFilterRow should reach this point,
615+
// and for those scans which filter row _cells_ this is the only place we can actually
616+
// enforce that the scan does not exceed limits since it bypasses all other checks above.
617+
if (scannerContext.checkSizeLimit(limitScope)) {
618+
return true;
619+
}
611620
continue;
612621
}
613622
}
@@ -705,13 +714,21 @@ public int size() {
705714

706715
protected boolean nextRow(ScannerContext scannerContext, Cell curRowCell) throws IOException {
707716
assert this.joinedContinuationRow == null : "Trying to go to next row during joinedHeap read.";
717+
718+
// Enable skipping row mode, which disables limits and skips tracking progress for all
719+
// but block size. We keep tracking block size because skipping a row in this way
720+
// might involve reading blocks along the way.
721+
scannerContext.setSkippingRow(true);
722+
708723
Cell next;
709724
while ((next = this.storeHeap.peek()) != null && CellUtil.matchingRows(next, curRowCell)) {
710725
// Check for thread interrupt status in case we have been signaled from
711726
// #interruptRegionOperation.
712727
region.checkInterrupt();
713-
this.storeHeap.next(MOCKED_LIST);
728+
this.storeHeap.next(MOCKED_LIST, scannerContext);
714729
}
730+
731+
scannerContext.setSkippingRow(false);
715732
resetFilters();
716733

717734
// Calling the hook in CP which allows it to do a fast forward

0 commit comments

Comments
 (0)