Skip to content

Commit 1cbe383

Browse files
committed
HBASE-23157 WAL unflushed seqId tracking may wrong when Durability.ASYNC_WAL is used (apache#762)
Signed-off-by: stack <[email protected]>
1 parent 52933e2 commit 1cbe383

File tree

11 files changed

+206
-98
lines changed

11 files changed

+206
-98
lines changed

hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public static ImmutableByteArray wrap(byte[] b) {
4848
return new ImmutableByteArray(b);
4949
}
5050

51+
@Override
5152
public String toString() {
5253
return Bytes.toStringBinary(b);
5354
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2899,7 +2899,7 @@ protected FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask sta
28992899

29002900
// If we get to here, the HStores have been written.
29012901
if (wal != null) {
2902-
wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
2902+
wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes(), flushedSeqId);
29032903
}
29042904

29052905
// Record latest flush time

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -468,8 +468,8 @@ public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyTo
468468
}
469469

470470
@Override
471-
public void completeCacheFlush(byte[] encodedRegionName) {
472-
this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
471+
public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
472+
this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
473473
}
474474

475475
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -350,9 +350,36 @@ Long startCacheFlush(final byte[] encodedRegionName, final Map<byte[], Long> fam
350350
return lowestUnflushedInRegion;
351351
}
352352

353-
void completeCacheFlush(final byte[] encodedRegionName) {
353+
void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
354+
// This is a simple hack to avoid maxFlushedSeqId go backwards.
355+
// The system works fine normally, but if we make use of Durability.ASYNC_WAL and we are going
356+
// to flush all the stores, the maxFlushedSeqId will be next seq id of the region, but we may
357+
// still have some unsynced WAL entries in the ringbuffer after we call startCacheFlush, and
358+
// then it will be recorded as the lowestUnflushedSeqId by the above update method, which is
359+
// less than the current maxFlushedSeqId. And if next time we only flush the family with this
360+
// unusual lowestUnflushedSeqId, the maxFlushedSeqId will go backwards.
361+
// This is an unexpected behavior so we should fix it, otherwise it may cause unexpected
362+
// behavior in other area.
363+
// The solution here is a bit hack but fine. Just replace the lowestUnflushedSeqId with
364+
// maxFlushedSeqId + 1 if it is lesser. The meaning of maxFlushedSeqId is that, all edits less
365+
// than or equal to it have been flushed, i.e, persistent to HFile, so set
366+
// lowestUnflushedSequenceId to maxFlushedSeqId + 1 will not cause data loss.
367+
// And technically, using +1 is fine here. If the maxFlushesSeqId is just the flushOpSeqId, it
368+
// means we have flushed all the stores so the seq id for actual data should be at least plus 1.
369+
// And if we do not flush all the stores, then the maxFlushedSeqId is calculated by
370+
// lowestUnflushedSeqId - 1, so here let's plus the 1 back.
371+
Long wrappedSeqId = Long.valueOf(maxFlushedSeqId + 1);
354372
synchronized (tieLock) {
355373
this.flushingSequenceIds.remove(encodedRegionName);
374+
Map<ImmutableByteArray, Long> unflushed = lowestUnflushedSequenceIds.get(encodedRegionName);
375+
if (unflushed == null) {
376+
return;
377+
}
378+
for (Map.Entry<ImmutableByteArray, Long> e : unflushed.entrySet()) {
379+
if (e.getValue().longValue() <= maxFlushedSeqId) {
380+
e.setValue(wrappedSeqId);
381+
}
382+
}
356383
}
357384
}
358385

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedF
225225
}
226226

227227
@Override
228-
public void completeCacheFlush(final byte[] encodedRegionName) {
228+
public void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId) {
229229
}
230230

231231
@Override

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -184,7 +184,7 @@ default void sync(long txid, boolean forceSync) throws IOException {
184184
* being flushed; in other words, this is effectively same as a flush of all of the region
185185
* though we were passed a subset of regions. Otherwise, it returns the sequence id of the
186186
* oldest/lowest outstanding edit.
187-
* @see #completeCacheFlush(byte[])
187+
* @see #completeCacheFlush(byte[], long)
188188
* @see #abortCacheFlush(byte[])
189189
*/
190190
Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families);
@@ -194,10 +194,12 @@ default void sync(long txid, boolean forceSync) throws IOException {
194194
/**
195195
* Complete the cache flush.
196196
* @param encodedRegionName Encoded region name.
197+
* @param maxFlushedSeqId The maxFlushedSeqId for this flush. There is no edit in memory that is
198+
* less that this sequence id.
197199
* @see #startCacheFlush(byte[], Set)
198200
* @see #abortCacheFlush(byte[])
199201
*/
200-
void completeCacheFlush(final byte[] encodedRegionName);
202+
void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId);
201203

202204
/**
203205
* Abort a cache flush. Call if the flush fails. Note that the only recovery

hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java

Lines changed: 154 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -30,17 +30,21 @@
3030

3131
import java.io.IOException;
3232
import java.util.ArrayList;
33+
import java.util.Collection;
34+
import java.util.Collections;
3335
import java.util.Comparator;
3436
import java.util.List;
3537
import java.util.Map;
3638
import java.util.NavigableMap;
3739
import java.util.Set;
3840
import java.util.TreeMap;
3941
import java.util.UUID;
42+
import java.util.concurrent.ConcurrentSkipListMap;
4043
import java.util.concurrent.CountDownLatch;
4144
import java.util.concurrent.ExecutorService;
4245
import java.util.concurrent.Executors;
4346
import java.util.concurrent.atomic.AtomicBoolean;
47+
import java.util.stream.Collectors;
4448
import org.apache.hadoop.conf.Configuration;
4549
import org.apache.hadoop.fs.FileStatus;
4650
import org.apache.hadoop.fs.FileSystem;
@@ -65,7 +69,10 @@
6569
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
6670
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
6771
import org.apache.hadoop.hbase.regionserver.ChunkCreator;
72+
import org.apache.hadoop.hbase.regionserver.FlushPolicy;
73+
import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
6874
import org.apache.hadoop.hbase.regionserver.HRegion;
75+
import org.apache.hadoop.hbase.regionserver.HStore;
6976
import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
7077
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
7178
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -186,12 +193,10 @@ protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
186193

187194
/**
188195
* helper method to simulate region flush for a WAL.
189-
* @param wal
190-
* @param regionEncodedName
191196
*/
192197
protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
193198
wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
194-
wal.completeCacheFlush(regionEncodedName);
199+
wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
195200
}
196201

197202
/**
@@ -333,7 +338,7 @@ public void testFindMemStoresEligibleForFlush() throws Exception {
333338
// tests partial flush: roll on a partial flush, and ensure that wal is not archived.
334339
wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
335340
wal.rollWriter();
336-
wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
341+
wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
337342
assertEquals(1, wal.getNumRolledLogFiles());
338343
} finally {
339344
if (wal != null) {
@@ -488,93 +493,165 @@ public void testWriteEntryCanBeNull() throws IOException {
488493
}
489494
}
490495

491-
@Test
492-
public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
493-
final String testName = currentTest.getMethodName();
494-
final byte[] b = Bytes.toBytes("b");
495-
496-
final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
497-
final CountDownLatch holdAppend = new CountDownLatch(1);
498-
final CountDownLatch closeFinished = new CountDownLatch(1);
499-
final CountDownLatch putFinished = new CountDownLatch(1);
500-
501-
try (AbstractFSWAL<?> wal = newWAL(FS, FSUtils.getRootDir(CONF), testName,
502-
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
503-
wal.init();
504-
wal.registerWALActionsListener(new WALActionsListener() {
505-
@Override
506-
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
507-
if (startHoldingForAppend.get()) {
508-
try {
509-
holdAppend.await();
510-
} catch (InterruptedException e) {
511-
LOG.error(e.toString(), e);
512-
}
513-
}
514-
}
515-
});
516-
517-
// open a new region which uses this WAL
518-
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
519-
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
520-
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
521-
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
522-
TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
523-
RegionServerServices rsServices = mock(RegionServerServices.class);
524-
when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
525-
when(rsServices.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
526-
final HRegion region = HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal,
527-
TEST_UTIL.getConfiguration(), rsServices, null);
528-
529-
ExecutorService exec = Executors.newFixedThreadPool(2);
530-
531-
// do a regular write first because of memstore size calculation.
532-
region.put(new Put(b).addColumn(b, b, b));
533-
534-
startHoldingForAppend.set(true);
535-
exec.submit(new Runnable() {
536-
@Override
537-
public void run() {
496+
private AbstractFSWAL<?> createHoldingWAL(String testName, AtomicBoolean startHoldingForAppend,
497+
CountDownLatch holdAppend) throws IOException {
498+
AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
499+
HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
500+
wal.init();
501+
wal.registerWALActionsListener(new WALActionsListener() {
502+
@Override
503+
public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
504+
if (startHoldingForAppend.get()) {
538505
try {
539-
region.put(new Put(b).addColumn(b, b, b).setDurability(Durability.ASYNC_WAL));
540-
putFinished.countDown();
541-
} catch (IOException e) {
506+
holdAppend.await();
507+
} catch (InterruptedException e) {
542508
LOG.error(e.toString(), e);
543509
}
544510
}
545-
});
511+
}
512+
});
513+
return wal;
514+
}
515+
516+
private HRegion createHoldingHRegion(Configuration conf, TableDescriptor htd, WAL wal)
517+
throws IOException {
518+
RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
519+
ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
520+
TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
521+
RegionServerServices rsServices = mock(RegionServerServices.class);
522+
when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
523+
when(rsServices.getConfiguration()).thenReturn(conf);
524+
return HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, conf, rsServices, null);
525+
}
546526

547-
// give the put a chance to start
548-
Threads.sleep(3000);
527+
private void doPutWithAsyncWAL(ExecutorService exec, HRegion region, Put put,
528+
Runnable flushOrCloseRegion, AtomicBoolean startHoldingForAppend,
529+
CountDownLatch flushOrCloseFinished, CountDownLatch holdAppend)
530+
throws InterruptedException, IOException {
531+
// do a regular write first because of memstore size calculation.
532+
region.put(put);
549533

550-
exec.submit(new Runnable() {
551-
@Override
552-
public void run() {
553-
try {
554-
Map<?, ?> closeResult = region.close();
555-
LOG.info("Close result:" + closeResult);
556-
closeFinished.countDown();
557-
} catch (IOException e) {
558-
LOG.error(e.toString(), e);
559-
}
560-
}
561-
});
534+
startHoldingForAppend.set(true);
535+
region.put(new Put(put).setDurability(Durability.ASYNC_WAL));
562536

563-
// give the flush a chance to start. Flush should have got the region lock, and
564-
// should have been waiting on the mvcc complete after this.
565-
Threads.sleep(3000);
537+
// give the put a chance to start
538+
Threads.sleep(3000);
566539

567-
// let the append to WAL go through now that the flush already started
568-
holdAppend.countDown();
569-
putFinished.await();
570-
closeFinished.await();
540+
exec.submit(flushOrCloseRegion);
541+
542+
// give the flush a chance to start. Flush should have got the region lock, and
543+
// should have been waiting on the mvcc complete after this.
544+
Threads.sleep(3000);
545+
546+
// let the append to WAL go through now that the flush already started
547+
holdAppend.countDown();
548+
flushOrCloseFinished.await();
549+
}
550+
551+
// Testcase for HBASE-23181
552+
@Test
553+
public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
554+
String testName = currentTest.getMethodName();
555+
byte[] b = Bytes.toBytes("b");
556+
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
557+
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
558+
559+
AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
560+
CountDownLatch holdAppend = new CountDownLatch(1);
561+
CountDownLatch closeFinished = new CountDownLatch(1);
562+
ExecutorService exec = Executors.newFixedThreadPool(1);
563+
AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
564+
// open a new region which uses this WAL
565+
HRegion region = createHoldingHRegion(TEST_UTIL.getConfiguration(), htd, wal);
566+
try {
567+
doPutWithAsyncWAL(exec, region, new Put(b).addColumn(b, b, b), () -> {
568+
try {
569+
Map<?, ?> closeResult = region.close();
570+
LOG.info("Close result:" + closeResult);
571+
closeFinished.countDown();
572+
} catch (IOException e) {
573+
LOG.error(e.toString(), e);
574+
}
575+
}, startHoldingForAppend, closeFinished, holdAppend);
571576

572577
// now check the region's unflushed seqIds.
573-
long seqId = wal.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
578+
long seqId = wal.getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
574579
assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
575580
seqId);
581+
} finally {
582+
exec.shutdownNow();
583+
region.close();
584+
wal.close();
585+
}
586+
}
576587

588+
private static final Set<byte[]> STORES_TO_FLUSH =
589+
Collections.newSetFromMap(new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR));
590+
591+
// Testcase for HBASE-23157
592+
@Test
593+
public void testMaxFlushedSequenceIdGoBackwards() throws IOException, InterruptedException {
594+
String testName = currentTest.getMethodName();
595+
byte[] a = Bytes.toBytes("a");
596+
byte[] b = Bytes.toBytes("b");
597+
TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
598+
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(a))
599+
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
600+
601+
AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
602+
CountDownLatch holdAppend = new CountDownLatch(1);
603+
CountDownLatch flushFinished = new CountDownLatch(1);
604+
ExecutorService exec = Executors.newFixedThreadPool(2);
605+
Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
606+
conf.setClass(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushSpecificStoresPolicy.class,
607+
FlushPolicy.class);
608+
AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
609+
// open a new region which uses this WAL
610+
HRegion region = createHoldingHRegion(conf, htd, wal);
611+
try {
612+
Put put = new Put(a).addColumn(a, a, a).addColumn(b, b, b);
613+
doPutWithAsyncWAL(exec, region, put, () -> {
614+
try {
615+
HRegion.FlushResult flushResult = region.flush(true);
616+
LOG.info("Flush result:" + flushResult.getResult());
617+
LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
618+
flushFinished.countDown();
619+
} catch (IOException e) {
620+
LOG.error(e.toString(), e);
621+
}
622+
}, startHoldingForAppend, flushFinished, holdAppend);
623+
624+
// get the max flushed sequence id after the first flush
625+
long maxFlushedSeqId1 = region.getMaxFlushedSeqId();
626+
627+
region.put(put);
628+
// this time we only flush family a
629+
STORES_TO_FLUSH.add(a);
630+
region.flush(false);
631+
632+
// get the max flushed sequence id after the second flush
633+
long maxFlushedSeqId2 = region.getMaxFlushedSeqId();
634+
// make sure that the maxFlushedSequenceId does not go backwards
635+
assertTrue(
636+
"maxFlushedSeqId1(" + maxFlushedSeqId1 +
637+
") is not greater than or equal to maxFlushedSeqId2(" + maxFlushedSeqId2 + ")",
638+
maxFlushedSeqId1 <= maxFlushedSeqId2);
639+
} finally {
640+
exec.shutdownNow();
641+
region.close();
577642
wal.close();
578643
}
579644
}
645+
646+
public static final class FlushSpecificStoresPolicy extends FlushPolicy {
647+
648+
@Override
649+
public Collection<HStore> selectStoresToFlush() {
650+
if (STORES_TO_FLUSH.isEmpty()) {
651+
return region.getStores();
652+
} else {
653+
return STORES_TO_FLUSH.stream().map(region::getStore).collect(Collectors.toList());
654+
}
655+
}
656+
}
580657
}

0 commit comments

Comments
 (0)