diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java new file mode 100644 index 000000000000..fbed214306c1 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java @@ -0,0 +1,283 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.apache.hadoop.hbase.trace.TraceUtil.trace; +import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; +import static org.apache.hadoop.hbase.util.FutureUtils.addListener; + +import com.google.errorprone.annotations.RestrictedApi; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Predicate; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; +import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse; + +/** + * Base class for rpc based connection registry implementation. + *
<p/>
+ * The implementation needs a bootstrap node list in configuration, and then it will use the methods + * in {@link ClientMetaService} to refresh the connection registry end points. + *
<p/>
+ * It also supports hedged reads; the default fan out value is 2. + *
<p/>
+ * For the actual configuration names, see javadoc of sub classes. + */ +@InterfaceAudience.Private +abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry { + + /** Default value for the fan out of hedged requests. **/ + public static final int HEDGED_REQS_FANOUT_DEFAULT = 2; + + private final int hedgedReadFanOut; + + // Configured list of end points to probe the meta information from. + private volatile ImmutableMap<ServerName, ClientMetaService.Interface> addr2Stub; + + // RPC client used to talk to the masters. + private final RpcClient rpcClient; + private final RpcControllerFactory rpcControllerFactory; + private final int rpcTimeoutMs; + + private final RegistryEndpointsRefresher registryEndpointRefresher; + + protected AbstractRpcBasedConnectionRegistry(Configuration conf, + String hedgedReqsFanoutConfigName, String refreshIntervalSecsConfigName, + String minRefreshIntervalSecsConfigName) throws IOException { + this.hedgedReadFanOut = + Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT)); + rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, + conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch + // this through the master registry... + // This is a problem as we will use the cluster id to determine the authentication method + rpcClient = RpcClientFactory.createClient(conf, null); + rpcControllerFactory = RpcControllerFactory.instantiate(conf); + populateStubs(getBootstrapNodes(conf)); + registryEndpointRefresher = new RegistryEndpointsRefresher(conf, refreshIntervalSecsConfigName, + minRefreshIntervalSecsConfigName, this::refreshStubs); + registryEndpointRefresher.start(); + } + + protected abstract Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException; + + protected abstract CompletableFuture<Set<ServerName>> fetchEndpoints(); + + private void refreshStubs() throws IOException { + populateStubs(FutureUtils.get(fetchEndpoints())); + } + + private void populateStubs(Set<ServerName> addrs) throws IOException { + Preconditions.checkNotNull(addrs); + ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder = + ImmutableMap.builderWithExpectedSize(addrs.size()); + User user = User.getCurrent(); + for (ServerName masterAddr : addrs) { + builder.put(masterAddr, + ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs))); + } + addr2Stub = builder.build(); + } + + /** + * For describing the actual asynchronous rpc call. + *
<p/>
+ * Typically, you can use lambda expression to implement this interface as + * + *
<pre>
+   * (c, s, d) -> s.xxx(c, your request here, d)
+   * </pre>
+ */ + @FunctionalInterface + protected interface Callable<T extends Message> { + void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done); + } + + private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub, + Callable<T> callable) { + HBaseRpcController controller = rpcControllerFactory.newController(); + CompletableFuture<T> future = new CompletableFuture<>(); + callable.call(controller, stub, resp -> { + if (controller.failed()) { + IOException failureReason = controller.getFailed(); + future.completeExceptionally(failureReason); + if (ClientExceptionsUtil.isConnectionException(failureReason)) { + // RPC has failed, trigger a refresh of end points. We can have some spurious + // refreshes, but that is okay since the RPC is not expensive and not in a hot path. + registryEndpointRefresher.refreshNow(); + } + } else { + future.complete(resp); + } + }); + return future; + } + + private IOException badResponse(String debug) { + return new IOException(String.format("Invalid result for request %s. Will be retried", debug)); + } + + /** + * Send requests concurrently to hedgedReadsFanout end points. If any of the requests succeeds, + * we will complete the future and quit. If all the requests in one round fail, we will + * start another round to send requests concurrently to hedgedReadsFanout end points. If all end + * points have been tried and all of them fail, we will fail the future. + */ + private <T extends Message> void groupCall(CompletableFuture<T> future, Set<ServerName> servers, + List<ClientMetaService.Interface> stubs, int startIndexInclusive, Callable<T> callable, + Predicate<T> isValidResp, String debug, ConcurrentLinkedQueue<Throwable> errors) { + int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, stubs.size()); + AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive); + for (int i = startIndexInclusive; i < endIndexExclusive; i++) { + addListener(call(stubs.get(i), callable), (r, e) -> { + // a simple check to skip all the later operations earlier + if (future.isDone()) { + return; + } + if (e == null && !isValidResp.test(r)) { + e = badResponse(debug); + } + if (e != null) { + // make sure when remaining reaches 0 we have all exceptions in the errors queue + errors.add(e); + if (remaining.decrementAndGet() == 0) { + if (endIndexExclusive == stubs.size()) { + // we are done, complete the future with exception + RetriesExhaustedException ex = + new RetriesExhaustedException("masters", stubs.size(), new ArrayList<>(errors)); + future.completeExceptionally(new MasterRegistryFetchException(servers, ex)); + } else { + groupCall(future, servers, stubs, endIndexExclusive, callable, isValidResp, debug, + errors); + } + } + } else { + // do not need to decrement the counter any more as we have already finished the future. + future.complete(r); + } + }); + } + } + + protected final <T extends Message> CompletableFuture<T> call(Callable<T> callable, + Predicate<T> isValidResp, String debug) { + ImmutableMap<ServerName, ClientMetaService.Interface> addr2StubRef = addr2Stub; + Set<ServerName> servers = addr2StubRef.keySet(); + List<ClientMetaService.Interface> stubs = new ArrayList<>(addr2StubRef.values()); + Collections.shuffle(stubs, ThreadLocalRandom.current()); + CompletableFuture<T> future = new CompletableFuture<>(); + groupCall(future, servers, stubs, 0, callable, isValidResp, debug, + new ConcurrentLinkedQueue<>()); + return future; + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + Set<ServerName> getParsedServers() { + return addr2Stub.keySet(); + } + + /** + * Simple helper to transform the result of getMetaRegionLocations() rpc. 
+ */ + private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) { + List<HRegionLocation> regionLocations = new ArrayList<>(); + resp.getMetaLocationsList() + .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location))); + return new RegionLocations(regionLocations); + } + + @Override + public CompletableFuture<RegionLocations> getMetaRegionLocations() { + return tracedFuture( + () -> this + .<GetMetaRegionLocationsResponse> call( + (c, s, d) -> s.getMetaRegionLocations(c, + GetMetaRegionLocationsRequest.getDefaultInstance(), d), + r -> r.getMetaLocationsCount() != 0, "getMetaLocationsCount") + .thenApply(AbstractRpcBasedConnectionRegistry::transformMetaRegionLocations), + getClass().getSimpleName() + ".getMetaRegionLocations"); + } + + @Override + public CompletableFuture<String> getClusterId() { + return tracedFuture( + () -> this + .<GetClusterIdResponse> call( + (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d), + GetClusterIdResponse::hasClusterId, "getClusterId()") + .thenApply(GetClusterIdResponse::getClusterId), + getClass().getSimpleName() + ".getClusterId"); + } + + @Override + public CompletableFuture<ServerName> getActiveMaster() { + return tracedFuture( + () -> this + .<GetActiveMasterResponse> call( + (c, s, d) -> s.getActiveMaster(c, GetActiveMasterRequest.getDefaultInstance(), d), + GetActiveMasterResponse::hasServerName, "getActiveMaster()") + .thenApply(resp -> ProtobufUtil.toServerName(resp.getServerName())), + getClass().getSimpleName() + ".getActiveMaster"); + } + + @Override + public void close() { + trace(() -> { + if (registryEndpointRefresher != null) { + registryEndpointRefresher.stop(); + } + if (rpcClient != null) { + rpcClient.close(); + } + }, getClass().getSimpleName() + ".close"); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java index 5688dea3b8f5..3879d00280cc 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java @@ -36,8 +36,7 @@ private ConnectionRegistryFactory() { */ static ConnectionRegistry getRegistry(Configuration conf) { Class<? extends ConnectionRegistry> clazz = conf.getClass( - CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, MasterRegistry.class, - ConnectionRegistry.class); + CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, MasterRegistry.class, ConnectionRegistry.class); return ReflectionUtils.newInstance(clazz, conf); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java deleted file mode 100644 index 3cbb9f74e3ec..000000000000 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterAddressRefresher.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.io.Closeable; -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; - -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService; - -/** - * Thread safe utility that keeps master end points used by {@link MasterRegistry} up to date. This - * uses the RPC {@link ClientMetaService#getMasters} to fetch the latest list of registered masters. - * By default the refresh happens periodically (configured via - * {@link #PERIODIC_REFRESH_INTERVAL_SECS}). The refresh can also be triggered on demand via - * {@link #refreshNow()}. To prevent a flood of on-demand refreshes we expect that any attempts two - * should be spaced at least {@link #MIN_SECS_BETWEEN_REFRESHES} seconds apart. - */ -@InterfaceAudience.Private -public class MasterAddressRefresher implements Closeable { - private static final Logger LOG = LoggerFactory.getLogger(MasterAddressRefresher.class); - public static final String PERIODIC_REFRESH_INTERVAL_SECS = - "hbase.client.master_registry.refresh_interval_secs"; - private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300; - public static final String MIN_SECS_BETWEEN_REFRESHES = - "hbase.client.master_registry.min_secs_between_refreshes"; - private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60; - - private final ExecutorService pool; - private final MasterRegistry registry; - private final long periodicRefreshMs; - private final long timeBetweenRefreshesMs; - private final Object refreshMasters = new Object(); - - @Override - public void close() { - pool.shutdownNow(); - } - - /** - * Thread that refreshes the master end points until it is interrupted via {@link #close()}. - * Multiple callers attempting to refresh at the same time synchronize on {@link #refreshMasters}. - */ - private class RefreshThread implements Runnable { - @Override - public void run() { - long lastRpcTs = 0; - while (!Thread.interrupted()) { - try { - // Spurious wake ups are okay, worst case we make an extra RPC call to refresh. We won't - // have duplicate refreshes because once the thread is past the wait(), notify()s are - // ignored until the thread is back to the waiting state. 
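The deleted comment above describes the guarded-wait idiom this refresher was built on: one worker thread parks in wait(), and refreshNow() merely notify()s it, so concurrent wake-up requests collapse into a single refresh. A minimal, self-contained sketch of that idiom (all names here are illustrative, not identifiers from this patch):

    import java.util.concurrent.TimeUnit;

    // Standalone sketch of the guarded-wait refresh loop described above.
    public class GuardedWaitRefresher {
      private final Object lock = new Object();

      public void start() {
        Thread worker = new Thread(() -> {
          while (!Thread.interrupted()) {
            try {
              synchronized (lock) {
                // Parks for the periodic interval, or until refreshNow() notifies us.
                lock.wait(TimeUnit.SECONDS.toMillis(300));
              }
              refreshOnce(); // a spurious wakeup merely costs one extra refresh
            } catch (InterruptedException e) {
              return; // close()/shutdownNow() interrupts the worker, ending the loop
            }
          }
        });
        worker.setDaemon(true);
        worker.start();
      }

      public void refreshNow() {
        synchronized (lock) {
          lock.notify(); // a no-op unless the worker is already back in wait()
        }
      }

      private void refreshOnce() {
        // stand-in for the RPC that re-fetches the master end points
      }
    }

The deleted lines that follow are the original implementation of exactly this loop, plus the min-interval rate limiting.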
- synchronized (refreshMasters) { - refreshMasters.wait(periodicRefreshMs); - } - long currentTs = EnvironmentEdgeManager.currentTime(); - if (lastRpcTs != 0 && currentTs - lastRpcTs <= timeBetweenRefreshesMs) { - continue; - } - lastRpcTs = currentTs; - LOG.debug("Attempting to refresh master address end points."); - Set<ServerName> newMasters = new HashSet<>(registry.getMasters().get()); - registry.populateMasterStubs(newMasters); - LOG.debug("Finished refreshing master end points. {}", newMasters); - } catch (InterruptedException e) { - LOG.debug("Interrupted during wait, aborting refresh-masters-thread.", e); - break; - } catch (ExecutionException | IOException e) { - LOG.debug("Error populating latest list of masters.", e); - } - } - LOG.info("Master end point refresher loop exited."); - } - } - - MasterAddressRefresher(Configuration conf, MasterRegistry registry) { - pool = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder() - .setNameFormat("master-registry-refresh-end-points").setDaemon(true).build()); - periodicRefreshMs = TimeUnit.SECONDS.toMillis(conf.getLong(PERIODIC_REFRESH_INTERVAL_SECS, - PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT)); - timeBetweenRefreshesMs = TimeUnit.SECONDS.toMillis(conf.getLong(MIN_SECS_BETWEEN_REFRESHES, - MIN_SECS_BETWEEN_REFRESHES_DEFAULT)); - Preconditions.checkArgument(periodicRefreshMs > 0); - Preconditions.checkArgument(timeBetweenRefreshesMs < periodicRefreshMs); - this.registry = registry; - pool.submit(new RefreshThread()); - } - - /** - * Notifies the refresher thread to refresh the configuration. This does not guarantee a refresh. - * See class comment for details. - */ - void refreshNow() { - synchronized (refreshMasters) { - refreshMasters.notify(); - } - } -} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java index 92239355ea8b..76477aac8773 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java @@ -18,55 +18,28 @@ package org.apache.hadoop.hbase.client; import static org.apache.hadoop.hbase.HConstants.MASTER_ADDRS_KEY; -import static org.apache.hadoop.hbase.trace.TraceUtil.trace; -import static org.apache.hadoop.hbase.trace.TraceUtil.tracedFuture; import static org.apache.hadoop.hbase.util.DNS.getHostname; -import static org.apache.hadoop.hbase.util.FutureUtils.addListener; +import com.google.errorprone.annotations.RestrictedApi; import java.io.IOException; import java.net.UnknownHostException; -import java.util.ArrayList; -import java.util.Collections; import java.util.HashSet; -import java.util.List; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.ThreadLocalRandom; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Predicate; import java.util.stream.Collectors; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; -import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; -import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException; -import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.ipc.RpcClient; -import org.apache.hadoop.hbase.ipc.RpcClientFactory; 
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory; -import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.DNS.ServerType; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Strings; -import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.common.net.HostAndPort; -import org.apache.hbase.thirdparty.com.google.protobuf.Message; -import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse; /** * Master based registry implementation. Makes RPCs to the configured master addresses from config @@ -74,40 +47,31 @@ *
<p/>
* It supports hedged reads, set the fan out of the requests batch by * {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY} to a value greater than {@code 1} will enable - * it(the default value is {@link #MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT}). + * it (the default value is {@link AbstractRpcBasedConnectionRegistry#HEDGED_REQS_FANOUT_DEFAULT}). *
<p/>
* TODO: Handle changes to the configuration dynamically without having to restart the client. */ @InterfaceAudience.Private -public class MasterRegistry implements ConnectionRegistry { +public class MasterRegistry extends AbstractRpcBasedConnectionRegistry { /** Configuration key that controls the fan out of requests **/ public static final String MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY = "hbase.client.master_registry.hedged.fanout"; - /** Default value for the fan out of hedged requests. **/ - public static final int MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT = 2; + public static final String MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS = + "hbase.client.master_registry.refresh_interval_secs"; - private static final String MASTER_ADDRS_CONF_SEPARATOR = ","; - - private final int hedgedReadFanOut; - - // Configured list of masters to probe the meta information from. - private volatile ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2Stub; - - // RPC client used to talk to the masters. - private final RpcClient rpcClient; - private final RpcControllerFactory rpcControllerFactory; - private final int rpcTimeoutMs; + public static final String MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES = + "hbase.client.master_registry.min_secs_between_refreshes"; - protected final MasterAddressRefresher masterAddressRefresher; + private static final String MASTER_ADDRS_CONF_SEPARATOR = ","; /** * Parses the list of master addresses from the provided configuration. Supported format is comma * separated host[:port] values. If no port number is specified, the default master port is assumed. * @param conf Configuration to parse from. */ - private static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException { + public static Set<ServerName> parseMasterAddrs(Configuration conf) throws UnknownHostException { Set<ServerName> masterAddrs = new HashSet<>(); String configuredMasters = getMasterAddr(conf); for (String masterAddr : configuredMasters.split(MASTER_ADDRS_CONF_SEPARATOR)) { @@ -120,31 +84,18 @@ private static Set<ServerName> parseMasterAddrs(Configuration conf) throws Unkno } MasterRegistry(Configuration conf) throws IOException { - this.hedgedReadFanOut = Math.max(1, conf.getInt(MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, - MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT)); - rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, - conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch - // this through the master registry... - // This is a problem as we will use the cluster id to determine the authentication method - rpcClient = RpcClientFactory.createClient(conf, null); - rpcControllerFactory = RpcControllerFactory.instantiate(conf); - // Generate the seed list of master stubs. Subsequent RPCs try to keep a live list of masters - // by fetching the end points from this list. 
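As the javadoc above notes, parseMasterAddrs() accepts comma separated host[:port] values. For illustration, a client could seed the registry like this (the host names are example values only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HConstants;

    public class BootstrapMastersExample {
      public static Configuration withBootstrapMasters() {
        Configuration conf = HBaseConfiguration.create();
        // Comma separated host[:port] values; a host without an explicit port
        // falls back to the default master port.
        conf.set(HConstants.MASTER_ADDRS_KEY, "master1.example.com:16000,master2.example.com");
        return conf;
      }
    }

The deleted constructor lines resume below; after this patch the same parsing happens in getBootstrapNodes().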
- populateMasterStubs(parseMasterAddrs(conf)); - masterAddressRefresher = new MasterAddressRefresher(conf, this); + super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, + MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES); } - void populateMasterStubs(Set<ServerName> masters) throws IOException { - Preconditions.checkNotNull(masters); - ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder = - ImmutableMap.builderWithExpectedSize(masters.size()); - User user = User.getCurrent(); - for (ServerName masterAddr : masters) { - builder.put(masterAddr, - ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs))); - } - masterAddr2Stub = builder.build(); + @Override + protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException { + return parseMasterAddrs(conf); + } + + @Override + protected CompletableFuture<Set<ServerName>> fetchEndpoints() { + return getMasters(); } /** @@ -162,195 +113,18 @@ public static String getMasterAddr(Configuration conf) throws UnknownHostExcepti return String.format("%s:%d", hostname, port); } - /** - * For describing the actual asynchronous rpc call. - *
<p/>
- * Typically, you can use lambda expression to implement this interface as - * - *
<pre>
-   * (c, s, d) -> s.xxx(c, your request here, d)
-   * </pre>
- */ - @FunctionalInterface - private interface Callable<T extends Message> { - void call(HBaseRpcController controller, ClientMetaService.Interface stub, RpcCallback<T> done); + private static Set<ServerName> transformServerNames(GetMastersResponse resp) { + return resp.getMasterServersList().stream() + .map(s -> ProtobufUtil.toServerName(s.getServerName())).collect(Collectors.toSet()); } - private <T extends Message> CompletableFuture<T> call(ClientMetaService.Interface stub, - Callable<T> callable) { - HBaseRpcController controller = rpcControllerFactory.newController(); - CompletableFuture<T> future = new CompletableFuture<>(); - callable.call(controller, stub, resp -> { - if (controller.failed()) { - IOException failureReason = controller.getFailed(); - future.completeExceptionally(failureReason); - if (ClientExceptionsUtil.isConnectionException(failureReason)) { - // RPC has failed, trigger a refresh of master end points. We can have some spurious - // refreshes, but that is okay since the RPC is not expensive and not in a hot path. - masterAddressRefresher.refreshNow(); - } - } else { - future.complete(resp); - } - }); - return future; - } - - private IOException badResponse(String debug) { - return new IOException(String.format("Invalid result for request %s. Will be retried", debug)); - } - - /** - * send requests concurrently to hedgedReadsFanout masters. If any of the request is succeeded, we - * will complete the future and quit. If all the requests in one round are failed, we will start - * another round to send requests concurrently tohedgedReadsFanout masters. If all masters have - * been tried and all of them are failed, we will fail the future. - */ - private <T extends Message> void groupCall(CompletableFuture<T> future, - Set<ServerName> masterServers, List<ClientMetaService.Interface> masterStubs, - int startIndexInclusive, Callable<T> callable, Predicate<T> isValidResp, String debug, - ConcurrentLinkedQueue<Throwable> errors) { - int endIndexExclusive = Math.min(startIndexInclusive + hedgedReadFanOut, masterStubs.size()); - AtomicInteger remaining = new AtomicInteger(endIndexExclusive - startIndexInclusive); - for (int i = startIndexInclusive; i < endIndexExclusive; i++) { - addListener(call(masterStubs.get(i), callable), (r, e) -> { - // a simple check to skip all the later operations earlier - if (future.isDone()) { - return; - } - if (e == null && !isValidResp.test(r)) { - e = badResponse(debug); - } - if (e != null) { - // make sure when remaining reaches 0 we have all exceptions in the errors queue - errors.add(e); - if (remaining.decrementAndGet() == 0) { - if (endIndexExclusive == masterStubs.size()) { - // we are done, complete the future with exception - RetriesExhaustedException ex = new RetriesExhaustedException("masters", - masterStubs.size(), new ArrayList<>(errors)); - future.completeExceptionally( - new MasterRegistryFetchException(masterServers, ex)); - } else { - groupCall(future, masterServers, masterStubs, endIndexExclusive, callable, - isValidResp, debug, errors); - } - } - } else { - // do not need to decrement the counter any more as we have already finished the future. 
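The deleted javadoc above (and its rewritten twin in AbstractRpcBasedConnectionRegistry) describes the hedged fan-out algorithm: probe hedgedReadFanOut targets at once, finish on the first valid reply, and only advance to the next group once the whole group has failed. A compact, generic restatement of that control flow, using only java.util.concurrent (all names illustrative):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Function;

    public class HedgedGroupCall {
      // Probes targets[start, start + fanOut) concurrently; the first success wins,
      // and a fully failed group triggers the next group recursively.
      public static <T, R> void groupCall(CompletableFuture<R> future, List<T> targets, int start,
          int fanOut, Function<T, CompletableFuture<R>> rpc,
          ConcurrentLinkedQueue<Throwable> errors) {
        int end = Math.min(start + fanOut, targets.size());
        AtomicInteger remaining = new AtomicInteger(end - start);
        for (int i = start; i < end; i++) {
          rpc.apply(targets.get(i)).whenComplete((r, e) -> {
            if (future.isDone()) {
              return; // another hedge in this group already completed the future
            }
            if (e != null) {
              errors.add(e);
              if (remaining.decrementAndGet() == 0) {
                if (end == targets.size()) {
                  future.completeExceptionally(errors.peek()); // every target failed
                } else {
                  groupCall(future, targets, end, fanOut, rpc, errors); // next round
                }
              }
            } else {
              future.complete(r); // first valid response wins
            }
          });
        }
      }
    }

The production code additionally validates each response (isValidResp) and wraps the accumulated errors in RetriesExhaustedException/MasterRegistryFetchException; the deleted method body resumes below.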
- future.complete(r); - } - }); - } - } - - private <T extends Message> CompletableFuture<T> call(Callable<T> callable, - Predicate<T> isValidResp, String debug) { - ImmutableMap<ServerName, ClientMetaService.Interface> masterAddr2StubRef = masterAddr2Stub; - Set<ServerName> masterServers = masterAddr2StubRef.keySet(); - List<ClientMetaService.Interface> masterStubs = new ArrayList<>(masterAddr2StubRef.values()); - Collections.shuffle(masterStubs, ThreadLocalRandom.current()); - CompletableFuture<T> future = new CompletableFuture<>(); - groupCall(future, masterServers, masterStubs, 0, callable, isValidResp, debug, - new ConcurrentLinkedQueue<>()); - return future; - } - - /** - * Simple helper to transform the result of getMetaRegionLocations() rpc. - */ - private static RegionLocations transformMetaRegionLocations(GetMetaRegionLocationsResponse resp) { - List<HRegionLocation> regionLocations = new ArrayList<>(); - resp.getMetaLocationsList() - .forEach(location -> regionLocations.add(ProtobufUtil.toRegionLocation(location))); - return new RegionLocations(regionLocations); - } - - @Override - public CompletableFuture<RegionLocations> getMetaRegionLocations() { - return tracedFuture( - () -> this - .<GetMetaRegionLocationsResponse> call( - (c, s, d) -> s.getMetaRegionLocations(c, - GetMetaRegionLocationsRequest.getDefaultInstance(), d), - r -> r.getMetaLocationsCount() != 0, "getMetaLocationsCount") - .thenApply(MasterRegistry::transformMetaRegionLocations), - "MasterRegistry.getMetaRegionLocations"); - } - - @Override - public CompletableFuture<String> getClusterId() { - return tracedFuture(() -> this - .<GetClusterIdResponse> call( - (c, s, d) -> s.getClusterId(c, GetClusterIdRequest.getDefaultInstance(), d), - GetClusterIdResponse::hasClusterId, "getClusterId()") - .thenApply(GetClusterIdResponse::getClusterId), "MasterRegistry.getClusterId"); - } - - private static boolean hasActiveMaster(GetMastersResponse resp) { - List<GetMastersResponseEntry> activeMasters = - resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect( - Collectors.toList()); - return activeMasters.size() == 1; - } - - private static ServerName filterActiveMaster(GetMastersResponse resp) throws IOException { - List<GetMastersResponseEntry> activeMasters = - resp.getMasterServersList().stream().filter(GetMastersResponseEntry::getIsActive).collect( - Collectors.toList()); - if (activeMasters.size() != 1) { - throw new IOException(String.format("Incorrect number of active masters encountered." + - " Expected: 1 found: %d. Content: %s", activeMasters.size(), activeMasters)); - } - return ProtobufUtil.toServerName(activeMasters.get(0).getServerName()); - } - - @Override - public CompletableFuture<ServerName> getActiveMaster() { - return tracedFuture(() -> { - CompletableFuture<ServerName> future = new CompletableFuture<>(); - addListener(call((c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d), - MasterRegistry::hasActiveMaster, "getMasters()"), (resp, ex) -> { - if (ex != null) { - future.completeExceptionally(ex); - } - ServerName result = null; - try { - result = filterActiveMaster((GetMastersResponse) resp); - } catch (IOException e) { - future.completeExceptionally(e); - } - future.complete(result); - }); - return future; - }, "MasterRegistry.getActiveMaster"); - } - - private static List<ServerName> transformServerNames(GetMastersResponse resp) { - return resp.getMasterServersList().stream().map(s -> ProtobufUtil.toServerName( - s.getServerName())).collect(Collectors.toList()); - } - - CompletableFuture<List<ServerName>> getMasters() { + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/(.*/MasterRegistry.java|src/test/.*)") + CompletableFuture<Set<ServerName>> getMasters() { return this - .<GetMastersResponse> 
call((c, s, d) -> s.getMasters( - c, GetMastersRequest.getDefaultInstance(), d), r -> r.getMasterServersCount() != 0, - "getMasters()").thenApply(MasterRegistry::transformServerNames); - } - - Set<ServerName> getParsedMasterServers() { - return masterAddr2Stub.keySet(); - } - - @Override - public void close() { - trace(() -> { - if (masterAddressRefresher != null) { - masterAddressRefresher.close(); - } - if (rpcClient != null) { - rpcClient.close(); - } - }, "MasterRegistry.close"); + .<GetMastersResponse> call( + (c, s, d) -> s.getMasters(c, GetMastersRequest.getDefaultInstance(), d), + r -> r.getMasterServersCount() != 0, "getMasters()") + .thenApply(MasterRegistry::transformServerNames); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java new file mode 100644 index 000000000000..7eb81b0883f3 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryEndpointsRefresher.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.concurrent.TimeUnit; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; + +/** + * Thread safe utility that keeps registry end points used by {@link ConnectionRegistry} up to date. + * By default the refresh happens periodically (configured via {@code intervalSecsConfigName}). The + * refresh can also be triggered on demand via {@link #refreshNow()}. To prevent a flood of + * on-demand refreshes we expect that any two attempts should be spaced at least + * {@code minIntervalSecsConfigName} seconds apart. 
+ */ +@InterfaceAudience.Private +class RegistryEndpointsRefresher { + + private static final Logger LOG = LoggerFactory.getLogger(RegistryEndpointsRefresher.class); + + public static final String PERIODIC_REFRESH_INTERVAL_SECS = + "hbase.client.rpc_registry.refresh_interval_secs"; + private static final int PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT = 300; + + public static final String MIN_SECS_BETWEEN_REFRESHES = + "hbase.client.rpc_registry.min_secs_between_refreshes"; + private static final int MIN_SECS_BETWEEN_REFRESHES_DEFAULT = 60; + + private final Thread thread; + private final Refresher refresher; + private final long periodicRefreshMs; + private final long minTimeBetweenRefreshesMs; + + private boolean refreshNow = false; + private boolean stopped = false; + + public void start() { + thread.start(); + } + + public synchronized void stop() { + stopped = true; + notifyAll(); + } + + // The main loop for the refresh thread. + private void mainLoop() { + long lastRefreshTime = EnvironmentEdgeManager.currentTime(); + for (;;) { + synchronized (this) { + for (;;) { + if (stopped) { + LOG.info("Registry end points refresher loop exited."); + return; + } + // if refreshNow is true, then we will wait until minTimeBetweenRefreshesMs has elapsed, + // otherwise wait until periodicRefreshMs has elapsed + long waitTime = (refreshNow ? minTimeBetweenRefreshesMs : periodicRefreshMs) - + (EnvironmentEdgeManager.currentTime() - lastRefreshTime); + if (waitTime <= 0) { + break; + } + try { + wait(waitTime); + } catch (InterruptedException e) { + LOG.warn("Interrupted during wait", e); + Thread.currentThread().interrupt(); + continue; + } + } + // we are going to refresh, reset this flag + refreshNow = false; + } + LOG.debug("Attempting to refresh registry end points"); + try { + refresher.refresh(); + } catch (IOException e) { + LOG.warn("Error refreshing registry end points", e); + } + // We do not think it is a big deal to fail one time, so no matter what the refresh result + // is, we just update this refresh time and wait for the next round. If later this becomes + // critical, we could change to only update this value after a successful refresh. + lastRefreshTime = EnvironmentEdgeManager.currentTime(); + LOG.debug("Finished refreshing registry end points"); + } + } + + @FunctionalInterface + public interface Refresher { + + void refresh() throws IOException; + } + + RegistryEndpointsRefresher(Configuration conf, String intervalSecsConfigName, + String minIntervalSecsConfigName, Refresher refresher) { + periodicRefreshMs = TimeUnit.SECONDS + .toMillis(conf.getLong(intervalSecsConfigName, PERIODIC_REFRESH_INTERVAL_SECS_DEFAULT)); + minTimeBetweenRefreshesMs = TimeUnit.SECONDS + .toMillis(conf.getLong(minIntervalSecsConfigName, MIN_SECS_BETWEEN_REFRESHES_DEFAULT)); + Preconditions.checkArgument(periodicRefreshMs > 0); + Preconditions.checkArgument(minTimeBetweenRefreshesMs < periodicRefreshMs); + thread = new Thread(this::mainLoop); + thread.setName("Registry-endpoints-refresh-end-points"); + thread.setDaemon(true); + this.refresher = refresher; + } + + /** + * Notifies the refresher thread to refresh the configuration. This does not guarantee a refresh. + * See class comment for details. 
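For context, this is roughly how the refresher gets wired up by AbstractRpcBasedConnectionRegistry earlier in this patch. The sketch below assumes it lives in the same org.apache.hadoop.hbase.client package (the class is package-private) and uses RpcConnectionRegistry's config names as example values:

    package org.apache.hadoop.hbase.client;

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class RefresherWiringSketch {
      // Stand-in for the subclass hook that re-fetches end points over RPC.
      private static void refreshStubs() throws IOException {
      }

      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        RegistryEndpointsRefresher refresher = new RegistryEndpointsRefresher(conf,
          "hbase.client.bootstrap.refresh_interval_secs",      // periodic interval
          "hbase.client.bootstrap.min_secs_between_refreshes", // on-demand floor
          RefresherWiringSketch::refreshStubs);
        refresher.start();      // kicks off the periodic main loop
        refresher.refreshNow(); // requests an early refresh, rate limited as above
        refresher.stop();       // wakes the loop and lets it exit
      }
    }

The refreshNow() implementation continues below.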
+ */ + synchronized void refreshNow() { + refreshNow = true; + notifyAll(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java new file mode 100644 index 000000000000..0096bfc01712 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.stream.Collectors; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.ServerName; +import org.apache.yetus.audience.InterfaceAudience; + +import org.apache.hbase.thirdparty.com.google.common.base.Splitter; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse; + +/** + * Rpc based connection registry. It will make use of the {@link ClientMetaService} to get registry + * information. + *
<p/>
+ * It needs a bootstrap node list when starting up, and then it will use {@link ClientMetaService} to + * refresh the bootstrap node list periodically. + *
<p/>
+ * Usually, you could set masters as the bootstrap nodes, as they will also implement the + * {@link ClientMetaService}, and then we will switch to using region servers after refreshing the + * bootstrap nodes. + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry { + + /** Configuration key that controls the fan out of requests **/ + public static final String HEDGED_REQS_FANOUT_KEY = "hbase.client.rpc_registry.hedged.fanout"; + + public static final String PERIODIC_REFRESH_INTERVAL_SECS = + "hbase.client.bootstrap.refresh_interval_secs"; + + public static final String MIN_SECS_BETWEEN_REFRESHES = + "hbase.client.bootstrap.min_secs_between_refreshes"; + + public static final String BOOTSTRAP_NODES = "hbase.client.bootstrap.servers"; + + private static final char ADDRS_CONF_SEPARATOR = ','; + + RpcConnectionRegistry(Configuration conf) throws IOException { + super(conf, HEDGED_REQS_FANOUT_KEY, PERIODIC_REFRESH_INTERVAL_SECS, MIN_SECS_BETWEEN_REFRESHES); + } + + @Override + protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException { + // try the bootstrap nodes config first + String configuredBootstrapNodes = conf.get(BOOTSTRAP_NODES); + if (!StringUtils.isBlank(configuredBootstrapNodes)) { + return Splitter.on(ADDRS_CONF_SEPARATOR).trimResults().splitToStream(configuredBootstrapNodes) + .map(addr -> ServerName.valueOf(addr, ServerName.NON_STARTCODE)) + .collect(Collectors.toSet()); + } else { + // otherwise, just use master addresses + return MasterRegistry.parseMasterAddrs(conf); + } + } + + private static Set<ServerName> transformServerNames(GetBootstrapNodesResponse resp) { + return resp.getServerNameList().stream().map(ProtobufUtil::toServerName) + .collect(Collectors.toSet()); + } + + private CompletableFuture<Set<ServerName>> getBootstrapNodes() { + return this + .<GetBootstrapNodesResponse> 
call( + (c, s, d) -> s.getBootstrapNodes(c, GetBootstrapNodesRequest.getDefaultInstance(), d), + r -> r.getServerNameCount() != 0, "getBootstrapNodes()") + .thenApply(RpcConnectionRegistry::transformServerNames); + } + + @Override + protected CompletableFuture<Set<ServerName>> fetchEndpoints() { + return getBootstrapNodes(); + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java index f5f69224b67b..749190a6bbc9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/SecurityInfo.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos; /** * Maps RPC protocol interfaces to required configuration @@ -49,7 +50,7 @@ public class SecurityInfo { new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN)); infos.put(MasterProtos.HbckService.getDescriptor().getName(), new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN)); - infos.put(MasterProtos.ClientMetaService.getDescriptor().getName(), + infos.put(RegistryProtos.ClientMetaService.getDescriptor().getName(), new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN)); // NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider ALSO ELSE // new Service will not be found when all is Kerberized!!!! diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java similarity index 78% rename from hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java rename to hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java index d1cd05c16572..66eee821c71c 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistryHedgedReads.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java @@ -33,7 +33,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseCommonTestingUtil; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ipc.RpcClient; import org.apache.hadoop.hbase.ipc.RpcClientFactory; @@ -58,22 +57,30 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse; @Category({ ClientTests.class, SmallTests.class }) -public class TestMasterRegistryHedgedReads { +public class TestRpcBasedRegistryHedgedReads { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestMasterRegistryHedgedReads.class); + HBaseClassTestRule.forClass(TestRpcBasedRegistryHedgedReads.class); - private static final Logger LOG = LoggerFactory.getLogger(TestMasterRegistryHedgedReads.class); + private 
static final Logger LOG = LoggerFactory.getLogger(TestRpcBasedRegistryHedgedReads.class); + + private static final String HEDGED_REQS_FANOUT_CONFIG_NAME = "hbase.test.hedged.reqs.fanout"; + private static final String REFRESH_INTERVAL_SECS_CONFIG_NAME = + "hbase.test.refresh.interval.secs"; + private static final String MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME = + "hbase.test.min.refresh.interval.secs"; private static final HBaseCommonTestingUtil UTIL = new HBaseCommonTestingUtil(); private static final ExecutorService EXECUTOR = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true).build()); + private static Set<ServerName> BOOTSTRAP_NODES; + private static AtomicInteger CALLED = new AtomicInteger(0); private static volatile int BAD_RESP_INDEX; @@ -142,14 +149,35 @@ public void callMethod(MethodDescriptor method, RpcController controller, Messag } } + private AbstractRpcBasedConnectionRegistry createRegistry(int hedged) throws IOException { + Configuration conf = UTIL.getConfiguration(); + conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged); + return new AbstractRpcBasedConnectionRegistry(conf, HEDGED_REQS_FANOUT_CONFIG_NAME, + REFRESH_INTERVAL_SECS_CONFIG_NAME, MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) { + + @Override + protected Set<ServerName> getBootstrapNodes(Configuration conf) throws IOException { + return BOOTSTRAP_NODES; + } + + @Override + protected CompletableFuture<Set<ServerName>> fetchEndpoints() { + return CompletableFuture.completedFuture(BOOTSTRAP_NODES); + } + }; + } + @BeforeClass public static void setUpBeforeClass() { Configuration conf = UTIL.getConfiguration(); conf.setClass(RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, RpcClientImpl.class, RpcClient.class); - String masters = IntStream.range(0, 10).mapToObj(i -> "localhost:" + (10000 + 100 * i)) - .collect(Collectors.joining(",")); - conf.set(HConstants.MASTER_ADDRS_KEY, masters); + // disable refresh, we do not need to refresh in this test + conf.setLong(REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE); + conf.setLong(MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME, Integer.MAX_VALUE - 1); + BOOTSTRAP_NODES = IntStream.range(0, 10) + .mapToObj(i -> ServerName.valueOf("localhost", (10000 + 100 * i), ServerName.NON_STARTCODE)) + .collect(Collectors.toSet()); } @AfterClass @@ -175,9 +203,7 @@ private <T> T logIfError(CompletableFuture<T> future) throws IOException { @Test public void testAllFailNoHedged() throws IOException { - Configuration conf = UTIL.getConfiguration(); - conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 1); - try (MasterRegistry registry = new MasterRegistry(conf)) { + try (AbstractRpcBasedConnectionRegistry registry = createRegistry(1)) { assertThrows(IOException.class, () -> logIfError(registry.getClusterId())); assertEquals(10, CALLED.get()); } @@ -185,10 +211,8 @@ public void testAllFailHedged3() throws IOException { - Configuration conf = UTIL.getConfiguration(); - conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 3); BAD_RESP_INDEX = 5; - try (MasterRegistry registry = new MasterRegistry(conf)) { + try (AbstractRpcBasedConnectionRegistry registry = createRegistry(3)) { assertThrows(IOException.class, () -> logIfError(registry.getClusterId())); assertEquals(10, CALLED.get()); } @@ -196,12 +220,10 @@ public void testFirstSucceededNoHedge() throws IOException { - Configuration conf = UTIL.getConfiguration(); - // will be set to 1 - 
conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 0); GOOD_RESP_INDEXS = IntStream.range(0, 10).mapToObj(Integer::valueOf).collect(Collectors.toSet()); - try (MasterRegistry registry = new MasterRegistry(conf)) { + // will be set to 1 + try (AbstractRpcBasedConnectionRegistry registry = createRegistry(0)) { String clusterId = logIfError(registry.getClusterId()); assertEquals(RESP.getClusterId(), clusterId); assertEquals(1, CALLED.get()); @@ -210,10 +232,8 @@ public void testFirstSucceededNoHedge() throws IOException { @Test public void testSecondRoundSucceededHedge4() throws IOException { - Configuration conf = UTIL.getConfiguration(); - conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4); GOOD_RESP_INDEXS = Collections.singleton(6); - try (MasterRegistry registry = new MasterRegistry(conf)) { + try (AbstractRpcBasedConnectionRegistry registry = createRegistry(4)) { String clusterId = logIfError(registry.getClusterId()); assertEquals(RESP.getClusterId(), clusterId); UTIL.waitFor(5000, () -> CALLED.get() == 8); @@ -222,10 +242,8 @@ public void testSecondRoundSucceededHedge4() throws IOException { @Test public void testSucceededWithLargestHedged() throws IOException, InterruptedException { - Configuration conf = UTIL.getConfiguration(); - conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, Integer.MAX_VALUE); GOOD_RESP_INDEXS = Collections.singleton(5); - try (MasterRegistry registry = new MasterRegistry(conf)) { + try (AbstractRpcBasedConnectionRegistry registry = createRegistry(Integer.MAX_VALUE)) { String clusterId = logIfError(registry.getClusterId()); assertEquals(RESP.getClusterId(), clusterId); UTIL.waitFor(5000, () -> CALLED.get() == 10); diff --git a/hbase-protocol-shaded/src/main/protobuf/server/Registry.proto b/hbase-protocol-shaded/src/main/protobuf/server/Registry.proto new file mode 100644 index 000000000000..8dd0d1abdf3d --- /dev/null +++ b/hbase-protocol-shaded/src/main/protobuf/server/Registry.proto @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +syntax = "proto2"; + +// The protos for ConnectionRegistry. +package hbase.pb; + +option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated"; +option java_outer_classname = "RegistryProtos"; +option java_generic_services = true; +option java_generate_equals_and_hash = true; +option optimize_for = SPEED; + +import "HBase.proto"; + +/** Request and response to get the clusterID for this cluster */ +message GetClusterIdRequest { +} +message GetClusterIdResponse { + /** Not set if cluster ID could not be determined. 
*/ + optional string cluster_id = 1; +} + +/** Request and response to get the currently active master name for this cluster */ +message GetActiveMasterRequest { +} +message GetActiveMasterResponse { + /** Not set if an active master could not be determined. */ + optional ServerName server_name = 1; +} + +/** Request and response to get the current list of all registered master servers */ +message GetMastersRequest { + option deprecated = true; +} +message GetMastersResponseEntry { + option deprecated = true; + required ServerName server_name = 1; + required bool is_active = 2; +} +message GetMastersResponse { + option deprecated = true; + repeated GetMastersResponseEntry master_servers = 1; +} + +/** Request and response to get the current list of meta region locations */ +message GetMetaRegionLocationsRequest { +} +message GetMetaRegionLocationsResponse { + /** Not set if meta region locations could not be determined. */ + repeated RegionLocation meta_locations = 1; +} + +/** Request and response to get the nodes which could be used as ClientMetaService */ +message GetBootstrapNodesRequest { +} +message GetBootstrapNodesResponse { + repeated ServerName server_name = 1; +} + +/** + * Implements all the RPCs needed by clients to look up cluster meta information needed for + * connection establishment. + */ +service ClientMetaService { + /** + * Get Cluster ID for this cluster. + */ + rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse); + + /** + * Get active master server name for this cluster. Retained for out of sync client and master + * rolling upgrades. Newer clients switched to GetMasters RPC request. + */ + rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse); + + /** + * Get registered list of master servers in this cluster. + */ + rpc GetMasters(GetMastersRequest) returns(GetMastersResponse) { + option deprecated = true; + }; + + /** + * Get current meta replicas' region locations. + */ + rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse); + + /** + * Get nodes which could be used as ClientMetaService + */ + rpc GetBootstrapNodes(GetBootstrapNodesRequest) returns (GetBootstrapNodesResponse); +} \ No newline at end of file diff --git a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto index 3d265dd806c2..628b0ca77d87 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/master/Master.proto @@ -1295,65 +1295,3 @@ service HbckService { rpc FixMeta(FixMetaRequest) returns(FixMetaResponse); } - -/** Request and response to get the clusterID for this cluster */ -message GetClusterIdRequest { -} -message GetClusterIdResponse { - /** Not set if cluster ID could not be determined. */ - optional string cluster_id = 1; -} - -/** Request and response to get the currently active master name for this cluster */ -message GetActiveMasterRequest { -} -message GetActiveMasterResponse { - /** Not set if an active master could not be determined. 
*/ - optional ServerName server_name = 1; -} - -/** Request and response to get the current list of all registers master servers */ -message GetMastersRequest { -} -message GetMastersResponseEntry { - required ServerName server_name = 1; - required bool is_active = 2; -} -message GetMastersResponse { - repeated GetMastersResponseEntry master_servers = 1; -} - -/** Request and response to get the current list of meta region locations */ -message GetMetaRegionLocationsRequest { -} -message GetMetaRegionLocationsResponse { - /** Not set if meta region locations could not be determined. */ - repeated RegionLocation meta_locations = 1; -} - -/** - * Implements all the RPCs needed by clients to look up cluster meta information needed for - * connection establishment. - */ -service ClientMetaService { - /** - * Get Cluster ID for this cluster. - */ - rpc GetClusterId(GetClusterIdRequest) returns(GetClusterIdResponse); - - /** - * Get active master server name for this cluster. Retained for out of sync client and master - * rolling upgrades. Newer clients switched to GetMasters RPC request. - */ - rpc GetActiveMaster(GetActiveMasterRequest) returns(GetActiveMasterResponse); - - /** - * Get registered list of master servers in this cluster. - */ - rpc GetMasters(GetMastersRequest) returns(GetMastersResponse); - - /** - * Get current meta replicas' region locations. - */ - rpc GetMetaRegionLocations(GetMetaRegionLocationsRequest) returns(GetMetaRegionLocationsResponse); -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaRegionLocationCache.java similarity index 96% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java rename to hbase-server/src/main/java/org/apache/hadoop/hbase/MetaRegionLocationCache.java index 07512d16fd60..b294f7be4a05 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MetaRegionLocationCache.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/MetaRegionLocationCache.java @@ -15,15 +15,15 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hbase.master; +package org.apache.hadoop.hbase; import java.util.ArrayList; import java.util.List; import java.util.Optional; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ThreadFactory; -import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.exceptions.DeserializationException; +import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap; import org.apache.hadoop.hbase.util.RetryCounter; import org.apache.hadoop.hbase.util.RetryCounterFactory; @@ -35,7 +35,9 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; /** @@ -87,10 +89,10 @@ public MetaRegionLocationCache(ZKWatcher zkWatcher) { // are established. Subsequent updates are handled by the registered listener. Also, this runs // in a separate thread in the background to not block master init. 
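The comment above ends the initialization block being reformatted here: meta locations are loaded on a background thread that retries (effectively) forever with bounded backoff. A minimal sketch of that RetryCounter pattern, with loadOnce() standing in for the actual ZooKeeper fetch and the interval values chosen for illustration (the real code uses its SLEEP_INTERVAL_MS_* constants):

    import org.apache.hadoop.hbase.util.RetryCounter;
    import org.apache.hadoop.hbase.util.RetryCounterFactory;

    public class RetryLoopSketch {
      private static boolean loadOnce() {
        return true; // pretend the ZK read succeeded
      }

      public static void run() throws InterruptedException {
        // Integer.MAX_VALUE attempts, starting at a 1s sleep, capped at 10s.
        RetryCounter retryCounter =
          new RetryCounterFactory(Integer.MAX_VALUE, 1000, 10000).create();
        while (!loadOnce() && retryCounter.shouldRetry()) {
          retryCounter.sleepUntilNextRetry(); // backs off between attempts
        }
      }
    }

The reformatted lines that follow are the actual thread construction and kickoff.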
ThreadFactory threadFactory = new ThreadFactoryBuilder().setDaemon(true).build(); - RetryCounterFactory retryFactory = new RetryCounterFactory( - Integer.MAX_VALUE, SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX); - threadFactory.newThread( - ()->loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT)).start(); + RetryCounterFactory retryFactory = new RetryCounterFactory(Integer.MAX_VALUE, + SLEEP_INTERVAL_MS_BETWEEN_RETRIES, SLEEP_INTERVAL_MS_MAX); + threadFactory.newThread(() -> loadMetaLocationsFromZk(retryFactory.create(), ZNodeOpType.INIT)) + .start(); } /** diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index 1791ce4c4a55..e6ae03a91245 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.HConstants.HBASE_MASTER_LOGCLEANER_PLUGINS; import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; import static org.apache.hadoop.hbase.util.DNS.MASTER_HOSTNAME_KEY; + import java.io.IOException; import java.io.InterruptedIOException; import java.lang.reflect.Constructor; @@ -222,6 +223,7 @@ import org.apache.zookeeper.KeeperException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Maps; import org.apache.hbase.thirdparty.com.google.common.collect.Sets; @@ -232,6 +234,7 @@ import org.apache.hbase.thirdparty.org.eclipse.jetty.server.ServerConnector; import org.apache.hbase.thirdparty.org.eclipse.jetty.servlet.ServletHolder; import org.apache.hbase.thirdparty.org.eclipse.jetty.webapp.WebAppContext; + import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos.SnapshotDescription; @@ -307,13 +310,6 @@ public class HMaster extends HRegionServer implements MasterServices { // manager of assignment nodes in zookeeper private AssignmentManager assignmentManager; - - /** - * Cache for the meta region replica's locations. Also tracks their changes to avoid stale - * cache entries. 
- */ - private final MetaRegionLocationCache metaRegionLocationCache; - private RSGroupInfoManager rsGroupInfoManager; // manager of replication @@ -480,7 +476,6 @@ public HMaster(final Configuration conf) throws IOException { } } - this.metaRegionLocationCache = new MetaRegionLocationCache(this.zooKeeper); this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this); cachedClusterId = new CachedClusterId(this, conf); @@ -3810,10 +3805,6 @@ public void runReplicationBarrierCleaner() { } } - public MetaRegionLocationCache getMetaRegionLocationCache() { - return this.metaRegionLocationCache; - } - @Override public RSGroupInfoManager getRSGroupInfoManager() { return rsGroupInfoManager; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java index e7bf96dba1e9..9a55d1db935c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterRpcServices.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.ClusterMetricsBuilder; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.Server; @@ -75,7 +74,6 @@ import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.master.assignment.RegionStates; -import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; import org.apache.hadoop.hbase.master.janitor.MetaFixer; import org.apache.hadoop.hbase.master.locking.LockProcedure; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; @@ -183,7 +181,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.BalanceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClearDeadServersResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateNamespaceResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.CreateTableRequest; @@ -208,21 +205,12 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ExecProcedureResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.FixMetaResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterStatusResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsRequest; 
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetCompletedSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetLocksResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMastersResponseEntry; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetNamespaceDescriptorResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetProcedureResultRequest; @@ -383,6 +371,16 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRSFatalErrorResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.ReportRegionStateTransitionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.AddReplicationPeerResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos.DisableReplicationPeerRequest; @@ -3005,9 +3003,11 @@ private boolean shouldSubmitSCP(ServerName serverName) { return true; } + // Override this method since for backup masters we will not set the clusterId field, which + // means we need to find another way to get the cluster id for backup masters.
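Context for the ClientMetaService overrides that follow: a registry client drives these RPCs through the generated blocking stub. A minimal sketch, not part of this patch, with the stub and controller wiring left to the caller:

import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;

final class ClusterIdFetchSketch {
  // Returns the cluster id, or null when the serving process has not loaded one yet
  // (the backup-master case the comment above describes).
  static String fetchClusterId(ClientMetaService.BlockingInterface stub, RpcController controller)
      throws ServiceException {
    GetClusterIdResponse resp =
      stub.getClusterId(controller, GetClusterIdRequest.getDefaultInstance());
    return resp.hasClusterId() ? resp.getClusterId() : null;
  }
}

The overridden master-side implementation follows.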
@Override public GetClusterIdResponse getClusterId(RpcController rpcController, GetClusterIdRequest request) - throws ServiceException { + throws ServiceException { GetClusterIdResponse.Builder resp = GetClusterIdResponse.newBuilder(); String clusterId = master.getClusterId(); if (clusterId != null) { @@ -3016,40 +3016,43 @@ public GetClusterIdResponse getClusterId(RpcController rpcController, GetCluster return resp.build(); } + // Override this method since we use ActiveMasterManager to get the active master on HMaster, + // while in HRegionServer we use MasterAddressTracker. @Override public GetActiveMasterResponse getActiveMaster(RpcController rpcController, - GetActiveMasterRequest request) throws ServiceException { + GetActiveMasterRequest request) throws ServiceException { GetActiveMasterResponse.Builder resp = GetActiveMasterResponse.newBuilder(); Optional<ServerName> serverName = master.getActiveMaster(); serverName.ifPresent(name -> resp.setServerName(ProtobufUtil.toServerName(name))); return resp.build(); } + // Override this method since we use ActiveMasterManager to get the backup masters on HMaster, + // while in HRegionServer we use MasterAddressTracker. @Override public GetMastersResponse getMasters(RpcController rpcController, GetMastersRequest request) - throws ServiceException { + throws ServiceException { GetMastersResponse.Builder resp = GetMastersResponse.newBuilder(); // Active master Optional<ServerName> serverName = master.getActiveMaster(); serverName.ifPresent(name -> resp.addMasterServers(GetMastersResponseEntry.newBuilder() - .setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build())); + .setServerName(ProtobufUtil.toServerName(name)).setIsActive(true).build())); // Backup masters - for (ServerName backupMaster: master.getBackupMasters()) { - resp.addMasterServers(GetMastersResponseEntry.newBuilder().setServerName( - ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build()); + for (ServerName backupMaster : master.getBackupMasters()) { + resp.addMasterServers(GetMastersResponseEntry.newBuilder() + .setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false).build()); } return resp.build(); } @Override - public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController rpcController, - GetMetaRegionLocationsRequest request) throws ServiceException { - GetMetaRegionLocationsResponse.Builder response = GetMetaRegionLocationsResponse.newBuilder(); - Optional<List<HRegionLocation>> metaLocations = - master.getMetaRegionLocationCache().getMetaRegionLocations(); - metaLocations.ifPresent(hRegionLocations -> hRegionLocations.forEach( - location -> response.addMetaLocations(ProtobufUtil.toRegionLocation(location)))); - return response.build(); + public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller, + GetBootstrapNodesRequest request) throws ServiceException { + GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder(); + for (ServerName sn : master.getServerManager().getOnlineServers().keySet()) { + builder.addServerName(ProtobufUtil.toServerName(sn)); + } + return builder.build(); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index c00a8b7d068e..d8a43d5279d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -76,6 +76,7 @@ import
org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HealthCheckChore; +import org.apache.hadoop.hbase.MetaRegionLocationCache; import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.PleaseHoldException; @@ -179,6 +180,7 @@ import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.MasterAddressTracker; import org.apache.hadoop.hbase.zookeeper.MetaTableLocator; +import org.apache.hadoop.hbase.zookeeper.RegionServerAddressTracker; import org.apache.hadoop.hbase.zookeeper.ZKClusterId; import org.apache.hadoop.hbase.zookeeper.ZKNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; @@ -423,6 +425,16 @@ public class HRegionServer extends Thread implements // master address tracker private final MasterAddressTracker masterAddressTracker; + /** + * Cache for the meta region replica's locations. Also tracks their changes to avoid stale cache + * entries. Used for serving ClientMetaService. + */ + private final MetaRegionLocationCache metaRegionLocationCache; + /** + * Cache for all the region servers in the cluster. Used for serving ClientMetaService. + */ + private final RegionServerAddressTracker regionServerAddressTracker; + // Cluster Status Tracker protected final ClusterStatusTracker clusterStatusTracker; @@ -669,6 +681,8 @@ public HRegionServer(final Configuration conf) throws IOException { clusterStatusTracker = null; } this.rpcServices.start(zooKeeper); + this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper); + this.regionServerAddressTracker = new RegionServerAddressTracker(zooKeeper, this); // This violates 'no starting stuff in Constructor' but Master depends on the below chore // and executor being created and takes a different startup route. Lots of overlap between HRS // and M (An M IS A HRS now). 
Need to refactor so less duplication between M and its super @@ -3992,4 +4006,12 @@ public CompactedHFilesDischarger getCompactedHFilesDischarger() { public long getRetryPauseTime() { return this.retryPauseTime; } + + public MetaRegionLocationCache getMetaRegionLocationCache() { + return this.metaRegionLocationCache; + } + + RegionServerAddressTracker getRegionServerAddressTracker() { + return regionServerAddressTracker; + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index 4dacb7fd3095..56c53714bc47 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.MultiActionResultTooLarge; import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.PrivateCellUtil; @@ -160,6 +161,7 @@ import org.apache.hbase.thirdparty.com.google.common.cache.Cache; import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList; import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; import org.apache.hbase.thirdparty.com.google.protobuf.Message; @@ -258,6 +260,18 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse.TableQuotaSnapshot; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.BulkLoadDescriptor; import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.CompactionDescriptor; @@ -270,8 +284,8 @@ @InterfaceAudience.Private @SuppressWarnings("deprecation") public class RSRpcServices implements HBaseRPCErrorHandler, - 
AdminService.BlockingInterface, ClientService.BlockingInterface, PriorityFunction, - ConfigurationObserver { + AdminService.BlockingInterface, ClientService.BlockingInterface, + ClientMetaService.BlockingInterface, PriorityFunction, ConfigurationObserver { private static final Logger LOG = LoggerFactory.getLogger(RSRpcServices.class); /** RPC scheduler to use for the region server. */ @@ -377,9 +391,11 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * where you would ever turn off one or the other). */ public static final String REGIONSERVER_ADMIN_SERVICE_CONFIG = - "hbase.regionserver.admin.executorService"; + "hbase.regionserver.admin.executorService"; public static final String REGIONSERVER_CLIENT_SERVICE_CONFIG = - "hbase.regionserver.client.executorService"; + "hbase.regionserver.client.executorService"; + public static final String REGIONSERVER_CLIENT_META_SERVICE_CONFIG = + "hbase.regionserver.client.meta.executorService"; /** * An Rpc callback for closing a RegionScanner. @@ -1582,23 +1598,24 @@ protected void checkOpen() throws IOException { * supports */ protected List<BlockingServiceAndInterface> getServices() { - boolean admin = - getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true); - boolean client = - getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true); + boolean admin = getConfiguration().getBoolean(REGIONSERVER_ADMIN_SERVICE_CONFIG, true); + boolean client = getConfiguration().getBoolean(REGIONSERVER_CLIENT_SERVICE_CONFIG, true); + boolean clientMeta = + getConfiguration().getBoolean(REGIONSERVER_CLIENT_META_SERVICE_CONFIG, true); List<BlockingServiceAndInterface> bssi = new ArrayList<>(); if (client) { - bssi.add(new BlockingServiceAndInterface( - ClientService.newReflectiveBlockingService(this), - ClientService.BlockingInterface.class)); + bssi.add(new BlockingServiceAndInterface(ClientService.newReflectiveBlockingService(this), + ClientService.BlockingInterface.class)); } if (admin) { - bssi.add(new BlockingServiceAndInterface( - AdminService.newReflectiveBlockingService(this), - AdminService.BlockingInterface.class)); + bssi.add(new BlockingServiceAndInterface(AdminService.newReflectiveBlockingService(this), + AdminService.BlockingInterface.class)); } - return new org.apache.hbase.thirdparty.com.google.common.collect.
- ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build(); + if (clientMeta) { + bssi.add(new BlockingServiceAndInterface(ClientMetaService.newReflectiveBlockingService(this), + ClientMetaService.BlockingInterface.class)); + } + return new ImmutableList.Builder<BlockingServiceAndInterface>().addAll(bssi).build(); } public InetSocketAddress getSocketAddress() { @@ -4064,4 +4081,61 @@ protected AccessChecker getAccessChecker() { protected ZKPermissionWatcher getZkPermissionWatcher() { return zkPermissionWatcher; } + + @Override + public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest request) + throws ServiceException { + return GetClusterIdResponse.newBuilder().setClusterId(regionServer.getClusterId()).build(); + } + + @Override + public GetActiveMasterResponse getActiveMaster(RpcController controller, + GetActiveMasterRequest request) throws ServiceException { + GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder(); + ServerName activeMaster = regionServer.getMasterAddressTracker().getMasterAddress(); + if (activeMaster != null) { + builder.setServerName(ProtobufUtil.toServerName(activeMaster)); + } + return builder.build(); + } + + @Override + public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request) + throws ServiceException { + try { + GetMastersResponse.Builder builder = GetMastersResponse.newBuilder(); + ServerName activeMaster = regionServer.getMasterAddressTracker().getMasterAddress(); + if (activeMaster != null) { + builder.addMasterServers(GetMastersResponseEntry.newBuilder() + .setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true)); + } + for (ServerName backupMaster : regionServer.getMasterAddressTracker().getBackupMasters()) { + builder.addMasterServers(GetMastersResponseEntry.newBuilder() + .setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false)); + } + return builder.build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller, + GetMetaRegionLocationsRequest request) throws ServiceException { + GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder(); + Optional<List<HRegionLocation>> metaLocations = + regionServer.getMetaRegionLocationCache().getMetaRegionLocations(); + metaLocations.ifPresent(hRegionLocations -> hRegionLocations + .forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location)))); + return builder.build(); + } + + @Override + public GetBootstrapNodesResponse getBootstrapNodes(RpcController controller, + GetBootstrapNodesRequest request) throws ServiceException { + GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder(); + regionServer.getRegionServerAddressTracker().getRegionServers().stream() + .map(ProtobufUtil::toServerName).forEach(builder::addServerName); + return builder.build(); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java index b7ab7f005963..8fbe6ac418dd 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/security/HBasePolicyProvider.java @@ -17,18 +17,20 @@ */ package org.apache.hadoop.hbase.security; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; import org.apache.hadoop.conf.Configuration; -import
org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.ProxyUsers; import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.security.authorize.ServiceAuthorizationManager; import org.apache.yetus.audience.InterfaceAudience; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionServerStatusService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos; + /** * Implementation of secure Hadoop policy provider for mapping * protocol interfaces to hbase-policy.xml entries. @@ -41,7 +43,7 @@ public class HBasePolicyProvider extends PolicyProvider { new Service("security.client.protocol.acl", MasterProtos.HbckService.BlockingInterface.class), new Service("security.client.protocol.acl", - MasterProtos.ClientMetaService.BlockingInterface.class), + RegistryProtos.ClientMetaService.BlockingInterface.class), new Service("security.admin.protocol.acl", MasterService.BlockingInterface.class), new Service("security.masterregion.protocol.acl", RegionServerStatusService.BlockingInterface.class) diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java index 540acc602233..21fa57a56615 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/FromClientSideBase.java @@ -96,7 +96,7 @@ protected static boolean isSameParameterizedCluster(Class<?> registryImpl, int n Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class); int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, - MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT); + AbstractRpcBasedConnectionRegistry.HEDGED_REQS_FANOUT_DEFAULT); return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterAddressRefresher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterAddressRefresher.java deleted file mode 100644 index ad1e738f1783..000000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterAddressRefresher.java +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.Waiter; -import org.apache.hadoop.hbase.testclassification.ClientTests; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.junit.Assert; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; - -@Category({ClientTests.class, SmallTests.class}) -public class TestMasterAddressRefresher { - - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestMasterAddressRefresher.class); - - private class DummyMasterRegistry extends MasterRegistry { - - private final AtomicInteger getMastersCallCounter = new AtomicInteger(0); - private final List<Long> callTimeStamps = new ArrayList<>(); - - DummyMasterRegistry(Configuration conf) throws IOException { - super(conf); - } - - @Override - CompletableFuture<List<ServerName>> getMasters() { - getMastersCallCounter.incrementAndGet(); - callTimeStamps.add(EnvironmentEdgeManager.currentTime()); - return CompletableFuture.completedFuture(new ArrayList<>()); - } - - public int getMastersCount() { - return getMastersCallCounter.get(); - } - - public List<Long> getCallTimeStamps() { - return callTimeStamps; - } - } - - @Test - public void testPeriodicMasterEndPointRefresh() throws IOException { - Configuration conf = HBaseConfiguration.create(); - // Refresh every 1 second. - conf.setLong(MasterAddressRefresher.PERIODIC_REFRESH_INTERVAL_SECS, 1); - conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 0); - try (DummyMasterRegistry registry = new DummyMasterRegistry(conf)) { - // Wait for > 3 seconds to see that at least 3 getMasters() RPCs have been made. - Waiter.waitFor( - conf, 5000, (Waiter.Predicate<Exception>) () -> registry.getMastersCount() > 3); - } - } - - @Test - public void testDurationBetweenRefreshes() throws IOException { - Configuration conf = HBaseConfiguration.create(); - // Disable periodic refresh - conf.setLong(MasterAddressRefresher.PERIODIC_REFRESH_INTERVAL_SECS, Integer.MAX_VALUE); - // A minimum duration of 1s between refreshes - conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 1); - try (DummyMasterRegistry registry = new DummyMasterRegistry(conf)) { - // Issue a ton of manual refreshes.
- for (int i = 0; i < 10000; i++) { - registry.masterAddressRefresher.refreshNow(); - Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); - } - // Overall wait time is 10000 ms, so the number of requests should be <=10 - List<Long> callTimeStamps = registry.getCallTimeStamps(); - // Actual calls to getMasters() should be much lower than the refresh count. - Assert.assertTrue( - String.valueOf(registry.getMastersCount()), registry.getMastersCount() <= 20); - Assert.assertTrue(callTimeStamps.size() > 0); - // Verify that the delta between subsequent RPCs is at least 1sec as configured. - for (int i = 1; i < callTimeStamps.size() - 1; i++) { - long delta = callTimeStamps.get(i) - callTimeStamps.get(i - 1); - // Few ms cushion to account for any env jitter. - Assert.assertTrue(callTimeStamps.toString(), delta > 990); - } - } - -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java index e4bdff96f7a7..9d2936bc52b5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java @@ -53,7 +53,7 @@ public class TestMasterRegistry { @ClassRule public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestMasterRegistry.class); + HBaseClassTestRule.forClass(TestMasterRegistry.class); private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); @BeforeClass @@ -90,7 +90,7 @@ public void testMasterAddressParsing() throws IOException { int numMasters = 10; conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters)); try (MasterRegistry registry = new MasterRegistry(conf)) { - List<ServerName> parsedMasters = new ArrayList<>(registry.getParsedMasterServers()); + List<ServerName> parsedMasters = new ArrayList<>(registry.getParsedServers()); // Half of them would be without a port, duplicates are removed. assertEquals(numMasters / 2 + 1, parsedMasters.size()); // Sort in the increasing order of port numbers. @@ -149,17 +149,17 @@ public void testDynamicMasterConfigurationRefresh() throws Exception { // Set the hedging fan out so that all masters are queried. conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4); // Do not limit the number of refreshes during the test run. - conf.setLong(MasterAddressRefresher.MIN_SECS_BETWEEN_REFRESHES, 0); + conf.setLong(MasterRegistry.MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES, 0); try (MasterRegistry registry = new MasterRegistry(conf)) { - final Set<ServerName> masters = registry.getParsedMasterServers(); + final Set<ServerName> masters = registry.getParsedServers(); assertTrue(masters.contains(badServer)); // Make a registry RPC, this should trigger a refresh since one of the hedged RPC fails. assertEquals(registry.getClusterId().get(), clusterId); // Wait for new set of masters to be populated. TEST_UTIL.waitFor(5000, - (Waiter.Predicate<Exception>) () -> !registry.getParsedMasterServers().equals(masters)); + (Waiter.Predicate<Exception>) () -> !registry.getParsedServers().equals(masters)); // new set of masters should not include the bad server - final Set<ServerName> newMasters = registry.getParsedMasterServers(); + final Set<ServerName> newMasters = registry.getParsedServers(); // Bad one should be out.
assertEquals(3, newMasters.size()); assertFalse(newMasters.contains(badServer)); @@ -170,8 +170,8 @@ public void testDynamicMasterConfigurationRefresh() throws Exception { TEST_UTIL.getMiniHBaseCluster().waitForActiveAndReadyMaster(10000); // Wait until the killed master de-registered. This should also trigger another refresh. TEST_UTIL.waitFor(10000, () -> registry.getMasters().get().size() == 2); - TEST_UTIL.waitFor(20000, () -> registry.getParsedMasterServers().size() == 2); - final Set<ServerName> newMasters2 = registry.getParsedMasterServers(); + TEST_UTIL.waitFor(20000, () -> registry.getParsedServers().size() == 2); + final Set<ServerName> newMasters2 = registry.getParsedServers(); assertEquals(2, newMasters2.size()); assertFalse(newMasters2.contains(activeMaster.getServerName())); } finally { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java index 2197a218fb28..2bfe0849fe6d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java @@ -28,11 +28,11 @@ import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.MetaRegionLocationCache; import org.apache.hadoop.hbase.MultithreadedTestUtil; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.HMaster; -import org.apache.hadoop.hbase.master.MetaRegionLocationCache; import org.apache.hadoop.hbase.master.RegionState; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -94,20 +94,20 @@ private void verifyCachedMetaLocations(HMaster master) throws Exception { break; } } - List<HRegionLocation> metaHRLs = - master.getMetaRegionLocationCache().getMetaRegionLocations().get(); - assertFalse(metaHRLs.isEmpty()); ZKWatcher zk = master.getZooKeeper(); List<String> metaZnodes = zk.getMetaReplicaNodes(); // Wait till all replicas available.
retries = 0; - while (master.getMetaRegionLocationCache().getMetaRegionLocations().get().size() != - metaZnodes.size()) { + while (master.getMetaRegionLocationCache().getMetaRegionLocations().get().size() != metaZnodes + .size()) { Thread.sleep(1000); if (++retries == 10) { break; } } + List<HRegionLocation> metaHRLs = + master.getMetaRegionLocationCache().getMetaRegionLocations().get(); + assertFalse(metaHRLs.isEmpty()); assertEquals(metaZnodes.size(), metaHRLs.size()); List<HRegionLocation> actualHRLs = getCurrentMetaLocations(zk); Collections.sort(metaHRLs); @@ -115,13 +115,14 @@ private void verifyCachedMetaLocations(HMaster master) throws Exception { assertEquals(actualHRLs, metaHRLs); } - @Test public void testInitialMetaLocations() throws Exception { + @Test + public void testInitialMetaLocations() throws Exception { verifyCachedMetaLocations(TEST_UTIL.getMiniHBaseCluster().getMaster()); } - @Test public void testStandByMetaLocations() throws Exception { + @Test + public void testStandByMetaLocations() throws Exception { HMaster standBy = TEST_UTIL.getMiniHBaseCluster().startMaster().getMaster(); - standBy.isInitialized(); verifyCachedMetaLocations(standBy); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java new file mode 100644 index 000000000000..1447099168f4 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRegistryEndpointsRefresher.java @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.client; + +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles; + +@Category({ ClientTests.class, SmallTests.class }) +public class TestRegistryEndpointsRefresher { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRegistryEndpointsRefresher.class); + + private static final String INTERVAL_SECS_CONFIG_NAME = + "hbase.test.registry.refresh.interval.secs"; + private static final String MIN_INTERVAL_SECS_CONFIG_NAME = + "hbase.test.registry.refresh.min.interval.secs"; + + private Configuration conf; + private RegistryEndpointsRefresher refresher; + private AtomicInteger getMastersCallCounter; + private CopyOnWriteArrayList<Long> callTimestamps; + + @Before + public void setUp() { + conf = HBaseConfiguration.create(); + getMastersCallCounter = new AtomicInteger(0); + callTimestamps = new CopyOnWriteArrayList<>(); + } + + @After + public void tearDown() { + if (refresher != null) { + refresher.stop(); + } + } + + private void refresh() { + getMastersCallCounter.incrementAndGet(); + callTimestamps.add(EnvironmentEdgeManager.currentTime()); + } + + private void createAndStartRefresher(long intervalSecs, long minIntervalSecs) { + conf.setLong(INTERVAL_SECS_CONFIG_NAME, intervalSecs); + conf.setLong(MIN_INTERVAL_SECS_CONFIG_NAME, minIntervalSecs); + refresher = new RegistryEndpointsRefresher(conf, INTERVAL_SECS_CONFIG_NAME, + MIN_INTERVAL_SECS_CONFIG_NAME, this::refresh); + refresher.start(); + } + + @Test + public void testPeriodicMasterEndPointRefresh() throws IOException { + // Refresh every 1 second. + createAndStartRefresher(1, 0); + // Wait for > 3 seconds to see that at least 3 getMasters() RPCs have been made. + Waiter.waitFor(conf, 5000, () -> getMastersCallCounter.get() > 3); + } + + @Test + public void testDurationBetweenRefreshes() throws IOException { + // Disable periodic refresh + // A minimum duration of 1s between refreshes + createAndStartRefresher(Integer.MAX_VALUE, 1); + // Issue a ton of manual refreshes. + for (int i = 0; i < 10000; i++) { + refresher.refreshNow(); + Uninterruptibles.sleepUninterruptibly(1, TimeUnit.MILLISECONDS); + } + // Overall wait time is 10000 ms, so the number of requests should be <=10 + // Actual calls to getMasters() should be much lower than the refresh count. + assertTrue(String.valueOf(getMastersCallCounter.get()), getMastersCallCounter.get() <= 20); + assertTrue(callTimestamps.size() > 0); + // Verify that the delta between subsequent RPCs is at least 1sec as configured. + for (int i = 1; i < callTimestamps.size() - 1; i++) { + long delta = callTimestamps.get(i) - callTimestamps.get(i - 1); + // Few ms cushion to account for any env jitter.
+ assertTrue(callTimestamps.toString(), delta > 990); + } + } +} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java new file mode 100644 index 000000000000..70683733e435 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ MediumTests.class, ClientTests.class }) +public class TestRpcConnectionRegistry { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRpcConnectionRegistry.class); + + private static final HBaseTestingUtil UTIL = new HBaseTestingUtil(); + + private RpcConnectionRegistry registry; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + // allow refresh immediately so we will switch to use region servers soon. 
+ UTIL.getConfiguration().setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1); + UTIL.getConfiguration().setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 0); + UTIL.startMiniCluster(3); + HBaseTestingUtil.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + UTIL.shutdownMiniCluster(); + } + + @Before + public void setUp() throws IOException { + registry = new RpcConnectionRegistry(UTIL.getConfiguration()); + } + + @After + public void tearDown() throws IOException { + Closeables.close(registry, true); + } + + @Test + public void testRegistryRPCs() throws Exception { + HMaster activeMaster = UTIL.getHBaseCluster().getMaster(); + // wait until we switch to use region servers + UTIL.waitFor(10000, () -> registry.getParsedServers().size() == 3); + assertThat(registry.getParsedServers(), + hasItems(activeMaster.getServerManager().getOnlineServersList().toArray(new ServerName[0]))); + + // Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion + // because not all replicas had made it up before test started. + RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry); + + assertEquals(registry.getClusterId().get(), activeMaster.getClusterId()); + assertEquals(registry.getActiveMaster().get(), activeMaster.getServerName()); + List<HRegionLocation> metaLocations = + Arrays.asList(registry.getMetaRegionLocations().get().getRegionLocations()); + List<HRegionLocation> actualMetaLocations = + activeMaster.getMetaRegionLocationCache().getMetaRegionLocations().get(); + Collections.sort(metaLocations); + Collections.sort(actualMetaLocations); + assertEquals(actualMetaLocations, metaLocations); + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java index ef5225201534..70264ed935ec 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestScannersFromClientSide.java @@ -133,7 +133,7 @@ private static boolean isSameParameterizedCluster(Class<?> registryImpl, int num Class<?> confClass = conf.getClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, ZKConnectionRegistry.class); int hedgedReqConfig = conf.getInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, - MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_DEFAULT); + AbstractRpcBasedConnectionRegistry.HEDGED_REQS_FANOUT_DEFAULT); return confClass.getName().equals(registryImpl.getName()) && numHedgedReqs == hedgedReqConfig; } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java index d17ecf8d7bac..cf825492d468 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/TestClientMetaServiceRPCs.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT; import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY; import static org.junit.Assert.assertEquals; + import java.io.IOException; import java.util.ArrayList; import java.util.Collections; @@ -45,14 +46,15 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; + import
org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.ClientMetaService; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetActiveMasterResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetClusterIdResponse; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.GetMetaRegionLocationsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse; @Category({MediumTests.class, MasterTests.class}) public class TestClientMetaServiceRPCs { diff --git a/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerAddressTracker.java b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerAddressTracker.java new file mode 100644 index 000000000000..e478639737fe --- /dev/null +++ b/hbase-zookeeper/src/main/java/org/apache/hadoop/hbase/zookeeper/RegionServerAddressTracker.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.zookeeper; + +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.ServerName; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; + +/** + * Class for tracking the region servers for a cluster. 
+ */ +@InterfaceAudience.Private +public class RegionServerAddressTracker extends ZKListener { + + private static final Logger LOG = LoggerFactory.getLogger(RegionServerAddressTracker.class); + + private volatile List<ServerName> regionServers = Collections.emptyList(); + + private final Abortable abortable; + + public RegionServerAddressTracker(ZKWatcher watcher, Abortable abortable) { + super(watcher); + this.abortable = abortable; + watcher.registerListener(this); + loadRegionServerList(); + } + + private void loadRegionServerList() { + List<String> names; + try { + names = ZKUtil.listChildrenAndWatchForNewChildren(watcher, watcher.getZNodePaths().rsZNode); + } catch (KeeperException e) { + LOG.error("failed to list region servers", e); + abortable.abort("failed to list region servers", e); + return; + } + if (CollectionUtils.isEmpty(names)) { + regionServers = Collections.emptyList(); + } else { + regionServers = names.stream().map(ServerName::parseServerName) + .collect(Collectors.collectingAndThen(Collectors.toList(), Collections::unmodifiableList)); + } + } + + @Override + public void nodeChildrenChanged(String path) { + if (path.equals(watcher.getZNodePaths().rsZNode)) { + loadRegionServerList(); + } + } + + public List<ServerName> getRegionServers() { + return regionServers; + } +} diff --git a/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRegionServerAddressTracker.java b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRegionServerAddressTracker.java new file mode 100644 index 000000000000..8a7b53b1b6b2 --- /dev/null +++ b/hbase-zookeeper/src/test/java/org/apache/hadoop/hbase/zookeeper/TestRegionServerAddressTracker.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hbase.zookeeper; + +import static org.hamcrest.CoreMatchers.hasItems; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertEquals; + +import java.io.IOException; +import org.apache.hadoop.hbase.Abortable; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HBaseZKTestingUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ZooKeeperConnectionException; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.testclassification.ZKTests; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.zookeeper.KeeperException; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.io.Closeables; + +@Category({ ZKTests.class, MediumTests.class }) +public class TestRegionServerAddressTracker { + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestRegionServerAddressTracker.class); + + private static final Logger LOG = LoggerFactory.getLogger(TestRegionServerAddressTracker.class); + + private static final HBaseZKTestingUtil TEST_UTIL = new HBaseZKTestingUtil(); + + private ZKWatcher zk; + + private RegionServerAddressTracker tracker; + + @Rule + public TestName name = new TestName(); + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.startMiniZKCluster(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniZKCluster(); + } + + @Before + public void setUp() throws ZooKeeperConnectionException, IOException, KeeperException { + TEST_UTIL.getConfiguration().set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/" + name.getMethodName()); + zk = new ZKWatcher(TEST_UTIL.getConfiguration(), name.getMethodName(), null); + ZKUtil.createWithParents(zk, zk.getZNodePaths().rsZNode); + tracker = new RegionServerAddressTracker(zk, new WarnOnlyAbortable()); + } + + @After + public void tearDown() throws IOException { + Closeables.close(zk, true); + } + + @Test + public void test() throws KeeperException { + ServerName rs1 = ServerName.valueOf("127.0.0.1", 16000, EnvironmentEdgeManager.currentTime()); + ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs1.toString())); + TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1); + assertEquals(rs1, tracker.getRegionServers().get(0)); + + ServerName rs2 = ServerName.valueOf("127.0.0.2", 16000, EnvironmentEdgeManager.currentTime()); + ZKUtil.createWithParents(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs2.toString())); + TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 2); + assertThat(tracker.getRegionServers(), hasItems(rs1, rs2)); + + ZKUtil.deleteNode(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs1.toString())); + TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().size() == 1); + assertEquals(rs2, tracker.getRegionServers().get(0)); + + ZKUtil.deleteNode(zk, ZNodePaths.joinZNode(zk.getZNodePaths().rsZNode, rs2.toString())); + TEST_UTIL.waitFor(10000, () -> tracker.getRegionServers().isEmpty()); + 
} + + private static final class WarnOnlyAbortable implements Abortable { + @Override + public void abort(String why, Throwable e) { + LOG.warn("RegionServerAddressTracker received abort, ignoring. Reason: {}", why, e); + } + + @Override + public boolean isAborted() { + return false; + } + } +}
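Taken together, these changes let a client bootstrap a connection from any ClientMetaService endpoint (master or region server) instead of ZooKeeper. A sketch of the client-side wiring; the registry key comes from HConstants as exercised in the tests above, while the bootstrap-nodes key name is an assumption, so check RpcConnectionRegistry for the actual constant:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RpcConnectionRegistry;
import org.apache.hadoop.hbase.client.Table;

final class RpcRegistryClientSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Use the RPC-based registry instead of the default ZK-based one.
    conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
      RpcConnectionRegistry.class.getName());
    // Assumed key name for the initial ClientMetaService endpoints.
    conf.set("hbase.client.bootstrap.servers", "host1:16020,host2:16020");
    try (Connection conn = ConnectionFactory.createConnection(conf);
      Table table = conn.getTable(TableName.valueOf("t1"))) {
      // Normal client operations; registry lookups now go through ClientMetaService
      // and periodically refresh their endpoint list via GetBootstrapNodes.
    }
  }
}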