From 719db2133fb4ddeb2982a9c0c8218497f4c593c2 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 19 Dec 2017 16:07:31 +0100 Subject: [PATCH 1/2] Move uid lock into LiveVersionMap While the LiveVersionMap is an internal class that belongs to the engine we do rely on some external locking to enforce the desired semantics. Yet, in tests we mimic the outer locking but we don't have any way to enforce or assert on that the lock is actually hold. This change moves the KeyedLock inside the LiveVersionMap that allows the engine to access it as before but enables assertions in the LiveVersionMap to ensure the lock for the modifying or reading key is actually hold. --- .../index/engine/InternalEngine.java | 16 +-- .../index/engine/LiveVersionMap.java | 98 +++++++++---- .../index/engine/LiveVersionMapTests.java | 132 ++++++++++-------- 3 files changed, 153 insertions(+), 93 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index 91b33f5ec60fd..c9b087ce8801e 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -117,8 +117,6 @@ public class InternalEngine extends Engine { // we use the hashed variant since we iterate over it and check removal and additions on existing keys private final LiveVersionMap versionMap = new LiveVersionMap(); - private final KeyedLock keyedLock = new KeyedLock<>(); - private volatile SegmentInfos lastCommittedSegmentInfos; private final IndexThrottle throttle; @@ -551,7 +549,10 @@ public GetResult get(Get get, BiFunction search ensureOpen(); SearcherScope scope; if (get.realtime()) { - VersionValue versionValue = getVersionFromMap(get.uid().bytes()); + VersionValue versionValue = null; + try (Releasable ignore = acquireLock(get.uid())) { // we need to lock here to access the version map to do this truly in RT + versionValue = getVersionFromMap(get.uid().bytes()); + } if (versionValue != null) { if (versionValue.isDelete()) { return GetResult.NOT_EXISTS; @@ -1520,7 +1521,8 @@ private void pruneDeletedTombstones() { // we only need to prune the deletes map; the current/old version maps are cleared on refresh: for (Map.Entry entry : versionMap.getAllTombstones()) { BytesRef uid = entry.getKey(); - try (Releasable ignored = acquireLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set? + try (Releasable ignored = versionMap.acquireLock(uid)) { + // can we do it without this lock on each value? maybe batch to a set and get the lock once per set? // Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator: DeleteVersionValue versionValue = versionMap.getTombstoneUnderLock(uid); @@ -1767,12 +1769,8 @@ protected ReferenceManager getSearcherManager(String source, Sear } } - private Releasable acquireLock(BytesRef uid) { - return keyedLock.acquire(uid); - } - private Releasable acquireLock(Term uid) { - return acquireLock(uid.bytes()); + return versionMap.acquireLock(uid.bytes()); } private long loadCurrentVersionFromIndex(Term uid) throws IOException { diff --git a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index 5d58081b624dc..81e165c0e5225 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -23,7 +23,9 @@ import org.apache.lucene.util.Accountable; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.RamUsageEstimator; +import org.elasticsearch.common.lease.Releasable; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; +import org.elasticsearch.common.util.concurrent.KeyedLock; import java.io.IOException; import java.util.Collection; @@ -34,6 +36,8 @@ /** Maps _uid value to its version information. */ final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable { + private final KeyedLock keyedLock = new KeyedLock<>(); + /** * Resets the internal map and adjusts it's capacity as if there were no indexing operations. * This must be called under write lock in the engine @@ -49,7 +53,7 @@ void adjustMapSizeUnderLock() { private static final class VersionLookup { private static final VersionLookup EMPTY = new VersionLookup(Collections.emptyMap()); - private final Map map; + private final Map map; // each version map has a notion of safe / unsafe which allows us to apply certain optimization in the auto-generated ID usecase // where we know that documents can't have any duplicates so we can skip the version map entirely. This reduces @@ -144,22 +148,24 @@ Maps invalidateOldMap() { } // All deletes also go here, and delete "tombstones" are retained after refresh: - private final Map tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); + private final Map tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); private volatile Maps maps = new Maps(); // we maintain a second map that only receives the updates that we skip on the actual map (unsafe ops) // this map is only maintained if assertions are enabled private volatile Maps unsafeKeysMap = new Maps(); - /** Bytes consumed for each BytesRef UID: + /** + * Bytes consumed for each BytesRef UID: * In this base value, we account for the {@link BytesRef} object itself as * well as the header of the byte[] array it holds, and some lost bytes due * to object alignment. So consumers of this constant just have to add the * length of the byte[] (assuming it is not shared between multiple - * instances). */ + * instances). + */ private static final long BASE_BYTES_PER_BYTESREF = - // shallow memory usage of the BytesRef object - RamUsageEstimator.shallowSizeOfInstance(BytesRef.class) + + // shallow memory usage of the BytesRef object + RamUsageEstimator.shallowSizeOfInstance(BytesRef.class) + // header of the byte[] array RamUsageEstimator.NUM_BYTES_ARRAY_HEADER + // with an alignment size (-XX:ObjectAlignmentInBytes) of 8 (default), @@ -167,8 +173,11 @@ Maps invalidateOldMap() { // lost bytes on average 3; - /** Bytes used by having CHM point to a key/value. */ + /** + * Bytes used by having CHM point to a key/value. + */ private static final long BASE_BYTES_PER_CHM_ENTRY; + static { // use the same impl as the Maps does Map map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); @@ -181,11 +190,15 @@ Maps invalidateOldMap() { BASE_BYTES_PER_CHM_ENTRY = chmEntryShallowSize + 2 * RamUsageEstimator.NUM_BYTES_OBJECT_REF; } - /** Tracks bytes used by current map, i.e. what is freed on refresh. For deletes, which are also added to tombstones, we only account - * for the CHM entry here, and account for BytesRef/VersionValue against the tombstones, since refresh would not clear this RAM. */ + /** + * Tracks bytes used by current map, i.e. what is freed on refresh. For deletes, which are also added to tombstones, we only account + * for the CHM entry here, and account for BytesRef/VersionValue against the tombstones, since refresh would not clear this RAM. + */ final AtomicLong ramBytesUsedCurrent = new AtomicLong(); - /** Tracks bytes used by tombstones (deletes) */ + /** + * Tracks bytes used by tombstones (deletes) + */ final AtomicLong ramBytesUsedTombstones = new AtomicLong(); @Override @@ -215,12 +228,15 @@ public void afterRefresh(boolean didRefresh) throws IOException { } - /** Returns the live version (add or delete) for this uid. */ + /** + * Returns the live version (add or delete) for this uid. + */ VersionValue getUnderLock(final BytesRef uid) { return getUnderLock(uid, maps); } private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) { + assert keyedLock.isHeldByCurrentThread(uid); // First try to get the "live" value: VersionValue value = currentMaps.current.get(uid); if (value != null) { @@ -255,8 +271,11 @@ boolean isSafeAccessRequired() { return maps.isSafeAccessMode(); } - /** Adds this uid/version to the pending adds map iff the map needs safe access. */ + /** + * Adds this uid/version to the pending adds map iff the map needs safe access. + */ void maybePutUnderLock(BytesRef uid, VersionValue version) { + assert keyedLock.isHeldByCurrentThread(uid); Maps maps = this.maps; if (maps.isSafeAccessMode()) { putUnderLock(uid, version, maps); @@ -271,14 +290,19 @@ private boolean putAssertionMap(BytesRef uid, VersionValue version) { return true; } - /** Adds this uid/version to the pending adds map. */ + /** + * Adds this uid/version to the pending adds map. + */ void putUnderLock(BytesRef uid, VersionValue version) { Maps maps = this.maps; putUnderLock(uid, version, maps); } - /** Adds this uid/version to the pending adds map. */ + /** + * Adds this uid/version to the pending adds map. + */ private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) { + assert keyedLock.isHeldByCurrentThread(uid); assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length; long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; final VersionValue prev = maps.current.put(uid, version); @@ -301,7 +325,7 @@ private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) { final VersionValue prevTombstone; if (version.isDelete()) { // Also enroll the delete into tombstones, and account for its RAM too: - prevTombstone = tombstones.put(uid, (DeleteVersionValue)version); + prevTombstone = tombstones.put(uid, (DeleteVersionValue) version); // We initially account for BytesRef/VersionValue RAM for a delete against the tombstones, because this RAM will not be freed up // on refresh. Later, in removeTombstoneUnderLock, if we clear the tombstone entry but the delete remains in current, we shift @@ -321,20 +345,22 @@ private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) { // Deduct tombstones bytes used for the version we just removed or replaced: if (prevTombstone != null) { long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed)); - assert v >= 0: "bytes=" + v; + assert v >= 0 : "bytes=" + v; } } - /** Removes this uid from the pending deletes map. */ + /** + * Removes this uid from the pending deletes map. + */ void removeTombstoneUnderLock(BytesRef uid) { - + assert keyedLock.isHeldByCurrentThread(uid); long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length; final VersionValue prev = tombstones.remove(uid); if (prev != null) { assert prev.isDelete(); long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed)); - assert v >= 0: "bytes=" + v; + assert v >= 0 : "bytes=" + v; } final VersionValue curVersion = maps.current.get(uid); if (curVersion != null && curVersion.isDelete()) { @@ -345,22 +371,31 @@ void removeTombstoneUnderLock(BytesRef uid) { } } - /** Caller has a lock, so that this uid will not be concurrently added/deleted by another thread. */ + /** + * Caller has a lock, so that this uid will not be concurrently added/deleted by another thread. + */ DeleteVersionValue getTombstoneUnderLock(BytesRef uid) { + assert keyedLock.isHeldByCurrentThread(uid); return tombstones.get(uid); } - /** Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd). */ + /** + * Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd). + */ Iterable> getAllTombstones() { return tombstones.entrySet(); } - /** clears all tombstones ops */ + /** + * clears all tombstones ops + */ void clearTombstones() { tombstones.clear(); } - /** Called when this index is closed. */ + /** + * Called when this index is closed. + */ synchronized void clear() { maps = new Maps(); tombstones.clear(); @@ -377,8 +412,10 @@ public long ramBytesUsed() { return ramBytesUsedCurrent.get() + ramBytesUsedTombstones.get(); } - /** Returns how much RAM would be freed up by refreshing. This is {@link #ramBytesUsed} except does not include tombstones because they - * don't clear on refresh. */ + /** + * Returns how much RAM would be freed up by refreshing. This is {@link #ramBytesUsed} except does not include tombstones because they + * don't clear on refresh. + */ long ramBytesUsedForRefresh() { return ramBytesUsedCurrent.get(); } @@ -389,7 +426,14 @@ public Collection getChildResources() { return Collections.emptyList(); } - /** Returns the current internal versions as a point in time snapshot*/ + /** + * Returns the current internal versions as a point in time snapshot + */ Map getAllCurrent() { return maps.current.map; - }} + } + + Releasable acquireLock(BytesRef uid) { + return keyedLock.acquire(uid); + } +} diff --git a/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java b/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java index f3613f72cd648..fd402f2437d98 100644 --- a/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java +++ b/core/src/test/java/org/elasticsearch/index/engine/LiveVersionMapTests.java @@ -47,7 +47,9 @@ public void testRamBytesUsed() throws Exception { BytesRefBuilder uid = new BytesRefBuilder(); uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20)); VersionValue version = new VersionValue(randomLong(), randomLong(), randomLong()); - map.putUnderLock(uid.toBytesRef(), version); + try (Releasable r = map.acquireLock(uid.toBytesRef())) { + map.putUnderLock(uid.toBytesRef(), version); + } } long actualRamBytesUsed = RamUsageTester.sizeOf(map); long estimatedRamBytesUsed = map.ramBytesUsed(); @@ -62,7 +64,9 @@ public void testRamBytesUsed() throws Exception { BytesRefBuilder uid = new BytesRefBuilder(); uid.copyChars(TestUtil.randomSimpleString(random(), 10, 20)); VersionValue version = new VersionValue(randomLong(), randomLong(), randomLong()); - map.putUnderLock(uid.toBytesRef(), version); + try (Releasable r = map.acquireLock(uid.toBytesRef())) { + map.putUnderLock(uid.toBytesRef(), version); + } } actualRamBytesUsed = RamUsageTester.sizeOf(map); estimatedRamBytesUsed = map.ramBytesUsed(); @@ -79,33 +83,37 @@ private BytesRef uid(String string) { public void testBasics() throws IOException { LiveVersionMap map = new LiveVersionMap(); - map.putUnderLock(uid("test"), new VersionValue(1,1,1)); - assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test"))); - map.beforeRefresh(); - assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test"))); - map.afterRefresh(randomBoolean()); - assertNull(map.getUnderLock(uid("test"))); - - - map.putUnderLock(uid("test"), new DeleteVersionValue(1,1,1, Long.MAX_VALUE)); - assertEquals(new DeleteVersionValue(1,1,1, Long.MAX_VALUE), map.getUnderLock(uid("test"))); - map.beforeRefresh(); - assertEquals(new DeleteVersionValue(1,1,1, Long.MAX_VALUE), map.getUnderLock(uid("test"))); - map.afterRefresh(randomBoolean()); - assertEquals(new DeleteVersionValue(1,1,1, Long.MAX_VALUE), map.getUnderLock(uid("test"))); - map.removeTombstoneUnderLock(uid("test")); - assertNull(map.getUnderLock(uid("test"))); + try (Releasable r = map.acquireLock(uid("test"))) { + map.putUnderLock(uid("test"), new VersionValue(1, 1, 1)); + assertEquals(new VersionValue(1, 1, 1), map.getUnderLock(uid("test"))); + map.beforeRefresh(); + assertEquals(new VersionValue(1, 1, 1), map.getUnderLock(uid("test"))); + map.afterRefresh(randomBoolean()); + assertNull(map.getUnderLock(uid("test"))); + map.putUnderLock(uid("test"), new DeleteVersionValue(1, 1, 1, Long.MAX_VALUE)); + assertEquals(new DeleteVersionValue(1, 1, 1, Long.MAX_VALUE), map.getUnderLock(uid("test"))); + map.beforeRefresh(); + assertEquals(new DeleteVersionValue(1, 1, 1, Long.MAX_VALUE), map.getUnderLock(uid("test"))); + map.afterRefresh(randomBoolean()); + assertEquals(new DeleteVersionValue(1, 1, 1, Long.MAX_VALUE), map.getUnderLock(uid("test"))); + map.removeTombstoneUnderLock(uid("test")); + assertNull(map.getUnderLock(uid("test"))); + } } public void testAdjustMapSizeUnderLock() throws IOException { LiveVersionMap map = new LiveVersionMap(); - map.putUnderLock(uid("test"), new VersionValue(1,1,1)); + try (Releasable r = map.acquireLock(uid("test"))) { + map.putUnderLock(uid("test"), new VersionValue(1, 1, 1)); + } boolean withinRefresh = randomBoolean(); if (withinRefresh) { map.beforeRefresh(); } - assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test"))); + try (Releasable r = map.acquireLock(uid("test"))) { + assertEquals(new VersionValue(1, 1, 1), map.getUnderLock(uid("test"))); + } final String msg; if (Assertions.ENABLED) { msg = expectThrows(AssertionError.class, map::adjustMapSizeUnderLock).getMessage(); @@ -113,7 +121,9 @@ public void testAdjustMapSizeUnderLock() throws IOException { msg = expectThrows(IllegalStateException.class, map::adjustMapSizeUnderLock).getMessage(); } assertEquals("map must be empty", msg); - assertEquals(new VersionValue(1,1,1), map.getUnderLock(uid("test"))); + try (Releasable r = map.acquireLock(uid("test"))) { + assertEquals(new VersionValue(1, 1, 1), map.getUnderLock(uid("test"))); + } if (withinRefresh == false) { map.beforeRefresh(); } @@ -131,7 +141,6 @@ public void testConcurrently() throws IOException, InterruptedException { } List keyList = new ArrayList<>(keySet); ConcurrentHashMap values = new ConcurrentHashMap<>(); - KeyedLock keyedLock = new KeyedLock<>(); LiveVersionMap map = new LiveVersionMap(); int numThreads = randomIntBetween(2, 5); @@ -151,7 +160,7 @@ public void testConcurrently() throws IOException, InterruptedException { try { for (int i = 0; i < randomValuesPerThread; ++i) { BytesRef bytesRef = randomFrom(random(), keyList); - try (Releasable r = keyedLock.acquire(bytesRef)) { + try (Releasable r = map.acquireLock(bytesRef)) { VersionValue versionValue = values.computeIfAbsent(bytesRef, v -> new VersionValue(randomLong(), randomLong(), randomLong())); boolean isDelete = versionValue instanceof DeleteVersionValue; @@ -180,20 +189,24 @@ public void testConcurrently() throws IOException, InterruptedException { Map valueMap = new HashMap<>(map.getAllCurrent()); map.beforeRefresh(); valueMap.forEach((k, v) -> { - VersionValue actualValue = map.getUnderLock(k); - assertNotNull(actualValue); - assertTrue(v.version <= actualValue.version); + try (Releasable r = map.acquireLock(k)) { + VersionValue actualValue = map.getUnderLock(k); + assertNotNull(actualValue); + assertTrue(v.version <= actualValue.version); + } }); map.afterRefresh(randomBoolean()); valueMap.forEach((k, v) -> { - VersionValue actualValue = map.getUnderLock(k); - if (actualValue != null) { - if (actualValue instanceof DeleteVersionValue) { - assertTrue(v.version <= actualValue.version); // deletes can be the same version - } else { - assertTrue(v.version < actualValue.version); - } + try (Releasable r = map.acquireLock(k)) { + VersionValue actualValue = map.getUnderLock(k); + if (actualValue != null) { + if (actualValue instanceof DeleteVersionValue) { + assertTrue(v.version <= actualValue.version); // deletes can be the same version + } else { + assertTrue(v.version < actualValue.version); + } + } } }); if (randomBoolean()) { @@ -232,7 +245,9 @@ public void testCarryOnSafeAccess() throws IOException { assertTrue("failed in iter: " + i, map.isSafeAccessRequired()); } - map.maybePutUnderLock(new BytesRef(""), new VersionValue(randomLong(), randomLong(), randomLong())); + try (Releasable r = map.acquireLock(uid(""))) { + map.maybePutUnderLock(new BytesRef(""), new VersionValue(randomLong(), randomLong(), randomLong())); + } assertFalse(map.isUnsafe()); assertEquals(1, map.getAllCurrent().size()); @@ -240,8 +255,9 @@ public void testCarryOnSafeAccess() throws IOException { map.afterRefresh(randomBoolean()); assertFalse(map.isUnsafe()); assertFalse(map.isSafeAccessRequired()); - - map.maybePutUnderLock(new BytesRef(""), new VersionValue(randomLong(), randomLong(), randomLong())); + try (Releasable r = map.acquireLock(uid(""))) { + map.maybePutUnderLock(new BytesRef(""), new VersionValue(randomLong(), randomLong(), randomLong())); + } assertTrue(map.isUnsafe()); assertFalse(map.isSafeAccessRequired()); assertEquals(0, map.getAllCurrent().size()); @@ -249,27 +265,29 @@ public void testCarryOnSafeAccess() throws IOException { public void testRefreshTransition() throws IOException { LiveVersionMap map = new LiveVersionMap(); - map.maybePutUnderLock(uid("1"), new VersionValue(randomLong(), randomLong(), randomLong())); - assertTrue(map.isUnsafe()); - assertNull(map.getUnderLock(uid("1"))); - map.beforeRefresh(); - assertTrue(map.isUnsafe()); - assertNull(map.getUnderLock(uid("1"))); - map.afterRefresh(randomBoolean()); - assertNull(map.getUnderLock(uid("1"))); - assertFalse(map.isUnsafe()); + try (Releasable r = map.acquireLock(uid("1"))) { + map.maybePutUnderLock(uid("1"), new VersionValue(randomLong(), randomLong(), randomLong())); + assertTrue(map.isUnsafe()); + assertNull(map.getUnderLock(uid("1"))); + map.beforeRefresh(); + assertTrue(map.isUnsafe()); + assertNull(map.getUnderLock(uid("1"))); + map.afterRefresh(randomBoolean()); + assertNull(map.getUnderLock(uid("1"))); + assertFalse(map.isUnsafe()); - map.enforceSafeAccess(); - map.maybePutUnderLock(uid("1"), new VersionValue(randomLong(), randomLong(), randomLong())); - assertFalse(map.isUnsafe()); - assertNotNull(map.getUnderLock(uid("1"))); - map.beforeRefresh(); - assertFalse(map.isUnsafe()); - assertTrue(map.isSafeAccessRequired()); - assertNotNull(map.getUnderLock(uid("1"))); - map.afterRefresh(randomBoolean()); - assertNull(map.getUnderLock(uid("1"))); - assertFalse(map.isUnsafe()); - assertTrue(map.isSafeAccessRequired()); + map.enforceSafeAccess(); + map.maybePutUnderLock(uid("1"), new VersionValue(randomLong(), randomLong(), randomLong())); + assertFalse(map.isUnsafe()); + assertNotNull(map.getUnderLock(uid("1"))); + map.beforeRefresh(); + assertFalse(map.isUnsafe()); + assertTrue(map.isSafeAccessRequired()); + assertNotNull(map.getUnderLock(uid("1"))); + map.afterRefresh(randomBoolean()); + assertNull(map.getUnderLock(uid("1"))); + assertFalse(map.isUnsafe()); + assertTrue(map.isSafeAccessRequired()); + } } } From 599e73e8feb4190b6222ceb4c776f52017ba40a3 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Tue, 19 Dec 2017 16:32:41 +0100 Subject: [PATCH 2/2] apply review comments --- .../elasticsearch/index/engine/InternalEngine.java | 11 ++++------- .../elasticsearch/index/engine/LiveVersionMap.java | 6 ++++++ 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java index c9b087ce8801e..470b0e1108164 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java @@ -550,7 +550,8 @@ public GetResult get(Get get, BiFunction search SearcherScope scope; if (get.realtime()) { VersionValue versionValue = null; - try (Releasable ignore = acquireLock(get.uid())) { // we need to lock here to access the version map to do this truly in RT + try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) { + // we need to lock here to access the version map to do this truly in RT versionValue = getVersionFromMap(get.uid().bytes()); } if (versionValue != null) { @@ -734,7 +735,7 @@ public IndexResult index(Index index) throws IOException { ensureOpen(); assert assertIncomingSequenceNumber(index.origin(), index.seqNo()); assert assertVersionType(index); - try (Releasable ignored = acquireLock(index.uid()); + try (Releasable ignored = versionMap.acquireLock(index.uid().bytes()); Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) { lastWriteNanos = index.startTime(); /* A NOTE ABOUT APPEND ONLY OPTIMIZATIONS: @@ -1060,7 +1061,7 @@ public DeleteResult delete(Delete delete) throws IOException { assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo()); final DeleteResult deleteResult; // NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments: - try (ReleasableLock ignored = readLock.acquire(); Releasable ignored2 = acquireLock(delete.uid())) { + try (ReleasableLock ignored = readLock.acquire(); Releasable ignored2 = versionMap.acquireLock(delete.uid().bytes())) { ensureOpen(); lastWriteNanos = delete.startTime(); final DeletionStrategy plan; @@ -1769,10 +1770,6 @@ protected ReferenceManager getSearcherManager(String source, Sear } } - private Releasable acquireLock(Term uid) { - return versionMap.acquireLock(uid.bytes()); - } - private long loadCurrentVersionFromIndex(Term uid) throws IOException { assert incrementIndexVersionLookup(); try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) { diff --git a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java index 81e165c0e5225..fa4131eac0a80 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java +++ b/core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java @@ -433,6 +433,12 @@ Map getAllCurrent() { return maps.current.map; } + /** + * Acquires a releaseable lock for the given uId. All *UnderLock methods require + * this lock to be hold by the caller otherwise the visibility guarantees of this version + * map are broken. We assert on this lock to be hold when calling these methods. + * @see KeyedLock + */ Releasable acquireLock(BytesRef uid) { return keyedLock.acquire(uid); }