Skip to content

Commit 63d4970

Browse files
authored
HBASE-26150 Let region server also carry ClientMetaService (#3550)
Signed-off-by: Bharath Vissapragada <[email protected]>
1 parent 73a0411 commit 63d4970

File tree

26 files changed

+1309
-678
lines changed

26 files changed

+1309
-678
lines changed
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.client;
19+
20+
import static org.apache.hadoop.hbase.trace.TraceUtil.trace;
21+
import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture;
22+
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
23+
24+
import com.google.errorprone.annotations.RestrictedApi;
25+
import java.io.IOException;
26+
import java.util.ArrayList;
27+
import java.util.Collections;
28+
import java.util.List;
29+
import java.util.Set;
30+
import java.util.concurrent.CompletableFuture;
31+
import java.util.concurrent.ConcurrentLinkedQueue;
32+
import java.util.concurrent.ThreadLocalRandom;
33+
import java.util.concurrent.atomic.AtomicInteger;
34+
import java.util.function.Predicate;
35+
import org.apache.hadoop.conf.Configuration;
36+
import org.apache.hadoop.hbase.HConstants;
37+
import org.apache.hadoop.hbase.HRegionLocation;
38+
import org.apache.hadoop.hbase.RegionLocations;
39+
import org.apache.hadoop.hbase.ServerName;
40+
import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil;
41+
import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException;
42+
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
43+
import org.apache.hadoop.hbase.ipc.RpcClient;
44+
import org.apache.hadoop.hbase.ipc.RpcClientFactory;
45+
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
46+
import org.apache.hadoop.hbase.security.User;
47+
import org.apache.hadoop.hbase.util.FutureUtils;
48+
import org.apache.yetus.audience.InterfaceAudience;
49+
50+
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
51+
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
52+
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
53+
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
54+
55+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
56+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
57+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest;
58+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse;
59+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
60+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
61+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest;
62+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse;
63+
64+
/**
65+
* Base class for rpc based connection registry implementation.
66+
* <p/>
67+
* The implementation needs a bootstrap node list in configuration, and then it will use the methods
68+
* in {@link ClientMetaService} to refresh the connection registry end points.
69+
* <p/>
70+
* It also supports hedged reads, the default fan out value is 2.
71+
* <p/>
72+
* For the actual configuration names, see javadoc of sub classes.
73+
*/
74+
@InterfaceAudience.Private
75+
abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry {
76+
77+
/** Default value for the fan out of hedged requests. **/
78+
public static final int HEDGED_REQS_FANOUT_DEFAULT = 2;
79+
80+
private final int hedgedReadFanOut;
81+
82+
// Configured list of end points to probe the meta information from.
83+
private volatile ImmutableMap<ServerName, ClientMetaService.Interface> addr2Stub;
84+
85+
// RPC client used to talk to the masters.
86+
private final RpcClient rpcClient;
87+
private final RpcControllerFactory rpcControllerFactory;
88+
private final int rpcTimeoutMs;
89+
90+
private final RegistryEndpointsRefresher registryEndpointRefresher;
91+
92+
protected AbstractRpcBasedConnectionRegistry(Configuration conf,
93+
String hedgedReqsFanoutConfigName, String refreshIntervalSecsConfigName,
94+
String minRefreshIntervalSecsConfigName) throws IOException {
95+
this.hedgedReadFanOut =
96+
Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT));
97+
rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE,
98+
conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
99+
// XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch
100+
// this through the master registry...
101+
// This is a problem as we will use the cluster id to determine the authentication method
102+
rpcClient = RpcClientFactory.createClient(conf, null);
103+
rpcControllerFactory = RpcControllerFactory.instantiate(conf);
104+
populateStubs(getBootstrapNodes(conf));
105+
registryEndpointRefresher = new RegistryEndpointsRefresher(conf, refreshIntervalSecsConfigName,
106+
minRefreshIntervalSecsConfigName, this::refreshStubs);
107+
registryEndpointRefresher.start();
108+
}
109+
110+
protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException;
111+
112+
protected abstract CompletableFuture<Set<ServerName>> fetchEndpoints();
113+
114+
private void refreshStubs() throws IOException {
115+
populateStubs(FutureUtils.get(fetchEndpoints()));
116+
}
117+
118+
private void populateStubs(Set<ServerName> addrs) throws IOException {
119+
Preconditions.checkNotNull(addrs);
120+
ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder =
121+
ImmutableMap.builderWithExpectedSize(addrs.size());
122+
User user = User.getCurrent();
123+
for (ServerName masterAddr : addrs) {
124+
builder.put(masterAddr,
125+
ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs)));
126+
}
127+
addr2Stub = builder.build();
128+
}
129+
130+
/**
131+
* For describing the actual asynchronous rpc call.
132+
* <p/>
133+
* Typically, you can use lambda expression to implement this interface as
134+
*
135+
* <pre>
136+
* (c, s, d) -> s.xxx(c, your request here, d)
137+
* </pre>
138+
*/
139+
@FunctionalInterface
140+
protected interface Callable<T> {
141+
void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done);
142+
}
143+
144+
private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub,
145+
Callable<T> callable) {
146+
HBaseRpcController controller = rpcControllerFactory.newController();
147+
CompletableFuture<T> future = new CompletableFuture<>();
148+
callable.call(controller, stub, resp -> {
149+
if (controller.failed()) {
150+
IOException failureReason = controller.getFailed();
151+
future.completeExceptionally(failureReason);
152+
if (ClientExceptionsUtil.isConnectionException(failureReason)) {
153+
// RPC has failed, trigger a refresh of end points. We can have some spurious
154+
// refreshes, but that is okay since the RPC is not expensive and not in a hot path.
155+
registryEndpointRefresher.refreshNow();
156+
}
157+
} else {
158+
future.complete(resp);
159+
}
160+
});
161+
return future;
162+
}
163+
164+
private IOException badResponse(String debug) {
165+
return new IOException(String.format("Invalid result for request %s. Will be retried", debug));
166+
}
167+
168+
/**
169+
* send requests concurrently to hedgedReadsFanout end points. If any of the request is succeeded,
170+
* we will complete the future and quit. If all the requests in one round are failed, we will
171+
* start another round to send requests concurrently tohedgedReadsFanout end points. If all end
172+
* points have been tried and all of them are failed, we will fail the future.
173+
*/
174+
private <T extends Message> void groupCall(CompletableFuture<T> future, Set<ServerName> servers,
175+
List<ClientMetaService.Interface> stubs, int startIndexInclusive, Callable<T> callable,
176+
Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) {
177+
int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, stubs.size());
178+
AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive);
179+
for (int i = startIndexInclusive; i < endIndexExclusive; i++) {
180+
addListener(call(stubs.get(i), callable), (r, e) -> {
181+
// a simple check to skip all the later operations earlier
182+
if (future.isDone()) {
183+
return;
184+
}
185+
if (e == null && !isValidResp.test(r)) {
186+
e = badResponse(debug);
187+
}
188+
if (e != null) {
189+
// make sure when remaining reaches 0 we have all exceptions in the errors queue
190+
errors.add(e);
191+
if (remaining.decrementAndGet() == 0) {
192+
if (endIndexExclusive == stubs.size()) {
193+
// we are done, complete the future with exception
194+
RetriesExhaustedException ex =
195+
new RetriesExhaustedException("masters", stubs.size(), new ArrayList<>(errors));
196+
future.completeExceptionally(new MasterRegistryFetchException(servers, ex));
197+
} else {
198+
groupCall(future, servers, stubs, endIndexExclusive, callable, isValidResp, debug,
199+
errors);
200+
}
201+
}
202+
} else {
203+
// do not need to decrement the counter any more as we have already finished the future.
204+
future.complete(r);
205+
}
206+
});
207+
}
208+
}
209+
210+
protected final <T extends Message> CompletableFuture<T> call(Callable<T> callable,
211+
Predicate<T> isValidResp, String debug) {
212+
ImmutableMap<ServerName, ClientMetaService.Interface> addr2StubRef = addr2Stub;
213+
Set<ServerName> servers = addr2StubRef.keySet();
214+
List<ClientMetaService.Interface> stubs = new ArrayList<>(addr2StubRef.values());
215+
Collections.shuffle(stubs, ThreadLocalRandom.current());
216+
CompletableFuture<T> future = new CompletableFuture<>();
217+
groupCall(future, servers, stubs, 0, callable, isValidResp, debug,
218+
new ConcurrentLinkedQueue<>());
219+
return future;
220+
}
221+
222+
@RestrictedApi(explanation = "Should only be called in tests", link = "",
223+
allowedOnPath = ".*/src/test/.*")
224+
Set<ServerName> getParsedServers() {
225+
return addr2Stub.keySet();
226+
}
227+
228+
/**
229+
* Simple helper to transform the result of getMetaRegionLocations() rpc.
230+
*/
231+
private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) {
232+
List<HRegionLocation> regionLocations = new ArrayList<>();
233+
resp.getMetaLocationsList()
234+
.forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location)));
235+
return new RegionLocations(regionLocations);
236+
}
237+
238+
@Override
239+
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
240+
return tracedFuture(
241+
() -> this
242+
.<GetMetaRegionLocationsResponse> call(
243+
(c, s, d) -> s.getMetaRegionLocations(c,
244+
GetMetaRegionLocationsRequest.getDefaultInstance(), d),
245+
r -> r.getMetaLocationsCount() != 0, "getMetaLocationsCount")
246+
.thenApply(AbstractRpcBasedConnectionRegistry::transformMetaRegionLocations),
247+
getClass().getSimpleName() + ".getMetaRegionLocations");
248+
}
249+
250+
@Override
251+
public CompletableFuture<String> getClusterId() {
252+
return tracedFuture(
253+
() -> this
254+
.<GetClusterIdResponse> call(
255+
(c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d),
256+
GetClusterIdResponse::hasClusterId, "getClusterId()")
257+
.thenApply(GetClusterIdResponse::getClusterId),
258+
getClass().getSimpleName() + ".getClusterId");
259+
}
260+
261+
@Override
262+
public CompletableFuture<ServerName> getActiveMaster() {
263+
return tracedFuture(
264+
() -> this
265+
.<GetActiveMasterResponse> call(
266+
(c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d),
267+
GetActiveMasterResponse::hasServerName, "getActiveMaster()")
268+
.thenApply(resp -> ProtobufUtil.toServerName(resp.getServerName())),
269+
getClass().getSimpleName() + ".getClusterId");
270+
}
271+
272+
@Override
273+
public void close() {
274+
trace(() -> {
275+
if (registryEndpointRefresher != null) {
276+
registryEndpointRefresher.stop();
277+
}
278+
if (rpcClient != null) {
279+
rpcClient.close();
280+
}
281+
}, getClass().getSimpleName() + ".close");
282+
}
283+
}

hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,7 @@ private ConnectionRegistryFactory() {
3636
*/
3737
static ConnectionRegistry getRegistry(Configuration conf) {
3838
Class<? extends ConnectionRegistry> clazz = conf.getClass(
39-
CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, MasterRegistry.class,
40-
ConnectionRegistry.class);
39+
CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, MasterRegistry.class, ConnectionRegistry.class);
4140
return ReflectionUtils.newInstance(clazz, conf);
4241
}
4342
}

0 commit comments

Comments
 (0)