From 57baa7b15a9166e7562e72b274dfea8cdf95c58a Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Sun, 27 Mar 2022 11:34:19 -0700 Subject: [PATCH] HBASE-25975: Row Commit Sequencer Use a row commit sequencer in HRegion to ensure that only the operations that mutate disjoint sets of rows are able to commit within the same clock tick. This maintains the invariant that more than one mutation to a given row will never be committed in the same clock tick. Callers will first acquire row locks for the row(s) the pending mutation will mutate. Then they will use RowCommitSequencer.getRowSequence to ensure that the set of rows about to be mutated do not overlap with those for any other pending mutations in the current clock tick. If an overlap is identified, getRowSequence will yield and loop until there is no longer an overlap and the caller's pending mutation can succeed. --- .../regionserver/MetricsRegionSource.java | 6 + .../regionserver/MetricsRegionSourceImpl.java | 18 +- .../regionserver/MetricsRegionWrapper.java | 4 + .../TestMetricsRegionSourceImpl.java | 5 + .../hadoop/hbase/regionserver/HRegion.java | 251 +++++++++++++++++- .../MetricsRegionWrapperImpl.java | 5 + .../apache/hadoop/hbase/util/HashedBytes.java | 8 +- .../coprocessor/TestAppendTimeRange.java | 3 + .../coprocessor/TestIncrementTimeRange.java | 3 + .../MetricsRegionWrapperStub.java | 5 + .../hbase/regionserver/TestHRegion.java | 214 +++++++++++++-- .../hbase/regionserver/TestMinVersions.java | 10 +- .../hbase/util/TestCoprocessorScanPolicy.java | 3 + 13 files changed, 507 insertions(+), 28 deletions(-) diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java index b3a556e3d9f2..86690588b81a 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSource.java @@ -38,6 +38,7 @@ public interface MetricsRegionSource extends Comparable { String NUM_FILES_COMPACTED_COUNT = "numFilesCompactedCount"; String FLUSHES_QUEUED_COUNT = "flushesQueuedCount"; String MAX_FLUSH_QUEUE_SIZE = "maxFlushQueueSize"; + String ROW_SEQUENCING_YIELDS = "rowSequencingYields"; String COMPACTIONS_COMPLETED_DESC = "Number of compactions that have completed."; String COMPACTIONS_FAILED_DESC = "Number of compactions that have failed."; String LAST_MAJOR_COMPACTION_DESC = "Age of the last major compaction in milliseconds."; @@ -57,6 +58,7 @@ public interface MetricsRegionSource extends Comparable { String ROW_READS_ONLY_ON_MEMSTORE_DESC = "Row reads happening completely out of memstore"; String MIXED_ROW_READS = "mixedRowReadsCount"; String MIXED_ROW_READS_ON_STORE_DESC = "Row reads happening out of files and memstore on store"; + String ROW_SEQUENCING_YIELDS_DESC = "Number of yields taken to sequence row commits"; /** * Close the region's metrics as this region is closing. @@ -99,5 +101,9 @@ public interface MetricsRegionSource extends Comparable { */ MetricsRegionAggregateSource getAggregateSource(); + /** + * Update count of row sequencing yields. + */ + void updateRowSequencingYields(); } diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java index 2f7f8074c9df..b3793d8dcd6b 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionSourceImpl.java @@ -57,7 +57,7 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { private final String regionIncrementKey; private final String regionAppendKey; private final String regionScanKey; - + private final String regionRowSequencerYieldsKey; /* * Implementation note: Do not put histograms per region. With hundreds of regions in a server * histograms allocate too many counters. See HBASE-17016. @@ -69,6 +69,8 @@ public class MetricsRegionSourceImpl implements MetricsRegionSource { private final MutableFastCounter regionGet; private final MutableFastCounter regionScan; + private final MutableFastCounter regionRowSequencerYields; + private final int hashCode; public MetricsRegionSourceImpl(MetricsRegionWrapper regionWrapper, @@ -107,6 +109,10 @@ public MetricsRegionSourceImpl(MetricsRegionWrapper regionWrapper, regionScanKey = regionNamePrefix + MetricsRegionServerSource.SCAN_KEY + suffix; regionScan = registry.getCounter(regionScanKey, 0L); + + regionRowSequencerYieldsKey = regionNamePrefix + MetricsRegionSource.ROW_SEQUENCING_YIELDS + + suffix; + regionRowSequencerYields = registry.getCounter(regionRowSequencerYieldsKey, 0L); } @Override @@ -135,6 +141,7 @@ public void close() { registry.removeMetric(regionAppendKey); registry.removeMetric(regionGetKey); registry.removeMetric(regionScanKey); + registry.removeMetric(regionRowSequencerYieldsKey); regionWrapper = null; } @@ -170,6 +177,11 @@ public void updateAppend() { regionAppend.incr(); } + @Override + public void updateRowSequencingYields() { + regionRowSequencerYields.incr(); + } + @Override public MetricsRegionAggregateSource getAggregateSource() { return agg; @@ -302,6 +314,10 @@ void snapshot(MetricsRecordBuilder mrb, boolean ignored) { regionNamePrefix + MetricsRegionSource.MAX_FLUSH_QUEUE_SIZE, MetricsRegionSource.MAX_FLUSH_QUEUE_DESC), this.regionWrapper.getMaxFlushQueueSize()); + mrb.addCounter(Interns.info( + regionNamePrefix + MetricsRegionSource.ROW_SEQUENCING_YIELDS, + MetricsRegionSource.ROW_SEQUENCING_YIELDS_DESC), + this.regionWrapper.getRowSequencingYields()); addCounter(mrb, this.regionWrapper.getMemstoreOnlyRowReadsCount(), MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE, MetricsRegionSource.ROW_READS_ONLY_ON_MEMSTORE_DESC); diff --git a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java index 6bf010ce91b4..7de88f7e50b3 100644 --- a/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java +++ b/hbase-hadoop-compat/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapper.java @@ -183,4 +183,8 @@ public interface MetricsRegionWrapper { */ Map getMixedRowReadsCount(); + /** + * @return the number of yields made for row sequencing + */ + long getRowSequencingYields(); } diff --git a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java index 598658a56ccc..0af7225ce01b 100644 --- a/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java +++ b/hbase-hadoop-compat/src/test/java/org/apache/hadoop/hbase/regionserver/TestMetricsRegionSourceImpl.java @@ -233,5 +233,10 @@ public Map getMixedRowReadsCount() { map.put("info", 0L); return map; } + + @Override + public long getRowSequencingYields() { + return 0; + } } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 538264b87bf5..b4a5b9907547 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -40,6 +40,7 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -64,6 +65,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.LongAdder; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -233,6 +236,8 @@ */ @SuppressWarnings("deprecation") @InterfaceAudience.Private +@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="JLM_JSR166_UTILCONCURRENT_MONITORENTER", + justification="Intentional") public class HRegion implements HeapSize, PropagatingConfigurationObserver, Region { private static final Logger LOG = LoggerFactory.getLogger(HRegion.class); @@ -709,6 +714,8 @@ void sawNoSuchFamily() { private final MultiVersionConcurrencyControl mvcc; + private final RowCommitSequencer rowCommitSequencer; + // Coprocessor host private volatile RegionCoprocessorHost coprocessorHost; @@ -792,6 +799,10 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor this.baseConf = confParam; this.conf = new CompoundConfiguration().add(confParam).addBytesMap(htd.getValues()); + + // rowCommitSequencer depends on this.conf + this.rowCommitSequencer = new RowCommitSequencer(); + this.cellComparator = htd.isMetaTable() || conf.getBoolean(USE_META_CELL_COMPARATOR, DEFAULT_USE_META_CELL_COMPARATOR) ? MetaCellComparator.META_COMPARATOR : CellComparatorImpl.COMPARATOR; @@ -1605,6 +1616,9 @@ public Map> close() throws IOException { public static final String CLOSE_WAIT_INTERVAL = "hbase.regionserver.close.wait.interval.ms"; public static final long DEFAULT_CLOSE_WAIT_INTERVAL = 10000; // 10 seconds + public static final String COMMIT_SEQUENCER_ENABLED_KEY = "hbase.hregion.commit.sequencer.enabled"; + public static final boolean COMMIT_SEQUENCER_ENABLED_DEFAULT = true; + public Map> close(boolean abort) throws IOException { return close(abort, false); } @@ -4514,6 +4528,9 @@ protected void checkAndPreparePut(Put p) throws IOException { @Override public void checkAndPrepare() throws IOException { + // TODO: Currently validation is done with current time before acquiring locks and + // updates are done with different timestamps after acquiring locks. This behavior is + // inherited from the code prior to this change. Can this be changed? long now = EnvironmentEdgeManager.currentTime(); visitBatchOperations(true, this.size(), (int index) -> { checkAndPrepareMutation(index, now); @@ -4683,6 +4700,12 @@ private void doMiniBatchMutate(BatchOperation batchOp) throws IOException { return; } + // Use the row commit sequencer to ensure that only operations that mutate disjoint + // sets of rows are committed within the same clock tick. + // Do this before we take the updatesLock because the sequencer may decide the operation + // will yield. + long now = rowCommitSequencer.getRowSequence(acquiredRowLocks); + // Check for thread interrupt status in case we have been signaled from // #interruptRegionOperation. Do it before we take the lock and disable interrupts for // the WAL append. @@ -4691,15 +4714,15 @@ private void doMiniBatchMutate(BatchOperation batchOp) throws IOException { lock(this.updatesLock.readLock(), miniBatchOp.getReadyToWriteCount()); locked = true; - // From this point until memstore update this operation should not be interrupted. - disableInterrupts(); - // STEP 2. Update mini batch of all operations in progress with LATEST_TIMESTAMP timestamp // We should record the timestamp only after we have acquired the rowLock, // otherwise, newer puts/deletes/increment/append are not guaranteed to have a newer // timestamp - long now = EnvironmentEdgeManager.currentTime(); + // From this point until memstore update this operation should not be interrupted. + disableInterrupts(); + + // Prepare the batch, making any timestamp substitutions as needed batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks); // STEP 3. Build WAL edit @@ -4956,11 +4979,14 @@ private CheckAndMutateResult checkAndMutateInternal(CheckAndMutate checkAndMutat // If matches, perform the mutation or the rowMutations if (matches) { - // We have acquired the row lock already. If the system clock is NOT monotonically - // non-decreasing (see HBASE-14070) we should make sure that the mutation has a - // larger timestamp than what was observed via Get. doBatchMutate already does this, but - // there is no way to pass the cellTs. See HBASE-14054. - long now = EnvironmentEdgeManager.currentTime(); + + // Use the row commit sequencer to ensure that only operations that mutate disjoint + // sets of rows are committed within the same clock tick. + // Even if we yield it is safe to do this conditionally. The thread will yield but + // the row will remain locked. It will not be possible for any other thread to update + // the value. We don't need to re-read. + long now = rowCommitSequencer.getRowSequence(rowLock); + long ts = Math.max(now, cellTs); // ensure write is not eclipsed byte[] byteTs = Bytes.toBytes(ts); if (mutation != null) { @@ -7081,6 +7107,210 @@ public String toString() { } } + /** + * Sequences the commit of rows such that more than one mutation to a given row will never be + * committed in the same clock tick. + *

+ * Callers will first acquire row locks for the row(s) the pending mutation will mutate. + * Then they will use RowCommitSequencer.getRowSequence to ensure that the set of rows about + * to be mutated are disjoint with respect to all other pending mutations in the current clock + * tick. If an overlap is found, getRowSequence will yield and loop until there is no longer + * an overlap and the caller's pending mutation can proceed. + *

+ * Note: This should all be REMOVED once we use a hybrid logical clock for timekeeping. + */ + public class RowCommitSequencer { + + public static final int ROW_SEQUENCER_SLEEP_TIME = 1; + + private class RowSet { + ReentrantLock lock; + // LinkedHashSet is O(1) insert and O(1) contains, this is important + LinkedHashSet set; + public RowSet() { + lock = new ReentrantLock(); + set = new LinkedHashSet<>(); + } + } + + private AtomicReference rowSet; + private AtomicLong sequence; + private LongAdder yieldCount; + private final boolean enabled; + + public RowCommitSequencer() { + this.enabled = conf.getBoolean(COMMIT_SEQUENCER_ENABLED_KEY, true); + if (this.enabled) { + this.sequence = new AtomicLong(EnvironmentEdgeManager.currentTime()); + this.rowSet = new AtomicReference<>(new RowSet()); + this.yieldCount = new LongAdder(); + } + } + + /** + * Update the current time and take the sequencer lock to prepare for row set updates. + * @param now the current time + */ + // Visible for testing + void updateTime(final long now) throws IOException { + sequence.updateAndGet(x -> { + if (x != now) { + // Time changed, reset the row set. + rowSet.set(new RowSet()); + } + return now; + }); + } + + /** + * Check if one or more of the rows we have acquired locks for would overlap with a commit + * made to a row in the same clock tick. + * @param rowLocks the list of rows locked for the current operation + * @return false if one or more rows overlap with an operation in progress, true otherwise + */ + // Visible for testing + boolean checkAndAddRows(Collection rowLocks) throws IOException { + // For each row, test if the set already contains the row. If there is no mutation + // and the current operation will be allowed to go forward, then add all of its rows + // to the set. + // This operation is going to be O(N*2) number of row locks, so the underlying set + // should have O(1) add and O(1) contains, like LinkedHashSet. + RowSet thisSet = rowSet.get(); + try { + thisSet.lock.lockInterruptibly(); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } + try { + for (RowLock l: rowLocks) { + HashedBytes row = ((RowLockImpl)l).context.row; + if (thisSet.set.contains(row)) { + return false; + } + } + for (RowLock l: rowLocks) { + HashedBytes row = ((RowLockImpl)l).context.row; + thisSet.set.add(row); + } + return true; + } finally { + thisSet.lock.unlock(); + } + } + + /** + * Check if the row we have acquired a lock for would overlap with a commit made in the same + * clock tick. + * @param lock the row locked for the current operation + * @return false if the row overlaps with an operation in progress, true otherwise + */ + // Visible for testing + boolean checkAndAddRow(RowLock lock) throws IOException { + RowSet thisSet = rowSet.get(); + try { + thisSet.lock.lockInterruptibly(); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } + try { + HashedBytes row = ((RowLockImpl)lock).context.row; + if (thisSet.set.contains(row)) { + return false; + } + thisSet.set.add(row); + return true; + } finally { + thisSet.lock.unlock(); + } + } + + /** + * Get the timestamp to use for substitution as cell timestamps for the current operation + * in progress. + *

+ * This method may yield the thread if one or more of the rows we have acquired locks for + * would overlap with a commit made to a row in the same clock tick, until the system time + * advances. + * @param rowLocks list of row locks accumulated for a batch mutation + * @return the timestamp to use for the current operation + */ + public long getRowSequence(List rowLocks) throws IOException { + while (true) { + long now = EnvironmentEdgeManager.currentTime(); + if (!enabled) { + return now; + } + updateTime(now); + // Now we can check for collisions. + if (checkAndAddRows(rowLocks)) { + // No collision detected, proceed. + return now; + } + try { + // The typical clock resolution on a modern system is ~1ms. Wait times less than + // this may be rounded up to at least the time for one clock tick on some platforms. + yieldCount.increment(); + Thread.sleep(ROW_SEQUENCER_SLEEP_TIME, 0); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } + } + } + + /** + * Get the timestamp to use for substitution as cell timestamps for the current operation + * in progress. + *

+ * This method may block if one or more of the rows we have acquired locks for would + * overlap with a commit made to a row in the same clock tick, until the system time + * advances. + * @param rowLock row lock + * @return the timestamp to use for the current operation + */ + public long getRowSequence(RowLock rowLock) throws IOException { + while (true) { + long now = EnvironmentEdgeManager.currentTime(); + if (!enabled) { + return now; + } + updateTime(now); + // Now we can check for collisions. + if (checkAndAddRow(rowLock)) { + // No collision detected, proceed. + return now; + } + try { + // The typical clock resolution on a modern system is ~1ms. Wait times less than + // this may be rounded up to at least the time for one clock tick on some platforms. + yieldCount.increment(); + Thread.sleep(1,0); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } + } + } + + /** + * @return the number of times the row sequencer yielded the current threads + */ + public long getYieldCount() { + return yieldCount.sum(); + } + + } + + // Visible for testing + RowCommitSequencer getRowCommitSequencer() { + return this.rowCommitSequencer; + } + + /** + * @return the number of times the row sequencer yielded the current threads + */ + public long getRowSequencingYields() { + return rowCommitSequencer.getYieldCount(); + } + /** * Determines whether multiple column families are present * Precondition: familyPaths is not null @@ -8030,6 +8260,8 @@ public Result increment(Increment increment, long nonceGroup, long nonce) throws /** * @return writeEntry associated with this append */ + @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="NP_NULL_ON_SOME_PATH", + justification="Findbugs doesn't know about Preconditions") private WriteEntry doWALAppend(WALEdit walEdit, Durability durability, List clusterIds, long now, long nonceGroup, long nonce, long origLogSeqNum) throws IOException { Preconditions.checkArgument(walEdit != null && !walEdit.isEmpty(), @@ -8750,4 +8982,5 @@ public void addReadRequestsCount(long readRequestsCount) { public void addWriteRequestsCount(long writeRequestsCount) { this.writeRequestsCount.add(writeRequestsCount); } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java index 1a266f76abb6..f9e8b92abf88 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperImpl.java @@ -246,6 +246,11 @@ public Map getMixedRowReadsCount() { return mixedReadsOnStore; } + @Override + public long getRowSequencingYields() { + return region.getRowSequencingYields(); + } + public class HRegionMetricsWrapperRunnable implements Runnable { @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java index 774871b38263..02db8f3687dc 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java @@ -29,7 +29,7 @@ */ @InterfaceAudience.Private @InterfaceStability.Stable -public class HashedBytes { +public class HashedBytes implements Comparable { private final byte[] bytes; private final int hashCode; @@ -58,8 +58,14 @@ public boolean equals(Object obj) { return (hashCode == other.hashCode) && Arrays.equals(bytes, other.bytes); } + @Override + public int compareTo(HashedBytes that) { + return Bytes.compareTo(this.bytes, that.bytes); + } + @Override public String toString() { return Bytes.toStringBinary(bytes); } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAppendTimeRange.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAppendTimeRange.java index 2e40de3112c2..0e7752ba3140 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAppendTimeRange.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAppendTimeRange.java @@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -80,6 +81,8 @@ public static void setupBeforeClass() throws Exception { // Make general delay zero rather than default. Timing is off in this // test that depends on an evironment edge that is manually moved forward. util.getConfiguration().setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 0); + // Row commit sequencer won't work because this test uses ManualEnvironmentEdge + util.getConfiguration().setBoolean(HRegion.COMMIT_SEQUENCER_ENABLED_KEY, false); util.startMiniCluster(); EnvironmentEdgeManager.injectEdge(mee); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestIncrementTimeRange.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestIncrementTimeRange.java index 99bf3fa05489..33511db27cc2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestIncrementTimeRange.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestIncrementTimeRange.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.io.TimeRange; import org.apache.hadoop.hbase.procedure2.RemoteProcedureDispatcher; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.util.Bytes; @@ -90,6 +91,8 @@ public static void setupBeforeClass() throws Exception { // Make general delay zero rather than default. Timing is off in this // test that depends on an evironment edge that is manually moved forward. util.getConfiguration().setInt(RemoteProcedureDispatcher.DISPATCH_DELAY_CONF_KEY, 0); + // Row commit sequencer won't work because this test uses ManualEnvironmentEdge + util.getConfiguration().setBoolean(HRegion.COMMIT_SEQUENCER_ENABLED_KEY, false); util.startMiniCluster(); EnvironmentEdgeManager.injectEdge(mee); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java index 4f40f6289cb3..52e37932dcd8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/MetricsRegionWrapperStub.java @@ -199,4 +199,9 @@ public Map getMixedRowReadsCount() { map.put("info", 0L); return map; } + + @Override + public long getRowSequencingYields() { + return 0; + } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index c63fec84a08f..200e0381933e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -7121,9 +7121,9 @@ private void checkScan(int expectCellSize) throws IOException{ @Test public void testIncrementTimestampsAreMonotonic() throws IOException { - region = initHRegion(tableName, method, CONF, fam1); ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(edge); + region = initHRegion(tableName, method, CONF, fam1); edge.setValue(10); Increment inc = new Increment(row); @@ -7146,9 +7146,9 @@ public void testIncrementTimestampsAreMonotonic() throws IOException { @Test public void testAppendTimestampsAreMonotonic() throws IOException { - region = initHRegion(tableName, method, CONF, fam1); ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(edge); + region = initHRegion(tableName, method, CONF, fam1); edge.setValue(10); Append a = new Append(row); @@ -7177,29 +7177,29 @@ public void testAppendTimestampsAreMonotonic() throws IOException { @Test public void testCheckAndMutateTimestampsAreMonotonic() throws IOException { - region = initHRegion(tableName, method, CONF, fam1); - ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); + IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(edge); + region = initHRegion(tableName, method, CONF, fam1); - edge.setValue(10); + final long t0 = edge.currentTime(); Put p = new Put(row); p.setDurability(Durability.SKIP_WAL); - p.addColumn(fam1, qual1, qual1); + p.addColumn(fam1, qual1, t0, qual1); region.put(p); Result result = region.get(new Get(row)); Cell c = result.getColumnLatestCell(fam1, qual1); assertNotNull(c); - assertEquals(10L, c.getTimestamp()); + assertEquals(t0, c.getTimestamp()); - edge.setValue(1); // clock goes back + edge.incrementTime(-10000); // clock goes back p = new Put(row); p.setDurability(Durability.SKIP_WAL); p.addColumn(fam1, qual1, qual2); region.checkAndMutate(row, fam1, qual1, CompareOperator.EQUAL, new BinaryComparator(qual1), p); result = region.get(new Get(row)); c = result.getColumnLatestCell(fam1, qual1); - assertEquals(10L, c.getTimestamp()); + assertEquals(t0, c.getTimestamp()); assertTrue(Bytes.equals(c.getValueArray(), c.getValueOffset(), c.getValueLength(), qual2, 0, qual2.length)); @@ -7381,22 +7381,22 @@ public Void call() throws Exception { @Test public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException { - region = initHRegion(tableName, method, CONF, fam1); - ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); + IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(edge); + region = initHRegion(tableName, method, CONF, fam1); - edge.setValue(10); + long t0 = edge.currentTime(); Put p = new Put(row); p.setDurability(Durability.SKIP_WAL); - p.addColumn(fam1, qual1, qual1); + p.addColumn(fam1, qual1, t0, qual1); region.put(p); Result result = region.get(new Get(row)); Cell c = result.getColumnLatestCell(fam1, qual1); assertNotNull(c); - assertEquals(10L, c.getTimestamp()); + assertEquals(t0, c.getTimestamp()); - edge.setValue(1); // clock goes back + edge.incrementTime(-10000); // clock goes back p = new Put(row); p.setDurability(Durability.SKIP_WAL); p.addColumn(fam1, qual1, qual2); @@ -7406,7 +7406,7 @@ public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException { new BinaryComparator(qual1), rm)); result = region.get(new Get(row)); c = result.getColumnLatestCell(fam1, qual1); - assertEquals(10L, c.getTimestamp()); + assertEquals(t0, c.getTimestamp()); LOG.info("c value " + Bytes.toStringBinary(c.getValueArray(), c.getValueOffset(), c.getValueLength())); @@ -8024,7 +8024,189 @@ public void testRegionOnCoprocessorsWithoutChange() throws IOException { assertEquals(regionCoprocessor, regionCoprocessorAfterOnConfigurationChange); } + @Test + public void testRowCommitSequencer() throws Exception { + final byte[] cf1 = Bytes.toBytes("CF1"); + final byte[][] families = { cf1 }; + final byte[] AAA = Bytes.toBytes("AAA"); + final byte[] AAB = Bytes.toBytes("AAB"); + final byte[] AAC = Bytes.toBytes("AAC"); + final byte[] AAD = Bytes.toBytes("AAD"); + final byte[] AAE = Bytes.toBytes("AAE"); + final byte[] AAF = Bytes.toBytes("AAF"); + final byte[] AAG = Bytes.toBytes("AAG"); + + region = initHRegion(tableName, method, CONF, families); + HRegion.RowCommitSequencer sequencer = region.getRowCommitSequencer(); + + // Tick 1 + + sequencer.updateTime(1); + assertTrue (checkForOverlap(AAA, AAB, AAC)); + assertFalse(checkForOverlap(AAA)); + + // Tick 2 + + sequencer.updateTime(2); + assertTrue (checkForOverlap(AAA)); + + // Tick 3 + + sequencer.updateTime(3); + // Two disjoint batches allowed + assertTrue (checkForOverlap(AAA, AAB, AAC)); + assertTrue (checkForOverlap(AAD, AAE, AAF)); + // Single row mutations that conflict with the batches are disallowed + assertFalse(checkForOverlap(AAA)); + assertFalse(checkForOverlap(AAB)); + assertFalse(checkForOverlap(AAC)); + assertFalse(checkForOverlap(AAD)); + assertFalse(checkForOverlap(AAE)); + assertFalse(checkForOverlap(AAF)); + // Single row mutation that does not conflict is fine + assertTrue (checkForOverlap(AAG)); + + // Tick 4 + + sequencer.updateTime(4); + // Two single row mutations allowed + assertTrue (checkForOverlap(AAA)); + assertTrue (checkForOverlap(AAD)); + // A batch with a conflicting row disallowed + assertFalse(checkForOverlap(AAA, AAB, AAC)); + // Single row mutations in the batch that do not conflict are allowed on their own + // This tests that the check does not add rows which are not going to pass the filter. + assertTrue (checkForOverlap(AAB)); + assertTrue (checkForOverlap(AAC)); + // Another batch with a conflicting row disallowed + assertFalse(checkForOverlap(AAD, AAE, AAF)); + // More row mutations in the batch that do not conflict are allowed on their own + // This tests that the check does not add rows which are not going to pass the filter. + assertTrue (checkForOverlap(AAE)); + assertTrue (checkForOverlap(AAF)); + + // Time jumps forward + // Tick 10 + + sequencer.updateTime(10); + // A batch with a previosuly conflicting row now allowed + assertTrue (checkForOverlap(AAA, AAB, AAC)); + + // Time jumps backwards + // Tick 7 + + sequencer.updateTime(7); + // Same check + assertTrue (checkForOverlap(AAA, AAB, AAC)); + + // Time jumps backwards again + // Tick 6 + + sequencer.updateTime(6); + // Same check + assertTrue (checkForOverlap(AAA, AAB, AAC)); + + // Tick 11 + + // Some interleaving + sequencer.updateTime(11); + assertTrue (checkForOverlap(AAA, AAB)); + assertFalse(checkForOverlap(AAB, AAC)); + assertTrue (checkForOverlap(AAC, AAD)); + assertFalse(checkForOverlap(AAD, AAE)); + assertTrue (checkForOverlap(AAE, AAF)); + assertFalse(checkForOverlap(AAF, AAG)); + assertTrue (checkForOverlap(AAG)); + + // Tick 12 + + sequencer.updateTime(12); + assertTrue (checkForOverlap(AAA, AAB, AAC, AAD, AAE, AAF, AAG)); + assertFalse(checkForOverlap(AAA)); + assertFalse(checkForOverlap(AAB)); + assertFalse(checkForOverlap(AAC)); + assertFalse(checkForOverlap(AAD)); + assertFalse(checkForOverlap(AAE)); + assertFalse(checkForOverlap(AAF)); + assertFalse(checkForOverlap(AAG)); + + // Tick 13 + + sequencer.updateTime(13); + assertTrue (checkForOverlap(AAA)); + assertTrue (checkForOverlap(AAB)); + assertTrue (checkForOverlap(AAC)); + assertTrue (checkForOverlap(AAD)); + assertTrue (checkForOverlap(AAE)); + assertTrue (checkForOverlap(AAF)); + assertTrue (checkForOverlap(AAG)); + } + + @Test + public void testRowCommitSequencerYields() throws Exception { + final byte[] cf1 = Bytes.toBytes("CF1"); + final byte[][] families = { cf1 }; + final byte[] empty = HConstants.EMPTY_BYTE_ARRAY; + final Put putAAA = new Put(Bytes.toBytes("AAA")).addColumn(cf1, empty, Bytes.toBytes(0)); + ManualEnvironmentEdge mee = new ManualEnvironmentEdge(); + mee.setValue(1); + EnvironmentEdgeManager.injectEdge(mee); + try { + region = initHRegion(tableName, method, CONF, families); + Thread ticker = new Thread(() -> { + int i = 2; + while (true) { + i += 1; + LOG.info("Set time to {}", i); + mee.setValue(i); + try { + Thread.sleep(HRegion.RowCommitSequencer.ROW_SEQUENCER_SLEEP_TIME * 10); + } catch (InterruptedException e) { + LOG.info("Interrupted while ticking"); + return; + } + } + }); + Thread committer = new Thread(() -> { + for (int i = 0; i < 100; i++) { + try { + region.put(putAAA); + LOG.info("Put {}", putAAA); + } catch (IOException e) { + LOG.error("Error during put", e); + } + } + }); + ticker.start(); + committer.start(); + committer.join(); + ticker.interrupt(); + ticker.join(); + } finally { + EnvironmentEdgeManager.reset(); + } + + LOG.info("rowSequencingYields: {}", region.getRowSequencingYields()); + // There should be at least 99 yields (there will probably be many more...) + assertTrue(region.getRowSequencingYields() >= 99); + } + + private boolean checkForOverlap(final byte[]... rows) throws IOException { + List list = new ArrayList<>(rows.length); + for (byte[] row: rows) { + list.add(region.getRowLock(row)); + } + try { + return region.getRowCommitSequencer().checkAndAddRows(list); + } finally { + for (RowLock l: list) { + l.release(); + } + } + } + public static class NoOpRegionCoprocessor implements RegionCoprocessor, RegionObserver { // a empty region coprocessor class } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java index 089397313fec..390171721a2d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java @@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.junit.Assert; +import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; import org.junit.Test; @@ -61,7 +62,8 @@ public class TestMinVersions { public static final HBaseClassTestRule CLASS_RULE = HBaseClassTestRule.forClass(TestMinVersions.class); - HBaseTestingUtil hbu = new HBaseTestingUtil(); + private static final HBaseTestingUtil hbu = new HBaseTestingUtil(); + private final byte[] T0 = Bytes.toBytes("0"); private final byte[] T1 = Bytes.toBytes("1"); private final byte[] T2 = Bytes.toBytes("2"); @@ -73,6 +75,12 @@ public class TestMinVersions { @Rule public TestName name = new TestName(); + @BeforeClass + public static void setUpBeforeClass() { + // Row commit sequencer won't work because this test uses ManualEnvironmentEdge + hbu.getConfiguration().setBoolean(HRegion.COMMIT_SEQUENCER_ENABLED_KEY, false); + } + /** * Verify behavior of getClosestBefore(...) */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java index d5c31236d117..c58c6810058b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestCoprocessorScanPolicy.java @@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.coprocessor.RegionObserver; import org.apache.hadoop.hbase.regionserver.DelegatingInternalScanner; import org.apache.hadoop.hbase.regionserver.FlushLifeCycleTracker; +import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.InternalScanner; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.ScanType; @@ -89,6 +90,8 @@ public class TestCoprocessorScanPolicy { public static void setUpBeforeClass() throws Exception { Configuration conf = TEST_UTIL.getConfiguration(); conf.setStrings(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY, ScanObserver.class.getName()); + // Row commit sequencer won't work because the test uses the ManualEnvironmentEdge + conf.setBoolean(HRegion.COMMIT_SEQUENCER_ENABLED_KEY, false); TEST_UTIL.startMiniCluster(); }