Skip to content

Commit 75b4f40

Browse files
authored
Refactor InternalEngine's index/delete flow for better clarity (#23711)
The InternalEngine Index/Delete methods (plus satellites like version loading from Lucene) have accumulated some cruft over the years making it hard to clearly the code flows for various use cases (primary indexing/recovery/replicas etc). This PR refactors those methods for better readability. The methods are broken up into smaller sub methods, albeit at the price of less code I reused. To support the refactoring I have considerably beefed up the versioning tests. This PR is a spin-off from #23543 , which made it clear this is needed.
1 parent c89fdd9 commit 75b4f40

File tree

19 files changed

+1418
-933
lines changed

19 files changed

+1418
-933
lines changed

core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java

Lines changed: 40 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import org.apache.lucene.search.DocIdSetIterator;
3030
import org.apache.lucene.util.Bits;
3131
import org.apache.lucene.util.BytesRef;
32-
import org.elasticsearch.common.lucene.uid.Versions.DocIdAndVersion;
32+
import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion;
3333
import org.elasticsearch.index.mapper.UidFieldMapper;
3434
import org.elasticsearch.index.mapper.VersionFieldMapper;
3535

@@ -51,49 +51,66 @@ final class PerThreadIDAndVersionLookup {
5151
private final TermsEnum termsEnum;
5252
/** _version data */
5353
private final NumericDocValues versions;
54+
5455
/** Reused for iteration (when the term exists) */
5556
private PostingsEnum docsEnum;
5657

58+
/** used for assertions to make sure class usage meets assumptions */
59+
private final Object readerKey;
60+
5761
/**
5862
* Initialize lookup for the provided segment
5963
*/
6064
PerThreadIDAndVersionLookup(LeafReader reader) throws IOException {
61-
TermsEnum termsEnum = null;
62-
NumericDocValues versions = null;
63-
6465
Fields fields = reader.fields();
65-
if (fields != null) {
66-
Terms terms = fields.terms(UidFieldMapper.NAME);
67-
if (terms != null) {
68-
termsEnum = terms.iterator();
69-
assert termsEnum != null;
70-
versions = reader.getNumericDocValues(VersionFieldMapper.NAME);
71-
assert versions != null;
72-
}
66+
Terms terms = fields.terms(UidFieldMapper.NAME);
67+
termsEnum = terms.iterator();
68+
if (termsEnum == null) {
69+
throw new IllegalArgumentException("reader misses the [" + UidFieldMapper.NAME +
70+
"] field");
7371
}
74-
75-
this.versions = versions;
76-
this.termsEnum = termsEnum;
72+
versions = reader.getNumericDocValues(VersionFieldMapper.NAME);
73+
if (versions == null) {
74+
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME +
75+
"] field");
76+
}
77+
Object readerKey = null;
78+
assert (readerKey = reader.getCoreCacheKey()) != null;
79+
this.readerKey = readerKey;
7780
}
7881

7982
/** Return null if id is not found. */
80-
public DocIdAndVersion lookup(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException {
83+
public DocIdAndVersion lookupVersion(BytesRef id, Bits liveDocs, LeafReaderContext context)
84+
throws IOException {
85+
assert context.reader().getCoreCacheKey().equals(readerKey) :
86+
"context's reader is not the same as the reader class was initialized on.";
87+
int docID = getDocID(id, liveDocs);
88+
89+
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
90+
return new DocIdAndVersion(docID, versions.get(docID), context);
91+
} else {
92+
return null;
93+
}
94+
}
95+
96+
/**
97+
* returns the internal lucene doc id for the given id bytes.
98+
* {@link DocIdSetIterator#NO_MORE_DOCS} is returned if not found
99+
* */
100+
private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
81101
if (termsEnum.seekExact(id)) {
102+
int docID = DocIdSetIterator.NO_MORE_DOCS;
82103
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
83104
docsEnum = termsEnum.postings(docsEnum, 0);
84-
int docID = DocIdSetIterator.NO_MORE_DOCS;
85105
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
86106
if (liveDocs != null && liveDocs.get(d) == false) {
87107
continue;
88108
}
89109
docID = d;
90110
}
91-
92-
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
93-
return new DocIdAndVersion(docID, versions.get(docID), context);
94-
}
111+
return docID;
112+
} else {
113+
return DocIdSetIterator.NO_MORE_DOCS;
95114
}
96-
97-
return null;
98115
}
99116
}

core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java

Lines changed: 1 addition & 232 deletions
Original file line numberDiff line numberDiff line change
@@ -19,32 +19,7 @@
1919

2020
package org.elasticsearch.common.lucene.uid;
2121

22-
import org.apache.lucene.index.Fields;
23-
import org.apache.lucene.index.IndexReader;
24-
import org.apache.lucene.index.LeafReader;
25-
import org.apache.lucene.index.LeafReader.CoreClosedListener;
26-
import org.apache.lucene.index.LeafReaderContext;
27-
import org.apache.lucene.index.NumericDocValues;
28-
import org.apache.lucene.index.PostingsEnum;
29-
import org.apache.lucene.index.SortedNumericDocValues;
30-
import org.apache.lucene.index.Term;
31-
import org.apache.lucene.index.Terms;
32-
import org.apache.lucene.index.TermsEnum;
33-
import org.apache.lucene.search.DocIdSetIterator;
34-
import org.apache.lucene.util.Bits;
35-
import org.apache.lucene.util.BytesRef;
36-
import org.apache.lucene.util.CloseableThreadLocal;
37-
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
38-
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
39-
import org.elasticsearch.index.mapper.UidFieldMapper;
40-
import org.elasticsearch.index.seqno.SequenceNumbersService;
41-
42-
import java.io.IOException;
43-
import java.util.List;
44-
import java.util.concurrent.ConcurrentMap;
45-
46-
/** Utility class to resolve the Lucene doc ID and version for a given uid. */
47-
public class Versions {
22+
public final class Versions {
4823

4924
/** used to indicate the write operation should succeed regardless of current version **/
5025
public static final long MATCH_ANY = -3L;
@@ -59,210 +34,4 @@ public class Versions {
5934
* i.e., not found in the index and/or found as deleted (with version) in the version map
6035
*/
6136
public static final long MATCH_DELETED = -4L;
62-
63-
// TODO: is there somewhere else we can store these?
64-
static final ConcurrentMap<Object, CloseableThreadLocal<PerThreadIDAndVersionLookup>> lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
65-
66-
// Evict this reader from lookupStates once it's closed:
67-
private static final CoreClosedListener removeLookupState = new CoreClosedListener() {
68-
@Override
69-
public void onClose(Object key) {
70-
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.remove(key);
71-
if (ctl != null) {
72-
ctl.close();
73-
}
74-
}
75-
};
76-
77-
private static PerThreadIDAndVersionLookup getLookupState(LeafReader reader) throws IOException {
78-
Object key = reader.getCoreCacheKey();
79-
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.get(key);
80-
if (ctl == null) {
81-
// First time we are seeing this reader's core; make a
82-
// new CTL:
83-
ctl = new CloseableThreadLocal<>();
84-
CloseableThreadLocal<PerThreadIDAndVersionLookup> other = lookupStates.putIfAbsent(key, ctl);
85-
if (other == null) {
86-
// Our CTL won, we must remove it when the
87-
// core is closed:
88-
reader.addCoreClosedListener(removeLookupState);
89-
} else {
90-
// Another thread beat us to it: just use
91-
// their CTL:
92-
ctl = other;
93-
}
94-
}
95-
96-
PerThreadIDAndVersionLookup lookupState = ctl.get();
97-
if (lookupState == null) {
98-
lookupState = new PerThreadIDAndVersionLookup(reader);
99-
ctl.set(lookupState);
100-
}
101-
102-
return lookupState;
103-
}
104-
105-
private Versions() {
106-
}
107-
108-
/** Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a version. */
109-
public static class DocIdAndVersion {
110-
public final int docId;
111-
public final long version;
112-
public final LeafReaderContext context;
113-
114-
public DocIdAndVersion(int docId, long version, LeafReaderContext context) {
115-
this.docId = docId;
116-
this.version = version;
117-
this.context = context;
118-
}
119-
}
120-
121-
/**
122-
* Load the internal doc ID and version for the uid from the reader, returning<ul>
123-
* <li>null if the uid wasn't found,
124-
* <li>a doc ID and a version otherwise
125-
* </ul>
126-
*/
127-
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
128-
assert term.field().equals(UidFieldMapper.NAME);
129-
List<LeafReaderContext> leaves = reader.leaves();
130-
if (leaves.isEmpty()) {
131-
return null;
132-
}
133-
// iterate backwards to optimize for the frequently updated documents
134-
// which are likely to be in the last segments
135-
for (int i = leaves.size() - 1; i >= 0; i--) {
136-
LeafReaderContext context = leaves.get(i);
137-
LeafReader leaf = context.reader();
138-
PerThreadIDAndVersionLookup lookup = getLookupState(leaf);
139-
DocIdAndVersion result = lookup.lookup(term.bytes(), leaf.getLiveDocs(), context);
140-
if (result != null) {
141-
return result;
142-
}
143-
}
144-
return null;
145-
}
146-
147-
/**
148-
* Load the version for the uid from the reader, returning<ul>
149-
* <li>{@link #NOT_FOUND} if no matching doc exists,
150-
* <li>the version associated with the provided uid otherwise
151-
* </ul>
152-
*/
153-
public static long loadVersion(IndexReader reader, Term term) throws IOException {
154-
final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
155-
return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
156-
}
157-
158-
159-
/**
160-
* Returns the sequence number for the given uid term, returning
161-
* {@code SequenceNumbersService.UNASSIGNED_SEQ_NO} if none is found.
162-
*/
163-
public static long loadSeqNo(IndexReader reader, Term term) throws IOException {
164-
assert term.field().equals(UidFieldMapper.NAME) : "can only load _seq_no by uid";
165-
List<LeafReaderContext> leaves = reader.leaves();
166-
if (leaves.isEmpty()) {
167-
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
168-
}
169-
170-
// iterate backwards to optimize for the frequently updated documents
171-
// which are likely to be in the last segments
172-
for (int i = leaves.size() - 1; i >= 0; i--) {
173-
LeafReader leaf = leaves.get(i).reader();
174-
Bits liveDocs = leaf.getLiveDocs();
175-
176-
TermsEnum termsEnum = null;
177-
SortedNumericDocValues dvField = null;
178-
PostingsEnum docsEnum = null;
179-
180-
final Fields fields = leaf.fields();
181-
if (fields != null) {
182-
Terms terms = fields.terms(UidFieldMapper.NAME);
183-
if (terms != null) {
184-
termsEnum = terms.iterator();
185-
assert termsEnum != null;
186-
dvField = leaf.getSortedNumericDocValues(SeqNoFieldMapper.NAME);
187-
assert dvField != null;
188-
189-
final BytesRef id = term.bytes();
190-
if (termsEnum.seekExact(id)) {
191-
// there may be more than one matching docID, in the
192-
// case of nested docs, so we want the last one:
193-
docsEnum = termsEnum.postings(docsEnum, 0);
194-
int docID = DocIdSetIterator.NO_MORE_DOCS;
195-
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
196-
if (liveDocs != null && liveDocs.get(d) == false) {
197-
continue;
198-
}
199-
docID = d;
200-
}
201-
202-
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
203-
dvField.setDocument(docID);
204-
assert dvField.count() == 1 : "expected only a single value for _seq_no but got " +
205-
dvField.count();
206-
return dvField.valueAt(0);
207-
}
208-
}
209-
}
210-
}
211-
212-
}
213-
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
214-
}
215-
216-
/**
217-
* Returns the primary term for the given uid term, returning {@code 0} if none is found.
218-
*/
219-
public static long loadPrimaryTerm(IndexReader reader, Term term) throws IOException {
220-
assert term.field().equals(UidFieldMapper.NAME) : "can only load _primary_term by uid";
221-
List<LeafReaderContext> leaves = reader.leaves();
222-
if (leaves.isEmpty()) {
223-
return 0;
224-
}
225-
226-
// iterate backwards to optimize for the frequently updated documents
227-
// which are likely to be in the last segments
228-
for (int i = leaves.size() - 1; i >= 0; i--) {
229-
LeafReader leaf = leaves.get(i).reader();
230-
Bits liveDocs = leaf.getLiveDocs();
231-
232-
TermsEnum termsEnum = null;
233-
NumericDocValues dvField = null;
234-
PostingsEnum docsEnum = null;
235-
236-
final Fields fields = leaf.fields();
237-
if (fields != null) {
238-
Terms terms = fields.terms(UidFieldMapper.NAME);
239-
if (terms != null) {
240-
termsEnum = terms.iterator();
241-
assert termsEnum != null;
242-
dvField = leaf.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
243-
assert dvField != null;
244-
245-
final BytesRef id = term.bytes();
246-
if (termsEnum.seekExact(id)) {
247-
// there may be more than one matching docID, in the
248-
// case of nested docs, so we want the last one:
249-
docsEnum = termsEnum.postings(docsEnum, 0);
250-
int docID = DocIdSetIterator.NO_MORE_DOCS;
251-
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
252-
if (liveDocs != null && liveDocs.get(d) == false) {
253-
continue;
254-
}
255-
docID = d;
256-
}
257-
258-
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
259-
return dvField.get(docID);
260-
}
261-
}
262-
}
263-
}
264-
265-
}
266-
return 0;
267-
}
26837
}

0 commit comments

Comments
 (0)