From 4f1cca21eacfaef6c3472a3d4431a1f0a024cf35 Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Fri, 22 Sep 2023 10:41:23 +0800 Subject: [PATCH 1/7] Limit max count of rows filtered per scan request. --- .../hbase/regionserver/HRegionServer.java | 1 + .../hbase/regionserver/RegionScannerImpl.java | 14 ++- .../regionserver/RegionScannerLimiter.java | 89 +++++++++++++++++++ ...stServerSideScanMetricsFromClientSide.java | 40 +++++++++ 4 files changed, 142 insertions(+), 2 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index f9f841181064..d223bfed83c0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -2078,6 +2078,7 @@ private void registerConfigurationObservers() { configurationManager.registerObserver(this.cacheFlusher); configurationManager.registerObserver(this.rpcServices); configurationManager.registerObserver(this); + configurationManager.registerObserver(RegionScannerLimiter.create(conf)); } /* diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java index d829b1961070..a6846784b4e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellComparator; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.PrivateCellUtil; @@ -629,7 +630,8 @@ private boolean nextInternal(List results, ScannerContext scannerContext) } } - private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) { + private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) + throws DoNotRetryIOException { region.filteredReadRequestsCount.increment(); if (region.getMetrics() != null) { region.getMetrics().updateFilteredRecords(); @@ -639,7 +641,15 @@ private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) { return; } - scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet(); + long countOfRowsFiltered = scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet(); + long maxRowsFilteredPerRequest = RegionScannerLimiter.get().getMaxRowsFilteredPerRequest(); + if (maxRowsFilteredPerRequest > 0 && countOfRowsFiltered >= maxRowsFilteredPerRequest) { + String errMsg = String.format( + "Too many rows filtered, higher than the limit threshold of %s, so kill the scan request!", + maxRowsFilteredPerRequest); + LOG.warn("ScannerContext={}, errMsg={}", scannerContext, errMsg); + throw new DoNotRetryIOException(errMsg); + } } private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java new file mode 100644 index 000000000000..0383de473032 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Limit max count of rows filtered per scan request. + */ +@InterfaceAudience.Private +public class RegionScannerLimiter implements ConfigurationObserver { + + private static final Logger LOG = LoggerFactory.getLogger(RegionScannerLimiter.class); + + public static final String HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY = + "hbase.server.scanner.max.rows.filtered.per.request"; + + private static RegionScannerLimiter INSTANCE; + + // Max count of rows filtered per request. If zero, it means no limitation. + private volatile long maxRowsFilteredPerRequest = 0; + + private RegionScannerLimiter(Configuration conf) { + updateLimiterConf(conf, HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY); + } + + private void updateLimiterConf(Configuration conf, String configKey) { + try { + if (conf.get(configKey) == null) { + return; + } + + long targetValue = conf.getLong(configKey, -1); + if (targetValue < 0) { + LOG.warn("Invalid parameter, should be greater than or equal to zero, target value: {}", + targetValue); + return; + } + if (maxRowsFilteredPerRequest == targetValue) { + return; + } + + LOG.info("Config key={}, old value={}, new value={}", configKey, maxRowsFilteredPerRequest, + targetValue); + this.maxRowsFilteredPerRequest = targetValue; + } catch (Exception e) { + LOG.error("Failed to update config key: {}", configKey, e); + } + } + + public long getMaxRowsFilteredPerRequest() { + return this.maxRowsFilteredPerRequest; + } + + public static RegionScannerLimiter get() { + return INSTANCE; + } + + public static synchronized RegionScannerLimiter create(Configuration conf) { + if (INSTANCE == null) { + INSTANCE = new RegionScannerLimiter(conf); + } + return INSTANCE; + } + + @Override + public void onConfigurationChange(Configuration conf) { + updateLimiterConf(conf, HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY); + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java index b80cd207683c..9c824ceb3217 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java @@ -18,12 +18,16 @@ package org.apache.hadoop.hbase; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -39,6 +43,8 @@ import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.RegionScannerLimiter; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; @@ -263,6 +269,40 @@ public void testRowsFilteredMetric() throws Exception { testRowsSeenMetric(baseScan); } + @Test + public void testRowsFilteredMetricLimiter() throws Exception { + // Base scan configuration + Scan baseScan; + baseScan = new Scan(); + baseScan.setScanMetricsEnabled(true); + + // No matching column value should exist in any row. Filter all rows + Filter filter = + new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.NOT_EQUAL, VALUE); + testRowsFilteredMetric(baseScan, filter, ROWS.length); + + HRegionServer rs1; + // update server side max count of rows filtered config. + try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE.getName())) { + RegionInfo firstHRI = locator.getAllRegionLocations().get(0).getRegion(); + rs1 = TEST_UTIL.getHBaseCluster() + .getRegionServer(TEST_UTIL.getHBaseCluster().getServerWith(firstHRI.getRegionName())); + } + + Configuration conf = TEST_UTIL.getConfiguration(); + // set max rows filtered limitation. + conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 5); + rs1.getConfigurationManager().notifyAllObservers(conf); + + assertThrows("Should throw a DoNotRetryIOException when too many rows have been filtered.", + DoNotRetryIOException.class, () -> testRowsFilteredMetric(baseScan, filter, ROWS.length)); + + // no max rows filtered limitation. + conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 0); + rs1.getConfigurationManager().notifyAllObservers(conf); + testRowsFilteredMetric(baseScan, filter, ROWS.length); + } + private void testRowsFilteredMetric(Scan baseScan) throws Exception { testRowsFilteredMetric(baseScan, null, 0); From 40b60de0b519321cf48f6c4d05ca8687b7b69c8b Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Wed, 27 Sep 2023 17:37:21 +0800 Subject: [PATCH 2/7] Limit max count of rows filtered per scan request. --- .../hbase/regionserver/HRegionServer.java | 9 +++++- .../hbase/regionserver/RegionScannerImpl.java | 17 +++++----- .../regionserver/RegionScannerLimiter.java | 31 ++++++------------- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d223bfed83c0..eef9c8ad318f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -379,6 +379,8 @@ public class HRegionServer extends HBaseServerBase private final RegionServerAccounting regionServerAccounting; + private final RegionScannerLimiter regionScannerLimiter; + private NamedQueueServiceChore namedQueueServiceChore = null; // Block cache @@ -521,6 +523,7 @@ public HRegionServer(final Configuration conf) throws IOException { HConstants.DEFAULT_HBASE_RPC_SHORTOPERATION_RETRY_PAUSE_TIME); regionServerAccounting = new RegionServerAccounting(conf); + regionScannerLimiter = new RegionScannerLimiter(conf); blockCache = BlockCacheFactory.createBlockCache(conf); mobFileCache = new MobFileCache(conf); @@ -2078,7 +2081,7 @@ private void registerConfigurationObservers() { configurationManager.registerObserver(this.cacheFlusher); configurationManager.registerObserver(this.rpcServices); configurationManager.registerObserver(this); - configurationManager.registerObserver(RegionScannerLimiter.create(conf)); + configurationManager.registerObserver(regionScannerLimiter); } /* @@ -3638,4 +3641,8 @@ protected void stopChores() { public RegionReplicationBufferManager getRegionReplicationBufferManager() { return regionReplicationBufferManager; } + + public RegionScannerLimiter getRegionScannerLimiter() { + return regionScannerLimiter; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java index a6846784b4e1..cf959424029e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java @@ -642,13 +642,16 @@ private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) } long countOfRowsFiltered = scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet(); - long maxRowsFilteredPerRequest = RegionScannerLimiter.get().getMaxRowsFilteredPerRequest(); - if (maxRowsFilteredPerRequest > 0 && countOfRowsFiltered >= maxRowsFilteredPerRequest) { - String errMsg = String.format( - "Too many rows filtered, higher than the limit threshold of %s, so kill the scan request!", - maxRowsFilteredPerRequest); - LOG.warn("ScannerContext={}, errMsg={}", scannerContext, errMsg); - throw new DoNotRetryIOException(errMsg); + if (region.rsServices instanceof HRegionServer) { + long maxRowsFilteredPerRequest = ((HRegionServer) region.rsServices).getRegionScannerLimiter() + .getMaxRowsFilteredPerRequest(); + if (maxRowsFilteredPerRequest > 0 && countOfRowsFiltered >= maxRowsFilteredPerRequest) { + String errMsg = String.format( + "Too many rows filtered, higher than the limit threshold of %s, so kill the scan request!", + maxRowsFilteredPerRequest); + LOG.warn("ScannerContext={}, errMsg={}", scannerContext, errMsg); + throw new DoNotRetryIOException(errMsg); + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java index 0383de473032..03493bf0ff21 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java @@ -34,22 +34,20 @@ public class RegionScannerLimiter implements ConfigurationObserver { public static final String HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY = "hbase.server.scanner.max.rows.filtered.per.request"; - private static RegionScannerLimiter INSTANCE; - // Max count of rows filtered per request. If zero, it means no limitation. private volatile long maxRowsFilteredPerRequest = 0; - private RegionScannerLimiter(Configuration conf) { - updateLimiterConf(conf, HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY); + public RegionScannerLimiter(Configuration conf) { + updateLimiterConf(conf); } - private void updateLimiterConf(Configuration conf, String configKey) { + private void updateLimiterConf(Configuration conf) { try { - if (conf.get(configKey) == null) { + if (conf.get(HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY) == null) { return; } - long targetValue = conf.getLong(configKey, -1); + long targetValue = conf.getLong(HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, -1); if (targetValue < 0) { LOG.warn("Invalid parameter, should be greater than or equal to zero, target value: {}", targetValue); @@ -59,11 +57,13 @@ private void updateLimiterConf(Configuration conf, String configKey) { return; } - LOG.info("Config key={}, old value={}, new value={}", configKey, maxRowsFilteredPerRequest, + LOG.info("Config key={}, old value={}, new value={}", + HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, maxRowsFilteredPerRequest, targetValue); this.maxRowsFilteredPerRequest = targetValue; } catch (Exception e) { - LOG.error("Failed to update config key: {}", configKey, e); + LOG.error("Failed to update config key: {}", + HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, e); } } @@ -71,19 +71,8 @@ public long getMaxRowsFilteredPerRequest() { return this.maxRowsFilteredPerRequest; } - public static RegionScannerLimiter get() { - return INSTANCE; - } - - public static synchronized RegionScannerLimiter create(Configuration conf) { - if (INSTANCE == null) { - INSTANCE = new RegionScannerLimiter(conf); - } - return INSTANCE; - } - @Override public void onConfigurationChange(Configuration conf) { - updateLimiterConf(conf, HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY); + updateLimiterConf(conf); } } From 8ac11d8aff3ccc54853c1c010f0627202a20f292 Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Wed, 27 Sep 2023 21:45:41 +0800 Subject: [PATCH 3/7] Limit max count of rows filtered per scan request. --- .../hadoop/hbase/regionserver/RegionScannerImpl.java | 6 +++--- .../hbase/regionserver/RegionScannerLimiter.java | 11 +++++++++-- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java index cf959424029e..77f813ff4434 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java @@ -646,9 +646,9 @@ private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) long maxRowsFilteredPerRequest = ((HRegionServer) region.rsServices).getRegionScannerLimiter() .getMaxRowsFilteredPerRequest(); if (maxRowsFilteredPerRequest > 0 && countOfRowsFiltered >= maxRowsFilteredPerRequest) { - String errMsg = String.format( - "Too many rows filtered, higher than the limit threshold of %s, so kill the scan request!", - maxRowsFilteredPerRequest); + String errMsg = + String.format("Too many rows filtered, higher than the limit threshold of %s, " + + "so kill the scan request!", maxRowsFilteredPerRequest); LOG.warn("ScannerContext={}, errMsg={}", scannerContext, errMsg); throw new DoNotRetryIOException(errMsg); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java index 03493bf0ff21..daf1d9a52931 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java @@ -24,7 +24,13 @@ import org.slf4j.LoggerFactory; /** - * Limit max count of rows filtered per scan request. + * Limit max count of rows filtered per scan request. This Limiter applies globally to scan + * requests, and the config key is + * {@link RegionScannerLimiter#HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY}. When heavily + * filtered scan requests frequently cause high load on the RegionServer, you can set the + * {@link RegionScannerLimiter#HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY} to a larger + * value (for example, 100,000) to kill those scan requests. When you want to revert, just set the + * {@link RegionScannerLimiter#HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY} to 0. */ @InterfaceAudience.Private public class RegionScannerLimiter implements ConfigurationObserver { @@ -34,7 +40,8 @@ public class RegionScannerLimiter implements ConfigurationObserver { public static final String HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY = "hbase.server.scanner.max.rows.filtered.per.request"; - // Max count of rows filtered per request. If zero, it means no limitation. + // Max count of rows filtered per scan request. If equals zero, it means no limitation. + // Note: No limitation by default. private volatile long maxRowsFilteredPerRequest = 0; public RegionScannerLimiter(Configuration conf) { From 8fa5678c6c50e0293c4a0df1876752bdae7849cb Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Sun, 8 Oct 2023 16:04:44 +0800 Subject: [PATCH 4/7] Limit max count of rows filtered per scan request. --- .../hbase/regionserver/RSRpcServices.java | 16 +++- .../hbase/regionserver/RegionScanner.java | 8 ++ .../hbase/regionserver/RegionScannerImpl.java | 55 +++++++++--- .../regionserver/RegionScannerLimiter.java | 88 ++++++++++++++----- .../hbase/regionserver/ScannerContext.java | 1 + ...stServerSideScanMetricsFromClientSide.java | 50 +++++++++-- 6 files changed, 175 insertions(+), 43 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 57efe505c126..2eb3992614a0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -1341,6 +1341,7 @@ private RegionScannerHolder addScanner(String scannerName, RegionScanner s, Ship s instanceof RpcCallback ? (RpcCallback) s : new RegionScannerCloseCallBack(s); RegionScannerHolder rsh = new RegionScannerHolder(s, r, closeCallback, shippedCallback, needCursor, fullRegionScan, getRemoteClientIpAndPort(), getUserName()); + s.setName(scannerName); RegionScannerHolder existing = scanners.putIfAbsent(scannerName, rsh); assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! " + scannerName + ", " + existing; @@ -3330,6 +3331,11 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan contextBuilder.setTimeLimit(timeScope, timeLimit); contextBuilder.setTrackMetrics(trackMetrics); ScannerContext scannerContext = contextBuilder.build(); + + RegionScannerLimiter regionScannerLimiter = + ((HRegionServer) region.rsServices).getRegionScannerLimiter(); + regionScannerLimiter.setFilterRowsLimitReached(scanner.getName(), false); + boolean limitReached = false; while (numOfResults < maxResults) { // Reset the batch progress to 0 before every call to RegionScanner#nextRaw. The @@ -3405,7 +3411,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS); boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS); boolean resultsLimitReached = numOfResults >= maxResults; - limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached; + boolean filterRowsLimitReached = + regionScannerLimiter.isFilterRowsLimitReached(scanner.getName()); + limitReached = + sizeLimitReached || timeLimitReached || resultsLimitReached || filterRowsLimitReached; if (limitReached || !moreRows) { // With block size limit, we may exceed size limit without collecting any results. @@ -3416,7 +3425,10 @@ private void scan(HBaseRpcController controller, ScanRequest request, RegionScan // there are more values to be read server side. If there aren't more values, // marking it as a heartbeat is wasteful because the client will need to issue // another ScanRequest only to realize that they already have all the values - if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) { + if ( + moreRows + && (timeLimitReached || sizeLimitReachedWithoutResults || filterRowsLimitReached) + ) { // Heartbeat messages occur when the time limit has been reached, or size limit has // been reached before collecting any results. This can happen for heavily filtered // scans which scan over too many blocks. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java index cea136a9a057..b4b5640789d8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScanner.java @@ -110,4 +110,12 @@ default String getOperationId() { * @throws IOException e */ boolean nextRaw(List result, ScannerContext scannerContext) throws IOException; + + default void setName(String name) { + + } + + default String getName() { + return null; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java index 77f813ff4434..9fe24a18c2ab 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java @@ -91,6 +91,7 @@ class RegionScannerImpl implements RegionScanner, Shipper, RpcCallback { private final ScannerContext defaultScannerContext; private final FilterWrapper filter; private final String operationId; + private String name; private RegionServerServices rsServices; @@ -488,7 +489,10 @@ private boolean nextInternal(List results, ScannerContext scannerContext) // Check if rowkey filter wants to exclude this row. If so, loop to next. // Technically, if we hit limits before on this row, we don't need this call. if (filterRowKey(current)) { - incrementCountOfRowsFilteredMetric(scannerContext); + if (incrementCountOfRowsFilteredMetric(scannerContext)) { + return scannerContext.setScannerState(NextState.FILTERED_ROWS_LIMIT_REACHED) + .hasMoreValues(); + } // early check, see HBASE-16296 if (isFilterDoneInternal()) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); @@ -553,7 +557,10 @@ private boolean nextInternal(List results, ScannerContext scannerContext) } if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) { - incrementCountOfRowsFilteredMetric(scannerContext); + if (incrementCountOfRowsFilteredMetric(scannerContext)) { + return scannerContext.setScannerState(NextState.FILTERED_ROWS_LIMIT_REACHED) + .hasMoreValues(); + } results.clear(); boolean moreRows = nextRow(scannerContext, current); if (!moreRows) { @@ -605,7 +612,10 @@ private boolean nextInternal(List results, ScannerContext scannerContext) // Double check to prevent empty rows from appearing in result. It could be // the case when SingleColumnValueExcludeFilter is used. if (results.isEmpty()) { - incrementCountOfRowsFilteredMetric(scannerContext); + if (incrementCountOfRowsFilteredMetric(scannerContext)) { + return scannerContext.setScannerState(NextState.FILTERED_ROWS_LIMIT_REACHED) + .hasMoreValues(); + } boolean moreRows = nextRow(scannerContext, current); if (!moreRows) { return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues(); @@ -630,7 +640,7 @@ private boolean nextInternal(List results, ScannerContext scannerContext) } } - private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) + private boolean incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) throws DoNotRetryIOException { region.filteredReadRequestsCount.increment(); if (region.getMetrics() != null) { @@ -638,21 +648,29 @@ private void incrementCountOfRowsFilteredMetric(ScannerContext scannerContext) } if (scannerContext == null || !scannerContext.isTrackingMetrics()) { - return; + return false; } long countOfRowsFiltered = scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet(); if (region.rsServices instanceof HRegionServer) { - long maxRowsFilteredPerRequest = ((HRegionServer) region.rsServices).getRegionScannerLimiter() - .getMaxRowsFilteredPerRequest(); + RegionScannerLimiter regionScannerLimiter = + ((HRegionServer) region.rsServices).getRegionScannerLimiter(); + long maxRowsFilteredPerRequest = regionScannerLimiter.getMaxRowsFilteredPerRequest(); if (maxRowsFilteredPerRequest > 0 && countOfRowsFiltered >= maxRowsFilteredPerRequest) { - String errMsg = - String.format("Too many rows filtered, higher than the limit threshold of %s, " - + "so kill the scan request!", maxRowsFilteredPerRequest); - LOG.warn("ScannerContext={}, errMsg={}", scannerContext, errMsg); - throw new DoNotRetryIOException(errMsg); + regionScannerLimiter.setFilterRowsLimitReached(getName(), true); + if (regionScannerLimiter.killRequest()) { + String errMsg = + String.format("Too many rows filtered, higher than the limit threshold of %s, " + + "so kill the scan request!", maxRowsFilteredPerRequest); + LOG.warn("ScannerContext={}, errMsg={}", scannerContext, errMsg); + throw new DoNotRetryIOException(errMsg); + } else { + return true; + } } } + + return false; } private void incrementCountOfRowsScannedMetric(ScannerContext scannerContext) { @@ -763,6 +781,9 @@ protected boolean shouldStop(Cell currentRowCell) { @edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "IS2_INCONSISTENT_SYNC", justification = "this method is only called inside close which is synchronized") private void closeInternal() { + if (region.rsServices instanceof HRegionServer) { + ((HRegionServer) region.rsServices).getRegionScannerLimiter().removeScanner(getName()); + } if (storeHeap != null) { storeHeap.close(); storeHeap = null; @@ -819,4 +840,14 @@ public void run() throws IOException { // callback this.close(); } + + @Override + public void setName(String name) { + this.name = name; + } + + @Override + public String getName() { + return name; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java index daf1d9a52931..d6f211f8b134 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java @@ -17,6 +17,10 @@ */ package org.apache.hadoop.hbase.regionserver; +import com.google.common.annotations.VisibleForTesting; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.function.Function; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.conf.ConfigurationObserver; import org.apache.yetus.audience.InterfaceAudience; @@ -29,7 +33,10 @@ * {@link RegionScannerLimiter#HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY}. When heavily * filtered scan requests frequently cause high load on the RegionServer, you can set the * {@link RegionScannerLimiter#HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY} to a larger - * value (for example, 100,000) to kill those scan requests. When you want to revert, just set the + * value (e.g. 100,000) to limit those scan requests. If you want to kill the scan request at the + * same time, you can set + * {@link RegionScannerLimiter#HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY} to + * true. If you want to disable this feature, just set the * {@link RegionScannerLimiter#HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY} to 0. */ @InterfaceAudience.Private @@ -40,37 +47,33 @@ public class RegionScannerLimiter implements ConfigurationObserver { public static final String HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY = "hbase.server.scanner.max.rows.filtered.per.request"; + public static final String HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY = + "hbase.server.scanner.max.rows.filtered.reached.request.killed"; + // Max count of rows filtered per scan request. If equals zero, it means no limitation. // Note: No limitation by default. private volatile long maxRowsFilteredPerRequest = 0; + // Killing scan request when TRUE. + private volatile boolean requestKilled = false; + + private final ConcurrentMap scanners = new ConcurrentHashMap<>(); public RegionScannerLimiter(Configuration conf) { - updateLimiterConf(conf); + onConfigurationChange(conf); } - private void updateLimiterConf(Configuration conf) { + private void updateLimiterConf(Configuration conf, String configKey, T oldValue, + Function applyFunc) { try { - if (conf.get(HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY) == null) { + if (conf.get(configKey) == null) { return; } - - long targetValue = conf.getLong(HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, -1); - if (targetValue < 0) { - LOG.warn("Invalid parameter, should be greater than or equal to zero, target value: {}", - targetValue); - return; + T targetValue = applyFunc.apply(configKey); + if (targetValue != null) { + LOG.info("Config key={}, old value={}, new value={}", configKey, oldValue, targetValue); } - if (maxRowsFilteredPerRequest == targetValue) { - return; - } - - LOG.info("Config key={}, old value={}, new value={}", - HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, maxRowsFilteredPerRequest, - targetValue); - this.maxRowsFilteredPerRequest = targetValue; } catch (Exception e) { - LOG.error("Failed to update config key: {}", - HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, e); + LOG.error("Failed to update config key: {}", configKey, e); } } @@ -78,8 +81,51 @@ public long getMaxRowsFilteredPerRequest() { return this.maxRowsFilteredPerRequest; } + public boolean isFilterRowsLimitReached(String scannerName) { + return scanners.getOrDefault(scannerName, false); + } + + public void setFilterRowsLimitReached(String scannerName, boolean limitReached) { + scanners.put(scannerName, limitReached); + } + + public void removeScanner(String scannerName) { + scanners.remove(scannerName); + } + + public boolean killRequest() { + return requestKilled; + } + + @VisibleForTesting + public ConcurrentMap getScanners() { + return scanners; + } + @Override public void onConfigurationChange(Configuration conf) { - updateLimiterConf(conf); + updateLimiterConf(conf, HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, + maxRowsFilteredPerRequest, configKey -> { + long targetValue = conf.getLong(configKey, -1); + if (targetValue < 0) { + LOG.warn("Invalid parameter, should be greater than or equal to zero, target value: {}", + targetValue); + return null; + } + if (maxRowsFilteredPerRequest == targetValue) { + return null; + } + maxRowsFilteredPerRequest = targetValue; + return targetValue; + }); + updateLimiterConf(conf, HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY, + requestKilled, configKey -> { + boolean targetValue = conf.getBoolean(configKey, false); + if (targetValue == requestKilled) { + return null; + } + requestKilled = targetValue; + return targetValue; + }); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index 03d84f209b04..94d83f6750cb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -467,6 +467,7 @@ public enum NextState { MORE_VALUES(true, false), NO_MORE_VALUES(false, false), SIZE_LIMIT_REACHED(true, true), + FILTERED_ROWS_LIMIT_REACHED(true, true), /** * Special case of size limit reached to indicate that the size limit was reached in the middle diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java index 9c824ceb3217..abd94447806a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; @@ -280,9 +281,11 @@ public void testRowsFilteredMetricLimiter() throws Exception { Filter filter = new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.NOT_EQUAL, VALUE); testRowsFilteredMetric(baseScan, filter, ROWS.length); + long scanRpcCalls = + getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME); HRegionServer rs1; - // update server side max count of rows filtered config. + // Update server side max count of rows filtered config. try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE.getName())) { RegionInfo firstHRI = locator.getAllRegionLocations().get(0).getRegion(); rs1 = TEST_UTIL.getHBaseCluster() @@ -290,17 +293,44 @@ public void testRowsFilteredMetricLimiter() throws Exception { } Configuration conf = TEST_UTIL.getConfiguration(); - // set max rows filtered limitation. - conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 5); + // Set max rows filtered limitation. + conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 7); + conf.setBoolean( + RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY, true); rs1.getConfigurationManager().notifyAllObservers(conf); assertThrows("Should throw a DoNotRetryIOException when too many rows have been filtered.", DoNotRetryIOException.class, () -> testRowsFilteredMetric(baseScan, filter, ROWS.length)); + conf.setBoolean( + RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY, + false); + rs1.getConfigurationManager().notifyAllObservers(conf); + testRowsFilteredMetric(baseScan, filter, ROWS.length); + + // Test scan rpc calls. + long scanRpcCalls2 = + getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME); + assertEquals(scanRpcCalls + 1, scanRpcCalls2); + + conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 5); + rs1.getConfigurationManager().notifyAllObservers(conf); + long scanRpcCalls3 = + getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME); + assertEquals(scanRpcCalls + 2, scanRpcCalls3); + + conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 3); + rs1.getConfigurationManager().notifyAllObservers(conf); + long scanRpcCalls4 = + getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME); + assertEquals(scanRpcCalls + 3, scanRpcCalls4); + // no max rows filtered limitation. conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 0); rs1.getConfigurationManager().notifyAllObservers(conf); testRowsFilteredMetric(baseScan, filter, ROWS.length); + + assertEquals(0, rs1.getRegionScannerLimiter().getScanners().size()); } private void testRowsFilteredMetric(Scan baseScan) throws Exception { @@ -373,6 +403,13 @@ private void testRowsFilteredMetric(Scan baseScan, Filter filter, int expectedNu * @throws Exception on unexpected failure */ private void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception { + final long actualMetricValue = getScanMetricValue(scan, metricKey); + assertEquals( + "Metric: " + metricKey + " Expected: " + expectedValue + " Actual: " + actualMetricValue, + expectedValue, actualMetricValue); + } + + private static long getScanMetricValue(Scan scan, String metricKey) throws IOException { assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled()); ResultScanner scanner = TABLE.getScanner(scan); // Iterate through all the results @@ -381,11 +418,8 @@ private void testMetric(Scan scan, String metricKey, long expectedValue) throws } scanner.close(); ScanMetrics metrics = scanner.getScanMetrics(); - assertTrue("Metrics are null", metrics != null); + assertNotNull("Metrics are null", metrics); assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey)); - final long actualMetricValue = metrics.getCounter(metricKey).get(); - assertEquals( - "Metric: " + metricKey + " Expected: " + expectedValue + " Actual: " + actualMetricValue, - expectedValue, actualMetricValue); + return metrics.getCounter(metricKey).get(); } } From 3f96c2739344ee7706a4335f3d45f63a634ed559 Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Sun, 8 Oct 2023 16:46:03 +0800 Subject: [PATCH 5/7] Limit max count of rows filtered per scan request. --- .../apache/hadoop/hbase/regionserver/RegionScannerLimiter.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java index d6f211f8b134..b35c6dba0480 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.regionserver; -import com.google.common.annotations.VisibleForTesting; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.function.Function; @@ -27,6 +26,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + /** * Limit max count of rows filtered per scan request. This Limiter applies globally to scan * requests, and the config key is From d445f694619e800f07c02b6c3075cfba3485795b Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Sun, 8 Oct 2023 17:09:19 +0800 Subject: [PATCH 6/7] Limit max count of rows filtered per scan request. --- .../apache/hadoop/hbase/regionserver/RegionScannerLimiter.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java index b35c6dba0480..20eae61e0b4e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerLimiter.java @@ -26,8 +26,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; - /** * Limit max count of rows filtered per scan request. This Limiter applies globally to scan * requests, and the config key is @@ -98,7 +96,6 @@ public boolean killRequest() { return requestKilled; } - @VisibleForTesting public ConcurrentMap getScanners() { return scanners; } From 05afeae16a47f77652751f69608e78b11e460db9 Mon Sep 17 00:00:00 2001 From: Fantasy-Jay <13631435453@163.com> Date: Thu, 21 Dec 2023 17:58:57 +0800 Subject: [PATCH 7/7] Limit max count of rows filtered per scan request. --- .../hbase/regionserver/RegionScannerImpl.java | 9 +- .../hbase/regionserver/ScannerContext.java | 36 ++- ...stServerSideScanMetricsFromClientSide.java | 84 +------ .../TestScannerFilteredRowsLimits.java | 213 ++++++++++++++++++ 4 files changed, 256 insertions(+), 86 deletions(-) create mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerFilteredRowsLimits.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java index 9fe24a18c2ab..c8c49243fe77 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionScannerImpl.java @@ -647,11 +647,16 @@ private boolean incrementCountOfRowsFilteredMetric(ScannerContext scannerContext region.getMetrics().updateFilteredRecords(); } - if (scannerContext == null || !scannerContext.isTrackingMetrics()) { + if (scannerContext == null) { return false; } - long countOfRowsFiltered = scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet(); + if (scannerContext.isTrackingMetrics()) { + scannerContext.getMetrics().countOfRowsFiltered.incrementAndGet(); + } + + scannerContext.incrementFilteredRowsProgress(1); + long countOfRowsFiltered = scannerContext.getFilterRowsProgress(); if (region.rsServices instanceof HRegionServer) { RegionScannerLimiter regionScannerLimiter = ((HRegionServer) region.rsServices).getRegionScannerLimiter(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java index 94d83f6750cb..8709cc58b0e1 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java @@ -130,7 +130,7 @@ public class ScannerContext { } // Progress fields are initialized to 0 - progress = new ProgressFields(0, 0, 0, 0); + progress = new ProgressFields(0, 0, 0, 0, 0); this.keepProgress = keepProgress; this.scannerState = DEFAULT_STATE; @@ -220,6 +220,13 @@ void incrementBlockProgress(int blockSize) { } } + void incrementFilteredRowsProgress(int filteredRows) { + if (filteredRows > 0) { + long curFilteredRows = progress.getFilteredRows(); + progress.setFilteredRows(curFilteredRows + filteredRows); + } + } + int getBatchProgress() { return progress.getBatch(); } @@ -236,6 +243,10 @@ long getBlockSizeProgress() { return progress.getBlockSize(); } + long getFilterRowsProgress() { + return progress.getFilteredRows(); + } + void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress) { setBatchProgress(batchProgress); setSizeProgress(sizeProgress, heapSizeProgress); @@ -257,7 +268,7 @@ void setBatchProgress(int batchProgress) { * filtered. */ void clearProgress() { - progress.setFields(0, 0, 0, getBlockSizeProgress()); + progress.setFields(0, 0, 0, getBlockSizeProgress(), getFilterRowsProgress()); } /** @@ -736,6 +747,7 @@ private static class ProgressFields { private static int DEFAULT_BATCH = -1; private static long DEFAULT_SIZE = -1L; + private static long DEFAULT_ROWS = -1L; // The batch limit will always be enforced between cells, thus, there isn't a field to hold the // batch scope @@ -749,19 +761,22 @@ private static class ProgressFields { // The total amount of block bytes that have been loaded in order to process cells for the // request. long blockSize = DEFAULT_SIZE; + // The total amount of rows that have been filtered in order to process this scan request. + long filteredRows = DEFAULT_ROWS; - ProgressFields(int batch, long size, long heapSize, long blockSize) { - setFields(batch, size, heapSize, blockSize); + ProgressFields(int batch, long size, long heapSize, long blockSize, long filteredRows) { + setFields(batch, size, heapSize, blockSize, filteredRows); } /** * Set all fields together. */ - void setFields(int batch, long dataSize, long heapSize, long blockSize) { + void setFields(int batch, long dataSize, long heapSize, long blockSize, long filteredRows) { setBatch(batch); setDataSize(dataSize); setHeapSize(heapSize); setBlockSize(blockSize); + setFilteredRows(filteredRows); } int getBatch() { @@ -784,6 +799,10 @@ long getBlockSize() { return this.blockSize; } + long getFilteredRows() { + return this.filteredRows; + } + void setDataSize(long dataSize) { this.dataSize = dataSize; } @@ -792,6 +811,10 @@ void setBlockSize(long blockSize) { this.blockSize = blockSize; } + void setFilteredRows(long filteredRows) { + this.filteredRows = filteredRows; + } + void setHeapSize(long heapSize) { this.heapSize = heapSize; } @@ -813,6 +836,9 @@ public String toString() { sb.append(", blockSize:"); sb.append(blockSize); + sb.append(", filteredRows:"); + sb.append(filteredRows); + sb.append("}"); return sb.toString(); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java index abd94447806a..b80cd207683c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestServerSideScanMetricsFromClientSide.java @@ -18,17 +18,12 @@ package org.apache.hadoop.hbase; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertThrows; import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RegionInfo; -import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -44,8 +39,6 @@ import org.apache.hadoop.hbase.filter.RowFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.RegionScannerLimiter; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; import org.junit.AfterClass; @@ -270,69 +263,6 @@ public void testRowsFilteredMetric() throws Exception { testRowsSeenMetric(baseScan); } - @Test - public void testRowsFilteredMetricLimiter() throws Exception { - // Base scan configuration - Scan baseScan; - baseScan = new Scan(); - baseScan.setScanMetricsEnabled(true); - - // No matching column value should exist in any row. Filter all rows - Filter filter = - new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.NOT_EQUAL, VALUE); - testRowsFilteredMetric(baseScan, filter, ROWS.length); - long scanRpcCalls = - getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME); - - HRegionServer rs1; - // Update server side max count of rows filtered config. - try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE.getName())) { - RegionInfo firstHRI = locator.getAllRegionLocations().get(0).getRegion(); - rs1 = TEST_UTIL.getHBaseCluster() - .getRegionServer(TEST_UTIL.getHBaseCluster().getServerWith(firstHRI.getRegionName())); - } - - Configuration conf = TEST_UTIL.getConfiguration(); - // Set max rows filtered limitation. - conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 7); - conf.setBoolean( - RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY, true); - rs1.getConfigurationManager().notifyAllObservers(conf); - - assertThrows("Should throw a DoNotRetryIOException when too many rows have been filtered.", - DoNotRetryIOException.class, () -> testRowsFilteredMetric(baseScan, filter, ROWS.length)); - - conf.setBoolean( - RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY, - false); - rs1.getConfigurationManager().notifyAllObservers(conf); - testRowsFilteredMetric(baseScan, filter, ROWS.length); - - // Test scan rpc calls. - long scanRpcCalls2 = - getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME); - assertEquals(scanRpcCalls + 1, scanRpcCalls2); - - conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 5); - rs1.getConfigurationManager().notifyAllObservers(conf); - long scanRpcCalls3 = - getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME); - assertEquals(scanRpcCalls + 2, scanRpcCalls3); - - conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 3); - rs1.getConfigurationManager().notifyAllObservers(conf); - long scanRpcCalls4 = - getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME); - assertEquals(scanRpcCalls + 3, scanRpcCalls4); - - // no max rows filtered limitation. - conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 0); - rs1.getConfigurationManager().notifyAllObservers(conf); - testRowsFilteredMetric(baseScan, filter, ROWS.length); - - assertEquals(0, rs1.getRegionScannerLimiter().getScanners().size()); - } - private void testRowsFilteredMetric(Scan baseScan) throws Exception { testRowsFilteredMetric(baseScan, null, 0); @@ -403,13 +333,6 @@ private void testRowsFilteredMetric(Scan baseScan, Filter filter, int expectedNu * @throws Exception on unexpected failure */ private void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception { - final long actualMetricValue = getScanMetricValue(scan, metricKey); - assertEquals( - "Metric: " + metricKey + " Expected: " + expectedValue + " Actual: " + actualMetricValue, - expectedValue, actualMetricValue); - } - - private static long getScanMetricValue(Scan scan, String metricKey) throws IOException { assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled()); ResultScanner scanner = TABLE.getScanner(scan); // Iterate through all the results @@ -418,8 +341,11 @@ private static long getScanMetricValue(Scan scan, String metricKey) throws IOExc } scanner.close(); ScanMetrics metrics = scanner.getScanMetrics(); - assertNotNull("Metrics are null", metrics); + assertTrue("Metrics are null", metrics != null); assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey)); - return metrics.getCounter(metricKey).get(); + final long actualMetricValue = metrics.getCounter(metricKey).get(); + assertEquals( + "Metric: " + metricKey + " Expected: " + expectedValue + " Actual: " + actualMetricValue, + expectedValue, actualMetricValue); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerFilteredRowsLimits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerFilteredRowsLimits.java new file mode 100644 index 000000000000..54f3ab2bcafb --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerFilteredRowsLimits.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CompareOperator; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HTestConst; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.client.RegionLocator; +import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({ LargeTests.class }) +public class TestScannerFilteredRowsLimits { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestScannerFilteredRowsLimits.class); + + private final static HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + + private static int NUM_ROWS = 10; + private static byte[] ROW = Bytes.toBytes("testRow"); + private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS); + + private static int NUM_FAMILIES = 1; + private static byte[] FAMILY = Bytes.toBytes("testFamily"); + private static byte[][] FAMILIES = HTestConst.makeNAscii(FAMILY, NUM_FAMILIES); + + private static int NUM_QUALIFIERS = 1; + private static byte[] QUALIFIER = Bytes.toBytes("testQualifier"); + private static byte[][] QUALIFIERS = HTestConst.makeNAscii(QUALIFIER, NUM_QUALIFIERS); + + private static int VALUE_SIZE = 10; + private static byte[] VALUE = Bytes.createMaxByteArray(VALUE_SIZE); + + private static Table TABLE = null; + private static TableName TABLE_NAME = TableName.valueOf("testTable"); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniCluster(3); + TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE); + } + + private static Table createTestTable(TableName name, byte[][] rows, byte[][] families, + byte[][] qualifiers, byte[] cellValue) throws IOException { + Table ht = TEST_UTIL.createTable(name, families); + List puts = createPuts(rows, families, qualifiers, cellValue); + ht.put(puts); + + return ht; + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + private static ArrayList createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers, + byte[] value) throws IOException { + Put put; + ArrayList puts = new ArrayList<>(); + + for (int row = 0; row < rows.length; row++) { + put = new Put(rows[row]); + for (int fam = 0; fam < families.length; fam++) { + for (int qual = 0; qual < qualifiers.length; qual++) { + KeyValue kv = new KeyValue(rows[row], families[fam], qualifiers[qual], qual, value); + put.add(kv); + } + } + puts.add(put); + } + + return puts; + } + + @Test + public void testRowsFilteredMetricLimiter() throws Exception { + // Base scan configuration + Scan baseScan; + baseScan = new Scan(); + baseScan.setScanMetricsEnabled(true); + + // No matching column value should exist in any row. Filter all rows + Filter filter = + new SingleColumnValueFilter(FAMILIES[0], QUALIFIERS[0], CompareOperator.NOT_EQUAL, VALUE); + testRowsFilteredMetric(baseScan, filter, ROWS.length); + long scanRpcCalls = + getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME); + + HRegionServer rs1; + // Update server side max count of rows filtered config. + try (RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE.getName())) { + RegionInfo firstHRI = locator.getAllRegionLocations().get(0).getRegion(); + rs1 = TEST_UTIL.getHBaseCluster() + .getRegionServer(TEST_UTIL.getHBaseCluster().getServerWith(firstHRI.getRegionName())); + } + + Configuration conf = TEST_UTIL.getConfiguration(); + // Set max rows filtered limitation. + conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 7); + conf.setBoolean( + RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY, true); + rs1.getConfigurationManager().notifyAllObservers(conf); + + assertThrows("Should throw a DoNotRetryIOException when too many rows have been filtered.", + DoNotRetryIOException.class, () -> testRowsFilteredMetric(baseScan, filter, ROWS.length)); + + conf.setBoolean( + RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_REACHED_REQUEST_KILLED_KEY, + false); + rs1.getConfigurationManager().notifyAllObservers(conf); + testRowsFilteredMetric(baseScan, filter, ROWS.length); + + // Test scan rpc calls. + long scanRpcCalls2 = + getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME); + assertEquals(scanRpcCalls + 1, scanRpcCalls2); + + conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 5); + rs1.getConfigurationManager().notifyAllObservers(conf); + long scanRpcCalls3 = + getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME); + assertEquals(scanRpcCalls + 2, scanRpcCalls3); + + conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 3); + rs1.getConfigurationManager().notifyAllObservers(conf); + long scanRpcCalls4 = + getScanMetricValue(new Scan(baseScan).setFilter(filter), ScanMetrics.RPC_CALLS_METRIC_NAME); + assertEquals(scanRpcCalls + 3, scanRpcCalls4); + + // no max rows filtered limitation. + conf.setLong(RegionScannerLimiter.HBASE_SERVER_SCANNER_MAX_ROWS_FILTERED_PER_REQUEST_KEY, 0); + rs1.getConfigurationManager().notifyAllObservers(conf); + testRowsFilteredMetric(baseScan, filter, ROWS.length); + + assertEquals(0, rs1.getRegionScannerLimiter().getScanners().size()); + } + + private void testRowsFilteredMetric(Scan baseScan, Filter filter, int expectedNumFiltered) + throws Exception { + Scan scan = new Scan(baseScan); + if (filter != null) { + scan.setFilter(filter); + } + testMetric(scan, ServerSideScanMetrics.COUNT_OF_ROWS_FILTERED_KEY_METRIC_NAME, + expectedNumFiltered); + } + + private void testMetric(Scan scan, String metricKey, long expectedValue) throws Exception { + final long actualMetricValue = getScanMetricValue(scan, metricKey); + assertEquals( + "Metric: " + metricKey + " Expected: " + expectedValue + " Actual: " + actualMetricValue, + expectedValue, actualMetricValue); + } + + private static long getScanMetricValue(Scan scan, String metricKey) throws IOException { + assertTrue("Scan should be configured to record metrics", scan.isScanMetricsEnabled()); + ResultScanner scanner = TABLE.getScanner(scan); + // Iterate through all the results + while (scanner.next() != null) { + continue; + } + scanner.close(); + ScanMetrics metrics = scanner.getScanMetrics(); + assertNotNull("Metrics are null", metrics); + assertTrue("Metric : " + metricKey + " does not exist", metrics.hasCounter(metricKey)); + return metrics.getCounter(metricKey).get(); + } +}