Skip to content

Commit 2bdb8b3

Browse files
committed
HBASE-27144 Improvement bulkload rpc
1 parent 5cf728d commit 2bdb8b3

File tree

13 files changed

+252
-10
lines changed

13 files changed

+252
-10
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1063,6 +1063,13 @@ public enum OperationStatusCode {
10631063
public static final String REGION_SERVER_REPLICATION_HANDLER_COUNT =
10641064
"hbase.regionserver.replication.handler.count";
10651065
public static final int DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT = 3;
1066+
public static final String REGION_SERVER_BULKLOAD_HANDLER_COUNT =
1067+
"hbase.regionserver.bulkload.handler.count";
1068+
public static final int DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT = 1;
1069+
public static final String HBASE_REGIONSERVER_BULKLOAD_INDEPENDENCE_QUEUE_ENABLED =
1070+
"hbase.regionserver.bulkload.independence.queue.enabled";
1071+
public static final boolean DEFAULT_HBASE_REGIONSERVER_BULKLOAD_INDEPENDENCE_QUEUE_ENABLED =
1072+
false;
10661073
// Meta Transition handlers to deal with meta ReportRegionStateTransitionRequest. Meta transition
10671074
// should be dealt with in a separate handler in case blocking other region's transition.
10681075
public static final String MASTER_META_TRANSITION_HANDLER_COUNT =
@@ -1135,10 +1142,11 @@ public enum OperationStatusCode {
11351142
* QOS attributes: these attributes are used to demarcate RPC call processing by different set of
11361143
* handlers. For example, HIGH_QOS tagged methods are handled by high priority handlers.
11371144
*/
1138-
// normal_QOS < replication_QOS < replay_QOS < QOS_threshold < admin_QOS < high_QOS < meta_QOS
1145+
// normal_QOS < bulkload_QOS < replication_QOS < replay_QOS < QOS_threshold < admin_QOS < high_QOS < meta_QOS
11391146
public static final int PRIORITY_UNSET = -1;
11401147
public static final int NORMAL_QOS = 0;
11411148
public static final int REPLICATION_QOS = 5;
1149+
public static final int BULKLOAD_QOS = 4;
11421150
/**
11431151
* @deprecated since 3.0.0, will be removed in 4.0.0. DLR has been purged for a long time and
11441152
* region replication has its own 'replay' method.

hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,9 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
5656
String METAPRIORITY_QUEUE_NAME = "numCallsInMetaPriorityQueue";
5757
String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue";
5858
String REPLICATION_QUEUE_DESC = "Number of calls in the replication call queue waiting to be run";
59+
String BULKLOAD_QUEUE_NAME = "numCallsInBulkLoadQueue";
60+
String BULKLOAD_QUEUE_DESC =
61+
"Number of calls in the bulkload call queue waiting to be run";
5962
String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
6063
String METAPRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
6164
String WRITE_QUEUE_NAME = "numCallsInWriteQueue";
@@ -77,6 +80,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
7780
String NUM_ACTIVE_PRIORITY_HANDLER_DESC = "Number of active priority rpc handlers.";
7881
String NUM_ACTIVE_REPLICATION_HANDLER_NAME = "numActiveReplicationHandler";
7982
String NUM_ACTIVE_REPLICATION_HANDLER_DESC = "Number of active replication rpc handlers.";
83+
String NUM_ACTIVE_BULKLOAD_HANDLER_NAME = "numActiveBulkLoadHandler";
84+
String NUM_ACTIVE_BULKLOAD_HANDLER_DESC = "Number of active bulkload rpc handlers.";
8085
String NUM_ACTIVE_WRITE_HANDLER_NAME = "numActiveWriteHandler";
8186
String NUM_ACTIVE_WRITE_HANDLER_DESC = "Number of active write rpc handlers.";
8287
String NUM_ACTIVE_READ_HANDLER_NAME = "numActiveReadHandler";

hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,8 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) {
143143
wrapper.getGeneralQueueLength())
144144
.addGauge(Interns.info(REPLICATION_QUEUE_NAME, REPLICATION_QUEUE_DESC),
145145
wrapper.getReplicationQueueLength())
146+
.addGauge(Interns.info(BULKLOAD_QUEUE_NAME, BULKLOAD_QUEUE_DESC),
147+
wrapper.getBulkLoadQueueLength())
146148
.addGauge(Interns.info(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_DESC),
147149
wrapper.getPriorityQueueLength())
148150
.addGauge(Interns.info(METAPRIORITY_QUEUE_NAME, METAPRIORITY_QUEUE_DESC),
@@ -163,6 +165,9 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) {
163165
.addCounter(Interns.info(NUM_LIFO_MODE_SWITCHES_NAME, NUM_LIFO_MODE_SWITCHES_DESC),
164166
wrapper.getNumLifoModeSwitches())
165167
.addGauge(Interns.info(WRITE_QUEUE_NAME, WRITE_QUEUE_DESC), wrapper.getWriteQueueLength())
168+
.addGauge(
169+
Interns.info(NUM_ACTIVE_BULKLOAD_HANDLER_NAME, NUM_ACTIVE_BULKLOAD_HANDLER_DESC),
170+
wrapper.getActiveBulkLoadRpcHandlerCount())
166171
.addGauge(Interns.info(READ_QUEUE_NAME, READ_QUEUE_DESC), wrapper.getReadQueueLength())
167172
.addGauge(Interns.info(SCAN_QUEUE_NAME, SCAN_QUEUE_DESC), wrapper.getScanQueueLength())
168173
.addGauge(Interns.info(NUM_ACTIVE_WRITE_HANDLER_NAME, NUM_ACTIVE_WRITE_HANDLER_DESC),

hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@ public interface MetricsHBaseServerWrapper {
2727

2828
int getReplicationQueueLength();
2929

30+
int getBulkLoadQueueLength();
31+
3032
int getPriorityQueueLength();
3133

3234
int getMetaPriorityQueueLength();
@@ -41,6 +43,8 @@ public interface MetricsHBaseServerWrapper {
4143

4244
int getActiveReplicationRpcHandlerCount();
4345

46+
int getActiveBulkLoadRpcHandlerCount();
47+
4448
int getActiveMetaPriorityRpcHandlerCount();
4549

4650
long getNumGeneralCallsDropped();

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,10 @@ public int getReplicationQueueLength() {
130130
return 0;
131131
}
132132

133+
@Override public int getBulkLoadQueueLength() {
134+
return 0;
135+
}
136+
133137
@Override
134138
public int getActiveRpcHandlerCount() {
135139
return executor.getActiveCount();
@@ -150,6 +154,10 @@ public int getActiveReplicationRpcHandlerCount() {
150154
return 0;
151155
}
152156

157+
@Override public int getActiveBulkLoadRpcHandlerCount() {
158+
return 0;
159+
}
160+
153161
@Override
154162
public int getActiveMetaPriorityRpcHandlerCount() {
155163
return 0;

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,13 @@ public int getReplicationQueueLength() {
5757
return server.getScheduler().getReplicationQueueLength();
5858
}
5959

60+
@Override public int getBulkLoadQueueLength() {
61+
if (!isServerStarted() || this.server.getScheduler() == null) {
62+
return 0;
63+
}
64+
return server.getScheduler().getBulkLoadQueueLength();
65+
}
66+
6067
@Override
6168
public int getPriorityQueueLength() {
6269
if (!isServerStarted() || this.server.getScheduler() == null) {
@@ -121,6 +128,13 @@ public int getActiveReplicationRpcHandlerCount() {
121128
return server.getScheduler().getActiveReplicationRpcHandlerCount();
122129
}
123130

131+
@Override public int getActiveBulkLoadRpcHandlerCount() {
132+
if (!isServerStarted() || this.server.getScheduler() == null) {
133+
return 0;
134+
}
135+
return server.getScheduler().getActiveBulkLoadRpcHandlerCount();
136+
}
137+
124138
@Override
125139
public long getNumGeneralCallsDropped() {
126140
if (!isServerStarted() || this.server.getScheduler() == null) {

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@ public abstract class RpcScheduler {
3535
"hbase.ipc.server.priority.max.callqueue.length";
3636
public static final String IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH =
3737
"hbase.ipc.server.replication.max.callqueue.length";
38+
public static final String IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH =
39+
"hbase.ipc.server.bulkload.max.callqueue.length";
3840

3941
/** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */
4042
public static abstract class Context {
@@ -78,6 +80,9 @@ public static abstract class Context {
7880
/** Retrieves length of the replication queue for metrics. */
7981
public abstract int getReplicationQueueLength();
8082

83+
/** Retrieves length of the bulkload queue for metrics. */
84+
public abstract int getBulkLoadQueueLength();
85+
8186
/** Retrieves the total number of active handler. */
8287
public abstract int getActiveRpcHandlerCount();
8388

@@ -93,6 +98,9 @@ public static abstract class Context {
9398
/** Retrieves the number of active replication handler. */
9499
public abstract int getActiveReplicationRpcHandlerCount();
95100

101+
/** Retrieves the number of active bulkload handler. */
102+
public abstract int getActiveBulkLoadRpcHandlerCount();
103+
96104
/**
97105
* If CoDel-based RPC executors are used, retrieves the number of Calls that were dropped from
98106
* general queue because RPC executor is under high load; returns 0 otherwise.

hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java

Lines changed: 60 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,38 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
4949
*/
5050
private final RpcExecutor metaTransitionExecutor;
5151

52+
private RpcExecutor bulkloadExecutor;
53+
5254
/** What level a high priority call is at. */
5355
private final int highPriorityLevel;
5456

5557
private Abortable abortable = null;
5658

59+
/**
60+
* @param conf conf
61+
* @param handlerCount the number of handler threads that will be used to process calls
62+
* @param priorityHandlerCount How many threads for priority handling.
63+
* @param replicationHandlerCount How many threads for replication handling.
64+
* @param metaTransitionHandler How many threads for meta handling.
65+
* @param bulkLoadHandlerCount How many threads for bulkload handling.
66+
* @param highPriorityLevel high Priority Level
67+
* @param priority Function to extract request priority.
* @param server Abortable forwarded unchanged to the delegated constructor.
68+
*/
69+
public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
70+
int replicationHandlerCount, int metaTransitionHandler, int bulkLoadHandlerCount,
71+
PriorityFunction priority, Abortable server, int highPriorityLevel) {
72+
this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount,
73+
metaTransitionHandler, priority, server, highPriorityLevel);
74+
int maxBulkLoadQueueLength =
75+
conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH,
76+
bulkLoadHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
77+
this.bulkloadExecutor = bulkLoadHandlerCount > 0 ?
78+
new FastPathBalancedQueueRpcExecutor("bulkLoad.FPBQ", bulkLoadHandlerCount,
79+
RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxBulkLoadQueueLength, priority, conf,
80+
abortable) :
81+
null;
82+
}
83+
5784
/**
5885
* @param handlerCount the number of handler threads that will be used to process calls
5986
* @param priorityHandlerCount How many threads for priority handling.
@@ -130,6 +157,14 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand
130157
highPriorityLevel);
131158
}
132159

160+
public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
161+
int replicationHandlerCount, int bulkLoadHandlerCount,
162+
PriorityFunction priority, int highPriorityLevel) {
163+
this(conf, handlerCount, priorityHandlerCount, replicationHandlerCount,
164+
0, bulkLoadHandlerCount, priority, null,
165+
highPriorityLevel);
166+
}
167+
133168
/**
134169
* Resize call queues;
135170
* @param conf new configuration
@@ -173,6 +208,9 @@ public void start() {
173208
if (metaTransitionExecutor != null) {
174209
metaTransitionExecutor.start(port);
175210
}
211+
if (bulkloadExecutor != null) {
212+
bulkloadExecutor.start(port);
213+
}
176214

177215
}
178216

@@ -188,6 +226,9 @@ public void stop() {
188226
if (metaTransitionExecutor != null) {
189227
metaTransitionExecutor.stop();
190228
}
229+
if (bulkloadExecutor != null) {
230+
bulkloadExecutor.stop();
231+
}
191232

192233
}
193234

@@ -208,6 +249,8 @@ public boolean dispatch(CallRunner callTask) {
208249
return priorityExecutor.dispatch(callTask);
209250
} else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
210251
return replicationExecutor.dispatch(callTask);
252+
} else if (bulkloadExecutor != null && level == HConstants.BULKLOAD_QOS) {
253+
return bulkloadExecutor.dispatch(callTask);
211254
} else {
212255
return callExecutor.dispatch(callTask);
213256
}
@@ -233,10 +276,15 @@ public int getReplicationQueueLength() {
233276
return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
234277
}
235278

279+
@Override public int getBulkLoadQueueLength() {
280+
return bulkloadExecutor == null ? 0 : bulkloadExecutor.getQueueLength();
281+
}
282+
236283
@Override
237284
public int getActiveRpcHandlerCount() {
238285
return callExecutor.getActiveHandlerCount() + getActivePriorityRpcHandlerCount()
239-
+ getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount();
286+
+ getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount()
287+
+ getActiveBulkLoadRpcHandlerCount();
240288
}
241289

242290
@Override
@@ -259,6 +307,10 @@ public int getActiveReplicationRpcHandlerCount() {
259307
return (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
260308
}
261309

310+
@Override public int getActiveBulkLoadRpcHandlerCount() {
311+
return bulkloadExecutor == null ? 0 : bulkloadExecutor.getActiveHandlerCount();
312+
}
313+
262314
@Override
263315
public long getNumGeneralCallsDropped() {
264316
return callExecutor.getNumGeneralCallsDropped();
@@ -330,6 +382,13 @@ public CallQueueInfo getCallQueueInfo() {
330382
callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary());
331383
}
332384

385+
if (null != bulkloadExecutor) {
386+
queueName = "BulkLoad Queue";
387+
callQueueInfo.setCallMethodCount(queueName,
388+
bulkloadExecutor.getCallQueueCountsSummary());
389+
callQueueInfo.setCallMethodSize(queueName, bulkloadExecutor.getCallQueueSizeSummary());
390+
}
391+
333392
return callQueueInfo;
334393
}
335394

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSAnnotationReadingPriorityFunction.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
3939
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
4040
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
41+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
4142
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
4243
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
4344

@@ -96,6 +97,10 @@ protected int getBasePriority(RequestHeader header, Message param) {
9697
if (header.hasPriority()) {
9798
return header.getPriority();
9899
}
100+
if (param instanceof BulkLoadHFileRequest) {
101+
return HConstants.BULKLOAD_QOS;
102+
}
103+
99104
String cls = param.getClass().getName();
100105
Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
101106
RegionSpecifier regionSpecifier = null;

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SimpleRpcSchedulerFactory.java

Lines changed: 24 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,13 +37,29 @@ public class SimpleRpcSchedulerFactory implements RpcSchedulerFactory {
3737
public RpcScheduler create(Configuration conf, PriorityFunction priority, Abortable server) {
3838
int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
3939
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
40-
return new SimpleRpcScheduler(conf, handlerCount,
41-
conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
42-
HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
43-
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
44-
HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
45-
conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
46-
HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT),
47-
priority, server, HConstants.QOS_THRESHOLD);
40+
boolean bulkLoadIndependenceQueueEnabled = conf.getBoolean(
41+
HConstants.HBASE_REGIONSERVER_BULKLOAD_INDEPENDENCE_QUEUE_ENABLED,
42+
HConstants.DEFAULT_HBASE_REGIONSERVER_BULKLOAD_INDEPENDENCE_QUEUE_ENABLED);
43+
if (bulkLoadIndependenceQueueEnabled) {
44+
return new SimpleRpcScheduler(conf, handlerCount,
45+
conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
46+
HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
47+
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
48+
HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
49+
conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
50+
HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT),
51+
conf.getInt(HConstants.REGION_SERVER_BULKLOAD_HANDLER_COUNT,
52+
HConstants.DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT),
53+
priority, server, HConstants.QOS_THRESHOLD);
54+
} else {
55+
return new SimpleRpcScheduler(conf, handlerCount,
56+
conf.getInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
57+
HConstants.DEFAULT_REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT),
58+
conf.getInt(HConstants.REGION_SERVER_REPLICATION_HANDLER_COUNT,
59+
HConstants.DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT),
60+
conf.getInt(HConstants.MASTER_META_TRANSITION_HANDLER_COUNT,
61+
HConstants.MASTER__META_TRANSITION_HANDLER_COUNT_DEFAULT),
62+
priority, server, HConstants.QOS_THRESHOLD);
63+
}
4864
}
4965
}

0 commit comments

Comments
 (0)