diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index 8aec10cb1cf5..86073c4c0741 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -34,15 +34,14 @@ import java.util.Queue; import java.util.SortedSet; import java.util.TreeSet; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; @@ -52,6 +51,7 @@ import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; import org.apache.hadoop.hbase.io.asyncfs.monitor.StreamSlowMonitor; +import org.apache.hadoop.hbase.util.FutureUtils; import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKeyImpl; @@ -150,8 +150,6 @@ public class AsyncFSWAL extends AbstractFSWAL { private final Class channelClass; - private final Lock consumeLock = new ReentrantLock(); - private final Runnable consumer = this::consume; // check if there is already a consumer task in the event loop's task queue @@ -165,11 +163,7 @@ public class AsyncFSWAL extends AbstractFSWAL { // all other bits are the epoch number of the current writer, this is used to detect whether the // writer is still the one when you issue the sync. // notice that, modification to this field is only allowed under the protection of consumeLock. - private volatile int epochAndState; - - private boolean readyForRolling; - - private final Condition readyForRollingCond = consumeLock.newCondition(); + private int epochAndState; private final RingBuffer waitingConsumePayloads; @@ -202,6 +196,16 @@ public class AsyncFSWAL extends AbstractFSWAL { private final StreamSlowMonitor streamSlowMonitor; + /** + * {@link AsyncFSWAL#doReplaceWriter} and {@link AsyncFSWAL#doShutdown} is protected by + * {@link AbstractFSWAL#rollWriterLock}, there is at most one method call at the same time,so we + * could just use a simple variable to save the request. + */ + private final AtomicReference rollingRequestRef = + new AtomicReference(null); + + private boolean alreadyShutdown = false; + public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, Configuration conf, List listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, Class channelClass) @@ -272,8 +276,8 @@ private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) { syncFutureCache.offer(future); } - private static boolean waitingRoll(int epochAndState) { - return (epochAndState & 1) != 0; + private boolean waitingRoll() { + return this.rollingRequestRef.get() != null; } private static boolean writerBroken(int epochAndState) { @@ -284,53 +288,53 @@ private static int epoch(int epochAndState) { return epochAndState >>> 2; } + private boolean completeRolling() { + /** + * Poll the request. + */ + RollingRequest rollingRequest = this.rollingRequestRef.getAndSet(null); + if (rollingRequest == null) { + return false; + } + if (rollingRequest.shutdown) { + this.completeShutdown(rollingRequest); + } else { + this.completeReplaceWriter(rollingRequest); + } + return true; + } + // return whether we have successfully set readyForRolling to true. - private boolean trySetReadyForRolling() { + private boolean tryCompleteRolling() { // Check without holding lock first. Usually we will just return here. // waitingRoll is volatile and unacedEntries is only accessed inside event loop so it is safe to // check them outside the consumeLock. - if (!waitingRoll(epochAndState) || !unackedAppends.isEmpty()) { + if (!waitingRoll() || !unackedAppends.isEmpty()) { return false; } - consumeLock.lock(); - try { - // 1. a roll is requested - // 2. all out-going entries have been acked(we have confirmed above). - if (waitingRoll(epochAndState)) { - readyForRolling = true; - readyForRollingCond.signalAll(); - return true; - } else { - return false; - } - } finally { - consumeLock.unlock(); - } + + return this.completeRolling(); } private void syncFailed(long epochWhenSync, Throwable error) { LOG.warn("sync failed", error); boolean shouldRequestLogRoll = true; - consumeLock.lock(); - try { - int currentEpochAndState = epochAndState; - if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) { - // this is not the previous writer which means we have already rolled the writer. - // or this is still the current writer, but we have already marked it as broken and request - // a roll. - return; - } - this.epochAndState = currentEpochAndState | 0b10; - if (waitingRoll(currentEpochAndState)) { - readyForRolling = true; - readyForRollingCond.signalAll(); - // this means we have already in the middle of a rollWriter so just tell the roller thread - // that you can continue without requesting an extra log roll. - shouldRequestLogRoll = false; - } - } finally { - consumeLock.unlock(); + + int currentEpochAndState = epochAndState; + if (epoch(currentEpochAndState) != epochWhenSync || writerBroken(currentEpochAndState)) { + // this is not the previous writer which means we have already rolled the writer. + // or this is still the current writer, but we have already marked it as broken and request + // a roll. + return; + } + this.epochAndState = currentEpochAndState | 0b10; + + if (this.completeRolling()) { + // this means we have already in the middle of a rollWriter so just tell the roller thread + // that you can continue without requesting an extra log roll. + shouldRequestLogRoll = false; } + for (Iterator iter = unackedAppends.descendingIterator(); iter.hasNext();) { toWriteAppends.addFirst(iter.next()); } @@ -384,7 +388,7 @@ private void syncCompleted(long epochWhenSync, AsyncWriter writer, long processe } } postSync(System.nanoTime() - startTimeNs, finishSync()); - if (trySetReadyForRolling()) { + if (tryCompleteRolling()) { // we have just finished a roll, then do not need to check for log rolling, the writer will be // closed soon. return; @@ -551,7 +555,7 @@ private void appendAndSync() { if (unackedAppends.isEmpty()) { highestSyncedTxid.set(highestProcessedAppendTxid); finishSync(); - trySetReadyForRolling(); + tryCompleteRolling(); } return; } @@ -604,27 +608,29 @@ private void drainNonMarkerEditsAndFailSyncs() { } private void consume() { - consumeLock.lock(); - try { - int currentEpochAndState = epochAndState; - if (writerBroken(currentEpochAndState)) { - return; - } - if (waitingRoll(currentEpochAndState)) { - if (writer.getLength() > fileLengthAtLastSync) { - // issue a sync - sync(writer); - } else { - if (unackedAppends.isEmpty()) { - readyForRolling = true; - readyForRollingCond.signalAll(); - } + if (this.alreadyShutdown) { + return; + } + if (this.writer == null) { + this.completeRolling(); + return; + } + if (writerBroken(epochAndState)) { + this.completeRolling(); + return; + } + if (waitingRoll()) { + if (writer.getLength() > fileLengthAtLastSync) { + // issue a sync + sync(writer); + } else { + if (unackedAppends.isEmpty()) { + this.completeRolling(); } - return; } - } finally { - consumeLock.unlock(); + return; } + long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor <= cursorBound; nextCursor++) { @@ -649,6 +655,9 @@ private void consume() { drainNonMarkerEditsAndFailSyncs(); } appendAndSync(); + if (this.alreadyShutdown) { + return; + } if (hasConsumerTask.get()) { return; } @@ -685,7 +694,7 @@ private void consume() { private boolean shouldScheduleConsumer() { int currentEpochAndState = epochAndState; - if (writerBroken(currentEpochAndState) || waitingRoll(currentEpochAndState)) { + if (writerBroken(currentEpochAndState) || waitingRoll()) { return false; } return consumerScheduled.compareAndSet(false, true); @@ -759,23 +768,11 @@ protected AsyncWriter createWriterInstance(Path path) throws IOException { return createAsyncWriter(fs, path); } - private void waitForSafePoint() { - consumeLock.lock(); - try { - int currentEpochAndState = epochAndState; - if (writerBroken(currentEpochAndState) || this.writer == null) { - return; - } - consumerScheduled.set(true); - epochAndState = currentEpochAndState | 1; - readyForRolling = false; - consumeExecutor.execute(consumer); - while (!readyForRolling) { - readyForRollingCond.awaitUninterruptibly(); - } - } finally { - consumeLock.unlock(); - } + private void addRollingRequest(RollingRequest request) { + assert this.rollingRequestRef.get() == null; + this.rollingRequestRef.set(request); + consumerScheduled.set(true); + consumeExecutor.execute(consumer); } protected final long closeWriter(AsyncWriter writer, Path path) { @@ -800,70 +797,101 @@ protected final long closeWriter(AsyncWriter writer, Path path) { @Override protected void doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter) throws IOException { - Preconditions.checkNotNull(nextWriter); - waitForSafePoint(); - long oldFileLen = closeWriter(this.writer, oldPath); - logRollAndSetupWalProps(oldPath, newPath, oldFileLen); - this.writer = nextWriter; - if (nextWriter instanceof AsyncProtobufLogWriter) { - this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); - } - this.fileLengthAtLastSync = nextWriter.getLength(); - this.highestProcessedAppendTxidAtLastSync = 0L; - consumeLock.lock(); try { + Preconditions.checkNotNull(nextWriter); + AsyncWriter oldWriter = this.writer; + long oldFileLen = oldWriter == null ? 0 : oldWriter.getLength(); + RollingRequest request = new RollingRequest(oldPath, nextWriter, false); + addRollingRequest(request); + FutureUtils.get(request.future); + logRollAndSetupWalProps(oldPath, newPath, oldFileLen); + rollRequested.set(false); consumerScheduled.set(true); + consumeExecutor.execute(consumer); + } finally { + this.rollingRequestRef.set(null); + } + } + + private void completeReplaceWriter(RollingRequest rollingRequest) { + try { + Path oldPath = rollingRequest.oldPath; + AsyncWriter nextWriter = rollingRequest.nextWriter; + closeWriter(this.writer, oldPath); + this.writer = nextWriter; + if (nextWriter instanceof AsyncProtobufLogWriter) { + this.fsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput(); + } + this.fileLengthAtLastSync = nextWriter.getLength(); + this.highestProcessedAppendTxidAtLastSync = 0L; + int currentEpoch = epochAndState >>> 2; int nextEpoch = currentEpoch == MAX_EPOCH ? 0 : currentEpoch + 1; // set a new epoch and also clear waitingRoll and writerBroken this.epochAndState = nextEpoch << 2; - // Reset rollRequested status - rollRequested.set(false); - consumeExecutor.execute(consumer); - } finally { - consumeLock.unlock(); + rollingRequest.future.complete(true); + } catch (Exception exception) { + LOG.error("processReplaceWriter error!", exception); + rollingRequest.future.completeExceptionally(exception); } } @Override protected void doShutdown() throws IOException { - waitForSafePoint(); - closeWriter(this.writer, getOldPath()); - this.writer = null; - closeExecutor.shutdown(); try { - if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { - LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" - + " the close of async writer doesn't complete." - + "Please check the status of underlying filesystem" - + " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS - + "\""); + RollingRequest request = new RollingRequest(getOldPath(), null, true); + addRollingRequest(request); + FutureUtils.get(request.future); + if (!(consumeExecutor instanceof EventLoop)) { + consumeExecutor.shutdown(); } - } catch (InterruptedException e) { - LOG.error("The wait for close of async writer is interrupted"); - Thread.currentThread().interrupt(); + } finally { + this.rollingRequestRef.set(null); } - IOException error = new IOException("WAL has been closed"); - long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; - // drain all the pending sync requests - for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor - <= cursorBound; nextCursor++) { - if (!waitingConsumePayloads.isPublished(nextCursor)) { - break; + } + + private void completeShutdown(RollingRequest rollingRequest) { + try { + closeWriter(this.writer, rollingRequest.oldPath); + this.writer = null; + closeExecutor.shutdown(); + try { + if (!closeExecutor.awaitTermination(waitOnShutdownInSeconds, TimeUnit.SECONDS)) { + LOG.error("We have waited " + waitOnShutdownInSeconds + " seconds but" + + " the close of async writer doesn't complete." + + "Please check the status of underlying filesystem" + + " or increase the wait time by the config \"" + ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS + + "\""); + } + } catch (InterruptedException e) { + LOG.error("The wait for close of async writer is interrupted"); + Thread.currentThread().interrupt(); } - RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); - switch (truck.type()) { - case SYNC: - syncFutures.add(truck.unloadSync()); - break; - default: + IOException error = new IOException("WAL has been closed"); + long nextCursor = waitingConsumePayloadsGatingSequence.get() + 1; + // drain all the pending sync requests + for (long cursorBound = waitingConsumePayloads.getCursor(); nextCursor + <= cursorBound; nextCursor++) { + if (!waitingConsumePayloads.isPublished(nextCursor)) { break; + } + RingBufferTruck truck = waitingConsumePayloads.get(nextCursor); + switch (truck.type()) { + case SYNC: + syncFutures.add(truck.unloadSync()); + break; + default: + break; + } } - } - // and fail them - syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error)); - if (!(consumeExecutor instanceof EventLoop)) { - consumeExecutor.shutdown(); + // and fail them + syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error)); + rollingRequest.future.complete(true); + } catch (Exception exception) { + LOG.error("shut down error!", exception); + rollingRequest.future.completeExceptionally(exception); + } finally { + this.alreadyShutdown = true; } } @@ -890,4 +918,18 @@ protected boolean doCheckLogLowReplication() { AsyncFSOutput output = this.fsOut; return output != null && output.isBroken(); } + + static class RollingRequest { + private final CompletableFuture future; + private final Path oldPath; + private final AsyncWriter nextWriter; + private final boolean shutdown; + + RollingRequest(Path oldPath, AsyncWriter nextWriter, boolean shutdown) { + this.oldPath = oldPath; + this.nextWriter = nextWriter; + this.future = new CompletableFuture(); + this.shutdown = shutdown; + } + } }