diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f9f841181064..eef9c8ad318f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -379,6 +379,8 @@ public class HRegionServer extends HBaseServerBase<RSRpcServices>
 
   private final RegionServerAccounting regionServerAccounting;
 
+  private final RegionScannerLimiter regionScannerLimiter;
+
   private NamedQueueServiceChore namedQueueServiceChore = null;
 
   // Block cache
@@ -521,6 +523,7 @@ public HRegionServer(final Configuration conf) throws IOException {
       HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME);
 
     regionServerAccounting = new RegionServerAccounting(conf);
+    regionScannerLimiter = new RegionScannerLimiter(conf);
 
     blockCache = BlockCacheFactory.createBlockCache(conf);
     mobFileCache = new MobFileCache(conf);
@@ -2078,6 +2081,7 @@ private void registerConfigurationObservers() {
     configurationManager.registerObserver(this.cacheFlusher);
     configurationManager.registerObserver(this.rpcServices);
     configurationManager.registerObserver(this);
+    configurationManager.registerObserver(regionScannerLimiter);
   }
 
   /*
@@ -3637,4 +3641,8 @@ protected void stopChores() {
   public RegionReplicationBufferManager getRegionReplicationBufferManager() {
     return regionReplicationBufferManager;
   }
+
+  public RegionScannerLimiter getRegionScannerLimiter() {
+    return regionScannerLimiter;
+  }
 }
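Note on the registerConfigurationObservers() change above: ConfigurationManager fans onConfigurationChange() out to every registered observer when the server's configuration is reloaded, which is what lets the new limiter be retuned without a restart. A minimal sketch of that contract (illustrative only, not part of the patch; the class name and config key are made up):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.conf.ConfigurationObserver;

    // A minimal observer, hooked in the same way as RegionScannerLimiter above:
    // once registered with the ConfigurationManager, onConfigurationChange() runs
    // on every notifyAllObservers(conf) call and can re-read its tunables.
    class ExampleThresholdObserver implements ConfigurationObserver {
      private volatile long threshold = 0;

      @Override
      public void onConfigurationChange(Configuration conf) {
        threshold = conf.getLong("example.threshold.key", 0L);
      }
    }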
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 57efe505c126..2eb3992614a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -1341,6 +1341,7 @@ private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Ship
       s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s);
     RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback,
       needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName());
+    s.setName(scannerName);
     RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh);
     assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! "
       + scannerName + ", " + existing;
@@ -3330,6 +3331,11 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
       contextBuilder.setTimeLimit(timeScope, timeLimit);
       contextBuilder.setTrackMetrics(trackMetrics);
       ScannerContext scannerContext = contextBuilder.build();
+
+      RegionScannerLimiter regionScannerLimiter =
+        ((HRegionServer) region.rsServices).getRegionScannerLimiter();
+      regionScannerLimiter.setFilterRowsLimitReached(scanner.getName(), false);
+
       boolean limitReached = false;
       while (numOfResults < maxResults) {
         // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The
@@ -3405,7 +3411,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
           boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
           boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
           boolean resultsLimitReached = numOfResults >= maxResults;
-          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;
+          boolean filterRowsLimitReached =
+            regionScannerLimiter.isFilterRowsLimitReached(scanner.getName());
+          limitReached =
+            sizeLimitReached || timeLimitReached || resultsLimitReached || filterRowsLimitReached;
 
           if (limitReached || !moreRows) {
             // With block size limit, we may exceed size limit without collecting any results.
@@ -3416,7 +3425,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan
             // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
             // another ScanRequest only to realize that they already have all the values
-            if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
+            if (
+              moreRows
+                && (timeLimitReached || sizeLimitReachedWithoutResults || filterRowsLimitReached)
+            ) {
               // Heartbeat messages occur when the time limit has been reached, or size limit has
               // been reached before collecting any results. This can happen for heavily filtered
               // scans which scan over too many blocks.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
index cea136a9a057..b4b5640789d8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java
@@ -110,4 +110,12 @@ default String getOperationId() {
    * @throws IOException e
    */
   boolean nextRaw(List<Cell> result, ScannerContext scannerContext) throws IOException;
+
+  default void setName(String name) {
+
+  }
+
+  default String getName() {
+    return null;
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
index d829b1961070..c8c49243fe77 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java
@@ -29,6 +29,7 @@
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellComparator;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.PrivateCellUtil;
@@ -90,6 +91,7 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback {
   private final ScannerContext defaultScannerContext;
   private final FilterWrapper filter;
   private final String operationId;
+  private String name;
 
   private RegionServerServices rsServices;
 
@@ -487,7 +489,10 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
       // Check if rowkey filter wants to exclude this row. If so, loop to next.
       // Technically, if we hit limits before on this row, we don't need this call.
       if (filterRowKey(current)) {
-        incrementCountOfRowsFilteredMetric(scannerContext);
+        if (incrementCountOfRowsFilteredMetric(scannerContext)) {
+          return scannerContext.setScannerState(NextState.FILTERED_ROWS_LIMIT_REACHED)
+            .hasMoreValues();
+        }
         // early check, see HBASE-16296
         if (isFilterDoneInternal()) {
           return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
@@ -552,7 +557,10 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
         }
 
         if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {
-          incrementCountOfRowsFilteredMetric(scannerContext);
+          if (incrementCountOfRowsFilteredMetric(scannerContext)) {
+            return scannerContext.setScannerState(NextState.FILTERED_ROWS_LIMIT_REACHED)
+              .hasMoreValues();
+          }
           results.clear();
           boolean moreRows = nextRow(scannerContext, current);
           if (!moreRows) {
@@ -604,7 +612,10 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
         // Double check to prevent empty rows from appearing in result. It could be
         // the case when SingleColumnValueExcludeFilter is used.
         if (results.isEmpty()) {
-          incrementCountOfRowsFilteredMetric(scannerContext);
+          if (incrementCountOfRowsFilteredMetric(scannerContext)) {
+            return scannerContext.setScannerState(NextState.FILTERED_ROWS_LIMIT_REACHED)
+              .hasMoreValues();
+          }
           boolean moreRows = nextRow(scannerContext, current);
           if (!moreRows) {
             return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
@@ -629,17 +640,42 @@ private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)
     }
   }
 
-  private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) {
+  private boolean incrementCountOfRowsFilteredMetric(ScannerContext scannerContext)
+    throws DoNotRetryIOException {
     region.filteredReadRequestsCount.increment();
     if (region.getMetrics() != null) {
       region.getMetrics().updateFilteredRecords();
     }
 
-    if (scannerContext == null || !scannerContext.isTrackingMetrics()) {
-      return;
+    if (scannerContext == null) {
+      return false;
     }
 
-    scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
+    if (scannerContext.isTrackingMetrics()) {
+      scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet();
+    }
+
+    scannerContext.incrementFilteredRowsProgress(1);
+    long countOfRowsFiltered = scannerContext.getFilterRowsProgress();
+    if (region.rsServices instanceof HRegionServer) {
+      RegionScannerLimiter regionScannerLimiter =
+        ((HRegionServer) region.rsServices).getRegionScannerLimiter();
+      long maxRowsFilteredPerRequest = regionScannerLimiter.getMaxRowsFilteredPerRequest();
+      if (maxRowsFilteredPerRequest > 0 && countOfRowsFiltered >= maxRowsFilteredPerRequest) {
+        regionScannerLimiter.setFilterRowsLimitReached(getName(), true);
+        if (regionScannerLimiter.killRequest()) {
+          String errMsg =
+            String.format("Too many rows filtered, higher than the limit threshold of %s, "
+              + "so kill the scan request!", maxRowsFilteredPerRequest);
+          LOG.warn("ScannerContext={}, errMsg={}", scannerContext, errMsg);
+          throw new DoNotRetryIOException(errMsg);
+        } else {
+          return true;
+        }
+      }
+    }
+
+    return false;
   }
 
   private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) {
@@ -750,6 +786,9 @@ protected boolean shouldStop(Cell currentRowCell) {
   @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC",
       justification = "this method is only called inside close which is synchronized")
   private void closeInternal() {
+    if (region.rsServices instanceof HRegionServer) {
+      ((HRegionServer) region.rsServices).getRegionScannerLimiter().removeScanner(getName());
+    }
     if (storeHeap != null) {
       storeHeap.close();
       storeHeap = null;
@@ -806,4 +845,14 @@ public void run() throws IOException {
     // callback
     this.close();
   }
+
+  @Override
+  public void setName(String name) {
+    this.name = name;
+  }
+
+  @Override
+  public String getName() {
+    return name;
+  }
 }
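From a client's point of view, the two limiter modes wired in above differ only in how the scan ends. A minimal sketch of the kill-mode failure path (illustrative, not part of the patch; assumes an open table and a heavily filtered scan):

    // With hbase.server.scanner.max.rows.filtered.reached.request.killed=true, the
    // DoNotRetryIOException thrown by incrementCountOfRowsFilteredMetric() reaches
    // the caller as-is; the HBase client will not retry it.
    void scanWithLimiter(Table table, Scan scan) throws IOException {
      try (ResultScanner scanner = table.getScanner(scan)) {
        for (Result result; (result = scanner.next()) != null;) {
          // process result
        }
      } catch (DoNotRetryIOException e) {
        // The request filtered more rows than the configured per-request limit.
      }
    }

With the kill switch off, the same condition instead ends the current RPC in the FILTERED_ROWS_LIMIT_REACHED state, the response is flagged as a heartbeat, and the loop above transparently issues further scan RPCs.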
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java
new file mode 100644
index 000000000000..20eae61e0b4e
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.function.Function;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Limits the max count of rows filtered per scan request. The limiter applies globally to scan
+ * requests; the config key is
+ * {@link RegionScannerLimiter#HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY}. When
+ * heavily filtered scan requests frequently cause high load on a RegionServer, set
+ * {@link RegionScannerLimiter#HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY} to a
+ * suitable threshold (e.g. 100,000) to limit such scans. To additionally kill a scan request
+ * that reaches the threshold, set
+ * {@link RegionScannerLimiter#HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY}
+ * to true. To disable this feature, set
+ * {@link RegionScannerLimiter#HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY} to 0.
+ */
+@InterfaceAudience.Private
+public class RegionScannerLimiter implements ConfigurationObserver {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RegionScannerLimiter.class);
+
+  public static final String HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY =
+    "hbase.server.scanner.max.rows.filtered.per.request";
+
+  public static final String HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY =
+    "hbase.server.scanner.max.rows.filtered.reached.request.killed";
+
+  // Max count of rows filtered per scan request. Zero means no limit (the default).
+  private volatile long maxRowsFilteredPerRequest = 0;
+  // Whether to kill a scan request once it reaches the filtered-rows limit.
+  private volatile boolean requestKilled = false;
+
+  // Tracks, per scanner name, whether that scanner has reached the filtered-rows limit.
+  private final ConcurrentMap<String, Boolean> scanners = new ConcurrentHashMap<>();
+
+  public RegionScannerLimiter(Configuration conf) {
+    onConfigurationChange(conf);
+  }
+
+  private <T> void updateLimiterConf(Configuration conf, String configKey, T oldValue,
+    Function<String, T> applyFunc) {
+    try {
+      if (conf.get(configKey) == null) {
+        return;
+      }
+      T targetValue = applyFunc.apply(configKey);
+      if (targetValue != null) {
+        LOG.info("Config key={}, old value={}, new value={}", configKey, oldValue, targetValue);
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to update config key: {}", configKey, e);
+    }
+  }
+
+  public long getMaxRowsFilteredPerRequest() {
+    return this.maxRowsFilteredPerRequest;
+  }
+
+  public boolean isFilterRowsLimitReached(String scannerName) {
+    return scanners.getOrDefault(scannerName, false);
+  }
+
+  public void setFilterRowsLimitReached(String scannerName, boolean limitReached) {
+    scanners.put(scannerName, limitReached);
+  }
+
+  public void removeScanner(String scannerName) {
+    scanners.remove(scannerName);
+  }
+
+  public boolean killRequest() {
+    return requestKilled;
+  }
+
+  public ConcurrentMap<String, Boolean> getScanners() {
+    return scanners;
+  }
+
+  @Override
+  public void onConfigurationChange(Configuration conf) {
+    updateLimiterConf(conf, HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY,
+      maxRowsFilteredPerRequest, configKey -> {
+        long targetValue = conf.getLong(configKey, -1);
+        if (targetValue < 0) {
+          LOG.warn("Invalid parameter, should be greater than or equal to zero, target value: {}",
+            targetValue);
+          return null;
+        }
+        if (maxRowsFilteredPerRequest == targetValue) {
+          return null;
+        }
+        maxRowsFilteredPerRequest = targetValue;
+        return targetValue;
+      });
+    updateLimiterConf(conf, HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY,
+      requestKilled, configKey -> {
+        boolean targetValue = conf.getBoolean(configKey, false);
+        if (targetValue == requestKilled) {
+          return null;
+        }
+        requestKilled = targetValue;
+        return targetValue;
+      });
+  }
+}
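Since the limiter is a ConfigurationObserver, both keys can be changed on a live RegionServer. A sketch of doing so programmatically, mirroring what the test below does (the regionServer handle is assumed, e.g. obtained from a mini-cluster):

    Configuration conf = regionServer.getConfiguration();
    conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY,
      100000L);
    conf.setBoolean(
      RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY,
      true);
    // Fan the new values out to every registered observer, including the limiter.
    regionServer.getConfigurationManager().notifyAllObservers(conf);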
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 03d84f209b04..8709cc58b0e1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -130,7 +130,7 @@ public class ScannerContext {
     }
 
     // Progress fields are initialized to 0
-    progress = new ProgressFields(0, 0, 0, 0);
+    progress = new ProgressFields(0, 0, 0, 0, 0);
 
     this.keepProgress = keepProgress;
     this.scannerState = DEFAULT_STATE;
@@ -220,6 +220,13 @@ void incrementBlockProgress(int blockSize) {
     }
   }
 
+  void incrementFilteredRowsProgress(int filteredRows) {
+    if (filteredRows > 0) {
+      long curFilteredRows = progress.getFilteredRows();
+      progress.setFilteredRows(curFilteredRows + filteredRows);
+    }
+  }
+
   int getBatchProgress() {
     return progress.getBatch();
   }
@@ -236,6 +243,10 @@ long getBlockSizeProgress() {
     return progress.getBlockSize();
   }
 
+  long getFilterRowsProgress() {
+    return progress.getFilteredRows();
+  }
+
   void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress) {
     setBatchProgress(batchProgress);
     setSizeProgress(sizeProgress, heapSizeProgress);
@@ -257,7 +268,7 @@ void setBatchProgress(int batchProgress) {
    * filtered.
    */
   void clearProgress() {
-    progress.setFields(0, 0, 0, getBlockSizeProgress());
+    progress.setFields(0, 0, 0, getBlockSizeProgress(), getFilterRowsProgress());
   }
 
   /**
@@ -467,6 +478,7 @@ public enum NextState {
     MORE_VALUES(true, false),
     NO_MORE_VALUES(false, false),
     SIZE_LIMIT_REACHED(true, true),
+    FILTERED_ROWS_LIMIT_REACHED(true, true),
 
     /**
      * Special case of size limit reached to indicate that the size limit was reached in the middle
@@ -735,6 +747,7 @@ private static class ProgressFields {
 
     private static int DEFAULT_BATCH = -1;
     private static long DEFAULT_SIZE = -1L;
+    private static long DEFAULT_ROWS = -1L;
 
     // The batch limit will always be enforced between cells, thus, there isn't a field to hold the
     // batch scope
@@ -748,19 +761,22 @@ private static class ProgressFields {
     // The total amount of block bytes that have been loaded in order to process cells for the
     // request.
     long blockSize = DEFAULT_SIZE;
+    // The total count of rows that have been filtered while processing this scan request.
+    long filteredRows = DEFAULT_ROWS;
 
-    ProgressFields(int batch, long size, long heapSize, long blockSize) {
-      setFields(batch, size, heapSize, blockSize);
+    ProgressFields(int batch, long size, long heapSize, long blockSize, long filteredRows) {
+      setFields(batch, size, heapSize, blockSize, filteredRows);
     }
 
     /**
      * Set all fields together.
      */
-    void setFields(int batch, long dataSize, long heapSize, long blockSize) {
+    void setFields(int batch, long dataSize, long heapSize, long blockSize, long filteredRows) {
       setBatch(batch);
       setDataSize(dataSize);
       setHeapSize(heapSize);
       setBlockSize(blockSize);
+      setFilteredRows(filteredRows);
     }
 
     int getBatch() {
@@ -783,6 +799,10 @@ long getBlockSize() {
       return this.blockSize;
     }
 
+    long getFilteredRows() {
+      return this.filteredRows;
+    }
+
     void setDataSize(long dataSize) {
       this.dataSize = dataSize;
     }
@@ -791,6 +811,10 @@ void setBlockSize(long blockSize) {
       this.blockSize = blockSize;
     }
 
+    void setFilteredRows(long filteredRows) {
+      this.filteredRows = filteredRows;
+    }
+
     void setHeapSize(long heapSize) {
       this.heapSize = heapSize;
     }
@@ -812,6 +836,9 @@ public String toString() {
       sb.append(", blockSize:");
       sb.append(blockSize);
+      sb.append(", filteredRows:");
+      sb.append(filteredRows);
+
       sb.append("}");
 
       return sb.toString();
     }
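Note the asymmetry clearProgress() preserves: batch, dataSize and heapSize are reset between rows, while blockSize and, now, filteredRows are carried forward, which is what makes the limit span the whole scan RPC rather than a single row; and because RSRpcServices builds a fresh ScannerContext for each scan call, the filtered-rows count restarts with every RPC. Clients can observe the server-side tally through scan metrics, as the test below does; a minimal sketch (TABLE as in the test, inside a method declared to throw IOException):

    // Drain a metrics-enabled scan, then read the server-side filtered-rows counter.
    Scan scan = new Scan().setScanMetricsEnabled(true);
    ResultScanner scanner = TABLE.getScanner(scan);
    while (scanner.next() != null) {
      // drain all results
    }
    scanner.close();
    long filtered = scanner.getScanMetrics()
      .getCounter(ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME).get();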
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerFilteredRowsLimits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerFilteredRowsLimits.java
new file mode 100644
index 000000000000..54f3ab2bcafb
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerFilteredRowsLimits.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HTestConst;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ LargeTests.class })
+public class TestScannerFilteredRowsLimits {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestScannerFilteredRowsLimits.class);
+
+  private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
+
+  private static int NUM_ROWS = 10;
+  private static byte[] ROW = Bytes.toBytes("testRow");
+  private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
+
+  private static int NUM_FAMILIES = 1;
+  private static byte[] FAMILY = Bytes.toBytes("testFamily");
+  private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES);
+
+  private static int NUM_QUALIFIERS = 1;
+  private static byte[] QUALIFIER = Bytes.toBytes("testQualifier");
+  private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS);
+
+  private static int VALUE_SIZE = 10;
+  private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE);
+
+  private static Table TABLE = null;
+  private static TableName TABLE_NAME = TableName.valueOf("testTable");
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+    TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
+  }
+
+  private static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
+    byte[][] qualifiers, byte[] cellValue) throws IOException {
+    Table ht = TEST_UTIL.createTable(name, families);
+    List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
+    ht.put(puts);
+
+    return ht;
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  private static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
+    byte[] value) throws IOException {
+    Put put;
+    ArrayList<Put> puts = new ArrayList<>();
+
+    for (int row = 0; row < rows.length; row++) {
+      put = new Put(rows[row]);
+      for (int fam = 0; fam < families.length; fam++) {
+        for (int qual = 0; qual < qualifiers.length; qual++) {
+          KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value);
+          put.add(kv);
+        }
+      }
+      puts.add(put);
+    }
+
+    return puts;
+  }
+
+  @Test
+  public void testRowsFilteredMetricLimiter() throws Exception {
+    // Base scan configuration.
+    Scan baseScan = new Scan();
+    baseScan.setScanMetricsEnabled(true);
+
+    // No matching column value exists in any row, so the filter rejects all rows.
+    Filter filter =
+      new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.NOT_EQUAL, VALUE);
+    testRowsFilteredMetric(baseScan, filter, ROWS.length);
+    long scanRpcCalls =
+      getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME);
+
+    HRegionServer rs1;
+    // Update the server-side max count of rows filtered config.
+    try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE.getName())) {
+      RegionInfo firstHRI = locator.getAllRegionLocations().get(0).getRegion();
+      rs1 = TEST_UTIL.getHBaseCluster()
+        .getRegionServer(TEST_UTIL.getHBaseCluster().getServerWith(firstHRI.getRegionName()));
+    }
+
+    Configuration conf = TEST_UTIL.getConfiguration();
+    // Set the max rows filtered limit.
+    conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 7);
+    conf.setBoolean(
+      RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY,
+      true);
+    rs1.getConfigurationManager().notifyAllObservers(conf);
+
+    assertThrows("Should throw a DoNotRetryIOException when too many rows have been filtered.",
+      DoNotRetryIOException.class, () -> testRowsFilteredMetric(baseScan, filter, ROWS.length));
+
+    conf.setBoolean(
+      RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY,
+      false);
+    rs1.getConfigurationManager().notifyAllObservers(conf);
+    testRowsFilteredMetric(baseScan, filter, ROWS.length);
+
+    // Test scan rpc calls.
+    long scanRpcCalls2 =
+      getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME);
+    assertEquals(scanRpcCalls + 1, scanRpcCalls2);
+
+    conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 5);
+    rs1.getConfigurationManager().notifyAllObservers(conf);
+    long scanRpcCalls3 =
+      getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME);
+    assertEquals(scanRpcCalls + 2, scanRpcCalls3);
+
+    conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 3);
+    rs1.getConfigurationManager().notifyAllObservers(conf);
+    long scanRpcCalls4 =
+      getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME);
+    assertEquals(scanRpcCalls + 3, scanRpcCalls4);
+
+    // No max rows filtered limit.
+    conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 0);
+    rs1.getConfigurationManager().notifyAllObservers(conf);
+    testRowsFilteredMetric(baseScan, filter, ROWS.length);
+
+    assertEquals(0, rs1.getRegionScannerLimiter().getScanners().size());
+  }
+
+  private void testRowsFilteredMetric(Scan baseScan, Filter filter, int expectedNumFiltered)
+    throws Exception {
+    Scan scan = new Scan(baseScan);
+    if (filter != null) {
+      scan.setFilter(filter);
+    }
+    testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME,
+      expectedNumFiltered);
+  }
+
+  private void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception {
+    final long actualMetricValue = getScanMetricValue(scan, metricKey);
+    assertEquals(
+      "Metric: " + metricKey + " Expected: " + expectedValue + " Actual: " + actualMetricValue,
+      expectedValue, actualMetricValue);
+  }
+
+  private static long getScanMetricValue(Scan scan, String metricKey) throws IOException {
+    assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled());
+    ResultScanner scanner = TABLE.getScanner(scan);
+    // Iterate through all the results.
+    while (scanner.next() != null) {
+      continue;
+    }
+    scanner.close();
+    ScanMetrics metrics = scanner.getScanMetrics();
+    assertNotNull("Metrics are null", metrics);
+    assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey));
+    return metrics.getCounter(metricKey).get();
+  }
+}
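For intuition about the RPC-count assertions in testRowsFilteredMetricLimiter (a reading of the numbers, not normative): the table holds 10 rows and the filter rejects every one. In heartbeat mode each scan RPC may filter at most the configured limit before being answered early, so with a limit of 7 the rows arrive as 7 + 3 over 2 requests (scanRpcCalls + 1); with 5, as 5 + 5 plus one final request that discovers the scanner is exhausted, 3 in total (scanRpcCalls + 2); with 3, as 3 + 3 + 3 + 1 over 4 requests (scanRpcCalls + 3). The closing assertEquals(0, rs1.getRegionScannerLimiter().getScanners().size()) then verifies that RegionScannerImpl.closeInternal() removed each scanner's entry, so the limiter's per-scanner tracking map cannot leak.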