diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseRpcServicesBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseRpcServicesBase.java new file mode 100644 index 000000000000..4bd0b3304d40 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseRpcServicesBase.java @@ -0,0 +1,386 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase; + +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.net.BindException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.ThreadLocalRandom; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.ConnectionUtils; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; +import org.apache.hadoop.hbase.io.ByteBuffAllocator; +import org.apache.hadoop.hbase.ipc.HBaseRPCErrorHandler; +import org.apache.hadoop.hbase.ipc.PriorityFunction; +import org.apache.hadoop.hbase.ipc.QosPriority; +import org.apache.hadoop.hbase.ipc.RpcScheduler; +import org.apache.hadoop.hbase.ipc.RpcServer; +import org.apache.hadoop.hbase.ipc.RpcServer.BlockingServiceAndInterface; +import org.apache.hadoop.hbase.ipc.RpcServerFactory; +import org.apache.hadoop.hbase.ipc.RpcServerInterface; +import org.apache.hadoop.hbase.namequeues.NamedQueuePayload; +import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; +import org.apache.hadoop.hbase.namequeues.RpcLogDetails; +import org.apache.hadoop.hbase.namequeues.request.NamedQueueGetRequest; +import org.apache.hadoop.hbase.namequeues.response.NamedQueueGetResponse; +import org.apache.hadoop.hbase.net.Address; +import org.apache.hadoop.hbase.regionserver.RpcSchedulerFactory; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.access.AccessChecker; +import org.apache.hadoop.hbase.security.access.NoopAccessChecker; +import org.apache.hadoop.hbase.security.access.Permission; +import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; +import org.apache.hadoop.hbase.util.DNS; +import org.apache.hadoop.hbase.util.OOMEChecker; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; +import 
org.apache.zookeeper.KeeperException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.protobuf.ByteString; +import org.apache.hbase.thirdparty.com.google.protobuf.Message; +import org.apache.hbase.thirdparty.com.google.protobuf.RpcController; +import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; + +import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponseRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearSlowLogResponses; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponseRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.SlowLogResponses; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.ClientMetaService; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetActiveMasterResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetBootstrapNodesResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetClusterIdResponse; +import 
org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMastersResponseEntry; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.RegistryProtos.GetMetaRegionLocationsResponse; +import org.apache.hadoop.hbase.shaded.protobuf.generated.TooSlowLog.SlowLogPayload; + +/** + * Base class for Master and RegionServer RpcServices. + */ +@InterfaceAudience.Private +public abstract class HBaseRpcServicesBase> + implements ClientMetaService.BlockingInterface, AdminService.BlockingInterface, + HBaseRPCErrorHandler, PriorityFunction, ConfigurationObserver { + + private static final Logger LOG = LoggerFactory.getLogger(HBaseRpcServicesBase.class); + + public static final String CLIENT_BOOTSTRAP_NODE_LIMIT = "hbase.client.bootstrap.node.limit"; + + public static final int DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT = 10; + + protected final S server; + + // Server to handle client requests. 
+ protected final RpcServer rpcServer; + + private final InetSocketAddress isa; + + protected final PriorityFunction priority; + + private AccessChecker accessChecker; + + private ZKPermissionWatcher zkPermissionWatcher; + + protected HBaseRpcServicesBase(S server, String processName) throws IOException { + this.server = server; + Configuration conf = server.getConfiguration(); + final RpcSchedulerFactory rpcSchedulerFactory; + try { + rpcSchedulerFactory = getRpcSchedulerFactoryClass(conf).asSubclass(RpcSchedulerFactory.class) + .getDeclaredConstructor().newInstance(); + } catch (NoSuchMethodException | InvocationTargetException | InstantiationException + | IllegalAccessException e) { + throw new IllegalArgumentException(e); + } + String hostname = DNS.getHostname(conf, getDNSServerType()); + int port = conf.getInt(getPortConfigName(), getDefaultPort()); + // Creation of a HSA will force a resolve. + final InetSocketAddress initialIsa = new InetSocketAddress(hostname, port); + final InetSocketAddress bindAddress = new InetSocketAddress(getHostname(conf, hostname), port); + if (initialIsa.getAddress() == null) { + throw new IllegalArgumentException("Failed resolve of " + initialIsa); + } + priority = createPriority(); + // Using Address means we don't get the IP too. Shorten it more even to just the host name + // w/o the domain. + final String name = processName + "/" + + Address.fromParts(initialIsa.getHostName(), initialIsa.getPort()).toStringWithoutDomain(); + server.setName(name); + // Set how many times to retry talking to another server over Connection. + ConnectionUtils.setServerSideHConnectionRetriesConfig(conf, name, LOG); + boolean reservoirEnabled = + conf.getBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, defaultReservoirEnabled()); + try { + // use final bindAddress for this server. 
+ rpcServer = RpcServerFactory.createRpcServer(server, name, getServices(), bindAddress, conf, + rpcSchedulerFactory.create(conf, this, server), reservoirEnabled); + } catch (BindException be) { + throw new IOException(be.getMessage() + ". To switch ports use the '" + getPortConfigName() + + "' configuration property.", be.getCause() != null ? be.getCause() : be); + } + final InetSocketAddress address = rpcServer.getListenerAddress(); + if (address == null) { + throw new IOException("Listener channel is closed"); + } + // Set our address, however we need the final port that was given to rpcServer + isa = new InetSocketAddress(initialIsa.getHostName(), address.getPort()); + rpcServer.setErrorHandler(this); + } + + protected abstract boolean defaultReservoirEnabled(); + + protected abstract DNS.ServerType getDNSServerType(); + + protected abstract String getHostname(Configuration conf, String defaultHostname); + + protected abstract String getPortConfigName(); + + protected abstract int getDefaultPort(); + + protected abstract PriorityFunction createPriority(); + + protected abstract Class getRpcSchedulerFactoryClass(Configuration conf); + + protected abstract List getServices(); + + protected final void internalStart(ZKWatcher zkWatcher) { + if (AccessChecker.isAuthorizationSupported(getConfiguration())) { + accessChecker = new AccessChecker(getConfiguration()); + } else { + accessChecker = new NoopAccessChecker(getConfiguration()); + } + zkPermissionWatcher = + new ZKPermissionWatcher(zkWatcher, accessChecker.getAuthManager(), getConfiguration()); + try { + zkPermissionWatcher.start(); + } catch (KeeperException e) { + LOG.error("ZooKeeper permission watcher initialization failed", e); + } + rpcServer.start(); + } + + protected final void requirePermission(String request, Permission.Action perm) + throws IOException { + if (accessChecker != null) { + accessChecker.requirePermission(RpcServer.getRequestUser().orElse(null), request, null, perm); + } + } + + public 
AccessChecker getAccessChecker() { + return accessChecker; + } + + public ZKPermissionWatcher getZkPermissionWatcher() { + return zkPermissionWatcher; + } + + protected final void internalStop() { + if (zkPermissionWatcher != null) { + zkPermissionWatcher.close(); + } + rpcServer.stop(); + } + + public Configuration getConfiguration() { + return server.getConfiguration(); + } + + public S getServer() { + return server; + } + + public InetSocketAddress getSocketAddress() { + return isa; + } + + public RpcServerInterface getRpcServer() { + return rpcServer; + } + + public RpcScheduler getRpcScheduler() { + return rpcServer.getScheduler(); + } + + @Override + public int getPriority(RequestHeader header, Message param, User user) { + return priority.getPriority(header, param, user); + } + + @Override + public long getDeadline(RequestHeader header, Message param) { + return priority.getDeadline(header, param); + } + + /** + * Check if an OOME and, if so, abort immediately to avoid creating more objects. + * @return True if we OOME'd and are aborting. 
+ */ + @Override + public boolean checkOOME(Throwable e) { + return OOMEChecker.exitIfOOME(e, getClass().getSimpleName()); + } + + @Override + public void onConfigurationChange(Configuration conf) { + rpcServer.onConfigurationChange(conf); + } + + @Override + public GetClusterIdResponse getClusterId(RpcController controller, GetClusterIdRequest request) + throws ServiceException { + return GetClusterIdResponse.newBuilder().setClusterId(server.getClusterId()).build(); + } + + @Override + public GetActiveMasterResponse getActiveMaster(RpcController controller, + GetActiveMasterRequest request) throws ServiceException { + GetActiveMasterResponse.Builder builder = GetActiveMasterResponse.newBuilder(); + server.getActiveMaster() + .ifPresent(name -> builder.setServerName(ProtobufUtil.toServerName(name))); + return builder.build(); + } + + @Override + public GetMastersResponse getMasters(RpcController controller, GetMastersRequest request) + throws ServiceException { + GetMastersResponse.Builder builder = GetMastersResponse.newBuilder(); + server.getActiveMaster() + .ifPresent(activeMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder() + .setServerName(ProtobufUtil.toServerName(activeMaster)).setIsActive(true))); + server.getBackupMasters() + .forEach(backupMaster -> builder.addMasterServers(GetMastersResponseEntry.newBuilder() + .setServerName(ProtobufUtil.toServerName(backupMaster)).setIsActive(false))); + return builder.build(); + } + + @Override + public GetMetaRegionLocationsResponse getMetaRegionLocations(RpcController controller, + GetMetaRegionLocationsRequest request) throws ServiceException { + GetMetaRegionLocationsResponse.Builder builder = GetMetaRegionLocationsResponse.newBuilder(); + server.getMetaLocations() + .forEach(location -> builder.addMetaLocations(ProtobufUtil.toRegionLocation(location))); + return builder.build(); + } + + @Override + public final GetBootstrapNodesResponse getBootstrapNodes(RpcController controller, + 
GetBootstrapNodesRequest request) throws ServiceException { + List bootstrapNodes = new ArrayList<>(server.getRegionServers()); + Collections.shuffle(bootstrapNodes, ThreadLocalRandom.current()); + int maxNodeCount = server.getConfiguration().getInt(CLIENT_BOOTSTRAP_NODE_LIMIT, + DEFAULT_CLIENT_BOOTSTRAP_NODE_LIMIT); + GetBootstrapNodesResponse.Builder builder = GetBootstrapNodesResponse.newBuilder(); + bootstrapNodes.stream().limit(maxNodeCount).map(ProtobufUtil::toServerName) + .forEach(builder::addServerName); + return builder.build(); + } + + @Override + public UpdateConfigurationResponse updateConfiguration(RpcController controller, + UpdateConfigurationRequest request) throws ServiceException { + try { + requirePermission("updateConfiguration", Permission.Action.ADMIN); + this.server.updateConfiguration(); + } catch (Exception e) { + throw new ServiceException(e); + } + return UpdateConfigurationResponse.getDefaultInstance(); + } + + @Override + @QosPriority(priority = HConstants.ADMIN_QOS) + public ClearSlowLogResponses clearSlowLogsResponses(final RpcController controller, + final ClearSlowLogResponseRequest request) throws ServiceException { + try { + requirePermission("clearSlowLogsResponses", Permission.Action.ADMIN); + } catch (IOException e) { + throw new ServiceException(e); + } + final NamedQueueRecorder namedQueueRecorder = this.server.getNamedQueueRecorder(); + boolean slowLogsCleaned = Optional.ofNullable(namedQueueRecorder) + .map( + queueRecorder -> queueRecorder.clearNamedQueue(NamedQueuePayload.NamedQueueEvent.SLOW_LOG)) + .orElse(false); + ClearSlowLogResponses clearSlowLogResponses = + ClearSlowLogResponses.newBuilder().setIsCleaned(slowLogsCleaned).build(); + return clearSlowLogResponses; + } + + private List getSlowLogPayloads(SlowLogResponseRequest request, + NamedQueueRecorder namedQueueRecorder) { + if (namedQueueRecorder == null) { + return Collections.emptyList(); + } + List slowLogPayloads; + NamedQueueGetRequest namedQueueGetRequest 
= new NamedQueueGetRequest(); + namedQueueGetRequest.setNamedQueueEvent(RpcLogDetails.SLOW_LOG_EVENT); + namedQueueGetRequest.setSlowLogResponseRequest(request); + NamedQueueGetResponse namedQueueGetResponse = + namedQueueRecorder.getNamedQueueRecords(namedQueueGetRequest); + slowLogPayloads = namedQueueGetResponse != null ? namedQueueGetResponse.getSlowLogPayloads() : + Collections.emptyList(); + return slowLogPayloads; + } + + @Override + @QosPriority(priority = HConstants.ADMIN_QOS) + public HBaseProtos.LogEntry getLogEntries(RpcController controller, + HBaseProtos.LogRequest request) throws ServiceException { + try { + final String logClassName = request.getLogClassName(); + Class logClass = Class.forName(logClassName).asSubclass(Message.class); + Method method = logClass.getMethod("parseFrom", ByteString.class); + if (logClassName.contains("SlowLogResponseRequest")) { + SlowLogResponseRequest slowLogResponseRequest = + (SlowLogResponseRequest) method.invoke(null, request.getLogMessage()); + final NamedQueueRecorder namedQueueRecorder = this.server.getNamedQueueRecorder(); + final List slowLogPayloads = + getSlowLogPayloads(slowLogResponseRequest, namedQueueRecorder); + SlowLogResponses slowLogResponses = + SlowLogResponses.newBuilder().addAllSlowLogPayloads(slowLogPayloads).build(); + return HBaseProtos.LogEntry.newBuilder() + .setLogClassName(slowLogResponses.getClass().getName()) + .setLogMessage(slowLogResponses.toByteString()).build(); + } + } catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException + | InvocationTargetException e) { + LOG.error("Error while retrieving log entries.", e); + throw new ServiceException(e); + } + throw new ServiceException("Invalid request params"); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java new file mode 100644 index 000000000000..316f1b6a7b27 --- /dev/null +++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/HBaseServerBase.java @@ -0,0 +1,600 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK; +import static org.apache.hadoop.hbase.HConstants.HBASE_SPLIT_WAL_COORDINATED_BY_ZK; + +import com.google.errorprone.annotations.RestrictedApi; +import java.io.IOException; +import java.lang.management.MemoryType; +import java.net.BindException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.concurrent.atomic.AtomicBoolean; +import javax.servlet.http.HttpServlet; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.SystemUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.AsyncClusterConnection; +import org.apache.hadoop.hbase.client.ClusterConnectionFactory; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.ConnectionRegistryEndpoint; +import 
org.apache.hadoop.hbase.conf.ConfigurationManager; +import org.apache.hadoop.hbase.conf.ConfigurationObserver; +import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager; +import org.apache.hadoop.hbase.executor.ExecutorService; +import org.apache.hadoop.hbase.fs.HFileSystem; +import org.apache.hadoop.hbase.http.InfoServer; +import org.apache.hadoop.hbase.io.util.MemorySizeUtil; +import org.apache.hadoop.hbase.ipc.RpcServerInterface; +import org.apache.hadoop.hbase.master.HMaster; +import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; +import org.apache.hadoop.hbase.regionserver.ChunkCreator; +import org.apache.hadoop.hbase.regionserver.HeapMemoryManager; +import org.apache.hadoop.hbase.regionserver.MemStoreLAB; +import org.apache.hadoop.hbase.security.Superusers; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.security.UserProvider; +import org.apache.hadoop.hbase.security.access.AccessChecker; +import org.apache.hadoop.hbase.security.access.ZKPermissionWatcher; +import org.apache.hadoop.hbase.util.Addressing; +import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.util.NettyEventLoopGroupConfig; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.util.Sleeper; +import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; +import org.apache.hadoop.hbase.zookeeper.ZKUtil; +import org.apache.hadoop.hbase.zookeeper.ZKWatcher; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.misc.Signal; + +/** + * Base class for hbase services, such as master or region server. 
+ */ +@InterfaceAudience.Private +public abstract class HBaseServerBase> extends Thread + implements Server, ConfigurationObserver, ConnectionRegistryEndpoint { + + private static final Logger LOG = LoggerFactory.getLogger(HBaseServerBase.class); + + protected final Configuration conf; + + // Go down hard. Used if file system becomes unavailable and also in + // debugging and unit tests. + protected final AtomicBoolean abortRequested = new AtomicBoolean(false); + + // Set when a report to the master comes back with a message asking us to + // shutdown. Also set by call to stop when debugging or running unit tests + // of HRegionServer in isolation. + protected volatile boolean stopped = false; + + /** + * This servers startcode. + */ + protected final long startcode; + + protected final UserProvider userProvider; + + // zookeeper connection and watcher + protected final ZKWatcher zooKeeper; + + /** + * The server name the Master sees us as. Its made from the hostname the master passes us, port, + * and server startcode. Gets set after registration against Master. + */ + protected ServerName serverName; + + protected final R rpcServices; + + /** + * hostname specified by hostname config + */ + protected final String useThisHostnameInstead; + + /** + * Provide online slow log responses from ringbuffer + */ + protected final NamedQueueRecorder namedQueueRecorder; + + /** + * Configuration manager is used to register/deregister and notify the configuration observers + * when the regionserver is notified that there was a change in the on disk configs. + */ + protected final ConfigurationManager configurationManager; + + /** + * ChoreService used to schedule tasks that we want to run periodically + */ + protected final ChoreService choreService; + + // Instance of the hbase executor executorService. 
+ protected final ExecutorService executorService; + + // Cluster Status Tracker + protected final ClusterStatusTracker clusterStatusTracker; + + protected final CoordinatedStateManager csm; + + // Info server. Default access so can be used by unit tests. REGIONSERVER + // is name of the webapp and the attribute name used stuffing this instance + // into web context. + protected InfoServer infoServer; + + protected HFileSystem dataFs; + + protected HFileSystem walFs; + + protected Path dataRootDir; + + protected Path walRootDir; + + protected final int msgInterval; + + // A sleeper that sleeps for msgInterval. + protected final Sleeper sleeper; + + /** + * Go here to get table descriptors. + */ + protected TableDescriptors tableDescriptors; + + /** + * The asynchronous cluster connection to be shared by services. + */ + protected AsyncClusterConnection asyncClusterConnection; + + /** + * Cache for the meta region replica's locations. Also tracks their changes to avoid stale cache + * entries. Used for serving ClientMetaService. + */ + protected final MetaRegionLocationCache metaRegionLocationCache; + + protected final NettyEventLoopGroupConfig eventLoopGroupConfig; + + /** + * On non-Windows platforms, install a SIGHUP handler that reloads the configuration and + * notifies all observers. SIGHUP does not exist on Windows, hence the IS_OS_WINDOWS guard. + */ + private static void setupWindows(final Configuration conf, ConfigurationManager cm) { + if (!SystemUtils.IS_OS_WINDOWS) { + Signal.handle(new Signal("HUP"), signal -> { + conf.reloadConfiguration(); + cm.notifyAllObservers(conf); + }); + } + } + + /** + * Setup our cluster connection if not already initialized. 
+ */ + protected final synchronized void setupClusterConnection() throws IOException { + if (asyncClusterConnection == null) { + InetSocketAddress localAddress = + new InetSocketAddress(rpcServices.getSocketAddress().getAddress(), 0); + User user = userProvider.getCurrent(); + asyncClusterConnection = + ClusterConnectionFactory.createAsyncClusterConnection(this, conf, localAddress, user); + } + } + + protected final void initializeFileSystem() throws IOException { + // Get fs instance used by this RS. Do we use checksum verification in the hbase? If hbase + // checksum verification enabled, then automatically switch off hdfs checksum verification. + boolean useHBaseChecksum = conf.getBoolean(HConstants.HBASE_CHECKSUM_VERIFICATION, true); + String walDirUri = CommonFSUtils.getDirUri(this.conf, + new Path(conf.get(CommonFSUtils.HBASE_WAL_DIR, conf.get(HConstants.HBASE_DIR)))); + // set WAL's uri + if (walDirUri != null) { + CommonFSUtils.setFsDefault(this.conf, walDirUri); + } + // init the WALFs + this.walFs = new HFileSystem(this.conf, useHBaseChecksum); + this.walRootDir = CommonFSUtils.getWALRootDir(this.conf); + // Set 'fs.defaultFS' to match the filesystem on hbase.rootdir else + // underlying hadoop hdfs accessors will be going against wrong filesystem + // (unless all is set to defaults). 
+ String rootDirUri = + CommonFSUtils.getDirUri(this.conf, new Path(conf.get(HConstants.HBASE_DIR))); + if (rootDirUri != null) { + CommonFSUtils.setFsDefault(this.conf, rootDirUri); + } + // init the filesystem + this.dataFs = new HFileSystem(this.conf, useHBaseChecksum); + this.dataRootDir = CommonFSUtils.getRootDir(this.conf); + this.tableDescriptors = new FSTableDescriptors(this.dataFs, this.dataRootDir, + !canUpdateTableDescriptor(), cacheTableDescriptor()); + } + + public HBaseServerBase(Configuration conf, String name) + throws ZooKeeperConnectionException, IOException { + super(name); // thread name + this.conf = conf; + this.eventLoopGroupConfig = + NettyEventLoopGroupConfig.setup(conf, getClass().getSimpleName() + "-EventLoopGroup"); + this.startcode = EnvironmentEdgeManager.currentTime(); + this.userProvider = UserProvider.instantiate(conf); + this.msgInterval = conf.getInt("hbase.regionserver.msginterval", 3 * 1000); + this.sleeper = new Sleeper(this.msgInterval, this); + this.namedQueueRecorder = createNamedQueueRecord(); + this.rpcServices = createRpcServices(); + useThisHostnameInstead = getUseThisHostnameInstead(conf); + InetSocketAddress addr = rpcServices.getSocketAddress(); + String hostName = StringUtils.isBlank(useThisHostnameInstead) ? addr.getHostName() : + this.useThisHostnameInstead; + serverName = ServerName.valueOf(hostName, addr.getPort(), this.startcode); + // login the zookeeper client principal (if using security) + ZKUtil.loginClient(this.conf, HConstants.ZK_CLIENT_KEYTAB_FILE, + HConstants.ZK_CLIENT_KERBEROS_PRINCIPAL, hostName); + // login the server principal (if using secure Hadoop) + login(userProvider, hostName); + // init superusers and add the server principal (if using security) + // or process owner as default super user. 
+ Superusers.initialize(conf); + zooKeeper = + new ZKWatcher(conf, getProcessName() + ":" + addr.getPort(), this, canCreateBaseZNode()); + + this.configurationManager = new ConfigurationManager(); + setupWindows(conf, configurationManager); + + initializeFileSystem(); + + this.choreService = new ChoreService(getName(), true); + this.executorService = new ExecutorService(getName()); + + this.metaRegionLocationCache = new MetaRegionLocationCache(zooKeeper); + + if (clusterMode()) { + if (conf.getBoolean(HBASE_SPLIT_WAL_COORDINATED_BY_ZK, + DEFAULT_HBASE_SPLIT_COORDINATED_BY_ZK)) { + csm = new ZkCoordinatedStateManager(this); + } else { + csm = null; + } + clusterStatusTracker = new ClusterStatusTracker(zooKeeper, this); + clusterStatusTracker.start(); + } else { + csm = null; + clusterStatusTracker = null; + } + putUpWebUI(); + } + + /** + * Puts up the webui. + */ + private void putUpWebUI() throws IOException { + int port = + this.conf.getInt(HConstants.REGIONSERVER_INFO_PORT, HConstants.DEFAULT_REGIONSERVER_INFOPORT); + String addr = this.conf.get("hbase.regionserver.info.bindAddress", "0.0.0.0"); + + if (this instanceof HMaster) { + port = conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT); + addr = this.conf.get("hbase.master.info.bindAddress", "0.0.0.0"); + } + // -1 is for disabling info server + if (port < 0) { + return; + } + + if (!Addressing.isLocalAddress(InetAddress.getByName(addr))) { + String msg = "Failed to start http info server. Address " + addr + + " does not belong to this host. 
Correct configuration parameter: " + + "hbase.regionserver.info.bindAddress"; + LOG.error(msg); + throw new IOException(msg); + } + // check if auto port bind enabled + boolean auto = this.conf.getBoolean(HConstants.REGIONSERVER_INFO_PORT_AUTO, false); + while (true) { + try { + this.infoServer = new InfoServer(getProcessName(), addr, port, false, this.conf); + infoServer.addPrivilegedServlet("dump", "/dump", getDumpServlet()); + configureInfoServer(infoServer); + this.infoServer.start(); + break; + } catch (BindException e) { + if (!auto) { + // auto bind disabled throw BindException + LOG.error("Failed binding http info server to port: " + port); + throw e; + } + // auto bind enabled, try to use another port + LOG.info("Failed binding http info server to port: " + port); + port++; + LOG.info("Retry starting http info server with port: " + port); + } + } + port = this.infoServer.getPort(); + conf.setInt(HConstants.REGIONSERVER_INFO_PORT, port); + int masterInfoPort = + conf.getInt(HConstants.MASTER_INFO_PORT, HConstants.DEFAULT_MASTER_INFOPORT); + conf.setInt("hbase.master.info.port.orig", masterInfoPort); + conf.setInt(HConstants.MASTER_INFO_PORT, port); + } + + /** + * Sets the abort state if not already set. + * @return True if abortRequested set to True successfully, false if an abort is already in + * progress. 
+ */ + protected final boolean setAbortRequested() { + return abortRequested.compareAndSet(false, true); + } + + @Override + public boolean isStopped() { + return stopped; + } + + @Override + public boolean isAborted() { + return abortRequested.get(); + } + + @Override + public Configuration getConfiguration() { + return conf; + } + + @Override + public AsyncClusterConnection getAsyncClusterConnection() { + return asyncClusterConnection; + } + + @Override + public ZKWatcher getZooKeeper() { + return zooKeeper; + } + + protected final void shutdownChore(ScheduledChore chore) { + if (chore != null) { + chore.shutdown(); + } + } + + protected final void initializeMemStoreChunkCreator(HeapMemoryManager hMemManager) { + if (MemStoreLAB.isEnabled(conf)) { + // MSLAB is enabled. So initialize MemStoreChunkPool + // By this time, the MemstoreFlusher is already initialized. We can get the global limits from + // it. + Pair pair = MemorySizeUtil.getGlobalMemStoreSize(conf); + long globalMemStoreSize = pair.getFirst(); + boolean offheap = pair.getSecond() == MemoryType.NON_HEAP; + // When off heap memstore in use, take full area for chunk pool. + float poolSizePercentage = offheap ? 
1.0F : + conf.getFloat(MemStoreLAB.CHUNK_POOL_MAXSIZE_KEY, MemStoreLAB.POOL_MAX_SIZE_DEFAULT); + float initialCountPercentage = conf.getFloat(MemStoreLAB.CHUNK_POOL_INITIALSIZE_KEY, + MemStoreLAB.POOL_INITIAL_SIZE_DEFAULT); + int chunkSize = conf.getInt(MemStoreLAB.CHUNK_SIZE_KEY, MemStoreLAB.CHUNK_SIZE_DEFAULT); + float indexChunkSizePercent = conf.getFloat(MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_KEY, + MemStoreLAB.INDEX_CHUNK_SIZE_PERCENTAGE_DEFAULT); + // init the chunkCreator + ChunkCreator.initialize(chunkSize, offheap, globalMemStoreSize, poolSizePercentage, + initialCountPercentage, hMemManager, indexChunkSizePercent); + } + } + + protected abstract void stopChores(); + + protected final void stopChoreService() { + // clean up the scheduled chores + if (choreService != null) { + LOG.info("Shutdown chores and chore service"); + stopChores(); + // cancel the remaining scheduled chores (in case we missed out any) + // TODO: cancel will not cleanup the chores, so we need make sure we do not miss any + choreService.shutdown(); + } + } + + protected final void stopExecutorService() { + if (executorService != null) { + LOG.info("Shutdown executor service"); + executorService.shutdown(); + } + } + + protected final void closeClusterConnection() { + if (asyncClusterConnection != null) { + LOG.info("Close async cluster connection"); + try { + this.asyncClusterConnection.close(); + } catch (IOException e) { + // Although the {@link Closeable} interface throws an {@link + // IOException}, in reality, the implementation would never do that. 
+ LOG.warn("Attempt to close server's AsyncClusterConnection failed.", e); + } + } + } + + protected final void stopInfoServer() { + if (this.infoServer != null) { + LOG.info("Stop info server"); + try { + this.infoServer.stop(); + } catch (Exception e) { + LOG.error("Failed to stop infoServer", e); + } + } + } + + protected final void closeZooKeeper() { + if (this.zooKeeper != null) { + LOG.info("Close zookeeper"); + this.zooKeeper.close(); + } + } + + @Override + public ServerName getServerName() { + return serverName; + } + + @Override + public ChoreService getChoreService() { + return choreService; + } + + /** + * @return Return table descriptors implementation. + */ + public TableDescriptors getTableDescriptors() { + return this.tableDescriptors; + } + + public ExecutorService getExecutorService() { + return executorService; + } + + public AccessChecker getAccessChecker() { + return rpcServices.getAccessChecker(); + } + + public ZKPermissionWatcher getZKPermissionWatcher() { + return rpcServices.getZkPermissionWatcher(); + } + + @Override + public CoordinatedStateManager getCoordinatedStateManager() { + return csm; + } + + @Override + public Connection createConnection(Configuration conf) throws IOException { + User user = UserProvider.instantiate(conf).getCurrent(); + return ConnectionFactory.createConnection(conf, null, user); + } + + /** + * @return Return the rootDir. + */ + public Path getDataRootDir() { + return dataRootDir; + } + + @Override + public FileSystem getFileSystem() { + return dataFs; + } + + /** + * @return Return the walRootDir. + */ + public Path getWALRootDir() { + return walRootDir; + } + + /** + * @return Return the walFs. + */ + public FileSystem getWALFileSystem() { + return walFs; + } + + /** + * @return True if the cluster is up. 
+ */ + public boolean isClusterUp() { + return !clusterMode() || this.clusterStatusTracker.isClusterUp(); + } + + /** + * @return time stamp in millis of when this server was started + */ + public long getStartcode() { + return this.startcode; + } + + public InfoServer getInfoServer() { + return infoServer; + } + + public int getMsgInterval() { + return msgInterval; + } + + /** + * get NamedQueue Provider to add different logs to ringbuffer + * @return NamedQueueRecorder + */ + public NamedQueueRecorder getNamedQueueRecorder() { + return this.namedQueueRecorder; + } + + public RpcServerInterface getRpcServer() { + return rpcServices.getRpcServer(); + } + + public NettyEventLoopGroupConfig getEventLoopGroupConfig() { + return eventLoopGroupConfig; + } + + public R getRpcServices() { + return rpcServices; + } + + @RestrictedApi(explanation = "Should only be called in tests", link = "", + allowedOnPath = ".*/src/test/.*") + public MetaRegionLocationCache getMetaRegionLocationCache() { + return this.metaRegionLocationCache; + } + + /** + * Reload the configuration from disk. + */ + public void updateConfiguration() { + LOG.info("Reloading the configuration from disk."); + // Reload the configuration from disk. 
+ conf.reloadConfiguration(); + configurationManager.notifyAllObservers(conf); + } + + @Override + public String toString() { + return getServerName().toString(); + } + + protected abstract boolean canCreateBaseZNode(); + + protected abstract String getProcessName(); + + protected abstract R createRpcServices() throws IOException; + + protected abstract String getUseThisHostnameInstead(Configuration conf) throws IOException; + + protected abstract void login(UserProvider user, String host) throws IOException; + + protected abstract NamedQueueRecorder createNamedQueueRecord(); + + protected abstract void configureInfoServer(InfoServer infoServer); + + protected abstract Class getDumpServlet(); + + protected abstract boolean canUpdateTableDescriptor(); + + protected abstract boolean cacheTableDescriptor(); + + protected abstract boolean clusterMode(); +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java index 811b7c0c0f1b..31b8226cd4a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/Server.java @@ -49,7 +49,9 @@ public interface Server extends Abortable, Stoppable { * Important note: this method returns a reference to Connection which is managed * by Server itself, so callers must NOT attempt to close connection obtained. 
*/ - Connection getConnection(); + default Connection getConnection() { + return getAsyncConnection().toConnection(); + } Connection createConnection(Configuration conf) throws IOException; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AnnotationReadingPriorityFunction.java index 109375b9323b..a2d0169010eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AnnotationReadingPriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AnnotationReadingPriorityFunction.java @@ -21,8 +21,8 @@ import java.util.HashMap; import java.util.Map; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hbase.HBaseRpcServicesBase; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.security.User; import org.apache.yetus.audience.InterfaceAudience; @@ -52,7 +52,7 @@ // RegionSpecifier object. Methods can be invoked on the returned object // to figure out whether it is a meta region or not. 
@InterfaceAudience.Private -public abstract class AnnotationReadingPriorityFunction +public abstract class AnnotationReadingPriorityFunction> implements PriorityFunction { protected final Map annotatedQos; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java index b7d446bc0feb..a3ee71fc6fb2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcServer.java @@ -24,6 +24,7 @@ import java.util.concurrent.CountDownLatch; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HBaseServerBase; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.security.HBasePolicyProvider; @@ -79,7 +80,7 @@ public NettyRpcServer(Server server, String name, List channelClass; if (server instanceof HRegionServer) { - NettyEventLoopGroupConfig config = ((HRegionServer) server).getEventLoopGroupConfig(); + NettyEventLoopGroupConfig config = ((HBaseServerBase) server).getEventLoopGroupConfig(); eventLoopGroup = config.group(); channelClass = config.serverChannelClass(); } else { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index fed3d06a907a..9706149e82d5 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -60,7 +60,6 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilderFactory; import org.apache.hadoop.hbase.CellBuilderType; -import org.apache.hadoop.hbase.client.BalanceRequest; import org.apache.hadoop.hbase.ClusterId; import org.apache.hadoop.hbase.ClusterMetrics; import 
org.apache.hadoop.hbase.ClusterMetrics.Option; @@ -68,7 +67,9 @@ import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.HBaseIOException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HBaseServerBase; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.MasterNotRunningException; import org.apache.hadoop.hbase.MetaTableAccessor; @@ -83,6 +84,7 @@ import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.UnknownRegionException; +import org.apache.hadoop.hbase.client.BalanceRequest; import org.apache.hadoop.hbase.client.BalanceResponse; import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor; import org.apache.hadoop.hbase.client.CompactionState; @@ -102,6 +104,7 @@ import org.apache.hadoop.hbase.executor.ExecutorType; import org.apache.hadoop.hbase.favored.FavoredNodesManager; import org.apache.hadoop.hbase.http.HttpServer; +import org.apache.hadoop.hbase.http.InfoServer; import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; import org.apache.hadoop.hbase.ipc.RpcServer; import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException; @@ -114,6 +117,7 @@ import org.apache.hadoop.hbase.master.assignment.RegionStates; import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure; import org.apache.hadoop.hbase.master.balancer.BalancerChore; +import org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer; import org.apache.hadoop.hbase.master.balancer.ClusterStatusChore; import org.apache.hadoop.hbase.master.balancer.LoadBalancerFactory; import org.apache.hadoop.hbase.master.balancer.MaintenanceLoadBalancer; @@ -166,6 +170,7 @@ import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import 
org.apache.hadoop.hbase.monitoring.TaskMonitor; +import org.apache.hadoop.hbase.namequeues.NamedQueueRecorder; import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost; import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager; import org.apache.hadoop.hbase.procedure2.LockedResource; @@ -190,7 +195,6 @@ import org.apache.hadoop.hbase.quotas.SpaceViolationPolicy; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; -import org.apache.hadoop.hbase.regionserver.RSRpcServices; import org.apache.hadoop.hbase.replication.ReplicationException; import org.apache.hadoop.hbase.replication.ReplicationLoadSource; import org.apache.hadoop.hbase.replication.ReplicationPeerConfig; @@ -199,16 +203,17 @@ import org.apache.hadoop.hbase.replication.SyncReplicationState; import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner; import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner; -import org.apache.hadoop.hbase.replication.regionserver.ReplicationStatus; import org.apache.hadoop.hbase.rsgroup.RSGroupAdminEndpoint; import org.apache.hadoop.hbase.rsgroup.RSGroupBasedLoadBalancer; import org.apache.hadoop.hbase.rsgroup.RSGroupInfoManager; import org.apache.hadoop.hbase.rsgroup.RSGroupUtil; import org.apache.hadoop.hbase.security.AccessDeniedException; import org.apache.hadoop.hbase.security.SecurityConstants; +import org.apache.hadoop.hbase.security.Superusers; import org.apache.hadoop.hbase.security.UserProvider; import org.apache.hadoop.hbase.util.Addressing; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FutureUtils; @@ -266,8 +271,7 @@ * @see org.apache.zookeeper.Watcher */ @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.TOOLS) 
-@SuppressWarnings("deprecation") -public class HMaster extends HRegionServer implements MasterServices { +public class HMaster extends HBaseServerBase implements MasterServices { private static final Logger LOG = LoggerFactory.getLogger(HMaster.class); @@ -304,6 +308,8 @@ public class HMaster extends HRegionServer implements MasterServices { public static final int DEFAULT_HBASE_MASTER_CLEANER_INTERVAL = 600 * 1000; + private String clusterId; + // Metrics for the HMaster final MetricsMaster metricsMaster; // file system manager for the master FS operations @@ -435,7 +441,7 @@ public class HMaster extends HRegionServer implements MasterServices { * active one. */ public HMaster(final Configuration conf) throws IOException { - super(conf); + super(conf, "Master"); try { if (conf.getBoolean(MAINTENANCE_MODE, false)) { LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE); @@ -447,9 +453,10 @@ public HMaster(final Configuration conf) throws IOException { maintenanceMode = false; } this.rsFatals = new MemoryBoundedLogMessageBuffer( - conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024)); - LOG.info("hbase.rootdir={}, hbase.cluster.distributed={}", getDataRootDir(), - this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false)); + conf.getLong("hbase.master.buffer.for.rs.fatals", 1 * 1024 * 1024)); + LOG.info("hbase.rootdir={}, hbase.cluster.distributed={}", + CommonFSUtils.getRootDir(this.conf), + this.conf.getBoolean(HConstants.CLUSTER_DISTRIBUTED, false)); // Disable usage of meta replicas in the master this.conf.setBoolean(HConstants.USE_META_REPLICAS, false); @@ -491,12 +498,10 @@ public HMaster(final Configuration conf) throws IOException { getChoreService().scheduleChore(clusterStatusPublisherChore); } } - this.activeMasterManager = createActiveMasterManager(zooKeeper, serverName, this); - cachedClusterId = new CachedClusterId(this, conf); - this.regionServerTracker = new RegionServerTracker(zooKeeper, this); + 
this.rpcServices.start(zooKeeper); } catch (Throwable t) { // Make sure we log the exception. HMaster is often started via reflection and the // cause of failed startup is lost. @@ -519,11 +524,17 @@ protected String getUseThisHostnameInstead(Configuration conf) { return conf.get(MASTER_HOSTNAME_KEY); } + private void registerConfigurationObservers() { + configurationManager.registerObserver(this.rpcServices); + configurationManager.registerObserver(this); + } + // Main run loop. Calls through to the regionserver run loop AFTER becoming active Master; will // block in here until then. @Override public void run() { try { + registerConfigurationObservers(); Threads.setDaemonThreadRunning(new Thread(() -> { try { int infoPort = putUpJettyServer(); @@ -538,9 +549,16 @@ public void run() { } } }), getName() + ":becomeActiveMaster"); - // Fall in here even if we have been aborted. Need to run the shutdown services and - // the super run call will do this for us. - super.run(); + while (!isStopped() && !isAborted()) { + sleeper.sleep(); + } + stopInfoServer(); + closeClusterConnection(); + stopServiceThreads(); + if (this.rpcServices != null) { + this.rpcServices.stop(); + } + closeZooKeeper(); } finally { if (this.clusterSchemaService != null) { // If on way out, then we are no longer active master. @@ -615,26 +633,16 @@ private int putUpJettyServer() throws IOException { @Override protected void login(UserProvider user, String host) throws IOException { try { - super.login(user, host); + user.login(SecurityConstants.REGIONSERVER_KRB_KEYTAB_FILE, + SecurityConstants.REGIONSERVER_KRB_PRINCIPAL, host); } catch (IOException ie) { - user.login(SecurityConstants.MASTER_KRB_KEYTAB_FILE, - SecurityConstants.MASTER_KRB_PRINCIPAL, host); - } - } - - /** - * Loop till the server is stopped or aborted. 
- */ - @Override - protected void waitForMasterActive() { - while (!isStopped() && !isAborted()) { - sleeper.sleep(); + user.login(SecurityConstants.MASTER_KRB_KEYTAB_FILE, SecurityConstants.MASTER_KRB_PRINCIPAL, + host); } } - @InterfaceAudience.Private public MasterRpcServices getMasterRpcServices() { - return (MasterRpcServices)rpcServices; + return rpcServices; } public boolean balanceSwitch(final boolean b) throws IOException { @@ -661,13 +669,12 @@ protected boolean cacheTableDescriptor() { return true; } - @Override - protected RSRpcServices createRpcServices() throws IOException { + protected MasterRpcServices createRpcServices() throws IOException { return new MasterRpcServices(this); } @Override - protected void configureInfoServer() { + protected void configureInfoServer(InfoServer infoServer) { infoServer.addUnprivilegedServlet("master-status", "/master-status", MasterStatusServlet.class); infoServer.setAttribute(MASTER, this); } @@ -860,7 +867,7 @@ private void finishActiveMasterInitialization(MonitoredTask status) throws IOExc // always initialize the MemStoreLAB as we use a region to store data in master now, see // localStore. 
- initializeMemStoreChunkCreator(); + initializeMemStoreChunkCreator(null); this.fileSystemManager = new MasterFileSystem(conf); this.walManager = new MasterWalManager(this); @@ -1539,7 +1546,6 @@ private void startServiceThreads() throws IOException { } } - @Override protected void stopServiceThreads() { if (masterJettyServer != null) { LOG.info("Stopping master jetty server"); @@ -1549,9 +1555,8 @@ protected void stopServiceThreads() { LOG.error("Failed to stop master jetty server", e); } } - stopChores(); - - super.stopServiceThreads(); + stopChoreService(); + stopExecutorService(); if (cleanerPool != null) { cleanerPool.shutdownNow(); cleanerPool = null; @@ -1680,25 +1685,23 @@ private void stopProcedureExecutor() { } } - private void stopChores() { - if (getChoreService() != null) { - shutdownChore(mobFileCleanerChore); - shutdownChore(mobFileCompactionChore); - shutdownChore(balancerChore); - if (regionNormalizerManager != null) { - shutdownChore(regionNormalizerManager.getRegionNormalizerChore()); - } - shutdownChore(clusterStatusChore); - shutdownChore(catalogJanitorChore); - shutdownChore(clusterStatusPublisherChore); - shutdownChore(snapshotQuotaChore); - shutdownChore(logCleaner); - shutdownChore(hfileCleaner); - shutdownChore(replicationBarrierCleaner); - shutdownChore(snapshotCleanerChore); - shutdownChore(hbckChore); - shutdownChore(regionsRecoveryChore); + protected void stopChores() { + shutdownChore(mobFileCleanerChore); + shutdownChore(mobFileCompactionChore); + shutdownChore(balancerChore); + if (regionNormalizerManager != null) { + shutdownChore(regionNormalizerManager.getRegionNormalizerChore()); } + shutdownChore(clusterStatusChore); + shutdownChore(catalogJanitorChore); + shutdownChore(clusterStatusPublisherChore); + shutdownChore(snapshotQuotaChore); + shutdownChore(logCleaner); + shutdownChore(hfileCleaner); + shutdownChore(replicationBarrierCleaner); + shutdownChore(snapshotCleanerChore); + shutdownChore(hbckChore); + 
shutdownChore(regionsRecoveryChore); } /** @@ -2722,16 +2725,6 @@ public ClusterMetrics getClusterMetrics(EnumSet