Skip to content

Commit 0380e49

Browse files
committed
Optimize version map for append-only indexing (#27752)
Today we still maintain a version map even if we only index append-only or in other words, documents with auto-generated IDs. We can instead maintain an un-safe version map that will be swapped to a safe version map only if necessary once we see the first document that requires access to the version map. For instance: * a auto-generated id retry * any kind of deletes * a document with a foreign ID (non-autogenerated In these cases we forcefully refresh then internal reader and start maintaining a version map until such a safe map wasn't necessary for two refresh cycles. Indices / shards that never see an autogenerated ID document will always meintain a version map and in the case of a delete / retry in a pure append-only index the version map will be de-optimized for a short amount of time until we know it's safe again to swap back. This will also minimize the requried refeshes. Closes #19813
1 parent 3a7b011 commit 0380e49

File tree

4 files changed

+351
-20
lines changed

4 files changed

+351
-20
lines changed

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -581,7 +581,7 @@ public GetResult get(Get get, BiFunction<String, SearcherScope, Searcher> search
581581
ensureOpen();
582582
SearcherScope scope;
583583
if (get.realtime()) {
584-
VersionValue versionValue = versionMap.getUnderLock(get.uid().bytes());
584+
VersionValue versionValue = getVersionFromMap(get.uid().bytes());
585585
if (versionValue != null) {
586586
if (versionValue.isDelete()) {
587587
return GetResult.NOT_EXISTS;
@@ -619,7 +619,7 @@ enum OpVsLuceneDocStatus {
619619
private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op) throws IOException {
620620
assert op.seqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO : "resolving ops based on seq# but no seqNo is found";
621621
final OpVsLuceneDocStatus status;
622-
final VersionValue versionValue = versionMap.getUnderLock(op.uid().bytes());
622+
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
623623
assert incrementVersionLookup();
624624
if (versionValue != null) {
625625
if (op.seqNo() > versionValue.seqNo ||
@@ -656,7 +656,7 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnSeqNo(final Operation op)
656656
/** resolves the current version of the document, returning null if not found */
657657
private VersionValue resolveDocVersion(final Operation op) throws IOException {
658658
assert incrementVersionLookup(); // used for asserting in tests
659-
VersionValue versionValue = versionMap.getUnderLock(op.uid().bytes());
659+
VersionValue versionValue = getVersionFromMap(op.uid().bytes());
660660
if (versionValue == null) {
661661
assert incrementIndexVersionLookup(); // used for asserting in tests
662662
final long currentVersion = loadCurrentVersionFromIndex(op.uid());
@@ -683,6 +683,21 @@ private OpVsLuceneDocStatus compareOpToLuceneDocBasedOnVersions(final Operation
683683
}
684684
}
685685

686+
private VersionValue getVersionFromMap(BytesRef id) {
687+
if (versionMap.isUnsafe()) {
688+
synchronized (versionMap) {
689+
// we are switching from an unsafe map to a safe map. This might happen concurrently
690+
// but we only need to do this once since the last operation per ID is to add to the version
691+
// map so once we pass this point we can safely lookup from the version map.
692+
if (versionMap.isUnsafe()) {
693+
refresh("unsafe_version_map", SearcherScope.INTERNAL);
694+
}
695+
versionMap.enforceSafeAccess();
696+
}
697+
}
698+
return versionMap.getUnderLock(id);
699+
}
700+
686701
private boolean canOptimizeAddDocument(Index index) {
687702
if (index.getAutoGeneratedIdTimestamp() != IndexRequest.UNSET_AUTO_GENERATED_TIMESTAMP) {
688703
assert index.getAutoGeneratedIdTimestamp() >= 0 : "autoGeneratedIdTimestamp must be positive but was: "
@@ -857,6 +872,7 @@ private IndexingStrategy planIndexingAsNonPrimary(Index index) throws IOExceptio
857872
assert index.version() == 1L : "can optimize on replicas but incoming version is [" + index.version() + "]";
858873
plan = IndexingStrategy.optimizedAppendOnly(index.seqNo());
859874
} else {
875+
versionMap.enforceSafeAccess();
860876
// drop out of order operations
861877
assert index.versionType().versionTypeForReplicationAndRecovery() == index.versionType() :
862878
"resolving out of order delivery based on versioning but version type isn't fit for it. got [" + index.versionType() + "]";
@@ -900,10 +916,12 @@ private IndexingStrategy planIndexingAsPrimary(Index index) throws IOException {
900916
if (canOptimizeAddDocument(index)) {
901917
if (mayHaveBeenIndexedBefore(index)) {
902918
plan = IndexingStrategy.overrideExistingAsIfNotThere(generateSeqNoForOperation(index), 1L);
919+
versionMap.enforceSafeAccess();
903920
} else {
904921
plan = IndexingStrategy.optimizedAppendOnly(generateSeqNoForOperation(index));
905922
}
906923
} else {
924+
versionMap.enforceSafeAccess();
907925
// resolves incoming version
908926
final VersionValue versionValue = resolveDocVersion(index);
909927
final long currentVersion;
@@ -949,7 +967,7 @@ private IndexResult indexIntoLucene(Index index, IndexingStrategy plan)
949967
assert assertDocDoesNotExist(index, canOptimizeAddDocument(index) == false);
950968
index(index.docs(), indexWriter);
951969
}
952-
versionMap.putUnderLock(index.uid().bytes(),
970+
versionMap.maybePutUnderLock(index.uid().bytes(),
953971
new VersionValue(plan.versionForIndexing, plan.seqNoForIndexing, index.primaryTerm()));
954972
return new IndexResult(plan.versionForIndexing, plan.seqNoForIndexing, plan.currentNotFoundOrDeleted);
955973
} catch (Exception ex) {
@@ -1069,7 +1087,9 @@ static IndexingStrategy processButSkipLucene(boolean currentNotFoundOrDeleted,
10691087
* Asserts that the doc in the index operation really doesn't exist
10701088
*/
10711089
private boolean assertDocDoesNotExist(final Index index, final boolean allowDeleted) throws IOException {
1072-
final VersionValue versionValue = versionMap.getUnderLock(index.uid().bytes());
1090+
// NOTE this uses direct access to the version map since we are in the assertion code where we maintain a secondary
1091+
// map in the version map such that we don't need to refresh if we are unsafe;
1092+
final VersionValue versionValue = versionMap.getVersionForAssert(index.uid().bytes());
10731093
if (versionValue != null) {
10741094
if (versionValue.isDelete() == false || allowDeleted == false) {
10751095
throw new AssertionError("doc [" + index.type() + "][" + index.id() + "] exists in version map (version " + versionValue + ")");
@@ -1095,6 +1115,7 @@ private static void update(final Term uid, final List<ParseContext.Document> doc
10951115

10961116
@Override
10971117
public DeleteResult delete(Delete delete) throws IOException {
1118+
versionMap.enforceSafeAccess();
10981119
assert Objects.equals(delete.uid().field(), uidField) : delete.uid().field();
10991120
assert assertVersionType(delete);
11001121
assert assertIncomingSequenceNumber(delete.origin(), delete.seqNo());
@@ -2184,6 +2205,15 @@ private boolean incrementIndexVersionLookup() {
21842205
return true;
21852206
}
21862207

2208+
int getVersionMapSize() {
2209+
return versionMap.getAllCurrent().size();
2210+
}
2211+
2212+
boolean isSafeAccessRequired() {
2213+
return versionMap.isSafeAccessRequired();
2214+
}
2215+
2216+
21872217
/**
21882218
* Returns <code>true</code> iff the index writer has any deletions either buffered in memory or
21892219
* in the index.

core/src/main/java/org/elasticsearch/index/engine/LiveVersionMap.java

Lines changed: 142 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
import java.util.concurrent.atomic.AtomicLong;
3333

3434
/** Maps _uid value to its version information. */
35-
class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
35+
final class LiveVersionMap implements ReferenceManager.RefreshListener, Accountable {
3636

3737
/**
3838
* Resets the internal map and adjusts it's capacity as if there were no indexing operations.
@@ -46,29 +46,110 @@ void adjustMapSizeUnderLock() {
4646
maps = new Maps();
4747
}
4848

49-
private static class Maps {
49+
private static final class VersionLookup {
50+
51+
private static final VersionLookup EMPTY = new VersionLookup(Collections.emptyMap());
52+
private final Map<BytesRef,VersionValue> map;
53+
54+
// each version map has a notion of safe / unsafe which allows us to apply certain optimization in the auto-generated ID usecase
55+
// where we know that documents can't have any duplicates so we can skip the version map entirely. This reduces
56+
// the memory pressure significantly for this use-case where we often get a massive amount of small document (metrics).
57+
// if the version map is in safeAccess mode we track all version in the version map. yet if a document comes in that needs
58+
// safe access but we are not in this mode we force a refresh and make the map as safe access required. All subsequent ops will
59+
// respect that and fill the version map. The nice part here is that we are only really requiring this for a single ID and since
60+
// we hold the ID lock in the engine while we do all this it's safe to do it globally unlocked.
61+
// NOTE: these values can both be non-volatile since it's ok to read a stale value per doc ID. We serialize changes in the engine
62+
// that will prevent concurrent updates to the same document ID and therefore we can rely on the happens-before guanratee of the
63+
// map reference itself.
64+
private boolean unsafe;
65+
66+
private VersionLookup(Map<BytesRef, VersionValue> map) {
67+
this.map = map;
68+
}
69+
70+
VersionValue get(BytesRef key) {
71+
return map.get(key);
72+
}
73+
74+
VersionValue put(BytesRef key, VersionValue value) {
75+
return map.put(key, value);
76+
}
77+
78+
boolean isEmpty() {
79+
return map.isEmpty();
80+
}
81+
82+
83+
int size() {
84+
return map.size();
85+
}
86+
87+
boolean isUnsafe() {
88+
return unsafe;
89+
}
90+
91+
void markAsUnsafe() {
92+
unsafe = true;
93+
}
94+
}
95+
96+
private static final class Maps {
5097

5198
// All writes (adds and deletes) go into here:
52-
final Map<BytesRef,VersionValue> current;
99+
final VersionLookup current;
53100

54101
// Used while refresh is running, and to hold adds/deletes until refresh finishes. We read from both current and old on lookup:
55-
final Map<BytesRef,VersionValue> old;
102+
final VersionLookup old;
103+
104+
// this is not volatile since we don't need to maintain a happens before relation ship across doc IDs so it's enough to
105+
// have the volatile read of the Maps reference to make it visible even across threads.
106+
boolean needsSafeAccess;
107+
final boolean previousMapsNeededSafeAccess;
56108

57-
Maps(Map<BytesRef,VersionValue> current, Map<BytesRef,VersionValue> old) {
58-
this.current = current;
59-
this.old = old;
109+
Maps(VersionLookup current, VersionLookup old, boolean previousMapsNeededSafeAccess) {
110+
this.current = current;
111+
this.old = old;
112+
this.previousMapsNeededSafeAccess = previousMapsNeededSafeAccess;
60113
}
61114

62115
Maps() {
63-
this(ConcurrentCollections.<BytesRef,VersionValue>newConcurrentMapWithAggressiveConcurrency(),
64-
Collections.emptyMap());
116+
this(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency()), VersionLookup.EMPTY, false);
117+
}
118+
119+
boolean isSafeAccessMode() {
120+
return needsSafeAccess || previousMapsNeededSafeAccess;
121+
}
122+
123+
boolean shouldInheritSafeAccess() {
124+
final boolean mapHasNotSeenAnyOperations = current.isEmpty() && current.isUnsafe() == false;
125+
return needsSafeAccess
126+
// we haven't seen any ops and map before needed it so we maintain it
127+
|| (mapHasNotSeenAnyOperations && previousMapsNeededSafeAccess);
128+
}
129+
130+
/**
131+
* Builds a new map for the refresh transition this should be called in beforeRefresh()
132+
*/
133+
Maps buildTransitionMap() {
134+
return new Maps(new VersionLookup(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(current.size())),
135+
current, shouldInheritSafeAccess());
136+
}
137+
138+
/**
139+
* builds a new map that invalidates the old map but maintains the current. This should be called in afterRefresh()
140+
*/
141+
Maps invalidateOldMap() {
142+
return new Maps(current, VersionLookup.EMPTY, previousMapsNeededSafeAccess);
65143
}
66144
}
67145

68146
// All deletes also go here, and delete "tombstones" are retained after refresh:
69147
private final Map<BytesRef,DeleteVersionValue> tombstones = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
70148

71149
private volatile Maps maps = new Maps();
150+
// we maintain a second map that only receives the updates that we skip on the actual map (unsafe ops)
151+
// this map is only maintained if assertions are enabled
152+
private volatile Maps unsafeKeysMap = new Maps();
72153

73154
/** Bytes consumed for each BytesRef UID:
74155
* In this base value, we account for the {@link BytesRef} object itself as
@@ -113,8 +194,8 @@ public void beforeRefresh() throws IOException {
113194
// map. While reopen is running, any lookup will first
114195
// try this new map, then fallback to old, then to the
115196
// current searcher:
116-
maps = new Maps(ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(maps.current.size()), maps.current);
117-
197+
maps = maps.buildTransitionMap();
198+
assert (unsafeKeysMap = unsafeKeysMap.buildTransitionMap()) != null;
118199
// This is not 100% correct, since concurrent indexing ops can change these counters in between our execution of the previous
119200
// line and this one, but that should be minor, and the error won't accumulate over time:
120201
ramBytesUsedCurrent.set(0);
@@ -128,13 +209,18 @@ public void afterRefresh(boolean didRefresh) throws IOException {
128209
// case. This is because we assign new maps (in beforeRefresh) slightly before Lucene actually flushes any segments for the
129210
// reopen, and so any concurrent indexing requests can still sneak in a few additions to that current map that are in fact reflected
130211
// in the previous reader. We don't touch tombstones here: they expire on their own index.gc_deletes timeframe:
131-
maps = new Maps(maps.current, Collections.emptyMap());
212+
213+
maps = maps.invalidateOldMap();
214+
assert (unsafeKeysMap = unsafeKeysMap.invalidateOldMap()) != null;
215+
132216
}
133217

134218
/** Returns the live version (add or delete) for this uid. */
135219
VersionValue getUnderLock(final BytesRef uid) {
136-
Maps currentMaps = maps;
220+
return getUnderLock(uid, maps);
221+
}
137222

223+
private VersionValue getUnderLock(final BytesRef uid, Maps currentMaps) {
138224
// First try to get the "live" value:
139225
VersionValue value = currentMaps.current.get(uid);
140226
if (value != null) {
@@ -149,11 +235,52 @@ VersionValue getUnderLock(final BytesRef uid) {
149235
return tombstones.get(uid);
150236
}
151237

238+
VersionValue getVersionForAssert(final BytesRef uid) {
239+
VersionValue value = getUnderLock(uid, maps);
240+
if (value == null) {
241+
value = getUnderLock(uid, unsafeKeysMap);
242+
}
243+
return value;
244+
}
245+
246+
boolean isUnsafe() {
247+
return maps.current.isUnsafe() || maps.old.isUnsafe();
248+
}
249+
250+
void enforceSafeAccess() {
251+
maps.needsSafeAccess = true;
252+
}
253+
254+
boolean isSafeAccessRequired() {
255+
return maps.isSafeAccessMode();
256+
}
257+
258+
/** Adds this uid/version to the pending adds map iff the map needs safe access. */
259+
void maybePutUnderLock(BytesRef uid, VersionValue version) {
260+
Maps maps = this.maps;
261+
if (maps.isSafeAccessMode()) {
262+
putUnderLock(uid, version, maps);
263+
} else {
264+
maps.current.markAsUnsafe();
265+
assert putAssertionMap(uid, version);
266+
}
267+
}
268+
269+
private boolean putAssertionMap(BytesRef uid, VersionValue version) {
270+
putUnderLock(uid, version, unsafeKeysMap);
271+
return true;
272+
}
273+
152274
/** Adds this uid/version to the pending adds map. */
153275
void putUnderLock(BytesRef uid, VersionValue version) {
276+
Maps maps = this.maps;
277+
putUnderLock(uid, version, maps);
278+
}
279+
280+
/** Adds this uid/version to the pending adds map. */
281+
private void putUnderLock(BytesRef uid, VersionValue version, Maps maps) {
154282
assert uid.bytes.length == uid.length : "Oversized _uid! UID length: " + uid.length + ", bytes length: " + uid.bytes.length;
155283
long uidRAMBytesUsed = BASE_BYTES_PER_BYTESREF + uid.bytes.length;
156-
157284
final VersionValue prev = maps.current.put(uid, version);
158285
if (prev != null) {
159286
// Deduct RAM for the version we just replaced:
@@ -264,5 +391,5 @@ public Collection<Accountable> getChildResources() {
264391

265392
/** Returns the current internal versions as a point in time snapshot*/
266393
Map<BytesRef, VersionValue> getAllCurrent() {
267-
return maps.current;
394+
return maps.current.map;
268395
}}

0 commit comments

Comments
 (0)