From 12211f59a0078fce1a53c6f20fa254307d8164ae Mon Sep 17 00:00:00 2001 From: sbarnoud060710 Date: Wed, 26 Jun 2019 19:45:41 +0200 Subject: [PATCH 1/3] HBASE-22634 Improve performance of BufferedMutator --- .../hadoop/hbase/client/AsyncProcess.java | 17 +- .../hbase/client/AsyncRequestFutureImpl.java | 92 +++++-- .../hbase/client/BufferedMutatorImpl.java | 100 +++++++- .../BufferedMutatorThreadPoolExecutor.java | 224 ++++++++++++++++++ .../hbase/client/RequestController.java | 21 ++ .../hbase/client/SimpleRequestController.java | 44 +++- .../hadoop/hbase/ipc/AbstractRpcClient.java | 23 +- .../hadoop/hbase/ipc/BlockingRpcClient.java | 5 + .../hbase/ipc/BlockingRpcConnection.java | 13 + .../hadoop/hbase/ipc/NettyRpcClient.java | 5 + .../hadoop/hbase/ipc/NettyRpcConnection.java | 25 +- .../hadoop/hbase/ipc/RpcConnection.java | 19 ++ 12 files changed, 542 insertions(+), 46 deletions(-) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorThreadPoolExecutor.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java index de7449bf2852..e19aa169544c 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java @@ -31,6 +31,8 @@ import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; @@ -255,12 +257,13 @@ private AsyncRequestFuture submit(AsyncProcessTask task, RequestController.Checker checker = requestController.newChecker(); boolean firstIter = true; do { - // Wait until there is at least one slot for a new task. 
- requestController.waitForFreeSlot(id, periodToLog, getLogger(tableName, -1)); int posInList = -1; if (!firstIter) { checker.reset(); } + + Set retainedServers = new TreeSet<>(); + Iterator it = rows.iterator(); while (it.hasNext()) { Row r = it.next(); @@ -309,10 +312,16 @@ private AsyncRequestFuture submit(AsyncProcessTask task, // TODO: replica-get is not supported on this path byte[] regionName = loc.getRegionInfo().getRegionName(); addAction(loc.getServerName(), regionName, action, actionsByServer, nonceGroup); + retainedServers.add(loc.getServerName()); it.remove(); } } firstIter = false; + + // Wait until there is at least one slot per server + requestController.waitForFreeSlot(retainedServers.size(),id, periodToLog, + getLogger(tableName, -1)); + } while (retainedActions.isEmpty() && atLeastOne && (locationErrors == null)); if (retainedActions.isEmpty()) return NO_REQS_RESULT; @@ -321,6 +330,10 @@ private AsyncRequestFuture submit(AsyncProcessTask task, locationErrors, locationErrorRows, actionsByServer); } + public void waitAllSlot() throws InterruptedIOException { + requestController.waitForAllFreeSlot(id); + } + AsyncRequestFuture submitMultiActions(AsyncProcessTask task, List retainedActions, long nonceGroup, List locationErrors, List locationErrorRows, Map actionsByServer) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index 525033d4c933..aa79a1137cd3 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -186,6 +186,10 @@ final class SingleServerRequestRunnable implements Runnable { private final int numAttempt; private final ServerName server; private final Set callsInProgress; + private long firstStartNanoTime; + private long startNanoTime; + private long elapseNanoTime; + 
private long numReject = -1; @VisibleForTesting SingleServerRequestRunnable( MultiAction multiAction, int numAttempt, ServerName server, @@ -248,6 +252,30 @@ public void run() { } } } + + public void onStart() { + ++numReject; + startNanoTime = System.nanoTime(); + if (numReject == 0) { + firstStartNanoTime = startNanoTime; + } + } + + public void onFinish() { + elapseNanoTime = System.nanoTime() - startNanoTime; + } + + public long getElapseNanoTime() { + return elapseNanoTime; + } + + public long getNumReject() { + return numReject; + } + + public long getRejectedElapseNanoTime() { + return startNanoTime - firstStartNanoTime; + } } private final Batch.Callback callback; @@ -518,8 +546,14 @@ private RegionLocations findAllLocationsOrFail(Action action, boolean useCache) * @param numAttempt the attempt number. * @param actionsForReplicaThread original actions for replica thread; null on non-first call. */ - void sendMultiAction(Map actionsByServer, - int numAttempt, List actionsForReplicaThread, boolean reuseThread) { + // Must be synchronized because of the background thread writeBufferPeriodicFlushTimer + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SWL_SLEEP_WITH_LOCK_HELD", + justification="Slots are freed while in the executor thread, so the thread is not yet" + + " available for the pool." + + " In that case, we have a RejectedExecutionException. Sleep let some time to threads" + + " to be available again") + synchronized void sendMultiAction(Map actionsByServer, int numAttempt, + List actionsForReplicaThread, boolean reuseThread) { // Run the last item on the same thread if we are already on a send thread. // We hope most of the time it will be the only item, so we can cut down on threads. 
int actionsRemaining = actionsByServer.size(); @@ -527,8 +561,8 @@ void sendMultiAction(Map actionsByServer, for (Map.Entry e : actionsByServer.entrySet()) { ServerName server = e.getKey(); MultiAction multiAction = e.getValue(); - Collection runnables = getNewMultiActionRunnable(server, multiAction, - numAttempt); + Collection runnables = + getNewMultiActionRunnable(server, multiAction, numAttempt); // make sure we correctly count the number of runnables before we try to reuse the send // thread, in case we had to split the request into different runnables because of backoff if (runnables.size() > actionsRemaining) { @@ -543,22 +577,38 @@ void sendMultiAction(Map actionsByServer, && numAttempt % HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER != 0) { runnable.run(); } else { - try { - pool.submit(runnable); - } catch (Throwable t) { - if (t instanceof RejectedExecutionException) { - // This should never happen. But as the pool is provided by the end user, - // let's secure this a little. - LOG.warn("id=" + asyncProcess.id + ", task rejected by pool. Unexpected." + - " Server=" + server.getServerName(), t); - } else { - // see #HBASE-14359 for more details - LOG.warn("Caught unexpected exception/error: ", t); + boolean completed = false; + int nbTry = 0; + while (!completed) { + try { + ++nbTry; + pool.submit(runnable); + completed = true; + } catch (Throwable t) { + if (t instanceof RejectedExecutionException) { + if ((nbTry % 1000) == 0) { + LOG.warn("#" + asyncProcess.id + + ", the task was rejected by the pool. This is unexpected." + " Server is " + + server.getServerName() + " (try " + nbTry + ")", t); + } else if (LOG.isDebugEnabled()) { + LOG.debug("#" + asyncProcess.id + + ", the task was rejected by the pool. This is unexpected." 
+ " Server is " + + server.getServerName() + " (try " + nbTry + ")", t); + } + try { + Thread.sleep(10); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + } + } else { + // see #HBASE-14359 for more details + LOG.warn("Caught unexpected exception/error: ", t); + } + asyncProcess.decTaskCounters(multiAction.getRegions(), server); + // We're likely to fail again, but this will increment the attempt counter, + // so it will finish. + receiveGlobalFailure(multiAction, server, numAttempt, t); } - asyncProcess.decTaskCounters(multiAction.getRegions(), server); - // We're likely to fail again, but this will increment the attempt counter, - // so it will finish. - receiveGlobalFailure(multiAction, server, numAttempt, t); } } } @@ -1167,6 +1217,10 @@ private String buildDetailedErrorMsg(String string, int index) { return error.toString(); } + public boolean isFinished() { + return actionsInProgress.get() == 0; + } + @Override public void waitUntilDone() throws InterruptedIOException { try { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index d4bc811c72c4..4cedd812a130 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -32,6 +32,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.ipc.RpcControllerFactory; @@ -61,6 +63,10 @@ * @see Connection * @since 1.0.0 */ +@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC", + justification="writeBufferPeriodicFlushTimer needs to be synchronized only when not 
null, " + + "and in mutual exclusion with close() and flush(true). " + + "However to needs to synchronize with flush(false) coming from mutate") @InterfaceAudience.Private @InterfaceStability.Evolving public class BufferedMutatorImpl implements BufferedMutator { @@ -93,6 +99,9 @@ public class BufferedMutatorImpl implements BufferedMutator { private final boolean cleanupPoolOnClose; private volatile boolean closed = false; private final AsyncProcess ap; + private List asfList; + private int maxThreads; + private ReentrantLock lock = new ReentrantLock(); @VisibleForTesting BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) { @@ -109,6 +118,14 @@ public class BufferedMutatorImpl implements BufferedMutator { this.pool = params.getPool(); cleanupPoolOnClose = false; } + maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); + if (maxThreads <= 0 || maxThreads == Integer.MAX_VALUE) { + maxThreads = conf.getInt("hbase.client.max.total.tasks", Integer.MAX_VALUE); + if (maxThreads <= 0 || maxThreads == Integer.MAX_VALUE) { + maxThreads = 1; + } + } + asfList = new ArrayList(maxThreads*4); ConnectionConfiguration tableConf = new ConnectionConfiguration(conf); this.writeBufferSize = params.getWriteBufferSize() != UNSET ? @@ -223,6 +240,7 @@ private void timerCallbackForWriteBufferPeriodicFlush() { try { executedWriteBufferPeriodicFlushes.incrementAndGet(); flush(); + writeBufferPeriodicFlushTimer.notifyAll(); } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) { LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage()); } @@ -235,6 +253,15 @@ public synchronized void close() throws IOException { } // Stop any running Periodic Flush timer. 
disableWriteBufferPeriodicFlush(); + ap.waitAllSlot(); + try { + // Give the periodic flush thread time to exit (tasks are finished, but not the code after) + if (writeBufferPeriodicFlushTimer != null) { + writeBufferPeriodicFlushTimer.wait(5); + } + } catch (InterruptedException e) { + throw new IOException(e); + } try { // As we can have an operation in progress even if the buffer is empty, we call // doFlush at least one time. @@ -275,9 +302,25 @@ public int getOperationTimeout() { } @Override + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", + justification="It seems that findBugs parser is wrong") public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException { - checkClose(); - doFlush(true); + // This avoids concurrency between the periodic flush, flush() and close() + // mutate() is not synchronized, because it uses doFlush(false) + boolean haveLocked = false; + if (writeBufferPeriodicFlushTimer != null) { + lock.lock(); + // make sure to unlock even if writeBufferPeriodicFlushTimer is set to null before the end + haveLocked = true; + } + try { + checkClose(); + doFlush(true); + } finally { + if (haveLocked) { + lock.unlock(); + } + } } /** @@ -286,6 +329,9 @@ public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsEx * @param flushAll - if true, sends all the writes and wait for all of them to finish before * returning. 
Otherwise, flush until buffer size is smaller than threshold */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SWL_SLEEP_WITH_LOCK_HELD", + justification="Backpressure, when we have to many pending Future, " + + "we wait under the lock to slow down the application") private void doFlush(boolean flushAll) throws InterruptedIOException, RetriesExhaustedWithDetailsException { List errors = new ArrayList<>(); @@ -302,11 +348,51 @@ private void doFlush(boolean flushAll) throws InterruptedIOException, } asf = ap.submit(createTask(access)); } - // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't - // be released. - asf.waitUntilDone(); - if (asf.hasError()) { - errors.add(asf.getErrors()); + + if (flushAll) { + // if we have setWriteBufferPeriodicFlushTimeoutMs we may have concurrent update + List waitList; + synchronized(asfList) { + waitList = new ArrayList<>(asfList); + } + // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't + // be released. 
+ for(AsyncRequestFuture toWait:waitList) { + toWait.waitUntilDone(); + errors.add(toWait.getErrors()); + } + synchronized(asfList) { + asfList.removeAll(waitList); + } + asf.waitUntilDone(); + if (asf.hasError()) { + errors.add(asf.getErrors()); + } + } else { + // Do some cleanup in asfList to decrease memory + int nbRemoved = 0; + while (asfList.size() >= maxThreads*4) { + synchronized(asfList) { + Iterator it = asfList.iterator(); + while (it.hasNext()) { + AsyncRequestFutureImpl toCheck = (AsyncRequestFutureImpl) it.next(); + if (toCheck.isFinished()) { + it.remove(); + nbRemoved++; + } + } + if (nbRemoved == 0) { + try { + Thread.sleep(1); + } catch (InterruptedException e) { + throw new InterruptedIOException(e.getMessage()); + } + } + } + } + synchronized(asfList) { + asfList.add(asf); + } } } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorThreadPoolExecutor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorThreadPoolExecutor.java new file mode 100644 index 000000000000..b31067958ab1 --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorThreadPoolExecutor.java @@ -0,0 +1,224 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.client; + +import java.lang.reflect.Field; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.FutureTask; +import java.util.concurrent.SynchronousQueue; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.yetus.audience.InterfaceAudience; +import org.apache.yetus.audience.InterfaceStability; + +/* + * This class proposes a ThreadPoolExecutor which + * uses hbase.client.max.total.tasks as default pool size (if hbase.htable.threads.max isn't set) + * computes some metrics on submitted tasks to let the application know: + * how many flushes have been triggered from mutate() + * the amount of time spent in those background threads + */ +@edu.umd.cs.findbugs.annotations.SuppressWarnings(value = "DLS_DEAD_LOCAL_STORE", + justification = "Class provided as illustration for application" + + " as an alternative to the default pool") +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class BufferedMutatorThreadPoolExecutor extends ThreadPoolExecutor { + + private final Object _lock = new Object(); + + private Map ssrMap; + + private long ssrCacheMaxStored = 0; + private AtomicLong ssrCachePutCount = new AtomicLong(0); + private long ssrCacheRemoveCount = 0; + + private long totalExecutedTaskCount = 0; + private long totalRejectedTaskCount = 0; + private long totalExecutedNanoTime = 0; + private long totalRejectedNanoTime = 0; + private AtomicLong totalMissingTaskCount = 
new AtomicLong(0); + + public BufferedMutatorThreadPoolExecutor(final int corePoolSize, final int maximumPoolSize, + final long keepAliveTime, final TimeUnit unit, final BlockingQueue workQueue, + final ThreadFactory threadFactory) { + super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory); + ssrMap = new ConcurrentHashMap<>(this.getMaximumPoolSize()); + } + + public static BufferedMutatorThreadPoolExecutor getPoolExecutor(Configuration conf) { + int maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); + if (maxThreads <= 0 || maxThreads == Integer.MAX_VALUE) { + maxThreads = conf.getInt("hbase.client.max.total.tasks", Integer.MAX_VALUE); + } + if (maxThreads <= 0 || maxThreads == Integer.MAX_VALUE) { + throw new IllegalArgumentException("hbase.client.max.total.tasks must be >0"); + } + long keepAliveTime = conf.getLong("hbase.htable.threads.keepalivetime", 60); + + // Using the "direct handoff" approach, new threads will only be created + // if it is necessary and will grow unbounded. This could be bad but in HCM + // we only create as many Runnables as there are region servers. It means + // it also scales when new region servers are added. 
+ BufferedMutatorThreadPoolExecutor pool = + new BufferedMutatorThreadPoolExecutor(1, maxThreads, keepAliveTime, TimeUnit.SECONDS, + new SynchronousQueue<>(), Threads.newDaemonThreadFactory("htable")); + pool.allowCoreThreadTimeOut(true); + return pool; + } + + @Override + public Future submit(final Runnable r) { + if (r instanceof AsyncRequestFutureImpl.SingleServerRequestRunnable) { + final AsyncRequestFutureImpl.SingleServerRequestRunnable task = + ((AsyncRequestFutureImpl.SingleServerRequestRunnable) r); + task.onStart(); + } + return super.submit(r); + } + + @Override + protected void beforeExecute(final Thread t, final Runnable r) { + // if t is null, this means we will get a RejectedExecutionException, and NEVER be called + // in afterExecute + // To avoid a memory leak, add in the map only if t is NOT null! + if (t != null && r instanceof Future) { + final Future f = (Future) r; + final Object o = TaskDiscoverer.findRealTask(r); + if (o instanceof AsyncRequestFutureImpl.SingleServerRequestRunnable) { + final AsyncRequestFutureImpl.SingleServerRequestRunnable task = + ((AsyncRequestFutureImpl.SingleServerRequestRunnable) o); + ssrMap.put(t, task); + ssrCachePutCount.incrementAndGet(); + } + } + super.beforeExecute(t, r); + } + + @Override + protected void afterExecute(final Runnable r, final Throwable t) { + super.afterExecute(r, t); + + final Object o = ssrMap.remove(Thread.currentThread()); + if (o instanceof AsyncRequestFutureImpl.SingleServerRequestRunnable) { + final AsyncRequestFutureImpl.SingleServerRequestRunnable task = + ((AsyncRequestFutureImpl.SingleServerRequestRunnable) o); + task.onFinish(); + synchronized (_lock) { + ++ssrCacheRemoveCount; + ssrCacheMaxStored = + Math.max(ssrCacheMaxStored, ssrCachePutCount.get() - ssrCacheRemoveCount); + ++totalExecutedTaskCount; + totalExecutedNanoTime += task.getElapseNanoTime(); + totalRejectedTaskCount += task.getNumReject(); + totalRejectedNanoTime += task.getRejectedElapseNanoTime(); + } + } else { + 
totalMissingTaskCount.incrementAndGet(); + } + + } + + public long getAndResetSsrCacheMaxStored() { + long ret = ssrCacheMaxStored; + ssrCacheMaxStored = 0; + ssrCachePutCount.set(0); + ssrCacheRemoveCount = 0; + return ret; + } + + public long getAvgRejectedNanoTime() { + if (totalRejectedTaskCount == 0) { + return 0; + } + return totalRejectedNanoTime / totalRejectedTaskCount; + } + + public long getTotalMissingTaskCount() { + return totalMissingTaskCount.get(); + } + + public long getAvgExecutedNanoTime() { + if (totalExecutedTaskCount == 0) { + return 0; + } + return totalExecutedNanoTime / totalExecutedTaskCount; + } + + public long getTotalExecutedTaskCount() { + return totalExecutedTaskCount; + } + + public long getTotalExecutedNanoTime() { + return totalExecutedNanoTime; + } + + public long getTotalRejectedTaskCount() { + return totalRejectedTaskCount; + } + + public long getTotalRejectedNanoTime() { + return totalRejectedNanoTime; + } + + public static class TaskDiscoverer { + + private final static Field callableInFutureTask; + private static final Class adapterClass; + private static final Field runnableInAdapter; + + static { + try { + callableInFutureTask = FutureTask.class.getDeclaredField("callable"); + callableInFutureTask.setAccessible(true); + adapterClass = Executors.callable(() -> { + }).getClass(); + runnableInAdapter = adapterClass.getDeclaredField("task"); + runnableInAdapter.setAccessible(true); + } catch (NoSuchFieldException e) { + throw new ExceptionInInitializerError(e); + } + } + + public static Object findRealTask(final Runnable task) { + if (task instanceof FutureTask) { + try { + Object callable = callableInFutureTask.get(task); + if (adapterClass.isInstance(callable)) { + return runnableInAdapter.get(callable); + } else { + return callable; + } + } catch (IllegalAccessException e) { + throw new IllegalStateException(e); + } + } + throw new ClassCastException("Not a FutureTask"); + } + } + +} diff --git 
a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java index 4c63e4d08812..813f328ae7c7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RequestController.java @@ -22,6 +22,7 @@ import java.io.InterruptedIOException; import java.util.Collection; import java.util.function.Consumer; + import org.apache.hadoop.hbase.HRegionLocation; import org.apache.hadoop.hbase.ServerName; import org.apache.yetus.audience.InterfaceAudience; @@ -118,4 +119,24 @@ void waitForMaximumCurrentTasks(long max, long id, */ void waitForFreeSlot(long id, int periodToTrigger, Consumer trigger) throws InterruptedIOException; + + /** + * Wait until there are at least {@code size} free slots for {@code size} new tasks. + * @param size the number of requested slots + * @param id the caller's id + * @param periodToTrigger The period to invoke the trigger. This value is a hint. The real period + * depends on the implementation. + * @param trigger The object to call periodically. + * @throws java.io.InterruptedIOException If the waiting is interrupted + */ + void waitForFreeSlot(int size, long id, int periodToTrigger, + Consumer trigger) throws InterruptedIOException; + + /** + * Wait until all slots are free. 
+ * @param id the caller's id + * @throws java.io.InterruptedIOException If the waiting is interrupted + */ + void waitForAllFreeSlot(long id) throws InterruptedIOException; + } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java index 2c9a3671489f..6d9c7fffd389 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/SimpleRequestController.java @@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Consumer; import org.apache.hadoop.conf.Configuration; @@ -45,7 +46,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hbase.util.Bytes; -import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; @@ -93,6 +93,8 @@ class SimpleRequestController implements RequestController { = new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR); @VisibleForTesting final ConcurrentMap taskCounterPerServer = new ConcurrentHashMap<>(); + + final ReentrantLock lock = new ReentrantLock(); /** * The number of tasks simultaneously executed on the cluster. 
*/ @@ -124,9 +126,9 @@ class SimpleRequestController implements RequestController { private final int thresholdToLogUndoneTaskDetails; public static final String THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = "hbase.client.threshold.log.details"; - private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10; public static final String THRESHOLD_TO_LOG_REGION_DETAILS = "hbase.client.threshold.log.region.details"; + private static final int DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS = 10; private static final int DEFAULT_THRESHOLD_TO_LOG_REGION_DETAILS = 2; private final int thresholdToLogRegionDetails; SimpleRequestController(final Configuration conf) { @@ -244,11 +246,26 @@ public Checker newChecker() { public void incTaskCounters(Collection regions, ServerName sn) { tasksInProgress.incrementAndGet(); - computeIfAbsent(taskCounterPerServer, sn, AtomicInteger::new).incrementAndGet(); + incrementMapForKey(taskCounterPerServer,sn); - regions.forEach((regBytes) - -> computeIfAbsent(taskCounterPerRegion, regBytes, AtomicInteger::new).incrementAndGet() - ); + regions.forEach((regBytes) -> incrementMapForKey(taskCounterPerRegion,regBytes)); + } + + private void incrementMapForKey(Map collection, K key) { + if (collection.containsKey(key)) { + collection.get(key).incrementAndGet(); + } else { + lock.lock(); + try { + if (collection.containsKey(key)) { + collection.get(key).incrementAndGet(); + } else { + collection.put(key, new AtomicInteger(1)); + } + } finally { + lock.unlock(); + } + } } @Override @@ -324,10 +341,23 @@ private void logDetailsOfUndoneTasks(long taskInProgress) { } @Override - public void waitForFreeSlot(long id, int periodToTrigger, Consumer trigger) throws InterruptedIOException { + public void waitForFreeSlot(long id, int periodToTrigger, Consumer trigger) + throws InterruptedIOException { waitForMaximumCurrentTasks(maxTotalConcurrentTasks - 1, id, periodToTrigger, trigger); } + @Override + public void waitForFreeSlot(int numberOfTask,long id, 
int periodToTrigger, + Consumer trigger) throws InterruptedIOException { + waitForMaximumCurrentTasks(maxTotalConcurrentTasks - numberOfTask, + id, periodToTrigger, trigger); + } + + @Override + public void waitForAllFreeSlot(long id) throws InterruptedIOException { + waitForMaximumCurrentTasks(0, id, 0, null); + } + /** * limit the heapsize of total submitted data. Reduce the limit of heapsize * for submitting quickly if there is no running task. diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java index c8904f43675b..1f2bcdbc0e66 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java @@ -140,7 +140,7 @@ public abstract class AbstractRpcClient implements RpcC private final AtomicInteger callIdCnt = new AtomicInteger(0); - private final ScheduledFuture cleanupIdleConnectionTask; + private ScheduledFuture cleanupIdleConnectionTask = null; private int maxConcurrentCallsPerServer; @@ -188,13 +188,15 @@ public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress loc this.connections = new PoolMap<>(getPoolType(conf), getPoolSize(conf)); - this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() { + if (!hasIdleCleanupSupport()) { + this.cleanupIdleConnectionTask = IDLE_CONN_SWEEPER.scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - cleanupIdleConnections(); - } - }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS); + @Override + public void run() { + cleanupIdleConnections(); + } + }, minIdleTimeBeforeClose, minIdleTimeBeforeClose, TimeUnit.MILLISECONDS); + } if (LOG.isDebugEnabled()) { LOG.debug("Codec=" + this.codec + ", compressor=" + this.compressor + ", tcpKeepAlive=" @@ -372,6 +374,8 @@ private T getConnection(ConnectionId remoteId) throws 
IOException { */ protected abstract T createConnection(ConnectionId remoteId) throws IOException; + protected abstract boolean hasIdleCleanupSupport(); + private void onCallFinished(Call call, HBaseRpcController hrc, InetSocketAddress addr, RpcCallback callback) { call.callStats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - call.getStartTime()); @@ -450,6 +454,7 @@ public void cancelConnections(ServerName sn) { + connection.remoteId); connections.removeValue(remoteId, connection); connection.shutdown(); + connection.cleanupConnection(); } } } @@ -491,7 +496,9 @@ public void close() { connToClose = connections.values(); connections.clear(); } - cleanupIdleConnectionTask.cancel(true); + if (cleanupIdleConnectionTask != null) { + cleanupIdleConnectionTask.cancel(true); + } for (T conn : connToClose) { conn.shutdown(); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java index f84c308715b2..0dd8dfbd3bbd 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcClient.java @@ -72,6 +72,11 @@ protected BlockingRpcConnection createConnection(ConnectionId remoteId) throws I return new BlockingRpcConnection(this, remoteId); } + @Override + protected boolean hasIdleCleanupSupport() { + return (new BlockingRpcConnection()).hasIdleCleanupSupport(); + } + @Override protected void closeInternal() { } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java index b1c3ea21ba5b..9b0293a931c7 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/BlockingRpcConnection.java @@ -215,6 +215,14 @@ public void cleanup(IOException e) { } } + 
// Dummy constructor used to call hasIdleCleanupSupport + BlockingRpcConnection() { + super(); + callSender = null; + threadName = null; + rpcClient=null; + } + BlockingRpcConnection(BlockingRpcClient rpcClient, ConnectionId remoteId) throws IOException { super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor); @@ -765,6 +773,11 @@ public void cleanupConnection() { // do nothing } + @Override + public boolean hasIdleCleanupSupport() { + return false; + } + @Override public synchronized void sendRequest(final Call call, HBaseRpcController pcrc) throws IOException { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java index 61dedbb5c124..4ce02c9ede7b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcClient.java @@ -74,6 +74,11 @@ protected NettyRpcConnection createConnection(ConnectionId remoteId) throws IOEx return new NettyRpcConnection(this, remoteId); } + @Override + protected boolean hasIdleCleanupSupport() { + return new NettyRpcConnection().hasIdleCleanupSupport(); + } + @Override protected void closeInternal() { if (shutdownGroupWhenClose) { diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java index b8620b1b743c..238146e4d758 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/NettyRpcConnection.java @@ -85,6 +85,12 @@ class NettyRpcConnection extends RpcConnection { justification = "connect is also under lock as notifyOnCancel will call our action directly") private Channel channel; + // Dummy constructor used to 
call hasIdleCleanupSupport + NettyRpcConnection() { + super(); + rpcClient=null; + } + NettyRpcConnection(NettyRpcClient rpcClient, ConnectionId remoteId) throws IOException { super(rpcClient.conf, AbstractRpcClient.WHEEL_TIMER, remoteId, rpcClient.clusterId, rpcClient.userProvider.isHBaseSecurityEnabled(), rpcClient.codec, rpcClient.compressor); @@ -132,6 +138,11 @@ public synchronized void cleanupConnection() { } } + @Override + public boolean hasIdleCleanupSupport() { + return true; + } + private void established(Channel ch) throws IOException { ChannelPipeline p = ch.pipeline(); String addBeforeHandler = p.context(BufferCallBeforeInitHandler.class).name(); @@ -149,6 +160,16 @@ private void scheduleRelogin(Throwable error) { if (error instanceof FallbackDisallowedException) { return; } + // do not schedule anything if not needed... + boolean isOverKrb = false; + try { + isOverKrb = shouldAuthenticateOverKrb(); + } catch (IOException e) { + return; + } + if (!isOverKrb) { + return; + } synchronized (this) { if (reloginInProgress) { return; @@ -159,9 +180,7 @@ private void scheduleRelogin(Throwable error) { @Override public void run() { try { - if (shouldAuthenticateOverKrb()) { - relogin(); - } + relogin(); } catch (IOException e) { LOG.warn("Relogin failed", e); } diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java index a935bb41a420..9ea0b8a7ea1b 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcConnection.java @@ -81,6 +81,20 @@ abstract class RpcConnection { // the last time we were picked up from connection pool. 
protected long lastTouched; + // Dummy constructor used to call hasIdleCleanupSupport + RpcConnection() { + remoteId = null; + authMethod = null; + useSasl = false; + token = null; + serverPrincipal = null; + reloginMaxBackoff = 0; + codec = null; + compressor = null; + timeoutTimer = null; + conf = null; + } + protected RpcConnection(Configuration conf, HashedWheelTimer timeoutTimer, ConnectionId remoteId, String clusterId, boolean isSecurityEnabled, Codec codec, CompressionCodec compressor) throws IOException { @@ -274,4 +288,9 @@ public void setLastTouched(long lastTouched) { * Does the clean up work after the connection is removed from the connection pool */ public abstract void cleanupConnection(); + + /** + * @return true when this client supports periodic cleanup of used connection + */ + public abstract boolean hasIdleCleanupSupport(); } From ad270b86090aa84f01ad4431c11d7ef37ddce979 Mon Sep 17 00:00:00 2001 From: sbarnoud060710 Date: Tue, 16 Jul 2019 12:14:29 +0200 Subject: [PATCH 2/3] Fix: add errors only when we get some. --- .../org/apache/hadoop/hbase/client/BufferedMutatorImpl.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 4cedd812a130..506e1bcc5ec5 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -359,7 +359,9 @@ private void doFlush(boolean flushAll) throws InterruptedIOException, // be released. 
for(AsyncRequestFuture toWait:waitList) { toWait.waitUntilDone(); - errors.add(toWait.getErrors()); + if (toWait.hasError()) { + errors.add(toWait.getErrors()); + } } synchronized(asfList) { asfList.removeAll(waitList); From c49f1bd7535ce1625f35eeed93094d555923cf5f Mon Sep 17 00:00:00 2001 From: sbarnoud060710 Date: Tue, 16 Jul 2019 16:21:34 +0200 Subject: [PATCH 3/3] Fix exception management in sendMultiAction. Making mutate synchronous when writeBufferSize is 0 --- .../org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java | 1 + .../org/apache/hadoop/hbase/client/BufferedMutatorImpl.java | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java index aa79a1137cd3..a437a5abc25e 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java @@ -603,6 +603,7 @@ synchronized void sendMultiAction(Map actionsByServer, } else { // see #HBASE-14359 for more details LOG.warn("Caught unexpected exception/error: ", t); + completed = true; } asyncProcess.decTaskCounters(multiAction.getRegions(), server); // We're likely to fail again, but this will increment the attempt counter, diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java index 506e1bcc5ec5..eb2feb5d5bb6 100644 --- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java @@ -349,7 +349,7 @@ private void doFlush(boolean flushAll) throws InterruptedIOException, asf = ap.submit(createTask(access)); } - if (flushAll) { + if (flushAll || writeBufferSize == 0) { // if we 
have setWriteBufferPeriodicFlushTimeoutMs we may have concurrent update List waitList; synchronized(asfList) {