From ee72ef991e423974937c2319d869c8200cd86bfc Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Sun, 27 Mar 2022 11:32:27 -0700 Subject: [PATCH 1/2] HBASE-25913 Introduce EnvironmentEdge.Clock and Clock.currentTimeAdvancing - Introduce a Clock abstraction into EnvironmentEdge and define Clock#currentTimeAdvancing, which ensures that every call to this method returns an advancing time. --- .../hadoop/hbase/backup/TestBackupDelete.java | 9 ++ .../hbase/util/BaseEnvironmentEdge.java | 105 ++++++++++++++++++ .../hbase/util/DefaultEnvironmentEdge.java | 18 +-- .../hadoop/hbase/util/EnvironmentEdge.java | 65 ++++++++++- .../apache/hadoop/hbase/util/HashedBytes.java | 0 .../util/IncrementingEnvironmentEdge.java | 3 +- .../BoundedIncrementSpinAdvancingClock.java | 94 ++++++++++++++++ .../BoundedIncrementYieldAdvancingClock.java | 95 ++++++++++++++++ .../util/clock/IncrementAdvancingClock.java | 98 ++++++++++++++++ .../hbase/util/clock/SpinAdvancingClock.java | 94 ++++++++++++++++ .../hbase/util/clock/YieldAdvancingClock.java | 41 +++++++ .../util/NonRepeatedEnvironmentEdge.java | 2 +- .../util/TestEnvironmentEdgeManager.java | 27 +++++ .../hbase/util/TimeOffsetEnvironmentEdge.java | 10 ++ ...tedBoundedIncrementSpinAdvancingClock.java | 67 +++++++++++ ...edBoundedIncrementYieldAdvancingClock.java | 67 +++++++++++ .../InstrumentedIncrementAdvancingClock.java | 64 +++++++++++ .../clock/InstrumentedSpinAdvancingClock.java | 57 ++++++++++ .../InstrumentedYieldAdvancingClock.java | 57 ++++++++++ ...estBoundedIncrementSpinAdvancingClock.java | 77 +++++++++++++ ...stBoundedIncrementYieldAdvancingClock.java | 77 +++++++++++++ .../clock/TestIncrementAdvancingClock.java | 52 +++++++++ .../util/clock/TestSpinAdvancingClock.java | 73 ++++++++++++ .../util/clock/TestYieldAdvancingClock.java | 73 ++++++++++++ .../hbase/util/ManualEnvironmentEdge.java | 15 ++- .../hbase/ipc/TestSimpleRpcScheduler.java | 12 ++ .../master/cleaner/TestHFileCleaner.java | 9 ++ .../hadoop/hbase/quotas/TestRateLimiter.java | 10 +- .../regionserver/TestCompactingMemStore.java | 4 +- .../regionserver/TestDefaultMemStore.java | 9 ++ .../hbase/regionserver/TestStoreScanner.java | 12 +- .../compactions/TestCurrentHourProvider.java | 5 +- 32 files changed, 1373 insertions(+), 28 deletions(-) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/BaseEnvironmentEdge.java rename {hbase-server => hbase-common}/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java (100%) create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementSpinAdvancingClock.java create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementYieldAdvancingClock.java create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/IncrementAdvancingClock.java create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/SpinAdvancingClock.java create mode 100644 hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/YieldAdvancingClock.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedBoundedIncrementSpinAdvancingClock.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedBoundedIncrementYieldAdvancingClock.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedIncrementAdvancingClock.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedSpinAdvancingClock.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedYieldAdvancingClock.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestBoundedIncrementSpinAdvancingClock.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestBoundedIncrementYieldAdvancingClock.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestIncrementAdvancingClock.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestSpinAdvancingClock.java create mode 100644 hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestYieldAdvancingClock.java diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDelete.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDelete.java index bc8b346175a6..a56f013860d6 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDelete.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDelete.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.util.ToolRunner; import org.junit.Assert; import org.junit.ClassRule; @@ -118,6 +119,14 @@ public void testBackupPurgeOldBackupsCommand() throws Exception { public long currentTime() { return System.currentTimeMillis() - 2 * 24 * 3600 * 1000 ; } + @Override + public Clock getClock(HashedBytes name) { + return null; + } + @Override + public boolean removeClock(Clock clock) { + return false; + } }); String backupId = fullTableBackup(tableList); assertTrue(checkSucceeded(backupId)); diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BaseEnvironmentEdge.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BaseEnvironmentEdge.java new file mode 100644 index 000000000000..ea298840d183 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BaseEnvironmentEdge.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util; + +import java.lang.reflect.Constructor; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.hbase.util.clock.BoundedIncrementYieldAdvancingClock; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Base implementation of an environment edge. + */ +@InterfaceAudience.Private +public abstract class BaseEnvironmentEdge implements EnvironmentEdge { + + // TODO: Hardcoded for now. Should this be configurable? BoundedIncrementYieldAdvancingClock + // is the best option, as determined by microbenchmarks. See HBASE-25913. + private static final Constructor CLOCK_IMPL_CONSTRUCTOR; + static { + try { + CLOCK_IMPL_CONSTRUCTOR = + BoundedIncrementYieldAdvancingClock.class.getConstructor(HashedBytes.class); + } catch (Exception e) { + // If there is a problem this exception will bubble up and ultimately cause a JVM abort, + // which is what we want, because it means a developer failed to update here after + // changing how clocks are constructed. + throw new RuntimeException(e); + } + } + + /** + * A clock instance representing the system time directly, so we introduce no overheads for the + * vast majority of users of EnvironmentEdgeManager.currentTime. + */ + private static final Clock SYSTEM_CLOCK = new Clock() { + final HashedBytes NAME = new HashedBytes(Bytes.toBytes("DEFAULT")); + @Override + public HashedBytes getName() { + return NAME; + } + @Override + public long currentTime() { + return System.currentTimeMillis(); + } + @Override + public long currentTimeAdvancing() { + throw new UnsupportedOperationException("Default clock does not implement currentTimeAdvancing()"); + } + @Override + public void get() { + throw new UnsupportedOperationException("Default clock does not implement get()"); + } + @Override + public boolean remove() { + throw new UnsupportedOperationException("Default clock does not implement remove()"); + } + }; + + @Override + public long currentTime() { + return SYSTEM_CLOCK.currentTime(); + } + + private static final ConcurrentHashMap clockMap = new ConcurrentHashMap<>(); + + @Override + public Clock getClock(final HashedBytes name) { + Clock clock = clockMap.computeIfAbsent(name, k -> { + try { + return (Clock)CLOCK_IMPL_CONSTRUCTOR.newInstance(k); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + clock.get(); // increment reference count + return clock; + } + + @Override + public boolean removeClock(Clock clock) { + if (clock.remove()) { // only remove when refcount drops to zero + if (clockMap.remove(clock.getName()) != null) { + return true; + } + } + return false; + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DefaultEnvironmentEdge.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DefaultEnvironmentEdge.java index 422cc165077e..076441832026 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DefaultEnvironmentEdge.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/DefaultEnvironmentEdge.java @@ -20,19 +20,9 @@ import org.apache.yetus.audience.InterfaceAudience; -/** - * Default implementation of an environment edge. - */ @InterfaceAudience.Private -public class DefaultEnvironmentEdge implements EnvironmentEdge { - /** - * {@inheritDoc} - *

- * This implementation returns {@link System#currentTimeMillis()} - *

- */ - @Override - public long currentTime() { - return System.currentTimeMillis(); - } +public class DefaultEnvironmentEdge extends BaseEnvironmentEdge { + + // No overrides here + } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdge.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdge.java index 635c2764d2bc..6ce51ab9f98a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdge.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdge.java @@ -28,10 +28,71 @@ */ @InterfaceAudience.Private public interface EnvironmentEdge { + /** - * Returns the currentTime. + * Returns the current time using the default clock. + *

+ * This is almost always what you want, unless managing timekeeping with a named clock. * - * @return Current time. + * @return The current time. */ long currentTime(); + + /** + * Get the clock associated with the given identifier. + * @param name clock identifier + * @returns the clock instance for the given identifier + */ + Clock getClock(HashedBytes name); + + /** + * Remove the clock with the given identifier. + * @param clockId clock identifier + * @return true if the clock was removed, false if it did not exist + */ + boolean removeClock(Clock clock); + + /** + * Abstraction for an environment's time source. + */ + public interface Clock { + + /** + * Returns the clock's identifier. + */ + HashedBytes getName(); + + /** + * Returns the current time using a named clock. + * @param clockId clock identifier + * @return The current time, according to the given named clock. + */ + long currentTime(); + + /** + * Returns the current time using a named clock. Ensure the clock advanced by + * at least one tick before returning. + *

+ * This method may block the current thread's execution or cause it to yield. + * @param clockId clock identifier + * @return The current time, according to the given named clock. + * @throws InterruptedException if interrupted while waiting for the clock to advance + */ + default long currentTimeAdvancing() throws InterruptedException { + throw new UnsupportedOperationException("BaseClock does not implement currentTimeAdvancing"); + } + + /** + * Called to increment the reference count of the clock. + */ + void get(); + + /** + * Called when the clock is removed. + * @return true if the reference count is zero, false otherwise. + */ + boolean remove(); + + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java similarity index 100% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java rename to hbase-common/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IncrementingEnvironmentEdge.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IncrementingEnvironmentEdge.java index 0a38bf4605d2..06f4417821ba 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IncrementingEnvironmentEdge.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IncrementingEnvironmentEdge.java @@ -24,7 +24,7 @@ * Uses an incrementing algorithm instead of the default. */ @InterfaceAudience.Private -public class IncrementingEnvironmentEdge implements EnvironmentEdge { +public class IncrementingEnvironmentEdge extends BaseEnvironmentEdge { private long timeIncrement; @@ -62,4 +62,5 @@ public synchronized long incrementTime(long amount) { timeIncrement += amount; return timeIncrement; } + } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementSpinAdvancingClock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementSpinAdvancingClock.java new file mode 100644 index 000000000000..cdfbbf31bdcd --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementSpinAdvancingClock.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.clock; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.util.HashedBytes; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * BoundedIncrementSpinAdvancingClock uses AtomicLong#updateAndGet to increment our + * clock's notion of current tick at each method call until currentTimeMillis arrives + * at a time greater than our clock's notion of current tick, up to a certain bound, + * at which point it will spin until the system clock catches + * up. + */ +@InterfaceAudience.Private +public class BoundedIncrementSpinAdvancingClock extends IncrementAdvancingClock { + + static final int MAX_ADVANCE = 1000; // one second in milliseconds + + protected AtomicLong currentAdvance = new AtomicLong(0); + + public BoundedIncrementSpinAdvancingClock(HashedBytes name) { + super(name); + } + + @Override + public long currentTime() { + return super.currentTime(); + } + + @Override + public long currentTimeAdvancing() throws InterruptedException { + // If we have advanced too far, now we have to wait for the system clock to + // catch up. + if (currentAdvance.incrementAndGet() >= MAX_ADVANCE) { + long now; + while (true) { + now = System.currentTimeMillis(); + if (now > lastTime.get()) { + final long updateTime = now; + return lastTime.updateAndGet(x -> update(updateTime)); + } + spin(); + } + // Otherwise, it's fine to advance our notion of the system time. + } else { + return lastTime.updateAndGet(x -> { + long now = System.currentTimeMillis(); + if (now > x) { + return update(now); + } + return advance(x); + }); + } + } + + // Broken out to inlinable method for subclassing and instrumentation. + protected long advance(long last) { + // System clock hasn't moved forward. Increment our notion of current tick to keep + // the time advancing. + currentAdvance.incrementAndGet(); + return super.advance(last); + } + + // Broken out to inlinable method for subclassing and instrumentation. + protected long update(long now) { + // System clock has moved ahead of our notion of current tick. Move us forward to match. + // We do that just by returning the current time, which was passed in to us as 'n'. + currentAdvance.set(0); + return super.update(now); + } + + // Broken out to inlinable method for subclassing and instrumentation. + protected void spin() throws InterruptedException { + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementYieldAdvancingClock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementYieldAdvancingClock.java new file mode 100644 index 000000000000..11fd2f595428 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementYieldAdvancingClock.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.clock; + +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.util.HashedBytes; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * BoundedIncrementYieldAdvancingClock uses AtomicLong#updateAndGet to increment our + * clock's notion of current tick at each method call until currentTimeMillis arrives + * at a time greater than our clock's notion of current tick, up to a certain bound, + * at which point it will switch to a yielding strategy until the system clock catches + * up. + */ +@InterfaceAudience.Private +public class BoundedIncrementYieldAdvancingClock extends IncrementAdvancingClock { + + static final int MAX_ADVANCE = 1000; // one second in milliseconds + + protected AtomicLong currentAdvance = new AtomicLong(0); + + public BoundedIncrementYieldAdvancingClock(HashedBytes name) { + super(name); + } + + @Override + public long currentTime() { + return super.currentTime(); + } + + @Override + public long currentTimeAdvancing() throws InterruptedException { + // If we have advanced too far, now we have to wait for the system clock to + // catch up. + if (currentAdvance.incrementAndGet() >= MAX_ADVANCE) { + long now; + while (true) { + now = System.currentTimeMillis(); + if (now > lastTime.get()) { + final long updateTime = now; + return lastTime.updateAndGet(x -> update(updateTime)); + } + spin(); + } + // Otherwise, it's fine to advance our notion of the system time. + } else { + return lastTime.updateAndGet(x -> { + long now = System.currentTimeMillis(); + if (now > x) { + return update(now); + } + return advance(x); + }); + } + } + + // Broken out to inlinable method for subclassing and instrumentation. + protected long advance(long last) { + // System clock hasn't moved forward. Increment our notion of current tick to keep + // the time advancing. + currentAdvance.incrementAndGet(); + return super.advance(last); + } + + // Broken out to inlinable method for subclassing and instrumentation. + protected long update(long now) { + // System clock has moved ahead of our notion of current tick. Move us forward to match. + // We do that just by returning the current time, which was passed in to us as 'n'. + currentAdvance.set(0); + return super.update(now); + } + + // Broken out to inlinable method for subclassing and instrumentation. + protected void spin() throws InterruptedException { + Thread.sleep(1, 0); // black magic to yield on all platforms + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/IncrementAdvancingClock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/IncrementAdvancingClock.java new file mode 100644 index 000000000000..b9f75558e3fb --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/IncrementAdvancingClock.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.clock; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.util.HashedBytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge.Clock; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * IncrementAdvancingClock uses AtomicLong#updateAndGet to increment our clock's notion of + * current tick at each method call until currentTimeMillis arrives at a time greater than + * our clock's notion of current tick. + */ +@InterfaceAudience.Private +public class IncrementAdvancingClock implements Clock { + + protected HashedBytes name; + protected AtomicInteger refCount = new AtomicInteger(); + protected AtomicLong lastTime = new AtomicLong(System.currentTimeMillis()); + + public IncrementAdvancingClock(HashedBytes name) { + this.name = name; + } + + @Override + public HashedBytes getName() { + return name; + } + + @Override + public void get() { + refCount.incrementAndGet(); + } + + @Override + public boolean remove() { + return refCount.decrementAndGet() <= 0; + } + + @Override + public long currentTime() { + return lastTime.updateAndGet(x -> { + long now = System.currentTimeMillis(); + if (now > x) { + return update(now); + } + return x; + }); + } + + @Override + public long currentTimeAdvancing() throws InterruptedException { + return lastTime.updateAndGet(x -> { + long now = System.currentTimeMillis(); + if (now > x) { + return update(now); + } + return advance(x); + }); + } + + // Broken out to inlinable method for subclassing and instrumentation. + protected long advance(long last) { + // System clock hasn't moved forward. Increment our notion of current tick to keep + // the time advancing. + return last + 1; + } + + // Broken out to inlinable method for subclassing and instrumentation. + protected long update(long now) { + // System clock has moved ahead of our notion of current tick. Move us forward to match. + // We do that just by returning the current time, which was passed in to us as 'n'. + return now; + } + + // Visible for testing + public int getRefCount() { + return refCount.get(); + } +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/SpinAdvancingClock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/SpinAdvancingClock.java new file mode 100644 index 000000000000..24590ba4b0bf --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/SpinAdvancingClock.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.clock; + +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.hbase.util.HashedBytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge.Clock; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * SpinAdvancingClock implements a strategy for currentTimeAdvancing that spins on the CPU + * waiting for the clock to tick over. + */ +@InterfaceAudience.Private +public class SpinAdvancingClock implements Clock { + + protected HashedBytes name; + protected AtomicInteger refCount = new AtomicInteger(); + protected AtomicLong lastTime = new AtomicLong(System.currentTimeMillis()); + + public SpinAdvancingClock(HashedBytes name) { + this.name = name; + } + + @Override + public HashedBytes getName() { + return name; + } + + @Override + public void get() { + refCount.incrementAndGet(); + } + + @Override + public boolean remove() { + return refCount.decrementAndGet() <= 0; + } + + @Override + public long currentTime() { + return lastTime.updateAndGet(x -> { + long now = System.currentTimeMillis(); + if (now > x) { + return update(now); + } + return x; + }); + } + + @Override + public long currentTimeAdvancing() throws InterruptedException { + long now; + while (true) { + now = System.currentTimeMillis(); + if (now > lastTime.get()) { + final long updateTime = now; + return lastTime.updateAndGet(x -> update(updateTime)); + } + spin(); + } + } + + // Broken out to inlinable method for subclassing and instrumentation. + protected void spin() throws InterruptedException { } + + // Broken out to inlinable method for subclassing and instrumentation. + protected long update(long now) { + return now; + } + + // Visible for testing + public int getRefCount() { + return refCount.get(); + } + +} diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/YieldAdvancingClock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/YieldAdvancingClock.java new file mode 100644 index 000000000000..95514606df23 --- /dev/null +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/YieldAdvancingClock.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.clock; + +import org.apache.hadoop.hbase.util.HashedBytes; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * YieldAdvancingClock implements a strategy for currentTimeAdvancing that yields the thread + * until the clock ticks over. + */ +@InterfaceAudience.Private +public class YieldAdvancingClock extends SpinAdvancingClock { + + public YieldAdvancingClock(HashedBytes name) { + super(name); + } + + // Broken out to inlinable method for subclassing and instrumentation. + @Override + protected void spin() throws InterruptedException { + Thread.sleep(1, 0); // black magic to yield on all platforms + super.spin(); + } + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/NonRepeatedEnvironmentEdge.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/NonRepeatedEnvironmentEdge.java index c5f320b5c215..eccd3a779cbb 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/NonRepeatedEnvironmentEdge.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/NonRepeatedEnvironmentEdge.java @@ -24,7 +24,7 @@ * An clock which will never return the same clock twice. */ @InterfaceAudience.Private -public class NonRepeatedEnvironmentEdge implements EnvironmentEdge { +public class NonRepeatedEnvironmentEdge extends BaseEnvironmentEdge { private final AtomicLong prevTime = new AtomicLong(0L); diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestEnvironmentEdgeManager.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestEnvironmentEdgeManager.java index 325727c081c5..54ff64ca06e5 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestEnvironmentEdgeManager.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestEnvironmentEdgeManager.java @@ -21,12 +21,17 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; + +import java.lang.reflect.Method; + import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.testclassification.MiscTests; import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.EnvironmentEdge.Clock; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -67,4 +72,26 @@ public void testCurrentTimeInMillis() { verify(mock).currentTime(); assertEquals(expectation, result); } + + @Test + public void testClockRegistration() throws Exception { + DefaultEnvironmentEdge edge = new DefaultEnvironmentEdge(); + HashedBytes key = new HashedBytes(Bytes.toBytes("key")); + Clock clock = edge.getClock(key); + assertNotNull("Clock instance was not created", clock); + Method method; + try { + method = clock.getClass().getMethod("getRefCount"); + } catch (NoSuchMethodException e) { + fail("Clock instance does not have a getRefCount method"); + return; + } + assertEquals("Clock does not have correct reference count after getClock()", + method.invoke(clock), 1); + boolean removed = edge.removeClock(clock); + assertTrue("Clock was not removed", removed); + assertEquals("Clock does not have correct reference count after removeClock()", + method.invoke(clock), 0); + } + } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TimeOffsetEnvironmentEdge.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TimeOffsetEnvironmentEdge.java index 3d1c32a32855..aea1baf0d7f8 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TimeOffsetEnvironmentEdge.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TimeOffsetEnvironmentEdge.java @@ -35,4 +35,14 @@ public void increment(long incr) { public long currentTime() { return System.currentTimeMillis() + offset; } + + @Override + public Clock getClock(HashedBytes name) { + return null; + } + + @Override + public boolean removeClock(Clock clock) { + return false; + } } diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedBoundedIncrementSpinAdvancingClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedBoundedIncrementSpinAdvancingClock.java new file mode 100644 index 000000000000..10ace55b0e69 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedBoundedIncrementSpinAdvancingClock.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.clock; + +import java.util.concurrent.atomic.LongAdder; + +import org.apache.hadoop.hbase.util.HashedBytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class InstrumentedBoundedIncrementSpinAdvancingClock + extends BoundedIncrementSpinAdvancingClock { + + static final Logger LOG = + LoggerFactory.getLogger(InstrumentedBoundedIncrementSpinAdvancingClock.class); + final LongAdder countOk = new LongAdder(); + final LongAdder countAdvance = new LongAdder(); + final LongAdder countSpins = new LongAdder(); + + public InstrumentedBoundedIncrementSpinAdvancingClock(HashedBytes name) { + super(name); + } + + @Override + protected long advance(long last) { + countAdvance.increment(); + return super.advance(last); + } + + @Override + protected long update(long now) { + countOk.increment(); + return super.update(now); + } + + @Override + protected void spin() throws InterruptedException { + countSpins.increment(); + super.spin(); + } + + @Override + public boolean remove() { + boolean result = super.remove(); + LOG.debug("{}: ok={}, advanced={}, spins={}", getName(), countOk.longValue(), + countAdvance.longValue(), countSpins.longValue()); + return result; + } + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedBoundedIncrementYieldAdvancingClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedBoundedIncrementYieldAdvancingClock.java new file mode 100644 index 000000000000..69cf6b0bdfb3 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedBoundedIncrementYieldAdvancingClock.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.clock; + +import java.util.concurrent.atomic.LongAdder; + +import org.apache.hadoop.hbase.util.HashedBytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class InstrumentedBoundedIncrementYieldAdvancingClock + extends BoundedIncrementYieldAdvancingClock { + + static final Logger LOG = + LoggerFactory.getLogger(InstrumentedBoundedIncrementYieldAdvancingClock.class); + final LongAdder countOk = new LongAdder(); + final LongAdder countAdvance = new LongAdder(); + final LongAdder countYields = new LongAdder(); + + public InstrumentedBoundedIncrementYieldAdvancingClock(HashedBytes name) { + super(name); + } + + @Override + protected long advance(long last) { + countAdvance.increment(); + return super.advance(last); + } + + @Override + protected long update(long now) { + countOk.increment(); + return super.update(now); + } + + @Override + protected void spin() throws InterruptedException { + countYields.increment(); + super.spin(); + } + + @Override + public boolean remove() { + boolean result = super.remove(); + LOG.debug("{}: ok={}, advanced={}, yields={}", getName(), countOk.longValue(), + countAdvance.longValue(), countYields.longValue()); + return result; + } + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedIncrementAdvancingClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedIncrementAdvancingClock.java new file mode 100644 index 000000000000..c33ef4a03773 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedIncrementAdvancingClock.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.clock; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + +import org.apache.hadoop.hbase.util.HashedBytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class InstrumentedIncrementAdvancingClock extends IncrementAdvancingClock { + + static final Logger LOG = LoggerFactory.getLogger(InstrumentedIncrementAdvancingClock.class); + final LongAdder countOk = new LongAdder(); + final LongAdder countAdvance = new LongAdder(); + final LongAdder currentAdvance = new LongAdder(); + final AtomicLong maxAdvance = new AtomicLong(); + + public InstrumentedIncrementAdvancingClock(HashedBytes name) { + super(name); + } + + @Override + protected long advance(long last) { + countAdvance.increment(); + currentAdvance.increment(); + return super.advance(last); + } + + @Override + protected long update(long now) { + countOk.increment(); + maxAdvance.updateAndGet(u -> Math.max(u, currentAdvance.longValue())); + currentAdvance.reset(); + return super.update(now); + } + + @Override + public boolean remove() { + boolean result = super.remove(); + LOG.debug("{}: ok={}, advanced={}, maxAdvance={}", getName(), countOk.longValue(), + countAdvance.longValue(), maxAdvance.get()); + return result; + } + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedSpinAdvancingClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedSpinAdvancingClock.java new file mode 100644 index 000000000000..6136909fe9c8 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedSpinAdvancingClock.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.clock; + +import java.util.concurrent.atomic.LongAdder; + +import org.apache.hadoop.hbase.util.HashedBytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class InstrumentedSpinAdvancingClock extends SpinAdvancingClock { + + static final Logger LOG = LoggerFactory.getLogger(InstrumentedSpinAdvancingClock.class); + final LongAdder countOk = new LongAdder(); + final LongAdder countSpins = new LongAdder(); + + public InstrumentedSpinAdvancingClock(HashedBytes name) { + super(name); + } + + @Override + protected void spin() throws InterruptedException { + countSpins.increment(); + super.spin(); + } + + @Override + protected long update(long now) { + countOk.increment(); + return super.update(now); + } + + @Override + public boolean remove() { + boolean result = super.remove(); + LOG.debug("{}: ok={}, spins={}", getName(), countOk.longValue(), countSpins.longValue()); + return result; + } + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedYieldAdvancingClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedYieldAdvancingClock.java new file mode 100644 index 000000000000..1c498a2a933d --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/InstrumentedYieldAdvancingClock.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.clock; + +import java.util.concurrent.atomic.LongAdder; + +import org.apache.hadoop.hbase.util.HashedBytes; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@InterfaceAudience.Private +public class InstrumentedYieldAdvancingClock extends YieldAdvancingClock { + + static final Logger LOG = LoggerFactory.getLogger(InstrumentedYieldAdvancingClock.class); + final LongAdder countOk = new LongAdder(); + final LongAdder countYields = new LongAdder(); + + public InstrumentedYieldAdvancingClock(HashedBytes name) { + super(name); + } + + @Override + protected void spin() throws InterruptedException { + countYields.increment(); + super.spin(); + } + + @Override + protected long update(long now) { + countOk.increment(); + return super.update(now); + } + + @Override + public boolean remove() { + boolean result = super.remove(); + LOG.debug("{}: ok={}, yields={}", getName(), countOk.longValue(), countYields.longValue()); + return result; + } + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestBoundedIncrementSpinAdvancingClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestBoundedIncrementSpinAdvancingClock.java new file mode 100644 index 000000000000..ec63cd812966 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestBoundedIncrementSpinAdvancingClock.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.clock; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge.Clock; +import org.apache.hadoop.hbase.util.HashedBytes; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({MiscTests.class, SmallTests.class}) +public class TestBoundedIncrementSpinAdvancingClock { + + final Logger LOG = LoggerFactory.getLogger(TestBoundedIncrementSpinAdvancingClock.class); + final HashedBytes KEY = new HashedBytes(Bytes.toBytes("key")); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBoundedIncrementSpinAdvancingClock.class); + + @Test + public void testAdvance() throws Exception { + Clock clock = new BoundedIncrementSpinAdvancingClock(KEY); + long last = clock.currentTime(); + for (int i = 0; i < 100; i++) { + long now = clock.currentTimeAdvancing(); + assertTrue("Time did not advance", now > last); + last = now; + } + } + + @Test + public void testAdvanceLimit() throws Exception { + InstrumentedBoundedIncrementSpinAdvancingClock clock = + new InstrumentedBoundedIncrementSpinAdvancingClock(KEY); + boolean advancedTooFar = false; + long last = clock.currentTime(); + for (int i = 0; i < BoundedIncrementSpinAdvancingClock.MAX_ADVANCE * 2; i++) { + long now = clock.currentTimeAdvancing(); + assertTrue("Did not advance", now > last); + last = now; + advancedTooFar |= + ((InstrumentedBoundedIncrementSpinAdvancingClock)clock).currentAdvance.get() > + BoundedIncrementYieldAdvancingClock.MAX_ADVANCE; + } + LOG.info("ok={}, advanced={}, spins={}", + ((InstrumentedBoundedIncrementSpinAdvancingClock)clock).countOk.longValue(), + ((InstrumentedBoundedIncrementSpinAdvancingClock)clock).countAdvance.longValue(), + ((InstrumentedBoundedIncrementSpinAdvancingClock)clock).countSpins.longValue()); + assertFalse("We advanced too far", advancedTooFar); + } + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestBoundedIncrementYieldAdvancingClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestBoundedIncrementYieldAdvancingClock.java new file mode 100644 index 000000000000..83c4d7c56d46 --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestBoundedIncrementYieldAdvancingClock.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.clock; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge.Clock; +import org.apache.hadoop.hbase.util.HashedBytes; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({MiscTests.class, SmallTests.class}) +public class TestBoundedIncrementYieldAdvancingClock { + + final Logger LOG = LoggerFactory.getLogger(TestBoundedIncrementYieldAdvancingClock.class); + final HashedBytes KEY = new HashedBytes(Bytes.toBytes("key")); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestBoundedIncrementYieldAdvancingClock.class); + + @Test + public void testAdvance() throws Exception { + Clock clock = new BoundedIncrementYieldAdvancingClock(KEY); + long last = clock.currentTime(); + for (int i = 0; i < 100; i++) { + long now = clock.currentTimeAdvancing(); + assertTrue("Time did not advance", now > last); + last = now; + } + } + + @Test + public void testAdvanceLimit() throws Exception { + InstrumentedBoundedIncrementYieldAdvancingClock clock = + new InstrumentedBoundedIncrementYieldAdvancingClock(KEY); + boolean advancedTooFar = false; + long last = clock.currentTime(); + for (int i = 0; i < BoundedIncrementYieldAdvancingClock.MAX_ADVANCE * 2; i++) { + long now = clock.currentTimeAdvancing(); + assertTrue("Did not advance", now > last); + last = now; + advancedTooFar |= + ((InstrumentedBoundedIncrementYieldAdvancingClock)clock).currentAdvance.get() > + BoundedIncrementYieldAdvancingClock.MAX_ADVANCE; + } + LOG.info("ok={}, advanced={}, yield={}", + ((InstrumentedBoundedIncrementYieldAdvancingClock)clock).countOk.longValue(), + ((InstrumentedBoundedIncrementYieldAdvancingClock)clock).countAdvance.longValue(), + ((InstrumentedBoundedIncrementYieldAdvancingClock)clock).countYields.longValue()); + assertFalse("We advanced too far", advancedTooFar); + } + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestIncrementAdvancingClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestIncrementAdvancingClock.java new file mode 100644 index 000000000000..a6fc160e15ed --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestIncrementAdvancingClock.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.clock; + +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge.Clock; +import org.apache.hadoop.hbase.util.HashedBytes; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category({MiscTests.class, SmallTests.class}) +public class TestIncrementAdvancingClock { + + final HashedBytes KEY = new HashedBytes(Bytes.toBytes("key")); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestIncrementAdvancingClock.class); + + @Test + public void testAdvance() throws Exception { + Clock clock = new IncrementAdvancingClock(KEY); + long last = clock.currentTime(); + for (int i = 0; i < 100; i++) { + long now = clock.currentTimeAdvancing(); + assertTrue("Time did not advance", now > last); + last = now; + } + } + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestSpinAdvancingClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestSpinAdvancingClock.java new file mode 100644 index 000000000000..17bb0876aadc --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestSpinAdvancingClock.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.clock; + +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge.Clock; +import org.apache.hadoop.hbase.util.HashedBytes; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({MiscTests.class, SmallTests.class}) +public class TestSpinAdvancingClock { + + final Logger LOG = LoggerFactory.getLogger(TestSpinAdvancingClock.class); + final HashedBytes KEY = new HashedBytes(Bytes.toBytes("key")); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestSpinAdvancingClock.class); + + @Test + public void testAdvance() throws Exception { + Clock clock = new SpinAdvancingClock(KEY); + long last = clock.currentTime(); + for (int i = 0; i < 100; i++) { + long now = clock.currentTimeAdvancing(); + assertTrue("Time did not advance", now > last); + last = now; + } + } + + @Test + public void testSpin() throws Exception { + InstrumentedSpinAdvancingClock clock = new InstrumentedSpinAdvancingClock(KEY); + long last = clock.currentTime(); + for (int i = 0; i < 100; i++) { + // We have to do currentTime immediately before currentTimeAdvancing so both + // are likely to fall within the same clock tick. + long now = clock.currentTimeAdvancing(); + assertTrue("Did not advance", now > last); + last = now; + } + LOG.info("ok={}, spins={}", + ((InstrumentedSpinAdvancingClock)clock).countOk.longValue(), + ((InstrumentedSpinAdvancingClock)clock).countSpins.longValue()); + assertTrue("Did not spin", + ((InstrumentedSpinAdvancingClock)clock).countSpins.longValue() > 0); + } + +} diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestYieldAdvancingClock.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestYieldAdvancingClock.java new file mode 100644 index 000000000000..71647080216e --- /dev/null +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/clock/TestYieldAdvancingClock.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.util.clock; + +import static org.junit.Assert.assertTrue; + +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.testclassification.MiscTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge.Clock; +import org.apache.hadoop.hbase.util.HashedBytes; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Category({MiscTests.class, SmallTests.class}) +public class TestYieldAdvancingClock { + + final Logger LOG = LoggerFactory.getLogger(TestYieldAdvancingClock.class); + final HashedBytes KEY = new HashedBytes(Bytes.toBytes("key")); + + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestYieldAdvancingClock.class); + + @Test + public void testAdvance() throws Exception { + Clock clock = new YieldAdvancingClock(KEY); + long last = clock.currentTime(); + for (int i = 0; i < 100; i++) { + long now = clock.currentTimeAdvancing(); + assertTrue("Time did not advance", now > last); + last = now; + } + } + + @Test + public void testYield() throws Exception { + InstrumentedYieldAdvancingClock clock = new InstrumentedYieldAdvancingClock(KEY); + long last = clock.currentTime(); + for (int i = 0; i < 100; i++) { + // We have to do currentTime immediately before currentTimeAdvancing so both + // are likely to fall within the same clock tick. + long now = clock.currentTimeAdvancing(); + assertTrue("Did not advance", now > last); + last = now; + } + LOG.info("ok={}, yields={}", + ((InstrumentedYieldAdvancingClock)clock).countOk.longValue(), + ((InstrumentedYieldAdvancingClock)clock).countYields.longValue()); + assertTrue("Did not yield", + ((InstrumentedYieldAdvancingClock)clock).countYields.longValue() > 0); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java index e5081273d472..353b3bf2c8b7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java @@ -25,10 +25,18 @@ * happen in the same millisecond. */ @InterfaceAudience.Private -public class ManualEnvironmentEdge implements EnvironmentEdge { +public class ManualEnvironmentEdge extends BaseEnvironmentEdge { - // Sometimes 0 ts might have a special value, so lets start with 1 - protected long value = 1L; + protected long value; + + public ManualEnvironmentEdge() { + // Sometimes 0 ts might have a special value, so lets start with 1 + this(1L); + } + + public ManualEnvironmentEdge(long value) { + this.value = value; + } public void setValue(long newValue) { value = newValue; @@ -42,4 +50,5 @@ public void incValue(long addedValue) { public long currentTime() { return this.value; } + } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index ce283701b47f..05b969174491 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -52,6 +52,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.Threads; import org.junit.Before; import org.junit.ClassRule; @@ -599,6 +600,17 @@ public long currentTime() { } return System.currentTimeMillis(); } + + @Override + public Clock getClock(HashedBytes name) { + return null; + } + + @Override + public boolean removeClock(Clock clock) { + return false; + } + } // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java index ca41c559f5a6..4e68a86ee7a2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestHFileCleaner.java @@ -44,6 +44,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.HFileArchiveUtil; +import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.MockServer; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.AfterClass; @@ -202,6 +203,14 @@ public void testHFileCleaning() throws Exception { public long currentTime() { return createTime; } + @Override + public Clock getClock(HashedBytes name) { + return null; + } + @Override + public boolean removeClock(Clock clock) { + return false; + } }; EnvironmentEdgeManager.injectEdge(setTime); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java index 41fcf923e6d6..185eae51a85f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/quotas/TestRateLimiter.java @@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.junit.ClassRule; import org.junit.Test; @@ -131,11 +132,18 @@ public void testOverconsumptionFixedIntervalRefillStrategy() throws InterruptedE // fix the current time in order to get the precise value of interval EnvironmentEdge edge = new EnvironmentEdge() { private final long ts = EnvironmentEdgeManager.currentTime(); - @Override public long currentTime() { return ts; } + @Override + public Clock getClock(HashedBytes name) { + return null; + } + @Override + public boolean removeClock(Clock clock) { + return false; + } }; EnvironmentEdgeManager.injectEdge(edge); // 10 resources are available, but we need to consume 20 resources diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index d0e03d7f01fe..3fa0a94f785d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -53,8 +53,8 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; +import org.apache.hadoop.hbase.util.BaseEnvironmentEdge; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; @@ -906,7 +906,7 @@ protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] va return totalLen; } - private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { + private class EnvironmentEdgeForMemstoreTest extends BaseEnvironmentEdge { long t = 1234; @Override diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index 4800786244c5..eb73e31c82f5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; +import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.AfterClass; import org.junit.Before; @@ -1018,6 +1019,14 @@ public long currentTime() { public void setCurrentTimeMillis(long t) { this.t = t; } + @Override + public Clock getClock(HashedBytes name) { + return null; + } + @Override + public boolean removeClock(Clock clock) { + return false; + } } /** diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java index d12342f64a0f..ec4e5c36179f 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java @@ -43,7 +43,6 @@ import org.apache.hadoop.hbase.ExtendedCellBuilderFactory; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HBaseTestingUtil; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeepDeletedCells; import org.apache.hadoop.hbase.KeyValue; @@ -61,6 +60,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.apache.hadoop.hbase.util.HashedBytes; import org.junit.ClassRule; import org.junit.Ignore; import org.junit.Rule; @@ -83,7 +83,7 @@ public class TestStoreScanner { private static final String CF_STR = "cf"; private static final byte[] CF = Bytes.toBytes(CF_STR); static Configuration CONF = HBaseConfiguration.create(); - private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); + private ScanInfo scanInfo = new ScanInfo(CONF, CF, 0, Integer.MAX_VALUE, Long.MAX_VALUE, KeepDeletedCells.FALSE, HConstants.DEFAULT_BLOCKSIZE, 0, CellComparator.getInstance(), false); @@ -979,6 +979,14 @@ public void testDeleteMarkerLongevity() throws Exception { public long currentTime() { return now; } + @Override + public Clock getClock(HashedBytes name) { + return null; + } + @Override + public boolean removeClock(Clock clock) { + return false; + } }); KeyValue[] kvs = new KeyValue[]{ /*0*/ new KeyValue(Bytes.toBytes("R1"), Bytes.toBytes("cf"), null, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCurrentHourProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCurrentHourProvider.java index 4a0e1d0fbcba..a7c69024ae9a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCurrentHourProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestCurrentHourProvider.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -59,7 +60,7 @@ public void testWithEnvironmentEdge() { // set a time represent hour 11 long deltaFor11 = TimeZone.getDefault().getRawOffset() - 28800000; long timeFor11 = 1597895561000L - deltaFor11; - EnvironmentEdgeManager.injectEdge(() -> timeFor11); + EnvironmentEdgeManager.injectEdge(new ManualEnvironmentEdge(timeFor11)); CurrentHourProvider.advanceTick(); int hour11 = CurrentHourProvider.getCurrentHour(); if (TimeZone.getDefault().inDaylightTime(new Date(timeFor11))) { @@ -70,7 +71,7 @@ public void testWithEnvironmentEdge() { // set a time represent hour 15 long deltaFor15 = TimeZone.getDefault().getRawOffset() - 28800000; long timeFor15 = 1597907081000L - deltaFor15; - EnvironmentEdgeManager.injectEdge(() -> timeFor15); + EnvironmentEdgeManager.injectEdge(new ManualEnvironmentEdge(timeFor15)); CurrentHourProvider.advanceTick(); int hour15 = CurrentHourProvider.getCurrentHour(); if (TimeZone.getDefault().inDaylightTime(new Date(timeFor15))) { From 71583a2c80e2ddf45a7d80fc0c991ab91d8c402b Mon Sep 17 00:00:00 2001 From: Andrew Purtell Date: Sun, 27 Mar 2022 11:33:07 -0700 Subject: [PATCH 2/2] HBASE-25913 Introduce EnvironmentEdge.Clock and Clock.currentTimeAdvancing - Use a per region Clock in HRegion to ensure the time advances - Update unit tests as needed --- .../hadoop/hbase/backup/TestBackupDelete.java | 28 +++--- .../hbase/util/BaseEnvironmentEdge.java | 3 +- .../hadoop/hbase/util/EnvironmentEdge.java | 8 +- .../apache/hadoop/hbase/util/HashedBytes.java | 13 ++- .../util/IncrementingEnvironmentEdge.java | 44 +++++++++ .../hbase/util/ManualEnvironmentEdge.java | 44 +++++++++ .../BoundedIncrementSpinAdvancingClock.java | 11 +-- .../BoundedIncrementYieldAdvancingClock.java | 13 ++- .../util/clock/IncrementAdvancingClock.java | 2 +- .../hbase/util/clock/SpinAdvancingClock.java | 2 +- .../hbase/util/TimeOffsetEnvironmentEdge.java | 13 +-- .../hadoop/hbase/regionserver/HRegion.java | 88 ++++++++++++------ .../hbase/ipc/TestSimpleRpcScheduler.java | 15 +-- .../regionserver/TestCompactingMemStore.java | 14 +-- .../regionserver/TestDefaultMemStore.java | 42 +++------ .../hbase/regionserver/TestHRegion.java | 92 +++++++++++-------- 16 files changed, 257 insertions(+), 175 deletions(-) rename {hbase-server => hbase-common}/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java (69%) diff --git a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDelete.java b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDelete.java index a56f013860d6..3d99f8e47546 100644 --- a/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDelete.java +++ b/hbase-backup/src/test/java/org/apache/hadoop/hbase/backup/TestBackupDelete.java @@ -28,9 +28,8 @@ import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.backup.impl.BackupSystemTable; import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.BaseEnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.util.ToolRunner; import org.junit.Assert; import org.junit.ClassRule; @@ -113,21 +112,7 @@ public void testBackupDeleteCommand() throws Exception { public void testBackupPurgeOldBackupsCommand() throws Exception { LOG.info("test backup delete (purge old backups) on a single table with data: command-line"); List tableList = Lists.newArrayList(table1); - EnvironmentEdgeManager.injectEdge(new EnvironmentEdge() { - // time - 2 days - @Override - public long currentTime() { - return System.currentTimeMillis() - 2 * 24 * 3600 * 1000 ; - } - @Override - public Clock getClock(HashedBytes name) { - return null; - } - @Override - public boolean removeClock(Clock clock) { - return false; - } - }); + EnvironmentEdgeManager.injectEdge(new TwoDaysBackEnvironmentEdge()); String backupId = fullTableBackup(tableList); assertTrue(checkSucceeded(backupId)); @@ -169,4 +154,13 @@ public boolean removeClock(Clock clock) { LOG.info(baos.toString()); assertTrue(output.indexOf("Deleted 1 backups") >= 0); } + + static class TwoDaysBackEnvironmentEdge extends BaseEnvironmentEdge { + // time - 2 days + @Override + public long currentTime() { + return System.currentTimeMillis() - 2 * 24 * 3600 * 1000 ; + } + } + } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BaseEnvironmentEdge.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BaseEnvironmentEdge.java index ea298840d183..07fd207b59b8 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BaseEnvironmentEdge.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/BaseEnvironmentEdge.java @@ -60,7 +60,8 @@ public long currentTime() { } @Override public long currentTimeAdvancing() { - throw new UnsupportedOperationException("Default clock does not implement currentTimeAdvancing()"); + throw new UnsupportedOperationException( + "Default clock does not implement currentTimeAdvancing()"); } @Override public void get() { diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdge.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdge.java index 6ce51ab9f98a..4fd59305a09d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdge.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/EnvironmentEdge.java @@ -41,13 +41,13 @@ public interface EnvironmentEdge { /** * Get the clock associated with the given identifier. * @param name clock identifier - * @returns the clock instance for the given identifier + * @return the clock instance for the given identifier */ Clock getClock(HashedBytes name); /** - * Remove the clock with the given identifier. - * @param clockId clock identifier + * Release the reference to and possible remove this clock. + * @param clock the clock * @return true if the clock was removed, false if it did not exist */ boolean removeClock(Clock clock); @@ -64,7 +64,6 @@ public interface Clock { /** * Returns the current time using a named clock. - * @param clockId clock identifier * @return The current time, according to the given named clock. */ long currentTime(); @@ -74,7 +73,6 @@ public interface Clock { * at least one tick before returning. *

* This method may block the current thread's execution or cause it to yield. - * @param clockId clock identifier * @return The current time, according to the given named clock. * @throws InterruptedException if interrupted while waiting for the clock to advance */ diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java index 774871b38263..fe740214dd68 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/HashedBytes.java @@ -36,7 +36,12 @@ public class HashedBytes { public HashedBytes(byte[] bytes) { this.bytes = bytes; - hashCode = Bytes.hashCode(bytes); + hashCode = Bytes.hashCode(this.bytes); + } + + public HashedBytes(byte[]... bytes) { + this.bytes = Bytes.add(bytes); + hashCode = Bytes.hashCode(this.bytes); } public byte[] getBytes() { @@ -50,10 +55,12 @@ public int hashCode() { @Override public boolean equals(Object obj) { - if (this == obj) + if (this == obj) { return true; - if (obj == null || getClass() != obj.getClass()) + } + if (obj == null || getClass() != obj.getClass()) { return false; + } HashedBytes other = (HashedBytes) obj; return (hashCode == other.hashCode) && Arrays.equals(bytes, other.bytes); } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IncrementingEnvironmentEdge.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IncrementingEnvironmentEdge.java index 06f4417821ba..0e1c3dfc1cce 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IncrementingEnvironmentEdge.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/IncrementingEnvironmentEdge.java @@ -26,6 +26,40 @@ @InterfaceAudience.Private public class IncrementingEnvironmentEdge extends BaseEnvironmentEdge { + class ManualIncrementingClock implements EnvironmentEdge.Clock { + + private HashedBytes name; + + public ManualIncrementingClock(HashedBytes name) { + this.name = name; + } + + @Override + public HashedBytes getName() { + return name; + } + + @Override + public long currentTime() { + return System.currentTimeMillis() + timeIncrement; + } + + @Override + public long currentTimeAdvancing() { + return System.currentTimeMillis() + timeIncrement; + } + + @Override + public void get() { + } + + @Override + public boolean remove() { + return true; + } + + } + private long timeIncrement; /** @@ -63,4 +97,14 @@ public synchronized long incrementTime(long amount) { return timeIncrement; } + @Override + public Clock getClock(HashedBytes name) { + return new ManualIncrementingClock(name); + } + + @Override + public boolean removeClock(Clock clock) { + return true; + } + } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java similarity index 69% rename from hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java rename to hbase-common/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java index 353b3bf2c8b7..6a85eb8b4413 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/ManualEnvironmentEdge.java @@ -27,6 +27,40 @@ @InterfaceAudience.Private public class ManualEnvironmentEdge extends BaseEnvironmentEdge { + class ManualFixedClock implements EnvironmentEdge.Clock { + + private HashedBytes name; + + public ManualFixedClock(HashedBytes name) { + this.name = name; + } + + @Override + public HashedBytes getName() { + return name; + } + + @Override + public long currentTime() { + return value; + } + + @Override + public long currentTimeAdvancing() { + return value; + } + + @Override + public void get() { + } + + @Override + public boolean remove() { + return true; + } + + } + protected long value; public ManualEnvironmentEdge() { @@ -51,4 +85,14 @@ public long currentTime() { return this.value; } + @Override + public Clock getClock(HashedBytes name) { + return new ManualFixedClock(name); + } + + @Override + public boolean removeClock(Clock clock) { + return true; + } + } diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementSpinAdvancingClock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementSpinAdvancingClock.java index cdfbbf31bdcd..7d8b6b1eef76 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementSpinAdvancingClock.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementSpinAdvancingClock.java @@ -50,12 +50,11 @@ public long currentTimeAdvancing() throws InterruptedException { // If we have advanced too far, now we have to wait for the system clock to // catch up. if (currentAdvance.incrementAndGet() >= MAX_ADVANCE) { - long now; while (true) { - now = System.currentTimeMillis(); + long now = System.currentTimeMillis(); if (now > lastTime.get()) { - final long updateTime = now; - return lastTime.updateAndGet(x -> update(updateTime)); + lastTime.set(update(now)); + return now; } spin(); } @@ -76,7 +75,7 @@ protected long advance(long last) { // System clock hasn't moved forward. Increment our notion of current tick to keep // the time advancing. currentAdvance.incrementAndGet(); - return super.advance(last); + return last + 1; } // Broken out to inlinable method for subclassing and instrumentation. @@ -84,7 +83,7 @@ protected long update(long now) { // System clock has moved ahead of our notion of current tick. Move us forward to match. // We do that just by returning the current time, which was passed in to us as 'n'. currentAdvance.set(0); - return super.update(now); + return now; } // Broken out to inlinable method for subclassing and instrumentation. diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementYieldAdvancingClock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementYieldAdvancingClock.java index 11fd2f595428..0bdff284837e 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementYieldAdvancingClock.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/BoundedIncrementYieldAdvancingClock.java @@ -32,7 +32,7 @@ @InterfaceAudience.Private public class BoundedIncrementYieldAdvancingClock extends IncrementAdvancingClock { - static final int MAX_ADVANCE = 1000; // one second in milliseconds + static final int MAX_ADVANCE = 1000; protected AtomicLong currentAdvance = new AtomicLong(0); @@ -50,12 +50,11 @@ public long currentTimeAdvancing() throws InterruptedException { // If we have advanced too far, now we have to wait for the system clock to // catch up. if (currentAdvance.incrementAndGet() >= MAX_ADVANCE) { - long now; while (true) { - now = System.currentTimeMillis(); + long now = System.currentTimeMillis(); if (now > lastTime.get()) { - final long updateTime = now; - return lastTime.updateAndGet(x -> update(updateTime)); + lastTime.set(update(now)); + return now; } spin(); } @@ -76,7 +75,7 @@ protected long advance(long last) { // System clock hasn't moved forward. Increment our notion of current tick to keep // the time advancing. currentAdvance.incrementAndGet(); - return super.advance(last); + return last + 1; } // Broken out to inlinable method for subclassing and instrumentation. @@ -84,7 +83,7 @@ protected long update(long now) { // System clock has moved ahead of our notion of current tick. Move us forward to match. // We do that just by returning the current time, which was passed in to us as 'n'. currentAdvance.set(0); - return super.update(now); + return now; } // Broken out to inlinable method for subclassing and instrumentation. diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/IncrementAdvancingClock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/IncrementAdvancingClock.java index b9f75558e3fb..adc97cdeee2d 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/IncrementAdvancingClock.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/IncrementAdvancingClock.java @@ -20,8 +20,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.EnvironmentEdge.Clock; +import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.yetus.audience.InterfaceAudience; /** diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/SpinAdvancingClock.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/SpinAdvancingClock.java index 24590ba4b0bf..d082a6664c6a 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/SpinAdvancingClock.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/clock/SpinAdvancingClock.java @@ -20,8 +20,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.EnvironmentEdge.Clock; +import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.yetus.audience.InterfaceAudience; /** diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TimeOffsetEnvironmentEdge.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TimeOffsetEnvironmentEdge.java index aea1baf0d7f8..644bee4d3336 100644 --- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TimeOffsetEnvironmentEdge.java +++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TimeOffsetEnvironmentEdge.java @@ -21,7 +21,7 @@ import org.apache.yetus.audience.InterfaceAudience; @InterfaceAudience.Private -public class TimeOffsetEnvironmentEdge implements EnvironmentEdge { +public class TimeOffsetEnvironmentEdge extends BaseEnvironmentEdge { private long offset; public TimeOffsetEnvironmentEdge() { @@ -33,16 +33,7 @@ public void increment(long incr) { @Override public long currentTime() { - return System.currentTimeMillis() + offset; + return super.currentTime() + offset; } - @Override - public Clock getClock(HashedBytes name) { - return null; - } - - @Override - public boolean removeClock(Clock clock) { - return false; - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 538264b87bf5..2770d2a8c07a 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -165,6 +165,7 @@ import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil; +import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.HashedBytes; @@ -709,6 +710,8 @@ void sawNoSuchFamily() { private final MultiVersionConcurrencyControl mvcc; + private final EnvironmentEdge.Clock clock; + // Coprocessor host private volatile RegionCoprocessorHost coprocessorHost; @@ -788,6 +791,8 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co this.wal = wal; this.fs = fs; this.mvcc = new MultiVersionConcurrencyControl(getRegionInfo().getShortNameToLog()); + this.clock = EnvironmentEdgeManager.getDelegate() + .getClock(new HashedBytes(getRegionInfo().getEncodedNameAsBytes())); // 'conf' renamed to 'confParam' b/c we use this.conf in the constructor this.baseConf = confParam; @@ -851,7 +856,7 @@ public HRegion(final HRegionFileSystem fs, final WAL wal, final Configuration co /* * timestamp.slop provides a server-side constraint on the timestamp. This - * assumes that you base your TS around EnvironmentEdgeManager.currentTime(). In this case, + * assumes that you base your TS is around the current time. In this case, * throw an error to the user if the user-specified TS is newer than now + * slop. LATEST_TIMESTAMP == don't use this functionality */ @@ -1055,7 +1060,7 @@ private long initializeRegionInternals(final CancelableProgressable reporter, // Initialize flush policy this.flushPolicy = FlushPolicyFactory.create(this, conf); - long lastFlushTime = EnvironmentEdgeManager.currentTime(); + long lastFlushTime = clock.currentTime(); for (HStore store: stores.values()) { this.lastStoreFlushTimeMap.put(store, lastFlushTime); } @@ -1641,6 +1646,7 @@ public Map> close(boolean abort, boolean ignoreStatus) return doClose(abort, status); } } finally { + EnvironmentEdgeManager.getDelegate().removeClock(clock); if (LOG.isDebugEnabled()) { LOG.debug("Region close journal for {}:\n{}", this.getRegionInfo().getEncodedName(), status.prettyPrintJournal()); @@ -1756,7 +1762,7 @@ private Map> doClose(boolean abort, MonitoredTask statu } boolean acquired = false; do { - long start = EnvironmentEdgeManager.currentTime(); + long start = clock.currentTime(); try { acquired = lock.writeLock().tryLock(Math.min(remainingWaitTime, closeWaitInterval), TimeUnit.MILLISECONDS); @@ -1769,7 +1775,7 @@ private Map> doClose(boolean abort, MonitoredTask statu LOG.warn(msg, e); throw (InterruptedIOException) new InterruptedIOException(msg).initCause(e); } - long elapsed = EnvironmentEdgeManager.currentTime() - start; + long elapsed = clock.currentTime() - start; elapsedWaitTime += elapsed; remainingWaitTime -= elapsed; if (canAbort && !acquired && remainingWaitTime > 0) { @@ -1801,9 +1807,9 @@ private Map> doClose(boolean abort, MonitoredTask statu } else { - long start = EnvironmentEdgeManager.currentTime(); + long start = clock.currentTime(); lock.writeLock().lock(); - elapsedWaitTime = EnvironmentEdgeManager.currentTime() - start; + elapsedWaitTime = clock.currentTime() - start; } @@ -1919,6 +1925,7 @@ public Pair> call() throws IOException { writeRegionCloseMarker(wal); } this.closed.set(true); + if (!canFlush) { decrMemStoreSize(this.memStoreSizing.getMemStoreSize()); } else if (this.memStoreSizing.getDataSize() != 0) { @@ -1990,7 +1997,7 @@ public boolean waitForFlushes(long timeout) { return true; } if (!writestate.flushing) return true; - long start = EnvironmentEdgeManager.currentTime(); + long start = clock.currentTime(); long duration = 0; boolean interrupted = false; LOG.debug("waiting for cache flush to complete for region " + this); @@ -2006,7 +2013,7 @@ public boolean waitForFlushes(long timeout) { interrupted = true; break; } finally { - duration = EnvironmentEdgeManager.currentTime() - start; + duration = clock.currentTime() - start; } } } finally { @@ -2666,7 +2673,7 @@ boolean shouldFlushStore(HStore store) { if (this.flushCheckInterval <= 0) { return false; } - long now = EnvironmentEdgeManager.currentTime(); + long now = clock.currentTime(); if (store.timeOfOldestEdit() < now - this.flushCheckInterval) { if (LOG.isDebugEnabled()) { LOG.debug("Flush column family: " + store.getColumnFamilyName() + " of " + @@ -2697,7 +2704,7 @@ boolean shouldFlush(final StringBuilder whyFlush) { if (modifiedFlushCheckInterval <= 0) { //disabled return false; } - long now = EnvironmentEdgeManager.currentTime(); + long now = clock.currentTime(); //if we flushed in the recent past, we don't need to do again now if ((now - getEarliestFlushTimeForAllStores() < modifiedFlushCheckInterval)) { return false; @@ -2771,7 +2778,7 @@ protected PrepareFlushResult internalPrepareFlushCache(WAL wal, long myseqid, // Don't flush when server aborting, it's unsafe throw new IOException("Aborting flush because server is aborted..."); } - final long startTime = EnvironmentEdgeManager.currentTime(); + final long startTime = clock.currentTime(); // If nothing to flush, return, but return with a valid unused sequenceId. // Its needed by bulk upload IIRC. It flushes until no edits in memory so it can insert a // bulk loaded file between memory and existing hfiles. It wants a good seqeunceId that belongs @@ -3123,7 +3130,7 @@ FlushResultImpl internalFlushCacheAndCommit(WAL wal, MonitoredTask status, notifyAll(); // FindBugs NN_NAKED_NOTIFY } - long time = EnvironmentEdgeManager.currentTime() - startTime; + long time = clock.currentTime() - startTime; MemStoreSize mss = prepareResult.totalFlushableSize.getMemStoreSize(); long memstoresize = this.memStoreSizing.getMemStoreSize().getDataSize(); String msg = "Finished flush of" @@ -3845,6 +3852,11 @@ public void checkAndPrepare() throws IOException { // index 0: puts, index 1: deletes, index 2: increments, index 3: append final int[] metrics = {0, 0, 0, 0}; + // TODO: Currently validation is done with system time before acquiring locks and + // updates are done with different timestamps after acquiring locks. This behavior is + // inherited from the code prior to this change. Should be fine, but this is due for + // a rewrite. + visitBatchOperations(true, this.size(), new Visitor() { private long now = EnvironmentEdgeManager.currentTime(); private WALEdit walEdit; @@ -3862,9 +3874,6 @@ public boolean visit(int index) throws IOException { } } if (isOperationPending(index)) { - // TODO: Currently validation is done with current time before acquiring locks and - // updates are done with different timestamps after acquiring locks. This behavior is - // inherited from the code prior to this change. Can this be changed? checkAndPrepareMutation(index, now); } return true; @@ -4514,6 +4523,10 @@ protected void checkAndPreparePut(Put p) throws IOException { @Override public void checkAndPrepare() throws IOException { + // TODO: Currently validation is done with system time before acquiring locks and + // updates are done with different timestamps after acquiring locks. This behavior is + // inherited from the code prior to this change. Should be fine, but this is due for + // a rewrite. long now = EnvironmentEdgeManager.currentTime(); visitBatchOperations(true, this.size(), (int index) -> { checkAndPrepareMutation(index, now); @@ -4699,7 +4712,18 @@ private void doMiniBatchMutate(BatchOperation batchOp) throws IOException { // otherwise, newer puts/deletes/increment/append are not guaranteed to have a newer // timestamp - long now = EnvironmentEdgeManager.currentTime(); + // Use getCurrentTimeAdvancing() to ensure our notion of current time has advanced + // since the last clock read. This enforces the HBASE-24440 invariant on any timestamp + // substitutions we might make. + + long now; + try { + now = clock.currentTimeAdvancing(); + } catch (InterruptedException e) { + // Should not happen, but handle in any case + throw (IOException) new InterruptedIOException().initCause(e); + } + batchOp.prepareMiniBatchOperations(miniBatchOp, now, acquiredRowLocks); // STEP 3. Build WAL edit @@ -4956,11 +4980,15 @@ private CheckAndMutateResult checkAndMutateInternal(CheckAndMutate checkAndMutat // If matches, perform the mutation or the rowMutations if (matches) { - // We have acquired the row lock already. If the system clock is NOT monotonically - // non-decreasing (see HBASE-14070) we should make sure that the mutation has a - // larger timestamp than what was observed via Get. doBatchMutate already does this, but - // there is no way to pass the cellTs. See HBASE-14054. - long now = EnvironmentEdgeManager.currentTime(); + // Use getCurrentTimeAdvancing() to ensure our notion of current time has advanced + // since the last clock read. This enforces the HBASE-24440 invariant on any timestamp + // substitutions we might make. + long now; + try { + now = clock.currentTimeAdvancing(); + } catch (InterruptedException e) { + throw (IOException) new InterruptedIOException().initCause(e); + } long ts = Math.max(now, cellTs); // ensure write is not eclipsed byte[] byteTs = Bytes.toBytes(ts); if (mutation != null) { @@ -5499,7 +5527,7 @@ private long replayRecoveredEdits(final Path edits, Map maxSeqIdIn int interval = this.conf.getInt("hbase.hstore.report.interval.edits", 2000); // How often to send a progress report (default 1/2 master timeout) int period = this.conf.getInt("hbase.hstore.report.period", 300000); - long lastReport = EnvironmentEdgeManager.currentTime(); + long lastReport = clock.currentTime(); if (coprocessorHost != null) { coprocessorHost.preReplayWALs(this.getRegionInfo(), edits); @@ -5518,7 +5546,7 @@ private long replayRecoveredEdits(final Path edits, Map maxSeqIdIn if (intervalEdits >= interval) { // Number of edits interval reached intervalEdits = 0; - long cur = EnvironmentEdgeManager.currentTime(); + long cur = clock.currentTime(); if (lastReport + period <= cur) { status.setStatus("Replaying edits..." + " skipped=" + skippedEdits + @@ -6049,7 +6077,7 @@ private void replayFlushInStores(FlushDescriptor flush, PrepareFlushResult prepa } List flushFiles = storeFlush.getFlushOutputList(); StoreFlushContext ctx = null; - long startTime = EnvironmentEdgeManager.currentTime(); + long startTime = clock.currentTime(); if (prepareFlushResult == null || prepareFlushResult.storeFlushCtxs == null) { ctx = store.createFlushContext(flush.getFlushSequenceNumber(), FlushLifeCycleTracker.DUMMY); } else { @@ -6275,7 +6303,7 @@ void replayWALRegionEventMarker(RegionEventDescriptor regionEvent) throws IOExce } if (store.getMaxSequenceId().orElse(0L) != storeSeqId) { // Record latest flush time if we picked up new files - lastStoreFlushTimeMap.put(store, EnvironmentEdgeManager.currentTime()); + lastStoreFlushTimeMap.put(store, clock.currentTime()); } if (writestate.flushing) { @@ -6919,7 +6947,7 @@ protected RowLock getRowLockInternal(byte[] row, boolean readLock, RowLock prevR if (call.isPresent()) { long deadline = call.get().getDeadline(); if (deadline < Long.MAX_VALUE) { - int timeToDeadline = (int) (deadline - EnvironmentEdgeManager.currentTime()); + int timeToDeadline = (int) (deadline - clock.currentTime()); if (timeToDeadline <= this.rowLockWaitDuration) { reachDeadlineFirst = true; timeout = timeToDeadline; @@ -6998,6 +7026,10 @@ class RowLockContext { this.row = row; } + public byte[] getRow() { + return this.row.getBytes(); + } + RowLockImpl newWriteLock() { Lock l = readWriteLock.writeLock(); return getRowLock(l); @@ -7850,7 +7882,7 @@ private List get(Get get, boolean withCoprocessor, long nonceGroup, long n private List getInternal(Get get, boolean withCoprocessor, long nonceGroup, long nonce) throws IOException { List results = new ArrayList<>(); - long before = EnvironmentEdgeManager.currentTime(); + long before = clock.currentTime(); // pre-get CP hook if (withCoprocessor && (coprocessorHost != null)) { @@ -7887,7 +7919,7 @@ private List getInternal(Get get, boolean withCoprocessor, long nonceGroup void metricsUpdateForGet(List results, long before) { if (this.metricsRegion != null) { - this.metricsRegion.updateGet(EnvironmentEdgeManager.currentTime() - before); + this.metricsRegion.updateGet(clock.currentTime() - before); } if (this.rsServices != null && this.rsServices.getMetrics() != null) { rsServices.getMetrics().updateReadQueryMeter(getRegionInfo().getTable(), 1); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 05b969174491..338e9aa7b800 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -49,10 +49,9 @@ import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.util.BaseEnvironmentEdge; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.Threads; import org.junit.Before; import org.junit.ClassRule; @@ -577,7 +576,7 @@ public void testSoftAndHardQueueLimits() throws Exception { } } - private static final class CoDelEnvironmentEdge implements EnvironmentEdge { + private static final class CoDelEnvironmentEdge extends BaseEnvironmentEdge { private final BlockingQueue timeQ = new LinkedBlockingQueue<>(); @@ -601,16 +600,6 @@ public long currentTime() { return System.currentTimeMillis(); } - @Override - public Clock getClock(HashedBytes name) { - return null; - } - - @Override - public boolean removeClock(Clock clock) { - return false; - } - } // FIX. I don't get this test (St.Ack). When I time this test, the minDelay is > 2 * codel delay diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java index 3fa0a94f785d..b09d0204f189 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactingMemStore.java @@ -53,9 +53,9 @@ import org.apache.hadoop.hbase.io.util.MemorySizeUtil; import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.BaseEnvironmentEdge; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.junit.After; @@ -355,7 +355,8 @@ public void testUpsertMemstoreSize() throws Exception { @Test public void testUpdateToTimeOfOldestEdit() throws Exception { try { - EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); + ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); + edge.setValue(1234); EnvironmentEdgeManager.injectEdge(edge); long t = memstore.timeOfOldestEdit(); assertEquals(Long.MAX_VALUE, t); @@ -906,15 +907,6 @@ protected int addRowsByKeys(final AbstractMemStore hmc, String[] keys, byte[] va return totalLen; } - private class EnvironmentEdgeForMemstoreTest extends BaseEnvironmentEdge { - long t = 1234; - - @Override - public long currentTime() { - return t; - } - } - static protected class MyCompactingMemStore extends CompactingMemStore { public MyCompactingMemStore(Configuration conf, CellComparator c, HStore store, diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java index eb73e31c82f5..b6c56427f8bf 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultMemStore.java @@ -57,10 +57,9 @@ import org.apache.hadoop.hbase.testclassification.MediumTests; import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSTableDescriptors; -import org.apache.hadoop.hbase.util.HashedBytes; +import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.hbase.wal.WALFactory; import org.junit.AfterClass; import org.junit.Before; @@ -884,7 +883,8 @@ public void testUpsertMemstoreSize() throws Exception { @Test public void testUpdateToTimeOfOldestEdit() throws Exception { try { - EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); + ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); + edge.setValue(1234); EnvironmentEdgeManager.injectEdge(edge); DefaultMemStore memstore = new DefaultMemStore(); long t = memstore.timeOfOldestEdit(); @@ -936,21 +936,20 @@ public void testShouldFlush() throws Exception { protected void checkShouldFlush(Configuration conf, boolean expected) throws Exception { try { - EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); + ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); + edge.setValue(1234); EnvironmentEdgeManager.injectEdge(edge); HBaseTestingUtil hbaseUtility = new HBaseTestingUtil(conf); String cf = "foo"; HRegion region = hbaseUtility.createTestRegion("foobar", ColumnFamilyDescriptorBuilder.of(cf)); - - edge.setCurrentTimeMillis(1234); Put p = new Put(Bytes.toBytes("r")); p.add(KeyValueTestUtil.create("r", cf, "q", 100, "v")); region.put(p); - edge.setCurrentTimeMillis(1234 + 100); + edge.setValue(1234 + 100); StringBuilder sb = new StringBuilder(); assertTrue(!region.shouldFlush(sb)); - edge.setCurrentTimeMillis(1234 + 10000); + edge.setValue(1234 + 10000); assertTrue(region.shouldFlush(sb) == expected); } finally { EnvironmentEdgeManager.reset(); @@ -966,9 +965,9 @@ public void testShouldFlushMeta() throws Exception { conf.setInt(HRegion.MEMSTORE_PERIODIC_FLUSH_INTERVAL, HRegion.SYSTEM_CACHE_FLUSH_INTERVAL * 10); HBaseTestingUtil hbaseUtility = new HBaseTestingUtil(conf); Path testDir = hbaseUtility.getDataTestDir(); - EnvironmentEdgeForMemstoreTest edge = new EnvironmentEdgeForMemstoreTest(); + ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); + edge.setValue(1234); EnvironmentEdgeManager.injectEdge(edge); - edge.setCurrentTimeMillis(1234); WALFactory wFactory = new WALFactory(conf, "1234"); TableDescriptors tds = new FSTableDescriptors(conf); FSTableDescriptors.tryUpdateMetaTableDescriptor(conf); @@ -983,10 +982,10 @@ public void testShouldFlushMeta() throws Exception { .setStartKey(Bytes.toBytes("row_0200")).setEndKey(Bytes.toBytes("row_0300")).build(); HRegion r = HRegion.createHRegion(hri, testDir, conf, desc, wFactory.getWAL(hri)); addRegionToMETA(meta, r); - edge.setCurrentTimeMillis(1234 + 100); + edge.setValue(1234 + 100); StringBuilder sb = new StringBuilder(); assertTrue(meta.shouldFlush(sb) == false); - edge.setCurrentTimeMillis(edge.currentTime() + HRegion.SYSTEM_CACHE_FLUSH_INTERVAL + 1); + edge.setValue(edge.currentTime() + HRegion.SYSTEM_CACHE_FLUSH_INTERVAL + 1); assertTrue(meta.shouldFlush(sb) == true); } @@ -1010,25 +1009,6 @@ private static void addRegionToMETA(final HRegion meta, final HRegion r) throws meta.put(new Put(row, HConstants.LATEST_TIMESTAMP, familyMap)); } - private class EnvironmentEdgeForMemstoreTest implements EnvironmentEdge { - long t = 1234; - @Override - public long currentTime() { - return t; - } - public void setCurrentTimeMillis(long t) { - this.t = t; - } - @Override - public Clock getClock(HashedBytes name) { - return null; - } - @Override - public boolean removeClock(Clock clock) { - return false; - } - } - /** * Adds {@link #ROW_COUNT} rows and {@link #QUALIFIER_COUNT} * @param hmc Instance to add rows to. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index c63fec84a08f..ca52784262ae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -155,9 +155,11 @@ import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; +import org.apache.hadoop.hbase.util.EnvironmentEdge.Clock; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; import org.apache.hadoop.hbase.util.HFileArchiveUtil; +import org.apache.hadoop.hbase.util.HashedBytes; import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge; import org.apache.hadoop.hbase.util.ManualEnvironmentEdge; import org.apache.hadoop.hbase.util.Threads; @@ -451,11 +453,14 @@ public void testFlushAndMemstoreSizeCounting() throws Exception { this.region = initHRegion(tableName, method, CONF, family); final WALFactory wals = new WALFactory(CONF, method); try { + Mutation[] mutations = new Mutation[HBaseTestingUtil.ROWS.length]; + int i = 0; for (byte[] row : HBaseTestingUtil.ROWS) { - Put put = new Put(row); - put.addColumn(family, family, row); - region.put(put); + Put p = new Put(row); + p.addColumn(family, family, row); + mutations[i++] = p; } + region.batchMutate(mutations); region.flush(true); // After flush, data size should be zero assertEquals(0, region.getMemStoreDataSize()); @@ -3533,7 +3538,7 @@ public void doTestDelete_AndPostInsert(Delete delete) throws IOException, Interr } @Test - public void testDelete_CheckTimestampUpdated() throws IOException { + public void testDelete_CheckTimestampUpdated() throws Exception { byte[] row1 = Bytes.toBytes("row1"); byte[] col1 = Bytes.toBytes("col1"); byte[] col2 = Bytes.toBytes("col2"); @@ -3553,9 +3558,13 @@ public void testDelete_CheckTimestampUpdated() throws IOException { deleteMap.put(fam1, kvs); region.delete(new Delete(forUnitTestsOnly, HConstants.LATEST_TIMESTAMP, deleteMap)); - // extract the key values out the memstore: + // Extract the key values out the memstore after ensuring the time has advanced + // for the test's comparisons to be valid. // This is kinda hacky, but better than nothing... - long now = EnvironmentEdgeManager.currentTime(); + Clock clock = EnvironmentEdgeManager.getDelegate() + .getClock(new HashedBytes(Bytes.toBytes("foo"))); + long now = clock.currentTimeAdvancing(); + EnvironmentEdgeManager.getDelegate().removeClock(clock); AbstractMemStore memstore = (AbstractMemStore) region.getStore(fam1).memstore; Cell firstCell = memstore.getActive().first(); assertTrue(firstCell.getTimestamp() <= now); @@ -6548,22 +6557,18 @@ public void testReverseScanner_StackOverflow() throws IOException { scan.setReversed(true); InternalScanner scanner = region.getScanner(scan); - // create one storefile contains many rows will be skipped - // to check StoreFileScanner.seekToPreviousRow - for (int i = 10000; i < 20000; i++) { - Put p = new Put(Bytes.toBytes(""+i)); - p.addColumn(cf1, col, Bytes.toBytes("" + i)); - region.put(p); - } - region.flushcache(true, true, FlushLifeCycleTracker.DUMMY); - // create one memstore contains many rows will be skipped // to check MemStoreScanner.seekToPreviousRow - for (int i = 10000; i < 20000; i++) { - Put p = new Put(Bytes.toBytes(""+i)); - p.addColumn(cf1, col, Bytes.toBytes("" + i)); - region.put(p); + Mutation[] mutations = new Mutation[10000]; + for (int i = 0; i < 10000; i++) { + Put p = new Put(Bytes.toBytes("" + (10000 + i))); + p.addColumn(cf1, col, Bytes.toBytes("" + (10000 + i))); + mutations[i] = p; } + region.batchMutate(mutations); + region.flushcache(true, true, FlushLifeCycleTracker.DUMMY); + + region.batchMutate(mutations); List currRow = new ArrayList<>(); boolean hasNext; @@ -6600,11 +6605,14 @@ public void testReverseScanShouldNotScanMemstoreIfReadPtLesser() throws Exceptio // create one memstore contains many rows will be skipped // to check MemStoreScanner.seekToPreviousRow - for (int i = 10000; i < 20000; i++) { - Put p = new Put(Bytes.toBytes("" + i)); - p.addColumn(cf1, col, Bytes.toBytes("" + i)); - region.put(p); + Mutation[] mutations = new Mutation[10000]; + for (int i = 0; i < 10000; i++) { + Put p = new Put(Bytes.toBytes("" + (10000+i))); + p.addColumn(cf1, col, Bytes.toBytes("" + (10000+i))); + mutations[i] = p; } + region.batchMutate(mutations); + List currRow = new ArrayList<>(); boolean hasNext; boolean assertDone = false; @@ -6649,11 +6657,14 @@ public void testReverseScanWhenPutCellsAfterOpenReverseScan() throws Exception { RegionScannerImpl scanner = region.getScanner(scan); // Put a lot of cells that have sequenceIDs grater than the readPt of the reverse scan - for (int i = 100000; i < 200000; i++) { - Put p = new Put(Bytes.toBytes("" + i)); - p.addColumn(cf1, col, Bytes.toBytes("" + i)); - region.put(p); + Mutation[] mutations = new Mutation[100000]; + for (int i = 0; i < 100000; i++) { + Put p = new Put(Bytes.toBytes("" + ( 100000 + i ))); + p.addColumn(cf1, col, Bytes.toBytes("" + ( 100000 + i ))); + mutations[i] = p; } + region.batchMutate(mutations); + List currRow = new ArrayList<>(); boolean hasNext; do { @@ -6930,7 +6941,8 @@ public void run() { @Test public void testCellTTLs() throws IOException { - IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(); + ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); + edge.setValue(1234); EnvironmentEdgeManager.injectEdge(edge); final byte[] row = Bytes.toBytes("testRow"); @@ -6979,7 +6991,7 @@ public void testCellTTLs() throws IOException { assertNotNull(r.getValue(fam1, q4)); // Increment time to T+5 seconds - edge.incrementTime(5000); + edge.setValue(edge.currentTime() + (5 * 1000) + 1); r = region.get(new Get(row)); assertNull(r.getValue(fam1, q1)); @@ -6988,7 +7000,7 @@ public void testCellTTLs() throws IOException { assertNotNull(r.getValue(fam1, q4)); // Increment time to T+10 seconds - edge.incrementTime(5000); + edge.setValue(edge.currentTime() + (5 * 1000) + 1); r = region.get(new Get(row)); assertNull(r.getValue(fam1, q1)); @@ -6997,7 +7009,7 @@ public void testCellTTLs() throws IOException { assertNotNull(r.getValue(fam1, q4)); // Increment time to T+15 seconds - edge.incrementTime(5000); + edge.setValue(edge.currentTime() + (5 * 1000) + 1); r = region.get(new Get(row)); assertNull(r.getValue(fam1, q1)); @@ -7006,7 +7018,7 @@ public void testCellTTLs() throws IOException { assertNotNull(r.getValue(fam1, q4)); // Increment time to T+20 seconds - edge.incrementTime(10000); + edge.setValue(edge.currentTime() + (5 * 1000) + 1); r = region.get(new Get(row)); assertNull(r.getValue(fam1, q1)); @@ -7023,9 +7035,9 @@ public void testCellTTLs() throws IOException { assertNotNull(val); assertEquals(1L, Bytes.toLong(val)); - // Increment with a TTL of 5 seconds + // Increment with a TTL of 4 seconds Increment incr = new Increment(row).addColumn(fam1, q1, 1L); - incr.setTTL(5000); + incr.setTTL(4000); region.increment(incr); // 2 // New value should be 2 @@ -7035,7 +7047,7 @@ public void testCellTTLs() throws IOException { assertEquals(2L, Bytes.toLong(val)); // Increment time to T+25 seconds - edge.incrementTime(5000); + edge.setValue(edge.currentTime() + (5 * 1000) + 1); // Value should be back to 1 r = region.get(new Get(row)); @@ -7044,7 +7056,7 @@ public void testCellTTLs() throws IOException { assertEquals(1L, Bytes.toLong(val)); // Increment time to T+30 seconds - edge.incrementTime(5000); + edge.setValue(edge.currentTime() + (5 * 1000) + 1); // Original value written at T+20 should be gone now via family TTL r = region.get(new Get(row)); @@ -7121,9 +7133,9 @@ private void checkScan(int expectCellSize) throws IOException{ @Test public void testIncrementTimestampsAreMonotonic() throws IOException { - region = initHRegion(tableName, method, CONF, fam1); ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(edge); + region = initHRegion(tableName, method, CONF, fam1); edge.setValue(10); Increment inc = new Increment(row); @@ -7146,9 +7158,9 @@ public void testIncrementTimestampsAreMonotonic() throws IOException { @Test public void testAppendTimestampsAreMonotonic() throws IOException { - region = initHRegion(tableName, method, CONF, fam1); ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(edge); + region = initHRegion(tableName, method, CONF, fam1); edge.setValue(10); Append a = new Append(row); @@ -7177,9 +7189,9 @@ public void testAppendTimestampsAreMonotonic() throws IOException { @Test public void testCheckAndMutateTimestampsAreMonotonic() throws IOException { - region = initHRegion(tableName, method, CONF, fam1); ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(edge); + region = initHRegion(tableName, method, CONF, fam1); edge.setValue(10); Put p = new Put(row); @@ -7381,9 +7393,9 @@ public Void call() throws Exception { @Test public void testCheckAndRowMutateTimestampsAreMonotonic() throws IOException { - region = initHRegion(tableName, method, CONF, fam1); ManualEnvironmentEdge edge = new ManualEnvironmentEdge(); EnvironmentEdgeManager.injectEdge(edge); + region = initHRegion(tableName, method, CONF, fam1); edge.setValue(10); Put p = new Put(row);