Skip to content

Commit c63d658

Browse files
taklwucsringhofer
andauthored
HBASE-28595: check seq id of scan RPCs for closed scanners (#5910) (#5925)
Signed-off-by: Duo Zhang <[email protected]> Signed-off-by: Tak Lon (Stephen) Wu <[email protected]> Co-authored-by: csringhofer <[email protected]>
1 parent bd9d9c9 commit c63d658

File tree

1 file changed

+23
-10
lines changed

1 file changed

+23
-10
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -334,10 +334,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin
334334

335335
private ScannerIdGenerator scannerIdGenerator;
336336
private final ConcurrentMap<String, RegionScannerHolder> scanners = new ConcurrentHashMap<>();
337-
// Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
338-
// which may send next or close request to a region scanner which has already been exhausted. The
339-
// entries will be removed automatically after scannerLeaseTimeoutPeriod.
340-
private final Cache<String, String> closedScanners;
337+
// Hold the name and last sequence number of a closed scanner for a while. This is used
338+
// to keep compatible for old clients which may send next or close request to a region
339+
// scanner which has already been exhausted. The entries will be removed automatically
340+
// after scannerLeaseTimeoutPeriod.
341+
private final Cache<String, Long> closedScanners;
341342
/**
342343
* The lease timeout period for client scanners (milliseconds).
343344
*/
@@ -3090,8 +3091,18 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep
30903091
RegionScannerHolder rsh = this.scanners.get(scannerName);
30913092
if (rsh == null) {
30923093
// just ignore the next or close request if scanner does not exists.
3093-
if (closedScanners.getIfPresent(scannerName) != null) {
3094-
throw SCANNER_ALREADY_CLOSED;
3094+
Long lastCallSeq = closedScanners.getIfPresent(scannerName);
3095+
if (lastCallSeq != null) {
3096+
// Check the sequence number to catch if the last call was incorrectly retried.
3097+
// The only allowed scenario is when the scanner is exhausted and one more scan
3098+
// request arrives - in this case returning 0 rows is correct.
3099+
if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) {
3100+
throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: "
3101+
+ (lastCallSeq + 1) + " But the nextCallSeq got from client: "
3102+
+ request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request));
3103+
} else {
3104+
throw SCANNER_ALREADY_CLOSED;
3105+
}
30953106
} else {
30963107
LOG.warn("Client tried to access missing scanner " + scannerName);
30973108
throw new UnknownScannerException(
@@ -3662,7 +3673,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
36623673
}
36633674
if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) {
36643675
scannerClosed = true;
3665-
closeScanner(region, scanner, scannerName, rpcCall);
3676+
closeScanner(region, scanner, scannerName, rpcCall, false);
36663677
}
36673678
return builder.build();
36683679
} catch (IOException e) {
@@ -3672,7 +3683,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
36723683
// The scanner state might be left in a dirty state, so we will tell the Client to
36733684
// fail this RPC and close the scanner while opening up another one from the start of
36743685
// row that the client has last seen.
3675-
closeScanner(region, scanner, scannerName, rpcCall);
3686+
closeScanner(region, scanner, scannerName, rpcCall, true);
36763687

36773688
// If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
36783689
// used in two different semantics.
@@ -3736,7 +3747,7 @@ private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException
37363747
}
37373748

37383749
private void closeScanner(HRegion region, RegionScanner scanner, String scannerName,
3739-
RpcCallContext context) throws IOException {
3750+
RpcCallContext context, boolean isError) throws IOException {
37403751
if (region.getCoprocessorHost() != null) {
37413752
if (region.getCoprocessorHost().preScannerClose(scanner)) {
37423753
// bypass the actual close.
@@ -3753,7 +3764,9 @@ private void closeScanner(HRegion region, RegionScanner scanner, String scannerN
37533764
if (region.getCoprocessorHost() != null) {
37543765
region.getCoprocessorHost().postScannerClose(scanner);
37553766
}
3756-
closedScanners.put(scannerName, scannerName);
3767+
if (!isError) {
3768+
closedScanners.put(scannerName, rsh.getNextCallSeq());
3769+
}
37573770
}
37583771
}
37593772

0 commit comments

Comments
 (0)