Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public QuotaLimiter getUserLimiter(final UserGroupInformation ugi, final TableNa
* @return the quota info associated to specified user
*/
public UserQuotaState getUserQuotaState(final UserGroupInformation ugi) {
return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi), UserQuotaState::new,
return computeIfAbsent(userQuotaCache, getQuotaUserName(ugi),
() -> QuotaUtil.buildDefaultUserQuotaState(rsServices.getConfiguration()),
this::triggerCacheRefresh);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.DoNotRetryIOException;
Expand Down Expand Up @@ -49,7 +50,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TimeUnit;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.QuotaScope;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Quotas;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.Throttle;
Expand All @@ -73,6 +76,26 @@ public class QuotaUtil extends QuotaTableUtil {
// the default one write capacity unit is 1024 bytes (1KB)
public static final long DEFAULT_WRITE_CAPACITY_UNIT = 1024;

/*
* The below defaults, if configured, will be applied to otherwise unthrottled users. For example,
* set `hbase.quota.default.user.machine.read.size` to `1048576` in your hbase-site.xml to ensure
* that any given user may not query more than 1mb per second from any given machine, unless
* explicitly permitted by a persisted quota. All of these defaults use TimeUnit.SECONDS and
* QuotaScope.MACHINE.
*/
public static final String QUOTA_DEFAULT_USER_MACHINE_READ_NUM =
"hbase.quota.default.user.machine.read.num";
public static final String QUOTA_DEFAULT_USER_MACHINE_READ_SIZE =
"hbase.quota.default.user.machine.read.size";
public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM =
"hbase.quota.default.user.machine.request.num";
public static final String QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE =
"hbase.quota.default.user.machine.request.size";
public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM =
"hbase.quota.default.user.machine.write.num";
public static final String QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE =
"hbase.quota.default.user.machine.write.size";

/** Table descriptor for Quota internal table */
public static final TableDescriptor QUOTA_TABLE_DESC =
TableDescriptorBuilder.newBuilder(QUOTA_TABLE_NAME)
Expand Down Expand Up @@ -284,10 +307,14 @@ public static Map<String, UserQuotaState> fetchUserQuotas(final Connection conne
assert isUserRowKey(key);
String user = getUserFromRowKey(key);

if (results[i].isEmpty()) {
userQuotas.put(user, buildDefaultUserQuotaState(connection.getConfiguration()));
continue;
}

final UserQuotaState quotaInfo = new UserQuotaState(nowTs);
userQuotas.put(user, quotaInfo);

if (results[i].isEmpty()) continue;
assert Bytes.equals(key, results[i].getRow());

try {
Expand Down Expand Up @@ -321,6 +348,38 @@ public void visitUserQuotas(String userName, Quotas quotas) {
return userQuotas;
}

protected static UserQuotaState buildDefaultUserQuotaState(Configuration conf) {
QuotaProtos.Throttle.Builder throttleBuilder = QuotaProtos.Throttle.newBuilder();

buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_NUM)
.ifPresent(throttleBuilder::setReadNum);
buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_READ_SIZE)
.ifPresent(throttleBuilder::setReadSize);
buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_NUM)
.ifPresent(throttleBuilder::setReqNum);
buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_REQUEST_SIZE)
.ifPresent(throttleBuilder::setReqSize);
buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_NUM)
.ifPresent(throttleBuilder::setWriteNum);
buildDefaultTimedQuota(conf, QUOTA_DEFAULT_USER_MACHINE_WRITE_SIZE)
.ifPresent(throttleBuilder::setWriteSize);

UserQuotaState state = new UserQuotaState();
QuotaProtos.Quotas defaultQuotas =
QuotaProtos.Quotas.newBuilder().setThrottle(throttleBuilder.build()).build();
state.setQuotas(defaultQuotas);
return state;
}

private static Optional<TimedQuota> buildDefaultTimedQuota(Configuration conf, String key) {
int defaultSoftLimit = conf.getInt(key, -1);
if (defaultSoftLimit == -1) {
return Optional.empty();
}
return Optional.of(ProtobufUtil.toTimedQuota(defaultSoftLimit,
java.util.concurrent.TimeUnit.SECONDS, org.apache.hadoop.hbase.quotas.QuotaScope.MACHINE));
}

public static Map<TableName, QuotaState> fetchTableQuotas(final Connection connection,
final List<Get> gets, Map<TableName, Double> tableMachineFactors) throws IOException {
return fetchGlobalQuotas("table", connection, gets, new KeyFromRow<TableName>() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.quotas;

import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.triggerUserCacheRefresh;
import static org.apache.hadoop.hbase.quotas.ThrottleQuotaTestUtil.waitMinuteQuota;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ RegionServerTests.class, MediumTests.class })
public class TestDefaultQuota {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestDefaultQuota.class);
private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil();
private static final TableName TABLE_NAME = TableName.valueOf(UUID.randomUUID().toString());
private static final int REFRESH_TIME = 5000;
private static final byte[] FAMILY = Bytes.toBytes("cf");
private static final byte[] QUALIFIER = Bytes.toBytes("q");

@After
public void tearDown() throws Exception {
ThrottleQuotaTestUtil.clearQuotaCache(TEST_UTIL);
EnvironmentEdgeManager.reset();
TEST_UTIL.deleteTable(TABLE_NAME);
TEST_UTIL.shutdownMiniCluster();
}

@BeforeClass
public static void setUpBeforeClass() throws Exception {
// quotas enabled, using block bytes scanned
TEST_UTIL.getConfiguration().setBoolean(QuotaUtil.QUOTA_CONF_KEY, true);
TEST_UTIL.getConfiguration().setInt(QuotaCache.REFRESH_CONF_KEY, REFRESH_TIME);
TEST_UTIL.getConfiguration().setInt(QuotaUtil.QUOTA_DEFAULT_USER_MACHINE_READ_NUM, 1);

// don't cache blocks to make IO predictable
TEST_UTIL.getConfiguration().setFloat(HConstants.HFILE_BLOCK_CACHE_SIZE_KEY, 0.0f);

TEST_UTIL.startMiniCluster(1);
TEST_UTIL.waitTableAvailable(QuotaTableUtil.QUOTA_TABLE_NAME);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
QuotaCache.TEST_FORCE_REFRESH = true;

try (Admin admin = TEST_UTIL.getAdmin()) {
ThrottleQuotaTestUtil.doPuts(1_000, FAMILY, QUALIFIER,
admin.getConnection().getTable(TABLE_NAME));
}
TEST_UTIL.flush(TABLE_NAME);
}

@Test
public void testDefaultUserReadNum() throws Exception {
// Should have a strict throttle by default
TEST_UTIL.waitFor(60_000, () -> runGetsTest(100) < 100);

// Add big quota and should be effectively unlimited
configureLenientThrottle();
refreshQuotas();
// Should run without error
TEST_UTIL.waitFor(60_000, () -> runGetsTest(100) == 100);

// Remove all the limits, and should revert to strict default
unsetQuota();
TEST_UTIL.waitFor(60_000, () -> runGetsTest(100) < 100);
}

private void configureLenientThrottle() throws IOException {
try (Admin admin = TEST_UTIL.getAdmin()) {
admin.setQuota(QuotaSettingsFactory.throttleUser(getUserName(), ThrottleType.READ_NUMBER,
100_000, TimeUnit.SECONDS));
}
}

private static String getUserName() throws IOException {
return User.getCurrent().getShortName();
}

private void refreshQuotas() throws Exception {
triggerUserCacheRefresh(TEST_UTIL, false, TABLE_NAME);
waitMinuteQuota();
}

private void unsetQuota() throws Exception {
try (Admin admin = TEST_UTIL.getAdmin()) {
admin.setQuota(QuotaSettingsFactory.unthrottleUser(getUserName()));
}
refreshQuotas();
}

private long runGetsTest(int attempts) throws Exception {
refreshQuotas();
try (Table table = getTable()) {
return ThrottleQuotaTestUtil.doGets(attempts, FAMILY, QUALIFIER, table);
}
}

private Table getTable() throws IOException {
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 100);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
return TEST_UTIL.getConnection().getTableBuilder(TABLE_NAME, null).setOperationTimeout(250)
.build();
}

}