@@ -303,10 +303,11 @@ public class RSRpcServices extends HBaseRpcServicesBase<HRegionServer>
303303
304304 private ScannerIdGenerator scannerIdGenerator ;
305305 private final ConcurrentMap <String , RegionScannerHolder > scanners = new ConcurrentHashMap <>();
306- // Hold the name of a closed scanner for a while. This is used to keep compatible for old clients
307- // which may send next or close request to a region scanner which has already been exhausted. The
308- // entries will be removed automatically after scannerLeaseTimeoutPeriod.
309- private final Cache <String , String > closedScanners ;
306+ // Hold the name and last sequence number of a closed scanner for a while. This is used
307+ // to keep compatible for old clients which may send next or close request to a region
308+ // scanner which has already been exhausted. The entries will be removed automatically
309+ // after scannerLeaseTimeoutPeriod.
310+ private final Cache <String , Long > closedScanners ;
310311 /**
311312 * The lease timeout period for client scanners (milliseconds).
312313 */
@@ -3083,8 +3084,16 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep
30833084 RegionScannerHolder rsh = this .scanners .get (scannerName );
30843085 if (rsh == null ) {
30853086 // just ignore the next or close request if scanner does not exists.
3086- if (closedScanners .getIfPresent (scannerName ) != null ) {
3087- throw SCANNER_ALREADY_CLOSED ;
3087+ Long lastCallSeq = closedScanners .getIfPresent (scannerName );
3088+ if (lastCallSeq != null ) {
3089+ // Check the sequence number to catch if the last call was incorrectly retried.
3090+ if (request .hasNextCallSeq () && request .getNextCallSeq () != lastCallSeq + 1 ) {
3091+ throw new OutOfOrderScannerNextException ("Expected nextCallSeq for closed request: "
3092+ + (lastCallSeq + 1 ) + " But the nextCallSeq got from client: "
3093+ + request .getNextCallSeq () + "; request=" + TextFormat .shortDebugString (request ));
3094+ } else {
3095+ throw SCANNER_ALREADY_CLOSED ;
3096+ }
30883097 } else {
30893098 LOG .warn ("Client tried to access missing scanner " + scannerName );
30903099 throw new UnknownScannerException (
@@ -3787,7 +3796,7 @@ private void closeScanner(HRegion region, RegionScanner scanner, String scannerN
37873796 if (region .getCoprocessorHost () != null ) {
37883797 region .getCoprocessorHost ().postScannerClose (scanner );
37893798 }
3790- closedScanners .put (scannerName , scannerName );
3799+ closedScanners .put (scannerName , rsh . getNextCallSeq () );
37913800 }
37923801 }
37933802
0 commit comments