diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java index 0094ffad3993..b0e067f98192 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java @@ -18,9 +18,11 @@ package org.apache.hadoop.hbase.client; import java.io.IOException; +import java.net.InetSocketAddress; import java.net.SocketAddress; import java.security.PrivilegedExceptionAction; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.util.ReflectionUtils; @@ -38,18 +40,8 @@ public final class ClusterConnectionFactory { private ClusterConnectionFactory() { } - /** - * Create a new {@link AsyncClusterConnection} instance. - *

- * Unlike what we have done in {@link ConnectionFactory}, here we just return an - * {@link AsyncClusterConnection} instead of a {@link java.util.concurrent.CompletableFuture}, - * which means this method could block on fetching the cluster id. This is just used to simplify - * the implementation, as when starting new region servers, we do not need to be event-driven. Can - * change later if we want a {@link java.util.concurrent.CompletableFuture} here. - */ - public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf, - SocketAddress localAddress, User user) throws IOException { - ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf); + private static AsyncClusterConnection createAsyncClusterConnection(Configuration conf, + ConnectionRegistry registry, SocketAddress localAddress, User user) throws IOException { String clusterId = FutureUtils.get(registry.getClusterId()); Class clazz = conf.getClass(HBASE_SERVER_CLUSTER_CONNECTION_IMPL, AsyncClusterConnectionImpl.class, @@ -62,4 +54,32 @@ public static AsyncClusterConnection createAsyncClusterConnection(Configuration throw new IOException(e); } } + + /** + * Create a new {@link AsyncClusterConnection} instance. + *

+ * Unlike what we have done in {@link ConnectionFactory}, here we just return an + * {@link AsyncClusterConnection} instead of a {@link java.util.concurrent.CompletableFuture}, + * which means this method could block on fetching the cluster id. This is just used to simplify + * the implementation, as when starting new region servers, we do not need to be event-driven. Can + * change later if we want a {@link java.util.concurrent.CompletableFuture} here. + */ + public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf, + SocketAddress localAddress, User user) throws IOException { + return createAsyncClusterConnection(conf, ConnectionRegistryFactory.getRegistry(conf), + localAddress, user); + } + + /** + * Create a new {@link AsyncClusterConnection} instance for a region server. + */ + public static AsyncClusterConnection createAsyncClusterConnection(HRegionServer regionServer) + throws IOException { + RegionServerRegistry registry = new RegionServerRegistry(regionServer); + Configuration conf = regionServer.getConfiguration(); + InetSocketAddress localAddress = + new InetSocketAddress(regionServer.getRSRpcServices().getSocketAddress().getAddress(), 0); + User user = regionServer.getUserProvider().getCurrent(); + return createAsyncClusterConnection(conf, registry, localAddress, user); + } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RegionServerRegistry.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RegionServerRegistry.java new file mode 100644 index 000000000000..cdfbb6d925f4 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/RegionServerRegistry.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.IOException; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Connection registry implementation for region server. + */ +@InterfaceAudience.Private +public class RegionServerRegistry implements ConnectionRegistry { + + private final HRegionServer regionServer; + + public RegionServerRegistry(HRegionServer regionServer) { + this.regionServer = regionServer; + } + + @Override + public CompletableFuture getMetaRegionLocations() { + CompletableFuture future = new CompletableFuture<>(); + Optional> locs = + regionServer.getMetaRegionLocationCache().getMetaRegionLocations(); + if (locs.isPresent()) { + List list = locs.get(); + if (list.isEmpty()) { + future.completeExceptionally(new IOException("no meta location available")); + } else { + future.complete(new RegionLocations(list)); + } + } else { + future.completeExceptionally(new IOException("no meta location available")); + } + return future; + } + + @Override + public CompletableFuture getClusterId() { + return CompletableFuture.completedFuture(regionServer.getClusterId()); + } + + @Override + public CompletableFuture getActiveMaster() { + CompletableFuture future = new CompletableFuture<>(); + Optional activeMaster = regionServer.getActiveMaster(); + if (activeMaster.isPresent()) { + future.complete(activeMaster.get()); + } else { + future.completeExceptionally(new IOException("no active master available")); + } + return future; + } + + @Override + public void close() { + // nothing + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 90edff2bb90b..b2cc0f5bb7c2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -863,26 +863,6 @@ public boolean registerService(Service instance) { return true; } - private Configuration cleanupConfiguration() { - Configuration conf = this.conf; - // We use ZKConnectionRegistry for all the internal communication, primarily for these reasons: - // - Decouples RS and master life cycles. RegionServers can continue be up independent of - // masters' availability. - // - Configuration management for region servers (cluster internal) is much simpler when adding - // new masters or removing existing masters, since only clients' config needs to be updated. - // - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for - // other internal connections too. - conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, - HConstants.ZK_CONNECTION_REGISTRY_CLASS); - if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) { - // Use server ZK cluster for server-issued connections, so we clone - // the conf and unset the client ZK related properties - conf = new Configuration(this.conf); - conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM); - } - return conf; - } - /** * Run test on configured codecs to make sure supporting libs are in place. */ @@ -907,11 +887,7 @@ public String getClusterId() { */ protected final synchronized void setupClusterConnection() throws IOException { if (asyncClusterConnection == null) { - Configuration conf = cleanupConfiguration(); - InetSocketAddress localAddress = new InetSocketAddress(this.rpcServices.isa.getAddress(), 0); - User user = userProvider.getCurrent(); - asyncClusterConnection = - ClusterConnectionFactory.createAsyncClusterConnection(conf, localAddress, user); + asyncClusterConnection = ClusterConnectionFactory.createAsyncClusterConnection(this); } } @@ -4022,4 +3998,8 @@ public List getRegionServers() { public MetaRegionLocationCache getMetaRegionLocationCache() { return this.metaRegionLocationCache; } + + public UserProvider getUserProvider() { + return userProvider; + } }