Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -320,11 +320,9 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);

/**
* Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures.
* Thread local is used so JVM can GC the terminated thread for us. See HBASE-21228
* <p>
* A cache of sync futures reused by threads.
*/
private final ThreadLocal<SyncFuture> cachedSyncFutures;
protected final SyncFutureCache syncFutureCache;

/**
* The class name of the runtime implementation, used as prefix for logging/tracing.
Expand Down Expand Up @@ -494,12 +492,7 @@ public boolean accept(final Path fileName) {
DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
DEFAULT_WAL_SYNC_TIMEOUT_MS));
this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
@Override
protected SyncFuture initialValue() {
return new SyncFuture();
}
};
this.syncFutureCache = new SyncFutureCache(conf);
this.implClassName = getClass().getSimpleName();
this.walTooOldNs = TimeUnit.SECONDS.toNanos(conf.getInt(
SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
Expand Down Expand Up @@ -885,10 +878,6 @@ protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
}
}
} catch (TimeoutIOException tioe) {
// SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
// still refer to it, so if this thread use it next time may get a wrong
// result.
this.cachedSyncFutures.remove();
throw tioe;
} catch (InterruptedException ie) {
LOG.warn("Interrupted", ie);
Expand Down Expand Up @@ -993,6 +982,9 @@ public void shutdown() throws IOException {
rollWriterLock.lock();
try {
doShutdown();
if (syncFutureCache != null) {
syncFutureCache.clear();
}
if (logArchiveExecutor != null) {
logArchiveExecutor.shutdownNow();
}
Expand Down Expand Up @@ -1049,7 +1041,7 @@ public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequen
}

protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync);
return syncFutureCache.getIfPresentOrNew().reset(sequence).setForceSync(forceSync);
}

protected boolean isLogRollRequested() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,14 @@ public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDi
DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
}

/**
* Helper that marks the future as DONE and offers it back to the cache.
*/
private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
future.done(txid, t);
syncFutureCache.offer(future);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This patch in the current form doesn't get rid of future overwrites as it does not seem to cause any issues in AsyncWAL case (based on code reading), but if the reviewers think we should do that, I can refactor accordingly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the future overwrite here? The call to 'done'?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@saintstack I documented the race here..

Once done() is called, the future can be reused immediately from another handler (without this patch). That was causing deadlocks in FSHLog. Based on my analysis of AsyncFSWAL, I think the overwrites are possible but it should not affect the correctness as the safe point is attained in a different manner. So wanted to check with Duo who is the expert on that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry. Should have done more background reading before commenting.

}

private static boolean waitingRoll(int epochAndState) {
return (epochAndState & 1) != 0;
}
Expand Down Expand Up @@ -397,7 +405,7 @@ private int finishSyncLowerThanTxid(long txid) {
for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
SyncFuture sync = iter.next();
if (sync.getTxid() <= txid) {
sync.done(txid, null);
markFutureDoneAndOffer(sync, txid, null);
iter.remove();
finished++;
} else {
Expand All @@ -416,7 +424,7 @@ private int finishSync() {
long maxSyncTxid = highestSyncedTxid.get();
for (SyncFuture sync : syncFutures) {
maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
sync.done(maxSyncTxid, null);
markFutureDoneAndOffer(sync, maxSyncTxid, null);
}
highestSyncedTxid.set(maxSyncTxid);
int finished = syncFutures.size();
Expand Down Expand Up @@ -531,7 +539,7 @@ private void drainNonMarkerEditsAndFailSyncs() {
for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) {
SyncFuture future = syncIter.next();
if (future.getTxid() < txid) {
future.done(future.getTxid(), error);
markFutureDoneAndOffer(future, future.getTxid(), error);
syncIter.remove();
} else {
break;
Expand Down Expand Up @@ -796,7 +804,7 @@ protected void doShutdown() throws IOException {
}
}
// and fail them
syncFutures.forEach(f -> f.done(f.getTxid(), error));
syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
if (!(consumeExecutor instanceof EventLoop)) {
consumeExecutor.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -891,6 +891,14 @@ SyncFuture waitSafePoint(SyncFuture syncFuture) throws InterruptedException,
return syncFuture;
}

/**
* @return if the safepoint has been attained.
*/
@InterfaceAudience.Private
boolean isSafePointAttained() {
return this.safePointAttainedLatch.getCount() == 0;
}

/**
* Called by Thread B when it attains the 'safe point'. In this method, Thread B signals Thread
* A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} is called
Expand Down Expand Up @@ -977,6 +985,16 @@ private void cleanupOutstandingSyncsOnException(final long sequence, final Excep
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
this.syncFutures[i].done(sequence, e);
}
offerDoneSyncsBackToCache();
}

/**
* Offers the finished syncs back to the cache for reuse.
*/
private void offerDoneSyncsBackToCache() {
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
syncFutureCache.offer(syncFutures[i]);
}
this.syncFuturesCount.set(0);
}

Expand Down Expand Up @@ -1089,7 +1107,10 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
? this.exception : new DamagedWALException("On sync", this.exception));
}
attainSafePoint(sequence);
this.syncFuturesCount.set(0);
// It is critical that we offer the futures back to the cache for reuse here after the
// safe point is attained and all the clean up has been done. There have been
// issues with reusing sync futures early causing WAL lockups, see HBASE-25984.
offerDoneSyncsBackToCache();
} catch (Throwable t) {
LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ synchronized SyncFuture reset(long txid) {

@Override
public synchronized String toString() {
return "done=" + isDone() + ", txid=" + this.txid;
return "done=" + isDone() + ", txid=" + this.txid + " threadID=" + t.getId() +
" threadName=" + t.getName();
}

synchronized long getTxid() {
Expand All @@ -106,6 +107,15 @@ synchronized SyncFuture setForceSync(boolean forceSync) {
return this;
}

/**
* Returns the thread that owned this sync future, use with caution as we return the reference to
* the actual thread object.
* @return the associated thread instance.
*/
Thread getThread() {
return t;
}

/**
* @param txid the transaction id at which this future 'completed'.
* @param t Can be null. Set if we are 'completing' on error (and this 't' is the error).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.wal;

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;

/**
* A cache of {@link SyncFuture}s. This class supports two methods
* {@link SyncFutureCache#getIfPresentOrNew()} and {@link SyncFutureCache#offer()}.
*
* Usage pattern:
* SyncFuture sf = syncFutureCache.getIfPresentOrNew();
* sf.reset(...);
* // Use the sync future
* finally: syncFutureCache.offer(sf);
*
* Offering the sync future back to the cache makes it eligible for reuse within the same thread
* context. Cache keyed by the accessing thread instance and automatically invalidated if it remains
* unused for {@link SyncFutureCache#SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS} minutes.
*/
@InterfaceAudience.Private
public final class SyncFutureCache {

private static final long SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

2 min is good estimate? Trying to understand if we might run into overhead (cache entry getting expired followed by same entry getting created for same thread from pool)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If a handler remained idle for 2 mins, that indicates there isn't enough load on the server in which case there is no advantage in keeping this around. Usually only helpful if there is high load and potential for reuse so that we don't need to GC these small objects often.


private final Cache<Thread, SyncFuture> syncFutureCache;

public SyncFutureCache(final Configuration conf) {
final int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
syncFutureCache = CacheBuilder.newBuilder().initialCapacity(handlerCount)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this guava cache 'slow'. That Ben Manes tried to get a patch to guava that improved it but couldn't get interest so his caffeine cache has means of implementing gauava cache api.... so you can drop in his thing instead.

Maybe it doesn't matter here because scale of objects is small? It is critical section though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah sweet, I heard about this Guava vs Caffeine thing, let me replace that. (didn't notice any performance issues with the patch but if we get some extra performance why not)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure if there is a benefit over a ThreadLocal in your case. The expiration time here seems to be only to evict when the thread dies, which a TL does automatically. A weakKey cache might be the closest equivalent to that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also looks like your on 2.8.1, whereas a putIfAbsent optimization was added in 2.8.2 to avoid locking if the entry is present. That might help.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need a putIfAbsent() either, switched to put() as we are ok with the overwrites here, it doesn't show up in the profiler now. Ya we can't use the

I'm not sure if there is a benefit over a ThreadLocal in your case.

Right, we can't use TL here because of the bug, but WeakKey cache seems like a good alternative, thanks for the pointer.

.expireAfterWrite(SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS, TimeUnit.MINUTES).build();
}

public SyncFuture getIfPresentOrNew() {
// Invalidate the entry if a mapping exists. We do not want it to be reused at the same time.
SyncFuture future = syncFutureCache.asMap().remove(Thread.currentThread());
return (future == null) ? new SyncFuture() : future;
Comment on lines +57 to +58
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This suggestion was great. I think we can make this change in branch-1 as well (sorry I lost the track, not sure if branch-1 PR is still pending for merge or already merged)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no branch-1 patch (yet).

}

/**
* Offers the sync future back to the cache for reuse.
*/
public void offer(SyncFuture syncFuture) {
// It is ok to overwrite an existing mapping.
syncFutureCache.asMap().put(syncFuture.getThread(), syncFuture);
}

public void clear() {
if (syncFutureCache != null) {
syncFutureCache.invalidateAll();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package org.apache.hadoop.hbase.regionserver.wal;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Field;
Expand All @@ -27,13 +29,15 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
Expand All @@ -49,8 +53,10 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
Expand All @@ -67,6 +73,8 @@ public class TestFSHLog extends AbstractTestFSWAL {
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestFSHLog.class);

private static final long TEST_TIMEOUT_MS = 10000;

@Rule
public TestName name = new TestName();

Expand Down Expand Up @@ -131,6 +139,89 @@ public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldExcepti
}
}

/**
* Test for WAL stall due to sync future overwrites. See HBASE-25984.
*/
@Test
public void testDeadlockWithSyncOverwrites() throws Exception {
final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1);

class FailingWriter implements WALProvider.Writer {
@Override public void sync(boolean forceSync) throws IOException {
throw new IOException("Injected failure..");
}

@Override public void append(WAL.Entry entry) throws IOException {
}

@Override public long getLength() {
return 0;
}

@Override
public long getSyncedLength() {
return 0;
}

@Override public void close() throws IOException {
}
}

/*
* Custom FSHLog implementation with a conditional wait before attaining safe point.
*/
class CustomFSHLog extends FSHLog {
public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir,
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
String prefix, String suffix) throws IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
}

@Override
protected void beforeWaitOnSafePoint() {
try {
assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}

public SyncFuture publishSyncOnRingBuffer() {
long sequence = getSequenceOnRingBuffer();
return publishSyncOnRingBuffer(sequence, false);
}
}

final String name = this.name.getMethodName();
try (CustomFSHLog log = new CustomFSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
log.setWriter(new FailingWriter());
Field ringBufferEventHandlerField =
FSHLog.class.getDeclaredField("ringBufferEventHandler");
ringBufferEventHandlerField.setAccessible(true);
FSHLog.RingBufferEventHandler ringBufferEventHandler =
(FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
// Force a safe point
FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint();
try {
SyncFuture future0 = log.publishSyncOnRingBuffer();
// Wait for the sync to be done.
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, future0::isDone);
// Publish another sync from the same thread, this should not overwrite the done sync.
SyncFuture future1 = log.publishSyncOnRingBuffer();
assertFalse(future1.isDone());
// Unblock the safe point trigger..
blockBeforeSafePoint.countDown();
// Wait for the safe point to be reached.
// With the deadlock in HBASE-25984, this is never possible, thus blocking the sync pipeline.
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, latch::isSafePointAttained);
} finally {
// Force release the safe point, for the clean up.
latch.releaseSafePoint();
}
}
}

/**
* Test case for https://issues.apache.org/jira/browse/HBASE-16721
*/
Expand Down
Loading