Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkArgument;
import static org.apache.hbase.thirdparty.com.google.common.base.Preconditions.checkNotNull;

import com.google.errorprone.annotations.RestrictedApi;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.Sequencer;
Expand Down Expand Up @@ -686,12 +687,6 @@ public void abortCacheFlush(byte[] encodedRegionName) {
this.sequenceIdAccounting.abortCacheFlush(encodedRegionName);
}

@Override
public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
// Used by tests. Deprecated as too subtle for general usage.
return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this method in SequenceIdAccounting still useful?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This method is only being called by UTS, and I will attempt to remove those calls.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's why I say we need to introduce a equivalent version of this method in test code.

}

@Override
public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
// This method is used by tests and for figuring if we should flush or not because our
Expand Down Expand Up @@ -730,6 +725,12 @@ public final void sync(long txid, boolean forceSync) throws IOException {
TraceUtil.trace(() -> doSync(txid, forceSync), () -> createSpan("WAL.sync"));
}

@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public SequenceIdAccounting getSequenceIdAccounting() {
return sequenceIdAccounting;
}

/**
* This is a convenience method that computes a new filename with a given file-number.
* @param filenum to use
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;

import com.google.errorprone.annotations.RestrictedApi;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -50,7 +51,7 @@
* </p>
*/
@InterfaceAudience.Private
class SequenceIdAccounting {
public class SequenceIdAccounting {
private static final Logger LOG = LoggerFactory.getLogger(SequenceIdAccounting.class);

/**
Expand Down Expand Up @@ -112,7 +113,9 @@ class SequenceIdAccounting {
* @return Lowest outstanding unflushed sequenceid for <code>encodedRegionName</code>. Will return
* {@link HConstants#NO_SEQNUM} when none.
*/
long getLowestSequenceId(final byte[] encodedRegionName) {
@RestrictedApi(explanation = "Should only be called in tests", link = "",
allowedOnPath = ".*/src/test/.*")
public long getLowestSequenceId(final byte[] encodedRegionName) {
synchronized (this.tieLock) {
Map<?, Long> m = this.flushingSequenceIds.get(encodedRegionName);
long flushingLowest = m != null ? getLowestSequenceId(m) : Long.MAX_VALUE;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,11 +237,6 @@ public WALCoprocessorHost getCoprocessorHost() {
return coprocessorHost;
}

@Override
public long getEarliestMemStoreSeqNum(byte[] encodedRegionName) {
return HConstants.NO_SEQNUM;
}

@Override
public long getEarliestMemStoreSeqNum(byte[] encodedRegionName, byte[] familyName) {
return HConstants.NO_SEQNUM;
Expand Down
10 changes: 0 additions & 10 deletions hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WAL.java
Original file line number Diff line number Diff line change
Expand Up @@ -212,16 +212,6 @@ default void sync(long txid, boolean forceSync) throws IOException {
/** Returns Coprocessor host. */
WALCoprocessorHost getCoprocessorHost();

/**
* Gets the earliest unflushed sequence id in the memstore for the region.
* @param encodedRegionName The region to get the number for.
* @return The earliest/lowest/oldest sequence id if present, HConstants.NO_SEQNUM if absent.
* @deprecated Since version 1.2.0. Removing because not used and exposes subtle internal
* workings. Use {@link #getEarliestMemStoreSeqNum(byte[], byte[])}
*/
@Deprecated
long getEarliestMemStoreSeqNum(byte[] encodedRegionName);

/**
* Gets the earliest unflushed sequence id in the memstore for the store.
* @param encodedRegionName The region to get the number for.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.wal.AbstractTestFSWAL;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -162,8 +163,8 @@ public void testSelectiveFlushWhenEnabled() throws IOException {
MemStoreSize cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();

// Get the overall smallest LSN in the region's memstores.
long smallestSeqInRegionCurrentMemstore =
getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL
.getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());

// The overall smallest LSN in the region's memstores should be the same as
// the LSN of the smallest edit in CF1
Expand Down Expand Up @@ -193,8 +194,8 @@ public void testSelectiveFlushWhenEnabled() throws IOException {
cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
totalMemstoreSize = region.getMemStoreDataSize();
smallestSeqInRegionCurrentMemstore =
getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL.getEarliestMemStoreSeqNum(getWAL(region),
region.getRegionInfo().getEncodedNameAsBytes());

// We should have cleared out only CF1, since we chose the flush thresholds
// and number of puts accordingly.
Expand Down Expand Up @@ -231,8 +232,8 @@ public void testSelectiveFlushWhenEnabled() throws IOException {
cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
totalMemstoreSize = region.getMemStoreDataSize();
smallestSeqInRegionCurrentMemstore =
getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL.getEarliestMemStoreSeqNum(getWAL(region),
region.getRegionInfo().getEncodedNameAsBytes());

// CF1 and CF2, both should be absent.
assertEquals(0, cf1MemstoreSize.getDataSize());
Expand All @@ -242,6 +243,7 @@ public void testSelectiveFlushWhenEnabled() throws IOException {
// CF3 shouldn't have been touched.
assertEquals(cf3MemstoreSize, oldCF3MemstoreSize);
assertEquals(totalMemstoreSize, cf3MemstoreSize.getDataSize());
assertEquals(smallestSeqInRegionCurrentMemstore, smallestSeqCF3);

// What happens when we hit the memstore limit, but we are not able to find
// any Column Family above the threshold?
Expand Down Expand Up @@ -313,8 +315,8 @@ public void testSelectiveFlushWhenNotEnabled() throws IOException {
cf2MemstoreSize = region.getStore(FAMILY2).getMemStoreSize();
cf3MemstoreSize = region.getStore(FAMILY3).getMemStoreSize();
totalMemstoreSize = region.getMemStoreDataSize();
long smallestSeqInRegionCurrentMemstore =
region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstore = AbstractTestFSWAL
.getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes());

// Everything should have been cleared
assertEquals(0, cf1MemstoreSize.getDataSize());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.wal.AbstractTestFSWAL;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
Expand Down Expand Up @@ -177,8 +178,8 @@ public void testSelectiveFlushWithEager() throws IOException {
MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();

// Get the overall smallest LSN in the region's memstores.
long smallestSeqInRegionCurrentMemstorePhaseI =
getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseI = AbstractTestFSWAL
.getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());

String s = "\n\n----------------------------------\n"
+ "Upon initial insert and before any flush, size of CF1 is:" + cf1MemstoreSizePhaseI
Expand Down Expand Up @@ -224,8 +225,8 @@ public void testSelectiveFlushWithEager() throws IOException {
MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();

long smallestSeqInRegionCurrentMemstorePhaseII =
getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseII = AbstractTestFSWAL
.getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
// Find the smallest LSNs for edits wrt to each CF.
long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
Expand Down Expand Up @@ -280,8 +281,8 @@ public void testSelectiveFlushWithEager() throws IOException {
MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();

long smallestSeqInRegionCurrentMemstorePhaseIV =
getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseIV = AbstractTestFSWAL
.getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
Expand Down Expand Up @@ -318,8 +319,8 @@ public void testSelectiveFlushWithEager() throws IOException {
MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
long smallestSeqInRegionCurrentMemstorePhaseV =
getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseV = AbstractTestFSWAL
.getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());

assertEquals(0, cf1MemstoreSizePhaseV.getDataSize());
assertEquals(MutableSegment.DEEP_OVERHEAD, cf1MemstoreSizePhaseV.getHeapSize());
Expand Down Expand Up @@ -405,8 +406,8 @@ public void testSelectiveFlushWithIndexCompaction() throws IOException {
MemStoreSize cf2MemstoreSizePhaseI = region.getStore(FAMILY2).getMemStoreSize();
MemStoreSize cf3MemstoreSizePhaseI = region.getStore(FAMILY3).getMemStoreSize();
// Get the overall smallest LSN in the region's memstores.
long smallestSeqInRegionCurrentMemstorePhaseI =
getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseI = AbstractTestFSWAL
.getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());

/*------------------------------------------------------------------------------*/
/* PHASE I - validation */
Expand Down Expand Up @@ -458,8 +459,8 @@ public void testSelectiveFlushWithIndexCompaction() throws IOException {
MemStoreSize cf1MemstoreSizePhaseII = region.getStore(FAMILY1).getMemStoreSize();
MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();
MemStoreSize cf3MemstoreSizePhaseII = region.getStore(FAMILY3).getMemStoreSize();
long smallestSeqInRegionCurrentMemstorePhaseII =
getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseII = AbstractTestFSWAL
.getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
// Find the smallest LSNs for edits wrt to each CF.
long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
long totalMemstoreSizePhaseII = region.getMemStoreDataSize();
Expand Down Expand Up @@ -531,8 +532,8 @@ public void testSelectiveFlushWithIndexCompaction() throws IOException {
MemStoreSize cf1MemstoreSizePhaseIV = region.getStore(FAMILY1).getMemStoreSize();
MemStoreSize cf2MemstoreSizePhaseIV = region.getStore(FAMILY2).getMemStoreSize();
MemStoreSize cf3MemstoreSizePhaseIV = region.getStore(FAMILY3).getMemStoreSize();
long smallestSeqInRegionCurrentMemstorePhaseIV =
getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseIV = AbstractTestFSWAL
.getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);

/*------------------------------------------------------------------------------*/
Expand Down Expand Up @@ -563,8 +564,8 @@ public void testSelectiveFlushWithIndexCompaction() throws IOException {
MemStoreSize cf1MemstoreSizePhaseV = region.getStore(FAMILY1).getMemStoreSize();
MemStoreSize cf2MemstoreSizePhaseV = region.getStore(FAMILY2).getMemStoreSize();
MemStoreSize cf3MemstoreSizePhaseV = region.getStore(FAMILY3).getMemStoreSize();
long smallestSeqInRegionCurrentMemstorePhaseV =
getWAL(region).getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseV = AbstractTestFSWAL
.getEarliestMemStoreSeqNum(getWAL(region), region.getRegionInfo().getEncodedNameAsBytes());
long totalMemstoreSizePhaseV = region.getMemStoreDataSize();

/*------------------------------------------------------------------------------*/
Expand Down Expand Up @@ -683,8 +684,8 @@ public void testSelectiveFlushAndWALinDataCompaction() throws IOException {

MemStoreSize cf2MemstoreSizePhaseII = region.getStore(FAMILY2).getMemStoreSize();

long smallestSeqInRegionCurrentMemstorePhaseII =
region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseII = AbstractTestFSWAL
.getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF1PhaseII = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseII = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseII = region.getOldestSeqIdOfStore(FAMILY3);
Expand Down Expand Up @@ -713,8 +714,8 @@ public void testSelectiveFlushAndWALinDataCompaction() throws IOException {
region.put(createPut(2, i));
}

long smallestSeqInRegionCurrentMemstorePhaseIII =
region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseIII = AbstractTestFSWAL
.getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF1PhaseIII = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseIII = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseIII = region.getOldestSeqIdOfStore(FAMILY3);
Expand All @@ -731,8 +732,8 @@ public void testSelectiveFlushAndWALinDataCompaction() throws IOException {
cms3.flushInMemory();
region.flush(false);

long smallestSeqInRegionCurrentMemstorePhaseIV =
region.getWAL().getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqInRegionCurrentMemstorePhaseIV = AbstractTestFSWAL
.getEarliestMemStoreSeqNum(region.getWAL(), region.getRegionInfo().getEncodedNameAsBytes());
long smallestSeqCF1PhaseIV = region.getOldestSeqIdOfStore(FAMILY1);
long smallestSeqCF2PhaseIV = region.getOldestSeqIdOfStore(FAMILY2);
long smallestSeqCF3PhaseIV = region.getOldestSeqIdOfStore(FAMILY3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -624,7 +624,7 @@ public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, Interru
}, startHoldingForAppend, closeFinished, holdAppend);

// now check the region's unflushed seqIds.
long seqId = wal.getEarliestMemStoreSeqNum(region.getRegionInfo().getEncodedNameAsBytes());
long seqId = getEarliestMemStoreSeqNum(wal, region.getRegionInfo().getEncodedNameAsBytes());
assertEquals("Found seqId for the region which is already closed", HConstants.NO_SEQNUM,
seqId);
} finally {
Expand All @@ -634,6 +634,16 @@ public void testUnflushedSeqIdTrackingWithAsyncWal() throws IOException, Interru
}
}

public static long getEarliestMemStoreSeqNum(WAL wal, byte[] encodedRegionName) {
if (wal != null) {
if (wal instanceof AbstractFSWAL) {
return ((AbstractFSWAL<?>) wal).getSequenceIdAccounting()
.getLowestSequenceId(encodedRegionName);
}
}
return HConstants.NO_SEQNUM;
}

private static final Set<byte[]> STORES_TO_FLUSH =
Collections.newSetFromMap(new ConcurrentSkipListMap<byte[], Boolean>(Bytes.BYTES_COMPARATOR));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ public void run() {
assertEquals("Region did not flush?", 1, region.getStoreFileList(new byte[][] { b }).size());

// now check the region's unflushed seqIds.
long seqId = log.getEarliestMemStoreSeqNum(hri.getEncodedNameAsBytes());
long seqId = AbstractTestFSWAL.getEarliestMemStoreSeqNum(log, hri.getEncodedNameAsBytes());
assertEquals("Found seqId for the region which is already flushed", HConstants.NO_SEQNUM,
seqId);

Expand Down