Skip to content

Commit e697283

Browse files
committed
Move uid lock into LiveVersionMap (#27905)
While the LiveVersionMap is an internal class that belongs to the engine, we rely on some external locking to enforce the desired semantics. Yet in tests we only mimic the outer locking and have no way to enforce or assert that the lock is actually held. This change moves the KeyedLock inside the LiveVersionMap, which allows the engine to access it as before but enables assertions in the LiveVersionMap to ensure that the lock for the key being modified or read is actually held.
1 parent cba80f3 commit e697283

File tree

3 files changed

+161
-98
lines changed

3 files changed

+161
-98
lines changed

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -117,8 +117,6 @@ public class InternalEngine extends Engine {
117117
// we use the hashed variant since we iterate over it and check removal and additions on existing keys
118118
private final LiveVersionMap versionMap = new LiveVersionMap();
119119

120-
private final KeyedLock<BytesRef> keyedLock = new KeyedLock<>();
121-
122120
private volatile SegmentInfos lastCommittedSegmentInfos;
123121

124122
private final IndexThrottle throttle;
@@ -577,7 +575,11 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search
577575
ensureOpen();
578576
SearcherScope scope;
579577
if (get.realtime()) {
580-
VersionValue versionValue = getVersionFromMap(get.uid().bytes());
578+
VersionValue versionValue = null;
579+
try (Releasable ignore = versionMap.acquireLock(get.uid().bytes())) {
580+
// we need to lock here to access the version map to do this truly in RT
581+
versionValue = getVersionFromMap(get.uid().bytes());
582+
}
581583
if (versionValue != null) {
582584
if (versionValue.isDelete()) {
583585
return GetResult.NOT_EXISTS;
@@ -785,7 +787,7 @@ public IndexResult index(Index index) throws IOException {
785787
ensureOpen();
786788
assert assertIncomingSequenceNumber(index.origin(), index.seqNo());
787789
assert assertVersionType(index);
788-
try (Releasable ignored = acquireLock(index.uid());
790+
try (Releasable ignored = versionMap.acquireLock(index.uid().bytes());
789791
Releasable indexThrottle = doThrottle ? () -> {} : throttle.acquireThrottle()) {
790792
lastWriteNanos = index.startTime();
791793
/* A NOTE ABOUT APPEND ONLY OPTIMIZATIONS:
@@ -1117,7 +1119,7 @@ public DeleteResult delete(Delete delete) throws IOException {
11171119
assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo());
11181120
final DeleteResult deleteResult;
11191121
// NOTE: we don't throttle this when merges fall behind because delete-by-id does not create new segments:
1120-
try (ReleasableLock ignored = readLock.acquire(); Releasable ignored2 = acquireLock(delete.uid())) {
1122+
try (ReleasableLock ignored = readLock.acquire(); Releasable ignored2 = versionMap.acquireLock(delete.uid().bytes())) {
11211123
ensureOpen();
11221124
lastWriteNanos = delete.startTime();
11231125
final DeletionStrategy plan;
@@ -1582,7 +1584,8 @@ private void pruneDeletedTombstones() {
15821584
// we only need to prune the deletes map; the current/old version maps are cleared on refresh:
15831585
for (Map.Entry<BytesRef, DeleteVersionValue> entry : versionMap.getAllTombstones()) {
15841586
BytesRef uid = entry.getKey();
1585-
try (Releasable ignored = acquireLock(uid)) { // can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
1587+
try (Releasable ignored = versionMap.acquireLock(uid)) {
1588+
// can we do it without this lock on each value? maybe batch to a set and get the lock once per set?
15861589

15871590
// Must re-get it here, vs using entry.getValue(), in case the uid was indexed/deleted since we pulled the iterator:
15881591
DeleteVersionValue versionValue = versionMap.getTombstoneUnderLock(uid);
@@ -1829,14 +1832,6 @@ protected ReferenceManager<IndexSearcher> getSearcherManager(String source, Sear
18291832
}
18301833
}
18311834

1832-
private Releasable acquireLock(BytesRef uid) {
1833-
return keyedLock.acquire(uid);
1834-
}
1835-
1836-
private Releasable acquireLock(Term uid) {
1837-
return acquireLock(uid.bytes());
1838-
}
1839-
18401835
private long loadCurrentVersionFromIndex(Term uid) throws IOException {
18411836
assert incrementIndexVersionLookup();
18421837
try (Searcher searcher = acquireSearcher("load_version", SearcherScope.INTERNAL)) {

core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java

Lines changed: 77 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
import org.apache.lucene.util.Accountable;
2424
import org.apache.lucene.util.BytesRef;
2525
import org.apache.lucene.util.RamUsageEstimator;
26+
import org.elasticsearch.common.lease.Releasable;
2627
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
28+
import org.elasticsearch.common.util.concurrent.KeyedLock;
2729

2830
import java.io.IOException;
2931
import java.util.Collection;
@@ -34,6 +36,8 @@
3436
/** Maps _uid value to its version information. */
3537
final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
3638

39+
private final KeyedLock<BytesRef> keyedLock = new KeyedLock<>();
40+
3741
/**
3842
* Resets the internal map and adjusts its capacity as if there were no indexing operations.
3943
* This must be called under write lock in the engine
@@ -49,7 +53,7 @@ void adjustMapSizeUnderLock() {
4953
private static final class VersionLookup {
5054

5155
private static final VersionLookup EMPTY = new VersionLookup(Collections.emptyMap());
52-
private final Map<BytesRef,VersionValue> map;
56+
private final Map<BytesRef, VersionValue> map;
5357

5458
// each version map has a notion of safe / unsafe which allows us to apply certain optimization in the auto-generated ID usecase
5559
// where we know that documents can't have any duplicates so we can skip the version map entirely. This reduces
@@ -144,31 +148,36 @@ Maps invalidateOldMap() {
144148
}
145149

146150
// All deletes also go here, and delete "tombstones" are retained after refresh:
147-
private final Map<BytesRef,DeleteVersionValue> tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
151+
private final Map<BytesRef, DeleteVersionValue> tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
148152

149153
private volatile Maps maps = new Maps();
150154
// we maintain a second map that only receives the updates that we skip on the actual map (unsafe ops)
151155
// this map is only maintained if assertions are enabled
152156
private volatile Maps unsafeKeysMap = new Maps();
153157

154-
/** Bytes consumed for each BytesRef UID:
158+
/**
159+
* Bytes consumed for each BytesRef UID:
155160
* In this base value, we account for the {@link BytesRef} object itself as
156161
* well as the header of the byte[] array it holds, and some lost bytes due
157162
* to object alignment. So consumers of this constant just have to add the
158163
* length of the byte[] (assuming it is not shared between multiple
159-
* instances). */
164+
* instances).
165+
*/
160166
private static final long BASE_BYTES_PER_BYTESREF =
161-
// shallow memory usage of the BytesRef object
162-
RamUsageEstimator.shallowSizeOfInstance(BytesRef.class) +
167+
// shallow memory usage of the BytesRef object
168+
RamUsageEstimator.shallowSizeOfInstance(BytesRef.class) +
163169
// header of the byte[] array
164170
RamUsageEstimator.NUM_BYTES_ARRAY_HEADER +
165171
// with an alignment size (-XX:ObjectAlignmentInBytes) of 8 (default),
166172
// there could be between 0 and 7 lost bytes, so we account for 3
167173
// lost bytes on average
168174
3;
169175

170-
/** Bytes used by having CHM point to a key/value. */
176+
/**
177+
* Bytes used by having CHM point to a key/value.
178+
*/
171179
private static final long BASE_BYTES_PER_CHM_ENTRY;
180+
172181
static {
173182
// use the same impl as the Maps does
174183
Map<Integer, Integer> map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
@@ -181,11 +190,15 @@ Maps invalidateOldMap() {
181190
BASE_BYTES_PER_CHM_ENTRY = chmEntryShallowSize + 2 * RamUsageEstimator.NUM_BYTES_OBJECT_REF;
182191
}
183192

184-
/** Tracks bytes used by current map, i.e. what is freed on refresh. For deletes, which are also added to tombstones, we only account
185-
* for the CHM entry here, and account for BytesRef/VersionValue against the tombstones, since refresh would not clear this RAM. */
193+
/**
194+
* Tracks bytes used by current map, i.e. what is freed on refresh. For deletes, which are also added to tombstones, we only account
195+
* for the CHM entry here, and account for BytesRef/VersionValue against the tombstones, since refresh would not clear this RAM.
196+
*/
186197
final AtomicLong ramBytesUsedCurrent = new AtomicLong();
187198

188-
/** Tracks bytes used by tombstones (deletes) */
199+
/**
200+
* Tracks bytes used by tombstones (deletes)
201+
*/
189202
final AtomicLong ramBytesUsedTombstones = new AtomicLong();
190203

191204
@Override
@@ -215,12 +228,15 @@ public void afterRefresh(boolean didRefresh) throws IOException {
215228

216229
}
217230

218-
/** Returns the live version (add or delete) for this uid. */
231+
/**
232+
* Returns the live version (add or delete) for this uid.
233+
*/
219234
VersionValue getUnderLock(final BytesRef uid) {
220235
return getUnderLock(uid, maps);
221236
}
222237

223238
private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) {
239+
assert keyedLock.isHeldByCurrentThread(uid);
224240
// First try to get the "live" value:
225241
VersionValue value = currentMaps.current.get(uid);
226242
if (value != null) {
@@ -255,8 +271,11 @@ boolean isSafeAccessRequired() {
255271
return maps.isSafeAccessMode();
256272
}
257273

258-
/** Adds this uid/version to the pending adds map iff the map needs safe access. */
274+
/**
275+
* Adds this uid/version to the pending adds map iff the map needs safe access.
276+
*/
259277
void maybePutUnderLock(BytesRef uid, VersionValue version) {
278+
assert keyedLock.isHeldByCurrentThread(uid);
260279
Maps maps = this.maps;
261280
if (maps.isSafeAccessMode()) {
262281
putUnderLock(uid, version, maps);
@@ -271,14 +290,19 @@ private boolean putAssertionMap(BytesRef uid, VersionValue version) {
271290
return true;
272291
}
273292

274-
/** Adds this uid/version to the pending adds map. */
293+
/**
294+
* Adds this uid/version to the pending adds map.
295+
*/
275296
void putUnderLock(BytesRef uid, VersionValue version) {
276297
Maps maps = this.maps;
277298
putUnderLock(uid, version, maps);
278299
}
279300

280-
/** Adds this uid/version to the pending adds map. */
301+
/**
302+
* Adds this uid/version to the pending adds map.
303+
*/
281304
private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) {
305+
assert keyedLock.isHeldByCurrentThread(uid);
282306
assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length;
283307
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
284308
final VersionValue prev = maps.current.put(uid, version);
@@ -301,7 +325,7 @@ private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) {
301325
final VersionValue prevTombstone;
302326
if (version.isDelete()) {
303327
// Also enroll the delete into tombstones, and account for its RAM too:
304-
prevTombstone = tombstones.put(uid, (DeleteVersionValue)version);
328+
prevTombstone = tombstones.put(uid, (DeleteVersionValue) version);
305329

306330
// We initially account for BytesRef/VersionValue RAM for a delete against the tombstones, because this RAM will not be freed up
307331
// on refresh. Later, in removeTombstoneUnderLock, if we clear the tombstone entry but the delete remains in current, we shift
@@ -321,20 +345,22 @@ private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) {
321345
// Deduct tombstones bytes used for the version we just removed or replaced:
322346
if (prevTombstone != null) {
323347
long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prevTombstone.ramBytesUsed() + uidRAMBytesUsed));
324-
assert v >= 0: "bytes=" + v;
348+
assert v >= 0 : "bytes=" + v;
325349
}
326350
}
327351

328-
/** Removes this uid from the pending deletes map. */
352+
/**
353+
* Removes this uid from the pending deletes map.
354+
*/
329355
void removeTombstoneUnderLock(BytesRef uid) {
330-
356+
assert keyedLock.isHeldByCurrentThread(uid);
331357
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
332358

333359
final VersionValue prev = tombstones.remove(uid);
334360
if (prev != null) {
335361
assert prev.isDelete();
336362
long v = ramBytesUsedTombstones.addAndGet(-(BASE_BYTES_PER_CHM_ENTRY + prev.ramBytesUsed() + uidRAMBytesUsed));
337-
assert v >= 0: "bytes=" + v;
363+
assert v >= 0 : "bytes=" + v;
338364
}
339365
final VersionValue curVersion = maps.current.get(uid);
340366
if (curVersion != null && curVersion.isDelete()) {
@@ -345,22 +371,31 @@ void removeTombstoneUnderLock(BytesRef uid) {
345371
}
346372
}
347373

348-
/** Caller has a lock, so that this uid will not be concurrently added/deleted by another thread. */
374+
/**
375+
* Caller has a lock, so that this uid will not be concurrently added/deleted by another thread.
376+
*/
349377
DeleteVersionValue getTombstoneUnderLock(BytesRef uid) {
378+
assert keyedLock.isHeldByCurrentThread(uid);
350379
return tombstones.get(uid);
351380
}
352381

353-
/** Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd). */
382+
/**
383+
* Iterates over all deleted versions, including new ones (not yet exposed via reader) and old ones (exposed via reader but not yet GC'd).
384+
*/
354385
Iterable<Map.Entry<BytesRef, DeleteVersionValue>> getAllTombstones() {
355386
return tombstones.entrySet();
356387
}
357388

358-
/** clears all tombstones ops */
389+
/**
390+
* clears all tombstones ops
391+
*/
359392
void clearTombstones() {
360393
tombstones.clear();
361394
}
362395

363-
/** Called when this index is closed. */
396+
/**
397+
* Called when this index is closed.
398+
*/
364399
synchronized void clear() {
365400
maps = new Maps();
366401
tombstones.clear();
@@ -377,8 +412,10 @@ public long ramBytesUsed() {
377412
return ramBytesUsedCurrent.get() + ramBytesUsedTombstones.get();
378413
}
379414

380-
/** Returns how much RAM would be freed up by refreshing. This is {@link #ramBytesUsed} except does not include tombstones because they
381-
* don't clear on refresh. */
415+
/**
416+
* Returns how much RAM would be freed up by refreshing. This is {@link #ramBytesUsed} except does not include tombstones because they
417+
* don't clear on refresh.
418+
*/
382419
long ramBytesUsedForRefresh() {
383420
return ramBytesUsedCurrent.get();
384421
}
@@ -389,7 +426,20 @@ public Collection<Accountable> getChildResources() {
389426
return Collections.emptyList();
390427
}
391428

392-
/** Returns the current internal versions as a point in time snapshot*/
429+
/**
430+
* Returns the current internal versions as a point in time snapshot
431+
*/
393432
Map<BytesRef, VersionValue> getAllCurrent() {
394433
return maps.current.map;
395-
}}
434+
}
435+
436+
/**
437+
* Acquires a releasable lock for the given uid. All *UnderLock methods require
438+
* this lock to be held by the caller; otherwise the visibility guarantees of this version
439+
* map are broken. We assert that this lock is held when calling these methods.
440+
* @see KeyedLock
441+
*/
442+
Releasable acquireLock(BytesRef uid) {
443+
return keyedLock.acquire(uid);
444+
}
445+
}

0 commit comments

Comments
 (0)