@@ -355,10 +355,11 @@ public class RSRpcServices
355355
356356 private ScannerIdGenerator scannerIdGenerator ;
357357 private final ConcurrentMap <String , RegionScannerHolder > scanners = new ConcurrentHashMap <>();
358- // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
359- // which may send next or close request to a region scanner which has already been exhausted. The
360- // entries will be removed automatically after scannerLeaseTimeoutPeriod.
361- private final Cache <String , String > closedScanners ;
358+ // Hold the name and last sequence number of a closed scanner for a while. This is used
359+ // to keep compatible for old clients which may send next or close request to a region
360+ // scanner which has already been exhausted. The entries will be removed automatically
361+ // after scannerLeaseTimeoutPeriod.
362+ private final Cache <String , Long > closedScanners ;
362363 /**
363364 * The lease timeout period for client scanners (milliseconds).
364365 */
@@ -3127,8 +3128,18 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep
31273128 RegionScannerHolder rsh = this .scanners .get (scannerName );
31283129 if (rsh == null ) {
31293130 // just ignore the next or close request if scanner does not exists.
3130- if (closedScanners .getIfPresent (scannerName ) != null ) {
3131- throw SCANNER_ALREADY_CLOSED ;
3131+ Long lastCallSeq = closedScanners .getIfPresent (scannerName );
3132+ if (lastCallSeq != null ) {
3133+ // Check the sequence number to catch if the last call was incorrectly retried.
3134+ // The only allowed scenario is when the scanner is exhausted and one more scan
3135+ // request arrives - in this case returning 0 rows is correct.
3136+ if (request .hasNextCallSeq () && request .getNextCallSeq () != lastCallSeq + 1 ) {
3137+ throw new OutOfOrderScannerNextException ("Expected nextCallSeq for closed request: "
3138+ + (lastCallSeq + 1 ) + " But the nextCallSeq got from client: "
3139+ + request .getNextCallSeq () + "; request=" + TextFormat .shortDebugString (request ));
3140+ } else {
3141+ throw SCANNER_ALREADY_CLOSED ;
3142+ }
31323143 } else {
31333144 LOG .warn ("Client tried to access missing scanner " + scannerName );
31343145 throw new UnknownScannerException (
@@ -3699,7 +3710,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
36993710 }
37003711 if (!builder .getMoreResults () || !builder .getMoreResultsInRegion () || closeScanner ) {
37013712 scannerClosed = true ;
3702- closeScanner (region , scanner , scannerName , rpcCall );
3713+ closeScanner (region , scanner , scannerName , rpcCall , false );
37033714 }
37043715
37053716 // There's no point returning to a timed out client. Throwing ensures scanner is closed
@@ -3715,7 +3726,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
37153726 // The scanner state might be left in a dirty state, so we will tell the Client to
37163727 // fail this RPC and close the scanner while opening up another one from the start of
37173728 // row that the client has last seen.
3718- closeScanner (region , scanner , scannerName , rpcCall );
3729+ closeScanner (region , scanner , scannerName , rpcCall , true );
37193730
37203731 // If it is a DoNotRetryIOException already, throw as it is. Unfortunately, DNRIOE is
37213732 // used in two different semantics.
@@ -3779,7 +3790,7 @@ private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException
37793790 }
37803791
37813792 private void closeScanner (HRegion region , RegionScanner scanner , String scannerName ,
3782- RpcCallContext context ) throws IOException {
3793+ RpcCallContext context , boolean isError ) throws IOException {
37833794 if (region .getCoprocessorHost () != null ) {
37843795 if (region .getCoprocessorHost ().preScannerClose (scanner )) {
37853796 // bypass the actual close.
@@ -3796,7 +3807,9 @@ private void closeScanner(HRegion region, RegionScanner scanner, String scannerN
37963807 if (region .getCoprocessorHost () != null ) {
37973808 region .getCoprocessorHost ().postScannerClose (scanner );
37983809 }
3799- closedScanners .put (scannerName , scannerName );
3810+ if (!isError ) {
3811+ closedScanners .put (scannerName , rsh .getNextCallSeq ());
3812+ }
38003813 }
38013814 }
38023815
0 commit comments