- 
                Notifications
    You must be signed in to change notification settings 
- Fork 3.4k
HBASE-22634 : Improve performance of BufferedMutator #343
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | 
|---|---|---|
|  | @@ -32,6 +32,8 @@ | |
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import java.util.concurrent.locks.ReentrantLock; | ||
|  | ||
| import org.apache.hadoop.conf.Configuration; | ||
| import org.apache.hadoop.hbase.TableName; | ||
| import org.apache.hadoop.hbase.ipc.RpcControllerFactory; | ||
|  | @@ -61,6 +63,10 @@ | |
| * @see Connection | ||
| * @since 1.0.0 | ||
| */ | ||
| @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="IS2_INCONSISTENT_SYNC", | ||
| justification="writeBufferPeriodicFlushTimer needs to be synchronized only when not null, " | ||
| + "and in mutual exclusion with close() and flush(true). " | ||
| + "However to needs to synchronize with flush(false) coming from mutate") | ||
| @InterfaceAudience.Private | ||
| @InterfaceStability.Evolving | ||
| public class BufferedMutatorImpl implements BufferedMutator { | ||
|  | @@ -93,6 +99,9 @@ public class BufferedMutatorImpl implements BufferedMutator { | |
| private final boolean cleanupPoolOnClose; | ||
| private volatile boolean closed = false; | ||
| private final AsyncProcess ap; | ||
| private List<AsyncRequestFuture> asfList; | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does this field need to be a class member? It seems that we only use it in the doFlush method. | ||
| private int maxThreads; | ||
| private ReentrantLock lock = new ReentrantLock(); | ||
|  | ||
| @VisibleForTesting | ||
| BufferedMutatorImpl(ClusterConnection conn, BufferedMutatorParams params, AsyncProcess ap) { | ||
|  | @@ -109,6 +118,14 @@ public class BufferedMutatorImpl implements BufferedMutator { | |
| this.pool = params.getPool(); | ||
| cleanupPoolOnClose = false; | ||
| } | ||
| maxThreads = conf.getInt("hbase.htable.threads.max", Integer.MAX_VALUE); | ||
| if (maxThreads <= 0 || maxThreads == Integer.MAX_VALUE) { | ||
| maxThreads = conf.getInt("hbase.client.max.total.tasks", Integer.MAX_VALUE); | ||
| if (maxThreads <= 0 || maxThreads == Integer.MAX_VALUE) { | ||
| maxThreads = 1; | ||
| } | ||
| } | ||
| asfList = new ArrayList<AsyncRequestFuture>(maxThreads*4); | ||
| ConnectionConfiguration tableConf = new ConnectionConfiguration(conf); | ||
| this.writeBufferSize = | ||
| params.getWriteBufferSize() != UNSET ? | ||
|  | @@ -223,6 +240,7 @@ private void timerCallbackForWriteBufferPeriodicFlush() { | |
| try { | ||
| executedWriteBufferPeriodicFlushes.incrementAndGet(); | ||
| flush(); | ||
| writeBufferPeriodicFlushTimer.notifyAll(); | ||
| } catch (InterruptedIOException | RetriesExhaustedWithDetailsException e) { | ||
| LOG.error("Exception during timerCallbackForWriteBufferPeriodicFlush --> " + e.getMessage()); | ||
| } | ||
|  | @@ -235,6 +253,15 @@ public synchronized void close() throws IOException { | |
| } | ||
| // Stop any running Periodic Flush timer. | ||
| disableWriteBufferPeriodicFlush(); | ||
| ap.waitAllSlot(); | ||
| try { | ||
| // Let time to the periodic flush thread to exit (task are finished, but not the code after) | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the problem if the code after is not finished? I'm a bit nervous that we just set a magic 5ms sleep time here... | ||
| if (writeBufferPeriodicFlushTimer != null) { | ||
| writeBufferPeriodicFlushTimer.wait(5); | ||
| } | ||
| } catch (InterruptedException e) { | ||
| throw new IOException(e); | ||
| } | ||
| try { | ||
| // As we can have an operation in progress even if the buffer is empty, we call | ||
| // doFlush at least one time. | ||
|  | @@ -275,9 +302,25 @@ public int getOperationTimeout() { | |
| } | ||
|  | ||
| @Override | ||
| @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="UL_UNRELEASED_LOCK", | ||
| justification="It seems that findBugs parser is wrong") | ||
| public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsException { | ||
| checkClose(); | ||
| doFlush(true); | ||
| // This will avoid concurrency between period flush, flush() and close() | ||
| // mutate are not synchronized, because it use doFlush(false) | ||
| boolean haveLocked = false; | ||
| if (writeBufferPeriodicFlushTimer != null) { | ||
| lock.lock(); | ||
| // make sure to unlock even if writeBufferPeriodicFlushTimer is set to null before the end | ||
| haveLocked = true; | ||
| } | ||
| try { | ||
| checkClose(); | ||
| doFlush(true); | ||
| } finally { | ||
| if (haveLocked) { | ||
| lock.unlock(); | ||
| } | ||
| } | ||
| } | ||
|  | ||
| /** | ||
|  | @@ -286,6 +329,9 @@ public void flush() throws InterruptedIOException, RetriesExhaustedWithDetailsEx | |
| * @param flushAll - if true, sends all the writes and wait for all of them to finish before | ||
| * returning. Otherwise, flush until buffer size is smaller than threshold | ||
| */ | ||
| @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="SWL_SLEEP_WITH_LOCK_HELD", | ||
| justification="Backpressure, when we have to many pending Future, " | ||
| + "we wait under the lock to slow down the application") | ||
| private void doFlush(boolean flushAll) throws InterruptedIOException, | ||
| RetriesExhaustedWithDetailsException { | ||
| List<RetriesExhaustedWithDetailsException> errors = new ArrayList<>(); | ||
|  | @@ -302,11 +348,53 @@ private void doFlush(boolean flushAll) throws InterruptedIOException, | |
| } | ||
| asf = ap.submit(createTask(access)); | ||
| } | ||
| // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't | ||
| // be released. | ||
| asf.waitUntilDone(); | ||
| if (asf.hasError()) { | ||
| errors.add(asf.getErrors()); | ||
|  | ||
| if (flushAll || writeBufferSize == 0) { | ||
| // if we have setWriteBufferPeriodicFlushTimeoutMs we may have concurrent update | ||
| List<AsyncRequestFuture> waitList; | ||
| synchronized(asfList) { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oh, I see, BufferedMutator is thread safe, so multiple threads can enter this method at the same time and we want them to share the safe asfList, so it should be class member. | ||
| waitList = new ArrayList<>(asfList); | ||
| } | ||
| // DON'T do the wait in the try-with-resources. Otherwise, the undealt mutations won't | ||
| // be released. | ||
| for(AsyncRequestFuture toWait:waitList) { | ||
| toWait.waitUntilDone(); | ||
| if (toWait.hasError()) { | ||
| errors.add(toWait.getErrors()); | ||
| } | ||
| } | ||
| synchronized(asfList) { | ||
| asfList.removeAll(waitList); | ||
| } | ||
| asf.waitUntilDone(); | ||
| if (asf.hasError()) { | ||
| errors.add(asf.getErrors()); | ||
| } | ||
| } else { | ||
| // Do some cleanup in asfList to decrease memory | ||
| int nbRemoved = 0; | ||
| while (asfList.size() >= maxThreads*4) { | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here we get size out of synchornized? | ||
| synchronized(asfList) { | ||
| Iterator<AsyncRequestFuture> it = asfList.iterator(); | ||
| while (it.hasNext()) { | ||
| AsyncRequestFutureImpl toCheck = (AsyncRequestFutureImpl) it.next(); | ||
| if (toCheck.isFinished()) { | ||
| it.remove(); | ||
| nbRemoved++; | ||
| } | ||
| } | ||
| if (nbRemoved == 0) { | ||
| try { | ||
| Thread.sleep(1); | ||
| There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. So this means we will do a busy waiting here? This is not always the best choice, maybe we should provide a configurable way to wait here, the default one should be the typical wait/notify, and if do not care wasting the CPU cycles but only want the maximum throughput, you can use busy waiting. | ||
| } catch (InterruptedException e) { | ||
| throw new InterruptedIOException(e.getMessage()); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| synchronized(asfList) { | ||
| asfList.add(asf); | ||
| } | ||
| } | ||
| } | ||
|  | ||
|  | ||
Uh oh!
There was an error while loading. Please reload this page.