From 8a1213ce0f7351e50ab47cea6e51c900f05318e7 Mon Sep 17 00:00:00 2001 From: Lee Hinman Date: Mon, 31 Oct 2016 16:21:38 -0600 Subject: [PATCH] Add internal _primary_term field This adds the `_primary_term` field internally to the mappings. This field is populated with the current shard's primary term. It is intended to be used for collision resolution when two document copies have the same sequence id, therefore, doc_values for the field are stored but the filed itself is not indexed. This also fixes the `_seq_no` field so that doc_values are retrievable (they were previously stored but irretrievable) and changes the `stats` implementation to more efficiently use the points API to retrieve the min/max instead of iterating on each doc_value value. Relates to #10708 --- .../action/delete/TransportDeleteAction.java | 3 +- .../elasticsearch/index/engine/Engine.java | 28 ++- .../index/mapper/DocumentParser.java | 1 + .../index/mapper/ParseContext.java | 25 ++ .../index/mapper/ParsedDocument.java | 28 ++- .../index/mapper/PrimaryTermFieldMapper.java | 215 ++++++++++++++++++ .../{internal => }/SeqNoFieldMapper.java | 53 ++--- .../elasticsearch/index/shard/IndexShard.java | 27 ++- .../shard/TranslogRecoveryPerformer.java | 5 +- .../index/translog/Translog.java | 29 ++- .../elasticsearch/indices/IndicesModule.java | 4 +- .../index/IndexingSlowLogTests.java | 2 +- .../index/engine/InternalEngineTests.java | 137 +++++------ .../index/engine/ShadowEngineTests.java | 5 +- .../mapper/FieldNamesFieldMapperTests.java | 4 +- .../index/shard/IndexShardIT.java | 7 +- .../index/shard/IndexShardTests.java | 3 +- .../index/shard/RefreshListenersTests.java | 4 +- .../index/translog/TranslogTests.java | 9 +- 19 files changed, 443 insertions(+), 146 deletions(-) create mode 100644 core/src/main/java/org/elasticsearch/index/mapper/PrimaryTermFieldMapper.java rename core/src/main/java/org/elasticsearch/index/mapper/{internal => }/SeqNoFieldMapper.java (81%) diff --git a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java index 5bf618543f437..68c6540fdbcce 100644 --- a/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java +++ b/core/src/main/java/org/elasticsearch/action/delete/TransportDeleteAction.java @@ -142,7 +142,8 @@ public static WriteResult executeDeleteRequestOnPrimary(DeleteRe } public static Engine.Delete executeDeleteRequestOnReplica(DeleteRequest request, IndexShard indexShard) { - Engine.Delete delete = indexShard.prepareDeleteOnReplica(request.type(), request.id(), request.seqNo(), request.version(), request.versionType()); + Engine.Delete delete = indexShard.prepareDeleteOnReplica(request.type(), request.id(), request.seqNo(), request.primaryTerm(), + request.version(), request.versionType()); indexShard.delete(delete); return delete; } diff --git a/core/src/main/java/org/elasticsearch/index/engine/Engine.java b/core/src/main/java/org/elasticsearch/index/engine/Engine.java index 56fe8dc906110..ac26cc7f82142 100644 --- a/core/src/main/java/org/elasticsearch/index/engine/Engine.java +++ b/core/src/main/java/org/elasticsearch/index/engine/Engine.java @@ -776,17 +776,20 @@ public abstract static class Operation { private final Term uid; private long version; private long seqNo; + private long primaryTerm; private final VersionType versionType; private final Origin origin; private Translog.Location location; private final long startTime; private long endTime; - public Operation(Term uid, long seqNo, long version, VersionType versionType, Origin origin, long startTime) { + public Operation(Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime) { this.uid = uid; - assert origin != Origin.PRIMARY || seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO : "seqNo should not be set when origin is PRIMARY"; + assert origin != Origin.PRIMARY || seqNo == SequenceNumbersService.UNASSIGNED_SEQ_NO + : "seqNo should not be set when origin is PRIMARY"; assert origin == Origin.PRIMARY || seqNo >= 0 : "seqNo should be set when origin is not PRIMARY"; this.seqNo = seqNo; + this.primaryTerm = primaryTerm; this.version = version; this.versionType = versionType; this.origin = origin; @@ -828,6 +831,10 @@ public void updateSeqNo(long seqNo) { this.seqNo = seqNo; } + public long primaryTerm() { + return primaryTerm; + } + public void setTranslogLocation(Translog.Location location) { this.location = location; } @@ -880,9 +887,9 @@ public static class Index extends Operation { private final boolean isRetry; private boolean created; - public Index(Term uid, ParsedDocument doc, long seqNo, long version, VersionType versionType, Origin origin, long startTime, - long autoGeneratedIdTimestamp, boolean isRetry) { - super(uid, seqNo, version, versionType, origin, startTime); + public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, + long startTime, long autoGeneratedIdTimestamp, boolean isRetry) { + super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); this.doc = doc; this.isRetry = isRetry; this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp; @@ -893,7 +900,8 @@ public Index(Term uid, ParsedDocument doc) { } // TEST ONLY Index(Term uid, ParsedDocument doc, long version) { - this(uid, doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, version, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime(), -1, false); + this(uid, doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, version, VersionType.INTERNAL, + Origin.PRIMARY, System.nanoTime(), -1, false); } public ParsedDocument parsedDoc() { @@ -984,15 +992,17 @@ public static class Delete extends Operation { private final String id; private boolean found; - public Delete(String type, String id, Term uid, long seqNo, long version, VersionType versionType, Origin origin, long startTime, boolean found) { - super(uid, seqNo, version, versionType, origin, startTime); + public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, + Origin origin, long startTime, boolean found) { + super(uid, seqNo, primaryTerm, version, versionType, origin, startTime); this.type = type; this.id = id; this.found = found; } public Delete(String type, String id, Term uid) { - this(type, id, uid, SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime(), false); + this(type, id, uid, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, + Origin.PRIMARY, System.nanoTime(), false); } @Override diff --git a/core/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java b/core/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java index dd7335e18317c..c510df2ca5593 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/DocumentParser.java @@ -150,6 +150,7 @@ private static ParsedDocument parsedDocument(SourceToParse source, ParseContext. return new ParsedDocument( context.version(), context.seqNo(), + context.primaryTerm(), context.sourceToParse().id(), context.sourceToParse().type(), source.routing(), diff --git a/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java b/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java index 2f5f1135b13ee..b1516ec7c9ae6 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java @@ -263,6 +263,16 @@ public void seqNo(Field seqNo) { in.seqNo(seqNo); } + @Override + public Field primaryTerm() { + return in.primaryTerm(); + } + + @Override + public void primaryTerm(Field primaryTerm) { + in.primaryTerm(primaryTerm); + } + @Override public AllEntries allEntries() { return in.allEntries(); @@ -312,6 +322,8 @@ public static class InternalParseContext extends ParseContext { private Field seqNo; + private Field primaryTerm; + private final AllEntries allEntries; private final List dynamicMappers; @@ -413,6 +425,15 @@ public void seqNo(Field seqNo) { this.seqNo = seqNo; } + @Override + public Field primaryTerm() { + return this.primaryTerm; + } + + @Override + public void primaryTerm(Field primaryTerm) { + this.primaryTerm = primaryTerm; + } @Override public AllEntries allEntries() { @@ -544,6 +565,10 @@ public boolean isWithinMultiFields() { public abstract void seqNo(Field seqNo); + public abstract Field primaryTerm(); + + public abstract void primaryTerm(Field primaryTerm); + public final boolean includeInAll(Boolean includeInAll, FieldMapper mapper) { return includeInAll(includeInAll, mapper.fieldType().indexOptions() != IndexOptions.NONE); } diff --git a/core/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java b/core/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java index dc0ba197b15c1..b84af41b56fdb 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/ParsedDocument.java @@ -36,6 +36,7 @@ public class ParsedDocument { private final String id, type; private final BytesRef uid; private final Field seqNo; + private final Field primaryTerm; private final String routing; @@ -51,19 +52,20 @@ public class ParsedDocument { private String parent; - public ParsedDocument( - Field version, - Field seqNo, - String id, - String type, - String routing, - long timestamp, - long ttl, - List documents, - BytesReference source, - Mapping dynamicMappingsUpdate) { + public ParsedDocument(Field version, + Field seqNo, + Field primaryTerm, + String id, + String type, + String routing, + long timestamp, + long ttl, + List documents, + BytesReference source, + Mapping dynamicMappingsUpdate) { this.version = version; this.seqNo = seqNo; + this.primaryTerm = primaryTerm; this.id = id; this.type = type; this.uid = Uid.createUidAsBytes(type, id); @@ -95,6 +97,10 @@ public Field seqNo() { return seqNo; } + public Field primaryTerm() { + return primaryTerm; + } + public String routing() { return this.routing; } diff --git a/core/src/main/java/org/elasticsearch/index/mapper/PrimaryTermFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/PrimaryTermFieldMapper.java new file mode 100644 index 0000000000000..4b1c3937cb641 --- /dev/null +++ b/core/src/main/java/org/elasticsearch/index/mapper/PrimaryTermFieldMapper.java @@ -0,0 +1,215 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.index.mapper; + +import org.apache.lucene.document.Field; +import org.apache.lucene.document.NumericDocValuesField; +import org.apache.lucene.index.DocValuesType; +import org.apache.lucene.index.IndexOptions; +import org.apache.lucene.index.IndexReader; +import org.apache.lucene.index.LeafReader; +import org.apache.lucene.index.LeafReaderContext; +import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.search.Query; +import org.apache.lucene.util.Bits; +import org.elasticsearch.action.fieldstats.FieldStats; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.index.fielddata.IndexNumericFieldData.NumericType; +import org.elasticsearch.index.fielddata.plain.DocValuesIndexFieldData; +import org.elasticsearch.index.mapper.MappedFieldType; +import org.elasticsearch.index.mapper.Mapper; +import org.elasticsearch.index.mapper.MapperParsingException; +import org.elasticsearch.index.mapper.MetadataFieldMapper; +import org.elasticsearch.index.mapper.ParseContext; +import org.elasticsearch.index.mapper.ParseContext.Document; +import org.elasticsearch.index.query.QueryShardContext; +import org.elasticsearch.index.query.QueryShardException; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +/** + * Mapper for the {@code _primary_term} field. + * + * The primary term is only used during collision after receiving identical seq# + * values for two document copies. The primary term is stored as a doc value + * field without being indexed, since it is only intended for use as a + * key-value lookup. + */ +public class PrimaryTermFieldMapper extends MetadataFieldMapper { + + public static final String NAME = "_primary_term"; + public static final String CONTENT_TYPE = "_primary_term"; + + public static class Defaults { + + public static final String NAME = PrimaryTermFieldMapper.NAME; + public static final MappedFieldType FIELD_TYPE = new PrimaryTermFieldType(); + + static { + FIELD_TYPE.setName(NAME); + // Need sorted doc_values for key-value lookup of primary term + FIELD_TYPE.setDocValuesType(DocValuesType.SORTED); + FIELD_TYPE.setHasDocValues(true); + // We don't do any searches on this field, so don't even index it + FIELD_TYPE.setIndexOptions(IndexOptions.NONE); + FIELD_TYPE.freeze(); + } + } + + public static class Builder extends MetadataFieldMapper.Builder { + + public Builder() { + super(Defaults.NAME, Defaults.FIELD_TYPE, Defaults.FIELD_TYPE); + } + + @Override + public PrimaryTermFieldMapper build(BuilderContext context) { + return new PrimaryTermFieldMapper(context.indexSettings()); + } + } + + public static class TypeParser implements MetadataFieldMapper.TypeParser { + @Override + public MetadataFieldMapper.Builder parse(String name, Map node, ParserContext parserContext) + throws MapperParsingException { + throw new MapperParsingException(NAME + " is not configurable"); + } + + @Override + public MetadataFieldMapper getDefault(Settings indexSettings, MappedFieldType fieldType, String typeName) { + return new PrimaryTermFieldMapper(indexSettings); + } + } + + static final class PrimaryTermFieldType extends MappedFieldType { + + public PrimaryTermFieldType() { + } + + protected PrimaryTermFieldType(PrimaryTermFieldType ref) { + super(ref); + } + + @Override + public MappedFieldType clone() { + return new PrimaryTermFieldType(this); + } + + @Override + public String typeName() { + return CONTENT_TYPE; + } + + @Override + public Query termQuery(Object value, @Nullable QueryShardContext context) { + throw new QueryShardException(context, "PrimaryTermField field [" + name() + "] is not searchable"); + } + + @Override + public IndexFieldData.Builder fielddataBuilder() { + failIfNoDocValues(); + return new DocValuesIndexFieldData.Builder().numericType(NumericType.LONG); + } + + @Override + public FieldStats stats(IndexReader reader) throws IOException { + // TODO: evaluate whether we need this at all, do we need field stats on this field? + // nocommit remove implementation when late-binding commits are possible + final List leaves = reader.leaves(); + if (leaves.isEmpty()) { + return null; + } + + long currentMin = Long.MAX_VALUE; + long currentMax = Long.MIN_VALUE; + boolean found = false; + for (int i = 0; i < leaves.size(); i++) { + final LeafReader leaf = leaves.get(i).reader(); + final NumericDocValues values = leaf.getNumericDocValues(name()); + if (values == null) continue; + final Bits bits = leaf.getLiveDocs(); + for (int docID = 0; docID < leaf.maxDoc(); docID++) { + if (bits == null || bits.get(docID)) { + found = true; + final long value = values.get(docID); + currentMin = Math.min(currentMin, value); + currentMax = Math.max(currentMax, value); + } + } + } + return found ? new FieldStats.Long(reader.maxDoc(), 0, -1, -1, false, false, currentMin, currentMax) : null; + } + + } + + public PrimaryTermFieldMapper(Settings indexSettings) { + super(NAME, Defaults.FIELD_TYPE, Defaults.FIELD_TYPE, indexSettings); + } + + @Override + public void preParse(ParseContext context) throws IOException { + super.parse(context); + } + + @Override + protected void parseCreateField(ParseContext context, List fields) throws IOException { + // see IndexShard.prepareIndex* to see where the real version value is passed through + final Field primaryTerm = new NumericDocValuesField(NAME, 0); + context.primaryTerm(primaryTerm); + fields.add(primaryTerm); + } + + @Override + public Mapper parse(ParseContext context) throws IOException { + return null; + } + + @Override + public void postParse(ParseContext context) throws IOException { + // In the case of nested docs, let's fill nested docs with primaryTerm=0 + // so that Lucene doesn't write a Bitset for documents that don't have + // the field. This is consistent with the default value for efficiency. + for (int i = 1; i < context.docs().size(); i++) { + final Document doc = context.docs().get(i); + doc.add(new NumericDocValuesField(NAME, 0L)); + } + } + + @Override + protected String contentType() { + return CONTENT_TYPE; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return builder; + } + + @Override + protected void doMerge(Mapper mergeWith, boolean updateAllTypes) { + // nothing to do + } + +} diff --git a/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java b/core/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java similarity index 81% rename from core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java rename to core/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java index 479e5f23527c4..68cdeaa616ddf 100644 --- a/core/src/main/java/org/elasticsearch/index/mapper/internal/SeqNoFieldMapper.java +++ b/core/src/main/java/org/elasticsearch/index/mapper/SeqNoFieldMapper.java @@ -17,21 +17,26 @@ * under the License. */ -package org.elasticsearch.index.mapper.internal; +package org.elasticsearch.index.mapper; import org.apache.lucene.document.Field; +import org.apache.lucene.document.LongPoint; import org.apache.lucene.document.NumericDocValuesField; import org.apache.lucene.index.DocValuesType; import org.apache.lucene.index.IndexReader; import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.LeafReaderContext; import org.apache.lucene.index.NumericDocValues; +import org.apache.lucene.index.XPointValues; import org.apache.lucene.search.Query; import org.apache.lucene.util.Bits; import org.elasticsearch.action.fieldstats.FieldStats; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.index.fielddata.IndexFieldData; +import org.elasticsearch.index.fielddata.IndexNumericFieldData.NumericType; +import org.elasticsearch.index.fielddata.plain.DocValuesIndexFieldData; import org.elasticsearch.index.mapper.MappedFieldType; import org.elasticsearch.index.mapper.Mapper; import org.elasticsearch.index.mapper.MapperParsingException; @@ -46,7 +51,13 @@ import java.util.List; import java.util.Map; -/** Mapper for the _seq_no field. */ +/** + * Mapper for the {@code _seq_no} field. + * + * We expect to use the seq# for sorting, during collision checking and for + * doing range searches. Therefore the {@code _seq_no} field is stored both + * as a numeric doc value and as numeric indexed field. + */ public class SeqNoFieldMapper extends MetadataFieldMapper { public static final String NAME = "_seq_no"; @@ -114,34 +125,24 @@ public Query termQuery(Object value, @Nullable QueryShardContext context) { throw new QueryShardException(context, "SeqNoField field [" + name() + "] is not searchable"); } + @Override + public IndexFieldData.Builder fielddataBuilder() { + failIfNoDocValues(); + return new DocValuesIndexFieldData.Builder().numericType(NumericType.LONG); + } + @Override public FieldStats stats(IndexReader reader) throws IOException { - // nocommit remove implementation when late-binding commits - // are possible - final List leaves = reader.leaves(); - if (leaves.isEmpty()) { + String fieldName = name(); + long size = XPointValues.size(reader, fieldName); + if (size == 0) { return null; } - - long currentMin = Long.MAX_VALUE; - long currentMax = Long.MIN_VALUE; - boolean found = false; - for (int i = 0; i < leaves.size(); i++) { - final LeafReader leaf = leaves.get(i).reader(); - final NumericDocValues values = leaf.getNumericDocValues(name()); - if (values == null) continue; - final Bits bits = leaf.getLiveDocs(); - for (int docID = 0; docID < leaf.maxDoc(); docID++) { - if (bits == null || bits.get(docID)) { - found = true; - final long value = values.get(docID); - currentMin = Math.min(currentMin, value); - currentMax = Math.max(currentMax, value); - } - } - } - - return found ? new FieldStats.Long(reader.maxDoc(), 0, -1, -1, false, true, currentMin, currentMax) : null; + int docCount = XPointValues.getDocCount(reader, fieldName); + byte[] min = XPointValues.getMinPackedValue(reader, fieldName); + byte[] max = XPointValues.getMaxPackedValue(reader, fieldName); + return new FieldStats.Long(reader.maxDoc(),docCount, -1L, size, true, false, + LongPoint.decodeDimension(min, 0), LongPoint.decodeDimension(max, 0)); } } diff --git a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java index 7d56b5e20b6c4..2841aff1038ac 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java +++ b/core/src/main/java/org/elasticsearch/index/shard/IndexShard.java @@ -495,7 +495,7 @@ public Engine.Index prepareIndexOnPrimary(SourceToParse source, long version, Ve boolean isRetry) { try { verifyPrimary(); - return prepareIndex(docMapper(source.type()), source, SequenceNumbersService.UNASSIGNED_SEQ_NO, version, versionType, + return prepareIndex(docMapper(source.type()), source, SequenceNumbersService.UNASSIGNED_SEQ_NO, primaryTerm, version, versionType, Engine.Operation.Origin.PRIMARY, autoGeneratedIdTimestamp, isRetry); } catch (Exception e) { verifyNotClosed(e); @@ -507,16 +507,17 @@ public Engine.Index prepareIndexOnReplica(SourceToParse source, long seqNo, long boolean isRetry) { try { verifyReplicationTarget(); - return prepareIndex(docMapper(source.type()), source, seqNo, version, versionType, Engine.Operation.Origin.REPLICA, autoGeneratedIdTimestamp, - isRetry); + return prepareIndex(docMapper(source.type()), source, seqNo, primaryTerm, version, versionType, + Engine.Operation.Origin.REPLICA, autoGeneratedIdTimestamp, isRetry); } catch (Exception e) { verifyNotClosed(e); throw e; } } - static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse source, long seqNo, long version, VersionType versionType, - Engine.Operation.Origin origin, long autoGeneratedIdTimestamp, boolean isRetry) { + static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse source, long seqNo, long primaryTerm, long version, + VersionType versionType, Engine.Operation.Origin origin, long autoGeneratedIdTimestamp, + boolean isRetry) { long startTime = System.nanoTime(); ParsedDocument doc = docMapper.getDocumentMapper().parse(source); if (docMapper.getMapping() != null) { @@ -526,7 +527,8 @@ static Engine.Index prepareIndex(DocumentMapperForType docMapper, SourceToParse Query uidQuery = uidFieldType.termQuery(doc.uid(), null); Term uid = MappedFieldType.extractTerm(uidQuery); doc.seqNo().setLongValue(seqNo); - return new Engine.Index(uid, doc, seqNo, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry); + doc.primaryTerm().setLongValue(primaryTerm); + return new Engine.Index(uid, doc, seqNo, primaryTerm, version, versionType, origin, startTime, autoGeneratedIdTimestamp, isRetry); } public void index(Engine.Index index) { @@ -557,21 +559,24 @@ public Engine.Delete prepareDeleteOnPrimary(String type, String id, long version final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType(); final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null); final Term uid = MappedFieldType.extractTerm(uidQuery); - return prepareDelete(type, id, uid, SequenceNumbersService.UNASSIGNED_SEQ_NO, version, versionType, Engine.Operation.Origin.PRIMARY); + return prepareDelete(type, id, uid, SequenceNumbersService.UNASSIGNED_SEQ_NO, primaryTerm, version, + versionType, Engine.Operation.Origin.PRIMARY); } - public Engine.Delete prepareDeleteOnReplica(String type, String id, long seqNo, long version, VersionType versionType) { + public Engine.Delete prepareDeleteOnReplica(String type, String id, long seqNo, long primaryTerm, + long version, VersionType versionType) { verifyReplicationTarget(); final DocumentMapper documentMapper = docMapper(type).getDocumentMapper(); final MappedFieldType uidFieldType = documentMapper.uidMapper().fieldType(); final Query uidQuery = uidFieldType.termQuery(Uid.createUid(type, id), null); final Term uid = MappedFieldType.extractTerm(uidQuery); - return prepareDelete(type, id, uid, seqNo, version, versionType, Engine.Operation.Origin.REPLICA); + return prepareDelete(type, id, uid, seqNo, primaryTerm, version, versionType, Engine.Operation.Origin.REPLICA); } - static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long version, VersionType versionType, Engine.Operation.Origin origin) { + static Engine.Delete prepareDelete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, + VersionType versionType, Engine.Operation.Origin origin) { long startTime = System.nanoTime(); - return new Engine.Delete(type, id, uid, seqNo, version, versionType, origin, startTime, false); + return new Engine.Delete(type, id, uid, seqNo, primaryTerm, version, versionType, origin, startTime, false); } public void delete(Engine.Delete delete) { diff --git a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java index 5ed2280e36688..c4556e225c7b8 100644 --- a/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java +++ b/core/src/main/java/org/elasticsearch/index/shard/TranslogRecoveryPerformer.java @@ -155,7 +155,7 @@ private void performRecoveryOperation(Engine engine, Translog.Operation operatio // we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all // autoGeneratedID docs that are coming from the primary are updated correctly. Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()), source(shardId.getIndexName(), index.type(), index.id(), index.source()) - .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()), index.seqNo(), + .routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()), index.seqNo(), index.primaryTerm(), index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, index.getAutoGeneratedIdTimestamp(), true); maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates); if (logger.isTraceEnabled()) { @@ -170,7 +170,8 @@ private void performRecoveryOperation(Engine engine, Translog.Operation operatio logger.trace("[translog] recover [delete] op of [{}][{}]", uid.type(), uid.id()); } final Engine.Delete engineDelete = new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.seqNo(), - delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(), origin, System.nanoTime(), false); + delete.primaryTerm(), delete.version(), delete.versionType().versionTypeForReplicationAndRecovery(), + origin, System.nanoTime(), false); delete(engine, engineDelete); break; default: diff --git a/core/src/main/java/org/elasticsearch/index/translog/Translog.java b/core/src/main/java/org/elasticsearch/index/translog/Translog.java index 0abf232fa1251..2d2cca39e55ff 100644 --- a/core/src/main/java/org/elasticsearch/index/translog/Translog.java +++ b/core/src/main/java/org/elasticsearch/index/translog/Translog.java @@ -829,6 +829,7 @@ public static class Index implements Operation { private final long autoGeneratedIdTimestamp; private final String type; private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + private long primaryTerm = 0; private final long version; private final VersionType versionType; private final BytesReference source; @@ -857,6 +858,7 @@ public Index(StreamInput in) throws IOException { } if (format >= FORMAT_SEQ_NO) { seqNo = in.readVLong(); + primaryTerm = in.readVLong(); } } @@ -867,6 +869,7 @@ public Index(Engine.Index index) { this.routing = index.routing(); this.parent = index.parent(); this.seqNo = index.seqNo(); + this.primaryTerm = index.primaryTerm(); this.version = index.version(); this.timestamp = index.timestamp(); this.ttl = index.ttl(); @@ -930,6 +933,10 @@ public long seqNo() { return seqNo; } + public long primaryTerm() { + return primaryTerm; + } + public long version() { return this.version; } @@ -957,6 +964,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeByte(versionType.getValue()); out.writeLong(autoGeneratedIdTimestamp); out.writeVLong(seqNo); + out.writeVLong(primaryTerm); } @Override @@ -972,6 +980,7 @@ public boolean equals(Object o) { if (version != index.version || seqNo != index.seqNo || + primaryTerm != index.primaryTerm || timestamp != index.timestamp || ttl != index.ttl || id.equals(index.id) == false || @@ -993,6 +1002,7 @@ public int hashCode() { int result = id.hashCode(); result = 31 * result + type.hashCode(); result = 31 * result + Long.hashCode(seqNo); + result = 31 * result + Long.hashCode(primaryTerm); result = 31 * result + Long.hashCode(version); result = 31 * result + versionType.hashCode(); result = 31 * result + source.hashCode(); @@ -1024,6 +1034,7 @@ public static class Delete implements Operation { private Term uid; private long seqNo = SequenceNumbersService.UNASSIGNED_SEQ_NO; + private long primaryTerm = 0; private long version = Versions.MATCH_ANY; private VersionType versionType = VersionType.INTERNAL; @@ -1036,23 +1047,25 @@ public Delete(StreamInput in) throws IOException { assert versionType.validateVersionForWrites(this.version); if (format >= FORMAT_SEQ_NO) { seqNo = in.readVLong(); + primaryTerm = in.readVLong(); } } public Delete(Engine.Delete delete) { - this(delete.uid(), delete.seqNo(), delete.version(), delete.versionType()); + this(delete.uid(), delete.seqNo(), delete.primaryTerm(), delete.version(), delete.versionType()); } /** utility for testing */ public Delete(Term uid) { - this(uid, 0, Versions.MATCH_ANY, VersionType.INTERNAL); + this(uid, 0, 0, Versions.MATCH_ANY, VersionType.INTERNAL); } - public Delete(Term uid, long seqNo, long version, VersionType versionType) { + public Delete(Term uid, long seqNo, long primaryTerm, long version, VersionType versionType) { this.uid = uid; this.version = version; this.versionType = versionType; this.seqNo = seqNo; + this.primaryTerm = primaryTerm; } @Override @@ -1073,6 +1086,10 @@ public long seqNo() { return seqNo; } + public long primaryTerm() { + return primaryTerm; + } + public long version() { return this.version; } @@ -1094,6 +1111,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeLong(version); out.writeByte(versionType.getValue()); out.writeVLong(seqNo); + out.writeVLong(primaryTerm); } @Override @@ -1107,7 +1125,9 @@ public boolean equals(Object o) { Delete delete = (Delete) o; - return version == delete.version && seqNo == delete.seqNo && + return version == delete.version && + seqNo == delete.seqNo && + primaryTerm == delete.primaryTerm && uid.equals(delete.uid) && versionType == delete.versionType; } @@ -1116,6 +1136,7 @@ public boolean equals(Object o) { public int hashCode() { int result = uid.hashCode(); result = 31 * result + Long.hashCode(seqNo); + result = 31 * result + Long.hashCode(primaryTerm); result = 31 * result + Long.hashCode(version); result = 31 * result + versionType.hashCode(); return result; diff --git a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java index 8465dfbd540ad..1e68af550ffb3 100644 --- a/core/src/main/java/org/elasticsearch/indices/IndicesModule.java +++ b/core/src/main/java/org/elasticsearch/indices/IndicesModule.java @@ -46,8 +46,10 @@ import org.elasticsearch.index.mapper.NumberFieldMapper; import org.elasticsearch.index.mapper.ObjectMapper; import org.elasticsearch.index.mapper.ParentFieldMapper; +import org.elasticsearch.index.mapper.PrimaryTermFieldMapper; import org.elasticsearch.index.mapper.RoutingFieldMapper; import org.elasticsearch.index.mapper.ScaledFloatFieldMapper; +import org.elasticsearch.index.mapper.SeqNoFieldMapper; import org.elasticsearch.index.mapper.SourceFieldMapper; import org.elasticsearch.index.mapper.StringFieldMapper; import org.elasticsearch.index.mapper.TTLFieldMapper; @@ -57,7 +59,6 @@ import org.elasticsearch.index.mapper.TypeFieldMapper; import org.elasticsearch.index.mapper.UidFieldMapper; import org.elasticsearch.index.mapper.VersionFieldMapper; -import org.elasticsearch.index.mapper.internal.SeqNoFieldMapper; import org.elasticsearch.index.seqno.GlobalCheckpointSyncAction; import org.elasticsearch.indices.cluster.IndicesClusterStateService; import org.elasticsearch.indices.flush.SyncedFlushService; @@ -156,6 +157,7 @@ private Map getMetadataMappers(List SequenceNumbersService.UNASSIGNED_SEQ_NO)) { fail("corrupted"); } catch (IllegalStateException ex) { - assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3178, numOps=55, translogFileGeneration=2, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration=0, globalCheckpoint=-2}"); + assertEquals(ex.getMessage(), "Checkpoint file translog-2.ckp already exists but has corrupted content expected: Checkpoint{offset=3233, numOps=55, translogFileGeneration=2, globalCheckpoint=-2} but got: Checkpoint{offset=0, numOps=0, translogFileGeneration=0, globalCheckpoint=-2}"); } Checkpoint.write(FileChannel::open, config.getTranslogPath().resolve(Translog.getCommitCheckpointFileName(read.generation)), read, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING); try (Translog translog = new Translog(config, translogGeneration, () -> SequenceNumbersService.UNASSIGNED_SEQ_NO)) { @@ -1304,6 +1304,7 @@ public void run() { op = new Translog.Delete( new Term("_uid", threadId + "_" + opCount), opCount, + 0, 1 + randomInt(100000), randomFrom(VersionType.values())); break;