From bafc84be6dc348ac561249bb492bd5a6f18e69bf Mon Sep 17 00:00:00 2001 From: Duo Zhang Date: Thu, 25 Jan 2024 17:06:46 +0800 Subject: [PATCH 1/2] HBASE-25051 DIGEST based auth broken for rpc based ConnectionRegistry --- .../AbstractRpcBasedConnectionRegistry.java | 61 +++---- .../hadoop/hbase/client/ClusterIdFetcher.java | 134 +++++++++++++++ .../hbase/client/ConnectionFactory.java | 2 +- .../client/ConnectionOverAsyncConnection.java | 2 +- .../client/ConnectionRegistryFactory.java | 5 +- .../ConnectionRegistryRpcStubHolder.java | 161 ++++++++++++++++++ .../hadoop/hbase/client/ConnectionUtils.java | 10 ++ .../hadoop/hbase/client/MasterRegistry.java | 8 +- .../hbase/client/RpcConnectionRegistry.java | 7 +- .../hbase/client/TableOverAsyncTable.java | 2 +- .../hbase/client/ZKConnectionRegistry.java | 4 +- .../hadoop/hbase/ipc/AbstractRpcClient.java | 2 +- .../hbase/ipc/BlockingRpcConnection.java | 103 +++-------- .../org/apache/hadoop/hbase/ipc/Call.java | 5 + .../hadoop/hbase/ipc/NettyRpcConnection.java | 26 ++- .../hbase/ipc/NettyRpcDuplexHandler.java | 88 +--------- .../apache/hadoop/hbase/ipc/RpcClient.java | 2 + .../hadoop/hbase/ipc/RpcConnection.java | 120 ++++++++++++- .../BuiltInSaslAuthenticationProvider.java | 6 + .../DigestSaslAuthenticationProvider.java | 5 +- .../GssSaslAuthenticationProvider.java | 5 +- .../SimpleSaslAuthenticationProvider.java | 6 +- .../client/DoNothingConnectionRegistry.java | 5 +- .../client/TestAsyncAdminRpcPriority.java | 7 +- .../client/TestAsyncConnectionTracing.java | 7 +- .../TestAsyncMetaRegionLocatorFailFast.java | 11 +- .../client/TestAsyncRegionLocatorTracing.java | 6 +- .../client/TestAsyncTableRpcPriority.java | 6 +- .../hbase/client/TestAsyncTableTracing.java | 60 +++---- .../client/TestConnectionRegistryLeak.java | 5 +- .../TestRpcBasedRegistryHedgedReads.java | 14 +- .../hbase/ipc/TestTLSHandshadeFailure.java | 7 +- .../hadoop/hbase/util/ConcurrentMapUtils.java | 8 - .../hbase/util/IOExceptionSupplier.java | 30 ++++ .../src/main/protobuf/server/Registry.proto | 19 ++- .../client/ClusterConnectionFactory.java | 2 +- .../hadoop/hbase/ipc/NettyRpcServer.java | 9 +- .../ipc/NettyRpcServerPreambleHandler.java | 18 +- .../hadoop/hbase/ipc/ServerRpcConnection.java | 79 ++++++--- .../hbase/ipc/SimpleServerRpcConnection.java | 19 ++- .../apache/hadoop/hbase/wal/WALSplitUtil.java | 2 +- .../TestZooKeeperTableArchiveClient.java | 13 +- .../client/AbstractTestRegionLocator.java | 3 +- .../hbase/client/DummyConnectionRegistry.java | 57 ------- .../TestAsyncAdminWithRegionReplicas.java | 3 +- .../client/TestAsyncMetaRegionLocator.java | 4 +- .../client/TestAsyncNonMetaRegionLocator.java | 4 +- ...ncNonMetaRegionLocatorConcurrenyLimit.java | 2 +- .../hbase/client/TestAsyncRegionLocator.java | 2 +- ...stAsyncSingleRequestRpcRetryingCaller.java | 2 +- .../client/TestAsyncTableUseMetaReplicas.java | 4 +- .../hbase/client/TestBootstrapNodeUpdate.java | 3 +- ...talogReplicaLoadBalanceSimpleSelector.java | 3 +- .../hbase/client/TestMasterRegistry.java | 54 +++--- .../client/TestMetaRegionLocationCache.java | 4 +- .../client/TestRpcConnectionRegistry.java | 57 ++++++- .../client/TestZKConnectionRegistry.java | 6 +- .../hadoop/hbase/ipc/AbstractTestIPC.java | 83 ++++++++- .../hadoop/hbase/ipc/TestBlockingIPC.java | 4 +- .../apache/hadoop/hbase/ipc/TestNettyIPC.java | 5 +- .../hadoop/hbase/ipc/TestNettyTlsIPC.java | 23 ++- .../regionserver/TestWALEntrySinkFilter.java | 13 +- .../token/TestGenerateDelegationToken.java | 80 ++++++--- 63 files changed, 1024 
insertions(+), 483 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterIdFetcher.java create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryRpcStubHolder.java create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/IOExceptionSupplier.java delete mode 100644 hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java index 4e97dcab24dd..62c6951b4535 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AbstractRpcBasedConnectionRegistry.java @@ -33,22 +33,17 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.exceptions.ClientExceptionsUtil; import org.apache.hadoop.hbase.exceptions.MasterRegistryFetchException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; -import org.apache.hadoop.hbase.ipc.RpcClient; -import org.apache.hadoop.hbase.ipc.RpcClientFactory; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.yetus.audience.InterfaceAudience; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; @@ -79,30 +74,21 @@ abstract class AbstractRpcBasedConnectionRegistry implements ConnectionRegistry private final int hedgedReadFanOut; - // Configured list of end points to probe the meta information from. - private volatile ImmutableMap addr2Stub; - // RPC client used to talk to the masters. - private final RpcClient rpcClient; + private final ConnectionRegistryRpcStubHolder rpcStubHolder; private final RpcControllerFactory rpcControllerFactory; - private final int rpcTimeoutMs; private final RegistryEndpointsRefresher registryEndpointRefresher; - protected AbstractRpcBasedConnectionRegistry(Configuration conf, + protected AbstractRpcBasedConnectionRegistry(Configuration conf, User user, String hedgedReqsFanoutConfigName, String initialRefreshDelaySecsConfigName, String refreshIntervalSecsConfigName, String minRefreshIntervalSecsConfigName) throws IOException { this.hedgedReadFanOut = Math.max(1, conf.getInt(hedgedReqsFanoutConfigName, HEDGED_REQS_FANOUT_DEFAULT)); - rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, - conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); - // XXX: we pass cluster id as null here since we do not have a cluster id yet, we have to fetch - // this through the master registry... 
- // This is a problem as we will use the cluster id to determine the authentication method - rpcClient = RpcClientFactory.createClient(conf, null); rpcControllerFactory = RpcControllerFactory.instantiate(conf); - populateStubs(getBootstrapNodes(conf)); + rpcStubHolder = new ConnectionRegistryRpcStubHolder(conf, user, rpcControllerFactory, + getBootstrapNodes(conf)); // could return null here if refresh interval is less than zero registryEndpointRefresher = RegistryEndpointsRefresher.create(conf, initialRefreshDelaySecsConfigName, @@ -114,19 +100,7 @@ protected AbstractRpcBasedConnectionRegistry(Configuration conf, protected abstract CompletableFuture<Set<ServerName>> fetchEndpoints(); private void refreshStubs() throws IOException { - populateStubs(FutureUtils.get(fetchEndpoints())); - } - - private void populateStubs(Set<ServerName> addrs) throws IOException { - Preconditions.checkNotNull(addrs); - ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder = - ImmutableMap.builderWithExpectedSize(addrs.size()); - User user = User.getCurrent(); - for (ServerName masterAddr : addrs) { - builder.put(masterAddr, - ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs))); - } - addr2Stub = builder.build(); + rpcStubHolder.refreshStubs(() -> FutureUtils.get(fetchEndpoints())); } /** @@ -211,20 +185,25 @@ private <T extends Message> void groupCall(CompletableFuture<T> future, Set private <T extends Message> CompletableFuture<T> call(Callable<T> callable, Predicate<T> isValidResp, String debug) { - ImmutableMap<ServerName, ClientMetaService.Interface> addr2StubRef = addr2Stub; - Set<ServerName> servers = addr2StubRef.keySet(); - List<ClientMetaService.Interface> stubs = new ArrayList<>(addr2StubRef.values()); - Collections.shuffle(stubs, ThreadLocalRandom.current()); CompletableFuture<T> future = new CompletableFuture<>(); - groupCall(future, servers, stubs, 0, callable, isValidResp, debug, - new ConcurrentLinkedQueue<>()); + FutureUtils.addListener(rpcStubHolder.getStubs(), (addr2Stub, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + Set<ServerName> servers = addr2Stub.keySet(); + List<ClientMetaService.Interface> stubs = new ArrayList<>(addr2Stub.values()); + Collections.shuffle(stubs, ThreadLocalRandom.current()); + groupCall(future, servers, stubs, 0, callable, isValidResp, debug, + new ConcurrentLinkedQueue<>()); + }); return future; } @RestrictedApi(explanation = "Should only be called in tests", link = "", allowedOnPath = ".*/src/test/.*") - Set<ServerName> getParsedServers() { - return addr2Stub.keySet(); + Set<ServerName> getParsedServers() throws IOException { + return FutureUtils.get(rpcStubHolder.getStubs()).keySet(); } /** @@ -277,8 +256,8 @@ public void close() { if (registryEndpointRefresher != null) { registryEndpointRefresher.stop(); } - if (rpcClient != null) { - rpcClient.close(); + if (rpcStubHolder != null) { + rpcStubHolder.close(); } }, getClass().getSimpleName() + ".close"); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterIdFetcher.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterIdFetcher.java new file mode 100644 index 000000000000..277629681ec6 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterIdFetcher.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse; + +/** + * Fetch cluster id through special preamble header. + *
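+ * The client writes the special {@code RpcClient.REGISTRY_PREAMBLE_HEADER} ({@code 'Regist'})
+ * preamble instead of the normal {@code 'HBas' + VERSION + AUTH_CODE} one, and the server replies
+ * with the connection registry information directly, without a real rpc call being sent.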
<p/>
+ * An instance of this class should only be used once, like: + * + *
<pre>
+ * new ClusterIdFetcher().fetchClusterId()
+ * </pre>
+ * + * Calling fetchClusterId multiple times will lead to unexpected behavior. + *
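+ * A slightly fuller sketch, assuming a {@code conf}, {@code user} and a set of bootstrap
+ * {@code servers} are already at hand (these variable names are illustrative only):
+ *
+ * <pre>
+ * CompletableFuture&lt;String&gt; clusterId = new ClusterIdFetcher(conf, user,
+ *   RpcControllerFactory.instantiate(conf), servers).fetchClusterId();
+ * </pre>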
<p/>
+ * See HBASE-25051 for more details. + */ +@InterfaceAudience.Private +class ClusterIdFetcher { + + private static final Logger LOG = LoggerFactory.getLogger(ClusterIdFetcher.class); + + private final List<ServerName> bootstrapServers; + + private final User user; + + private final RpcClient rpcClient; + + private final RpcControllerFactory rpcControllerFactory; + + private final CompletableFuture<String> future; + + ClusterIdFetcher(Configuration conf, User user, RpcControllerFactory rpcControllerFactory, + Set<ServerName> bootstrapServers) { + this.user = user; + // use null cluster id here as we do not know the cluster id yet, we will fetch it through this + // rpc client + this.rpcClient = RpcClientFactory.createClient(conf, null); + this.rpcControllerFactory = rpcControllerFactory; + this.bootstrapServers = new ArrayList<>(bootstrapServers); + // shuffle the bootstrap servers so we will not always fetch from the same one + Collections.shuffle(this.bootstrapServers); + future = new CompletableFuture<>(); + } + + /** + * Try to get the cluster id from the server with the given {@code index} in {@link #bootstrapServers}. + */ + private void getClusterId(int index) { + ServerName server = bootstrapServers.get(index); + LOG.debug("Going to request {} for getting cluster id", server); + // user and rpcTimeout are both not important here, as we will not actually send any rpc calls + // out, only a preamble connection header, but if we pass null as user, there will be NPE in + // some code paths... + RpcChannel channel = rpcClient.createRpcChannel(server, user, 0); + ConnectionRegistryService.Interface stub = ConnectionRegistryService.newStub(channel); + HBaseRpcController controller = rpcControllerFactory.newController(); + stub.getConnectionRegistry(controller, GetConnectionRegistryRequest.getDefaultInstance(), + new RpcCallback<GetConnectionRegistryResponse>() { + + @Override + public void run(GetConnectionRegistryResponse resp) { + if (!controller.failed()) { + LOG.debug("Got connection registry info: {}", resp); + future.complete(resp.getClusterId()); + return; + } + if (ConnectionUtils.isUnexpectedPreambleHeaderException(controller.getFailed())) { + // this means we have connected to an old server where it does not support passing + // cluster id through the preamble connection header, so we fall back to using a null + // cluster id, which is the old behavior + LOG.debug("Failed to get connection registry info, should be an old server," + + " fallback to use null cluster id", controller.getFailed()); + future.complete(null); + } else { + LOG.debug("Failed to get connection registry info", controller.getFailed()); + if (index == bootstrapServers.size() - 1) { + future.completeExceptionally(controller.getFailed()); + } else { + // try next bootstrap server + getClusterId(index + 1); + } + } + } + }); + + } + + CompletableFuture<String> fetchClusterId() { + getClusterId(0); + // close the rpc client after we finish the request + FutureUtils.addListener(future, (r, e) -> rpcClient.close()); + return future; + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java index ac70091dcf65..716fb4863fe8 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java @@ -338,7 +338,7 @@ public static CompletableFuture<AsyncConnection> createAsyncConnection(Configura final User user, Map<String, byte[]> connectionAttributes) { return TraceUtil.tracedFuture(() -> {
CompletableFuture future = new CompletableFuture<>(); - ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf); + ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf, user); addListener(registry.getClusterId(), (clusterId, error) -> { if (error != null) { registry.close(); diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java index 51368fc23c15..30c348e6d1f1 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java @@ -29,8 +29,8 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.log.HBaseMarkers; -import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier; import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.IOExceptionSupplier; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java index f198c3c22002..415d46397b8f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryFactory.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.hbase.HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -33,10 +34,10 @@ private ConnectionRegistryFactory() { } /** Returns The connection registry implementation to use. */ - static ConnectionRegistry getRegistry(Configuration conf) { + static ConnectionRegistry getRegistry(Configuration conf, User user) { Class clazz = conf.getClass(CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, RpcConnectionRegistry.class, ConnectionRegistry.class); - return ReflectionUtils.newInstance(clazz, conf); + return ReflectionUtils.newInstance(clazz, conf, user); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryRpcStubHolder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryRpcStubHolder.java new file mode 100644 index 000000000000..11a37b4afac3 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryRpcStubHolder.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import java.io.Closeable; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.ipc.RpcClient; +import org.apache.hadoop.hbase.ipc.RpcClientFactory; +import org.apache.hadoop.hbase.ipc.RpcControllerFactory; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.IOExceptionSupplier; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap; + +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; + +/** + * A class for creating {@link RpcClient} and related stubs used by + * {@link AbstractRpcBasedConnectionRegistry}. We need to connect to bootstrap nodes to get the + * cluster id first, before creating the final {@link RpcClient} and related stubs. + *
<p/>
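+ * The cluster id fetch is lazy: nothing is sent until the first {@code getStubs()} call, which
+ * fetches the cluster id through {@link ClusterIdFetcher} and only then creates the actual
+ * {@link RpcClient} and the {@code ClientMetaService} stubs with that cluster id.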
+ * See HBASE-25051 for more details. + */ +@InterfaceAudience.Private +class ConnectionRegistryRpcStubHolder implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(ConnectionRegistryRpcStubHolder.class); + + private final Configuration conf; + + // used for getting cluster id + private final Configuration noAuthConf; + + private final User user; + + private final RpcControllerFactory rpcControllerFactory; + + private final Set<ServerName> bootstrapNodes; + + private final int rpcTimeoutMs; + + private volatile ImmutableMap<ServerName, ClientMetaService.Interface> addr2Stub; + + private volatile RpcClient rpcClient; + + private CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> addr2StubFuture; + + ConnectionRegistryRpcStubHolder(Configuration conf, User user, + RpcControllerFactory rpcControllerFactory, Set<ServerName> bootstrapNodes) { + this.conf = conf; + if (User.isHBaseSecurityEnabled(conf)) { + this.noAuthConf = new Configuration(conf); + this.noAuthConf.set(User.HBASE_SECURITY_CONF_KEY, "simple"); + } else { + this.noAuthConf = conf; + } + this.user = user; + this.rpcControllerFactory = rpcControllerFactory; + this.bootstrapNodes = Collections.unmodifiableSet(bootstrapNodes); + this.rpcTimeoutMs = (int) Math.min(Integer.MAX_VALUE, + conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT)); + } + + private ImmutableMap<ServerName, ClientMetaService.Interface> createStubs(RpcClient rpcClient, + Collection<ServerName> addrs) { + LOG.debug("Going to use new servers to create stubs: {}", addrs); + Preconditions.checkNotNull(addrs); + ImmutableMap.Builder<ServerName, ClientMetaService.Interface> builder = + ImmutableMap.builderWithExpectedSize(addrs.size()); + for (ServerName masterAddr : addrs) { + builder.put(masterAddr, + ClientMetaService.newStub(rpcClient.createRpcChannel(masterAddr, user, rpcTimeoutMs))); + } + return builder.build(); + } + + private void fetchClusterIdAndCreateStubs() { + addr2StubFuture = new CompletableFuture<>(); + FutureUtils.addListener( + new ClusterIdFetcher(noAuthConf, user, rpcControllerFactory, bootstrapNodes).fetchClusterId(), + (clusterId, error) -> { + synchronized (ConnectionRegistryRpcStubHolder.this) { + if (error != null) { + addr2StubFuture.completeExceptionally(error); + } else { + RpcClient c = RpcClientFactory.createClient(conf, clusterId); + ImmutableMap<ServerName, ClientMetaService.Interface> m = + createStubs(c, bootstrapNodes); + rpcClient = c; + addr2Stub = m; + addr2StubFuture.complete(m); + } + addr2StubFuture = null; + } + }); + } + + CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> getStubs() { + ImmutableMap<ServerName, ClientMetaService.Interface> s = this.addr2Stub; + if (s != null) { + return CompletableFuture.completedFuture(s); + } + synchronized (this) { + s = this.addr2Stub; + if (s != null) { + return CompletableFuture.completedFuture(s); + } + if (addr2StubFuture != null) { + return addr2StubFuture; + } + fetchClusterIdAndCreateStubs(); + return addr2StubFuture; + } + } + + void refreshStubs(IOExceptionSupplier<Collection<ServerName>> fetchEndpoints) throws IOException { + // There is no actual call yet so we have not initialized the rpc client and related stubs yet, + // so give up refreshing + if (addr2Stub == null) { + LOG.debug("Skip refreshing stubs as we have not initialized rpc client yet"); + return; + } + LOG.debug("Going to refresh stubs"); + assert rpcClient != null; + addr2Stub = createStubs(rpcClient, fetchEndpoints.get()); + } + + @Override + public void close() { + if (rpcClient != null) { + rpcClient.close(); + } + } +} diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java index 4827708a02e3..d073fef929fd 100644 ---
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.metrics.ScanMetrics; +import org.apache.hadoop.hbase.ipc.FatalConnectionException; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.ipc.ServerRpcController; import org.apache.hadoop.hbase.util.Bytes; @@ -663,4 +664,13 @@ static void setCoprocessorError(RpcController controller, Throwable error) { controller.setFailed(error.toString()); } } + + static boolean isUnexpectedPreambleHeaderException(IOException e) { + if (!(e instanceof RemoteException)) { + return false; + } + RemoteException re = (RemoteException) e; + return FatalConnectionException.class.getName().equals(re.getClassName()) + && re.getMessage().startsWith("Expected HEADER="); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java index b6f81c30f0bd..364180fe1414 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MasterRegistry.java @@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.DNS.ServerType; import org.apache.yetus.audience.InterfaceAudience; @@ -105,9 +106,10 @@ public static Set parseMasterAddrs(Configuration conf) throws Unknow private final String connectionString; - MasterRegistry(Configuration conf) throws IOException { - super(conf, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS, - MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES); + MasterRegistry(Configuration conf, User user) throws IOException { + super(conf, user, MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, + MASTER_REGISTRY_INITIAL_REFRESH_DELAY_SECS, MASTER_REGISTRY_PERIODIC_REFRESH_INTERVAL_SECS, + MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES); connectionString = getConnectionString(conf); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java index 2c320d3a9d1d..c3ed560923ff 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcConnectionRegistry.java @@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.common.base.Splitter; @@ -75,9 +76,9 @@ public class RpcConnectionRegistry extends AbstractRpcBasedConnectionRegistry { private final String connectionString; - RpcConnectionRegistry(Configuration conf) throws IOException { - super(conf, HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, PERIODIC_REFRESH_INTERVAL_SECS, - MIN_SECS_BETWEEN_REFRESHES); + RpcConnectionRegistry(Configuration conf, User user) throws IOException { + super(conf, user, 
HEDGED_REQS_FANOUT_KEY, INITIAL_REFRESH_DELAY_SECS, + PERIODIC_REFRESH_INTERVAL_SECS, MIN_SECS_BETWEEN_REFRESHES); connectionString = buildConnectionString(conf); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java index 0a7dabd476ce..8c61e8b584f9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java @@ -55,9 +55,9 @@ import org.apache.hadoop.hbase.trace.HBaseSemanticAttributes; import org.apache.hadoop.hbase.trace.TraceUtil; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.hadoop.hbase.util.IOExceptionSupplier; import org.apache.hadoop.hbase.util.Pair; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java index 1634b13ec7e8..0e13f0b83c91 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ZKConnectionRegistry.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.exceptions.DeserializationException; import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; @@ -60,7 +61,8 @@ class ZKConnectionRegistry implements ConnectionRegistry { private final ZNodePaths znodePaths; - ZKConnectionRegistry(Configuration conf) { + // User not used, but for rpc based registry we need it + ZKConnectionRegistry(Configuration conf, User user) { this.znodePaths = new ZNodePaths(conf); this.zk = new ReadOnlyZKClient(conf); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index 5926539d0679..7972cc08acd2 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -454,7 +454,7 @@ public void run(Call call) { } } - private static Address createAddr(ServerName sn) { + static Address createAddr(ServerName sn) { return Address.fromParts(sn.getHostname(), sn.getPort()); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index f30b77c64fe9..0478000a2375 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -18,7 +18,6 @@ package org.apache.hadoop.hbase.ipc; import static org.apache.hadoop.hbase.ipc.IPCUtil.buildRequestHeader; -import static org.apache.hadoop.hbase.ipc.IPCUtil.getTotalSizeWhenWrittenDelimited; import static org.apache.hadoop.hbase.ipc.IPCUtil.setCancelled; import static org.apache.hadoop.hbase.ipc.IPCUtil.write; @@ 
-43,7 +42,6 @@ import java.util.concurrent.ThreadLocalRandom; import javax.security.sasl.SaslException; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.exceptions.ConnectionClosingException; import org.apache.hadoop.hbase.io.ByteArrayOutputStream; @@ -64,19 +62,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.protobuf.Message; import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback; -import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.buffer.PooledByteBufAllocator; -import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; /** * Thread that reads responses and notifies callers. Each connection owns a socket connected to a @@ -218,7 +211,7 @@ public void cleanup(IOException e) { BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException { super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor, - rpcClient.metrics, rpcClient.connectionAttributes); + rpcClient.cellBlockBuilder, rpcClient.metrics, rpcClient.connectionAttributes); this.rpcClient = rpcClient; this.connectionHeaderPreamble = getConnectionHeaderPreamble(); ConnectionHeader header = getConnectionHeader(); @@ -435,6 +428,15 @@ public Object run() throws IOException, InterruptedException { }); } + private void getConnectionRegistry(OutputStream outStream) throws IOException { + outStream.write(RpcClient.REGISTRY_PREAMBLE_HEADER); + } + + private void createStreams(InputStream inStream, OutputStream outStream) { + this.in = new DataInputStream(new BufferedInputStream(inStream)); + this.out = new DataOutputStream(new BufferedOutputStream(outStream)); + } + private void setupIOstreams() throws IOException { if (socket != null) { // The connection is already available. Perfect. @@ -462,6 +464,11 @@ private void setupIOstreams() throws IOException { InputStream inStream = NetUtils.getInputStream(socket); // This creates a socket with a write timeout. This timeout cannot be changed. OutputStream outStream = NetUtils.getOutputStream(socket, this.rpcClient.writeTO); + if (connectionRegistryCall != null) { + getConnectionRegistry(outStream); + createStreams(inStream, outStream); + break; + } // Write out the preamble -- MAGIC, version, and auth to use. writeConnectionHeaderPreamble(outStream); if (useSasl) { @@ -494,13 +501,11 @@ public Boolean run() throws IOException { // reconnecting because regionserver may change its sasl config after restart. 
} } - this.in = new DataInputStream(new BufferedInputStream(inStream)); - this.out = new DataOutputStream(new BufferedOutputStream(outStream)); + createStreams(inStream, outStream); // Now write out the connection header writeConnectionHeader(); // process the response from server for connection header if necessary processResponseForConnectionHeader(); - break; } } catch (Throwable t) { @@ -611,7 +616,9 @@ private void writeRequest(Call call) throws IOException { cellBlockMeta = null; } RequestHeader requestHeader = buildRequestHeader(call, cellBlockMeta); - + if (call.isConnectionRegistryCall()) { + connectionRegistryCall = call; + } setupIOstreams(); // Now we're going to write the call. We take the lock, then check that the connection @@ -646,77 +653,13 @@ private void writeRequest(Call call) throws IOException { * Receive a response. Because only one receiver, so no synchronization on in. */ private void readResponse() { - Call call = null; - boolean expectedCall = false; try { - // See HBaseServer.Call.setResponse for where we write out the response. - // Total size of the response. Unused. But have to read it in anyways. - int totalSize = in.readInt(); - - // Read the header - ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in); - int id = responseHeader.getCallId(); - if (LOG.isTraceEnabled()) { - LOG.trace("got response header " + TextFormat.shortDebugString(responseHeader) - + ", totalSize: " + totalSize + " bytes"); - } - RemoteException remoteExc; - if (responseHeader.hasException()) { - ExceptionResponse exceptionResponse = responseHeader.getException(); - remoteExc = IPCUtil.createRemoteException(exceptionResponse); - if (IPCUtil.isFatalConnectionException(exceptionResponse)) { - // Here we will cleanup all calls so do not need to fall back, just return. - synchronized (this) { - closeConn(remoteExc); - } - return; - } - } else { - remoteExc = null; - } - - call = calls.remove(id); // call.done have to be set before leaving this method - expectedCall = (call != null && !call.isDone()); - if (!expectedCall) { - // So we got a response for which we have no corresponding 'call' here on the client-side. - // We probably timed out waiting, cleaned up all references, and now the server decides - // to return a response. There is nothing we can do w/ the response at this stage. Clean - // out the wire of the response so its out of the way and we can get other responses on - // this connection. 
- int readSoFar = getTotalSizeWhenWrittenDelimited(responseHeader); - int whatIsLeftToRead = totalSize - readSoFar; - LOG.debug("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead - + " bytes"); - IOUtils.skipFully(in, whatIsLeftToRead); - if (call != null) { - call.callStats.setResponseSizeBytes(totalSize); + readResponse(in, calls, remoteExc -> { + synchronized (this) { + closeConn(remoteExc); } - return; - } - call.callStats.setResponseSizeBytes(totalSize); - if (remoteExc != null) { - call.setException(remoteExc); - return; - } - Message value = null; - if (call.responseDefaultType != null) { - Message.Builder builder = call.responseDefaultType.newBuilderForType(); - ProtobufUtil.mergeDelimitedFrom(builder, in); - value = builder.build(); - } - CellScanner cellBlockScanner = null; - if (responseHeader.hasCellBlockMeta()) { - int size = responseHeader.getCellBlockMeta().getLength(); - byte[] cellBlock = new byte[size]; - IOUtils.readFully(this.in, cellBlock, 0, cellBlock.length); - cellBlockScanner = - this.rpcClient.cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); - } - call.setResponse(value, cellBlockScanner); + }); } catch (IOException e) { - if (expectedCall) { - call.setException(e); - } if (e instanceof SocketTimeoutException) { // Clean up open calls but don't treat this as a fatal condition, // since we expect certain responses to not make it by the specified diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java index 669fc73a3bfa..d175ea0b6e90 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java @@ -34,6 +34,7 @@ import org.apache.hbase.thirdparty.io.netty.util.Timeout; import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService; /** A call waiting for a value. 
*/ @InterfaceAudience.Private @@ -156,4 +157,8 @@ public synchronized boolean isDone() { public long getStartTime() { return this.callStats.getStartTime(); } + + public boolean isConnectionRegistryCall() { + return md.getService().equals(ConnectionRegistryService.getDescriptor()); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index 408ea347e7a3..a0f8f10d1cf9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -104,7 +104,7 @@ class NettyRpcConnection extends RpcConnection { NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException { super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor, - rpcClient.metrics, rpcClient.connectionAttributes); + rpcClient.cellBlockBuilder, rpcClient.metrics, rpcClient.connectionAttributes); this.rpcClient = rpcClient; this.eventLoop = rpcClient.group.next(); byte[] connectionHeaderPreamble = getConnectionHeaderPreamble(); @@ -274,6 +274,12 @@ public void operationComplete(Future future) throws Exception { }); } + private void getConnectionRegistry(Channel ch) throws IOException { + established(ch); + NettyFutureUtils.safeWriteAndFlush(ch, + Unpooled.directBuffer(6).writeBytes(RpcClient.REGISTRY_PREAMBLE_HEADER)); + } + private void connect() throws UnknownHostException { assert eventLoop.inEventLoop(); LOG.trace("Connecting to {}", remoteId.getAddress()); @@ -303,12 +309,16 @@ protected void initChannel(Channel ch) throws Exception { .addListener(new ChannelFutureListener() { private void succeed(Channel ch) throws IOException { - ch.writeAndFlush(connectionHeaderPreamble.retainedDuplicate()); + if (connectionRegistryCall != null) { + getConnectionRegistry(ch); + return; + } + NettyFutureUtils.safeWriteAndFlush(ch, connectionHeaderPreamble.retainedDuplicate()); if (useSasl) { saslNegotiate(ch); } else { // send the connection header to server - ch.write(connectionHeaderWithLength.retainedDuplicate()); + NettyFutureUtils.safeWrite(ch, connectionHeaderWithLength.retainedDuplicate()); established(ch); } } @@ -317,6 +327,9 @@ private void fail(Channel ch, Throwable error) { IOException ex = toIOE(error); LOG.warn("Exception encountered while connecting to the server " + remoteId.getAddress(), ex); + if (connectionRegistryCall != null) { + connectionRegistryCall.setException(ex); + } failInit(ch, ex); rpcClient.failedServers.addToFailedServers(remoteId.getAddress(), error); } @@ -346,6 +359,13 @@ public void operationComplete(ChannelFuture future) throws Exception { private void sendRequest0(Call call, HBaseRpcController hrc) throws IOException { assert eventLoop.inEventLoop(); + if (call.isConnectionRegistryCall()) { + connectionRegistryCall = call; + // For get connection registry call, we will send a special preamble header to get the + // response, instead of sending a real rpc call. 
See HBASE-25051 + connect(); + return; + } if (reloginInProgress) { throw new IOException(RpcConnectionConstants.RELOGIN_IS_IN_PROGRESS); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java index ad8c51568a32..44772ae2dbf9 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcDuplexHandler.java @@ -18,21 +18,16 @@ package org.apache.hadoop.hbase.ipc; import io.opentelemetry.context.Scope; -import java.io.EOFException; import java.io.IOException; import java.util.HashMap; import java.util.Map; -import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.exceptions.ConnectionClosedException; import org.apache.hadoop.io.compress.CompressionCodec; -import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hbase.thirdparty.com.google.protobuf.Message; -import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBuf; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufInputStream; import org.apache.hbase.thirdparty.io.netty.buffer.ByteBufOutputStream; @@ -44,9 +39,7 @@ import org.apache.hbase.thirdparty.io.netty.util.concurrent.PromiseCombiner; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; /** * The netty rpc handler. @@ -127,88 +120,15 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) } } - private void finishCall(ResponseHeader responseHeader, ByteBufInputStream in, Call call) - throws IOException { - Message value; - if (call.responseDefaultType != null) { - Message.Builder builder = call.responseDefaultType.newBuilderForType(); - if (!builder.mergeDelimitedFrom(in)) { - // The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF - // before reading any bytes out, so here we need to manually finish create the EOFException - // and finish the call - call.setException(new EOFException("EOF while reading response with type: " - + call.responseDefaultType.getClass().getName())); - return; - } - value = builder.build(); - } else { - value = null; - } - CellScanner cellBlockScanner; - if (responseHeader.hasCellBlockMeta()) { - int size = responseHeader.getCellBlockMeta().getLength(); - // Maybe we could read directly from the ByteBuf. - // The problem here is that we do not know when to release it. 
- byte[] cellBlock = new byte[size]; - in.readFully(cellBlock); - cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); - } else { - cellBlockScanner = null; - } - call.setResponse(value, cellBlockScanner); - } - private void readResponse(ChannelHandlerContext ctx, ByteBuf buf) throws IOException { - int totalSize = buf.readInt(); - ByteBufInputStream in = new ByteBufInputStream(buf); - ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in); - int id = responseHeader.getCallId(); - if (LOG.isTraceEnabled()) { - LOG.trace("got response header " + TextFormat.shortDebugString(responseHeader) - + ", totalSize: " + totalSize + " bytes"); - } - RemoteException remoteExc; - if (responseHeader.hasException()) { - ExceptionResponse exceptionResponse = responseHeader.getException(); - remoteExc = IPCUtil.createRemoteException(exceptionResponse); - if (IPCUtil.isFatalConnectionException(exceptionResponse)) { - // Here we will cleanup all calls so do not need to fall back, just return. - exceptionCaught(ctx, remoteExc); - return; - } - } else { - remoteExc = null; - } - Call call = id2Call.remove(id); - if (call == null) { - // So we got a response for which we have no corresponding 'call' here on the client-side. - // We probably timed out waiting, cleaned up all references, and now the server decides - // to return a response. There is nothing we can do w/ the response at this stage. Clean - // out the wire of the response so its out of the way and we can get other responses on - // this connection. - if (LOG.isDebugEnabled()) { - int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); - int whatIsLeftToRead = totalSize - readSoFar; - LOG.debug("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead - + " bytes"); - } - return; - } - call.callStats.setResponseSizeBytes(totalSize); - if (remoteExc != null) { - call.setException(remoteExc); - return; - } try { - finishCall(responseHeader, in, call); + conn.readResponse(new ByteBufInputStream(buf), id2Call, + remoteExc -> exceptionCaught(ctx, remoteExc)); } catch (IOException e) { - // As the call has been removed from id2Call map, if we hit an exception here, the - // exceptionCaught method can not help us finish the call, so here we need to catch the - // exception and finish it - // And in netty, the decoding the frame based, when reaching here we have already read a full + // In netty, the decoding is frame based; when reaching here we have already read a full // frame, so hitting exception here does not mean the stream decoding is broken, thus we do // not need to throw the exception out and close the connection. - call.setException(e); + LOG.warn("failed to process response", e); } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java index 045216e88811..369430e337ae 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClient.java @@ -54,6 +54,8 @@ public interface RpcClient extends Closeable { // The client in 0.99+ does not ping the server. int PING_CALL_ID = -1; + byte[] REGISTRY_PREAMBLE_HEADER = new byte[] { 'R', 'e', 'g', 'i', 's', 't' }; + /** * Creates a "channel" that can be used by a blocking protobuf service. Useful setting up protobuf * blocking stubs.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index dbe6ed1648df..65f936d6fc38 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -17,12 +17,17 @@ */ package org.apache.hadoop.hbase.ipc; +import java.io.DataInput; +import java.io.EOFException; import java.io.IOException; +import java.io.InputStream; import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Map; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.MetricsConnection; import org.apache.hadoop.hbase.codec.Codec; @@ -34,12 +39,15 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations; import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer; import org.apache.hbase.thirdparty.io.netty.util.Timeout; @@ -48,6 +56,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; /** @@ -72,6 +82,8 @@ abstract class RpcConnection { protected final CompressionCodec compressor; + protected final CellBlockBuilder cellBlockBuilder; + protected final MetricsConnection metrics; private final Map connectionAttributes; @@ -90,10 +102,12 @@ abstract class RpcConnection { protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId, String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor, - MetricsConnection metrics, Map connectionAttributes) throws IOException { + CellBlockBuilder cellBlockBuilder, MetricsConnection metrics, + Map connectionAttributes) throws IOException { this.timeoutTimer = timeoutTimer; this.codec = codec; this.compressor = compressor; + this.cellBlockBuilder = cellBlockBuilder; this.conf = conf; this.metrics = metrics; this.connectionAttributes = connectionAttributes; @@ -150,14 +164,13 @@ protected byte[] getConnectionHeaderPreamble() { // Assemble the preamble up in a buffer first and then send it. Writing individual elements, // they are getting sent across piecemeal according to wireshark and then server is messing // up the reading on occasion (the passed in stream is not buffered yet). 
- - // Preamble is six bytes -- 'HBas' + VERSION + AUTH_CODE int rpcHeaderLen = HConstants.RPC_HEADER.length; + // Preamble is six bytes -- 'HBas' + VERSION + AUTH_CODE byte[] preamble = new byte[rpcHeaderLen + 2]; System.arraycopy(HConstants.RPC_HEADER, 0, preamble, 0, rpcHeaderLen); preamble[rpcHeaderLen] = HConstants.RPC_CURRENT_VERSION; synchronized (this) { - preamble[rpcHeaderLen + 1] = provider.getSaslAuthMethod().getCode(); + preamble[preamble.length - 1] = provider.getSaslAuthMethod().getCode(); } return preamble; } @@ -238,4 +251,103 @@ public void setLastTouched(long lastTouched) { /** * Does the clean up work after the connection is removed from the connection pool */ public abstract void cleanupConnection(); + + protected Call connectionRegistryCall; + + private <T extends InputStream & DataInput> void finishCall(ResponseHeader responseHeader, T in, + Call call) throws IOException { + Message value; + if (call.responseDefaultType != null) { + Message.Builder builder = call.responseDefaultType.newBuilderForType(); + if (!builder.mergeDelimitedFrom(in)) { + // The javadoc of mergeDelimitedFrom says returning false means the stream reaches EOF + // before reading any bytes out, so here we need to manually create the EOFException + // and finish the call + call.setException(new EOFException("EOF while reading response with type: " + + call.responseDefaultType.getClass().getName())); + return; + } + value = builder.build(); + } else { + value = null; + } + CellScanner cellBlockScanner; + if (responseHeader.hasCellBlockMeta()) { + int size = responseHeader.getCellBlockMeta().getLength(); + // Maybe we could read directly from the ByteBuf. + // The problem here is that we do not know when to release it. + byte[] cellBlock = new byte[size]; + in.readFully(cellBlock); + cellBlockScanner = cellBlockBuilder.createCellScanner(this.codec, this.compressor, cellBlock); + } else { + cellBlockScanner = null; + } + call.setResponse(value, cellBlockScanner); + } + + <T extends InputStream & DataInput> void readResponse(T in, Map<Integer, Call> id2Call, + Consumer<RemoteException> fatalConnectionErrorConsumer) throws IOException { + int totalSize = in.readInt(); + ResponseHeader responseHeader = ResponseHeader.parseDelimitedFrom(in); + int id = responseHeader.getCallId(); + if (LOG.isTraceEnabled()) { + LOG.trace("got response header " + TextFormat.shortDebugString(responseHeader) + + ", totalSize: " + totalSize + " bytes"); + } + RemoteException remoteExc; + if (responseHeader.hasException()) { + ExceptionResponse exceptionResponse = responseHeader.getException(); + remoteExc = IPCUtil.createRemoteException(exceptionResponse); + if (IPCUtil.isFatalConnectionException(exceptionResponse)) { + // Here we will cleanup all calls so do not need to fall back, just return. + fatalConnectionErrorConsumer.accept(remoteExc); + if (connectionRegistryCall != null) { + connectionRegistryCall.setException(remoteExc); + connectionRegistryCall = null; + } + return; + } + } else { + remoteExc = null; + } + if (id < 0) { + if (connectionRegistryCall != null) { + LOG.debug("process connection registry call"); + finishCall(responseHeader, in, connectionRegistryCall); + connectionRegistryCall = null; + return; + } + } + Call call = id2Call.remove(id); + if (call == null) { + // So we got a response for which we have no corresponding 'call' here on the client-side. + // We probably timed out waiting, cleaned up all references, and now the server decides + // to return a response. There is nothing we can do w/ the response at this stage.
Clean + // out the wire of the response so it's out of the way and we can get other responses on + // this connection. + if (LOG.isDebugEnabled()) { + int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader); + int whatIsLeftToRead = totalSize - readSoFar; + LOG.debug("Unknown callId: " + id + ", skipping over this response of " + whatIsLeftToRead + + " bytes"); + } + return; + } + call.callStats.setResponseSizeBytes(totalSize); + if (remoteExc != null) { + call.setException(remoteExc); + return; + } + try { + finishCall(responseHeader, in, call); + } catch (IOException e) { + // As the call has been removed from id2Call map, if we hit an exception here, the + // exceptionCaught method can not help us finish the call, so here we need to catch the + // exception and finish it + call.setException(e); + // throw the exception out, the upper layer should determine whether this is a critical + // problem + throw e; + } + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/BuiltInSaslAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/BuiltInSaslAuthenticationProvider.java index 712d4035448b..b573a5ee771e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/BuiltInSaslAuthenticationProvider.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/BuiltInSaslAuthenticationProvider.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.security.provider; +import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.yetus.audience.InterfaceAudience; /** @@ -35,4 +36,9 @@ public abstract class BuiltInSaslAuthenticationProvider implements SaslAuthentic public String getTokenKind() { return AUTH_TOKEN_TYPE; } + + protected static SaslAuthMethod createSaslAuthMethod(AuthMethod authMethod) { + return new SaslAuthMethod(authMethod.name(), authMethod.code, authMethod.mechanismName, + authMethod.authenticationMethod); + } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslAuthenticationProvider.java index d71c07d1575a..f22a06474aef 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslAuthenticationProvider.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/DigestSaslAuthenticationProvider.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hbase.security.provider; -import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.yetus.audience.InterfaceAudience; /** @@ -26,8 +26,7 @@ @InterfaceAudience.Private public class DigestSaslAuthenticationProvider extends BuiltInSaslAuthenticationProvider { - public static final SaslAuthMethod SASL_AUTH_METHOD = - new SaslAuthMethod("DIGEST", (byte) 82, "DIGEST-MD5", AuthenticationMethod.TOKEN); + public static final SaslAuthMethod SASL_AUTH_METHOD = createSaslAuthMethod(AuthMethod.DIGEST); @Override public SaslAuthMethod getSaslAuthMethod() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslAuthenticationProvider.java index 7dea40f2657a..df6fce859b7f 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslAuthenticationProvider.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/GssSaslAuthenticationProvider.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hbase.security.provider; -import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.yetus.audience.InterfaceAudience; /** @@ -26,8 +26,7 @@ @InterfaceAudience.Private public class GssSaslAuthenticationProvider extends BuiltInSaslAuthenticationProvider { - public static final SaslAuthMethod SASL_AUTH_METHOD = - new SaslAuthMethod("KERBEROS", (byte) 81, "GSSAPI", AuthenticationMethod.KERBEROS); + public static final SaslAuthMethod SASL_AUTH_METHOD = createSaslAuthMethod(AuthMethod.KERBEROS); @Override public SaslAuthMethod getSaslAuthMethod() { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslAuthenticationProvider.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslAuthenticationProvider.java index 01b1f452685a..9d79b648c6e4 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslAuthenticationProvider.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/security/provider/SimpleSaslAuthenticationProvider.java @@ -17,7 +17,7 @@ */ package org.apache.hadoop.hbase.security.provider; -import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; +import org.apache.hadoop.hbase.security.AuthMethod; import org.apache.yetus.audience.InterfaceAudience; /** @@ -25,8 +25,8 @@ */ @InterfaceAudience.Private public class SimpleSaslAuthenticationProvider extends BuiltInSaslAuthenticationProvider { - public static final SaslAuthMethod SASL_AUTH_METHOD = - new SaslAuthMethod("SIMPLE", (byte) 80, "", AuthenticationMethod.SIMPLE); + + public static final SaslAuthMethod SASL_AUTH_METHOD = createSaslAuthMethod(AuthMethod.SIMPLE); @Override public SaslAuthMethod getSaslAuthMethod() { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java index 3b792a5bd15f..30d69d4b3f9e 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/DoNothingConnectionRegistry.java @@ -21,15 +21,16 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.RegionLocations; import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; /** * Registry that does nothing. Otherwise, default Registry wants zookeeper up and running. 
*/ @InterfaceAudience.Private -class DoNothingConnectionRegistry implements ConnectionRegistry { +public class DoNothingConnectionRegistry implements ConnectionRegistry { - public DoNothingConnectionRegistry(Configuration conf) { + public DoNothingConnectionRegistry(Configuration conf, User user) { } @Override diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java index 4bf425ed562e..f65c7ccb6e75 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminRpcPriority.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; @@ -141,9 +142,9 @@ public Void answer(InvocationOnMock invocation) throws Throwable { } }).when(adminStub).stopServer(any(HBaseRpcController.class), any(StopServerRequest.class), any()); - - conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", null, - UserProvider.instantiate(CONF).getCurrent()) { + User user = UserProvider.instantiate(CONF).getCurrent(); + conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF, user), "test", null, + user) { @Override CompletableFuture getMasterStub() { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionTracing.java index ff4a92ae394d..e56fffbb2642 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncConnectionTracing.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -63,15 +64,15 @@ public class TestAsyncConnectionTracing { @Before public void setUp() throws IOException { - ConnectionRegistry registry = new DoNothingConnectionRegistry(CONF) { + User user = UserProvider.instantiate(CONF).getCurrent(); + ConnectionRegistry registry = new DoNothingConnectionRegistry(CONF, user) { @Override public CompletableFuture getActiveMaster() { return CompletableFuture.completedFuture(masterServer); } }; - conn = new AsyncConnectionImpl(CONF, registry, "test", null, - UserProvider.instantiate(CONF).getCurrent()); + conn = new AsyncConnectionImpl(CONF, registry, "test", null, user); } @After diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java index b306500c8b13..6380f1f6fb0f 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocatorFailFast.java 
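The DoNothingConnectionRegistry change above is the template for the rest of the test updates in this patch: every ConnectionRegistry is now constructed with the calling User, so the RPC stubs the registry creates can carry that user's credentials. A minimal sketch of the new construction pattern, not part of the patch itself; the wrapper class and main() are hypothetical, while getRegistry(conf, user) and getClusterId() are the signatures used elsewhere in this diff:

package org.apache.hadoop.hbase.client;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.UserProvider;

// Hypothetical sketch: fetch the cluster id through a user-aware registry,
// mirroring how the updated tests construct registries in this patch.
public class RegistryBootstrapSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    User user = UserProvider.instantiate(conf).getCurrent();
    // the factory now takes the user; try-with-resources works because
    // ConnectionRegistry is Closeable
    try (ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf, user)) {
      System.out.println("clusterId = " + registry.getClusterId().get());
    }
  }
}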
@@ -24,6 +24,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.RegionLocations; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.FutureUtils; @@ -45,8 +47,8 @@ public class TestAsyncMetaRegionLocatorFailFast { private static final class FaultyConnectionRegistry extends DoNothingConnectionRegistry { - public FaultyConnectionRegistry(Configuration conf) { - super(conf); + public FaultyConnectionRegistry(Configuration conf, User user) { + super(conf, user); } @Override @@ -56,8 +58,9 @@ public CompletableFuture getMetaRegionLocations() { } @BeforeClass - public static void setUp() { - LOCATOR = new AsyncMetaRegionLocator(new FaultyConnectionRegistry(CONF)); + public static void setUp() throws IOException { + LOCATOR = new AsyncMetaRegionLocator( + new FaultyConnectionRegistry(CONF, UserProvider.instantiate(CONF).getCurrent())); } @Test(expected = DoNotRetryIOException.class) diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java index 335894303c08..a7df92999d08 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocatorTracing.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -92,13 +93,14 @@ public void setUp() throws IOException { ServerName.valueOf("127.0.0.2", 12345, EnvironmentEdgeManager.currentTime())), new HRegionLocation(RegionReplicaUtil.getRegionInfoForReplica(metaRegionInfo, 2), ServerName.valueOf("127.0.0.3", 12345, EnvironmentEdgeManager.currentTime()))); - conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF) { + User user = UserProvider.instantiate(CONF).getCurrent(); + conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF, user) { @Override public CompletableFuture getMetaRegionLocations() { return CompletableFuture.completedFuture(locs); } - }, "test", null, UserProvider.instantiate(CONF).getCurrent()); + }, "test", null, user); } @After diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java index e57967ae7211..cb5431c35d3e 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRpcPriority.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.HBaseRpcController; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -160,8 
+161,9 @@ public Void answer(InvocationOnMock invocation) throws Throwable { return null; } }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any()); - conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", null, - UserProvider.instantiate(CONF).getCurrent()) { + User user = UserProvider.instantiate(CONF).getCurrent(); + conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF, user), "test", null, + user) { @Override AsyncRegionLocator getLocator() { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java index f9b86221af1e..2cecc974b6ef 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableTracing.java @@ -209,37 +209,37 @@ public Void answer(InvocationOnMock invocation) throws Throwable { } }).when(stub).get(any(HBaseRpcController.class), any(GetRequest.class), any()); final User user = UserProvider.instantiate(CONF).getCurrent(); - conn = - new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF), "test", null, user) { - - @Override - AsyncRegionLocator getLocator() { - AsyncRegionLocator locator = mock(AsyncRegionLocator.class); - Answer> answer = - new Answer>() { - - @Override - public CompletableFuture answer(InvocationOnMock invocation) - throws Throwable { - TableName tableName = invocation.getArgument(0); - RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); - ServerName serverName = ServerName.valueOf("rs", 16010, 12345); - HRegionLocation loc = new HRegionLocation(info, serverName); - return CompletableFuture.completedFuture(loc); - } - }; - doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), - any(RegionLocateType.class), anyLong()); - doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), - anyInt(), any(RegionLocateType.class), anyLong()); - return locator; - } + conn = new AsyncConnectionImpl(CONF, new DoNothingConnectionRegistry(CONF, user), "test", null, + user) { - @Override - ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { - return stub; - } - }; + @Override + AsyncRegionLocator getLocator() { + AsyncRegionLocator locator = mock(AsyncRegionLocator.class); + Answer> answer = + new Answer>() { + + @Override + public CompletableFuture answer(InvocationOnMock invocation) + throws Throwable { + TableName tableName = invocation.getArgument(0); + RegionInfo info = RegionInfoBuilder.newBuilder(tableName).build(); + ServerName serverName = ServerName.valueOf("rs", 16010, 12345); + HRegionLocation loc = new HRegionLocation(info, serverName); + return CompletableFuture.completedFuture(loc); + } + }; + doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), + any(RegionLocateType.class), anyLong()); + doAnswer(answer).when(locator).getRegionLocation(any(TableName.class), any(byte[].class), + anyInt(), any(RegionLocateType.class), anyLong()); + return locator; + } + + @Override + ClientService.Interface getRegionServerStub(ServerName serverName) throws IOException { + return stub; + } + }; table = conn.getTable(TableName.valueOf("table"), ForkJoinPool.commonPool()); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java 
b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java index a2df7e932395..bd2ca9867f34 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestConnectionRegistryLeak.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.FutureUtils; @@ -49,8 +50,8 @@ public static final class ConnectionRegistryForTest extends DoNothingConnectionR private boolean closed = false; - public ConnectionRegistryForTest(Configuration conf) { - super(conf); + public ConnectionRegistryForTest(Configuration conf, User user) { + super(conf, user); CREATED.add(this); } diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java index 54b351f00a3b..08c56fe95868 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestRpcBasedRegistryHedgedReads.java @@ -58,7 +58,9 @@ import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel; import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService; import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse; @Category({ ClientTests.class, SmallTests.class }) public class TestRpcBasedRegistryHedgedReads { @@ -132,6 +134,12 @@ public static final class RpcChannelImpl implements RpcChannel { @Override public void callMethod(MethodDescriptor method, RpcController controller, Message request, Message responsePrototype, RpcCallback done) { + if (method.getService().equals(ConnectionRegistryService.getDescriptor())) { + // this is for setting up the rpc client + done.run( + GetConnectionRegistryResponse.newBuilder().setClusterId(RESP.getClusterId()).build()); + return; + } if (!method.getName().equals("GetClusterId")) { // On RPC failures, MasterRegistry internally runs getMasters() RPC to keep the master list // fresh. We do not want to intercept those RPCs here and double count. 
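Because the client now issues a GetConnectionRegistry call on the fake ConnectionRegistryService before any real registry RPC goes out, a mocked channel such as the RpcChannelImpl above has to answer that bootstrap call first or client setup never completes. A hedged sketch of the minimal interception; the generated service and response types come from the Registry.proto change in this patch, while the class name and the "test-cluster-id" value are illustrative:

import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;

// Hypothetical sketch: a mock channel that satisfies the bootstrap call
// before handing the real registry RPCs to test-specific logic.
public class MockRegistryChannel implements RpcChannel {

  @Override
  public void callMethod(MethodDescriptor method, RpcController controller, Message request,
    Message responsePrototype, RpcCallback<Message> done) {
    if (method.getService().equals(ConnectionRegistryService.getDescriptor())) {
      // first call on a fresh connection: hand back a cluster id so the rpc
      // client can finish setting itself up
      done.run(
        GetConnectionRegistryResponse.newBuilder().setClusterId("test-cluster-id").build());
      return;
    }
    // answer GetClusterId / GetMasters / GetBootstrapNodes here as the test requires
  }
}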
@@ -155,9 +163,9 @@ public void callMethod(MethodDescriptor method, RpcController controller, Messag private AbstractRpcBasedConnectionRegistry createRegistry(int hedged) throws IOException { Configuration conf = UTIL.getConfiguration(); conf.setInt(HEDGED_REQS_FANOUT_CONFIG_NAME, hedged); - return new AbstractRpcBasedConnectionRegistry(conf, HEDGED_REQS_FANOUT_CONFIG_NAME, - INITIAL_DELAY_SECS_CONFIG_NAME, REFRESH_INTERVAL_SECS_CONFIG_NAME, - MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) { + return new AbstractRpcBasedConnectionRegistry(conf, User.getCurrent(), + HEDGED_REQS_FANOUT_CONFIG_NAME, INITIAL_DELAY_SECS_CONFIG_NAME, + REFRESH_INTERVAL_SECS_CONFIG_NAME, MIN_REFRESH_INTERVAL_SECS_CONFIG_NAME) { @Override protected Set getBootstrapNodes(Configuration conf) throws IOException { diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java index 10948358ff92..8aadce85651d 100644 --- a/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java +++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestTLSHandshadeFailure.java @@ -56,6 +56,8 @@ import org.apache.hbase.thirdparty.com.google.common.io.Closeables; import org.apache.hbase.thirdparty.io.netty.handler.ssl.NotSslRecordException; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; + /** * A simple UT to make sure that we do not leak the SslExceptions to netty's TailContext, where it * will generate a confusing WARN message. @@ -149,11 +151,12 @@ public Void answer(InvocationOnMock invocation) throws Throwable { Address.fromParts("127.0.0.1", server.getLocalPort())); NettyRpcConnection conn = client.createConnection(id); BlockingRpcCallback done = new BlockingRpcCallback<>(); - Call call = - new Call(1, null, null, null, null, 0, 0, Collections.emptyMap(), done, new CallStats()); + Call call = new Call(1, ClientMetaService.getDescriptor().getMethods().get(0), null, null, null, + 0, 0, Collections.emptyMap(), done, new CallStats()); HBaseRpcController hrc = new HBaseRpcControllerImpl(); conn.sendRequest(call, hrc); done.get(); + call.error.printStackTrace(); assertThat(call.error, instanceOf(NotSslRecordException.class)); Waiter.waitFor(conf, 5000, () -> msg.get() != null); verify(mockAppender).append(any()); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentMapUtils.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentMapUtils.java index cf8130e624e2..9c5ebe8519f4 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentMapUtils.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ConcurrentMapUtils.java @@ -38,14 +38,6 @@ public static V computeIfAbsent(ConcurrentMap map, K key, Supplier< }); } - /** - * A supplier that throws IOException when get. - */ - @FunctionalInterface - public interface IOExceptionSupplier { - V get() throws IOException; - } - /** * In HBASE-16648 we found that ConcurrentHashMap.get is much faster than computeIfAbsent if the * value already exists. 
So here we copy the implementation of diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IOExceptionSupplier.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IOExceptionSupplier.java new file mode 100644 index 000000000000..11771a47c083 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IOExceptionSupplier.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.io.IOException; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * A supplier that throws IOException when get. + */ +@InterfaceAudience.Private +@FunctionalInterface +public interface IOExceptionSupplier { + V get() throws IOException; +} diff --git a/hbase-protocol-shaded/src/main/protobuf/server/Registry.proto b/hbase-protocol-shaded/src/main/protobuf/server/Registry.proto index 8dd0d1abdf3d..f55b892413b2 100644 --- a/hbase-protocol-shaded/src/main/protobuf/server/Registry.proto +++ b/hbase-protocol-shaded/src/main/protobuf/server/Registry.proto @@ -105,4 +105,21 @@ service ClientMetaService { * Get nodes which could be used as ClientMetaService */ rpc GetBootstrapNodes(GetBootstrapNodesRequest) returns (GetBootstrapNodesResponse); -} \ No newline at end of file +} + +message GetConnectionRegistryRequest { +} + +/** + * For returning connection registry information to client, like cluster id + */ +message GetConnectionRegistryResponse { + required string cluster_id = 1; +} + +/** + * Just a fake rpc service for getting connection registry information + */ +service ConnectionRegistryService { + rpc GetConnectionRegistry(GetConnectionRegistryRequest) returns(GetConnectionRegistryResponse); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java index 579da46af1c1..7225f92b7ff9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java @@ -64,7 +64,7 @@ private static AsyncClusterConnection createAsyncClusterConnection(Configuration */ public static AsyncClusterConnection createAsyncClusterConnection(Configuration conf, SocketAddress localAddress, User user) throws IOException { - return createAsyncClusterConnection(conf, ConnectionRegistryFactory.getRegistry(conf), + return createAsyncClusterConnection(conf, ConnectionRegistryFactory.getRegistry(conf, user), localAddress, user); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index 629b3468cbe5..1d93fbd0f668 
100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -63,7 +63,6 @@ import org.apache.hbase.thirdparty.io.netty.channel.WriteBufferWaterMark; import org.apache.hbase.thirdparty.io.netty.channel.group.ChannelGroup; import org.apache.hbase.thirdparty.io.netty.channel.group.DefaultChannelGroup; -import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder; import org.apache.hbase.thirdparty.io.netty.handler.ssl.OptionalSslHandler; import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslContext; import org.apache.hbase.thirdparty.io.netty.handler.ssl.SslHandler; @@ -167,13 +166,15 @@ protected void initChannel(Channel ch) throws Exception { ch.config().setWriteBufferWaterMark(writeBufferWaterMark); ch.config().setAllocator(channelAllocator); ChannelPipeline pipeline = ch.pipeline(); - FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6); - preambleDecoder.setSingleDecode(true); + NettyServerRpcConnection conn = createNettyServerRpcConnection(ch); + if (conf.getBoolean(HBASE_SERVER_NETTY_TLS_ENABLED, false)) { initSSL(pipeline, conn, conf.getBoolean(HBASE_SERVER_NETTY_TLS_SUPPORTPLAINTEXT, true)); } - pipeline.addLast(NettyRpcServerPreambleHandler.DECODER_NAME, preambleDecoder) + pipeline + .addLast(NettyRpcServerPreambleHandler.DECODER_NAME, + NettyRpcServerPreambleHandler.createDecoder()) .addLast(new NettyRpcServerPreambleHandler(NettyRpcServer.this, conn)) // We need NettyRpcServerResponseEncoder here because NettyRpcServerPreambleHandler may // send RpcResponse to client. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java index 02e1b5858117..5aa77e0e8ace 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServerPreambleHandler.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.ipc; import java.nio.ByteBuffer; +import org.apache.hadoop.hbase.ipc.ServerRpcConnection.PreambleResponse; import org.apache.hadoop.hbase.util.NettyFutureUtils; import org.apache.yetus.audience.InterfaceAudience; @@ -25,6 +26,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.ChannelHandlerContext; import org.apache.hbase.thirdparty.io.netty.channel.ChannelPipeline; import org.apache.hbase.thirdparty.io.netty.channel.SimpleChannelInboundHandler; +import org.apache.hbase.thirdparty.io.netty.handler.codec.FixedLengthFrameDecoder; import org.apache.hbase.thirdparty.io.netty.handler.codec.LengthFieldBasedFrameDecoder; /** @@ -45,6 +47,12 @@ public NettyRpcServerPreambleHandler(NettyRpcServer rpcServer, NettyServerRpcCon this.conn = conn; } + static FixedLengthFrameDecoder createDecoder() { + FixedLengthFrameDecoder preambleDecoder = new FixedLengthFrameDecoder(6); + preambleDecoder.setSingleDecode(true); + return preambleDecoder; + } + @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { if (processPreambleError) { @@ -57,11 +65,19 @@ protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Excep ByteBuffer buf = ByteBuffer.allocate(msg.readableBytes()); msg.readBytes(buf); buf.flip(); - if (!conn.processPreamble(buf)) { + PreambleResponse resp = conn.processPreamble(buf); + if (resp == PreambleResponse.CLOSE) { 
processPreambleError = true; conn.close(); return; } + if (resp == PreambleResponse.CONTINUE) { + // we use a single decode decoder, so here we need to replace it with a new one so it will + // decode a new preamble header again + ctx.pipeline().replace(DECODER_NAME, DECODER_NAME, createDecoder()); + return; + } + // resp == PreambleResponse.SUCCEED ChannelPipeline p = ctx.pipeline(); if (conn.useSasl) { LengthFieldBasedFrameDecoder decoder = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java index 4c32b2b6a5fa..be97ad582c37 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java @@ -41,6 +41,7 @@ import org.apache.commons.crypto.random.CryptoRandomFactory; import org.apache.hadoop.hbase.CellScanner; import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.client.ConnectionRegistryEndpoint; import org.apache.hadoop.hbase.client.VersionInfoUtil; import org.apache.hadoop.hbase.codec.Codec; import org.apache.hadoop.hbase.io.ByteBufferOutputStream; @@ -57,6 +58,7 @@ import org.apache.hadoop.hbase.security.provider.SaslServerAuthenticationProviders; import org.apache.hadoop.hbase.security.provider.SimpleSaslServerAuthenticationProvider; import org.apache.hadoop.hbase.trace.TraceUtil; +import org.apache.hadoop.hbase.util.ByteBufferUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.IntWritable; @@ -87,6 +89,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ResponseHeader; import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.UserInformation; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse; import org.apache.hadoop.hbase.shaded.protobuf.generated.TracingProtos.RPCTInfo; /** Reads calls from a connection and queues them for handling. 
*/ @@ -688,10 +691,32 @@ private void doBadPreambleHandling(String msg) throws IOException { } private void doBadPreambleHandling(String msg, Exception e) throws IOException { - RpcServer.LOG.warn(msg); + RpcServer.LOG.warn(msg, e); doRespond(getErrorResponse(msg, e)); } + private boolean doConnectionRegistryResponse() throws IOException { + if (!(rpcServer.server instanceof ConnectionRegistryEndpoint)) { + // should only happen in tests, or in other scenarios where we are not expected to get here + return false; + } + // on backup masters this request may block, since we need to fetch the cluster id from the + // filesystem; but as this is just a backup master, it is not a critical problem + String clusterId = ((ConnectionRegistryEndpoint) rpcServer.server).getClusterId(); + RpcServer.LOG.debug("Responding with connection registry, clusterId = '{}'", clusterId); + if (clusterId == null) { + // should only happen in tests, or in other scenarios where we are not expected to get here + return false; + } + GetConnectionRegistryResponse resp = + GetConnectionRegistryResponse.newBuilder().setClusterId(clusterId).build(); + ResponseHeader header = ResponseHeader.newBuilder().setCallId(-1).build(); + ByteBuffer buf = ServerCall.createHeaderAndMessageBytes(resp, header, 0, null); + BufferChain bufChain = new BufferChain(buf); + doRespond(() -> bufChain); + return true; + } + protected final void callCleanupIfNeeded() { if (callCleanup != null) { callCleanup.run(); @@ -699,30 +724,42 @@ protected final void callCleanupIfNeeded() { } } - protected final boolean processPreamble(ByteBuffer preambleBuffer) throws IOException { - assert preambleBuffer.remaining() == 6; - for (int i = 0; i < RPC_HEADER.length; i++) { - if (RPC_HEADER[i] != preambleBuffer.get()) { - doBadPreambleHandling( - "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER=" - + Bytes.toStringBinary(preambleBuffer.array(), 0, RPC_HEADER.length) + " from " - + toString()); - return false; - } - } - int version = preambleBuffer.get() & 0xFF; - byte authbyte = preambleBuffer.get(); + protected enum PreambleResponse { + SUCCEED, // successfully processed the rpc preamble header + CONTINUE, // the preamble header is for another purpose; wait for the actual rpc preamble header + CLOSE // close the rpc connection + } + protected final PreambleResponse processPreamble(ByteBuffer preambleBuffer) throws IOException { + assert preambleBuffer.remaining() == 6; + if ( + ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 6, + RpcClient.REGISTRY_PREAMBLE_HEADER, 0, 6) && doConnectionRegistryResponse() + ) { + return PreambleResponse.CLOSE; + } + if (!ByteBufferUtils.equals(preambleBuffer, preambleBuffer.position(), 4, RPC_HEADER, 0, 4)) { + doBadPreambleHandling( + "Expected HEADER=" + Bytes.toStringBinary(RPC_HEADER) + " but received HEADER=" + + Bytes.toStringBinary( + ByteBufferUtils.toBytes(preambleBuffer, preambleBuffer.position(), RPC_HEADER.length), + 0, RPC_HEADER.length) + + " from " + toString()); + return PreambleResponse.CLOSE; + } + int version = preambleBuffer.get(preambleBuffer.position() + 4) & 0xFF; + byte authByte = preambleBuffer.get(preambleBuffer.position() + 5); if (version != RpcServer.CURRENT_VERSION) { - String msg = getFatalConnectionString(version, authbyte); + String msg = getFatalConnectionString(version, authByte); doBadPreambleHandling(msg, new WrongVersionException(msg)); - return false; + return PreambleResponse.CLOSE; } - this.provider = this.saslProviders.selectProvider(authbyte); + + this.provider = this.saslProviders.selectProvider(authByte);
if (this.provider == null) { - String msg = getFatalConnectionString(version, authbyte); + String msg = getFatalConnectionString(version, authByte); doBadPreambleHandling(msg, new BadAuthException(msg)); - return false; + return PreambleResponse.CLOSE; } // TODO this is a wart while simple auth'n doesn't go through sasl. if (this.rpcServer.isSecurityEnabled && isSimpleAuthentication()) { @@ -732,7 +769,7 @@ protected final boolean processPreamble(ByteBuffer preambleBuffer) throws IOExce } else { AccessDeniedException ae = new AccessDeniedException("Authentication is required"); doRespond(getErrorResponse(ae.getMessage(), ae)); - return false; + return PreambleResponse.CLOSE; } } if (!this.rpcServer.isSecurityEnabled && !isSimpleAuthentication()) { @@ -745,7 +782,7 @@ protected final boolean processPreamble(ByteBuffer preambleBuffer) throws IOExce skipInitialSaslHandshake = true; } useSasl = !(provider instanceof SimpleSaslServerAuthenticationProvider); - return true; + return PreambleResponse.SUCCEED; } boolean isSimpleAuthentication() { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java index ac705d7a26fa..9e90a7a31339 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleServerRpcConnection.java @@ -137,12 +137,21 @@ private int readPreamble() throws IOException { return count; } preambleBuffer.flip(); - if (!processPreamble(preambleBuffer)) { - return -1; + PreambleResponse resp = processPreamble(preambleBuffer); + switch (resp) { + case SUCCEED: + preambleBuffer = null; // do not need it anymore + connectionPreambleRead = true; + return count; + case CONTINUE: + // wait for the next preamble header + preambleBuffer.reset(); + return count; + case CLOSE: + return -1; + default: + throw new IllegalArgumentException("Unknown preamble response: " + resp); } - preambleBuffer = null; // do not need it anymore - connectionPreambleRead = true; - return count; } private int read4Bytes() throws IOException { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java index d001ea755b74..cbfde9c7e172 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java @@ -50,9 +50,9 @@ import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; -import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.IOExceptionSupplier; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; import org.apache.yetus.audience.InterfaceAudience; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java index a94c214a3250..509d74e0335c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/backup/example/TestZooKeeperTableArchiveClient.java @@ -40,7 +40,8 @@ import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.hbase.client.DummyConnectionRegistry; +import org.apache.hadoop.hbase.client.ConnectionRegistry; +import org.apache.hadoop.hbase.client.DoNothingConnectionRegistry; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.master.cleaner.BaseHFileCleanerDelegate; import org.apache.hadoop.hbase.master.cleaner.DirScanPool; @@ -49,6 +50,7 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HStore; import org.apache.hadoop.hbase.regionserver.RegionServerServices; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.util.Bytes; @@ -92,9 +94,10 @@ public class TestZooKeeperTableArchiveClient { private static RegionServerServices rss; private static DirScanPool POOL; - public static final class MockRegistry extends DummyConnectionRegistry { + public static final class MockRegistry extends DoNothingConnectionRegistry { - public MockRegistry(Configuration conf) { + public MockRegistry(Configuration conf, User user) { + super(conf, user); } @Override @@ -110,8 +113,8 @@ public CompletableFuture getClusterId() { public static void setupCluster() throws Exception { setupConf(UTIL.getConfiguration()); UTIL.startMiniZKCluster(); - UTIL.getConfiguration().setClass(MockRegistry.REGISTRY_IMPL_CONF_KEY, MockRegistry.class, - DummyConnectionRegistry.class); + UTIL.getConfiguration().setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY, + MockRegistry.class, ConnectionRegistry.class); CONNECTION = ConnectionFactory.createConnection(UTIL.getConfiguration()); archivingClient = new ZKTableArchiveClient(UTIL.getConfiguration(), CONNECTION); // make hfile archiving node so we can archive files diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java index 8b38db974d7c..0ff105743e0c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestRegionLocator.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.junit.After; @@ -59,7 +60,7 @@ protected static void startClusterAndCreateTable() throws Exception { UTIL.getAdmin().createTable(td, SPLIT_KEYS); UTIL.waitTableAvailable(TABLE_NAME); try (ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration())) { + ConnectionRegistryFactory.getRegistry(UTIL.getConfiguration(), User.getCurrent())) { RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry); } UTIL.getAdmin().balancerSwitch(false, true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java deleted 
file mode 100644 index cc2e9493d039..000000000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyConnectionRegistry.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.client; - -import java.util.concurrent.CompletableFuture; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.RegionLocations; -import org.apache.hadoop.hbase.ServerName; - -/** - * Can be overridden in UT if you only want to implement part of the methods in - * {@link ConnectionRegistry}. - */ -public class DummyConnectionRegistry implements ConnectionRegistry { - - public static final String REGISTRY_IMPL_CONF_KEY = - HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY; - - @Override - public CompletableFuture getMetaRegionLocations() { - return null; - } - - @Override - public CompletableFuture getClusterId() { - return null; - } - - @Override - public CompletableFuture getActiveMaster() { - return null; - } - - @Override - public String getConnectionString() { - return null; - } - - @Override - public void close() { - } -} diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java index 4dd4d4550777..da400f29c0c6 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncAdminWithRegionReplicas.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.Bytes; @@ -55,7 +56,7 @@ public static void setUpBeforeClass() throws Exception { TestAsyncAdminBase.setUpBeforeClass(); HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3); try (ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration())) { + ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent())) { RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java index ad9bf551c033..90d2cb51e8cf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncMetaRegionLocator.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.client.RegionReplicaTestHelper.Locator; import org.apache.hadoop.hbase.client.trace.StringTraceRenderer; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.trace.OpenTelemetryClassRule; @@ -106,7 +107,8 @@ protected void before() throws Throwable { testUtil = miniClusterRule.getTestingUtility(); HBaseTestingUtil.setReplicas(admin, TableName.META_TABLE_NAME, 3); testUtil.waitUntilNoRegionsInTransition(); - registry = ConnectionRegistryFactory.getRegistry(testUtil.getConfiguration()); + registry = + ConnectionRegistryFactory.getRegistry(testUtil.getConfiguration(), User.getCurrent()); RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(testUtil, registry); admin.balancerSwitch(false).get(); locator = new AsyncMetaRegionLocator(registry); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java index f3b231d5bde8..a6d0ab81f912 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocator.java @@ -128,7 +128,7 @@ public void setUpBeforeTest() throws InterruptedException, ExecutionException, I // Enable meta replica LoadBalance mode for this connection. c.set(RegionLocator.LOCATOR_META_REPLICAS_MODE, metaReplicaMode.toString()); ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); conn = new AsyncConnectionImpl(c, registry, registry.getClusterId().get(), null, User.getCurrent()); locator = new AsyncNonMetaRegionLocator(conn); @@ -147,7 +147,7 @@ public void tearDownAfterTest() throws IOException { } @Parameterized.Parameters public static Collection parameters() { return Arrays .asList(new Object[][] { { CatalogReplicaMode.NONE }, { CatalogReplicaMode.LOAD_BALANCE } }); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java index 2dd08b36b30d..50c9ab9f5657 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncNonMetaRegionLocatorConcurrenyLimit.java @@ -125,7 +125,7 @@ public static void setUp() throws Exception { TEST_UTIL.startMiniCluster(3); TEST_UTIL.getAdmin().balancerSwitch(false, true); ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, registry.getClusterId().get(), null, User.getCurrent()); LOCATOR = new AsyncNonMetaRegionLocator(CONN); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java index ee0963e1f8b3..bacd7bb32d70 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegionLocator.java @@ -100,7 +100,7 @@ public static void setUp() throws Exception { TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, registry.getClusterId().get(), null, User.getCurrent()); LOCATOR = CONN.getLocator(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java index ab43ec545d93..3c8327145f32 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java @@ -73,7 +73,7 @@ public static void setUpBeforeClass() throws Exception { TEST_UTIL.createTable(TABLE_NAME, FAMILY); TEST_UTIL.waitTableAvailable(TABLE_NAME); ConnectionRegistry registry = - ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), registry, registry.getClusterId().get(), null, User.getCurrent()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java index 1a7ac8819e49..0de59a4c32bf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableUseMetaReplicas.java @@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -94,7 +95,8 @@ public static void setUp() throws Exception { FailPrimaryMetaScanCp.class.getName()); UTIL.startMiniCluster(3); HBaseTestingUtil.setReplicas(UTIL.getAdmin(), TableName.META_TABLE_NAME, 3); - try (ConnectionRegistry registry = ConnectionRegistryFactory.getRegistry(conf)) { + try (ConnectionRegistry registry = + ConnectionRegistryFactory.getRegistry(conf, User.getCurrent())) { RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(UTIL, registry); } try (Table table = UTIL.createTable(TABLE_NAME, FAMILY)) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBootstrapNodeUpdate.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBootstrapNodeUpdate.java index d5b0ee18e594..549575f4f404 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBootstrapNodeUpdate.java +++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestBootstrapNodeUpdate.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.regionserver.BootstrapNodeManager; +import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.junit.AfterClass; @@ -65,7 +66,7 @@ public static void setUpBeforeClass() throws Exception { conf.setLong(RpcConnectionRegistry.PERIODIC_REFRESH_INTERVAL_SECS, 1); conf.setLong(RpcConnectionRegistry.MIN_SECS_BETWEEN_REFRESHES, 1); UTIL.startMiniCluster(3); - REGISTRY = new RpcConnectionRegistry(conf); + REGISTRY = new RpcConnectionRegistry(conf, UserProvider.instantiate(conf).getCurrent()); } @AfterClass diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java index 0e2feae841cf..5c78e53f7e60 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestCatalogReplicaLoadBalanceSimpleSelector.java @@ -77,7 +77,8 @@ public static void setUp() throws Exception { () -> TEST_UTIL.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size() >= numOfMetaReplica); - registry = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + registry = + ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); CONN = new AsyncConnectionImpl(conf, registry, registry.getClusterId().get(), null, User.getCurrent()); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java index c9238bc99978..d79603cea3cc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMasterRegistry.java @@ -21,7 +21,6 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -38,6 +37,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Waiter; import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.AfterClass; @@ -85,45 +85,39 @@ private static String generateDummyMastersList(int size) { * Makes sure the master registry parses the master end points in the configuration correctly. */ @Test - public void testMasterAddressParsing() throws IOException { + public void testMasterAddressParsing() throws Exception { Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); int numMasters = 10; conf.set(HConstants.MASTER_ADDRS_KEY, generateDummyMastersList(numMasters)); - try (MasterRegistry registry = new MasterRegistry(conf)) { - List parsedMasters = new ArrayList<>(registry.getParsedServers()); - // Half of them would be without a port, duplicates are removed. 
- assertEquals(numMasters / 2 + 1, parsedMasters.size()); - // Sort in the increasing order of port numbers. - Collections.sort(parsedMasters, Comparator.comparingInt(ServerName::getPort)); - for (int i = 0; i < parsedMasters.size(); i++) { - ServerName sn = parsedMasters.get(i); - assertEquals("localhost", sn.getHostname()); - if (i == parsedMasters.size() - 1) { - // Last entry should be the one with default port. - assertEquals(HConstants.DEFAULT_MASTER_PORT, sn.getPort()); - } else { - assertEquals(1000 + (2 * i), sn.getPort()); - } + List parsedMasters = new ArrayList<>(MasterRegistry.parseMasterAddrs(conf)); + // Half of them would be without a port, duplicates are removed. + assertEquals(numMasters / 2 + 1, parsedMasters.size()); + // Sort in the increasing order of port numbers. + Collections.sort(parsedMasters, Comparator.comparingInt(ServerName::getPort)); + for (int i = 0; i < parsedMasters.size(); i++) { + ServerName sn = parsedMasters.get(i); + assertEquals("localhost", sn.getHostname()); + if (i == parsedMasters.size() - 1) { + // Last entry should be the one with default port. + assertEquals(HConstants.DEFAULT_MASTER_PORT, sn.getPort()); + } else { + assertEquals(1000 + (2 * i), sn.getPort()); } } } @Test - public void testMasterPortDefaults() throws IOException { + public void testMasterPortDefaults() throws Exception { Configuration conf = new Configuration(TEST_UTIL.getConfiguration()); conf.set(HConstants.MASTER_ADDRS_KEY, "localhost"); - try (MasterRegistry registry = new MasterRegistry(conf)) { - List parsedMasters = new ArrayList<>(registry.getParsedServers()); - ServerName sn = parsedMasters.get(0); - assertEquals(HConstants.DEFAULT_MASTER_PORT, sn.getPort()); - } + List parsedMasters = new ArrayList<>(MasterRegistry.parseMasterAddrs(conf)); + ServerName sn = parsedMasters.get(0); + assertEquals(HConstants.DEFAULT_MASTER_PORT, sn.getPort()); final int CUSTOM_MASTER_PORT = 9999; conf.setInt(HConstants.MASTER_PORT, CUSTOM_MASTER_PORT); - try (MasterRegistry registry = new MasterRegistry(conf)) { - List parsedMasters = new ArrayList<>(registry.getParsedServers()); - ServerName sn = parsedMasters.get(0); - assertEquals(CUSTOM_MASTER_PORT, sn.getPort()); - } + parsedMasters = new ArrayList<>(MasterRegistry.parseMasterAddrs(conf)); + sn = parsedMasters.get(0); + assertEquals(CUSTOM_MASTER_PORT, sn.getPort()); } @Test @@ -133,7 +127,7 @@ public void testRegistryRPCs() throws Exception { final int size = activeMaster.getMetaLocations().size(); for (int numHedgedReqs = 1; numHedgedReqs <= size; numHedgedReqs++) { conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, numHedgedReqs); - try (MasterRegistry registry = new MasterRegistry(conf)) { + try (MasterRegistry registry = new MasterRegistry(conf, User.getCurrent())) { // Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion // because not all replicas had made it up before test started. RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, registry); @@ -166,7 +160,7 @@ public void testDynamicMasterConfigurationRefresh() throws Exception { conf.setInt(MasterRegistry.MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY, 4); // Do not limit the number of refreshes during the test run. 
conf.setLong(MasterRegistry.MASTER_REGISTRY_MIN_SECS_BETWEEN_REFRESHES, 0); - try (MasterRegistry registry = new MasterRegistry(conf)) { + try (MasterRegistry registry = new MasterRegistry(conf, User.getCurrent())) { final Set masters = registry.getParsedServers(); assertTrue(masters.contains(badServer)); // Make a registry RPC, this should trigger a refresh since one of the hedged RPC fails. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java index d78832d9a8a0..beb054eaf366 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaRegionLocationCache.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.master.RegionState; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.JVMClusterUtil; @@ -63,7 +64,8 @@ public class TestMetaRegionLocationCache { public static void setUp() throws Exception { TEST_UTIL.startMiniCluster(3); HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3); - REGISTRY = ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration()); + REGISTRY = + ConnectionRegistryFactory.getRegistry(TEST_UTIL.getConfiguration(), User.getCurrent()); RegionReplicaTestHelper.waitUntilAllMetaReplicasAreReady(TEST_UTIL, REGISTRY); TEST_UTIL.getAdmin().balancerSwitch(false, true); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java index 9c26bccbbb31..d33cc943355c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRpcConnectionRegistry.java @@ -17,22 +17,29 @@ */ package org.apache.hadoop.hbase.client; -import static org.hamcrest.CoreMatchers.hasItems; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.hasItems; +import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertThrows; import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtil; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.master.HMaster; import org.apache.hadoop.hbase.regionserver.BootstrapNodeManager; import org.apache.hadoop.hbase.regionserver.RSRpcServices; +import org.apache.hadoop.hbase.security.User; import org.apache.hadoop.hbase.testclassification.ClientTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.junit.After; @@ -43,6 +50,7 @@ import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; 
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 
 @Category({ MediumTests.class, ClientTests.class })
@@ -74,7 +82,7 @@ public static void tearDownAfterClass() throws Exception {
 
   @Before
   public void setUp() throws IOException {
-    registry = new RpcConnectionRegistry(UTIL.getConfiguration());
+    registry = new RpcConnectionRegistry(UTIL.getConfiguration(), User.getCurrent());
   }
 
   @After
@@ -94,9 +102,20 @@ private void setMaxNodeCount(int count) {
   @Test
   public void testRegistryRPCs() throws Exception {
     HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
-    // sleep 3 seconds, since our initial delay is 1 second, we should have refreshed the endpoints
-    Thread.sleep(3000);
-    assertThat(registry.getParsedServers(),
+    // should only contain the active master
+    Set<ServerName> initialParsedServers = registry.getParsedServers();
+    assertThat(initialParsedServers, hasSize(1));
+    // no start code in the configuration
+    assertThat(initialParsedServers,
+      hasItem(ServerName.valueOf(activeMaster.getServerName().getHostname(),
+        activeMaster.getServerName().getPort(), -1)));
+    // Since our initial delay is 1 second, the endpoints should eventually have been refreshed
+    UTIL.waitFor(5000, () -> registry.getParsedServers()
+      .contains(activeMaster.getServerManager().getOnlineServersList().get(0)));
+    Set<ServerName> parsedServers = registry.getParsedServers();
+    assertThat(parsedServers,
+      hasSize(activeMaster.getServerManager().getOnlineServersList().size()));
+    assertThat(parsedServers,
       hasItems(activeMaster.getServerManager().getOnlineServersList().toArray(new ServerName[0])));
 
     // Add wait on all replicas being assigned before proceeding w/ test. Failed on occasion
@@ -116,4 +135,32 @@ public void testRegistryRPCs() throws Exception {
     setMaxNodeCount(1);
     UTIL.waitFor(10000, () -> registry.getParsedServers().size() == 1);
   }
+
+  /**
+   * Make sure that we can create the RpcClient when there are broken servers in the bootstrap nodes.
+   */
+  @Test
+  public void testBrokenBootstrapNodes() throws Exception {
+    Configuration conf = new Configuration(UTIL.getConfiguration());
+    String currentMasterAddrs = Preconditions.checkNotNull(conf.get(HConstants.MASTER_ADDRS_KEY));
+    HMaster activeMaster = UTIL.getHBaseCluster().getMaster();
+    String clusterId = activeMaster.getClusterId();
+    // Add a non-working master
+    ServerName badServer = ServerName.valueOf("localhost", 1234, -1);
+    conf.set(RpcConnectionRegistry.BOOTSTRAP_NODES, badServer.toShortString());
+    // with only a bad server, the request should fail
+    try (RpcConnectionRegistry reg = new RpcConnectionRegistry(conf, User.getCurrent())) {
+      assertThrows(IOException.class, () -> reg.getParsedServers());
+    }
+
+    conf.set(RpcConnectionRegistry.BOOTSTRAP_NODES,
+      badServer.toShortString() + ", " + currentMasterAddrs);
+    // we choose a bootstrap node randomly, so here we need to test multiple times to make sure
+    // that we can skip the broken node
+    for (int i = 0; i < 10; i++) {
+      try (RpcConnectionRegistry reg = new RpcConnectionRegistry(conf, User.getCurrent())) {
+        assertEquals(clusterId, reg.getClusterId().get());
+      }
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
index 1cbb36196684..6d585245e959 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestZKConnectionRegistry.java
@@ -65,7 +65,7 @@ public class TestZKConnectionRegistry {
   public static void setUp() throws Exception {
     TEST_UTIL.startMiniCluster(3);
     HBaseTestingUtil.setReplicas(TEST_UTIL.getAdmin(), TableName.META_TABLE_NAME, 3);
-    REGISTRY = new ZKConnectionRegistry(TEST_UTIL.getConfiguration());
+    REGISTRY = new ZKConnectionRegistry(TEST_UTIL.getConfiguration(), null);
   }
 
   @AfterClass
@@ -99,7 +99,7 @@ public void testIndependentZKConnections() throws IOException {
     try (ReadOnlyZKClient zk1 = REGISTRY.getZKClient()) {
       Configuration otherConf = new Configuration(TEST_UTIL.getConfiguration());
       otherConf.set(HConstants.ZOOKEEPER_QUORUM, MiniZooKeeperCluster.HOST);
-      try (ZKConnectionRegistry otherRegistry = new ZKConnectionRegistry(otherConf)) {
+      try (ZKConnectionRegistry otherRegistry = new ZKConnectionRegistry(otherConf, null)) {
         ReadOnlyZKClient zk2 = otherRegistry.getZKClient();
         assertNotSame("Using a different configuration / quorum should result in different "
           + "backing zk connection.", zk1, zk2);
@@ -116,7 +116,7 @@ public void testIndependentZKConnections() throws IOException {
   public void testNoMetaAvailable() throws InterruptedException {
     Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
     conf.set("zookeeper.znode.metaserver", "whatever");
-    try (ZKConnectionRegistry registry = new ZKConnectionRegistry(conf)) {
+    try (ZKConnectionRegistry registry = new ZKConnectionRegistry(conf, null)) {
       try {
         registry.getMetaRegionLocations().get();
         fail("Should have failed since we set an incorrect meta znode prefix");
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index a93f54d4d9d1..e4427c1690c3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -34,6 +34,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.hasItem;
 import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.startsWith;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -42,8 +43,10 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.mockito.internal.verification.VerificationModeFactory.times;
 
 import io.opentelemetry.api.common.AttributeKey;
@@ -63,12 +66,18 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseServerBase;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MatcherPredicate;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.io.compress.GzipCodec;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
 import org.hamcrest.Matcher;
 import org.junit.Rule;
@@ -78,6 +87,8 @@ import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.protobuf.BlockingRpcChannel;
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcChannel;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestProtos.EchoRequestProto;
@@ -88,6 +99,9 @@ import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.BlockingInterface;
 import org.apache.hadoop.hbase.shaded.ipc.protobuf.generated.TestRpcServiceProtos.TestProtobufRpcProto.Interface;
 import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ConnectionRegistryService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetConnectionRegistryResponse;
 
 /**
  * Some basic ipc tests.
@@ -105,9 +119,14 @@ public abstract class AbstractTestIPC {
     CONF.set(RpcServerFactory.CUSTOM_RPC_SERVER_IMPL_CONF_KEY, SimpleRpcServer.class.getName());
   }
 
-  protected abstract RpcServer createRpcServer(final String name,
-    final List<BlockingServiceAndInterface> services, final InetSocketAddress bindAddress,
-    Configuration conf, RpcScheduler scheduler) throws IOException;
+  protected abstract RpcServer createRpcServer(Server server, String name,
+    List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
+    RpcScheduler scheduler) throws IOException;
+
+  private RpcServer createRpcServer(String name, List<BlockingServiceAndInterface> services,
+    InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException {
+    return createRpcServer(null, name, services, bindAddress, conf, scheduler);
+  }
 
   protected abstract AbstractRpcClient<?> createRpcClientNoCodec(Configuration conf);
@@ -568,4 +587,62 @@ public void testBadPreambleHeader() throws IOException, ServiceException {
       rpcServer.stop();
     }
   }
+
+  /**
+   * Testcase for getting connection registry information through the connection preamble header;
+   * see HBASE-25051 for more details.
+   */
+  @Test
+  public void testGetConnectionRegistry() throws IOException, ServiceException {
+    Configuration clientConf = new Configuration(CONF);
+    String clusterId = "test_cluster_id";
+    HBaseServerBase<?> server = mock(HBaseServerBase.class);
+    when(server.getClusterId()).thenReturn(clusterId);
+    // do not need any services
+    RpcServer rpcServer = createRpcServer(server, "testRpcServer", Collections.emptyList(),
+      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
+    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
+      rpcServer.start();
+      InetSocketAddress addr = rpcServer.getListenerAddress();
+      BlockingRpcChannel channel =
+        client.createBlockingRpcChannel(ServerName.valueOf(addr.getHostName(), addr.getPort(),
+          EnvironmentEdgeManager.currentTime()), User.getCurrent(), 0);
+      ConnectionRegistryService.BlockingInterface stub =
+        ConnectionRegistryService.newBlockingStub(channel);
+      GetConnectionRegistryResponse resp =
+        stub.getConnectionRegistry(null, GetConnectionRegistryRequest.getDefaultInstance());
+      assertEquals(clusterId, resp.getClusterId());
+    }
+  }
+
+  /**
+   * Test a server that does not support getting connection registry information through the
+   * connection preamble header, i.e., a new client connecting to an old server. We simulate this
+   * by using a Server that does not implement the ConnectionRegistryEndpoint interface.
+   */
+  @Test
+  public void testGetConnectionRegistryError() throws IOException, ServiceException {
+    Configuration clientConf = new Configuration(CONF);
+    // do not need any services
+    RpcServer rpcServer = createRpcServer("testRpcServer", Collections.emptyList(),
+      new InetSocketAddress("localhost", 0), CONF, new FifoRpcScheduler(CONF, 1));
+    try (AbstractRpcClient<?> client = createRpcClient(clientConf)) {
+      rpcServer.start();
+      InetSocketAddress addr = rpcServer.getListenerAddress();
+      RpcChannel channel = client.createRpcChannel(ServerName.valueOf(addr.getHostName(),
+        addr.getPort(), EnvironmentEdgeManager.currentTime()), User.getCurrent(), 0);
+      ConnectionRegistryService.Interface stub = ConnectionRegistryService.newStub(channel);
+      HBaseRpcController pcrc = new HBaseRpcControllerImpl();
+      BlockingRpcCallback<GetConnectionRegistryResponse> done = new BlockingRpcCallback<>();
+      stub.getConnectionRegistry(pcrc, GetConnectionRegistryRequest.getDefaultInstance(), done);
+      // should have failed, so no response
+      assertNull(done.get());
+      assertTrue(pcrc.failed());
+      // should be a FatalConnectionException
+      assertThat(pcrc.getFailed(), instanceOf(RemoteException.class));
+      assertEquals(FatalConnectionException.class.getName(),
+        ((RemoteException) pcrc.getFailed()).getClassName());
+      assertThat(pcrc.getFailed().getMessage(), startsWith("Expected HEADER="));
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
index 9544e8c35458..e60cc879fd4f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestBlockingIPC.java
@@ -40,10 +40,10 @@ public class TestBlockingIPC extends AbstractTestIPC {
     HBaseClassTestRule.forClass(TestBlockingIPC.class);
 
   @Override
-  protected RpcServer createRpcServer(String name,
+  protected RpcServer createRpcServer(Server server, String name,
     List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
     RpcScheduler scheduler) throws IOException {
-    return RpcServerFactory.createRpcServer(null, name, services, bindAddress, conf, scheduler);
+    return RpcServerFactory.createRpcServer(server, name, services, bindAddress, conf, scheduler);
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
index 6feab5f2cac8..a1b60e2cfa45 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyIPC.java
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RPCTests;
@@ -103,10 +104,10 @@ private void setConf(Configuration conf) {
   }
 
   @Override
-  protected RpcServer createRpcServer(String name,
+  protected RpcServer createRpcServer(Server server, String name,
     List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
     RpcScheduler scheduler) throws IOException {
-    return new NettyRpcServer(null, name, services, bindAddress, conf, scheduler, true);
+    return new NettyRpcServer(server, name, services, bindAddress, conf, scheduler, true);
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java
index 4c654123e130..1cbf6be26c65 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestNettyTlsIPC.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseCommonTestingUtil;
 import org.apache.hadoop.hbase.HBaseServerBase;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.client.ConnectionRegistryEndpoint;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.io.crypto.tls.KeyStoreFileType;
 import org.apache.hadoop.hbase.io.crypto.tls.X509KeyType;
@@ -65,8 +67,6 @@ public class TestNettyTlsIPC extends AbstractTestIPC {
 
   private static NettyEventLoopGroupConfig EVENT_LOOP_GROUP_CONFIG;
 
-  private static HBaseServerBase<?> SERVER;
-
   @Parameterized.Parameter(0)
   public X509KeyType caKeyType;
 
@@ -115,8 +115,6 @@ public static void setUpBeforeClass() throws IOException {
     PROVIDER = new X509TestContextProvider(CONF, dir);
     EVENT_LOOP_GROUP_CONFIG =
       NettyEventLoopGroupConfig.setup(CONF, TestNettyTlsIPC.class.getSimpleName());
-    SERVER = mock(HBaseServerBase.class);
-    when(SERVER.getEventLoopGroupConfig()).thenReturn(EVENT_LOOP_GROUP_CONFIG);
   }
 
   @AfterClass
@@ -147,9 +145,16 @@ public void tearDown() {
   }
 
   @Override
-  protected RpcServer createRpcServer(String name, List<BlockingServiceAndInterface> services,
-    InetSocketAddress bindAddress, Configuration conf, RpcScheduler scheduler) throws IOException {
-    return new NettyRpcServer(SERVER, name, services, bindAddress, conf, scheduler, true);
+  protected RpcServer createRpcServer(Server server, String name,
+    List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
+    RpcScheduler scheduler) throws IOException {
+    HBaseServerBase<?> mockServer = mock(HBaseServerBase.class);
+    when(mockServer.getEventLoopGroupConfig()).thenReturn(EVENT_LOOP_GROUP_CONFIG);
+    if (server instanceof ConnectionRegistryEndpoint) {
+      String clusterId = ((ConnectionRegistryEndpoint) server).getClusterId();
+      when(mockServer.getClusterId()).thenReturn(clusterId);
+    }
+    return new NettyRpcServer(mockServer, name, services, bindAddress, conf, scheduler, true);
   }
 
   @Override
@@ -184,7 +189,9 @@ protected boolean isTcpNoDelay() {
   protected RpcServer createTestFailingRpcServer(String name,
     List<BlockingServiceAndInterface> services, InetSocketAddress bindAddress, Configuration conf,
     RpcScheduler scheduler) throws IOException {
-    return new FailingNettyRpcServer(SERVER, name, services, bindAddress, conf, scheduler);
+    HBaseServerBase<?> mockServer = mock(HBaseServerBase.class);
+    when(mockServer.getEventLoopGroupConfig()).thenReturn(EVENT_LOOP_GROUP_CONFIG);
+    return new FailingNettyRpcServer(mockServer, name, services, bindAddress, conf, scheduler);
   }
 
   @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
index 93fa22c00fd3..d6c7a0250015 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestWALEntrySinkFilter.java
@@ -34,15 +34,17 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
 import org.apache.hadoop.hbase.client.AsyncConnection;
 import org.apache.hadoop.hbase.client.AsyncTable;
 import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
+import org.apache.hadoop.hbase.client.ConnectionRegistry;
+import org.apache.hadoop.hbase.client.DoNothingConnectionRegistry;
 import org.apache.hadoop.hbase.client.DummyAsyncClusterConnection;
 import org.apache.hadoop.hbase.client.DummyAsyncTable;
-import org.apache.hadoop.hbase.client.DummyConnectionRegistry;
 import org.apache.hadoop.hbase.client.Row;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -84,8 +86,8 @@ public class TestWALEntrySinkFilter {
   public void testWALEntryFilter() throws IOException {
     Configuration conf = HBaseConfiguration.create();
     // Make it so our filter is instantiated on construction of ReplicationSink.
-    conf.setClass(DummyConnectionRegistry.REGISTRY_IMPL_CONF_KEY, DevNullConnectionRegistry.class,
-      DummyConnectionRegistry.class);
+    conf.setClass(HConstants.CLIENT_CONNECTION_REGISTRY_IMPL_CONF_KEY,
+      DevNullConnectionRegistry.class, ConnectionRegistry.class);
     conf.setClass(WALEntrySinkFilter.WAL_ENTRY_FILTER_KEY,
       IfTimeIsGreaterThanBOUNDARYWALEntrySinkFilterImpl.class, WALEntrySinkFilter.class);
     conf.setClass(ClusterConnectionFactory.HBASE_SERVER_CLUSTER_CONNECTION_IMPL,
@@ -166,9 +168,10 @@ public boolean filter(TableName table, long writeTime) {
     }
   }
 
-  public static class DevNullConnectionRegistry extends DummyConnectionRegistry {
+  public static class DevNullConnectionRegistry extends DoNothingConnectionRegistry {
 
-    public DevNullConnectionRegistry(Configuration conf) {
+    public DevNullConnectionRegistry(Configuration conf, User user) {
+      super(conf, user);
     }
 
     @Override
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestGenerateDelegationToken.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestGenerateDelegationToken.java
index 16ac215acaff..f132eb6964b1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestGenerateDelegationToken.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/security/token/TestGenerateDelegationToken.java
@@ -21,22 +21,26 @@ import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
 
 import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
 import java.util.Arrays;
 import java.util.Collection;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.ipc.NettyRpcClient;
 import org.apache.hadoop.hbase.ipc.RpcClientFactory;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.SecurityTests;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.token.Token;
@@ -51,11 +55,9 @@ import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
 
-import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
-
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.AuthenticationService;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.GetAuthenticationTokenResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.WhoAmIRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.WhoAmIResponse;
@@ -92,24 +94,58 @@ public void setUpBeforeMethod() {
       rpcClientImpl);
   }
 
-  @Test
-  public void test() throws Exception {
-    try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
-      Table table = conn.getTable(TableName.META_TABLE_NAME)) {
-      CoprocessorRpcChannel rpcChannel = table.coprocessorService(HConstants.EMPTY_START_ROW);
-      AuthenticationProtos.AuthenticationService.BlockingInterface service =
-        AuthenticationProtos.AuthenticationService.newBlockingStub(rpcChannel);
-      WhoAmIResponse response = service.whoAmI(null, WhoAmIRequest.getDefaultInstance());
+  private void testToken() throws Exception {
+    try (AsyncConnection conn =
+      ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get()) {
+      AsyncTable<?> table = conn.getTable(TableName.META_TABLE_NAME);
+      WhoAmIResponse response =
+        table.<AuthenticationService.Stub, WhoAmIResponse> coprocessorService(
+          AuthenticationService::newStub,
+          (s, c, r) -> s.whoAmI(c, WhoAmIRequest.getDefaultInstance(), r),
+          HConstants.EMPTY_START_ROW).get();
       assertEquals(USERNAME, response.getUsername());
       assertEquals(AuthenticationMethod.TOKEN.name(), response.getAuthMethod());
-      try {
-        service.getAuthenticationToken(null, GetAuthenticationTokenRequest.getDefaultInstance());
-      } catch (ServiceException e) {
-        IOException ioe = ProtobufUtil.getRemoteException(e);
-        assertThat(ioe, instanceOf(AccessDeniedException.class));
-        assertThat(ioe.getMessage(),
-          containsString("Token generation only allowed for Kerberos authenticated clients"));
-      }
+      IOException ioe = assertThrows(IOException.class,
+        () -> FutureUtils.get(
+          table.<AuthenticationService.Stub, GetAuthenticationTokenResponse> coprocessorService(
+            AuthenticationService::newStub,
+            (s, c, r) -> s.getAuthenticationToken(c,
+              GetAuthenticationTokenRequest.getDefaultInstance(), r),
+            HConstants.EMPTY_START_ROW)));
+      assertThat(ioe, instanceOf(AccessDeniedException.class));
+      assertThat(ioe.getMessage(),
+        containsString("Token generation only allowed for Kerberos authenticated clients"));
     }
+
+  }
+
+  /**
+   * Confirm that we use the delegation token first if both a token and a kerberos ticket are
+   * present.
+   */
+  @Test
+  public void testTokenFirst() throws Exception {
+    testToken();
+  }
+
+  /**
+   * Confirm that we can connect to the cluster successfully when only a token is present, i.e.,
+   * no kerberos ticket.
+   */
+  @Test
+  public void testOnlyToken() throws Exception {
+    User user =
+      User.createUserForTesting(TEST_UTIL.getConfiguration(), "no_krb_user", new String[0]);
+    for (Token<?> token : User.getCurrent().getUGI().getCredentials()
+      .getAllTokens()) {
+      user.getUGI().addToken(token);
+    }
+    user.getUGI().doAs(new PrivilegedExceptionAction<Void>() {
+
+      @Override
+      public Void run() throws Exception {
+        testToken();
+        return null;
+      }
+    });
   }
 }

From e3355a4cf57ad59cfa59167508246e9fb97d5e33 Mon Sep 17 00:00:00 2001
From: Duo Zhang
Date: Sat, 3 Feb 2024 22:03:53 +0800
Subject: [PATCH 2/2] fix NPE

---
 .../client/ConnectionRegistryRpcStubHolder.java | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryRpcStubHolder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryRpcStubHolder.java
index 11a37b4afac3..3dbcfbe8e6bf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryRpcStubHolder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionRegistryRpcStubHolder.java
@@ -101,8 +101,11 @@ private ImmutableMap<ServerName, ClientMetaService.Interface> createStubs(RpcCli
     return builder.build();
   }
 
-  private void fetchClusterIdAndCreateStubs() {
-    addr2StubFuture = new CompletableFuture<>();
+  private CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>>
+    fetchClusterIdAndCreateStubs() {
+    CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> future =
+      new CompletableFuture<>();
+    addr2StubFuture = future;
     FutureUtils.addListener(
       new ClusterIdFetcher(noAuthConf, user, rpcControllerFactory, bootstrapNodes).fetchClusterId(),
       (clusterId, error) -> {
@@ -120,6 +123,11 @@ private void fetchClusterIdAndCreateStubs() {
         addr2StubFuture = null;
       }
     });
+    // here we must return the local variable future instead of addr2StubFuture, as the above
+    // listener may be executed directly in the same thread (if the future completes quickly
+    // enough), and since the synchronized lock is reentrant, it can set addr2StubFuture back to
+    // null, so by the time we arrive here addr2StubFuture may already be null.
+    return future;
   }
 
   CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> getStubs() {
@@ -135,8 +143,7 @@ CompletableFuture<ImmutableMap<ServerName, ClientMetaService.Interface>> getStub
       if (addr2StubFuture != null) {
         return addr2StubFuture;
       }
-      fetchClusterIdAndCreateStubs();
-      return addr2StubFuture;
+      return fetchClusterIdAndCreateStubs();
     }
   }
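
The comment in the last hunk describes a classic CompletableFuture pitfall: a listener registered on an already-completed future runs synchronously on the registering thread, and because the synchronized lock is reentrant, the listener can clear the shared field before the registering method has returned. The following is a minimal, self-contained sketch of that pitfall in isolation; it is illustrative only, and the class and field names are made up rather than taken from the patch:

import java.util.concurrent.CompletableFuture;

public class ReentrantCompletionSketch {

  // Plays the role of addr2StubFuture in ConnectionRegistryRpcStubHolder.
  private static CompletableFuture<String> sharedFuture;

  private static synchronized CompletableFuture<String> fetch() {
    CompletableFuture<String> future = new CompletableFuture<>();
    sharedFuture = future;
    // An already-completed source future stands in for a cluster id fetch that
    // finishes "quickly enough": whenComplete then runs on this very thread.
    CompletableFuture.completedFuture("test_cluster_id").whenComplete((id, err) -> {
      synchronized (ReentrantCompletionSketch.class) { // reentrant, does not block
        future.complete(id);
        sharedFuture = null; // mirrors the listener clearing addr2StubFuture
      }
    });
    // sharedFuture is already null at this point, so "return sharedFuture" would
    // hand the caller a null; returning the local variable is always safe.
    return future;
  }

  public static void main(String[] args) throws Exception {
    System.out.println(fetch().get()); // prints test_cluster_id, no NPE
  }
}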
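Stepping back, the MasterRegistry and RpcConnectionRegistry tests earlier in this series exercise the hedged read fan-out of the rpc-based registries: a request is sent to up to MASTER_REGISTRY_HEDGED_REQS_FANOUT_KEY endpoints at a time and the first successful response wins. The sketch below shows that pattern in simplified form under stated assumptions (a non-empty endpoint list, no batching or retries); it is not the actual AbstractRpcBasedConnectionRegistry implementation, which hedges in batches and adds HBase-specific error handling:

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public final class HedgedCallSketch {

  // Issue the call against up to fanOut endpoints at once; the first success
  // completes the result, and we fail only once every attempted call has failed.
  public static <E, R> CompletableFuture<R> hedge(List<E> endpoints,
    Function<E, CompletableFuture<R>> call, int fanOut) {
    CompletableFuture<R> result = new CompletableFuture<>();
    int attempts = Math.min(Math.max(1, fanOut), endpoints.size());
    AtomicInteger failures = new AtomicInteger();
    for (E endpoint : endpoints.subList(0, attempts)) {
      call.apply(endpoint).whenComplete((r, err) -> {
        if (err == null) {
          result.complete(r); // no-op if a hedged sibling already won
        } else if (failures.incrementAndGet() == attempts) {
          result.completeExceptionally(err); // every hedged call failed
        }
      });
    }
    return result;
  }
}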