From c0f5d95dbca638b52712ec7763eda342fff279ce Mon Sep 17 00:00:00 2001 From: SiCheng-Zheng Date: Mon, 11 Jul 2022 17:58:33 +0800 Subject: [PATCH] HBASE-27144 Improvement bulkload rpc --- .../org/apache/hadoop/hbase/HConstants.java | 4 ++ .../hbase/ipc/MetricsHBaseServerSource.java | 4 ++ .../ipc/MetricsHBaseServerSourceImpl.java | 4 ++ .../hbase/ipc/MetricsHBaseServerWrapper.java | 4 ++ .../hadoop/hbase/ipc/FifoRpcScheduler.java | 10 +++++ .../ipc/MetricsHBaseServerWrapperImpl.java | 16 ++++++++ .../apache/hadoop/hbase/ipc/RpcScheduler.java | 8 ++++ .../hadoop/hbase/ipc/SimpleRpcScheduler.java | 39 ++++++++++++++++++- .../RSAnnotationReadingPriorityFunction.java | 5 +++ .../hbase/ipc/DelegatingRpcScheduler.java | 10 +++++ .../ipc/MetricsHBaseServerWrapperStub.java | 10 +++++ .../hbase/ipc/TestSimpleRpcScheduler.java | 15 ++++++- 12 files changed, 125 insertions(+), 4 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java index f4d43a2da291..421fd9b408cb 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java @@ -1063,6 +1063,9 @@ public enum OperationStatusCode { public static final String REGION_SERVER_REPLICATION_HANDLER_COUNT = "hbase.regionserver.replication.handler.count"; public static final int DEFAULT_REGION_SERVER_REPLICATION_HANDLER_COUNT = 3; + public static final String REGION_SERVER_BULKLOAD_HANDLER_COUNT = + "hbase.regionserver.bulkload.handler.count"; + public static final int DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT = 0; // Meta Transition handlers to deal with meta ReportRegionStateTransitionRequest. Meta transition // should be dealt with in a separate handler in case blocking other region's transition. public static final String MASTER_META_TRANSITION_HANDLER_COUNT = @@ -1139,6 +1142,7 @@ public enum OperationStatusCode { public static final int PRIORITY_UNSET = -1; public static final int NORMAL_QOS = 0; public static final int REPLICATION_QOS = 5; + public static final int BULKLOAD_QOS = 4; /** * @deprecated since 3.0.0, will be removed in 4.0.0. DLR has been purged for a long time and * region replication has its own 'replay' method. diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java index a1ec313f97a3..98ecf8b8d92d 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSource.java @@ -56,6 +56,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource { String METAPRIORITY_QUEUE_NAME = "numCallsInMetaPriorityQueue"; String REPLICATION_QUEUE_NAME = "numCallsInReplicationQueue"; String REPLICATION_QUEUE_DESC = "Number of calls in the replication call queue waiting to be run"; + String BULKLOAD_QUEUE_NAME = "numCallsInBulkLoadQueue"; + String BULKLOAD_QUEUE_DESC = "Number of calls in the bulkload call queue waiting to be run"; String PRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run"; String METAPRIORITY_QUEUE_DESC = "Number of calls in the priority call queue waiting to be run"; String WRITE_QUEUE_NAME = "numCallsInWriteQueue"; @@ -77,6 +79,8 @@ public interface MetricsHBaseServerSource extends ExceptionTrackingSource { String NUM_ACTIVE_PRIORITY_HANDLER_DESC = "Number of active priority rpc handlers."; String NUM_ACTIVE_REPLICATION_HANDLER_NAME = "numActiveReplicationHandler"; String NUM_ACTIVE_REPLICATION_HANDLER_DESC = "Number of active replication rpc handlers."; + String NUM_ACTIVE_BULKLOAD_HANDLER_NAME = "numActiveBulkLoadHandler"; + String NUM_ACTIVE_BULKLOAD_HANDLER_DESC = "Number of active bulkload rpc handlers."; String NUM_ACTIVE_WRITE_HANDLER_NAME = "numActiveWriteHandler"; String NUM_ACTIVE_WRITE_HANDLER_DESC = "Number of active write rpc handlers."; String NUM_ACTIVE_READ_HANDLER_NAME = "numActiveReadHandler"; diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java index 440ebc6f5a6c..9c75f4e6bcba 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerSourceImpl.java @@ -143,6 +143,8 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) { wrapper.getGeneralQueueLength()) .addGauge(Interns.info(REPLICATION_QUEUE_NAME, REPLICATION_QUEUE_DESC), wrapper.getReplicationQueueLength()) + .addGauge(Interns.info(BULKLOAD_QUEUE_NAME, BULKLOAD_QUEUE_DESC), + wrapper.getBulkLoadQueueLength()) .addGauge(Interns.info(PRIORITY_QUEUE_NAME, PRIORITY_QUEUE_DESC), wrapper.getPriorityQueueLength()) .addGauge(Interns.info(METAPRIORITY_QUEUE_NAME, METAPRIORITY_QUEUE_DESC), @@ -163,6 +165,8 @@ public void getMetrics(MetricsCollector metricsCollector, boolean all) { .addCounter(Interns.info(NUM_LIFO_MODE_SWITCHES_NAME, NUM_LIFO_MODE_SWITCHES_DESC), wrapper.getNumLifoModeSwitches()) .addGauge(Interns.info(WRITE_QUEUE_NAME, WRITE_QUEUE_DESC), wrapper.getWriteQueueLength()) + .addGauge(Interns.info(NUM_ACTIVE_BULKLOAD_HANDLER_NAME, NUM_ACTIVE_BULKLOAD_HANDLER_DESC), + wrapper.getActiveBulkLoadRpcHandlerCount()) .addGauge(Interns.info(READ_QUEUE_NAME, READ_QUEUE_DESC), wrapper.getReadQueueLength()) .addGauge(Interns.info(SCAN_QUEUE_NAME, SCAN_QUEUE_DESC), wrapper.getScanQueueLength()) .addGauge(Interns.info(NUM_ACTIVE_WRITE_HANDLER_NAME, NUM_ACTIVE_WRITE_HANDLER_DESC), diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java index 136294883b69..1a8980bbc7bd 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapper.java @@ -27,6 +27,8 @@ public interface MetricsHBaseServerWrapper { int getReplicationQueueLength(); + int getBulkLoadQueueLength(); + int getPriorityQueueLength(); int getMetaPriorityQueueLength(); @@ -41,6 +43,8 @@ public interface MetricsHBaseServerWrapper { int getActiveReplicationRpcHandlerCount(); + int getActiveBulkLoadRpcHandlerCount(); + int getActiveMetaPriorityRpcHandlerCount(); long getNumGeneralCallsDropped(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java index b1b2193d5b9c..b51154fc24eb 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FifoRpcScheduler.java @@ -130,6 +130,11 @@ public int getReplicationQueueLength() { return 0; } + @Override + public int getBulkLoadQueueLength() { + return 0; + } + @Override public int getActiveRpcHandlerCount() { return executor.getActiveCount(); @@ -150,6 +155,11 @@ public int getActiveReplicationRpcHandlerCount() { return 0; } + @Override + public int getActiveBulkLoadRpcHandlerCount() { + return 0; + } + @Override public int getActiveMetaPriorityRpcHandlerCount() { return 0; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java index 0b00bba04fb7..857315568c5e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperImpl.java @@ -57,6 +57,14 @@ public int getReplicationQueueLength() { return server.getScheduler().getReplicationQueueLength(); } + @Override + public int getBulkLoadQueueLength() { + if (!isServerStarted() || this.server.getScheduler() == null) { + return 0; + } + return server.getScheduler().getBulkLoadQueueLength(); + } + @Override public int getPriorityQueueLength() { if (!isServerStarted() || this.server.getScheduler() == null) { @@ -121,6 +129,14 @@ public int getActiveReplicationRpcHandlerCount() { return server.getScheduler().getActiveReplicationRpcHandlerCount(); } + @Override + public int getActiveBulkLoadRpcHandlerCount() { + if (!isServerStarted() || this.server.getScheduler() == null) { + return 0; + } + return server.getScheduler().getActiveBulkLoadRpcHandlerCount(); + } + @Override public long getNumGeneralCallsDropped() { if (!isServerStarted() || this.server.getScheduler() == null) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java index 0d0a1f8659c3..f73590a96c70 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcScheduler.java @@ -35,6 +35,8 @@ public abstract class RpcScheduler { "hbase.ipc.server.priority.max.callqueue.length"; public static final String IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH = "hbase.ipc.server.replication.max.callqueue.length"; + public static final String IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH = + "hbase.ipc.server.bulkload.max.callqueue.length"; /** Exposes runtime information of a {@code RpcServer} that a {@code RpcScheduler} may need. */ public static abstract class Context { @@ -78,6 +80,9 @@ public static abstract class Context { /** Retrieves length of the replication queue for metrics. */ public abstract int getReplicationQueueLength(); + /** Retrieves length of the bulkload queue for metrics. */ + public abstract int getBulkLoadQueueLength(); + /** Retrieves the total number of active handler. */ public abstract int getActiveRpcHandlerCount(); @@ -93,6 +98,9 @@ public static abstract class Context { /** Retrieves the number of active replication handler. */ public abstract int getActiveReplicationRpcHandlerCount(); + /** Retrieves the number of active bulkload handler. */ + public abstract int getActiveBulkLoadRpcHandlerCount(); + /** * If CoDel-based RPC executors are used, retrieves the number of Calls that were dropped from * general queue because RPC executor is under high load; returns 0 otherwise. diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 2d0c8f1f62ac..93c3e85b98d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -49,6 +49,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs */ private final RpcExecutor metaTransitionExecutor; + private final RpcExecutor bulkloadExecutor; + /** What level a high priority call is at. */ private final int highPriorityLevel; @@ -63,7 +65,8 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, int replicationHandlerCount, int metaTransitionHandler, PriorityFunction priority, Abortable server, int highPriorityLevel) { - + int bulkLoadHandlerCount = conf.getInt(HConstants.REGION_SERVER_BULKLOAD_HANDLER_COUNT, + HConstants.DEFAULT_REGION_SERVER_BULKLOAD_HANDLER_COUNT); int maxQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_MAX_CALLQUEUE_LENGTH, handlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); int maxPriorityQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH, @@ -71,6 +74,8 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand int maxReplicationQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_REPLICATION_MAX_CALLQUEUE_LENGTH, replicationHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); + int maxBulkLoadQueueLength = conf.getInt(RpcScheduler.IPC_SERVER_BULKLOAD_MAX_CALLQUEUE_LENGTH, + bulkLoadHandlerCount * RpcServer.DEFAULT_MAX_CALLQUEUE_LENGTH_PER_HANDLER); this.priority = priority; this.highPriorityLevel = highPriorityLevel; @@ -122,6 +127,11 @@ public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHand RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxPriorityQueueLength, priority, conf, abortable) : null; + this.bulkloadExecutor = bulkLoadHandlerCount > 0 + ? new FastPathBalancedQueueRpcExecutor("bulkLoad.FPBQ", bulkLoadHandlerCount, + RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE, maxBulkLoadQueueLength, priority, conf, + abortable) + : null; } public SimpleRpcScheduler(Configuration conf, int handlerCount, int priorityHandlerCount, @@ -173,6 +183,9 @@ public void start() { if (metaTransitionExecutor != null) { metaTransitionExecutor.start(port); } + if (bulkloadExecutor != null) { + bulkloadExecutor.start(port); + } } @@ -188,6 +201,9 @@ public void stop() { if (metaTransitionExecutor != null) { metaTransitionExecutor.stop(); } + if (bulkloadExecutor != null) { + bulkloadExecutor.stop(); + } } @@ -208,6 +224,8 @@ public boolean dispatch(CallRunner callTask) { return priorityExecutor.dispatch(callTask); } else if (replicationExecutor != null && level == HConstants.REPLICATION_QOS) { return replicationExecutor.dispatch(callTask); + } else if (bulkloadExecutor != null && level == HConstants.BULKLOAD_QOS) { + return bulkloadExecutor.dispatch(callTask); } else { return callExecutor.dispatch(callTask); } @@ -233,10 +251,16 @@ public int getReplicationQueueLength() { return replicationExecutor == null ? 0 : replicationExecutor.getQueueLength(); } + @Override + public int getBulkLoadQueueLength() { + return bulkloadExecutor == null ? 0 : bulkloadExecutor.getQueueLength(); + } + @Override public int getActiveRpcHandlerCount() { return callExecutor.getActiveHandlerCount() + getActivePriorityRpcHandlerCount() - + getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount(); + + getActiveReplicationRpcHandlerCount() + getActiveMetaPriorityRpcHandlerCount() + + getActiveBulkLoadRpcHandlerCount(); } @Override @@ -259,6 +283,11 @@ public int getActiveReplicationRpcHandlerCount() { return (replicationExecutor == null ? 0 : replicationExecutor.getActiveHandlerCount()); } + @Override + public int getActiveBulkLoadRpcHandlerCount() { + return bulkloadExecutor == null ? 0 : bulkloadExecutor.getActiveHandlerCount(); + } + @Override public long getNumGeneralCallsDropped() { return callExecutor.getNumGeneralCallsDropped(); @@ -330,6 +359,12 @@ public CallQueueInfo getCallQueueInfo() { callQueueInfo.setCallMethodSize(queueName, metaTransitionExecutor.getCallQueueSizeSummary()); } + if (null != bulkloadExecutor) { + queueName = "BulkLoad Queue"; + callQueueInfo.setCallMethodCount(queueName, bulkloadExecutor.getCallQueueCountsSummary()); + callQueueInfo.setCallMethodSize(queueName, bulkloadExecutor.getCallQueueSizeSummary()); + } + return callQueueInfo; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSAnnotationReadingPriorityFunction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSAnnotationReadingPriorityFunction.java index 905b88136323..1197f7b5359c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSAnnotationReadingPriorityFunction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSAnnotationReadingPriorityFunction.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.BulkLoadHFileRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.GetRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutateRequest; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; @@ -96,6 +97,10 @@ protected int getBasePriority(RequestHeader header, Message param) { if (header.hasPriority()) { return header.getPriority(); } + if (param instanceof BulkLoadHFileRequest) { + return HConstants.BULKLOAD_QOS; + } + String cls = param.getClass().getName(); Class rpcArgClass = argumentToClassMap.get(cls); RegionSpecifier regionSpecifier = null; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java index 7d2836e7c6ca..f8ac4a1bb9a1 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/DelegatingRpcScheduler.java @@ -44,6 +44,11 @@ public int getReplicationQueueLength() { return delegate.getReplicationQueueLength(); } + @Override + public int getBulkLoadQueueLength() { + return delegate.getBulkLoadQueueLength(); + } + @Override public int getPriorityQueueLength() { return delegate.getPriorityQueueLength(); @@ -74,6 +79,11 @@ public int getActiveReplicationRpcHandlerCount() { return delegate.getActiveReplicationRpcHandlerCount(); } + @Override + public int getActiveBulkLoadRpcHandlerCount() { + return delegate.getActiveBulkLoadRpcHandlerCount(); + } + @Override public boolean dispatch(CallRunner task) { return delegate.dispatch(task); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java index f525c027a409..6e5dfe87fc7b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/MetricsHBaseServerWrapperStub.java @@ -33,6 +33,11 @@ public int getReplicationQueueLength() { return 103; } + @Override + public int getBulkLoadQueueLength() { + return 109; + } + @Override public int getPriorityQueueLength() { return 104; @@ -63,6 +68,11 @@ public int getActiveReplicationRpcHandlerCount() { return 203; } + @Override + public int getActiveBulkLoadRpcHandlerCount() { + return 204; + } + @Override public long getNumGeneralCallsDropped() { return 3; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 79bc97fb07c0..0c629231728f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -480,12 +480,21 @@ public void testScanQueues() throws Exception { when(scanCall.getHeader()).thenReturn(scanHead); when(scanCall.getParam()).thenReturn(scanCall.param); + CallRunner bulkLoadCallTask = mock(CallRunner.class); + ServerCall bulkLoadCall = mock(ServerCall.class); + bulkLoadCall.param = ScanRequest.newBuilder().build(); + RequestHeader bulkLadHead = RequestHeader.newBuilder().setMethodName("bulkload").build(); + when(bulkLoadCallTask.getRpcCall()).thenReturn(bulkLoadCall); + when(bulkLoadCall.getHeader()).thenReturn(bulkLadHead); + when(bulkLoadCall.getParam()).thenReturn(bulkLoadCall.param); + ArrayList work = new ArrayList<>(); doAnswerTaskExecution(putCallTask, work, 1, 1000); doAnswerTaskExecution(getCallTask, work, 2, 1000); doAnswerTaskExecution(scanCallTask, work, 3, 1000); + doAnswerTaskExecution(bulkLoadCallTask, work, 4, 1000); - // There are 3 queues: [puts], [gets], [scans] + // There are 3 queues: [puts], [gets], [scans], [bulkload] // so the calls will be interleaved scheduler.dispatch(putCallTask); scheduler.dispatch(putCallTask); @@ -496,7 +505,9 @@ public void testScanQueues() throws Exception { scheduler.dispatch(scanCallTask); scheduler.dispatch(scanCallTask); scheduler.dispatch(scanCallTask); - + scheduler.dispatch(bulkLoadCallTask); + scheduler.dispatch(bulkLoadCallTask); + scheduler.dispatch(bulkLoadCallTask); while (work.size() < 6) { Thread.sleep(100); }