diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index b2549c1d5165..cf32a7ecccee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -334,10 +334,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin private ScannerIdGenerator scannerIdGenerator; private final ConcurrentMap scanners = new ConcurrentHashMap<>(); - // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients - // which may send next or close request to a region scanner which has already been exhausted. The - // entries will be removed automatically after scannerLeaseTimeoutPeriod. - private final Cache closedScanners; + // Hold the name and last sequence number of a closed scanner for a while. This is used + // to keep compatible for old clients which may send next or close request to a region + // scanner which has already been exhausted. The entries will be removed automatically + // after scannerLeaseTimeoutPeriod. + private final Cache closedScanners; /** * The lease timeout period for client scanners (milliseconds). */ @@ -3090,8 +3091,18 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep RegionScannerHolder rsh = this.scanners.get(scannerName); if (rsh == null) { // just ignore the next or close request if scanner does not exists. - if (closedScanners.getIfPresent(scannerName) != null) { - throw SCANNER_ALREADY_CLOSED; + Long lastCallSeq = closedScanners.getIfPresent(scannerName); + if (lastCallSeq != null) { + // Check the sequence number to catch if the last call was incorrectly retried. + // The only allowed scenario is when the scanner is exhausted and one more scan + // request arrives - in this case returning 0 rows is correct. + if (request.hasNextCallSeq() && request.getNextCallSeq() != lastCallSeq + 1) { + throw new OutOfOrderScannerNextException("Expected nextCallSeq for closed request: " + + (lastCallSeq + 1) + " But the nextCallSeq got from client: " + + request.getNextCallSeq() + "; request=" + TextFormat.shortDebugString(request)); + } else { + throw SCANNER_ALREADY_CLOSED; + } } else { LOG.warn("Client tried to access missing scanner " + scannerName); throw new UnknownScannerException( @@ -3662,7 +3673,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque } if (!builder.getMoreResults() || !builder.getMoreResultsInRegion() || closeScanner) { scannerClosed = true; - closeScanner(region, scanner, scannerName, rpcCall); + closeScanner(region, scanner, scannerName, rpcCall, false); } return builder.build(); } catch (IOException e) { @@ -3672,7 +3683,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque // The scanner state might be left in a dirty state, so we will tell the Client to // fail this RPC and close the scanner while opening up another one from the start of // row that the client has last seen. - closeScanner(region, scanner, scannerName, rpcCall); + closeScanner(region, scanner, scannerName, rpcCall, true); // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is // used in two different semantics. @@ -3736,7 +3747,7 @@ private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException } private void closeScanner(HRegion region, RegionScanner scanner, String scannerName, - RpcCallContext context) throws IOException { + RpcCallContext context, boolean isError) throws IOException { if (region.getCoprocessorHost() != null) { if (region.getCoprocessorHost().preScannerClose(scanner)) { // bypass the actual close. @@ -3753,7 +3764,9 @@ private void closeScanner(HRegion region, RegionScanner scanner, String scannerN if (region.getCoprocessorHost() != null) { region.getCoprocessorHost().postScannerClose(scanner); } - closedScanners.put(scannerName, scannerName); + if (!isError) { + closedScanners.put(scannerName, rsh.getNextCallSeq()); + } } }