|  | 
| 35 | 35 | import java.util.Arrays; | 
| 36 | 36 | import java.util.Collection; | 
| 37 | 37 | import java.util.List; | 
|  | 38 | +import java.util.concurrent.atomic.AtomicInteger; | 
| 38 | 39 | import org.apache.hadoop.conf.Configuration; | 
| 39 | 40 | import org.apache.hadoop.hbase.Cell; | 
| 40 | 41 | import org.apache.hadoop.hbase.CompareOperator; | 
|  | 
| 44 | 45 | import org.apache.hadoop.hbase.HRegionLocation; | 
| 45 | 46 | import org.apache.hadoop.hbase.HTestConst; | 
| 46 | 47 | import org.apache.hadoop.hbase.KeyValue; | 
|  | 48 | +import org.apache.hadoop.hbase.ServerName; | 
| 47 | 49 | import org.apache.hadoop.hbase.SingleProcessHBaseCluster; | 
| 48 | 50 | import org.apache.hadoop.hbase.StartTestingClusterOption; | 
| 49 | 51 | import org.apache.hadoop.hbase.TableName; | 
| 50 | 52 | import org.apache.hadoop.hbase.TableNameTestRule; | 
| 51 | 53 | import org.apache.hadoop.hbase.TableNotFoundException; | 
| 52 | 54 | import org.apache.hadoop.hbase.client.Scan.ReadType; | 
| 53 | 55 | import org.apache.hadoop.hbase.exceptions.DeserializationException; | 
|  | 56 | +import org.apache.hadoop.hbase.exceptions.OutOfOrderScannerNextException; | 
| 54 | 57 | import org.apache.hadoop.hbase.filter.BinaryComparator; | 
| 55 | 58 | import org.apache.hadoop.hbase.filter.ColumnPrefixFilter; | 
| 56 | 59 | import org.apache.hadoop.hbase.filter.ColumnRangeFilter; | 
| 57 | 60 | import org.apache.hadoop.hbase.filter.FilterBase; | 
| 58 | 61 | import org.apache.hadoop.hbase.filter.QualifierFilter; | 
|  | 62 | +import org.apache.hadoop.hbase.ipc.RpcClient; | 
| 59 | 63 | import org.apache.hadoop.hbase.regionserver.HRegionServer; | 
| 60 | 64 | import org.apache.hadoop.hbase.testclassification.ClientTests; | 
| 61 | 65 | import org.apache.hadoop.hbase.testclassification.MediumTests; | 
| @@ -907,6 +911,86 @@ public void testScannerWithPartialResults() throws Exception { | 
| 907 | 911 |     } | 
| 908 | 912 |   } | 
| 909 | 913 | 
 | 
|  | 914 | +  @Test | 
|  | 915 | +  public void testRepeatedFinalScan() throws Exception { | 
|  | 916 | +    TableName tableName = TableName.valueOf("testRepeatedFinalScan"); | 
|  | 917 | +    TEST_UTIL.createTable(tableName, FAMILY).close(); | 
|  | 918 | + | 
|  | 919 | +    Configuration c2 = new Configuration(TEST_UTIL.getConfiguration()); | 
|  | 920 | +    // We want to work on a separate connection. | 
|  | 921 | +    c2.set(HConstants.HBASE_CLIENT_INSTANCE_ID, String.valueOf(-1)); | 
|  | 922 | +    c2.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 100); // retry a lot | 
|  | 923 | +    c2.setInt(HConstants.HBASE_CLIENT_PAUSE, 5); | 
|  | 924 | + | 
|  | 925 | +    Connection connection = ConnectionFactory.createConnection(c2); | 
|  | 926 | +    final Table table = connection.getTable(tableName); | 
|  | 927 | + | 
|  | 928 | +    final int ROWS_TO_INSERT = 100; | 
|  | 929 | +    final byte[] LARGE_VALUE = generateHugeValue(128 * 1024); | 
|  | 930 | + | 
|  | 931 | +    Admin admin = TEST_UTIL.getAdmin(); | 
|  | 932 | +    List<Put> putList = new ArrayList<>(); | 
|  | 933 | +    for (long i = 0; i < ROWS_TO_INSERT; i++) { | 
|  | 934 | +      Put put = new Put(Bytes.toBytes(i)); | 
|  | 935 | +      put.addColumn(FAMILY, QUALIFIER, LARGE_VALUE); | 
|  | 936 | +      putList.add(put); | 
|  | 937 | +    } | 
|  | 938 | +    table.put(putList); | 
|  | 939 | + | 
|  | 940 | +    ServerName sn; | 
|  | 941 | +    try (RegionLocator rl = connection.getRegionLocator(tableName)) { | 
|  | 942 | +      sn = rl.getRegionLocation(Bytes.toBytes(1)).getServerName(); | 
|  | 943 | +    } | 
|  | 944 | +    RpcClient rpcClient = ((AsyncConnectionImpl) connection.toAsyncConnection()).rpcClient; | 
|  | 945 | + | 
|  | 946 | +    // Avoid cancelling connection more than once per scan RPC. | 
|  | 947 | +    final AtomicInteger canCancelConnection = new AtomicInteger(0); | 
|  | 948 | + | 
|  | 949 | +    Thread t = new Thread("testScanRepeatThread") { | 
|  | 950 | +      @Override | 
|  | 951 | +      public void run() { | 
|  | 952 | +        while (true) { | 
|  | 953 | +          try { | 
|  | 954 | +            Thread.sleep(10); | 
|  | 955 | +            if (canCancelConnection.get() == 1) { | 
|  | 956 | +              canCancelConnection.set(0); | 
|  | 957 | +              rpcClient.cancelConnections(sn); | 
|  | 958 | +            } | 
|  | 959 | +          } catch (InterruptedException t) { | 
|  | 960 | +            break; | 
|  | 961 | +          } | 
|  | 962 | +        } | 
|  | 963 | +      } | 
|  | 964 | +    }; | 
|  | 965 | +    t.start(); | 
|  | 966 | + | 
|  | 967 | +    Scan scan = new Scan(); | 
|  | 968 | +    scan.addColumn(FAMILY, QUALIFIER); | 
|  | 969 | +    scan.setCaching(10); | 
|  | 970 | + | 
|  | 971 | +    for (int run = 0; run < 5; run++) { | 
|  | 972 | +      try (ResultScanner scanner = table.getScanner(scan)) { | 
|  | 973 | +        for (int i = 0; i < ROWS_TO_INSERT; i++) { | 
|  | 974 | +          Result result; | 
|  | 975 | +          try { | 
|  | 976 | +            result = scanner.next(); | 
|  | 977 | +          } catch (RetriesExhaustedException ex) { | 
|  | 978 | +            // If most rows are ok then accept RetriesExhaustedException. This was | 
|  | 979 | +            // needed to make results consistent with and without fix. | 
|  | 980 | +            if (i > ROWS_TO_INSERT / 2) break; | 
|  | 981 | +            throw ex; | 
|  | 982 | +          } | 
|  | 983 | +          assertNotNull(result); | 
|  | 984 | +          if (i % 10 == 1) canCancelConnection.set(1); | 
|  | 985 | +        } | 
|  | 986 | +      } | 
|  | 987 | +    } | 
|  | 988 | +    t.interrupt(); | 
|  | 989 | + | 
|  | 990 | +    table.close(); | 
|  | 991 | +    connection.close(); | 
|  | 992 | +  } | 
|  | 993 | + | 
| 910 | 994 |   public static class LimitKVsReturnFilter extends FilterBase { | 
| 911 | 995 | 
 | 
| 912 | 996 |     private int cellCount = 0; | 
|  | 
0 commit comments