2323import java .lang .reflect .InvocationTargetException ;
2424import java .lang .reflect .Method ;
2525import java .net .BindException ;
26+ import java .net .InetAddress ;
2627import java .net .InetSocketAddress ;
2728import java .nio .ByteBuffer ;
2829import java .util .ArrayList ;
9495import org .apache .hadoop .hbase .ipc .HBaseRpcController ;
9596import org .apache .hadoop .hbase .ipc .PriorityFunction ;
9697import org .apache .hadoop .hbase .ipc .QosPriority ;
98+ import org .apache .hadoop .hbase .ipc .RpcCall ;
9799import org .apache .hadoop .hbase .ipc .RpcCallContext ;
98100import org .apache .hadoop .hbase .ipc .RpcCallback ;
99101import org .apache .hadoop .hbase .ipc .RpcScheduler ;
@@ -399,7 +401,6 @@ public void run() throws IOException {
399401 * An Rpc callback for doing shipped() call on a RegionScanner.
400402 */
401403 private class RegionScannerShippedCallBack implements RpcCallback {
402-
403404 private final String scannerName ;
404405 private final Shipper shipper ;
405406 private final Lease lease ;
@@ -449,43 +450,48 @@ public void run() {
449450 /**
450451 * Holder class which holds the RegionScanner, nextCallSeq and RpcCallbacks together.
451452 */
452- private static final class RegionScannerHolder {
453-
453+ static final class RegionScannerHolder {
454454 private final AtomicLong nextCallSeq = new AtomicLong (0 );
455- private final String scannerName ;
456455 private final RegionScanner s ;
457456 private final HRegion r ;
458457 private final RpcCallback closeCallBack ;
459458 private final RpcCallback shippedCallback ;
460459 private byte [] rowOfLastPartialResult ;
461460 private boolean needCursor ;
462461 private boolean fullRegionScan ;
462+ private final String clientIPAndPort ;
463463
464- public RegionScannerHolder (String scannerName , RegionScanner s , HRegion r ,
464+ RegionScannerHolder (RegionScanner s , HRegion r ,
465465 RpcCallback closeCallBack , RpcCallback shippedCallback , boolean needCursor ,
466- boolean fullRegionScan ) {
467- this .scannerName = scannerName ;
466+ boolean fullRegionScan , String clientIPAndPort ) {
468467 this .s = s ;
469468 this .r = r ;
470469 this .closeCallBack = closeCallBack ;
471470 this .shippedCallback = shippedCallback ;
472471 this .needCursor = needCursor ;
473472 this .fullRegionScan = fullRegionScan ;
473+ this .clientIPAndPort = clientIPAndPort ;
474474 }
475475
476- public long getNextCallSeq () {
476+ long getNextCallSeq () {
477477 return nextCallSeq .get ();
478478 }
479479
480- public boolean incNextCallSeq (long currentSeq ) {
480+ boolean incNextCallSeq (long currentSeq ) {
481481 // Use CAS to prevent multiple scan request running on the same scanner.
482482 return nextCallSeq .compareAndSet (currentSeq , currentSeq + 1 );
483483 }
484+
485+ // Should be called only when we need to print lease expired messages otherwise
486+ // cache the String once made.
487+ @ Override
488+ public String toString () {
489+ return this .clientIPAndPort + ", " + this .r .getRegionInfo ().getRegionNameAsString ();
490+ }
484491 }
485492
486493 /**
487- * Instantiated as a scanner lease. If the lease times out, the scanner is
488- * closed
494+ * Instantiated as a scanner lease. If the lease times out, the scanner is closed
489495 */
490496 private class ScannerListener implements LeaseListener {
491497 private final String scannerName ;
@@ -497,31 +503,32 @@ private class ScannerListener implements LeaseListener {
497503 @ Override
498504 public void leaseExpired () {
499505 RegionScannerHolder rsh = scanners .remove (this .scannerName );
500- if (rsh != null ) {
501- RegionScanner s = rsh .s ;
502- LOG .info ("Scanner " + this .scannerName + " lease expired on region "
503- + s .getRegionInfo ().getRegionNameAsString ());
504- HRegion region = null ;
506+ if (rsh == null ) {
507+ LOG .warn ("Scanner lease {} expired but no outstanding scanner" , this .scannerName );
508+ return ;
509+ }
510+ LOG .info ("Scanner lease {} expired {}, user={}" , this .scannerName , rsh ,
511+ RpcServer .getRequestUserName ().orElse (null ));
512+ RegionScanner s = rsh .s ;
513+ HRegion region = null ;
514+ try {
515+ region = regionServer .getRegion (s .getRegionInfo ().getRegionName ());
516+ if (region != null && region .getCoprocessorHost () != null ) {
517+ region .getCoprocessorHost ().preScannerClose (s );
518+ }
519+ } catch (IOException e ) {
520+ LOG .error ("Closing scanner {} {}, user={}" , this .scannerName , rsh , e ,
521+ RpcServer .getRequestUserName ().orElse (null ));
522+ } finally {
505523 try {
506- region = regionServer . getRegion ( s . getRegionInfo (). getRegionName () );
524+ s . close ( );
507525 if (region != null && region .getCoprocessorHost () != null ) {
508- region .getCoprocessorHost ().preScannerClose (s );
526+ region .getCoprocessorHost ().postScannerClose (s );
509527 }
510528 } catch (IOException e ) {
511- LOG .error ("Closing scanner for " + s .getRegionInfo ().getRegionNameAsString (), e );
512- } finally {
513- try {
514- s .close ();
515- if (region != null && region .getCoprocessorHost () != null ) {
516- region .getCoprocessorHost ().postScannerClose (s );
517- }
518- } catch (IOException e ) {
519- LOG .error ("Closing scanner for " + s .getRegionInfo ().getRegionNameAsString (), e );
520- }
529+ LOG .error ("Closing scanner {} {}, user={}" , this .scannerName , rsh , e ,
530+ RpcServer .getRequestUserName ().orElse (null ));
521531 }
522- } else {
523- LOG .warn ("Scanner " + this .scannerName + " lease expired, but no related" +
524- " scanner found, hence no chance to close that related scanner!" );
525532 }
526533 }
527534 }
@@ -1305,14 +1312,19 @@ public int getScannersCount() {
13051312 return scanners .size ();
13061313 }
13071314
1308- public
1315+ /**
1316+ * @return The outstanding RegionScanner for <code>scannerId</code> or null if none found.
1317+ */
13091318 RegionScanner getScanner (long scannerId ) {
1310- String scannerIdString = Long .toString (scannerId );
1311- RegionScannerHolder scannerHolder = scanners .get (scannerIdString );
1312- if (scannerHolder != null ) {
1313- return scannerHolder .s ;
1314- }
1315- return null ;
1319+ RegionScannerHolder rsh = getRegionScannerHolder (scannerId );
1320+ return rsh == null ? null : rsh .s ;
1321+ }
1322+
1323+ /**
1324+ * @return The associated RegionScannerHolder for <code>scannerId</code> or null.
1325+ */
1326+ private RegionScannerHolder getRegionScannerHolder (long scannerId ) {
1327+ return scanners .get (toScannerName (scannerId ));
13161328 }
13171329
13181330 public String getScanDetailsWithId (long scannerId ) {
@@ -1346,12 +1358,8 @@ public String getScanDetailsWithRequest(ScanRequest request) {
13461358 * Currently the vtime is the number of "next" calls.
13471359 */
13481360 long getScannerVirtualTime (long scannerId ) {
1349- String scannerIdString = Long .toString (scannerId );
1350- RegionScannerHolder scannerHolder = scanners .get (scannerIdString );
1351- if (scannerHolder != null ) {
1352- return scannerHolder .getNextCallSeq ();
1353- }
1354- return 0L ;
1361+ RegionScannerHolder rsh = getRegionScannerHolder (scannerId );
1362+ return rsh == null ? 0L : rsh .getNextCallSeq ();
13551363 }
13561364
13571365 /**
@@ -1395,24 +1403,36 @@ Object addSize(RpcCallContext context, Result r, Object lastBlock) {
13951403 return lastBlock ;
13961404 }
13971405
1406+ /**
1407+ * @return Remote client's ip and port else null if can't be determined.
1408+ */
1409+ static String getRemoteClientIpAndPort () {
1410+ RpcCall rpcCall = RpcServer .getCurrentCall ().orElse (null );
1411+ if (rpcCall == null ) {
1412+ return HConstants .EMPTY_STRING ;
1413+ }
1414+ InetAddress address = rpcCall .getRemoteAddress ();
1415+ if (address == null ) {
1416+ return HConstants .EMPTY_STRING ;
1417+ }
1418+ // Be careful here with InetAddress. Do InetAddress#getHostAddress. It will not do a name
1419+ // resolution. Just use the IP. It is generally a smaller amount of info to keep around while
1420+ // scanning than a hostname anyways.
1421+ return Address .fromParts (address .getHostAddress (), rpcCall .getRemotePort ()).toString ();
1422+ }
1423+
13981424 private RegionScannerHolder addScanner (String scannerName , RegionScanner s , Shipper shipper ,
13991425 HRegion r , boolean needCursor , boolean fullRegionScan ) throws LeaseStillHeldException {
14001426 Lease lease = regionServer .getLeaseManager ().createLease (
14011427 scannerName , this .scannerLeaseTimeoutPeriod , new ScannerListener (scannerName ));
14021428 RpcCallback shippedCallback = new RegionScannerShippedCallBack (scannerName , shipper , lease );
1403- RpcCallback closeCallback ;
1404- if (s instanceof RpcCallback ) {
1405- closeCallback = (RpcCallback ) s ;
1406- } else {
1407- closeCallback = new RegionScannerCloseCallBack (s );
1408- }
1409-
1410- RegionScannerHolder rsh =
1411- new RegionScannerHolder (scannerName , s , r , closeCallback , shippedCallback ,
1412- needCursor , fullRegionScan );
1429+ RpcCallback closeCallback = s instanceof RpcCallback ?
1430+ (RpcCallback )s : new RegionScannerCloseCallBack (s );
1431+ RegionScannerHolder rsh = new RegionScannerHolder (s , r , closeCallback , shippedCallback ,
1432+ needCursor , fullRegionScan , getRemoteClientIpAndPort ());
14131433 RegionScannerHolder existing = scanners .putIfAbsent (scannerName , rsh );
14141434 assert existing == null : "scannerId must be unique within regionserver's whole lifecycle! " +
1415- scannerName ;
1435+ scannerName + ", " + existing ;
14161436 return rsh ;
14171437 }
14181438
@@ -3162,8 +3182,8 @@ public synchronized Throwable fillInStackTrace() {
31623182 };
31633183
31643184 private RegionScannerHolder getRegionScanner (ScanRequest request ) throws IOException {
3165- String scannerName = Long . toString (request .getScannerId ());
3166- RegionScannerHolder rsh = scanners .get (scannerName );
3185+ String scannerName = toScannerName (request .getScannerId ());
3186+ RegionScannerHolder rsh = this . scanners .get (scannerName );
31673187 if (rsh == null ) {
31683188 // just ignore the next or close request if scanner does not exists.
31693189 if (closedScanners .getIfPresent (scannerName ) != null ) {
@@ -3203,8 +3223,12 @@ private RegionScannerHolder getRegionScanner(ScanRequest request) throws IOExcep
32033223 return rsh ;
32043224 }
32053225
3206- private RegionScannerHolder newRegionScanner (ScanRequest request , ScanResponse .Builder builder )
3207- throws IOException {
3226+ /**
3227+ * @return Pair with scannerName key to use with this new Scanner and its RegionScannerHolder
3228+ * value.
3229+ */
3230+ private Pair <String , RegionScannerHolder > newRegionScanner (ScanRequest request ,
3231+ ScanResponse .Builder builder ) throws IOException {
32083232 HRegion region = getRegion (request .getRegion ());
32093233 rejectIfInStandByState (region );
32103234 ClientProtos .Scan protoScan = request .getScan ();
@@ -3236,13 +3260,24 @@ private RegionScannerHolder newRegionScanner(ScanRequest request, ScanResponse.B
32363260 builder .setScannerId (scannerId );
32373261 builder .setMvccReadPoint (scanner .getMvccReadPoint ());
32383262 builder .setTtl (scannerLeaseTimeoutPeriod );
3239- String scannerName = String . valueOf (scannerId );
3263+ String scannerName = toScannerName (scannerId );
32403264
32413265 boolean fullRegionScan = !region .getRegionInfo ().getTable ().isSystemTable () &&
32423266 isFullRegionScan (scan , region );
32433267
3244- return addScanner (scannerName , scanner , shipper , region , scan .isNeedCursorResult (),
3245- fullRegionScan );
3268+ return new Pair <String , RegionScannerHolder >(scannerName ,
3269+ addScanner (scannerName , scanner , shipper , region , scan .isNeedCursorResult (),
3270+ fullRegionScan ));
3271+ }
3272+
3273+ /**
3274+ * The returned String is used as key doing look up of outstanding Scanners in this Servers'
3275+ * this.scanners, the Map of outstanding scanners and their current state.
3276+ * @param scannerId A scanner long id.
3277+ * @return The long id as a String.
3278+ */
3279+ private static String toScannerName (long scannerId ) {
3280+ return Long .toString (scannerId );
32463281 }
32473282
32483283 private void checkScanNextCallSeq (ScanRequest request , RegionScannerHolder rsh )
@@ -3516,7 +3551,7 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
35163551 checkOpen ();
35173552 } catch (IOException e ) {
35183553 if (request .hasScannerId ()) {
3519- String scannerName = Long . toString (request .getScannerId ());
3554+ String scannerName = toScannerName (request .getScannerId ());
35203555 if (LOG .isDebugEnabled ()) {
35213556 LOG .debug (
35223557 "Server shutting down and client tried to access missing scanner " + scannerName );
@@ -3539,14 +3574,19 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
35393574 rpcScanRequestCount .increment ();
35403575 RegionScannerHolder rsh ;
35413576 ScanResponse .Builder builder = ScanResponse .newBuilder ();
3577+ String scannerName ;
35423578 try {
35433579 if (request .hasScannerId ()) {
35443580 // The downstream projects such as AsyncHBase in OpenTSDB need this value. See HBASE-18000
35453581 // for more details.
3546- builder .setScannerId (request .getScannerId ());
3582+ long scannerId = request .getScannerId ();
3583+ builder .setScannerId (scannerId );
3584+ scannerName = toScannerName (scannerId );
35473585 rsh = getRegionScanner (request );
35483586 } else {
3549- rsh = newRegionScanner (request , builder );
3587+ Pair <String , RegionScannerHolder > scannerNameAndRSH = newRegionScanner (request , builder );
3588+ scannerName = scannerNameAndRSH .getFirst ();
3589+ rsh = scannerNameAndRSH .getSecond ();
35503590 }
35513591 } catch (IOException e ) {
35523592 if (e == SCANNER_ALREADY_CLOSED ) {
@@ -3560,11 +3600,10 @@ public ScanResponse scan(final RpcController controller, final ScanRequest reque
35603600 rpcFullScanRequestCount .increment ();
35613601 }
35623602 HRegion region = rsh .r ;
3563- String scannerName = rsh .scannerName ;
35643603 LeaseManager .Lease lease ;
35653604 try {
35663605 // Remove lease while its being processed in server; protects against case
3567- // where processing of request takes > lease expiration time.
3606+ // where processing of request takes > lease expiration time. or null if none found.
35683607 lease = regionServer .getLeaseManager ().removeLease (scannerName );
35693608 } catch (LeaseException e ) {
35703609 throw new ServiceException (e );
0 commit comments