Skip to content

Commit 4477dd5

Browse files
committed
HBASE-22317 Support reading from meta replicas
1 parent 962585d commit 4477dd5

File tree

9 files changed

+264
-261
lines changed

9 files changed

+264
-261
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncNonMetaRegionLocator.java

Lines changed: 63 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
*/
1818
package org.apache.hadoop.hbase.client;
1919

20+
import static org.apache.hadoop.hbase.HConstants.DEFAULT_USE_META_REPLICAS;
2021
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
2122
import static org.apache.hadoop.hbase.HConstants.NINES;
23+
import static org.apache.hadoop.hbase.HConstants.USE_META_REPLICAS;
2224
import static org.apache.hadoop.hbase.HConstants.ZEROES;
2325
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
2426
import static org.apache.hadoop.hbase.client.AsyncRegionLocatorHelper.canUpdateOnError;
@@ -87,6 +89,8 @@ class AsyncNonMetaRegionLocator {
8789

8890
private final int locatePrefetchLimit;
8991

92+
private final boolean useMetaReplicas;
93+
9094
private final ConcurrentMap<TableName, TableCache> cache = new ConcurrentHashMap<>();
9195

9296
private static final class LocateRequest {
@@ -193,6 +197,8 @@ private boolean tryComplete(LocateRequest req, CompletableFuture<RegionLocations
193197
MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE, DEFAULT_MAX_CONCURRENT_LOCATE_REQUEST_PER_TABLE);
194198
this.locatePrefetchLimit =
195199
conn.getConfiguration().getInt(LOCATE_PREFETCH_LIMIT, DEFAULT_LOCATE_PREFETCH_LIMIT);
200+
this.useMetaReplicas =
201+
conn.getConfiguration().getBoolean(USE_META_REPLICAS, DEFAULT_USE_META_REPLICAS);
196202
}
197203

198204
private TableCache getTableCache(TableName tableName) {
@@ -425,69 +431,72 @@ private void locateInMeta(TableName tableName, LocateRequest req) {
425431
}
426432
byte[] metaStopKey =
427433
RegionInfo.createRegionName(tableName, HConstants.EMPTY_START_ROW, "", false);
428-
conn.getTable(META_TABLE_NAME)
429-
.scan(new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
430-
.addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
431-
.setReadType(ReadType.PREAD), new AdvancedScanResultConsumer() {
434+
Scan scan = new Scan().withStartRow(metaStartKey).withStopRow(metaStopKey, true)
435+
.addFamily(HConstants.CATALOG_FAMILY).setReversed(true).setCaching(locatePrefetchLimit)
436+
.setReadType(ReadType.PREAD);
437+
if (useMetaReplicas) {
438+
scan.setConsistency(Consistency.TIMELINE);
439+
}
440+
conn.getTable(META_TABLE_NAME).scan(scan, new AdvancedScanResultConsumer() {
432441

433-
private boolean completeNormally = false;
442+
private boolean completeNormally = false;
434443

435-
private boolean tableNotFound = true;
444+
private boolean tableNotFound = true;
436445

437-
@Override
438-
public void onError(Throwable error) {
439-
complete(tableName, req, null, error);
440-
}
446+
@Override
447+
public void onError(Throwable error) {
448+
complete(tableName, req, null, error);
449+
}
441450

442-
@Override
443-
public void onComplete() {
444-
if (tableNotFound) {
445-
complete(tableName, req, null, new TableNotFoundException(tableName));
446-
} else if (!completeNormally) {
447-
complete(tableName, req, null, new IOException("Unable to find region for '" +
448-
Bytes.toStringBinary(req.row) + "' in " + tableName));
449-
}
450-
}
451+
@Override
452+
public void onComplete() {
453+
if (tableNotFound) {
454+
complete(tableName, req, null, new TableNotFoundException(tableName));
455+
} else if (!completeNormally) {
456+
complete(tableName, req, null, new IOException(
457+
"Unable to find region for '" + Bytes.toStringBinary(req.row) + "' in " + tableName));
458+
}
459+
}
451460

452-
@Override
453-
public void onNext(Result[] results, ScanController controller) {
454-
if (results.length == 0) {
455-
return;
461+
@Override
462+
public void onNext(Result[] results, ScanController controller) {
463+
if (results.length == 0) {
464+
return;
465+
}
466+
tableNotFound = false;
467+
int i = 0;
468+
for (; i < results.length; i++) {
469+
if (onScanNext(tableName, req, results[i])) {
470+
completeNormally = true;
471+
controller.terminate();
472+
i++;
473+
break;
474+
}
475+
}
476+
// Add the remaining results into cache
477+
if (i < results.length) {
478+
TableCache tableCache = getTableCache(tableName);
479+
for (; i < results.length; i++) {
480+
RegionLocations locs = MetaTableAccessor.getRegionLocations(results[i]);
481+
if (locs == null) {
482+
continue;
456483
}
457-
tableNotFound = false;
458-
int i = 0;
459-
for (; i < results.length; i++) {
460-
if (onScanNext(tableName, req, results[i])) {
461-
completeNormally = true;
462-
controller.terminate();
463-
i++;
464-
break;
465-
}
484+
HRegionLocation loc = locs.getDefaultRegionLocation();
485+
if (loc == null) {
486+
continue;
466487
}
467-
// Add the remaining results into cache
468-
if (i < results.length) {
469-
TableCache tableCache = getTableCache(tableName);
470-
for (; i < results.length; i++) {
471-
RegionLocations locs = MetaTableAccessor.getRegionLocations(results[i]);
472-
if (locs == null) {
473-
continue;
474-
}
475-
HRegionLocation loc = locs.getDefaultRegionLocation();
476-
if (loc == null) {
477-
continue;
478-
}
479-
RegionInfo info = loc.getRegion();
480-
if (info == null || info.isOffline() || info.isSplitParent()) {
481-
continue;
482-
}
483-
RegionLocations addedLocs = addToCache(tableCache, locs);
484-
synchronized (tableCache) {
485-
tableCache.clearCompletedRequests(Optional.of(addedLocs));
486-
}
487-
}
488+
RegionInfo info = loc.getRegion();
489+
if (info == null || info.isOffline() || info.isSplitParent()) {
490+
continue;
491+
}
492+
RegionLocations addedLocs = addToCache(tableCache, locs);
493+
synchronized (tableCache) {
494+
tableCache.clearCompletedRequests(Optional.of(addedLocs));
488495
}
489496
}
490-
});
497+
}
498+
}
499+
});
491500
}
492501

493502
private RegionLocations locateInCache(TableCache tableCache, TableName tableName, byte[] row,

hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ protected static void startClusterAndCreateTable() throws Exception {
5959
UTIL.getAdmin().createTable(td, SPLIT_KEYS);
6060
UTIL.waitTableAvailable(TABLE_NAME);
6161
try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(UTIL.getConfiguration())) {
62-
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(registry,
63-
REGION_REPLICATION);
62+
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(UTIL.getConfiguration(),
63+
registry, REGION_REPLICATION);
6464
}
6565
UTIL.getAdmin().balancerSwitch(false, true);
6666
}

hbase-server/src/test/java/org/apache/hadoop/hbase/client/RegionReplicaTestHelper.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,14 @@
2222

2323
import java.io.IOException;
2424
import java.util.Optional;
25+
import org.apache.hadoop.conf.Configuration;
2526
import org.apache.hadoop.hbase.HBaseTestingUtility;
2627
import org.apache.hadoop.hbase.HRegionLocation;
2728
import org.apache.hadoop.hbase.NotServingRegionException;
2829
import org.apache.hadoop.hbase.RegionLocations;
2930
import org.apache.hadoop.hbase.ServerName;
3031
import org.apache.hadoop.hbase.TableName;
32+
import org.apache.hadoop.hbase.Waiter;
3133
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
3234

3335
final class RegionReplicaTestHelper {
@@ -36,12 +38,10 @@ private RegionReplicaTestHelper() {
3638
}
3739

3840
// waits for all replicas to have region location
39-
static void waitUntilAllMetaReplicasHavingRegionLocation(AsyncRegistry registry,
40-
int regionReplication) throws IOException {
41-
TestZKAsyncRegistry.TEST_UTIL.waitFor(
42-
TestZKAsyncRegistry.TEST_UTIL.getConfiguration()
43-
.getLong("hbase.client.sync.wait.timeout.msec", 60000),
44-
200, true, new ExplainingPredicate<IOException>() {
41+
static void waitUntilAllMetaReplicasHavingRegionLocation(Configuration conf,
42+
AsyncRegistry registry, int regionReplication) throws IOException {
43+
Waiter.waitFor(conf, conf.getLong("hbase.client.sync.wait.timeout.msec", 60000), 200, true,
44+
new ExplainingPredicate<IOException>() {
4545
@Override
4646
public String explainFailure() throws IOException {
4747
return "Not all meta replicas get assigned";

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ public static void setUpBeforeClass() throws Exception {
5454
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
5555
TestAsyncAdminBase.setUpBeforeClass();
5656
try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) {
57-
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(registry, 3);
57+
RegionReplicaTestHelper
58+
.waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), registry, 3);
5859
}
5960
}
6061

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,8 @@ public static void setUp() throws Exception {
5555
TEST_UTIL.getConfiguration().setInt(HConstants.META_REPLICAS_NUM, 3);
5656
TEST_UTIL.startMiniCluster(3);
5757
REGISTRY = AsyncRegistryFactory.getRegistry(TEST_UTIL.getConfiguration());
58-
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(REGISTRY, 3);
58+
RegionReplicaTestHelper
59+
.waitUntilAllMetaReplicasHavingRegionLocation(TEST_UTIL.getConfiguration(), REGISTRY, 3);
5960
TEST_UTIL.getAdmin().balancerSwitch(false, true);
6061
LOCATOR = new AsyncMetaRegionLocator(REGISTRY);
6162
}
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import static org.junit.Assert.assertArrayEquals;
21+
22+
import java.io.IOException;
23+
import java.util.Optional;
24+
import java.util.concurrent.ExecutionException;
25+
import java.util.concurrent.TimeUnit;
26+
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.hbase.HBaseClassTestRule;
28+
import org.apache.hadoop.hbase.HBaseTestingUtility;
29+
import org.apache.hadoop.hbase.HConstants;
30+
import org.apache.hadoop.hbase.TableName;
31+
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
32+
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
33+
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
34+
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
35+
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
36+
import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
37+
import org.apache.hadoop.hbase.testclassification.ClientTests;
38+
import org.apache.hadoop.hbase.testclassification.LargeTests;
39+
import org.apache.hadoop.hbase.util.Bytes;
40+
import org.apache.hadoop.hbase.util.FutureUtils;
41+
import org.junit.After;
42+
import org.junit.AfterClass;
43+
import org.junit.BeforeClass;
44+
import org.junit.ClassRule;
45+
import org.junit.Test;
46+
import org.junit.experimental.categories.Category;
47+
48+
@Category({ ClientTests.class, LargeTests.class })
49+
public class TestAsyncTableUseMetaReplicas {
50+
51+
@ClassRule
52+
public static final HBaseClassTestRule CLASS_RULE =
53+
HBaseClassTestRule.forClass(TestAsyncTableUseMetaReplicas.class);
54+
55+
private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
56+
57+
private static TableName TABLE_NAME = TableName.valueOf("Replica");
58+
59+
private static byte[] FAMILY = Bytes.toBytes("Family");
60+
61+
private static byte[] QUALIFIER = Bytes.toBytes("Qual");
62+
63+
private static byte[] ROW = Bytes.toBytes("Row");
64+
65+
private static byte[] VALUE = Bytes.toBytes("Value");
66+
67+
private static volatile boolean FAIL_PRIMARY_SCAN = false;
68+
69+
public static final class FailPrimaryMetaScanCp implements RegionObserver, RegionCoprocessor {
70+
71+
@Override
72+
public Optional<RegionObserver> getRegionObserver() {
73+
return Optional.of(this);
74+
}
75+
76+
@Override
77+
public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
78+
throws IOException {
79+
RegionInfo region = c.getEnvironment().getRegionInfo();
80+
if (FAIL_PRIMARY_SCAN && TableName.isMetaTableName(region.getTable()) &&
81+
region.getReplicaId() == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
82+
throw new IOException("Inject error");
83+
}
84+
}
85+
}
86+
87+
@BeforeClass
88+
public static void setUp() throws Exception {
89+
Configuration conf = UTIL.getConfiguration();
90+
conf.setInt(HConstants.META_REPLICAS_NUM, 3);
91+
conf.setInt(StorefileRefresherChore.REGIONSERVER_STOREFILE_REFRESH_PERIOD, 1000);
92+
conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
93+
FailPrimaryMetaScanCp.class.getName());
94+
UTIL.startMiniCluster(3);
95+
try (AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf)) {
96+
RegionReplicaTestHelper.waitUntilAllMetaReplicasHavingRegionLocation(conf, registry, 3);
97+
}
98+
try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) {
99+
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE));
100+
}
101+
UTIL.flush(TableName.META_TABLE_NAME);
102+
// wait for the store file refresh so we can read the region location from secondary meta
103+
// replicas
104+
Thread.sleep(2000);
105+
}
106+
107+
@AfterClass
108+
public static void tearDown() throws Exception {
109+
UTIL.shutdownMiniCluster();
110+
}
111+
112+
@After
113+
public void tearDownAfterTest() {
114+
// make sure we do not mess up cleanup code.
115+
FAIL_PRIMARY_SCAN = false;
116+
}
117+
118+
private void testRead(boolean useMetaReplicas)
119+
throws IOException, InterruptedException, ExecutionException {
120+
FAIL_PRIMARY_SCAN = true;
121+
Configuration conf = new Configuration(UTIL.getConfiguration());
122+
conf.setBoolean(HConstants.USE_META_REPLICAS, useMetaReplicas);
123+
conf.setLong(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT, TimeUnit.SECONDS.toMicros(1));
124+
try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get()) {
125+
Result result = FutureUtils.get(conn.getTableBuilder(TABLE_NAME)
126+
.setOperationTimeout(3, TimeUnit.SECONDS).build().get(new Get(ROW)));
127+
assertArrayEquals(VALUE, result.getValue(FAMILY, QUALIFIER));
128+
}
129+
}
130+
131+
@Test(expected = RetriesExhaustedException.class)
132+
public void testNotUseMetaReplicas()
133+
throws IOException, InterruptedException, ExecutionException {
134+
testRead(false);
135+
}
136+
137+
@Test
138+
public void testUseMetaReplicas() throws IOException, InterruptedException, ExecutionException {
139+
testRead(true);
140+
}
141+
}

hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnectionImplementation.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import java.util.concurrent.atomic.AtomicReference;
4141
import java.util.stream.Collectors;
4242
import java.util.stream.IntStream;
43-
4443
import org.apache.hadoop.conf.Configuration;
4544
import org.apache.hadoop.hbase.Cell;
4645
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -1072,4 +1071,22 @@ public void testLocateRegionsWithRegionReplicas() throws IOException {
10721071
TEST_UTIL.deleteTable(tableName);
10731072
}
10741073
}
1074+
1075+
@Test
1076+
public void testMetaLookupThreadPoolCreated() throws Exception {
1077+
final TableName tableName = TableName.valueOf(name.getMethodName());
1078+
byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
1079+
if (TEST_UTIL.getAdmin().tableExists(tableName)) {
1080+
TEST_UTIL.getAdmin().disableTable(tableName);
1081+
TEST_UTIL.getAdmin().deleteTable(tableName);
1082+
}
1083+
try (Table htable = TEST_UTIL.createTable(tableName, FAMILIES)) {
1084+
byte[] row = Bytes.toBytes("test");
1085+
ConnectionImplementation c = ((ConnectionImplementation) TEST_UTIL.getConnection());
1086+
// check that metalookup pool would get created
1087+
c.relocateRegion(tableName, row);
1088+
ExecutorService ex = c.getCurrentMetaLookupPool();
1089+
assertNotNull(ex);
1090+
}
1091+
}
10751092
}

0 commit comments

Comments
 (0)