Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@
class NamespaceStateManager {

private static final Logger LOG = LoggerFactory.getLogger(NamespaceStateManager.class);
private ConcurrentMap<String, NamespaceTableAndRegionInfo> nsStateCache;
private MasterServices master;

private final ConcurrentMap<String, NamespaceTableAndRegionInfo> nsStateCache;
private final MasterServices master;
private volatile boolean initialized = false;

public NamespaceStateManager(MasterServices masterServices) {
Expand Down Expand Up @@ -76,6 +77,9 @@ public NamespaceTableAndRegionInfo getState(String name) {
*/
synchronized boolean checkAndUpdateNamespaceRegionCount(TableName name, byte[] regionName,
int incr) throws IOException {
if (name.isSystemTable()) {
return true;
}
String namespace = name.getNamespaceAsString();
NamespaceDescriptor nspdesc = getNamespaceDescriptor(namespace);
if (nspdesc != null) {
Expand All @@ -84,17 +88,18 @@ synchronized boolean checkAndUpdateNamespaceRegionCount(TableName name, byte[] r
int regionCount = currentStatus.getRegionCount();
long maxRegionCount = TableNamespaceManager.getMaxRegions(nspdesc);
if (incr > 0 && regionCount >= maxRegionCount) {
LOG.warn("The region " + Bytes.toStringBinary(regionName)
+ " cannot be created. The region count will exceed quota on the namespace. "
+ "This may be transient, please retry later if there are any ongoing split"
+ " operations in the namespace.");
LOG.warn(
"The region {} cannot be created. The region count will exceed quota on the namespace. "
+ "This may be transient, please retry later if there are any ongoing split"
+ " operations in the namespace.",
Bytes.toStringBinary(regionName));
return false;
}
NamespaceTableAndRegionInfo nsInfo = nsStateCache.get(namespace);
if (nsInfo != null) {
nsInfo.incRegionCountForTable(name, incr);
} else {
LOG.warn("Namespace state found null for namespace : " + namespace);
LOG.warn("Namespace state found null for namespace : {}", namespace);
}
}
return true;
Expand All @@ -110,6 +115,9 @@ synchronized boolean checkAndUpdateNamespaceRegionCount(TableName name, byte[] r
*/
synchronized void checkAndUpdateNamespaceRegionCount(TableName name, int incr)
throws IOException {
if (name.isSystemTable()) {
return;
}
String namespace = name.getNamespaceAsString();
NamespaceDescriptor nspdesc = getNamespaceDescriptor(namespace);
if (nspdesc != null) {
Expand All @@ -133,13 +141,16 @@ private NamespaceDescriptor getNamespaceDescriptor(String namespaceAsString) {
try {
return this.master.getClusterSchema().getNamespace(namespaceAsString);
} catch (IOException e) {
LOG.error("Error while fetching namespace descriptor for namespace : " + namespaceAsString);
LOG.error("Error while fetching namespace descriptor for namespace : {}", namespaceAsString);
return null;
}
}

synchronized void checkAndUpdateNamespaceTableCount(TableName table, int numRegions)
throws IOException {
if (table.isSystemTable()) {
return;
}
String namespace = table.getNamespaceAsString();
NamespaceDescriptor nspdesc = getNamespaceDescriptor(namespace);
if (nspdesc != null) {
Expand Down Expand Up @@ -186,6 +197,7 @@ void deleteNamespace(String namespace) {
}

private void addTable(TableName tableName, int regionCount) throws IOException {
assert !tableName.isSystemTable() : "Tracking of system tables is not supported";
NamespaceTableAndRegionInfo info = nsStateCache.get(tableName.getNamespaceAsString());
if (info != null) {
info.addTable(tableName, regionCount);
Expand All @@ -196,6 +208,9 @@ private void addTable(TableName tableName, int regionCount) throws IOException {
}

synchronized void removeTable(TableName tableName) {
if (tableName.isSystemTable()) {
return;
}
NamespaceTableAndRegionInfo info = nsStateCache.get(tableName.getNamespaceAsString());
if (info != null) {
info.removeTable(tableName);
Expand All @@ -219,7 +234,7 @@ private void initialize() throws IOException {
addTable(table, regions.size());
}
}
LOG.info("Finished updating state of " + nsStateCache.size() + " namespaces. ");
LOG.info("Finished updating state of {} namespaces.", nsStateCache.size());
initialized = true;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;

import static org.junit.Assert.assertEquals;

import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.quotas.QuotaUtil;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.MiscTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExternalResource;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Test that we can split and merge the quota table given the presence of various configuration
* settings.
*/
@Category({ MiscTests.class, LargeTests.class })
@RunWith(Parameterized.class)
public class TestSplitMergeQuotaTable {

private static final Logger LOG = LoggerFactory.getLogger(TestSplitMergeQuotaTable.class);

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestSplitMergeQuotaTable.class);

@Parameterized.Parameters(name = "{1}")
public static Object[][] params() {
return new Object[][] { { Map.of(QuotaUtil.QUOTA_CONF_KEY, "false") },
{ Map.of(QuotaUtil.QUOTA_CONF_KEY, "true") }, };
}

private final TableName tableName = QuotaUtil.QUOTA_TABLE_NAME;
private final MiniClusterRule miniClusterRule;

@Rule
public final RuleChain ruleChain;

public TestSplitMergeQuotaTable(Map<String, String> configMap) {
this.miniClusterRule = MiniClusterRule.newBuilder().setConfiguration(() -> {
Configuration conf = HBaseConfiguration.create();
conf.setInt(HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT, 1000);
conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
configMap.forEach(conf::set);
return conf;
}).build();
TestRule ensureQuotaTableRule = new ExternalResource() {
@Override
protected void before() throws Throwable {
if (
!miniClusterRule.getTestingUtility().getAsyncConnection().getAdmin()
.tableExists(QuotaUtil.QUOTA_TABLE_NAME).get(30, TimeUnit.SECONDS)
) {
miniClusterRule.getTestingUtility().getHBaseCluster().getMaster()
.createSystemTable(QuotaUtil.QUOTA_TABLE_DESC);
}
}
};
this.ruleChain = RuleChain.outerRule(miniClusterRule).around(ensureQuotaTableRule);
}

@Test
public void testSplitMerge() throws Exception {
HBaseTestingUtil util = miniClusterRule.getTestingUtility();
util.waitTableAvailable(tableName, 30_000);
AsyncAdmin admin = util.getAsyncConnection().getAdmin();
admin.split(tableName, Bytes.toBytes(0x10)).get(30, TimeUnit.SECONDS);
util.waitFor(30_000, new Waiter.ExplainingPredicate<Exception>() {

@Override
public boolean evaluate() throws Exception {
// can potentially observe the parent and both children via this interface.
return admin.getRegions(tableName)
.thenApply(val -> val.stream()
.filter(info -> info.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID).toList())
.get(30, TimeUnit.SECONDS).size() > 1;
}

@Override
public String explainFailure() {
return "Split has not finished yet";
}
});
util.waitUntilNoRegionsInTransition();
List<RegionInfo> regionInfos = admin.getRegions(tableName)
.thenApply(val -> val.stream()
.filter(info -> info.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID).toList())
.get(30, TimeUnit.SECONDS);
assertEquals(2, regionInfos.size());
LOG.info("{}", regionInfos);
admin.mergeRegions(regionInfos.stream().map(RegionInfo::getRegionName).toList(), false).get(30,
TimeUnit.SECONDS);
util.waitFor(30000, new Waiter.ExplainingPredicate<Exception>() {

@Override
public boolean evaluate() throws Exception {
// can potentially observe the parent and both children via this interface.
return admin.getRegions(tableName)
.thenApply(val -> val.stream()
.filter(info -> info.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID).toList())
.get(30, TimeUnit.SECONDS).size() == 1;
}

@Override
public String explainFailure() {
return "Merge has not finished yet";
}
});
assertEquals(1, admin.getRegions(tableName).get(30, TimeUnit.SECONDS).size());
}
}