Merged pull request: changes from all commits.
@@ -48,6 +48,7 @@ public static ImmutableByteArray wrap(byte[] b) {
     return new ImmutableByteArray(b);
   }
 
+  @Override
   public String toString() {
     return Bytes.toStringBinary(b);
   }
@@ -2977,7 +2977,7 @@ protected FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask sta
 
     // If we get to here, the HStores have been written.
     if (wal != null) {
-      wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
+      wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes(), flushedSeqId);
     }
 
     // Record latest flush time
@@ -517,8 +517,8 @@ public Long startCacheFlush(byte[] encodedRegionName, Map<byte[], Long> familyTo
   }
 
   @Override
-  public void completeCacheFlush(byte[] encodedRegionName) {
-    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
+  public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
+    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
   }
 
   @Override
@@ -352,9 +352,36 @@ Long startCacheFlush(final byte[] encodedRegionName, final Map<byte[], Long> fam
     return lowestUnflushedInRegion;
   }
 
-  void completeCacheFlush(final byte[] encodedRegionName) {
+  void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
+    // This is a simple hack to avoid maxFlushedSeqId going backwards.
+    // The system works fine normally, but if we make use of Durability.ASYNC_WAL and we are going
+    // to flush all the stores, the maxFlushedSeqId will be the next seq id of the region, but we
+    // may still have some unsynced WAL entries in the ringbuffer after we call startCacheFlush,
+    // and then one of them will be recorded as the lowestUnflushedSeqId by the above update
+    // method, which is less than the current maxFlushedSeqId. And if next time we only flush the
+    // family with this unusual lowestUnflushedSeqId, the maxFlushedSeqId will go backwards.

Review thread:

Contributor: How are we going to fix it? Can't we drive through the flush marker before completing the flush?

Contributor (Author): Then we need to do a wal sync under the updateLock.writeLock, which will impact the performance.

Contributor: ... maybe a slight hiccup for the async wal case -- maybe, given sync'ing to HDFS is such an erratic affair -- but we will also be more 'correct' having pushed out all in the ring buffer ahead of this flush sync w/o having to rewrite their sequenceid.

+    // This is an unexpected behavior so we should fix it, otherwise it may cause unexpected
+    // behavior in other areas.
+    // The solution here is a bit hacky, but fine. Just replace the lowestUnflushedSeqId with
+    // maxFlushedSeqId + 1 if it is lesser. The meaning of maxFlushedSeqId is that all edits less
+    // than or equal to it have been flushed, i.e., persisted to HFile, so setting
+    // lowestUnflushedSequenceId to maxFlushedSeqId + 1 will not cause data loss.
+    // And technically, using +1 is fine here. If the maxFlushedSeqId is just the flushOpSeqId,
+    // it means we have flushed all the stores, so the seq id for actual data should be at least
+    // one greater. And if we do not flush all the stores, then the maxFlushedSeqId is calculated
+    // as lowestUnflushedSeqId - 1, so here we add the 1 back.
+    Long wrappedSeqId = Long.valueOf(maxFlushedSeqId + 1);

Review thread:

Contributor: The +1 makes me wary. Just add these edits w/ maxFlushedSeqId? Wouldn't that be safer?

Contributor (Author): I explained why we need the +1 here, so what's your real problem? If you do not want the +1, you need to change a bunch of comments and field names...

Contributor: Makes sense. Your comment/explanation here is good. It is also where it should be, in the class named SequenceIdAccounting. Though hard to follow, it should be possible to read this one class to find how sequence id accounting is done in the system, including tricks to keep the system working when the likes of ASYNC_WAL is enabled.

     synchronized (tieLock) {
       this.flushingSequenceIds.remove(encodedRegionName);
+      Map<ImmutableByteArray, Long> unflushed = lowestUnflushedSequenceIds.get(encodedRegionName);
+      if (unflushed == null) {
+        return;
+      }
+      for (Map.Entry<ImmutableByteArray, Long> e : unflushed.entrySet()) {
+        if (e.getValue().longValue() <= maxFlushedSeqId) {
+          e.setValue(wrappedSeqId);
+        }
+      }
     }
   }

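To make the clamp concrete, here is a minimal, self-contained sketch of the idea. This is an editorial illustration with made-up family names and sequence ids, plain Java rather than the HBase classes above:

    import java.util.Map;
    import java.util.TreeMap;

    public class SeqIdClampSketch {
      public static void main(String[] args) {
        // lowestUnflushedSeqId per column family after a full flush.
        Map<String, Long> lowestUnflushed = new TreeMap<>();
        // A late ASYNC_WAL append was recorded with seq id 10, even though the
        // flush already covered everything up to maxFlushedSeqId = 12.
        lowestUnflushed.put("a", 10L);
        lowestUnflushed.put("b", 15L);
        long maxFlushedSeqId = 12;

        // Without the clamp, a later flush of only family "a" would report
        // 10 - 1 = 9 as the region's maxFlushedSeqId: it would go backwards.
        // The clamp replaces any entry <= maxFlushedSeqId with maxFlushedSeqId + 1.
        Long wrappedSeqId = maxFlushedSeqId + 1;
        for (Map.Entry<String, Long> e : lowestUnflushed.entrySet()) {
          if (e.getValue() <= maxFlushedSeqId) {
            e.setValue(wrappedSeqId);
          }
        }
        // Flushing only "a" now yields 13 - 1 = 12, never less than before.
        System.out.println(lowestUnflushed); // prints {a=13, b=15}
      }
    }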
@@ -224,7 +224,7 @@ public Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> flushedF
   }
 
   @Override
-  public void completeCacheFlush(final byte[] encodedRegionName) {
+  public void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId) {
   }
 
   @Override
@@ -184,7 +184,7 @@ default void sync(long txid, boolean forceSync) throws IOException {
    * being flushed; in other words, this is effectively same as a flush of all of the region
    * though we were passed a subset of regions. Otherwise, it returns the sequence id of the
    * oldest/lowest outstanding edit.
-   * @see #completeCacheFlush(byte[])
+   * @see #completeCacheFlush(byte[], long)
    * @see #abortCacheFlush(byte[])
    */
   Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families);
@@ -194,10 +194,12 @@ default void sync(long txid, boolean forceSync) throws IOException {
   /**
    * Complete the cache flush.
    * @param encodedRegionName Encoded region name.
+   * @param maxFlushedSeqId The maxFlushedSeqId for this flush. There is no edit in memory that is
+   *          less than this sequence id.
    * @see #startCacheFlush(byte[], Set)
    * @see #abortCacheFlush(byte[])
    */
-  void completeCacheFlush(final byte[] encodedRegionName);
+  void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId);
 
   /**
    * Abort a cache flush. Call if the flush fails. Note that the only recovery
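To tie the three methods together, here is a hedged sketch of the calling sequence a region follows around a flush. WalLike, FlushDriverSketch, flushOpSeqId, and the null-return check for a closing WAL are illustrative assumptions, not the real HBase types; the maxFlushedSeqId derivation follows the SequenceIdAccounting comment above (flushOpSeqId when every store is flushed, otherwise lowestUnflushedSeqId - 1):

    import java.util.Set;

    // Stand-in for the WAL accounting methods documented above (assumption, not HBase).
    interface WalLike {
      Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families);
      void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId);
      void abortCacheFlush(byte[] encodedRegionName);
    }

    final class FlushDriverSketch {
      private static final long NO_SEQNUM = -1L; // stand-in for HConstants.NO_SEQNUM

      static void flush(WalLike wal, byte[] encodedName, Set<byte[]> families,
          long flushOpSeqId) {
        Long lowestUnflushed = wal.startCacheFlush(encodedName, families);
        if (lowestUnflushed == null) {
          return; // assumed: WAL is closing, so a real caller would bail out here
        }
        boolean succeeded = false;
        try {
          // ... write the memstore snapshots to HFiles (elided in this sketch) ...
          succeeded = true;
        } finally {
          if (succeeded) {
            // Everything <= maxFlushedSeqId is now durable in HFiles.
            long maxFlushedSeqId = lowestUnflushed.longValue() == NO_SEQNUM
              ? flushOpSeqId
              : lowestUnflushed.longValue() - 1;
            wal.completeCacheFlush(encodedName, maxFlushedSeqId);
          } else {
            wal.abortCacheFlush(encodedName);
          }
        }
      }
    }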
@@ -30,6 +30,8 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
@@ -38,10 +40,12 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -67,7 +71,10 @@
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
 import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.FlushPolicy;
+import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -187,12 +194,10 @@ protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
 
   /**
    * helper method to simulate region flush for a WAL.
-   * @param wal
-   * @param regionEncodedName
    */
   protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
     wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
-    wal.completeCacheFlush(regionEncodedName);
+    wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
   }
 
   /**
@@ -349,7 +354,7 @@ public void testFindMemStoresEligibleForFlush() throws Exception {
     // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
     wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
     wal.rollWriter();
-    wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
+    wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
     assertEquals(1, wal.getNumRolledLogFiles());
 
     // clear test data
@@ -533,93 +538,165 @@ public void testRollWriterForClosedWAL() throws IOException {
     wal.rollWriter();
   }
 
-  @Test
-  public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
-    final String testName = currentTest.getMethodName();
-    final byte[] b = Bytes.toBytes("b");
-
-    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
-    final CountDownLatch holdAppend = new CountDownLatch(1);
-    final CountDownLatch closeFinished = new CountDownLatch(1);
-    final CountDownLatch putFinished = new CountDownLatch(1);
-
-    try (AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
-      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
-      wal.init();
-      wal.registerWALActionsListener(new WALActionsListener() {
-        @Override
-        public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
-          if (startHoldingForAppend.get()) {
-            try {
-              holdAppend.await();
-            } catch (InterruptedException e) {
-              LOG.error(e.toString(), e);
-            }
-          }
-        }
-      });
-
-      // open a new region which uses this WAL
-      TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
-        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
-      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
-      ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
-      TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
-      RegionServerServices rsServices = mock(RegionServerServices.class);
-      when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
-      when(rsServices.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
-      final HRegion region = HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal,
-        TEST_UTIL.getConfiguration(), rsServices, null);
-
-      ExecutorService exec = Executors.newFixedThreadPool(2);
-
-      // do a regular write first because of memstore size calculation.
-      region.put(new Put(b).addColumn(b, b, b));
-
-      startHoldingForAppend.set(true);
-      exec.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            region.put(new Put(b).addColumn(b, b, b).setDurability(Durability.ASYNC_WAL));
-            putFinished.countDown();
-          } catch (IOException e) {
-            LOG.error(e.toString(), e);
-          }
-        }
-      });
-
-      // give the put a chance to start
-      Threads.sleep(3000);
-
-      exec.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            Map<?, ?> closeResult = region.close();
-            LOG.info("Close result:" + closeResult);
-            closeFinished.countDown();
-          } catch (IOException e) {
-            LOG.error(e.toString(), e);
-          }
-        }
-      });
-
-      // give the flush a chance to start. Flush should have got the region lock, and
-      // should have been waiting on the mvcc complete after this.
-      Threads.sleep(3000);
-
-      // let the append to WAL go through now that the flush already started
-      holdAppend.countDown();
-      putFinished.await();
-      closeFinished.await();
-
-      // now check the region's unflushed seqIds.
-      long seqId = wal.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
-      assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
-        seqId);
-    }
-  }
+  private AbstractFSWAL<?> createHoldingWAL(String testName, AtomicBoolean startHoldingForAppend,
+      CountDownLatch holdAppend) throws IOException {
+    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
+      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
+    wal.init();
+    wal.registerWALActionsListener(new WALActionsListener() {
+      @Override
+      public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
+        if (startHoldingForAppend.get()) {
+          try {
+            holdAppend.await();
+          } catch (InterruptedException e) {
+            LOG.error(e.toString(), e);
+          }
+        }
+      }
+    });
+    return wal;
+  }
+
+  private HRegion createHoldingHRegion(Configuration conf, TableDescriptor htd, WAL wal)
+      throws IOException {
+    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
+    TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
+    RegionServerServices rsServices = mock(RegionServerServices.class);
+    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
+    when(rsServices.getConfiguration()).thenReturn(conf);
+    return HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, conf, rsServices, null);
+  }
+
+  private void doPutWithAsyncWAL(ExecutorService exec, HRegion region, Put put,
+      Runnable flushOrCloseRegion, AtomicBoolean startHoldingForAppend,
+      CountDownLatch flushOrCloseFinished, CountDownLatch holdAppend)
+      throws InterruptedException, IOException {
+    // do a regular write first because of memstore size calculation.
+    region.put(put);
+
+    startHoldingForAppend.set(true);
+    region.put(new Put(put).setDurability(Durability.ASYNC_WAL));
+
+    // give the put a chance to start
+    Threads.sleep(3000);
+
+    exec.submit(flushOrCloseRegion);
+
+    // give the flush a chance to start. Flush should have got the region lock, and
+    // should have been waiting on the mvcc complete after this.
+    Threads.sleep(3000);
+
+    // let the append to WAL go through now that the flush already started
+    holdAppend.countDown();
+    flushOrCloseFinished.await();
+  }
+
+  // Testcase for HBASE-23181
+  @Test
+  public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
+    String testName = currentTest.getMethodName();
+    byte[] b = Bytes.toBytes("b");
+    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
+
+    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
+    CountDownLatch holdAppend = new CountDownLatch(1);
+    CountDownLatch closeFinished = new CountDownLatch(1);
+    ExecutorService exec = Executors.newFixedThreadPool(1);
+    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
+    // open a new region which uses this WAL
+    HRegion region = createHoldingHRegion(TEST_UTIL.getConfiguration(), htd, wal);
+    try {
+      doPutWithAsyncWAL(exec, region, new Put(b).addColumn(b, b, b), () -> {
+        try {
+          Map<?, ?> closeResult = region.close();
+          LOG.info("Close result:" + closeResult);
+          closeFinished.countDown();
+        } catch (IOException e) {
+          LOG.error(e.toString(), e);
+        }
+      }, startHoldingForAppend, closeFinished, holdAppend);
+
+      // now check the region's unflushed seqIds.
+      long seqId = wal.getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
+      assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
+        seqId);
+    } finally {
+      exec.shutdownNow();
+      region.close();
+      wal.close();
+    }
+  }
+
+  private static final Set<byte[]> STORES_TO_FLUSH =
+    Collections.newSetFromMap(new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR));
+
+  // Testcase for HBASE-23157
+  @Test
+  public void testMaxFlushedSequenceIdGoBackwards() throws IOException, InterruptedException {
+    String testName = currentTest.getMethodName();
+    byte[] a = Bytes.toBytes("a");
+    byte[] b = Bytes.toBytes("b");
+    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(a))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
+
+    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
+    CountDownLatch holdAppend = new CountDownLatch(1);
+    CountDownLatch flushFinished = new CountDownLatch(1);
+    ExecutorService exec = Executors.newFixedThreadPool(2);
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setClass(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushSpecificStoresPolicy.class,
+      FlushPolicy.class);
+    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
+    // open a new region which uses this WAL
+    HRegion region = createHoldingHRegion(conf, htd, wal);
+    try {
+      Put put = new Put(a).addColumn(a, a, a).addColumn(b, b, b);
+      doPutWithAsyncWAL(exec, region, put, () -> {
+        try {
+          HRegion.FlushResult flushResult = region.flush(true);
+          LOG.info("Flush result:" + flushResult.getResult());
+          LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
+          flushFinished.countDown();
+        } catch (IOException e) {
+          LOG.error(e.toString(), e);
+        }
+      }, startHoldingForAppend, flushFinished, holdAppend);
+
+      // get the max flushed sequence id after the first flush
+      long maxFlushedSeqId1 = region.getMaxFlushedSeqId();
+
+      region.put(put);
+      // this time we only flush family a
+      STORES_TO_FLUSH.add(a);
+      region.flush(false);
+
+      // get the max flushed sequence id after the second flush
+      long maxFlushedSeqId2 = region.getMaxFlushedSeqId();
+      // make sure that the maxFlushedSequenceId does not go backwards
+      assertTrue(
+        "maxFlushedSeqId2(" + maxFlushedSeqId2 + ") is less than maxFlushedSeqId1(" +
+          maxFlushedSeqId1 + ")",
+        maxFlushedSeqId1 <= maxFlushedSeqId2);
+    } finally {
+      exec.shutdownNow();
+      region.close();
+      wal.close();
+    }
+  }
+
+  public static final class FlushSpecificStoresPolicy extends FlushPolicy {
+
+    @Override
+    public Collection<HStore> selectStoresToFlush() {
+      if (STORES_TO_FLUSH.isEmpty()) {
+        return region.getStores();
+      } else {
+        return STORES_TO_FLUSH.stream().map(region::getStore).collect(Collectors.toList());
+      }
+    }
+  }
 }