Skip to content

Commit 506935a

Browse files
committed
Checking replica id on AsyncRpcRetryingCaller side
1 parent b6e9cd3 commit 506935a

File tree

4 files changed

+124
-95
lines changed

4 files changed

+124
-95
lines changed

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java

Lines changed: 24 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535
import java.util.function.Function;
3636
import java.util.function.Supplier;
3737
import java.util.stream.Collectors;
38-
import org.apache.hadoop.hbase.DoNotRetryIOException;
3938
import org.apache.hadoop.hbase.HRegionLocation;
4039
import org.apache.hadoop.hbase.RegionLocations;
4140
import org.apache.hadoop.hbase.ServerName;
@@ -152,67 +151,36 @@ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[
152151
}, AsyncRegionLocator::getRegionNames, supplier);
153152
}
154153

155-
private void internalAddListener(CompletableFuture<HRegionLocation> future,
156-
CompletableFuture<RegionLocations> locsFuture, TableName tableName, byte[] row, int replicaId,
157-
RegionLocateType type) {
158-
addListener(locsFuture, (locs, error) -> {
159-
if (error != null) {
160-
future.completeExceptionally(error);
161-
return;
162-
}
163-
HRegionLocation loc = locs.getRegionLocation(replicaId);
164-
if (loc == null) {
165-
future.completeExceptionally(
166-
new RegionOfflineException("No location for " + tableName + ", row='"
167-
+ Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
168-
} else if (loc.getServerName() == null) {
169-
future
170-
.completeExceptionally(new RegionOfflineException("No server address listed for region '"
171-
+ loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row)
172-
+ "', locateType=" + type + ", replicaId=" + replicaId));
173-
} else {
174-
future.complete(loc);
175-
}
176-
});
177-
}
178-
179154
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
180155
int replicaId, RegionLocateType type, boolean reload, long timeoutNs) {
181156
final Supplier<Span> supplier = new TableSpanBuilder(conn)
182157
.setName("AsyncRegionLocator.getRegionLocation").setTableName(tableName);
183158
return tracedLocationFuture(() -> {
159+
// meta region can not be split right now so we always call the same method.
160+
// Change it later if the meta table can have more than one regions.
184161
CompletableFuture<HRegionLocation> future = new CompletableFuture<>();
185-
if (replicaId == RegionReplicaUtil.DEFAULT_REPLICA_ID) {
186-
// meta region can not be split right now so we always call the same method.
187-
// Change it later if the meta table can have more than one regions.
188-
CompletableFuture<RegionLocations> locsFuture = isMeta(tableName)
189-
? metaRegionLocator.getRegionLocations(replicaId, reload)
190-
: nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
191-
internalAddListener(future, locsFuture, tableName, row, replicaId, type);
192-
} else {
193-
addListener(conn.getAdmin().getDescriptor(tableName), (tdesc, error) -> {
194-
if (error != null) {
195-
future.completeExceptionally(error);
196-
return;
197-
}
198-
int regionReplicationCount = tdesc.getRegionReplication();
199-
if (replicaId >= regionReplicationCount) {
200-
future
201-
.completeExceptionally(new DoNotRetryIOException("The specified region replica id "
202-
+ replicaId + " does not exist, the REGION_REPLICATION of this table "
203-
+ tableName.getNameAsString() + " is " + regionReplicationCount + ","
204-
+ " this means that the maximum region replica id you can specify is "
205-
+ (regionReplicationCount - 1) + "."));
206-
return;
207-
}
208-
// meta region can not be split right now so we always call the same method.
209-
// Change it later if the meta table can have more than one regions.
210-
CompletableFuture<RegionLocations> locsFuture = isMeta(tableName)
211-
? metaRegionLocator.getRegionLocations(replicaId, reload)
212-
: nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
213-
internalAddListener(future, locsFuture, tableName, row, replicaId, type);
214-
});
215-
}
162+
CompletableFuture<RegionLocations> locsFuture = isMeta(tableName)
163+
? metaRegionLocator.getRegionLocations(replicaId, reload)
164+
: nonMetaRegionLocator.getRegionLocations(tableName, row, replicaId, type, reload);
165+
addListener(locsFuture, (locs, error) -> {
166+
if (error != null) {
167+
future.completeExceptionally(error);
168+
return;
169+
}
170+
HRegionLocation loc = locs.getRegionLocation(replicaId);
171+
if (loc == null) {
172+
future.completeExceptionally(
173+
new RegionOfflineException("No location for " + tableName + ", row='"
174+
+ Bytes.toStringBinary(row) + "', locateType=" + type + ", replicaId=" + replicaId));
175+
} else if (loc.getServerName() == null) {
176+
future.completeExceptionally(
177+
new RegionOfflineException("No server address listed for region '"
178+
+ loc.getRegion().getRegionNameAsString() + ", row='" + Bytes.toStringBinary(row)
179+
+ "', locateType=" + type + ", replicaId=" + replicaId));
180+
} else {
181+
future.complete(loc);
182+
}
183+
});
216184
return withTimeout(future, timeoutNs,
217185
() -> "Timeout(" + TimeUnit.NANOSECONDS.toMillis(timeoutNs)
218186
+ "ms) waiting for region location for " + tableName + ", row='"

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,29 @@ protected final void onError(Throwable t, Supplier<String> errMsg,
204204
}
205205
if (disabled) {
206206
future.completeExceptionally(new TableNotEnabledException(tableName.get()));
207+
return;
208+
}
209+
if (this instanceof AsyncSingleRequestRpcRetryingCaller) {
210+
AsyncSingleRequestRpcRetryingCaller caller = (AsyncSingleRequestRpcRetryingCaller) this;
211+
int replicaId = caller.getReplicaId();
212+
FutureUtils.addListener(conn.getAdmin().getDescriptor(tableName.get()),
213+
(tdesc, tdescError) -> {
214+
if (tdescError != null) {
215+
future.completeExceptionally(tdescError);
216+
return;
217+
}
218+
int regionReplicationCount = tdesc.getRegionReplication();
219+
if (replicaId >= regionReplicationCount) {
220+
future.completeExceptionally(
221+
new DoNotRetryIOException("The specified region replica id " + replicaId
222+
+ " does not exist, the REGION_REPLICATION of this table "
223+
+ tableName.get().getNameAsString() + " is " + regionReplicationCount + ","
224+
+ " this means that the maximum region replica id you can specify is "
225+
+ (regionReplicationCount - 1) + "."));
226+
return;
227+
}
228+
tryScheduleRetry(error);
229+
});
207230
} else {
208231
tryScheduleRetry(error);
209232
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,4 +122,8 @@ protected void doCall() {
122122
protected Optional<TableName> getTableName() {
123123
return Optional.of(tableName);
124124
}
125+
126+
public int getReplicaId() {
127+
return replicaId;
128+
}
125129
}
Lines changed: 73 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,15 @@
1818
package org.apache.hadoop.hbase.client;
1919

2020
import static org.junit.Assert.assertEquals;
21-
import static org.junit.Assert.assertNotNull;
2221
import static org.junit.Assert.assertTrue;
2322

23+
import java.io.IOException;
2424
import java.util.List;
25-
import java.util.concurrent.CompletableFuture;
26-
import java.util.concurrent.ExecutionException;
2725
import org.apache.hadoop.hbase.DoNotRetryIOException;
2826
import org.apache.hadoop.hbase.HBaseClassTestRule;
2927
import org.apache.hadoop.hbase.HBaseTestingUtil;
30-
import org.apache.hadoop.hbase.HRegionLocation;
3128
import org.apache.hadoop.hbase.TableName;
29+
import org.apache.hadoop.hbase.filter.PrefixFilter;
3230
import org.apache.hadoop.hbase.regionserver.HRegion;
3331
import org.apache.hadoop.hbase.testclassification.ClientTests;
3432
import org.apache.hadoop.hbase.testclassification.MediumTests;
@@ -46,10 +44,10 @@
4644
import org.junit.rules.TestName;
4745

4846
@Category({ MediumTests.class, ClientTests.class })
49-
public class TestAsyncTableRegionLocatorWithRegionReplicaId {
47+
public class TestScanOrGetWithReplicationFromClient {
5048
@ClassRule
5149
public static final HBaseClassTestRule CLASS_RULE =
52-
HBaseClassTestRule.forClass(TestAsyncTableRegionLocatorWithRegionReplicaId.class);
50+
HBaseClassTestRule.forClass(TestScanOrGetWithReplicationFromClient.class);
5351

5452
@Rule
5553
public TestName name = new TestName();
@@ -63,15 +61,13 @@ public class TestAsyncTableRegionLocatorWithRegionReplicaId {
6361
// region replica id starts from 0
6462
private static final int NON_EXISTING_REGION_REPLICA_ID = REGION_REPLICATION_COUNT;
6563
private static Connection connection;
66-
private static AsyncConnection asyncConn;
6764
private static Admin admin;
6865
private TableName tableName;
6966

7067
@BeforeClass
7168
public static void setUpBeforeClass() throws Exception {
7269
UTIL.startMiniCluster(1);
7370
connection = UTIL.getConnection();
74-
asyncConn = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
7571
admin = UTIL.getAdmin();
7672
}
7773

@@ -115,60 +111,98 @@ public void tearDown() throws Exception {
115111
}
116112

117113
@Test
118-
public void testMetaTableRegionLocatorWithRegionReplicaId()
119-
throws ExecutionException, InterruptedException {
120-
AsyncTableRegionLocator locator = asyncConn.getRegionLocator(TableName.META_TABLE_NAME);
121-
CompletableFuture<HRegionLocation> future =
122-
locator.getRegionLocation(tableName.getName(), RegionReplicaUtil.DEFAULT_REPLICA_ID, true);
123-
HRegionLocation hrl = future.get();
124-
assertNotNull(hrl);
114+
public void testScanMetaWithRegionReplicaId() throws IOException {
115+
Table metaTable = connection.getTable(TableName.META_TABLE_NAME);
116+
Scan scan = new Scan();
117+
scan.setFilter(new PrefixFilter(tableName.getName()));
118+
scan.setReplicaId(RegionReplicaUtil.DEFAULT_REPLICA_ID);
119+
scan.setConsistency(Consistency.TIMELINE);
120+
ResultScanner rs = metaTable.getScanner(scan);
121+
rs.forEach(r -> assertTrue(Bytes.toString(r.getRow()).contains(tableName.getNameAsString())));
125122
}
126123

127124
@Test
128-
public void testMetaTableRegionLocatorWithNonExistingRegionReplicaId()
129-
throws InterruptedException {
130-
AsyncTableRegionLocator locator = asyncConn.getRegionLocator(TableName.META_TABLE_NAME);
131-
CompletableFuture<HRegionLocation> future =
132-
locator.getRegionLocation(tableName.getName(), NON_EXISTING_REGION_REPLICA_ID, true);
125+
public void testScanMetaWithNonExistingRegionReplicaId() throws IOException {
126+
Table metaTable = connection.getTable(TableName.META_TABLE_NAME);
127+
Scan scan = new Scan();
128+
scan.setReplicaId(NON_EXISTING_REGION_REPLICA_ID);
129+
scan.setConsistency(Consistency.TIMELINE);
130+
exception.expect(DoNotRetryIOException.class);
131+
ResultScanner rs = metaTable.getScanner(scan);
133132
try {
134-
future.get();
135-
} catch (ExecutionException e) {
136-
assertTrue(e.getCause() instanceof DoNotRetryIOException);
133+
rs.forEach(r -> Bytes.toString(r.getRow()));
134+
} catch (Exception e) {
135+
Throwable throwable = e.getCause();
136+
assertTrue(throwable instanceof DoNotRetryIOException);
137137
String message = "The specified region replica id " + NON_EXISTING_REGION_REPLICA_ID
138138
+ " does not exist, the REGION_REPLICATION of this table "
139139
+ TableName.META_TABLE_NAME.getNameAsString() + " is "
140140
+ TableDescriptorBuilder.DEFAULT_REGION_REPLICATION + ", "
141141
+ "this means that the maximum region replica id you can specify is "
142142
+ (TableDescriptorBuilder.DEFAULT_REGION_REPLICATION - 1) + ".";
143-
assertEquals(message, e.getCause().getMessage());
143+
assertEquals(message, throwable.getMessage());
144+
}
145+
}
146+
147+
@Test
148+
public void testScanTableWithRegionReplicaId() throws IOException {
149+
Table table = connection.getTable(tableName);
150+
Scan scan = new Scan();
151+
scan.setReplicaId(RegionReplicaUtil.DEFAULT_REPLICA_ID);
152+
scan.setConsistency(Consistency.TIMELINE);
153+
ResultScanner rs = table.getScanner(scan);
154+
rs.forEach(r -> assertEquals(ROW, Bytes.toString(r.getRow())));
155+
}
156+
157+
@Test
158+
public void testScanTableWithNonExistingRegionReplicaId() throws IOException {
159+
Table table = connection.getTable(tableName);
160+
Scan scan = new Scan();
161+
scan.setReplicaId(NON_EXISTING_REGION_REPLICA_ID);
162+
scan.setConsistency(Consistency.TIMELINE);
163+
exception.expect(DoNotRetryIOException.class);
164+
ResultScanner rs = table.getScanner(scan);
165+
try {
166+
rs.forEach(r -> Bytes.toString(r.getRow()));
167+
} catch (Exception e) {
168+
Throwable throwable = e.getCause();
169+
assertTrue(throwable instanceof DoNotRetryIOException);
170+
String message = "The specified region replica id " + NON_EXISTING_REGION_REPLICA_ID
171+
+ " does not exist, the REGION_REPLICATION of this table " + tableName.getNameAsString()
172+
+ " is " + REGION_REPLICATION_COUNT + ", "
173+
+ "this means that the maximum region replica id you can specify is "
174+
+ (REGION_REPLICATION_COUNT - 1) + ".";
175+
assertEquals(message, throwable.getMessage());
144176
}
145177
}
146178

147179
@Test
148-
public void testTableRegionLocatorWithRegionReplicaId()
149-
throws ExecutionException, InterruptedException {
150-
AsyncTableRegionLocator locator = asyncConn.getRegionLocator(tableName);
151-
CompletableFuture<HRegionLocation> future =
152-
locator.getRegionLocation(Bytes.toBytes(ROW), RegionReplicaUtil.DEFAULT_REPLICA_ID, true);
153-
HRegionLocation hrl = future.get();
154-
assertNotNull(hrl);
180+
public void testGetTableWithRegionReplicaId() throws IOException {
181+
Table table = connection.getTable(tableName);
182+
Get get = new Get(Bytes.toBytes(ROW)).setConsistency(Consistency.TIMELINE)
183+
.setReplicaId(RegionReplicaUtil.DEFAULT_REPLICA_ID);
184+
Result result = table.get(get);
185+
assertEquals(ROW, Bytes.toString(result.getRow()));
186+
String value = Bytes.toString(result.getValue(FAMILY, Bytes.toBytes("q")));
187+
assertEquals("test_value", value);
155188
}
156189

157190
@Test
158-
public void testTableRegionLocatorWithNonExistingRegionReplicaId() throws InterruptedException {
159-
AsyncTableRegionLocator locator = asyncConn.getRegionLocator(tableName);
160-
CompletableFuture<HRegionLocation> future =
161-
locator.getRegionLocation(Bytes.toBytes(ROW), NON_EXISTING_REGION_REPLICA_ID, true);
191+
public void testGetTableWithNonExistingRegionReplicaId() throws IOException {
192+
Table table = connection.getTable(tableName);
193+
Get get = new Get(Bytes.toBytes(ROW)).setConsistency(Consistency.TIMELINE)
194+
.setReplicaId(NON_EXISTING_REGION_REPLICA_ID);
162195
try {
163-
future.get();
164-
} catch (ExecutionException e) {
165-
assertTrue(e.getCause() instanceof DoNotRetryIOException);
196+
Result result = table.get(get);
197+
result.getValue(FAMILY, Bytes.toBytes("q"));
198+
} catch (Exception e) {
199+
assertTrue(e instanceof DoNotRetryIOException);
166200
String message = "The specified region replica id " + NON_EXISTING_REGION_REPLICA_ID
167201
+ " does not exist, the REGION_REPLICATION of this table " + tableName.getNameAsString()
168202
+ " is " + REGION_REPLICATION_COUNT + ", "
169203
+ "this means that the maximum region replica id you can specify is "
170204
+ (REGION_REPLICATION_COUNT - 1) + ".";
171-
assertEquals(message, e.getCause().getMessage());
205+
assertEquals(message, e.getMessage());
172206
}
173207
}
174208
}

0 commit comments

Comments
 (0)