| 
 | 1 | +/**  | 
 | 2 | + * Licensed to the Apache Software Foundation (ASF) under one  | 
 | 3 | + * or more contributor license agreements.  See the NOTICE file  | 
 | 4 | + * distributed with this work for additional information  | 
 | 5 | + * regarding copyright ownership.  The ASF licenses this file  | 
 | 6 | + * to you under the Apache License, Version 2.0 (the  | 
 | 7 | + * "License"); you may not use this file except in compliance  | 
 | 8 | + * with the License.  You may obtain a copy of the License at  | 
 | 9 | + *  | 
 | 10 | + *     http://www.apache.org/licenses/LICENSE-2.0  | 
 | 11 | + *  | 
 | 12 | + * Unless required by applicable law or agreed to in writing, software  | 
 | 13 | + * distributed under the License is distributed on an "AS IS" BASIS,  | 
 | 14 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.  | 
 | 15 | + * See the License for the specific language governing permissions and  | 
 | 16 | + * limitations under the License.  | 
 | 17 | + */  | 
 | 18 | +package org.apache.hadoop.hbase.regionserver;  | 
 | 19 | + | 
 | 20 | +import static org.junit.Assert.assertNull;  | 
 | 21 | +import static org.junit.Assert.assertTrue;  | 
 | 22 | + | 
 | 23 | +import java.io.IOException;  | 
 | 24 | +import java.util.Optional;  | 
 | 25 | +import java.util.concurrent.atomic.AtomicReference;  | 
 | 26 | +import org.apache.hadoop.conf.Configuration;  | 
 | 27 | +import org.apache.hadoop.hbase.HBaseClassTestRule;  | 
 | 28 | +import org.apache.hadoop.hbase.HBaseTestingUtility;  | 
 | 29 | +import org.apache.hadoop.hbase.HConstants;  | 
 | 30 | +import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;  | 
 | 31 | +import org.apache.hadoop.hbase.TableName;  | 
 | 32 | +import org.apache.hadoop.hbase.client.Admin;  | 
 | 33 | +import org.apache.hadoop.hbase.client.Put;  | 
 | 34 | +import org.apache.hadoop.hbase.client.Result;  | 
 | 35 | +import org.apache.hadoop.hbase.client.ResultScanner;  | 
 | 36 | +import org.apache.hadoop.hbase.client.Scan;  | 
 | 37 | +import org.apache.hadoop.hbase.client.Table;  | 
 | 38 | +import org.apache.hadoop.hbase.io.ByteBuffAllocator;  | 
 | 39 | +import org.apache.hadoop.hbase.io.DeallocateRewriteByteBuffAllocator;  | 
 | 40 | +import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;  | 
 | 41 | +import org.apache.hadoop.hbase.ipc.RpcCall;  | 
 | 42 | +import org.apache.hadoop.hbase.ipc.RpcServer;  | 
 | 43 | +import org.apache.hadoop.hbase.testclassification.LargeTests;  | 
 | 44 | +import org.apache.hadoop.hbase.testclassification.RegionServerTests;  | 
 | 45 | +import org.apache.hadoop.hbase.util.Bytes;  | 
 | 46 | +import org.junit.AfterClass;  | 
 | 47 | +import org.junit.BeforeClass;  | 
 | 48 | +import org.junit.ClassRule;  | 
 | 49 | +import org.junit.Rule;  | 
 | 50 | +import org.junit.Test;  | 
 | 51 | +import org.junit.experimental.categories.Category;  | 
 | 52 | +import org.junit.rules.TestName;  | 
 | 53 | + | 
 | 54 | +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;  | 
 | 55 | +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;  | 
 | 56 | + | 
 | 57 | +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;  | 
 | 58 | +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;  | 
 | 59 | + | 
 | 60 | +@Category({ RegionServerTests.class, LargeTests.class })  | 
 | 61 | +public class TestRegionServerScan {  | 
 | 62 | +  @ClassRule  | 
 | 63 | +  public static final HBaseClassTestRule CLASS_RULE =  | 
 | 64 | +      HBaseClassTestRule.forClass(TestRegionServerScan.class);  | 
 | 65 | + | 
 | 66 | +  @Rule  | 
 | 67 | +  public TestName name = new TestName();  | 
 | 68 | + | 
 | 69 | +  private static final byte[] CF = Bytes.toBytes("CF");  | 
 | 70 | +  private static final byte[] CQ = Bytes.toBytes("CQ");  | 
 | 71 | +  private static final byte[] VALUE = new byte[1200];  | 
 | 72 | + | 
 | 73 | +  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();  | 
 | 74 | +  private static final Configuration conf = TEST_UTIL.getConfiguration();  | 
 | 75 | +  private static Admin admin = null;  | 
 | 76 | +  static final TableName tableName = TableName.valueOf("TestRegionServerScan");  | 
 | 77 | +  static final byte[] r0 = Bytes.toBytes("row-0");  | 
 | 78 | +  static final byte[] r1 = Bytes.toBytes("row-1");  | 
 | 79 | +  static final byte[] r2 = Bytes.toBytes("row-2");  | 
 | 80 | + | 
 | 81 | +  @BeforeClass  | 
 | 82 | +  public static void setupBeforeClass() throws Exception {  | 
 | 83 | +    /**  | 
 | 84 | +     * Use {@link DeallocateRewriteByteBuffAllocator} to rewrite the bytebuffers right after  | 
 | 85 | +     * released.  | 
 | 86 | +     */  | 
 | 87 | +    conf.set(ByteBuffAllocator.BYTEBUFF_ALLOCATOR_CLASS,  | 
 | 88 | +      DeallocateRewriteByteBuffAllocator.class.getName());  | 
 | 89 | +    conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);  | 
 | 90 | +    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);  | 
 | 91 | +    conf.setInt(BlockCacheFactory.BUCKET_CACHE_WRITER_THREADS_KEY, 20);  | 
 | 92 | +    conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 2048);  | 
 | 93 | +    conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");  | 
 | 94 | +    conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64);  | 
 | 95 | +    conf.setStrings(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName());  | 
 | 96 | +    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 60 * 1000);  | 
 | 97 | +    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30 * 60 * 1000);  | 
 | 98 | + | 
 | 99 | +    conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 60 * 1000);  | 
 | 100 | +    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);  | 
 | 101 | +    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10000);  | 
 | 102 | +    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1024 * 1024 * 1024);  | 
 | 103 | +    TEST_UTIL.startMiniCluster(1);  | 
 | 104 | +    admin = TEST_UTIL.getAdmin();  | 
 | 105 | +  }  | 
 | 106 | + | 
 | 107 | +  @AfterClass  | 
 | 108 | +  public static void tearDownAfterClass() throws Exception {  | 
 | 109 | +    TEST_UTIL.shutdownMiniCluster();  | 
 | 110 | +  }  | 
 | 111 | + | 
 | 112 | +  @Test  | 
 | 113 | +  public void testScannWhenRpcCallContextNull() throws Exception {  | 
 | 114 | +    ResultScanner resultScanner = null;  | 
 | 115 | +    Table table = null;  | 
 | 116 | +    try {  | 
 | 117 | +      table =  | 
 | 118 | +          TEST_UTIL.createTable(tableName, new byte[][] { CF }, 1, 1024, null);  | 
 | 119 | +      putToTable(table, r0);  | 
 | 120 | +      putToTable(table, r1);  | 
 | 121 | +      putToTable(table, r2);  | 
 | 122 | + | 
 | 123 | +      admin.flush(table.getName());  | 
 | 124 | + | 
 | 125 | +      Scan scan = new Scan();  | 
 | 126 | +      scan.setCaching(2);  | 
 | 127 | +      scan.withStartRow(r0, true).withStopRow(r2, true);  | 
 | 128 | + | 
 | 129 | +      MyRSRpcServices.inTest = true;  | 
 | 130 | +      resultScanner = table.getScanner(scan);  | 
 | 131 | +      Result result = resultScanner.next();  | 
 | 132 | +      byte[] rowKey = result.getRow();  | 
 | 133 | +      assertTrue(Bytes.equals(r0, rowKey));  | 
 | 134 | + | 
 | 135 | +      result = resultScanner.next();  | 
 | 136 | +      rowKey = result.getRow();  | 
 | 137 | +      assertTrue(Bytes.equals(r1, rowKey));  | 
 | 138 | + | 
 | 139 | +      result = resultScanner.next();  | 
 | 140 | +      rowKey = result.getRow();  | 
 | 141 | +      assertTrue(Bytes.equals(r2, rowKey));  | 
 | 142 | +      assertNull(resultScanner.next());  | 
 | 143 | +      assertTrue(MyRSRpcServices.exceptionRef.get() == null);  | 
 | 144 | +    } finally {  | 
 | 145 | +      MyRSRpcServices.inTest = false;  | 
 | 146 | +      if (resultScanner != null) {  | 
 | 147 | +        resultScanner.close();  | 
 | 148 | +      }  | 
 | 149 | +      if (table != null) {  | 
 | 150 | +        table.close();  | 
 | 151 | +      }  | 
 | 152 | +    }  | 
 | 153 | +  }  | 
 | 154 | + | 
 | 155 | +  private static void putToTable(Table table, byte[] rowkey) throws IOException {  | 
 | 156 | +    Put put = new Put(rowkey);  | 
 | 157 | +    put.addColumn(CF, CQ, VALUE);  | 
 | 158 | +    table.put(put);  | 
 | 159 | +  }  | 
 | 160 | + | 
 | 161 | +  private static class MyRegionServer extends MiniHBaseClusterRegionServer {  | 
 | 162 | +    public MyRegionServer(Configuration conf) throws IOException, InterruptedException {  | 
 | 163 | +      super(conf);  | 
 | 164 | +    }  | 
 | 165 | + | 
 | 166 | +    @Override  | 
 | 167 | +    protected RSRpcServices createRpcServices() throws IOException {  | 
 | 168 | +      return new MyRSRpcServices(this);  | 
 | 169 | +    }  | 
 | 170 | +  }  | 
 | 171 | + | 
 | 172 | +  private static class MyRSRpcServices extends RSRpcServices {  | 
 | 173 | +    private static AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(null);  | 
 | 174 | +    private static volatile boolean inTest = false;  | 
 | 175 | + | 
 | 176 | +    public MyRSRpcServices(HRegionServer rs) throws IOException {  | 
 | 177 | +      super(rs);  | 
 | 178 | +    }  | 
 | 179 | + | 
 | 180 | +    @Override  | 
 | 181 | +    public ScanResponse scan(RpcController controller, ScanRequest request)  | 
 | 182 | +        throws ServiceException {  | 
 | 183 | +      try {  | 
 | 184 | +        if (!inTest) {  | 
 | 185 | +          return super.scan(controller, request);  | 
 | 186 | +        }  | 
 | 187 | + | 
 | 188 | +        HRegion region = null;  | 
 | 189 | +        if (request.hasRegion()) {  | 
 | 190 | +          region = this.getRegion(request.getRegion());  | 
 | 191 | +        }  | 
 | 192 | + | 
 | 193 | +        if (region != null  | 
 | 194 | +            && !tableName.equals(region.getTableDescriptor().getTableName())) {  | 
 | 195 | +          return super.scan(controller, request);  | 
 | 196 | +        }  | 
 | 197 | + | 
 | 198 | +        ScanResponse result = null;  | 
 | 199 | +        //Simulate RpcCallContext is null for test.  | 
 | 200 | +        Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall();  | 
 | 201 | +        try {  | 
 | 202 | +          result = super.scan(controller, request);  | 
 | 203 | +        } finally {  | 
 | 204 | +          rpcCall.ifPresent(RpcServer::setCurrentCall);  | 
 | 205 | +        }  | 
 | 206 | +        return result;  | 
 | 207 | +      } catch (Throwable e) {  | 
 | 208 | +        exceptionRef.set(e);  | 
 | 209 | +        throw new ServiceException(e);  | 
 | 210 | +      }  | 
 | 211 | +    }  | 
 | 212 | +  }  | 
 | 213 | + | 
 | 214 | +}  | 
0 commit comments