1- /**
1+ /*
22 * Licensed to the Apache Software Foundation (ASF) under one
33 * or more contributor license agreements. See the NOTICE file
44 * distributed with this work for additional information
1919package org .apache .hadoop .hbase .ipc ;
2020
2121import java .util .ArrayList ;
22+ import java .util .Collection ;
2223import java .util .Comparator ;
23- import java .util .HashMap ;
2424import java .util .List ;
2525import java .util .Locale ;
2626import java .util .Map ;
2929import java .util .concurrent .LinkedBlockingQueue ;
3030import java .util .concurrent .atomic .AtomicInteger ;
3131import java .util .concurrent .atomic .LongAdder ;
32+ import java .util .function .Function ;
33+ import java .util .stream .Collectors ;
3234import org .apache .hadoop .conf .Configuration ;
3335import org .apache .hadoop .hbase .Abortable ;
3436import org .apache .hadoop .hbase .HConstants ;
3537import org .apache .hadoop .hbase .conf .ConfigurationObserver ;
3638import org .apache .hadoop .hbase .util .BoundedPriorityBlockingQueue ;
39+ import org .apache .hadoop .hbase .util .Pair ;
3740import org .apache .hadoop .hbase .util .ReflectionUtils ;
38- import org .apache .hbase .thirdparty .io .netty .util .internal .StringUtil ;
3941import org .apache .yetus .audience .InterfaceAudience ;
4042import org .slf4j .Logger ;
4143import org .slf4j .LoggerFactory ;
4244import org .apache .hbase .thirdparty .com .google .common .base .Preconditions ;
4345import org .apache .hbase .thirdparty .com .google .common .base .Strings ;
46+ import org .apache .hbase .thirdparty .com .google .protobuf .Descriptors ;
4447
4548/**
4649 * Runs the CallRunners passed here via {@link #dispatch(CallRunner)}. Subclass and add particular
@@ -51,14 +54,16 @@ public abstract class RpcExecutor {
5154 private static final Logger LOG = LoggerFactory .getLogger (RpcExecutor .class );
5255
5356 protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250 ;
54- public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY = "hbase.ipc.server.callqueue.handler.factor" ;
57+ public static final String CALL_QUEUE_HANDLER_FACTOR_CONF_KEY =
58+ "hbase.ipc.server.callqueue.handler.factor" ;
5559
56- /** max delay in msec used to bound the deprioritized requests */
57- public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY = "hbase.ipc.server.queue.max.call.delay" ;
60+ /** max delay in msec used to bound the de-prioritized requests */
61+ public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY =
62+ "hbase.ipc.server.queue.max.call.delay" ;
5863
5964 /**
6065 * The default, 'fifo', has the least friction but is dumb. If set to 'deadline', uses a priority
61- * queue and deprioritizes long-running scans. Sorting by priority comes at a cost, reduced
66+ * queue and de-prioritizes long-running scans. Sorting by priority comes at a cost, reduced
6267 * throughput.
6368 */
6469 public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel" ;
@@ -68,14 +73,18 @@ public abstract class RpcExecutor {
6873 public static final String CALL_QUEUE_TYPE_CONF_KEY = "hbase.ipc.server.callqueue.type" ;
6974 public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = CALL_QUEUE_TYPE_FIFO_CONF_VALUE ;
7075
71- public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS = "hbase.ipc.server.callqueue.balancer.class" ;
76+ public static final String CALL_QUEUE_QUEUE_BALANCER_CLASS =
77+ "hbase.ipc.server.callqueue.balancer.class" ;
7278 public static final Class <?> CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT = RandomQueueBalancer .class ;
7379
7480
7581 // These 3 are only used by Codel executor
76- public static final String CALL_QUEUE_CODEL_TARGET_DELAY = "hbase.ipc.server.callqueue.codel.target.delay" ;
77- public static final String CALL_QUEUE_CODEL_INTERVAL = "hbase.ipc.server.callqueue.codel.interval" ;
78- public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD = "hbase.ipc.server.callqueue.codel.lifo.threshold" ;
82+ public static final String CALL_QUEUE_CODEL_TARGET_DELAY =
83+ "hbase.ipc.server.callqueue.codel.target.delay" ;
84+ public static final String CALL_QUEUE_CODEL_INTERVAL =
85+ "hbase.ipc.server.callqueue.codel.interval" ;
86+ public static final String CALL_QUEUE_CODEL_LIFO_THRESHOLD =
87+ "hbase.ipc.server.callqueue.codel.lifo.threshold" ;
7988
8089 public static final int CALL_QUEUE_CODEL_DEFAULT_TARGET_DELAY = 100 ;
8190 public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100 ;
@@ -86,16 +95,14 @@ public abstract class RpcExecutor {
8695 public static final String PLUGGABLE_CALL_QUEUE_WITH_FAST_PATH_ENABLED =
8796 "hbase.ipc.server.callqueue.pluggable.queue.fast.path.enabled" ;
8897
89- private LongAdder numGeneralCallsDropped = new LongAdder ();
90- private LongAdder numLifoModeSwitches = new LongAdder ();
98+ private final LongAdder numGeneralCallsDropped = new LongAdder ();
99+ private final LongAdder numLifoModeSwitches = new LongAdder ();
91100
92101 protected final int numCallQueues ;
93102 protected final List <BlockingQueue <CallRunner >> queues ;
94103 private final Class <? extends BlockingQueue > queueClass ;
95104 private final Object [] queueInitArgs ;
96105
97- private final PriorityFunction priority ;
98-
99106 protected volatile int currentQueueLimit ;
100107
101108 private final AtomicInteger activeHandlerCount = new AtomicInteger (0 );
@@ -105,8 +112,8 @@ public abstract class RpcExecutor {
105112
106113 private String name ;
107114
108- private Configuration conf = null ;
109- private Abortable abortable = null ;
115+ private final Configuration conf ;
116+ private final Abortable abortable ;
110117
111118 public RpcExecutor (final String name , final int handlerCount , final int maxQueueLength ,
112119 final PriorityFunction priority , final Configuration conf , final Abortable abortable ) {
@@ -142,12 +149,10 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
142149 this .handlerCount = Math .max (handlerCount , this .numCallQueues );
143150 this .handlers = new ArrayList <>(this .handlerCount );
144151
145- this .priority = priority ;
146-
147152 if (isDeadlineQueueType (callQueueType )) {
148153 this .name += ".Deadline" ;
149154 this .queueInitArgs = new Object [] { maxQueueLength ,
150- new CallPriorityComparator (conf , this . priority ) };
155+ new CallPriorityComparator (conf , priority ) };
151156 this .queueClass = BoundedPriorityBlockingQueue .class ;
152157 } else if (isCodelQueueType (callQueueType )) {
153158 this .name += ".Codel" ;
@@ -157,16 +162,17 @@ public RpcExecutor(final String name, final int handlerCount, final String callQ
157162 double codelLifoThreshold = conf .getDouble (CALL_QUEUE_CODEL_LIFO_THRESHOLD ,
158163 CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD );
159164 this .queueInitArgs = new Object [] { maxQueueLength , codelTargetDelay , codelInterval ,
160- codelLifoThreshold , numGeneralCallsDropped , numLifoModeSwitches };
165+ codelLifoThreshold , numGeneralCallsDropped , numLifoModeSwitches };
161166 this .queueClass = AdaptiveLifoCoDelCallQueue .class ;
162167 } else if (isPluggableQueueType (callQueueType )) {
163- Optional <Class <? extends BlockingQueue <CallRunner >>> pluggableQueueClass = getPluggableQueueClass ();
168+ Optional <Class <? extends BlockingQueue <CallRunner >>> pluggableQueueClass =
169+ getPluggableQueueClass ();
164170
165171 if (!pluggableQueueClass .isPresent ()) {
166172 throw new PluggableRpcQueueNotFound ("Pluggable call queue failed to load and selected call"
167173 + " queue type required" );
168174 } else {
169- this .queueInitArgs = new Object [] { maxQueueLength , this . priority , conf };
175+ this .queueInitArgs = new Object [] { maxQueueLength , priority , conf };
170176 this .queueClass = pluggableQueueClass .get ();
171177 }
172178 } else {
@@ -184,50 +190,41 @@ protected int computeNumCallQueues(final int handlerCount, final float callQueue
184190 return Math .max (1 , Math .round (handlerCount * callQueuesHandlersFactor ));
185191 }
186192
187- public Map <String , Long > getCallQueueCountsSummary () {
188- HashMap <String , Long > callQueueMethodTotalCount = new HashMap <>();
189-
190- for (BlockingQueue <CallRunner > queue : queues ) {
191- for (CallRunner cr :queue ) {
192- RpcCall rpcCall = cr .getRpcCall ();
193-
194- String method ;
195-
196- if (null ==rpcCall .getMethod () ||
197- StringUtil .isNullOrEmpty (method = rpcCall .getMethod ().getName ())) {
198- method = "Unknown" ;
199- }
193+ /**
194+ * Return the {@link Descriptors.MethodDescriptor#getName()} from {@code callRunner} or "Unknown".
195+ */
196+ private static String getMethodName (final CallRunner callRunner ) {
197+ return Optional .ofNullable (callRunner )
198+ .map (CallRunner ::getRpcCall )
199+ .map (RpcCall ::getMethod )
200+ .map (Descriptors .MethodDescriptor ::getName )
201+ .orElse ("Unknown" );
202+ }
200203
201- callQueueMethodTotalCount .put (method , 1 +callQueueMethodTotalCount .getOrDefault (method , 0L ));
202- }
203- }
204+ /**
205+ * Return the {@link RpcCall#getSize()} from {@code callRunner} or 0L.
206+ */
207+ private static long getRpcCallSize (final CallRunner callRunner ) {
208+ return Optional .ofNullable (callRunner )
209+ .map (CallRunner ::getRpcCall )
210+ .map (RpcCall ::getSize )
211+ .orElse (0L );
212+ }
204213
205- return callQueueMethodTotalCount ;
214+ public Map <String , Long > getCallQueueCountsSummary () {
215+ return queues .stream ()
216+ .flatMap (Collection ::stream )
217+ .map (RpcExecutor ::getMethodName )
218+ .collect (Collectors .groupingBy (Function .identity (), Collectors .counting ()));
206219 }
207220
208221 public Map <String , Long > getCallQueueSizeSummary () {
209- HashMap <String , Long > callQueueMethodTotalSize = new HashMap <>();
210-
211- for (BlockingQueue <CallRunner > queue : queues ) {
212- for (CallRunner cr :queue ) {
213- RpcCall rpcCall = cr .getRpcCall ();
214- String method ;
215-
216- if (null ==rpcCall .getMethod () ||
217- StringUtil .isNullOrEmpty (method = rpcCall .getMethod ().getName ())) {
218- method = "Unknown" ;
219- }
220-
221- long size = rpcCall .getSize ();
222-
223- callQueueMethodTotalSize .put (method , size +callQueueMethodTotalSize .getOrDefault (method , 0L ));
224- }
225- }
226-
227- return callQueueMethodTotalSize ;
222+ return queues .stream ()
223+ .flatMap (Collection ::stream )
224+ .map (callRunner -> new Pair <>(getMethodName (callRunner ), getRpcCallSize (callRunner )))
225+ .collect (Collectors .groupingBy (Pair ::getFirst , Collectors .summingLong (Pair ::getSecond )));
228226 }
229227
230-
231228 protected void initializeQueues (final int numQueues ) {
232229 if (queueInitArgs .length > 0 ) {
233230 currentQueueLimit = (int ) queueInitArgs [0 ];
@@ -250,7 +247,7 @@ public void stop() {
250247 }
251248
252249 /** Add the request to the executor queue */
253- public abstract boolean dispatch (final CallRunner callTask ) throws InterruptedException ;
250+ public abstract boolean dispatch (final CallRunner callTask );
254251
255252 /** Returns the list of request queues */
256253 protected List <BlockingQueue <CallRunner >> getQueues () {
@@ -296,26 +293,26 @@ protected void startHandlers(final String nameSuffix, final int numHandlers,
296293 handlers .size (), threadPrefix , qsize , port );
297294 }
298295
299- public static QueueBalancer getBalancer (String executorName , Configuration conf , List <BlockingQueue <CallRunner >> queues ) {
296+ /**
297+ * All requests go to the first queue, at index 0
298+ */
299+ private static final QueueBalancer ONE_QUEUE = val -> 0 ;
300+
301+ public static QueueBalancer getBalancer (
302+ final String executorName ,
303+ final Configuration conf ,
304+ final List <BlockingQueue <CallRunner >> queues
305+ ) {
300306 Preconditions .checkArgument (queues .size () > 0 , "Queue size is <= 0, must be at least 1" );
301307 if (queues .size () == 1 ) {
302308 return ONE_QUEUE ;
303309 } else {
304- Class <?> balancerClass = conf .getClass (CALL_QUEUE_QUEUE_BALANCER_CLASS , CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT );
310+ Class <?> balancerClass = conf .getClass (
311+ CALL_QUEUE_QUEUE_BALANCER_CLASS , CALL_QUEUE_QUEUE_BALANCER_CLASS_DEFAULT );
305312 return (QueueBalancer ) ReflectionUtils .newInstance (balancerClass , conf , executorName , queues );
306313 }
307314 }
308315
309- /**
310- * All requests go to the first queue, at index 0
311- */
312- private static QueueBalancer ONE_QUEUE = new QueueBalancer () {
313- @ Override
314- public int getNextQueue (CallRunner callRunner ) {
315- return 0 ;
316- }
317- };
318-
319316 /**
320317 * Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true. It
321318 * uses the calculated "deadline" e.g. to deprioritize long-running job If multiple requests have
@@ -449,7 +446,8 @@ public void resizeQueues(Configuration conf) {
449446 if (name != null && name .toLowerCase (Locale .ROOT ).contains ("priority" )) {
450447 configKey = RpcScheduler .IPC_SERVER_PRIORITY_MAX_CALLQUEUE_LENGTH ;
451448 }
452- currentQueueLimit = conf .getInt (configKey , currentQueueLimit );
449+ final int queueLimit = currentQueueLimit ;
450+ currentQueueLimit = conf .getInt (configKey , queueLimit );
453451 }
454452
455453 public void onConfigurationChange (Configuration conf ) {
0 commit comments