Skip to content

Commit 32057e1

Browse files
committed
HBASE-22280 Separate read/write handler for priority request(especially for meta).
1 parent f9f6354 commit 32057e1

File tree

3 files changed

+81
-8
lines changed

3 files changed

+81
-8
lines changed
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/**
2+
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing, software
14+
* distributed under the License is distributed on an "AS IS" BASIS,
15+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16+
* See the License for the specific language governing permissions and
17+
* limitations under the License.
18+
*/
19+
20+
package org.apache.hadoop.hbase.ipc;
21+
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.hbase.Abortable;
24+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
25+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
26+
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
27+
28+
public class MetaRWQueueRpcExecutor extends RWQueueRpcExecutor {
29+
public static final String META_CALL_QUEUE_READ_SHARE_CONF_KEY =
30+
"hbase.ipc.server.metacallqueue.read.ratio";
31+
public static final String META_CALL_QUEUE_SCAN_SHARE_CONF_KEY =
32+
"hbase.ipc.server.metacallqueue.scan.ratio";
33+
34+
public MetaRWQueueRpcExecutor(final String name, final int handlerCount, final int maxQueueLength,
35+
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
36+
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
37+
}
38+
39+
protected boolean isScanRequest(final RPCProtos.RequestHeader header, final Message param) {
40+
if (param instanceof ClientProtos.ScanRequest) {
41+
// Client read meta use scan request.
42+
return true;
43+
}
44+
return false;
45+
}
46+
47+
protected float getReadShare(final Configuration conf) {
48+
return conf.getFloat(META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.9f);
49+
}
50+
51+
protected float getScanShare(final Configuration conf) {
52+
return conf.getFloat(META_CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
53+
}
54+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RWQueueRpcExecutor.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,8 @@ public RWQueueRpcExecutor(final String name, final int handlerCount, final int m
7171
final PriorityFunction priority, final Configuration conf, final Abortable abortable) {
7272
super(name, handlerCount, maxQueueLength, priority, conf, abortable);
7373

74-
float callqReadShare = conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
75-
float callqScanShare = conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
74+
float callqReadShare = getReadShare(conf);
75+
float callqScanShare = getScanShare(conf);
7676

7777
numWriteQueues = calcNumWriters(this.numCallQueues, callqReadShare);
7878
writeHandlersCount = Math.max(numWriteQueues, calcNumWriters(handlerCount, callqReadShare));
@@ -195,7 +195,7 @@ public int getActiveScanHandlerCount() {
195195
return activeScanHandlerCount.get();
196196
}
197197

198-
private boolean isWriteRequest(final RequestHeader header, final Message param) {
198+
protected boolean isWriteRequest(final RequestHeader header, final Message param) {
199199
// TODO: Is there a better way to do this?
200200
if (param instanceof MultiRequest) {
201201
MultiRequest multi = (MultiRequest)param;
@@ -228,7 +228,7 @@ private boolean isWriteRequest(final RequestHeader header, final Message param)
228228
return false;
229229
}
230230

231-
private boolean isScanRequest(final RequestHeader header, final Message param) {
231+
protected boolean isScanRequest(final RequestHeader header, final Message param) {
232232
if (param instanceof ScanRequest) {
233233
// The first scan request will be executed as a "short read"
234234
ScanRequest request = (ScanRequest)param;
@@ -237,6 +237,14 @@ private boolean isScanRequest(final RequestHeader header, final Message param) {
237237
return false;
238238
}
239239

240+
protected float getReadShare(final Configuration conf) {
241+
return conf.getFloat(CALL_QUEUE_READ_SHARE_CONF_KEY, 0);
242+
}
243+
244+
protected float getScanShare(final Configuration conf) {
245+
return conf.getFloat(CALL_QUEUE_SCAN_SHARE_CONF_KEY, 0);
246+
}
247+
240248
/*
241249
* Calculate the number of writers based on the "total count" and the read share.
242250
* You'll get at least one writer.

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -100,10 +100,21 @@ public SimpleRpcScheduler(
100100
}
101101
}
102102

103-
// Create 2 queues to help priorityExecutor be more scalable.
104-
this.priorityExecutor = priorityHandlerCount > 0 ? new FastPathBalancedQueueRpcExecutor(
105-
"priority.FPBQ", priorityHandlerCount, RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE,
106-
maxPriorityQueueLength, priority, conf, abortable) : null;
103+
float metaCallqReadShare =
104+
conf.getFloat(MetaRWQueueRpcExecutor.META_CALL_QUEUE_READ_SHARE_CONF_KEY, 0.9f);
105+
if (metaCallqReadShare > 0) {
106+
// different read/write handler for meta, at least 1 read handler and 1 write handler
107+
this.priorityExecutor =
108+
new MetaRWQueueRpcExecutor("priority.RWQ", Math.max(2, priorityHandlerCount),
109+
maxPriorityQueueLength, priority, conf, server);
110+
} else {
111+
// Create 2 queues to help priorityExecutor be more scalable.
112+
this.priorityExecutor = priorityHandlerCount > 0 ?
113+
new FastPathBalancedQueueRpcExecutor("priority.FPBQ", priorityHandlerCount,
114+
RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
115+
abortable) :
116+
null;
117+
}
107118
this.replicationExecutor =
108119
replicationHandlerCount > 0
109120
? new FastPathBalancedQueueRpcExecutor("replication.FPBQ", replicationHandlerCount,

0 commit comments

Comments
 (0)