Skip to content

Commit ecf8168

Browse files
authored
Use sequence numbers to identify out of order delivery in replicas & recovery (#24060)
Internal indexing requests in Elasticsearch may be processed out of order and repeatedly. This is important during recovery and due to concurrency in replicating requests between primary and replicas. As such, a replica/recovering shard needs to be able to identify that an incoming request contains information that is old and thus need not be processed. The current logic is based on external version. This is sadly not sufficient. This PR moves the logic to rely on sequences numbers and primary terms which give the semantics we need. Relates to #10708
1 parent 162ce85 commit ecf8168

26 files changed

+413
-377
lines changed

core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java renamed to core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,12 @@
2929
import org.apache.lucene.search.DocIdSetIterator;
3030
import org.apache.lucene.util.Bits;
3131
import org.apache.lucene.util.BytesRef;
32-
import org.elasticsearch.common.lucene.uid.VersionsResolver.DocIdAndVersion;
32+
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
33+
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
34+
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
3335
import org.elasticsearch.index.mapper.UidFieldMapper;
3436
import org.elasticsearch.index.mapper.VersionFieldMapper;
37+
import org.elasticsearch.index.seqno.SequenceNumbersService;
3538

3639
import java.io.IOException;
3740

@@ -43,15 +46,18 @@
4346
* in more than one document! It will only return the first one it
4447
* finds. */
4548

46-
final class PerThreadIDAndVersionLookup {
49+
final class PerThreadIDVersionAndSeqNoLookup {
4750
// TODO: do we really need to store all this stuff? some if it might not speed up anything.
4851
// we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff
4952

5053
/** terms enum for uid field */
5154
private final TermsEnum termsEnum;
5255
/** _version data */
5356
private final NumericDocValues versions;
54-
57+
/** _seq_no data */
58+
private final NumericDocValues seqNos;
59+
/** _primary_term data */
60+
private final NumericDocValues primaryTerms;
5561
/** Reused for iteration (when the term exists) */
5662
private PostingsEnum docsEnum;
5763

@@ -61,7 +67,7 @@ final class PerThreadIDAndVersionLookup {
6167
/**
6268
* Initialize lookup for the provided segment
6369
*/
64-
PerThreadIDAndVersionLookup(LeafReader reader) throws IOException {
70+
PerThreadIDVersionAndSeqNoLookup(LeafReader reader) throws IOException {
6571
Fields fields = reader.fields();
6672
Terms terms = fields.terms(UidFieldMapper.NAME);
6773
termsEnum = terms.iterator();
@@ -74,6 +80,8 @@ final class PerThreadIDAndVersionLookup {
7480
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME +
7581
"] field");
7682
}
83+
seqNos = reader.getNumericDocValues(SeqNoFieldMapper.NAME);
84+
primaryTerms = reader.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
7785
Object readerKey = null;
7886
assert (readerKey = reader.getCoreCacheKey()) != null;
7987
this.readerKey = readerKey;
@@ -113,4 +121,25 @@ private int getDocID(BytesRef id, Bits liveDocs) throws IOException {
113121
return DocIdSetIterator.NO_MORE_DOCS;
114122
}
115123
}
124+
125+
/** Return null if id is not found. */
126+
DocIdAndSeqNo lookupSeqNo(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException {
127+
assert context.reader().getCoreCacheKey().equals(readerKey) :
128+
"context's reader is not the same as the reader class was initialized on.";
129+
int docID = getDocID(id, liveDocs);
130+
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
131+
return new DocIdAndSeqNo(docID, seqNos == null ? SequenceNumbersService.UNASSIGNED_SEQ_NO : seqNos.get(docID), context);
132+
} else {
133+
return null;
134+
}
135+
}
136+
137+
/**
138+
* returns 0 if the primary term is not found.
139+
*
140+
* Note that 0 is an illegal primary term. See {@link org.elasticsearch.cluster.metadata.IndexMetaData#primaryTerm(int)}
141+
**/
142+
long lookUpPrimaryTerm(int docID) throws IOException {
143+
return primaryTerms == null ? 0 : primaryTerms.get(docID);
144+
}
116145
}
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Licensed to Elasticsearch under one or more contributor
3+
* license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright
5+
* ownership. Elasticsearch licenses this file to you under
6+
* the Apache License, Version 2.0 (the "License"); you may
7+
* not use this file except in compliance with the License.
8+
* You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.elasticsearch.common.lucene.uid;
21+
22+
import org.apache.lucene.index.IndexReader;
23+
import org.apache.lucene.index.LeafReader;
24+
import org.apache.lucene.index.LeafReader.CoreClosedListener;
25+
import org.apache.lucene.index.LeafReaderContext;
26+
import org.apache.lucene.index.Term;
27+
import org.apache.lucene.util.CloseableThreadLocal;
28+
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
29+
import org.elasticsearch.index.mapper.UidFieldMapper;
30+
31+
import java.io.IOException;
32+
import java.util.List;
33+
import java.util.concurrent.ConcurrentMap;
34+
35+
import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND;
36+
37+
/** Utility class to resolve the Lucene doc ID, version, seqNo and primaryTerms for a given uid. */
38+
public final class VersionsAndSeqNoResolver {
39+
40+
static final ConcurrentMap<Object, CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup>> lookupStates =
41+
ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
42+
43+
// Evict this reader from lookupStates once it's closed:
44+
private static final CoreClosedListener removeLookupState = key -> {
45+
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> ctl = lookupStates.remove(key);
46+
if (ctl != null) {
47+
ctl.close();
48+
}
49+
};
50+
51+
private static PerThreadIDVersionAndSeqNoLookup getLookupState(LeafReader reader) throws IOException {
52+
Object key = reader.getCoreCacheKey();
53+
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> ctl = lookupStates.get(key);
54+
if (ctl == null) {
55+
// First time we are seeing this reader's core; make a new CTL:
56+
ctl = new CloseableThreadLocal<>();
57+
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> other = lookupStates.putIfAbsent(key, ctl);
58+
if (other == null) {
59+
// Our CTL won, we must remove it when the core is closed:
60+
reader.addCoreClosedListener(removeLookupState);
61+
} else {
62+
// Another thread beat us to it: just use their CTL:
63+
ctl = other;
64+
}
65+
}
66+
67+
PerThreadIDVersionAndSeqNoLookup lookupState = ctl.get();
68+
if (lookupState == null) {
69+
lookupState = new PerThreadIDVersionAndSeqNoLookup(reader);
70+
ctl.set(lookupState);
71+
}
72+
73+
return lookupState;
74+
}
75+
76+
private VersionsAndSeqNoResolver() {
77+
}
78+
79+
/** Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a version. */
80+
public static class DocIdAndVersion {
81+
public final int docId;
82+
public final long version;
83+
public final LeafReaderContext context;
84+
85+
DocIdAndVersion(int docId, long version, LeafReaderContext context) {
86+
this.docId = docId;
87+
this.version = version;
88+
this.context = context;
89+
}
90+
}
91+
92+
/** Wraps an {@link LeafReaderContext}, a doc ID <b>relative to the context doc base</b> and a seqNo. */
93+
public static class DocIdAndSeqNo {
94+
public final int docId;
95+
public final long seqNo;
96+
public final LeafReaderContext context;
97+
98+
DocIdAndSeqNo(int docId, long seqNo, LeafReaderContext context) {
99+
this.docId = docId;
100+
this.seqNo = seqNo;
101+
this.context = context;
102+
}
103+
}
104+
105+
106+
/**
107+
* Load the internal doc ID and version for the uid from the reader, returning<ul>
108+
* <li>null if the uid wasn't found,
109+
* <li>a doc ID and a version otherwise
110+
* </ul>
111+
*/
112+
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
113+
assert term.field().equals(UidFieldMapper.NAME) : "unexpected term field " + term.field();
114+
List<LeafReaderContext> leaves = reader.leaves();
115+
if (leaves.isEmpty()) {
116+
return null;
117+
}
118+
// iterate backwards to optimize for the frequently updated documents
119+
// which are likely to be in the last segments
120+
for (int i = leaves.size() - 1; i >= 0; i--) {
121+
LeafReaderContext context = leaves.get(i);
122+
LeafReader leaf = context.reader();
123+
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf);
124+
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context);
125+
if (result != null) {
126+
return result;
127+
}
128+
}
129+
return null;
130+
}
131+
132+
/**
133+
* Load the internal doc ID and sequence number for the uid from the reader, returning<ul>
134+
* <li>null if the uid wasn't found,
135+
* <li>a doc ID and the associated seqNo otherwise
136+
* </ul>
137+
*/
138+
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
139+
assert term.field().equals(UidFieldMapper.NAME) : "unexpected term field " + term.field();
140+
List<LeafReaderContext> leaves = reader.leaves();
141+
if (leaves.isEmpty()) {
142+
return null;
143+
}
144+
// iterate backwards to optimize for the frequently updated documents
145+
// which are likely to be in the last segments
146+
for (int i = leaves.size() - 1; i >= 0; i--) {
147+
LeafReaderContext context = leaves.get(i);
148+
LeafReader leaf = context.reader();
149+
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf);
150+
DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf.getLiveDocs(), context);
151+
if (result != null) {
152+
return result;
153+
}
154+
}
155+
return null;
156+
}
157+
158+
/**
159+
* Load the primaryTerm associated with the given {@link DocIdAndSeqNo}
160+
*/
161+
public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo) throws IOException {
162+
LeafReader leaf = docIdAndSeqNo.context.reader();
163+
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf);
164+
long result = lookup.lookUpPrimaryTerm(docIdAndSeqNo.docId);
165+
assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]"
166+
+ " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]";
167+
return result;
168+
}
169+
170+
/**
171+
* Load the version for the uid from the reader, returning<ul>
172+
* <li>{@link Versions#NOT_FOUND} if no matching doc exists,
173+
* <li>the version associated with the provided uid otherwise
174+
* </ul>
175+
*/
176+
public static long loadVersion(IndexReader reader, Term term) throws IOException {
177+
final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
178+
return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
179+
}
180+
}

0 commit comments

Comments
 (0)