Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1063,6 +1063,9 @@ public enum OperationStatusCode {
public static final String REGION_SERVER_REPLICATION_HANDLER_COUNT =
"hbase.regionserver.replication.handler.count";
public static final int DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT = 3;
public static final String REGION_SERVER_BULKLOAD_HANDLER_COUNT =
"hbase.regionserver.bulkload.handler.count";
public static final int DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT = 0;
// Meta Transition handlers to deal with meta ReportRegionStateTransitionRequest. Meta transition
// should be dealt with in a separate handler in case blocking other region's transition.
public static final String MASTER_META_TRANSITION_HANDLER_COUNT =
Expand Down Expand Up @@ -1139,6 +1142,7 @@ public enum OperationStatusCode {
public static final int PRIORITY_UNSET = -1;
public static final int NORMAL_QOS = 0;
public static final int REPLICATION_QOS = 5;
public static final int BULKLOAD_QOS = 4;
/**
* @deprecated since 3.0.0, will be removed in 4.0.0. DLR has been purged for a long time and
* region replication has its own 'replay' method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
String METAPRIORITY_QUEUE_NAME = "numCallsInMetaPriorityQueue";
String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue";
String REPLICATION_QUEUE_DESC = "Number of calls in the replication call queue waiting to be run";
String BULKLOAD_QUEUE_NAME = "numCallsInBulkLoadQueue";
String BULKLOAD_QUEUE_DESC = "Number of calls in the bulkload call queue waiting to be run";
String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
String METAPRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run";
String WRITE_QUEUE_NAME = "numCallsInWriteQueue";
Expand All @@ -77,6 +79,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource {
String NUM_ACTIVE_PRIORITY_HANDLER_DESC = "Number of active priority rpc handlers.";
String NUM_ACTIVE_REPLICATION_HANDLER_NAME = "numActiveReplicationHandler";
String NUM_ACTIVE_REPLICATION_HANDLER_DESC = "Number of active replication rpc handlers.";
String NUM_ACTIVE_BULKLOAD_HANDLER_NAME = "numActiveBulkLoadHandler";
String NUM_ACTIVE_BULKLOAD_HANDLER_DESC = "Number of active bulkload rpc handlers.";
String NUM_ACTIVE_WRITE_HANDLER_NAME = "numActiveWriteHandler";
String NUM_ACTIVE_WRITE_HANDLER_DESC = "Number of active write rpc handlers.";
String NUM_ACTIVE_READ_HANDLER_NAME = "numActiveReadHandler";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) {
wrapper.getGeneralQueueLength())
.addGauge(Interns.info(REPLICATION_QUEUE_NAME, REPLICATION_QUEUE_DESC),
wrapper.getReplicationQueueLength())
.addGauge(Interns.info(BULKLOAD_QUEUE_NAME, BULKLOAD_QUEUE_DESC),
wrapper.getBulkLoadQueueLength())
.addGauge(Interns.info(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_DESC),
wrapper.getPriorityQueueLength())
.addGauge(Interns.info(METAPRIORITY_QUEUE_NAME, METAPRIORITY_QUEUE_DESC),
Expand All @@ -163,6 +165,8 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) {
.addCounter(Interns.info(NUM_LIFO_MODE_SWITCHES_NAME, NUM_LIFO_MODE_SWITCHES_DESC),
wrapper.getNumLifoModeSwitches())
.addGauge(Interns.info(WRITE_QUEUE_NAME, WRITE_QUEUE_DESC), wrapper.getWriteQueueLength())
.addGauge(Interns.info(NUM_ACTIVE_BULKLOAD_HANDLER_NAME, NUM_ACTIVE_BULKLOAD_HANDLER_DESC),
wrapper.getActiveBulkLoadRpcHandlerCount())
.addGauge(Interns.info(READ_QUEUE_NAME, READ_QUEUE_DESC), wrapper.getReadQueueLength())
.addGauge(Interns.info(SCAN_QUEUE_NAME, SCAN_QUEUE_DESC), wrapper.getScanQueueLength())
.addGauge(Interns.info(NUM_ACTIVE_WRITE_HANDLER_NAME, NUM_ACTIVE_WRITE_HANDLER_DESC),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ public interface MetricsHBaseServerWrapper {

int getReplicationQueueLength();

int getBulkLoadQueueLength();

int getPriorityQueueLength();

int getMetaPriorityQueueLength();
Expand All @@ -41,6 +43,8 @@ public interface MetricsHBaseServerWrapper {

int getActiveReplicationRpcHandlerCount();

int getActiveBulkLoadRpcHandlerCount();

int getActiveMetaPriorityRpcHandlerCount();

long getNumGeneralCallsDropped();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,11 @@ public int getReplicationQueueLength() {
return 0;
}

@Override
public int getBulkLoadQueueLength() {
return 0;
}

@Override
public int getActiveRpcHandlerCount() {
return executor.getActiveCount();
Expand All @@ -150,6 +155,11 @@ public int getActiveReplicationRpcHandlerCount() {
return 0;
}

@Override
public int getActiveBulkLoadRpcHandlerCount() {
return 0;
}

@Override
public int getActiveMetaPriorityRpcHandlerCount() {
return 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ public int getReplicationQueueLength() {
return server.getScheduler().getReplicationQueueLength();
}

@Override
public int getBulkLoadQueueLength() {
if (!isServerStarted() || this.server.getScheduler() == null) {
return 0;
}
return server.getScheduler().getBulkLoadQueueLength();
}

@Override
public int getPriorityQueueLength() {
if (!isServerStarted() || this.server.getScheduler() == null) {
Expand Down Expand Up @@ -121,6 +129,14 @@ public int getActiveReplicationRpcHandlerCount() {
return server.getScheduler().getActiveReplicationRpcHandlerCount();
}

@Override
public int getActiveBulkLoadRpcHandlerCount() {
if (!isServerStarted() || this.server.getScheduler() == null) {
return 0;
}
return server.getScheduler().getActiveBulkLoadRpcHandlerCount();
}

@Override
public long getNumGeneralCallsDropped() {
if (!isServerStarted() || this.server.getScheduler() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ public abstract class RpcScheduler {
"hbase.ipc.server.priority.max.callqueue.length";
public static final String IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH =
"hbase.ipc.server.replication.max.callqueue.length";
public static final String IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH =
"hbase.ipc.server.bulkload.max.callqueue.length";

/** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */
public static abstract class Context {
Expand Down Expand Up @@ -78,6 +80,9 @@ public static abstract class Context {
/** Retrieves length of the replication queue for metrics. */
public abstract int getReplicationQueueLength();

/** Retrieves length of the bulkload queue for metrics. */
public abstract int getBulkLoadQueueLength();

/** Retrieves the total number of active handler. */
public abstract int getActiveRpcHandlerCount();

Expand All @@ -93,6 +98,9 @@ public static abstract class Context {
/** Retrieves the number of active replication handler. */
public abstract int getActiveReplicationRpcHandlerCount();

/** Retrieves the number of active bulkload handler. */
public abstract int getActiveBulkLoadRpcHandlerCount();

/**
* If CoDel-based RPC executors are used, retrieves the number of Calls that were dropped from
* general queue because RPC executor is under high load; returns 0 otherwise.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
*/
private final RpcExecutor metaTransitionExecutor;

private final RpcExecutor bulkloadExecutor;

/** What level a high priority call is at. */
private final int highPriorityLevel;

Expand All @@ -63,14 +65,17 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority,
Abortable server, int highPriorityLevel) {

int bulkLoadHandlerCount = conf.getInt(HConstants.REGION_SERVER_BULKLOAD_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT);
int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH,
handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH,
priorityHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
int maxReplicationQueueLength =
conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH,
replicationHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);
int maxBulkLoadQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH,
bulkLoadHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER);

this.priority = priority;
this.highPriorityLevel = highPriorityLevel;
Expand Down Expand Up @@ -122,6 +127,11 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand
RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf,
abortable)
: null;
this.bulkloadExecutor = bulkLoadHandlerCount > 0
? new FastPathBalancedQueueRpcExecutor("bulkLoad.FPBQ", bulkLoadHandlerCount,
RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxBulkLoadQueueLength, priority, conf,
abortable)
: null;
}

public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount,
Expand Down Expand Up @@ -173,6 +183,9 @@ public void start() {
if (metaTransitionExecutor != null) {
metaTransitionExecutor.start(port);
}
if (bulkloadExecutor != null) {
bulkloadExecutor.start(port);
}

}

Expand All @@ -188,6 +201,9 @@ public void stop() {
if (metaTransitionExecutor != null) {
metaTransitionExecutor.stop();
}
if (bulkloadExecutor != null) {
bulkloadExecutor.stop();
}

}

Expand All @@ -208,6 +224,8 @@ public boolean dispatch(CallRunner callTask) {
return priorityExecutor.dispatch(callTask);
} else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) {
return replicationExecutor.dispatch(callTask);
} else if (bulkloadExecutor != null && level == HConstants.BULKLOAD_QOS) {
return bulkloadExecutor.dispatch(callTask);
} else {
return callExecutor.dispatch(callTask);
}
Expand All @@ -233,10 +251,16 @@ public int getReplicationQueueLength() {
return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength();
}

@Override
public int getBulkLoadQueueLength() {
return bulkloadExecutor == null ? 0 : bulkloadExecutor.getQueueLength();
}

@Override
public int getActiveRpcHandlerCount() {
return callExecutor.getActiveHandlerCount() + getActivePriorityRpcHandlerCount()
+ getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount();
+ getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount()
+ getActiveBulkLoadRpcHandlerCount();
}

@Override
Expand All @@ -259,6 +283,11 @@ public int getActiveReplicationRpcHandlerCount() {
return (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount());
}

@Override
public int getActiveBulkLoadRpcHandlerCount() {
return bulkloadExecutor == null ? 0 : bulkloadExecutor.getActiveHandlerCount();
}

@Override
public long getNumGeneralCallsDropped() {
return callExecutor.getNumGeneralCallsDropped();
Expand Down Expand Up @@ -330,6 +359,12 @@ public CallQueueInfo getCallQueueInfo() {
callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary());
}

if (null != bulkloadExecutor) {
queueName = "BulkLoad Queue";
callQueueInfo.setCallMethodCount(queueName, bulkloadExecutor.getCallQueueCountsSummary());
callQueueInfo.setCallMethodSize(queueName, bulkloadExecutor.getCallQueueSizeSummary());
}

return callQueueInfo;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
Expand Down Expand Up @@ -96,6 +97,10 @@ protected int getBasePriority(RequestHeader header, Message param) {
if (header.hasPriority()) {
return header.getPriority();
}
if (param instanceof BulkLoadHFileRequest) {
return HConstants.BULKLOAD_QOS;
}

String cls = param.getClass().getName();
Class<? extends Message> rpcArgClass = argumentToClassMap.get(cls);
RegionSpecifier regionSpecifier = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@ public int getReplicationQueueLength() {
return delegate.getReplicationQueueLength();
}

@Override
public int getBulkLoadQueueLength() {
return delegate.getBulkLoadQueueLength();
}

@Override
public int getPriorityQueueLength() {
return delegate.getPriorityQueueLength();
Expand Down Expand Up @@ -74,6 +79,11 @@ public int getActiveReplicationRpcHandlerCount() {
return delegate.getActiveReplicationRpcHandlerCount();
}

@Override
public int getActiveBulkLoadRpcHandlerCount() {
return delegate.getActiveBulkLoadRpcHandlerCount();
}

@Override
public boolean dispatch(CallRunner task) {
return delegate.dispatch(task);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ public int getReplicationQueueLength() {
return 103;
}

@Override
public int getBulkLoadQueueLength() {
return 109;
}

@Override
public int getPriorityQueueLength() {
return 104;
Expand Down Expand Up @@ -63,6 +68,11 @@ public int getActiveReplicationRpcHandlerCount() {
return 203;
}

@Override
public int getActiveBulkLoadRpcHandlerCount() {
return 204;
}

@Override
public long getNumGeneralCallsDropped() {
return 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -480,12 +480,21 @@ public void testScanQueues() throws Exception {
when(scanCall.getHeader()).thenReturn(scanHead);
when(scanCall.getParam()).thenReturn(scanCall.param);

CallRunner bulkLoadCallTask = mock(CallRunner.class);
ServerCall bulkLoadCall = mock(ServerCall.class);
bulkLoadCall.param = ScanRequest.newBuilder().build();
RequestHeader bulkLadHead = RequestHeader.newBuilder().setMethodName("bulkload").build();
when(bulkLoadCallTask.getRpcCall()).thenReturn(bulkLoadCall);
when(bulkLoadCall.getHeader()).thenReturn(bulkLadHead);
when(bulkLoadCall.getParam()).thenReturn(bulkLoadCall.param);

ArrayList<Integer> work = new ArrayList<>();
doAnswerTaskExecution(putCallTask, work, 1, 1000);
doAnswerTaskExecution(getCallTask, work, 2, 1000);
doAnswerTaskExecution(scanCallTask, work, 3, 1000);
doAnswerTaskExecution(bulkLoadCallTask, work, 4, 1000);

// There are 3 queues: [puts], [gets], [scans]
// There are 3 queues: [puts], [gets], [scans], [bulkload]
// so the calls will be interleaved
scheduler.dispatch(putCallTask);
scheduler.dispatch(putCallTask);
Expand All @@ -496,7 +505,9 @@ public void testScanQueues() throws Exception {
scheduler.dispatch(scanCallTask);
scheduler.dispatch(scanCallTask);
scheduler.dispatch(scanCallTask);

scheduler.dispatch(bulkLoadCallTask);
scheduler.dispatch(bulkLoadCallTask);
scheduler.dispatch(bulkLoadCallTask);
while (work.size() < 6) {
Thread.sleep(100);
}
Expand Down