Skip to content

Commit 5a19bcf

Browse files
authored
HBASE-25984: Avoid premature reuse of sync futures in FSHLog (apache#3371)
Signed-off-by: Viraj Jasani <[email protected]>
1 parent 7466e08 commit 5a19bcf

File tree

7 files changed

+285
-21
lines changed

7 files changed

+285
-21
lines changed

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 7 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -320,11 +320,9 @@ public WalProps(Map<byte[], Long> encodedName2HighestSequenceId, long logSize) {
320320
new ConcurrentSkipListMap<>(LOG_NAME_COMPARATOR);
321321

322322
/**
323-
* Map of {@link SyncFuture}s owned by Thread objects. Used so we reuse SyncFutures.
324-
* Thread local is used so JVM can GC the terminated thread for us. See HBASE-21228
325-
* <p>
323+
* A cache of sync futures reused by threads.
326324
*/
327-
private final ThreadLocal<SyncFuture> cachedSyncFutures;
325+
protected final SyncFutureCache syncFutureCache;
328326

329327
/**
330328
* The class name of the runtime implementation, used as prefix for logging/tracing.
@@ -494,12 +492,7 @@ public boolean accept(final Path fileName) {
494492
DEFAULT_SLOW_SYNC_ROLL_INTERVAL_MS);
495493
this.walSyncTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(WAL_SYNC_TIMEOUT_MS,
496494
DEFAULT_WAL_SYNC_TIMEOUT_MS));
497-
this.cachedSyncFutures = new ThreadLocal<SyncFuture>() {
498-
@Override
499-
protected SyncFuture initialValue() {
500-
return new SyncFuture();
501-
}
502-
};
495+
this.syncFutureCache = new SyncFutureCache(conf);
503496
this.implClassName = getClass().getSimpleName();
504497
this.walTooOldNs = TimeUnit.SECONDS.toNanos(conf.getInt(
505498
SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT));
@@ -885,10 +878,6 @@ protected final void blockOnSync(SyncFuture syncFuture) throws IOException {
885878
}
886879
}
887880
} catch (TimeoutIOException tioe) {
888-
// SyncFuture reuse by thread, if TimeoutIOException happens, ringbuffer
889-
// still refer to it, so if this thread use it next time may get a wrong
890-
// result.
891-
this.cachedSyncFutures.remove();
892881
throw tioe;
893882
} catch (InterruptedException ie) {
894883
LOG.warn("Interrupted", ie);
@@ -993,6 +982,9 @@ public void shutdown() throws IOException {
993982
rollWriterLock.lock();
994983
try {
995984
doShutdown();
985+
if (syncFutureCache != null) {
986+
syncFutureCache.clear();
987+
}
996988
if (logArchiveExecutor != null) {
997989
logArchiveExecutor.shutdownNow();
998990
}
@@ -1049,7 +1041,7 @@ public void updateStore(byte[] encodedRegionName, byte[] familyName, Long sequen
10491041
}
10501042

10511043
protected final SyncFuture getSyncFuture(long sequence, boolean forceSync) {
1052-
return cachedSyncFutures.get().reset(sequence).setForceSync(forceSync);
1044+
return syncFutureCache.getIfPresentOrNew().reset(sequence).setForceSync(forceSync);
10531045
}
10541046

10551047
protected boolean isLogRollRequested() {

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,14 @@ public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDi
263263
DEFAULT_ASYNC_WAL_WAIT_ON_SHUTDOWN_IN_SECONDS);
264264
}
265265

266+
/**
267+
* Helper that marks the future as DONE and offers it back to the cache.
268+
*/
269+
private void markFutureDoneAndOffer(SyncFuture future, long txid, Throwable t) {
270+
future.done(txid, t);
271+
syncFutureCache.offer(future);
272+
}
273+
266274
private static boolean waitingRoll(int epochAndState) {
267275
return (epochAndState & 1) != 0;
268276
}
@@ -397,7 +405,7 @@ private int finishSyncLowerThanTxid(long txid) {
397405
for (Iterator<SyncFuture> iter = syncFutures.iterator(); iter.hasNext();) {
398406
SyncFuture sync = iter.next();
399407
if (sync.getTxid() <= txid) {
400-
sync.done(txid, null);
408+
markFutureDoneAndOffer(sync, txid, null);
401409
iter.remove();
402410
finished++;
403411
} else {
@@ -416,7 +424,7 @@ private int finishSync() {
416424
long maxSyncTxid = highestSyncedTxid.get();
417425
for (SyncFuture sync : syncFutures) {
418426
maxSyncTxid = Math.max(maxSyncTxid, sync.getTxid());
419-
sync.done(maxSyncTxid, null);
427+
markFutureDoneAndOffer(sync, maxSyncTxid, null);
420428
}
421429
highestSyncedTxid.set(maxSyncTxid);
422430
int finished = syncFutures.size();
@@ -531,7 +539,7 @@ private void drainNonMarkerEditsAndFailSyncs() {
531539
for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) {
532540
SyncFuture future = syncIter.next();
533541
if (future.getTxid() < txid) {
534-
future.done(future.getTxid(), error);
542+
markFutureDoneAndOffer(future, future.getTxid(), error);
535543
syncIter.remove();
536544
} else {
537545
break;
@@ -796,7 +804,7 @@ protected void doShutdown() throws IOException {
796804
}
797805
}
798806
// and fail them
799-
syncFutures.forEach(f -> f.done(f.getTxid(), error));
807+
syncFutures.forEach(f -> markFutureDoneAndOffer(f, f.getTxid(), error));
800808
if (!(consumeExecutor instanceof EventLoop)) {
801809
consumeExecutor.shutdown();
802810
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -891,6 +891,14 @@ SyncFuture waitSafePoint(SyncFuture syncFuture) throws InterruptedException,
891891
return syncFuture;
892892
}
893893

894+
/**
895+
* @return true if the safe point has been attained.
896+
*/
897+
@InterfaceAudience.Private
898+
boolean isSafePointAttained() {
899+
return this.safePointAttainedLatch.getCount() == 0;
900+
}
901+
894902
/**
895903
* Called by Thread B when it attains the 'safe point'. In this method, Thread B signals Thread
896904
* A it can proceed. Thread B will be held in here until {@link #releaseSafePoint()} is called
@@ -977,6 +985,16 @@ private void cleanupOutstandingSyncsOnException(final long sequence, final Excep
977985
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
978986
this.syncFutures[i].done(sequence, e);
979987
}
988+
offerDoneSyncsBackToCache();
989+
}
990+
991+
/**
992+
* Offers the finished syncs back to the cache for reuse.
993+
*/
994+
private void offerDoneSyncsBackToCache() {
995+
for (int i = 0; i < this.syncFuturesCount.get(); i++) {
996+
syncFutureCache.offer(syncFutures[i]);
997+
}
980998
this.syncFuturesCount.set(0);
981999
}
9821000

@@ -1089,7 +1107,10 @@ public void onEvent(final RingBufferTruck truck, final long sequence, boolean en
10891107
? this.exception : new DamagedWALException("On sync", this.exception));
10901108
}
10911109
attainSafePoint(sequence);
1092-
this.syncFuturesCount.set(0);
1110+
// It is critical that we offer the futures back to the cache for reuse here after the
1111+
// safe point is attained and all the clean up has been done. There have been
1112+
// issues with reusing sync futures early causing WAL lockups, see HBASE-25984.
1113+
offerDoneSyncsBackToCache();
10931114
} catch (Throwable t) {
10941115
LOG.error("UNEXPECTED!!! syncFutures.length=" + this.syncFutures.length, t);
10951116
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SyncFuture.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,8 @@ synchronized SyncFuture reset(long txid) {
9090

9191
@Override
9292
public synchronized String toString() {
93-
return "done=" + isDone() + ", txid=" + this.txid;
93+
return "done=" + isDone() + ", txid=" + this.txid + " threadID=" + t.getId() +
94+
" threadName=" + t.getName();
9495
}
9596

9697
synchronized long getTxid() {
@@ -106,6 +107,15 @@ synchronized SyncFuture setForceSync(boolean forceSync) {
106107
return this;
107108
}
108109

110+
/**
111+
* Returns the thread that owned this sync future, use with caution as we return the reference to
112+
* the actual thread object.
113+
* @return the associated thread instance.
114+
*/
115+
Thread getThread() {
116+
return t;
117+
}
118+
109119
/**
110120
* @param txid the transaction id at which this future 'completed'.
111121
* @param t Can be null. Set if we are 'completing' on error (and this 't' is the error).
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.regionserver.wal;
19+
20+
import java.util.concurrent.TimeUnit;
21+
import org.apache.hadoop.conf.Configuration;
22+
import org.apache.hadoop.hbase.HConstants;
23+
import org.apache.yetus.audience.InterfaceAudience;
24+
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
25+
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
26+
27+
/**
28+
* A cache of {@link SyncFuture}s. This class supports two methods
29+
* {@link SyncFutureCache#getIfPresentOrNew()} and {@link SyncFutureCache#offer(SyncFuture)}.
30+
*
31+
* Usage pattern:
32+
* SyncFuture sf = syncFutureCache.getIfPresentOrNew();
33+
* sf.reset(...);
34+
* // Use the sync future
35+
* finally: syncFutureCache.offer(sf);
36+
*
37+
* Offering the sync future back to the cache makes it eligible for reuse within the same thread
38+
* context. The cache is keyed by the accessing thread instance, and an entry is automatically
39+
* invalidated if unused for {@link SyncFutureCache#SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS} minutes.
40+
*/
41+
@InterfaceAudience.Private
42+
public final class SyncFutureCache {
43+
44+
private static final long SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS = 2;
45+
46+
private final Cache<Thread, SyncFuture> syncFutureCache;
47+
48+
public SyncFutureCache(final Configuration conf) {
49+
final int handlerCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT,
50+
HConstants.DEFAULT_REGION_SERVER_HANDLER_COUNT);
51+
syncFutureCache = CacheBuilder.newBuilder().initialCapacity(handlerCount)
52+
.expireAfterWrite(SYNC_FUTURE_INVALIDATION_TIMEOUT_MINS, TimeUnit.MINUTES).build();
53+
}
54+
55+
public SyncFuture getIfPresentOrNew() {
56+
// Invalidate the entry if a mapping exists. We do not want it to be reused at the same time.
57+
SyncFuture future = syncFutureCache.asMap().remove(Thread.currentThread());
58+
return (future == null) ? new SyncFuture() : future;
59+
}
60+
61+
/**
62+
* Offers the sync future back to the cache for reuse.
63+
*/
64+
public void offer(SyncFuture syncFuture) {
65+
// It is ok to overwrite an existing mapping.
66+
syncFutureCache.asMap().put(syncFuture.getThread(), syncFuture);
67+
}
68+
69+
public void clear() {
70+
if (syncFutureCache != null) {
71+
syncFutureCache.invalidateAll();
72+
}
73+
}
74+
}

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java

Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package org.apache.hadoop.hbase.regionserver.wal;
1919

2020
import static org.junit.Assert.assertEquals;
21+
import static org.junit.Assert.assertFalse;
22+
import static org.junit.Assert.assertTrue;
2123

2224
import java.io.IOException;
2325
import java.lang.reflect.Field;
@@ -27,13 +29,15 @@
2729
import java.util.concurrent.CountDownLatch;
2830
import java.util.concurrent.ExecutorService;
2931
import java.util.concurrent.Executors;
32+
import java.util.concurrent.TimeUnit;
3033
import java.util.concurrent.atomic.AtomicBoolean;
3134
import org.apache.hadoop.conf.Configuration;
3235
import org.apache.hadoop.fs.FileSystem;
3336
import org.apache.hadoop.fs.Path;
3437
import org.apache.hadoop.hbase.HBaseClassTestRule;
3538
import org.apache.hadoop.hbase.HConstants;
3639
import org.apache.hadoop.hbase.TableName;
40+
import org.apache.hadoop.hbase.Waiter;
3741
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
3842
import org.apache.hadoop.hbase.client.Put;
3943
import org.apache.hadoop.hbase.client.RegionInfo;
@@ -49,8 +53,10 @@
4953
import org.apache.hadoop.hbase.util.Bytes;
5054
import org.apache.hadoop.hbase.util.CommonFSUtils;
5155
import org.apache.hadoop.hbase.util.Threads;
56+
import org.apache.hadoop.hbase.wal.WAL;
5257
import org.apache.hadoop.hbase.wal.WALEdit;
5358
import org.apache.hadoop.hbase.wal.WALKey;
59+
import org.apache.hadoop.hbase.wal.WALProvider;
5460
import org.junit.ClassRule;
5561
import org.junit.Rule;
5662
import org.junit.Test;
@@ -67,6 +73,8 @@ public class TestFSHLog extends AbstractTestFSWAL {
6773
public static final HBaseClassTestRule CLASS_RULE =
6874
HBaseClassTestRule.forClass(TestFSHLog.class);
6975

76+
private static final long TEST_TIMEOUT_MS = 10000;
77+
7078
@Rule
7179
public TestName name = new TestName();
7280

@@ -131,6 +139,89 @@ public void testSyncRunnerIndexOverflow() throws IOException, NoSuchFieldExcepti
131139
}
132140
}
133141

142+
/**
143+
* Test for WAL stall due to sync future overwrites. See HBASE-25984.
144+
*/
145+
@Test
146+
public void testDeadlockWithSyncOverwrites() throws Exception {
147+
final CountDownLatch blockBeforeSafePoint = new CountDownLatch(1);
148+
149+
class FailingWriter implements WALProvider.Writer {
150+
@Override public void sync(boolean forceSync) throws IOException {
151+
throw new IOException("Injected failure..");
152+
}
153+
154+
@Override public void append(WAL.Entry entry) throws IOException {
155+
}
156+
157+
@Override public long getLength() {
158+
return 0;
159+
}
160+
161+
@Override
162+
public long getSyncedLength() {
163+
return 0;
164+
}
165+
166+
@Override public void close() throws IOException {
167+
}
168+
}
169+
170+
/*
171+
* Custom FSHLog implementation with a conditional wait before attaining safe point.
172+
*/
173+
class CustomFSHLog extends FSHLog {
174+
public CustomFSHLog(FileSystem fs, Path rootDir, String logDir, String archiveDir,
175+
Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists,
176+
String prefix, String suffix) throws IOException {
177+
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix);
178+
}
179+
180+
@Override
181+
protected void beforeWaitOnSafePoint() {
182+
try {
183+
assertTrue(blockBeforeSafePoint.await(TEST_TIMEOUT_MS, TimeUnit.MILLISECONDS));
184+
} catch (InterruptedException e) {
185+
throw new RuntimeException(e);
186+
}
187+
}
188+
189+
public SyncFuture publishSyncOnRingBuffer() {
190+
long sequence = getSequenceOnRingBuffer();
191+
return publishSyncOnRingBuffer(sequence, false);
192+
}
193+
}
194+
195+
final String name = this.name.getMethodName();
196+
try (CustomFSHLog log = new CustomFSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
197+
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
198+
log.setWriter(new FailingWriter());
199+
Field ringBufferEventHandlerField =
200+
FSHLog.class.getDeclaredField("ringBufferEventHandler");
201+
ringBufferEventHandlerField.setAccessible(true);
202+
FSHLog.RingBufferEventHandler ringBufferEventHandler =
203+
(FSHLog.RingBufferEventHandler) ringBufferEventHandlerField.get(log);
204+
// Force a safe point
205+
FSHLog.SafePointZigZagLatch latch = ringBufferEventHandler.attainSafePoint();
206+
try {
207+
SyncFuture future0 = log.publishSyncOnRingBuffer();
208+
// Wait for the sync to be done.
209+
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, future0::isDone);
210+
// Publish another sync from the same thread, this should not overwrite the done sync.
211+
SyncFuture future1 = log.publishSyncOnRingBuffer();
212+
assertFalse(future1.isDone());
213+
// Unblock the safe point trigger..
214+
blockBeforeSafePoint.countDown();
215+
// Wait for the safe point to be reached.
216+
// With the deadlock in HBASE-25984, this is never possible, thus blocking the sync pipeline.
217+
Waiter.waitFor(CONF, TEST_TIMEOUT_MS, latch::isSafePointAttained);
218+
} finally {
219+
// Force release the safe point, for the clean up.
220+
latch.releaseSafePoint();
221+
}
222+
}
223+
}
224+
134225
/**
135226
* Test case for https://issues.apache.org/jira/browse/HBASE-16721
136227
*/

0 commit comments

Comments
 (0)