diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java
index 1232b9ce35e9..57f7cbc7be03 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ImmutableByteArray.java
@@ -48,6 +48,7 @@ public static ImmutableByteArray wrap(byte[] b) {
     return new ImmutableByteArray(b);
   }
 
+  @Override
   public String toString() {
     return Bytes.toStringBinary(b);
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 40a009c2c7c0..9c0b792813f2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -2977,7 +2977,7 @@ protected FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask sta
     // If we get to here, the HStores have been written.
     if (wal != null) {
-      wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes());
+      wal.completeCacheFlush(this.getRegionInfo().getEncodedNameAsBytes(), flushedSeqId);
     }
 
     // Record latest flush time
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index a1e4facde712..d2c624ab446c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -517,8 +517,8 @@ public Long startCacheFlush(byte[] encodedRegionName, Map familyTo
   }
 
   @Override
-  public void completeCacheFlush(byte[] encodedRegionName) {
-    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName);
+  public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
+    this.sequenceIdAccounting.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
index 986a10f68dd5..7146ca743926 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/SequenceIdAccounting.java
@@ -352,9 +352,36 @@ Long startCacheFlush(final byte[] encodedRegionName, final Map fam
     return lowestUnflushedInRegion;
   }
 
-  void completeCacheFlush(final byte[] encodedRegionName) {
+  void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
+    // This is a simple hack to keep maxFlushedSeqId from going backwards.
+    // The system works fine normally, but if we make use of Durability.ASYNC_WAL and we are going
+    // to flush all the stores, the maxFlushedSeqId will be the next seq id of the region, but we
+    // may still have some unsynced WAL entries in the ringbuffer after we call startCacheFlush,
+    // and then their sequence ids will be recorded as the lowestUnflushedSeqId by the update
+    // method above, which is less than the current maxFlushedSeqId. And if next time we only
+    // flush the family with this unusual lowestUnflushedSeqId, the maxFlushedSeqId will go
+    // backwards.
+    // This is unexpected behavior, so we should fix it; otherwise it may cause problems in
+    // other areas.
+    // The solution here is a bit hacky but fine. Just replace the lowestUnflushedSeqId with
+    // maxFlushedSeqId + 1 if it is smaller. The meaning of maxFlushedSeqId is that all edits
+    // less than or equal to it have been flushed, i.e., persisted to HFiles, so setting
+    // lowestUnflushedSequenceId to maxFlushedSeqId + 1 will not cause data loss.
+    // And technically, using +1 is fine here. If the maxFlushedSeqId is just the flushOpSeqId,
+    // it means we have flushed all the stores, so the seq id for actual data must be at least
+    // maxFlushedSeqId + 1. And if we do not flush all the stores, then the maxFlushedSeqId is
+    // calculated as lowestUnflushedSeqId - 1, so here we add the 1 back.
+    Long wrappedSeqId = Long.valueOf(maxFlushedSeqId + 1);
     synchronized (tieLock) {
       this.flushingSequenceIds.remove(encodedRegionName);
+      Map<ImmutableByteArray, Long> unflushed = lowestUnflushedSequenceIds.get(encodedRegionName);
+      if (unflushed == null) {
+        return;
+      }
+      for (Map.Entry<ImmutableByteArray, Long> e : unflushed.entrySet()) {
+        if (e.getValue().longValue() <= maxFlushedSeqId) {
+          e.setValue(wrappedSeqId);
+        }
+      }
     }
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
index dbc08cc84828..0ff2195eaa04 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java
@@ -224,7 +224,7 @@ public Long startCacheFlush(final byte[] encodedRegionName, Set flushedF
     }
 
     @Override
-    public void completeCacheFlush(final byte[] encodedRegionName) {
+    public void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId) {
     }
 
     @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
index 902ca6d398a6..747b2770d457 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
@@ -184,7 +184,7 @@ default void sync(long txid, boolean forceSync) throws IOException {
    * being flushed; in other words, this is effectively same as a flush of all of the region
    * though we were passed a subset of regions. Otherwise, it returns the sequence id of the
    * oldest/lowest outstanding edit.
-   * @see #completeCacheFlush(byte[])
+   * @see #completeCacheFlush(byte[], long)
    * @see #abortCacheFlush(byte[])
    */
   Long startCacheFlush(final byte[] encodedRegionName, Set<byte[]> families);
 
@@ -194,10 +194,12 @@ default void sync(long txid, boolean forceSync) throws IOException {
   /**
    * Complete the cache flush.
    * @param encodedRegionName Encoded region name.
+   * @param maxFlushedSeqId The maxFlushedSeqId for this flush. There is no edit in memory that
+   *          is less than or equal to this sequence id.
    * @see #startCacheFlush(byte[], Set)
    * @see #abortCacheFlush(byte[])
    */
-  void completeCacheFlush(final byte[] encodedRegionName);
+  void completeCacheFlush(final byte[] encodedRegionName, long maxFlushedSeqId);
 
   /**
    * Abort a cache flush. Call if the flush fails. Note that the only recovery for an aborted
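As a reader's aid, not part of the patch: the clamping rule the SequenceIdAccounting change introduces can be shown in isolation. The following is a minimal, self-contained Java sketch under assumed names (SeqIdClampSketch, a String-keyed map instead of the real ImmutableByteArray-keyed, tieLock-guarded maps); it only illustrates why bumping a stale lowestUnflushedSeqId to maxFlushedSeqId + 1 keeps a later partial flush from reporting a smaller maxFlushedSeqId.

import java.util.HashMap;
import java.util.Map;

public class SeqIdClampSketch {

  // family name -> lowest sequence id not yet flushed for that family
  private final Map<String, Long> lowestUnflushed = new HashMap<>();

  // records the lowest unflushed sequence id for a family, as the real
  // update() does when a late ASYNC_WAL append lands after startCacheFlush
  void update(String family, long seqId) {
    lowestUnflushed.putIfAbsent(family, seqId);
  }

  // the clamping rule from the patch: everything <= maxFlushedSeqId is
  // already persisted, so a stale lowestUnflushedSeqId is bumped to
  // maxFlushedSeqId + 1
  void completeCacheFlush(long maxFlushedSeqId) {
    Long wrapped = maxFlushedSeqId + 1;
    for (Map.Entry<String, Long> e : lowestUnflushed.entrySet()) {
      if (e.getValue() <= maxFlushedSeqId) {
        e.setValue(wrapped);
      }
    }
  }

  public static void main(String[] args) {
    SeqIdClampSketch sketch = new SeqIdClampSketch();
    // an ASYNC_WAL edit with seq id 10 sneaks in after startCacheFlush;
    // the full flush then reports maxFlushedSeqId = 12
    sketch.update("a", 10);
    sketch.completeCacheFlush(12);
    // without the clamp, a later flush of only family "a" would compute
    // maxFlushedSeqId = 10 - 1 = 9, i.e. go backwards; with it, 13 - 1 = 12
    System.out.println(sketch.lowestUnflushed); // {a=13}
  }
}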
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
index 7c491dc2583d..5188165b944f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestFSWAL.java
@@ -30,6 +30,8 @@
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
@@ -38,10 +40,12 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -67,7 +71,10 @@
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
 import org.apache.hadoop.hbase.regionserver.ChunkCreator;
+import org.apache.hadoop.hbase.regionserver.FlushPolicy;
+import org.apache.hadoop.hbase.regionserver.FlushPolicyFactory;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.MemStoreLABImpl;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
 import org.apache.hadoop.hbase.regionserver.RegionServerServices;
@@ -187,12 +194,10 @@ protected void addEdits(WAL log, RegionInfo hri, TableDescriptor htd, int times,
 
   /**
    * helper method to simulate region flush for a WAL.
-   * @param wal
-   * @param regionEncodedName
    */
   protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
     wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
-    wal.completeCacheFlush(regionEncodedName);
+    wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
   }
 
   /**
@@ -349,7 +354,7 @@ public void testFindMemStoresEligibleForFlush() throws Exception {
     // tests partial flush: roll on a partial flush, and ensure that wal is not archived.
     wal.startCacheFlush(hri1.getEncodedNameAsBytes(), t1.getColumnFamilyNames());
     wal.rollWriter();
-    wal.completeCacheFlush(hri1.getEncodedNameAsBytes());
+    wal.completeCacheFlush(hri1.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
     assertEquals(1, wal.getNumRolledLogFiles());
 
     // clear test data
@@ -533,93 +538,165 @@ public void testRollWriterForClosedWAL() throws IOException {
     wal.rollWriter();
   }
 
-  @Test
-  public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
-    final String testName = currentTest.getMethodName();
-    final byte[] b = Bytes.toBytes("b");
-
-    final AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
-    final CountDownLatch holdAppend = new CountDownLatch(1);
-    final CountDownLatch closeFinished = new CountDownLatch(1);
-    final CountDownLatch putFinished = new CountDownLatch(1);
-
-    try (AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
-      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
-      wal.init();
-      wal.registerWALActionsListener(new WALActionsListener() {
-        @Override
-        public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
-          if (startHoldingForAppend.get()) {
-            try {
-              holdAppend.await();
-            } catch (InterruptedException e) {
-              LOG.error(e.toString(), e);
-            }
-          }
-        }
-      });
-
-      // open a new region which uses this WAL
-      TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
-        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
-      RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
-      ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
-      TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
-      RegionServerServices rsServices = mock(RegionServerServices.class);
-      when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
-      when(rsServices.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration());
-      final HRegion region = HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal,
-      TEST_UTIL.getConfiguration(), rsServices, null);
-
-      ExecutorService exec = Executors.newFixedThreadPool(2);
-
-      // do a regular write first because of memstore size calculation.
-      region.put(new Put(b).addColumn(b, b, b));
-
-      startHoldingForAppend.set(true);
-      exec.submit(new Runnable() {
-        @Override
-        public void run() {
+  private AbstractFSWAL<?> createHoldingWAL(String testName, AtomicBoolean startHoldingForAppend,
+    CountDownLatch holdAppend) throws IOException {
+    AbstractFSWAL<?> wal = newWAL(FS, CommonFSUtils.getRootDir(CONF), testName,
+      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null);
+    wal.init();
+    wal.registerWALActionsListener(new WALActionsListener() {
+      @Override
+      public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
+        if (startHoldingForAppend.get()) {
           try {
-            region.put(new Put(b).addColumn(b, b, b).setDurability(Durability.ASYNC_WAL));
-            putFinished.countDown();
-          } catch (IOException e) {
+            holdAppend.await();
+          } catch (InterruptedException e) {
             LOG.error(e.toString(), e);
           }
         }
-      });
+      }
+    });
+    return wal;
+  }
 
-      // give the put a chance to start
-      Threads.sleep(3000);
+  private HRegion createHoldingHRegion(Configuration conf, TableDescriptor htd, WAL wal)
+    throws IOException {
+    RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
+    ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
+    TEST_UTIL.createLocalHRegion(hri, htd, wal).close();
+    RegionServerServices rsServices = mock(RegionServerServices.class);
+    when(rsServices.getServerName()).thenReturn(ServerName.valueOf("localhost:12345", 123456));
+    when(rsServices.getConfiguration()).thenReturn(conf);
+    return HRegion.openHRegion(TEST_UTIL.getDataTestDir(), hri, htd, wal, conf, rsServices, null);
+  }
 
-      exec.submit(new Runnable() {
-        @Override
-        public void run() {
-          try {
-            Map<?, ?> closeResult = region.close();
-            LOG.info("Close result:" + closeResult);
-            closeFinished.countDown();
-          } catch (IOException e) {
-            LOG.error(e.toString(), e);
-          }
-        }
-      });
+  private void doPutWithAsyncWAL(ExecutorService exec, HRegion region, Put put,
+    Runnable flushOrCloseRegion, AtomicBoolean startHoldingForAppend,
+    CountDownLatch flushOrCloseFinished, CountDownLatch holdAppend)
+    throws InterruptedException, IOException {
+    // do a regular write first because of memstore size calculation.
+    region.put(put);
+
+    startHoldingForAppend.set(true);
+    region.put(new Put(put).setDurability(Durability.ASYNC_WAL));
+
+    // give the put a chance to start
+    Threads.sleep(3000);
 
-      // give the flush a chance to start. Flush should have got the region lock, and
-      // should have been waiting on the mvcc complete after this.
-      Threads.sleep(3000);
+    exec.submit(flushOrCloseRegion);
 
-      // let the append to WAL go through now that the flush already started
-      holdAppend.countDown();
-      putFinished.await();
-      closeFinished.await();
+    // give the flush a chance to start. Flush should have got the region lock, and
+    // should have been waiting on the mvcc complete after this.
+    Threads.sleep(3000);
+
+    // let the append to WAL go through now that the flush already started
+    holdAppend.countDown();
+    flushOrCloseFinished.await();
+  }
+
+  // Testcase for HBASE-23181
+  @Test
+  public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, InterruptedException {
+    String testName = currentTest.getMethodName();
+    byte[] b = Bytes.toBytes("b");
+    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
+
+    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
+    CountDownLatch holdAppend = new CountDownLatch(1);
+    CountDownLatch closeFinished = new CountDownLatch(1);
+    ExecutorService exec = Executors.newFixedThreadPool(1);
+    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
+    // open a new region which uses this WAL
+    HRegion region = createHoldingHRegion(TEST_UTIL.getConfiguration(), htd, wal);
+    try {
+      doPutWithAsyncWAL(exec, region, new Put(b).addColumn(b, b, b), () -> {
+        try {
+          Map<?, ?> closeResult = region.close();
+          LOG.info("Close result:" + closeResult);
+          closeFinished.countDown();
+        } catch (IOException e) {
+          LOG.error(e.toString(), e);
+        }
+      }, startHoldingForAppend, closeFinished, holdAppend);
 
       // now check the region's unflushed seqIds.
-      long seqId = wal.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
+      long seqId = wal.getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
       assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
         seqId);
+    } finally {
+      exec.shutdownNow();
+      region.close();
+      wal.close();
+    }
+  }
 
+  private static final Set<byte[]> STORES_TO_FLUSH =
+    Collections.newSetFromMap(new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR));
+
+  // Testcase for HBASE-23157
+  @Test
+  public void testMaxFlushedSequenceIdGoBackwards() throws IOException, InterruptedException {
+    String testName = currentTest.getMethodName();
+    byte[] a = Bytes.toBytes("a");
+    byte[] b = Bytes.toBytes("b");
+    TableDescriptor htd = TableDescriptorBuilder.newBuilder(TableName.valueOf("table"))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(a))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(b)).build();
+
+    AtomicBoolean startHoldingForAppend = new AtomicBoolean(false);
+    CountDownLatch holdAppend = new CountDownLatch(1);
+    CountDownLatch flushFinished = new CountDownLatch(1);
+    ExecutorService exec = Executors.newFixedThreadPool(2);
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setClass(FlushPolicyFactory.HBASE_FLUSH_POLICY_KEY, FlushSpecificStoresPolicy.class,
+      FlushPolicy.class);
+    AbstractFSWAL<?> wal = createHoldingWAL(testName, startHoldingForAppend, holdAppend);
+    // open a new region which uses this WAL
+    HRegion region = createHoldingHRegion(conf, htd, wal);
+    try {
+      Put put = new Put(a).addColumn(a, a, a).addColumn(b, b, b);
+      doPutWithAsyncWAL(exec, region, put, () -> {
+        try {
+          HRegion.FlushResult flushResult = region.flush(true);
+          LOG.info("Flush result:" + flushResult.getResult());
+          LOG.info("Flush succeeded:" + flushResult.isFlushSucceeded());
+          flushFinished.countDown();
+        } catch (IOException e) {
+          LOG.error(e.toString(), e);
+        }
+      }, startHoldingForAppend, flushFinished, holdAppend);
+
+      // get the max flushed sequence id after the first flush
+      long maxFlushedSeqId1 = region.getMaxFlushedSeqId();
+
+      region.put(put);
+      // this time we only flush family a
+      STORES_TO_FLUSH.add(a);
+      region.flush(false);
+
+      // get the max flushed sequence id after the second flush
+      long maxFlushedSeqId2 = region.getMaxFlushedSeqId();
+      // make sure that the maxFlushedSequenceId does not go backwards
+      assertTrue(
+        "maxFlushedSeqId2(" + maxFlushedSeqId2 +
+          ") is not greater than or equal to maxFlushedSeqId1(" + maxFlushedSeqId1 + ")",
+        maxFlushedSeqId1 <= maxFlushedSeqId2);
+    } finally {
+      exec.shutdownNow();
+      region.close();
       wal.close();
     }
   }
+
+  public static final class FlushSpecificStoresPolicy extends FlushPolicy {
+
+    @Override
+    public Collection<HStore> selectStoresToFlush() {
+      if (STORES_TO_FLUSH.isEmpty()) {
+        return region.getStores();
+      } else {
+        return STORES_TO_FLUSH.stream().map(region::getStore).collect(Collectors.toList());
+      }
+    }
+  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
index e3fc8e3811fe..7f0fd7a11367 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java
@@ -799,7 +799,7 @@ public void testReplayEditsWrittenIntoWAL() throws Exception {
 
     // Add a cache flush, shouldn't have any effect
     wal.startCacheFlush(regionName, familyNames);
-    wal.completeCacheFlush(regionName);
+    wal.completeCacheFlush(regionName, HConstants.NO_SEQNUM);
 
     // Add an edit to another family, should be skipped.
     WALEdit edit = new WALEdit();
@@ -901,7 +901,7 @@ public void testSequentialEditLogSeqNum() throws IOException {
     wal.doCompleteCacheFlush = true;
     // allow complete cache flush with the previous seq number got after first
     // set of edits.
-    wal.completeCacheFlush(hri.getEncodedNameAsBytes());
+    wal.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
     wal.shutdown();
     FileStatus[] listStatus = wal.getFiles();
     assertNotNull(listStatus);
@@ -1084,11 +1084,11 @@ public MockWAL(FileSystem fs, Path rootDir, String logName, Configuration conf)
     }
 
     @Override
-    public void completeCacheFlush(byte[] encodedRegionName) {
+    public void completeCacheFlush(byte[] encodedRegionName, long maxFlushedSeqId) {
       if (!doCompleteCacheFlush) {
         return;
       }
-      super.completeCacheFlush(encodedRegionName);
+      super.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
index 22b24abfb95f..098dc86461b6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestSequenceIdAccounting.java
@@ -57,12 +57,12 @@ public void testStartCacheFlush() {
     Map<byte[], Long> m = new HashMap<>();
     m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
     assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
-    sida.completeCacheFlush(ENCODED_REGION_NAME);
+    sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
     long sequenceid = 1;
     sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
     // Only one family so should return NO_SEQNUM still.
     assertEquals(HConstants.NO_SEQNUM, (long)sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES));
-    sida.completeCacheFlush(ENCODED_REGION_NAME);
+    sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
     long currentSequenceId = sequenceid;
     sida.update(ENCODED_REGION_NAME, FAMILIES, sequenceid, true);
     final Set<byte[]> otherFamily = new HashSet<>(1);
@@ -70,7 +70,7 @@ public void testStartCacheFlush() {
     sida.update(ENCODED_REGION_NAME, FAMILIES, ++sequenceid, true);
     // Should return oldest sequence id in the region.
     assertEquals(currentSequenceId, (long)sida.startCacheFlush(ENCODED_REGION_NAME, otherFamily));
-    sida.completeCacheFlush(ENCODED_REGION_NAME);
+    sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
   }
 
   @Test
@@ -101,7 +101,7 @@ public void testAreAllLower() {
     m.put(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
     assertTrue(sida.areAllLower(m, null));
     // Let the flush complete and if we ask if the sequenceid is lower, should be yes since no edits
-    sida.completeCacheFlush(ENCODED_REGION_NAME);
+    sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
     m.put(ENCODED_REGION_NAME, sequenceid);
     assertTrue(sida.areAllLower(m, null));
     // Flush again but add sequenceids while we are flushing.
@@ -114,7 +114,7 @@ public void testAreAllLower() {
     sida.startCacheFlush(ENCODED_REGION_NAME, FAMILIES);
     // The cache flush will clear out all sequenceid accounting by region.
     assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
-    sida.completeCacheFlush(ENCODED_REGION_NAME);
+    sida.completeCacheFlush(ENCODED_REGION_NAME, HConstants.NO_SEQNUM);
     // No new edits have gone in so no sequenceid to work with.
     assertEquals(HConstants.NO_SEQNUM, sida.getLowestSequenceId(ENCODED_REGION_NAME));
     // Make an edit behind all we'll put now into sida.
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java
index 82eefb26a4ce..72e4998e6c8e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestFSHLogProvider.java
@@ -34,6 +34,7 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
@@ -178,7 +179,7 @@ WALKeyImpl getWalKey(final byte[] info, final TableName tableName, final long ti
    */
   protected void flushRegion(WAL wal, byte[] regionEncodedName, Set<byte[]> flushedFamilyNames) {
     wal.startCacheFlush(regionEncodedName, flushedFamilyNames);
-    wal.completeCacheFlush(regionEncodedName);
+    wal.completeCacheFlush(regionEncodedName, HConstants.NO_SEQNUM);
   }
 
   @Test
@@ -230,7 +231,7 @@ public void testLogCleaning() throws Exception {
     // archived. We need to append something or writer won't be rolled.
     addEdits(log, hri2, htd2, 1, scopes2);
     log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
-    log.completeCacheFlush(hri.getEncodedNameAsBytes());
+    log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
     log.rollWriter();
     int count = AbstractFSWALProvider.getNumRolledLogFiles(log);
     assertEquals(2, count);
@@ -240,7 +241,7 @@ public void testLogCleaning() throws Exception {
     // flush information
     addEdits(log, hri2, htd2, 1, scopes2);
     log.startCacheFlush(hri2.getEncodedNameAsBytes(), htd2.getColumnFamilyNames());
-    log.completeCacheFlush(hri2.getEncodedNameAsBytes());
+    log.completeCacheFlush(hri2.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
     log.rollWriter();
     assertEquals(0, AbstractFSWALProvider.getNumRolledLogFiles(log));
   } finally {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 9fa7be0d31e6..a899bdcb4538 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -528,7 +528,7 @@ public void testEditAdd() throws IOException {
       htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
     log.sync(txid);
     log.startCacheFlush(info.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
-    log.completeCacheFlush(info.getEncodedNameAsBytes());
+    log.completeCacheFlush(info.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
     log.shutdown();
     Path filename = AbstractFSWALProvider.getCurrentFileName(log);
     // Now open a reader on the log and assert append worked.
@@ -584,7 +584,7 @@ public void testAppend() throws IOException {
       htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
     log.sync(txid);
     log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
-    log.completeCacheFlush(hri.getEncodedNameAsBytes());
+    log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
     log.shutdown();
     Path filename = AbstractFSWALProvider.getCurrentFileName(log);
     // Now open a reader on the log and assert append worked.
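For context on how callers use the widened WAL interface after this patch: the sketch below shows the flush protocol a WAL client follows, with the new second argument to completeCacheFlush. It is a simplified illustration, not HRegion's actual flush machinery; the StoreFlusher interface and flushRegion helper are hypothetical stand-ins for the store-flush step in HRegion.internalFlushCacheAndCommit.

import java.io.IOException;
import java.util.Set;

import org.apache.hadoop.hbase.wal.WAL;

public final class FlushProtocolSketch {

  // hypothetical stand-in for the store flush step; returns the max flushed seq id
  interface StoreFlusher {
    long flushStores() throws IOException;
  }

  static void flushRegion(WAL wal, byte[] encodedRegionName, Set<byte[]> families,
      StoreFlusher flusher) throws IOException {
    // returns HConstants.NO_SEQNUM when every family of the region is being
    // flushed, else the lowest outstanding (unflushed) sequence id
    Long lowestUnflushedSeqId = wal.startCacheFlush(encodedRegionName, families);

    try {
      long maxFlushedSeqId = flusher.flushStores();
      // the new second argument lets SequenceIdAccounting clamp any stale
      // lowestUnflushedSeqId recorded by a late ASYNC_WAL append, so the
      // region's reported maxFlushedSeqId can never go backwards
      wal.completeCacheFlush(encodedRegionName, maxFlushedSeqId);
    } catch (IOException e) {
      // on failure, abort so the pre-flush accounting is restored
      wal.abortCacheFlush(encodedRegionName);
      throw e;
    }
  }

  private FlushProtocolSketch() {
  }
}

Note how the tests above pass HConstants.NO_SEQNUM as the second argument where no meaningful max flushed sequence id exists: NO_SEQNUM is a sentinel, so the clamping loop in SequenceIdAccounting.completeCacheFlush leaves the recorded lowestUnflushedSeqId values untouched in that case.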