Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -156,8 +156,8 @@ public static Engine.DeleteResult executeDeleteRequestOnPrimary(DeleteRequest re
}

public static Engine.DeleteResult executeDeleteRequestOnReplica(DeleteRequest request, IndexShard replica) {
final Engine.Delete delete =
replica.prepareDeleteOnReplica(request.type(), request.id(), request.seqNo(), request.version(), request.versionType());
final Engine.Delete delete = replica.prepareDeleteOnReplica(request.type(), request.id(),
request.seqNo(), request.primaryTerm(), request.version(), request.versionType());
return replica.delete(delete);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,4 +228,3 @@ public static Engine.IndexResult executeIndexRequestOnPrimary(IndexRequest reque
}

}

122 changes: 122 additions & 0 deletions core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,25 @@

package org.elasticsearch.common.lucene.uid;

import org.apache.lucene.index.Fields;
import org.apache.lucene.index.IndexReader;
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.LeafReader.CoreClosedListener;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.index.PostingsEnum;
import org.apache.lucene.index.SortedNumericDocValues;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.Terms;
import org.apache.lucene.index.TermsEnum;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.util.Bits;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.CloseableThreadLocal;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;
import org.elasticsearch.index.mapper.UidFieldMapper;
import org.elasticsearch.index.seqno.SequenceNumbersService;

import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -143,4 +154,115 @@ public static long loadVersion(IndexReader reader, Term term) throws IOException
final DocIdAndVersion docIdAndVersion = loadDocIdAndVersion(reader, term);
return docIdAndVersion == null ? NOT_FOUND : docIdAndVersion.version;
}


/**
* Returns the sequence number for the given uid term, returning
* {@code SequenceNumbersService.UNASSIGNED_SEQ_NO} if none is found.
*/
public static long loadSeqNo(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME) : "can only load _seq_no by uid";
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}

// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReader leaf = leaves.get(i).reader();
Bits liveDocs = leaf.getLiveDocs();

TermsEnum termsEnum = null;
SortedNumericDocValues dvField = null;
PostingsEnum docsEnum = null;

final Fields fields = leaf.fields();
if (fields != null) {
Terms terms = fields.terms(UidFieldMapper.NAME);
if (terms != null) {
termsEnum = terms.iterator();
assert termsEnum != null;
dvField = leaf.getSortedNumericDocValues(SeqNoFieldMapper.NAME);
assert dvField != null;

final BytesRef id = term.bytes();
if (termsEnum.seekExact(id)) {
// there may be more than one matching docID, in the
// case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
int docID = DocIdSetIterator.NO_MORE_DOCS;
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
if (liveDocs != null && liveDocs.get(d) == false) {
continue;
}
docID = d;
}

if (docID != DocIdSetIterator.NO_MORE_DOCS) {
dvField.setDocument(docID);
assert dvField.count() == 1 : "expected only a single value for _seq_no but got " +
dvField.count();
return dvField.valueAt(0);
}
}
}
}

}
return SequenceNumbersService.UNASSIGNED_SEQ_NO;
}

/**
* Returns the primary term for the given uid term, returning {@code 0} if none is found.
*/
public static long loadPrimaryTerm(IndexReader reader, Term term) throws IOException {
assert term.field().equals(UidFieldMapper.NAME) : "can only load _primary_term by uid";
List<LeafReaderContext> leaves = reader.leaves();
if (leaves.isEmpty()) {
return 0;
}

// iterate backwards to optimize for the frequently updated documents
// which are likely to be in the last segments
for (int i = leaves.size() - 1; i >= 0; i--) {
LeafReader leaf = leaves.get(i).reader();
Bits liveDocs = leaf.getLiveDocs();

TermsEnum termsEnum = null;
NumericDocValues dvField = null;
PostingsEnum docsEnum = null;

final Fields fields = leaf.fields();
if (fields != null) {
Terms terms = fields.terms(UidFieldMapper.NAME);
if (terms != null) {
termsEnum = terms.iterator();
assert termsEnum != null;
dvField = leaf.getNumericDocValues(SeqNoFieldMapper.PRIMARY_TERM_NAME);
assert dvField != null;

final BytesRef id = term.bytes();
if (termsEnum.seekExact(id)) {
// there may be more than one matching docID, in the
// case of nested docs, so we want the last one:
docsEnum = termsEnum.postings(docsEnum, 0);
int docID = DocIdSetIterator.NO_MORE_DOCS;
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
if (liveDocs != null && liveDocs.get(d) == false) {
continue;
}
docID = d;
}

if (docID != DocIdSetIterator.NO_MORE_DOCS) {
return dvField.get(docID);
}
}
}
}

}
return 0;
}
}
28 changes: 19 additions & 9 deletions core/src/main/java/org/elasticsearch/index/engine/Engine.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.Tuple;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
Expand Down Expand Up @@ -925,13 +926,15 @@ public String getLowercase() {
private final Term uid;
private final long version;
private final long seqNo;
private final long primaryTerm;
private final VersionType versionType;
private final Origin origin;
private final long startTime;

public Operation(Term uid, long seqNo, long version, VersionType versionType, Origin origin, long startTime) {
public Operation(Term uid, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin, long startTime) {
this.uid = uid;
this.seqNo = seqNo;
this.primaryTerm = primaryTerm;
this.version = version;
this.versionType = versionType;
this.origin = origin;
Expand Down Expand Up @@ -965,6 +968,10 @@ public long seqNo() {
return seqNo;
}

public long primaryTerm() {
return primaryTerm;
}

public abstract int estimatedSizeInBytes();

public VersionType versionType() {
Expand All @@ -991,9 +998,9 @@ public static class Index extends Operation {
private final long autoGeneratedIdTimestamp;
private final boolean isRetry;

public Index(Term uid, ParsedDocument doc, long seqNo, long version, VersionType versionType, Origin origin, long startTime,
long autoGeneratedIdTimestamp, boolean isRetry) {
super(uid, seqNo, version, versionType, origin, startTime);
public Index(Term uid, ParsedDocument doc, long seqNo, long primaryTerm, long version, VersionType versionType, Origin origin,
long startTime, long autoGeneratedIdTimestamp, boolean isRetry) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
this.doc = doc;
this.isRetry = isRetry;
this.autoGeneratedIdTimestamp = autoGeneratedIdTimestamp;
Expand All @@ -1004,7 +1011,8 @@ public Index(Term uid, ParsedDocument doc) {
} // TEST ONLY

Index(Term uid, ParsedDocument doc, long version) {
this(uid, doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, version, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime(), -1, false);
this(uid, doc, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, version, VersionType.INTERNAL,
Origin.PRIMARY, System.nanoTime(), -1, false);
}

public ParsedDocument parsedDoc() {
Expand Down Expand Up @@ -1071,18 +1079,20 @@ public static class Delete extends Operation {
private final String type;
private final String id;

public Delete(String type, String id, Term uid, long seqNo, long version, VersionType versionType, Origin origin, long startTime) {
super(uid, seqNo, version, versionType, origin, startTime);
public Delete(String type, String id, Term uid, long seqNo, long primaryTerm, long version, VersionType versionType,
Origin origin, long startTime) {
super(uid, seqNo, primaryTerm, version, versionType, origin, startTime);
this.type = type;
this.id = id;
}

public Delete(String type, String id, Term uid) {
this(type, id, uid, SequenceNumbersService.UNASSIGNED_SEQ_NO, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
this(type, id, uid, SequenceNumbersService.UNASSIGNED_SEQ_NO, 0, Versions.MATCH_ANY, VersionType.INTERNAL, Origin.PRIMARY, System.nanoTime());
}

public Delete(Delete template, VersionType versionType) {
this(template.type(), template.id(), template.uid(), template.seqNo(), template.version(), versionType, template.origin(), template.startTime());
this(template.type(), template.id(), template.uid(), template.seqNo(), template.primaryTerm(), template.version(),
versionType, template.origin(), template.startTime());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,6 +650,16 @@ private IndexResult innerIndex(Index index) throws IOException {
}
updatedVersion = index.versionType().updateVersion(currentVersion, expectedVersion);
index.parsedDoc().version().setLongValue(updatedVersion);

// Update the document's sequence number and primary term, the
// sequence number here is derived here from either the sequence
// number service if this is on the primary, or the existing
// document's sequence number if this is on the replica. The
// primary term here has already been set, see
// IndexShard.prepareIndex where the Engine.Index operation is
// created
index.parsedDoc().updateSeqID(seqNo, index.primaryTerm());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

another drive by question - should we do it here, or separate this into two, more explicit, flows - one we when we create an Index operation on a replica (where we set the seq no number based on incoming request) and one here when we operate as a primary? If you guys feel the current version is more intuitive, I'm good but I wanted to get confirmation this was considered.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer having exactly one place where this is updated, as it's less likely to get out of sync than if it were separated. It's also easier to find when it's only in a single place.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm totally open to suggestions otherwise though, @jasontedor you're about to use this PR, do you have a preference?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I prefer it the way that you have it.


if (currentVersion == Versions.NOT_FOUND && forceUpdateDocument == false) {
// document does not exists, we can optimize for create, but double check if assertions are running
assert assertDocDoesNotExist(index, canOptimizeAddDocument == false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ private static void reverseOrder(ParseContext.InternalParseContext context) {
private static ParsedDocument parsedDocument(SourceToParse source, ParseContext.InternalParseContext context, Mapping update) {
return new ParsedDocument(
context.version(),
context.seqNo(),
context.seqID(),
context.sourceToParse().id(),
context.sourceToParse().type(),
source.routing(),
Expand Down
24 changes: 12 additions & 12 deletions core/src/main/java/org/elasticsearch/index/mapper/ParseContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.elasticsearch.common.lucene.all.AllEntries;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;

import java.util.ArrayList;
import java.util.Iterator;
Expand Down Expand Up @@ -254,13 +255,13 @@ public void version(Field version) {
}

@Override
public Field seqNo() {
return in.seqNo();
public SeqNoFieldMapper.SequenceID seqID() {
return in.seqID();
}

@Override
public void seqNo(Field seqNo) {
in.seqNo(seqNo);
public void seqID(SeqNoFieldMapper.SequenceID seqID) {
in.seqID(seqID);
}

@Override
Expand Down Expand Up @@ -310,7 +311,7 @@ public static class InternalParseContext extends ParseContext {

private Field version;

private Field seqNo;
private SeqNoFieldMapper.SequenceID seqID;

private final AllEntries allEntries;

Expand Down Expand Up @@ -404,16 +405,15 @@ public void version(Field version) {
}

@Override
public Field seqNo() {
return this.seqNo;
public SeqNoFieldMapper.SequenceID seqID() {
return this.seqID;
}

@Override
public void seqNo(Field seqNo) {
this.seqNo = seqNo;
public void seqID(SeqNoFieldMapper.SequenceID seqID) {
this.seqID = seqID;
}


@Override
public AllEntries allEntries() {
return this.allEntries;
Expand Down Expand Up @@ -540,9 +540,9 @@ public boolean isWithinMultiFields() {

public abstract void version(Field version);

public abstract Field seqNo();
public abstract SeqNoFieldMapper.SequenceID seqID();

public abstract void seqNo(Field seqNo);
public abstract void seqID(SeqNoFieldMapper.SequenceID seqID);

public final boolean includeInAll(Boolean includeInAll, FieldMapper mapper) {
return includeInAll(includeInAll, mapper.fieldType().indexOptions() != IndexOptions.NONE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.index.mapper.ParseContext.Document;
import org.elasticsearch.index.mapper.SeqNoFieldMapper;

import java.util.List;

Expand All @@ -35,7 +36,7 @@ public class ParsedDocument {

private final String id, type;
private final BytesRef uid;
private final Field seqNo;
private final SeqNoFieldMapper.SequenceID seqID;

private final String routing;

Expand All @@ -47,17 +48,16 @@ public class ParsedDocument {

private String parent;

public ParsedDocument(
Field version,
Field seqNo,
String id,
String type,
String routing,
List<Document> documents,
BytesReference source,
Mapping dynamicMappingsUpdate) {
public ParsedDocument(Field version,
SeqNoFieldMapper.SequenceID seqID,
String id,
String type,
String routing,
List<Document> documents,
BytesReference source,
Mapping dynamicMappingsUpdate) {
this.version = version;
this.seqNo = seqNo;
this.seqID = seqID;
this.id = id;
this.type = type;
this.uid = Uid.createUidAsBytes(type, id);
Expand All @@ -83,8 +83,10 @@ public Field version() {
return version;
}

public Field seqNo() {
return seqNo;
public void updateSeqID(long sequenceNumber, long primaryTerm) {
this.seqID.seqNo.setLongValue(sequenceNumber);
this.seqID.seqNoDocValue.setLongValue(sequenceNumber);
this.seqID.primaryTerm.setLongValue(primaryTerm);
}

public String routing() {
Expand Down
Loading