Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.PrivilegedExceptionAction;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
Expand All @@ -38,18 +40,8 @@ public final class ClusterConnectionFactory {
private ClusterConnectionFactory() {
}

/**
* Create a new {@link AsyncClusterConnection} instance.
* <p/>
* Unlike what we have done in {@link ConnectionFactory}, here we just return an
* {@link AsyncClusterConnection} instead of a {@link java.util.concurrent.CompletableFuture},
* which means this method could block on fetching the cluster id. This is just used to simplify
* the implementation, as when starting new region servers, we do not need to be event-driven. Can
* change later if we want a {@link java.util.concurrent.CompletableFuture} here.
*/
public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
SocketAddress localAddress, User user) throws IOException {
ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf);
private static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
ConnectionRegistry registry, SocketAddress localAddress, User user) throws IOException {
String clusterId = FutureUtils.get(registry.getClusterId());
Class<? extends AsyncClusterConnection> clazz =
conf.getClass(HBASE_SERVER_CLUSTER_CONNECTION_IMPL, AsyncClusterConnectionImpl.class,
Expand All @@ -62,4 +54,32 @@ public static AsyncClusterConnection createAsyncClusterConnection(Configuration
throw new IOException(e);
}
}

/**
* Create a new {@link AsyncClusterConnection} instance.
* <p/>
* Unlike what we have done in {@link ConnectionFactory}, here we just return an
* {@link AsyncClusterConnection} instead of a {@link java.util.concurrent.CompletableFuture},
* which means this method could block on fetching the cluster id. This is just used to simplify
* the implementation, as when starting new region servers, we do not need to be event-driven. Can
* change later if we want a {@link java.util.concurrent.CompletableFuture} here.
*/
public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf,
SocketAddress localAddress, User user) throws IOException {
return createAsyncClusterConnection(conf, ConnectionRegistryFactory.getRegistry(conf),
localAddress, user);
}

/**
* Create a new {@link AsyncClusterConnection} instance for a region server.
*/
public static AsyncClusterConnection createAsyncClusterConnection(HRegionServer regionServer)
throws IOException {
RegionServerRegistry registry = new RegionServerRegistry(regionServer);
Configuration conf = regionServer.getConfiguration();
InetSocketAddress localAddress =
new InetSocketAddress(regionServer.getRSRpcServices().getSocketAddress().getAddress(), 0);
User user = regionServer.getUserProvider().getCurrent();
return createAsyncClusterConnection(conf, registry, localAddress, user);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.io.IOException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Connection registry implementation for region server.
*/
@InterfaceAudience.Private
public class RegionServerRegistry implements ConnectionRegistry {

private final HRegionServer regionServer;

public RegionServerRegistry(HRegionServer regionServer) {
this.regionServer = regionServer;
}

@Override
public CompletableFuture<RegionLocations> getMetaRegionLocations() {
CompletableFuture<RegionLocations> future = new CompletableFuture<>();
Optional<List<HRegionLocation>> locs =
regionServer.getMetaRegionLocationCache().getMetaRegionLocations();
if (locs.isPresent()) {
List<HRegionLocation> list = locs.get();
if (list.isEmpty()) {
future.completeExceptionally(new IOException("no meta location available"));
} else {
future.complete(new RegionLocations(list));
}
} else {
future.completeExceptionally(new IOException("no meta location available"));
}
return future;
}

@Override
public CompletableFuture<String> getClusterId() {
return CompletableFuture.completedFuture(regionServer.getClusterId());
}

@Override
public CompletableFuture<ServerName> getActiveMaster() {
CompletableFuture<ServerName> future = new CompletableFuture<>();
Optional<ServerName> activeMaster = regionServer.getActiveMaster();
if (activeMaster.isPresent()) {
future.complete(activeMaster.get());
} else {
future.completeExceptionally(new IOException("no active master available"));
}
return future;
}

@Override
public void close() {
// nothing
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -863,26 +863,6 @@ public boolean registerService(Service instance) {
return true;
}

private Configuration cleanupConfiguration() {
Configuration conf = this.conf;
// We use ZKConnectionRegistry for all the internal communication, primarily for these reasons:
// - Decouples RS and master life cycles. RegionServers can continue be up independent of
// masters' availability.
// - Configuration management for region servers (cluster internal) is much simpler when adding
// new masters or removing existing masters, since only clients' config needs to be updated.
// - We need to retain ZKConnectionRegistry for replication use anyway, so we just extend it for
// other internal connections too.
conf.set(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
HConstants.ZK_CONNECTION_REGISTRY_CLASS);
if (conf.get(HConstants.CLIENT_ZOOKEEPER_QUORUM) != null) {
// Use server ZK cluster for server-issued connections, so we clone
// the conf and unset the client ZK related properties
conf = new Configuration(this.conf);
conf.unset(HConstants.CLIENT_ZOOKEEPER_QUORUM);
}
return conf;
}

/**
* Run test on configured codecs to make sure supporting libs are in place.
*/
Expand All @@ -907,11 +887,7 @@ public String getClusterId() {
*/
protected final synchronized void setupClusterConnection() throws IOException {
if (asyncClusterConnection == null) {
Configuration conf = cleanupConfiguration();
InetSocketAddress localAddress = new InetSocketAddress(this.rpcServices.isa.getAddress(), 0);
User user = userProvider.getCurrent();
asyncClusterConnection =
ClusterConnectionFactory.createAsyncClusterConnection(conf, localAddress, user);
asyncClusterConnection = ClusterConnectionFactory.createAsyncClusterConnection(this);
}
}

Expand Down Expand Up @@ -4022,4 +3998,8 @@ public List<ServerName> getRegionServers() {
public MetaRegionLocationCache getMetaRegionLocationCache() {
return this.metaRegionLocationCache;
}

public UserProvider getUserProvider() {
return userProvider;
}
}