Skip to content

Commit a72eaa8

Browse files
authored
Identify documents by their _id. (#24460)
Now that indices have a single type by default, we can move to the next step and identify documents using their `_id` rather than the `_uid`. One notable change in this commit is that I made deletions implicitly create types. This helps with the live version map in the case that documents are deleted before the first type is introduced. Otherwise there would be no way to differenciate `DELETE index/foo/1` followed by `PUT index/foo/1` from `DELETE index/bar/1` followed by `PUT index/foo/1`, even though those are different if versioning is involved.
1 parent f222748 commit a72eaa8

File tree

68 files changed

+1288
-533
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+1288
-533
lines changed

core/src/main/java/org/elasticsearch/action/bulk/MappingUpdatePerformer.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,9 @@
1919

2020
package org.elasticsearch.action.bulk;
2121

22-
import org.elasticsearch.action.index.IndexRequest;
23-
import org.elasticsearch.index.engine.Engine;
2422
import org.elasticsearch.index.mapper.Mapping;
25-
import org.elasticsearch.index.shard.IndexShard;
2623
import org.elasticsearch.index.shard.ShardId;
2724

28-
import java.util.Objects;
29-
3025
public interface MappingUpdatePerformer {
3126

3227
/**
@@ -39,6 +34,6 @@ public interface MappingUpdatePerformer {
3934
* retried on the primary due to the mappings not being present yet, or a different exception if
4035
* updating the mappings on the master failed.
4136
*/
42-
void verifyMappings(Engine.Index operation, ShardId shardId) throws Exception;
37+
void verifyMappings(Mapping update, ShardId shardId) throws Exception;
4338

4439
}

core/src/main/java/org/elasticsearch/action/bulk/TransportShardBulkAction.java

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.elasticsearch.index.engine.VersionConflictEngineException;
5757
import org.elasticsearch.index.get.GetResult;
5858
import org.elasticsearch.index.mapper.MapperParsingException;
59+
import org.elasticsearch.index.mapper.MapperService;
5960
import org.elasticsearch.index.mapper.Mapping;
6061
import org.elasticsearch.index.mapper.SourceToParse;
6162
import org.elasticsearch.index.seqno.SequenceNumbersService;
@@ -150,8 +151,9 @@ private static BulkItemResultHolder executeIndexRequest(final IndexRequest index
150151

151152
private static BulkItemResultHolder executeDeleteRequest(final DeleteRequest deleteRequest,
152153
final BulkItemRequest bulkItemRequest,
153-
final IndexShard primary) throws IOException {
154-
Engine.DeleteResult deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary);
154+
final IndexShard primary,
155+
final MappingUpdatePerformer mappingUpdater) throws Exception {
156+
Engine.DeleteResult deleteResult = executeDeleteRequestOnPrimary(deleteRequest, primary, mappingUpdater);
155157
if (deleteResult.hasFailure()) {
156158
return new BulkItemResultHolder(null, deleteResult, bulkItemRequest);
157159
} else {
@@ -241,7 +243,7 @@ static Translog.Location executeBulkItemRequest(IndexMetaData metaData, IndexSha
241243
requestIndex, updateHelper, nowInMillisSupplier, mappingUpdater);
242244
break;
243245
case DELETE:
244-
responseHolder = executeDeleteRequest((DeleteRequest) itemRequest, request.items()[requestIndex], primary);
246+
responseHolder = executeDeleteRequest((DeleteRequest) itemRequest, request.items()[requestIndex], primary, mappingUpdater);
245247
break;
246248
default: throw new IllegalStateException("unexpected opType [" + itemRequest.opType() + "] found");
247249
}
@@ -303,7 +305,7 @@ private static BulkItemResultHolder executeUpdateRequest(UpdateRequest updateReq
303305
break;
304306
case DELETED:
305307
DeleteRequest deleteRequest = translate.action();
306-
result = executeDeleteRequestOnPrimary(deleteRequest, primary);
308+
result = executeDeleteRequestOnPrimary(deleteRequest, primary, mappingUpdater);
307309
break;
308310
case NOOP:
309311
primary.noopUpdate(updateRequest.type());
@@ -609,7 +611,7 @@ static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, Ind
609611
if (mappingUpdateNeeded) {
610612
try {
611613
operation = prepareIndexOperationOnPrimary(request, primary);
612-
mappingUpdater.verifyMappings(operation, primary.shardId());
614+
mappingUpdater.verifyMappings(operation.parsedDoc().dynamicMappingsUpdate(), primary.shardId());
613615
} catch (MapperParsingException | IllegalStateException e) {
614616
// there was an error in parsing the document that was not because
615617
// of pending mapping updates, so return a failure for the result
@@ -623,12 +625,52 @@ static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest request, Ind
623625
return primary.index(operation);
624626
}
625627

626-
private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary) throws IOException {
628+
private static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest request, IndexShard primary,
629+
final MappingUpdatePerformer mappingUpdater) throws Exception {
630+
boolean mappingUpdateNeeded = false;
631+
if (primary.indexSettings().isSingleType()) {
632+
// When there is a single type, the unique identifier is only composed of the _id,
633+
// so there is no way to differenciate foo#1 from bar#1. This is especially an issue
634+
// if a user first deletes foo#1 and then indexes bar#1: since we do not encode the
635+
// _type in the uid it might look like we are reindexing the same document, which
636+
// would fail if bar#1 is indexed with a lower version than foo#1 was deleted with.
637+
// In order to work around this issue, we make deletions create types. This way, we
638+
// fail if index and delete operations do not use the same type.
639+
try {
640+
Mapping update = primary.mapperService().documentMapperWithAutoCreate(request.type()).getMapping();
641+
if (update != null) {
642+
mappingUpdateNeeded = true;
643+
mappingUpdater.updateMappings(update, primary.shardId(), request.type());
644+
}
645+
} catch (MapperParsingException | IllegalArgumentException e) {
646+
return new Engine.DeleteResult(e, request.version(), SequenceNumbersService.UNASSIGNED_SEQ_NO, false);
647+
}
648+
}
649+
if (mappingUpdateNeeded) {
650+
Mapping update = primary.mapperService().documentMapperWithAutoCreate(request.type()).getMapping();
651+
mappingUpdater.verifyMappings(update, primary.shardId());
652+
}
627653
final Engine.Delete delete = primary.prepareDeleteOnPrimary(request.type(), request.id(), request.version(), request.versionType());
628654
return primary.delete(delete);
629655
}
630656

631-
private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteResponse primaryResponse, DeleteRequest request, IndexShard replica) throws IOException {
657+
private static Engine.DeleteResult executeDeleteRequestOnReplica(DocWriteResponse primaryResponse, DeleteRequest request,
658+
IndexShard replica) throws Exception {
659+
if (replica.indexSettings().isSingleType()) {
660+
// We need to wait for the replica to have the mappings
661+
Mapping update;
662+
try {
663+
update = replica.mapperService().documentMapperWithAutoCreate(request.type()).getMapping();
664+
} catch (MapperParsingException | IllegalArgumentException e) {
665+
return new Engine.DeleteResult(e, request.version(), primaryResponse.getSeqNo(), false);
666+
}
667+
if (update != null) {
668+
final ShardId shardId = replica.shardId();
669+
throw new RetryOnReplicaException(shardId,
670+
"Mappings are not available on the replica yet, triggered update: " + update);
671+
}
672+
}
673+
632674
final VersionType versionType = request.versionType().versionTypeForReplicationAndRecovery();
633675
final long version = primaryResponse.getVersion();
634676
assert versionType.validateVersionForWrites(version);
@@ -654,9 +696,9 @@ public void updateMappings(final Mapping update, final ShardId shardId,
654696
}
655697
}
656698

657-
public void verifyMappings(final Engine.Index operation,
699+
public void verifyMappings(Mapping update,
658700
final ShardId shardId) throws Exception {
659-
if (operation.parsedDoc().dynamicMappingsUpdate() != null) {
701+
if (update != null) {
660702
throw new ReplicationOperation.RetryOnPrimaryException(shardId,
661703
"Dynamic mappings are not available on the node that holds the primary yet");
662704
}

core/src/main/java/org/elasticsearch/action/explain/TransportExplainAction.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,8 +35,6 @@
3535
import org.elasticsearch.common.settings.Settings;
3636
import org.elasticsearch.index.engine.Engine;
3737
import org.elasticsearch.index.get.GetResult;
38-
import org.elasticsearch.index.mapper.Uid;
39-
import org.elasticsearch.index.mapper.UidFieldMapper;
4038
import org.elasticsearch.index.shard.ShardId;
4139
import org.elasticsearch.search.SearchService;
4240
import org.elasticsearch.search.internal.AliasFilter;
@@ -93,10 +91,13 @@ protected ExplainResponse shardOperation(ExplainRequest request, ShardId shardId
9391
ShardSearchLocalRequest shardSearchLocalRequest = new ShardSearchLocalRequest(shardId,
9492
new String[]{request.type()}, request.nowInMillis, request.filteringAlias());
9593
SearchContext context = searchService.createSearchContext(shardSearchLocalRequest, SearchService.NO_TIMEOUT, null);
96-
Term uidTerm = new Term(UidFieldMapper.NAME, Uid.createUidAsBytes(request.type(), request.id()));
9794
Engine.GetResult result = null;
9895
try {
99-
result = context.indexShard().get(new Engine.Get(false, uidTerm));
96+
Term uidTerm = context.mapperService().createUidTerm(request.type(), request.id());
97+
if (uidTerm == null) {
98+
return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), false);
99+
}
100+
result = context.indexShard().get(new Engine.Get(false, request.type(), request.id(), uidTerm));
100101
if (!result.exists()) {
101102
return new ExplainResponse(shardId.getIndexName(), request.type(), request.id(), false);
102103
}

core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDVersionAndSeqNoLookup.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndSeqNo;
3333
import org.elasticsearch.common.lucene.uid.VersionsAndSeqNoResolver.DocIdAndVersion;
3434
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
35-
import org.elasticsearch.index.mapper.UidFieldMapper;
3635
import org.elasticsearch.index.mapper.VersionFieldMapper;
3736
import org.elasticsearch.index.seqno.SequenceNumbersService;
3837

@@ -51,6 +50,7 @@ final class PerThreadIDVersionAndSeqNoLookup {
5150
// we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff
5251

5352
/** terms enum for uid field */
53+
final String uidField;
5454
private final TermsEnum termsEnum;
5555

5656
/** Reused for iteration (when the term exists) */
@@ -62,13 +62,14 @@ final class PerThreadIDVersionAndSeqNoLookup {
6262
/**
6363
* Initialize lookup for the provided segment
6464
*/
65-
PerThreadIDVersionAndSeqNoLookup(LeafReader reader) throws IOException {
65+
PerThreadIDVersionAndSeqNoLookup(LeafReader reader, String uidField) throws IOException {
66+
this.uidField = uidField;
6667
Fields fields = reader.fields();
67-
Terms terms = fields.terms(UidFieldMapper.NAME);
68-
termsEnum = terms.iterator();
69-
if (termsEnum == null) {
70-
throw new IllegalArgumentException("reader misses the [" + UidFieldMapper.NAME + "] field");
68+
Terms terms = fields.terms(uidField);
69+
if (terms == null) {
70+
throw new IllegalArgumentException("reader misses the [" + uidField + "] field");
7171
}
72+
termsEnum = terms.iterator();
7273
if (reader.getNumericDocValues(VersionFieldMapper.NAME) == null) {
7374
throw new IllegalArgumentException("reader misses the [" + VersionFieldMapper.NAME + "] field");
7475
}

core/src/main/java/org/elasticsearch/common/lucene/uid/VersionsAndSeqNoResolver.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -25,10 +25,10 @@
2525
import org.apache.lucene.index.Term;
2626
import org.apache.lucene.util.CloseableThreadLocal;
2727
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
28-
import org.elasticsearch.index.mapper.UidFieldMapper;
2928

3029
import java.io.IOException;
3130
import java.util.List;
31+
import java.util.Objects;
3232
import java.util.concurrent.ConcurrentMap;
3333

3434
import static org.elasticsearch.common.lucene.uid.Versions.NOT_FOUND;
@@ -47,7 +47,7 @@ public final class VersionsAndSeqNoResolver {
4747
}
4848
};
4949

50-
private static PerThreadIDVersionAndSeqNoLookup getLookupState(LeafReader reader) throws IOException {
50+
private static PerThreadIDVersionAndSeqNoLookup getLookupState(LeafReader reader, String uidField) throws IOException {
5151
IndexReader.CacheHelper cacheHelper = reader.getCoreCacheHelper();
5252
CloseableThreadLocal<PerThreadIDVersionAndSeqNoLookup> ctl = lookupStates.get(cacheHelper.getKey());
5353
if (ctl == null) {
@@ -65,8 +65,11 @@ private static PerThreadIDVersionAndSeqNoLookup getLookupState(LeafReader reader
6565

6666
PerThreadIDVersionAndSeqNoLookup lookupState = ctl.get();
6767
if (lookupState == null) {
68-
lookupState = new PerThreadIDVersionAndSeqNoLookup(reader);
68+
lookupState = new PerThreadIDVersionAndSeqNoLookup(reader, uidField);
6969
ctl.set(lookupState);
70+
} else if (Objects.equals(lookupState.uidField, uidField) == false) {
71+
throw new AssertionError("Index does not consistently use the same uid field: ["
72+
+ uidField + "] != [" + lookupState.uidField + "]");
7073
}
7174

7275
return lookupState;
@@ -109,7 +112,6 @@ public static class DocIdAndSeqNo {
109112
* </ul>
110113
*/
111114
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
112-
assert term.field().equals(UidFieldMapper.NAME) : "unexpected term field " + term.field();
113115
List<LeafReaderContext> leaves = reader.leaves();
114116
if (leaves.isEmpty()) {
115117
return null;
@@ -119,7 +121,7 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term)
119121
for (int i = leaves.size() - 1; i >= 0; i--) {
120122
LeafReaderContext context = leaves.get(i);
121123
LeafReader leaf = context.reader();
122-
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf);
124+
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, term.field());
123125
DocIdAndVersion result = lookup.lookupVersion(term.bytes(), leaf.getLiveDocs(), context);
124126
if (result != null) {
125127
return result;
@@ -135,7 +137,6 @@ public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term)
135137
* </ul>
136138
*/
137139
public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) throws IOException {
138-
assert term.field().equals(UidFieldMapper.NAME) : "unexpected term field " + term.field();
139140
List<LeafReaderContext> leaves = reader.leaves();
140141
if (leaves.isEmpty()) {
141142
return null;
@@ -145,7 +146,7 @@ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) thr
145146
for (int i = leaves.size() - 1; i >= 0; i--) {
146147
LeafReaderContext context = leaves.get(i);
147148
LeafReader leaf = context.reader();
148-
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf);
149+
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, term.field());
149150
DocIdAndSeqNo result = lookup.lookupSeqNo(term.bytes(), leaf.getLiveDocs(), context);
150151
if (result != null) {
151152
return result;
@@ -157,9 +158,9 @@ public static DocIdAndSeqNo loadDocIdAndSeqNo(IndexReader reader, Term term) thr
157158
/**
158159
* Load the primaryTerm associated with the given {@link DocIdAndSeqNo}
159160
*/
160-
public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo) throws IOException {
161+
public static long loadPrimaryTerm(DocIdAndSeqNo docIdAndSeqNo, String uidField) throws IOException {
161162
LeafReader leaf = docIdAndSeqNo.context.reader();
162-
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf);
163+
PerThreadIDVersionAndSeqNoLookup lookup = getLookupState(leaf, uidField);
163164
long result = lookup.lookUpPrimaryTerm(docIdAndSeqNo.docId, leaf);
164165
assert result > 0 : "should always resolve a primary term for a resolved sequence number. primary_term [" + result + "]"
165166
+ " docId [" + docIdAndSeqNo.docId + "] seqNo [" + docIdAndSeqNo.seqNo + "]";

core/src/main/java/org/elasticsearch/index/IndexSettings.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,12 +31,12 @@
3131
import org.elasticsearch.common.unit.ByteSizeValue;
3232
import org.elasticsearch.common.unit.TimeValue;
3333
import org.elasticsearch.index.mapper.AllFieldMapper;
34+
import org.elasticsearch.index.mapper.MapperService;
3435
import org.elasticsearch.index.translog.Translog;
3536
import org.elasticsearch.node.Node;
3637

3738
import java.util.Locale;
3839
import java.util.concurrent.TimeUnit;
39-
import java.util.function.Consumer;
4040
import java.util.function.Function;
4141

4242
/**
@@ -192,7 +192,10 @@ public final class IndexSettings {
192192
* The maximum number of slices allowed in a scroll request.
193193
*/
194194
private volatile int maxSlicesPerScroll;
195-
195+
/**
196+
* Whether the index is required to have at most one type.
197+
*/
198+
private final boolean singleType;
196199

197200
/**
198201
* Returns the default search field for this index.
@@ -280,6 +283,7 @@ public IndexSettings(final IndexMetaData indexMetaData, final Settings nodeSetti
280283
maxSlicesPerScroll = scopedSettings.get(MAX_SLICES_PER_SCROLL);
281284
this.mergePolicyConfig = new MergePolicyConfig(logger, this);
282285
this.indexSortConfig = new IndexSortConfig(this);
286+
singleType = scopedSettings.get(MapperService.INDEX_MAPPING_SINGLE_TYPE_SETTING);
283287

284288
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio);
285289
scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, mergePolicyConfig::setExpungeDeletesAllowed);
@@ -391,6 +395,11 @@ public IndexMetaData getIndexMetaData() {
391395
*/
392396
public int getNumberOfReplicas() { return settings.getAsInt(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, null); }
393397

398+
/**
399+
* Returns whether the index enforces at most one type.
400+
*/
401+
public boolean isSingleType() { return singleType; }
402+
394403
/**
395404
* Returns the node settings. The settings returned from {@link #getSettings()} are a merged version of the
396405
* index settings and the node settings where node settings are overwritten by index settings.

core/src/main/java/org/elasticsearch/index/engine/Engine.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -473,8 +473,7 @@ protected final GetResult getFromSearcher(Get get, Function<String, Searcher> se
473473
if (docIdAndVersion != null) {
474474
if (get.versionType().isVersionConflictForReads(docIdAndVersion.version, get.version())) {
475475
Releasables.close(searcher);
476-
Uid uid = Uid.createUid(get.uid().text());
477-
throw new VersionConflictEngineException(shardId, uid.type(), uid.id(),
476+
throw new VersionConflictEngineException(shardId, get.type(), get.id(),
478477
get.versionType().explainConflictForReads(docIdAndVersion.version, get.version()));
479478
}
480479
}
@@ -1028,7 +1027,6 @@ public static class Index extends Operation {
10281027
public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin,
10291028
long startTime, long autoGeneratedIdTimestamp, boolean isRetry) {
10301029
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
1031-
assert uid.bytes().equals(doc.uid()) : "term uid " + uid + " doesn't match doc uid " + doc.uid();
10321030
this.doc = doc;
10331031
this.isRetry = isRetry;
10341032
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
@@ -1199,18 +1197,29 @@ public int estimatedSizeInBytes() {
11991197
public static class Get {
12001198
private final boolean realtime;
12011199
private final Term uid;
1200+
private final String type, id;
12021201
private long version = Versions.MATCH_ANY;
12031202
private VersionType versionType = VersionType.INTERNAL;
12041203

1205-
public Get(boolean realtime, Term uid) {
1204+
public Get(boolean realtime, String type, String id, Term uid) {
12061205
this.realtime = realtime;
1206+
this.type = type;
1207+
this.id = id;
12071208
this.uid = uid;
12081209
}
12091210

12101211
public boolean realtime() {
12111212
return this.realtime;
12121213
}
12131214

1215+
public String type() {
1216+
return type;
1217+
}
1218+
1219+
public String id() {
1220+
return id;
1221+
}
1222+
12141223
public Term uid() {
12151224
return uid;
12161225
}

0 commit comments

Comments
 (0)