Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.ipc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;

/**
* RPC Executor that uses different queues for reads and writes for meta.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class MetaRWQueueRpcExecutor extends RWQueueRpcExecutor {
public static final String META_CALL_QUEUE_READ_SHARE_CONF_KEY =
"hbase.ipc.server.metacallqueue.read.ratio";
public static final String META_CALL_QUEUE_SCAN_SHARE_CONF_KEY =
"hbase.ipc.server.metacallqueue.scan.ratio";
public static final float DEFAULT_META_CALL_QUEUE_READ_SHARE = 0.9f;

public MetaRWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
}

@Override
protected float getReadShare(final Configuration conf) {
return conf.getFloat(META_CALL_QUEUE_READ_SHARE_CONF_KEY, DEFAULT_META_CALL_QUEUE_READ_SHARE);
}

@Override
protected float getScanShare(final Configuration conf) {
return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@ public RWQueueRpcExecutor(final String name, final int handlerCount, final int m
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
super(name, handlerCount, maxQueueLength, priority, conf, abortable);

float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
float callqReadShare = getReadShare(conf);
float callqScanShare = getScanShare(conf);

numWriteQueues = calcNumWriters(this.numCallQueues, callqReadShare);
writeHandlersCount = Math.max(numWriteQueues, calcNumWriters(handlerCount, callqReadShare));
Expand Down Expand Up @@ -195,7 +195,7 @@ public int getActiveScanHandlerCount() {
return activeScanHandlerCount.get();
}

private boolean isWriteRequest(final RequestHeader header, final Message param) {
protected boolean isWriteRequest(final RequestHeader header, final Message param) {
// TODO: Is there a better way to do this?
if (param instanceof MultiRequest) {
MultiRequest multi = (MultiRequest)param;
Expand Down Expand Up @@ -232,6 +232,14 @@ private boolean isScanRequest(final RequestHeader header, final Message param) {
return param instanceof ScanRequest;
}

protected float getReadShare(final Configuration conf) {
return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
}

protected float getScanShare(final Configuration conf) {
return conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
}

/*
* Calculate the number of writers based on the "total count" and the read share.
* You'll get at least one writer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,22 @@ public SimpleRpcScheduler(
}
}

// Create 2 queues to help priorityExecutor be more scalable.
this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
"priority.FPBQ", priorityHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
maxPriorityQueueLength, priority, conf, abortable) : null;
float metaCallqReadShare =
conf.getFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY,
MetaRWQueueRpcExecutor.DEFAULT_META_CALL_QUEUE_READ_SHARE);
if (metaCallqReadShare > 0) {
// different read/write handler for meta, at least 1 read handler and 1 write handler
this.priorityExecutor =
new MetaRWQueueRpcExecutor("priority.RWQ", Math.max(2, priorityHandlerCount),
maxPriorityQueueLength, priority, conf, server);
} else {
// Create 2 queues to help priorityExecutor be more scalable.
this.priorityExecutor = priorityHandlerCount > 0 ?
new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount,
RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
abortable) :
null;
}
this.replicationExecutor =
replicationHandlerCount > 0
? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,6 +559,75 @@ public void testCoDelScheduling() throws Exception {
}
}

@Test
public void testMetaRWScanQueues() throws Exception {
Configuration schedConf = HBaseConfiguration.create();
schedConf.setFloat(RpcExecutor.CALL_QUEUE_HANDLER_FACTOR_CONF_KEY, 1.0f);
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.7f);
schedConf.setFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0.5f);

PriorityFunction priority = mock(PriorityFunction.class);
when(priority.getPriority(any(), any(), any())).thenReturn(HConstants.HIGH_QOS);

RpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 3, 3, 1, priority,
HConstants.QOS_THRESHOLD);
try {
scheduler.start();

CallRunner putCallTask = mock(CallRunner.class);
ServerCall putCall = mock(ServerCall.class);
putCall.param = RequestConverter.buildMutateRequest(
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
when(putCallTask.getRpcCall()).thenReturn(putCall);
when(putCall.getHeader()).thenReturn(putHead);
when(putCall.getParam()).thenReturn(putCall.param);

CallRunner getCallTask = mock(CallRunner.class);
ServerCall getCall = mock(ServerCall.class);
RequestHeader getHead = RequestHeader.newBuilder().setMethodName("get").build();
when(getCallTask.getRpcCall()).thenReturn(getCall);
when(getCall.getHeader()).thenReturn(getHead);

CallRunner scanCallTask = mock(CallRunner.class);
ServerCall scanCall = mock(ServerCall.class);
scanCall.param = ScanRequest.newBuilder().build();
RequestHeader scanHead = RequestHeader.newBuilder().setMethodName("scan").build();
when(scanCallTask.getRpcCall()).thenReturn(scanCall);
when(scanCall.getHeader()).thenReturn(scanHead);
when(scanCall.getParam()).thenReturn(scanCall.param);

ArrayList<Integer> work = new ArrayList<>();
doAnswerTaskExecution(putCallTask, work, 1, 1000);
doAnswerTaskExecution(getCallTask, work, 2, 1000);
doAnswerTaskExecution(scanCallTask, work, 3, 1000);

// There are 3 queues: [puts], [gets], [scans]
// so the calls will be interleaved
scheduler.dispatch(putCallTask);
scheduler.dispatch(putCallTask);
scheduler.dispatch(putCallTask);
scheduler.dispatch(getCallTask);
scheduler.dispatch(getCallTask);
scheduler.dispatch(getCallTask);
scheduler.dispatch(scanCallTask);
scheduler.dispatch(scanCallTask);
scheduler.dispatch(scanCallTask);

while (work.size() < 6) {
Thread.sleep(100);
}

for (int i = 0; i < work.size() - 2; i += 3) {
assertNotEquals(work.get(i + 0), work.get(i + 1));
assertNotEquals(work.get(i + 0), work.get(i + 2));
assertNotEquals(work.get(i + 1), work.get(i + 2));
}
} finally {
scheduler.stop();
}
}

// Get mocked call that has the CallRunner sleep for a while so that the fast
// path isn't hit.
private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ protected AssignmentManager createAssignmentManager(MasterServices master) {
public static void setUp() throws Exception {
UTIL.getConfiguration().setClass(HConstants.MASTER_IMPL, HMasterForTest.class, HMaster.class);
UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 1000);
UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT);
UTIL.startMiniCluster(1);
UTIL.createTable(NAME, CF);
UTIL.waitTableAvailable(NAME);
Expand Down